MAJOR: stream-interface: make conn_notify_si() more robust

This function was relying on the result of file descriptor polling
which is inappropriate as it may be subject to race conditions during
handshakes. Make it more robust by relying solely on buffer activity.
diff --git a/src/stream_interface.c b/src/stream_interface.c
index 2f24c18..ec8287e 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -559,7 +559,6 @@
  */
 void conn_notify_si(struct connection *conn)
 {
-	int fd = conn->t.sock.fd;
 	struct stream_interface *si = container_of(conn, struct stream_interface, conn);
 
 	DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
@@ -575,64 +574,63 @@
 		si->ob->flags |= BF_WRITE_NULL;
 	}
 
-	/* process consumer side, only once if possible */
-	if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR)) {
-		if (si->ob->flags & BF_OUT_EMPTY) {
-			if (((si->ob->flags & (BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == BF_SHUTW_NOW) &&
-			    (si->state == SI_ST_EST))
-				stream_int_shutw(si);
+	/* process consumer side */
+	if (si->ob->flags & BF_OUT_EMPTY) {
+		if (((si->ob->flags & (BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == BF_SHUTW_NOW) &&
+		    (si->state == SI_ST_EST))
+			stream_int_shutw(si);
+		if (conn->flags & CO_FL_DATA_WR_ENA)
 			conn_data_stop_send(conn);
-			si->ob->wex = TICK_ETERNITY;
-		}
+		si->ob->wex = TICK_ETERNITY;
+	}
 
-		if ((si->ob->flags & (BF_FULL|BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == 0)
-			si->flags |= SI_FL_WAIT_DATA;
+	if ((si->ob->flags & (BF_FULL|BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == 0)
+		si->flags |= SI_FL_WAIT_DATA;
 
-		if (si->ob->flags & BF_WRITE_ACTIVITY) {
-			/* update timeouts if we have written something */
-			if ((si->ob->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_WRITE_PARTIAL)) == BF_WRITE_PARTIAL)
-				if (tick_isset(si->ob->wex))
-					si->ob->wex = tick_add_ifset(now_ms, si->ob->wto);
+	if (si->ob->flags & BF_WRITE_ACTIVITY) {
+		/* update timeouts if we have written something */
+		if ((si->ob->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_WRITE_PARTIAL)) == BF_WRITE_PARTIAL)
+			if (tick_isset(si->ob->wex))
+				si->ob->wex = tick_add_ifset(now_ms, si->ob->wto);
 
-			if (!(si->flags & SI_FL_INDEP_STR))
-				if (tick_isset(si->ib->rex))
-					si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
+		if (!(si->flags & SI_FL_INDEP_STR))
+			if (tick_isset(si->ib->rex))
+				si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
 
-			if (likely((si->ob->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL &&
-			           (si->ob->prod->flags & SI_FL_WAIT_ROOM)))
-				si_chk_rcv(si->ob->prod);
-		}
+		if (likely((si->ob->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL &&
+			   (si->ob->prod->flags & SI_FL_WAIT_ROOM)))
+			si_chk_rcv(si->ob->prod);
 	}
 
-	/* process producer side, only once if possible */
-	if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) {
-		/* We might have some data the consumer is waiting for.
-		 * We can do fast-forwarding, but we avoid doing this for partial
-		 * buffers, because it is very likely that it will be done again
-		 * immediately afterwards once the following data is parsed (eg:
-		 * HTTP chunking).
-		 */
-		if (((si->ib->flags & (BF_READ_PARTIAL|BF_OUT_EMPTY)) == BF_READ_PARTIAL) &&
-		    (si->ib->pipe /* always try to send spliced data */ ||
-		     (si->ib->buf.i == 0 && (si->ib->cons->flags & SI_FL_WAIT_DATA)))) {
-			int last_len = si->ib->pipe ? si->ib->pipe->data : 0;
+	/* process producer side.
+	 * We might have some data the consumer is waiting for.
+	 * We can do fast-forwarding, but we avoid doing this for partial
+	 * buffers, because it is very likely that it will be done again
+	 * immediately afterwards once the following data is parsed (eg:
+	 * HTTP chunking).
+	 */
+	if (((si->ib->flags & (BF_READ_PARTIAL|BF_OUT_EMPTY)) == BF_READ_PARTIAL) &&
+	    (si->ib->pipe /* always try to send spliced data */ ||
+	     (si->ib->buf.i == 0 && (si->ib->cons->flags & SI_FL_WAIT_DATA)))) {
+		int last_len = si->ib->pipe ? si->ib->pipe->data : 0;
 
-			si_chk_snd(si->ib->cons);
+		si_chk_snd(si->ib->cons);
 
-			/* check if the consumer has freed some space */
-			if (!(si->ib->flags & BF_FULL) &&
-			    (!last_len || !si->ib->pipe || si->ib->pipe->data < last_len))
-				si->flags &= ~SI_FL_WAIT_ROOM;
-		}
+		/* check if the consumer has freed some space either in the
+		 * buffer or in the pipe.
+		 */
+		if (!(si->ib->flags & BF_FULL) &&
+		    (!last_len || !si->ib->pipe || si->ib->pipe->data < last_len))
+			si->flags &= ~SI_FL_WAIT_ROOM;
+	}
 
-		if (si->flags & SI_FL_WAIT_ROOM) {
-			conn_data_stop_recv(conn);
-			si->ib->rex = TICK_ETERNITY;
-		}
-		else if ((si->ib->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) {
-			if (tick_isset(si->ib->rex))
-				si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
-		}
+	if (si->flags & SI_FL_WAIT_ROOM) {
+		conn_data_stop_recv(conn);
+		si->ib->rex = TICK_ETERNITY;
+	}
+	else if ((si->ib->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) {
+		if (tick_isset(si->ib->rex))
+			si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
 	}
 
 	/* wake the task up only when needed */