MINOR: stats: report the number of currently connected peers

The active peers output indicates both the number of established peer
connections and the number of peer connection attempts. The new counter
"ConnectedPeers" additionally reports the number of currently connected
peers.
For example, this helps detect that some peers cannot be reached. It's
worth mentioning that this value changes over time because unused peers
are often disconnected and reconnected. Most of the time it should be
equal to ActivePeers.
diff --git a/src/peers.c b/src/peers.c
index 1d6dac1..8a8e890 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -506,6 +506,8 @@
 
 	/* peer session identified */
 	if (peer) {
+		if (appctx->st0 == PEER_SESS_ST_WAITMSG)
+			HA_ATOMIC_SUB(&connected_peers, 1);
 		HA_ATOMIC_SUB(&active_peers, 1);
 		HA_SPIN_LOCK(PEER_LOCK, &peer->lock);
 		if (peer->appctx == appctx) {
@@ -571,20 +573,24 @@
 	int repl = 0;
 	size_t proto_len = strlen(PEER_SESSION_PROTO_NAME);
 	unsigned int maj_ver, min_ver;
+	int prev_state;
 
 	/* Check if the input buffer is avalaible. */
 	if (si_ic(si)->buf.size == 0)
 		goto full;
 
 	while (1) {
+		prev_state = appctx->st0;
 switchstate:
 		maj_ver = min_ver = (unsigned int)-1;
 		switch(appctx->st0) {
 			case PEER_SESS_ST_ACCEPT:
+				prev_state = appctx->st0;
 				appctx->ctx.peers.ptr = NULL;
 				appctx->st0 = PEER_SESS_ST_GETVERSION;
 				/* fall through */
 			case PEER_SESS_ST_GETVERSION:
+				prev_state = appctx->st0;
 				reql = co_getline(si_oc(si), trash.area,
 						  trash.size);
 				if (reql <= 0) { /* closed or EOL not found */
@@ -620,6 +626,7 @@
 				appctx->st0 = PEER_SESS_ST_GETHOST;
 				/* fall through */
 			case PEER_SESS_ST_GETHOST:
+				prev_state = appctx->st0;
 				reql = co_getline(si_oc(si), trash.area,
 						  trash.size);
 				if (reql <= 0) { /* closed or EOL not found */
@@ -650,6 +657,8 @@
 				/* fall through */
 			case PEER_SESS_ST_GETPEER: {
 				char *p;
+
+				prev_state = appctx->st0;
 				reql = co_getline(si_oc(si), trash.area,
 						  trash.size);
 				if (reql <= 0) { /* closed or EOL not found */
@@ -725,6 +734,7 @@
 			case PEER_SESS_ST_SENDSUCCESS: {
 				struct shared_table *st;
 
+				prev_state = appctx->st0;
 				if (!curpeer) {
 					curpeer = appctx->ctx.peers.ptr;
 					HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock);
@@ -783,11 +793,12 @@
 
 
 				/* switch to waiting message state */
+				HA_ATOMIC_ADD(&connected_peers, 1);
 				appctx->st0 = PEER_SESS_ST_WAITMSG;
 				goto switchstate;
 			}
 			case PEER_SESS_ST_CONNECT: {
-
+				prev_state = appctx->st0;
 				if (!curpeer) {
 					curpeer = appctx->ctx.peers.ptr;
 					HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock);
@@ -827,6 +838,7 @@
 			case PEER_SESS_ST_GETSTATUS: {
 				struct shared_table *st;
 
+				prev_state = appctx->st0;
 				if (!curpeer) {
 					curpeer = appctx->ctx.peers.ptr;
 					HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock);
@@ -904,6 +916,7 @@
 					appctx->st0 = PEER_SESS_ST_END;
 					goto switchstate;
 				}
+				HA_ATOMIC_ADD(&connected_peers, 1);
 				appctx->st0 = PEER_SESS_ST_WAITMSG;
 				/* fall through */
 			}
@@ -915,6 +928,7 @@
 				unsigned char msg_head[7];
 				int totl = 0;
 
+				prev_state = appctx->st0;
 				if (!curpeer) {
 					curpeer = appctx->ctx.peers.ptr;
 					HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock);
@@ -1818,6 +1832,9 @@
 				goto out;
 			}
 			case PEER_SESS_ST_EXIT:
+				if (prev_state == PEER_SESS_ST_WAITMSG)
+					HA_ATOMIC_SUB(&connected_peers, 1);
+				prev_state = appctx->st0;
 				repl = snprintf(trash.area, trash.size,
 						"%d\n", appctx->st1);
 				if (ci_putblk(si_ic(si), trash.area, repl) == -1)
@@ -1827,6 +1844,9 @@
 			case PEER_SESS_ST_ERRSIZE: {
 				unsigned char msg[2];
 
+				if (prev_state == PEER_SESS_ST_WAITMSG)
+					HA_ATOMIC_SUB(&connected_peers, 1);
+				prev_state = appctx->st0;
 				msg[0] = PEER_MSG_CLASS_ERROR;
 				msg[1] = PEER_MSG_ERR_SIZELIMIT;
 
@@ -1838,15 +1858,22 @@
 			case PEER_SESS_ST_ERRPROTO: {
 				unsigned char msg[2];
 
+				if (prev_state == PEER_SESS_ST_WAITMSG)
+					HA_ATOMIC_SUB(&connected_peers, 1);
+				prev_state = appctx->st0;
 				msg[0] = PEER_MSG_CLASS_ERROR;
 				msg[1] = PEER_MSG_ERR_PROTOCOL;
 
 				if (ci_putblk(si_ic(si), (char *)msg, sizeof(msg)) == -1)
 					goto full;
 				appctx->st0 = PEER_SESS_ST_END;
+				prev_state = appctx->st0;
 				/* fall through */
 			}
 			case PEER_SESS_ST_END: {
+				if (prev_state == PEER_SESS_ST_WAITMSG)
+					HA_ATOMIC_SUB(&connected_peers, 1);
+				prev_state = appctx->st0;
 				if (curpeer) {
 					HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
 					curpeer = NULL;
@@ -1893,6 +1920,8 @@
 	if (appctx->applet != &peer_applet)
 		return;
 
+	if (appctx->st0 == PEER_SESS_ST_WAITMSG)
+		HA_ATOMIC_SUB(&connected_peers, 1);
 	appctx->st0 = PEER_SESS_ST_END;
 	appctx_wakeup(appctx);
 }