MINOR: spoe: Count the number of frames waiting for an ack for each applet
So it is easier to respect the max_fpa value. This is no more the maximum frames
processed by an applet at each loop but the maximum frames waiting for an ack
for a specific applet.
The function spoe_handle_processing_appctx has been rewritten accordingly.
diff --git a/include/types/spoe.h b/include/types/spoe.h
index 2354f6e..659dd27 100644
--- a/include/types/spoe.h
+++ b/include/types/spoe.h
@@ -336,6 +336,7 @@
struct buffer_wait buffer_wait; /* position in the list of ressources waiting for a buffer */
struct list waiting_queue; /* list of streams waiting for a ACK frame, in sync and pipelining mode */
struct list list; /* next spoe appctx for the same agent */
+ unsigned int cur_fpa;
struct {
struct spoe_context *ctx; /* SPOE context owning the fragmented frame */
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 2baa327..ae34c3b 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -1523,6 +1523,7 @@
SPOE_APPCTX(appctx)->frag_ctx.ctx = NULL;
SPOE_APPCTX(appctx)->frag_ctx.cursid = 0;
SPOE_APPCTX(appctx)->frag_ctx.curfid = 0;
+ SPOE_APPCTX(appctx)->cur_fpa++;
ctx->state = SPOE_CTX_ST_WAITING_ACK;
goto end;
@@ -1571,8 +1572,10 @@
default:
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
- if (ctx->spoe_appctx)
+ if (ctx->spoe_appctx) {
+ ctx->spoe_appctx->cur_fpa--;
ctx->spoe_appctx = NULL;
+ }
if (appctx->st0 == SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY &&
ctx == SPOE_APPCTX(appctx)->frag_ctx.ctx) {
appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
@@ -1599,8 +1602,7 @@
{
struct stream_interface *si = appctx->owner;
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
- unsigned int fpa = 0;
- int ret, skip_sending = 0, skip_receiving = 0;
+ int ret, skip_sending = 0, skip_receiving = 0, active_s = 0, active_r = 0;
if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO;
@@ -1614,86 +1616,76 @@
goto next;
}
- process:
+
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
- " - process: fpa=%u/%u - skip_sending=%d - skip_receiving=%d"
- " - appctx-state=%s\n",
+ " - process: fpa=%u/%u - appctx-state=%s - flags=0x%08x\n",
(int)now.tv_sec, (int)now.tv_usec, agent->id,
- __FUNCTION__, appctx, fpa, agent->max_fpa,
- skip_sending, skip_receiving,
- spoe_appctx_state_str[appctx->st0]);
+ __FUNCTION__, appctx, SPOE_APPCTX(appctx)->cur_fpa,
+ agent->max_fpa, spoe_appctx_state_str[appctx->st0],
+ SPOE_APPCTX(appctx)->flags);
- if (fpa > agent->max_fpa)
- goto stop;
- else if (skip_sending || appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) {
- if (skip_receiving)
- goto stop;
- goto recv_frame;
- }
+ if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
+ skip_sending = 1;
- /* send_frame */
- ret = spoe_handle_sending_frame_appctx(appctx, &skip_sending);
- switch (ret) {
- case -1: /* error */
- goto next;
+ /* receiving_frame loop */
+ while (!skip_receiving) {
+ ret = spoe_handle_receiving_frame_appctx(appctx, &skip_receiving);
+ switch (ret) {
+ case -1: /* error */
+ goto next;
- case 0: /* ignore */
- update_freq_ctr(&agent->rt[tid].processing_per_sec, 1);
- fpa++;
- break;
+ case 0: /* ignore */
+ active_r = 1;
+ break;
- case 1: /* retry */
- break;
+ case 1: /* retry */
+ break;
- default:
- update_freq_ctr(&agent->rt[tid].processing_per_sec, 1);
- fpa++;
- break;
+ default:
+ active_r = 1;
+ break;
+ }
}
- if (fpa > agent->max_fpa)
- goto stop;
- recv_frame:
- if (skip_receiving)
- goto process;
- ret = spoe_handle_receiving_frame_appctx(appctx, &skip_receiving);
- switch (ret) {
- case -1: /* error */
- goto next;
+ /* send_frame loop */
+ while (!skip_sending && SPOE_APPCTX(appctx)->cur_fpa < agent->max_fpa) {
+ ret = spoe_handle_sending_frame_appctx(appctx, &skip_sending);
+ switch (ret) {
+ case -1: /* error */
+ goto next;
- case 0: /* ignore */
- fpa++;
- break;
+ case 0: /* ignore */
+ active_s++;
+ break;
- case 1: /* retry */
- break;
+ case 1: /* retry */
+ break;
- default:
- fpa++;
- break;
+ default:
+ active_s++;
+ break;
+ }
}
- goto process;
- next:
- SPOE_APPCTX(appctx)->task->expire =
- tick_add_ifset(now_ms, agent->timeout.idle);
- return 0;
- stop:
- if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING) {
- appctx->st0 = SPOE_APPCTX_ST_IDLE;
- agent->rt[tid].applets_idle++;
- }
- if (fpa) {
+ if (active_s || active_r) {
HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
LIST_DEL(&SPOE_APPCTX(appctx)->list);
LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
- if (fpa)
- SPOE_APPCTX(appctx)->task->expire =
- tick_add_ifset(now_ms, agent->timeout.idle);
+
+ update_freq_ctr(&agent->rt[tid].processing_per_sec, active_s);
+ SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+ }
+ if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING && SPOE_APPCTX(appctx)->cur_fpa < agent->max_fpa) {
+ appctx->st0 = SPOE_APPCTX_ST_IDLE;
+ agent->rt[tid].applets_idle++;
}
return 1;
+ next:
+ SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+ return 0;
+
exit:
appctx->st0 = SPOE_APPCTX_ST_EXIT;
return 0;
@@ -1935,6 +1927,7 @@
SPOE_APPCTX(appctx)->flags = 0;
SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NONE;
SPOE_APPCTX(appctx)->buffer = &buf_empty;
+ SPOE_APPCTX(appctx)->cur_fpa = 0;
LIST_INIT(&SPOE_APPCTX(appctx)->buffer_wait.list);
SPOE_APPCTX(appctx)->buffer_wait.target = appctx;