MAJOR: task: task scheduler rework
In order to allow task_wakeup to be called on a running task:
- from within the task handler itself.
- in the future, from another thread.
The lookups on the run queue and wait queue are reworked to prepare
for multithreading: the global 'last_timer' and 'rq_next' lookup
caches are removed, since such shared cached pointers cannot be kept
consistent under concurrent access.
If task_wakeup is called on a running task, the woken message flags
are saved in the 'pending_state' attribute of the task. The real
wakeup is postponed until the end of the handler's execution, at
which point the woken messages are copied from pending_state to the
task's state attribute.
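Schematically, the wakeup path and the end-of-run path cooperate as
follows (a simplified sketch of the two hunks below, not the exact
code):

    /* in task_wakeup(): the handler is running, so only record the
     * flags; the wakeup is replayed once the handler returns */
    if (t->state & TASK_RUNNING) {
            t->pending_state |= f;
            return t;
    }

    /* in process_runnable_tasks(), after the handler returns */
    t->state &= ~TASK_RUNNING;
    if (t->pending_state)
            __task_wakeup(t); /* a wakeup arrived meanwhile: requeue now */
    else
            task_queue(t);    /* no pending wakeup: back to the wait queue */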
This change causes a very minor (though measurable) performance loss,
but it is necessary to make forward progress on a multi-threaded
scheduler; most users will never notice.
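The run queue scan in process_runnable_tasks() is also restructured:
runnable tasks are now detached in small batches (up to 16 in this
patch) into a local array before their handlers are called, which
separates the tree walk from the handler execution. A simplified
sketch of that loop, where next_runnable() merely stands in for the
lookup/rewind logic in the hunk below:

    struct task *local_tasks[16];
    int i, n = 0;

    /* detach up to 16 tasks from the run queue */
    while (n < 16 && (t = next_runnable()) != NULL) {
            __task_unlink_rq(t);
            t->state |= TASK_RUNNING;
            t->pending_state = 0;
            t->calls++;
            local_tasks[n++] = t;
    }

    /* run the whole batch outside of the tree walk; handlers may
     * return NULL when the task was freed */
    for (i = 0; i < n; i++)
            local_tasks[i] = local_tasks[i]->process(local_tasks[i]);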
diff --git a/include/proto/task.h b/include/proto/task.h
index 70fd0b3..e510cd9 100644
--- a/include/proto/task.h
+++ b/include/proto/task.h
@@ -85,8 +85,6 @@
extern unsigned int nb_tasks_cur;
extern unsigned int niced_tasks; /* number of niced tasks in the run queue */
extern struct pool_head *pool2_task;
-extern struct eb32_node *last_timer; /* optimization: last queued timer */
-extern struct eb32_node *rq_next; /* optimization: next task except if delete/insert */
/* return 0 if task is in run queue, otherwise non-zero */
static inline int task_in_rq(struct task *t)
@@ -104,6 +102,13 @@
struct task *__task_wakeup(struct task *t);
static inline struct task *task_wakeup(struct task *t, unsigned int f)
{
+ /* If the task is running, we postpone the wakeup
+  * and save the requested flags in pending_state.
+  */
+ if (unlikely(t->state & TASK_RUNNING)) {
+ t->pending_state |= f;
+ return t;
+ }
if (likely(!task_in_rq(t)))
__task_wakeup(t);
t->state |= f;
@@ -119,8 +124,6 @@
static inline struct task *__task_unlink_wq(struct task *t)
{
eb32_delete(&t->wq);
- if (last_timer == &t->wq)
- last_timer = NULL;
return t;
}
@@ -153,8 +156,6 @@
static inline struct task *task_unlink_rq(struct task *t)
{
if (likely(task_in_rq(t))) {
- if (&t->rq == rq_next)
- rq_next = eb32_next(rq_next);
__task_unlink_rq(t);
}
return t;
@@ -180,7 +181,7 @@
{
t->wq.node.leaf_p = NULL;
t->rq.node.leaf_p = NULL;
- t->state = TASK_SLEEPING;
+ t->pending_state = t->state = TASK_SLEEPING;
t->nice = 0;
t->calls = 0;
return t;
diff --git a/include/types/task.h b/include/types/task.h
index 0379a22..6dfae98 100644
--- a/include/types/task.h
+++ b/include/types/task.h
@@ -54,6 +54,7 @@
struct task {
struct eb32_node rq; /* ebtree node used to hold the task in the run queue */
unsigned short state; /* task state : bit field of TASK_* */
+ unsigned short pending_state; /* pending states for a running task */
short nice; /* the task's current nice value from -1024 to +1024 */
unsigned int calls; /* number of times ->process() was called */
struct task * (*process)(struct task *t); /* the function which processes the task */
diff --git a/src/task.c b/src/task.c
index c99cea8..63b46c8 100644
--- a/src/task.c
+++ b/src/task.c
@@ -30,8 +30,7 @@
unsigned int tasks_run_queue_cur = 0; /* copy of the run queue size */
unsigned int nb_tasks_cur = 0; /* copy of the tasks count */
unsigned int niced_tasks = 0; /* number of niced tasks in the run queue */
-struct eb32_node *last_timer = NULL; /* optimization: last queued timer */
-struct eb32_node *rq_next = NULL; /* optimization: next task except if delete/insert */
+
static struct eb_root timers; /* sorted timers tree */
static struct eb_root rqueue; /* tree constituting the run queue */
@@ -61,11 +60,12 @@
t->rq.key += offset;
}
- /* clear state flags at the same time */
- t->state &= ~TASK_WOKEN_ANY;
-
+ /* Reset the state to the pending one.
+  * Note: __task_wakeup must not be called
+  * if the task is running.
+  */
+ t->state = t->pending_state;
eb32_insert(&rqueue, &t->rq);
- rq_next = NULL;
return t;
}
@@ -95,25 +95,8 @@
return;
#endif
- if (likely(last_timer &&
- last_timer->node.bit < 0 &&
- last_timer->key == task->wq.key &&
- last_timer->node.node_p)) {
- /* Most often, last queued timer has the same expiration date, so
- * if it's not queued at the root, let's queue a dup directly there.
- * Note that we can only use dups at the dup tree's root (most
- * negative bit).
- */
- eb_insert_dup(&last_timer->node, &task->wq.node);
- if (task->wq.node.bit < last_timer->node.bit)
- last_timer = &task->wq;
- return;
- }
eb32_insert(&timers, &task->wq);
- /* Make sure we don't assign the last_timer to a node-less entry */
- if (task->wq.node.node_p && (!last_timer || (task->wq.node.bit < last_timer->node.bit)))
- last_timer = &task->wq;
return;
}
@@ -126,8 +109,8 @@
struct task *task;
struct eb32_node *eb;
- eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
while (1) {
+ eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
if (unlikely(!eb)) {
/* we might have reached the end of the tree, typically because
* <now_ms> is in the first half and we're first scanning the last
@@ -145,7 +128,6 @@
/* timer looks expired, detach it from the queue */
task = eb32_entry(eb, struct task, wq);
- eb = eb32_next(eb);
__task_unlink_wq(task);
/* It is possible that this task was left at an earlier place in the
@@ -165,8 +147,6 @@
if (!tick_isset(task->expire))
continue;
__task_queue(task);
- if (!eb || eb->key > task->wq.key)
- eb = &task->wq;
continue;
}
task_wakeup(task, TASK_WOKEN_TIMER);
@@ -189,12 +169,15 @@
void process_runnable_tasks()
{
struct task *t;
- unsigned int max_processed;
-
+ int i;
+ int max_processed;
+ struct eb32_node *rq_next;
+ int rewind;
+ struct task *local_tasks[16];
+ int local_tasks_count;
tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
nb_tasks_cur = nb_tasks;
max_processed = tasks_run_queue;
-
if (!tasks_run_queue)
return;
@@ -204,45 +187,75 @@
if (likely(niced_tasks))
max_processed = (max_processed + 3) / 4;
- while (max_processed--) {
+ while (max_processed > 0) {
/* Note: this loop is one of the fastest code path in
* the whole program. It should not be re-arranged
* without a good reason.
*/
- if (unlikely(!rq_next)) {
- rq_next = eb32_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK);
+
+ rewind = 0;
+ rq_next = eb32_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK);
+ if (!rq_next) {
+ /* we might have reached the end of the tree, typically because
+ * <rqueue_ticks> is in the first half and we're first scanning
+ * the last half. Let's loop back to the beginning of the tree now.
+ */
+ rq_next = eb32_first(&rqueue);
if (!rq_next) {
- /* we might have reached the end of the tree, typically because
- * <rqueue_ticks> is in the first half and we're first scanning
- * the last half. Let's loop back to the beginning of the tree now.
- */
- rq_next = eb32_first(&rqueue);
- if (!rq_next)
+ break;
+ }
+ rewind = 1;
+ }
+
+ local_tasks_count = 0;
+ while (local_tasks_count < 16) {
+ t = eb32_entry(rq_next, struct task, rq);
+ rq_next = eb32_next(rq_next);
+ /* detach the task from the queue */
+ __task_unlink_rq(t);
+ t->state |= TASK_RUNNING;
+ t->pending_state = 0;
+ t->calls++;
+ local_tasks[local_tasks_count++] = t;
+ if (!rq_next) {
+ if (rewind || !(rq_next = eb32_first(&rqueue))) {
break;
+ }
+ rewind = 1;
}
}
- /* detach the task from the queue after updating the pointer to
- * the next entry.
- */
- t = eb32_entry(rq_next, struct task, rq);
- rq_next = eb32_next(rq_next);
- __task_unlink_rq(t);
+ if (!local_tasks_count)
+ break;
- t->state |= TASK_RUNNING;
- /* This is an optimisation to help the processor's branch
- * predictor take this most common call.
- */
- t->calls++;
- if (likely(t->process == process_stream))
- t = process_stream(t);
- else
- t = t->process(t);
+
+ for (i = 0; i < local_tasks_count ; i++) {
+ t = local_tasks[i];
+ /* This is an optimisation to help the processor's branch
+ * predictor take this most common call.
+ */
+ if (likely(t->process == process_stream))
+ t = process_stream(t);
+ else
+ t = t->process(t);
+ local_tasks[i] = t;
+ }
- if (likely(t != NULL)) {
- t->state &= ~TASK_RUNNING;
- if (t->expire)
- task_queue(t);
+ max_processed -= local_tasks_count;
+ for (i = 0; i < local_tasks_count ; i++) {
+ t = local_tasks[i];
+ if (likely(t != NULL)) {
+ t->state &= ~TASK_RUNNING;
+ /* If there is a pending state,
+  * we have to wake the task up
+  * immediately, otherwise we defer
+  * it to the wait queue.
+  */
+ if (t->pending_state)
+ __task_wakeup(t);
+ else
+ task_queue(t);
+ }
}
}
}