BUG/MAJOR: threads/queue: Fix thread-safety issues on the queues management

The management of the servers and the proxies queues was not thread-safe at
all. First, the accesses to <strm>->pend_pos were not protected. So it was
possible to release it on a thread (for instance because the stream is released)
and to use it in same time on another one (because we redispatch pending
connections for a server). Then, the accesses to stream's information (flags and
target) from anywhere is forbidden. To be safe, The stream's state must always
be updated in the context of process_stream.

So to fix these issues, the queue module has been refactored. A lock has been
added in the pendconn structure. And now, when we try to dequeue a pending
connection, we start by unlinking it from the server/proxy queue and we wake up
the stream. Then, it is the stream reponsibility to really dequeue it (or
release it). This way, we are sure that only the stream can create and release
its <pend_pos> field.

However, be careful. This new implementation should be thread-safe
(hopefully...). But it is not optimal and in some situations, it could be really
slower in multi-threaded mode than in single-threaded one. The problem is that,
when we try to dequeue pending connections, we process it from the older one to
the newer one independently to the thread's affinity. So we need to wait the
other threads' wakeup to really process them. If threads are blocked in the
poller, this will add a significant latency. This problem happens when maxconn
values are very low.

This patch must be backported in 1.8.
diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 30009cc..2620b77 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -292,6 +292,7 @@
 	PIPES_LOCK,
 	START_LOCK,
 	TLSKEYS_REF_LOCK,
+	PENDCONN_LOCK,
 	LOCK_LABELS
 };
 struct lock_stat {
@@ -409,6 +410,7 @@
 	case PIPES_LOCK:           return "PIPES";
 	case START_LOCK:           return "START";
 	case TLSKEYS_REF_LOCK:     return "TLSKEYS_REF";
+	case PENDCONN_LOCK:        return "PENDCONN";
 	case LOCK_LABELS:          break; /* keep compiler happy */
 	};
 	/* only way to come here is consecutive to an internal bug */
diff --git a/include/proto/queue.h b/include/proto/queue.h
index f66d809..2d4773a 100644
--- a/include/proto/queue.h
+++ b/include/proto/queue.h
@@ -38,6 +38,7 @@
 
 int init_pendconn();
 struct pendconn *pendconn_add(struct stream *strm);
+int pendconn_dequeue(struct stream *strm);
 void pendconn_free(struct pendconn *p);
 void process_srv_queue(struct server *s);
 unsigned int srv_dynamic_maxconn(const struct server *s);
diff --git a/include/types/queue.h b/include/types/queue.h
index 4b35451..42dbbd0 100644
--- a/include/types/queue.h
+++ b/include/types/queue.h
@@ -24,15 +24,19 @@
 
 #include <common/config.h>
 #include <common/mini-clist.h>
+#include <common/hathreads.h>
 
 #include <types/server.h>
 
 struct stream;
 
 struct pendconn {
-	struct list list;		/* chaining ... */
-	struct stream *strm;		/* the stream waiting for a connection */
-	struct server *srv;		/* the server we are waiting for */
+	int            strm_flags; /* stream flags */
+	struct stream *strm;
+	struct proxy  *px;
+	struct server *srv;        /* the server we are waiting for, may be NULL */
+	struct list    list;       /* next pendconn */
+	__decl_hathreads(HA_SPINLOCK_T lock);
 };
 
 #endif /* _TYPES_QUEUE_H */
diff --git a/include/types/stream.h b/include/types/stream.h
index 227b0ff..0dbc79f 100644
--- a/include/types/stream.h
+++ b/include/types/stream.h
@@ -124,7 +124,7 @@
 	struct session *sess;           /* the session this stream is attached to */
 
 	struct server *srv_conn;        /* stream already has a slot on a server and is not in queue */
-	struct pendconn *pend_pos;      /* if not NULL, points to the position in the pending queue */
+	struct pendconn *pend_pos;      /* if not NULL, points to the pending position in the pending queue */
 
 	struct http_txn *txn;           /* current HTTP transaction being processed. Should become a list. */
 
diff --git a/src/proto_http.c b/src/proto_http.c
index ae582b3..80e001d 100644
--- a/src/proto_http.c
+++ b/src/proto_http.c
@@ -8253,8 +8253,6 @@
 	s->store_count = 0;
 	s->uniq_id = global.req_count++;
 
-	s->pend_pos = NULL;
-
 	s->req.flags |= CF_READ_DONTWAIT; /* one read is usually enough */
 
 	/* We must trim any excess data from the response buffer, because we
diff --git a/src/queue.c b/src/queue.c
index 1dea7d5..aa40fba 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -24,8 +24,6 @@
 
 struct pool_head *pool_head_pendconn;
 
-static void __pendconn_free(struct pendconn *p);
-
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
 int init_pendconn()
 {
@@ -63,78 +61,99 @@
 	return max;
 }
 
-
-/* Returns the first pending connection for server <s>, which may be NULL if
- * nothing is pending.
+/* Remove the pendconn from the server/proxy queue. At this stage, the
+ * connection is not really dequeued. It will be done during the
+ * process_stream. This function must be called by function owning the locks on
+ * the pendconn _AND_ the server/proxy. It also decreases the pending count.
+ *
+ * The caller must own the lock on the pendconn _AND_ the queue containing the
+ * pendconn. The pendconn must still be queued.
  */
-static inline struct pendconn *pendconn_from_srv(const struct server *s) {
-	if (!s->nbpend)
-		return NULL;
-	return LIST_ELEM(s->pendconns.n, struct pendconn *, list);
-}
-
-/* Returns the first pending connection for proxy <px>, which may be NULL if
- * nothing is pending.
- */
-static inline struct pendconn *pendconn_from_px(const struct proxy *px) {
-	if (!px->nbpend)
-		return NULL;
-
-	return LIST_ELEM(px->pendconns.n, struct pendconn *, list);
+static void pendconn_unlink(struct pendconn *p)
+{
+	if (p->srv)
+		p->srv->nbpend--;
+	else
+		p->px->nbpend--;
+	HA_ATOMIC_SUB(&p->px->totpend, 1);
+	LIST_DEL(&p->list);
+	LIST_INIT(&p->list);
 }
 
-
-/* Detaches the next pending connection from either a server or a proxy, and
- * returns its associated stream. If no pending connection is found, NULL is
- * returned. Note that neither <srv> nor <px> may be NULL.
- * Priority is given to the oldest request in the queue if both <srv> and <px>
- * have pending requests. This ensures that no request will be left unserved.
- * The <px> queue is not considered if the server (or a tracked server) is not
- * RUNNING, is disabled, or has a null weight (server going down). The <srv>
- * queue is still considered in this case, because if some connections remain
- * there, it means that some requests have been forced there after it was seen
- * down (eg: due to option persist).
- * The stream is immediately marked as "assigned", and both its <srv> and
- * <srv_conn> are set to <srv>,
+/* Process the next pending connection from either a server or a proxy, and
+ * returns 0 on success. If no pending connection is found, 1 is returned.
+ * Note that neither <srv> nor <px> may be NULL.  Priority is given to the
+ * oldest request in the queue if both <srv> and <px> have pending
+ * requests. This ensures that no request will be left unserved.  The <px> queue
+ * is not considered if the server (or a tracked server) is not RUNNING, is
+ * disabled, or has a null weight (server going down). The <srv> queue is still
+ * considered in this case, because if some connections remain there, it means
+ * that some requests have been forced there after it was seen down (eg: due to
+ * option persist).  The stream is immediately marked as "assigned", and both
+ * its <srv> and <srv_conn> are set to <srv>.
+ *
+ * This function must only be called if the server queue _AND_ the proxy queue
+ * are locked. Today it is only called by process_srv_queue.
  */
-static struct stream *pendconn_get_next_strm(struct server *srv, struct proxy *px)
+static int pendconn_process_next_strm(struct server *srv, struct proxy *px)
 {
-	struct pendconn *ps, *pp;
-	struct stream *strm;
-	struct server *rsrv;
+	struct pendconn *p = NULL;
+	struct server   *rsrv;
 
 	rsrv = srv->track;
 	if (!rsrv)
 		rsrv = srv;
 
-	ps = pendconn_from_srv(srv);
-	pp = pendconn_from_px(px);
-	/* we want to get the definitive pendconn in <ps> */
-	if (!pp || !srv_currently_usable(rsrv)) {
-		if (!ps)
-			return NULL;
-	} else {
-		/* pendconn exists in the proxy queue */
-		if (!ps || tv_islt(&pp->strm->logs.tv_request, &ps->strm->logs.tv_request))
-			ps = pp;
+	if (srv->nbpend) {
+		list_for_each_entry(p, &srv->pendconns, list) {
+			if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &p->lock))
+				goto ps_found;
+		}
+		p = NULL;
 	}
-	strm = ps->strm;
-	__pendconn_free(ps);
 
-	/* we want to note that the stream has now been assigned a server */
-	strm->flags |= SF_ASSIGNED;
-	strm->target = &srv->obj_type;
-	__stream_add_srv_conn(strm, srv);
+  ps_found:
+	if (srv_currently_usable(rsrv) && px->nbpend) {
+		struct pendconn *pp;
+
+		list_for_each_entry(pp, &px->pendconns, list) {
+			/* If the server pendconn is older than the proxy one,
+			 * we process the server one. */
+			if (p && !tv_islt(&pp->strm->logs.tv_request, &p->strm->logs.tv_request))
+				goto pendconn_found;
+
+			if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &pp->lock)) {
+				/* Let's switch from the server pendconn to the
+				 * proxy pendconn. Don't forget to unlock the
+				 * server pendconn, if any. */
+				if (p)
+					HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+				p = pp;
+				goto pendconn_found;
+			}
+		}
+	}
+
+	if (!p)
+		return 1;
+
+  pendconn_found:
+	pendconn_unlink(p);
+	p->strm_flags |= SF_ASSIGNED;
+	p->srv = srv;
+
 	HA_ATOMIC_ADD(&srv->served, 1);
 	HA_ATOMIC_ADD(&srv->proxy->served, 1);
 	if (px->lbprm.server_take_conn)
 		px->lbprm.server_take_conn(srv);
+	__stream_add_srv_conn(p->strm, srv);
 
-	return strm;
+	task_wakeup(p->strm->task, TASK_WOKEN_RES);
+	HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+	return 0;
 }
 
-/*
- * Manages a server's connection queue. This function will try to dequeue as
+/* Manages a server's connection queue. This function will try to dequeue as
  * many pending streams as possible, and wake them up.
  */
 void process_srv_queue(struct server *s)
@@ -144,17 +163,10 @@
 
 	HA_SPIN_LOCK(PROXY_LOCK,  &p->lock);
 	HA_SPIN_LOCK(SERVER_LOCK, &s->lock);
-
-	/* First, check if we can handle some connections queued at the proxy. We
-	 * will take as many as we can handle.
-	 */
 	maxconn = srv_dynamic_maxconn(s);
 	while (s->served < maxconn) {
-		struct stream *strm = pendconn_get_next_strm(s, p);
-
-		if (strm == NULL)
+		if (pendconn_process_next_strm(s, p))
 			break;
-		task_wakeup(strm->task, TASK_WOKEN_RES);
 	}
 	HA_SPIN_UNLOCK(SERVER_LOCK, &s->lock);
 	HA_SPIN_UNLOCK(PROXY_LOCK,  &p->lock);
@@ -165,39 +177,50 @@
  * are updated accordingly. Returns NULL if no memory is available, otherwise the
  * pendconn itself. If the stream was already marked as served, its flag is
  * cleared. It is illegal to call this function with a non-NULL strm->srv_conn.
+ *
+ * This function must be called by the stream itself, so in the context of
+ * process_stream.
  */
 struct pendconn *pendconn_add(struct stream *strm)
 {
 	struct pendconn *p;
-	struct server *srv;
-	int count;
+	struct proxy    *px;
+	struct server   *srv;
 
 	p = pool_alloc(pool_head_pendconn);
 	if (!p)
 		return NULL;
 
-	strm->pend_pos = p;
-	p->strm = strm;
 	srv = objt_server(strm->target);
+	px  = strm->be;
+
+	p->srv        = NULL;
+	p->px         = px;
+	p->strm       = strm;
+	p->strm_flags = strm->flags;
+	HA_SPIN_INIT(&p->lock);
 
 	if ((strm->flags & SF_ASSIGNED) && srv) {
 		p->srv = srv;
 		HA_SPIN_LOCK(SERVER_LOCK, &srv->lock);
+		srv->nbpend++;
+		strm->logs.srv_queue_size += srv->nbpend;
+		if (srv->nbpend > srv->counters.nbpend_max)
+			srv->counters.nbpend_max = srv->nbpend;
 		LIST_ADDQ(&srv->pendconns, &p->list);
 		HA_SPIN_UNLOCK(SERVER_LOCK, &srv->lock);
-		count = HA_ATOMIC_ADD(&srv->nbpend, 1);
-		strm->logs.srv_queue_size += count;
-		HA_ATOMIC_UPDATE_MAX(&srv->counters.nbpend_max, count);
-	} else {
-		p->srv = NULL;
-		HA_SPIN_LOCK(PROXY_LOCK, &strm->be->lock);
-		LIST_ADDQ(&strm->be->pendconns, &p->list);
-		HA_SPIN_UNLOCK(PROXY_LOCK, &strm->be->lock);
-		count = HA_ATOMIC_ADD(&strm->be->nbpend, 1);
-		strm->logs.prx_queue_size += count;
-		HA_ATOMIC_UPDATE_MAX(&strm->be->be_counters.nbpend_max, count);
+	}
+	else {
+		HA_SPIN_LOCK(PROXY_LOCK, &px->lock);
+		px->nbpend++;
+		strm->logs.prx_queue_size += px->nbpend;
+		if (px->nbpend > px->be_counters.nbpend_max)
+			px->be_counters.nbpend_max = px->nbpend;
+		LIST_ADDQ(&px->pendconns, &p->list);
+		HA_SPIN_UNLOCK(PROXY_LOCK, &px->lock);
 	}
-	HA_ATOMIC_ADD(&strm->be->totpend, 1);
+	HA_ATOMIC_ADD(&px->totpend, 1);
+	strm->pend_pos = p;
 	return p;
 }
 
@@ -206,26 +229,28 @@
  */
 int pendconn_redistribute(struct server *s)
 {
-	struct pendconn *pc, *pc_bck;
+	struct pendconn *p, *pback;
 	int xferred = 0;
 
+	/* The REDISP option was specified. We will ignore cookie and force to
+	 * balance or use the dispatcher. */
+	if ((s->proxy->options & (PR_O_REDISP|PR_O_PERSIST)) != PR_O_REDISP)
+		return 0;
+
 	HA_SPIN_LOCK(SERVER_LOCK, &s->lock);
-	list_for_each_entry_safe(pc, pc_bck, &s->pendconns, list) {
-		struct stream *strm = pc->strm;
+	list_for_each_entry_safe(p, pback, &s->pendconns, list) {
+		if (p->strm_flags & SF_FORCE_PRST)
+			continue;
 
-		if ((strm->be->options & (PR_O_REDISP|PR_O_PERSIST)) == PR_O_REDISP &&
-		    !(strm->flags & SF_FORCE_PRST)) {
-			/* The REDISP option was specified. We will ignore
-			 * cookie and force to balance or use the dispatcher.
-			 */
+		if (HA_SPIN_TRYLOCK(PENDCONN_LOCK, &p->lock))
+			continue;
 
-			/* it's left to the dispatcher to choose a server */
-			strm->flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET);
+		/* it's left to the dispatcher to choose a server */
+		pendconn_unlink(p);
+		p->strm_flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET);
 
-			__pendconn_free(pc);
-			task_wakeup(strm->task, TASK_WOKEN_RES);
-			xferred++;
-		}
+		task_wakeup(p->strm->task, TASK_WOKEN_RES);
+		HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
 	}
 	HA_SPIN_UNLOCK(SERVER_LOCK, &s->lock);
 	return xferred;
@@ -238,65 +263,110 @@
  */
 int pendconn_grab_from_px(struct server *s)
 {
-	int xferred;
+	struct pendconn *p, *pback;
+	int maxconn, xferred = 0;
 
 	if (!srv_currently_usable(s))
 		return 0;
 
 	HA_SPIN_LOCK(PROXY_LOCK, &s->proxy->lock);
-	for (xferred = 0; !s->maxconn || xferred < srv_dynamic_maxconn(s); xferred++) {
-		struct stream *strm;
-		struct pendconn *p;
-
-		p = pendconn_from_px(s->proxy);
-		if (!p)
+	maxconn = srv_dynamic_maxconn(s);
+	list_for_each_entry_safe(p, pback, &s->proxy->pendconns, list) {
+		if (s->maxconn && s->served + xferred >= maxconn)
 			break;
-		p->strm->target = &s->obj_type;
-		strm = p->strm;
-		__pendconn_free(p);
-		task_wakeup(strm->task, TASK_WOKEN_RES);
+
+		if (HA_SPIN_TRYLOCK(PENDCONN_LOCK, &p->lock))
+			continue;
+
+		pendconn_unlink(p);
+		p->srv = s;
+
+		task_wakeup(p->strm->task, TASK_WOKEN_RES);
+		HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+		xferred++;
 	}
 	HA_SPIN_UNLOCK(PROXY_LOCK, &s->proxy->lock);
 	return xferred;
 }
 
-/*
- * Detaches pending connection <p>, decreases the pending count, and frees
- * the pending connection. The connection might have been queued to a specific
- * server as well as to the proxy. The stream also gets marked unqueued.
+/* Try to dequeue pending connection attached to the stream <strm>. It must
+ * always exists here. If the pendconn is still linked to the server or the
+ * proxy queue, nothing is done and the function returns 1. Otherwise,
+ * <strm>->flags and <strm>->target are updated, the pendconn is released and 0
+ * is returned.
+ *
+ * This function must be called by the stream itself, so in the context of
+ * process_stream.
  */
-void pendconn_free(struct pendconn *p)
+int pendconn_dequeue(struct stream *strm)
 {
-	if (p->srv) {
-		HA_SPIN_LOCK(SERVER_LOCK, &p->srv->lock);
-		LIST_DEL(&p->list);
-		HA_SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock);
-		HA_ATOMIC_SUB(&p->srv->nbpend, 1);
+	struct pendconn *p;
+
+	if (unlikely(!strm->pend_pos)) {
+		/* unexpected case because it is called by the stream itself and
+		 * only the stream can release a pendconn. So it is only
+		 * possible if a pendconn is released by someone else or if the
+		 * stream is supposed to be queued but without its associated
+		 * pendconn. In both cases it is a bug! */
+		abort();
 	}
-	else {
-		HA_SPIN_LOCK(SERVER_LOCK, &p->strm->be->lock);
-		LIST_DEL(&p->list);
-		HA_SPIN_UNLOCK(SERVER_LOCK, &p->strm->be->lock);
-		HA_ATOMIC_SUB(&p->strm->be->nbpend, 1);
+	p = strm->pend_pos;
+	HA_SPIN_LOCK(PENDCONN_LOCK, &p->lock);
+
+	/* the pendconn is still linked to the server/proxy queue, so unlock it
+	 * and go away. */
+	if (!LIST_ISEMPTY(&p->list)) {
+		HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+		return 1;
 	}
-	p->strm->pend_pos = NULL;
-	HA_ATOMIC_SUB(&p->strm->be->totpend, 1);
+
+	/* the pendconn must be dequeued now */
+	if (p->srv)
+		strm->target = &p->srv->obj_type;
+
+	strm->flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET);
+	strm->flags |= p->strm_flags & (SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET);
+	strm->pend_pos = NULL;
+	HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
 	pool_free(pool_head_pendconn, p);
+	return 0;
 }
 
-/* Lock-free version of pendconn_free. */
-static void __pendconn_free(struct pendconn *p)
+/* Release the pending connection <p>, and decreases the pending count if
+ * needed. The connection might have been queued to a specific server as well as
+ * to the proxy. The stream also gets marked unqueued. <p> must always be
+ * defined here. So it is the caller responsibility to check its existance.
+ *
+ * This function must be called by the stream itself, so in the context of
+ * process_stream.
+ */
+void pendconn_free(struct pendconn *p)
 {
+	struct stream *strm = p->strm;
+
+	HA_SPIN_LOCK(PENDCONN_LOCK, &p->lock);
+
+	/* The pendconn was already unlinked, just release it. */
+	if (LIST_ISEMPTY(&p->list))
+		goto release;
+
 	if (p->srv) {
+		HA_SPIN_LOCK(SERVER_LOCK, &p->srv->lock);
+		p->srv->nbpend--;
 		LIST_DEL(&p->list);
-		HA_ATOMIC_SUB(&p->srv->nbpend, 1);
+		HA_SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock);
 	}
 	else {
+		HA_SPIN_LOCK(PROXY_LOCK, &p->px->lock);
+		p->px->nbpend--;
 		LIST_DEL(&p->list);
-		HA_ATOMIC_SUB(&p->strm->be->nbpend, 1);
+		HA_SPIN_UNLOCK(PROXY_LOCK, &p->px->lock);
 	}
-	p->strm->pend_pos = NULL;
-	HA_ATOMIC_SUB(&p->strm->be->totpend, 1);
+	HA_ATOMIC_SUB(&p->px->totpend, 1);
+
+  release:
+	strm->pend_pos = NULL;
+	HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
 	pool_free(pool_head_pendconn, p);
 }
 
diff --git a/src/stream.c b/src/stream.c
index afa9f99..38d7240 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -929,7 +929,7 @@
 	}
 	else if (si->state == SI_ST_QUE) {
 		/* connection request was queued, check for any update */
-		if (!s->pend_pos) {
+		if (!pendconn_dequeue(s)) {
 			/* The connection is not in the queue anymore. Either
 			 * we have a server connection slot available and we
 			 * go directly to the assigned state, or we need to