MAJOR: mux-h1: Remove the rxbuf and decode HTTP messages in channel's buffer

It avoids a copy between the rxbuf and the channel's buffer. It means the
parsing is done in h1_rcv_buf(). So we need to have a stream to start the
parsing. This change should improve the overall performance. It also implies a
better split between the connection layer and the applicative layer. Now, on the
connection layer, only raw data are manipulated. Raw data received from the
socket are stored in ibuf and those sent are taken from obuf. On the applicative
layer, data in ibuf are parsed and copied into the channel's buffer. And on the
other side, those structured data are formatted and copied into obuf.
diff --git a/src/mux_h1.c b/src/mux_h1.c
index 9492538..cbbcd90 100644
--- a/src/mux_h1.c
+++ b/src/mux_h1.c
@@ -37,12 +37,7 @@
 /* Flags indicating why reading input data are blocked. */
 #define H1C_F_IN_ALLOC       0x00000010 /* mux is blocked on lack of input buffer */
 #define H1C_F_IN_FULL        0x00000020 /* mux is blocked on input buffer full */
-/* 0x00000040 - 0x00000080 unused */
-
-/* Flags indicating why parsing data are blocked */
-#define H1C_F_RX_ALLOC       0x00000100 /* mux is blocked on lack of rx buffer */
-#define H1C_F_RX_FULL        0x00000200 /* mux is blocked on rx buffer full */
-/* 0x00000400 - 0x00000800 unused */
+/* 0x00000040 - 0x00000800 unused */
 
 #define H1C_F_CS_ERROR       0x00001000 /* connection must be closed ASAP because an error occurred */
 #define H1C_F_CS_SHUTW_NOW   0x00002000 /* connection must be shut down for writes ASAP */
@@ -58,13 +53,13 @@
 #define H1S_F_ERROR          0x00000001 /* An error occurred on the H1 stream */
 #define H1S_F_REQ_ERROR      0x00000002 /* An error occurred during the request parsing/xfer */
 #define H1S_F_RES_ERROR      0x00000004 /* An error occurred during the response parsing/xfer */
-#define H1S_F_MSG_XFERED     0x00000008 /* current message was transferred to the data layer */
+/* 0x00000008 unused */
 #define H1S_F_WANT_KAL       0x00000010
 #define H1S_F_WANT_TUN       0x00000020
 #define H1S_F_WANT_CLO       0x00000040
 #define H1S_F_WANT_MSK       0x00000070
 #define H1S_F_NOT_FIRST      0x00000080 /* The H1 stream is not the first one */
-#define H1S_F_BUF_FLUSH      0x00000100 /* Flush input buffers (ibuf and rxbuf) and don't read more data */
+#define H1S_F_BUF_FLUSH      0x00000100 /* Flush input buffer and don't read more data */
 
 
 /* H1 connection descriptor */
@@ -88,8 +83,6 @@
 	struct conn_stream *cs;
 	uint32_t flags; /* Connection flags: H1S_F_* */
 
-	struct buffer rxbuf; /*receive buffer, always valid (buf_empty or real buffer) */
-
 	struct wait_event *recv_wait; /* Address of the wait_event the conn_stream associated is waiting on */
 	struct wait_event *send_wait; /* Address of the wait_event the conn_stream associated is waiting on */
 
@@ -168,13 +161,6 @@
 		return 1;
 	}
 
-	if ((h1c->flags & H1C_F_RX_ALLOC) && h1c->h1s && b_alloc_margin(&h1c->h1s->rxbuf, 0)) {
-		h1c->flags &= ~H1C_F_RX_ALLOC;
-		if (h1_recv_allowed(h1c))
-			tasklet_wakeup(h1c->wait_event.task);
-		return 1;
-	}
-
 	return 0;
 }
 
@@ -255,7 +241,6 @@
 	h1c->h1s = h1s;
 
 	h1s->cs    = NULL;
-	h1s->rxbuf = BUF_NULL;
 	h1s->flags = H1S_F_NONE;
 
 	h1s->recv_wait = NULL;
@@ -283,18 +268,18 @@
 			h1s->res.err_pos = -1;
 	}
 
+	/* If a conn_stream already exists, attach it to this H1S. Otherwise we
+	 * create a new one.
+	 */
 	if (cs) {
-		/* If a conn_stream already exists, attach it to this H1S */
 		cs->ctx = h1s;
 		h1s->cs = cs;
 	}
-#if 1
 	else {
 		cs = h1s_new_cs(h1s);
 		if (!cs)
 			goto fail;
 	}
-#endif
 	return h1s;
 
   fail:
@@ -308,7 +293,6 @@
 		struct h1c *h1c = h1s->h1c;
 
 		h1c->h1s = NULL;
-		h1c->flags &= ~(H1C_F_RX_FULL|H1C_F_RX_ALLOC);
 
 		if (h1s->recv_wait != NULL)
 			h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
@@ -319,7 +303,6 @@
 		if (h1s->flags & (H1S_F_REQ_ERROR|H1S_F_RES_ERROR))
 			h1c->flags |= H1C_F_CS_ERROR;
 
-		h1_release_buf(h1c, &h1s->rxbuf);
 		cs_free(h1s->cs);
 		pool_free(pool_head_h1s, h1s);
 	}
@@ -1030,7 +1013,7 @@
 
 	if (h1s->res.state == H1_MSG_DONE &&
 	    (h1s->status < 200 && (h1s->status == 100 || h1s->status >= 102)) &&
-	    ((!conn_is_back(h1c->conn) && !b_data(&h1c->obuf)) || !b_data(&h1s->rxbuf))) {
+	    (conn_is_back(h1c->conn) || !b_data(&h1c->obuf))) {
 		/* For 100-Continue response or any other informational 1xx
 		 * response which is non-final, don't reset the request, the
 		 * transaction is not finished. We take care the response was
@@ -1039,7 +1022,7 @@
 		h1m_init_res(&h1s->res);
 		h1s->res.flags |= H1_MF_NO_PHDR;
 	}
-	else if (!b_data(&h1s->rxbuf) && !b_data(&h1c->obuf) &&
+	else if (!b_data(&h1c->obuf) &&
 		 h1s->req.state == H1_MSG_DONE && h1s->res.state == H1_MSG_DONE) {
 		if (h1s->flags & H1S_F_WANT_TUN) {
 			h1m_init_req(&h1s->req);
@@ -1052,38 +1035,29 @@
 
 /*
  * Process incoming data. It parses data and transfer them from h1c->ibuf into
- * h1s->rxbuf. It returns the number of bytes parsed and transferred if > 0, or
- * 0 if it couldn't proceed.
+ * <buf>. It returns the number of bytes parsed and transferred if > 0, or 0 if
+ * it couldn't proceed.
  */
-static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, size_t count)
+static size_t h1_process_input(struct h1c *h1c, struct buffer *buf, int flags)
 {
-	struct h1s *h1s = NULL;
+	struct h1s *h1s = h1c->h1s;
 	struct h1m *h1m;
 	struct htx *htx;
 	size_t total = 0;
 	size_t ret = 0;
-	size_t max;
+	size_t count, max;
 	int errflag;
 
-	h1s = NULL;
-
-	/* Create a new H1S if not already done */
-	if (!h1c->h1s && !h1s_create(h1c, NULL))
-		goto fatal_err;
-	h1s = h1c->h1s;
-#if 0
-	/* Create the CS if not already attached to the H1S */
-	if (!h1s->cs && !h1s_new_cs(h1s))
-		goto fatal_err;
-#endif
-	if (!count)
-		goto end;
-	if (!h1_get_buf(h1c, &h1s->rxbuf)) {
-		h1c->flags |= H1C_F_RX_ALLOC;
-		goto end;
+	htx = htx_from_buf(buf);
+	count = b_data(&h1c->ibuf);
+	max = htx_free_space(htx);
+	if (flags & CO_RFL_KEEP_RSV) {
+		if (max < global.tune.maxrewrite)
+			goto end;
+		max -= global.tune.maxrewrite;
 	}
-
-	htx = htx_from_buf(&h1s->rxbuf);
+	if (count > max)
+		count = max;
 
 	if (!conn_is_back(h1c->conn)) {
 		h1m = &h1s->req;
@@ -1094,32 +1068,21 @@
 		errflag = H1S_F_RES_ERROR;
 	}
 
-	max = count;
-	while (!(h1s->flags & errflag) && max) {
+	while (!(h1s->flags & errflag) && count) {
 		if (h1m->state <= H1_MSG_LAST_LF) {
-			ret = h1_process_headers(h1s, h1m, htx, buf, &total, max);
+			ret = h1_process_headers(h1s, h1m, htx, &h1c->ibuf, &total, count);
 			if (!ret)
 				break;
-#if 0
-			/* Create the CS if not already attached to the H1S */
-			if (!h1s->cs && !h1s_new_cs(h1s))
-				goto fatal_err;
-#endif
 		}
 		else if (h1m->state <= H1_MSG_TRAILERS) {
-			/* Do not parse the body if the header part is not yet
-			 * transferred to the stream.
-			 */
-			if (!(h1s->flags & H1S_F_MSG_XFERED))
-				break;
-			ret = h1_process_data(h1s, h1m, htx, buf, &total, max);
+			ret = h1_process_data(h1s, h1m, htx, &h1c->ibuf, &total, count);
 			if (!ret)
 				break;
 		}
 		else if (h1m->state == H1_MSG_DONE)
 			break;
 		else if (h1m->state == H1_MSG_TUNNEL) {
-			ret = h1_process_data(h1s, h1m, htx, buf, &total, max);
+			ret = h1_process_data(h1s, h1m, htx, &h1c->ibuf, &total, count);
 			if (!ret)
 				break;
 		}
@@ -1128,44 +1091,45 @@
 			break;
 		}
 
-		max -= ret;
+		count -= ret;
 	}
 
 	if (h1s->flags & errflag)
 		goto parsing_err;
 
-	b_del(buf, total);
-	if (htx_is_not_empty(htx)) {
-		b_set_data(&h1s->rxbuf, b_size(&h1s->rxbuf));
-		if (!htx_free_data_space(htx))
-			h1c->flags |= H1C_F_RX_FULL;
+	b_del(&h1c->ibuf, total);
+
+  end:
+	if (htx_is_not_empty(htx))
+		b_set_data(buf, b_size(buf));
+	else {
+		htx_reset(htx);
+		b_set_data(buf, 0);
 	}
-	else
-		h1_release_buf(h1c, &h1s->rxbuf);
 
-	ret = count - max;
-	if (h1s->recv_wait) {
-		h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
-		tasklet_wakeup(h1s->recv_wait->task);
-		h1s->recv_wait = NULL;
+	if (h1c->flags & H1C_F_IN_FULL && b_room(&h1c->ibuf)) {
+		h1c->flags &= ~H1C_F_IN_FULL;
+		tasklet_wakeup(h1c->wait_event.task);
 	}
-  end:
-	return ret;
 
-  fatal_err:
-	h1c->flags |= H1C_F_CS_ERROR;
-	sess_log(h1c->conn->owner);
-	return 0;
+	if (b_data(&h1c->ibuf))
+		h1s->cs->flags |= CS_FL_RCV_MORE;
+	else {
+		h1_release_buf(h1c, &h1c->ibuf);
+		h1_sync_messages(h1c);
+
+		h1s->cs->flags &= ~CS_FL_RCV_MORE;
+		if (h1s->cs->flags & CS_FL_REOS)
+			h1s->cs->flags |= CS_FL_EOS;
+	}
+	return total;
 
   parsing_err:
 	// FIXME: create an error snapshot here
 	b_reset(&h1c->ibuf);
-	h1s->cs->flags |= CS_FL_REOS;
-	if (h1s->recv_wait) {
-		h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
-		tasklet_wakeup(h1s->recv_wait->task);
-		h1s->recv_wait = NULL;
-	}
+	htx->flags |= HTX_FL_PARSING_ERROR;
+	b_set_data(buf, b_size(buf));
+	h1s->cs->flags |= CS_FL_EOS;
 	return 0;
 }
 
@@ -1345,76 +1309,6 @@
 	return total;
 }
 
-/*
- * Transfer data from h1s->rxbuf into the channel buffer. It returns the number
- * of bytes transferred.
- */
-static size_t h1_xfer(struct h1s *h1s, struct buffer *buf, int flags)
-{
-	struct h1c *h1c = h1s->h1c;
-	struct h1m *h1m;
-	struct conn_stream *cs = h1s->cs;
-	struct htx *mux_htx, *chn_htx;
-	struct htx_ret htx_ret;
-	size_t count, ret = 0;
-
-	h1m = (!conn_is_back(h1c->conn) ? &h1s->req : &h1s->res);
-	mux_htx = htx_from_buf(&h1s->rxbuf);
-	chn_htx = htx_from_buf(buf);
-
-	if (h1s->flags & (H1S_F_REQ_ERROR|H1S_F_RES_ERROR)) {
-		chn_htx->flags |= HTX_FL_PARSING_ERROR;
-		b_set_data(buf, b_size(buf));
-		goto end;
-	}
-	if (htx_is_empty(mux_htx))
-		goto end;
-	count = htx_free_space(chn_htx);
-	if (flags & CO_RFL_KEEP_RSV) {
-		if (count < global.tune.maxrewrite)
-			goto end;
-		count -= global.tune.maxrewrite;
-	}
-
-	// FIXME: if chn empty and count > htx => b_xfer !
-	if (!(h1s->flags & H1S_F_MSG_XFERED)) {
-		htx_ret = htx_xfer_blks(chn_htx, mux_htx, count,
-					((h1m->state == H1_MSG_DONE) ? HTX_BLK_EOM : HTX_BLK_EOH));
-		ret = htx_ret.ret;
-		if (htx_ret.blk && htx_get_blk_type(htx_ret.blk) >= HTX_BLK_EOH)
-			h1s->flags |= H1S_F_MSG_XFERED;
-	}
-	else {
-		htx_ret = htx_xfer_blks(chn_htx, mux_htx, count, HTX_BLK_EOM);
-		ret = htx_ret.ret;
-	}
-	chn_htx->extra = mux_htx->extra;
-	if (h1m->flags & H1_MF_XFER_LEN)
-		chn_htx->extra += mux_htx->data;
-
-	if (htx_is_not_empty(chn_htx))
-		b_set_data(buf, b_size(buf));
-  end:
-	if (h1c->flags & H1C_F_RX_FULL && htx_free_data_space(mux_htx)) {
-		h1c->flags &= ~H1C_F_RX_FULL;
-		tasklet_wakeup(h1c->wait_event.task);
-	}
-
-	if (htx_is_not_empty(mux_htx)) {
-		cs->flags |= CS_FL_RCV_MORE;
-	}
-	else {
-		h1c->flags &= ~H1C_F_RX_FULL;
-		h1_release_buf(h1c, &h1s->rxbuf);
-		h1_sync_messages(h1c);
-
-		cs->flags &= ~CS_FL_RCV_MORE;
-		if (!b_data(&h1c->ibuf) && (cs->flags & CS_FL_REOS))
-			cs->flags |= CS_FL_EOS;
-	}
-	return ret;
-}
-
 /*********************************************************/
 /* functions below are I/O callbacks from the connection */
 /*********************************************************/
@@ -1430,11 +1324,8 @@
 	if (h1c->wait_event.wait_reason & SUB_CAN_RECV)
 		return 0;
 
-	if (!h1_recv_allowed(h1c)) {
-		if (h1c->h1s && b_data(&h1c->h1s->rxbuf))
-			rcvd = 1;
+	if (!h1_recv_allowed(h1c))
 		goto end;
-	}
 
 	if (h1c->h1s && (h1c->h1s->flags & H1S_F_BUF_FLUSH)) {
 		rcvd = 1;
@@ -1500,12 +1391,6 @@
 		h1c->flags &= ~H1C_F_OUT_FULL;
 		b_del(&h1c->obuf, ret);
 		sent = 1;
-
-		if (h1c->h1s && h1c->h1s->send_wait) {
-			h1c->h1s->send_wait->wait_reason &= ~SUB_CAN_SEND;
-			tasklet_wakeup(h1c->h1s->send_wait->task);
-			h1c->h1s->send_wait = NULL;
-		}
 	}
 
   end:
@@ -1563,40 +1448,33 @@
 {
 	struct connection *conn = h1c->conn;
 
-	if (!(h1c->flags & (H1C_F_CS_ERROR|H1C_F_RX_FULL|H1C_F_RX_ALLOC))) {
-		size_t ret;
-
-		ret = h1_process_input(h1c, &h1c->ibuf, b_data(&h1c->ibuf));
-		if (ret > 0) {
-			h1c->flags &= ~H1C_F_IN_FULL;
-			if (!b_data(&h1c->ibuf))
-				h1_release_buf(h1c, &h1c->ibuf);
-		}
-	}
-
-	h1_send(h1c);
-
 	if (!conn->mux_ctx)
 		return -1;
 
 	if (h1c->flags & H1C_F_CS_WAIT_CONN) {
-		if (conn->flags & (CO_FL_CONNECTED|CO_FL_ERROR)) {
-			h1c->flags &= ~H1C_F_CS_WAIT_CONN;
-			h1_wake_stream(h1c);
-		}
-		goto end;
+		if (!(conn->flags & (CO_FL_CONNECTED|CO_FL_ERROR)))
+			goto end;
+		h1c->flags &= ~H1C_F_CS_WAIT_CONN;
 	}
 
-	if ((h1c->flags & H1C_F_CS_ERROR) || (conn->flags & CO_FL_ERROR) || conn_xprt_read0_pending(conn)) {
-		h1_wake_stream(h1c);
-		if (!h1c->h1s || !h1c->h1s->cs) {
-			h1_release(conn);
-			return -1;
+	if (!h1c->h1s) {
+		if (h1c->flags & H1C_F_CS_ERROR   ||
+		    conn->flags & CO_FL_ERROR     ||
+		    conn_xprt_read0_pending(conn))
+			goto release;
+		if (!(h1c->flags & (H1C_F_CS_SHUTW_NOW|H1C_F_CS_SHUTW))) {
+			if (!h1s_create(h1c, NULL))
+				goto release;
 		}
 	}
 
+	h1_wake_stream(h1c);
   end:
 	return 0;
+
+  release:
+	h1_release(conn);
+	return -1;
 }
 
 static struct task *h1_io_cb(struct task *t, void *ctx, unsigned short status)
@@ -1608,7 +1486,7 @@
 		ret = h1_send(h1c);
 	if (!(h1c->wait_event.wait_reason & SUB_CAN_RECV))
 		ret |= h1_recv(h1c);
-	if (ret || b_data(&h1c->ibuf) || (h1c->h1s && b_data(&h1c->h1s->rxbuf)))
+	if (ret/* || b_data(&h1c->ibuf)*/)
 		h1_process(h1c);
 	return NULL;
 }
@@ -1618,7 +1496,8 @@
 {
 	struct h1c *h1c = conn->mux_ctx;
 
-	return (h1_process(h1c));
+	h1_send(h1c);
+	return h1_process(h1c);
 }
 
 /*******************************************/
@@ -1817,20 +1696,18 @@
 static size_t h1_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
 {
 	struct h1s *h1s = cs->ctx;
+	struct h1c *h1c = h1s->h1c;
 	size_t ret = 0;
 
-	if (!h1s)
-		return ret;
-
-	if (!(h1s->h1c->flags & H1C_F_RX_ALLOC))
-		ret = h1_xfer(h1s, buf, flags);
+	if (!(h1c->flags & H1C_F_IN_ALLOC))
+		ret = h1_process_input(h1c, buf, flags);
 
 	if (flags & CO_RFL_BUF_FLUSH)
 		h1s->flags |= H1S_F_BUF_FLUSH;
 	else if (ret > 0 || (h1s->flags & H1S_F_BUF_FLUSH)) {
 		h1s->flags &= ~H1S_F_BUF_FLUSH;
-		if (!(h1s->h1c->wait_event.wait_reason & SUB_CAN_RECV))
-			tasklet_wakeup(h1s->h1c->wait_event.task);
+		if (!(h1c->wait_event.wait_reason & SUB_CAN_RECV))
+			tasklet_wakeup(h1c->wait_event.task);
 	}
 	return ret;
 }
@@ -1851,12 +1728,16 @@
 	if (h1c->flags & H1C_F_CS_WAIT_CONN)
 		return 0;
 
-	if (!(h1c->flags & (H1C_F_OUT_FULL|H1C_F_OUT_ALLOC)) && b_data(buf))
+	if (!(h1c->flags & (H1C_F_OUT_FULL|H1C_F_OUT_ALLOC)))
 		ret = h1_process_output(h1c, buf, count);
 	if (ret > 0) {
 		h1_send(h1c);
 
-		/* We need to do that because of the infinite forwarding. */
+		/* We need to do that because of the infinite forwarding. <buf>
+		 * contains HTX messages so when infinite forwarding is enabled,
+		 * count is equal to the buffer size. From outside, the buffer
+		 * appears as full.
+		 */
 		if (!b_data(buf))
 			ret = count;
 	}
@@ -1871,7 +1752,7 @@
 	struct h1m *h1m = (!conn_is_back(cs->conn) ? &h1s->req : &h1s->res);
 	int ret = 0;
 
-	if (b_data(&h1s->rxbuf) || b_data(&h1s->h1c->ibuf))
+	if (b_data(&h1s->h1c->ibuf))
 		goto end;
 	if (h1m->state == H1_MSG_DATA && count > h1m->curr_len)
 		count = h1m->curr_len;