MINOR: spoe: Add counters to log info about SPOE agents

In addition to metrics about time spent in the SPOE, following counters have
been added:

  * applets : number of SPOE applets.
  * idles : number of idle applets.
  * nb_sending : number of streams waiting to send data.
  * nb_waiting : number of streams waiting for a ack.
  * nb_processed : number of events/groups processed by the SPOE (from the
                   stream point of view).
  * nb_errors : number of errors during the processing (from the stream point of
                view).

Log messages has been updated to report these counters. Following pattern has
been added at the end of the log message:

    ... <idles>/<applets> <nb_sending>/<nb_waiting> <nb_error>/<nb_processed>
diff --git a/doc/SPOE.txt b/doc/SPOE.txt
index 016590e..756988f 100644
--- a/doc/SPOE.txt
+++ b/doc/SPOE.txt
@@ -1200,7 +1200,8 @@
 The messages are logged using the agent's logger, if defined, and use the
 following format:
 
-    SPOE: [AGENT] <TYPE:NAME> sid=STREAM-ID st=STATUC-CODE reqT/qT/wT/resT/pT
+    SPOE: [AGENT] <TYPE:NAME> sid=STREAM-ID st=STATUC-CODE reqT/qT/wT/resT/pT \
+    <idles>/<applets> <nb_sending>/<nb_waiting> <nb_error>/<nb_processed>
 
       AGENT              is the agent name
       TYPE               is EVENT of GROUP
@@ -1221,6 +1222,14 @@
                  point of view, it is the latency added by the SPOE processing.
                  It is more or less the sum of values above.
 
+      <idle>             is the numbers of idle SPOE applets
+      <applets>          is the numbers of SPOE applets
+      <nb_sending>       is the numbers of streams waiting to send data
+      <nb_waiting>       is the numbers of streams waiting for a ack
+      <nb_error>         is the numbers of processing errors
+      <nb_processed>     is the numbers of events/groups processed
+
+
 For all these time events, -1 means the processing was interrupted before the
 end. So -1 for the queue time means the request was never dequeued. For
 fragmented frames it is harder to know when the interruption happened.
diff --git a/include/types/spoe.h b/include/types/spoe.h
index b7d54f3..c01a03e 100644
--- a/include/types/spoe.h
+++ b/include/types/spoe.h
@@ -282,6 +282,14 @@
 		__decl_hathreads(HA_SPINLOCK_T lock);
 	} *rt;
 
+	struct {
+		unsigned int applets;            /* # of SPOE applets */
+		unsigned int idles;              /* # of idle applets */
+		unsigned int nb_sending;         /* # of streams waiting to send data */
+		unsigned int nb_waiting;         /* # of streams waiting for a ack */
+		unsigned long long nb_processed; /* # of frames processed by the SPOE */
+		unsigned long long nb_errors;    /* # of errors during the processing */
+	} counters;
 };
 
 /* SPOE filter configuration */
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 3b425cc..7c019c4 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -1233,6 +1233,7 @@
 
 	/* Remove applet from the list of running applets */
 	SPOE_DEBUG_STMT(agent->rt[tid].applets_act--);
+	HA_ATOMIC_SUB(&agent->counters.applets, 1);
 	HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 	if (!LIST_ISEMPTY(&spoe_appctx->list)) {
 		LIST_DEL(&spoe_appctx->list);
@@ -1245,6 +1246,7 @@
 		if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
 			eb32_delete(&spoe_appctx->node);
 			SPOE_DEBUG_STMT(agent->rt[tid].applets_idle--);
+			HA_ATOMIC_SUB(&agent->counters.idles, 1);
 		}
 
 		appctx->st0 = SPOE_APPCTX_ST_END;
@@ -1266,6 +1268,7 @@
 	list_for_each_entry_safe(ctx, back, &spoe_appctx->waiting_queue, list) {
 		LIST_DEL(&ctx->list);
 		LIST_INIT(&ctx->list);
+		HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
 		spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
 		ctx->state = SPOE_CTX_ST_ERROR;
 		ctx->status_code = (spoe_appctx->status_code + 0x100);
@@ -1294,6 +1297,7 @@
 	list_for_each_entry_safe(ctx, back, &agent->rt[tid].sending_queue, list) {
 		LIST_DEL(&ctx->list);
 		LIST_INIT(&ctx->list);
+		HA_ATOMIC_SUB(&agent->counters.nb_sending, 1);
 		spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
 		ctx->state = SPOE_CTX_ST_ERROR;
 		ctx->status_code = (spoe_appctx->status_code + 0x100);
@@ -1302,6 +1306,7 @@
 	list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) {
 		LIST_DEL(&ctx->list);
 		LIST_INIT(&ctx->list);
+		HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
 		spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
 		ctx->state = SPOE_CTX_ST_ERROR;
 		ctx->status_code = (spoe_appctx->status_code + 0x100);
@@ -1426,6 +1431,7 @@
 			/* HELLO handshake is finished, set the idle timeout and
 			 * add the applet in the list of running applets. */
 			SPOE_DEBUG_STMT(agent->rt[tid].applets_idle++);
+			HA_ATOMIC_ADD(&agent->counters.idles, 1);
 			appctx->st0 = SPOE_APPCTX_ST_IDLE;
 			SPOE_APPCTX(appctx)->node.key = 0;
 			eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node);
@@ -1495,6 +1501,7 @@
 			spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
 			LIST_DEL(&ctx->list);
 			LIST_INIT(&ctx->list);
+			HA_ATOMIC_SUB(&agent->counters.nb_sending, 1);
 			spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
 			ctx->spoe_appctx = NULL;
 			ctx->state = SPOE_CTX_ST_ERROR;
@@ -1514,6 +1521,7 @@
 			spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
 			LIST_DEL(&ctx->list);
 			LIST_INIT(&ctx->list);
+			HA_ATOMIC_SUB(&agent->counters.nb_sending, 1);
 			spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
 			ctx->spoe_appctx = SPOE_APPCTX(appctx);
 			if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) ||
@@ -1548,6 +1556,7 @@
 		*skip = 1;
 		LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
 	}
+	HA_ATOMIC_ADD(&agent->counters.nb_waiting, 1);
 	ctx->stats.tv_wait = now;
 	SPOE_APPCTX(appctx)->frag_ctx.ctx    = NULL;
 	SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
@@ -1571,6 +1580,7 @@
 static int
 spoe_handle_receiving_frame_appctx(struct appctx *appctx, int *skip)
 {
+	struct spoe_agent   *agent = SPOE_APPCTX(appctx)->agent;
 	struct spoe_context *ctx = NULL;
 	char *frame;
 	int   ret;
@@ -1602,6 +1612,7 @@
 		default:
 			LIST_DEL(&ctx->list);
 			LIST_INIT(&ctx->list);
+			HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
 			spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
 			ctx->stats.tv_response = now;
 			if (ctx->spoe_appctx) {
@@ -1708,6 +1719,7 @@
 
 	if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING && SPOE_APPCTX(appctx)->cur_fpa < agent->max_fpa) {
 		SPOE_DEBUG_STMT(agent->rt[tid].applets_idle++);
+		HA_ATOMIC_ADD(&agent->counters.idles, 1);
 		appctx->st0 = SPOE_APPCTX_ST_IDLE;
 		eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node);
 	}
@@ -1871,6 +1883,7 @@
 
 		case SPOE_APPCTX_ST_IDLE:
 			SPOE_DEBUG_STMT(agent->rt[tid].applets_idle--);
+			HA_ATOMIC_SUB(&agent->counters.idles, 1);
 			eb32_delete(&SPOE_APPCTX(appctx)->node);
 			if (stopping &&
 			    LIST_ISEMPTY(&agent->rt[tid].sending_queue) &&
@@ -1988,6 +2001,7 @@
 	LIST_ADDQ(&conf->agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
 	HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
 	SPOE_DEBUG_STMT(conf->agent->rt[tid].applets_act++);
+	HA_ATOMIC_ADD(&conf->agent->counters.applets, 1);
 
 	task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
 	task_wakeup(strm->task, TASK_WOKEN_INIT);
@@ -2072,6 +2086,7 @@
 
 	/* Add the SPOE context in the sending queue */
 	LIST_ADDQ(&agent->rt[tid].sending_queue, &ctx->list);
+	HA_ATOMIC_ADD(&agent->counters.nb_sending, 1);
 	spoe_update_stat_time(&ctx->stats.tv_request, &ctx->stats.t_request);
 	ctx->stats.tv_queue = now;
 
@@ -2459,6 +2474,68 @@
 /***************************************************************************
  * Functions that process SPOE events
  **************************************************************************/
+static void
+spoe_update_stats(struct stream *s, struct spoe_agent *agent,
+		  struct spoe_context *ctx, int dir)
+{
+	if (!tv_iszero(&ctx->stats.tv_start)) {
+		spoe_update_stat_time(&ctx->stats.tv_start, &ctx->stats.t_process);
+		ctx->stats.t_total  += ctx->stats.t_process;
+		tv_zero(&ctx->stats.tv_request);
+		tv_zero(&ctx->stats.tv_queue);
+		tv_zero(&ctx->stats.tv_wait);
+		tv_zero(&ctx->stats.tv_response);
+	}
+
+	if (agent->var_t_process) {
+		struct sample smp;
+
+		memset(&smp, 0, sizeof(smp));
+		smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
+		smp.data.u.sint = ctx->stats.t_process;
+		smp.data.type   = SMP_T_SINT;
+
+		spoe_set_var(ctx, "txn", agent->var_t_process,
+			     strlen(agent->var_t_process), &smp);
+	}
+
+	if (agent->var_t_total) {
+		struct sample smp;
+
+		memset(&smp, 0, sizeof(smp));
+		smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
+		smp.data.u.sint = ctx->stats.t_total;
+		smp.data.type   = SMP_T_SINT;
+
+		spoe_set_var(ctx, "txn", agent->var_t_total,
+			     strlen(agent->var_t_total), &smp);
+	}
+}
+
+static void
+spoe_handle_processing_error(struct stream *s, struct spoe_agent *agent,
+			     struct spoe_context *ctx, int dir)
+{
+	if (agent->eps_max > 0)
+		update_freq_ctr(&agent->rt[tid].err_per_sec, 1);
+
+	if (agent->var_on_error) {
+		struct sample smp;
+
+		memset(&smp, 0, sizeof(smp));
+		smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
+		smp.data.u.sint = ctx->status_code;
+		smp.data.type   = SMP_T_BOOL;
+
+		spoe_set_var(ctx, "txn", agent->var_on_error,
+			     strlen(agent->var_on_error), &smp);
+	}
+
+	ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
+		      ? SPOE_CTX_ST_READY
+		      : SPOE_CTX_ST_NONE);
+}
+
 static inline int
 spoe_start_processing(struct spoe_agent *agent, struct spoe_context *ctx, int dir)
 {
@@ -2493,7 +2570,7 @@
 
 	if (!(ctx->flags & SPOE_CTX_FL_PROCESS))
 		return;
-
+	HA_ATOMIC_ADD(&agent->counters.nb_processed, 1);
 	if (sa) {
 		if (sa->frag_ctx.ctx == ctx) {
 			sa->frag_ctx.ctx = NULL;
@@ -2519,71 +2596,14 @@
 	ctx->frag_ctx.flags       = 0;
 
 	if (!LIST_ISEMPTY(&ctx->list)) {
+		if (ctx->state == SPOE_CTX_ST_SENDING_MSGS)
+			HA_ATOMIC_ADD(&agent->counters.nb_sending, 1);
+		else
+			HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
+
 		LIST_DEL(&ctx->list);
 		LIST_INIT(&ctx->list);
 	}
-}
-
-static void
-spoe_update_stats(struct stream *s, struct spoe_agent *agent,
-		  struct spoe_context *ctx, int dir)
-{
-	if (!tv_iszero(&ctx->stats.tv_start)) {
-		spoe_update_stat_time(&ctx->stats.tv_start, &ctx->stats.t_process);
-		ctx->stats.t_total  += ctx->stats.t_process;
-		tv_zero(&ctx->stats.tv_request);
-		tv_zero(&ctx->stats.tv_queue);
-		tv_zero(&ctx->stats.tv_wait);
-		tv_zero(&ctx->stats.tv_response);
-	}
-
-	if (agent->var_t_process) {
-		struct sample smp;
-
-		memset(&smp, 0, sizeof(smp));
-		smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
-		smp.data.u.sint = ctx->stats.t_process;
-		smp.data.type   = SMP_T_SINT;
-
-		spoe_set_var(ctx, "txn", agent->var_t_process,
-			     strlen(agent->var_t_process), &smp);
-	}
-
-	if (agent->var_t_total) {
-		struct sample smp;
-
-		memset(&smp, 0, sizeof(smp));
-		smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
-		smp.data.u.sint = ctx->stats.t_total;
-		smp.data.type   = SMP_T_SINT;
-
-		spoe_set_var(ctx, "txn", agent->var_t_total,
-			     strlen(agent->var_t_total), &smp);
-	}
-}
-
-static void
-spoe_handle_processing_error(struct stream *s, struct spoe_agent *agent,
-			     struct spoe_context *ctx, int dir)
-{
-	if (agent->eps_max > 0)
-		update_freq_ctr(&agent->rt[tid].err_per_sec, 1);
-
-	if (agent->var_on_error) {
-		struct sample smp;
-
-		memset(&smp, 0, sizeof(smp));
-		smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
-		smp.data.u.sint = ctx->status_code;
-		smp.data.type   = SMP_T_BOOL;
-
-		spoe_set_var(ctx, "txn", agent->var_on_error,
-			     strlen(agent->var_on_error), &smp);
-	}
-
-	ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
-		      ? SPOE_CTX_ST_READY
-		      : SPOE_CTX_ST_NONE);
 }
 
 /* Process a list of SPOE messages. First, this functions will process messages
@@ -2600,7 +2620,7 @@
 	int                 ret = 1;
 
 	if (ctx->state == SPOE_CTX_ST_ERROR)
-		goto error;
+		goto end;
 
 	if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
 		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
@@ -2608,7 +2628,7 @@
 			    (int)now.tv_sec, (int)now.tv_usec,
 			    agent->id, __FUNCTION__, s);
 		ctx->status_code = SPOE_CTX_ERR_TOUT;
-		goto error;
+		goto end;
 	}
 
 	if (ctx->state == SPOE_CTX_ST_READY) {
@@ -2642,11 +2662,11 @@
 			goto out;
 		ret = spoe_encode_messages(s, ctx, messages, dir, type);
 		if (ret < 0)
-			goto error;
+			goto end;
 		if (!ret)
 			goto skip;
 		if (spoe_queue_context(ctx) < 0)
-			goto error;
+			goto end;
 		ctx->state = SPOE_CTX_ST_SENDING_MSGS;
 	}
 
@@ -2674,19 +2694,20 @@
   out:
 	return ret;
 
-  error:
-	spoe_handle_processing_error(s, agent, ctx, dir);
-	ret = 1;
-	goto end;
-
   skip:
 	tv_zero(&ctx->stats.tv_start);
 	ctx->state = SPOE_CTX_ST_READY;
-	ret = 1;
+	spoe_stop_processing(agent, ctx);
+	return 1;
 
   end:
 	spoe_update_stats(s, agent, ctx, dir);
 	spoe_stop_processing(agent, ctx);
+	if (ctx->status_code) {
+		HA_ATOMIC_ADD(&agent->counters.nb_errors, 1);
+		spoe_handle_processing_error(s, agent, ctx, dir);
+		ret = 1;
+	}
 	return ret;
 }
 
@@ -2712,16 +2733,24 @@
 	ret = spoe_process_messages(s, ctx, &group->messages, dir, SPOE_MSGS_BY_GROUP);
 	if (ret && ctx->stats.t_process != -1) {
 		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-			    " - <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
+			    " - <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu %u/%u\n",
 			    (int)now.tv_sec, (int)now.tv_usec, agent->id,
-			    __FUNCTION__, s, s->uniq_id, ctx->status_code,
+			    __FUNCTION__, s, group->id, s->uniq_id, ctx->status_code,
 			    ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
-			    ctx->stats.t_response, ctx->stats.t_process);
-		send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
-			 "SPOE: [%s] <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
-			 agent->id, group->id, s->uniq_id, ctx->status_code,
-			 ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
-			 ctx->stats.t_response, ctx->stats.t_process);
+			    ctx->stats.t_response, ctx->stats.t_process,
+			    agent->counters.idles, agent->counters.applets,
+			    agent->counters.nb_sending, agent->counters.nb_waiting,
+			    agent->counters.nb_errors, agent->counters.nb_processed,
+			    agent->rt[tid].processing, read_freq_ctr(&agent->rt[tid].processing_per_sec));
+		if (ctx->status_code || !(conf->agent_fe.options2 & PR_O2_NOLOGNORM))
+			send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
+				 "SPOE: [%s] <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu\n",
+				 agent->id, group->id, s->uniq_id, ctx->status_code,
+				 ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
+				 ctx->stats.t_response, ctx->stats.t_process,
+				 agent->counters.idles, agent->counters.applets,
+				 agent->counters.nb_sending, agent->counters.nb_waiting,
+				 agent->counters.nb_errors, agent->counters.nb_processed);
 	}
 	return ret;
 }
@@ -2750,16 +2779,24 @@
 	ret = spoe_process_messages(s, ctx, &(ctx->events[ev]), dir, SPOE_MSGS_BY_EVENT);
 	if (ret && ctx->stats.t_process != -1) {
 		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-			    " - <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
+			    " - <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu %u/%u\n",
 			    (int)now.tv_sec, (int)now.tv_usec, agent->id,
 			    __FUNCTION__, s, spoe_event_str[ev], s->uniq_id, ctx->status_code,
 			    ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
-			    ctx->stats.t_response, ctx->stats.t_process);
-		send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
-			 "SPOE: [%s] <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
-			 agent->id, spoe_event_str[ev], s->uniq_id, ctx->status_code,
-			 ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
-			 ctx->stats.t_response, ctx->stats.t_process);
+			    ctx->stats.t_response, ctx->stats.t_process,
+			    agent->counters.idles, agent->counters.applets,
+			    agent->counters.nb_sending, agent->counters.nb_waiting,
+			    agent->counters.nb_errors, agent->counters.nb_processed,
+			    agent->rt[tid].processing, read_freq_ctr(&agent->rt[tid].processing_per_sec));
+		if (ctx->status_code || !(conf->agent_fe.options2 & PR_O2_NOLOGNORM))
+			send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
+				 "SPOE: [%s] <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu\n",
+				 agent->id, spoe_event_str[ev], s->uniq_id, ctx->status_code,
+				 ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
+				 ctx->stats.t_response, ctx->stats.t_process,
+				 agent->counters.idles, agent->counters.applets,
+				 agent->counters.nb_sending, agent->counters.nb_waiting,
+				 agent->counters.nb_errors, agent->counters.nb_processed);
 	}
 	return ret;
 }
@@ -4451,8 +4488,8 @@
 				    (int)now.tv_sec, (int)now.tv_usec,
 				    agent->id, __FUNCTION__, s, group->id);
 			ctx->status_code = SPOE_CTX_ERR_INTERRUPT;
-			spoe_handle_processing_error(s, agent, ctx, dir);
 			spoe_stop_processing(agent, ctx);
+			spoe_handle_processing_error(s, agent, ctx, dir);
 			return ACT_RET_CONT;
 		}
 		return ACT_RET_YIELD;