MAJOR: listener: use the multi-queue for multi-thread listeners
The idea is to redistribute an incoming connection to one of the
threads a bind_conf is bound to when there is more than one. We do this
using random selection improved by the p2c ("power of two choices")
algorithm: a single random() call is used to derive two distinct thread
numbers. We then compare, for each of them, the connection count plus the
length of the accept queue, and pick the least loaded one. We even use
this deferred accept mechanism if the target thread
ends up being the local thread, because this maintains fairness between
all connections and tests show that it's about 1% faster this way,
likely due to cache locality. If the target thread's accept queue is
full, the connection is accepted synchronously by the current thread.
diff --git a/src/listener.c b/src/listener.c
index 9e8c878..2b8c2df 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -816,6 +816,52 @@
*/
next_conn = 0;
+#if defined(USE_THREAD)
+ count = l->bind_conf->thr_count;
+ if (count > 1) {
+ struct accept_queue_ring *ring;
+ int r, t1, t2, q1, q2;
+
+ /* pick two small distinct random values and drop lower bits */
+ r = (random() >> 8) % ((count - 1) * count);
+ t2 = r / count; // 0..thr_count-2
+ t1 = r % count; // 0..thr_count-1
+ t2 += t1 + 1; // necessarily different from t1
+
+ if (t2 >= count)
+ t2 -= count;
+
+ t1 = bind_map_thread_id(l->bind_conf, t1);
+ t2 = bind_map_thread_id(l->bind_conf, t2);
+
+ q1 = accept_queue_rings[t1].tail - accept_queue_rings[t1].head + ACCEPT_QUEUE_SIZE;
+ if (q1 >= ACCEPT_QUEUE_SIZE)
+ q1 -= ACCEPT_QUEUE_SIZE;
+
+ q2 = accept_queue_rings[t2].tail - accept_queue_rings[t2].head + ACCEPT_QUEUE_SIZE;
+ if (q2 >= ACCEPT_QUEUE_SIZE)
+ q2 -= ACCEPT_QUEUE_SIZE;
+
+ /* make t1 the lowest loaded thread */
+ if (q1 >= ACCEPT_QUEUE_SIZE || l->thr_conn[t1] + q1 > l->thr_conn[t2] + q2)
+ t1 = t2;
+
+ /* We use deferred accepts even if it's the local thread because
+ * tests show that it's the best performing model, likely due to
+ * better cache locality when processing this loop.
+ */
+ ring = &accept_queue_rings[t1];
+ if (accept_queue_push_mp(ring, cfd, l, &addr, laddr)) {
+ task_wakeup(ring->task, TASK_WOKEN_IO);
+ continue;
+ }
+ /* If the ring is full we do a synchronous accept on
+ * the local thread here.
+ * FIXME: we should update some stats here.
+ */
+ }
+#endif // USE_THREAD
+
HA_ATOMIC_ADD(&l->thr_conn[tid], 1);
ret = l->accept(l, cfd, &addr);
if (unlikely(ret <= 0)) {