MINOR: mux-quic: implement immediate send retry

Complete qc_send function. After having processed each qcs emission, it
will now retry send on qcs where transfer can continue. This is useful
when qc_stream_desc buffer is full and there is still data present in
qcs buf.

To implement this, each eligible qcs is inserted in a new list
<qcc.send_retry_list>. This is done on send notification from the
transport layer through qcc_streams_sent_done(). Retry emission until
send_retry_list is empty or the transport layer cannot proceed more
data.

Several send operations are now called on two different places. Thus a
new _qc_send_qcs() function is defined to factorize the code.

This change should maximize the throughput during QUIC transfers.
diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h
index 44fb4d6..d95e9dc 100644
--- a/include/haproxy/mux_quic-t.h
+++ b/include/haproxy/mux_quic-t.h
@@ -10,6 +10,7 @@
 
 #include <haproxy/buf-t.h>
 #include <haproxy/connection-t.h>
+#include <haproxy/list-t.h>
 #include <haproxy/quic_stream-t.h>
 #include <haproxy/xprt_quic-t.h>
 #include <haproxy/conn_stream-t.h>
@@ -70,6 +71,8 @@
 
 	struct eb_root streams_by_id; /* all active streams by their ID */
 
+	struct list send_retry_list; /* list of qcs eligible to send retry */
+
 	struct wait_event wait_event;  /* To be used if we're waiting for I/Os */
 	struct wait_event *subs;
 
@@ -111,6 +114,8 @@
 	uint64_t id;
 	struct qc_stream_desc *stream;
 
+	struct list el; /* element of qcc.send_retry_list */
+
 	struct wait_event wait_event;
 	struct wait_event *subs;
 };
diff --git a/include/haproxy/quic_stream.h b/include/haproxy/quic_stream.h
index 6d9359d..0550f4f 100644
--- a/include/haproxy/quic_stream.h
+++ b/include/haproxy/quic_stream.h
@@ -14,7 +14,6 @@
 void qc_stream_desc_free(struct qc_stream_desc *stream);
 
 struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream);
-int qc_stream_buf_avail(struct quic_conn *qc);
 struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
                                    uint64_t offset);
 void qc_stream_buf_release(struct qc_stream_desc *stream);
diff --git a/src/mux_quic.c b/src/mux_quic.c
index b969d3a..6a65d69 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -7,6 +7,7 @@
 #include <haproxy/conn_stream.h>
 #include <haproxy/dynbuf.h>
 #include <haproxy/htx.h>
+#include <haproxy/list.h>
 #include <haproxy/pool.h>
 #include <haproxy/quic_stream.h>
 #include <haproxy/sink.h>
@@ -722,14 +723,9 @@
 
 	if (qcs->tx.offset == qcs->tx.sent_offset && b_full(&qcs->stream->buf->buf)) {
 		qc_stream_buf_release(qcs->stream);
-
-		/* reschedule send if buffers available */
-		if (qc_stream_buf_avail(qcc->conn->handle.qc)) {
-			tasklet_wakeup(qcc->wait_event.tasklet);
-		}
-		else {
-			qcc->flags |= QC_CF_CONN_FULL;
-		}
+		/* prepare qcs for immediate send retry if data to send */
+		if (b_data(&qcs->tx.buf))
+			LIST_APPEND(&qcc->send_retry_list, &qcs->el);
 	}
 }
 
@@ -760,9 +756,11 @@
 
 	if (LIST_ISEMPTY(frms)) {
 		TRACE_DEVEL("leaving with no frames to send", QMUX_EV_QCC_SEND, qcc->conn);
-		return 0;
+		return 1;
 	}
 
+	LIST_INIT(&qcc->send_retry_list);
+
  retry_send:
 	first_frm = LIST_ELEM(frms->n, struct quic_frame *, list);
 	if ((first_frm->type & QUIC_FT_STREAM_8) == QUIC_FT_STREAM_8) {
@@ -837,6 +835,65 @@
 	return 0;
 }
 
+/* Used internally by qc_send function. Proceed to send for <qcs>. This will
+ * transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame
+ * is then generated and inserted in <frms> list. <qcc_max_data> is the current
+ * flow-control max-data at the connection level which must not be surpassed.
+ *
+ * Returns the total bytes transferred between qcs and quic_stream buffers. Can
+ * be null if out buffer cannot be allocated.
+ */
+static int _qc_send_qcs(struct qcs *qcs, struct list *frms,
+                        uint64_t qcc_max_data)
+{
+	struct qcc *qcc = qcs->qcc;
+	struct buffer *buf = &qcs->tx.buf;
+	struct buffer *out = qc_stream_buf_get(qcs->stream);
+	int xfer = 0;
+
+	/* Allocate <out> buffer if necessary. */
+	if (!out) {
+		if (qcc->flags & QC_CF_CONN_FULL)
+			return 0;
+
+		out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset);
+		if (!out) {
+			qcc->flags |= QC_CF_CONN_FULL;
+			return 0;
+		}
+	}
+
+	/* Transfer data from <buf> to <out>. */
+	if (b_data(buf)) {
+		xfer = qcs_xfer_data(qcs, out, buf, qcc_max_data);
+		BUG_ON(xfer < 0); /* TODO handle this properly */
+
+		if (xfer > 0) {
+			qcs_notify_send(qcs);
+			qcs->flags &= ~QC_SF_BLK_MROOM;
+		}
+
+		qcs->tx.offset += xfer;
+	}
+
+	/* out buffer cannot be emptied if qcs offsets differ. */
+	BUG_ON(!b_data(out) && qcs->tx.sent_offset != qcs->tx.offset);
+
+	/* Build a new STREAM frame with <out> buffer. */
+	if (qcs->tx.sent_offset != qcs->tx.offset) {
+		int ret;
+		char fin = !!(qcs->flags & QC_SF_FIN_STREAM);
+
+		/* FIN is set if all incoming data were transfered. */
+		fin = !!(fin && !b_data(buf));
+
+		ret = qcs_build_stream_frm(qcs, out, fin, frms);
+		BUG_ON(ret < 0); /* TODO handle this properly */
+	}
+
+	return xfer;
+}
+
 /* Proceed to sending. Loop through all available streams for the <qcc>
  * instance and try to send as much as possible.
  *
@@ -846,7 +903,8 @@
 {
 	struct list frms = LIST_HEAD_INIT(frms);
 	struct eb64_node *node;
-	int total = 0;
+	struct qcs *qcs, *qcs_tmp;
+	int total = 0, tmp_total = 0;
 
 	TRACE_ENTER(QMUX_EV_QCC_SEND);
 
@@ -867,9 +925,8 @@
 	 */
 	node = eb64_first(&qcc->streams_by_id);
 	while (node) {
-		struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
-		struct buffer *buf = &qcs->tx.buf;
-		struct buffer *out = qc_stream_buf_get(qcs->stream);
+		int ret;
+		qcs = eb64_entry(node, struct qcs, by_id);
 
 		/* TODO
 		 * for the moment, unidirectional streams have their own
@@ -886,63 +943,38 @@
 			continue;
 		}
 
-		if (!b_data(buf) && !out) {
-			node = eb64_next(node);
-			continue;
-		}
-
-		if (!out && (qcc->flags & QC_CF_CONN_FULL)) {
+		if (!b_data(&qcs->tx.buf) && !qc_stream_buf_get(qcs->stream)) {
 			node = eb64_next(node);
 			continue;
 		}
 
-		if (!out) {
-			out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset);
-			if (!out) {
-				qcc->flags |= QC_CF_CONN_FULL;
-				node = eb64_next(node);
-				continue;
-			}
-		}
-
-		/* Prepare <out> buffer with data from <buf>. */
-		if (b_data(buf)) {
-			int ret = qcs_xfer_data(qcs, out, buf,
-			                        qcc->tx.sent_offsets + total);
-			BUG_ON(ret < 0); /* TODO handle this properly */
-
-			if (ret > 0) {
-				qcs_notify_send(qcs);
-				if (qcs->flags & QC_SF_BLK_MROOM)
-					qcs->flags &= ~QC_SF_BLK_MROOM;
-			}
-
-			qcs->tx.offset += ret;
-			total += ret;
-		}
-
-		/* Subscribe if not all data can be transfered. */
-		if (b_data(buf)) {
-			qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
-			                           SUB_RETRY_SEND, &qcc->wait_event);
-		}
+		ret = _qc_send_qcs(qcs, &frms, qcc->tx.sent_offsets + total);
+		total += ret;
+		node = eb64_next(node);
+	}
 
-		/* Build a new STREAM frame with <out> buffer. */
-		if (b_data(out) && qcs->tx.sent_offset != qcs->tx.offset) {
-			int ret;
-			char fin = !!(qcs->flags & QC_SF_FIN_STREAM);
+	if (qc_send_frames(qcc, &frms)) {
+		/* data rejected by transport layer, do not retry. */
+		goto out;
+	}
 
-			/* FIN is set if all incoming data were transfered. */
-			fin = !!(fin && !b_data(buf));
-			ret = qcs_build_stream_frm(qcs, out, fin, &frms);
-			BUG_ON(ret < 0); /* TODO handle this properly */
-		}
+ retry:
+	tmp_total = 0;
+	list_for_each_entry_safe(qcs, qcs_tmp, &qcc->send_retry_list, el) {
+		int ret;
+		BUG_ON(!b_data(&qcs->tx.buf));
+		BUG_ON(qc_stream_buf_get(qcs->stream));
 
-		node = eb64_next(node);
+		ret = _qc_send_qcs(qcs, &frms, qcc->tx.sent_offsets + tmp_total);
+		tmp_total += ret;
+		LIST_DELETE(&qcs->el);
 	}
 
-	qc_send_frames(qcc, &frms);
+	total += tmp_total;
+	if (!qc_send_frames(qcc, &frms) && !LIST_ISEMPTY(&qcc->send_retry_list))
+		goto retry;
 
+ out:
 	TRACE_LEAVE(QMUX_EV_QCC_SEND);
 
 	return total;
@@ -1105,6 +1137,8 @@
 	if (!qcc->wait_event.tasklet)
 		goto fail_no_tasklet;
 
+	LIST_INIT(&qcc->send_retry_list);
+
 	qcc->subs = NULL;
 	qcc->wait_event.tasklet->process = qc_io_cb;
 	qcc->wait_event.tasklet->context = qcc;
diff --git a/src/quic_stream.c b/src/quic_stream.c
index 2dd9b1c..839efd0 100644
--- a/src/quic_stream.c
+++ b/src/quic_stream.c
@@ -206,7 +206,7 @@
 /* Check if a new stream buffer can be allocated for the connection <qc>.
  * Returns a boolean.
  */
-int qc_stream_buf_avail(struct quic_conn *qc)
+static int qc_stream_buf_avail(struct quic_conn *qc)
 {
 	/* TODO use a global tune settings for max */
 	return qc->stream_buf_count < 30;