MINOR: spoe: Add metrics to know the time spent in the SPOE

The following metrics are added for each event or group of messages processed
in the SPOE:

  * processing time: the delay to process the event or the group. From the
                     stream point of view, it is the latency added by the SPOE
                     processing.
  * request time : It is the encoding time. It includes ACLs processing, if
                   any. For fragmented frames, it is the sum of all fragments.
  * queue time : the delay before the request gets out of the sending queue. For
                 fragmented frames, it is the sum of all fragments.
  * waiting time: the delay before the response is received. No fragmentation
                  supported here.
  * response time: the delay to process the response. No fragmentation supported
                   here.
  * total time: (unused for now). It is the sum of all events or groups
                processed by the SPOE for a specific thread.

Log messages have been updated. Before, only errors were logged (status_code !=
0). Now every processing is logged, using the following format:

  SPOE: [AGENT] <TYPE:NAME> sid=STREAM-ID st=STATUS_CODE reqT/qT/wT/resT/pT

where:

  AGENT              is the agent name
  TYPE               is EVENT or GROUP
  NAME               is the event or the group name
  STREAM-ID          is an integer, the unique id of the stream
  STATUS_CODE        is the processing's status code
  reqT/qT/wT/resT/pT are the delays described above

For all these delays, -1 means the processing was interrupted before the end. So
-1 for the queue time means the request was never dequeued. For fragmented
frames it is harder to know when the interruption happened.

For now, messages are logged using the same logger as the backend of the
stream which initiated the request.
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 5767dfe..7d5eb3d 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -270,6 +270,17 @@
 	return uuid;
 }
 
+
+static inline void
+spoe_update_stat_time(struct timeval *tv, long *t)
+{
+	if (*t == -1)
+		*t = tv_ms_elapsed(tv, &now);
+	else
+		*t += tv_ms_elapsed(tv, &now);
+	tv_zero(tv);
+}
+
 /********************************************************************
  * Functions that encode/decode SPOE frames
  ********************************************************************/
@@ -1246,6 +1257,7 @@
 	list_for_each_entry_safe(ctx, back, &spoe_appctx->waiting_queue, list) {
 		LIST_DEL(&ctx->list);
 		LIST_INIT(&ctx->list);
+		spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
 		ctx->state = SPOE_CTX_ST_ERROR;
 		ctx->status_code = (spoe_appctx->status_code + 0x100);
 		task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
@@ -1273,6 +1285,7 @@
 	list_for_each_entry_safe(ctx, back, &agent->rt[tid].sending_queue, list) {
 		LIST_DEL(&ctx->list);
 		LIST_INIT(&ctx->list);
+		spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
 		ctx->state = SPOE_CTX_ST_ERROR;
 		ctx->status_code = (spoe_appctx->status_code + 0x100);
 		task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
@@ -1280,6 +1293,7 @@
 	list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) {
 		LIST_DEL(&ctx->list);
 		LIST_INIT(&ctx->list);
+		spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
 		ctx->state = SPOE_CTX_ST_ERROR;
 		ctx->status_code = (spoe_appctx->status_code + 0x100);
 		task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
@@ -1472,6 +1486,7 @@
 			spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
 			LIST_DEL(&ctx->list);
 			LIST_INIT(&ctx->list);
+			spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
 			ctx->spoe_appctx = NULL;
 			ctx->state = SPOE_CTX_ST_ERROR;
 			ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
@@ -1490,6 +1505,7 @@
 			spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
 			LIST_DEL(&ctx->list);
 			LIST_INIT(&ctx->list);
+			spoe_update_stat_time(&ctx->stats.tv_queue, &ctx->stats.t_queue);
 			ctx->spoe_appctx = SPOE_APPCTX(appctx);
 			if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) ||
 			    (ctx->frag_ctx.flags & SPOE_FRM_FL_FIN))
@@ -1523,6 +1539,7 @@
 		*skip = 1;
 		LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
 	}
+	ctx->stats.tv_wait = now;
 	SPOE_APPCTX(appctx)->frag_ctx.ctx    = NULL;
 	SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
 	SPOE_APPCTX(appctx)->frag_ctx.curfid = 0;
@@ -1576,6 +1593,8 @@
 		default:
 			LIST_DEL(&ctx->list);
 			LIST_INIT(&ctx->list);
+			spoe_update_stat_time(&ctx->stats.tv_wait, &ctx->stats.t_waiting);
+			ctx->stats.tv_response = now;
 			if (ctx->spoe_appctx) {
 				ctx->spoe_appctx->cur_fpa--;
 				ctx->spoe_appctx = NULL;
@@ -1589,7 +1608,6 @@
 			}
 			else if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
 				appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
-
 			task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 			break;
 	}
@@ -2045,6 +2063,8 @@
 
 	/* Add the SPOE context in the sending queue */
 	LIST_ADDQ(&agent->rt[tid].sending_queue, &ctx->list);
+	spoe_update_stat_time(&ctx->stats.tv_request, &ctx->stats.t_request);
+	ctx->stats.tv_queue = now;
 
 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 		    " - Add stream in sending queue"
@@ -2439,6 +2459,15 @@
 		return 0;
 
 	agent->rt[tid].processing++;
+	ctx->stats.tv_start   = now;
+	ctx->stats.tv_request = now;
+	ctx->stats.t_request  = -1;
+	ctx->stats.t_queue    = -1;
+	ctx->stats.t_waiting  = -1;
+	ctx->stats.t_response = -1;
+	ctx->stats.t_process  = -1;
+
+	ctx->status_code = 0;
 
 	/* Set the right flag to prevent request and response processing
 	 * in same time. */
@@ -2469,8 +2498,6 @@
 	agent->rt[tid].processing--;
 	ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED);
 
-	ctx->status_code = 0;
-
 	/* Reset processing timer */
 	ctx->process_exp = TICK_ETERNITY;
 
@@ -2489,6 +2516,20 @@
 }
 
 static void
+spoe_update_stats(struct stream *s, struct spoe_agent *agent,
+		  struct spoe_context *ctx, int dir)
+{
+	if (!tv_iszero(&ctx->stats.tv_start)) {
+		spoe_update_stat_time(&ctx->stats.tv_start, &ctx->stats.t_process);
+		ctx->stats.t_total  += ctx->stats.t_process;
+		tv_zero(&ctx->stats.tv_request);
+		tv_zero(&ctx->stats.tv_queue);
+		tv_zero(&ctx->stats.tv_wait);
+		tv_zero(&ctx->stats.tv_response);
+	}
+}
+
+static void
 spoe_handle_processing_error(struct stream *s, struct spoe_agent *agent,
 			     struct spoe_context *ctx, int dir)
 {
@@ -2506,13 +2547,6 @@
 		spoe_set_var(ctx, "txn", agent->var_on_error,
 			     strlen(agent->var_on_error), &smp);
 	}
-	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
-		    " - failed to process messages: code=%u\n",
-		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
-		    __FUNCTION__, s, ctx->status_code);
-	send_log(ctx->strm->be, LOG_WARNING,
-		 "SPOE: [%s] failed to process messages: code=%u\n",
-		 agent->id, ctx->status_code);
 
 	ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
 		      ? SPOE_CTX_ST_READY
@@ -2569,6 +2603,8 @@
 	}
 
 	if (ctx->state == SPOE_CTX_ST_ENCODING_MSGS) {
+		if (!tv_iszero(&ctx->stats.tv_request))
+		    ctx->stats.tv_request = now;
 		if (!spoe_acquire_buffer(&ctx->buffer, &ctx->buffer_wait))
 			goto out;
 		ret = spoe_encode_messages(s, ctx, messages, dir, type);
@@ -2598,6 +2634,7 @@
 		ret = 1;
 		ctx->frame_id++;
 		ctx->state = SPOE_CTX_ST_READY;
+		spoe_update_stat_time(&ctx->stats.tv_response, &ctx->stats.t_response);
 		goto end;
 	}
 
@@ -2610,10 +2647,12 @@
 	goto end;
 
   skip:
+	tv_zero(&ctx->stats.tv_start);
 	ctx->state = SPOE_CTX_ST_READY;
 	ret = 1;
 
   end:
+	spoe_update_stats(s, agent, ctx, dir);
 	spoe_stop_processing(agent, ctx);
 	return ret;
 }
@@ -2624,12 +2663,13 @@
 spoe_process_group(struct stream *s, struct spoe_context *ctx,
 		   struct spoe_group *group, int dir)
 {
+	struct spoe_config *conf = FLT_CONF(ctx->filter);
+	struct spoe_agent  *agent = conf->agent;
 	int ret;
 
 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 		    " - ctx-state=%s - Process messages for group=%s\n",
-		    (int)now.tv_sec, (int)now.tv_usec,
-		    ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
+		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
 		    __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
 		    group->id);
 
@@ -2637,6 +2677,19 @@
 		return 1;
 
 	ret = spoe_process_messages(s, ctx, &group->messages, dir, SPOE_MSGS_BY_GROUP);
+	if (ret && ctx->stats.t_process != -1) {
+		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+			    " - <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
+			    (int)now.tv_sec, (int)now.tv_usec, agent->id,
+			    __FUNCTION__, s, s->uniq_id, ctx->status_code,
+			    ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
+			    ctx->stats.t_response, ctx->stats.t_process);
+		send_log(s->be, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
+			 "SPOE: [%s] <GROUP:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
+			 agent->id, group->id, s->uniq_id, ctx->status_code,
+			 ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
+			 ctx->stats.t_response, ctx->stats.t_process);
+	}
 	return ret;
 }
 
@@ -2646,12 +2699,13 @@
 spoe_process_event(struct stream *s, struct spoe_context *ctx,
 		   enum spoe_event ev)
 {
+	struct spoe_config *conf = FLT_CONF(ctx->filter);
+	struct spoe_agent  *agent = conf->agent;
 	int dir, ret;
 
 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 		    " - ctx-state=%s - Process messages for event=%s\n",
-		    (int)now.tv_sec, (int)now.tv_usec,
-		    ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
+		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
 		    __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
 		    spoe_event_str[ev]);
 
@@ -2661,6 +2715,19 @@
 		return 1;
 
 	ret = spoe_process_messages(s, ctx, &(ctx->events[ev]), dir, SPOE_MSGS_BY_EVENT);
+	if (ret && ctx->stats.t_process != -1) {
+		SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+			    " - <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
+			    (int)now.tv_sec, (int)now.tv_usec, agent->id,
+			    __FUNCTION__, s, spoe_event_str[ev], s->uniq_id, ctx->status_code,
+			    ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
+			    ctx->stats.t_response, ctx->stats.t_process);
+		send_log(s->be, (!ctx->status_code ? LOG_NOTICE : LOG_WARNING),
+			 "SPOE: [%s] <EVENT:%s> sid=%u st=%u %ld/%ld/%ld/%ld/%ld\n",
+			 agent->id, spoe_event_str[ev], s->uniq_id, ctx->status_code,
+			 ctx->stats.t_request, ctx->stats.t_queue, ctx->stats.t_waiting,
+			 ctx->stats.t_response, ctx->stats.t_process);
+	}
 	return ret;
 }
 
@@ -2741,6 +2808,18 @@
 	ctx->frame_id    = 1;
 	ctx->process_exp = TICK_ETERNITY;
 
+	tv_zero(&ctx->stats.tv_start);
+	tv_zero(&ctx->stats.tv_request);
+	tv_zero(&ctx->stats.tv_queue);
+	tv_zero(&ctx->stats.tv_wait);
+	tv_zero(&ctx->stats.tv_response);
+	ctx->stats.t_request  = -1;
+	ctx->stats.t_queue    = -1;
+	ctx->stats.t_waiting  = -1;
+	ctx->stats.t_response = -1;
+	ctx->stats.t_process  = -1;
+	ctx->stats.t_total    =  0;
+
 	ctx->strm   = s;
 	ctx->state  = SPOE_CTX_ST_READY;
 	filter->ctx = ctx;
@@ -2767,6 +2846,18 @@
 {
 	ctx->state  = SPOE_CTX_ST_READY;
 	ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED);
+
+	tv_zero(&ctx->stats.tv_start);
+	tv_zero(&ctx->stats.tv_request);
+	tv_zero(&ctx->stats.tv_queue);
+	tv_zero(&ctx->stats.tv_wait);
+	tv_zero(&ctx->stats.tv_response);
+	ctx->stats.t_request  = -1;
+	ctx->stats.t_queue    = -1;
+	ctx->stats.t_waiting  = -1;
+	ctx->stats.t_response = -1;
+	ctx->stats.t_process  = -1;
+	ctx->stats.t_total    =  0;
 }