BUG/MEDIUM: peers: fix a case where peer session is not cleanly reset on release.

The deinit took place only in peer_session_release, but in the case of a
previous call to peer_session_forceshutdown, the session cursors
would not be reset, resulting in a bad state for a new session of the same
peer. For instance, a table definition message could be dropped, and
so all update messages would be dropped by the remote peer.

This patch moves the deinit processing directly into the force shutdown
function. A killed session remains in the "ST_END" state, but the peer's
reference to it is reset to NULL, so the deinit is skipped in the session
release function.

The session release continues to ensure the deinit for "active" sessions.

This patch should be backported to all stable versions since proto
peers v2.
diff --git a/src/peers.c b/src/peers.c
index a210de3..01120e6 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -208,7 +208,7 @@
 
 static size_t proto_len = sizeof(PEER_SESSION_PROTO_NAME) - 1;
 struct peers *cfg_peers = NULL;
-static void peer_session_forceshutdown(struct appctx *appctx);
+static void peer_session_forceshutdown(struct peer *peer);
 
 /* This function encode an uint64 to 'dynamic' length format.
    The encoded value is written at address *str, and the
@@ -595,14 +595,57 @@
 }
 
 /*
+ * Deinit a connected peer: fix up the peer counters, detach its appctx, reset
+ * its table pointers and learn/teach flags, then wake up the sync task.
+ */
+void __peer_session_deinit(struct peer *peer)
+{
+	struct stream_interface *si;
+	struct stream *s;
+	struct peers *peers;
+
+	if (!peer->appctx)
+		return;
+
+	si = peer->appctx->owner;
+	if (!si)
+		return;
+
+	s = si_strm(si);
+	if (!s)
+		return;
+
+	peers = strm_fe(s)->parent;
+	if (!peers)
+		return;
+
+	if (peer->appctx->st0 == PEER_SESS_ST_WAITMSG)
+		HA_ATOMIC_SUB(&connected_peers, 1);
+	HA_ATOMIC_SUB(&active_peers, 1);
+
+	/* Re-init current table pointers to force announcement on re-connect */
+	peer->remote_table = peer->last_local_table = NULL;
+	peer->appctx = NULL;
+	if (peer->flags & PEER_F_LEARN_ASSIGN) {
+		/* unassign current peer for learning */
+		peer->flags &= ~(PEER_F_LEARN_ASSIGN);
+		peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
+
+		/* reschedule a resync */
+		peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+	}
+	/* reset teaching and learning flags to 0 */
+	peer->flags &= PEER_TEACH_RESET;
+	peer->flags &= PEER_LEARN_RESET;
+	task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
+}
+
+/*
  * Callback to release a session with a peer
  */
 static void peer_session_release(struct appctx *appctx)
 {
-	struct stream_interface *si = appctx->owner;
-	struct stream *s = si_strm(si);
 	struct peer *peer = appctx->ctx.peers.ptr;
-	struct peers *peers = strm_fe(s)->parent;
 
 	/* appctx->ctx.peers.ptr is not a peer session */
 	if (appctx->st0 < PEER_SESS_ST_SENDSUCCESS)
@@ -610,28 +653,10 @@
 
 	/* peer session identified */
 	if (peer) {
-		if (appctx->st0 == PEER_SESS_ST_WAITMSG)
-			_HA_ATOMIC_SUB(&connected_peers, 1);
-		_HA_ATOMIC_SUB(&active_peers, 1);
 		HA_SPIN_LOCK(PEER_LOCK, &peer->lock);
-		if (peer->appctx == appctx) {
-			/* Re-init current table pointers to force announcement on re-connect */
-			peer->remote_table = peer->last_local_table = NULL;
-			peer->appctx = NULL;
-			if (peer->flags & PEER_F_LEARN_ASSIGN) {
-				/* unassign current peer for learning */
-				peer->flags &= ~(PEER_F_LEARN_ASSIGN);
-				peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
-
-				/* reschedule a resync */
-				peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
-			}
-			/* reset teaching and learning flags to 0 */
-			peer->flags &= PEER_TEACH_RESET;
-			peer->flags &= PEER_LEARN_RESET;
-		}
+		if (peer->appctx == appctx)
+			__peer_session_deinit(peer);
 		HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
-		task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
 	}
 }
 
@@ -2033,7 +2058,7 @@
 					 * for a while.
 					 */
 					curpeer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + random() % 2000));
-					peer_session_forceshutdown(curpeer->appctx);
+					peer_session_forceshutdown(curpeer);
 				}
 				if (maj_ver != (unsigned int)-1 && min_ver != (unsigned int)-1) {
 					if (min_ver == PEER_DWNGRD_MINOR_VER) {
@@ -2251,11 +2276,14 @@
 	.release = peer_session_release,
 };
 
+
 /*
  * Use this function to force a close of a peer session
  */
-static void peer_session_forceshutdown(struct appctx *appctx)
+static void peer_session_forceshutdown(struct peer *peer)
 {
+	struct appctx *appctx = peer->appctx;
+
 	/* Note that the peer sessions which have just been created
 	 * (->st0 == PEER_SESS_ST_CONNECT) must not
 	 * be shutdown, if not, the TCP session will never be closed
@@ -2268,8 +2296,8 @@
 	if (appctx->applet != &peer_applet)
 		return;
 
-	if (appctx->st0 == PEER_SESS_ST_WAITMSG)
-		_HA_ATOMIC_SUB(&connected_peers, 1);
+	__peer_session_deinit(peer);
+
 	appctx->st0 = PEER_SESS_ST_END;
 	appctx_wakeup(appctx);
 }
@@ -2504,7 +2532,7 @@
 								}
 								else  {
 									ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + random() % 2000));
-									peer_session_forceshutdown(ps->appctx);
+									peer_session_forceshutdown(ps);
 								}
 							}
 							else if (tick_is_expired(ps->heartbeat, now_ms)) {
@@ -2560,8 +2588,7 @@
 				 */
 				ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + random() % 2000));
 				if (ps->appctx) {
-					peer_session_forceshutdown(ps->appctx);
-					ps->appctx = NULL;
+					peer_session_forceshutdown(ps);
 				}
 			}
 		}