BUG/MINOR: thread: always reload threads_enabled in loops
A few loops waiting for threads to synchronize such as thread_isolate()
rightfully filter the thread masks via the threads_enabled field that
contains the list of enabled threads. However, it doesn't use an atomic
load on it. Before 2.7, the equivalent variables were marked as volatile
and were always reloaded. In 2.7 they're fields in ha_tgroup_ctx[], and
the risk that the compiler keeps them in a register inside a loop is not
null at all. In practice when ha_thread_relax() calls sched_yield() or
an x86 PAUSE instruction, it could be verified that the variable is
always reloaded. If these are avoided (e.g. architecture providing
neither solution), it's visible in asm code that the variables are not
reloaded. In this case, if a thread exists just between the moment the
two values are read, the loop could spin forever.
This patch adds the required _HA_ATOMIC_LOAD() on the relevant
threads_enabled fields. It must be backported to 2.7.
diff --git a/include/haproxy/fd.h b/include/haproxy/fd.h
index 2a5bcad..984b151 100644
--- a/include/haproxy/fd.h
+++ b/include/haproxy/fd.h
@@ -136,11 +136,11 @@
unsigned long update_mask;
update_mask = _HA_ATOMIC_AND_FETCH(&fdtab[fd].update_mask, ~ti->ltid_bit);
- while ((update_mask & tg->threads_enabled) == 0) {
+ while ((update_mask & _HA_ATOMIC_LOAD(&tg->threads_enabled)) == 0) {
/* If we were the last one that had to update that entry, remove it from the list */
fd_rm_from_fd_list(&update_list[tgid - 1], fd);
update_mask = _HA_ATOMIC_LOAD(&fdtab[fd].update_mask);
- if ((update_mask & tg->threads_enabled) != 0) {
+ if ((update_mask & _HA_ATOMIC_LOAD(&tg->threads_enabled)) != 0) {
/* Maybe it's been re-updated in the meanwhile, and we
* wrongly removed it from the list, if so, re-add it
*/
diff --git a/src/fd.c b/src/fd.c
index 7e56d8a..4d4700f 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -481,7 +481,8 @@
unsigned long update_mask = fdtab[fd].update_mask;
int thr;
- while (!_HA_ATOMIC_CAS(&fdtab[fd].update_mask, &update_mask, ha_tgroup_info[tgrp - 1].threads_enabled))
+ while (!_HA_ATOMIC_CAS(&fdtab[fd].update_mask, &update_mask,
+ _HA_ATOMIC_LOAD(&ha_tgroup_info[tgrp - 1].threads_enabled)))
__ha_cpu_relax();
fd_add_to_fd_list(&update_list[tgrp - 1], fd);
diff --git a/src/haproxy.c b/src/haproxy.c
index 481fe5a..e0b48a7 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -2968,7 +2968,7 @@
_HA_ATOMIC_OR_FETCH(&stopping_tgroup_mask, tg->tgid_bit) == tg->tgid_bit) {
/* first one to detect it, notify all threads that stopping was just set */
for (i = 0; i < global.nbthread; i++) {
- if (ha_thread_info[i].tg->threads_enabled &
+ if (_HA_ATOMIC_LOAD(&ha_thread_info[i].tg->threads_enabled) &
ha_thread_info[i].ltid_bit &
~_HA_ATOMIC_LOAD(&ha_thread_info[i].tg_ctx->stopping_threads))
wake_thread(i);
@@ -2981,14 +2981,15 @@
(_HA_ATOMIC_LOAD(&stopping_tgroup_mask) & all_tgroups_mask) == all_tgroups_mask) {
/* check that all threads are aware of the stopping status */
for (i = 0; i < global.nbtgroups; i++)
- if ((_HA_ATOMIC_LOAD(&ha_tgroup_ctx[i].stopping_threads) & ha_tgroup_info[i].threads_enabled) !=
- ha_tgroup_info[i].threads_enabled)
+ if ((_HA_ATOMIC_LOAD(&ha_tgroup_ctx[i].stopping_threads) &
+ _HA_ATOMIC_LOAD(&ha_tgroup_info[i].threads_enabled)) !=
+ _HA_ATOMIC_LOAD(&ha_tgroup_info[i].threads_enabled))
break;
#ifdef USE_THREAD
if (i == global.nbtgroups) {
/* all are OK, let's wake them all and stop */
for (i = 0; i < global.nbthread; i++)
- if (i != tid && ha_thread_info[i].tg->threads_enabled & ha_thread_info[i].ltid_bit)
+ if (i != tid && _HA_ATOMIC_LOAD(&ha_thread_info[i].tg->threads_enabled) & ha_thread_info[i].ltid_bit)
wake_thread(i);
break;
}
diff --git a/src/listener.c b/src/listener.c
index aa466d0..4867566 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -1021,7 +1021,7 @@
if (l->rx.flags & RX_F_LOCAL_ACCEPT)
goto local_accept;
- mask = l->rx.bind_thread & tg->threads_enabled;
+ mask = l->rx.bind_thread & _HA_ATOMIC_LOAD(&tg->threads_enabled);
if (atleast2(mask) && (global.tune.options & GTUNE_LISTENER_MQ) && !stopping) {
struct accept_queue_ring *ring;
unsigned int t, t0, t1, t2;
diff --git a/src/proxy.c b/src/proxy.c
index a0e8c15..4696dec 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -2164,7 +2164,7 @@
send_log(NULL, LOG_WARNING, "Some tasks resisted to hard-stop, exiting now.\n");
killed = 2;
for (thr = 0; thr < global.nbthread; thr++)
- if (ha_thread_info[thr].tg->threads_enabled & ha_thread_info[thr].ltid_bit)
+ if (_HA_ATOMIC_LOAD(&ha_thread_info[thr].tg->threads_enabled) & ha_thread_info[thr].ltid_bit)
wake_thread(thr);
t->expire = TICK_ETERNITY;
return t;
diff --git a/src/thread.c b/src/thread.c
index 04b910b..00d9f9f 100644
--- a/src/thread.c
+++ b/src/thread.c
@@ -104,9 +104,14 @@
*/
while (1) {
for (tgrp = 0; tgrp < global.nbtgroups; tgrp++) {
- while ((_HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_harmless) &
- ha_tgroup_info[tgrp].threads_enabled) != ha_tgroup_info[tgrp].threads_enabled)
+ do {
+ ulong te = _HA_ATOMIC_LOAD(&ha_tgroup_info[tgrp].threads_enabled);
+ ulong th = _HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_harmless);
+
+ if ((th & te) == te)
+ break;
ha_thread_relax();
+ } while (1);
}
/* Now we've seen all threads marked harmless, we can try to run
@@ -160,10 +165,15 @@
*/
while (1) {
for (tgrp = 0; tgrp < global.nbtgroups; tgrp++) {
- while ((_HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_harmless) &
- _HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_idle) &
- ha_tgroup_info[tgrp].threads_enabled) != ha_tgroup_info[tgrp].threads_enabled)
+ do {
+ ulong te = _HA_ATOMIC_LOAD(&ha_tgroup_info[tgrp].threads_enabled);
+ ulong th = _HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_harmless);
+ ulong id = _HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_idle);
+
+ if ((th & id & te) == te)
+ break;
ha_thread_relax();
+ } while (1);
}
/* Now we've seen all threads marked harmless and idle, we can