[BUG] fix the dequeuing logic to ensure that all requests get served

The dequeuing logic was completely wrong. First, a task was assigned
to all servers to process the queue, but this task was never scheduled
and was only woken up on session free. Second, there was no reservation
of server entries when a task was assigned a server. This means that
as long as the task was not connected to the server, its presence was
not accounted for. This was causing trouble when detecting whether or
not a server had reached maxconn. Third, during a redispatch, a session
could lose its place at the server and get blocked because another
session would have stolen the entry at the same moment. Fourth, the
redispatch option did not work when maxqueue was reached for a server,
and it was not possible to do so without indefinitely hanging a session.

The root cause of all those problems was the lack of pre-reservation of
connections on the servers, and the lack of tracking of servers during
a redispatch. Everything relied on combinations of flags which could
appear similarly in quite distinct situations.

This patch is a major rework but there was no other solution, as the
internal logic was deeply flawed. The resulting code is cleaner, more
understandable, uses less magic and is overall more robust.

As an added bonus, "option redispatch" now works when maxqueue has
been reached on a server.
diff --git a/src/backend.c b/src/backend.c
index 9ced724..033b650 100644
--- a/src/backend.c
+++ b/src/backend.c
@@ -20,6 +20,7 @@
 
 #include <common/compat.h>
 #include <common/config.h>
+#include <common/debug.h>
 #include <common/eb32tree.h>
 #include <common/time.h>
 
@@ -40,6 +41,7 @@
 #include <proto/proto_http.h>
 #include <proto/proto_tcp.h>
 #include <proto/queue.h>
+#include <proto/session.h>
 #include <proto/stream_sock.h>
 #include <proto/task.h>
 
@@ -785,7 +787,7 @@
 		fwrr_update_position(grp, srv);
 		fwrr_dequeue_srv(srv);
 		grp->curr_pos++;
-		if (!srv->maxconn || srv->cur_sess < srv_dynamic_maxconn(srv)) {
+		if (!srv->maxconn || (!srv->nbpend && srv->served < srv_dynamic_maxconn(srv))) {
 			/* make sure it is not the server we are trying to exclude... */
 			if (srv != srvtoavoid || avoided)
 				break;
@@ -851,7 +853,7 @@
  */
 static inline void fwlc_queue_srv(struct server *s)
 {
-	s->lb_node.key = s->cur_sess * SRV_EWGHT_MAX / s->eweight;
+	s->lb_node.key = s->served * SRV_EWGHT_MAX / s->eweight;
 	eb32_insert(s->lb_tree, &s->lb_node);
 }
 
@@ -1099,7 +1101,7 @@
 		struct server *s;
 
 		s = eb32_entry(node, struct server, lb_node);
-		if (!s->maxconn || s->cur_sess < srv_dynamic_maxconn(s)) {
+		if (!s->maxconn || (!s->nbpend && s->served < srv_dynamic_maxconn(s))) {
 			if (s != srvtoavoid) {
 				srv = s;
 				break;
@@ -1281,117 +1283,163 @@
 
 
 /*
- * This function marks the session as 'assigned' in direct or dispatch modes,
- * or tries to assign one in balance mode, according to the algorithm. It does
- * nothing if the session had already been assigned a server.
+ * This function applies the load-balancing algorithm to the session, as
+ * defined by the backend it is assigned to. The session is then marked as
+ * 'assigned'.
+ *
+ * This function MAY NOT be called with SN_ASSIGNED already set. If the session
+ * had a server previously assigned, it is rebalanced, trying to avoid the same
+ * server.
+ * The function tries to keep the original connection slot if it reconnects to
+ * the same server, otherwise it releases it and tries to offer it.
+ *
+ * It is illegal to call this function with a session in a queue.
  *
  * It may return :
- *   SRV_STATUS_OK       if everything is OK. s->srv will be valid.
- *   SRV_STATUS_NOSRV    if no server is available. s->srv = NULL.
- *   SRV_STATUS_FULL     if all servers are saturated. s->srv = NULL.
+ *   SRV_STATUS_OK       if everything is OK. Session assigned to ->srv
+ *   SRV_STATUS_NOSRV    if no server is available. Session is not ASSIGNED
+ *   SRV_STATUS_FULL     if all servers are saturated. Session is not ASSIGNED
  *   SRV_STATUS_INTERNAL for other unrecoverable errors.
  *
- * Upon successful return, the session flag SN_ASSIGNED to indicate that it does
- * not need to be called anymore. This usually means that s->srv can be trusted
- * in balance and direct modes. This flag is not cleared, so it's to the caller
- * to clear it if required (eg: redispatch).
+ * Upon successful return, the session flag SN_ASSIGNED is set to indicate that
+ * it does not need to be called anymore. This means that s->srv can be trusted
+ * in balance and direct modes.
  *
  */
 
 int assign_server(struct session *s)
 {
 
-	struct server *srvtoavoid;
+	struct server *conn_slot;
+	int err;
 
 #ifdef DEBUG_FULL
 	fprintf(stderr,"assign_server : s=%p\n",s);
 #endif
 
-	srvtoavoid = s->srv;
-	s->srv = NULL;
+	err = SRV_STATUS_INTERNAL;
+	if (unlikely(s->pend_pos || s->flags & SN_ASSIGNED))
+		goto out_err;
 
-	if (s->pend_pos)
-		return SRV_STATUS_INTERNAL;
+	s->prev_srv = s->prev_srv;
+	conn_slot = s->srv_conn;
 
-	if (!(s->flags & SN_ASSIGNED)) {
-		if (s->be->lbprm.algo & BE_LB_ALGO) {
-			int len;
-		
-			if (s->flags & SN_DIRECT) {
-				s->flags |= SN_ASSIGNED;
-				return SRV_STATUS_OK;
-			}
+	/* We have to release any connection slot before applying any LB algo,
+	 * otherwise we may erroneously end up with no available slot.
+	 */
+	if (conn_slot)
+		sess_change_server(s, NULL);
+
+	/* We will now try to find the good server and store it into <s->srv>.
+	 * Note that <s->srv> may be NULL in case of dispatch or proxy mode,
+	 * as well as if no server is available (check error code).
+	 */
 
-			if (!s->be->lbprm.tot_weight)
-				return SRV_STATUS_NOSRV;
+	s->srv = NULL;
+	if (s->be->lbprm.algo & BE_LB_ALGO) {
+		int len;
+		/* we must check if we have at least one server available */
+		if (!s->be->lbprm.tot_weight) {
+			err = SRV_STATUS_NOSRV;
+			goto out;
+		}
 
-			switch (s->be->lbprm.algo & BE_LB_ALGO) {
-			case BE_LB_ALGO_RR:
-				s->srv = fwrr_get_next_server(s->be, srvtoavoid);
-				if (!s->srv)
-					return SRV_STATUS_FULL;
-				break;
-			case BE_LB_ALGO_LC:
-				s->srv = fwlc_get_next_server(s->be, srvtoavoid);
-				if (!s->srv)
-					return SRV_STATUS_FULL;
-				break;
-			case BE_LB_ALGO_SH:
-				if (s->cli_addr.ss_family == AF_INET)
-					len = 4;
-				else if (s->cli_addr.ss_family == AF_INET6)
-					len = 16;
-				else /* unknown IP family */
-					return SRV_STATUS_INTERNAL;
+		switch (s->be->lbprm.algo & BE_LB_ALGO) {
+		case BE_LB_ALGO_RR:
+			s->srv = fwrr_get_next_server(s->be, s->prev_srv);
+			if (!s->srv) {
+				err = SRV_STATUS_FULL;
+				goto out;
+			}
+			break;
+		case BE_LB_ALGO_LC:
+			s->srv = fwlc_get_next_server(s->be, s->prev_srv);
+			if (!s->srv) {
+				err = SRV_STATUS_FULL;
+				goto out;
+			}
+			break;
+		case BE_LB_ALGO_SH:
+			if (s->cli_addr.ss_family == AF_INET)
+				len = 4;
+			else if (s->cli_addr.ss_family == AF_INET6)
+				len = 16;
+			else {
+				/* unknown IP family */
+				err = SRV_STATUS_INTERNAL;
+				goto out;
+			}
 		
-				s->srv = get_server_sh(s->be,
-						       (void *)&((struct sockaddr_in *)&s->cli_addr)->sin_addr,
-						       len);
-				break;
-			case BE_LB_ALGO_UH:
-				/* URI hashing */
-				s->srv = get_server_uh(s->be,
+			s->srv = get_server_sh(s->be,
+					       (void *)&((struct sockaddr_in *)&s->cli_addr)->sin_addr,
+					       len);
+			break;
+		case BE_LB_ALGO_UH:
+			/* URI hashing */
+			s->srv = get_server_uh(s->be,
+					       s->txn.req.sol + s->txn.req.sl.rq.u,
+					       s->txn.req.sl.rq.u_l);
+			break;
+		case BE_LB_ALGO_PH:
+			/* URL Parameter hashing */
+			if (s->txn.meth == HTTP_METH_POST &&
+			    memchr(s->txn.req.sol + s->txn.req.sl.rq.u, '&',
+				   s->txn.req.sl.rq.u_l ) == NULL)
+				s->srv = get_server_ph_post(s);
+			else
+				s->srv = get_server_ph(s->be,
 						       s->txn.req.sol + s->txn.req.sl.rq.u,
 						       s->txn.req.sl.rq.u_l);
-				break;
-			case BE_LB_ALGO_PH:
-				/* URL Parameter hashing */
-				if (s->txn.meth == HTTP_METH_POST &&
-                                    memchr(s->txn.req.sol + s->txn.req.sl.rq.u, '&',
-                                           s->txn.req.sl.rq.u_l ) == NULL)
-					s->srv = get_server_ph_post(s);
-				else
-					s->srv = get_server_ph(s->be,
-							       s->txn.req.sol + s->txn.req.sl.rq.u,
-							       s->txn.req.sl.rq.u_l);
 
+			if (!s->srv) {
+				/* parameter not found, fall back to round robin on the map */
+				s->srv = get_server_rr_with_conns(s->be, s->prev_srv);
 				if (!s->srv) {
-					/* parameter not found, fall back to round robin on the map */
-					s->srv = get_server_rr_with_conns(s->be, srvtoavoid);
-					if (!s->srv)
-						return SRV_STATUS_FULL;
+					err = SRV_STATUS_FULL;
+					goto out;
 				}
-				break;
-			default:
-				/* unknown balancing algorithm */
-				return SRV_STATUS_INTERNAL;
-			}
-			if (s->srv != srvtoavoid) {
-				s->be->cum_lbconn++;
-				s->srv->cum_lbconn++;
 			}
+			break;
+		default:
+			/* unknown balancing algorithm */
+			err = SRV_STATUS_INTERNAL;
+			goto out;
 		}
-		else if (s->be->options & PR_O_HTTP_PROXY) {
-			if (!s->srv_addr.sin_addr.s_addr)
-				return SRV_STATUS_NOSRV;
+		if (s->srv != s->prev_srv) {
+			s->be->cum_lbconn++;
+			s->srv->cum_lbconn++;
 		}
-		else if (!*(int *)&s->be->dispatch_addr.sin_addr &&
-			 !(s->fe->options & PR_O_TRANSP)) {
-			return SRV_STATUS_NOSRV;
+	}
+	else if (s->be->options & PR_O_HTTP_PROXY) {
+		if (!s->srv_addr.sin_addr.s_addr) {
+			err = SRV_STATUS_NOSRV;
+			goto out;
 		}
-		s->flags |= SN_ASSIGNED;
 	}
-	return SRV_STATUS_OK;
+	else if (!*(int *)&s->be->dispatch_addr.sin_addr &&
+		 !(s->fe->options & PR_O_TRANSP)) {
+		err = SRV_STATUS_NOSRV;
+		goto out;
+	}
+
+	s->flags |= SN_ASSIGNED;
+	err = SRV_STATUS_OK;
+ out:
+
+	/* Either we take back our connection slot, or we offer it to someone
+	 * else if we don't need it anymore.
+	 */
+	if (conn_slot) {
+		if (conn_slot == s->srv) {
+			sess_change_server(s, s->srv);
+		} else {
+			if (may_dequeue_tasks(conn_slot, s->be))
+				process_srv_queue(conn_slot);
+		}
+	}
+
+ out_err:
+	return err;
 }
 
 
@@ -1465,6 +1513,11 @@
 
 /* This function assigns a server to session <s> if required, and can add the
  * connection to either the assigned server's queue or to the proxy's queue.
+ * If ->srv_conn is set, the session is first released from the server.
+ * It may also be called with SN_DIRECT and/or SN_ASSIGNED though. It will
+ * be called before any connection and after any retry or redispatch occurs.
+ *
+ * It is not allowed to call this function with a session in a queue.
  *
  * Returns :
  *
@@ -1472,92 +1525,89 @@
  *   SRV_STATUS_NOSRV    if no server is available. s->srv = NULL.
  *   SRV_STATUS_QUEUED   if the connection has been queued.
  *   SRV_STATUS_FULL     if the server(s) is/are saturated and the
- *                       connection could not be queued.
+ *                       connection could not be queued in s->srv,
+ *                       which may be NULL if we queue on the backend.
  *   SRV_STATUS_INTERNAL for other unrecoverable errors.
  *
  */
 int assign_server_and_queue(struct session *s)
 {
 	struct pendconn *p;
-	struct server *srv;
 	int err;
 
 	if (s->pend_pos)
 		return SRV_STATUS_INTERNAL;
 
-	if (s->flags & SN_ASSIGNED) {
-		if ((s->flags & SN_REDIRECTABLE) && s->srv && s->srv->rdr_len) {
-			/* server scheduled for redirection, and already assigned. We
-			 * don't want to go further nor check the queue.
-			 */
-			return SRV_STATUS_OK;
-		}
-
-		if (s->srv && s->srv->maxqueue > 0 && s->srv->nbpend >= s->srv->maxqueue) {
-			/* it's left to the dispatcher to choose a server */
-			s->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET);
-		} else {
-			/* a server does not need to be assigned, perhaps because we're in
-			 * direct mode, or in dispatch or transparent modes where the server
-			 * is not needed.
-			 */
-			if (s->srv &&
-			    s->srv->maxconn && s->srv->cur_sess >= srv_dynamic_maxconn(s->srv)) {
-				p = pendconn_add(s);
-				if (p)
-					return SRV_STATUS_QUEUED;
-				else
-					return SRV_STATUS_FULL;
-			}
-			return SRV_STATUS_OK;
-		}
-	}
-
-	/* a server needs to be assigned */
-	srv = s->srv;
-	err = assign_server(s);
-
-	if (srv) {
-		if (srv != s->srv) {
-			/* This session was previously dispatched to another server:
-			 *  - set TX_CK_DOWN if txn.flags was TX_CK_VALID
-			 *  - set SN_REDISP if it was successfully redispatched
-			 *  - increment srv->redispatches and be->redispatches
+	err = SRV_STATUS_OK;
+	if (!(s->flags & SN_ASSIGNED)) {
+		err = assign_server(s);
+		if (s->prev_srv) {
+			/* This session was previously assigned to a server. We have to
+			 * update the session's and the server's stats :
+			 *  - if the server changed :
+			 *    - set TX_CK_DOWN if txn.flags was TX_CK_VALID
+			 *    - set SN_REDISP if it was successfully redispatched
+			 *    - increment srv->redispatches and be->redispatches
+			 *  - if the server remained the same : update retries.
 			 */
 
-			if ((s->txn.flags & TX_CK_MASK) == TX_CK_VALID) {
-				s->txn.flags &= ~TX_CK_MASK;
-				s->txn.flags |= TX_CK_DOWN;
+			if (s->prev_srv != s->srv) {
+				if ((s->txn.flags & TX_CK_MASK) == TX_CK_VALID) {
+					s->txn.flags &= ~TX_CK_MASK;
+					s->txn.flags |= TX_CK_DOWN;
+				}
+				s->flags |= SN_REDISP;
+				s->prev_srv->redispatches++;
+				s->be->redispatches++;
+			} else {
+				s->prev_srv->retries++;
+				s->be->retries++;
 			}
-
-			s->flags |= SN_REDISP;
-
-			srv->redispatches++;
-			s->be->redispatches++;
-		} else {
-			srv->retries++;
-			s->be->retries++;
 		}
 	}
 
 	switch (err) {
 	case SRV_STATUS_OK:
-		if ((s->flags & SN_REDIRECTABLE) && s->srv && s->srv->rdr_len) {
-			/* server supporting redirection and it is possible.
-			 * Let's report that and ignore maxconn !
+		/* we have SN_ASSIGNED set */
+		if (!s->srv)
+			return SRV_STATUS_OK;   /* dispatch or proxy mode */
+
+		/* If we already have a connection slot, no need to check any queue */
+		if (s->srv_conn == s->srv)
+			return SRV_STATUS_OK;
+
+		/* OK, this session already has an assigned server, but no
+		 * connection slot yet. Either it is a redispatch, or it was
+		 * assigned from persistence information (direct mode).
+		 */
+		if ((s->flags & SN_REDIRECTABLE) && s->srv->rdr_len) {
+			/* server scheduled for redirection, and already assigned. We
+			 * don't want to go further nor check the queue.
 			 */
+			sess_change_server(s, s->srv); /* not really needed in fact */
 			return SRV_STATUS_OK;
 		}
 
-		/* in balance mode, we might have servers with connection limits */
-		if (s->srv &&
-		    s->srv->maxconn && s->srv->cur_sess >= srv_dynamic_maxconn(s->srv)) {
+		/* We might have to queue this session if the assigned server is full.
+		 * We know we have to queue it into the server's queue, so if a maxqueue
+		 * is set on the server, we must also check that the server's queue is
+		 * not full, in which case we have to return FULL.
+		 */
+		if (s->srv->maxconn &&
+		    (s->srv->nbpend || s->srv->served >= srv_dynamic_maxconn(s->srv))) {
+
+			if (s->srv->maxqueue > 0 && s->srv->nbpend >= s->srv->maxqueue)
+				return SRV_STATUS_FULL;
+
 			p = pendconn_add(s);
 			if (p)
 				return SRV_STATUS_QUEUED;
 			else
-				return SRV_STATUS_FULL;
+				return SRV_STATUS_INTERNAL;
 		}
+
+		/* OK, we can use this server. Let's reserve our place */
+		sess_change_server(s, s->srv);
 		return SRV_STATUS_OK;
 
 	case SRV_STATUS_FULL:
@@ -1566,11 +1616,14 @@
 		if (p)
 			return SRV_STATUS_QUEUED;
 		else
-			return SRV_STATUS_FULL;
+			return SRV_STATUS_INTERNAL;
 
 	case SRV_STATUS_NOSRV:
+		return err;
+
 	case SRV_STATUS_INTERNAL:
 		return err;
+
 	default:
 		return SRV_STATUS_INTERNAL;
 	}
@@ -1808,7 +1861,7 @@
 		 * we have to inform the server that it may be used by another session.
 		 */
 		if (may_dequeue_tasks(t->srv, t->be))
-			task_wakeup(t->srv->queue_mgt);
+			process_srv_queue(t->srv);
 		return 1;
 	}
 	return 0;
@@ -1851,7 +1904,7 @@
 			t->be->failed_conns++;
 			/* release other sessions waiting for this server */
 			if (may_dequeue_tasks(t->srv, t->be))
-				task_wakeup(t->srv->queue_mgt);
+				process_srv_queue(t->srv);
 			return 1;
 		}
 		/* ensure that we have enough retries left */
@@ -1865,13 +1918,14 @@
 	 */
 	/* let's try to offer this slot to anybody */
 	if (may_dequeue_tasks(t->srv, t->be))
-		task_wakeup(t->srv->queue_mgt);
+		process_srv_queue(t->srv);
 
 	if (t->srv)
 		t->srv->cum_sess++;		//FIXME?
 
 	/* it's left to the dispatcher to choose a server */
 	t->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET);
+	t->prev_srv = t->srv;
 	return 0;
 }
 
@@ -1891,11 +1945,34 @@
 	/* We know that we don't have any connection pending, so we will
 	 * try to get a new one, and wait in this state if it's queued
 	 */
+ redispatch:
 	conn_err = assign_server_and_queue(t);
 	switch (conn_err) {
 	case SRV_STATUS_OK:
 		break;
 
+	case SRV_STATUS_FULL:
+		/* The server has reached its maxqueue limit. Either PR_O_REDISP is set
+		 * and we can redispatch to another server, or it is not and we return
+		 * 503. This only makes sense in DIRECT mode however, because normal LB
+		 * algorithms would never select such a server, and hash algorithms
+		 * would bring us on the same server again. Note that t->srv is set in
+		 * this case.
+		 */
+		if ((t->flags & SN_DIRECT) && (t->be->options & PR_O_REDISP)) {
+			t->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET);
+			t->prev_srv = t->srv;
+			goto redispatch;
+		}
+
+		tv_eternity(&t->req->cex);
+		srv_close_with_err(t, SN_ERR_SRVTO, SN_FINST_Q,
+				   503, error_message(t, HTTP_ERR_503));
+
+		t->srv->failed_conns++;
+		t->be->failed_conns++;
+		return 1;
+
 	case SRV_STATUS_NOSRV:
 		/* note: it is guaranteed that t->srv == NULL here */
 		tv_eternity(&t->req->cex);
@@ -1903,18 +1980,15 @@
 				   503, error_message(t, HTTP_ERR_503));
 
 		t->be->failed_conns++;
-
 		return 1;
 
 	case SRV_STATUS_QUEUED:
-		/* note: we use the connect expiration date for the queue. */
 		if (!tv_add_ifset(&t->req->cex, &now, &t->be->timeout.queue))
 			tv_eternity(&t->req->cex);
 		t->srv_state = SV_STIDLE;
 		/* do nothing else and do not wake any other session up */
 		return 1;
 
-	case SRV_STATUS_FULL:
 	case SRV_STATUS_INTERNAL:
 	default:
 		tv_eternity(&t->req->cex);
@@ -1928,7 +2002,7 @@
 
 		/* release other sessions waiting for this server */
 		if (may_dequeue_tasks(t->srv, t->be))
-			task_wakeup(t->srv->queue_mgt);
+			process_srv_queue(t->srv);
 		return 1;
 	}
 	/* if we get here, it's because we got SRV_STATUS_OK, which also