MEDIUM: threads/stream: Make streams list thread safe
Adds a global lock to protect the full streams list used to dump
sessions on stats socket.
diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 1717cc9..81c2aad 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -157,6 +157,7 @@
APPLETS_LOCK,
PEER_LOCK,
BUF_WQ_LOCK,
+ STRMS_LOCK,
LOCK_LABELS
};
struct lock_stat {
@@ -243,7 +244,7 @@
"TASK_RQ", "TASK_WQ", "POOL",
"LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
"UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS",
- "APPLETS", "PEER", "BUF_WQ" };
+ "APPLETS", "PEER", "BUF_WQ", "STREAMS" };
int lbl;
for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
@@ -524,7 +525,6 @@
l->info.last_location.line = line;
__RWLOCK_WRUNLOCK(&l->lock);
-
HA_ATOMIC_ADD(&lock_stats[lbl].num_write_unlocked, 1);
}
diff --git a/src/stream.c b/src/stream.c
index 51d2354..889908f 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -63,6 +63,9 @@
struct pool_head *pool2_stream;
struct list streams;
+#ifdef USE_THREAD
+HA_SPINLOCK_T streams_lock;
+#endif
/* List of all use-service keywords. */
static struct list service_keywords = LIST_HEAD_INIT(service_keywords);
@@ -154,7 +157,6 @@
s->uniq_id = global.req_count++;
/* OK, we're keeping the stream, so let's properly initialize the stream */
- LIST_ADDQ(&streams, &s->list);
LIST_INIT(&s->back_refs);
LIST_INIT(&s->buffer_wait.list);
@@ -251,6 +253,10 @@
s->txn = NULL;
s->hlua = NULL;
+ SPIN_LOCK(STRMS_LOCK, &streams_lock);
+ LIST_ADDQ(&streams, &s->list);
+ SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
+
if (flt_stream_init(s) < 0 || flt_stream_start(s) < 0)
goto out_fail_accept;
@@ -369,6 +375,7 @@
stream_store_counters(s);
+ SPIN_LOCK(STRMS_LOCK, &streams_lock);
list_for_each_entry_safe(bref, back, &s->back_refs, users) {
/* we have to unlink all watchers. We must not relink them if
* this stream was the last one in the list.
@@ -380,6 +387,8 @@
bref->ref = s->list.n;
}
LIST_DEL(&s->list);
+ SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
+
si_release_endpoint(&s->si[1]);
si_release_endpoint(&s->si[0]);
@@ -462,6 +471,7 @@
int init_stream()
{
LIST_INIT(&streams);
+ SPIN_INIT(&streams_lock);
pool2_stream = create_pool("stream", sizeof(struct stream), MEM_F_SHARED);
return pool2_stream != NULL;
}
@@ -3039,11 +3049,14 @@
* pointer points back to the head of the streams list.
*/
LIST_INIT(&appctx->ctx.sess.bref.users);
+ SPIN_LOCK(STRMS_LOCK, &streams_lock);
appctx->ctx.sess.bref.ref = streams.n;
+ SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
appctx->st2 = STAT_ST_LIST;
/* fall through */
case STAT_ST_LIST:
+ SPIN_LOCK(STRMS_LOCK, &streams_lock);
/* first, let's detach the back-ref from a possible previous stream */
if (!LIST_ISEMPTY(&appctx->ctx.sess.bref.users)) {
LIST_DEL(&appctx->ctx.sess.bref.users);
@@ -3063,8 +3076,10 @@
LIST_ADDQ(&curr_strm->back_refs, &appctx->ctx.sess.bref.users);
/* call the proper dump() function and return if we're missing space */
- if (!stats_dump_full_strm_to_buffer(si, curr_strm))
+ if (!stats_dump_full_strm_to_buffer(si, curr_strm)) {
+ SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
return 0;
+ }
/* stream dump complete */
LIST_DEL(&appctx->ctx.sess.bref.users);
@@ -3190,6 +3205,7 @@
*/
si_applet_cant_put(si);
LIST_ADDQ(&curr_strm->back_refs, &appctx->ctx.sess.bref.users);
+ SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
return 0;
}
@@ -3211,9 +3227,11 @@
appctx->ctx.sess.target = NULL;
appctx->ctx.sess.uid = 0;
+ SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
return 1;
}
+ SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
appctx->st2 = STAT_ST_FIN;
/* fall through */
@@ -3226,8 +3244,10 @@
static void cli_release_show_sess(struct appctx *appctx)
{
if (appctx->st2 == STAT_ST_LIST) {
+ SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
if (!LIST_ISEMPTY(&appctx->ctx.sess.bref.users))
LIST_DEL(&appctx->ctx.sess.bref.users);
+ SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
}
}