MAJOR: threads/fd: Make fd stuff thread-safe
Many changes have been made to do so. First, the fd_updt array, where all
pending FDs for polling are stored, is now a thread-local array. Then 3 locks
have been added to protect, respectively, the fdtab array, the fd_cache array
and poll information. In addition, a lock for each entry in the fdtab array has
been added to protect all accesses to a specific FD or its information.
For pollers, the way concurrency is managed differs from one poller to
another. There is a poller loop on each thread, so the set of monitored FDs
may need to be protected. epoll and kqueue are thread-safe per se, so there
are few things to do to protect these pollers. This is not possible with
select and poll, so there is no sharing between the threads; the poller on
each thread is independent from the others.
Finally, per-thread init/deinit functions are used for each poller and for the
FD part to manage thread-local resources.
Now, you must be careful when an FD is created during HAProxy startup. All
updates on the FD state must be made in the threads' context and never before
their creation. This is mandatory because the fd_updt array is thread-local and
initialized only for threads. Because there is no poller for the main thread,
this array remains uninitialized in that context. For this reason, listeners
are now enabled in the run_thread_poll_loop function, just like the worker pipe.
diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 3e2a350..a2c047e 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -138,6 +138,10 @@
enum lock_label {
THREAD_SYNC_LOCK = 0,
+ FDTAB_LOCK,
+ FDCACHE_LOCK,
+ FD_LOCK,
+ POLL_LOCK,
POOL_LOCK,
LOCK_LABELS
};
@@ -221,7 +225,8 @@
static inline void show_lock_stats()
{
- const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "POOL"};
+ const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
+ "POOL" };
int lbl;
for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
diff --git a/include/proto/fd.h b/include/proto/fd.h
index 56e2082..e47d8fd 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -28,13 +28,22 @@
#include <unistd.h>
#include <common/config.h>
+
#include <types/fd.h>
/* public variables */
+
extern unsigned int *fd_cache; // FD events cache
-extern unsigned int *fd_updt; // FD updates list
extern int fd_cache_num; // number of events in the cache
-extern int fd_nbupdt; // number of updates in the list
+
+extern THREAD_LOCAL int *fd_updt; // FD updates list
+extern THREAD_LOCAL int fd_nbupdt; // number of updates in the list
+
+#ifdef USE_THREAD
+HA_SPINLOCK_T fdtab_lock; /* global lock to protect fdtab array */
+HA_RWLOCK_T fdcache_lock; /* global lock to protect fd_cache array */
+HA_SPINLOCK_T poll_lock; /* global lock to protect poll info */
+#endif
/* Deletes an FD from the fdsets, and recomputes the maxfd limit.
* The file descriptor is also closed.
@@ -104,11 +113,14 @@
*/
static inline void fd_alloc_cache_entry(const int fd)
{
+ RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
if (fdtab[fd].cache)
- return;
+ goto end;
fd_cache_num++;
fdtab[fd].cache = fd_cache_num;
fd_cache[fd_cache_num-1] = fd;
+ end:
+ RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
}
/* Removes entry used by fd <fd> from the FD cache and replaces it with the
@@ -119,9 +131,10 @@
{
unsigned int pos;
+ RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
pos = fdtab[fd].cache;
if (!pos)
- return;
+ goto end;
fdtab[fd].cache = 0;
fd_cache_num--;
if (likely(pos <= fd_cache_num)) {
@@ -130,6 +143,8 @@
fd_cache[pos - 1] = fd;
fdtab[fd].cache = pos;
}
+ end:
+ RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
}
/* Computes the new polled status based on the active and ready statuses, for
@@ -252,46 +267,56 @@
/* Disable processing recv events on fd <fd> */
static inline void fd_stop_recv(int fd)
{
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (fd_recv_active(fd)) {
fdtab[fd].state &= ~FD_EV_ACTIVE_R;
fd_update_cache(fd); /* need an update entry to change the state */
}
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Disable processing send events on fd <fd> */
static inline void fd_stop_send(int fd)
{
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (fd_send_active(fd)) {
fdtab[fd].state &= ~FD_EV_ACTIVE_W;
fd_update_cache(fd); /* need an update entry to change the state */
}
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Disable processing of events on fd <fd> for both directions. */
static inline void fd_stop_both(int fd)
{
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (fd_active(fd)) {
fdtab[fd].state &= ~FD_EV_ACTIVE_RW;
fd_update_cache(fd); /* need an update entry to change the state */
}
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Report that FD <fd> cannot receive anymore without polling (EAGAIN detected). */
static inline void fd_cant_recv(const int fd)
{
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (fd_recv_ready(fd)) {
fdtab[fd].state &= ~FD_EV_READY_R;
fd_update_cache(fd); /* need an update entry to change the state */
}
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Report that FD <fd> can receive anymore without polling. */
static inline void fd_may_recv(const int fd)
{
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (!fd_recv_ready(fd)) {
fdtab[fd].state |= FD_EV_READY_R;
fd_update_cache(fd); /* need an update entry to change the state */
}
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Disable readiness when polled. This is useful to interrupt reading when it
@@ -301,54 +326,66 @@
*/
static inline void fd_done_recv(const int fd)
{
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (fd_recv_polled(fd) && fd_recv_ready(fd)) {
fdtab[fd].state &= ~FD_EV_READY_R;
fd_update_cache(fd); /* need an update entry to change the state */
}
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Report that FD <fd> cannot send anymore without polling (EAGAIN detected). */
static inline void fd_cant_send(const int fd)
{
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (fd_send_ready(fd)) {
fdtab[fd].state &= ~FD_EV_READY_W;
fd_update_cache(fd); /* need an update entry to change the state */
}
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Report that FD <fd> can send anymore without polling (EAGAIN detected). */
static inline void fd_may_send(const int fd)
{
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (!fd_send_ready(fd)) {
fdtab[fd].state |= FD_EV_READY_W;
fd_update_cache(fd); /* need an update entry to change the state */
}
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Prepare FD <fd> to try to receive */
static inline void fd_want_recv(int fd)
{
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (!fd_recv_active(fd)) {
fdtab[fd].state |= FD_EV_ACTIVE_R;
fd_update_cache(fd); /* need an update entry to change the state */
}
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Prepare FD <fd> to try to send */
static inline void fd_want_send(int fd)
{
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (!fd_send_active(fd)) {
fdtab[fd].state |= FD_EV_ACTIVE_W;
fd_update_cache(fd); /* need an update entry to change the state */
}
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Update events seen for FD <fd> and its state if needed. This should be called
* by the poller to set FD_POLL_* flags. */
static inline void fd_update_events(int fd, int evts)
{
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fdtab[fd].ev &= FD_POLL_STICKY;
fdtab[fd].ev |= evts;
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
fd_may_recv(fd);
@@ -360,13 +397,19 @@
/* Prepares <fd> for being polled */
static inline void fd_insert(int fd)
{
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fdtab[fd].ev = 0;
fdtab[fd].new = 1;
fdtab[fd].updated = 0;
fdtab[fd].linger_risk = 0;
fdtab[fd].cloned = 0;
+ fdtab[fd].cache = 0;
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+
+ SPIN_LOCK(FDTAB_LOCK, &fdtab_lock);
if (fd + 1 > maxfd)
maxfd = fd + 1;
+ SPIN_UNLOCK(FDTAB_LOCK, &fdtab_lock);
}
diff --git a/include/types/fd.h b/include/types/fd.h
index 2bd7c07..7042dab 100644
--- a/include/types/fd.h
+++ b/include/types/fd.h
@@ -23,6 +23,7 @@
#define _TYPES_FD_H
#include <common/config.h>
+#include <common/hathreads.h>
#include <types/port_range.h>
/* Direction for each FD event update */
@@ -93,6 +94,9 @@
struct fdtab {
void (*iocb)(int fd); /* I/O handler */
void *owner; /* the connection or listener associated with this fd, NULL if closed */
+#ifdef USE_THREAD
+ HA_SPINLOCK_T lock;
+#endif
unsigned int cache; /* position+1 in the FD cache. 0=not in cache. */
unsigned char state; /* FD state for read and write directions (2*3 bits) */
unsigned char ev; /* event seen in return of poll() : FD_POLL_* */
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index 9e72802..0b815b4 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -28,13 +28,13 @@
/* private data */
-static struct epoll_event *epoll_events;
+static THREAD_LOCAL struct epoll_event *epoll_events = NULL;
static int epoll_fd;
/* This structure may be used for any purpose. Warning! do not use it in
* recursive functions !
*/
-static struct epoll_event ev;
+static THREAD_LOCAL struct epoll_event ev;
#ifndef EPOLLRDHUP
/* EPOLLRDHUP was defined late in libc, and it appeared in kernel 2.6.17 */
@@ -49,9 +49,8 @@
*/
REGPRM1 static void __fd_clo(int fd)
{
- if (unlikely(fdtab[fd].cloned)) {
+ if (unlikely(fdtab[fd].cloned))
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &ev);
- }
}
/*
@@ -68,18 +67,21 @@
/* first, scan the update list to find polling changes */
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
- fdtab[fd].updated = 0;
- fdtab[fd].new = 0;
if (!fdtab[fd].owner)
continue;
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+ fdtab[fd].updated = 0;
+ fdtab[fd].new = 0;
+
eo = fdtab[fd].state;
en = fd_compute_new_polled_status(eo);
+ fdtab[fd].state = en;
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
if ((eo ^ en) & FD_EV_POLLED_RW) {
/* poll status changed */
- fdtab[fd].state = en;
if ((en & FD_EV_POLLED_RW) == 0) {
/* fd removed from poll list */
@@ -103,6 +105,7 @@
ev.events |= EPOLLOUT;
ev.data.fd = fd;
+
epoll_ctl(epoll_fd, opcode, fd, &ev);
}
}
@@ -154,7 +157,7 @@
/* always remap RDHUP to HUP as they're used similarly */
if (e & EPOLLRDHUP) {
- cur_poller.flags |= HAP_POLL_F_RDHUP;
+ HA_ATOMIC_OR(&cur_poller.flags, HAP_POLL_F_RDHUP);
n |= FD_POLL_HUP;
}
fd_update_events(fd, n);
@@ -162,6 +165,19 @@
/* the caller will take care of cached events */
}
+static int init_epoll_per_thread()
+{
+ epoll_events = calloc(1, sizeof(struct epoll_event) * global.tune.maxpollevents);
+ if (epoll_events == NULL)
+ return 0;
+ return 1;
+}
+
+static void deinit_epoll_per_thread()
+{
+ free(epoll_events);
+}
+
/*
* Initialization of the epoll() poller.
* Returns 0 in case of failure, non-zero in case of success. If it fails, it
@@ -175,10 +191,11 @@
if (epoll_fd < 0)
goto fail_fd;
- epoll_events = (struct epoll_event*)
- calloc(1, sizeof(struct epoll_event) * global.tune.maxpollevents);
-
- if (epoll_events == NULL)
+ if (global.nbthread > 1) {
+ hap_register_per_thread_init(init_epoll_per_thread);
+ hap_register_per_thread_deinit(deinit_epoll_per_thread);
+ }
+ else if (!init_epoll_per_thread())
goto fail_ee;
return 1;
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index 02723cc..326d616 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -30,7 +30,7 @@
/* private data */
static int kqueue_fd;
-static struct kevent *kev = NULL;
+static THREAD_LOCAL struct kevent *kev = NULL;
/*
* kqueue() poller
@@ -46,19 +46,21 @@
/* first, scan the update list to find changes */
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
- fdtab[fd].updated = 0;
- fdtab[fd].new = 0;
if (!fdtab[fd].owner)
continue;
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+ fdtab[fd].updated = 0;
+ fdtab[fd].new = 0;
+
eo = fdtab[fd].state;
en = fd_compute_new_polled_status(eo);
+ fdtab[fd].state = en;
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
if ((eo ^ en) & FD_EV_POLLED_RW) {
/* poll status changed */
- fdtab[fd].state = en;
-
if ((eo ^ en) & FD_EV_POLLED_R) {
/* read poll status changed */
if (en & FD_EV_POLLED_R) {
@@ -139,6 +141,21 @@
}
}
+
+static int init_kqueue_per_thread()
+{
+ /* we can have up to two events per fd */
+ kev = calloc(1, sizeof(struct kevent) * 2 * global.maxsock);
+ if (kev == NULL)
+ return 0;
+ return 1;
+}
+
+static void deinit_kqueue_per_thread()
+{
+ free(kev);
+}
+
/*
* Initialization of the kqueue() poller.
* Returns 0 in case of failure, non-zero in case of success. If it fails, it
@@ -152,11 +169,13 @@
if (kqueue_fd < 0)
goto fail_fd;
- /* we can have up to two events per fd (*/
- kev = calloc(1, sizeof(struct kevent) * 2 * global.maxsock);
- if (kev == NULL)
+ if (global.nbthread > 1) {
+ hap_register_per_thread_init(init_kqueue_per_thread);
+ hap_register_per_thread_deinit(deinit_kqueue_per_thread);
+ }
+ else if (!init_kqueue_per_thread())
goto fail_kev;
-
+
return 1;
fail_kev:
diff --git a/src/ev_poll.c b/src/ev_poll.c
index 1cb8d2d..e16968b 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -35,8 +35,8 @@
static unsigned int *fd_evts[2];
/* private data */
-static struct pollfd *poll_events = NULL;
-
+static THREAD_LOCAL int nbfd = 0;
+static THREAD_LOCAL struct pollfd *poll_events = NULL;
static inline void hap_fd_set(int fd, unsigned int *evts)
{
@@ -50,8 +50,10 @@
REGPRM1 static void __fd_clo(int fd)
{
+ SPIN_LOCK(POLL_LOCK, &poll_lock);
hap_fd_clr(fd, fd_evts[DIR_RD]);
hap_fd_clr(fd, fd_evts[DIR_WR]);
+ SPIN_UNLOCK(POLL_LOCK, &poll_lock);
}
/*
@@ -60,7 +62,7 @@
REGPRM2 static void _do_poll(struct poller *p, int exp)
{
int status;
- int fd, nbfd;
+ int fd;
int wait_time;
int updt_idx, en, eo;
int fds, count;
@@ -70,19 +72,22 @@
/* first, scan the update list to find changes */
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
- fdtab[fd].updated = 0;
- fdtab[fd].new = 0;
if (!fdtab[fd].owner)
continue;
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+ fdtab[fd].updated = 0;
+ fdtab[fd].new = 0;
+
eo = fdtab[fd].state;
en = fd_compute_new_polled_status(eo);
+ fdtab[fd].state = en;
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
if ((eo ^ en) & FD_EV_POLLED_RW) {
/* poll status changed, update the lists */
- fdtab[fd].state = en;
-
+ SPIN_LOCK(POLL_LOCK, &poll_lock);
if ((eo & ~en) & FD_EV_POLLED_R)
hap_fd_clr(fd, fd_evts[DIR_RD]);
else if ((en & ~eo) & FD_EV_POLLED_R)
@@ -92,6 +97,7 @@
hap_fd_clr(fd, fd_evts[DIR_WR]);
else if ((en & ~eo) & FD_EV_POLLED_W)
hap_fd_set(fd, fd_evts[DIR_WR]);
+ SPIN_UNLOCK(POLL_LOCK, &poll_lock);
}
}
fd_nbupdt = 0;
@@ -100,7 +106,7 @@
for (fds = 0; (fds * 8*sizeof(**fd_evts)) < maxfd; fds++) {
rn = fd_evts[DIR_RD][fds];
wn = fd_evts[DIR_WR][fds];
-
+
if (!(rn|wn))
continue;
@@ -112,9 +118,9 @@
poll_events[nbfd].events = (sr ? (POLLIN | POLLRDHUP) : 0) | (sw ? POLLOUT : 0);
nbfd++;
}
- }
+ }
}
-
+
/* now let's wait for events */
if (!exp)
wait_time = MAX_DELAY_MS;
@@ -135,7 +141,7 @@
unsigned int n;
int e = poll_events[count].revents;
fd = poll_events[count].fd;
-
+
if (!(e & ( POLLOUT | POLLIN | POLLERR | POLLHUP | POLLRDHUP )))
continue;
@@ -161,15 +167,28 @@
/* always remap RDHUP to HUP as they're used similarly */
if (e & POLLRDHUP) {
- cur_poller.flags |= HAP_POLL_F_RDHUP;
+ HA_ATOMIC_OR(&cur_poller.flags, HAP_POLL_F_RDHUP);
n |= FD_POLL_HUP;
}
-
fd_update_events(fd, n);
}
}
+
+static int init_poll_per_thread()
+{
+ poll_events = calloc(1, sizeof(struct pollfd) * global.maxsock);
+ if (poll_events == NULL)
+ return 0;
+ return 1;
+}
+
+static void deinit_poll_per_thread()
+{
+ free(poll_events);
+}
+
/*
* Initialization of the poll() poller.
* Returns 0 in case of failure, non-zero in case of success. If it fails, it
@@ -183,14 +202,15 @@
p->private = NULL;
fd_evts_bytes = (global.maxsock + sizeof(**fd_evts) - 1) / sizeof(**fd_evts) * sizeof(**fd_evts);
- poll_events = calloc(1, sizeof(struct pollfd) * global.maxsock);
-
- if (poll_events == NULL)
+ if (global.nbthread > 1) {
+ hap_register_per_thread_init(init_poll_per_thread);
+ hap_register_per_thread_deinit(deinit_poll_per_thread);
+ }
+ else if (!init_poll_per_thread())
goto fail_pe;
-
+
if ((fd_evts[DIR_RD] = calloc(1, fd_evts_bytes)) == NULL)
goto fail_srevt;
-
if ((fd_evts[DIR_WR] = calloc(1, fd_evts_bytes)) == NULL)
goto fail_swevt;
diff --git a/src/ev_select.c b/src/ev_select.c
index cf80ac8..97d4286 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -24,14 +24,17 @@
#include <proto/fd.h>
+/* private data */
static fd_set *fd_evts[2];
-static fd_set *tmp_evts[2];
+static THREAD_LOCAL fd_set *tmp_evts[2];
/* Immediately remove the entry upon close() */
REGPRM1 static void __fd_clo(int fd)
{
+ SPIN_LOCK(POLL_LOCK, &poll_lock);
FD_CLR(fd, fd_evts[DIR_RD]);
FD_CLR(fd, fd_evts[DIR_WR]);
+ SPIN_UNLOCK(POLL_LOCK, &poll_lock);
}
/*
@@ -43,27 +46,30 @@
int fd, i;
struct timeval delta;
int delta_ms;
- int readnotnull, writenotnull;
int fds;
int updt_idx, en, eo;
char count;
-
+ int readnotnull, writenotnull;
+
/* first, scan the update list to find changes */
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
- fdtab[fd].updated = 0;
- fdtab[fd].new = 0;
if (!fdtab[fd].owner)
continue;
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+ fdtab[fd].updated = 0;
+ fdtab[fd].new = 0;
+
eo = fdtab[fd].state;
en = fd_compute_new_polled_status(eo);
+ fdtab[fd].state = en;
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
if ((eo ^ en) & FD_EV_POLLED_RW) {
/* poll status changed, update the lists */
- fdtab[fd].state = en;
-
+ SPIN_LOCK(POLL_LOCK, &poll_lock);
if ((eo & ~en) & FD_EV_POLLED_R)
FD_CLR(fd, fd_evts[DIR_RD]);
else if ((en & ~eo) & FD_EV_POLLED_R)
@@ -73,10 +79,28 @@
FD_CLR(fd, fd_evts[DIR_WR]);
else if ((en & ~eo) & FD_EV_POLLED_W)
FD_SET(fd, fd_evts[DIR_WR]);
+ SPIN_UNLOCK(POLL_LOCK, &poll_lock);
}
}
fd_nbupdt = 0;
+ /* let's restore fdset state */
+ readnotnull = 0; writenotnull = 0;
+ for (i = 0; i < (maxfd + FD_SETSIZE - 1)/(8*sizeof(int)); i++) {
+ readnotnull |= (*(((int*)tmp_evts[DIR_RD])+i) = *(((int*)fd_evts[DIR_RD])+i)) != 0;
+ writenotnull |= (*(((int*)tmp_evts[DIR_WR])+i) = *(((int*)fd_evts[DIR_WR])+i)) != 0;
+ }
+
+#if 0
+ /* just a verification code, needs to be removed for performance */
+ for (i=0; i<maxfd; i++) {
+ if (FD_ISSET(i, tmp_evts[DIR_RD]) != FD_ISSET(i, fd_evts[DIR_RD]))
+ abort();
+ if (FD_ISSET(i, tmp_evts[DIR_WR]) != FD_ISSET(i, fd_evts[DIR_WR]))
+ abort();
+ }
+#endif
+
delta_ms = 0;
delta.tv_sec = 0;
delta.tv_usec = 0;
@@ -94,30 +118,13 @@
delta.tv_usec = (delta_ms % 1000) * 1000;
}
- /* let's restore fdset state */
-
- readnotnull = 0; writenotnull = 0;
- for (i = 0; i < (maxfd + FD_SETSIZE - 1)/(8*sizeof(int)); i++) {
- readnotnull |= (*(((int*)tmp_evts[DIR_RD])+i) = *(((int*)fd_evts[DIR_RD])+i)) != 0;
- writenotnull |= (*(((int*)tmp_evts[DIR_WR])+i) = *(((int*)fd_evts[DIR_WR])+i)) != 0;
- }
-
- // /* just a verification code, needs to be removed for performance */
- // for (i=0; i<maxfd; i++) {
- // if (FD_ISSET(i, tmp_evts[DIR_RD]) != FD_ISSET(i, fd_evts[DIR_RD]))
- // abort();
- // if (FD_ISSET(i, tmp_evts[DIR_WR]) != FD_ISSET(i, fd_evts[DIR_WR]))
- // abort();
- //
- // }
-
gettimeofday(&before_poll, NULL);
status = select(maxfd,
readnotnull ? tmp_evts[DIR_RD] : NULL,
writenotnull ? tmp_evts[DIR_WR] : NULL,
NULL,
&delta);
-
+
tv_update_date(delta_ms, status);
measure_idle();
@@ -148,6 +155,28 @@
}
}
+static int init_select_per_thread()
+{
+ int fd_set_bytes;
+
+ fd_set_bytes = sizeof(fd_set) * (global.maxsock + FD_SETSIZE - 1) / FD_SETSIZE;
+ if ((tmp_evts[DIR_RD] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
+ goto fail;
+ if ((tmp_evts[DIR_WR] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
+ goto fail;
+ return 1;
+ fail:
+ free(tmp_evts[DIR_RD]);
+ free(tmp_evts[DIR_WR]);
+ return 0;
+}
+
+static void deinit_select_per_thread()
+{
+ free(tmp_evts[DIR_WR]);
+ free(tmp_evts[DIR_RD]);
+}
+
/*
* Initialization of the select() poller.
* Returns 0 in case of failure, non-zero in case of success. If it fails, it
@@ -155,7 +184,7 @@
*/
REGPRM1 static int _do_init(struct poller *p)
{
- __label__ fail_swevt, fail_srevt, fail_wevt, fail_revt;
+ __label__ fail_swevt, fail_srevt, fail_revt;
int fd_set_bytes;
p->private = NULL;
@@ -164,16 +193,15 @@
goto fail_revt;
fd_set_bytes = sizeof(fd_set) * (global.maxsock + FD_SETSIZE - 1) / FD_SETSIZE;
-
- if ((tmp_evts[DIR_RD] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
+ if (global.nbthread > 1) {
+ hap_register_per_thread_init(init_select_per_thread);
+ hap_register_per_thread_deinit(deinit_select_per_thread);
+ }
+ else if (!init_select_per_thread())
goto fail_revt;
-
- if ((tmp_evts[DIR_WR] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
- goto fail_wevt;
if ((fd_evts[DIR_RD] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
goto fail_srevt;
-
if ((fd_evts[DIR_WR] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
goto fail_swevt;
@@ -183,7 +211,6 @@
free(fd_evts[DIR_RD]);
fail_srevt:
free(tmp_evts[DIR_WR]);
- fail_wevt:
free(tmp_evts[DIR_RD]);
fail_revt:
p->pref = 0;
diff --git a/src/fd.c b/src/fd.c
index 8dab497..6c53a3b 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -155,6 +155,7 @@
#include <types/global.h>
#include <proto/fd.h>
+#include <proto/log.h>
#include <proto/port_range.h>
struct fdtab *fdtab = NULL; /* array of all the file descriptors */
@@ -168,15 +169,23 @@
int nbpollers = 0;
unsigned int *fd_cache = NULL; // FD events cache
-unsigned int *fd_updt = NULL; // FD updates list
int fd_cache_num = 0; // number of events in the cache
-int fd_nbupdt = 0; // number of updates in the list
+
+THREAD_LOCAL int *fd_updt = NULL; // FD updates list
+THREAD_LOCAL int fd_nbupdt = 0; // number of updates in the list
+
+#ifdef USE_THREAD
+HA_SPINLOCK_T fdtab_lock; /* global lock to protect fdtab array */
+HA_RWLOCK_T fdcache_lock; /* global lock to protect fd_cache array */
+HA_SPINLOCK_T poll_lock; /* global lock to protect poll info */
+#endif
/* Deletes an FD from the fdsets, and recomputes the maxfd limit.
* The file descriptor is also closed.
*/
static void fd_dodelete(int fd, int do_close)
{
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (fdtab[fd].linger_risk) {
/* this is generally set when connecting to servers */
setsockopt(fd, SOL_SOCKET, SO_LINGER,
@@ -195,9 +204,12 @@
fdtab[fd].new = 0;
if (do_close)
close(fd);
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ SPIN_LOCK(FDTAB_LOCK, &fdtab_lock);
while ((maxfd-1 >= 0) && !fdtab[maxfd-1].owner)
maxfd--;
+ SPIN_UNLOCK(FDTAB_LOCK, &fdtab_lock);
}
/* Deletes an FD from the fdsets, and recomputes the maxfd limit.
@@ -225,10 +237,19 @@
{
int fd, entry, e;
+ if (!fd_cache_num)
+ return;
+
+ RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
for (entry = 0; entry < fd_cache_num; ) {
fd = fd_cache[entry];
- e = fdtab[fd].state;
+
+ if (SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock))
+ goto next;
+
+ RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
+ e = fdtab[fd].state;
fdtab[fd].ev &= FD_POLL_STICKY;
if ((e & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
@@ -237,18 +258,25 @@
if ((e & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))
fdtab[fd].ev |= FD_POLL_OUT;
- if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev)
+ if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev) {
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
fdtab[fd].iocb(fd);
- else
+ }
+ else {
fd_release_cache_entry(fd);
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ }
+ RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
/* If the fd was removed from the cache, it has been
* replaced by the next one that we don't want to skip !
*/
if (entry < fd_cache_num && fd_cache[entry] != fd)
continue;
+ next:
entry++;
}
+ RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
}
/* disable the specified poller */
@@ -261,6 +289,21 @@
pollers[p].pref = 0;
}
+/* Initialize the pollers per thread */
+static int init_pollers_per_thread()
+{
+ if ((fd_updt = calloc(global.maxsock, sizeof(*fd_updt))) == NULL)
+ return 0;
+ return 1;
+}
+
+/* Deinitialize the pollers per thread */
+static void deinit_pollers_per_thread()
+{
+ free(fd_updt);
+ fd_updt = NULL;
+}
+
/*
* Initialize the pollers till the best one is found.
* If none works, returns 0, otherwise 1.
@@ -279,9 +322,21 @@
if ((fd_cache = calloc(global.maxsock, sizeof(*fd_cache))) == NULL)
goto fail_cache;
- if ((fd_updt = calloc(global.maxsock, sizeof(*fd_updt))) == NULL)
+ if (global.nbthread > 1) {
+ hap_register_per_thread_init(init_pollers_per_thread);
+ hap_register_per_thread_deinit(deinit_pollers_per_thread);
+ }
+ else if (!init_pollers_per_thread())
goto fail_updt;
+ for (p = 0; p < global.maxsock; p++)
+ SPIN_INIT(&fdtab[p].lock);
+
+ //memset(fd_cache, -1, global.maxsock);
+
+ SPIN_INIT(&fdtab_lock);
+ RWLOCK_INIT(&fdcache_lock);
+ SPIN_INIT(&poll_lock);
do {
bp = NULL;
for (p = 0; p < nbpollers; p++)
@@ -316,16 +371,24 @@
struct poller *bp;
int p;
+ for (p = 0; p < global.maxsock; p++)
+ SPIN_DESTROY(&fdtab[p].lock);
+
for (p = 0; p < nbpollers; p++) {
bp = &pollers[p];
if (bp && bp->pref)
bp->term(bp);
}
+
free(fd_updt); fd_updt = NULL;
free(fd_cache); fd_cache = NULL;
free(fdinfo); fdinfo = NULL;
free(fdtab); fdtab = NULL;
+
+ SPIN_DESTROY(&fdtab_lock);
+ RWLOCK_DESTROY(&fdcache_lock);
+ SPIN_DESTROY(&poll_lock);
}
/*
diff --git a/src/haproxy.c b/src/haproxy.c
index 0733189..7f48514 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -2202,7 +2202,35 @@
deinit_pollers();
} /* end deinit() */
+void mworker_pipe_handler(int fd)
+{
+ char c;
+
+ while (read(fd, &c, 1) == -1) {
+ if (errno == EINTR)
+ continue;
+ if (errno == EAGAIN) {
+ fd_cant_recv(fd);
+ return;
+ }
+ break;
+ }
+
+ deinit();
+ exit(EXIT_FAILURE);
+ return;
+}
+
+void mworker_pipe_register(int pipefd[2])
+{
+ close(mworker_pipe[1]); /* close the write end of the master pipe in the children */
+ fcntl(mworker_pipe[0], F_SETFL, O_NONBLOCK);
+ fdtab[mworker_pipe[0]].owner = mworker_pipe;
+ fdtab[mworker_pipe[0]].iocb = mworker_pipe_handler;
+ fd_insert(mworker_pipe[0]);
+ fd_want_recv(mworker_pipe[0]);
+}
static void sync_poll_loop()
{
@@ -2278,6 +2306,10 @@
}
}
+ if (global.mode & MODE_MWORKER)
+ mworker_pipe_register(mworker_pipe);
+
+ protocol_enable_all();
THREAD_SYNC_ENABLE();
run_poll_loop();
@@ -2320,37 +2352,6 @@
return t;
}
-void mworker_pipe_handler(int fd)
-{
- char c;
-
- while (read(fd, &c, 1) == -1) {
- if (errno == EINTR)
- continue;
- if (errno == EAGAIN) {
- fd_cant_recv(fd);
- return;
- }
- break;
- }
-
- deinit();
- exit(EXIT_FAILURE);
- return;
-}
-
-void mworker_pipe_register(int pipefd[2])
-{
- close(mworker_pipe[1]); /* close the write end of the master pipe in the children */
-
- fcntl(mworker_pipe[0], F_SETFL, O_NONBLOCK);
- fdtab[mworker_pipe[0]].owner = mworker_pipe;
- fdtab[mworker_pipe[0]].iocb = mworker_pipe_handler;
- fd_insert(mworker_pipe[0]);
- fd_want_recv(mworker_pipe[0]);
- }
-
-
int main(int argc, char **argv)
{
int err, retry;
@@ -2798,11 +2799,6 @@
}
global.mode &= ~MODE_STARTING;
-
- if (global.mode & MODE_MWORKER)
- mworker_pipe_register(mworker_pipe);
-
- protocol_enable_all();
/*
* That's it : the central polling loop. Run until we stop.
*/
@@ -2827,6 +2823,12 @@
}
else {
tid = 0;
+
+ if (global.mode & MODE_MWORKER)
+ mworker_pipe_register(mworker_pipe);
+
+ protocol_enable_all();
+
run_poll_loop();
}