MAJOR: channel: add a new flag CF_WAKE_WRITE to notify the task of writes

Since commit 6b66f3e ([MAJOR] implement autonomous inter-socket forwarding)
introduced in 1.3.16-rc1, we've been relying on a stupid mechanism to wake
up the task after a write, which was an exact copy-paste of the reader side.

The principle was that if we empty a buffer and there's no forwarding
scheduled or if the *producer* is not in a connected state, then we wake
the task up.

That does not make any sense. It happens to wake up too late sometimes (eg,
when the request analyser waits for some room in the buffer to start to
work), and leads to unneeded wakeups in client-side keep-alive, because
the task is woken up when the response is sent, while the analysers are
simply waiting for a new request.

In order to fix this, we introduce a new channel flag : CF_WAKE_WRITE. It
is designed so that an analyser can explicitly request being notified when
some data were written. It is used only when the HTTP request or response
analysers need to wait for more room in the buffers. It is automatically
cleared upon wake up.

The flag is also automatically set by the functions which try to write into
a buffer from an applet when they fail (bi_putblk() etc...).

That allows us to remove the stupid condition above and avoid some wakeups.
In http-server-close and in http-keep-alive modes, this reduces from 4 to 3
the average number of wakeups per request, and increases the overall
performance by about 1.5%.
diff --git a/src/channel.c b/src/channel.c
index 2942d92..1250370 100644
--- a/src/channel.c
+++ b/src/channel.c
@@ -113,14 +113,18 @@
  * input is closed, -2 is returned. If there is not enough room left in the
  * buffer, -1 is returned. Otherwise the number of bytes copied is returned
  * (1). Channel flag READ_PARTIAL is updated if some data can be transferred.
+ * Channel flag CF_WAKE_WRITE is set if the write fails because the buffer is
+ * full.
  */
 int bi_putchr(struct channel *chn, char c)
 {
 	if (unlikely(channel_input_closed(chn)))
 		return -2;
 
-	if (channel_full(chn))
+	if (channel_full(chn)) {
+		chn->flags |= CF_WAKE_WRITE;
 		return -1;
+	}
 
 	*bi_end(chn->buf) = c;
 
@@ -143,7 +147,8 @@
  * -3 is returned. If there is not enough room left in the buffer, -1 is
  * returned. Otherwise the number of bytes copied is returned (0 being a valid
  * number). Channel flag READ_PARTIAL is updated if some data can be
- * transferred.
+ * transferred. Channel flag CF_WAKE_WRITE is set if the write fails because
+ * the buffer is full.
  */
 int bi_putblk(struct channel *chn, const char *blk, int len)
 {
@@ -161,6 +166,7 @@
 		if (len > max)
 			return -3;
 
+		chn->flags |= CF_WAKE_WRITE;
 		return -1;
 	}
 
diff --git a/src/dumpstats.c b/src/dumpstats.c
index eeb662b..e313b5a 100644
--- a/src/dumpstats.c
+++ b/src/dumpstats.c
@@ -1965,8 +1965,10 @@
 			/* ensure we have some output room left in the event we
 			 * would want to return some info right after parsing.
 			 */
-			if (buffer_almost_full(si->ib->buf))
+			if (buffer_almost_full(si->ib->buf)) {
+				si->ib->flags |= CF_WAKE_WRITE;
 				break;
+			}
 
 			reql = bo_getline(si->ob, trash.str, trash.size);
 			if (reql <= 0) { /* closed or EOL not found */
@@ -3360,8 +3362,10 @@
 	case STAT_PX_ST_LI:
 		/* stats.l has been initialized above */
 		for (; appctx->ctx.stats.l != &px->conf.listeners; appctx->ctx.stats.l = l->by_fe.n) {
-			if (buffer_almost_full(rep->buf))
+			if (buffer_almost_full(rep->buf)) {
+				rep->flags |= CF_WAKE_WRITE;
 				return 0;
+			}
 
 			l = LIST_ELEM(appctx->ctx.stats.l, struct listener *, by_fe);
 			if (!l->counters)
@@ -3390,8 +3394,10 @@
 		for (; appctx->ctx.stats.sv != NULL; appctx->ctx.stats.sv = sv->next) {
 			int sv_state; /* 0=DOWN, 1=going up, 2=going down, 3=UP, 4,5=NOLB, 6=unchecked */
 
-			if (buffer_almost_full(rep->buf))
+			if (buffer_almost_full(rep->buf)) {
+				rep->flags |= CF_WAKE_WRITE;
 				return 0;
+			}
 
 			sv = appctx->ctx.stats.sv;
 
@@ -3874,8 +3880,10 @@
 	case STAT_ST_LIST:
 		/* dump proxies */
 		while (appctx->ctx.stats.px) {
-			if (buffer_almost_full(rep->buf))
+			if (buffer_almost_full(rep->buf)) {
+				rep->flags |= CF_WAKE_WRITE;
 				return 0;
+			}
 
 			px = appctx->ctx.stats.px;
 			/* skip the disabled proxies, global frontend and non-networked ones */
diff --git a/src/proto_http.c b/src/proto_http.c
index 18da38f..16db90a 100644
--- a/src/proto_http.c
+++ b/src/proto_http.c
@@ -2307,6 +2307,7 @@
 				/* some data has still not left the buffer, wake us once that's done */
 				channel_dont_connect(req);
 				req->flags |= CF_READ_DONTWAIT; /* try to get back here ASAP */
+				req->flags |= CF_WAKE_WRITE;
 				return 0;
 			}
 			if (unlikely(bi_end(req->buf) < b_ptr(req->buf, msg->next) ||
@@ -2331,6 +2332,7 @@
 				/* don't let a connection request be initiated */
 				channel_dont_connect(req);
 				s->rep->flags &= ~CF_EXPECT_MORE; /* speed up sending a previous response */
+				s->rep->flags |= CF_WAKE_WRITE;
 				s->rep->analysers |= an_bit; /* wake us up once it changes */
 				return 0;
 			}
@@ -5115,6 +5117,7 @@
 				goto abort_response;
 			channel_dont_close(rep);
 			rep->flags |= CF_READ_DONTWAIT; /* try to get back here ASAP */
+			rep->flags |= CF_WAKE_WRITE;
 			return 0;
 		}
 
diff --git a/src/session.c b/src/session.c
index f06d06b..18d670e 100644
--- a/src/session.c
+++ b/src/session.c
@@ -1605,7 +1605,8 @@
 	memset(&s->txn.auth, 0, sizeof(s->txn.auth));
 
 	/* This flag must explicitly be set every time */
-	s->req->flags &= ~CF_READ_NOEXP;
+	s->req->flags &= ~(CF_READ_NOEXP|CF_WAKE_WRITE);
+	s->rep->flags &= ~(CF_READ_NOEXP|CF_WAKE_WRITE);
 
 	/* Keep a copy of req/rep flags so that we can detect shutdowns */
 	rqf_last = s->req->flags & ~CF_MASK_ANALYSER;
diff --git a/src/stream_interface.c b/src/stream_interface.c
index 9aa1907..f38ddc1 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -205,9 +205,7 @@
 	    /* changes on the consumption side */
 	    (si->ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
 	    ((si->ob->flags & CF_WRITE_ACTIVITY) &&
-	     ((si->ob->flags & CF_SHUTW) ||
-	      si->ob->prod->state != SI_ST_EST ||
-	      (channel_is_empty(si->ob) && !si->ob->to_forward)))) {
+	     (si->ob->flags & (CF_SHUTW|CF_WAKE_WRITE)))) {
 		if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
 			task_wakeup(si->owner, TASK_WOKEN_IO);
 	}
@@ -630,9 +628,7 @@
 	    /* changes on the consumption side */
 	    (si->ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
 	    ((si->ob->flags & CF_WRITE_ACTIVITY) &&
-	     ((si->ob->flags & CF_SHUTW) ||
-	      si->ob->prod->state != SI_ST_EST ||
-	      (channel_is_empty(si->ob) && !si->ob->to_forward)))) {
+	     (si->ob->flags & (CF_SHUTW|CF_WAKE_WRITE)))) {
 		task_wakeup(si->owner, TASK_WOKEN_IO);
 	}
 	if (si->ib->flags & CF_READ_ACTIVITY)
@@ -1045,9 +1041,7 @@
 	/* in case of special condition (error, shutdown, end of write...), we
 	 * have to notify the task.
 	 */
-	if (likely((ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR|CF_SHUTW)) ||
-		   (channel_is_empty(ob) && !ob->to_forward) ||
-		   si->state != SI_ST_EST)) {
+	if (likely(ob->flags & (CF_WRITE_NULL|CF_WRITE_ERROR|CF_SHUTW|CF_WAKE_WRITE))) {
 	out_wakeup:
 		if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
 			task_wakeup(si->owner, TASK_WOKEN_IO);