MINOR: mux_quic: Add QUIC mux layer.
This file has been derived from mux_h2.c by removing all the h2 parts. At the
QUIC mux layer, there must not be any reference to http. It will be the
responsibility of the application layer (h3) to open streams handled by the mux.
diff --git a/src/xprt_quic.c b/src/xprt_quic.c
index 851d2f4..d798b3a 100644
--- a/src/xprt_quic.c
+++ b/src/xprt_quic.c
@@ -34,7 +34,9 @@
#include <haproxy/fd.h>
#include <haproxy/freq_ctr.h>
#include <haproxy/global.h>
+#include <haproxy/h3.h>
#include <haproxy/log.h>
+#include <haproxy/mux_quic.h>
#include <haproxy/pipe.h>
#include <haproxy/proxy.h>
#include <haproxy/quic_cc.h>
@@ -153,6 +155,8 @@
DECLARE_STATIC_POOL(pool_head_quic_rx_crypto_frm, "quic_rx_crypto_frm_pool", sizeof(struct quic_rx_crypto_frm));
+DECLARE_POOL(pool_head_quic_rx_strm_frm, "quic_rx_strm_frm", sizeof(struct quic_rx_strm_frm));
+
DECLARE_POOL(pool_head_quic_tx_frm, "quic_tx_frm_pool", sizeof(struct quic_tx_frm));
DECLARE_STATIC_POOL(pool_head_quic_crypto_buf, "quic_crypto_buf_pool", sizeof(struct quic_crypto_buf));
@@ -1572,6 +1576,277 @@
return 0;
}
+/* Allocate a STREAM RX frame describing the <stream_frm> STREAM frame carried
+ * by the <pkt> RX packet: its offset, length, data and <pkt> are recorded as is.
+ * Return the new frame if succeeded, NULL if the allocation failed.
+ */
+static inline
+struct quic_rx_strm_frm *new_quic_rx_strm_frm(struct quic_stream *stream_frm,
+                                              struct quic_rx_packet *pkt)
+{
+	struct quic_rx_strm_frm *rx_frm = pool_alloc(pool_head_quic_rx_strm_frm);
+
+	if (!rx_frm)
+		return NULL;
+
+	rx_frm->pkt = pkt;
+	rx_frm->data = stream_frm->data;
+	rx_frm->len = stream_frm->len;
+	rx_frm->offset_node.key = stream_frm->offset;
+
+	return rx_frm;
+}
+
+/* Return the ebtree node of the stream with <id> as ID for <qcc> connection.
+ * A local stream must already be open. A remote stream with an ID above the
+ * largest one already seen opens all the missing streams of its type up to <id>.
+ * Return this node if succeeded, NULL if not.
+ */
+static struct eb64_node *qcc_get_qcs(struct qcc *qcc, uint64_t id)
+{
+	struct eb64_node *node = NULL;
+	unsigned int type_bits;
+	enum qcs_type type;
+	uint64_t largest;
+	struct qcs *qcs;
+	int64_t seq, cur;
+
+	TRACE_ENTER(QUIC_EV_CONN_PSTRM, qcc->conn);
+
+	type_bits = id & QCS_ID_TYPE_MASK;
+	seq = id >> QCS_ID_TYPE_SHIFT;
+
+	if (qc_local_stream_id(qcc, id)) {
+		/* Local streams: this stream must be already opened. */
+		node = eb64_lookup(&qcc->streams_by_id, id);
+		if (!node) {
+			TRACE_PROTO("Unknown stream ID", QUIC_EV_CONN_PSTRM, qcc->conn);
+			goto fail;
+		}
+
+		goto leave;
+	}
+
+	/* Remote streams: first check the streams limit for this type. */
+	type = qcs_id_type(id);
+	if (seq + 1 > qcc->strms[type].max_streams) {
+		TRACE_PROTO("Streams limit reached", QUIC_EV_CONN_PSTRM, qcc->conn);
+		goto fail;
+	}
+
+	/* Note: ->largest_id was initialized with (uint64_t)-1 as value, 0 being a
+	 * correct value.
+	 */
+	largest = qcc->strms[type].largest_id;
+	if (seq <= (int64_t)largest) {
+		/* Not above the largest ID seen so far: plain lookup. */
+		node = eb64_lookup(&qcc->streams_by_id, id);
+		goto leave;
+	}
+
+	/* RFC: "A stream ID that is used out of order results in all streams
+	 * of that type with lower-numbered stream IDs also being opened".
+	 * So, let's "open" these streams.
+	 */
+	qcs = NULL;
+	for (cur = largest + 1; cur <= seq; cur++) {
+		qcs = qcs_new(qcc, (cur << QCS_ID_TYPE_SHIFT) | type_bits);
+		if (!qcs) {
+			TRACE_PROTO("Could not allocate a new stream",
+			            QUIC_EV_CONN_PSTRM, qcc->conn);
+			goto fail;
+		}
+
+		qcc->strms[type].largest_id = cur;
+	}
+
+	if (qcs)
+		node = &qcs->by_id;
+
+ leave:
+	TRACE_LEAVE(QUIC_EV_CONN_PSTRM, qcc->conn);
+	return node;
+
+ fail:
+	TRACE_LEAVE(QUIC_EV_CONN_PSTRM, qcc->conn);
+	return NULL;
+}
+
+/* Copy as much STREAM data as possible from <strm_frm> into <buf> buffer,
+ * stopping when <buf> has no contiguous room left. Returns the number of bytes
+ * copied. <strm_frm> is updated to reflect the data which have been consumed.
+ */
+static size_t qc_strm_cpy(struct buffer *buf, struct quic_stream *strm_frm)
+{
+	size_t ret = 0;
+
+	while (strm_frm->len) {
+		size_t try = b_contig_space(buf);
+
+		if (!try)
+			break;
+
+		if (try > strm_frm->len)
+			try = strm_frm->len;
+		memcpy(b_tail(buf), strm_frm->data, try);
+		/* Also advance the source, else the next pass would copy the same bytes. */
+		strm_frm->data += try;
+		strm_frm->len -= try;
+		strm_frm->offset += try;
+		b_add(buf, try);
+		ret += try;
+	}
+
+	return ret;
+}
+
+/* Handle <strm_frm> bidirectional STREAM frame. Depending on its ID, several
+ * streams may be opened. The data are copied to the stream RX buffer if possible
+ * and decoded by the application layer. What could not be copied is stored to
+ * be treated again later, relying on the flow control to bound this storage.
+ * Return 1 if succeeded, 0 if not.
+ */
+static int qc_handle_bidi_strm_frm(struct quic_rx_packet *pkt,
+                                   struct quic_stream *strm_frm,
+                                   struct quic_conn *qc)
+{
+	struct qcs *strm;
+	struct eb64_node *strm_node, *frm_node;
+	struct quic_rx_strm_frm *frm;
+
+	strm_node = qcc_get_qcs(qc->qcc, strm_frm->id);
+	if (!strm_node) {
+		TRACE_PROTO("Stream not found", QUIC_EV_CONN_PSTRM, qc->conn);
+		return 0;
+	}
+
+	strm = eb64_entry(&strm_node->node, struct qcs, by_id);
+	frm_node = eb64_lookup(&strm->frms, strm_frm->offset);
+	/* FIXME: handle the case where this frame overlaps others */
+	if (frm_node) {
+		TRACE_PROTO("Already existing stream data", QUIC_EV_CONN_PSTRM, qc->conn);
+		goto out;
+	}
+
+	/* In-order data: try to copy them to the stream RX buffer right now. */
+	if (strm_frm->offset == strm->rx.offset) {
+		size_t ret;
+
+		if (!qc_get_buf(qc->qcc, &strm->rx.buf))
+			goto store_frm;
+
+		ret = qc_strm_cpy(&strm->rx.buf, strm_frm);
+		if (ret && qc->qcc->app_ops->decode_qcs(strm, qc->qcc->ctx) == -1) {
+			TRACE_PROTO("Decoding error", QUIC_EV_CONN_PSTRM, qc->conn);
+			return 0;
+		}
+
+		strm->rx.offset += ret;
+	}
+
+	if (!strm_frm->len)
+		goto out;
+
+ store_frm:
+	frm = new_quic_rx_strm_frm(strm_frm, pkt);
+	if (!frm) {
+		TRACE_PROTO("Could not alloc RX STREAM frame",
+		            QUIC_EV_CONN_PSTRM, qc->conn);
+		return 0;
+	}
+
+	eb64_insert(&strm->frms, &frm->offset_node);
+	quic_rx_packet_refinc(pkt);
+
+ out:
+	return 1;
+}
+
+/* Handle <strm_frm> unidirectional STREAM frame. Depending on its ID, several
+ * streams may be opened. The data are copied to the stream RX buffer if possible
+ * and the application layer is notified. What could not be copied is stored to
+ * be treated again later, relying on the flow control to bound this storage.
+ * Return 1 if succeeded, 0 if not.
+ */
+static int qc_handle_uni_strm_frm(struct quic_rx_packet *pkt,
+                                  struct quic_stream *strm_frm,
+                                  struct quic_conn *qc)
+{
+	struct qcs *strm;
+	struct eb64_node *strm_node, *frm_node;
+	struct quic_rx_strm_frm *frm;
+	size_t strm_frm_len;
+
+	strm_node = qcc_get_qcs(qc->qcc, strm_frm->id);
+	if (!strm_node) {
+		TRACE_PROTO("Stream not found", QUIC_EV_CONN_PSTRM, qc->conn);
+		return 0;
+	}
+
+	strm = eb64_entry(&strm_node->node, struct qcs, by_id);
+	frm_node = eb64_lookup(&strm->frms, strm_frm->offset);
+	/* FIXME: handle the case where this frame overlaps others */
+	if (frm_node) {
+		TRACE_PROTO("Already existing stream data",
+		            QUIC_EV_CONN_PSTRM, qc->conn);
+		goto out;
+	}
+
+	strm_frm_len = strm_frm->len;
+	if (strm_frm->offset == strm->rx.offset) {
+		size_t ret;
+
+		if (!qc_get_buf(qc->qcc, &strm->rx.buf))
+			goto store_frm;
+
+		/* qc_strm_cpy() updates <strm_frm> by itself, depending on the
+		 * number of bytes copied: its offset must not be advanced a
+		 * second time here, else a stored frame would get a wrong key.
+		 */
+		ret = qc_strm_cpy(&strm->rx.buf, strm_frm);
+		/* Inform the application of the arrival of this new stream, that
+		 * is to say when nothing was received yet for it. */
+		if (!strm->rx.offset && !qc->qcc->app_ops->attach_ruqs(strm, qc->qcc->ctx)) {
+			TRACE_PROTO("Could not set an uni-stream", QUIC_EV_CONN_PSTRM, qc->conn);
+			return 0;
+		}
+
+		if (ret)
+			ruqs_notify_recv(strm);
+	}
+	/* Take this frame into account for the stream flow control */
+	strm->rx.offset += strm_frm_len;
+	/* If all the data were provided to the application, there is no need to
+	 * store any more information for it.
+	 */
+	if (!strm_frm->len)
+		goto out;
+
+ store_frm:
+	frm = new_quic_rx_strm_frm(strm_frm, pkt);
+	if (!frm) {
+		TRACE_PROTO("Could not alloc RX STREAM frame",
+		            QUIC_EV_CONN_PSTRM, qc->conn);
+		return 0;
+	}
+
+	eb64_insert(&strm->frms, &frm->offset_node);
+	quic_rx_packet_refinc(pkt);
+
+ out:
+	return 1;
+}
+
+/* Route <strm_frm> to the handler matching the direction bit of its stream
+ * ID (unidirectional or bidirectional) and return this handler's result. */
+static inline int qc_handle_strm_frm(struct quic_rx_packet *pkt,
+                                     struct quic_stream *strm_frm,
+                                     struct quic_conn *qc)
+{
+	return (strm_frm->id & QCS_ID_DIR_BIT) ? qc_handle_uni_strm_frm(pkt, strm_frm, qc)
+	                                       : qc_handle_bidi_strm_frm(pkt, strm_frm, qc);
+}
+
/* Parse all the frames of <pkt> QUIC packet for QUIC connection with <ctx>
* as I/O handler context and <qel> as encryption level.
* Returns 1 if succeeded, 0 if failed.
@@ -1666,6 +1941,10 @@
goto err;
} else if (!(stream->id & QUIC_STREAM_FRAME_ID_INITIATOR_BIT))
goto err;
+
+ if (!qc_handle_strm_frm(pkt, stream, ctx->conn->qc))
+ goto err;
+
break;
}
case QUIC_FT_NEW_CONNECTION_ID: