MEDIUM: task: implement tasklet kill
Implement an equivalent of task_kill for tasklets. This function can be
used to request a tasklet deletion in a thread-safe way.
Currently this function is unused.
diff --git a/include/haproxy/task.h b/include/haproxy/task.h
index 336dad4..f83aba7 100644
--- a/include/haproxy/task.h
+++ b/include/haproxy/task.h
@@ -107,6 +107,7 @@
void __tasklet_wakeup_on(struct tasklet *tl, int thr);
void task_kill(struct task *t);
+void tasklet_kill(struct tasklet *t);
void __task_wakeup(struct task *t);
void __task_queue(struct task *task, struct eb_root *wq);
diff --git a/src/task.c b/src/task.c
index c3145b0..7dc0cd4 100644
--- a/src/task.c
+++ b/src/task.c
@@ -103,6 +103,44 @@
}
}
+/* Equivalent of task_kill for tasklets. Mark the tasklet <t> for destruction.
+ * It will be deleted on the next scheduler invocation. This function is
+ * thread-safe : a thread can kill a tasklet of another thread.
+ */
+void tasklet_kill(struct tasklet *t)
+{
+ unsigned int state = t->state;
+ unsigned int thr;
+
+ BUG_ON(state & TASK_KILLED);
+
+ while (1) {
+ while (state & (TASK_IN_LIST)) {
+ /* Tasklet already in the list ready to be executed. Add
+ * the killed flag and wait for the process loop to
+ * detect it.
+ */
+ if (_HA_ATOMIC_CAS(&t->state, &state, state | TASK_KILLED))
+ return;
+ }
+
+ /* Mark the tasklet as killed and wake the thread to process it
+ * as soon as possible.
+ */
+ if (_HA_ATOMIC_CAS(&t->state, &state, state | TASK_IN_LIST | TASK_KILLED)) {
+ thr = t->tid > 0 ? t->tid: tid;
+ MT_LIST_APPEND(&task_per_thread[thr].shared_tasklet_list,
+ (struct mt_list *)&t->list);
+ _HA_ATOMIC_INC(&task_per_thread[thr].rq_total);
+ if (sleeping_thread_mask & (1UL << thr)) {
+ _HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr));
+ wake_thread(thr);
+ }
+ return;
+ }
+ }
+}
+
/* Do not call this one, please use tasklet_wakeup_on() instead, as this one is
* the slow path of tasklet_wakeup_on() which performs some preliminary checks
* and sets TASK_IN_LIST before calling this one. A negative <thr> designates
@@ -485,7 +523,7 @@
budgets[queue]--;
t = (struct task *)LIST_ELEM(tl_queues[queue].n, struct tasklet *, list);
- state = t->state & (TASK_SHARED_WQ|TASK_SELF_WAKING|TASK_HEAVY|TASK_F_TASKLET|TASK_KILLED|TASK_F_USR1);
+ state = t->state & (TASK_SHARED_WQ|TASK_SELF_WAKING|TASK_HEAVY|TASK_F_TASKLET|TASK_KILLED|TASK_F_USR1|TASK_KILLED);
ti->flags &= ~TI_FL_STUCK; // this thread is still running
activity[tid].ctxsw++;
@@ -516,7 +554,16 @@
state = _HA_ATOMIC_XCHG(&t->state, state);
__ha_barrier_atomic_store();
- process(t, ctx, state);
+ if (likely(!(state & TASK_KILLED))) {
+ process(t, ctx, state);
+ }
+ else {
+ done++;
+ sched->current = NULL;
+ pool_free(pool_head_tasklet, t);
+ __ha_barrier_store();
+ continue;
+ }
if (unlikely(task_profiling_mask & tid_bit)) {
HA_ATOMIC_INC(&profile_entry->calls);