MEDIUM: quic: refactor uni streams RX
The whole QUIC stack is impacted by this change :
* at quic-conn level, a single function is now used to handle uni and
bidirectional streams. It uses qcc_recv() function from MUX.
* at MUX level, qc_recv() io-handler function does not skip uni streams
* most changes are conducted at app layer. Most notably, all received
data is handle by decode_qcs operation.
Now that decode_qcs is the single app read function, the H3 layer can be
simplified. Uni streams parsing was extracted from h3_attach_ruqs() to
h3_decode_qcs().
h3_decode_qcs() is able to deal with all HTTP/3 frame types. It first
check if the frame is valid for the H3 stream type. Most notably,
SETTINGS parsing was moved from h3_control_recv() into h3_decode_qcs().
This commit has some major benefits besides removing duplicated code.
Mainly, QUIC flow control is now enforced for uni streams as with bidi
streams. Also, an unknown frame received on control stream does not set
an error : it is now silently ignored as required by the specification.
Some cleaning in H3 code is already done with this patch :
h3_control_recv() and h3_attach_ruqs() are removed as they are now
unused. A final patch should clean up the unneeded remaining bit.
diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h
index 903e326..fca28f7 100644
--- a/include/haproxy/mux_quic-t.h
+++ b/include/haproxy/mux_quic-t.h
@@ -136,7 +136,6 @@
struct qcc_app_ops {
int (*init)(struct qcc *qcc);
int (*attach)(struct qcs *qcs);
- int (*attach_ruqs)(struct qcs *qcs, void *ctx);
int (*decode_qcs)(struct qcs *qcs, int fin, void *ctx);
size_t (*snd_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags);
void (*detach)(struct qcs *qcs);
diff --git a/src/h3.c b/src/h3.c
index e3a3f77..d5e96d5 100644
--- a/src/h3.c
+++ b/src/h3.c
@@ -165,9 +165,8 @@
*
* Returns 0 on success else non-zero.
*/
-static int h3_parse_uni_stream_no_h3(struct qcs *qcs, void *ctx)
+static int h3_parse_uni_stream_no_h3(struct qcs *qcs, struct ncbuf *rxbuf)
{
- struct ncbuf *rxbuf = &qcs->rx.ncbuf;
struct h3s *h3s = qcs->ctx;
BUG_ON_HOT(!quic_stream_is_uni(qcs->id) ||
@@ -418,6 +417,47 @@
return htx_sent;
}
+/* Parse a SETTINGS frame which must not be truncated with <flen> as length from
+ * <rxbuf> buffer. This function does not update this buffer.
+ *
+ * Returns 0 on success else non-zero.
+ */
+static int h3_parse_settings_frm(struct h3c *h3c, const struct ncbuf *rxbuf, size_t flen)
+{
+ uint64_t id, value;
+ const unsigned char *buf, *end;
+
+ buf = (const unsigned char *)ncb_head(rxbuf);
+ end = buf + flen;
+
+ while (buf < end) {
+ if (!quic_dec_int(&id, &buf, end) || !quic_dec_int(&value, &buf, end))
+ return 1;
+
+ h3_debug_printf(stderr, "%s id: %llu value: %llu\n",
+ __func__, (unsigned long long)id, (unsigned long long)value);
+ switch (id) {
+ case H3_SETTINGS_QPACK_MAX_TABLE_CAPACITY:
+ h3c->qpack_max_table_capacity = value;
+ break;
+ case H3_SETTINGS_MAX_FIELD_SECTION_SIZE:
+ h3c->max_field_section_size = value;
+ break;
+ case H3_SETTINGS_QPACK_BLOCKED_STREAMS:
+ h3c->qpack_blocked_streams = value;
+ break;
+ case H3_SETTINGS_RESERVED_2 ... H3_SETTINGS_RESERVED_5:
+ h3c->err = H3_SETTINGS_ERROR;
+ return 1;
+ default:
+ /* MUST be ignored */
+ break;
+ }
+ }
+
+ return 0;
+}
+
/* Decode <qcs> remotely initiated bidi-stream. <fin> must be set to indicate
* that we received the last data of the stream.
*
@@ -434,6 +474,18 @@
if (!ncb_data(rxbuf, 0))
return 0;
+ if (quic_stream_is_uni(qcs->id) && !(h3s->flags & H3_SF_UNI_INIT)) {
+ if (h3_init_uni_stream(h3c, qcs, rxbuf))
+ return 1;
+ }
+
+ if (quic_stream_is_uni(qcs->id) && (h3s->flags & H3_SF_UNI_NO_H3)) {
+ /* For non-h3 STREAM, parse it and return immediately. */
+ if (h3_parse_uni_stream_no_h3(qcs, rxbuf))
+ return 1;
+ return 0;
+ }
+
while (ncb_data(rxbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) {
uint64_t ftype, flen;
struct buffer b;
@@ -492,10 +544,18 @@
/* TODO handle error reporting. Stream closure required. */
if (ret < 0) { ABORT_NOW(); }
break;
+ case H3_FT_CANCEL_PUSH:
case H3_FT_PUSH_PROMISE:
+ case H3_FT_MAX_PUSH_ID:
+ case H3_FT_GOAWAY:
/* Not supported */
ret = flen;
break;
+ case H3_FT_SETTINGS:
+ if (h3_parse_settings_frm(qcs->qcc->ctx, rxbuf, flen))
+ return 1;
+ ret = flen;
+ break;
default:
/* draft-ietf-quic-http34 9. Extensions to HTTP/3
*
@@ -521,105 +581,6 @@
return 0;
}
-/* Parse a SETTINGS frame which must not be truncated with <flen> as length from
- * <rxbuf> buffer. This function does not update this buffer.
- * Returns 0 if something wrong happened, 1 if not.
- */
-static int h3_parse_settings_frm(struct h3c *h3c, const struct ncbuf *rxbuf, size_t flen)
-{
- uint64_t id, value;
- const unsigned char *buf, *end;
-
- buf = (const unsigned char *)ncb_head(rxbuf);
- end = buf + flen;
-
- while (buf < end) {
- if (!quic_dec_int(&id, &buf, end) || !quic_dec_int(&value, &buf, end))
- return 0;
-
- h3_debug_printf(stderr, "%s id: %llu value: %llu\n",
- __func__, (unsigned long long)id, (unsigned long long)value);
- switch (id) {
- case H3_SETTINGS_QPACK_MAX_TABLE_CAPACITY:
- h3c->qpack_max_table_capacity = value;
- break;
- case H3_SETTINGS_MAX_FIELD_SECTION_SIZE:
- h3c->max_field_section_size = value;
- break;
- case H3_SETTINGS_QPACK_BLOCKED_STREAMS:
- h3c->qpack_blocked_streams = value;
- break;
- case H3_SETTINGS_RESERVED_2 ... H3_SETTINGS_RESERVED_5:
- h3c->err = H3_SETTINGS_ERROR;
- return 0;
- default:
- /* MUST be ignored */
- break;
- }
- }
-
- return 1;
-}
-
-/* Decode <qcs> remotely initiated uni-stream. We stop parsing a frame as soon as
- * there is not enough received data.
- * Returns 0 if something wrong happened, 1 if not.
- */
-static int h3_control_recv(struct qcs *qcs, void *ctx)
-{
- struct ncbuf *rxbuf = &qcs->rx.ncbuf;
- struct h3c *h3c = ctx;
-
- h3_debug_printf(stderr, "%s STREAM ID: %lu\n", __func__, qcs->id);
- if (!ncb_data(rxbuf, 0))
- return 1;
-
- while (ncb_data(rxbuf, 0)) {
- size_t hlen;
- uint64_t ftype, flen;
- struct buffer b;
-
- /* Work on a copy of <rxbuf> */
- b = h3_b_dup(rxbuf);
- hlen = h3_decode_frm_header(&ftype, &flen, &b);
- if (!hlen)
- break;
-
- h3_debug_printf(stderr, "%s: ftype: %llu, flen: %llu\n", __func__,
- (unsigned long long)ftype, (unsigned long long)flen);
- if (flen > b_data(&b))
- break;
-
- qcs_consume(qcs, hlen);
- /* From here, a frame must not be truncated */
- switch (ftype) {
- case H3_FT_CANCEL_PUSH:
- /* XXX TODO XXX */
- ABORT_NOW();
- break;
- case H3_FT_SETTINGS:
- if (!h3_parse_settings_frm(h3c, rxbuf, flen))
- return 0;
- break;
- case H3_FT_GOAWAY:
- /* XXX TODO XXX */
- ABORT_NOW();
- break;
- case H3_FT_MAX_PUSH_ID:
- /* XXX TODO XXX */
- ABORT_NOW();
- break;
- default:
- /* Error */
- h3c->err = H3_FRAME_UNEXPECTED;
- return 0;
- }
- qcs_consume(qcs, flen);
- }
-
- return 1;
-}
-
/* Returns buffer for data sending.
* May be NULL if the allocation failed.
*/
@@ -957,50 +918,6 @@
return 0;
}
-/* Finalize the initialization of remotely initiated uni-stream <qcs>.
- * Return 1 if succeeded, 0 if not. In this latter case, set the ->err h3 error
- * to inform the QUIC mux layer of the encountered error.
- */
-static int h3_attach_ruqs(struct qcs *qcs, void *ctx)
-{
- struct h3c *h3c = ctx;
- struct h3s *h3s = qcs->ctx;
- struct ncbuf *rxbuf = &qcs->rx.ncbuf;
-
- if (h3_init_uni_stream(h3c, qcs, rxbuf))
- return 0;
-
- /* Note that for all the uni-streams below, this is an error to receive two times the
- * same type of uni-stream (even for Push stream which is not supported at this time.
- */
- switch (h3s->type) {
- case H3S_T_CTRL:
- h3c->rctrl.qcs = qcs;
- h3c->rctrl.cb = h3_control_recv;
- qcs_subscribe(qcs, SUB_RETRY_RECV, &h3c->rctrl.wait_event);
- break;
- case H3S_T_PUSH:
- /* NOT SUPPORTED */
- break;
- case H3S_T_QPACK_ENC:
- h3c->rqpack_enc.qcs = qcs;
- h3c->rqpack_enc.cb = h3_parse_uni_stream_no_h3;
- qcs_subscribe(qcs, SUB_RETRY_RECV, &h3c->rqpack_enc.wait_event);
- break;
- case H3S_T_QPACK_DEC:
- h3c->rqpack_dec.qcs = qcs;
- h3c->rqpack_dec.cb = h3_parse_uni_stream_no_h3;
- qcs_subscribe(qcs, SUB_RETRY_RECV, &h3c->rqpack_dec.wait_event);
- break;
- default:
- /* Error */
- h3c->err = H3_STREAM_CREATION_ERROR;
- return 0;
- }
-
- return 1;
-}
-
static void h3_detach(struct qcs *qcs)
{
struct h3s *h3s = qcs->ctx;
@@ -1109,7 +1026,7 @@
if (!h3_uqs_init(&h3c->rqpack_enc, h3c, NULL, h3_uqs_task) ||
!h3_uqs_init(&h3c->rqpack_dec, h3c, NULL, h3_uqs_task) ||
- !h3_uqs_init(&h3c->rctrl, h3c, h3_control_recv, h3_uqs_task))
+ !h3_uqs_init(&h3c->rctrl, h3c, NULL, h3_uqs_task))
goto fail_no_h3_ruqs;
if (!h3_uqs_init(&h3c->lctrl, h3c, NULL, h3_uqs_task) ||
@@ -1156,7 +1073,6 @@
const struct qcc_app_ops h3_ops = {
.init = h3_init,
.attach = h3_attach,
- .attach_ruqs = h3_attach_ruqs,
.decode_qcs = h3_decode_qcs,
.snd_buf = h3_snd_buf,
.detach = h3_detach,
diff --git a/src/mux_quic.c b/src/mux_quic.c
index 206307f..c47e83a 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -1175,17 +1175,17 @@
node = eb64_first(&qcc->streams_by_id);
while (node) {
+ uint64_t id;
+
qcs = eb64_entry(node, struct qcs, by_id);
+ id = qcs->id;
- /* TODO unidirectional streams have their own mechanism for Rx.
- * This should be unified.
- */
- if (quic_stream_is_uni(qcs->id)) {
+ if (!ncb_data(&qcs->rx.ncbuf, 0) || (qcs->flags & QC_SF_DEM_FULL)) {
node = eb64_next(node);
continue;
}
- if (!ncb_data(&qcs->rx.ncbuf, 0) || (qcs->flags & QC_SF_DEM_FULL)) {
+ if (quic_stream_is_uni(id) && quic_stream_is_local(qcc, id)) {
node = eb64_next(node);
continue;
}
diff --git a/src/xprt_quic.c b/src/xprt_quic.c
index b081c88..41d7ef6 100644
--- a/src/xprt_quic.c
+++ b/src/xprt_quic.c
@@ -2169,92 +2169,17 @@
return 0;
}
-/* Handle <strm_frm> bidirectional STREAM frame. Depending on its ID, several
- * streams may be open. The data are copied to the stream RX buffer if possible.
- * If not, the STREAM frame is stored to be treated again later.
- * We rely on the flow control so that not to store too much STREAM frames.
- * Return 1 if succeeded, 0 if not.
- */
-static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt,
- struct quic_stream *strm_frm,
- struct quic_conn *qc)
-{
- int ret;
-
- ret = qcc_recv(qc->qcc, strm_frm->id, strm_frm->len,
- strm_frm->offset.key, strm_frm->fin,
- (char *)strm_frm->data);
-
- /* frame rejected - packet must not be acknowledeged */
- if (ret)
- return 0;
-
- return 1;
-}
-
-/* Handle <strm_frm> unidirectional STREAM frame. Depending on its ID, several
- * streams may be open. The data are copied to the stream RX buffer if possible.
- * If not, the STREAM frame is stored to be treated again later.
- * We rely on the flow control so that not to store too much STREAM frames.
- * Return 1 if succeeded, 0 if not.
- */
-static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt,
- struct quic_stream *strm_frm,
- struct quic_conn *qc)
-{
- struct qcs *strm;
- enum ncb_ret ret;
-
- strm = qcc_get_qcs(qc->qcc, strm_frm->id);
- if (!strm) {
- TRACE_PROTO("Stream not found", QUIC_EV_CONN_PSTRM, qc);
- return 0;
- }
-
- if (strm_frm->offset.key < strm->rx.offset) {
- size_t diff;
-
- if (strm_frm->offset.key + strm_frm->len <= strm->rx.offset) {
- TRACE_PROTO("Already received STREAM data",
- QUIC_EV_CONN_PSTRM, qc);
- goto out;
- }
-
- TRACE_PROTO("Partially already received STREAM data", QUIC_EV_CONN_PSTRM, qc);
- diff = strm->rx.offset - strm_frm->offset.key;
- strm_frm->offset.key = strm->rx.offset;
- strm_frm->len -= diff;
- strm_frm->data += diff;
- }
-
- qc_get_ncbuf(strm, &strm->rx.ncbuf);
- if (ncb_is_null(&strm->rx.ncbuf))
- return 0;
-
- ret = ncb_add(&strm->rx.ncbuf, strm_frm->offset.key - strm->rx.offset,
- (char *)strm_frm->data, strm_frm->len, NCB_ADD_COMPARE);
- if (ret != NCB_RET_OK)
- return 0;
-
- /* Inform the application of the arrival of this new stream */
- if (!strm->rx.offset && !qc->qcc->app_ops->attach_ruqs(strm, qc->qcc->ctx)) {
- TRACE_PROTO("Could not set an uni-stream", QUIC_EV_CONN_PSTRM, qc);
- return 0;
- }
-
- qcs_notify_recv(strm);
-
- out:
- return 1;
-}
-
-/* Returns 1 on success or 0 on error. On error, the packet containing the
- * frame must not be acknowledged.
+/* Parse a STREAM frame <strm_frm>
+ *
+ * Return 1 on success. On error, 0 is returned. In this case, the packet
+ * containing the frame must not be acknowledged.
*/
static inline int qc_handle_strm_frm(struct quic_rx_packet *pkt,
struct quic_stream *strm_frm,
struct quic_conn *qc)
{
+ int ret;
+
/* RFC9000 13.1. Packet Processing
*
* A packet MUST NOT be acknowledged until packet protection has been
@@ -2263,11 +2188,15 @@
* enqueued in preparation to be received by the application protocol,
* but it does not require that data be delivered and consumed.
*/
+ ret = qcc_recv(qc->qcc, strm_frm->id, strm_frm->len,
+ strm_frm->offset.key, strm_frm->fin,
+ (char *)strm_frm->data);
- if (strm_frm->id & QCS_ID_DIR_BIT)
- return qc_handle_uni_strm_frm(pkt, strm_frm, qc);
- else
- return qc_handle_bidi_strm_frm(pkt, strm_frm, qc);
+ /* frame rejected - packet must not be acknowledeged */
+ if (ret)
+ return 0;
+
+ return 1;
}
/* Duplicate all frames from <pkt_frm_list> list into <out_frm_list> list