MINOR: pollers: Add a way to wake a thread sleeping in the poller.

Add a new pipe, one per thread, so that we can write on it to wake a thread
sleeping in a poller, and use it to wake threads supposed to take care of a
task, if they are all sleeping.
diff --git a/include/proto/fd.h b/include/proto/fd.h
index c5a03f7..a4cee32 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -45,6 +45,8 @@
 extern THREAD_LOCAL int *fd_updt;  // FD updates list
 extern THREAD_LOCAL int fd_nbupdt; // number of updates in the list
 
+extern int poller_wr_pipe[MAX_THREADS];
+
 __decl_hathreads(extern HA_RWLOCK_T   __attribute__((aligned(64))) fdcache_lock);    /* global lock to protect fd_cache array */
 
 /* Deletes an FD from the fdsets.
@@ -60,6 +62,8 @@
 /* disable the specified poller */
 void disable_poller(const char *poller_name);
 
+void poller_pipe_io_handler(int fd);
+
 /*
  * Initialize the pollers till the best one is found.
  * If none works, returns 0, otherwise 1.
@@ -516,6 +520,13 @@
 	return evts[fd / (8*sizeof(*evts))] & (1U << (fd & (8*sizeof(*evts) - 1)));
 }
 
+static inline void wake_thread(int tid)
+{
+	char c = 'c';
+
+	shut_your_big_mouth_gcc(write(poller_wr_pipe[tid], &c, 1));
+}
+
 
 #endif /* _PROTO_FD_H */
 
diff --git a/include/types/global.h b/include/types/global.h
index a684ea6..616e8d3 100644
--- a/include/types/global.h
+++ b/include/types/global.h
@@ -219,6 +219,7 @@
 extern struct list global_listener_queue; /* list of the temporarily limited listeners */
 extern struct task *global_listener_queue_task;
 extern unsigned int warned;     /* bitfield of a few warnings to emit just once */
+extern volatile unsigned long sleeping_thread_mask;
 
 /* bit values to go with "warned" above */
 #define WARN_BLOCK_DEPRECATED       0x00000001
diff --git a/src/cli.c b/src/cli.c
index d028429..739107d 100644
--- a/src/cli.c
+++ b/src/cli.c
@@ -890,6 +890,7 @@
 			     (fdt.iocb == dgram_fd_handler) ? "dgram_fd_handler" :
 			     (fdt.iocb == listener_accept)  ? "listener_accept" :
 			     (fdt.iocb == thread_sync_io_handler) ? "thread_sync_io_handler" :
+			     (fdt.iocb == poller_pipe_io_handler) ? "poller_pipe_io_handler" :
 			     "unknown");
 
 		if (fdt.iocb == conn_fd_handler) {
diff --git a/src/fd.c b/src/fd.c
index 3b023a8..cbb7b47 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -147,6 +147,7 @@
 #include <stdio.h>
 #include <string.h>
 #include <unistd.h>
+#include <fcntl.h>
 #include <sys/types.h>
 
 #include <common/compat.h>
@@ -176,6 +177,8 @@
 
 THREAD_LOCAL int *fd_updt  = NULL;  // FD updates list
 THREAD_LOCAL int  fd_nbupdt = 0;   // number of updates in the list
+THREAD_LOCAL int poller_rd_pipe = -1; // Pipe to wake the thread
+int poller_wr_pipe[MAX_THREADS]; // Pipe to wake the threads
 
 #define _GET_NEXT(fd, off) ((struct fdlist_entry *)(void *)((char *)(&fdtab[fd]) + off))->next
 #define _GET_PREV(fd, off) ((struct fdlist_entry *)(void *)((char *)(&fdtab[fd]) + off))->prev
@@ -461,11 +464,31 @@
 			pollers[p].pref = 0;
 }
 
+void poller_pipe_io_handler(int fd)
+{
+	char buf[1024];
+	/* Flush the pipe */
+	while (read(fd, buf, sizeof(buf)) > 0);
+	fd_cant_recv(fd);
+}
+
 /* Initialize the pollers per thread */
 static int init_pollers_per_thread()
 {
+	int mypipe[2];
 	if ((fd_updt = calloc(global.maxsock, sizeof(*fd_updt))) == NULL)
 		return 0;
+	if (pipe(mypipe) < 0) {
+		free(fd_updt);
+		fd_updt = NULL;
+		return 0;
+	}
+	poller_rd_pipe = mypipe[0];
+	poller_wr_pipe[tid] = mypipe[1];
+	fcntl(poller_rd_pipe, F_SETFL, O_NONBLOCK);
+	fd_insert(poller_rd_pipe, poller_pipe_io_handler, poller_pipe_io_handler,
+	    tid_bit);
+	fd_want_recv(poller_rd_pipe);
 	return 1;
 }
 
diff --git a/src/haproxy.c b/src/haproxy.c
index e0e8791..a174391 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -123,6 +123,7 @@
 int  relative_pid = 1;		/* process id starting at 1 */
 unsigned long pid_bit = 1;      /* bit corresponding to the process id */
 
+volatile unsigned long sleeping_thread_mask; /* Threads that are about to sleep in poll() */
 /* global options */
 struct global global = {
 	.hard_stop_after = TICK_ETERNITY,
@@ -2427,11 +2428,20 @@
 			activity[tid].wake_tasks++;
 		else if (signal_queue_len && tid == 0)
 			activity[tid].wake_signal++;
-		else
-			exp = next;
+		else {
+			HA_ATOMIC_OR(&sleeping_thread_mask, tid_bit);
+			__ha_barrier_store();
+			if (active_tasks_mask & tid_bit) {
+				activity[tid].wake_tasks++;
+				HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
+			} else
+				exp = next;
+		}
 
 		/* The poller will ensure it returns around <next> */
 		cur_poller.poll(&cur_poller, exp);
+		if (sleeping_thread_mask & tid_bit)
+			HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
 		fd_process_cached_events();
 
 
diff --git a/src/task.c b/src/task.c
index 672730b..6e7441f 100644
--- a/src/task.c
+++ b/src/task.c
@@ -23,6 +23,7 @@
 #include <proto/proxy.h>
 #include <proto/stream.h>
 #include <proto/task.h>
+#include <proto/fd.h>
 
 struct pool_head *pool_head_task;
 struct pool_head *pool_head_tasklet;
@@ -70,6 +71,7 @@
 {
 	void *expected = NULL;
 	int *rq_size;
+	unsigned long old_active_mask;
 
 #ifdef USE_THREAD
 	if (root == &rqueue) {
@@ -125,6 +127,7 @@
 		__ha_barrier_store();
 	}
 #endif
+	old_active_mask = active_tasks_mask;
 	HA_ATOMIC_OR(&active_tasks_mask, t->thread_mask);
 	t->rq.key = HA_ATOMIC_ADD(&rqueue_ticks, 1);
 
@@ -152,6 +155,13 @@
 
 		rqueue_size[nb]++;
 	}
+	/* If all threads that are supposed to handle this task are sleeping,
+	 * wake one.
+	 */
+	if ((((t->thread_mask & all_threads_mask) & sleeping_thread_mask) ==
+	    (t->thread_mask & all_threads_mask)) &&
+	    !(t->thread_mask & old_active_mask))
+		wake_thread(my_ffsl((t->thread_mask & all_threads_mask) &~ tid_bit) - 1);
 	return;
 }