#include <haproxy/mux_quic.h>

#include <import/eb64tree.h>

#include <haproxy/api.h>
#include <haproxy/connection.h>
#include <haproxy/conn_stream.h>
#include <haproxy/dynbuf.h>
#include <haproxy/htx.h>
#include <haproxy/pool.h>
#include <haproxy/ssl_sock-t.h>

DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc));
DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));

void quic_mux_transport_params_update(struct qcc *qcc)
{
	struct quic_transport_params *clt_params;

	/* Client parameters, used for TX. */
	clt_params = &qcc->conn->qc->tx.params;

	qcc->tx.max_data = clt_params->initial_max_data;
	/* Client initiated streams must respect the server flow control. */
	qcc->strms[QCS_CLT_BIDI].rx.max_data = clt_params->initial_max_stream_data_bidi_local;
	qcc->strms[QCS_CLT_UNI].rx.max_data = clt_params->initial_max_stream_data_uni;

	/* Server initiated streams must respect the server flow control. */
	qcc->strms[QCS_SRV_BIDI].max_streams = clt_params->initial_max_streams_bidi;
	qcc->strms[QCS_SRV_BIDI].tx.max_data = clt_params->initial_max_stream_data_bidi_remote;

	qcc->strms[QCS_SRV_UNI].max_streams = clt_params->initial_max_streams_uni;
	qcc->strms[QCS_SRV_UNI].tx.max_data = clt_params->initial_max_stream_data_uni;
}

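/* Overview of the four stream classes used above: the QCS_CLT_* slots
 * account for client-initiated streams (our RX side) and the QCS_SRV_*
 * slots for server-initiated ones (our TX side). The client's
 * advertised transport parameters thus bound what the mux may transmit,
 * while our own parameters, installed in qc_init(), bound what the
 * client may send to us.
 */
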
/* Allocate a new QUIC stream with id <id> and type <type>. */
struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
{
	struct qcs *qcs;

	qcs = pool_alloc(pool_head_qcs);
	if (!qcs)
		goto out;

	fprintf(stderr, "%s: stream ID %lu\n", __func__, id);

	qcs->qcc = qcc;
	qcs->cs = NULL;
	qcs->flags = QC_SF_NONE;

	qcs->by_id.key = id;
	eb64_insert(&qcc->streams_by_id, &qcs->by_id);
	qcc->strms[type].nb_streams++;

	qcs->rx.buf = BUF_NULL;
	qcs->rx.app_buf = BUF_NULL;
	qcs->rx.offset = 0;
	qcs->rx.frms = EB_ROOT_UNIQUE;

	qcs->tx.buf = BUF_NULL;
	qcs->tx.xprt_buf = BUF_NULL;
	qcs->tx.offset = 0;
	qcs->tx.ack_offset = 0;
	qcs->tx.acked_frms = EB_ROOT_UNIQUE;

	qcs->wait_event.tasklet = NULL;
	qcs->wait_event.events = 0;
	qcs->subs = NULL;

 out:
	return qcs;
}

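/* Usage sketch (hypothetical caller), assuming the RFC 9000 stream ID
 * layout where the two low bits encode initiator and direction
 * (QCS_ID_TYPE_SHIFT == 2): the client-initiated bidirectional stream
 * with sub-ID 2, i.e. wire ID 8, would be created as:
 *
 *   struct qcs *qcs = qcs_new(qcc, (2ULL << QCS_ID_TYPE_SHIFT) | 0x0,
 *                             QCS_CLT_BIDI);
 *
 * Note that the rx/tx buffers are left as BUF_NULL: they are only
 * allocated on first use via qc_get_buf().
 */
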
/* Free a qcs. This function must only be used for unidirectional streams.
 * Bidirectional streams are released by the upper layer through qc_detach().
 */
void uni_qcs_free(struct qcs *qcs)
{
	eb64_delete(&qcs->by_id);
	pool_free(pool_head_qcs, qcs);
}

struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr)
{
	struct buffer *buf = b_alloc(bptr);
	BUG_ON(!buf);
	return buf;
}

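/* Note: a buffer allocation failure is currently fatal (BUG_ON above)
 * rather than handled gracefully; presumably the thin wrapper exists so
 * that callers could later gain an allocation-retry path without
 * changing call sites.
 */
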
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es)
{
	fprintf(stderr, "%s\n", __func__);

	BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
	BUG_ON(qcs->subs && qcs->subs != es);

	es->events |= event_type;
	qcs->subs = es;

	return 0;
}

void qcs_notify_recv(struct qcs *qcs)
{
	if (qcs->subs && qcs->subs->events & SUB_RETRY_RECV) {
		tasklet_wakeup(qcs->subs->tasklet);
		qcs->subs->events &= ~SUB_RETRY_RECV;
		if (!qcs->subs->events)
			qcs->subs = NULL;
	}
}

void qcs_notify_send(struct qcs *qcs)
{
	if (qcs->subs && qcs->subs->events & SUB_RETRY_SEND) {
		tasklet_wakeup(qcs->subs->tasklet);
		qcs->subs->events &= ~SUB_RETRY_SEND;
		if (!qcs->subs->events)
			qcs->subs = NULL;
	}
}

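/* The pair above implements the usual haproxy wait_event contract: a
 * subscriber registers once via qcs_subscribe(), is woken at most once
 * per event (the SUB_RETRY_* bit is cleared on wakeup), and must
 * re-subscribe to be notified again.
 */
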
/* Retrieve as an ebtree node the stream with <id> as ID, possibly allocating
 * several streams depending on those already open.
 * Return this node on success, NULL on failure.
 */
struct eb64_node *qcc_get_qcs(struct qcc *qcc, uint64_t id)
{
	unsigned int strm_type;
	int64_t sub_id;
	struct eb64_node *strm_node;

	strm_type = id & QCS_ID_TYPE_MASK;
	sub_id = id >> QCS_ID_TYPE_SHIFT;
	strm_node = NULL;
	if (qc_local_stream_id(qcc, id)) {
		/* Local streams: this stream must already be opened. */
		strm_node = eb64_lookup(&qcc->streams_by_id, id);
		if (!strm_node) {
			/* unknown stream id */
			goto out;
		}
	}
	else {
		/* Remote streams. */
		struct eb_root *strms;
		uint64_t largest_id;
		enum qcs_type qcs_type;

		strms = &qcc->streams_by_id;
		qcs_type = qcs_id_type(id);
		if (sub_id + 1 > qcc->strms[qcs_type].max_streams) {
			/* streams limit reached */
			goto out;
		}

		/* Note: ->largest_id was initialized to (uint64_t)-1, 0 being a
		 * valid value.
		 */
		largest_id = qcc->strms[qcs_type].largest_id;
		if (sub_id > (int64_t)largest_id) {
			/* RFC: "A stream ID that is used out of order results in all streams
			 * of that type with lower-numbered stream IDs also being opened".
			 * So, let's "open" these streams.
			 */
			int64_t i;
			struct qcs *qcs;

			qcs = NULL;
			for (i = largest_id + 1; i <= sub_id; i++) {
				uint64_t id = (i << QCS_ID_TYPE_SHIFT) | strm_type;
				enum qcs_type type = id & QCS_ID_DIR_BIT ? QCS_CLT_UNI : QCS_CLT_BIDI;
				qcs = qcs_new(qcc, id, type);
				if (!qcs) {
					/* allocation failure */
					goto out;
				}

				qcc->strms[qcs_type].largest_id = i;
			}
			if (qcs)
				strm_node = &qcs->by_id;
		}
		else {
			strm_node = eb64_lookup(strms, id);
		}
	}

	return strm_node;

 out:
	return NULL;
}

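/* Worked example of the decomposition above, assuming the RFC 9000 ID
 * layout (QCS_ID_TYPE_MASK == 0x3, QCS_ID_TYPE_SHIFT == 2): a client
 * opening bidirectional streams in order uses IDs 0, 4, 8, 12...
 * Receiving ID 12 (sub_id 3) before the others forces sub-IDs 0..3 open
 * in a single pass of the loop, as required by the quoted RFC rule.
 */
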
/* Detaches the QUIC stream from its QCC and releases it to the QCS pool. */
static void qcs_destroy(struct qcs *qcs)
{
	fprintf(stderr, "%s: release stream %llu\n", __func__, qcs->by_id.key);

	eb64_delete(&qcs->by_id);

	b_free(&qcs->rx.buf);
	b_free(&qcs->tx.buf);
	b_free(&qcs->tx.xprt_buf);

	--qcs->qcc->strms[qcs_id_type(qcs->by_id.key)].nb_streams;

	pool_free(pool_head_qcs, qcs);
}

static inline int qcc_is_dead(const struct qcc *qcc)
{
	fprintf(stderr, "%s: %lu\n", __func__, qcc->strms[QCS_CLT_BIDI].nb_streams);

	if (!qcc->strms[QCS_CLT_BIDI].nb_streams && !qcc->task)
		return 1;

	return 0;
}

/* Return true if the mux timeout should be armed. */
static inline int qcc_may_expire(struct qcc *qcc)
{
	/* Consider that the timeout must be set if no bidirectional streams
	 * are opened.
	 */
	if (!qcc->strms[QCS_CLT_BIDI].nb_streams)
		return 1;

	return 0;
}

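/* Timeout lifecycle, as implemented by the two helpers above: the mux
 * timeout is armed only while no bidirectional streams are open. Once
 * qc_timeout_task() (below) fires, it clears qcc->task, which makes
 * qcc_is_dead() true and lets the connection be freed via qc_release().
 */
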
/* Release function. This one should be called to free all resources allocated
 * to the mux.
 */
static void qc_release(struct qcc *qcc)
{
	struct connection *conn = NULL;

	if (qcc) {
		/* The connection must be attached to this mux to be released */
		if (qcc->conn && qcc->conn->ctx == qcc)
			conn = qcc->conn;

		if (qcc->wait_event.tasklet)
			tasklet_free(qcc->wait_event.tasklet);

		pool_free(pool_head_qcc, qcc);
	}

	if (conn) {
		LIST_DEL_INIT(&conn->stopping_list);

		conn->qc->conn = NULL;
		conn->mux = NULL;
		conn->ctx = NULL;

		conn_stop_tracking(conn);
		conn_full_close(conn);
		if (conn->destroy_cb)
			conn->destroy_cb(conn);
		conn_free(conn);
		fprintf(stderr, "conn@%p released\n", conn);
	}
}

static int qcs_push_frame(struct qcs *qcs, struct buffer *payload, int fin, uint64_t offset)
{
	struct quic_frame *frm;
	struct buffer *buf = &qcs->tx.xprt_buf;
	struct quic_enc_level *qel = &qcs->qcc->conn->qc->els[QUIC_TLS_ENC_LEVEL_APP];
	int total = 0, to_xfer;

	fprintf(stderr, "%s\n", __func__);

	qc_get_buf(qcs, buf);
	to_xfer = QUIC_MIN(b_data(payload), b_room(buf));
	if (!to_xfer)
		goto out;

	frm = pool_zalloc(pool_head_quic_frame);
	if (!frm)
		goto err;

	total = b_force_xfer(buf, payload, to_xfer);
	/* FIN is positioned only when the buffer has been totally emptied. */
	fin = fin && !b_data(payload);
	frm->type = QUIC_FT_STREAM_8;
	if (fin)
		frm->type |= QUIC_STREAM_FRAME_TYPE_FIN_BIT;
	if (offset) {
		frm->type |= QUIC_STREAM_FRAME_TYPE_OFF_BIT;
		frm->stream.offset.key = offset;
	}
	frm->stream.qcs = (struct qcs *)qcs;
	frm->stream.buf = buf;
	frm->stream.id = qcs->by_id.key;
	if (total) {
		frm->type |= QUIC_STREAM_FRAME_TYPE_LEN_BIT;
		frm->stream.len = total;
	}

	LIST_APPEND(&qel->pktns->tx.frms, &frm->list);
 out:
	fprintf(stderr, "%s: total=%d fin=%d id=%llu offset=%lu\n",
	        __func__, total, fin, (ull)qcs->by_id.key, offset);
	return total;

 err:
	return -1;
}

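/* STREAM frame type reminder (RFC 9000, section 19.8): QUIC_FT_STREAM_8
 * is the 0x08 base type; the OFF (0x04), LEN (0x02) and FIN (0x01) bits
 * are OR'ed in above only when an offset, an explicit length or the end
 * of the stream must be encoded.
 */
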
static int qc_send(struct qcc *qcc)
{
	struct eb64_node *node;
	int xprt_wake = 0;
	int ret = 0;

	fprintf(stderr, "%s\n", __func__);

	/* TODO simple loop through all streams and check if there are frames to
	 * send
	 */
	node = eb64_first(&qcc->streams_by_id);
	while (node) {
		struct qcs *qcs = container_of(node, struct qcs, by_id);
		struct buffer *buf = &qcs->tx.buf;
		if (b_data(buf)) {
			char fin = qcs->flags & QC_SF_FIN_STREAM;
			ret = qcs_push_frame(qcs, buf, fin, qcs->tx.offset);
			if (ret < 0)
				ABORT_NOW();

			if (ret > 0) {
				qcs_notify_send(qcs);
				if (qcs->flags & QC_SF_BLK_MROOM)
					qcs->flags &= ~QC_SF_BLK_MROOM;

				xprt_wake = 1;
			}

			fprintf(stderr, "%s ret=%d\n", __func__, ret);
			qcs->tx.offset += ret;

			if (b_data(buf)) {
				qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
				                           SUB_RETRY_SEND, &qcc->wait_event);
			}
		}
		node = eb64_next(node);
	}

	if (xprt_wake)
		tasklet_wakeup(((struct ssl_sock_ctx *)(qcc->conn->xprt_ctx))->wait_event.tasklet);

	return ret;
}

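/* Note that qc_send() only moves data from the mux buffer (tx.buf) to
 * the transport buffer (tx.xprt_buf) and queues a STREAM frame on the
 * packet number space; the actual emission is delegated to the xprt
 * tasklet woken at the end of the loop.
 */
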
/* Release all streams that are already marked as detached. This is only done
 * if their TX buffers are empty or if a CONNECTION_CLOSE has been received.
 *
 * Return non-zero if at least one stream was released.
 */
static int qc_release_detached_streams(struct qcc *qcc)
{
	struct eb64_node *node;
	int release = 0;

	node = eb64_first(&qcc->streams_by_id);
	while (node) {
		struct qcs *qcs = container_of(node, struct qcs, by_id);
		node = eb64_next(node);

		if (qcs->flags & QC_SF_DETACH) {
			if ((!b_data(&qcs->tx.buf) && !b_data(&qcs->tx.xprt_buf))) {
				qcs_destroy(qcs);
				release = 1;
			}
			else {
				qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
				                           SUB_RETRY_SEND, &qcc->wait_event);
			}
		}
	}

	return release;
}

static struct task *qc_io_cb(struct task *t, void *ctx, unsigned int status)
{
	struct qcc *qcc = ctx;

	fprintf(stderr, "%s\n", __func__);

	qc_send(qcc);

	if (qc_release_detached_streams(qcc)) {
		/* Schedule the mux timeout if no bidirectional streams left. */
		if (qcc_may_expire(qcc)) {
			qcc->task->expire = tick_add(now_ms, qcc->timeout);
			task_queue(qcc->task);
		}
	}

	return NULL;
}

static struct task *qc_timeout_task(struct task *t, void *ctx, unsigned int state)
{
	struct qcc *qcc = ctx;
	int expired = tick_is_expired(t->expire, now_ms);

	fprintf(stderr, "%s\n", __func__);

	if (qcc) {
		if (!expired) {
			fprintf(stderr, "%s: not expired\n", __func__);
			return t;
		}

		if (!qcc_may_expire(qcc)) {
			fprintf(stderr, "%s: cannot expire\n", __func__);
			t->expire = TICK_ETERNITY;
			return t;
		}
	}

	fprintf(stderr, "%s: timeout\n", __func__);
	task_destroy(t);

	if (!qcc)
		return NULL;

	qcc->task = NULL;

	if (qcc_is_dead(qcc))
		qc_release(qcc);

	return NULL;
}

static int qc_init(struct connection *conn, struct proxy *prx,
                   struct session *sess, struct buffer *input)
{
	struct qcc *qcc;
	struct quic_transport_params *srv_params;

	qcc = pool_alloc(pool_head_qcc);
	if (!qcc)
		goto fail_no_qcc;

	qcc->conn = conn;
	conn->ctx = qcc;
	qcc->flags = 0;

	qcc->app_ops = NULL;

	qcc->streams_by_id = EB_ROOT_UNIQUE;

	/* Server parameters, used for RX flow control. */
	srv_params = &conn->qc->rx.params;

	qcc->rx.max_data = srv_params->initial_max_data;
	qcc->tx.max_data = 0;

	/* Client initiated streams must respect the server flow control. */
	qcc->strms[QCS_CLT_BIDI].max_streams = srv_params->initial_max_streams_bidi;
	qcc->strms[QCS_CLT_BIDI].nb_streams = 0;
	qcc->strms[QCS_CLT_BIDI].largest_id = -1;
	qcc->strms[QCS_CLT_BIDI].rx.max_data = 0;
	qcc->strms[QCS_CLT_BIDI].tx.max_data = srv_params->initial_max_stream_data_bidi_remote;

	qcc->strms[QCS_CLT_UNI].max_streams = srv_params->initial_max_streams_uni;
	qcc->strms[QCS_CLT_UNI].nb_streams = 0;
	qcc->strms[QCS_CLT_UNI].largest_id = -1;
	qcc->strms[QCS_CLT_UNI].rx.max_data = 0;
	qcc->strms[QCS_CLT_UNI].tx.max_data = srv_params->initial_max_stream_data_uni;

	/* Server initiated streams must respect the server flow control. */
	qcc->strms[QCS_SRV_BIDI].max_streams = 0;
	qcc->strms[QCS_SRV_BIDI].nb_streams = 0;
	qcc->strms[QCS_SRV_BIDI].largest_id = -1;
	qcc->strms[QCS_SRV_BIDI].rx.max_data = srv_params->initial_max_stream_data_bidi_local;
	qcc->strms[QCS_SRV_BIDI].tx.max_data = 0;

	qcc->strms[QCS_SRV_UNI].max_streams = 0;
	qcc->strms[QCS_SRV_UNI].nb_streams = 0;
	qcc->strms[QCS_SRV_UNI].largest_id = -1;
	qcc->strms[QCS_SRV_UNI].rx.max_data = srv_params->initial_max_stream_data_uni;
	qcc->strms[QCS_SRV_UNI].tx.max_data = 0;

	qcc->wait_event.tasklet = tasklet_new();
	if (!qcc->wait_event.tasklet)
		goto fail_no_tasklet;

	qcc->subs = NULL;
	qcc->wait_event.tasklet->process = qc_io_cb;
	qcc->wait_event.tasklet->context = qcc;

	/* haproxy timeouts */
	qcc->timeout = prx->timeout.client;
	qcc->task = task_new_here();
	if (!qcc->task)
		goto fail_no_timeout_task;
	qcc->task->process = qc_timeout_task;
	qcc->task->context = qcc;
	qcc->task->expire = tick_add(now_ms, qcc->timeout);

	if (!conn_is_back(conn)) {
		if (!LIST_INLIST(&conn->stopping_list)) {
			LIST_APPEND(&mux_stopping_data[tid].list,
			            &conn->stopping_list);
		}
	}

	HA_ATOMIC_STORE(&conn->qc->qcc, qcc);
	/* init read cycle */
	tasklet_wakeup(qcc->wait_event.tasklet);

	return 0;

 fail_no_timeout_task:
	tasklet_free(qcc->wait_event.tasklet);
 fail_no_tasklet:
	pool_free(pool_head_qcc, qcc);
 fail_no_qcc:
	return -1;
}

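/* Note how qc_init() mirrors quic_mux_transport_params_update(): here
 * our own (server) parameters seed the RX limits while the TX limits
 * stay at 0; the latter are only filled in once the client's transport
 * parameters are known.
 */
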
static void qc_detach(struct conn_stream *cs)
{
	struct qcs *qcs = cs->ctx;
	struct qcc *qcc = qcs->qcc;

	fprintf(stderr, "%s: leaving with tx.buf.data=%lu, tx.xprt_buf.data=%lu\n",
	        __func__, b_data(&qcs->tx.buf), b_data(&qcs->tx.xprt_buf));

	/* TODO on CONNECTION_CLOSE reception, it should be possible to free
	 * qcs instances. This should be done once the buffering and ACK
	 * management between xprt and mux is reorganized.
	 */

	if ((b_data(&qcs->tx.buf) || b_data(&qcs->tx.xprt_buf))) {
		qcs->flags |= QC_SF_DETACH;
		return;
	}

	qcs_destroy(qcs);

	/* Schedule the mux timeout if no bidirectional streams left. */
	if (qcc_may_expire(qcc)) {
		qcc->task->expire = tick_add(now_ms, qcc->timeout);
		task_queue(qcc->task);
	}
}

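/* A detached stream with pending TX data is thus only flagged
 * QC_SF_DETACH here; it is actually destroyed later by
 * qc_release_detached_streams() once its buffers have drained.
 */
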
/* Called from the upper layer, to receive data */
static size_t qc_rcv_buf(struct conn_stream *cs, struct buffer *buf,
                         size_t count, int flags)
{
	struct qcs *qcs = cs->ctx;
	struct htx *qcs_htx = NULL;
	struct htx *cs_htx = NULL;
	size_t ret = 0;
	char fin = 0;

	fprintf(stderr, "%s\n", __func__);

	qcs_htx = htx_from_buf(&qcs->rx.app_buf);
	if (htx_is_empty(qcs_htx)) {
		/* Set buffer data to 0 as HTX is empty. */
		htx_to_buf(qcs_htx, &qcs->rx.app_buf);
		goto end;
	}

	ret = qcs_htx->data;

	cs_htx = htx_from_buf(buf);
	if (htx_is_empty(cs_htx) && htx_used_space(qcs_htx) <= count) {
		htx_to_buf(cs_htx, buf);
		htx_to_buf(qcs_htx, &qcs->rx.app_buf);
		b_xfer(buf, &qcs->rx.app_buf, b_data(&qcs->rx.app_buf));
		goto end;
	}

	htx_xfer_blks(cs_htx, qcs_htx, count, HTX_BLK_UNUSED);
	BUG_ON(qcs_htx->flags & HTX_FL_PARSING_ERROR);

	/* Copy EOM from src to dst buffer if all data copied. */
	if (htx_is_empty(qcs_htx) && (qcs_htx->flags & HTX_FL_EOM)) {
		cs_htx->flags |= HTX_FL_EOM;
		fin = 1;
	}

	cs_htx->extra = qcs_htx->extra ? (qcs_htx->data + qcs_htx->extra) : 0;
	htx_to_buf(cs_htx, buf);
	htx_to_buf(qcs_htx, &qcs->rx.app_buf);
	ret -= qcs_htx->data;

 end:
	if (b_data(&qcs->rx.app_buf)) {
		cs->flags |= (CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
	}
	else {
		cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
		if (cs->flags & CS_FL_ERR_PENDING)
			cs->flags |= CS_FL_ERROR;

		if (fin)
			cs->flags |= (CS_FL_EOI|CS_FL_EOS);

		if (b_size(&qcs->rx.app_buf)) {
			b_free(&qcs->rx.app_buf);
			offer_buffers(NULL, 1);
		}
	}

	if (ret)
		tasklet_wakeup(qcs->qcc->wait_event.tasklet);

	return ret;
}

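/* qc_rcv_buf() follows the usual HTX fast/slow path split: when the
 * destination is empty and large enough, the whole buffer is swapped
 * with b_xfer(); otherwise blocks are copied one by one through
 * htx_xfer_blks() and RCV_MORE/WANT_ROOM tell the stream layer to come
 * back for the remainder.
 */
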
static size_t qc_snd_buf(struct conn_stream *cs, struct buffer *buf,
                         size_t count, int flags)
{
	struct qcs *qcs = cs->ctx;

	fprintf(stderr, "%s\n", __func__);

	return qcs->qcc->app_ops->snd_buf(cs, buf, count, flags);
}

/* Called from the upper layer, to subscribe <es> to events <event_type>. The
 * event subscriber <es> is not allowed to change from a previous call as long
 * as at least one event is still subscribed. The <event_type> must only be a
 * combination of SUB_RETRY_RECV and SUB_RETRY_SEND. It always returns 0.
 */
static int qc_subscribe(struct conn_stream *cs, int event_type,
                        struct wait_event *es)
{
	return qcs_subscribe(cs->ctx, event_type, es);
}

/* Called from the upper layer, to unsubscribe <es> from events <event_type>.
 * The <es> pointer is not allowed to differ from the one passed to the
 * subscribe() call. It always returns zero.
 */
static int qc_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
	struct qcs *qcs = cs->ctx;

	BUG_ON(event_type & ~(SUB_RETRY_SEND|SUB_RETRY_RECV));
	BUG_ON(qcs->subs && qcs->subs != es);

	es->events &= ~event_type;
	if (!es->events)
		qcs->subs = NULL;

	return 0;
}

static int qc_wake(struct connection *conn)
{
	struct qcc *qcc = conn->ctx;

	/* Check if a soft-stop is in progress.
	 * Release idling front connection if this is the case.
	 */
	if (unlikely(conn->qc->li->bind_conf->frontend->flags & (PR_FL_DISABLED|PR_FL_STOPPED))) {
		qc_release(qcc);
	}

	return 1;
}

static const struct mux_ops qc_ops = {
	.init = qc_init,
	.detach = qc_detach,
	.rcv_buf = qc_rcv_buf,
	.snd_buf = qc_snd_buf,
	.subscribe = qc_subscribe,
	.unsubscribe = qc_unsubscribe,
	.wake = qc_wake,
};

static struct mux_proto_list mux_proto_quic =
	{ .token = IST("quic"), .mode = PROTO_MODE_HTTP, .side = PROTO_SIDE_FE, .mux = &qc_ops };

INITCALL1(STG_REGISTER, register_mux_proto, &mux_proto_quic);