MEDIUM: listener: rework thread assignment to consider all groups
Till now threads were assigned in listener_accept() to other threads of
the same group only, using a single group mask. Now that we have all the
relevant info (array of listeners of the same shard), we can spread the
thr_idx to cover all assigned groups. The thread indexes now contain the
group number in their upper bits, and the indexes run over te whole list
of threads, all groups included.
One particular subtlety here is that switching to a thread from another
group also means switching the group, hence the listener. As such, when
changing the group we need to update the connection's owner to point to
the listener of the same shard that is bound to the target group.
diff --git a/src/listener.c b/src/listener.c
index 8215cec..f97e2d7 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -1176,67 +1176,167 @@
#if defined(USE_THREAD)
+ if (!(global.tune.options & GTUNE_LISTENER_MQ_ANY) || stopping)
+ goto local_accept;
+
+ /* we want to perform thread rebalancing if the listener is
+ * bound to more than one thread or if it's part of a shard
+ * with more than one listener.
+ */
mask = l->rx.bind_thread & _HA_ATOMIC_LOAD(&tg->threads_enabled);
- if (atleast2(mask) && (global.tune.options & GTUNE_LISTENER_MQ_ANY) && !stopping) {
+ if (l->rx.shard_info || atleast2(mask)) {
struct accept_queue_ring *ring;
- unsigned int t, t0, t1, t2;
- int base = tg->base;
+ struct listener *new_li;
+ uint n0, n1, n2, r1, r2, t, t1, t2;
+ const struct tgroup_info *g1, *g2;
+ ulong m1, m2;
/* The principle is that we have two running indexes,
* each visiting in turn all threads bound to this
- * listener. The connection will be assigned to the one
- * with the least connections, and the other one will
- * be updated. This provides a good fairness on short
- * connections (round robin) and on long ones (conn
- * count), without ever missing any idle thread.
+ * listener's shard. The connection will be assigned to
+ * the one with the least connections, and the other
+ * one will be updated. This provides a good fairness
+ * on short connections (round robin) and on long ones
+ * (conn count), without ever missing any idle thread.
+ * Each thread number is encoded as a combination of
+ * times the receiver number and its local thread
+ * number from 0 to MAX_THREADS_PER_GROUP - 1. The two
+ * indexes are stored as 16 bit numbers in the thr_idx
+ * variable.
+ *
+ * In the loop below we have this for each index:
+ * - n is the thread index
+ * - r is the receiver number
+ * - g is the receiver's thread group
+ * - t is the thread number in this receiver
+ * - m is the receiver's thread mask shifted by the thread number
*/
/* keep a copy for the final update. thr_idx is composite
- * and made of (t2<<16) + t1.
+ * and made of (n2<<16) + n1.
*/
- t0 = l->thr_idx;
- do {
- unsigned long m1, m2;
+ n0 = l->thr_idx;
+ while (1) {
int q1, q2;
+ new_li = NULL;
+
- t2 = t1 = t0;
- t2 >>= 16;
- t1 &= 0xFFFF;
+ n2 = n1 = n0;
+ n2 >>= 16;
+ n1 &= 0xFFFF;
/* t1 walks low to high bits ;
* t2 walks high to low.
*/
- m1 = mask >> t1;
- m2 = mask & (t2 ? nbits(t2 + 1) : ~0UL);
- if (unlikely(!(m1 & 1))) {
- m1 &= ~1UL;
- if (!m1) {
- m1 = mask;
- t1 = 0;
+ /* calculate r1/g1/t1 first */
+ r1 = n1 / MAX_THREADS_PER_GROUP;
+ t1 = n1 % MAX_THREADS_PER_GROUP;
+ while (1) {
+ if (l->rx.shard_info) {
+ /* multiple listeners, take the group into account */
+ if (r1 >= l->rx.shard_info->nbgroups)
+ r1 = 0;
+
+ g1 = &ha_tgroup_info[l->rx.shard_info->members[r1]->bind_tgroup - 1];
+ m1 = l->rx.shard_info->members[r1]->bind_thread;
+ } else {
+ /* single listener */
+ r1 = 0;
+ g1 = tg;
+ m1 = l->rx.bind_thread;
}
- t1 += my_ffsl(m1) - 1;
+ m1 &= _HA_ATOMIC_LOAD(&g1->threads_enabled);
+ m1 >>= t1;
+
+ /* find first existing thread */
+ if (unlikely(!(m1 & 1))) {
+ m1 &= ~1UL;
+ if (!m1) {
+ /* no more threads here, switch to
+ * first thread of next group.
+ */
+ t1 = 0;
+ if (l->rx.shard_info)
+ r1++;
+ /* loop again */
+ continue;
+ }
+ t1 += my_ffsl(m1) - 1;
+ }
+ /* done: r1 and t1 are OK */
+ break;
}
+ /* now r2/g2/t2 */
+ r2 = n2 / MAX_THREADS_PER_GROUP;
+ t2 = n2 % MAX_THREADS_PER_GROUP;
+
/* if running in round-robin mode ("fair"), we don't need
* to go further.
*/
if ((global.tune.options & GTUNE_LISTENER_MQ_ANY) == GTUNE_LISTENER_MQ_FAIR) {
- t = t1;
+ t = g1->base + t1;
+ if (l->rx.shard_info && t != tid)
+ new_li = l->rx.shard_info->members[r1]->owner;
goto updt_t1;
}
- if (unlikely(!(m2 & (1UL << t2)) || t1 == t2)) {
- /* highest bit not set */
- if (!m2)
- m2 = mask;
+ while (1) {
+ if (l->rx.shard_info) {
+ /* multiple listeners, take the group into account */
+ if (r2 >= l->rx.shard_info->nbgroups)
+ r2 = l->rx.shard_info->nbgroups - 1;
- t2 = my_flsl(m2) - 1;
+ g2 = &ha_tgroup_info[l->rx.shard_info->members[r2]->bind_tgroup - 1];
+ m2 = l->rx.shard_info->members[r2]->bind_thread;
+ } else {
+ /* single listener */
+ r2 = 0;
+ g2 = tg;
+ m2 = l->rx.bind_thread;
+ }
+ m2 &= _HA_ATOMIC_LOAD(&g2->threads_enabled);
+ m2 &= nbits(t2 + 1);
+
+ /* find previous existing thread */
+ if (unlikely(!(m2 & (1UL << t2)) || (g1 == g2 && t1 == t2))) {
+ /* highest bit not set or colliding threads, let's check
+ * if we still have other threads available after this
+ * one.
+ */
+ m2 &= ~(1UL << t2);
+ if (!m2) {
+ /* no more threads here, switch to
+ * last thread of previous group.
+ */
+ t2 = MAX_THREADS_PER_GROUP - 1;
+ if (l->rx.shard_info)
+ r2--;
+ /* loop again */
+ continue;
+ }
+ t2 = my_flsl(m2) - 1;
+ }
+ /* done: r2 and t2 are OK */
+ break;
}
- /* now we have two distinct thread IDs belonging to the mask */
- q1 = accept_queue_ring_len(&accept_queue_rings[base + t1]);
- q2 = accept_queue_ring_len(&accept_queue_rings[base + t2]);
+ /* here we have (r1,g1,t1) that designate the first receiver, its
+ * thread group and local thread, and (r2,g2,t2) that designate
+ * the second receiver, its thread group and local thread.
+ */
+ q1 = accept_queue_ring_len(&accept_queue_rings[g1->base + t1]);
+ q2 = accept_queue_ring_len(&accept_queue_rings[g2->base + t2]);
+
+ /* add to this the currently active connections */
+ if (l->rx.shard_info) {
+ q1 += _HA_ATOMIC_LOAD(&((struct listener *)l->rx.shard_info->members[r1]->owner)->thr_conn[t1]);
+ q2 += _HA_ATOMIC_LOAD(&((struct listener *)l->rx.shard_info->members[r2]->owner)->thr_conn[t2]);
+ } else {
+ q1 += _HA_ATOMIC_LOAD(&l->thr_conn[t1]);
+ q2 += _HA_ATOMIC_LOAD(&l->thr_conn[t2]);
+ }
/* we have 3 possibilities now :
* q1 < q2 : t1 is less loaded than t2, so we pick it
@@ -1250,33 +1350,64 @@
* than t2.
*/
- q1 += l->thr_conn[t1];
- q2 += l->thr_conn[t2];
-
if (q1 - q2 < 0) {
- t = t1;
- t2 = t2 ? t2 - 1 : LONGBITS - 1;
+ t = g1->base + t1;
+
+ if (l->rx.shard_info)
+ new_li = l->rx.shard_info->members[r1]->owner;
+
+ t2--;
+ if (t2 >= MAX_THREADS_PER_GROUP) {
+ if (l->rx.shard_info)
+ r2--;
+ t2 = MAX_THREADS_PER_GROUP - 1;
+ }
}
else if (q1 - q2 > 0) {
- t = t2;
- t1++;
- if (t1 >= LONGBITS)
- t1 = 0;
+ t = g2->base + t2;
+
+ if (l->rx.shard_info)
+ new_li = l->rx.shard_info->members[r2]->owner;
+ goto updt_t1;
}
else {
- t = t1;
+ t = g1->base + t1;
+
+ if (l->rx.shard_info)
+ new_li = l->rx.shard_info->members[r1]->owner;
updt_t1:
t1++;
- if (t1 >= LONGBITS)
+ if (t1 >= MAX_THREADS_PER_GROUP) {
+ if (l->rx.shard_info)
+ r1++;
t1 = 0;
+ }
}
+ /* the target thread number is in <t> now */
+
/* new value for thr_idx */
- t1 += (t2 << 16);
- } while (unlikely(!_HA_ATOMIC_CAS(&l->thr_idx, &t0, t1)));
+ n1 = ((r1 & 63) * MAX_THREADS_PER_GROUP) + t1;
+ n2 = ((r2 & 63) * MAX_THREADS_PER_GROUP) + t2;
+ n1 += (n2 << 16);
+
+ /* try to update the index */
+ if (likely(_HA_ATOMIC_CAS(&l->thr_idx, &n0, n1)))
+ break;
+ } /* end of main while() loop */
+
+ /* we may need to update the listener in the connection
+ * if we switched to another group.
+ */
+ if (new_li)
+ cli_conn->target = &new_li->obj_type;
+
+ /* here we have the target thread number in <t> and we hold a
+ * reservation in the target ring.
+ */
if (l->rx.proto && l->rx.proto->set_affinity) {
- if (l->rx.proto->set_affinity(cli_conn, base + t)) {
+ if (l->rx.proto->set_affinity(cli_conn, t)) {
/* Failed migration, stay on the same thread. */
goto local_accept;
}
@@ -1288,20 +1419,22 @@
* performing model, likely due to better cache locality
* when processing this loop.
*/
- ring = &accept_queue_rings[base + t];
+ ring = &accept_queue_rings[t];
if (accept_queue_push_mp(ring, cli_conn)) {
- _HA_ATOMIC_INC(&activity[base + t].accq_pushed);
+ _HA_ATOMIC_INC(&activity[t].accq_pushed);
tasklet_wakeup(ring->tasklet);
continue;
}
/* If the ring is full we do a synchronous accept on
* the local thread here.
*/
- _HA_ATOMIC_INC(&activity[base + t].accq_full);
+ _HA_ATOMIC_INC(&activity[t].accq_full);
}
#endif // USE_THREAD
local_accept:
+ /* restore the connection's listener in case we failed to migrate above */
+ cli_conn->target = &l->obj_type;
_HA_ATOMIC_INC(&l->thr_conn[ti->ltid]);
ret = l->bind_conf->accept(cli_conn);
if (unlikely(ret <= 0)) {