MINOR: quic: Make use of buffer structs to handle STREAM frames
The STREAM data to send coming from the upper layer must be stored until
having being acked by the peer. To do so, we store them in buffer structs,
one by stream (see qcs.tx.buf). Each time a STREAM is built by quic_push_frame(),
its offset must match the offset of the first byte added to the buffer (modulo
the size of the buffer) by the frame. As they are not always acknowledged in
order, they may be stored in eb_trees ordered by their offset to be sure
to sequentially delete the STREAM data from their buffer, in the order they
have been added to it.
diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h
index a78a902..241aa60 100644
--- a/include/haproxy/mux_quic-t.h
+++ b/include/haproxy/mux_quic-t.h
@@ -185,7 +185,6 @@
struct session *sess;
struct qcc *qcc;
struct eb64_node by_id; /* place in qcc's streams_by_id */
- struct eb_root frms;
uint64_t id; /* stream ID */
uint32_t flags; /* QC_SF_* */
struct {
@@ -194,12 +193,15 @@
uint64_t offset; /* the current offset of received data */
uint64_t bytes; /* number of bytes received */
struct buffer buf; /* receive buffer, always valid (buf_empty or real buffer) */
+ struct eb_root frms; /* received frames ordered by their offsets */
} rx;
struct {
enum qcs_tx_st st; /* TX state */
uint64_t max_data; /* maximum number of bytes which may be sent */
uint64_t offset; /* the current offset of data to send */
uint64_t bytes; /* number of bytes sent */
+ uint64_t ack_offset; /* last acked ordered byte offset */
+ struct eb_root acked_frms; /* acked frames ordered by their offsets */
struct buffer buf; /* transmit buffer, always valid (buf_empty or real buffer) */
struct buffer mbuf[QCC_MBUF_CNT];
uint64_t left; /* data currently stored in mbuf waiting for send */
diff --git a/include/haproxy/quic_frame-t.h b/include/haproxy/quic_frame-t.h
index c8a396b..13b4cbf 100644
--- a/include/haproxy/quic_frame-t.h
+++ b/include/haproxy/quic_frame-t.h
@@ -31,6 +31,8 @@
#include <haproxy/list.h>
+#include <import/eb64tree.h>
+
/* QUIC frame types. */
enum quic_frame_type {
QUIC_FT_PADDING = 0x00,
@@ -141,7 +143,9 @@
struct quic_stream {
uint64_t id;
- uint64_t offset;
+ struct qcs *qcs;
+ struct buffer *buf;
+ struct eb64_node offset;
uint64_t len;
const unsigned char *data;
};
diff --git a/include/haproxy/quic_frame.h b/include/haproxy/quic_frame.h
index a5c022a..917b667 100644
--- a/include/haproxy/quic_frame.h
+++ b/include/haproxy/quic_frame.h
@@ -72,7 +72,7 @@
case QUIC_FT_STREAM_8 ... QUIC_FT_STREAM_F: {
struct quic_stream *f = &frm->stream;
len += 1 + quic_int_getsize(f->id) +
- ((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(f->offset) : 0) +
+ ((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(f->offset.key) : 0) +
((frm->type & QUIC_STREAM_FRAME_TYPE_LEN_BIT) ? quic_int_getsize(f->len) : 0) + f->len;
break;
}
diff --git a/src/mux_quic.c b/src/mux_quic.c
index cb71011..886f3c4 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -970,19 +970,20 @@
qcs->qcc = qcc;
qcs->cs = NULL;
qcs->id = qcs->by_id.key = id;
- qcs->frms = EB_ROOT_UNIQUE;
qcs->flags = QC_SF_NONE;
qcs->rx.buf = BUF_NULL;
qcs->rx.st = QC_RX_SS_IDLE;
qcs->rx.bytes = qcs->rx.offset = 0;
qcs->rx.max_data = qcc->strms[qcs_type].rx.max_data;
-
qcs->rx.buf = BUF_NULL;
+ qcs->rx.frms = EB_ROOT_UNIQUE;
+
qcs->tx.st = QC_TX_SS_IDLE;
- qcs->tx.bytes = qcs->tx.offset = 0;
+ qcs->tx.bytes = qcs->tx.offset = qcs->tx.ack_offset = 0;
+ qcs->tx.acked_frms = EB_ROOT_UNIQUE;
qcs->tx.max_data = qcc->strms[qcs_type].tx.max_data;
- qcs->tx.buf = BUF_NULL;
+ qcs->tx.buf = BUF_NULL;
br_init(qcs->tx.mbuf, sizeof(qcs->tx.mbuf) / sizeof(qcs->tx.mbuf[0]));
qcs->tx.left = 0;
@@ -1041,13 +1042,13 @@
qcs->qcc = qcc;
qcs->cs = NULL;
qcs->id = qcs->by_id.key = next_id;
- qcs->frms = EB_ROOT_UNIQUE;
qcs->flags = QC_SF_NONE;
- qcs->tx.st = QC_TX_SS_IDLE;
- qcs->tx.max_data = qcc->strms[qcs_type].tx.max_data;
- qcs->tx.offset = qcs->tx.bytes = 0;
- qcs->tx.buf = BUF_NULL;
+ qcs->tx.st = QC_TX_SS_IDLE;
+ qcs->tx.max_data = qcc->strms[qcs_type].tx.max_data;
+ qcs->tx.offset = qcs->tx.bytes = qcs->tx.ack_offset = 0;
+ qcs->tx.acked_frms = EB_ROOT_UNIQUE;
+ qcs->tx.buf = BUF_NULL;
br_init(qcs->tx.mbuf, sizeof(qcs->tx.mbuf) / sizeof(qcs->tx.mbuf[0]));
qcs->tx.left = 0;
@@ -1083,13 +1084,13 @@
qcs->qcc = qcc;
qcs->id = qcs->by_id.key = id;
- qcs->frms = EB_ROOT_UNIQUE;
qcs->flags = QC_SF_NONE;
qcs->rx.st = QC_RX_SS_IDLE;
qcs->rx.max_data = qcc->strms[qcs_type].rx.max_data;
qcs->rx.offset = qcs->rx.bytes = 0;
qcs->rx.buf = BUF_NULL;
+ qcs->rx.frms = EB_ROOT_UNIQUE;
br_init(qcs->tx.mbuf, sizeof(qcs->tx.mbuf) / sizeof(qcs->tx.mbuf[0]));
qcs->tx.left = 0;
@@ -1396,12 +1397,12 @@
static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset)
{
struct quic_frame *frm;
- struct buffer buf = BUF_NULL;
+ struct buffer *buf = &qcs->tx.buf;
+ struct quic_enc_level *qel = &qcs->qcc->conn->qc->els[QUIC_TLS_ENC_LEVEL_APP];
int total = 0;
- qc_get_buf(qcs->qcc, &buf);
- total = b_xfer(&buf, payload, b_data(payload));
-
+ qc_get_buf(qcs->qcc, buf);
+ total = b_force_xfer(buf, payload, QUIC_MIN(b_data(payload), b_room(buf)));
frm = pool_zalloc(pool_head_quic_frame);
if (!frm)
goto err;
@@ -1411,16 +1412,16 @@
frm->type |= QUIC_STREAM_FRAME_TYPE_FIN_BIT;
if (offset) {
frm->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT;
- frm->stream.offset = offset;
+ frm->stream.offset.key = offset;
}
+ frm->stream.qcs = qcs;
+ frm->stream.buf = buf;
frm->stream.id = qcs->by_id.key;
if (total) {
frm->type |= QUIC_STREAM_FRAME_TYPE_LEN_BIT;
frm->stream.len = total;
- frm->stream.data = (unsigned char *)b_head(&buf);
}
- struct quic_enc_level *qel = &qcs->qcc->conn->qc->els[QUIC_TLS_ENC_LEVEL_APP];
MT_LIST_APPEND(&qel->pktns->tx.frms, &frm->mt_list);
fprintf(stderr, "%s: total=%d fin=%d offset=%lu\n", __func__, total, fin, offset);
return total;
diff --git a/src/quic_frame.c b/src/quic_frame.c
index 4f07462..51b073d 100644
--- a/src/quic_frame.c
+++ b/src/quic_frame.c
@@ -375,15 +375,29 @@
struct quic_frame *frm, struct quic_conn *conn)
{
struct quic_stream *stream = &frm->stream;
+ size_t offset, block1, block2;
+ struct buffer b;
if (!quic_enc_int(buf, end, stream->id) ||
- ((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) && !quic_enc_int(buf, end, stream->offset)) ||
+ ((frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) && !quic_enc_int(buf, end, stream->offset.key)) ||
((frm->type & QUIC_STREAM_FRAME_TYPE_LEN_BIT) &&
(!quic_enc_int(buf, end, stream->len) || end - *buf < stream->len)))
return 0;
- memcpy(*buf, stream->data, stream->len);
- *buf += stream->len;
+ /* Buffer copy */
+ b = *stream->buf;
+ offset = (frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ?
+ stream->offset.key & (b_size(stream->buf) - 1): 0;
+ block1 = b_wrap(&b) - (b_orig(&b) + offset);
+ if (block1 > stream->len)
+ block1 = stream->len;
+ block2 = stream->len - block1;
+ memcpy(*buf, b_orig(&b) + offset, block1);
+ *buf += block1;
+ if (block2) {
+ memcpy(*buf, b_orig(&b), block2);
+ *buf += block2;
+ }
return 1;
}
@@ -401,9 +415,9 @@
/* Offset parsing */
if (!(frm->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT)) {
- stream->offset = 0;
+ stream->offset.key = 0;
}
- else if (!quic_dec_int(&stream->offset, buf, end))
+ else if (!quic_dec_int((uint64_t *)&stream->offset.key, buf, end))
return 0;
/* Length parsing */
diff --git a/src/xprt_quic.c b/src/xprt_quic.c
index 0fdd45d..40346f9 100644
--- a/src/xprt_quic.c
+++ b/src/xprt_quic.c
@@ -571,7 +571,7 @@
!!(s->id & QUIC_STREAM_FRAME_ID_DIR_BIT),
!!(frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT),
(unsigned long long)s->id,
- (unsigned long long)s->offset,
+ (unsigned long long)s->offset.key,
(unsigned long long)s->len);
}
}
@@ -1149,13 +1149,56 @@
return 1;
}
+/* Remove from <qcs> stream the acknowledged frames.
+ * Never fails.
+ */
+static void qcs_try_to_consume(struct qcs *qcs)
+{
+ struct eb64_node *frm_node;
+
+ frm_node = eb64_first(&qcs->tx.acked_frms);
+ while (frm_node) {
+ struct quic_stream *strm;
+
+ strm = eb64_entry(&frm_node->node, struct quic_stream, offset);
+ if (strm->offset.key != qcs->tx.ack_offset)
+ break;
+
+ b_del(strm->buf, strm->len);
+ qcs->tx.ack_offset += strm->len;
+ frm_node = eb64_next(frm_node);
+ eb64_delete(&strm->offset);
+ }
+}
+
/* Treat <frm> frame whose packet it is attached to has just been acknowledged. */
static inline void qc_treat_acked_tx_frm(struct quic_frame *frm,
struct ssl_sock_ctx *ctx)
{
+
TRACE_PROTO("Removing frame", QUIC_EV_CONN_PRSAFRM, ctx->conn, frm);
- LIST_DELETE(&frm->list);
- pool_free(pool_head_quic_frame, frm);
+ switch (frm->type) {
+ case QUIC_FT_STREAM_8 ... QUIC_FT_STREAM_F:
+ {
+ struct qcs *qcs = frm->stream.qcs;
+ struct quic_stream *strm = &frm->stream;
+
+ if (qcs->tx.ack_offset == strm->offset.key) {
+ b_del(strm->buf, strm->len);
+ qcs->tx.ack_offset += strm->len;
+ LIST_DELETE(&frm->list);
+ pool_free(pool_head_quic_frame, frm);
+ }
+ else {
+ eb64_insert(&qcs->tx.acked_frms, &strm->offset);
+ }
+ qcs_try_to_consume(qcs);
+ }
+ break;
+ default:
+ LIST_DELETE(&frm->list);
+ pool_free(pool_head_quic_frame, frm);
+ }
}
/* Remove <largest> down to <smallest> node entries from <pkts> tree of TX packet,
@@ -1582,7 +1625,7 @@
frm = pool_alloc(pool_head_quic_rx_strm_frm);
if (frm) {
- frm->offset_node.key = stream_frm->offset;
+ frm->offset_node.key = stream_frm->offset.key;
frm->len = stream_frm->len;
frm->data = stream_frm->data;
frm->pkt = pkt;
@@ -1686,7 +1729,7 @@
try = strm_frm->len;
memcpy(b_tail(buf), strm_frm->data, try);
strm_frm->len -= try;
- strm_frm->offset += try;
+ strm_frm->offset.key += try;
b_add(buf, try);
ret += try;
}
@@ -1715,7 +1758,7 @@
}
strm = eb64_entry(&strm_node->node, struct qcs, by_id);
- frm_node = eb64_lookup(&strm->frms, strm_frm->offset);
+ frm_node = eb64_lookup(&strm->rx.frms, strm_frm->offset.key);
/* FIXME: handle the case where this frame overlap others */
if (frm_node) {
TRACE_PROTO("Already existing stream data",
@@ -1723,7 +1766,7 @@
goto out;
}
- if (strm_frm->offset == strm->rx.offset) {
+ if (strm_frm->offset.key == strm->rx.offset) {
int ret;
if (!qc_get_buf(qc->qcc, &strm->rx.buf))
@@ -1749,7 +1792,7 @@
return 0;
}
- eb64_insert(&strm->frms, &frm->offset_node);
+ eb64_insert(&strm->rx.frms, &frm->offset_node);
quic_rx_packet_refinc(pkt);
out:
@@ -1778,7 +1821,7 @@
}
strm = eb64_entry(&strm_node->node, struct qcs, by_id);
- frm_node = eb64_lookup(&strm->frms, strm_frm->offset);
+ frm_node = eb64_lookup(&strm->rx.frms, strm_frm->offset.key);
/* FIXME: handle the case where this frame overlap others */
if (frm_node) {
TRACE_PROTO("Already existing stream data",
@@ -1787,7 +1830,7 @@
}
strm_frm_len = strm_frm->len;
- if (strm_frm->offset == strm->rx.offset) {
+ if (strm_frm->offset.key == strm->rx.offset) {
int ret;
if (!qc_get_buf(qc->qcc, &strm->rx.buf))
@@ -1806,7 +1849,7 @@
if (ret)
ruqs_notify_recv(strm);
- strm_frm->offset += ret;
+ strm_frm->offset.key += ret;
}
/* Take this frame into an account for the stream flow control */
strm->rx.offset += strm_frm_len;
@@ -1824,7 +1867,7 @@
return 0;
}
- eb64_insert(&strm->frms, &frm->offset_node);
+ eb64_insert(&strm->rx.frms, &frm->offset_node);
quic_rx_packet_refinc(pkt);
out:
@@ -3719,11 +3762,11 @@
* excepting the variable ones. Note that +1 is for the type of this frame.
*/
hlen = 1 + quic_int_getsize(cf->stream.id) +
- ((cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(cf->stream.offset) : 0);
+ ((cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(cf->stream.offset.key) : 0);
/* Compute the data length of this STREAM frame. */
avail_room = room - hlen - *len;
if ((ssize_t)avail_room <= 0)
- continue;
+ break;
if (cf->type & QUIC_STREAM_FRAME_TYPE_LEN_BIT) {
dlen = max_available_room(avail_room, &dlen_sz);
@@ -3761,6 +3804,8 @@
}
new_cf->type = cf->type;
+ new_cf->stream.qcs = cf->stream.qcs;
+ new_cf->stream.buf = cf->stream.buf;
new_cf->stream.id = cf->stream.id;
if (cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT)
new_cf->stream.offset = cf->stream.offset;
@@ -3773,7 +3818,7 @@
cf->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT;
/* Consume <dlen> bytes of the current frame. */
cf->stream.len -= dlen;
- cf->stream.offset += dlen;
+ cf->stream.offset.key += dlen;
cf->stream.data += dlen;
}
break;