blob: 2cc1962d196b1f87a7adda5a20939c6b9c65d8a9 [file] [log] [blame]
Willy Tarreaucff64112008-11-03 06:26:53 +01001/*
2 * Functions managing stream_interface structures
3 *
Willy Tarreauf873d752012-05-11 17:47:17 +02004 * Copyright 2000-2012 Willy Tarreau <w@1wt.eu>
Willy Tarreaucff64112008-11-03 06:26:53 +01005 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12
13#include <errno.h>
14#include <fcntl.h>
15#include <stdio.h>
16#include <stdlib.h>
17
18#include <sys/socket.h>
19#include <sys/stat.h>
20#include <sys/types.h>
21
22#include <common/compat.h>
23#include <common/config.h>
24#include <common/debug.h>
25#include <common/standard.h>
26#include <common/ticks.h>
27#include <common/time.h>
28
29#include <proto/buffers.h>
Willy Tarreau8b117082012-08-06 15:06:49 +020030#include <proto/connection.h>
Willy Tarreaucff64112008-11-03 06:26:53 +010031#include <proto/fd.h>
Willy Tarreau2c6be842012-07-06 17:12:34 +020032#include <proto/frontend.h>
Willy Tarreau269358d2009-09-20 20:14:49 +020033#include <proto/stream_interface.h>
Willy Tarreaucff64112008-11-03 06:26:53 +010034#include <proto/task.h>
35
Willy Tarreaufd31e532012-07-23 18:24:25 +020036#include <types/pipe.h>
37
Willy Tarreauf873d752012-05-11 17:47:17 +020038/* socket functions used when running a stream interface as a task */
39static void stream_int_update(struct stream_interface *si);
40static void stream_int_update_embedded(struct stream_interface *si);
Willy Tarreauf873d752012-05-11 17:47:17 +020041static void stream_int_chk_rcv(struct stream_interface *si);
42static void stream_int_chk_snd(struct stream_interface *si);
43
Willy Tarreau5c979a92012-05-07 17:15:39 +020044/* socket operations for embedded tasks */
45struct sock_ops stream_int_embedded = {
46 .update = stream_int_update_embedded,
Willy Tarreau4a36b562012-08-06 19:31:45 +020047 .shutr = NULL,
48 .shutw = NULL,
Willy Tarreau5c979a92012-05-07 17:15:39 +020049 .chk_rcv = stream_int_chk_rcv,
50 .chk_snd = stream_int_chk_snd,
51 .read = NULL,
52 .write = NULL,
Willy Tarreau24208272012-05-21 17:28:50 +020053 .close = NULL,
Willy Tarreau5c979a92012-05-07 17:15:39 +020054};
55
56/* socket operations for external tasks */
57struct sock_ops stream_int_task = {
58 .update = stream_int_update,
Willy Tarreau4a36b562012-08-06 19:31:45 +020059 .shutr = NULL,
60 .shutw = NULL,
Willy Tarreau5c979a92012-05-07 17:15:39 +020061 .chk_rcv = stream_int_chk_rcv,
62 .chk_snd = stream_int_chk_snd,
63 .read = NULL,
64 .write = NULL,
Willy Tarreau24208272012-05-21 17:28:50 +020065 .close = NULL,
Willy Tarreau5c979a92012-05-07 17:15:39 +020066};
67
Willy Tarreaucff64112008-11-03 06:26:53 +010068/*
69 * This function only has to be called once after a wakeup event in case of
70 * suspected timeout. It controls the stream interface timeouts and sets
71 * si->flags accordingly. It does NOT close anything, as this timeout may
72 * be used for any purpose. It returns 1 if the timeout fired, otherwise
73 * zero.
74 */
75int stream_int_check_timeouts(struct stream_interface *si)
76{
77 if (tick_is_expired(si->exp, now_ms)) {
78 si->flags |= SI_FL_EXP;
79 return 1;
80 }
81 return 0;
82}
83
Willy Tarreaufe3718a2008-11-30 18:14:12 +010084/* to be called only when in SI_ST_DIS with SI_FL_ERR */
Willy Tarreaucff64112008-11-03 06:26:53 +010085void stream_int_report_error(struct stream_interface *si)
86{
87 if (!si->err_type)
88 si->err_type = SI_ET_DATA_ERR;
89
Willy Tarreaucff64112008-11-03 06:26:53 +010090 si->ob->flags |= BF_WRITE_ERROR;
Willy Tarreaucff64112008-11-03 06:26:53 +010091 si->ib->flags |= BF_READ_ERROR;
92}
93
94/*
Willy Tarreaudded32d2008-11-30 19:48:07 +010095 * Returns a message to the client ; the connection is shut down for read,
96 * and the request is cleared so that no server connection can be initiated.
97 * The buffer is marked for read shutdown on the other side to protect the
98 * message, and the buffer write is enabled. The message is contained in a
Willy Tarreau148d0992010-01-10 10:21:21 +010099 * "chunk". If it is null, then an empty message is used. The reply buffer does
100 * not need to be empty before this, and its contents will not be overwritten.
101 * The primary goal of this function is to return error messages to a client.
Willy Tarreaudded32d2008-11-30 19:48:07 +0100102 */
103void stream_int_retnclose(struct stream_interface *si, const struct chunk *msg)
104{
Willy Tarreau148d0992010-01-10 10:21:21 +0100105 buffer_auto_read(si->ib);
Willy Tarreaudded32d2008-11-30 19:48:07 +0100106 buffer_abort(si->ib);
Willy Tarreau148d0992010-01-10 10:21:21 +0100107 buffer_auto_close(si->ib);
108 buffer_erase(si->ib);
Willy Tarreau798e1282010-12-12 13:06:00 +0100109
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200110 bi_erase(si->ob);
Willy Tarreau148d0992010-01-10 10:21:21 +0100111 if (likely(msg && msg->len))
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200112 bo_inject(si->ob, msg->str, msg->len);
Willy Tarreaudded32d2008-11-30 19:48:07 +0100113
114 si->ob->wex = tick_add_ifset(now_ms, si->ob->wto);
Willy Tarreau148d0992010-01-10 10:21:21 +0100115 buffer_auto_read(si->ob);
Willy Tarreau520d95e2009-09-19 21:04:57 +0200116 buffer_auto_close(si->ob);
Willy Tarreau5d881d02009-12-27 22:51:06 +0100117 buffer_shutr_now(si->ob);
Willy Tarreau5d881d02009-12-27 22:51:06 +0100118}
119
Willy Tarreaufb90d942009-09-05 20:57:35 +0200120/* default update function for scheduled tasks, not used for embedded tasks */
Willy Tarreauf873d752012-05-11 17:47:17 +0200121static void stream_int_update(struct stream_interface *si)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200122{
123 DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
124 __FUNCTION__,
125 si, si->state, si->ib->flags, si->ob->flags);
126
127 if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
128 task_wakeup(si->owner, TASK_WOKEN_IO);
129}
130
131/* default update function for embedded tasks, to be used at the end of the i/o handler */
Willy Tarreauf873d752012-05-11 17:47:17 +0200132static void stream_int_update_embedded(struct stream_interface *si)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200133{
Willy Tarreau3488e252010-08-09 16:24:56 +0200134 int old_flags = si->flags;
135
Willy Tarreaufb90d942009-09-05 20:57:35 +0200136 DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
137 __FUNCTION__,
138 si, si->state, si->ib->flags, si->ob->flags);
139
140 if (si->state != SI_ST_EST)
141 return;
142
143 if ((si->ob->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == (BF_OUT_EMPTY|BF_SHUTW_NOW))
Willy Tarreau73b013b2012-05-21 16:31:45 +0200144 si_shutw(si);
Willy Tarreaufb90d942009-09-05 20:57:35 +0200145
146 if ((si->ob->flags & (BF_FULL|BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == 0)
147 si->flags |= SI_FL_WAIT_DATA;
148
Willy Tarreau96fd4b52009-10-04 17:18:35 +0200149 /* we're almost sure that we need some space if the buffer is not
150 * empty, even if it's not full, because the applets can't fill it.
151 */
Willy Tarreauf1ba4b32009-10-17 14:37:52 +0200152 if ((si->ib->flags & (BF_SHUTR|BF_OUT_EMPTY|BF_DONT_READ)) == 0)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200153 si->flags |= SI_FL_WAIT_ROOM;
154
Willy Tarreauf27b5ea2009-10-03 22:01:18 +0200155 if (si->ob->flags & BF_WRITE_ACTIVITY) {
Willy Tarreaufb90d942009-09-05 20:57:35 +0200156 if (tick_isset(si->ob->wex))
157 si->ob->wex = tick_add_ifset(now_ms, si->ob->wto);
158 }
159
Willy Tarreauf27b5ea2009-10-03 22:01:18 +0200160 if (si->ib->flags & BF_READ_ACTIVITY ||
161 (si->ob->flags & BF_WRITE_ACTIVITY && !(si->flags & SI_FL_INDEP_STR))) {
162 if (tick_isset(si->ib->rex))
163 si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
164 }
165
Willy Tarreau3488e252010-08-09 16:24:56 +0200166 /* save flags to detect changes */
167 old_flags = si->flags;
Willy Tarreauf1ba4b32009-10-17 14:37:52 +0200168 if (likely((si->ob->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL &&
Willy Tarreau96fd4b52009-10-04 17:18:35 +0200169 (si->ob->prod->flags & SI_FL_WAIT_ROOM)))
Willy Tarreau73b013b2012-05-21 16:31:45 +0200170 si_chk_rcv(si->ob->prod);
Willy Tarreaufb90d942009-09-05 20:57:35 +0200171
Willy Tarreau96fd4b52009-10-04 17:18:35 +0200172 if (((si->ib->flags & (BF_READ_PARTIAL|BF_OUT_EMPTY)) == BF_READ_PARTIAL) &&
Willy Tarreau3488e252010-08-09 16:24:56 +0200173 (si->ib->cons->flags & SI_FL_WAIT_DATA)) {
Willy Tarreau73b013b2012-05-21 16:31:45 +0200174 si_chk_snd(si->ib->cons);
Willy Tarreau3488e252010-08-09 16:24:56 +0200175 /* check if the consumer has freed some space */
176 if (!(si->ib->flags & BF_FULL))
177 si->flags &= ~SI_FL_WAIT_ROOM;
178 }
Willy Tarreaufb90d942009-09-05 20:57:35 +0200179
180 /* Note that we're trying to wake up in two conditions here :
181 * - special event, which needs the holder task attention
182 * - status indicating that the applet can go on working. This
183 * is rather hard because we might be blocking on output and
184 * don't want to wake up on input and vice-versa. The idea is
Willy Tarreau3488e252010-08-09 16:24:56 +0200185 * to only rely on the changes the chk_* might have performed.
Willy Tarreaufb90d942009-09-05 20:57:35 +0200186 */
187 if (/* check stream interface changes */
Willy Tarreau3488e252010-08-09 16:24:56 +0200188 ((old_flags & ~si->flags) & (SI_FL_WAIT_ROOM|SI_FL_WAIT_DATA)) ||
189
190 /* changes on the production side */
191 (si->ib->flags & (BF_READ_NULL|BF_READ_ERROR)) ||
192 si->state != SI_ST_EST ||
193 (si->flags & SI_FL_ERR) ||
194 ((si->ib->flags & BF_READ_PARTIAL) &&
195 (!si->ib->to_forward || si->ib->cons->state != SI_ST_EST)) ||
196
197 /* changes on the consumption side */
198 (si->ob->flags & (BF_WRITE_NULL|BF_WRITE_ERROR)) ||
199 ((si->ob->flags & BF_WRITE_ACTIVITY) &&
200 ((si->ob->flags & BF_SHUTW) ||
201 si->ob->prod->state != SI_ST_EST ||
202 ((si->ob->flags & BF_OUT_EMPTY) && !si->ob->to_forward)))) {
Willy Tarreaufb90d942009-09-05 20:57:35 +0200203 if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
204 task_wakeup(si->owner, TASK_WOKEN_IO);
205 }
Willy Tarreau3488e252010-08-09 16:24:56 +0200206 if (si->ib->flags & BF_READ_ACTIVITY)
207 si->ib->flags &= ~BF_READ_DONTWAIT;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200208}
209
Willy Tarreau4a36b562012-08-06 19:31:45 +0200210/*
211 * This function performs a shutdown-read on a stream interface in a connected
212 * or init state (it does nothing for other states). It either shuts the read
213 * side or marks itself as closed. The buffer flags are updated to reflect the
214 * new state. If the stream interface has SI_FL_NOHALF, we also forward the
215 * close to the write side. If a control layer is defined, then it is supposed
216 * to be a socket layer and file descriptors are then shutdown or closed
217 * accordingly. If no control layer is defined, then the SI is supposed to be
218 * an embedded one and the owner task is woken up if it exists. The function
219 * does not disable polling on the FD by itself, it returns non-zero instead
220 * if the caller needs to do so (except when the FD is deleted where this is
221 * implicit).
222 */
223int stream_int_shutr(struct stream_interface *si)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200224{
Willy Tarreau4a36b562012-08-06 19:31:45 +0200225 struct connection *conn = &si->conn;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200226
227 si->ib->flags &= ~BF_SHUTR_NOW;
228 if (si->ib->flags & BF_SHUTR)
Willy Tarreau4a36b562012-08-06 19:31:45 +0200229 return 0;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200230 si->ib->flags |= BF_SHUTR;
231 si->ib->rex = TICK_ETERNITY;
232 si->flags &= ~SI_FL_WAIT_ROOM;
233
234 if (si->state != SI_ST_EST && si->state != SI_ST_CON)
Willy Tarreau4a36b562012-08-06 19:31:45 +0200235 return 0;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200236
237 if (si->ob->flags & BF_SHUTW) {
Willy Tarreau4a36b562012-08-06 19:31:45 +0200238 conn_data_close(&si->conn);
239 if (conn->ctrl)
240 fd_delete(si_fd(si));
Willy Tarreaufb90d942009-09-05 20:57:35 +0200241 si->state = SI_ST_DIS;
242 si->exp = TICK_ETERNITY;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200243
Willy Tarreaud8ccffe2010-09-07 16:16:50 +0200244 if (si->release)
245 si->release(si);
246 }
Willy Tarreau4a36b562012-08-06 19:31:45 +0200247 else if (si->flags & SI_FL_NOHALF) {
248 /* we want to immediately forward this close to the write side */
249 return stream_int_shutw(si);
250 }
251 else if (conn->ctrl) {
252 /* we want the caller to disable polling on this FD */
253 return 1;
254 }
Willy Tarreau0bd05ea2010-07-02 11:18:03 +0200255
Willy Tarreau4a36b562012-08-06 19:31:45 +0200256 /* note that if the task exists, it must unregister itself once it runs */
257 if (!conn->ctrl && !(si->flags & SI_FL_DONT_WAKE) && si->owner)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200258 task_wakeup(si->owner, TASK_WOKEN_IO);
Willy Tarreau4a36b562012-08-06 19:31:45 +0200259 return 0;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200260}
261
Willy Tarreau4a36b562012-08-06 19:31:45 +0200262/*
263 * This function performs a shutdown-write on a stream interface in a connected or
264 * init state (it does nothing for other states). It either shuts the write side
265 * or marks itself as closed. The buffer flags are updated to reflect the new state.
266 * It does also close everything if the SI was marked as being in error state. If
267 * there is a data-layer shutdown, it is called. If a control layer is defined, then
268 * it is supposed to be a socket layer and file descriptors are then shutdown or
269 * closed accordingly. If no control layer is defined, then the SI is supposed to
270 * be an embedded one and the owner task is woken up if it exists. The function
271 * does not disable polling on the FD by itself, it returns non-zero instead if
272 * the caller needs to do so (except when the FD is deleted where this is implicit).
273 */
274int stream_int_shutw(struct stream_interface *si)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200275{
Willy Tarreau4a36b562012-08-06 19:31:45 +0200276 struct connection *conn = &si->conn;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200277
278 si->ob->flags &= ~BF_SHUTW_NOW;
279 if (si->ob->flags & BF_SHUTW)
Willy Tarreau4a36b562012-08-06 19:31:45 +0200280 return 0;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200281 si->ob->flags |= BF_SHUTW;
282 si->ob->wex = TICK_ETERNITY;
283 si->flags &= ~SI_FL_WAIT_DATA;
284
285 switch (si->state) {
286 case SI_ST_EST:
Willy Tarreau4a36b562012-08-06 19:31:45 +0200287 /* we have to shut before closing, otherwise some short messages
288 * may never leave the system, especially when there are remaining
289 * unread data in the socket input buffer, or when nolinger is set.
290 * However, if SI_FL_NOLINGER is explicitly set, we know there is
291 * no risk so we close both sides immediately.
292 */
293 if (si->flags & SI_FL_ERR) {
294 /* quick close, the socket is already shut. Remove pending flags. */
295 si->flags &= ~SI_FL_NOLINGER;
296 } else if (si->flags & SI_FL_NOLINGER) {
297 si->flags &= ~SI_FL_NOLINGER;
298 if (conn->ctrl) {
299 setsockopt(si_fd(si), SOL_SOCKET, SO_LINGER,
300 (struct linger *) &nolinger, sizeof(struct linger));
301 }
302 /* unclean data-layer shutdown */
303 if (conn->data && conn->data->shutw)
304 conn->data->shutw(conn, 0);
305 } else {
306 /* clean data-layer shutdown */
307 if (conn->data && conn->data->shutw)
308 conn->data->shutw(conn, 1);
309
310 if (!(si->flags & SI_FL_NOHALF)) {
311 /* We shutdown transport layer */
312 if (conn->ctrl)
313 shutdown(si_fd(si), SHUT_WR);
314
315 if (!(si->ib->flags & (BF_SHUTR|BF_DONT_READ))) {
316 /* OK just a shutw, but we want the caller
317 * to disable polling on this FD if exists.
318 */
319 return !!conn->ctrl;
320 }
321 }
322 }
Willy Tarreaufb90d942009-09-05 20:57:35 +0200323
324 /* fall through */
325 case SI_ST_CON:
Willy Tarreau4a36b562012-08-06 19:31:45 +0200326 /* we may have to close a pending connection, and mark the
327 * response buffer as shutr
328 */
329 conn_data_close(&si->conn);
330 if (conn->ctrl)
331 fd_delete(si_fd(si));
332 /* fall through */
Willy Tarreaufb90d942009-09-05 20:57:35 +0200333 case SI_ST_CER:
Willy Tarreau32d3ee92010-12-29 14:03:02 +0100334 case SI_ST_QUE:
335 case SI_ST_TAR:
Willy Tarreaufb90d942009-09-05 20:57:35 +0200336 si->state = SI_ST_DIS;
Willy Tarreaud8ccffe2010-09-07 16:16:50 +0200337
338 if (si->release)
339 si->release(si);
Willy Tarreaufb90d942009-09-05 20:57:35 +0200340 default:
341 si->flags &= ~SI_FL_WAIT_ROOM;
342 si->ib->flags |= BF_SHUTR;
343 si->ib->rex = TICK_ETERNITY;
344 si->exp = TICK_ETERNITY;
345 }
346
Willy Tarreau4a36b562012-08-06 19:31:45 +0200347 /* note that if the task exists, it must unregister itself once it runs */
348 if (!conn->ctrl && !(si->flags & SI_FL_DONT_WAKE) && si->owner)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200349 task_wakeup(si->owner, TASK_WOKEN_IO);
Willy Tarreau4a36b562012-08-06 19:31:45 +0200350 return 0;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200351}
352
353/* default chk_rcv function for scheduled tasks */
Willy Tarreauf873d752012-05-11 17:47:17 +0200354static void stream_int_chk_rcv(struct stream_interface *si)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200355{
Willy Tarreau7421efb2012-07-02 15:11:27 +0200356 struct channel *ib = si->ib;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200357
358 DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
359 __FUNCTION__,
360 si, si->state, si->ib->flags, si->ob->flags);
361
362 if (unlikely(si->state != SI_ST_EST || (ib->flags & BF_SHUTR)))
363 return;
364
Willy Tarreauf1ba4b32009-10-17 14:37:52 +0200365 if (ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) {
Willy Tarreaufb90d942009-09-05 20:57:35 +0200366 /* stop reading */
Willy Tarreauf1ba4b32009-10-17 14:37:52 +0200367 if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200368 si->flags |= SI_FL_WAIT_ROOM;
369 }
370 else {
371 /* (re)start reading */
372 si->flags &= ~SI_FL_WAIT_ROOM;
373 if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
374 task_wakeup(si->owner, TASK_WOKEN_IO);
375 }
376}
377
378/* default chk_snd function for scheduled tasks */
Willy Tarreauf873d752012-05-11 17:47:17 +0200379static void stream_int_chk_snd(struct stream_interface *si)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200380{
Willy Tarreau7421efb2012-07-02 15:11:27 +0200381 struct channel *ob = si->ob;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200382
383 DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
384 __FUNCTION__,
385 si, si->state, si->ib->flags, si->ob->flags);
386
387 if (unlikely(si->state != SI_ST_EST || (si->ob->flags & BF_SHUTW)))
388 return;
389
390 if (!(si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */
391 (ob->flags & BF_OUT_EMPTY)) /* called with nothing to send ! */
392 return;
393
394 /* Otherwise there are remaining data to be sent in the buffer,
395 * so we tell the handler.
396 */
397 si->flags &= ~SI_FL_WAIT_DATA;
398 if (!tick_isset(ob->wex))
399 ob->wex = tick_add_ifset(now_ms, ob->wto);
400
401 if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
402 task_wakeup(si->owner, TASK_WOKEN_IO);
403}
404
Willy Tarreaub24281b2011-02-13 13:16:36 +0100405/* Register an applet to handle a stream_interface as part of the stream
Willy Tarreaufb90d942009-09-05 20:57:35 +0200406 * interface's owner task, which is returned. The SI will wake it up everytime
Willy Tarreaub24281b2011-02-13 13:16:36 +0100407 * it is solicited. The task's processing function must call the applet's
Willy Tarreaufb90d942009-09-05 20:57:35 +0200408 * function before returning. It must be deleted by the task handler using
Willy Tarreaub24281b2011-02-13 13:16:36 +0100409 * stream_int_unregister_handler(), possibly from within the function itself.
Willy Tarreaufa6bac62012-05-31 14:16:59 +0200410 * It also pre-initializes applet.state to zero and the connection context
411 * to NULL.
Willy Tarreaufb90d942009-09-05 20:57:35 +0200412 */
Willy Tarreaub24281b2011-02-13 13:16:36 +0100413struct task *stream_int_register_handler(struct stream_interface *si, struct si_applet *app)
Willy Tarreaufb90d942009-09-05 20:57:35 +0200414{
Simon Horman7abd00d2011-08-13 08:03:51 +0900415 DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si->owner);
Willy Tarreaufb90d942009-09-05 20:57:35 +0200416
Willy Tarreau5c979a92012-05-07 17:15:39 +0200417 stream_interface_prepare(si, &stream_int_embedded);
Willy Tarreau73b013b2012-05-21 16:31:45 +0200418 si->conn.ctrl = NULL;
Willy Tarreau9e000c62011-03-10 14:03:36 +0100419 set_target_applet(&si->target, app);
Aman Gupta9a13e842012-04-02 18:57:53 -0700420 si->release = app->release;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200421 si->flags |= SI_FL_WAIT_DATA;
422 return si->owner;
423}
424
425/* Register a function to handle a stream_interface as a standalone task. The
426 * new task itself is returned and is assigned as si->owner. The stream_interface
427 * pointer will be pointed to by the task's context. The handler can be detached
428 * by using stream_int_unregister_handler().
Willy Tarreau7c0a1512011-03-10 11:17:02 +0100429 * FIXME: the code should be updated to ensure that we don't change si->owner
430 * anymore as this is not needed. However, process_session still relies on it.
Willy Tarreaufb90d942009-09-05 20:57:35 +0200431 */
432struct task *stream_int_register_handler_task(struct stream_interface *si,
433 struct task *(*fct)(struct task *))
434{
435 struct task *t;
436
437 DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", fct, si, si->owner);
438
Willy Tarreau5c979a92012-05-07 17:15:39 +0200439 stream_interface_prepare(si, &stream_int_task);
Willy Tarreau73b013b2012-05-21 16:31:45 +0200440 si->conn.ctrl = NULL;
Willy Tarreau9e000c62011-03-10 14:03:36 +0100441 clear_target(&si->target);
Willy Tarreau0bd05ea2010-07-02 11:18:03 +0200442 si->release = NULL;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200443 si->flags |= SI_FL_WAIT_DATA;
444
445 t = task_new();
446 si->owner = t;
447 if (!t)
448 return t;
Willy Tarreau7c0a1512011-03-10 11:17:02 +0100449
Willy Tarreau9e000c62011-03-10 14:03:36 +0100450 set_target_task(&si->target, t);
Willy Tarreau7c0a1512011-03-10 11:17:02 +0100451
Willy Tarreaufb90d942009-09-05 20:57:35 +0200452 t->process = fct;
453 t->context = si;
454 task_wakeup(si->owner, TASK_WOKEN_INIT);
455
456 return t;
457}
458
459/* Unregister a stream interface handler. This must be called by the handler task
460 * itself when it detects that it is in the SI_ST_DIS state. This function can
461 * both detach standalone handlers and embedded handlers.
462 */
463void stream_int_unregister_handler(struct stream_interface *si)
464{
Willy Tarreau7c0a1512011-03-10 11:17:02 +0100465 if (si->target.type == TARG_TYPE_TASK) {
Willy Tarreaufb90d942009-09-05 20:57:35 +0200466 /* external handler : kill the task */
Willy Tarreau7c0a1512011-03-10 11:17:02 +0100467 task_delete(si->target.ptr.t);
468 task_free(si->target.ptr.t);
Willy Tarreaufb90d942009-09-05 20:57:35 +0200469 }
Willy Tarreau0bd05ea2010-07-02 11:18:03 +0200470 si->release = NULL;
Willy Tarreaufb90d942009-09-05 20:57:35 +0200471 si->owner = NULL;
Willy Tarreau9e000c62011-03-10 14:03:36 +0100472 clear_target(&si->target);
Willy Tarreaufb90d942009-09-05 20:57:35 +0200473}
474
Willy Tarreau2c6be842012-07-06 17:12:34 +0200475/* This callback is used to send a valid PROXY protocol line to a socket being
Willy Tarreauafad0e02012-08-09 14:45:22 +0200476 * established. It returns 0 if it fails in a fatal way or needs to poll to go
477 * further, otherwise it returns non-zero and removes itself from the connection's
478 * flags (the bit is provided in <flag> by the caller).
Willy Tarreau2c6be842012-07-06 17:12:34 +0200479 */
480int conn_si_send_proxy(struct connection *conn, unsigned int flag)
481{
482 int fd = conn->t.sock.fd;
483 struct stream_interface *si = container_of(conn, struct stream_interface, conn);
Willy Tarreau7421efb2012-07-02 15:11:27 +0200484 struct channel *b = si->ob;
Willy Tarreau2c6be842012-07-06 17:12:34 +0200485
486 /* we might have been called just after an asynchronous shutw */
487 if (b->flags & BF_SHUTW)
488 goto out_error;
489
490 /* If we have a PROXY line to send, we'll use this to validate the
491 * connection, in which case the connection is validated only once
492 * we've sent the whole proxy line. Otherwise we use connect().
493 */
494 if (si->send_proxy_ofs) {
495 int ret;
496
497 /* The target server expects a PROXY line to be sent first.
498 * If the send_proxy_ofs is negative, it corresponds to the
499 * offset to start sending from then end of the proxy string
500 * (which is recomputed every time since it's constant). If
501 * it is positive, it means we have to send from the start.
502 */
503 ret = make_proxy_line(trash, trashlen, &b->prod->addr.from, &b->prod->addr.to);
504 if (!ret)
505 goto out_error;
506
507 if (si->send_proxy_ofs > 0)
508 si->send_proxy_ofs = -ret; /* first call */
509
510 /* we have to send trash from (ret+sp for -sp bytes) */
511 ret = send(fd, trash + ret + si->send_proxy_ofs, -si->send_proxy_ofs,
512 (b->flags & BF_OUT_EMPTY) ? 0 : MSG_MORE);
513
514 if (ret == 0)
515 goto out_wait;
516
517 if (ret < 0) {
518 if (errno == EAGAIN)
519 goto out_wait;
520 goto out_error;
521 }
522
523 si->send_proxy_ofs += ret; /* becomes zero once complete */
524 if (si->send_proxy_ofs != 0)
525 goto out_wait;
526
527 /* OK we've sent the whole line, we're connected */
528 }
529
530 /* The FD is ready now, simply return and let the connection handler
531 * notify upper layers if needed.
532 */
533 if (conn->flags & CO_FL_WAIT_L4_CONN)
534 conn->flags &= ~CO_FL_WAIT_L4_CONN;
535 b->flags |= BF_WRITE_NULL;
536 si->exp = TICK_ETERNITY;
Willy Tarreau2c6be842012-07-06 17:12:34 +0200537 conn->flags &= ~flag;
Willy Tarreauafad0e02012-08-09 14:45:22 +0200538 return 1;
Willy Tarreau2c6be842012-07-06 17:12:34 +0200539
540 out_error:
Willy Tarreauafad0e02012-08-09 14:45:22 +0200541 /* Write error on the file descriptor */
Willy Tarreau2c6be842012-07-06 17:12:34 +0200542 conn->flags |= CO_FL_ERROR;
Willy Tarreauafad0e02012-08-09 14:45:22 +0200543 conn->flags &= ~flag;
Willy Tarreau2c6be842012-07-06 17:12:34 +0200544 fdtab[fd].ev &= ~FD_POLL_STICKY;
Willy Tarreauf9dabec2012-08-17 17:33:53 +0200545 conn_sock_stop_both(conn);
Willy Tarreauafad0e02012-08-09 14:45:22 +0200546 return 0;
Willy Tarreau2c6be842012-07-06 17:12:34 +0200547
548 out_wait:
Willy Tarreauf9dabec2012-08-17 17:33:53 +0200549 conn_sock_stop_recv(conn);
550 conn_sock_poll_send(conn);
Willy Tarreauafad0e02012-08-09 14:45:22 +0200551 return 0;
Willy Tarreau2c6be842012-07-06 17:12:34 +0200552}
553
Willy Tarreau100c4672012-08-20 12:06:26 +0200554/* Callback to be used by connection I/O handlers upon completion. It differs from
555 * the function below in that it is designed to be called by lower layers after I/O
556 * events have been completed. It will also try to wake the associated task up if
557 * an important event requires special handling.
558 */
559void conn_notify_si(struct connection *conn)
Willy Tarreaufd31e532012-07-23 18:24:25 +0200560{
561 int fd = conn->t.sock.fd;
562 struct stream_interface *si = container_of(conn, struct stream_interface, conn);
563
564 DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n",
565 __FUNCTION__,
566 si, si->state, si->ib->flags, si->ob->flags);
567
Willy Tarreau3c55ec22012-07-23 19:19:51 +0200568 if (conn->flags & CO_FL_ERROR)
569 si->flags |= SI_FL_ERR;
570
Willy Tarreau8f8c92f2012-07-23 19:45:44 +0200571 /* check for recent connection establishment */
Willy Tarreauc76ae332012-07-12 15:32:13 +0200572 if (unlikely(!(conn->flags & (CO_FL_WAIT_L4_CONN | CO_FL_WAIT_L6_CONN | CO_FL_CONNECTED)))) {
Willy Tarreau8f8c92f2012-07-23 19:45:44 +0200573 si->exp = TICK_ETERNITY;
574 si->ob->flags |= BF_WRITE_NULL;
575 }
576
Willy Tarreaufd31e532012-07-23 18:24:25 +0200577 /* process consumer side, only once if possible */
578 if (fdtab[fd].ev & (FD_POLL_OUT | FD_POLL_ERR)) {
579 if (si->ob->flags & BF_OUT_EMPTY) {
580 if (((si->ob->flags & (BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == BF_SHUTW_NOW) &&
581 (si->state == SI_ST_EST))
Willy Tarreau4a36b562012-08-06 19:31:45 +0200582 stream_int_shutw(si);
Willy Tarreauf9dabec2012-08-17 17:33:53 +0200583 conn_data_stop_send(conn);
Willy Tarreaufd31e532012-07-23 18:24:25 +0200584 si->ob->wex = TICK_ETERNITY;
585 }
586
587 if ((si->ob->flags & (BF_FULL|BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == 0)
588 si->flags |= SI_FL_WAIT_DATA;
589
590 if (si->ob->flags & BF_WRITE_ACTIVITY) {
591 /* update timeouts if we have written something */
592 if ((si->ob->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_WRITE_PARTIAL)) == BF_WRITE_PARTIAL)
593 if (tick_isset(si->ob->wex))
594 si->ob->wex = tick_add_ifset(now_ms, si->ob->wto);
595
596 if (!(si->flags & SI_FL_INDEP_STR))
597 if (tick_isset(si->ib->rex))
598 si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
599
600 if (likely((si->ob->flags & (BF_SHUTW|BF_WRITE_PARTIAL|BF_FULL|BF_DONT_READ)) == BF_WRITE_PARTIAL &&
601 (si->ob->prod->flags & SI_FL_WAIT_ROOM)))
602 si_chk_rcv(si->ob->prod);
603 }
604 }
605
606 /* process producer side, only once if possible */
607 if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) {
608 /* We might have some data the consumer is waiting for.
609 * We can do fast-forwarding, but we avoid doing this for partial
610 * buffers, because it is very likely that it will be done again
611 * immediately afterwards once the following data is parsed (eg:
612 * HTTP chunking).
613 */
614 if (((si->ib->flags & (BF_READ_PARTIAL|BF_OUT_EMPTY)) == BF_READ_PARTIAL) &&
615 (si->ib->pipe /* always try to send spliced data */ ||
Willy Tarreau572bf902012-07-02 17:01:20 +0200616 (si->ib->buf.i == 0 && (si->ib->cons->flags & SI_FL_WAIT_DATA)))) {
Willy Tarreaufd31e532012-07-23 18:24:25 +0200617 int last_len = si->ib->pipe ? si->ib->pipe->data : 0;
618
619 si_chk_snd(si->ib->cons);
620
621 /* check if the consumer has freed some space */
622 if (!(si->ib->flags & BF_FULL) &&
623 (!last_len || !si->ib->pipe || si->ib->pipe->data < last_len))
624 si->flags &= ~SI_FL_WAIT_ROOM;
625 }
626
627 if (si->flags & SI_FL_WAIT_ROOM) {
Willy Tarreauf9dabec2012-08-17 17:33:53 +0200628 conn_data_stop_recv(conn);
Willy Tarreaufd31e532012-07-23 18:24:25 +0200629 si->ib->rex = TICK_ETERNITY;
630 }
631 else if ((si->ib->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) {
632 if (tick_isset(si->ib->rex))
633 si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
634 }
635 }
636
637 /* wake the task up only when needed */
638 if (/* changes on the production side */
639 (si->ib->flags & (BF_READ_NULL|BF_READ_ERROR)) ||
640 si->state != SI_ST_EST ||
641 (si->flags & SI_FL_ERR) ||
642 ((si->ib->flags & BF_READ_PARTIAL) &&
643 (!si->ib->to_forward || si->ib->cons->state != SI_ST_EST)) ||
644
645 /* changes on the consumption side */
646 (si->ob->flags & (BF_WRITE_NULL|BF_WRITE_ERROR)) ||
647 ((si->ob->flags & BF_WRITE_ACTIVITY) &&
648 ((si->ob->flags & BF_SHUTW) ||
649 si->ob->prod->state != SI_ST_EST ||
650 ((si->ob->flags & BF_OUT_EMPTY) && !si->ob->to_forward)))) {
651 task_wakeup(si->owner, TASK_WOKEN_IO);
652 }
653 if (si->ib->flags & BF_READ_ACTIVITY)
654 si->ib->flags &= ~BF_READ_DONTWAIT;
655}
Willy Tarreau2c6be842012-07-06 17:12:34 +0200656
Willy Tarreau100c4672012-08-20 12:06:26 +0200657/* Updates the timers and flags of a stream interface attached to a connection,
658 * depending on the buffers' flags. It should only be called once after the
659 * buffer flags have settled down, and before they are cleared. It doesn't
660 * harm to call it as often as desired (it just slightly hurts performance).
661 * It is only meant to be called by upper layers after buffer flags have been
662 * manipulated by analysers.
663 */
664void stream_int_update_conn(struct stream_interface *si)
665{
Willy Tarreau7421efb2012-07-02 15:11:27 +0200666 struct channel *ib = si->ib;
667 struct channel *ob = si->ob;
Willy Tarreau100c4672012-08-20 12:06:26 +0200668
669 if (si->conn.flags & CO_FL_HANDSHAKE) {
670 /* a handshake is in progress */
671 si->flags &= ~SI_FL_WAIT_DATA;
672 return;
673 }
674
675 /* Check if we need to close the read side */
676 if (!(ib->flags & BF_SHUTR)) {
677 /* Read not closed, update FD status and timeout for reads */
678 if (ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) {
679 /* stop reading */
680 if (!(si->flags & SI_FL_WAIT_ROOM)) {
681 if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL)
682 si->flags |= SI_FL_WAIT_ROOM;
683 conn_data_stop_recv(&si->conn);
684 ib->rex = TICK_ETERNITY;
685 }
686 }
687 else {
688 /* (re)start reading and update timeout. Note: we don't recompute the timeout
689 * everytime we get here, otherwise it would risk never to expire. We only
690 * update it if is was not yet set. The stream socket handler will already
691 * have updated it if there has been a completed I/O.
692 */
693 si->flags &= ~SI_FL_WAIT_ROOM;
694 conn_data_want_recv(&si->conn);
695 if (!(ib->flags & (BF_READ_NOEXP|BF_DONT_READ)) && !tick_isset(ib->rex))
696 ib->rex = tick_add_ifset(now_ms, ib->rto);
697 }
698 }
699
700 /* Check if we need to close the write side */
701 if (!(ob->flags & BF_SHUTW)) {
702 /* Write not closed, update FD status and timeout for writes */
703 if (ob->flags & BF_OUT_EMPTY) {
704 /* stop writing */
705 if (!(si->flags & SI_FL_WAIT_DATA)) {
706 if ((ob->flags & (BF_FULL|BF_HIJACK|BF_SHUTW_NOW)) == 0)
707 si->flags |= SI_FL_WAIT_DATA;
708 conn_data_stop_send(&si->conn);
709 ob->wex = TICK_ETERNITY;
710 }
711 }
712 else {
713 /* (re)start writing and update timeout. Note: we don't recompute the timeout
714 * everytime we get here, otherwise it would risk never to expire. We only
715 * update it if is was not yet set. The stream socket handler will already
716 * have updated it if there has been a completed I/O.
717 */
718 si->flags &= ~SI_FL_WAIT_DATA;
719 conn_data_want_send(&si->conn);
720 if (!tick_isset(ob->wex)) {
721 ob->wex = tick_add_ifset(now_ms, ob->wto);
722 if (tick_isset(ib->rex) && !(si->flags & SI_FL_INDEP_STR)) {
723 /* Note: depending on the protocol, we don't know if we're waiting
724 * for incoming data or not. So in order to prevent the socket from
725 * expiring read timeouts during writes, we refresh the read timeout,
726 * except if it was already infinite or if we have explicitly setup
727 * independent streams.
728 */
729 ib->rex = tick_add_ifset(now_ms, ib->rto);
730 }
731 }
732 }
733 }
734}
735
Willy Tarreau46a8d922012-08-20 12:38:36 +0200736/* This function is used for inter-stream-interface calls. It is called by the
737 * consumer to inform the producer side that it may be interested in checking
738 * for free space in the buffer. Note that it intentionally does not update
739 * timeouts, so that we can still check them later at wake-up. This function is
740 * dedicated to connection-based stream interfaces.
741 */
742void stream_int_chk_rcv_conn(struct stream_interface *si)
743{
Willy Tarreau7421efb2012-07-02 15:11:27 +0200744 struct channel *ib = si->ib;
Willy Tarreau46a8d922012-08-20 12:38:36 +0200745
746 if (unlikely(si->state != SI_ST_EST || (ib->flags & BF_SHUTR)))
747 return;
748
749 if (si->conn.flags & CO_FL_HANDSHAKE) {
750 /* a handshake is in progress */
751 return;
752 }
753
754 if (ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) {
755 /* stop reading */
756 if ((ib->flags & (BF_FULL|BF_HIJACK|BF_DONT_READ)) == BF_FULL)
757 si->flags |= SI_FL_WAIT_ROOM;
758 conn_data_stop_recv(&si->conn);
759 }
760 else {
761 /* (re)start reading */
762 si->flags &= ~SI_FL_WAIT_ROOM;
763 conn_data_want_recv(&si->conn);
764 }
765}
766
767
/* This function is used for inter-stream-interface calls. It is called by the
 * producer to inform the consumer side that it may be interested in checking
 * for data in the buffer.
 *
 * It does nothing when the interface is not established or when the output
 * channel is already shut for writes. During a handshake it only clears
 * SI_FL_WAIT_DATA. It also returns early when there is nothing to send. When
 * no pipe is attached, it additionally returns early if the interface was not
 * waiting for data or if the fd already reports FD_POLL_OUT.
 *
 * Otherwise it attempts an immediate send via conn_data_snd_buf(), then
 * updates SI_FL_WAIT_DATA, the send polling state and the write timeout.
 * Unless SI_FL_INDEP_STR is set, it also refreshes the read timeout after
 * write activity. Finally, the owner task is woken up on special conditions
 * (error, shutdown, end of data).
 */
void stream_int_chk_snd_conn(struct stream_interface *si)
{
	struct channel *ob = si->ob;

	if (unlikely(si->state != SI_ST_EST || (ob->flags & BF_SHUTW)))
		return;

	/* a handshake is in progress on the connection: don't send yet */
	if (si->conn.flags & CO_FL_HANDSHAKE) {
		/* a handshake is in progress */
		si->flags &= ~SI_FL_WAIT_DATA;
		return;
	}

	if (unlikely((ob->flags & BF_OUT_EMPTY))) /* called with nothing to send ! */
		return;

	if (!ob->pipe &&                          /* spliced data wants to be forwarded ASAP */
	    (!(si->flags & SI_FL_WAIT_DATA) ||    /* not waiting for data */
	     (fdtab[si_fd(si)].ev & FD_POLL_OUT)))   /* we'll be called anyway */
		return;

	if (conn_data_snd_buf(&si->conn) < 0) {
		/* Write error on the file descriptor. We mark the FD as STERROR so
		 * that we don't use it anymore and we notify the task.
		 */
		fdtab[si_fd(si)].ev &= ~FD_POLL_STICKY;
		conn_data_stop_both(&si->conn);
		si->flags |= SI_FL_ERR;
		si->conn.flags |= CO_FL_ERROR;
		goto out_wakeup;
	}

	/* OK, so now we know that some data might have been sent, and that we may
	 * have to poll first. We have to do that too if the buffer is not empty.
	 */
	if (ob->flags & BF_OUT_EMPTY) {
		/* the connection is established but we can't write. Either the
		 * buffer is empty, or we just refrain from sending because the
		 * ->o limit was reached. Maybe we just wrote the last
		 * chunk and need to close.
		 */
		if (((ob->flags & (BF_SHUTW|BF_HIJACK|BF_AUTO_CLOSE|BF_SHUTW_NOW)) ==
		     (BF_AUTO_CLOSE|BF_SHUTW_NOW)) &&
		    (si->state == SI_ST_EST)) {
			si_shutw(si);
			goto out_wakeup;
		}

		/* really waiting for data only if none of these is set */
		if ((ob->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_FULL|BF_HIJACK)) == 0)
			si->flags |= SI_FL_WAIT_DATA;
		ob->wex = TICK_ETERNITY;
	}
	else {
		/* Otherwise there are remaining data to be sent in the buffer,
		 * which means we have to poll before doing so.
		 */
		conn_data_want_send(&si->conn);
		si->flags &= ~SI_FL_WAIT_DATA;
		if (!tick_isset(ob->wex))
			ob->wex = tick_add_ifset(now_ms, ob->wto);
	}

	if (likely(ob->flags & BF_WRITE_ACTIVITY)) {
		/* update timeout if we have written something */
		if ((ob->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_WRITE_PARTIAL)) == BF_WRITE_PARTIAL)
			ob->wex = tick_add_ifset(now_ms, ob->wto);

		if (tick_isset(si->ib->rex) && !(si->flags & SI_FL_INDEP_STR)) {
			/* Note: to prevent the client from expiring read timeouts
			 * during writes, we refresh it. We only do this if the
			 * interface is not configured for "independent streams",
			 * because for some applications it's better not to do this,
			 * for instance when continuously exchanging small amounts
			 * of data which can fill the socket buffers long before a
			 * write timeout is detected.
			 */
			si->ib->rex = tick_add_ifset(now_ms, si->ib->rto);
		}
	}

	/* in case of special condition (error, shutdown, end of write...), we
	 * have to notify the task. The error and auto-close paths above jump
	 * directly to out_wakeup, bypassing this condition.
	 */
	if (likely((ob->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) ||
	          ((ob->flags & BF_OUT_EMPTY) && !ob->to_forward) ||
	          si->state != SI_ST_EST)) {
	out_wakeup:
		/* the wakeup is skipped with SI_FL_DONT_WAKE or without an owner */
		if (!(si->flags & SI_FL_DONT_WAKE) && si->owner)
			task_wakeup(si->owner, TASK_WOKEN_IO);
	}
}
865
/*
 * This is the callback which is called by the connection layer to receive data
 * into the buffer from the connection. It iterates over the data layer's rcv_buf
 * function.
 *
 * The loop stops on any of these conditions:
 *  - an error, a shutdown or a wait condition is reported in conn->flags;
 *  - the buffer has no more room (BF_FULL and SI_FL_WAIT_ROOM are then set);
 *  - rcv_buf() returns <= 0;
 *  - BF_READ_DONTWAIT is set;
 *  - MAX_READ_POLL_LOOPS iterations have been performed;
 *  - a short read suggests that no more data is immediately available.
 * Along the way it maintains the BF_STREAMER / BF_STREAMER_FAST heuristics.
 * It also forwards newly read bytes up to b->to_forward.
 */
void si_conn_recv_cb(struct connection *conn)
{
	struct stream_interface *si = container_of(conn, struct stream_interface, conn);
	struct channel *b = si->ib;
	int ret, max, cur_read;
	int read_poll = MAX_READ_POLL_LOOPS;  /* upper bound on loop iterations */

	/* stop immediately on errors. Note that we DON'T want to stop on
	 * POLL_ERR, as the poller might report a write error while there
	 * are still data available in the recv buffer. This typically
	 * happens when we send too large a request to a backend server
	 * which rejects it before reading it all.
	 */
	if (conn->flags & CO_FL_ERROR)
		goto out_error;

	/* stop here if we reached the end of data */
	if (conn_data_read0_pending(conn))
		goto out_shutdown_r;

	/* maybe we were called immediately after an asynchronous shutr */
	if (b->flags & BF_SHUTR)
		return;

#if 0 && defined(CONFIG_HAP_LINUX_SPLICE)
	if (b->to_forward >= MIN_SPLICE_FORWARD && b->flags & BF_KERN_SPLICING) {

		/* Under Linux, if FD_POLL_HUP is set, we have reached the end.
		 * Since older splice() implementations were buggy and returned
		 * EAGAIN on end of read, let's bypass the call to splice() now.
		 */
		if (fdtab[conn->t.sock.fd].ev & FD_POLL_HUP)
			goto out_shutdown_r;

		if (sock_raw_splice_in(b, si) >= 0) {
			if (si->flags & SI_FL_ERR)
				goto out_error;
			if (b->flags & BF_READ_NULL)
				goto out_shutdown_r;
			return;
		}
		/* splice not possible (anymore), let's go on on standard copy */
	}
#endif
	/* cur_read accumulates the bytes read during this call */
	cur_read = 0;
	conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
	while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
		max = bi_avail(b);

		/* no room left in the buffer: report it and stop reading */
		if (!max) {
			b->flags |= BF_FULL;
			si->flags |= SI_FL_WAIT_ROOM;
			break;
		}

		ret = conn->data->rcv_buf(conn, &b->buf, max);
		if (ret <= 0)
			break;

		cur_read += ret;

		/* if we're allowed to directly forward data, we must update ->o */
		if (b->to_forward && !(b->flags & (BF_SHUTW|BF_SHUTW_NOW))) {
			unsigned long fwd = ret;
			if (b->to_forward != BUF_INFINITE_FORWARD) {
				if (fwd > b->to_forward)
					fwd = b->to_forward;
				b->to_forward -= fwd;
			}
			b_adv(b, fwd);
		}

		/* receiving data clears any pending CO_FL_WAIT_L4_CONN flag */
		if (conn->flags & CO_FL_WAIT_L4_CONN)
			conn->flags &= ~CO_FL_WAIT_L4_CONN;

		b->flags |= BF_READ_PARTIAL;
		b->total += ret;

		if (bi_full(b)) {
			/* The buffer is now full, there's no point in going through
			 * the loop again.
			 */
			if (!(b->flags & BF_STREAMER_FAST) && (cur_read == buffer_len(&b->buf))) {
				b->xfer_small = 0;
				b->xfer_large++;
				if (b->xfer_large >= 3) {
					/* we call this buffer a fast streamer if it manages
					 * to be filled in one call 3 consecutive times.
					 */
					b->flags |= (BF_STREAMER | BF_STREAMER_FAST);
					//fputc('+', stderr);
				}
			}
			else if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
				 (cur_read <= b->buf.size / 2)) {
				b->xfer_large = 0;
				b->xfer_small++;
				if (b->xfer_small >= 2) {
					/* if the buffer has been at least half full twice,
					 * we receive faster than we send, so at least it
					 * is not a "fast streamer".
					 */
					b->flags &= ~BF_STREAMER_FAST;
					//fputc('-', stderr);
				}
			}
			else {
				b->xfer_small = 0;
				b->xfer_large = 0;
			}

			b->flags |= BF_FULL;
			si->flags |= SI_FL_WAIT_ROOM;
			break;
		}

		/* caller asked not to insist, or the iteration budget is spent */
		if ((b->flags & BF_READ_DONTWAIT) || --read_poll <= 0)
			break;

		/* if too many bytes were missing from last read, it means that
		 * it's pointless trying to read again because the system does
		 * not have them in buffers.
		 */
		if (ret < max) {
			if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
			    (cur_read <= b->buf.size / 2)) {
				b->xfer_large = 0;
				b->xfer_small++;
				if (b->xfer_small >= 3) {
					/* we have read less than half of the buffer in
					 * one pass, and this happened at least 3 times.
					 * This is definitely not a streamer.
					 */
					b->flags &= ~(BF_STREAMER | BF_STREAMER_FAST);
					//fputc('!', stderr);
				}
			}

			/* if a streamer has read few data, it may be because we
			 * have exhausted system buffers. It's not worth trying
			 * again.
			 */
			if (b->flags & BF_STREAMER)
				break;

			/* if we read a large block smaller than what we requested,
			 * it's almost certain we'll never get anything more.
			 */
			if (ret >= global.tune.recv_enough)
				break;
		}
	} /* while !flags */

	if (conn->flags & CO_FL_WAIT_DATA) {
		/* we don't automatically ask for polling if we have
		 * read enough data, as it saves some syscalls with
		 * speculative pollers.
		 */
		if (cur_read < MIN_RET_FOR_READ_LOOP)
			__conn_data_poll_recv(conn);
		else
			__conn_data_want_recv(conn);
	}

	if (conn->flags & CO_FL_ERROR)
		goto out_error;

	if (conn_data_read0_pending(conn))
		/* connection closed */
		goto out_shutdown_r;

	return;

 out_shutdown_r:
	/* we received a shutdown: flag it on the channel, schedule the write
	 * side shutdown if auto-close is enabled, then propagate it to the
	 * stream interface and the connection.
	 */
	b->flags |= BF_READ_NULL;
	if (b->flags & BF_AUTO_CLOSE)
		buffer_shutw_now(b);
	stream_sock_read0(si);
	conn_data_read0(conn);
	return;

 out_error:
	/* Read error on the connection, report the error and stop I/O */
	conn->flags |= CO_FL_ERROR;
	conn_data_stop_both(conn);
}
1058
1059/*
Willy Tarreaueecf6ca2012-08-20 15:09:53 +02001060 * This is the callback which is called by the connection layer to send data
1061 * from the buffer to the connection. It iterates over the data layer's snd_buf
1062 * function.
1063 */
1064void si_conn_send_cb(struct connection *conn)
1065{
1066 struct stream_interface *si = container_of(conn, struct stream_interface, conn);
Willy Tarreau7421efb2012-07-02 15:11:27 +02001067 struct channel *b = si->ob;
Willy Tarreaueecf6ca2012-08-20 15:09:53 +02001068
1069 if (conn->flags & CO_FL_ERROR)
1070 goto out_error;
1071
1072 if (si->conn.flags & CO_FL_HANDSHAKE)
1073 /* a handshake was requested */
1074 return;
1075
1076 /* we might have been called just after an asynchronous shutw */
1077 if (b->flags & BF_SHUTW)
1078 return;
1079
1080 /* OK there are data waiting to be sent */
1081 if (conn_data_snd_buf(conn) < 0)
1082 goto out_error;
1083
1084 /* OK all done */
1085 return;
1086
1087 out_error:
1088 /* Write error on the connection, report the error and stop I/O */
1089 conn->flags |= CO_FL_ERROR;
1090 conn_data_stop_both(conn);
1091}
1092
Willy Tarreau9bf9c142012-08-20 15:38:41 +02001093/*
1094 * This function propagates a null read received on a socket-based connection.
1095 * It updates the stream interface. If the stream interface has SI_FL_NOHALF,
1096 * the close is also forwarded to the write side as an abort. This function is
1097 * still socket-specific as it handles a setsockopt() call to set the SO_LINGER
1098 * state on the socket.
1099 */
1100void stream_sock_read0(struct stream_interface *si)
1101{
1102 si->ib->flags &= ~BF_SHUTR_NOW;
1103 if (si->ib->flags & BF_SHUTR)
1104 return;
1105 si->ib->flags |= BF_SHUTR;
1106 si->ib->rex = TICK_ETERNITY;
1107 si->flags &= ~SI_FL_WAIT_ROOM;
1108
1109 if (si->state != SI_ST_EST && si->state != SI_ST_CON)
1110 return;
1111
1112 if (si->ob->flags & BF_SHUTW)
1113 goto do_close;
1114
1115 if (si->flags & SI_FL_NOHALF) {
1116 /* we want to immediately forward this close to the write side */
1117 if (si->flags & SI_FL_NOLINGER) {
1118 si->flags &= ~SI_FL_NOLINGER;
1119 setsockopt(si_fd(si), SOL_SOCKET, SO_LINGER,
1120 (struct linger *) &nolinger, sizeof(struct linger));
1121 }
1122 /* force flag on ssl to keep session in cache */
1123 if (si->conn.data->shutw)
1124 si->conn.data->shutw(&si->conn, 0);
1125 goto do_close;
1126 }
1127
1128 /* otherwise that's just a normal read shutdown */
1129 conn_data_stop_recv(&si->conn);
1130 return;
1131
1132 do_close:
1133 conn_data_close(&si->conn);
1134 fd_delete(si_fd(si));
1135 si->state = SI_ST_DIS;
1136 si->exp = TICK_ETERNITY;
1137 if (si->release)
1138 si->release(si);
1139 return;
1140}
1141
Willy Tarreaude5722c2012-08-20 15:01:10 +02001142
Willy Tarreaudded32d2008-11-30 19:48:07 +01001143/*
Willy Tarreaucff64112008-11-03 06:26:53 +01001144 * Local variables:
1145 * c-indent-level: 8
1146 * c-basic-offset: 8
1147 * End:
1148 */