MINOR: spoe: Replace sending_rate by a frequency counter
sending_rate was a counter used to evaluate the SPOE capacity to process
frames. Because it was not really accurrate, it has been replaced by a frequency
counter representing the number of frames handled by the SPOE per second. We
just check this counter is higher than the number of streams waiting for a
reply. If not, a new applet is created.
diff --git a/include/types/spoe.h b/include/types/spoe.h
index 3051056..2354f6e 100644
--- a/include/types/spoe.h
+++ b/include/types/spoe.h
@@ -262,7 +262,9 @@
unsigned int frame_size; /* current maximum frame size, only used to encode messages */
unsigned int applets_act; /* # of applets alive at a time */
unsigned int applets_idle; /* # of applets in the state SPOE_APPCTX_ST_IDLE */
- unsigned int sending_rate; /* the global sending rate */
+
+ unsigned int processing;
+ struct freq_ctr processing_per_sec;
struct freq_ctr conn_per_sec; /* connections per second */
struct freq_ctr err_per_sec; /* connetion errors per second */
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 1d8e4e9..2baa327 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -1638,7 +1638,7 @@
goto next;
case 0: /* ignore */
- agent->rt[tid].sending_rate++;
+ update_freq_ctr(&agent->rt[tid].processing_per_sec, 1);
fpa++;
break;
@@ -1646,7 +1646,7 @@
break;
default:
- agent->rt[tid].sending_rate++;
+ update_freq_ctr(&agent->rt[tid].processing_per_sec, 1);
fpa++;
break;
}
@@ -1991,7 +1991,7 @@
/* Check if we need to create a new SPOE applet or not. */
if (agent->rt[tid].applets_idle &&
- agent->rt[tid].sending_rate)
+ agent->rt[tid].processing < read_freq_ctr(&agent->rt[tid].processing_per_sec))
goto end;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
@@ -2045,18 +2045,15 @@
return -1;
}
- /* Add the SPOE context in the sending queue and update all running
- * info */
+ /* Add the SPOE context in the sending queue */
LIST_ADDQ(&agent->rt[tid].sending_queue, &ctx->list);
- if (agent->rt[tid].sending_rate)
- agent->rt[tid].sending_rate--;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - Add stream in sending queue"
- " - applets_act=%u - applets_idle=%u - sending_rate=%u\n",
+ " - applets_act=%u - applets_idle=%u - processing=%u\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
ctx->strm, agent->rt[tid].applets_act, agent->rt[tid].applets_idle,
- agent->rt[tid].sending_rate);
+ agent->rt[tid].processing);
/* Finally try to wakeup the first IDLE applet found and move it at the
* end of the list. */
@@ -2436,13 +2433,15 @@
* Functions that process SPOE events
**************************************************************************/
static inline int
-spoe_start_processing(struct spoe_context *ctx, int dir)
+spoe_start_processing(struct spoe_agent *agent, struct spoe_context *ctx, int dir)
{
/* If a process is already started for this SPOE context, retry
* later. */
if (ctx->flags & SPOE_CTX_FL_PROCESS)
return 0;
+ agent->rt[tid].processing++;
+
/* Set the right flag to prevent request and response processing
* in same time. */
ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
@@ -2452,16 +2451,20 @@
}
static inline void
-spoe_stop_processing(struct spoe_context *ctx)
+spoe_stop_processing(struct spoe_agent *agent, struct spoe_context *ctx)
{
struct spoe_appctx *sa = ctx->spoe_appctx;
+ if (!(ctx->flags & SPOE_CTX_FL_PROCESS))
+ return;
+
if (sa && sa->frag_ctx.ctx == ctx) {
sa->frag_ctx.ctx = NULL;
spoe_wakeup_appctx(sa->owner);
}
/* Reset the flag to allow next processing */
+ agent->rt[tid].processing--;
ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED);
ctx->status_code = 0;
@@ -2555,7 +2558,7 @@
s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
ctx->process_exp);
}
- ret = spoe_start_processing(ctx, dir);
+ ret = spoe_start_processing(agent, ctx, dir);
if (!ret)
goto out;
@@ -2609,7 +2612,7 @@
ret = 1;
end:
- spoe_stop_processing(ctx);
+ spoe_stop_processing(agent, ctx);
return ret;
}
@@ -2710,7 +2713,7 @@
}
static struct spoe_context *
-spoe_create_context(struct filter *filter)
+spoe_create_context(struct stream *s, struct filter *filter)
{
struct spoe_config *conf = FLT_CONF(filter);
struct spoe_context *ctx;
@@ -2736,17 +2739,25 @@
ctx->frame_id = 1;
ctx->process_exp = TICK_ETERNITY;
+ ctx->strm = s;
+ ctx->state = SPOE_CTX_ST_READY;
+ filter->ctx = ctx;
+
return ctx;
}
static void
-spoe_destroy_context(struct spoe_context *ctx)
+spoe_destroy_context(struct filter *filter)
{
+ struct spoe_config *conf = FLT_CONF(filter);
+ struct spoe_context *ctx = filter->ctx;
+
if (!ctx)
return;
- spoe_stop_processing(ctx);
+ spoe_stop_processing(conf->agent, ctx);
pool_free(pool_head_spoe_ctx, ctx);
+ filter->ctx = NULL;
}
static void
@@ -2907,8 +2918,7 @@
(int)now.tv_sec, (int)now.tv_usec, agent->id,
__FUNCTION__, s);
- ctx = spoe_create_context(filter);
- if (ctx == NULL) {
+ if ((ctx = spoe_create_context(s, filter)) == NULL) {
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - failed to create SPOE context\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
@@ -2919,10 +2929,6 @@
return 0;
}
- ctx->strm = s;
- ctx->state = SPOE_CTX_ST_READY;
- filter->ctx = ctx;
-
if (!LIST_ISEMPTY(&ctx->events[SPOE_EV_ON_TCP_REQ_FE]))
filter->pre_analyzers |= AN_REQ_INSPECT_FE;
@@ -2953,7 +2959,7 @@
(int)now.tv_sec, (int)now.tv_usec,
((struct spoe_config *)FLT_CONF(filter))->agent->id,
__FUNCTION__, s);
- spoe_destroy_context(filter->ctx);
+ spoe_destroy_context(filter);
}
@@ -3186,7 +3192,7 @@
curagent->rt[i].frame_size = curagent->max_frame_size;
curagent->rt[i].applets_act = 0;
curagent->rt[i].applets_idle = 0;
- curagent->rt[i].sending_rate = 0;
+ curagent->rt[i].processing = 0;
LIST_INIT(&curagent->rt[i].applets);
LIST_INIT(&curagent->rt[i].sending_queue);
LIST_INIT(&curagent->rt[i].waiting_queue);
@@ -4168,7 +4174,7 @@
agent->id, __FUNCTION__, s, group->id);
ctx->status_code = SPOE_CTX_ERR_INTERRUPT;
spoe_handle_processing_error(s, agent, ctx, dir);
- spoe_stop_processing(ctx);
+ spoe_stop_processing(agent, ctx);
return ACT_RET_CONT;
}
return ACT_RET_YIELD;