[MEDIUM] enable inter-stream_interface wakeup calls

By letting the producer tell the consumer there is data to check,
and the consumer tell the producer there is some space left again,
we can cut in half the number of session wakeups.

This is also an important starting point for future splicing support.
diff --git a/src/client.c b/src/client.c
index 4e8004e..5885f06 100644
--- a/src/client.c
+++ b/src/client.c
@@ -182,6 +182,8 @@
 		s->si[0].owner = t;
 		s->si[0].shutr = stream_sock_shutr;
 		s->si[0].shutw = stream_sock_shutw;
+		s->si[0].chk_rcv = stream_sock_chk_rcv;
+		s->si[0].chk_snd = stream_sock_chk_snd;
 		s->si[0].fd = cfd;
 		s->si[0].flags = SI_FL_NONE;
 		s->si[0].exp = TICK_ETERNITY;
@@ -192,6 +194,8 @@
 		s->si[1].owner = t;
 		s->si[1].shutr = stream_sock_shutr;
 		s->si[1].shutw = stream_sock_shutw;
+		s->si[1].chk_rcv = stream_sock_chk_rcv;
+		s->si[1].chk_snd = stream_sock_chk_snd;
 		s->si[1].exp = TICK_ETERNITY;
 		s->si[1].fd = -1; /* just to help with debugging */
 		s->si[1].flags = SI_FL_NONE;
diff --git a/src/proto_uxst.c b/src/proto_uxst.c
index 3da3049..6c13679 100644
--- a/src/proto_uxst.c
+++ b/src/proto_uxst.c
@@ -452,6 +452,8 @@
 		s->si[0].owner = t;
 		s->si[0].shutr = stream_sock_shutr;
 		s->si[0].shutw = stream_sock_shutw;
+		s->si[0].chk_rcv = stream_sock_chk_rcv;
+		s->si[0].chk_snd = stream_sock_chk_snd;
 		s->si[0].fd = cfd;
 		s->si[0].flags = SI_FL_NONE;
 		s->si[0].exp = TICK_ETERNITY;
@@ -462,6 +464,8 @@
 		s->si[1].owner = t;
 		s->si[1].shutr = stream_sock_shutr;
 		s->si[1].shutw = stream_sock_shutw;
+		s->si[1].chk_rcv = stream_sock_chk_rcv;
+		s->si[1].chk_snd = stream_sock_chk_snd;
 		s->si[1].exp = TICK_ETERNITY;
 		s->si[1].fd = -1; /* just to help with debugging */
 		s->si[1].flags = SI_FL_NONE;
diff --git a/src/stream_sock.c b/src/stream_sock.c
index 72a42c7..fdd0dbd 100644
--- a/src/stream_sock.c
+++ b/src/stream_sock.c
@@ -244,12 +244,16 @@
 	 * have at least read something.
 	 */
 
-	if (tick_isset(b->rex) && b->flags & BF_READ_PARTIAL)
+	if ((b->flags & (BF_READ_PARTIAL|BF_FULL|BF_READ_NOEXP)) == BF_READ_PARTIAL)
 		b->rex = tick_add_ifset(now_ms, b->rto);
 
 	if (!(b->flags & BF_READ_ACTIVITY))
 		goto out_skip_wakeup;
  out_wakeup:
+	/* the consumer might be waiting for data */
+	if (b->cons->flags & SI_FL_WAIT_DATA && (b->flags & BF_READ_PARTIAL))
+		b->cons->chk_snd(b->cons);
+
 	task_wakeup(si->owner, TASK_WOKEN_IO);
 
  out_skip_wakeup:
@@ -433,7 +437,7 @@
 	 * written something.
 	 */
 
-	if (tick_isset(b->wex) && b->flags & BF_WRITE_PARTIAL) {
+	if ((b->flags & (BF_WRITE_PARTIAL|BF_EMPTY|BF_SHUTW)) == BF_WRITE_PARTIAL) {
 		b->wex = tick_add_ifset(now_ms, b->wto);
 		if (tick_isset(b->wex) & tick_isset(si->ib->rex)) {
 			/* FIXME: to prevent the client from expiring read timeouts during writes,
@@ -448,6 +452,10 @@
 	if (!(b->flags & BF_WRITE_ACTIVITY))
 		goto out_skip_wakeup;
  out_wakeup:
+	/* the producer might be waiting for more room to store data */
+	if ((b->prod->flags & SI_FL_WAIT_ROOM) && (b->flags & BF_WRITE_PARTIAL))
+		b->prod->chk_rcv(b->prod);
+
 	task_wakeup(si->owner, TASK_WOKEN_IO);
 
  out_skip_wakeup:
@@ -579,7 +587,8 @@
 	/* Check if we need to close the write side */
 	if (!(ob->flags & BF_SHUTW)) {
 		/* Write not closed, update FD status and timeout for writes */
-		if ((ob->flags & BF_EMPTY) ||
+		if ((ob->send_max == 0) ||
+		    (ob->flags & BF_EMPTY) ||
 		    (ob->flags & (BF_HIJACK|BF_WRITE_ENA)) == 0) {
 			/* stop writing */
 			if ((ob->flags & (BF_EMPTY|BF_HIJACK|BF_WRITE_ENA)) == (BF_EMPTY|BF_WRITE_ENA))
@@ -609,6 +618,75 @@
 	}
 }
 
+/* This function is used for inter-stream-interface calls. It is called by the
+ * consumer to inform the producer side <si> that it may be interested in
+ * checking for free space in its input buffer. Note that it intentionally does
+ * not update timeouts, so that we can still check them later at wake-up.
+ */
+void stream_sock_chk_rcv(struct stream_interface *si)
+{
+	struct buffer *ib = si->ib;
+
+	DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n",
+		now_ms, __FUNCTION__,
+		si->fd, fdtab[si->fd].owner,
+		ib, si->ob,
+		ib->rex, si->ob->wex,
+		ib->flags, si->ob->flags,
+		ib->l, si->ob->l, si->state);
+
+	if (unlikely(si->state != SI_ST_EST || (ib->flags & BF_SHUTR)))
+		return;
+
+	if (ib->flags & (BF_FULL|BF_HIJACK)) {
+		/* stop reading; only a full, non-hijacked buffer waits for room */
+		if ((ib->flags & (BF_FULL|BF_HIJACK)) == BF_FULL)
+			si->flags |= SI_FL_WAIT_ROOM;
+		EV_FD_COND_C(si->fd, DIR_RD);
+	}
+	else {
+		/* (re)start reading */
+		si->flags &= ~SI_FL_WAIT_ROOM;
+		EV_FD_COND_S(si->fd, DIR_RD);
+	}
+}
+
+
+/* This function is used for inter-stream-interface calls. It is called by the
+ * producer to inform the consumer side <si> that it may be interested in
+ * checking for data in its output buffer. Note that it intentionally does not
+ * update timeouts, so that we can still check them later at wake-up.
+ */
+void stream_sock_chk_snd(struct stream_interface *si)
+{
+	struct buffer *ob = si->ob;
+
+	DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n",
+		now_ms, __FUNCTION__,
+		si->fd, fdtab[si->fd].owner,
+		si->ib, ob,
+		si->ib->rex, ob->wex,
+		si->ib->flags, ob->flags,
+		si->ib->l, ob->l, si->state);
+
+	if (unlikely(si->state != SI_ST_EST || (ob->flags & BF_SHUTW)))
+		return;
+
+	if ((ob->send_max == 0) ||
+	    (ob->flags & BF_EMPTY) ||
+	    (ob->flags & (BF_HIJACK|BF_WRITE_ENA)) == 0) {
+		/* stop writing; only an empty, write-enabled buffer waits for data */
+		if ((ob->flags & (BF_EMPTY|BF_HIJACK|BF_WRITE_ENA)) == (BF_EMPTY|BF_WRITE_ENA))
+			si->flags |= SI_FL_WAIT_DATA;
+		EV_FD_COND_C(si->fd, DIR_WR);
+	}
+	else {
+		/* (re)start writing. */
+		si->flags &= ~SI_FL_WAIT_DATA;
+		EV_FD_COND_S(si->fd, DIR_WR);
+	}
+}
+
 
 /*
  * Local variables: