BUG/MEDIUM: listener: make sure the listener never accepts too many conns
We were not checking p->feconn nor the global actconn soon enough. In
older versions this could result in a frontend accepting more connections
than allowed by its maxconn or the global maxconn, exactly N-1 extra
connections where N is the number of threads, provided each of these
threads were running a different listener. But with the lock removal,
it became worse, the excess could be the listener's maxconn multiplied
by the number of threads. Among the nasty side effect was that LI_FULL
could be removed while the limit was still over and in some cases the
polling on the socket was no re-enabled.
This commit takes care of updating and checking p->feconn and the global
actconn *before* processing the connection, so that the listener can be
turned off before accepting the socket if needed. This requires to move
some of the bookkeeping operations form session to listen, which totally
makes sense in this context.
Now the limits are properly respected, even if a listener's maxconn is
over a frontend's. This only applies on top of the listener lock removal
series and doesn't have to be backported.
diff --git a/src/listener.c b/src/listener.c
index 8512ed0..3ae20e2 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -578,6 +578,8 @@
struct proxy *p;
int max_accept;
int next_conn = 0;
+ int next_feconn = 0;
+ int next_actconn = 0;
int expire;
int cfd;
int ret;
@@ -648,12 +650,15 @@
* worst case. If we fail due to system limits or temporary resource
* shortage, we try again 100ms later in the worst case.
*/
- for (; max_accept-- > 0; next_conn = 0) {
+ for (; max_accept-- > 0; next_conn = next_feconn = next_actconn = 0) {
struct sockaddr_storage addr;
socklen_t laddr = sizeof(addr);
unsigned int count;
- /* pre-increase the number of connections without going too far */
+ /* pre-increase the number of connections without going too far.
+ * We process the listener, then the proxy, then the process.
+ * We know which ones to unroll based on the next_xxx value.
+ */
do {
count = l->nbconn;
if (count >= l->maxconn) {
@@ -671,15 +676,42 @@
listener_full(l);
}
- if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) {
- limit_listener(l, &global_listener_queue);
- task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
- goto end;
+ if (p) {
+ do {
+ count = p->feconn;
+ if (count >= p->maxconn) {
+ /* the frontend was marked full or another
+ * thread is going to do it.
+ */
+ next_feconn = 0;
+ goto end;
+ }
+ next_feconn = count + 1;
+ } while (!HA_ATOMIC_CAS(&p->feconn, &count, next_feconn));
+
+ if (unlikely(next_feconn == p->maxconn)) {
+ /* we just filled it */
+ limit_listener(l, &p->listener_queue);
+ }
}
- if (unlikely(p && p->feconn >= p->maxconn)) {
- limit_listener(l, &p->listener_queue);
- goto end;
+ if (!(l->options & LI_O_UNLIMITED)) {
+ do {
+ count = actconn;
+ if (count >= global.maxconn) {
+ /* the process was marked full or another
+ * thread is going to do it.
+ */
+ next_actconn = 0;
+ goto end;
+ }
+ next_actconn = count + 1;
+ } while (!HA_ATOMIC_CAS(&actconn, &count, next_actconn));
+
+ if (unlikely(next_actconn == global.maxconn)) {
+ limit_listener(l, &global_listener_queue);
+ task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
+ }
}
/* with sockpair@ we don't want to do an accept */
@@ -723,6 +755,10 @@
case EINTR:
case ECONNABORTED:
HA_ATOMIC_SUB(&l->nbconn, 1);
+ if (p)
+ HA_ATOMIC_SUB(&p->feconn, 1);
+ if (!(l->options & LI_O_UNLIMITED))
+ HA_ATOMIC_SUB(&actconn, 1);
continue;
case ENFILE:
if (p)
@@ -757,18 +793,20 @@
if (l->counters)
HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, next_conn);
+ if (p)
+ HA_ATOMIC_UPDATE_MAX(&p->fe_counters.conn_max, next_feconn);
+
+ proxy_inc_fe_conn_ctr(l, p);
+
if (!(l->options & LI_O_UNLIMITED)) {
count = update_freq_ctr(&global.conn_per_sec, 1);
HA_ATOMIC_UPDATE_MAX(&global.cps_max, count);
- HA_ATOMIC_ADD(&actconn, 1);
}
if (unlikely(cfd >= global.maxsock)) {
send_log(p, LOG_EMERG,
"Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n",
p->id);
- if (!(l->options & LI_O_UNLIMITED))
- HA_ATOMIC_SUB(&actconn, 1);
close(cfd);
limit_listener(l, &global_listener_queue);
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
@@ -776,11 +814,13 @@
}
/* past this point, l->accept() will automatically decrement
- * l->nbconn and actconn once done. Setting next_conn=0 allows
- * the error path not to rollback on nbconn. It's more convenient
- * than duplicating all exit labels.
+ * l->nbconn, feconn and actconn once done. Setting next_*conn=0
+ * allows the error path not to rollback on nbconn. It's more
+ * convenient than duplicating all exit labels.
*/
next_conn = 0;
+ next_feconn = 0;
+ next_actconn = 0;
#if defined(USE_THREAD)
count = l->bind_conf->thr_count;
@@ -875,7 +915,14 @@
if (next_conn)
HA_ATOMIC_SUB(&l->nbconn, 1);
- if (l->nbconn < l->maxconn && l->state == LI_FULL) {
+ if (p && next_feconn)
+ HA_ATOMIC_SUB(&p->feconn, 1);
+
+ if (next_actconn)
+ HA_ATOMIC_SUB(&actconn, 1);
+
+ if ((l->state == LI_FULL && l->nbconn < l->maxconn) ||
+ (l->state == LI_LIMITED && ((!p || p->feconn < p->maxconn) && (actconn < global.maxconn)))) {
/* at least one thread has to this when quitting */
resume_listener(l);
@@ -899,9 +946,12 @@
if (!(l->options & LI_O_UNLIMITED))
HA_ATOMIC_SUB(&actconn, 1);
+ if (fe)
+ HA_ATOMIC_SUB(&fe->feconn, 1);
HA_ATOMIC_SUB(&l->nbconn, 1);
HA_ATOMIC_SUB(&l->thr_conn[tid], 1);
- if (l->state == LI_FULL)
+
+ if (l->state == LI_FULL || l->state == LI_LIMITED)
resume_listener(l);
/* Dequeues all of the listeners waiting for a resource */
diff --git a/src/session.c b/src/session.c
index 2732dbc..58317b8 100644
--- a/src/session.c
+++ b/src/session.c
@@ -55,10 +55,6 @@
vars_init(&sess->vars, SCOPE_SESS);
sess->task = NULL;
sess->t_handshake = -1; /* handshake not done yet */
- HA_ATOMIC_UPDATE_MAX(&fe->fe_counters.conn_max,
- HA_ATOMIC_ADD(&fe->feconn, 1));
- if (li)
- proxy_inc_fe_conn_ctr(li, fe);
HA_ATOMIC_ADD(&totalconn, 1);
HA_ATOMIC_ADD(&jobs, 1);
LIST_INIT(&sess->srv_list);
@@ -72,7 +68,6 @@
struct connection *conn, *conn_back;
struct sess_srv_list *srv_list, *srv_list_back;
- HA_ATOMIC_SUB(&sess->fe->feconn, 1);
if (sess->listener)
listener_release(sess->listener);
session_store_counters(sess);