MAJOR: threads/task: handle multithread on task scheduler

2 global locks have been added to protect, respectively, the run queue and the
wait queue. And a process mask has been added on each task. Like for FDs, this
mask is used to know which threads are allowed to process a task.

For many tasks, all threads are granted. And this must be your first intension
when you create a new task, else you have a good reason to make a task sticky on
some threads. This is then the responsibility to the process callback to lock
what have to be locked in the task context.

Nevertheless, all tasks linked to a session must be sticky on the thread
creating the session. It is important that I/O handlers processing session FDs
and these tasks run on the same thread to avoid conflicts.
diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index a2c047e..19cdf83 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -142,6 +142,8 @@
 	FDCACHE_LOCK,
 	FD_LOCK,
 	POLL_LOCK,
+	TASK_RQ_LOCK,
+	TASK_WQ_LOCK,
 	POOL_LOCK,
 	LOCK_LABELS
 };
@@ -226,7 +228,7 @@
 static inline void show_lock_stats()
 {
 	const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
-					   "POOL" };
+					   "TASK_RQ", "TASK_WQ", "POOL" };
 	int lbl;
 
 	for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
diff --git a/include/proto/task.h b/include/proto/task.h
index c6177d0..bc3a173 100644
--- a/include/proto/task.h
+++ b/include/proto/task.h
@@ -30,6 +30,8 @@
 #include <common/mini-clist.h>
 #include <common/standard.h>
 #include <common/ticks.h>
+#include <common/hathreads.h>
+
 #include <eb32tree.h>
 
 #include <types/global.h>
@@ -86,6 +88,10 @@
 extern unsigned int niced_tasks;  /* number of niced tasks in the run queue */
 extern struct pool_head *pool2_task;
 extern struct pool_head *pool2_notification;
+#ifdef USE_THREAD
+extern HA_SPINLOCK_T rq_lock;        /* spin lock related to run queue */
+extern HA_SPINLOCK_T wq_lock;        /* spin lock related to wait queue */
+#endif
 
 /* return 0 if task is in run queue, otherwise non-zero */
 static inline int task_in_rq(struct task *t)
@@ -103,19 +109,29 @@
 struct task *__task_wakeup(struct task *t);
 static inline struct task *task_wakeup(struct task *t, unsigned int f)
 {
+	SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+
 	/* If task is running, we postpone the call
 	 * and backup the state.
 	 */
 	if (unlikely(t->state & TASK_RUNNING)) {
 		t->pending_state |= f;
+		SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 		return t;
 	}
 	if (likely(!task_in_rq(t)))
 		__task_wakeup(t);
 	t->state |= f;
+	SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+
 	return t;
 }
 
+static inline void task_set_affinity(struct task *t, unsigned long thread_mask)
+{
+
+	t->process_mask = thread_mask;
+}
 /*
  * Unlink the task from the wait queue, and possibly update the last_timer
  * pointer. A pointer to the task itself is returned. The task *must* already
@@ -130,8 +146,10 @@
 
 static inline struct task *task_unlink_wq(struct task *t)
 {
+	SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
 	if (likely(task_in_wq(t)))
 		__task_unlink_wq(t);
+	SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 	return t;
 }
 
@@ -156,9 +174,10 @@
  */
 static inline struct task *task_unlink_rq(struct task *t)
 {
-	if (likely(task_in_rq(t))) {
+	SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+	if (likely(task_in_rq(t)))
 		__task_unlink_rq(t);
-	}
+	SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 	return t;
 }
 
@@ -178,11 +197,12 @@
  * state).  The task is returned. This function should not be used outside of
  * task_new().
  */
-static inline struct task *task_init(struct task *t)
+static inline struct task *task_init(struct task *t, unsigned long thread_mask)
 {
 	t->wq.node.leaf_p = NULL;
 	t->rq.node.leaf_p = NULL;
 	t->pending_state = t->state = TASK_SLEEPING;
+	t->process_mask = thread_mask;
 	t->nice = 0;
 	t->calls = 0;
 	t->expire = TICK_ETERNITY;
@@ -194,12 +214,12 @@
  * case of lack of memory. The task count is incremented. Tasks should only
  * be allocated this way, and must be freed using task_free().
  */
-static inline struct task *task_new(void)
+static inline struct task *task_new(unsigned long thread_mask)
 {
 	struct task *t = pool_alloc2(pool2_task);
 	if (t) {
-		nb_tasks++;
-		task_init(t);
+		HA_ATOMIC_ADD(&nb_tasks, 1);
+		task_init(t, thread_mask);
 	}
 	return t;
 }
@@ -213,7 +233,7 @@
 	pool_free2(pool2_task, t);
 	if (unlikely(stopping))
 		pool_flush2(pool2_task);
-	nb_tasks--;
+	HA_ATOMIC_SUB(&nb_tasks, 1);
 }
 
 /* Place <task> into the wait queue, where it may already be. If the expiration
@@ -234,8 +254,10 @@
 	if (!tick_isset(task->expire))
 		return;
 
+	SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
 	if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
 		__task_queue(task);
+	SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 }
 
 /* Ensure <task> will be woken up at most at <when>. If the task is already in
@@ -244,15 +266,18 @@
  */
 static inline void task_schedule(struct task *task, int when)
 {
+	/* TODO: mthread, check if there is no tisk with this test */
 	if (task_in_rq(task))
 		return;
 
+	SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
 	if (task_in_wq(task))
 		when = tick_first(when, task->expire);
 
 	task->expire = when;
 	if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
 		__task_queue(task);
+	SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 }
 
 /* This function register a new signal. "lua" is the current lua
diff --git a/include/types/task.h b/include/types/task.h
index e0ae382..da7c929 100644
--- a/include/types/task.h
+++ b/include/types/task.h
@@ -69,6 +69,7 @@
 	void *context;			/* the task's context */
 	struct eb32_node wq;		/* ebtree node used to hold the task in the wait queue */
 	int expire;			/* next expiration date for this task, in ticks */
+	unsigned long process_mask;	/* mask of thread IDs authorized to process the task */
 };
 
 /*