MINOR: streams: use one list per stream instead of a global one
The global streams list is exclusively used for "show sess", to look up
a stream to shut down, and for the hard-stop. Having all of them in a
single list is extremely expensive in terms of locking when using threads,
with performance losses as high as 7% having been observed just due to
this.
This patch makes the list per-thread, since there's no need to have a
global one in this situation. All call places just iterate over all
threads. The most "invasive" changes was in "show sess" where the end
of list needs to go back to the beginning of next thread's list until
the last thread is seen. For now the lock was maintained to keep the
code auditable but a next commit should get rid of it.
The observed performance gain here with only 4 threads is already 7%
(350krps -> 374krps).
(cherry picked from commit a698eb6739b502d957da773e121ef40833f818dc)
Signed-off-by: Willy Tarreau <w@1wt.eu>
diff --git a/src/proxy.c b/src/proxy.c
index 861548e..1c746c8 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -1219,9 +1219,13 @@
}
thread_isolate();
- list_for_each_entry(s, &streams, list) {
- stream_shutdown(s, SF_ERR_KILLED);
+
+ for (thr = 0; thr < global.nbthread; thr++) {
+ list_for_each_entry(s, &ha_thread_info[thr].streams, list) {
+ stream_shutdown(s, SF_ERR_KILLED);
+ }
}
+
thread_release();
killed = 1;
diff --git a/src/stream.c b/src/stream.c
index b2ed69a..55bb5ce 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -65,8 +65,6 @@
/* incremented by each "show sess" to fix a delimiter between streams */
unsigned stream_epoch = 0;
-struct list streams = LIST_HEAD_INIT(streams);
-__decl_spinlock(streams_lock);
/* List of all use-service keywords. */
static struct list service_keywords = LIST_HEAD_INIT(service_keywords);
@@ -513,9 +511,9 @@
s->dns_ctx.hostname_dn_len = 0;
s->dns_ctx.parent = NULL;
- HA_SPIN_LOCK(STRMS_LOCK, &streams_lock);
- LIST_ADDQ(&streams, &s->list);
- HA_SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
+ HA_SPIN_LOCK(STRMS_LOCK, &ti->streams_lock);
+ LIST_ADDQ(&ti->streams, &s->list);
+ HA_SPIN_UNLOCK(STRMS_LOCK, &ti->streams_lock);
if (flt_stream_init(s) < 0 || flt_stream_start(s) < 0)
goto out_fail_accept;
@@ -673,19 +671,19 @@
stream_store_counters(s);
- HA_SPIN_LOCK(STRMS_LOCK, &streams_lock);
+ HA_SPIN_LOCK(STRMS_LOCK, &ti->streams_lock);
list_for_each_entry_safe(bref, back, &s->back_refs, users) {
/* we have to unlink all watchers. We must not relink them if
* this stream was the last one in the list.
*/
LIST_DEL(&bref->users);
LIST_INIT(&bref->users);
- if (s->list.n != &streams)
+ if (s->list.n != &ti->streams)
LIST_ADDQ(&LIST_ELEM(s->list.n, struct stream *, list)->back_refs, &bref->users);
bref->ref = s->list.n;
}
LIST_DEL(&s->list);
- HA_SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
+ HA_SPIN_UNLOCK(STRMS_LOCK, &ti->streams_lock);
/* applets do not release session yet */
must_free_sess = objt_appctx(sess->origin) && sess->origin == s->si[0].end;
@@ -2703,6 +2701,16 @@
abort();
}
+/* initialize the require structures */
+static void init_stream()
+{
+ int thr;
+
+ for (thr = 0; thr < MAX_THREADS; thr++)
+ LIST_INIT(&ha_thread_info[thr].streams);
+}
+INITCALL0(STG_INIT, init_stream);
+
/* Generates a unique ID based on the given <format>, stores it in the given <strm> and
* returns the unique ID.
@@ -3151,6 +3159,7 @@
appctx->ctx.sess.target = NULL;
appctx->ctx.sess.section = 0; /* start with stream status */
appctx->ctx.sess.pos = 0;
+ appctx->ctx.sess.thr = 0;
/* let's set our own stream's epoch to the current one and increment
* it so that we know which streams were already there before us.
@@ -3196,7 +3205,7 @@
* pointer points back to the head of the streams list.
*/
LIST_INIT(&appctx->ctx.sess.bref.users);
- appctx->ctx.sess.bref.ref = streams.n;
+ appctx->ctx.sess.bref.ref = ha_thread_info[appctx->ctx.sess.thr].streams.n;
appctx->st2 = STAT_ST_LIST;
/* fall through */
@@ -3208,15 +3217,27 @@
}
/* and start from where we stopped */
- while (appctx->ctx.sess.bref.ref != &streams) {
+ while (1) {
char pn[INET6_ADDRSTRLEN];
struct stream *curr_strm;
+ int done= 0;
- curr_strm = LIST_ELEM(appctx->ctx.sess.bref.ref, struct stream *, list);
+ if (appctx->ctx.sess.bref.ref == &ha_thread_info[appctx->ctx.sess.thr].streams)
+ done = 1;
+ else {
+ /* check if we've found a stream created after issuing the "show sess" */
+ curr_strm = LIST_ELEM(appctx->ctx.sess.bref.ref, struct stream *, list);
+ if ((int)(curr_strm->stream_epoch - si_strm(appctx->owner)->stream_epoch) > 0)
+ done = 1;
+ }
- /* check if we've found a stream created after issuing the "show sess" */
- if ((int)(curr_strm->stream_epoch - si_strm(appctx->owner)->stream_epoch) > 0)
- break;
+ if (done) {
+ appctx->ctx.sess.thr++;
+ if (appctx->ctx.sess.thr >= global.nbthread)
+ break;
+ appctx->ctx.sess.bref.ref = ha_thread_info[appctx->ctx.sess.thr].streams.n;
+ continue;
+ }
if (appctx->ctx.sess.target) {
if (appctx->ctx.sess.target != (void *)-1 && appctx->ctx.sess.target != curr_strm)
@@ -3389,11 +3410,11 @@
static void cli_release_show_sess(struct appctx *appctx)
{
- if (appctx->st2 == STAT_ST_LIST) {
- HA_SPIN_LOCK(STRMS_LOCK, &streams_lock);
+ if (appctx->st2 == STAT_ST_LIST && appctx->ctx.sess.thr < global.nbthread) {
+ HA_SPIN_LOCK(STRMS_LOCK, &ha_thread_info[appctx->ctx.sess.thr].streams_lock);
if (!LIST_ISEMPTY(&appctx->ctx.sess.bref.users))
LIST_DEL(&appctx->ctx.sess.bref.users);
- HA_SPIN_UNLOCK(STRMS_LOCK, &streams_lock);
+ HA_SPIN_UNLOCK(STRMS_LOCK, &ha_thread_info[appctx->ctx.sess.thr].streams_lock);
}
}
@@ -3401,6 +3422,7 @@
static int cli_parse_shutdown_session(char **args, char *payload, struct appctx *appctx, void *private)
{
struct stream *strm, *ptr;
+ int thr;
if (!cli_has_level(appctx, ACCESS_LVL_ADMIN))
return 1;
@@ -3409,21 +3431,24 @@
return cli_err(appctx, "Session pointer expected (use 'show sess').\n");
ptr = (void *)strtoul(args[2], NULL, 0);
+ strm = NULL;
thread_isolate();
/* first, look for the requested stream in the stream table */
- list_for_each_entry(strm, &streams, list) {
- if (strm == ptr) {
- stream_shutdown(strm, SF_ERR_KILLED);
- break;
+ for (thr = 0; !strm && thr < global.nbthread; thr++) {
+ list_for_each_entry(strm, &ha_thread_info[thr].streams, list) {
+ if (strm == ptr) {
+ stream_shutdown(strm, SF_ERR_KILLED);
+ break;
+ }
}
}
thread_release();
/* do we have the stream ? */
- if (strm != ptr)
+ if (!strm)
return cli_err(appctx, "No such session (use 'show sess').\n");
return 1;