MEDIUM: fd/poller: turn polled_mask to group-local IDs
This changes the meaning of each bit in the polled_mask so that
now each bit represents a local thread ID for the current group instead
of a global thread ID. As such, all tests now apply to ltid_bit instead
of tid_bit.
No particular check was made to verify that the FD's tgid matches the
current one because there should be no case where this is not true. A
check was added in epoll's __fd_clo() to confirm it never differs unless
expected (soft stop under thread isolation, or master in starting mode
going to exec mode), but that doesn't prevent it from doing the job: it
only consists in checking, among the group's threads, which ones are
still polling this FD, and removing them.
Some atomic loads were added at the various locations, and most repetitive
references to polled_mask[fd].xx were turned into a local copy instead,
making the code much clearer.
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index ea2b24f..6a71650 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -43,12 +43,25 @@
static void __fd_clo(int fd)
{
if (unlikely(fdtab[fd].state & FD_CLONED)) {
- unsigned long m = polled_mask[fd].poll_recv | polled_mask[fd].poll_send;
+ unsigned long m = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv) | _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
+ int tgrp = fd_tgid(fd);
struct epoll_event ev;
int i;
- for (i = global.nbthread - 1; i >= 0; i--)
- if (m & (1UL << i))
+ if (!m)
+ return;
+
+ /* since FDs may only be shared per group and are only closed
+ * once entirely reset, it should never happen that we have to
+ * close an FD for another group, unless we're stopping from the
+ * wrong thread or during startup, which is what we're checking
+ * for. Regardless, it is not a problem to do so.
+ */
+ if (unlikely(!(global.mode & MODE_STARTING)))
+ CHECK_IF(tgid != tgrp && !thread_isolated());
+
+ for (i = ha_tgroup_info[tgrp-1].base; i < ha_tgroup_info[tgrp-1].base + ha_tgroup_info[tgrp-1].count; i++)
+ if (m & ha_thread_info[i].ltid_bit)
epoll_ctl(epoll_fd[i], EPOLL_CTL_DEL, fd, &ev);
}
}
@@ -57,18 +70,21 @@
{
int en, opcode;
struct epoll_event ev = { };
+ ulong pr, ps;
en = fdtab[fd].state;
+ pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
+ ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
/* Try to force EPOLLET on FDs that support it */
if (fdtab[fd].state & FD_ET_POSSIBLE) {
/* already done ? */
- if (polled_mask[fd].poll_recv & polled_mask[fd].poll_send & tid_bit)
+ if (pr & ps & ti->ltid_bit)
return;
/* enable ET polling in both directions */
- _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
- _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
opcode = EPOLL_CTL_ADD;
ev.events = EPOLLIN | EPOLLRDHUP | EPOLLOUT | EPOLLET;
goto done;
@@ -79,38 +95,35 @@
* needlessly unsubscribe then re-subscribe it.
*/
if (!(en & FD_EV_READY_R) &&
- ((en & FD_EV_ACTIVE_W) ||
- ((polled_mask[fd].poll_send | polled_mask[fd].poll_recv) & tid_bit)))
+ ((en & FD_EV_ACTIVE_W) || ((ps | pr) & ti->ltid_bit)))
en |= FD_EV_ACTIVE_R;
- if ((polled_mask[fd].poll_send | polled_mask[fd].poll_recv) & tid_bit) {
+ if ((ps | pr) & ti->ltid_bit) {
if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_ACTIVE_RW)) {
/* fd removed from poll list */
opcode = EPOLL_CTL_DEL;
- if (polled_mask[fd].poll_recv & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
- if (polled_mask[fd].poll_send & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ if (pr & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
+ if (ps & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
else {
- if (((en & FD_EV_ACTIVE_R) != 0) ==
- ((polled_mask[fd].poll_recv & tid_bit) != 0) &&
- ((en & FD_EV_ACTIVE_W) != 0) ==
- ((polled_mask[fd].poll_send & tid_bit) != 0))
+ if (((en & FD_EV_ACTIVE_R) != 0) == ((pr & ti->ltid_bit) != 0) &&
+ ((en & FD_EV_ACTIVE_W) != 0) == ((ps & ti->ltid_bit) != 0))
return;
if (en & FD_EV_ACTIVE_R) {
- if (!(polled_mask[fd].poll_recv & tid_bit))
- _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+ if (!(pr & ti->ltid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
} else {
- if (polled_mask[fd].poll_recv & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ if (pr & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
}
if (en & FD_EV_ACTIVE_W) {
- if (!(polled_mask[fd].poll_send & tid_bit))
- _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ if (!(ps & ti->ltid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
} else {
- if (polled_mask[fd].poll_send & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ if (ps & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
/* fd status changed */
opcode = EPOLL_CTL_MOD;
@@ -120,9 +133,9 @@
/* new fd in the poll list */
opcode = EPOLL_CTL_ADD;
if (en & FD_EV_ACTIVE_R)
- _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
if (en & FD_EV_ACTIVE_W)
- _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
}
else {
return;
diff --git a/src/ev_evports.c b/src/ev_evports.c
index 25cc79b..8129e9e 100644
--- a/src/ev_evports.c
+++ b/src/ev_evports.c
@@ -65,38 +65,40 @@
{
int en;
int events;
+ ulong pr, ps;
en = fdtab[fd].state;
+ pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
+ ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_ACTIVE_RW)) {
- if (!(polled_mask[fd].poll_recv & tid_bit) &&
- !(polled_mask[fd].poll_send & tid_bit)) {
+ if (!((pr | ps) & ti->ltid_bit)) {
/* fd was not watched, it's still not */
return;
}
/* fd totally removed from poll list */
events = 0;
- if (polled_mask[fd].poll_recv & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
- if (polled_mask[fd].poll_send & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ if (pr & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
+ if (ps & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
else {
/* OK fd has to be monitored, it was either added or changed */
events = evports_state_to_events(en);
if (en & FD_EV_ACTIVE_R) {
- if (!(polled_mask[fd].poll_recv & tid_bit))
- _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+ if (!(pr & ti->ltid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
} else {
- if (polled_mask[fd].poll_recv & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ if (pr & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
}
if (en & FD_EV_ACTIVE_W) {
- if (!(polled_mask[fd].poll_send & tid_bit))
- _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ if (!(ps & ti->ltid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
} else {
- if (polled_mask[fd].poll_send & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ if (ps & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
}
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index 42a1bc5..79ddf7d 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -36,46 +36,48 @@
{
int en;
int changes = start;
+ ulong pr, ps;
en = fdtab[fd].state;
+ pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
+ ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_ACTIVE_RW)) {
- if (!(polled_mask[fd].poll_recv & tid_bit) &&
- !(polled_mask[fd].poll_send & tid_bit)) {
+ if (!((pr | ps) & ti->ltid_bit)) {
/* fd was not watched, it's still not */
return changes;
}
/* fd totally removed from poll list */
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
- if (polled_mask[fd].poll_recv & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
- if (polled_mask[fd].poll_send & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ if (pr & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
+ if (ps & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
else {
/* OK fd has to be monitored, it was either added or changed */
if (en & FD_EV_ACTIVE_R) {
- if (!(polled_mask[fd].poll_recv & tid_bit)) {
+ if (!(pr & ti->ltid_bit)) {
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
- _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
}
}
- else if (polled_mask[fd].poll_recv & tid_bit) {
+ else if (pr & ti->ltid_bit) {
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
- HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
}
if (en & FD_EV_ACTIVE_W) {
- if (!(polled_mask[fd].poll_send & tid_bit)) {
+ if (!(ps & ti->ltid_bit)) {
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
- _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
}
}
- else if (polled_mask[fd].poll_send & tid_bit) {
+ else if (ps & ti->ltid_bit) {
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
- _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
}
}
diff --git a/src/ev_poll.c b/src/ev_poll.c
index 093184f..a7fadea 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -47,15 +47,18 @@
static void _update_fd(int fd, int *max_add_fd)
{
int en;
+ ulong pr, ps;
en = fdtab[fd].state;
+ pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
+ ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
/* we have a single state for all threads, which is why we
* don't check the tid_bit. First thread to see the update
* takes it for every other one.
*/
if (!(en & FD_EV_ACTIVE_RW)) {
- if (!(polled_mask[fd].poll_recv | polled_mask[fd].poll_send)) {
+ if (!(pr | ps)) {
/* fd was not watched, it's still not */
return;
}
@@ -69,22 +72,22 @@
/* OK fd has to be monitored, it was either added or changed */
if (!(en & FD_EV_ACTIVE_R)) {
hap_fd_clr(fd, fd_evts[DIR_RD]);
- if (polled_mask[fd].poll_recv & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ if (pr & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
} else {
hap_fd_set(fd, fd_evts[DIR_RD]);
- if (!(polled_mask[fd].poll_recv & tid_bit))
- _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+ if (!(pr & ti->ltid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
}
if (!(en & FD_EV_ACTIVE_W)) {
hap_fd_clr(fd, fd_evts[DIR_WR]);
- if (polled_mask[fd].poll_send & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
- }else {
+ if (ps & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
+ } else {
hap_fd_set(fd, fd_evts[DIR_WR]);
- if (!(polled_mask[fd].poll_send & tid_bit))
- _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ if (!(ps & ti->ltid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
}
if (fd > *max_add_fd)
diff --git a/src/ev_select.c b/src/ev_select.c
index 86a89c7..2b39a81 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -38,15 +38,18 @@
static void _update_fd(int fd, int *max_add_fd)
{
int en;
+ ulong pr, ps;
en = fdtab[fd].state;
+ pr = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_recv);
+ ps = _HA_ATOMIC_LOAD(&polled_mask[fd].poll_send);
/* we have a single state for all threads, which is why we
* don't check the tid_bit. First thread to see the update
* takes it for every other one.
*/
if (!(en & FD_EV_ACTIVE_RW)) {
- if (!(polled_mask[fd].poll_recv | polled_mask[fd].poll_send)) {
+ if (!(pr | ps)) {
/* fd was not watched, it's still not */
return;
}
@@ -60,22 +63,22 @@
/* OK fd has to be monitored, it was either added or changed */
if (!(en & FD_EV_ACTIVE_R)) {
hap_fd_clr(fd, fd_evts[DIR_RD]);
- if (polled_mask[fd].poll_recv & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ if (pr & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~ti->ltid_bit);
} else {
hap_fd_set(fd, fd_evts[DIR_RD]);
- if (!(polled_mask[fd].poll_recv & tid_bit))
- _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+ if (!(pr & ti->ltid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, ti->ltid_bit);
}
if (!(en & FD_EV_ACTIVE_W)) {
hap_fd_clr(fd, fd_evts[DIR_WR]);
- if (polled_mask[fd].poll_send & tid_bit)
- _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ if (ps & ti->ltid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~ti->ltid_bit);
} else {
hap_fd_set(fd, fd_evts[DIR_WR]);
- if (!(polled_mask[fd].poll_send & tid_bit))
- _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ if (!(ps & ti->ltid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, ti->ltid_bit);
}
if (fd > *max_add_fd)