MINOR: mux-quic: implement subscribe on stream
Implement the subscription in the mux on the qcs instance.
Subscribe is now used by the h3 layer when receiving an incomplete frame
on the H3 control stream. It is also used when attaching the remote
uni-directional streams on the h3 layer.
In the qc_send, the mux wakes up the qcs for each new transfer executed.
This is done via the method qcs_notify_send().
The xprt wakes up the qcs when receiving data on unidirectional streams.
This is done via the method qcs_notify_recv().
diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h
index aa8572c..9e7ef17 100644
--- a/include/haproxy/mux_quic.h
+++ b/include/haproxy/mux_quic.h
@@ -15,6 +15,10 @@
struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr);
+int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
+void qcs_notify_recv(struct qcs *qcs);
+void qcs_notify_send(struct qcs *qcs);
+
/* Bit shift to get the stream sub ID for internal use which is obtained
* shifting the stream IDs by this value, knowing that the
* QCS_ID_TYPE_SHIFT less significant bits identify the stream ID
diff --git a/src/h3.c b/src/h3.c
index ecaa02d..359b276 100644
--- a/src/h3.c
+++ b/src/h3.c
@@ -322,9 +322,12 @@
b_del(rxbuf, flen);
}
- /* TODO handle the case when the buffer is not empty. This can happens
- * if there is an incomplete frame.
+ /* Handle the case where remaining data are present in the buffer. This
+ * can happen if there is an incomplete frame. In this case, subscribe
+ * on the lower layer to restart receive operation.
*/
+ if (b_data(rxbuf))
+ qcs_subscribe(h3_uqs->qcs, SUB_RETRY_RECV, &h3_uqs->wait_event);
return 1;
}
@@ -658,7 +661,7 @@
h3->rctrl.qcs = qcs;
h3->rctrl.cb = h3_control_recv;
- // TODO wake-up rctrl tasklet on reception
+ qcs_subscribe(qcs, SUB_RETRY_RECV, &h3->rctrl.wait_event);
break;
case H3_UNI_STRM_TP_PUSH_STREAM:
/* NOT SUPPORTED */
@@ -671,7 +674,7 @@
h3->rqpack_enc.qcs = qcs;
h3->rqpack_enc.cb = qpack_decode_enc;
- // TODO wake-up rqpack_enc tasklet on reception
+ qcs_subscribe(qcs, SUB_RETRY_RECV, &h3->rqpack_enc.wait_event);
break;
case H3_UNI_STRM_TP_QPACK_DECODER:
if (h3->rqpack_dec.qcs) {
@@ -681,7 +684,7 @@
h3->rqpack_dec.qcs = qcs;
h3->rqpack_dec.cb = qpack_decode_dec;
- // TODO wake-up rqpack_dec tasklet on reception
+ qcs_subscribe(qcs, SUB_RETRY_RECV, &h3->rqpack_dec.wait_event);
break;
default:
/* Error */
diff --git a/src/mux_quic.c b/src/mux_quic.c
index df58686..3b194f6 100644
--- a/src/mux_quic.c
+++ b/src/mux_quic.c
@@ -84,6 +84,39 @@
return buf;
}
+int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es)
+{
+ fprintf(stderr, "%s\n", __func__);
+
+ BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
+ BUG_ON(qcs->subs && qcs->subs != es);
+
+ es->events |= event_type;
+ qcs->subs = es;
+
+ return 0;
+}
+
+void qcs_notify_recv(struct qcs *qcs)
+{
+ if (qcs->subs && qcs->subs->events & SUB_RETRY_RECV) {
+ tasklet_wakeup(qcs->subs->tasklet);
+ qcs->subs->events &= ~SUB_RETRY_RECV;
+ if (!qcs->subs->events)
+ qcs->subs = NULL;
+ }
+}
+
+void qcs_notify_send(struct qcs *qcs)
+{
+ if (qcs->subs && qcs->subs->events & SUB_RETRY_SEND) {
+ tasklet_wakeup(qcs->subs->tasklet);
+ qcs->subs->events &= ~SUB_RETRY_SEND;
+ if (!qcs->subs->events)
+ qcs->subs = NULL;
+ }
+}
+
static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset)
{
struct quic_frame *frm;
@@ -157,6 +190,9 @@
if (ret < 0)
ABORT_NOW();
+ if (ret > 0)
+ qcs_notify_send(qcs);
+
/* TODO wake-up xprt if data were transfered */
fprintf(stderr, "%s ret=%d\n", __func__, ret);
@@ -323,8 +359,7 @@
static int qc_subscribe(struct conn_stream *cs, int event_type,
struct wait_event *es)
{
- /* XXX TODO XXX */
- return 0;
+ return qcs_subscribe(cs->ctx, event_type, es);
}
/* Called from the upper layer, to unsubscribe <es> from events <event_type>.
diff --git a/src/xprt_quic.c b/src/xprt_quic.c
index 96d409f..d70be2c 100644
--- a/src/xprt_quic.c
+++ b/src/xprt_quic.c
@@ -2149,6 +2149,9 @@
return 0;
}
+ if (ret)
+ qcs_notify_recv(strm);
+
strm_frm->offset.key += ret;
}
/* Take this frame into an account for the stream flow control */