MINOR: connections/mux: Add a new "subscribe" method.

Add a new "subscribe" method for connection, conn_stream and mux, so that
the upper layer can subscribe to them and be notified when an event occurs.
Right now, the only event implemented is "SUB_CAN_SEND", for which the upper
layer can register to be called back when it is possible to send data again.

The connection and conn_stream get a new "send_wait_list" entry, which
required moving a few struct members around to maintain efficient
cache alignment (and this actually slightly improved performance).
diff --git a/src/connection.c b/src/connection.c
index db869fb..94e7209 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -128,6 +128,13 @@
 		 */
 		flags = 0;
 		conn->mux->send(conn);
+		while (!LIST_ISEMPTY(&conn->send_wait_list)) {
+			struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n,
+			    struct wait_list *, list);
+			LIST_DEL(&sw->list);
+			LIST_INIT(&sw->list);
+			tasklet_wakeup(sw->task);
+		}
 	}
 
 	/* The data transfer starts here and stops on error and handshakes. Note
@@ -323,6 +330,22 @@
 	return ret;
 }
 
+int conn_subscribe(struct connection *conn, int event_type, void *param)
+{
+	struct wait_list *sw;
+
+	switch (event_type) {
+	case SUB_CAN_SEND:
+		sw = param;
+		if (LIST_ISEMPTY(&sw->list))
+			LIST_ADDQ(&conn->send_wait_list, &sw->list);
+		return 0;
+	default:
+		break;
+	}
+	return (-1);
+}
+
 /* Drains possibly pending incoming data on the file descriptor attached to the
  * connection and update the connection's flags accordingly. This is used to
  * know whether we need to disable lingering on close. Returns non-zero if it
diff --git a/src/mux_h2.c b/src/mux_h2.c
index e252083..ba6bd8d 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -120,6 +120,7 @@
 	struct list send_list; /* list of blocked streams requesting to send */
 	struct list fctl_list; /* list of streams blocked by connection's fctl */
 	struct buffer_wait buf_wait; /* wait list for buffer allocations */
+	struct list send_wait_list;  /* list of tasks to wake when we're ready to send */
 };
 
 /* H2 stream state, in h2s->st */
@@ -379,6 +380,7 @@
 	if (t)
 		task_queue(t);
 	conn_xprt_want_recv(conn);
+	LIST_INIT(&h2c->send_wait_list);
 
 	/* mux->wake will be called soon to complete the operation */
 	return 0;
@@ -2228,6 +2230,19 @@
 		/* output closed, nothing to send, clear the buffer to release it */
 		b_reset(&h2c->mbuf);
 	}
+	/* We're not full anymore, so we can wake any task that are waiting
+	 * for us.
+	 */
+	if (!(h2c->flags & (H2_CF_MUX_MFULL | H2_CF_DEM_MROOM))) {
+		while (!LIST_ISEMPTY(&h2c->send_wait_list)) {
+			struct wait_list *sw = LIST_ELEM(h2c->send_wait_list.n,
+			    struct wait_list *, list);
+			LIST_DEL(&sw->list);
+			LIST_INIT(&sw->list);
+			tasklet_wakeup(sw->task);
+		}
+
+	}
 }
 
 /* callback called on any event by the connection handler.
@@ -3369,6 +3384,26 @@
 	return total;
 }
 
+/* Called from the upper layer, to subscribe to events, such as being able to send */
+static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
+{
+	struct wait_list *sw;
+	struct h2s *h2s = cs->ctx;
+
+	switch (event_type) {
+	case SUB_CAN_SEND:
+		sw = param;
+		if (LIST_ISEMPTY(&h2s->list) && LIST_ISEMPTY(&sw->list))
+			LIST_ADDQ(&h2s->h2c->send_wait_list, &sw->list);
+		return 0;
+	default:
+		break;
+	}
+	return -1;
+
+
+}
+
 /* Called from the upper layer, to send data */
 static size_t h2_snd_buf(struct conn_stream *cs, const struct buffer *buf, size_t count, int flags)
 {
@@ -3545,6 +3580,7 @@
 	.update_poll = h2_update_poll,
 	.rcv_buf = h2_rcv_buf,
 	.snd_buf = h2_snd_buf,
+	.subscribe = h2_subscribe,
 	.attach = h2_attach,
 	.detach = h2_detach,
 	.shutr = h2_shutr,
diff --git a/src/mux_pt.c b/src/mux_pt.c
index b6d0b1a..059e499 100644
--- a/src/mux_pt.c
+++ b/src/mux_pt.c
@@ -177,6 +177,12 @@
 	return cs->conn->xprt->snd_buf(cs->conn, buf, count, flags);
 }
 
+/* Called from the upper layer, to subscribe to events */
+static int mux_pt_subscribe(struct conn_stream *cs, int event_type, void *param)
+{
+	return (cs->conn->xprt->subscribe(cs->conn, event_type, param));
+}
+
 #if defined(CONFIG_HAP_LINUX_SPLICE)
 /* Send and get, using splicing */
 static int mux_pt_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int count)
@@ -206,6 +212,7 @@
 	.update_poll = mux_pt_update_poll,
 	.rcv_buf = mux_pt_rcv_buf,
 	.snd_buf = mux_pt_snd_buf,
+	.subscribe = mux_pt_subscribe,
 #if defined(CONFIG_HAP_LINUX_SPLICE)
 	.rcv_pipe = mux_pt_rcv_pipe,
 	.snd_pipe = mux_pt_snd_pipe,
diff --git a/src/raw_sock.c b/src/raw_sock.c
index 375c453..c108a42 100644
--- a/src/raw_sock.c
+++ b/src/raw_sock.c
@@ -424,6 +424,7 @@
 static struct xprt_ops raw_sock = {
 	.snd_buf  = raw_sock_from_buf,
 	.rcv_buf  = raw_sock_to_buf,
+	.subscribe = conn_subscribe,
 #if defined(CONFIG_HAP_LINUX_SPLICE)
 	.rcv_pipe = raw_sock_to_pipe,
 	.snd_pipe = raw_sock_from_pipe,
diff --git a/src/ssl_sock.c b/src/ssl_sock.c
index 5689820..7e8739a 100644
--- a/src/ssl_sock.c
+++ b/src/ssl_sock.c
@@ -8895,6 +8895,7 @@
 static struct xprt_ops ssl_sock = {
 	.snd_buf  = ssl_sock_from_buf,
 	.rcv_buf  = ssl_sock_to_buf,
+	.subscribe = conn_subscribe,
 	.rcv_pipe = NULL,
 	.snd_pipe = NULL,
 	.shutr    = NULL,