[MAJOR] implemented support for speculative I/O processing

The pollers will now be able to speculatively call the I/O
processing functions and decide whether or not they want to
poll on those FDs. The changes primarily consist of teaching
those functions how to report that they got an EAGAIN.
diff --git a/include/common/defaults.h b/include/common/defaults.h
index e6552de..c99aafe 100644
--- a/include/common/defaults.h
+++ b/include/common/defaults.h
@@ -64,6 +64,13 @@
 #define MAX_READ_POLL_LOOPS 4
 #endif
 
+// same, but for writes. Generally, it's enough to write twice: one time for
+// first half of the buffer, and a second time for the last half after a
+// wrap-around.
+#ifndef MAX_WRITE_POLL_LOOPS
+#define MAX_WRITE_POLL_LOOPS 2
+#endif
+
 // the number of bytes returned by a read below which we will not try to
 // poll the socket again. Generally, return values below the MSS are worthless
 // to try again.
diff --git a/src/checks.c b/src/checks.c
index 2ae01db..309d0c4 100644
--- a/src/checks.c
+++ b/src/checks.c
@@ -22,6 +22,7 @@
 #include <common/compat.h>
 #include <common/config.h>
 #include <common/mini-clist.h>
+#include <common/standard.h>
 #include <common/time.h>
 
 #include <types/global.h>
@@ -47,7 +48,7 @@
  * remaining servers on the proxy and transfers queued sessions whenever
  * possible to other servers.
  */
-void set_server_down(struct server *s)
+static void set_server_down(struct server *s)
 {
 	struct pendconn *pc, *pc_bck, *pc_end;
 	struct session *sess;
@@ -102,25 +103,31 @@
 /*
  * This function is used only for server health-checks. It handles
  * the connection acknowledgement. If the proxy requires HTTP health-checks,
- * it sends the request. In other cases, it returns 1 if the socket is OK,
- * or -1 if an error occured.
+ * it sends the request. In other cases, it returns 1 in s->result if the
+ * socket is OK, or -1 if an error occurred.
+ * The function itself returns 0 if it needs some polling before being called
+ * again, otherwise 1.
  */
-int event_srv_chk_w(int fd)
+static int event_srv_chk_w(int fd)
 {
+	__label__ out_wakeup, out_nowake;
 	struct task *t = fdtab[fd].owner;
 	struct server *s = t->context;
 	int skerr;
 	socklen_t lskerr = sizeof(skerr);
 
 	skerr = 1;
-	if ((getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr) == -1)
-	    || (skerr != 0)) {
+	if (unlikely(fdtab[fd].state == FD_STERROR ||
+		     (fdtab[fd].ev & FD_POLL_ERR) ||
+		     (getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr) == -1) ||
+		     (skerr != 0))) {
 		/* in case of TCP only, this tells us if the connection failed */
 		s->result = -1;
 		fdtab[fd].state = FD_STERROR;
-		EV_FD_CLR(fd, DIR_WR);
+		goto out_wakeup;
 	}
-	else if (s->result != -1) {
+
+	if (s->result != -1) {
 		/* we don't want to mark 'UP' a server on which we detected an error earlier */
 		if ((s->proxy->options & PR_O_HTTP_CHK) ||
 		    (s->proxy->options & PR_O_SSL3_CHK)) {
@@ -142,7 +149,11 @@
 #endif
 			if (ret == s->proxy->check_len) {
 				EV_FD_SET(fd, DIR_RD);   /* prepare for reading reply */
-				EV_FD_CLR(fd, DIR_WR);   /* nothing more to write */
+				goto out_nowake;
+			}
+			else if (ret == 0 || errno == EAGAIN) {
+				/* we want some polling to happen first */
+				fdtab[fd].ev &= ~FD_POLL_WR;
 				return 0;
 			}
 			else {
@@ -155,9 +166,12 @@
 			s->result = 1;
 		}
 	}
-
+ out_wakeup:
 	task_wakeup(&rq, t);
-	return 0;
+ out_nowake:
+	EV_FD_CLR(fd, DIR_WR);   /* nothing more to write */
+	fdtab[fd].ev &= ~FD_POLL_WR;
+	return 1;
 }
 
 
@@ -167,10 +181,12 @@
  * server replies HTTP 2xx or 3xx (valid responses), or if it returns at least
  * 5 bytes in response to SSL HELLO. The principle is that this is enough to
  * distinguish between an SSL server and a pure TCP relay. All other cases will
- * return -1. The function returns 0.
+ * return -1. The function returns 0 if it needs to be called again after some
+ * polling, otherwise non-zero.
  */
-int event_srv_chk_r(int fd)
+static int event_srv_chk_r(int fd)
 {
+	__label__ out_wakeup;
 	char reply[64];
 	int len, result;
 	struct task *t = fdtab[fd].owner;
@@ -179,34 +195,51 @@
 	socklen_t lskerr = sizeof(skerr);
 
 	result = len = -1;
-	if (!getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr) && !skerr) {
+
+	if (unlikely(fdtab[fd].state == FD_STERROR ||
+		     (fdtab[fd].ev & FD_POLL_ERR) ||
+		     (getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr) == -1) ||
+		     (skerr != 0))) {
+		/* in case of TCP only, this tells us if the connection failed */
+		s->result = -1;
+		fdtab[fd].state = FD_STERROR;
+		goto out_wakeup;
+	}
+
 #ifndef MSG_NOSIGNAL
-		len = recv(fd, reply, sizeof(reply), 0);
+	len = recv(fd, reply, sizeof(reply), 0);
 #else
-		/* Warning! Linux returns EAGAIN on SO_ERROR if data are still available
-		 * but the connection was closed on the remote end. Fortunately, recv still
-		 * works correctly and we don't need to do the getsockopt() on linux.
-		 */
-		len = recv(fd, reply, sizeof(reply), MSG_NOSIGNAL);
+	/* Warning! Linux returns EAGAIN on SO_ERROR if data are still available
+	 * but the connection was closed on the remote end. Fortunately, recv still
+	 * works correctly and we don't need to do the getsockopt() on linux.
+	 */
+	len = recv(fd, reply, sizeof(reply), MSG_NOSIGNAL);
 #endif
-		if (((s->proxy->options & PR_O_HTTP_CHK) &&
-		     (len >= sizeof("HTTP/1.0 000")) &&
-		    !memcmp(reply, "HTTP/1.", 7) &&
-		    (reply[9] == '2' || reply[9] == '3')) /* 2xx or 3xx */
-		    || ((s->proxy->options & PR_O_SSL3_CHK) && (len >= 5) &&
-			(reply[0] == 0x15 || reply[0] == 0x16))) /* alert or handshake */
-			result = 1;
+	if (unlikely(len < 0 && errno == EAGAIN)) {
+		/* we want some polling to happen first */
+		fdtab[fd].ev &= ~FD_POLL_RD;
+		return 0;
 	}
 
+	if (((s->proxy->options & PR_O_HTTP_CHK) &&
+	     (len >= sizeof("HTTP/1.0 000")) &&
+	     !memcmp(reply, "HTTP/1.", 7) &&
+	     (reply[9] == '2' || reply[9] == '3')) /* 2xx or 3xx */
+	    || ((s->proxy->options & PR_O_SSL3_CHK) && (len >= 5) &&
+		(reply[0] == 0x15 || reply[0] == 0x16))) /* alert or handshake */
+		result = 1;
+
 	if (result == -1)
 		fdtab[fd].state = FD_STERROR;
 
 	if (s->result != -1)
 		s->result = result;
 
+ out_wakeup:
 	EV_FD_CLR(fd, DIR_RD);
 	task_wakeup(&rq, t);
-	return 0;
+	fdtab[fd].ev &= ~FD_POLL_RD;
+	return 1;
 }
 
 /*
diff --git a/src/stream_sock.c b/src/stream_sock.c
index a150a08..7d2aa30 100644
--- a/src/stream_sock.c
+++ b/src/stream_sock.c
@@ -21,6 +21,7 @@
 
 #include <common/compat.h>
 #include <common/config.h>
+#include <common/standard.h>
 #include <common/time.h>
 
 #include <types/buffers.h>
@@ -35,98 +36,118 @@
 
 /*
  * this function is called on a read event from a stream socket.
- * It returns 0.
+ * It returns 0 if we have a high confidence that we will not be
+ * able to read more data without polling first. Returns non-zero
+ * otherwise.
  */
 int stream_sock_read(int fd) {
+	__label__ out_wakeup;
 	struct buffer *b = fdtab[fd].cb[DIR_RD].b;
-	int ret, max;
+	int ret, max, retval;
 	int read_poll = MAX_READ_POLL_LOOPS;
 
 #ifdef DEBUG_FULL
 	fprintf(stderr,"stream_sock_read : fd=%d, owner=%p\n", fd, fdtab[fd].owner);
 #endif
 
-	if (fdtab[fd].state != FD_STERROR) {
-		while (read_poll-- > 0)
-		{
-			if (b->l == 0) { /* let's realign the buffer to optimize I/O */
-				b->r = b->w = b->lr  = b->data;
+	retval = 1;
+
+	if (unlikely(fdtab[fd].state == FD_STERROR || (fdtab[fd].ev & FD_POLL_ERR))) {
+		/* read/write error */
+		b->flags |= BF_READ_ERROR;
+		fdtab[fd].state = FD_STERROR;
+		goto out_wakeup;
+	}
+
+	if (unlikely(fdtab[fd].ev & FD_POLL_HUP)) {
+		/* connection closed */
+		b->flags |= BF_READ_NULL;
+		goto out_wakeup;
+	}
+
+	retval = 0;
+	while (read_poll-- > 0)	{
+		if (b->l == 0) { /* let's realign the buffer to optimize I/O */
+			b->r = b->w = b->lr  = b->data;
+			max = b->rlim - b->data;
+		}
+		else if (b->r > b->w) {
+			max = b->rlim - b->r;
+		}
+		else {
+			max = b->w - b->r;
+			/* FIXME: theoretically, if w>0, we shouldn't have rlim < data+size anymore
+			 * since it means that the rewrite protection has been removed. This
+			 * implies that the if statement can be removed.
+			 */
+			if (max > b->rlim - b->data)
 				max = b->rlim - b->data;
-			}
-			else if (b->r > b->w) {
-				max = b->rlim - b->r;
-			}
-			else {
-				max = b->w - b->r;
-				/* FIXME: theorically, if w>0, we shouldn't have rlim < data+size anymore
-				 * since it means that the rewrite protection has been removed. This
-				 * implies that the if statement can be removed.
-				 */
-				if (max > b->rlim - b->data)
-					max = b->rlim - b->data;
-			}
+		}
 	    
-			if (max == 0) {  /* not anymore room to store data */
-				EV_FD_CLR(fd, DIR_RD);
-				break;
-			}
+		if (max == 0) {  /* not anymore room to store data */
+			EV_FD_CLR(fd, DIR_RD);
+			break;
+		}
 
 #ifndef MSG_NOSIGNAL
-			{
-				int skerr;
-				socklen_t lskerr = sizeof(skerr);
-	
-				ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr);
-				if (ret == -1 || skerr)
-					ret = -1;
-				else
-					ret = recv(fd, b->r, max, 0);
-			}
+		{
+			int skerr;
+			socklen_t lskerr = sizeof(skerr);
+
+			ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr);
+			if (ret == -1 || skerr)
+				ret = -1;
+			else
+				ret = recv(fd, b->r, max, 0);
+		}
 #else
-			ret = recv(fd, b->r, max, MSG_NOSIGNAL);
+		ret = recv(fd, b->r, max, MSG_NOSIGNAL);
 #endif
-			if (ret > 0) {
-				b->r += ret;
-				b->l += ret;
-				b->flags |= BF_PARTIAL_READ;
+		if (ret > 0) {
+			b->r += ret;
+			b->l += ret;
+			b->flags |= BF_PARTIAL_READ;
+			retval = 1;
 	
-				if (b->r == b->data + BUFSIZE) {
-					b->r = b->data; /* wrap around the buffer */
-				}
-
-				b->total += ret;
+			if (b->r == b->data + BUFSIZE) {
+				b->r = b->data; /* wrap around the buffer */
+			}
 
-				/* generally if we read something smaller than the 1 or 2 MSS,
-				 * it means that it's not worth trying to read again.
-				 */
-				if (ret < MIN_RET_FOR_READ_LOOP)
-					break;
-				if (!read_poll)
-					break;
+			b->total += ret;
 
-				/* we hope to read more data or to get a close on next round */
-				continue;
-			}
-			else if (ret == 0) {
-				b->flags |= BF_READ_NULL;
-				break;
-			}
-			else if (errno == EAGAIN) {/* ignore EAGAIN */
+			/* generally if we read something smaller than the 1 or 2 MSS,
+			 * it means that it's not worth trying to read again. It may
+			 * also happen on headers, but the application then can stop
+			 * reading before we start polling.
+			 */
+			if (ret < MIN_RET_FOR_READ_LOOP)
 				break;
-			}
-			else {
-				b->flags |= BF_READ_ERROR;
-				fdtab[fd].state = FD_STERROR;
+
+			if (!read_poll)
 				break;
-			}
-		} /* while(1) */
-	}
-	else {
-		b->flags |= BF_READ_ERROR;
-		fdtab[fd].state = FD_STERROR;
-	}
+
+			/* we hope to read more data or to get a close on next round */
+			continue;
+		}
+		else if (ret == 0) {
+			b->flags |= BF_READ_NULL;
+			retval = 1;     // connection closed
+			break;
+		}
+		else if (errno == EAGAIN) {/* ignore EAGAIN */
+			retval = 0;
+			break;
+		}
+		else {
+			retval = 1;
+			b->flags |= BF_READ_ERROR;
+			fdtab[fd].state = FD_STERROR;
+			break;
+		}
+	} /* while (read_poll) */
 
 	if (b->flags & BF_READ_STATUS) {
+	out_wakeup:
 		if (b->rto && EV_FD_ISSET(fd, DIR_RD))
 			tv_delayfrom(&b->rex, &now, b->rto);
 		else
@@ -135,55 +156,71 @@
 		task_wakeup(&rq, fdtab[fd].owner);
 	}
 
-	return 0;
+	fdtab[fd].ev &= ~FD_POLL_RD;
+	return retval;
 }
 
 
 /*
  * this function is called on a write event from a stream socket.
- * It returns 0.
+ * It returns 0 if we have a high confidence that we will not be
+ * able to write more data without polling first. Returns non-zero
+ * otherwise.
  */
 int stream_sock_write(int fd) {
+	__label__ out_eternity;
 	struct buffer *b = fdtab[fd].cb[DIR_WR].b;
-	int ret, max;
+	int ret, max, retval;
+	int write_poll = MAX_WRITE_POLL_LOOPS;
 
 #ifdef DEBUG_FULL
 	fprintf(stderr,"stream_sock_write : fd=%d, owner=%p\n", fd, fdtab[fd].owner);
 #endif
 
-	if (b->l == 0) { /* let's realign the buffer to optimize I/O */
-		b->r = b->w = b->lr  = b->data;
-		max = 0;
-	}
-	else if (b->r > b->w) {
-		max = b->r - b->w;
+	retval = 1;
+
+	if (unlikely(fdtab[fd].state == FD_STERROR || (fdtab[fd].ev & FD_POLL_ERR))) {
+		/* read/write error */
+		b->flags |= BF_WRITE_ERROR;
+		fdtab[fd].state = FD_STERROR;
+		EV_FD_CLR(fd, DIR_WR);
+		goto out_eternity;
 	}
-	else
-		max = b->data + BUFSIZE - b->w;
-    
-	if (fdtab[fd].state != FD_STERROR) {
+
+	retval = 0;
+	while (write_poll-- > 0) {
+		if (b->l == 0) { /* let's realign the buffer to optimize I/O */
+			b->r = b->w = b->lr  = b->data;
+			max = 0;
+		}
+		else if (b->r > b->w) {
+			max = b->r - b->w;
+		}
+		else {
+			max = b->data + BUFSIZE - b->w;
+		}
+
 		if (max == 0) {
 			/* may be we have received a connection acknowledgement in TCP mode without data */
-			if (fdtab[fd].state == FD_STCONN) {
+			if (!(b->flags & BF_PARTIAL_WRITE)
+			    && fdtab[fd].state == FD_STCONN) {
 				int skerr;
 				socklen_t lskerr = sizeof(skerr);
 				ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr);
 				if (ret == -1 || skerr) {
 					b->flags |= BF_WRITE_ERROR;
 					fdtab[fd].state = FD_STERROR;
-					task_wakeup(&rq, fdtab[fd].owner);
-					tv_eternity(&b->wex);
 					EV_FD_CLR(fd, DIR_WR);
-					return 0;
+					retval = 1;
+					goto out_eternity;
 				}
 			}
 
 			b->flags |= BF_WRITE_NULL;
-			task_wakeup(&rq, fdtab[fd].owner);
 			fdtab[fd].state = FD_STREADY;
-			tv_eternity(&b->wex);
 			EV_FD_CLR(fd, DIR_WR);
-			return 0;
+			retval = 1;
+			goto out_eternity;
 		}
 
 #ifndef MSG_NOSIGNAL
@@ -206,41 +243,54 @@
 			b->w += ret;
 	    
 			b->flags |= BF_PARTIAL_WRITE;
+			retval = 1;
 	    
 			if (b->w == b->data + BUFSIZE) {
 				b->w = b->data; /* wrap around the buffer */
 			}
+
+			if (!write_poll)
+				break;
+
+			/* we hope to be able to write more data */
+			continue;
 		}
 		else if (ret == 0) {
 			/* nothing written, just pretend we were never called */
-			// b->flags |= BF_WRITE_NULL;
-			return 0;
+			retval = 0;
+			break;
 		}
-		else if (errno == EAGAIN) /* ignore EAGAIN */
-			return 0;
+		else if (errno == EAGAIN) {/* ignore EAGAIN */
+			retval = 0;
+			break;
+		}
 		else {
 			b->flags |= BF_WRITE_ERROR;
 			fdtab[fd].state = FD_STERROR;
+			EV_FD_CLR(fd, DIR_WR);
+			retval = 1;
+			goto out_eternity;
 		}
-	}
-	else {
-		b->flags |= BF_WRITE_ERROR;
-		fdtab[fd].state = FD_STERROR;
-	}
+	} /* while (write_poll) */
 
-	if (b->wto) {
-		tv_delayfrom(&b->wex, &now, b->wto);
-		/* FIXME: to prevent the client from expiring read timeouts during writes,
-		 * we refresh it. A solution would be to merge read+write timeouts into a
-		 * unique one, although that needs some study particularly on full-duplex
-		 * TCP connections. */
-		b->rex = b->wex;
+	if (b->flags & BF_WRITE_STATUS) {
+		if (b->wto) {
+			tv_delayfrom(&b->wex, &now, b->wto);
+			/* FIXME: to prevent the client from expiring read timeouts during writes,
+			 * we refresh it. A solution would be to merge read+write timeouts into a
+			 * unique one, although that needs some study particularly on full-duplex
+			 * TCP connections. */
+			b->rex = b->wex;
+		}
+		else {
+		out_eternity:
+			tv_eternity(&b->wex);
+		}
 	}
-	else
-		tv_eternity(&b->wex);
 
 	task_wakeup(&rq, fdtab[fd].owner);
-	return 0;
+	fdtab[fd].ev &= ~FD_POLL_WR;
+	return retval;
 }