MAJOR: polling: centralize calls to I/O callbacks

In order for HTTP/2 not to eat too much memory, we'll have to support
on-the-fly buffer allocation, since most streams will have an empty
request buffer at some point. Supporting allocation on the fly means
being able to sleep inside I/O callbacks if a buffer is not available.

Till now, the I/O callbacks were called from two locations :
  - when processing the cached events
  - when processing the polled events from the poller

This change cleans up the design a bit further than what was started in
1.5. It now ensures that we never call any iocb from the poller itself
and that instead, events learned by the poller are put into the cache.
The benefit is important in terms of stability : we don't have to care
anymore about the risk that new events are added into the poller while
processing its events, and we're certain that updates are processed at
a single location.

To achieve this, we now modify all the fd_* functions so that instead of
creating updates, they add/remove the fd to/from the cache depending on
its state, and only create an update when the polling status reaches a
state where it will have to change. Since the pollers make use of these
functions to notify readiness (using fd_may_recv/fd_may_send), the cache
is always up to date with the poller.

Creating updates only when the polling status needs to change saves a
significant amount of work for the pollers : a benchmark showed that on
a typical TCP proxy test, the amount of updates per connection dropped
from 11 to 1 on average. This also means that the update list is smaller
and has more chances of not thrashing too many CPU cache lines. The first
observed benefit is a net 2% performance gain on the connection rate.

A second benefit is that when a connection is accepted, it's only when
we're processing the cache, and the recv event is automatically added
into the cache *after* the current one, resulting in this event to be
processed immediately during the same loop. Previously we used to have
a second run over the updates to detect if new events were added to
catch them before waking up tasks.

The next gain will be offered by the next steps on this subject consisting
in implementing an I/O queue containing all cached events ordered by priority
just like the run queue, and to be able to leave some events pending there
as long as needed. That will allow us *not* to perform some FD processing
if it's not the proper time for this (typically keep waiting for a buffer
to be allocated if none is available for an recv()). And by only processing
a small bunch of them, we'll allow priorities to take place even at the I/O
level.

As a result of this change, functions fd_alloc_or_release_cache_entry()
and fd_process_polled_events() have disappeared, and the code dedicated
to checking for new fd events after the callback during the poll() loop
was removed as well. Despite the patch looking large, it's mostly a
change of what function is falled upon fd_*() and almost nothing was
added.
diff --git a/include/proto/fd.h b/include/proto/fd.h
index b0a478e..87309bf 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -81,18 +81,10 @@
  */
 void fd_process_cached_events();
 
-/* Check the events attached to a file descriptor, update its cache
- * accordingly, and call the associated I/O callback. If new updates are
- * detected, the function tries to process them as well in order to save
- * wakeups after accept().
+/* Mark fd <fd> as updated for polling and allocate an entry in the update list
+ * for this if it was not already there. This can be done at any time.
  */
-void fd_process_polled_events(int fd);
-
-
-/* Mark fd <fd> as updated and allocate an entry in the update list for this if
- * it was not already there. This can be done at any time.
- */
-static inline void updt_fd(const int fd)
+static inline void updt_fd_polling(const int fd)
 {
 	if (fdtab[fd].updated)
 		/* already scheduled for update */
@@ -157,15 +149,22 @@
 	return state;
 }
 
-/* Automatically allocates or releases a cache entry for fd <fd> depending on
- * its new state. This is meant to be used by pollers while processing updates.
+/* This function automatically enables/disables caching for an entry depending
+ * on its state, and also possibly creates an update entry so that the poller
+ * does its job as well. It is only called on state changes.
  */
-static inline void fd_alloc_or_release_cache_entry(int fd, int new_state)
+static inline void fd_update_cache(int fd)
 {
-	/* READY and ACTIVE states (the two with both flags set) require a cache entry */
+	/* 3 states for each direction require a polling update */
+	if ((fdtab[fd].state & (FD_EV_POLLED_R |                 FD_EV_ACTIVE_R)) == FD_EV_POLLED_R ||
+	    (fdtab[fd].state & (FD_EV_POLLED_R | FD_EV_READY_R | FD_EV_ACTIVE_R)) == FD_EV_ACTIVE_R ||
+	    (fdtab[fd].state & (FD_EV_POLLED_W |                 FD_EV_ACTIVE_W)) == FD_EV_POLLED_W ||
+	    (fdtab[fd].state & (FD_EV_POLLED_W | FD_EV_READY_W | FD_EV_ACTIVE_W)) == FD_EV_ACTIVE_W)
+		updt_fd_polling(fd);
 
-	if (((new_state & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R)) ||
-	    ((new_state & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))) {
+	/* only READY and ACTIVE states (the two with both flags set) require a cache entry */
+	if (((fdtab[fd].state & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R)) ||
+	    ((fdtab[fd].state & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))) {
 		fd_alloc_cache_entry(fd);
 	}
 	else {
@@ -243,7 +242,7 @@
 	if (!((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_R))
 		return; /* already disabled */
 	fdtab[fd].state &= ~FD_EV_ACTIVE_R;
-	updt_fd(fd); /* need an update entry to change the state */
+	fd_update_cache(fd); /* need an update entry to change the state */
 }
 
 /* Disable processing send events on fd <fd> */
@@ -252,7 +251,7 @@
 	if (!((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_W))
 		return; /* already disabled */
 	fdtab[fd].state &= ~FD_EV_ACTIVE_W;
-	updt_fd(fd); /* need an update entry to change the state */
+	fd_update_cache(fd); /* need an update entry to change the state */
 }
 
 /* Disable processing of events on fd <fd> for both directions. */
@@ -261,7 +260,7 @@
 	if (!((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_RW))
 		return; /* already disabled */
 	fdtab[fd].state &= ~FD_EV_ACTIVE_RW;
-	updt_fd(fd); /* need an update entry to change the state */
+	fd_update_cache(fd); /* need an update entry to change the state */
 }
 
 /* Report that FD <fd> cannot receive anymore without polling (EAGAIN detected). */
@@ -270,7 +269,7 @@
 	if (!(((unsigned int)fdtab[fd].state) & FD_EV_READY_R))
 		return; /* already marked as blocked */
 	fdtab[fd].state &= ~FD_EV_READY_R;
-	updt_fd(fd);
+	fd_update_cache(fd);
 }
 
 /* Report that FD <fd> can receive anymore without polling. */
@@ -279,7 +278,7 @@
 	if (((unsigned int)fdtab[fd].state) & FD_EV_READY_R)
 		return; /* already marked as blocked */
 	fdtab[fd].state |= FD_EV_READY_R;
-	updt_fd(fd);
+	fd_update_cache(fd);
 }
 
 /* Disable readiness when polled. This is useful to interrupt reading when it
@@ -299,7 +298,7 @@
 	if (!(((unsigned int)fdtab[fd].state) & FD_EV_READY_W))
 		return; /* already marked as blocked */
 	fdtab[fd].state &= ~FD_EV_READY_W;
-	updt_fd(fd);
+	fd_update_cache(fd);
 }
 
 /* Report that FD <fd> can send anymore without polling (EAGAIN detected). */
@@ -308,7 +307,7 @@
 	if (((unsigned int)fdtab[fd].state) & FD_EV_READY_W)
 		return; /* already marked as blocked */
 	fdtab[fd].state |= FD_EV_READY_W;
-	updt_fd(fd);
+	fd_update_cache(fd);
 }
 
 /* Prepare FD <fd> to try to receive */
@@ -317,7 +316,7 @@
 	if (((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_R))
 		return; /* already enabled */
 	fdtab[fd].state |= FD_EV_ACTIVE_R;
-	updt_fd(fd); /* need an update entry to change the state */
+	fd_update_cache(fd); /* need an update entry to change the state */
 }
 
 /* Prepare FD <fd> to try to send */
@@ -326,7 +325,7 @@
 	if (((unsigned int)fdtab[fd].state & FD_EV_ACTIVE_W))
 		return; /* already enabled */
 	fdtab[fd].state |= FD_EV_ACTIVE_W;
-	updt_fd(fd); /* need an update entry to change the state */
+	fd_update_cache(fd); /* need an update entry to change the state */
 }
 
 /* Prepares <fd> for being polled */
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index 9d359b2..755c6fa 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -69,7 +69,7 @@
 	int updt_idx;
 	int wait_time;
 
-	/* first, scan the update list to find changes */
+	/* first, scan the update list to find polling changes */
 	for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
 		fd = fd_updt[updt_idx];
 		fdtab[fd].updated = 0;
@@ -109,8 +109,6 @@
 			ev.data.fd = fd;
 			epoll_ctl(epoll_fd, opcode, fd, &ev);
 		}
-
-		fd_alloc_or_release_cache_entry(fd, en);
 	}
 	fd_nbupdt = 0;
 
@@ -175,7 +173,11 @@
 			n |= FD_POLL_HUP;
 
 		fdtab[fd].ev |= n;
-		fd_process_polled_events(fd);
+		if (n & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
+			fd_may_recv(fd);
+
+		if (n & (FD_POLL_OUT | FD_POLL_ERR))
+			fd_may_send(fd);
 	}
 	/* the caller will take care of cached events */
 }
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index 06ccaee..2af94b6 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -84,8 +84,6 @@
 				}
 			}
 		}
-
-		fd_alloc_or_release_cache_entry(fd, en);
 	}
 	if (changes)
 		kevent(kqueue_fd, kev, changes, NULL, 0, NULL);
@@ -138,7 +136,11 @@
 				fdtab[fd].ev |= FD_POLL_OUT;
 		}
 
-		fd_process_polled_events(fd);
+		if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
+			fd_may_recv(fd);
+
+		if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
+			fd_may_send(fd);
 	}
 }
 
diff --git a/src/ev_poll.c b/src/ev_poll.c
index 2f6e56d..866906c 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -93,8 +93,6 @@
 			else if ((en & ~eo) & FD_EV_POLLED_W)
 				hap_fd_set(fd, fd_evts[DIR_WR]);
 		}
-
-		fd_alloc_or_release_cache_entry(fd, en);
 	}
 	fd_nbupdt = 0;
 
@@ -164,7 +162,11 @@
 				((e & POLLHUP) ? FD_POLL_HUP : 0);
 		}
 
-		fd_process_polled_events(fd);
+		if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
+			fd_may_recv(fd);
+
+		if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
+			fd_may_send(fd);
 	}
 
 }
diff --git a/src/ev_select.c b/src/ev_select.c
index 5a76d44..73fe327 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -76,8 +76,6 @@
 			else if ((en & ~eo) & FD_EV_POLLED_W)
 				FD_SET(fd, fd_evts[DIR_WR]);
 		}
-
-		fd_alloc_or_release_cache_entry(fd, en);
 	}
 	fd_nbupdt = 0;
 
@@ -148,7 +146,11 @@
 			if (FD_ISSET(fd, tmp_evts[DIR_WR]))
 				fdtab[fd].ev |= FD_POLL_OUT;
 
-			fd_process_polled_events(fd);
+			if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
+				fd_may_recv(fd);
+
+			if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
+				fd_may_send(fd);
 		}
 	}
 }
diff --git a/src/fd.c b/src/fd.c
index c238bc8..2a6179f 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -199,7 +199,9 @@
 }
 
 /* Scan and process the cached events. This should be called right after
- * the poller.
+ * the poller. The loop may cause new entries to be created, for example
+ * if a listener causes an accept() to initiate a new incoming connection
+ * wanting to attempt an recv().
  */
 void fd_process_cached_events()
 {
@@ -209,12 +211,6 @@
 		fd = fd_cache[entry];
 		e = fdtab[fd].state;
 
-		/* Principle: events which are marked FD_EV_ACTIVE are processed
-		 * with their usual I/O callback. The callback may remove the
-		 * events from the cache or tag them for polling. Changes will be
-		 * applied on next round. Cache entries with no more activity are
-		 * automatically scheduled for removal.
-		 */
 		fdtab[fd].ev &= FD_POLL_STICKY;
 
 		if ((e & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
@@ -226,7 +222,7 @@
 		if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev)
 			fdtab[fd].iocb(fd);
 		else
-			updt_fd(fd);
+			fd_release_cache_entry(fd);
 
 		/* If the fd was removed from the cache, it has been
 		 * replaced by the next one that we don't want to skip !
@@ -237,75 +233,6 @@
 	}
 }
 
-/* Check the events attached to a file descriptor, update its cache
- * accordingly, and call the associated I/O callback. If new updates are
- * detected, the function tries to process them as well in order to save
- * wakeups after accept().
- */
-void fd_process_polled_events(int fd)
-{
-	int new_updt, old_updt;
-
-	/* First thing to do is to mark the reported events as ready, in order
-	 * for them to later be continued from the cache without polling if
-	 * they have to be interrupted (eg: recv fills a buffer).
-	 */
-	if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
-		fd_may_recv(fd);
-
-	if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
-		fd_may_send(fd);
-
-	if (fdtab[fd].cache) {
-		/* This fd is already cached, no need to process it now. */
-		return;
-	}
-
-	if (unlikely(!fdtab[fd].iocb || !fdtab[fd].ev)) {
-		/* nothing to do */
-		return;
-	}
-
-	/* Save number of updates to detect creation of new FDs. */
-	old_updt = fd_nbupdt;
-	fdtab[fd].iocb(fd);
-
-	/* One or more fd might have been created during the iocb().
-	 * This mainly happens with new incoming connections that have
-	 * just been accepted, so we'd like to process them immediately
-	 * for better efficiency, as it saves one useless task wakeup.
-	 * Second benefit, if at the end the fds are disabled again, we can
-	 * safely destroy their update entry to reduce the scope of later
-	 * scans. This is the reason we scan the new entries backwards.
-	 */
-	for (new_updt = fd_nbupdt; new_updt > old_updt; new_updt--) {
-		fd = fd_updt[new_updt - 1];
-		if (!fdtab[fd].new)
-			continue;
-
-		fdtab[fd].new = 0;
-		fdtab[fd].ev &= FD_POLL_STICKY;
-
-		if ((fdtab[fd].state & FD_EV_STATUS_R) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
-			fdtab[fd].ev |= FD_POLL_IN;
-
-		if ((fdtab[fd].state & FD_EV_STATUS_W) == (FD_EV_READY_W | FD_EV_ACTIVE_W))
-			fdtab[fd].ev |= FD_POLL_OUT;
-
-		if (fdtab[fd].ev && fdtab[fd].iocb && fdtab[fd].owner)
-			fdtab[fd].iocb(fd);
-
-		/* we can remove this update entry if it's the last one and is
-		 * unused, otherwise we don't touch anything, especially given
-		 * that the FD might have been closed already.
-		 */
-		if (new_updt == fd_nbupdt && !fd_recv_active(fd) && !fd_send_active(fd)) {
-			fdtab[fd].updated = 0;
-			fd_nbupdt--;
-		}
-	}
-}
-
 /* disable the specified poller */
 void disable_poller(const char *poller_name)
 {