MEDIUM: pollers: Remember the state for read and write for each threads.
In the poller code, instead of just remembering if we're currently polling
a fd or not, remember if we're polling it for writing and/or for reading, that
way, we can avoid to modify the polling if it's already polled as needed.
diff --git a/include/proto/fd.h b/include/proto/fd.h
index 673eaae..4fbb9dc 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -37,7 +37,11 @@
extern volatile struct fdlist update_list;
-extern unsigned long *polled_mask;
+
+extern struct polled_mask {
+ unsigned long poll_recv;
+ unsigned long poll_send;
+} *polled_mask;
extern THREAD_LOCAL int *fd_updt; // FD updates list
extern THREAD_LOCAL int fd_nbupdt; // number of updates in the list
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index bd2d616..dd3a561 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -53,7 +53,7 @@
REGPRM1 static void __fd_clo(int fd)
{
if (unlikely(fdtab[fd].cloned)) {
- unsigned long m = polled_mask[fd];
+ unsigned long m = polled_mask[fd].poll_recv | polled_mask[fd].poll_send;
int i;
for (i = global.nbthread - 1; i >= 0; i--)
@@ -68,13 +68,35 @@
en = fdtab[fd].state;
- if (polled_mask[fd] & tid_bit) {
+ if ((polled_mask[fd].poll_send | polled_mask[fd].poll_recv) & tid_bit) {
if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
/* fd removed from poll list */
opcode = EPOLL_CTL_DEL;
- _HA_ATOMIC_AND(&polled_mask[fd], ~tid_bit);
+ if (polled_mask[fd].poll_recv & tid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ if (polled_mask[fd].poll_send & tid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
}
else {
+ if (((en & FD_EV_POLLED_R) != 0) ==
+ ((polled_mask[fd].poll_recv & tid_bit) != 0) &&
+ ((en & FD_EV_POLLED_W) != 0) ==
+ ((polled_mask[fd].poll_send & tid_bit) != 0))
+ return;
+ if (en & FD_EV_POLLED_R) {
+ if (!(polled_mask[fd].poll_recv & tid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+ } else {
+ if (polled_mask[fd].poll_recv & tid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ }
+ if (en & FD_EV_POLLED_W) {
+ if (!(polled_mask[fd].poll_send & tid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ } else {
+ if (polled_mask[fd].poll_send & tid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ }
/* fd status changed */
opcode = EPOLL_CTL_MOD;
}
@@ -82,7 +104,10 @@
else if ((fdtab[fd].thread_mask & tid_bit) && (en & FD_EV_POLLED_RW)) {
/* new fd in the poll list */
opcode = EPOLL_CTL_ADD;
- _HA_ATOMIC_OR(&polled_mask[fd], tid_bit);
+ if (en & FD_EV_POLLED_R)
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+ if (en & FD_EV_POLLED_W)
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
}
else {
return;
@@ -188,7 +213,8 @@
/* FD has been migrated */
activity[tid].poll_skip++;
epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
- _HA_ATOMIC_AND(&polled_mask[fd], ~tid_bit);
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
continue;
}
diff --git a/src/ev_evports.c b/src/ev_evports.c
index 7842bf2..d9d1637 100644
--- a/src/ev_evports.c
+++ b/src/ev_evports.c
@@ -74,18 +74,36 @@
en = fdtab[fd].state;
if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
- if (!(polled_mask[fd] & tid_bit)) {
+ if (!(polled_mask[fd].poll_recv & tid_bit) &&
+ !(polled_mask[fd].poll_send & tid_bit)) {
/* fd was not watched, it's still not */
return;
}
/* fd totally removed from poll list */
events = 0;
- _HA_ATOMIC_AND(&polled_mask[fd], ~tid_bit);
+ if (polled_mask[fd].poll_recv & tid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ if (polled_mask[fd].poll_send & tid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
}
else {
/* OK fd has to be monitored, it was either added or changed */
events = evports_state_to_events(en);
- _HA_ATOMIC_OR(&polled_mask[fd], tid_bit);
+ if (en & FD_EV_POLLED_R) {
+ if (!(polled_mask[fd].poll_recv & tid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+ } else {
+ if (polled_mask[fd].poll_recv & tid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ }
+ if (en & FD_EV_POLLED_W) {
+ if (!(polled_mask[fd].poll_send & tid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ } else {
+ if (polled_mask[fd].poll_send & tid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ }
+
}
evports_resync_fd(fd, events);
}
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index 6924377..d634b67 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -44,29 +44,44 @@
en = fdtab[fd].state;
if (!(fdtab[fd].thread_mask & tid_bit) || !(en & FD_EV_POLLED_RW)) {
- if (!(polled_mask[fd] & tid_bit)) {
+ if (!(polled_mask[fd].poll_recv & tid_bit) &&
+ !(polled_mask[fd].poll_send & tid_bit)) {
/* fd was not watched, it's still not */
return changes;
}
/* fd totally removed from poll list */
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
- _HA_ATOMIC_AND(&polled_mask[fd], ~tid_bit);
+ if (polled_mask[fd].poll_recv & tid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ if (polled_mask[fd].poll_send & tid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
}
else {
/* OK fd has to be monitored, it was either added or changed */
- if (en & FD_EV_POLLED_R)
- EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
- else if (polled_mask[fd] & tid_bit)
+ if (en & FD_EV_POLLED_R) {
+ if (!(polled_mask[fd].poll_recv & tid_bit)) {
+ EV_SET(&kev[changes++], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+ }
+ }
+ else if (polled_mask[fd].poll_recv & tid_bit) {
EV_SET(&kev[changes++], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+ HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ }
- if (en & FD_EV_POLLED_W)
- EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
- else if (polled_mask[fd] & tid_bit)
+ if (en & FD_EV_POLLED_W) {
+ if (!(polled_mask[fd].poll_send & tid_bit)) {
+ EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ }
+ }
+ else if (polled_mask[fd].poll_send & tid_bit) {
EV_SET(&kev[changes++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ }
- _HA_ATOMIC_OR(&polled_mask[fd], tid_bit);
}
return changes;
}
diff --git a/src/ev_poll.c b/src/ev_poll.c
index b349c55..d4a1351 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -58,28 +58,38 @@
* takes it for every other one.
*/
if (!(en & FD_EV_POLLED_RW)) {
- if (!polled_mask[fd]) {
+ if (!(polled_mask[fd].poll_recv | polled_mask[fd].poll_send)) {
/* fd was not watched, it's still not */
return;
}
/* fd totally removed from poll list */
hap_fd_clr(fd, fd_evts[DIR_RD]);
hap_fd_clr(fd, fd_evts[DIR_WR]);
- _HA_ATOMIC_AND(&polled_mask[fd], 0);
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, 0);
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, 0);
}
else {
/* OK fd has to be monitored, it was either added or changed */
- if (!(en & FD_EV_POLLED_R))
+ if (!(en & FD_EV_POLLED_R)) {
hap_fd_clr(fd, fd_evts[DIR_RD]);
- else
+ if (polled_mask[fd].poll_recv & tid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ } else {
hap_fd_set(fd, fd_evts[DIR_RD]);
+ if (!(polled_mask[fd].poll_recv & tid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+ }
- if (!(en & FD_EV_POLLED_W))
+ if (!(en & FD_EV_POLLED_W)) {
hap_fd_clr(fd, fd_evts[DIR_WR]);
- else
+ if (polled_mask[fd].poll_send & tid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ }else {
hap_fd_set(fd, fd_evts[DIR_WR]);
+ if (!(polled_mask[fd].poll_send & tid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ }
- _HA_ATOMIC_OR(&polled_mask[fd], tid_bit);
if (fd > *max_add_fd)
*max_add_fd = fd;
}
diff --git a/src/ev_select.c b/src/ev_select.c
index be88bc2..f2a2acf 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -49,28 +49,38 @@
* takes it for every other one.
*/
if (!(en & FD_EV_POLLED_RW)) {
- if (!polled_mask[fd]) {
+ if (!(polled_mask[fd].poll_recv | polled_mask[fd].poll_send)) {
/* fd was not watched, it's still not */
return;
}
/* fd totally removed from poll list */
hap_fd_clr(fd, fd_evts[DIR_RD]);
hap_fd_clr(fd, fd_evts[DIR_WR]);
- _HA_ATOMIC_AND(&polled_mask[fd], 0);
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, 0);
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, 0);
}
else {
/* OK fd has to be monitored, it was either added or changed */
- if (!(en & FD_EV_POLLED_R))
+ if (!(en & FD_EV_POLLED_R)) {
hap_fd_clr(fd, fd_evts[DIR_RD]);
- else
+ if (polled_mask[fd].poll_recv & tid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ } else {
hap_fd_set(fd, fd_evts[DIR_RD]);
+ if (!(polled_mask[fd].poll_recv & tid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_recv, tid_bit);
+ }
- if (!(en & FD_EV_POLLED_W))
+ if (!(en & FD_EV_POLLED_W)) {
hap_fd_clr(fd, fd_evts[DIR_WR]);
- else
+ if (polled_mask[fd].poll_send & tid_bit)
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ } else {
hap_fd_set(fd, fd_evts[DIR_WR]);
+ if (!(polled_mask[fd].poll_send & tid_bit))
+ _HA_ATOMIC_OR(&polled_mask[fd].poll_send, tid_bit);
+ }
- _HA_ATOMIC_OR(&polled_mask[fd], tid_bit);
if (fd > *max_add_fd)
*max_add_fd = fd;
}
diff --git a/src/fd.c b/src/fd.c
index 71df46e..a1a4578 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -122,7 +122,7 @@
#include <proto/port_range.h>
struct fdtab *fdtab = NULL; /* array of all the file descriptors */
-unsigned long *polled_mask = NULL; /* Array for the polled_mask of each fd */
+struct polled_mask *polled_mask = NULL; /* Array for the polled_mask of each fd */
struct fdinfo *fdinfo = NULL; /* less-often used infos for file descriptors */
int totalconn; /* total # of terminated sessions */
int actconn; /* # of active sessions */
@@ -338,7 +338,7 @@
fdtab[fd].owner = NULL;
fdtab[fd].thread_mask = 0;
if (do_close) {
- polled_mask[fd] = 0;
+ polled_mask[fd].poll_recv = polled_mask[fd].poll_send = 0;
close(fd);
_HA_ATOMIC_SUB(&ha_used_fds, 1);
}
@@ -525,7 +525,7 @@
if ((fdtab = calloc(global.maxsock, sizeof(struct fdtab))) == NULL)
goto fail_tab;
- if ((polled_mask = calloc(global.maxsock, sizeof(unsigned long))) == NULL)
+ if ((polled_mask = calloc(global.maxsock, sizeof(*polled_mask))) == NULL)
goto fail_polledmask;
if ((fdinfo = calloc(global.maxsock, sizeof(struct fdinfo))) == NULL)