MEDIUM: mux-quic: respect peer bidirectional stream data limit

Implement the flow-control max-streams-data limit on emission. We ensure
that we never push more than the offset limit set by the peer. When the
limit is reached, the stream is marked as blocked with a new flag
QC_SF_BLK_SFCTL to disable emission.

Currently, this is only implemented for bidirectional streams. It's
required to unify the sending for unidirectional streams via
qcs_push_frame from the H3 layer to respect the flow-control limit for
them.
diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h
index a5d4d2d..c3aac18 100644
--- a/include/haproxy/mux_quic-t.h
+++ b/include/haproxy/mux_quic-t.h
@@ -51,6 +51,8 @@
 
 	/* flow-control fields set by the peer which we must respect. */
 	struct {
+		uint64_t msd_bidi_l; /* initial max-stream-data for peer local streams */
+		uint64_t msd_bidi_r; /* initial max-stream-data for peer remote streams */
 	} rfctl;
 
 	struct {
@@ -78,6 +80,7 @@
 #define QC_SF_FIN_STREAM        0x00000002  /* FIN bit must be set for last frame of the stream */
 #define QC_SF_BLK_MROOM         0x00000004  /* app layer is blocked waiting for room in the qcs.tx.buf */
 #define QC_SF_DETACH            0x00000008  /* cs is detached but there is remaining data to send */
+#define QC_SF_BLK_SFCTL         0x00000010  /* stream blocked due to stream flow control limit */
 
 struct qcs {
 	struct qcc *qcc;
@@ -97,6 +100,7 @@
 		uint64_t ack_offset; /* last acked ordered byte offset */
 		struct buffer buf; /* transmit buffer before sending via xprt */
 		struct buffer xprt_buf; /* buffer for xprt sending, cleared on ACK. */
+		uint64_t msd; /* fctl bytes limit to respect on emission */
 	} tx;
 
 	struct eb64_node by_id; /* place in qcc's streams_by_id */
diff --git a/src/mux_quic.c b/src/mux_quic.c
index e239856..eda6347 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -33,6 +33,11 @@
 	eb64_insert(&qcc->streams_by_id, &qcs->by_id);
 	qcc->strms[type].nb_streams++;
 
+	/* If stream is local, use peer remote-limit, or else the opposite. */
+	/* TODO use uni limit for unidirectional streams */
+	qcs->tx.msd = quic_stream_is_local(qcc, id) ? qcc->rfctl.msd_bidi_r :
+	                                              qcc->rfctl.msd_bidi_l;
+
 	qcs->rx.buf = BUF_NULL;
 	qcs->rx.app_buf = BUF_NULL;
 	qcs->rx.offset = 0;
@@ -355,6 +360,8 @@
  * <payload> to <out> buffer. The STREAM frame payload points to the <out>
  * buffer. The frame is then pushed to <frm_list>. If <fin> is set, and the
  * <payload> buf is emptied after transfer, FIN bit is set on the STREAM frame.
+ * Transfer is automatically adjusted to not exceed the stream flow-control
+ * limit.
  *
  * Returns the total bytes of newly transferred data or a negative error code.
  */
@@ -390,6 +397,12 @@
 	head = qcs->tx.sent_offset - qcs->tx.ack_offset;
 	left = qcs->tx.offset - qcs->tx.sent_offset;
 	to_xfer = QUIC_MIN(b_data(payload), b_room(out));
+
+	BUG_ON_HOT(qcs->tx.offset > qcs->tx.msd);
+	/* do not exceed flow control limit */
+	if (qcs->tx.offset + to_xfer > qcs->tx.msd)
+		to_xfer = qcs->tx.msd - qcs->tx.offset;
+
 	if (!left && !to_xfer)
 		goto out;
 
@@ -450,6 +463,9 @@
 
 	/* increase offset on stream */
 	qcs->tx.sent_offset += diff;
+	BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.msd);
+	if (qcs->tx.sent_offset == qcs->tx.msd)
+		qcs->flags |= QC_SF_BLK_SFCTL;
 }
 
 /* Wrapper for send on transport layer. Send a list of frames <frms> for the
@@ -552,6 +568,11 @@
 			continue;
 		}
 
+		if (qcs->flags & QC_SF_BLK_SFCTL) {
+			node = eb64_next(node);
+			continue;
+		}
+
 		if (b_data(buf) || b_data(out)) {
 			int ret;
 			char fin = qcs->flags & QC_SF_FIN_STREAM;
@@ -703,7 +724,7 @@
                    struct session *sess, struct buffer *input)
 {
 	struct qcc *qcc;
-	struct quic_transport_params *lparams;
+	struct quic_transport_params *lparams, *rparams;
 
 	qcc = pool_alloc(pool_head_qcc);
 	if (!qcc)
@@ -752,6 +773,10 @@
 	qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = lparams->initial_max_streams_bidi;
 	qcc->lfctl.cl_bidi_r = 0;
 
+	rparams = &conn->qc->tx.params;
+	qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local;
+	qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;
+
 	qcc->wait_event.tasklet = tasklet_new();
 	if (!qcc->wait_event.tasklet)
 		goto fail_no_tasklet;