MINOR: mux-quic: simplify decode_qcs API

Slightly modify decode_qcs function used by transcoders. The MUX now
gives a buffer instance on which each transcoder is free to work on it.
At the return of the function, the MUX removes consume data from its own
buffer.

This reduces the number of invocation to qcs_consume at the end of a
full demuxing process. The API is also cleaner with the transcoders not
responsible of calling it with the risk of having the input buffer
freed if empty.
diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h
index b32d4f9..9a69b76 100644
--- a/include/haproxy/mux_quic-t.h
+++ b/include/haproxy/mux_quic-t.h
@@ -102,6 +102,9 @@
 #define QC_SF_DEM_FULL          0x00000020  /* demux blocked on request channel buffer full */
 #define QC_SF_READ_ABORTED      0x00000040  /* stream rejected by app layer */
 
+/* Maximum size of stream Rx buffer. */
+#define QC_S_RX_BUF_SZ   (global.tune.bufsize - NCB_RESERVED_SZ)
+
 struct qcs {
 	struct qcc *qcc;
 	struct sedesc *sd;
@@ -137,7 +140,7 @@
 struct qcc_app_ops {
 	int (*init)(struct qcc *qcc);
 	int (*attach)(struct qcs *qcs, void *conn_ctx);
-	int (*decode_qcs)(struct qcs *qcs, int fin);
+	int (*decode_qcs)(struct qcs *qcs, struct buffer *b, int fin);
 	size_t (*snd_buf)(struct stconn *sc, struct buffer *buf, size_t count, int flags);
 	void (*detach)(struct qcs *qcs);
 	int (*finalize)(void *ctx);
diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h
index 3c713a3..0846ca7 100644
--- a/include/haproxy/mux_quic.h
+++ b/include/haproxy/mux_quic.h
@@ -23,7 +23,6 @@
 int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
 void qcs_notify_recv(struct qcs *qcs);
 void qcs_notify_send(struct qcs *qcs);
-void qcs_consume(struct qcs *qcs, uint64_t bytes);
 
 void qcc_emit_cc_app(struct qcc *qcc, int err);
 int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
diff --git a/src/h3.c b/src/h3.c
index b9a9fed..655bf26 100644
--- a/src/h3.c
+++ b/src/h3.c
@@ -26,7 +26,6 @@
 #include <haproxy/intops.h>
 #include <haproxy/istbuf.h>
 #include <haproxy/mux_quic.h>
-#include <haproxy/ncbuf.h>
 #include <haproxy/pool.h>
 #include <haproxy/qpack-dec.h>
 #include <haproxy/qpack-enc.h>
@@ -144,22 +143,15 @@
 
 DECLARE_STATIC_POOL(pool_head_h3s, "h3s", sizeof(struct h3s));
 
-/* Simple function to duplicate a buffer */
-static inline struct buffer h3_b_dup(const struct ncbuf *b)
-{
-	return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0));
-}
-
-/* Initialize an uni-stream <qcs> by reading its type from <rxbuf>.
+/* Initialize an uni-stream <qcs> by reading its type from <b>.
  *
  * Returns 0 on success else non-zero.
  */
 static int h3_init_uni_stream(struct h3c *h3c, struct qcs *qcs,
-                              struct ncbuf *rxbuf)
+                              struct buffer *b)
 {
 	/* decode unidirectional stream type */
 	struct h3s *h3s = qcs->ctx;
-	struct buffer b;
 	uint64_t type;
 	size_t len = 0, ret;
 
@@ -168,8 +160,7 @@
 	BUG_ON_HOT(!quic_stream_is_uni(qcs->id) ||
 	           h3s->flags & H3_SF_UNI_INIT);
 
-	b = h3_b_dup(rxbuf);
-	ret = b_quic_dec_int(&type, &b, &len);
+	ret = b_quic_dec_int(&type, b, &len);
 	if (!ret) {
 		ABORT_NOW();
 	}
@@ -220,7 +211,6 @@
 	};
 
 	h3s->flags |= H3_SF_UNI_INIT;
-	qcs_consume(qcs, len);
 
 	TRACE_LEAVE(H3_EV_H3S_NEW, qcs->qcc->conn, qcs);
 	return 0;
@@ -231,7 +221,7 @@
  *
  * Returns 0 on success else non-zero.
  */
-static int h3_parse_uni_stream_no_h3(struct qcs *qcs, struct ncbuf *rxbuf)
+static int h3_parse_uni_stream_no_h3(struct qcs *qcs, struct buffer *b)
 {
 	struct h3s *h3s = qcs->ctx;
 
@@ -263,14 +253,13 @@
  * consumed.
  */
 static inline size_t h3_decode_frm_header(uint64_t *ftype, uint64_t *flen,
-                                          struct ncbuf *rxbuf)
+                                          struct buffer *b)
 {
 	size_t hlen;
-	struct buffer b = h3_b_dup(rxbuf);
 
 	hlen = 0;
-	if (!b_quic_dec_int(ftype, &b, &hlen) ||
-	    !b_quic_dec_int(flen, &b, &hlen)) {
+	if (!b_quic_dec_int(ftype, b, &hlen) ||
+	    !b_quic_dec_int(flen, b, &hlen)) {
 		return 0;
 	}
 
@@ -333,8 +322,8 @@
  *
  * Returns the number of bytes handled or a negative error code.
  */
-static int h3_headers_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
-                             char fin)
+static int h3_headers_to_htx(struct qcs *qcs, const struct buffer *buf,
+                             uint64_t len, char fin)
 {
 	struct buffer htx_buf = BUF_NULL;
 	struct buffer *tmp = get_trash_chunk();
@@ -350,8 +339,8 @@
 	TRACE_ENTER(H3_EV_RX_FRAME|H3_EV_RX_HDR, qcs->qcc->conn, qcs);
 
 	/* TODO support buffer wrapping */
-	BUG_ON(ncb_head(buf) + len >= ncb_wrap(buf));
-	if (qpack_decode_fs((const unsigned char *)ncb_head(buf), len, tmp, list) < 0)
+	BUG_ON(b_head(buf) + len >= b_wrap(buf));
+	if (qpack_decode_fs((const unsigned char *)b_head(buf), len, tmp, list) < 0)
 		return -1;
 
 	qc_get_buf(qcs, &htx_buf);
@@ -431,8 +420,8 @@
  *
  * Returns the number of bytes handled or a negative error code.
  */
-static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
-                          char fin)
+static int h3_data_to_htx(struct qcs *qcs, const struct buffer *buf,
+                          uint64_t len, char fin)
 {
 	struct buffer *appbuf;
 	struct htx *htx = NULL;
@@ -446,12 +435,12 @@
 	BUG_ON(!appbuf);
 	htx = htx_from_buf(appbuf);
 
-	if (len > ncb_data(buf, 0)) {
-		len = ncb_data(buf, 0);
+	if (len > b_data(buf)) {
+		len = b_data(buf);
 		fin = 0;
 	}
 
-	head = ncb_head(buf);
+	head = b_head(buf);
  retry:
 	htx_space = htx_free_data_space(htx);
 	if (!htx_space) {
@@ -464,16 +453,16 @@
 		fin = 0;
 	}
 
-	if (head + len > ncb_wrap(buf)) {
-		size_t contig = ncb_wrap(buf) - head;
-		htx_sent = htx_add_data(htx, ist2(ncb_head(buf), contig));
+	if (head + len > b_wrap(buf)) {
+		size_t contig = b_wrap(buf) - head;
+		htx_sent = htx_add_data(htx, ist2(b_head(buf), contig));
 		if (htx_sent < contig) {
 			qcs->flags |= QC_SF_DEM_FULL;
 			goto out;
 		}
 
 		len -= contig;
-		head = ncb_orig(buf);
+		head = b_orig(buf);
 		goto retry;
 	}
 
@@ -493,11 +482,11 @@
 	return htx_sent;
 }
 
-/* Parse a SETTINGS frame of length <len> of payload <rxbuf>.
+/* Parse a SETTINGS frame of length <len> of payload <buf>.
  *
  * Returns the number of bytes handled or a negative error code.
  */
-static size_t h3_parse_settings_frm(struct h3c *h3c, const struct ncbuf *rxbuf,
+static size_t h3_parse_settings_frm(struct h3c *h3c, const struct buffer *buf,
                                     size_t len)
 {
 	struct buffer b;
@@ -507,8 +496,11 @@
 
 	TRACE_ENTER(H3_EV_RX_FRAME|H3_EV_RX_SETTINGS, h3c->qcc->conn);
 
-	b = h3_b_dup(rxbuf);
-	b_set_data(&b, len);
+	/* Work on a copy of <buf>. */
+	b = b_make(b_orig(buf), b_size(buf), b_head_ofs(buf), b_data(buf));
+
+	/* TODO handle incomplete SETTINGS frame */
+	BUG_ON(len < b_data(&b));
 
 	while (b_data(&b)) {
 		if (!b_quic_dec_int(&id, &b, &ret) || !b_quic_dec_int(&value, &b, &ret)) {
@@ -576,36 +568,35 @@
  *
  * Returns 0 on success else non-zero.
  */
-static int h3_decode_qcs(struct qcs *qcs, int fin)
+static int h3_decode_qcs(struct qcs *qcs, struct buffer *b, int fin)
 {
-	struct ncbuf *rxbuf = &qcs->rx.ncbuf;
 	struct h3s *h3s = qcs->ctx;
 	struct h3c *h3c = h3s->h3c;
 	ssize_t ret;
 
 	h3_debug_printf(stderr, "%s: STREAM ID: %lu\n", __func__, qcs->id);
-	if (!ncb_data(rxbuf, 0))
+	if (!b_data(b))
 		return 0;
 
 	if (quic_stream_is_uni(qcs->id) && !(h3s->flags & H3_SF_UNI_INIT)) {
-		if (h3_init_uni_stream(h3c, qcs, rxbuf))
+		if (h3_init_uni_stream(h3c, qcs, b))
 			return 1;
 	}
 
 	if (quic_stream_is_uni(qcs->id) && (h3s->flags & H3_SF_UNI_NO_H3)) {
 		/* For non-h3 STREAM, parse it and return immediately. */
-		if (h3_parse_uni_stream_no_h3(qcs, rxbuf))
+		if (h3_parse_uni_stream_no_h3(qcs, b))
 			return 1;
 		return 0;
 	}
 
-	while (ncb_data(rxbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) {
+	while (b_data(b) && !(qcs->flags & QC_SF_DEM_FULL)) {
 		uint64_t ftype, flen;
 		char last_stream_frame = 0;
 
 		/* Work on a copy of <rxbuf> */
 		if (!h3s->demux_frame_len) {
-			size_t hlen = h3_decode_frm_header(&ftype, &flen, rxbuf);
+			size_t hlen = h3_decode_frm_header(&ftype, &flen, b);
 			if (!hlen)
 				break;
 
@@ -620,8 +611,7 @@
 				return 1;
 			}
 
-			qcs_consume(qcs, hlen);
-			if (!ncb_data(rxbuf, 0))
+			if (!b_data(b))
 				break;
 		}
 
@@ -631,31 +621,31 @@
 		/* Do not demux incomplete frames except H3 DATA which can be
 		 * fragmented in multiple HTX blocks.
 		 */
-		if (flen > ncb_data(rxbuf, 0) && ftype != H3_FT_DATA) {
+		if (flen > b_data(b) && ftype != H3_FT_DATA) {
 			/* Reject frames bigger than bufsize.
 			 *
 			 * TODO HEADERS should in complement be limited with H3
 			 * SETTINGS_MAX_FIELD_SECTION_SIZE parameter to prevent
 			 * excessive decompressed size.
 			 */
-			if (flen > ncb_size(rxbuf)) {
+			if (flen > QC_S_RX_BUF_SZ) {
 				qcc_emit_cc_app(qcs->qcc, H3_EXCESSIVE_LOAD);
 				return 1;
 			}
 			break;
 		}
 
-		last_stream_frame = (fin && flen == ncb_total_data(rxbuf));
+		last_stream_frame = (fin && flen == b_data(b));
 
 		h3_inc_frame_type_cnt(h3c->prx_counters, ftype);
 		switch (ftype) {
 		case H3_FT_DATA:
-			ret = h3_data_to_htx(qcs, rxbuf, flen, last_stream_frame);
+			ret = h3_data_to_htx(qcs, b, flen, last_stream_frame);
 			/* TODO handle error reporting. Stream closure required. */
 			if (ret < 0) { ABORT_NOW(); }
 			break;
 		case H3_FT_HEADERS:
-			ret = h3_headers_to_htx(qcs, rxbuf, flen, last_stream_frame);
+			ret = h3_headers_to_htx(qcs, b, flen, last_stream_frame);
 			/* TODO handle error reporting. Stream closure required. */
 			if (ret < 0) { ABORT_NOW(); }
 			break;
@@ -667,7 +657,7 @@
 			ret = flen;
 			break;
 		case H3_FT_SETTINGS:
-			ret = h3_parse_settings_frm(qcs->qcc->ctx, rxbuf, flen);
+			ret = h3_parse_settings_frm(qcs->qcc->ctx, b, flen);
 			if (ret < 0) {
 				qcc_emit_cc_app(qcs->qcc, h3c->err);
 				return 1;
@@ -688,7 +678,7 @@
 		if (ret) {
 			BUG_ON(h3s->demux_frame_len < ret);
 			h3s->demux_frame_len -= ret;
-			qcs_consume(qcs, ret);
+			b_del(b, ret);
 		}
 	}
 
diff --git a/src/hq_interop.c b/src/hq_interop.c
index f5c0e79..4b4b522 100644
--- a/src/hq_interop.c
+++ b/src/hq_interop.c
@@ -7,20 +7,18 @@
 #include <haproxy/htx.h>
 #include <haproxy/http.h>
 #include <haproxy/mux_quic.h>
-#include <haproxy/ncbuf.h>
 
-static int hq_interop_decode_qcs(struct qcs *qcs, int fin)
+static int hq_interop_decode_qcs(struct qcs *qcs, struct buffer *b, int fin)
 {
-	struct ncbuf *rxbuf = &qcs->rx.ncbuf;
 	struct htx *htx;
 	struct htx_sl *sl;
 	struct stconn *sc;
 	struct buffer htx_buf = BUF_NULL;
 	struct ist path;
-	char *ptr = ncb_head(rxbuf);
-	char *end = ncb_wrap(rxbuf);
-	size_t size = ncb_size(rxbuf);
-	size_t data = ncb_data(rxbuf, 0);
+	char *ptr = b_head(b);
+	char *end = b_wrap(b);
+	size_t size = b_size(b);
+	size_t data = b_data(b);
 
 	b_alloc(&htx_buf);
 	htx = htx_from_buf(&htx_buf);
@@ -76,7 +74,7 @@
 	if (!sc)
 		return 1;
 
-	qcs_consume(qcs, ncb_data(rxbuf, 0));
+	b_reset(b);
 	b_free(&htx_buf);
 
 	if (fin)
diff --git a/src/mux_quic.c b/src/mux_quic.c
index 84237c3..28a1896 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -286,57 +286,6 @@
 	}
 }
 
-/* Remove <bytes> from <qcs> Rx buffer. This must be called by transcoders
- * after STREAM parsing. Flow-control for received offsets may be allocated for
- * the peer if needed.
- */
-void qcs_consume(struct qcs *qcs, uint64_t bytes)
-{
-	struct qcc *qcc = qcs->qcc;
-	struct quic_frame *frm;
-	struct ncbuf *buf = &qcs->rx.ncbuf;
-	enum ncb_ret ret;
-
-	ret = ncb_advance(buf, bytes);
-	if (ret) {
-		ABORT_NOW(); /* should not happens because removal only in data */
-	}
-
-	if (ncb_is_empty(buf))
-		qc_free_ncbuf(qcs, buf);
-
-	qcs->rx.offset += bytes;
-	if (qcs->rx.msd - qcs->rx.offset < qcs->rx.msd_init / 2) {
-		frm = pool_zalloc(pool_head_quic_frame);
-		BUG_ON(!frm); /* TODO handle this properly */
-
-		qcs->rx.msd = qcs->rx.offset + qcs->rx.msd_init;
-
-		LIST_INIT(&frm->reflist);
-		frm->type = QUIC_FT_MAX_STREAM_DATA;
-		frm->max_stream_data.id = qcs->id;
-		frm->max_stream_data.max_stream_data = qcs->rx.msd;
-
-		LIST_APPEND(&qcc->lfctl.frms, &frm->list);
-		tasklet_wakeup(qcc->wait_event.tasklet);
-	}
-
-	qcc->lfctl.offsets_consume += bytes;
-	if (qcc->lfctl.md - qcc->lfctl.offsets_consume < qcc->lfctl.md_init / 2) {
-		frm = pool_zalloc(pool_head_quic_frame);
-		BUG_ON(!frm); /* TODO handle this properly */
-
-		qcc->lfctl.md = qcc->lfctl.offsets_consume + qcc->lfctl.md_init;
-
-		LIST_INIT(&frm->reflist);
-		frm->type = QUIC_FT_MAX_DATA;
-		frm->max_data.max_data = qcc->lfctl.md;
-
-		LIST_APPEND(&qcs->qcc->lfctl.frms, &frm->list);
-		tasklet_wakeup(qcs->qcc->wait_event.tasklet);
-	}
-}
-
 /* Retrieve as an ebtree node the stream with <id> as ID, possibly allocates
  * several streams, depending on the already open ones.
  * Return this node if succeeded, NULL if not.
@@ -425,6 +374,63 @@
 	return NULL;
 }
 
+/* Simple function to duplicate a buffer */
+static inline struct buffer qcs_b_dup(const struct ncbuf *b)
+{
+	return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0));
+}
+
+/* Remove <bytes> from <qcs> Rx buffer. This must be called by transcoders
+ * after STREAM parsing. Flow-control for received offsets may be allocated for
+ * the peer if needed.
+ */
+static void qcs_consume(struct qcs *qcs, uint64_t bytes)
+{
+	struct qcc *qcc = qcs->qcc;
+	struct quic_frame *frm;
+	struct ncbuf *buf = &qcs->rx.ncbuf;
+	enum ncb_ret ret;
+
+	ret = ncb_advance(buf, bytes);
+	if (ret) {
+		ABORT_NOW(); /* should not happens because removal only in data */
+	}
+
+	if (ncb_is_empty(buf))
+		qc_free_ncbuf(qcs, buf);
+
+	qcs->rx.offset += bytes;
+	if (qcs->rx.msd - qcs->rx.offset < qcs->rx.msd_init / 2) {
+		frm = pool_zalloc(pool_head_quic_frame);
+		BUG_ON(!frm); /* TODO handle this properly */
+
+		qcs->rx.msd = qcs->rx.offset + qcs->rx.msd_init;
+
+		LIST_INIT(&frm->reflist);
+		frm->type = QUIC_FT_MAX_STREAM_DATA;
+		frm->max_stream_data.id = qcs->id;
+		frm->max_stream_data.max_stream_data = qcs->rx.msd;
+
+		LIST_APPEND(&qcc->lfctl.frms, &frm->list);
+		tasklet_wakeup(qcc->wait_event.tasklet);
+	}
+
+	qcc->lfctl.offsets_consume += bytes;
+	if (qcc->lfctl.md - qcc->lfctl.offsets_consume < qcc->lfctl.md_init / 2) {
+		frm = pool_zalloc(pool_head_quic_frame);
+		BUG_ON(!frm); /* TODO handle this properly */
+
+		qcc->lfctl.md = qcc->lfctl.offsets_consume + qcc->lfctl.md_init;
+
+		LIST_INIT(&frm->reflist);
+		frm->type = QUIC_FT_MAX_DATA;
+		frm->max_data.max_data = qcc->lfctl.md;
+
+		LIST_APPEND(&qcs->qcc->lfctl.frms, &frm->list);
+		tasklet_wakeup(qcs->qcc->wait_event.tasklet);
+	}
+}
+
 /* Decode the content of STREAM frames already received on the stream instance
  * <qcs>.
  *
@@ -432,14 +438,27 @@
  */
 static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
 {
+	struct buffer b;
+	size_t data, done;
+	int ret;
+
 	TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);
 
-	if (qcc->app_ops->decode_qcs(qcs, qcs->flags & QC_SF_FIN_RECV)) {
+	b = qcs_b_dup(&qcs->rx.ncbuf);
+	data = b_data(&b);
+
+	ret = qcc->app_ops->decode_qcs(qcs, &b, qcs->flags & QC_SF_FIN_RECV);
+	if (ret) {
 		TRACE_DEVEL("leaving on decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs);
 		return 1;
 	}
 
-	qcs_notify_recv(qcs);
+	BUG_ON_HOT(data < b_data(&b));
+	done = data - b_data(&b);
+	if (done) {
+		qcs_consume(qcs, done);
+		qcs_notify_recv(qcs);
+	}
 
 	TRACE_LEAVE(QMUX_EV_QCS_RECV, qcc->conn, qcs);