BUG/MAJOR: Fix how the list of entities waiting for a buffer is handled

When an entity tries to get a buffer, if it cannot be allocated, for example
because the number of buffers which may be allocated per process is limited,
this entity is added in a list (called <buffer_wq>) and waits for an available
buffer.

Historically, the <buffer_wq> list was logically attached to streams because
they were the only entities likely to be added in it. Now, applets can also be
waiting for a free buffer. And with filters, we could imagine having even more
entities waiting for a buffer. So it makes sense to have a generic list.

Anyway, with the current design there is a bug. When an applet fails to get a
buffer, it will wait. But we add the stream attached to the applet in
<buffer_wq>, instead of the applet itself. So when a buffer is available, we
wake up the stream and not the waiting applet. So, it is possible to have
waiting applets that are never awakened.

So, now, <buffer_wq> is independent from streams. And we really add the waiting
entity in <buffer_wq>. To be generic, the entity is responsible for defining the
callback used to awaken it.

In addition, applets will still request an input buffer when they become
active. But they will not be put to sleep anymore if no buffer is available. So
it is the responsibility of the applet I/O handler to check if this buffer is
allocated or not. This way, an applet can decide if this buffer is required or
not and can do additional processing if not.

[wt: backport to 1.7 and 1.6]
diff --git a/include/common/buffer.h b/include/common/buffer.h
index ca90fbe..ce3eb40 100644
--- a/include/common/buffer.h
+++ b/include/common/buffer.h
@@ -39,9 +39,18 @@
 	char data[0];                   /* <size> bytes */
 };
 
+/* An element of the <buffer_wq> list. It represents an object that needs to
+ * acquire a buffer to continue its processing. */
+struct buffer_wait {
+	void *target;              /* The waiting object that should be woken up */
+	int (*wakeup_cb)(void *);  /* Wakes up <target>, passed as argument; returns non-zero if it was woken up */
+	struct list list;          /* Attachment point in the <buffer_wq> list */
+};
+
 extern struct pool_head *pool2_buffer;
 extern struct buffer buf_empty;
 extern struct buffer buf_wanted;
+extern struct list buffer_wq;
 
 int init_buffer();
 int buffer_replace2(struct buffer *b, char *pos, char *end, const char *str, int len);
@@ -522,6 +531,16 @@
 	return next;
 }
 
+
+void __offer_buffer(void *from, unsigned int threshold);
+
+static inline void offer_buffers(void *from, unsigned int threshold)
+{
+	if (LIST_ISEMPTY(&buffer_wq))
+		return;
+	__offer_buffer(from, threshold);
+}
+
 #endif /* _COMMON_BUFFER_H */
 
 /*
diff --git a/include/proto/applet.h b/include/proto/applet.h
index 5a503b4..653be31 100644
--- a/include/proto/applet.h
+++ b/include/proto/applet.h
@@ -36,6 +36,10 @@
 
 void applet_run_active();
 
+
+/* Defined further down in this file; declared early to be set as a wakeup_cb. */
+static inline int appctx_res_wakeup(struct appctx *appctx);
+
 /* Initializes all required fields for a new appctx. Note that it does the
  * minimum acceptable initialization for an appctx. This means only the
  * 3 integer states st0, st1, st2 are zeroed.
@@ -61,6 +65,9 @@
 		appctx->applet = applet;
 		appctx_init(appctx);
 		LIST_INIT(&appctx->runq);
+		LIST_INIT(&appctx->buffer_wait.list);
+		appctx->buffer_wait.target = appctx;
+		appctx->buffer_wait.wakeup_cb = (int (*)(void *))appctx_res_wakeup;
 		nb_applets++;
 	}
 	return appctx;
@@ -75,6 +82,10 @@
 		LIST_DEL(&appctx->runq);
 		applets_active_queue--;
 	}
+	if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) {
+		LIST_DEL(&appctx->buffer_wait.list);
+		LIST_INIT(&appctx->buffer_wait.list);
+	}
 	pool_free2(pool2_connection, appctx);
 	nb_applets--;
 }
@@ -98,6 +109,19 @@
 	}
 }
 
+/* Callback used to wake up an applet when a buffer is available. The applet
+ * <appctx> is woken up only if its <runq> element is not already linked, i.e.
+ * if it is not already in the list of "active" applets. This function returns
+ * 1 if the applet is woken up, otherwise it returns 0. */
+static inline int appctx_res_wakeup(struct appctx *appctx)
+{
+	if (!LIST_ISEMPTY(&appctx->runq))
+		return 0;
+	appctx_wakeup(appctx);
+	return 1;
+}
+
+
 #endif /* _PROTO_APPLET_H */
 
 /*
diff --git a/include/proto/channel.h b/include/proto/channel.h
index 3d435c4..304a935 100644
--- a/include/proto/channel.h
+++ b/include/proto/channel.h
@@ -36,6 +36,9 @@
 #include <types/stream.h>
 #include <types/stream_interface.h>
 
+#include <proto/applet.h>
+#include <proto/task.h>
+
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
 int init_channel();
 
@@ -439,6 +442,41 @@
 	return ret;
 }
 
+/* Allocates a buffer for channel <chn>, but only if it's guaranteed that it's
+ * not the last available buffer or it's the response buffer. Unless the buffer
+ * is the response buffer, an extra control is made so that we always keep
+ * <tune.buffers.reserved> buffers available after this allocation. Returns 0 in
+ * case of failure, non-zero otherwise.
+ *
+ * If no buffer is available, the requester, represented by the <wait> pointer,
+ * is appended to <buffer_wq> unless it is already linked in a list.
+ */
+static inline int channel_alloc_buffer(struct channel *chn, struct buffer_wait *wait)
+{
+	int margin = 0;
+
+	if (!(chn->flags & CF_ISRESP))
+		margin = global.tune.reserved_bufs;
+
+	if (b_alloc_margin(&chn->buf, margin) != NULL)
+		return 1;
+
+	if (LIST_ISEMPTY(&wait->list))
+		LIST_ADDQ(&buffer_wq, &wait->list);
+	return 0;
+}
+
+/* Releases the buffer of channel <chn> if it is allocated and empty, otherwise
+ * does nothing. After a release, the entities waiting for a buffer are offered
+ * one; <wait>->target is passed as the releasing entity to offer_buffers(). */
+static inline void channel_release_buffer(struct channel *chn, struct buffer_wait *wait)
+{
+	if (chn->buf->size && buffer_empty(chn->buf)) {
+		b_free(&chn->buf);
+		offer_buffers(wait->target, tasks_run_queue + applets_active_queue);
+	}
+}
+
 /* Truncate any unread data in the channel's buffer, and disable forwarding.
  * Outgoing data are left intact. This is mainly to be used to send error
  * messages after existing data.
diff --git a/include/proto/stream.h b/include/proto/stream.h
index b439344..85c234e 100644
--- a/include/proto/stream.h
+++ b/include/proto/stream.h
@@ -32,7 +32,6 @@
 
 extern struct pool_head *pool2_stream;
 extern struct list streams;
-extern struct list buffer_wq;
 
 extern struct data_cb sess_conn_cb;
 
@@ -55,11 +54,7 @@
 
 /* Update the stream's backend and server time stats */
 void stream_update_time_stats(struct stream *s);
-void __stream_offer_buffers(int rqlimit);
-static inline void stream_offer_buffers();
-int stream_alloc_work_buffer(struct stream *s);
 void stream_release_buffers(struct stream *s);
-int stream_alloc_recv_buffer(struct channel *chn);
 
 /* returns the session this stream belongs to */
 static inline struct session *strm_sess(const struct stream *strm)
@@ -285,25 +280,16 @@
 	LIST_INIT(&sess->by_srv);
 }
 
-static inline void stream_offer_buffers()
+/* Callback used to wake up a stream when a buffer is available. The stream <s>
+ * is woken up only if it is not already running and if it is not already in the
+ * task run queue. This function returns 1 if the stream is woken up, otherwise
+ * it returns 0. */
+static inline int stream_res_wakeup(struct stream *s)
 {
-	int avail;
-
-	if (LIST_ISEMPTY(&buffer_wq))
-		return;
-
-	/* all streams will need 1 buffer, so we can stop waking up streams
-	 * once we have enough of them to eat all the buffers. Note that we
-	 * don't really know if they are streams or just other tasks, but
-	 * that's a rough estimate. Similarly, for each cached event we'll need
-	 * 1 buffer. If no buffer is currently used, always wake up the number
-	 * of tasks we can offer a buffer based on what is allocated, and in
-	 * any case at least one task per two reserved buffers.
-	 */
-	avail = pool2_buffer->allocated - pool2_buffer->used - global.tune.reserved_bufs / 2;
-
-	if (avail > (int)tasks_run_queue)
-		__stream_offer_buffers(avail);
+	if ((s->task->state & TASK_RUNNING) || task_in_rq(s->task))
+		return 0;
+	task_wakeup(s->task, TASK_WOKEN_RES);
+	return 1;
 }
 
 void service_keywords_register(struct action_kw_list *kw_list);
diff --git a/include/types/applet.h b/include/types/applet.h
index da9f787..89602aa 100644
--- a/include/types/applet.h
+++ b/include/types/applet.h
@@ -26,6 +26,7 @@
 #include <types/obj_type.h>
 #include <types/proxy.h>
 #include <types/stream.h>
+#include <common/buffer.h>
 #include <common/chunk.h>
 #include <common/config.h>
 
@@ -58,6 +59,7 @@
 	void (*io_release)(struct appctx *appctx);  /* used within the cli_io_handler when st0 = CLI_ST_CALLBACK,
 	                                               if the command is terminated or the session released */
 	void *private;
+	struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */
 
 	union {
 		struct {
diff --git a/include/types/stream.h b/include/types/stream.h
index 2cc903c..26e8dd5 100644
--- a/include/types/stream.h
+++ b/include/types/stream.h
@@ -135,7 +135,7 @@
 	struct list list;               /* position in global streams list */
 	struct list by_srv;             /* position in server stream list */
 	struct list back_refs;          /* list of users tracking this stream */
-	struct list buffer_wait;        /* position in the list of streams waiting for a buffer */
+	struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */
 
 	struct {
 		struct stksess *ts;
diff --git a/src/applet.c b/src/applet.c
index ad40e1f..f5bc79d 100644
--- a/src/applet.c
+++ b/src/applet.c
@@ -16,6 +16,7 @@
 #include <common/config.h>
 #include <common/mini-clist.h>
 #include <proto/applet.h>
+#include <proto/channel.h>
 #include <proto/stream.h>
 #include <proto/stream_interface.h>
 
@@ -48,13 +49,12 @@
 		curr = LIST_ELEM(applet_cur_queue.n, typeof(curr), runq);
 		si = curr->owner;
 
-		/* now we'll need a buffer */
-		if (!stream_alloc_recv_buffer(si_ic(si))) {
-			si->flags |= SI_FL_WAIT_ROOM;
-			LIST_DEL(&curr->runq);
-			LIST_INIT(&curr->runq);
-			continue;
-		}
+		/* Now we'll try to allocate the input buffer. We wake up the
+		 * applet in all cases. So it is the applet's responsibility to
+		 * check if this buffer was allocated or not. This gives applets
+		 * a chance to do some other processing if needed. */
+		if (!channel_alloc_buffer(si_ic(si), &curr->buffer_wait))
+			si_applet_cant_put(si);
 
 		/* We always pretend the applet can't get and doesn't want to
 		 * put, it's up to it to change this if needed. This ensures
@@ -65,6 +65,7 @@
 
 		curr->applet->fct(curr);
 		si_applet_wake_cb(si);
+		channel_release_buffer(si_ic(si), &curr->buffer_wait);
 
 		if (applet_cur_queue.n == &curr->runq) {
 			/* curr was left in the list, move it back to the active list */
diff --git a/src/buffer.c b/src/buffer.c
index f47fbdd..4f8f647 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -31,6 +31,9 @@
 struct buffer buf_empty  = { .p = buf_empty.data };
 struct buffer buf_wanted = { .p = buf_wanted.data };
 
+/* list of objects waiting for at least one buffer */
+struct list buffer_wq = LIST_HEAD_INIT(buffer_wq);
+
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
 int init_buffer()
 {
@@ -278,6 +281,35 @@
 	fflush(o);
 }
 
+/* Wakes up the entities waiting in <buffer_wq>, in queue order, as long as the
+ * estimated number of spare buffers stays above <threshold>. <from> is the
+ * entity releasing a buffer: it is never woken up here. All waiters are assumed
+ * to need 1 buffer, which is only a rough estimate. A waiter is unlinked from
+ * the queue only when its wakeup_cb() reports that it was really woken up.
+ * Should not be called directly, use offer_buffers() instead.
+ */
+void __offer_buffer(void *from, unsigned int threshold)
+{
+	struct buffer_wait *wait, *bak;
+	int avail;
+
+	/* estimate of the spare buffers, keeping half of the reserve aside */
+	avail = pool2_buffer->allocated - pool2_buffer->used - global.tune.reserved_bufs / 2;
+
+	list_for_each_entry_safe(wait, bak, &buffer_wq, list) {
+		/* <avail> may be negative: never compare it as unsigned */
+		if (avail <= 0 || (unsigned int)avail <= threshold)
+			break;
+
+		if (wait->target == from || !wait->wakeup_cb(wait->target))
+			continue;
+
+		LIST_DEL(&wait->list);
+		LIST_INIT(&wait->list);
+
+		avail--;
+	}
+}
 
 /*
  * Local variables:
diff --git a/src/cli.c b/src/cli.c
index a1923bc..3d537ba 100644
--- a/src/cli.c
+++ b/src/cli.c
@@ -488,6 +488,12 @@
 	if (unlikely(si->state == SI_ST_DIS || si->state == SI_ST_CLO))
 		goto out;
 
+	/* Check if the input buffer is available. */
+	if (res->buf->size == 0) {
+		si_applet_cant_put(si);
+		goto out;
+	}
+
 	while (1) {
 		if (appctx->st0 == CLI_ST_INIT) {
 			/* Stats output not initialized yet */
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 776848e..aa6414a 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -224,7 +224,7 @@
 	struct appctx      *appctx;       /* The SPOE appctx */
 	struct list        *messages;     /* List of messages that will be sent during the stream processing */
 	struct buffer      *buffer;       /* Buffer used to store a NOTIFY or ACK frame */
-	struct list         buffer_wait;  /* position in the list of streams waiting for a buffer */
+	struct buffer_wait  buffer_wait;  /* position in the list of streams waiting for a buffer */
 	struct list         applet_wait;  /* position in the list of streams waiting for a SPOE applet */
 
 	enum spoe_ctx_state state;        /* SPOE_CTX_ST_* */
@@ -1232,6 +1232,9 @@
 	int                      framesz, ret;
 	uint32_t                 netint;
 
+	if (si_ic(si)->buf->size == 0)
+		return -1;
+
 	ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
 	if (ret <= 0)
 		goto skip_or_error;
@@ -1524,7 +1527,7 @@
 			/* fall through */
 
 		case SPOE_APPCTX_ST_END:
-			break;
+			return;
 	}
 
  out:
@@ -1693,13 +1696,13 @@
 	/* If needed, initialize the buffer that will be used to encode messages
 	 * and decode actions. */
 	if (ctx->buffer == &buf_empty) {
-		if (!LIST_ISEMPTY(&ctx->buffer_wait)) {
-			LIST_DEL(&ctx->buffer_wait);
-			LIST_INIT(&ctx->buffer_wait);
+		if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
+			LIST_DEL(&ctx->buffer_wait.list);
+			LIST_INIT(&ctx->buffer_wait.list);
 		}
 
-		if (!b_alloc_margin(&ctx->buffer, 0)) {
-			LIST_ADDQ(&buffer_wq, &ctx->buffer_wait);
+		if (!b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs)) {
+			LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
 			goto wait;
 		}
 	}
@@ -1794,8 +1797,7 @@
 	/* Release the buffer if needed */
 	if (ctx->buffer != &buf_empty) {
 		b_free(&ctx->buffer);
-		if (!LIST_ISEMPTY(&buffer_wq))
-			stream_offer_buffers();
+		offer_buffers(ctx, tasks_run_queue + applets_active_queue);
 	}
 
 	/* If there is no SPOE applet, all is done */
@@ -2213,6 +2215,12 @@
 /***************************************************************************
  * Functions that create/destroy SPOE contexts
  **************************************************************************/
+static int wakeup_spoe_context(struct spoe_context *ctx)
+{
+	task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+	return 1;
+}
+
 static struct spoe_context *
 create_spoe_context(struct filter *filter)
 {
@@ -2229,7 +2237,9 @@
 	ctx->flags    = 0;
 	ctx->messages = conf->agent->messages;
 	ctx->buffer   = &buf_empty;
-	LIST_INIT(&ctx->buffer_wait);
+	LIST_INIT(&ctx->buffer_wait.list);
+	ctx->buffer_wait.target = ctx;
+	ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context;
 	LIST_INIT(&ctx->applet_wait);
 
 	ctx->stream_id   = 0;
@@ -2247,8 +2257,8 @@
 
 	if (ctx->appctx)
 		APPCTX_SPOE(ctx->appctx).ctx = NULL;
-	if (!LIST_ISEMPTY(&ctx->buffer_wait))
-		LIST_DEL(&ctx->buffer_wait);
+	if (!LIST_ISEMPTY(&ctx->buffer_wait.list))
+		LIST_DEL(&ctx->buffer_wait.list);
 	if (!LIST_ISEMPTY(&ctx->applet_wait))
 		LIST_DEL(&ctx->applet_wait);
 	pool_free2(pool2_spoe_ctx, ctx);
@@ -2459,8 +2469,13 @@
 {
 	struct spoe_context *ctx = filter->ctx;
 
-	if (tick_is_expired(ctx->process_exp, now_ms))
-		s->task->state |= TASK_WOKEN_MSG;
+	if (tick_is_expired(ctx->process_exp, now_ms)) {
+		s->pending_events |= TASK_WOKEN_MSG;
+		if (ctx->buffer != &buf_empty) {
+			b_free(&ctx->buffer);
+			offer_buffers(ctx, tasks_run_queue + applets_active_queue);
+		}
+	}
 }
 
 /* Called when we are ready to filter data on a channel */
diff --git a/src/hlua.c b/src/hlua.c
index 0ca3ec2..10ed8ee 100644
--- a/src/hlua.c
+++ b/src/hlua.c
@@ -1884,10 +1884,8 @@
 	 * the request buffer if its not required.
 	 */
 	if (socket->s->req.buf->size == 0) {
-		if (!stream_alloc_recv_buffer(&socket->s->req)) {
-			socket->s->si[0].flags |= SI_FL_WAIT_ROOM;
-			goto hlua_socket_write_yield_return;
-		}
+		si_applet_cant_put(&socket->s->si[0]);
+		goto hlua_socket_write_yield_return;
 	}
 
 	/* Check for avalaible space. */
@@ -2610,6 +2608,14 @@
 	int ret;
 	int max;
 
+	/* Check if the buffer is available because HAProxy doesn't allocate
+	 * the request buffer if it's not required.
+	 */
+	if (chn->buf->size == 0) {
+		si_applet_cant_put(chn_prod(chn));
+		WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_channel_append_yield, TICK_ETERNITY, 0));
+	}
+
 	max = channel_recv_limit(chn) - buffer_len(chn->buf);
 	if (max > len - l)
 		max = len - l;
@@ -2700,10 +2706,8 @@
 	 * the request buffer if its not required.
 	 */
 	if (chn->buf->size == 0) {
-		if (!stream_alloc_recv_buffer(chn)) {
-			chn_prod(chn)->flags |= SI_FL_WAIT_ROOM;
-			WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_channel_send_yield, TICK_ETERNITY, 0));
-		}
+		si_applet_cant_put(chn_prod(chn));
+		WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_channel_send_yield, TICK_ETERNITY, 0));
 	}
 
 	/* the writed data will be immediatly sent, so we can check
diff --git a/src/peers.c b/src/peers.c
index 1a80ab3..1a280a5 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -547,6 +547,10 @@
 	size_t proto_len = strlen(PEER_SESSION_PROTO_NAME);
 	unsigned int maj_ver, min_ver;
 
+	/* Check if the input buffer is available. */
+	if (si_ic(si)->buf->size == 0)
+		goto full;
+
 	while (1) {
 switchstate:
 		maj_ver = min_ver = (unsigned int)-1;
diff --git a/src/stats.c b/src/stats.c
index 1a842e8..8ad983d 100644
--- a/src/stats.c
+++ b/src/stats.c
@@ -2766,6 +2766,12 @@
 	if (unlikely(si->state == SI_ST_DIS || si->state == SI_ST_CLO))
 		goto out;
 
+	/* Check if the input buffer is available. */
+	if (res->buf->size == 0) {
+		si_applet_cant_put(si);
+		goto out;
+	}
+
 	/* check that the output is not closed */
 	if (res->flags & (CF_SHUTW|CF_SHUTW_NOW))
 		appctx->st0 = STAT_HTTP_DONE;
diff --git a/src/stream.c b/src/stream.c
index db8702d..298830d 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -62,9 +62,6 @@
 struct pool_head *pool2_stream;
 struct list streams;
 
-/* list of streams waiting for at least one buffer */
-struct list buffer_wq = LIST_HEAD_INIT(buffer_wq);
-
 /* List of all use-service keywords. */
 static struct list service_keywords = LIST_HEAD_INIT(service_keywords);
 
@@ -139,7 +136,10 @@
 	/* OK, we're keeping the stream, so let's properly initialize the stream */
 	LIST_ADDQ(&streams, &s->list);
 	LIST_INIT(&s->back_refs);
-	LIST_INIT(&s->buffer_wait);
+
+	LIST_INIT(&s->buffer_wait.list);
+	s->buffer_wait.target = s;
+	s->buffer_wait.wakeup_cb = (int (*)(void *))stream_res_wakeup;
 
 	s->flags |= SF_INITIALIZED;
 	s->unique_id = NULL;
@@ -289,15 +289,15 @@
 		put_pipe(s->res.pipe);
 
 	/* We may still be present in the buffer wait queue */
-	if (!LIST_ISEMPTY(&s->buffer_wait)) {
-		LIST_DEL(&s->buffer_wait);
-		LIST_INIT(&s->buffer_wait);
+	if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
+		LIST_DEL(&s->buffer_wait.list);
+		LIST_INIT(&s->buffer_wait.list);
 	}
-
-	b_drop(&s->req.buf);
-	b_drop(&s->res.buf);
-	if (!LIST_ISEMPTY(&buffer_wq))
-		stream_offer_buffers();
+	if (s->req.buf->size || s->res.buf->size) {
+		b_drop(&s->req.buf);
+		b_drop(&s->res.buf);
+		offer_buffers(NULL, tasks_run_queue + applets_active_queue);
+	}
 
 	hlua_ctx_destroy(&s->hlua);
 	if (s->txn)
@@ -370,33 +370,6 @@
 	}
 }
 
-/* Allocates a receive buffer for channel <chn>, but only if it's guaranteed
- * that it's not the last available buffer or it's the response buffer. Unless
- * the buffer is the response buffer, an extra control is made so that we always
- * keep <tune.buffers.reserved> buffers available after this allocation. To be
- * called at the beginning of recv() callbacks to ensure that the required
- * buffers are properly allocated. Returns 0 in case of failure, non-zero
- * otherwise.
- */
-int stream_alloc_recv_buffer(struct channel *chn)
-{
-	struct stream *s;
-	struct buffer *b;
-	int margin = 0;
-
-	if (!(chn->flags & CF_ISRESP))
-		margin = global.tune.reserved_bufs;
-
-	s = chn_strm(chn);
-
-	b = b_alloc_margin(&chn->buf, margin);
-	if (b)
-		return 1;
-
-	if (LIST_ISEMPTY(&s->buffer_wait))
-		LIST_ADDQ(&buffer_wq, &s->buffer_wait);
-	return 0;
-}
 
 /* Allocates a work buffer for stream <s>. It is meant to be called inside
  * process_stream(). It will only allocate the side needed for the function
@@ -406,60 +379,44 @@
  * server from releasing a connection. Returns 0 in case of failure, non-zero
  * otherwise.
  */
-int stream_alloc_work_buffer(struct stream *s)
+static int stream_alloc_work_buffer(struct stream *s)
 {
-	if (!LIST_ISEMPTY(&s->buffer_wait)) {
-		LIST_DEL(&s->buffer_wait);
-		LIST_INIT(&s->buffer_wait);
+	if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
+		LIST_DEL(&s->buffer_wait.list);
+		LIST_INIT(&s->buffer_wait.list);
 	}
 
 	if (b_alloc_margin(&s->res.buf, 0))
 		return 1;
 
-	LIST_ADDQ(&buffer_wq, &s->buffer_wait);
+	LIST_ADDQ(&buffer_wq, &s->buffer_wait.list);
 	return 0;
 }
 
 /* releases unused buffers after processing. Typically used at the end of the
- * update() functions. It will try to wake up as many tasks as the number of
- * buffers that it releases. In practice, most often streams are blocked on
- * a single buffer, so it makes sense to try to wake two up when two buffers
- * are released at once.
+ * update() functions. It will try to wake up as many tasks/applets as the
+ * number of buffers that it releases. In practice, most often streams are
+ * blocked on a single buffer, so it makes sense to try to wake two up when two
+ * buffers are released at once.
  */
 void stream_release_buffers(struct stream *s)
 {
-	if (s->req.buf->size && buffer_empty(s->req.buf))
-		b_free(&s->req.buf);
+	int offer = 0;
 
-	if (s->res.buf->size && buffer_empty(s->res.buf))
+	if (s->req.buf->size && buffer_empty(s->req.buf)) {
+		offer = 1;
+		b_free(&s->req.buf);
+	}
+	if (s->res.buf->size && buffer_empty(s->res.buf)) {
+		offer = 1;
 		b_free(&s->res.buf);
+	}
 
 	/* if we're certain to have at least 1 buffer available, and there is
 	 * someone waiting, we can wake up a waiter and offer them.
 	 */
-	if (!LIST_ISEMPTY(&buffer_wq))
-		stream_offer_buffers();
-}
-
-/* Runs across the list of pending streams waiting for a buffer and wakes one
- * up if buffers are available. Will stop when the run queue reaches <rqlimit>.
- * Should not be called directly, use stream_offer_buffers() instead.
- */
-void __stream_offer_buffers(int rqlimit)
-{
-	struct stream *sess, *bak;
-
-	list_for_each_entry_safe(sess, bak, &buffer_wq, buffer_wait) {
-		if (rqlimit <= tasks_run_queue)
-			break;
-
-		if (sess->task->state & TASK_RUNNING)
-			continue;
-
-		LIST_DEL(&sess->buffer_wait);
-		LIST_INIT(&sess->buffer_wait);
-		task_wakeup(sess->task, TASK_WOKEN_RES);
-	}
+	if (offer)
+		offer_buffers(s, tasks_run_queue + applets_active_queue);
 }
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
@@ -2817,7 +2774,7 @@
 			chunk_appendf(&trash,
 			     "  txn=%p flags=0x%x meth=%d status=%d req.st=%s rsp.st=%s waiting=%d\n",
 			      strm->txn, strm->txn->flags, strm->txn->meth, strm->txn->status,
-			      http_msg_state_str(strm->txn->req.msg_state), http_msg_state_str(strm->txn->rsp.msg_state), !LIST_ISEMPTY(&strm->buffer_wait));
+			      http_msg_state_str(strm->txn->req.msg_state), http_msg_state_str(strm->txn->rsp.msg_state), !LIST_ISEMPTY(&strm->buffer_wait.list));
 
 		chunk_appendf(&trash,
 			     "  si[0]=%p (state=%s flags=0x%02x endp0=%s:%p exp=%s, et=0x%03x)\n",
diff --git a/src/stream_interface.c b/src/stream_interface.c
index e3e6cc6..d5f2c87 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -538,8 +538,6 @@
 	}
 	if (ic->flags & CF_READ_ACTIVITY)
 		ic->flags &= ~CF_READ_DONTWAIT;
-
-	stream_release_buffers(si_strm(si));
 }
 
 
@@ -571,6 +569,7 @@
 	 * stream-int status.
 	 */
 	stream_int_notify(si);
+	channel_release_buffer(ic, &(si_strm(si)->buffer_wait));
 
 	/* Third step : update the connection's polling status based on what
 	 * was done above (eg: maybe some buffers got emptied).
@@ -1128,8 +1127,8 @@
 		ic->pipe = NULL;
 	}
 
-	/* now we'll need a buffer */
-	if (!stream_alloc_recv_buffer(ic)) {
+	/* now we'll need an input buffer for the stream */
+	if (!channel_alloc_buffer(ic, &(si_strm(si)->buffer_wait))) {
 		si->flags |= SI_FL_WAIT_ROOM;
 		goto end_recv;
 	}