MINOR: spoe: Support the async mode with several threads
A different engine-id is now generated for each thread. So, it is possible to
enable the async mode with several threads.
This patch may be backported to older versions.
(cherry picked from commit b1bb1afa4741a20e5bf954f0065ae7b747a3e219)
Signed-off-by: Christopher Faulet <cfaulet@haproxy.com>
diff --git a/include/types/spoe.h b/include/types/spoe.h
index 9bba492..2dbf6e5 100644
--- a/include/types/spoe.h
+++ b/include/types/spoe.h
@@ -244,7 +244,6 @@
} timeout;
/* Config info */
- char *engine_id; /* engine-id string */
char *var_pfx; /* Prefix used for vars set by the agent */
char *var_on_error; /* Variable to set when an error occurred, in the TXN scope */
char *var_t_process; /* Variable to set to report the processing time of the last event/group, in the TXN scope */
@@ -264,6 +263,7 @@
/* running info */
struct {
+ char *engine_id; /* engine-id string */
unsigned int frame_size; /* current maximum frame size, only used to encode messages */
unsigned int processing;
struct freq_ctr processing_per_sec;
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 3d2fcb1..f02ba45 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -172,7 +172,6 @@
free(agent->id);
free(agent->conf.file);
free(agent->var_pfx);
- free(agent->engine_id);
free(agent->var_on_error);
free(agent->var_t_process);
free(agent->var_t_total);
@@ -185,8 +184,10 @@
spoe_release_group(grp);
}
if (agent->rt) {
- for (i = 0; i < global.nbthread; ++i)
+ for (i = 0; i < global.nbthread; ++i) {
+ free(agent->rt[i].engine_id);
HA_SPIN_DESTROY(&agent->rt[i].lock);
+ }
}
free(agent->rt);
free(agent);
@@ -460,14 +461,14 @@
goto too_big;
/* (optionnal) "engine-id" K/V item, if present */
- if (agent != NULL && agent->engine_id != NULL) {
+ if (agent != NULL && agent->rt[tid].engine_id != NULL) {
sz = SLEN(ENGINE_ID_KEY);
if (spoe_encode_buffer(ENGINE_ID_KEY, sz, &p, end) == -1)
goto too_big;
*p++ = SPOE_DATA_T_STR;
- sz = strlen(agent->engine_id);
- if (spoe_encode_buffer(agent->engine_id, sz, &p, end) == -1)
+ sz = strlen(agent->rt[tid].engine_id);
+ if (spoe_encode_buffer(agent->rt[tid].engine_id, sz, &p, end) == -1)
goto too_big;
}
@@ -3089,16 +3090,13 @@
return 1;
}
- /* finish per-thread agent initialization */
- if (global.nbthread == 1)
- conf->agent->flags |= SPOE_FL_ASYNC;
-
if ((conf->agent->rt = calloc(global.nbthread, sizeof(*conf->agent->rt))) == NULL) {
ha_alert("Proxy %s : out of memory initializing SPOE agent '%s' declared at %s:%d.\n",
px->id, conf->agent->id, conf->agent->conf.file, conf->agent->conf.line);
return 1;
}
for (i = 0; i < global.nbthread; ++i) {
+ conf->agent->rt[i].engine_id = NULL;
conf->agent->rt[i].frame_size = conf->agent->max_frame_size;
conf->agent->rt[i].processing = 0;
LIST_INIT(&conf->agent->rt[i].applets);
@@ -3121,12 +3119,13 @@
struct spoe_config *conf = fconf->conf;
struct spoe_agent *agent = conf->agent;
- if (agent->engine_id == NULL) {
+ /* Use a != seed per process */
+ if (relative_pid > 1 && tid == 0)
srandom(now_ms * pid);
- agent->engine_id = generate_pseudo_uuid();
- if (agent->engine_id == NULL)
- return -1;
- }
+
+ agent->rt[tid].engine_id = generate_pseudo_uuid();
+ if (agent->rt[tid].engine_id == NULL)
+ return -1;
return 0;
}
@@ -3394,12 +3393,11 @@
curagent->timeout.idle = TICK_ETERNITY;
curagent->timeout.processing = TICK_ETERNITY;
- curagent->engine_id = NULL;
curagent->var_pfx = NULL;
curagent->var_on_error = NULL;
curagent->var_t_process = NULL;
curagent->var_t_total = NULL;
- curagent->flags = (SPOE_FL_PIPELINING | SPOE_FL_SND_FRAGMENTATION);
+ curagent->flags = (SPOE_FL_ASYNC | SPOE_FL_PIPELINING | SPOE_FL_SND_FRAGMENTATION);
curagent->cps_max = 0;
curagent->eps_max = 0;
curagent->max_frame_size = MAX_FRAME_SIZE;
@@ -3544,15 +3542,8 @@
goto out;
if (kwm == 1)
curagent->flags &= ~SPOE_FL_ASYNC;
- else {
- if (global.nbthread == 1)
- curagent->flags |= SPOE_FL_ASYNC;
- else {
- ha_warning("parsing [%s:%d] Async option is not supported with threads.\n",
- file, linenum);
- err_code |= ERR_WARN;
- }
- }
+ else
+ curagent->flags |= SPOE_FL_ASYNC;
goto out;
}
else if (!strcmp(args[1], "send-frag-payload")) {