MEDIUM: threads/lb: Make LB algorithms (lb_*.c) thread-safe

A lock for the LB parameters has been added inside the proxy structure, and
atomic operations are now used to update the server variables related to LB.

The only significant change concerns lb_map. Because the servers' status is
updated at the sync point, recalc_server_map() can be called synchronously
from map_set_server_status_up/down(), which makes the LB_MAP_RECALC flag
unnecessary.
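
For illustration only, the locking pattern introduced here looks roughly like
the standalone sketch below. It is not taken from the patch: struct px,
take_server() and the pthread/C11 primitives stand in for HAProxy's
proxy/server structures and the SPIN_LOCK()/HA_ATOMIC_*() macros.

    /* Standalone sketch: a per-proxy spinlock protects the shared LB state
     * while a server is picked, and per-server counters are updated
     * atomically. All names below are made up for the example.
     */
    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdio.h>

    struct srv {
        atomic_int npos;              /* per-server counter, atomic updates only */
    };

    struct px {
        pthread_spinlock_t lb_lock;   /* plays the role of px->lbprm.lock */
        struct srv *servers;
        int nbsrv;
        int rr_idx;                   /* shared round-robin index, guarded by lb_lock */
    };

    static struct srv *take_server(struct px *p)
    {
        struct srv *s = NULL;

        pthread_spin_lock(&p->lb_lock);
        if (p->nbsrv) {
            if (p->rr_idx >= p->nbsrv)
                p->rr_idx = 0;
            s = &p->servers[p->rr_idx++];
            atomic_fetch_add(&s->npos, 1);
        }
        pthread_spin_unlock(&p->lb_lock);
        return s;
    }

    int main(void)
    {
        struct srv servers[2] = { { 0 }, { 0 } };
        struct px p = { .servers = servers, .nbsrv = 2 };

        pthread_spin_init(&p.lb_lock, PTHREAD_PROCESS_PRIVATE);
        for (int i = 0; i < 5; i++)
            printf("picked srv[%ld]\n", (long)(take_server(&p) - servers));
        pthread_spin_destroy(&p.lb_lock);
        return 0;
    }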
diff --git a/src/backend.c b/src/backend.c
index d17635f..23b85ce 100644
--- a/src/backend.c
+++ b/src/backend.c
@@ -99,6 +99,10 @@
  * this.
  * This functions is designed to be called before server's weight and state
  * commit so it uses 'next' weight and states values.
+ *
+ * threads: it is the caller's responsibility to lock the data. For now, this
+ * function is only called from LB modules, so it should be fine. But if you
+ * need to call it from another place, be careful (and update this comment).
  */
 void recount_servers(struct proxy *px)
 {
@@ -129,6 +133,10 @@
 /* This function simply updates the backend's tot_weight and tot_used values
  * after servers weights have been updated. It is designed to be used after
  * recount_servers() or equivalent.
+ *
+ * threads: it is the caller's responsibility to lock the data. For now, this
+ * function is only called from LB modules, so it should be fine. But if you
+ * need to call it from another place, be careful (and update this comment).
  */
 void update_backend_weight(struct proxy *px)
 {
@@ -233,7 +241,7 @@
 		return map_get_server_hash(px, hash);
 }
 
-/* 
+/*
  * This function tries to find a running server for the proxy <px> following
  * the URL parameter hash method. It looks for a specific parameter in the
  * URL and hashes it to compute the server ID. This is useful to optimize
@@ -503,7 +511,7 @@
 	else
 		return map_get_server_hash(px, hash);
 }
- 
+
 /*
  * This function applies the load-balancing algorithm to the stream, as
  * defined by the backend it is assigned to. The stream is then marked as
@@ -579,6 +587,7 @@
 		s->target = &srv->obj_type;
 	}
 	else if (s->be->lbprm.algo & BE_LB_KIND) {
+
 		/* we must check if we have at least one server available */
 		if (!s->be->lbprm.tot_weight) {
 			err = SRV_STATUS_NOSRV;
diff --git a/src/cfgparse.c b/src/cfgparse.c
index e765fdb..dd49009 100644
--- a/src/cfgparse.c
+++ b/src/cfgparse.c
@@ -8434,6 +8434,7 @@
 			}
 			break;
 		}
+		SPIN_INIT(&curproxy->lbprm.lock);
 
 		if (curproxy->options & PR_O_LOGASAP)
 			curproxy->to_log &= ~LW_BYTES;
diff --git a/src/haproxy.c b/src/haproxy.c
index 5710cc2..30ac157 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -2129,6 +2129,7 @@
 
 		p0 = p;
 		p = p->next;
+		SPIN_DESTROY(&p0->lbprm.lock);
 		SPIN_DESTROY(&p0->lock);
 		free(p0);
 	}/* end while(p) */
diff --git a/src/lb_chash.c b/src/lb_chash.c
index f368b68..32d7d1d 100644
--- a/src/lb_chash.c
+++ b/src/lb_chash.c
@@ -118,7 +118,7 @@
 	struct proxy *p = srv->proxy;
 
 	if (!srv_lb_status_changed(srv))
-		return;
+		return;
 
 	if (srv_willbe_usable(srv))
 		goto out_update_state;
@@ -169,7 +169,7 @@
 	struct proxy *p = srv->proxy;
 
 	if (!srv_lb_status_changed(srv))
-		return;
+		return;
 
 	if (!srv_willbe_usable(srv))
 		goto out_update_state;
@@ -364,14 +364,19 @@
 	srv = avoided = NULL;
 	avoided_node = NULL;
 
+	SPIN_LOCK(LBPRM_LOCK, &p->lbprm.lock);
 	if (p->srv_act)
 		root = &p->lbprm.chash.act;
-	else if (p->lbprm.fbck)
-		return p->lbprm.fbck;
+	else if (p->lbprm.fbck) {
+		srv = p->lbprm.fbck;
+		goto out;
+	}
 	else if (p->srv_bck)
 		root = &p->lbprm.chash.bck;
-	else
-		return NULL;
+	else {
+		srv = NULL;
+		goto out;
+	}
 
 	stop = node = p->lbprm.chash.last;
 	do {
@@ -415,6 +420,8 @@
 		p->lbprm.chash.last = avoided_node;
 	}
 
+ out:
+	SPIN_UNLOCK(LBPRM_LOCK, &p->lbprm.lock);
 	return srv;
 }
 
diff --git a/src/lb_fas.c b/src/lb_fas.c
index f8e739b..db292db 100644
--- a/src/lb_fas.c
+++ b/src/lb_fas.c
@@ -63,8 +63,11 @@
 {
 	if (!s->lb_tree)
 		return;
+
+	SPIN_LOCK(LBPRM_LOCK, &s->proxy->lbprm.lock);
 	fas_dequeue_srv(s);
 	fas_queue_srv(s);
+	SPIN_UNLOCK(LBPRM_LOCK, &s->proxy->lbprm.lock);
 }
 
 /* This function updates the server trees according to server <srv>'s new
@@ -111,7 +114,7 @@
 	fas_dequeue_srv(srv);
 	fas_remove_from_tree(srv);
 
-out_update_backend:
+ out_update_backend:
 	/* check/update tot_used, tot_weight */
 	update_backend_weight(p);
  out_update_state:
@@ -274,14 +277,19 @@
 
 	srv = avoided = NULL;
 
+	SPIN_LOCK(LBPRM_LOCK, &p->lbprm.lock);
 	if (p->srv_act)
 		node = eb32_first(&p->lbprm.fas.act);
-	else if (p->lbprm.fbck)
-		return p->lbprm.fbck;
+	else if (p->lbprm.fbck) {
+		srv = p->lbprm.fbck;
+		goto out;
+	}
 	else if (p->srv_bck)
 		node = eb32_first(&p->lbprm.fas.bck);
-	else
-		return NULL;
+	else {
+		srv = NULL;
+		goto out;
+	}
 
 	while (node) {
 		/* OK, we have a server. However, it may be saturated, in which
@@ -304,7 +312,8 @@
 
 	if (!srv)
 		srv = avoided;
-
+ out:
+	SPIN_UNLOCK(LBPRM_LOCK, &p->lbprm.lock);
 	return srv;
 }
 
diff --git a/src/lb_fwlc.c b/src/lb_fwlc.c
index 5890312..8bd3ac2 100644
--- a/src/lb_fwlc.c
+++ b/src/lb_fwlc.c
@@ -55,8 +55,11 @@
 {
 	if (!s->lb_tree)
 		return;
+
+	SPIN_LOCK(LBPRM_LOCK, &s->proxy->lbprm.lock);
 	fwlc_dequeue_srv(s);
 	fwlc_queue_srv(s);
+	SPIN_UNLOCK(LBPRM_LOCK, &s->proxy->lbprm.lock);
 }
 
 /* This function updates the server trees according to server <srv>'s new
@@ -266,14 +269,19 @@
 
 	srv = avoided = NULL;
 
+	SPIN_LOCK(LBPRM_LOCK, &p->lbprm.lock);
 	if (p->srv_act)
 		node = eb32_first(&p->lbprm.fwlc.act);
-	else if (p->lbprm.fbck)
-		return p->lbprm.fbck;
+	else if (p->lbprm.fbck) {
+		srv = p->lbprm.fbck;
+		goto out;
+	}
 	else if (p->srv_bck)
 		node = eb32_first(&p->lbprm.fwlc.bck);
-	else
-		return NULL;
+	else {
+		srv = NULL;
+		goto out;
+	}
 
 	while (node) {
 		/* OK, we have a server. However, it may be saturated, in which
@@ -296,7 +304,8 @@
 
 	if (!srv)
 		srv = avoided;
-
+ out:
+	SPIN_UNLOCK(LBPRM_LOCK, &p->lbprm.lock);
 	return srv;
 }
 
diff --git a/src/lb_fwrr.c b/src/lb_fwrr.c
index e273a27..fe2777d 100644
--- a/src/lb_fwrr.c
+++ b/src/lb_fwrr.c
@@ -315,7 +315,7 @@
 	struct fwrr_group *grp;
 
 	grp = (s->flags & SRV_F_BACKUP) ? &p->lbprm.fwrr.bck : &p->lbprm.fwrr.act;
-	
+
 	/* Delay everything which does not fit into the window and everything
 	 * which does not fit into the theorical new window.
 	 */
@@ -327,7 +327,7 @@
 		 s->npos >= grp->curr_weight + grp->next_weight) {
 		/* put into next tree, and readjust npos in case we could
 		 * finally take this back to current. */
-		s->npos -= grp->curr_weight;
+		HA_ATOMIC_SUB(&s->npos, grp->curr_weight);
 		fwrr_queue_by_weight(grp->next, s);
 	}
 	else {
@@ -359,7 +359,7 @@
 		&s->proxy->lbprm.fwrr.bck :
 		&s->proxy->lbprm.fwrr.act;
 
-	s->npos += grp->curr_weight;
+	HA_ATOMIC_ADD(&s->npos, grp->curr_weight);
 }
 
 /* prepares a server when it was marked down */
@@ -414,7 +414,7 @@
 
 	node = eb32_first(&grp->curr);
 	s = eb32_entry(node, struct server, lb_node);
-	
+
 	if (!node || s->npos > grp->curr_pos) {
 		/* either we have no server left, or we have a hole */
 		struct eb32_node *node2;
@@ -442,20 +442,20 @@
 		/* first time ever for this server */
 		s->lpos = grp->curr_pos;
 		s->npos = grp->curr_pos + grp->next_weight / s->cur_eweight;
-		s->rweight += grp->next_weight % s->cur_eweight;
+		HA_ATOMIC_ADD(&s->rweight, (grp->next_weight % s->cur_eweight));
 
 		if (s->rweight >= s->cur_eweight) {
-			s->rweight -= s->cur_eweight;
-			s->npos++;
+			HA_ATOMIC_SUB(&s->rweight, s->cur_eweight);
+			HA_ATOMIC_ADD(&s->npos, 1);
 		}
 	} else {
 		s->lpos = s->npos;
-		s->npos += grp->next_weight / s->cur_eweight;
-		s->rweight += grp->next_weight % s->cur_eweight;
+		HA_ATOMIC_ADD(&s->npos, (grp->next_weight / s->cur_eweight));
+		HA_ATOMIC_ADD(&s->rweight, (grp->next_weight % s->cur_eweight));
 
 		if (s->rweight >= s->cur_eweight) {
-			s->rweight -= s->cur_eweight;
-			s->npos++;
+			HA_ATOMIC_SUB(&s->rweight, s->cur_eweight);
+			HA_ATOMIC_ADD(&s->npos, 1);
 		}
 	}
 }
@@ -470,14 +470,19 @@
 	struct fwrr_group *grp;
 	int switched;
 
+	SPIN_LOCK(LBPRM_LOCK, &p->lbprm.lock);
 	if (p->srv_act)
 		grp = &p->lbprm.fwrr.act;
-	else if (p->lbprm.fbck)
-		return p->lbprm.fbck;
+	else if (p->lbprm.fbck) {
+		srv = p->lbprm.fbck;
+		goto out;
+	}
 	else if (p->srv_bck)
 		grp = &p->lbprm.fwrr.bck;
-	else
-		return NULL;
+	else {
+		srv = NULL;
+		goto out;
+	}
 
 	switched = 0;
 	avoided = NULL;
@@ -558,6 +563,8 @@
 			} while (full);
 		}
 	}
+ out:
+	SPIN_UNLOCK(LBPRM_LOCK, &p->lbprm.lock);
 	return srv;
 }
 
diff --git a/src/lb_map.c b/src/lb_map.c
index fef16ac..028e85b 100644
--- a/src/lb_map.c
+++ b/src/lb_map.c
@@ -19,6 +19,7 @@
 #include <types/server.h>
 
 #include <proto/backend.h>
+#include <proto/lb_map.h>
 #include <proto/proto_http.h>
 #include <proto/proto_tcp.h>
 #include <proto/queue.h>
@@ -37,7 +38,7 @@
 	/* FIXME: could be optimized since we know what changed */
 	recount_servers(p);
 	update_backend_weight(p);
-	p->lbprm.map.state |= LB_MAP_RECALC;
+	recalc_server_map(p);
  out_update_state:
 	srv_lb_commit_status(srv);
 }
@@ -56,7 +57,7 @@
 	/* FIXME: could be optimized since we know what changed */
 	recount_servers(p);
 	update_backend_weight(p);
-	p->lbprm.map.state |= LB_MAP_RECALC;
+	recalc_server_map(p);
  out_update_state:
 	srv_lb_commit_status(srv);
 }
@@ -73,7 +74,6 @@
 
 	switch (px->lbprm.tot_used) {
 	case 0:	/* no server */
-		px->lbprm.map.state &= ~LB_MAP_RECALC;
 		return;
 	default:
 		tot = px->lbprm.tot_weight;
@@ -113,7 +113,7 @@
 					break;
 				}
 
-				cur->wscore += cur->next_eweight;
+				HA_ATOMIC_ADD(&cur->wscore, cur->next_eweight);
 				v = (cur->wscore + tot) / tot; /* result between 0 and 3 */
 				if (best == NULL || v > max) {
 					max = v;
@@ -122,9 +122,8 @@
 			}
 		}
 		px->lbprm.map.srv[o] = best;
-		best->wscore -= tot;
+		HA_ATOMIC_SUB(&best->wscore, tot);
 	}
-	px->lbprm.map.state &= ~LB_MAP_RECALC;
 }
 
 /* This function is responsible of building the server MAP for map-based LB
@@ -193,7 +192,6 @@
 
 	p->lbprm.map.srv = calloc(act, sizeof(struct server *));
 	/* recounts servers and their weights */
-	p->lbprm.map.state = LB_MAP_RECALC;
 	recount_servers(p);
 	update_backend_weight(p);
 	recalc_server_map(p);
@@ -210,11 +208,11 @@
 	int newidx, avoididx;
 	struct server *srv, *avoided;
 
-	if (px->lbprm.tot_weight == 0)
-		return NULL;
-
-	if (px->lbprm.map.state & LB_MAP_RECALC)
-		recalc_server_map(px);
+	SPIN_LOCK(LBPRM_LOCK, &px->lbprm.lock);
+	if (px->lbprm.tot_weight == 0) {
+		avoided = NULL;
+		goto out;
+	}
 
 	if (px->lbprm.map.rr_idx < 0 || px->lbprm.map.rr_idx >= px->lbprm.tot_weight)
 		px->lbprm.map.rr_idx = 0;
@@ -241,6 +239,8 @@
 	if (avoided)
 		px->lbprm.map.rr_idx = avoididx;
 
+ out:
+	SPIN_UNLOCK(LBPRM_LOCK, &px->lbprm.lock);
 	/* return NULL or srvtoavoid if found */
 	return avoided;
 }
@@ -255,10 +255,6 @@
 {
 	if (px->lbprm.tot_weight == 0)
 		return NULL;
-
-	if (px->lbprm.map.state & LB_MAP_RECALC)
-		recalc_server_map(px);
-
 	return px->lbprm.map.srv[hash % px->lbprm.tot_weight];
 }