MINOR: tasks: move the list walking code to its own function
New function run_tasks_from_list() will run over a tasklet list and will
run all the tasks and tasklets it finds there within a limit of <max>
that is passed in arggument. This is a preliminary work for scheduler QoS
improvements.
diff --git a/src/task.c b/src/task.c
index 0c26063..a3c581a 100644
--- a/src/task.c
+++ b/src/task.c
@@ -315,6 +315,85 @@
return ret;
}
+/* Walks over tasklet list <list> and run at most <max> of them. Returns
+ * the number of entries effectively processed (tasks and tasklets merged).
+ * The count of tasks in the list for the current thread is adjusted.
+ */
+static int run_tasks_from_list(struct list *list, int max)
+{
+ struct task *(*process)(struct task *t, void *ctx, unsigned short state);
+ struct task *t;
+ unsigned short state;
+ void *ctx;
+ int done = 0;
+
+ while (done < max && !LIST_ISEMPTY(list)) {
+ t = (struct task *)LIST_ELEM(list->n, struct tasklet *, list);
+ state = (t->state & TASK_SHARED_WQ) | TASK_RUNNING;
+ state = _HA_ATOMIC_XCHG(&t->state, state);
+ __ha_barrier_atomic_store();
+ __tasklet_remove_from_tasklet_list((struct tasklet *)t);
+
+ ti->flags &= ~TI_FL_STUCK; // this thread is still running
+ activity[tid].ctxsw++;
+ ctx = t->context;
+ process = t->process;
+ t->calls++;
+
+ if (TASK_IS_TASKLET(t)) {
+ process(NULL, ctx, state);
+ done++;
+ continue;
+ }
+
+ /* OK then this is a regular task */
+
+ task_per_thread[tid].task_list_size--;
+ if (unlikely(t->call_date)) {
+ uint64_t now_ns = now_mono_time();
+
+ t->lat_time += now_ns - t->call_date;
+ t->call_date = now_ns;
+ }
+
+ sched->current = t;
+ __ha_barrier_store();
+ if (likely(process == process_stream))
+ t = process_stream(t, ctx, state);
+ else if (process != NULL)
+ t = process(t, ctx, state);
+ else {
+ __task_free(t);
+ sched->current = NULL;
+ __ha_barrier_store();
+ /* We don't want max_processed to be decremented if
+ * we're just freeing a destroyed task, we should only
+ * do so if we really ran a task.
+ */
+ continue;
+ }
+ sched->current = NULL;
+ __ha_barrier_store();
+ /* If there is a pending state we have to wake up the task
+ * immediately, else we defer it into wait queue
+ */
+ if (t != NULL) {
+ if (unlikely(t->call_date)) {
+ t->cpu_time += now_mono_time() - t->call_date;
+ t->call_date = 0;
+ }
+
+ state = _HA_ATOMIC_AND(&t->state, ~TASK_RUNNING);
+ if (state & TASK_WOKEN_ANY)
+ task_wakeup(t, 0);
+ else
+ task_queue(t);
+ }
+ done++;
+ }
+ return done;
+}
+
/* The run queue is chronologically sorted in a tree. An insertion counter is
* used to assign a position to each task. This counter may be combined with
* other variables (eg: nice value) to set the final position in the tree. The
@@ -334,7 +413,7 @@
struct eb32sc_node *lrq = NULL; // next local run queue entry
struct eb32sc_node *grq = NULL; // next global run queue entry
struct task *t;
- int max_processed;
+ int max_processed, done;
struct mt_list *tmp_list;
ti->flags &= ~TI_FL_STUCK; // this thread is still running
@@ -421,76 +500,8 @@
grq = NULL;
}
- while (max_processed > 0 && !LIST_ISEMPTY(&tt->task_list)) {
- struct task *t;
- unsigned short state;
- void *ctx;
- struct task *(*process)(struct task *t, void *ctx, unsigned short state);
-
- t = (struct task *)LIST_ELEM(task_per_thread[tid].task_list.n, struct tasklet *, list);
- state = (t->state & TASK_SHARED_WQ) | TASK_RUNNING;
- state = _HA_ATOMIC_XCHG(&t->state, state);
- __ha_barrier_atomic_store();
- __tasklet_remove_from_tasklet_list((struct tasklet *)t);
-
- ti->flags &= ~TI_FL_STUCK; // this thread is still running
- activity[tid].ctxsw++;
- ctx = t->context;
- process = t->process;
- t->calls++;
-
- if (TASK_IS_TASKLET(t)) {
- process(NULL, ctx, state);
- max_processed--;
- continue;
- }
-
- /* OK then this is a regular task */
-
- tt->task_list_size--;
- if (unlikely(t->call_date)) {
- uint64_t now_ns = now_mono_time();
-
- t->lat_time += now_ns - t->call_date;
- t->call_date = now_ns;
- }
-
- sched->current = t;
- __ha_barrier_store();
- if (likely(process == process_stream))
- t = process_stream(t, ctx, state);
- else if (process != NULL)
- t = process(t, ctx, state);
- else {
- __task_free(t);
- sched->current = NULL;
- __ha_barrier_store();
- /* We don't want max_processed to be decremented if
- * we're just freeing a destroyed task, we should only
- * do so if we really ran a task.
- */
- continue;
- }
- sched->current = NULL;
- __ha_barrier_store();
- /* If there is a pending state we have to wake up the task
- * immediately, else we defer it into wait queue
- */
- if (t != NULL) {
- if (unlikely(t->call_date)) {
- t->cpu_time += now_mono_time() - t->call_date;
- t->call_date = 0;
- }
-
- state = _HA_ATOMIC_AND(&t->state, ~TASK_RUNNING);
- if (state & TASK_WOKEN_ANY)
- task_wakeup(t, 0);
- else
- task_queue(t);
- }
-
- max_processed--;
- }
+ done = run_tasks_from_list(&tt->task_list, max_processed);
+ max_processed -= done;
if (!LIST_ISEMPTY(&tt->task_list))
activity[tid].long_rq++;