MEDIUM: stream_interface: Make recv() subscribe when more data is needed.

Refactor the code so that si_cs_recv() subscribes to receive events.
diff --git a/src/stream_interface.c b/src/stream_interface.c
index 46e57d4..e5ddee6 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -51,10 +51,10 @@
 static void stream_int_shutw_applet(struct stream_interface *si);
 static void stream_int_chk_rcv_applet(struct stream_interface *si);
 static void stream_int_chk_snd_applet(struct stream_interface *si);
-static void si_cs_recv(struct conn_stream *cs);
+static int si_cs_recv(struct conn_stream *cs);
 static int si_cs_wake_cb(struct conn_stream *cs);
 static int si_idle_conn_wake_cb(struct conn_stream *cs);
-static struct task * si_cs_send(struct conn_stream *cs);
+static int si_cs_send(struct conn_stream *cs);
 
 /* stream-interface operations for embedded tasks */
 struct si_ops si_embedded_ops = {
@@ -631,7 +631,7 @@
  * caller to commit polling changes. The caller should check conn->flags
  * for errors.
  */
-static struct task * si_cs_send(struct conn_stream *cs)
+static int si_cs_send(struct conn_stream *cs)
 {
 	struct connection *conn = cs->conn;
 	struct stream_interface *si = cs->data;
@@ -641,21 +641,21 @@
 
 	/* We're already waiting to be able to send, give up */
 	if (si->wait_list.wait_reason & SUB_CAN_SEND)
-		return NULL;
+		return 0;
 
 	if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
-		return NULL;
+		return 0;
 
 	if (conn->flags & CO_FL_HANDSHAKE) {
 		/* a handshake was requested */
 		/* Schedule ourself to be woken up once the handshake is done */
 		conn->xprt->subscribe(conn, SUB_CAN_SEND,  &si->wait_list);
-		return NULL;
+		return 0;
 	}
 
 	/* we might have been called just after an asynchronous shutw */
 	if (si_oc(si)->flags & CF_SHUTW)
-		return NULL;
+		return 0;
 
 	/* ensure it's only set if a write attempt has succeeded */
 	oc->flags &= ~CF_WRITE_PARTIAL;
@@ -673,7 +673,7 @@
 		}
 
 		if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
-			return NULL;
+			return 0;
 	}
 
 	/* At this point, the pipe is empty, but we may still have data pending
@@ -753,20 +753,24 @@
 		}
 
 	}
-	return NULL;
+	return did_send;
 }
 
 struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state)
 {
 	struct stream_interface *si = ctx;
 	struct conn_stream *cs = objt_cs(si->end);
+	int ret = 0;
 
 	if (!cs)
 		return NULL;
-	if (!(si->wait_list.wait_reason & SUB_CAN_SEND)) {
-		si_cs_send(cs);
+	if (!(si->wait_list.wait_reason & SUB_CAN_SEND))
+		ret = si_cs_send(cs);
+	if (!(si->wait_list.wait_reason & SUB_CAN_RECV))
+		ret |= si_cs_recv(cs);
+	if (ret != 0)
 		si_cs_wake_cb(cs);
-	}
+
 	return (NULL);
 }
 
@@ -1138,12 +1142,12 @@
  * into the buffer from the connection. It iterates over the mux layer's
  * rcv_buf function.
  */
-static void si_cs_recv(struct conn_stream *cs)
+static int si_cs_recv(struct conn_stream *cs)
 {
 	struct connection *conn = cs->conn;
 	struct stream_interface *si = cs->data;
 	struct channel *ic = si_ic(si);
-	int ret, max, cur_read;
+	int ret, max, cur_read = 0;
 	int read_poll = MAX_READ_POLL_LOOPS;
 
 	/* stop immediately on errors. Note that we DON'T want to stop on
@@ -1153,18 +1157,22 @@
 	 * which rejects it before reading it all.
 	 */
 	if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
-		return;
+		return 0;
+
+	/* If another call to si_cs_recv() failed, and we subscribed to
+	 * recv events already, give up now.
+	 */
+	if (si->wait_list.wait_reason & SUB_CAN_RECV)
+		return 0;
 
 	/* maybe we were called immediately after an asynchronous shutr */
 	if (ic->flags & CF_SHUTR)
-		return;
+		return 0;
 
 	/* stop here if we reached the end of data */
 	if (cs->flags & CS_FL_EOS)
 		goto out_shutdown_r;
 
-	cur_read = 0;
-
 	if ((ic->flags & (CF_STREAMER | CF_STREAMER_FAST)) && !co_data(ic) &&
 	    global.tune.idle_timer &&
 	    (unsigned short)(now_ms - ic->last_read) >= global.tune.idle_timer) {
@@ -1218,7 +1226,7 @@
 			goto out_shutdown_r;
 
 		if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
-			return;
+			return cur_read != 0;
 
 		if (conn->flags & CO_FL_WAIT_ROOM) {
 			/* the pipe is full or we have read enough data that it
@@ -1377,13 +1385,16 @@
 
  end_recv:
 	if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
-		return;
+		return cur_read != 0;
 
 	if (cs->flags & CS_FL_EOS)
 		/* connection closed */
 		goto out_shutdown_r;
 
-	return;
+	/* Subscribe to receive events */
+	conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_list);
+
+	return cur_read != 0;
 
  out_shutdown_r:
 	/* we received a shutdown */
@@ -1391,7 +1402,7 @@
 	if (ic->flags & CF_AUTO_CLOSE)
 		channel_shutw_now(ic);
 	stream_sock_read0(si);
-	return;
+	return cur_read != 0;
 }
 
 /*