/*
 * Functions managing stream_interface structures
 *
 * Copyright 2000-2012 Willy Tarreau <w@1wt.eu>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 *
 */
12
13#include <errno.h>
14#include <fcntl.h>
15#include <stdio.h>
16#include <stdlib.h>
17
18#include <sys/socket.h>
19#include <sys/stat.h>
20#include <sys/types.h>
21
22#include <common/compat.h>
23#include <common/config.h>
24#include <common/debug.h>
25#include <common/standard.h>
26#include <common/ticks.h>
27#include <common/time.h>
28
Willy Tarreauc7e42382012-08-24 19:22:53 +020029#include <proto/channel.h>
Willy Tarreau8b117082012-08-06 15:06:49 +020030#include <proto/connection.h>
Willy Tarreaucff64112008-11-03 06:26:53 +010031#include <proto/fd.h>
Willy Tarreau2c6be842012-07-06 17:12:34 +020032#include <proto/frontend.h>
Willy Tarreau96199b12012-08-24 00:46:52 +020033#include <proto/pipe.h>
Willy Tarreau269358d2009-09-20 20:14:49 +020034#include <proto/stream_interface.h>
Willy Tarreaucff64112008-11-03 06:26:53 +010035#include <proto/task.h>
36
Willy Tarreaufd31e532012-07-23 18:24:25 +020037#include <types/pipe.h>
38
/* Forward declarations of the stream-interface callbacks defined in this
 * file. The first group is used when the interface is run by a task or an
 * embedded applet, the second group when it sits on top of a connection.
 */
static void stream_int_update(struct stream_interface *si);
static void stream_int_update_embedded(struct stream_interface *si);
static void stream_int_chk_rcv(struct stream_interface *si);
static void stream_int_chk_snd(struct stream_interface *si);

static void stream_int_update_conn(struct stream_interface *si);
static void stream_int_chk_rcv_conn(struct stream_interface *si);
static void stream_int_chk_snd_conn(struct stream_interface *si);
Willy Tarreauf873d752012-05-11 17:47:17 +020047
Willy Tarreauc5788912012-08-24 18:12:41 +020048/* stream-interface operations for embedded tasks */
49struct si_ops si_embedded_ops = {
Willy Tarreau5c979a92012-05-07 17:15:39 +020050 .update = stream_int_update_embedded,
Willy Tarreau5c979a92012-05-07 17:15:39 +020051 .chk_rcv = stream_int_chk_rcv,
52 .chk_snd = stream_int_chk_snd,
Willy Tarreau5c979a92012-05-07 17:15:39 +020053};
54
Willy Tarreauc5788912012-08-24 18:12:41 +020055/* stream-interface operations for external tasks */
56struct si_ops si_task_ops = {
Willy Tarreau5c979a92012-05-07 17:15:39 +020057 .update = stream_int_update,
Willy Tarreau5c979a92012-05-07 17:15:39 +020058 .chk_rcv = stream_int_chk_rcv,
59 .chk_snd = stream_int_chk_snd,
Willy Tarreau5c979a92012-05-07 17:15:39 +020060};
61
Willy Tarreauc5788912012-08-24 18:12:41 +020062/* stream-interface operations for connections */
63struct si_ops si_conn_ops = {
64 .update = stream_int_update_conn,
65 .chk_rcv = stream_int_chk_rcv_conn,
66 .chk_snd = stream_int_chk_snd_conn,
67};
68
69struct app_cb si_conn_cb = {
70 .recv = si_conn_recv_cb,
71 .send = si_conn_send_cb,
72};
73
Willy Tarreaucff64112008-11-03 06:26:53 +010074/*
75 * This function only has to be called once after a wakeup event in case of
76 * suspected timeout. It controls the stream interface timeouts and sets
77 * si->flags accordingly. It does NOT close anything, as this timeout may
78 * be used for any purpose. It returns 1 if the timeout fired, otherwise
79 * zero.
80 */
81int stream_int_check_timeouts(struct stream_interface *si)
82{
83 if (tick_is_expired(si->exp, now_ms)) {
84 si->flags |= SI_FL_EXP;
85 return 1;
86 }
87 return 0;
88}
89
Willy Tarreaufe3718a2008-11-30 18:14:12 +010090/* to be called only when in SI_ST_DIS with SI_FL_ERR */
Willy Tarreaucff64112008-11-03 06:26:53 +010091void stream_int_report_error(struct stream_interface *si)
92{
93 if (!si->err_type)
94 si->err_type = SI_ET_DATA_ERR;
95
Willy Tarreaucff64112008-11-03 06:26:53 +010096 si->ob->flags |= BF_WRITE_ERROR;
Willy Tarreaucff64112008-11-03 06:26:53 +010097 si->ib->flags |= BF_READ_ERROR;
98}
99
100/*
Willy Tarreaudded32d2008-11-30 19:48:07 +0100101 * Returns a message to the client ; the connection is shut down for read,
102 * and the request is cleared so that no server connection can be initiated.
103 * The buffer is marked for read shutdown on the other side to protect the
104 * message, and the buffer write is enabled. The message is contained in a
Willy Tarreau148d0992010-01-10 10:21:21 +0100105 * "chunk". If it is null, then an empty message is used. The reply buffer does
106 * not need to be empty before this, and its contents will not be overwritten.
107 * The primary goal of this function is to return error messages to a client.
Willy Tarreaudded32d2008-11-30 19:48:07 +0100108 */
109void stream_int_retnclose(struct stream_interface *si, const struct chunk *msg)
110{
Willy Tarreau148d0992010-01-10 10:21:21 +0100111 buffer_auto_read(si->ib);
Willy Tarreaudded32d2008-11-30 19:48:07 +0100112 buffer_abort(si->ib);
Willy Tarreau148d0992010-01-10 10:21:21 +0100113 buffer_auto_close(si->ib);
114 buffer_erase(si->ib);
Willy Tarreau798e1282010-12-12 13:06:00 +0100115
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200116 bi_erase(si->ob);
Willy Tarreau148d0992010-01-10 10:21:21 +0100117 if (likely(msg && msg->len))
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200118 bo_inject(si->ob, msg->str, msg->len);
Willy Tarreaudded32d2008-11-30 19:48:07 +0100119
120 si->ob->wex = tick_add_ifset(now_ms, si->ob->wto);
Willy Tarreau148d0992010-01-10 10:21:21 +0100121 buffer_auto_read(si->ob);
Willy Tarreau520d95e2009-09-19 21:04:57 +0200122 buffer_auto_close(si->ob);
Willy Tarreau5d881d02009-12-27 22:51:06 +0100123 buffer_shutr_now(si->ob);
Willy Tarreau5d881d02009-12-27 22:51:06 +0100124}
125
Willy Tarreaufb90d942009-09-05 20:57:35 +0200126/* default update function for scheduled tasks, not used for embedded tasks */
Willy Tarreauf873d752012-05-11 17:47:17 +0200127static void stream_int_update(struct stream_interface *si)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200128{
129 DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
130 __FUNCTION__,
131 si, si->state, si->ib->flags, si->ob->flags);
132
133 if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
134 task_wakeup(si->owner, TASK_WOKEN_IO);
135}
136
137/* default update function for embedded tasks, to be used at the end of the i/o handler */
Willy Tarreauf873d752012-05-11 17:47:17 +0200138static void stream_int_update_embedded(struct stream_interface *si)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200139{
Willy Tarreau3488e252010-08-09 16:24:56 +0200140 int old_flags = si->flags;
141
Willy Tarreaufb90d942009-09-05 20:57:35 +0200142 DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
143 __FUNCTION__,
144 si, si->state, si->ib->flags, si->ob->flags);
145
146 if (si->state != SI_ST_EST)
147 return;
148
Willy Tarreau8e21bb92012-08-24 22:40:29 +0200149 if ((si->ob->flags & (BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == BF_SHUTW_NOW &&
150 channel_is_empty(si->ob))
Willy Tarreau73b013b2012-05-21 16:31:45 +0200151 si_shutw(si);
Willy Tarreaufb90d942009-09-05 20:57:35 +0200152
153 if ((si->ob->flags & (BF_FULL|BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == 0)
154 si->flags |= SI_FL_WAIT_DATA;
155
Willy Tarreau96fd4b52009-10-04 17:18:35 +0200156 /* we're almost sure that we need some space if the buffer is not
157 * empty, even if it's not full, because the applets can't fill it.
158 */
Willy Tarreau8e21bb92012-08-24 22:40:29 +0200159 if ((si->ib->flags & (BF_SHUTR|BF_DONT_READ)) == 0 && !channel_is_empty(si->ib))
Willy Tarreaufb90d942009-09-05 20:57:35 +0200160 si->flags |= SI_FL_WAIT_ROOM;
161
Willy Tarreauf27b5ea2009-10-03 22:01:18 +0200162 if (si->ob->flags & BF_WRITE_ACTIVITY) {
Willy Tarreaufb90d942009-09-05 20:57:35 +0200163 if (tick_isset(si->ob->wex))
164 si->ob->wex = tick_add_ifset(now_ms, si->ob->wto);
165 }
166
Willy Tarreauf27b5ea2009-10-03 22:01:18 +0200167 if (si->ib->flags & BF_READ_ACTIVITY ||
168 (si->ob->flags & BF_WRITE_ACTIVITY && !(si->flags & SI_FL_INDEP_STR))) {
169 if (tick_isset(si->ib->rex))
170 si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
171 }
172
Willy Tarreau3488e252010-08-09 16:24:56 +0200173 /* save flags to detect changes */
174 old_flags = si->flags;
Willy Tarreauf1ba4b32009-10-17 14:37:52 +0200175 if (likely((si->ob->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL &&
Willy Tarreau96fd4b52009-10-04 17:18:35 +0200176 (si->ob->prod->flags & SI_FL_WAIT_ROOM)))
Willy Tarreau73b013b2012-05-21 16:31:45 +0200177 si_chk_rcv(si->ob->prod);
Willy Tarreaufb90d942009-09-05 20:57:35 +0200178
Willy Tarreau8e21bb92012-08-24 22:40:29 +0200179 if (((si->ib->flags & BF_READ_PARTIAL) && !channel_is_empty(si->ib)) &&
Willy Tarreau3488e252010-08-09 16:24:56 +0200180 (si->ib->cons->flags & SI_FL_WAIT_DATA)) {
Willy Tarreau73b013b2012-05-21 16:31:45 +0200181 si_chk_snd(si->ib->cons);
Willy Tarreau3488e252010-08-09 16:24:56 +0200182 /* check if the consumer has freed some space */
183 if (!(si->ib->flags & BF_FULL))
184 si->flags &= ~SI_FL_WAIT_ROOM;
185 }
Willy Tarreaufb90d942009-09-05 20:57:35 +0200186
187 /* Note that we're trying to wake up in two conditions here :
188 * - special event, which needs the holder task attention
189 * - status indicating that the applet can go on working. This
190 * is rather hard because we might be blocking on output and
191 * don't want to wake up on input and vice-versa. The idea is
Willy Tarreau3488e252010-08-09 16:24:56 +0200192 * to only rely on the changes the chk_* might have performed.
Willy Tarreaufb90d942009-09-05 20:57:35 +0200193 */
194 if (/* check stream interface changes */
Willy Tarreau3488e252010-08-09 16:24:56 +0200195 ((old_flags & ~si->flags) & (SI_FL_WAIT_ROOM|SI_FL_WAIT_DATA)) ||
196
197 /* changes on the production side */
198 (si->ib->flags & (BF_READ_NULL|BF_READ_ERROR)) ||
199 si->state != SI_ST_EST ||
200 (si->flags & SI_FL_ERR) ||
201 ((si->ib->flags & BF_READ_PARTIAL) &&
202 (!si->ib->to_forward || si->ib->cons->state != SI_ST_EST)) ||
203
204 /* changes on the consumption side */
205 (si->ob->flags & (BF_WRITE_NULL|BF_WRITE_ERROR)) ||
206 ((si->ob->flags & BF_WRITE_ACTIVITY) &&
207 ((si->ob->flags & BF_SHUTW) ||
208 si->ob->prod->state != SI_ST_EST ||
Willy Tarreau8e21bb92012-08-24 22:40:29 +0200209 (channel_is_empty(si->ob) && !si->ob->to_forward)))) {
Willy Tarreaufb90d942009-09-05 20:57:35 +0200210 if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
211 task_wakeup(si->owner, TASK_WOKEN_IO);
212 }
Willy Tarreau3488e252010-08-09 16:24:56 +0200213 if (si->ib->flags & BF_READ_ACTIVITY)
214 si->ib->flags &= ~BF_READ_DONTWAIT;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200215}
216
Willy Tarreau4a36b562012-08-06 19:31:45 +0200217/*
218 * This function performs a shutdown-read on a stream interface in a connected
219 * or init state (it does nothing for other states). It either shuts the read
220 * side or marks itself as closed. The buffer flags are updated to reflect the
221 * new state. If the stream interface has SI_FL_NOHALF, we also forward the
222 * close to the write side. If a control layer is defined, then it is supposed
223 * to be a socket layer and file descriptors are then shutdown or closed
224 * accordingly. If no control layer is defined, then the SI is supposed to be
225 * an embedded one and the owner task is woken up if it exists. The function
226 * does not disable polling on the FD by itself, it returns non-zero instead
227 * if the caller needs to do so (except when the FD is deleted where this is
228 * implicit).
229 */
230int stream_int_shutr(struct stream_interface *si)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200231{
Willy Tarreau4a36b562012-08-06 19:31:45 +0200232 struct connection *conn = &si->conn;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200233
234 si->ib->flags &= ~BF_SHUTR_NOW;
235 if (si->ib->flags & BF_SHUTR)
Willy Tarreau4a36b562012-08-06 19:31:45 +0200236 return 0;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200237 si->ib->flags |= BF_SHUTR;
238 si->ib->rex = TICK_ETERNITY;
239 si->flags &= ~SI_FL_WAIT_ROOM;
240
241 if (si->state != SI_ST_EST && si->state != SI_ST_CON)
Willy Tarreau4a36b562012-08-06 19:31:45 +0200242 return 0;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200243
244 if (si->ob->flags & BF_SHUTW) {
Willy Tarreau4a36b562012-08-06 19:31:45 +0200245 conn_data_close(&si->conn);
246 if (conn->ctrl)
247 fd_delete(si_fd(si));
Willy Tarreaufb90d942009-09-05 20:57:35 +0200248 si->state = SI_ST_DIS;
249 si->exp = TICK_ETERNITY;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200250
Willy Tarreaud8ccffe2010-09-07 16:16:50 +0200251 if (si->release)
252 si->release(si);
253 }
Willy Tarreau4a36b562012-08-06 19:31:45 +0200254 else if (si->flags & SI_FL_NOHALF) {
255 /* we want to immediately forward this close to the write side */
256 return stream_int_shutw(si);
257 }
258 else if (conn->ctrl) {
259 /* we want the caller to disable polling on this FD */
260 return 1;
261 }
Willy Tarreau0bd05ea2010-07-02 11:18:03 +0200262
Willy Tarreau4a36b562012-08-06 19:31:45 +0200263 /* note that if the task exists, it must unregister itself once it runs */
264 if (!conn->ctrl && !(si->flags & SI_FL_DONT_WAKE) && si->owner)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200265 task_wakeup(si->owner, TASK_WOKEN_IO);
Willy Tarreau4a36b562012-08-06 19:31:45 +0200266 return 0;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200267}
268
Willy Tarreau4a36b562012-08-06 19:31:45 +0200269/*
270 * This function performs a shutdown-write on a stream interface in a connected or
271 * init state (it does nothing for other states). It either shuts the write side
272 * or marks itself as closed. The buffer flags are updated to reflect the new state.
273 * It does also close everything if the SI was marked as being in error state. If
274 * there is a data-layer shutdown, it is called. If a control layer is defined, then
275 * it is supposed to be a socket layer and file descriptors are then shutdown or
276 * closed accordingly. If no control layer is defined, then the SI is supposed to
277 * be an embedded one and the owner task is woken up if it exists. The function
278 * does not disable polling on the FD by itself, it returns non-zero instead if
279 * the caller needs to do so (except when the FD is deleted where this is implicit).
280 */
281int stream_int_shutw(struct stream_interface *si)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200282{
Willy Tarreau4a36b562012-08-06 19:31:45 +0200283 struct connection *conn = &si->conn;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200284
285 si->ob->flags &= ~BF_SHUTW_NOW;
286 if (si->ob->flags & BF_SHUTW)
Willy Tarreau4a36b562012-08-06 19:31:45 +0200287 return 0;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200288 si->ob->flags |= BF_SHUTW;
289 si->ob->wex = TICK_ETERNITY;
290 si->flags &= ~SI_FL_WAIT_DATA;
291
292 switch (si->state) {
293 case SI_ST_EST:
Willy Tarreau4a36b562012-08-06 19:31:45 +0200294 /* we have to shut before closing, otherwise some short messages
295 * may never leave the system, especially when there are remaining
296 * unread data in the socket input buffer, or when nolinger is set.
297 * However, if SI_FL_NOLINGER is explicitly set, we know there is
298 * no risk so we close both sides immediately.
299 */
300 if (si->flags & SI_FL_ERR) {
301 /* quick close, the socket is already shut. Remove pending flags. */
302 si->flags &= ~SI_FL_NOLINGER;
303 } else if (si->flags & SI_FL_NOLINGER) {
304 si->flags &= ~SI_FL_NOLINGER;
305 if (conn->ctrl) {
306 setsockopt(si_fd(si), SOL_SOCKET, SO_LINGER,
307 (struct linger *) &nolinger, sizeof(struct linger));
308 }
309 /* unclean data-layer shutdown */
310 if (conn->data && conn->data->shutw)
311 conn->data->shutw(conn, 0);
312 } else {
313 /* clean data-layer shutdown */
314 if (conn->data && conn->data->shutw)
315 conn->data->shutw(conn, 1);
316
317 if (!(si->flags & SI_FL_NOHALF)) {
318 /* We shutdown transport layer */
319 if (conn->ctrl)
320 shutdown(si_fd(si), SHUT_WR);
321
322 if (!(si->ib->flags & (BF_SHUTR|BF_DONT_READ))) {
323 /* OK just a shutw, but we want the caller
324 * to disable polling on this FD if exists.
325 */
326 return !!conn->ctrl;
327 }
328 }
329 }
Willy Tarreaufb90d942009-09-05 20:57:35 +0200330
331 /* fall through */
332 case SI_ST_CON:
Willy Tarreau4a36b562012-08-06 19:31:45 +0200333 /* we may have to close a pending connection, and mark the
334 * response buffer as shutr
335 */
336 conn_data_close(&si->conn);
337 if (conn->ctrl)
338 fd_delete(si_fd(si));
339 /* fall through */
Willy Tarreaufb90d942009-09-05 20:57:35 +0200340 case SI_ST_CER:
Willy Tarreau32d3ee92010-12-29 14:03:02 +0100341 case SI_ST_QUE:
342 case SI_ST_TAR:
Willy Tarreaufb90d942009-09-05 20:57:35 +0200343 si->state = SI_ST_DIS;
Willy Tarreaud8ccffe2010-09-07 16:16:50 +0200344
345 if (si->release)
346 si->release(si);
Willy Tarreaufb90d942009-09-05 20:57:35 +0200347 default:
348 si->flags &= ~SI_FL_WAIT_ROOM;
349 si->ib->flags |= BF_SHUTR;
350 si->ib->rex = TICK_ETERNITY;
351 si->exp = TICK_ETERNITY;
352 }
353
Willy Tarreau4a36b562012-08-06 19:31:45 +0200354 /* note that if the task exists, it must unregister itself once it runs */
355 if (!conn->ctrl && !(si->flags & SI_FL_DONT_WAKE) && si->owner)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200356 task_wakeup(si->owner, TASK_WOKEN_IO);
Willy Tarreau4a36b562012-08-06 19:31:45 +0200357 return 0;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200358}
359
360/* default chk_rcv function for scheduled tasks */
Willy Tarreauf873d752012-05-11 17:47:17 +0200361static void stream_int_chk_rcv(struct stream_interface *si)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200362{
Willy Tarreau7421efb2012-07-02 15:11:27 +0200363 struct channel *ib = si->ib;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200364
365 DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
366 __FUNCTION__,
367 si, si->state, si->ib->flags, si->ob->flags);
368
369 if (unlikely(si->state != SI_ST_EST || (ib->flags & BF_SHUTR)))
370 return;
371
Willy Tarreauf1ba4b32009-10-17 14:37:52 +0200372 if (ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) {
Willy Tarreaufb90d942009-09-05 20:57:35 +0200373 /* stop reading */
Willy Tarreauf1ba4b32009-10-17 14:37:52 +0200374 if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200375 si->flags |= SI_FL_WAIT_ROOM;
376 }
377 else {
378 /* (re)start reading */
379 si->flags &= ~SI_FL_WAIT_ROOM;
380 if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
381 task_wakeup(si->owner, TASK_WOKEN_IO);
382 }
383}
384
385/* default chk_snd function for scheduled tasks */
Willy Tarreauf873d752012-05-11 17:47:17 +0200386static void stream_int_chk_snd(struct stream_interface *si)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200387{
Willy Tarreau7421efb2012-07-02 15:11:27 +0200388 struct channel *ob = si->ob;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200389
390 DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
391 __FUNCTION__,
392 si, si->state, si->ib->flags, si->ob->flags);
393
394 if (unlikely(si->state != SI_ST_EST || (si->ob->flags & BF_SHUTW)))
395 return;
396
397 if (!(si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */
Willy Tarreau8e21bb92012-08-24 22:40:29 +0200398 channel_is_empty(ob)) /* called with nothing to send ! */
Willy Tarreaufb90d942009-09-05 20:57:35 +0200399 return;
400
401 /* Otherwise there are remaining data to be sent in the buffer,
402 * so we tell the handler.
403 */
404 si->flags &= ~SI_FL_WAIT_DATA;
405 if (!tick_isset(ob->wex))
406 ob->wex = tick_add_ifset(now_ms, ob->wto);
407
408 if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
409 task_wakeup(si->owner, TASK_WOKEN_IO);
410}
411
Willy Tarreaub24281b2011-02-13 13:16:36 +0100412/* Register an applet to handle a stream_interface as part of the stream
Willy Tarreaufb90d942009-09-05 20:57:35 +0200413 * interface's owner task, which is returned. The SI will wake it up everytime
Willy Tarreaub24281b2011-02-13 13:16:36 +0100414 * it is solicited. The task's processing function must call the applet's
Willy Tarreaufb90d942009-09-05 20:57:35 +0200415 * function before returning. It must be deleted by the task handler using
Willy Tarreaub24281b2011-02-13 13:16:36 +0100416 * stream_int_unregister_handler(), possibly from within the function itself.
Willy Tarreaufa6bac62012-05-31 14:16:59 +0200417 * It also pre-initializes applet.state to zero and the connection context
418 * to NULL.
Willy Tarreaufb90d942009-09-05 20:57:35 +0200419 */
Willy Tarreaub24281b2011-02-13 13:16:36 +0100420struct task *stream_int_register_handler(struct stream_interface *si, struct si_applet *app)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200421{
Simon Horman7abd00d2011-08-13 08:03:51 +0900422 DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si->owner);
Willy Tarreaufb90d942009-09-05 20:57:35 +0200423
Willy Tarreauc5788912012-08-24 18:12:41 +0200424 si_prepare_embedded(si);
Willy Tarreau9e000c62011-03-10 14:03:36 +0100425 set_target_applet(&si->target, app);
Aman Gupta9a13e842012-04-02 18:57:53 -0700426 si->release = app->release;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200427 si->flags |= SI_FL_WAIT_DATA;
428 return si->owner;
429}
430
431/* Register a function to handle a stream_interface as a standalone task. The
432 * new task itself is returned and is assigned as si->owner. The stream_interface
433 * pointer will be pointed to by the task's context. The handler can be detached
434 * by using stream_int_unregister_handler().
Willy Tarreau7c0a1512011-03-10 11:17:02 +0100435 * FIXME: the code should be updated to ensure that we don't change si->owner
436 * anymore as this is not needed. However, process_session still relies on it.
Willy Tarreaufb90d942009-09-05 20:57:35 +0200437 */
438struct task *stream_int_register_handler_task(struct stream_interface *si,
439 struct task *(*fct)(struct task *))
440{
441 struct task *t;
442
443 DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", fct, si, si->owner);
444
Willy Tarreauc5788912012-08-24 18:12:41 +0200445 si_prepare_task(si);
Willy Tarreau9e000c62011-03-10 14:03:36 +0100446 clear_target(&si->target);
Willy Tarreau0bd05ea2010-07-02 11:18:03 +0200447 si->release = NULL;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200448 si->flags |= SI_FL_WAIT_DATA;
449
450 t = task_new();
451 si->owner = t;
452 if (!t)
453 return t;
Willy Tarreau7c0a1512011-03-10 11:17:02 +0100454
Willy Tarreau9e000c62011-03-10 14:03:36 +0100455 set_target_task(&si->target, t);
Willy Tarreau7c0a1512011-03-10 11:17:02 +0100456
Willy Tarreaufb90d942009-09-05 20:57:35 +0200457 t->process = fct;
458 t->context = si;
459 task_wakeup(si->owner, TASK_WOKEN_INIT);
460
461 return t;
462}
463
464/* Unregister a stream interface handler. This must be called by the handler task
465 * itself when it detects that it is in the SI_ST_DIS state. This function can
466 * both detach standalone handlers and embedded handlers.
467 */
468void stream_int_unregister_handler(struct stream_interface *si)
469{
Willy Tarreau7c0a1512011-03-10 11:17:02 +0100470 if (si->target.type == TARG_TYPE_TASK) {
Willy Tarreaufb90d942009-09-05 20:57:35 +0200471 /* external handler : kill the task */
Willy Tarreau7c0a1512011-03-10 11:17:02 +0100472 task_delete(si->target.ptr.t);
473 task_free(si->target.ptr.t);
Willy Tarreaufb90d942009-09-05 20:57:35 +0200474 }
Willy Tarreau0bd05ea2010-07-02 11:18:03 +0200475 si->release = NULL;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200476 si->owner = NULL;
Willy Tarreau9e000c62011-03-10 14:03:36 +0100477 clear_target(&si->target);
Willy Tarreaufb90d942009-09-05 20:57:35 +0200478}
479
Willy Tarreau2c6be842012-07-06 17:12:34 +0200480/* This callback is used to send a valid PROXY protocol line to a socket being
Willy Tarreauafad0e02012-08-09 14:45:22 +0200481 * established. It returns 0 if it fails in a fatal way or needs to poll to go
482 * further, otherwise it returns non-zero and removes itself from the connection's
Willy Tarreaua1a74742012-08-24 12:14:49 +0200483 * flags (the bit is provided in <flag> by the caller). It is designed to be
484 * called by the connection handler and relies on it to commit polling changes.
Willy Tarreau2c6be842012-07-06 17:12:34 +0200485 */
486int conn_si_send_proxy(struct connection *conn, unsigned int flag)
487{
Willy Tarreau2c6be842012-07-06 17:12:34 +0200488 struct stream_interface *si = container_of(conn, struct stream_interface, conn);
Willy Tarreau2c6be842012-07-06 17:12:34 +0200489
490 /* we might have been called just after an asynchronous shutw */
Willy Tarreaua1a74742012-08-24 12:14:49 +0200491 if (conn->flags & CO_FL_SOCK_WR_SH)
Willy Tarreau2c6be842012-07-06 17:12:34 +0200492 goto out_error;
493
494 /* If we have a PROXY line to send, we'll use this to validate the
495 * connection, in which case the connection is validated only once
496 * we've sent the whole proxy line. Otherwise we use connect().
497 */
498 if (si->send_proxy_ofs) {
499 int ret;
500
501 /* The target server expects a PROXY line to be sent first.
502 * If the send_proxy_ofs is negative, it corresponds to the
503 * offset to start sending from then end of the proxy string
504 * (which is recomputed every time since it's constant). If
505 * it is positive, it means we have to send from the start.
506 */
Willy Tarreaua1a74742012-08-24 12:14:49 +0200507 ret = make_proxy_line(trash, trashlen, &si->ob->prod->addr.from, &si->ob->prod->addr.to);
Willy Tarreau2c6be842012-07-06 17:12:34 +0200508 if (!ret)
509 goto out_error;
510
511 if (si->send_proxy_ofs > 0)
512 si->send_proxy_ofs = -ret; /* first call */
513
Willy Tarreaua1a74742012-08-24 12:14:49 +0200514 /* we have to send trash from (ret+sp for -sp bytes). If the
515 * data layer has a pending write, we'll also set MSG_MORE.
516 */
517 ret = send(conn->t.sock.fd, trash + ret + si->send_proxy_ofs, -si->send_proxy_ofs,
518 (conn->flags & CO_FL_DATA_WR_ENA) ? MSG_MORE : 0);
Willy Tarreau2c6be842012-07-06 17:12:34 +0200519
520 if (ret == 0)
521 goto out_wait;
522
523 if (ret < 0) {
524 if (errno == EAGAIN)
525 goto out_wait;
526 goto out_error;
527 }
528
529 si->send_proxy_ofs += ret; /* becomes zero once complete */
530 if (si->send_proxy_ofs != 0)
531 goto out_wait;
532
533 /* OK we've sent the whole line, we're connected */
534 }
535
Willy Tarreaua1a74742012-08-24 12:14:49 +0200536 /* The connection is ready now, simply return and let the connection
537 * handler notify upper layers if needed.
Willy Tarreau2c6be842012-07-06 17:12:34 +0200538 */
539 if (conn->flags & CO_FL_WAIT_L4_CONN)
540 conn->flags &= ~CO_FL_WAIT_L4_CONN;
Willy Tarreau2c6be842012-07-06 17:12:34 +0200541 conn->flags &= ~flag;
Willy Tarreauafad0e02012-08-09 14:45:22 +0200542 return 1;
Willy Tarreau2c6be842012-07-06 17:12:34 +0200543
544 out_error:
Willy Tarreauafad0e02012-08-09 14:45:22 +0200545 /* Write error on the file descriptor */
Willy Tarreau2c6be842012-07-06 17:12:34 +0200546 conn->flags |= CO_FL_ERROR;
Willy Tarreauafad0e02012-08-09 14:45:22 +0200547 conn->flags &= ~flag;
Willy Tarreaua1a74742012-08-24 12:14:49 +0200548 __conn_sock_stop_both(conn);
Willy Tarreauafad0e02012-08-09 14:45:22 +0200549 return 0;
Willy Tarreau2c6be842012-07-06 17:12:34 +0200550
551 out_wait:
Willy Tarreaua1a74742012-08-24 12:14:49 +0200552 __conn_sock_stop_recv(conn);
553 __conn_sock_poll_send(conn);
Willy Tarreauafad0e02012-08-09 14:45:22 +0200554 return 0;
Willy Tarreau2c6be842012-07-06 17:12:34 +0200555}
556
Willy Tarreau100c4672012-08-20 12:06:26 +0200557/* Callback to be used by connection I/O handlers upon completion. It differs from
558 * the function below in that it is designed to be called by lower layers after I/O
559 * events have been completed. It will also try to wake the associated task up if
Willy Tarreauf16723e2012-08-24 12:52:22 +0200560 * an important event requires special handling. It relies on the connection handler
561 * to commit any polling updates.
Willy Tarreau100c4672012-08-20 12:06:26 +0200562 */
563void conn_notify_si(struct connection *conn)
Willy Tarreaufd31e532012-07-23 18:24:25 +0200564{
Willy Tarreaufd31e532012-07-23 18:24:25 +0200565 struct stream_interface *si = container_of(conn, struct stream_interface, conn);
566
567 DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
568 __FUNCTION__,
569 si, si->state, si->ib->flags, si->ob->flags);
570
Willy Tarreau3c55ec22012-07-23 19:19:51 +0200571 if (conn->flags & CO_FL_ERROR)
572 si->flags |= SI_FL_ERR;
573
Willy Tarreau8f8c92f2012-07-23 19:45:44 +0200574 /* check for recent connection establishment */
Willy Tarreauc76ae332012-07-12 15:32:13 +0200575 if (unlikely(!(conn->flags & (CO_FL_WAIT_L4_CONN | CO_FL_WAIT_L6_CONN | CO_FL_CONNECTED)))) {
Willy Tarreau8f8c92f2012-07-23 19:45:44 +0200576 si->exp = TICK_ETERNITY;
577 si->ob->flags |= BF_WRITE_NULL;
578 }
579
Willy Tarreau44b5dc62012-08-24 12:12:53 +0200580 /* process consumer side */
Willy Tarreau8e21bb92012-08-24 22:40:29 +0200581 if (channel_is_empty(si->ob)) {
Willy Tarreau44b5dc62012-08-24 12:12:53 +0200582 if (((si->ob->flags & (BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == BF_SHUTW_NOW) &&
583 (si->state == SI_ST_EST))
584 stream_int_shutw(si);
Willy Tarreauf16723e2012-08-24 12:52:22 +0200585 __conn_data_stop_send(conn);
Willy Tarreau44b5dc62012-08-24 12:12:53 +0200586 si->ob->wex = TICK_ETERNITY;
587 }
Willy Tarreaufd31e532012-07-23 18:24:25 +0200588
Willy Tarreau44b5dc62012-08-24 12:12:53 +0200589 if ((si->ob->flags & (BF_FULL|BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == 0)
590 si->flags |= SI_FL_WAIT_DATA;
Willy Tarreaufd31e532012-07-23 18:24:25 +0200591
Willy Tarreau44b5dc62012-08-24 12:12:53 +0200592 if (si->ob->flags & BF_WRITE_ACTIVITY) {
593 /* update timeouts if we have written something */
Willy Tarreau8e21bb92012-08-24 22:40:29 +0200594 if ((si->ob->flags & (BF_SHUTW|BF_WRITE_PARTIAL)) == BF_WRITE_PARTIAL &&
595 !channel_is_empty(si->ob))
Willy Tarreau44b5dc62012-08-24 12:12:53 +0200596 if (tick_isset(si->ob->wex))
597 si->ob->wex = tick_add_ifset(now_ms, si->ob->wto);
Willy Tarreaufd31e532012-07-23 18:24:25 +0200598
Willy Tarreau44b5dc62012-08-24 12:12:53 +0200599 if (!(si->flags & SI_FL_INDEP_STR))
600 if (tick_isset(si->ib->rex))
601 si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
Willy Tarreaufd31e532012-07-23 18:24:25 +0200602
Willy Tarreau44b5dc62012-08-24 12:12:53 +0200603 if (likely((si->ob->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL &&
604 (si->ob->prod->flags & SI_FL_WAIT_ROOM)))
605 si_chk_rcv(si->ob->prod);
Willy Tarreaufd31e532012-07-23 18:24:25 +0200606 }
607
Willy Tarreau44b5dc62012-08-24 12:12:53 +0200608 /* process producer side.
609 * We might have some data the consumer is waiting for.
610 * We can do fast-forwarding, but we avoid doing this for partial
611 * buffers, because it is very likely that it will be done again
612 * immediately afterwards once the following data is parsed (eg:
613 * HTTP chunking).
614 */
Willy Tarreau8e21bb92012-08-24 22:40:29 +0200615 if (((si->ib->flags & BF_READ_PARTIAL) && !channel_is_empty(si->ib)) &&
Willy Tarreau44b5dc62012-08-24 12:12:53 +0200616 (si->ib->pipe /* always try to send spliced data */ ||
617 (si->ib->buf.i == 0 && (si->ib->cons->flags & SI_FL_WAIT_DATA)))) {
618 int last_len = si->ib->pipe ? si->ib->pipe->data : 0;
Willy Tarreaufd31e532012-07-23 18:24:25 +0200619
Willy Tarreau44b5dc62012-08-24 12:12:53 +0200620 si_chk_snd(si->ib->cons);
Willy Tarreaufd31e532012-07-23 18:24:25 +0200621
Willy Tarreau44b5dc62012-08-24 12:12:53 +0200622 /* check if the consumer has freed some space either in the
623 * buffer or in the pipe.
624 */
625 if (!(si->ib->flags & BF_FULL) &&
626 (!last_len || !si->ib->pipe || si->ib->pipe->data < last_len))
627 si->flags &= ~SI_FL_WAIT_ROOM;
628 }
Willy Tarreaufd31e532012-07-23 18:24:25 +0200629
Willy Tarreau44b5dc62012-08-24 12:12:53 +0200630 if (si->flags & SI_FL_WAIT_ROOM) {
Willy Tarreauf16723e2012-08-24 12:52:22 +0200631 __conn_data_stop_recv(conn);
Willy Tarreau44b5dc62012-08-24 12:12:53 +0200632 si->ib->rex = TICK_ETERNITY;
633 }
634 else if ((si->ib->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) {
635 if (tick_isset(si->ib->rex))
636 si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
Willy Tarreaufd31e532012-07-23 18:24:25 +0200637 }
638
639 /* wake the task up only when needed */
640 if (/* changes on the production side */
641 (si->ib->flags & (BF_READ_NULL|BF_READ_ERROR)) ||
642 si->state != SI_ST_EST ||
643 (si->flags & SI_FL_ERR) ||
644 ((si->ib->flags & BF_READ_PARTIAL) &&
645 (!si->ib->to_forward || si->ib->cons->state != SI_ST_EST)) ||
646
647 /* changes on the consumption side */
648 (si->ob->flags & (BF_WRITE_NULL|BF_WRITE_ERROR)) ||
649 ((si->ob->flags & BF_WRITE_ACTIVITY) &&
650 ((si->ob->flags & BF_SHUTW) ||
651 si->ob->prod->state != SI_ST_EST ||
Willy Tarreau8e21bb92012-08-24 22:40:29 +0200652 (channel_is_empty(si->ob) && !si->ob->to_forward)))) {
Willy Tarreaufd31e532012-07-23 18:24:25 +0200653 task_wakeup(si->owner, TASK_WOKEN_IO);
654 }
655 if (si->ib->flags & BF_READ_ACTIVITY)
656 si->ib->flags &= ~BF_READ_DONTWAIT;
657}
Willy Tarreau2c6be842012-07-06 17:12:34 +0200658
Willy Tarreau5368d802012-08-21 18:22:06 +0200659/*
660 * This function is called to send buffer data to a stream socket.
661 * It returns -1 in case of unrecoverable error, otherwise zero.
Willy Tarreauf16723e2012-08-24 12:52:22 +0200662 * It iterates the data layer's snd_buf function. It relies on the
663 * caller to commit polling changes.
Willy Tarreau5368d802012-08-21 18:22:06 +0200664 */
665static int si_conn_send_loop(struct connection *conn)
666{
667 struct stream_interface *si = container_of(conn, struct stream_interface, conn);
668 struct channel *b = si->ob;
669 int write_poll = MAX_WRITE_POLL_LOOPS;
670 int ret;
671
Willy Tarreau96199b12012-08-24 00:46:52 +0200672 conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
Willy Tarreau5368d802012-08-21 18:22:06 +0200673
Willy Tarreau96199b12012-08-24 00:46:52 +0200674 if (b->pipe && conn->data->snd_pipe) {
675 ret = conn->data->snd_pipe(conn, b->pipe);
676 if (ret > 0)
677 b->flags |= BF_WRITE_PARTIAL;
Willy Tarreau5368d802012-08-21 18:22:06 +0200678
679 if (!b->pipe->data) {
680 put_pipe(b->pipe);
681 b->pipe = NULL;
Willy Tarreau5368d802012-08-21 18:22:06 +0200682 }
683
Willy Tarreau96199b12012-08-24 00:46:52 +0200684 if (conn->flags & CO_FL_ERROR)
685 return -1;
Willy Tarreau5368d802012-08-21 18:22:06 +0200686
Willy Tarreau96199b12012-08-24 00:46:52 +0200687 if (conn->flags & CO_FL_WAIT_ROOM) {
Willy Tarreauf16723e2012-08-24 12:52:22 +0200688 __conn_data_poll_send(conn);
Willy Tarreau96199b12012-08-24 00:46:52 +0200689 return 0;
690 }
Willy Tarreau5368d802012-08-21 18:22:06 +0200691 }
692
693 /* At this point, the pipe is empty, but we may still have data pending
694 * in the normal buffer.
695 */
Willy Tarreau8e21bb92012-08-24 22:40:29 +0200696 if (!b->buf.o)
Willy Tarreau5368d802012-08-21 18:22:06 +0200697 return 0;
Willy Tarreau5368d802012-08-21 18:22:06 +0200698
699 /* when we're in this loop, we already know that there is no spliced
700 * data left, and that there are sendable buffered data.
701 */
Willy Tarreau5368d802012-08-21 18:22:06 +0200702 while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_WR_SH | CO_FL_DATA_WR_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
703 /* check if we want to inform the kernel that we're interested in
704 * sending more data after this call. We want this if :
705 * - we're about to close after this last send and want to merge
706 * the ongoing FIN with the last segment.
707 * - we know we can't send everything at once and must get back
708 * here because of unaligned data
709 * - there is still a finite amount of data to forward
710 * The test is arranged so that the most common case does only 2
711 * tests.
712 */
713 unsigned int send_flag = MSG_DONTWAIT | MSG_NOSIGNAL;
714
715 if ((!(b->flags & (BF_NEVER_WAIT|BF_SEND_DONTWAIT)) &&
716 ((b->to_forward && b->to_forward != BUF_INFINITE_FORWARD) ||
717 (b->flags & BF_EXPECT_MORE))) ||
718 ((b->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == BF_SHUTW_NOW))
719 send_flag |= MSG_MORE;
720
721 ret = conn->data->snd_buf(conn, &b->buf, send_flag);
722 if (ret <= 0)
723 break;
724
725 if (si->conn.flags & CO_FL_WAIT_L4_CONN)
726 si->conn.flags &= ~CO_FL_WAIT_L4_CONN;
727
728 b->flags |= BF_WRITE_PARTIAL;
729
730 if (likely(!bi_full(b)))
731 b->flags &= ~BF_FULL;
732
733 if (!b->buf.o) {
734 /* Always clear both flags once everything has been sent, they're one-shot */
735 b->flags &= ~(BF_EXPECT_MORE | BF_SEND_DONTWAIT);
Willy Tarreau5368d802012-08-21 18:22:06 +0200736 break;
737 }
738
739 if (--write_poll <= 0)
740 break;
741 } /* while */
742
743 if (conn->flags & CO_FL_ERROR)
744 return -1;
745
746 if (conn->flags & CO_FL_WAIT_ROOM) {
747 /* we need to poll before going on */
Willy Tarreauf16723e2012-08-24 12:52:22 +0200748 __conn_data_poll_send(&si->conn);
Willy Tarreau5368d802012-08-21 18:22:06 +0200749 return 0;
750 }
751 return 0;
752}
753
754
Willy Tarreau100c4672012-08-20 12:06:26 +0200755/* Updates the timers and flags of a stream interface attached to a connection,
756 * depending on the buffers' flags. It should only be called once after the
757 * buffer flags have settled down, and before they are cleared. It doesn't
758 * harm to call it as often as desired (it just slightly hurts performance).
759 * It is only meant to be called by upper layers after buffer flags have been
760 * manipulated by analysers.
761 */
762void stream_int_update_conn(struct stream_interface *si)
763{
Willy Tarreau7421efb2012-07-02 15:11:27 +0200764 struct channel *ib = si->ib;
765 struct channel *ob = si->ob;
Willy Tarreau100c4672012-08-20 12:06:26 +0200766
767 if (si->conn.flags & CO_FL_HANDSHAKE) {
768 /* a handshake is in progress */
Willy Tarreau100c4672012-08-20 12:06:26 +0200769 return;
770 }
771
772 /* Check if we need to close the read side */
773 if (!(ib->flags & BF_SHUTR)) {
774 /* Read not closed, update FD status and timeout for reads */
775 if (ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) {
776 /* stop reading */
777 if (!(si->flags & SI_FL_WAIT_ROOM)) {
778 if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL)
779 si->flags |= SI_FL_WAIT_ROOM;
780 conn_data_stop_recv(&si->conn);
781 ib->rex = TICK_ETERNITY;
782 }
783 }
784 else {
785 /* (re)start reading and update timeout. Note: we don't recompute the timeout
786 * everytime we get here, otherwise it would risk never to expire. We only
787 * update it if is was not yet set. The stream socket handler will already
788 * have updated it if there has been a completed I/O.
789 */
790 si->flags &= ~SI_FL_WAIT_ROOM;
791 conn_data_want_recv(&si->conn);
792 if (!(ib->flags & (BF_READ_NOEXP|BF_DONT_READ)) && !tick_isset(ib->rex))
793 ib->rex = tick_add_ifset(now_ms, ib->rto);
794 }
795 }
796
797 /* Check if we need to close the write side */
798 if (!(ob->flags & BF_SHUTW)) {
799 /* Write not closed, update FD status and timeout for writes */
Willy Tarreau8e21bb92012-08-24 22:40:29 +0200800 if (channel_is_empty(ob)) {
Willy Tarreau100c4672012-08-20 12:06:26 +0200801 /* stop writing */
802 if (!(si->flags & SI_FL_WAIT_DATA)) {
803 if ((ob->flags & (BF_FULL|BF_HIJACK|BF_SHUTW_NOW)) == 0)
804 si->flags |= SI_FL_WAIT_DATA;
805 conn_data_stop_send(&si->conn);
806 ob->wex = TICK_ETERNITY;
807 }
808 }
809 else {
810 /* (re)start writing and update timeout. Note: we don't recompute the timeout
811 * everytime we get here, otherwise it would risk never to expire. We only
812 * update it if is was not yet set. The stream socket handler will already
813 * have updated it if there has been a completed I/O.
814 */
815 si->flags &= ~SI_FL_WAIT_DATA;
816 conn_data_want_send(&si->conn);
817 if (!tick_isset(ob->wex)) {
818 ob->wex = tick_add_ifset(now_ms, ob->wto);
819 if (tick_isset(ib->rex) && !(si->flags & SI_FL_INDEP_STR)) {
820 /* Note: depending on the protocol, we don't know if we're waiting
821 * for incoming data or not. So in order to prevent the socket from
822 * expiring read timeouts during writes, we refresh the read timeout,
823 * except if it was already infinite or if we have explicitly setup
824 * independent streams.
825 */
826 ib->rex = tick_add_ifset(now_ms, ib->rto);
827 }
828 }
829 }
830 }
831}
832
Willy Tarreau46a8d922012-08-20 12:38:36 +0200833/* This function is used for inter-stream-interface calls. It is called by the
834 * consumer to inform the producer side that it may be interested in checking
835 * for free space in the buffer. Note that it intentionally does not update
836 * timeouts, so that we can still check them later at wake-up. This function is
837 * dedicated to connection-based stream interfaces.
838 */
Willy Tarreauc5788912012-08-24 18:12:41 +0200839static void stream_int_chk_rcv_conn(struct stream_interface *si)
Willy Tarreau46a8d922012-08-20 12:38:36 +0200840{
Willy Tarreau7421efb2012-07-02 15:11:27 +0200841 struct channel *ib = si->ib;
Willy Tarreau46a8d922012-08-20 12:38:36 +0200842
843 if (unlikely(si->state != SI_ST_EST || (ib->flags & BF_SHUTR)))
844 return;
845
846 if (si->conn.flags & CO_FL_HANDSHAKE) {
847 /* a handshake is in progress */
848 return;
849 }
850
851 if (ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) {
852 /* stop reading */
853 if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL)
854 si->flags |= SI_FL_WAIT_ROOM;
855 conn_data_stop_recv(&si->conn);
856 }
857 else {
858 /* (re)start reading */
859 si->flags &= ~SI_FL_WAIT_ROOM;
860 conn_data_want_recv(&si->conn);
861 }
862}
863
864
Willy Tarreaude5722c2012-08-20 15:01:10 +0200865/* This function is used for inter-stream-interface calls. It is called by the
866 * producer to inform the consumer side that it may be interested in checking
867 * for data in the buffer. Note that it intentionally does not update timeouts,
868 * so that we can still check them later at wake-up.
869 */
Willy Tarreauc5788912012-08-24 18:12:41 +0200870static void stream_int_chk_snd_conn(struct stream_interface *si)
Willy Tarreaude5722c2012-08-20 15:01:10 +0200871{
Willy Tarreau7421efb2012-07-02 15:11:27 +0200872 struct channel *ob = si->ob;
Willy Tarreaude5722c2012-08-20 15:01:10 +0200873
874 if (unlikely(si->state != SI_ST_EST || (ob->flags & BF_SHUTW)))
875 return;
876
877 /* handshake running on producer */
878 if (si->conn.flags & CO_FL_HANDSHAKE) {
879 /* a handshake is in progress */
Willy Tarreaude5722c2012-08-20 15:01:10 +0200880 return;
881 }
882
Willy Tarreau8e21bb92012-08-24 22:40:29 +0200883 if (unlikely(channel_is_empty(ob))) /* called with nothing to send ! */
Willy Tarreaude5722c2012-08-20 15:01:10 +0200884 return;
885
886 if (!ob->pipe && /* spliced data wants to be forwarded ASAP */
887 (!(si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */
888 (fdtab[si_fd(si)].ev & FD_POLL_OUT))) /* we'll be called anyway */
889 return;
890
Willy Tarreau5368d802012-08-21 18:22:06 +0200891 if (si_conn_send_loop(&si->conn) < 0) {
Willy Tarreaude5722c2012-08-20 15:01:10 +0200892 /* Write error on the file descriptor. We mark the FD as STERROR so
893 * that we don't use it anymore and we notify the task.
894 */
895 fdtab[si_fd(si)].ev &= ~FD_POLL_STICKY;
Willy Tarreauf16723e2012-08-24 12:52:22 +0200896 __conn_data_stop_both(&si->conn);
Willy Tarreaude5722c2012-08-20 15:01:10 +0200897 si->flags |= SI_FL_ERR;
898 si->conn.flags |= CO_FL_ERROR;
899 goto out_wakeup;
900 }
901
902 /* OK, so now we know that some data might have been sent, and that we may
903 * have to poll first. We have to do that too if the buffer is not empty.
904 */
Willy Tarreau8e21bb92012-08-24 22:40:29 +0200905 if (channel_is_empty(ob)) {
Willy Tarreaude5722c2012-08-20 15:01:10 +0200906 /* the connection is established but we can't write. Either the
907 * buffer is empty, or we just refrain from sending because the
908 * ->o limit was reached. Maybe we just wrote the last
909 * chunk and need to close.
910 */
911 if (((ob->flags & (BF_SHUTW|BF_HIJACK|BF_AUTO_CLOSE|BF_SHUTW_NOW)) ==
912 (BF_AUTO_CLOSE|BF_SHUTW_NOW)) &&
913 (si->state == SI_ST_EST)) {
914 si_shutw(si);
915 goto out_wakeup;
916 }
917
918 if ((ob->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_FULL|BF_HIJACK)) == 0)
919 si->flags |= SI_FL_WAIT_DATA;
920 ob->wex = TICK_ETERNITY;
921 }
922 else {
923 /* Otherwise there are remaining data to be sent in the buffer,
924 * which means we have to poll before doing so.
925 */
Willy Tarreauf16723e2012-08-24 12:52:22 +0200926 __conn_data_want_send(&si->conn);
Willy Tarreaude5722c2012-08-20 15:01:10 +0200927 si->flags &= ~SI_FL_WAIT_DATA;
928 if (!tick_isset(ob->wex))
929 ob->wex = tick_add_ifset(now_ms, ob->wto);
930 }
931
932 if (likely(ob->flags & BF_WRITE_ACTIVITY)) {
933 /* update timeout if we have written something */
Willy Tarreau8e21bb92012-08-24 22:40:29 +0200934 if ((ob->flags & (BF_SHUTW|BF_WRITE_PARTIAL)) == BF_WRITE_PARTIAL &&
935 !channel_is_empty(ob))
Willy Tarreaude5722c2012-08-20 15:01:10 +0200936 ob->wex = tick_add_ifset(now_ms, ob->wto);
937
938 if (tick_isset(si->ib->rex) && !(si->flags & SI_FL_INDEP_STR)) {
939 /* Note: to prevent the client from expiring read timeouts
940 * during writes, we refresh it. We only do this if the
941 * interface is not configured for "independent streams",
942 * because for some applications it's better not to do this,
943 * for instance when continuously exchanging small amounts
944 * of data which can full the socket buffers long before a
945 * write timeout is detected.
946 */
947 si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
948 }
949 }
950
951 /* in case of special condition (error, shutdown, end of write...), we
952 * have to notify the task.
953 */
954 if (likely((ob->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) ||
Willy Tarreau8e21bb92012-08-24 22:40:29 +0200955 (channel_is_empty(ob) && !ob->to_forward) ||
Willy Tarreaude5722c2012-08-20 15:01:10 +0200956 si->state != SI_ST_EST)) {
957 out_wakeup:
958 if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
959 task_wakeup(si->owner, TASK_WOKEN_IO);
960 }
Willy Tarreauf16723e2012-08-24 12:52:22 +0200961
962 /* commit possible polling changes */
963 conn_cond_update_polling(&si->conn);
Willy Tarreaude5722c2012-08-20 15:01:10 +0200964}
965
Willy Tarreaueecf6ca2012-08-20 15:09:53 +0200966/*
Willy Tarreauce323de2012-08-20 21:41:06 +0200967 * This is the callback which is called by the connection layer to receive data
968 * into the buffer from the connection. It iterates over the data layer's rcv_buf
969 * function.
970 */
971void si_conn_recv_cb(struct connection *conn)
972{
973 struct stream_interface *si = container_of(conn, struct stream_interface, conn);
974 struct channel *b = si->ib;
975 int ret, max, cur_read;
976 int read_poll = MAX_READ_POLL_LOOPS;
977
978 /* stop immediately on errors. Note that we DON'T want to stop on
979 * POLL_ERR, as the poller might report a write error while there
980 * are still data available in the recv buffer. This typically
981 * happens when we send too large a request to a backend server
982 * which rejects it before reading it all.
983 */
984 if (conn->flags & CO_FL_ERROR)
985 goto out_error;
986
987 /* stop here if we reached the end of data */
988 if (conn_data_read0_pending(conn))
989 goto out_shutdown_r;
990
991 /* maybe we were called immediately after an asynchronous shutr */
992 if (b->flags & BF_SHUTR)
993 return;
994
Willy Tarreau96199b12012-08-24 00:46:52 +0200995 cur_read = 0;
996 conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
997
998 /* First, let's see if we may splice data across the channel without
999 * using a buffer.
1000 */
1001 if (conn->data->rcv_pipe &&
1002 b->to_forward >= MIN_SPLICE_FORWARD && b->flags & BF_KERN_SPLICING) {
1003 if (buffer_not_empty(&b->buf)) {
1004 /* We're embarrassed, there are already data pending in
1005 * the buffer and we don't want to have them at two
1006 * locations at a time. Let's indicate we need some
1007 * place and ask the consumer to hurry.
1008 */
1009 goto abort_splice;
1010 }
Willy Tarreauce323de2012-08-20 21:41:06 +02001011
Willy Tarreau96199b12012-08-24 00:46:52 +02001012 if (unlikely(b->pipe == NULL)) {
1013 if (pipes_used >= global.maxpipes || !(b->pipe = get_pipe())) {
1014 b->flags &= ~BF_KERN_SPLICING;
1015 goto abort_splice;
1016 }
1017 }
1018
1019 ret = conn->data->rcv_pipe(conn, b->pipe, b->to_forward);
1020 if (ret < 0) {
1021 /* splice not supported on this end, let's disable it */
1022 b->flags &= ~BF_KERN_SPLICING;
1023 si->flags &= ~SI_FL_CAP_SPLICE;
1024 goto abort_splice;
1025 }
Willy Tarreauce323de2012-08-20 21:41:06 +02001026
Willy Tarreau96199b12012-08-24 00:46:52 +02001027 if (ret > 0) {
1028 if (b->to_forward != BUF_INFINITE_FORWARD)
1029 b->to_forward -= ret;
1030 b->total += ret;
1031 cur_read += ret;
1032 b->flags |= BF_READ_PARTIAL;
Willy Tarreauce323de2012-08-20 21:41:06 +02001033 }
Willy Tarreau96199b12012-08-24 00:46:52 +02001034
1035 if (conn_data_read0_pending(conn))
1036 goto out_shutdown_r;
1037
1038 if (conn->flags & CO_FL_ERROR)
1039 goto out_error;
1040
Willy Tarreauce323de2012-08-20 21:41:06 +02001041 /* splice not possible (anymore), let's go on on standard copy */
1042 }
Willy Tarreau96199b12012-08-24 00:46:52 +02001043
1044 abort_splice:
1045 /* release the pipe if we can, which is almost always the case */
1046 if (b->pipe && !b->pipe->data) {
1047 put_pipe(b->pipe);
1048 b->pipe = NULL;
1049 }
1050
1051 while (!b->pipe && !(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
Willy Tarreauce323de2012-08-20 21:41:06 +02001052 max = bi_avail(b);
1053
1054 if (!max) {
1055 b->flags |= BF_FULL;
Willy Tarreau96199b12012-08-24 00:46:52 +02001056 conn->flags |= CO_FL_WAIT_ROOM;
Willy Tarreauce323de2012-08-20 21:41:06 +02001057 break;
1058 }
1059
1060 ret = conn->data->rcv_buf(conn, &b->buf, max);
1061 if (ret <= 0)
1062 break;
1063
1064 cur_read += ret;
1065
1066 /* if we're allowed to directly forward data, we must update ->o */
1067 if (b->to_forward && !(b->flags & (BF_SHUTW|BF_SHUTW_NOW))) {
1068 unsigned long fwd = ret;
1069 if (b->to_forward != BUF_INFINITE_FORWARD) {
1070 if (fwd > b->to_forward)
1071 fwd = b->to_forward;
1072 b->to_forward -= fwd;
1073 }
1074 b_adv(b, fwd);
1075 }
1076
1077 if (conn->flags & CO_FL_WAIT_L4_CONN)
1078 conn->flags &= ~CO_FL_WAIT_L4_CONN;
1079
1080 b->flags |= BF_READ_PARTIAL;
1081 b->total += ret;
1082
1083 if (bi_full(b)) {
1084 /* The buffer is now full, there's no point in going through
1085 * the loop again.
1086 */
1087 if (!(b->flags & BF_STREAMER_FAST) && (cur_read == buffer_len(&b->buf))) {
1088 b->xfer_small = 0;
1089 b->xfer_large++;
1090 if (b->xfer_large >= 3) {
1091 /* we call this buffer a fast streamer if it manages
1092 * to be filled in one call 3 consecutive times.
1093 */
1094 b->flags |= (BF_STREAMER | BF_STREAMER_FAST);
1095 //fputc('+', stderr);
1096 }
1097 }
1098 else if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
1099 (cur_read <= b->buf.size / 2)) {
1100 b->xfer_large = 0;
1101 b->xfer_small++;
1102 if (b->xfer_small >= 2) {
1103 /* if the buffer has been at least half full twice,
1104 * we receive faster than we send, so at least it
1105 * is not a "fast streamer".
1106 */
1107 b->flags &= ~BF_STREAMER_FAST;
1108 //fputc('-', stderr);
1109 }
1110 }
1111 else {
1112 b->xfer_small = 0;
1113 b->xfer_large = 0;
1114 }
1115
1116 b->flags |= BF_FULL;
1117 si->flags |= SI_FL_WAIT_ROOM;
1118 break;
1119 }
1120
1121 if ((b->flags & BF_READ_DONTWAIT) || --read_poll <= 0)
1122 break;
1123
1124 /* if too many bytes were missing from last read, it means that
1125 * it's pointless trying to read again because the system does
1126 * not have them in buffers.
1127 */
1128 if (ret < max) {
1129 if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
1130 (cur_read <= b->buf.size / 2)) {
1131 b->xfer_large = 0;
1132 b->xfer_small++;
1133 if (b->xfer_small >= 3) {
1134 /* we have read less than half of the buffer in
1135 * one pass, and this happened at least 3 times.
1136 * This is definitely not a streamer.
1137 */
1138 b->flags &= ~(BF_STREAMER | BF_STREAMER_FAST);
1139 //fputc('!', stderr);
1140 }
1141 }
1142
1143 /* if a streamer has read few data, it may be because we
1144 * have exhausted system buffers. It's not worth trying
1145 * again.
1146 */
1147 if (b->flags & BF_STREAMER)
1148 break;
1149
1150 /* if we read a large block smaller than what we requested,
1151 * it's almost certain we'll never get anything more.
1152 */
1153 if (ret >= global.tune.recv_enough)
1154 break;
1155 }
1156 } /* while !flags */
1157
Willy Tarreau96199b12012-08-24 00:46:52 +02001158 if (conn->flags & CO_FL_ERROR)
1159 goto out_error;
1160
1161 if (conn->flags & CO_FL_WAIT_ROOM) {
Willy Tarreau2c052082012-08-24 12:53:56 +02001162 si->flags |= SI_FL_WAIT_ROOM;
Willy Tarreau96199b12012-08-24 00:46:52 +02001163 }
1164 else if (conn->flags & CO_FL_WAIT_DATA) {
Willy Tarreauce323de2012-08-20 21:41:06 +02001165 /* we don't automatically ask for polling if we have
1166 * read enough data, as it saves some syscalls with
1167 * speculative pollers.
1168 */
1169 if (cur_read < MIN_RET_FOR_READ_LOOP)
1170 __conn_data_poll_recv(conn);
1171 else
1172 __conn_data_want_recv(conn);
1173 }
1174
Willy Tarreauce323de2012-08-20 21:41:06 +02001175 if (conn_data_read0_pending(conn))
1176 /* connection closed */
1177 goto out_shutdown_r;
1178
1179 return;
1180
1181 out_shutdown_r:
1182 /* we received a shutdown */
1183 b->flags |= BF_READ_NULL;
1184 if (b->flags & BF_AUTO_CLOSE)
1185 buffer_shutw_now(b);
1186 stream_sock_read0(si);
1187 conn_data_read0(conn);
1188 return;
1189
1190 out_error:
1191 /* Read error on the connection, report the error and stop I/O */
1192 conn->flags |= CO_FL_ERROR;
Willy Tarreauf16723e2012-08-24 12:52:22 +02001193 __conn_data_stop_both(conn);
Willy Tarreauce323de2012-08-20 21:41:06 +02001194}
1195
1196/*
Willy Tarreaueecf6ca2012-08-20 15:09:53 +02001197 * This is the callback which is called by the connection layer to send data
1198 * from the buffer to the connection. It iterates over the data layer's snd_buf
1199 * function.
1200 */
1201void si_conn_send_cb(struct connection *conn)
1202{
1203 struct stream_interface *si = container_of(conn, struct stream_interface, conn);
Willy Tarreau7421efb2012-07-02 15:11:27 +02001204 struct channel *b = si->ob;
Willy Tarreaueecf6ca2012-08-20 15:09:53 +02001205
1206 if (conn->flags & CO_FL_ERROR)
1207 goto out_error;
1208
1209 if (si->conn.flags & CO_FL_HANDSHAKE)
1210 /* a handshake was requested */
1211 return;
1212
1213 /* we might have been called just after an asynchronous shutw */
1214 if (b->flags & BF_SHUTW)
1215 return;
1216
1217 /* OK there are data waiting to be sent */
Willy Tarreau5368d802012-08-21 18:22:06 +02001218 if (si_conn_send_loop(conn) < 0)
Willy Tarreaueecf6ca2012-08-20 15:09:53 +02001219 goto out_error;
1220
1221 /* OK all done */
1222 return;
1223
1224 out_error:
1225 /* Write error on the connection, report the error and stop I/O */
1226 conn->flags |= CO_FL_ERROR;
Willy Tarreauf16723e2012-08-24 12:52:22 +02001227 __conn_data_stop_both(conn);
Willy Tarreaueecf6ca2012-08-20 15:09:53 +02001228}
1229
Willy Tarreau9bf9c142012-08-20 15:38:41 +02001230/*
1231 * This function propagates a null read received on a socket-based connection.
1232 * It updates the stream interface. If the stream interface has SI_FL_NOHALF,
1233 * the close is also forwarded to the write side as an abort. This function is
1234 * still socket-specific as it handles a setsockopt() call to set the SO_LINGER
1235 * state on the socket.
1236 */
1237void stream_sock_read0(struct stream_interface *si)
1238{
1239 si->ib->flags &= ~BF_SHUTR_NOW;
1240 if (si->ib->flags & BF_SHUTR)
1241 return;
1242 si->ib->flags |= BF_SHUTR;
1243 si->ib->rex = TICK_ETERNITY;
1244 si->flags &= ~SI_FL_WAIT_ROOM;
1245
1246 if (si->state != SI_ST_EST && si->state != SI_ST_CON)
1247 return;
1248
1249 if (si->ob->flags & BF_SHUTW)
1250 goto do_close;
1251
1252 if (si->flags & SI_FL_NOHALF) {
1253 /* we want to immediately forward this close to the write side */
1254 if (si->flags & SI_FL_NOLINGER) {
1255 si->flags &= ~SI_FL_NOLINGER;
1256 setsockopt(si_fd(si), SOL_SOCKET, SO_LINGER,
1257 (struct linger *) &nolinger, sizeof(struct linger));
1258 }
1259 /* force flag on ssl to keep session in cache */
1260 if (si->conn.data->shutw)
1261 si->conn.data->shutw(&si->conn, 0);
1262 goto do_close;
1263 }
1264
1265 /* otherwise that's just a normal read shutdown */
Willy Tarreauf16723e2012-08-24 12:52:22 +02001266 __conn_data_stop_recv(&si->conn);
Willy Tarreau9bf9c142012-08-20 15:38:41 +02001267 return;
1268
1269 do_close:
1270 conn_data_close(&si->conn);
1271 fd_delete(si_fd(si));
1272 si->state = SI_ST_DIS;
1273 si->exp = TICK_ETERNITY;
1274 if (si->release)
1275 si->release(si);
1276 return;
1277}
1278
Willy Tarreaudded32d2008-11-30 19:48:07 +01001279/*
Willy Tarreaucff64112008-11-03 06:26:53 +01001280 * Local variables:
1281 * c-indent-level: 8
1282 * c-basic-offset: 8
1283 * End:
1284 */