MEDIUM: stream-int/conn-stream: Handle I/O subscriptions in the conn-stream

wait_event structure is moved in the conn-stream. The tasklet is only
created if the conn-stream is attached to a mux and released when the mux is
detached. This implies a subtle change. In stream_int_chk_rcv() function,
the wakeup of the tasklet was removed because there is no longer tasklet at
this stage (stream_int_chk_rcv() is a callback function of si_embedded_ops).
diff --git a/include/haproxy/conn_stream-t.h b/include/haproxy/conn_stream-t.h
index a1e3538..3491cfe 100644
--- a/include/haproxy/conn_stream-t.h
+++ b/include/haproxy/conn_stream-t.h
@@ -157,6 +157,7 @@
 
 	unsigned int flags;                  /* CS_FL_* */
 	unsigned int hcto;                   /* half-closed timeout (0 = unset) */
+	struct wait_event wait_event;        /* We're in a wait list */
 	struct cs_endpoint *endp;            /* points to the end point (MUX stream or appctx) */
 	enum obj_type *app;                  /* points to the applicative point (stream or check) */
 	struct stream_interface *si;
diff --git a/include/haproxy/stream_interface-t.h b/include/haproxy/stream_interface-t.h
index c435b31..2b44e30 100644
--- a/include/haproxy/stream_interface-t.h
+++ b/include/haproxy/stream_interface-t.h
@@ -63,8 +63,6 @@
 	unsigned int flags;     /* SI_FL_* */
 	struct conn_stream *cs; /* points to the conn-streams that owns the endpoint (connection or applet) */
 	struct si_ops *ops;     /* general operations at the stream interface layer */
-
-	struct wait_event wait_event; /* We're in a wait list */
 };
 
 /* operations available on a stream-interface */
diff --git a/include/haproxy/stream_interface.h b/include/haproxy/stream_interface.h
index 9aba847..ce2e459 100644
--- a/include/haproxy/stream_interface.h
+++ b/include/haproxy/stream_interface.h
@@ -108,12 +108,6 @@
 	si->flags         &= SI_FL_ISBACK;
 	si->cs             = NULL;
 	si->ops            = &si_embedded_ops;
-	si->wait_event.tasklet = tasklet_new();
-	if (!si->wait_event.tasklet)
-		return -1;
-	si->wait_event.tasklet->process = si_cs_io_cb;
-	si->wait_event.tasklet->context = si;
-	si->wait_event.events = 0;
 	return 0;
 }
 
diff --git a/src/conn_stream.c b/src/conn_stream.c
index b632912..68db216 100644
--- a/src/conn_stream.c
+++ b/src/conn_stream.c
@@ -64,6 +64,9 @@
 	cs->data_cb = NULL;
 	cs->src = NULL;
 	cs->dst = NULL;
+	cs->wait_event.tasklet = NULL;
+	cs->wait_event.events = 0;
+
 	if (!endp) {
 		endp = cs_endpoint_new();
 		if (unlikely(!endp))
@@ -156,6 +159,8 @@
 		BUG_ON(!(cs->endp->flags & CS_EP_DETACHED));
 		cs_endpoint_free(cs->endp);
 	}
+	if (cs->wait_event.tasklet)
+		tasklet_free(cs->wait_event.tasklet);
 	pool_free(pool_head_connstream, cs);
 }
 
@@ -172,6 +177,15 @@
 	if (!conn->ctx)
 		conn->ctx = cs;
 	if (cs_strm(cs)) {
+		if (!cs->wait_event.tasklet) {
+			cs->wait_event.tasklet = tasklet_new();
+			if (!cs->wait_event.tasklet)
+				return -1;
+			cs->wait_event.tasklet->process = si_cs_io_cb;
+			cs->wait_event.tasklet->context = cs->si;
+			cs->wait_event.events = 0;
+		}
+
 		cs->si->ops = &si_conn_ops;
 		cs->data_cb = &si_conn_cb;
 	}
@@ -204,8 +218,19 @@
 	cs->si = si_new(cs);
 	if (unlikely(!cs->si))
 		return -1;
+
 	cs->endp->flags &= ~CS_EP_ORPHAN;
 	if (cs->endp->flags & CS_EP_T_MUX) {
+		cs->wait_event.tasklet = tasklet_new();
+		if (!cs->wait_event.tasklet) {
+			si_free(cs->si);
+			cs->si = NULL;
+			return -1;
+		}
+		cs->wait_event.tasklet->process = si_cs_io_cb;
+		cs->wait_event.tasklet->context = cs->si;
+		cs->wait_event.events = 0;
+
 		cs->si->ops = &si_conn_ops;
 		cs->data_cb = &si_conn_cb;
 	}
@@ -237,8 +262,8 @@
 		if (conn->mux) {
 			/* TODO: handle unsubscribe for healthchecks too */
 			cs->endp->flags |= CS_EP_ORPHAN;
-			if (cs->si && cs->si->wait_event.events != 0)
-				conn->mux->unsubscribe(cs, cs->si->wait_event.events, &cs->si->wait_event);
+			if (cs->wait_event.events != 0)
+				conn->mux->unsubscribe(cs, cs->wait_event.events, &cs->wait_event);
 			conn->mux->detach(cs);
 			cs->endp = NULL;
 		}
@@ -290,6 +315,12 @@
 	cs->data_cb = NULL;
 	sockaddr_free(&cs->src);
 	sockaddr_free(&cs->dst);
+
+	if (cs->wait_event.tasklet)
+		tasklet_free(cs->wait_event.tasklet);
+	cs->wait_event.tasklet = NULL;
+	cs->wait_event.events = 0;
+
 	if (!cs->endp || (cs->endp->flags & CS_EP_DETACHED))
 		cs_free(cs);
 }
diff --git a/src/stream.c b/src/stream.c
index ec6e1f6..ec7c159 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -1502,9 +1502,8 @@
 		 * mux will probably want to subscribe to
 		 * the underlying XPRT
 		 */
-		if (cs_si(s->csf)->wait_event.events)
-			conn->mux->unsubscribe(cs, cs_si(s->csf)->wait_event.events,
-					       &(cs_si(s->csf)->wait_event));
+		if (s->csf->wait_event.events)
+			conn->mux->unsubscribe(cs, s->csf->wait_event.events, &(s->csf->wait_event));
 
 		if (conn->mux->flags & MX_FL_NO_UPG)
 			return 0;
@@ -3278,22 +3277,22 @@
 			      strm->txn->req.flags, strm->txn->rsp.flags);
 
 		chunk_appendf(&trash,
-			     "  si[0]=%p (flags=0x%02x endp0=%s:%p sub=%d)\n",
+			     "  si[0]=%p (flags=0x%02x endp0=%s:%p)\n",
 			     strm->csf->si,
 			     strm->csf->si->flags,
 			     (strm->csf->endp->flags & CS_EP_T_MUX ? "CONN" : "APPCTX"),
-			      __cs_endp_target(strm->csf), strm->csf->si->wait_event.events);
+			      __cs_endp_target(strm->csf));
 
 		chunk_appendf(&trash,
-			     "  si[1]=%p (flags=0x%02x endp1=%s:%p sub=%d)\n",
+			     "  si[1]=%p (flags=0x%02x endp1=%s:%p)\n",
 			     strm->csb->si,
 			     strm->csb->si->flags,
 			     (strm->csb->endp->flags & CS_EP_T_MUX ? "CONN" : "APPCTX"),
-			      __cs_endp_target(strm->csb), strm->csb->si->wait_event.events);
+			      __cs_endp_target(strm->csb));
 
 		csf = strm->csf;
-		chunk_appendf(&trash, "  cs=%p csf=0x%08x state=%s endp=%p,0x%08x\n", csf, csf->flags,
-			      cs_state_str(csf->state), csf->endp->target, csf->endp->flags);
+		chunk_appendf(&trash, "  cs=%p csf=0x%08x state=%s endp=%p,0x%08x sub=%d\n", csf, csf->flags,
+			      cs_state_str(csf->state), csf->endp->target, csf->endp->flags, csf->wait_event.events);
 
 		if ((conn = cs_conn(csf)) != NULL) {
 			chunk_appendf(&trash,
@@ -3329,8 +3328,8 @@
 		}
 
 		csb = strm->csb;
-		chunk_appendf(&trash, "  cs=%p csb=0x%08x state=%s endp=%p,0x%08x\n", csb, csb->flags,
-			      cs_state_str(csb->state), csb->endp->target, csb->endp->flags);
+		chunk_appendf(&trash, "  cs=%p csb=0x%08x state=%s endp=%p,0x%08x sub=%d\n", csb, csb->flags,
+			      cs_state_str(csb->state), csb->endp->target, csb->endp->flags, csb->wait_event.events);
 		if ((conn = cs_conn(csb)) != NULL) {
 			chunk_appendf(&trash,
 			              "      co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",
diff --git a/src/stream_interface.c b/src/stream_interface.c
index 139b654..d206e9c 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -124,7 +124,6 @@
 	if (!si)
 		return;
 
-	tasklet_free(si->wait_event.tasklet);
 	pool_free(pool_head_streaminterface, si);
 }
 
@@ -235,7 +234,6 @@
 	}
 	else {
 		/* (re)start reading */
-		tasklet_wakeup(si->wait_event.tasklet);
 		if (!(si->cs->flags & CS_FL_DONT_WAKE))
 			task_wakeup(si_task(si), TASK_WOKEN_IO);
 	}
@@ -555,7 +553,7 @@
 	BUG_ON(!conn);
 
 	/* If we have data to send, try it now */
-	if (!channel_is_empty(oc) && !(si->wait_event.events & SUB_RETRY_SEND))
+	if (!channel_is_empty(oc) && !(si->cs->wait_event.events & SUB_RETRY_SEND))
 		si_cs_send(cs);
 
 	/* First step, report to the conn-stream what was detected at the
@@ -658,7 +656,7 @@
 	}
 
 	/* We're already waiting to be able to send, give up */
-	if (si->wait_event.events & SUB_RETRY_SEND)
+	if (si->cs->wait_event.events & SUB_RETRY_SEND)
 		return 0;
 
 	/* we might have been called just after an asynchronous shutw */
@@ -773,11 +771,11 @@
 
 	/* We couldn't send all of our data, let the mux know we'd like to send more */
 	if (!channel_is_empty(oc))
-		conn->mux->subscribe(cs, SUB_RETRY_SEND, &si->wait_event);
+		conn->mux->subscribe(cs, SUB_RETRY_SEND, &si->cs->wait_event);
 	return did_send;
 }
 
-/* This is the ->process() function for any stream-interface's wait_event task.
+/* This is the ->process() function for any conn-stream's wait_event task.
  * It's assigned during the stream-interface's initialization, for any type of
  * stream interface. Thus it is always safe to perform a tasklet_wakeup() on a
  * stream interface, as the presence of the CS is checked there.
@@ -791,9 +789,9 @@
 	if (!cs_conn(cs))
 		return t;
 
-	if (!(si->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si)))
+	if (!(cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si)))
 		ret = si_cs_send(cs);
-	if (!(si->wait_event.events & SUB_RETRY_RECV))
+	if (!(cs->wait_event.events & SUB_RETRY_RECV))
 		ret |= si_cs_recv(cs);
 	if (ret != 0)
 		si_cs_process(cs);
@@ -909,7 +907,7 @@
 	if (!cs_conn_mux(si->cs))
 		return 0; // only conn_streams are supported
 
-	if (si->wait_event.events & SUB_RETRY_RECV)
+	if (si->cs->wait_event.events & SUB_RETRY_RECV)
 		return 0; // already subscribed
 
 	if (!si_rx_endp_ready(si) || si_rx_blocked(si))
@@ -1111,7 +1109,7 @@
 {
 	/* (re)start reading */
 	if (cs_state_in(si->cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
-		tasklet_wakeup(si->wait_event.tasklet);
+		tasklet_wakeup(si->cs->wait_event.tasklet);
 }
 
 
@@ -1138,7 +1136,7 @@
 	    !(si->flags & SI_FL_WAIT_DATA))       /* not waiting for data */
 		return;
 
-	if (!(si->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si)))
+	if (!(si->cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si)))
 		si_cs_send(cs);
 
 	if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(si)) {
@@ -1232,7 +1230,7 @@
 	/* If another call to si_cs_recv() failed, and we subscribed to
 	 * recv events already, give up now.
 	 */
-	if (si->wait_event.events & SUB_RETRY_RECV)
+	if (si->cs->wait_event.events & SUB_RETRY_RECV)
 		return 0;
 
 	/* maybe we were called immediately after an asynchronous shutr */
@@ -1532,7 +1530,7 @@
 	}
 	else if (!si_rx_blocked(si)) {
 		/* Subscribe to receive events if we're blocking on I/O */
-		conn->mux->subscribe(cs, SUB_RETRY_RECV, &si->wait_event);
+		conn->mux->subscribe(cs, SUB_RETRY_RECV, &si->cs->wait_event);
 		si_rx_endp_done(si);
 	} else {
 		si_rx_endp_more(si);