MAJOR: threads/task: handle multithreading in the task scheduler
Two global locks have been added to protect, respectively, the run queue and
the wait queue. A process mask has also been added to each task. As for FDs,
this mask is used to know which threads are allowed to process a task.
For most tasks, all threads are granted. This should be your first intention
when you create a new task, unless you have a good reason to make a task
sticky on some threads. It is then the responsibility of the process callback
to lock whatever has to be locked in the task context.
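As a sketch of this default usage, the snippet below creates a task that any
thread may run and lets its callback serialize access to its own context.
task_new() and task_wakeup() come from this patch; MAX_THREADS_MASK (a mask
with all thread bits set), the OTHER_LOCK label, my_ctx and my_process are
assumptions for illustration only:

	#include <proto/task.h>

	/* hypothetical context shared by all threads, guarded by its own lock */
	static struct {
		HA_SPINLOCK_T lock; /* protects <counter> */
		int counter;
	} my_ctx;

	static struct task *my_process(struct task *t)
	{
		/* the process callback locks what has to be locked in its context */
		SPIN_LOCK(OTHER_LOCK, &my_ctx.lock);
		my_ctx.counter++;
		SPIN_UNLOCK(OTHER_LOCK, &my_ctx.lock);
		return t;
	}

	static int my_init()
	{
		struct task *t;

		SPIN_INIT(&my_ctx.lock);
		t = task_new(MAX_THREADS_MASK); /* all threads are granted */
		if (!t)
			return 0;
		t->process = my_process;
		t->context = &my_ctx;
		task_wakeup(t, TASK_WOKEN_INIT);
		return 1;
	}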
Nevertheless, all tasks linked to a session must be sticky to the thread that
created the session: the I/O handlers processing the session's FDs and these
tasks must run on the same thread to avoid conflicts.
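For example, such a session task would be created with only the creating
thread's bit set. Here <tid> (this thread's 0-based ID), process_session()
and <sess> are assumptions for illustration:

	/* bind the new task to the thread creating the session */
	struct task *t = task_new(1UL << tid);
	if (t) {
		t->process = process_session; /* hypothetical session handler */
		t->context = sess;
		task_wakeup(t, TASK_WOKEN_INIT);
	}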
diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index a2c047e..19cdf83 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -142,6 +142,8 @@
FDCACHE_LOCK,
FD_LOCK,
POLL_LOCK,
+ TASK_RQ_LOCK,
+ TASK_WQ_LOCK,
POOL_LOCK,
LOCK_LABELS
};
@@ -226,7 +228,7 @@
static inline void show_lock_stats()
{
const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
- "POOL" };
+ "TASK_RQ", "TASK_WQ", "POOL" };
int lbl;
for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
diff --git a/include/proto/task.h b/include/proto/task.h
index c6177d0..bc3a173 100644
--- a/include/proto/task.h
+++ b/include/proto/task.h
@@ -30,6 +30,8 @@
#include <common/mini-clist.h>
#include <common/standard.h>
#include <common/ticks.h>
+#include <common/hathreads.h>
+
#include <eb32tree.h>
#include <types/global.h>
@@ -86,6 +88,10 @@
extern unsigned int niced_tasks; /* number of niced tasks in the run queue */
extern struct pool_head *pool2_task;
extern struct pool_head *pool2_notification;
+#ifdef USE_THREAD
+extern HA_SPINLOCK_T rq_lock; /* spin lock related to run queue */
+extern HA_SPINLOCK_T wq_lock; /* spin lock related to wait queue */
+#endif
/* return 0 if task is in run queue, otherwise non-zero */
static inline int task_in_rq(struct task *t)
@@ -103,19 +109,29 @@
struct task *__task_wakeup(struct task *t);
static inline struct task *task_wakeup(struct task *t, unsigned int f)
{
+ SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+
/* If task is running, we postpone the call
* and backup the state.
*/
if (unlikely(t->state & TASK_RUNNING)) {
t->pending_state |= f;
+ SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
return t;
}
if (likely(!task_in_rq(t)))
__task_wakeup(t);
t->state |= f;
+ SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
+
return t;
}
+static inline void task_set_affinity(struct task *t, unsigned long thread_mask)
+{
+	t->process_mask = thread_mask;
+}
/*
* Unlink the task from the wait queue, and possibly update the last_timer
* pointer. A pointer to the task itself is returned. The task *must* already
@@ -130,8 +146,10 @@
static inline struct task *task_unlink_wq(struct task *t)
{
+ SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
if (likely(task_in_wq(t)))
__task_unlink_wq(t);
+ SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
return t;
}
@@ -156,9 +174,10 @@
*/
static inline struct task *task_unlink_rq(struct task *t)
{
- if (likely(task_in_rq(t))) {
+ SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
+ if (likely(task_in_rq(t)))
__task_unlink_rq(t);
- }
+ SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
return t;
}
@@ -178,11 +197,12 @@
* state). The task is returned. This function should not be used outside of
* task_new().
*/
-static inline struct task *task_init(struct task *t)
+static inline struct task *task_init(struct task *t, unsigned long thread_mask)
{
t->wq.node.leaf_p = NULL;
t->rq.node.leaf_p = NULL;
t->pending_state = t->state = TASK_SLEEPING;
+ t->process_mask = thread_mask;
t->nice = 0;
t->calls = 0;
t->expire = TICK_ETERNITY;
@@ -194,12 +214,12 @@
* case of lack of memory. The task count is incremented. Tasks should only
* be allocated this way, and must be freed using task_free().
*/
-static inline struct task *task_new(void)
+static inline struct task *task_new(unsigned long thread_mask)
{
struct task *t = pool_alloc2(pool2_task);
if (t) {
- nb_tasks++;
- task_init(t);
+ HA_ATOMIC_ADD(&nb_tasks, 1);
+ task_init(t, thread_mask);
}
return t;
}
@@ -213,7 +233,7 @@
pool_free2(pool2_task, t);
if (unlikely(stopping))
pool_flush2(pool2_task);
- nb_tasks--;
+ HA_ATOMIC_SUB(&nb_tasks, 1);
}
/* Place <task> into the wait queue, where it may already be. If the expiration
@@ -234,8 +254,10 @@
if (!tick_isset(task->expire))
return;
+ SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
__task_queue(task);
+ SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
}
/* Ensure <task> will be woken up at most at <when>. If the task is already in
@@ -244,15 +266,18 @@
*/
static inline void task_schedule(struct task *task, int when)
{
+	/* TODO: mthread, check that there is no risk with this test */
if (task_in_rq(task))
return;
+ SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
if (task_in_wq(task))
when = tick_first(when, task->expire);
task->expire = when;
if (!task_in_wq(task) || tick_is_lt(task->expire, task->wq.key))
__task_queue(task);
+ SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
}
/* This function register a new signal. "lua" is the current lua
diff --git a/include/types/task.h b/include/types/task.h
index e0ae382..da7c929 100644
--- a/include/types/task.h
+++ b/include/types/task.h
@@ -69,6 +69,7 @@
void *context; /* the task's context */
struct eb32_node wq; /* ebtree node used to hold the task in the wait queue */
int expire; /* next expiration date for this task, in ticks */
+ unsigned long process_mask; /* mask of thread IDs authorized to process the task */
};
/*