MINOR: quic: Make use of buffer structs to handle STREAM frames
The STREAM data to send, coming from the upper layer, must be stored until
they have been acked by the peer. To do so, we store them in buffer structs,
one per stream (see qcs.tx.buf). Each time a STREAM frame is built by
quic_push_frame(), its offset must match the offset of the first byte added to
the buffer (modulo the size of the buffer) by the frame. As STREAM frames are
not always acknowledged in order, the acknowledged ones may be stored in
eb_trees ordered by their offset, so that the STREAM data are always deleted
from their buffer sequentially, in the order they have been added to it.
diff --git a/src/xprt_quic.c b/src/xprt_quic.c
index 0fdd45d..40346f9 100644
--- a/src/xprt_quic.c
+++ b/src/xprt_quic.c
@@ -571,7 +571,7 @@
!!(s->id & QUIC_STREAM_FRAME_ID_DIR_BIT),
!!(frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT),
(unsigned long long)s->id,
- (unsigned long long)s->offset,
+ (unsigned long long)s->offset.key,
(unsigned long long)s->len);
}
}
@@ -1149,13 +1149,56 @@
return 1;
}
+/* Remove from <qcs> stream the acknowledged frames.
+ * Walks the <qcs>->tx.acked_frms tree starting from its first node and, as
+ * long as the visited frame starts exactly at <qcs>->tx.ack_offset, deletes
+ * its <len> bytes from its buffer, advances tx.ack_offset by <len> and removes
+ * the frame from the tree. Stops at the first frame whose offset does not
+ * match tx.ack_offset, or when the tree is exhausted.
+ * NOTE(review): a frame handled here is only detached from the tree; unlike
+ * the in-order path of qc_treat_acked_tx_frm(), it is neither removed from a
+ * list nor returned to a pool by this function -- confirm that it is released
+ * elsewhere.
+ * Never fails.
+ */
+static void qcs_try_to_consume(struct qcs *qcs)
+{
+ struct eb64_node *frm_node;
+
+ frm_node = eb64_first(&qcs->tx.acked_frms);
+ while (frm_node) {
+ struct quic_stream *strm;
+
+ strm = eb64_entry(&frm_node->node, struct quic_stream, offset);
+ if (strm->offset.key != qcs->tx.ack_offset)
+ break; /* this frame does not start at the next offset to consume */
+
+ b_del(strm->buf, strm->len);
+ qcs->tx.ack_offset += strm->len;
+ frm_node = eb64_next(frm_node); /* fetch the successor before unlinking this node */
+ eb64_delete(&strm->offset);
+ }
+}
+
/* Treat <frm> frame whose packet it is attached to has just been acknowledged. */
static inline void qc_treat_acked_tx_frm(struct quic_frame *frm,
struct ssl_sock_ctx *ctx)
{
+
TRACE_PROTO("Removing frame", QUIC_EV_CONN_PRSAFRM, ctx->conn, frm);
- LIST_DELETE(&frm->list);
- pool_free(pool_head_quic_frame, frm);
+ switch (frm->type) {
+ case QUIC_FT_STREAM_8 ... QUIC_FT_STREAM_F:
+ {
+ struct qcs *qcs = frm->stream.qcs;
+ struct quic_stream *strm = &frm->stream;
+
+ if (qcs->tx.ack_offset == strm->offset.key) { /* frame starts at the next offset to consume */
+ b_del(strm->buf, strm->len);
+ qcs->tx.ack_offset += strm->len;
+ LIST_DELETE(&frm->list);
+ pool_free(pool_head_quic_frame, frm);
+ }
+ else { /* NOTE(review): the frame is left on <frm->list> in this branch -- confirm this is intended */
+ eb64_insert(&qcs->tx.acked_frms, &strm->offset); /* park it in the tree, keyed by its offset */
+ }
+ qcs_try_to_consume(qcs); /* consume parked frames which now start at tx.ack_offset */
+ }
+ break;
+ default: /* any other frame type is released immediately */
+ LIST_DELETE(&frm->list);
+ pool_free(pool_head_quic_frame, frm);
+ }
}
/* Remove <largest> down to <smallest> node entries from <pkts> tree of TX packet,
@@ -1582,7 +1625,7 @@
frm = pool_alloc(pool_head_quic_rx_strm_frm);
if (frm) {
- frm->offset_node.key = stream_frm->offset;
+ frm->offset_node.key = stream_frm->offset.key;
frm->len = stream_frm->len;
frm->data = stream_frm->data;
frm->pkt = pkt;
@@ -1686,7 +1729,7 @@
try = strm_frm->len;
memcpy(b_tail(buf), strm_frm->data, try);
strm_frm->len -= try;
- strm_frm->offset += try;
+ strm_frm->offset.key += try;
b_add(buf, try);
ret += try;
}
@@ -1715,7 +1758,7 @@
}
strm = eb64_entry(&strm_node->node, struct qcs, by_id);
- frm_node = eb64_lookup(&strm->frms, strm_frm->offset);
+ frm_node = eb64_lookup(&strm->rx.frms, strm_frm->offset.key);
/* FIXME: handle the case where this frame overlap others */
if (frm_node) {
TRACE_PROTO("Already existing stream data",
@@ -1723,7 +1766,7 @@
goto out;
}
- if (strm_frm->offset == strm->rx.offset) {
+ if (strm_frm->offset.key == strm->rx.offset) {
int ret;
if (!qc_get_buf(qc->qcc, &strm->rx.buf))
@@ -1749,7 +1792,7 @@
return 0;
}
- eb64_insert(&strm->frms, &frm->offset_node);
+ eb64_insert(&strm->rx.frms, &frm->offset_node);
quic_rx_packet_refinc(pkt);
out:
@@ -1778,7 +1821,7 @@
}
strm = eb64_entry(&strm_node->node, struct qcs, by_id);
- frm_node = eb64_lookup(&strm->frms, strm_frm->offset);
+ frm_node = eb64_lookup(&strm->rx.frms, strm_frm->offset.key);
/* FIXME: handle the case where this frame overlap others */
if (frm_node) {
TRACE_PROTO("Already existing stream data",
@@ -1787,7 +1830,7 @@
}
strm_frm_len = strm_frm->len;
- if (strm_frm->offset == strm->rx.offset) {
+ if (strm_frm->offset.key == strm->rx.offset) {
int ret;
if (!qc_get_buf(qc->qcc, &strm->rx.buf))
@@ -1806,7 +1849,7 @@
if (ret)
ruqs_notify_recv(strm);
- strm_frm->offset += ret;
+ strm_frm->offset.key += ret;
}
/* Take this frame into an account for the stream flow control */
strm->rx.offset += strm_frm_len;
@@ -1824,7 +1867,7 @@
return 0;
}
- eb64_insert(&strm->frms, &frm->offset_node);
+ eb64_insert(&strm->rx.frms, &frm->offset_node);
quic_rx_packet_refinc(pkt);
out:
@@ -3719,11 +3762,11 @@
* excepting the variable ones. Note that +1 is for the type of this frame.
*/
hlen = 1 + quic_int_getsize(cf->stream.id) +
- ((cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(cf->stream.offset) : 0);
+ ((cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) ? quic_int_getsize(cf->stream.offset.key) : 0);
/* Compute the data length of this STREAM frame. */
avail_room = room - hlen - *len;
if ((ssize_t)avail_room <= 0)
- continue;
+ break;
if (cf->type & QUIC_STREAM_FRAME_TYPE_LEN_BIT) {
dlen = max_available_room(avail_room, &dlen_sz);
@@ -3761,6 +3804,8 @@
}
new_cf->type = cf->type;
+ new_cf->stream.qcs = cf->stream.qcs;
+ new_cf->stream.buf = cf->stream.buf;
new_cf->stream.id = cf->stream.id;
if (cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT)
new_cf->stream.offset = cf->stream.offset;
@@ -3773,7 +3818,7 @@
cf->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT;
/* Consume <dlen> bytes of the current frame. */
cf->stream.len -= dlen;
- cf->stream.offset += dlen;
+ cf->stream.offset.key += dlen;
cf->stream.data += dlen;
}
break;