MEDIUM: mux-h2: merge recv_wait and send_wait event notifications

This is the continuation of the recv+send event notifications merge
that was started. This patch is less trivial than the previous ones
because the existence of a send event subscription is also used to
decide to put a stream back into the send list.
diff --git a/src/mux_h2.c b/src/mux_h2.c
index c4edd40..5a1b226 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -206,8 +206,7 @@
 	uint16_t status;     /* HTTP response status */
 	unsigned long long body_len; /* remaining body length according to content-length if H2_SF_DATA_CLEN */
 	struct buffer rxbuf; /* receive buffer, always valid (buf_empty or real buffer) */
-	struct wait_event *recv_wait; /* recv wait_event the conn_stream associated is waiting on (via h2_subscribe) */
-	struct wait_event *send_wait; /* send wait_event the conn_stream associated is waiting on (via h2_subscribe) */
+	struct wait_event *subs;      /* recv wait_event the conn_stream associated is waiting on (via h2_subscribe) */
 	struct list list; /* To be used when adding in h2c->send_list or h2c->fctl_lsit */
 	struct tasklet *shut_tl;  /* deferred shutdown tasklet, to retry to send an RST after we failed to,
 				   * in case there's no other subscription to do it */
@@ -1037,29 +1036,25 @@
 /* attempt to notify the data layer of recv availability */
 static void __maybe_unused h2s_notify_recv(struct h2s *h2s)
 {
-	struct wait_event *sw;
-
-	if (h2s->recv_wait) {
+	if (h2s->subs && h2s->subs->events & SUB_RETRY_RECV) {
 		TRACE_POINT(H2_EV_STRM_WAKE, h2s->h2c->conn, h2s);
-		sw = h2s->recv_wait;
-		sw->events &= ~SUB_RETRY_RECV;
-		tasklet_wakeup(sw->tasklet);
-		h2s->recv_wait = NULL;
+		tasklet_wakeup(h2s->subs->tasklet);
+		h2s->subs->events &= ~SUB_RETRY_RECV;
+		if (!h2s->subs->events)
+			h2s->subs = NULL;
 	}
 }
 
 /* attempt to notify the data layer of send availability */
 static void __maybe_unused h2s_notify_send(struct h2s *h2s)
 {
-	struct wait_event *sw;
-
-	if (h2s->send_wait) {
+	if (h2s->subs && h2s->subs->events & SUB_RETRY_SEND) {
 		TRACE_POINT(H2_EV_STRM_WAKE, h2s->h2c->conn, h2s);
-		sw = h2s->send_wait;
-		h2s->send_wait = NULL;
-		sw->events &= ~SUB_RETRY_SEND;
 		h2s->flags |= H2_SF_NOTIFIED;
-		tasklet_wakeup(sw->tasklet);
+		tasklet_wakeup(h2s->subs->tasklet);
+		h2s->subs->events &= ~SUB_RETRY_SEND;
+		if (!h2s->subs->events)
+			h2s->subs = NULL;
 	}
 	else if (h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW)) {
 		TRACE_POINT(H2_EV_STRM_WAKE, h2s->h2c->conn, h2s);
@@ -1079,7 +1074,7 @@
 {
 	TRACE_ENTER(H2_EV_H2S_WAKE, h2s->h2c->conn, h2s);
 
-	if (h2s->recv_wait || h2s->send_wait ||
+	if (h2s->subs ||
 	    (h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW))) {
 		h2s_notify_recv(h2s);
 		h2s_notify_send(h2s);
@@ -1266,10 +1261,10 @@
 		b_free(&h2s->rxbuf);
 		offer_buffers(NULL, tasks_run_queue);
 	}
-	if (h2s->send_wait != NULL)
-		h2s->send_wait->events &= ~SUB_RETRY_SEND;
-	if (h2s->recv_wait != NULL)
-		h2s->recv_wait->events &= ~SUB_RETRY_RECV;
+
+	if (h2s->subs)
+		h2s->subs->events = 0;
+
 	/* There's no need to explicitly call unsubscribe here, the only
 	 * reference left would be in the h2c send_list/fctl_list, and if
 	 * we're in it, we're getting out anyway
@@ -1304,8 +1299,7 @@
 		pool_free(pool_head_h2s, h2s);
 		goto out;
 	}
-	h2s->send_wait = NULL;
-	h2s->recv_wait = NULL;
+	h2s->subs = NULL;
 	h2s->shut_tl->process = h2_deferred_shut;
 	h2s->shut_tl->context = h2s;
 	LIST_INIT(&h2s->list);
@@ -1982,7 +1976,8 @@
 		if (h2s->flags & H2_SF_BLK_SFCTL && h2s_mws(h2s) > 0) {
 			h2s->flags &= ~H2_SF_BLK_SFCTL;
 			LIST_DEL_INIT(&h2s->list);
-			if (h2s->send_wait || h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW))
+			if ((h2s->subs && h2s->subs->events & SUB_RETRY_SEND) ||
+			    h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW))
 				LIST_ADDQ(&h2c->send_list, &h2s->list);
 		}
 		node = eb32_next(node);
@@ -2322,7 +2317,8 @@
 		if (h2s_mws(h2s) > 0 && (h2s->flags & H2_SF_BLK_SFCTL)) {
 			h2s->flags &= ~H2_SF_BLK_SFCTL;
 			LIST_DEL_INIT(&h2s->list);
-			if (h2s->send_wait || h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW))
+			if ((h2s->subs && h2s->subs->events & SUB_RETRY_SEND) ||
+			    h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW))
 				LIST_ADDQ(&h2c->send_list, &h2s->list);
 		}
 	}
@@ -3263,17 +3259,18 @@
 		/* If the sender changed his mind and unsubscribed, let's just
 		 * remove the stream from the send_list.
 		 */
-		if (!h2s->send_wait &&
-		    !(h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW))) {
+		if (!(h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW)) &&
+		    (!h2s->subs || !(h2s->subs->events & SUB_RETRY_SEND))) {
 			LIST_DEL_INIT(&h2s->list);
 			continue;
 		}
 
-		if (h2s->send_wait) {
-			h2s->send_wait->events &= ~SUB_RETRY_SEND;
+		if (h2s->subs && h2s->subs->events & SUB_RETRY_SEND) {
 			h2s->flags |= H2_SF_NOTIFIED;
-			tasklet_wakeup(h2s->send_wait->tasklet);
-			h2s->send_wait = NULL;
+			tasklet_wakeup(h2s->subs->tasklet);
+			h2s->subs->events &= ~SUB_RETRY_SEND;
+			if (!h2s->subs->events)
+				h2s->subs = NULL;
 		}
 		else if (h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW)) {
 			tasklet_wakeup(h2s->shut_tl);
@@ -3861,7 +3858,7 @@
 	if (!(cs->conn->flags & CO_FL_ERROR) &&
 	    (h2c->st0 < H2_CS_ERROR) &&
 	    (h2s->flags & (H2_SF_BLK_MBUSY | H2_SF_BLK_MROOM | H2_SF_BLK_MFCTL)) &&
-	    ((h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW)) || h2s->send_wait || h2s->recv_wait)) {
+	    ((h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW)) || h2s->subs)) {
 		TRACE_DEVEL("leaving on stream blocked", H2_EV_STRM_END|H2_EV_H2S_BLK, h2c->conn, h2s);
 		return;
 	}
@@ -5594,32 +5591,30 @@
 
 /* Called from the upper layer, to subscribe to events, such as being able to send.
  * The <param> argument here is supposed to be a pointer to a wait_event struct
- * which will be passed to h2s->recv_wait or h2s->send_wait depending on the
- * event_type. The event_type must only be a combination of SUB_RETRY_RECV and
- * SUB_RETRY_SEND, other values will lead to -1 being returned. It always
- * returns 0 except for the error above.
+ * which will be passed to h2s->subs. The event_type must only be a
+ * combination of SUB_RETRY_RECV and SUB_RETRY_SEND, other values will lead to -1
+ * being returned. It always returns 0 except for the error above.
  */
 static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
 {
-	struct wait_event *sw;
+	struct wait_event *sw = param;
 	struct h2s *h2s = cs->ctx;
 	struct h2c *h2c = h2s->h2c;
 
 	TRACE_ENTER(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2c->conn, h2s);
-	if (event_type & SUB_RETRY_RECV) {
+
+	BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
+	BUG_ON(h2s->subs && h2s->subs->events & event_type);
+	BUG_ON(h2s->subs && h2s->subs != sw);
+
+	sw->events |= event_type;
+	h2s->subs = sw;
+
+	if (event_type & SUB_RETRY_RECV)
 		TRACE_DEVEL("subscribe(recv)", H2_EV_STRM_RECV, h2c->conn, h2s);
-		sw = param;
-		BUG_ON(h2s->recv_wait != NULL || (sw->events & SUB_RETRY_RECV));
-		sw->events |= SUB_RETRY_RECV;
-		h2s->recv_wait = sw;
-		event_type &= ~SUB_RETRY_RECV;
-	}
+
 	if (event_type & SUB_RETRY_SEND) {
 		TRACE_DEVEL("subscribe(send)", H2_EV_STRM_SEND, h2c->conn, h2s);
-		sw = param;
-		BUG_ON(h2s->send_wait != NULL || (sw->events & SUB_RETRY_SEND));
-		sw->events |= SUB_RETRY_SEND;
-		h2s->send_wait = sw;
 		if (!(h2s->flags & H2_SF_BLK_SFCTL) &&
 		    !LIST_ADDED(&h2s->list)) {
 			if (h2s->flags & H2_SF_BLK_MFCTL)
@@ -5627,11 +5622,8 @@
 			else
 				LIST_ADDQ(&h2c->send_list, &h2s->list);
 		}
-		event_type &= ~SUB_RETRY_SEND;
 	}
 	TRACE_LEAVE(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2c->conn, h2s);
-	if (event_type != 0)
-		return -1;
 	return 0;
 }
 
@@ -5642,28 +5634,28 @@
  */
 static int h2_unsubscribe(struct conn_stream *cs, int event_type, void *param)
 {
-	struct wait_event *sw;
+	struct wait_event *sw = param;
 	struct h2s *h2s = cs->ctx;
 
 	TRACE_ENTER(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2s->h2c->conn, h2s);
-	if (event_type & SUB_RETRY_RECV) {
+
+	BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
+	BUG_ON(h2s->subs && h2s->subs != sw);
+
+	sw->events &= ~event_type;
+	if (!sw->events)
+		h2s->subs = NULL;
+
+	if (event_type & SUB_RETRY_RECV)
 		TRACE_DEVEL("unsubscribe(recv)", H2_EV_STRM_RECV, h2s->h2c->conn, h2s);
-		sw = param;
-		BUG_ON(h2s->recv_wait != sw);
-		sw->events &= ~SUB_RETRY_RECV;
-		h2s->recv_wait = NULL;
-	}
 
 	if (event_type & SUB_RETRY_SEND) {
 		TRACE_DEVEL("subscribe(send)", H2_EV_STRM_SEND, h2s->h2c->conn, h2s);
-		sw = param;
-		BUG_ON(h2s->send_wait != sw);
-		if (!(h2s->flags & (H2_SF_WANT_SHUTR|H2_SF_WANT_SHUTW)))
-			LIST_DEL_INIT(&h2s->list);
-		sw->events &= ~SUB_RETRY_SEND;
 		h2s->flags &= ~H2_SF_NOTIFIED;
-		h2s->send_wait = NULL;
+		if (!(h2s->flags & (H2_SF_WANT_SHUTR | H2_SF_WANT_SHUTW)))
+			LIST_DEL_INIT(&h2s->list);
 	}
+
 	TRACE_LEAVE(H2_EV_STRM_SEND|H2_EV_STRM_RECV, h2s->h2c->conn, h2s);
 	return 0;
 }