MEDIUM: threads/server: Add a lock per server and atomically update server vars

The server's lock is use, among other things, to lock acces to the active
connection list of a server.
diff --git a/src/stream.c b/src/stream.c
index 2703f41..7bfe786 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -298,7 +298,7 @@
 	if (objt_server(s->target)) { /* there may be requests left pending in queue */
 		if (s->flags & SF_CURR_SESS) {
 			s->flags &= ~SF_CURR_SESS;
-			objt_server(s->target)->cur_sess--;
+			HA_ATOMIC_SUB(&objt_server(s->target)->cur_sess, 1);
 		}
 		if (may_dequeue_tasks(objt_server(s->target), s->be))
 			process_srv_queue(objt_server(s->target));
@@ -474,7 +474,7 @@
 		HA_ATOMIC_ADD(&s->be->be_counters.bytes_in,    bytes);
 
 		if (objt_server(s->target))
-			objt_server(s->target)->counters.bytes_in += bytes;
+			HA_ATOMIC_ADD(&objt_server(s->target)->counters.bytes_in, bytes);
 
 		if (sess->listener && sess->listener->counters)
 			HA_ATOMIC_ADD(&sess->listener->counters->bytes_in, bytes);
@@ -510,7 +510,7 @@
 		HA_ATOMIC_ADD(&s->be->be_counters.bytes_out,    bytes);
 
 		if (objt_server(s->target))
-			objt_server(s->target)->counters.bytes_out += bytes;
+			HA_ATOMIC_ADD(&objt_server(s->target)->counters.bytes_out, bytes);
 
 		if (sess->listener && sess->listener->counters)
 			HA_ATOMIC_ADD(&sess->listener->counters->bytes_out, bytes);
@@ -630,7 +630,7 @@
 
 		if (s->flags & SF_CURR_SESS) {
 			s->flags &= ~SF_CURR_SESS;
-			objt_server(s->target)->cur_sess--;
+			HA_ATOMIC_SUB(&objt_server(s->target)->cur_sess, 1);
 		}
 
 		if ((si->flags & SI_FL_ERR) &&
@@ -663,7 +663,7 @@
 		}
 
 		if (objt_server(s->target))
-			objt_server(s->target)->counters.failed_conns++;
+			HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_conns, 1);
 		HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1);
 		sess_change_server(s, NULL);
 		if (may_dequeue_tasks(objt_server(s->target), s->be))
@@ -711,7 +711,7 @@
 		si->state = SI_ST_REQ;
 	} else {
 		if (objt_server(s->target))
-			objt_server(s->target)->counters.retries++;
+			HA_ATOMIC_ADD(&objt_server(s->target)->counters.retries, 1);
 		HA_ATOMIC_ADD(&s->be->be_counters.retries, 1);
 		si->state = SI_ST_ASS;
 	}
@@ -860,7 +860,7 @@
 			if (srv)
 				srv_set_sess_last(srv);
 			if (srv)
-				srv->counters.failed_conns++;
+				HA_ATOMIC_ADD(&srv->counters.failed_conns, 1);
 			HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1);
 
 			/* release other streams waiting for this server */
@@ -916,7 +916,7 @@
 			si->exp = TICK_ETERNITY;
 			s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
 			if (srv)
-				srv->counters.failed_conns++;
+				HA_ATOMIC_ADD(&srv->counters.failed_conns, 1);
 			HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1);
 			si_shutr(si);
 			si_shutw(si);
@@ -1703,7 +1703,7 @@
 				HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
 				HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
 				if (srv)
-					srv->counters.cli_aborts++;
+					HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
 				if (!(s->flags & SF_ERR_MASK))
 					s->flags |= SF_ERR_CLICL;
 				if (!(s->flags & SF_FINST_MASK))
@@ -1719,12 +1719,12 @@
 			stream_int_report_error(si_b);
 			HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1);
 			if (srv)
-				srv->counters.failed_resp++;
+				HA_ATOMIC_ADD(&srv->counters.failed_resp, 1);
 			if (!(req->analysers) && !(res->analysers)) {
 				HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
 				HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
 				if (srv)
-					srv->counters.srv_aborts++;
+					HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
 				if (!(s->flags & SF_ERR_MASK))
 					s->flags |= SF_ERR_SRVCL;
 				if (!(s->flags & SF_FINST_MASK))
@@ -1782,7 +1782,7 @@
 		if (srv) {
 			if (s->flags & SF_CURR_SESS) {
 				s->flags &= ~SF_CURR_SESS;
-				srv->cur_sess--;
+				HA_ATOMIC_SUB(&srv->cur_sess, 1);
 			}
 			sess_change_server(s, NULL);
 			if (may_dequeue_tasks(srv, s->be))
@@ -1980,28 +1980,28 @@
 				HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
 				HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
 				if (srv)
-					srv->counters.cli_aborts++;
+					HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
 				s->flags |= SF_ERR_CLICL;
 			}
 			else if (req->flags & CF_READ_TIMEOUT) {
 				HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
 				HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
 				if (srv)
-					srv->counters.cli_aborts++;
+					HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
 				s->flags |= SF_ERR_CLITO;
 			}
 			else if (req->flags & CF_WRITE_ERROR) {
 				HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
 				HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
 				if (srv)
-					srv->counters.srv_aborts++;
+					HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
 				s->flags |= SF_ERR_SRVCL;
 			}
 			else {
 				HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
 				HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
 				if (srv)
-					srv->counters.srv_aborts++;
+					HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
 				s->flags |= SF_ERR_SRVTO;
 			}
 			sess_set_term_flags(s);
@@ -2013,28 +2013,28 @@
 				HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
 				HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
 				if (srv)
-					srv->counters.srv_aborts++;
+					HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
 				s->flags |= SF_ERR_SRVCL;
 			}
 			else if (res->flags & CF_READ_TIMEOUT) {
 				HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
 				HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
 				if (srv)
-					srv->counters.srv_aborts++;
+					HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
 				s->flags |= SF_ERR_SRVTO;
 			}
 			else if (res->flags & CF_WRITE_ERROR) {
 				HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
 				HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
 				if (srv)
-					srv->counters.cli_aborts++;
+					HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
 				s->flags |= SF_ERR_CLICL;
 			}
 			else {
 				HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
 				HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
 				if (srv)
-					srv->counters.cli_aborts++;
+					HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
 				s->flags |= SF_ERR_CLITO;
 			}
 			sess_set_term_flags(s);
@@ -2532,7 +2532,7 @@
 		return;
 
 	if (sess->srv_conn) {
-		sess->srv_conn->served--;
+		HA_ATOMIC_SUB(&sess->srv_conn->served, 1);
 		HA_ATOMIC_SUB(&sess->srv_conn->proxy->served, 1);
 		if (sess->srv_conn->proxy->lbprm.server_drop_conn)
 			sess->srv_conn->proxy->lbprm.server_drop_conn(sess->srv_conn);
@@ -2540,7 +2540,7 @@
 	}
 
 	if (newsrv) {
-		newsrv->served++;
+		HA_ATOMIC_ADD(&newsrv->served, 1);
 		HA_ATOMIC_ADD(&newsrv->proxy->served, 1);
 		if (newsrv->proxy->lbprm.server_take_conn)
 			newsrv->proxy->lbprm.server_take_conn(newsrv);
@@ -3263,9 +3263,11 @@
 		return 1;
 
 	/* kill all the stream that are on this server */
+	SPIN_LOCK(SERVER_LOCK, &sv->lock);
 	list_for_each_entry_safe(strm, strm_bck, &sv->actconns, by_srv)
 		if (strm->srv_conn == sv)
 			stream_shutdown(strm, SF_ERR_KILLED);
+	SPIN_UNLOCK(SERVER_LOCK, &sv->lock);
 	return 1;
 }