MEDIUM: stream-int: make use of si_rx_chan_{rdy,blk} to control the stream-int from the channel

The channel can disable reading from the stream-interface using various
methods, such as :
  - CF_DONT_READ
  - !channel_may_recv()
  - and possibly others

Till now this was done by mangling SI_FL_RX_WAIT_EP which is not
appropriate at all since it's not the stream interface which decides
whether it wants to deliver data or not. Some places were also wrongly
relying on SI_FL_RXBLK_ROOM since it was the only other alternative,
but it's not suitable for CF_DONT_READ.

Let's use the SI_FL_RXBLK_CHAN flag for this instead. It will properly
prevent the stream interface from being woken up and reads from
subscribing to more receipt without being accidently removed. It is
automatically reset if CF_DONT_READ is not set in stream_int_notify().

The code is not trivial because it splits the logic between everything
related to buffer contents (channel_is_empty(), CF_WRITE_PARTIAL, etc)
and buffer policy (CF_DONT_READ). Also it now needs to decide timeouts
based on any blocking flag and not just SI_FL_RXBLK_ROOM anymore.

It looks like this patch has caused a minor performance degradation on
connection rate, which possibly deserves being investigated deeper as
the test conditions are uncertain (e.g. slightly more subscribe calls?).
diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h
index e7293d7..d1c022c 100644
--- a/include/proto/stream_interface.h
+++ b/include/proto/stream_interface.h
@@ -278,6 +278,18 @@
 	si->flags |=  SI_FL_RX_WAIT_EP;
 }
 
+/* Tell a stream interface the input channel is OK with it sending it some data */
+static inline void si_rx_chan_rdy(struct stream_interface *si)
+{
+	si->flags &= ~SI_FL_RXBLK_CHAN;
+}
+
+/* Tell a stream interface the input channel is not OK with it sending it some data */
+static inline void si_rx_chan_blk(struct stream_interface *si)
+{
+	si->flags |=  SI_FL_RXBLK_CHAN;
+}
+
 /* The stream interface just got the input buffer it was waiting for */
 static inline void si_rx_buff_rdy(struct stream_interface *si)
 {
diff --git a/src/stream_interface.c b/src/stream_interface.c
index 80412e7..c349aac 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -481,10 +481,13 @@
 	}
 
 	if ((sio->flags & SI_FL_RXBLK_ROOM) &&
-	    ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL ||
-	     channel_is_empty(oc))) {
+	    ((oc->flags & CF_WRITE_PARTIAL) || channel_is_empty(oc)))
 		sio->flags &= ~SI_FL_RXBLK_ROOM;
-	}
+
+	if (oc->flags & CF_DONT_READ)
+		si_rx_chan_blk(sio);
+	else
+		si_rx_chan_rdy(sio);
 
 	/* Notify the other side when we've injected data into the IC that
 	 * needs to be forwarded. We can do fast-forwarding as soon as there
@@ -519,13 +522,16 @@
 			si->flags &= ~SI_FL_RXBLK_ROOM;
 	}
 
+	if (!(ic->flags & CF_DONT_READ))
+		si_rx_chan_rdy(si);
+
 	si_chk_rcv(si);
 	si_chk_rcv(sio);
 
-	if (si->flags & SI_FL_RXBLK_ROOM) {
+	if (si_rx_blocked(si)) {
 		ic->rex = TICK_ETERNITY;
 	}
-	else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL) {
+	else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL)) == CF_READ_PARTIAL) {
 		/* we must re-enable reading if si_chk_snd() has freed some space */
 		if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex))
 			ic->rex = tick_add_ifset(now_ms, ic->rto);
@@ -725,11 +731,8 @@
 
 	if (!(si->wait_event.wait_reason & SUB_CAN_SEND) && !channel_is_empty(si_oc(si)))
 		ret = si_cs_send(cs);
-	if (!(si->wait_event.wait_reason & SUB_CAN_RECV)) {
+	if (!(si->wait_event.wait_reason & SUB_CAN_RECV))
 		ret |= si_cs_recv(cs);
-		if (!(si_ic(si)->flags & (CF_SHUTR|CF_DONT_READ)))
-			si->flags &= ~SI_FL_RX_WAIT_EP;
-	}
 	if (ret != 0)
 		si_cs_process(cs);
 
@@ -751,10 +754,14 @@
 
 	if (!(ic->flags & CF_SHUTR)) {
 		/* Read not closed, update FD status and timeout for reads */
-		if ((ic->flags & CF_DONT_READ) || !channel_is_empty(ic)) {
+		if (ic->flags & CF_DONT_READ)
+			si_rx_chan_blk(si);
+		else
+			si_rx_chan_rdy(si);
+
+		if (!channel_is_empty(ic)) {
 			/* stop reading, imposed by channel's policy or contents */
 			si_cant_put(si);
-			ic->rex = TICK_ETERNITY;
 		}
 		else {
 			/* (re)start reading and update timeout. Note: we don't recompute the timeout
@@ -763,9 +770,12 @@
 			 * have updated it if there has been a completed I/O.
 			 */
 			si->flags &= ~SI_FL_RXBLK_ROOM;
-			if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex))
-				ic->rex = tick_add_ifset(now_ms, ic->rto);
 		}
+		if (si->flags & SI_FL_RXBLK_ANY & ~SI_FL_RX_WAIT_EP)
+			ic->rex = TICK_ETERNITY;
+		else if (!(ic->flags & CF_READ_NOEXP) && !tick_isset(ic->rex))
+			ic->rex = tick_add_ifset(now_ms, ic->rto);
+
 		si_chk_rcv(si);
 	}
 	else
@@ -1254,7 +1264,7 @@
 
 		if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) {
 			/* we're stopped by the channel's policy */
-			si_cant_put(si);
+			si_rx_chan_blk(si);
 			break;
 		}
 
@@ -1269,7 +1279,7 @@
 			 */
 			if (ic->flags & CF_STREAMER) {
 				/* we're stopped by the channel's policy */
-				si_cant_put(si);
+				si_rx_chan_blk(si);
 				break;
 			}
 
@@ -1278,7 +1288,7 @@
 			 */
 			if (ret >= global.tune.recv_enough) {
 				/* we're stopped by the channel's policy */
-				si_cant_put(si);
+				si_rx_chan_blk(si);
 				break;
 			}
 		}
@@ -1286,7 +1296,7 @@
 		/* if we are waiting for more space, don't try to read more data
 		 * right now.
 		 */
-		if (si->flags & SI_FL_RXBLK_ROOM)
+		if (si_rx_blocked(si))
 			break;
 	} /* while !flags */