MEDIUM: threads/queue: Make queues thread-safe
The list of pending connections are now protected using the proxy or server
lock, depending on the context.
diff --git a/src/queue.c b/src/queue.c
index 95b8eda..93d3e94 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -13,6 +13,7 @@
#include <common/config.h>
#include <common/memory.h>
#include <common/time.h>
+#include <common/hathreads.h>
#include <proto/queue.h>
#include <proto/server.h>
@@ -23,6 +24,8 @@
struct pool_head *pool2_pendconn;
+static void __pendconn_free(struct pendconn *p);
+
/* perform minimal intializations, report 0 in case of error, 1 if OK. */
int init_pendconn()
{
@@ -116,7 +119,7 @@
ps = pp;
}
strm = ps->strm;
- pendconn_free(ps);
+ __pendconn_free(ps);
/* we want to note that the stream has now been assigned a server */
strm->flags |= SF_ASSIGNED;
@@ -139,10 +142,12 @@
struct proxy *p = s->proxy;
int maxconn;
+ SPIN_LOCK(PROXY_LOCK, &p->lock);
+ SPIN_LOCK(SERVER_LOCK, &s->lock);
+
/* First, check if we can handle some connections queued at the proxy. We
* will take as many as we can handle.
*/
-
maxconn = srv_dynamic_maxconn(s);
while (s->served < maxconn) {
struct stream *strm = pendconn_get_next_strm(s, p);
@@ -151,6 +156,8 @@
break;
task_wakeup(strm->task, TASK_WOKEN_RES);
}
+ SPIN_UNLOCK(SERVER_LOCK, &s->lock);
+ SPIN_UNLOCK(PROXY_LOCK, &p->lock);
}
/* Adds the stream <strm> to the pending connection list of server <strm>->srv
@@ -163,6 +170,7 @@
{
struct pendconn *p;
struct server *srv;
+ int count;
p = pool_alloc2(pool2_pendconn);
if (!p)
@@ -170,21 +178,26 @@
strm->pend_pos = p;
p->strm = strm;
- p->srv = srv = objt_server(strm->target);
+ srv = objt_server(strm->target);
- if (strm->flags & SF_ASSIGNED && srv) {
+ if ((strm->flags & SF_ASSIGNED) && srv) {
+ p->srv = srv;
+ SPIN_LOCK(SERVER_LOCK, &srv->lock);
LIST_ADDQ(&srv->pendconns, &p->list);
- srv->nbpend++;
- strm->logs.srv_queue_size += srv->nbpend;
- if (srv->nbpend > srv->counters.nbpend_max)
- srv->counters.nbpend_max = srv->nbpend;
+ SPIN_UNLOCK(SERVER_LOCK, &srv->lock);
+ count = HA_ATOMIC_ADD(&srv->nbpend, 1);
+ strm->logs.srv_queue_size += count;
+ HA_ATOMIC_UPDATE_MAX(&srv->counters.nbpend_max, count);
} else {
+ p->srv = NULL;
+ SPIN_LOCK(PROXY_LOCK, &strm->be->lock);
LIST_ADDQ(&strm->be->pendconns, &p->list);
- strm->be->nbpend++;
- strm->logs.prx_queue_size += strm->be->nbpend;
- HA_ATOMIC_UPDATE_MAX(&strm->be->be_counters.nbpend_max, strm->be->nbpend);
+ SPIN_UNLOCK(PROXY_LOCK, &strm->be->lock);
+ count = HA_ATOMIC_ADD(&strm->be->nbpend, 1);
+ strm->logs.prx_queue_size += count;
+ HA_ATOMIC_UPDATE_MAX(&strm->be->be_counters.nbpend_max, count);
}
- strm->be->totpend++;
+ HA_ATOMIC_ADD(&strm->be->totpend, 1);
return p;
}
@@ -196,6 +209,7 @@
struct pendconn *pc, *pc_bck;
int xferred = 0;
+ SPIN_LOCK(SERVER_LOCK, &s->lock);
list_for_each_entry_safe(pc, pc_bck, &s->pendconns, list) {
struct stream *strm = pc->strm;
@@ -208,11 +222,12 @@
/* it's left to the dispatcher to choose a server */
strm->flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET);
- pendconn_free(pc);
+ __pendconn_free(pc);
task_wakeup(strm->task, TASK_WOKEN_RES);
xferred++;
}
}
+ SPIN_UNLOCK(SERVER_LOCK, &s->lock);
return xferred;
}
@@ -228,6 +243,7 @@
if (!srv_currently_usable(s))
return 0;
+ SPIN_LOCK(PROXY_LOCK, &s->proxy->lock);
for (xferred = 0; !s->maxconn || xferred < srv_dynamic_maxconn(s); xferred++) {
struct stream *strm;
struct pendconn *p;
@@ -237,9 +253,10 @@
break;
p->strm->target = &s->obj_type;
strm = p->strm;
- pendconn_free(p);
+ __pendconn_free(p);
task_wakeup(strm->task, TASK_WOKEN_RES);
}
+ SPIN_UNLOCK(PROXY_LOCK, &s->proxy->lock);
return xferred;
}
@@ -250,16 +267,38 @@
*/
void pendconn_free(struct pendconn *p)
{
- LIST_DEL(&p->list);
+ if (p->srv) {
+ SPIN_LOCK(SERVER_LOCK, &p->srv->lock);
+ LIST_DEL(&p->list);
+ SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock);
+ HA_ATOMIC_SUB(&p->srv->nbpend, 1);
+ }
+ else {
+ SPIN_LOCK(SERVER_LOCK, &p->strm->be->lock);
+ LIST_DEL(&p->list);
+ SPIN_UNLOCK(SERVER_LOCK, &p->strm->be->lock);
+ HA_ATOMIC_SUB(&p->strm->be->nbpend, 1);
+ }
p->strm->pend_pos = NULL;
- if (p->srv)
- p->srv->nbpend--;
- else
- p->strm->be->nbpend--;
- p->strm->be->totpend--;
+ HA_ATOMIC_SUB(&p->strm->be->totpend, 1);
pool_free2(pool2_pendconn, p);
}
+/* Lock-free version of pendconn_free. */
+static void __pendconn_free(struct pendconn *p)
+{
+ if (p->srv) {
+ LIST_DEL(&p->list);
+ HA_ATOMIC_SUB(&p->srv->nbpend, 1);
+ }
+ else {
+ LIST_DEL(&p->list);
+ HA_ATOMIC_SUB(&p->strm->be->nbpend, 1);
+ }
+ p->strm->pend_pos = NULL;
+ HA_ATOMIC_SUB(&p->strm->be->totpend, 1);
+ pool_free2(pool2_pendconn, p);
+}
/*
* Local variables: