MAJOR: tasks: create per-thread wait queues
The timers in the shared wait queue remain a major contention point,
even though the vast majority of tasks are pinned to a single thread.
This patch creates a per-thread wait queue: a task bound to a single
thread (the current one) is queued into the local wait queue without
any locking, while any other task goes to the shared queue under the
lock. This significantly reduces contention on the wait queue. A test
with 12 threads showed 11 ms spent in the WQ lock compared to 4.7
seconds in the same test without this change. The cache miss ratio
decreased from 19.7% to 19.2% on the 12-thread test, and performance
increased by 1.5%.
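For illustration only, a minimal sketch of the queueing decision using
the declarations from proto/task.h (this is not code from the patch; it
assumes atleast2(x) is the usual bit trick testing whether more than one
bit is set in <x>, i.e. x & (x - 1) is non-zero iff at least two bits
are set):

  static void queue_task_sketch(struct task *t)
  {
          if (t->thread_mask & (t->thread_mask - 1)) {
                  /* shared by several threads: global tree, under wq_lock */
                  HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
                  __task_queue(t, &timers);
                  HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
          } else {
                  /* bound to one thread (the current one): private tree, no lock */
                  __task_queue(t, &timers_local[tid]);
          }
  }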
Another indirect benefit is that the average queue size is divided
by the number of threads, which roughly removes log(nbthreads) levels
in the tree and further speeds up lookups.
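As a rough illustration of the depth argument (assuming timers spread
evenly over the threads, writing T for nbthreads, and a balanced tree of
N entries with about log2(N) levels), per-thread trees hold about N/T
entries, so each lookup saves about

  log2(N) - log2(N/T) = log2(T)

levels, i.e. roughly 3.6 levels with T = 12.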
diff --git a/include/proto/task.h b/include/proto/task.h
index 5445c99..6bc4f43 100644
--- a/include/proto/task.h
+++ b/include/proto/task.h
@@ -94,10 +94,12 @@
extern THREAD_LOCAL struct task *curr_task; /* task currently running or NULL */
extern THREAD_LOCAL struct eb32sc_node *rq_next; /* Next task to be potentially run */
#ifdef USE_THREAD
+extern struct eb_root timers; /* sorted timers tree, global */
extern struct eb_root rqueue; /* tree constituting the run queue */
extern int global_rqueue_size; /* Number of elements in the global runqueue */
#endif
+extern struct eb_root timers_local[MAX_THREADS]; /* sorted timers tree, per thread */
extern struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
extern int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */
extern struct list task_list[MAX_THREADS]; /* List of tasks to be run, mixing tasks and tasklets */
@@ -167,12 +169,19 @@
return t;
}
+/* remove a task from its wait queue. It may either be the local wait queue if
+ * the task is bound to a single thread (in which case there's no locking
+ * involved) or the global queue, with locking.
+ */
static inline struct task *task_unlink_wq(struct task *t)
{
- HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
- if (likely(task_in_wq(t)))
+ if (likely(task_in_wq(t))) {
+ if (atleast2(t->thread_mask))
+ HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
__task_unlink_wq(t);
- HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+ if (atleast2(t->thread_mask))
+ HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+ }
return t;
}
@@ -356,10 +365,14 @@
pool_flush(pool_head_tasklet);
}
+void __task_queue(struct task *task, struct eb_root *wq);
+
/* Place <task> into the wait queue, where it may already be. If the expiration
* timer is infinite, do nothing and rely on wake_expired_task to clean up.
+ * If the task is bound to a single thread, it's assumed to be bound to the
+ * current thread's queue and is queued without locking. Otherwise it's queued
+ * into the global wait queue, protected by locks.
*/
-void __task_queue(struct task *task);
static inline void task_queue(struct task *task)
{
/* If we already have a place in the wait queue no later than the
@@ -374,10 +387,18 @@
if (!tick_isset(task->expire))
return;
- HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
- if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
- __task_queue(task);
- HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+#ifdef USE_THREAD
+ if (atleast2(task->thread_mask)) {
+ HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
+ if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+ __task_queue(task, &timers);
+ HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+ } else
+#endif
+ {
+ if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+ __task_queue(task, &timers_local[tid]);
+ }
}
/* Ensure <task> will be woken up at most at <when>. If the task is already in
@@ -390,14 +411,26 @@
if (task_in_rq(task))
return;
+#ifdef USE_THREAD
+ if (atleast2(task->thread_mask)) {
+ HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
+ if (task_in_wq(task))
+ when = tick_first(when, task->expire);
+
- HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
- if (task_in_wq(task))
- when = tick_first(when, task->expire);
+ task->expire = when;
+ if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+ __task_queue(task, &timers);
+ HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+ } else
+#endif
+ {
+ if (task_in_wq(task))
+ when = tick_first(when, task->expire);
- task->expire = when;
- if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
- __task_queue(task);
- HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+ task->expire = when;
+ if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+ __task_queue(task, &timers_local[tid]);
+ }
}
/* This function registers a new signal. "lua" is the current lua
diff --git a/src/task.c b/src/task.c
index 3f193f2..27408b1 100644
--- a/src/task.c
+++ b/src/task.c
@@ -50,14 +50,16 @@
__decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) rq_lock); /* spin lock related to run queue */
__decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) wq_lock); /* spin lock related to wait queue */
-static struct eb_root timers; /* sorted timers tree */
#ifdef USE_THREAD
+struct eb_root timers; /* sorted timers tree, global */
struct eb_root rqueue; /* tree constituting the run queue */
int global_rqueue_size; /* Number of elements in the global runqueue */
#endif
+
struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */
static unsigned int rqueue_ticks; /* insertion count */
+struct eb_root timers_local[MAX_THREADS]; /* sorted timers tree, per thread */
/* Puts the task <t> in run queue at a position depending on t->nice. <t> is
* returned. The nice value assigns boosts in 32th of the run queue size. A
@@ -170,7 +172,7 @@
/*
* __task_queue()
*
- * Inserts a task into the wait queue at the position given by its expiration
+ * Inserts a task into wait queue <wq> at the position given by its expiration
* date. It does not matter if the task was already in the wait queue or not,
* as it will be unlinked. The task must not have an infinite expiration timer.
* Last, tasks must not be queued further than the end of the tree, which is
@@ -178,9 +180,11 @@
*
* This function should not be used directly, it is meant to be called by the
* inline version of task_queue() which performs a few cheap preliminary tests
- * before deciding to call __task_queue().
+ * before deciding to call __task_queue(). Moreover this function doesn't care
+ * at all about locking so the caller must be careful when deciding whether to
+ * lock or not around this call.
*/
-void __task_queue(struct task *task)
+void __task_queue(struct task *task, struct eb_root *wq)
{
if (likely(task_in_wq(task)))
__task_unlink_wq(task);
@@ -193,9 +197,7 @@
return;
#endif
- eb32_insert(&timers, &task->wq);
-
- return;
+ eb32_insert(wq, &task->wq);
}
/*
@@ -209,6 +211,51 @@
int ret = TICK_ETERNITY;
while (1) {
+ lookup_next_local:
+ eb = eb32_lookup_ge(&timers_local[tid], now_ms - TIMER_LOOK_BACK);
+ if (!eb) {
+ /* we might have reached the end of the tree, typically because
+ * <now_ms> is in the first half and we're first scanning the last
+ * half. Let's loop back to the beginning of the tree now.
+ */
+ eb = eb32_first(&timers_local[tid]);
+ if (likely(!eb))
+ break;
+ }
+
+ if (tick_is_lt(now_ms, eb->key)) {
+ /* timer not expired yet, revisit it later */
+ ret = eb->key;
+ break;
+ }
+
+ /* timer looks expired, detach it from the queue */
+ task = eb32_entry(eb, struct task, wq);
+ __task_unlink_wq(task);
+
+ /* It is possible that this task was left at an earlier place in the
+ * tree because a recent call to task_queue() has not moved it. This
+ * happens when the new expiration date is later than the old one.
+ * Since it is very unlikely that we reach a timeout anyway, it's a
+ * lot cheaper to proceed like this because we almost never update
+ * the tree. We may also find disabled expiration dates there. Since
+ * we have detached the task from the tree, we simply call task_queue
+ * to take care of this. Note that we might occasionally requeue it at
+ * the same place, before <eb>, so we have to check if this happens,
+ * and adjust <eb>, otherwise we may skip it which is not what we want.
+ * We may also not requeue the task (and not point eb at it) if its
+ * expiration time is not set.
+ */
+ if (!tick_is_expired(task->expire, now_ms)) {
+ if (tick_isset(task->expire))
+ __task_queue(task, &timers_local[tid]);
+ goto lookup_next_local;
+ }
+ task_wakeup(task, TASK_WOKEN_TIMER);
+ }
+
+#ifdef USE_THREAD
+ while (1) {
HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
lookup_next:
eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
@@ -224,7 +271,7 @@
if (tick_is_lt(now_ms, eb->key)) {
/* timer not expired yet, revisit it later */
- ret = eb->key;
+ ret = tick_first(ret, eb->key);
break;
}
@@ -247,7 +294,7 @@
*/
if (!tick_is_expired(task->expire, now_ms)) {
if (tick_isset(task->expire))
- __task_queue(task);
+ __task_queue(task, &timers);
goto lookup_next;
}
task_wakeup(task, TASK_WOKEN_TIMER);
@@ -255,6 +302,7 @@
}
HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+#endif
return ret;
}
@@ -415,13 +463,14 @@
{
int i;
- memset(&timers, 0, sizeof(timers));
#ifdef USE_THREAD
+ memset(&timers, 0, sizeof(timers));
memset(&rqueue, 0, sizeof(rqueue));
#endif
HA_SPIN_INIT(&wq_lock);
HA_SPIN_INIT(&rq_lock);
for (i = 0; i < MAX_THREADS; i++) {
+ memset(&timers_local[i], 0, sizeof(timers_local[i]));
memset(&rqueue_local[i], 0, sizeof(rqueue_local[i]));
LIST_INIT(&task_list[i]);
task_list_size[i] = 0;
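
Editorial note on how the two passes of wake_expired_tasks() above
combine their results (local_next/global_next are hypothetical names for
the earliest pending key found in each tree; tick_first() returns the
earlier of two ticks, ignoring unset ones):

  int ret = TICK_ETERNITY;
  ret = tick_first(ret, local_next);   /* earliest timer in timers_local[tid] */
  ret = tick_first(ret, global_next);  /* earliest timer in the shared tree */
  /* ret stays TICK_ETERNITY only if neither tree holds a pending timer */

This is why the global pass now uses tick_first(ret, eb->key) instead of
a plain assignment: it must not overwrite an earlier date already found
during the local pass.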