MAJOR: ev_kqueue: make the poller support speculative events

The poller was updated to support speculative events. We'll need this
to fully support SSL.

As a side effect, the code has become much simpler and much more
efficient, by taking advantage of the nice kqueue API which supports
batched updates. All references to fd_sets have disappeared, and only
the fdtab[].spec_e fields are used to decide about file descriptor
state.
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index e771c33..4ede2ec 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -34,81 +34,77 @@
 #include <proto/task.h>
 
 /* private data */
-static fd_set *fd_evts[2];
 static int kqueue_fd;
 static struct kevent *kev = NULL;
 
-/* speeds up conversion of DIR_RD/DIR_WR to EVFILT* */
-static const int dir2filt[2] = { EVFILT_READ, EVFILT_WRITE };
-
-/* completes a change list for deletion */
-REGPRM3 static int kqev_del(struct kevent *kev, const int fd, const int dir)
-{
-	if (FD_ISSET(fd, fd_evts[dir])) {
-		FD_CLR(fd, fd_evts[dir]);
-		EV_SET(kev, fd, dir2filt[dir], EV_DELETE, 0, 0, NULL);
-		return 1;
-	}
-	return 0;
-}
-
 /*
- * Returns non-zero if direction <dir> is already set for <fd>.
+ * kqueue() poller
  */
-REGPRM2 static int __fd_is_set(const int fd, int dir)
+REGPRM2 static void _do_poll(struct poller *p, int exp)
 {
-	return FD_ISSET(fd, fd_evts[dir]);
-}
+	int status;
+	int count, fd, delta_ms;
+	struct timespec timeout;
+	int updt_idx, en, eo;
+	int changes = 0;
 
-REGPRM2 static void __fd_set(const int fd, int dir)
-{
-	/* if the value was set, do nothing */
-	if (FD_ISSET(fd, fd_evts[dir]))
-		return;
+	/* first, scan the update list to find changes */
+	for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
+		fd = fd_updt[updt_idx];
+		en = fdtab[fd].spec_e & 15;  /* new events */
+		eo = fdtab[fd].spec_e >> 4;  /* previous events */
 
-	FD_SET(fd, fd_evts[dir]);
-	EV_SET(kev, fd, dir2filt[dir], EV_ADD, 0, 0, NULL);
-	kevent(kqueue_fd, kev, 1, NULL, 0, NULL);
-}
+		if (fdtab[fd].owner && (eo ^ en)) {
+			if ((eo ^ en) & FD_EV_POLLED_R) {
+				/* read poll status changed */
+				if (en & FD_EV_POLLED_R) {
+					EV_SET(&kev[changes], fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
+					changes++;
+				}
+				else {
+					EV_SET(&kev[changes], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+					changes++;
+				}
+			}
 
-REGPRM2 static void __fd_clr(const int fd, int dir)
-{
-	if (!kqev_del(kev, fd, dir))
-		return;
-	kevent(kqueue_fd, kev, 1, NULL, 0, NULL);
-}
+			if ((eo ^ en) & FD_EV_POLLED_W) {
+				/* write poll status changed */
+				if (en & FD_EV_POLLED_W) {
+					EV_SET(&kev[changes], fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
+					changes++;
+				}
+				else {
+					EV_SET(&kev[changes], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+					changes++;
+				}
+			}
 
-REGPRM1 static void __fd_rem(int fd)
-{
-	int changes = 0;
+			fdtab[fd].spec_e = (en << 4) + en;  /* save new events */
 
-	changes += kqev_del(&kev[changes], fd, DIR_RD);
-	changes += kqev_del(&kev[changes], fd, DIR_WR);
+			if (!(en & FD_EV_ACTIVE_RW)) {
+				/* This fd doesn't use any active entry anymore, we can
+				 * kill its entry.
+				 */
+				release_spec_entry(fd);
+			}
+			else if ((en & ~eo) & FD_EV_ACTIVE_RW) {
+				/* we need a new spec entry now */
+				alloc_spec_entry(fd);
+			}
 
+		}
+		fdtab[fd].updated = 0;
+		fdtab[fd].new = 0;
+	}
 	if (changes)
 		kevent(kqueue_fd, kev, changes, NULL, 0, NULL);
-}
-
-REGPRM1 static void __fd_clo(int fd)
-{
-	FD_CLR(fd, fd_evts[DIR_RD]);
-	FD_CLR(fd, fd_evts[DIR_WR]);
-}
-
-/*
- * kqueue() poller
- */
-REGPRM2 static void _do_poll(struct poller *p, int exp)
-{
-	int status;
-	int count, fd, delta_ms;
-	struct timespec timeout;
+	fd_nbupdt = 0;
 
 	delta_ms        = 0;
 	timeout.tv_sec  = 0;
 	timeout.tv_nsec = 0;
 
-	if (!run_queue && !signal_queue_len) {
+	if (!fd_nbspec && !run_queue && !signal_queue_len) {
 		if (!exp) {
 			delta_ms        = MAX_DELAY_MS;
 			timeout.tv_sec  = (MAX_DELAY_MS / 1000);
@@ -141,17 +137,29 @@
 			continue;
 
 		fdtab[fd].ev &= FD_POLL_STICKY;
+
 		if (kev[count].filter ==  EVFILT_READ) {
-			if (FD_ISSET(fd, fd_evts[DIR_RD])) {
+			if ((fdtab[fd].spec_e & FD_EV_STATUS_R))
 				fdtab[fd].ev |= FD_POLL_IN;
-			}
-		} else if (kev[count].filter ==  EVFILT_WRITE) {
-			if (FD_ISSET(fd, fd_evts[DIR_WR])) {
+		}
+		else if (kev[count].filter ==  EVFILT_WRITE) {
+			if ((fdtab[fd].spec_e & FD_EV_STATUS_W))
 				fdtab[fd].ev |= FD_POLL_OUT;
-			}
 		}
-		if (fdtab[fd].iocb && fdtab[fd].ev)
+
+		if (fdtab[fd].iocb && fdtab[fd].ev) {
+			/* Mark the events as speculative before processing
+			 * them so that if nothing can be done we don't need
+			 * to poll again.
+			 */
+			if (fdtab[fd].ev & (FD_POLL_IN|FD_POLL_HUP|FD_POLL_ERR))
+				fd_ev_set(fd, DIR_RD);
+
+			if (fdtab[fd].ev & (FD_POLL_OUT|FD_POLL_ERR))
+				fd_ev_set(fd, DIR_WR);
+
 			fdtab[fd].iocb(fd);
+		}
 	}
 }
 
@@ -162,33 +170,19 @@
  */
 REGPRM1 static int _do_init(struct poller *p)
 {
-	__label__ fail_wevt, fail_revt, fail_fd;
-	int fd_set_bytes;
-
 	p->private = NULL;
-	fd_set_bytes = sizeof(fd_set) * (global.maxsock + FD_SETSIZE - 1) / FD_SETSIZE;
 
 	kqueue_fd = kqueue();
 	if (kqueue_fd < 0)
 		goto fail_fd;
 
-	kev = (struct kevent*)calloc(1, sizeof(struct kevent) * global.tune.maxpollevents);
-
+	/* we can have up to two events per fd (one read, one write) */
+	kev = (struct kevent*)calloc(1, sizeof(struct kevent) * 2 * global.maxsock);
 	if (kev == NULL)
 		goto fail_kev;
 		
-	if ((fd_evts[DIR_RD] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
-		goto fail_revt;
-
-	if ((fd_evts[DIR_WR] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
-		goto fail_wevt;
-
 	return 1;
 
- fail_wevt:
-	free(fd_evts[DIR_RD]);
- fail_revt:
-	free(kev);
  fail_kev:
 	close(kqueue_fd);
 	kqueue_fd = -1;
@@ -203,8 +197,6 @@
  */
 REGPRM1 static void _do_term(struct poller *p)
 {
-	free(fd_evts[DIR_WR]);
-	free(fd_evts[DIR_RD]);
 	free(kev);
 
 	if (kqueue_fd >= 0) {
@@ -272,12 +264,12 @@
 	p->poll = _do_poll;
 	p->fork = _do_fork;
 
-	p->is_set  = __fd_is_set;
-	p->set = __fd_set;
-	p->wai = __fd_set;
-	p->clr = __fd_clr;
-	p->rem = __fd_rem;
-	p->clo = __fd_clo;
+	p->is_set = NULL;
+	p->set = NULL;
+	p->wai = NULL;
+	p->clr = NULL;
+	p->rem = NULL;
+	p->clo = NULL;
 }