MEDIUM: spoe: Be sure to wake up the right entity waiting for a buffer

This happens when a buffer allocation fails. In the SPOE context, buffers are
allocated by streams and SPOE applets at different times. First, by streams, when
messages need to be encoded before sending them in a NOTIFY frame. Then, by SPOE
applets, when an ACK frame is received.

The first case works as expected: we wake up the stream. But for the second one,
we must wake up the waiting SPOE applet rather than the stream.
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 1de125f..f4da4dd 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -246,8 +246,8 @@
 	struct stream      *strm;         /* The stream that should be offloaded */
 
 	struct list        *messages;     /* List of messages that will be sent during the stream processing */
-	struct buffer      *buffer;       /* Buffer used to store a NOTIFY or ACK frame */
-	struct buffer_wait  buffer_wait;  /* position in the list of streams waiting for a buffer */
+	struct buffer      *buffer;       /* Buffer used to store a encoded messages */
+	struct buffer_wait  buffer_wait;  /* position in the list of ressources waiting for a buffer */
 	struct list         list;
 
 	enum spoe_ctx_state state;        /* SPOE_CTX_ST_* */
@@ -270,6 +270,8 @@
 	unsigned int        flags;          /* SPOE_APPCTX_FL_* */
 
 	unsigned int        status_code;    /* SPOE_FRM_ERR_* */
+	struct buffer      *buffer;         /* Buffer used to store a encoded messages */
+	struct buffer_wait  buffer_wait;    /* position in the list of ressources waiting for a buffer */
 	struct list         waiting_queue;  /* list of streams waiting for a ACK frame, in sync and pipelining mode */
 	struct list         list;           /* next spoe appctx for the same agent */
 };
@@ -309,8 +311,8 @@
 struct flt_ops spoe_ops;
 
 static int  queue_spoe_context(struct spoe_context *ctx);
-static int  acquire_spoe_buffer(struct spoe_context *ctx);
-static void release_spoe_buffer(struct spoe_context *ctx);
+static int  acquire_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait);
+static void release_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait);
 
 /********************************************************************
  * helper functions/globals
@@ -913,15 +915,15 @@
 	idx += encode_spoe_varint(ctx->stream_id, frame+idx);
 	idx += encode_spoe_varint(ctx->frame_id, frame+idx);
 
-	/* Copy encoded messages */
-	if (idx + ctx->buffer->i > size) {
+	/* check the buffer size */
+	if (idx + SPOE_APPCTX(appctx)->buffer->i > size) {
 		spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
 		return 0;
 	}
 
 	/* Copy encoded messages */
-	memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
-	idx += ctx->buffer->i;
+	memcpy(frame+idx, SPOE_APPCTX(appctx)->buffer->p, SPOE_APPCTX(appctx)->buffer->i);
+	idx += SPOE_APPCTX(appctx)->buffer->i;
 
 	return idx;
 }
@@ -1230,9 +1232,13 @@
 	return 0;
 
   found:
-	if (!acquire_spoe_buffer(ctx))
+	if (!acquire_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait))
 		return 1; /* Retry later */
 
+	/* Transfer the buffer ownership to the SPOE context */
+	ctx->buffer = SPOE_APPCTX(appctx)->buffer;
+	SPOE_APPCTX(appctx)->buffer = &buf_empty;
+
 	/* Copy encoded actions */
 	memcpy(ctx->buffer->p, frame+idx, size-idx);
 	ctx->buffer->i = size-idx;
@@ -1379,6 +1385,15 @@
 /********************************************************************
  * Functions that manage the SPOE applet
  ********************************************************************/
+static int
+wakeup_spoe_appctx(struct appctx *appctx)
+{
+	si_applet_want_get(appctx->owner);
+	si_applet_want_put(appctx->owner);
+	appctx_wakeup(appctx);
+	return 1;
+}
+
 /* Callback function that catches applet timeouts. If a timeout occurred, we set
  * <appctx->st1> flag and the SPOE applet is woken up. */
 static struct task *
@@ -1391,9 +1406,7 @@
 		task->expire = TICK_ETERNITY;
 		appctx->st1 = SPOE_APPCTX_ERR_TOUT;
 	}
-	si_applet_want_get(appctx->owner);
-	si_applet_want_put(appctx->owner);
-	appctx_wakeup(appctx);
+	wakeup_spoe_appctx(appctx);
 	return task;
 }
 
@@ -1441,6 +1454,7 @@
 		task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 	}
 
+	release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
 	pool_free2(pool2_spoe_appctx, SPOE_APPCTX(appctx));
 
 	if (!LIST_ISEMPTY(&agent->applets))
@@ -1633,6 +1647,11 @@
 	}
 
 	ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
+
+	/* Transfer the buffer ownership to the SPOE appctx */
+	SPOE_APPCTX(appctx)->buffer = ctx->buffer;
+	ctx->buffer = &buf_empty;
+
 	ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size);
 	if (ret > 1)
 		ret = send_spoe_frame(appctx, frame, ret);
@@ -1646,7 +1665,7 @@
 			agent->sending_rate++;
 			ctx->state = SPOE_CTX_ST_ERROR;
 			ctx->status_code = (spoe_status_code + 0x100);
-			release_spoe_buffer(ctx);
+			release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
 			task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 			LIST_DEL(&ctx->list);
 			LIST_INIT(&ctx->list);
@@ -1661,7 +1680,7 @@
 		default:
 			agent->sending_rate++;
 			ctx->state = SPOE_CTX_ST_WAITING_ACK;
-			release_spoe_buffer(ctx);
+			release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
 			LIST_DEL(&ctx->list);
 			LIST_INIT(&ctx->list);
 			if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC)
@@ -1963,6 +1982,11 @@
 	SPOE_APPCTX(appctx)->max_frame_size  = conf->agent->max_frame_size;
 	SPOE_APPCTX(appctx)->flags           = 0;
 	SPOE_APPCTX(appctx)->status_code     = SPOE_FRM_ERR_NONE;
+	SPOE_APPCTX(appctx)->buffer          = &buf_empty;
+
+	LIST_INIT(&SPOE_APPCTX(appctx)->buffer_wait.list);
+	SPOE_APPCTX(appctx)->buffer_wait.target = appctx;
+	SPOE_APPCTX(appctx)->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_appctx;
 
 	LIST_INIT(&SPOE_APPCTX(appctx)->list);
 	LIST_INIT(&SPOE_APPCTX(appctx)->waiting_queue);
@@ -2093,9 +2117,7 @@
 	list_for_each_entry(spoe_appctx, &agent->applets, list) {
 		appctx = spoe_appctx->owner;
 		if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
-			si_applet_want_get(appctx->owner);
-			si_applet_want_put(appctx->owner);
-			appctx_wakeup(appctx);
+			wakeup_spoe_appctx(appctx);
 			LIST_DEL(&spoe_appctx->list);
 			LIST_ADDQ(&agent->applets, &spoe_appctx->list);
 			break;
@@ -2379,7 +2401,7 @@
 	if (ctx->flags & SPOE_CTX_FL_PROCESS)
 		goto wait;
 
-	if (!acquire_spoe_buffer(ctx))
+	if (!acquire_spoe_buffer(&ctx->buffer, &ctx->buffer_wait))
 		goto wait;
 
 	/* Set the right flag to prevent request and response processing
@@ -2405,7 +2427,7 @@
 	/* Reset processing timer */
 	ctx->process_exp = TICK_ETERNITY;
 
-	release_spoe_buffer(ctx);
+	release_spoe_buffer(&ctx->buffer, &ctx->buffer_wait);
 
 	if (!LIST_ISEMPTY(&ctx->list)) {
 		LIST_DEL(&ctx->list);
@@ -2539,39 +2561,41 @@
  * Functions that create/destroy SPOE contexts
  **************************************************************************/
 static int
-acquire_spoe_buffer(struct spoe_context *ctx)
+acquire_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
 {
-	if (ctx->buffer != &buf_empty)
+	if (*buf != &buf_empty)
 		return 1;
 
-	if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
-		LIST_DEL(&ctx->buffer_wait.list);
-		LIST_INIT(&ctx->buffer_wait.list);
+	if (!LIST_ISEMPTY(&buffer_wait->list)) {
+		LIST_DEL(&buffer_wait->list);
+		LIST_INIT(&buffer_wait->list);
 	}
 
-	if (b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs))
+	if (b_alloc_margin(buf, global.tune.reserved_bufs))
 		return 1;
 
-	LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
+	LIST_ADDQ(&buffer_wq, &buffer_wait->list);
 	return 0;
 }
 
 static void
-release_spoe_buffer(struct spoe_context *ctx)
+release_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
 {
-	if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
-		LIST_DEL(&ctx->buffer_wait.list);
-		LIST_INIT(&ctx->buffer_wait.list);
+	if (!LIST_ISEMPTY(&buffer_wait->list)) {
+		LIST_DEL(&buffer_wait->list);
+		LIST_INIT(&buffer_wait->list);
 	}
 
 	/* Release the buffer if needed */
-	if (ctx->buffer != &buf_empty) {
-		b_free(&ctx->buffer);
-		offer_buffers(ctx, tasks_run_queue + applets_active_queue);
+	if (*buf != &buf_empty) {
+		b_free(buf);
+		offer_buffers(buffer_wait->target,
+			      tasks_run_queue + applets_active_queue);
 	}
 }
 
-static int wakeup_spoe_context(struct spoe_context *ctx)
+static int
+wakeup_spoe_context(struct spoe_context *ctx)
 {
 	task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 	return 1;
@@ -2643,7 +2667,6 @@
 		list_for_each_entry(fconf, &p->filter_configs, list) {
 			struct spoe_config *conf;
 			struct spoe_agent  *agent;
-			struct appctx      *appctx;
 			struct spoe_appctx *spoe_appctx;
 
 			if (fconf->id != spoe_filter_id)
@@ -2653,10 +2676,7 @@
 			agent = conf->agent;
 
 			list_for_each_entry(spoe_appctx, &agent->applets, list) {
-				appctx = spoe_appctx->owner;
-				si_applet_want_get(appctx->owner);
-				si_applet_want_put(appctx->owner);
-				appctx_wakeup(appctx);
+				wakeup_spoe_appctx(spoe_appctx->owner);
 			}
 		}
 		p = p->next;
@@ -2818,7 +2838,7 @@
 
 	if (tick_is_expired(ctx->process_exp, now_ms)) {
 		s->pending_events |= TASK_WOKEN_MSG;
-		release_spoe_buffer(ctx);
+		release_spoe_buffer(&ctx->buffer, &ctx->buffer_wait);
 	}
 }