MAJOR: threads: change thread_isolate to support inter-group synchronization

thread_isolate() and thread_isolate_full() were relying on a set of thread
masks for all threads in different states (rdv, harmless, idle). This cannot
work anymore when the number of threads increases beyond LONGBITS, so the
mechanism needs to change.

What is done here is to use a counter of requesters and the number of the
currently isolated thread. Threads which want to isolate themselves increment
the request counter and wait for all threads to be marked harmless (or idle)
by scanning all groups and watching the respective masks. This is possible
because threads cannot escape once they discover this counter, unless they
also want to isolate and possibly pass first. Once all threads are harmless,
the requesting thread tries to self-assign the isolated thread number, and
if it fails it loops back to checking all threads. If it wins, it is
guaranteed to be alone and can drop its harmless bit, so that other competing
threads go back to the loop waiting for all threads to be harmless. The
benefit of proceeding this way is that there is very little write contention
on the thread number (none during work), hence no cache line bouncing between
caches, and frozen threads do not slow down the isolated one.

Once its work is done, the isolated thread resets the isolated thread
number (letting another requester take its place) and decrements the
requester count, thus possibly releasing all harmless threads.
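
To make the new protocol easier to follow, here is a minimal standalone
sketch of the isolation and release sides described above, written with
C11 atomics and simplified per-group structures. Everything prefixed with
sk_ as well as tgroups[], nbtgroups, my_tid, my_tgrp and my_ltid_bit is
illustrative only; the real implementation in the patch below relies on
the _HA_ATOMIC_* macros, ha_tgroup_ctx[], ha_tgroup_info[] and
ha_thread_relax().

  #include <sched.h>      /* sched_yield() */
  #include <stdatomic.h>

  #define SK_MAX_TGROUPS 16

  struct sk_tgroup {
      _Atomic unsigned long threads_harmless; /* threads promising not to touch shared data */
      unsigned long threads_enabled;          /* threads configured in this group */
  };

  static struct sk_tgroup tgroups[SK_MAX_TGROUPS];
  static unsigned int nbtgroups = 1;

  static _Atomic unsigned int rdv_requests;           /* threads requesting a rendez-vous */
  static _Atomic unsigned int isolated_thread = ~0U;  /* ID of the isolated thread, ~0 if none */

  static _Thread_local unsigned int my_tid;       /* global thread ID */
  static _Thread_local unsigned int my_tgrp;      /* index of this thread's group */
  static _Thread_local unsigned long my_ltid_bit; /* this thread's bit within its group */

  void sk_thread_isolate(void)
  {
      unsigned int tgrp, thr;

      /* seq_cst atomics stand in for the explicit store barrier of the real
       * code: our harmless bit must be visible before the request is counted.
       */
      atomic_fetch_or(&tgroups[my_tgrp].threads_harmless, my_ltid_bit);
      atomic_fetch_add(&rdv_requests, 1);

      while (1) {
          /* wait for every enabled thread of every group to be harmless;
           * up to LONGBITS (typically 64) threads are checked per group.
           */
          for (tgrp = 0; tgrp < nbtgroups; tgrp++) {
              while ((atomic_load(&tgroups[tgrp].threads_harmless) &
                      tgroups[tgrp].threads_enabled) != tgroups[tgrp].threads_enabled)
                  sched_yield();
          }

          /* compete for the isolated slot only once all threads were seen
           * harmless, to limit write contention on isolated_thread.
           */
          thr = atomic_load(&isolated_thread);
          if (thr == ~0U &&
              atomic_compare_exchange_strong(&isolated_thread, &thr, my_tid))
              break; /* we won: we are alone */
          sched_yield();
      }

      /* drop our harmless bit so that losers keep spinning in the loop above */
      atomic_fetch_and(&tgroups[my_tgrp].threads_harmless, ~my_ltid_bit);
  }

  void sk_thread_release(void)
  {
      atomic_store(&isolated_thread, ~0U); /* let another requester win */
      atomic_fetch_sub(&rdv_requests, 1);  /* possibly release all harmless threads */
  }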

With this change there is no longer any need for a global mask to
synchronize threads, and we only need to loop over the thread groups to
check 64 threads at a time per iteration. As such, tgroup_ctx's
threads_want_rdv field could be dropped.
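
For completeness, the waiting side can be sketched the same way, under the
same assumptions as the sketch above (the real functions are
thread_harmless_till_end() and thread_harmless_end() in the patch below):
a thread that has declared itself harmless only drops that state once no
rendez-vous request is pending anymore, which is what prevents it from
escaping an ongoing isolation.

  /* Continuation of the sketch above (same globals and thread-locals). */

  void sk_thread_harmless_till_end(void)
  {
      /* park as long as at least one rendez-vous request is pending */
      atomic_fetch_or(&tgroups[my_tgrp].threads_harmless, my_ltid_bit);
      while (atomic_load(&rdv_requests) != 0)
          sched_yield();
  }

  void sk_thread_harmless_end(void)
  {
      /* only leave the harmless state once no request is pending anymore */
      while (1) {
          atomic_fetch_and(&tgroups[my_tgrp].threads_harmless, ~my_ltid_bit);
          if (atomic_load(&rdv_requests) == 0)
              break;
          sk_thread_harmless_till_end();
      }
  }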

This was tested with 64 threads spread into 2 groups, running 64 tasks
(from the debug dev command), 20 "show sess" (thread_isolate()), 20
"add server blah/blah" (thread_isolate()), and 20 "del server blah/blah"
(thread_isolate_full()). The load remained very low (limited by external
socat forks) and no stuck or starved thread was found.
diff --git a/include/haproxy/thread.h b/include/haproxy/thread.h
index 5f7f515..3ce436a 100644
--- a/include/haproxy/thread.h
+++ b/include/haproxy/thread.h
@@ -173,29 +173,12 @@
 
 extern volatile unsigned long all_threads_mask;
 extern volatile unsigned long all_tgroups_mask;
+extern volatile unsigned int rdv_requests;
+extern volatile unsigned int isolated_thread;
 extern THREAD_LOCAL unsigned long tid_bit; /* The bit corresponding to the thread id */
 extern THREAD_LOCAL unsigned int tid;      /* The thread id */
 extern THREAD_LOCAL unsigned int tgid;     /* The thread group id (starts at 1) */
 
-/* explanation for tg_ctx->threads_want_rdv, and tg_ctx->threads_harmless:
- * - tg_ctx->threads_want_rdv is a bit field indicating all threads that have
- *   requested a rendez-vous of other threads using thread_isolate().
- * - tg_ctx->threads_harmless is a bit field indicating all threads that are
- *   currently harmless in that they promise not to access a shared resource.
- *
- * For a given thread, its bits in want_rdv and harmless can be translated like
- * this :
- *
- *  ----------+----------+----------------------------------------------------
- *   want_rdv | harmless | description
- *  ----------+----------+----------------------------------------------------
- *       0    |     0    | thread not interested in RDV, possibly harmful
- *       0    |     1    | thread not interested in RDV but harmless
- *       1    |     1    | thread interested in RDV and waiting for its turn
- *       1    |     0    | thread currently working isolated from others
- *  ----------+----------+----------------------------------------------------
- */
-
 #define ha_sigmask(how, set, oldset)  pthread_sigmask(how, set, oldset)
 
 /* Sets the current thread to a valid one described by <thr>, or to any thread
@@ -276,19 +259,17 @@
 static inline void thread_harmless_end()
 {
 	while (1) {
-		HA_ATOMIC_AND(&tg_ctx->threads_harmless, ~tid_bit);
-		if (likely((_HA_ATOMIC_LOAD(&tg_ctx->threads_want_rdv) &
-			    tg->threads_enabled & ~ti->ltid_bit) == 0))
+		HA_ATOMIC_AND(&tg_ctx->threads_harmless, ~ti->ltid_bit);
+		if (likely(_HA_ATOMIC_LOAD(&rdv_requests) == 0))
 			break;
 		thread_harmless_till_end();
 	}
 }
 
-/* an isolated thread has harmless cleared and want_rdv set */
+/* an isolated thread has its ID in isolated_thread */
 static inline unsigned long thread_isolated()
 {
-	return _HA_ATOMIC_LOAD(&tg_ctx->threads_want_rdv) &
-		~_HA_ATOMIC_LOAD(&tg_ctx->threads_harmless) & ti->ltid_bit;
+	return _HA_ATOMIC_LOAD(&isolated_thread) == tid;
 }
 
 /* Returns 1 if the cpu set is currently restricted for the process else 0.
diff --git a/include/haproxy/tinfo-t.h b/include/haproxy/tinfo-t.h
index 505bef6..16baffd 100644
--- a/include/haproxy/tinfo-t.h
+++ b/include/haproxy/tinfo-t.h
@@ -66,7 +66,6 @@
  * etc). It uses one cache line per thread to limit false sharing.
  */
 struct tgroup_ctx {
-	ulong threads_want_rdv;           /* mask of threads that wand a rendez-vous */
 	ulong threads_harmless;           /* mask of threads that are not modifying anything */
 	ulong threads_idle;               /* mask of threads idling in the poller */
 	ulong stopping_threads;           /* mask of threads currently stopping */
diff --git a/src/thread.c b/src/thread.c
index cc18b76..28cfa9d 100644
--- a/src/thread.c
+++ b/src/thread.c
@@ -65,6 +65,8 @@
 
 volatile unsigned long all_threads_mask __read_mostly  = 1; // nbthread 1 assumed by default
 volatile unsigned long all_tgroups_mask __read_mostly  = 1; // nbtgroup 1 assumed by default
+volatile unsigned int rdv_requests       = 0;  // total number of threads requesting RDV
+volatile unsigned int isolated_thread    = ~0; // ID of the isolated thread, or ~0 when none
 THREAD_LOCAL unsigned int  tgid          = 1; // thread ID starts at 1
 THREAD_LOCAL unsigned int  tid           = 0;
 THREAD_LOCAL unsigned long tid_bit       = (1UL << 0);
@@ -72,15 +74,14 @@
 static pthread_t ha_pthread[MAX_THREADS] = { };
 
 /* Marks the thread as harmless until the last thread using the rendez-vous
- * point quits, excluding the current one. Thus an isolated thread may be safely
- * marked as harmless. Given that we can wait for a long time, sched_yield() is
+ * point quits. Given that we can wait for a long time, sched_yield() is
  * used when available to offer the CPU resources to competing threads if
  * needed.
  */
 void thread_harmless_till_end()
 {
 	_HA_ATOMIC_OR(&tg_ctx->threads_harmless, ti->ltid_bit);
-	while (_HA_ATOMIC_LOAD(&tg_ctx->threads_want_rdv) & tg->threads_enabled & ~ti->ltid_bit) {
+	while (_HA_ATOMIC_LOAD(&rdv_requests) != 0) {
 		ha_thread_relax();
 	}
 }
@@ -93,25 +94,42 @@
  */
 void thread_isolate()
 {
-	unsigned long old;
+	uint tgrp, thr;
 
 	_HA_ATOMIC_OR(&tg_ctx->threads_harmless, ti->ltid_bit);
 	__ha_barrier_atomic_store();
-	_HA_ATOMIC_OR(&tg_ctx->threads_want_rdv, ti->ltid_bit);
+	_HA_ATOMIC_INC(&rdv_requests);
 
-	/* wait for all threads to become harmless */
-	old = _HA_ATOMIC_LOAD(&tg_ctx->threads_harmless);
+	/* wait for all threads to become harmless. They cannot change their
+	 * mind once seen thanks to rdv_requests above, unless they pass in
+	 * front of us.
+	 */
 	while (1) {
-		if (unlikely((old & tg->threads_enabled) != tg->threads_enabled))
-			old = _HA_ATOMIC_LOAD(&tg_ctx->threads_harmless);
-		else if (_HA_ATOMIC_CAS(&tg_ctx->threads_harmless, &old, old & ~ti->ltid_bit))
-			break;
+		for (tgrp = 0; tgrp < global.nbtgroups; tgrp++) {
+			while ((_HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_harmless) &
+				ha_tgroup_info[tgrp].threads_enabled) != ha_tgroup_info[tgrp].threads_enabled)
+				ha_thread_relax();
+		}
 
+		/* Now we've seen all threads marked harmless, we can try to run
+		 * by competing with other threads to win the race of the isolated
+		 * thread. It eventually converges since winners will eventually
+		 * relax their request and go back to wait for this to be over.
+		 * Competing on this only after seeing all threads harmless limits
+		 * the write contention.
+		 */
+		thr = _HA_ATOMIC_LOAD(&isolated_thread);
+		if (thr == ~0U && _HA_ATOMIC_CAS(&isolated_thread, &thr, tid))
+			break; // we won!
 		ha_thread_relax();
 	}
-	/* one thread gets released at a time here, with its harmess bit off.
-	 * The loss of this bit makes the other one continue to spin while the
-	 * thread is working alone.
+
+	/* the thread is no longer harmless as it runs */
+	_HA_ATOMIC_AND(&tg_ctx->threads_harmless, ~ti->ltid_bit);
+
+	/* the thread is isolated until it calls thread_release() which will
+	 * 1) reset isolated_thread to ~0;
+	 * 2) decrement rdv_requests.
 	 */
 }
 
@@ -131,42 +149,56 @@
  */
 void thread_isolate_full()
 {
-	unsigned long old;
+	uint tgrp, thr;
 
 	_HA_ATOMIC_OR(&tg_ctx->threads_idle, ti->ltid_bit);
 	_HA_ATOMIC_OR(&tg_ctx->threads_harmless, ti->ltid_bit);
 	__ha_barrier_atomic_store();
-	_HA_ATOMIC_OR(&tg_ctx->threads_want_rdv, ti->ltid_bit);
+	_HA_ATOMIC_INC(&rdv_requests);
 
-	/* wait for all threads to become harmless */
-	old = _HA_ATOMIC_LOAD(&tg_ctx->threads_harmless);
+	/* wait for all threads to become harmless. They cannot change their
+	 * mind once seen thanks to rdv_requests above, unless they pass in
+	 * front of us.
+	 */
 	while (1) {
-		unsigned long idle = _HA_ATOMIC_LOAD(&tg_ctx->threads_idle);
+		for (tgrp = 0; tgrp < global.nbtgroups; tgrp++) {
+			while ((_HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_harmless) &
+				_HA_ATOMIC_LOAD(&ha_tgroup_ctx[tgrp].threads_idle) &
+				ha_tgroup_info[tgrp].threads_enabled) != ha_tgroup_info[tgrp].threads_enabled)
+				ha_thread_relax();
+		}
 
-		if (unlikely((old & tg->threads_enabled) != tg->threads_enabled))
-			old = _HA_ATOMIC_LOAD(&tg_ctx->threads_harmless);
-		else if ((idle & tg->threads_enabled) == tg->threads_enabled &&
-			 _HA_ATOMIC_CAS(&tg_ctx->threads_harmless, &old, old & ~ti->ltid_bit))
-			break;
-
+		/* Now we've seen all threads marked harmless and idle, we can
+		 * try to run by competing with other threads to win the race
+		 * of the isolated thread. It eventually converges since winners
+		 * will eventually relax their request and go back to wait for
+		 * this to be over. Competing on this only after seeing all
+		 * threads harmless+idle limits the write contention.
+		 */
+		thr = _HA_ATOMIC_LOAD(&isolated_thread);
+		if (thr == ~0U && _HA_ATOMIC_CAS(&isolated_thread, &thr, tid))
+			break; // we won!
 		ha_thread_relax();
 	}
 
-	/* we're not idle anymore at this point. Other threads waiting on this
-	 * condition will need to wait until out next pass to the poller, or
-	 * our next call to thread_isolate_full().
+	/* we're neither idle nor harmless anymore at this point. Other threads
+	 * waiting on this condition will need to wait until our next pass to
+	 * the poller, or our next call to thread_isolate_full().
 	 */
 	_HA_ATOMIC_AND(&tg_ctx->threads_idle, ~ti->ltid_bit);
+	_HA_ATOMIC_AND(&tg_ctx->threads_harmless, ~ti->ltid_bit);
 }
 
-/* Cancels the effect of thread_isolate() by releasing the current thread's bit
- * in &tg_ctx->threads_want_rdv. This immediately allows other threads to expect be
- * executed, though they will first have to wait for this thread to become
- * harmless again (possibly by reaching the poller again).
+/* Cancels the effect of thread_isolate() by resetting the ID of the isolated
+ * thread and decrementing the number of RDV requesters. This immediately allows
+ * other threads to expect to be executed, though they will first have to wait
+ * for this thread to become harmless again (possibly by reaching the poller
+ * again).
  */
 void thread_release()
 {
-	_HA_ATOMIC_AND(&tg_ctx->threads_want_rdv, ~ti->ltid_bit);
+	HA_ATOMIC_STORE(&isolated_thread, ~0U);
+	HA_ATOMIC_DEC(&rdv_requests);
 }
 
 /* Sets up threads, signals and masks, and starts threads 2 and above.