MEDIUM: mux-h1: Handle errors and timeouts in the stream

To do so, the stream is created as earlier as possible. It means, during the mux
creation for the first request, and for others, just at the end of the previous
transaction. Because all timeouts are handled by the strream, the mux's task is
now useless, so it is removed. Finally, to report errors, flags are set on the
HTX message. The HTX message is passed to the stream if there is some content to
analyse or if there is some error to handle.

All of this will probably be reworked later to handle errors and timeouts
directly in the mux. For now, it is the simpler way to handle all of this.
diff --git a/src/mux_h1.c b/src/mux_h1.c
index 178465a..9492538 100644
--- a/src/mux_h1.c
+++ b/src/mux_h1.c
@@ -80,10 +80,6 @@
 	struct wait_event wait_event;    /* To be used if we're waiting for I/Os */
 
 	struct h1s *h1s;                 /* H1 stream descriptor */
-	struct task *task;               /* timeout management task */
-
-	int idle_exp;                    /* expiration date for idle connections, in ticks (client-side only)*/
-	int http_exp;                    /* expiration date for HTTP headers parsing (client-side only) */
 };
 
 /* H1 stream descriptor */
@@ -108,7 +104,6 @@
 static struct pool_head *pool_head_h1c;
 static struct pool_head *pool_head_h1s;
 
-static struct task *h1_timeout_task(struct task *t, void *context, unsigned short state);
 static int h1_recv(struct h1c *h1c);
 static int h1_send(struct h1c *h1c);
 static int h1_process(struct h1c *h1c);
@@ -225,13 +220,36 @@
 /*****************************************************************/
 /* functions below are dedicated to the mux setup and management */
 /*****************************************************************/
+static struct conn_stream *h1s_new_cs(struct h1s *h1s)
+{
+	struct conn_stream *cs;
+
+	cs = cs_new(h1s->h1c->conn);
+	if (!cs)
+		goto err;
+	h1s->cs = cs;
+	cs->ctx = h1s;
+
+	if (h1s->flags & H1S_F_NOT_FIRST)
+		cs->flags |= CS_FL_NOT_FIRST;
+
+	if (stream_create_from_cs(cs) < 0)
+		goto err;
+	return cs;
+
+  err:
+	cs_free(cs);
+	h1s->cs = NULL;
+	return NULL;
+}
+
 static struct h1s *h1s_create(struct h1c *h1c, struct conn_stream *cs)
 {
 	struct h1s *h1s;
 
 	h1s = pool_alloc(pool_head_h1s);
 	if (!h1s)
-		goto end;
+		goto fail;
 
 	h1s->h1c = h1c;
 	h1c->h1s = h1s;
@@ -252,27 +270,36 @@
 	h1s->status = 0;
 	h1s->meth   = HTTP_METH_OTHER;
 
+	if (h1c->flags & H1C_F_WAIT_NEXT_REQ)
+		h1s->flags |= H1S_F_NOT_FIRST;
+	h1c->flags &= ~H1C_F_WAIT_NEXT_REQ;
+
 	if (!conn_is_back(h1c->conn)) {
 		if (h1c->px->options2 & PR_O2_REQBUG_OK)
 			h1s->req.err_pos = -1;
-
-		if (h1c->flags & H1C_F_WAIT_NEXT_REQ)
-			h1s->flags |= H1S_F_NOT_FIRST;
-		h1c->flags &= ~H1C_F_WAIT_NEXT_REQ;
-		h1c->http_exp = tick_add_ifset(now_ms, h1c->px->timeout.httpreq);
 	}
 	else {
 		if (h1c->px->options2 & PR_O2_RSPBUG_OK)
 			h1s->res.err_pos = -1;
 	}
 
-	/* If a conn_stream already exists, attach it to this H1S */
 	if (cs) {
+		/* If a conn_stream already exists, attach it to this H1S */
 		cs->ctx = h1s;
 		h1s->cs = cs;
 	}
-  end:
+#if 1
+	else {
+		cs = h1s_new_cs(h1s);
+		if (!cs)
+			goto fail;
+	}
+#endif
 	return h1s;
+
+  fail:
+	pool_free(pool_head_h1s, h1s);
+	return NULL;
 }
 
 static void h1s_destroy(struct h1s *h1s)
@@ -288,10 +315,9 @@
 		if (h1s->send_wait != NULL)
 			h1s->send_wait->wait_reason &= ~SUB_CAN_SEND;
 
-		if (!conn_is_back(h1c->conn)) {
-			h1c->flags |= H1C_F_WAIT_NEXT_REQ;
-			h1c->http_exp = tick_add_ifset(now_ms, h1c->px->timeout.httpka);
-		}
+		h1c->flags |= H1C_F_WAIT_NEXT_REQ;
+		if (h1s->flags & (H1S_F_REQ_ERROR|H1S_F_RES_ERROR))
+			h1c->flags |= H1C_F_CS_ERROR;
 
 		h1_release_buf(h1c, &h1s->rxbuf);
 		cs_free(h1s->cs);
@@ -299,29 +325,6 @@
 	}
 }
 
-static struct conn_stream *h1s_new_cs(struct h1s *h1s)
-{
-	struct conn_stream *cs;
-
-	cs = cs_new(h1s->h1c->conn);
-	if (!cs)
-		goto err;
-	h1s->cs = cs;
-	cs->ctx = h1s;
-
-	if (h1s->flags & H1S_F_NOT_FIRST)
-		cs->flags |= CS_FL_NOT_FIRST;
-
-	if (stream_create_from_cs(cs) < 0)
-		goto err;
-	return cs;
-
-  err:
-	cs_free(cs);
-	h1s->cs = NULL;
-	return NULL;
-}
-
 /*
  * Initialize the mux once it's attached. It is expected that conn->mux_ctx
  * points to the existing conn_stream (for outgoing connections) or NULL (for
@@ -330,7 +333,6 @@
 static int h1_init(struct connection *conn, struct proxy *proxy)
 {
 	struct h1c *h1c;
-	struct task *t = NULL;
 
 	h1c = pool_alloc(pool_head_h1c);
 	if (!h1c)
@@ -343,17 +345,6 @@
 	h1c->obuf  = BUF_NULL;
 	h1c->h1s   = NULL;
 
-	t = task_new(tid_bit);
-	if (!t)
-		goto fail;
-	h1c->task  = t;
-	t->process = h1_timeout_task;
-	t->context = h1c;
-	t->expire  = TICK_ETERNITY;
-
-	h1c->idle_exp = TICK_ETERNITY;
-	h1c->http_exp = TICK_ETERNITY;
-
 	LIST_INIT(&h1c->buf_wait.list);
 	h1c->wait_event.task = tasklet_new();
 	if (!h1c->wait_event.task)
@@ -370,7 +361,6 @@
 		goto fail;
 
 	conn->mux_ctx = h1c;
-	task_wakeup(t, TASK_WOKEN_INIT);
 
 	/* Try to read, if nothing is available yet we'll just subscribe */
 	if (h1_recv(h1c))
@@ -380,9 +370,7 @@
 	return 0;
 
   fail:
-	if (t)
-		task_free(t);
-	if (h1c && h1c->wait_event.task)
+	if (h1c->wait_event.task)
 		tasklet_free(h1c->wait_event.task);
 	pool_free(pool_head_h1c, h1c);
  fail_h1c:
@@ -410,11 +398,6 @@
 		h1_release_buf(h1c, &h1c->ibuf);
 		h1_release_buf(h1c, &h1c->obuf);
 
-		if (h1c->task) {
-			h1c->task->context = NULL;
-			task_wakeup(h1c->task, TASK_WOKEN_OTHER);
-			h1c->task = NULL;
-		}
 		if (h1c->wait_event.task)
 			tasklet_free(h1c->wait_event.task);
 
@@ -438,21 +421,6 @@
 /******************************************************/
 /* functions below are for the H1 protocol processing */
 /******************************************************/
-/*
- * Set the appropriate error message. It first tries to get it from the proxy if
- * it exists. Otherwise, it falls back on default one.
- */
-static void h1_cpy_error_message(struct h1c *h1c, struct buffer *dst, int status)
-{
-	const int msgnum = http_get_status_idx(status);
-	const struct buffer *err;
-
-	err = (h1c->px->errmsg[msgnum].area
-	       ? &h1c->px->errmsg[msgnum]
-	       : &http_err_chunks[msgnum]);
-	b_putblk(dst, b_head(err), b_data(err));
-}
-
 /* Parse the request version and set H1_MF_VER_11 on <h1m> if the version is
  * greater or equal to 1.1
  */
@@ -1099,22 +1067,22 @@
 
 	h1s = NULL;
 
-	/* Create a new H1S without CS if not already done */
+	/* Create a new H1S if not already done */
 	if (!h1c->h1s && !h1s_create(h1c, NULL))
-		goto err;
+		goto fatal_err;
 	h1s = h1c->h1s;
-
 #if 0
-	// FIXME: Use a proxy option to enable early creation of the CS
 	/* Create the CS if not already attached to the H1S */
 	if (!h1s->cs && !h1s_new_cs(h1s))
-		goto err;
+		goto fatal_err;
 #endif
-
+	if (!count)
+		goto end;
 	if (!h1_get_buf(h1c, &h1s->rxbuf)) {
 		h1c->flags |= H1C_F_RX_ALLOC;
 		goto end;
 	}
+
 	htx = htx_from_buf(&h1s->rxbuf);
 
 	if (!conn_is_back(h1c->conn)) {
@@ -1132,13 +1100,11 @@
 			ret = h1_process_headers(h1s, h1m, htx, buf, &total, max);
 			if (!ret)
 				break;
-
-			/* Reset request timeout */
-			h1s->h1c->http_exp = TICK_ETERNITY;
-
+#if 0
 			/* Create the CS if not already attached to the H1S */
 			if (!h1s->cs && !h1s_new_cs(h1s))
-				goto err;
+				goto fatal_err;
+#endif
 		}
 		else if (h1m->state <= H1_MSG_TRAILERS) {
 			/* Do not parse the body if the header part is not yet
@@ -1165,59 +1131,41 @@
 		max -= ret;
 	}
 
-	if (h1s->flags & errflag) {
-		if (conn_is_back(h1c->conn))
-			goto err;
-
-		// FIXME: Do following actions when an error is catched during
-		// the request parsing:
-		//
-		//  * Do same than stream_inc_http_req_ctr,
-		//    stream_inc_http_err_ctr and proxy_inc_fe_req_ctr
-		//  * Capture bad message for snapshots
-		//  * Increment fe->fe_counters.failed_req and
-		//    listeners->counters->failed_req
-		//
-		// FIXME: Do following actions when an error is catched during
-		// the response parsing:
-		//
-		//  * Capture bad message for snapshots
-		//  * increment be->be_counters.failed_resp
-		//  * increment srv->counters.failed_resp (if srv assigned)
-		if (!h1_get_buf(h1c, &h1c->obuf)) {
-			h1c->flags |= H1C_F_OUT_ALLOC;
-			goto err;
-		}
-		h1_cpy_error_message(h1c, &h1c->obuf, 400);
-		goto err;
-	}
+	if (h1s->flags & errflag)
+		goto parsing_err;
 
 	b_del(buf, total);
-
 	if (htx_is_not_empty(htx)) {
 		b_set_data(&h1s->rxbuf, b_size(&h1s->rxbuf));
 		if (!htx_free_data_space(htx))
 			h1c->flags |= H1C_F_RX_FULL;
-
-		if (h1s->recv_wait) {
-			h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
-			tasklet_wakeup(h1s->recv_wait->task);
-			h1s->recv_wait = NULL;
-		}
 	}
 	else
 		h1_release_buf(h1c, &h1s->rxbuf);
 
 	ret = count - max;
-
+	if (h1s->recv_wait) {
+		h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
+		tasklet_wakeup(h1s->recv_wait->task);
+		h1s->recv_wait = NULL;
+	}
   end:
 	return ret;
 
-  err:
-	//h1s_destroy(h1s);
+  fatal_err:
 	h1c->flags |= H1C_F_CS_ERROR;
-	if (!h1s || !h1s->cs)
-		sess_log(h1c->conn->owner);
+	sess_log(h1c->conn->owner);
+	return 0;
+
+  parsing_err:
+	// FIXME: create an error snapshot here
+	b_reset(&h1c->ibuf);
+	h1s->cs->flags |= CS_FL_REOS;
+	if (h1s->recv_wait) {
+		h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
+		tasklet_wakeup(h1s->recv_wait->task);
+		h1s->recv_wait = NULL;
+	}
 	return 0;
 }
 
@@ -1236,6 +1184,8 @@
 	size_t total = 0;
 	int errflag;
 
+	if (!count)
+		goto end;
 	chn_htx = htx_from_buf(buf);
 
 	if (!h1_get_buf(h1c, &h1c->obuf)) {
@@ -1391,7 +1341,6 @@
 		htx_reset(chn_htx);
 		b_set_data(buf, 0);
 	}
-
   end:
 	return total;
 }
@@ -1411,12 +1360,15 @@
 
 	h1m = (!conn_is_back(h1c->conn) ? &h1s->req : &h1s->res);
 	mux_htx = htx_from_buf(&h1s->rxbuf);
+	chn_htx = htx_from_buf(buf);
 
+	if (h1s->flags & (H1S_F_REQ_ERROR|H1S_F_RES_ERROR)) {
+		chn_htx->flags |= HTX_FL_PARSING_ERROR;
+		b_set_data(buf, b_size(buf));
+		goto end;
+	}
 	if (htx_is_empty(mux_htx))
 		goto end;
-
-	chn_htx = htx_from_buf(buf);
-
 	count = htx_free_space(chn_htx);
 	if (flags & CO_RFL_KEEP_RSV) {
 		if (count < global.tune.maxrewrite)
@@ -1442,7 +1394,6 @@
 
 	if (htx_is_not_empty(chn_htx))
 		b_set_data(buf, b_size(buf));
-
   end:
 	if (h1c->flags & H1C_F_RX_FULL && htx_free_data_space(mux_htx)) {
 		h1c->flags &= ~H1C_F_RX_FULL;
@@ -1501,8 +1452,11 @@
 		h1c->flags &= ~H1C_F_IN_FULL;
 		ret = conn->xprt->rcv_buf(conn, &h1c->ibuf, max, 0);
 	}
-	if (ret > 0)
+	if (ret > 0) {
 		rcvd = 1;
+		if (h1c->h1s && h1c->h1s->cs)
+			h1c->h1s->cs->flags |= CS_FL_READ_PARTIAL;
+	}
 
 	if (h1_recv_allowed(h1c))
 		conn->xprt->subscribe(conn, SUB_CAN_RECV, &h1c->wait_event);
@@ -1609,7 +1563,7 @@
 {
 	struct connection *conn = h1c->conn;
 
-	if (b_data(&h1c->ibuf) && !(h1c->flags & (H1C_F_CS_ERROR|H1C_F_RX_FULL|H1C_F_RX_ALLOC))) {
+	if (!(h1c->flags & (H1C_F_CS_ERROR|H1C_F_RX_FULL|H1C_F_RX_ALLOC))) {
 		size_t ret;
 
 		ret = h1_process_input(h1c, &h1c->ibuf, b_data(&h1c->ibuf));
@@ -1630,7 +1584,7 @@
 			h1c->flags &= ~H1C_F_CS_WAIT_CONN;
 			h1_wake_stream(h1c);
 		}
-		return 0;
+		goto end;
 	}
 
 	if ((h1c->flags & H1C_F_CS_ERROR) || (conn->flags & CO_FL_ERROR) || conn_xprt_read0_pending(conn)) {
@@ -1641,20 +1595,7 @@
 		}
 	}
 
-	/* If there is a stream attached to the mux, let it
-	 * handle the timeout.
-	 */
-	if (h1c->h1s && h1c->h1s->cs)
-		h1c->idle_exp = TICK_ETERNITY;
-	else {
-		int tout = (!conn_is_back(conn)
-			    ? h1c->px->timeout.client
-			    : h1c->px->timeout.server);
-		h1c->idle_exp = tick_add_ifset(now_ms, tout);
-	}
-	h1c->task->expire = tick_first(h1c->http_exp, h1c->idle_exp);
-	if (tick_isset(h1c->task->expire))
-		task_queue(h1c->task);
+  end:
 	return 0;
 }
 
@@ -1677,74 +1618,9 @@
 {
 	struct h1c *h1c = conn->mux_ctx;
 
-	//return 0;
 	return (h1_process(h1c));
 }
 
-
-/* Connection timeout management. The principle is that if there's no receipt
- * nor sending for a certain amount of time, the connection is closed.
- */
-static struct task *h1_timeout_task(struct task *t, void *context, unsigned short state)
-{
-	struct h1c *h1c = context;
-	int expired = tick_is_expired(t->expire, now_ms);
-
-	if (!h1c)
-		goto end;
-
-	if (!expired) {
-		t->expire = tick_first(t->expire, tick_first(h1c->idle_exp, h1c->http_exp));
-		return t;
-	}
-
-	h1c->flags   |= H1C_F_CS_ERROR;
-	h1c->idle_exp = TICK_ETERNITY;
-	h1c->http_exp = TICK_ETERNITY;
-	t->expire     = TICK_ETERNITY;
-
-	/* Don't try send error message on the server-side */
-	if (conn_is_back(h1c->conn))
-		goto release;
-
-	/* Don't send error message if no input data is pending _AND_ if null
-	 * requests is ignored or it's not the first request.
-	 */
-	if (!b_data(&h1c->ibuf) && (h1c->px->options & PR_O_IGNORE_PRB ||
-				    h1c->flags & H1C_F_WAIT_NEXT_REQ))
-		goto release;
-
-	/* Try to allocate output buffer to store the error message. If
-	 * allocation fails, just go away.
-	 */
-	if (!h1_get_buf(h1c, &h1c->obuf))
-		goto release;
-
-	// FIXME: Do the following:
-	//
-	//  * Do same than stream_inc_http_req_ctr,
-	//    stream_inc_http_err_ctr and proxy_inc_fe_req_ctr
-	//  * Capture bad message for snapshots
-	//  * Increment fe->fe_counters.failed_req and
-	//    listeners->counters->failed_req
-	h1_cpy_error_message(h1c, &h1c->obuf, 408);
-	tasklet_wakeup(h1c->wait_event.task);
-	sess_log(h1c->conn->owner);
-	return t;
-
-  release:
-	if (h1c->h1s) {
-		tasklet_wakeup(h1c->wait_event.task);
-		return t;
-	}
-	h1c->task = NULL;
-	h1_release(h1c->conn);
-  end:
-	task_delete(t);
-	task_free(t);
-	return NULL;
-}
-
 /*******************************************/
 /* functions below are used by the streams */
 /*******************************************/
diff --git a/src/proto_htx.c b/src/proto_htx.c
index 382df6c..c16fc1e 100644
--- a/src/proto_htx.c
+++ b/src/proto_htx.c
@@ -117,6 +117,16 @@
 	 * a bad request is.
 	 */
 	if (unlikely(htx_is_empty(htx) || htx_get_tail_type(htx) < HTX_BLK_EOH)) {
+		/*
+		 * First catch invalid request
+		 */
+		if (htx->flags & HTX_FL_PARSING_ERROR) {
+			stream_inc_http_req_ctr(s);
+			stream_inc_http_err_ctr(s);
+			proxy_inc_fe_req_ctr(sess->fe);
+			goto return_bad_req;
+		}
+
 		/* 1: have we encountered a read error ? */
 		if (req->flags & CF_READ_ERROR) {
 			if (!(s->flags & SF_ERR_MASK))
@@ -217,8 +227,7 @@
 			setsockopt(__objt_conn(sess->origin)->handle.fd, IPPROTO_TCP, TCP_QUICKACK, &one, sizeof(one));
 		}
 #endif
-
-		if ((msg->msg_state != HTTP_MSG_RQBEFORE) && (txn->flags & TX_WAIT_NEXT_RQ)) {
+		if ((req->flags & CF_READ_PARTIAL) && (txn->flags & TX_WAIT_NEXT_RQ)) {
 			/* If the client starts to talk, let's fall back to
 			 * request timeout processing.
 			 */
@@ -228,9 +237,7 @@
 
 		/* just set the request timeout once at the beginning of the request */
 		if (!tick_isset(req->analyse_exp)) {
-			if ((msg->msg_state == HTTP_MSG_RQBEFORE) &&
-			    (txn->flags & TX_WAIT_NEXT_RQ) &&
-			    tick_isset(s->be->timeout.httpka))
+			if ((txn->flags & TX_WAIT_NEXT_RQ) && tick_isset(s->be->timeout.httpka))
 				req->analyse_exp = tick_add(now_ms, s->be->timeout.httpka);
 			else
 				req->analyse_exp = tick_add_ifset(now_ms, s->be->timeout.httpreq);
@@ -1091,6 +1098,9 @@
 		goto http_end;
 
  missing_data:
+	if (htx->flags & HTX_FL_PARSING_ERROR)
+		goto return_bad_req;
+
 	if ((req->flags & CF_READ_TIMEOUT) || tick_is_expired(req->analyse_exp, now_ms)) {
 		txn->status = 408;
 		htx_reply_and_close(s, txn->status, http_error_message(s));
@@ -1305,6 +1315,8 @@
 	if (req->flags & CF_SHUTW)
 		goto aborted_xfer;
 
+	if (htx->flags & HTX_FL_PARSING_ERROR)
+		goto return_bad_req;
 
 	/* When TE: chunked is used, we need to get there again to parse remaining
 	 * chunks even if the client has closed, so we don't want to set CF_DONTCLOSE.
@@ -1438,6 +1450,12 @@
 	 * errors somewhere else.
 	 */
 	if (unlikely(htx_is_empty(htx) || htx_get_tail_type(htx) < HTX_BLK_EOH)) {
+		/*
+		 * First catch invalid response
+		 */
+		if (htx->flags & HTX_FL_PARSING_ERROR)
+			goto return_bad_res;
+
 		/* 1: have we encountered a read error ? */
 		if (rep->flags & CF_READ_ERROR) {
 			if (txn->flags & TX_NOT_FIRST)
@@ -1704,6 +1722,23 @@
 	channel_auto_close(rep);
 	return 1;
 
+ return_bad_res:
+	HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1);
+	if (objt_server(s->target)) {
+		HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_resp, 1);
+		health_adjust(objt_server(s->target), HANA_STATUS_HTTP_HDRRSP);
+	}
+	txn->status = 502;
+	s->si[1].flags |= SI_FL_NOLINGER;
+	htx_reply_and_close(s, txn->status, http_error_message(s));
+	rep->analysers &= AN_RES_FLT_END;
+
+	if (!(s->flags & SF_ERR_MASK))
+		s->flags |= SF_ERR_PRXCOND;
+	if (!(s->flags & SF_FINST_MASK))
+		s->flags |= SF_FINST_H;
+	return 0;
+
  abort_keep_alive:
 	/* A keep-alive request to the server failed on a network error.
 	 * The client is required to retry. We need to close without returning
@@ -2145,6 +2180,9 @@
 	if (res->flags & CF_SHUTW)
 		goto aborted_xfer;
 
+	if (htx->flags & HTX_FL_PARSING_ERROR)
+		goto return_bad_res;
+
 	/* stop waiting for data if the input is closed before the end. If the
 	 * client side was already closed, it means that the client has aborted,
 	 * so we don't want to count this as a server abort. Otherwise it's a