blob: d36bc62fd3b76b5572e1c357b18ec118ac5aa53a [file] [log] [blame]
Amaury Denoyelledeed7772021-12-03 11:36:46 +01001#include <haproxy/mux_quic.h>
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002
Amaury Denoyelleeb01f592021-10-07 16:44:05 +02003#include <import/eb64tree.h>
4
Frédéric Lécailledfbae762021-02-18 09:59:01 +01005#include <haproxy/api.h>
Frédéric Lécailledfbae762021-02-18 09:59:01 +01006#include <haproxy/connection.h>
Amaury Denoyelledeed7772021-12-03 11:36:46 +01007#include <haproxy/dynbuf.h>
Frédéric Lécaille9969adb2023-01-18 11:52:21 +01008#include <haproxy/h3.h>
Amaury Denoyelle1b2dba52022-04-15 17:32:04 +02009#include <haproxy/list.h>
Amaury Denoyelle1290f1e2022-05-13 14:49:05 +020010#include <haproxy/ncbuf.h>
Amaury Denoyelledeed7772021-12-03 11:36:46 +010011#include <haproxy/pool.h>
Frédéric Lécaille9969adb2023-01-18 11:52:21 +010012#include <haproxy/proxy.h>
Amaury Denoyelled80fbca2022-09-19 17:02:28 +020013#include <haproxy/qmux_http.h>
Amaury Denoyelle36d50bf2022-09-19 16:12:38 +020014#include <haproxy/qmux_trace.h>
Amaury Denoyelle92fa63f2022-09-30 18:11:13 +020015#include <haproxy/quic_conn.h>
Amaury Denoyelle40c24f12023-01-27 17:47:49 +010016#include <haproxy/quic_frame.h>
Amaury Denoyelleb3aa07c2023-01-24 18:20:28 +010017#include <haproxy/quic_sock.h>
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +020018#include <haproxy/quic_stream.h>
Frédéric Lécaille748ece62022-05-21 23:58:40 +020019#include <haproxy/quic_tp-t.h>
Amaury Denoyelleeb01f592021-10-07 16:44:05 +020020#include <haproxy/ssl_sock-t.h>
Willy Tarreaucb086c62022-05-27 09:47:12 +020021#include <haproxy/stconn.h>
Amaury Denoyelle1a2faef2023-05-15 15:17:28 +020022#include <haproxy/time.h>
Amaury Denoyelledd4fbfb2022-03-24 16:09:16 +010023#include <haproxy/trace.h>
Frédéric Lécailledfbae762021-02-18 09:59:01 +010024
Amaury Denoyelledeed7772021-12-03 11:36:46 +010025DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc));
Frédéric Lécailledfbae762021-02-18 09:59:01 +010026DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
27
/* Release the Rx <ncbuf> attached to <qcs> and return its storage to the
 * buffer pool. No-op if the ncbuf is already null.
 */
static void qcs_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
{
	struct buffer buf;

	if (ncb_is_null(ncbuf))
		return;

	/* Rebuild a regular struct buffer view over the ncbuf storage so it
	 * can go through the standard buffer release path.
	 */
	buf = b_make(ncbuf->area, ncbuf->size, 0, 0);
	b_free(&buf);
	offer_buffers(NULL, 1);

	*ncbuf = NCBUF_NULL;

	/* Reset DEM_FULL as buffer is released. This ensures mux is not woken
	 * up from rcv_buf stream callback when demux was previously blocked.
	 */
	qcs->flags &= ~QC_SF_DEM_FULL;
}
46
/* Free <qcs> instance. This function is reserved for internal usage : it must
 * only be called on qcs alloc error or on connection shutdown. Else
 * qcs_destroy must be preferred to handle QUIC flow-control increase.
 */
static void qcs_free(struct qcs *qcs)
{
	struct qcc *qcc = qcs->qcc;

	TRACE_ENTER(QMUX_EV_QCS_END, qcc->conn, qcs);

	/* Safe to use even if already removed from the list. */
	LIST_DEL_INIT(&qcs->el_opening);
	LIST_DEL_INIT(&qcs->el_send);

	/* Release stream endpoint descriptor. It must still be orphaned :
	 * no stconn may remain attached at this point.
	 */
	BUG_ON(qcs->sd && !se_fl_test(qcs->sd, SE_FL_ORPHAN));
	sedesc_free(qcs->sd);

	/* Release app-layer context. */
	if (qcs->ctx && qcc->app_ops->detach)
		qcc->app_ops->detach(qcs);

	/* Release qc_stream_desc buffer from quic-conn layer. */
	qc_stream_desc_release(qcs->stream, qcs->tx.sent_offset);

	/* Free Rx/Tx buffers. */
	qcs_free_ncbuf(qcs, &qcs->rx.ncbuf);
	b_free(&qcs->tx.buf);

	/* Remove qcs from qcc tree. */
	eb64_delete(&qcs->by_id);

	pool_free(pool_head_qcs, qcs);

	TRACE_LEAVE(QMUX_EV_QCS_END, qcc->conn);
}
83
/* Allocate a new QUIC stream with id <id> and type <type> attached to <qcc>.
 * The stream is inserted in the connection tree, its flow-control limits are
 * initialized from the transport parameters and the app-layer attach callback
 * is invoked.
 *
 * Returns the allocated stream instance or NULL on error.
 */
static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
{
	struct qcs *qcs;

	TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn);

	qcs = pool_alloc(pool_head_qcs);
	if (!qcs) {
		TRACE_ERROR("alloc failure", QMUX_EV_QCS_NEW, qcc->conn);
		return NULL;
	}

	qcs->stream = NULL;
	qcs->qcc = qcc;
	qcs->flags = QC_SF_NONE;
	qcs->st = QC_SS_IDLE;
	qcs->ctx = NULL;

	/* Stream endpoint descriptor, orphaned until a stconn is attached. */
	qcs->sd = sedesc_new();
	if (!qcs->sd)
		goto err;
	qcs->sd->se = qcs;
	qcs->sd->conn = qcc->conn;
	se_fl_set(qcs->sd, SE_FL_T_MUX | SE_FL_ORPHAN | SE_FL_NOT_FIRST);
	se_expect_no_data(qcs->sd);

	/* App callback attach may register the stream for http-request wait.
	 * These fields must be initialized before.
	 */
	LIST_INIT(&qcs->el_opening);
	LIST_INIT(&qcs->el_send);
	qcs->start = TICK_ETERNITY;

	/* store transport layer stream descriptor in qcc tree */
	qcs->id = qcs->by_id.key = id;
	eb64_insert(&qcc->streams_by_id, &qcs->by_id);

	/* Tx flow-control limit.
	 * If stream is local, use peer remote-limit, or else the opposite.
	 * Note: a remote uni stream has no Tx direction, msd stays unset.
	 */
	if (quic_stream_is_bidi(id)) {
		qcs->tx.msd = quic_stream_is_local(qcc, id) ? qcc->rfctl.msd_bidi_r :
		                                              qcc->rfctl.msd_bidi_l;
	}
	else if (quic_stream_is_local(qcc, id)) {
		qcs->tx.msd = qcc->rfctl.msd_uni_l;
	}

	/* Properly set flow-control blocking if initial MSD is nul. */
	if (!qcs->tx.msd)
		qcs->flags |= QC_SF_BLK_SFCTL;

	qcs->rx.ncbuf = NCBUF_NULL;
	qcs->rx.app_buf = BUF_NULL;
	qcs->rx.offset = qcs->rx.offset_max = 0;

	/* Rx flow-control limit, mirrored from the local transport params. */
	if (quic_stream_is_bidi(id)) {
		qcs->rx.msd = quic_stream_is_local(qcc, id) ? qcc->lfctl.msd_bidi_l :
		                                              qcc->lfctl.msd_bidi_r;
	}
	else if (quic_stream_is_remote(qcc, id)) {
		qcs->rx.msd = qcc->lfctl.msd_uni_r;
	}
	qcs->rx.msd_init = qcs->rx.msd;

	qcs->tx.buf = BUF_NULL;
	qcs->tx.offset = 0;
	qcs->tx.sent_offset = 0;

	qcs->wait_event.tasklet = NULL;
	qcs->wait_event.events = 0;
	qcs->subs = NULL;

	qcs->err = 0;

	/* Allocate transport layer stream descriptor. Only needed for TX. */
	if (!quic_stream_is_uni(id) || !quic_stream_is_remote(qcc, id)) {
		struct quic_conn *qc = qcc->conn->handle.qc;
		qcs->stream = qc_stream_desc_new(id, type, qcs, qc);
		if (!qcs->stream) {
			TRACE_ERROR("qc_stream_desc alloc failure", QMUX_EV_QCS_NEW, qcc->conn, qcs);
			goto err;
		}
	}

	if (qcc->app_ops->attach && qcc->app_ops->attach(qcs, qcc->ctx)) {
		TRACE_ERROR("app proto failure", QMUX_EV_QCS_NEW, qcc->conn, qcs);
		goto err;
	}

 out:
	TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn, qcs);
	return qcs;

 err:
	/* qcs_free() handles a partially initialized instance. */
	qcs_free(qcs);
	TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
	return NULL;
}
182
Amaury Denoyelle3abeb572022-07-04 11:42:27 +0200183static forceinline struct stconn *qcs_sc(const struct qcs *qcs)
184{
185 return qcs->sd ? qcs->sd->sc : NULL;
186}
187
/* Reset the <qcc> inactivity timeout for http-keep-alive timeout. */
static forceinline void qcc_reset_idle_start(struct qcc *qcc)
{
	/* Record the current date as the start of the idle period. */
	qcc->idle_start = now_ms;
}
193
Amaury Denoyellec603de42022-07-25 11:21:46 +0200194/* Decrement <qcc> sc. */
195static forceinline void qcc_rm_sc(struct qcc *qcc)
196{
Amaury Denoyelle8d6d2462023-05-11 16:55:30 +0200197 BUG_ON(!qcc->nb_sc); /* Ensure sc count is always valid (ie >=0). */
Amaury Denoyellec603de42022-07-25 11:21:46 +0200198 --qcc->nb_sc;
Amaury Denoyellebd6ec1b2022-07-25 11:53:18 +0200199
200 /* Reset qcc idle start for http-keep-alive timeout. Timeout will be
201 * refreshed after this on stream detach.
202 */
203 if (!qcc->nb_sc && !qcc->nb_hreq)
204 qcc_reset_idle_start(qcc);
Amaury Denoyellec603de42022-07-25 11:21:46 +0200205}
206
207/* Decrement <qcc> hreq. */
208static forceinline void qcc_rm_hreq(struct qcc *qcc)
209{
Amaury Denoyelle8d6d2462023-05-11 16:55:30 +0200210 BUG_ON(!qcc->nb_hreq); /* Ensure http req count is always valid (ie >=0). */
Amaury Denoyellec603de42022-07-25 11:21:46 +0200211 --qcc->nb_hreq;
Amaury Denoyellebd6ec1b2022-07-25 11:53:18 +0200212
213 /* Reset qcc idle start for http-keep-alive timeout. Timeout will be
214 * refreshed after this on I/O handler.
215 */
216 if (!qcc->nb_sc && !qcc->nb_hreq)
217 qcc_reset_idle_start(qcc);
Amaury Denoyellec603de42022-07-25 11:21:46 +0200218}
219
Amaury Denoyelle418ba212022-08-02 15:57:16 +0200220static inline int qcc_is_dead(const struct qcc *qcc)
221{
Amaury Denoyelle51f116d2023-05-04 15:49:02 +0200222 /* Maintain connection if stream endpoints are still active. */
223 if (qcc->nb_sc)
224 return 0;
225
226 /* Connection considered dead if either :
227 * - remote error detected at tranport level
228 * - error detected locally
Amaury Denoyelleca7a0632023-10-26 18:17:29 +0200229 * - MUX timeout expired
Amaury Denoyelle418ba212022-08-02 15:57:16 +0200230 */
Amaury Denoyelle5f67b172023-05-04 18:52:42 +0200231 if (qcc->flags & (QC_CF_ERR_CONN|QC_CF_ERRL_DONE) ||
Amaury Denoyelle51f116d2023-05-04 15:49:02 +0200232 !qcc->task) {
Amaury Denoyelle418ba212022-08-02 15:57:16 +0200233 return 1;
Amaury Denoyelle51f116d2023-05-04 15:49:02 +0200234 }
Amaury Denoyelle418ba212022-08-02 15:57:16 +0200235
236 return 0;
237}
238
239/* Return true if the mux timeout should be armed. */
240static inline int qcc_may_expire(struct qcc *qcc)
241{
242 return !qcc->nb_sc;
243}
244
245/* Refresh the timeout on <qcc> if needed depending on its state. */
246static void qcc_refresh_timeout(struct qcc *qcc)
247{
248 const struct proxy *px = qcc->proxy;
249
250 TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
251
Amaury Denoyellef0b67f92022-08-10 16:14:32 +0200252 if (!qcc->task) {
253 TRACE_DEVEL("already expired", QMUX_EV_QCC_WAKE, qcc->conn);
Amaury Denoyelle418ba212022-08-02 15:57:16 +0200254 goto leave;
Amaury Denoyellef0b67f92022-08-10 16:14:32 +0200255 }
Amaury Denoyelle418ba212022-08-02 15:57:16 +0200256
Amaury Denoyelle6ec98372022-08-01 17:59:38 +0200257 /* Check if upper layer is responsible of timeout management. */
258 if (!qcc_may_expire(qcc)) {
259 TRACE_DEVEL("not eligible for timeout", QMUX_EV_QCC_WAKE, qcc->conn);
260 qcc->task->expire = TICK_ETERNITY;
261 task_queue(qcc->task);
262 goto leave;
263 }
264
Amaury Denoyelle6ec98372022-08-01 17:59:38 +0200265 /* Frontend timeout management
Amaury Denoyelleeb7d3202023-02-08 15:55:24 +0100266 * - shutdown done -> timeout client-fin
Amaury Denoyelle6ec98372022-08-01 17:59:38 +0200267 * - detached streams with data left to send -> default timeout
Amaury Denoyelle30e260e2022-08-03 11:17:57 +0200268 * - stream waiting on incomplete request or no stream yet activated -> timeout http-request
Amaury Denoyelle6ec98372022-08-01 17:59:38 +0200269 * - idle after stream processing -> timeout http-keep-alive
Amaury Denoyelleb3aa07c2023-01-24 18:20:28 +0100270 *
271 * If proxy stop-stop in progress, immediate or spread close will be
272 * processed if shutdown already one or connection is idle.
Amaury Denoyelle6ec98372022-08-01 17:59:38 +0200273 */
274 if (!conn_is_back(qcc->conn)) {
Amaury Denoyelleeb7d3202023-02-08 15:55:24 +0100275 if (qcc->nb_hreq && !(qcc->flags & QC_CF_APP_SHUT)) {
Amaury Denoyelle418ba212022-08-02 15:57:16 +0200276 TRACE_DEVEL("one or more requests still in progress", QMUX_EV_QCC_WAKE, qcc->conn);
277 qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout);
Amaury Denoyelle6ec98372022-08-01 17:59:38 +0200278 task_queue(qcc->task);
279 goto leave;
Amaury Denoyelle418ba212022-08-02 15:57:16 +0200280 }
Amaury Denoyelle418ba212022-08-02 15:57:16 +0200281
Amaury Denoyelleeb7d3202023-02-08 15:55:24 +0100282 if ((!LIST_ISEMPTY(&qcc->opening_list) || unlikely(!qcc->largest_bidi_r)) &&
283 !(qcc->flags & QC_CF_APP_SHUT)) {
Amaury Denoyelle30e260e2022-08-03 11:17:57 +0200284 int timeout = px->timeout.httpreq;
285 struct qcs *qcs = NULL;
286 int base_time;
Amaury Denoyelle418ba212022-08-02 15:57:16 +0200287
Amaury Denoyelle30e260e2022-08-03 11:17:57 +0200288 /* Use start time of first stream waiting on HTTP or
289 * qcc idle if no stream not yet used.
290 */
291 if (likely(!LIST_ISEMPTY(&qcc->opening_list)))
292 qcs = LIST_ELEM(qcc->opening_list.n, struct qcs *, el_opening);
293 base_time = qcs ? qcs->start : qcc->idle_start;
294
295 TRACE_DEVEL("waiting on http request", QMUX_EV_QCC_WAKE, qcc->conn, qcs);
296 qcc->task->expire = tick_add_ifset(base_time, timeout);
297 }
298 else {
Amaury Denoyelleeb7d3202023-02-08 15:55:24 +0100299 if (qcc->flags & QC_CF_APP_SHUT) {
300 TRACE_DEVEL("connection in closing", QMUX_EV_QCC_WAKE, qcc->conn);
301 qcc->task->expire = tick_add_ifset(now_ms,
302 qcc->shut_timeout);
303 }
304 else {
305 /* Use http-request timeout if keep-alive timeout not set */
306 int timeout = tick_isset(px->timeout.httpka) ?
307 px->timeout.httpka : px->timeout.httpreq;
308 TRACE_DEVEL("at least one request achieved but none currently in progress", QMUX_EV_QCC_WAKE, qcc->conn);
309 qcc->task->expire = tick_add_ifset(qcc->idle_start, timeout);
310 }
Amaury Denoyelleb3aa07c2023-01-24 18:20:28 +0100311
312 /* If proxy soft-stop in progress and connection is
313 * inactive, close the connection immediately. If a
314 * close-spread-time is configured, randomly spread the
315 * timer over a closing window.
316 */
317 if ((qcc->proxy->flags & (PR_FL_DISABLED|PR_FL_STOPPED)) &&
318 !(global.tune.options & GTUNE_DISABLE_ACTIVE_CLOSE)) {
319
320 /* Wake timeout task immediately if window already expired. */
321 int remaining_window = tick_isset(global.close_spread_end) ?
322 tick_remain(now_ms, global.close_spread_end) : 0;
323
324 TRACE_DEVEL("proxy disabled, prepare connection soft-stop", QMUX_EV_QCC_WAKE, qcc->conn);
325 if (remaining_window) {
326 /* We don't need to reset the expire if it would
327 * already happen before the close window end.
328 */
329 if (!tick_isset(qcc->task->expire) ||
330 tick_is_le(global.close_spread_end, qcc->task->expire)) {
331 /* Set an expire value shorter than the current value
332 * because the close spread window end comes earlier.
333 */
334 qcc->task->expire = tick_add(now_ms,
335 statistical_prng_range(remaining_window));
336 }
337 }
338 else {
339 /* We are past the soft close window end, wake the timeout
340 * task up immediately.
341 */
342 qcc->task->expire = now_ms;
343 task_wakeup(qcc->task, TASK_WOKEN_TIMER);
344 }
345 }
Amaury Denoyelle30e260e2022-08-03 11:17:57 +0200346 }
Amaury Denoyelle418ba212022-08-02 15:57:16 +0200347 }
Amaury Denoyelle6ec98372022-08-01 17:59:38 +0200348
349 /* fallback to default timeout if frontend specific undefined or for
350 * backend connections.
351 */
352 if (!tick_isset(qcc->task->expire)) {
353 TRACE_DEVEL("fallback to default timeout", QMUX_EV_QCC_WAKE, qcc->conn);
354 qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout);
Amaury Denoyelle418ba212022-08-02 15:57:16 +0200355 }
356
Amaury Denoyelle418ba212022-08-02 15:57:16 +0200357 task_queue(qcc->task);
358
359 leave:
360 TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
361}
362
Amaury Denoyelle38e60062022-07-01 16:48:42 +0200363/* Mark a stream as open if it was idle. This can be used on every
364 * successful emission/reception operation to update the stream state.
365 */
366static void qcs_idle_open(struct qcs *qcs)
367{
368 /* This operation must not be used if the stream is already closed. */
369 BUG_ON_HOT(qcs->st == QC_SS_CLO);
370
371 if (qcs->st == QC_SS_IDLE) {
Amaury Denoyelle047d86a2022-08-10 16:42:35 +0200372 TRACE_STATE("opening stream", QMUX_EV_QCS_NEW, qcs->qcc->conn, qcs);
Amaury Denoyelle38e60062022-07-01 16:48:42 +0200373 qcs->st = QC_SS_OPEN;
Amaury Denoyelle38e60062022-07-01 16:48:42 +0200374 }
375}
376
/* Close the local channel of <qcs> instance. On a bidi stream this moves the
 * state to HLOC, or to CLO if the remote side was already closed. On a local
 * uni stream this closes the stream entirely.
 */
static void qcs_close_local(struct qcs *qcs)
{
	TRACE_STATE("closing stream locally", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);

	/* The stream must have already been opened. */
	BUG_ON_HOT(qcs->st == QC_SS_IDLE);

	/* This operation cannot be used multiple times. */
	BUG_ON_HOT(qcs->st == QC_SS_HLOC || qcs->st == QC_SS_CLO);

	if (quic_stream_is_bidi(qcs->id)) {
		qcs->st = (qcs->st == QC_SS_HREM) ? QC_SS_CLO : QC_SS_HLOC;

		/* A fully answered request no longer counts against the
		 * connection http-request accounting.
		 */
		if (qcs->flags & QC_SF_HREQ_RECV)
			qcc_rm_hreq(qcs->qcc);
	}
	else {
		/* Only local uni streams are valid for this operation. */
		BUG_ON_HOT(quic_stream_is_remote(qcs->qcc, qcs->id));
		qcs->st = QC_SS_CLO;
	}
}
400
/* Close the remote channel of <qcs> instance. On a bidi stream this moves the
 * state to HREM, or to CLO if the local side was already closed. On a remote
 * uni stream this closes the stream entirely.
 */
static void qcs_close_remote(struct qcs *qcs)
{
	TRACE_STATE("closing stream remotely", QMUX_EV_QCS_RECV, qcs->qcc->conn, qcs);

	/* The stream must have already been opened. */
	BUG_ON_HOT(qcs->st == QC_SS_IDLE);

	/* This operation cannot be used multiple times. */
	BUG_ON_HOT(qcs->st == QC_SS_HREM || qcs->st == QC_SS_CLO);

	if (quic_stream_is_bidi(qcs->id)) {
		qcs->st = (qcs->st == QC_SS_HLOC) ? QC_SS_CLO : QC_SS_HREM;
	}
	else {
		/* Only remote uni streams are valid for this operation. */
		BUG_ON_HOT(quic_stream_is_local(qcs->qcc, qcs->id));
		qcs->st = QC_SS_CLO;
	}
}
421
422static int qcs_is_close_local(struct qcs *qcs)
423{
424 return qcs->st == QC_SS_HLOC || qcs->st == QC_SS_CLO;
425}
426
Amaury Denoyelle6eb3c4b2022-12-09 16:26:03 +0100427static int qcs_is_close_remote(struct qcs *qcs)
Amaury Denoyelle38e60062022-07-01 16:48:42 +0200428{
429 return qcs->st == QC_SS_HREM || qcs->st == QC_SS_CLO;
430}
431
/* Allocate if needed buffer <bptr> for stream <qcs>.
 *
 * Returns the buffer instance or NULL on allocation failure.
 */
struct buffer *qcs_get_buf(struct qcs *qcs, struct buffer *bptr)
{
	/* <qcs> is currently unused here; the parameter is kept for API
	 * symmetry with the other per-stream buffer getters.
	 */
	return b_alloc(bptr);
}
440
Amaury Denoyelled00b3092023-05-11 17:00:54 +0200441/* Allocate if needed buffer <ncbuf> for stream <qcs>.
442 *
443 * Returns the buffer instance or NULL on allocation failure.
444 */
Amaury Denoyelled68f8b52023-05-30 15:04:46 +0200445static struct ncbuf *qcs_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
Amaury Denoyelle1290f1e2022-05-13 14:49:05 +0200446{
447 struct buffer buf = BUF_NULL;
448
449 if (ncb_is_null(ncbuf)) {
Amaury Denoyelled00b3092023-05-11 17:00:54 +0200450 if (!b_alloc(&buf))
451 return NULL;
Amaury Denoyelle1290f1e2022-05-13 14:49:05 +0200452
453 *ncbuf = ncb_make(buf.area, buf.size, 0);
454 ncb_init(ncbuf, 0);
455 }
456
457 return ncbuf;
458}
459
Ilya Shipitsin3b64a282022-07-29 22:26:53 +0500460/* Notify an eventual subscriber on <qcs> or else wakeup up the stconn layer if
Amaury Denoyelle4561f842022-07-06 14:54:34 +0200461 * initialized.
462 */
463static void qcs_alert(struct qcs *qcs)
464{
465 if (qcs->subs) {
466 qcs_notify_recv(qcs);
467 qcs_notify_send(qcs);
468 }
469 else if (qcs_sc(qcs) && qcs->sd->sc->app_ops->wake) {
Amaury Denoyelle2d5c3f52023-05-11 13:41:41 +0200470 TRACE_POINT(QMUX_EV_STRM_WAKE, qcs->qcc->conn, qcs);
Amaury Denoyelle4561f842022-07-06 14:54:34 +0200471 qcs->sd->sc->app_ops->wake(qcs->sd->sc);
472 }
473}
474
/* Register <es> as the single subscriber of <qcs> for <event_type> events
 * (a mask of SUB_RETRY_SEND / SUB_RETRY_RECV). A stream supports only one
 * subscriber at a time.
 *
 * Always returns 0.
 */
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es)
{
	struct qcc *qcc = qcs->qcc;

	TRACE_ENTER(QMUX_EV_STRM_SEND|QMUX_EV_STRM_RECV, qcc->conn, qcs);

	BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
	/* Re-subscription must reuse the same wait_event. */
	BUG_ON(qcs->subs && qcs->subs != es);

	es->events |= event_type;
	qcs->subs = es;

	if (event_type & SUB_RETRY_RECV)
		TRACE_DEVEL("subscribe(recv)", QMUX_EV_STRM_RECV, qcc->conn, qcs);

	if (event_type & SUB_RETRY_SEND)
		TRACE_DEVEL("subscribe(send)", QMUX_EV_STRM_SEND, qcc->conn, qcs);

	TRACE_LEAVE(QMUX_EV_STRM_SEND|QMUX_EV_STRM_RECV, qcc->conn, qcs);

	return 0;
}
497
498void qcs_notify_recv(struct qcs *qcs)
499{
500 if (qcs->subs && qcs->subs->events & SUB_RETRY_RECV) {
Amaury Denoyelle2d5c3f52023-05-11 13:41:41 +0200501 TRACE_POINT(QMUX_EV_STRM_WAKE, qcs->qcc->conn, qcs);
Amaury Denoyellea3f222d2021-12-06 11:24:00 +0100502 tasklet_wakeup(qcs->subs->tasklet);
503 qcs->subs->events &= ~SUB_RETRY_RECV;
504 if (!qcs->subs->events)
505 qcs->subs = NULL;
506 }
507}
508
509void qcs_notify_send(struct qcs *qcs)
510{
511 if (qcs->subs && qcs->subs->events & SUB_RETRY_SEND) {
Amaury Denoyelle2d5c3f52023-05-11 13:41:41 +0200512 TRACE_POINT(QMUX_EV_STRM_WAKE, qcs->qcc->conn, qcs);
Amaury Denoyellea3f222d2021-12-06 11:24:00 +0100513 tasklet_wakeup(qcs->subs->tasklet);
514 qcs->subs->events &= ~SUB_RETRY_SEND;
515 if (!qcs->subs->events)
516 qcs->subs = NULL;
517 }
518}
519
/* A fatal error is detected locally for <qcc> connection. It should be closed
 * with a CONNECTION_CLOSE using <err> code. Set <app> to true to indicate that
 * the code must be considered as an application level error. This function
 * must not be called more than once by connection.
 */
void qcc_set_error(struct qcc *qcc, int err, int app)
{
	/* This must not be called multiple times per connection. */
	BUG_ON(qcc->flags & QC_CF_ERRL);

	TRACE_STATE("connection on error", QMUX_EV_QCC_ERR, qcc->conn);

	qcc->flags |= QC_CF_ERRL;
	qcc->err = app ? quic_err_app(err) : quic_err_transport(err);

	/* TODO
	 * Ensure qcc_io_send() will be conducted to convert QC_CF_ERRL in
	 * QC_CF_ERRL_DONE with CONNECTION_CLOSE frame emission. This may be
	 * unnecessary if we are currently in the MUX tasklet context, but it
	 * is too tedious too not forget a wakeup outside of this function for
	 * the moment.
	 */
	tasklet_wakeup(qcc->wait_event.tasklet);
}
544
/* Open a locally initiated stream for the connection <qcc>. Set <bidi> for a
 * bidirectional stream, else an unidirectional stream is opened. The next
 * available ID on the connection will be used according to the stream type.
 *
 * Returns the allocated stream instance or NULL on error.
 */
struct qcs *qcc_init_stream_local(struct qcc *qcc, int bidi)
{
	struct qcs *qcs;
	enum qcs_type type;
	uint64_t *next;

	TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn);

	if (bidi) {
		next = &qcc->next_bidi_l;
		type = conn_is_back(qcc->conn) ? QCS_CLT_BIDI : QCS_SRV_BIDI;
	}
	else {
		next = &qcc->next_uni_l;
		type = conn_is_back(qcc->conn) ? QCS_CLT_UNI : QCS_SRV_UNI;
	}

	/* TODO ensure that we won't overflow remote peer flow control limit on
	 * streams. Else, we should emit a STREAMS_BLOCKED frame.
	 */

	qcs = qcs_new(qcc, *next, type);
	if (!qcs) {
		TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
		qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
		return NULL;
	}

	TRACE_PROTO("opening local stream", QMUX_EV_QCS_NEW, qcc->conn, qcs);
	/* Stream IDs of a given type are spaced by 4. */
	*next += 4;

	TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn, qcs);
	return qcs;
}
585
586/* Open a remote initiated stream for the connection <qcc> with ID <id>. The
587 * caller is responsible to ensure that a stream with the same ID was not
588 * already opened. This function will also create all intermediaries streams
589 * with ID smaller than <id> not already opened before.
590 *
591 * Returns the allocated stream instance or NULL on error.
592 */
Amaury Denoyelleb1437232022-07-08 11:53:22 +0200593static struct qcs *qcc_init_stream_remote(struct qcc *qcc, uint64_t id)
Amaury Denoyellea509ffb2022-07-04 15:50:33 +0200594{
595 struct qcs *qcs = NULL;
596 enum qcs_type type;
Amaury Denoyellebf3c2082022-08-16 11:29:08 +0200597 uint64_t *largest, max_id;
Amaury Denoyelle8a5b27a2021-12-21 11:53:10 +0100598
Amaury Denoyellea509ffb2022-07-04 15:50:33 +0200599 TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn);
Amaury Denoyelle7272cd72022-03-29 15:15:54 +0200600
Amaury Denoyelle8d6d2462023-05-11 16:55:30 +0200601 /* Function reserved to remote stream IDs. */
602 BUG_ON(quic_stream_is_local(qcc, id));
Amaury Denoyelle8a5b27a2021-12-21 11:53:10 +0100603
Amaury Denoyellea509ffb2022-07-04 15:50:33 +0200604 if (quic_stream_is_bidi(id)) {
605 largest = &qcc->largest_bidi_r;
606 type = conn_is_back(qcc->conn) ? QCS_SRV_BIDI : QCS_CLT_BIDI;
607 }
608 else {
609 largest = &qcc->largest_uni_r;
610 type = conn_is_back(qcc->conn) ? QCS_SRV_UNI : QCS_CLT_UNI;
611 }
612
Amaury Denoyellebf3c2082022-08-16 11:29:08 +0200613 /* RFC 9000 4.6. Controlling Concurrency
614 *
615 * An endpoint that receives a frame with a stream ID exceeding the
616 * limit it has sent MUST treat this as a connection error of type
617 * STREAM_LIMIT_ERROR
618 */
619 max_id = quic_stream_is_bidi(id) ? qcc->lfctl.ms_bidi * 4 :
620 qcc->lfctl.ms_uni * 4;
621 if (id >= max_id) {
622 TRACE_ERROR("flow control error", QMUX_EV_QCS_NEW|QMUX_EV_PROTO_ERR, qcc->conn);
Amaury Denoyelle58721f22023-05-09 18:01:09 +0200623 qcc_set_error(qcc, QC_ERR_STREAM_LIMIT_ERROR, 0);
Amaury Denoyellebf3c2082022-08-16 11:29:08 +0200624 goto err;
Amaury Denoyellea509ffb2022-07-04 15:50:33 +0200625 }
626
627 /* Only stream ID not already opened can be used. */
628 BUG_ON(id < *largest);
629
630 while (id >= *largest) {
Amaury Denoyellefd79ddb2022-08-16 11:13:45 +0200631 const char *str = *largest < id ? "initializing intermediary remote stream" :
632 "initializing remote stream";
Amaury Denoyellea509ffb2022-07-04 15:50:33 +0200633
634 qcs = qcs_new(qcc, *largest, type);
635 if (!qcs) {
Amaury Denoyellec7fb0d22022-08-10 16:39:54 +0200636 TRACE_ERROR("stream fallocation failure", QMUX_EV_QCS_NEW, qcc->conn);
Amaury Denoyelle58721f22023-05-09 18:01:09 +0200637 qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
Amaury Denoyellef0b67f92022-08-10 16:14:32 +0200638 goto err;
Amaury Denoyelle8a5b27a2021-12-21 11:53:10 +0100639 }
Amaury Denoyellea509ffb2022-07-04 15:50:33 +0200640
Amaury Denoyelle047d86a2022-08-10 16:42:35 +0200641 TRACE_PROTO(str, QMUX_EV_QCS_NEW, qcc->conn, qcs);
Amaury Denoyellea509ffb2022-07-04 15:50:33 +0200642 *largest += 4;
Amaury Denoyelle8a5b27a2021-12-21 11:53:10 +0100643 }
644
Amaury Denoyellef0b67f92022-08-10 16:14:32 +0200645 out:
646 TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn, qcs);
Amaury Denoyelle50742292022-03-29 14:57:19 +0200647 return qcs;
Amaury Denoyellef0b67f92022-08-10 16:14:32 +0200648
649 err:
650 TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
651 return NULL;
Amaury Denoyellea509ffb2022-07-04 15:50:33 +0200652}
653
/* Instantiate the stream layer (stconn) on top of <qcs>, using <buf> as its
 * input buffer. Set <fin> to report that input is already complete. This also
 * accounts the new stream and request on the connection and refreshes the
 * session timers.
 *
 * Returns the new stconn or NULL on allocation failure.
 */
struct stconn *qcs_attach_sc(struct qcs *qcs, struct buffer *buf, char fin)
{
	struct qcc *qcc = qcs->qcc;
	struct session *sess = qcc->conn->owner;


	/* TODO duplicated from mux_h2 */
	sess->t_idle = ns_to_ms(now_ns - sess->accept_ts) - sess->t_handshake;

	if (!sc_new_from_endp(qcs->sd, sess, buf))
		return NULL;

	/* QC_SF_HREQ_RECV must be set once for a stream. Else, nb_hreq counter
	 * will be incorrect for the connection.
	 */
	BUG_ON_HOT(qcs->flags & QC_SF_HREQ_RECV);
	qcs->flags |= QC_SF_HREQ_RECV;
	++qcc->nb_sc;
	++qcc->nb_hreq;

	/* TODO duplicated from mux_h2 */
	sess->accept_date = date;
	sess->accept_ts = now_ns;
	sess->t_handshake = 0;
	sess->t_idle = 0;

	/* A stream must have been registered for HTTP wait before attaching
	 * it to sedesc. See <qcs_wait_http_req> for more info.
	 */
	BUG_ON_HOT(!LIST_INLIST(&qcs->el_opening));
	LIST_DEL_INIT(&qcs->el_opening);

	if (fin) {
		TRACE_STATE("report end-of-input", QMUX_EV_STRM_RECV, qcc->conn, qcs);
		se_fl_set(qcs->sd, SE_FL_EOI);
	}

	/* A QCS can be already locally closed before stream layer
	 * instantiation. This notably happens if STOP_SENDING was the first
	 * frame received for this instance. In this case, an error is
	 * immediately reported to the stream layer to prevent transmission.
	 *
	 * TODO it could be better to not instantiate at all the stream layer.
	 * However, extra care is required to ensure QCS instance is released.
	 */
	if (unlikely(qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET))) {
		TRACE_STATE("report early error", QMUX_EV_STRM_RECV, qcc->conn, qcs);
		se_fl_set_error(qcs->sd);
	}

	return qcs->sd->sc;
}
706
Amaury Denoyellea509ffb2022-07-04 15:50:33 +0200707/* Use this function for a stream <id> which is not in <qcc> stream tree. It
708 * returns true if the associated stream is closed.
709 */
710static int qcc_stream_id_is_closed(struct qcc *qcc, uint64_t id)
711{
712 uint64_t *largest;
713
714 /* This function must only be used for stream not present in the stream tree. */
715 BUG_ON_HOT(eb64_lookup(&qcc->streams_by_id, id));
716
717 if (quic_stream_is_local(qcc, id)) {
718 largest = quic_stream_is_uni(id) ? &qcc->next_uni_l :
719 &qcc->next_bidi_l;
720 }
721 else {
722 largest = quic_stream_is_uni(id) ? &qcc->largest_uni_r :
723 &qcc->largest_bidi_r;
724 }
725
726 return id < *largest;
727}
728
/* Retrieve the stream instance from <id> ID. This can be used when receiving
 * STREAM, STREAM_DATA_BLOCKED, RESET_STREAM, MAX_STREAM_DATA or STOP_SENDING
 * frames. Set to false <receive_only> or <send_only> if these particular types
 * of streams are not allowed. If the stream instance is found, it is stored in
 * <out>.
 *
 * Returns 0 on success else non-zero. On error, a RESET_STREAM or a
 * CONNECTION_CLOSE is automatically emitted. Beware that <out> may be NULL
 * on success if the stream has already been closed.
 */
int qcc_get_qcs(struct qcc *qcc, uint64_t id, int receive_only, int send_only,
                struct qcs **out)
{
	struct eb64_node *node;

	TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
	*out = NULL;

	/* Reject frames forbidden on a receive-only (remote unidirectional)
	 * stream unless the caller explicitly allowed them.
	 */
	if (!receive_only && quic_stream_is_uni(id) && quic_stream_is_remote(qcc, id)) {
		TRACE_ERROR("receive-only stream not allowed", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS|QMUX_EV_PROTO_ERR, qcc->conn, NULL, &id);
		qcc_set_error(qcc, QC_ERR_STREAM_STATE_ERROR, 0);
		goto err;
	}

	/* Same check for send-only (local unidirectional) streams. */
	if (!send_only && quic_stream_is_uni(id) && quic_stream_is_local(qcc, id)) {
		TRACE_ERROR("send-only stream not allowed", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS|QMUX_EV_PROTO_ERR, qcc->conn, NULL, &id);
		qcc_set_error(qcc, QC_ERR_STREAM_STATE_ERROR, 0);
		goto err;
	}

	/* Search the stream in the connection tree. */
	node = eb64_lookup(&qcc->streams_by_id, id);
	if (node) {
		*out = eb64_entry(node, struct qcs, by_id);
		TRACE_DEVEL("using stream from connection tree", QMUX_EV_QCC_RECV, qcc->conn, *out);
		goto out;
	}

	/* Check if stream is already closed. */
	if (qcc_stream_id_is_closed(qcc, id)) {
		TRACE_DATA("already closed stream", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
		/* Consider this as a success even if <out> is left NULL. */
		goto out;
	}

	/* Create the stream. This is valid only for remote initiated one. A
	 * local stream must have already been explicitly created by the
	 * application protocol layer.
	 */
	if (quic_stream_is_local(qcc, id)) {
		/* RFC 9000 19.8. STREAM Frames
		 *
		 * An endpoint MUST terminate the connection with error
		 * STREAM_STATE_ERROR if it receives a STREAM frame for a locally
		 * initiated stream that has not yet been created, or for a send-only
		 * stream.
		 */
		TRACE_ERROR("locally initiated stream not yet created", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS|QMUX_EV_PROTO_ERR, qcc->conn, NULL, &id);
		qcc_set_error(qcc, QC_ERR_STREAM_STATE_ERROR, 0);
		goto err;
	}
	else {
		/* Remote stream not found - try to open it. */
		*out = qcc_init_stream_remote(qcc, id);
		if (!*out) {
			TRACE_ERROR("stream creation error", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
			goto err;
		}
	}

 out:
	TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn, *out);
	return 0;

 err:
	TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
	return 1;
}
807
Amaury Denoyelle62eef852022-06-03 16:40:34 +0200808/* Simple function to duplicate a buffer */
809static inline struct buffer qcs_b_dup(const struct ncbuf *b)
810{
811 return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0));
812}
813
Amaury Denoyelle36d4b5e2022-07-01 11:25:40 +0200814/* Remove <bytes> from <qcs> Rx buffer. Flow-control for received offsets may
815 * be allocated for the peer if needed.
Amaury Denoyelle62eef852022-06-03 16:40:34 +0200816 */
817static void qcs_consume(struct qcs *qcs, uint64_t bytes)
818{
819 struct qcc *qcc = qcs->qcc;
820 struct quic_frame *frm;
821 struct ncbuf *buf = &qcs->rx.ncbuf;
822 enum ncb_ret ret;
823
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +0200824 TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);
825
Amaury Denoyelle62eef852022-06-03 16:40:34 +0200826 ret = ncb_advance(buf, bytes);
827 if (ret) {
828 ABORT_NOW(); /* should not happens because removal only in data */
829 }
830
831 if (ncb_is_empty(buf))
Amaury Denoyelled68f8b52023-05-30 15:04:46 +0200832 qcs_free_ncbuf(qcs, buf);
Amaury Denoyelle62eef852022-06-03 16:40:34 +0200833
834 qcs->rx.offset += bytes;
Amaury Denoyellebb6296c2022-12-09 15:00:17 +0100835 /* Not necessary to emit a MAX_STREAM_DATA if all data received. */
836 if (qcs->flags & QC_SF_SIZE_KNOWN)
837 goto conn_fctl;
838
Amaury Denoyelle62eef852022-06-03 16:40:34 +0200839 if (qcs->rx.msd - qcs->rx.offset < qcs->rx.msd_init / 2) {
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +0200840 TRACE_DATA("increase stream credit via MAX_STREAM_DATA", QMUX_EV_QCS_RECV, qcc->conn, qcs);
Amaury Denoyelle40c24f12023-01-27 17:47:49 +0100841 frm = qc_frm_alloc(QUIC_FT_MAX_STREAM_DATA);
Amaury Denoyelleabbb5ad2023-03-09 10:16:38 +0100842 if (!frm) {
Amaury Denoyelle58721f22023-05-09 18:01:09 +0200843 qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
Amaury Denoyelleabbb5ad2023-03-09 10:16:38 +0100844 return;
845 }
Amaury Denoyelle62eef852022-06-03 16:40:34 +0200846
847 qcs->rx.msd = qcs->rx.offset + qcs->rx.msd_init;
848
Amaury Denoyelle62eef852022-06-03 16:40:34 +0200849 frm->max_stream_data.id = qcs->id;
850 frm->max_stream_data.max_stream_data = qcs->rx.msd;
851
852 LIST_APPEND(&qcc->lfctl.frms, &frm->list);
853 tasklet_wakeup(qcc->wait_event.tasklet);
854 }
855
Amaury Denoyellebb6296c2022-12-09 15:00:17 +0100856 conn_fctl:
Amaury Denoyelle62eef852022-06-03 16:40:34 +0200857 qcc->lfctl.offsets_consume += bytes;
858 if (qcc->lfctl.md - qcc->lfctl.offsets_consume < qcc->lfctl.md_init / 2) {
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +0200859 TRACE_DATA("increase conn credit via MAX_DATA", QMUX_EV_QCS_RECV, qcc->conn, qcs);
Amaury Denoyelle40c24f12023-01-27 17:47:49 +0100860 frm = qc_frm_alloc(QUIC_FT_MAX_DATA);
Amaury Denoyelleabbb5ad2023-03-09 10:16:38 +0100861 if (!frm) {
Amaury Denoyelle58721f22023-05-09 18:01:09 +0200862 qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
Amaury Denoyelleabbb5ad2023-03-09 10:16:38 +0100863 return;
864 }
Amaury Denoyelle62eef852022-06-03 16:40:34 +0200865
866 qcc->lfctl.md = qcc->lfctl.offsets_consume + qcc->lfctl.md_init;
867
Amaury Denoyelle62eef852022-06-03 16:40:34 +0200868 frm->max_data.max_data = qcc->lfctl.md;
869
870 LIST_APPEND(&qcs->qcc->lfctl.frms, &frm->list);
871 tasklet_wakeup(qcs->qcc->wait_event.tasklet);
872 }
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +0200873
874 TRACE_LEAVE(QMUX_EV_QCS_RECV, qcc->conn, qcs);
Amaury Denoyelle62eef852022-06-03 16:40:34 +0200875}
876
/* Decode the content of STREAM frames already received on the stream instance
 * <qcs>. Data are passed to the application layer via its decode_qcs
 * callback, unless reading was aborted, in which case they are discarded.
 *
 * Returns 0 on success else non-zero.
 */
static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
{
	struct buffer b;
	ssize_t ret;
	int fin = 0;

	TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);

	/* Linear view over the contiguous part of the Rx ncbuf. */
	b = qcs_b_dup(&qcs->rx.ncbuf);

	/* Signal FIN to application if STREAM FIN received with all data. */
	if (qcs_is_close_remote(qcs))
		fin = 1;

	if (!(qcs->flags & QC_SF_READ_ABORTED)) {
		ret = qcc->app_ops->decode_qcs(qcs, &b, fin);
		if (ret < 0) {
			TRACE_ERROR("decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs);
			goto err;
		}

		/* Decoding may have scheduled a RESET_STREAM; report the error
		 * to the stream-endpoint layer if not already done.
		 */
		if (qcs->flags & QC_SF_TO_RESET) {
			if (qcs_sc(qcs) && !se_fl_test(qcs->sd, SE_FL_ERROR|SE_FL_ERR_PENDING)) {
				se_fl_set_error(qcs->sd);
				qcs_alert(qcs);
			}
		}
	}
	else {
		/* Reading aborted (STOP_SENDING emitted): drop all buffered data. */
		TRACE_DATA("ignore read on stream", QMUX_EV_QCS_RECV, qcc->conn, qcs);
		ret = b_data(&b);
	}

	/* Release consumed data and wake the stream layer if progress was
	 * made or if an empty FIN must be reported.
	 */
	if (ret)
		qcs_consume(qcs, ret);
	if (ret || (!b_data(&b) && fin))
		qcs_notify_recv(qcs);

	TRACE_LEAVE(QMUX_EV_QCS_RECV, qcc->conn, qcs);
	return 0;

 err:
	TRACE_LEAVE(QMUX_EV_QCS_RECV, qcc->conn, qcs);
	return 1;
}
927
/* Prepare for the emission of RESET_STREAM on <qcs> with error code <err>.
 * No-op if a reset is already scheduled or the stream is locally closed.
 */
void qcc_reset_stream(struct qcs *qcs, int err)
{
	struct qcc *qcc = qcs->qcc;

	if ((qcs->flags & QC_SF_TO_RESET) || qcs_is_close_local(qcs))
		return;

	TRACE_STATE("reset stream", QMUX_EV_QCS_END, qcc->conn, qcs);
	qcs->flags |= QC_SF_TO_RESET;
	qcs->err = err;

	/* Remove prepared stream data from connection flow-control calcul. */
	if (qcs->tx.offset > qcs->tx.sent_offset) {
		const uint64_t diff = qcs->tx.offset - qcs->tx.sent_offset;
		BUG_ON(qcc->tx.offsets - diff < qcc->tx.sent_offsets);
		qcc->tx.offsets -= diff;
		/* Reset qcs offset to prevent BUG_ON() on qcs_destroy(). */
		qcs->tx.offset = qcs->tx.sent_offset;
	}

	/* Report send error to stream-endpoint layer. */
	if (qcs_sc(qcs)) {
		se_fl_set_error(qcs->sd);
		qcs_alert(qcs);
	}

	/* Schedule emission with urgent priority and wake the mux tasklet. */
	qcc_send_stream(qcs, 1);
	tasklet_wakeup(qcc->wait_event.tasklet);
}
958
Amaury Denoyellef9b03262023-01-09 10:34:25 +0100959/* Register <qcs> stream for emission of STREAM, STOP_SENDING or RESET_STREAM.
960 * Set <urg> to 1 if stream content should be treated in priority compared to
961 * other streams.
962 */
963void qcc_send_stream(struct qcs *qcs, int urg)
Amaury Denoyelle20f2a422023-01-03 14:39:24 +0100964{
965 struct qcc *qcc = qcs->qcc;
966
967 TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
968
969 /* Cannot send if already closed. */
970 BUG_ON(qcs_is_close_local(qcs));
971
Amaury Denoyellef9b03262023-01-09 10:34:25 +0100972 if (urg) {
973 LIST_DEL_INIT(&qcs->el_send);
974 LIST_INSERT(&qcc->send_list, &qcs->el_send);
975 }
976 else {
977 if (!LIST_INLIST(&qcs->el_send))
978 LIST_APPEND(&qcs->qcc->send_list, &qcs->el_send);
979 }
Amaury Denoyelle20f2a422023-01-03 14:39:24 +0100980
981 TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
982}
983
/* Prepare for the emission of STOP_SENDING on <qcs>. Further received data on
 * this stream will be discarded. No-op if already scheduled or if the stream
 * is remotely closed.
 */
void qcc_abort_stream_read(struct qcs *qcs)
{
	struct qcc *qcc = qcs->qcc;

	/* NOTE(review): ENTER/LEAVE use QMUX_EV_QCC_NEW which looks like a
	 * copy-paste from connection setup; a QCS-level event would seem more
	 * appropriate — confirm against the trace event definitions.
	 */
	TRACE_ENTER(QMUX_EV_QCC_NEW, qcc->conn, qcs);

	if ((qcs->flags & QC_SF_TO_STOP_SENDING) || qcs_is_close_remote(qcs))
		goto end;

	TRACE_STATE("abort stream read", QMUX_EV_QCS_END, qcc->conn, qcs);
	qcs->flags |= (QC_SF_TO_STOP_SENDING|QC_SF_READ_ABORTED);

	/* Schedule STOP_SENDING emission with urgent priority. */
	qcc_send_stream(qcs, 1);
	tasklet_wakeup(qcc->wait_event.tasklet);

 end:
	TRACE_LEAVE(QMUX_EV_QCC_NEW, qcc->conn, qcs);
}
1003
/* Install the <app_ops> applicative layer of a QUIC connection on mux <qcc>.
 * Returns 0 on success else non-zero.
 */
int qcc_install_app_ops(struct qcc *qcc, const struct qcc_app_ops *app_ops)
{
	TRACE_ENTER(QMUX_EV_QCC_NEW, qcc->conn);

	/* init callback is optional; failure aborts the installation. */
	if (app_ops->init && !app_ops->init(qcc)) {
		TRACE_ERROR("app ops init error", QMUX_EV_QCC_NEW, qcc->conn);
		goto err;
	}

	TRACE_PROTO("application layer initialized", QMUX_EV_QCC_NEW, qcc->conn);
	qcc->app_ops = app_ops;

	/* RFC 9114 7.2.4.2. Initialization
	 *
	 * Endpoints MUST NOT require any data to be
	 * received from the peer prior to sending the SETTINGS frame;
	 * settings MUST be sent as soon as the transport is ready to
	 * send data.
	 */
	if (qcc->app_ops->finalize) {
		if (qcc->app_ops->finalize(qcc->ctx)) {
			TRACE_ERROR("app ops finalize error", QMUX_EV_QCC_NEW, qcc->conn);
			goto err;
		}
		/* Wake the mux tasklet to emit the pending frames. */
		tasklet_wakeup(qcc->wait_event.tasklet);
	}

	TRACE_LEAVE(QMUX_EV_QCC_NEW, qcc->conn);
	return 0;

 err:
	TRACE_LEAVE(QMUX_EV_QCC_NEW, qcc->conn);
	return 1;
}
1041
Amaury Denoyelle3a086402022-05-18 11:38:22 +02001042/* Handle a new STREAM frame for stream with id <id>. Payload is pointed by
1043 * <data> with length <len> and represents the offset <offset>. <fin> is set if
1044 * the QUIC frame FIN bit is set.
Amaury Denoyelle0e3010b2022-02-28 11:37:48 +01001045 *
Amaury Denoyelle57161b72022-07-07 15:02:32 +02001046 * Returns 0 on success else non-zero. On error, the received frame should not
1047 * be acknowledged.
Amaury Denoyelle0e3010b2022-02-28 11:37:48 +01001048 */
1049int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
Amaury Denoyelle3a086402022-05-18 11:38:22 +02001050 char fin, char *data)
Amaury Denoyelle0e3010b2022-02-28 11:37:48 +01001051{
1052 struct qcs *qcs;
Amaury Denoyelle1290f1e2022-05-13 14:49:05 +02001053 enum ncb_ret ret;
Amaury Denoyelle0e3010b2022-02-28 11:37:48 +01001054
Amaury Denoyelle4f137572022-03-24 17:10:00 +01001055 TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
1056
Amaury Denoyelle51f116d2023-05-04 15:49:02 +02001057 if (qcc->flags & QC_CF_ERRL) {
1058 TRACE_DATA("connection on error", QMUX_EV_QCC_RECV, qcc->conn);
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02001059 goto err;
Amaury Denoyelle5c4373a2022-05-24 14:47:48 +02001060 }
1061
Amaury Denoyelle6754d7e2022-05-23 16:12:49 +02001062 /* RFC 9000 19.8. STREAM Frames
1063 *
1064 * An endpoint MUST terminate the connection with error
1065 * STREAM_STATE_ERROR if it receives a STREAM frame for a locally
1066 * initiated stream that has not yet been created, or for a send-only
1067 * stream.
1068 */
Amaury Denoyelle57161b72022-07-07 15:02:32 +02001069 if (qcc_get_qcs(qcc, id, 1, 0, &qcs)) {
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02001070 TRACE_DATA("qcs retrieval error", QMUX_EV_QCC_RECV, qcc->conn);
1071 goto err;
Amaury Denoyelle57161b72022-07-07 15:02:32 +02001072 }
1073
1074 if (!qcs) {
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02001075 TRACE_DATA("already closed stream", QMUX_EV_QCC_RECV, qcc->conn);
1076 goto out;
Amaury Denoyelle57161b72022-07-07 15:02:32 +02001077 }
Amaury Denoyelle0e3010b2022-02-28 11:37:48 +01001078
Amaury Denoyellebf91e392022-07-04 10:02:04 +02001079 /* RFC 9000 4.5. Stream Final Size
1080 *
1081 * Once a final size for a stream is known, it cannot change. If a
1082 * RESET_STREAM or STREAM frame is received indicating a change in the
1083 * final size for the stream, an endpoint SHOULD respond with an error
1084 * of type FINAL_SIZE_ERROR; see Section 11 for details on error
1085 * handling.
1086 */
1087 if (qcs->flags & QC_SF_SIZE_KNOWN &&
1088 (offset + len > qcs->rx.offset_max || (fin && offset + len < qcs->rx.offset_max))) {
Amaury Denoyellec7fb0d22022-08-10 16:39:54 +02001089 TRACE_ERROR("final size error", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV|QMUX_EV_PROTO_ERR, qcc->conn, qcs);
Amaury Denoyelle58721f22023-05-09 18:01:09 +02001090 qcc_set_error(qcc, QC_ERR_FINAL_SIZE_ERROR, 0);
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02001091 goto err;
Amaury Denoyellebf91e392022-07-04 10:02:04 +02001092 }
1093
Amaury Denoyelle5854fc02022-12-09 16:25:48 +01001094 if (qcs_is_close_remote(qcs)) {
1095 TRACE_DATA("skipping STREAM for remotely closed", QMUX_EV_QCC_RECV, qcc->conn);
1096 goto out;
1097 }
1098
Amaury Denoyellefa241932023-02-14 15:36:36 +01001099 if (offset + len < qcs->rx.offset ||
1100 (offset + len == qcs->rx.offset && (!fin || (qcs->flags & QC_SF_SIZE_KNOWN)))) {
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02001101 TRACE_DATA("already received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
1102 goto out;
Amaury Denoyelle0e3010b2022-02-28 11:37:48 +01001103 }
1104
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02001105 TRACE_PROTO("receiving STREAM", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
Amaury Denoyelle38e60062022-07-01 16:48:42 +02001106 qcs_idle_open(qcs);
1107
Amaury Denoyelled46b0f52022-05-20 15:05:07 +02001108 if (offset + len > qcs->rx.offset_max) {
1109 uint64_t diff = offset + len - qcs->rx.offset_max;
1110 qcs->rx.offset_max = offset + len;
1111 qcc->lfctl.offsets_recv += diff;
1112
1113 if (offset + len > qcs->rx.msd ||
1114 qcc->lfctl.offsets_recv > qcc->lfctl.md) {
1115 /* RFC 9000 4.1. Data Flow Control
1116 *
1117 * A receiver MUST close the connection with an error
1118 * of type FLOW_CONTROL_ERROR if the sender violates
1119 * the advertised connection or stream data limits
1120 */
Amaury Denoyellec7fb0d22022-08-10 16:39:54 +02001121 TRACE_ERROR("flow control error", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV|QMUX_EV_PROTO_ERR,
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02001122 qcc->conn, qcs);
Amaury Denoyelle58721f22023-05-09 18:01:09 +02001123 qcc_set_error(qcc, QC_ERR_FLOW_CONTROL_ERROR, 0);
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02001124 goto err;
Amaury Denoyelled46b0f52022-05-20 15:05:07 +02001125 }
1126 }
Amaury Denoyelle0e3010b2022-02-28 11:37:48 +01001127
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02001128 if (!qcs_get_ncbuf(qcs, &qcs->rx.ncbuf) || ncb_is_null(&qcs->rx.ncbuf)) {
Amaury Denoyelled00b3092023-05-11 17:00:54 +02001129 TRACE_ERROR("receive ncbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
1130 qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
1131 goto err;
Amaury Denoyelle0e3010b2022-02-28 11:37:48 +01001132 }
1133
Amaury Denoyelle047d86a2022-08-10 16:42:35 +02001134 TRACE_DATA("newly received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
Amaury Denoyelle1290f1e2022-05-13 14:49:05 +02001135 if (offset < qcs->rx.offset) {
Frédéric Lécaillea18c3332022-07-04 09:54:58 +02001136 size_t diff = qcs->rx.offset - offset;
1137
1138 len -= diff;
1139 data += diff;
Amaury Denoyelle1290f1e2022-05-13 14:49:05 +02001140 offset = qcs->rx.offset;
1141 }
Amaury Denoyelle0e3010b2022-02-28 11:37:48 +01001142
Amaury Denoyellefa241932023-02-14 15:36:36 +01001143 if (len) {
1144 ret = ncb_add(&qcs->rx.ncbuf, offset - qcs->rx.offset, data, len, NCB_ADD_COMPARE);
1145 switch (ret) {
1146 case NCB_RET_OK:
1147 break;
1148
1149 case NCB_RET_DATA_REJ:
Amaury Denoyellecc3d7162022-05-20 15:14:57 +02001150 /* RFC 9000 2.2. Sending and Receiving Data
1151 *
1152 * An endpoint could receive data for a stream at the
1153 * same stream offset multiple times. Data that has
1154 * already been received can be discarded. The data at
1155 * a given offset MUST NOT change if it is sent
1156 * multiple times; an endpoint MAY treat receipt of
1157 * different data at the same offset within a stream as
1158 * a connection error of type PROTOCOL_VIOLATION.
1159 */
Amaury Denoyellec7fb0d22022-08-10 16:39:54 +02001160 TRACE_ERROR("overlapping data rejected", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV|QMUX_EV_PROTO_ERR,
Amaury Denoyelle1290f1e2022-05-13 14:49:05 +02001161 qcc->conn, qcs);
Amaury Denoyelle58721f22023-05-09 18:01:09 +02001162 qcc_set_error(qcc, QC_ERR_PROTOCOL_VIOLATION, 0);
Amaury Denoyellefa241932023-02-14 15:36:36 +01001163 return 1;
1164
1165 case NCB_RET_GAP_SIZE:
Amaury Denoyelle047d86a2022-08-10 16:42:35 +02001166 TRACE_DATA("cannot bufferize frame due to gap size limit", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV,
1167 qcc->conn, qcs);
Amaury Denoyellefa241932023-02-14 15:36:36 +01001168 return 1;
Amaury Denoyelle1290f1e2022-05-13 14:49:05 +02001169 }
Amaury Denoyelle1290f1e2022-05-13 14:49:05 +02001170 }
Amaury Denoyelle0e3010b2022-02-28 11:37:48 +01001171
1172 if (fin)
Amaury Denoyelle3f39b402022-07-01 16:11:03 +02001173 qcs->flags |= QC_SF_SIZE_KNOWN;
Amaury Denoyelle0e3010b2022-02-28 11:37:48 +01001174
Amaury Denoyelled1310f82022-09-16 13:30:59 +02001175 if (qcs->flags & QC_SF_SIZE_KNOWN &&
1176 qcs->rx.offset_max == qcs->rx.offset + ncb_data(&qcs->rx.ncbuf, 0)) {
Amaury Denoyelle38e60062022-07-01 16:48:42 +02001177 qcs_close_remote(qcs);
Amaury Denoyelled1310f82022-09-16 13:30:59 +02001178 }
Amaury Denoyelle38e60062022-07-01 16:48:42 +02001179
Amaury Denoyellefa241932023-02-14 15:36:36 +01001180 if ((ncb_data(&qcs->rx.ncbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) || fin) {
Amaury Denoyelle3a086402022-05-18 11:38:22 +02001181 qcc_decode_qcs(qcc, qcs);
Amaury Denoyelle418ba212022-08-02 15:57:16 +02001182 qcc_refresh_timeout(qcc);
1183 }
Amaury Denoyelle3a086402022-05-18 11:38:22 +02001184
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02001185 out:
Amaury Denoyelle4f137572022-03-24 17:10:00 +01001186 TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
Amaury Denoyelle0e3010b2022-02-28 11:37:48 +01001187 return 0;
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02001188
1189 err:
1190 TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
1191 return 1;
Amaury Denoyelle0e3010b2022-02-28 11:37:48 +01001192}
1193
Amaury Denoyelle1e5e5132022-03-08 16:23:03 +01001194/* Handle a new MAX_DATA frame. <max> must contains the maximum data field of
1195 * the frame.
1196 *
1197 * Returns 0 on success else non-zero.
1198 */
1199int qcc_recv_max_data(struct qcc *qcc, uint64_t max)
1200{
Amaury Denoyelle392e94e2022-07-06 15:44:16 +02001201 TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
1202
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02001203 TRACE_PROTO("receiving MAX_DATA", QMUX_EV_QCC_RECV, qcc->conn);
Amaury Denoyelle1e5e5132022-03-08 16:23:03 +01001204 if (qcc->rfctl.md < max) {
1205 qcc->rfctl.md = max;
Amaury Denoyelleb7143a82023-03-22 15:08:01 +01001206 TRACE_DATA("increase remote max-data", QMUX_EV_QCC_RECV, qcc->conn);
Amaury Denoyelle1e5e5132022-03-08 16:23:03 +01001207
1208 if (qcc->flags & QC_CF_BLK_MFCTL) {
1209 qcc->flags &= ~QC_CF_BLK_MFCTL;
1210 tasklet_wakeup(qcc->wait_event.tasklet);
1211 }
1212 }
Amaury Denoyelle392e94e2022-07-06 15:44:16 +02001213
1214 TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
Amaury Denoyelle1e5e5132022-03-08 16:23:03 +01001215 return 0;
1216}
1217
/* Handle a new MAX_STREAM_DATA frame. <max> must contains the maximum data
 * field of the frame and <id> is the identifier of the QUIC stream.
 *
 * Returns 0 on success else non-zero. On error, the received frame should not
 * be acknowledged.
 */
int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max)
{
	struct qcs *qcs;

	TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);

	if (qcc->flags & QC_CF_ERRL) {
		TRACE_DATA("connection on error", QMUX_EV_QCC_RECV, qcc->conn);
		goto err;
	}

	/* RFC 9000 19.10. MAX_STREAM_DATA Frames
	 *
	 * Receiving a MAX_STREAM_DATA frame for a locally
	 * initiated stream that has not yet been created MUST be treated as a
	 * connection error of type STREAM_STATE_ERROR. An endpoint that
	 * receives a MAX_STREAM_DATA frame for a receive-only stream MUST
	 * terminate the connection with error STREAM_STATE_ERROR.
	 */
	if (qcc_get_qcs(qcc, id, 0, 1, &qcs))
		goto err;

	/* <qcs> may be NULL for an already closed stream: nothing to update. */
	if (qcs) {
		TRACE_PROTO("receiving MAX_STREAM_DATA", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
		if (max > qcs->tx.msd) {
			qcs->tx.msd = max;
			TRACE_DATA("increase remote max-stream-data", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);

			if (qcs->flags & QC_SF_BLK_SFCTL) {
				qcs->flags &= ~QC_SF_BLK_SFCTL;
				/* TODO optim: only wakeup IO-CB if stream has data to sent. */
				tasklet_wakeup(qcc->wait_event.tasklet);
			}
		}
	}

	if (qcc_may_expire(qcc) && !qcc->nb_hreq)
		qcc_refresh_timeout(qcc);

	TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
	return 0;

 err:
	TRACE_DEVEL("leaving on error", QMUX_EV_QCC_RECV, qcc->conn);
	return 1;
}
1270
/* Handle a new RESET_STREAM frame from stream ID <id> with error code <err>
 * and final stream size <final_size>.
 *
 * Returns 0 on success else non-zero. On error, the received frame should not
 * be acknowledged.
 */
int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t final_size)
{
	struct qcs *qcs;

	TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);

	if (qcc->flags & QC_CF_ERRL) {
		TRACE_DATA("connection on error", QMUX_EV_QCC_RECV, qcc->conn);
		goto err;
	}

	/* RFC 9000 19.4. RESET_STREAM Frames
	 *
	 * An endpoint that receives a RESET_STREAM frame for a send-only stream
	 * MUST terminate the connection with error STREAM_STATE_ERROR.
	 */
	if (qcc_get_qcs(qcc, id, 1, 0, &qcs)) {
		/* <qcs> is NULL here: qcc_get_qcs() clears it before failing. */
		TRACE_ERROR("RESET_STREAM for send-only stream received", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
		goto err;
	}

	/* RFC 9000 3.2. Receiving Stream States
	 *
	 * A RESET_STREAM signal might be suppressed or withheld
	 * if stream data is completely received and is buffered to be read by
	 * the application. If the RESET_STREAM is suppressed, the receiving
	 * part of the stream remains in "Data Recvd".
	 */
	if (!qcs || qcs_is_close_remote(qcs))
		goto out;

	TRACE_PROTO("receiving RESET_STREAM", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
	qcs_idle_open(qcs);

	/* Ensure stream closure is not forbidden by application protocol. */
	if (qcc->app_ops->close) {
		if (qcc->app_ops->close(qcs, QCC_APP_OPS_CLOSE_SIDE_RD)) {
			TRACE_ERROR("closure rejected by app layer", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
			goto out;
		}
	}

	/* The final size must never shrink nor contradict a previously known size. */
	if (qcs->rx.offset_max > final_size ||
	    ((qcs->flags & QC_SF_SIZE_KNOWN) && qcs->rx.offset_max != final_size)) {
		TRACE_ERROR("final size error on RESET_STREAM", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
		qcc_set_error(qcc, QC_ERR_FINAL_SIZE_ERROR, 0);
		goto err;
	}

	/* RFC 9000 3.2. Receiving Stream States
	 *
	 * An
	 * implementation MAY interrupt delivery of stream data, discard any
	 * data that was not consumed, and signal the receipt of the
	 * RESET_STREAM.
	 */
	qcs->flags |= QC_SF_SIZE_KNOWN|QC_SF_RECV_RESET;
	qcs_close_remote(qcs);
	qcs_free_ncbuf(qcs, &qcs->rx.ncbuf);

 out:
	TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
	return 0;

 err:
	TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
	return 1;
}
1345
Amaury Denoyellea5b50752022-07-04 11:44:53 +02001346/* Handle a new STOP_SENDING frame for stream ID <id>. The error code should be
1347 * specified in <err>.
1348 *
1349 * Returns 0 on success else non-zero. On error, the received frame should not
1350 * be acknowledged.
1351 */
1352int qcc_recv_stop_sending(struct qcc *qcc, uint64_t id, uint64_t err)
1353{
1354 struct qcs *qcs;
1355
1356 TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
1357
Amaury Denoyelle51f116d2023-05-04 15:49:02 +02001358 if (qcc->flags & QC_CF_ERRL) {
1359 TRACE_DATA("connection on error", QMUX_EV_QCC_RECV, qcc->conn);
Amaury Denoyelleb47310d2023-03-09 15:49:48 +01001360 goto err;
1361 }
1362
Amaury Denoyellea5b50752022-07-04 11:44:53 +02001363 /* RFC 9000 19.5. STOP_SENDING Frames
1364 *
1365 * Receiving a STOP_SENDING frame for a
1366 * locally initiated stream that has not yet been created MUST be
1367 * treated as a connection error of type STREAM_STATE_ERROR. An
1368 * endpoint that receives a STOP_SENDING frame for a receive-only stream
1369 * MUST terminate the connection with error STREAM_STATE_ERROR.
1370 */
Amaury Denoyelleb47310d2023-03-09 15:49:48 +01001371 if (qcc_get_qcs(qcc, id, 0, 1, &qcs))
1372 goto err;
Amaury Denoyellea5b50752022-07-04 11:44:53 +02001373
1374 if (!qcs)
1375 goto out;
1376
Amaury Denoyelle047d86a2022-08-10 16:42:35 +02001377 TRACE_PROTO("receiving STOP_SENDING", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
Amaury Denoyelled7755372022-10-03 17:20:31 +02001378
1379 /* RFC 9000 3.5. Solicited State Transitions
1380 *
1381 * An endpoint is expected to send another STOP_SENDING frame if a
1382 * packet containing a previous STOP_SENDING is lost. However, once
1383 * either all stream data or a RESET_STREAM frame has been received for
1384 * the stream -- that is, the stream is in any state other than "Recv"
1385 * or "Size Known" -- sending a STOP_SENDING frame is unnecessary.
1386 */
1387
1388 /* TODO thanks to previous RFC clause, STOP_SENDING is ignored if current stream
1389 * has already been closed locally. This is useful to not emit multiple
1390 * RESET_STREAM for a single stream. This is functional if stream is
1391 * locally closed due to all data transmitted, but in this case the RFC
1392 * advices to use an explicit RESET_STREAM.
1393 */
1394 if (qcs_is_close_local(qcs)) {
1395 TRACE_STATE("ignoring STOP_SENDING", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
1396 goto out;
1397 }
1398
Amaury Denoyelle96ca1b72022-08-09 17:36:38 +02001399 qcs_idle_open(qcs);
1400
Amaury Denoyelle87f87662023-01-30 12:12:43 +01001401 if (qcc->app_ops->close) {
1402 if (qcc->app_ops->close(qcs, QCC_APP_OPS_CLOSE_SIDE_WR)) {
1403 TRACE_ERROR("closure rejected by app layer", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
1404 goto out;
1405 }
1406 }
1407
Amaury Denoyelle2f590382023-12-19 11:22:28 +01001408 /* If FIN already reached, future RESET_STREAMS will be ignored.
1409 * Manually set EOS in this case.
1410 */
1411 if (qcs_sc(qcs) && se_fl_test(qcs->sd, SE_FL_EOI)) {
1412 se_fl_set(qcs->sd, SE_FL_EOS);
1413 qcs_alert(qcs);
1414 }
1415
Amaury Denoyellea5b50752022-07-04 11:44:53 +02001416 /* RFC 9000 3.5. Solicited State Transitions
1417 *
1418 * An endpoint that receives a STOP_SENDING frame
1419 * MUST send a RESET_STREAM frame if the stream is in the "Ready" or
1420 * "Send" state. If the stream is in the "Data Sent" state, the
1421 * endpoint MAY defer sending the RESET_STREAM frame until the packets
1422 * containing outstanding data are acknowledged or declared lost. If
1423 * any outstanding data is declared lost, the endpoint SHOULD send a
1424 * RESET_STREAM frame instead of retransmitting the data.
1425 *
1426 * An endpoint SHOULD copy the error code from the STOP_SENDING frame to
1427 * the RESET_STREAM frame it sends, but it can use any application error
1428 * code.
1429 */
Amaury Denoyellea5b50752022-07-04 11:44:53 +02001430 qcc_reset_stream(qcs, err);
1431
Amaury Denoyelle30e260e2022-08-03 11:17:57 +02001432 if (qcc_may_expire(qcc) && !qcc->nb_hreq)
1433 qcc_refresh_timeout(qcc);
1434
Amaury Denoyellea5b50752022-07-04 11:44:53 +02001435 out:
1436 TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
1437 return 0;
Amaury Denoyelleb47310d2023-03-09 15:49:48 +01001438
1439 err:
1440 TRACE_DEVEL("leaving on error", QMUX_EV_QCC_RECV, qcc->conn);
1441 return 1;
Amaury Denoyellea5b50752022-07-04 11:44:53 +02001442}
1443
Amaury Denoyellec985cb12022-05-16 14:29:59 +02001444/* Signal the closing of remote stream with id <id>. Flow-control for new
1445 * streams may be allocated for the peer if needed.
1446 */
1447static int qcc_release_remote_stream(struct qcc *qcc, uint64_t id)
Amaury Denoyellec055e302022-02-07 16:09:06 +01001448{
Amaury Denoyellec985cb12022-05-16 14:29:59 +02001449 struct quic_frame *frm;
1450
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02001451 TRACE_ENTER(QMUX_EV_QCS_END, qcc->conn);
1452
Amaury Denoyellec985cb12022-05-16 14:29:59 +02001453 if (quic_stream_is_bidi(id)) {
1454 ++qcc->lfctl.cl_bidi_r;
1455 if (qcc->lfctl.cl_bidi_r > qcc->lfctl.ms_bidi_init / 2) {
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02001456 TRACE_DATA("increase max stream limit with MAX_STREAMS_BIDI", QMUX_EV_QCC_SEND, qcc->conn);
Amaury Denoyelle40c24f12023-01-27 17:47:49 +01001457 frm = qc_frm_alloc(QUIC_FT_MAX_STREAMS_BIDI);
Amaury Denoyelleabbb5ad2023-03-09 10:16:38 +01001458 if (!frm) {
Amaury Denoyelle58721f22023-05-09 18:01:09 +02001459 qcc_set_error(qcc, QC_ERR_INTERNAL_ERROR, 0);
Amaury Denoyelleabbb5ad2023-03-09 10:16:38 +01001460 goto err;
1461 }
Amaury Denoyellec985cb12022-05-16 14:29:59 +02001462
Amaury Denoyellec985cb12022-05-16 14:29:59 +02001463 frm->max_streams_bidi.max_streams = qcc->lfctl.ms_bidi +
1464 qcc->lfctl.cl_bidi_r;
1465 LIST_APPEND(&qcc->lfctl.frms, &frm->list);
1466 tasklet_wakeup(qcc->wait_event.tasklet);
1467
1468 qcc->lfctl.ms_bidi += qcc->lfctl.cl_bidi_r;
1469 qcc->lfctl.cl_bidi_r = 0;
1470 }
1471 }
1472 else {
Amaury Denoyelle91077312022-12-22 18:56:09 +01001473 /* TODO unidirectional stream flow control with MAX_STREAMS_UNI
1474 * emission not implemented. It should be unnecessary for
1475 * HTTP/3 but may be required if other application protocols
1476 * are supported.
Amaury Denoyellebf3c2082022-08-16 11:29:08 +02001477 */
Amaury Denoyellec985cb12022-05-16 14:29:59 +02001478 }
1479
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02001480 TRACE_LEAVE(QMUX_EV_QCS_END, qcc->conn);
1481
Amaury Denoyellec985cb12022-05-16 14:29:59 +02001482 return 0;
Amaury Denoyelleabbb5ad2023-03-09 10:16:38 +01001483
1484 err:
1485 TRACE_DEVEL("leaving on error", QMUX_EV_QCS_END, qcc->conn);
1486 return 1;
Amaury Denoyellec055e302022-02-07 16:09:06 +01001487}
1488
Ilya Shipitsin5e87bcf2021-12-25 11:45:52 +05001489/* detaches the QUIC stream from its QCC and releases it to the QCS pool. */
Amaury Denoyelle2873a312021-12-08 14:42:55 +01001490static void qcs_destroy(struct qcs *qcs)
1491{
Amaury Denoyelleb47310d2023-03-09 15:49:48 +01001492 struct qcc *qcc = qcs->qcc;
1493 struct connection *conn = qcc->conn;
Amaury Denoyelled8e680c2022-03-29 15:18:44 +02001494 const uint64_t id = qcs->id;
Amaury Denoyellec055e302022-02-07 16:09:06 +01001495
Amaury Denoyelle4f137572022-03-24 17:10:00 +01001496 TRACE_ENTER(QMUX_EV_QCS_END, conn, qcs);
Amaury Denoyelle2873a312021-12-08 14:42:55 +01001497
Amaury Denoyelle178fbff2023-03-22 11:17:59 +01001498 /* MUST not removed a stream with sending prepared data left. This is
1499 * to ensure consistency on connection flow-control calculation.
1500 */
1501 BUG_ON(qcs->tx.offset < qcs->tx.sent_offset);
1502
Amaury Denoyelle51f116d2023-05-04 15:49:02 +02001503 if (!(qcc->flags & QC_CF_ERRL)) {
Amaury Denoyelleb47310d2023-03-09 15:49:48 +01001504 if (quic_stream_is_remote(qcc, id))
1505 qcc_release_remote_stream(qcc, id);
1506 }
Amaury Denoyellec055e302022-02-07 16:09:06 +01001507
Amaury Denoyelledccbd732022-03-29 18:36:59 +02001508 qcs_free(qcs);
Amaury Denoyelle4f137572022-03-24 17:10:00 +01001509
1510 TRACE_LEAVE(QMUX_EV_QCS_END, conn);
Amaury Denoyelle2873a312021-12-08 14:42:55 +01001511}
1512
Amaury Denoyelleb9e06402022-06-10 15:16:21 +02001513/* Transfer as much as possible data on <qcs> from <in> to <out>. This is done
1514 * in respect with available flow-control at stream and connection level.
Amaury Denoyelle75d14ad2022-03-22 15:10:29 +01001515 *
Amaury Denoyelle0abde9d2023-05-11 16:52:17 +02001516 * Returns the total bytes of transferred data or a negative error code.
Amaury Denoyelle75d14ad2022-03-22 15:10:29 +01001517 */
Amaury Denoyelleb9e06402022-06-10 15:16:21 +02001518static int qcs_xfer_data(struct qcs *qcs, struct buffer *out, struct buffer *in)
Frédéric Lécaille578a7892021-09-13 16:13:00 +02001519{
Amaury Denoyelle05ce55e2022-03-08 10:35:42 +01001520 struct qcc *qcc = qcs->qcc;
Amaury Denoyelleda6ad202022-04-12 11:41:04 +02001521 int left, to_xfer;
Amaury Denoyelle6ccfa3c2022-03-10 16:45:53 +01001522 int total = 0;
Frédéric Lécaille578a7892021-09-13 16:13:00 +02001523
Amaury Denoyelle4f137572022-03-24 17:10:00 +01001524 TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
Amaury Denoyelledeed7772021-12-03 11:36:46 +01001525
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02001526 if (!qcs_get_buf(qcs, out)) {
Amaury Denoyelle0abde9d2023-05-11 16:52:17 +02001527 TRACE_ERROR("buffer alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
1528 goto err;
1529 }
Amaury Denoyelle6ccfa3c2022-03-10 16:45:53 +01001530
1531 /*
1532 * QCS out buffer diagram
1533 * head left to_xfer
1534 * -------------> ----------> ----->
Amaury Denoyellee0320b82022-03-11 19:12:23 +01001535 * --------------------------------------------------
Amaury Denoyelle6ccfa3c2022-03-10 16:45:53 +01001536 * |...............|xxxxxxxxxxx|<<<<<
Amaury Denoyellee0320b82022-03-11 19:12:23 +01001537 * --------------------------------------------------
Amaury Denoyelle6ccfa3c2022-03-10 16:45:53 +01001538 * ^ ack-off ^ sent-off ^ off
1539 *
1540 * STREAM frame
1541 * ^ ^
1542 * |xxxxxxxxxxxxxxxxx|
1543 */
1544
Amaury Denoyelle7272cd72022-03-29 15:15:54 +02001545 BUG_ON_HOT(qcs->tx.sent_offset < qcs->stream->ack_offset);
Amaury Denoyelle6ccfa3c2022-03-10 16:45:53 +01001546 BUG_ON_HOT(qcs->tx.offset < qcs->tx.sent_offset);
Amaury Denoyelle78fa5592022-06-10 15:18:12 +02001547 BUG_ON_HOT(qcc->tx.offsets < qcc->tx.sent_offsets);
Amaury Denoyelle6ccfa3c2022-03-10 16:45:53 +01001548
Amaury Denoyelle6ccfa3c2022-03-10 16:45:53 +01001549 left = qcs->tx.offset - qcs->tx.sent_offset;
Amaury Denoyellefe8f5552022-04-27 16:44:49 +02001550 to_xfer = QUIC_MIN(b_data(in), b_room(out));
Amaury Denoyelle6ea78192022-03-07 15:47:02 +01001551
1552 BUG_ON_HOT(qcs->tx.offset > qcs->tx.msd);
1553 /* do not exceed flow control limit */
Amaury Denoyelle1ec78ff2023-03-22 11:58:32 +01001554 if (qcs->tx.offset + to_xfer > qcs->tx.msd) {
1555 TRACE_DATA("do not exceed stream flow control", QMUX_EV_QCS_SEND, qcc->conn, qcs);
Amaury Denoyelle6ea78192022-03-07 15:47:02 +01001556 to_xfer = qcs->tx.msd - qcs->tx.offset;
Amaury Denoyelle1ec78ff2023-03-22 11:58:32 +01001557 }
Amaury Denoyelle6ea78192022-03-07 15:47:02 +01001558
Amaury Denoyelleb9e06402022-06-10 15:16:21 +02001559 BUG_ON_HOT(qcc->tx.offsets > qcc->rfctl.md);
Amaury Denoyelle05ce55e2022-03-08 10:35:42 +01001560 /* do not overcome flow control limit on connection */
Amaury Denoyelle1ec78ff2023-03-22 11:58:32 +01001561 if (qcc->tx.offsets + to_xfer > qcc->rfctl.md) {
1562 TRACE_DATA("do not exceed conn flow control", QMUX_EV_QCS_SEND, qcc->conn, qcs);
Amaury Denoyelleb9e06402022-06-10 15:16:21 +02001563 to_xfer = qcc->rfctl.md - qcc->tx.offsets;
Amaury Denoyelle1ec78ff2023-03-22 11:58:32 +01001564 }
Amaury Denoyelle05ce55e2022-03-08 10:35:42 +01001565
Amaury Denoyelle6ccfa3c2022-03-10 16:45:53 +01001566 if (!left && !to_xfer)
Frédéric Lécailled2ba0962021-09-20 17:50:03 +02001567 goto out;
1568
Amaury Denoyellefe8f5552022-04-27 16:44:49 +02001569 total = b_force_xfer(out, in, to_xfer);
Amaury Denoyelleda6ad202022-04-12 11:41:04 +02001570
1571 out:
1572 {
1573 struct qcs_xfer_data_trace_arg arg = {
1574 .prep = b_data(out), .xfer = total,
1575 };
1576 TRACE_LEAVE(QMUX_EV_QCS_SEND|QMUX_EV_QCS_XFER_DATA,
1577 qcc->conn, qcs, &arg);
1578 }
1579
1580 return total;
Amaury Denoyelle0abde9d2023-05-11 16:52:17 +02001581
1582 err:
1583 TRACE_DEVEL("leaving on error", QMUX_EV_QCS_SEND, qcc->conn, qcs);
1584 return -1;
Amaury Denoyelleda6ad202022-04-12 11:41:04 +02001585}
1586
Amaury Denoyellefe8f5552022-04-27 16:44:49 +02001587/* Prepare a STREAM frame for <qcs> instance using <out> as payload. The frame
1588 * is appended in <frm_list>. Set <fin> if this is supposed to be the last
Amaury Denoyellea57ab0f2023-04-26 11:38:11 +02001589 * stream frame. If <out> is NULL an empty STREAM frame is built : this may be
1590 * useful if FIN needs to be sent without any data left.
Amaury Denoyellefe8f5552022-04-27 16:44:49 +02001591 *
Amaury Denoyellea57ab0f2023-04-26 11:38:11 +02001592 * Returns the payload length of the STREAM frame or a negative error code.
Amaury Denoyellefe8f5552022-04-27 16:44:49 +02001593 */
Amaury Denoyelleda6ad202022-04-12 11:41:04 +02001594static int qcs_build_stream_frm(struct qcs *qcs, struct buffer *out, char fin,
1595 struct list *frm_list)
1596{
1597 struct qcc *qcc = qcs->qcc;
1598 struct quic_frame *frm;
1599 int head, total;
Amaury Denoyellea4569202022-04-15 17:29:25 +02001600 uint64_t base_off;
Amaury Denoyelleda6ad202022-04-12 11:41:04 +02001601
1602 TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
1603
Amaury Denoyellea4569202022-04-15 17:29:25 +02001604 /* if ack_offset < buf_offset, it points to an older buffer. */
1605 base_off = MAX(qcs->stream->buf_offset, qcs->stream->ack_offset);
1606 BUG_ON(qcs->tx.sent_offset < base_off);
1607
1608 head = qcs->tx.sent_offset - base_off;
Amaury Denoyellea57ab0f2023-04-26 11:38:11 +02001609 total = out ? b_data(out) - head : 0;
Amaury Denoyellea4569202022-04-15 17:29:25 +02001610 BUG_ON(total < 0);
1611
Amaury Denoyellee53b4892022-07-08 17:19:40 +02001612 if (!total && !fin) {
1613 /* No need to send anything if total is NULL and no FIN to signal. */
Amaury Denoyelleda6ad202022-04-12 11:41:04 +02001614 TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
1615 return 0;
1616 }
Amaury Denoyellee53b4892022-07-08 17:19:40 +02001617 BUG_ON((!total && qcs->tx.sent_offset > qcs->tx.offset) ||
1618 (total && qcs->tx.sent_offset >= qcs->tx.offset));
Amaury Denoyellea4569202022-04-15 17:29:25 +02001619 BUG_ON(qcs->tx.sent_offset + total > qcs->tx.offset);
Amaury Denoyelle78fa5592022-06-10 15:18:12 +02001620 BUG_ON(qcc->tx.sent_offsets + total > qcc->rfctl.md);
Amaury Denoyelleda6ad202022-04-12 11:41:04 +02001621
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02001622 TRACE_PROTO("sending STREAM frame", QMUX_EV_QCS_SEND, qcc->conn, qcs);
Amaury Denoyelle40c24f12023-01-27 17:47:49 +01001623 frm = qc_frm_alloc(QUIC_FT_STREAM_8);
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02001624 if (!frm) {
1625 TRACE_ERROR("frame alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
Frédéric Lécaille578a7892021-09-13 16:13:00 +02001626 goto err;
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02001627 }
Frédéric Lécaille578a7892021-09-13 16:13:00 +02001628
Amaury Denoyelle7272cd72022-03-29 15:15:54 +02001629 frm->stream.stream = qcs->stream;
Amaury Denoyelled8e680c2022-03-29 15:18:44 +02001630 frm->stream.id = qcs->id;
Amaury Denoyelle1dac0182023-02-02 16:45:07 +01001631 frm->stream.offset.key = 0;
Amaury Denoyelleebfafc22023-03-07 18:07:08 +01001632 frm->stream.dup = 0;
Amaury Denoyelle6ccfa3c2022-03-10 16:45:53 +01001633
Amaury Denoyelle42c5b752023-04-25 16:39:32 +02001634 if (total) {
1635 frm->stream.buf = out;
1636 frm->stream.data = (unsigned char *)b_peek(out, head);
1637 }
1638 else {
1639 /* Empty STREAM frame. */
1640 frm->stream.buf = NULL;
1641 frm->stream.data = NULL;
1642 }
1643
Amaury Denoyellefecfa0d2021-12-07 16:50:14 +01001644 /* FIN is positioned only when the buffer has been totally emptied. */
Frédéric Lécaille578a7892021-09-13 16:13:00 +02001645 if (fin)
1646 frm->type |= QUIC_STREAM_FRAME_TYPE_FIN_BIT;
Amaury Denoyelle6ccfa3c2022-03-10 16:45:53 +01001647
1648 if (qcs->tx.sent_offset) {
Frédéric Lécaille578a7892021-09-13 16:13:00 +02001649 frm->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT;
Amaury Denoyelle6ccfa3c2022-03-10 16:45:53 +01001650 frm->stream.offset.key = qcs->tx.sent_offset;
Frédéric Lécaille578a7892021-09-13 16:13:00 +02001651 }
Amaury Denoyelle6ccfa3c2022-03-10 16:45:53 +01001652
Amaury Denoyelle42c5b752023-04-25 16:39:32 +02001653 /* Always set length bit as we do not know if there is remaining frames
1654 * in the final packet after this STREAM.
1655 */
Amaury Denoyelleda6ad202022-04-12 11:41:04 +02001656 frm->type |= QUIC_STREAM_FRAME_TYPE_LEN_BIT;
1657 frm->stream.len = total;
Frédéric Lécaille578a7892021-09-13 16:13:00 +02001658
Amaury Denoyelle2c71fe52022-02-09 18:16:49 +01001659 LIST_APPEND(frm_list, &frm->list);
Amaury Denoyelle4f137572022-03-24 17:10:00 +01001660
Frédéric Lécailled2ba0962021-09-20 17:50:03 +02001661 out:
Amaury Denoyellefdcec362022-03-25 09:28:10 +01001662 {
Amaury Denoyelleda6ad202022-04-12 11:41:04 +02001663 struct qcs_build_stream_trace_arg arg = {
1664 .len = frm->stream.len, .fin = fin,
1665 .offset = frm->stream.offset.key,
Amaury Denoyellefdcec362022-03-25 09:28:10 +01001666 };
Amaury Denoyelleda6ad202022-04-12 11:41:04 +02001667 TRACE_LEAVE(QMUX_EV_QCS_SEND|QMUX_EV_QCS_BUILD_STRM,
Amaury Denoyellefdcec362022-03-25 09:28:10 +01001668 qcc->conn, qcs, &arg);
1669 }
Amaury Denoyelle4f137572022-03-24 17:10:00 +01001670
Frédéric Lécaille578a7892021-09-13 16:13:00 +02001671 return total;
1672
1673 err:
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02001674 TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
Frédéric Lécaille578a7892021-09-13 16:13:00 +02001675 return -1;
1676}
1677
Ilya Shipitsin3b64a282022-07-29 22:26:53 +05001678/* Check after transferring data from qcs.tx.buf if FIN must be set on the next
Amaury Denoyellee53b4892022-07-08 17:19:40 +02001679 * STREAM frame for <qcs>.
1680 *
1681 * Returns true if FIN must be set else false.
1682 */
1683static int qcs_stream_fin(struct qcs *qcs)
1684{
1685 return qcs->flags & QC_SF_FIN_STREAM && !b_data(&qcs->tx.buf);
1686}
1687
Amaury Denoyelle0a1154a2023-01-06 17:43:11 +01001688/* Return true if <qcs> has data to send in new STREAM frames. */
1689static forceinline int qcs_need_sending(struct qcs *qcs)
1690{
1691 return b_data(&qcs->tx.buf) || qcs->tx.sent_offset < qcs->tx.offset ||
1692 qcs_stream_fin(qcs);
1693}
1694
Amaury Denoyelle54445d02022-03-10 16:44:14 +01001695/* This function must be called by the upper layer to inform about the sending
1696 * of a STREAM frame for <qcs> instance. The frame is of <data> length and on
1697 * <offset>.
1698 */
1699void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
1700{
Amaury Denoyelle05ce55e2022-03-08 10:35:42 +01001701 struct qcc *qcc = qcs->qcc;
1702 uint64_t diff;
Amaury Denoyelle6ccfa3c2022-03-10 16:45:53 +01001703
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02001704 TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
1705
Amaury Denoyelle6ccfa3c2022-03-10 16:45:53 +01001706 BUG_ON(offset > qcs->tx.sent_offset);
Amaury Denoyelle78fa5592022-06-10 15:18:12 +02001707 BUG_ON(offset + data > qcs->tx.offset);
Amaury Denoyelle6ccfa3c2022-03-10 16:45:53 +01001708
Amaury Denoyelle54445d02022-03-10 16:44:14 +01001709 /* check if the STREAM frame has already been notified. It can happen
1710 * for retransmission.
1711 */
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02001712 if (offset + data < qcs->tx.sent_offset) {
1713 TRACE_DEVEL("offset already notified", QMUX_EV_QCS_SEND, qcc->conn, qcs);
1714 goto out;
1715 }
Amaury Denoyelle6ccfa3c2022-03-10 16:45:53 +01001716
Amaury Denoyelle38e60062022-07-01 16:48:42 +02001717 qcs_idle_open(qcs);
1718
Amaury Denoyelle6ccfa3c2022-03-10 16:45:53 +01001719 diff = offset + data - qcs->tx.sent_offset;
Amaury Denoyellee53b4892022-07-08 17:19:40 +02001720 if (diff) {
1721 /* increase offset sum on connection */
1722 qcc->tx.sent_offsets += diff;
1723 BUG_ON_HOT(qcc->tx.sent_offsets > qcc->rfctl.md);
Amaury Denoyelle31d20572023-01-06 15:29:59 +01001724 if (qcc->tx.sent_offsets == qcc->rfctl.md) {
Amaury Denoyellee53b4892022-07-08 17:19:40 +02001725 qcc->flags |= QC_CF_BLK_MFCTL;
Amaury Denoyelle31d20572023-01-06 15:29:59 +01001726 TRACE_STATE("connection flow-control reached", QMUX_EV_QCS_SEND, qcc->conn);
1727 }
Amaury Denoyelle6ccfa3c2022-03-10 16:45:53 +01001728
Amaury Denoyellee53b4892022-07-08 17:19:40 +02001729 /* increase offset on stream */
1730 qcs->tx.sent_offset += diff;
1731 BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.msd);
1732 BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.offset);
Amaury Denoyelle31d20572023-01-06 15:29:59 +01001733 if (qcs->tx.sent_offset == qcs->tx.msd) {
Amaury Denoyellee53b4892022-07-08 17:19:40 +02001734 qcs->flags |= QC_SF_BLK_SFCTL;
Amaury Denoyelle31d20572023-01-06 15:29:59 +01001735 TRACE_STATE("stream flow-control reached", QMUX_EV_QCS_SEND, qcc->conn, qcs);
1736 }
Amaury Denoyelle05ce55e2022-03-08 10:35:42 +01001737
Amaury Denoyellea9de7ea2023-01-06 17:16:47 +01001738 /* If qcs.stream.buf is full, release it to the lower layer. */
Amaury Denoyellee53b4892022-07-08 17:19:40 +02001739 if (qcs->tx.offset == qcs->tx.sent_offset &&
1740 b_full(&qcs->stream->buf->buf)) {
1741 qc_stream_buf_release(qcs->stream);
Amaury Denoyellee53b4892022-07-08 17:19:40 +02001742 }
Amaury Denoyelle1bcb6952023-04-28 16:24:44 +02001743
1744 /* Add measurement for send rate. This is done at the MUX layer
1745 * to account only for STREAM frames without retransmission.
Amaury Denoyelle1bcb6952023-04-28 16:24:44 +02001746 */
Amaury Denoyellebc0adfa2023-04-28 16:46:11 +02001747 increment_send_rate(diff, 0);
Amaury Denoyellee53b4892022-07-08 17:19:40 +02001748 }
Amaury Denoyellea4569202022-04-15 17:29:25 +02001749
Amaury Denoyelle20f2a422023-01-03 14:39:24 +01001750 if (qcs->tx.offset == qcs->tx.sent_offset && !b_data(&qcs->tx.buf)) {
1751 /* Remove stream from send_list if all was sent. */
1752 LIST_DEL_INIT(&qcs->el_send);
1753 TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs);
1754
1755 if (qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) {
1756 /* Close stream locally. */
1757 qcs_close_local(qcs);
1758 /* Reset flag to not emit multiple FIN STREAM frames. */
1759 qcs->flags &= ~QC_SF_FIN_STREAM;
1760 }
Amaury Denoyellea4569202022-04-15 17:29:25 +02001761 }
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02001762
1763 out:
1764 TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
Amaury Denoyelle54445d02022-03-10 16:44:14 +01001765}
1766
Amaury Denoyelleb35e32e2023-05-03 09:50:25 +02001767/* Returns true if subscribe set, false otherwise. */
1768static int qcc_subscribe_send(struct qcc *qcc)
1769{
1770 struct connection *conn = qcc->conn;
Amaury Denoyelleb2e31d32023-05-10 11:57:40 +02001771
1772 /* Do not subscribe if lower layer in error. */
1773 if (conn->flags & CO_FL_ERROR)
1774 return 0;
1775
Amaury Denoyelleb35e32e2023-05-03 09:50:25 +02001776 if (qcc->wait_event.events & SUB_RETRY_SEND)
1777 return 1;
1778
1779 TRACE_DEVEL("subscribe for send", QMUX_EV_QCC_SEND, qcc->conn);
1780 conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_SEND, &qcc->wait_event);
1781 return 1;
1782}
1783
Amaury Denoyelle2c71fe52022-02-09 18:16:49 +01001784/* Wrapper for send on transport layer. Send a list of frames <frms> for the
1785 * connection <qcc>.
1786 *
1787 * Returns 0 if all data sent with success else non-zero.
1788 */
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02001789static int qcc_send_frames(struct qcc *qcc, struct list *frms)
Amaury Denoyelle2c71fe52022-02-09 18:16:49 +01001790{
Amaury Denoyelle4f137572022-03-24 17:10:00 +01001791 TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
1792
1793 if (LIST_ISEMPTY(frms)) {
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02001794 TRACE_DEVEL("no frames to send", QMUX_EV_QCC_SEND, qcc->conn);
1795 goto err;
Amaury Denoyelle4f137572022-03-24 17:10:00 +01001796 }
Frédéric Lécaille4e22f282022-03-18 18:38:19 +01001797
Amaury Denoyellecaa16542023-02-28 15:11:26 +01001798 if (!qc_send_mux(qcc->conn->handle.qc, frms)) {
Amaury Denoyelleb35e32e2023-05-03 09:50:25 +02001799 TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
Amaury Denoyelleb35e32e2023-05-03 09:50:25 +02001800 qcc_subscribe_send(qcc);
Amaury Denoyelle036cc5d2022-09-26 15:02:31 +02001801 goto err;
Amaury Denoyellecaa16542023-02-28 15:11:26 +01001802 }
Amaury Denoyellee9c4cc12022-03-04 15:29:53 +01001803
Amaury Denoyelledb5d1a12022-03-10 16:42:23 +01001804 /* If there is frames left at this stage, transport layer is blocked.
1805 * Subscribe on it to retry later.
1806 */
Amaury Denoyelle2c71fe52022-02-09 18:16:49 +01001807 if (!LIST_ISEMPTY(frms)) {
Amaury Denoyelleb35e32e2023-05-03 09:50:25 +02001808 TRACE_DEVEL("remaining frames to send", QMUX_EV_QCC_SEND, qcc->conn);
1809 qcc_subscribe_send(qcc);
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02001810 goto err;
Amaury Denoyelle2c71fe52022-02-09 18:16:49 +01001811 }
1812
Amaury Denoyelle3baab742022-08-11 18:35:55 +02001813 TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
Amaury Denoyelle843a1192022-07-04 11:44:38 +02001814 return 0;
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02001815
1816 err:
Amaury Denoyelle2ad41b82023-05-10 11:59:10 +02001817 TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn);
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02001818 return 1;
Amaury Denoyelle843a1192022-07-04 11:44:38 +02001819}
1820
1821/* Emit a RESET_STREAM on <qcs>.
1822 *
1823 * Returns 0 if the frame has been successfully sent else non-zero.
1824 */
1825static int qcs_send_reset(struct qcs *qcs)
1826{
1827 struct list frms = LIST_HEAD_INIT(frms);
1828 struct quic_frame *frm;
1829
1830 TRACE_ENTER(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
1831
Amaury Denoyelle40c24f12023-01-27 17:47:49 +01001832 frm = qc_frm_alloc(QUIC_FT_RESET_STREAM);
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02001833 if (!frm) {
1834 TRACE_LEAVE(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
Amaury Denoyelle843a1192022-07-04 11:44:38 +02001835 return 1;
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02001836 }
Amaury Denoyelle843a1192022-07-04 11:44:38 +02001837
Amaury Denoyelle843a1192022-07-04 11:44:38 +02001838 frm->reset_stream.id = qcs->id;
1839 frm->reset_stream.app_error_code = qcs->err;
1840 frm->reset_stream.final_size = qcs->tx.sent_offset;
1841
1842 LIST_APPEND(&frms, &frm->list);
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02001843 if (qcc_send_frames(qcs->qcc, &frms)) {
Amaury Denoyelle131f2d92023-05-09 14:10:55 +02001844 if (!LIST_ISEMPTY(&frms))
1845 qc_frm_free(&frm);
Amaury Denoyelle843a1192022-07-04 11:44:38 +02001846 TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
1847 return 1;
1848 }
1849
Amaury Denoyelle843a1192022-07-04 11:44:38 +02001850 qcs_close_local(qcs);
1851 qcs->flags &= ~QC_SF_TO_RESET;
1852
1853 TRACE_LEAVE(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
Amaury Denoyelle2c71fe52022-02-09 18:16:49 +01001854 return 0;
1855}
1856
Amaury Denoyelle663e8722022-12-09 14:58:28 +01001857/* Emit a STOP_SENDING on <qcs>.
1858 *
1859 * Returns 0 if the frame has been successfully sent else non-zero.
1860 */
1861static int qcs_send_stop_sending(struct qcs *qcs)
1862{
1863 struct list frms = LIST_HEAD_INIT(frms);
1864 struct quic_frame *frm;
1865 struct qcc *qcc = qcs->qcc;
1866
1867 TRACE_ENTER(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
1868
1869 /* RFC 9000 3.3. Permitted Frame Types
1870 *
1871 * A
1872 * receiver MAY send a STOP_SENDING frame in any state where it has not
1873 * received a RESET_STREAM frame -- that is, states other than "Reset
1874 * Recvd" or "Reset Read". However, there is little value in sending a
1875 * STOP_SENDING frame in the "Data Recvd" state, as all stream data has
1876 * been received. A sender could receive either of these two types of
1877 * frames in any state as a result of delayed delivery of packets.¶
1878 */
1879 if (qcs_is_close_remote(qcs)) {
1880 TRACE_STATE("skip STOP_SENDING on remote already closed", QMUX_EV_QCS_SEND, qcc->conn, qcs);
1881 goto done;
1882 }
1883
Amaury Denoyelle40c24f12023-01-27 17:47:49 +01001884 frm = qc_frm_alloc(QUIC_FT_STOP_SENDING);
Amaury Denoyelle663e8722022-12-09 14:58:28 +01001885 if (!frm) {
1886 TRACE_LEAVE(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
1887 return 1;
1888 }
1889
Amaury Denoyelle663e8722022-12-09 14:58:28 +01001890 frm->stop_sending.id = qcs->id;
1891 frm->stop_sending.app_error_code = qcs->err;
1892
1893 LIST_APPEND(&frms, &frm->list);
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02001894 if (qcc_send_frames(qcs->qcc, &frms)) {
Amaury Denoyelle131f2d92023-05-09 14:10:55 +02001895 if (!LIST_ISEMPTY(&frms))
1896 qc_frm_free(&frm);
Amaury Denoyelle663e8722022-12-09 14:58:28 +01001897 TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
1898 return 1;
1899 }
1900
1901 done:
1902 qcs->flags &= ~QC_SF_TO_STOP_SENDING;
1903
1904 TRACE_LEAVE(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
1905 return 0;
1906}
1907
/* Used internally by qcc_io_send function. Proceed to send for <qcs>. This will
 * transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame
 * is then generated and inserted in <frms> list.
 *
 * Returns the total bytes transferred between qcs and quic_stream buffers. Can
 * be null if out buffer cannot be allocated. On error a negative error code is
 * used.
 */
static int qcs_send(struct qcs *qcs, struct list *frms)
{
	struct qcc *qcc = qcs->qcc;
	struct buffer *buf = &qcs->tx.buf;
	struct buffer *out = qc_stream_buf_get(qcs->stream);
	int xfer = 0, buf_avail;
	char fin = 0;

	TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);

	/* Cannot send STREAM on remote unidirectional streams. */
	BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_remote(qcc, qcs->id));

	if (b_data(buf)) {
		/* Allocate <out> buffer if not already done. */
		if (!out) {
			/* Connection-wide buffer limit already hit earlier:
			 * report no transfer without retrying allocation.
			 */
			if (qcc->flags & QC_CF_CONN_FULL)
				goto out;

			out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset,
			                          &buf_avail);
			if (!out) {
				/* <buf_avail> non-null means room was available
				 * but allocation itself failed: fatal error.
				 */
				if (buf_avail) {
					TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
					goto err;
				}

				/* Otherwise the per-connection buffer limit is
				 * reached: mark it and retry later.
				 */
				TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs);
				qcc->flags |= QC_CF_CONN_FULL;
				goto out;
			}
		}

		/* Transfer data from <buf> to <out>. */
		xfer = qcs_xfer_data(qcs, out, buf);
		if (xfer < 0)
			goto err;

		if (xfer > 0) {
			/* Room was freed in qcs buffer: wake up the upper
			 * layer and clear the MUX-room blocking flag.
			 */
			qcs_notify_send(qcs);
			qcs->flags &= ~QC_SF_BLK_MROOM;
		}

		/* Account transferred bytes against stream and connection
		 * flow-control; offsets must never exceed the peer's limits.
		 */
		qcs->tx.offset += xfer;
		BUG_ON_HOT(qcs->tx.offset > qcs->tx.msd);
		qcc->tx.offsets += xfer;
		BUG_ON_HOT(qcc->tx.offsets > qcc->rfctl.md);

		/* out buffer cannot be emptied if qcs offsets differ. */
		BUG_ON(!b_data(out) && qcs->tx.sent_offset != qcs->tx.offset);
	}

	/* FIN is set if all incoming data were transferred. */
	fin = qcs_stream_fin(qcs);

	/* Build a new STREAM frame with <out> buffer. */
	if (qcs->tx.sent_offset != qcs->tx.offset || fin) {
		/* Skip STREAM frame allocation if already subscribed for send.
		 * Happens on sendto transient error or network congestion.
		 */
		if (qcc->wait_event.events & SUB_RETRY_SEND) {
			TRACE_DEVEL("already subscribed for sending",
			            QMUX_EV_QCS_SEND, qcc->conn, qcs);
			goto err;
		}

		if (qcs_build_stream_frm(qcs, out, fin, frms) < 0)
			goto err;
	}

 out:
	TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
	return xfer;

 err:
	TRACE_DEVEL("leaving on error", QMUX_EV_QCS_SEND, qcc->conn, qcs);
	return -1;
}
1994
/* Proceed to sending. Loop through all available streams for the <qcc>
 * instance and try to send as much as possible.
 *
 * Two passes are done: a first one over <qcc.send_list> which also emits
 * pending STOP_SENDING/RESET_STREAM frames, then a retry loop as long as
 * the transport accepts frames and connection flow-control allows it.
 *
 * Returns the total of bytes sent to the transport layer.
 */
static int qcc_io_send(struct qcc *qcc)
{
	struct list frms = LIST_HEAD_INIT(frms);
	/* Temporary list for QCS on error. */
	struct list qcs_failed = LIST_HEAD_INIT(qcs_failed);
	struct qcs *qcs, *qcs_tmp, *first_qcs = NULL;
	int ret, total = 0;

	TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);

	/* TODO if socket in transient error, sending should be temporarily
	 * disabled for all frames. However, checking for send subscription is
	 * not valid as this may be caused by a congestion error which only
	 * apply for STREAM frames.
	 */

	/* Check for transport error. */
	if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) {
		TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn);
		goto out;
	}

	/* Check for locally detected connection error. */
	if (qcc->flags & QC_CF_ERRL) {
		/* Prepare a CONNECTION_CLOSE if not already done. */
		if (!(qcc->flags & QC_CF_ERRL_DONE)) {
			TRACE_DATA("report a connection error", QMUX_EV_QCC_SEND|QMUX_EV_QCC_ERR, qcc->conn);
			quic_set_connection_close(qcc->conn->handle.qc, qcc->err);
			qcc->flags |= QC_CF_ERRL_DONE;
		}
		goto out;
	}

	/* Socket already shut for writes: flag the connection on error. */
	if (qcc->conn->flags & CO_FL_SOCK_WR_SH) {
		qcc->conn->flags |= CO_FL_ERROR;
		TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn);
		goto out;
	}

	/* First flush pending local flow-control frames (MAX_DATA & co). */
	if (!LIST_ISEMPTY(&qcc->lfctl.frms)) {
		if (qcc_send_frames(qcc, &qcc->lfctl.frms)) {
			TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
			goto out;
		}
	}

	/* Send STREAM/STOP_SENDING/RESET_STREAM data for registered streams. */
	list_for_each_entry_safe(qcs, qcs_tmp, &qcc->send_list, el_send) {
		/* Check if all QCS were processed. */
		if (qcs == first_qcs)
			break;

		/* Stream must not be present in send_list if it has nothing to send. */
		BUG_ON(!(qcs->flags & (QC_SF_TO_STOP_SENDING|QC_SF_TO_RESET)) &&
		       !qcs_need_sending(qcs));

		/* Each STOP_SENDING/RESET_STREAM frame is sent individually to
		 * guarantee its emission.
		 *
		 * TODO multiplex several frames in same datagram to optimize sending
		 */
		if (qcs->flags & QC_SF_TO_STOP_SENDING) {
			if (qcs_send_stop_sending(qcs))
				goto sent_done;

			/* Remove stream from send_list if it had only STOP_SENDING
			 * to send.
			 */
			if (!(qcs->flags & QC_SF_TO_RESET) && !qcs_need_sending(qcs)) {
				LIST_DEL_INIT(&qcs->el_send);
				continue;
			}
		}

		if (qcs->flags & QC_SF_TO_RESET) {
			if (qcs_send_reset(qcs))
				goto sent_done;

			/* RFC 9000 3.3. Permitted Frame Types
			 *
			 * A sender MUST NOT send
			 * a STREAM or STREAM_DATA_BLOCKED frame for a stream in the
			 * "Reset Sent" state or any terminal state -- that is, after
			 * sending a RESET_STREAM frame.
			 */
			LIST_DEL_INIT(&qcs->el_send);
			continue;
		}

		/* Skip STREAM emission if blocked on connection (MFCTL) or
		 * stream (SFCTL) flow-control.
		 */
		if (!(qcc->flags & QC_CF_BLK_MFCTL) &&
		    !(qcs->flags & QC_SF_BLK_SFCTL)) {
			if ((ret = qcs_send(qcs, &frms)) < 0) {
				/* Temporarily remove QCS from send-list. */
				LIST_DEL_INIT(&qcs->el_send);
				LIST_APPEND(&qcs_failed, &qcs->el_send);
				continue;
			}

			total += ret;
			if (ret) {
				/* Move QCS with some bytes transferred at the
				 * end of send-list for next iterations.
				 */
				LIST_DEL_INIT(&qcs->el_send);
				LIST_APPEND(&qcc->send_list, &qcs->el_send);
				/* Remember first moved QCS as checkpoint to interrupt loop */
				if (!first_qcs)
					first_qcs = qcs;
			}
		}
	}

	/* Retry sending until no frame to send, data rejected or connection
	 * flow-control limit reached.
	 */
	while (qcc_send_frames(qcc, &frms) == 0 && !(qcc->flags & QC_CF_BLK_MFCTL)) {
		/* Reloop over <qcc.send_list>. Useful for streams which have
		 * fulfilled their qc_stream_desc buf and have now released it.
		 */
		list_for_each_entry_safe(qcs, qcs_tmp, &qcc->send_list, el_send) {
			/* Only streams blocked on flow-control or waiting on a
			 * new qc_stream_desc should be present in send_list as
			 * long as transport layer can handle all data.
			 */
			BUG_ON(qcs->stream->buf && !(qcs->flags & QC_SF_BLK_SFCTL));

			if (!(qcs->flags & QC_SF_BLK_SFCTL)) {
				if ((ret = qcs_send(qcs, &frms)) < 0) {
					LIST_DEL_INIT(&qcs->el_send);
					LIST_APPEND(&qcs_failed, &qcs->el_send);
					continue;
				}

				total += ret;
			}
		}
	}

 sent_done:
	/* Deallocate frames that the transport layer has rejected. */
	if (!LIST_ISEMPTY(&frms)) {
		struct quic_frame *frm, *frm2;

		list_for_each_entry_safe(frm, frm2, &frms, list)
			qc_frm_free(&frm);
	}

	/* Re-insert on-error QCS at the end of the send-list. */
	if (!LIST_ISEMPTY(&qcs_failed)) {
		list_for_each_entry_safe(qcs, qcs_tmp, &qcs_failed, el_send) {
			LIST_DEL_INIT(&qcs->el_send);
			LIST_APPEND(&qcc->send_list, &qcs->el_send);
		}

		/* Reschedule ourselves to retry failed streams, unless
		 * blocked on connection flow-control.
		 */
		if (!(qcc->flags & QC_CF_BLK_MFCTL))
			tasklet_wakeup(qcc->wait_event.tasklet);
	}

 out:
	/* Propagate a transport-layer error into the MUX flags once. */
	if (qcc->conn->flags & CO_FL_ERROR && !(qcc->flags & QC_CF_ERR_CONN)) {
		TRACE_ERROR("error reported by transport layer",
		            QMUX_EV_QCC_SEND, qcc->conn);
		qcc->flags |= QC_CF_ERR_CONN;
	}

	TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
	return total;
}
2168
Amaury Denoyelle37c2e4a2022-05-16 13:54:59 +02002169/* Proceed on receiving. Loop through all streams from <qcc> and use decode_qcs
2170 * operation.
2171 *
2172 * Returns 0 on success else non-zero.
2173 */
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02002174static int qcc_io_recv(struct qcc *qcc)
Amaury Denoyelle37c2e4a2022-05-16 13:54:59 +02002175{
2176 struct eb64_node *node;
2177 struct qcs *qcs;
2178
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02002179 TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
Amaury Denoyellee1cad8b2022-05-23 18:52:11 +02002180
Amaury Denoyelle51f116d2023-05-04 15:49:02 +02002181 if (qcc->flags & QC_CF_ERRL) {
2182 TRACE_DATA("connection on error", QMUX_EV_QCC_RECV, qcc->conn);
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02002183 TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
Amaury Denoyelle5c4373a2022-05-24 14:47:48 +02002184 return 0;
2185 }
2186
Amaury Denoyelle37c2e4a2022-05-16 13:54:59 +02002187 node = eb64_first(&qcc->streams_by_id);
2188 while (node) {
Amaury Denoyellef8db5aa2022-05-24 15:26:07 +02002189 uint64_t id;
2190
Amaury Denoyelle37c2e4a2022-05-16 13:54:59 +02002191 qcs = eb64_entry(node, struct qcs, by_id);
Amaury Denoyellef8db5aa2022-05-24 15:26:07 +02002192 id = qcs->id;
Amaury Denoyelle37c2e4a2022-05-16 13:54:59 +02002193
Amaury Denoyellef8db5aa2022-05-24 15:26:07 +02002194 if (!ncb_data(&qcs->rx.ncbuf, 0) || (qcs->flags & QC_SF_DEM_FULL)) {
Amaury Denoyelle37c2e4a2022-05-16 13:54:59 +02002195 node = eb64_next(node);
2196 continue;
2197 }
2198
Amaury Denoyellef8db5aa2022-05-24 15:26:07 +02002199 if (quic_stream_is_uni(id) && quic_stream_is_local(qcc, id)) {
Amaury Denoyelle37c2e4a2022-05-16 13:54:59 +02002200 node = eb64_next(node);
2201 continue;
2202 }
2203
2204 qcc_decode_qcs(qcc, qcs);
2205 node = eb64_next(node);
2206 }
2207
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02002208 TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
Amaury Denoyelle37c2e4a2022-05-16 13:54:59 +02002209 return 0;
2210}
2211
Amaury Denoyellec1a6dfd2022-07-08 14:04:21 +02002212
2213/* Release all streams which have their transfer operation achieved.
Amaury Denoyelle6a4aebf2022-02-01 10:16:05 +01002214 *
Amaury Denoyellec1a6dfd2022-07-08 14:04:21 +02002215 * Returns true if at least one stream is released.
Amaury Denoyelle6a4aebf2022-02-01 10:16:05 +01002216 */
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02002217static int qcc_purge_streams(struct qcc *qcc)
Amaury Denoyelle2873a312021-12-08 14:42:55 +01002218{
2219 struct eb64_node *node;
2220 int release = 0;
2221
Amaury Denoyelle3baab742022-08-11 18:35:55 +02002222 TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
Amaury Denoyellec1a6dfd2022-07-08 14:04:21 +02002223
Amaury Denoyelle2873a312021-12-08 14:42:55 +01002224 node = eb64_first(&qcc->streams_by_id);
2225 while (node) {
Amaury Denoyellee4301da2022-04-19 17:59:50 +02002226 struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
Amaury Denoyelle2873a312021-12-08 14:42:55 +01002227 node = eb64_next(node);
2228
Amaury Denoyelle38e60062022-07-01 16:48:42 +02002229 /* Release not attached closed streams. */
2230 if (qcs->st == QC_SS_CLO && !qcs_sc(qcs)) {
Amaury Denoyelle047d86a2022-08-10 16:42:35 +02002231 TRACE_STATE("purging closed stream", QMUX_EV_QCC_WAKE, qcs->qcc->conn, qcs);
Amaury Denoyelle38e60062022-07-01 16:48:42 +02002232 qcs_destroy(qcs);
2233 release = 1;
2234 continue;
2235 }
2236
Amaury Denoyellec1a6dfd2022-07-08 14:04:21 +02002237 /* Release detached streams with empty buffer. */
Amaury Denoyelle2873a312021-12-08 14:42:55 +01002238 if (qcs->flags & QC_SF_DETACH) {
Amaury Denoyelle20d1f842022-07-11 11:23:17 +02002239 if (qcs_is_close_local(qcs)) {
Amaury Denoyelle047d86a2022-08-10 16:42:35 +02002240 TRACE_STATE("purging detached stream", QMUX_EV_QCC_WAKE, qcs->qcc->conn, qcs);
Amaury Denoyelle2873a312021-12-08 14:42:55 +01002241 qcs_destroy(qcs);
2242 release = 1;
Amaury Denoyellec1a6dfd2022-07-08 14:04:21 +02002243 continue;
Amaury Denoyelle2873a312021-12-08 14:42:55 +01002244 }
Amaury Denoyelle2873a312021-12-08 14:42:55 +01002245 }
2246 }
2247
Amaury Denoyelle3baab742022-08-11 18:35:55 +02002248 TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
Amaury Denoyelle2873a312021-12-08 14:42:55 +01002249 return release;
2250}
2251
/* Execute application layer shutdown. If this operation is not defined, a
 * CONNECTION_CLOSE will be prepared as a fallback. This function is protected
 * against multiple invocation with the flag QC_CF_APP_SHUT.
 */
static void qcc_shutdown(struct qcc *qcc)
{
	TRACE_ENTER(QMUX_EV_QCC_END, qcc->conn);

	/* No graceful shutdown possible if the connection is already on
	 * error; only mark the shutdown as done.
	 */
	if (qcc->flags & (QC_CF_ERR_CONN|QC_CF_ERRL)) {
		TRACE_DATA("connection on error", QMUX_EV_QCC_END, qcc->conn);
		goto out;
	}

	/* Shutdown already performed earlier. */
	if (qcc->flags & QC_CF_APP_SHUT)
		goto out;

	TRACE_STATE("perform graceful shutdown", QMUX_EV_QCC_END, qcc->conn);
	if (qcc->app_ops && qcc->app_ops->shutdown) {
		/* Let the application layer (e.g. HTTP/3) emit its own
		 * closing sequence, then flush it immediately.
		 */
		qcc->app_ops->shutdown(qcc->ctx);
		qcc_io_send(qcc);
	}
	else {
		/* Fallback: report a NO_ERROR transport code. */
		qcc->err = quic_err_transport(QC_ERR_NO_ERROR);
	}

	/* Register "no error" code at transport layer. Do not use
	 * quic_set_connection_close() as retransmission may be performed to
	 * finalize transfers. Do not overwrite quic-conn existing code if
	 * already set.
	 *
	 * TODO implement a wrapper function for this in quic-conn module
	 */
	if (!(qcc->conn->handle.qc->flags & QUIC_FL_CONN_IMMEDIATE_CLOSE))
		qcc->conn->handle.qc->err = qcc->err;

 out:
	qcc->flags |= QC_CF_APP_SHUT;
	TRACE_LEAVE(QMUX_EV_QCC_END, qcc->conn);
}
2291
Amaury Denoyelle35542ce2023-05-03 18:16:40 +02002292/* Loop through all qcs from <qcc>. Report error on stream endpoint if
2293 * connection on error and wake them.
2294 */
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02002295static int qcc_wake_some_streams(struct qcc *qcc)
Amaury Denoyelle35542ce2023-05-03 18:16:40 +02002296{
2297 struct qcs *qcs;
2298 struct eb64_node *node;
2299
2300 TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn);
2301
2302 for (node = eb64_first(&qcc->streams_by_id); node;
2303 node = eb64_next(node)) {
2304 qcs = eb64_entry(node, struct qcs, by_id);
2305
2306 if (!qcs_sc(qcs))
2307 continue;
2308
Amaury Denoyelle5f67b172023-05-04 18:52:42 +02002309 if (qcc->flags & (QC_CF_ERR_CONN|QC_CF_ERRL)) {
Amaury Denoyelle35542ce2023-05-03 18:16:40 +02002310 TRACE_POINT(QMUX_EV_QCC_WAKE, qcc->conn, qcs);
2311 se_fl_set_error(qcs->sd);
2312 qcs_alert(qcs);
2313 }
2314 }
2315
2316 return 0;
2317}
2318
/* Conduct operations which should be made for <qcc> connection after
 * input/output. Most notably, closed streams are purged which may leave the
 * connection as ready to be released.
 *
 * Returns 1 if <qcc> must be released else 0.
 */
static int qcc_io_process(struct qcc *qcc)
{
	qcc_purge_streams(qcc);

	/* Check if a soft-stop is in progress.
	 *
	 * TODO this is relevant for frontend connections only.
	 */
	if (unlikely(qcc->proxy->flags & (PR_FL_DISABLED|PR_FL_STOPPED))) {
		int close = 1;

		/* If using listener socket, soft-stop is not supported. The
		 * connection must be closed immediately.
		 */
		if (!qc_test_fd(qcc->conn->handle.qc)) {
			TRACE_DEVEL("proxy disabled with listener socket, closing connection", QMUX_EV_QCC_WAKE, qcc->conn);
			qcc->conn->flags |= (CO_FL_SOCK_RD_SH|CO_FL_SOCK_WR_SH);
			qcc_io_send(qcc);
			goto out;
		}

		TRACE_DEVEL("proxy disabled, prepare connection soft-stop", QMUX_EV_QCC_WAKE, qcc->conn);

		/* If a close-spread-time option is set, we want to avoid
		 * closing all the active HTTP3 connections at once so we add a
		 * random factor that will spread the closing.
		 */
		if (tick_isset(global.close_spread_end)) {
			int remaining_window = tick_remain(now_ms, global.close_spread_end);
			if (remaining_window) {
				/* This should increase the closing rate the
				 * further along the window we are. */
				close = (remaining_window <= statistical_prng_range(global.close_spread_time));
			}
		}
		else if (global.tune.options & GTUNE_DISABLE_ACTIVE_CLOSE) {
			close = 0; /* let the client close his connection himself */
		}

		if (close)
			qcc_shutdown(qcc);
	}

	/* Report error if set on stream endpoint layer. */
	if (qcc->flags & (QC_CF_ERR_CONN|QC_CF_ERRL))
		qcc_wake_some_streams(qcc);

 out:
	/* Let the caller know whether the connection can now be freed. */
	if (qcc_is_dead(qcc))
		return 1;

	return 0;
}
2378
/* release function. This one should be called to free all resources allocated
 * to the mux. It performs a final graceful shutdown, destroys the timeout
 * task, every remaining stream, pending flow-control frames, then the
 * application layer context, the qcc itself and finally the connection.
 */
static void qcc_release(struct qcc *qcc)
{
	struct connection *conn = qcc->conn;
	struct eb64_node *node;

	TRACE_ENTER(QMUX_EV_QCC_END, conn);

	/* Emit a closing sequence first if not already done. */
	qcc_shutdown(qcc);

	if (qcc->task) {
		task_destroy(qcc->task);
		qcc->task = NULL;
	}

	/* liberate remaining qcs instances */
	node = eb64_first(&qcc->streams_by_id);
	while (node) {
		struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
		/* Advance before qcs_free() removes the node from the tree. */
		node = eb64_next(node);
		qcs_free(qcs);
	}

	tasklet_free(qcc->wait_event.tasklet);
	/* Drop any remaining transport-layer subscription. */
	if (conn && qcc->wait_event.events) {
		conn->xprt->unsubscribe(conn, conn->xprt_ctx,
		                        qcc->wait_event.events,
		                        &qcc->wait_event);
	}

	/* Free flow-control frames which were never sent. */
	while (!LIST_ISEMPTY(&qcc->lfctl.frms)) {
		struct quic_frame *frm = LIST_ELEM(qcc->lfctl.frms.n, struct quic_frame *, list);
		qc_frm_free(&frm);
	}

	if (qcc->app_ops && qcc->app_ops->release)
		qcc->app_ops->release(qcc->ctx);
	TRACE_PROTO("application layer released", QMUX_EV_QCC_END, conn);

	pool_free(pool_head_qcc, qcc);

	/* Detach and free the connection itself last. */
	if (conn) {
		LIST_DEL_INIT(&conn->stopping_list);

		conn->handle.qc->conn = NULL;
		conn->mux = NULL;
		conn->ctx = NULL;

		TRACE_DEVEL("freeing conn", QMUX_EV_QCC_END, conn);

		conn_stop_tracking(conn);
		conn_full_close(conn);
		if (conn->destroy_cb)
			conn->destroy_cb(conn);
		conn_free(conn);
	}

	TRACE_LEAVE(QMUX_EV_QCC_END);
}
2440
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02002441struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002442{
Amaury Denoyelle769e9ff2021-10-05 11:43:50 +02002443 struct qcc *qcc = ctx;
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002444
Amaury Denoyelle3baab742022-08-11 18:35:55 +02002445 TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002446
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02002447 qcc_io_send(qcc);
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002448
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02002449 qcc_io_recv(qcc);
Amaury Denoyelle37c2e4a2022-05-16 13:54:59 +02002450
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02002451 if (qcc_io_process(qcc)) {
Amaury Denoyelle14dbb842023-01-24 18:19:47 +01002452 TRACE_STATE("releasing dead connection", QMUX_EV_QCC_WAKE, qcc->conn);
2453 goto release;
2454 }
Amaury Denoyelle5fc05d12022-07-25 14:58:48 +02002455
2456 qcc_refresh_timeout(qcc);
2457
Amaury Denoyelled3973852022-07-25 14:56:54 +02002458 end:
Amaury Denoyelle3baab742022-08-11 18:35:55 +02002459 TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
2460 return NULL;
Amaury Denoyelle4f137572022-03-24 17:10:00 +01002461
Amaury Denoyelle3baab742022-08-11 18:35:55 +02002462 release:
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02002463 qcc_release(qcc);
Amaury Denoyelle3baab742022-08-11 18:35:55 +02002464 TRACE_LEAVE(QMUX_EV_QCC_WAKE);
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002465 return NULL;
2466}
2467
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02002468static struct task *qcc_timeout_task(struct task *t, void *ctx, unsigned int state)
Amaury Denoyelleaebe26f2022-01-13 16:28:06 +01002469{
2470 struct qcc *qcc = ctx;
2471 int expired = tick_is_expired(t->expire, now_ms);
2472
Amaury Denoyelle4f137572022-03-24 17:10:00 +01002473 TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc ? qcc->conn : NULL);
Amaury Denoyelleaebe26f2022-01-13 16:28:06 +01002474
2475 if (qcc) {
2476 if (!expired) {
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02002477 TRACE_DEVEL("not expired", QMUX_EV_QCC_WAKE, qcc->conn);
2478 goto requeue;
Amaury Denoyelleaebe26f2022-01-13 16:28:06 +01002479 }
2480
2481 if (!qcc_may_expire(qcc)) {
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02002482 TRACE_DEVEL("cannot expired", QMUX_EV_QCC_WAKE, qcc->conn);
Amaury Denoyelleaebe26f2022-01-13 16:28:06 +01002483 t->expire = TICK_ETERNITY;
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02002484 goto requeue;
Amaury Denoyelleaebe26f2022-01-13 16:28:06 +01002485 }
2486 }
2487
Amaury Denoyelleaebe26f2022-01-13 16:28:06 +01002488 task_destroy(t);
Amaury Denoyelleea3e0352022-02-21 10:05:16 +01002489
Amaury Denoyelle4f137572022-03-24 17:10:00 +01002490 if (!qcc) {
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02002491 TRACE_DEVEL("no more qcc", QMUX_EV_QCC_WAKE);
2492 goto out;
Amaury Denoyelle4f137572022-03-24 17:10:00 +01002493 }
Amaury Denoyelleea3e0352022-02-21 10:05:16 +01002494
Amaury Denoyelleca7a0632023-10-26 18:17:29 +02002495 /* Mark timeout as triggered by setting task to NULL. */
Amaury Denoyelleaebe26f2022-01-13 16:28:06 +01002496 qcc->task = NULL;
2497
Amaury Denoyelle5fc05d12022-07-25 14:58:48 +02002498 /* TODO depending on the timeout condition, different shutdown mode
2499 * should be used. For http keep-alive or disabled proxy, a graceful
2500 * shutdown should occurs. For all other cases, an immediate close
2501 * seems legitimate.
2502 */
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02002503 if (qcc_is_dead(qcc)) {
2504 TRACE_STATE("releasing dead connection", QMUX_EV_QCC_WAKE, qcc->conn);
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02002505 qcc_release(qcc);
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02002506 }
Amaury Denoyelleaebe26f2022-01-13 16:28:06 +01002507
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02002508 out:
Amaury Denoyelle4f137572022-03-24 17:10:00 +01002509 TRACE_LEAVE(QMUX_EV_QCC_WAKE);
Amaury Denoyelleaebe26f2022-01-13 16:28:06 +01002510 return NULL;
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02002511
2512 requeue:
2513 TRACE_LEAVE(QMUX_EV_QCC_WAKE);
2514 return t;
Amaury Denoyelleaebe26f2022-01-13 16:28:06 +01002515}
2516
Amaury Denoyelle0f61e4f2023-05-30 14:51:57 +02002517static int qmux_init(struct connection *conn, struct proxy *prx,
2518 struct session *sess, struct buffer *input)
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002519{
Amaury Denoyelledeed7772021-12-03 11:36:46 +01002520 struct qcc *qcc;
Amaury Denoyelle6ea78192022-03-07 15:47:02 +01002521 struct quic_transport_params *lparams, *rparams;
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002522
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02002523 TRACE_ENTER(QMUX_EV_QCC_NEW);
2524
Amaury Denoyelledeed7772021-12-03 11:36:46 +01002525 qcc = pool_alloc(pool_head_qcc);
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02002526 if (!qcc) {
2527 TRACE_ERROR("alloc failure", QMUX_EV_QCC_NEW);
Amaury Denoyelledeed7772021-12-03 11:36:46 +01002528 goto fail_no_qcc;
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02002529 }
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002530
Amaury Denoyelledeed7772021-12-03 11:36:46 +01002531 qcc->conn = conn;
2532 conn->ctx = qcc;
Amaury Denoyellec603de42022-07-25 11:21:46 +02002533 qcc->nb_hreq = qcc->nb_sc = 0;
Amaury Denoyellece1f30d2022-02-01 15:14:24 +01002534 qcc->flags = 0;
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002535
Amaury Denoyelledeed7772021-12-03 11:36:46 +01002536 qcc->app_ops = NULL;
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002537
Amaury Denoyelledeed7772021-12-03 11:36:46 +01002538 qcc->streams_by_id = EB_ROOT_UNIQUE;
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002539
Amaury Denoyellef3b0ba72021-12-08 15:12:01 +01002540 /* Server parameters, params used for RX flow control. */
Willy Tarreau784b8682022-04-11 14:18:10 +02002541 lparams = &conn->handle.qc->rx.params;
Amaury Denoyellef3b0ba72021-12-08 15:12:01 +01002542
Amaury Denoyelleb9e06402022-06-10 15:16:21 +02002543 qcc->tx.sent_offsets = qcc->tx.offsets = 0;
Amaury Denoyellef3b0ba72021-12-08 15:12:01 +01002544
Amaury Denoyellec985cb12022-05-16 14:29:59 +02002545 LIST_INIT(&qcc->lfctl.frms);
Amaury Denoyelle78396e52022-03-21 17:13:32 +01002546 qcc->lfctl.ms_bidi = qcc->lfctl.ms_bidi_init = lparams->initial_max_streams_bidi;
Amaury Denoyellebf3c2082022-08-16 11:29:08 +02002547 qcc->lfctl.ms_uni = lparams->initial_max_streams_uni;
Amaury Denoyelle44d09122022-04-26 11:21:10 +02002548 qcc->lfctl.msd_bidi_l = lparams->initial_max_stream_data_bidi_local;
2549 qcc->lfctl.msd_bidi_r = lparams->initial_max_stream_data_bidi_remote;
Amaury Denoyelle176174f2022-10-21 17:02:18 +02002550 qcc->lfctl.msd_uni_r = lparams->initial_max_stream_data_uni;
Amaury Denoyelle78396e52022-03-21 17:13:32 +01002551 qcc->lfctl.cl_bidi_r = 0;
Amaury Denoyellec055e302022-02-07 16:09:06 +01002552
Amaury Denoyellec830e1e2022-05-16 16:19:59 +02002553 qcc->lfctl.md = qcc->lfctl.md_init = lparams->initial_max_data;
Amaury Denoyelled46b0f52022-05-20 15:05:07 +02002554 qcc->lfctl.offsets_recv = qcc->lfctl.offsets_consume = 0;
Amaury Denoyellec830e1e2022-05-16 16:19:59 +02002555
Willy Tarreau784b8682022-04-11 14:18:10 +02002556 rparams = &conn->handle.qc->tx.params;
Amaury Denoyelle05ce55e2022-03-08 10:35:42 +01002557 qcc->rfctl.md = rparams->initial_max_data;
Amaury Denoyelle6ea78192022-03-07 15:47:02 +01002558 qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local;
2559 qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;
Amaury Denoyelle176174f2022-10-21 17:02:18 +02002560 qcc->rfctl.msd_uni_l = rparams->initial_max_stream_data_uni;
Amaury Denoyelle6ea78192022-03-07 15:47:02 +01002561
Amaury Denoyellea509ffb2022-07-04 15:50:33 +02002562 if (conn_is_back(conn)) {
2563 qcc->next_bidi_l = 0x00;
2564 qcc->largest_bidi_r = 0x01;
2565 qcc->next_uni_l = 0x02;
2566 qcc->largest_uni_r = 0x03;
2567 }
2568 else {
2569 qcc->largest_bidi_r = 0x00;
2570 qcc->next_bidi_l = 0x01;
2571 qcc->largest_uni_r = 0x02;
2572 qcc->next_uni_l = 0x03;
2573 }
2574
Amaury Denoyelledeed7772021-12-03 11:36:46 +01002575 qcc->wait_event.tasklet = tasklet_new();
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02002576 if (!qcc->wait_event.tasklet) {
2577 TRACE_ERROR("taslket alloc failure", QMUX_EV_QCC_NEW);
Amaury Denoyelledeed7772021-12-03 11:36:46 +01002578 goto fail_no_tasklet;
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02002579 }
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002580
Amaury Denoyelle20f2a422023-01-03 14:39:24 +01002581 LIST_INIT(&qcc->send_list);
Amaury Denoyelle1b2dba52022-04-15 17:32:04 +02002582
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02002583 qcc->wait_event.tasklet->process = qcc_io_cb;
Amaury Denoyelledeed7772021-12-03 11:36:46 +01002584 qcc->wait_event.tasklet->context = qcc;
Frédéric Lécaillef27b66f2022-03-18 22:49:22 +01002585 qcc->wait_event.events = 0;
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002586
Amaury Denoyelle07bf8f42022-07-22 16:16:03 +02002587 qcc->proxy = prx;
Amaury Denoyelleaebe26f2022-01-13 16:28:06 +01002588 /* haproxy timeouts */
Amaury Denoyelleeb7d3202023-02-08 15:55:24 +01002589 if (conn_is_back(qcc->conn)) {
2590 qcc->timeout = prx->timeout.server;
2591 qcc->shut_timeout = tick_isset(prx->timeout.serverfin) ?
2592 prx->timeout.serverfin : prx->timeout.server;
2593 }
2594 else {
2595 qcc->timeout = prx->timeout.client;
2596 qcc->shut_timeout = tick_isset(prx->timeout.clientfin) ?
2597 prx->timeout.clientfin : prx->timeout.client;
2598 }
2599
Amaury Denoyelleca7a0632023-10-26 18:17:29 +02002600 /* Always allocate task even if timeout is unset. In MUX code, if task
2601 * is NULL, it indicates that a timeout has stroke earlier.
2602 */
2603 qcc->task = task_new_here();
2604 if (!qcc->task) {
2605 TRACE_ERROR("timeout task alloc failure", QMUX_EV_QCC_NEW);
2606 goto fail_no_timeout_task;
Amaury Denoyellea3daaec2022-04-21 16:29:27 +02002607 }
Amaury Denoyelleca7a0632023-10-26 18:17:29 +02002608 qcc->task->process = qcc_timeout_task;
2609 qcc->task->context = qcc;
2610 qcc->task->expire = tick_add_ifset(now_ms, qcc->timeout);
2611
Amaury Denoyellebd6ec1b2022-07-25 11:53:18 +02002612 qcc_reset_idle_start(qcc);
Amaury Denoyelle30e260e2022-08-03 11:17:57 +02002613 LIST_INIT(&qcc->opening_list);
Amaury Denoyelleaebe26f2022-01-13 16:28:06 +01002614
Willy Tarreau784b8682022-04-11 14:18:10 +02002615 HA_ATOMIC_STORE(&conn->handle.qc->qcc, qcc);
Amaury Denoyelleb4d119f2023-01-25 17:44:36 +01002616
2617 if (qcc_install_app_ops(qcc, conn->handle.qc->app_ops)) {
Amaury Denoyelle8d44bfa2023-05-04 15:16:01 +02002618 TRACE_PROTO("Cannot install app layer", QMUX_EV_QCC_NEW|QMUX_EV_QCC_ERR, qcc->conn);
Amaury Denoyelleb4d119f2023-01-25 17:44:36 +01002619 /* prepare a CONNECTION_CLOSE frame */
2620 quic_set_connection_close(conn->handle.qc, quic_err_transport(QC_ERR_APPLICATION_ERROR));
2621 goto fail_install_app_ops;
2622 }
2623
Frédéric Lécaille9969adb2023-01-18 11:52:21 +01002624 if (qcc->app_ops == &h3_ops)
2625 proxy_inc_fe_cum_sess_ver_ctr(sess->listener, prx, 3);
2626
Amaury Denoyelleed820822023-04-19 17:58:39 +02002627 /* Register conn for idle front closing. This is done once everything is allocated. */
2628 if (!conn_is_back(conn))
2629 LIST_APPEND(&mux_stopping_data[tid].list, &conn->stopping_list);
2630
Amaury Denoyelledeed7772021-12-03 11:36:46 +01002631 /* init read cycle */
2632 tasklet_wakeup(qcc->wait_event.tasklet);
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002633
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02002634 TRACE_LEAVE(QMUX_EV_QCC_NEW, qcc->conn);
Amaury Denoyelledeed7772021-12-03 11:36:46 +01002635 return 0;
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002636
Amaury Denoyelleb4d119f2023-01-25 17:44:36 +01002637 fail_install_app_ops:
2638 if (qcc->app_ops && qcc->app_ops->release)
2639 qcc->app_ops->release(qcc->ctx);
Amaury Denoyelleee65efb2023-05-12 16:29:48 +02002640 task_destroy(qcc->task);
Amaury Denoyelleaebe26f2022-01-13 16:28:06 +01002641 fail_no_timeout_task:
2642 tasklet_free(qcc->wait_event.tasklet);
Amaury Denoyelledeed7772021-12-03 11:36:46 +01002643 fail_no_tasklet:
2644 pool_free(pool_head_qcc, qcc);
2645 fail_no_qcc:
Amaury Denoyelle4c9a1642022-08-10 16:58:01 +02002646 TRACE_LEAVE(QMUX_EV_QCC_NEW);
Amaury Denoyelledeed7772021-12-03 11:36:46 +01002647 return -1;
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002648}
2649
Amaury Denoyelle0f61e4f2023-05-30 14:51:57 +02002650static void qmux_destroy(void *ctx)
Amaury Denoyelle2461bd52022-04-13 16:54:52 +02002651{
2652 struct qcc *qcc = ctx;
2653
2654 TRACE_ENTER(QMUX_EV_QCC_END, qcc->conn);
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02002655 qcc_release(qcc);
Amaury Denoyelle2461bd52022-04-13 16:54:52 +02002656 TRACE_LEAVE(QMUX_EV_QCC_END);
2657}
2658
Amaury Denoyelle0f61e4f2023-05-30 14:51:57 +02002659static void qmux_strm_detach(struct sedesc *sd)
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002660{
Willy Tarreaud7b7e0d2022-05-27 16:09:35 +02002661 struct qcs *qcs = sd->se;
Amaury Denoyelle916f0ac2021-12-06 16:03:47 +01002662 struct qcc *qcc = qcs->qcc;
2663
Amaury Denoyelle4f137572022-03-24 17:10:00 +01002664 TRACE_ENTER(QMUX_EV_STRM_END, qcc->conn, qcs);
Amaury Denoyelle916f0ac2021-12-06 16:03:47 +01002665
Amaury Denoyelle38e60062022-07-01 16:48:42 +02002666 /* TODO this BUG_ON_HOT() is not correct as the stconn layer may detach
2667 * from the stream even if it is not closed remotely at the QUIC layer.
2668 * This happens for example when a stream must be closed due to a
2669 * rejected request. To better handle these cases, it will be required
2670 * to implement shutr/shutw MUX operations. Once this is done, this
2671 * BUG_ON_HOT() statement can be adjusted.
2672 */
2673 //BUG_ON_HOT(!qcs_is_close_remote(qcs));
Amaury Denoyellec603de42022-07-25 11:21:46 +02002674
2675 qcc_rm_sc(qcc);
Amaury Denoyelle06890aa2022-04-04 16:15:06 +02002676
Amaury Denoyelle5f67b172023-05-04 18:52:42 +02002677 if (!qcs_is_close_local(qcs) &&
2678 !(qcc->flags & (QC_CF_ERR_CONN|QC_CF_ERRL))) {
Amaury Denoyelle047d86a2022-08-10 16:42:35 +02002679 TRACE_STATE("remaining data, detaching qcs", QMUX_EV_STRM_END, qcc->conn, qcs);
Amaury Denoyelle2873a312021-12-08 14:42:55 +01002680 qcs->flags |= QC_SF_DETACH;
Amaury Denoyelle5fc05d12022-07-25 14:58:48 +02002681 qcc_refresh_timeout(qcc);
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02002682
2683 TRACE_LEAVE(QMUX_EV_STRM_END, qcc->conn, qcs);
Amaury Denoyelle2873a312021-12-08 14:42:55 +01002684 return;
2685 }
2686
Amaury Denoyelle916f0ac2021-12-06 16:03:47 +01002687 qcs_destroy(qcs);
Amaury Denoyelle1136e922022-02-01 10:33:09 +01002688
Amaury Denoyelle06890aa2022-04-04 16:15:06 +02002689 if (qcc_is_dead(qcc)) {
Amaury Denoyelle047d86a2022-08-10 16:42:35 +02002690 TRACE_STATE("killing dead connection", QMUX_EV_STRM_END, qcc->conn);
Amaury Denoyelle35a66c02022-08-12 15:56:21 +02002691 goto release;
Amaury Denoyelle06890aa2022-04-04 16:15:06 +02002692 }
Amaury Denoyelleca7a0632023-10-26 18:17:29 +02002693 else {
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02002694 TRACE_DEVEL("refreshing connection's timeout", QMUX_EV_STRM_END, qcc->conn);
Amaury Denoyelle5fc05d12022-07-25 14:58:48 +02002695 qcc_refresh_timeout(qcc);
Amaury Denoyelle916f0ac2021-12-06 16:03:47 +01002696 }
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02002697
2698 TRACE_LEAVE(QMUX_EV_STRM_END, qcc->conn);
Amaury Denoyelle35a66c02022-08-12 15:56:21 +02002699 return;
2700
2701 release:
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02002702 qcc_release(qcc);
Amaury Denoyelle35a66c02022-08-12 15:56:21 +02002703 TRACE_LEAVE(QMUX_EV_STRM_END);
2704 return;
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002705}
2706
/* Called from the upper layer, to receive data from the QCS attached to <sc>
 * into <buf> for at most <count> bytes. Transfers decoded HTTP payload from
 * the stream rx application buffer, then reports stream-endpoint state flags
 * (EOI/EOS/ERROR) to the upper layer. Returns the number of bytes transferred.
 */
static size_t qmux_strm_rcv_buf(struct stconn *sc, struct buffer *buf,
                                size_t count, int flags)
{
	struct qcs *qcs = __sc_mux_strm(sc);
	struct qcc *qcc = qcs->qcc;
	size_t ret = 0;
	char fin = 0;  /* set by qcs_http_rcv_buf() once the whole message was read */

	TRACE_ENTER(QMUX_EV_STRM_RECV, qcc->conn, qcs);

	ret = qcs_http_rcv_buf(qcs, buf, count, &fin);

	if (b_data(&qcs->rx.app_buf)) {
		/* Data still pending : ask the upper layer to come back, more
		 * room is needed before further demuxing.
		 */
		se_fl_set(qcs->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
	}
	else {
		se_fl_clr(qcs->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);

		/* Set end-of-input when full message properly received. */
		if (fin) {
			TRACE_STATE("report end-of-input", QMUX_EV_STRM_RECV, qcc->conn, qcs);
			se_fl_set(qcs->sd, SE_FL_EOI);

			/* If request EOM is reported to the upper layer, it means the
			 * QCS now expects data from the opposite side.
			 */
			se_expect_data(qcs->sd);
		}

		/* Set end-of-stream on read closed. */
		if (qcs->flags & QC_SF_RECV_RESET ||
		    qcc->conn->flags & CO_FL_SOCK_RD_SH) {
			TRACE_STATE("report end-of-stream", QMUX_EV_STRM_RECV, qcc->conn, qcs);
			se_fl_set(qcs->sd, SE_FL_EOS);

			/* Set error if EOI not reached. This may happen on
			 * RESET_STREAM reception or connection error.
			 */
			if (!se_fl_test(qcs->sd, SE_FL_EOI)) {
				TRACE_STATE("report error on stream aborted", QMUX_EV_STRM_RECV, qcc->conn, qcs);
				se_fl_set(qcs->sd, SE_FL_ERROR);
			}
		}

		/* Promote a pending error to a definitive one now that the
		 * buffered data has been fully consumed.
		 */
		if (se_fl_test(qcs->sd, SE_FL_ERR_PENDING)) {
			TRACE_STATE("report error", QMUX_EV_STRM_RECV, qcc->conn, qcs);
			se_fl_set(qcs->sd, SE_FL_ERROR);
		}

		/* App buffer is empty : release its storage area. */
		if (b_size(&qcs->rx.app_buf)) {
			b_free(&qcs->rx.app_buf);
			offer_buffers(NULL, 1);
		}
	}

	/* Restart demux if it was interrupted on full buffer. */
	if (ret && qcs->flags & QC_SF_DEM_FULL) {
		/* Ensure DEM_FULL is only set if there is available data to
		 * ensure we never do unnecessary wakeup here.
		 */
		BUG_ON(!ncb_data(&qcs->rx.ncbuf, 0));

		qcs->flags &= ~QC_SF_DEM_FULL;
		/* Do not resume demux on a connection-level error. */
		if (!(qcc->flags & QC_CF_ERRL))
			tasklet_wakeup(qcc->wait_event.tasklet);
	}

	TRACE_LEAVE(QMUX_EV_STRM_RECV, qcc->conn, qcs);

	return ret;
}
2779
Amaury Denoyelle0f61e4f2023-05-30 14:51:57 +02002780static size_t qmux_strm_snd_buf(struct stconn *sc, struct buffer *buf,
2781 size_t count, int flags)
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002782{
Willy Tarreau3215e732022-05-27 10:09:11 +02002783 struct qcs *qcs = __sc_mux_strm(sc);
Amaury Denoyelle35542ce2023-05-03 18:16:40 +02002784 size_t ret = 0;
Amaury Denoyelle9534e592022-09-19 17:14:27 +02002785 char fin;
Amaury Denoyelle4f137572022-03-24 17:10:00 +01002786
2787 TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002788
Amaury Denoyelle3dc4e5a2022-09-13 16:49:21 +02002789 /* stream layer has been detached so no transfer must occur after. */
2790 BUG_ON_HOT(qcs->flags & QC_SF_DETACH);
2791
Amaury Denoyelle35542ce2023-05-03 18:16:40 +02002792 /* Report error if set on stream endpoint layer. */
Amaury Denoyelle5f67b172023-05-04 18:52:42 +02002793 if (qcs->qcc->flags & (QC_CF_ERR_CONN|QC_CF_ERRL)) {
Amaury Denoyelle35542ce2023-05-03 18:16:40 +02002794 se_fl_set(qcs->sd, SE_FL_ERROR);
2795 TRACE_DEVEL("connection in error", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
2796 goto end;
2797 }
2798
Amaury Denoyelle843a1192022-07-04 11:44:38 +02002799 if (qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET)) {
Amaury Denoyelle0ed617a2022-09-20 14:46:40 +02002800 ret = qcs_http_reset_buf(qcs, buf, count);
Amaury Denoyelle38e60062022-07-01 16:48:42 +02002801 goto end;
2802 }
2803
Amaury Denoyelle9534e592022-09-19 17:14:27 +02002804 ret = qcs_http_snd_buf(qcs, buf, count, &fin);
Amaury Denoyelle24962dd2023-04-24 17:50:23 +02002805 if (fin) {
2806 TRACE_STATE("reached stream fin", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
Amaury Denoyelle9534e592022-09-19 17:14:27 +02002807 qcs->flags |= QC_SF_FIN_STREAM;
Amaury Denoyelle24962dd2023-04-24 17:50:23 +02002808 }
Amaury Denoyelle9534e592022-09-19 17:14:27 +02002809
Amaury Denoyelleab6cdec2023-01-10 10:41:41 +01002810 if (ret || fin) {
Amaury Denoyellef9b03262023-01-09 10:34:25 +01002811 qcc_send_stream(qcs, 0);
Amaury Denoyelle9534e592022-09-19 17:14:27 +02002812 if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
2813 tasklet_wakeup(qcs->qcc->wait_event.tasklet);
2814 }
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002815
Amaury Denoyelle38e60062022-07-01 16:48:42 +02002816 end:
Amaury Denoyelle4f137572022-03-24 17:10:00 +01002817 TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
2818
2819 return ret;
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002820}
2821
/* Called from the upper layer, to subscribe <es> to events <event_type>. The
 * event subscriber <es> is not allowed to change from a previous call as long
 * as at least one event is still subscribed. The <event_type> must only be a
 * combination of SUB_RETRY_RECV and SUB_RETRY_SEND. It always returns 0.
 */
static int qmux_strm_subscribe(struct stconn *sc, int event_type,
                               struct wait_event *es)
{
	struct qcs *qcs = __sc_mux_strm(sc);

	/* Delegate the subscription bookkeeping to the QCS layer. */
	return qcs_subscribe(qcs, event_type, es);
}
2832
2833/* Called from the upper layer, to unsubscribe <es> from events <event_type>.
2834 * The <es> pointer is not allowed to differ from the one passed to the
2835 * subscribe() call. It always returns zero.
2836 */
Amaury Denoyelle0f61e4f2023-05-30 14:51:57 +02002837static int qmux_strm_unsubscribe(struct stconn *sc, int event_type, struct wait_event *es)
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002838{
Willy Tarreau3215e732022-05-27 10:09:11 +02002839 struct qcs *qcs = __sc_mux_strm(sc);
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002840
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002841 BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
2842 BUG_ON(qcs->subs && qcs->subs != es);
2843
2844 es->events &= ~event_type;
2845 if (!es->events)
2846 qcs->subs = NULL;
2847
Frédéric Lécailledfbae762021-02-18 09:59:01 +01002848 return 0;
2849}
2850
Amaury Denoyelle0f61e4f2023-05-30 14:51:57 +02002851static int qmux_wake(struct connection *conn)
Amaury Denoyelle0e0969d2022-01-31 15:41:14 +01002852{
2853 struct qcc *qcc = conn->ctx;
Amaury Denoyelled97fc802022-04-06 16:13:09 +02002854
2855 TRACE_ENTER(QMUX_EV_QCC_WAKE, conn);
Amaury Denoyelle0e0969d2022-01-31 15:41:14 +01002856
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02002857 if (qcc_io_process(qcc)) {
Amaury Denoyelle14dbb842023-01-24 18:19:47 +01002858 TRACE_STATE("releasing dead connection", QMUX_EV_QCC_WAKE, qcc->conn);
Amaury Denoyelled97fc802022-04-06 16:13:09 +02002859 goto release;
Amaury Denoyelle14dbb842023-01-24 18:19:47 +01002860 }
2861
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02002862 qcc_wake_some_streams(qcc);
Amaury Denoyelled97fc802022-04-06 16:13:09 +02002863
Amaury Denoyelle5fc05d12022-07-25 14:58:48 +02002864 qcc_refresh_timeout(qcc);
2865
Amaury Denoyelled97fc802022-04-06 16:13:09 +02002866 TRACE_LEAVE(QMUX_EV_QCC_WAKE, conn);
Amaury Denoyelled97fc802022-04-06 16:13:09 +02002867 return 0;
Amaury Denoyelle0e0969d2022-01-31 15:41:14 +01002868
Amaury Denoyelled97fc802022-04-06 16:13:09 +02002869 release:
Amaury Denoyelled68f8b52023-05-30 15:04:46 +02002870 qcc_release(qcc);
Amaury Denoyellef0b67f92022-08-10 16:14:32 +02002871 TRACE_LEAVE(QMUX_EV_QCC_WAKE);
Amaury Denoyelle0e0969d2022-01-31 15:41:14 +01002872 return 1;
2873}
2874
Amaury Denoyelle0f61e4f2023-05-30 14:51:57 +02002875static void qmux_strm_shutw(struct stconn *sc, enum co_shw_mode mode)
Amaury Denoyellea473f192022-12-21 10:21:58 +01002876{
2877 struct qcs *qcs = __sc_mux_strm(sc);
Amaury Denoyelle24962dd2023-04-24 17:50:23 +02002878 struct qcc *qcc = qcs->qcc;
Amaury Denoyellea473f192022-12-21 10:21:58 +01002879
Amaury Denoyelle24962dd2023-04-24 17:50:23 +02002880 TRACE_ENTER(QMUX_EV_STRM_SHUT, qcc->conn, qcs);
Amaury Denoyellea473f192022-12-21 10:21:58 +01002881
Amaury Denoyelle24962dd2023-04-24 17:50:23 +02002882 /* Early closure reported if QC_SF_FIN_STREAM not yet set. */
Amaury Denoyellea473f192022-12-21 10:21:58 +01002883 if (!qcs_is_close_local(qcs) &&
2884 !(qcs->flags & (QC_SF_FIN_STREAM|QC_SF_TO_RESET))) {
Amaury Denoyelle24962dd2023-04-24 17:50:23 +02002885
2886 if (qcs->flags & QC_SF_UNKNOWN_PL_LENGTH) {
2887 /* Close stream with a FIN STREAM frame. */
Amaury Denoyelle5f67b172023-05-04 18:52:42 +02002888 if (!(qcc->flags & (QC_CF_ERR_CONN|QC_CF_ERRL))) {
Amaury Denoyelle3fd40932023-05-10 10:41:47 +02002889 TRACE_STATE("set FIN STREAM",
2890 QMUX_EV_STRM_SHUT, qcc->conn, qcs);
2891 qcs->flags |= QC_SF_FIN_STREAM;
2892 qcc_send_stream(qcs, 0);
2893 }
Amaury Denoyelle24962dd2023-04-24 17:50:23 +02002894 }
2895 else {
2896 /* RESET_STREAM necessary. */
Amaury Denoyelle2f590382023-12-19 11:22:28 +01002897 qcc_reset_stream(qcs, 0);
Amaury Denoyelle24962dd2023-04-24 17:50:23 +02002898 }
2899
2900 tasklet_wakeup(qcc->wait_event.tasklet);
Amaury Denoyellea473f192022-12-21 10:21:58 +01002901 }
2902
Amaury Denoyelled4af0412023-05-03 18:17:19 +02002903 out:
Amaury Denoyelle24962dd2023-04-24 17:50:23 +02002904 TRACE_LEAVE(QMUX_EV_STRM_SHUT, qcc->conn, qcs);
Amaury Denoyellea473f192022-12-21 10:21:58 +01002905}
Amaury Denoyelle38e60062022-07-01 16:48:42 +02002906
Willy Tarreaub4a4fee2022-09-02 16:00:40 +02002907/* for debugging with CLI's "show sess" command. May emit multiple lines, each
2908 * new one being prefixed with <pfx>, if <pfx> is not NULL, otherwise a single
2909 * line is used. Each field starts with a space so it's safe to print it after
2910 * existing fields.
2911 */
Amaury Denoyelle0f61e4f2023-05-30 14:51:57 +02002912static int qmux_strm_show_sd(struct buffer *msg, struct sedesc *sd, const char *pfx)
Willy Tarreaub4a4fee2022-09-02 16:00:40 +02002913{
2914 struct qcs *qcs = sd->se;
2915 struct qcc *qcc;
2916 int ret = 0;
2917
2918 if (!qcs)
2919 return ret;
2920
2921 chunk_appendf(msg, " qcs=%p .flg=%#x .id=%llu .st=%s .ctx=%p, .err=%#llx",
2922 qcs, qcs->flags, (ull)qcs->id, qcs_st_to_str(qcs->st), qcs->ctx, (ull)qcs->err);
2923
2924 if (pfx)
2925 chunk_appendf(msg, "\n%s", pfx);
2926
2927 qcc = qcs->qcc;
2928 chunk_appendf(msg, " qcc=%p .flg=%#x .nbsc=%llu .nbhreq=%llu, .task=%p",
2929 qcc, qcc->flags, (ull)qcc->nb_sc, (ull)qcc->nb_hreq, qcc->task);
2930 return ret;
2931}
2932
2933
/* Mux operations exposed to the connection layer for QUIC connections.
 * Framed HTX mode, no protocol upgrade (MX_FL_HTX|MX_FL_NO_UPG|MX_FL_FRAMED).
 */
static const struct mux_ops qmux_ops = {
	.init = qmux_init,
	.destroy = qmux_destroy,
	.detach = qmux_strm_detach,
	.rcv_buf = qmux_strm_rcv_buf,
	.snd_buf = qmux_strm_snd_buf,
	.subscribe = qmux_strm_subscribe,
	.unsubscribe = qmux_strm_unsubscribe,
	.wake = qmux_wake,
	.shutw = qmux_strm_shutw,
	.show_sd = qmux_strm_show_sd,
	.flags = MX_FL_HTX|MX_FL_NO_UPG|MX_FL_FRAMED,
	.name = "QUIC",
};
2948
/* Register the QUIC mux for the "quic" protocol token, frontend side only,
 * HTTP mode. The INITCALL performs the registration at startup.
 */
static struct mux_proto_list mux_proto_quic =
  { .token = IST("quic"), .mode = PROTO_MODE_HTTP, .side = PROTO_SIDE_FE, .mux = &qmux_ops };

INITCALL1(STG_REGISTER, register_mux_proto, &mux_proto_quic);