MINOR: mux-quic: split xfer and STREAM frames build
Split qcs_push_frame() in two functions.
The first one is qcs_xfer_data(). Its purpose is to transfer data from
qcs.tx.buf to qc_stream_desc buffer. The second function is named
qcs_build_stream_frm(). It generates a STREAM frame using qc_stream_desc
buffer as payload.
The trace events previously associated with qcs_push_frame() has also
been split in two to reflect the new code structure.
The purpose of this refactoring is first to better reflect how sending
is implemented. It will also simplify the implementation of Tx
multi-buffer per streams.
diff --git a/src/mux_quic.c b/src/mux_quic.c
index a4ffe97..d51bed1 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -49,16 +49,23 @@
{ .mask = QMUX_EV_STRM_END, .name = "strm_end", .desc = "detaching app-layer stream" },
#define QMUX_EV_SEND_FRM (1ULL << 13)
{ .mask = QMUX_EV_SEND_FRM, .name = "send_frm", .desc = "sending QUIC frame" },
-/* special event dedicated to qcs_push_frame */
-#define QMUX_EV_QCS_PUSH_FRM (1ULL << 14)
- { .mask = QMUX_EV_QCS_PUSH_FRM, .name = "qcs_push_frm", .desc = "qcs_push_frame" },
+/* special event dedicated to qcs_xfer_data */
+#define QMUX_EV_QCS_XFER_DATA (1ULL << 14)
+ { .mask = QMUX_EV_QCS_XFER_DATA, .name = "qcs_xfer_data", .desc = "qcs_xfer_data" },
+/* special event dedicated to qcs_build_stream_frm */
+#define QMUX_EV_QCS_BUILD_STRM (1ULL << 15)
+ { .mask = QMUX_EV_QCS_BUILD_STRM, .name = "qcs_build_stream_frm", .desc = "qcs_build_stream_frm" },
{ }
};
-/* custom arg for QMUX_EV_QCS_PUSH_FRM */
-struct qcs_push_frm_trace_arg {
- size_t sent;
+/* custom arg for QMUX_EV_QCS_XFER_DATA */
+struct qcs_xfer_data_trace_arg {
+ size_t prep;
int xfer;
+};
+/* custom arg for QMUX_EV_QCS_BUILD_STRM */
+struct qcs_build_stream_trace_arg {
+ size_t len;
char fin;
uint64_t offset;
};
@@ -552,13 +559,11 @@
*
* Returns the total bytes of newly transferred data or a negative error code.
*/
-static int qcs_push_frame(struct qcs *qcs, struct buffer *out,
- struct buffer *payload, int fin,
- struct list *frm_list, uint64_t max_data)
+static int qcs_xfer_data(struct qcs *qcs, struct buffer *out,
+ struct buffer *payload, uint64_t max_data)
{
struct qcc *qcc = qcs->qcc;
- struct quic_frame *frm;
- int head, left, to_xfer;
+ int left, to_xfer;
int total = 0;
TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
@@ -582,7 +587,6 @@
BUG_ON_HOT(qcs->tx.sent_offset < qcs->stream->ack_offset);
BUG_ON_HOT(qcs->tx.offset < qcs->tx.sent_offset);
- head = qcs->tx.sent_offset - qcs->stream->ack_offset;
left = qcs->tx.offset - qcs->tx.sent_offset;
to_xfer = QUIC_MIN(b_data(payload), b_room(out));
@@ -599,12 +603,45 @@
if (!left && !to_xfer)
goto out;
+ total = b_force_xfer(out, payload, to_xfer);
+
+ out:
+ {
+ struct qcs_xfer_data_trace_arg arg = {
+ .prep = b_data(out), .xfer = total,
+ };
+ TRACE_LEAVE(QMUX_EV_QCS_SEND|QMUX_EV_QCS_XFER_DATA,
+ qcc->conn, qcs, &arg);
+ }
+
+ return total;
+
+ err:
+ TRACE_DEVEL("leaving in error", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+ return -1;
+}
+
+static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
+ struct list *frm_list)
+{
+ struct qcc *qcc = qcs->qcc;
+ struct quic_frame *frm;
+ int head, total;
+
+ TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
+
+ /* cf buffer schema in qcs_xfer_data */
+ head = qcs->tx.sent_offset - qcs->stream->ack_offset;
+ total = b_data(out) - head;
+ if (!total) {
+ TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
+ return 0;
+ }
+
frm = pool_zalloc(pool_head_quic_frame);
if (!frm)
goto err;
- total = b_force_xfer(out, payload, to_xfer);
-
frm->type = QUIC_FT_STREAM_8;
frm->stream.stream = qcs->stream;
frm->stream.id = qcs->id;
@@ -612,7 +649,6 @@
frm->stream.data = (unsigned char *)b_peek(out, head);
/* FIN is positioned only when the buffer has been totally emptied. */
- fin = fin && !b_data(payload);
if (fin)
frm->type |= QUIC_STREAM_FRAME_TYPE_FIN_BIT;
@@ -621,20 +657,18 @@
frm->stream.offset.key = qcs->tx.sent_offset;
}
- if (left + total) {
- frm->type |= QUIC_STREAM_FRAME_TYPE_LEN_BIT;
- frm->stream.len = left + total;
- }
+ frm->type |= QUIC_STREAM_FRAME_TYPE_LEN_BIT;
+ frm->stream.len = total;
LIST_APPEND(frm_list, &frm->list);
out:
{
- struct qcs_push_frm_trace_arg arg = {
- .sent = b_data(out), .xfer = total, .fin = fin,
- .offset = qcs->tx.sent_offset
+ struct qcs_build_stream_trace_arg arg = {
+ .len = frm->stream.len, .fin = fin,
+ .offset = frm->stream.offset.key,
};
- TRACE_LEAVE(QMUX_EV_QCS_SEND|QMUX_EV_QCS_PUSH_FRM,
+ TRACE_LEAVE(QMUX_EV_QCS_SEND|QMUX_EV_QCS_BUILD_STRM,
qcc->conn, qcs, &arg);
}
@@ -831,12 +865,10 @@
continue;
}
- if (b_data(buf) || b_data(out)) {
- int ret;
- char fin = !!(qcs->flags & QC_SF_FIN_STREAM);
-
- ret = qcs_push_frame(qcs, out, buf, fin, &frms,
- qcc->tx.sent_offsets + total);
+ /* Prepare <out> buffer with data from <buf>. */
+ if (b_data(buf)) {
+ int ret = qcs_xfer_data(qcs, out, buf,
+ qcc->tx.sent_offsets + total);
BUG_ON(ret < 0); /* TODO handle this properly */
if (ret > 0) {
@@ -847,13 +879,25 @@
qcs->tx.offset += ret;
total += ret;
+ }
- /* Subscribe if not all data can be send. */
- if (b_data(buf)) {
- qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
- SUB_RETRY_SEND, &qcc->wait_event);
- }
+ /* Subscribe if not all data can be transfered. */
+ if (b_data(buf)) {
+ qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
+ SUB_RETRY_SEND, &qcc->wait_event);
}
+
+ /* Build a new STREAM frame with <out> buffer. */
+ if (b_data(out)) {
+ int ret;
+ char fin = !!(qcs->flags & QC_SF_FIN_STREAM);
+
+ /* FIN is set if all incoming data were transfered. */
+ fin = !!(fin && !b_data(buf));
+ ret = qcs_build_stream_frm(qcs, out, fin, &frms);
+ BUG_ON(ret < 0); /* TODO handle this properly */
+ }
+
node = eb64_next(node);
}
@@ -1329,10 +1373,16 @@
if (mask & QMUX_EV_SEND_FRM)
qmux_trace_frm(a3);
- if (mask & QMUX_EV_QCS_PUSH_FRM) {
- const struct qcs_push_frm_trace_arg *arg = a3;
- chunk_appendf(&trace_buf, " sent=%lu xfer=%d fin=%d offset=%lu",
- arg->sent, arg->xfer, arg->fin, arg->offset);
+ if (mask & QMUX_EV_QCS_XFER_DATA) {
+ const struct qcs_xfer_data_trace_arg *arg = a3;
+ chunk_appendf(&trace_buf, " prep=%lu xfer=%d",
+ arg->prep, arg->xfer);
+ }
+
+ if (mask & QMUX_EV_QCS_BUILD_STRM) {
+ const struct qcs_build_stream_trace_arg *arg = a3;
+ chunk_appendf(&trace_buf, " len=%lu fin=%d offset=%lu",
+ arg->len, arg->fin, arg->offset);
}
}
}