MEDIUM: connections: Don't directly mess with the polling from the upper layers.

Avoid using conn_xprt_want_send/recv, and totally nuke cs_want_send/recv,
from the upper layers. The polling is now directly handled by the connection
layer, it is activated on subscribe(), and unactivated once we got the event
and we woke the related task.
diff --git a/include/proto/connection.h b/include/proto/connection.h
index 90c4fdd..47825a0 100644
--- a/include/proto/connection.h
+++ b/include/proto/connection.h
@@ -291,17 +291,6 @@
 	}
 }
 
-/* recompute the mux polling flags after updating the current conn_stream and
- * propagate the result down the transport layer.
- */
-static inline void cs_update_mux_polling(struct conn_stream *cs)
-{
-	struct connection *conn = cs->conn;
-
-	if (conn->mux && conn->mux->update_poll)
-		conn->mux->update_poll(cs);
-}
-
 /***** Event manipulation primitives for use by DATA I/O callbacks *****/
 /* The __conn_* versions do not propagate to lower layers and are only meant
  * to be used by handlers called by the connection handler. The other ones
@@ -317,28 +306,6 @@
 	c->flags &= ~CO_FL_XPRT_RD_ENA;
 }
 
-static inline void __cs_want_recv(struct conn_stream *cs)
-{
-	cs->flags |= CS_FL_DATA_RD_ENA;
-}
-
-static inline void __cs_stop_recv(struct conn_stream *cs)
-{
-	cs->flags &= ~CS_FL_DATA_RD_ENA;
-}
-
-static inline void cs_want_recv(struct conn_stream *cs)
-{
-	__cs_want_recv(cs);
-	cs_update_mux_polling(cs);
-}
-
-static inline void cs_stop_recv(struct conn_stream *cs)
-{
-	__cs_stop_recv(cs);
-	cs_update_mux_polling(cs);
-}
-
 /* this one is used only to stop speculative recv(). It doesn't stop it if the
  * fd is already polled in order to avoid expensive polling status changes.
  * Since it might require the upper layer to re-enable reading, we'll return 1
@@ -368,40 +335,6 @@
 	c->flags &= ~(CO_FL_XPRT_WR_ENA | CO_FL_XPRT_RD_ENA);
 }
 
-static inline void __cs_want_send(struct conn_stream *cs)
-{
-	cs->flags |= CS_FL_DATA_WR_ENA;
-}
-
-static inline void __cs_stop_send(struct conn_stream *cs)
-{
-	cs->flags &= ~CS_FL_DATA_WR_ENA;
-}
-
-static inline void cs_stop_send(struct conn_stream *cs)
-{
-	__cs_stop_send(cs);
-	cs_update_mux_polling(cs);
-}
-
-static inline void cs_want_send(struct conn_stream *cs)
-{
-	__cs_want_send(cs);
-	cs_update_mux_polling(cs);
-}
-
-static inline void __cs_stop_both(struct conn_stream *cs)
-{
-	cs->flags &= ~(CS_FL_DATA_WR_ENA | CS_FL_DATA_RD_ENA);
-}
-
-static inline void cs_stop_both(struct conn_stream *cs)
-{
-	__cs_stop_both(cs);
-	cs_update_mux_polling(cs);
-}
-
-
 static inline void conn_xprt_want_recv(struct connection *c)
 {
 	__conn_xprt_want_recv(c);
@@ -546,7 +479,6 @@
 /* shut read */
 static inline void cs_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
 {
-	__cs_stop_recv(cs);
 
 	/* clean data-layer shutdown */
 	if (cs->conn->mux && cs->conn->mux->shutr)
@@ -557,7 +489,6 @@
 /* shut write */
 static inline void cs_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
 {
-	__cs_stop_send(cs);
 
 	/* clean data-layer shutdown */
 	if (cs->conn->mux && cs->conn->mux->shutw)
diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h
index 7860c79..2b50c06 100644
--- a/include/proto/stream_interface.h
+++ b/include/proto/stream_interface.h
@@ -199,7 +199,6 @@
 		LIST_ADD(pool, &conn->list);
 
 	cs_attach(cs, si, &si_idle_conn_cb);
-	cs_want_recv(cs);
 }
 
 /* Attach conn_stream <cs> to the stream interface <si>. The stream interface
@@ -349,13 +348,15 @@
 /* Calls chk_rcv on the connection using the data layer */
 static inline void si_chk_rcv(struct stream_interface *si)
 {
-	si->ops->chk_rcv(si);
+	if (si->ops->chk_rcv)
+		si->ops->chk_rcv(si);
 }
 
 /* Calls chk_snd on the connection using the data layer */
 static inline void si_chk_snd(struct stream_interface *si)
 {
-	si->ops->chk_snd(si);
+	if (si->ops->chk_snd)
+		si->ops->chk_snd(si);
 }
 
 /* Calls chk_snd on the connection using the ctrl layer */
@@ -378,10 +379,6 @@
 	}
 	else {
 		/* reuse the existing connection */
-		if (!channel_is_empty(si_oc(si))) {
-			/* we'll have to send a request there. */
-			cs_want_send(cs);
-		}
 
 		/* the connection is established */
 		si->state = SI_ST_EST;
diff --git a/include/types/connection.h b/include/types/connection.h
index 3a7de64..da53059 100644
--- a/include/types/connection.h
+++ b/include/types/connection.h
@@ -66,9 +66,6 @@
 /* conn_stream flags */
 enum {
 	CS_FL_NONE          = 0x00000000,  /* Just for initialization purposes */
-	CS_FL_DATA_RD_ENA   = 0x00000001,  /* receiving data is allowed */
-	CS_FL_DATA_WR_ENA   = 0x00000002,  /* sending data is desired */
-
 	CS_FL_SHRD          = 0x00000010,  /* read shut, draining extra data */
 	CS_FL_SHRR          = 0x00000020,  /* read shut, resetting extra data */
 	CS_FL_SHR           = CS_FL_SHRD | CS_FL_SHRR, /* read shut status */
@@ -315,7 +312,6 @@
 struct mux_ops {
 	int  (*init)(struct connection *conn, struct proxy *prx);  /* early initialization */
 	int  (*wake)(struct connection *conn);        /* mux-layer callback to report activity, mandatory */
-	void (*update_poll)(struct conn_stream *cs);  /* commit cs flags to mux/conn */
 	size_t (*rcv_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to get data */
 	size_t (*snd_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to send data */
 	int  (*rcv_pipe)(struct conn_stream *cs, struct pipe *pipe, unsigned int count); /* recv-to-pipe callback */
diff --git a/src/checks.c b/src/checks.c
index a3110e7..4829a54 100644
--- a/src/checks.c
+++ b/src/checks.c
@@ -745,7 +745,6 @@
 
 	if (retrieve_errno_from_socket(conn)) {
 		chk_report_conn_err(check, errno, 0);
-		__cs_stop_both(cs);
 		goto out_wakeup;
 	}
 
@@ -771,7 +770,6 @@
 		b_realign_if_empty(&check->bo);
 		if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) {
 			chk_report_conn_err(check, errno, 0);
-			__cs_stop_both(cs);
 			goto out_wakeup;
 		}
 		if (b_data(&check->bo)) {
@@ -785,12 +783,10 @@
 		t->expire = tick_add_ifset(now_ms, s->proxy->timeout.check);
 		task_queue(t);
 	}
-	goto out_nowake;
+	goto out;
 
  out_wakeup:
 	task_wakeup(t, TASK_WOKEN_IO);
- out_nowake:
-	__cs_stop_send(cs);   /* nothing more to write */
  out:
 	return;
 }
@@ -1373,7 +1369,6 @@
 	 * range quickly.  To avoid sending RSTs all the time, we first try to
 	 * drain pending data.
 	 */
-	__cs_stop_both(cs);
 	cs_shutw(cs, CS_SHW_NORMAL);
 
 	/* OK, let's not stay here forever */
@@ -1385,7 +1380,6 @@
 	return;
 
  wait_more_data:
-	__cs_want_recv(cs);
 	cs->conn->mux->subscribe(cs, SUB_CAN_RECV, &check->wait_list);
         goto out;
 }
@@ -1420,10 +1414,9 @@
 		 * we expect errno to still be valid.
 		 */
 		chk_report_conn_err(check, errno, 0);
-		__cs_stop_both(cs);
 		task_wakeup(check->task, TASK_WOKEN_IO);
 	}
-	else if (!(conn->flags & CO_FL_HANDSHAKE) && !(cs->flags & (CS_FL_DATA_RD_ENA|CS_FL_DATA_WR_ENA))) {
+	else if (!(conn->flags & CO_FL_HANDSHAKE) && !check->type) {
 		/* we may get here if only a connection probe was required : we
 		 * don't have any data to send nor anything expected in response,
 		 * so the completion of the connection establishment is enough.
@@ -1624,8 +1617,6 @@
 	if (proto && proto->connect)
 		ret = proto->connect(conn, check->type, quickack ? 2 : 0);
 
-	if (check->type)
-		cs_want_send(cs);
 
 #ifdef USE_OPENSSL
 	if (s->check.sni)
@@ -2180,10 +2171,8 @@
 				t->expire = tick_first(t->expire, t_con);
 			}
 
-			if (check->type) {
-				cs_want_recv(cs);   /* prepare for reading a possible reply */
+			if (check->type)
 				__event_srv_chk_r(cs);
-			}
 
 			task_set_affinity(t, tid_bit);
 			goto reschedule;
@@ -2683,10 +2672,6 @@
 			t->expire = tick_add_ifset(now_ms, s->proxy->timeout.check);
 	}
 
-	/* It's only the rules which will enable send/recv */
-	if (cs)
-		cs_stop_both(cs);
-
 	while (1) {
 		/* We have to try to flush the output buffer before reading, at
 		 * the end, or if we're about to send a string that does not fit
@@ -2699,14 +2684,12 @@
 		     check->current_step->string_len >= b_room(&check->bo))) {
 			int ret;
 
-			__cs_want_send(cs);
 			ret = cs->conn->mux->snd_buf(cs, &check->bo, b_data(&check->bo), 0);
 			b_realign_if_empty(&check->bo);
 
 			if (ret <= 0) {
 				if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) {
 					chk_report_conn_err(check, errno, 0);
-					__cs_stop_both(cs);
 					goto out_end_tcpcheck;
 				}
 				break;
@@ -2924,7 +2907,6 @@
 			if (unlikely(check->result == CHK_RES_FAILED))
 				goto out_end_tcpcheck;
 
-			__cs_want_recv(cs);
 			if (cs->conn->mux->rcv_buf(cs, &check->bi, b_size(&check->bi), 0) <= 0) {
 				if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH) || cs->flags & CS_FL_ERROR) {
 					done = 1;
@@ -3026,7 +3008,6 @@
 
 					if (check->current_step->action == TCPCHK_ACT_EXPECT)
 						goto tcpcheck_expect;
-					__cs_stop_recv(cs);
 				}
 			}
 			else {
@@ -3046,7 +3027,6 @@
 
 					if (check->current_step->action == TCPCHK_ACT_EXPECT)
 						goto tcpcheck_expect;
-					__cs_stop_recv(cs);
 				}
 				/* not matched but was supposed to => ERROR */
 				else {
@@ -3098,15 +3078,9 @@
 		goto out_end_tcpcheck;
 	}
 
-	/* warning, current_step may now point to the head */
-	if (b_data(&check->bo))
-		__cs_want_send(cs);
-
 	if (&check->current_step->list != head &&
-	    check->current_step->action == TCPCHK_ACT_EXPECT) {
-		__cs_want_recv(cs);
+	    check->current_step->action == TCPCHK_ACT_EXPECT)
 		__event_srv_chk_r(cs);
-	}
 	goto out;
 
  out_end_tcpcheck:
@@ -3120,8 +3094,6 @@
 	if (check->result == CHK_RES_FAILED)
 		conn->flags |= CO_FL_ERROR;
 
-	__cs_stop_both(cs);
-
  out:
 	return retcode;
 }
diff --git a/src/connection.c b/src/connection.c
index b62ccce..5022772 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -134,6 +134,7 @@
 			conn->send_wait = NULL;
 		} else
 			io_available = 1;
+		__conn_xprt_stop_send(conn);
 	}
 
 	/* The data transfer starts here and stops on error and handshakes. Note
@@ -153,6 +154,7 @@
 			conn->recv_wait = NULL;
 		} else
 			io_available = 1;
+		__conn_xprt_stop_recv(conn);
 	}
 
 	/* It may happen during the data phase that a handshake is
@@ -341,6 +343,7 @@
 			conn->recv_wait = NULL;
 			sw->wait_reason &= ~SUB_CAN_RECV;
 		}
+		__conn_xprt_stop_recv(conn);
 	}
 	if (event_type & SUB_CAN_SEND) {
 		sw = param;
@@ -348,7 +351,9 @@
 			conn->send_wait = NULL;
 			sw->wait_reason &= ~SUB_CAN_SEND;
 		}
+		__conn_xprt_stop_send(conn);
 	}
+	conn_update_xprt_polling(conn);
 	return 0;
 }
 
@@ -363,6 +368,7 @@
 			conn->recv_wait = sw;
 		}
 		event_type &= ~SUB_CAN_RECV;
+		__conn_xprt_want_recv(conn);
 	}
 	if (event_type & SUB_CAN_SEND) {
 		sw = param;
@@ -371,9 +377,11 @@
 			conn->send_wait = sw;
 		}
 		event_type &= ~SUB_CAN_SEND;
+		__conn_xprt_want_send(conn);
 	}
 	if (event_type != 0)
 		return (-1);
+	conn_update_xprt_polling(conn);
 	return 0;
 }
 
@@ -603,6 +611,7 @@
 			}
 			line++;
 		}
+		__conn_xprt_stop_recv(conn);
 
 		if (!dst_s || !sport_s || !dport_s)
 			goto bad_header;
diff --git a/src/mux_h2.c b/src/mux_h2.c
index f7999d2..5255ca0 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -286,24 +286,18 @@
 
 	if ((h2c->flags & H2_CF_DEM_DALLOC) && b_alloc_margin(&h2c->dbuf, 0)) {
 		h2c->flags &= ~H2_CF_DEM_DALLOC;
-		if (h2_recv_allowed(h2c)) {
-			conn_xprt_want_recv(h2c->conn);
+		if (h2_recv_allowed(h2c))
 			tasklet_wakeup(h2c->wait_event.task);
-		}
 		return 1;
 	}
 
 	if ((h2c->flags & H2_CF_MUX_MALLOC) && b_alloc_margin(&h2c->mbuf, 0)) {
 		h2c->flags &= ~H2_CF_MUX_MALLOC;
-		if (!(h2c->flags & H2_CF_MUX_BLOCK_ANY))
-			conn_xprt_want_send(h2c->conn);
 
 		if (h2c->flags & H2_CF_DEM_MROOM) {
 			h2c->flags &= ~H2_CF_DEM_MROOM;
-			if (h2_recv_allowed(h2c)) {
-				conn_xprt_want_recv(h2c->conn);
+			if (h2_recv_allowed(h2c))
 				tasklet_wakeup(h2c->wait_event.task);
-			}
 		}
 		return 1;
 	}
@@ -312,10 +306,8 @@
 	    (h2s = h2c_st_by_id(h2c, h2c->dsi)) && h2s->cs &&
 	    b_alloc_margin(&h2s->rxbuf, 0)) {
 		h2c->flags &= ~H2_CF_DEM_SALLOC;
-		if (h2_recv_allowed(h2c)) {
-			conn_xprt_want_recv(h2c->conn);
+		if (h2_recv_allowed(h2c))
 			tasklet_wakeup(h2c->wait_event.task);
-		}
 		return 1;
 	}
 
@@ -427,7 +419,6 @@
 		task_queue(t);
 
 	/* prepare to read something */
-	conn_xprt_want_recv(conn);
 	tasklet_wakeup(h2c->wait_event.task);
 	return 0;
   fail:
@@ -2255,10 +2246,8 @@
 			ret = 0;
 	} while (ret > 0);
 
-	if (h2_recv_allowed(h2c) && (b_data(buf) < buf->size)) {
-		conn_xprt_want_recv(conn);
+	if (h2_recv_allowed(h2c) && (b_data(buf) < buf->size))
 		conn->xprt->subscribe(conn, SUB_CAN_RECV, &h2c->wait_event);
-	}
 
 	if (!b_data(buf)) {
 		h2_release_buf(h2c, &h2c->dbuf);
@@ -2445,25 +2434,13 @@
 	if (!b_data(&h2c->dbuf))
 		h2_release_buf(h2c, &h2c->dbuf);
 
-	/* stop being notified of incoming data if we can't process them */
-	if (!h2_recv_allowed(h2c))
-		__conn_xprt_stop_recv(conn);
-	else
-		__conn_xprt_want_recv(conn);
-
-	/* adjust output polling */
-	if (!(conn->flags & CO_FL_SOCK_WR_SH) &&
-	    h2c->st0 != H2_CS_ERROR2 && !(h2c->flags & H2_CF_GOAWAY_FAILED) &&
-	    (h2c->st0 == H2_CS_ERROR ||
-	     b_data(&h2c->mbuf) ||
-	     (h2c->mws > 0 && !LIST_ISEMPTY(&h2c->fctl_list)) ||
-	     (!(h2c->flags & H2_CF_MUX_BLOCK_ANY) && !LIST_ISEMPTY(&h2c->send_list)))) {
-		__conn_xprt_want_send(conn);
-	}
-	else {
+	if ((conn->flags & CO_FL_SOCK_WR_SH) ||
+	    h2c->st0 == H2_CS_ERROR2 || (h2c->flags & H2_CF_GOAWAY_FAILED) ||
+	    (h2c->st0 != H2_CS_ERROR &&
+	     !b_data(&h2c->mbuf) &&
+	     (h2c->mws <= 0 || LIST_ISEMPTY(&h2c->fctl_list)) &&
+	     ((h2c->flags & H2_CF_MUX_BLOCK_ANY) || LIST_ISEMPTY(&h2c->send_list))))
 		h2_release_buf(h2c, &h2c->mbuf);
-		__conn_xprt_stop_send(conn);
-	}
 
 	if (h2c->task) {
 		if (eb_is_empty(&h2c->streams_by_id) || b_data(&h2c->mbuf)) {
@@ -2553,48 +2530,6 @@
 	return NULL;
 }
 
-/* callback used to update the mux's polling flags after changing a cs' status.
- * The caller (cs_update_mux_polling) will take care of propagating any changes
- * to the transport layer.
- */
-static void h2_update_poll(struct conn_stream *cs)
-{
-	struct h2s *h2s = cs->ctx;
-
-	if (!h2s)
-		return;
-
-	/* we may unblock a blocked read */
-
-	if (cs->flags & CS_FL_DATA_RD_ENA) {
-		/* the stream indicates it's willing to read */
-		h2s->h2c->flags &= ~H2_CF_DEM_SFULL;
-		if (h2s->h2c->dsi == h2s->id) {
-			conn_xprt_want_recv(cs->conn);
-			tasklet_wakeup(h2s->h2c->wait_event.task);
-			conn_xprt_want_send(cs->conn);
-		}
-	}
-
-	/* Note: the stream and stream-int code doesn't allow us to perform a
-	 * synchronous send() here unfortunately, because this code is called
-	 * as si_update() from the process_stream() context. This means that
-	 * we have to queue the current cs and defer its processing after the
-	 * connection's cs list is processed anyway.
-	 */
-
-	if (cs->flags & CS_FL_DATA_WR_ENA) {
-		if (!b_data(&h2s->h2c->mbuf) && !(cs->conn->flags & CO_FL_SOCK_WR_SH))
-			conn_xprt_want_send(cs->conn);
-		tasklet_wakeup(h2s->h2c->wait_event.task);
-	}
-	/* We don't support unsubscribing from here, it shouldn't be a problem */
-
-	/* this can happen from within si_chk_snd() */
-	if (b_data(&h2s->h2c->mbuf) && !(cs->conn->flags & CO_FL_XPRT_WR_ENA))
-		conn_xprt_want_send(cs->conn);
-}
-
 /*
  * Detach the stream from the connection and possibly release the connection.
  */
@@ -2613,11 +2548,8 @@
 	if (h2c->flags & H2_CF_DEM_TOOMANY &&
 	    !h2_has_too_many_cs(h2c)) {
 		h2c->flags &= ~H2_CF_DEM_TOOMANY;
-		if (h2_recv_allowed(h2c)) {
-			__conn_xprt_want_recv(h2c->conn);
+		if (h2_recv_allowed(h2c))
 			tasklet_wakeup(h2c->wait_event.task);
-			conn_xprt_want_send(h2c->conn);
-		}
 	}
 
 	/* this stream may be blocked waiting for some data to leave (possibly
@@ -2635,9 +2567,7 @@
 		 */
 		h2c->flags &= ~H2_CF_DEM_BLOCK_ANY;
 		h2c->flags &= ~H2_CF_MUX_BLOCK_ANY;
-		conn_xprt_want_recv(cs->conn);
 		tasklet_wakeup(h2c->wait_event.task);
-		conn_xprt_want_send(cs->conn);
 	}
 
 	h2s_destroy(h2s);
@@ -2688,8 +2618,6 @@
 	    !(h2s->h2c->flags & (H2_CF_GOAWAY_SENT|H2_CF_GOAWAY_FAILED)) &&
 	    h2c_send_goaway_error(h2c, h2s) <= 0)
 		return;
-	if (b_data(&h2c->mbuf) && !(h2c->conn->flags & CO_FL_XPRT_WR_ENA))
-		conn_xprt_want_send(h2c->conn);
 
 	h2s_close(h2s);
 
@@ -2747,8 +2675,6 @@
 		h2s_close(h2s);
 	}
 
-	if (b_data(&h2s->h2c->mbuf) && !(h2c->conn->flags & CO_FL_XPRT_WR_ENA))
-		conn_xprt_want_send(h2c->conn);
 
  add_to_list:
 	if (LIST_ISEMPTY(&h2s->list)) {
@@ -3690,7 +3616,6 @@
 
 	b_del(buf, total);
 	if (total > 0) {
-		conn_xprt_want_send(h2s->h2c->conn);
 		if (!(h2s->h2c->wait_event.wait_reason & SUB_CAN_SEND))
 			tasklet_wakeup(h2s->h2c->wait_event.task);
 	}
@@ -3795,7 +3720,6 @@
 const struct mux_ops h2_ops = {
 	.init = h2_init,
 	.wake = h2_wake,
-	.update_poll = h2_update_poll,
 	.snd_buf = h2_snd_buf,
 	.rcv_buf = h2_rcv_buf,
 	.subscribe = h2_subscribe,
diff --git a/src/mux_pt.c b/src/mux_pt.c
index 3a573b5..beca8c2 100644
--- a/src/mux_pt.c
+++ b/src/mux_pt.c
@@ -60,31 +60,9 @@
 	if ((conn->flags & (CO_FL_EARLY_DATA | CO_FL_EARLY_SSL_HS | CO_FL_HANDSHAKE)) ==
 	    CO_FL_EARLY_DATA)
 		conn->flags &= ~CO_FL_EARLY_DATA;
-	if (ret >= 0)
-		cs_update_mux_polling(cs);
 	return ret;
 }
 
-/* callback used to update the mux's polling flags after changing a cs' status.
- * The caller (cs_mux_update_poll) will take care of propagating any changes to
- * the transport layer.
- */
-static void mux_pt_update_poll(struct conn_stream *cs)
-{
-	struct connection *conn = cs->conn;
-	int flags = 0;
-
-	conn_refresh_polling_flags(conn);
-
-	if (cs->flags & CS_FL_DATA_RD_ENA)
-		flags |= CO_FL_XPRT_RD_ENA;
-	if (cs->flags & CS_FL_DATA_WR_ENA)
-		flags |= CO_FL_XPRT_WR_ENA;
-
-	conn->flags = (conn->flags & ~(CO_FL_XPRT_RD_ENA | CO_FL_XPRT_WR_ENA)) | flags;
-	conn_cond_update_xprt_polling(conn);
-}
-
 /*
  * Attach a new stream to a connection
  * (Used for outgoing connections)
@@ -191,7 +169,6 @@
 const struct mux_ops mux_pt_ops = {
 	.init = mux_pt_init,
 	.wake = mux_pt_wake,
-	.update_poll = mux_pt_update_poll,
 	.rcv_buf = mux_pt_rcv_buf,
 	.snd_buf = mux_pt_snd_buf,
 	.subscribe = mux_pt_subscribe,
diff --git a/src/stream.c b/src/stream.c
index 7444115..e202b87 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -268,9 +268,7 @@
 		goto out_fail_accept;
 
 	/* finish initialization of the accepted file descriptor */
-	if (cs)
-		cs_want_recv(cs);
-	else if (appctx)
+	if (appctx)
 		si_applet_want_get(&s->si[0]);
 
 	if (sess->fe->accept && sess->fe->accept(s) < 0)
@@ -1679,6 +1677,7 @@
 		si_cs_send(cs);
 		si_cs_recv(cs);
 	}
+redo:
 
 	//DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__,
 	//        si_f->state, si_b->state, si_b->err_type, req->flags, res->flags);
@@ -2444,6 +2443,19 @@
 		if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED))
 			stream_process_counters(s);
 
+		cs = objt_cs(si_f->end);
+		ret = 0;
+		if (cs && !(cs->conn->flags & CO_FL_ERROR) &&
+		    !(cs->flags & CS_FL_ERROR) && !(si_oc(si_f)->flags & CF_SHUTW))
+			ret = si_cs_send(cs);
+		cs = objt_cs(si_b->end);
+		if (cs && !(cs->conn->flags & CO_FL_ERROR) &&
+		    !(cs->flags & CS_FL_ERROR) && !(si_oc(si_b)->flags & CF_SHUTW))
+			ret |= si_cs_send(cs);
+
+		if (ret)
+			goto redo;
+
 		if (si_f->state == SI_ST_EST)
 			si_update(si_f);
 
@@ -2510,22 +2522,6 @@
 		s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
 		stream_release_buffers(s);
 		/* We may have free'd some space in buffers, or have more to send/recv, try again */
-		cs = objt_cs(si_f->end);
-		ret = 0;
-		if (cs && !(cs->conn->flags & CO_FL_ERROR)) {
-			ret |= si_cs_send(cs);
-			si_cs_recv(cs);
-			ret |= (si_ic(si_f)->flags & CF_READ_PARTIAL) | (cs->conn->flags & CO_FL_ERROR);
-		}
-		cs = objt_cs(si_b->end);
-		if (cs && !(cs->conn->flags & CO_FL_ERROR)) {
-			ret |= si_cs_send(cs);
-			si_cs_recv(cs);
-			ret |= (si_ic(si_b)->flags & CF_READ_PARTIAL) | (cs->conn->flags & CO_FL_ERROR);
-
-		}
-		if (ret)
-			task_wakeup(t, TASK_WOKEN_IO);
 		return t; /* nothing more to do */
 	}
 
diff --git a/src/stream_interface.c b/src/stream_interface.c
index a0487ef..8057d27 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -66,7 +66,6 @@
 
 /* stream-interface operations for connections */
 struct si_ops si_conn_ops = {
-	.update  = stream_int_update_conn,
 	.chk_rcv = stream_int_chk_rcv_conn,
 	.chk_snd = stream_int_chk_snd_conn,
 	.shutr   = stream_int_shutr_conn,
@@ -258,6 +257,7 @@
 	else {
 		/* (re)start reading */
 		si->flags &= ~SI_FL_WAIT_ROOM;
+		tasklet_wakeup(si->wait_event.task);
 		if (!(si->flags & SI_FL_DONT_WAKE))
 			task_wakeup(si_task(si), TASK_WOKEN_IO);
 	}
@@ -518,8 +518,10 @@
 		/* check if the consumer has freed some space either in the
 		 * buffer or in the pipe.
 		 */
-		if (channel_may_recv(ic) && new_len < last_len)
+		if (channel_may_recv(ic) && new_len < last_len) {
+			tasklet_wakeup(si->wait_event.task);
 			si->flags &= ~SI_FL_WAIT_ROOM;
+		}
 	}
 
 	if (si->flags & SI_FL_WAIT_ROOM) {
@@ -566,6 +568,7 @@
 	struct stream_interface *si = cs->data;
 	struct channel *ic = si_ic(si);
 	struct channel *oc = si_oc(si);
+	int wait_room = si->flags & SI_FL_WAIT_ROOM;
 
 	/* If we have data to send, try it now */
 	if (!channel_is_empty(oc) && objt_cs(si->end))
@@ -600,20 +603,9 @@
 	stream_int_notify(si);
 	channel_release_buffer(ic, &(si_strm(si)->buffer_wait));
 
-	/* Third step : update the connection's polling status based on what
-	 * was done above (eg: maybe some buffers got emptied).
-	 */
-	if (channel_is_empty(oc))
-		__cs_stop_send(cs);
-
-
-	if (si->flags & SI_FL_WAIT_ROOM) {
-		__cs_stop_recv(cs);
-	}
-	else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
-		 channel_may_recv(ic)) {
-		__cs_want_recv(cs);
-	}
+	/* Try to recv() again if we free'd some room in the process */
+	if (wait_room && !(si->flags & SI_FL_WAIT_ROOM))
+		si_cs_recv(cs);
 	return 0;
 }
 
@@ -720,10 +712,8 @@
 		}
 	}
 	/* We couldn't send all of our data, let the mux know we'd like to send more */
-	if (co_data(oc)) {
-		cs_want_send(cs);
+	if (co_data(oc))
 		conn->mux->subscribe(cs, SUB_CAN_SEND, &si->wait_event);
-	}
 	return did_send;
 }
 
@@ -776,6 +766,7 @@
 			 * have updated it if there has been a completed I/O.
 			 */
 			si->flags &= ~SI_FL_WAIT_ROOM;
+			tasklet_wakeup(si->wait_event.task);
 			if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex))
 				ic->rex = tick_add_ifset(now_ms, ic->rto);
 		}
@@ -814,37 +805,6 @@
 	}
 }
 
-/* Updates the polling status of a connection outside of the connection handler
- * based on the channel's flags and the stream interface's flags. It needs to be
- * called once after the channels' flags have settled down and the stream has
- * been updated. It is not designed to be called from within the connection
- * handler itself.
- */
-void stream_int_update_conn(struct stream_interface *si)
-{
-	struct channel *ic = si_ic(si);
-	struct channel *oc = si_oc(si);
-	struct conn_stream *cs = __objt_cs(si->end);
-
-	if (!(ic->flags & CF_SHUTR)) {
-		/* Read not closed */
-		if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic))
-			__cs_stop_recv(cs);
-		else
-			__cs_want_recv(cs);
-	}
-
-	if (!(oc->flags & CF_SHUTW)) {
-		/* Write not closed */
-		if (channel_is_empty(oc))
-			__cs_stop_send(cs);
-		else
-			__cs_want_send(cs);
-	}
-
-	cs_update_mux_polling(cs);
-}
-
 /*
  * This function performs a shutdown-read on a stream interface attached to
  * a connection in a connected or init state (it does nothing for other
@@ -858,7 +818,6 @@
 static void stream_int_shutr_conn(struct stream_interface *si)
 {
 	struct conn_stream *cs = __objt_cs(si->end);
-	struct connection *conn = cs->conn;
 	struct channel *ic = si_ic(si);
 
 	ic->flags &= ~CF_SHUTR_NOW;
@@ -880,10 +839,6 @@
 		/* we want to immediately forward this close to the write side */
 		return stream_int_shutw_conn(si);
 	}
-	else if (conn->ctrl) {
-		/* we want the caller to disable polling on this FD */
-		cs_stop_recv(cs);
-	}
 }
 
 /*
@@ -980,24 +935,23 @@
 static void stream_int_chk_rcv_conn(struct stream_interface *si)
 {
 	struct channel *ic = si_ic(si);
-	struct conn_stream *cs = __objt_cs(si->end);
 
 	if (unlikely(si->state > SI_ST_EST || (ic->flags & CF_SHUTR)))
 		return;
 
 	if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) {
 		/* stop reading */
-		if (!(ic->flags & CF_DONT_READ)) /* full */ {
-			si->flags |= SI_FL_WAIT_ROOM;
-		}
-		__cs_stop_recv(cs);
+		si->flags |= SI_FL_WAIT_ROOM;
 	}
 	else {
+		struct conn_stream *cs = objt_cs(si->end);
 		/* (re)start reading */
 		si->flags &= ~SI_FL_WAIT_ROOM;
-		__cs_want_recv(cs);
+		if (cs) {
+			si_cs_recv(cs);
+			tasklet_wakeup(si->wait_event.task);
 	}
-	cs_update_mux_polling(cs);
+	}
 }
 
 
@@ -1024,20 +978,10 @@
 	    !(si->flags & SI_FL_WAIT_DATA))       /* not waiting for data */
 		return;
 
-	if (cs->flags & CS_FL_DATA_WR_ENA) {
-		/* already subscribed to write notifications, will be called
-		 * anyway, so let's avoid calling it especially if the reader
-		 * is not ready.
-		 */
-		return;
-	}
-
-	__cs_want_send(cs);
-
 	si_cs_send(cs);
+	tasklet_wakeup(si->wait_event.task);
 	if (cs->flags & CS_FL_ERROR || cs->conn->flags & CO_FL_ERROR) {
 		/* Write error on the file descriptor */
-		__cs_stop_both(cs);
 		si->flags |= SI_FL_ERR;
 		goto out_wakeup;
 	}
@@ -1051,7 +995,6 @@
 		 * ->o limit was reached. Maybe we just wrote the last
 		 * chunk and need to close.
 		 */
-		__cs_stop_send(cs);
 		if (((oc->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) ==
 		     (CF_AUTO_CLOSE|CF_SHUTW_NOW)) &&
 		    (si->state == SI_ST_EST)) {
@@ -1067,7 +1010,6 @@
 		/* Otherwise there are remaining data to be sent in the buffer,
 		 * which means we have to poll before doing so.
 		 */
-		__cs_want_send(cs);
 		si->flags &= ~SI_FL_WAIT_DATA;
 		if (!tick_isset(oc->wex))
 			oc->wex = tick_add_ifset(now_ms, oc->wto);
@@ -1105,9 +1047,6 @@
 		if (!(si->flags & SI_FL_DONT_WAKE))
 			task_wakeup(si_task(si), TASK_WOKEN_IO);
 	}
-
-	/* commit possible polling changes */
-	cs_update_mux_polling(cs);
 }
 
 /*
@@ -1208,7 +1147,6 @@
 			 * could soon be full. Let's stop before needing to poll.
 			 */
 			si->flags |= SI_FL_WAIT_ROOM;
-			__cs_stop_recv(cs);
 		}
 
 		/* splice not possible (anymore), let's go on on standard copy */
@@ -1274,7 +1212,6 @@
 			 * This was changed to accomodate with the mux code,
 			 * but we may have lost a worthwhile optimization.
 			 */
-			__cs_stop_recv(cs);
 			si->flags |= SI_FL_WAIT_ROOM;
 			break;
 		}
@@ -1347,9 +1284,10 @@
 		goto out_shutdown_r;
 
 	/* Subscribe to receive events */
-	conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_event);
+	if (!(si->flags & SI_FL_WAIT_ROOM))
+		conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_event);
 
-	return cur_read != 0;
+	return (cur_read != 0 || (si->flags & SI_FL_WAIT_ROOM));
 
  out_shutdown_r:
 	/* we received a shutdown */
@@ -1392,7 +1330,6 @@
 	}
 
 	/* otherwise that's just a normal read shutdown */
-	__cs_stop_recv(cs);
 	return;
 
  do_close: