MEDIUM: listener: switch bind_thread from global to group-local
This requires both adapting the parser and changing the algorithm that
redispatches incoming traffic, so that group-local thread IDs may always
be used. The internal structures now only reference thread group IDs and
group-local masks, which are compatible with those now used by the FD
layer and the rest of the code.
diff --git a/src/cfgparse.c b/src/cfgparse.c
index 8fb866b..2bd498a 100644
--- a/src/cfgparse.c
+++ b/src/cfgparse.c
@@ -2645,17 +2645,19 @@
curproxy->id, err, bind_conf->arg, bind_conf->file, bind_conf->line);
free(err);
cfgerr++;
- } else if (!((mask = bind_conf->bind_thread) & all_threads_mask)) {
+ } else if (!((mask = bind_conf->bind_thread) & ha_tgroup_info[bind_conf->bind_tgroup-1].threads_enabled)) {
unsigned long new_mask = 0;
+ ulong thr_mask = ha_tgroup_info[bind_conf->bind_tgroup-1].threads_enabled;
while (mask) {
- new_mask |= mask & all_threads_mask;
- mask >>= global.nbthread;
+ new_mask |= mask & thr_mask;
+ mask >>= ha_tgroup_info[bind_conf->bind_tgroup-1].count;
}
bind_conf->bind_thread = new_mask;
- ha_warning("Proxy '%s': the thread range specified on the 'thread' directive of 'bind %s' at [%s:%d] only refers to thread numbers out of the range defined by the global 'nbthread' directive. The thread numbers were remapped to existing threads instead (mask 0x%lx).\n",
- curproxy->id, bind_conf->arg, bind_conf->file, bind_conf->line, new_mask);
+ ha_warning("Proxy '%s': the thread range specified on the 'thread' directive of 'bind %s' at [%s:%d] only refers to thread numbers out of the range supported by thread group %d (%d). The thread numbers were remapped to existing threads instead (mask 0x%lx).\n",
+ curproxy->id, bind_conf->arg, bind_conf->file, bind_conf->line,
+ bind_conf->bind_tgroup, ha_tgroup_info[bind_conf->bind_tgroup-1].count, new_mask);
}
/* apply thread masks and groups to all receivers */
@@ -4102,17 +4104,19 @@
curpeers->peers_fe->id, err, bind_conf->arg, bind_conf->file, bind_conf->line);
free(err);
cfgerr++;
- } else if (!((mask = bind_conf->bind_thread) & all_threads_mask)) {
+ } else if (!((mask = bind_conf->bind_thread) & ha_tgroup_info[bind_conf->bind_tgroup-1].threads_enabled)) {
unsigned long new_mask = 0;
+ ulong thr_mask = ha_tgroup_info[bind_conf->bind_tgroup-1].threads_enabled;
while (mask) {
- new_mask |= mask & all_threads_mask;
- mask >>= global.nbthread;
+ new_mask |= mask & thr_mask;
+ mask >>= ha_tgroup_info[bind_conf->bind_tgroup-1].count;
}
bind_conf->bind_thread = new_mask;
- ha_warning("Peers section '%s': the thread range specified on the 'thread' directive of 'bind %s' at [%s:%d] only refers to thread numbers out of the range defined by the global 'nbthread' directive. The thread numbers were remapped to existing threads instead (mask 0x%lx).\n",
- curpeers->peers_fe->id, bind_conf->arg, bind_conf->file, bind_conf->line, new_mask);
+ ha_warning("Peers section '%s': the thread range specified on the 'thread' directive of 'bind %s' at [%s:%d] only refers to thread numbers out of the range supported by thread group %d (%d). The thread numbers were remapped to existing threads instead (mask 0x%lx).\n",
+ curpeers->peers_fe->id, bind_conf->arg, bind_conf->file, bind_conf->line,
+ bind_conf->bind_tgroup, ha_tgroup_info[bind_conf->bind_tgroup-1].count, new_mask);
}
/* apply thread masks and groups to all receivers */
diff --git a/src/listener.c b/src/listener.c
index 5b91faf..6f8d4ad 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -992,10 +992,11 @@
if (l->rx.flags & RX_F_LOCAL_ACCEPT)
goto local_accept;
- mask = l->rx.bind_thread & all_threads_mask;
+ mask = l->rx.bind_thread & tg->threads_enabled;
if (atleast2(mask) && (global.tune.options & GTUNE_LISTENER_MQ) && !stopping) {
struct accept_queue_ring *ring;
unsigned int t, t0, t1, t2;
+ int base = tg->base;
/* The principle is that we have two running indexes,
* each visiting in turn all threads bound to this
@@ -1042,11 +1043,11 @@
}
/* now we have two distinct thread IDs belonging to the mask */
- q1 = accept_queue_rings[t1].tail - accept_queue_rings[t1].head + ACCEPT_QUEUE_SIZE;
+ q1 = accept_queue_rings[base + t1].tail - accept_queue_rings[base + t1].head + ACCEPT_QUEUE_SIZE;
if (q1 >= ACCEPT_QUEUE_SIZE)
q1 -= ACCEPT_QUEUE_SIZE;
- q2 = accept_queue_rings[t2].tail - accept_queue_rings[t2].head + ACCEPT_QUEUE_SIZE;
+ q2 = accept_queue_rings[base + t2].tail - accept_queue_rings[base + t2].head + ACCEPT_QUEUE_SIZE;
if (q2 >= ACCEPT_QUEUE_SIZE)
q2 -= ACCEPT_QUEUE_SIZE;
@@ -1062,8 +1063,8 @@
* than t2.
*/
- q1 += l->thr_conn[t1];
- q2 += l->thr_conn[t2];
+ q1 += l->thr_conn[base + t1];
+ q2 += l->thr_conn[base + t2];
if (q1 - q2 < 0) {
t = t1;
@@ -1092,16 +1093,16 @@
* performing model, likely due to better cache locality
* when processing this loop.
*/
- ring = &accept_queue_rings[t];
+ ring = &accept_queue_rings[base + t];
if (accept_queue_push_mp(ring, cli_conn)) {
- _HA_ATOMIC_INC(&activity[t].accq_pushed);
+ _HA_ATOMIC_INC(&activity[base + t].accq_pushed);
tasklet_wakeup(ring->tasklet);
continue;
}
/* If the ring is full we do a synchronous accept on
* the local thread here.
*/
- _HA_ATOMIC_INC(&activity[t].accq_full);
+ _HA_ATOMIC_INC(&activity[base + t].accq_full);
}
#endif // USE_THREAD
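The listener.c hunks above all follow the same pattern: arrays such as
accept_queue_rings[], activity[] and l->thr_conn[] remain indexed by
global thread ID, so a group-local ID picked from the group-local mask
must be offset by the group's base before any array access. A minimal
sketch of that translation, with a simplified stand-in for the real
thread-group structure:

    /* Simplified stand-in for the thread group descriptor; the real
     * haproxy structure carries more fields.
     */
    struct tgroup {
            int base;   /* global ID of the group's first thread */
            int count;  /* number of threads in the group */
    };

    /* translate a group-local thread ID into a global array index,
     * as done with "base + t" in the patch above
     */
    static inline int tgroup_global_tid(const struct tgroup *tg, int local_tid)
    {
            return tg->base + local_tid;
    }

For example, if a thread group covers global threads 9-16, its base is 8
and group-local thread 3 maps to global index 11, so the redispatch code
pushes to accept_queue_rings[11].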