MEDIUM: connections: Don't directly mess with the polling from the upper layers.

Avoid using conn_xprt_want_send/recv, and totally nuke cs_want_send/recv,
from the upper layers. The polling is now directly handled by the connection
layer: it is activated on subscribe(), and deactivated once we have received
the event and woken up the related task.
diff --git a/src/stream_interface.c b/src/stream_interface.c
index a0487ef..8057d27 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -66,7 +66,6 @@
 
 /* stream-interface operations for connections */
 struct si_ops si_conn_ops = {
-	.update  = stream_int_update_conn,
 	.chk_rcv = stream_int_chk_rcv_conn,
 	.chk_snd = stream_int_chk_snd_conn,
 	.shutr   = stream_int_shutr_conn,
@@ -258,6 +257,7 @@
 	else {
 		/* (re)start reading */
 		si->flags &= ~SI_FL_WAIT_ROOM;
+		tasklet_wakeup(si->wait_event.task);
 		if (!(si->flags & SI_FL_DONT_WAKE))
 			task_wakeup(si_task(si), TASK_WOKEN_IO);
 	}
@@ -518,8 +518,10 @@
 		/* check if the consumer has freed some space either in the
 		 * buffer or in the pipe.
 		 */
-		if (channel_may_recv(ic) && new_len < last_len)
+		if (channel_may_recv(ic) && new_len < last_len) {
+			tasklet_wakeup(si->wait_event.task);
 			si->flags &= ~SI_FL_WAIT_ROOM;
+		}
 	}
 
 	if (si->flags & SI_FL_WAIT_ROOM) {
@@ -566,6 +568,7 @@
 	struct stream_interface *si = cs->data;
 	struct channel *ic = si_ic(si);
 	struct channel *oc = si_oc(si);
+	int wait_room = si->flags & SI_FL_WAIT_ROOM;
 
 	/* If we have data to send, try it now */
 	if (!channel_is_empty(oc) && objt_cs(si->end))
@@ -600,20 +603,9 @@
 	stream_int_notify(si);
 	channel_release_buffer(ic, &(si_strm(si)->buffer_wait));
 
-	/* Third step : update the connection's polling status based on what
-	 * was done above (eg: maybe some buffers got emptied).
-	 */
-	if (channel_is_empty(oc))
-		__cs_stop_send(cs);
-
-
-	if (si->flags & SI_FL_WAIT_ROOM) {
-		__cs_stop_recv(cs);
-	}
-	else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
-		 channel_may_recv(ic)) {
-		__cs_want_recv(cs);
-	}
+	/* Try to recv() again if we free'd some room in the process */
+	if (wait_room && !(si->flags & SI_FL_WAIT_ROOM))
+		si_cs_recv(cs);
 	return 0;
 }
 
@@ -720,10 +712,8 @@
 		}
 	}
 	/* We couldn't send all of our data, let the mux know we'd like to send more */
-	if (co_data(oc)) {
-		cs_want_send(cs);
+	if (co_data(oc))
 		conn->mux->subscribe(cs, SUB_CAN_SEND, &si->wait_event);
-	}
 	return did_send;
 }
 
@@ -776,6 +766,7 @@
 			 * have updated it if there has been a completed I/O.
 			 */
 			si->flags &= ~SI_FL_WAIT_ROOM;
+			tasklet_wakeup(si->wait_event.task);
 			if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex))
 				ic->rex = tick_add_ifset(now_ms, ic->rto);
 		}
@@ -814,37 +805,6 @@
 	}
 }
 
-/* Updates the polling status of a connection outside of the connection handler
- * based on the channel's flags and the stream interface's flags. It needs to be
- * called once after the channels' flags have settled down and the stream has
- * been updated. It is not designed to be called from within the connection
- * handler itself.
- */
-void stream_int_update_conn(struct stream_interface *si)
-{
-	struct channel *ic = si_ic(si);
-	struct channel *oc = si_oc(si);
-	struct conn_stream *cs = __objt_cs(si->end);
-
-	if (!(ic->flags & CF_SHUTR)) {
-		/* Read not closed */
-		if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic))
-			__cs_stop_recv(cs);
-		else
-			__cs_want_recv(cs);
-	}
-
-	if (!(oc->flags & CF_SHUTW)) {
-		/* Write not closed */
-		if (channel_is_empty(oc))
-			__cs_stop_send(cs);
-		else
-			__cs_want_send(cs);
-	}
-
-	cs_update_mux_polling(cs);
-}
-
 /*
  * This function performs a shutdown-read on a stream interface attached to
  * a connection in a connected or init state (it does nothing for other
@@ -858,7 +818,6 @@
 static void stream_int_shutr_conn(struct stream_interface *si)
 {
 	struct conn_stream *cs = __objt_cs(si->end);
-	struct connection *conn = cs->conn;
 	struct channel *ic = si_ic(si);
 
 	ic->flags &= ~CF_SHUTR_NOW;
@@ -880,10 +839,6 @@
 		/* we want to immediately forward this close to the write side */
 		return stream_int_shutw_conn(si);
 	}
-	else if (conn->ctrl) {
-		/* we want the caller to disable polling on this FD */
-		cs_stop_recv(cs);
-	}
 }
 
 /*
@@ -980,24 +935,23 @@
 static void stream_int_chk_rcv_conn(struct stream_interface *si)
 {
 	struct channel *ic = si_ic(si);
-	struct conn_stream *cs = __objt_cs(si->end);
 
 	if (unlikely(si->state > SI_ST_EST || (ic->flags & CF_SHUTR)))
 		return;
 
 	if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) {
 		/* stop reading */
-		if (!(ic->flags & CF_DONT_READ)) /* full */ {
-			si->flags |= SI_FL_WAIT_ROOM;
-		}
-		__cs_stop_recv(cs);
+		si->flags |= SI_FL_WAIT_ROOM;
 	}
 	else {
+		struct conn_stream *cs = objt_cs(si->end);
 		/* (re)start reading */
 		si->flags &= ~SI_FL_WAIT_ROOM;
-		__cs_want_recv(cs);
+		if (cs) {
+			si_cs_recv(cs);
+			tasklet_wakeup(si->wait_event.task);
 	}
-	cs_update_mux_polling(cs);
+	}
 }
 
 
@@ -1024,20 +978,10 @@
 	    !(si->flags & SI_FL_WAIT_DATA))       /* not waiting for data */
 		return;
 
-	if (cs->flags & CS_FL_DATA_WR_ENA) {
-		/* already subscribed to write notifications, will be called
-		 * anyway, so let's avoid calling it especially if the reader
-		 * is not ready.
-		 */
-		return;
-	}
-
-	__cs_want_send(cs);
-
 	si_cs_send(cs);
+	tasklet_wakeup(si->wait_event.task);
 	if (cs->flags & CS_FL_ERROR || cs->conn->flags & CO_FL_ERROR) {
 		/* Write error on the file descriptor */
-		__cs_stop_both(cs);
 		si->flags |= SI_FL_ERR;
 		goto out_wakeup;
 	}
@@ -1051,7 +995,6 @@
 		 * ->o limit was reached. Maybe we just wrote the last
 		 * chunk and need to close.
 		 */
-		__cs_stop_send(cs);
 		if (((oc->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) ==
 		     (CF_AUTO_CLOSE|CF_SHUTW_NOW)) &&
 		    (si->state == SI_ST_EST)) {
@@ -1067,7 +1010,6 @@
 		/* Otherwise there are remaining data to be sent in the buffer,
 		 * which means we have to poll before doing so.
 		 */
-		__cs_want_send(cs);
 		si->flags &= ~SI_FL_WAIT_DATA;
 		if (!tick_isset(oc->wex))
 			oc->wex = tick_add_ifset(now_ms, oc->wto);
@@ -1105,9 +1047,6 @@
 		if (!(si->flags & SI_FL_DONT_WAKE))
 			task_wakeup(si_task(si), TASK_WOKEN_IO);
 	}
-
-	/* commit possible polling changes */
-	cs_update_mux_polling(cs);
 }
 
 /*
@@ -1208,7 +1147,6 @@
 			 * could soon be full. Let's stop before needing to poll.
 			 */
 			si->flags |= SI_FL_WAIT_ROOM;
-			__cs_stop_recv(cs);
 		}
 
 		/* splice not possible (anymore), let's go on on standard copy */
@@ -1274,7 +1212,6 @@
 			 * This was changed to accomodate with the mux code,
 			 * but we may have lost a worthwhile optimization.
 			 */
-			__cs_stop_recv(cs);
 			si->flags |= SI_FL_WAIT_ROOM;
 			break;
 		}
@@ -1347,9 +1284,10 @@
 		goto out_shutdown_r;
 
 	/* Subscribe to receive events */
-	conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_event);
+	if (!(si->flags & SI_FL_WAIT_ROOM))
+		conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_event);
 
-	return cur_read != 0;
+	return (cur_read != 0 || (si->flags & SI_FL_WAIT_ROOM));
 
  out_shutdown_r:
 	/* we received a shutdown */
@@ -1392,7 +1330,6 @@
 	}
 
 	/* otherwise that's just a normal read shutdown */
-	__cs_stop_recv(cs);
 	return;
 
  do_close: