blob: ed0f7ca747a45bc2b6245ba20596571c65242394 [file] [log] [blame]
Willy Tarreaucff64112008-11-03 06:26:53 +01001/*
2 * Functions managing stream_interface structures
3 *
Willy Tarreauf873d752012-05-11 17:47:17 +02004 * Copyright 2000-2012 Willy Tarreau <w@1wt.eu>
Willy Tarreaucff64112008-11-03 06:26:53 +01005 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12
13#include <errno.h>
14#include <fcntl.h>
15#include <stdio.h>
16#include <stdlib.h>
17
18#include <sys/socket.h>
19#include <sys/stat.h>
20#include <sys/types.h>
21
Willy Tarreau4c7e4b72020-05-27 12:58:42 +020022#include <haproxy/api.h>
Willy Tarreaub2551052020-06-09 09:07:15 +020023#include <haproxy/applet.h>
Willy Tarreauf1d32c42020-06-04 21:07:02 +020024#include <haproxy/channel.h>
Willy Tarreau7ea393d2020-06-04 18:02:10 +020025#include <haproxy/connection.h>
Christopher Faulet0c6a64c2022-04-01 08:58:29 +020026#include <haproxy/conn_stream.h>
27#include <haproxy/cs_utils.h>
Willy Tarreau2741c8c2020-06-02 11:28:02 +020028#include <haproxy/dynbuf.h>
Willy Tarreaub7fc4c42021-10-06 18:56:42 +020029#include <haproxy/http_ana.h>
Willy Tarreau87735332020-06-04 09:08:41 +020030#include <haproxy/http_htx.h>
Willy Tarreaub2551052020-06-09 09:07:15 +020031#include <haproxy/pipe-t.h>
32#include <haproxy/pipe.h>
Christopher Fauletcda94ac2021-12-23 17:28:17 +010033#include <haproxy/pool.h>
Willy Tarreaua264d962020-06-04 22:29:18 +020034#include <haproxy/proxy.h>
Willy Tarreaudfd3de82020-06-04 23:46:14 +020035#include <haproxy/stream-t.h>
Willy Tarreau5e539c92020-06-04 20:45:39 +020036#include <haproxy/stream_interface.h>
Willy Tarreaub2551052020-06-09 09:07:15 +020037#include <haproxy/task.h>
Willy Tarreauc2f7c582020-06-02 18:15:32 +020038#include <haproxy/ticks.h>
Willy Tarreaub2551052020-06-09 09:07:15 +020039#include <haproxy/tools.h>
Willy Tarreaucff64112008-11-03 06:26:53 +010040
Willy Tarreaufd31e532012-07-23 18:24:25 +020041
Christopher Fauletcda94ac2021-12-23 17:28:17 +010042DECLARE_POOL(pool_head_streaminterface, "stream_interface", sizeof(struct stream_interface));
43
Willy Tarreau14bfe9a2018-12-19 15:19:27 +010044/* last read notification */
Christopher Fauletd715d362022-04-01 16:38:32 +020045static void cs_conn_read0(struct conn_stream *cs);
Willy Tarreau14bfe9a2018-12-19 15:19:27 +010046
47/* post-IO notification callback */
Christopher Faulet9029a722022-04-01 16:48:36 +020048static void cs_notify(struct conn_stream *cs);
Willy Tarreauf873d752012-05-11 17:47:17 +020049
Willy Tarreau74beec32012-10-03 00:41:04 +020050struct data_cb si_conn_cb = {
Olivier Houchard21df6cc2018-09-14 23:21:44 +020051 .wake = si_cs_process,
Willy Tarreau8e0bb0a2016-11-24 16:58:12 +010052 .name = "STRM",
Willy Tarreauc5788912012-08-24 18:12:41 +020053};
54
Christopher Fauletcda94ac2021-12-23 17:28:17 +010055
Christopher Faulet6059ba42022-04-01 16:34:53 +020056struct data_cb cs_data_applet_cb = {
57 .wake = cs_applet_process,
58 .name = "STRM",
59};
60
Christopher Fauletcda94ac2021-12-23 17:28:17 +010061struct stream_interface *si_new(struct conn_stream *cs)
62{
63 struct stream_interface *si;
64
65 si = pool_alloc(pool_head_streaminterface);
66 if (unlikely(!si))
67 return NULL;
68 si->flags = SI_FL_NONE;
Christopher Faulet014ac352022-01-06 08:13:46 +010069 if (si_init(si) < 0) {
Christopher Fauletcda94ac2021-12-23 17:28:17 +010070 pool_free(pool_head_streaminterface, si);
71 return NULL;
72 }
73 si->cs = cs;
74 return si;
75}
76
77void si_free(struct stream_interface *si)
78{
79 if (!si)
80 return;
81
Christopher Fauletcda94ac2021-12-23 17:28:17 +010082 pool_free(pool_head_streaminterface, si);
83}
84
Christopher Faulet13045f02022-04-01 14:23:38 +020085/* This function is the equivalent to cs_update() except that it's
Willy Tarreau615f28b2015-09-23 18:40:09 +020086 * designed to be called from outside the stream handlers, typically the lower
87 * layers (applets, connections) after I/O completion. After updating the stream
88 * interface and timeouts, it will try to forward what can be forwarded, then to
89 * wake the associated task up if an important event requires special handling.
Willy Tarreaud0f5bbc2018-11-14 11:10:26 +010090 * It may update SI_FL_WAIT_DATA and/or SI_FL_RXBLK_ROOM, that the callers are
Willy Tarreau0dfccb22018-10-25 13:55:20 +020091 * encouraged to watch to take appropriate action.
Christopher Faulet13045f02022-04-01 14:23:38 +020092 * It should not be called from within the stream itself, cs_update()
Willy Tarreau615f28b2015-09-23 18:40:09 +020093 * is designed for this.
94 */
Christopher Faulet9029a722022-04-01 16:48:36 +020095static void cs_notify(struct conn_stream *cs)
Willy Tarreau615f28b2015-09-23 18:40:09 +020096{
Christopher Faulet9029a722022-04-01 16:48:36 +020097 struct channel *ic = cs_ic(cs);
98 struct channel *oc = cs_oc(cs);
99 struct conn_stream *cso = cs_opposite(cs);
100 struct task *task = cs_strm_task(cs);
Willy Tarreau615f28b2015-09-23 18:40:09 +0200101
102 /* process consumer side */
103 if (channel_is_empty(oc)) {
Christopher Faulet9029a722022-04-01 16:48:36 +0200104 struct connection *conn = cs_conn(cs);
Olivier Houcharde9bed532017-11-16 17:49:25 +0100105
Willy Tarreau615f28b2015-09-23 18:40:09 +0200106 if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) &&
Christopher Faulet9029a722022-04-01 16:48:36 +0200107 (cs->state == CS_ST_EST) && (!conn || !(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS))))
108 cs_shutw(cs);
Willy Tarreau615f28b2015-09-23 18:40:09 +0200109 oc->wex = TICK_ETERNITY;
110 }
111
Willy Tarreau8cf9c8e2016-12-13 15:21:25 +0100112 /* indicate that we may be waiting for data from the output channel or
113 * we're about to close and can't expect more data if SHUTW_NOW is there.
114 */
Christopher Fauletb3e0de42018-10-11 13:54:13 +0200115 if (!(oc->flags & (CF_SHUTW|CF_SHUTW_NOW)))
Christopher Faulet9029a722022-04-01 16:48:36 +0200116 cs->si->flags |= SI_FL_WAIT_DATA;
Willy Tarreau8cf9c8e2016-12-13 15:21:25 +0100117 else if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW)
Christopher Faulet9029a722022-04-01 16:48:36 +0200118 cs->si->flags &= ~SI_FL_WAIT_DATA;
Willy Tarreau615f28b2015-09-23 18:40:09 +0200119
120 /* update OC timeouts and wake the other side up if it's waiting for room */
121 if (oc->flags & CF_WRITE_ACTIVITY) {
122 if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
123 !channel_is_empty(oc))
124 if (tick_isset(oc->wex))
125 oc->wex = tick_add_ifset(now_ms, oc->wto);
126
Christopher Faulet9029a722022-04-01 16:48:36 +0200127 if (!(cs->flags & CS_FL_INDEP_STR))
Willy Tarreau615f28b2015-09-23 18:40:09 +0200128 if (tick_isset(ic->rex))
129 ic->rex = tick_add_ifset(now_ms, ic->rto);
Willy Tarreauf26c26c2018-11-12 16:11:08 +0100130 }
Willy Tarreau615f28b2015-09-23 18:40:09 +0200131
Willy Tarreaub26a6f92018-11-14 17:10:36 +0100132 if (oc->flags & CF_DONT_READ)
Christopher Faulet9029a722022-04-01 16:48:36 +0200133 si_rx_chan_blk(cso->si);
Willy Tarreaub26a6f92018-11-14 17:10:36 +0100134 else
Christopher Faulet9029a722022-04-01 16:48:36 +0200135 si_rx_chan_rdy(cso->si);
Willy Tarreau615f28b2015-09-23 18:40:09 +0200136
137 /* Notify the other side when we've injected data into the IC that
138 * needs to be forwarded. We can do fast-forwarding as soon as there
139 * are output data, but we avoid doing this if some of the data are
140 * not yet scheduled for being forwarded, because it is very likely
141 * that it will be done again immediately afterwards once the following
Willy Tarreaud0f5bbc2018-11-14 11:10:26 +0100142 * data are parsed (eg: HTTP chunking). We only SI_FL_RXBLK_ROOM once
Willy Tarreau615f28b2015-09-23 18:40:09 +0200143 * we've emptied *some* of the output buffer, and not just when there
144 * is available room, because applets are often forced to stop before
145 * the buffer is full. We must not stop based on input data alone because
146 * an HTTP parser might need more data to complete the parsing.
147 */
148 if (!channel_is_empty(ic) &&
Christopher Faulet9029a722022-04-01 16:48:36 +0200149 (cso->si->flags & SI_FL_WAIT_DATA) &&
Willy Tarreau89b6a2b2018-11-18 15:46:10 +0100150 (!(ic->flags & CF_EXPECT_MORE) || c_full(ic) || ci_data(ic) == 0 || ic->pipe)) {
Willy Tarreau615f28b2015-09-23 18:40:09 +0200151 int new_len, last_len;
152
Willy Tarreau77e478c2018-06-19 07:03:14 +0200153 last_len = co_data(ic);
Willy Tarreau615f28b2015-09-23 18:40:09 +0200154 if (ic->pipe)
155 last_len += ic->pipe->data;
156
Christopher Faulet9029a722022-04-01 16:48:36 +0200157 cs_chk_snd(cso);
Willy Tarreau615f28b2015-09-23 18:40:09 +0200158
Willy Tarreau77e478c2018-06-19 07:03:14 +0200159 new_len = co_data(ic);
Willy Tarreau615f28b2015-09-23 18:40:09 +0200160 if (ic->pipe)
161 new_len += ic->pipe->data;
162
163 /* check if the consumer has freed some space either in the
164 * buffer or in the pipe.
165 */
Willy Tarreau47baeb82018-11-15 07:46:57 +0100166 if (new_len < last_len)
Christopher Faulet9029a722022-04-01 16:48:36 +0200167 si_rx_room_rdy(cs->si);
Willy Tarreau615f28b2015-09-23 18:40:09 +0200168 }
169
Willy Tarreaub26a6f92018-11-14 17:10:36 +0100170 if (!(ic->flags & CF_DONT_READ))
Christopher Faulet9029a722022-04-01 16:48:36 +0200171 si_rx_chan_rdy(cs->si);
Willy Tarreaub26a6f92018-11-14 17:10:36 +0100172
Christopher Faulet9029a722022-04-01 16:48:36 +0200173 cs_chk_rcv(cs);
174 cs_chk_rcv(cso);
Willy Tarreau47baeb82018-11-15 07:46:57 +0100175
Christopher Faulet9029a722022-04-01 16:48:36 +0200176 if (si_rx_blocked(cs->si)) {
Willy Tarreau615f28b2015-09-23 18:40:09 +0200177 ic->rex = TICK_ETERNITY;
178 }
Willy Tarreaub26a6f92018-11-14 17:10:36 +0100179 else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL)) == CF_READ_PARTIAL) {
Christopher Fauletda098e62022-03-31 17:44:45 +0200180 /* we must re-enable reading if cs_chk_snd() has freed some space */
Willy Tarreau615f28b2015-09-23 18:40:09 +0200181 if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex))
182 ic->rex = tick_add_ifset(now_ms, ic->rto);
183 }
184
185 /* wake the task up only when needed */
186 if (/* changes on the production side */
187 (ic->flags & (CF_READ_NULL|CF_READ_ERROR)) ||
Christopher Faulet9029a722022-04-01 16:48:36 +0200188 !cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST) ||
189 (cs->endp->flags & CS_EP_ERROR) ||
Willy Tarreau615f28b2015-09-23 18:40:09 +0200190 ((ic->flags & CF_READ_PARTIAL) &&
Christopher Faulet9029a722022-04-01 16:48:36 +0200191 ((ic->flags & CF_EOI) || !ic->to_forward || cso->state != CS_ST_EST)) ||
Willy Tarreau615f28b2015-09-23 18:40:09 +0200192
193 /* changes on the consumption side */
194 (oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
Willy Tarreauede3d882018-10-24 17:17:56 +0200195 ((oc->flags & CF_WRITE_ACTIVITY) &&
Willy Tarreau615f28b2015-09-23 18:40:09 +0200196 ((oc->flags & CF_SHUTW) ||
Willy Tarreau78f5ff82018-12-19 11:00:00 +0100197 (((oc->flags & CF_WAKE_WRITE) ||
198 !(oc->flags & (CF_AUTO_CLOSE|CF_SHUTW_NOW|CF_SHUTW))) &&
Christopher Faulet9029a722022-04-01 16:48:36 +0200199 (cso->state != CS_ST_EST ||
Willy Tarreau615f28b2015-09-23 18:40:09 +0200200 (channel_is_empty(oc) && !oc->to_forward)))))) {
Christopher Fauletd7607de2019-01-03 16:24:54 +0100201 task_wakeup(task, TASK_WOKEN_IO);
202 }
203 else {
204 /* Update expiration date for the task and requeue it */
205 task->expire = tick_first((tick_is_expired(task->expire, now_ms) ? 0 : task->expire),
206 tick_first(tick_first(ic->rex, ic->wex),
207 tick_first(oc->rex, oc->wex)));
Willy Tarreau45bcb372019-08-01 18:51:38 +0200208
209 task->expire = tick_first(task->expire, ic->analyse_exp);
210 task->expire = tick_first(task->expire, oc->analyse_exp);
Christopher Faulet9029a722022-04-01 16:48:36 +0200211 task->expire = tick_first(task->expire, __cs_strm(cs)->conn_exp);
Willy Tarreau45bcb372019-08-01 18:51:38 +0200212
Christopher Fauletd7607de2019-01-03 16:24:54 +0100213 task_queue(task);
Willy Tarreau615f28b2015-09-23 18:40:09 +0200214 }
215 if (ic->flags & CF_READ_ACTIVITY)
216 ic->flags &= ~CF_READ_DONTWAIT;
Willy Tarreau615f28b2015-09-23 18:40:09 +0200217}
218
Olivier Houchardc2aa7112018-09-11 18:27:21 +0200219/* Called by I/O handlers after completion.. It propagates
Willy Tarreau651e1822015-09-23 20:06:13 +0200220 * connection flags to the stream interface, updates the stream (which may or
221 * may not take this opportunity to try to forward data), then update the
222 * connection's polling based on the channels and stream interface's final
223 * states. The function always returns 0.
Willy Tarreau100c4672012-08-20 12:06:26 +0200224 */
Christopher Faulet9b7a9b42022-04-01 13:48:39 +0200225int si_cs_process(struct conn_stream *cs)
Willy Tarreaufd31e532012-07-23 18:24:25 +0200226{
Christopher Faulet693b23b2022-02-28 09:09:05 +0100227 struct connection *conn = __cs_conn(cs);
Christopher Fauletf835dea2021-12-21 14:35:17 +0100228 struct stream_interface *si = cs_si(cs);
Willy Tarreauafc8a222014-11-28 15:46:27 +0100229 struct channel *ic = si_ic(si);
230 struct channel *oc = si_oc(si);
Willy Tarreaufd31e532012-07-23 18:24:25 +0200231
Christopher Faulet897d6122021-12-17 17:28:35 +0100232 BUG_ON(!conn);
233
Olivier Houchardc7ffa912018-08-28 19:37:41 +0200234 /* If we have data to send, try it now */
Christopher Faulet2f35e7b2022-03-31 11:09:28 +0200235 if (!channel_is_empty(oc) && !(si->cs->wait_event.events & SUB_RETRY_SEND))
Willy Tarreau908d26f2018-10-25 14:02:47 +0200236 si_cs_send(cs);
237
Christopher Fauletaf642df2022-03-30 10:06:11 +0200238 /* First step, report to the conn-stream what was detected at the
Willy Tarreau651e1822015-09-23 20:06:13 +0200239 * connection layer : errors and connection establishment.
Christopher Faulet6cd56d52022-03-30 10:47:32 +0200240 * Only add CS_EP_ERROR if we're connected, or we're attempting to
Olivier Houchardc31e2cb2019-06-24 16:08:08 +0200241 * connect, we may get there because we got woken up, but only run
242 * after process_stream() noticed there were an error, and decided
243 * to retry to connect, the connection may still have CO_FL_ERROR,
Christopher Faulet6cd56d52022-03-30 10:47:32 +0200244 * and we don't want to add CS_EP_ERROR back
Christopher Faulet36b536d2019-11-20 11:56:33 +0100245 *
246 * Note: This test is only required because si_cs_process is also the SI
247 * wake callback. Otherwise si_cs_recv()/si_cs_send() already take
248 * care of it.
Willy Tarreau651e1822015-09-23 20:06:13 +0200249 */
Willy Tarreaud1480cc2022-03-17 16:19:09 +0100250
Christopher Faulet62e75742022-03-31 09:16:34 +0200251 if (si->cs->state >= CS_ST_CON) {
Christopher Faulet6cd56d52022-03-30 10:47:32 +0200252 if (si_is_conn_error(si))
253 cs->endp->flags |= CS_EP_ERROR;
Willy Tarreaud1480cc2022-03-17 16:19:09 +0100254 }
Willy Tarreau3c55ec22012-07-23 19:19:51 +0200255
Olivier Houchardccaa7de2017-10-02 11:51:03 +0200256 /* If we had early data, and the handshake ended, then
257 * we can remove the flag, and attempt to wake the task up,
258 * in the event there's an analyser waiting for the end of
259 * the handshake.
260 */
Willy Tarreau911db9b2020-01-23 16:27:54 +0100261 if (!(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS)) &&
Christopher Faulete9e48202022-03-22 18:13:29 +0100262 (cs->endp->flags & CS_EP_WAIT_FOR_HS)) {
263 cs->endp->flags &= ~CS_EP_WAIT_FOR_HS;
Olivier Houchardccaa7de2017-10-02 11:51:03 +0200264 task_wakeup(si_task(si), TASK_WOKEN_MSG);
265 }
266
Christopher Faulet62e75742022-03-31 09:16:34 +0200267 if (!cs_state_in(si->cs->state, CS_SB_EST|CS_SB_DIS|CS_SB_CLO) &&
Willy Tarreau911db9b2020-01-23 16:27:54 +0100268 (conn->flags & CO_FL_WAIT_XPRT) == 0) {
Christopher Fauletae024ce2022-03-29 19:02:31 +0200269 __cs_strm(cs)->conn_exp = TICK_ETERNITY;
Willy Tarreauafc8a222014-11-28 15:46:27 +0100270 oc->flags |= CF_WRITE_NULL;
Christopher Faulet62e75742022-03-31 09:16:34 +0200271 if (si->cs->state == CS_ST_CON)
272 si->cs->state = CS_ST_RDY;
Willy Tarreau8f8c92f2012-07-23 19:45:44 +0200273 }
274
Christopher Faulet89e34c22021-01-21 16:22:01 +0100275 /* Report EOS on the channel if it was reached from the mux point of
276 * view.
277 *
278 * Note: This test is only required because si_cs_process is also the SI
279 * wake callback. Otherwise si_cs_recv()/si_cs_send() already take
280 * care of it.
281 */
Christopher Fauletb041b232022-03-24 10:27:02 +0100282 if (cs->endp->flags & CS_EP_EOS && !(ic->flags & CF_SHUTR)) {
Christopher Faulet89e34c22021-01-21 16:22:01 +0100283 /* we received a shutdown */
284 ic->flags |= CF_READ_NULL;
285 if (ic->flags & CF_AUTO_CLOSE)
286 channel_shutw_now(ic);
Christopher Fauletd715d362022-04-01 16:38:32 +0200287 cs_conn_read0(cs);
Christopher Faulet89e34c22021-01-21 16:22:01 +0100288 }
289
Christopher Faulet297d3e22019-03-22 14:16:14 +0100290 /* Report EOI on the channel if it was reached from the mux point of
Christopher Faulet36b536d2019-11-20 11:56:33 +0100291 * view.
292 *
293 * Note: This test is only required because si_cs_process is also the SI
294 * wake callback. Otherwise si_cs_recv()/si_cs_send() already take
295 * care of it.
296 */
Christopher Fauletb041b232022-03-24 10:27:02 +0100297 if ((cs->endp->flags & CS_EP_EOI) && !(ic->flags & CF_EOI))
Christopher Faulet8e9e3ef2019-05-17 09:14:10 +0200298 ic->flags |= (CF_EOI|CF_READ_PARTIAL);
Christopher Faulet203b2b02019-03-08 09:23:46 +0100299
Willy Tarreau651e1822015-09-23 20:06:13 +0200300 /* Second step : update the stream-int and channels, try to forward any
301 * pending data, then possibly wake the stream up based on the new
302 * stream-int status.
Willy Tarreau44b5dc62012-08-24 12:12:53 +0200303 */
Christopher Faulet9029a722022-04-01 16:48:36 +0200304 cs_notify(cs);
Willy Tarreaua64c7032019-08-01 14:17:02 +0200305 stream_release_buffers(si_strm(si));
Willy Tarreau2396c1c2012-10-03 21:12:16 +0200306 return 0;
Willy Tarreaufd31e532012-07-23 18:24:25 +0200307}
Willy Tarreau2c6be842012-07-06 17:12:34 +0200308
Willy Tarreau5368d802012-08-21 18:22:06 +0200309/*
310 * This function is called to send buffer data to a stream socket.
Olivier Houchard9aaf7782017-09-13 18:30:23 +0200311 * It calls the mux layer's snd_buf function. It relies on the
Godbach4f489902013-12-04 17:24:06 +0800312 * caller to commit polling changes. The caller should check conn->flags
313 * for errors.
Willy Tarreau5368d802012-08-21 18:22:06 +0200314 */
Christopher Faulet9b7a9b42022-04-01 13:48:39 +0200315int si_cs_send(struct conn_stream *cs)
Willy Tarreau5368d802012-08-21 18:22:06 +0200316{
Christopher Faulet693b23b2022-02-28 09:09:05 +0100317 struct connection *conn = __cs_conn(cs);
Christopher Fauletf835dea2021-12-21 14:35:17 +0100318 struct stream_interface *si = cs_si(cs);
Christopher Faulete05bf9e2022-03-29 15:23:40 +0200319 struct stream *s = si_strm(si);
Willy Tarreauafc8a222014-11-28 15:46:27 +0100320 struct channel *oc = si_oc(si);
Willy Tarreau5368d802012-08-21 18:22:06 +0200321 int ret;
Olivier Houchard910b2bc2018-07-17 18:49:38 +0200322 int did_send = 0;
323
Christopher Fauletb041b232022-03-24 10:27:02 +0100324 if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(si)) {
Olivier Houchardc31e2cb2019-06-24 16:08:08 +0200325 /* We're probably there because the tasklet was woken up,
326 * but process_stream() ran before, detected there were an
Christopher Faulet62e75742022-03-31 09:16:34 +0200327 * error and put the si back to CS_ST_TAR. There's still
Olivier Houchardc31e2cb2019-06-24 16:08:08 +0200328 * CO_FL_ERROR on the connection but we don't want to add
Christopher Faulet6cd56d52022-03-30 10:47:32 +0200329 * CS_EP_ERROR back, so give up
Olivier Houchardc31e2cb2019-06-24 16:08:08 +0200330 */
Christopher Faulet62e75742022-03-31 09:16:34 +0200331 if (si->cs->state < CS_ST_CON)
Olivier Houchardc31e2cb2019-06-24 16:08:08 +0200332 return 0;
Christopher Faulet6cd56d52022-03-30 10:47:32 +0200333 cs->endp->flags |= CS_EP_ERROR;
Olivier Houchardc2aa7112018-09-11 18:27:21 +0200334 return 1;
Willy Tarreaubddf7fc2018-12-19 17:17:10 +0100335 }
Olivier Houchard910b2bc2018-07-17 18:49:38 +0200336
Christopher Faulet328ed222019-09-23 15:57:29 +0200337 /* We're already waiting to be able to send, give up */
Christopher Faulet2f35e7b2022-03-31 11:09:28 +0200338 if (si->cs->wait_event.events & SUB_RETRY_SEND)
Christopher Faulet328ed222019-09-23 15:57:29 +0200339 return 0;
340
Olivier Houchard910b2bc2018-07-17 18:49:38 +0200341 /* we might have been called just after an asynchronous shutw */
Willy Tarreauf22758d2020-01-23 18:25:23 +0100342 if (oc->flags & CF_SHUTW)
Olivier Houchardc2aa7112018-09-11 18:27:21 +0200343 return 1;
Willy Tarreau5368d802012-08-21 18:22:06 +0200344
Christopher Faulete96993b2020-07-30 09:26:46 +0200345 /* we must wait because the mux is not installed yet */
346 if (!conn->mux)
347 return 0;
348
Olivier Houchard9aaf7782017-09-13 18:30:23 +0200349 if (oc->pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) {
350 ret = conn->mux->snd_pipe(cs, oc->pipe);
Christopher Faulet86162db2019-07-05 11:49:11 +0200351 if (ret > 0)
Olivier Houchard910b2bc2018-07-17 18:49:38 +0200352 did_send = 1;
Willy Tarreau5368d802012-08-21 18:22:06 +0200353
Willy Tarreauafc8a222014-11-28 15:46:27 +0100354 if (!oc->pipe->data) {
355 put_pipe(oc->pipe);
356 oc->pipe = NULL;
Willy Tarreau5368d802012-08-21 18:22:06 +0200357 }
358
Christopher Faulet3f76f4c2018-11-20 10:21:08 +0100359 if (oc->pipe)
360 goto end;
Willy Tarreau5368d802012-08-21 18:22:06 +0200361 }
362
363 /* At this point, the pipe is empty, but we may still have data pending
364 * in the normal buffer.
365 */
Christopher Faulet55dec0d2018-11-20 10:30:02 +0100366 if (co_data(oc)) {
367 /* when we're here, we already know that there is no spliced
368 * data left, and that there are sendable buffered data.
369 */
Willy Tarreau5368d802012-08-21 18:22:06 +0200370
Willy Tarreau5368d802012-08-21 18:22:06 +0200371 /* check if we want to inform the kernel that we're interested in
372 * sending more data after this call. We want this if :
373 * - we're about to close after this last send and want to merge
374 * the ongoing FIN with the last segment.
375 * - we know we can't send everything at once and must get back
376 * here because of unaligned data
377 * - there is still a finite amount of data to forward
378 * The test is arranged so that the most common case does only 2
379 * tests.
380 */
Willy Tarreau1049b1f2014-02-02 01:51:17 +0100381 unsigned int send_flag = 0;
Willy Tarreau5368d802012-08-21 18:22:06 +0200382
Willy Tarreauafc8a222014-11-28 15:46:27 +0100383 if ((!(oc->flags & (CF_NEVER_WAIT|CF_SEND_DONTWAIT)) &&
384 ((oc->to_forward && oc->to_forward != CHN_INFINITE_FORWARD) ||
Willy Tarreau8945bb62020-06-19 17:07:06 +0200385 (oc->flags & CF_EXPECT_MORE) ||
Christopher Faulet9e3dc832020-07-22 16:28:44 +0200386 (IS_HTX_STRM(si_strm(si)) &&
387 (!(oc->flags & (CF_EOI|CF_SHUTR)) && htx_expect_more(htxbuf(&oc->buf)))))) ||
Willy Tarreauecd2e152017-11-07 15:07:25 +0100388 ((oc->flags & CF_ISRESP) &&
389 ((oc->flags & (CF_AUTO_CLOSE|CF_SHUTW_NOW)) == (CF_AUTO_CLOSE|CF_SHUTW_NOW))))
Willy Tarreau1049b1f2014-02-02 01:51:17 +0100390 send_flag |= CO_SFL_MSG_MORE;
Willy Tarreau5368d802012-08-21 18:22:06 +0200391
Willy Tarreauafc8a222014-11-28 15:46:27 +0100392 if (oc->flags & CF_STREAMER)
Willy Tarreau7bed9452014-02-02 02:00:24 +0100393 send_flag |= CO_SFL_STREAMER;
394
Christopher Faulete05bf9e2022-03-29 15:23:40 +0200395 if (s->txn && s->txn->flags & TX_L7_RETRY && !b_data(&s->txn->l7_buffer)) {
Christopher Faulet9f5382e2021-05-21 13:46:14 +0200396 /* If we want to be able to do L7 retries, copy
397 * the data we're about to send, so that we are able
398 * to resend them if needed
399 */
400 /* Try to allocate a buffer if we had none.
401 * If it fails, the next test will just
402 * disable the l7 retries by setting
403 * l7_conn_retries to 0.
404 */
Christopher Faulete05bf9e2022-03-29 15:23:40 +0200405 if (s->txn->req.msg_state != HTTP_MSG_DONE)
406 s->txn->flags &= ~TX_L7_RETRY;
Christopher Faulet9f5382e2021-05-21 13:46:14 +0200407 else {
Christopher Faulete05bf9e2022-03-29 15:23:40 +0200408 if (b_alloc(&s->txn->l7_buffer) == NULL)
409 s->txn->flags &= ~TX_L7_RETRY;
Christopher Faulet9f5382e2021-05-21 13:46:14 +0200410 else {
Christopher Faulete05bf9e2022-03-29 15:23:40 +0200411 memcpy(b_orig(&s->txn->l7_buffer),
Christopher Faulet9f5382e2021-05-21 13:46:14 +0200412 b_orig(&oc->buf),
413 b_size(&oc->buf));
Christopher Faulete05bf9e2022-03-29 15:23:40 +0200414 s->txn->l7_buffer.head = co_data(oc);
415 b_add(&s->txn->l7_buffer, co_data(oc));
Christopher Faulet9f5382e2021-05-21 13:46:14 +0200416 }
417
418 }
419 }
420
Christopher Faulet897d6122021-12-17 17:28:35 +0100421 ret = conn->mux->snd_buf(cs, &oc->buf, co_data(oc), send_flag);
Godbache68e02d2013-10-11 15:48:29 +0800422 if (ret > 0) {
Olivier Houchard910b2bc2018-07-17 18:49:38 +0200423 did_send = 1;
Willy Tarreau84240042022-02-28 16:51:23 +0100424 c_rew(oc, ret);
Willy Tarreaudeccd112018-06-14 18:38:55 +0200425 c_realign_if_empty(oc);
426
427 if (!co_data(oc)) {
Godbache68e02d2013-10-11 15:48:29 +0800428 /* Always clear both flags once everything has been sent, they're one-shot */
Willy Tarreauafc8a222014-11-28 15:46:27 +0100429 oc->flags &= ~(CF_EXPECT_MORE | CF_SEND_DONTWAIT);
Godbache68e02d2013-10-11 15:48:29 +0800430 }
Godbache68e02d2013-10-11 15:48:29 +0800431 /* if some data remain in the buffer, it's only because the
432 * system buffers are full, we will try next time.
433 */
Willy Tarreau5368d802012-08-21 18:22:06 +0200434 }
Godbache68e02d2013-10-11 15:48:29 +0800435 }
Christopher Faulet55dec0d2018-11-20 10:30:02 +0100436
Willy Tarreauf6975aa2018-11-15 14:33:05 +0100437 end:
Christopher Faulet86162db2019-07-05 11:49:11 +0200438 if (did_send) {
439 oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA;
Christopher Faulet62e75742022-03-31 09:16:34 +0200440 if (si->cs->state == CS_ST_CON)
441 si->cs->state = CS_ST_RDY;
Christopher Faulet037b3eb2019-07-05 13:44:29 +0200442
443 si_rx_room_rdy(si_opposite(si));
Christopher Faulet86162db2019-07-05 11:49:11 +0200444 }
445
Christopher Fauletb041b232022-03-24 10:27:02 +0100446 if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING)) {
Christopher Faulet6cd56d52022-03-30 10:47:32 +0200447 cs->endp->flags |= CS_EP_ERROR;
Christopher Faulet86162db2019-07-05 11:49:11 +0200448 return 1;
449 }
450
Olivier Houchard910b2bc2018-07-17 18:49:38 +0200451 /* We couldn't send all of our data, let the mux know we'd like to send more */
Willy Tarreau691fe392018-11-12 18:48:52 +0100452 if (!channel_is_empty(oc))
Christopher Faulet2f35e7b2022-03-31 11:09:28 +0200453 conn->mux->subscribe(cs, SUB_RETRY_SEND, &si->cs->wait_event);
Olivier Houchardf6535282018-08-31 17:29:12 +0200454 return did_send;
Willy Tarreau5368d802012-08-21 18:22:06 +0200455}
456
Christopher Faulet2f35e7b2022-03-31 11:09:28 +0200457/* This is the ->process() function for any conn-stream's wait_event task.
Willy Tarreau8ccd2082018-11-07 07:47:52 +0100458 * It's assigned during the stream-interface's initialization, for any type of
459 * stream interface. Thus it is always safe to perform a tasklet_wakeup() on a
460 * stream interface, as the presence of the CS is checked there.
461 */
Willy Tarreau144f84a2021-03-02 16:09:26 +0100462struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned int state)
Olivier Houchard91894cb2018-08-02 18:06:28 +0200463{
Olivier Houchard8f0b4c62018-08-02 18:21:38 +0200464 struct stream_interface *si = ctx;
Christopher Faulet13a35e52021-12-20 15:34:16 +0100465 struct conn_stream *cs = si->cs;
Olivier Houchardf6535282018-08-31 17:29:12 +0200466 int ret = 0;
Olivier Houcharda6ff0352018-08-21 15:59:43 +0200467
Christopher Faulet0256da12021-12-15 09:50:17 +0100468 if (!cs_conn(cs))
Willy Tarreau74163142021-03-13 11:30:19 +0100469 return t;
Willy Tarreau8ccd2082018-11-07 07:47:52 +0100470
Christopher Faulet2f35e7b2022-03-31 11:09:28 +0200471 if (!(cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si)))
Olivier Houchardf6535282018-08-31 17:29:12 +0200472 ret = si_cs_send(cs);
Christopher Faulet2f35e7b2022-03-31 11:09:28 +0200473 if (!(cs->wait_event.events & SUB_RETRY_RECV))
Olivier Houchardf6535282018-08-31 17:29:12 +0200474 ret |= si_cs_recv(cs);
475 if (ret != 0)
Olivier Houchardc2aa7112018-09-11 18:27:21 +0200476 si_cs_process(cs);
Olivier Houchardf6535282018-08-31 17:29:12 +0200477
Willy Tarreaua64c7032019-08-01 14:17:02 +0200478 stream_release_buffers(si_strm(si));
Willy Tarreau74163142021-03-13 11:30:19 +0100479 return t;
Olivier Houchard91894cb2018-08-02 18:06:28 +0200480}
481
Christopher Faulet9936dc62022-02-28 09:21:58 +0100482/* This tries to perform a synchronous receive on the stream interface to
483 * try to collect last arrived data. In practice it's only implemented on
484 * conn_streams. Returns 0 if nothing was done, non-zero if new data or a
485 * shutdown were collected. This may result on some delayed receive calls
486 * to be programmed and performed later, though it doesn't provide any
487 * such guarantee.
488 */
489int si_sync_recv(struct stream_interface *si)
490{
Christopher Faulet62e75742022-03-31 09:16:34 +0200491 if (!cs_state_in(si->cs->state, CS_SB_RDY|CS_SB_EST))
Christopher Faulet9936dc62022-02-28 09:21:58 +0100492 return 0;
493
494 if (!cs_conn_mux(si->cs))
495 return 0; // only conn_streams are supported
496
Christopher Faulet2f35e7b2022-03-31 11:09:28 +0200497 if (si->cs->wait_event.events & SUB_RETRY_RECV)
Christopher Faulet9936dc62022-02-28 09:21:58 +0100498 return 0; // already subscribed
499
500 if (!si_rx_endp_ready(si) || si_rx_blocked(si))
501 return 0; // already failed
502
503 return si_cs_recv(si->cs);
504}
505
Willy Tarreau3b285d72019-06-06 08:20:17 +0200506/* perform a synchronous send() for the stream interface. The CF_WRITE_NULL and
507 * CF_WRITE_PARTIAL flags are cleared prior to the attempt, and will possibly
508 * be updated in case of success.
509 */
510void si_sync_send(struct stream_interface *si)
511{
512 struct channel *oc = si_oc(si);
Willy Tarreau3b285d72019-06-06 08:20:17 +0200513
514 oc->flags &= ~(CF_WRITE_NULL|CF_WRITE_PARTIAL);
515
516 if (oc->flags & CF_SHUTW)
517 return;
518
519 if (channel_is_empty(oc))
520 return;
521
Christopher Faulet62e75742022-03-31 09:16:34 +0200522 if (!cs_state_in(si->cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
Willy Tarreau3b285d72019-06-06 08:20:17 +0200523 return;
524
Christopher Faulet13a35e52021-12-20 15:34:16 +0100525 if (!cs_conn_mux(si->cs))
Willy Tarreau3b285d72019-06-06 08:20:17 +0200526 return;
527
Christopher Faulet13a35e52021-12-20 15:34:16 +0100528 si_cs_send(si->cs);
Willy Tarreau3b285d72019-06-06 08:20:17 +0200529}
530
Willy Tarreau8b3d7df2013-09-29 14:51:58 +0200531/*
Willy Tarreauce323de2012-08-20 21:41:06 +0200532 * This is the callback which is called by the connection layer to receive data
Olivier Houchard9aaf7782017-09-13 18:30:23 +0200533 * into the buffer from the connection. It iterates over the mux layer's
Willy Tarreauf7bc57c2012-10-03 00:19:48 +0200534 * rcv_buf function.
Willy Tarreauce323de2012-08-20 21:41:06 +0200535 */
Christopher Faulet9b7a9b42022-04-01 13:48:39 +0200536int si_cs_recv(struct conn_stream *cs)
Willy Tarreauce323de2012-08-20 21:41:06 +0200537{
Christopher Faulet693b23b2022-02-28 09:09:05 +0100538 struct connection *conn = __cs_conn(cs);
Christopher Fauletf835dea2021-12-21 14:35:17 +0100539 struct stream_interface *si = cs_si(cs);
Willy Tarreauafc8a222014-11-28 15:46:27 +0100540 struct channel *ic = si_ic(si);
Olivier Houchardf6535282018-08-31 17:29:12 +0200541 int ret, max, cur_read = 0;
Willy Tarreauce323de2012-08-20 21:41:06 +0200542 int read_poll = MAX_READ_POLL_LOOPS;
Christopher Fauletc6618d62018-10-11 15:56:04 +0200543 int flags = 0;
Willy Tarreauce323de2012-08-20 21:41:06 +0200544
Christopher Faulet04400bc2019-10-25 10:21:01 +0200545 /* If not established yet, do nothing. */
Christopher Faulet62e75742022-03-31 09:16:34 +0200546 if (cs->state != CS_ST_EST)
Christopher Faulet04400bc2019-10-25 10:21:01 +0200547 return 0;
548
Olivier Houchardf6535282018-08-31 17:29:12 +0200549 /* If another call to si_cs_recv() failed, and we subscribed to
550 * recv events already, give up now.
551 */
Christopher Faulet2f35e7b2022-03-31 11:09:28 +0200552 if (si->cs->wait_event.events & SUB_RETRY_RECV)
Olivier Houchardf6535282018-08-31 17:29:12 +0200553 return 0;
Willy Tarreauce323de2012-08-20 21:41:06 +0200554
Willy Tarreauce323de2012-08-20 21:41:06 +0200555 /* maybe we were called immediately after an asynchronous shutr */
Willy Tarreauafc8a222014-11-28 15:46:27 +0100556 if (ic->flags & CF_SHUTR)
Olivier Houchardc2aa7112018-09-11 18:27:21 +0200557 return 1;
Willy Tarreauce323de2012-08-20 21:41:06 +0200558
Christopher Faulete96993b2020-07-30 09:26:46 +0200559 /* we must wait because the mux is not installed yet */
560 if (!conn->mux)
561 return 0;
562
Willy Tarreau54e917c2017-08-30 07:35:35 +0200563 /* stop here if we reached the end of data */
Christopher Fauletb041b232022-03-24 10:27:02 +0100564 if (cs->endp->flags & CS_EP_EOS)
Christopher Faulet36b536d2019-11-20 11:56:33 +0100565 goto end_recv;
Willy Tarreau54e917c2017-08-30 07:35:35 +0200566
Christopher Fauletf061e422018-12-07 14:51:20 +0100567 /* stop immediately on errors. Note that we DON'T want to stop on
568 * POLL_ERR, as the poller might report a write error while there
569 * are still data available in the recv buffer. This typically
570 * happens when we send too large a request to a backend server
571 * which rejects it before reading it all.
572 */
Christopher Fauletb041b232022-03-24 10:27:02 +0100573 if (!(cs->endp->flags & CS_EP_RCV_MORE)) {
Christopher Fauletf061e422018-12-07 14:51:20 +0100574 if (!conn_xprt_ready(conn))
575 return 0;
Christopher Fauletb041b232022-03-24 10:27:02 +0100576 if (cs->endp->flags & CS_EP_ERROR)
Christopher Faulet36b536d2019-11-20 11:56:33 +0100577 goto end_recv;
Christopher Fauletf061e422018-12-07 14:51:20 +0100578 }
Willy Tarreauf26c26c2018-11-12 16:11:08 +0100579
Willy Tarreau7ab99a32018-12-18 09:15:43 +0100580 /* prepare to detect if the mux needs more room */
Christopher Fauletb041b232022-03-24 10:27:02 +0100581 cs->endp->flags &= ~CS_EP_WANT_ROOM;
Willy Tarreau7ab99a32018-12-18 09:15:43 +0100582
Willy Tarreau77e478c2018-06-19 07:03:14 +0200583 if ((ic->flags & (CF_STREAMER | CF_STREAMER_FAST)) && !co_data(ic) &&
Willy Tarreau7e312732014-02-12 16:35:14 +0100584 global.tune.idle_timer &&
Willy Tarreauafc8a222014-11-28 15:46:27 +0100585 (unsigned short)(now_ms - ic->last_read) >= global.tune.idle_timer) {
Willy Tarreauc5890e62014-02-09 17:47:01 +0100586 /* The buffer was empty and nothing was transferred for more
587 * than one second. This was caused by a pause and not by
588 * congestion. Reset any streaming mode to reduce latency.
589 */
Willy Tarreauafc8a222014-11-28 15:46:27 +0100590 ic->xfer_small = 0;
591 ic->xfer_large = 0;
592 ic->flags &= ~(CF_STREAMER | CF_STREAMER_FAST);
Willy Tarreauc5890e62014-02-09 17:47:01 +0100593 }
594
Willy Tarreau96199b12012-08-24 00:46:52 +0200595 /* First, let's see if we may splice data across the channel without
596 * using a buffer.
597 */
Christopher Faulete9e48202022-03-22 18:13:29 +0100598 if (cs->endp->flags & CS_EP_MAY_SPLICE &&
Willy Tarreauafc8a222014-11-28 15:46:27 +0100599 (ic->pipe || ic->to_forward >= MIN_SPLICE_FORWARD) &&
600 ic->flags & CF_KERN_SPLICING) {
Willy Tarreaud760eec2018-07-10 09:50:25 +0200601 if (c_data(ic)) {
Willy Tarreau96199b12012-08-24 00:46:52 +0200602 /* We're embarrassed, there are already data pending in
603 * the buffer and we don't want to have them at two
604 * locations at a time. Let's indicate we need some
605 * place and ask the consumer to hurry.
606 */
Christopher Fauletc6618d62018-10-11 15:56:04 +0200607 flags |= CO_RFL_BUF_FLUSH;
Willy Tarreau96199b12012-08-24 00:46:52 +0200608 goto abort_splice;
609 }
Willy Tarreauce323de2012-08-20 21:41:06 +0200610
Willy Tarreauafc8a222014-11-28 15:46:27 +0100611 if (unlikely(ic->pipe == NULL)) {
612 if (pipes_used >= global.maxpipes || !(ic->pipe = get_pipe())) {
613 ic->flags &= ~CF_KERN_SPLICING;
Willy Tarreau96199b12012-08-24 00:46:52 +0200614 goto abort_splice;
615 }
616 }
617
Olivier Houchard9aaf7782017-09-13 18:30:23 +0200618 ret = conn->mux->rcv_pipe(cs, ic->pipe, ic->to_forward);
Willy Tarreau96199b12012-08-24 00:46:52 +0200619 if (ret < 0) {
620 /* splice not supported on this end, let's disable it */
Willy Tarreauafc8a222014-11-28 15:46:27 +0100621 ic->flags &= ~CF_KERN_SPLICING;
Willy Tarreau96199b12012-08-24 00:46:52 +0200622 goto abort_splice;
623 }
Willy Tarreauce323de2012-08-20 21:41:06 +0200624
Willy Tarreau96199b12012-08-24 00:46:52 +0200625 if (ret > 0) {
Willy Tarreauafc8a222014-11-28 15:46:27 +0100626 if (ic->to_forward != CHN_INFINITE_FORWARD)
627 ic->to_forward -= ret;
628 ic->total += ret;
Willy Tarreau96199b12012-08-24 00:46:52 +0200629 cur_read += ret;
Willy Tarreauafc8a222014-11-28 15:46:27 +0100630 ic->flags |= CF_READ_PARTIAL;
Willy Tarreauce323de2012-08-20 21:41:06 +0200631 }
Willy Tarreau96199b12012-08-24 00:46:52 +0200632
Christopher Fauletb041b232022-03-24 10:27:02 +0100633 if (cs->endp->flags & (CS_EP_EOS|CS_EP_ERROR))
Christopher Faulet36b536d2019-11-20 11:56:33 +0100634 goto end_recv;
Willy Tarreau96199b12012-08-24 00:46:52 +0200635
Willy Tarreau61d39a02013-07-18 21:49:32 +0200636 if (conn->flags & CO_FL_WAIT_ROOM) {
637 /* the pipe is full or we have read enough data that it
638 * could soon be full. Let's stop before needing to poll.
639 */
Willy Tarreaudb398432018-11-15 11:08:52 +0100640 si_rx_room_blk(si);
Willy Tarreauffb12052018-11-15 16:06:02 +0100641 goto done_recv;
Willy Tarreau61d39a02013-07-18 21:49:32 +0200642 }
Willy Tarreau56a77e52012-09-02 18:34:44 +0200643
Willy Tarreauce323de2012-08-20 21:41:06 +0200644 /* splice not possible (anymore), let's go on on standard copy */
645 }
Willy Tarreau96199b12012-08-24 00:46:52 +0200646
647 abort_splice:
Willy Tarreauafc8a222014-11-28 15:46:27 +0100648 if (ic->pipe && unlikely(!ic->pipe->data)) {
649 put_pipe(ic->pipe);
650 ic->pipe = NULL;
Willy Tarreau96199b12012-08-24 00:46:52 +0200651 }
652
Christopher Faulete9e48202022-03-22 18:13:29 +0100653 if (ic->pipe && ic->to_forward && !(flags & CO_RFL_BUF_FLUSH) && cs->endp->flags & CS_EP_MAY_SPLICE) {
Willy Tarreauc640ef12019-12-03 18:13:04 +0100654 /* don't break splicing by reading, but still call rcv_buf()
655 * to pass the flag.
656 */
657 goto done_recv;
658 }
659
Christopher Fauleta73e59b2016-12-09 17:30:18 +0100660 /* now we'll need a input buffer for the stream */
Willy Tarreau581abd32018-10-25 10:21:41 +0200661 if (!si_alloc_ibuf(si, &(si_strm(si)->buffer_wait)))
Willy Tarreau10fc09e2014-11-25 19:46:36 +0100662 goto end_recv;
Willy Tarreau10fc09e2014-11-25 19:46:36 +0100663
Christopher Faulet2bc364c2021-09-21 15:22:12 +0200664 /* For an HTX stream, if the buffer is stuck (no output data with some
665 * input data) and if the HTX message is fragmented or if its free space
666 * wraps, we force an HTX deframentation. It is a way to have a
667 * contiguous free space nad to let the mux to copy as much data as
668 * possible.
669 *
670 * NOTE: A possible optim may be to let the mux decides if defrag is
671 * required or not, depending on amount of data to be xferred.
672 */
673 if (IS_HTX_STRM(si_strm(si)) && !co_data(ic)) {
674 struct htx *htx = htxbuf(&ic->buf);
675
676 if (htx_is_not_empty(htx) && ((htx->flags & HTX_FL_FRAGMENTED) || htx_space_wraps(htx)))
677 htx_defrag(htxbuf(&ic->buf), NULL, 0);
678 }
Christopher Faulet68a14db2021-09-21 15:14:57 +0200679
680 /* Instruct the mux it must subscribed for read events */
681 flags |= ((!conn_is_back(conn) && (si_strm(si)->be->options & PR_O_ABRT_CLOSE)) ? CO_RFL_KEEP_RECV : 0);
682
Willy Tarreau61d39a02013-07-18 21:49:32 +0200683 /* Important note : if we're called with POLL_IN|POLL_HUP, it means the read polling
684 * was enabled, which implies that the recv buffer was not full. So we have a guarantee
685 * that if such an event is not handled above in splice, it will be handled here by
686 * recv().
687 */
Christopher Fauletb041b232022-03-24 10:27:02 +0100688 while ((cs->endp->flags & CS_EP_RCV_MORE) ||
Willy Tarreaud1480cc2022-03-17 16:19:09 +0100689 (!(conn->flags & CO_FL_HANDSHAKE) &&
Christopher Fauletb041b232022-03-24 10:27:02 +0100690 (!(cs->endp->flags & (CS_EP_ERROR|CS_EP_EOS))) && !(ic->flags & CF_SHUTR))) {
Christopher Faulet68a14db2021-09-21 15:14:57 +0200691 int cur_flags = flags;
692
693 /* Compute transient CO_RFL_* flags */
Christopher Faulet564e39c2021-09-21 15:50:55 +0200694 if (co_data(ic)) {
695 cur_flags |= (CO_RFL_BUF_WET | CO_RFL_BUF_NOT_STUCK);
696 }
Christopher Faulet68a14db2021-09-21 15:14:57 +0200697
Christopher Faulet4eb7d742018-10-11 15:29:21 +0200698 /* <max> may be null. This is the mux responsibility to set
Christopher Fauletb041b232022-03-24 10:27:02 +0100699 * CS_EP_RCV_MORE on the CS if more space is needed.
Christopher Faulet4eb7d742018-10-11 15:29:21 +0200700 */
Willy Tarreauafc8a222014-11-28 15:46:27 +0100701 max = channel_recv_max(ic);
Christopher Faulet897d6122021-12-17 17:28:35 +0100702 ret = conn->mux->rcv_buf(cs, &ic->buf, max, cur_flags);
Willy Tarreau674e0ad2018-12-05 13:45:41 +0100703
Christopher Fauletb041b232022-03-24 10:27:02 +0100704 if (cs->endp->flags & CS_EP_WANT_ROOM) {
705 /* CS_EP_WANT_ROOM must not be reported if the channel's
Christopher Fauletae179252022-02-21 16:12:00 +0100706 * buffer is empty.
707 */
708 BUG_ON(c_empty(ic));
709
Willy Tarreaudb398432018-11-15 11:08:52 +0100710 si_rx_room_blk(si);
Christopher Fauletdf994082021-09-23 14:17:20 +0200711 /* Add READ_PARTIAL because some data are pending but
712 * cannot be xferred to the channel
713 */
714 ic->flags |= CF_READ_PARTIAL;
715 }
Willy Tarreau6577b482017-12-10 21:19:33 +0100716
Willy Tarreauf26c26c2018-11-12 16:11:08 +0100717 if (ret <= 0) {
Willy Tarreau1ac5f202019-12-03 18:08:45 +0100718 /* if we refrained from reading because we asked for a
719 * flush to satisfy rcv_pipe(), we must not subscribe
720 * and instead report that there's not enough room
721 * here to proceed.
722 */
723 if (flags & CO_RFL_BUF_FLUSH)
724 si_rx_room_blk(si);
Willy Tarreauce323de2012-08-20 21:41:06 +0200725 break;
Willy Tarreauf26c26c2018-11-12 16:11:08 +0100726 }
Willy Tarreauce323de2012-08-20 21:41:06 +0200727
728 cur_read += ret;
729
730 /* if we're allowed to directly forward data, we must update ->o */
Willy Tarreauafc8a222014-11-28 15:46:27 +0100731 if (ic->to_forward && !(ic->flags & (CF_SHUTW|CF_SHUTW_NOW))) {
Willy Tarreauce323de2012-08-20 21:41:06 +0200732 unsigned long fwd = ret;
Willy Tarreauafc8a222014-11-28 15:46:27 +0100733 if (ic->to_forward != CHN_INFINITE_FORWARD) {
734 if (fwd > ic->to_forward)
735 fwd = ic->to_forward;
736 ic->to_forward -= fwd;
Willy Tarreauce323de2012-08-20 21:41:06 +0200737 }
Willy Tarreaubcbd3932018-06-06 07:13:22 +0200738 c_adv(ic, fwd);
Willy Tarreauce323de2012-08-20 21:41:06 +0200739 }
740
Willy Tarreauafc8a222014-11-28 15:46:27 +0100741 ic->flags |= CF_READ_PARTIAL;
742 ic->total += ret;
Willy Tarreauce323de2012-08-20 21:41:06 +0200743
Christopher Faulet883d83e2021-09-09 10:17:59 +0200744 /* End-of-input reached, we can leave. In this case, it is
745 * important to break the loop to not block the SI because of
746 * the channel's policies.This way, we are still able to receive
747 * shutdowns.
748 */
Christopher Fauletb041b232022-03-24 10:27:02 +0100749 if (cs->endp->flags & CS_EP_EOI)
Christopher Faulet883d83e2021-09-09 10:17:59 +0200750 break;
751
Willy Tarreauf26c26c2018-11-12 16:11:08 +0100752 if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) {
753 /* we're stopped by the channel's policy */
Willy Tarreaub26a6f92018-11-14 17:10:36 +0100754 si_rx_chan_blk(si);
Willy Tarreau62dd6982017-11-18 11:26:20 +0100755 break;
Willy Tarreauf26c26c2018-11-12 16:11:08 +0100756 }
Willy Tarreauce323de2012-08-20 21:41:06 +0200757
758 /* if too many bytes were missing from last read, it means that
759 * it's pointless trying to read again because the system does
760 * not have them in buffers.
761 */
762 if (ret < max) {
Willy Tarreauce323de2012-08-20 21:41:06 +0200763 /* if a streamer has read few data, it may be because we
764 * have exhausted system buffers. It's not worth trying
765 * again.
766 */
Willy Tarreauf26c26c2018-11-12 16:11:08 +0100767 if (ic->flags & CF_STREAMER) {
768 /* we're stopped by the channel's policy */
Willy Tarreaub26a6f92018-11-14 17:10:36 +0100769 si_rx_chan_blk(si);
Willy Tarreauce323de2012-08-20 21:41:06 +0200770 break;
Willy Tarreauf26c26c2018-11-12 16:11:08 +0100771 }
Willy Tarreauce323de2012-08-20 21:41:06 +0200772
773 /* if we read a large block smaller than what we requested,
774 * it's almost certain we'll never get anything more.
775 */
Willy Tarreauf26c26c2018-11-12 16:11:08 +0100776 if (ret >= global.tune.recv_enough) {
777 /* we're stopped by the channel's policy */
Willy Tarreaub26a6f92018-11-14 17:10:36 +0100778 si_rx_chan_blk(si);
Willy Tarreauce323de2012-08-20 21:41:06 +0200779 break;
Willy Tarreauf26c26c2018-11-12 16:11:08 +0100780 }
Willy Tarreauce323de2012-08-20 21:41:06 +0200781 }
Christopher Fauletb3e0de42018-10-11 13:54:13 +0200782
783 /* if we are waiting for more space, don't try to read more data
784 * right now.
785 */
Willy Tarreaub26a6f92018-11-14 17:10:36 +0100786 if (si_rx_blocked(si))
Christopher Fauletb3e0de42018-10-11 13:54:13 +0200787 break;
Willy Tarreauce323de2012-08-20 21:41:06 +0200788 } /* while !flags */
789
Willy Tarreauffb12052018-11-15 16:06:02 +0100790 done_recv:
Willy Tarreauc5890e62014-02-09 17:47:01 +0100791 if (cur_read) {
Willy Tarreauafc8a222014-11-28 15:46:27 +0100792 if ((ic->flags & (CF_STREAMER | CF_STREAMER_FAST)) &&
Willy Tarreauc9fa0482018-07-10 17:43:27 +0200793 (cur_read <= ic->buf.size / 2)) {
Willy Tarreauafc8a222014-11-28 15:46:27 +0100794 ic->xfer_large = 0;
795 ic->xfer_small++;
796 if (ic->xfer_small >= 3) {
Willy Tarreauc5890e62014-02-09 17:47:01 +0100797 /* we have read less than half of the buffer in
798 * one pass, and this happened at least 3 times.
799 * This is definitely not a streamer.
800 */
Willy Tarreauafc8a222014-11-28 15:46:27 +0100801 ic->flags &= ~(CF_STREAMER | CF_STREAMER_FAST);
Willy Tarreauc5890e62014-02-09 17:47:01 +0100802 }
Willy Tarreauafc8a222014-11-28 15:46:27 +0100803 else if (ic->xfer_small >= 2) {
Willy Tarreauc5890e62014-02-09 17:47:01 +0100804 /* if the buffer has been at least half full twice,
805 * we receive faster than we send, so at least it
806 * is not a "fast streamer".
807 */
Willy Tarreauafc8a222014-11-28 15:46:27 +0100808 ic->flags &= ~CF_STREAMER_FAST;
Willy Tarreauc5890e62014-02-09 17:47:01 +0100809 }
810 }
Willy Tarreauafc8a222014-11-28 15:46:27 +0100811 else if (!(ic->flags & CF_STREAMER_FAST) &&
Willy Tarreauc9fa0482018-07-10 17:43:27 +0200812 (cur_read >= ic->buf.size - global.tune.maxrewrite)) {
Willy Tarreauc5890e62014-02-09 17:47:01 +0100813 /* we read a full buffer at once */
Willy Tarreauafc8a222014-11-28 15:46:27 +0100814 ic->xfer_small = 0;
815 ic->xfer_large++;
816 if (ic->xfer_large >= 3) {
Willy Tarreauc5890e62014-02-09 17:47:01 +0100817 /* we call this buffer a fast streamer if it manages
818 * to be filled in one call 3 consecutive times.
819 */
Willy Tarreauafc8a222014-11-28 15:46:27 +0100820 ic->flags |= (CF_STREAMER | CF_STREAMER_FAST);
Willy Tarreauc5890e62014-02-09 17:47:01 +0100821 }
822 }
823 else {
Willy Tarreauafc8a222014-11-28 15:46:27 +0100824 ic->xfer_small = 0;
825 ic->xfer_large = 0;
Willy Tarreauc5890e62014-02-09 17:47:01 +0100826 }
Willy Tarreauafc8a222014-11-28 15:46:27 +0100827 ic->last_read = now_ms;
Willy Tarreauc5890e62014-02-09 17:47:01 +0100828 }
829
Willy Tarreau10fc09e2014-11-25 19:46:36 +0100830 end_recv:
Christopher Faulete6d8cb12019-11-20 16:42:00 +0100831 ret = (cur_read != 0);
832
Christopher Faulet36b536d2019-11-20 11:56:33 +0100833 /* Report EOI on the channel if it was reached from the mux point of
834 * view. */
Christopher Fauletb041b232022-03-24 10:27:02 +0100835 if ((cs->endp->flags & CS_EP_EOI) && !(ic->flags & CF_EOI)) {
Christopher Faulet36b536d2019-11-20 11:56:33 +0100836 ic->flags |= (CF_EOI|CF_READ_PARTIAL);
Christopher Faulete6d8cb12019-11-20 16:42:00 +0100837 ret = 1;
838 }
Willy Tarreau10fc09e2014-11-25 19:46:36 +0100839
Christopher Faulet6cd56d52022-03-30 10:47:32 +0200840 if (cs->endp->flags & CS_EP_ERROR)
Christopher Faulete6d8cb12019-11-20 16:42:00 +0100841 ret = 1;
Christopher Fauletb041b232022-03-24 10:27:02 +0100842 else if (cs->endp->flags & CS_EP_EOS) {
Willy Tarreau18955db2020-01-23 16:32:24 +0100843 /* we received a shutdown */
844 ic->flags |= CF_READ_NULL;
845 if (ic->flags & CF_AUTO_CLOSE)
846 channel_shutw_now(ic);
Christopher Fauletd715d362022-04-01 16:38:32 +0200847 cs_conn_read0(cs);
Christopher Faulete6d8cb12019-11-20 16:42:00 +0100848 ret = 1;
Christopher Faulet36b536d2019-11-20 11:56:33 +0100849 }
850 else if (!si_rx_blocked(si)) {
851 /* Subscribe to receive events if we're blocking on I/O */
Christopher Faulet2f35e7b2022-03-31 11:09:28 +0200852 conn->mux->subscribe(cs, SUB_RETRY_RECV, &si->cs->wait_event);
Willy Tarreaudd5621a2018-11-15 16:55:14 +0100853 si_rx_endp_done(si);
854 } else {
855 si_rx_endp_more(si);
Christopher Faulete6d8cb12019-11-20 16:42:00 +0100856 ret = 1;
Willy Tarreaudd5621a2018-11-15 16:55:14 +0100857 }
Christopher Faulete6d8cb12019-11-20 16:42:00 +0100858 return ret;
Willy Tarreauce323de2012-08-20 21:41:06 +0200859}
860
861/*
Willy Tarreau9bf9c142012-08-20 15:38:41 +0200862 * This function propagates a null read received on a socket-based connection.
Christopher Faulet8abe7122022-03-30 15:10:18 +0200863 * It updates the stream interface. If the stream interface has CS_FL_NOHALF,
Willy Tarreau11405122015-03-12 22:32:27 +0100864 * the close is also forwarded to the write side as an abort.
Willy Tarreau9bf9c142012-08-20 15:38:41 +0200865 */
Christopher Fauletd715d362022-04-01 16:38:32 +0200866static void cs_conn_read0(struct conn_stream *cs)
Willy Tarreau9bf9c142012-08-20 15:38:41 +0200867{
Christopher Fauletd715d362022-04-01 16:38:32 +0200868 struct channel *ic = cs_ic(cs);
869 struct channel *oc = cs_oc(cs);
Willy Tarreaub363a1f2013-10-01 10:45:07 +0200870
Christopher Faulet13a35e52021-12-20 15:34:16 +0100871 BUG_ON(!cs_conn(cs));
872
Christopher Fauletd715d362022-04-01 16:38:32 +0200873 si_rx_shut_blk(cs->si);
Willy Tarreauafc8a222014-11-28 15:46:27 +0100874 if (ic->flags & CF_SHUTR)
Willy Tarreau9bf9c142012-08-20 15:38:41 +0200875 return;
Willy Tarreauafc8a222014-11-28 15:46:27 +0100876 ic->flags |= CF_SHUTR;
877 ic->rex = TICK_ETERNITY;
Willy Tarreau9bf9c142012-08-20 15:38:41 +0200878
Christopher Faulet62e75742022-03-31 09:16:34 +0200879 if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
Willy Tarreau9bf9c142012-08-20 15:38:41 +0200880 return;
881
Willy Tarreauafc8a222014-11-28 15:46:27 +0100882 if (oc->flags & CF_SHUTW)
Willy Tarreau9bf9c142012-08-20 15:38:41 +0200883 goto do_close;
884
Christopher Faulet8abe7122022-03-30 15:10:18 +0200885 if (cs->flags & CS_FL_NOHALF) {
Willy Tarreau9bf9c142012-08-20 15:38:41 +0200886 /* we want to immediately forward this close to the write side */
Willy Tarreau87b09662015-04-03 00:22:06 +0200887 /* force flag on ssl to keep stream in cache */
Christopher Faulet69ef6c92022-03-31 14:20:00 +0200888 cs_conn_shutw(cs, CO_SHW_SILENT);
Willy Tarreau9bf9c142012-08-20 15:38:41 +0200889 goto do_close;
890 }
891
892 /* otherwise that's just a normal read shutdown */
Willy Tarreau9bf9c142012-08-20 15:38:41 +0200893 return;
894
895 do_close:
Christopher Fauletda098e62022-03-31 17:44:45 +0200896 /* OK we completely close the socket here just as if we went through cs_shut[rw]() */
Christopher Faulet69ef6c92022-03-31 14:20:00 +0200897 cs_conn_close(cs);
Willy Tarreauf9fbfe82012-11-21 21:51:53 +0100898
Willy Tarreauafc8a222014-11-28 15:46:27 +0100899 oc->flags &= ~CF_SHUTW_NOW;
900 oc->flags |= CF_SHUTW;
901 oc->wex = TICK_ETERNITY;
Willy Tarreauf9fbfe82012-11-21 21:51:53 +0100902
Christopher Fauletd715d362022-04-01 16:38:32 +0200903 si_done_get(cs->si);
Willy Tarreauf9fbfe82012-11-21 21:51:53 +0100904
Christopher Faulet62e75742022-03-31 09:16:34 +0200905 cs->state = CS_ST_DIS;
Christopher Fauletae024ce2022-03-29 19:02:31 +0200906 __cs_strm(cs)->conn_exp = TICK_ETERNITY;
Willy Tarreau9bf9c142012-08-20 15:38:41 +0200907 return;
908}
909
Willy Tarreau651e1822015-09-23 20:06:13 +0200910/* Callback to be used by applet handlers upon completion. It updates the stream
911 * (which may or may not take this opportunity to try to forward data), then
Emeric Brun2802b072017-06-30 14:11:56 +0200912 * may re-enable the applet's based on the channels and stream interface's final
Willy Tarreau651e1822015-09-23 20:06:13 +0200913 * states.
914 */
Christopher Faulet6059ba42022-04-01 16:34:53 +0200915int cs_applet_process(struct conn_stream *cs)
Willy Tarreaue5f86492015-04-19 15:16:35 +0200916{
Christopher Faulet6059ba42022-04-01 16:34:53 +0200917 struct channel *ic = cs_ic(cs);
Willy Tarreaueca572f2015-09-25 19:11:55 +0200918
Christopher Faulet6059ba42022-04-01 16:34:53 +0200919 BUG_ON(!cs_appctx(cs));
Christopher Faulet693b23b2022-02-28 09:09:05 +0100920
Willy Tarreaueca572f2015-09-25 19:11:55 +0200921 /* If the applet wants to write and the channel is closed, it's a
922 * broken pipe and it must be reported.
923 */
Christopher Faulet6059ba42022-04-01 16:34:53 +0200924 if (!(cs->si->flags & SI_FL_RX_WAIT_EP) && (ic->flags & CF_SHUTR))
925 cs->endp->flags |= CS_EP_ERROR;
Willy Tarreaueca572f2015-09-25 19:11:55 +0200926
Willy Tarreau186dcdd2018-11-16 16:18:34 +0100927 /* automatically mark the applet having data available if it reported
928 * begin blocked by the channel.
929 */
Christopher Faulet6059ba42022-04-01 16:34:53 +0200930 if (si_rx_blocked(cs->si))
931 si_rx_endp_more(cs->si);
Willy Tarreau186dcdd2018-11-16 16:18:34 +0100932
Willy Tarreau651e1822015-09-23 20:06:13 +0200933 /* update the stream-int, channels, and possibly wake the stream up */
Christopher Faulet9029a722022-04-01 16:48:36 +0200934 cs_notify(cs);
Christopher Faulet6059ba42022-04-01 16:34:53 +0200935 stream_release_buffers(__cs_strm(cs));
Willy Tarreaue5f86492015-04-19 15:16:35 +0200936
Christopher Faulet9029a722022-04-01 16:48:36 +0200937 /* cs_notify may have passed through chk_snd and released some
Willy Tarreau32742fd2018-11-14 14:07:59 +0100938 * RXBLK flags. Process_stream will consider those flags to wake up the
939 * appctx but in the case the task is not in runqueue we may have to
940 * wakeup the appctx immediately.
Emeric Brun2802b072017-06-30 14:11:56 +0200941 */
Christopher Faulet6059ba42022-04-01 16:34:53 +0200942 if ((si_rx_endp_ready(cs->si) && !si_rx_blocked(cs->si)) ||
943 (si_tx_endp_ready(cs->si) && !si_tx_blocked(cs->si)))
944 appctx_wakeup(__cs_appctx(cs));
945 return 0;
Willy Tarreaud45b9f82015-04-13 16:30:14 +0200946}
947
Willy Tarreaudded32d2008-11-30 19:48:07 +0100948/*
Willy Tarreaucff64112008-11-03 06:26:53 +0100949 * Local variables:
950 * c-indent-level: 8
951 * c-basic-offset: 8
952 * End:
953 */