MAJOR: threads/peers: Make peers thread safe

A lock is used to protect accesses to a peer structure.

A the lock is taken in the applet handler when the peer is identified
and released living the applet handler.

In the scheduling task for peers section, the lock is taken for every
listed peer and released at the end of the process task function.

The peer 'force shutdown' function was also re-worked.
diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 8b32cf6..3a77bd1 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -155,6 +155,7 @@
 	STK_TABLE_LOCK,
 	STK_SESS_LOCK,
 	APPLETS_LOCK,
+	PEER_LOCK,
 	LOCK_LABELS
 };
 struct lock_stat {
@@ -241,7 +242,7 @@
 					   "TASK_RQ", "TASK_WQ", "POOL",
 					   "LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
 					   "UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS",
-					   "APPLETS" };
+					   "APPLETS", "PEER" };
 	int lbl;
 
 	for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
diff --git a/include/types/applet.h b/include/types/applet.h
index e96b703..b56c563 100644
--- a/include/types/applet.h
+++ b/include/types/applet.h
@@ -71,7 +71,7 @@
 
 	union {
 		struct {
-			void *ptr;              /* multi-purpose pointer for peers */
+			void *ptr;              /* current peer or NULL, do not use for something else */
 		} peers;                        /* used by the peers applet */
 		struct {
 			int connected;
diff --git a/include/types/peers.h b/include/types/peers.h
index a77a094..2fc7435 100644
--- a/include/types/peers.h
+++ b/include/types/peers.h
@@ -67,6 +67,9 @@
 	struct shared_table *remote_table;
 	struct shared_table *last_local_table;
 	struct shared_table *tables;
+#ifdef USE_THREAD
+	HA_SPINLOCK_T lock;	 /* lock used to handle this peer section */
+#endif
 	struct peer *next;	  /* next peer in the list */
 };
 
diff --git a/src/cfgparse.c b/src/cfgparse.c
index dd49009..ca2d5d7 100644
--- a/src/cfgparse.c
+++ b/src/cfgparse.c
@@ -2039,6 +2039,7 @@
 		newpeer->proto = proto;
 		newpeer->xprt  = xprt_get(XPRT_RAW);
 		newpeer->sock_init_arg = NULL;
+		SPIN_INIT(&newpeer->lock);
 
 		if (strcmp(newpeer->id, localpeer) == 0) {
 			/* Current is local peer, it define a frontend */
diff --git a/src/peers.c b/src/peers.c
index ef332eb..2ca08fe 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -509,6 +509,7 @@
 
 	/* peer session identified */
 	if (peer) {
+		SPIN_LOCK(PEER_LOCK, &peer->lock);
 		if (peer->appctx == appctx) {
 			/* Re-init current table pointers to force announcement on re-connect */
 			peer->remote_table = peer->last_local_table = NULL;
@@ -525,6 +526,7 @@
 			peer->flags &= PEER_TEACH_RESET;
 			peer->flags &= PEER_LEARN_RESET;
 		}
+		SPIN_UNLOCK(PEER_LOCK, &peer->lock);
 		task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
 	}
 }
@@ -566,6 +568,7 @@
 	struct stream_interface *si = appctx->owner;
 	struct stream *s = si_strm(si);
 	struct peers *curpeers = strm_fe(s)->parent;
+	struct peer *curpeer = NULL;
 	int reql = 0;
 	int repl = 0;
 	size_t proto_len = strlen(PEER_SESSION_PROTO_NAME);
@@ -646,7 +649,6 @@
 				appctx->st0 = PEER_SESS_ST_GETPEER;
 				/* fall through */
 			case PEER_SESS_ST_GETPEER: {
-				struct peer *curpeer;
 				char *p;
 				reql = co_getline(si_oc(si), trash.str, trash.size);
 				if (reql <= 0) { /* closed or EOL not found */
@@ -689,6 +691,7 @@
 					goto switchstate;
 				}
 
+				SPIN_LOCK(PEER_LOCK, &curpeer->lock);
 				if (curpeer->appctx && curpeer->appctx != appctx) {
 					if (curpeer->local) {
 						/* Local connection, reply a retry */
@@ -696,6 +699,12 @@
 						appctx->st1 = PEER_SESS_SC_TRYAGAIN;
 						goto switchstate;
 					}
+
+					/* we're killing a connection, we must apply a random delay before
+					 * retrying otherwise the other end will do the same and we can loop
+					 * for a while.
+					 */
+					curpeer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + random() % 2000));
 					peer_session_forceshutdown(curpeer->appctx);
 				}
 				if (maj_ver != (unsigned int)-1 && min_ver != (unsigned int)-1) {
@@ -712,9 +721,16 @@
 				/* fall through */
 			}
 			case PEER_SESS_ST_SENDSUCCESS: {
-				struct peer *curpeer = appctx->ctx.peers.ptr;
 				struct shared_table *st;
 
+				if (!curpeer) {
+					curpeer = appctx->ctx.peers.ptr;
+					SPIN_LOCK(PEER_LOCK, &curpeer->lock);
+					if (curpeer->appctx != appctx) {
+						appctx->st0 = PEER_SESS_ST_END;
+						goto switchstate;
+					}
+				}
 				repl = snprintf(trash.str, trash.size, "%d\n", PEER_SESS_SC_SUCCESSCODE);
 				repl = ci_putblk(si_ic(si), trash.str, repl);
 				if (repl <= 0) {
@@ -767,7 +783,15 @@
 				goto switchstate;
 			}
 			case PEER_SESS_ST_CONNECT: {
-				struct peer *curpeer = appctx->ctx.peers.ptr;
+
+				if (!curpeer) {
+					curpeer = appctx->ctx.peers.ptr;
+					SPIN_LOCK(PEER_LOCK, &curpeer->lock);
+					if (curpeer->appctx != appctx) {
+						appctx->st0 = PEER_SESS_ST_END;
+						goto switchstate;
+					}
+				}
 
 				/* Send headers */
 				repl = snprintf(trash.str, trash.size,
@@ -797,9 +821,17 @@
 				/* fall through */
 			}
 			case PEER_SESS_ST_GETSTATUS: {
-				struct peer *curpeer = appctx->ctx.peers.ptr;
 				struct shared_table *st;
 
+				if (!curpeer) {
+					curpeer = appctx->ctx.peers.ptr;
+					SPIN_LOCK(PEER_LOCK, &curpeer->lock);
+					if (curpeer->appctx != appctx) {
+						appctx->st0 = PEER_SESS_ST_END;
+						goto switchstate;
+					}
+				}
+
 				if (si_ic(si)->flags & CF_WRITE_PARTIAL)
 					curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE;
 
@@ -871,7 +903,6 @@
 				/* fall through */
 			}
 			case PEER_SESS_ST_WAITMSG: {
-				struct peer *curpeer = appctx->ctx.peers.ptr;
 				struct stksess *ts, *newts = NULL;
 				uint32_t msg_len = 0;
 				char *msg_cur = trash.str;
@@ -879,6 +910,15 @@
 				unsigned char msg_head[7];
 				int totl = 0;
 
+				if (!curpeer) {
+					curpeer = appctx->ctx.peers.ptr;
+					SPIN_LOCK(PEER_LOCK, &curpeer->lock);
+					if (curpeer->appctx != appctx) {
+						appctx->st0 = PEER_SESS_ST_END;
+						goto switchstate;
+					}
+				}
+
 				reql = co_getblk(si_oc(si), (char *)msg_head, 2*sizeof(unsigned char), totl);
 				if (reql <= 0) /* closed or EOL not found */
 					goto incomplete;
@@ -1417,6 +1457,7 @@
 						}
 
 						if (!(curpeer->flags & PEER_F_TEACH_PROCESS)) {
+							SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
 							if (!(curpeer->flags & PEER_F_LEARN_ASSIGN) &&
 							    ((int)(st->last_pushed - st->table->localupdate) < 0)) {
 								struct eb32_node *eb;
@@ -1447,7 +1488,6 @@
 
 								/* We force new pushed to 1 to force identifier in update message */
 								new_pushed = 1;
-								SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
 								while (1) {
 									uint32_t msglen;
 									struct stksess *ts;
@@ -1505,8 +1545,8 @@
 									/* identifier may not needed in next update message */
 									new_pushed = 0;
 								}
-								SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
 							}
+							SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
 						}
 						else {
 							if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
@@ -1756,6 +1796,10 @@
 				/* fall through */
 			}
 			case PEER_SESS_ST_END: {
+				if (curpeer) {
+					SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
+					curpeer = NULL;
+				}
 				si_shutw(si);
 				si_shutr(si);
 				si_ic(si)->flags |= CF_READ_NULL;
@@ -1765,6 +1809,9 @@
 	}
 out:
 	si_oc(si)->flags |= CF_READ_DONTWAIT;
+
+	if (curpeer)
+		SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
 	return;
 full:
 	si_applet_cant_put(si);
@@ -1783,8 +1830,6 @@
  */
 static void peer_session_forceshutdown(struct appctx *appctx)
 {
-	struct peer *ps;
-
 	/* Note that the peer sessions which have just been created
 	 * (->st0 == PEER_SESS_ST_CONNECT) must not
 	 * be shutdown, if not, the TCP session will never be closed
@@ -1797,16 +1842,7 @@
 	if (appctx->applet != &peer_applet)
 		return;
 
-	ps = appctx->ctx.peers.ptr;
-	/* we're killing a connection, we must apply a random delay before
-	 * retrying otherwise the other end will do the same and we can loop
-	 * for a while.
-	 */
-	if (ps)
-		ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + random() % 2000));
-
 	appctx->st0 = PEER_SESS_ST_END;
-	appctx->ctx.peers.ptr = NULL;
 	appctx_wakeup(appctx);
 }
 
@@ -1922,6 +1958,10 @@
 		return NULL;
 	}
 
+	/* Acquire lock for all peers of the section */
+	for (ps = peers->remote; ps; ps = ps->next)
+		SPIN_LOCK(PEER_LOCK, &ps->lock);
+
 	if (!stopping) {
 		/* Normal case (not soft stop)*/
 
@@ -2033,6 +2073,11 @@
 
 			/* disconnect all connected peers */
 			for (ps = peers->remote; ps; ps = ps->next) {
+				/* we're killing a connection, we must apply a random delay before
+				 * retrying otherwise the other end will do the same and we can loop
+				 * for a while.
+				 */
+				ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + random() % 2000));
 				if (ps->appctx) {
 					peer_session_forceshutdown(ps->appctx);
 					ps->appctx = NULL;
@@ -2086,6 +2131,11 @@
 			}
 		}
 	} /* stopping */
+
+	/* Release lock for all peers of the section */
+	for (ps = peers->remote; ps; ps = ps->next)
+		SPIN_UNLOCK(PEER_LOCK, &ps->lock);
+
 	/* Wakeup for re-connect */
 	return task;
 }