MEDIUM: mux-quic: improve bidir STREAM frames sending
The current implementation of STREAM frames emission has some
limitation. Most notably when we cannot sent all frames in a single
qc_send run.
In this case, frames are left in front of the MUX list. It will be
re-send individually before other frames, possibly another frame from
the same STREAM with new data. An opportunity to merge the frames is
lost here.
This method is now improved. If a frame cannot be send entirely, it is
discarded. On the next qc_send run, we retry to send to this position. A
new field qcs.sent_offset is used to remember this. A new frame list is
used for each qc_send.
The impact of this change is not precisely known. The most notable point
is that it is a more logical method of emission. It might also improve
performance as we do not keep old STREAM frames which might delay other
streams.
diff --git a/src/mux_quic.c b/src/mux_quic.c
index e12edcc..cb90bb3 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -41,6 +41,7 @@
qcs->tx.buf = BUF_NULL;
qcs->tx.xprt_buf = BUF_NULL;
qcs->tx.offset = 0;
+ qcs->tx.sent_offset = 0;
qcs->tx.ack_offset = 0;
qcs->tx.acked_frms = EB_ROOT_UNIQUE;
@@ -350,50 +351,72 @@
}
}
-static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset,
+static int qcs_push_frame(struct qcs *qcs, struct buffer *out,
+ struct buffer *payload, int fin,
struct list *frm_list)
{
struct quic_frame *frm;
- struct buffer *buf = &qcs->tx.xprt_buf;
- int total = 0, to_xfer;
- unsigned char *btail;
+ int head, left, to_xfer;
+ int total = 0;
fprintf(stderr, "%s\n", __func__);
- qc_get_buf(qcs, buf);
- to_xfer = QUIC_MIN(b_data(payload), b_room(buf));
- if (!to_xfer)
+ qc_get_buf(qcs, out);
+
+ /*
+ * QCS out buffer diagram
+ * head left to_xfer
+ * -------------> ----------> ----->
+ * ==================================================
+ * |...............|xxxxxxxxxxx|<<<<<
+ * ==================================================
+ * ^ ack-off ^ sent-off ^ off
+ *
+ * STREAM frame
+ * ^ ^
+ * |xxxxxxxxxxxxxxxxx|
+ */
+
+ BUG_ON_HOT(qcs->tx.sent_offset < qcs->tx.ack_offset);
+ BUG_ON_HOT(qcs->tx.offset < qcs->tx.sent_offset);
+
+ head = qcs->tx.sent_offset - qcs->tx.ack_offset;
+ left = qcs->tx.offset - qcs->tx.sent_offset;
+ to_xfer = QUIC_MIN(b_data(payload), b_room(out));
+ if (!left && !to_xfer)
goto out;
frm = pool_zalloc(pool_head_quic_frame);
if (!frm)
goto err;
+ total = b_force_xfer(out, payload, to_xfer);
+
+ frm->type = QUIC_FT_STREAM_8;
+ frm->stream.qcs = (struct qcs *)qcs;
+ frm->stream.id = qcs->by_id.key;
+ frm->stream.buf = out;
+ frm->stream.data = (unsigned char *)b_peek(out, head);
+
- /* store buffer end before transfering data for frm.stream.data */
- btail = (unsigned char *)b_tail(buf);
- total = b_force_xfer(buf, payload, to_xfer);
/* FIN is positioned only when the buffer has been totally emptied. */
fin = fin && !b_data(payload);
- frm->type = QUIC_FT_STREAM_8;
if (fin)
frm->type |= QUIC_STREAM_FRAME_TYPE_FIN_BIT;
- if (offset) {
+
+ if (qcs->tx.sent_offset) {
frm->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT;
- frm->stream.offset.key = offset;
+ frm->stream.offset.key = qcs->tx.sent_offset;
}
- frm->stream.qcs = (struct qcs *)qcs;
- frm->stream.buf = buf;
- frm->stream.data = btail;
- frm->stream.id = qcs->by_id.key;
- if (total) {
+
+ if (left + total) {
frm->type |= QUIC_STREAM_FRAME_TYPE_LEN_BIT;
- frm->stream.len = total;
+ frm->stream.len = left + total;
}
LIST_APPEND(frm_list, &frm->list);
out:
- fprintf(stderr, "%s: total=%d fin=%d id=%llu offset=%lu\n",
- __func__, total, fin, (ull)qcs->by_id.key, offset);
+ fprintf(stderr, "%s: sent=%lu total=%d fin=%d id=%llu offset=%lu\n",
+ __func__, (long unsigned)b_data(out), total, fin, (ull)qcs->by_id.key, qcs->tx.sent_offset);
return total;
err:
@@ -406,11 +429,20 @@
*/
void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
{
+ uint64_t diff = data;
+
+ BUG_ON(offset > qcs->tx.sent_offset);
+
/* check if the STREAM frame has already been notified. It can happen
* for retransmission.
*/
if (offset + data <= qcs->tx.sent_offset)
return;
+
+ diff = offset + data - qcs->tx.sent_offset;
+
+ /* increase offset on stream */
+ qcs->tx.sent_offset += diff;
}
/* Wrapper for send on transport layer. Send a list of frames <frms> for the
@@ -480,6 +512,7 @@
static int qc_send(struct qcc *qcc)
{
+ struct list frms = LIST_HEAD_INIT(frms);
struct eb64_node *node;
int ret = 0;
@@ -492,6 +525,7 @@
while (node) {
struct qcs *qcs = container_of(node, struct qcs, by_id);
struct buffer *buf = &qcs->tx.buf;
+ struct buffer *out = &qcs->tx.xprt_buf;
/* TODO
* for the moment, unidirectional streams have their own
@@ -503,10 +537,9 @@
continue;
}
- if (b_data(buf)) {
+ if (b_data(buf) || b_data(out)) {
char fin = qcs->flags & QC_SF_FIN_STREAM;
- ret = qcs_push_frame(qcs, buf, fin, qcs->tx.offset,
- &qcc->tx.frms);
+ ret = qcs_push_frame(qcs, out, buf, fin, &frms);
BUG_ON(ret < 0); /* TODO handle this properly */
if (ret > 0) {
@@ -527,7 +560,7 @@
node = eb64_next(node);
}
- qc_send_frames(qcc, &qcc->tx.frms);
+ qc_send_frames(qcc, &frms);
/* TODO adjust ret if not all frames are sent. */
return ret;
@@ -672,7 +705,6 @@
qcc->rx.max_data = lparams->initial_max_data;
qcc->tx.max_data = 0;
- LIST_INIT(&qcc->tx.frms);
/* Client initiated streams must respect the server flow control. */
qcc->strms[QCS_CLT_BIDI].max_streams = lparams->initial_max_streams_bidi;