[MAJOR] create proto_tcp and move initialization of proxy listeners
Proxy listeners were very special and not very easy to manipulate.
A proto_tcp file has been created with all that is required to
manage TCPv4/TCPv6 as raw protocols, and provide generic listeners.
The code of start_proxies() and maintain_proxies() now looks less
like spaghetti. Also, event_accept() will need serious rework in
order to use more of the information provided by the listener.
diff --git a/src/cfgparse.c b/src/cfgparse.c
index 0df66da..aaafe8c 100644
--- a/src/cfgparse.c
+++ b/src/cfgparse.c
@@ -44,6 +44,9 @@
#include <proto/dumpstats.h>
#include <proto/httperr.h>
#include <proto/log.h>
+#include <proto/protocols.h>
+#include <proto/proto_tcp.h>
+#include <proto/proto_http.h>
#include <proto/proxy.h>
#include <proto/server.h>
#include <proto/task.h>
@@ -208,11 +211,16 @@
l->fd = -1;
l->addr = ss;
- if (ss.ss_family == AF_INET6)
+ l->state = LI_INIT;
+
+ if (ss.ss_family == AF_INET6) {
((struct sockaddr_in6 *)(&l->addr))->sin6_port = htons(port);
- else
+ tcpv6_add_listener(l);
+ } else {
((struct sockaddr_in *)(&l->addr))->sin_port = htons(port);
-
+ tcpv4_add_listener(l);
+ }
+ listeners++;
} /* end for(port) */
} /* end while(next) */
free(dupstr);
@@ -2444,6 +2452,7 @@
while (curproxy != NULL) {
struct switching_rule *rule;
+ struct listener *listener;
if (curproxy->state == PR_STSTOPPED) {
curproxy = curproxy->next;
@@ -2726,6 +2735,19 @@
newsrv = newsrv->next;
}
+ /* adjust this proxy's listeners */
+ listener = curproxy->listen;
+ while (listener) {
+ if (curproxy->options & PR_O_TCP_NOLING)
+ listener->options |= LI_O_NOLINGER;
+ listener->maxconn = curproxy->maxconn;
+ listener->timeout = &curproxy->clitimeout;
+ listener->accept = event_accept;
+ listener->private = curproxy;
+
+ listener = listener->next;
+ }
+
curproxy = curproxy->next;
}
if (cfgerr > 0) {
diff --git a/src/client.c b/src/client.c
index c813cba..2ee41a6 100644
--- a/src/client.c
+++ b/src/client.c
@@ -68,7 +68,8 @@
* It returns 0.
*/
int event_accept(int fd) {
- struct proxy *p = (struct proxy *)fdtab[fd].owner;
+ struct listener *l = (struct listener *)fdtab[fd].owner;
+ struct proxy *p = (struct proxy *)l->private; /* attached frontend */
struct session *s;
struct http_txn *txn;
struct task *t;
@@ -362,10 +363,11 @@
fd_insert(cfd);
fdtab[cfd].owner = t;
+ fdtab[cfd].listener = l;
fdtab[cfd].state = FD_STREADY;
- fdtab[cfd].cb[DIR_RD].f = &stream_sock_read;
+ fdtab[cfd].cb[DIR_RD].f = l->proto->read;
fdtab[cfd].cb[DIR_RD].b = s->req;
- fdtab[cfd].cb[DIR_WR].f = &stream_sock_write;
+ fdtab[cfd].cb[DIR_WR].f = l->proto->write;
fdtab[cfd].cb[DIR_WR].b = s->rep;
fdtab[cfd].peeraddr = (struct sockaddr *)&s->cli_addr;
fdtab[cfd].peerlen = sizeof(s->cli_addr);
diff --git a/src/proto_tcp.c b/src/proto_tcp.c
new file mode 100644
index 0000000..593096d
--- /dev/null
+++ b/src/proto_tcp.c
@@ -0,0 +1,255 @@
+/*
+ * AF_INET/AF_INET6 SOCK_STREAM protocol layer (tcp)
+ *
+ * Copyright 2000-2007 Willy Tarreau <w@1wt.eu>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version
+ * 2 of the License, or (at your option) any later version.
+ *
+ */
+
+#include <ctype.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+
+#include <sys/param.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/un.h>
+
+#include <common/compat.h>
+#include <common/config.h>
+#include <common/debug.h>
+#include <common/errors.h>
+#include <common/memory.h>
+#include <common/mini-clist.h>
+#include <common/standard.h>
+#include <common/time.h>
+#include <common/version.h>
+
+#include <types/acl.h>
+#include <types/client.h>
+#include <types/global.h>
+#include <types/polling.h>
+#include <types/proxy.h>
+#include <types/server.h>
+
+#include <proto/acl.h>
+#include <proto/backend.h>
+#include <proto/buffers.h>
+#include <proto/fd.h>
+#include <proto/protocols.h>
+#include <proto/proto_tcp.h>
+#include <proto/queue.h>
+#include <proto/senddata.h>
+#include <proto/session.h>
+#include <proto/stream_sock.h>
+#include <proto/task.h>
+
+/* forward declaration: both protocol definitions below reference it */
+static int tcp_bind_listeners(struct protocol *proto);
+/* TCP over IPv4. Not <const>: its listener list and count change when listeners are attached */
+static struct protocol proto_tcpv4 = {
+	.name         = "tcpv4",
+	.sock_family  = AF_INET,
+	.sock_domain  = AF_INET,
+	.sock_type    = SOCK_STREAM,
+	.sock_prot    = IPPROTO_TCP,
+	.sock_addrlen = sizeof(struct sockaddr_in),
+	.l3_addrlen   = 32/8,
+	.bind_all     = tcp_bind_listeners,
+	.unbind_all   = unbind_all_listeners,
+	.enable_all   = enable_all_listeners,
+	.read         = &stream_sock_read,
+	.write        = &stream_sock_write,
+	.listeners    = LIST_HEAD_INIT(proto_tcpv4.listeners),
+	.nb_listeners = 0,
+};
+
+/* TCP over IPv6. Not <const>: its listener list and count change when listeners are attached */
+static struct protocol proto_tcpv6 = {
+	.name         = "tcpv6",
+	.sock_family  = AF_INET6,
+	.sock_domain  = AF_INET6,
+	.sock_type    = SOCK_STREAM,
+	.sock_prot    = IPPROTO_TCP,
+	.sock_addrlen = sizeof(struct sockaddr_in6),
+	.l3_addrlen   = 128/8,
+	.bind_all     = tcp_bind_listeners,
+	.unbind_all   = unbind_all_listeners,
+	.enable_all   = enable_all_listeners,
+	.read         = &stream_sock_read,
+	.write        = &stream_sock_write,
+	.listeners    = LIST_HEAD_INIT(proto_tcpv6.listeners),
+	.nb_listeners = 0,
+};
+
+
+/* This function tries to bind a TCPv4/v6 listener. It may return a warning or
+ * an error message in <errmsg>, a buffer of <errlen> bytes (including the
+ * trailing '\0'). The return value is composed from ERR_ABORT, ERR_WARN,
+ * ERR_ALERT, ERR_RETRYABLE and ERR_FATAL. ERR_NONE indicates that everything
+ * was alright and that no message was returned. ERR_RETRYABLE means that an
+ * error occurred but that it may vanish after a retry (eg: port in use), and
+ * ERR_FATAL indicates a non-fixable error. ERR_WARN and ERR_ALERT do not alter
+ * the meaning of the error, but just indicate that a message is present which
+ * should be displayed with the respective level. Last, ERR_ABORT indicates
+ * that it's pointless to try to start other listeners. No error message is
+ * returned if <errmsg> is NULL or <errlen> is 0. A listener which is not in
+ * state LI_ASSIGNED is left untouched, otherwise it becomes LI_LISTEN on success.
+ */
+int tcp_bind_listener(struct listener *listener, char *errmsg, int errlen)
+{
+	int fd, err;
+	const char *msg = NULL;
+
+	/* ensure we never return garbage */
+	if (errmsg && errlen)
+		*errmsg = 0;
+
+	if (listener->state != LI_ASSIGNED)
+		return ERR_NONE; /* already bound */
+
+	err = ERR_NONE;
+
+	if ((fd = socket(listener->addr.ss_family, SOCK_STREAM, IPPROTO_TCP)) == -1) {
+		err |= ERR_RETRYABLE | ERR_ALERT;
+		msg = "cannot create listening socket";
+		goto tcp_return;
+	}
+
+	if (fd >= global.maxsock) {
+		err |= ERR_FATAL | ERR_ABORT | ERR_ALERT;
+		msg = "not enough free sockets (raise '-n' parameter)";
+		goto tcp_close_return;
+	}
+
+	if ((fcntl(fd, F_SETFL, O_NONBLOCK) == -1) ||
+	    (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
+			(char *) &one, sizeof(one)) == -1)) {
+		err |= ERR_FATAL | ERR_ALERT;
+		msg = "cannot make socket non-blocking";
+		goto tcp_close_return;
+	}
+
+	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof(one)) == -1) {
+		/* not fatal but should be reported */
+		msg = "cannot do so_reuseaddr";
+		err |= ERR_ALERT;
+	}
+
+	if (listener->options & LI_O_NOLINGER)
+		setsockopt(fd, SOL_SOCKET, SO_LINGER, (struct linger *) &nolinger, sizeof(struct linger));
+
+#ifdef SO_REUSEPORT
+	/* OpenBSD supports this. As it's present in old libc versions of Linux,
+	 * it might return an error that we will silently ignore.
+	 */
+	setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (char *) &one, sizeof(one));
+#endif
+	if (bind(fd, (struct sockaddr *)&listener->addr, listener->proto->sock_addrlen) == -1) {
+		err |= ERR_RETRYABLE | ERR_ALERT;
+		msg = "cannot bind socket";
+		goto tcp_close_return;
+	}
+
+	if (listen(fd, listener->maxconn) == -1) {
+		err |= ERR_RETRYABLE | ERR_ALERT;
+		msg = "cannot listen to socket";
+		goto tcp_close_return;
+	}
+
+	/* the socket is ready */
+	listener->fd = fd;
+	listener->state = LI_LISTEN;
+
+	/* the function for the accept() event */
+	fd_insert(fd);
+	fdtab[fd].cb[DIR_RD].f = listener->accept;
+	fdtab[fd].cb[DIR_WR].f = NULL; /* never called */
+	fdtab[fd].cb[DIR_RD].b = fdtab[fd].cb[DIR_WR].b = NULL;
+	fdtab[fd].owner = (struct task *)listener; /* reference the listener instead of a task */
+	fdtab[fd].state = FD_STLISTEN;
+	fdtab[fd].peeraddr = NULL;
+	fdtab[fd].peerlen = 0;
+	fdtab[fd].listener = NULL;
+	fdtab[fd].ev = 0;
+ tcp_return:
+	if (msg && errmsg && errlen) /* same guard as on entry: never write through NULL */
+		strlcpy2(errmsg, msg, errlen);
+	return err;
+
+ tcp_close_return:
+	close(fd);
+	goto tcp_return;
+}
+
+/* This function creates all TCP sockets bound to the protocol entry <proto>.
+ * It is intended to be used as the protocol's bind_all() function. The sockets
+ * will be registered but not added to any fd_set, in order not to lose them
+ * across the fork(). A call to enable_all_listeners() is needed to complete
+ * initialization. Returns ERR_* flags; stops at the first ERR_ABORT.
+ */
+static int tcp_bind_listeners(struct protocol *proto)
+{
+	struct listener *listener;
+	int err = ERR_NONE;
+
+	list_for_each_entry(listener, &proto->listeners, proto_list) {
+		err |= tcp_bind_listener(listener, NULL, 0);
+		/* ERR_ABORT comes with ERR_FATAL and <err> accumulates: test the bit alone */
+		if (err & ERR_ABORT)
+			break;
+	}
+	return err;
+}
+
+/* Attach <listener> to the tcpv4 protocol. Only a listener still in state
+ * LI_INIT is accepted (any other one is silently ignored): it is switched to
+ * LI_ASSIGNED, appended to the tcpv4 list and counted in nb_listeners.
+ */
+void tcpv4_add_listener(struct listener *listener)
+{
+	if (listener->state == LI_INIT) {
+		listener->proto = &proto_tcpv4;
+		listener->state = LI_ASSIGNED;
+		proto_tcpv4.nb_listeners++;
+		LIST_ADDQ(&proto_tcpv4.listeners, &listener->proto_list);
+	}
+}
+
+/* Add listener to the list of tcpv6 listeners. The listener's state
+ * is automatically updated from LI_INIT to LI_ASSIGNED, and the number of
+ * tcpv6 listeners is updated; listeners in any other state are ignored.
+ * This is the function to use to add a new IPv6 listener. */
+void tcpv6_add_listener(struct listener *listener)
+{
+	if (listener->state != LI_INIT)
+		return;
+	listener->state = LI_ASSIGNED;
+	listener->proto = &proto_tcpv6;
+	LIST_ADDQ(&proto_tcpv6.listeners, &listener->proto_list);
+	proto_tcpv6.nb_listeners++;
+}
+
+/* GCC constructor: registers tcpv4 then tcpv6 before main() is entered */
+__attribute__((constructor)) static void __tcp_protocol_init(void)
+{
+	protocol_register(&proto_tcpv4);
+	protocol_register(&proto_tcpv6);
+}
+
+
+/*
+ * Local variables:
+ * c-indent-level: 8
+ * c-basic-offset: 8
+ * End:
+ */
diff --git a/src/proxy.c b/src/proxy.c
index feb93c2..1c7cff6 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -30,10 +30,12 @@
#include <proto/backend.h>
#include <proto/fd.h>
#include <proto/log.h>
+#include <proto/protocols.h>
+#include <proto/proto_tcp.h>
#include <proto/proxy.h>
-int listeners; /* # of listeners */
+int listeners; /* # of proxy listeners, set by cfgparse, unset by maintain_proxies */
struct proxy *proxy = NULL; /* list of all existing proxies */
/*
@@ -120,8 +122,9 @@
{
struct proxy *curproxy;
struct listener *listener;
- int err = ERR_NONE;
- int fd, pxerr;
+ int lerr, err = ERR_NONE;
+ int pxerr;
+ char msg[100];
for (curproxy = proxy; curproxy != NULL; curproxy = curproxy->next) {
if (curproxy->state != PR_STNEW)
@@ -129,96 +132,39 @@
pxerr = 0;
for (listener = curproxy->listen; listener != NULL; listener = listener->next) {
- if (listener->fd != -1)
- continue; /* already initialized */
+ if (listener->state != LI_ASSIGNED)
+ continue; /* already started */
- if ((fd = socket(listener->addr.ss_family, SOCK_STREAM, IPPROTO_TCP)) == -1) {
- if (verbose)
- Alert("cannot create listening socket for proxy %s. Aborting.\n",
- curproxy->id);
- err |= ERR_RETRYABLE;
- pxerr |= 1;
- continue;
- }
-
- if (fd >= global.maxsock) {
- Alert("socket(): not enough free sockets for proxy %s. Raise -n argument. Aborting.\n",
- curproxy->id);
- close(fd);
- err |= ERR_FATAL;
- pxerr |= 1;
- break;
- }
+ lerr = tcp_bind_listener(listener, msg, sizeof(msg));
- if ((fcntl(fd, F_SETFL, O_NONBLOCK) == -1) ||
- (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
- (char *) &one, sizeof(one)) == -1)) {
- Alert("cannot make socket non-blocking for proxy %s. Aborting.\n",
- curproxy->id);
- close(fd);
- err |= ERR_FATAL;
- pxerr |= 1;
- break;
+ /* errors are reported if <verbose> is set or if they are fatal */
+ if (verbose || (lerr & (ERR_FATAL | ERR_ABORT))) {
+ if (lerr & ERR_ALERT)
+ Alert("Starting %s %s: %s\n",
+ proxy_type_str(curproxy), curproxy->id, msg);
+ else if (lerr & ERR_WARN)
+ Warning("Starting %s %s: %s\n",
+ proxy_type_str(curproxy), curproxy->id, msg);
}
- if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof(one)) == -1) {
- Alert("cannot do so_reuseaddr for proxy %s. Continuing.\n",
- curproxy->id);
- }
-
- if (curproxy->options & PR_O_TCP_NOLING)
- setsockopt(fd, SOL_SOCKET, SO_LINGER, (struct linger *) &nolinger, sizeof(struct linger));
-
-#ifdef SO_REUSEPORT
- /* OpenBSD supports this. As it's present in old libc versions of Linux,
- * it might return an error that we will silently ignore.
- */
- setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (char *) &one, sizeof(one));
-#endif
- if (bind(fd,
- (struct sockaddr *)&listener->addr,
- listener->addr.ss_family == AF_INET6 ?
- sizeof(struct sockaddr_in6) :
- sizeof(struct sockaddr_in)) == -1) {
- if (verbose)
- Alert("cannot bind socket for proxy %s. Aborting.\n",
- curproxy->id);
- close(fd);
- err |= ERR_RETRYABLE;
+ err |= lerr;
+ if (lerr & (ERR_ABORT | ERR_FATAL)) {
pxerr |= 1;
- continue;
+ break;
}
-
- if (listen(fd, curproxy->maxconn) == -1) {
- if (verbose)
- Alert("cannot listen to socket for proxy %s. Aborting.\n",
- curproxy->id);
- close(fd);
- err |= ERR_RETRYABLE;
+ else if (lerr & ERR_CODE) {
pxerr |= 1;
continue;
}
-
- /* the socket is ready */
- listener->fd = fd;
-
- /* the function for the accept() event */
- fd_insert(fd);
- fdtab[fd].cb[DIR_RD].f = &event_accept;
- fdtab[fd].cb[DIR_WR].f = NULL; /* never called */
- fdtab[fd].cb[DIR_RD].b = fdtab[fd].cb[DIR_WR].b = NULL;
- fdtab[fd].owner = (struct task *)curproxy; /* reference the proxy instead of a task */
- fdtab[fd].state = FD_STLISTEN;
- fdtab[fd].peeraddr = NULL;
- fdtab[fd].peerlen = 0;
- fdtab[fd].ev = 0;
- listeners++;
}
if (!pxerr) {
curproxy->state = PR_STIDLE;
send_log(curproxy, LOG_NOTICE, "Proxy %s started.\n", curproxy->id);
}
+
+ if (err & ERR_ABORT)
+ break;
}
return err;
@@ -244,17 +190,15 @@
while (p) {
if (p->feconn < p->maxconn) {
if (p->state == PR_STIDLE) {
- for (l = p->listen; l != NULL; l = l->next) {
- EV_FD_SET(l->fd, DIR_RD);
- }
+ for (l = p->listen; l != NULL; l = l->next)
+ enable_listener(l);
p->state = PR_STRUN;
}
}
else {
if (p->state == PR_STRUN) {
- for (l = p->listen; l != NULL; l = l->next) {
- EV_FD_CLR(l->fd, DIR_RD);
- }
+ for (l = p->listen; l != NULL; l = l->next)
+ disable_listener(l);
p->state = PR_STIDLE;
}
}
@@ -264,9 +208,8 @@
else { /* block all proxies */
while (p) {
if (p->state == PR_STRUN) {
- for (l = p->listen; l != NULL; l = l->next) {
- EV_FD_CLR(l->fd, DIR_RD);
- }
+ for (l = p->listen; l != NULL; l = l->next)
+ disable_listener(l);
p->state = PR_STIDLE;
}
p = p->next;
@@ -284,8 +227,11 @@
send_log(p, LOG_WARNING, "Proxy %s stopped.\n", p->id);
for (l = p->listen; l != NULL; l = l->next) {
- fd_delete(l->fd);
- listeners--;
+ unbind_listener(l);
+ if (l->state >= LI_ASSIGNED) {
+ delete_listener(l);
+ listeners--;
+ }
}
p->state = PR_STSTOPPED;
/* try to free more memory */