blob: 0d9e12ab641ef5c0b6d230673bb305d31837f2ba [file] [log] [blame]
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +02001#include <haproxy/quic_stream.h>
2
3#include <import/eb64tree.h>
4
5#include <haproxy/api.h>
6#include <haproxy/buf.h>
7#include <haproxy/list.h>
8#include <haproxy/dynbuf.h>
9#include <haproxy/pool.h>
10#include <haproxy/xprt_quic.h>
11
12DECLARE_STATIC_POOL(pool_head_quic_conn_stream, "qc_stream_desc",
13 sizeof(struct qc_stream_desc));
Amaury Denoyellea4569202022-04-15 17:29:25 +020014DECLARE_STATIC_POOL(pool_head_quic_conn_stream_buf, "qc_stream_buf",
15 sizeof(struct qc_stream_buf));
16
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +020017
18/* Allocate a new stream descriptor with id <id>. The caller is responsible to
19 * store the stream in the appropriate tree.
20 *
21 * Returns the newly allocated instance on success or else NULL.
22 */
Frédéric Lécaille664741e2022-05-02 18:46:58 +020023struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type type, void *ctx,
Amaury Denoyellee4301da2022-04-19 17:59:50 +020024 struct quic_conn *qc)
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +020025{
26 struct qc_stream_desc *stream;
27
28 stream = pool_alloc(pool_head_quic_conn_stream);
29 if (!stream)
30 return NULL;
31
32 stream->by_id.key = id;
Amaury Denoyellee4301da2022-04-19 17:59:50 +020033 eb64_insert(&qc->streams_by_id, &stream->by_id);
Frédéric Lécaille664741e2022-05-02 18:46:58 +020034 qc->rx.strms[type].nb_streams++;
Amaury Denoyelleb22c0462022-04-21 11:00:41 +020035 stream->qc = qc;
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +020036
Amaury Denoyellea4569202022-04-15 17:29:25 +020037 stream->buf = NULL;
38 LIST_INIT(&stream->buf_list);
39 stream->buf_offset = 0;
40
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +020041 stream->acked_frms = EB_ROOT;
42 stream->ack_offset = 0;
43 stream->release = 0;
44 stream->ctx = ctx;
45
46 return stream;
47}
48
Amaury Denoyellee4301da2022-04-19 17:59:50 +020049/* Mark the stream descriptor <stream> as released. It will be freed as soon as
50 * all its buffered data are acknowledged.
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +020051 */
Amaury Denoyellee4301da2022-04-19 17:59:50 +020052void qc_stream_desc_release(struct qc_stream_desc *stream)
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +020053{
Amaury Denoyellee4301da2022-04-19 17:59:50 +020054 /* A stream can be released only one time. */
55 BUG_ON(stream->release);
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +020056
57 stream->release = 1;
58 stream->ctx = NULL;
59
Amaury Denoyellea4569202022-04-15 17:29:25 +020060 if (LIST_ISEMPTY(&stream->buf_list)) {
61 /* if no buffer left we can free the stream. */
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +020062 qc_stream_desc_free(stream);
Amaury Denoyellea4569202022-04-15 17:29:25 +020063 }
64 else {
65 /* A released stream does not use <stream.buf>. */
66 stream->buf = NULL;
67 }
68}
69
Amaury Denoyelle1b81dda2022-04-21 09:32:53 +020070/* Acknowledge data at <offset> of length <len> for <stream>. It is handled
71 * only if it covers a range corresponding to stream.ack_offset. After data
72 * removal, if the stream does not contains data any more and is already
73 * released, the instance stream is freed. <stream> is set to NULL to indicate
74 * this.
75 *
76 * Returns the count of byte removed from stream. Do not forget to check if
77 * <stream> is NULL after invocation.
78 */
Amaury Denoyelled2f80a22022-04-15 17:30:49 +020079int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len)
Amaury Denoyelle1b81dda2022-04-21 09:32:53 +020080{
81 struct qc_stream_desc *s = *stream;
82 struct qc_stream_buf *stream_buf;
Amaury Denoyelled2f80a22022-04-15 17:30:49 +020083 struct quic_conn *qc = s->qc;
Amaury Denoyelle1b81dda2022-04-21 09:32:53 +020084 struct buffer *buf;
85 size_t diff;
86
87 if (offset + len <= s->ack_offset || offset > s->ack_offset)
88 return 0;
89
90 /* There must be at least a buffer or we must not report an ACK. */
91 BUG_ON(LIST_ISEMPTY(&s->buf_list));
92
93 /* get oldest buffer from buf_list */
94 stream_buf = LIST_NEXT(&s->buf_list, struct qc_stream_buf *, list);
95 buf = &stream_buf->buf;
96
97 diff = offset + len - s->ack_offset;
98 s->ack_offset += diff;
99 b_del(buf, diff);
100
101 /* nothing more to do if buf still not empty. */
102 if (b_data(buf))
103 return diff;
104
105 /* buf is empty and can now be freed. Do not forget to reset current
106 * buf ptr if we were working on it.
107 */
108 LIST_DELETE(&stream_buf->list);
109 if (stream_buf == s->buf) {
110 /* current buf must always be last entry in buflist */
111 BUG_ON(!LIST_ISEMPTY(&s->buf_list));
112 s->buf = NULL;
113 }
114
115 b_free(buf);
116 pool_free(pool_head_quic_conn_stream_buf, stream_buf);
117 offer_buffers(NULL, 1);
118
Amaury Denoyelled2f80a22022-04-15 17:30:49 +0200119 /* notify MUX about available buffers. */
120 --qc->stream_buf_count;
121 if (qc->mux_state == QC_MUX_READY) {
122 if (qc->qcc->flags & QC_CF_CONN_FULL) {
123 qc->qcc->flags &= ~QC_CF_CONN_FULL;
124 tasklet_wakeup(qc->qcc->wait_event.tasklet);
125 }
126 }
127
Amaury Denoyelle1b81dda2022-04-21 09:32:53 +0200128 /* Free stream instance if already released and no buffers left. */
129 if (s->release && LIST_ISEMPTY(&s->buf_list)) {
130 qc_stream_desc_free(s);
131 *stream = NULL;
132 }
133
134 return diff;
135}
136
Amaury Denoyellea4569202022-04-15 17:29:25 +0200137/* Free the stream descriptor <stream> content. This function should be used
138 * when all its data have been acknowledged or on full connection closing. It
139 * must only be called after the stream is released.
140 */
141void qc_stream_desc_free(struct qc_stream_desc *stream)
142{
143 struct qc_stream_buf *buf, *buf_back;
Amaury Denoyelled2f80a22022-04-15 17:30:49 +0200144 struct quic_conn *qc = stream->qc;
Amaury Denoyellea4569202022-04-15 17:29:25 +0200145 struct eb64_node *frm_node;
146 unsigned int free_count = 0;
147
148 /* This function only deals with released streams. */
149 BUG_ON(!stream->release);
150
151 /* free remaining stream buffers */
152 list_for_each_entry_safe(buf, buf_back, &stream->buf_list, list) {
153 if (!(b_data(&buf->buf))) {
154 b_free(&buf->buf);
155 LIST_DELETE(&buf->list);
156 pool_free(pool_head_quic_conn_stream_buf, buf);
157
158 ++free_count;
159 }
160 }
161
Amaury Denoyelled2f80a22022-04-15 17:30:49 +0200162 if (free_count) {
Amaury Denoyellea4569202022-04-15 17:29:25 +0200163 offer_buffers(NULL, free_count);
164
Amaury Denoyelled2f80a22022-04-15 17:30:49 +0200165 qc->stream_buf_count -= free_count;
166 if (qc->mux_state == QC_MUX_READY) {
167 /* notify MUX about available buffers. */
168 if (qc->qcc->flags & QC_CF_CONN_FULL) {
169 qc->qcc->flags &= ~QC_CF_CONN_FULL;
170 tasklet_wakeup(qc->qcc->wait_event.tasklet);
171 }
172 }
173 }
174
Amaury Denoyellea4569202022-04-15 17:29:25 +0200175 /* qc_stream_desc might be freed before having received all its ACKs.
176 * This is the case if some frames were retransmitted.
177 */
178 frm_node = eb64_first(&stream->acked_frms);
179 while (frm_node) {
180 struct quic_stream *strm;
181 struct quic_frame *frm;
182
Frédéric Lécaillea54e49d2022-05-10 15:15:24 +0200183 strm = eb64_entry(frm_node, struct quic_stream, offset);
Amaury Denoyellea4569202022-04-15 17:29:25 +0200184
185 frm_node = eb64_next(frm_node);
186 eb64_delete(&strm->offset);
187
188 frm = container_of(strm, struct quic_frame, stream);
Frédéric Lécailleda342552022-04-25 10:28:49 +0200189 qc_release_frm(qc, frm);
Amaury Denoyellea4569202022-04-15 17:29:25 +0200190 }
191
192 eb64_delete(&stream->by_id);
193 pool_free(pool_head_quic_conn_stream, stream);
194}
195
196/* Return the current buffer of <stream>. May be NULL if not allocated. */
197struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream)
198{
199 if (!stream->buf)
200 return NULL;
201
202 return &stream->buf->buf;
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +0200203}
204
Amaury Denoyelled2f80a22022-04-15 17:30:49 +0200205/* Check if a new stream buffer can be allocated for the connection <qc>.
206 * Returns a boolean.
207 */
Amaury Denoyelle1b2dba52022-04-15 17:32:04 +0200208static int qc_stream_buf_avail(struct quic_conn *qc)
Amaury Denoyelled2f80a22022-04-15 17:30:49 +0200209{
Amaury Denoyelle97e84c62022-04-19 18:26:55 +0200210 return qc->stream_buf_count < global.tune.quic_streams_buf;
Amaury Denoyelled2f80a22022-04-15 17:30:49 +0200211}
212
213/* Allocate a new current buffer for <stream>. The buffer limit count for the
214 * connection is checked first. This function is not allowed if current buffer
215 * is not NULL prior to this call. The new buffer represents stream payload at
216 * offset <offset>.
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +0200217 *
Amaury Denoyellea4569202022-04-15 17:29:25 +0200218 * Returns the buffer or NULL.
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +0200219 */
Amaury Denoyellea4569202022-04-15 17:29:25 +0200220struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
221 uint64_t offset)
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +0200222{
Amaury Denoyelled2f80a22022-04-15 17:30:49 +0200223 struct quic_conn *qc = stream->qc;
224
Amaury Denoyellea4569202022-04-15 17:29:25 +0200225 /* current buffer must be released first before allocate a new one. */
226 BUG_ON(stream->buf);
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +0200227
Amaury Denoyelled2f80a22022-04-15 17:30:49 +0200228 if (!qc_stream_buf_avail(qc))
229 return NULL;
230
231 ++qc->stream_buf_count;
232
Amaury Denoyellea4569202022-04-15 17:29:25 +0200233 stream->buf_offset = offset;
234 stream->buf = pool_alloc(pool_head_quic_conn_stream_buf);
235 if (!stream->buf)
236 return NULL;
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +0200237
Amaury Denoyellea4569202022-04-15 17:29:25 +0200238 stream->buf->buf = BUF_NULL;
239 LIST_APPEND(&stream->buf_list, &stream->buf->list);
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +0200240
Amaury Denoyellea4569202022-04-15 17:29:25 +0200241 return &stream->buf->buf;
242}
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +0200243
Amaury Denoyellea4569202022-04-15 17:29:25 +0200244/* Release the current buffer of <stream>. It will be kept internally by
245 * the <stream>. The current buffer cannot be NULL.
246 */
247void qc_stream_buf_release(struct qc_stream_desc *stream)
248{
249 /* current buffer already released */
250 BUG_ON(!stream->buf);
Amaury Denoyelle0cc02a32022-04-19 17:21:11 +0200251
Amaury Denoyellea4569202022-04-15 17:29:25 +0200252 stream->buf = NULL;
253 stream->buf_offset = 0;
254}