MINOR: quic: limit total stream buffers per connection
MUX streams can now allocate multiple buffers for sending. quic-conn is
responsible to limit the total count of allowed allocated buffers. A
counter is stored in the new field <stream_buf_count>.
For the moment, the value is hardcoded to 30.
On stream buffer allocation failure, the qcc MUX is flagged with
QC_CF_CONN_FULL. The MUX is then woken up as soon as a buffer is freed,
most notably on ACK reception.
diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h
index 0c8ce60..44fb4d6 100644
--- a/include/haproxy/mux_quic-t.h
+++ b/include/haproxy/mux_quic-t.h
@@ -26,6 +26,7 @@
};
#define QC_CF_BLK_MFCTL 0x00000001 /* sending blocked due to connection flow-control */
+#define QC_CF_CONN_FULL 0x00000002 /* no stream buffers available on connection */
struct qcc {
struct connection *conn;
diff --git a/include/haproxy/quic_stream.h b/include/haproxy/quic_stream.h
index 0550f4f..6d9359d 100644
--- a/include/haproxy/quic_stream.h
+++ b/include/haproxy/quic_stream.h
@@ -14,6 +14,7 @@
void qc_stream_desc_free(struct qc_stream_desc *stream);
struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream);
+int qc_stream_buf_avail(struct quic_conn *qc);
struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
uint64_t offset);
void qc_stream_buf_release(struct qc_stream_desc *stream);
diff --git a/include/haproxy/xprt_quic-t.h b/include/haproxy/xprt_quic-t.h
index ab714ea..6c4336c 100644
--- a/include/haproxy/xprt_quic-t.h
+++ b/include/haproxy/xprt_quic-t.h
@@ -750,6 +750,7 @@
struct mt_list accept_list; /* chaining element used for accept, only valid for frontend connections */
struct eb_root streams_by_id; /* qc_stream_desc tree */
+ int stream_buf_count; /* total count of allocated stream buffers for this connection */
/* MUX */
struct qcc *qcc;
diff --git a/src/mux_quic.c b/src/mux_quic.c
index d0fd0b2..b969d3a 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -722,7 +722,14 @@
if (qcs->tx.offset == qcs->tx.sent_offset && b_full(&qcs->stream->buf->buf)) {
qc_stream_buf_release(qcs->stream);
- tasklet_wakeup(qcc->wait_event.tasklet);
+
+ /* reschedule send if buffers available */
+ if (qc_stream_buf_avail(qcc->conn->handle.qc)) {
+ tasklet_wakeup(qcc->wait_event.tasklet);
+ }
+ else {
+ qcc->flags |= QC_CF_CONN_FULL;
+ }
}
}
@@ -884,14 +891,15 @@
continue;
}
- if (!out) {
- struct connection *conn = qcc->conn;
+ if (!out && (qcc->flags & QC_CF_CONN_FULL)) {
+ node = eb64_next(node);
+ continue;
+ }
- out = qc_stream_buf_alloc(qcs->stream,
- qcs->tx.offset);
+ if (!out) {
+ out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset);
if (!out) {
- conn->xprt->subscribe(conn, conn->xprt_ctx,
- SUB_RETRY_SEND, &qcc->wait_event);
+ qcc->flags |= QC_CF_CONN_FULL;
node = eb64_next(node);
continue;
}
diff --git a/src/quic_stream.c b/src/quic_stream.c
index 0e58366..2dd9b1c 100644
--- a/src/quic_stream.c
+++ b/src/quic_stream.c
@@ -75,11 +75,11 @@
* Returns the count of byte removed from stream. Do not forget to check if
* <stream> is NULL after invocation.
*/
-int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset,
- size_t len)
+int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len)
{
struct qc_stream_desc *s = *stream;
struct qc_stream_buf *stream_buf;
+ struct quic_conn *qc = s->qc;
struct buffer *buf;
size_t diff;
@@ -115,6 +115,15 @@
pool_free(pool_head_quic_conn_stream_buf, stream_buf);
offer_buffers(NULL, 1);
+ /* notify MUX about available buffers. */
+ --qc->stream_buf_count;
+ if (qc->mux_state == QC_MUX_READY) {
+ if (qc->qcc->flags & QC_CF_CONN_FULL) {
+ qc->qcc->flags &= ~QC_CF_CONN_FULL;
+ tasklet_wakeup(qc->qcc->wait_event.tasklet);
+ }
+ }
+
/* Free stream instance if already released and no buffers left. */
if (s->release && LIST_ISEMPTY(&s->buf_list)) {
qc_stream_desc_free(s);
@@ -131,6 +140,7 @@
void qc_stream_desc_free(struct qc_stream_desc *stream)
{
struct qc_stream_buf *buf, *buf_back;
+ struct quic_conn *qc = stream->qc;
struct eb64_node *frm_node;
unsigned int free_count = 0;
@@ -148,9 +158,19 @@
}
}
- if (free_count)
+ if (free_count) {
offer_buffers(NULL, free_count);
+ qc->stream_buf_count -= free_count;
+ if (qc->mux_state == QC_MUX_READY) {
+ /* notify MUX about available buffers. */
+ if (qc->qcc->flags & QC_CF_CONN_FULL) {
+ qc->qcc->flags &= ~QC_CF_CONN_FULL;
+ tasklet_wakeup(qc->qcc->wait_event.tasklet);
+ }
+ }
+ }
+
/* qc_stream_desc might be freed before having received all its ACKs.
* This is the case if some frames were retransmitted.
*/
@@ -183,18 +203,35 @@
return &stream->buf->buf;
}
+/* Check if a new stream buffer can be allocated for the connection <qc>.
+ * Returns a boolean.
+ */
+int qc_stream_buf_avail(struct quic_conn *qc)
+{
+ /* TODO use a global tune settings for max */
+ return qc->stream_buf_count < 30;
+}
+
-/* Allocate a new current buffer for <stream>. This function is not allowed if
- * current buffer is not NULL prior to this call. The new buffer represents
- * stream payload at offset <offset>.
+/* Allocate a new current buffer for <stream>. The buffer limit count for the
+ * connection is checked first. This function is not allowed if current buffer
+ * is not NULL prior to this call. The new buffer represents stream payload at
+ * offset <offset>.
*
* Returns the buffer or NULL.
*/
struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
uint64_t offset)
{
+ struct quic_conn *qc = stream->qc;
+
/* current buffer must be released first before allocate a new one. */
BUG_ON(stream->buf);
+ if (!qc_stream_buf_avail(qc))
+ return NULL;
+
+ ++qc->stream_buf_count;
+
stream->buf_offset = offset;
stream->buf = pool_alloc(pool_head_quic_conn_stream_buf);
if (!stream->buf)
diff --git a/src/xprt_quic.c b/src/xprt_quic.c
index 543fa2c..21abd85 100644
--- a/src/xprt_quic.c
+++ b/src/xprt_quic.c
@@ -4057,6 +4057,7 @@
MT_LIST_INIT(&qc->accept_list);
qc->streams_by_id = EB_ROOT_UNIQUE;
+ qc->stream_buf_count = 0;
TRACE_LEAVE(QUIC_EV_CONN_INIT, qc);