MEDIUM: listener: use protocol->accept_conn() to accept a connection
Now listener_accept() doesn't have to deal with the incoming FD anymore
(except for a little bit of side band stuff). It directly retrieves a
valid connection from the protocol layer, or receives a well-defined
error code that helps it decide how to proceed. This removes a lot of
hardly maintainable low-level code and opens the function to receive
new protocol stacks.
diff --git a/src/listener.c b/src/listener.c
index b7dd934..c98f347 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -10,7 +10,6 @@
*
*/
-#define _GNU_SOURCE
#include <ctype.h>
#include <errno.h>
#include <stdio.h>
@@ -29,7 +28,6 @@
#include <haproxy/list.h>
#include <haproxy/listener.h>
#include <haproxy/log.h>
-#include <haproxy/proto_sockpair.h>
#include <haproxy/protocol-t.h>
#include <haproxy/protocol.h>
#include <haproxy/sample.h>
@@ -695,11 +693,7 @@
int next_feconn = 0;
int next_actconn = 0;
int expire;
- int cfd;
int ret;
-#ifdef USE_ACCEPT4
- static int accept4_broken;
-#endif
if (!l)
return;
@@ -768,10 +762,9 @@
* shortage, we try again 100ms later in the worst case.
*/
for (; max_accept; next_conn = next_feconn = next_actconn = 0, max_accept--) {
- struct sockaddr_storage addr;
- socklen_t laddr = sizeof(addr);
unsigned int count;
__decl_thread(unsigned long mask);
+ int status;
/* pre-increase the number of connections without going too far.
* We process the listener, then the proxy, then the process.
@@ -820,100 +813,29 @@
} while (!_HA_ATOMIC_CAS(&actconn, (int *)(&count), next_actconn));
}
- /* with sockpair@ we don't want to do an accept */
- if (unlikely(l->rx.addr.ss_family == AF_CUST_SOCKPAIR)) {
- if ((cfd = recv_fd_uxst(fd)) != -1)
- fcntl(cfd, F_SETFL, O_NONBLOCK);
- /* just like with UNIX sockets, only the family is filled */
- addr.ss_family = AF_UNIX;
- laddr = sizeof(addr.ss_family);
- } else
-
-#ifdef USE_ACCEPT4
- /* only call accept4() if it's known to be safe, otherwise
- * fallback to the legacy accept() + fcntl().
- */
- if (unlikely(accept4_broken ||
- ((cfd = accept4(fd, (struct sockaddr *)&addr, &laddr, SOCK_NONBLOCK)) == -1 &&
- (errno == ENOSYS || errno == EINVAL || errno == EBADF) &&
- (accept4_broken = 1))))
-#endif
- if ((cfd = accept(fd, (struct sockaddr *)&addr, &laddr)) != -1)
- fcntl(cfd, F_SETFL, O_NONBLOCK);
+ cli_conn = l->rx.proto->accept_conn(l, &status);
+ if (!cli_conn) {
+ switch (status) {
+ case CO_AC_DONE:
+ goto end;
- if (unlikely(cfd == -1)) {
- switch (errno) {
- case EAGAIN:
- if (fdtab[fd].ev & (FD_POLL_HUP|FD_POLL_ERR)) {
- /* the listening socket might have been disabled in a shared
- * process and we're a collateral victim. We'll just pause for
- * a while in case it comes back. In the mean time, we need to
- * clear this sticky flag.
- */
- _HA_ATOMIC_AND(&fdtab[fd].ev, ~(FD_POLL_HUP|FD_POLL_ERR));
- goto transient_error;
- }
- goto end; /* nothing more to accept */
- case EINVAL:
- /* might be trying to accept on a shut fd (eg: soft stop) */
- goto transient_error;
- case EINTR:
- case ECONNABORTED:
+ case CO_AC_RETRY: /* likely a signal */
_HA_ATOMIC_SUB(&l->nbconn, 1);
if (p)
_HA_ATOMIC_SUB(&p->feconn, 1);
if (!(l->options & LI_O_UNLIMITED))
_HA_ATOMIC_SUB(&actconn, 1);
continue;
- case ENFILE:
- if (p)
- send_log(p, LOG_EMERG,
- "Proxy %s reached system FD limit (maxsock=%d). Please check system tunables.\n",
- p->id, global.maxsock);
- goto transient_error;
- case EMFILE:
- if (p)
- send_log(p, LOG_EMERG,
- "Proxy %s reached process FD limit (maxsock=%d). Please check 'ulimit-n' and restart.\n",
- p->id, global.maxsock);
- goto transient_error;
- case ENOBUFS:
- case ENOMEM:
- if (p)
- send_log(p, LOG_EMERG,
- "Proxy %s reached system memory limit (maxsock=%d). Please check system tunables.\n",
- p->id, global.maxsock);
- goto transient_error;
- default:
- /* unexpected result, let's give up and let other tasks run */
+
+ case CO_AC_YIELD:
max_accept = 0;
goto end;
- }
- }
-
- /* we don't want to leak the FD upon reload if it's in the master */
- if (unlikely(master == 1))
- fcntl(cfd, F_SETFD, FD_CLOEXEC);
- /* we'll have to at least allocate a connection, assign the listener
- * to conn->target, set the source address, and set the fd.
- */
- cli_conn = conn_new(&l->obj_type);
- if (cli_conn) {
- cli_conn->handle.fd = cfd;
- cli_conn->flags |= CO_FL_ADDR_FROM_SET;
- if (!sockaddr_alloc(&cli_conn->src, &addr, laddr)) {
- conn_free(cli_conn);
- cli_conn = NULL;
+ default:
+ goto transient_error;
}
}
- if (!cli_conn) {
- /* no more memory, give up! */
- close(cfd);
- continue;
- }
-
/* The connection was accepted, it must be counted as such */
if (l->counters)
HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, next_conn);
@@ -930,12 +852,12 @@
_HA_ATOMIC_ADD(&activity[tid].accepted, 1);
- if (unlikely(cfd >= global.maxsock)) {
+ if (unlikely(cli_conn->handle.fd >= global.maxsock)) {
send_log(p, LOG_EMERG,
"Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n",
p->id);
conn_free(cli_conn);
- close(cfd);
+ close(cli_conn->handle.fd);
expire = tick_add(now_ms, 1000); /* try again in 1 second */
goto limit_global;
}