MINOR: pollers: Add a way to wake a thread sleeping in the poller.
Add a new pipe, one per thread, so that a thread sleeping in the poller can be
woken up by writing to its pipe. Use it to wake the threads that are supposed
to take care of a task when all of them are sleeping.
diff --git a/src/fd.c b/src/fd.c
index 3b023a8..cbb7b47 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -147,6 +147,7 @@
#include <stdio.h>
#include <string.h>
#include <unistd.h>
+#include <fcntl.h>
#include <sys/types.h>
#include <common/compat.h>
@@ -176,6 +177,8 @@
THREAD_LOCAL int *fd_updt = NULL; // FD updates list
THREAD_LOCAL int fd_nbupdt = 0; // number of updates in the list
+THREAD_LOCAL int poller_rd_pipe = -1; // Read end of this thread's wake-up pipe; -1 until init_pollers_per_thread() sets it
+int poller_wr_pipe[MAX_THREADS]; // Write ends of the wake-up pipes, indexed by tid. NOTE(review): no initializer, so unset entries read as 0 rather than -1
#define _GET_NEXT(fd, off) ((struct fdlist_entry *)(void *)((char *)(&fdtab[fd]) + off))->next
#define _GET_PREV(fd, off) ((struct fdlist_entry *)(void *)((char *)(&fdtab[fd]) + off))->prev
@@ -461,11 +464,31 @@
pollers[p].pref = 0;
}
+/* I/O callback registered on the read end of a thread's wake-up pipe. It
+ * discards whatever is currently readable on <fd> and then calls
+ * fd_cant_recv() on it.
+ */
+void poller_pipe_io_handler(int fd)
+{
+	char scratch[1024];
+	ssize_t got;
+
+	/* The bytes themselves carry no information: keep reading until
+	 * read() reports end-of-file or an error.
+	 */
+	do {
+		got = read(fd, scratch, sizeof(scratch));
+	} while (got > 0);
+
+	fd_cant_recv(fd);
+}
+
/* Initialize the pollers per thread */
+/* Besides allocating the calling thread's fd_updt array, this creates the
+ * pipe used to wake the thread up: the read end is made non-blocking, stored
+ * in poller_rd_pipe and registered with poller_pipe_io_handler(); the write
+ * end is published in poller_wr_pipe[tid].
+ * Returns 1 on success, 0 on failure. On failure everything acquired here
+ * has been released and fd_updt is NULL.
+ */
static int init_pollers_per_thread()
{
+	int mypipe[2];
if ((fd_updt = calloc(global.maxsock, sizeof(*fd_updt))) == NULL)
return 0;
+	if (pipe(mypipe) < 0)
+		goto fail_updt;
+	/* The read end really has to be non-blocking: poller_pipe_io_handler()
+	 * reads until read() stops returning data, and on a blocking
+	 * descriptor that last read() would block once the pipe is empty.
+	 * Hence a failure here is treated as fatal for the initialization.
+	 */
+	if (fcntl(mypipe[0], F_SETFL, O_NONBLOCK) == -1)
+		goto fail_pipe;
+	/* Only publish the descriptors once they are fully set up. */
+	poller_rd_pipe = mypipe[0];
+	poller_wr_pipe[tid] = mypipe[1];
+	fd_insert(poller_rd_pipe, poller_pipe_io_handler, poller_pipe_io_handler,
+	          tid_bit);
+	fd_want_recv(poller_rd_pipe);
return 1;
+
+fail_pipe:
+	close(mypipe[0]);
+	close(mypipe[1]);
+fail_updt:
+	free(fd_updt);
+	fd_updt = NULL;
+	return 0;
}