[MAJOR] use an ebtree instead of a list for the run queue
We now insert tasks in a certain sequence in the run queue.
The sorting key currently is the arrival order. It will now
be possible to apply a "nice" value to any task so that it
goes forwards or backwards in the run queue.
The calls to wake_expired_tasks() and maintain_proxies()
have been moved to the main run_poll_loop(), because they
had nothing to do in process_runnable_tasks().
The task_wakeup() function is no longer inlined, as it was
only used in one place.
The qlist member of the task structure has been removed now.
The run_queue list has been replaced with an integer indicating
the number of tasks in the run queue.
diff --git a/include/proto/task.h b/include/proto/task.h
index d27e1db..f99bbdb 100644
--- a/include/proto/task.h
+++ b/include/proto/task.h
@@ -32,28 +32,14 @@
#include <types/task.h>
-extern void *run_queue;
+extern unsigned int run_queue; /* run queue size */
extern struct pool_head *pool2_task;
/* perform minimal initializations, report 0 in case of error, 1 if OK. */
int init_task();
/* puts the task <t> in run queue <q>, and returns <t> */
-#define task_wakeup _task_wakeup
-struct task *_task_wakeup(struct task *t);
-static inline struct task *__task_wakeup(struct task *t)
-{
- if (t->state == TASK_RUNNING)
- return t;
-
- if (likely(t->eb.node.leaf_p))
- eb32_delete(&t->eb);
-
- DLIST_ADD(run_queue, &t->qlist);
- t->state = TASK_RUNNING;
-
- return t;
-}
+struct task *task_wakeup(struct task *t);
/* removes the task <t> from the run queue if it was in it.
* returns <t>.
@@ -61,29 +47,27 @@
static inline struct task *task_sleep(struct task *t)
{
if (t->state == TASK_RUNNING) {
- DLIST_DEL(&t->qlist);
- t->qlist.p = NULL;
t->state = TASK_IDLE;
+ eb32_delete(&t->eb);
+ run_queue--;
}
return t;
}
/*
* unlinks the task from wherever it is queued :
- * - eternity_queue, run_queue
+ * - run_queue
* - wait queue
* A pointer to the task itself is returned.
*/
static inline struct task *task_delete(struct task *t)
{
- if (t->qlist.p != NULL) {
- DLIST_DEL(&t->qlist);
- t->qlist.p = NULL;
- }
-
if (t->eb.node.leaf_p)
eb32_delete(&t->eb);
+ if (t->state == TASK_RUNNING)
+ run_queue--;
+
return t;
}
@@ -93,7 +77,6 @@
*/
static inline struct task *task_init(struct task *t)
{
- t->qlist.p = NULL;
t->eb.node.leaf_p = NULL;
t->state = TASK_IDLE;
return t;
@@ -123,6 +106,12 @@
void process_runnable_tasks(struct timeval *next);
+/*
+ * Extracts all expired timers from the timer queue, and wakes up all
+ * associated tasks. The date of the next event (or eternity) is reported via <next>.
+ */
+void wake_expired_tasks(struct timeval *next);
+
#endif /* _PROTO_TASK_H */
diff --git a/include/types/task.h b/include/types/task.h
index 6b56547..b6c9c72 100644
--- a/include/types/task.h
+++ b/include/types/task.h
@@ -34,7 +34,6 @@
/* The base for all tasks */
struct task {
- struct list qlist; /* chaining in the same queue; bidirectionnal but not circular */
struct eb32_node eb; /* ebtree node used to hold the task in the wait queue */
int state; /* task state : IDLE or RUNNING */
struct timeval expire; /* next expiration time for this task, use only for fast sorting */
diff --git a/src/haproxy.c b/src/haproxy.c
index 4ecf4c0..d0a6bbf 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -898,12 +898,22 @@
tv_update_date(0,1);
while (1) {
+ /* Check if we can expire some tasks */
+ wake_expired_tasks(&next);
+
+ /* Process a few tasks */
process_runnable_tasks(&next);
+ /* maintain all proxies in a consistent state. This should quickly
+ * become a task because it becomes expensive when there are huge
+ * numbers of proxies. */
+ maintain_proxies(&next);
+
/* stop when there's no connection left and we don't allow them anymore */
if (!actconn && listeners == 0)
break;
+ /* The poller will ensure it returns around <next> */
cur_poller.poll(&cur_poller, &next);
}
}
diff --git a/src/proxy.c b/src/proxy.c
index a7b4efc..21b039d 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -300,8 +300,8 @@
/*
* this function enables proxies when there are enough free sessions,
* or stops them when the table is full. It is designed to be called from the
- * select_loop(). It returns the date of next expiration event during stop
- * time, ETERNITY otherwise.
+ * select_loop(). It adjusts the date of next expiration event during stop
+ * time if appropriate.
*/
void maintain_proxies(struct timeval *next)
{
@@ -309,7 +309,6 @@
struct listener *l;
p = proxy;
- tv_eternity(next);
/* if there are enough free sessions, we'll activate proxies */
if (actconn < global.maxconn) {
diff --git a/src/task.c b/src/task.c
index 2d35e21..dd9638a 100644
--- a/src/task.c
+++ b/src/task.c
@@ -23,7 +23,7 @@
struct pool_head *pool2_task;
-void *run_queue = NULL;
+unsigned int run_queue = 0;
/* Principle of the wait queue.
*
@@ -91,6 +91,8 @@
#define TIMER_SIGN_BIT (1 << (TIMER_TICK_BITS - 1))
static struct eb_root timers[TIMER_TREES]; /* trees with MSB 00, 01, 10 and 11 */
+static struct eb_root rqueue[TIMER_TREES]; /* trees constituting the run queue */
+static unsigned int rqueue_ticks; /* insertion count */
/* returns an ordered key based on an expiration date. */
static inline unsigned int timeval_to_ticks(const struct timeval *t)
@@ -118,13 +120,26 @@
int init_task()
{
memset(&timers, 0, sizeof(timers));
+ memset(&rqueue, 0, sizeof(rqueue));
pool2_task = create_pool("task", sizeof(struct task), MEM_F_SHARED);
return pool2_task != NULL;
}
-struct task *_task_wakeup(struct task *t)
+/* puts the task <t> in run queue <q>, and returns <t> */
+struct task *task_wakeup(struct task *t)
{
- return __task_wakeup(t);
+ if (t->state == TASK_RUNNING)
+ return t;
+
+ if (likely(t->eb.node.leaf_p))
+ eb32_delete(&t->eb);
+
+ run_queue++;
+ t->eb.key = ++rqueue_ticks;
+ t->state = TASK_RUNNING;
+
+ eb32_insert(&rqueue[ticks_to_tree(t->eb.key)], &t->eb);
+ return t;
}
/*
@@ -156,7 +171,6 @@
/*
* Extract all expired timers from the timer queue, and wakes up all
* associated tasks. Returns the date of next event (or eternity).
- *
*/
void wake_expired_tasks(struct timeval *next)
{
@@ -190,7 +204,7 @@
/* detach the task from the queue and add the task to the run queue */
eb = eb32_next(eb);
- _task_wakeup(task);
+ task_wakeup(task);
}
tree = (tree + 1) & TIMER_TREE_MASK;
} while (((tree - now_tree) & TIMER_TREE_MASK) < TIMER_TREES/2);
@@ -200,43 +214,58 @@
return;
}
-/*
- * This does 4 things :
- * - wake up all expired tasks
- * - call all runnable tasks
- * - call maintain_proxies() to enable/disable the listeners
- * - return the date of next event in <next> or eternity.
+/* The run queue is chronologically sorted in a tree. An insertion counter is
+ * used to assign a position to each task. This counter may be combined with
+ * other variables (eg: nice value) to set the final position in the tree. The
+ * counter may wrap without a problem, of course. We then limit the number of
+ * tasks processed at once to 1/8 of the number of tasks in the queue, so that
+ * general latency remains low and so that task positions have a chance to be
+ * considered. It also reduces the number of trees to be evaluated when no task
+ * remains.
*
+ * Just like with timers, we start with tree[(current - 1)], which holds past
+ * values, and stop when we reach the middle of the list. In practise, we visit
+ * 3 out of 4 trees.
+ *
+ * The function adjusts <next> if a new event is closer.
*/
void process_runnable_tasks(struct timeval *next)
{
struct timeval temp;
struct task *t;
- void *queue;
+ struct eb32_node *eb;
+ unsigned int tree, stop;
+ unsigned int max_processed;
- wake_expired_tasks(next);
- /* process each task in the run queue now. Each task may be deleted
- * since we only use the run queue's head. Note that any task can be
- * woken up by any other task and it will be processed immediately
- * after as it will be queued on the run queue's head !
- */
+ if (!run_queue)
+ return;
- queue = run_queue;
- foreach_dlist_item(t, queue, struct task *, qlist) {
- DLIST_DEL(&t->qlist);
- t->qlist.p = NULL;
+ max_processed = (run_queue + 7) / 8;
- t->state = TASK_IDLE;
- t->process(t, &temp);
- tv_bound(next, &temp);
- }
+ tree = ticks_to_tree(rqueue_ticks);
+ stop = (tree + TIMER_TREES / 2) & TIMER_TREE_MASK;
+ tree = (tree - 1) & TIMER_TREE_MASK;
- /* maintain all proxies in a consistent state. This should quickly
- * become a task because it becomes expensive when there are huge
- * numbers of proxies. */
- maintain_proxies(&temp);
- tv_bound(next, &temp);
- return;
+ do {
+ eb = eb32_first(&rqueue[tree]);
+ while (eb) {
+ t = eb32_entry(eb, struct task, eb);
+
+ /* detach the task from the queue and add the task to the run queue */
+ eb = eb32_next(eb);
+
+ run_queue--;
+ t->state = TASK_IDLE;
+ eb32_delete(&t->eb);
+
+ t->process(t, &temp);
+ tv_bound(next, &temp);
+
+ if (!--max_processed)
+ return;
+ }
+ tree = (tree + 1) & TIMER_TREE_MASK;
+ } while (tree != stop);
}
/*