MAJOR: applets: Use tasks, instead of rolling our own scheduler.

There's no real reason to have a specific scheduler for applets anymore, so
nuke it and just use tasks. This comes with some benefits, the first one
being that applets cannot induce high latencies anymore since they share
nice values with other tasks. Later it will be possible to configure the
applets' nice value. The second benefit is that the applet scheduler was
not very thread-friendly, having a big lock around it in prevision of this
change. Thus applet-intensive workloads should now scale much better with
threads.

Some more improvement is possible now : some applets also use a task to
handle timers and timeouts. These ones could now be simplified to use only
one task.
diff --git a/include/proto/applet.h b/include/proto/applet.h
index 7cc2c0a..00ad63b 100644
--- a/include/proto/applet.h
+++ b/include/proto/applet.h
@@ -28,14 +28,11 @@
 #include <common/mini-clist.h>
 #include <types/applet.h>
 #include <proto/connection.h>
+#include <proto/task.h>
 
 extern unsigned int nb_applets;
-extern unsigned long active_applets_mask;
-extern unsigned int applets_active_queue;
-__decl_hathreads(extern HA_SPINLOCK_T applet_active_lock);
-extern struct list applet_active_queue;
 
-void applet_run_active();
+struct task *task_run_applet(struct task *t, void *context, unsigned short state);
 
 
 static int inline appctx_res_wakeup(struct appctx *appctx);
@@ -52,7 +49,7 @@
 	appctx->chunk = NULL;
 	appctx->io_release = NULL;
 	appctx->thread_mask = thread_mask;
-	appctx->state = APPLET_SLEEPING;
+	appctx->state = 0;
 }
 
 /* Tries to allocate a new appctx and initialize its main fields. The appctx
@@ -69,7 +66,13 @@
 		appctx->obj_type = OBJ_TYPE_APPCTX;
 		appctx->applet = applet;
 		appctx_init(appctx, thread_mask);
-		LIST_INIT(&appctx->runq);
+		appctx->t = task_new(thread_mask);
+		if (unlikely(appctx->t == NULL)) {
+			pool_free(pool_head_connection, appctx);
+			return NULL;
+		}
+		appctx->t->process = task_run_applet;
+		appctx->t->context = appctx;
 		LIST_INIT(&appctx->buffer_wait.list);
 		appctx->buffer_wait.target = appctx;
 		appctx->buffer_wait.wakeup_cb = (int (*)(void *))appctx_res_wakeup;
@@ -83,11 +86,8 @@
  */
 static inline void __appctx_free(struct appctx *appctx)
 {
-	if (!LIST_ISEMPTY(&appctx->runq)) {
-		LIST_DEL(&appctx->runq);
-		applets_active_queue--;
-	}
-
+	task_delete(appctx->t);
+	task_free(appctx->t);
 	if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) {
 		HA_SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
 		LIST_DEL(&appctx->buffer_wait.list);
@@ -98,38 +98,27 @@
 	pool_free(pool_head_connection, appctx);
 	HA_ATOMIC_SUB(&nb_applets, 1);
 }
+
 static inline void appctx_free(struct appctx *appctx)
 {
-	HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
-	if (appctx->state & APPLET_RUNNING) {
+	/* The task is supposed to be run on this thread, so we can just
+	 * check if it's running already (or about to run) or not
+	 */
+	if (!(appctx->t->state & TASK_RUNNING))
+		__appctx_free(appctx);
+	else {
+		/* if it's running, or about to run, defer the freeing
+		 * until the callback is called.
+		 */
 		appctx->state |= APPLET_WANT_DIE;
-		HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
-		return;
+		task_wakeup(appctx->t, TASK_WOKEN_OTHER);
 	}
-	__appctx_free(appctx);
-	HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
 }
 
 /* wakes up an applet when conditions have changed */
-static inline void __appctx_wakeup(struct appctx *appctx)
-{
-	if (LIST_ISEMPTY(&appctx->runq)) {
-		LIST_ADDQ(&applet_active_queue, &appctx->runq);
-		applets_active_queue++;
-		active_applets_mask |= appctx->thread_mask;
-	}
-}
-
 static inline void appctx_wakeup(struct appctx *appctx)
 {
-	HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
-	if (appctx->state & APPLET_RUNNING) {
-		appctx->state |= APPLET_WOKEN_UP;
-		HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
-		return;
-	}
-	__appctx_wakeup(appctx);
-	HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
+	task_wakeup(appctx->t, TASK_WOKEN_OTHER);
 }
 
 /* Callback used to wake up an applet when a buffer is available. The applet
@@ -139,19 +128,17 @@
  * requested */
 static inline int appctx_res_wakeup(struct appctx *appctx)
 {
-	HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
-	if (appctx->state & APPLET_RUNNING) {
-		if (appctx->state & APPLET_WOKEN_UP) {
-			HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
-			return 0;
-		}
-		appctx->state |= APPLET_WOKEN_UP;
-		HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
-		return 1;
-	}
-	__appctx_wakeup(appctx);
-	HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
-	return 1;
+	int ret;
+
+	/* To detect if we have already been waken or not, we now that
+	 * if the state contains TASK_RUNNING, but not just TASK_RUNNING.
+	 * This is racy, but that's OK. At worst we will wake a little more
+	 * tasks than necessary when a buffer is available.
+	 */
+	ret = ((appctx->state & TASK_RUNNING) != 0) &&
+	      ((appctx->state != TASK_RUNNING));
+	task_wakeup(appctx->t, TASK_WOKEN_OTHER);
+	return ret;
 }
 
 
diff --git a/include/proto/channel.h b/include/proto/channel.h
index 274495f..d66fc91 100644
--- a/include/proto/channel.h
+++ b/include/proto/channel.h
@@ -36,7 +36,6 @@
 #include <types/stream.h>
 #include <types/stream_interface.h>
 
-#include <proto/applet.h>
 #include <proto/task.h>
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
@@ -456,7 +455,7 @@
 {
 	if (chn->buf->size && buffer_empty(chn->buf)) {
 		b_free(&chn->buf);
-		offer_buffers(wait->target, tasks_run_queue + applets_active_queue);
+		offer_buffers(wait->target, tasks_run_queue);
 	}
 }
 
diff --git a/include/types/applet.h b/include/types/applet.h
index b071586..8b7c28f 100644
--- a/include/types/applet.h
+++ b/include/types/applet.h
@@ -45,17 +45,13 @@
 	unsigned int timeout;              /* execution timeout. */
 };
 
-#define APPLET_SLEEPING     0x00  /* applet is currently sleeping or pending in active queue */
-#define APPLET_RUNNING      0x01  /* applet is currently running */
-#define APPLET_WOKEN_UP     0x02  /* applet was running and requested to woken up again */
-#define APPLET_WANT_DIE     0x04  /* applet was running and requested to die */
+#define APPLET_WANT_DIE     0x01  /* applet was running and requested to die */
 
 #define APPCTX_CLI_ST1_PROMPT  (1 << 0)
 #define APPCTX_CLI_ST1_PAYLOAD (1 << 1)
 
 /* Context of a running applet. */
 struct appctx {
-	struct list runq;          /* chaining in the applet run queue */
 	enum obj_type obj_type;    /* OBJ_TYPE_APPCTX */
 	/* 3 unused bytes here */
 	unsigned short state;      /* Internal appctx state */
@@ -72,6 +68,7 @@
 	int cli_severity_output;        /* used within the cli_io_handler to format severity output of informational feedback */
 	struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */
 	unsigned long thread_mask;      /* mask of thread IDs authorized to process the applet */
+	struct task *t;                  /* task associated to the applet */
 
 	union {
 		struct {
diff --git a/include/types/global.h b/include/types/global.h
index d603c42..2ab124b 100644
--- a/include/types/global.h
+++ b/include/types/global.h
@@ -183,7 +183,6 @@
 	unsigned int loops;        // complete loops in run_poll_loop()
 	unsigned int wake_cache;   // active fd_cache prevented poll() from sleeping
 	unsigned int wake_tasks;   // active tasks prevented poll() from sleeping
-	unsigned int wake_applets; // active applets prevented poll() from sleeping
 	unsigned int wake_signal;  // pending signal prevented poll() from sleeping
 	unsigned int poll_exp;     // number of times poll() sees an expired timeout (includes wake_*)
 	unsigned int poll_drop;    // poller dropped a dead FD from the update list
diff --git a/src/applet.c b/src/applet.c
index 77f984d..d7fbb53 100644
--- a/src/applet.c
+++ b/src/applet.c
@@ -19,100 +19,36 @@
 #include <proto/channel.h>
 #include <proto/stream.h>
 #include <proto/stream_interface.h>
+#include <proto/task.h>
 
 unsigned int nb_applets = 0;
-unsigned long active_applets_mask = 0;
-unsigned int applets_active_queue = 0;
-__decl_hathreads(HA_SPINLOCK_T applet_active_lock);  /* spin lock related to applet active queue */
 
-struct list applet_active_queue = LIST_HEAD_INIT(applet_active_queue);
-
-void applet_run_active()
+struct task *task_run_applet(struct task *t, void *context, unsigned short state)
 {
-	struct appctx *curr, *next;
-	struct stream_interface *si;
-	struct list applet_cur_queue = LIST_HEAD_INIT(applet_cur_queue);
-	int max_processed;
+	struct appctx *app = context;
+	struct stream_interface *si = app->owner;
 
-	max_processed = applets_active_queue;
-	if (max_processed > 200)
-		max_processed = 200;
-
-	HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
-	if (!(active_applets_mask & tid_bit)) {
-		HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
-		return;
+	if (app->state & APPLET_WANT_DIE) {
+		__appctx_free(app);
+		return NULL;
 	}
-	active_applets_mask &= ~tid_bit;
-	curr = LIST_NEXT(&applet_active_queue, typeof(curr), runq);
-	while (&curr->runq != &applet_active_queue) {
-		next = LIST_NEXT(&curr->runq, typeof(next), runq);
-		if (curr->thread_mask & tid_bit) {
-			LIST_DEL(&curr->runq);
-			curr->state = APPLET_RUNNING;
-			LIST_ADDQ(&applet_cur_queue, &curr->runq);
-			applets_active_queue--;
-			max_processed--;
-		}
-		curr = next;
-		if (max_processed <= 0) {
-			active_applets_mask |= tid_bit;
-			break;
-		}
-	}
-	HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
+	/* Now we'll try to allocate the input buffer. We wake up the
+	 * applet in all cases. So this is the applet responsibility to
+	 * check if this buffer was allocated or not. This let a chance
+	 * for applets to do some other processing if needed. */
+	if (!channel_alloc_buffer(si_ic(si), &app->buffer_wait))
+		si_applet_cant_put(si);
 
-	/* The list is only scanned from the head. This guarantees that if any
-	 * applet removes another one, there is no side effect while walking
-	 * through the list.
+	/* We always pretend the applet can't get and doesn't want to
+	 * put, it's up to it to change this if needed. This ensures
+	 * that one applet which ignores any event will not spin.
 	 */
-	while (!LIST_ISEMPTY(&applet_cur_queue)) {
-		curr = LIST_ELEM(applet_cur_queue.n, typeof(curr), runq);
-		si = curr->owner;
+	si_applet_cant_get(si);
+	si_applet_stop_put(si);
 
-		/* Now we'll try to allocate the input buffer. We wake up the
-		 * applet in all cases. So this is the applet responsibility to
-		 * check if this buffer was allocated or not. This let a chance
-		 * for applets to do some other processing if needed. */
-		if (!channel_alloc_buffer(si_ic(si), &curr->buffer_wait))
-			si_applet_cant_put(si);
-
-		/* We always pretend the applet can't get and doesn't want to
-		 * put, it's up to it to change this if needed. This ensures
-		 * that one applet which ignores any event will not spin.
-		 */
-		si_applet_cant_get(si);
-		si_applet_stop_put(si);
-
-		curr->applet->fct(curr);
-		si_applet_wake_cb(si);
-		channel_release_buffer(si_ic(si), &curr->buffer_wait);
-
-		if (applet_cur_queue.n == &curr->runq) {
-			/* curr was left in the list, move it back to the active list */
-			LIST_DEL(&curr->runq);
-			LIST_INIT(&curr->runq);
-			HA_SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
-			if (curr->state & APPLET_WANT_DIE) {
-				curr->state = APPLET_SLEEPING;
-				__appctx_free(curr);
-			}
-			else {
-				if (curr->state & APPLET_WOKEN_UP) {
-					curr->state = APPLET_SLEEPING;
-					__appctx_wakeup(curr);
-				}
-				else {
-					curr->state = APPLET_SLEEPING;
-				}
-			}
-			HA_SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
-		}
-	}
+	app->applet->fct(app);
+	si_applet_wake_cb(si);
+	channel_release_buffer(si_ic(si), &app->buffer_wait);
+	return t;
 }
 
-__attribute__((constructor))
-static void __applet_init(void)
-{
-	HA_SPIN_INIT(&applet_active_lock);
-}
diff --git a/src/cfgparse.c b/src/cfgparse.c
index 023973f..3d224b9 100644
--- a/src/cfgparse.c
+++ b/src/cfgparse.c
@@ -84,6 +84,7 @@
 #include <proto/stick_table.h>
 #include <proto/task.h>
 #include <proto/tcp_rules.h>
+#include <proto/connection.h>
 
 
 /* This is the SSLv3 CLIENT HELLO packet used in conjunction with the
diff --git a/src/cli.c b/src/cli.c
index 4adf684..c34078f 100644
--- a/src/cli.c
+++ b/src/cli.c
@@ -953,7 +953,6 @@
 	chunk_appendf(&trash, "\nloops:");        for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].loops);
 	chunk_appendf(&trash, "\nwake_cache:");   for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_cache);
 	chunk_appendf(&trash, "\nwake_tasks:");   for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_tasks);
-	chunk_appendf(&trash, "\nwake_applets:"); for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_applets);
 	chunk_appendf(&trash, "\nwake_signal:");  for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].wake_signal);
 	chunk_appendf(&trash, "\npoll_exp:");     for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].poll_exp);
 	chunk_appendf(&trash, "\npoll_drop:");    for (thr = 0; thr < global.nbthread; thr++) chunk_appendf(&trash, " %u", activity[thr].poll_drop);
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 856050d..c256303 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -2840,8 +2840,7 @@
 	/* Release the buffer if needed */
 	if ((*buf)->size) {
 		b_free(buf);
-		offer_buffers(buffer_wait->target,
-			      tasks_run_queue + applets_active_queue);
+		offer_buffers(buffer_wait->target, tasks_run_queue);
 	}
 }
 
diff --git a/src/haproxy.c b/src/haproxy.c
index 4e61e40..766758f 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -90,7 +90,6 @@
 #include <types/peers.h>
 
 #include <proto/acl.h>
-#include <proto/applet.h>
 #include <proto/arg.h>
 #include <proto/auth.h>
 #include <proto/backend.h>
@@ -2419,8 +2418,6 @@
 			activity[tid].wake_cache++;
 		else if (active_tasks_mask & tid_bit)
 			activity[tid].wake_tasks++;
-		else if (active_applets_mask & tid_bit)
-			activity[tid].wake_applets++;
 		else if (signal_queue_len)
 			activity[tid].wake_signal++;
 		else
@@ -2429,7 +2426,6 @@
 		/* The poller will ensure it returns around <next> */
 		cur_poller.poll(&cur_poller, exp);
 		fd_process_cached_events();
-		applet_run_active();
 
 
 		/* Synchronize all polling loops */
diff --git a/src/mux_h2.c b/src/mux_h2.c
index 9c7b828..57b1722 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -17,7 +17,6 @@
 #include <common/hpack-enc.h>
 #include <common/hpack-tbl.h>
 #include <common/net_helper.h>
-#include <proto/applet.h>
 #include <proto/connection.h>
 #include <proto/h1.h>
 #include <proto/stream.h>
@@ -303,8 +302,7 @@
 {
 	if ((*bptr)->size) {
 		b_free(bptr);
-		offer_buffers(h2c->buf_wait.target,
-			      tasks_run_queue + applets_active_queue);
+		offer_buffers(h2c->buf_wait.target, tasks_run_queue);
 	}
 }
 
diff --git a/src/stream.c b/src/stream.c
index 3ea8953..7ad84e9 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -339,7 +339,7 @@
 	if (s->req.buf->size || s->res.buf->size) {
 		b_drop(&s->req.buf);
 		b_drop(&s->res.buf);
-		offer_buffers(NULL, tasks_run_queue + applets_active_queue);
+		offer_buffers(NULL, tasks_run_queue);
 	}
 
 	hlua_ctx_destroy(s->hlua);
@@ -469,7 +469,7 @@
 	 * someone waiting, we can wake up a waiter and offer them.
 	 */
 	if (offer)
-		offer_buffers(s, tasks_run_queue + applets_active_queue);
+		offer_buffers(s, tasks_run_queue);
 }
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */