MAJOR: threads/fd: Make fd stuff thread-safe

Many changes have been made to do so. First, the fd_updt array, where all
pending FDs for polling are stored, is now a thread-local array. Then 3 locks
have been added to protect, respectively, the fdtab array, the fd_cache array
and poll information. In addition, a lock for each entry in the fdtab array has
been added to protect all accesses to a specific FD or its information.

For pollers, the way concurrency is managed depends on the poller. There is a
poller loop on each thread, so the set of monitored FDs may need to be
protected. epoll and kqueue are thread-safe per se, so there are few things to
do to protect these pollers. This is not possible with select and poll, so
there is no sharing between the threads. The poller on each thread is
independent from the others.

Finally, per-thread init/deinit functions are used for each poller and for the
FD part to manage thread-local resources.

Now, you must be careful when an FD is created during HAProxy startup. All
updates to the FD state must be made in the threads' context and never before
their creation. This is mandatory because the fd_updt array is thread-local and
initialized only for threads. Because there is no poller for the main one, this
array remains uninitialized in this context. For this reason, listeners are now
enabled in the run_thread_poll_loop function, just like the worker pipe.
diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 3e2a350..a2c047e 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -138,6 +138,10 @@
 
 enum lock_label {
 	THREAD_SYNC_LOCK = 0,
+	FDTAB_LOCK,
+	FDCACHE_LOCK,
+	FD_LOCK,
+	POLL_LOCK,
 	POOL_LOCK,
 	LOCK_LABELS
 };
@@ -221,7 +225,8 @@
 
 static inline void show_lock_stats()
 {
-	const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "POOL"};
+	const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
+					   "POOL" };
 	int lbl;
 
 	for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
diff --git a/include/proto/fd.h b/include/proto/fd.h
index 56e2082..e47d8fd 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -28,13 +28,22 @@
 #include <unistd.h>
 
 #include <common/config.h>
+
 #include <types/fd.h>
 
 /* public variables */
+
 extern unsigned int *fd_cache;      // FD events cache
-extern unsigned int *fd_updt;       // FD updates list
 extern int fd_cache_num;            // number of events in the cache
-extern int fd_nbupdt;               // number of updates in the list
+
+extern THREAD_LOCAL int *fd_updt;  // FD updates list
+extern THREAD_LOCAL int fd_nbupdt; // number of updates in the list
+
+#ifdef USE_THREAD
+HA_SPINLOCK_T fdtab_lock;      /* global lock to protect fdtab array */
+HA_RWLOCK_T   fdcache_lock;    /* global lock to protect fd_cache array */
+HA_SPINLOCK_T poll_lock;       /* global lock to protect poll info */
+#endif
 
 /* Deletes an FD from the fdsets, and recomputes the maxfd limit.
  * The file descriptor is also closed.
@@ -104,11 +113,14 @@
  */
 static inline void fd_alloc_cache_entry(const int fd)
 {
+	RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
 	if (fdtab[fd].cache)
-		return;
+		goto end;
 	fd_cache_num++;
 	fdtab[fd].cache = fd_cache_num;
 	fd_cache[fd_cache_num-1] = fd;
+  end:
+	RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
 }
 
 /* Removes entry used by fd <fd> from the FD cache and replaces it with the
@@ -119,9 +131,10 @@
 {
 	unsigned int pos;
 
+	RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
 	pos = fdtab[fd].cache;
 	if (!pos)
-		return;
+		goto end;
 	fdtab[fd].cache = 0;
 	fd_cache_num--;
 	if (likely(pos <= fd_cache_num)) {
@@ -130,6 +143,8 @@
 		fd_cache[pos - 1] = fd;
 		fdtab[fd].cache = pos;
 	}
+  end:
+	RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
 }
 
 /* Computes the new polled status based on the active and ready statuses, for
@@ -252,46 +267,56 @@
 /* Disable processing recv events on fd <fd> */
 static inline void fd_stop_recv(int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (fd_recv_active(fd)) {
 		fdtab[fd].state &= ~FD_EV_ACTIVE_R;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Disable processing send events on fd <fd> */
 static inline void fd_stop_send(int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (fd_send_active(fd)) {
 		fdtab[fd].state &= ~FD_EV_ACTIVE_W;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Disable processing of events on fd <fd> for both directions. */
 static inline void fd_stop_both(int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (fd_active(fd)) {
 		fdtab[fd].state &= ~FD_EV_ACTIVE_RW;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD <fd> cannot receive anymore without polling (EAGAIN detected). */
 static inline void fd_cant_recv(const int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (fd_recv_ready(fd)) {
 		fdtab[fd].state &= ~FD_EV_READY_R;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD <fd> can receive anymore without polling. */
 static inline void fd_may_recv(const int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (!fd_recv_ready(fd)) {
 		fdtab[fd].state |= FD_EV_READY_R;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Disable readiness when polled. This is useful to interrupt reading when it
@@ -301,54 +326,66 @@
  */
 static inline void fd_done_recv(const int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (fd_recv_polled(fd) && fd_recv_ready(fd)) {
 		fdtab[fd].state &= ~FD_EV_READY_R;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD <fd> cannot send anymore without polling (EAGAIN detected). */
 static inline void fd_cant_send(const int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (fd_send_ready(fd)) {
 		fdtab[fd].state &= ~FD_EV_READY_W;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Report that FD <fd> can send anymore without polling (EAGAIN detected). */
 static inline void fd_may_send(const int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (!fd_send_ready(fd)) {
 		fdtab[fd].state |= FD_EV_READY_W;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Prepare FD <fd> to try to receive */
 static inline void fd_want_recv(int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (!fd_recv_active(fd)) {
 		fdtab[fd].state |= FD_EV_ACTIVE_R;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Prepare FD <fd> to try to send */
 static inline void fd_want_send(int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	if (!fd_send_active(fd)) {
 		fdtab[fd].state |= FD_EV_ACTIVE_W;
 		fd_update_cache(fd); /* need an update entry to change the state */
 	}
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 }
 
 /* Update events seen for FD <fd> and its state if needed. This should be called
  * by the poller to set FD_POLL_* flags. */
 static inline void fd_update_events(int fd, int evts)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fdtab[fd].ev &= FD_POLL_STICKY;
 	fdtab[fd].ev |= evts;
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
 
 	if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
 		fd_may_recv(fd);
@@ -360,13 +397,19 @@
 /* Prepares <fd> for being polled */
 static inline void fd_insert(int fd)
 {
+	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
 	fdtab[fd].ev = 0;
 	fdtab[fd].new = 1;
 	fdtab[fd].updated = 0;
 	fdtab[fd].linger_risk = 0;
 	fdtab[fd].cloned = 0;
+	fdtab[fd].cache = 0;
+	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+
+	SPIN_LOCK(FDTAB_LOCK, &fdtab_lock);
 	if (fd + 1 > maxfd)
 		maxfd = fd + 1;
+	SPIN_UNLOCK(FDTAB_LOCK, &fdtab_lock);
 }
 
 
diff --git a/include/types/fd.h b/include/types/fd.h
index 2bd7c07..7042dab 100644
--- a/include/types/fd.h
+++ b/include/types/fd.h
@@ -23,6 +23,7 @@
 #define _TYPES_FD_H
 
 #include <common/config.h>
+#include <common/hathreads.h>
 #include <types/port_range.h>
 
 /* Direction for each FD event update */
@@ -93,6 +94,9 @@
 struct fdtab {
 	void (*iocb)(int fd);                /* I/O handler */
 	void *owner;                         /* the connection or listener associated with this fd, NULL if closed */
+#ifdef USE_THREAD
+	HA_SPINLOCK_T lock;
+#endif
 	unsigned int  cache;                 /* position+1 in the FD cache. 0=not in cache. */
 	unsigned char state;                 /* FD state for read and write directions (2*3 bits) */
 	unsigned char ev;                    /* event seen in return of poll() : FD_POLL_* */