MAJOR: applet: applet scheduler rework.

In order to authorize call of appctx_wakeup on running task:
- from within the task handler itself.
- in futur, from another thread.

The appctx is considered paused as default after running the handler.

The handler should explicitly call appctx_wakeup to be re-called.

When the appctx_free is called on a running handler. The real
free is postponed at the end of the handler process.
diff --git a/include/proto/applet.h b/include/proto/applet.h
index 653be31..3cf8578 100644
--- a/include/proto/applet.h
+++ b/include/proto/applet.h
@@ -48,6 +48,7 @@
 {
 	appctx->st0 = appctx->st1 = appctx->st2 = 0;
 	appctx->io_release = NULL;
+	appctx->state = APPLET_SLEEPING;
 }
 
 /* Tries to allocate a new appctx and initialize its main fields. The appctx
@@ -76,7 +77,7 @@
 /* Releases an appctx previously allocated by appctx_new(). Note that
  * we share the connection pool.
  */
-static inline void appctx_free(struct appctx *appctx)
+static inline void __appctx_free(struct appctx *appctx)
 {
 	if (!LIST_ISEMPTY(&appctx->runq)) {
 		LIST_DEL(&appctx->runq);
@@ -89,9 +90,17 @@
 	pool_free2(pool2_connection, appctx);
 	nb_applets--;
 }
+static inline void appctx_free(struct appctx *appctx)
+{
+	if (appctx->state & APPLET_RUNNING) {
+		appctx->state |= APPLET_WANT_DIE;
+		return;
+	}
+	__appctx_free(appctx);
+}
 
 /* wakes up an applet when conditions have changed */
-static inline void appctx_wakeup(struct appctx *appctx)
+static inline void __appctx_wakeup(struct appctx *appctx)
 {
 	if (LIST_ISEMPTY(&appctx->runq)) {
 		LIST_ADDQ(&applet_active_queue, &appctx->runq);
@@ -99,25 +108,34 @@
 	}
 }
 
-/* removes an applet from the list of active applets */
-static inline void appctx_pause(struct appctx *appctx)
+static inline void appctx_wakeup(struct appctx *appctx)
 {
-	if (!LIST_ISEMPTY(&appctx->runq)) {
-		LIST_DEL(&appctx->runq);
-		LIST_INIT(&appctx->runq);
-		applets_active_queue--;
+	if (appctx->state & APPLET_RUNNING) {
+		appctx->state |= APPLET_WOKEN_UP;
+		return;
 	}
+	__appctx_wakeup(appctx);
 }
 
 /* Callback used to wake up an applet when a buffer is available. The applet
  * <appctx> is woken up is if it is not already in the list of "active"
  * applets. This functions returns 1 is the stream is woken up, otherwise it
- * returns 0. */
+ * returns 0. If task is running we request we check if woken was already
+ * requested */
 static inline int appctx_res_wakeup(struct appctx *appctx)
 {
-	if (!LIST_ISEMPTY(&appctx->runq))
+	if (appctx->state & APPLET_RUNNING) {
+		if (appctx->state & APPLET_WOKEN_UP) {
+			return 0;
+		}
+		appctx->state |= APPLET_WOKEN_UP;
+		return 1;
+	}
+
+	if (!LIST_ISEMPTY(&appctx->runq)) {
 		return 0;
-	appctx_wakeup(appctx);
+	}
+	__appctx_wakeup(appctx);
 	return 1;
 }
 
diff --git a/include/types/applet.h b/include/types/applet.h
index 90484f6..71976d8 100644
--- a/include/types/applet.h
+++ b/include/types/applet.h
@@ -44,11 +44,17 @@
 	unsigned int timeout;              /* execution timeout. */
 };
 
+#define APPLET_SLEEPING     0x00  /* applet is currently sleeping or pending in active queue */
+#define APPLET_RUNNING      0x01  /* applet is currently running */
+#define APPLET_WOKEN_UP     0x02  /* applet was running and requested to woken up again */
+#define APPLET_WANT_DIE     0x04  /* applet was running and requested to die */
+
 /* Context of a running applet. */
 struct appctx {
 	struct list runq;          /* chaining in the applet run queue */
 	enum obj_type obj_type;    /* OBJ_TYPE_APPCTX */
 	/* 3 unused bytes here */
+	unsigned short state;      /* Internal appctx state */
 	unsigned int st0;          /* CLI state for stats, session state for peers */
 	unsigned int st1;          /* prompt for stats, session error for peers */
 	unsigned int st2;          /* output state for stats, unused by peers  */
@@ -59,6 +65,7 @@
 	void (*io_release)(struct appctx *appctx);  /* used within the cli_io_handler when st0 = CLI_ST_CALLBACK,
 	                                               if the command is terminated or the session released */
 	struct buffer_wait buffer_wait; /* position in the list of objects waiting for a buffer */
+	unsigned long process_mask;     /* mask of thread IDs authorized to process the applet */
 
 	union {
 		struct {
diff --git a/src/applet.c b/src/applet.c
index f5bc79d..324dfd3 100644
--- a/src/applet.c
+++ b/src/applet.c
@@ -24,22 +24,22 @@
 unsigned int applets_active_queue = 0;
 
 struct list applet_active_queue = LIST_HEAD_INIT(applet_active_queue);
-struct list applet_cur_queue    = LIST_HEAD_INIT(applet_cur_queue);
 
 void applet_run_active()
 {
-	struct appctx *curr;
+	struct appctx *curr, *next;
 	struct stream_interface *si;
+	struct list applet_cur_queue = LIST_HEAD_INIT(applet_cur_queue);
 
-	if (LIST_ISEMPTY(&applet_active_queue))
-		return;
-
-	/* move active queue to run queue */
-	applet_active_queue.n->p = &applet_cur_queue;
-	applet_active_queue.p->n = &applet_cur_queue;
-
-	applet_cur_queue = applet_active_queue;
-	LIST_INIT(&applet_active_queue);
+	curr = LIST_NEXT(&applet_active_queue, typeof(curr), runq);
+	while (&curr->runq != &applet_active_queue) {
+		next = LIST_NEXT(&curr->runq, typeof(next), runq);
+		LIST_DEL(&curr->runq);
+		curr->state = APPLET_RUNNING;
+		LIST_ADDQ(&applet_cur_queue, &curr->runq);
+		applets_active_queue--;
+		curr = next;
+	}
 
 	/* The list is only scanned from the head. This guarantees that if any
 	 * applet removes another one, there is no side effect while walking
@@ -70,7 +70,20 @@
 		if (applet_cur_queue.n == &curr->runq) {
 			/* curr was left in the list, move it back to the active list */
 			LIST_DEL(&curr->runq);
-			LIST_ADDQ(&applet_active_queue, &curr->runq);
+			LIST_INIT(&curr->runq);
+			if (curr->state & APPLET_WANT_DIE) {
+				curr->state = APPLET_SLEEPING;
+				__appctx_free(curr);
+			}
+			else {
+				if (curr->state & APPLET_WOKEN_UP) {
+					curr->state = APPLET_SLEEPING;
+					__appctx_wakeup(curr);
+				}
+				else {
+					curr->state = APPLET_SLEEPING;
+				}
+			}
 		}
 	}
 }
diff --git a/src/stream.c b/src/stream.c
index 3781ac7..4e34f38 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -1061,7 +1061,6 @@
 	/* Stops the applet sheduling, in case of the init function miss
 	 * some data.
 	 */
-	appctx_pause(appctx);
 	si_applet_stop_get(&s->si[1]);
 
 	/* Call initialisation. */
diff --git a/src/stream_interface.c b/src/stream_interface.c
index 47ba8c1..52e2df4 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -1369,16 +1369,6 @@
 
 	/* update the stream-int, channels, and possibly wake the stream up */
 	stream_int_notify(si);
-
-	/* Get away from the active list if we can't work anymore.
-	 * We also do that if the main task has already scheduled, because it
-	 * saves a useless wakeup/pause/wakeup cycle causing one useless call
-	 * per session on average.
-	 */
-	if (task_in_rq(si_task(si)) ||
-	    (((si->flags & (SI_FL_WANT_PUT|SI_FL_WAIT_ROOM)) != SI_FL_WANT_PUT) &&
-	     ((si->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) != SI_FL_WANT_GET)))
-		appctx_pause(si_appctx(si));
 }
 
 
@@ -1393,8 +1383,6 @@
 	if (((si->flags & (SI_FL_WANT_PUT|SI_FL_WAIT_ROOM)) == SI_FL_WANT_PUT) ||
 	    ((si->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) == SI_FL_WANT_GET))
 		appctx_wakeup(si_appctx(si));
-	else
-		appctx_pause(si_appctx(si));
 }
 
 /*