[CRITICAL] fix a crashing trouble with the maxconn limits.

If a task was queued on a server and if this task was alone and aborted
before any other task did anything, there were situations by which it
might have queued itself in the run queue, then exited, and the upcoming
tv_queue() associated to the run loop would have resurrected it siently,
causing crashes in task_queue.

The new principle consists in assigning a task to every server that
needs a connection limit. This task will be woken up every time we
suspect we might leave some place to queue a task. The server's task
itself will only have to run across its queue and run the available
number of tasks.
diff --git a/haproxy.c b/haproxy.c
index 766841c..eaa6bc1 100644
--- a/haproxy.c
+++ b/haproxy.c
@@ -531,6 +531,7 @@
     char *id;				/* just for identification */
     struct list pendconns;		/* pending connections */
     int nbpend;				/* number of pending connections */
+    struct task *queue_mgt;		/* the task associated to the queue processing */
     struct sockaddr_in addr;		/* the address to connect to */
     struct sockaddr_in source_addr;	/* the address to which we want to bind for connect() */
     short check_port;			/* the port to use for the health checks */
@@ -1879,20 +1880,6 @@
     return sess;
 }
 
-/* Checks if other sessions are waiting for a known server, and wakes the
- * first one up. Note that neither <srv> nor <px> can be NULL. Returns 1
- * if a session has been assigned, 0 if nothing has been done.
- */
-static int offer_connection_slot(struct server *srv, struct proxy *px) {
-    struct session *sess;
-
-    sess = pendconn_get_next_sess(srv, px);
-    if (sess == NULL)
-	return 0;
-    task_wakeup(&rq, sess->task);
-    return 1;
-}
-
 /* Adds the session <sess> to the pending connection list of server <sess>->srv
  * or to the one of <sess>->proxy if srv is NULL. All counters and back pointers
  * are updated accordingly. Returns NULL if no memory is available, otherwise the
@@ -1919,6 +1906,16 @@
     return p;
 }
 
+/* returns 0 if nothing has to be done for server <s> regarding queued connections,
+ * and non-zero otherwise. Suited for and if/else usage.
+ */
+static inline int may_dequeue_tasks(struct server *s, struct proxy *p) {
+    return (s && (s->nbpend || p->nbpend) &&
+	    s->maxconn && s->cur_sess < s->maxconn && s->queue_mgt);
+}
+
+
+
 /*********************************************************************/
 /*   more specific functions   ***************************************/
 /*********************************************************************/
@@ -4560,10 +4557,10 @@
 			   503, t->proxy->errmsg.len503, t->proxy->errmsg.msg503);
 	    
 	/* We used to have a free connection slot. Since we'll never use it,
-	 * we have to pass it on to another session.
+	 * we have to inform the server that it may be used by another session.
 	 */
-	if (t->srv)
-	    offer_connection_slot(t->srv, t->proxy);
+	if (may_dequeue_tasks(t->srv, t->proxy))
+	    task_wakeup(&rq, t->srv->queue_mgt);
 	return 1;
     }
     return 0;
@@ -4596,20 +4593,26 @@
 	    srv_close_with_err(t, SN_ERR_INTERNAL, SN_FINST_C,
 			       500, t->proxy->errmsg.len500, t->proxy->errmsg.msg500);
 	    /* release other sessions waiting for this server */
-	    if (t->srv)
-		offer_connection_slot(t->srv, t->proxy);
+	    if (may_dequeue_tasks(t->srv, t->proxy))
+		task_wakeup(&rq, t->srv->queue_mgt);
 	    return 1;
 	}
 	/* ensure that we have enough retries left */
-	if (srv_count_retry_down(t, conn_err))
-	    /* FIXME-20060509: should not we try to offer this slot to anybody ? */
+	if (srv_count_retry_down(t, conn_err)) {
+	    /* let's try to offer this slot to anybody */
+	    if (may_dequeue_tasks(t->srv, t->proxy))
+		task_wakeup(&rq, t->srv->queue_mgt);
 	    return 1;
+	}
     } while (t->srv == NULL || t->conn_retries > 0 || !(t->proxy->options & PR_O_REDISP));
 
     /* We're on our last chance, and the REDISP option was specified.
      * We will ignore cookie and force to balance or use the dispatcher.
      */
-    /* FIXME-20060509: should not we try to offer this slot to anybody ? */
+    /* let's try to offer this slot to anybody */
+    if (may_dequeue_tasks(t->srv, t->proxy))
+	task_wakeup(&rq, t->srv->queue_mgt);
+
     t->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET);
     t->srv = NULL; /* it's left to the dispatcher to choose a server */
     if ((t->flags & SN_CK_MASK) == SN_CK_VALID) {
@@ -4639,17 +4642,10 @@
 	break;
 
     case SRV_STATUS_NOSRV:
+	/* note: it is guaranteed that t->srv == NULL here */
 	tv_eternity(&t->cnexpire);
 	srv_close_with_err(t, SN_ERR_SRVTO, SN_FINST_C,
 			   503, t->proxy->errmsg.len503, t->proxy->errmsg.msg503);
-
-	/* FIXME-20060501: we should not need this once we flush every session
-	 * when the last server goes down.
-	 * FIXME-20060509: this will never execute because it is guaranteed that t->srv == NULL here.
-	 */
-	/* release other sessions waiting for this server */
-	if (t->srv)
-	    offer_connection_slot(t->srv, t->proxy);
 	return 1;
 
     case SRV_STATUS_QUEUED:
@@ -4669,8 +4665,8 @@
 	srv_close_with_err(t, SN_ERR_INTERNAL, SN_FINST_C,
 			   500, t->proxy->errmsg.len500, t->proxy->errmsg.msg500);
 	/* release other sessions waiting for this server */
-	if (t->srv)
-	    offer_connection_slot(t->srv, t->proxy);
+	if (may_dequeue_tasks(t->srv, t->proxy))
+	    task_wakeup(&rq, t->srv->queue_mgt);
 	return 1;
     }
     /* if we get here, it's because we got SRV_STATUS_OK, which also
@@ -4709,12 +4705,6 @@
 	    tv_eternity(&t->cnexpire);
 	    srv_close_with_err(t, SN_ERR_CLICL, SN_FINST_C, 0, 0, NULL);
 
-	    /* it might be possible that we have been granted an access to the
-	     * server while waiting for a free slot. Since we'll never use it,
-	     * we have to pass it on to another session.
-	     */
-	    if (t->srv)
-		offer_connection_slot(t->srv, t->proxy);
 	    return 1;
 	}
 	else {
@@ -4879,10 +4869,10 @@
 			send_log(t->proxy, LOG_ALERT, "Blocking cacheable cookie in response from instance %s, server %s.\n", t->proxy->id, t->srv->id);
 
 			/* We used to have a free connection slot. Since we'll never use it,
-			 * we have to pass it on to another session.
+			 * we have to inform the server that it may be used by another session.
 			 */
-			if (t->srv)
-			    offer_connection_slot(t->srv, t->proxy);
+			if (may_dequeue_tasks(t->srv, t->proxy))
+			    task_wakeup(&rq, t->srv->queue_mgt);
 
 			return 1;
 		    }
@@ -4903,10 +4893,10 @@
 		    if (!(t->flags & SN_FINST_MASK))
 			t->flags |= SN_FINST_H;
 		    /* We used to have a free connection slot. Since we'll never use it,
-		     * we have to pass it on to another session.
+		     * we have to inform the server that it may be used by another session.
 		     */
-		    if (t->srv)
-			offer_connection_slot(t->srv, t->proxy);
+		    if (may_dequeue_tasks(t->srv, t->proxy))
+			task_wakeup(&rq, t->srv->queue_mgt);
 
 		    return 1;
 		}
@@ -5343,10 +5333,10 @@
 	    if (!(t->flags & SN_FINST_MASK))
 		t->flags |= SN_FINST_H;
 	    /* We used to have a free connection slot. Since we'll never use it,
-	     * we have to pass it on to another session.
+	     * we have to inform the server that it may be used by another session.
 	     */
-	    if (t->srv)
-		offer_connection_slot(t->srv, t->proxy);
+	    if (may_dequeue_tasks(t->srv, t->proxy))
+		task_wakeup(&rq, t->srv->queue_mgt);
 
 	    return 1;
 	}
@@ -5378,10 +5368,10 @@
 	    if (!(t->flags & SN_FINST_MASK))
 		t->flags |= SN_FINST_H;
 	    /* We used to have a free connection slot. Since we'll never use it,
-	     * we have to pass it on to another session.
+	     * we have to inform the server that it may be used by another session.
 	     */
-	    if (t->srv)
-		offer_connection_slot(t->srv, t->proxy);
+	    if (may_dequeue_tasks(t->srv, t->proxy))
+		task_wakeup(&rq, t->srv->queue_mgt);
 
 	    return 1;
 	}	
@@ -5478,10 +5468,10 @@
 	    if (!(t->flags & SN_FINST_MASK))
 		t->flags |= SN_FINST_D;
 	    /* We used to have a free connection slot. Since we'll never use it,
-	     * we have to pass it on to another session.
+	     * we have to inform the server that it may be used by another session.
 	     */
-	    if (t->srv)
-		offer_connection_slot(t->srv, t->proxy);
+	    if (may_dequeue_tasks(t->srv, t->proxy))
+		task_wakeup(&rq, t->srv->queue_mgt);
 
 	    return 1;
 	}
@@ -5592,10 +5582,10 @@
 	    if (!(t->flags & SN_FINST_MASK))
 		t->flags |= SN_FINST_D;
 	    /* We used to have a free connection slot. Since we'll never use it,
-	     * we have to pass it on to another session.
+	     * we have to inform the server that it may be used by another session.
 	     */
-	    if (t->srv)
-		offer_connection_slot(t->srv, t->proxy);
+	    if (may_dequeue_tasks(t->srv, t->proxy))
+		task_wakeup(&rq, t->srv->queue_mgt);
 
 	    return 1;
 	}
@@ -5608,10 +5598,10 @@
 	    //close(t->srv_fd);
 	    t->srv_state = SV_STCLOSE;
 	    /* We used to have a free connection slot. Since we'll never use it,
-	     * we have to pass it on to another session.
+	     * we have to inform the server that it may be used by another session.
 	     */
-	    if (t->srv)
-		offer_connection_slot(t->srv, t->proxy);
+	    if (may_dequeue_tasks(t->srv, t->proxy))
+		task_wakeup(&rq, t->srv->queue_mgt);
 
 	    return 1;
 	}
@@ -5628,10 +5618,10 @@
 	    if (!(t->flags & SN_FINST_MASK))
 		t->flags |= SN_FINST_D;
 	    /* We used to have a free connection slot. Since we'll never use it,
-	     * we have to pass it on to another session.
+	     * we have to inform the server that it may be used by another session.
 	     */
-	    if (t->srv)
-		offer_connection_slot(t->srv, t->proxy);
+	    if (may_dequeue_tasks(t->srv, t->proxy))
+		task_wakeup(&rq, t->srv->queue_mgt);
 
 	    return 1;
 	}
@@ -5670,10 +5660,10 @@
 	    if (!(t->flags & SN_FINST_MASK))
 		t->flags |= SN_FINST_D;
 	    /* We used to have a free connection slot. Since we'll never use it,
-	     * we have to pass it on to another session.
+	     * we have to inform the server that it may be used by another session.
 	     */
-	    if (t->srv)
-		offer_connection_slot(t->srv, t->proxy);
+	    if (may_dequeue_tasks(t->srv, t->proxy))
+		task_wakeup(&rq, t->srv->queue_mgt);
 
 	    return 1;
 	}
@@ -5686,10 +5676,10 @@
 	    //close(t->srv_fd);
 	    t->srv_state = SV_STCLOSE;
 	    /* We used to have a free connection slot. Since we'll never use it,
-	     * we have to pass it on to another session.
+	     * we have to inform the server that it may be used by another session.
 	     */
-	    if (t->srv)
-		offer_connection_slot(t->srv, t->proxy);
+	    if (may_dequeue_tasks(t->srv, t->proxy))
+		task_wakeup(&rq, t->srv->queue_mgt);
 
 	    return 1;
 	}
@@ -5706,10 +5696,10 @@
 	    if (!(t->flags & SN_FINST_MASK))
 		t->flags |= SN_FINST_D;
 	    /* We used to have a free connection slot. Since we'll never use it,
-	     * we have to pass it on to another session.
+	     * we have to inform the server that it may be used by another session.
 	     */
-	    if (t->srv)
-		offer_connection_slot(t->srv, t->proxy);
+	    if (may_dequeue_tasks(t->srv, t->proxy))
+		task_wakeup(&rq, t->srv->queue_mgt);
 
 	    return 1;
 	}
@@ -6054,6 +6044,31 @@
 
 
 
+/*
+ * Manages a server's connection queue. If woken up, will try to dequeue as
+ * many pending sessions as possible, and wake them up. The task has nothing
+ * else to do, so it always returns TIME_ETERNITY.
+ */
+int process_srv_queue(struct task *t) {
+    struct server *s = (struct server*)t->context;
+    struct proxy  *p = s->proxy;
+    int xferred;
+
+    /* First, check if we can handle some connections queued at the proxy. We
+     * will take as many as we can handle.
+     */
+    for (xferred = 0; s->cur_sess + xferred < s->maxconn; xferred++) {
+	struct session *sess;
+
+	sess = pendconn_get_next_sess(s, p);
+	if (sess == NULL)
+	    break;
+	task_wakeup(&rq, sess->task);
+    }
+
+    return TIME_ETERNITY;
+}
+
 #if STATTIME > 0
 int stats(void);
 #endif
@@ -8596,6 +8611,34 @@
 	    curproxy->errmsg.len504 = strlen(HTTP_504);
 	}
 
+	/*
+	 * If this server supports a maxconn parameter, it needs a dedicated
+	 * tasks to fill the emptied slots when a connection leaves.
+	 */
+	newsrv = curproxy->srv;
+	while (newsrv != NULL) {
+	    if (newsrv->maxconn > 0) {
+		struct task *t;
+
+		if ((t = pool_alloc(task)) == NULL) {
+		    Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
+		    return -1;
+		}
+		
+		t->next = t->prev = t->rqnext = NULL; /* task not in run queue yet */
+		t->wq = LIST_HEAD(wait_queue[1]); /* already assigned to the eternity queue */
+		t->state = TASK_IDLE;
+		t->process = process_srv_queue;
+		t->context = newsrv;
+		newsrv->queue_mgt = t;
+
+		/* never run it unless specifically woken up */
+		tv_eternity(&t->expire);
+		task_queue(t);
+	    }
+	    newsrv = newsrv->next;
+	}
+
 	/* now we'll start this proxy's health checks if any */
 	/* 1- count the checkers to run simultaneously */
 	nbchk = 0;