[MEDIUM] enable inter-stream_interface wakeup calls

By letting the producer tell the consumer there is data to check,
and the consumer tell the producer there is some space left again,
we can cut in half the number of session wakeups.

This is also an important starting point for future splicing support.
diff --git a/include/proto/stream_sock.h b/include/proto/stream_sock.h
index a3fd992..929cb08 100644
--- a/include/proto/stream_sock.h
+++ b/include/proto/stream_sock.h
@@ -3,7 +3,7 @@
   This file contains client-side definitions.
 
   Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu
-  
+
   This library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
   License as published by the Free Software Foundation, version 2.1
@@ -36,6 +36,8 @@
 void stream_sock_data_finish(struct stream_interface *si);
 void stream_sock_shutr(struct stream_interface *si);
 void stream_sock_shutw(struct stream_interface *si);
+void stream_sock_chk_rcv(struct stream_interface *si);
+void stream_sock_chk_snd(struct stream_interface *si);
 
 
 /* This either returns the sockname or the original destination address. Code
diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h
index d34cfa4..bb4a9e3 100644
--- a/include/types/stream_interface.h
+++ b/include/types/stream_interface.h
@@ -78,6 +78,8 @@
 	unsigned int exp;       /* wake up time for connect, queue, turn-around, ... */
 	void (*shutr)(struct stream_interface *);  /* shutr function */
 	void (*shutw)(struct stream_interface *);  /* shutw function */
+	void (*chk_rcv)(struct stream_interface *);/* chk_rcv function */
+	void (*chk_snd)(struct stream_interface *);/* chk_snd function */
 	struct buffer *ib, *ob; /* input and output buffers */
 	unsigned int err_type;  /* first error detected, one of SI_ET_* */
 	void *err_loc;          /* commonly the server, NULL when SI_ET_NONE */
diff --git a/src/client.c b/src/client.c
index 4e8004e..5885f06 100644
--- a/src/client.c
+++ b/src/client.c
@@ -182,6 +182,8 @@
 		s->si[0].owner = t;
 		s->si[0].shutr = stream_sock_shutr;
 		s->si[0].shutw = stream_sock_shutw;
+		s->si[0].chk_rcv = stream_sock_chk_rcv;
+		s->si[0].chk_snd = stream_sock_chk_snd;
 		s->si[0].fd = cfd;
 		s->si[0].flags = SI_FL_NONE;
 		s->si[0].exp = TICK_ETERNITY;
@@ -192,6 +194,8 @@
 		s->si[1].owner = t;
 		s->si[1].shutr = stream_sock_shutr;
 		s->si[1].shutw = stream_sock_shutw;
+		s->si[1].chk_rcv = stream_sock_chk_rcv;
+		s->si[1].chk_snd = stream_sock_chk_snd;
 		s->si[1].exp = TICK_ETERNITY;
 		s->si[1].fd = -1; /* just to help with debugging */
 		s->si[1].flags = SI_FL_NONE;
diff --git a/src/proto_uxst.c b/src/proto_uxst.c
index 3da3049..6c13679 100644
--- a/src/proto_uxst.c
+++ b/src/proto_uxst.c
@@ -452,6 +452,8 @@
 		s->si[0].owner = t;
 		s->si[0].shutr = stream_sock_shutr;
 		s->si[0].shutw = stream_sock_shutw;
+		s->si[0].chk_rcv = stream_sock_chk_rcv;
+		s->si[0].chk_snd = stream_sock_chk_snd;
 		s->si[0].fd = cfd;
 		s->si[0].flags = SI_FL_NONE;
 		s->si[0].exp = TICK_ETERNITY;
@@ -462,6 +464,8 @@
 		s->si[1].owner = t;
 		s->si[1].shutr = stream_sock_shutr;
 		s->si[1].shutw = stream_sock_shutw;
+		s->si[1].chk_rcv = stream_sock_chk_rcv;
+		s->si[1].chk_snd = stream_sock_chk_snd;
 		s->si[1].exp = TICK_ETERNITY;
 		s->si[1].fd = -1; /* just to help with debugging */
 		s->si[1].flags = SI_FL_NONE;
diff --git a/src/stream_sock.c b/src/stream_sock.c
index 72a42c7..fdd0dbd 100644
--- a/src/stream_sock.c
+++ b/src/stream_sock.c
@@ -244,12 +244,16 @@
 	 * have at least read something.
 	 */
 
-	if (tick_isset(b->rex) && b->flags & BF_READ_PARTIAL)
+	if ((b->flags & (BF_READ_PARTIAL|BF_FULL|BF_READ_NOEXP)) == BF_READ_PARTIAL)
 		b->rex = tick_add_ifset(now_ms, b->rto);
 
 	if (!(b->flags & BF_READ_ACTIVITY))
 		goto out_skip_wakeup;
  out_wakeup:
+	/* the consumer might be waiting for data */
+	if (b->cons->flags & SI_FL_WAIT_DATA && (b->flags & BF_READ_PARTIAL))
+		b->cons->chk_snd(b->cons);
+
 	task_wakeup(si->owner, TASK_WOKEN_IO);
 
  out_skip_wakeup:
@@ -433,7 +437,7 @@
 	 * written something.
 	 */
 
-	if (tick_isset(b->wex) && b->flags & BF_WRITE_PARTIAL) {
+	if ((b->flags & (BF_WRITE_PARTIAL|BF_EMPTY|BF_SHUTW)) == BF_WRITE_PARTIAL) {
 		b->wex = tick_add_ifset(now_ms, b->wto);
 		if (tick_isset(b->wex) & tick_isset(si->ib->rex)) {
 			/* FIXME: to prevent the client from expiring read timeouts during writes,
@@ -448,6 +452,10 @@
 	if (!(b->flags & BF_WRITE_ACTIVITY))
 		goto out_skip_wakeup;
  out_wakeup:
+	/* the producer might be waiting for more room to store data */
+	if ((b->prod->flags & SI_FL_WAIT_ROOM) && (b->flags & BF_WRITE_PARTIAL))
+		b->prod->chk_rcv(b->prod);
+
 	task_wakeup(si->owner, TASK_WOKEN_IO);
 
  out_skip_wakeup:
@@ -579,7 +587,8 @@
 	/* Check if we need to close the write side */
 	if (!(ob->flags & BF_SHUTW)) {
 		/* Write not closed, update FD status and timeout for writes */
-		if ((ob->flags & BF_EMPTY) ||
+		if ((ob->send_max == 0) ||
+		    (ob->flags & BF_EMPTY) ||
 		    (ob->flags & (BF_HIJACK|BF_WRITE_ENA)) == 0) {
 			/* stop writing */
 			if ((ob->flags & (BF_EMPTY|BF_HIJACK|BF_WRITE_ENA)) == (BF_EMPTY|BF_WRITE_ENA))
@@ -609,6 +618,75 @@
 	}
 }
 
+/* This function is used for inter-stream-interface calls. It is called by the
+ * consumer to inform the producer side that it may be interested in checking
+ * for free space in the buffer. Note that it intentionally does not update
+ * timeouts, so that we can still check them later at wake-up.
+ */
+void stream_sock_chk_rcv(struct stream_interface *si)
+{
+	struct buffer *ib = si->ib;
+
+	DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n",
+		now_ms, __FUNCTION__,
+		fd, fdtab[fd].owner,
+		ib, ob,
+		ib->rex, ob->wex,
+		ib->flags, ob->flags,
+		ib->l, ob->l, si->state);
+
+	if (unlikely(si->state != SI_ST_EST || (ib->flags & BF_SHUTR)))
+		return;
+
+	if (ib->flags & (BF_FULL|BF_HIJACK)) {
+		/* stop reading */
+		if ((ib->flags & (BF_FULL|BF_HIJACK)) == BF_FULL)
+			si->flags |= SI_FL_WAIT_ROOM;
+		EV_FD_COND_C(si->fd, DIR_RD);
+	}
+	else {
+		/* (re)start reading */
+		si->flags &= ~SI_FL_WAIT_ROOM;
+		EV_FD_COND_S(si->fd, DIR_RD);
+	}
+}
+
+
+/* This function is used for inter-stream-interface calls. It is called by the
+ * producer to inform the consumer side that it may be interested in checking
+ * for data in the buffer. Note that it intentionally does not update timeouts,
+ * so that we can still check them later at wake-up.
+ */
+void stream_sock_chk_snd(struct stream_interface *si)
+{
+	struct buffer *ob = si->ob;
+
+	DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n",
+		now_ms, __FUNCTION__,
+		fd, fdtab[fd].owner,
+		ib, ob,
+		ib->rex, ob->wex,
+		ib->flags, ob->flags,
+		ib->l, ob->l, si->state);
+
+	if (unlikely(si->state != SI_ST_EST || (ob->flags & BF_SHUTW)))
+		return;
+
+	if ((ob->send_max == 0) ||
+	    (ob->flags & BF_EMPTY) ||
+	    (ob->flags & (BF_HIJACK|BF_WRITE_ENA)) == 0) {
+		/* stop writing */
+		if ((ob->flags & (BF_EMPTY|BF_HIJACK|BF_WRITE_ENA)) == (BF_EMPTY|BF_WRITE_ENA))
+			si->flags |= SI_FL_WAIT_DATA;
+		EV_FD_COND_C(si->fd, DIR_WR);
+	}
+	else {
+		/* (re)start writing. */
+		si->flags &= ~SI_FL_WAIT_DATA;
+		EV_FD_COND_S(si->fd, DIR_WR);
+	}
+}
+
 
 /*
  * Local variables: