MAJOR: mux-quic: rework stream sending prioritization

Implement a mechanism to register streams ready to send data in new
STREAM frames. Internally, this is implemented with a new list
<qcc.send_list> which contains qcs instances.

A qcs can be registered safely using the new function qcc_send_stream().
This is done automatically in qc_send_buf() which covers most cases.
Also, application layer is free to use it for internal usage streams.
This is currently the case for H3 control stream with SETTINGS sending.

The main point of this patch is to handle stream sending fairly. This is
in stark contrast with previous code where streams with lower ID were
always prioritized. This could cause other streams to be indefinitely
blocked behind a stream which has a lot of data to transfer. Now,
streams are handled in an order scheduled by se_desc layer.

This commit is the first one of a series which will bring other
improvements which also rely on the send_list implementation.

This must be backported up to 2.7 when deemed sufficiently stable.
diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h
index f1c96f0..256cdd9 100644
--- a/include/haproxy/mux_quic-t.h
+++ b/include/haproxy/mux_quic-t.h
@@ -95,6 +95,7 @@
 	struct eb_root streams_by_id; /* all active streams by their ID */
 
 	struct list send_retry_list; /* list of qcs eligible to send retry */
+	struct list send_list; /* list of qcs ready to send */
 
 	struct wait_event wait_event;  /* To be used if we're waiting for I/Os */
 
@@ -174,6 +175,7 @@
 	struct qc_stream_desc *stream;
 
 	struct list el; /* element of qcc.send_retry_list */
+	struct list el_send; /* element of qcc.send_list */
 	struct list el_opening; /* element of qcc.opening_list */
 
 	struct wait_event wait_event;
diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h
index a165779..1d374b0 100644
--- a/include/haproxy/mux_quic.h
+++ b/include/haproxy/mux_quic.h
@@ -21,6 +21,7 @@
 
 void qcc_emit_cc_app(struct qcc *qcc, int err, int immediate);
 void qcc_reset_stream(struct qcs *qcs, int err);
+void qcc_send_stream(struct qcs *qcs);
 void qcc_abort_stream_read(struct qcs *qcs);
 int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
              char fin, char *data);
diff --git a/src/h3.c b/src/h3.c
index 8035764..e840448 100644
--- a/src/h3.c
+++ b/src/h3.c
@@ -1032,8 +1032,11 @@
 	}
 
 	ret = b_force_xfer(res, &pos, b_data(&pos));
-	if (ret > 0)
+	if (ret > 0) {
+		/* Register qcs for sending before other streams. */
+		qcc_send_stream(qcs);
 		h3c->flags |= H3_CF_SETTINGS_SENT;
+	}
 
 	TRACE_LEAVE(H3_EV_TX_SETTINGS, qcs->qcc->conn, qcs);
 	return ret;
diff --git a/src/mux_quic.c b/src/mux_quic.c
index 120eeb1..6a324ac 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -61,6 +61,7 @@
 
 	/* Safe to use even if already removed from the list. */
 	LIST_DEL_INIT(&qcs->el_opening);
+	LIST_DEL_INIT(&qcs->el_send);
 
 	/* Release stream endpoint descriptor. */
 	BUG_ON(qcs->sd && !se_fl_test(qcs->sd, SE_FL_ORPHAN));
@@ -112,6 +113,7 @@
 	 * These fields must be initialed before.
 	 */
 	LIST_INIT(&qcs->el_opening);
+	LIST_INIT(&qcs->el_send);
 	qcs->start = TICK_ETERNITY;
 
 	/* store transport layer stream descriptor in qcc tree */
@@ -819,6 +821,22 @@
 	tasklet_wakeup(qcc->wait_event.tasklet);
 }
 
+/* Register <qcs> stream for emission of STREAM, STOP_SENDING or RESET_STREAM. */
+void qcc_send_stream(struct qcs *qcs)
+{
+	struct qcc *qcc = qcs->qcc;
+
+	TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
+
+	/* Cannot send if already closed. */
+	BUG_ON(qcs_is_close_local(qcs));
+
+	if (!LIST_INLIST(&qcs->el_send))
+		LIST_APPEND(&qcs->qcc->send_list, &qcs->el_send);
+
+	TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
+}
+
 /* Prepare for the emission of STOP_SENDING on <qcs>. */
 void qcc_abort_stream_read(struct qcs *qcs)
 {
@@ -1067,6 +1085,7 @@
 
 			if (qcs->flags & QC_SF_BLK_SFCTL) {
 				qcs->flags &= ~QC_SF_BLK_SFCTL;
 				/* TODO optim: only wakeup IO-CB if stream has data to send. */
 				tasklet_wakeup(qcc->wait_event.tasklet);
 			}
 		}
@@ -1469,12 +1488,17 @@
 		}
 	}
 
-	if (qcs->tx.offset == qcs->tx.sent_offset && !b_data(&qcs->tx.buf) &&
-	     qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) {
-		/* Close stream locally. */
-		qcs_close_local(qcs);
-		/* Reset flag to not emit multiple FIN STREAM frames. */
-		qcs->flags &= ~QC_SF_FIN_STREAM;
+	if (qcs->tx.offset == qcs->tx.sent_offset && !b_data(&qcs->tx.buf)) {
+		/* Remove stream from send_list if all was sent. */
+		LIST_DEL_INIT(&qcs->el_send);
+		TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+
+		if (qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) {
+			/* Close stream locally. */
+			qcs_close_local(qcs);
+			/* Reset flag to not emit multiple FIN STREAM frames. */
+			qcs->flags &= ~QC_SF_FIN_STREAM;
+		}
 	}
 
  out:
@@ -1627,6 +1651,9 @@
 	int xfer = 0;
 	char fin = 0;
 
+	/* Cannot send STREAM on remote unidirectional streams. */
+	BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_remote(qcc, qcs->id));
+
 	/* Allocate <out> buffer if necessary. */
 	if (!out) {
 		if (qcc->flags & QC_CF_CONN_FULL)
@@ -1708,12 +1735,13 @@
 		qcc->flags |= QC_CF_APP_FINAL;
 	}
 
-	/* loop through all streams, construct STREAM frames if data available.
-	 * TODO optimize the loop to favor streams which are not too heavy.
+	/* Loop through all streams for STOP_SENDING/RESET_STREAM sending. Each
 	 * frame is sent individually to guarantee emission.
+	 *
+	 * TODO Optimize sending by multiplexing several frames in one datagram.
 	 */
 	node = eb64_first(&qcc->streams_by_id);
 	while (node) {
-		int ret;
 		uint64_t id;
 
 		qcs = eb64_entry(node, struct qcs, by_id);
@@ -1733,26 +1761,23 @@
 			continue;
 		}
 
-		if (qcs_is_close_local(qcs)) {
-			node = eb64_next(node);
-			continue;
-		}
+		node = eb64_next(node);
+	}
 
-		if (qcs->flags & QC_SF_BLK_SFCTL) {
-			node = eb64_next(node);
-			continue;
-		}
+	/* Send STREAM data for registered streams. */
+	list_for_each_entry(qcs, &qcc->send_list, el_send) {
+		/* Stream must not be present in send_list if it has nothing to send. */
+		BUG_ON(!b_data(&qcs->tx.buf) &&
+		       qcs->tx.sent_offset == qcs->tx.offset &&
+		       !qcs_stream_fin(qcs));
 
-		/* Check if there is something to send. */
-		if (!b_data(&qcs->tx.buf) && !qcs_stream_fin(qcs) &&
-		    !qc_stream_buf_get(qcs->stream)) {
-			node = eb64_next(node);
+		if (qcs_is_close_local(qcs)) {
+			LIST_DEL_INIT(&qcs->el_send);
 			continue;
 		}
 
-		ret = _qc_send_qcs(qcs, &frms);
-		total += ret;
-		node = eb64_next(node);
+		if (!(qcs->flags & QC_SF_BLK_SFCTL))
+			total += _qc_send_qcs(qcs, &frms);
 	}
 
 	if (qc_send_frames(qcc, &frms)) {
@@ -1780,6 +1805,7 @@
 	/* Deallocate frames that the transport layer has rejected. */
 	if (!LIST_ISEMPTY(&frms)) {
 		struct quic_frame *frm, *frm2;
+
 		list_for_each_entry_safe(frm, frm2, &frms, list) {
 			LIST_DELETE(&frm->list);
 			pool_free(pool_head_quic_frame, frm);
@@ -2130,6 +2156,7 @@
 		goto fail_no_tasklet;
 	}
 
+	LIST_INIT(&qcc->send_list);
 	LIST_INIT(&qcc->send_retry_list);
 
 	qcc->wait_event.tasklet->process = qc_io_cb;
@@ -2300,6 +2327,7 @@
 		qcs->flags |= QC_SF_FIN_STREAM;
 
 	if (ret || fin) {
+		qcc_send_stream(qcs);
 		if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
 			tasklet_wakeup(qcs->qcc->wait_event.tasklet);
 	}