MAJOR: threads/task: handle multithread on task scheduler

2 global locks have been added to protect, respectively, the run queue and the
wait queue. And a process mask has been added on each task. Like for FDs, this
mask is used to know which threads are allowed to process a task.

For many tasks, all threads are granted. And this must be your first intension
when you create a new task, else you have a good reason to make a task sticky on
some threads. This is then the responsibility to the process callback to lock
what have to be locked in the task context.

Nevertheless, all tasks linked to a session must be sticky on the thread
creating the session. It is important that I/O handlers processing session FDs
and these tasks run on the same thread to avoid conflicts.
diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index a2c047e..19cdf83 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -142,6 +142,8 @@
 	FDCACHE_LOCK,
 	FD_LOCK,
 	POLL_LOCK,
+	TASK_RQ_LOCK,
+	TASK_WQ_LOCK,
 	POOL_LOCK,
 	LOCK_LABELS
 };
@@ -226,7 +228,7 @@
 static inline void show_lock_stats()
 {
 	const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
-					   "POOL" };
+					   "TASK_RQ", "TASK_WQ", "POOL" };
 	int lbl;
 
 	for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
diff --git a/include/proto/task.h b/include/proto/task.h
index c6177d0..bc3a173 100644
--- a/include/proto/task.h
+++ b/include/proto/task.h
@@ -30,6 +30,8 @@
 #include <common/mini-clist.h>
 #include <common/standard.h>
 #include <common/ticks.h>
+#include <common/hathreads.h>
+
 #include <eb32tree.h>
 
 #include <types/global.h>
@@ -86,6 +88,10 @@
 extern unsigned int niced_tasks;  /* number of niced tasks in the run queue */
 extern struct pool_head *pool2_task;
 extern struct pool_head *pool2_notification;
+#ifdef USE_THREAD
+extern HA_SPINLOCK_T rq_lock;        /* spin lock related to run queue */
+extern HA_SPINLOCK_T wq_lock;        /* spin lock related to wait queue */
+#endif
 
 /* return 0 if task is in run queue, otherwise non-zero */
 static inline int task_in_rq(struct task *t)
@@ -103,19 +109,29 @@
 struct task *__task_wakeup(struct task *t);
 static inline struct task *task_wakeup(struct task *t, unsigned int f)
 {
+	SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+
 	/* If task is running, we postpone the call
 	 * and backup the state.
 	 */
 	if (unlikely(t->state & TASK_RUNNING)) {
 		t->pending_state |= f;
+		SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 		return t;
 	}
 	if (likely(!task_in_rq(t)))
 		__task_wakeup(t);
 	t->state |= f;
+	SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+
 	return t;
 }
 
+static inline void task_set_affinity(struct task *t, unsigned long thread_mask)
+{
+
+	t->process_mask = thread_mask;
+}
 /*
  * Unlink the task from the wait queue, and possibly update the last_timer
  * pointer. A pointer to the task itself is returned. The task *must* already
@@ -130,8 +146,10 @@
 
 static inline struct task *task_unlink_wq(struct task *t)
 {
+	SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
 	if (likely(task_in_wq(t)))
 		__task_unlink_wq(t);
+	SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 	return t;
 }
 
@@ -156,9 +174,10 @@
  */
 static inline struct task *task_unlink_rq(struct task *t)
 {
-	if (likely(task_in_rq(t))) {
+	SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+	if (likely(task_in_rq(t)))
 		__task_unlink_rq(t);
-	}
+	SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 	return t;
 }
 
@@ -178,11 +197,12 @@
  * state).  The task is returned. This function should not be used outside of
  * task_new().
  */
-static inline struct task *task_init(struct task *t)
+static inline struct task *task_init(struct task *t, unsigned long thread_mask)
 {
 	t->wq.node.leaf_p = NULL;
 	t->rq.node.leaf_p = NULL;
 	t->pending_state = t->state = TASK_SLEEPING;
+	t->process_mask = thread_mask;
 	t->nice = 0;
 	t->calls = 0;
 	t->expire = TICK_ETERNITY;
@@ -194,12 +214,12 @@
  * case of lack of memory. The task count is incremented. Tasks should only
  * be allocated this way, and must be freed using task_free().
  */
-static inline struct task *task_new(void)
+static inline struct task *task_new(unsigned long thread_mask)
 {
 	struct task *t = pool_alloc2(pool2_task);
 	if (t) {
-		nb_tasks++;
-		task_init(t);
+		HA_ATOMIC_ADD(&nb_tasks, 1);
+		task_init(t, thread_mask);
 	}
 	return t;
 }
@@ -213,7 +233,7 @@
 	pool_free2(pool2_task, t);
 	if (unlikely(stopping))
 		pool_flush2(pool2_task);
-	nb_tasks--;
+	HA_ATOMIC_SUB(&nb_tasks, 1);
 }
 
 /* Place <task> into the wait queue, where it may already be. If the expiration
@@ -234,8 +254,10 @@
 	if (!tick_isset(task->expire))
 		return;
 
+	SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
 	if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
 		__task_queue(task);
+	SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 }
 
 /* Ensure <task> will be woken up at most at <when>. If the task is already in
@@ -244,15 +266,18 @@
  */
 static inline void task_schedule(struct task *task, int when)
 {
+	/* TODO: mthread, check if there is no tisk with this test */
 	if (task_in_rq(task))
 		return;
 
+	SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
 	if (task_in_wq(task))
 		when = tick_first(when, task->expire);
 
 	task->expire = when;
 	if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
 		__task_queue(task);
+	SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 }
 
 /* This function register a new signal. "lua" is the current lua
diff --git a/include/types/task.h b/include/types/task.h
index e0ae382..da7c929 100644
--- a/include/types/task.h
+++ b/include/types/task.h
@@ -69,6 +69,7 @@
 	void *context;			/* the task's context */
 	struct eb32_node wq;		/* ebtree node used to hold the task in the wait queue */
 	int expire;			/* next expiration date for this task, in ticks */
+	unsigned long process_mask;	/* mask of thread IDs authorized to process the task */
 };
 
 /*
diff --git a/src/cfgparse.c b/src/cfgparse.c
index 1cb98f3..e765fdb 100644
--- a/src/cfgparse.c
+++ b/src/cfgparse.c
@@ -42,6 +42,7 @@
 #include <common/time.h>
 #include <common/uri_auth.h>
 #include <common/namespace.h>
+#include <common/hathreads.h>
 
 #include <types/capture.h>
 #include <types/compression.h>
@@ -8862,7 +8863,7 @@
 		}
 
 		/* create the task associated with the proxy */
-		curproxy->task = task_new();
+		curproxy->task = task_new(MAX_THREADS_MASK);
 		if (curproxy->task) {
 			curproxy->task->context = curproxy;
 			curproxy->task->process = manage_proxy;
diff --git a/src/checks.c b/src/checks.c
index 704bed2..3d60237 100644
--- a/src/checks.c
+++ b/src/checks.c
@@ -2226,7 +2226,7 @@
 {
 	struct task *t;
 	/* task for the check */
-	if ((t = task_new()) == NULL) {
+	if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
 		Alert("Starting [%s:%s] check: out of memory.\n",
 		      check->server->proxy->id, check->server->id);
 		return 0;
@@ -2272,7 +2272,7 @@
 	for (px = proxy; px; px = px->next) {
 		for (s = px->srv; s; s = s->next) {
 			if (s->slowstart) {
-				if ((t = task_new()) == NULL) {
+				if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
 					Alert("Starting [%s:%s] check: out of memory.\n", px->id, s->id);
 					return ERR_ALERT | ERR_FATAL;
 				}
@@ -3130,7 +3130,7 @@
 			check->port = 587;
 		//check->server = s;
 
-		if ((t = task_new()) == NULL) {
+		if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
 			memprintf(err, "out of memory while allocating mailer alerts task");
 			goto error;
 		}
diff --git a/src/dns.c b/src/dns.c
index c2b87c0..3383faa 100644
--- a/src/dns.c
+++ b/src/dns.c
@@ -1851,7 +1851,7 @@
 		}
 
 		/* Create the task associated to the resolvers section */
-		if ((t = task_new()) == NULL) {
+		if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
 			Alert("config : resolvers '%s' : out of memory.\n", resolvers->id);
 			err_code |= (ERR_ALERT|ERR_ABORT);
 			goto err;
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 51730a2..9543f8f 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -1939,7 +1939,7 @@
 	memset(appctx->ctx.spoe.ptr, 0, pool2_spoe_appctx->size);
 
 	appctx->st0 = SPOE_APPCTX_ST_CONNECT;
-	if ((SPOE_APPCTX(appctx)->task = task_new()) == NULL)
+	if ((SPOE_APPCTX(appctx)->task = task_new(MAX_THREADS_MASK)) == NULL)
 		goto out_free_spoe_appctx;
 
 	SPOE_APPCTX(appctx)->owner           = appctx;
@@ -1975,10 +1975,10 @@
 	strm->do_log = NULL;
 	strm->res.flags |= CF_READ_DONTWAIT;
 
-	task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
 	LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list);
 	conf->agent->applets_act++;
 
+	task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
 	task_wakeup(strm->task, TASK_WOKEN_INIT);
 	return appctx;
 
diff --git a/src/haproxy.c b/src/haproxy.c
index 170b002..ff63844 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -1511,7 +1511,7 @@
 		exit(2);
 	}
 
-	global_listener_queue_task = task_new();
+	global_listener_queue_task = task_new(MAX_THREADS_MASK);
 	if (!global_listener_queue_task) {
 		Alert("Out of memory when initializing global task\n");
 		exit(1);
diff --git a/src/hlua.c b/src/hlua.c
index f5fed04..137117c 100644
--- a/src/hlua.c
+++ b/src/hlua.c
@@ -5450,7 +5450,7 @@
 	if (!hlua)
 		WILL_LJMP(luaL_error(L, "lua out of memory error."));
 
-	task = task_new();
+	task = task_new(MAX_THREADS_MASK);
 	task->context = hlua;
 	task->process = hlua_process_task;
 
@@ -6031,7 +6031,7 @@
 	ctx->ctx.hlua_apptcp.flags = 0;
 
 	/* Create task used by signal to wakeup applets. */
-	task = task_new();
+	task = task_new(MAX_THREADS_MASK);
 	if (!task) {
 		SEND_ERR(px, "Lua applet tcp '%s': out of memory.\n",
 		         ctx->rule->arg.hlua_rule->fcn.name);
@@ -6232,7 +6232,7 @@
 		ctx->ctx.hlua_apphttp.flags |= APPLET_HTTP11;
 
 	/* Create task used by signal to wakeup applets. */
-	task = task_new();
+	task = task_new(MAX_THREADS_MASK);
 	if (!task) {
 		SEND_ERR(px, "Lua applet http '%s': out of memory.\n",
 		         ctx->rule->arg.hlua_rule->fcn.name);
@@ -6777,7 +6777,7 @@
 	 * We use the same wakeup fonction than the Lua applet_tcp and
 	 * applet_http. It is absolutely compatible.
 	 */
-	appctx->ctx.hlua_cli.task = task_new();
+	appctx->ctx.hlua_cli.task = task_new(MAX_THREADS_MASK);
 	if (!appctx->ctx.hlua_cli.task) {
 		SEND_ERR(NULL, "Lua cli '%s': out of memory.\n", fcn->name);
 		goto error;
diff --git a/src/peers.c b/src/peers.c
index 4c85af5..b98ec61 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -2055,7 +2055,7 @@
 
 	list_for_each_entry(listener, &peers->peers_fe->conf.listeners, by_fe)
 		listener->maxconn = peers->peers_fe->maxconn;
-	peers->sync_task = task_new();
+	peers->sync_task = task_new(MAX_THREADS_MASK);
 	peers->sync_task->process = process_peer_sync;
 	peers->sync_task->context = (void *)peers;
 	peers->sighandler = signal_register_task(0, peers->sync_task, 0);
diff --git a/src/proxy.c b/src/proxy.c
index 53f886e..71091d0 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -979,7 +979,7 @@
 
 	stopping = 1;
 	if (tick_isset(global.hard_stop_after)) {
-		task = task_new();
+		task = task_new(MAX_THREADS_MASK);
 		if (task) {
 			task->process = hard_stop;
 			task_schedule(task, tick_add(now_ms, global.hard_stop_after));
diff --git a/src/session.c b/src/session.c
index ecfa2f1..54a879b 100644
--- a/src/session.c
+++ b/src/session.c
@@ -241,7 +241,7 @@
 	 *          conn -- owner ---> task <-----+
 	 */
 	if (cli_conn->flags & (CO_FL_HANDSHAKE | CO_FL_EARLY_SSL_HS)) {
-		if (unlikely((sess->task = task_new()) == NULL))
+		if (unlikely((sess->task = task_new(tid_bit)) == NULL))
 			goto out_free_sess;
 
 		conn_set_xprt_done_cb(cli_conn, conn_complete_session);
diff --git a/src/stick_table.c b/src/stick_table.c
index d95c77f..5e82116 100644
--- a/src/stick_table.c
+++ b/src/stick_table.c
@@ -436,7 +436,7 @@
 
 		t->exp_next = TICK_ETERNITY;
 		if ( t->expire ) {
-			t->exp_task = task_new();
+			t->exp_task = task_new(MAX_THREADS_MASK);
 			t->exp_task->process = process_table_expire;
 			t->exp_task->context = (void *)t;
 		}
diff --git a/src/stream.c b/src/stream.c
index a78ee96..522441f 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -163,7 +163,7 @@
 	s->flags |= SF_INITIALIZED;
 	s->unique_id = NULL;
 
-	if ((t = task_new()) == NULL)
+	if ((t = task_new(tid_bit)) == NULL)
 		goto out_fail_alloc;
 
 	s->task = t;
diff --git a/src/task.c b/src/task.c
index 8d4ab39..0022bff 100644
--- a/src/task.c
+++ b/src/task.c
@@ -36,6 +36,10 @@
 unsigned int nb_tasks_cur = 0;     /* copy of the tasks count */
 unsigned int niced_tasks = 0;      /* number of niced tasks in the run queue */
 
+#ifdef USE_THREAD
+HA_SPINLOCK_T rq_lock;        /* spin lock related to run queue */
+HA_SPINLOCK_T wq_lock;        /* spin lock related to wait queue */
+#endif
 
 static struct eb_root timers;      /* sorted timers tree */
 static struct eb_root rqueue;      /* tree constituting the run queue */
@@ -113,22 +117,29 @@
 {
 	struct task *task;
 	struct eb32_node *eb;
+	int ret;
 
 	while (1) {
+		SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
+  lookup_next:
 		eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
-		if (unlikely(!eb)) {
+		if (!eb) {
 			/* we might have reached the end of the tree, typically because
 			* <now_ms> is in the first half and we're first scanning the last
 			* half. Let's loop back to the beginning of the tree now.
 			*/
 			eb = eb32_first(&timers);
-			if (likely(!eb))
+			if (likely(!eb)) {
+				SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 				break;
+			}
 		}
 
 		if (likely(tick_is_lt(now_ms, eb->key))) {
+			ret = eb->key;
+			SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 			/* timer not expired yet, revisit it later */
-			return eb->key;
+			return ret;
 		}
 
 		/* timer looks expired, detach it from the queue */
@@ -150,10 +161,11 @@
 		 */
 		if (!tick_is_expired(task->expire, now_ms)) {
 			if (!tick_isset(task->expire))
-				continue;
+				goto lookup_next;
 			__task_queue(task);
-			continue;
+			goto lookup_next;
 		}
+		SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
 		task_wakeup(task, TASK_WOKEN_TIMER);
 	}
 
@@ -192,6 +204,7 @@
 	if (likely(niced_tasks))
 		max_processed = (max_processed + 3) / 4;
 
+	SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
 	while (max_processed > 0) {
 		/* Note: this loop is one of the fastest code path in
 		 * the whole program. It should not be re-arranged
@@ -216,12 +229,14 @@
 		while (local_tasks_count < 16) {
 			t = eb32_entry(rq_next, struct task, rq);
 			rq_next = eb32_next(rq_next);
-			/* detach the task from the queue */
-			__task_unlink_rq(t);
-			t->state |= TASK_RUNNING;
-			t->pending_state = 0;
-			t->calls++;
-			local_tasks[local_tasks_count++] = t;
+			if (t->process_mask & (1UL << tid)) {
+				/* detach the task from the queue */
+				__task_unlink_rq(t);
+				t->state |= TASK_RUNNING;
+				t->pending_state = 0;
+				t->calls++;
+				local_tasks[local_tasks_count++] = t;
+			}
 			if (!rq_next) {
 				if (rewind || !(rq_next = eb32_first(&rqueue))) {
 					break;
@@ -233,6 +248,7 @@
 		if (!local_tasks_count)
 			break;
 
+		SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 
 		for (i = 0; i < local_tasks_count ; i++) {
 			t = local_tasks[i];
@@ -247,6 +263,7 @@
 		}
 
 		max_processed -= local_tasks_count;
+		SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
 		for (i = 0; i < local_tasks_count ; i++) {
 			t = local_tasks[i];
 			if (likely(t != NULL)) {
@@ -263,6 +280,7 @@
 			}
 		}
 	}
+	SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
 }
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
@@ -270,6 +288,8 @@
 {
 	memset(&timers, 0, sizeof(timers));
 	memset(&rqueue, 0, sizeof(rqueue));
+	SPIN_INIT(&wq_lock);
+	SPIN_INIT(&rq_lock);
 	pool2_task = create_pool("task", sizeof(struct task), MEM_F_SHARED);
 	if (!pool2_task)
 		return 0;