BUG/MAJOR: Fix how the list of entities waiting for a buffer is handled

When an entity tries to get a buffer, if it cannot be allocted, for example
because the number of buffers which may be allocated per process is limited,
this entity is added in a list (called <buffer_wq>) and wait for an available
buffer.

Historically, the <buffer_wq> list was logically attached to streams because it
were the only entities likely to be added in it. Now, applets can also be
waiting for a free buffer. And with filters, we could imagine to have more other
entities waiting for a buffer. So it make sense to have a generic list.

Anyway, with the current design there is a bug. When an applet failed to get a
buffer, it will wait. But we add the stream attached to the applet in
<buffer_wq>, instead of the applet itself. So when a buffer is available, we
wake up the stream and not the waiting applet. So, it is possible to have
waiting applets and never awakened.

So, now, <buffer_wq> is independant from streams. And we really add the waiting
entity in <buffer_wq>. To be generic, the entity is responsible to define the
callback used to awaken it.

In addition, applets will still request an input buffer when they become
active. But they will not be sleeped anymore if no buffer are available. So this
is the responsibility to the applet I/O handler to check if this buffer is
allocated or not. This way, an applet can decide if this buffer is required or
not and can do additional processing if not.

[wt: backport to 1.7 and 1.6]
diff --git a/src/stream.c b/src/stream.c
index db8702d..298830d 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -62,9 +62,6 @@
 struct pool_head *pool2_stream;
 struct list streams;
 
-/* list of streams waiting for at least one buffer */
-struct list buffer_wq = LIST_HEAD_INIT(buffer_wq);
-
 /* List of all use-service keywords. */
 static struct list service_keywords = LIST_HEAD_INIT(service_keywords);
 
@@ -139,7 +136,10 @@
 	/* OK, we're keeping the stream, so let's properly initialize the stream */
 	LIST_ADDQ(&streams, &s->list);
 	LIST_INIT(&s->back_refs);
-	LIST_INIT(&s->buffer_wait);
+
+	LIST_INIT(&s->buffer_wait.list);
+	s->buffer_wait.target = s;
+	s->buffer_wait.wakeup_cb = (int (*)(void *))stream_res_wakeup;
 
 	s->flags |= SF_INITIALIZED;
 	s->unique_id = NULL;
@@ -289,15 +289,15 @@
 		put_pipe(s->res.pipe);
 
 	/* We may still be present in the buffer wait queue */
-	if (!LIST_ISEMPTY(&s->buffer_wait)) {
-		LIST_DEL(&s->buffer_wait);
-		LIST_INIT(&s->buffer_wait);
+	if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
+		LIST_DEL(&s->buffer_wait.list);
+		LIST_INIT(&s->buffer_wait.list);
 	}
-
-	b_drop(&s->req.buf);
-	b_drop(&s->res.buf);
-	if (!LIST_ISEMPTY(&buffer_wq))
-		stream_offer_buffers();
+	if (s->req.buf->size || s->res.buf->size) {
+		b_drop(&s->req.buf);
+		b_drop(&s->res.buf);
+		offer_buffers(NULL, tasks_run_queue + applets_active_queue);
+	}
 
 	hlua_ctx_destroy(&s->hlua);
 	if (s->txn)
@@ -370,33 +370,6 @@
 	}
 }
 
-/* Allocates a receive buffer for channel <chn>, but only if it's guaranteed
- * that it's not the last available buffer or it's the response buffer. Unless
- * the buffer is the response buffer, an extra control is made so that we always
- * keep <tune.buffers.reserved> buffers available after this allocation. To be
- * called at the beginning of recv() callbacks to ensure that the required
- * buffers are properly allocated. Returns 0 in case of failure, non-zero
- * otherwise.
- */
-int stream_alloc_recv_buffer(struct channel *chn)
-{
-	struct stream *s;
-	struct buffer *b;
-	int margin = 0;
-
-	if (!(chn->flags & CF_ISRESP))
-		margin = global.tune.reserved_bufs;
-
-	s = chn_strm(chn);
-
-	b = b_alloc_margin(&chn->buf, margin);
-	if (b)
-		return 1;
-
-	if (LIST_ISEMPTY(&s->buffer_wait))
-		LIST_ADDQ(&buffer_wq, &s->buffer_wait);
-	return 0;
-}
 
 /* Allocates a work buffer for stream <s>. It is meant to be called inside
  * process_stream(). It will only allocate the side needed for the function
@@ -406,60 +379,44 @@
  * server from releasing a connection. Returns 0 in case of failure, non-zero
  * otherwise.
  */
-int stream_alloc_work_buffer(struct stream *s)
+static int stream_alloc_work_buffer(struct stream *s)
 {
-	if (!LIST_ISEMPTY(&s->buffer_wait)) {
-		LIST_DEL(&s->buffer_wait);
-		LIST_INIT(&s->buffer_wait);
+	if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
+		LIST_DEL(&s->buffer_wait.list);
+		LIST_INIT(&s->buffer_wait.list);
 	}
 
 	if (b_alloc_margin(&s->res.buf, 0))
 		return 1;
 
-	LIST_ADDQ(&buffer_wq, &s->buffer_wait);
+	LIST_ADDQ(&buffer_wq, &s->buffer_wait.list);
 	return 0;
 }
 
 /* releases unused buffers after processing. Typically used at the end of the
- * update() functions. It will try to wake up as many tasks as the number of
- * buffers that it releases. In practice, most often streams are blocked on
- * a single buffer, so it makes sense to try to wake two up when two buffers
- * are released at once.
+ * update() functions. It will try to wake up as many tasks/applets as the
+ * number of buffers that it releases. In practice, most often streams are
+ * blocked on a single buffer, so it makes sense to try to wake two up when two
+ * buffers are released at once.
  */
 void stream_release_buffers(struct stream *s)
 {
-	if (s->req.buf->size && buffer_empty(s->req.buf))
-		b_free(&s->req.buf);
+	int offer = 0;
 
-	if (s->res.buf->size && buffer_empty(s->res.buf))
+	if (s->req.buf->size && buffer_empty(s->req.buf)) {
+		offer = 1;
+		b_free(&s->req.buf);
+	}
+	if (s->res.buf->size && buffer_empty(s->res.buf)) {
+		offer = 1;
 		b_free(&s->res.buf);
+	}
 
 	/* if we're certain to have at least 1 buffer available, and there is
 	 * someone waiting, we can wake up a waiter and offer them.
 	 */
-	if (!LIST_ISEMPTY(&buffer_wq))
-		stream_offer_buffers();
-}
-
-/* Runs across the list of pending streams waiting for a buffer and wakes one
- * up if buffers are available. Will stop when the run queue reaches <rqlimit>.
- * Should not be called directly, use stream_offer_buffers() instead.
- */
-void __stream_offer_buffers(int rqlimit)
-{
-	struct stream *sess, *bak;
-
-	list_for_each_entry_safe(sess, bak, &buffer_wq, buffer_wait) {
-		if (rqlimit <= tasks_run_queue)
-			break;
-
-		if (sess->task->state & TASK_RUNNING)
-			continue;
-
-		LIST_DEL(&sess->buffer_wait);
-		LIST_INIT(&sess->buffer_wait);
-		task_wakeup(sess->task, TASK_WOKEN_RES);
-	}
+	if (offer)
+		offer_buffers(s, tasks_run_queue + applets_active_queue);
 }
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
@@ -2817,7 +2774,7 @@
 			chunk_appendf(&trash,
 			     "  txn=%p flags=0x%x meth=%d status=%d req.st=%s rsp.st=%s waiting=%d\n",
 			      strm->txn, strm->txn->flags, strm->txn->meth, strm->txn->status,
-			      http_msg_state_str(strm->txn->req.msg_state), http_msg_state_str(strm->txn->rsp.msg_state), !LIST_ISEMPTY(&strm->buffer_wait));
+			      http_msg_state_str(strm->txn->req.msg_state), http_msg_state_str(strm->txn->rsp.msg_state), !LIST_ISEMPTY(&strm->buffer_wait.list));
 
 		chunk_appendf(&trash,
 			     "  si[0]=%p (state=%s flags=0x%02x endp0=%s:%p exp=%s, et=0x%03x)\n",