MEDIUM: mux-fcgi: merge recv_wait and send_wait event notifications

This is the last of the "recv_wait+send_wait merge" patches and is
functionally equivalent to previous commit "MEDIUM: mux-h2: merge
recv_wait and send_wait event notifications" but for FCGI this time.
The principle is pretty much the same, since the code is very similar.
We use a single wait_event for both recv and send and rely on the
subscribe flags to know the desired notifications.
diff --git a/src/mux_fcgi.c b/src/mux_fcgi.c
index bad2e30..8e98f1b 100644
--- a/src/mux_fcgi.c
+++ b/src/mux_fcgi.c
@@ -164,8 +164,7 @@
 	struct buffer rxbuf;          /* receive buffer, always valid (buf_empty or real buffer) */
 
 	struct eb32_node by_id;       /* place in fcgi_conn's streams_by_id */
-	struct wait_event *recv_wait; /* Address of the wait_event the conn_stream associated is waiting on */
-	struct wait_event *send_wait; /* Address of the wait_event the conn_stream associated is waiting on */
+	struct wait_event *subs;      /* Address of the wait_event the conn_stream associated is waiting on */
 	struct list send_list;        /* To be used when adding in fcgi_conn->send_list */
 	struct tasklet *shut_tl;      /* deferred shutdown tasklet, to retry to close after we failed to by lack of space */
 };
@@ -911,29 +910,25 @@
 /* Attempts to notify the data layer of recv availability */
 static void fcgi_strm_notify_recv(struct fcgi_strm *fstrm)
 {
-	struct wait_event *sw;
-
-	if (fstrm->recv_wait) {
+	if (fstrm->subs && (fstrm->subs->events & SUB_RETRY_RECV)) {
 		TRACE_POINT(FCGI_EV_STRM_WAKE, fstrm->fconn->conn, fstrm);
-		sw = fstrm->recv_wait;
-		sw->events &= ~SUB_RETRY_RECV;
-		tasklet_wakeup(sw->tasklet);
-		fstrm->recv_wait = NULL;
+		tasklet_wakeup(fstrm->subs->tasklet);
+		fstrm->subs->events &= ~SUB_RETRY_RECV;
+		if (!fstrm->subs->events)
+			fstrm->subs = NULL;
 	}
 }
 
 /* Attempts to notify the data layer of send availability */
 static void fcgi_strm_notify_send(struct fcgi_strm *fstrm)
 {
-	struct wait_event *sw;
-
-	if (fstrm->send_wait) {
+	if (fstrm->subs && (fstrm->subs->events & SUB_RETRY_SEND)) {
 		TRACE_POINT(FCGI_EV_STRM_WAKE, fstrm->fconn->conn, fstrm);
-		sw = fstrm->send_wait;
-		sw->events &= ~SUB_RETRY_SEND;
 		fstrm->flags |= FCGI_SF_NOTIFIED;
-		tasklet_wakeup(sw->tasklet);
-		fstrm->send_wait = NULL;
+		tasklet_wakeup(fstrm->subs->tasklet);
+		fstrm->subs->events &= ~SUB_RETRY_SEND;
+		if (!fstrm->subs->events)
+			fstrm->subs = NULL;
 	}
 	else if (fstrm->flags & (FCGI_SF_WANT_SHUTR | FCGI_SF_WANT_SHUTW)) {
 		TRACE_POINT(FCGI_EV_STRM_WAKE, fstrm->fconn->conn, fstrm);
@@ -953,7 +948,7 @@
 static void fcgi_strm_alert(struct fcgi_strm *fstrm)
 {
 	TRACE_POINT(FCGI_EV_STRM_WAKE, fstrm->fconn->conn, fstrm);
-	if (fstrm->recv_wait || fstrm->send_wait ||
+	if (fstrm->subs ||
 	    (fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW))) {
 		fcgi_strm_notify_recv(fstrm);
 		fcgi_strm_notify_send(fstrm);
@@ -1018,10 +1013,8 @@
 		b_free(&fstrm->rxbuf);
 		offer_buffers(NULL, tasks_run_queue);
 	}
-	if (fstrm->send_wait != NULL)
-		fstrm->send_wait->events &= ~SUB_RETRY_SEND;
-	if (fstrm->recv_wait != NULL)
-		fstrm->recv_wait->events &= ~SUB_RETRY_RECV;
+	if (fstrm->subs)
+		fstrm->subs->events = 0;
 	/* There's no need to explicitly call unsubscribe here, the only
 	 * reference left would be in the fconn send_list/fctl_list, and if
 	 * we're in it, we're getting out anyway
@@ -1054,8 +1047,7 @@
 		pool_free(pool_head_fcgi_strm, fstrm);
 		goto out;
 	}
-	fstrm->send_wait = NULL;
-	fstrm->recv_wait = NULL;
+	fstrm->subs = NULL;
 	fstrm->shut_tl->process = fcgi_deferred_shut;
 	fstrm->shut_tl->context = fstrm;
 	LIST_INIT(&fstrm->send_list);
@@ -2653,17 +2645,20 @@
 		/* If the sender changed his mind and unsubscribed, let's just
 		 * remove the stream from the send_list.
 		 */
-		if (!fstrm->send_wait &&
-		    !(fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW))) {
+		if (!(fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW)) &&
+		    (!fstrm->subs || !(fstrm->subs->events & SUB_RETRY_SEND))) {
 			LIST_DEL_INIT(&fstrm->send_list);
 			continue;
 		}
-		if (fstrm->send_wait) {
+
+		if (fstrm->subs && fstrm->subs->events & SUB_RETRY_SEND) {
 			TRACE_POINT(FCGI_EV_STRM_WAKE, fconn->conn, fstrm);
 			fstrm->flags &= ~FCGI_SF_BLK_ANY;
-			fstrm->send_wait->events &= ~SUB_RETRY_SEND;
 			fstrm->flags |= FCGI_SF_NOTIFIED;
-			tasklet_wakeup(fstrm->send_wait->tasklet);
+			tasklet_wakeup(fstrm->subs->tasklet);
+			fstrm->subs->events &= ~SUB_RETRY_SEND;
+			if (!fstrm->subs->events)
+				fstrm->subs = NULL;
 		} else {
 			/* it's the shut request that was queued */
 			TRACE_POINT(FCGI_EV_STRM_WAKE, fconn->conn, fstrm);
@@ -2868,17 +2863,20 @@
 			/* If the sender changed his mind and unsubscribed, let's just
 			 * remove the stream from the send_list.
 			 */
-			if (!fstrm->send_wait &&
-			    !(fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW))) {
+			if (!(fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW)) &&
+			    (!fstrm->subs || !(fstrm->subs->events & SUB_RETRY_SEND))) {
 				LIST_DEL_INIT(&fstrm->send_list);
 				continue;
 			}
-			if (fstrm->send_wait) {
-				fstrm->flags &= ~FCGI_SF_BLK_ANY;
-				fstrm->send_wait->events &= ~SUB_RETRY_SEND;
+
+			if (fstrm->subs && fstrm->subs->events & SUB_RETRY_SEND) {
 				TRACE_DEVEL("waking up pending stream", FCGI_EV_FCONN_SEND|FCGI_EV_STRM_WAKE, conn, fstrm);
-				tasklet_wakeup(fstrm->send_wait->tasklet);
+				fstrm->flags &= ~FCGI_SF_BLK_ANY;
 				fstrm->flags |= FCGI_SF_NOTIFIED;
+				tasklet_wakeup(fstrm->subs->tasklet);
+				fstrm->subs->events &= ~SUB_RETRY_SEND;
+				if (!fstrm->subs->events)
+					fstrm->subs = NULL;
 			} else {
 				/* it's the shut request that was queued */
 				TRACE_POINT(FCGI_EV_STRM_WAKE, fconn->conn, fstrm);
@@ -3448,7 +3446,7 @@
 	if (!(cs->conn->flags & CO_FL_ERROR) &&
 	    (fconn->state != FCGI_CS_CLOSED) &&
 	    (fstrm->flags & (FCGI_SF_BLK_MBUSY|FCGI_SF_BLK_MROOM)) &&
-	    (fstrm->send_wait || fstrm->recv_wait || (fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW)))) {
+	    (fstrm->subs || (fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW)))) {
 		TRACE_DEVEL("leaving on stream blocked", FCGI_EV_STRM_END|FCGI_EV_FSTRM_BLK, fconn->conn, fstrm);
 		return;
 	}
@@ -3699,37 +3697,31 @@
 
 /* Called from the upper layer, to subscribe to events, such as being able to send.
  * The <param> argument here is supposed to be a pointer to a wait_event struct
- * which will be passed to fstrm->recv_wait or fstrm->send_wait depending on the
- * event_type. The event_type must only be a combination of SUB_RETRY_RECV and
- * SUB_RETRY_SEND, other values will lead to -1 being returned. It always
- * returns 0 except for the error above.
+ * which will be passed to fstrm->subs. The event_type must only be a
+ * combination of SUB_RETRY_RECV and SUB_RETRY_SEND, other values will lead to -1
+ * being returned. It always returns 0 except for the error above.
  */
 static int fcgi_subscribe(struct conn_stream *cs, int event_type, void *param)
 {
-	struct wait_event *sw;
+	struct wait_event *sw = param;
 	struct fcgi_strm *fstrm = cs->ctx;
 	struct fcgi_conn *fconn = fstrm->fconn;
 
+	BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
+	BUG_ON(fstrm->subs && fstrm->subs->events & event_type);
+	BUG_ON(fstrm->subs && fstrm->subs != sw);
+
-	if (event_type & SUB_RETRY_RECV) {
+	sw->events |= event_type;
+	fstrm->subs = sw;
+
+	if (event_type & SUB_RETRY_RECV)
 		TRACE_DEVEL("unsubscribe(recv)", FCGI_EV_STRM_RECV, fconn->conn, fstrm);
-		sw = param;
-		BUG_ON(fstrm->recv_wait != NULL || (sw->events & SUB_RETRY_RECV));
-		sw->events |= SUB_RETRY_RECV;
-		fstrm->recv_wait = sw;
-		event_type &= ~SUB_RETRY_RECV;
-	}
+
 	if (event_type & SUB_RETRY_SEND) {
 		TRACE_DEVEL("unsubscribe(send)", FCGI_EV_STRM_SEND, fconn->conn, fstrm);
-		sw = param;
-		BUG_ON(fstrm->send_wait != NULL || (sw->events & SUB_RETRY_SEND));
-		sw->events |= SUB_RETRY_SEND;
-		fstrm->send_wait = sw;
 		if (!LIST_ADDED(&fstrm->send_list))
 			LIST_ADDQ(&fconn->send_list, &fstrm->send_list);
-		event_type &= ~SUB_RETRY_SEND;
 	}
-	if (event_type != 0)
-		return -1;
 	return 0;
 }
 
@@ -3740,26 +3732,25 @@
  */
 static int fcgi_unsubscribe(struct conn_stream *cs, int event_type, void *param)
 {
-	struct wait_event *sw;
+	struct wait_event *sw = param;
 	struct fcgi_strm *fstrm = cs->ctx;
 	struct fcgi_conn *fconn = fstrm->fconn;
 
+	BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
+	BUG_ON(fstrm->subs && fstrm->subs != sw);
+
+	sw->events &= ~event_type;
+	if (!sw->events)
+		fstrm->subs = NULL;
+
-	if (event_type & SUB_RETRY_RECV) {
+	if (event_type & SUB_RETRY_RECV)
 		TRACE_DEVEL("subscribe(recv)", FCGI_EV_STRM_RECV, fconn->conn, fstrm);
-		sw = param;
-		BUG_ON(fstrm->recv_wait != sw);
-		sw->events &= ~SUB_RETRY_RECV;
-		fstrm->recv_wait = NULL;
-	}
+
 	if (event_type & SUB_RETRY_SEND) {
 		TRACE_DEVEL("subscribe(send)", FCGI_EV_STRM_SEND, fconn->conn, fstrm);
-		sw = param;
-		BUG_ON(fstrm->send_wait != sw);
+		fstrm->flags &= ~FCGI_SF_NOTIFIED;
 		if (!(fstrm->flags & (FCGI_SF_WANT_SHUTR|FCGI_SF_WANT_SHUTW)))
 			LIST_DEL_INIT(&fstrm->send_list);
-		sw->events &= ~SUB_RETRY_SEND;
-		fstrm->flags &= ~FCGI_SF_NOTIFIED;
-		fstrm->send_wait = NULL;
 	}
 	return 0;
 }