MEDIUM: mux-quic/h3/hq-interop: use ncbuf for bidir streams

Add an ncbuf for data reception on qcs. Thanks to this, the MUX is able
to buffer all received frames directly into the buffer. Flow control
parameters will be used to ensure there is never an overflow.

This change will simplify the Rx path, allowing the future deletion of
the acked frames tree previously used for out-of-order frames.
diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h
index 83fccb2..d096d1d 100644
--- a/include/haproxy/mux_quic-t.h
+++ b/include/haproxy/mux_quic-t.h
@@ -11,6 +11,7 @@
 #include <haproxy/buf-t.h>
 #include <haproxy/connection-t.h>
 #include <haproxy/list-t.h>
+#include <haproxy/ncbuf-t.h>
 #include <haproxy/quic_stream-t.h>
 #include <haproxy/conn_stream-t.h>
 
@@ -101,8 +102,9 @@
 
 	struct {
 		struct eb_root frms; /* received frames ordered by their offsets */
-		uint64_t offset; /* the current offset of received data */
+		uint64_t offset; /* absolute current base offset of ncbuf */
 		struct buffer buf; /* receive buffer, always valid (buf_empty or real buffer) */
+		struct ncbuf ncbuf; /* receive buffer - can handle out-of-order offset frames */
 		struct buffer app_buf; /* receive buffer used by conn_stream layer */
 		uint64_t msd; /* fctl bytes limit to enforce */
 	} rx;
diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h
index 08ff9a3..72e52f3 100644
--- a/include/haproxy/mux_quic.h
+++ b/include/haproxy/mux_quic.h
@@ -18,13 +18,14 @@
 void qcs_free(struct qcs *qcs);
 
 struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr);
+struct ncbuf *qc_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf);
 
 int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
 void qcs_notify_recv(struct qcs *qcs);
 void qcs_notify_send(struct qcs *qcs);
 
 int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
-             char fin, char *data, struct qcs **out_qcs, size_t *done);
+             char fin, char *data, struct qcs **out_qcs);
 int qcc_recv_max_data(struct qcc *qcc, uint64_t max);
 int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max);
 int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs);
diff --git a/src/h3.c b/src/h3.c
index 68fc102..488fccf 100644
--- a/src/h3.c
+++ b/src/h3.c
@@ -25,6 +25,7 @@
 #include <haproxy/htx.h>
 #include <haproxy/istbuf.h>
 #include <haproxy/mux_quic.h>
+#include <haproxy/ncbuf.h>
 #include <haproxy/pool.h>
 #include <haproxy/qpack-dec.h>
 #include <haproxy/qpack-enc.h>
@@ -76,9 +77,9 @@
 DECLARE_STATIC_POOL(pool_head_h3s, "h3s", sizeof(struct h3s));
 
 /* Simple function to duplicate a buffer */
-static inline struct buffer h3_b_dup(struct buffer *b)
+static inline struct buffer h3_b_dup(struct ncbuf *b)
 {
-	return b_make(b->area, b->size, b->head, b->data);
+	return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0));
 }
 
 /* Decode a h3 frame header made of two QUIC varints from <b> buffer.
@@ -104,7 +105,7 @@
  *
  * Returns the number of bytes handled or a negative error code.
  */
-static int h3_headers_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
+static int h3_headers_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
                              char fin)
 {
 	struct buffer htx_buf = BUF_NULL;
@@ -119,8 +120,8 @@
 	int hdr_idx;
 
 	/* TODO support buffer wrapping */
-	BUG_ON(b_contig_data(buf, 0) != b_data(buf));
-	if (qpack_decode_fs((const unsigned char *)b_head(buf), len, tmp, list) < 0)
+	BUG_ON(ncb_head(buf) + len >= ncb_wrap(buf));
+	if (qpack_decode_fs((const unsigned char *)ncb_head(buf), len, tmp, list) < 0)
 		return -1;
 
 	qc_get_buf(qcs, &htx_buf);
@@ -200,12 +201,12 @@
  *
  * Returns the number of bytes handled or a negative error code.
  */
-static int h3_data_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
+static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
                           char fin)
 {
 	struct buffer *appbuf;
 	struct htx *htx = NULL;
-	size_t contig = 0, htx_sent = 0;
+	size_t htx_sent = 0;
 	int htx_space;
 	char *head;
 
@@ -213,12 +214,12 @@
 	BUG_ON(!appbuf);
 	htx = htx_from_buf(appbuf);
 
-	if (len > b_data(buf)) {
-		len = b_data(buf);
+	if (len > ncb_data(buf, 0)) {
+		len = ncb_data(buf, 0);
 		fin = 0;
 	}
 
-	head = b_head(buf);
+	head = ncb_head(buf);
  retry:
 	htx_space = htx_free_data_space(htx);
 	if (!htx_space) {
@@ -231,10 +232,10 @@
 		fin = 0;
 	}
 
-	contig = b_contig_data(buf, contig);
-	if (len > contig) {
-		htx_sent = htx_add_data(htx, ist2(b_head(buf), contig));
-		head = b_orig(buf);
+	if (head + len > ncb_wrap(buf)) {
+		size_t contig = ncb_wrap(buf) - head;
+		htx_sent = htx_add_data(htx, ist2(ncb_head(buf), contig));
+		head = ncb_orig(buf);
 		len -= contig;
 		goto retry;
 	}
@@ -256,15 +257,15 @@
  */
 static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx)
 {
-	struct buffer *rxbuf = &qcs->rx.buf;
+	struct ncbuf *rxbuf = &qcs->rx.ncbuf;
 	struct h3s *h3s = qcs->ctx;
 	ssize_t ret;
 
 	h3_debug_printf(stderr, "%s: STREAM ID: %lu\n", __func__, qcs->id);
-	if (!b_data(rxbuf))
+	if (!ncb_data(rxbuf, 0))
 		return 0;
 
-	while (b_data(rxbuf) && !(qcs->flags & QC_SF_DEM_FULL)) {
+	while (ncb_data(rxbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) {
 		uint64_t ftype, flen;
 		struct buffer b;
 		char last_stream_frame = 0;
@@ -279,16 +280,17 @@
 			h3_debug_printf(stderr, "%s: ftype: %lu, flen: %lu\n",
 			                __func__, ftype, flen);
 
-			b_del(rxbuf, hlen);
+			ncb_advance(rxbuf, hlen);
 			h3s->demux_frame_type = ftype;
 			h3s->demux_frame_len = flen;
+			qcs->rx.offset += hlen;
 		}
 
 		flen = h3s->demux_frame_len;
 		ftype = h3s->demux_frame_type;
-		if (flen > b_data(&b) && !b_full(rxbuf))
+		if (flen > b_data(&b) && !ncb_is_full(rxbuf))
 			break;
-		last_stream_frame = (fin && flen == b_data(rxbuf));
+		last_stream_frame = (fin && flen == ncb_total_data(rxbuf));
 
 		switch (ftype) {
 		case H3_FT_DATA:
@@ -303,20 +305,21 @@
 			break;
 		case H3_FT_PUSH_PROMISE:
 			/* Not supported */
-			ret = MIN(b_data(rxbuf), flen);
+			ret = MIN(ncb_data(rxbuf, 0), flen);
 			break;
 		default:
 			/* draft-ietf-quic-http34 9. Extensions to HTTP/3
 			 * unknown frame types MUST be ignored
 			 */
 			h3_debug_printf(stderr, "ignore unknown frame type 0x%lx\n", ftype);
-			ret = MIN(b_data(rxbuf), flen);
+			ret = MIN(ncb_data(rxbuf, 0), flen);
 		}
 
 		if (ret) {
-			b_del(rxbuf, ret);
+			ncb_advance(rxbuf, ret);
 			BUG_ON(h3s->demux_frame_len < ret);
 			h3s->demux_frame_len -= ret;
+			qcs->rx.offset += ret;
 		}
 	}
 
@@ -386,7 +389,7 @@
 		struct buffer b;
 
 		/* Work on a copy of <rxbuf> */
-		b = h3_b_dup(rxbuf);
+		b = b_make(rxbuf->area, rxbuf->size, rxbuf->head, rxbuf->data);
 		hlen = h3_decode_frm_header(&ftype, &flen, &b);
 		if (!hlen)
 			break;
diff --git a/src/hq_interop.c b/src/hq_interop.c
index dcaba8e..db4387e 100644
--- a/src/hq_interop.c
+++ b/src/hq_interop.c
@@ -7,19 +7,20 @@
 #include <haproxy/htx.h>
 #include <haproxy/http.h>
 #include <haproxy/mux_quic.h>
+#include <haproxy/ncbuf.h>
 
 static int hq_interop_decode_qcs(struct qcs *qcs, int fin, void *ctx)
 {
-	struct buffer *rxbuf = &qcs->rx.buf;
+	struct ncbuf *rxbuf = &qcs->rx.ncbuf;
 	struct htx *htx;
 	struct htx_sl *sl;
 	struct conn_stream *cs;
 	struct buffer htx_buf = BUF_NULL;
 	struct ist path;
-	char *ptr = b_head(rxbuf);
-	char *end = b_wrap(rxbuf);
-	size_t size = b_size(rxbuf);
-	size_t data = b_data(rxbuf);
+	char *ptr = ncb_head(rxbuf);
+	char *end = ncb_wrap(rxbuf);
+	size_t size = ncb_size(rxbuf);
+	size_t data = ncb_data(rxbuf, 0);
 
 	b_alloc(&htx_buf);
 	htx = htx_from_buf(&htx_buf);
@@ -76,7 +77,8 @@
 		return -1;
 
 
-	b_del(rxbuf, b_data(rxbuf));
+	qcs->rx.offset += ncb_data(rxbuf, 0);
+	ncb_advance(rxbuf, ncb_data(rxbuf, 0));
 	b_free(&htx_buf);
 
 	if (fin)
diff --git a/src/mux_quic.c b/src/mux_quic.c
index bc8910c..022b1db 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -8,6 +8,7 @@
 #include <haproxy/dynbuf.h>
 #include <haproxy/htx.h>
 #include <haproxy/list.h>
+#include <haproxy/ncbuf.h>
 #include <haproxy/pool.h>
 #include <haproxy/quic_stream.h>
 #include <haproxy/sink.h>
@@ -153,6 +154,7 @@
 	                                              qcc->rfctl.msd_bidi_l;
 
 	qcs->rx.buf = BUF_NULL;
+	qcs->rx.ncbuf = NCBUF_NULL;
 	qcs->rx.app_buf = BUF_NULL;
 	qcs->rx.offset = 0;
 	qcs->rx.frms = EB_ROOT_UNIQUE;
@@ -184,6 +186,16 @@
 	return NULL;
 }
 
+static void qc_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
+{
+	struct buffer buf;
+
+	buf = b_make(ncbuf->area, ncbuf->size, 0, 0);
+	b_free(&buf);
+
+	*ncbuf = NCBUF_NULL;
+}
+
 /* Free a qcs. This function must only be done to remove a stream on allocation
  * error or connection shutdown. Else use qcs_destroy which handle all the
  * QUIC connection mechanism.
@@ -191,6 +203,7 @@
 void qcs_free(struct qcs *qcs)
 {
 	b_free(&qcs->rx.buf);
+	qc_free_ncbuf(qcs, &qcs->rx.ncbuf);
 	b_free(&qcs->tx.buf);
 
 	BUG_ON(!qcs->qcc->strms[qcs_id_type(qcs->id)].nb_streams);
@@ -215,6 +228,21 @@
 	return buf;
 }
 
+struct ncbuf *qc_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
+{
+	struct buffer buf = BUF_NULL;
+
+	if (ncb_is_null(ncbuf)) {
+		b_alloc(&buf);
+		BUG_ON(b_is_null(&buf));
+
+		*ncbuf = ncb_make(buf.area, buf.size, 0);
+		ncb_init(ncbuf, 0);
+	}
+
+	return ncbuf;
+}
+
 int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es)
 {
 	struct qcc *qcc = qcs->qcc;
@@ -344,22 +372,17 @@
  * <out_qcs>. In case of success, the caller can immediatly call qcc_decode_qcs
  * to process the frame content.
  *
- * Returns a code indicating how the frame was handled.
- * - 0: frame received completely and can be dropped.
- * - 1: frame not received but can be dropped.
- * - 2: frame cannot be handled, either partially or not at all. <done>
- *   indicated the number of bytes handled. The rest should be buffered.
+ * Returns 0 on success else non-zero.
  */
 int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
-             char fin, char *data, struct qcs **out_qcs, size_t *done)
+             char fin, char *data, struct qcs **out_qcs)
 {
 	struct qcs *qcs;
-	size_t total, diff;
+	enum ncb_ret ret;
 
 	TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
 
 	*out_qcs = NULL;
-	*done = 0;
 
 	qcs = qcc_get_qcs(qcc, id);
 	if (!qcs) {
@@ -375,44 +398,46 @@
 
 	*out_qcs = qcs;
 
-	if (offset > qcs->rx.offset)
-		return 2;
-
 	if (offset + len <= qcs->rx.offset) {
 		TRACE_DEVEL("leaving on already received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
 		return 0;
 	}
 
+	/* TODO if last frame already received, stream size must not change.
+	 * Else send FINAL_SIZE_ERROR.
+	 */
+
-	/* Last frame already handled for this stream. */
-	BUG_ON(qcs->flags & QC_SF_FIN_RECV);
 	/* TODO initial max-stream-data overflow. Implement FLOW_CONTROL_ERROR emission. */
 	BUG_ON(offset + len > qcs->rx.msd);
 
-	if (!qc_get_buf(qcs, &qcs->rx.buf) || b_full(&qcs->rx.buf)) {
+	if (!qc_get_ncbuf(qcs, &qcs->rx.ncbuf) || ncb_is_null(&qcs->rx.ncbuf)) {
 		/* TODO should mark qcs as full */
-		return 2;
+		ABORT_NOW();
+		return 1;
 	}
 
 	TRACE_DEVEL("newly received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
-	diff = qcs->rx.offset - offset;
-
-	len -= diff;
-	data += diff;
-
-	/* TODO handle STREAM frames larger than RX buffer. */
-	BUG_ON(len > b_size(&qcs->rx.buf));
+	if (offset < qcs->rx.offset) {
+		len -= qcs->rx.offset - offset;
+		offset = qcs->rx.offset;
+	}
 
-	total = b_putblk(&qcs->rx.buf, data, len);
-	qcs->rx.offset += total;
-	*done = total;
+	ret = ncb_add(&qcs->rx.ncbuf, offset - qcs->rx.offset, data, len, NCB_ADD_COMPARE);
+	if (ret != NCB_RET_OK) {
+		if (ret == NCB_RET_DATA_REJ) {
+			/* TODO generate PROTOCOL_VIOLATION error */
+			TRACE_DEVEL("leaving on data rejected", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV,
+			            qcc->conn, qcs);
+		}
+		else if (ret == NCB_RET_GAP_SIZE) {
+			TRACE_DEVEL("cannot bufferize frame due to gap size limit", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV,
+			            qcc->conn, qcs);
+		}
+		return 1;
+	}
 
 	/* TODO initial max-stream-data reached. Implement MAX_STREAM_DATA emission. */
-	BUG_ON(qcs->rx.offset == qcs->rx.msd);
-
-	if (total < len) {
-		TRACE_DEVEL("leaving on partially received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
-		return 2;
-	}
+	BUG_ON(offset + len == qcs->rx.msd);
 
 	if (fin)
 		qcs->flags |= QC_SF_FIN_RECV;
diff --git a/src/xprt_quic.c b/src/xprt_quic.c
index d4f3ef0..4f3bcf7 100644
--- a/src/xprt_quic.c
+++ b/src/xprt_quic.c
@@ -2194,105 +2194,19 @@
                                    struct quic_stream *strm_frm,
                                    struct quic_conn *qc)
 {
-	struct quic_rx_strm_frm *frm;
-	struct eb64_node *frm_node;
 	struct qcs *qcs = NULL;
-	size_t done, buf_was_full;
 	int ret;
 
 	ret = qcc_recv(qc->qcc, strm_frm->id, strm_frm->len,
 	               strm_frm->offset.key, strm_frm->fin,
-	               (char *)strm_frm->data, &qcs, &done);
+	               (char *)strm_frm->data, &qcs);
 
-	/* invalid frame */
-	if (ret == 1)
+	/* frame rejected - packet must not be acknowledged */
+	if (ret)
 		return 0;
 
-	/* already fully received offset */
-	if (ret == 0 && done == 0)
-		return 1;
-
-	/* frame not handled (partially or completely) must be buffered */
-	if (ret == 2) {
-		frm = new_quic_rx_strm_frm(strm_frm, pkt);
-		if (!frm) {
-			TRACE_PROTO("Could not alloc RX STREAM frame",
-			            QUIC_EV_CONN_PSTRM, qc);
-			return 0;
-		}
-
-		/* frame partially handled by the MUX */
-		if (done) {
-			BUG_ON(done >= frm->len); /* must never happen */
-			frm->len -= done;
-			frm->data += done;
-			frm->offset_node.key += done;
-		}
-
-		eb64_insert(&qcs->rx.frms, &frm->offset_node);
-		quic_rx_packet_refinc(pkt);
-
-		/* interrupt only if frame was not received at all. */
-		if (!done)
-			return 1;
-	}
-
-	/* Decode the data if buffer is already full as it's not possible to
-	 * dequeue a frame in this condition.
-	 */
-	if (b_full(&qcs->rx.buf))
+	if (qcs)
 		qcc_decode_qcs(qc->qcc, qcs);
-
- retry:
-	/* Frame received (partially or not) by the mux.
-	 * If there is buffered frame for next offset, it may be possible to
-	 * receive them now.
-	 */
-	frm_node = eb64_first(&qcs->rx.frms);
-	while (frm_node) {
-		frm = eb64_entry(frm_node,
-		                 struct quic_rx_strm_frm, offset_node);
-
-		ret = qcc_recv(qc->qcc, qcs->id, frm->len,
-		               frm->offset_node.key, frm->fin,
-		               (char *)frm->data, &qcs, &done);
-
-		BUG_ON(ret == 1); /* must never happen for buffered frames */
-
-		/* interrupt the parsing if the frame cannot be handled
-		 * entirely for the moment only.
-		 */
-		if (ret == 2) {
-			if (done) {
-				BUG_ON(done >= frm->len); /* must never happen */
-				frm->len -= done;
-				frm->data += done;
-
-				eb64_delete(&frm->offset_node);
-				frm->offset_node.key += done;
-				eb64_insert(&qcs->rx.frms, &frm->offset_node);
-			}
-			break;
-		}
-
-		/* Remove a newly received frame or an invalid one. */
-		frm_node = eb64_next(frm_node);
-		eb64_delete(&frm->offset_node);
-		quic_rx_packet_refdec(frm->pkt);
-		pool_free(pool_head_quic_rx_strm_frm, frm);
-	}
-
-	buf_was_full = b_full(&qcs->rx.buf);
-	/* Decode the received data. */
-	qcc_decode_qcs(qc->qcc, qcs);
-
-	/* Buffer was full so the reception was stopped. Now the buffer has
-	 * space available thanks to qcc_decode_qcs(). We can now retry to
-	 * handle more data.
-	 */
-	if (buf_was_full && !b_full(&qcs->rx.buf))
-		goto retry;
-
 	return 1;
 }