MEDIUM: mux-quic: refactor streams opening
Review the whole API used to access/instantiate qcs.
A public function qcc_open_stream_local() is available to the
application protocol layer. It allows a local stream to be opened easily.
The ID is automatically assigned to the next available one.
For remote streams, qcc_open_stream_remote() has been implemented. It
automatically takes care of allocating streams in a linear way
according to the ID. This function is called via qcc_get_qcs(), which can
be used for each qcc_recv*() operation. For the moment, it is only used
for STREAM frames via qcc_recv(), but soon it will be implemented for
other frame types which can also be used to open a new stream.
qcs_new() and qcs_free() have been restricted to the MUX QUIC only as
they are now reserved for internal usage.
This change is a pure refactoring and should not have any noticeable
impact. It clarifies the developer's intent and helps to ensure that a
stream is not automatically opened when not desired.
diff --git a/src/h3.c b/src/h3.c
index 60c003b..eb930af 100644
--- a/src/h3.c
+++ b/src/h3.c
@@ -1081,7 +1081,7 @@
struct h3c *h3c = ctx;
struct qcs *qcs;
- qcs = qcs_new(h3c->qcc, 0x3, QCS_SRV_UNI);
+ qcs = qcc_open_stream_local(h3c->qcc, 0);
if (!qcs)
return 0;
diff --git a/src/mux_quic.c b/src/mux_quic.c
index 8abf164..24c7373 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -113,7 +113,7 @@
}
/* Allocate a new QUIC streams with id <id> and type <type>. */
-struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
+static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
{
struct qcs *qcs;
@@ -201,7 +201,7 @@
* error or connection shutdown. Else use qcs_destroy which handle all the
* QUIC connection mechanism.
*/
-void qcs_free(struct qcs *qcs)
+static void qcs_free(struct qcs *qcs)
{
qc_free_ncbuf(qcs, &qcs->rx.ncbuf);
b_free(&qcs->tx.buf);
@@ -291,92 +291,180 @@
}
}
-/* Retrieve as an ebtree node the stream with <id> as ID, possibly allocates
- * several streams, depending on the already open ones.
- * Return this node if succeeded, NULL if not.
+/* Open a locally initiated stream for the connection <qcc>. Set <bidi> for a
+ * bidirectional stream, else an unidirectional stream is opened. The next
+ * available ID on the connection will be used according to the stream type.
+ *
+ * Returns the allocated stream instance or NULL on error.
*/
-struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
+struct qcs *qcc_open_stream_local(struct qcc *qcc, int bidi)
{
- unsigned int strm_type;
- int64_t sub_id;
- struct eb64_node *node;
- struct qcs *qcs = NULL;
+ struct qcs *qcs;
+ enum qcs_type type;
+ uint64_t *next;
- strm_type = id & QCS_ID_TYPE_MASK;
- sub_id = id >> QCS_ID_TYPE_SHIFT;
- node = NULL;
- if (quic_stream_is_local(qcc, id)) {
- /* Local streams: this stream must be already opened. */
- node = eb64_lookup(&qcc->streams_by_id, id);
- if (!node) {
- /* unknown stream id */
- goto out;
- }
- qcs = eb64_entry(node, struct qcs, by_id);
+ TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn);
+
+ if (bidi) {
+ next = &qcc->next_bidi_l;
+ type = conn_is_back(qcc->conn) ? QCS_CLT_BIDI : QCS_SRV_BIDI;
}
else {
- /* Remote streams. */
- struct eb_root *strms;
- uint64_t largest_id;
- enum qcs_type qcs_type;
+ next = &qcc->next_uni_l;
+ type = conn_is_back(qcc->conn) ? QCS_CLT_UNI : QCS_SRV_UNI;
+ }
+
+ /* TODO ensure that we won't overflow remote peer flow control limit on
+ * streams. Else, we should emit a STREAMS_BLOCKED frame.
+ */
- strms = &qcc->streams_by_id;
- qcs_type = qcs_id_type(id);
+ qcs = qcs_new(qcc, *next, type);
+ if (!qcs)
+ return NULL;
- /* TODO also checks max-streams for uni streams */
- if (quic_stream_is_bidi(id)) {
- if (sub_id + 1 > qcc->lfctl.ms_bidi) {
- /* RFC 9000 4.6. Controlling Concurrency
- *
- * An endpoint that receives a frame with a
- * stream ID exceeding the limit it has sent
- * MUST treat this as a connection error of
- * type STREAM_LIMIT_ERROR
- */
- qcc_emit_cc(qcc, QC_ERR_STREAM_LIMIT_ERROR);
- goto out;
- }
- }
+ TRACE_DEVEL("opening local stream", QMUX_EV_QCS_NEW, qcc->conn, qcs);
+ *next += 4;
- /* Note: ->largest_id was initialized with (uint64_t)-1 as value, 0 being a
- * correct value.
- */
- largest_id = qcc->strms[qcs_type].largest_id;
- if (sub_id > (int64_t)largest_id) {
- /* RFC: "A stream ID that is used out of order results in all streams
- * of that type with lower-numbered stream IDs also being opened".
- * So, let's "open" these streams.
- */
- int64_t i;
- struct qcs *tmp_qcs;
+ TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
+ return qcs;
+}
+
+/* Open a remotely initiated stream for the connection <qcc> with ID <id>. The
+ * caller is responsible for ensuring that a stream with the same ID was not
+ * already opened. This function will also create all intermediary streams
+ * with an ID smaller than <id> that were not already opened.
+ *
+ * Returns the allocated stream instance or NULL on error.
+ */
+static struct qcs *qcc_open_stream_remote(struct qcc *qcc, uint64_t id)
+{
+ struct qcs *qcs = NULL;
+ enum qcs_type type;
+ uint64_t *largest;
- tmp_qcs = NULL;
- for (i = largest_id + 1; i <= sub_id; i++) {
- uint64_t id = (i << QCS_ID_TYPE_SHIFT) | strm_type;
- enum qcs_type type = id & QCS_ID_DIR_BIT ? QCS_CLT_UNI : QCS_CLT_BIDI;
+ TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn);
- tmp_qcs = qcs_new(qcc, id, type);
- if (!tmp_qcs) {
- /* allocation failure */
- goto out;
- }
+ BUG_ON_HOT(quic_stream_is_local(qcc, id));
- qcc->strms[qcs_type].largest_id = i;
- }
- if (tmp_qcs)
- qcs = tmp_qcs;
+ if (quic_stream_is_bidi(id)) {
+ largest = &qcc->largest_bidi_r;
+ type = conn_is_back(qcc->conn) ? QCS_SRV_BIDI : QCS_CLT_BIDI;
+ }
+ else {
+ largest = &qcc->largest_uni_r;
+ type = conn_is_back(qcc->conn) ? QCS_SRV_UNI : QCS_CLT_UNI;
+ }
+
+ /* TODO also checks max-streams for uni streams */
+ if (quic_stream_is_bidi(id)) {
+ if (id >= qcc->lfctl.ms_bidi * 4) {
+ /* RFC 9000 4.6. Controlling Concurrency
+ *
+ * An endpoint that receives a frame with a
+ * stream ID exceeding the limit it has sent
+ * MUST treat this as a connection error of
+ * type STREAM_LIMIT_ERROR
+ */
+ TRACE_DEVEL("leaving on flow control error", QMUX_EV_QCS_NEW, qcc->conn);
+ qcc_emit_cc(qcc, QC_ERR_STREAM_LIMIT_ERROR);
+ return NULL;
}
- else {
- node = eb64_lookup(strms, id);
- if (node)
- qcs = eb64_entry(node, struct qcs, by_id);
+ }
+
+ /* Only stream ID not already opened can be used. */
+ BUG_ON(id < *largest);
+
+ while (id >= *largest) {
+ const char *str = *largest < id ? "opening intermediary stream" :
+ "opening remote stream";
+
+ qcs = qcs_new(qcc, *largest, type);
+ if (!qcs) {
+ TRACE_DEVEL("leaving on stream fallocation failure", QMUX_EV_QCS_NEW, qcc->conn);
+ return NULL;
}
+
+ TRACE_DEVEL(str, QMUX_EV_QCS_NEW, qcc->conn, qcs);
+ *largest += 4;
}
+ TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
return qcs;
+}
+
+/* Use this function for a stream <id> which is not in <qcc> stream tree. It
+ * returns true if the associated stream is closed.
+ */
+static int qcc_stream_id_is_closed(struct qcc *qcc, uint64_t id)
+{
+ uint64_t *largest;
+
+ /* This function must only be used for stream not present in the stream tree. */
+ BUG_ON_HOT(eb64_lookup(&qcc->streams_by_id, id));
+
+ if (quic_stream_is_local(qcc, id)) {
+ largest = quic_stream_is_uni(id) ? &qcc->next_uni_l :
+ &qcc->next_bidi_l;
+ }
+ else {
+ largest = quic_stream_is_uni(id) ? &qcc->largest_uni_r :
+ &qcc->largest_bidi_r;
+ }
+
+ return id < *largest;
+}
+
+/* Retrieve the stream instance from <id> ID. This can be used when receiving
+ * STREAM, STREAM_DATA_BLOCKED, RESET_STREAM, MAX_STREAM_DATA or STOP_SENDING
+ * frames.
+ *
+ * Return the stream instance or NULL if not found.
+ */
+static struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
+{
+ struct eb64_node *node;
+ struct qcs *qcs = NULL;
+
+ TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
+
+ /* Search the stream in the connection tree. */
+ node = eb64_lookup(&qcc->streams_by_id, id);
+ if (node) {
+ qcs = eb64_entry(node, struct qcs, by_id);
+ TRACE_DEVEL("using stream from connection tree", QMUX_EV_QCC_RECV, qcc->conn, qcs);
+ return qcs;
+ }
+
+ /* Check if stream is already closed. */
+ if (qcc_stream_id_is_closed(qcc, id)) {
+ TRACE_DEVEL("already released stream", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
+ return NULL;
+ }
+
+ /* Create the stream. This is valid only for a remotely initiated one. A
+ * local stream must have already been explicitly created by the
+ * application protocol layer.
+ */
+ if (quic_stream_is_local(qcc, id)) {
+ /* RFC 9000 19.8. STREAM Frames
+ *
+ * An endpoint MUST terminate the connection with error
+ * STREAM_STATE_ERROR if it receives a STREAM frame for a locally
+ * initiated stream that has not yet been created, or for a send-only
+ * stream.
+ */
+ TRACE_DEVEL("leaving on locally initiated stream not yet created", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
+ qcc_emit_cc(qcc, QC_ERR_STREAM_STATE_ERROR);
+ return NULL;
+ }
+ else {
+ /* Remote stream not found - try to open it. */
+ qcs = qcc_open_stream_remote(qcc, id);
+ }
out:
- return NULL;
+ TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
+ return qcs;
}
/* Simple function to duplicate a buffer */
@@ -515,30 +603,8 @@
}
qcs = qcc_get_qcs(qcc, id);
- if (!qcs) {
- if ((id >> QCS_ID_TYPE_SHIFT) <= qcc->strms[qcs_id_type(id)].largest_id) {
- TRACE_DEVEL("already released stream", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
- return 0;
- }
- else {
- /* RFC 9000 19.8. STREAM Frames
- *
- * An endpoint MUST terminate the connection with error
- * STREAM_STATE_ERROR if it receives a STREAM frame for a locally
- * initiated stream that has not yet been created, or for a send-only
- * stream.
- */
- if (quic_stream_is_local(qcc, id)) {
- TRACE_DEVEL("leaving on locally initiated stream not yet created", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
- qcc_emit_cc(qcc, QC_ERR_STREAM_STATE_ERROR);
- return 1;
- }
- else {
- TRACE_DEVEL("leaving on stream not found", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
- return 1;
- }
- }
- }
+ if (!qcs)
+ return 0;
if (offset + len <= qcs->rx.offset) {
TRACE_DEVEL("leaving on already received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
@@ -1370,26 +1436,22 @@
/* Client initiated streams must respect the server flow control. */
qcc->strms[QCS_CLT_BIDI].max_streams = lparams->initial_max_streams_bidi;
qcc->strms[QCS_CLT_BIDI].nb_streams = 0;
- qcc->strms[QCS_CLT_BIDI].largest_id = -1;
qcc->strms[QCS_CLT_BIDI].rx.max_data = 0;
qcc->strms[QCS_CLT_BIDI].tx.max_data = lparams->initial_max_stream_data_bidi_remote;
qcc->strms[QCS_CLT_UNI].max_streams = lparams->initial_max_streams_uni;
qcc->strms[QCS_CLT_UNI].nb_streams = 0;
- qcc->strms[QCS_CLT_UNI].largest_id = -1;
qcc->strms[QCS_CLT_UNI].rx.max_data = 0;
qcc->strms[QCS_CLT_UNI].tx.max_data = lparams->initial_max_stream_data_uni;
/* Server initiated streams must respect the server flow control. */
qcc->strms[QCS_SRV_BIDI].max_streams = 0;
qcc->strms[QCS_SRV_BIDI].nb_streams = 0;
- qcc->strms[QCS_SRV_BIDI].largest_id = -1;
qcc->strms[QCS_SRV_BIDI].rx.max_data = lparams->initial_max_stream_data_bidi_local;
qcc->strms[QCS_SRV_BIDI].tx.max_data = 0;
qcc->strms[QCS_SRV_UNI].max_streams = 0;
qcc->strms[QCS_SRV_UNI].nb_streams = 0;
- qcc->strms[QCS_SRV_UNI].largest_id = -1;
qcc->strms[QCS_SRV_UNI].rx.max_data = lparams->initial_max_stream_data_uni;
qcc->strms[QCS_SRV_UNI].tx.max_data = 0;
@@ -1407,6 +1469,19 @@
qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local;
qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;
+ if (conn_is_back(conn)) {
+ qcc->next_bidi_l = 0x00;
+ qcc->largest_bidi_r = 0x01;
+ qcc->next_uni_l = 0x02;
+ qcc->largest_uni_r = 0x03;
+ }
+ else {
+ qcc->largest_bidi_r = 0x00;
+ qcc->next_bidi_l = 0x01;
+ qcc->largest_uni_r = 0x02;
+ qcc->next_uni_l = 0x03;
+ }
+
qcc->wait_event.tasklet = tasklet_new();
if (!qcc->wait_event.tasklet)
goto fail_no_tasklet;