MEDIUM: thread/vars: Make vars thread-safe
A RW lock has been added to the vars structure to protect each list of
variables. And a global RW lock is used to protect registered names.
When a varibable is fetched, we duplicate sample data because the variable could
be modified by another thread.
diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 1a14a62..df5b9ed 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -163,6 +163,7 @@
PATREF_LOCK,
PATEXP_LOCK,
PATLRU_LOCK,
+ VARS_LOCK,
LOCK_LABELS
};
struct lock_stat {
@@ -250,7 +251,7 @@
"LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
"UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS",
"APPLETS", "PEER", "BUF_WQ", "STREAMS", "SSL", "SSL_GEN_CERTS",
- "PATREF", "PATEXP", "PATLRU" };
+ "PATREF", "PATEXP", "PATLRU", "VARS" };
int lbl;
for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
diff --git a/include/types/vars.h b/include/types/vars.h
index cd1620c..8a4f7aa 100644
--- a/include/types/vars.h
+++ b/include/types/vars.h
@@ -2,6 +2,7 @@
#define _TYPES_VARS_H
#include <common/mini-clist.h>
+#include <common/hathreads.h>
#include <types/sample.h>
@@ -17,6 +18,9 @@
struct list head;
enum vars_scope scope;
unsigned int size;
+#ifdef USE_THREAD
+ HA_RWLOCK_T rwlock;
+#endif
};
/* This struct describes a variable. */
diff --git a/src/vars.c b/src/vars.c
index 8cc0839..6e8e256 100644
--- a/src/vars.c
+++ b/src/vars.c
@@ -31,6 +31,11 @@
static unsigned int var_txn_limit = 0;
static unsigned int var_reqres_limit = 0;
+
+#ifdef USE_THREAD
+HA_RWLOCK_T var_names_rwlock;
+#endif
+
/* This function adds or remove memory size from the accounting. The inner
* pointers may be null when setting the outer ones only.
*/
@@ -39,17 +44,17 @@
switch (vars->scope) {
case SCOPE_REQ:
case SCOPE_RES:
- strm->vars_reqres.size += size;
+ HA_ATOMIC_ADD(&strm->vars_reqres.size, size);
/* fall through */
case SCOPE_TXN:
- strm->vars_txn.size += size;
+ HA_ATOMIC_ADD(&strm->vars_txn.size, size);
/* fall through */
case SCOPE_SESS:
- sess->vars.size += size;
+ HA_ATOMIC_ADD(&sess->vars.size, size);
/* fall through */
case SCOPE_PROC:
- global.vars.size += size;
- var_global_size += size;
+ HA_ATOMIC_ADD(&global.vars.size, size);
+ HA_ATOMIC_ADD(&var_global_size, size);
}
}
@@ -113,9 +118,11 @@
struct var *var, *tmp;
unsigned int size = 0;
+ RWLOCK_WRLOCK(VARS_LOCK, &vars->rwlock);
list_for_each_entry_safe(var, tmp, &vars->head, l) {
size += var_clear(var);
}
+ RWLOCK_WRUNLOCK(VARS_LOCK, &vars->rwlock);
var_accounting_diff(vars, sess, strm, -size);
}
@@ -127,12 +134,15 @@
struct var *var, *tmp;
unsigned int size = 0;
+ RWLOCK_WRLOCK(VARS_LOCK, &vars->rwlock);
list_for_each_entry_safe(var, tmp, &vars->head, l) {
size += var_clear(var);
}
- vars->size -= size;
- global.vars.size -= size;
- var_global_size -= size;
+ RWLOCK_WRUNLOCK(VARS_LOCK, &vars->rwlock);
+
+ HA_ATOMIC_SUB(&vars->size, size);
+ HA_ATOMIC_SUB(&global.vars.size, size);
+ HA_ATOMIC_SUB(&var_global_size, size);
}
/* This function init a list of variabes. */
@@ -141,6 +151,7 @@
LIST_INIT(&vars->head);
vars->scope = scope;
vars->size = 0;
+ RWLOCK_INIT(&vars->rwlock);
}
/* This function declares a new variable name. It returns a pointer
@@ -160,11 +171,13 @@
int i;
char **var_names2;
const char *tmp;
+ char *res = NULL;
/* Check length. */
if (len == 0) {
memprintf(err, "Empty variable name cannot be accepted");
- return NULL;
+ res = NULL;
+ goto end;
}
/* Check scope. */
@@ -196,29 +209,42 @@
else {
memprintf(err, "invalid variable name '%s'. A variable name must be start by its scope. "
"The scope can be 'proc', 'sess', 'txn', 'req' or 'res'", name);
- return NULL;
+ res = NULL;
+ goto end;
}
+ if (alloc)
+ RWLOCK_WRLOCK(VARS_LOCK, &var_names_rwlock);
+ else
+ RWLOCK_RDLOCK(VARS_LOCK, &var_names_rwlock);
+
+
/* Look for existing variable name. */
for (i = 0; i < var_names_nb; i++)
- if (strncmp(var_names[i], name, len) == 0 && var_names[i][len] == '\0')
- return var_names[i];
+ if (strncmp(var_names[i], name, len) == 0 && var_names[i][len] == '\0') {
+ res = var_names[i];
+ goto end;
+ }
- if (!alloc)
- return NULL;
+ if (!alloc) {
+ res = NULL;
+ goto end;
+ }
/* Store variable name. If realloc fails, var_names remains valid */
var_names2 = realloc(var_names, (var_names_nb + 1) * sizeof(*var_names));
if (!var_names2) {
memprintf(err, "out of memory error");
- return NULL;
+ res = NULL;
+ goto end;
}
var_names_nb++;
var_names = var_names2;
var_names[var_names_nb - 1] = malloc(len + 1);
if (!var_names[var_names_nb - 1]) {
memprintf(err, "out of memory error");
- return NULL;
+ res = NULL;
+ goto end;
}
memcpy(var_names[var_names_nb - 1], name, len);
var_names[var_names_nb - 1][len] = '\0';
@@ -228,13 +254,20 @@
while (*tmp) {
if (!isalnum((int)(unsigned char)*tmp) && *tmp != '_' && *tmp != '.') {
memprintf(err, "invalid syntax at char '%s'", tmp);
- return NULL;
+ res = NULL;
+ goto end;
}
tmp++;
}
+ res = var_names[var_names_nb - 1];
- /* Return the result. */
- return var_names[var_names_nb - 1];
+ end:
+ if (alloc)
+ RWLOCK_WRUNLOCK(VARS_LOCK, &var_names_rwlock);
+ else
+ RWLOCK_RDUNLOCK(VARS_LOCK, &var_names_rwlock);
+
+ return res;
}
/* This function returns an existing variable or returns NULL. */
@@ -278,15 +311,23 @@
}
if (vars->scope != var_desc->scope)
return 0;
+
+ RWLOCK_RDLOCK(VARS_LOCK, &vars->rwlock);
var = var_get(vars, var_desc->name);
/* check for the variable avalaibility */
- if (!var)
+ if (!var) {
+ RWLOCK_RDUNLOCK(VARS_LOCK, &vars->rwlock);
return 0;
+ }
- /* Copy sample. */
+ /* Duplicate the sample data because it could modified by another
+ * thread */
smp->data = var->data;
+ smp_dup(smp);
smp->flags |= SMP_F_CONST;
+
+ RWLOCK_RDUNLOCK(VARS_LOCK, &vars->rwlock);
return 1;
}
@@ -384,6 +425,7 @@
static inline int sample_store_stream(const char *name, enum vars_scope scope, struct sample *smp)
{
struct vars *vars;
+ int ret;
switch (scope) {
case SCOPE_PROC: vars = &global.vars; break;
@@ -395,7 +437,11 @@
}
if (vars->scope != scope)
return 0;
- return sample_store(vars, name, smp);
+
+ RWLOCK_WRLOCK(VARS_LOCK, &vars->rwlock);
+ ret = sample_store(vars, name, smp);
+ RWLOCK_WRUNLOCK(VARS_LOCK, &vars->rwlock);
+ return ret;
}
/* Returns 0 if fails, else returns 1. Note that stream may be null for SCOPE_SESS. */
@@ -417,11 +463,13 @@
return 0;
/* Look for existing variable name. */
+ RWLOCK_WRLOCK(VARS_LOCK, &vars->rwlock);
var = var_get(vars, name);
if (var) {
size = var_clear(var);
var_accounting_diff(vars, smp->sess, smp->strm, -size);
}
+ RWLOCK_WRUNLOCK(VARS_LOCK, &vars->rwlock);
return 1;
}
@@ -853,7 +901,7 @@
}};
__attribute__((constructor))
-static void __http_protocol_init(void)
+static void __vars_init(void)
{
var_pool = create_pool("vars", sizeof(struct var), MEM_F_SHARED);
@@ -865,4 +913,6 @@
http_req_keywords_register(&http_req_kws);
http_res_keywords_register(&http_res_kws);
cfg_register_keywords(&cfg_kws);
+
+ RWLOCK_INIT(&var_names_rwlock);
}