REORG/MEDIUM: move the default accept function from sockstream to protocols.c
The previous sockstream_accept() function uses nothing from sockstream, and
is totally irrelevant to stream interfaces. Move this to the protocols.c
file which handles listeners and protocols, and call it listener_accept().
It now makes much more sense that the code dealing with listen() also handles
accept() and passes it to upper layers.
diff --git a/include/proto/protocols.h b/include/proto/protocols.h
index cebe560..91334da 100644
--- a/include/proto/protocols.h
+++ b/include/proto/protocols.h
@@ -99,6 +99,12 @@
*/
void delete_listener(struct listener *listener);
+/* This function is called on a read event from a listening socket, corresponding
+ * to an accept. It tries to accept as many connections as possible, and for each
+ * calls the listener's accept handler (generally the frontend's accept handler).
+ */
+int listener_accept(int fd);
+
/* Registers the protocol <proto> */
void protocol_register(struct protocol *proto);
diff --git a/include/proto/stream_sock.h b/include/proto/stream_sock.h
index 577dad8..c93e483 100644
--- a/include/proto/stream_sock.h
+++ b/include/proto/stream_sock.h
@@ -31,7 +31,6 @@
/* main event functions used to move data between sockets and buffers */
-int stream_sock_accept(int fd);
int stream_sock_read(int fd);
int stream_sock_write(int fd);
void stream_sock_data_finish(struct stream_interface *si);
diff --git a/src/proto_tcp.c b/src/proto_tcp.c
index 478bff5..0c389d8 100644
--- a/src/proto_tcp.c
+++ b/src/proto_tcp.c
@@ -69,7 +69,7 @@
.sock_family = AF_INET,
.sock_addrlen = sizeof(struct sockaddr_in),
.l3_addrlen = 32/8,
- .accept = &stream_sock_accept,
+ .accept = &listener_accept,
.connect = tcp_connect_server,
.bind = tcp_bind_listener,
.bind_all = tcp_bind_listeners,
@@ -88,7 +88,7 @@
.sock_family = AF_INET6,
.sock_addrlen = sizeof(struct sockaddr_in6),
.l3_addrlen = 128/8,
- .accept = &stream_sock_accept,
+ .accept = &listener_accept,
.connect = tcp_connect_server,
.bind = tcp_bind_listener,
.bind_all = tcp_bind_listeners,
diff --git a/src/proto_uxst.c b/src/proto_uxst.c
index 2140ae6..1a808ea 100644
--- a/src/proto_uxst.c
+++ b/src/proto_uxst.c
@@ -55,7 +55,7 @@
.sock_family = AF_UNIX,
.sock_addrlen = sizeof(struct sockaddr_un),
.l3_addrlen = sizeof(((struct sockaddr_un*)0)->sun_path),/* path len */
- .accept = &stream_sock_accept,
+ .accept = &listener_accept,
.bind = uxst_bind_listener,
.bind_all = uxst_bind_listeners,
.unbind_all = uxst_unbind_listeners,
diff --git a/src/protocols.c b/src/protocols.c
index 155636f..cc7b3ce 100644
--- a/src/protocols.c
+++ b/src/protocols.c
@@ -10,6 +10,7 @@
*
*/
+#include <errno.h>
#include <stdio.h>
#include <string.h>
@@ -17,9 +18,15 @@
#include <common/errors.h>
#include <common/mini-clist.h>
#include <common/standard.h>
+#include <common/time.h>
+
+#include <types/global.h>
#include <proto/acl.h>
#include <proto/fd.h>
+#include <proto/freq_ctr.h>
+#include <proto/log.h>
+#include <proto/task.h>
/* List head of all registered protocols */
static struct list protocols = LIST_HEAD_INIT(protocols);
@@ -230,6 +237,164 @@
listener->proto->nb_listeners--;
}
+/* This function is called on a read event from a listening socket, corresponding
+ * to an accept. It tries to accept as many connections as possible, and for each
+ * calls the listener's accept handler (generally the frontend's accept handler).
+ */
+int listener_accept(int fd)
+{
+ struct listener *l = fdtab[fd].owner;
+ struct proxy *p = l->frontend;
+ int max_accept = global.tune.maxaccept;
+ int cfd;
+ int ret;
+
+ if (unlikely(l->nbconn >= l->maxconn)) {
+ listener_full(l);
+ return 0;
+ }
+
+ if (global.cps_lim && !(l->options & LI_O_UNLIMITED)) {
+ int max = freq_ctr_remain(&global.conn_per_sec, global.cps_lim, 0);
+
+ if (unlikely(!max)) {
+ /* frontend accept rate limit was reached */
+ limit_listener(l, &global_listener_queue);
+ task_schedule(global_listener_queue_task, tick_add(now_ms, next_event_delay(&global.conn_per_sec, global.cps_lim, 0)));
+ return 0;
+ }
+
+ if (max_accept > max)
+ max_accept = max;
+ }
+
+ if (p && p->fe_sps_lim) {
+ int max = freq_ctr_remain(&p->fe_sess_per_sec, p->fe_sps_lim, 0);
+
+ if (unlikely(!max)) {
+ /* frontend accept rate limit was reached */
+ limit_listener(l, &p->listener_queue);
+ task_schedule(p->task, tick_add(now_ms, next_event_delay(&p->fe_sess_per_sec, p->fe_sps_lim, 0)));
+ return 0;
+ }
+
+ if (max_accept > max)
+ max_accept = max;
+ }
+
+ /* Note: if we fail to allocate a connection because of configured
+ * limits, we'll schedule a new attempt worst 1 second later in the
+ * worst case. If we fail due to system limits or temporary resource
+ * shortage, we try again 100ms later in the worst case.
+ */
+ while (max_accept--) {
+ struct sockaddr_storage addr;
+ socklen_t laddr = sizeof(addr);
+
+ if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) {
+ limit_listener(l, &global_listener_queue);
+ task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
+ return 0;
+ }
+
+ if (unlikely(p && p->feconn >= p->maxconn)) {
+ limit_listener(l, &p->listener_queue);
+ return 0;
+ }
+
+ cfd = accept(fd, (struct sockaddr *)&addr, &laddr);
+ if (unlikely(cfd == -1)) {
+ switch (errno) {
+ case EAGAIN:
+ case EINTR:
+ case ECONNABORTED:
+ return 0; /* nothing more to accept */
+ case ENFILE:
+ if (p)
+ send_log(p, LOG_EMERG,
+ "Proxy %s reached system FD limit at %d. Please check system tunables.\n",
+ p->id, maxfd);
+ limit_listener(l, &global_listener_queue);
+ task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
+ return 0;
+ case EMFILE:
+ if (p)
+ send_log(p, LOG_EMERG,
+ "Proxy %s reached process FD limit at %d. Please check 'ulimit-n' and restart.\n",
+ p->id, maxfd);
+ limit_listener(l, &global_listener_queue);
+ task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
+ return 0;
+ case ENOBUFS:
+ case ENOMEM:
+ if (p)
+ send_log(p, LOG_EMERG,
+ "Proxy %s reached system memory limit at %d sockets. Please check system tunables.\n",
+ p->id, maxfd);
+ limit_listener(l, &global_listener_queue);
+ task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
+ return 0;
+ default:
+ return 0;
+ }
+ }
+
+ if (unlikely(cfd >= global.maxsock)) {
+ send_log(p, LOG_EMERG,
+ "Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n",
+ p->id);
+ close(cfd);
+ limit_listener(l, &global_listener_queue);
+ task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
+ return 0;
+ }
+
+ /* increase the per-process number of cumulated connections */
+ if (!(l->options & LI_O_UNLIMITED)) {
+ update_freq_ctr(&global.conn_per_sec, 1);
+ if (global.conn_per_sec.curr_ctr > global.cps_max)
+ global.cps_max = global.conn_per_sec.curr_ctr;
+ actconn++;
+ }
+
+ jobs++;
+ totalconn++;
+ l->nbconn++;
+
+ if (l->counters) {
+ if (l->nbconn > l->counters->conn_max)
+ l->counters->conn_max = l->nbconn;
+ }
+
+ ret = l->accept(l, cfd, &addr);
+ if (unlikely(ret <= 0)) {
+ /* The connection was closed by session_accept(). Either
+ * we just have to ignore it (ret == 0) or it's a critical
+ * error due to a resource shortage, and we must stop the
+ * listener (ret < 0).
+ */
+ if (!(l->options & LI_O_UNLIMITED))
+ actconn--;
+ jobs--;
+ l->nbconn--;
+ if (ret == 0) /* successful termination */
+ continue;
+
+ limit_listener(l, &global_listener_queue);
+ task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
+ return 0;
+ }
+
+ if (l->nbconn >= l->maxconn) {
+ listener_full(l);
+ return 0;
+ }
+
+ } /* end of while (p->feconn < p->maxconn) */
+
+ return 0;
+}
+
/* Registers the protocol <proto> */
void protocol_register(struct protocol *proto)
{
diff --git a/src/stream_sock.c b/src/stream_sock.c
index 93e0a92..accd4a1 100644
--- a/src/stream_sock.c
+++ b/src/stream_sock.c
@@ -1138,164 +1138,6 @@
}
}
-/* This function is called on a read event from a listening socket, corresponding
- * to an accept. It tries to accept as many connections as possible, and for each
- * calls the listener's accept handler (generally the frontend's accept handler).
- */
-int stream_sock_accept(int fd)
-{
- struct listener *l = fdtab[fd].owner;
- struct proxy *p = l->frontend;
- int max_accept = global.tune.maxaccept;
- int cfd;
- int ret;
-
- if (unlikely(l->nbconn >= l->maxconn)) {
- listener_full(l);
- return 0;
- }
-
- if (global.cps_lim && !(l->options & LI_O_UNLIMITED)) {
- int max = freq_ctr_remain(&global.conn_per_sec, global.cps_lim, 0);
-
- if (unlikely(!max)) {
- /* frontend accept rate limit was reached */
- limit_listener(l, &global_listener_queue);
- task_schedule(global_listener_queue_task, tick_add(now_ms, next_event_delay(&global.conn_per_sec, global.cps_lim, 0)));
- return 0;
- }
-
- if (max_accept > max)
- max_accept = max;
- }
-
- if (p && p->fe_sps_lim) {
- int max = freq_ctr_remain(&p->fe_sess_per_sec, p->fe_sps_lim, 0);
-
- if (unlikely(!max)) {
- /* frontend accept rate limit was reached */
- limit_listener(l, &p->listener_queue);
- task_schedule(p->task, tick_add(now_ms, next_event_delay(&p->fe_sess_per_sec, p->fe_sps_lim, 0)));
- return 0;
- }
-
- if (max_accept > max)
- max_accept = max;
- }
-
- /* Note: if we fail to allocate a connection because of configured
- * limits, we'll schedule a new attempt worst 1 second later in the
- * worst case. If we fail due to system limits or temporary resource
- * shortage, we try again 100ms later in the worst case.
- */
- while (max_accept--) {
- struct sockaddr_storage addr;
- socklen_t laddr = sizeof(addr);
-
- if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) {
- limit_listener(l, &global_listener_queue);
- task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
- return 0;
- }
-
- if (unlikely(p && p->feconn >= p->maxconn)) {
- limit_listener(l, &p->listener_queue);
- return 0;
- }
-
- cfd = accept(fd, (struct sockaddr *)&addr, &laddr);
- if (unlikely(cfd == -1)) {
- switch (errno) {
- case EAGAIN:
- case EINTR:
- case ECONNABORTED:
- return 0; /* nothing more to accept */
- case ENFILE:
- if (p)
- send_log(p, LOG_EMERG,
- "Proxy %s reached system FD limit at %d. Please check system tunables.\n",
- p->id, maxfd);
- limit_listener(l, &global_listener_queue);
- task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
- return 0;
- case EMFILE:
- if (p)
- send_log(p, LOG_EMERG,
- "Proxy %s reached process FD limit at %d. Please check 'ulimit-n' and restart.\n",
- p->id, maxfd);
- limit_listener(l, &global_listener_queue);
- task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
- return 0;
- case ENOBUFS:
- case ENOMEM:
- if (p)
- send_log(p, LOG_EMERG,
- "Proxy %s reached system memory limit at %d sockets. Please check system tunables.\n",
- p->id, maxfd);
- limit_listener(l, &global_listener_queue);
- task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
- return 0;
- default:
- return 0;
- }
- }
-
- if (unlikely(cfd >= global.maxsock)) {
- send_log(p, LOG_EMERG,
- "Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n",
- p->id);
- close(cfd);
- limit_listener(l, &global_listener_queue);
- task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
- return 0;
- }
-
- /* increase the per-process number of cumulated connections */
- if (!(l->options & LI_O_UNLIMITED)) {
- update_freq_ctr(&global.conn_per_sec, 1);
- if (global.conn_per_sec.curr_ctr > global.cps_max)
- global.cps_max = global.conn_per_sec.curr_ctr;
- actconn++;
- }
-
- jobs++;
- totalconn++;
- l->nbconn++;
-
- if (l->counters) {
- if (l->nbconn > l->counters->conn_max)
- l->counters->conn_max = l->nbconn;
- }
-
- ret = l->accept(l, cfd, &addr);
- if (unlikely(ret <= 0)) {
- /* The connection was closed by session_accept(). Either
- * we just have to ignore it (ret == 0) or it's a critical
- * error due to a resource shortage, and we must stop the
- * listener (ret < 0).
- */
- if (!(l->options & LI_O_UNLIMITED))
- actconn--;
- jobs--;
- l->nbconn--;
- if (ret == 0) /* successful termination */
- continue;
-
- limit_listener(l, &global_listener_queue);
- task_schedule(global_listener_queue_task, tick_add(now_ms, 100)); /* try again in 100 ms */
- return 0;
- }
-
- if (l->nbconn >= l->maxconn) {
- listener_full(l);
- return 0;
- }
-
- } /* end of while (p->feconn < p->maxconn) */
-
- return 0;
-}
-
/* stream sock operations */
struct sock_ops stream_sock = {
.update = stream_sock_data_finish,