MEDIUM: connection: make use of the new polling functions

Now the connection handler, the handshake callbacks and the I/O callbacks
make use of the connection-layer polling functions to enable or disable
polling on a file descriptor.

Some changes still need to be done to avoid using the FD_WAIT_* constants.
diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h
index 3ddc56d..224ce84 100644
--- a/include/proto/stream_interface.h
+++ b/include/proto/stream_interface.h
@@ -27,6 +27,7 @@
 #include <common/config.h>
 #include <types/session.h>
 #include <types/stream_interface.h>
+#include <proto/connection.h>
 
 
 /* main event functions used to move data between sockets and buffers */
@@ -167,14 +168,14 @@
 static inline void si_shutr(struct stream_interface *si)
 {
 	if (stream_int_shutr(si))
-		fd_stop_recv(si_fd(si));
+		conn_data_stop_recv(&si->conn);
 }
 
 /* Sends a shutw to the connection using the data layer */
 static inline void si_shutw(struct stream_interface *si)
 {
 	if (stream_int_shutw(si))
-		fd_stop_send(si_fd(si));
+		conn_data_stop_send(&si->conn);
 }
 
 /* Calls the data state update on the stream interfaace */
diff --git a/src/connection.c b/src/connection.c
index 712dfba..a3f38eb 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -31,6 +31,12 @@
 		goto leave;
 
  process_handshake:
+	/* The handshake callbacks are called in sequence. If either of them is
+	 * missing something, it must enable the required polling at the socket
+	 * layer of the connection. Polling state is not guaranteed when entering
+	 * these handlers, so any handshake handler which does not complete its
+	 * work must explicitly disable events it's not interested in.
+	 */
 	while (unlikely(conn->flags & CO_FL_HANDSHAKE)) {
 		if (unlikely(conn->flags & CO_FL_ERROR))
 			goto leave;
@@ -40,7 +46,9 @@
 				goto leave;
 	}
 
-	/* OK now we're in the data phase now */
+	/* Once we're purely in the data phase, we disable handshake polling */
+	if (!(conn->flags & CO_FL_POLL_SOCK))
+		__conn_sock_stop_both(conn);
 
 	if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
 		if (!conn->data->read(conn))
@@ -86,6 +94,9 @@
 
 	/* remove the events before leaving */
 	fdtab[fd].ev &= ~(FD_POLL_IN | FD_POLL_OUT | FD_POLL_HUP | FD_POLL_ERR);
+
+	/* commit polling changes */
+	conn_cond_update_polling(conn);
 	return ret;
 }
 
diff --git a/src/proto_tcp.c b/src/proto_tcp.c
index 7de238a..2f56797 100644
--- a/src/proto_tcp.c
+++ b/src/proto_tcp.c
@@ -475,7 +475,9 @@
 
 	fdtab[fd].iocb = conn_fd_handler;
 	fd_insert(fd);
-	fd_want_send(fd);  /* for connect status */
+	conn_sock_want_send(&si->conn);  /* for connect status */
+	if (!(si->ob->flags & BF_OUT_EMPTY))
+		conn_data_want_send(&si->conn);  /* prepare to send data if any */
 
 	si->state = SI_ST_CON;
 	si->flags |= SI_FL_CAP_SPLTCP; /* TCP supports splicing */
@@ -548,8 +550,11 @@
 	 *  - connected (EISCONN, 0)
 	 */
 	if ((connect(fd, conn->peeraddr, conn->peerlen) < 0)) {
-		if (errno == EALREADY || errno == EINPROGRESS)
+		if (errno == EALREADY || errno == EINPROGRESS) {
+			conn_sock_stop_recv(conn);
+			conn_sock_poll_send(conn);
 			return 0;
+		}
 
 		if (errno && errno != EISCONN)
 			goto out_error;
@@ -570,7 +575,7 @@
 	 */
 
 	conn->flags |= CO_FL_ERROR;
-	fd_stop_both(fd);
+	conn_sock_stop_both(conn);
 	return 1;
 }
 
diff --git a/src/session.c b/src/session.c
index ffb683e..6612f0a 100644
--- a/src/session.c
+++ b/src/session.c
@@ -284,7 +284,7 @@
 	fdtab[cfd].owner = &s->si[0].conn;
 	fdtab[cfd].flags = 0;
 	fdtab[cfd].iocb = conn_fd_handler;
-	fd_want_recv(cfd);
+	conn_data_want_recv(&s->si[0].conn);
 
 	if (p->accept && (ret = p->accept(s)) <= 0) {
 		/* Either we had an unrecoverable error (<0) or work is
diff --git a/src/sock_raw.c b/src/sock_raw.c
index eb2bfbd..c48480a 100644
--- a/src/sock_raw.c
+++ b/src/sock_raw.c
@@ -102,7 +102,7 @@
 		 * place and ask the consumer to hurry.
 		 */
 		si->flags |= SI_FL_WAIT_ROOM;
-		fd_stop_recv(fd);
+		conn_data_stop_recv(&si->conn);
 		b->rex = TICK_ETERNITY;
 		si_chk_snd(b->cons);
 		return 1;
@@ -467,7 +467,7 @@
 	 */
 
 	conn->flags |= CO_FL_ERROR;
-	fd_stop_both(fd);
+	conn_data_stop_both(conn);
 	retval = 1;
 	goto out_wakeup;
 }
@@ -628,7 +628,6 @@
  */
 static int sock_raw_write(struct connection *conn)
 {
-	int fd = conn->t.sock.fd;
 	struct stream_interface *si = container_of(conn, struct stream_interface, conn);
 	struct buffer *b = si->ob;
 	int retval = 1;
@@ -660,7 +659,7 @@
 	 */
 
 	conn->flags |= CO_FL_ERROR;
-	fd_stop_both(fd);
+	conn_data_stop_both(conn);
 	return 1;
 }
 
@@ -700,7 +699,7 @@
 	}
 
 	/* otherwise that's just a normal read shutdown */
-	fd_stop_recv(si_fd(si));
+	conn_data_stop_recv(&si->conn);
 	return;
 
  do_close:
@@ -723,11 +722,10 @@
 {
 	struct buffer *ib = si->ib;
 	struct buffer *ob = si->ob;
-	int fd = si_fd(si);
 
 	DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibh=%d ibt=%d obh=%d obd=%d si=%d\n",
 		now_ms, __FUNCTION__,
-		fd, fdtab[fd].owner,
+		si_fd(si), fdtab[si_fd(fd)].owner,
 		ib, ob,
 		ib->rex, ob->wex,
 		ib->flags, ob->flags,
@@ -741,7 +739,7 @@
 			if (!(si->flags & SI_FL_WAIT_ROOM)) {
 				if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL)
 					si->flags |= SI_FL_WAIT_ROOM;
-				fd_stop_recv(fd);
+				conn_data_stop_recv(&si->conn);
 				ib->rex = TICK_ETERNITY;
 			}
 		}
@@ -752,7 +750,7 @@
 			 * have updated it if there has been a completed I/O.
 			 */
 			si->flags &= ~SI_FL_WAIT_ROOM;
-			fd_want_recv(fd);
+			conn_data_want_recv(&si->conn);
 			if (!(ib->flags & (BF_READ_NOEXP|BF_DONT_READ)) && !tick_isset(ib->rex))
 				ib->rex = tick_add_ifset(now_ms, ib->rto);
 		}
@@ -766,7 +764,7 @@
 			if (!(si->flags & SI_FL_WAIT_DATA)) {
 				if ((ob->flags & (BF_FULL|BF_HIJACK|BF_SHUTW_NOW)) == 0)
 					si->flags |= SI_FL_WAIT_DATA;
-				fd_stop_send(fd);
+				conn_data_stop_send(&si->conn);
 				ob->wex = TICK_ETERNITY;
 			}
 		}
@@ -777,7 +775,7 @@
 			 * have updated it if there has been a completed I/O.
 			 */
 			si->flags &= ~SI_FL_WAIT_DATA;
-			fd_want_send(fd);
+			conn_data_want_send(&si->conn);
 			if (!tick_isset(ob->wex)) {
 				ob->wex = tick_add_ifset(now_ms, ob->wto);
 				if (tick_isset(ib->rex) && !(si->flags & SI_FL_INDEP_STR)) {
@@ -818,12 +816,12 @@
 		/* stop reading */
 		if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL)
 			si->flags |= SI_FL_WAIT_ROOM;
-		fd_stop_recv(si_fd(si));
+		conn_data_stop_recv(&si->conn);
 	}
 	else {
 		/* (re)start reading */
 		si->flags &= ~SI_FL_WAIT_ROOM;
-		fd_want_recv(si_fd(si));
+		conn_data_want_recv(&si->conn);
 	}
 }
 
@@ -869,7 +867,7 @@
 		 */
 		si->conn.flags |= CO_FL_ERROR;
 		fdtab[si_fd(si)].ev &= ~FD_POLL_STICKY;
-		fd_stop_both(si_fd(si));
+		conn_data_stop_both(&si->conn);
 		si->flags |= SI_FL_ERR;
 		goto out_wakeup;
 	}
@@ -899,7 +897,7 @@
 		/* Otherwise there are remaining data to be sent in the buffer,
 		 * which means we have to poll before doing so.
 		 */
-		fd_want_send(si_fd(si));
+		conn_data_want_send(&si->conn);
 		si->flags &= ~SI_FL_WAIT_DATA;
 		if (!tick_isset(ob->wex))
 			ob->wex = tick_add_ifset(now_ms, ob->wto);
diff --git a/src/stream_interface.c b/src/stream_interface.c
index 45707dd..c5da3f1 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -550,10 +550,12 @@
 
 	conn->flags |= CO_FL_ERROR;
 	fdtab[fd].ev &= ~FD_POLL_STICKY;
-	fd_stop_both(fd);
+	conn_sock_stop_both(conn);
 	goto out_leave;
 
  out_wait:
+	conn_sock_stop_recv(conn);
+	conn_sock_poll_send(conn);
 	return FD_WAIT_WRITE;
 }
 
@@ -582,7 +584,7 @@
 			if (((si->ob->flags & (BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == BF_SHUTW_NOW) &&
 			    (si->state == SI_ST_EST))
 				stream_int_shutw(si);
-			fd_stop_send(fd);
+			conn_data_stop_send(conn);
 			si->ob->wex = TICK_ETERNITY;
 		}
 
@@ -627,7 +629,7 @@
 		}
 
 		if (si->flags & SI_FL_WAIT_ROOM) {
-			fd_stop_recv(fd);
+			conn_data_stop_recv(conn);
 			si->ib->rex = TICK_ETERNITY;
 		}
 		else if ((si->ib->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) {