[MAJOR] use an ebtree instead of a list for the run queue
We now insert tasks in a certain sequence in the run queue.
The sorting key currently is the arrival order. It will now
be possible to apply a "nice" value to any task so that it
goes forwards or backwards in the run queue.
The calls to wake_expired_tasks() and maintain_proxies()
have been moved to the main run_poll_loop(), because they
had nothing to do in process_runnable_tasks().
The task_wakeup() function is not inlined anymore, as it was
only used at one place.
The qlist member of the task structure has been removed now.
The run_queue list has been replaced for an integer indicating
the number of tasks in the run queue.
diff --git a/src/task.c b/src/task.c
index 2d35e21..dd9638a 100644
--- a/src/task.c
+++ b/src/task.c
@@ -23,7 +23,7 @@
struct pool_head *pool2_task;
-void *run_queue = NULL;
+unsigned int run_queue = 0;
/* Principle of the wait queue.
*
@@ -91,6 +91,8 @@
#define TIMER_SIGN_BIT (1 << (TIMER_TICK_BITS - 1))
static struct eb_root timers[TIMER_TREES]; /* trees with MSB 00, 01, 10 and 11 */
+static struct eb_root rqueue[TIMER_TREES]; /* trees constituting the run queue */
+static unsigned int rqueue_ticks; /* insertion count */
/* returns an ordered key based on an expiration date. */
static inline unsigned int timeval_to_ticks(const struct timeval *t)
@@ -118,13 +120,26 @@
int init_task()
{
memset(&timers, 0, sizeof(timers));
+ memset(&rqueue, 0, sizeof(rqueue));
pool2_task = create_pool("task", sizeof(struct task), MEM_F_SHARED);
return pool2_task != NULL;
}
-struct task *_task_wakeup(struct task *t)
+/* puts the task <t> in run queue <q>, and returns <t> */
+struct task *task_wakeup(struct task *t)
{
- return __task_wakeup(t);
+ if (t->state == TASK_RUNNING)
+ return t;
+
+ if (likely(t->eb.node.leaf_p))
+ eb32_delete(&t->eb);
+
+ run_queue++;
+ t->eb.key = ++rqueue_ticks;
+ t->state = TASK_RUNNING;
+
+ eb32_insert(&rqueue[ticks_to_tree(t->eb.key)], &t->eb);
+ return t;
}
/*
@@ -156,7 +171,6 @@
/*
* Extract all expired timers from the timer queue, and wakes up all
* associated tasks. Returns the date of next event (or eternity).
- *
*/
void wake_expired_tasks(struct timeval *next)
{
@@ -190,7 +204,7 @@
/* detach the task from the queue and add the task to the run queue */
eb = eb32_next(eb);
- _task_wakeup(task);
+ task_wakeup(task);
}
tree = (tree + 1) & TIMER_TREE_MASK;
} while (((tree - now_tree) & TIMER_TREE_MASK) < TIMER_TREES/2);
@@ -200,43 +214,58 @@
return;
}
-/*
- * This does 4 things :
- * - wake up all expired tasks
- * - call all runnable tasks
- * - call maintain_proxies() to enable/disable the listeners
- * - return the date of next event in <next> or eternity.
+/* The run queue is chronologically sorted in a tree. An insertion counter is
+ * used to assign a position to each task. This counter may be combined with
+ * other variables (eg: nice value) to set the final position in the tree. The
+ * counter may wrap without a problem, of course. We then limit the number of
+ * tasks processed at once to 1/8 of the number of tasks in the queue, so that
+ * general latency remains low and so that task positions have a chance to be
+ * considered. It also reduces the number of trees to be evaluated when no task
+ * remains.
*
+ * Just like with timers, we start with tree[(current - 1)], which holds past
+ * values, and stop when we reach the middle of the list. In practise, we visit
+ * 3 out of 4 trees.
+ *
+ * The function adjusts <next> if a new event is closer.
*/
void process_runnable_tasks(struct timeval *next)
{
struct timeval temp;
struct task *t;
- void *queue;
+ struct eb32_node *eb;
+ unsigned int tree, stop;
+ unsigned int max_processed;
- wake_expired_tasks(next);
- /* process each task in the run queue now. Each task may be deleted
- * since we only use the run queue's head. Note that any task can be
- * woken up by any other task and it will be processed immediately
- * after as it will be queued on the run queue's head !
- */
+ if (!run_queue)
+ return;
- queue = run_queue;
- foreach_dlist_item(t, queue, struct task *, qlist) {
- DLIST_DEL(&t->qlist);
- t->qlist.p = NULL;
+ max_processed = (run_queue + 7) / 8;
- t->state = TASK_IDLE;
- t->process(t, &temp);
- tv_bound(next, &temp);
- }
+ tree = ticks_to_tree(rqueue_ticks);
+ stop = (tree + TIMER_TREES / 2) & TIMER_TREE_MASK;
+ tree = (tree - 1) & TIMER_TREE_MASK;
- /* maintain all proxies in a consistent state. This should quickly
- * become a task because it becomes expensive when there are huge
- * numbers of proxies. */
- maintain_proxies(&temp);
- tv_bound(next, &temp);
- return;
+ do {
+ eb = eb32_first(&rqueue[tree]);
+ while (eb) {
+ t = eb32_entry(eb, struct task, eb);
+
+ /* detach the task from the queue and add the task to the run queue */
+ eb = eb32_next(eb);
+
+ run_queue--;
+ t->state = TASK_IDLE;
+ eb32_delete(&t->eb);
+
+ t->process(t, &temp);
+ tv_bound(next, &temp);
+
+ if (!--max_processed)
+ return;
+ }
+ tree = (tree + 1) & TIMER_TREE_MASK;
+ } while (tree != stop);
}
/*