BUG/MEDIUM: mux-h1: Fix splicing by properly detecting end of message

Since the 2.4.4, the splicing support in the H1 multiplexer is buggy because
end of the message is not properly detected.

On the 2.4, when the requests is spliced, there is no issue. But when the
response is spliced, the client connection is always closed at the end of the
message. Note the response is still fully sent.

On the 2.5 and higher, when the last requests on a connection is spliced, a
client abort is reported. For other requests there is no issue. In all cases,
the requests are fully sent. When the response is spliced, the server connection
hangs till the server timeout and a server abort is reported. The response is
fully sent with no delay.

The root cause is the EOM block suppression. There is no longer extra block to
be sure to call a last time rcv_buf()/snd_buf() callback functions. At the end,
to fix the issue, we must now detect end of the message in rcv_pipe() and
snd_pipe() callback functions. To do so, we rely on the announced message length
to know when the payload is finished. This works because the chunk-encoded
messages are not spliced.

This patch must be backported as far as 2.4 after an observation period.

(cherry picked from commit 140f1a585248a7da1fefbb5f20f260dfafb32a9a)
Signed-off-by: Willy Tarreau <w@1wt.eu>
(cherry picked from commit beb6d0615404e090cac456857864c0b6eda1679c)
Signed-off-by: Willy Tarreau <w@1wt.eu>
diff --git a/src/mux_h1.c b/src/mux_h1.c
index 07b2359..7372d19 100644
--- a/src/mux_h1.c
+++ b/src/mux_h1.c
@@ -1822,9 +1822,19 @@
 			void *old_area;
 
 			TRACE_PROTO("sending message data (zero-copy)", H1_EV_TX_DATA|H1_EV_TX_BODY, h1c->conn, h1s, chn_htx, (size_t[]){count});
-			if (h1m->state == H1_MSG_DATA && chn_htx->flags & HTX_FL_EOM) {
-				TRACE_DEVEL("last message block", H1_EV_TX_DATA|H1_EV_TX_BODY, h1c->conn, h1s);
-				last_data = 1;
+			if (h1m->state == H1_MSG_DATA) {
+				if (h1m->flags & H1_MF_CLEN) {
+					if (count > h1m->curr_len) {
+						TRACE_ERROR("too much payload, more than announced",
+							    H1_EV_TX_DATA|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
+						goto error;
+					}
+					h1m->curr_len -= count;
+				}
+				if (chn_htx->flags & HTX_FL_EOM) {
+					TRACE_DEVEL("last message block", H1_EV_TX_DATA|H1_EV_TX_BODY, h1c->conn, h1s);
+					last_data = 1;
+				}
 			}
 
 			old_area = h1c->obuf.area;
@@ -2164,9 +2174,19 @@
 					last_data = 0;
 				}
 
-				if (h1m->state == H1_MSG_DATA && (h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP)) {
-					TRACE_PROTO("Skip data for bodyless response", H1_EV_TX_DATA|H1_EV_TX_BODY, h1c->conn, h1s, chn_htx);
-					goto skip_data;
+				if (h1m->state == H1_MSG_DATA) {
+					if (h1m->flags & H1_MF_CLEN) {
+						if (vlen > h1m->curr_len) {
+							TRACE_ERROR("too much payload, more than announced",
+								    H1_EV_TX_DATA|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
+							goto error;
+						}
+						h1m->curr_len -= vlen;
+					}
+					if ((h1m->flags & H1_MF_RESP) && (h1s->flags & H1S_F_BODYLESS_RESP)) {
+						TRACE_PROTO("Skip data for bodyless response", H1_EV_TX_DATA|H1_EV_TX_BODY, h1c->conn, h1s, chn_htx);
+						goto skip_data;
+					}
 				}
 
 				chklen = 0;
@@ -2293,7 +2313,7 @@
 				h1c->flags |= H1C_F_ST_ERROR;
 				TRACE_ERROR("processing output error, set error on h1c/h1s",
 					    H1_EV_TX_DATA|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
-				break;
+				goto end;
 		}
 
 	  nextblk:
@@ -3521,11 +3541,22 @@
 	if (h1m->state == H1_MSG_DATA && count > h1m->curr_len)
 		count = h1m->curr_len;
 	ret = cs->conn->xprt->rcv_pipe(cs->conn, cs->conn->xprt_ctx, pipe, count);
-	if (h1m->state == H1_MSG_DATA && ret >= 0) {
-		h1m->curr_len -= ret;
-		if (!h1m->curr_len) {
-			h1c->flags &= ~H1C_F_WANT_SPLICE;
-			TRACE_STATE("Allow xprt rcv_buf on !curr_len", H1_EV_STRM_RECV, cs->conn, h1s);
+	if (ret >= 0) {
+		if (h1m->state == H1_MSG_DATA) {
+			if (ret > h1m->curr_len) {
+				h1s->flags |= H1S_F_PARSING_ERROR;
+				h1c->flags |= H1C_F_ST_ERROR;
+				cs->flags  |= CS_FL_ERROR;
+				TRACE_ERROR("too much payload, more than announced",
+					    H1_EV_RX_DATA|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, cs->conn, h1s);
+				goto end;
+			}
+			h1m->curr_len -= ret;
+			if (!h1m->curr_len) {
+				h1m->state = H1_MSG_DONE;
+				h1c->flags &= ~H1C_F_WANT_SPLICE;
+				TRACE_STATE("payload fully received", H1_EV_STRM_RECV, cs->conn, h1s);
+			}
 		}
 	}
 
@@ -3552,19 +3583,36 @@
 static int h1_snd_pipe(struct conn_stream *cs, struct pipe *pipe)
 {
 	struct h1s *h1s = cs->ctx;
+	struct h1c *h1c = h1s->h1c;
+	struct h1m *h1m = (!(h1c->flags & H1C_F_IS_BACK) ? &h1s->res : &h1s->req);
 	int ret = 0;
 
 	TRACE_ENTER(H1_EV_STRM_SEND, cs->conn, h1s, 0, (size_t[]){pipe->data});
 
-	if (b_data(&h1s->h1c->obuf)) {
-		if (!(h1s->h1c->wait_event.events & SUB_RETRY_SEND)) {
+	if (b_data(&h1c->obuf)) {
+		if (!(h1c->wait_event.events & SUB_RETRY_SEND)) {
 			TRACE_STATE("more data to send, subscribing", H1_EV_STRM_SEND, cs->conn, h1s);
-			cs->conn->xprt->subscribe(cs->conn, cs->conn->xprt_ctx, SUB_RETRY_SEND, &h1s->h1c->wait_event);
+			cs->conn->xprt->subscribe(cs->conn, cs->conn->xprt_ctx, SUB_RETRY_SEND, &h1c->wait_event);
 		}
 		goto end;
 	}
 
 	ret = cs->conn->xprt->snd_pipe(cs->conn, cs->conn->xprt_ctx, pipe);
+	if (h1m->state == H1_MSG_DATA) {
+		if (ret > h1m->curr_len) {
+			h1s->flags |= H1S_F_PROCESSING_ERROR;
+			h1c->flags |= H1C_F_ST_ERROR;
+			cs->flags  |= CS_FL_ERROR;
+			TRACE_ERROR("too much payload, more than announced",
+				    H1_EV_TX_DATA|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, cs->conn, h1s);
+			goto end;
+		}
+		h1m->curr_len -= ret;
+		if (!h1m->curr_len) {
+			h1m->state = H1_MSG_DONE;
+			TRACE_STATE("payload fully xferred", H1_EV_TX_DATA|H1_EV_TX_BODY, cs->conn, h1s);
+		}
+	}
 
   end:
 	TRACE_LEAVE(H1_EV_STRM_SEND, cs->conn, h1s, 0, (size_t[]){ret});