BUG/MAJOR: Fix how the list of entities waiting for a buffer is handled

When an entity tries to get a buffer, if it cannot be allocated, for example
because the number of buffers which may be allocated per process is limited,
this entity is added to a list (called <buffer_wq>) and waits for an available
buffer.

Historically, the <buffer_wq> list was logically attached to streams because
they were the only entities likely to be added to it. Now, applets can also be
waiting for a free buffer. And with filters, we could imagine having even more
entities waiting for a buffer. So it makes sense to have a generic list.

Anyway, with the current design there is a bug. When an applet fails to get a
buffer, it waits. But we add the stream attached to the applet to
<buffer_wq>, instead of the applet itself. So when a buffer is available, we
wake up the stream and not the waiting applet. So, it is possible to have
waiting applets that are never awakened.

So, now, <buffer_wq> is independent from streams. And we really add the waiting
entity to <buffer_wq>. To be generic, the entity is responsible for defining the
callback used to awaken it.

In addition, applets will still request an input buffer when they become
active. But they will no longer be put to sleep if no buffer is available. So it
is the responsibility of the applet I/O handler to check whether this buffer is
allocated or not. This way, an applet can decide if this buffer is required or
not and can do additional processing if not.

[wt: backport to 1.7 and 1.6]
diff --git a/include/proto/applet.h b/include/proto/applet.h
index 5a503b4..653be31 100644
--- a/include/proto/applet.h
+++ b/include/proto/applet.h
@@ -36,6 +36,10 @@
 
 void applet_run_active();
 
+
+static inline int appctx_res_wakeup(struct appctx *appctx); /* forward declaration: referenced before its definition further down */
+
+
 /* Initializes all required fields for a new appctx. Note that it does the
  * minimum acceptable initialization for an appctx. This means only the
  * 3 integer states st0, st1, st2 are zeroed.
@@ -61,6 +65,9 @@
 		appctx->applet = applet;
 		appctx_init(appctx);
 		LIST_INIT(&appctx->runq);
+		LIST_INIT(&appctx->buffer_wait.list);
+		appctx->buffer_wait.target = appctx;
+		appctx->buffer_wait.wakeup_cb = (int (*)(void *))appctx_res_wakeup;
 		nb_applets++;
 	}
 	return appctx;
@@ -75,6 +82,10 @@
 		LIST_DEL(&appctx->runq);
 		applets_active_queue--;
 	}
+	if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) {
+		LIST_DEL(&appctx->buffer_wait.list);
+		LIST_INIT(&appctx->buffer_wait.list);
+	}
 	pool_free2(pool2_connection, appctx);
 	nb_applets--;
 }
@@ -98,6 +109,19 @@
 	}
 }
 
+/* Callback used to wake up an applet when a buffer is available. The applet
+ * <appctx> is woken up only if it is not already in the list of "active"
+ * applets. This function returns 1 if the applet is woken up, otherwise it
+ * returns 0. */
+static inline int appctx_res_wakeup(struct appctx *appctx)
+{
+	if (!LIST_ISEMPTY(&appctx->runq))
+		return 0;
+	appctx_wakeup(appctx);
+	return 1;
+}
+
+
 #endif /* _PROTO_APPLET_H */
 
 /*
diff --git a/include/proto/channel.h b/include/proto/channel.h
index 3d435c4..304a935 100644
--- a/include/proto/channel.h
+++ b/include/proto/channel.h
@@ -36,6 +36,9 @@
 #include <types/stream.h>
 #include <types/stream_interface.h>
 
+#include <proto/applet.h>
+#include <proto/task.h>
+
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
 int init_channel();
 
@@ -439,6 +442,41 @@
 	return ret;
 }
 
+/* Allocates a buffer for channel <chn>, but only if it's guaranteed that it's
+ * not the last available buffer or it's the response buffer. Unless the buffer
+ * is the response buffer, an extra control is made so that we always keep
+ * <tune.buffers.reserved> buffers available after this allocation. Returns 0 in
+ * case of failure, non-zero otherwise.
+ *
+ * If no buffer is available, the requester, represented by the <wait> pointer,
+ * is added to <buffer_wq>, but only when <wait>->list is currently empty.
+ */
+static inline int channel_alloc_buffer(struct channel *chn, struct buffer_wait *wait)
+{
+	int margin = 0;
+
+	if (!(chn->flags & CF_ISRESP))
+		margin = global.tune.reserved_bufs;
+
+	if (b_alloc_margin(&chn->buf, margin) != NULL)
+		return 1;
+
+	if (LIST_ISEMPTY(&wait->list))
+		LIST_ADDQ(&buffer_wq, &wait->list);
+	return 0;
+}
+
+/* Releases the buffer of channel <chn> if it is allocated (non-zero size) and
+ * empty; otherwise this function does nothing. After the release, we try to
+ * wake up as many streams/applets as possible through offer_buffers(). */
+static inline void channel_release_buffer(struct channel *chn, struct buffer_wait *wait)
+{
+	if (chn->buf->size && buffer_empty(chn->buf)) {
+		b_free(&chn->buf);
+		offer_buffers(wait->target, tasks_run_queue + applets_active_queue);
+	}
+}
+
 /* Truncate any unread data in the channel's buffer, and disable forwarding.
  * Outgoing data are left intact. This is mainly to be used to send error
  * messages after existing data.
diff --git a/include/proto/stream.h b/include/proto/stream.h
index b439344..85c234e 100644
--- a/include/proto/stream.h
+++ b/include/proto/stream.h
@@ -32,7 +32,6 @@
 
 extern struct pool_head *pool2_stream;
 extern struct list streams;
-extern struct list buffer_wq;
 
 extern struct data_cb sess_conn_cb;
 
@@ -55,11 +54,7 @@
 
 /* Update the stream's backend and server time stats */
 void stream_update_time_stats(struct stream *s);
-void __stream_offer_buffers(int rqlimit);
-static inline void stream_offer_buffers();
-int stream_alloc_work_buffer(struct stream *s);
 void stream_release_buffers(struct stream *s);
-int stream_alloc_recv_buffer(struct channel *chn);
 
 /* returns the session this stream belongs to */
 static inline struct session *strm_sess(const struct stream *strm)
@@ -285,25 +280,16 @@
 	LIST_INIT(&sess->by_srv);
 }
 
-static inline void stream_offer_buffers()
+/* Callback used to wake up a stream when a buffer is available. The stream <s>
+ * is woken up only if it is not already running and not already in the task
+ * run queue. This function returns 1 if the stream is woken up, otherwise
+ * it returns 0. */
+static inline int stream_res_wakeup(struct stream *s)
 {
-	int avail;
-
-	if (LIST_ISEMPTY(&buffer_wq))
-		return;
-
-	/* all streams will need 1 buffer, so we can stop waking up streams
-	 * once we have enough of them to eat all the buffers. Note that we
-	 * don't really know if they are streams or just other tasks, but
-	 * that's a rough estimate. Similarly, for each cached event we'll need
-	 * 1 buffer. If no buffer is currently used, always wake up the number
-	 * of tasks we can offer a buffer based on what is allocated, and in
-	 * any case at least one task per two reserved buffers.
-	 */
-	avail = pool2_buffer->allocated - pool2_buffer->used - global.tune.reserved_bufs / 2;
-
-	if (avail > (int)tasks_run_queue)
-		__stream_offer_buffers(avail);
+	if ((s->task->state & TASK_RUNNING) || task_in_rq(s->task))
+		return 0;
+	task_wakeup(s->task, TASK_WOKEN_RES);
+	return 1;
 }
 
 void service_keywords_register(struct action_kw_list *kw_list);