MAJOR: session: implement a wait-queue for sessions who need a buffer

When a session_alloc_buffers() fails to allocate one or two buffers,
it subscribes the session to buffer_wq, and waits for another session
to release buffers. It's then removed from the queue and woken up with
TASK_WAKE_RES, and can attempt its allocation again.

We decide to try to wake as many waiters as we release buffers so
that if we release 2 and two waiters need only once, they both have
their chance. We must never come to the situation where we don't wake
enough tasks up.

It's common to release buffers after the completion of an I/O callback,
which can happen even if the I/O could not be performed due to half a
failure on memory allocation. In this situation, we don't want to move
out of the wait queue the session that was just added, otherwise it
will never get any buffer. Thus, we only force ourselves out of the
queue when freeing the session.

Note: at the moment, since session_alloc_buffers() is not used, no task
is subscribed to the wait queue.
diff --git a/src/session.c b/src/session.c
index fdd99ba..d236859 100644
--- a/src/session.c
+++ b/src/session.c
@@ -51,6 +51,9 @@
 struct pool_head *pool2_session;
 struct list sessions;
 
+/* list of sessions waiting for at least one buffer */
+struct list buffer_wq = LIST_HEAD_INIT(buffer_wq);
+
 static int conn_session_complete(struct connection *conn);
 static int conn_session_update(struct connection *conn);
 static struct task *expire_mini_session(struct task *t);
@@ -409,6 +412,7 @@
 	/* OK, we're keeping the session, so let's properly initialize the session */
 	LIST_ADDQ(&sessions, &s->list);
 	LIST_INIT(&s->back_refs);
+	LIST_INIT(&s->buffer_wait);
 
 	s->flags |= SN_INITIALIZED;
 	s->unique_id = NULL;
@@ -618,8 +622,16 @@
 	if (s->rep->pipe)
 		put_pipe(s->rep->pipe);
 
-	b_free(&s->req->buf);
-	b_free(&s->rep->buf);
+	/* We may still be present in the buffer wait queue */
+	if (!LIST_ISEMPTY(&s->buffer_wait)) {
+		LIST_DEL(&s->buffer_wait);
+		LIST_INIT(&s->buffer_wait);
+	}
+
+	b_drop(&s->req->buf);
+	b_drop(&s->rep->buf);
+	if (!LIST_ISEMPTY(&buffer_wq))
+		session_offer_buffers(1);
 
 	pool_free2(pool2_channel, s->req);
 	pool_free2(pool2_channel, s->rep);
@@ -688,48 +700,75 @@
 	if (b)
 		return 1;
 
-	/* FIXME: normally we're supposed to subscribe to a list of waiters
-	 * for buffers. We release what we failed to allocate.
-	 */
+	if (LIST_ISEMPTY(&s->buffer_wait))
+		LIST_ADDQ(&buffer_wq, &s->buffer_wait);
 	return 0;
 }
 
 /* Allocates up to two buffers for session <s>. Only succeeds if both buffers
  * are properly allocated. It is meant to be called inside process_session() so
- * that both request and response buffers are allocated. Returns 0 incase of
+ * that both request and response buffers are allocated. Returns 0 in case of
  * failure, non-zero otherwise.
  */
 int session_alloc_buffers(struct session *s)
 {
-	if (!s->req->buf->size && !b_alloc(&s->req->buf))
-		return 0;
+	if (!LIST_ISEMPTY(&s->buffer_wait)) {
+		LIST_DEL(&s->buffer_wait);
+		LIST_INIT(&s->buffer_wait);
+	}
 
-	if (s->rep->buf->size || b_alloc(&s->rep->buf))
+	if ((s->req->buf->size || b_alloc(&s->req->buf)) &&
+	    (s->rep->buf->size || b_alloc(&s->rep->buf)))
 		return 1;
 
-	if (buffer_empty(s->req->buf)) {
-		__b_drop(&s->req->buf);
-		s->req->buf = &buf_wanted;
-	}
-
-	/* FIXME: normally we're supposed to subscribe to a list of waiters
-	 * for buffers. We release what we failed to allocate.
-	 */
+	session_release_buffers(s);
+	LIST_ADDQ(&buffer_wq, &s->buffer_wait);
 	return 0;
 }
 
 /* releases unused buffers after processing. Typically used at the end of the
- * update() functions.
+ * update() functions. It will try to wake up as many tasks as the number of
+ * buffers that it releases. In practice, most often sessions are blocked on
+ * a single buffer, so it makes sense to try to wake two up when two buffers
+ * are released at once.
  */
 void session_release_buffers(struct session *s)
 {
+	int release_count = 0;
+
+	release_count = !!s->req->buf->size + !!s->rep->buf->size;
+
 	if (s->req->buf->size && buffer_empty(s->req->buf))
 		b_free(&s->req->buf);
 
 	if (s->rep->buf->size && buffer_empty(s->rep->buf))
 		b_free(&s->rep->buf);
 
+	/* if we're certain to have at least 1 buffer available, and there is
+	 * someone waiting, we can wake up a waiter and offer them.
+	 */
+	if (release_count >= 1 && !LIST_ISEMPTY(&buffer_wq))
+		session_offer_buffers(release_count);
+}
+
-	/* FIXME: normally we want to wake up pending tasks */
+/* run across the list of pending sessions waiting for a buffer and wake
+ * one up if buffers are available.
+ */
+void session_offer_buffers(int count)
+{
+	struct session *sess, *bak;
+
+	list_for_each_entry_safe(sess, bak, &buffer_wq, buffer_wait) {
+		if (sess->task->state & TASK_RUNNING)
+			continue;
+
+		LIST_DEL(&sess->buffer_wait);
+		LIST_INIT(&sess->buffer_wait);
+
+		task_wakeup(sess->task, TASK_WOKEN_RES);
+		if (--count <= 0)
+			break;
+	}
 }
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */