MAJOR: tasks: Create a per-thread runqueue.

A lot of tasks only ever run on one thread, so instead of keeping them all
in the global runqueue, create a per-thread runqueue which doesn't require
any locking, and add every task bound to a single thread to the
corresponding runqueue.

The global runqueue is still used for non-local tasks, and is visited
by each thread when checking its own runqueue. The nice parameter is
thus used both in the global runqueue and in the local ones. The rare
tasks that are bound to multiple threads will have their nice value
used twice (once for the global queue, once for the thread-local one).
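
In simplified form, the wake-up path now picks the destination tree from
the task's thread affinity (a sketch of the logic added to task_wakeup()
below; the nbthread check and the TASK_RUNNING race handling done in
__task_wakeup() are omitted):

    struct eb_root *root;

    if (t->thread_mask == tid_bit)          /* bound to this thread only */
        root = &rqueue_local[tid];          /* lockless per-thread queue */
    else
        root = &rqueue;                     /* shared global queue (rq_lock) */

    state = HA_ATOMIC_OR(&t->state, f);     /* merge the wake-up reason flags */
    if (!(state & TASK_RUNNING))            /* a running task requeues itself */
        __task_wakeup(t, root);
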
diff --git a/include/proto/task.h b/include/proto/task.h
index c1c4c07..59ac382 100644
--- a/include/proto/task.h
+++ b/include/proto/task.h
@@ -92,6 +92,8 @@
 extern struct pool_head *pool_head_notification;
 extern THREAD_LOCAL struct task *curr_task; /* task currently running or NULL */
 extern THREAD_LOCAL struct eb32sc_node *rq_next; /* Next task to be potentially run */
+extern struct eb_root rqueue;      /* tree constituting the run queue */
+extern struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
 
 __decl_hathreads(extern HA_SPINLOCK_T rq_lock);  /* spin lock related to run queue */
 __decl_hathreads(extern HA_SPINLOCK_T wq_lock);  /* spin lock related to wait queue */
@@ -109,25 +111,28 @@
 }
 
 /* puts the task <t> in run queue with reason flags <f>, and returns <t> */
-struct task *__task_wakeup(struct task *t);
-static inline struct task *task_wakeup(struct task *t, unsigned int f)
+/* This will put the task in the local runqueue if the task is only runnable
+ * by the current thread, and in the global runqueue otherwise.
+ */
+void __task_wakeup(struct task *t, struct eb_root *);
+static inline void task_wakeup(struct task *t, unsigned int f)
 {
-	HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+	unsigned short state;
 
-	/* If task is running, we postpone the call
-	 * and backup the state.
-	 */
-	if (unlikely(t->state & TASK_RUNNING)) {
-		t->pending_state |= f;
-		HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
-		return t;
-	}
-	if (likely(!task_in_rq(t)))
-		__task_wakeup(t);
-	t->state |= f;
-	HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+#ifdef USE_THREAD
+	struct eb_root *root;
 
-	return t;
+	if (t->thread_mask == tid_bit && global.nbthread > 1)
+		root = &rqueue_local[tid];
+	else
+		root = &rqueue;
+#else
+	struct eb_root *root = &rqueue;
+#endif
+
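+	/* atomically merge the wake-up reason flags into the state; the task
+	 * only needs to be inserted if it is not currently running, since a
+	 * running task will requeue itself when it completes if needed.
+	 */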
+	state = HA_ATOMIC_OR(&t->state, f);
+	if (!(state & TASK_RUNNING))
+		__task_wakeup(t, root);
 }
 
 /* change the thread affinity of a task to <thread_mask> */
@@ -167,9 +172,9 @@
 static inline struct task *__task_unlink_rq(struct task *t)
 {
 	eb32sc_delete(&t->rq);
-	tasks_run_queue--;
+	HA_ATOMIC_SUB(&tasks_run_queue, 1);
 	if (likely(t->nice))
-		niced_tasks--;
+		HA_ATOMIC_SUB(&niced_tasks, 1);
 	return t;
 }
 
@@ -178,13 +183,15 @@
  */
 static inline struct task *task_unlink_rq(struct task *t)
 {
-	HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+	if (t->thread_mask != tid_bit)
+		HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
 	if (likely(task_in_rq(t))) {
 		if (&t->rq == rq_next)
 			rq_next = eb32sc_next(rq_next, tid_bit);
 		__task_unlink_rq(t);
 	}
-	HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+	if (t->thread_mask != tid_bit)
+		HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 	return t;
 }
 
@@ -208,7 +215,7 @@
 {
 	t->wq.node.leaf_p = NULL;
 	t->rq.node.leaf_p = NULL;
-	t->pending_state = t->state = TASK_SLEEPING;
+	t->state = TASK_SLEEPING;
 	t->thread_mask = thread_mask;
 	t->nice = 0;
 	t->calls = 0;
diff --git a/src/task.c b/src/task.c
index 23e310b..876b837 100644
--- a/src/task.c
+++ b/src/task.c
@@ -45,7 +45,10 @@
 __decl_hathreads(HA_SPINLOCK_T __attribute__((aligned(64))) wq_lock); /* spin lock related to wait queue */
 
 static struct eb_root timers;      /* sorted timers tree */
-static struct eb_root rqueue;      /* tree constituting the run queue */
+struct eb_root rqueue;      /* tree constituting the run queue */
+struct eb_root rqueue_local[MAX_THREADS]; /* tree constituting the per-thread run queue */
+static int global_rqueue_size; /* Number of elements in the global runqueue */
+static int rqueue_size[MAX_THREADS]; /* Number of elements in the per-thread run queue */
 static unsigned int rqueue_ticks;  /* insertion count */
 
 /* Puts the task <t> in run queue at a position depending on t->nice. <t> is
@@ -56,30 +59,76 @@
  * The task must not already be in the run queue. If unsure, use the safer
  * task_wakeup() function.
  */
-struct task *__task_wakeup(struct task *t)
+void __task_wakeup(struct task *t, struct eb_root *root)
 {
-	tasks_run_queue++;
+	void *expected = NULL;
+	int *rq_size;
+
+	if (root == &rqueue) {
+		rq_size = &global_rqueue_size;
+		HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+	} else {
+		int nb = root - &rqueue_local[0];
+		rq_size = &rqueue_size[nb];
+	}
+	/* Make sure that if the task isn't in the run queue already, nobody
+	 * else inserts it in the meantime.
+	 */
+redo:
+	if (unlikely(!HA_ATOMIC_CAS(&t->rq.node.leaf_p, &expected, 0x1))) {
+		if (root == &rqueue)
+			HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+		return;
+	}
+	/* There's a small race condition: when running a task, the thread
+	 * first sets TASK_RUNNING, and then unlinks the task.
+	 * If another thread calls task_wakeup() for the same task, it may
+	 * set t->state before TASK_RUNNING was set, and then try to set
+	 * t->rq.node.leaf_p after the task was unlinked.
+	 * To make sure this is not a problem, we check if TASK_RUNNING is
+	 * set again. If it is, we unset t->rq.node.leaf_p.
+	 * We then check for TASK_RUNNING a third time. If it is still set,
+	 * we can give up: the task will be re-queued later if it needs to
+	 * be. If it's not set, and there is still something in t->state,
+	 * then we have to requeue the task ourselves.
+	 */
+	if (((volatile unsigned short)(t->state)) & TASK_RUNNING) {
+		unsigned short state;
+		t->rq.node.leaf_p = NULL;
+		__ha_barrier_store();
+
+		state = (volatile unsigned short)(t->state);
+		if (unlikely(state != 0 && !(state & TASK_RUNNING)))
+			goto redo;
+		if (root == &rqueue)
+			HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+		return;
+	}
+	HA_ATOMIC_ADD(&tasks_run_queue, 1);
 	active_tasks_mask |= t->thread_mask;
-	t->rq.key = ++rqueue_ticks;
+	t->rq.key = HA_ATOMIC_ADD(&rqueue_ticks, 1);
 
 	if (likely(t->nice)) {
 		int offset;
 
-		niced_tasks++;
+		HA_ATOMIC_ADD(&niced_tasks, 1);
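+		/* the nice offset is now scaled by the size of the target run
+		 * queue (global or per-thread) instead of the global number of
+		 * queued tasks
+		 */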
 		if (likely(t->nice > 0))
-			offset = (unsigned)((tasks_run_queue * (unsigned int)t->nice) / 32U);
+			offset = (unsigned)((*rq_size * (unsigned int)t->nice) / 32U);
 		else
-			offset = -(unsigned)((tasks_run_queue * (unsigned int)-t->nice) / 32U);
+			offset = -(unsigned)((*rq_size * (unsigned int)-t->nice) / 32U);
 		t->rq.key += offset;
 	}
 
-	/* reset flag to pending ones
-	 * Note: __task_wakeup must not be called
-	 * if task is running
-	 */
-	t->state = t->pending_state;
-	eb32sc_insert(&rqueue, &t->rq, t->thread_mask);
-	return t;
+	eb32sc_insert(root, &t->rq, t->thread_mask);
+	if (root == &rqueue) {
+		global_rqueue_size++;
+		HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+	} else {
+		int nb = root - &rqueue_local[0];
+
+		rqueue_size[nb]++;
+	}
+	return;
 }
 
 /*
@@ -185,11 +234,8 @@
 void process_runnable_tasks()
 {
 	struct task *t;
-	int i;
 	int max_processed;
-	struct task *local_tasks[16];
-	int local_tasks_count;
-	int final_tasks_count;
+	uint64_t average = 0;
 
 	tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
 	nb_tasks_cur = nb_tasks;
@@ -216,9 +262,11 @@
 
 			t = eb32sc_entry(rq_next, struct task, rq);
 			rq_next = eb32sc_next(rq_next, tid_bit);
+			global_rqueue_size--;
+
+			/* detach the task from the queue */
 			__task_unlink_rq(t);
 			t->state |= TASK_RUNNING;
-			t->pending_state = 0;
 
 			t->calls++;
 			curr_task = t;
@@ -244,8 +292,8 @@
 				 * immediatly, else we defer
 				 * it into wait queue
 				 */
-				if (t->pending_state)
-					__task_wakeup(t);
+				if (t->state)
+					__task_wakeup(t, &rqueue);
 				else
 					task_queue(t);
 			}
@@ -267,104 +315,105 @@
 		return;
 	}
 
+	average = tasks_run_queue / global.nbthread;
+
+	/* Get some elements from the global run queue and put them in the
+	 * local run queue. To try to keep a bit of fairness, take as many
+	 * elements from the global list as needed to make the local queue
+	 * bigger than the average.
+	 */
+	while (rqueue_size[tid] <= average) {
+
+		/* we have to restart looking up after every batch */
+		rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
+		if (unlikely(!rq_next)) {
+			/* either we just started or we reached the end
+			 * of the tree, typically because <rqueue_ticks>
+			 * is in the first half and we're first scanning
+			 * the last half. Let's loop back to the beginning
+			 * of the tree now.
+			 */
+			rq_next = eb32sc_first(&rqueue, tid_bit);
+			if (!rq_next)
+				break;
+		}
+
+		t = eb32sc_entry(rq_next, struct task, rq);
+		rq_next = eb32sc_next(rq_next, tid_bit);
+
+		/* detach the task from the queue */
+		__task_unlink_rq(t);
+		__task_wakeup(t, &rqueue_local[tid]);
+	}
+
+	HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 	active_tasks_mask &= ~tid_bit;
 	while (1) {
+		unsigned short state;
 		/* Note: this loop is one of the fastest code path in
 		 * the whole program. It should not be re-arranged
 		 * without a good reason.
 		 */
 
 		/* we have to restart looking up after every batch */
-		rq_next = eb32sc_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
-		for (local_tasks_count = 0; local_tasks_count < 16; local_tasks_count++) {
-			if (unlikely(!rq_next)) {
-				/* either we just started or we reached the end
-				 * of the tree, typically because <rqueue_ticks>
-				 * is in the first half and we're first scanning
-				 * the last half. Let's loop back to the beginning
-				 * of the tree now.
-				 */
-				rq_next = eb32sc_first(&rqueue, tid_bit);
-				if (!rq_next)
-					break;
-			}
-
-			t = eb32sc_entry(rq_next, struct task, rq);
-			rq_next = eb32sc_next(rq_next, tid_bit);
-
-			/* detach the task from the queue */
-			__task_unlink_rq(t);
-			local_tasks[local_tasks_count] = t;
-			t->state |= TASK_RUNNING;
-			t->pending_state = 0;
-			t->calls++;
-			max_processed--;
-		}
-
-		if (!local_tasks_count)
-			break;
-
-		HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
-
-		final_tasks_count = 0;
-		for (i = 0; i < local_tasks_count ; i++) {
-			t = local_tasks[i];
-			/* This is an optimisation to help the processor's branch
-			 * predictor take this most common call.
+		rq_next = eb32sc_lookup_ge(&rqueue_local[tid], rqueue_ticks - TIMER_LOOK_BACK, tid_bit);
+		if (unlikely(!rq_next)) {
+			/* either we just started or we reached the end
+			 * of the tree, typically because <rqueue_ticks>
+			 * is in the first half and we're first scanning
+			 * the last half. Let's loop back to the beginning
+			 * of the tree now.
 			 */
-			curr_task = t;
-			if (likely(t->process == process_stream))
-				t = process_stream(t, t->context, t->state);
-			else {
-				if (t->process != NULL)
-					t = t->process(t, t->context, t->state);
-				else {
-					__task_free(t);
-					t = NULL;
-				}
-			}
-			curr_task = NULL;
-			if (t)
-				local_tasks[final_tasks_count++] = t;
+			rq_next = eb32sc_first(&rqueue_local[tid], tid_bit);
+			if (!rq_next)
+				break;
 		}
+		t = eb32sc_entry(rq_next, struct task, rq);
+		rq_next = eb32sc_next(rq_next, tid_bit);
 
-		for (i = 0; i < final_tasks_count ; i++) {
-			t = local_tasks[i];
-			/* If there is a pending state
-			 * we have to wake up the task
-			 * immediatly, else we defer
-			 * it into wait queue
-			 */
-			HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
-			t->state &= ~TASK_RUNNING;
-			if (t->pending_state) {
-				__task_wakeup(t);
-				HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
-			}
-			else {
-				/* we must never hold the RQ lock before the WQ lock */
-				HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
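+		/* atomically fetch the accumulated wake-up flags and mark the
+		 * task as running in a single exchange
+		 */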
+		state = HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
+		/* detach the task from the queue */
+		__task_unlink_rq(t);
+		t->calls++;
+		max_processed--;
+		rqueue_size[tid]--;
+		curr_task = t;
+		if (likely(t->process == process_stream))
+			t = process_stream(t, t->context, state);
+		else
+			t = t->process(t, t->context, state);
+		curr_task = NULL;
+		/* If there is a pending state we have to wake up the task
+		 * immediately, else we defer it to the wait queue.
+		 */
+		if (t != NULL) {
+			state = HA_ATOMIC_AND(&t->state, ~TASK_RUNNING);
+			if (state)
+				__task_wakeup(t, (t->thread_mask == tid_bit) ?
+				    &rqueue_local[tid] : &rqueue);
+			else
 				task_queue(t);
-			}
 		}
 
-		HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
 		if (max_processed <= 0) {
 			active_tasks_mask |= tid_bit;
 			activity[tid].long_rq++;
 			break;
 		}
 	}
-	HA_SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 }
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
 int init_task()
 {
+	int i;
+
 	memset(&timers, 0, sizeof(timers));
 	memset(&rqueue, 0, sizeof(rqueue));
 	HA_SPIN_INIT(&wq_lock);
 	HA_SPIN_INIT(&rq_lock);
+	for (i = 0; i < MAX_THREADS; i++)
+		memset(&rqueue_local[i], 0, sizeof(rqueue_local[i]));
 	pool_head_task = create_pool("task", sizeof(struct task), MEM_F_SHARED);
 	if (!pool_head_task)
 		return 0;