MEDIUM: quic: move transport fields from qcs to qc_conn_stream
Move the xprt-buf and ack related fields from qcs to the qc_stream_desc
structure. In exchange, qcs has a pointer to the low-level stream. For
each new qcs, a qc_stream_desc is automatically allocated.
This simplify the transport layer by removing qcs/mux manipulation
during ACK frame parsing. An additional check is done to not notify the
MUX on sending if the stream is already released : this case may now
happen on retransmission.
To complete this change, the quic_stream frame now references the
quic_stream instance instead of a qcs.
diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h
index 20d633b..acc855b 100644
--- a/include/haproxy/mux_quic-t.h
+++ b/include/haproxy/mux_quic-t.h
@@ -10,6 +10,7 @@
#include <haproxy/buf-t.h>
#include <haproxy/connection-t.h>
+#include <haproxy/xprt_quic-t.h>
/* Stream types */
enum qcs_type {
@@ -98,14 +99,12 @@
struct {
uint64_t offset; /* last offset of data ready to be sent */
uint64_t sent_offset; /* last offset sent by transport layer */
- struct eb_root acked_frms; /* acked frames ordered by their offsets */
- uint64_t ack_offset; /* last acked ordered byte offset */
struct buffer buf; /* transmit buffer before sending via xprt */
- struct buffer xprt_buf; /* buffer for xprt sending, cleared on ACK. */
uint64_t msd; /* fctl bytes limit to respect on emission */
} tx;
struct eb64_node by_id; /* place in qcc's streams_by_id */
+ struct qc_stream_desc *stream;
struct wait_event wait_event;
struct wait_event *subs;
diff --git a/include/haproxy/quic_frame-t.h b/include/haproxy/quic_frame-t.h
index 2e94e05..12e3cd0 100644
--- a/include/haproxy/quic_frame-t.h
+++ b/include/haproxy/quic_frame-t.h
@@ -34,6 +34,7 @@
#include <import/ebtree-t.h>
#include <haproxy/mux_quic-t.h>
+#include <haproxy/xprt_quic-t.h>
/* QUIC frame types. */
enum quic_frame_type {
@@ -147,7 +148,7 @@
struct quic_stream {
uint64_t id;
- struct qcs *qcs;
+ struct qc_stream_desc *stream;
/* used only on TX when constructing frames.
* Data cleared when processing ACK related to this STREAM frame.
diff --git a/src/mux_quic.c b/src/mux_quic.c
index 40e26d6..f7ef248 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -96,6 +96,7 @@
struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
{
struct qcs *qcs;
+ struct qc_stream_desc *stream;
TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn);
@@ -103,6 +104,15 @@
if (!qcs)
goto out;
+ /* allocate transport layer stream descriptor */
+ stream = qc_stream_desc_new(qcc->conn->qc, id, qcs);
+ if (!stream) {
+ pool_free(pool_head_qcs, qcs);
+ qcs = NULL;
+ goto out;
+ }
+
+ qcs->stream = stream;
qcs->qcc = qcc;
qcs->cs = NULL;
qcs->flags = QC_SF_NONE;
@@ -122,11 +132,8 @@
qcs->rx.frms = EB_ROOT_UNIQUE;
qcs->tx.buf = BUF_NULL;
- qcs->tx.xprt_buf = BUF_NULL;
qcs->tx.offset = 0;
qcs->tx.sent_offset = 0;
- qcs->tx.ack_offset = 0;
- qcs->tx.acked_frms = EB_ROOT;
qcs->wait_event.tasklet = NULL;
qcs->wait_event.events = 0;
@@ -145,11 +152,12 @@
{
b_free(&qcs->rx.buf);
b_free(&qcs->tx.buf);
- b_free(&qcs->tx.xprt_buf);
BUG_ON(!qcs->qcc->strms[qcs_id_type(qcs->by_id.key)].nb_streams);
--qcs->qcc->strms[qcs_id_type(qcs->by_id.key)].nb_streams;
+ qc_stream_desc_release(qcs->stream);
+
eb64_delete(&qcs->by_id);
pool_free(pool_head_qcs, qcs);
}
@@ -260,6 +268,7 @@
for (i = largest_id + 1; i <= sub_id; i++) {
uint64_t id = (i << QCS_ID_TYPE_SHIFT) | strm_type;
enum qcs_type type = id & QCS_ID_DIR_BIT ? QCS_CLT_UNI : QCS_CLT_BIDI;
+
tmp_qcs = qcs_new(qcc, id, type);
if (!tmp_qcs) {
/* allocation failure */
@@ -558,10 +567,10 @@
* |xxxxxxxxxxxxxxxxx|
*/
- BUG_ON_HOT(qcs->tx.sent_offset < qcs->tx.ack_offset);
+ BUG_ON_HOT(qcs->tx.sent_offset < qcs->stream->ack_offset);
BUG_ON_HOT(qcs->tx.offset < qcs->tx.sent_offset);
- head = qcs->tx.sent_offset - qcs->tx.ack_offset;
+ head = qcs->tx.sent_offset - qcs->stream->ack_offset;
left = qcs->tx.offset - qcs->tx.sent_offset;
to_xfer = QUIC_MIN(b_data(payload), b_room(out));
@@ -585,7 +594,7 @@
total = b_force_xfer(out, payload, to_xfer);
frm->type = QUIC_FT_STREAM_8;
- frm->stream.qcs = (struct qcs *)qcs;
+ frm->stream.stream = qcs->stream;
frm->stream.id = qcs->by_id.key;
frm->stream.buf = out;
frm->stream.data = (unsigned char *)b_peek(out, head);
@@ -753,7 +762,7 @@
while (node) {
struct qcs *qcs = container_of(node, struct qcs, by_id);
struct buffer *buf = &qcs->tx.buf;
- struct buffer *out = &qcs->tx.xprt_buf;
+ struct buffer *out = &qcs->stream->buf;
/* TODO
* for the moment, unidirectional streams have their own
@@ -819,7 +828,8 @@
node = eb64_next(node);
if (qcs->flags & QC_SF_DETACH) {
- if ((!b_data(&qcs->tx.buf) && !b_data(&qcs->tx.xprt_buf))) {
+ if (!b_data(&qcs->tx.buf) &&
+ qcs->tx.offset == qcs->tx.sent_offset) {
qcs_destroy(qcs);
release = 1;
}
@@ -1033,7 +1043,7 @@
* managment between xprt and mux is reorganized.
*/
- if ((b_data(&qcs->tx.buf) || b_data(&qcs->tx.xprt_buf))) {
+ if (b_data(&qcs->tx.buf) || qcs->tx.offset > qcs->tx.sent_offset) {
TRACE_DEVEL("leaving with remaining data, detaching qcs", QMUX_EV_STRM_END, qcc->conn, qcs);
qcs->flags |= QC_SF_DETACH;
return;
diff --git a/src/xprt_quic.c b/src/xprt_quic.c
index 36b50fd..5f31d0e 100644
--- a/src/xprt_quic.c
+++ b/src/xprt_quic.c
@@ -447,12 +447,12 @@
if (mask & QUIC_EV_CONN_ACKSTRM) {
const struct quic_stream *s = a2;
- const struct qcs *qcs = a3;
+ const struct qc_stream_desc *stream = a3;
if (s)
chunk_appendf(&trace_buf, " off=%llu len=%llu", (ull)s->offset.key, (ull)s->len);
- if (qcs)
- chunk_appendf(&trace_buf, " ack_offset=%llu", (ull)qcs->tx.ack_offset);
+ if (stream)
+ chunk_appendf(&trace_buf, " ack_offset=%llu", (ull)stream->ack_offset);
}
if (mask & QUIC_EV_CONN_RTTUPDT) {
@@ -1421,38 +1421,35 @@
return 0;
}
-/* Remove from <qcs> stream the acknowledged frames.
+/* Remove from <stream> the acknowledged frames.
*
* Returns 1 if at least one frame was removed else 0.
*/
-static int qcs_try_to_consume(struct qcs *qcs)
+static int quic_stream_try_to_consume(struct quic_conn *qc,
+ struct qc_stream_desc *stream)
{
int ret;
struct eb64_node *frm_node;
ret = 0;
- frm_node = eb64_first(&qcs->tx.acked_frms);
+ frm_node = eb64_first(&stream->acked_frms);
while (frm_node) {
struct quic_stream *strm;
struct quic_frame *frm;
strm = eb64_entry(&frm_node->node, struct quic_stream, offset);
- if (strm->offset.key > qcs->tx.ack_offset)
+ if (strm->offset.key > stream->ack_offset)
break;
TRACE_PROTO("stream consumed", QUIC_EV_CONN_ACKSTRM,
- qcs->qcc->conn->qc, strm, qcs);
- if (strm->offset.key + strm->len > qcs->tx.ack_offset) {
+ qc, strm, stream);
+
+ if (strm->offset.key + strm->len > stream->ack_offset) {
const size_t diff = strm->offset.key + strm->len -
- qcs->tx.ack_offset;
- qcs->tx.ack_offset += diff;
+ stream->ack_offset;
+ stream->ack_offset += diff;
b_del(strm->buf, diff);
ret = 1;
-
- if (!b_data(strm->buf)) {
- b_free(strm->buf);
- offer_buffers(NULL, 1);
- }
}
frm_node = eb64_next(frm_node);
@@ -1464,6 +1461,9 @@
pool_free(pool_head_quic_frame, frm);
}
+ if (!b_data(&stream->buf))
+ qc_stream_desc_free(stream);
+
return ret;
}
@@ -1478,23 +1478,22 @@
switch (frm->type) {
case QUIC_FT_STREAM_8 ... QUIC_FT_STREAM_F:
{
- struct quic_stream *strm = &frm->stream;
+ struct quic_stream *strm_frm = &frm->stream;
struct eb64_node *node = NULL;
- struct qcs *qcs = NULL;
+ struct qc_stream_desc *stream = NULL;
- /* do not use strm->qcs as the qcs instance might be freed at
- * this stage. Use the id to do a proper lookup.
+ /* do not use strm_frm->stream as the qc_stream_desc instance
+ * might be freed at this stage. Use the id to do a proper
+ * lookup.
*
* TODO if lookup operation impact on the perf is noticeable,
- * implement a refcount on qcs instances.
+ * implement a refcount on qc_stream_desc instances.
*/
- if (qc->mux_state == QC_MUX_READY) {
- node = eb64_lookup(&qc->qcc->streams_by_id, strm->id);
- qcs = eb64_entry(node, struct qcs, by_id);
- }
+ node = eb64_lookup(&qc->streams_by_id, strm_frm->id);
+ stream = eb64_entry(node, struct qc_stream_desc, by_id);
- if (!qcs) {
- TRACE_PROTO("acked stream for released stream", QUIC_EV_CONN_ACKSTRM, qc, strm);
+ if (!stream) {
+ TRACE_PROTO("acked stream for released stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm);
LIST_DELETE(&frm->list);
quic_tx_packet_refdec(frm->pkt);
pool_free(pool_head_quic_frame, frm);
@@ -1503,32 +1502,34 @@
return;
}
- TRACE_PROTO("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm, qcs);
- if (strm->offset.key <= qcs->tx.ack_offset) {
- if (strm->offset.key + strm->len > qcs->tx.ack_offset) {
- const size_t diff = strm->offset.key + strm->len -
- qcs->tx.ack_offset;
- qcs->tx.ack_offset += diff;
- b_del(strm->buf, diff);
+ TRACE_PROTO("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream);
+ if (strm_frm->offset.key <= stream->ack_offset) {
+ if (strm_frm->offset.key + strm_frm->len > stream->ack_offset) {
+ const size_t diff = strm_frm->offset.key + strm_frm->len -
+ stream->ack_offset;
+ stream->ack_offset += diff;
+ b_del(strm_frm->buf, diff);
stream_acked = 1;
- if (!b_data(strm->buf)) {
- b_free(strm->buf);
- offer_buffers(NULL, 1);
+ if (!b_data(strm_frm->buf)) {
+ if (qc_stream_desc_free(stream)) {
+ /* early return */
+ return;
+ }
}
}
TRACE_PROTO("stream consumed", QUIC_EV_CONN_ACKSTRM,
- qcs->qcc->conn->qc, strm, qcs);
+ qc, strm_frm, stream);
LIST_DELETE(&frm->list);
quic_tx_packet_refdec(frm->pkt);
pool_free(pool_head_quic_frame, frm);
}
else {
- eb64_insert(&qcs->tx.acked_frms, &strm->offset);
+ eb64_insert(&stream->acked_frms, &strm_frm->offset);
}
- stream_acked |= qcs_try_to_consume(qcs);
+ stream_acked |= quic_stream_try_to_consume(qc, stream);
}
break;
default:
@@ -5089,9 +5090,16 @@
LIST_DELETE(&cf->list);
LIST_APPEND(outlist, &cf->list);
- qcc_streams_sent_done(cf->stream.qcs,
- cf->stream.len,
- cf->stream.offset.key);
+ /* The MUX stream might be released at this
+ * stage. This can most notably happen on
+ * retransmission.
+ */
+ if (qc->mux_state == QC_MUX_READY &&
+ !cf->stream.stream->release) {
+ qcc_streams_sent_done(cf->stream.stream->ctx,
+ cf->stream.len,
+ cf->stream.offset.key);
+ }
}
else {
struct quic_frame *new_cf;
@@ -5104,7 +5112,7 @@
}
new_cf->type = cf->type;
- new_cf->stream.qcs = cf->stream.qcs;
+ new_cf->stream.stream = cf->stream.stream;
new_cf->stream.buf = cf->stream.buf;
new_cf->stream.id = cf->stream.id;
if (cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT)
@@ -5124,9 +5132,16 @@
cf->stream.offset.key += dlen;
cf->stream.data = (unsigned char *)b_peek(&cf_buf, dlen);
- qcc_streams_sent_done(new_cf->stream.qcs,
- new_cf->stream.len,
- new_cf->stream.offset.key);
+ /* The MUX stream might be released at this
+ * stage. This can most notably happen on
+ * retransmission.
+ */
+ if (qc->mux_state == QC_MUX_READY &&
+ !cf->stream.stream->release) {
+ qcc_streams_sent_done(new_cf->stream.stream->ctx,
+ new_cf->stream.len,
+ new_cf->stream.offset.key);
+ }
}
/* TODO the MUX is notified about the frame sending via