MEDIUM: quic: implement multi-buffered Tx streams

Complete the qc_stream_desc type to support multiple buffers on
emission. The main objective is to increase the transfer throughput.
The MUX is now able to transfer more data without having to wait ACKs.

To implement this feature, a new type qc_stream_buf is declared. it
encapsulates a buffer with a list element. New functions are defined to
retrieve the current buffer, release it or allocate a new one. Each
buffer is kept in the qc_stream_desc list until all of its data is
acknowledged.

On the MUX side, a qcs uses the current stream buffer to transfer data.
Once the buffer is full, it is released and a new one will be allocated
on a future qc_send() invocation.
diff --git a/include/haproxy/quic_stream-t.h b/include/haproxy/quic_stream-t.h
index f1feb5c..e10ca6d 100644
--- a/include/haproxy/quic_stream-t.h
+++ b/include/haproxy/quic_stream-t.h
@@ -6,6 +6,18 @@
 #include <import/ebtree-t.h>
 
 #include <haproxy/buf-t.h>
+#include <haproxy/list-t.h>
+
+/* A QUIC STREAM buffer used for Tx.
+ *
+ * Currently, no offset is associated with an offset. The qc_stream_desc must
+ * store them in order and keep the offset of the oldest buffer. The buffers
+ * can be freed in strict order.
+ */
+struct qc_stream_buf {
+	struct buffer buf; /* STREAM payload */
+	struct list list; /* element for qc_stream_desc list */
+};
 
 /* QUIC STREAM descriptor.
  *
@@ -20,7 +32,10 @@
 	struct eb64_node by_id; /* node for quic_conn tree */
 	struct quic_conn *qc;
 
-	struct buffer buf; /* buffer for STREAM data on Tx, emptied on acknowledge */
+	struct list buf_list; /* buffers waiting for ACK, oldest offset first */
+	struct qc_stream_buf *buf; /* current buffer used by the MUX */
+	uint64_t buf_offset; /* base offset of current buffer */
+
 	uint64_t ack_offset; /* last acknowledged offset */
 	struct eb_root acked_frms; /* ACK frames tree for non-contiguous ACK ranges */
 
diff --git a/include/haproxy/quic_stream.h b/include/haproxy/quic_stream.h
index 38a180b..bd26a48 100644
--- a/include/haproxy/quic_stream.h
+++ b/include/haproxy/quic_stream.h
@@ -10,7 +10,13 @@
 struct qc_stream_desc *qc_stream_desc_new(uint64_t id, void *ctx,
                                           struct quic_conn *qc);
 void qc_stream_desc_release(struct qc_stream_desc *stream);
-int qc_stream_desc_free(struct qc_stream_desc *stream);
+void qc_stream_desc_free(struct qc_stream_desc *stream);
+
+struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream);
+struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
+                                   uint64_t offset);
+void qc_stream_buf_release(struct qc_stream_desc *stream);
+int qc_stream_desc_free_buf(struct qc_stream_desc *stream);
 
 #endif /* USE_QUIC */
 #endif /* _HAPROXY_QUIC_STREAM_H_ */
diff --git a/src/mux_quic.c b/src/mux_quic.c
index 63a75e2..d0fd0b2 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -627,16 +627,24 @@
 	struct qcc *qcc = qcs->qcc;
 	struct quic_frame *frm;
 	int head, total;
+	uint64_t base_off;
 
 	TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
 
-	/* cf buffer schema in qcs_xfer_data */
-	head = qcs->tx.sent_offset - qcs->stream->ack_offset;
+	/* if ack_offset < buf_offset, it points to an older buffer. */
+	base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset);
+	BUG_ON(qcs->tx.sent_offset < base_off);
+
+	head = qcs->tx.sent_offset - base_off;
 	total = b_data(out) - head;
+	BUG_ON(total < 0);
+
 	if (!total) {
 		TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
 		return 0;
 	}
+	BUG_ON(qcs->tx.sent_offset >= qcs->tx.offset);
+	BUG_ON(qcs->tx.sent_offset + total > qcs->tx.offset);
 
 	frm = pool_zalloc(pool_head_quic_frame);
 	if (!frm)
@@ -689,6 +697,7 @@
 	uint64_t diff;
 
 	BUG_ON(offset > qcs->tx.sent_offset);
+	BUG_ON(offset >= qcs->tx.offset);
 
 	/* check if the STREAM frame has already been notified. It can happen
 	 * for retransmission.
@@ -707,8 +716,14 @@
 	/* increase offset on stream */
 	qcs->tx.sent_offset += diff;
 	BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.msd);
+	BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.offset);
 	if (qcs->tx.sent_offset == qcs->tx.msd)
 		qcs->flags |= QC_SF_BLK_SFCTL;
+
+	if (qcs->tx.offset == qcs->tx.sent_offset && b_full(&qcs->stream->buf->buf)) {
+		qc_stream_buf_release(qcs->stream);
+		tasklet_wakeup(qcc->wait_event.tasklet);
+	}
 }
 
 /* Wrapper for send on transport layer. Send a list of frames <frms> for the
@@ -847,7 +862,7 @@
 	while (node) {
 		struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
 		struct buffer *buf = &qcs->tx.buf;
-		struct buffer *out = &qcs->stream->buf;
+		struct buffer *out = qc_stream_buf_get(qcs->stream);
 
 		/* TODO
 		 * for the moment, unidirectional streams have their own
@@ -864,6 +879,24 @@
 			continue;
 		}
 
+		if (!b_data(buf) && !out) {
+			node = eb64_next(node);
+			continue;
+		}
+
+		if (!out) {
+			struct connection *conn = qcc->conn;
+
+			out = qc_stream_buf_alloc(qcs->stream,
+			                          qcs->tx.offset);
+			if (!out) {
+				conn->xprt->subscribe(conn, conn->xprt_ctx,
+				                      SUB_RETRY_SEND, &qcc->wait_event);
+				node = eb64_next(node);
+				continue;
+			}
+		}
+
 		/* Prepare <out> buffer with data from <buf>. */
 		if (b_data(buf)) {
 			int ret = qcs_xfer_data(qcs, out, buf,
@@ -887,7 +920,7 @@
 		}
 
 		/* Build a new STREAM frame with <out> buffer. */
-		if (b_data(out)) {
+		if (b_data(out) && qcs->tx.sent_offset != qcs->tx.offset) {
 			int ret;
 			char fin = !!(qcs->flags & QC_SF_FIN_STREAM);
 
diff --git a/src/quic_stream.c b/src/quic_stream.c
index 69fafbb..72474ac 100644
--- a/src/quic_stream.c
+++ b/src/quic_stream.c
@@ -11,6 +11,9 @@
 
 DECLARE_STATIC_POOL(pool_head_quic_conn_stream, "qc_stream_desc",
                     sizeof(struct qc_stream_desc));
+DECLARE_STATIC_POOL(pool_head_quic_conn_stream_buf, "qc_stream_buf",
+                    sizeof(struct qc_stream_buf));
+
 
 /* Allocate a new stream descriptor with id <id>. The caller is responsible to
  * store the stream in the appropriate tree.
@@ -30,7 +33,10 @@
 	eb64_insert(&qc->streams_by_id, &stream->by_id);
 	stream->qc = qc;
 
-	stream->buf = BUF_NULL;
+	stream->buf = NULL;
+	LIST_INIT(&stream->buf_list);
+	stream->buf_offset = 0;
+
 	stream->acked_frms = EB_ROOT;
 	stream->ack_offset = 0;
 	stream->release = 0;
@@ -50,46 +56,144 @@
 	stream->release = 1;
 	stream->ctx = NULL;
 
-	if (!b_data(&stream->buf))
+	if (LIST_ISEMPTY(&stream->buf_list)) {
+		/* if no buffer left we can free the stream. */
 		qc_stream_desc_free(stream);
+	}
+	else {
+		/* A released stream does not use <stream.buf>. */
+		stream->buf = NULL;
+	}
+}
+
+/* Free the stream descriptor <stream> content. This function should be used
+ * when all its data have been acknowledged or on full connection closing. It
+ * must only be called after the stream is released.
+ */
+void qc_stream_desc_free(struct qc_stream_desc *stream)
+{
+	struct qc_stream_buf *buf, *buf_back;
+	struct eb64_node *frm_node;
+	unsigned int free_count = 0;
+
+	/* This function only deals with released streams. */
+	BUG_ON(!stream->release);
+
+	/* free remaining stream buffers */
+	list_for_each_entry_safe(buf, buf_back, &stream->buf_list, list) {
+		if (!(b_data(&buf->buf))) {
+			b_free(&buf->buf);
+			LIST_DELETE(&buf->list);
+			pool_free(pool_head_quic_conn_stream_buf, buf);
+
+			++free_count;
+		}
+	}
+
+	if (free_count)
+		offer_buffers(NULL, free_count);
+
+	/* qc_stream_desc might be freed before having received all its ACKs.
+	 * This is the case if some frames were retransmitted.
+	 */
+	frm_node = eb64_first(&stream->acked_frms);
+	while (frm_node) {
+		struct quic_stream *strm;
+		struct quic_frame *frm;
+
+		strm = eb64_entry(&frm_node->node, struct quic_stream, offset);
+
+		frm_node = eb64_next(frm_node);
+		eb64_delete(&strm->offset);
+
+		frm = container_of(strm, struct quic_frame, stream);
+		LIST_DELETE(&frm->list);
+		quic_tx_packet_refdec(frm->pkt);
+		pool_free(pool_head_quic_frame, frm);
+	}
+
+	eb64_delete(&stream->by_id);
+	pool_free(pool_head_quic_conn_stream, stream);
+}
+
+/* Return the current buffer of <stream>. May be NULL if not allocated. */
+struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream)
+{
+	if (!stream->buf)
+		return NULL;
+
+	return &stream->buf->buf;
 }
 
-/* Free the stream descriptor <stream> buffer. This function should be used
- * when all its data have been acknowledged. If the stream was released by the
- * upper layer, the stream descriptor will be freed.
+/* Allocate a new current buffer for <stream>. This function is not allowed if
+ * current buffer is not NULL prior to this call. The new buffer represents
+ * stream payload at offset <offset>.
  *
- * Returns 0 if the stream was not freed else non-zero.
+ * Returns the buffer or NULL.
  */
-int qc_stream_desc_free(struct qc_stream_desc *stream)
+struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
+                                   uint64_t offset)
 {
-	b_free(&stream->buf);
-	offer_buffers(NULL, 1);
+	/* current buffer must be released first before allocate a new one. */
+	BUG_ON(stream->buf);
 
-	if (stream->release) {
-		/* Free frames still waiting for an ACK. Even if the stream buf
-		 * is NULL, some frames could still be not acknowledged. This
-		 * is notably the case for retransmission where multiple frames
-		 * points to the same buffer content.
-		 */
-		struct eb64_node *frm_node = eb64_first(&stream->acked_frms);
-		while (frm_node) {
-			struct quic_stream *strm;
-			struct quic_frame *frm;
+	stream->buf_offset = offset;
+	stream->buf = pool_alloc(pool_head_quic_conn_stream_buf);
+	if (!stream->buf)
+		return NULL;
 
-			strm = eb64_entry(&frm_node->node, struct quic_stream, offset);
+	stream->buf->buf = BUF_NULL;
+	LIST_APPEND(&stream->buf_list, &stream->buf->list);
 
-			frm_node = eb64_next(frm_node);
-			eb64_delete(&strm->offset);
+	return &stream->buf->buf;
+}
 
-			frm = container_of(strm, struct quic_frame, stream);
-			LIST_DELETE(&frm->list);
-			quic_tx_packet_refdec(frm->pkt);
-			pool_free(pool_head_quic_frame, frm);
-		}
+/* Release the current buffer of <stream>. It will be kept internally by
+ * the <stream>. The current buffer cannot be NULL.
+ */
+void qc_stream_buf_release(struct qc_stream_desc *stream)
+{
+	/* current buffer already released */
+	BUG_ON(!stream->buf);
 
-		eb64_delete(&stream->by_id);
-		pool_free(pool_head_quic_conn_stream, stream);
+	stream->buf = NULL;
+	stream->buf_offset = 0;
+}
 
+/* Free the oldest buffer of <stream>. If the stream was already released and
+ * does not references any buffers, it is freed. This function must only be
+ * called if at least one buffer is present. Use qc_stream_desc_free() to free
+ * a released stream.
+ *
+ * Returns 0 if the stream is not yet freed else 1.
+ */
+int qc_stream_desc_free_buf(struct qc_stream_desc *stream)
+{
+	struct qc_stream_buf *stream_buf;
+
+	BUG_ON(LIST_ISEMPTY(&stream->buf_list) && !stream->buf);
+
+	if (!LIST_ISEMPTY(&stream->buf_list)) {
+		/* get first buffer from buf_list and remove it */
+		stream_buf = LIST_NEXT(&stream->buf_list, struct qc_stream_buf *,
+		                       list);
+		LIST_DELETE(&stream_buf->list);
+	}
+	else {
+		stream_buf = stream->buf;
+		stream->buf = NULL;
+	}
+
+	b_free(&stream_buf->buf);
+	pool_free(pool_head_quic_conn_stream_buf, stream_buf);
+
+	offer_buffers(NULL, 1);
+
+	/* If qc_stream_desc is released and does not contains any buffers, we
+	 * can free it now.
+	 */
+	if (stream->release && LIST_ISEMPTY(&stream->buf_list)) {
+		qc_stream_desc_free(stream);
 		return 1;
 	}
 
diff --git a/src/xprt_quic.c b/src/xprt_quic.c
index 47ec2c4..85cc764 100644
--- a/src/xprt_quic.c
+++ b/src/xprt_quic.c
@@ -1457,6 +1457,12 @@
 			                    stream->ack_offset;
 			stream->ack_offset += diff;
 			b_del(strm->buf, diff);
+			if (!b_data(strm->buf)) {
+				if (qc_stream_desc_free_buf(stream)) {
+					/* stream is freed here */
+					return 1;
+				}
+			}
 			ret = 1;
 		}
 
@@ -1469,11 +1475,6 @@
 		pool_free(pool_head_quic_frame, frm);
 	}
 
-	if (!b_data(&stream->buf)) {
-		if (qc_stream_desc_free(stream))
-			TRACE_PROTO("stream released and freed", QUIC_EV_CONN_ACKSTRM, qc);
-	}
-
 	return ret;
 }
 
@@ -1521,7 +1522,7 @@
 				stream_acked = 1;
 
 				if (!b_data(strm_frm->buf)) {
-					if (qc_stream_desc_free(stream)) {
+					if (qc_stream_desc_free_buf(stream)) {
 						/* stream is freed at this stage,
 						 * no need to continue.
 						 */