MINOR: spoe: Add counters to log info about SPOE agents

In addition to metrics about time spent in the SPOE, the following counters
have been added:

  * applets : number of SPOE applets.
  * idles : number of idle applets.
  * nb_sending : number of streams waiting to send data.
  * nb_waiting : number of streams waiting for an ack.
  * nb_processed : number of events/groups processed by the SPOE (from the
                   stream point of view).
  * nb_errors : number of errors during the processing (from the stream point of
                view).

Log messages have been updated to report these counters. The following pattern
has been added at the end of the log message:

    ... <idles>/<applets> <nb_sending>/<nb_waiting> <nb_errors>/<nb_processed>
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 3b425cc..7c019c4 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -1233,6 +1233,7 @@
 
 	/* Remove applet from the list of running applets */
 	SPOE_DEBUG_STMT(agent->rt[tid].applets_act--);
+	HA_ATOMIC_SUB(&agent->counters.applets, 1);
 	HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 	if (!LIST_ISEMPTY(&spoe_appctx->list)) {
 		LIST_DEL(&spoe_appctx->list);
@@ -1245,6 +1246,7 @@
 		if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
 			eb32_delete(&spoe_appctx->node);
 			SPOE_DEBUG_STMT(agent->rt[tid].applets_idle--);
+			HA_ATOMIC_SUB(&agent->counters.idles, 1);
 		}
 
 		appctx->st0 = SPOE_APPCTX_ST_END;
@@ -1266,6 +1268,7 @@
 	list_for_each_entry_safe(ctx, back, &spoe_appctx->waiting_queue, list) {
 		LIST_DEL(&ctx->list);
 		LIST_INIT(&ctx->list);
+		HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
 		spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
 		ctx->state = SPOE_CTX_ST_ERROR;
 		ctx->status_code = (spoe_appctx->status_code + 0x100);
@@ -1294,6 +1297,7 @@
 	list_for_each_entry_safe(ctx, back, &agent->rt[tid].sending_queue, list) {
 		LIST_DEL(&ctx->list);
 		LIST_INIT(&ctx->list);
+		HA_ATOMIC_SUB(&agent->counters.nb_sending, 1);
 		spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
 		ctx->state = SPOE_CTX_ST_ERROR;
 		ctx->status_code = (spoe_appctx->status_code + 0x100);
@@ -1302,6 +1306,7 @@
 	list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) {
 		LIST_DEL(&ctx->list);
 		LIST_INIT(&ctx->list);
+		HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
 		spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
 		ctx->state = SPOE_CTX_ST_ERROR;
 		ctx->status_code = (spoe_appctx->status_code + 0x100);
@@ -1426,6 +1431,7 @@
 			/* HELLO handshake is finished, set the idle timeout and
 			 * add the applet in the list of running applets. */
 			SPOE_DEBUG_STMT(agent->rt[tid].applets_idle++);
+			HA_ATOMIC_ADD(&agent->counters.idles, 1);
 			appctx->st0 = SPOE_APPCTX_ST_IDLE;
 			SPOE_APPCTX(appctx)->node.key = 0;
 			eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node);
@@ -1495,6 +1501,7 @@
 			spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
 			LIST_DEL(&ctx->list);
 			LIST_INIT(&ctx->list);
+			HA_ATOMIC_SUB(&agent->counters.nb_sending, 1);
 			spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
 			ctx->spoe_appctx = NULL;
 			ctx->state = SPOE_CTX_ST_ERROR;
@@ -1514,6 +1521,7 @@
 			spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
 			LIST_DEL(&ctx->list);
 			LIST_INIT(&ctx->list);
+			HA_ATOMIC_SUB(&agent->counters.nb_sending, 1);
 			spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
 			ctx->spoe_appctx = SPOE_APPCTX(appctx);
 			if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) ||
@@ -1548,6 +1556,7 @@
 		*skip = 1;
 		LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
 	}
+	HA_ATOMIC_ADD(&agent->counters.nb_waiting, 1);
 	ctx->stats.tv_wait = now;
 	SPOE_APPCTX(appctx)->frag_ctx.ctx    = NULL;
 	SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
@@ -1571,6 +1580,7 @@
 static int
 spoe_handle_receiving_frame_appctx(struct appctx *appctx, int *skip)
 {
+	struct spoe_agent   *agent = SPOE_APPCTX(appctx)->agent;
 	struct spoe_context *ctx = NULL;
 	char *frame;
 	int   ret;
@@ -1602,6 +1612,7 @@
 		default:
 			LIST_DEL(&ctx->list);
 			LIST_INIT(&ctx->list);
+			HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
 			spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
 			ctx->stats.tv_response = now;
 			if (ctx->spoe_appctx) {
@@ -1708,6 +1719,7 @@
 
 	if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING && SPOE_APPCTX(appctx)->cur_fpa < agent->max_fpa) {
 		SPOE_DEBUG_STMT(agent->rt[tid].applets_idle++);
+		HA_ATOMIC_ADD(&agent->counters.idles, 1);
 		appctx->st0 = SPOE_APPCTX_ST_IDLE;
 		eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node);
 	}
@@ -1871,6 +1883,7 @@
 
 		case SPOE_APPCTX_ST_IDLE:
 			SPOE_DEBUG_STMT(agent->rt[tid].applets_idle--);
+			HA_ATOMIC_SUB(&agent->counters.idles, 1);
 			eb32_delete(&SPOE_APPCTX(appctx)->node);
 			if (stopping &&
 			    LIST_ISEMPTY(&agent->rt[tid].sending_queue) &&
@@ -1988,6 +2001,7 @@
 	LIST_ADDQ(&conf->agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
 	HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
 	SPOE_DEBUG_STMT(conf->agent->rt[tid].applets_act++);
+	HA_ATOMIC_ADD(&conf->agent->counters.applets, 1);
 
 	task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
 	task_wakeup(strm->task, TASK_WOKEN_INIT);
@@ -2072,6 +2086,7 @@
 
 	/* Add the SPOE context in the sending queue */
 	LIST_ADDQ(&agent->rt[tid].sending_queue, &ctx->list);
+	HA_ATOMIC_ADD(&agent->counters.nb_sending, 1);
 	spoe_update_stat_time(&ctx->stats.tv_request, &ctx->stats.t_request);
 	ctx->stats.tv_queue = now;
 
@@ -2459,6 +2474,68 @@
 /***************************************************************************
  * Functions that process SPOE events
  **************************************************************************/
+static void
+spoe_update_stats(struct stream *s, struct spoe_agent *agent,
+		  struct spoe_context *ctx, int dir)
+{
+	if (!tv_iszero(&ctx->stats.tv_start)) {
+		spoe_update_stat_time(&ctx->stats.tv_start, &ctx->stats.t_process);
+		ctx->stats.t_total  += ctx->stats.t_process;
+		tv_zero(&ctx->stats.tv_request);
+		tv_zero(&ctx->stats.tv_queue);
+		tv_zero(&ctx->stats.tv_wait);
+		tv_zero(&ctx->stats.tv_response);
+	}
+
+	if (agent->var_t_process) {
+		struct sample smp;
+
+		memset(&smp, 0, sizeof(smp));
+		smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
+		smp.data.u.sint = ctx->stats.t_process;
+		smp.data.type   = SMP_T_SINT;
+
+		spoe_set_var(ctx, "txn", agent->var_t_process,
+			     strlen(agent->var_t_process), &smp);
+	}
+
+	if (agent->var_t_total) {
+		struct sample smp;
+
+		memset(&smp, 0, sizeof(smp));
+		smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
+		smp.data.u.sint = ctx->stats.t_total;
+		smp.data.type   = SMP_T_SINT;
+
+		spoe_set_var(ctx, "txn", agent->var_t_total,
+			     strlen(agent->var_t_total), &smp);
+	}
+}
+
+static void
+spoe_handle_processing_error(struct stream *s, struct spoe_agent *agent,
+			     struct spoe_context *ctx, int dir)
+{
+	if (agent->eps_max > 0)
+		update_freq_ctr(&agent->rt[tid].err_per_sec, 1);
+
+	if (agent->var_on_error) {
+		struct sample smp;
+
+		memset(&smp, 0, sizeof(smp));
+		smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
+		smp.data.u.sint = ctx->status_code;
+		smp.data.type   = SMP_T_BOOL;
+
+		spoe_set_var(ctx, "txn", agent->var_on_error,
+			     strlen(agent->var_on_error), &smp);
+	}
+
+	ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
+		      ? SPOE_CTX_ST_READY
+		      : SPOE_CTX_ST_NONE);
+}
+
 static inline int
 spoe_start_processing(struct spoe_agent *agent, struct spoe_context *ctx, int dir)
 {
@@ -2493,7 +2570,7 @@
 
 	if (!(ctx->flags & SPOE_CTX_FL_PROCESS))
 		return;
-
+	HA_ATOMIC_ADD(&agent->counters.nb_processed, 1);
 	if (sa) {
 		if (sa->frag_ctx.ctx == ctx) {
 			sa->frag_ctx.ctx = NULL;
@@ -2519,71 +2596,14 @@
 	ctx->frag_ctx.flags       = 0;
 
 	if (!LIST_ISEMPTY(&ctx->list)) {
+		if (ctx->state == SPOE_CTX_ST_SENDING_MSGS)
+			HA_ATOMIC_SUB(&agent->counters.nb_sending, 1); /* ctx leaves the sending queue */
+		else
+			HA_ATOMIC_SUB(&agent->counters.nb_waiting, 1);
+
 		LIST_DEL(&ctx->list);
 		LIST_INIT(&ctx->list);
 	}
-}
-
-static void
-spoe_update_stats(struct stream *s, struct spoe_agent *agent,
-		  struct spoe_context *ctx, int dir)
-{
-	if (!tv_iszero(&ctx->stats.tv_start)) {
-		spoe_update_stat_time(&ctx->stats.tv_start, &ctx->stats.t_process);
-		ctx->stats.t_total  += ctx->stats.t_process;
-		tv_zero(&ctx->stats.tv_request);
-		tv_zero(&ctx->stats.tv_queue);
-		tv_zero(&ctx->stats.tv_wait);
-		tv_zero(&ctx->stats.tv_response);
-	}
-
-	if (agent->var_t_process) {
-		struct sample smp;
-
-		memset(&smp, 0, sizeof(smp));
-		smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
-		smp.data.u.sint = ctx->stats.t_process;
-		smp.data.type   = SMP_T_SINT;
-
-		spoe_set_var(ctx, "txn", agent->var_t_process,
-			     strlen(agent->var_t_process), &smp);
-	}
-
-	if (agent->var_t_total) {
-		struct sample smp;
-
-		memset(&smp, 0, sizeof(smp));
-		smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
-		smp.data.u.sint = ctx->stats.t_total;
-		smp.data.type   = SMP_T_SINT;
-
-		spoe_set_var(ctx, "txn", agent->var_t_total,
-			     strlen(agent->var_t_total), &smp);
-	}
-}
-
-static void
-spoe_handle_processing_error(struct stream *s, struct spoe_agent *agent,
-			     struct spoe_context *ctx, int dir)
-{
-	if (agent->eps_max > 0)
-		update_freq_ctr(&agent->rt[tid].err_per_sec, 1);
-
-	if (agent->var_on_error) {
-		struct sample smp;
-
-		memset(&smp, 0, sizeof(smp));
-		smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
-		smp.data.u.sint = ctx->status_code;
-		smp.data.type   = SMP_T_BOOL;
-
-		spoe_set_var(ctx, "txn", agent->var_on_error,
-			     strlen(agent->var_on_error), &smp);
-	}
-
-	ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
-		      ? SPOE_CTX_ST_READY
-		      : SPOE_CTX_ST_NONE);
 }
 
 /* Process a list of SPOE messages. First, this functions will process messages
@@ -2600,7 +2620,7 @@
 	int                 ret = 1;
 
 	if (ctx->state == SPOE_CTX_ST_ERROR)
-		goto error;
+		goto end;
 
 	if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
 		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
@@ -2608,7 +2628,7 @@
 			    (int)now.tv_sec, (int)now.tv_usec,
 			    agent->id, __FUNCTION__, s);
 		ctx->status_code = SPOE_CTX_ERR_TOUT;
-		goto error;
+		goto end;
 	}
 
 	if (ctx->state == SPOE_CTX_ST_READY) {
@@ -2642,11 +2662,11 @@
 			goto out;
 		ret = spoe_encode_messages(s, ctx, messages, dir, type);
 		if (ret < 0)
-			goto error;
+			goto end;
 		if (!ret)
 			goto skip;
 		if (spoe_queue_context(ctx) < 0)
-			goto error;
+			goto end;
 		ctx->state = SPOE_CTX_ST_SENDING_MSGS;
 	}
 
@@ -2674,19 +2694,20 @@
   out:
 	return ret;
 
-  error:
-	spoe_handle_processing_error(s, agent, ctx, dir);
-	ret = 1;
-	goto end;
-
   skip:
 	tv_zero(&ctx->stats.tv_start);
 	ctx->state = SPOE_CTX_ST_READY;
-	ret = 1;
+	spoe_stop_processing(agent, ctx);
+	return 1;
 
   end:
 	spoe_update_stats(s, agent, ctx, dir);
 	spoe_stop_processing(agent, ctx);
+	if (ctx->status_code) {
+		HA_ATOMIC_ADD(&agent->counters.nb_errors, 1);
+		spoe_handle_processing_error(s, agent, ctx, dir);
+		ret = 1;
+	}
 	return ret;
 }
 
@@ -2712,16 +2733,24 @@
 	ret = spoe_process_messages(s, ctx, &group->messages, dir, SPOE_MSGS_BY_GROUP);
 	if (ret && ctx->stats.t_process != -1) {
 		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-			    " - <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
+			    " - <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu %u/%u\n",
 			    (int)now.tv_sec, (int)now.tv_usec, agent->id,
-			    __FUNCTION__, s, s->uniq_id, ctx->status_code,
+			    __FUNCTION__, s, group->id, s->uniq_id, ctx->status_code,
 			    ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
-			    ctx->stats.t_response, ctx->stats.t_process);
-		send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
-			 "SPOE: [%s] <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
-			 agent->id, group->id, s->uniq_id, ctx->status_code,
-			 ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
-			 ctx->stats.t_response, ctx->stats.t_process);
+			    ctx->stats.t_response, ctx->stats.t_process,
+			    agent->counters.idles, agent->counters.applets,
+			    agent->counters.nb_sending, agent->counters.nb_waiting,
+			    agent->counters.nb_errors, agent->counters.nb_processed,
+			    agent->rt[tid].processing, read_freq_ctr(&agent->rt[tid].processing_per_sec));
+		if (ctx->status_code || !(conf->agent_fe.options2 & PR_O2_NOLOGNORM))
+			send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
+				 "SPOE: [%s] <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu\n",
+				 agent->id, group->id, s->uniq_id, ctx->status_code,
+				 ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
+				 ctx->stats.t_response, ctx->stats.t_process,
+				 agent->counters.idles, agent->counters.applets,
+				 agent->counters.nb_sending, agent->counters.nb_waiting,
+				 agent->counters.nb_errors, agent->counters.nb_processed);
 	}
 	return ret;
 }
@@ -2750,16 +2779,24 @@
 	ret = spoe_process_messages(s, ctx, &(ctx->events[ev]), dir, SPOE_MSGS_BY_EVENT);
 	if (ret && ctx->stats.t_process != -1) {
 		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-			    " - <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
+			    " - <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu %u/%u\n",
 			    (int)now.tv_sec, (int)now.tv_usec, agent->id,
 			    __FUNCTION__, s, spoe_event_str[ev], s->uniq_id, ctx->status_code,
 			    ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
-			    ctx->stats.t_response, ctx->stats.t_process);
-		send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
-			 "SPOE: [%s] <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
-			 agent->id, spoe_event_str[ev], s->uniq_id, ctx->status_code,
-			 ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
-			 ctx->stats.t_response, ctx->stats.t_process);
+			    ctx->stats.t_response, ctx->stats.t_process,
+			    agent->counters.idles, agent->counters.applets,
+			    agent->counters.nb_sending, agent->counters.nb_waiting,
+			    agent->counters.nb_errors, agent->counters.nb_processed,
+			    agent->rt[tid].processing, read_freq_ctr(&agent->rt[tid].processing_per_sec));
+		if (ctx->status_code || !(conf->agent_fe.options2 & PR_O2_NOLOGNORM))
+			send_log(&conf->agent_fe, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
+				 "SPOE: [%s] <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld %u/%u %u/%u %llu/%llu\n",
+				 agent->id, spoe_event_str[ev], s->uniq_id, ctx->status_code,
+				 ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
+				 ctx->stats.t_response, ctx->stats.t_process,
+				 agent->counters.idles, agent->counters.applets,
+				 agent->counters.nb_sending, agent->counters.nb_waiting,
+				 agent->counters.nb_errors, agent->counters.nb_processed);
 	}
 	return ret;
 }
@@ -4451,8 +4488,8 @@
 				    (int)now.tv_sec, (int)now.tv_usec,
 				    agent->id, __FUNCTION__, s, group->id);
 			ctx->status_code = SPOE_CTX_ERR_INTERRUPT;
-			spoe_handle_processing_error(s, agent, ctx, dir);
 			spoe_stop_processing(agent, ctx);
+			spoe_handle_processing_error(s, agent, ctx, dir);
 			return ACT_RET_CONT;
 		}
 		return ACT_RET_YIELD;