blob: 7f72e1c616c0de40b5e7f554eee073d5628eac9f [file] [log] [blame]
Christopher Faulet1329f2a2021-12-16 17:32:56 +01001/*
2 * Conn-stream management functions
3 *
4 * Copyright 2021 Christopher Faulet <cfaulet@haproxy.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12
13#include <haproxy/api.h>
Christopher Faulet37046632022-04-01 11:36:58 +020014#include <haproxy/applet.h>
Christopher Faulet1329f2a2021-12-16 17:32:56 +010015#include <haproxy/connection.h>
16#include <haproxy/conn_stream.h>
Christopher Faulet19bd7282022-04-01 13:58:09 +020017#include <haproxy/cs_utils.h>
Christopher Faulet1329f2a2021-12-16 17:32:56 +010018#include <haproxy/pool.h>
Christopher Fauletcda94ac2021-12-23 17:28:17 +010019#include <haproxy/stream_interface.h>
Christopher Faulet1329f2a2021-12-16 17:32:56 +010020
21DECLARE_POOL(pool_head_connstream, "conn_stream", sizeof(struct conn_stream));
Christopher Fauletdb90f2a2022-03-22 16:06:25 +010022DECLARE_POOL(pool_head_cs_endpoint, "cs_endpoint", sizeof(struct cs_endpoint));
Christopher Faulet1329f2a2021-12-16 17:32:56 +010023
Christopher Faulet9ffddd52022-04-01 14:04:29 +020024/* functions used by default on a detached conn-stream */
25static void cs_app_shutr(struct conn_stream *cs);
26static void cs_app_shutw(struct conn_stream *cs);
27static void cs_app_chk_rcv(struct conn_stream *cs);
28static void cs_app_chk_snd(struct conn_stream *cs);
29
30/* functions used on a mux-based conn-stream */
31static void cs_app_shutr_conn(struct conn_stream *cs);
32static void cs_app_shutw_conn(struct conn_stream *cs);
33static void cs_app_chk_rcv_conn(struct conn_stream *cs);
34static void cs_app_chk_snd_conn(struct conn_stream *cs);
35
36/* functions used on an applet-based conn-stream */
37static void cs_app_shutr_applet(struct conn_stream *cs);
38static void cs_app_shutw_applet(struct conn_stream *cs);
39static void cs_app_chk_rcv_applet(struct conn_stream *cs);
40static void cs_app_chk_snd_applet(struct conn_stream *cs);
41
42/* conn-stream operations for connections */
43struct cs_app_ops cs_app_conn_ops = {
44 .chk_rcv = cs_app_chk_rcv_conn,
45 .chk_snd = cs_app_chk_snd_conn,
46 .shutr = cs_app_shutr_conn,
47 .shutw = cs_app_shutw_conn,
48};
49
50/* conn-stream operations for embedded tasks */
51struct cs_app_ops cs_app_embedded_ops = {
52 .chk_rcv = cs_app_chk_rcv,
53 .chk_snd = cs_app_chk_snd,
54 .shutr = cs_app_shutr,
55 .shutw = cs_app_shutw,
56};
57
58/* conn-stream operations for connections */
59struct cs_app_ops cs_app_applet_ops = {
60 .chk_rcv = cs_app_chk_rcv_applet,
61 .chk_snd = cs_app_chk_snd_applet,
62 .shutr = cs_app_shutr_applet,
63 .shutw = cs_app_shutw_applet,
64};
65
66
Christopher Fauletdb90f2a2022-03-22 16:06:25 +010067void cs_endpoint_init(struct cs_endpoint *endp)
68{
69 endp->target = NULL;
70 endp->ctx = NULL;
71 endp->flags = CS_EP_NONE;
72}
73
74struct cs_endpoint *cs_endpoint_new()
75{
76 struct cs_endpoint *endp;
77
78 endp = pool_alloc(pool_head_cs_endpoint);
79 if (unlikely(!endp))
80 return NULL;
81
82 cs_endpoint_init(endp);
83 return endp;
84}
85
86void cs_endpoint_free(struct cs_endpoint *endp)
87{
88 pool_free(pool_head_cs_endpoint, endp);
89}
Christopher Faulet1329f2a2021-12-16 17:32:56 +010090
Christopher Fauletdd2d0d82021-12-20 09:34:32 +010091/* Tries to allocate a new conn_stream and initialize its main fields. On
92 * failure, nothing is allocated and NULL is returned.
Christopher Faulet1329f2a2021-12-16 17:32:56 +010093 */
Christopher Fauletb669d682022-03-22 18:37:19 +010094struct conn_stream *cs_new(struct cs_endpoint *endp)
Christopher Faulet1329f2a2021-12-16 17:32:56 +010095{
96 struct conn_stream *cs;
97
98 cs = pool_alloc(pool_head_connstream);
Christopher Fauletdb90f2a2022-03-22 16:06:25 +010099
Christopher Faulet1329f2a2021-12-16 17:32:56 +0100100 if (unlikely(!cs))
Christopher Fauletdb90f2a2022-03-22 16:06:25 +0100101 goto alloc_error;
Christopher Fauletbb772d02022-03-22 15:28:36 +0100102
103 cs->obj_type = OBJ_TYPE_CS;
104 cs->flags = CS_FL_NONE;
Christopher Faulet62e75742022-03-31 09:16:34 +0200105 cs->state = CS_ST_INI;
Christopher Faulet1d987772022-03-29 18:03:35 +0200106 cs->hcto = TICK_ETERNITY;
Christopher Fauletbb772d02022-03-22 15:28:36 +0100107 cs->app = NULL;
Christopher Fauletbb772d02022-03-22 15:28:36 +0100108 cs->si = NULL;
109 cs->data_cb = NULL;
Christopher Faulet8da67aa2022-03-29 17:53:09 +0200110 cs->src = NULL;
111 cs->dst = NULL;
Christopher Faulet2f35e7b2022-03-31 11:09:28 +0200112 cs->wait_event.tasklet = NULL;
113 cs->wait_event.events = 0;
114
Christopher Fauletb669d682022-03-22 18:37:19 +0100115 if (!endp) {
116 endp = cs_endpoint_new();
117 if (unlikely(!endp))
118 goto alloc_error;
119 }
Christopher Fauletdb90f2a2022-03-22 16:06:25 +0100120 cs->endp = endp;
121
Christopher Faulet1329f2a2021-12-16 17:32:56 +0100122 return cs;
Christopher Fauletdb90f2a2022-03-22 16:06:25 +0100123
124 alloc_error:
125 pool_free(pool_head_connstream, cs);
126 return NULL;
Christopher Faulet1329f2a2021-12-16 17:32:56 +0100127}
128
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100129struct conn_stream *cs_new_from_mux(struct cs_endpoint *endp, struct session *sess, struct buffer *input)
130{
131 struct conn_stream *cs;
132
133 cs = cs_new(endp);
134 if (unlikely(!cs))
135 return NULL;
136 if (unlikely(!stream_new(sess, cs, input))) {
137 pool_free(pool_head_connstream, cs);
138 cs = NULL;
139 }
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100140 endp->flags &= ~CS_EP_ORPHAN;
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100141 return cs;
142}
143
144struct conn_stream *cs_new_from_applet(struct cs_endpoint *endp, struct session *sess, struct buffer *input)
145{
146 struct conn_stream *cs;
147 struct appctx *appctx = endp->ctx;
148
149 cs = cs_new(endp);
150 if (unlikely(!cs))
151 return NULL;
152 appctx->owner = cs;
153 if (unlikely(!stream_new(sess, cs, input))) {
154 pool_free(pool_head_connstream, cs);
155 cs = NULL;
156 }
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100157 endp->flags &= ~CS_EP_ORPHAN;
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100158 return cs;
159}
160
161struct conn_stream *cs_new_from_strm(struct stream *strm, unsigned int flags)
162{
163 struct conn_stream *cs;
164
165 cs = cs_new(NULL);
166 if (unlikely(!cs))
167 return NULL;
168 cs->flags |= flags;
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100169 cs->endp->flags |= CS_EP_DETACHED;
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100170 cs->si = si_new(cs);
171 if (unlikely(!cs->si)) {
172 cs_free(cs);
173 return NULL;
174 }
Christopher Faulet0c6a64c2022-04-01 08:58:29 +0200175
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100176 cs->app = &strm->obj_type;
Christopher Faulet0c6a64c2022-04-01 08:58:29 +0200177 cs->ops = &cs_app_embedded_ops;
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100178 cs->data_cb = NULL;
179 return cs;
180}
181
182struct conn_stream *cs_new_from_check(struct check *check, unsigned int flags)
183{
184 struct conn_stream *cs;
185
186 cs = cs_new(NULL);
187 if (unlikely(!cs))
188 return NULL;
189 cs->flags |= flags;
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100190 cs->endp->flags |= CS_EP_DETACHED;
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100191 cs->app = &check->obj_type;
192 cs->data_cb = &check_conn_cb;
193 return cs;
194}
195
Christopher Faulet1329f2a2021-12-16 17:32:56 +0100196/* Releases a conn_stream previously allocated by cs_new(), as well as any
197 * buffer it would still hold.
198 */
199void cs_free(struct conn_stream *cs)
200{
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100201 si_free(cs->si);
Christopher Faulet8da67aa2022-03-29 17:53:09 +0200202 sockaddr_free(&cs->src);
203 sockaddr_free(&cs->dst);
Christopher Fauletdb90f2a2022-03-22 16:06:25 +0100204 if (cs->endp) {
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100205 BUG_ON(!(cs->endp->flags & CS_EP_DETACHED));
Christopher Fauletdb90f2a2022-03-22 16:06:25 +0100206 cs_endpoint_free(cs->endp);
207 }
Christopher Faulet2f35e7b2022-03-31 11:09:28 +0200208 if (cs->wait_event.tasklet)
209 tasklet_free(cs->wait_event.tasklet);
Christopher Faulet1329f2a2021-12-16 17:32:56 +0100210 pool_free(pool_head_connstream, cs);
211}
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100212
213
Christopher Faulet93882042022-01-19 14:56:50 +0100214/* Attaches a conn_stream to an mux endpoint and sets the endpoint ctx */
Christopher Faulet070b91b2022-03-31 19:27:18 +0200215int cs_attach_mux(struct conn_stream *cs, void *target, void *ctx)
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100216{
Christopher Faulet93882042022-01-19 14:56:50 +0100217 struct connection *conn = ctx;
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100218
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100219 cs->endp->target = target;
Christopher Fauletdb90f2a2022-03-22 16:06:25 +0100220 cs->endp->ctx = ctx;
221 cs->endp->flags |= CS_EP_T_MUX;
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100222 cs->endp->flags &= ~CS_EP_DETACHED;
Christopher Faulet93882042022-01-19 14:56:50 +0100223 if (!conn->ctx)
224 conn->ctx = cs;
225 if (cs_strm(cs)) {
Christopher Faulet2f35e7b2022-03-31 11:09:28 +0200226 if (!cs->wait_event.tasklet) {
227 cs->wait_event.tasklet = tasklet_new();
228 if (!cs->wait_event.tasklet)
229 return -1;
Christopher Faulet4a7764a2022-04-01 16:58:52 +0200230 cs->wait_event.tasklet->process = cs_conn_io_cb;
231 cs->wait_event.tasklet->context = cs;
Christopher Faulet2f35e7b2022-03-31 11:09:28 +0200232 cs->wait_event.events = 0;
233 }
234
Christopher Faulet0c6a64c2022-04-01 08:58:29 +0200235 cs->ops = &cs_app_conn_ops;
Christopher Faulet93882042022-01-19 14:56:50 +0100236 cs->data_cb = &si_conn_cb;
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100237 }
Christopher Faulet93882042022-01-19 14:56:50 +0100238 else if (cs_check(cs))
239 cs->data_cb = &check_conn_cb;
Christopher Faulet070b91b2022-03-31 19:27:18 +0200240 return 0;
Christopher Faulet93882042022-01-19 14:56:50 +0100241}
242
243/* Attaches a conn_stream to an applet endpoint and sets the endpoint ctx */
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100244void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx)
Christopher Faulet93882042022-01-19 14:56:50 +0100245{
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100246 struct appctx *appctx = target;
Christopher Faulet93882042022-01-19 14:56:50 +0100247
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100248 cs->endp->target = target;
Christopher Fauletdb90f2a2022-03-22 16:06:25 +0100249 cs->endp->ctx = ctx;
250 cs->endp->flags |= CS_EP_T_APPLET;
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100251 cs->endp->flags &= ~CS_EP_DETACHED;
Christopher Faulet93882042022-01-19 14:56:50 +0100252 appctx->owner = cs;
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100253 if (cs_strm(cs)) {
Christopher Faulet0c6a64c2022-04-01 08:58:29 +0200254 cs->ops = &cs_app_applet_ops;
Christopher Faulet6059ba42022-04-01 16:34:53 +0200255 cs->data_cb = &cs_data_applet_cb;
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100256 }
257}
258
259/* Attaches a conn_stream to a app layer and sets the relevant callbacks */
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100260int cs_attach_strm(struct conn_stream *cs, struct stream *strm)
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100261{
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100262 cs->app = &strm->obj_type;
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100263
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100264 cs->si = si_new(cs);
265 if (unlikely(!cs->si))
266 return -1;
Christopher Faulet2f35e7b2022-03-31 11:09:28 +0200267
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100268 cs->endp->flags &= ~CS_EP_ORPHAN;
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100269 if (cs->endp->flags & CS_EP_T_MUX) {
Christopher Faulet2f35e7b2022-03-31 11:09:28 +0200270 cs->wait_event.tasklet = tasklet_new();
271 if (!cs->wait_event.tasklet) {
272 si_free(cs->si);
273 cs->si = NULL;
274 return -1;
275 }
Christopher Faulet4a7764a2022-04-01 16:58:52 +0200276 cs->wait_event.tasklet->process = cs_conn_io_cb;
277 cs->wait_event.tasklet->context = cs;
Christopher Faulet2f35e7b2022-03-31 11:09:28 +0200278 cs->wait_event.events = 0;
279
Christopher Faulet0c6a64c2022-04-01 08:58:29 +0200280 cs->ops = &cs_app_conn_ops;
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100281 cs->data_cb = &si_conn_cb;
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100282 }
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100283 else if (cs->endp->flags & CS_EP_T_APPLET) {
Christopher Faulet0c6a64c2022-04-01 08:58:29 +0200284 cs->ops = &cs_app_applet_ops;
Christopher Faulet6059ba42022-04-01 16:34:53 +0200285 cs->data_cb = &cs_data_applet_cb;
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100286 }
287 else {
Christopher Faulet0c6a64c2022-04-01 08:58:29 +0200288 cs->ops = &cs_app_embedded_ops;
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100289 cs->data_cb = NULL;
290 }
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100291 return 0;
292}
293
294/* Detach the conn_stream from the endpoint, if any. For a connecrion, if a mux
295 * owns the connection ->detach() callback is called. Otherwise, it means the
296 * conn-stream owns the connection. In this case the connection is closed and
297 * released. For an applet, the appctx is released. At the end, the conn-stream
298 * is not released but some fields a reset.
299 */
300void cs_detach_endp(struct conn_stream *cs)
301{
Christopher Fauletb041b232022-03-24 10:27:02 +0100302 if (!cs->endp)
303 goto reset_cs;
304
Christopher Fauletdb90f2a2022-03-22 16:06:25 +0100305 if (cs->endp->flags & CS_EP_T_MUX) {
306 struct connection *conn = cs_conn(cs);
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100307
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100308 if (conn->mux) {
Christopher Faulet54e85cb2022-01-06 08:46:56 +0100309 /* TODO: handle unsubscribe for healthchecks too */
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100310 cs->endp->flags |= CS_EP_ORPHAN;
Christopher Faulet2f35e7b2022-03-31 11:09:28 +0200311 if (cs->wait_event.events != 0)
312 conn->mux->unsubscribe(cs, cs->wait_event.events, &cs->wait_event);
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100313 conn->mux->detach(cs);
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100314 cs->endp = NULL;
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100315 }
316 else {
317 /* It's too early to have a mux, let's just destroy
318 * the connection
319 */
320 conn_stop_tracking(conn);
321 conn_full_close(conn);
322 if (conn->destroy_cb)
323 conn->destroy_cb(conn);
324 conn_free(conn);
325 }
326 }
Christopher Fauletdb90f2a2022-03-22 16:06:25 +0100327 else if (cs->endp->flags & CS_EP_T_APPLET) {
328 struct appctx *appctx = cs_appctx(cs);
329
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100330 cs->endp->flags |= CS_EP_ORPHAN;
Christopher Faulet37046632022-04-01 11:36:58 +0200331 cs_applet_release(cs);
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100332 appctx_free(appctx);
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100333 cs->endp = NULL;
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100334 }
335
Christopher Fauletdb90f2a2022-03-22 16:06:25 +0100336 if (cs->endp) {
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100337 /* the cs is the only one one the endpoint */
Christopher Fauletdb90f2a2022-03-22 16:06:25 +0100338 cs_endpoint_init(cs->endp);
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100339 cs->endp->flags |= CS_EP_DETACHED;
Christopher Fauletdb90f2a2022-03-22 16:06:25 +0100340 }
341
Christopher Fauletb041b232022-03-24 10:27:02 +0100342 reset_cs:
Christopher Fauletc36de9d2022-01-06 08:44:58 +0100343 /* FIXME: Rest CS for now but must be reviewed. CS flags are only
344 * connection related for now but this will evolved
345 */
Christopher Faulet30995112022-03-25 15:32:38 +0100346 cs->flags &= CS_FL_ISBACK;
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100347 if (cs->si)
Christopher Faulet0c6a64c2022-04-01 08:58:29 +0200348 cs->ops = &cs_app_embedded_ops;
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100349 cs->data_cb = NULL;
Christopher Fauletc36de9d2022-01-06 08:44:58 +0100350
351 if (cs->app == NULL)
352 cs_free(cs);
353}
354
355void cs_detach_app(struct conn_stream *cs)
356{
357 si_free(cs->si);
358 cs->app = NULL;
359 cs->si = NULL;
360 cs->data_cb = NULL;
Christopher Faulet8da67aa2022-03-29 17:53:09 +0200361 sockaddr_free(&cs->src);
362 sockaddr_free(&cs->dst);
Christopher Faulet2f35e7b2022-03-31 11:09:28 +0200363
364 if (cs->wait_event.tasklet)
365 tasklet_free(cs->wait_event.tasklet);
366 cs->wait_event.tasklet = NULL;
367 cs->wait_event.events = 0;
368
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100369 if (!cs->endp || (cs->endp->flags & CS_EP_DETACHED))
Christopher Fauletc36de9d2022-01-06 08:44:58 +0100370 cs_free(cs);
Christopher Fauletcda94ac2021-12-23 17:28:17 +0100371}
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100372
373int cs_reset_endp(struct conn_stream *cs)
374{
Christopher Fauletb041b232022-03-24 10:27:02 +0100375 struct cs_endpoint *new_endp;
376
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100377 BUG_ON(!cs->app);
Christopher Fauletb041b232022-03-24 10:27:02 +0100378 if (!__cs_endp_target(cs)) {
379 /* endpoint not attached or attached to a mux with no
380 * target. Thus the endpoint will not be release but just
381 * reset
382 */
383 cs_detach_endp(cs);
384 return 0;
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100385 }
Christopher Fauletb041b232022-03-24 10:27:02 +0100386
387 /* allocate the new endpoint first to be able to set error if it
388 * fails */
389 new_endp = cs_endpoint_new();
390 if (!unlikely(new_endp)) {
391 cs->endp->flags |= CS_EP_ERROR;
392 return -1;
393 }
394
395 cs_detach_endp(cs);
396 BUG_ON(cs->endp);
397 cs->endp = new_endp;
398 cs->endp->flags |= CS_EP_DETACHED;
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100399 return 0;
400}
Christopher Faulet37046632022-04-01 11:36:58 +0200401
402
403/* Register an applet to handle a conn-stream as a new appctx. The CS will
404 * wake it up every time it is solicited. The appctx must be deleted by the task
405 * handler using cs_detach_endp(), possibly from within the function itself.
406 * It also pre-initializes the applet's context and returns it (or NULL in case
407 * it could not be allocated).
408 */
409struct appctx *cs_register_applet(struct conn_stream *cs, struct applet *app)
410{
411 struct appctx *appctx;
412
413 DPRINTF(stderr, "registering handler %p for cs %p (was %p)\n", app, cs, cs_strm_task(cs));
414
415 appctx = appctx_new(app, cs->endp);
416 if (!appctx)
417 return NULL;
418 cs_attach_applet(cs, appctx, appctx);
419 appctx->owner = cs;
420 appctx->t->nice = __cs_strm(cs)->task->nice;
421 si_cant_get(cs->si);
422 appctx_wakeup(appctx);
423 return appctx;
424}
425
426/* call the applet's release function if any. Needs to be called upon close() */
427void cs_applet_release(struct conn_stream *cs)
428{
429 struct appctx *appctx = __cs_appctx(cs);
430
431 if (appctx->applet->release && !cs_state_in(cs->state, CS_SB_DIS|CS_SB_CLO))
432 appctx->applet->release(appctx);
433}
Christopher Faulet9ffddd52022-04-01 14:04:29 +0200434
435/*
436 * This function performs a shutdown-read on a detached conn-stream in a
437 * connected or init state (it does nothing for other states). It either shuts
438 * the read side or marks itself as closed. The buffer flags are updated to
439 * reflect the new state. If the stream interface has CS_FL_NOHALF, we also
440 * forward the close to the write side. The owner task is woken up if it exists.
441 */
442static void cs_app_shutr(struct conn_stream *cs)
443{
444 struct channel *ic = cs_ic(cs);
445
446 si_rx_shut_blk(cs->si);
447 if (ic->flags & CF_SHUTR)
448 return;
449 ic->flags |= CF_SHUTR;
450 ic->rex = TICK_ETERNITY;
451
452 if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
453 return;
454
455 if (cs_oc(cs)->flags & CF_SHUTW) {
456 cs->state = CS_ST_DIS;
457 __cs_strm(cs)->conn_exp = TICK_ETERNITY;
458 }
459 else if (cs->flags & CS_FL_NOHALF) {
460 /* we want to immediately forward this close to the write side */
461 return cs_app_shutw(cs);
462 }
463
464 /* note that if the task exists, it must unregister itself once it runs */
465 if (!(cs->flags & CS_FL_DONT_WAKE))
466 task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
467}
468
469/*
470 * This function performs a shutdown-write on a detached conn-stream in a
471 * connected or init state (it does nothing for other states). It either shuts
472 * the write side or marks itself as closed. The buffer flags are updated to
473 * reflect the new state. It does also close everything if the SI was marked as
474 * being in error state. The owner task is woken up if it exists.
475 */
476static void cs_app_shutw(struct conn_stream *cs)
477{
478 struct channel *ic = cs_ic(cs);
479 struct channel *oc = cs_oc(cs);
480
481 oc->flags &= ~CF_SHUTW_NOW;
482 if (oc->flags & CF_SHUTW)
483 return;
484 oc->flags |= CF_SHUTW;
485 oc->wex = TICK_ETERNITY;
486 si_done_get(cs->si);
487
488 if (tick_isset(cs->hcto)) {
489 ic->rto = cs->hcto;
490 ic->rex = tick_add(now_ms, ic->rto);
491 }
492
493 switch (cs->state) {
494 case CS_ST_RDY:
495 case CS_ST_EST:
496 /* we have to shut before closing, otherwise some short messages
497 * may never leave the system, especially when there are remaining
498 * unread data in the socket input buffer, or when nolinger is set.
499 * However, if CS_FL_NOLINGER is explicitly set, we know there is
500 * no risk so we close both sides immediately.
501 */
502 if (!(cs->endp->flags & CS_EP_ERROR) && !(cs->flags & CS_FL_NOLINGER) &&
503 !(ic->flags & (CF_SHUTR|CF_DONT_READ)))
504 return;
505
506 /* fall through */
507 case CS_ST_CON:
508 case CS_ST_CER:
509 case CS_ST_QUE:
510 case CS_ST_TAR:
511 /* Note that none of these states may happen with applets */
512 cs->state = CS_ST_DIS;
513 /* fall through */
514 default:
515 cs->flags &= ~CS_FL_NOLINGER;
516 si_rx_shut_blk(cs->si);
517 ic->flags |= CF_SHUTR;
518 ic->rex = TICK_ETERNITY;
519 __cs_strm(cs)->conn_exp = TICK_ETERNITY;
520 }
521
522 /* note that if the task exists, it must unregister itself once it runs */
523 if (!(cs->flags & CS_FL_DONT_WAKE))
524 task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
525}
526
527/* default chk_rcv function for scheduled tasks */
528static void cs_app_chk_rcv(struct conn_stream *cs)
529{
530 struct channel *ic = cs_ic(cs);
531
532 DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
533 __FUNCTION__,
534 cs, cs->state, ic->flags, cs_oc(cs)->flags);
535
536 if (ic->pipe) {
537 /* stop reading */
538 si_rx_room_blk(cs->si);
539 }
540 else {
541 /* (re)start reading */
542 if (!(cs->flags & CS_FL_DONT_WAKE))
543 task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
544 }
545}
546
547/* default chk_snd function for scheduled tasks */
548static void cs_app_chk_snd(struct conn_stream *cs)
549{
550 struct channel *oc = cs_oc(cs);
551
552 DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
553 __FUNCTION__,
554 cs, cs->state, cs_ic(cs)->flags, oc->flags);
555
556 if (unlikely(cs->state != CS_ST_EST || (oc->flags & CF_SHUTW)))
557 return;
558
559 if (!(cs->si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */
560 channel_is_empty(oc)) /* called with nothing to send ! */
561 return;
562
563 /* Otherwise there are remaining data to be sent in the buffer,
564 * so we tell the handler.
565 */
566 cs->si->flags &= ~SI_FL_WAIT_DATA;
567 if (!tick_isset(oc->wex))
568 oc->wex = tick_add_ifset(now_ms, oc->wto);
569
570 if (!(cs->flags & CS_FL_DONT_WAKE))
571 task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
572}
573
574/*
575 * This function performs a shutdown-read on a conn-stream attached to
576 * a connection in a connected or init state (it does nothing for other
577 * states). It either shuts the read side or marks itself as closed. The buffer
578 * flags are updated to reflect the new state. If the stream interface has
579 * CS_FL_NOHALF, we also forward the close to the write side. If a control
580 * layer is defined, then it is supposed to be a socket layer and file
581 * descriptors are then shutdown or closed accordingly. The function
582 * automatically disables polling if needed.
583 */
584static void cs_app_shutr_conn(struct conn_stream *cs)
585{
586 struct channel *ic = cs_ic(cs);
587
588 BUG_ON(!cs_conn(cs));
589
590 si_rx_shut_blk(cs->si);
591 if (ic->flags & CF_SHUTR)
592 return;
593 ic->flags |= CF_SHUTR;
594 ic->rex = TICK_ETERNITY;
595
596 if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
597 return;
598
599 if (cs_oc(cs)->flags & CF_SHUTW) {
600 cs_conn_close(cs);
601 cs->state = CS_ST_DIS;
602 __cs_strm(cs)->conn_exp = TICK_ETERNITY;
603 }
604 else if (cs->flags & CS_FL_NOHALF) {
605 /* we want to immediately forward this close to the write side */
606 return cs_app_shutw_conn(cs);
607 }
608}
609
610/*
611 * This function performs a shutdown-write on a conn-stream attached to
612 * a connection in a connected or init state (it does nothing for other
613 * states). It either shuts the write side or marks itself as closed. The
614 * buffer flags are updated to reflect the new state. It does also close
615 * everything if the SI was marked as being in error state. If there is a
616 * data-layer shutdown, it is called.
617 */
618static void cs_app_shutw_conn(struct conn_stream *cs)
619{
620 struct channel *ic = cs_ic(cs);
621 struct channel *oc = cs_oc(cs);
622
623 BUG_ON(!cs_conn(cs));
624
625 oc->flags &= ~CF_SHUTW_NOW;
626 if (oc->flags & CF_SHUTW)
627 return;
628 oc->flags |= CF_SHUTW;
629 oc->wex = TICK_ETERNITY;
630 si_done_get(cs->si);
631
632 if (tick_isset(cs->hcto)) {
633 ic->rto = cs->hcto;
634 ic->rex = tick_add(now_ms, ic->rto);
635 }
636
637 switch (cs->state) {
638 case CS_ST_RDY:
639 case CS_ST_EST:
640 /* we have to shut before closing, otherwise some short messages
641 * may never leave the system, especially when there are remaining
642 * unread data in the socket input buffer, or when nolinger is set.
643 * However, if CS_FL_NOLINGER is explicitly set, we know there is
644 * no risk so we close both sides immediately.
645 */
646
647 if (cs->endp->flags & CS_EP_ERROR) {
648 /* quick close, the socket is already shut anyway */
649 }
650 else if (cs->flags & CS_FL_NOLINGER) {
651 /* unclean data-layer shutdown, typically an aborted request
652 * or a forwarded shutdown from a client to a server due to
653 * option abortonclose. No need for the TLS layer to try to
654 * emit a shutdown message.
655 */
656 cs_conn_shutw(cs, CO_SHW_SILENT);
657 }
658 else {
659 /* clean data-layer shutdown. This only happens on the
660 * frontend side, or on the backend side when forwarding
661 * a client close in TCP mode or in HTTP TUNNEL mode
662 * while option abortonclose is set. We want the TLS
663 * layer to try to signal it to the peer before we close.
664 */
665 cs_conn_shutw(cs, CO_SHW_NORMAL);
666
667 if (!(ic->flags & (CF_SHUTR|CF_DONT_READ)))
668 return;
669 }
670
671 /* fall through */
672 case CS_ST_CON:
673 /* we may have to close a pending connection, and mark the
674 * response buffer as shutr
675 */
676 cs_conn_close(cs);
677 /* fall through */
678 case CS_ST_CER:
679 case CS_ST_QUE:
680 case CS_ST_TAR:
681 cs->state = CS_ST_DIS;
682 /* fall through */
683 default:
684 cs->flags &= ~CS_FL_NOLINGER;
685 si_rx_shut_blk(cs->si);
686 ic->flags |= CF_SHUTR;
687 ic->rex = TICK_ETERNITY;
688 __cs_strm(cs)->conn_exp = TICK_ETERNITY;
689 }
690}
691
692/* This function is used for inter-conn-stream calls. It is called by the
693 * consumer to inform the producer side that it may be interested in checking
694 * for free space in the buffer. Note that it intentionally does not update
695 * timeouts, so that we can still check them later at wake-up. This function is
696 * dedicated to connection-based stream interfaces.
697 */
698static void cs_app_chk_rcv_conn(struct conn_stream *cs)
699{
700 BUG_ON(!cs_conn(cs));
701
702 /* (re)start reading */
703 if (cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
704 tasklet_wakeup(cs->wait_event.tasklet);
705}
706
707
708/* This function is used for inter-conn-stream calls. It is called by the
709 * producer to inform the consumer side that it may be interested in checking
710 * for data in the buffer. Note that it intentionally does not update timeouts,
711 * so that we can still check them later at wake-up.
712 */
713static void cs_app_chk_snd_conn(struct conn_stream *cs)
714{
715 struct channel *oc = cs_oc(cs);
716
717 BUG_ON(!cs_conn(cs));
718
719 if (unlikely(!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST) ||
720 (oc->flags & CF_SHUTW)))
721 return;
722
723 if (unlikely(channel_is_empty(oc))) /* called with nothing to send ! */
724 return;
725
726 if (!oc->pipe && /* spliced data wants to be forwarded ASAP */
727 !(cs->si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */
728 return;
729
730 if (!(cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(cs_oc(cs)))
731 si_cs_send(cs);
732
733 if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(cs->si)) {
734 /* Write error on the file descriptor */
735 if (cs->state >= CS_ST_CON)
736 cs->endp->flags |= CS_EP_ERROR;
737 goto out_wakeup;
738 }
739
740 /* OK, so now we know that some data might have been sent, and that we may
741 * have to poll first. We have to do that too if the buffer is not empty.
742 */
743 if (channel_is_empty(oc)) {
744 /* the connection is established but we can't write. Either the
745 * buffer is empty, or we just refrain from sending because the
746 * ->o limit was reached. Maybe we just wrote the last
747 * chunk and need to close.
748 */
749 if (((oc->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) ==
750 (CF_AUTO_CLOSE|CF_SHUTW_NOW)) &&
751 cs_state_in(cs->state, CS_SB_RDY|CS_SB_EST)) {
752 cs_shutw(cs);
753 goto out_wakeup;
754 }
755
756 if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0)
757 cs->si->flags |= SI_FL_WAIT_DATA;
758 oc->wex = TICK_ETERNITY;
759 }
760 else {
761 /* Otherwise there are remaining data to be sent in the buffer,
762 * which means we have to poll before doing so.
763 */
764 cs->si->flags &= ~SI_FL_WAIT_DATA;
765 if (!tick_isset(oc->wex))
766 oc->wex = tick_add_ifset(now_ms, oc->wto);
767 }
768
769 if (likely(oc->flags & CF_WRITE_ACTIVITY)) {
770 struct channel *ic = cs_ic(cs);
771
772 /* update timeout if we have written something */
773 if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
774 !channel_is_empty(oc))
775 oc->wex = tick_add_ifset(now_ms, oc->wto);
776
777 if (tick_isset(ic->rex) && !(cs->flags & CS_FL_INDEP_STR)) {
778 /* Note: to prevent the client from expiring read timeouts
779 * during writes, we refresh it. We only do this if the
780 * interface is not configured for "independent streams",
781 * because for some applications it's better not to do this,
782 * for instance when continuously exchanging small amounts
783 * of data which can full the socket buffers long before a
784 * write timeout is detected.
785 */
786 ic->rex = tick_add_ifset(now_ms, ic->rto);
787 }
788 }
789
790 /* in case of special condition (error, shutdown, end of write...), we
791 * have to notify the task.
792 */
793 if (likely((oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR|CF_SHUTW)) ||
794 ((oc->flags & CF_WAKE_WRITE) &&
795 ((channel_is_empty(oc) && !oc->to_forward) ||
796 !cs_state_in(cs->state, CS_SB_EST))))) {
797 out_wakeup:
798 if (!(cs->flags & CS_FL_DONT_WAKE))
799 task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
800 }
801}
802
803/*
804 * This function performs a shutdown-read on a conn-stream attached to an
805 * applet in a connected or init state (it does nothing for other states). It
806 * either shuts the read side or marks itself as closed. The buffer flags are
807 * updated to reflect the new state. If the stream interface has CS_FL_NOHALF,
808 * we also forward the close to the write side. The owner task is woken up if
809 * it exists.
810 */
811static void cs_app_shutr_applet(struct conn_stream *cs)
812{
813 struct channel *ic = cs_ic(cs);
814
815 BUG_ON(!cs_appctx(cs));
816
817 si_rx_shut_blk(cs->si);
818 if (ic->flags & CF_SHUTR)
819 return;
820 ic->flags |= CF_SHUTR;
821 ic->rex = TICK_ETERNITY;
822
823 /* Note: on shutr, we don't call the applet */
824
825 if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
826 return;
827
828 if (cs_oc(cs)->flags & CF_SHUTW) {
829 cs_applet_release(cs);
830 cs->state = CS_ST_DIS;
831 __cs_strm(cs)->conn_exp = TICK_ETERNITY;
832 }
833 else if (cs->flags & CS_FL_NOHALF) {
834 /* we want to immediately forward this close to the write side */
835 return cs_app_shutw_applet(cs);
836 }
837}
838
839/*
840 * This function performs a shutdown-write on a conn-stream attached to an
841 * applet in a connected or init state (it does nothing for other states). It
842 * either shuts the write side or marks itself as closed. The buffer flags are
843 * updated to reflect the new state. It does also close everything if the SI
844 * was marked as being in error state. The owner task is woken up if it exists.
845 */
846static void cs_app_shutw_applet(struct conn_stream *cs)
847{
848 struct channel *ic = cs_ic(cs);
849 struct channel *oc = cs_oc(cs);
850
851 BUG_ON(!cs_appctx(cs));
852
853 oc->flags &= ~CF_SHUTW_NOW;
854 if (oc->flags & CF_SHUTW)
855 return;
856 oc->flags |= CF_SHUTW;
857 oc->wex = TICK_ETERNITY;
858 si_done_get(cs->si);
859
860 if (tick_isset(cs->hcto)) {
861 ic->rto = cs->hcto;
862 ic->rex = tick_add(now_ms, ic->rto);
863 }
864
865 /* on shutw we always wake the applet up */
866 appctx_wakeup(__cs_appctx(cs));
867
868 switch (cs->state) {
869 case CS_ST_RDY:
870 case CS_ST_EST:
871 /* we have to shut before closing, otherwise some short messages
872 * may never leave the system, especially when there are remaining
873 * unread data in the socket input buffer, or when nolinger is set.
874 * However, if CS_FL_NOLINGER is explicitly set, we know there is
875 * no risk so we close both sides immediately.
876 */
877 if (!(cs->endp->flags & CS_EP_ERROR) && !(cs->flags & CS_FL_NOLINGER) &&
878 !(ic->flags & (CF_SHUTR|CF_DONT_READ)))
879 return;
880
881 /* fall through */
882 case CS_ST_CON:
883 case CS_ST_CER:
884 case CS_ST_QUE:
885 case CS_ST_TAR:
886 /* Note that none of these states may happen with applets */
887 cs_applet_release(cs);
888 cs->state = CS_ST_DIS;
889 /* fall through */
890 default:
891 cs->flags &= ~CS_FL_NOLINGER;
892 si_rx_shut_blk(cs->si);
893 ic->flags |= CF_SHUTR;
894 ic->rex = TICK_ETERNITY;
895 __cs_strm(cs)->conn_exp = TICK_ETERNITY;
896 }
897}
898
899/* chk_rcv function for applets */
900static void cs_app_chk_rcv_applet(struct conn_stream *cs)
901{
902 struct channel *ic = cs_ic(cs);
903
904 BUG_ON(!cs_appctx(cs));
905
906 DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
907 __FUNCTION__,
908 cs, cs->state, ic->flags, cs_oc(cs)->flags);
909
910 if (!ic->pipe) {
911 /* (re)start reading */
912 appctx_wakeup(__cs_appctx(cs));
913 }
914}
915
916/* chk_snd function for applets */
917static void cs_app_chk_snd_applet(struct conn_stream *cs)
918{
919 struct channel *oc = cs_oc(cs);
920
921 BUG_ON(!cs_appctx(cs));
922
923 DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
924 __FUNCTION__,
925 cs, cs->state, cs_ic(cs)->flags, oc->flags);
926
927 if (unlikely(cs->state != CS_ST_EST || (oc->flags & CF_SHUTW)))
928 return;
929
930 /* we only wake the applet up if it was waiting for some data */
931
932 if (!(cs->si->flags & SI_FL_WAIT_DATA))
933 return;
934
935 if (!tick_isset(oc->wex))
936 oc->wex = tick_add_ifset(now_ms, oc->wto);
937
938 if (!channel_is_empty(oc)) {
939 /* (re)start sending */
940 appctx_wakeup(__cs_appctx(cs));
941 }
942}
Christopher Faulet13045f02022-04-01 14:23:38 +0200943
944
945/* This function is designed to be called from within the stream handler to
946 * update the input channel's expiration timer and the conn-stream's
947 * Rx flags based on the channel's flags. It needs to be called only once
948 * after the channel's flags have settled down, and before they are cleared,
949 * though it doesn't harm to call it as often as desired (it just slightly
950 * hurts performance). It must not be called from outside of the stream
951 * handler, as what it does will be used to compute the stream task's
952 * expiration.
953 */
954void cs_update_rx(struct conn_stream *cs)
955{
956 struct channel *ic = cs_ic(cs);
957
958 if (ic->flags & CF_SHUTR) {
959 si_rx_shut_blk(cs->si);
960 return;
961 }
962
963 /* Read not closed, update FD status and timeout for reads */
964 if (ic->flags & CF_DONT_READ)
965 si_rx_chan_blk(cs->si);
966 else
967 si_rx_chan_rdy(cs->si);
968
969 if (!channel_is_empty(ic) || !channel_may_recv(ic)) {
970 /* stop reading, imposed by channel's policy or contents */
971 si_rx_room_blk(cs->si);
972 }
973 else {
974 /* (re)start reading and update timeout. Note: we don't recompute the timeout
975 * every time we get here, otherwise it would risk never to expire. We only
976 * update it if is was not yet set. The stream socket handler will already
977 * have updated it if there has been a completed I/O.
978 */
979 si_rx_room_rdy(cs->si);
980 }
981 if (cs->si->flags & SI_FL_RXBLK_ANY & ~SI_FL_RX_WAIT_EP)
982 ic->rex = TICK_ETERNITY;
983 else if (!(ic->flags & CF_READ_NOEXP) && !tick_isset(ic->rex))
984 ic->rex = tick_add_ifset(now_ms, ic->rto);
985
986 cs_chk_rcv(cs);
987}
988
989/* This function is designed to be called from within the stream handler to
990 * update the output channel's expiration timer and the conn-stream's
991 * Tx flags based on the channel's flags. It needs to be called only once
992 * after the channel's flags have settled down, and before they are cleared,
993 * though it doesn't harm to call it as often as desired (it just slightly
994 * hurts performance). It must not be called from outside of the stream
995 * handler, as what it does will be used to compute the stream task's
996 * expiration.
997 */
998void cs_update_tx(struct conn_stream *cs)
999{
1000 struct channel *oc = cs_oc(cs);
1001 struct channel *ic = cs_ic(cs);
1002
1003 if (oc->flags & CF_SHUTW)
1004 return;
1005
1006 /* Write not closed, update FD status and timeout for writes */
1007 if (channel_is_empty(oc)) {
1008 /* stop writing */
1009 if (!(cs->si->flags & SI_FL_WAIT_DATA)) {
1010 if ((oc->flags & CF_SHUTW_NOW) == 0)
1011 cs->si->flags |= SI_FL_WAIT_DATA;
1012 oc->wex = TICK_ETERNITY;
1013 }
1014 return;
1015 }
1016
1017 /* (re)start writing and update timeout. Note: we don't recompute the timeout
1018 * every time we get here, otherwise it would risk never to expire. We only
1019 * update it if is was not yet set. The stream socket handler will already
1020 * have updated it if there has been a completed I/O.
1021 */
1022 cs->si->flags &= ~SI_FL_WAIT_DATA;
1023 if (!tick_isset(oc->wex)) {
1024 oc->wex = tick_add_ifset(now_ms, oc->wto);
1025 if (tick_isset(ic->rex) && !(cs->flags & CS_FL_INDEP_STR)) {
1026 /* Note: depending on the protocol, we don't know if we're waiting
1027 * for incoming data or not. So in order to prevent the socket from
1028 * expiring read timeouts during writes, we refresh the read timeout,
1029 * except if it was already infinite or if we have explicitly setup
1030 * independent streams.
1031 */
1032 ic->rex = tick_add_ifset(now_ms, ic->rto);
1033 }
1034 }
1035}