MAJOR: threads/fd: Make fd stuffs thread-safe
Many changes have been made to do so. First, the fd_updt array, where all
pending FDs for polling are stored, is now a thread-local array. Then 3 locks
have been added to protect, respectively, the fdtab array, the fd_cache array
and poll information. In addition, a lock for each entry in the fdtab array has
been added to protect all accesses to a specific FD or its information.
For pollers, according to the poller, the way to manage the concurrency is
different. There is a poller loop on each thread. So the set of monitored FDs
may need to be protected. epoll and kqueue are thread-safe per-se, so there few
things to do to protect these pollers. This is not possible with select and
poll, so there is no sharing between the threads. The poller on each thread is
independant from others.
Finally, per-thread init/deinit functions are used for each pollers and for FD
part for manage thread-local ressources.
Now, you must be carefull when a FD is created during the HAProxy startup. All
update on the FD state must be made in the threads context and never before
their creation. This is mandatory because fd_updt array is thread-local and
initialized only for threads. Because there is no pollers for the main one, this
array remains uninitialized in this context. For this reason, listeners are now
enabled in run_thread_poll_loop function, just like the worker pipe.
diff --git a/src/haproxy.c b/src/haproxy.c
index 0733189..7f48514 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -2202,7 +2202,35 @@
deinit_pollers();
} /* end deinit() */
+void mworker_pipe_handler(int fd)
+{
+ char c;
+
+ while (read(fd, &c, 1) == -1) {
+ if (errno == EINTR)
+ continue;
+ if (errno == EAGAIN) {
+ fd_cant_recv(fd);
+ return;
+ }
+ break;
+ }
+
+ deinit();
+ exit(EXIT_FAILURE);
+ return;
+}
+
+void mworker_pipe_register(int pipefd[2])
+{
+ close(mworker_pipe[1]); /* close the write end of the master pipe in the children */
+ fcntl(mworker_pipe[0], F_SETFL, O_NONBLOCK);
+ fdtab[mworker_pipe[0]].owner = mworker_pipe;
+ fdtab[mworker_pipe[0]].iocb = mworker_pipe_handler;
+ fd_insert(mworker_pipe[0]);
+ fd_want_recv(mworker_pipe[0]);
+}
static void sync_poll_loop()
{
@@ -2278,6 +2306,10 @@
}
}
+ if (global.mode & MODE_MWORKER)
+ mworker_pipe_register(mworker_pipe);
+
+ protocol_enable_all();
THREAD_SYNC_ENABLE();
run_poll_loop();
@@ -2320,37 +2352,6 @@
return t;
}
-void mworker_pipe_handler(int fd)
-{
- char c;
-
- while (read(fd, &c, 1) == -1) {
- if (errno == EINTR)
- continue;
- if (errno == EAGAIN) {
- fd_cant_recv(fd);
- return;
- }
- break;
- }
-
- deinit();
- exit(EXIT_FAILURE);
- return;
-}
-
-void mworker_pipe_register(int pipefd[2])
-{
- close(mworker_pipe[1]); /* close the write end of the master pipe in the children */
-
- fcntl(mworker_pipe[0], F_SETFL, O_NONBLOCK);
- fdtab[mworker_pipe[0]].owner = mworker_pipe;
- fdtab[mworker_pipe[0]].iocb = mworker_pipe_handler;
- fd_insert(mworker_pipe[0]);
- fd_want_recv(mworker_pipe[0]);
- }
-
-
int main(int argc, char **argv)
{
int err, retry;
@@ -2798,11 +2799,6 @@
}
global.mode &= ~MODE_STARTING;
-
- if (global.mode & MODE_MWORKER)
- mworker_pipe_register(mworker_pipe);
-
- protocol_enable_all();
/*
* That's it : the central polling loop. Run until we stop.
*/
@@ -2827,6 +2823,12 @@
}
else {
tid = 0;
+
+ if (global.mode & MODE_MWORKER)
+ mworker_pipe_register(mworker_pipe);
+
+ protocol_enable_all();
+
run_poll_loop();
}