MEDIUM: connections: Attempt to get idle connections from other threads.

In connect_server(), if we no longer have any idle connections for the
current thread, attempt to use the new "takeover" mux method to steal a
connection from another thread.
This should have no impact right now, given no mux implements it.
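
As an illustration only (not part of this patch), a mux wanting to
support migration would eventually expose the method roughly like the
sketch below. The names example_mux_takeover and example_mux_ops are
hypothetical and the stub simply refuses the migration; only the
calling convention (a single connection argument, 0 meaning success)
is taken from the call site in conn_backend_get():

    /* Hypothetical sketch, not part of this patch. A real takeover
     * implementation runs on the thread performing the takeover and
     * must re-home every thread-local resource attached to <conn>
     * (I/O tasklet, timeout task, polling state) onto that thread
     * before returning 0. Returning non-zero tells the caller to
     * leave the connection alone and keep scanning.
     */
    static int example_mux_takeover(struct connection *conn)
    {
            return -1; /* refuse: nothing is migrated in this sketch */
    }

    static const struct mux_ops example_mux_ops = {
            /* ... the mux's usual callbacks ... */
            .takeover = example_mux_takeover,
            .name     = "EXAMPLE",
    };

Since conn_backend_get() only calls ->takeover() when the pointer is
non-NULL, muxes that leave it unset (i.e. all of them for now) keep
their current strictly per-thread behaviour.
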
diff --git a/include/proto/connection.h b/include/proto/connection.h
index 65c99a1..f05a452 100644
--- a/include/proto/connection.h
+++ b/include/proto/connection.h
@@ -477,7 +477,8 @@
 	if (conn->idle_time > 0) {
 		struct server *srv = __objt_server(conn->target);
 		_HA_ATOMIC_SUB(&srv->curr_idle_conns, 1);
-		srv->curr_idle_thr[tid]--;
+		_HA_ATOMIC_SUB(conn->flags & CO_FL_SAFE_LIST ? &srv->curr_safe_nb : &srv->curr_idle_nb, 1);
+		_HA_ATOMIC_SUB(&srv->curr_idle_thr[tid], 1);
 	}
 
 	conn_force_unsubscribe(conn);
diff --git a/include/proto/server.h b/include/proto/server.h
index 7ba9c83..6eb4515 100644
--- a/include/proto/server.h
+++ b/include/proto/server.h
@@ -262,11 +262,16 @@
 			return 0;
 		}
 		MT_LIST_DEL(&conn->list);
-		conn->flags = (conn->flags &~ CO_FL_LIST_MASK) |
-		              (is_safe ? CO_FL_SAFE_LIST : CO_FL_IDLE_LIST);
-		MT_LIST_ADDQ(is_safe ? &srv->safe_conns[tid] : &srv->idle_conns[tid],
-		             (struct mt_list *)&conn->list);
-		srv->curr_idle_thr[tid]++;
+		if (is_safe) {
+			conn->flags = (conn->flags & ~CO_FL_LIST_MASK) | CO_FL_SAFE_LIST;
+			MT_LIST_ADDQ(&srv->safe_conns[tid], (struct mt_list *)&conn->list);
+			_HA_ATOMIC_ADD(&srv->curr_safe_nb, 1);
+		} else {
+			conn->flags = (conn->flags & ~CO_FL_LIST_MASK) | CO_FL_IDLE_LIST;
+			MT_LIST_ADDQ(&srv->idle_conns[tid], (struct mt_list *)&conn->list);
+			_HA_ATOMIC_ADD(&srv->curr_idle_nb, 1);
+		}
+		_HA_ATOMIC_ADD(&srv->curr_idle_thr[tid], 1);
 
 		conn->idle_time = now_ms;
 		__ha_barrier_full();
diff --git a/include/types/server.h b/include/types/server.h
index c29fd18..0ffebb4 100644
--- a/include/types/server.h
+++ b/include/types/server.h
@@ -227,7 +227,9 @@
 	struct list *available_conns;           /* Connections in use, but with new streams still available */
 	unsigned int pool_purge_delay;          /* Delay before starting to purge the idle conns pool */
 	unsigned int max_idle_conns;            /* Max number of connections allowed in the orphan connections list */
-	unsigned int curr_idle_conns;           /* Current number of orphan idling connections */
+	unsigned int curr_idle_conns;           /* Current number of orphan idling connections, counting both the idle and the safe lists */
+	unsigned int curr_idle_nb;              /* Current number of connections in the idle list */
+	unsigned int curr_safe_nb;              /* Current number of connections in the safe list */
 	unsigned int *curr_idle_thr;            /* Current number of orphan idling connections per thread */
 	int max_reuse;                          /* Max number of requests on a same connection */
 	struct eb32_node idle_node;             /* When to next do cleanup in the idle connections */
diff --git a/src/backend.c b/src/backend.c
index c868d93..cae3f20 100644
--- a/src/backend.c
+++ b/src/backend.c
@@ -1074,6 +1074,58 @@
 #endif
 }
 
+/* Attempt to get a backend connection from the specified mt_list array
+ * (safe or idle connections), stealing one from another thread if needed.
+ */
+static struct connection *conn_backend_get(struct server *srv, int is_safe)
+{
+	struct mt_list *mt_list = is_safe ? srv->safe_conns : srv->idle_conns;
+	struct connection *conn;
+	int i;
+	int found = 0;
+
+	/* We need to lock even if this is our own list, because another
+	 * thread may be trying to migrate that connection, and we don't want
+	 * to end up with two threads using the same connection.
+	 */
+	HA_SPIN_LOCK(OTHER_LOCK, &toremove_lock[tid]);
+	conn = MT_LIST_POP(&mt_list[tid], struct connection *, list);
+	HA_SPIN_UNLOCK(OTHER_LOCK, &toremove_lock[tid]);
+
+	/* If we found a connection in our own list, there is no need to
+	 * steal one from another thread, and we're done.
+	 */
+	if (conn)
+		return conn;
+
+	/* Scan all other threads for an idle connection, starting from tid + 1 */
+	for (i = tid; !found && (i = ((i + 1 == global.nbthread) ? 0 : i + 1)) != tid;) {
+		struct mt_list *elt1, elt2;
+
+		HA_SPIN_LOCK(OTHER_LOCK, &toremove_lock[i]);
+		mt_list_for_each_entry_safe(conn, &mt_list[i], list, elt1, elt2) {
+			if (conn->mux->takeover && conn->mux->takeover(conn) == 0) {
+				MT_LIST_DEL_SAFE(elt1);
+				found = 1;
+				break;
+			}
+		}
+		HA_SPIN_UNLOCK(OTHER_LOCK, &toremove_lock[i]);
+	}
+
+	if (!found)
+		conn = NULL;
+	else {
+		conn->idle_time = 0;
+		_HA_ATOMIC_SUB(&srv->curr_idle_conns, 1);
+		_HA_ATOMIC_SUB(&srv->curr_idle_thr[i], 1);
+		_HA_ATOMIC_SUB(is_safe ? &srv->curr_safe_nb : &srv->curr_idle_nb, 1);
+		__ha_barrier_atomic_store();
+		LIST_ADDQ(&srv->available_conns[tid], mt_list_to_list(&conn->list));
+	}
+	return conn;
+}
+
 /*
  * This function initiates a connection to the server assigned to this stream
  * (s->target, s->si[1].addr.to). It will assign a server if none
@@ -1148,32 +1200,39 @@
 		 * that there are no concurrency issues.
 		 */
 		if (srv->available_conns && !LIST_ISEMPTY(&srv->available_conns[tid]) &&
-		    ((s->be->options & PR_O_REUSE_MASK) != PR_O_REUSE_NEVR))
+		    ((s->be->options & PR_O_REUSE_MASK) != PR_O_REUSE_NEVR)) {
 			    srv_conn = LIST_ELEM(srv->available_conns[tid].n, struct connection *, list);
-		if (srv->idle_conns && !MT_LIST_ISEMPTY(&srv->idle_conns[tid]) &&
-		    ((s->be->options & PR_O_REUSE_MASK) != PR_O_REUSE_NEVR &&
-		     s->txn && (s->txn->flags & TX_NOT_FIRST))) {
-			srv_conn = MT_LIST_POP(&srv->idle_conns[tid], struct connection *, list);
-		}
-		else if (srv->safe_conns && !MT_LIST_ISEMPTY(&srv->safe_conns[tid]) &&
-			 ((s->txn && (s->txn->flags & TX_NOT_FIRST)) ||
-			  (s->be->options & PR_O_REUSE_MASK) >= PR_O_REUSE_AGGR)) {
-			srv_conn = MT_LIST_POP(&srv->safe_conns[tid], struct connection *, list);
+			    reuse = 1;
 		}
-		else if (srv->idle_conns && !MT_LIST_ISEMPTY(&srv->idle_conns[tid]) &&
-			 (s->be->options & PR_O_REUSE_MASK) == PR_O_REUSE_ALWS) {
-			srv_conn = MT_LIST_POP(&srv->idle_conns[tid], struct connection *, list);
-		}
-		/* If we've picked a connection from the pool, we now have to
-		 * detach it. We may have to get rid of the previous idle
-		 * connection we had, so for this we try to swap it with the
-		 * other owner's. That way it may remain alive for others to
-		 * pick.
-		 */
-		if (srv_conn) {
-			reuse_orphan = 1;
-			reuse = 1;
-			srv_conn->flags &= ~CO_FL_LIST_MASK;
+		else if (!srv_conn && srv->curr_idle_conns > 0) {
+			if (srv->idle_conns &&
+			    ((s->be->options & PR_O_REUSE_MASK) != PR_O_REUSE_NEVR &&
+			     s->txn && (s->txn->flags & TX_NOT_FIRST)) &&
+			    srv->curr_idle_nb > 0) {
+				srv_conn = conn_backend_get(srv, 0);
+			}
+			else if (srv->safe_conns &&
+			         ((s->txn && (s->txn->flags & TX_NOT_FIRST)) ||
+				  (s->be->options & PR_O_REUSE_MASK) >= PR_O_REUSE_AGGR) &&
+				 srv->curr_safe_nb > 0) {
+				srv_conn = conn_backend_get(srv, 1);
+			}
+			else if (srv->idle_conns &&
+			         ((s->be->options & PR_O_REUSE_MASK) == PR_O_REUSE_ALWS) &&
+				 srv->curr_idle_nb > 0) {
+				srv_conn = conn_backend_get(srv, 0);
+			}
+			/* If we've picked a connection from the pool, we now have to
+			 * detach it. We may have to get rid of the previous idle
+			 * connection we had, so for this we try to swap it with the
+			 * other owner's. That way it may remain alive for others to
+			 * pick.
+			 */
+			if (srv_conn) {
+				reuse_orphan = 1;
+				reuse = 1;
+				srv_conn->flags &= ~CO_FL_LIST_MASK;
+			}
 		}
 	}
 
@@ -1247,14 +1306,7 @@
 	 * list and add it back to the idle list.
 	 */
 	if (reuse) {
-		if (reuse_orphan) {
-			srv_conn->idle_time = 0;
-			_HA_ATOMIC_SUB(&srv->curr_idle_conns, 1);
-			__ha_barrier_atomic_store();
-			srv->curr_idle_thr[tid]--;
-			LIST_ADDQ(&srv->available_conns[tid], mt_list_to_list(&srv_conn->list));
-		}
-		else {
+		if (!reuse_orphan) {
 			if (srv_conn->flags & CO_FL_SESS_IDLE) {
 				struct session *sess = srv_conn->owner;