MEDIUM: h2: start to consider the H2_CF_{MUX,DEM}_* flags for polling

Now we start to set the flags to indicate that the response buffer is
being awaited or that it is full, it makes it possible to centralize a
little bit the polling management into the wake() callback.

In case of error, we wake all the streams up so that they are aware of
the nature of the event and are able to detach if needed.
diff --git a/src/mux_h2.c b/src/mux_h2.c
index 759c6a1..5457bc2 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -589,7 +589,10 @@
 	int max;
 
 	if (conn->flags & CO_FL_ERROR)
-		goto error;
+		return;
+
+	if (h2c->flags & H2_CF_DEM_BLOCK_ANY)
+		return;
 
 	buf = h2_get_dbuf(h2c);
 	if (!buf) {
@@ -600,32 +603,28 @@
 	/* note: buf->o == 0 */
 	max = buf->size - buf->i;
 	if (!max) {
-		/* FIXME: buffer full, add a flag, stop polling and wait */
-		__conn_xprt_stop_recv(conn);
+		h2c->flags |= H2_CF_DEM_DFULL;
 		return;
 	}
 
 	conn->xprt->rcv_buf(conn, buf, max);
 	if (conn->flags & CO_FL_ERROR)
-		goto error;
+		return;
 
-	if (!buf->i)
+	if (!buf->i) {
 		h2_release_dbuf(h2c);
-
-	if (buf->i == buf->size) {
-		/* buffer now full */
-		__conn_xprt_stop_recv(conn);
 		return;
 	}
 
+	if (buf->i == buf->size)
+		h2c->flags |= H2_CF_DEM_DFULL;
+
 	/* FIXME: should we try to process streams here instead of doing it in ->wake ? */
 
-	if (conn_xprt_read0_pending(conn))
-		__conn_xprt_stop_recv(conn);
+	/* after streams have been processed, we should have made some room */
+	if (buf->i != buf->size)
+		h2c->flags &= ~H2_CF_DEM_DFULL;
 	return;
-
- error:
-	__conn_xprt_stop_recv(conn);
 }
 
 /* callback called on send event by the connection handler */
@@ -636,22 +635,16 @@
 	/* FIXME: should we try to process pending streams here instead of doing it in ->wake ? */
 
 	if (conn->flags & CO_FL_ERROR)
-		goto error;
+		return;
 
 	if (conn->flags & (CO_FL_HANDSHAKE|CO_FL_WAIT_L4_CONN|CO_FL_WAIT_L6_CONN)) {
 		/* a handshake was requested */
 		return;
 	}
 
-	if (!h2c->mbuf->o) {
-		/* nothing to send */
-		goto done;
-	}
-
 	if (conn->flags & CO_FL_SOCK_WR_SH) {
 		/* output closed, nothing to send, clear the buffer to release it */
 		h2c->mbuf->o = 0;
-		goto done;
 	}
 
 	/* pending response data, we need to try to send or subscribe to
@@ -667,33 +660,39 @@
 	 * problematic for ACKs. The latter should possibly not be set
 	 * for now.
 	 */
-	conn->xprt->snd_buf(conn, h2c->mbuf, 0);
+	if (conn->xprt->snd_buf(conn, h2c->mbuf, 0) > 0)
+		h2c->flags &= ~(H2_CF_MUX_MFULL | H2_CF_DEM_MROOM);
 
 	if (conn->flags & CO_FL_ERROR)
-		goto error;
+		return;
+}
 
-	if (!h2c->mbuf->o)
-		h2_release_mbuf(h2c);
+/* call the wake up function of all streams attached to the connection */
+static void h2_wake_all_streams(struct h2c *h2c)
+{
+	struct eb32_node *node;
+	struct h2s *h2s;
+	unsigned int flags = 0;
 
-	if (h2c->mbuf->o) {
-		/* incomplete send, the snd_buf callback has already updated
-		 * the connection flags.
-		 *
-		 * FIXME: we should arm a send timeout here
-		 */
-		__conn_xprt_want_send(conn);
-		return;
-	}
+	if (h2c->st0 >= H2_CS_ERROR || h2c->conn->flags & CO_FL_ERROR)
+		flags |= CS_FL_ERROR;
 
- done:
-	/* FIXME: release the output buffer when empty or do it in ->wake() ? */
-	__conn_xprt_stop_send(conn);
-	return;
+	if (conn_xprt_read0_pending(h2c->conn))
+		flags |= CS_FL_EOS;
 
- error:
-	/* FIXME: report an error somewhere in the mux */
-	__conn_xprt_stop_send(conn);
-	return;
+	node = eb32_first(&h2c->streams_by_id);
+	while (node) {
+		h2s = container_of(node, struct h2s, by_id);
+		node = eb32_next(node);
+		if (h2s->cs) {
+			h2s->cs->flags |= flags;
+			/* recv is used to force to detect CS_FL_EOS that wake()
+			 * doesn't handle in the stream int code.
+			 */
+			h2s->cs->data_cb->recv(h2s->cs);
+			h2s->cs->data_cb->wake(h2s->cs);
+		}
+	}
 }
 
 /* callback called on any event by the connection handler.
@@ -704,9 +703,48 @@
 {
 	struct h2c *h2c = conn->mux_ctx;
 
-	if ((conn->flags & CO_FL_ERROR) && eb_is_empty(&h2c->streams_by_id)) {
-		h2_release(conn);
-		return -1;
+	if (conn->flags & CO_FL_ERROR || h2c->st0 == H2_CS_ERROR2) {
+		h2_wake_all_streams(h2c);
+
+		if (eb_is_empty(&h2c->streams_by_id)) {
+			/* no more stream, kill the connection now */
+			h2_release(conn);
+			return -1;
+		}
+		else {
+			/* some streams still there, we need to signal them all and
+			 * wait for their departure.
+			 */
+			__conn_xprt_stop_recv(conn);
+			__conn_xprt_stop_send(conn);
+			return 0;
+		}
+	}
+
+	if (!h2c->dbuf->i)
+		h2_release_dbuf(h2c);
+
+	/* stop being notified of incoming data if we can't process them */
+	if (h2c->st0 >= H2_CS_ERROR ||
+	    (h2c->flags & H2_CF_DEM_BLOCK_ANY) || conn_xprt_read0_pending(conn)) {
+		/* FIXME: we should clear a read timeout here */
+		__conn_xprt_stop_recv(conn);
+	}
+	else {
+		/* FIXME: we should (re-)arm a read timeout here */
+		__conn_xprt_want_recv(conn);
+	}
+
+	/* adjust output polling */
+	if ((h2c->st0 == H2_CS_ERROR || h2c->mbuf->o) &&
+	    !(conn->flags & CO_FL_SOCK_WR_SH)) {
+		/* FIXME: we should (re-)arm a send timeout here */
+		__conn_xprt_want_send(conn);
+	}
+	else {
+		/* FIXME: we should clear a send timeout here */
+		h2_release_mbuf(h2c);
+		__conn_xprt_stop_send(conn);
 	}
 
 	return 0;