MINOR: pollers: Add a way to wake a thread sleeping in the poller.
Add a new pipe, one per thread, so that writing to it wakes that thread when
it is sleeping in the poller. Use it when a task is queued: if all the threads
that are supposed to take care of the task are sleeping, wake one of them.
diff --git a/src/cli.c b/src/cli.c
index d028429..739107d 100644
--- a/src/cli.c
+++ b/src/cli.c
@@ -890,6 +890,7 @@
(fdt.iocb == dgram_fd_handler) ? "dgram_fd_handler" :
(fdt.iocb == listener_accept) ? "listener_accept" :
(fdt.iocb == thread_sync_io_handler) ? "thread_sync_io_handler" :
+ (fdt.iocb == poller_pipe_io_handler) ? "poller_pipe_io_handler" :
"unknown");
if (fdt.iocb == conn_fd_handler) {
diff --git a/src/fd.c b/src/fd.c
index 3b023a8..cbb7b47 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -147,6 +147,7 @@
#include <stdio.h>
#include <string.h>
#include <unistd.h>
+#include <fcntl.h>
#include <sys/types.h>
#include <common/compat.h>
@@ -176,6 +177,8 @@
THREAD_LOCAL int *fd_updt = NULL; // FD updates list
THREAD_LOCAL int fd_nbupdt = 0; // number of updates in the list
+THREAD_LOCAL int poller_rd_pipe = -1; // Read end of the calling thread's wake-up pipe (-1 until created)
+int poller_wr_pipe[MAX_THREADS]; // Write ends of the wake-up pipes, indexed by thread id
#define _GET_NEXT(fd, off) ((struct fdlist_entry *)(void *)((char *)(&fdtab[fd]) + off))->next
#define _GET_PREV(fd, off) ((struct fdlist_entry *)(void *)((char *)(&fdtab[fd]) + off))->prev
@@ -461,11 +464,31 @@
pollers[p].pref = 0;
}
+/* I/O handler for a thread wake-up pipe. The bytes carry no information:
+ * everything readable on <fd> is read and discarded, then fd_cant_recv() is
+ * called on <fd>.
+ */
+void poller_pipe_io_handler(int fd)
+{
+	char scratch[1024];
+
+	/* Drain the pipe: stop as soon as read() returns 0 or an error */
+	for (;;) {
+		if (read(fd, scratch, sizeof(scratch)) <= 0)
+			break;
+	}
+	fd_cant_recv(fd);
+}
+
/* Initialize the pollers per thread */
static int init_pollers_per_thread()
{
+	/* Allocates the FD updates list and creates the pipe used to wake this
+	 * thread. Returns 1 on success, 0 on failure; on failure nothing
+	 * allocated here is left behind.
+	 */
+	int mypipe[2];
+
if ((fd_updt = calloc(global.maxsock, sizeof(*fd_updt))) == NULL)
return 0;
+	if (pipe(mypipe) < 0)
+		goto fail_updt;
+	/* The read end must be non-blocking: its handler reads until read()
+	 * stops returning data, which would block forever on an empty
+	 * blocking pipe. So a failure here is fatal for this thread's setup.
+	 */
+	if (fcntl(mypipe[0], F_SETFL, O_NONBLOCK) < 0)
+		goto fail_pipe;
+	/* Only publish the descriptors once they are fully set up */
+	poller_rd_pipe = mypipe[0];
+	poller_wr_pipe[tid] = mypipe[1];
+	fd_insert(poller_rd_pipe, poller_pipe_io_handler, poller_pipe_io_handler,
+	    tid_bit);
+	fd_want_recv(poller_rd_pipe);
return 1;
+
+fail_pipe:
+	close(mypipe[0]);
+	close(mypipe[1]);
+fail_updt:
+	free(fd_updt);
+	fd_updt = NULL;
+	return 0;
}
diff --git a/src/haproxy.c b/src/haproxy.c
index e0e8791..a174391 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -123,6 +123,7 @@
int relative_pid = 1; /* process id starting at 1 */
unsigned long pid_bit = 1; /* bit corresponding to the process id */
+volatile unsigned long sleeping_thread_mask; /* Bitmask (one tid_bit per thread) of threads that are about to sleep in poll() */
/* global options */
struct global global = {
.hard_stop_after = TICK_ETERNITY,
@@ -2427,11 +2428,20 @@
activity[tid].wake_tasks++;
else if (signal_queue_len && tid == 0)
activity[tid].wake_signal++;
- else
- exp = next;
+ else {
+ HA_ATOMIC_OR(&sleeping_thread_mask, tid_bit);
+ __ha_barrier_store();
+ if (active_tasks_mask & tid_bit) {
+ activity[tid].wake_tasks++;
+ HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
+ } else
+ exp = next;
+ }
/* The poller will ensure it returns around <next> */
cur_poller.poll(&cur_poller, exp);
+ if (sleeping_thread_mask & tid_bit)
+ HA_ATOMIC_AND(&sleeping_thread_mask, ~tid_bit);
fd_process_cached_events();
diff --git a/src/task.c b/src/task.c
index 672730b..6e7441f 100644
--- a/src/task.c
+++ b/src/task.c
@@ -23,6 +23,7 @@
#include <proto/proxy.h>
#include <proto/stream.h>
#include <proto/task.h>
+#include <proto/fd.h>
struct pool_head *pool_head_task;
struct pool_head *pool_head_tasklet;
@@ -70,6 +71,7 @@
{
void *expected = NULL;
int *rq_size;
+ unsigned long old_active_mask;
#ifdef USE_THREAD
if (root == &rqueue) {
@@ -125,6 +127,7 @@
__ha_barrier_store();
}
#endif
+ old_active_mask = active_tasks_mask;
HA_ATOMIC_OR(&active_tasks_mask, t->thread_mask);
t->rq.key = HA_ATOMIC_ADD(&rqueue_ticks, 1);
@@ -152,6 +155,13 @@
rqueue_size[nb]++;
}
+ /* If all threads that are supposed to handle this task are sleeping,
+ * wake one.
+ */
+ if ((((t->thread_mask & all_threads_mask) & sleeping_thread_mask) ==
+ (t->thread_mask & all_threads_mask)) &&
+ !(t->thread_mask & old_active_mask))
+ wake_thread(my_ffsl((t->thread_mask & all_threads_mask) &~ tid_bit) - 1);
return;
}