BUG/MEDIUM: pollers: Use a global list for fd shared between threads.

With the old model, any fd shared by multiple threads, such as listeners
or dns sockets, would only be updated on one thread, which could lead
to missed events or spurious wakeups.
To avoid this, add a global list for fds that are shared, using the same
implementation as the fd cache, and only remove entries from this list
when every thread has updated its poller.

[wt: this will need to be backported to 1.8 but differently so this patch
 must not be backported as-is]
diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 0f10b48..e27ecc6 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -256,6 +256,8 @@
 int  thread_no_sync(void);
 int  thread_need_sync(void);
 
+extern unsigned long all_threads_mask;
+
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
 
 /* WARNING!!! if you update this enum, please also keep lock_label() up to date below */
diff --git a/include/proto/fd.h b/include/proto/fd.h
index 543a420..da09731 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -36,6 +36,8 @@
 extern volatile struct fdlist fd_cache;
 extern volatile struct fdlist fd_cache_local[MAX_THREADS];
 
+extern volatile struct fdlist update_list;
+
 extern unsigned long fd_cache_mask; // Mask of threads with events in the cache
 
 extern THREAD_LOCAL int *fd_updt;  // FD updates list
@@ -101,15 +103,57 @@
  */
 static inline void updt_fd_polling(const int fd)
 {
-	unsigned int oldupdt;
+	if (fdtab[fd].thread_mask == tid_bit) {
+		unsigned int oldupdt;
+
+		/* note: we don't have a test-and-set yet in hathreads */
 
-	/* note: we don't have a test-and-set yet in hathreads */
+		if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+			return;
+
+		oldupdt = HA_ATOMIC_ADD(&fd_nbupdt, 1) - 1;
+		fd_updt[oldupdt] = fd;
+	} else {
+		unsigned long update_mask = fdtab[fd].update_mask;
+		do {
+			if (update_mask == fdtab[fd].thread_mask)
+				return;
+		} while (!HA_ATOMIC_CAS(&fdtab[fd].update_mask, &update_mask,
+		    fdtab[fd].thread_mask));
+		fd_add_to_fd_list(&update_list, fd, offsetof(struct fdtab, update));
+	}
+
+}
 
-	if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
-		return;
+/* Called from the poller to acknowledge we read an entry from the global
+ * update list, to remove our bit from the update_mask, and remove it from
+ * the list if we were the last one.
+ */
+static inline void done_update_polling(int fd)
+{
+	unsigned long update_mask;
+
+	update_mask = HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
+	while ((update_mask & all_threads_mask)== 0) {
+		/* If we were the last one that had to update that entry, remove it from the list */
+		fd_rm_from_fd_list(&update_list, fd, offsetof(struct fdtab, update));
+		if (update_list.first == fd)
+			abort();
+		update_mask = (volatile unsigned long)fdtab[fd].update_mask;
+		if ((update_mask & all_threads_mask) != 0) {
+			/* Maybe it's been re-updated in the meanwhile, and we
+			 * wrongly removed it from the list, if so, re-add it
+			 */
+			fd_add_to_fd_list(&update_list, fd, offsetof(struct fdtab, update));
+			update_mask = (volatile unsigned long)(fdtab[fd].update_mask);
+			/* And then check again, just in case after all it
+			 * should be removed, even if it's very unlikely, given
+			 * the current thread wouldn't have been able to take
+			 * care of it yet */
+		} else
+			break;
 
-	oldupdt = HA_ATOMIC_ADD(&fd_nbupdt, 1) - 1;
-	fd_updt[oldupdt] = fd;
+	}
 }
 
 /* Allocates a cache entry for a file descriptor if it does not yet have one.
diff --git a/include/types/fd.h b/include/types/fd.h
index 0902e7f..aa18ebe 100644
--- a/include/types/fd.h
+++ b/include/types/fd.h
@@ -117,6 +117,7 @@
 	unsigned long polled_mask;           /* mask of thread IDs currently polling this fd */
 	unsigned long update_mask;           /* mask of thread IDs having an update for fd */
 	struct fdlist_entry cache;           /* Entry in the fdcache */
+	struct fdlist_entry update;          /* Entry in the global update list */
 	void (*iocb)(int fd);                /* I/O handler */
 	void *owner;                         /* the connection or listener associated with this fd, NULL if closed */
 	unsigned char state;                 /* FD state for read and write directions (2*3 bits) */
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index a8e5797..584bf64 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -59,16 +59,55 @@
 	}
 }
 
+static void _update_fd(int fd)
+{
+	int en, opcode;
+
+	en = fdtab[fd].state;
+
+	if (fdtab[fd].polled_mask & tid_bit) {
+		if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
+			/* fd removed from poll list */
+			opcode = EPOLL_CTL_DEL;
+			HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+		}
+		else {
+			/* fd status changed */
+			opcode = EPOLL_CTL_MOD;
+		}
+	}
+	else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
+		/* new fd in the poll list */
+		opcode = EPOLL_CTL_ADD;
+		HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+	}
+	else {
+		return;
+	}
+
+	/* construct the epoll events based on new state */
+	ev.events = 0;
+	if (en & FD_EV_POLLED_R)
+		ev.events |= EPOLLIN | EPOLLRDHUP;
+
+	if (en & FD_EV_POLLED_W)
+		ev.events |= EPOLLOUT;
+
+	ev.data.fd = fd;
+	epoll_ctl(epoll_fd[tid], opcode, fd, &ev);
+}
+
 /*
  * Linux epoll() poller
  */
 REGPRM2 static void _do_poll(struct poller *p, int exp)
 {
-	int status, en;
-	int fd, opcode;
+	int status;
+	int fd;
 	int count;
 	int updt_idx;
 	int wait_time;
+	int old_fd;
 
 	/* first, scan the update list to find polling changes */
 	for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
@@ -80,40 +119,27 @@
 			continue;
 		}
 
-		en = fdtab[fd].state;
-
-		if (fdtab[fd].polled_mask & tid_bit) {
-			if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
-				/* fd removed from poll list */
-				opcode = EPOLL_CTL_DEL;
-				HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
-			}
-			else {
-				/* fd status changed */
-				opcode = EPOLL_CTL_MOD;
-			}
-		}
-		else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
-			/* new fd in the poll list */
-			opcode = EPOLL_CTL_ADD;
-			HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-		}
-		else {
+		_update_fd(fd);
+	}
+	fd_nbupdt = 0;
+	/* Scan the global update list */
+	for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+		if (fd == -2) {
+			fd = old_fd;
 			continue;
 		}
-
-		/* construct the epoll events based on new state */
-		ev.events = 0;
-		if (en & FD_EV_POLLED_R)
-			ev.events |= EPOLLIN | EPOLLRDHUP;
-
-		if (en & FD_EV_POLLED_W)
-			ev.events |= EPOLLOUT;
-
-		ev.data.fd = fd;
-		epoll_ctl(epoll_fd[tid], opcode, fd, &ev);
+		else if (fd <= -3)
+			fd = -fd -4;
+		if (fd == -1)
+			break;
+		if (fdtab[fd].update_mask & tid_bit)
+			done_update_polling(fd);
+		else
+			continue;
+		if (!fdtab[fd].owner)
+			continue;
+		_update_fd(fd);
 	}
-	fd_nbupdt = 0;
 
 	/* compute the epoll_wait() timeout */
 	if (!exp)
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index ebfd5d2..926f77c 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -33,6 +33,41 @@
 static THREAD_LOCAL struct kevent *kev = NULL;
 static struct kevent *kev_out = NULL; // Trash buffer for kevent() to write the eventlist in
 
+static int _update_fd(int fd)
+{
+	int en;
+	int changes = 0;
+
+	en = fdtab[fd].state;
+
+	if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
+		if (!(fdtab[fd].polled_mask & tid_bit)) {
+			/* fd was not watched, it's still not */
+			return 0;
+		}
+		/* fd totally removed from poll list */
+		EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+		EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+		HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+	}
+	else {
+		/* OK fd has to be monitored, it was either added or changed */
+
+		if (en & FD_EV_POLLED_R)
+			EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
+		else if (fdtab[fd].polled_mask & tid_bit)
+			EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+
+		if (en & FD_EV_POLLED_W)
+			EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
+		else if (fdtab[fd].polled_mask & tid_bit)
+			EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+
+		HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+	}
+	return changes;
+}
+
 /*
  * kqueue() poller
  */
@@ -41,8 +76,9 @@
 	int status;
 	int count, fd, delta_ms;
 	struct timespec timeout;
-	int updt_idx, en;
+	int updt_idx;
 	int changes = 0;
+	int old_fd;
 
 	timeout.tv_sec  = 0;
 	timeout.tv_nsec = 0;
@@ -55,35 +91,27 @@
 			activity[tid].poll_drop++;
 			continue;
 		}
-
-		en = fdtab[fd].state;
-
-		if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
-			if (!(fdtab[fd].polled_mask & tid_bit)) {
-				/* fd was not watched, it's still not */
-				continue;
-			}
-			/* fd totally removed from poll list */
-			EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
-			EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-			HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
-		}
-		else {
-			/* OK fd has to be monitored, it was either added or changed */
-
-			if (en & FD_EV_POLLED_R)
-				EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
-			else if (fdtab[fd].polled_mask & tid_bit)
-				EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
-
-			if (en & FD_EV_POLLED_W)
-				EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
-			else if (fdtab[fd].polled_mask & tid_bit)
-				EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-
-			HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+		changes += _update_fd(fd);
+	}
+	/* Scan the global update list */
+	for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+		if (fd == -2) {
+			fd = old_fd;
+			continue;
 		}
+		else if (fd <= -3)
+			fd = -fd -4;
+		if (fd == -1)
+			break;
+		if (fdtab[fd].update_mask & tid_bit)
+			done_update_polling(fd);
+		else
+			continue;
+		if (!fdtab[fd].owner)
+			continue;
+		changes += _update_fd(fd);
 	}
+
 	if (changes) {
 #ifdef EV_RECEIPT
 		kev[0].flags |= EV_RECEIPT;
diff --git a/src/ev_poll.c b/src/ev_poll.c
index 6093b65..155ac82 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -45,6 +45,44 @@
 	hap_fd_clr(fd, fd_evts[DIR_WR]);
 }
 
+static void _update_fd(int fd, int *max_add_fd)
+{
+	int en;
+
+	en = fdtab[fd].state;
+
+	/* we have a single state for all threads, which is why we
+	 * don't check the tid_bit. First thread to see the update
+	 * takes it for every other one.
+	 */
+	if (!(en & FD_EV_POLLED_RW)) {
+		if (!fdtab[fd].polled_mask) {
+			/* fd was not watched, it's still not */
+			return;
+		}
+		/* fd totally removed from poll list */
+		hap_fd_clr(fd, fd_evts[DIR_RD]);
+		hap_fd_clr(fd, fd_evts[DIR_WR]);
+		HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
+	}
+	else {
+		/* OK fd has to be monitored, it was either added or changed */
+		if (!(en & FD_EV_POLLED_R))
+			hap_fd_clr(fd, fd_evts[DIR_RD]);
+		else
+			hap_fd_set(fd, fd_evts[DIR_RD]);
+
+		if (!(en & FD_EV_POLLED_W))
+			hap_fd_clr(fd, fd_evts[DIR_WR]);
+		else
+			hap_fd_set(fd, fd_evts[DIR_WR]);
+
+		HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+		if (fd > *max_add_fd)
+			*max_add_fd = fd;
+	}
+}
+
 /*
  * Poll() poller
  */
@@ -53,11 +91,12 @@
 	int status;
 	int fd;
 	int wait_time;
-	int updt_idx, en;
+	int updt_idx;
 	int fds, count;
 	int sr, sw;
 	int old_maxfd, new_maxfd, max_add_fd;
 	unsigned rn, wn; /* read new, write new */
+	int old_fd;
 
 	max_add_fd = -1;
 
@@ -70,39 +109,31 @@
 			activity[tid].poll_drop++;
 			continue;
 		}
+		_update_fd(fd, &max_add_fd);
+	}
 
-		en = fdtab[fd].state;
-
-		/* we have a single state for all threads, which is why we
-		 * don't check the tid_bit. First thread to see the update
-		 * takes it for every other one.
-		 */
-		if (!(en & FD_EV_POLLED_RW)) {
-			if (!fdtab[fd].polled_mask) {
-				/* fd was not watched, it's still not */
-				continue;
-			}
-			/* fd totally removed from poll list */
-			hap_fd_clr(fd, fd_evts[DIR_RD]);
-			hap_fd_clr(fd, fd_evts[DIR_WR]);
-			HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
-		}
-		else {
-			/* OK fd has to be monitored, it was either added or changed */
-			if (!(en & FD_EV_POLLED_R))
-				hap_fd_clr(fd, fd_evts[DIR_RD]);
-			else
-				hap_fd_set(fd, fd_evts[DIR_RD]);
-
-			if (!(en & FD_EV_POLLED_W))
-				hap_fd_clr(fd, fd_evts[DIR_WR]);
-			else
-				hap_fd_set(fd, fd_evts[DIR_WR]);
-
-			HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-			if (fd > max_add_fd)
-				max_add_fd = fd;
+	/* Now scan the global update list */
+	for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+		if (fd == -2) {
+			fd = old_fd;
+			continue;
 		}
+		else if (fd <= -3)
+			fd = -fd -4;
+		if (fd == -1)
+			break;
+		if (fdtab[fd].update_mask & tid_bit) {
+			/* Cheat a bit, as the state is global to all pollers
+			 * we don't need every thread to take care of the
+			 * update.
+			 */
+			HA_ATOMIC_AND(&fdtab[fd].update_mask, ~all_threads_mask);
+			done_update_polling(fd);
+		} else
+			continue;
+		if (!fdtab[fd].owner)
+			continue;
+		_update_fd(fd, &max_add_fd);
 	}
 
 	/* maybe we added at least one fd larger than maxfd */
diff --git a/src/ev_select.c b/src/ev_select.c
index 163a458..ac4a360 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -36,6 +36,44 @@
 	hap_fd_clr(fd, fd_evts[DIR_WR]);
 }
 
+static void _update_fd(int fd, int *max_add_fd)
+{
+	int en;
+
+	en = fdtab[fd].state;
+
+	/* we have a single state for all threads, which is why we
+	 * don't check the tid_bit. First thread to see the update
+	 * takes it for every other one.
+	 */
+	if (!(en & FD_EV_POLLED_RW)) {
+		if (!fdtab[fd].polled_mask) {
+			/* fd was not watched, it's still not */
+			return;
+		}
+		/* fd totally removed from poll list */
+		hap_fd_clr(fd, fd_evts[DIR_RD]);
+		hap_fd_clr(fd, fd_evts[DIR_WR]);
+		HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
+	}
+	else {
+		/* OK fd has to be monitored, it was either added or changed */
+		if (!(en & FD_EV_POLLED_R))
+			hap_fd_clr(fd, fd_evts[DIR_RD]);
+		else
+			hap_fd_set(fd, fd_evts[DIR_RD]);
+
+		if (!(en & FD_EV_POLLED_W))
+			hap_fd_clr(fd, fd_evts[DIR_WR]);
+		else
+			hap_fd_set(fd, fd_evts[DIR_WR]);
+
+		HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
+		if (fd > *max_add_fd)
+			*max_add_fd = fd;
+	}
+}
+
 /*
  * Select() poller
  */
@@ -46,10 +84,11 @@
 	struct timeval delta;
 	int delta_ms;
 	int fds;
-	int updt_idx, en;
+	int updt_idx;
 	char count;
 	int readnotnull, writenotnull;
 	int old_maxfd, new_maxfd, max_add_fd;
+	int old_fd;
 
 	max_add_fd = -1;
 
@@ -62,41 +101,33 @@
 			activity[tid].poll_drop++;
 			continue;
 		}
-
-		en = fdtab[fd].state;
-
-		/* we have a single state for all threads, which is why we
-		 * don't check the tid_bit. First thread to see the update
-		 * takes it for every other one.
-		 */
-		if (!(en & FD_EV_POLLED_RW)) {
-			if (!fdtab[fd].polled_mask) {
-				/* fd was not watched, it's still not */
-				continue;
-			}
-			/* fd totally removed from poll list */
-			hap_fd_clr(fd, fd_evts[DIR_RD]);
-			hap_fd_clr(fd, fd_evts[DIR_WR]);
-			HA_ATOMIC_AND(&fdtab[fd].polled_mask, 0);
-		}
-		else {
-			/* OK fd has to be monitored, it was either added or changed */
-			if (!(en & FD_EV_POLLED_R))
-				hap_fd_clr(fd, fd_evts[DIR_RD]);
-			else
-				hap_fd_set(fd, fd_evts[DIR_RD]);
-
-			if (!(en & FD_EV_POLLED_W))
-				hap_fd_clr(fd, fd_evts[DIR_WR]);
-			else
-				hap_fd_set(fd, fd_evts[DIR_WR]);
-
-			HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
-			if (fd > max_add_fd)
-				max_add_fd = fd;
+		_update_fd(fd, &max_add_fd);
+	}
+	/* Now scan the global update list */
+	for (old_fd = fd = update_list.first; fd != -1; fd = fdtab[fd].update.next) {
+		if (fd == -2) {
+			fd = old_fd;
+			continue;
 		}
+		else if (fd <= -3)
+			fd = -fd -4;
+		if (fd == -1)
+			break;
+		if (fdtab[fd].update_mask & tid_bit) {
+			/* Cheat a bit, as the state is global to all pollers
+			 * we don't need every thread to take care of the
+			 * update.
+			 */
+			HA_ATOMIC_AND(&fdtab[fd].update_mask, ~all_threads_mask);
+			done_update_polling(fd);
+		} else
+			continue;
+		if (!fdtab[fd].owner)
+			continue;
+		_update_fd(fd, &max_add_fd);
 	}
 
+
 	/* maybe we added at least one fd larger than maxfd */
 	for (old_maxfd = maxfd; old_maxfd <= max_add_fd; ) {
 		if (HA_ATOMIC_CAS(&maxfd, &old_maxfd, max_add_fd + 1))
diff --git a/src/fd.c b/src/fd.c
index 01de0e1..4e88d30 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -169,6 +169,7 @@
 
 volatile struct fdlist fd_cache ; // FD events cache
 volatile struct fdlist fd_cache_local[MAX_THREADS]; // FD events local for each thread
+volatile struct fdlist update_list; // Global update list
 
 unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache
 
@@ -244,7 +245,6 @@
 	int prev;
 	int next;
 	int last;
-
 lock_self:
 #if (defined(HA_CAS_IS_8B) || defined(HA_HAVE_CAS_DW))
 	next_list.next = next_list.prev = -2;
@@ -492,6 +492,7 @@
 		goto fail_info;
 
 	fd_cache.first = fd_cache.last = -1;
+	update_list.first = update_list.last = -1;
 	hap_register_per_thread_init(init_pollers_per_thread);
 	hap_register_per_thread_deinit(deinit_pollers_per_thread);
 
@@ -499,7 +500,7 @@
 		HA_SPIN_INIT(&fdtab[p].lock);
 		/* Mark the fd as out of the fd cache */
 		fdtab[p].cache.next = -3;
-		fdtab[p].cache.next = -3;
+		fdtab[p].update.next = -3;
 	}
 	for (p = 0; p < global.nbthread; p++)
 		fd_cache_local[p].first = fd_cache_local[p].last = -1;
diff --git a/src/hathreads.c b/src/hathreads.c
index 0d690f3..5db3c21 100644
--- a/src/hathreads.c
+++ b/src/hathreads.c
@@ -31,7 +31,7 @@
 static HA_SPINLOCK_T sync_lock;
 static int           threads_sync_pipe[2];
 static unsigned long threads_want_sync = 0;
-static unsigned long all_threads_mask  = 0;
+unsigned long all_threads_mask  = 0;
 
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
 struct lock_stat lock_stats[LOCK_LABELS];