MAJOR: threads/fd: Make fd handling thread-safe

Many changes were needed to achieve this. First, the fd_updt array, where all
pending FDs for polling are stored, is now a thread-local array. Then 3 global
locks have been added to protect, respectively, the fdtab array, the fd_cache
array and the poll information. In addition, each entry in the fdtab array now
carries its own lock to protect all accesses to a specific FD and its
information.
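
As a rough illustration, this is how these locks are meant to nest when an FD's
state is updated and the global maxfd bound recomputed. This is only a minimal
sketch: the helper name is hypothetical and the HAProxy lock macros, fdtab and
maxfd are assumed to be in scope; the real code paths are in the diff below.

    /* Hypothetical helper, for illustration only: the per-FD spinlock
     * protects a single fdtab entry, while the global fdtab_lock only
     * protects table-wide data such as maxfd.
     */
    static inline void fd_update_state_sketch(int fd, int new_state)
    {
    	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
    	fdtab[fd].state = new_state;
    	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);

    	SPIN_LOCK(FDTAB_LOCK, &fdtab_lock);
    	while ((maxfd - 1 >= 0) && !fdtab[maxfd - 1].owner)
    		maxfd--;
    	SPIN_UNLOCK(FDTAB_LOCK, &fdtab_lock);
    }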

For the pollers, the way concurrency is managed depends on the poller itself.
There is one poller loop per thread, so the set of monitored FDs may need to be
protected. epoll and kqueue are thread-safe per se, so little is needed to
protect these pollers. This is not possible with select and poll, so for them
there is no sharing between threads: the poller on each thread is independent
of the others.
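
The sketch below (generic code, not HAProxy's actual poller implementation;
THREAD_LOCAL is assumed to be HAProxy's thread-local storage macro) contrasts
the two situations: a single epoll instance can be shared because epoll_ctl()
and epoll_wait() are thread-safe, whereas select() works on a caller-owned
fd_set, so each thread must keep its own copy.

    #include <sys/epoll.h>
    #include <sys/select.h>

    static int shared_epoll_fd;            /* one instance, shared by all threads */
    static THREAD_LOCAL fd_set local_rset; /* one read set per thread, never shared */

    static void epoll_iteration(void)
    {
    	struct epoll_event ev[64];
    	int n = epoll_wait(shared_epoll_fd, ev, 64, 100);
    	/* dispatch the n returned events... */
    	(void)n;
    }

    static void select_iteration(int max)
    {
    	fd_set rset = local_rset;          /* private snapshot, no lock needed */
    	struct timeval tv = { 0, 100000 }; /* 100ms, like the epoll timeout above */
    	if (select(max + 1, &rset, NULL, NULL, &tv) > 0) {
    		/* dispatch ready FDs... */
    	}
    }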

Finally, per-thread init/deinit functions are used, for each poller and for the
FD part, to manage thread-local resources.
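
The FD part's hooks are visible in the diff below (init_pollers_per_thread and
deinit_pollers_per_thread); a poller with thread-local state follows the same
pattern. The sketch below uses hypothetical names (my_poller_*) and only
illustrates the registration mechanism.

    static THREAD_LOCAL struct epoll_event *local_events = NULL;

    /* allocate this thread's private event buffer; returns 0 on failure,
     * like init_pollers_per_thread() below
     */
    static int my_poller_init_per_thread()
    {
    	local_events = calloc(global.maxsock, sizeof(*local_events));
    	return local_events != NULL;
    }

    static void my_poller_deinit_per_thread()
    {
    	free(local_events);
    	local_events = NULL;
    }

    /* registered once from the poller's global init, exactly as done for
     * the FD part in init_pollers():
     *   hap_register_per_thread_init(my_poller_init_per_thread);
     *   hap_register_per_thread_deinit(my_poller_deinit_per_thread);
     */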

Now, care must be taken when an FD is created during HAProxy startup. All
updates to an FD's state must be made in the threads' context and never before
the threads are created. This is mandatory because the fd_updt array is
thread-local and only initialized for the threads. Because the main thread has
no poller, this array remains uninitialized in its context. For this reason,
listeners are now enabled in the run_thread_poll_loop function, just like the
worker pipe, as sketched below.
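
The ordering constraint, in simplified form (placeholder names everywhere
except run_thread_poll_loop itself):

    static void *run_thread_poll_loop(void *data)
    {
    	/* 1. The registered per-thread init hooks run first; among them,
    	 *    init_pollers_per_thread() allocates the thread-local fd_updt.
    	 */
    	run_per_thread_init_hooks();            /* placeholder */

    	/* 2. Only now may FD state be changed from this thread: enabling
    	 *    a listener or the worker pipe pushes an update into fd_updt,
    	 *    which would still be NULL before step 1.
    	 */
    	enable_listeners_and_worker_pipe();     /* placeholder */

    	/* 3. Then the poller loop itself runs until the thread stops. */
    	run_poller_loop();                      /* placeholder */

    	return NULL;
    }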
diff --git a/src/fd.c b/src/fd.c
index 8dab497..6c53a3b 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -155,6 +155,7 @@
 #include <types/global.h>
 
 #include <proto/fd.h>
+#include <proto/log.h>
 #include <proto/port_range.h>
 
 struct fdtab *fdtab = NULL;     /* array of all the file descriptors */
@@ -168,15 +169,23 @@
 int nbpollers = 0;
 
 unsigned int *fd_cache = NULL; // FD events cache
-unsigned int *fd_updt = NULL;  // FD updates list
 int fd_cache_num = 0;          // number of events in the cache
-int fd_nbupdt = 0;             // number of updates in the list
+
+THREAD_LOCAL int *fd_updt  = NULL;  // FD updates list
+THREAD_LOCAL int  fd_nbupdt = 0;   // number of updates in the list
+
+#ifdef USE_THREAD
+HA_SPINLOCK_T fdtab_lock;       /* global lock to protect fdtab array */
+HA_RWLOCK_T   fdcache_lock;     /* global lock to protect fd_cache array */
+HA_SPINLOCK_T poll_lock;        /* global lock to protect poll info */
+#endif
 
 /* Deletes an FD from the fdsets, and recomputes the maxfd limit.
  * The file descriptor is also closed.
  */
 static void fd_dodelete(int fd, int do_close)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (fdtab[fd].linger_risk) {
 		/* this is generally set when connecting to servers */
 		setsockopt(fd, SOL_SOCKET, SO_LINGER,
@@ -195,9 +204,12 @@
 	fdtab[fd].new = 0;
 	if (do_close)
 		close(fd);
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 
+	SPIN_LOCK(FDTAB_LOCK, &fdtab_lock);
 	while ((maxfd-1 >= 0) && !fdtab[maxfd-1].owner)
 		maxfd--;
+	SPIN_UNLOCK(FDTAB_LOCK, &fdtab_lock);
 }
 
 /* Deletes an FD from the fdsets, and recomputes the maxfd limit.
@@ -225,10 +237,19 @@
 {
 	int fd, entry, e;
 
+	if (!fd_cache_num)
+		return;
+
+	RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
 	for (entry = 0; entry < fd_cache_num; ) {
 		fd = fd_cache[entry];
-		e = fdtab[fd].state;
+
+		if (SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock))
+			goto next;
+
+		RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
 
+		e = fdtab[fd].state;
 		fdtab[fd].ev &= FD_POLL_STICKY;
 
 		if ((e & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
@@ -237,18 +258,25 @@
 		if ((e & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))
 			fdtab[fd].ev |= FD_POLL_OUT;
 
-		if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev)
+		if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev) {
+			SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 			fdtab[fd].iocb(fd);
-		else
+		}
+		else {
 			fd_release_cache_entry(fd);
+			SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+		}
 
+		RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
 		/* If the fd was removed from the cache, it has been
 		 * replaced by the next one that we don't want to skip !
 		 */
 		if (entry < fd_cache_num && fd_cache[entry] != fd)
 			continue;
+	  next:
 		entry++;
 	}
+	RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
 }
 
 /* disable the specified poller */
@@ -261,6 +289,21 @@
 			pollers[p].pref = 0;
 }
 
+/* Initialize the pollers per thread */
+static int init_pollers_per_thread()
+{
+	if ((fd_updt = calloc(global.maxsock, sizeof(*fd_updt))) == NULL)
+		return 0;
+	return 1;
+}
+
+/* Deinitialize the pollers per thread */
+static void deinit_pollers_per_thread()
+{
+	free(fd_updt);
+	fd_updt = NULL;
+}
+
 /*
  * Initialize the pollers till the best one is found.
  * If none works, returns 0, otherwise 1.
@@ -279,9 +322,21 @@
 	if ((fd_cache = calloc(global.maxsock, sizeof(*fd_cache))) == NULL)
 		goto fail_cache;
 
-	if ((fd_updt = calloc(global.maxsock, sizeof(*fd_updt))) == NULL)
+	if (global.nbthread > 1) {
+		hap_register_per_thread_init(init_pollers_per_thread);
+		hap_register_per_thread_deinit(deinit_pollers_per_thread);
+	}
+	else if (!init_pollers_per_thread())
 		goto fail_updt;
 
+	for (p = 0; p < global.maxsock; p++)
+		SPIN_INIT(&fdtab[p].lock);
+
+	//memset(fd_cache, -1, global.maxsock);
+
+	SPIN_INIT(&fdtab_lock);
+	RWLOCK_INIT(&fdcache_lock);
+	SPIN_INIT(&poll_lock);
 	do {
 		bp = NULL;
 		for (p = 0; p < nbpollers; p++)
@@ -316,16 +371,24 @@
 	struct poller *bp;
 	int p;
 
+	for (p = 0; p < global.maxsock; p++)
+		SPIN_DESTROY(&fdtab[p].lock);
+
 	for (p = 0; p < nbpollers; p++) {
 		bp = &pollers[p];
 
 		if (bp && bp->pref)
 			bp->term(bp);
 	}
+
 	free(fd_updt);  fd_updt  = NULL;
 	free(fd_cache); fd_cache = NULL;
 	free(fdinfo);   fdinfo   = NULL;
 	free(fdtab);    fdtab    = NULL;
+
+	SPIN_DESTROY(&fdtab_lock);
+	RWLOCK_DESTROY(&fdcache_lock);
+	SPIN_DESTROY(&poll_lock);
 }
 
 /*