MEDIUM: polling: centralize polled events processing
Currently, each poll loop handles the polled events the same way,
resulting in a lot of duplicated, complex code. Additionally, epoll
was the only poller to handle newly created FDs immediately.
So instead, let's move that code to fd.c in a new function dedicated
to this task: fd_process_polled_events(). All pollers now use this
function.
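
As an illustration, each poller's inner loop now reduces to roughly the
shape below. This is a simplified sketch in the style of ev_poll.c, not
the literal code: poll_events/status stand for the poller's local state,
and the event-translation step naturally differs for each poller.

	for (count = 0; count < status; count++) {
		int e  = poll_events[count].revents;
		int fd = poll_events[count].fd;

		/* keep only sticky events, then translate the native
		 * poll() flags into generic FD_POLL_* events
		 */
		fdtab[fd].ev &= FD_POLL_STICKY;
		fdtab[fd].ev |= ((e & POLLIN)  ? FD_POLL_IN  : 0) |
		                ((e & POLLOUT) ? FD_POLL_OUT : 0) |
		                ((e & POLLERR) ? FD_POLL_ERR : 0) |
		                ((e & POLLHUP) ? FD_POLL_HUP : 0);

		/* everything that used to be duplicated per poller is
		 * now centralized here: readiness marking, the iocb()
		 * call, and immediate processing of FDs created by the
		 * iocb() (eg: just-accepted connections)
		 */
		fd_process_polled_events(fd);
	}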
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index ae415fc..b90d9c1 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -174,63 +174,8 @@
if (e & EPOLLRDHUP)
n |= FD_POLL_HUP;
- if (!n)
- continue;
-
fdtab[fd].ev |= n;
-
- if (fdtab[fd].iocb) {
- int new_updt, old_updt;
-
- /* Mark the events as ready before processing */
- if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
- fd_may_recv(fd);
-
- if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
- fd_may_send(fd);
-
- if (fdtab[fd].cache)
- continue;
-
- /* Save number of updates to detect creation of new FDs. */
- old_updt = fd_nbupdt;
- fdtab[fd].iocb(fd);
-
- /* One or more fd might have been created during the iocb().
- * This mainly happens with new incoming connections that have
- * just been accepted, so we'd like to process them immediately
- * for better efficiency. Second benefit, if at the end the fds
- * are disabled again, we can safely destroy their update entry
- * to reduce the scope of later scans. This is the reason we
- * scan the new entries backwards.
- */
-
- for (new_updt = fd_nbupdt; new_updt > old_updt; new_updt--) {
- fd = fd_updt[new_updt - 1];
- if (!fdtab[fd].new)
- continue;
-
- fdtab[fd].new = 0;
- fdtab[fd].ev &= FD_POLL_STICKY;
-
- if ((fdtab[fd].state & FD_EV_STATUS_R) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
- fdtab[fd].ev |= FD_POLL_IN;
-
- if ((fdtab[fd].state & FD_EV_STATUS_W) == (FD_EV_READY_W | FD_EV_ACTIVE_W))
- fdtab[fd].ev |= FD_POLL_OUT;
-
- if (fdtab[fd].ev && fdtab[fd].iocb && fdtab[fd].owner)
- fdtab[fd].iocb(fd);
-
- /* we can remove this update entry if it's the last one and is
- * unused, otherwise we don't touch anything.
- */
- if (new_updt == fd_nbupdt && !fd_recv_active(fd) && !fd_send_active(fd)) {
- fdtab[fd].updated = 0;
- fd_nbupdt--;
- }
- }
- }
+ fd_process_polled_events(fd);
}
/* the caller will take care of cached events */
}
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index dab6f5b..0473adc 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -152,18 +152,7 @@
fdtab[fd].ev |= FD_POLL_OUT;
}
- if (fdtab[fd].iocb && fdtab[fd].ev) {
- if (fdtab[fd].ev & FD_POLL_IN)
- fd_may_recv(fd);
-
- if (fdtab[fd].ev & FD_POLL_OUT)
- fd_may_send(fd);
-
- if (fdtab[fd].cache)
- continue;
-
- fdtab[fd].iocb(fd);
- }
+ fd_process_polled_events(fd);
}
}
diff --git a/src/ev_poll.c b/src/ev_poll.c
index da927dc..84ba486 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -177,18 +177,7 @@
((e & POLLHUP) ? FD_POLL_HUP : 0);
}
- if (fdtab[fd].iocb && fdtab[fd].ev) {
- if (fdtab[fd].ev & FD_POLL_IN)
- fd_may_recv(fd);
-
- if (fdtab[fd].ev & FD_POLL_OUT)
- fd_may_send(fd);
-
- if (fdtab[fd].cache)
- continue;
-
- fdtab[fd].iocb(fd);
- }
+ fd_process_polled_events(fd);
}
}
diff --git a/src/ev_select.c b/src/ev_select.c
index a878340..87ca348 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -161,18 +161,7 @@
if (FD_ISSET(fd, tmp_evts[DIR_WR]))
fdtab[fd].ev |= FD_POLL_OUT;
- if (fdtab[fd].iocb && fdtab[fd].ev) {
- if (fdtab[fd].ev & FD_POLL_IN)
- fd_may_recv(fd);
-
- if (fdtab[fd].ev & FD_POLL_OUT)
- fd_may_send(fd);
-
- if (fdtab[fd].cache)
- continue;
-
- fdtab[fd].iocb(fd);
- }
+ fd_process_polled_events(fd);
}
}
}
diff --git a/src/fd.c b/src/fd.c
index 9e37068..66f1e8b 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -237,6 +237,75 @@
}
}
+/* Check the events attached to a file descriptor, update its cache
+ * accordingly, and call the associated I/O callback. If new updates are
+ * detected, the function tries to process them as well in order to save
+ * wakeups after accept().
+ */
+void fd_process_polled_events(int fd)
+{
+ int new_updt, old_updt;
+
+ /* First thing to do is to mark the reported events as ready, in order
+ * for them to later be continued from the cache without polling if
+ * they have to be interrupted (eg: recv fills a buffer).
+ */
+ if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
+ fd_may_recv(fd);
+
+ if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR))
+ fd_may_send(fd);
+
+ if (fdtab[fd].cache) {
+ /* This fd is already cached, no need to process it now. */
+ return;
+ }
+
+ if (unlikely(!fdtab[fd].iocb || !fdtab[fd].ev)) {
+ /* nothing to do */
+ return;
+ }
+
+ /* Save number of updates to detect creation of new FDs. */
+ old_updt = fd_nbupdt;
+ fdtab[fd].iocb(fd);
+
+ /* One or more fd might have been created during the iocb().
+ * This mainly happens with new incoming connections that have
+ * just been accepted, so we'd like to process them immediately
+ * for better efficiency, as it saves one useless task wakeup.
+ * Second benefit, if at the end the fds are disabled again, we can
+ * safely destroy their update entry to reduce the scope of later
+ * scans. This is the reason we scan the new entries backwards.
+ */
+ for (new_updt = fd_nbupdt; new_updt > old_updt; new_updt--) {
+ fd = fd_updt[new_updt - 1];
+ if (!fdtab[fd].new)
+ continue;
+
+ fdtab[fd].new = 0;
+ fdtab[fd].ev &= FD_POLL_STICKY;
+
+ if ((fdtab[fd].state & FD_EV_STATUS_R) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
+ fdtab[fd].ev |= FD_POLL_IN;
+
+ if ((fdtab[fd].state & FD_EV_STATUS_W) == (FD_EV_READY_W | FD_EV_ACTIVE_W))
+ fdtab[fd].ev |= FD_POLL_OUT;
+
+ if (fdtab[fd].ev && fdtab[fd].iocb && fdtab[fd].owner)
+ fdtab[fd].iocb(fd);
+
+ /* we can remove this update entry if it's the last one and is
+ * unused, otherwise we don't touch anything, especially given
+ * that the FD might have been closed already.
+ */
+ if (new_updt == fd_nbupdt && !fd_recv_active(fd) && !fd_send_active(fd)) {
+ fdtab[fd].updated = 0;
+ fd_nbupdt--;
+ }
+ }
+}
+
/* disable the specified poller */
void disable_poller(const char *poller_name)
{