MAJOR: spoe: Add support of pipelined and asynchronous exchanges with agents

Now, HAProxy and agents can announce the support for "pipelining" and/or "async"
capabilities during the HELLO handshake. For now, HAProxy always announces the
support of both. In addition, in its HELLO frames, HAProxy adds the "engine-id"
key. It is a unique string that identifies a SPOE engine.

The "pipelining" capability is the ability for a peer to decouple NOTIFY and ACK
frames. This is a symmetrical capability. To be used, it must be supported by
HAProxy and agents. Unlike HTTP pipelining, the ACK frames can be sent in any
order, but always on the same TCP connection used for the corresponding NOTIFY
frame.

The "async" capability is similar to the pipelining, but here any TCP connection
established between HAProxy and the agent can be used to send ACK frames. If an
agent accepts connections from multiple HAProxy instances, it can use the
"engine-id" value to group TCP connections.
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index f5918dc..17290d2 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -66,6 +66,11 @@
 
 #define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)
 
+/* Flags set on the SPOE applet */
+#define SPOE_APPCTX_FL_PIPELINING 0x00000001 /* Set if pipelining is supported */
+#define SPOE_APPCTX_FL_ASYNC      0x00000002 /* Set if asynchronus frames is supported */
+#define SPOE_APPCTX_FL_PERSIST    0x00000004 /* Set if the applet is persistent */
+
 #define SPOE_APPCTX_ERR_NONE    0x00000000 /* no error yet, leave it to zero */
 #define SPOE_APPCTX_ERR_TOUT    0x00000001 /* SPOE applet timeout */
 
@@ -83,6 +88,7 @@
 enum spoe_appctx_state {
 	SPOE_APPCTX_ST_CONNECT = 0,
 	SPOE_APPCTX_ST_CONNECTING,
+	SPOE_APPCTX_ST_IDLE,
 	SPOE_APPCTX_ST_PROCESSING,
 	SPOE_APPCTX_ST_DISCONNECT,
 	SPOE_APPCTX_ST_DISCONNECTING,
@@ -162,15 +168,15 @@
 /* Describe a message that will be sent in a NOTIFY frame. A message has a name,
  * an argument list (see above) and it is linked to a specific event. */
 struct spoe_message {
-	char             *id;      /* SPOE message id */
-	unsigned int      id_len;  /* The message id length */
+	char              *id;      /* SPOE message id */
+	unsigned int       id_len;  /* The message id length */
 	struct spoe_agent *agent;   /* SPOE agent owning this SPOE message */
         struct {
-                char     *file;    /* file where the SPOE message appears */
-                int       line;    /* line where the SPOE message appears */
-        } conf;                    /* config information */
-	struct list       args;    /* Arguments added when the SPOE messages is sent */
-	struct list       list;    /* Used to chain SPOE messages */
+                char      *file;    /* file where the SPOE message appears */
+                int        line;    /* line where the SPOE message appears */
+        } conf;                     /* config information */
+	struct list        args;    /* Arguments added when the SPOE messages is sent */
+	struct list        list;    /* Used to chain SPOE messages */
 
 	enum spoe_event    event;   /* SPOE_EV_* */
 };
@@ -192,21 +198,32 @@
 		unsigned int  processing;     /* Max time to process an event (in the main stream) */
 	} timeout;
 
+	/* Config info */
+	char                 *engine_id;      /* engine-id string */
 	char                 *var_pfx;        /* Prefix used for vars set by the agent */
 	char                 *var_on_error;   /* Variable to set when an error occured, in the TXN scope */
 	unsigned int          flags;          /* SPOE_FL_* */
-	unsigned int          cps_max;        /* Maximum number of connections per second */
-	unsigned int          eps_max;        /* Maximum number of errors per second */
-
-	struct list           cache;          /* List used to cache SPOE streams. In
-					       * fact, we cache the SPOE applect ctx */
+	unsigned int          cps_max;        /* Maximum # of connections per second */
+	unsigned int          eps_max;        /* Maximum # of errors per second */
+	unsigned int          max_frame_size; /* Maximum frame size for this agent, before any negotiation */
+	unsigned int          min_applets;    /* Minimum # applets alive at a time */
+	unsigned int          max_fpa;        /* Maximum # of frames handled per applet at once */
 
 	struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
 					       * for each supported events */
 
+	/* running info */
+	unsigned int          applets_act;    /* # of applets alive at a time */
+	unsigned int          applets_idle;   /* # of applets in the state SPOE_APPCTX_ST_IDLE */
+	unsigned int          sending_rate;   /* the global sending rate */
+
+	struct freq_ctr       conn_per_sec;   /* connections per second */
+	struct freq_ctr       err_per_sec;    /* connetion errors per second */
+
-	struct list        applet_wq;         /* List of streams waiting for a SPOE applet */
-	struct freq_ctr    conn_per_sec;      /* connections per second */
-	struct freq_ctr    err_per_sec;       /* connetion errors per second */
+	struct list           applets;        /* List of available SPOE applets */
+	struct list           sending_queue;  /* Queue of streams waiting to send data */
+	struct list           waiting_queue;  /* Queue of streams waiting for a ack, in async mode */
+
 };
 
 /* SPOE filter configuration */
@@ -221,11 +238,11 @@
 struct spoe_context {
 	struct filter      *filter;       /* The SPOE filter */
 	struct stream      *strm;         /* The stream that should be offloaded */
-	struct appctx      *appctx;       /* The SPOE appctx */
+
 	struct list        *messages;     /* List of messages that will be sent during the stream processing */
 	struct buffer      *buffer;       /* Buffer used to store a NOTIFY or ACK frame */
 	struct buffer_wait  buffer_wait;  /* position in the list of streams waiting for a buffer */
-	struct list         applet_wait;  /* position in the list of streams waiting for a SPOE applet */
+	struct list         list;
 
 	enum spoe_ctx_state state;        /* SPOE_CTX_ST_* */
 	unsigned int        flags;        /* SPOE_CTX_FL_* */
@@ -266,9 +283,9 @@
 
 struct flt_ops spoe_ops;
 
-static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx);
-static void on_new_spoe_appctx_failure(struct spoe_agent *agent);
-static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx);
+static int  queue_spoe_context(struct spoe_context *ctx);
+static int  acquire_spoe_buffer(struct spoe_context *ctx);
+static void release_spoe_buffer(struct spoe_context *ctx);
 
 /********************************************************************
  * helper functions/globals
@@ -312,6 +329,7 @@
 	free(agent->id);
 	free(agent->conf.file);
 	free(agent->var_pfx);
+	free(agent->engine_id);
 	free(agent->var_on_error);
 	for (i = 0; i < SPOE_EV_EVENTS; ++i) {
 		list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
@@ -363,6 +381,7 @@
 static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
 	[SPOE_APPCTX_ST_CONNECT]       = "CONNECT",
 	[SPOE_APPCTX_ST_CONNECTING]    = "CONNECTING",
+	[SPOE_APPCTX_ST_IDLE]          = "IDLE",
 	[SPOE_APPCTX_ST_PROCESSING]    = "PROCESSING",
 	[SPOE_APPCTX_ST_DISCONNECT]    = "DISCONNECT",
 	[SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
@@ -371,6 +390,49 @@
 };
 
 #endif
+
+static char *
+generate_pseudo_uuid()
+{
+	static int init = 0;
+
+	const char uuid_fmt[] = "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx";
+	const char uuid_chr[] = "0123456789ABCDEF-";
+	char *uuid;
+	int i;
+
+	if ((uuid = calloc(1, sizeof(uuid_fmt))) == NULL)
+		return NULL;
+
+	if (!init) {
+		srand(now_ms);
+		init = 1;
+	}
+
+	for (i = 0; i < sizeof(uuid_fmt)-1; i++) {
+		int r = rand () % 16;
+
+		switch (uuid_fmt[i]) {
+			case 'x' : uuid[i] = uuid_chr[r]; break;
+			case 'y' : uuid[i] = uuid_chr[(r & 0x03) | 0x08]; break;
+			default  : uuid[i] = uuid_fmt[i]; break;
+		}
+	}
+	return uuid;
+}
+
+static inline unsigned int
+min_applets_act(struct spoe_agent *agent)
+{
+	unsigned int nbsrv;
+
+	if (agent->min_applets)
+		return agent->min_applets;
+
+	nbsrv = (agent->b.be->srv_act ? agent->b.be->srv_act : agent->b.be->srv_bck);
+	return 2*nbsrv;
+}
+
 /********************************************************************
  * Functions that encode/decode SPOE frames
  ********************************************************************/
@@ -418,6 +480,7 @@
 #define VERSION_KEY                "version"
 #define MAX_FRAME_SIZE_KEY         "max-frame-size"
 #define CAPABILITIES_KEY           "capabilities"
+#define ENGINE_ID_KEY              "engine-id"
 #define HEALTHCHECK_KEY            "healthcheck"
 #define STATUS_CODE_KEY            "status-code"
 #define MSG_KEY                    "message"
@@ -438,7 +501,8 @@
 #define SUPPORTED_VERSIONS_VAL  "1.0"
 
 /* Comma-separated list of supported capabilities (none for now) */
-#define CAPABILITIES_VAL ""
+//#define CAPABILITIES_VAL ""
+#define CAPABILITIES_VAL "pipelining,async"
 
 static int
 decode_spoe_version(const char *str, size_t len)
@@ -707,11 +771,13 @@
 static int
 prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
 {
+	struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
 	int      idx = 0;
 	size_t   max = (7   /* TYPE + METADATA */
 			+ 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
 			+ 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
-			+ 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL));
+			+ 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL)
+			+ 1 + SLEN(ENGINE_ID_KEY) + 1 + 1 + 36);
 
 	if (size < max)
 		return -1;
@@ -745,6 +811,13 @@
 	frame[idx++] = SPOE_DATA_T_STR;
 	idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
 
+	/* "engine-id" K/V item */
+	if (agent != NULL && agent->engine_id != NULL) {
+		idx += encode_spoe_string(ENGINE_ID_KEY, SLEN(ENGINE_ID_KEY), frame+idx);
+		frame[idx++] = SPOE_DATA_T_STR;
+		idx += encode_spoe_string(agent->engine_id, strlen(agent->engine_id), frame+idx);
+	}
+
 	return idx;
 }
 
@@ -798,10 +871,10 @@
 /* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
  * success, 0 if the frame can be ignored and -1 if an error occurred. */
 static int
-prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
+prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
+			    char *frame, size_t size)
 {
-	struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
-	int                  idx = 0;
+	int idx = 0;
 
 	if (size < APPCTX_SPOE(appctx).max_frame_size)
 		return -1;
@@ -817,6 +890,10 @@
 	idx += encode_spoe_varint(ctx->frame_id, frame+idx);
 
 	/* Copy encoded messages */
+	if (idx + ctx->buffer->i > size)
+		return 0;
+
+	/* Copy encoded messages */
 	memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
 	idx += ctx->buffer->i;
 
@@ -828,7 +905,7 @@
 static int
 handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
 {
-	int    vsn, max_frame_size;
+	int    vsn, max_frame_size, flags;
 	int    i, idx = 0;
 	size_t min_size = (7   /* TYPE + METADATA */
 			   + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
@@ -858,7 +935,7 @@
 	 * "capabilities" */
 
 	/* Loop on K/V items */
-	vsn = max_frame_size = 0;
+	vsn = max_frame_size = flags = 0;
 	while (idx < size) {
 		char     *str;
 		uint64_t  sz;
@@ -921,7 +998,42 @@
 			}
 			max_frame_size = sz;
 		}
+		/* Check "capabilities" K/V item */
+		else if (!memcmp(str, CAPABILITIES_KEY, sz)) {
+			int i;
+
+			/* The value must be a string */
+			if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
+				spoe_status_code = SPOE_FRM_ERR_INVALID;
+				return -1;
+			}
+			idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
+			if (str == NULL)
+				continue;
+
+			i = 0;
+			while (i < sz) {
+				char *delim;
+
+				/* Skip leading spaces */
+				for (; isspace(str[i]) && i < sz; i++);
+
+				if (sz - i >= 10 && !strncmp(str + i, "pipelining", 10)) {
+					i += 10;
+					if (sz == i || isspace(str[i]) || str[i] == ',')
+						flags |= SPOE_APPCTX_FL_PIPELINING;
+				}
+				else if (sz - i >= 5 && !strncmp(str + i, "async", 5)) {
+					i += 5;
+					if (sz == i || isspace(str[i]) || str[i] == ',')
+						flags |= SPOE_APPCTX_FL_ASYNC;
+				}
+
+				if (sz == i || (delim = memchr(str + i, ',', sz-i)) == NULL)
+					break;
+				i = (delim - str) + 1;
+			}
+		}
-		/* Skip "capabilities" K/V item for now */
 		else {
 			/* Silently ignore unknown item */
 			if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
@@ -944,6 +1056,7 @@
 
 	APPCTX_SPOE(appctx).version        = (unsigned int)vsn;
 	APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
+	APPCTX_SPOE(appctx).flags         |= flags;
 	return idx;
 }
 
@@ -1041,14 +1154,15 @@
 }
 
 
-/* Decode ACK frame sent by an agent. It returns the number of by read bytes on
+/* Decode ACK frame sent by an agent. It returns the number of read bytes on
  * success, 0 if the frame can be ignored and -1 if an error occurred. */
 static int
 handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
 {
-	struct spoe_context  *ctx = APPCTX_SPOE(appctx).ctx;
+	struct spoe_agent    *agent = APPCTX_SPOE(appctx).agent;
+	struct spoe_context  *ctx, *back;
 	uint64_t              stream_id, frame_id;
-	int                   idx = 0;
+	int                   i, idx = 0;
 	size_t                min_size = (7  /* TYPE + METADATA */);
 
 	/* Check frame type */
@@ -1064,19 +1178,45 @@
 	idx += 4;
 
 	/* Get the stream-id and the frame-id */
-	idx += decode_spoe_varint(frame+idx, frame+size, &stream_id);
-	idx += decode_spoe_varint(frame+idx, frame+size, &frame_id);
-
-	/* Check stream-id and frame-id */
-	if (ctx->stream_id != (unsigned int)stream_id ||
-	    ctx->frame_id  != (unsigned int)frame_id)
+	if ((i = decode_spoe_varint(frame+idx, frame+size, &stream_id)) == -1)
+		return 0;
+	idx += i;
+	if ((i= decode_spoe_varint(frame+idx, frame+size, &frame_id)) == -1)
 		return 0;
+	idx += i;
+
+	if (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_ASYNC) {
+		list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
+			if (ctx->stream_id == (unsigned int)stream_id &&
+			    ctx->frame_id  == (unsigned int)frame_id)
+				goto found;
+		}
+	}
+	else {
+		list_for_each_entry_safe(ctx, back, &APPCTX_SPOE(appctx).waiting_queue, list) {
+			if (ctx->stream_id == (unsigned int)stream_id &&
+			    ctx->frame_id  == (unsigned int)frame_id)
+				goto found;
+		}
+	}
+
+	/* No Stream found, ignore the frame */
+	return 0;
+
+  found:
+	if (acquire_spoe_buffer(ctx) <= 0)
+		return 1; /* Retry later */
 
 	/* Copy encoded actions */
-	b_reset(ctx->buffer);
 	memcpy(ctx->buffer->p, frame+idx, size-idx);
 	ctx->buffer->i = size-idx;
 
+	/* Notify the stream */
+	LIST_DEL(&ctx->list);
+	LIST_INIT(&ctx->list);
+	ctx->state = SPOE_CTX_ST_DONE;
+	task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+
 	return idx;
 }
 
@@ -1093,7 +1233,7 @@
 
 	memset(&a, 0, sizeof(a));
 	memset(buf, 0, sizeof(buf));
-	APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
+	APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize-4;
 
 	frame = buf+4;
 	idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4);
@@ -1126,7 +1266,7 @@
 	int           r;
 
 	memset(&a, 0, sizeof(a));
-	APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
+	APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize-4;
 
 	if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0)
 		goto error;
@@ -1145,6 +1285,62 @@
 	return -1;
 }
 
+/* Send a SPOE frame to an agent. It returns -1 when an error occurred, 0 when
+ * the frame can be ignored, 1 to retry later, and the frame length on
+ * success. */
+static int
+send_spoe_frame(struct appctx *appctx, char *buf, size_t framesz)
+{
+	struct stream_interface *si = appctx->owner;
+	int                      ret;
+	uint32_t                 netint;
+
+	if (si_ic(si)->buf == &buf_empty)
+		return 1;
+
+	netint = htonl(framesz);
+	memcpy(buf, (char *)&netint, 4);
+	ret = bi_putblk(si_ic(si), buf, framesz+4);
+
+	if (ret <= 0) {
+		if (ret == -1)
+			return 1; /* retry */
+		return -1; /* error */
+	}
+	return framesz;
+}
+
+/* Receive a SPOE frame from an agent. It returns -1 when an error occurred, 0
+ * when the frame can be ignored, 1 to retry later and the frame length on
+ * success. */
+static int
+recv_spoe_frame(struct appctx *appctx, char *buf, size_t framesz)
+{
+	struct stream_interface *si = appctx->owner;
+	int                      ret;
+	uint32_t                 netint;
+
+	if (si_oc(si)->buf == &buf_empty)
+		return 1;
+
+	ret = bo_getblk(si_oc(si), (char *)&netint, 4, 0);
+	if (ret > 0) {
+		framesz = ntohl(netint);
+		if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
+			spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
+			return -1;
+		}
+		ret = bo_getblk(si_oc(si), trash.str, framesz, 4);
+	}
+	if (ret <= 0) {
+		if (ret == 0)
+			return 1; /* retry */
+		spoe_status_code = SPOE_FRM_ERR_IO;
+		return -1; /* error */
+	}
+	return framesz;
+}
+
 /********************************************************************
  * Functions that manage the SPOE applet
  ********************************************************************/
@@ -1161,29 +1357,11 @@
 		appctx->st1 = SPOE_APPCTX_ERR_TOUT;
 	}
 	si_applet_want_get(appctx->owner);
+	si_applet_want_put(appctx->owner);
 	appctx_wakeup(appctx);
 	return task;
 }
 
-/* Remove a SPOE applet from the agent cache */
-static void
-remove_spoe_applet_from_cache(struct appctx *appctx)
-{
-	struct appctx     *a, *back;
-	struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
-
-	if (LIST_ISEMPTY(&agent->cache))
-		return;
-
-	list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) {
-		if (a == appctx) {
-			LIST_DEL(&APPCTX_SPOE(appctx).list);
-			break;
-		}
-	}
-}
-
-
 /* Callback function that releases a SPOE applet. This happens when the
  * connection with the agent is closed. */
 static void
@@ -1191,335 +1369,479 @@
 {
 	struct stream_interface *si    = appctx->owner;
 	struct spoe_agent       *agent = APPCTX_SPOE(appctx).agent;
-	struct spoe_context     *ctx   = APPCTX_SPOE(appctx).ctx;
+	struct spoe_context     *ctx, *back;
+
+	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
+		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
+		    __FUNCTION__, appctx);
 
-	if (appctx->st0 == SPOE_APPCTX_ST_CONNECT ||
-	    appctx->st0 == SPOE_APPCTX_ST_CONNECTING)
-		on_new_spoe_appctx_failure(agent);
+	agent->applets_act--;
+	if (!LIST_ISEMPTY(&APPCTX_SPOE(appctx).list)) {
+		LIST_DEL(&APPCTX_SPOE(appctx).list);
+		LIST_INIT(&APPCTX_SPOE(appctx).list);
+	}
 
 	if (appctx->st0 != SPOE_APPCTX_ST_END) {
+		if (appctx->st0 == SPOE_APPCTX_ST_IDLE)
+			agent->applets_idle--;
+
 		si_shutw(si);
 		si_shutr(si);
 		si_ic(si)->flags |= CF_READ_NULL;
 		appctx->st0 = SPOE_APPCTX_ST_END;
 	}
 
+	if (APPCTX_SPOE(appctx).task) {
+		task_delete(APPCTX_SPOE(appctx).task);
+		task_free(APPCTX_SPOE(appctx).task);
+	}
+
-	if (ctx != NULL) {
+	list_for_each_entry_safe(ctx, back, &APPCTX_SPOE(appctx).waiting_queue, list) {
+		LIST_DEL(&ctx->list);
+		LIST_INIT(&ctx->list);
+		ctx->state = SPOE_CTX_ST_ERROR;
 		task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
-		ctx->appctx = NULL;
 	}
 
-	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
-		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
-		    __FUNCTION__, appctx);
+	if (!LIST_ISEMPTY(&agent->applets))
+		return;
 
-	/* Release the task attached to the SPOE applet */
-	if (APPCTX_SPOE(appctx).task) {
-		task_delete(APPCTX_SPOE(appctx).task);
-		task_free(APPCTX_SPOE(appctx).task);
+	list_for_each_entry_safe(ctx, back, &agent->sending_queue, list) {
+		LIST_DEL(&ctx->list);
+		LIST_INIT(&ctx->list);
+		ctx->state = SPOE_CTX_ST_ERROR;
+		task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 	}
 
-	/* And remove it from the agent cache */
-	remove_spoe_applet_from_cache(appctx);
-	APPCTX_SPOE(appctx).ctx = NULL;
+	list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
+		LIST_DEL(&ctx->list);
+		LIST_INIT(&ctx->list);
+		ctx->state = SPOE_CTX_ST_ERROR;
+		task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+	}
 }
 
-/* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when
- * the frame can be ignored, 0 to retry later and 1 on success. The frame is
- * encoded using the callback function <prepare>. */
 static int
-send_spoe_frame(struct appctx *appctx,
-		int (*prepare)(struct appctx *, char *, size_t))
+handle_connect_spoe_applet(struct appctx *appctx)
 {
-	struct stream_interface *si  = appctx->owner;
-	int                      framesz, ret;
-	uint32_t                 netint;
+	struct stream_interface *si    = appctx->owner;
+	struct spoe_agent       *agent = APPCTX_SPOE(appctx).agent;
+	char *frame = trash.str;
+	int   ret;
 
-	if (si_ic(si)->buf->size == 0)
-		return -1;
+	if (si->state <= SI_ST_CON) {
+		si_applet_want_put(si);
+		task_wakeup(si_strm(si)->task, TASK_WOKEN_MSG);
+		goto stop;
+	}
+	if (si->state != SI_ST_EST)
+		goto exit;
 
-	ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
-	if (ret <= 0)
-		goto skip_or_error;
-	framesz = ret;
-	netint  = htonl(framesz);
-	ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint));
-	if (ret > 0)
-		ret = bi_putblk(si_ic(si), trash.str, framesz);
-	if (ret <= 0) {
-		if (ret == -1)
-			return -1;
-		return -2;
+	if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
+		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n",
+			    (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx);
+		goto exit;
 	}
-	return 1;
 
- skip_or_error:
-	if (!ret)
-		return -1;
-	return -2;
+	if (APPCTX_SPOE(appctx).task->expire == TICK_ETERNITY)
+		APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
+
+	ret = prepare_spoe_hahello_frame(appctx, frame+4, APPCTX_SPOE(appctx).max_frame_size);
+	if (ret > 1)
+		ret = send_spoe_frame(appctx, frame, ret);
+
+	switch (ret) {
+		case -1: /* error */
+			goto exit;
+
+		case  0: /* ignore => an error, cannot be ignored */
+			goto exit;
+
+		case  1: /* retry later */
+			si_applet_cant_put(si);
+			goto stop;
+
+		default: /* CONNECT frame successfully sent */
+			appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
+			goto next;
+	}
+
+  next:
+	return 0;
+  stop:
+	return 1;
+  exit:
+	appctx->st0 = SPOE_APPCTX_ST_EXIT;
+	return 0;
 }
 
-/* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1
- * when the frame can be ignored, 0 to retry later and 1 on success. The frame
- * is decoded using the callback function <handle>. */
 static int
-recv_spoe_frame(struct appctx *appctx,
-		int (*handle)(struct appctx *, char *, size_t))
+handle_connecting_spoe_applet(struct appctx *appctx)
 {
-	struct stream_interface *si  = appctx->owner;
-	int                      framesz, ret;
-	uint32_t                 netint;
+	struct stream_interface *si     = appctx->owner;
+	struct spoe_agent       *agent  = APPCTX_SPOE(appctx).agent;
+	char *frame = trash.str;
+	int   ret, framesz = 0;
 
-	ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0);
-	if (ret <= 0)
-		goto empty_or_error;
-	framesz = ntohl(netint);
-	if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
-		spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
-		return -2;
-	}
 
-	ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint));
-	if (ret <= 0)
-		goto empty_or_error;
-	bo_skip(si_oc(si), ret+sizeof(netint));
+	if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
+		goto exit;
 
-	/* First check if the received frame is a DISCONNECT frame */
-	ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz);
-	if (ret != 0) {
-		if (ret > 0) {
-			SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
-				    " - disconnected by peer (%d): %s\n",
-				    (int)now.tv_sec, (int)now.tv_usec,
-				    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
-				    __FUNCTION__, appctx, spoe_status_code,
-				    spoe_reason);
-			return 2;
+	if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
+		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n",
+			    (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx);
+		goto exit;
+	}
+
+	ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size);
+	if (ret > 1) {
+		if (*frame == SPOE_FRM_T_AGENT_DISCON) {
+			appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
+			goto next;
 		}
-		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
-			    " - error on frame (%s)\n",
-			    (int)now.tv_sec, (int)now.tv_usec,
-			    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
-			    __FUNCTION__, appctx,
-			    spoe_frm_err_reasons[spoe_status_code]);
-		return -2;
+		framesz = ret;
+		ret = handle_spoe_agenthello_frame(appctx, frame, framesz);
 	}
-	if (handle == NULL)
-		goto out;
 
-	/* If not, try to decode it */
-	ret = handle(appctx, trash.str, framesz);
-	if (ret <= 0) {
-		if (!ret)
-			return -1;
-		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
-			    " - error on frame (%s)\n",
-			    (int)now.tv_sec, (int)now.tv_usec,
-			    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
-			    __FUNCTION__, appctx,
-			    spoe_frm_err_reasons[spoe_status_code]);
-		return -2;
+	switch (ret) {
+		case -1: /* error */
+			if (framesz)
+				bo_skip(si_oc(si), framesz+4);
+			appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+			goto next;
+
+		case 0: /* ignore */
+			if (framesz)
+				bo_skip(si_oc(si), framesz+4);
+			appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+			goto next;
+
+		case 1: /* retry later */
+			goto stop;
+
+		default:
+			/* hello handshake is finished, set the idle timeout,
+			 * Add the appctx in the agent cache, decrease the
+			 * number of new applets and wake up waiting streams. */
+			if (framesz)
+				bo_skip(si_oc(si), framesz+4);
+			agent->applets_idle++;
+			appctx->st0 = SPOE_APPCTX_ST_IDLE;
+			goto next;
 	}
-  out:
-	return 1;
 
-  empty_or_error:
-	if (!ret)
-		return 0;
-	spoe_status_code = SPOE_FRM_ERR_IO;
-	return -2;
+  next:
+	APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+	return 0;
+  stop:
+	return 1;
+  exit:
+	appctx->st0 = SPOE_APPCTX_ST_EXIT;
+	return 0;
 }
 
-/* I/O Handler processing messages exchanged with the agent */
-static void
-handle_spoe_applet(struct appctx *appctx)
+static int
+handle_processing_spoe_applet(struct appctx *appctx)
 {
 	struct stream_interface *si    = appctx->owner;
-	struct stream           *s     = si_strm(si);
 	struct spoe_agent       *agent = APPCTX_SPOE(appctx).agent;
-	struct spoe_context     *ctx   = APPCTX_SPOE(appctx).ctx;
-	int                      ret;
+	struct spoe_context     *ctx;
+	char         *frame = trash.str;
+	unsigned int  fpa = 0;
+	int           ret, framesz = 0, skip_sending = 0, skip_receiving = 0;
 
- switchstate:
-	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
-		    " - appctx-state=%s\n",
-		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
-		    __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);
+	if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
+		goto exit;
 
-	switch (appctx->st0) {
-		case SPOE_APPCTX_ST_CONNECT:
-			spoe_status_code = SPOE_FRM_ERR_NONE;
-			if (si->state <= SI_ST_CON) {
-				si_applet_want_put(si);
-				task_wakeup(s->task, TASK_WOKEN_MSG);
-				break;
-			}
-			else if (si->state != SI_ST_EST) {
-				appctx->st0 = SPOE_APPCTX_ST_EXIT;
-				on_new_spoe_appctx_failure(agent);
-				goto switchstate;
-			}
-			ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame);
-			if (ret < 0) {
-				appctx->st0 = SPOE_APPCTX_ST_EXIT;
-				on_new_spoe_appctx_failure(agent);
-				goto switchstate;
-			}
-			else if (!ret)
-				goto full;
+	if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
+		spoe_status_code = SPOE_FRM_ERR_TOUT;
+		appctx->st0      = SPOE_APPCTX_ST_DISCONNECT;
+		appctx->st1      = SPOE_APPCTX_ERR_NONE;
+		goto next;
+	}
 
-			/* Hello frame was sent. Set the hello timeout and
-			 * wait for the reply. */
-			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
-			appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
-			/* fall through */
+  process:
+	if (fpa > agent->max_fpa || (skip_sending && skip_receiving))
+		goto stop;
 
-		case SPOE_APPCTX_ST_CONNECTING:
-			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
-				appctx->st0 = SPOE_APPCTX_ST_EXIT;
-				on_new_spoe_appctx_failure(agent);
-				goto switchstate;
-			}
-			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
-				SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
-					    " - Connection timed out\n",
-					    (int)now.tv_sec, (int)now.tv_usec,
-					    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
-					    __FUNCTION__, appctx);
-				appctx->st0 = SPOE_APPCTX_ST_EXIT;
-				on_new_spoe_appctx_failure(agent);
-				goto switchstate;
-			}
-			ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame);
-			if (ret < 0) {
-				appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
-				on_new_spoe_appctx_failure(agent);
-				goto switchstate;
-			}
-			if (ret == 2) {
-				appctx->st0 = SPOE_APPCTX_ST_EXIT;
-				on_new_spoe_appctx_failure(agent);
-				goto switchstate;
-			}
-			if (!ret)
-				goto out;
+	/* Frames must be handled synchronously and the applet is waiting for
+	 * an ACK frame */
+	if (!(APPCTX_SPOE(appctx).flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) &&
+	    !LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) {
+		if (skip_receiving)
+			goto stop;
+		goto recv_frame;
+	}
 
-			/* hello handshake is finished, set the idle timeout,
-			 * Add the appctx in the agent cache, decrease the
-			 * number of new applets and wake up waiting streams. */
-			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
-			appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
-			on_new_spoe_appctx_success(agent, appctx);
-			break;
+	if (LIST_ISEMPTY(&agent->sending_queue) || skip_sending) {
+		skip_sending = 1;
+		goto recv_frame;
+	}
 
-		case SPOE_APPCTX_ST_PROCESSING:
-			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
-				appctx->st0 = SPOE_APPCTX_ST_EXIT;
-				goto switchstate;
-			}
-			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
-				spoe_status_code = SPOE_FRM_ERR_TOUT;
-				appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
-				appctx->st1 = SPOE_APPCTX_ERR_NONE;
-				goto switchstate;
-			}
-			if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
-				ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame);
-				if (ret < 0) {
-					if (ret == -1) {
-						ctx->state = SPOE_CTX_ST_ERROR;
-						task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
-						goto skip_notify_frame;
-					}
-					appctx->st0 = SPOE_APPCTX_ST_EXIT;
-					goto switchstate;
-				}
-				else if (!ret)
-					goto full;
-				ctx->state = SPOE_CTX_ST_WAITING_ACK;
-				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
-			}
+	ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
+	ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, APPCTX_SPOE(appctx).max_frame_size);
+	if (ret > 1)
+		ret = send_spoe_frame(appctx, frame, ret);
 
-		  skip_notify_frame:
-			if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) {
-				ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame);
-				if (ret < 0) {
-					if (ret == -1)
-						goto skip_notify_frame;
-					ctx->state = SPOE_CTX_ST_ERROR;
-					task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
-					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
-					goto switchstate;
-				}
-				if (!ret)
-					goto out;
-				if (ret == 2) {
-					ctx->state = SPOE_CTX_ST_ERROR;
-					task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
-					appctx->st0 = SPOE_APPCTX_ST_EXIT;
-					goto switchstate;
-				}
-				ctx->state = SPOE_CTX_ST_DONE;
-				task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
-				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
-			}
-			else {
-				if (stopping) {
-					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
-					goto switchstate;
-				}
+	switch (ret) {
+		case -1: /* error */
+			appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+			goto next;
 
-				ret = recv_spoe_frame(appctx, NULL);
-				if (ret < 0) {
-					if (ret == -1)
-						goto skip_notify_frame;
-					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
-					goto switchstate;
-				}
-				if (!ret)
-					goto out;
-				if (ret == 2) {
-					appctx->st0 = SPOE_APPCTX_ST_EXIT;
-					goto switchstate;
-				}
-				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
-			}
+		case 0: /* ignore */
+			agent->sending_rate++;
+			ctx->state = SPOE_CTX_ST_ERROR;
+			release_spoe_buffer(ctx);
+			task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+			LIST_DEL(&ctx->list);
+			LIST_INIT(&ctx->list);
+			fpa++;
 			break;
 
-		case SPOE_APPCTX_ST_DISCONNECT:
-			ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame);
-			if (ret < 0) {
-				appctx->st0 = SPOE_APPCTX_ST_EXIT;
-				goto switchstate;
-			}
-			else if (!ret)
-				goto full;
-			SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
-				    " - disconnected by HAProxy (%d): %s\n",
-				    (int)now.tv_sec, (int)now.tv_usec,
-				    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
-				    __FUNCTION__, appctx, spoe_status_code,
-				    spoe_frm_err_reasons[spoe_status_code]);
+		case 1: /* retry */
+			si_applet_cant_put(si);
+			skip_sending = 1;
+			break;
 
-			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
-			appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
-			/* fall through */
+		default:
+			agent->sending_rate++;
+			ctx->state = SPOE_CTX_ST_WAITING_ACK;
+			release_spoe_buffer(ctx);
+			LIST_DEL(&ctx->list);
+			LIST_INIT(&ctx->list);
+			if (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_ASYNC)
+				LIST_ADDQ(&agent->waiting_queue, &ctx->list);
+			else
+				LIST_ADDQ(&APPCTX_SPOE(appctx).waiting_queue, &ctx->list);
+			fpa++;
+	}
 
-		case SPOE_APPCTX_ST_DISCONNECTING:
-			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
-				appctx->st0 = SPOE_APPCTX_ST_EXIT;
-				goto switchstate;
-			}
-			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
-				appctx->st0 = SPOE_APPCTX_ST_EXIT;
-				goto switchstate;
-			}
-			ret = recv_spoe_frame(appctx, NULL);
-			if (ret < 0 || ret == 2) {
-				appctx->st0 = SPOE_APPCTX_ST_EXIT;
+	if (fpa > agent->max_fpa)
+		goto stop;
+
+  recv_frame:
+	if (skip_receiving)
+		goto process;
+
+	framesz = 0;
+	ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size);
+	if (ret > 1) {
+		if (*frame == SPOE_FRM_T_AGENT_DISCON) {
+			appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
+			goto next;
+		}
+		framesz = ret;
+		ret = handle_spoe_agentack_frame(appctx, frame, framesz);
+	}
+
+	switch (ret) {
+		case -1: /* error */
+			if (framesz)
+				bo_skip(si_oc(si), framesz+4);
+			appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+			goto next;
+
+		case 0: /* ignore */
+			if (framesz)
+				bo_skip(si_oc(si), framesz+4);
+			fpa++;
+			break;
+
+		case 1: /* retry */
+			skip_receiving = 1;
+			break;
+
+		default:
+			if (framesz)
+				bo_skip(si_oc(si), framesz+4);
+			fpa++;
+	}
+	goto process;
+
+  next:
+	APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+	return 0;
+  stop:
+	if ((APPCTX_SPOE(appctx).flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) ||
+	    LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) {
+		agent->applets_idle++;
+		appctx->st0 = SPOE_APPCTX_ST_IDLE;
+	}
+	if (fpa || (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_PERSIST)) {
+		LIST_DEL(&APPCTX_SPOE(appctx).list);
+		LIST_ADD(&agent->applets, &APPCTX_SPOE(appctx).list);
+		if (fpa)
+			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+	}
+	return 1;
+
+  exit:
+	appctx->st0 = SPOE_APPCTX_ST_EXIT;
+	return 0;
+}
+
+/* Handle the DISCONNECT state of the SPOE applet <appctx>: build a frame with
+ * prepare_spoe_hadiscon_frame() and send it with send_spoe_frame(). It returns
+ * 1 on "retry", after having called si_applet_cant_put(). Otherwise it returns
+ * 0 after having switched the applet to the DISCONNECTING state if the frame
+ * was sent, or to the EXIT state in all other cases. */
+static int
+handle_disconnect_spoe_applet(struct appctx *appctx)
+{
+	struct spoe_agent       *agent = APPCTX_SPOE(appctx).agent;
+	struct stream_interface *si    = appctx->owner;
+	char *buf = trash.str;
+	int   rc;
+
+	/* Give up if one side of the stream is closed or on applet timeout */
+	if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO ||
+	    appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
+		appctx->st0 = SPOE_APPCTX_ST_EXIT;
+		return 0;
+	}
+
+	/* The frame is built 4 bytes after the beginning of the buffer, the
+	 * whole buffer being then passed to send_spoe_frame(). */
+	rc = prepare_spoe_hadiscon_frame(appctx, buf+4, APPCTX_SPOE(appctx).max_frame_size);
+	if (rc > 1)
+		rc = send_spoe_frame(appctx, buf, rc);
+
+	if (rc == 1) {
+		/* retry */
+		si_applet_cant_put(si);
+		return 1;
+	}
+
+	if (rc == 0 || rc == -1) {
+		/* ignore or error */
+		appctx->st0 = SPOE_APPCTX_ST_EXIT;
+		return 0;
+	}
+
+	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+		    " - disconnected by HAProxy (%d): %s\n",
+		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
+		    __FUNCTION__, appctx, spoe_status_code,
+		    spoe_frm_err_reasons[spoe_status_code]);
+
+	/* The frame was sent: update the task expiration with the idle timeout */
+	appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
+	APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+	return 0;
+}
+
+/* Handle the DISCONNECTING state of the SPOE applet <appctx>: try to receive a
+ * frame with recv_spoe_frame() and, if one is available, process it with
+ * handle_spoe_agentdiscon_frame(). It returns 1 on "retry", to wait before
+ * trying again, and 0 in all other cases. The applet is switched to the EXIT
+ * state except on "retry" and when the frame is ignored. */
+static int
+handle_disconnecting_spoe_applet(struct appctx *appctx)
+{
+	struct stream_interface *si = appctx->owner;
+	char *buf = trash.str;
+	int   rc, len = 0;
+
+	/* Give up if one side of the stream is closed or on applet timeout */
+	if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO ||
+	    appctx->st1 == SPOE_APPCTX_ERR_TOUT)
+		goto exit;
+
+	rc = recv_spoe_frame(appctx, buf, APPCTX_SPOE(appctx).max_frame_size);
+	if (rc > 1) {
+		len = rc;
+		rc  = handle_spoe_agentdiscon_frame(appctx, buf, len);
+	}
+
+	if (rc == 1) {
+		/* retry */
+		return 1;
+	}
+
+	/* In all other cases, skip <len> + 4 bytes in the channel returned by
+	 * si_oc() when a frame was handled. */
+	if (len)
+		bo_skip(si_oc(si), len+4);
+
+	if (rc == 0) {
+		/* ignore */
+		return 0;
+	}
+
+	if (rc == -1) {
+		/* error */
+		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+			    " - error on frame (%s)\n",
+			    (int)now.tv_sec, (int)now.tv_usec,
+			    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
+			    __FUNCTION__, appctx,
+			    spoe_frm_err_reasons[spoe_status_code]);
+	}
+	else {
+		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+			    " - disconnected by peer (%d): %s\n",
+			    (int)now.tv_sec, (int)now.tv_usec,
+			    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
+			    __FUNCTION__, appctx, spoe_status_code,
+			    spoe_reason);
+	}
+
+  exit:
+	appctx->st0 = SPOE_APPCTX_ST_EXIT;
+	return 0;
+}
+
+/* I/O Handler processing messages exchanged with the agent */
+static void
+handle_spoe_applet(struct appctx *appctx)
+{
+	struct stream_interface *si    = appctx->owner;
+	struct spoe_agent       *agent = APPCTX_SPOE(appctx).agent;
+
+  switchstate:
+	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+		    " - appctx-state=%s\n",
+		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
+		    __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);
+
+	switch (appctx->st0) {
+		case SPOE_APPCTX_ST_CONNECT:
+			spoe_status_code = SPOE_FRM_ERR_NONE;
+			if (handle_connect_spoe_applet(appctx))
+				goto out;
+			goto switchstate;
+
+		case SPOE_APPCTX_ST_CONNECTING:
+			if (handle_connecting_spoe_applet(appctx))
+				goto out;
+			goto switchstate;
+
+		case SPOE_APPCTX_ST_IDLE:
+			if (stopping &&
+			    LIST_ISEMPTY(&agent->sending_queue) &&
+			    LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) {
+				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+				appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
 				goto switchstate;
 			}
-			break;
+			agent->applets_idle--;
+			appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
+			/* fall through */
+
+		case SPOE_APPCTX_ST_PROCESSING:
+			if (handle_processing_spoe_applet(appctx))
+				goto out;
+			goto switchstate;
+
+		case SPOE_APPCTX_ST_DISCONNECT:
+			if (handle_disconnect_spoe_applet(appctx))
+				goto out;
+			goto switchstate;
+
+		case SPOE_APPCTX_ST_DISCONNECTING:
+			if (handle_disconnecting_spoe_applet(appctx))
+				goto out;
+			goto switchstate;
 
 		case SPOE_APPCTX_ST_EXIT:
 			si_shutw(si);
@@ -1532,16 +1854,11 @@
 		case SPOE_APPCTX_ST_END:
 			return;
 	}
-
- out:
+  out:
 	if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
 		task_queue(APPCTX_SPOE(appctx).task);
 	si_oc(si)->flags |= CF_READ_DONTWAIT;
 	task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
-	return;
- full:
-	si_applet_cant_put(si);
-	goto out;
 }
 
 struct applet spoe_applet = {
@@ -1568,13 +1885,15 @@
 	if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
 		goto out_free_appctx;
 	APPCTX_SPOE(appctx).task->process   = process_spoe_applet;
-	APPCTX_SPOE(appctx).task->expire    = TICK_ETERNITY;
+	APPCTX_SPOE(appctx).task->expire    = TICK_ETERNITY;//tick_add_ifset(now_ms, conf->agent->timeout.hello);
 	APPCTX_SPOE(appctx).task->context   = appctx;
 	APPCTX_SPOE(appctx).agent           = conf->agent;
-	APPCTX_SPOE(appctx).ctx             = NULL;
 	APPCTX_SPOE(appctx).version         = 0;
-	APPCTX_SPOE(appctx).max_frame_size  = global.tune.bufsize;
-	task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
+	APPCTX_SPOE(appctx).max_frame_size  = conf->agent->max_frame_size;
+	APPCTX_SPOE(appctx).flags           = 0;
+
+	LIST_INIT(&APPCTX_SPOE(appctx).list);
+	LIST_INIT(&APPCTX_SPOE(appctx).waiting_queue);
 
 	sess = session_new(&conf->agent_fe, NULL, &appctx->obj_type);
 	if (!sess)
@@ -1592,10 +1911,6 @@
 	si_applet_cant_get(&strm->si[0]);
 	appctx_wakeup(appctx);
 
-	/* Increase the per-process number of cumulated connections */
-	if (conf->agent->cps_max > 0)
-		update_freq_ctr(&conf->agent->conn_per_sec, 1);
-
 	strm->do_log = NULL;
 	strm->res.flags |= CF_READ_DONTWAIT;
 
@@ -1603,6 +1918,9 @@
 	jobs++;
 	totalconn++;
 
+	task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
+	LIST_ADDQ(&conf->agent->applets, &APPCTX_SPOE(appctx).list);
+	conf->agent->applets_act++;
 	return appctx;
 
 	/* Error unrolling */
@@ -1618,200 +1936,92 @@
 	return NULL;
 }
 
-/* Wake up a SPOE applet attached to a SPOE context. */
-static void
-wakeup_spoe_appctx(struct spoe_context *ctx)
-{
-	if (ctx->appctx == NULL)
-		return;
-	if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) {
-		si_applet_want_get(ctx->appctx->owner);
-		si_applet_want_put(ctx->appctx->owner);
-		appctx_wakeup(ctx->appctx);
-	}
-}
-
-
-/* Run across the list of pending streams waiting for a SPOE applet and wake the
- * first. */
-static void
-offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx)
-{
-	struct spoe_context *ctx;
-
-	if  (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING)
-		return;
-
-	if (LIST_ISEMPTY(&agent->applet_wq))
-		LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
-	else {
-		ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait);
-		APPCTX_SPOE(appctx).ctx = ctx;
-		ctx->appctx = appctx;
-		LIST_DEL(&ctx->applet_wait);
-		LIST_INIT(&ctx->applet_wait);
-		task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
-		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-			    " - wake up stream to get available SPOE applet\n",
-			    (int)now.tv_sec, (int)now.tv_usec, agent->id,
-			    __FUNCTION__, ctx->strm);
-	}
-}
-
-/* A failure occurred during SPOE applet creation. */
-static void
-on_new_spoe_appctx_failure(struct spoe_agent *agent)
-{
-	struct spoe_context *ctx;
-
-	list_for_each_entry(ctx, &agent->applet_wq, applet_wait) {
-		task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
-		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-			    " - wake up stream because to SPOE applet connection failed\n",
-			    (int)now.tv_sec, (int)now.tv_usec, agent->id,
-			    __FUNCTION__, ctx->strm);
-	}
-}
-
-static void
-on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx)
-{
-	offer_spoe_appctx(agent, appctx);
-}
-/* Retrieve a SPOE applet from the agent cache if possible, else create it. It
- * returns 1 on success, 0 to retry later and -1 if an error occurred. */
 static int
-acquire_spoe_appctx(struct spoe_context *ctx, int dir)
+queue_spoe_context(struct spoe_context *ctx)
 {
 	struct spoe_config *conf = FLT_CONF(ctx->filter);
 	struct spoe_agent  *agent = conf->agent;
 	struct appctx      *appctx;
+	unsigned int        min_applets;
 
-	/* If a process is already started for this SPOE context, retry
-	 * later. */
-	if (ctx->flags & SPOE_CTX_FL_PROCESS)
-		goto wait;
-
-	/* If needed, initialize the buffer that will be used to encode messages
-	 * and decode actions. */
-	if (ctx->buffer == &buf_empty) {
-		if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
-			LIST_DEL(&ctx->buffer_wait.list);
-			LIST_INIT(&ctx->buffer_wait.list);
-		}
+	min_applets = min_applets_act(agent);
 
-		if (!b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs)) {
-			LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
-			goto wait;
-		}
-	}
-
-	/* If the SPOE applet was already set, all is done. */
-	if (ctx->appctx)
-		goto success;
-
-	/* Else try to retrieve it from the agent cache */
-	if (!LIST_ISEMPTY(&agent->cache)) {
-		appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list);
-		LIST_DEL(&APPCTX_SPOE(appctx).list);
-		APPCTX_SPOE(appctx).ctx = ctx;
-		ctx->appctx = appctx;
-		goto success;
-	}
-
-	/* If there is no server up for the agent's backend, this is an
-	 * error. */
-	if (!agent->b.be->srv_act && !agent->b.be->srv_bck)
-		goto error;
+	/* Check if we need to create a new SPOE applet or not. */
+	if (agent->applets_act >= min_applets && agent->applets_idle && agent->sending_rate)
+		goto end;
 
 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-		    " - waiting for available SPOE appctx\n",
+		    " - try to create new SPOE appctx\n",
 		    (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
 		    ctx->strm);
 
-	/* Else add the stream in the waiting queue. */
-	if (LIST_ISEMPTY(&ctx->applet_wait))
-		LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait);
+	/* Do not try to create a new applet if there is no server up for the
+	 * agent's backend. */
+	if (!agent->b.be->srv_act && !agent->b.be->srv_bck) {
+		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+			    " - cannot create SPOE appctx: no server up\n",
+			    (int)now.tv_sec, (int)now.tv_usec, agent->id,
+			    __FUNCTION__, ctx->strm);
+		goto end;
+	}
 
-	/* Finally, create new SPOE applet if we can */
+	/* Do not try to create a new applet if we have reached the maximum
+	 * number of connections per second */
 	if (agent->cps_max > 0) {
-		if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0))
-			goto wait;
+		if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0)) {
+			SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+				    " - cannot create SPOE appctx: max CPS reached\n",
+				    (int)now.tv_sec, (int)now.tv_usec, agent->id,
+				    __FUNCTION__, ctx->strm);
+			goto end;
+		}
 	}
-	if (create_spoe_appctx(conf) == NULL)
-		goto error;
 
-  wait:
-	return 0;
-
-  success:
-	/* Remove the stream from the waiting queue */
-	if (!LIST_ISEMPTY(&ctx->applet_wait)) {
-		LIST_DEL(&ctx->applet_wait);
-		LIST_INIT(&ctx->applet_wait);
+	appctx = create_spoe_appctx(conf);
+	if (appctx == NULL) {
+		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+			    " - failed to create SPOE appctx\n",
+			    (int)now.tv_sec, (int)now.tv_usec, agent->id,
+			    __FUNCTION__, ctx->strm);
+		goto end;
 	}
+	if (agent->applets_act <= min_applets)
+		APPCTX_SPOE(appctx).flags |= SPOE_APPCTX_FL_PERSIST;
 
-	/* Set the right flag to prevent request and response processing
-	 * in same time. */
-	ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
-		       ? SPOE_CTX_FL_REQ_PROCESS
-		       : SPOE_CTX_FL_RSP_PROCESS);
+	/* Increase the per-process number of cumulated connections */
+	if (agent->cps_max > 0)
+		update_freq_ctr(&agent->conn_per_sec, 1);
 
-	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-		    " - acquire SPOE appctx %p from cache\n",
-		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
-		    __FUNCTION__, ctx->strm, ctx->appctx);
-	return 1;
+  end:
+	/* The only reason to return an error is when there is no applet */
+	if (LIST_ISEMPTY(&agent->applets))
+		return 0;
 
-  error:
-	/* Remove the stream from the waiting queue */
-	if (!LIST_ISEMPTY(&ctx->applet_wait)) {
-		LIST_DEL(&ctx->applet_wait);
-		LIST_INIT(&ctx->applet_wait);
-	}
+	/* Add the SPOE context in the sending queue and update all running
+	 * info */
+	LIST_ADDQ(&agent->sending_queue, &ctx->list);
+	if (agent->sending_rate)
+		agent->sending_rate--;
 
 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-		    " - failed to acquire SPOE appctx\n",
-		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
-		    __FUNCTION__, ctx->strm);
-	send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n");
-
-	return -1;
-}
-
-/* Release a SPOE applet and push it in the agent cache. */
-static void
-release_spoe_appctx(struct spoe_context *ctx)
-{
-	struct spoe_config *conf = FLT_CONF(ctx->filter);
-	struct spoe_agent  *agent = conf->agent;
-	struct appctx      *appctx = ctx->appctx;
-
-	/* Reset the flag to allow next processing */
-	ctx->flags &= ~SPOE_CTX_FL_PROCESS;
-
-	/* Reset processing timer */
-	ctx->process_exp = TICK_ETERNITY;
+		    " - Add stream in sending queue - applets_act=%u - applets_idle=%u"
+		    " - sending_rate=%u\n",
+		    (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
+		    ctx->strm, agent->applets_act, agent->applets_idle, agent->sending_rate);
 
-	/* Release the buffer if needed */
-	if (ctx->buffer != &buf_empty) {
-		b_free(&ctx->buffer);
-		offer_buffers(ctx, tasks_run_queue + applets_active_queue);
+	/* Finally, try to wake up the first IDLE applet found and move it to
+	 * the end of the list. */
+	list_for_each_entry(appctx, &agent->applets, ctx.spoe.list) {
+		if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
+			si_applet_want_get(appctx->owner);
+			si_applet_want_put(appctx->owner);
+			appctx_wakeup(appctx);
+			LIST_DEL(&APPCTX_SPOE(appctx).list);
+			LIST_ADDQ(&agent->applets, &APPCTX_SPOE(appctx).list);
+			break;
+		}
 	}
-
-	/* If there is no SPOE applet, all is done */
-	if (!appctx)
-		return;
-
-	/* Else, reassign it or push it in the agent cache */
-	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-		    " - release SPOE appctx %p\n",
-		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
-		    __FUNCTION__, ctx->strm, appctx);
-
-	APPCTX_SPOE(appctx).ctx = NULL;
-	ctx->appctx = NULL;
-	offer_spoe_appctx(agent, appctx);
+	return 1;
 }
 
 /***************************************************************************
@@ -1824,6 +2034,8 @@
 process_spoe_messages(struct stream *s, struct spoe_context *ctx,
 		      struct list *messages, int dir)
 {
+	struct spoe_config  *conf = FLT_CONF(ctx->filter);
+	struct spoe_agent   *agent = conf->agent;
 	struct spoe_message *msg;
 	struct sample       *smp;
 	struct spoe_arg     *arg;
@@ -1832,9 +2044,8 @@
 	int     off, flag, idx = 0;
 
 	/* Reserve 32 bytes from the frame Metadata */
-	max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32;
+	max_size = agent->max_frame_size - 32;
 
-	b_reset(ctx->buffer);
 	p = ctx->buffer->p;
 
 	/* Loop on messages */
@@ -1937,7 +2148,6 @@
 	return 1;
 
   skip:
-	b_reset(ctx->buffer);
 	return 0;
 }
 
@@ -2081,6 +2291,47 @@
 	return 0;
 }
 
+/* Try to start the processing of an event for <ctx>, <dir> being the direction
+ * (request or response). It returns 0 if a processing is already in progress
+ * for <ctx>, the result of acquire_spoe_buffer() if it is lower than or equal
+ * to 0, and 1 once the flag matching <dir> has been set on <ctx>. */
+static int
+start_event_processing(struct spoe_context *ctx, int dir)
+{
+	int status;
+
+	/* Retry later if an event is already being processed for <ctx> */
+	if (ctx->flags & SPOE_CTX_FL_PROCESS)
+		return 0;
+
+	status = acquire_spoe_buffer(ctx);
+	if (status <= 0)
+		return status;
+
+	/* Prevent the request and the response from being processed together */
+	if (dir == SMP_OPT_DIR_REQ)
+		ctx->flags |= SPOE_CTX_FL_REQ_PROCESS;
+	else
+		ctx->flags |= SPOE_CTX_FL_RSP_PROCESS;
+	return 1;
+}
+
+/* Stop the processing of the current event for <ctx>: reset the processing
+ * flags and timer, release the buffer and remove <ctx> from the list it is
+ * linked in, if any. */
+static void
+stop_event_processing(struct spoe_context *ctx)
+{
+	struct list *link = &ctx->list;
+
+	ctx->flags      &= ~SPOE_CTX_FL_PROCESS; /* allow the next processing */
+	ctx->process_exp = TICK_ETERNITY;        /* reset the processing timer */
+	release_spoe_buffer(ctx);
+	if (LIST_ISEMPTY(link))
+		return;
+	LIST_DEL(link);
+	LIST_INIT(link);
+}
 
 /* Process a SPOE event. First, this functions will process messages attached to
  * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
@@ -2101,15 +2352,6 @@
 		    agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
 		    spoe_event_str[ev]);
 
-	if (agent->eps_max > 0) {
-		if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
-			SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-				    " - skip event '%s': max EPS reached\n",
-				    (int)now.tv_sec, (int)now.tv_usec,
-				    agent->id, __FUNCTION__, s, spoe_event_str[ev]);
-			goto skip;
-		}
-	}
 
 	dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
 
@@ -2131,38 +2373,43 @@
 	}
 
 	if (ctx->state == SPOE_CTX_ST_READY) {
+		if (agent->eps_max > 0) {
+			if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
+				SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+					    " - skip event '%s': max EPS reached\n",
+					    (int)now.tv_sec, (int)now.tv_usec,
+					    agent->id, __FUNCTION__, s, spoe_event_str[ev]);
+				goto skip;
+			}
+		}
+
 		if (!tick_isset(ctx->process_exp)) {
 			ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
 			s->task->expire  = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
 						      ctx->process_exp);
 		}
-
-		ret = acquire_spoe_appctx(ctx, dir);
+		ret = start_event_processing(ctx, dir);
 		if (ret <= 0) {
 			if (!ret)
 				goto out;
 			goto error;
 		}
-		ctx->state = SPOE_CTX_ST_SENDING_MSGS;
-	}
-
-	if (ctx->appctx == NULL)
-		goto error;
-
-	if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
 		ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
 		if (ret <= 0) {
 			if (!ret)
 				goto skip;
 			goto error;
 		}
-		wakeup_spoe_appctx(ctx);
-		ret = 0;
-		goto out;
+
+		if (!queue_spoe_context(ctx))
+			goto error;
+
+		ctx->state = SPOE_CTX_ST_SENDING_MSGS;
+		/* fall through */
 	}
 
-	if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
-		wakeup_spoe_appctx(ctx);
+	if (ctx->state == SPOE_CTX_ST_SENDING_MSGS ||
+	    ctx->state == SPOE_CTX_ST_WAITING_ACK) {
 		ret = 0;
 		goto out;
 	}
@@ -2175,18 +2422,13 @@
 			goto error;
 		}
 		ctx->frame_id++;
-		release_spoe_appctx(ctx);
 		ctx->state = SPOE_CTX_ST_READY;
+		goto end;
 	}
 
   out:
 	return ret;
 
-  skip:
-	release_spoe_appctx(ctx);
-	ctx->state = SPOE_CTX_ST_READY;
-	return 1;
-
   error:
 	if (agent->eps_max > 0)
 		update_freq_ctr(&agent->err_per_sec, 1);
@@ -2194,6 +2436,7 @@
 	if (agent->var_on_error) {
 		struct sample smp;
 
+		// FIXME: Get the error code here
 		memset(&smp, 0, sizeof(smp));
 		smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
 		smp.data.u.sint = 1;
@@ -2203,17 +2446,57 @@
 			     strlen(agent->var_on_error), &smp);
 	}
 
-	release_spoe_appctx(ctx);
 	ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
 		      ? SPOE_CTX_ST_READY
 		      : SPOE_CTX_ST_ERROR);
-	return 1;
-}
+	ret = 1;
+	goto end;
+
+  skip:
+	ctx->state = SPOE_CTX_ST_READY;
+	ret = 1;
 
+  end:
+	stop_event_processing(ctx);
+	return ret;
+}
 
 /***************************************************************************
  * Functions that create/destroy SPOE contexts
  **************************************************************************/
+/* Make sure <ctx> has a buffer. It returns 1 if <ctx> already owns one or if a
+ * new one was allocated. Else <ctx> is queued in <buffer_wq> and 0 is returned. */
+static int
+acquire_spoe_buffer(struct spoe_context *ctx)
+{
+	if (ctx->buffer == &buf_empty) {
+		if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
+			LIST_DEL(&ctx->buffer_wait.list);
+			LIST_INIT(&ctx->buffer_wait.list);
+		}
+		if (!b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs)) {
+			LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
+			return 0;
+		}
+	}
+	return 1;
+}
+
+/* Remove <ctx> from the buffer wait queue if it is linked in it, then, if <ctx>
+ * owns a buffer, free it and call offer_buffers(). */
+static void
+release_spoe_buffer(struct spoe_context *ctx)
+{
+	if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
+		LIST_DEL(&ctx->buffer_wait.list);
+		LIST_INIT(&ctx->buffer_wait.list);
+	}
+	if (ctx->buffer == &buf_empty)
+		return;
+	b_free(&ctx->buffer);
+	offer_buffers(ctx, tasks_run_queue + applets_active_queue);
+}
+
 static int wakeup_spoe_context(struct spoe_context *ctx)
 {
 	task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
@@ -2239,7 +2522,7 @@
 	LIST_INIT(&ctx->buffer_wait.list);
 	ctx->buffer_wait.target = ctx;
 	ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context;
-	LIST_INIT(&ctx->applet_wait);
+	LIST_INIT(&ctx->list);
 
 	ctx->stream_id   = 0;
 	ctx->frame_id    = 1;
@@ -2254,12 +2537,10 @@
 	if (!ctx)
 		return;
 
-	if (ctx->appctx)
-		APPCTX_SPOE(ctx->appctx).ctx = NULL;
 	if (!LIST_ISEMPTY(&ctx->buffer_wait.list))
 		LIST_DEL(&ctx->buffer_wait.list);
-	if (!LIST_ISEMPTY(&ctx->applet_wait))
-		LIST_DEL(&ctx->applet_wait);
+	if (!LIST_ISEMPTY(&ctx->list))
+		LIST_DEL(&ctx->list);
 	pool_free2(pool2_spoe_ctx, ctx);
 }
 
@@ -2295,7 +2576,7 @@
 			conf  = fconf->conf;
 			agent = conf->agent;
 
-			list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) {
+			list_for_each_entry(appctx, &agent->applets, ctx.spoe.list) {
 				si_applet_want_get(appctx->owner);
 				si_applet_want_put(appctx->owner);
 				appctx_wakeup(appctx);
@@ -2437,17 +2718,11 @@
 static void
 spoe_stop(struct stream *s, struct filter *filter)
 {
-	struct spoe_context *ctx = filter->ctx;
-
 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
 		    (int)now.tv_sec, (int)now.tv_usec,
 		    ((struct spoe_config *)FLT_CONF(filter))->agent->id,
 		    __FUNCTION__, s);
-
-	if (ctx) {
-		release_spoe_appctx(ctx);
-		destroy_spoe_context(ctx);
-	}
+	destroy_spoe_context(filter->ctx);
 }
 
 
@@ -2461,10 +2736,7 @@
 
 	if (tick_is_expired(ctx->process_exp, now_ms)) {
 		s->pending_events |= TASK_WOKEN_MSG;
-		if (ctx->buffer != &buf_empty) {
-			b_free(&ctx->buffer);
-			offer_buffers(ctx, tasks_run_queue + applets_active_queue);
-		}
+		release_spoe_buffer(ctx);
 	}
 }
 
@@ -2511,13 +2783,13 @@
 				goto out;
 		}
 		ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
+		if (!ret) {
+			channel_dont_read(chn);
+			channel_dont_close(chn);
+		}
 	}
 
   out:
-	if (!ret) {
-                channel_dont_read(chn);
-                channel_dont_close(chn);
-	}
 	return ret;
 }
 
@@ -2654,21 +2926,34 @@
 		}
 
 		curagent->id              = strdup(args[1]);
+
 		curagent->conf.file       = strdup(file);
 		curagent->conf.line       = linenum;
-		curagent->timeout.hello   = TICK_ETERNITY;
-		curagent->timeout.idle    = TICK_ETERNITY;
+
+		curagent->timeout.hello      = TICK_ETERNITY;
+		curagent->timeout.idle       = TICK_ETERNITY;
 		curagent->timeout.processing = TICK_ETERNITY;
-		curagent->var_pfx         = NULL;
-		curagent->var_on_error    = NULL;
-		curagent->flags           = 0;
-		curagent->cps_max         = 0;
-		curagent->eps_max         = 0;
+
+		curagent->engine_id      = NULL;
+		curagent->var_pfx        = NULL;
+		curagent->var_on_error   = NULL;
+		curagent->flags          = 0;
+		curagent->cps_max        = 0;
+		curagent->eps_max        = 0;
+		curagent->max_frame_size = global.tune.bufsize - 4;
+		curagent->min_applets    = 0;
+		curagent->max_fpa        = 100;
 
 		for (i = 0; i < SPOE_EV_EVENTS; ++i)
 			LIST_INIT(&curagent->messages[i]);
-		LIST_INIT(&curagent->cache);
-		LIST_INIT(&curagent->applet_wq);
+
+		curagent->applets_act  = 0;
+		curagent->applets_idle = 0;
+		curagent->sending_rate = 0;
+
+		LIST_INIT(&curagent->applets);
+		LIST_INIT(&curagent->sending_queue);
+		LIST_INIT(&curagent->waiting_queue);
 	}
 	else if (!strcmp(args[0], "use-backend")) {
 		if (!*args[1]) {
@@ -3114,6 +3399,8 @@
 		}
 		curagent->var_pfx = strdup(curagent->id);
 	}
+	if (curagent->engine_id == NULL)
+		curagent->engine_id = generate_pseudo_uuid();
 
 	if (LIST_ISEMPTY(&curmps)) {
 		Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",