blob: 1920376697127fe68f349f03846748c7187453cc [file] [log] [blame]
Willy Tarreau92fb9832007-10-16 17:34:28 +02001/*
2 * UNIX SOCK_STREAM protocol layer (uxst)
3 *
4 * Copyright 2000-2007 Willy Tarreau <w@1wt.eu>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12
13#include <ctype.h>
14#include <errno.h>
15#include <fcntl.h>
16#include <stdio.h>
17#include <stdlib.h>
18#include <string.h>
19#include <syslog.h>
20#include <time.h>
21
22#include <sys/param.h>
23#include <sys/socket.h>
24#include <sys/stat.h>
25#include <sys/types.h>
26#include <sys/un.h>
27
28#include <common/compat.h>
29#include <common/config.h>
30#include <common/debug.h>
31#include <common/memory.h>
32#include <common/mini-clist.h>
33#include <common/standard.h>
34#include <common/time.h>
35#include <common/version.h>
36
37#include <types/acl.h>
38#include <types/capture.h>
39#include <types/client.h>
40#include <types/global.h>
41#include <types/polling.h>
42#include <types/proxy.h>
43#include <types/server.h>
44
45#include <proto/acl.h>
46#include <proto/backend.h>
47#include <proto/buffers.h>
48#include <proto/fd.h>
49#include <proto/log.h>
50#include <proto/protocols.h>
51#include <proto/proto_uxst.h>
52#include <proto/queue.h>
53#include <proto/session.h>
54#include <proto/stream_sock.h>
55#include <proto/task.h>
56
57#ifndef MAXPATHLEN
58#define MAXPATHLEN 128
59#endif
60
61/* This function creates a named PF_UNIX stream socket at address <path>. Note
62 * that the path cannot be NULL nor empty.
63 * It returns the assigned file descriptor, or -1 in the event of an error.
64 */
65static int create_uxst_socket(const char *path)
66{
67 char tempname[MAXPATHLEN];
68 char backname[MAXPATHLEN];
69 struct sockaddr_un addr;
70
71 int ret, sock;
72
73 /* 1. create socket names */
74 if (!path[0]) {
75 Alert("Invalid name for a UNIX socket. Aborting.\n");
76 goto err_return;
77 }
78
79 ret = snprintf(tempname, MAXPATHLEN, "%s.%d.tmp", path, pid);
80 if (ret < 0 || ret >= MAXPATHLEN) {
81 Alert("name too long for UNIX socket. Aborting.\n");
82 goto err_return;
83 }
84
85 ret = snprintf(backname, MAXPATHLEN, "%s.%d.bak", path, pid);
86 if (ret < 0 || ret >= MAXPATHLEN) {
87 Alert("name too long for UNIX socket. Aborting.\n");
88 goto err_return;
89 }
90
91 /* 2. clean existing orphaned entries */
92 if (unlink(tempname) < 0 && errno != ENOENT) {
93 Alert("error when trying to unlink previous UNIX socket. Aborting.\n");
94 goto err_return;
95 }
96
97 if (unlink(backname) < 0 && errno != ENOENT) {
98 Alert("error when trying to unlink previous UNIX socket. Aborting.\n");
99 goto err_return;
100 }
101
102 /* 3. backup existing socket */
103 if (link(path, backname) < 0 && errno != ENOENT) {
104 Alert("error when trying to preserve previous UNIX socket. Aborting.\n");
105 goto err_return;
106 }
107
108 /* 4. prepare new socket */
109 addr.sun_family = AF_UNIX;
110 strncpy(addr.sun_path, tempname, sizeof(addr.sun_path));
111 addr.sun_path[sizeof(addr.sun_path) - 1] = 0;
112
113 sock = socket(PF_UNIX, SOCK_STREAM, 0);
114 if (sock < 0) {
115 Alert("cannot create socket for UNIX listener. Aborting.\n");
116 goto err_unlink_back;
117 }
118
119 if (sock >= global.maxsock) {
120 Alert("socket(): not enough free sockets for UNIX listener. Raise -n argument. Aborting.\n");
121 goto err_unlink_temp;
122 }
123
124 if (fcntl(sock, F_SETFL, O_NONBLOCK) == -1) {
125 Alert("cannot make UNIX socket non-blocking. Aborting.\n");
126 goto err_unlink_temp;
127 }
128
129 if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
130 /* note that bind() creates the socket <tempname> on the file system */
131 Alert("cannot bind socket for UNIX listener. Aborting.\n");
132 goto err_unlink_temp;
133 }
134
135 if (listen(sock, 0) < 0) {
136 Alert("cannot listen to socket for UNIX listener. Aborting.\n");
137 goto err_unlink_temp;
138 }
139
140 /* 5. install.
141 * Point of no return: we are ready, we'll switch the sockets. We don't
142 * fear loosing the socket <path> because we have a copy of it in
143 * backname.
144 */
145 if (rename(tempname, path) < 0) {
146 Alert("cannot switch final and temporary sockets for UNIX listener. Aborting.\n");
147 goto err_rename;
148 }
149
150 /* 6. cleanup */
151 unlink(backname); /* no need to keep this one either */
152
153 return sock;
154
155 err_rename:
156 ret = rename(backname, path);
157 if (ret < 0 && errno == ENOENT)
158 unlink(path);
159 err_unlink_temp:
160 unlink(tempname);
161 close(sock);
162 err_unlink_back:
163 unlink(backname);
164 err_return:
165 return -1;
166}
167
168/* Tries to destroy the UNIX stream socket <path>. The socket must not be used
169 * anymore. It practises best effort, and no error is returned.
170 */
171static void destroy_uxst_socket(const char *path)
172{
173 struct sockaddr_un addr;
174 int sock, ret;
175
176 /* We might have been chrooted, so we may not be able to access the
177 * socket. In order to avoid bothering the other end, we connect with a
178 * wrong protocol, namely SOCK_DGRAM. The return code from connect()
179 * is enough to know if the socket is still live or not. If it's live
180 * in mode SOCK_STREAM, we get EPROTOTYPE or anything else but not
181 * ECONNREFUSED. In this case, we do not touch it because it's used
182 * by some other process.
183 */
184 sock = socket(PF_UNIX, SOCK_DGRAM, 0);
185 if (sock < 0)
186 return;
187
188 addr.sun_family = AF_UNIX;
189 strncpy(addr.sun_path, path, sizeof(addr.sun_path));
190 addr.sun_path[sizeof(addr.sun_path)] = 0;
191 ret = connect(sock, (struct sockaddr *)&addr, sizeof(addr));
192 if (ret < 0 && errno == ECONNREFUSED) {
193 /* Connect failed: the socket still exists but is not used
194 * anymore. Let's remove this socket now.
195 */
196 unlink(path);
197 }
198 close(sock);
199}
200
201
202/* This function creates all UNIX sockets bound to the protocol entry <proto>.
203 * It is intended to be used as the protocol's bind_all() function.
204 * The sockets will be registered but not added to any fd_set, in order not to
205 * loose them across the fork(). A call to uxst_enable_listeners() is needed
206 * to complete initialization.
207 *
208 * The return value is composed from ERR_NONE, ERR_RETRYABLE and ERR_FATAL.
209 */
210static int uxst_bind_listeners(struct protocol *proto)
211{
212 struct listener *listener;
213 int err = ERR_NONE;
214 int fd;
215
216 list_for_each_entry(listener, &proto->listeners, proto_list) {
217 if (listener->state != LI_INIT)
218 continue; /* already started */
219
220 fd = create_uxst_socket(((struct sockaddr_un *)&listener->addr)->sun_path);
221 if (fd == -1) {
222 err |= ERR_FATAL;
223 continue;
224 }
225
226 /* the socket is listening */
227 listener->fd = fd;
228 listener->state = LI_LISTEN;
229
230 /* the function for the accept() event */
231 fd_insert(fd);
232 fdtab[fd].cb[DIR_RD].f = listener->accept;
233 fdtab[fd].cb[DIR_WR].f = NULL; /* never called */
234 fdtab[fd].cb[DIR_RD].b = fdtab[fd].cb[DIR_WR].b = NULL;
235 fdtab[fd].owner = (struct task *)listener; /* reference the listener instead of a task */
236 fdtab[fd].state = FD_STLISTEN;
237 fdtab[fd].peeraddr = NULL;
238 fdtab[fd].peerlen = 0;
239 fdtab[fd].listener = NULL;
240 fdtab[fd].ev = 0;
241 }
242
243 return err;
244}
245
246/* This function adds the UNIX sockets file descriptors to the polling lists
247 * for all listeners in the LI_LISTEN state. It is intended to be used as the
248 * protocol's enable_all() primitive, after the fork(). It always returns
249 * ERR_NONE.
250 */
251static int uxst_enable_listeners(struct protocol *proto)
252{
253 struct listener *listener;
254
255 list_for_each_entry(listener, &proto->listeners, proto_list) {
256 if (listener->state == LI_LISTEN) {
257 EV_FD_SET(listener->fd, DIR_RD);
258 listener->state = LI_READY;
259 }
260 }
261 return ERR_NONE;
262}
263
264/* This function stops all listening UNIX sockets bound to the protocol
265 * <proto>. It does not detaches them from the protocol.
266 * It always returns ERR_NONE.
267 */
268static int uxst_unbind_listeners(struct protocol *proto)
269{
270 struct listener *listener;
271
272 list_for_each_entry(listener, &proto->listeners, proto_list) {
273 if (listener->state != LI_INIT) {
274 EV_FD_CLR(listener->fd, DIR_RD);
275 close(listener->fd);
276 listener->state = LI_INIT;
277 destroy_uxst_socket(((struct sockaddr_un *)&listener->addr)->sun_path);
278 }
279 }
280 return ERR_NONE;
281}
282
283/*
284 * This function is called on a read event from a listen socket, corresponding
285 * to an accept. It tries to accept as many connections as possible.
286 * It returns 0. Since we use UNIX sockets on the local system for monitoring
287 * purposes and other related things, we do not need to output as many messages
288 * as with TCP which can fall under attack.
289 */
290int uxst_event_accept(int fd) {
291 struct listener *l = (struct listener *)fdtab[fd].owner;
292 struct session *s;
293 struct task *t;
294 int cfd;
295 int max_accept;
296
297 if (global.nbproc > 1)
298 max_accept = 8; /* let other processes catch some connections too */
299 else
300 max_accept = -1;
301
302 while (max_accept--) {
303 struct sockaddr_storage addr;
304 socklen_t laddr = sizeof(addr);
305
306 if ((cfd = accept(fd, (struct sockaddr *)&addr, &laddr)) == -1) {
307 switch (errno) {
308 case EAGAIN:
309 case EINTR:
310 case ECONNABORTED:
311 return 0; /* nothing more to accept */
312 case ENFILE:
313 /* Process reached system FD limit. Check system tunables. */
314 return 0;
315 case EMFILE:
316 /* Process reached process FD limit. Check 'ulimit-n'. */
317 return 0;
318 case ENOBUFS:
319 case ENOMEM:
320 /* Process reached system memory limit. Check system tunables. */
321 return 0;
322 default:
323 return 0;
324 }
325 }
326
327 if (l->nbconn >= l->maxconn) {
328 /* too many connections, we shoot this one and return.
329 * FIXME: it would be better to simply switch the listener's
330 * state to LI_FULL and disable the FD. We could re-enable
331 * it upon fd_delete(), but this requires all protocols to
332 * be switched.
333 */
334 close(cfd);
335 return 0;
336 }
337
338 if ((s = pool_alloc2(pool2_session)) == NULL) {
339 Alert("out of memory in uxst_event_accept().\n");
340 close(cfd);
341 return 0;
342 }
343
344 if ((t = pool_alloc2(pool2_task)) == NULL) {
345 Alert("out of memory in uxst_event_accept().\n");
346 close(cfd);
347 pool_free2(pool2_session, s);
348 return 0;
349 }
350
351 s->cli_addr = addr;
352
353 /* FIXME: should be checked earlier */
354 if (cfd >= global.maxsock) {
355 Alert("accept(): not enough free sockets. Raise -n argument. Giving up.\n");
356 close(cfd);
357 pool_free2(pool2_task, t);
358 pool_free2(pool2_session, s);
359 return 0;
360 }
361
362 if (fcntl(cfd, F_SETFL, O_NONBLOCK) == -1) {
363 Alert("accept(): cannot set the socket in non blocking mode. Giving up\n");
364 close(cfd);
365 pool_free2(pool2_task, t);
366 pool_free2(pool2_session, s);
367 return 0;
368 }
369
370 t->wq = NULL;
371 t->qlist.p = NULL;
372 t->state = TASK_IDLE;
373 t->process = l->handler;
374 t->context = s;
375
376 s->task = t;
377 s->fe = NULL;
378 s->be = NULL;
379
380 s->cli_state = CL_STDATA;
381 s->srv_state = SV_STIDLE;
382 s->req = s->rep = NULL; /* will be allocated later */
383
384 s->cli_fd = cfd;
385 s->srv_fd = -1;
386 s->srv = NULL;
387 s->pend_pos = NULL;
388
389 memset(&s->logs, 0, sizeof(s->logs));
390 memset(&s->txn, 0, sizeof(s->txn));
391
392 s->data_source = DATA_SRC_NONE;
393 s->uniq_id = totalconn;
394
395 if ((s->req = pool_alloc2(pool2_buffer)) == NULL) { /* no memory */
396 close(cfd); /* nothing can be done for this fd without memory */
397 pool_free2(pool2_task, t);
398 pool_free2(pool2_session, s);
399 return 0;
400 }
401
402 if ((s->rep = pool_alloc2(pool2_buffer)) == NULL) { /* no memory */
403 pool_free2(pool2_buffer, s->req);
404 close(cfd); /* nothing can be done for this fd without memory */
405 pool_free2(pool2_task, t);
406 pool_free2(pool2_session, s);
407 return 0;
408 }
409
410 buffer_init(s->req);
411 buffer_init(s->rep);
412 s->req->rlim += BUFSIZE;
413 s->rep->rlim += BUFSIZE;
414
415 fd_insert(cfd);
416 fdtab[cfd].owner = t;
417 fdtab[cfd].listener = l;
418 fdtab[cfd].state = FD_STREADY;
419 fdtab[cfd].cb[DIR_RD].f = l->proto->read;
420 fdtab[cfd].cb[DIR_RD].b = s->req;
421 fdtab[cfd].cb[DIR_WR].f = l->proto->write;
422 fdtab[cfd].cb[DIR_WR].b = s->rep;
423 fdtab[cfd].peeraddr = (struct sockaddr *)&s->cli_addr;
424 fdtab[cfd].peerlen = sizeof(s->cli_addr);
425 fdtab[cfd].ev = 0;
426
427
428 tv_eternity(&s->req->rex);
429 tv_eternity(&s->req->wex);
430 tv_eternity(&s->req->cex);
431 tv_eternity(&s->rep->rex);
432 tv_eternity(&s->rep->wex);
433
434 tv_eternity(&s->req->wto);
435 tv_eternity(&s->req->cto);
436 tv_eternity(&s->req->rto);
437 tv_eternity(&s->rep->rto);
438 tv_eternity(&s->rep->cto);
439 tv_eternity(&s->rep->wto);
440
441 if (l->timeout)
442 s->req->rto = *l->timeout;
443
444 if (l->timeout)
445 s->rep->wto = *l->timeout;
446
447 tv_eternity(&t->expire);
448 if (l->timeout && tv_isset(l->timeout)) {
449 EV_FD_SET(cfd, DIR_RD);
450 tv_add(&s->req->rex, &now, &s->req->rto);
451 tv_add(&s->rep->wex, &now, &s->rep->wto);
452 t->expire = s->req->rex;
453 }
454
455 task_queue(t);
456 task_wakeup(t);
457
458 l->nbconn++; /* warning! right now, it's up to the handler to decrease this */
459 if (l->nbconn >= l->maxconn) {
460 EV_FD_CLR(l->fd, DIR_RD);
461 l->state = LI_FULL;
462 }
463 actconn++;
464 totalconn++;
465
466 //fprintf(stderr, "accepting from %p => %d conn, %d total, task=%p, cfd=%d, maxfd=%d\n", p, actconn, totalconn, t, cfd, maxfd);
467 } /* end of while (p->feconn < p->maxconn) */
468 //fprintf(stderr,"fct %s:%d\n", __FUNCTION__, __LINE__);
469 return 0;
470}
471
472/*
473 * manages the client FSM and its socket. It returns 1 if a state has changed
474 * (and a resync may be needed), otherwise 0.
475 */
476static int process_uxst_cli(struct session *t)
477{
478 int s = t->srv_state;
479 int c = t->cli_state;
480 struct buffer *req = t->req;
481 struct buffer *rep = t->rep;
482 //fprintf(stderr,"fct %s:%d\n", __FUNCTION__, __LINE__);
483 if (c == CL_STDATA) {
484 /* FIXME: this error handling is partly buggy because we always report
485 * a 'DATA' phase while we don't know if the server was in IDLE, CONN
486 * or HEADER phase. BTW, it's not logical to expire the client while
487 * we're waiting for the server to connect.
488 */
489 /* read or write error */
490 if (rep->flags & BF_WRITE_ERROR || req->flags & BF_READ_ERROR) {
491 buffer_shutr(req);
492 buffer_shutw(rep);
493 fd_delete(t->cli_fd);
494 t->cli_state = CL_STCLOSE;
495 if (!(t->flags & SN_ERR_MASK))
496 t->flags |= SN_ERR_CLICL;
497 if (!(t->flags & SN_FINST_MASK)) {
498 if (t->pend_pos)
499 t->flags |= SN_FINST_Q;
500 else if (s == SV_STCONN)
501 t->flags |= SN_FINST_C;
502 else
503 t->flags |= SN_FINST_D;
504 }
505 return 1;
506 }
507 /* last read, or end of server write */
508 else if (req->flags & BF_READ_NULL || s == SV_STSHUTW || s == SV_STCLOSE) {
509 EV_FD_CLR(t->cli_fd, DIR_RD);
510 buffer_shutr(req);
511 t->cli_state = CL_STSHUTR;
512 return 1;
513 }
514 /* last server read and buffer empty */
515 else if ((s == SV_STSHUTR || s == SV_STCLOSE) && (rep->l == 0)) {
516 EV_FD_CLR(t->cli_fd, DIR_WR);
517 buffer_shutw(rep);
518 shutdown(t->cli_fd, SHUT_WR);
519 /* We must ensure that the read part is still alive when switching
520 * to shutw */
521 EV_FD_SET(t->cli_fd, DIR_RD);
522 tv_add_ifset(&req->rex, &now, &req->rto);
523 t->cli_state = CL_STSHUTW;
524 //fprintf(stderr,"%p:%s(%d), c=%d, s=%d\n", t, __FUNCTION__, __LINE__, t->cli_state, t->cli_state);
525 return 1;
526 }
527 /* read timeout */
528 else if (tv_isle(&req->rex, &now)) {
529 EV_FD_CLR(t->cli_fd, DIR_RD);
530 buffer_shutr(req);
531 t->cli_state = CL_STSHUTR;
532 if (!(t->flags & SN_ERR_MASK))
533 t->flags |= SN_ERR_CLITO;
534 if (!(t->flags & SN_FINST_MASK)) {
535 if (t->pend_pos)
536 t->flags |= SN_FINST_Q;
537 else if (s == SV_STCONN)
538 t->flags |= SN_FINST_C;
539 else
540 t->flags |= SN_FINST_D;
541 }
542 return 1;
543 }
544 /* write timeout */
545 else if (tv_isle(&rep->wex, &now)) {
546 EV_FD_CLR(t->cli_fd, DIR_WR);
547 buffer_shutw(rep);
548 shutdown(t->cli_fd, SHUT_WR);
549 /* We must ensure that the read part is still alive when switching
550 * to shutw */
551 EV_FD_SET(t->cli_fd, DIR_RD);
552 tv_add_ifset(&req->rex, &now, &req->rto);
553
554 t->cli_state = CL_STSHUTW;
555 if (!(t->flags & SN_ERR_MASK))
556 t->flags |= SN_ERR_CLITO;
557 if (!(t->flags & SN_FINST_MASK)) {
558 if (t->pend_pos)
559 t->flags |= SN_FINST_Q;
560 else if (s == SV_STCONN)
561 t->flags |= SN_FINST_C;
562 else
563 t->flags |= SN_FINST_D;
564 }
565 return 1;
566 }
567
568 if (req->l >= req->rlim - req->data) {
569 /* no room to read more data */
570 if (EV_FD_COND_C(t->cli_fd, DIR_RD)) {
571 /* stop reading until we get some space */
572 tv_eternity(&req->rex);
573 }
574 } else {
575 /* there's still some space in the buffer */
576 if (EV_FD_COND_S(t->cli_fd, DIR_RD)) {
577 if (!tv_isset(&req->rto) ||
578 (t->srv_state < SV_STDATA && tv_isset(&req->wto)))
579 /* If the client has no timeout, or if the server not ready yet, and we
580 * know for sure that it can expire, then it's cleaner to disable the
581 * timeout on the client side so that too low values cannot make the
582 * sessions abort too early.
583 */
584 tv_eternity(&req->rex);
585 else
586 tv_add(&req->rex, &now, &req->rto);
587 }
588 }
589
590 if ((rep->l == 0) ||
591 ((s < SV_STDATA) /* FIXME: this may be optimized && (rep->w == rep->h)*/)) {
592 if (EV_FD_COND_C(t->cli_fd, DIR_WR)) {
593 /* stop writing */
594 tv_eternity(&rep->wex);
595 }
596 } else {
597 /* buffer not empty */
598 if (EV_FD_COND_S(t->cli_fd, DIR_WR)) {
599 /* restart writing */
600 if (tv_add_ifset(&rep->wex, &now, &rep->wto)) {
601 /* FIXME: to prevent the client from expiring read timeouts during writes,
602 * we refresh it. */
603 req->rex = rep->wex;
604 }
605 else
606 tv_eternity(&rep->wex);
607 }
608 }
609 return 0; /* other cases change nothing */
610 }
611 else if (c == CL_STSHUTR) {
612 if (rep->flags & BF_WRITE_ERROR) {
613 buffer_shutw(rep);
614 fd_delete(t->cli_fd);
615 t->cli_state = CL_STCLOSE;
616 if (!(t->flags & SN_ERR_MASK))
617 t->flags |= SN_ERR_CLICL;
618 if (!(t->flags & SN_FINST_MASK)) {
619 if (t->pend_pos)
620 t->flags |= SN_FINST_Q;
621 else if (s == SV_STCONN)
622 t->flags |= SN_FINST_C;
623 else
624 t->flags |= SN_FINST_D;
625 }
626 return 1;
627 }
628 else if ((s == SV_STSHUTR || s == SV_STCLOSE) && (rep->l == 0)) {
629 buffer_shutw(rep);
630 fd_delete(t->cli_fd);
631 t->cli_state = CL_STCLOSE;
632 return 1;
633 }
634 else if (tv_isle(&rep->wex, &now)) {
635 buffer_shutw(rep);
636 fd_delete(t->cli_fd);
637 t->cli_state = CL_STCLOSE;
638 if (!(t->flags & SN_ERR_MASK))
639 t->flags |= SN_ERR_CLITO;
640 if (!(t->flags & SN_FINST_MASK)) {
641 if (t->pend_pos)
642 t->flags |= SN_FINST_Q;
643 else if (s == SV_STCONN)
644 t->flags |= SN_FINST_C;
645 else
646 t->flags |= SN_FINST_D;
647 }
648 return 1;
649 }
650
651 if (rep->l == 0) {
652 if (EV_FD_COND_C(t->cli_fd, DIR_WR)) {
653 /* stop writing */
654 tv_eternity(&rep->wex);
655 }
656 } else {
657 /* buffer not empty */
658 if (EV_FD_COND_S(t->cli_fd, DIR_WR)) {
659 /* restart writing */
660 if (!tv_add_ifset(&rep->wex, &now, &rep->wto))
661 tv_eternity(&rep->wex);
662 }
663 }
664 return 0;
665 }
666 else if (c == CL_STSHUTW) {
667 if (req->flags & BF_READ_ERROR) {
668 buffer_shutr(req);
669 fd_delete(t->cli_fd);
670 t->cli_state = CL_STCLOSE;
671 if (!(t->flags & SN_ERR_MASK))
672 t->flags |= SN_ERR_CLICL;
673 if (!(t->flags & SN_FINST_MASK)) {
674 if (t->pend_pos)
675 t->flags |= SN_FINST_Q;
676 else if (s == SV_STCONN)
677 t->flags |= SN_FINST_C;
678 else
679 t->flags |= SN_FINST_D;
680 }
681 return 1;
682 }
683 else if (req->flags & BF_READ_NULL || s == SV_STSHUTW || s == SV_STCLOSE) {
684 buffer_shutr(req);
685 fd_delete(t->cli_fd);
686 t->cli_state = CL_STCLOSE;
687 return 1;
688 }
689 else if (tv_isle(&req->rex, &now)) {
690 buffer_shutr(req);
691 fd_delete(t->cli_fd);
692 t->cli_state = CL_STCLOSE;
693 if (!(t->flags & SN_ERR_MASK))
694 t->flags |= SN_ERR_CLITO;
695 if (!(t->flags & SN_FINST_MASK)) {
696 if (t->pend_pos)
697 t->flags |= SN_FINST_Q;
698 else if (s == SV_STCONN)
699 t->flags |= SN_FINST_C;
700 else
701 t->flags |= SN_FINST_D;
702 }
703 return 1;
704 }
705 else if (req->l >= req->rlim - req->data) {
706 /* no room to read more data */
707
708 /* FIXME-20050705: is it possible for a client to maintain a session
709 * after the timeout by sending more data after it receives a close ?
710 */
711
712 if (EV_FD_COND_C(t->cli_fd, DIR_RD)) {
713 /* stop reading until we get some space */
714 tv_eternity(&req->rex);
715 //fprintf(stderr,"%p:%s(%d), c=%d, s=%d\n", t, __FUNCTION__, __LINE__, t->cli_state, t->cli_state);
716 }
717 } else {
718 /* there's still some space in the buffer */
719 if (EV_FD_COND_S(t->cli_fd, DIR_RD)) {
720 if (!tv_add_ifset(&req->rex, &now, &req->rto))
721 tv_eternity(&req->rex);
722 //fprintf(stderr,"%p:%s(%d), c=%d, s=%d\n", t, __FUNCTION__, __LINE__, t->cli_state, t->cli_state);
723 }
724 }
725 return 0;
726 }
727 else { /* CL_STCLOSE: nothing to do */
728 if ((global.mode & MODE_DEBUG) && (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE))) {
729 int len;
730 len = sprintf(trash, "%08x:%s.clicls[%04x:%04x]\n", t->uniq_id, t->be?t->be->id:"",
731 (unsigned short)t->cli_fd, (unsigned short)t->srv_fd);
732 write(1, trash, len);
733 }
734 return 0;
735 }
736 return 0;
737}
738
739#if 0
740 /* FIXME! This part has not been completely converted yet, and it may
741 * still be very specific to TCPv4 ! Also, it relies on some parameters
742 * such as conn_retries which are not set upon accept().
743 */
744/*
745 * Manages the server FSM and its socket. It returns 1 if a state has changed
746 * (and a resync may be needed), otherwise 0.
747 */
748static int process_uxst_srv(struct session *t)
749{
750 int s = t->srv_state;
751 int c = t->cli_state;
752 struct buffer *req = t->req;
753 struct buffer *rep = t->rep;
754 int conn_err;
755
756 if (s == SV_STIDLE) {
757 if (c == CL_STCLOSE || c == CL_STSHUTW ||
758 (c == CL_STSHUTR &&
759 (t->req->l == 0 || t->be->options & PR_O_ABRT_CLOSE))) { /* give up */
760 tv_eternity(&req->cex);
761 if (t->pend_pos)
762 t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now);
763 srv_close_with_err(t, SN_ERR_CLICL, t->pend_pos ? SN_FINST_Q : SN_FINST_C);
764 return 1;
765 }
766 else {
767 /* FIXME: reimplement the TARPIT check here */
768
769 /* Right now, we will need to create a connection to the server.
770 * We might already have tried, and got a connection pending, in
771 * which case we will not do anything till it's pending. It's up
772 * to any other session to release it and wake us up again.
773 */
774 if (t->pend_pos) {
775 if (!tv_isle(&req->cex, &now))
776 return 0;
777 else {
778 /* we've been waiting too long here */
779 tv_eternity(&req->cex);
780 t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now);
781 srv_close_with_err(t, SN_ERR_SRVTO, SN_FINST_Q);
782 if (t->srv)
783 t->srv->failed_conns++;
784 if (t->fe)
785 t->fe->failed_conns++;
786 return 1;
787 }
788 }
789
790 do {
791 /* first, get a connection */
792 if (srv_redispatch_connect(t))
793 return t->srv_state != SV_STIDLE;
794
795 /* try to (re-)connect to the server, and fail if we expire the
796 * number of retries.
797 */
798 if (srv_retryable_connect(t)) {
799 t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now);
800 return t->srv_state != SV_STIDLE;
801 }
802 } while (1);
803 }
804 }
805 else if (s == SV_STCONN) { /* connection in progress */
806 if (c == CL_STCLOSE || c == CL_STSHUTW ||
807 (c == CL_STSHUTR &&
808 ((t->req->l == 0 && !(req->flags & BF_WRITE_STATUS)) ||
809 t->be->options & PR_O_ABRT_CLOSE))) { /* give up */
810 tv_eternity(&req->cex);
811 fd_delete(t->srv_fd);
812 if (t->srv)
813 t->srv->cur_sess--;
814
815 srv_close_with_err(t, SN_ERR_CLICL, SN_FINST_C);
816 return 1;
817 }
818 if (!(req->flags & BF_WRITE_STATUS) && !tv_isle(&req->cex, &now)) {
819 //fprintf(stderr,"1: c=%d, s=%d, now=%d.%06d, exp=%d.%06d\n", c, s, now.tv_sec, now.tv_usec, req->cex.tv_sec, req->cex.tv_usec);
820 return 0; /* nothing changed */
821 }
822 else if (!(req->flags & BF_WRITE_STATUS) || (req->flags & BF_WRITE_ERROR)) {
823 /* timeout, asynchronous connect error or first write error */
824 //fprintf(stderr,"2: c=%d, s=%d\n", c, s);
825
826 fd_delete(t->srv_fd);
827 if (t->srv)
828 t->srv->cur_sess--;
829
830 if (!(req->flags & BF_WRITE_STATUS))
831 conn_err = SN_ERR_SRVTO; // it was a connect timeout.
832 else
833 conn_err = SN_ERR_SRVCL; // it was an asynchronous connect error.
834
835 /* ensure that we have enough retries left */
836 if (srv_count_retry_down(t, conn_err))
837 return 1;
838
839 if (t->srv && t->conn_retries == 0 && t->be->options & PR_O_REDISP) {
840 /* We're on our last chance, and the REDISP option was specified.
841 * We will ignore cookie and force to balance or use the dispatcher.
842 */
843 /* let's try to offer this slot to anybody */
844 if (may_dequeue_tasks(t->srv, t->be))
845 task_wakeup(t->srv->queue_mgt);
846
847 if (t->srv)
848 t->srv->failed_conns++;
849 t->be->failed_conns++;
850
851 t->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET);
852 t->srv = NULL; /* it's left to the dispatcher to choose a server */
853
854 /* first, get a connection */
855 if (srv_redispatch_connect(t))
856 return t->srv_state != SV_STIDLE;
857 }
858
859 do {
860 /* Now we will try to either reconnect to the same server or
861 * connect to another server. If the connection gets queued
862 * because all servers are saturated, then we will go back to
863 * the SV_STIDLE state.
864 */
865 if (srv_retryable_connect(t)) {
866 t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now);
867 return t->srv_state != SV_STCONN;
868 }
869
870 /* we need to redispatch the connection to another server */
871 if (srv_redispatch_connect(t))
872 return t->srv_state != SV_STCONN;
873 } while (1);
874 }
875 else { /* no error or write 0 */
876 t->logs.t_connect = tv_ms_elapsed(&t->logs.tv_accept, &now);
877
878 //fprintf(stderr,"3: c=%d, s=%d\n", c, s);
879 if (req->l == 0) /* nothing to write */ {
880 EV_FD_CLR(t->srv_fd, DIR_WR);
881 tv_eternity(&req->wex);
882 } else /* need the right to write */ {
883 EV_FD_SET(t->srv_fd, DIR_WR);
884 if (tv_add_ifset(&req->wex, &now, &req->wto)) {
885 /* FIXME: to prevent the server from expiring read timeouts during writes,
886 * we refresh it. */
887 rep->rex = req->wex;
888 }
889 else
890 tv_eternity(&req->wex);
891 }
892
893 EV_FD_SET(t->srv_fd, DIR_RD);
894 if (!tv_add_ifset(&rep->rex, &now, &rep->rto))
895 tv_eternity(&rep->rex);
896
897 t->srv_state = SV_STDATA;
898 if (t->srv)
899 t->srv->cum_sess++;
900 rep->rlim = rep->data + BUFSIZE; /* no rewrite needed */
901
902 /* if the user wants to log as soon as possible, without counting
903 bytes from the server, then this is the right moment. */
904 if (t->fe && t->fe->to_log && !(t->logs.logwait & LW_BYTES)) {
905 t->logs.t_close = t->logs.t_connect; /* to get a valid end date */
906 //uxst_sess_log(t);
907 }
908 tv_eternity(&req->cex);
909 return 1;
910 }
911 }
912 else if (s == SV_STDATA) {
913 /* read or write error */
914 if (req->flags & BF_WRITE_ERROR || rep->flags & BF_READ_ERROR) {
915 buffer_shutr(rep);
916 buffer_shutw(req);
917 fd_delete(t->srv_fd);
918 if (t->srv) {
919 t->srv->cur_sess--;
920 t->srv->failed_resp++;
921 }
922 t->be->failed_resp++;
923 t->srv_state = SV_STCLOSE;
924 if (!(t->flags & SN_ERR_MASK))
925 t->flags |= SN_ERR_SRVCL;
926 if (!(t->flags & SN_FINST_MASK))
927 t->flags |= SN_FINST_D;
928 /* We used to have a free connection slot. Since we'll never use it,
929 * we have to inform the server that it may be used by another session.
930 */
931 if (may_dequeue_tasks(t->srv, t->be))
932 task_wakeup(t->srv->queue_mgt);
933
934 return 1;
935 }
936 /* last read, or end of client write */
937 else if (rep->flags & BF_READ_NULL || c == CL_STSHUTW || c == CL_STCLOSE) {
938 EV_FD_CLR(t->srv_fd, DIR_RD);
939 buffer_shutr(rep);
940 t->srv_state = SV_STSHUTR;
941 //fprintf(stderr,"%p:%s(%d), c=%d, s=%d\n", t, __FUNCTION__, __LINE__, t->cli_state, t->cli_state);
942 return 1;
943 }
944 /* end of client read and no more data to send */
945 else if ((c == CL_STSHUTR || c == CL_STCLOSE) && (req->l == 0)) {
946 EV_FD_CLR(t->srv_fd, DIR_WR);
947 buffer_shutw(req);
948 shutdown(t->srv_fd, SHUT_WR);
949 /* We must ensure that the read part is still alive when switching
950 * to shutw */
951 EV_FD_SET(t->srv_fd, DIR_RD);
952 tv_add_ifset(&rep->rex, &now, &rep->rto);
953
954 t->srv_state = SV_STSHUTW;
955 return 1;
956 }
957 /* read timeout */
958 else if (tv_isle(&rep->rex, &now)) {
959 EV_FD_CLR(t->srv_fd, DIR_RD);
960 buffer_shutr(rep);
961 t->srv_state = SV_STSHUTR;
962 if (!(t->flags & SN_ERR_MASK))
963 t->flags |= SN_ERR_SRVTO;
964 if (!(t->flags & SN_FINST_MASK))
965 t->flags |= SN_FINST_D;
966 return 1;
967 }
968 /* write timeout */
969 else if (tv_isle(&req->wex, &now)) {
970 EV_FD_CLR(t->srv_fd, DIR_WR);
971 buffer_shutw(req);
972 shutdown(t->srv_fd, SHUT_WR);
973 /* We must ensure that the read part is still alive when switching
974 * to shutw */
975 EV_FD_SET(t->srv_fd, DIR_RD);
976 tv_add_ifset(&rep->rex, &now, &rep->rto);
977 t->srv_state = SV_STSHUTW;
978 if (!(t->flags & SN_ERR_MASK))
979 t->flags |= SN_ERR_SRVTO;
980 if (!(t->flags & SN_FINST_MASK))
981 t->flags |= SN_FINST_D;
982 return 1;
983 }
984
985 /* recompute request time-outs */
986 if (req->l == 0) {
987 if (EV_FD_COND_C(t->srv_fd, DIR_WR)) {
988 /* stop writing */
989 tv_eternity(&req->wex);
990 }
991 }
992 else { /* buffer not empty, there are still data to be transferred */
993 if (EV_FD_COND_S(t->srv_fd, DIR_WR)) {
994 /* restart writing */
995 if (tv_add_ifset(&req->wex, &now, &req->wto)) {
996 /* FIXME: to prevent the server from expiring read timeouts during writes,
997 * we refresh it. */
998 rep->rex = req->wex;
999 }
1000 else
1001 tv_eternity(&req->wex);
1002 }
1003 }
1004
1005 /* recompute response time-outs */
1006 if (rep->l == BUFSIZE) { /* no room to read more data */
1007 if (EV_FD_COND_C(t->srv_fd, DIR_RD)) {
1008 tv_eternity(&rep->rex);
1009 }
1010 }
1011 else {
1012 if (EV_FD_COND_S(t->srv_fd, DIR_RD)) {
1013 if (!tv_add_ifset(&rep->rex, &now, &rep->rto))
1014 tv_eternity(&rep->rex);
1015 }
1016 }
1017
1018 return 0; /* other cases change nothing */
1019 }
1020 else if (s == SV_STSHUTR) {
1021 if (req->flags & BF_WRITE_ERROR) {
1022 //EV_FD_CLR(t->srv_fd, DIR_WR);
1023 buffer_shutw(req);
1024 fd_delete(t->srv_fd);
1025 if (t->srv) {
1026 t->srv->cur_sess--;
1027 t->srv->failed_resp++;
1028 }
1029 t->be->failed_resp++;
1030 //close(t->srv_fd);
1031 t->srv_state = SV_STCLOSE;
1032 if (!(t->flags & SN_ERR_MASK))
1033 t->flags |= SN_ERR_SRVCL;
1034 if (!(t->flags & SN_FINST_MASK))
1035 t->flags |= SN_FINST_D;
1036 /* We used to have a free connection slot. Since we'll never use it,
1037 * we have to inform the server that it may be used by another session.
1038 */
1039 if (may_dequeue_tasks(t->srv, t->be))
1040 task_wakeup(t->srv->queue_mgt);
1041
1042 return 1;
1043 }
1044 else if ((c == CL_STSHUTR || c == CL_STCLOSE) && (req->l == 0)) {
1045 //EV_FD_CLR(t->srv_fd, DIR_WR);
1046 buffer_shutw(req);
1047 fd_delete(t->srv_fd);
1048 if (t->srv)
1049 t->srv->cur_sess--;
1050 //close(t->srv_fd);
1051 t->srv_state = SV_STCLOSE;
1052 /* We used to have a free connection slot. Since we'll never use it,
1053 * we have to inform the server that it may be used by another session.
1054 */
1055 if (may_dequeue_tasks(t->srv, t->be))
1056 task_wakeup(t->srv->queue_mgt);
1057
1058 return 1;
1059 }
1060 else if (tv_isle(&req->wex, &now)) {
1061 //EV_FD_CLR(t->srv_fd, DIR_WR);
1062 buffer_shutw(req);
1063 fd_delete(t->srv_fd);
1064 if (t->srv)
1065 t->srv->cur_sess--;
1066 //close(t->srv_fd);
1067 t->srv_state = SV_STCLOSE;
1068 if (!(t->flags & SN_ERR_MASK))
1069 t->flags |= SN_ERR_SRVTO;
1070 if (!(t->flags & SN_FINST_MASK))
1071 t->flags |= SN_FINST_D;
1072 /* We used to have a free connection slot. Since we'll never use it,
1073 * we have to inform the server that it may be used by another session.
1074 */
1075 if (may_dequeue_tasks(t->srv, t->be))
1076 task_wakeup(t->srv->queue_mgt);
1077
1078 return 1;
1079 }
1080 else if (req->l == 0) {
1081 if (EV_FD_COND_C(t->srv_fd, DIR_WR)) {
1082 /* stop writing */
1083 tv_eternity(&req->wex);
1084 }
1085 }
1086 else { /* buffer not empty */
1087 if (EV_FD_COND_S(t->srv_fd, DIR_WR)) {
1088 /* restart writing */
1089 if (!tv_add_ifset(&req->wex, &now, &req->wto))
1090 tv_eternity(&req->wex);
1091 }
1092 }
1093 return 0;
1094 }
1095 else if (s == SV_STSHUTW) {
1096 if (rep->flags & BF_READ_ERROR) {
1097 //EV_FD_CLR(t->srv_fd, DIR_RD);
1098 buffer_shutr(rep);
1099 fd_delete(t->srv_fd);
1100 if (t->srv) {
1101 t->srv->cur_sess--;
1102 t->srv->failed_resp++;
1103 }
1104 t->be->failed_resp++;
1105 //close(t->srv_fd);
1106 t->srv_state = SV_STCLOSE;
1107 if (!(t->flags & SN_ERR_MASK))
1108 t->flags |= SN_ERR_SRVCL;
1109 if (!(t->flags & SN_FINST_MASK))
1110 t->flags |= SN_FINST_D;
1111 /* We used to have a free connection slot. Since we'll never use it,
1112 * we have to inform the server that it may be used by another session.
1113 */
1114 if (may_dequeue_tasks(t->srv, t->be))
1115 task_wakeup(t->srv->queue_mgt);
1116
1117 return 1;
1118 }
1119 else if (rep->flags & BF_READ_NULL || c == CL_STSHUTW || c == CL_STCLOSE) {
1120 //EV_FD_CLR(t->srv_fd, DIR_RD);
1121 buffer_shutr(rep);
1122 fd_delete(t->srv_fd);
1123 if (t->srv)
1124 t->srv->cur_sess--;
1125 //close(t->srv_fd);
1126 t->srv_state = SV_STCLOSE;
1127 /* We used to have a free connection slot. Since we'll never use it,
1128 * we have to inform the server that it may be used by another session.
1129 */
1130 if (may_dequeue_tasks(t->srv, t->be))
1131 task_wakeup(t->srv->queue_mgt);
1132
1133 return 1;
1134 }
1135 else if (tv_isle(&rep->rex, &now)) {
1136 //EV_FD_CLR(t->srv_fd, DIR_RD);
1137 buffer_shutr(rep);
1138 fd_delete(t->srv_fd);
1139 if (t->srv)
1140 t->srv->cur_sess--;
1141 //close(t->srv_fd);
1142 t->srv_state = SV_STCLOSE;
1143 if (!(t->flags & SN_ERR_MASK))
1144 t->flags |= SN_ERR_SRVTO;
1145 if (!(t->flags & SN_FINST_MASK))
1146 t->flags |= SN_FINST_D;
1147 /* We used to have a free connection slot. Since we'll never use it,
1148 * we have to inform the server that it may be used by another session.
1149 */
1150 if (may_dequeue_tasks(t->srv, t->be))
1151 task_wakeup(t->srv->queue_mgt);
1152
1153 return 1;
1154 }
1155 else if (rep->l == BUFSIZE) { /* no room to read more data */
1156 if (EV_FD_COND_C(t->srv_fd, DIR_RD)) {
1157 tv_eternity(&rep->rex);
1158 }
1159 }
1160 else {
1161 if (EV_FD_COND_S(t->srv_fd, DIR_RD)) {
1162 if (!tv_add_ifset(&rep->rex, &now, &rep->rto))
1163 tv_eternity(&rep->rex);
1164 }
1165 }
1166 return 0;
1167 }
1168 else { /* SV_STCLOSE : nothing to do */
1169 if ((global.mode & MODE_DEBUG) && (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE))) {
1170 int len;
1171 len = sprintf(trash, "%08x:%s.srvcls[%04x:%04x]\n",
1172 t->uniq_id, t->be->id, (unsigned short)t->cli_fd, (unsigned short)t->srv_fd);
1173 write(1, trash, len);
1174 }
1175 return 0;
1176 }
1177 return 0;
1178}
1179
1180/* Processes the client and server jobs of a session task, then
1181 * puts it back to the wait queue in a clean state, or
1182 * cleans up its resources if it must be deleted. Returns
1183 * the time the task accepts to wait, or TIME_ETERNITY for
1184 * infinity.
1185 */
1186void process_uxst_session(struct task *t, struct timeval *next)
1187{
1188 struct session *s = t->context;
1189 int fsm_resync = 0;
1190
1191 do {
1192 fsm_resync = 0;
1193 fsm_resync |= process_uxst_cli(s);
1194 if (s->srv_state == SV_STIDLE) {
1195 if (s->cli_state == CL_STCLOSE || s->cli_state == CL_STSHUTW) {
1196 s->srv_state = SV_STCLOSE;
1197 fsm_resync |= 1;
1198 continue;
1199 }
1200 if (s->cli_state == CL_STSHUTR ||
1201 (s->req->l >= s->req->rlim - s->req->data)) {
1202 if (s->req->l == 0) {
1203 s->srv_state = SV_STCLOSE;
1204 fsm_resync |= 1;
1205 continue;
1206 }
1207 /* OK we have some remaining data to process */
1208 /* Just as an exercice, we copy the req into the resp,
1209 * and flush the req.
1210 */
1211 memcpy(s->rep->data, s->req->data, sizeof(s->rep->data));
1212 s->rep->l = s->req->l;
1213 s->rep->rlim = s->rep->data + BUFSIZE;
1214 s->rep->w = s->rep->data;
1215 s->rep->lr = s->rep->r = s->rep->data + s->rep->l;
1216
1217 s->req->l = 0;
1218 s->srv_state = SV_STCLOSE;
1219
1220 fsm_resync |= 1;
1221 continue;
1222 }
1223 }
1224 } while (fsm_resync);
1225
1226 if (likely(s->cli_state != CL_STCLOSE || s->srv_state != SV_STCLOSE)) {
1227 s->req->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
1228 s->rep->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
1229
1230 t->expire = s->req->rex;
1231 tv_min(&t->expire, &s->req->rex, &s->req->wex);
1232 tv_bound(&t->expire, &s->req->cex);
1233 tv_bound(&t->expire, &s->rep->rex);
1234 tv_bound(&t->expire, &s->rep->wex);
1235
1236 /* restore t to its place in the task list */
1237 task_queue(t);
1238
1239 *next = t->expire;
1240 return; /* nothing more to do */
1241 }
1242
1243 if (s->fe)
1244 s->fe->feconn--;
1245 if (s->be && (s->flags & SN_BE_ASSIGNED))
1246 s->be->beconn--;
1247 actconn--;
1248
1249 if (unlikely((global.mode & MODE_DEBUG) &&
1250 (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
1251 int len;
1252 len = sprintf(trash, "%08x:%s.closed[%04x:%04x]\n",
1253 s->uniq_id, s->be->id,
1254 (unsigned short)s->cli_fd, (unsigned short)s->srv_fd);
1255 write(1, trash, len);
1256 }
1257
1258 s->logs.t_close = tv_ms_elapsed(&s->logs.tv_accept, &now);
1259 if (s->req != NULL)
1260 s->logs.bytes_in = s->req->total;
1261 if (s->rep != NULL)
1262 s->logs.bytes_out = s->rep->total;
1263
1264 if (s->fe) {
1265 s->fe->bytes_in += s->logs.bytes_in;
1266 s->fe->bytes_out += s->logs.bytes_out;
1267 }
1268 if (s->be && (s->be != s->fe)) {
1269 s->be->bytes_in += s->logs.bytes_in;
1270 s->be->bytes_out += s->logs.bytes_out;
1271 }
1272 if (s->srv) {
1273 s->srv->bytes_in += s->logs.bytes_in;
1274 s->srv->bytes_out += s->logs.bytes_out;
1275 }
1276
1277 /* let's do a final log if we need it */
1278 if (s->logs.logwait &&
1279 !(s->flags & SN_MONITOR) &&
1280 (s->req->total || !(s->fe && s->fe->options & PR_O_NULLNOLOG))) {
1281 //uxst_sess_log(s);
1282 }
1283
1284 /* the task MUST not be in the run queue anymore */
1285 task_delete(t);
1286 session_free(s);
1287 task_free(t);
1288 tv_eternity(next);
1289}
1290#endif /* not converted */
1291
1292
1293/* Processes data exchanges on the statistics socket. The client processing
1294 * is called and the task is put back in the wait queue or it is cleared.
1295 * In order to ease the transition, we simply simulate the server status
1296 * for now. It only knows states SV_STIDLE and SV_STCLOSE. Returns in <next>
1297 * the task's expiration date.
1298 */
1299void process_uxst_stats(struct task *t, struct timeval *next)
1300{
1301 struct session *s = t->context;
1302 struct listener *listener;
1303 int fsm_resync = 0;
1304
1305 do {
1306 //fprintf(stderr,"fct %s:%d\n", __FUNCTION__, __LINE__);
1307 fsm_resync = 0;
1308 fsm_resync |= process_uxst_cli(s);
1309 if (s->srv_state == SV_STIDLE) {
1310 if (s->cli_state == CL_STCLOSE || s->cli_state == CL_STSHUTW) {
1311 s->srv_state = SV_STCLOSE;
1312 fsm_resync |= 1;
1313 continue;
1314 }
1315 else if (s->cli_state == CL_STSHUTR ||
1316 (s->req->l >= s->req->rlim - s->req->data)) {
1317 if (s->req->l == 0) {
1318 s->srv_state = SV_STCLOSE;
1319 fsm_resync |= 1;
1320 continue;
1321 }
1322 /* OK we have some remaining data to process. Just for the
1323 * sake of an exercice, we copy the req into the resp,
1324 * and flush the req. This produces a simple echo function.
1325 */
1326 memcpy(s->rep->data, s->req->data, sizeof(s->rep->data));
1327 s->rep->l = s->req->l;
1328 s->rep->rlim = s->rep->data + BUFSIZE;
1329 s->rep->w = s->rep->data;
1330 s->rep->lr = s->rep->r = s->rep->data + s->rep->l;
1331
1332 s->req->l = 0;
1333 s->srv_state = SV_STCLOSE;
1334
1335 fsm_resync |= 1;
1336 continue;
1337 }
1338 }
1339 } while (fsm_resync);
1340
1341 if (likely(s->cli_state != CL_STCLOSE || s->srv_state != SV_STCLOSE)) {
1342 s->req->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
1343 s->rep->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
1344
1345 t->expire = s->req->rex;
1346 tv_min(&t->expire, &s->req->rex, &s->req->wex);
1347 tv_bound(&t->expire, &s->req->cex);
1348 tv_bound(&t->expire, &s->rep->rex);
1349 tv_bound(&t->expire, &s->rep->wex);
1350
1351 /* restore t to its place in the task list */
1352 task_queue(t);
1353
1354 *next = t->expire;
1355 return; /* nothing more to do */
1356 }
1357
1358 actconn--;
1359 listener = fdtab[s->cli_fd].listener;
1360 if (listener) {
1361 listener->nbconn--;
1362 if (listener->state == LI_FULL &&
1363 listener->nbconn < listener->maxconn) {
1364 /* we should reactivate the listener */
1365 EV_FD_SET(listener->fd, DIR_RD);
1366 listener->state = LI_READY;
1367 }
1368 }
1369
1370 /* the task MUST not be in the run queue anymore */
1371 task_delete(t);
1372 session_free(s);
1373 task_free(t);
1374 tv_eternity(next);
1375}
1376
1377/* Note: must not be declared <const> as its list will be overwritten */
1378static struct protocol proto_unix = {
1379 .name = "unix_stream",
1380 .sock_domain = PF_UNIX,
1381 .sock_type = SOCK_STREAM,
1382 .sock_prot = 0,
1383 .sock_family = AF_UNIX,
1384 .read = &stream_sock_read,
1385 .write = &stream_sock_write,
1386 .bind_all = uxst_bind_listeners,
1387 .unbind_all = uxst_unbind_listeners,
1388 .enable_all = uxst_enable_listeners,
1389 .listeners = LIST_HEAD_INIT(proto_unix.listeners),
1390 .nb_listeners = 0,
1391};
1392
1393/* Adds listener to the list of unix stream listeners */
1394void uxst_add_listener(struct listener *listener)
1395{
1396 listener->proto = &proto_unix;
1397 LIST_ADDQ(&proto_unix.listeners, &listener->proto_list);
1398 proto_unix.nb_listeners++;
1399}
1400
1401__attribute__((constructor))
1402static void __uxst_protocol_init(void)
1403{
1404 protocol_register(&proto_unix);
1405 //tv_eternity(&global.unix_fe.clitimeout);
1406}
1407
1408
1409/*
1410 * Local variables:
1411 * c-indent-level: 8
1412 * c-basic-offset: 8
1413 * End:
1414 */