BUG/MEDIUM: threads/queue: wake up other threads upon dequeue

The previous patch about queues (5cd4bbd7a "BUG/MAJOR: threads/queue: Fix
thread-safety issues on the queues management") revealed a performance drop when
multithreading is enabled (nbthread > 1). This happens when pending connections
handled by other threads are dequeued. If these other threads are blocked in
the poller, we have to wait for the poller's timeout (or any I/O event) to
process the dequeued connections.

To fix the problem, at least temporarily, we "wake up" the threads by
requesting a synchronization. Using the sync point just to wake up threads may
seem a bit overkill, but it fixes this performance issue. So we can now think
calmly about the right way to address this kind of issue.

This patch should be backported to 1.8 along with commit 5cd4bbd7a ("BUG/MAJOR:
threads/queue: Fix thread-safety issues on the queues management").
diff --git a/src/queue.c b/src/queue.c
index aa40fba..7cad813 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -81,24 +81,28 @@
 }
 
 /* Process the next pending connection from either a server or a proxy, and
- * returns 0 on success. If no pending connection is found, 1 is returned.
- * Note that neither <srv> nor <px> may be NULL.  Priority is given to the
- * oldest request in the queue if both <srv> and <px> have pending
- * requests. This ensures that no request will be left unserved.  The <px> queue
- * is not considered if the server (or a tracked server) is not RUNNING, is
- * disabled, or has a null weight (server going down). The <srv> queue is still
- * considered in this case, because if some connections remain there, it means
- * that some requests have been forced there after it was seen down (eg: due to
- * option persist).  The stream is immediately marked as "assigned", and both
- * its <srv> and <srv_conn> are set to <srv>.
+ * returns a strictly positive value on success (see below). If no pending
+ * connection is found, 0 is returned.  Note that neither <srv> nor <px> may be
+ * NULL.  Priority is given to the oldest request in the queue if both <srv> and
+ * <px> have pending requests. This ensures that no request will be left
+ * unserved.  The <px> queue is not considered if the server (or a tracked
+ * server) is not RUNNING, is disabled, or has a null weight (server going
+ * down). The <srv> queue is still considered in this case, because if some
+ * connections remain there, it means that some requests have been forced there
+ * after it was seen down (eg: due to option persist).  The stream is
+ * immediately marked as "assigned", and both its <srv> and <srv_conn> are set
+ * to <srv>.
  *
  * This function must only be called if the server queue _AND_ the proxy queue
- * are locked. Today it is only called by process_srv_queue.
+ * are locked. Today it is only called by process_srv_queue. When a pending
+ * connection is dequeued, this function returns 1 if the pending connection can
+ * be handled by the current thread, else it returns 2.
  */
 static int pendconn_process_next_strm(struct server *srv, struct proxy *px)
 {
 	struct pendconn *p = NULL;
 	struct server   *rsrv;
+	int remote;
 
 	rsrv = srv->track;
 	if (!rsrv)
@@ -135,7 +139,7 @@
 	}
 
 	if (!p)
-		return 1;
+		return 0;
 
   pendconn_found:
 	pendconn_unlink(p);
@@ -148,9 +152,12 @@
 		px->lbprm.server_take_conn(srv);
 	__stream_add_srv_conn(p->strm, srv);
 
+	remote = !(p->strm->task->thread_mask & tid_bit);
 	task_wakeup(p->strm->task, TASK_WOKEN_RES);
 	HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
-	return 0;
+
+	/* Returns 1 if the current thread can process the stream, otherwise returns 2. */
+	return remote ? 2 : 1;
 }
 
 /* Manages a server's connection queue. This function will try to dequeue as
@@ -159,17 +166,22 @@
 void process_srv_queue(struct server *s)
 {
 	struct proxy  *p = s->proxy;
-	int maxconn;
+	int maxconn, remote = 0;
 
 	HA_SPIN_LOCK(PROXY_LOCK,  &p->lock);
 	HA_SPIN_LOCK(SERVER_LOCK, &s->lock);
 	maxconn = srv_dynamic_maxconn(s);
 	while (s->served < maxconn) {
-		if (pendconn_process_next_strm(s, p))
+		int ret = pendconn_process_next_strm(s, p);
+		if (!ret)
 			break;
+		remote |= (ret == 2);
 	}
 	HA_SPIN_UNLOCK(SERVER_LOCK, &s->lock);
 	HA_SPIN_UNLOCK(PROXY_LOCK,  &p->lock);
+
+	if (remote)
+		thread_want_sync();
 }
 
 /* Adds the stream <strm> to the pending connection list of server <strm>->srv
@@ -231,6 +243,7 @@
 {
 	struct pendconn *p, *pback;
 	int xferred = 0;
+	int remote = 0;
 
 	/* The REDISP option was specified. We will ignore cookie and force to
 	 * balance or use the dispatcher. */
@@ -249,10 +262,15 @@
 		pendconn_unlink(p);
 		p->strm_flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET);
 
+		remote |= !(p->strm->task->thread_mask & tid_bit);
 		task_wakeup(p->strm->task, TASK_WOKEN_RES);
 		HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
 	}
 	HA_SPIN_UNLOCK(SERVER_LOCK, &s->lock);
+
+	if (remote)
+		thread_want_sync();
+
 	return xferred;
 }
 
@@ -265,6 +283,7 @@
 {
 	struct pendconn *p, *pback;
 	int maxconn, xferred = 0;
+	int remote = 0;
 
 	if (!srv_currently_usable(s))
 		return 0;
@@ -281,11 +300,16 @@
 		pendconn_unlink(p);
 		p->srv = s;
 
+		remote |= !(p->strm->task->thread_mask & tid_bit);
 		task_wakeup(p->strm->task, TASK_WOKEN_RES);
 		HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
 		xferred++;
 	}
 	HA_SPIN_UNLOCK(PROXY_LOCK, &s->proxy->lock);
+
+	if (remote)
+		thread_want_sync();
+
 	return xferred;
 }