MEDIUM: threads/listeners: Make listeners thread-safe

First, we use atomic operations to update jobs/totalconn/actconn variables,
listener's nbconn variable and listener's counters. Then we add a lock on
listeners to protect access to their information. And finally, listener queues
(global and per proxy) are also protected by a lock. Here, because access to
these queues is unusual, we use the same lock for all queues instead of a global
one for the global queue and a lock per proxy for others.
diff --git a/src/listener.c b/src/listener.c
index a7e2b0d..9646bba 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -38,6 +38,11 @@
 #include <proto/stream.h>
 #include <proto/task.h>
 
+#ifdef USE_THREAD
+ /* listener_queue lock (same for global and per proxy queues) */
+static HA_SPINLOCK_T lq_lock;
+#endif
+
 /* List head of all known bind keywords */
 static struct bind_kw_list bind_keywords = {
 	.list = LIST_HEAD_INIT(bind_keywords.list)
@@ -53,6 +58,7 @@
  */
 static void enable_listener(struct listener *listener)
 {
+	SPIN_LOCK(LISTENER_LOCK, &listener->lock);
 	if (listener->state == LI_LISTEN) {
 		if ((global.mode & (MODE_DAEMON | MODE_MWORKER)) &&
 		    listener->bind_conf->bind_proc &&
@@ -75,6 +81,7 @@
 			listener->state = LI_FULL;
 		}
 	}
+	SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
 }
 
 /* This function removes the specified listener's file descriptor from the
@@ -83,13 +90,19 @@
  */
 static void disable_listener(struct listener *listener)
 {
+	SPIN_LOCK(LISTENER_LOCK, &listener->lock);
 	if (listener->state < LI_READY)
-		return;
+		goto end;
 	if (listener->state == LI_READY)
 		fd_stop_recv(listener->fd);
-	if (listener->state == LI_LIMITED)
+	if (listener->state == LI_LIMITED) {
+		SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 		LIST_DEL(&listener->wait_queue);
+		SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
+	}
 	listener->state = LI_LISTEN;
+  end:
+	SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
 }
 
 /* This function tries to temporarily disable a listener, depending on the OS
@@ -101,8 +114,12 @@
  */
 int pause_listener(struct listener *l)
 {
+	int ret = 1;
+
+	SPIN_LOCK(LISTENER_LOCK, &l->lock);
+
 	if (l->state <= LI_ZOMBIE)
-		return 1;
+		goto end;
 
 	if (l->proto->pause) {
 		/* Returns < 0 in case of failure, 0 if the listener
@@ -110,18 +127,25 @@
 		 */
 		int ret = l->proto->pause(l);
 
-		if (ret < 0)
-			return 0;
+		if (ret < 0) {
+			ret = 0;
+			goto end;
+		}
 		else if (ret == 0)
-			return 1;
+			goto end;
 	}
 
-	if (l->state == LI_LIMITED)
+	if (l->state == LI_LIMITED) {
+		SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 		LIST_DEL(&l->wait_queue);
+		SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
+	}
 
 	fd_stop_recv(l->fd);
 	l->state = LI_PAUSED;
-	return 1;
+  end:
+	SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
+	return ret;
 }
 
 /* This function tries to resume a temporarily disabled listener. Paused, full,
@@ -134,12 +158,16 @@
  * stopped it. If the resume fails, 0 is returned and an error might be
  * displayed.
  */
-int resume_listener(struct listener *l)
+static int __resume_listener(struct listener *l)
 {
+	int ret = 1;
+
+	SPIN_LOCK(LISTENER_LOCK, &l->lock);
+
 	if ((global.mode & (MODE_DAEMON | MODE_MWORKER)) &&
 	    l->bind_conf->bind_proc &&
 	    !(l->bind_conf->bind_proc & (1UL << (relative_pid - 1))))
-		return 1;
+		goto end;
 
 	if (l->state == LI_ASSIGNED) {
 		char msg[100];
@@ -151,42 +179,66 @@
 		else if (err & ERR_WARN)
 			Warning("Resuming listener: %s\n", msg);
 
-		if (err & (ERR_FATAL | ERR_ABORT))
-			return 0;
+		if (err & (ERR_FATAL | ERR_ABORT)) {
+			ret = 0;
+			goto end;
+		}
 	}
 
-	if (l->state < LI_PAUSED || l->state == LI_ZOMBIE)
-		return 0;
+	if (l->state < LI_PAUSED || l->state == LI_ZOMBIE) {
+		ret = 0;
+		goto end;
+	}
 
 	if (l->proto->sock_prot == IPPROTO_TCP &&
 	    l->state == LI_PAUSED &&
-	    listen(l->fd, l->backlog ? l->backlog : l->maxconn) != 0)
-		return 0;
+	    listen(l->fd, l->backlog ? l->backlog : l->maxconn) != 0) {
+		ret = 0;
+		goto end;
+	}
 
 	if (l->state == LI_READY)
-		return 1;
+		goto end;
 
 	if (l->state == LI_LIMITED)
 		LIST_DEL(&l->wait_queue);
 
 	if (l->nbconn >= l->maxconn) {
 		l->state = LI_FULL;
-		return 1;
+		goto end;
 	}
 
 	fd_want_recv(l->fd);
 	l->state = LI_READY;
-	return 1;
+  end:
+	SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
+	return ret;
+}
+
+int resume_listener(struct listener *l)
+{
+	int ret;
+
+	SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
+	ret = __resume_listener(l);
+	SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
+	return ret;
 }
 
 /* Marks a ready listener as full so that the stream code tries to re-enable
  * it upon next close() using resume_listener().
+ *
+ * Note: this function is only called from listener_accept so <l> is already
+ *       locked.
  */
 static void listener_full(struct listener *l)
 {
 	if (l->state >= LI_READY) {
-		if (l->state == LI_LIMITED)
+		if (l->state == LI_LIMITED) {
+			SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 			LIST_DEL(&l->wait_queue);
+			SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
+		}
 
 		fd_stop_recv(l->fd);
 		l->state = LI_FULL;
@@ -195,11 +247,16 @@
 
 /* Marks a ready listener as limited so that we only try to re-enable it when
  * resources are free again. It will be queued into the specified queue.
+ *
+ * Note: this function is only called from listener_accept so <l> is already
+ *       locked.
  */
 static void limit_listener(struct listener *l, struct list *list)
 {
 	if (l->state == LI_READY) {
+		SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 		LIST_ADDQ(list, &l->wait_queue);
+		SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 		fd_stop_recv(l->fd);
 		l->state = LI_LIMITED;
 	}
@@ -239,22 +296,28 @@
 {
 	struct listener *listener, *l_back;
 
+	SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 	list_for_each_entry_safe(listener, l_back, list, wait_queue) {
 		/* This cannot fail because the listeners are by definition in
 		 * the LI_LIMITED state. The function also removes the entry
 		 * from the queue.
 		 */
-		resume_listener(listener);
+		__resume_listener(listener);
 	}
+	SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 }
 
 static int do_unbind_listener(struct listener *listener, int do_close)
 {
+	SPIN_LOCK(LISTENER_LOCK, &listener->lock);
 	if (listener->state == LI_READY)
 		fd_stop_recv(listener->fd);
 
-	if (listener->state == LI_LIMITED)
+	if (listener->state == LI_LIMITED) {
+		SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 		LIST_DEL(&listener->wait_queue);
+		SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
+	}
 
 	if (listener->state >= LI_PAUSED) {
 		if (do_close) {
@@ -265,6 +328,7 @@
 			fd_remove(listener->fd);
 		listener->state = LI_ASSIGNED;
 	}
+	SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
 	return ERR_NONE;
 }
 
@@ -335,8 +399,9 @@
 
 		proto->add(l, port);
 
-		jobs++;
-		listeners++;
+		SPIN_INIT(&l->lock);
+		HA_ATOMIC_ADD(&jobs, 1);
+		HA_ATOMIC_ADD(&listeners, 1);
 	}
 	return 1;
 }
@@ -351,11 +416,14 @@
 {
 	if (listener->state != LI_ASSIGNED)
 		return;
+
+	SPIN_LOCK(LISTENER_LOCK, &listener->lock);
 	listener->state = LI_INIT;
 	LIST_DEL(&listener->proto_list);
 	listener->proto->nb_listeners--;
-	listeners--;
-	jobs--;
+	HA_ATOMIC_SUB(&jobs, 1);
+	HA_ATOMIC_SUB(&listeners, 1);
+	SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
 }
 
 /* This function is called on a read event from a listening socket, corresponding
@@ -374,9 +442,12 @@
 	static int accept4_broken;
 #endif
 
+	if (SPIN_TRYLOCK(LISTENER_LOCK, &l->lock))
+		return;
+
 	if (unlikely(l->nbconn >= l->maxconn)) {
 		listener_full(l);
-		return;
+		goto end;
 	}
 
 	if (!(l->options & LI_O_UNLIMITED) && global.sps_lim) {
@@ -425,7 +496,7 @@
 			/* frontend accept rate limit was reached */
 			limit_listener(l, &p->listener_queue);
 			task_schedule(p->task, tick_add(now_ms, next_event_delay(&p->fe_sess_per_sec, p->fe_sps_lim, 0)));
-			return;
+			goto end;
 		}
 
 		if (max_accept > max)
@@ -440,16 +511,17 @@
 	while (max_accept--) {
 		struct sockaddr_storage addr;
 		socklen_t laddr = sizeof(addr);
+		unsigned int count;
 
 		if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) {
 			limit_listener(l, &global_listener_queue);
 			task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
-			return;
+			goto end;
 		}
 
 		if (unlikely(p && p->feconn >= p->maxconn)) {
 			limit_listener(l, &p->listener_queue);
-			return;
+			goto end;
 		}
 
 #ifdef USE_ACCEPT4
@@ -477,7 +549,7 @@
 					goto transient_error;
 				}
 				fd_cant_recv(fd);
-				return;   /* nothing more to accept */
+				goto end; /* nothing more to accept */
 			case EINVAL:
 				/* might be trying to accept on a shut fd (eg: soft stop) */
 				goto transient_error;
@@ -516,23 +588,19 @@
 			close(cfd);
 			limit_listener(l, &global_listener_queue);
 			task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
-			return;
+			goto end;
 		}
 
 		/* increase the per-process number of cumulated connections */
 		if (!(l->options & LI_O_UNLIMITED)) {
-			update_freq_ctr(&global.conn_per_sec, 1);
-			if (global.conn_per_sec.curr_ctr > global.cps_max)
-				global.cps_max = global.conn_per_sec.curr_ctr;
-			actconn++;
+			count = update_freq_ctr(&global.conn_per_sec, 1);
+			HA_ATOMIC_UPDATE_MAX(&global.cps_max, count);
+			HA_ATOMIC_ADD(&actconn, 1);
 		}
 
-		l->nbconn++;
-
-		if (l->counters) {
-			if (l->nbconn > l->counters->conn_max)
-				l->counters->conn_max = l->nbconn;
-		}
+		count = HA_ATOMIC_ADD(&l->nbconn, 1);
+		if (l->counters)
+			HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, count);
 
 		ret = l->accept(l, cfd, &addr);
 		if (unlikely(ret <= 0)) {
@@ -542,8 +610,8 @@
 			 * listener (ret < 0).
 			 */
 			if (!(l->options & LI_O_UNLIMITED))
-				actconn--;
-			l->nbconn--;
+				HA_ATOMIC_SUB(&actconn, 1);
+			HA_ATOMIC_SUB(&l->nbconn, 1);
 			if (ret == 0) /* successful termination */
 				continue;
 
@@ -552,21 +620,18 @@
 
 		if (l->nbconn >= l->maxconn) {
 			listener_full(l);
-			return;
+			goto end;
 		}
 
 		/* increase the per-process number of cumulated connections */
 		if (!(l->options & LI_O_UNLIMITED)) {
-			update_freq_ctr(&global.sess_per_sec, 1);
-			if (global.sess_per_sec.curr_ctr > global.sps_max)
-				global.sps_max = global.sess_per_sec.curr_ctr;
+			count = update_freq_ctr(&global.sess_per_sec, 1);
+			HA_ATOMIC_UPDATE_MAX(&global.sps_max, count);
 		}
 #ifdef USE_OPENSSL
 		if (!(l->options & LI_O_UNLIMITED) && l->bind_conf && l->bind_conf->is_ssl) {
-
-			update_freq_ctr(&global.ssl_per_sec, 1);
-			if (global.ssl_per_sec.curr_ctr > global.ssl_max)
-				global.ssl_max = global.ssl_per_sec.curr_ctr;
+			count = update_freq_ctr(&global.ssl_per_sec, 1);
+			HA_ATOMIC_UPDATE_MAX(&global.ssl_max, count);
 		}
 #endif
 
@@ -575,7 +640,7 @@
 	/* we've exhausted max_accept, so there is no need to poll again */
  stop:
 	fd_done_recv(fd);
-	return;
+	goto end;
 
  transient_error:
 	/* pause the listener and try again in 100 ms */
@@ -584,7 +649,8 @@
  wait_expire:
 	limit_listener(l, &global_listener_queue);
 	task_schedule(global_listener_queue_task, tick_first(expire, global_listener_queue_task->expire));
-	return;
+ end:
+	SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
 }
 
 /* Notify the listener that a connection initiated from it was released. This
@@ -596,8 +662,8 @@
 	struct proxy *fe = l->bind_conf->frontend;
 
 	if (!(l->options & LI_O_UNLIMITED))
-		actconn--;
-	l->nbconn--;
+		HA_ATOMIC_SUB(&actconn, 1);
+	HA_ATOMIC_SUB(&l->nbconn, 1);
 	if (l->state == LI_FULL)
 		resume_listener(l);
 
@@ -946,6 +1012,7 @@
 	sample_register_fetches(&smp_kws);
 	acl_register_keywords(&acl_kws);
 	bind_register_keywords(&bind_kws);
+	SPIN_INIT(&lq_lock);
 }
 
 /*
diff --git a/src/peers.c b/src/peers.c
index b98ec61..579e096 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -24,6 +24,7 @@
 #include <common/config.h>
 #include <common/time.h>
 #include <common/standard.h>
+#include <common/hathreads.h>
 
 #include <types/global.h>
 #include <types/listener.h>
@@ -1974,7 +1975,7 @@
 			/* We've just recieved the signal */
 			if (!(peers->flags & PEERS_F_DONOTSTOP)) {
 				/* add DO NOT STOP flag if not present */
-				jobs++;
+				HA_ATOMIC_ADD(&jobs, 1);
 				peers->flags |= PEERS_F_DONOTSTOP;
 				ps = peers->local;
 				for (st = ps->tables; st ; st = st->next)
@@ -1994,7 +1995,7 @@
 		if (ps->flags & PEER_F_TEACH_COMPLETE) {
 			if (peers->flags & PEERS_F_DONOTSTOP) {
 				/* resync of new process was complete, current process can die now */
-				jobs--;
+				HA_ATOMIC_SUB(&jobs, 1);
 				peers->flags &= ~PEERS_F_DONOTSTOP;
 				for (st = ps->tables; st ; st = st->next)
 					st->table->syncing--;
@@ -2018,7 +2019,7 @@
 				/* Other error cases */
 				if (peers->flags & PEERS_F_DONOTSTOP) {
 					/* unable to resync new process, current process can die now */
-					jobs--;
+					HA_ATOMIC_SUB(&jobs, 1);
 					peers->flags &= ~PEERS_F_DONOTSTOP;
 					for (st = ps->tables; st ; st = st->next)
 						st->table->syncing--;
diff --git a/src/proto_http.c b/src/proto_http.c
index efbbc84..0849f52 100644
--- a/src/proto_http.c
+++ b/src/proto_http.c
@@ -1744,7 +1744,7 @@
 			proxy_inc_fe_req_ctr(sess->fe);
 			sess->fe->fe_counters.failed_req++;
 			if (sess->listener->counters)
-				sess->listener->counters->failed_req++;
+				HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
 
 			if (!(s->flags & SF_FINST_MASK))
 				s->flags |= SF_FINST_R;
@@ -1777,7 +1777,7 @@
 			proxy_inc_fe_req_ctr(sess->fe);
 			sess->fe->fe_counters.failed_req++;
 			if (sess->listener->counters)
-				sess->listener->counters->failed_req++;
+				HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
 
 			if (!(s->flags & SF_FINST_MASK))
 				s->flags |= SF_FINST_R;
@@ -1807,7 +1807,7 @@
 			proxy_inc_fe_req_ctr(sess->fe);
 			sess->fe->fe_counters.failed_req++;
 			if (sess->listener->counters)
-				sess->listener->counters->failed_req++;
+				HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
 
 			if (!(s->flags & SF_FINST_MASK))
 				s->flags |= SF_FINST_R;
@@ -2177,7 +2177,7 @@
 
 	sess->fe->fe_counters.failed_req++;
 	if (sess->listener->counters)
-		sess->listener->counters->failed_req++;
+		HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
 
  return_prx_cond:
 	if (!(s->flags & SF_ERR_MASK))
@@ -3545,7 +3545,7 @@
 	if (sess->fe != s->be)
 		s->be->be_counters.denied_req++;
 	if (sess->listener->counters)
-		sess->listener->counters->denied_req++;
+		HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1);
 	goto done_without_exp;
 
  deny:	/* this request was blocked (denied) */
@@ -3564,7 +3564,7 @@
 	if (sess->fe != s->be)
 		s->be->be_counters.denied_req++;
 	if (sess->listener->counters)
-		sess->listener->counters->denied_req++;
+		HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1);
 	goto return_prx_cond;
 
  return_bad_req:
@@ -3583,7 +3583,7 @@
 
 	sess->fe->fe_counters.failed_req++;
 	if (sess->listener->counters)
-		sess->listener->counters->failed_req++;
+		HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
 
  return_prx_cond:
 	if (!(s->flags & SF_ERR_MASK))
@@ -3921,7 +3921,7 @@
 
 	sess->fe->fe_counters.failed_req++;
 	if (sess->listener->counters)
-		sess->listener->counters->failed_req++;
+		HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
 
 	if (!(s->flags & SF_ERR_MASK))
 		s->flags |= SF_ERR_PRXCOND;
@@ -4127,7 +4127,7 @@
 	req->analysers &= AN_REQ_FLT_END;
 	sess->fe->fe_counters.failed_req++;
 	if (sess->listener->counters)
-		sess->listener->counters->failed_req++;
+		HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
 	return 0;
 }
 
@@ -4912,7 +4912,7 @@
  return_bad_req: /* let's centralize all bad requests */
 	sess->fe->fe_counters.failed_req++;
 	if (sess->listener->counters)
-		sess->listener->counters->failed_req++;
+		HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
 
  return_bad_req_stats_ok:
 	txn->req.err_state = txn->req.msg_state;
@@ -5700,7 +5700,7 @@
 			s->be->be_counters.denied_resp++;
 			sess->fe->fe_counters.denied_resp++;
 			if (sess->listener->counters)
-				sess->listener->counters->denied_resp++;
+				HA_ATOMIC_ADD(&sess->listener->counters->denied_resp, 1);
 
 			goto return_srv_prx_502;
 		}
@@ -5850,7 +5850,7 @@
 		s->be->be_counters.denied_resp++;
 		sess->fe->fe_counters.denied_resp++;
 		if (sess->listener->counters)
-			sess->listener->counters->denied_resp++;
+			HA_ATOMIC_ADD(&sess->listener->counters->denied_resp, 1);
 
 		Alert("Blocking cacheable cookie in response from instance %s, server %s.\n",
 		      s->be->id, objt_server(s->target) ? objt_server(s->target)->id : "<dispatch>");
diff --git a/src/proto_tcp.c b/src/proto_tcp.c
index 1a26bcc..c6e33f2 100644
--- a/src/proto_tcp.c
+++ b/src/proto_tcp.c
@@ -1377,7 +1377,7 @@
 
 	sess->fe->fe_counters.denied_req++;
 	if (sess->listener->counters)
-		sess->listener->counters->denied_req++;
+		HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1);
 
 	return ACT_RET_STOP;
 }
diff --git a/src/session.c b/src/session.c
index 54a879b..3753a2c 100644
--- a/src/session.c
+++ b/src/session.c
@@ -57,8 +57,8 @@
 			fe->fe_counters.conn_max = fe->feconn;
 		if (li)
 			proxy_inc_fe_conn_ctr(li, fe);
-		totalconn++;
-		jobs++;
+		HA_ATOMIC_ADD(&totalconn, 1);
+		HA_ATOMIC_ADD(&jobs, 1);
 	}
 	return sess;
 }
@@ -69,7 +69,7 @@
 	session_store_counters(sess);
 	vars_prune_per_sess(&sess->vars);
 	pool_free2(pool2_session, sess);
-	jobs--;
+	HA_ATOMIC_SUB(&jobs, 1);
 }
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
diff --git a/src/ssl_sock.c b/src/ssl_sock.c
index ca9647d..508e9bd 100644
--- a/src/ssl_sock.c
+++ b/src/ssl_sock.c
@@ -427,7 +427,7 @@
 	/* Now we can safely call SSL_free, no more pending job in engines */
 	SSL_free(ssl);
 	sslconns--;
-	jobs--;
+	HA_ATOMIC_SUB(&jobs, 1);
 }
 /*
  * function used to manage a returned SSL_ERROR_WANT_ASYNC
@@ -5487,7 +5487,7 @@
 					fd_cant_recv(afd);
 				}
 				conn->xprt_ctx = NULL;
-				jobs++;
+				HA_ATOMIC_ADD(&jobs, 1);
 				return;
 			}
 			/* Else we can remove the fds from the fdtab
diff --git a/src/stream.c b/src/stream.c
index 522441f..8cb9bfb 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -19,6 +19,7 @@
 #include <common/buffer.h>
 #include <common/debug.h>
 #include <common/memory.h>
+#include <common/hathreads.h>
 
 #include <types/applet.h>
 #include <types/capture.h>
@@ -477,7 +478,7 @@
 			objt_server(s->target)->counters.bytes_in += bytes;
 
 		if (sess->listener && sess->listener->counters)
-			sess->listener->counters->bytes_in += bytes;
+			HA_ATOMIC_ADD(&sess->listener->counters->bytes_in, bytes);
 
 		for (i = 0; i < MAX_SESS_STKCTR; i++) {
 			struct stkctr *stkctr = &s->stkctr[i];
@@ -514,7 +515,7 @@
 			objt_server(s->target)->counters.bytes_out += bytes;
 
 		if (sess->listener && sess->listener->counters)
-			sess->listener->counters->bytes_out += bytes;
+			HA_ATOMIC_ADD(&sess->listener->counters->bytes_out, bytes);
 
 		for (i = 0; i < MAX_SESS_STKCTR; i++) {
 			struct stkctr *stkctr = &s->stkctr[i];
@@ -986,7 +987,7 @@
 
 			strm_fe(s)->fe_counters.failed_req++;
 			if (strm_li(s) && strm_li(s)->counters)
-				strm_li(s)->counters->failed_req++;
+				HA_ATOMIC_ADD(&strm_li(s)->counters->failed_req, 1);
 
 			s->flags |= SF_FINST_R;
 		}
diff --git a/src/tcp_rules.c b/src/tcp_rules.c
index bdf97c8..68b2c6a 100644
--- a/src/tcp_rules.c
+++ b/src/tcp_rules.c
@@ -169,7 +169,7 @@
 				s->be->be_counters.denied_req++;
 				sess->fe->fe_counters.denied_req++;
 				if (sess->listener && sess->listener->counters)
-					sess->listener->counters->denied_req++;
+					HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1);
 
 				if (!(s->flags & SF_ERR_MASK))
 					s->flags |= SF_ERR_PRXCOND;
@@ -347,7 +347,7 @@
 				s->be->be_counters.denied_resp++;
 				sess->fe->fe_counters.denied_resp++;
 				if (sess->listener && sess->listener->counters)
-					sess->listener->counters->denied_resp++;
+					HA_ATOMIC_ADD(&sess->listener->counters->denied_resp, 1);
 
 				if (!(s->flags & SF_ERR_MASK))
 					s->flags |= SF_ERR_PRXCOND;
@@ -429,7 +429,7 @@
 			else if (rule->action == ACT_ACTION_DENY) {
 				sess->fe->fe_counters.denied_conn++;
 				if (sess->listener && sess->listener->counters)
-					sess->listener->counters->denied_conn++;
+					HA_ATOMIC_ADD(&sess->listener->counters->denied_conn, 1);
 
 				result = 0;
 				break;
@@ -516,7 +516,7 @@
 			else if (rule->action == ACT_ACTION_DENY) {
 				sess->fe->fe_counters.denied_sess++;
 				if (sess->listener && sess->listener->counters)
-					sess->listener->counters->denied_sess++;
+					HA_ATOMIC_ADD(&sess->listener->counters->denied_sess, 1);
 
 				result = 0;
 				break;