MINOR: spoe: Add 'timeout processing' option to limit time to process an event
It is a way to set the maximum time to wait for a stream to process an event,
i.e to acquire a stream to talk with an agent, to encode all messages, to send
the NOTIFY frame, to receive the corrsponding acknowledgement and to process all
actions. It is applied on the stream that handle the client and the server
sessions.
diff --git a/doc/SPOE.txt b/doc/SPOE.txt
index fa0a533..f9aac71 100644
--- a/doc/SPOE.txt
+++ b/doc/SPOE.txt
@@ -158,7 +158,7 @@
following keywords are supported :
- messages
- option var-prefix
- - timeout hello|idle|ack
+ - timeout hello|idle|ack|processing
- use-backend
@@ -200,7 +200,8 @@
timeout ack <timeout>
Set the maximum time to wait for an agent to receive the acknowledgement to a
- NOTIFY frame.
+ NOTIFY frame. It is applied on the stream that handle the connection with the
+ agent.
Arguments :
<timeout> is the timeout value specified in milliseconds by default, but
@@ -210,6 +211,7 @@
timeout hello <timeout>
Set the maximum time to wait for an agent to receive the AGENT-HELLO frame.
+ It is applied on the stream that handle the connection with the agent.
Arguments :
<timeout> is the timeout value specified in milliseconds by default, but
@@ -221,7 +223,21 @@
timeout idle <timeout>
- Set the maximum time to wait for an agent to close an idle connection.
+ Set the maximum time to wait for an agent to close an idle connection. It is
+ applied on the stream that handle the connection with the agent.
+
+ Arguments :
+ <timeout> is the timeout value specified in milliseconds by default, but
+ can be in any other unit if the number is suffixed by the unit,
+ as explained at the top of this document.
+
+
+timeout processing <timeout>
+ Set the maximum time to wait for a stream to process an event, i.e to acquire
+ a stream to talk with an agent, to encode all messages, to send the NOTIFY
+ frame, to receive the corrsponding acknowledgement and to process all
+ actions. It is applied on the stream that handle the client and the server
+ sessions.
Arguments :
<timeout> is the timeout value specified in milliseconds by default, but
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 12e589e..b3f2ef3 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -190,9 +190,10 @@
char *name; /* Backend name used during conf parsing */
} b;
struct {
- unsigned int hello; /* Max time to receive AGENT-HELLO frame */
- unsigned int idle; /* Max Idle timeout */
- unsigned int ack; /* Max time to acknowledge a NOTIFY frame */
+ unsigned int hello; /* Max time to receive AGENT-HELLO frame (in SPOE applet) */
+ unsigned int idle; /* Max Idle timeout (in SPOE applet) */
+ unsigned int ack; /* Max time to acknowledge a NOTIFY frame (in SPOE applet)*/
+ unsigned int processing; /* Max time to process an event (in the main stream) */
} timeout;
char *var_pfx; /* Prefix used for vars set by the agent */
@@ -232,7 +233,7 @@
unsigned int stream_id; /* stream_id and frame_id are used */
unsigned int frame_id; /* to map NOTIFY and ACK frames */
-
+ unsigned int process_exp; /* expiration date to process an event */
};
/* Set if the handle on SIGUSR1 is registered */
@@ -1636,6 +1637,9 @@
{
struct spoe_context *ctx;
+ if (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING)
+ return;
+
if (LIST_ISEMPTY(&agent->applet_wq))
LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
else {
@@ -1787,6 +1791,9 @@
/* Reset the flag to allow next processing */
ctx->flags &= ~SPOE_CTX_FL_PROCESS;
+ /* Reset processing timer */
+ ctx->process_exp = TICK_ETERNITY;
+
/* Release the buffer if needed */
if (ctx->buffer != &buf_empty) {
b_free(&ctx->buffer);
@@ -2084,13 +2091,14 @@
process_spoe_event(struct stream *s, struct spoe_context *ctx,
enum spoe_event ev)
{
- int dir, ret = 1;
+ struct spoe_config *conf = FLT_CONF(ctx->filter);
+ struct spoe_agent *agent = conf->agent;
+ int dir, ret = 1;
SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
" - ctx-state=%s - event=%s\n",
(int)now.tv_sec, (int)now.tv_usec,
- ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
- __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
+ agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
spoe_event_str[ev]);
dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
@@ -2100,8 +2108,25 @@
if (ctx->state == SPOE_CTX_ST_ERROR)
goto error;
+
+ if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - failed to process event '%s': timeout\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ agent->id, __FUNCTION__, s, spoe_event_str[ev]);
+ send_log(ctx->strm->be, LOG_WARNING,
+ "failed to process event '%s': timeout.\n",
+ spoe_event_str[ev]);
+ goto error;
+ }
if (ctx->state == SPOE_CTX_ST_READY) {
+ if (!tick_isset(ctx->process_exp)) {
+ ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
+ s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
+ ctx->process_exp);
+ }
+
ret = acquire_spoe_appctx(ctx, dir);
if (ret <= 0) {
if (!ret)
@@ -2182,8 +2207,9 @@
LIST_INIT(&ctx->buffer_wait);
LIST_INIT(&ctx->applet_wait);
- ctx->stream_id = 0;
- ctx->frame_id = 1;
+ ctx->stream_id = 0;
+ ctx->frame_id = 1;
+ ctx->process_exp = TICK_ETERNITY;
return ctx;
}
@@ -2399,6 +2425,19 @@
}
}
+
+/*
+ * Called when the stream is woken up because of expired timer.
+ */
+static void
+spoe_check_timeouts(struct stream *s, struct filter *filter)
+{
+ struct spoe_context *ctx = filter->ctx;
+
+ if (tick_is_expired(ctx->process_exp, now_ms))
+ s->task->state |= TASK_WOKEN_MSG;
+}
+
/* Called when we are ready to filter data on a channel */
static int
spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
@@ -2528,8 +2567,9 @@
.check = spoe_check,
/* Handle start/stop of SPOE */
- .attach = spoe_start,
- .detach = spoe_stop,
+ .attach = spoe_start,
+ .detach = spoe_stop,
+ .check_timeouts = spoe_check_timeouts,
/* Handle channels activity */
.channel_start_analyze = spoe_start_analyze,
@@ -2589,6 +2629,7 @@
curagent->timeout.hello = TICK_ETERNITY;
curagent->timeout.ack = TICK_ETERNITY;
curagent->timeout.idle = TICK_ETERNITY;
+ curagent->timeout.processing = TICK_ETERNITY;
curagent->var_pfx = NULL;
curagent->new_applets = 0;
@@ -2654,8 +2695,10 @@
tv = &curagent->timeout.idle;
else if (!strcmp(args[1], "ack"))
tv = &curagent->timeout.ack;
+ else if (!strcmp(args[1], "processing"))
+ tv = &curagent->timeout.processing;
else {
- Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' and 'ack' (got %s).\n",
+ Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle', 'ack' or 'processing' (got %s).\n",
file, linenum, args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto out;
@@ -2956,13 +2999,17 @@
curagent->id, curagent->conf.file, curagent->conf.line);
goto error;
}
- if (curagent->timeout.hello == TICK_ETERNITY ||
- curagent->timeout.idle == TICK_ETERNITY ||
- curagent->timeout.ack == TICK_ETERNITY) {
+ if (curagent->timeout.hello == TICK_ETERNITY ||
+ curagent->timeout.idle == TICK_ETERNITY ||
+ curagent->timeout.ack == TICK_ETERNITY ||
+ curagent->timeout.processing == TICK_ETERNITY) {
+ if (curagent->timeout.ack == TICK_ETERNITY)
+ curagent->timeout.ack = curagent->timeout.idle;
+
Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
" | While not properly invalid, you will certainly encounter various problems\n"
" | with such a configuration. To fix this, please ensure that all following\n"
- " | timeouts are set to a non-zero value: 'hello', 'idle', 'ack'.\n",
+ " | timeouts are set to a non-zero value: 'hello', 'idle', 'ack', 'processing'.\n",
px->id, curagent->id, curagent->conf.file, curagent->conf.line);
}
if (curagent->var_pfx == NULL) {