BUG/MEDIUM: mux-h1: Add a task to handle connection timeouts

The mux h1 mainly depends on the stream to handle errors and timeouts. But,
there is one unhandled case. If a timeout occurred when some outgoing data are
blocked in the output buffer, the stream is detached and the mux waits infinitly
the data are gone before closing the connection. To fix the bug, a task has been
added on the mux to handle connection timeouts. For now, a expiration date is
set only when some outgoing data are blocked. And if a stream is still attached
when the mux's task timed out, an error flag is set on the mux but the
connection is not closed immediatly. We assume the stream will hit the same
timeout just after.

This patch must be backported to 1.9.
diff --git a/src/mux_h1.c b/src/mux_h1.c
index 7eff9d2..4df1289 100644
--- a/src/mux_h1.c
+++ b/src/mux_h1.c
@@ -82,6 +82,9 @@
 	struct wait_event wait_event;    /* To be used if we're waiting for I/Os */
 
 	struct h1s *h1s;                 /* H1 stream descriptor */
+	struct task *task;               /* timeout management task */
+	int timeout;                     /* idle timeout duration in ticks */
+	int shut_timeout;                /* idle timeout duration in ticks after stream shutdown */
 };
 
 /* H1 stream descriptor */
@@ -111,6 +114,7 @@
 static int h1_process(struct h1c *h1c);
 static struct task *h1_io_cb(struct task *t, void *ctx, unsigned short state);
 static void h1_shutw_conn(struct connection *conn);
+static struct task *h1_timeout_task(struct task *t, void *context, unsigned short state);
 
 /*****************************************************/
 /* functions below are for dynamic buffer management */
@@ -354,6 +358,7 @@
 static int h1_init(struct connection *conn, struct proxy *proxy, struct session *sess)
 {
 	struct h1c *h1c;
+	struct task *t = NULL;
 
 	h1c = pool_alloc(pool_head_h1c);
 	if (!h1c)
@@ -365,6 +370,7 @@
 	h1c->ibuf  = BUF_NULL;
 	h1c->obuf  = BUF_NULL;
 	h1c->h1s   = NULL;
+	h1c->task = NULL;
 
 	LIST_INIT(&h1c->buf_wait.list);
 	h1c->wait_event.task = tasklet_new();
@@ -374,6 +380,26 @@
 	h1c->wait_event.task->context = h1c;
 	h1c->wait_event.events   = 0;
 
+	if (conn->ctx) {
+		h1c->shut_timeout = h1c->timeout = proxy->timeout.server;
+		if (tick_isset(proxy->timeout.serverfin))
+			h1c->shut_timeout = proxy->timeout.serverfin;
+	} else {
+		h1c->shut_timeout = h1c->timeout = proxy->timeout.client;
+		if (tick_isset(proxy->timeout.clientfin))
+			h1c->shut_timeout = proxy->timeout.clientfin;
+	}
+	if (tick_isset(h1c->timeout)) {
+		t = task_new(tid_bit);
+		if (!t)
+			goto fail;
+
+		h1c->task = t;
+		t->process = h1_timeout_task;
+		t->context = h1c;
+		t->expire = tick_add(now_ms, h1c->timeout);
+	}
+
 	if (!(conn->flags & CO_FL_CONNECTED))
 		h1c->flags |= H1C_F_CS_WAIT_CONN;
 
@@ -383,6 +409,10 @@
 
 	conn->ctx = h1c;
 
+
+	if (t)
+		task_queue(t);
+
 	/* Try to read, if nothing is available yet we'll just subscribe */
 	if (h1_recv(h1c))
 		h1_process(h1c);
@@ -391,6 +421,8 @@
 	return 0;
 
   fail:
+	if (t)
+		task_free(t);
 	if (h1c->wait_event.task)
 		tasklet_free(h1c->wait_event.task);
 	pool_free(pool_head_h1c, h1c);
@@ -419,6 +451,12 @@
 		h1_release_buf(h1c, &h1c->ibuf);
 		h1_release_buf(h1c, &h1c->obuf);
 
+		if (h1c->task) {
+			h1c->task->context = NULL;
+			task_wakeup(h1c->task, TASK_WOKEN_OTHER);
+			h1c->task = NULL;
+		}
+
 		if (h1c->wait_event.task)
 			tasklet_free(h1c->wait_event.task);
 
@@ -1830,6 +1868,15 @@
 		h1s->cs->data_cb->wake(h1s->cs);
 	}
   end:
+	if (h1c->task) {
+		h1c->task->expire = TICK_ETERNITY;
+		if (b_data(&h1c->obuf)) {
+			h1c->task->expire = tick_add(now_ms, ((h1c->flags & (H1C_F_CS_SHUTW_NOW|H1C_F_CS_SHUTW))
+							      ? h1c->shut_timeout
+							      : h1c->timeout));
+			task_queue(h1c->task);
+		}
+	}
 	return 0;
 
   release:
@@ -1875,6 +1922,37 @@
 	return ret;
 }
 
+/* Connection timeout management. The principle is that if there's no receipt
+ * nor sending for a certain amount of time, the connection is closed.
+ */
+static struct task *h1_timeout_task(struct task *t, void *context, unsigned short state)
+{
+	struct h1c *h1c = context;
+	int expired = tick_is_expired(t->expire, now_ms);
+
+	if (!expired && h1c)
+		return t;
+
+	task_delete(t);
+	task_free(t);
+
+	if (!h1c) {
+		/* resources were already deleted */
+		return NULL;
+	}
+
+	h1c->task = NULL;
+	/* If a stream is still attached to the mux, just set an error and wait
+	 * for the stream's timeout. Otherwise, release the mux. This is only ok
+	 * because same timeouts are used.
+	 */
+	if (h1c->h1s && h1c->h1s->cs)
+		h1c->flags |= H1C_F_CS_ERROR;
+	else
+		h1_release(h1c->conn);
+	return NULL;
+}
+
 /*******************************************/
 /* functions below are used by the streams */
 /*******************************************/
@@ -2001,8 +2079,18 @@
 	if ((h1c->flags & (H1C_F_CS_ERROR|H1C_F_CS_SHUTW)) ||
 	    (h1c->conn->flags & CO_FL_ERROR) || !h1c->conn->owner)
 		h1_release(h1c->conn);
-	else
+	else {
 		tasklet_wakeup(h1c->wait_event.task);
+		if (h1c->task) {
+			h1c->task->expire = TICK_ETERNITY;
+			if (b_data(&h1c->obuf)) {
+				h1c->task->expire = tick_add(now_ms, ((h1c->flags & (H1C_F_CS_SHUTW_NOW|H1C_F_CS_SHUTW))
+								      ? h1c->shut_timeout
+								      : h1c->timeout));
+				task_queue(h1c->task);
+			}
+		}
+	}
 }