BUG/MEDIUM: kqueue/threads: use one kqueue_fd per thread

This is the same principle as the previous patch (BUG/MEDIUM:
epoll/threads: use one epoll_fd per thread), except that this time it is
for kqueue. We don't want all threads to be woken up by activity that
happens on a single thread's FDs and that the other threads are not
interested in.
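
As an illustration of that idea, here is a minimal standalone sketch, not
code from the patch; MAX_THREADS and the tid index mirror the names used
below, while thread_init()/thread_poll_once() and the fixed value of
MAX_THREADS are hypothetical:

    #include <sys/types.h>
    #include <sys/event.h>
    #include <sys/time.h>

    #define MAX_THREADS 64                 /* illustrative value only */

    static int kqueue_fd[MAX_THREADS];     /* one kqueue per thread */

    /* Called once by each thread at startup, roughly what _do_init(),
     * _do_fork() and init_kqueue_per_thread() end up doing in the patch.
     */
    static int thread_init(int tid)
    {
            kqueue_fd[tid] = kqueue();
            return kqueue_fd[tid] >= 0;
    }

    /* Each thread registers and waits on its own kqueue only, so an
     * event subscribed to by thread A can never wake up thread B.
     */
    static void thread_poll_once(int tid, int watched_fd)
    {
            struct kevent chg, ev[8];
            struct timespec timeout = { 0, 50 * 1000000 };  /* 50 ms */

            /* register read readiness on this thread's kqueue only */
            EV_SET(&chg, watched_fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
            kevent(kqueue_fd[tid], &chg, 1, NULL, 0, NULL);

            /* only events registered on *this* kqueue are reported */
            kevent(kqueue_fd[tid], NULL, 0, ev, 8, &timeout);
    }
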
Just like with the previous patch, this one shows that the polling state
doesn't need to be changed here and that some simplifications are now
possible. This patch only implements the minimum required for a stable
backport.
This should be backported to 1.8.
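
For readability, here is a condensed sketch of what the patched update
loop in _do_poll() boils down to. The names polled_mask, thread_mask and
tid_bit come from the patch below; struct fd_entry, want_read/want_write
and the plain bit operations are simplifications (the real code keeps this
state in fdtab[] and updates polled_mask with HA_ATOMIC_OR/HA_ATOMIC_AND):

    #include <sys/types.h>
    #include <sys/event.h>

    struct fd_entry {
            unsigned long thread_mask;  /* threads allowed to use this fd */
            unsigned long polled_mask;  /* threads that have it in their kqueue */
    };

    /* Push the EV_ADD/EV_DELETE changes needed so that the calling thread
     * (bit tid_bit) tracks exactly the events it wants for this fd.
     * Returns the number of entries appended to kev[].
     */
    static int update_fd_events(struct fd_entry *e, int fd, unsigned long tid_bit,
                                int want_read, int want_write, struct kevent *kev)
    {
            int changes = 0;

            if (!(e->thread_mask & tid_bit) || (!want_read && !want_write)) {
                    if (!(e->polled_mask & tid_bit))
                            return 0;   /* fd was not watched here, it's still not */
                    /* fd totally removed from this thread's kqueue */
                    EV_SET(&kev[changes++], fd, EVFILT_READ,  EV_DELETE, 0, 0, NULL);
                    EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
                    e->polled_mask &= ~tid_bit;
                    return changes;
            }

            if (want_read)
                    EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
            else if (e->polled_mask & tid_bit)
                    EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);

            if (want_write)
                    EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
            else if (e->polled_mask & tid_bit)
                    EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);

            e->polled_mask |= tid_bit;
            return changes;
    }

The caller then flushes the accumulated changes with a single
kevent(kqueue_fd[tid], kev, changes, NULL, 0, NULL) on its own per-thread
kqueue, exactly as the hunk below does.
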
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index 20fa290..532d49d 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -29,7 +29,7 @@
/* private data */
-static int kqueue_fd;
+static int kqueue_fd[MAX_THREADS]; // per-thread kqueue_fd
static THREAD_LOCAL struct kevent *kev = NULL;
/*
@@ -61,35 +61,34 @@
fdtab[fd].state = en;
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
- if ((eo ^ en) & FD_EV_POLLED_RW) {
- /* poll status changed */
- if ((eo ^ en) & FD_EV_POLLED_R) {
- /* read poll status changed */
- if (en & FD_EV_POLLED_R) {
- EV_SET(&kev[changes], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
- changes++;
- }
- else {
- EV_SET(&kev[changes], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
- changes++;
- }
+ if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
+ if (!(fdtab[fd].polled_mask & tid_bit)) {
+ /* fd was not watched, it's still not */
+ continue;
}
+ /* fd totally removed from poll list */
+ EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+ EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+ HA_ATOMIC_AND(&fdtab[fd].polled_mask, ~tid_bit);
+ }
+ else {
+ /* OK fd has to be monitored, it was either added or changed */
- if ((eo ^ en) & FD_EV_POLLED_W) {
- /* write poll status changed */
- if (en & FD_EV_POLLED_W) {
- EV_SET(&kev[changes], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
- changes++;
- }
- else {
- EV_SET(&kev[changes], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
- changes++;
- }
- }
+ if (en & FD_EV_POLLED_R)
+ EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
+ else if (fdtab[fd].polled_mask & tid_bit)
+ EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+
+ if (en & FD_EV_POLLED_W)
+ EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
+ else if (fdtab[fd].polled_mask & tid_bit)
+ EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+
+ HA_ATOMIC_OR(&fdtab[fd].polled_mask, tid_bit);
}
}
if (changes)
- kevent(kqueue_fd, kev, changes, NULL, 0, NULL);
+ kevent(kqueue_fd[tid], kev, changes, NULL, 0, NULL);
fd_nbupdt = 0;
delta_ms = 0;
@@ -113,7 +112,7 @@
fd = MIN(maxfd, global.tune.maxpollevents);
gettimeofday(&before_poll, NULL);
- status = kevent(kqueue_fd, // int kq
+ status = kevent(kqueue_fd[tid], // int kq
NULL, // const struct kevent *changelist
0, // int nchanges
kev, // struct kevent *eventlist
@@ -155,11 +154,32 @@
static int init_kqueue_per_thread()
{
+ int fd;
+
/* we can have up to two events per fd (read and write) */
kev = calloc(1, sizeof(struct kevent) * 2 * global.maxsock);
if (kev == NULL)
- return 0;
+ goto fail_alloc;
+
+ if (tid) {
+ kqueue_fd[tid] = kqueue();
+ if (kqueue_fd[tid] < 0)
+ goto fail_fd;
+ }
+
+ /* we may have to unregister some events initially registered on the
+ * original fd when it was alone, and/or to register events on the new
+ * fd for this thread. Let's just mark them as updated, the poller will
+ * do the rest.
+ */
+ for (fd = 0; fd < maxfd; fd++)
+ updt_fd_polling(fd);
+
return 1;
+ fail_fd:
+ free(kev);
+ fail_alloc:
+ return 0;
}
static void deinit_kqueue_per_thread()
@@ -177,8 +197,8 @@
{
p->private = NULL;
- kqueue_fd = kqueue();
- if (kqueue_fd < 0)
+ kqueue_fd[tid] = kqueue();
+ if (kqueue_fd[tid] < 0)
goto fail_fd;
hap_register_per_thread_init(init_kqueue_per_thread);
@@ -196,9 +216,9 @@
*/
REGPRM1 static void _do_term(struct poller *p)
{
- if (kqueue_fd >= 0) {
- close(kqueue_fd);
- kqueue_fd = -1;
+ if (kqueue_fd[tid] >= 0) {
+ close(kqueue_fd[tid]);
+ kqueue_fd[tid] = -1;
}
p->private = NULL;
@@ -227,8 +247,8 @@
*/
REGPRM1 static int _do_fork(struct poller *p)
{
- kqueue_fd = kqueue();
- if (kqueue_fd < 0)
+ kqueue_fd[tid] = kqueue();
+ if (kqueue_fd[tid] < 0)
return 0;
return 1;
}
@@ -242,11 +262,14 @@
static void _do_register(void)
{
struct poller *p;
+ int i;
if (nbpollers >= MAX_POLLERS)
return;
- kqueue_fd = -1;
+ for (i = 0; i < MAX_THREADS; i++)
+ kqueue_fd[i] = -1;
+
p = &pollers[nbpollers++];
p->name = "kqueue";