MEDIUM: fd: rely more on fd_update_events() to detect changes
This function already performs a number of checks prior to calling the
IOCB, and detects a change of thread (FD migration). Yet half of these
checks are still duplicated in each poller, and the pollers also
maintain activity counters for the various cases.
Note that the unreliable test on thread_mask was removed; only the
check performed by fd_set_running() is used now, since that one is
reliable.
Let's centralize all that FD-specific logic in the function and make
it return one of the following statuses:
FD_UPDT_DONE, // update done, nothing else to be done
FD_UPDT_DEAD, // FD was already dead, ignore it
FD_UPDT_CLOSED, // FD was closed
FD_UPDT_MIGRATED, // FD was migrated, ignore it now
Some pollers were already calling it last and have nothing left to do
afterwards, regardless of the result. The epoll poller additionally has
to delete the FD from its set when a migration is detected. Overall
this removes more code than it adds.
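For illustration, here is a minimal standalone sketch of the dispatch
pattern the pollers now follow. The FD_UPDT_* values match the enum
added to fd-t.h below; mock_fd_update_events() and stop_polling() are
hypothetical stand-ins for fd_update_events() and the poller-specific
"stop polling this FD" action:

    #include <stdio.h>

    /* same verdicts as the enum added to fd-t.h by this patch */
    enum {
        FD_UPDT_DONE = 0,  /* update done, nothing else to be done */
        FD_UPDT_DEAD,      /* FD was already dead, ignore it */
        FD_UPDT_CLOSED,    /* FD was closed */
        FD_UPDT_MIGRATED,  /* FD was migrated, ignore it now */
    };

    /* hypothetical stand-in for fd_update_events(); it simply cycles
     * through the possible verdicts for the sake of the demo.
     */
    static int mock_fd_update_events(int fd, unsigned int evts)
    {
        (void)evts;
        return fd % 4;
    }

    /* hypothetical stand-in for the poller-specific action taken on
     * migration (epoll_ctl(EPOLL_CTL_DEL), update_mask scheduling...).
     */
    static void stop_polling(int fd)
    {
        printf("fd %d: migrated, stop polling it on this thread\n", fd);
    }

    int main(void)
    {
        /* the per-event loop each poller now reduces to */
        for (int fd = 0; fd < 8; fd++) {
            int ret = mock_fd_update_events(fd, 0);

            if (ret == FD_UPDT_DEAD || ret == FD_UPDT_CLOSED)
                continue;         /* nothing left to do on this FD */

            if (ret == FD_UPDT_MIGRATED) {
                stop_polling(fd); /* another thread owns it now */
                continue;
            }

            /* FD_UPDT_DONE: nothing else to be done for this FD */
            printf("fd %d: handled\n", fd);
        }
        return 0;
    }
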
diff --git a/include/haproxy/fd-t.h b/include/haproxy/fd-t.h
index 4759ef2..9923150 100644
--- a/include/haproxy/fd-t.h
+++ b/include/haproxy/fd-t.h
@@ -110,6 +110,14 @@
#define FD_EXPORTED (1U << FD_EXPORTED_BIT)
#define FD_EXCL_SYSCALL (1U << FD_EXCL_SYSCALL_BIT)
+/* FD update status after fd_update_events() */
+enum {
+ FD_UPDT_DONE = 0, // update done, nothing else to be done
+ FD_UPDT_DEAD, // FD was already dead, ignore it
+ FD_UPDT_CLOSED, // FD was closed
+ FD_UPDT_MIGRATED, // FD was migrated, ignore it now
+};
+
/* This is the value used to mark a file descriptor as dead. This value is
* negative, this is important so that tests on fd < 0 properly match. It
* also has the nice property of being highly negative but neither overflowing
diff --git a/include/haproxy/fd.h b/include/haproxy/fd.h
index 7d9d0e6..8e1a442 100644
--- a/include/haproxy/fd.h
+++ b/include/haproxy/fd.h
@@ -117,7 +117,7 @@
void fd_add_to_fd_list(volatile struct fdlist *list, int fd, int off);
void fd_rm_from_fd_list(volatile struct fdlist *list, int fd, int off);
void updt_fd_polling(const int fd);
-void fd_update_events(int fd, uint evts);
+int fd_update_events(int fd, uint evts);
/* Called from the poller to acknowledge we read an entry from the global
* update list, to remove our bit from the update_mask, and remove it from
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index 8810b77..1de2e0f 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -218,6 +218,7 @@
for (count = 0; count < status; count++) {
struct epoll_event ev;
unsigned int n, e;
+ int ret;
e = epoll_events[count].events;
fd = epoll_events[count].data.fd;
@@ -228,27 +229,20 @@
#ifdef DEBUG_FD
_HA_ATOMIC_INC(&fdtab[fd].event_count);
#endif
- if (!fdtab[fd].owner) {
- activity[tid].poll_dead_fd++;
- continue;
- }
-
- if (!(fdtab[fd].thread_mask & tid_bit)) {
- /* FD has been migrated */
- activity[tid].poll_skip_fd++;
- epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
- _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
- _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
- continue;
- }
-
n = ((e & EPOLLIN) ? FD_EV_READY_R : 0) |
((e & EPOLLOUT) ? FD_EV_READY_W : 0) |
((e & EPOLLRDHUP) ? FD_EV_SHUT_R : 0) |
((e & EPOLLHUP) ? FD_EV_SHUT_RW : 0) |
((e & EPOLLERR) ? FD_EV_ERR_RW : 0);
- fd_update_events(fd, n);
+ ret = fd_update_events(fd, n);
+
+ if (ret == FD_UPDT_MIGRATED) {
+ /* FD has been migrated */
+ epoll_ctl(epoll_fd[tid], EPOLL_CTL_DEL, fd, &ev);
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_recv, ~tid_bit);
+ _HA_ATOMIC_AND(&polled_mask[fd].poll_send, ~tid_bit);
+ }
}
/* the caller will take care of cached events */
}
diff --git a/src/ev_evports.c b/src/ev_evports.c
index 109e59c..c7bf4f6 100644
--- a/src/ev_evports.c
+++ b/src/ev_evports.c
@@ -213,24 +213,14 @@
for (i = 0; i < nevlist; i++) {
unsigned int n = 0;
int events, rebind_events;
+ int ret;
+
fd = evports_evlist[i].portev_object;
events = evports_evlist[i].portev_events;
#ifdef DEBUG_FD
_HA_ATOMIC_INC(&fdtab[fd].event_count);
#endif
- if (fdtab[fd].owner == NULL) {
- activity[tid].poll_dead_fd++;
- continue;
- }
-
- if (!(fdtab[fd].thread_mask & tid_bit)) {
- activity[tid].poll_skip_fd++;
- if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
- fd_updt[fd_nbupdt++] = fd;
- continue;
- }
-
/*
* By virtue of receiving an event for this file descriptor, it
* is no longer associated with the port in question. Store
@@ -255,13 +245,24 @@
* entry. If it changes, the fd will be placed on the updated
* list for processing the next time we are called.
*/
- fd_update_events(fd, n);
+ ret = fd_update_events(fd, n);
+
+ /* If the FD was already dead, skip it */
+ if (ret == FD_UPDT_DEAD)
+ continue;
+
+ /* disable polling on this instance if the FD was migrated */
+ if (ret == FD_UPDT_MIGRATED) {
+ if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+ fd_updt[fd_nbupdt++] = fd;
+ continue;
+ }
/*
* This file descriptor was closed during the processing of
* polled events. No need to reassociate.
*/
- if (fdtab[fd].owner == NULL)
+ if (ret == FD_UPDT_CLOSED)
continue;
/*
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index d51a833..ce71484 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -181,23 +181,13 @@
for (count = 0; count < status; count++) {
unsigned int n = 0;
+ int ret;
+
fd = kev[count].ident;
#ifdef DEBUG_FD
_HA_ATOMIC_INC(&fdtab[fd].event_count);
#endif
- if (!fdtab[fd].owner) {
- activity[tid].poll_dead_fd++;
- continue;
- }
-
- if (!(fdtab[fd].thread_mask & tid_bit)) {
- activity[tid].poll_skip_fd++;
- if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
- fd_updt[fd_nbupdt++] = fd;
- continue;
- }
-
if (kev[count].filter == EVFILT_READ) {
if (kev[count].data || !(kev[count].flags & EV_EOF))
n |= FD_EV_READY_R;
@@ -210,7 +200,13 @@
n |= FD_EV_ERR_RW;
}
- fd_update_events(fd, n);
+ ret = fd_update_events(fd, n);
+
+ if (ret == FD_UPDT_MIGRATED) {
+ /* FD was migrated, let's stop polling it */
+ if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+ fd_updt[fd_nbupdt++] = fd;
+ }
}
}
diff --git a/src/ev_poll.c b/src/ev_poll.c
index 6bd0cf4..bb9d8f8 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -215,6 +215,7 @@
for (count = 0; status > 0 && count < nbfd; count++) {
unsigned int n;
+ int ret;
int e = poll_events[count].revents;
fd = poll_events[count].fd;
@@ -230,27 +231,20 @@
/* ok, we found one active fd */
status--;
- if (!fdtab[fd].owner) {
- activity[tid].poll_dead_fd++;
- continue;
- }
-
- if (!(fdtab[fd].thread_mask & tid_bit)) {
- activity[tid].poll_skip_fd++;
- if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
- fd_updt[fd_nbupdt++] = fd;
- continue;
- }
-
n = ((e & POLLIN) ? FD_EV_READY_R : 0) |
((e & POLLOUT) ? FD_EV_READY_W : 0) |
((e & POLLRDHUP) ? FD_EV_SHUT_R : 0) |
((e & POLLHUP) ? FD_EV_SHUT_RW : 0) |
((e & POLLERR) ? FD_EV_ERR_RW : 0);
- fd_update_events(fd, n);
- }
+ ret = fd_update_events(fd, n);
+ if (ret == FD_UPDT_MIGRATED) {
+ /* FD was migrated, let's stop polling it */
+ if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+ fd_updt[fd_nbupdt++] = fd;
+ }
+ }
}
diff --git a/src/ev_select.c b/src/ev_select.c
index 3e5ee5a..c143bd9 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -209,15 +209,6 @@
#ifdef DEBUG_FD
_HA_ATOMIC_INC(&fdtab[fd].event_count);
#endif
- if (!fdtab[fd].owner) {
- activity[tid].poll_dead_fd++;
- continue;
- }
-
- if (!(fdtab[fd].thread_mask & tid_bit)) {
- activity[tid].poll_skip_fd++;
- continue;
- }
fd_update_events(fd, n);
}
diff --git a/src/fd.c b/src/fd.c
index febe451..df9b5de 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -448,9 +448,10 @@
/* Update events seen for FD <fd> and its state if needed. This should be
* called by the poller, passing FD_EV_*_{R,W,RW} in <evts>. FD_EV_ERR_*
* doesn't need to also pass FD_EV_SHUT_*, it's implied. ERR and SHUT are
- * allowed to be reported regardless of R/W readiness.
+ * allowed to be reported regardless of R/W readiness. Returns one of
+ * FD_UPDT_*.
*/
-void fd_update_events(int fd, uint evts)
+int fd_update_events(int fd, uint evts)
{
unsigned long locked;
uint old, new;
@@ -458,9 +459,17 @@
ti->flags &= ~TI_FL_STUCK; // this thread is still running
+ /* do nothing on the remains of an old dead FD */
+ if (!fdtab[fd].owner) {
+ activity[tid].poll_dead_fd++;
+ return FD_UPDT_DEAD;
+ }
+
/* do nothing if the FD was taken over under us */
- if (fd_set_running(fd) == -1)
- return;
+ if (fd_set_running(fd) == -1) {
+ activity[tid].poll_skip_fd++;
+ return FD_UPDT_MIGRATED;
+ }
locked = (fdtab[fd].thread_mask != tid_bit);
@@ -523,6 +532,7 @@
if ((fdtab[fd].running_mask & tid_bit) &&
fd_clr_running(fd) == 0 && !fdtab[fd].thread_mask) {
_fd_delete_orphan(fd);
+ return FD_UPDT_CLOSED;
}
/* we had to stop this FD and it still must be stopped after the I/O
@@ -534,6 +544,8 @@
if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
fd_updt[fd_nbupdt++] = fd;
}
+
+ return FD_UPDT_DONE;
}
/* Tries to send <npfx> parts from <prefix> followed by <nmsg> parts from <msg>