blob: e23d1d4f107095fbd684861440928d3298597888 [file] [log] [blame]
Willy Tarreau92fb9832007-10-16 17:34:28 +02001/*
2 * UNIX SOCK_STREAM protocol layer (uxst)
3 *
4 * Copyright 2000-2007 Willy Tarreau <w@1wt.eu>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12
13#include <ctype.h>
14#include <errno.h>
15#include <fcntl.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <syslog.h>
20#include <time.h>
21
22#include <sys/param.h>
23#include <sys/socket.h>
24#include <sys/stat.h>
25#include <sys/types.h>
26#include <sys/un.h>
27
28#include <common/compat.h>
29#include <common/config.h>
30#include <common/debug.h>
31#include <common/memory.h>
32#include <common/mini-clist.h>
33#include <common/standard.h>
34#include <common/time.h>
35#include <common/version.h>
36
37#include <types/acl.h>
38#include <types/capture.h>
39#include <types/client.h>
40#include <types/global.h>
41#include <types/polling.h>
42#include <types/proxy.h>
43#include <types/server.h>
44
45#include <proto/acl.h>
46#include <proto/backend.h>
47#include <proto/buffers.h>
48#include <proto/fd.h>
49#include <proto/log.h>
50#include <proto/protocols.h>
51#include <proto/proto_uxst.h>
52#include <proto/queue.h>
53#include <proto/session.h>
54#include <proto/stream_sock.h>
55#include <proto/task.h>
56
57#ifndef MAXPATHLEN
58#define MAXPATHLEN 128
59#endif
60
61/* This function creates a named PF_UNIX stream socket at address <path>. Note
Willy Tarreaue6ad2b12007-10-18 12:45:54 +020062 * that the path cannot be NULL nor empty. <uid> and <gid> different of -1 will
63 * be used to change the socket owner. If <mode> is not 0, it will be used to
64 * restrict access to the socket. While it is known not to be portable on every
65 * OS, it's still useful where it works.
Willy Tarreau92fb9832007-10-16 17:34:28 +020066 * It returns the assigned file descriptor, or -1 in the event of an error.
67 */
Willy Tarreaue6ad2b12007-10-18 12:45:54 +020068static int create_uxst_socket(const char *path, uid_t uid, gid_t gid, mode_t mode)
Willy Tarreau92fb9832007-10-16 17:34:28 +020069{
70 char tempname[MAXPATHLEN];
71 char backname[MAXPATHLEN];
72 struct sockaddr_un addr;
73
74 int ret, sock;
75
76 /* 1. create socket names */
77 if (!path[0]) {
78 Alert("Invalid name for a UNIX socket. Aborting.\n");
79 goto err_return;
80 }
81
82 ret = snprintf(tempname, MAXPATHLEN, "%s.%d.tmp", path, pid);
83 if (ret < 0 || ret >= MAXPATHLEN) {
84 Alert("name too long for UNIX socket. Aborting.\n");
85 goto err_return;
86 }
87
88 ret = snprintf(backname, MAXPATHLEN, "%s.%d.bak", path, pid);
89 if (ret < 0 || ret >= MAXPATHLEN) {
90 Alert("name too long for UNIX socket. Aborting.\n");
91 goto err_return;
92 }
93
94 /* 2. clean existing orphaned entries */
95 if (unlink(tempname) < 0 && errno != ENOENT) {
96 Alert("error when trying to unlink previous UNIX socket. Aborting.\n");
97 goto err_return;
98 }
99
100 if (unlink(backname) < 0 && errno != ENOENT) {
101 Alert("error when trying to unlink previous UNIX socket. Aborting.\n");
102 goto err_return;
103 }
104
105 /* 3. backup existing socket */
106 if (link(path, backname) < 0 && errno != ENOENT) {
107 Alert("error when trying to preserve previous UNIX socket. Aborting.\n");
108 goto err_return;
109 }
110
111 /* 4. prepare new socket */
112 addr.sun_family = AF_UNIX;
113 strncpy(addr.sun_path, tempname, sizeof(addr.sun_path));
114 addr.sun_path[sizeof(addr.sun_path) - 1] = 0;
115
116 sock = socket(PF_UNIX, SOCK_STREAM, 0);
117 if (sock < 0) {
118 Alert("cannot create socket for UNIX listener. Aborting.\n");
119 goto err_unlink_back;
120 }
121
122 if (sock >= global.maxsock) {
123 Alert("socket(): not enough free sockets for UNIX listener. Raise -n argument. Aborting.\n");
124 goto err_unlink_temp;
125 }
126
127 if (fcntl(sock, F_SETFL, O_NONBLOCK) == -1) {
128 Alert("cannot make UNIX socket non-blocking. Aborting.\n");
129 goto err_unlink_temp;
130 }
131
132 if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
133 /* note that bind() creates the socket <tempname> on the file system */
134 Alert("cannot bind socket for UNIX listener. Aborting.\n");
135 goto err_unlink_temp;
136 }
137
Willy Tarreaue6ad2b12007-10-18 12:45:54 +0200138 if (((uid != -1 || gid != -1) && (chown(tempname, uid, gid) == -1)) ||
139 (mode != 0 && chmod(tempname, mode) == -1)) {
140 Alert("cannot change UNIX socket ownership. Aborting.\n");
141 goto err_unlink_temp;
142 }
143
Willy Tarreau92fb9832007-10-16 17:34:28 +0200144 if (listen(sock, 0) < 0) {
145 Alert("cannot listen to socket for UNIX listener. Aborting.\n");
146 goto err_unlink_temp;
147 }
148
149 /* 5. install.
150 * Point of no return: we are ready, we'll switch the sockets. We don't
151 * fear loosing the socket <path> because we have a copy of it in
152 * backname.
153 */
154 if (rename(tempname, path) < 0) {
155 Alert("cannot switch final and temporary sockets for UNIX listener. Aborting.\n");
156 goto err_rename;
157 }
158
159 /* 6. cleanup */
160 unlink(backname); /* no need to keep this one either */
161
162 return sock;
163
164 err_rename:
165 ret = rename(backname, path);
166 if (ret < 0 && errno == ENOENT)
167 unlink(path);
168 err_unlink_temp:
169 unlink(tempname);
170 close(sock);
171 err_unlink_back:
172 unlink(backname);
173 err_return:
174 return -1;
175}
176
177/* Tries to destroy the UNIX stream socket <path>. The socket must not be used
178 * anymore. It practises best effort, and no error is returned.
179 */
180static void destroy_uxst_socket(const char *path)
181{
182 struct sockaddr_un addr;
183 int sock, ret;
184
185 /* We might have been chrooted, so we may not be able to access the
186 * socket. In order to avoid bothering the other end, we connect with a
187 * wrong protocol, namely SOCK_DGRAM. The return code from connect()
188 * is enough to know if the socket is still live or not. If it's live
189 * in mode SOCK_STREAM, we get EPROTOTYPE or anything else but not
190 * ECONNREFUSED. In this case, we do not touch it because it's used
191 * by some other process.
192 */
193 sock = socket(PF_UNIX, SOCK_DGRAM, 0);
194 if (sock < 0)
195 return;
196
197 addr.sun_family = AF_UNIX;
198 strncpy(addr.sun_path, path, sizeof(addr.sun_path));
199 addr.sun_path[sizeof(addr.sun_path)] = 0;
200 ret = connect(sock, (struct sockaddr *)&addr, sizeof(addr));
201 if (ret < 0 && errno == ECONNREFUSED) {
202 /* Connect failed: the socket still exists but is not used
203 * anymore. Let's remove this socket now.
204 */
205 unlink(path);
206 }
207 close(sock);
208}
209
210
211/* This function creates all UNIX sockets bound to the protocol entry <proto>.
212 * It is intended to be used as the protocol's bind_all() function.
213 * The sockets will be registered but not added to any fd_set, in order not to
214 * loose them across the fork(). A call to uxst_enable_listeners() is needed
215 * to complete initialization.
216 *
217 * The return value is composed from ERR_NONE, ERR_RETRYABLE and ERR_FATAL.
218 */
219static int uxst_bind_listeners(struct protocol *proto)
220{
221 struct listener *listener;
222 int err = ERR_NONE;
223 int fd;
224
225 list_for_each_entry(listener, &proto->listeners, proto_list) {
226 if (listener->state != LI_INIT)
227 continue; /* already started */
228
Willy Tarreaue6ad2b12007-10-18 12:45:54 +0200229 fd = create_uxst_socket(((struct sockaddr_un *)&listener->addr)->sun_path,
230 listener->perm.ux.uid,
231 listener->perm.ux.gid,
232 listener->perm.ux.mode);
Willy Tarreau92fb9832007-10-16 17:34:28 +0200233 if (fd == -1) {
234 err |= ERR_FATAL;
235 continue;
236 }
237
238 /* the socket is listening */
239 listener->fd = fd;
240 listener->state = LI_LISTEN;
241
242 /* the function for the accept() event */
243 fd_insert(fd);
244 fdtab[fd].cb[DIR_RD].f = listener->accept;
245 fdtab[fd].cb[DIR_WR].f = NULL; /* never called */
246 fdtab[fd].cb[DIR_RD].b = fdtab[fd].cb[DIR_WR].b = NULL;
247 fdtab[fd].owner = (struct task *)listener; /* reference the listener instead of a task */
248 fdtab[fd].state = FD_STLISTEN;
249 fdtab[fd].peeraddr = NULL;
250 fdtab[fd].peerlen = 0;
251 fdtab[fd].listener = NULL;
252 fdtab[fd].ev = 0;
253 }
254
255 return err;
256}
257
258/* This function adds the UNIX sockets file descriptors to the polling lists
259 * for all listeners in the LI_LISTEN state. It is intended to be used as the
260 * protocol's enable_all() primitive, after the fork(). It always returns
261 * ERR_NONE.
262 */
263static int uxst_enable_listeners(struct protocol *proto)
264{
265 struct listener *listener;
266
267 list_for_each_entry(listener, &proto->listeners, proto_list) {
268 if (listener->state == LI_LISTEN) {
269 EV_FD_SET(listener->fd, DIR_RD);
270 listener->state = LI_READY;
271 }
272 }
273 return ERR_NONE;
274}
275
276/* This function stops all listening UNIX sockets bound to the protocol
277 * <proto>. It does not detaches them from the protocol.
278 * It always returns ERR_NONE.
279 */
280static int uxst_unbind_listeners(struct protocol *proto)
281{
282 struct listener *listener;
283
284 list_for_each_entry(listener, &proto->listeners, proto_list) {
285 if (listener->state != LI_INIT) {
286 EV_FD_CLR(listener->fd, DIR_RD);
287 close(listener->fd);
288 listener->state = LI_INIT;
289 destroy_uxst_socket(((struct sockaddr_un *)&listener->addr)->sun_path);
290 }
291 }
292 return ERR_NONE;
293}
294
295/*
296 * This function is called on a read event from a listen socket, corresponding
297 * to an accept. It tries to accept as many connections as possible.
298 * It returns 0. Since we use UNIX sockets on the local system for monitoring
299 * purposes and other related things, we do not need to output as many messages
300 * as with TCP which can fall under attack.
301 */
302int uxst_event_accept(int fd) {
303 struct listener *l = (struct listener *)fdtab[fd].owner;
304 struct session *s;
305 struct task *t;
306 int cfd;
307 int max_accept;
308
309 if (global.nbproc > 1)
310 max_accept = 8; /* let other processes catch some connections too */
311 else
312 max_accept = -1;
313
314 while (max_accept--) {
315 struct sockaddr_storage addr;
316 socklen_t laddr = sizeof(addr);
317
318 if ((cfd = accept(fd, (struct sockaddr *)&addr, &laddr)) == -1) {
319 switch (errno) {
320 case EAGAIN:
321 case EINTR:
322 case ECONNABORTED:
323 return 0; /* nothing more to accept */
324 case ENFILE:
325 /* Process reached system FD limit. Check system tunables. */
326 return 0;
327 case EMFILE:
328 /* Process reached process FD limit. Check 'ulimit-n'. */
329 return 0;
330 case ENOBUFS:
331 case ENOMEM:
332 /* Process reached system memory limit. Check system tunables. */
333 return 0;
334 default:
335 return 0;
336 }
337 }
338
339 if (l->nbconn >= l->maxconn) {
340 /* too many connections, we shoot this one and return.
341 * FIXME: it would be better to simply switch the listener's
342 * state to LI_FULL and disable the FD. We could re-enable
343 * it upon fd_delete(), but this requires all protocols to
344 * be switched.
345 */
346 close(cfd);
347 return 0;
348 }
349
350 if ((s = pool_alloc2(pool2_session)) == NULL) {
351 Alert("out of memory in uxst_event_accept().\n");
352 close(cfd);
353 return 0;
354 }
355
356 if ((t = pool_alloc2(pool2_task)) == NULL) {
357 Alert("out of memory in uxst_event_accept().\n");
358 close(cfd);
359 pool_free2(pool2_session, s);
360 return 0;
361 }
362
363 s->cli_addr = addr;
364
365 /* FIXME: should be checked earlier */
366 if (cfd >= global.maxsock) {
367 Alert("accept(): not enough free sockets. Raise -n argument. Giving up.\n");
368 close(cfd);
369 pool_free2(pool2_task, t);
370 pool_free2(pool2_session, s);
371 return 0;
372 }
373
374 if (fcntl(cfd, F_SETFL, O_NONBLOCK) == -1) {
375 Alert("accept(): cannot set the socket in non blocking mode. Giving up\n");
376 close(cfd);
377 pool_free2(pool2_task, t);
378 pool_free2(pool2_session, s);
379 return 0;
380 }
381
382 t->wq = NULL;
383 t->qlist.p = NULL;
384 t->state = TASK_IDLE;
385 t->process = l->handler;
386 t->context = s;
387
388 s->task = t;
389 s->fe = NULL;
390 s->be = NULL;
391
392 s->cli_state = CL_STDATA;
393 s->srv_state = SV_STIDLE;
394 s->req = s->rep = NULL; /* will be allocated later */
395
396 s->cli_fd = cfd;
397 s->srv_fd = -1;
398 s->srv = NULL;
399 s->pend_pos = NULL;
400
401 memset(&s->logs, 0, sizeof(s->logs));
402 memset(&s->txn, 0, sizeof(s->txn));
403
404 s->data_source = DATA_SRC_NONE;
405 s->uniq_id = totalconn;
406
407 if ((s->req = pool_alloc2(pool2_buffer)) == NULL) { /* no memory */
408 close(cfd); /* nothing can be done for this fd without memory */
409 pool_free2(pool2_task, t);
410 pool_free2(pool2_session, s);
411 return 0;
412 }
413
414 if ((s->rep = pool_alloc2(pool2_buffer)) == NULL) { /* no memory */
415 pool_free2(pool2_buffer, s->req);
416 close(cfd); /* nothing can be done for this fd without memory */
417 pool_free2(pool2_task, t);
418 pool_free2(pool2_session, s);
419 return 0;
420 }
421
422 buffer_init(s->req);
423 buffer_init(s->rep);
424 s->req->rlim += BUFSIZE;
425 s->rep->rlim += BUFSIZE;
426
427 fd_insert(cfd);
428 fdtab[cfd].owner = t;
429 fdtab[cfd].listener = l;
430 fdtab[cfd].state = FD_STREADY;
431 fdtab[cfd].cb[DIR_RD].f = l->proto->read;
432 fdtab[cfd].cb[DIR_RD].b = s->req;
433 fdtab[cfd].cb[DIR_WR].f = l->proto->write;
434 fdtab[cfd].cb[DIR_WR].b = s->rep;
435 fdtab[cfd].peeraddr = (struct sockaddr *)&s->cli_addr;
436 fdtab[cfd].peerlen = sizeof(s->cli_addr);
437 fdtab[cfd].ev = 0;
438
439
440 tv_eternity(&s->req->rex);
441 tv_eternity(&s->req->wex);
442 tv_eternity(&s->req->cex);
443 tv_eternity(&s->rep->rex);
444 tv_eternity(&s->rep->wex);
445
446 tv_eternity(&s->req->wto);
447 tv_eternity(&s->req->cto);
448 tv_eternity(&s->req->rto);
449 tv_eternity(&s->rep->rto);
450 tv_eternity(&s->rep->cto);
451 tv_eternity(&s->rep->wto);
452
453 if (l->timeout)
454 s->req->rto = *l->timeout;
455
456 if (l->timeout)
457 s->rep->wto = *l->timeout;
458
459 tv_eternity(&t->expire);
460 if (l->timeout && tv_isset(l->timeout)) {
461 EV_FD_SET(cfd, DIR_RD);
462 tv_add(&s->req->rex, &now, &s->req->rto);
463 tv_add(&s->rep->wex, &now, &s->rep->wto);
464 t->expire = s->req->rex;
465 }
466
467 task_queue(t);
468 task_wakeup(t);
469
470 l->nbconn++; /* warning! right now, it's up to the handler to decrease this */
471 if (l->nbconn >= l->maxconn) {
472 EV_FD_CLR(l->fd, DIR_RD);
473 l->state = LI_FULL;
474 }
475 actconn++;
476 totalconn++;
477
478 //fprintf(stderr, "accepting from %p => %d conn, %d total, task=%p, cfd=%d, maxfd=%d\n", p, actconn, totalconn, t, cfd, maxfd);
479 } /* end of while (p->feconn < p->maxconn) */
480 //fprintf(stderr,"fct %s:%d\n", __FUNCTION__, __LINE__);
481 return 0;
482}
483
484/*
485 * manages the client FSM and its socket. It returns 1 if a state has changed
486 * (and a resync may be needed), otherwise 0.
487 */
488static int process_uxst_cli(struct session *t)
489{
490 int s = t->srv_state;
491 int c = t->cli_state;
492 struct buffer *req = t->req;
493 struct buffer *rep = t->rep;
494 //fprintf(stderr,"fct %s:%d\n", __FUNCTION__, __LINE__);
495 if (c == CL_STDATA) {
496 /* FIXME: this error handling is partly buggy because we always report
497 * a 'DATA' phase while we don't know if the server was in IDLE, CONN
498 * or HEADER phase. BTW, it's not logical to expire the client while
499 * we're waiting for the server to connect.
500 */
501 /* read or write error */
502 if (rep->flags & BF_WRITE_ERROR || req->flags & BF_READ_ERROR) {
503 buffer_shutr(req);
504 buffer_shutw(rep);
505 fd_delete(t->cli_fd);
506 t->cli_state = CL_STCLOSE;
507 if (!(t->flags & SN_ERR_MASK))
508 t->flags |= SN_ERR_CLICL;
509 if (!(t->flags & SN_FINST_MASK)) {
510 if (t->pend_pos)
511 t->flags |= SN_FINST_Q;
512 else if (s == SV_STCONN)
513 t->flags |= SN_FINST_C;
514 else
515 t->flags |= SN_FINST_D;
516 }
517 return 1;
518 }
519 /* last read, or end of server write */
520 else if (req->flags & BF_READ_NULL || s == SV_STSHUTW || s == SV_STCLOSE) {
521 EV_FD_CLR(t->cli_fd, DIR_RD);
522 buffer_shutr(req);
523 t->cli_state = CL_STSHUTR;
524 return 1;
525 }
526 /* last server read and buffer empty */
527 else if ((s == SV_STSHUTR || s == SV_STCLOSE) && (rep->l == 0)) {
528 EV_FD_CLR(t->cli_fd, DIR_WR);
529 buffer_shutw(rep);
530 shutdown(t->cli_fd, SHUT_WR);
531 /* We must ensure that the read part is still alive when switching
532 * to shutw */
533 EV_FD_SET(t->cli_fd, DIR_RD);
534 tv_add_ifset(&req->rex, &now, &req->rto);
535 t->cli_state = CL_STSHUTW;
536 //fprintf(stderr,"%p:%s(%d), c=%d, s=%d\n", t, __FUNCTION__, __LINE__, t->cli_state, t->cli_state);
537 return 1;
538 }
539 /* read timeout */
540 else if (tv_isle(&req->rex, &now)) {
541 EV_FD_CLR(t->cli_fd, DIR_RD);
542 buffer_shutr(req);
543 t->cli_state = CL_STSHUTR;
544 if (!(t->flags & SN_ERR_MASK))
545 t->flags |= SN_ERR_CLITO;
546 if (!(t->flags & SN_FINST_MASK)) {
547 if (t->pend_pos)
548 t->flags |= SN_FINST_Q;
549 else if (s == SV_STCONN)
550 t->flags |= SN_FINST_C;
551 else
552 t->flags |= SN_FINST_D;
553 }
554 return 1;
555 }
556 /* write timeout */
557 else if (tv_isle(&rep->wex, &now)) {
558 EV_FD_CLR(t->cli_fd, DIR_WR);
559 buffer_shutw(rep);
560 shutdown(t->cli_fd, SHUT_WR);
561 /* We must ensure that the read part is still alive when switching
562 * to shutw */
563 EV_FD_SET(t->cli_fd, DIR_RD);
564 tv_add_ifset(&req->rex, &now, &req->rto);
565
566 t->cli_state = CL_STSHUTW;
567 if (!(t->flags & SN_ERR_MASK))
568 t->flags |= SN_ERR_CLITO;
569 if (!(t->flags & SN_FINST_MASK)) {
570 if (t->pend_pos)
571 t->flags |= SN_FINST_Q;
572 else if (s == SV_STCONN)
573 t->flags |= SN_FINST_C;
574 else
575 t->flags |= SN_FINST_D;
576 }
577 return 1;
578 }
579
580 if (req->l >= req->rlim - req->data) {
581 /* no room to read more data */
582 if (EV_FD_COND_C(t->cli_fd, DIR_RD)) {
583 /* stop reading until we get some space */
584 tv_eternity(&req->rex);
585 }
586 } else {
587 /* there's still some space in the buffer */
588 if (EV_FD_COND_S(t->cli_fd, DIR_RD)) {
589 if (!tv_isset(&req->rto) ||
590 (t->srv_state < SV_STDATA && tv_isset(&req->wto)))
591 /* If the client has no timeout, or if the server not ready yet, and we
592 * know for sure that it can expire, then it's cleaner to disable the
593 * timeout on the client side so that too low values cannot make the
594 * sessions abort too early.
595 */
596 tv_eternity(&req->rex);
597 else
598 tv_add(&req->rex, &now, &req->rto);
599 }
600 }
601
602 if ((rep->l == 0) ||
603 ((s < SV_STDATA) /* FIXME: this may be optimized && (rep->w == rep->h)*/)) {
604 if (EV_FD_COND_C(t->cli_fd, DIR_WR)) {
605 /* stop writing */
606 tv_eternity(&rep->wex);
607 }
608 } else {
609 /* buffer not empty */
610 if (EV_FD_COND_S(t->cli_fd, DIR_WR)) {
611 /* restart writing */
612 if (tv_add_ifset(&rep->wex, &now, &rep->wto)) {
613 /* FIXME: to prevent the client from expiring read timeouts during writes,
614 * we refresh it. */
615 req->rex = rep->wex;
616 }
617 else
618 tv_eternity(&rep->wex);
619 }
620 }
621 return 0; /* other cases change nothing */
622 }
623 else if (c == CL_STSHUTR) {
624 if (rep->flags & BF_WRITE_ERROR) {
625 buffer_shutw(rep);
626 fd_delete(t->cli_fd);
627 t->cli_state = CL_STCLOSE;
628 if (!(t->flags & SN_ERR_MASK))
629 t->flags |= SN_ERR_CLICL;
630 if (!(t->flags & SN_FINST_MASK)) {
631 if (t->pend_pos)
632 t->flags |= SN_FINST_Q;
633 else if (s == SV_STCONN)
634 t->flags |= SN_FINST_C;
635 else
636 t->flags |= SN_FINST_D;
637 }
638 return 1;
639 }
640 else if ((s == SV_STSHUTR || s == SV_STCLOSE) && (rep->l == 0)) {
641 buffer_shutw(rep);
642 fd_delete(t->cli_fd);
643 t->cli_state = CL_STCLOSE;
644 return 1;
645 }
646 else if (tv_isle(&rep->wex, &now)) {
647 buffer_shutw(rep);
648 fd_delete(t->cli_fd);
649 t->cli_state = CL_STCLOSE;
650 if (!(t->flags & SN_ERR_MASK))
651 t->flags |= SN_ERR_CLITO;
652 if (!(t->flags & SN_FINST_MASK)) {
653 if (t->pend_pos)
654 t->flags |= SN_FINST_Q;
655 else if (s == SV_STCONN)
656 t->flags |= SN_FINST_C;
657 else
658 t->flags |= SN_FINST_D;
659 }
660 return 1;
661 }
662
663 if (rep->l == 0) {
664 if (EV_FD_COND_C(t->cli_fd, DIR_WR)) {
665 /* stop writing */
666 tv_eternity(&rep->wex);
667 }
668 } else {
669 /* buffer not empty */
670 if (EV_FD_COND_S(t->cli_fd, DIR_WR)) {
671 /* restart writing */
672 if (!tv_add_ifset(&rep->wex, &now, &rep->wto))
673 tv_eternity(&rep->wex);
674 }
675 }
676 return 0;
677 }
678 else if (c == CL_STSHUTW) {
679 if (req->flags & BF_READ_ERROR) {
680 buffer_shutr(req);
681 fd_delete(t->cli_fd);
682 t->cli_state = CL_STCLOSE;
683 if (!(t->flags & SN_ERR_MASK))
684 t->flags |= SN_ERR_CLICL;
685 if (!(t->flags & SN_FINST_MASK)) {
686 if (t->pend_pos)
687 t->flags |= SN_FINST_Q;
688 else if (s == SV_STCONN)
689 t->flags |= SN_FINST_C;
690 else
691 t->flags |= SN_FINST_D;
692 }
693 return 1;
694 }
695 else if (req->flags & BF_READ_NULL || s == SV_STSHUTW || s == SV_STCLOSE) {
696 buffer_shutr(req);
697 fd_delete(t->cli_fd);
698 t->cli_state = CL_STCLOSE;
699 return 1;
700 }
701 else if (tv_isle(&req->rex, &now)) {
702 buffer_shutr(req);
703 fd_delete(t->cli_fd);
704 t->cli_state = CL_STCLOSE;
705 if (!(t->flags & SN_ERR_MASK))
706 t->flags |= SN_ERR_CLITO;
707 if (!(t->flags & SN_FINST_MASK)) {
708 if (t->pend_pos)
709 t->flags |= SN_FINST_Q;
710 else if (s == SV_STCONN)
711 t->flags |= SN_FINST_C;
712 else
713 t->flags |= SN_FINST_D;
714 }
715 return 1;
716 }
717 else if (req->l >= req->rlim - req->data) {
718 /* no room to read more data */
719
720 /* FIXME-20050705: is it possible for a client to maintain a session
721 * after the timeout by sending more data after it receives a close ?
722 */
723
724 if (EV_FD_COND_C(t->cli_fd, DIR_RD)) {
725 /* stop reading until we get some space */
726 tv_eternity(&req->rex);
727 //fprintf(stderr,"%p:%s(%d), c=%d, s=%d\n", t, __FUNCTION__, __LINE__, t->cli_state, t->cli_state);
728 }
729 } else {
730 /* there's still some space in the buffer */
731 if (EV_FD_COND_S(t->cli_fd, DIR_RD)) {
732 if (!tv_add_ifset(&req->rex, &now, &req->rto))
733 tv_eternity(&req->rex);
734 //fprintf(stderr,"%p:%s(%d), c=%d, s=%d\n", t, __FUNCTION__, __LINE__, t->cli_state, t->cli_state);
735 }
736 }
737 return 0;
738 }
739 else { /* CL_STCLOSE: nothing to do */
740 if ((global.mode & MODE_DEBUG) && (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE))) {
741 int len;
742 len = sprintf(trash, "%08x:%s.clicls[%04x:%04x]\n", t->uniq_id, t->be?t->be->id:"",
743 (unsigned short)t->cli_fd, (unsigned short)t->srv_fd);
744 write(1, trash, len);
745 }
746 return 0;
747 }
748 return 0;
749}
750
751#if 0
752 /* FIXME! This part has not been completely converted yet, and it may
753 * still be very specific to TCPv4 ! Also, it relies on some parameters
754 * such as conn_retries which are not set upon accept().
755 */
756/*
757 * Manages the server FSM and its socket. It returns 1 if a state has changed
758 * (and a resync may be needed), otherwise 0.
759 */
760static int process_uxst_srv(struct session *t)
761{
762 int s = t->srv_state;
763 int c = t->cli_state;
764 struct buffer *req = t->req;
765 struct buffer *rep = t->rep;
766 int conn_err;
767
768 if (s == SV_STIDLE) {
769 if (c == CL_STCLOSE || c == CL_STSHUTW ||
770 (c == CL_STSHUTR &&
771 (t->req->l == 0 || t->be->options & PR_O_ABRT_CLOSE))) { /* give up */
772 tv_eternity(&req->cex);
773 if (t->pend_pos)
774 t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now);
775 srv_close_with_err(t, SN_ERR_CLICL, t->pend_pos ? SN_FINST_Q : SN_FINST_C);
776 return 1;
777 }
778 else {
779 /* FIXME: reimplement the TARPIT check here */
780
781 /* Right now, we will need to create a connection to the server.
782 * We might already have tried, and got a connection pending, in
783 * which case we will not do anything till it's pending. It's up
784 * to any other session to release it and wake us up again.
785 */
786 if (t->pend_pos) {
787 if (!tv_isle(&req->cex, &now))
788 return 0;
789 else {
790 /* we've been waiting too long here */
791 tv_eternity(&req->cex);
792 t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now);
793 srv_close_with_err(t, SN_ERR_SRVTO, SN_FINST_Q);
794 if (t->srv)
795 t->srv->failed_conns++;
796 if (t->fe)
797 t->fe->failed_conns++;
798 return 1;
799 }
800 }
801
802 do {
803 /* first, get a connection */
804 if (srv_redispatch_connect(t))
805 return t->srv_state != SV_STIDLE;
806
807 /* try to (re-)connect to the server, and fail if we expire the
808 * number of retries.
809 */
810 if (srv_retryable_connect(t)) {
811 t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now);
812 return t->srv_state != SV_STIDLE;
813 }
814 } while (1);
815 }
816 }
817 else if (s == SV_STCONN) { /* connection in progress */
818 if (c == CL_STCLOSE || c == CL_STSHUTW ||
819 (c == CL_STSHUTR &&
820 ((t->req->l == 0 && !(req->flags & BF_WRITE_STATUS)) ||
821 t->be->options & PR_O_ABRT_CLOSE))) { /* give up */
822 tv_eternity(&req->cex);
823 fd_delete(t->srv_fd);
824 if (t->srv)
825 t->srv->cur_sess--;
826
827 srv_close_with_err(t, SN_ERR_CLICL, SN_FINST_C);
828 return 1;
829 }
830 if (!(req->flags & BF_WRITE_STATUS) && !tv_isle(&req->cex, &now)) {
831 //fprintf(stderr,"1: c=%d, s=%d, now=%d.%06d, exp=%d.%06d\n", c, s, now.tv_sec, now.tv_usec, req->cex.tv_sec, req->cex.tv_usec);
832 return 0; /* nothing changed */
833 }
834 else if (!(req->flags & BF_WRITE_STATUS) || (req->flags & BF_WRITE_ERROR)) {
835 /* timeout, asynchronous connect error or first write error */
836 //fprintf(stderr,"2: c=%d, s=%d\n", c, s);
837
838 fd_delete(t->srv_fd);
839 if (t->srv)
840 t->srv->cur_sess--;
841
842 if (!(req->flags & BF_WRITE_STATUS))
843 conn_err = SN_ERR_SRVTO; // it was a connect timeout.
844 else
845 conn_err = SN_ERR_SRVCL; // it was an asynchronous connect error.
846
847 /* ensure that we have enough retries left */
848 if (srv_count_retry_down(t, conn_err))
849 return 1;
850
851 if (t->srv && t->conn_retries == 0 && t->be->options & PR_O_REDISP) {
852 /* We're on our last chance, and the REDISP option was specified.
853 * We will ignore cookie and force to balance or use the dispatcher.
854 */
855 /* let's try to offer this slot to anybody */
856 if (may_dequeue_tasks(t->srv, t->be))
857 task_wakeup(t->srv->queue_mgt);
858
859 if (t->srv)
860 t->srv->failed_conns++;
861 t->be->failed_conns++;
862
863 t->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET);
864 t->srv = NULL; /* it's left to the dispatcher to choose a server */
865
866 /* first, get a connection */
867 if (srv_redispatch_connect(t))
868 return t->srv_state != SV_STIDLE;
869 }
870
871 do {
872 /* Now we will try to either reconnect to the same server or
873 * connect to another server. If the connection gets queued
874 * because all servers are saturated, then we will go back to
875 * the SV_STIDLE state.
876 */
877 if (srv_retryable_connect(t)) {
878 t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now);
879 return t->srv_state != SV_STCONN;
880 }
881
882 /* we need to redispatch the connection to another server */
883 if (srv_redispatch_connect(t))
884 return t->srv_state != SV_STCONN;
885 } while (1);
886 }
887 else { /* no error or write 0 */
888 t->logs.t_connect = tv_ms_elapsed(&t->logs.tv_accept, &now);
889
890 //fprintf(stderr,"3: c=%d, s=%d\n", c, s);
891 if (req->l == 0) /* nothing to write */ {
892 EV_FD_CLR(t->srv_fd, DIR_WR);
893 tv_eternity(&req->wex);
894 } else /* need the right to write */ {
895 EV_FD_SET(t->srv_fd, DIR_WR);
896 if (tv_add_ifset(&req->wex, &now, &req->wto)) {
897 /* FIXME: to prevent the server from expiring read timeouts during writes,
898 * we refresh it. */
899 rep->rex = req->wex;
900 }
901 else
902 tv_eternity(&req->wex);
903 }
904
905 EV_FD_SET(t->srv_fd, DIR_RD);
906 if (!tv_add_ifset(&rep->rex, &now, &rep->rto))
907 tv_eternity(&rep->rex);
908
909 t->srv_state = SV_STDATA;
910 if (t->srv)
911 t->srv->cum_sess++;
912 rep->rlim = rep->data + BUFSIZE; /* no rewrite needed */
913
914 /* if the user wants to log as soon as possible, without counting
915 bytes from the server, then this is the right moment. */
916 if (t->fe && t->fe->to_log && !(t->logs.logwait & LW_BYTES)) {
917 t->logs.t_close = t->logs.t_connect; /* to get a valid end date */
918 //uxst_sess_log(t);
919 }
920 tv_eternity(&req->cex);
921 return 1;
922 }
923 }
924 else if (s == SV_STDATA) {
925 /* read or write error */
926 if (req->flags & BF_WRITE_ERROR || rep->flags & BF_READ_ERROR) {
927 buffer_shutr(rep);
928 buffer_shutw(req);
929 fd_delete(t->srv_fd);
930 if (t->srv) {
931 t->srv->cur_sess--;
932 t->srv->failed_resp++;
933 }
934 t->be->failed_resp++;
935 t->srv_state = SV_STCLOSE;
936 if (!(t->flags & SN_ERR_MASK))
937 t->flags |= SN_ERR_SRVCL;
938 if (!(t->flags & SN_FINST_MASK))
939 t->flags |= SN_FINST_D;
940 /* We used to have a free connection slot. Since we'll never use it,
941 * we have to inform the server that it may be used by another session.
942 */
943 if (may_dequeue_tasks(t->srv, t->be))
944 task_wakeup(t->srv->queue_mgt);
945
946 return 1;
947 }
948 /* last read, or end of client write */
949 else if (rep->flags & BF_READ_NULL || c == CL_STSHUTW || c == CL_STCLOSE) {
950 EV_FD_CLR(t->srv_fd, DIR_RD);
951 buffer_shutr(rep);
952 t->srv_state = SV_STSHUTR;
953 //fprintf(stderr,"%p:%s(%d), c=%d, s=%d\n", t, __FUNCTION__, __LINE__, t->cli_state, t->cli_state);
954 return 1;
955 }
956 /* end of client read and no more data to send */
957 else if ((c == CL_STSHUTR || c == CL_STCLOSE) && (req->l == 0)) {
958 EV_FD_CLR(t->srv_fd, DIR_WR);
959 buffer_shutw(req);
960 shutdown(t->srv_fd, SHUT_WR);
961 /* We must ensure that the read part is still alive when switching
962 * to shutw */
963 EV_FD_SET(t->srv_fd, DIR_RD);
964 tv_add_ifset(&rep->rex, &now, &rep->rto);
965
966 t->srv_state = SV_STSHUTW;
967 return 1;
968 }
969 /* read timeout */
970 else if (tv_isle(&rep->rex, &now)) {
971 EV_FD_CLR(t->srv_fd, DIR_RD);
972 buffer_shutr(rep);
973 t->srv_state = SV_STSHUTR;
974 if (!(t->flags & SN_ERR_MASK))
975 t->flags |= SN_ERR_SRVTO;
976 if (!(t->flags & SN_FINST_MASK))
977 t->flags |= SN_FINST_D;
978 return 1;
979 }
980 /* write timeout */
981 else if (tv_isle(&req->wex, &now)) {
982 EV_FD_CLR(t->srv_fd, DIR_WR);
983 buffer_shutw(req);
984 shutdown(t->srv_fd, SHUT_WR);
985 /* We must ensure that the read part is still alive when switching
986 * to shutw */
987 EV_FD_SET(t->srv_fd, DIR_RD);
988 tv_add_ifset(&rep->rex, &now, &rep->rto);
989 t->srv_state = SV_STSHUTW;
990 if (!(t->flags & SN_ERR_MASK))
991 t->flags |= SN_ERR_SRVTO;
992 if (!(t->flags & SN_FINST_MASK))
993 t->flags |= SN_FINST_D;
994 return 1;
995 }
996
997 /* recompute request time-outs */
998 if (req->l == 0) {
999 if (EV_FD_COND_C(t->srv_fd, DIR_WR)) {
1000 /* stop writing */
1001 tv_eternity(&req->wex);
1002 }
1003 }
1004 else { /* buffer not empty, there are still data to be transferred */
1005 if (EV_FD_COND_S(t->srv_fd, DIR_WR)) {
1006 /* restart writing */
1007 if (tv_add_ifset(&req->wex, &now, &req->wto)) {
1008 /* FIXME: to prevent the server from expiring read timeouts during writes,
1009 * we refresh it. */
1010 rep->rex = req->wex;
1011 }
1012 else
1013 tv_eternity(&req->wex);
1014 }
1015 }
1016
1017 /* recompute response time-outs */
1018 if (rep->l == BUFSIZE) { /* no room to read more data */
1019 if (EV_FD_COND_C(t->srv_fd, DIR_RD)) {
1020 tv_eternity(&rep->rex);
1021 }
1022 }
1023 else {
1024 if (EV_FD_COND_S(t->srv_fd, DIR_RD)) {
1025 if (!tv_add_ifset(&rep->rex, &now, &rep->rto))
1026 tv_eternity(&rep->rex);
1027 }
1028 }
1029
1030 return 0; /* other cases change nothing */
1031 }
1032 else if (s == SV_STSHUTR) {
1033 if (req->flags & BF_WRITE_ERROR) {
1034 //EV_FD_CLR(t->srv_fd, DIR_WR);
1035 buffer_shutw(req);
1036 fd_delete(t->srv_fd);
1037 if (t->srv) {
1038 t->srv->cur_sess--;
1039 t->srv->failed_resp++;
1040 }
1041 t->be->failed_resp++;
1042 //close(t->srv_fd);
1043 t->srv_state = SV_STCLOSE;
1044 if (!(t->flags & SN_ERR_MASK))
1045 t->flags |= SN_ERR_SRVCL;
1046 if (!(t->flags & SN_FINST_MASK))
1047 t->flags |= SN_FINST_D;
1048 /* We used to have a free connection slot. Since we'll never use it,
1049 * we have to inform the server that it may be used by another session.
1050 */
1051 if (may_dequeue_tasks(t->srv, t->be))
1052 task_wakeup(t->srv->queue_mgt);
1053
1054 return 1;
1055 }
1056 else if ((c == CL_STSHUTR || c == CL_STCLOSE) && (req->l == 0)) {
1057 //EV_FD_CLR(t->srv_fd, DIR_WR);
1058 buffer_shutw(req);
1059 fd_delete(t->srv_fd);
1060 if (t->srv)
1061 t->srv->cur_sess--;
1062 //close(t->srv_fd);
1063 t->srv_state = SV_STCLOSE;
1064 /* We used to have a free connection slot. Since we'll never use it,
1065 * we have to inform the server that it may be used by another session.
1066 */
1067 if (may_dequeue_tasks(t->srv, t->be))
1068 task_wakeup(t->srv->queue_mgt);
1069
1070 return 1;
1071 }
1072 else if (tv_isle(&req->wex, &now)) {
1073 //EV_FD_CLR(t->srv_fd, DIR_WR);
1074 buffer_shutw(req);
1075 fd_delete(t->srv_fd);
1076 if (t->srv)
1077 t->srv->cur_sess--;
1078 //close(t->srv_fd);
1079 t->srv_state = SV_STCLOSE;
1080 if (!(t->flags & SN_ERR_MASK))
1081 t->flags |= SN_ERR_SRVTO;
1082 if (!(t->flags & SN_FINST_MASK))
1083 t->flags |= SN_FINST_D;
1084 /* We used to have a free connection slot. Since we'll never use it,
1085 * we have to inform the server that it may be used by another session.
1086 */
1087 if (may_dequeue_tasks(t->srv, t->be))
1088 task_wakeup(t->srv->queue_mgt);
1089
1090 return 1;
1091 }
1092 else if (req->l == 0) {
1093 if (EV_FD_COND_C(t->srv_fd, DIR_WR)) {
1094 /* stop writing */
1095 tv_eternity(&req->wex);
1096 }
1097 }
1098 else { /* buffer not empty */
1099 if (EV_FD_COND_S(t->srv_fd, DIR_WR)) {
1100 /* restart writing */
1101 if (!tv_add_ifset(&req->wex, &now, &req->wto))
1102 tv_eternity(&req->wex);
1103 }
1104 }
1105 return 0;
1106 }
1107 else if (s == SV_STSHUTW) {
1108 if (rep->flags & BF_READ_ERROR) {
1109 //EV_FD_CLR(t->srv_fd, DIR_RD);
1110 buffer_shutr(rep);
1111 fd_delete(t->srv_fd);
1112 if (t->srv) {
1113 t->srv->cur_sess--;
1114 t->srv->failed_resp++;
1115 }
1116 t->be->failed_resp++;
1117 //close(t->srv_fd);
1118 t->srv_state = SV_STCLOSE;
1119 if (!(t->flags & SN_ERR_MASK))
1120 t->flags |= SN_ERR_SRVCL;
1121 if (!(t->flags & SN_FINST_MASK))
1122 t->flags |= SN_FINST_D;
1123 /* We used to have a free connection slot. Since we'll never use it,
1124 * we have to inform the server that it may be used by another session.
1125 */
1126 if (may_dequeue_tasks(t->srv, t->be))
1127 task_wakeup(t->srv->queue_mgt);
1128
1129 return 1;
1130 }
1131 else if (rep->flags & BF_READ_NULL || c == CL_STSHUTW || c == CL_STCLOSE) {
1132 //EV_FD_CLR(t->srv_fd, DIR_RD);
1133 buffer_shutr(rep);
1134 fd_delete(t->srv_fd);
1135 if (t->srv)
1136 t->srv->cur_sess--;
1137 //close(t->srv_fd);
1138 t->srv_state = SV_STCLOSE;
1139 /* We used to have a free connection slot. Since we'll never use it,
1140 * we have to inform the server that it may be used by another session.
1141 */
1142 if (may_dequeue_tasks(t->srv, t->be))
1143 task_wakeup(t->srv->queue_mgt);
1144
1145 return 1;
1146 }
1147 else if (tv_isle(&rep->rex, &now)) {
1148 //EV_FD_CLR(t->srv_fd, DIR_RD);
1149 buffer_shutr(rep);
1150 fd_delete(t->srv_fd);
1151 if (t->srv)
1152 t->srv->cur_sess--;
1153 //close(t->srv_fd);
1154 t->srv_state = SV_STCLOSE;
1155 if (!(t->flags & SN_ERR_MASK))
1156 t->flags |= SN_ERR_SRVTO;
1157 if (!(t->flags & SN_FINST_MASK))
1158 t->flags |= SN_FINST_D;
1159 /* We used to have a free connection slot. Since we'll never use it,
1160 * we have to inform the server that it may be used by another session.
1161 */
1162 if (may_dequeue_tasks(t->srv, t->be))
1163 task_wakeup(t->srv->queue_mgt);
1164
1165 return 1;
1166 }
1167 else if (rep->l == BUFSIZE) { /* no room to read more data */
1168 if (EV_FD_COND_C(t->srv_fd, DIR_RD)) {
1169 tv_eternity(&rep->rex);
1170 }
1171 }
1172 else {
1173 if (EV_FD_COND_S(t->srv_fd, DIR_RD)) {
1174 if (!tv_add_ifset(&rep->rex, &now, &rep->rto))
1175 tv_eternity(&rep->rex);
1176 }
1177 }
1178 return 0;
1179 }
1180 else { /* SV_STCLOSE : nothing to do */
1181 if ((global.mode & MODE_DEBUG) && (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE))) {
1182 int len;
1183 len = sprintf(trash, "%08x:%s.srvcls[%04x:%04x]\n",
1184 t->uniq_id, t->be->id, (unsigned short)t->cli_fd, (unsigned short)t->srv_fd);
1185 write(1, trash, len);
1186 }
1187 return 0;
1188 }
1189 return 0;
1190}
1191
1192/* Processes the client and server jobs of a session task, then
1193 * puts it back to the wait queue in a clean state, or
1194 * cleans up its resources if it must be deleted. Returns
1195 * the time the task accepts to wait, or TIME_ETERNITY for
1196 * infinity.
1197 */
1198void process_uxst_session(struct task *t, struct timeval *next)
1199{
1200 struct session *s = t->context;
1201 int fsm_resync = 0;
1202
1203 do {
1204 fsm_resync = 0;
1205 fsm_resync |= process_uxst_cli(s);
1206 if (s->srv_state == SV_STIDLE) {
1207 if (s->cli_state == CL_STCLOSE || s->cli_state == CL_STSHUTW) {
1208 s->srv_state = SV_STCLOSE;
1209 fsm_resync |= 1;
1210 continue;
1211 }
1212 if (s->cli_state == CL_STSHUTR ||
1213 (s->req->l >= s->req->rlim - s->req->data)) {
1214 if (s->req->l == 0) {
1215 s->srv_state = SV_STCLOSE;
1216 fsm_resync |= 1;
1217 continue;
1218 }
1219 /* OK we have some remaining data to process */
1220 /* Just as an exercice, we copy the req into the resp,
1221 * and flush the req.
1222 */
1223 memcpy(s->rep->data, s->req->data, sizeof(s->rep->data));
1224 s->rep->l = s->req->l;
1225 s->rep->rlim = s->rep->data + BUFSIZE;
1226 s->rep->w = s->rep->data;
1227 s->rep->lr = s->rep->r = s->rep->data + s->rep->l;
1228
1229 s->req->l = 0;
1230 s->srv_state = SV_STCLOSE;
1231
1232 fsm_resync |= 1;
1233 continue;
1234 }
1235 }
1236 } while (fsm_resync);
1237
1238 if (likely(s->cli_state != CL_STCLOSE || s->srv_state != SV_STCLOSE)) {
1239 s->req->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
1240 s->rep->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
1241
1242 t->expire = s->req->rex;
1243 tv_min(&t->expire, &s->req->rex, &s->req->wex);
1244 tv_bound(&t->expire, &s->req->cex);
1245 tv_bound(&t->expire, &s->rep->rex);
1246 tv_bound(&t->expire, &s->rep->wex);
1247
1248 /* restore t to its place in the task list */
1249 task_queue(t);
1250
1251 *next = t->expire;
1252 return; /* nothing more to do */
1253 }
1254
1255 if (s->fe)
1256 s->fe->feconn--;
1257 if (s->be && (s->flags & SN_BE_ASSIGNED))
1258 s->be->beconn--;
1259 actconn--;
1260
1261 if (unlikely((global.mode & MODE_DEBUG) &&
1262 (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
1263 int len;
1264 len = sprintf(trash, "%08x:%s.closed[%04x:%04x]\n",
1265 s->uniq_id, s->be->id,
1266 (unsigned short)s->cli_fd, (unsigned short)s->srv_fd);
1267 write(1, trash, len);
1268 }
1269
1270 s->logs.t_close = tv_ms_elapsed(&s->logs.tv_accept, &now);
1271 if (s->req != NULL)
1272 s->logs.bytes_in = s->req->total;
1273 if (s->rep != NULL)
1274 s->logs.bytes_out = s->rep->total;
1275
1276 if (s->fe) {
1277 s->fe->bytes_in += s->logs.bytes_in;
1278 s->fe->bytes_out += s->logs.bytes_out;
1279 }
1280 if (s->be && (s->be != s->fe)) {
1281 s->be->bytes_in += s->logs.bytes_in;
1282 s->be->bytes_out += s->logs.bytes_out;
1283 }
1284 if (s->srv) {
1285 s->srv->bytes_in += s->logs.bytes_in;
1286 s->srv->bytes_out += s->logs.bytes_out;
1287 }
1288
1289 /* let's do a final log if we need it */
1290 if (s->logs.logwait &&
1291 !(s->flags & SN_MONITOR) &&
1292 (s->req->total || !(s->fe && s->fe->options & PR_O_NULLNOLOG))) {
1293 //uxst_sess_log(s);
1294 }
1295
1296 /* the task MUST not be in the run queue anymore */
1297 task_delete(t);
1298 session_free(s);
1299 task_free(t);
1300 tv_eternity(next);
1301}
1302#endif /* not converted */
1303
1304
1305/* Processes data exchanges on the statistics socket. The client processing
1306 * is called and the task is put back in the wait queue or it is cleared.
1307 * In order to ease the transition, we simply simulate the server status
1308 * for now. It only knows states SV_STIDLE and SV_STCLOSE. Returns in <next>
1309 * the task's expiration date.
1310 */
1311void process_uxst_stats(struct task *t, struct timeval *next)
1312{
1313 struct session *s = t->context;
1314 struct listener *listener;
1315 int fsm_resync = 0;
1316
1317 do {
1318 //fprintf(stderr,"fct %s:%d\n", __FUNCTION__, __LINE__);
1319 fsm_resync = 0;
1320 fsm_resync |= process_uxst_cli(s);
1321 if (s->srv_state == SV_STIDLE) {
1322 if (s->cli_state == CL_STCLOSE || s->cli_state == CL_STSHUTW) {
1323 s->srv_state = SV_STCLOSE;
1324 fsm_resync |= 1;
1325 continue;
1326 }
1327 else if (s->cli_state == CL_STSHUTR ||
1328 (s->req->l >= s->req->rlim - s->req->data)) {
1329 if (s->req->l == 0) {
1330 s->srv_state = SV_STCLOSE;
1331 fsm_resync |= 1;
1332 continue;
1333 }
1334 /* OK we have some remaining data to process. Just for the
1335 * sake of an exercice, we copy the req into the resp,
1336 * and flush the req. This produces a simple echo function.
1337 */
1338 memcpy(s->rep->data, s->req->data, sizeof(s->rep->data));
1339 s->rep->l = s->req->l;
1340 s->rep->rlim = s->rep->data + BUFSIZE;
1341 s->rep->w = s->rep->data;
1342 s->rep->lr = s->rep->r = s->rep->data + s->rep->l;
1343
1344 s->req->l = 0;
1345 s->srv_state = SV_STCLOSE;
1346
1347 fsm_resync |= 1;
1348 continue;
1349 }
1350 }
1351 } while (fsm_resync);
1352
1353 if (likely(s->cli_state != CL_STCLOSE || s->srv_state != SV_STCLOSE)) {
1354 s->req->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
1355 s->rep->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
1356
1357 t->expire = s->req->rex;
1358 tv_min(&t->expire, &s->req->rex, &s->req->wex);
1359 tv_bound(&t->expire, &s->req->cex);
1360 tv_bound(&t->expire, &s->rep->rex);
1361 tv_bound(&t->expire, &s->rep->wex);
1362
1363 /* restore t to its place in the task list */
1364 task_queue(t);
1365
1366 *next = t->expire;
1367 return; /* nothing more to do */
1368 }
1369
1370 actconn--;
1371 listener = fdtab[s->cli_fd].listener;
1372 if (listener) {
1373 listener->nbconn--;
1374 if (listener->state == LI_FULL &&
1375 listener->nbconn < listener->maxconn) {
1376 /* we should reactivate the listener */
1377 EV_FD_SET(listener->fd, DIR_RD);
1378 listener->state = LI_READY;
1379 }
1380 }
1381
1382 /* the task MUST not be in the run queue anymore */
1383 task_delete(t);
1384 session_free(s);
1385 task_free(t);
1386 tv_eternity(next);
1387}
1388
1389/* Note: must not be declared <const> as its list will be overwritten */
1390static struct protocol proto_unix = {
1391 .name = "unix_stream",
1392 .sock_domain = PF_UNIX,
1393 .sock_type = SOCK_STREAM,
1394 .sock_prot = 0,
1395 .sock_family = AF_UNIX,
1396 .read = &stream_sock_read,
1397 .write = &stream_sock_write,
1398 .bind_all = uxst_bind_listeners,
1399 .unbind_all = uxst_unbind_listeners,
1400 .enable_all = uxst_enable_listeners,
1401 .listeners = LIST_HEAD_INIT(proto_unix.listeners),
1402 .nb_listeners = 0,
1403};
1404
1405/* Adds listener to the list of unix stream listeners */
1406void uxst_add_listener(struct listener *listener)
1407{
1408 listener->proto = &proto_unix;
1409 LIST_ADDQ(&proto_unix.listeners, &listener->proto_list);
1410 proto_unix.nb_listeners++;
1411}
1412
1413__attribute__((constructor))
1414static void __uxst_protocol_init(void)
1415{
1416 protocol_register(&proto_unix);
1417 //tv_eternity(&global.unix_fe.clitimeout);
1418}
1419
1420
1421/*
1422 * Local variables:
1423 * c-indent-level: 8
1424 * c-basic-offset: 8
1425 * End:
1426 */