BUG/MAJOR: task: add a new TASK_SHARED_WQ flag to fix foreing requeuing

Since 1.9 with commit b20aa9eef3 ("MAJOR: tasks: create per-thread wait
queues") a task bound to a single thread will not use locks when being
queued or dequeued because the wait queue is assumed to be the owner
thread's.

But there exists a rare situation where this is not true: the health
check tasks may be running on one thread waiting for a response, and
may in parallel be requeued by another thread calling health_adjust()
after a detecting a response error in traffic when "observe l7" is set,
and "fastinter" is lower than "inter", requiring to shorten the running
check's timeout. In this case, the task being requeued was present in
another thread's wait queue, thus opening a race during task_unlink_wq(),
and gets requeued into the calling thread's wait queue instead of the
running one's, opening a second race here.

This patch aims at protecting against the risk of calling task_unlink_wq()
from one thread while the task is queued on another thread, hence unlocked,
by introducing a new TASK_SHARED_WQ flag.

This new flag indicates that a task's position in the wait queue may be
adjusted by other threads than then one currently executing it. This means
that such WQ manipulations must be performed under a lock. There are two
types of such tasks:
  - the global ones, using the global wait queue (technically speaking,
    those whose thread_mask has at least 2 bits set).
  - some local ones, which for now will be placed into the global wait
    queue as well in order to benefit from its lock.

The flag is automatically set on initialization if the task's thread mask
indicates more than one thread. The caller must also set it if it intends
to let other threads update the task's expiration delay (e.g. delegated
I/Os), or if it intends to change the task's affinity over time as this
could lead to the same situation.

Right now only the situation described above seems to be affected by this
issue, and it is very difficult to trigger, and even then, will often have
no visible effect beyond stopping the checks for example once the race is
met. On my laptop it is feasible with the following config, chained to
httpterm:

    global
        maxconn 400 # provoke FD errors, calling health_adjust()

    defaults
        mode http
        timeout client 10s
        timeout server 10s
        timeout connect 10s

    listen px
        bind :8001
        option httpchk /?t=50
        server sback 127.0.0.1:8000 backup
        server-template s 0-999 127.0.0.1:8000 check port 8001 inter 100 fastinter 10 observe layer7

This patch will automatically address the case for the checks because
check tasks are created with multiple threads bound and will get the
TASK_SHARED_WQ flag set.

If in the future more tasks need to rely on this (multi-threaded muxes
for example) and the use of the global wait queue becomes a bottleneck
again, then it should not be too difficult to place locks on the local
wait queues and queue the task on its bound thread.

This patch needs to be backported to 2.1, 2.0 and 1.9. It depends on
previous patch "MINOR: task: only check TASK_WOKEN_ANY to decide to
requeue a task".

Many thanks to William Dauchy for providing detailed traces allowing to
spot the problem.

(cherry picked from commit dd0e89a084fc9a9f51eebb10c202d57ec9f8c91c)
Signed-off-by: Willy Tarreau <w@1wt.eu>
(cherry picked from commit 56bfc697cadb29cd568777afd442012d11072b21)
Signed-off-by: Willy Tarreau <w@1wt.eu>
diff --git a/include/proto/task.h b/include/proto/task.h
index 8649a88..5c7aa6a 100644
--- a/include/proto/task.h
+++ b/include/proto/task.h
@@ -150,7 +150,13 @@
 	}
 }
 
-/* change the thread affinity of a task to <thread_mask> */
+/* change the thread affinity of a task to <thread_mask>.
+ * This may only be done from within the running task itself or during its
+ * initialization. It will unqueue and requeue the task from the wait queue
+ * if it was in it. This is safe against a concurrent task_queue() call because
+ * task_queue() itself will unlink again if needed after taking into account
+ * the new thread_mask.
+ */
 static inline void task_set_affinity(struct task *t, unsigned long thread_mask)
 {
 	if (unlikely(task_in_wq(t))) {
@@ -175,15 +181,15 @@
 }
 
 /* remove a task from its wait queue. It may either be the local wait queue if
- * the task is bound to a single thread (in which case there's no locking
- * involved) or the global queue, with locking.
+ * the task is bound to a single thread or the global queue. If the task uses a
+ * shared wait queue, the global wait queue lock is used.
  */
 static inline struct task *task_unlink_wq(struct task *t)
 {
 	unsigned long locked;
 
 	if (likely(task_in_wq(t))) {
-		locked = atleast2(t->thread_mask);
+		locked = t->state & TASK_SHARED_WQ;
 		if (locked)
 			HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
 		__task_unlink_wq(t);
@@ -269,7 +275,8 @@
 /*
  * Initialize a new task. The bare minimum is performed (queue pointers and
  * state).  The task is returned. This function should not be used outside of
- * task_new().
+ * task_new(). If the thread mask contains more than one thread, TASK_SHARED_WQ
+ * is set.
  */
 static inline struct task *task_init(struct task *t, unsigned long thread_mask)
 {
@@ -277,6 +284,8 @@
 	t->rq.node.leaf_p = NULL;
 	t->state = TASK_SLEEPING;
 	t->thread_mask = thread_mask;
+	if (atleast2(thread_mask))
+		t->state |= TASK_SHARED_WQ;
 	t->nice = 0;
 	t->calls = 0;
 	t->call_date = 0;
@@ -381,9 +390,9 @@
 
 /* Place <task> into the wait queue, where it may already be. If the expiration
  * timer is infinite, do nothing and rely on wake_expired_task to clean up.
- * If the task is bound to a single thread, it's assumed to be bound to the
- * current thread's queue and is queued without locking. Otherwise it's queued
- * into the global wait queue, protected by locks.
+ * If the task uses a shared wait queue, it's queued into the global wait queue,
+ * protected by the global wq_lock, otherwise by it necessarily belongs to the
+ * current thread'sand is queued without locking.
  */
 static inline void task_queue(struct task *task)
 {
@@ -400,7 +409,7 @@
 		return;
 
 #ifdef USE_THREAD
-	if (atleast2(task->thread_mask)) {
+	if (task->state & TASK_SHARED_WQ) {
 		HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
 		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
 			__task_queue(task, &timers);
@@ -408,6 +417,7 @@
 	} else
 #endif
 	{
+		BUG_ON((task->thread_mask & tid_bit) == 0); // should have TASK_SHARED_WQ
 		if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
 			__task_queue(task, &task_per_thread[tid].timers);
 	}
@@ -424,7 +434,7 @@
 		return;
 
 #ifdef USE_THREAD
-	if (atleast2(task->thread_mask)) {
+	if (task->state & TASK_SHARED_WQ) {
 		/* FIXME: is it really needed to lock the WQ during the check ? */
 		HA_RWLOCK_WRLOCK(TASK_WQ_LOCK, &wq_lock);
 		if (task_in_wq(task))
@@ -437,6 +447,7 @@
 	} else
 #endif
 	{
+		BUG_ON((task->thread_mask & tid_bit) == 0); // should have TASK_SHARED_WQ
 		if (task_in_wq(task))
 			when = tick_first(when, task->expire);
 
diff --git a/include/types/task.h b/include/types/task.h
index a949058..d02e3c1 100644
--- a/include/types/task.h
+++ b/include/types/task.h
@@ -34,6 +34,8 @@
 #define TASK_RUNNING      0x0001  /* the task is currently running */
 #define TASK_GLOBAL       0x0002  /* The task is currently in the global runqueue */
 #define TASK_QUEUED       0x0004  /* The task has been (re-)added to the run queue */
+#define TASK_SHARED_WQ    0x0008  /* The task's expiration may be updated by other
+                                   * threads, must be set before first queue/wakeup */
 
 #define TASK_WOKEN_INIT   0x0100  /* woken up for initialisation purposes */
 #define TASK_WOKEN_TIMER  0x0200  /* woken up because of expired timer */
diff --git a/src/task.c b/src/task.c
index 06eab27..c688c16 100644
--- a/src/task.c
+++ b/src/task.c
@@ -387,7 +387,8 @@
 		struct task *(*process)(struct task *t, void *ctx, unsigned short state);
 
 		t = (struct task *)LIST_ELEM(task_per_thread[tid].task_list.n, struct tasklet *, list);
-		state = _HA_ATOMIC_XCHG(&t->state, TASK_RUNNING);
+		state = (t->state & TASK_SHARED_WQ) | TASK_RUNNING;
+		state = _HA_ATOMIC_XCHG(&t->state, state);
 		__ha_barrier_atomic_store();
 		__tasklet_remove_from_tasklet_list((struct tasklet *)t);
 		if (!TASK_IS_TASKLET(t))