MEDIUM: mux-quic/h3/qpack: use ncbuf for uni streams
This commit is the equivalent for uni-streams of previous commit
MEDIUM: mux-quic/h3/hq-interop: use ncbuf for bidir streams
All unidirectional streams data is now handle in MUX Rx ncbuf. The
obsolete buffer is not unused and will be cleared in the following
patches.
diff --git a/src/h3.c b/src/h3.c
index 488fccf..9919c7a 100644
--- a/src/h3.c
+++ b/src/h3.c
@@ -334,12 +334,12 @@
* <rxbuf> buffer. This function does not update this buffer.
* Returns 0 if something wrong happened, 1 if not.
*/
-static int h3_parse_settings_frm(struct h3 *h3, const struct buffer *rxbuf, size_t flen)
+static int h3_parse_settings_frm(struct h3 *h3, const struct ncbuf *rxbuf, size_t flen)
{
uint64_t id, value;
const unsigned char *buf, *end;
- buf = (const unsigned char *)b_head(rxbuf);
+ buf = (const unsigned char *)ncb_head(rxbuf);
end = buf + flen;
while (buf <= end) {
@@ -376,20 +376,20 @@
*/
static int h3_control_recv(struct h3_uqs *h3_uqs, void *ctx)
{
- struct buffer *rxbuf = &h3_uqs->qcs->rx.buf;
+ struct ncbuf *rxbuf = &h3_uqs->qcs->rx.ncbuf;
struct h3 *h3 = ctx;
h3_debug_printf(stderr, "%s STREAM ID: %lu\n", __func__, h3_uqs->qcs->id);
- if (!b_data(rxbuf))
+ if (!ncb_data(rxbuf, 0))
return 1;
- while (b_data(rxbuf)) {
+ while (ncb_data(rxbuf, 0)) {
size_t hlen;
uint64_t ftype, flen;
struct buffer b;
/* Work on a copy of <rxbuf> */
- b = b_make(rxbuf->area, rxbuf->size, rxbuf->head, rxbuf->data);
+ b = h3_b_dup(rxbuf);
hlen = h3_decode_frm_header(&ftype, &flen, &b);
if (!hlen)
break;
@@ -399,7 +399,8 @@
if (flen > b_data(&b))
break;
- b_del(rxbuf, hlen);
+ ncb_advance(rxbuf, hlen);
+ h3_uqs->qcs->rx.offset += hlen;
/* From here, a frame must not be truncated */
switch (ftype) {
case H3_FT_CANCEL_PUSH:
@@ -423,14 +424,15 @@
h3->err = H3_FRAME_UNEXPECTED;
return 0;
}
- b_del(rxbuf, flen);
+ ncb_advance(rxbuf, flen);
+ h3_uqs->qcs->rx.offset += flen;
}
/* Handle the case where remaining data are present in the buffer. This
* can happen if there is an incomplete frame. In this case, subscribe
* on the lower layer to restart receive operation.
*/
- if (b_data(rxbuf))
+ if (ncb_data(rxbuf, 0))
qcs_subscribe(h3_uqs->qcs, SUB_RETRY_RECV, &h3_uqs->wait_event);
return 1;
@@ -773,12 +775,19 @@
{
uint64_t strm_type;
struct h3 *h3 = ctx;
- struct buffer *rxbuf = &qcs->rx.buf;
+ struct ncbuf *rxbuf = &qcs->rx.ncbuf;
+ struct buffer b;
+ size_t len = 0;
+
+ b = h3_b_dup(rxbuf);
/* First octets: the uni-stream type */
- if (!b_quic_dec_int(&strm_type, rxbuf, NULL) || strm_type > H3_UNI_STRM_TP_MAX)
+ if (!b_quic_dec_int(&strm_type, &b, &len) || strm_type > H3_UNI_STRM_TP_MAX)
return 0;
+ ncb_advance(rxbuf, len);
+ qcs->rx.offset += len;
+
/* Note that for all the uni-streams below, this is an error to receive two times the
* same type of uni-stream (even for Push stream which is not supported at this time.
*/
diff --git a/src/qpack-dec.c b/src/qpack-dec.c
index 166e1ae..30ab6a7 100644
--- a/src/qpack-dec.c
+++ b/src/qpack-dec.c
@@ -27,6 +27,7 @@
#include <haproxy/buf.h>
#include <haproxy/chunk.h>
#include <haproxy/h3.h>
+#include <haproxy/ncbuf.h>
#include <haproxy/qpack-t.h>
#include <haproxy/qpack-dec.h>
#include <haproxy/qpack-tbl.h>
@@ -96,19 +97,19 @@
int qpack_decode_enc(struct h3_uqs *h3_uqs, void *ctx)
{
size_t len;
- struct buffer *rxbuf;
+ struct ncbuf *rxbuf;
unsigned char inst;
- rxbuf = &h3_uqs->qcs->rx.buf;
- len = b_data(rxbuf);
- qpack_debug_hexdump(stderr, "[QPACK-DEC-ENC] ", b_head(rxbuf), 0, len);
+ rxbuf = &h3_uqs->qcs->rx.ncbuf;
+ len = ncb_data(rxbuf, 0);
+ qpack_debug_hexdump(stderr, "[QPACK-DEC-ENC] ", ncb_head(rxbuf), 0, len);
if (!len) {
qpack_debug_printf(stderr, "[QPACK-DEC-ENC] empty stream\n");
return 0;
}
- inst = (unsigned char)*b_head(rxbuf) & QPACK_ENC_INST_BITMASK;
+ inst = (unsigned char)*ncb_head(rxbuf) & QPACK_ENC_INST_BITMASK;
if (inst == QPACK_ENC_INST_DUP) {
/* Duplicate */
}
@@ -129,19 +130,19 @@
int qpack_decode_dec(struct h3_uqs *h3_uqs, void *ctx)
{
size_t len;
- struct buffer *rxbuf;
+ struct ncbuf *rxbuf;
unsigned char inst;
- rxbuf = &h3_uqs->qcs->rx.buf;
- len = b_data(rxbuf);
- qpack_debug_hexdump(stderr, "[QPACK-DEC-DEC] ", b_head(rxbuf), 0, len);
+ rxbuf = &h3_uqs->qcs->rx.ncbuf;
+ len = ncb_data(rxbuf, 0);
+ qpack_debug_hexdump(stderr, "[QPACK-DEC-DEC] ", ncb_head(rxbuf), 0, len);
if (!len) {
qpack_debug_printf(stderr, "[QPACK-DEC-DEC] empty stream\n");
return 0;
}
- inst = (unsigned char)*b_head(rxbuf) & QPACK_DEC_INST_BITMASK;
+ inst = (unsigned char)*ncb_head(rxbuf) & QPACK_DEC_INST_BITMASK;
if (inst == QPACK_DEC_INST_ICINC) {
/* Insert count increment */
}
diff --git a/src/xprt_quic.c b/src/xprt_quic.c
index 4f3bcf7..099ce67 100644
--- a/src/xprt_quic.c
+++ b/src/xprt_quic.c
@@ -38,6 +38,7 @@
#include <haproxy/hq_interop.h>
#include <haproxy/log.h>
#include <haproxy/mux_quic.h>
+#include <haproxy/ncbuf.h>
#include <haproxy/pipe.h>
#include <haproxy/proxy.h>
#include <haproxy/quic_cc.h>
@@ -2170,20 +2171,6 @@
return frm;
}
-/* Copy as most as possible STREAM data from <strm_frm> into <strm> stream.
- * Also update <strm_frm> frame to reflect the data which have been consumed.
- */
-static size_t qc_strm_cpy(struct buffer *buf, struct quic_stream *strm_frm)
-{
- size_t ret;
-
- ret = b_putblk(buf, (char *)strm_frm->data, strm_frm->len);
- strm_frm->len -= ret;
- strm_frm->offset.key += ret;
-
- return ret;
-}
-
/* Handle <strm_frm> bidirectional STREAM frame. Depending on its ID, several
* streams may be open. The data are copied to the stream RX buffer if possible.
* If not, the STREAM frame is stored to be treated again later.
@@ -2221,8 +2208,7 @@
struct quic_conn *qc)
{
struct qcs *strm;
- struct quic_rx_strm_frm *frm;
- size_t strm_frm_len;
+ enum ncb_ret ret;
strm = qcc_get_qcs(qc->qcc, strm_frm->id);
if (!strm) {
@@ -2246,46 +2232,22 @@
strm_frm->data += diff;
}
- strm_frm_len = strm_frm->len;
- if (strm_frm->offset.key == strm->rx.offset) {
- int ret;
-
- if (!qc_get_buf(strm, &strm->rx.buf))
- goto store_frm;
-
- /* qc_strm_cpy() will modify the offset, depending on the number
- * of bytes copied.
- */
- ret = qc_strm_cpy(&strm->rx.buf, strm_frm);
- /* Inform the application of the arrival of this new stream */
- if (!strm->rx.offset && !qc->qcc->app_ops->attach_ruqs(strm, qc->qcc->ctx)) {
- TRACE_PROTO("Could not set an uni-stream", QUIC_EV_CONN_PSTRM, qc);
- return 0;
- }
-
- if (ret)
- qcs_notify_recv(strm);
+ qc_get_ncbuf(strm, &strm->rx.ncbuf);
+ if (ncb_is_null(&strm->rx.ncbuf))
+ return 0;
- strm_frm->offset.key += ret;
- }
- /* Take this frame into an account for the stream flow control */
- strm->rx.offset += strm_frm_len;
- /* It all the data were provided to the application, there is no need to
- * store any more information for it.
- */
- if (!strm_frm->len)
- goto out;
+ ret = ncb_add(&strm->rx.ncbuf, strm_frm->offset.key - strm->rx.offset,
+ (char *)strm_frm->data, strm_frm->len, NCB_ADD_COMPARE);
+ if (ret != NCB_RET_OK)
+ return 0;
- store_frm:
- frm = new_quic_rx_strm_frm(strm_frm, pkt);
- if (!frm) {
- TRACE_PROTO("Could not alloc RX STREAM frame",
- QUIC_EV_CONN_PSTRM, qc);
+ /* Inform the application of the arrival of this new stream */
+ if (!strm->rx.offset && !qc->qcc->app_ops->attach_ruqs(strm, qc->qcc->ctx)) {
+ TRACE_PROTO("Could not set an uni-stream", QUIC_EV_CONN_PSTRM, qc);
return 0;
}
- eb64_insert(&strm->rx.frms, &frm->offset_node);
- quic_rx_packet_refinc(pkt);
+ qcs_notify_recv(strm);
out:
return 1;