MINOR: task: Add tasklet_wakeup_after()
We want to be able to schedule a tasklet onto a thread after the current tasklet
is done. What we have to do is to insert this tasklet at the head of the thread
task list. Furthermore, we would like to serialize the tasklets. They must be
run in the same order as the order in which they have been scheduled. This is
implemented passing a list of tasklet as parameter (see <head> parameters) which
must be reused for subsequent calls.
_tasklet_wakeup_after_on() is implemented to accomplish this job.
tasklet_wakeup_after_on() and tasklet_wake_after() are only wrapper macros around
_tasklet_wakeup_after_on(). tasklet_wakeup_after_on() does exactly the same thing
as _tasklet_wakeup_after_on() without having to pass the filename and line in the
filename as parameters (usefull when DEBUG_TASK is enabled).
tasklet_wakeup_after() hides also the usage of the thread parameter which is
<tl> tasklet thread ID.
diff --git a/include/haproxy/task.h b/include/haproxy/task.h
index 5d65013..552f08a 100644
--- a/include/haproxy/task.h
+++ b/include/haproxy/task.h
@@ -106,6 +106,7 @@
__decl_thread(extern HA_RWLOCK_T wq_lock); /* RW lock related to the wait queue */
void __tasklet_wakeup_on(struct tasklet *tl, int thr);
+struct list *__tasklet_wakeup_after(struct list *head, struct tasklet *tl);
void task_kill(struct task *t);
void tasklet_kill(struct tasklet *t);
void __task_wakeup(struct task *t);
@@ -458,6 +459,40 @@
__tasklet_wakeup_on(tl, thr);
}
+/* schedules tasklet <tl> to run immediately after the current one is done
+ * <tl> will be queued after entry <head>, or at the head of the task list. Return
+ * the new head to be used to queue future tasks. This is used to insert multiple entries
+ * at the head of the tasklet list, typically to transfer processing from a tasklet
+ * to another one or a set of other ones. If <head> is NULL, the tasklet list of <thr>
+ * thread will be used.
+ * With DEBUG_TASK, the <file>:<line> from the call place are stored into the tasklet
+ * for tracing purposes.
+ */
+#define tasklet_wakeup_after(head, tl) _tasklet_wakeup_after(head, tl, __FILE__, __LINE__)
+static inline struct list *_tasklet_wakeup_after(struct list *head, struct tasklet *tl,
+ const char *file, int line)
+{
+ unsigned int state = tl->state;
+
+ do {
+ /* do nothing if someone else already added it */
+ if (state & TASK_IN_LIST)
+ return head;
+ } while (!_HA_ATOMIC_CAS(&tl->state, &state, state | TASK_IN_LIST));
+
+ /* at this point we're the first one to add this task to the list */
+#ifdef DEBUG_TASK
+ if ((unsigned int)tl->debug.caller_idx > 1)
+ ABORT_NOW();
+ tl->debug.caller_idx = !tl->debug.caller_idx;
+ tl->debug.caller_file[tl->debug.caller_idx] = file;
+ tl->debug.caller_line[tl->debug.caller_idx] = line;
+ if (th_ctx->flags & TH_FL_TASK_PROFILING)
+ tl->call_date = now_mono_time();
+#endif
+ return __tasklet_wakeup_after(head, tl);
+}
+
/* This macro shows the current function name and the last known caller of the
* task (or tasklet) wakeup.
*/
diff --git a/src/task.c b/src/task.c
index 515a43d..19df263 100644
--- a/src/task.c
+++ b/src/task.c
@@ -181,6 +181,44 @@
}
}
+/* Do not call this one, please use tasklet_wakeup_after_on() instead, as this one is
+ * the slow path of tasklet_wakeup_after() which performs some preliminary checks
+ * and sets TASK_IN_LIST before calling this one.
+ */
+struct list *__tasklet_wakeup_after(struct list *head, struct tasklet *tl)
+{
+ BUG_ON(tid != tl->tid);
+ /* this tasklet runs on the caller thread */
+ if (!head) {
+ if (tl->state & TASK_HEAVY) {
+ LIST_INSERT(&th_ctx->tasklets[TL_HEAVY], &tl->list);
+ th_ctx->tl_class_mask |= 1 << TL_HEAVY;
+ }
+ else if (tl->state & TASK_SELF_WAKING) {
+ LIST_INSERT(&th_ctx->tasklets[TL_BULK], &tl->list);
+ th_ctx->tl_class_mask |= 1 << TL_BULK;
+ }
+ else if ((struct task *)tl == th_ctx->current) {
+ _HA_ATOMIC_OR(&tl->state, TASK_SELF_WAKING);
+ LIST_INSERT(&th_ctx->tasklets[TL_BULK], &tl->list);
+ th_ctx->tl_class_mask |= 1 << TL_BULK;
+ }
+ else if (th_ctx->current_queue < 0) {
+ LIST_INSERT(&th_ctx->tasklets[TL_URGENT], &tl->list);
+ th_ctx->tl_class_mask |= 1 << TL_URGENT;
+ }
+ else {
+ LIST_INSERT(&th_ctx->tasklets[th_ctx->current_queue], &tl->list);
+ th_ctx->tl_class_mask |= 1 << th_ctx->current_queue;
+ }
+ }
+ else {
+ LIST_APPEND(head, &tl->list);
+ }
+ _HA_ATOMIC_INC(&th_ctx->rq_total);
+ return &tl->list;
+}
+
/* Puts the task <t> in run queue at a position depending on t->nice. <t> is
* returned. The nice value assigns boosts in 32th of the run queue size. A
* nice value of -1024 sets the task to -tasks_run_queue*32, while a nice value