MAJOR: tasks: create per-thread wait queues

Now we still have a main contention point with the timers in the main
wait queue, but the vast majority of the tasks are pinned to a single
thread. This patch creates a per-thread wait queue: a task bound to a
single thread (the current one) is queued into the local wait queue
without any locking, while a task shared by several threads goes into
the global queue under the wait queue lock. This significantly reduces
contention on the wait queue. A
test with 12 threads showed 11 ms spent in the WQ lock compared to
4.7 seconds in the same test without this change. The cache miss ratio
decreased from 19.7% to 19.2% on the 12-thread test, and its performance
increased by 1.5%.
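
To make the locking rule concrete, here is a minimal sketch (not part
of the patch; the helper name is made up, and it assumes task->expire
was already set to a valid tick) of the routing decision that
task_queue() below implements:

    /* Sketch only: route a task to the proper wait queue. atleast2(mask)
     * is true when more than one thread bit is set, i.e. the task may run
     * on several threads and the shared tree must be used.
     */
    static inline void sketch_queue_to_wq(struct task *task)
    {
    #ifdef USE_THREAD
        if (atleast2(task->thread_mask)) {
            /* shared task: global tree, protected by the WQ lock */
            HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
            __task_queue(task, &timers);
            HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
            return;
        }
    #endif
        /* task bound to the current thread only: no lock needed */
        __task_queue(task, &timers_local[tid]);
    }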

Another indirect benefit is that the average queue size is divided
by the number of threads, which roughly removes log(nbthreads) levels
in the tree and further speeds up lookups.
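
As a rough illustration (hypothetical numbers, not a measurement): with
about 120000 timers spread over 12 threads, a single shared tree is
about log2(120000) ~= 17 levels deep, while each local tree only holds
about 10000 entries and is about log2(10000) ~= 13 levels deep, i.e.
roughly log2(12) ~= 3.6 levels saved on every lookup.
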
diff --git a/include/proto/task.h b/include/proto/task.h
index 5445c99..6bc4f43 100644
--- a/include/proto/task.h
+++ b/include/proto/task.h
@@ -94,10 +94,12 @@
 extern THREAD_LOCAL struct task *curr_task; /* task currently running or NULL */
 extern THREAD_LOCAL struct eb32sc_node *rq_next; /* Next task to be potentially run */
 #ifdef USE_THREAD
+extern struct eb_root timers;      /* sorted timers tree, global */
 extern struct eb_root rqueue;      /* tree constituting the run queue */
 extern int global_rqueue_size; /* Number of element sin the global runqueue */
 #endif
 
+extern struct eb_root timers_local[MAX_THREADS]; /* sorted timers tree, per thread */
 extern struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
 extern int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */
 extern struct list task_list[MAX_THREADS]; /* List of tasks to be run, mixing tasks and tasklets */
@@ -167,12 +169,19 @@
 	return t;
 }
 
+/* remove a task from its wait queue. It may either be the local wait queue if
+ * the task is bound to a single thread (in which case there's no locking
+ * involved) or the global queue, with locking.
+ */
 static inline struct task *task_unlink_wq(struct task *t)
 {
-	HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
-	if (likely(task_in_wq(t)))
+	if (likely(task_in_wq(t))) {
+		if (atleast2(t->thread_mask))
+			HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
 		__task_unlink_wq(t);
-	HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+		if (atleast2(t->thread_mask))
+			HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+	}
 	return t;
 }
 
@@ -356,10 +365,14 @@
 		pool_flush(pool_head_tasklet);
 }
 
+void __task_queue(struct task *task, struct eb_root *wq);
+
 /* Place <task> into the wait queue, where it may already be. If the expiration
  * timer is infinite, do nothing and rely on wake_expired_task to clean up.
+ * If the task is bound to a single thread, it's assumed to be bound to the
+ * current thread's queue and is queued without locking. Otherwise it's queued
+ * into the global wait queue, protected by locks.
  */
-void __task_queue(struct task *task);
 static inline void task_queue(struct task *task)
 {
 	/* If we already have a place in the wait queue no later than the
@@ -374,10 +387,18 @@
 	if (!tick_isset(task->expire))
 		return;
 
-	HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
-	if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
-		__task_queue(task);
-	HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+#ifdef USE_THREAD
+	if (atleast2(task->thread_mask)) {
+		HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
+		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+			__task_queue(task, &timers);
+		HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+	} else
+#endif
+	{
+		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+			__task_queue(task, &timers_local[tid]);
+	}
 }
 
 /* Ensure <task> will be woken up at most at <when>. If the task is already in
@@ -390,14 +411,26 @@
 	if (task_in_rq(task))
 		return;
 
-	HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
-	if (task_in_wq(task))
-		when = tick_first(when, task->expire);
+#ifdef USE_THREAD
+	if (atleast2(task->thread_mask)) {
+		HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
+		if (task_in_wq(task))
+			when = tick_first(when, task->expire);
 
-	task->expire = when;
-	if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
-		__task_queue(task);
-	HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+		task->expire = when;
+		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+			__task_queue(task, &timers);
+		HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+	} else
+#endif
+	{
+		if (task_in_wq(task))
+			when = tick_first(when, task->expire);
+
+		task->expire = when;
+		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
+			__task_queue(task, &timers_local[tid]);
+	}
 }
 
 /* This function register a new signal. "lua" is the current lua
diff --git a/src/task.c b/src/task.c
index 3f193f2..27408b1 100644
--- a/src/task.c
+++ b/src/task.c
@@ -50,14 +50,16 @@
 __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) rq_lock); /* spin lock related to run queue */
 __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) wq_lock); /* spin lock related to wait queue */
 
-static struct eb_root timers;      /* sorted timers tree */
 #ifdef USE_THREAD
+struct eb_root timers;      /* sorted timers tree, global */
 struct eb_root rqueue;      /* tree constituting the run queue */
 int global_rqueue_size; /* Number of element sin the global runqueue */
 #endif
+
 struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
 int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */
 static unsigned int rqueue_ticks;  /* insertion count */
+struct eb_root timers_local[MAX_THREADS];  /* sorted timers tree, per thread */
 
 /* Puts the task <t> in run queue at a position depending on t->nice. <t> is
  * returned. The nice value assigns boosts in 32th of the run queue size. A
@@ -170,7 +172,7 @@
 /*
  * __task_queue()
  *
- * Inserts a task into the wait queue at the position given by its expiration
+ * Inserts a task into wait queue <wq> at the position given by its expiration
  * date. It does not matter if the task was already in the wait queue or not,
  * as it will be unlinked. The task must not have an infinite expiration timer.
  * Last, tasks must not be queued further than the end of the tree, which is
@@ -178,9 +180,11 @@
  *
  * This function should not be used directly, it is meant to be called by the
  * inline version of task_queue() which performs a few cheap preliminary tests
- * before deciding to call __task_queue().
+ * before deciding to call __task_queue(). Moreover this function doesn't care
+ * at all about locking so the caller must be careful when deciding whether to
+ * lock or not around this call.
  */
-void __task_queue(struct task *task)
+void __task_queue(struct task *task, struct eb_root *wq)
 {
 	if (likely(task_in_wq(task)))
 		__task_unlink_wq(task);
@@ -193,9 +197,7 @@
 		return;
 #endif
 
-	eb32_insert(&timers, &task->wq);
-
-	return;
+	eb32_insert(wq, &task->wq);
 }
 
 /*
@@ -209,6 +211,51 @@
 	int ret = TICK_ETERNITY;
 
 	while (1) {
+  lookup_next_local:
+		eb = eb32_lookup_ge(&timers_local[tid], now_ms - TIMER_LOOK_BACK);
+		if (!eb) {
+			/* we might have reached the end of the tree, typically because
+			* <now_ms> is in the first half and we're first scanning the last
+			* half. Let's loop back to the beginning of the tree now.
+			*/
+			eb = eb32_first(&timers_local[tid]);
+			if (likely(!eb))
+				break;
+		}
+
+		if (tick_is_lt(now_ms, eb->key)) {
+			/* timer not expired yet, revisit it later */
+			ret = eb->key;
+			break;
+		}
+
+		/* timer looks expired, detach it from the queue */
+		task = eb32_entry(eb, struct task, wq);
+		__task_unlink_wq(task);
+
+		/* It is possible that this task was left at an earlier place in the
+		 * tree because a recent call to task_queue() has not moved it. This
+		 * happens when the new expiration date is later than the old one.
+		 * Since it is very unlikely that we reach a timeout anyway, it's a
+		 * lot cheaper to proceed like this because we almost never update
+		 * the tree. We may also find disabled expiration dates there. Since
+		 * we have detached the task from the tree, we simply call task_queue
+		 * to take care of this. Note that we might occasionally requeue it at
+		 * the same place, before <eb>, so we have to check if this happens,
+		 * and adjust <eb>, otherwise we may skip it which is not what we want.
+		 * We may also not requeue the task (and not point eb at it) if its
+		 * expiration time is not set.
+		 */
+		if (!tick_is_expired(task->expire, now_ms)) {
+			if (tick_isset(task->expire))
+				__task_queue(task, &timers_local[tid]);
+			goto lookup_next_local;
+		}
+		task_wakeup(task, TASK_WOKEN_TIMER);
+	}
+
+#ifdef USE_THREAD
+	while (1) {
 		HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
   lookup_next:
 		eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
@@ -224,7 +271,7 @@
 
 		if (tick_is_lt(now_ms, eb->key)) {
 			/* timer not expired yet, revisit it later */
-			ret = eb->key;
+			ret = tick_first(ret, eb->key);
 			break;
 		}
 
@@ -247,7 +294,7 @@
 		 */
 		if (!tick_is_expired(task->expire, now_ms)) {
 			if (tick_isset(task->expire))
-				__task_queue(task);
+				__task_queue(task, &timers);
 			goto lookup_next;
 		}
 		task_wakeup(task, TASK_WOKEN_TIMER);
@@ -255,6 +302,7 @@
 	}
 
 	HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
+#endif
 	return ret;
 }
 
@@ -415,13 +463,14 @@
 {
 	int i;
 
-	memset(&timers, 0, sizeof(timers));
 #ifdef USE_THREAD
+	memset(&timers, 0, sizeof(timers));
 	memset(&rqueue, 0, sizeof(rqueue));
 #endif
 	HA_SPIN_INIT(&wq_lock);
 	HA_SPIN_INIT(&rq_lock);
 	for (i = 0; i < MAX_THREADS; i++) {
+		memset(&timers_local[i], 0, sizeof(timers_local[i]));
 		memset(&rqueue_local[i], 0, sizeof(rqueue_local[i]));
 		LIST_INIT(&task_list[i]);
 		task_list_size[i] = 0;