MEDIUM: connection: enable reading only once the connection is confirmed

In order to address the absurd polling sequence described in issue #253,
let's make sure we disable receiving on a connection until it's established.
Previously with bottom-top I/Os, we were almost certain that a connection
was ready when the first I/O was confirmed. Now we can enter various
functions, including process_stream(), which will attempt to read
something, will fail, and will then subscribe. But we don't want them
to try to receive if we know the connection didn't complete. The first
prerequisite for this is to mark the connection as not ready for receiving
until it's validated. But we don't want to mark it as not ready for sending
because we know that attempting I/Os later is extremely likely to work
without polling.

Once the connection is confirmed we re-enable recv readiness. In order
for this event to be taken into account, the call to tcp_connect_probe()
was moved earlier, between the attempt to send() and the attempt to recv().
This way if tcp_connect_probe() enables reading, we have a chance to
immediately fall back to this and read the possibly pending data.

Now the trace looks like the following. It's far from being perfect
but we've already saved one recvfrom() and one epollctl():

 epoll_wait(3, [], 200, 0) = 0
 socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) = 7
 fcntl(7, F_SETFL, O_RDONLY|O_NONBLOCK) = 0
 setsockopt(7, SOL_TCP, TCP_NODELAY, [1], 4) = 0
 connect(7, {sa_family=AF_INET, sin_port=htons(8000), sin_addr=inet_addr("127.0.0.1")}, 16) = -1 EINPROGRESS (Operation now in progress)
 epoll_ctl(3, EPOLL_CTL_ADD, 7, {EPOLLIN|EPOLLOUT|EPOLLRDHUP, {u32=7, u64=7}}) = 0
 epoll_wait(3, [{EPOLLOUT, {u32=7, u64=7}}], 200, 1000) = 1
 connect(7, {sa_family=AF_INET, sin_port=htons(8000), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
 getsockopt(7, SOL_SOCKET, SO_ERROR, [0], [4]) = 0
 sendto(7, "OPTIONS / HTTP/1.0\r\n\r\n", 22, MSG_DONTWAIT|MSG_NOSIGNAL, NULL, 0) = 22
 epoll_ctl(3, EPOLL_CTL_MOD, 7, {EPOLLIN|EPOLLRDHUP, {u32=7, u64=7}}) = 0
 epoll_wait(3, [{EPOLLIN|EPOLLRDHUP, {u32=7, u64=7}}], 200, 1000) = 1
 getsockopt(7, SOL_SOCKET, SO_ERROR, [0], [4]) = 0
 getsockopt(7, SOL_SOCKET, SO_ERROR, [0], [4]) = 0
 recvfrom(7, "HTTP/1.0 200\r\nContent-length: 0\r\nX-req: size=22, time=0 ms\r\nX-rsp: id=dummy, code=200, cache=1, size=0, time=0 ms (0 real)\r\n\r\n", 16384, 0, NULL, NULL) = 126
 close(7)                = 0
diff --git a/src/connection.c b/src/connection.c
index 9122850..7a2ab24 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -86,6 +86,15 @@
 		__conn_xprt_stop_send(conn);
 	}
 
+	if (unlikely(conn->flags & CO_FL_WAIT_L4_CONN)) {
+		/* still waiting for a connection to establish and nothing was
+		 * attempted yet to probe the connection. Then let's retry the
+		 * connect().
+		 */
+		if (!tcp_connect_probe(conn))
+			goto leave;
+	}
+
 	/* The data transfer starts here and stops on error and handshakes. Note
 	 * that we must absolutely test conn->xprt at each step in case it suddenly
 	 * changes due to a quick unexpected close().
@@ -105,14 +114,6 @@
 		__conn_xprt_stop_recv(conn);
 	}
 
-	if (unlikely(conn->flags & CO_FL_WAIT_L4_CONN)) {
-		/* still waiting for a connection to establish and nothing was
-		 * attempted yet to probe the connection. Then let's retry the
-		 * connect().
-		 */
-		if (!tcp_connect_probe(conn))
-			goto leave;
-	}
  leave:
 	/* Verify if the connection just established. */
 	if (unlikely(!(conn->flags & (CO_FL_WAIT_L4_CONN | CO_FL_WAIT_L6_CONN | CO_FL_CONNECTED))))
@@ -227,8 +228,14 @@
 	} while (ret < 0 && errno == EINTR);
 
 
-	if (ret > 0)
+	if (ret > 0) {
+		if (conn->flags & CO_FL_WAIT_L4_CONN) {
+			conn->flags &= ~CO_FL_WAIT_L4_CONN;
+			fd_may_send(conn->handle.fd);
+			fd_cond_recv(conn->handle.fd);
+		}
 		return ret;
+	}
 
 	if (ret == 0 || errno == EAGAIN || errno == ENOTCONN) {
 	wait:
diff --git a/src/proto_tcp.c b/src/proto_tcp.c
index ff252e8..25e01bd 100644
--- a/src/proto_tcp.c
+++ b/src/proto_tcp.c
@@ -576,6 +576,9 @@
 	conn_ctrl_init(conn);       /* registers the FD */
 	fdtab[fd].linger_risk = 1;  /* close hard if needed */
 
+	if (conn->flags & CO_FL_WAIT_L4_CONN)
+		fd_cant_recv(fd); // we'll change this once the connection is validated
+
 	if (conn_xprt_init(conn) < 0) {
 		conn_full_close(conn);
 		conn->flags |= CO_FL_ERROR;
@@ -711,6 +714,8 @@
 	 * data layer.
 	 */
 	conn->flags &= ~CO_FL_WAIT_L4_CONN;
+	fd_may_send(fd);
+	fd_cond_recv(fd);
 	return 1;
 
  out_error:
diff --git a/src/proto_uxst.c b/src/proto_uxst.c
index 6074daa..c5acfce 100644
--- a/src/proto_uxst.c
+++ b/src/proto_uxst.c
@@ -575,6 +575,9 @@
 	conn_ctrl_init(conn);       /* registers the FD */
 	fdtab[fd].linger_risk = 0;  /* no need to disable lingering */
 
+	if (conn->flags & CO_FL_WAIT_L4_CONN)
+		fd_cant_recv(fd); // we'll change this once the connection is validated
+
 	if (conn_xprt_init(conn) < 0) {
 		conn_full_close(conn);
 		conn->flags |= CO_FL_ERROR;
diff --git a/src/raw_sock.c b/src/raw_sock.c
index cc99669..64f7a05 100644
--- a/src/raw_sock.c
+++ b/src/raw_sock.c
@@ -207,8 +207,10 @@
 		done += ret;
 		pipe->data -= ret;
 	}
-	if (unlikely(conn->flags & CO_FL_WAIT_L4_CONN) && done)
+	if (unlikely(conn->flags & CO_FL_WAIT_L4_CONN) && done) {
 		conn->flags &= ~CO_FL_WAIT_L4_CONN;
+		fd_cond_recv(conn->handle.fd);
+	}
 
 	return done;
 }
@@ -387,8 +389,10 @@
 			break;
 		}
 	}
-	if (unlikely(conn->flags & CO_FL_WAIT_L4_CONN) && done)
+	if (unlikely(conn->flags & CO_FL_WAIT_L4_CONN) && done) {
 		conn->flags &= ~CO_FL_WAIT_L4_CONN;
+		fd_cond_recv(conn->handle.fd);
+	}
 
 	if (done > 0) {
 		/* we count the total bytes sent, and the send rate for 32-byte