[BUG] fix ev_sepoll again, this time with a new state machine
It was possible for ev_sepoll() to ignore certain events when
all speculative events had been processed at once, because the
epoll_wait() timeout was not cleared, which delayed the delivery
of those events.
The state machine had become too complicated, so it has been
rewritten. The new version appears both faster and more correct.
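For context, below is a minimal standalone sketch of the per-FD,
per-direction state machine the rewrite is based on. The state names
match the constants used in the patch, but the bit values and the demo
itself are illustrative assumptions, not code copied from ev_sepoll.c:

	#include <assert.h>

	/* assumed 2-bit encoding of one direction's state (illustration only) */
	#define FD_EV_IDLE_R  0x00  /* nothing to do in this direction */
	#define FD_EV_SPEC_R  0x01  /* try the I/O speculatively before polling */
	#define FD_EV_WAIT_R  0x02  /* speculation failed, wait in epoll() */
	#define FD_EV_STOP_R  0x03  /* direction being disabled */
	#define FD_EV_MASK_R  0x03

	int main(void)
	{
		unsigned int e = FD_EV_SPEC_R;

		/* A callback returned 0: nothing doable without polling.
		 * Since SPEC and WAIT occupy the same bit field, XORing
		 * with (WAIT ^ SPEC) turns SPEC into WAIT in a single
		 * operation, as the patch does below.
		 */
		e ^= (FD_EV_WAIT_R ^ FD_EV_SPEC_R);
		assert((e & FD_EV_MASK_R) == FD_EV_WAIT_R);
		return 0;
	}

With four states per direction handled independently, the old 16-entry
ev_to_epoll[] transition table becomes unnecessary: each direction is
updated on its own and the epoll set is reconciled afterwards.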
diff --git a/src/ev_sepoll.c b/src/ev_sepoll.c
index c3b7776..de92c5d 100644
--- a/src/ev_sepoll.c
+++ b/src/ev_sepoll.c
@@ -105,6 +105,10 @@
#define FD_EV_MASK (FD_EV_MASK_W | FD_EV_MASK_R)
+/* This is the minimum number of events successfully processed in speculative
+ * mode above which we agree to return without checking epoll() (one time out of two).
+ */
+#define MIN_RETURN_EVENTS 25
/* descriptor of one FD.
* FIXME: should be a bit field */
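To make the use of this threshold concrete, here is a small
self-contained sketch of the "skip epoll_wait() one time out of two"
heuristic implemented further down in _do_poll(). Only the
MIN_RETURN_EVENTS/last_skipped logic comes from the patch; the
may_skip_epoll_wait() wrapper and its caller are hypothetical:

	#include <stdio.h>

	#define MIN_RETURN_EVENTS 25

	/* return 1 when the poller may return right after speculative
	 * processing, skipping epoll_wait(); "status" is the number of
	 * speculative callbacks which made progress on this pass.
	 */
	static int may_skip_epoll_wait(int status)
	{
		static unsigned int last_skipped;

		if (status >= MIN_RETURN_EVENTS && ++last_skipped <= 1)
			return 1;	/* first busy pass: skip the syscall */

		last_skipped = 0;	/* second busy pass, or quiet pass: poll */
		return 0;
	}

	int main(void)
	{
		/* two busy passes in a row: only the first one skips */
		printf("%d %d %d\n",
		       may_skip_epoll_wait(30),	/* 1 */
		       may_skip_epoll_wait(30),	/* 0 */
		       may_skip_epoll_wait(3));	/* 0 */
		return 0;
	}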
@@ -215,6 +219,7 @@
return 0;
}
+/* normally unused */
REGPRM1 static void __fd_rem(int fd)
{
__fd_clr(fd, DIR_RD);
@@ -233,147 +238,150 @@
}
/*
- * operations to perform when reaching _do_poll() for some FDs in the spec
- * queue depending on their state. This is mainly used to cleanup some FDs
- * which are in STOP state. It is also used to compute EPOLL* flags when
- * switching from SPEC to WAIT.
- */
-static struct ev_to_epoll {
- char op; // epoll opcode to switch from spec to wait, 0 if none
- char m; // inverted mask for existing events
- char ev; // remaining epoll events after change
- char pad;
-} ev_to_epoll[16] = {
- [FD_EV_IDLE_W | FD_EV_STOP_R] = { .op=EPOLL_CTL_DEL, .m=FD_EV_MASK_R },
- [FD_EV_SPEC_W | FD_EV_STOP_R] = { .op=EPOLL_CTL_DEL, .m=FD_EV_MASK_R },
- [FD_EV_STOP_W | FD_EV_IDLE_R] = { .op=EPOLL_CTL_DEL, .m=FD_EV_MASK_W },
- [FD_EV_STOP_W | FD_EV_SPEC_R] = { .op=EPOLL_CTL_DEL, .m=FD_EV_MASK_W },
- [FD_EV_WAIT_W | FD_EV_STOP_R] = { .op=EPOLL_CTL_MOD, .m=FD_EV_MASK_R, .ev=EPOLLOUT },
- [FD_EV_STOP_W | FD_EV_WAIT_R] = { .op=EPOLL_CTL_MOD, .m=FD_EV_MASK_W, .ev=EPOLLIN },
- [FD_EV_STOP_W | FD_EV_STOP_R] = { .op=EPOLL_CTL_DEL, .m=FD_EV_MASK_R|FD_EV_MASK_W },
- [FD_EV_SPEC_W | FD_EV_WAIT_R] = { .ev=EPOLLIN },
- [FD_EV_WAIT_W | FD_EV_SPEC_R] = { .ev=EPOLLOUT },
- [FD_EV_WAIT_W | FD_EV_WAIT_R] = { .ev=EPOLLIN|EPOLLOUT },
-};
-
-/*
* speculative epoll() poller
*/
REGPRM2 static void _do_poll(struct poller *p, int wait_time)
{
static unsigned int last_skipped;
- int status;
+ int status, eo;
int fd, opcode;
int count;
int spec_idx;
/* Here we have two options :
- * - either walk the list forwards and hope to atch more events
+ * - either walk the list forwards and hope to match more events
* - or walk it backwards to minimize the number of changes and
* to make better use of the cache.
* Tests have shown that walking backwards improves perf by 0.2%.
*/
+ status = 0;
spec_idx = nbspec;
while (likely(spec_idx > 0)) {
spec_idx--;
fd = spec_list[spec_idx];
-
- opcode = ev_to_epoll[fd_list[fd].e].op;
- if (opcode) {
- ev.data.fd = fd;
- ev.events = ev_to_epoll[fd_list[fd].e].ev;
- fd_list[fd].e &= ~(unsigned int)ev_to_epoll[fd_list[fd].e].m;
- epoll_ctl(epoll_fd, opcode, fd, &ev);
- }
+ eo = fd_list[fd].e; /* save old events */
- if (!(fd_list[fd].e & FD_EV_RW_SL)) {
- // This one must be removed. Let's clear it now.
- delete_spec_entry(spec_idx);
- continue;
- }
-
- /* OK so now we do not have any event marked STOP anymore in
- * the list. We can simply try to execute functions for the
- * events we have found, and requeue them in case of EAGAIN.
+ /*
+ * Process the speculative events.
+ *
+ * Principle: events which are marked FD_EV_SPEC are processed
+ * with their assigned function. If the function returns 0, it
+ * means nothing can be done without polling first, so the
+ * event is converted to a pollable one by assigning it the
+ * WAIT status.
*/
- status = 0;
fdtab[fd].ev = 0;
-
- if ((fd_list[fd].e & FD_EV_MASK_R) == FD_EV_SPEC_R) {
+ if ((eo & FD_EV_MASK_R) == FD_EV_SPEC_R) {
+ /* The owner is interested in reading from this FD */
if (fdtab[fd].state != FD_STCLOSE && fdtab[fd].state != FD_STERROR) {
+ /* Pretend there is something to read */
fdtab[fd].ev |= FD_POLL_IN;
- if (fdtab[fd].cb[DIR_RD].f(fd) == 0)
- status |= EPOLLIN;
+ if (!fdtab[fd].cb[DIR_RD].f(fd))
+ fd_list[fd].e ^= (FD_EV_WAIT_R ^ FD_EV_SPEC_R);
+ else
+ status++;
}
}
+ else if ((eo & FD_EV_MASK_R) == FD_EV_STOP_R) {
+ /* This FD was being polled and is now being removed. */
+ fd_list[fd].e &= ~FD_EV_MASK_R;
+ }
- if ((fd_list[fd].e & FD_EV_MASK_W) == FD_EV_SPEC_W) {
+ if ((eo & FD_EV_MASK_W) == FD_EV_SPEC_W) {
+ /* The owner is interested in writing to this FD */
if (fdtab[fd].state != FD_STCLOSE && fdtab[fd].state != FD_STERROR) {
+ /* Pretend there is something to write */
fdtab[fd].ev |= FD_POLL_OUT;
- if (fdtab[fd].cb[DIR_WR].f(fd) == 0)
- status |= EPOLLOUT;
+ if (!fdtab[fd].cb[DIR_WR].f(fd))
+ fd_list[fd].e ^= (FD_EV_WAIT_W ^ FD_EV_SPEC_W);
+ else
+ status++;
}
}
+ else if ((eo & FD_EV_MASK_W) == FD_EV_STOP_W) {
+ /* This FD was being polled and is now being removed. */
+ fd_list[fd].e &= ~FD_EV_MASK_W;
+ }
- if (status) {
- /* Some speculative accesses have failed, we must
- * switch to the WAIT state.
- */
- ev.events = status;
- ev.data.fd = fd;
- if (fd_list[fd].e & FD_EV_RW_PL) {
- // Event already in poll list
- ev.events |= ev_to_epoll[fd_list[fd].e].ev;
- opcode = EPOLL_CTL_MOD;
- } else {
- // Event not in poll list yet
+ /* Now we will adjust the event in the poll list. An event which
+ * was previously in the poll list may have to leave it, and the
+ * opposite is possible too. The READ and WRITE directions may
+ * even change in opposite ways at the same time.
+ */
+
+ if ((eo ^ fd_list[fd].e) & FD_EV_RW_PL) {
+ /* poll status changed */
+ if ((fd_list[fd].e & FD_EV_RW_PL) == 0) {
+ /* fd removed from poll list */
+ opcode = EPOLL_CTL_DEL;
+ }
+ else if ((eo & FD_EV_RW_PL) == 0) {
+ /* new fd in the poll list */
opcode = EPOLL_CTL_ADD;
}
+ else {
+ /* fd stays in the poll list but its events changed */
+ opcode = EPOLL_CTL_MOD;
+ }
+
+ /* construct the epoll events based on new state */
+ ev.events = 0;
+ if (fd_list[fd].e & FD_EV_WAIT_R)
+ ev.events |= EPOLLIN;
+
+ if (fd_list[fd].e & FD_EV_WAIT_W)
+ ev.events |= EPOLLOUT;
+
+ ev.data.fd = fd;
epoll_ctl(epoll_fd, opcode, fd, &ev);
+ }
- if (status & EPOLLIN) {
- fd_list[fd].e &= ~FD_EV_MASK_R;
- fd_list[fd].e |= FD_EV_WAIT_R;
- }
- if (status & EPOLLOUT) {
- fd_list[fd].e &= ~FD_EV_MASK_W;
- fd_list[fd].e |= FD_EV_WAIT_W;
- }
- if ((fd_list[fd].e & FD_EV_MASK_R) != FD_EV_SPEC_R &&
- (fd_list[fd].e & FD_EV_MASK_W) != FD_EV_SPEC_W) {
- delete_spec_entry(spec_idx);
- continue;
- }
+ if (!(fd_list[fd].e & FD_EV_RW_SL)) {
+ /* Both directions of this fd are now either WAIT or
+ * IDLE, so it must be removed from the spec list.
+ */
+ delete_spec_entry(spec_idx);
+ continue;
}
}
- /* If some speculative events remain, we must not set the timeout in
- * epoll_wait(). Also, if some speculative events remain, it means
- * that some have been immediately processed, otherwise they would
- * have been disabled.
+ /* It may make sense to return immediately here, without passing
+ * through epoll_wait(), when enough events have been processed,
+ * since we have effectively just performed the equivalent of a poll.
+ * Measurements have shown a great performance increase if we call
+ * epoll_wait() only the second time after speculative accesses have
+ * succeeded. This reduces the number of unsuccessful calls to
+ * epoll_wait() by a factor of about 3, and the total number of calls
+ * by about 2.
*/
- if (nbspec) {
- if (!last_skipped++) {
- /* Measures have shown a great performance increase if
- * we call the epoll_wait() only the second time after
- * speculative accesses have succeeded. This reduces
- * the number of unsucessful calls to epoll_wait() by
- * a factor of about 3, and the total number of calls
- * by about 2.
- */
+
+ if (status >= MIN_RETURN_EVENTS) {
+ /* We have processed at least MIN_RETURN_EVENTS, so it is
+ * worth returning now without checking epoll_wait().
+ */
+ if (++last_skipped <= 1) {
tv_now(&now);
return;
}
- wait_time = 0;
}
last_skipped = 0;
- /* now let's wait for events */
+ if (nbspec || status) {
+ /* Maybe we have processed some events that we must report, or
+ * maybe we still have events in the spec list, so we must not
+ * wait in epoll(), otherwise we would delay their delivery
+ * until the next timeout expires.
+ */
+ wait_time = 0;
+ }
+
+ /* now let's wait for real events */
status = epoll_wait(epoll_fd, epoll_events, maxfd, wait_time);
+
tv_now(&now);
for (count = 0; count < status; count++) {
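The epoll set reconciliation in the hunk above can be read in
isolation: comparing the saved old state "eo" with the new state
decides whether the fd must be added to, modified in, or deleted from
the epoll set. Here is a hedged standalone sketch of that decision;
the bit values and the epoll_opcode() helper are assumptions made for
illustration, not the patch's own definitions:

	#include <stdio.h>
	#include <sys/epoll.h>

	/* assumed layout: one "polled" bit per direction (illustration only) */
	#define EV_WAIT_R  0x01
	#define EV_WAIT_W  0x02
	#define EV_RW_PL   (EV_WAIT_R | EV_WAIT_W)	/* "in the poll list" bits */

	/* return the epoll_ctl() opcode needed to go from old state "eo"
	 * to new state "en", or 0 when the epoll set needs no change.
	 */
	static int epoll_opcode(unsigned int eo, unsigned int en)
	{
		if (!((eo ^ en) & EV_RW_PL))
			return 0;		/* poll membership unchanged */
		if (!(en & EV_RW_PL))
			return EPOLL_CTL_DEL;	/* fd leaves the poll list */
		if (!(eo & EV_RW_PL))
			return EPOLL_CTL_ADD;	/* fd enters the poll list */
		return EPOLL_CTL_MOD;		/* fd stays, directions changed */
	}

	int main(void)
	{
		printf("%d %d %d %d\n",
		       epoll_opcode(0, EV_WAIT_R),		/* ADD */
		       epoll_opcode(EV_WAIT_R, EV_RW_PL),	/* MOD */
		       epoll_opcode(EV_WAIT_W, 0),		/* DEL */
		       epoll_opcode(EV_WAIT_R, EV_WAIT_R));	/* no change */
		return 0;
	}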