MEDIUM: mux_h2: Revamp the send path when blocking.

Change fctl_list and send_list to be lists of struct wait_list, and nuke
send_wait_list, as it's now redundant.
Make the code responsible for shutr/shutw subscribe to those lists.
diff --git a/src/mux_h2.c b/src/mux_h2.c
index f0df0e0..64cd596 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -120,7 +120,6 @@
 	struct list send_list; /* list of blocked streams requesting to send */
 	struct list fctl_list; /* list of streams blocked by connection's fctl */
 	struct buffer_wait buf_wait; /* wait list for buffer allocations */
-	struct list send_wait_list;  /* list of tasks to wake when we're ready to send */
 	struct wait_list wait_list;  /* We're in a wait list, to send */
 };
 
@@ -177,14 +176,14 @@
 	struct h2c *h2c;
 	struct h1m req, res;      /* request and response parser state for H1 */
 	struct eb32_node by_id; /* place in h2c's streams_by_id */
-	struct list list; /* position in active/blocked lists if blocked>0 */
 	int32_t id; /* stream ID */
 	uint32_t flags;      /* H2_SF_* */
 	int mws;             /* mux window size for this stream */
 	enum h2_err errcode; /* H2 err code (H2_ERR_*) */
 	enum h2_ss st;
 	struct buffer rxbuf; /* receive buffer, always valid (buf_empty or real buffer) */
-	struct wait_list *recv_wait_list; /* Somebody subscribed to be waken up on recv */
+	struct wait_list wait_list; /* Wait list, when we're attempting to send a RST but we can't send */
+	struct wait_list *recv_wait_list; /* Address of the wait_list the conn_stream associated is waiting on */
 };
 
 /* descriptor for an h2 frame header */
@@ -227,6 +226,7 @@
 static inline struct h2s *h2c_st_by_id(struct h2c *h2c, int id);
 static int h2_frt_decode_headers(struct h2s *h2s);
 static int h2_frt_transfer_data(struct h2s *h2s);
+static struct task *h2_deferred_shut(struct task *t, void *ctx, unsigned short state);
 
 /*****************************************************/
 /* functions below are for dynamic buffer management */
@@ -417,7 +417,6 @@
 	if (t)
 		task_queue(t);
 	conn_xprt_want_recv(conn);
-	LIST_INIT(&h2c->send_wait_list);
 	LIST_INIT(&h2c->wait_list.list);
 
 	/* Try to read, if nothing is available yet we'll just subscribe */
@@ -648,13 +647,12 @@
 static void h2s_destroy(struct h2s *h2s)
 {
 	h2s_close(h2s);
-	LIST_DEL(&h2s->list);
-	LIST_INIT(&h2s->list);
 	eb32_delete(&h2s->by_id);
 	if (b_size(&h2s->rxbuf)) {
 		b_free(&h2s->rxbuf);
 		offer_buffers(NULL, tasks_run_queue);
 	}
+	tasklet_free(h2s->wait_list.task);
 	pool_free(pool_head_h2s, h2s);
 }
 
@@ -671,6 +669,17 @@
 	if (!h2s)
 		goto out;
 
+	h2s->wait_list.task = tasklet_new();
+	if (!h2s->wait_list.task) {
+		pool_free(pool_head_h2s, h2s);
+		goto out;
+	}
+	LIST_INIT(&h2s->wait_list.list);
+	h2s->recv_wait_list = NULL;
+	h2s->wait_list.task->process = h2_deferred_shut;
+	h2s->wait_list.task->context = h2s;
+	h2s->wait_list.handle = NULL;
+	h2s->wait_list.wait_reason = 0;
 	h2s->h2c       = h2c;
 	h2s->mws       = h2c->miw;
 	h2s->flags     = H2_SF_NONE;
@@ -681,7 +690,6 @@
 	h1m_init(&h2s->res);
 	h2s->by_id.key = h2s->id = id;
 	h2c->max_id    = id;
-	LIST_INIT(&h2s->list);
 
 	eb32_insert(&h2c->streams_by_id, &h2s->by_id);
 	h2c->nb_streams++;
@@ -1442,14 +1450,7 @@
 		h2s->mws += inc;
 		if (h2s->mws > 0 && (h2s->flags & H2_SF_BLK_SFCTL)) {
 			h2s->flags &= ~H2_SF_BLK_SFCTL;
-			if (h2s->cs && LIST_ISEMPTY(&h2s->list) &&
-			    (h2s->cs->flags & CS_FL_DATA_WR_ENA)) {
-				/* This stream wanted to send but could not due to its
-				 * own flow control. We can put it back into the send
-				 * list now, it will be handled upon next send() call.
-				 */
-				LIST_ADDQ(&h2c->send_list, &h2s->list);
-			}
+			/* The task will be waken up later */
 		}
 	}
 	else {
@@ -2127,7 +2128,8 @@
  */
 static int h2_process_mux(struct h2c *h2c)
 {
-	struct h2s *h2s, *h2s_back;
+	struct h2s *h2s;
+	struct wait_list *sw, *sw_back;
 
 	/* start by sending possibly pending window updates */
 	if (h2c->rcvd_c > 0 &&
@@ -2140,84 +2142,47 @@
 	 * blocked just on this.
 	 */
 
-	list_for_each_entry_safe(h2s, h2s_back, &h2c->fctl_list, list) {
+	list_for_each_entry_safe(sw, sw_back, &h2c->fctl_list, list) {
+		h2s = sw->handle;
 		if (h2c->mws <= 0 || h2c->flags & H2_CF_MUX_BLOCK_ANY ||
 		    h2c->st0 >= H2_CS_ERROR)
 			break;
 
-		/* In theory it's possible that h2s->cs == NULL here :
-		 *  - client sends crap that causes a parse error
-		 *  - RST_STREAM is produced and CS_FL_ERROR at the same time
-		 *  - RST_STREAM cannot be emitted because mux is busy/full
-		 *  - stream gets notified, detaches and quits
-		 *  - mux buffer gets ready and wakes pending streams up
-		 *  - bam!
-		 */
-		h2s->flags &= ~H2_SF_BLK_ANY;
-
-		if (h2s->cs) {
-			h2s->cs->data_cb->wake(h2s->cs);
-		} else {
-			h2s_send_rst_stream(h2c, h2s);
-		}
-
-		/* depending on callee's blocking reasons, we may queue in send
-		 * list or completely dequeue.
-		 */
-		if ((h2s->flags & H2_SF_BLK_MFCTL) == 0) {
-			if (h2s->flags & H2_SF_BLK_ANY) {
-				LIST_DEL(&h2s->list);
-				LIST_ADDQ(&h2c->send_list, &h2s->list);
-			}
-			else {
-				LIST_DEL(&h2s->list);
-				LIST_INIT(&h2s->list);
-				if (h2s->cs)
-					h2s->cs->flags &= ~CS_FL_DATA_WR_ENA;
-				else {
-					/* just sent the last frame for this orphaned stream */
-					h2s_destroy(h2s);
-				}
-			}
+		/* If the tasklet was added to finish shutr/shutw, just wake the task */
+		if ((long)(h2s) & 3) {
+			sw->wait_reason &= ~SUB_CAN_SEND;
+			LIST_DEL(&sw->list);
+			LIST_INIT(&sw->list);
+			tasklet_wakeup(sw->task);
+		} else if (!(h2s->flags & H2_SF_BLK_SFCTL)) {
+			h2s->flags &= ~H2_SF_BLK_ANY;
+			LIST_DEL(&sw->list);
+			LIST_INIT(&sw->list);
+			sw->wait_reason &= ~SUB_CAN_SEND;
+			tasklet_wakeup(sw->task);
 		}
 	}
 
+	list_for_each_entry_safe(sw, sw_back, &h2c->send_list, list) {
+		h2s = sw->handle;
+
-	list_for_each_entry_safe(h2s, h2s_back, &h2c->send_list, list) {
 		if (h2c->st0 >= H2_CS_ERROR || h2c->flags & H2_CF_MUX_BLOCK_ANY)
 			break;
 
-		/* In theory it's possible that h2s->cs == NULL here :
-		 *  - client sends crap that causes a parse error
-		 *  - RST_STREAM is produced and CS_FL_ERROR at the same time
-		 *  - RST_STREAM cannot be emitted because mux is busy/full
-		 *  - stream gets notified, detaches and quits
-		 *  - mux buffer gets ready and wakes pending streams up
-		 *  - bam!
-		 */
-		h2s->flags &= ~H2_SF_BLK_ANY;
-
-		if (h2s->cs) {
-			h2s->cs->data_cb->wake(h2s->cs);
-		} else {
-			h2s_send_rst_stream(h2c, h2s);
-		}
-		/* depending on callee's blocking reasons, we may queue in fctl
-		 * list or completely dequeue.
-		 */
-		if (h2s->flags & H2_SF_BLK_MFCTL) {
-			/* stream hit the connection's flow control */
-			LIST_DEL(&h2s->list);
-			LIST_ADDQ(&h2c->fctl_list, &h2s->list);
+		/* If the tasklet was added to finish shutr/shutw, just wake the task */
+		if ((long)(h2s) & 3) {
+			sw->wait_reason &= ~SUB_CAN_SEND;
+			LIST_DEL(&sw->list);
+			LIST_INIT(&sw->list);
+			tasklet_wakeup(sw->task);
 		}
-		else if (!(h2s->flags & H2_SF_BLK_ANY)) {
-			LIST_DEL(&h2s->list);
-			LIST_INIT(&h2s->list);
-			if (h2s->cs)
-				h2s->cs->flags &= ~CS_FL_DATA_WR_ENA;
-			else {
-				/* just sent the last frame for this orphaned stream */
-				h2s_destroy(h2s);
-			}
+		else if (!(h2s->flags & H2_SF_BLK_SFCTL)) {
+			h2s->flags &= ~H2_SF_BLK_ANY;
+
+			LIST_DEL(&sw->list);
+			LIST_INIT(&sw->list);
+			sw->wait_reason &= ~SUB_CAN_SEND;
+			tasklet_wakeup(sw->task);
 		}
 	}
 
@@ -2350,8 +2315,8 @@
 	 * for us.
 	 */
 	if (!(h2c->flags & (H2_CF_MUX_MFULL | H2_CF_DEM_MROOM))) {
-		while (!LIST_ISEMPTY(&h2c->send_wait_list)) {
-			struct wait_list *sw = LIST_ELEM(h2c->send_wait_list.n,
+		while (!LIST_ISEMPTY(&h2c->send_list)) {
+			struct wait_list *sw = LIST_ELEM(h2c->send_list.n,
 			    struct wait_list *, list);
 			LIST_DEL(&sw->list);
 			LIST_INIT(&sw->list);
@@ -2582,20 +2547,11 @@
 	 */
 
 	if (cs->flags & CS_FL_DATA_WR_ENA) {
-		if (LIST_ISEMPTY(&h2s->list)) {
-			if (LIST_ISEMPTY(&h2s->h2c->send_list) &&
-			    !b_data(&h2s->h2c->mbuf) && // not yet subscribed
-			    !(cs->conn->flags & CO_FL_SOCK_WR_SH))
-				conn_xprt_want_send(cs->conn);
-			LIST_ADDQ(&h2s->h2c->send_list, &h2s->list);
-			tasklet_wakeup(h2s->h2c->wait_list.task);
-		}
-	}
-	else if (!LIST_ISEMPTY(&h2s->list)) {
-		LIST_DEL(&h2s->list);
-		LIST_INIT(&h2s->list);
-		h2s->flags &= ~(H2_SF_BLK_MBUSY | H2_SF_BLK_MROOM | H2_SF_BLK_MFCTL);
+		if (!b_data(&h2s->h2c->mbuf) && !(cs->conn->flags & CO_FL_SOCK_WR_SH))
+			conn_xprt_want_send(cs->conn);
+		tasklet_wakeup(h2s->h2c->wait_list.task);
 	}
+	/* We don't support unsubscribing from here, it shouldn't be a problem */
 
 	/* this can happen from within si_chk_snd() */
 	if (b_data(&h2s->h2c->mbuf) && !(cs->conn->flags & CO_FL_XPRT_WR_ENA))
@@ -2674,12 +2630,10 @@
 	}
 }
 
-static void h2_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
+static void h2_do_shutr(struct h2s *h2s)
 {
-	struct h2s *h2s = cs->ctx;
-
-	if (!mode)
-		return;
+	struct h2c *h2c = h2s->h2c;
+	struct wait_list *sw = &h2s->wait_list;
 
 	if (h2s->st == H2_SS_HLOC || h2s->st == H2_SS_ERROR || h2s->st == H2_SS_CLOSED)
 		return;
@@ -2690,31 +2644,36 @@
 	 * case we send a goaway to close the connection.
 	 */
 	if (!(h2s->flags & H2_SF_RST_SENT) &&
-	    h2s_send_rst_stream(h2s->h2c, h2s) <= 0)
+	    h2s_send_rst_stream(h2c, h2s) <= 0)
 		goto add_to_list;
 
 	if (!(h2s->flags & H2_SF_OUTGOING_DATA) &&
 	    !(h2s->h2c->flags & (H2_CF_GOAWAY_SENT|H2_CF_GOAWAY_FAILED)) &&
-	    h2c_send_goaway_error(h2s->h2c, h2s) <= 0)
-		goto add_to_list;
-
-	if (b_data(&h2s->h2c->mbuf) && !(cs->conn->flags & CO_FL_XPRT_WR_ENA))
-		conn_xprt_want_send(cs->conn);
+	    h2c_send_goaway_error(h2c, h2s) <= 0)
+		return;
+	if (b_data(&h2c->mbuf) && !(h2c->conn->flags & CO_FL_XPRT_WR_ENA))
+		conn_xprt_want_send(h2c->conn);
 
 	h2s_close(h2s);
 
- add_to_list:
-	if (LIST_ISEMPTY(&h2s->list)) {
+	return;
+add_to_list:
+	if (LIST_ISEMPTY(&sw->list)) {
+		sw->wait_reason |= SUB_CAN_SEND;
 		if (h2s->flags & H2_SF_BLK_MFCTL)
-			LIST_ADDQ(&h2s->h2c->fctl_list, &h2s->list);
+			LIST_ADDQ(&h2c->fctl_list, &sw->list);
 		else if (h2s->flags & (H2_SF_BLK_MBUSY|H2_SF_BLK_MROOM))
-			LIST_ADDQ(&h2s->h2c->send_list, &h2s->list);
+			LIST_ADDQ(&h2c->send_list, &sw->list);
 	}
+	/* Let the handler know we want shutr */
+	sw->handle = (void *)((long)sw->handle | 1);
+
 }
 
-static void h2_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
+static void h2_do_shutw(struct h2s *h2s)
 {
-	struct h2s *h2s = cs->ctx;
+	struct h2c *h2c = h2s->h2c;
+	struct wait_list *sw = &h2s->wait_list;
 
 	if (h2s->st == H2_SS_HLOC || h2s->st == H2_SS_ERROR || h2s->st == H2_SS_CLOSED)
 		return;
@@ -2737,29 +2696,64 @@
 		 * case we send a goaway to close the connection.
 		 */
 		if (!(h2s->flags & H2_SF_RST_SENT) &&
-		    h2s_send_rst_stream(h2s->h2c, h2s) <= 0)
+		    h2s_send_rst_stream(h2c, h2s) <= 0)
 			goto add_to_list;
 
 		if (!(h2s->flags & H2_SF_OUTGOING_DATA) &&
 		    !(h2s->h2c->flags & (H2_CF_GOAWAY_SENT|H2_CF_GOAWAY_FAILED)) &&
-		    h2c_send_goaway_error(h2s->h2c, h2s) <= 0)
+		    h2c_send_goaway_error(h2c, h2s) <= 0)
 			goto add_to_list;
 
 		h2s_close(h2s);
 	}
 
-	if (b_data(&h2s->h2c->mbuf) && !(cs->conn->flags & CO_FL_XPRT_WR_ENA))
-		conn_xprt_want_send(cs->conn);
+	if (b_data(&h2s->h2c->mbuf) && !(h2c->conn->flags & CO_FL_XPRT_WR_ENA))
+		conn_xprt_want_send(h2c->conn);
 
  add_to_list:
-	if (LIST_ISEMPTY(&h2s->list)) {
+	sw = &h2s->wait_list;
+
+	if (LIST_ISEMPTY(&sw->list)) {
+		sw->wait_reason |= SUB_CAN_SEND;
 		if (h2s->flags & H2_SF_BLK_MFCTL)
-			LIST_ADDQ(&h2s->h2c->fctl_list, &h2s->list);
+			LIST_ADDQ(&h2s->h2c->fctl_list, &sw->list);
 		else if (h2s->flags & (H2_SF_BLK_MBUSY|H2_SF_BLK_MROOM))
-			LIST_ADDQ(&h2s->h2c->send_list, &h2s->list);
+			LIST_ADDQ(&h2s->h2c->send_list, &sw->list);
 	}
+	/* let the handler know we want to shutr */
+	sw->handle = (void *)((long)(sw->handle) | 2);
+}
+
+static struct task *h2_deferred_shut(struct task *t, void *ctx, unsigned short state)
+{
+	struct h2s *h2s = ctx;
+	long reason = (long)h2s->wait_list.handle;
+
+	if (reason & 1)
+		h2_do_shutr(h2s);
+	if (reason & 2)
+		h2_do_shutw(h2s);
+
+	return NULL;
 }
 
+static void h2_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
+{
+	struct h2s *h2s = cs->ctx;
+
+	if (!mode)
+		return;
+
+	h2_do_shutr(h2s);
+}
+
+static void h2_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
+{
+	struct h2s *h2s = cs->ctx;
+
+	h2_do_shutw(h2s);
+}
+
 /* Decode the payload of a HEADERS frame and produce the equivalent HTTP/1
  * request. Returns the number of bytes emitted if > 0, or 0 if it couldn't
  * proceed. Stream errors are reported in h2s->errcode and connection errors
@@ -3496,6 +3490,7 @@
 		sw = param;
 		if (!(sw->wait_reason & SUB_CAN_RECV)) {
 			sw->wait_reason |= SUB_CAN_RECV;
+			sw->handle = h2s;
 			h2s->recv_wait_list = sw;
 		}
 		return 0;
@@ -3503,7 +3498,11 @@
 		sw = param;
 		if (!(sw->wait_reason & SUB_CAN_SEND)) {
 			sw->wait_reason |= SUB_CAN_SEND;
-			LIST_ADDQ(&h2c->send_wait_list, &sw->list);
+			sw->handle = h2s;
+			if (h2s->flags & H2_SF_BLK_MFCTL)
+				LIST_ADDQ(&h2c->fctl_list, &sw->list);
+			else
+				LIST_ADDQ(&h2c->send_list, &sw->list);
 		}
 		return 0;
 	default:
@@ -3600,16 +3599,6 @@
 			h2s_close(h2s);
 	}
 
-	if (h2s->flags & H2_SF_BLK_SFCTL) {
-		/* stream flow control, quit the list */
-		LIST_DEL(&h2s->list);
-		LIST_INIT(&h2s->list);
-	}
-	else if (LIST_ISEMPTY(&h2s->list)) {
-		if (h2s->flags & H2_SF_BLK_MFCTL)
-			LIST_ADDQ(&h2s->h2c->fctl_list, &h2s->list);
-	}
-
 	b_del(buf, total);
 	if (total > 0) {
 		conn_xprt_want_send(h2s->h2c->conn);
@@ -3624,6 +3613,7 @@
 {
 	struct h2c *h2c = conn->mux_ctx;
 	struct h2s *h2s;
+	struct wait_list *sw;
 	struct eb32_node *node;
 	int fctl_cnt = 0;
 	int send_cnt = 0;
@@ -3633,10 +3623,10 @@
 	if (!h2c)
 		return;
 
-	list_for_each_entry(h2s, &h2c->fctl_list, list)
+	list_for_each_entry(sw, &h2c->fctl_list, list)
 		fctl_cnt++;
 
-	list_for_each_entry(h2s, &h2c->send_list, list)
+	list_for_each_entry(sw, &h2c->send_list, list)
 		send_cnt++;
 
 	node = eb32_first(&h2c->streams_by_id);