MEDIUM: listener: change the LB algorithm again to use two round robins instead

At this point, the random pick used in the hybrid queue distribution
algorithm provides little benefit over a periodic scan, can even have a
slightly worse worst case, and requires establishing a mapping between a
discrete number and a thread ID within a mask.
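
For illustration, this is roughly what that mapping involves. The helper
below is hypothetical (in the code it was done by bind_map_thread_id(),
whose calls are removed in the diff below): the mask has to be walked to
find the n-th set bit for every single accept.

  /* Hypothetical helper sketching that mapping: return the position of
   * the n-th set bit in <mask> (n starts at 0).
   */
  static unsigned int nth_thread(unsigned long mask, unsigned int n)
  {
      unsigned int bit = 0;

      while (mask) {
          if (mask & 1) {
              if (!n)
                  return bit;
              n--;
          }
          mask >>= 1;
          bit++;
      }
      return 0; /* not reached if n < popcount(mask) */
  }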

This patch introduces a different approach using two indexes. One scans
the thread mask from the left, the other from the right. The loads of the
two corresponding threads are compared, and the less loaded one receives
the new connection. Then one index is adjusted depending on the outcome
of this election, so that the next election starts from two threads known
to be lightly loaded.
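
The election can be pictured with the minimal, single-threaded sketch
below. The names pick_thread, thr_load and NB_THREADS are made up for the
example; the real code walks the bind_conf thread mask bit by bit, adds
the accept queue depth to the per-thread connection count, and commits
both indexes at once with a CAS on the composite thr_idx.

  #define NB_THREADS 8

  static unsigned int idx_lo;                  /* walks threads upwards   */
  static unsigned int idx_hi = NB_THREADS - 1; /* walks threads downwards */
  static unsigned int thr_load[NB_THREADS];    /* connections per thread  */

  /* elects the thread that takes the next incoming connection */
  static unsigned int pick_thread(void)
  {
      unsigned int t1 = idx_lo, t2 = idx_hi, t;

      /* make sure the two candidates are distinct */
      if (t1 == t2) {
          t1 = (t1 + 1) % NB_THREADS;
          idx_lo = t1;
      }

      if (thr_load[t1] < thr_load[t2]) {
          /* t1 is less loaded: it takes the connection, and the
           * high-side index moves on to a fresh candidate.
           */
          t = t1;
          idx_hi = t2 ? t2 - 1 : NB_THREADS - 1;
      }
      else if (thr_load[t2] < thr_load[t1]) {
          /* t2 is less loaded: it takes the connection, and the
           * low-side index moves on to a fresh candidate.
           */
          t = t2;
          idx_lo = (t1 + 1) % NB_THREADS;
      }
      else {
          /* equal loads: pick t1 and advance the low-side index,
           * since t1 is about to become more loaded than t2.
           */
          t = t1;
          idx_lo = (t1 + 1) % NB_THREADS;
      }

      thr_load[t]++;
      return t;
  }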

This approach provides an extra 1% peak performance boost over the
previous one, which likely corresponds to removing the work spent on the
random and on the two index-to-thread mappings that were previously
required.

A test was also attempted with the two indexes walking in the same
direction, but it was much less interesting because the same thread pairs
were compared most of the time and the load climbed in a ladder-like
pattern. With opposite directions this cannot happen.
diff --git a/src/listener.c b/src/listener.c
index 3e080b4..a7e7290 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -597,6 +597,7 @@
 {
 	struct listener *l = fdtab[fd].owner;
 	struct proxy *p;
+	__decl_hathreads(unsigned long mask);
 	int max_accept;
 	int next_conn = 0;
 	int next_feconn = 0;
@@ -844,60 +845,116 @@
 		next_actconn = 0;
 
 #if defined(USE_THREAD)
-		count = l->bind_conf->thr_count;
-		if (count > 1 && (global.tune.options & GTUNE_LISTENER_MQ)) {
+		mask = thread_mask(l->bind_conf->bind_thread);
+		if (atleast2(mask) && (global.tune.options & GTUNE_LISTENER_MQ)) {
 			struct accept_queue_ring *ring;
-			int t1, t2, q1, q2;
+			unsigned int t, t0, t1, t2;
 
-			/* pick a first thread ID using a round robin index,
-			 * and a second thread ID using a random. The
-			 * connection will be assigned to the one with the
-			 * least connections. This provides fairness on short
+			/* The principle is that we have two running indexes,
+			 * each visiting in turn all threads bound to this
+			 * listener. The connection will be assigned to the one
+			 * with the least connections, and the other one will
+			 * be updated. This provides a good fairness on short
 			 * connections (round robin) and on long ones (conn
-			 * count).
+			 * count), without ever missing any idle thread.
 			 */
-			t1 = l->bind_conf->thr_idx;
+
+			/* keep a copy for the final update. thr_idx is composite
+			 * and made of (t2<<16) + t1.
+			 */
+			t0 = l->bind_conf->thr_idx;
 			do {
-				t2 = t1 + 1;
-				if (t2 >= count)
-					t2 = 0;
-			} while (!HA_ATOMIC_CAS(&l->bind_conf->thr_idx, &t1, t2));
+				unsigned long m1, m2;
+				int q1, q2;
+
+				t2 = t1 = t0;
+				t2 >>= 16;
+				t1 &= 0xFFFF;
+
+				/* t1 walks low to high bits ;
+				 * t2 walks high to low.
+				 */
+				m1 = mask >> t1;
+				m2 = mask & (t2 ? nbits(t2 + 1) : ~0UL);
+
+				if (unlikely((signed long)m2 >= 0)) {
+					/* highest bit not set */
+					if (!m2)
+						m2 = mask;
+
+					t2 = my_flsl(m2) - 1;
+				}
+
+				if (unlikely(!(m1 & 1) || t1 == t2)) {
+					m1 &= ~1UL;
+					if (!m1) {
+						m1 = mask;
+						t1 = 0;
+					}
+					t1 += my_ffsl(m1) - 1;
+				}
 
-			t2 = (random() >> 8) % (count - 1);  // 0..thr_count-2
-			t2 += t1 + 1;   // necessarily different from t1
+				/* now we have two distinct thread IDs belonging to the mask */
+				q1 = accept_queue_rings[t1].tail - accept_queue_rings[t1].head + ACCEPT_QUEUE_SIZE;
+				if (q1 >= ACCEPT_QUEUE_SIZE)
+					q1 -= ACCEPT_QUEUE_SIZE;
 
-			if (t2 >= count)
-				t2 -= count;
+				q2 = accept_queue_rings[t2].tail - accept_queue_rings[t2].head + ACCEPT_QUEUE_SIZE;
+				if (q2 >= ACCEPT_QUEUE_SIZE)
+					q2 -= ACCEPT_QUEUE_SIZE;
 
-			t1 = bind_map_thread_id(l->bind_conf, t1);
-			t2 = bind_map_thread_id(l->bind_conf, t2);
+				/* we have 3 possibilities now :
+				 *   q1 < q2 : t1 is less loaded than t2, so we pick it
+				 *             and update t2 (since t1 might still be
+				 *             lower than another thread)
+				 *   q1 > q2 : t2 is less loaded than t1, so we pick it
+				 *             and update t1 (since t2 might still be
+				 *             lower than another thread)
+				 *   q1 = q2 : both are equally loaded, thus we pick t1
+				 *             and update t1 as it will become more loaded
+				 *             than t2.
+				 */
 
-			q1 = accept_queue_rings[t1].tail - accept_queue_rings[t1].head + ACCEPT_QUEUE_SIZE;
-			if (q1 >= ACCEPT_QUEUE_SIZE)
-				q1 -= ACCEPT_QUEUE_SIZE;
+				q1 += l->thr_conn[t1];
+				q2 += l->thr_conn[t2];
 
-			q2 = accept_queue_rings[t2].tail - accept_queue_rings[t2].head + ACCEPT_QUEUE_SIZE;
-			if (q2 >= ACCEPT_QUEUE_SIZE)
-				q2 -= ACCEPT_QUEUE_SIZE;
+				if (q1 - q2 < 0) {
+					t = t1;
+					t2 = t2 ? t2 - 1 : LONGBITS - 1;
+				}
+				else if (q1 - q2 > 0) {
+					t = t2;
+					t1++;
+					if (t1 >= LONGBITS)
+						t1 = 0;
+				}
+				else {
+					t = t1;
+					t1++;
+					if (t1 >= LONGBITS)
+						t1 = 0;
+				}
 
-			/* make t1 the lowest loaded thread */
-			if (q1 >= ACCEPT_QUEUE_SIZE || l->thr_conn[t1] + q1 > l->thr_conn[t2] + q2)
-				t1 = t2;
+				/* new value for thr_idx */
+				t1 += (t2 << 16);
+			} while (unlikely(!HA_ATOMIC_CAS(&l->bind_conf->thr_idx, &t0, t1)));
 
-			/* We use deferred accepts even if it's the local thread because
-			 * tests show that it's the best performing model, likely due to
-			 * better cache locality when processing this loop.
+			/* We successfully selected the best thread "t" for this
+			 * connection. We use deferred accepts even if it's the
+			 * local thread because tests show that it's the best
+			 * performing model, likely due to better cache locality
+			 * when processing this loop.
 			 */
-			ring = &accept_queue_rings[t1];
+			ring = &accept_queue_rings[t];
 			if (accept_queue_push_mp(ring, cfd, l, &addr, laddr)) {
-				HA_ATOMIC_ADD(&activity[t1].accq_pushed, 1);
+				HA_ATOMIC_ADD(&activity[t].accq_pushed, 1);
 				task_wakeup(ring->task, TASK_WOKEN_IO);
 				continue;
 			}
 			/* If the ring is full we do a synchronous accept on
 			 * the local thread here.
 			 */
-			HA_ATOMIC_ADD(&activity[t1].accq_full, 1);
+			HA_ATOMIC_ADD(&activity[t].accq_full, 1);
 		}
 #endif // USE_THREAD