REORG: thread/sched: move the task_per_thread stuff to thread_ctx
The scheduler contains a lot of stuff that is thread-local and not
exclusively tied to the scheduler. Other parts (namely thread_info)
contain similar thread-local context that ought to be merged with
it but that is even less related to the scheduler. However moving
more data into this structure isn't possible since task.h is high
level and cannot be included everywhere (e.g. activity) without
causing include loops.
In the end, it appears that the task_per_thread represents most of
the per-thread context defined with generic types and should simply
move to tinfo.h so that everyone can use them.
The struct was renamed to thread_ctx and the variable "sched" was
renamed to "th_ctx". "sched" used to be initialized manually from
run_thread_poll_loop(), now it's initialized by ha_set_tid() just
like ti, tid, tid_bit.
The memset() in init_task() was removed in favor of a bss initialization
of the array, so that other subsystems can put their stuff in this array.
Since the tasklet array has TL_CLASSES elements, the TL_* definitions
was moved there as well, but it's not a problem.
The vast majority of the change in this patch is caused by the
renaming of the structures.
diff --git a/src/activity.c b/src/activity.c
index a4527f2..b1b91ac 100644
--- a/src/activity.c
+++ b/src/activity.c
@@ -876,7 +876,7 @@
/* 2. all threads's local run queues */
for (thr = 0; thr < global.nbthread; thr++) {
/* task run queue */
- rqnode = eb32sc_first(&task_per_thread[thr].rqueue, ~0UL);
+ rqnode = eb32sc_first(&ha_thread_ctx[thr].rqueue, ~0UL);
while (rqnode) {
t = eb32sc_entry(rqnode, struct task, rq);
entry = sched_activity_entry(tmp_activity, t->process);
@@ -890,7 +890,7 @@
}
/* shared tasklet list */
- list_for_each_entry(tl, mt_list_to_list(&task_per_thread[thr].shared_tasklet_list), list) {
+ list_for_each_entry(tl, mt_list_to_list(&ha_thread_ctx[thr].shared_tasklet_list), list) {
t = (const struct task *)tl;
entry = sched_activity_entry(tmp_activity, t->process);
if (!TASK_IS_TASKLET(t) && t->call_date) {
@@ -903,7 +903,7 @@
/* classful tasklets */
for (queue = 0; queue < TL_CLASSES; queue++) {
- list_for_each_entry(tl, &task_per_thread[thr].tasklets[queue], list) {
+ list_for_each_entry(tl, &ha_thread_ctx[thr].tasklets[queue], list) {
t = (const struct task *)tl;
entry = sched_activity_entry(tmp_activity, t->process);
if (!TASK_IS_TASKLET(t) && t->call_date) {
diff --git a/src/debug.c b/src/debug.c
index ae3d9ae..5144094 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -161,14 +161,14 @@
ha_get_pthread_id(thr),
thread_has_tasks(),
!!(global_tasks_mask & thr_bit),
- !eb_is_empty(&task_per_thread[thr].timers),
- !eb_is_empty(&task_per_thread[thr].rqueue),
- !(LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_URGENT]) &&
- LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_NORMAL]) &&
- LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_BULK]) &&
- MT_LIST_ISEMPTY(&task_per_thread[thr].shared_tasklet_list)),
- task_per_thread[thr].tasks_in_list,
- task_per_thread[thr].rq_total,
+ !eb_is_empty(&ha_thread_ctx[thr].timers),
+ !eb_is_empty(&ha_thread_ctx[thr].rqueue),
+ !(LIST_ISEMPTY(&ha_thread_ctx[thr].tasklets[TL_URGENT]) &&
+ LIST_ISEMPTY(&ha_thread_ctx[thr].tasklets[TL_NORMAL]) &&
+ LIST_ISEMPTY(&ha_thread_ctx[thr].tasklets[TL_BULK]) &&
+ MT_LIST_ISEMPTY(&ha_thread_ctx[thr].shared_tasklet_list)),
+ ha_thread_ctx[thr].tasks_in_list,
+ ha_thread_ctx[thr].rq_total,
stuck,
!!(task_profiling_mask & thr_bit));
@@ -186,7 +186,7 @@
return;
chunk_appendf(buf, " curr_task=");
- ha_task_dump(buf, sched->current, " ");
+ ha_task_dump(buf, th_ctx->current, " ");
if (stuck) {
/* We only emit the backtrace for stuck threads in order not to
diff --git a/src/haproxy.c b/src/haproxy.c
index 885cc26..c8b5ee7 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -2699,7 +2699,6 @@
ha_set_tid((unsigned long)data);
set_thread_cpu_affinity();
- sched = &task_per_thread[tid];
clock_set_local_source();
/* Now, initialize one thread init at a time. This is better since
diff --git a/src/task.c b/src/task.c
index 45111f1..65aa646 100644
--- a/src/task.c
+++ b/src/task.c
@@ -38,8 +38,6 @@
volatile unsigned long global_tasks_mask = 0; /* Mask of threads with tasks in the global runqueue */
unsigned int niced_tasks = 0; /* number of niced tasks in the run queue */
-THREAD_LOCAL struct task_per_thread *sched = &task_per_thread[0]; /* scheduler context for the current thread */
-
__decl_aligned_spinlock(rq_lock); /* spin lock related to run queue */
__decl_aligned_rwlock(wq_lock); /* RW lock related to the wait queue */
@@ -51,8 +49,6 @@
#endif
-struct task_per_thread task_per_thread[MAX_THREADS];
-
/* Flags the task <t> for immediate destruction and puts it into its first
* thread's shared tasklet list if not yet queued/running. This will bypass
@@ -92,10 +88,10 @@
thr = my_ffsl(t->thread_mask) - 1;
/* Beware: tasks that have never run don't have their ->list empty yet! */
- MT_LIST_APPEND(&task_per_thread[thr].shared_tasklet_list,
+ MT_LIST_APPEND(&ha_thread_ctx[thr].shared_tasklet_list,
(struct mt_list *)&((struct tasklet *)t)->list);
- _HA_ATOMIC_INC(&task_per_thread[thr].rq_total);
- _HA_ATOMIC_INC(&task_per_thread[thr].tasks_in_list);
+ _HA_ATOMIC_INC(&ha_thread_ctx[thr].rq_total);
+ _HA_ATOMIC_INC(&ha_thread_ctx[thr].tasks_in_list);
if (sleeping_thread_mask & (1UL << thr)) {
_HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr));
wake_thread(thr);
@@ -131,9 +127,9 @@
*/
if (_HA_ATOMIC_CAS(&t->state, &state, state | TASK_IN_LIST | TASK_KILLED)) {
thr = t->tid > 0 ? t->tid: tid;
- MT_LIST_APPEND(&task_per_thread[thr].shared_tasklet_list,
+ MT_LIST_APPEND(&ha_thread_ctx[thr].shared_tasklet_list,
(struct mt_list *)&t->list);
- _HA_ATOMIC_INC(&task_per_thread[thr].rq_total);
+ _HA_ATOMIC_INC(&ha_thread_ctx[thr].rq_total);
if (sleeping_thread_mask & (1UL << thr)) {
_HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr));
wake_thread(thr);
@@ -153,31 +149,31 @@
if (likely(thr < 0)) {
/* this tasklet runs on the caller thread */
if (tl->state & TASK_HEAVY) {
- LIST_APPEND(&sched->tasklets[TL_HEAVY], &tl->list);
- sched->tl_class_mask |= 1 << TL_HEAVY;
+ LIST_APPEND(&th_ctx->tasklets[TL_HEAVY], &tl->list);
+ th_ctx->tl_class_mask |= 1 << TL_HEAVY;
}
else if (tl->state & TASK_SELF_WAKING) {
- LIST_APPEND(&sched->tasklets[TL_BULK], &tl->list);
- sched->tl_class_mask |= 1 << TL_BULK;
+ LIST_APPEND(&th_ctx->tasklets[TL_BULK], &tl->list);
+ th_ctx->tl_class_mask |= 1 << TL_BULK;
}
- else if ((struct task *)tl == sched->current) {
+ else if ((struct task *)tl == th_ctx->current) {
_HA_ATOMIC_OR(&tl->state, TASK_SELF_WAKING);
- LIST_APPEND(&sched->tasklets[TL_BULK], &tl->list);
- sched->tl_class_mask |= 1 << TL_BULK;
+ LIST_APPEND(&th_ctx->tasklets[TL_BULK], &tl->list);
+ th_ctx->tl_class_mask |= 1 << TL_BULK;
}
- else if (sched->current_queue < 0) {
- LIST_APPEND(&sched->tasklets[TL_URGENT], &tl->list);
- sched->tl_class_mask |= 1 << TL_URGENT;
+ else if (th_ctx->current_queue < 0) {
+ LIST_APPEND(&th_ctx->tasklets[TL_URGENT], &tl->list);
+ th_ctx->tl_class_mask |= 1 << TL_URGENT;
}
else {
- LIST_APPEND(&sched->tasklets[sched->current_queue], &tl->list);
- sched->tl_class_mask |= 1 << sched->current_queue;
+ LIST_APPEND(&th_ctx->tasklets[th_ctx->current_queue], &tl->list);
+ th_ctx->tl_class_mask |= 1 << th_ctx->current_queue;
}
- _HA_ATOMIC_INC(&sched->rq_total);
+ _HA_ATOMIC_INC(&th_ctx->rq_total);
} else {
/* this tasklet runs on a specific thread. */
- MT_LIST_APPEND(&task_per_thread[thr].shared_tasklet_list, (struct mt_list *)&tl->list);
- _HA_ATOMIC_INC(&task_per_thread[thr].rq_total);
+ MT_LIST_APPEND(&ha_thread_ctx[thr].shared_tasklet_list, (struct mt_list *)&tl->list);
+ _HA_ATOMIC_INC(&ha_thread_ctx[thr].rq_total);
if (sleeping_thread_mask & (1UL << thr)) {
_HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr));
wake_thread(thr);
@@ -195,7 +191,7 @@
*/
void __task_wakeup(struct task *t)
{
- struct eb_root *root = &sched->rqueue;
+ struct eb_root *root = &th_ctx->rqueue;
#ifdef USE_THREAD
if (t->thread_mask != tid_bit && global.nbthread != 1) {
@@ -210,8 +206,8 @@
} else
#endif
{
- _HA_ATOMIC_INC(&sched->rq_total);
- t->rq.key = ++sched->rqueue_ticks;
+ _HA_ATOMIC_INC(&th_ctx->rq_total);
+ t->rq.key = ++th_ctx->rqueue_ticks;
}
if (likely(t->nice)) {
@@ -267,8 +263,8 @@
{
#ifdef USE_THREAD
BUG_ON((wq == &timers && !(task->state & TASK_SHARED_WQ)) ||
- (wq == &sched->timers && (task->state & TASK_SHARED_WQ)) ||
- (wq != &timers && wq != &sched->timers));
+ (wq == &th_ctx->timers && (task->state & TASK_SHARED_WQ)) ||
+ (wq != &timers && wq != &th_ctx->timers));
#endif
/* if this happens the process is doomed anyway, so better catch it now
* so that we have the caller in the stack.
@@ -295,7 +291,7 @@
*/
void wake_expired_tasks()
{
- struct task_per_thread * const tt = sched; // thread's tasks
+ struct thread_ctx * const tt = th_ctx; // thread's tasks
int max_processed = global.tune.runqueue_depth;
struct task *task;
struct eb32_node *eb;
@@ -436,7 +432,7 @@
*/
int next_timer_expiry()
{
- struct task_per_thread * const tt = sched; // thread's tasks
+ struct thread_ctx * const tt = th_ctx; // thread's tasks
struct eb32_node *eb;
int ret = TICK_ETERNITY;
__decl_thread(int key = TICK_ETERNITY);
@@ -470,7 +466,7 @@
return ret;
}
-/* Walks over tasklet lists sched->tasklets[0..TL_CLASSES-1] and run at most
+/* Walks over tasklet lists th_ctx->tasklets[0..TL_CLASSES-1] and run at most
* budget[TL_*] of them. Returns the number of entries effectively processed
* (tasks and tasklets merged). The count of tasks in the list for the current
* thread is adjusted.
@@ -478,7 +474,7 @@
unsigned int run_tasks_from_lists(unsigned int budgets[])
{
struct task *(*process)(struct task *t, void *ctx, unsigned int state);
- struct list *tl_queues = sched->tasklets;
+ struct list *tl_queues = th_ctx->tasklets;
struct task *t;
uint8_t budget_mask = (1 << TL_CLASSES) - 1;
struct sched_activity *profile_entry = NULL;
@@ -488,29 +484,29 @@
void *ctx;
for (queue = 0; queue < TL_CLASSES;) {
- sched->current_queue = queue;
+ th_ctx->current_queue = queue;
/* global.tune.sched.low-latency is set */
if (global.tune.options & GTUNE_SCHED_LOW_LATENCY) {
- if (unlikely(sched->tl_class_mask & budget_mask & ((1 << queue) - 1))) {
+ if (unlikely(th_ctx->tl_class_mask & budget_mask & ((1 << queue) - 1))) {
/* a lower queue index has tasks again and still has a
* budget to run them. Let's switch to it now.
*/
- queue = (sched->tl_class_mask & 1) ? 0 :
- (sched->tl_class_mask & 2) ? 1 : 2;
+ queue = (th_ctx->tl_class_mask & 1) ? 0 :
+ (th_ctx->tl_class_mask & 2) ? 1 : 2;
continue;
}
if (unlikely(queue > TL_URGENT &&
budget_mask & (1 << TL_URGENT) &&
- !MT_LIST_ISEMPTY(&sched->shared_tasklet_list))) {
+ !MT_LIST_ISEMPTY(&th_ctx->shared_tasklet_list))) {
/* an urgent tasklet arrived from another thread */
break;
}
if (unlikely(queue > TL_NORMAL &&
budget_mask & (1 << TL_NORMAL) &&
- (!eb_is_empty(&sched->rqueue) ||
+ (!eb_is_empty(&th_ctx->rqueue) ||
(global_tasks_mask & tid_bit)))) {
/* a task was woken up by a bulk tasklet or another thread */
break;
@@ -518,7 +514,7 @@
}
if (LIST_ISEMPTY(&tl_queues[queue])) {
- sched->tl_class_mask &= ~(1 << queue);
+ th_ctx->tl_class_mask &= ~(1 << queue);
queue++;
continue;
}
@@ -538,9 +534,9 @@
ctx = t->context;
process = t->process;
t->calls++;
- sched->current = t;
+ th_ctx->current = t;
- _HA_ATOMIC_DEC(&sched->rq_total);
+ _HA_ATOMIC_DEC(&th_ctx->rq_total);
if (state & TASK_F_TASKLET) {
uint64_t before = 0;
@@ -567,7 +563,7 @@
}
else {
done++;
- sched->current = NULL;
+ th_ctx->current = NULL;
pool_free(pool_head_tasklet, t);
__ha_barrier_store();
continue;
@@ -579,7 +575,7 @@
}
done++;
- sched->current = NULL;
+ th_ctx->current = NULL;
__ha_barrier_store();
continue;
}
@@ -591,7 +587,7 @@
/* OK then this is a regular task */
- _HA_ATOMIC_DEC(&task_per_thread[tid].tasks_in_list);
+ _HA_ATOMIC_DEC(&ha_thread_ctx[tid].tasks_in_list);
if (unlikely(t->call_date)) {
uint64_t now_ns = now_mono_time();
uint64_t lat = now_ns - t->call_date;
@@ -616,7 +612,7 @@
else {
task_unlink_wq(t);
__task_free(t);
- sched->current = NULL;
+ th_ctx->current = NULL;
__ha_barrier_store();
/* We don't want max_processed to be decremented if
* we're just freeing a destroyed task, we should only
@@ -624,7 +620,7 @@
*/
continue;
}
- sched->current = NULL;
+ th_ctx->current = NULL;
__ha_barrier_store();
/* If there is a pending state we have to wake up the task
* immediately, else we defer it into wait queue
@@ -650,7 +646,7 @@
}
done++;
}
- sched->current_queue = -1;
+ th_ctx->current_queue = -1;
return done;
}
@@ -670,7 +666,7 @@
*/
void process_runnable_tasks()
{
- struct task_per_thread * const tt = sched;
+ struct thread_ctx * const tt = th_ctx;
struct eb32sc_node *lrq; // next local run queue entry
struct eb32sc_node *grq; // next global run queue entry
struct task *t;
@@ -701,11 +697,11 @@
if (likely(niced_tasks))
max_processed = (max_processed + 3) / 4;
- if (max_processed < sched->rq_total && sched->rq_total <= 2*max_processed) {
+ if (max_processed < th_ctx->rq_total && th_ctx->rq_total <= 2*max_processed) {
/* If the run queue exceeds the budget by up to 50%, let's cut it
* into two identical halves to improve latency.
*/
- max_processed = sched->rq_total / 2;
+ max_processed = th_ctx->rq_total / 2;
}
not_done_yet:
@@ -718,7 +714,7 @@
/* normal tasklets list gets a default weight of ~37% */
if ((tt->tl_class_mask & (1 << TL_NORMAL)) ||
- !eb_is_empty(&sched->rqueue) || (global_tasks_mask & tid_bit))
+ !eb_is_empty(&th_ctx->rqueue) || (global_tasks_mask & tid_bit))
max[TL_NORMAL] = default_weights[TL_NORMAL];
/* bulk tasklets list gets a default weight of ~13% */
@@ -889,14 +885,14 @@
#endif
/* clean the per thread run queue */
for (i = 0; i < global.nbthread; i++) {
- tmp_rq = eb32sc_first(&task_per_thread[i].rqueue, MAX_THREADS_MASK);
+ tmp_rq = eb32sc_first(&ha_thread_ctx[i].rqueue, MAX_THREADS_MASK);
while (tmp_rq) {
t = eb32sc_entry(tmp_rq, struct task, rq);
tmp_rq = eb32sc_next(tmp_rq, MAX_THREADS_MASK);
task_destroy(t);
}
/* cleanup the per thread timers queue */
- tmp_wq = eb32_first(&task_per_thread[i].timers);
+ tmp_wq = eb32_first(&ha_thread_ctx[i].timers);
while (tmp_wq) {
t = eb32_entry(tmp_wq, struct task, wq);
tmp_wq = eb32_next(tmp_wq);
@@ -914,11 +910,10 @@
memset(&timers, 0, sizeof(timers));
memset(&rqueue, 0, sizeof(rqueue));
#endif
- memset(&task_per_thread, 0, sizeof(task_per_thread));
for (i = 0; i < MAX_THREADS; i++) {
for (q = 0; q < TL_CLASSES; q++)
- LIST_INIT(&task_per_thread[i].tasklets[q]);
- MT_LIST_INIT(&task_per_thread[i].shared_tasklet_list);
+ LIST_INIT(&ha_thread_ctx[i].tasklets[q]);
+ MT_LIST_INIT(&ha_thread_ctx[i].shared_tasklet_list);
}
}
diff --git a/src/thread.c b/src/thread.c
index ad5327a..ab9ef90 100644
--- a/src/thread.c
+++ b/src/thread.c
@@ -53,6 +53,9 @@
struct thread_info ha_thread_info[MAX_THREADS] = { };
THREAD_LOCAL struct thread_info *ti = &ha_thread_info[0];
+struct thread_ctx ha_thread_ctx[MAX_THREADS] = { };
+THREAD_LOCAL struct thread_ctx *th_ctx = &ha_thread_ctx[0];
+
#ifdef USE_THREAD
volatile unsigned long threads_want_rdv_mask __read_mostly = 0;