MEDIUM: connections/mux: Add a recv and a send+recv wait list.

For struct connection, struct conn_stream, and for the h2 mux, add 2 new
lists, one that handles waiters for recv, and one that handles waiters for
recv and send. That way we can ask to subscribe for either recv or send.
diff --git a/include/proto/connection.h b/include/proto/connection.h
index ea6b17b..c7f2561 100644
--- a/include/proto/connection.h
+++ b/include/proto/connection.h
@@ -602,6 +602,8 @@
 	cs->flags = CS_FL_NONE;
 	LIST_INIT(&cs->wait_list.list);
 	LIST_INIT(&cs->send_wait_list);
+	LIST_INIT(&cs->recv_wait_list);
+	LIST_INIT(&cs->sendrecv_wait_list);
 	cs->conn = conn;
 	cs->wait_list.wait_reason = 0;
 }
@@ -629,6 +631,8 @@
 	conn->proxy_netns = NULL;
 	LIST_INIT(&conn->list);
 	LIST_INIT(&conn->send_wait_list);
+	LIST_INIT(&conn->recv_wait_list);
+	LIST_INIT(&conn->sendrecv_wait_list);
 }
 
 /* sets <owner> as the connection's owner */
@@ -711,8 +715,19 @@
 /* Releases a connection previously allocated by conn_new() */
 static inline void conn_free(struct connection *conn)
 {
-	LIST_DEL(&conn->send_wait_list);
-	LIST_INIT(&conn->send_wait_list);
+	struct wait_list *sw, *sw_back;
+	list_for_each_entry_safe(sw, sw_back, &conn->recv_wait_list, list) {
+		LIST_DEL(&sw->list);
+		LIST_INIT(&sw->list);
+	}
+	list_for_each_entry_safe(sw, sw_back, &conn->send_wait_list, list) {
+		LIST_DEL(&sw->list);
+		LIST_INIT(&sw->list);
+	}
+	list_for_each_entry_safe(sw, sw_back, &conn->sendrecv_wait_list, list) {
+		LIST_DEL(&sw->list);
+		LIST_INIT(&sw->list);
+	}
 	pool_free(pool_head_connection, conn);
 }
 
diff --git a/include/types/connection.h b/include/types/connection.h
index 9a1ba96..421df3c 100644
--- a/include/types/connection.h
+++ b/include/types/connection.h
@@ -375,6 +375,8 @@
 	struct connection *conn;             /* xprt-level connection */
 	struct wait_list wait_list;          /* We're in a wait list for send */
 	struct list send_wait_list;          /* list of tasks to wake when we're ready to send */
+	struct list recv_wait_list;          /* list of tasks to wake when we're ready to recv */
+	struct list sendrecv_wait_list;      /* list of tasks to wake when we're ready to either send or recv */
 	void *data;                          /* pointer to upper layer's entity (eg: stream interface) */
 	const struct data_cb *data_cb;       /* data layer callbacks. Must be set before xprt->init() */
 	void *ctx;                           /* mux-specific context */
@@ -406,6 +408,8 @@
 
 	/* second cache line */
 	struct list send_wait_list;   /* list of tasks to wake when we're ready to send */
+	struct list recv_wait_list;          /* list of tasks to wake when we're ready to recv */
+	struct list sendrecv_wait_list;      /* list of tasks to wake when we're ready to either send or recv */
 	struct list list;             /* attach point to various connection lists (idle, ...) */
 	int xprt_st;                  /* transport layer state, initialized to zero */
 	int tmp_early_data;           /* 1st byte of early data, if any */
diff --git a/src/connection.c b/src/connection.c
index e303f2c..005e0e7 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -137,6 +137,15 @@
 			sw->wait_reason &= ~SUB_CAN_SEND;
 			tasklet_wakeup(sw->task);
 		}
+		while (!(LIST_ISEMPTY(&conn->sendrecv_wait_list))) {
+			struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n,
+			    struct wait_list *, list);
+			LIST_DEL(&sw->list);
+			LIST_INIT(&sw->list);
+			LIST_ADDQ(&conn->recv_wait_list, &sw->list);
+			sw->wait_reason &= ~SUB_CAN_SEND;
+			tasklet_wakeup(sw->task);
+		}
 	}
 
 	/* The data transfer starts here and stops on error and handshakes. Note
@@ -334,11 +343,34 @@
 	struct wait_list *sw;
 
 	switch (event_type) {
+	case SUB_CAN_RECV:
+		sw = param;
+		if (!(sw->wait_reason & SUB_CAN_RECV)) {
+			sw->wait_reason |= SUB_CAN_RECV;
+			/* If we're already subscribed for send(), move it
+			 * to the send+recv list
+			 */
+			if (sw->wait_reason & SUB_CAN_SEND) {
+				LIST_DEL(&sw->list);
+				LIST_INIT(&sw->list);
+				LIST_ADDQ(&conn->sendrecv_wait_list, &sw->list);
+			} else
+				LIST_ADDQ(&conn->recv_wait_list, &sw->list);
+		}
+		return 0;
 	case SUB_CAN_SEND:
 		sw = param;
 		if (!(sw->wait_reason & SUB_CAN_SEND)) {
 			sw->wait_reason |= SUB_CAN_SEND;
-			LIST_ADDQ(&conn->send_wait_list, &sw->list);
+			/* If we're already subscribed for recv(), move it
+			 * to the send+recv list
+			 */
+			if (sw->wait_reason & SUB_CAN_RECV) {
+				LIST_DEL(&sw->list);
+				LIST_INIT(&sw->list);
+				LIST_ADDQ(&conn->sendrecv_wait_list, &sw->list);
+			} else
+				LIST_ADDQ(&conn->send_wait_list, &sw->list);
 		}
 		return 0;
 	default:
diff --git a/src/mux_h2.c b/src/mux_h2.c
index 946288d..3c873e9 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -121,6 +121,8 @@
 	struct list fctl_list; /* list of streams blocked by connection's fctl */
 	struct buffer_wait buf_wait; /* wait list for buffer allocations */
 	struct list send_wait_list;  /* list of tasks to wake when we're ready to send */
+	struct list recv_wait_list;          /* list of tasks to wake when we're ready to recv */
+	struct list sendrecv_wait_list;      /* list of tasks to wake when we're ready to either send or recv */
 	struct wait_list wait_list;  /* We're in a wait list, to send */
 };
 
@@ -406,6 +408,8 @@
 		task_queue(t);
 	conn_xprt_want_recv(conn);
 	LIST_INIT(&h2c->send_wait_list);
+	LIST_INIT(&h2c->recv_wait_list);
+	LIST_INIT(&h2c->sendrecv_wait_list);
 	LIST_INIT(&h2c->wait_list.list);
 
 	/* mux->wake will be called soon to complete the operation */
@@ -2333,6 +2337,16 @@
 			sw->wait_reason &= ~SUB_CAN_SEND;
 			tasklet_wakeup(sw->task);
 		}
+		while (!(LIST_ISEMPTY(&h2c->sendrecv_wait_list))) {
+			struct wait_list *sw = LIST_ELEM(h2c->send_wait_list.n,
+			    struct wait_list *, list);
+			LIST_DEL(&sw->list);
+			LIST_INIT(&sw->list);
+			LIST_ADDQ(&h2c->recv_wait_list, &sw->list);
+			sw->wait_reason &= ~SUB_CAN_SEND;
+			tasklet_wakeup(sw->task);
+		}
+
 
 	}
 	/* We're done, no more to send */
@@ -3456,14 +3470,37 @@
 {
 	struct wait_list *sw;
 	struct h2s *h2s = cs->ctx;
+	struct h2c *h2c = h2s->h2c;
 
 	switch (event_type) {
+	case SUB_CAN_RECV:
+		sw = param;
+		if (!(sw->wait_reason & SUB_CAN_RECV)) {
+			sw->wait_reason |= SUB_CAN_RECV;
+			/* If we're already subscribed for send(), move it
+			 * to the send+recv list
+			 */
+			if (sw->wait_reason & SUB_CAN_SEND) {
+				LIST_DEL(&sw->list);
+				LIST_INIT(&sw->list);
+				LIST_ADDQ(&h2c->sendrecv_wait_list, &sw->list);
+			} else
+				LIST_ADDQ(&h2c->recv_wait_list, &sw->list);
+		}
+		return 0;
 	case SUB_CAN_SEND:
 		sw = param;
-		if (LIST_ISEMPTY(&h2s->list) &&
-		    !(sw->wait_reason & SUB_CAN_SEND)) {
-			LIST_ADDQ(&h2s->h2c->send_wait_list, &sw->list);
+		if (!(sw->wait_reason & SUB_CAN_SEND)) {
 			sw->wait_reason |= SUB_CAN_SEND;
+			/* If we're already subscribed for recv(), move it
+			 * to the send+recv list
+			 */
+			if (sw->wait_reason & SUB_CAN_RECV) {
+				LIST_DEL(&sw->list);
+				LIST_INIT(&sw->list);
+				LIST_ADDQ(&h2c->sendrecv_wait_list, &sw->list);
+			} else
+				LIST_ADDQ(&h2c->send_wait_list, &sw->list);
 		}
 		return 0;
 	default:
diff --git a/src/stream_interface.c b/src/stream_interface.c
index 72fec21..cfa613a 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -752,6 +752,16 @@
 			sw->wait_reason &= ~SUB_CAN_SEND;
 			tasklet_wakeup(sw->task);
 		}
+		while (!(LIST_ISEMPTY(&cs->sendrecv_wait_list))) {
+			struct wait_list *sw = LIST_ELEM(cs->send_wait_list.n,
+			    struct wait_list *, list);
+			LIST_DEL(&sw->list);
+			LIST_INIT(&sw->list);
+			LIST_ADDQ(&cs->recv_wait_list, &sw->list);
+			sw->wait_reason &= ~SUB_CAN_SEND;
+			tasklet_wakeup(sw->task);
+		}
+
 	}
 	return NULL;
 }