MEDIUM: mux-quic: refactor streams opening

Review the whole API used to access/instantiate qcs.

A public function qcc_open_stream_local() is available to the
application protocol layer. It allows to easily opening a local stream.
The ID is automatically attributed to the next one available.

For remote streams, qcc_open_stream_remote() has been implemented. It
will automatically take care of allocating streams in a linear way
according to the ID. This function is called via qcc_get_qcs() which can
be used for each qcc_recv*() operations. For the moment, it is only used
for STREAM frames via qcc_recv(), but soon it will be implemented for
other frames types which can also be used to open a new stream.

qcs_new() and qcs_free() has been restricted to the MUX QUIC only as
they are now reserved for internal usage.

This change is a pure refactoring and should not have any noticeable
impact. It clarifies the developer intent and help to ensure that a
stream is not automatically opened when not desired.
diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h
index 1772a38..168f074 100644
--- a/include/haproxy/mux_quic-t.h
+++ b/include/haproxy/mux_quic-t.h
@@ -37,7 +37,6 @@
 
 	struct {
 		uint64_t max_streams; /* maximum number of concurrent streams */
-		uint64_t largest_id;  /* Largest ID of the open streams */
 		uint64_t nb_streams;  /* Number of open streams */
 		struct {
 			uint64_t max_data; /* Maximum number of bytes which may be received */
@@ -80,6 +79,11 @@
 		uint64_t sent_offsets; /* sum of all offset sent */
 	} tx;
 
+	uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */
+	uint64_t largest_uni_r;  /* largest remote uni stream ID opened. */
+	uint64_t next_bidi_l; /* next stream ID to use for local bidi stream */
+	uint64_t next_uni_l;  /* next stream ID to use for local uni stream */
+
 	struct eb_root streams_by_id; /* all active streams by their ID */
 
 	struct list send_retry_list; /* list of qcs eligible to send retry */
diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h
index 3da5736..75e39a7 100644
--- a/include/haproxy/mux_quic.h
+++ b/include/haproxy/mux_quic.h
@@ -14,9 +14,7 @@
 #include <haproxy/stream.h>
 #include <haproxy/xprt_quic-t.h>
 
-struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type);
-void qcs_free(struct qcs *qcs);
-
+struct qcs *qcc_open_stream_local(struct qcc *qcc, int bidi);
 struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr);
 
 int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
@@ -71,8 +69,6 @@
 	return !quic_stream_is_uni(id);
 }
 
-struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id);
-
 /* Install the <app_ops> applicative layer of a QUIC connection on mux <qcc>.
  * Returns 0 on success else non-zero.
  */
diff --git a/src/h3.c b/src/h3.c
index 60c003b..eb930af 100644
--- a/src/h3.c
+++ b/src/h3.c
@@ -1081,7 +1081,7 @@
 	struct h3c *h3c = ctx;
 	struct qcs *qcs;
 
-	qcs = qcs_new(h3c->qcc, 0x3, QCS_SRV_UNI);
+	qcs = qcc_open_stream_local(h3c->qcc, 0);
 	if (!qcs)
 		return 0;
 
diff --git a/src/mux_quic.c b/src/mux_quic.c
index 8abf164..24c7373 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -113,7 +113,7 @@
 }
 
 /* Allocate a new QUIC streams with id <id> and type <type>. */
-struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
+static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
 {
 	struct qcs *qcs;
 
@@ -201,7 +201,7 @@
  * error or connection shutdown. Else use qcs_destroy which handle all the
  * QUIC connection mechanism.
  */
-void qcs_free(struct qcs *qcs)
+static void qcs_free(struct qcs *qcs)
 {
 	qc_free_ncbuf(qcs, &qcs->rx.ncbuf);
 	b_free(&qcs->tx.buf);
@@ -291,92 +291,180 @@
 	}
 }
 
-/* Retrieve as an ebtree node the stream with <id> as ID, possibly allocates
- * several streams, depending on the already open ones.
- * Return this node if succeeded, NULL if not.
+/* Open a locally initiated stream for the connection <qcc>. Set <bidi> for a
+ * bidirectional stream, else an unidirectional stream is opened. The next
+ * available ID on the connection will be used according to the stream type.
+ *
+ * Returns the allocated stream instance or NULL on error.
  */
-struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
+struct qcs *qcc_open_stream_local(struct qcc *qcc, int bidi)
 {
-	unsigned int strm_type;
-	int64_t sub_id;
-	struct eb64_node *node;
-	struct qcs *qcs = NULL;
+	struct qcs *qcs;
+	enum qcs_type type;
+	uint64_t *next;
 
-	strm_type = id & QCS_ID_TYPE_MASK;
-	sub_id = id >> QCS_ID_TYPE_SHIFT;
-	node = NULL;
-	if (quic_stream_is_local(qcc, id)) {
-		/* Local streams: this stream must be already opened. */
-		node = eb64_lookup(&qcc->streams_by_id, id);
-		if (!node) {
-			/* unknown stream id */
-			goto out;
-		}
-		qcs = eb64_entry(node, struct qcs, by_id);
+	TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn);
+
+	if (bidi) {
+		next = &qcc->next_bidi_l;
+		type = conn_is_back(qcc->conn) ? QCS_CLT_BIDI : QCS_SRV_BIDI;
 	}
 	else {
-		/* Remote streams. */
-		struct eb_root *strms;
-		uint64_t largest_id;
-		enum qcs_type qcs_type;
+		next = &qcc->next_uni_l;
+		type = conn_is_back(qcc->conn) ? QCS_CLT_UNI : QCS_SRV_UNI;
+	}
+
+	/* TODO ensure that we won't overflow remote peer flow control limit on
+	 * streams. Else, we should emit a STREAMS_BLOCKED frame.
+	 */
 
-		strms = &qcc->streams_by_id;
-		qcs_type = qcs_id_type(id);
+	qcs = qcs_new(qcc, *next, type);
+	if (!qcs)
+		return NULL;
 
-		/* TODO also checks max-streams for uni streams */
-		if (quic_stream_is_bidi(id)) {
-			if (sub_id + 1 > qcc->lfctl.ms_bidi) {
-				/* RFC 9000 4.6. Controlling Concurrency
-				 *
-				 * An endpoint that receives a frame with a
-				 * stream ID exceeding the limit it has sent
-				 * MUST treat this as a connection error of
-				 * type STREAM_LIMIT_ERROR
-				 */
-				qcc_emit_cc(qcc, QC_ERR_STREAM_LIMIT_ERROR);
-				goto out;
-			}
-		}
+	TRACE_DEVEL("opening local stream",  QMUX_EV_QCS_NEW, qcc->conn, qcs);
+	*next += 4;
 
-		/* Note: ->largest_id was initialized with (uint64_t)-1 as value, 0 being a
-		 * correct value.
-		 */
-		largest_id = qcc->strms[qcs_type].largest_id;
-		if (sub_id > (int64_t)largest_id) {
-			/* RFC: "A stream ID that is used out of order results in all streams
-			 * of that type with lower-numbered stream IDs also being opened".
-			 * So, let's "open" these streams.
-			 */
-			int64_t i;
-			struct qcs *tmp_qcs;
+	TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
+	return qcs;
+}
+
+/* Open a remote initiated stream for the connection <qcc> with ID <id>. The
+ * caller is responsible to ensure that a stream with the same ID was not
+ * already opened. This function will also create all intermediaries streams
+ * with ID smaller than <id> not already opened before.
+ *
+ * Returns the allocated stream instance or NULL on error.
+ */
+static struct qcs *qcc_open_stream_remote(struct qcc *qcc, uint64_t id)
+{
+	struct qcs *qcs = NULL;
+	enum qcs_type type;
+	uint64_t *largest;
 
-			tmp_qcs = NULL;
-			for (i = largest_id + 1; i <= sub_id; i++) {
-				uint64_t id = (i << QCS_ID_TYPE_SHIFT) | strm_type;
-				enum qcs_type type = id & QCS_ID_DIR_BIT ? QCS_CLT_UNI : QCS_CLT_BIDI;
+	TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn);
 
-				tmp_qcs = qcs_new(qcc, id, type);
-				if (!tmp_qcs) {
-					/* allocation failure */
-					goto out;
-				}
+	BUG_ON_HOT(quic_stream_is_local(qcc, id));
 
-				qcc->strms[qcs_type].largest_id = i;
-			}
-			if (tmp_qcs)
-				qcs = tmp_qcs;
+	if (quic_stream_is_bidi(id)) {
+		largest = &qcc->largest_bidi_r;
+		type = conn_is_back(qcc->conn) ? QCS_SRV_BIDI : QCS_CLT_BIDI;
+	}
+	else {
+		largest = &qcc->largest_uni_r;
+		type = conn_is_back(qcc->conn) ? QCS_SRV_UNI : QCS_CLT_UNI;
+	}
+
+	/* TODO also checks max-streams for uni streams */
+	if (quic_stream_is_bidi(id)) {
+		if (id >= qcc->lfctl.ms_bidi * 4) {
+			/* RFC 9000 4.6. Controlling Concurrency
+			 *
+			 * An endpoint that receives a frame with a
+			 * stream ID exceeding the limit it has sent
+			 * MUST treat this as a connection error of
+			 * type STREAM_LIMIT_ERROR
+			 */
+			TRACE_DEVEL("leaving on flow control error", QMUX_EV_QCS_NEW, qcc->conn);
+			qcc_emit_cc(qcc, QC_ERR_STREAM_LIMIT_ERROR);
+			return NULL;
 		}
-		else {
-			node = eb64_lookup(strms, id);
-			if (node)
-				qcs = eb64_entry(node, struct qcs, by_id);
+	}
+
+	/* Only stream ID not already opened can be used. */
+	BUG_ON(id < *largest);
+
+	while (id >= *largest) {
+		const char *str = *largest < id ? "opening intermediary stream" :
+		                                  "opening remote stream";
+
+		qcs = qcs_new(qcc, *largest, type);
+		if (!qcs) {
+			TRACE_DEVEL("leaving on stream fallocation failure", QMUX_EV_QCS_NEW, qcc->conn);
+			return NULL;
 		}
+
+		TRACE_DEVEL(str, QMUX_EV_QCS_NEW, qcc->conn, qcs);
+		*largest += 4;
 	}
 
+	TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
 	return qcs;
+}
+
+/* Use this function for a stream <id> which is not in <qcc> stream tree. It
+ * returns true if the associated stream is closed.
+ */
+static int qcc_stream_id_is_closed(struct qcc *qcc, uint64_t id)
+{
+	uint64_t *largest;
+
+	/* This function must only be used for stream not present in the stream tree. */
+	BUG_ON_HOT(eb64_lookup(&qcc->streams_by_id, id));
+
+	if (quic_stream_is_local(qcc, id)) {
+		largest = quic_stream_is_uni(id) ? &qcc->next_uni_l :
+		                                   &qcc->next_bidi_l;
+	}
+	else {
+		largest = quic_stream_is_uni(id) ? &qcc->largest_uni_r :
+		                                   &qcc->largest_bidi_r;
+	}
+
+	return id < *largest;
+}
+
+/* Retrieve the stream instance from <id> ID. This can be used when receiving
+ * STREAM, STREAM_DATA_BLOCKED, RESET_STREAM, MAX_STREAM_DATA or STOP_SENDING
+ * frames.
+ *
+ * Return the stream instance or NULL if not found.
+ */
+static struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
+{
+	struct eb64_node *node;
+	struct qcs *qcs = NULL;
+
+	TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
+
+	/* Search the stream in the connection tree. */
+	node = eb64_lookup(&qcc->streams_by_id, id);
+	if (node) {
+		qcs = eb64_entry(node, struct qcs, by_id);
+		TRACE_DEVEL("using stream from connection tree", QMUX_EV_QCC_RECV, qcc->conn, qcs);
+		return qcs;
+	}
+
+	/* Check if stream is already closed. */
+	if (qcc_stream_id_is_closed(qcc, id)) {
+		TRACE_DEVEL("already released stream", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
+		return NULL;
+	}
+
+	/* Create the stream. This is valid only for remote initiated one. A
+	 * local stream must have already been explicitely created by the
+	 * application protocol layer.
+	 */
+	if (quic_stream_is_local(qcc, id)) {
+		/* RFC 9000 19.8. STREAM Frames
+		 *
+		 * An endpoint MUST terminate the connection with error
+		 * STREAM_STATE_ERROR if it receives a STREAM frame for a locally
+		 * initiated stream that has not yet been created, or for a send-only
+		 * stream.
+		 */
+		TRACE_DEVEL("leaving on locally initiated stream not yet created", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
+		qcc_emit_cc(qcc, QC_ERR_STREAM_STATE_ERROR);
+		return NULL;
+	}
+	else {
+		/* Remote stream not found - try to open it. */
+		qcs = qcc_open_stream_remote(qcc, id);
+	}
 
  out:
-	return NULL;
+	TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
+	return qcs;
 }
 
 /* Simple function to duplicate a buffer */
@@ -515,30 +603,8 @@
 	}
 
 	qcs = qcc_get_qcs(qcc, id);
-	if (!qcs) {
-		if ((id >> QCS_ID_TYPE_SHIFT) <= qcc->strms[qcs_id_type(id)].largest_id) {
-			TRACE_DEVEL("already released stream", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
-			return 0;
-		}
-		else {
-			/* RFC 9000 19.8. STREAM Frames
-			 *
-			 * An endpoint MUST terminate the connection with error
-			 * STREAM_STATE_ERROR if it receives a STREAM frame for a locally
-			 * initiated stream that has not yet been created, or for a send-only
-			 * stream.
-			 */
-			if (quic_stream_is_local(qcc, id)) {
-				TRACE_DEVEL("leaving on locally initiated stream not yet created", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
-				qcc_emit_cc(qcc, QC_ERR_STREAM_STATE_ERROR);
-				return 1;
-			}
-			else {
-				TRACE_DEVEL("leaving on stream not found", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
-				return 1;
-			}
-		}
-	}
+	if (!qcs)
+		return 0;
 
 	if (offset + len <= qcs->rx.offset) {
 		TRACE_DEVEL("leaving on already received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
@@ -1370,26 +1436,22 @@
 	/* Client initiated streams must respect the server flow control. */
 	qcc->strms[QCS_CLT_BIDI].max_streams = lparams->initial_max_streams_bidi;
 	qcc->strms[QCS_CLT_BIDI].nb_streams = 0;
-	qcc->strms[QCS_CLT_BIDI].largest_id = -1;
 	qcc->strms[QCS_CLT_BIDI].rx.max_data = 0;
 	qcc->strms[QCS_CLT_BIDI].tx.max_data = lparams->initial_max_stream_data_bidi_remote;
 
 	qcc->strms[QCS_CLT_UNI].max_streams = lparams->initial_max_streams_uni;
 	qcc->strms[QCS_CLT_UNI].nb_streams = 0;
-	qcc->strms[QCS_CLT_UNI].largest_id = -1;
 	qcc->strms[QCS_CLT_UNI].rx.max_data = 0;
 	qcc->strms[QCS_CLT_UNI].tx.max_data = lparams->initial_max_stream_data_uni;
 
 	/* Server initiated streams must respect the server flow control. */
 	qcc->strms[QCS_SRV_BIDI].max_streams = 0;
 	qcc->strms[QCS_SRV_BIDI].nb_streams = 0;
-	qcc->strms[QCS_SRV_BIDI].largest_id = -1;
 	qcc->strms[QCS_SRV_BIDI].rx.max_data = lparams->initial_max_stream_data_bidi_local;
 	qcc->strms[QCS_SRV_BIDI].tx.max_data = 0;
 
 	qcc->strms[QCS_SRV_UNI].max_streams = 0;
 	qcc->strms[QCS_SRV_UNI].nb_streams = 0;
-	qcc->strms[QCS_SRV_UNI].largest_id = -1;
 	qcc->strms[QCS_SRV_UNI].rx.max_data = lparams->initial_max_stream_data_uni;
 	qcc->strms[QCS_SRV_UNI].tx.max_data = 0;
 
@@ -1407,6 +1469,19 @@
 	qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local;
 	qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;
 
+	if (conn_is_back(conn)) {
+		qcc->next_bidi_l    = 0x00;
+		qcc->largest_bidi_r = 0x01;
+		qcc->next_uni_l     = 0x02;
+		qcc->largest_uni_r  = 0x03;
+	}
+	else {
+		qcc->largest_bidi_r = 0x00;
+		qcc->next_bidi_l    = 0x01;
+		qcc->largest_uni_r  = 0x02;
+		qcc->next_uni_l     = 0x03;
+	}
+
 	qcc->wait_event.tasklet = tasklet_new();
 	if (!qcc->wait_event.tasklet)
 		goto fail_no_tasklet;