MINOR: spoe: Add 'timeout processing' option to limit time to process an event

It is a way to set the maximum time to wait for a stream to process an event,
i.e to acquire a stream to talk with an agent, to encode all messages, to send
the NOTIFY frame, to receive the corrsponding acknowledgement and to process all
actions. It is applied on the stream that handle the client and the server
sessions.
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 12e589e..b3f2ef3 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -190,9 +190,10 @@
 		char         *name;           /* Backend name used during conf parsing */
 	} b;
 	struct {
-		unsigned int  hello;          /* Max time to receive AGENT-HELLO frame */
-		unsigned int  idle;           /* Max Idle timeout */
-		unsigned int  ack;            /* Max time to acknowledge a NOTIFY frame */
+		unsigned int  hello;          /* Max time to receive AGENT-HELLO frame (in SPOE applet) */
+		unsigned int  idle;           /* Max Idle timeout  (in SPOE applet) */
+		unsigned int  ack;            /* Max time to acknowledge a NOTIFY frame  (in SPOE applet)*/
+		unsigned int  processing;     /* Max time to process an event (in the main stream) */
 	} timeout;
 
 	char                 *var_pfx;        /* Prefix used for vars set by the agent */
@@ -232,7 +233,7 @@
 
 	unsigned int        stream_id;    /* stream_id and frame_id are used */
 	unsigned int        frame_id;     /* to map NOTIFY and ACK frames */
-
+	unsigned int        process_exp;  /* expiration date to process an event */
 };
 
 /* Set if the handle on SIGUSR1 is registered */
@@ -1636,6 +1637,9 @@
 {
 	struct spoe_context *ctx;
 
+	if  (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING)
+		return;
+
 	if (LIST_ISEMPTY(&agent->applet_wq))
 		LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
 	else {
@@ -1787,6 +1791,9 @@
 	/* Reset the flag to allow next processing */
 	ctx->flags &= ~SPOE_CTX_FL_PROCESS;
 
+	/* Reset processing timer */
+	ctx->process_exp = TICK_ETERNITY;
+
 	/* Release the buffer if needed */
 	if (ctx->buffer != &buf_empty) {
 		b_free(&ctx->buffer);
@@ -2084,13 +2091,14 @@
 process_spoe_event(struct stream *s, struct spoe_context *ctx,
 		   enum spoe_event ev)
 {
-	int dir, ret = 1;
+	struct spoe_config *conf = FLT_CONF(ctx->filter);
+	struct spoe_agent  *agent = conf->agent;
+	int                 dir, ret = 1;
 
 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 		    " - ctx-state=%s - event=%s\n",
 		    (int)now.tv_sec, (int)now.tv_usec,
-		    ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
-		    __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
+		    agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
 		    spoe_event_str[ev]);
 
 	dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
@@ -2100,8 +2108,25 @@
 
 	if (ctx->state == SPOE_CTX_ST_ERROR)
 		goto error;
+
+	if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
+		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+			    " - failed to process event '%s': timeout\n",
+			    (int)now.tv_sec, (int)now.tv_usec,
+			    agent->id, __FUNCTION__, s, spoe_event_str[ev]);
+		send_log(ctx->strm->be, LOG_WARNING,
+			 "failed to process event '%s': timeout.\n",
+			 spoe_event_str[ev]);
+		goto error;
+	}
 
 	if (ctx->state == SPOE_CTX_ST_READY) {
+		if (!tick_isset(ctx->process_exp)) {
+			ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
+			s->task->expire  = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
+						      ctx->process_exp);
+		}
+
 		ret = acquire_spoe_appctx(ctx, dir);
 		if (ret <= 0) {
 			if (!ret)
@@ -2182,8 +2207,9 @@
 	LIST_INIT(&ctx->buffer_wait);
 	LIST_INIT(&ctx->applet_wait);
 
-	ctx->stream_id  = 0;
-	ctx->frame_id   = 1;
+	ctx->stream_id   = 0;
+	ctx->frame_id    = 1;
+	ctx->process_exp = TICK_ETERNITY;
 
 	return ctx;
 }
@@ -2399,6 +2425,19 @@
 	}
 }
 
+
+/*
+ * Called when the stream is woken up because of expired timer.
+ */
+static void
+spoe_check_timeouts(struct stream *s, struct filter *filter)
+{
+	struct spoe_context *ctx = filter->ctx;
+
+	if (tick_is_expired(ctx->process_exp, now_ms))
+		s->task->state |= TASK_WOKEN_MSG;
+}
+
 /* Called when we are ready to filter data on a channel */
 static int
 spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
@@ -2528,8 +2567,9 @@
 	.check  = spoe_check,
 
 	/* Handle start/stop of SPOE */
-	.attach = spoe_start,
-	.detach = spoe_stop,
+	.attach         = spoe_start,
+	.detach         = spoe_stop,
+	.check_timeouts = spoe_check_timeouts,
 
 	/* Handle channels activity */
 	.channel_start_analyze = spoe_start_analyze,
@@ -2589,6 +2629,7 @@
 		curagent->timeout.hello   = TICK_ETERNITY;
 		curagent->timeout.ack     = TICK_ETERNITY;
 		curagent->timeout.idle    = TICK_ETERNITY;
+		curagent->timeout.processing = TICK_ETERNITY;
 		curagent->var_pfx         = NULL;
 		curagent->new_applets     = 0;
 
@@ -2654,8 +2695,10 @@
 			tv = &curagent->timeout.idle;
 		else if (!strcmp(args[1], "ack"))
 			tv = &curagent->timeout.ack;
+		else if (!strcmp(args[1], "processing"))
+			tv = &curagent->timeout.processing;
 		else {
-			Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' and 'ack' (got %s).\n",
+			Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle', 'ack' or 'processing' (got %s).\n",
 			      file, linenum, args[1]);
 			err_code |= ERR_ALERT | ERR_FATAL;
 			goto out;
@@ -2956,13 +2999,17 @@
 			  curagent->id, curagent->conf.file, curagent->conf.line);
 		goto error;
 	}
-	if (curagent->timeout.hello == TICK_ETERNITY ||
-	    curagent->timeout.idle  == TICK_ETERNITY ||
-	    curagent->timeout.ack   == TICK_ETERNITY) {
+	if (curagent->timeout.hello      == TICK_ETERNITY ||
+	    curagent->timeout.idle       == TICK_ETERNITY ||
+	    curagent->timeout.ack        == TICK_ETERNITY ||
+	    curagent->timeout.processing == TICK_ETERNITY) {
+		if (curagent->timeout.ack == TICK_ETERNITY)
+			curagent->timeout.ack = curagent->timeout.idle;
+
 		Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
 			"   | While not properly invalid, you will certainly encounter various problems\n"
 			"   | with such a configuration. To fix this, please ensure that all following\n"
-			"   | timeouts are set to a non-zero value: 'hello', 'idle', 'ack'.\n",
+			"   | timeouts are set to a non-zero value: 'hello', 'idle', 'ack', 'processing'.\n",
 			px->id, curagent->id, curagent->conf.file, curagent->conf.line);
 	}
 	if (curagent->var_pfx == NULL) {