MAJOR: threads/task: handle multithread on task scheduler
Two global locks have been added to protect, respectively, the run queue and
the wait queue. A process mask has also been added to each task. As with FDs,
this mask is used to know which threads are allowed to process a task.
For many tasks, all threads are granted. And this must be your first intention
when you create a new task, unless you have a good reason to make a task sticky
on some threads. It is then the responsibility of the process callback to lock
what has to be locked in the task context.
Nevertheless, all tasks linked to a session must be sticky on the thread
creating the session. It is important that I/O handlers processing session FDs
and these tasks run on the same thread to avoid conflicts.
diff --git a/src/cfgparse.c b/src/cfgparse.c
index 1cb98f3..e765fdb 100644
--- a/src/cfgparse.c
+++ b/src/cfgparse.c
@@ -42,6 +42,7 @@
#include <common/time.h>
#include <common/uri_auth.h>
#include <common/namespace.h>
+#include <common/hathreads.h>
#include <types/capture.h>
#include <types/compression.h>
@@ -8862,7 +8863,7 @@
}
/* create the task associated with the proxy */
- curproxy->task = task_new();
+ curproxy->task = task_new(MAX_THREADS_MASK);
if (curproxy->task) {
curproxy->task->context = curproxy;
curproxy->task->process = manage_proxy;
diff --git a/src/checks.c b/src/checks.c
index 704bed2..3d60237 100644
--- a/src/checks.c
+++ b/src/checks.c
@@ -2226,7 +2226,7 @@
{
struct task *t;
/* task for the check */
- if ((t = task_new()) == NULL) {
+ if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
Alert("Starting [%s:%s] check: out of memory.\n",
check->server->proxy->id, check->server->id);
return 0;
@@ -2272,7 +2272,7 @@
for (px = proxy; px; px = px->next) {
for (s = px->srv; s; s = s->next) {
if (s->slowstart) {
- if ((t = task_new()) == NULL) {
+ if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
Alert("Starting [%s:%s] check: out of memory.\n", px->id, s->id);
return ERR_ALERT | ERR_FATAL;
}
@@ -3130,7 +3130,7 @@
check->port = 587;
//check->server = s;
- if ((t = task_new()) == NULL) {
+ if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
memprintf(err, "out of memory while allocating mailer alerts task");
goto error;
}
diff --git a/src/dns.c b/src/dns.c
index c2b87c0..3383faa 100644
--- a/src/dns.c
+++ b/src/dns.c
@@ -1851,7 +1851,7 @@
}
/* Create the task associated to the resolvers section */
- if ((t = task_new()) == NULL) {
+ if ((t = task_new(MAX_THREADS_MASK)) == NULL) {
Alert("config : resolvers '%s' : out of memory.\n", resolvers->id);
err_code |= (ERR_ALERT|ERR_ABORT);
goto err;
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 51730a2..9543f8f 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -1939,7 +1939,7 @@
memset(appctx->ctx.spoe.ptr, 0, pool2_spoe_appctx->size);
appctx->st0 = SPOE_APPCTX_ST_CONNECT;
- if ((SPOE_APPCTX(appctx)->task = task_new()) == NULL)
+ if ((SPOE_APPCTX(appctx)->task = task_new(MAX_THREADS_MASK)) == NULL)
goto out_free_spoe_appctx;
SPOE_APPCTX(appctx)->owner = appctx;
@@ -1975,10 +1975,10 @@
strm->do_log = NULL;
strm->res.flags |= CF_READ_DONTWAIT;
- task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list);
conf->agent->applets_act++;
+ task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
task_wakeup(strm->task, TASK_WOKEN_INIT);
return appctx;
diff --git a/src/haproxy.c b/src/haproxy.c
index 170b002..ff63844 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -1511,7 +1511,7 @@
exit(2);
}
- global_listener_queue_task = task_new();
+ global_listener_queue_task = task_new(MAX_THREADS_MASK);
if (!global_listener_queue_task) {
Alert("Out of memory when initializing global task\n");
exit(1);
diff --git a/src/hlua.c b/src/hlua.c
index f5fed04..137117c 100644
--- a/src/hlua.c
+++ b/src/hlua.c
@@ -5450,7 +5450,7 @@
if (!hlua)
WILL_LJMP(luaL_error(L, "lua out of memory error."));
- task = task_new();
+ task = task_new(MAX_THREADS_MASK);
task->context = hlua;
task->process = hlua_process_task;
@@ -6031,7 +6031,7 @@
ctx->ctx.hlua_apptcp.flags = 0;
/* Create task used by signal to wakeup applets. */
- task = task_new();
+ task = task_new(MAX_THREADS_MASK);
if (!task) {
SEND_ERR(px, "Lua applet tcp '%s': out of memory.\n",
ctx->rule->arg.hlua_rule->fcn.name);
@@ -6232,7 +6232,7 @@
ctx->ctx.hlua_apphttp.flags |= APPLET_HTTP11;
/* Create task used by signal to wakeup applets. */
- task = task_new();
+ task = task_new(MAX_THREADS_MASK);
if (!task) {
SEND_ERR(px, "Lua applet http '%s': out of memory.\n",
ctx->rule->arg.hlua_rule->fcn.name);
@@ -6777,7 +6777,7 @@
* We use the same wakeup fonction than the Lua applet_tcp and
* applet_http. It is absolutely compatible.
*/
- appctx->ctx.hlua_cli.task = task_new();
+ appctx->ctx.hlua_cli.task = task_new(MAX_THREADS_MASK);
if (!appctx->ctx.hlua_cli.task) {
SEND_ERR(NULL, "Lua cli '%s': out of memory.\n", fcn->name);
goto error;
diff --git a/src/peers.c b/src/peers.c
index 4c85af5..b98ec61 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -2055,7 +2055,7 @@
list_for_each_entry(listener, &peers->peers_fe->conf.listeners, by_fe)
listener->maxconn = peers->peers_fe->maxconn;
- peers->sync_task = task_new();
+ peers->sync_task = task_new(MAX_THREADS_MASK);
peers->sync_task->process = process_peer_sync;
peers->sync_task->context = (void *)peers;
peers->sighandler = signal_register_task(0, peers->sync_task, 0);
diff --git a/src/proxy.c b/src/proxy.c
index 53f886e..71091d0 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -979,7 +979,7 @@
stopping = 1;
if (tick_isset(global.hard_stop_after)) {
- task = task_new();
+ task = task_new(MAX_THREADS_MASK);
if (task) {
task->process = hard_stop;
task_schedule(task, tick_add(now_ms, global.hard_stop_after));
diff --git a/src/session.c b/src/session.c
index ecfa2f1..54a879b 100644
--- a/src/session.c
+++ b/src/session.c
@@ -241,7 +241,7 @@
* conn -- owner ---> task <-----+
*/
if (cli_conn->flags & (CO_FL_HANDSHAKE | CO_FL_EARLY_SSL_HS)) {
- if (unlikely((sess->task = task_new()) == NULL))
+ if (unlikely((sess->task = task_new(tid_bit)) == NULL))
goto out_free_sess;
conn_set_xprt_done_cb(cli_conn, conn_complete_session);
diff --git a/src/stick_table.c b/src/stick_table.c
index d95c77f..5e82116 100644
--- a/src/stick_table.c
+++ b/src/stick_table.c
@@ -436,7 +436,7 @@
t->exp_next = TICK_ETERNITY;
if ( t->expire ) {
- t->exp_task = task_new();
+ t->exp_task = task_new(MAX_THREADS_MASK);
t->exp_task->process = process_table_expire;
t->exp_task->context = (void *)t;
}
diff --git a/src/stream.c b/src/stream.c
index a78ee96..522441f 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -163,7 +163,7 @@
s->flags |= SF_INITIALIZED;
s->unique_id = NULL;
- if ((t = task_new()) == NULL)
+ if ((t = task_new(tid_bit)) == NULL)
goto out_fail_alloc;
s->task = t;
diff --git a/src/task.c b/src/task.c
index 8d4ab39..0022bff 100644
--- a/src/task.c
+++ b/src/task.c
@@ -36,6 +36,10 @@
unsigned int nb_tasks_cur = 0; /* copy of the tasks count */
unsigned int niced_tasks = 0; /* number of niced tasks in the run queue */
+#ifdef USE_THREAD
+HA_SPINLOCK_T rq_lock; /* spin lock related to run queue */
+HA_SPINLOCK_T wq_lock; /* spin lock related to wait queue */
+#endif
static struct eb_root timers; /* sorted timers tree */
static struct eb_root rqueue; /* tree constituting the run queue */
@@ -113,22 +117,29 @@
{
struct task *task;
struct eb32_node *eb;
+ int ret;
while (1) {
+ SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
+ lookup_next:
eb = eb32_lookup_ge(&timers, now_ms - TIMER_LOOK_BACK);
- if (unlikely(!eb)) {
+ if (!eb) {
/* we might have reached the end of the tree, typically because
* <now_ms> is in the first half and we're first scanning the last
* half. Let's loop back to the beginning of the tree now.
*/
eb = eb32_first(&timers);
- if (likely(!eb))
+ if (likely(!eb)) {
+ SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
break;
+ }
}
if (likely(tick_is_lt(now_ms, eb->key))) {
+ ret = eb->key;
+ SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
/* timer not expired yet, revisit it later */
- return eb->key;
+ return ret;
}
/* timer looks expired, detach it from the queue */
@@ -150,10 +161,11 @@
*/
if (!tick_is_expired(task->expire, now_ms)) {
if (!tick_isset(task->expire))
- continue;
+ goto lookup_next;
__task_queue(task);
- continue;
+ goto lookup_next;
}
+ SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
task_wakeup(task, TASK_WOKEN_TIMER);
}
@@ -192,6 +204,7 @@
if (likely(niced_tasks))
max_processed = (max_processed + 3) / 4;
+ SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
while (max_processed > 0) {
/* Note: this loop is one of the fastest code path in
* the whole program. It should not be re-arranged
@@ -216,12 +229,14 @@
while (local_tasks_count < 16) {
t = eb32_entry(rq_next, struct task, rq);
rq_next = eb32_next(rq_next);
- /* detach the task from the queue */
- __task_unlink_rq(t);
- t->state |= TASK_RUNNING;
- t->pending_state = 0;
- t->calls++;
- local_tasks[local_tasks_count++] = t;
+ if (t->process_mask & (1UL << tid)) {
+ /* detach the task from the queue */
+ __task_unlink_rq(t);
+ t->state |= TASK_RUNNING;
+ t->pending_state = 0;
+ t->calls++;
+ local_tasks[local_tasks_count++] = t;
+ }
if (!rq_next) {
if (rewind || !(rq_next = eb32_first(&rqueue))) {
break;
@@ -233,6 +248,7 @@
if (!local_tasks_count)
break;
+ SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
for (i = 0; i < local_tasks_count ; i++) {
t = local_tasks[i];
@@ -247,6 +263,7 @@
}
max_processed -= local_tasks_count;
+ SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
for (i = 0; i < local_tasks_count ; i++) {
t = local_tasks[i];
if (likely(t != NULL)) {
@@ -263,6 +280,7 @@
}
}
}
+ SPIN_UNLOCK(TASK_RQ_LOCK, &rq_lock);
}
/* perform minimal intializations, report 0 in case of error, 1 if OK. */
@@ -270,6 +288,8 @@
{
memset(&timers, 0, sizeof(timers));
memset(&rqueue, 0, sizeof(rqueue));
+ SPIN_INIT(&wq_lock);
+ SPIN_INIT(&rq_lock);
pool2_task = create_pool("task", sizeof(struct task), MEM_F_SHARED);
if (!pool2_task)
return 0;