MINOR: mux-quic: implement immediate send retry
Complete qc_send function. After having processed each qcs emission, it
will now retry send on qcs where transfer can continue. This is useful
when qc_stream_desc buffer is full and there is still data present in
qcs buf.
To implement this, each eligible qcs is inserted in a new list
<qcc.send_retry_list>. This is done on send notification from the
transport layer through qcc_streams_sent_done(). Retry emission until
send_retry_list is empty or the transport layer cannot proceed more
data.
Several send operations are now called on two different places. Thus a
new _qc_send_qcs() function is defined to factorize the code.
This change should maximize the throughput during QUIC transfers.
diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h
index 44fb4d6..d95e9dc 100644
--- a/include/haproxy/mux_quic-t.h
+++ b/include/haproxy/mux_quic-t.h
@@ -10,6 +10,7 @@
#include <haproxy/buf-t.h>
#include <haproxy/connection-t.h>
+#include <haproxy/list-t.h>
#include <haproxy/quic_stream-t.h>
#include <haproxy/xprt_quic-t.h>
#include <haproxy/conn_stream-t.h>
@@ -70,6 +71,8 @@
struct eb_root streams_by_id; /* all active streams by their ID */
+ struct list send_retry_list; /* list of qcs eligible to send retry */
+
struct wait_event wait_event; /* To be used if we're waiting for I/Os */
struct wait_event *subs;
@@ -111,6 +114,8 @@
uint64_t id;
struct qc_stream_desc *stream;
+ struct list el; /* element of qcc.send_retry_list */
+
struct wait_event wait_event;
struct wait_event *subs;
};
diff --git a/include/haproxy/quic_stream.h b/include/haproxy/quic_stream.h
index 6d9359d..0550f4f 100644
--- a/include/haproxy/quic_stream.h
+++ b/include/haproxy/quic_stream.h
@@ -14,7 +14,6 @@
void qc_stream_desc_free(struct qc_stream_desc *stream);
struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream);
-int qc_stream_buf_avail(struct quic_conn *qc);
struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
uint64_t offset);
void qc_stream_buf_release(struct qc_stream_desc *stream);
diff --git a/src/mux_quic.c b/src/mux_quic.c
index b969d3a..6a65d69 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -7,6 +7,7 @@
#include <haproxy/conn_stream.h>
#include <haproxy/dynbuf.h>
#include <haproxy/htx.h>
+#include <haproxy/list.h>
#include <haproxy/pool.h>
#include <haproxy/quic_stream.h>
#include <haproxy/sink.h>
@@ -722,14 +723,9 @@
if (qcs->tx.offset == qcs->tx.sent_offset && b_full(&qcs->stream->buf->buf)) {
qc_stream_buf_release(qcs->stream);
-
- /* reschedule send if buffers available */
- if (qc_stream_buf_avail(qcc->conn->handle.qc)) {
- tasklet_wakeup(qcc->wait_event.tasklet);
- }
- else {
- qcc->flags |= QC_CF_CONN_FULL;
- }
+ /* prepare qcs for immediate send retry if data to send */
+ if (b_data(&qcs->tx.buf))
+ LIST_APPEND(&qcc->send_retry_list, &qcs->el);
}
}
@@ -760,9 +756,11 @@
if (LIST_ISEMPTY(frms)) {
TRACE_DEVEL("leaving with no frames to send", QMUX_EV_QCC_SEND, qcc->conn);
- return 0;
+ return 1;
}
+ LIST_INIT(&qcc->send_retry_list);
+
retry_send:
first_frm = LIST_ELEM(frms->n, struct quic_frame *, list);
if ((first_frm->type & QUIC_FT_STREAM_8) == QUIC_FT_STREAM_8) {
@@ -837,6 +835,65 @@
return 0;
}
+/* Used internally by qc_send function. Proceed to send for <qcs>. This will
+ * transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame
+ * is then generated and inserted in <frms> list. <qcc_max_data> is the current
+ * flow-control max-data at the connection level which must not be surpassed.
+ *
+ * Returns the total bytes transferred between qcs and quic_stream buffers. Can
+ * be null if out buffer cannot be allocated.
+ */
+static int _qc_send_qcs(struct qcs *qcs, struct list *frms,
+ uint64_t qcc_max_data)
+{
+ struct qcc *qcc = qcs->qcc;
+ struct buffer *buf = &qcs->tx.buf;
+ struct buffer *out = qc_stream_buf_get(qcs->stream);
+ int xfer = 0;
+
+ /* Allocate <out> buffer if necessary. */
+ if (!out) {
+ if (qcc->flags & QC_CF_CONN_FULL)
+ return 0;
+
+ out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset);
+ if (!out) {
+ qcc->flags |= QC_CF_CONN_FULL;
+ return 0;
+ }
+ }
+
+ /* Transfer data from <buf> to <out>. */
+ if (b_data(buf)) {
+ xfer = qcs_xfer_data(qcs, out, buf, qcc_max_data);
+ BUG_ON(xfer < 0); /* TODO handle this properly */
+
+ if (xfer > 0) {
+ qcs_notify_send(qcs);
+ qcs->flags &= ~QC_SF_BLK_MROOM;
+ }
+
+ qcs->tx.offset += xfer;
+ }
+
+ /* out buffer cannot be emptied if qcs offsets differ. */
+ BUG_ON(!b_data(out) && qcs->tx.sent_offset != qcs->tx.offset);
+
+ /* Build a new STREAM frame with <out> buffer. */
+ if (qcs->tx.sent_offset != qcs->tx.offset) {
+ int ret;
+ char fin = !!(qcs->flags & QC_SF_FIN_STREAM);
+
+ /* FIN is set if all incoming data were transfered. */
+ fin = !!(fin && !b_data(buf));
+
+ ret = qcs_build_stream_frm(qcs, out, fin, frms);
+ BUG_ON(ret < 0); /* TODO handle this properly */
+ }
+
+ return xfer;
+}
+
/* Proceed to sending. Loop through all available streams for the <qcc>
* instance and try to send as much as possible.
*
@@ -846,7 +903,8 @@
{
struct list frms = LIST_HEAD_INIT(frms);
struct eb64_node *node;
- int total = 0;
+ struct qcs *qcs, *qcs_tmp;
+ int total = 0, tmp_total = 0;
TRACE_ENTER(QMUX_EV_QCC_SEND);
@@ -867,9 +925,8 @@
*/
node = eb64_first(&qcc->streams_by_id);
while (node) {
- struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
- struct buffer *buf = &qcs->tx.buf;
- struct buffer *out = qc_stream_buf_get(qcs->stream);
+ int ret;
+ qcs = eb64_entry(node, struct qcs, by_id);
/* TODO
* for the moment, unidirectional streams have their own
@@ -886,63 +943,38 @@
continue;
}
- if (!b_data(buf) && !out) {
- node = eb64_next(node);
- continue;
- }
-
- if (!out && (qcc->flags & QC_CF_CONN_FULL)) {
+ if (!b_data(&qcs->tx.buf) && !qc_stream_buf_get(qcs->stream)) {
node = eb64_next(node);
continue;
}
- if (!out) {
- out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset);
- if (!out) {
- qcc->flags |= QC_CF_CONN_FULL;
- node = eb64_next(node);
- continue;
- }
- }
-
- /* Prepare <out> buffer with data from <buf>. */
- if (b_data(buf)) {
- int ret = qcs_xfer_data(qcs, out, buf,
- qcc->tx.sent_offsets + total);
- BUG_ON(ret < 0); /* TODO handle this properly */
-
- if (ret > 0) {
- qcs_notify_send(qcs);
- if (qcs->flags & QC_SF_BLK_MROOM)
- qcs->flags &= ~QC_SF_BLK_MROOM;
- }
-
- qcs->tx.offset += ret;
- total += ret;
- }
-
- /* Subscribe if not all data can be transfered. */
- if (b_data(buf)) {
- qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
- SUB_RETRY_SEND, &qcc->wait_event);
- }
+ ret = _qc_send_qcs(qcs, &frms, qcc->tx.sent_offsets + total);
+ total += ret;
+ node = eb64_next(node);
+ }
- /* Build a new STREAM frame with <out> buffer. */
- if (b_data(out) && qcs->tx.sent_offset != qcs->tx.offset) {
- int ret;
- char fin = !!(qcs->flags & QC_SF_FIN_STREAM);
+ if (qc_send_frames(qcc, &frms)) {
+ /* data rejected by transport layer, do not retry. */
+ goto out;
+ }
- /* FIN is set if all incoming data were transfered. */
- fin = !!(fin && !b_data(buf));
- ret = qcs_build_stream_frm(qcs, out, fin, &frms);
- BUG_ON(ret < 0); /* TODO handle this properly */
- }
+ retry:
+ tmp_total = 0;
+ list_for_each_entry_safe(qcs, qcs_tmp, &qcc->send_retry_list, el) {
+ int ret;
+ BUG_ON(!b_data(&qcs->tx.buf));
+ BUG_ON(qc_stream_buf_get(qcs->stream));
- node = eb64_next(node);
+ ret = _qc_send_qcs(qcs, &frms, qcc->tx.sent_offsets + tmp_total);
+ tmp_total += ret;
+ LIST_DELETE(&qcs->el);
}
- qc_send_frames(qcc, &frms);
+ total += tmp_total;
+ if (!qc_send_frames(qcc, &frms) && !LIST_ISEMPTY(&qcc->send_retry_list))
+ goto retry;
+ out:
TRACE_LEAVE(QMUX_EV_QCC_SEND);
return total;
@@ -1105,6 +1137,8 @@
if (!qcc->wait_event.tasklet)
goto fail_no_tasklet;
+ LIST_INIT(&qcc->send_retry_list);
+
qcc->subs = NULL;
qcc->wait_event.tasklet->process = qc_io_cb;
qcc->wait_event.tasklet->context = qcc;
diff --git a/src/quic_stream.c b/src/quic_stream.c
index 2dd9b1c..839efd0 100644
--- a/src/quic_stream.c
+++ b/src/quic_stream.c
@@ -206,7 +206,7 @@
/* Check if a new stream buffer can be allocated for the connection <qc>.
* Returns a boolean.
*/
-int qc_stream_buf_avail(struct quic_conn *qc)
+static int qc_stream_buf_avail(struct quic_conn *qc)
{
/* TODO use a global tune settings for max */
return qc->stream_buf_count < 30;