BUG/MEDIUM: epoll/threads: use one epoll_fd per thread

There currently is a problem regarding epoll(). While select() and poll()
compute their polling state on the fly upon each call, epoll() keeps a
shared state between all threads via the epoll_fd. The problem is that
once an fd is registered on *any* thread, all other threads receive
events for that FD as well. It is clearly visible when binding a listener
to a single thread like in the configuration below where all 4 threads
will work, 3 of them simply spinning to skip the event :

    global
        nbthread 4

    frontend foo
        bind :1234 process 1/1

The worst case happens when some slow operations are in progress on a
busy thread, preventing it from processing its task and causing the
other ones to wake up not being able to do anything with this event.
Typically computing a large TLS key will delay processing of next
events on the same thread while others will still wake up.

All this simply shows that the poller must remain thread-specific, with
its own events and its own ability to sleep when it doesn't have anything
to do.

This patch does exactly this. For this, it proceeds like this :

   - have one epoll_fd per thread instead of one per process
   - initialize these epoll_fd when threads are created.
   - mark all known FDs as updated so that the next invocation of
     _do_poll() recomputes their polling status (including a possible
     removal of undesired polling from the original FD) ;
   - use each fd's polled_mask to maintain an accurate status of
     the current polling activity for this FD.
   - when scanning updates, only focus on events whose new polling
     status differs from the existing one
   - during updates, always verify the thread_mask to resist migration
   - on __fd_clo(), for cloned FDs (typically listeners inherited
     from the parent during a graceful shutdown), run epoll_ctl(DEL)
     on all epoll_fd. This is the reason why epoll_fd is stored in a
     shared array and not in a thread_local storage. Note: maybe this
     can be moved to an update instead.

Interestingly, this shows that we don't need the FD's old state anymore
and that we only use it to convert it to the new state based on stable
information. It appears clearly that the FD code can be further improved
by computing the final state directly when manipulating it.

With this change, the config above goes from 22000 cps at 380% CPU to
43000 cps at 100% CPU : not only the 3 unused threads are not activated,
but they do not disturb the activity anymore.

The output of "show activity" before and after the patch on a 4-thread
config where a first listener on thread 2 forwards over SSL to threads
3 & 4 shows a much smaller amount of undesired events (thread 1
doesn't wake up anymore, poll_skip remains zero, fd_skip stays low) :

  // before: 400% CPU, 7700 cps, 13 seconds
  loops: 11380717 65879 5733468 5728129
  wake_cache: 0 63986 317547 314174
  wake_tasks: 0 0 0 0
  wake_applets: 0 0 0 0
  wake_signal: 0 0 0 0
  poll_exp: 0 63986 317547 314174
  poll_drop: 1 0 49981 48893
  poll_dead: 65514 0 31334 31934
  poll_skip: 46293690 34071 22867786 22858208
  fd_skip: 66068135 174157 33732685 33825727
  fd_lock: 0 2 2809 2905
  fd_del: 0 494361 80890 79464
  conn_dead: 0 0 0 0
  stream: 0 407747 50526 49474
  empty_rq: 11380718 1914 5683023 5678715
  long_rq: 0 0 0 0

  // after: 200% cpu, 9450 cps, 11 seconds
  loops: 17 66147 1001631 450968
  wake_cache: 0 66119 865139 321227
  wake_tasks: 0 0 0 0
  wake_applets: 0 0 0 0
  wake_signal: 0 0 0 0
  poll_exp: 0 66119 865139 321227
  poll_drop: 6 5 38279 60768
  poll_dead: 0 0 0 0
  poll_skip: 0 0 0 0
  fd_skip: 54 172661 4411407 2008198
  fd_lock: 0 0 10890 5394
  fd_del: 0 492829 58965 105091
  conn_dead: 0 0 0 0
  stream: 0 406223 38663 61338
  empty_rq: 18 40 962999 390549
  long_rq: 0 0 0 0

This patch presents a few risks but fixes a real problem with threads,
and as such it needs to be backported to 1.8. It depends on previous patch
("MINOR: fd: add a bitmask to indicate that an FD is known by the poller").

Special thanks go to Samuel Reed for providing a large amount of useful
debugging information and for testing fixes.
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index f37455f..635b8a5 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -29,7 +29,7 @@
 
 /* private data */
 static THREAD_LOCAL struct epoll_event *epoll_events = NULL;
-static int epoll_fd;
+static int epoll_fd[MAX_THREADS]; // per-thread epoll_fd
 
 /* This structure may be used for any purpose. Warning! do not use it in
  * recursive functions !
@@ -49,8 +49,14 @@
  */
 REGPRM1 static void __fd_clo(int fd)
 {
-	if (unlikely(fdtab[fd].cloned))
-		epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &ev);
+	if (unlikely(fdtab[fd].cloned)) {
+		unsigned long m = fdtab[fd].thread_mask;
+		int i;
+
+		for (i = global.nbthread - 1; i >= 0; i--)
+			if (m & (1UL << i))
+				epoll_ctl(epoll_fd[i], EPOLL_CTL_DEL, fd, &ev);
+	}
 }
 
 /*
@@ -82,34 +88,36 @@
 		fdtab[fd].state = en;
 		HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 
-		if ((eo ^ en) & FD_EV_POLLED_RW) {
-			/* poll status changed */
-
-			if ((en & FD_EV_POLLED_RW) == 0) {
+		if (fdtab[fd].polled_mask & tid_bit) {
+			if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
 				/* fd removed from poll list */
 				opcode = EPOLL_CTL_DEL;
-			}
-			else if ((eo & FD_EV_POLLED_RW) == 0) {
-				/* new fd in the poll list */
-				opcode = EPOLL_CTL_ADD;
+				HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
 			}
 			else {
 				/* fd status changed */
 				opcode = EPOLL_CTL_MOD;
 			}
-
-			/* construct the epoll events based on new state */
-			ev.events = 0;
-			if (en & FD_EV_POLLED_R)
-				ev.events |= EPOLLIN | EPOLLRDHUP;
+		}
+		else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
+			/* new fd in the poll list */
+			opcode = EPOLL_CTL_ADD;
+			HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+		}
+		else {
+			continue;
+		}
 
-			if (en & FD_EV_POLLED_W)
-				ev.events |= EPOLLOUT;
+		/* construct the epoll events based on new state */
+		ev.events = 0;
+		if (en & FD_EV_POLLED_R)
+			ev.events |= EPOLLIN | EPOLLRDHUP;
 
-			ev.data.fd = fd;
+		if (en & FD_EV_POLLED_W)
+			ev.events |= EPOLLOUT;
 
-			epoll_ctl(epoll_fd, opcode, fd, &ev);
-		}
+		ev.data.fd = fd;
+		epoll_ctl(epoll_fd[tid], opcode, fd, &ev);
 	}
 	fd_nbupdt = 0;
 
@@ -129,7 +137,7 @@
 	/* now let's wait for polled events */
 
 	gettimeofday(&before_poll, NULL);
-	status = epoll_wait(epoll_fd, epoll_events, global.tune.maxpollevents, wait_time);
+	status = epoll_wait(epoll_fd[tid], epoll_events, global.tune.maxpollevents, wait_time);
 	tv_update_date(wait_time, status);
 	measure_idle();
 
@@ -146,7 +154,10 @@
 		}
 
 		if (!(fdtab[fd].thread_mask & tid_bit)) {
+			/* FD has been migrated */
 			activity[tid].poll_skip++;
+			epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
+			HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
 			continue;
 		}
 
@@ -178,14 +189,38 @@
 
 static int init_epoll_per_thread()
 {
+	int fd;
+
 	epoll_events = calloc(1, sizeof(struct epoll_event) * global.tune.maxpollevents);
 	if (epoll_events == NULL)
-		return 0;
+		goto fail_alloc;
+
+	if (tid) {
+		epoll_fd[tid] = epoll_create(global.maxsock + 1);
+		if (epoll_fd[tid] < 0)
+			goto fail_fd;
+	}
+
+	/* we may have to unregister some events initially registered on the
+	 * original fd when it was alone, and/or to register events on the new
+	 * fd for this thread. Let's just mark them as updated, the poller will
+	 * do the rest.
+	 */
+	for (fd = 0; fd < maxfd; fd++)
+		updt_fd_polling(fd);
+
 	return 1;
+ fail_fd:
+	free(epoll_events);
+ fail_alloc:
+	return 0;
 }
 
 static void deinit_epoll_per_thread()
 {
+	if (tid)
+		close(epoll_fd[tid]);
+
 	free(epoll_events);
 	epoll_events = NULL;
 }
@@ -199,8 +234,8 @@
 {
 	p->private = NULL;
 
-	epoll_fd = epoll_create(global.maxsock + 1);
-	if (epoll_fd < 0)
+	epoll_fd[tid] = epoll_create(global.maxsock + 1);
+	if (epoll_fd[tid] < 0)
 		goto fail_fd;
 
 	hap_register_per_thread_init(init_epoll_per_thread);
@@ -219,9 +254,9 @@
  */
 REGPRM1 static void _do_term(struct poller *p)
 {
-	if (epoll_fd >= 0) {
-		close(epoll_fd);
-		epoll_fd = -1;
+	if (epoll_fd[tid] >= 0) {
+		close(epoll_fd[tid]);
+		epoll_fd[tid] = -1;
 	}
 
 	p->private = NULL;
@@ -251,10 +286,10 @@
  */
 REGPRM1 static int _do_fork(struct poller *p)
 {
-	if (epoll_fd >= 0)
-		close(epoll_fd);
-	epoll_fd = epoll_create(global.maxsock + 1);
-	if (epoll_fd < 0)
+	if (epoll_fd[tid] >= 0)
+		close(epoll_fd[tid]);
+	epoll_fd[tid] = epoll_create(global.maxsock + 1);
+	if (epoll_fd[tid] < 0)
 		return 0;
 	return 1;
 }
@@ -268,11 +303,14 @@
 static void _do_register(void)
 {
 	struct poller *p;
+	int i;
 
 	if (nbpollers >= MAX_POLLERS)
 		return;
 
-	epoll_fd = -1;
+	for (i = 0; i < MAX_THREADS; i++)
+		epoll_fd[i] = -1;
+
 	p = &pollers[nbpollers++];
 
 	p->name = "epoll";