MEDIUM: threads/queue: Make queues thread-safe

The list of pending connections are now protected using the proxy or server
lock, depending on the context.
diff --git a/src/queue.c b/src/queue.c
index 95b8eda..93d3e94 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -13,6 +13,7 @@
 #include <common/config.h>
 #include <common/memory.h>
 #include <common/time.h>
+#include <common/hathreads.h>
 
 #include <proto/queue.h>
 #include <proto/server.h>
@@ -23,6 +24,8 @@
 
 struct pool_head *pool2_pendconn;
 
+static void __pendconn_free(struct pendconn *p);
+
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
 int init_pendconn()
 {
@@ -116,7 +119,7 @@
 			ps = pp;
 	}
 	strm = ps->strm;
-	pendconn_free(ps);
+	__pendconn_free(ps);
 
 	/* we want to note that the stream has now been assigned a server */
 	strm->flags |= SF_ASSIGNED;
@@ -139,10 +142,12 @@
 	struct proxy  *p = s->proxy;
 	int maxconn;
 
+	SPIN_LOCK(PROXY_LOCK,  &p->lock);
+	SPIN_LOCK(SERVER_LOCK, &s->lock);
+
 	/* First, check if we can handle some connections queued at the proxy. We
 	 * will take as many as we can handle.
 	 */
-
 	maxconn = srv_dynamic_maxconn(s);
 	while (s->served < maxconn) {
 		struct stream *strm = pendconn_get_next_strm(s, p);
@@ -151,6 +156,8 @@
 			break;
 		task_wakeup(strm->task, TASK_WOKEN_RES);
 	}
+	SPIN_UNLOCK(SERVER_LOCK, &s->lock);
+	SPIN_UNLOCK(PROXY_LOCK,  &p->lock);
 }
 
 /* Adds the stream <strm> to the pending connection list of server <strm>->srv
@@ -163,6 +170,7 @@
 {
 	struct pendconn *p;
 	struct server *srv;
+	int count;
 
 	p = pool_alloc2(pool2_pendconn);
 	if (!p)
@@ -170,21 +178,26 @@
 
 	strm->pend_pos = p;
 	p->strm = strm;
-	p->srv = srv = objt_server(strm->target);
+	srv = objt_server(strm->target);
 
-	if (strm->flags & SF_ASSIGNED && srv) {
+	if ((strm->flags & SF_ASSIGNED) && srv) {
+		p->srv = srv;
+		SPIN_LOCK(SERVER_LOCK, &srv->lock);
 		LIST_ADDQ(&srv->pendconns, &p->list);
-		srv->nbpend++;
-		strm->logs.srv_queue_size += srv->nbpend;
-		if (srv->nbpend > srv->counters.nbpend_max)
-			srv->counters.nbpend_max = srv->nbpend;
+		SPIN_UNLOCK(SERVER_LOCK, &srv->lock);
+		count = HA_ATOMIC_ADD(&srv->nbpend, 1);
+		strm->logs.srv_queue_size += count;
+		HA_ATOMIC_UPDATE_MAX(&srv->counters.nbpend_max, count);
 	} else {
+		p->srv = NULL;
+		SPIN_LOCK(PROXY_LOCK, &strm->be->lock);
 		LIST_ADDQ(&strm->be->pendconns, &p->list);
-		strm->be->nbpend++;
-		strm->logs.prx_queue_size += strm->be->nbpend;
-		HA_ATOMIC_UPDATE_MAX(&strm->be->be_counters.nbpend_max, strm->be->nbpend);
+		SPIN_UNLOCK(PROXY_LOCK, &strm->be->lock);
+		count = HA_ATOMIC_ADD(&strm->be->nbpend, 1);
+		strm->logs.prx_queue_size += count;
+		HA_ATOMIC_UPDATE_MAX(&strm->be->be_counters.nbpend_max, count);
 	}
-	strm->be->totpend++;
+	HA_ATOMIC_ADD(&strm->be->totpend, 1);
 	return p;
 }
 
@@ -196,6 +209,7 @@
 	struct pendconn *pc, *pc_bck;
 	int xferred = 0;
 
+	SPIN_LOCK(SERVER_LOCK, &s->lock);
 	list_for_each_entry_safe(pc, pc_bck, &s->pendconns, list) {
 		struct stream *strm = pc->strm;
 
@@ -208,11 +222,12 @@
 			/* it's left to the dispatcher to choose a server */
 			strm->flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET);
 
-			pendconn_free(pc);
+			__pendconn_free(pc);
 			task_wakeup(strm->task, TASK_WOKEN_RES);
 			xferred++;
 		}
 	}
+	SPIN_UNLOCK(SERVER_LOCK, &s->lock);
 	return xferred;
 }
 
@@ -228,6 +243,7 @@
 	if (!srv_currently_usable(s))
 		return 0;
 
+	SPIN_LOCK(PROXY_LOCK, &s->proxy->lock);
 	for (xferred = 0; !s->maxconn || xferred < srv_dynamic_maxconn(s); xferred++) {
 		struct stream *strm;
 		struct pendconn *p;
@@ -237,9 +253,10 @@
 			break;
 		p->strm->target = &s->obj_type;
 		strm = p->strm;
-		pendconn_free(p);
+		__pendconn_free(p);
 		task_wakeup(strm->task, TASK_WOKEN_RES);
 	}
+	SPIN_UNLOCK(PROXY_LOCK, &s->proxy->lock);
 	return xferred;
 }
 
@@ -250,16 +267,38 @@
  */
 void pendconn_free(struct pendconn *p)
 {
-	LIST_DEL(&p->list);
+	if (p->srv) {
+		SPIN_LOCK(SERVER_LOCK, &p->srv->lock);
+		LIST_DEL(&p->list);
+		SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock);
+		HA_ATOMIC_SUB(&p->srv->nbpend, 1);
+	}
+	else {
+		SPIN_LOCK(SERVER_LOCK, &p->strm->be->lock);
+		LIST_DEL(&p->list);
+		SPIN_UNLOCK(SERVER_LOCK, &p->strm->be->lock);
+		HA_ATOMIC_SUB(&p->strm->be->nbpend, 1);
+	}
 	p->strm->pend_pos = NULL;
-	if (p->srv)
-		p->srv->nbpend--;
-	else
-		p->strm->be->nbpend--;
-	p->strm->be->totpend--;
+	HA_ATOMIC_SUB(&p->strm->be->totpend, 1);
 	pool_free2(pool2_pendconn, p);
 }
 
+/* Lock-free version of pendconn_free. */
+static void __pendconn_free(struct pendconn *p)
+{
+	if (p->srv) {
+		LIST_DEL(&p->list);
+		HA_ATOMIC_SUB(&p->srv->nbpend, 1);
+	}
+	else {
+		LIST_DEL(&p->list);
+		HA_ATOMIC_SUB(&p->strm->be->nbpend, 1);
+	}
+	p->strm->pend_pos = NULL;
+	HA_ATOMIC_SUB(&p->strm->be->totpend, 1);
+	pool_free2(pool2_pendconn, p);
+}
 
 /*
  * Local variables: