MINOR: quic: Enhance the listener RX buffering part
Add a buffer per QUIC connection. At this time the listener which receives
the UDP datagram is responsible of identifying the underlying QUIC connection
and must copy the QUIC packets to its buffer.
->pkt_list member has been added to quic_conn struct to enlist the packets
in the order they have been copied to the connection buffer so that to be
able to consume this buffer when the packets are freed. This list is locked
thanks to a R/W lock to protect it from concurent accesses.
quic_rx_packet struct does not use a static buffer anymore to store the QUIC
packets contents.
diff --git a/include/haproxy/xprt_quic-t.h b/include/haproxy/xprt_quic-t.h
index f7975cf..7e843ab 100644
--- a/include/haproxy/xprt_quic-t.h
+++ b/include/haproxy/xprt_quic-t.h
@@ -234,6 +234,8 @@
#define QUIC_TX_RING_BUFSZ (1UL << 12)
/* Size of the internal buffer of QUIC RX buffer. */
#define QUIC_RX_BUFSZ (1UL << 18)
+/* Size of the QUIC RX buffer for the connections */
+#define QUIC_CONN_RX_BUFSZ (1UL << 13)
extern struct trace_source trace_quic;
extern struct pool_head *pool_head_quic_tx_ring;
@@ -408,6 +410,7 @@
struct quic_rx_packet {
struct mt_list list;
struct mt_list rx_list;
+ struct list qc_rx_pkt_list;
struct quic_conn *qc;
unsigned char type;
uint32_t version;
@@ -423,9 +426,11 @@
uint64_t token_len;
/* Packet length */
uint64_t len;
+ /* Packet length before decryption */
+ uint64_t raw_len;
/* Additional authenticated data length */
size_t aad_len;
- unsigned char data[QUIC_PACKET_MAXLEN];
+ unsigned char *data;
struct eb64_node pn_node;
volatile unsigned int refcnt;
/* Source address of this packet. */
@@ -661,6 +666,11 @@
size_t nb_ack_eliciting;
/* Transport parameters the peer will receive */
struct quic_transport_params params;
+ /* RX buffer */
+ struct buffer buf;
+ /* RX buffer read/write lock */
+ __decl_thread(HA_RWLOCK_T buf_rwlock);
+ struct list pkt_list;
} rx;
unsigned int max_ack_delay;
struct quic_path paths[1];
diff --git a/include/haproxy/xprt_quic.h b/include/haproxy/xprt_quic.h
index 57f639e..f033b66 100644
--- a/include/haproxy/xprt_quic.h
+++ b/include/haproxy/xprt_quic.h
@@ -1026,6 +1026,27 @@
return pkt->type != QUIC_PACKET_TYPE_SHORT;
}
+/* Release the memory for the RX packets which are no more referenced
+ * and consume their payloads which have been copied to the RX buffer
+ * for the connection.
+ * Always succeeds.
+ */
+static inline void quic_rx_packet_pool_purge(struct quic_conn *qc)
+{
+ struct quic_rx_packet *pkt, *pktback;
+
+ list_for_each_entry_safe(pkt, pktback, &qc->rx.pkt_list, qc_rx_pkt_list) {
+ if (pkt->data != (unsigned char *)b_head(&qc->rx.buf))
+ break;
+
+ if (!HA_ATOMIC_LOAD(&pkt->refcnt)) {
+ b_del(&qc->rx.buf, pkt->raw_len);
+ LIST_DELETE(&pkt->qc_rx_pkt_list);
+ pool_free(pool_head_quic_rx_packet, pkt);
+ }
+ }
+}
+
/* Increment the reference counter of <pkt> */
static inline void quic_rx_packet_refinc(struct quic_rx_packet *pkt)
{
@@ -1035,8 +1056,27 @@
/* Decrement the reference counter of <pkt> */
static inline void quic_rx_packet_refdec(struct quic_rx_packet *pkt)
{
- if (!HA_ATOMIC_SUB_FETCH(&pkt->refcnt, 1))
+ if (HA_ATOMIC_SUB_FETCH(&pkt->refcnt, 1))
+ return;
+
+ if (!pkt->qc) {
+ /* It is possible the connection for this packet has not already been
+ * identified. In such a case, we only need to free this packet.
+ */
pool_free(pool_head_quic_rx_packet, pkt);
+ }
+ else {
+ struct quic_conn *qc = pkt->qc;
+
+ HA_RWLOCK_WRLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
+ if (pkt->data == (unsigned char *)b_head(&qc->rx.buf)) {
+ b_del(&qc->rx.buf, pkt->raw_len);
+ LIST_DELETE(&pkt->qc_rx_pkt_list);
+ pool_free(pool_head_quic_rx_packet, pkt);
+ quic_rx_packet_pool_purge(qc);
+ }
+ HA_RWLOCK_WRUNLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
+ }
}
/* Increment the reference counter of <pkt> */
@@ -1052,7 +1092,7 @@
pool_free(pool_head_quic_tx_packet, pkt);
}
-ssize_t quic_lstnr_dgram_read(char *buf, size_t len, void *owner,
+ssize_t quic_lstnr_dgram_read(struct buffer *buf, size_t len, void *owner,
struct sockaddr_storage *saddr);
#endif /* USE_QUIC */
#endif /* _HAPROXY_XPRT_QUIC_H */
diff --git a/src/quic_sock.c b/src/quic_sock.c
index ffd4504..875779d 100644
--- a/src/quic_sock.c
+++ b/src/quic_sock.c
@@ -175,10 +175,13 @@
void quic_sock_fd_iocb(int fd)
{
ssize_t ret;
+ struct rxbuf *rxbuf;
struct buffer *buf;
struct listener *l = objt_listener(fdtab[fd].owner);
+ struct quic_transport_params *params = &l->bind_conf->quic_params;
/* Source address */
struct sockaddr_storage saddr = {0};
+ size_t max_sz;
socklen_t saddrlen;
BUG_ON(!l);
@@ -186,20 +189,32 @@
if (!(fdtab[fd].state & FD_POLL_IN) || !fd_recv_ready(fd))
return;
- buf = get_trash_chunk();
+ rxbuf = MT_LIST_POP(&l->rx.rxbuf_list, typeof(rxbuf), mt_list);
+ buf = &rxbuf->buf;
+ max_sz = params->max_udp_payload_size;
+ if (b_contig_space(buf) < max_sz) {
+ /* Note that when we enter this function, <buf> is always empty */
+ b_reset(buf);
+ if (b_contig_space(buf) < max_sz)
+ goto out;
+ }
+
saddrlen = sizeof saddr;
do {
- ret = recvfrom(fd, buf->area, buf->size, 0,
+ ret = recvfrom(fd, b_tail(buf), max_sz, 0,
(struct sockaddr *)&saddr, &saddrlen);
if (ret < 0) {
if (errno == EINTR)
continue;
if (errno == EAGAIN)
fd_cant_recv(fd);
- return;
+ goto out;
}
} while (0);
- buf->data = ret;
- quic_lstnr_dgram_read(buf->area, buf->data, l, &saddr);
+ b_add(buf, ret);
+ quic_lstnr_dgram_read(buf, ret, l, &saddr);
+ b_del(buf, ret);
+ out:
+ MT_LIST_APPEND(&l->rx.rxbuf_list, &rxbuf->mt_list);
}
diff --git a/src/xprt_quic.c b/src/xprt_quic.c
index 244dfcd..af694cd 100644
--- a/src/xprt_quic.c
+++ b/src/xprt_quic.c
@@ -140,6 +140,7 @@
DECLARE_POOL(pool_head_quic_tx_ring, "quic_tx_ring_pool", QUIC_TX_RING_BUFSZ);
DECLARE_POOL(pool_head_quic_rxbuf, "quic_rxbuf_pool", QUIC_RX_BUFSZ);
+DECLARE_POOL(pool_head_quic_conn_rxbuf, "quic_conn_rxbuf", QUIC_CONN_RX_BUFSZ);
DECLARE_STATIC_POOL(pool_head_quic_conn_ctx,
"quic_conn_ctx_pool", sizeof(struct ssl_sock_ctx));
DECLARE_STATIC_POOL(pool_head_quic_conn, "quic_conn", sizeof(struct quic_conn));
@@ -2921,6 +2922,7 @@
quic_conn_enc_level_uninit(&conn->els[i]);
if (conn->timer_task)
task_destroy(conn->timer_task);
+ pool_free(pool_head_quic_conn_rxbuf, conn->rx.buf.area);
pool_free(pool_head_quic_conn, conn);
}
@@ -2994,6 +2996,7 @@
struct quic_conn *qc;
/* Initial CID. */
struct quic_connection_id *icid;
+ char *buf_area;
TRACE_ENTER(QUIC_EV_CONN_INIT);
qc = pool_zalloc(pool_head_quic_conn);
@@ -3002,6 +3005,12 @@
goto err;
}
+ buf_area = pool_alloc(pool_head_quic_conn_rxbuf);
+ if (!buf_area) {
+ TRACE_PROTO("Could not allocate a new RX buffer", QUIC_EV_CONN_INIT);
+ goto err;
+ }
+
qc->cids = EB_ROOT;
/* QUIC Server (or listener). */
if (server) {
@@ -3063,6 +3072,9 @@
/* RX part. */
qc->rx.bytes = 0;
qc->rx.nb_ack_eliciting = 0;
+ qc->rx.buf = b_make(buf_area, QUIC_CONN_RX_BUFSZ, 0, 0);
+ HA_RWLOCK_INIT(&qc->rx.buf_rwlock);
+ LIST_INIT(&qc->rx.pkt_list);
/* XXX TO DO: Only one path at this time. */
qc->path = &qc->paths[0];
@@ -3179,7 +3191,17 @@
return 0;
}
+/* Insert <pkt> RX packet in its <qel> RX packets tree */
+static void qc_pkt_insert(struct quic_rx_packet *pkt, struct quic_enc_level *qel)
+{
+ pkt->pn_node.key = pkt->pn;
+ HA_RWLOCK_WRLOCK(QUIC_LOCK, &qel->rx.pkts_rwlock);
+ eb64_insert(&qel->rx.pkts, &pkt->pn_node);
+ HA_RWLOCK_WRUNLOCK(QUIC_LOCK, &qel->rx.pkts_rwlock);
+ quic_rx_packet_refinc(pkt);
+}
+
-/* Try to remove the header protecttion of <pkt> QUIC packet attached to <conn>
+/* Try to remove the header protection of <pkt> QUIC packet attached to <qc>
* QUIC connection with <buf> as packet number field address, <end> a pointer to one
* byte past the end of the buffer containing this packet and <beg> the address of
* the packet first byte.
@@ -3189,7 +3211,8 @@
static inline int qc_try_rm_hp(struct quic_rx_packet *pkt,
unsigned char **buf, unsigned char *beg,
const unsigned char *end,
- struct quic_conn *qc, struct ssl_sock_ctx *ctx)
+ struct quic_conn *qc, struct quic_enc_level **el,
+ struct ssl_sock_ctx *ctx)
{
unsigned char *pn = NULL; /* Packet number field */
struct quic_enc_level *qel;
@@ -3217,21 +3240,33 @@
/* The AAD includes the packet number field found at <pn>. */
pkt->aad_len = pn - beg + pkt->pnl;
qpkt_trace = pkt;
- /* Store the packet */
- pkt->pn_node.key = pkt->pn;
- HA_RWLOCK_WRLOCK(QUIC_LOCK, &qel->rx.pkts_rwlock);
- eb64_insert(&qel->rx.pkts, &pkt->pn_node);
- quic_rx_packet_refinc(pkt);
- HA_RWLOCK_WRUNLOCK(QUIC_LOCK, &qel->rx.pkts_rwlock);
}
else if (qel) {
+ if (qel->tls_ctx.rx.flags & QUIC_FL_TLS_SECRETS_DCD) {
+ /* If the packet number space has been discarded, this packet
+ * will be not parsed.
+ */
+ TRACE_PROTO("Discarded pktns", QUIC_EV_CONN_TRMHP, ctx ? ctx->conn : NULL, pkt);
+ goto out;
+ }
+
TRACE_PROTO("hp not removed", QUIC_EV_CONN_TRMHP, ctx ? ctx->conn : NULL, pkt);
pkt->pn_offset = pn - beg;
MT_LIST_APPEND(&qel->rx.pqpkts, &pkt->list);
quic_rx_packet_refinc(pkt);
}
+ else {
+ TRACE_PROTO("Unknown packet type", QUIC_EV_CONN_TRMHP, ctx ? ctx->conn : NULL);
+ goto err;
+ }
- memcpy(pkt->data, beg, pkt->len);
+ *el = qel;
+ /* No reference counter incrementation here!!! */
+ LIST_APPEND(&qc->rx.pkt_list, &pkt->qc_rx_pkt_list);
+ memcpy(b_tail(&qc->rx.buf), beg, pkt->len);
+ pkt->data = (unsigned char *)b_tail(&qc->rx.buf);
+ b_add(&qc->rx.buf, pkt->len);
+ out:
/* Updtate the offset of <*buf> for the next QUIC packet. */
*buf = beg + pkt->len;
@@ -3274,6 +3309,8 @@
struct connection *srv_conn;
struct ssl_sock_ctx *conn_ctx;
int long_header;
+ size_t b_cspace;
+ struct quic_enc_level *qel;
qc = NULL;
TRACE_ENTER(QUIC_EV_CONN_SPKT);
@@ -3377,14 +3414,27 @@
goto err;
}
- if (pkt->len > sizeof pkt->data) {
- TRACE_PROTO("Too big packet", QUIC_EV_CONN_SPKT, qc->conn, pkt, &pkt->len);
- goto err;
+ HA_RWLOCK_WRLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
+ b_cspace = b_contig_space(&qc->rx.buf);
+ if (b_cspace < pkt->len) {
+ /* Let us consume the remaining contiguous space. */
+ b_add(&qc->rx.buf, b_cspace);
+ if (b_contig_space(&qc->rx.buf) < pkt->len) {
+ HA_RWLOCK_WRUNLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
+ TRACE_PROTO("Too big packet", QUIC_EV_CONN_SPKT, qc->conn, pkt, &pkt->len);
+ goto err;
+ }
}
- if (!qc_try_rm_hp(pkt, buf, beg, end, qc, conn_ctx))
+ if (!qc_try_rm_hp(pkt, buf, beg, end, qc, &qel, conn_ctx)) {
+ HA_RWLOCK_WRUNLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
+ TRACE_PROTO("Packet dropped", QUIC_EV_CONN_SPKT, qc->conn);
goto err;
+ }
+ HA_RWLOCK_WRUNLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
+ if (pkt->aad_len)
+ qc_pkt_insert(pkt, qel);
/* Wake the tasklet of the QUIC connection packet handler. */
if (conn_ctx)
tasklet_wakeup(conn_ctx->wait_event.tasklet);
@@ -3410,6 +3460,8 @@
struct listener *l;
struct ssl_sock_ctx *conn_ctx;
int long_header = 0;
+ size_t b_cspace;
+ struct quic_enc_level *qel;
qc = NULL;
conn_ctx = NULL;
@@ -3600,6 +3652,8 @@
pkt->len = end - *buf;
}
+ pkt->qc = qc;
+
/* Store the DCID used for this packet to check the packet which
* come in this UDP datagram match with it.
*/
@@ -3609,25 +3663,35 @@
}
/* Increase the total length of this packet by the header length. */
- pkt->len += *buf - beg;
+ pkt->raw_len = pkt->len += *buf - beg;
/* Do not check the DCID node before the length. */
if (dgram_ctx->dcid_node != node) {
TRACE_PROTO("Packet dropped", QUIC_EV_CONN_LPKT, qc->conn);
goto err;
}
- if (pkt->len > sizeof pkt->data) {
- TRACE_PROTO("Too big packet", QUIC_EV_CONN_LPKT, qc->conn, pkt, &pkt->len);
- goto err;
+ HA_RWLOCK_WRLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
+ b_cspace = b_contig_space(&qc->rx.buf);
+ if (b_cspace < pkt->len) {
+ /* Let us consume the remaining contiguous space. */
+ b_add(&qc->rx.buf, b_cspace);
+ if (b_contig_space(&qc->rx.buf) < pkt->len) {
+ HA_RWLOCK_WRUNLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
+ TRACE_PROTO("Too big packet", QUIC_EV_CONN_LPKT, qc->conn, pkt, &pkt->len);
+ goto err;
+ }
}
- if (!qc_try_rm_hp(pkt, buf, beg, end, qc, conn_ctx)) {
+ if (!qc_try_rm_hp(pkt, buf, beg, end, qc, &qel, conn_ctx)) {
+ HA_RWLOCK_WRUNLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
TRACE_PROTO("Packet dropped", QUIC_EV_CONN_LPKT, qc->conn);
goto err;
}
-
+ HA_RWLOCK_WRUNLOCK(QUIC_LOCK, &qc->rx.buf_rwlock);
TRACE_PROTO("New packet", QUIC_EV_CONN_LPKT, qc->conn, pkt);
+ if (pkt->aad_len)
+ qc_pkt_insert(pkt, qel);
/* Wake up the connection packet handler task from here only if all
* the contexts have been initialized, especially the mux context
* conn_ctx->conn->ctx. Note that this is ->start xprt callback which
@@ -4671,12 +4735,11 @@
BIO_meth_free(ha_quic_meth);
}
-/* Read all the QUIC packets found in <buf> with <len> as length (typically a UDP
- * datagram), <ctx> being the QUIC I/O handler context, from QUIC connections,
- * calling <func> function;
+/* Read all the QUIC packets found in <buf> from QUIC connection with <owner>
+ * as owner calling <func> function.
* Return the number of bytes read if succeeded, -1 if not.
*/
-static ssize_t quic_dgram_read(char *buf, size_t len, void *owner,
+static ssize_t quic_dgram_read(struct buffer *buf, size_t len, void *owner,
struct sockaddr_storage *saddr, qpkt_read_func *func)
{
unsigned char *pos;
@@ -4686,9 +4749,8 @@
.owner = owner,
};
- pos = (unsigned char *)buf;
+ pos = (unsigned char *)b_head(buf);
end = pos + len;
-
do {
int ret;
struct quic_rx_packet *pkt;
@@ -4720,7 +4782,7 @@
return -1;
}
-ssize_t quic_lstnr_dgram_read(char *buf, size_t len, void *owner,
+ssize_t quic_lstnr_dgram_read(struct buffer *buf, size_t len, void *owner,
struct sockaddr_storage *saddr)
{
return quic_dgram_read(buf, len, owner, saddr, qc_lstnr_pkt_rcv);