BUG/MINOR: event_hdl: make event_hdl_subscribe thread-safe
List insertion in event_hdl_subscribe() was not thread-safe when dealing
with unique identifiers. Indeed, in this case the list insertion is
conditional (we check for a duplicate, then we insert). And while we're
using mt lists for this, the whole operation is not atomic: there is a
race between the check and the insertion.
This could lead to the same ID being registered multiple times with
concurrent calls to event_hdl_subscribe() on the same ID.
To fix this, we add a dedicated 'insert_lock' lock to the subscription
list struct. The lock's cost is nearly 0 since it is only used when
registering identified subscriptions and the lock window is very short:
we only guard the duplicate check and the list insertion to make the
conditional insertion "atomic" within a given subscription list.
This is the only place where we need the lock: as soon as the item is
properly inserted we're out of trouble because all other operations on
the list are already thread-safe thanks to mt lists.
A new lock hint is introduced: EHDL_LOCK, which is dedicated to event_hdl.
The patch may seem quite large since we had to rework the logic around
the subscribe function and switch from simple mt_list to a dedicated
struct wrapping both the mt_list and the insert_lock for the
event_hdl_sub_list type.
(sizeof(event_hdl_sub_list) is now 24 instead of 16)
However, all the changes are internal: we don't break the API.
If 68e692da0 ("MINOR: event_hdl: add event handler base api")
is being backported, then this commit should be backported with it.
diff --git a/src/event_hdl.c b/src/event_hdl.c
index 6444386..86fcbf5 100644
--- a/src/event_hdl.c
+++ b/src/event_hdl.c
@@ -43,14 +43,20 @@
DECLARE_STATIC_POOL(pool_head_sub_event_data, "ehdl_sub_ed", sizeof(struct event_hdl_async_event_data));
DECLARE_STATIC_POOL(pool_head_sub_taskctx, "ehdl_sub_tctx", sizeof(struct event_hdl_async_task_default_ctx));
-/* global subscription list (implicit where NULL is used as sublist argument) */
-static struct mt_list global_event_hdl_sub_list = MT_LIST_HEAD_INIT(global_event_hdl_sub_list);
-
/* TODO: will become a config tunable
* ie: tune.events.max-async-notif-at-once
*/
static int event_hdl_async_max_notif_at_once = 10;
+/* global subscription list, used whenever NULL is passed as sub_list */
+static event_hdl_sub_list global_event_hdl_sub_list;
+
+static void event_hdl_init(void)
+{
+ /* no static initializer anymore (list now embeds insert_lock): init at STG_INIT */
+ event_hdl_sub_list_init(&global_event_hdl_sub_list);
+}
+
/* general purpose hashing function when you want to compute
* an ID based on <scope> x <name>
* It is your responsibility to make sure <scope> is not used
@@ -372,10 +378,9 @@
struct event_hdl_sub *event_hdl_subscribe_ptr(event_hdl_sub_list *sub_list,
struct event_hdl_sub_type e_type, struct event_hdl hdl)
{
- struct event_hdl_sub *new_sub;
+ struct event_hdl_sub *new_sub = NULL;
struct mt_list *elt1, elt2;
- uint8_t found = 0;
- struct event_hdl_async_task_default_ctx *task_ctx;
+ struct event_hdl_async_task_default_ctx *task_ctx = NULL;
if (!sub_list)
sub_list = &global_event_hdl_sub_list; /* fall back to global list */
@@ -387,26 +392,9 @@
(hdl.async == EVENT_HDL_ASYNC_MODE_ADVANCED &&
(!hdl.async_equeue || !hdl.async_task)));
- /* first check if such identified hdl is not already registered */
- if (hdl.id) {
- mt_list_for_each_entry_safe(new_sub, sub_list, mt_list, elt1, elt2) {
- if (hdl.id == new_sub->hdl.id) {
- /* we found matching registered hdl */
- found = 1;
- break;
- }
- }
- }
-
- if (found) {
- /* error already registered */
- event_hdl_report_hdl_state(ha_warning, &hdl, "SUB", "could not subscribe: subscription with this id already exists");
- return NULL;
- }
-
new_sub = pool_alloc(pool_head_sub);
if (new_sub == NULL) {
- goto new_sub_memory_error;
+ goto memory_error;
}
/* assignments */
@@ -419,7 +407,7 @@
new_sub->async_end = pool_alloc(pool_head_sub_event);
if (!new_sub->async_end) {
/* memory error */
- goto new_sub_memory_error_event_end;
+ goto memory_error;
}
if (hdl.async == EVENT_HDL_ASYNC_MODE_NORMAL) {
/* normal mode: no task provided, we must initialize it */
@@ -429,7 +417,7 @@
if (!task_ctx) {
/* memory error */
- goto new_sub_memory_error_task_ctx;
+ goto memory_error;
}
MT_LIST_INIT(&task_ctx->e_queue);
task_ctx->func = new_sub->hdl.async_ptr;
@@ -439,13 +427,11 @@
if (!new_sub->hdl.async_task) {
/* memory error */
- goto new_sub_memory_error_task;
+ goto memory_error;
}
new_sub->hdl.async_task->context = task_ctx;
new_sub->hdl.async_task->process = event_hdl_async_task_default;
}
- /* registration cannot fail anymore */
-
/* initialize END event (used to notify about subscription ending)
* used by both normal and advanced mode:
* - to safely terminate the task in normal mode
@@ -464,32 +450,65 @@
*/
new_sub->refcount = 2;
+ /* ready for registration */
+ MT_LIST_INIT(&new_sub->mt_list);
+
+ /* check if such identified hdl is not already registered */
+ if (hdl.id) {
+ struct event_hdl_sub *cur_sub;
+ uint8_t found = 0;
+
+ HA_SPIN_LOCK(EHDL_LOCK, &sub_list->insert_lock);
+ mt_list_for_each_entry_safe(cur_sub, &sub_list->head, mt_list, elt1, elt2) {
+ if (hdl.id == cur_sub->hdl.id) {
+ /* we found matching registered hdl */
+ found = 1;
+ break;
+ }
+ }
+ if (found) {
+ /* error already registered */
+ HA_SPIN_UNLOCK(EHDL_LOCK, &sub_list->insert_lock);
+ event_hdl_report_hdl_state(ha_alert, &hdl, "SUB", "could not subscribe: subscription with this id already exists");
+ goto cleanup;
+ }
+ }
+
/* Append in list (global or user specified list).
* For now, append when sync mode, and insert when async mode
* so that async handlers are executed first
*/
- MT_LIST_INIT(&new_sub->mt_list);
if (hdl.async) {
/* async mode, insert at the beginning of the list */
- MT_LIST_INSERT(sub_list, &new_sub->mt_list);
+ MT_LIST_INSERT(&sub_list->head, &new_sub->mt_list);
} else {
/* sync mode, append at the end of the list */
- MT_LIST_APPEND(sub_list, &new_sub->mt_list);
+ MT_LIST_APPEND(&sub_list->head, &new_sub->mt_list);
}
- return new_sub;
+ if (hdl.id)
+ HA_SPIN_UNLOCK(EHDL_LOCK, &sub_list->insert_lock);
-new_sub_memory_error_task:
- pool_free(pool_head_sub_taskctx, task_ctx);
-new_sub_memory_error_task_ctx:
- pool_free(pool_head_sub_event, new_sub->async_end);
-new_sub_memory_error_event_end:
- pool_free(pool_head_sub, new_sub);
-new_sub_memory_error:
+ return new_sub;
- event_hdl_report_hdl_state(ha_warning, &hdl, "SUB", "could not register subscription due to memory error");
+ cleanup:
+ if (new_sub) {
+ if (hdl.async == EVENT_HDL_ASYNC_MODE_NORMAL) {
+ if (new_sub->hdl.async_task)
+ tasklet_free(new_sub->hdl.async_task);
+ if (task_ctx)
+ pool_free(pool_head_sub_taskctx, task_ctx);
+ }
+ if (hdl.async)
+ pool_free(pool_head_sub_event, new_sub->async_end);
+ pool_free(pool_head_sub, new_sub);
+ }
return NULL;
+
+ memory_error:
+ event_hdl_report_hdl_state(ha_warning, &hdl, "SUB", "could not register subscription due to memory error");
+ goto cleanup;
}
void event_hdl_take(struct event_hdl_sub *sub)
@@ -547,7 +566,7 @@
if (!sub_list)
sub_list = &global_event_hdl_sub_list; /* fall back to global list */
- mt_list_for_each_entry_safe(del_sub, sub_list, mt_list, elt1, elt2) {
+ mt_list_for_each_entry_safe(del_sub, &sub_list->head, mt_list, elt1, elt2) {
if (lookup_id == del_sub->hdl.id) {
/* we found matching registered hdl */
MT_LIST_DELETE_SAFE(elt1);
@@ -569,7 +588,7 @@
if (!sub_list)
sub_list = &global_event_hdl_sub_list; /* fall back to global list */
- mt_list_for_each_entry_safe(cur_sub, sub_list, mt_list, elt1, elt2) {
+ mt_list_for_each_entry_safe(cur_sub, &sub_list->head, mt_list, elt1, elt2) {
if (lookup_id == cur_sub->hdl.id) {
/* we found matching registered hdl */
status = _event_hdl_resub(cur_sub, type);
@@ -589,7 +608,7 @@
if (!sub_list)
sub_list = &global_event_hdl_sub_list; /* fall back to global list */
- mt_list_for_each_entry_safe(cur_sub, sub_list, mt_list, elt1, elt2) {
+ mt_list_for_each_entry_safe(cur_sub, &sub_list->head, mt_list, elt1, elt2) {
if (lookup_id == cur_sub->hdl.id) {
/* we found matching registered hdl */
event_hdl_take(cur_sub);
@@ -612,7 +631,7 @@
struct event_hdl_async_event_data *async_data = NULL; /* reuse async data for multiple async hdls */
int error = 0;
- mt_list_for_each_entry_safe(cur_sub, sub_list, mt_list, elt1, elt2) {
+ mt_list_for_each_entry_safe(cur_sub, &sub_list->head, mt_list, elt1, elt2) {
/* notify each function that has subscribed to sub_family.type */
if ((cur_sub->sub.family == e_type.family) &&
((cur_sub->sub.subtype & e_type.subtype) == e_type.subtype)) {
@@ -751,10 +770,13 @@
if (!sub_list)
sub_list = &global_event_hdl_sub_list; /* fall back to global list */
- mt_list_for_each_entry_safe(cur_sub, sub_list, mt_list, elt1, elt2) {
+ mt_list_for_each_entry_safe(cur_sub, &sub_list->head, mt_list, elt1, elt2) {
/* remove cur elem from list */
MT_LIST_DELETE_SAFE(elt1);
/* then free it */
_event_hdl_unsubscribe(cur_sub);
}
+ HA_SPIN_DESTROY(&sub_list->insert_lock);
}
+
+INITCALL0(STG_INIT, event_hdl_init);