MINOR: mux-quic: implement MAX_STREAM_DATA emission

Send MAX_STREAM_DATA frames when at least half of the allocated
flow-control has been demuxed, frame and cleared. This is necessary to
support QUIC STREAM with received data greater than a buffer.

Transcoders must use the new function qcc_consume_qcs() to empty the QCS
buffer. This will allow to monitor current flow-control level and
generate a MAX_STREAM_DATA frame if required. This frame will be emitted
via qc_io_cb().
diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h
index 1135ee2..7a9d09a 100644
--- a/include/haproxy/mux_quic-t.h
+++ b/include/haproxy/mux_quic-t.h
@@ -105,7 +105,8 @@
 		uint64_t offset; /* absolute current base offset of ncbuf */
 		struct ncbuf ncbuf; /* receive buffer - can handle out-of-order offset frames */
 		struct buffer app_buf; /* receive buffer used by conn_stream layer */
-		uint64_t msd; /* fctl bytes limit to enforce */
+		uint64_t msd; /* current max-stream-data limit to enforce */
+		uint64_t msd_init; /* initial max-stream-data */
 	} rx;
 	struct {
 		uint64_t offset; /* last offset of data ready to be sent */
diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h
index 55c6b00..87d1b99 100644
--- a/include/haproxy/mux_quic.h
+++ b/include/haproxy/mux_quic.h
@@ -23,6 +23,7 @@
 int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
 void qcs_notify_recv(struct qcs *qcs);
 void qcs_notify_send(struct qcs *qcs);
+void qcs_consume(struct qcs *qcs, uint64_t bytes);
 
 int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
              char fin, char *data);
diff --git a/src/h3.c b/src/h3.c
index 5307134..d61b8e3 100644
--- a/src/h3.c
+++ b/src/h3.c
@@ -288,10 +288,9 @@
 			h3_debug_printf(stderr, "%s: ftype: %lu, flen: %lu\n",
 			                __func__, ftype, flen);
 
-			ncb_advance(rxbuf, hlen);
 			h3s->demux_frame_type = ftype;
 			h3s->demux_frame_len = flen;
-			qcs->rx.offset += hlen;
+			qcs_consume(qcs, hlen);
 		}
 
 		flen = h3s->demux_frame_len;
@@ -327,10 +326,9 @@
 		}
 
 		if (ret) {
-			ncb_advance(rxbuf, ret);
 			BUG_ON(h3s->demux_frame_len < ret);
 			h3s->demux_frame_len -= ret;
-			qcs->rx.offset += ret;
+			qcs_consume(qcs, ret);
 		}
 	}
 
@@ -410,8 +408,7 @@
 		if (flen > b_data(&b))
 			break;
 
-		ncb_advance(rxbuf, hlen);
-		h3_uqs->qcs->rx.offset += hlen;
+		qcs_consume(h3_uqs->qcs, hlen);
 		/* From here, a frame must not be truncated */
 		switch (ftype) {
 		case H3_FT_CANCEL_PUSH:
@@ -435,8 +432,7 @@
 			h3->err = H3_FRAME_UNEXPECTED;
 			return 0;
 		}
-		ncb_advance(rxbuf, flen);
-		h3_uqs->qcs->rx.offset += flen;
+		qcs_consume(h3_uqs->qcs, flen);
 	}
 
 	/* Handle the case where remaining data are present in the buffer. This
@@ -796,8 +792,7 @@
 	if (!b_quic_dec_int(&strm_type, &b, &len) || strm_type > H3_UNI_STRM_TP_MAX)
 		return 0;
 
-	ncb_advance(rxbuf, len);
-	qcs->rx.offset += len;
+	qcs_consume(qcs, len);
 
 	/* Note that for all the uni-streams below, this is an error to receive two times the
 	 * same type of uni-stream (even for Push stream which is not supported at this time.
diff --git a/src/hq_interop.c b/src/hq_interop.c
index db4387e..a47f5d1 100644
--- a/src/hq_interop.c
+++ b/src/hq_interop.c
@@ -76,9 +76,7 @@
 	if (!cs)
 		return -1;
 
-
-	qcs->rx.offset += ncb_data(rxbuf, 0);
-	ncb_advance(rxbuf, ncb_data(rxbuf, 0));
+	qcs_consume(qcs, ncb_data(rxbuf, 0));
 	b_free(&htx_buf);
 
 	if (fin)
diff --git a/src/mux_quic.c b/src/mux_quic.c
index 54bec78..2d34a7a 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -160,6 +160,7 @@
 	/* TODO use uni limit for unidirectional streams */
 	qcs->rx.msd = quic_stream_is_local(qcc, id) ? qcc->lfctl.msd_bidi_l :
 	                                              qcc->lfctl.msd_bidi_r;
+	qcs->rx.msd_init = qcs->rx.msd;
 
 	qcs->tx.buf = BUF_NULL;
 	qcs->tx.offset = 0;
@@ -283,6 +284,38 @@
 	}
 }
 
+/* Remove <bytes> from <qcs> Rx buffer. This must be called by transcoders
+ * after STREAM parsing. Flow-control for received offsets may be allocated for
+ * the peer if needed.
+ */
+void qcs_consume(struct qcs *qcs, uint64_t bytes)
+{
+	struct qcc *qcc = qcs->qcc;
+	struct quic_frame *frm;
+	enum ncb_ret ret;
+
+	ret = ncb_advance(&qcs->rx.ncbuf, bytes);
+	if (ret) {
+		ABORT_NOW(); /* should not happens because removal only in data */
+	}
+
+	qcs->rx.offset += bytes;
+	if (qcs->rx.msd - qcs->rx.offset < qcs->rx.msd_init / 2) {
+		frm = pool_zalloc(pool_head_quic_frame);
+		BUG_ON(!frm); /* TODO handle this properly */
+
+		qcs->rx.msd = qcs->rx.offset + qcs->rx.msd_init;
+
+		LIST_INIT(&frm->reflist);
+		frm->type = QUIC_FT_MAX_STREAM_DATA;
+		frm->max_stream_data.id = qcs->id;
+		frm->max_stream_data.max_stream_data = qcs->rx.msd;
+
+		LIST_APPEND(&qcc->lfctl.frms, &frm->list);
+		tasklet_wakeup(qcc->wait_event.tasklet);
+	}
+}
+
 /* Retrieve as an ebtree node the stream with <id> as ID, possibly allocates
  * several streams, depending on the already open ones.
  * Return this node if succeeded, NULL if not.
@@ -449,9 +482,6 @@
 		return 1;
 	}
 
-	/* TODO initial max-stream-data reached. Implement MAX_STREAM_DATA emission. */
-	BUG_ON(offset + len == qcs->rx.msd);
-
 	if (fin)
 		qcs->flags |= QC_SF_FIN_RECV;