MINOR: peers: Support for peer shards

Add "shards" new keyword for "peers" section to configure the number
of peer shards attached to such secions. This impact all the stick-tables
attached to the section.
Add "shard" new "server" parameter to configure the peers which participate to
all the stick-tables contents distribution. Each peer receive the stick-tables updates
only for keys with this shard value as distribution hash. The "shard" value
is stored in ->shard new server struct member.
cfg_parse_peers() which is the function which is called to parse all
the lines of a "peers" section is modified to parse the "shards" parameter
stored in ->nb_shards new peers struct member.
Add srv_parse_shard() new callback into server.c to pare the "shard"
parameter.
Implement stksess_getkey_hash() to compute the distribution hash for a
stick-table key as the 64-bits xxhash of the key concatenated to the stick-table
name. This function is called by stksess_setkey_shard(), itself
called by the already implemented function which create a new stick-table
key (stksess_new()).
Add ->idlen new stktable struct member to store the stick-table name length
to not have to compute it each time a stick-table key hash is computed.
diff --git a/doc/configuration.txt b/doc/configuration.txt
index f1b5cd9..d3ab65c 100644
--- a/doc/configuration.txt
+++ b/doc/configuration.txt
@@ -3428,6 +3428,13 @@
         server haproxy2 192.168.0.2:1024
         server haproxy3 10.2.0.1:1024
 
+shards <shards>
+
+  In some configurations, one would like to distribute the stick-table contents
+  to some peers in place of sending all the stick-table contents to each peer
+  declared in the "peers" section. In such cases, "shards" specifies the
+  number of peer involved in this stick-table contents distribution.
+  See also "shard" server parameter.
 
 table <tablename> type {ip | integer | string [len <length>] | binary [len <length>]}
       size <size> [expire <expire>] [nopurge] [store <data_type>]*
@@ -15646,6 +15653,25 @@
   protocol. See also the "no-send-proxy-v2-ssl-cn" option of this section and
   the "send-proxy-v2" option of the "bind" keyword.
 
+shard <shard>
+  This parameter in used only in the context of stick-tables synchronisation
+  with peers protocol. The "shard" parameter identifies the peers which will
+  receive all the stick-table updates for keys with this shard as distribution
+  hash. The accepted values are 0 up to "shards" parameter value specified in
+  the "peers" section. 0 value is the default value meaning that the peer will
+  receive all the key updates. Greater values than "shards" will be ignored.
+  This is also the case for any value provided to the local peer.
+
+  Example :
+
+  peers mypeers
+       shards 3
+       peer A 127.0.0.1:40001 # local peer without shard value (0 internally)
+       peer B 127.0.0.1:40002 shard 1
+       peer C 127.0.0.1:40003 shard 2
+       peer D 127.0.0.1:40004 shard 3
+
+
 slowstart <start_time_in_ms>
   The "slowstart" parameter for a server accepts a value in milliseconds which
   indicates after how long a server which has just come back up will run at
diff --git a/include/haproxy/peers-t.h b/include/haproxy/peers-t.h
index 6a1c215..965bf3e 100644
--- a/include/haproxy/peers-t.h
+++ b/include/haproxy/peers-t.h
@@ -101,6 +101,7 @@
 	unsigned int flags;             /* current peers section resync state */
 	unsigned int resync_timeout;    /* resync timeout timer */
 	int count;                      /* total of peers */
+	int nb_shards;                  /* Number of peer shards */
 	int disabled;                   /* peers proxy disabled if >0 */
 	int applet_count[MAX_THREADS];  /* applet count per thread */
 };
diff --git a/include/haproxy/server-t.h b/include/haproxy/server-t.h
index ac287df..ff5faae 100644
--- a/include/haproxy/server-t.h
+++ b/include/haproxy/server-t.h
@@ -264,6 +264,7 @@
 	unsigned rweight;			/* remainder of weight in the current LB tree */
 	unsigned cumulative_weight;		/* weight of servers prior to this one in the same group, for chash balancing */
 	int maxqueue;				/* maximum number of pending connections allowed */
+	int shard;				/* shard (in peers protocol context only) */
 
 	enum srv_ws_mode ws;                    /* configure the protocol selection for websocket */
 	/* 3 bytes hole here */
diff --git a/include/haproxy/stick_table-t.h b/include/haproxy/stick_table-t.h
index b423f2d..54b27bb 100644
--- a/include/haproxy/stick_table-t.h
+++ b/include/haproxy/stick_table-t.h
@@ -145,6 +145,7 @@
 	unsigned int expire;      /* session expiration date */
 	unsigned int ref_cnt;     /* reference count, can only purge when zero */
 	__decl_thread(HA_RWLOCK_T lock); /* lock related to the table entry */
+	int shard;                /* shard */
 	struct eb32_node exp;     /* ebtree node used to hold the session in expiration tree */
 	struct eb32_node upd;     /* ebtree node used to hold the update sequence tree */
 	struct ebmb_node key;     /* ebtree node used to hold the session in table */
@@ -155,6 +156,7 @@
 /* stick table */
 struct stktable {
 	char *id;		  /* local table id name. */
+	size_t idlen;	  /* local table id name length. */
 	char *nid;		  /* table id name sent over the network with peers protocol. */
 	struct stktable *next;    /* The stick-table may be linked when belonging to
 	                           * the same configuration section.
diff --git a/src/cfgparse.c b/src/cfgparse.c
index 0cf0485..2164880 100644
--- a/src/cfgparse.c
+++ b/src/cfgparse.c
@@ -685,6 +685,7 @@
 int cfg_parse_peers(const char *file, int linenum, char **args, int kwm)
 {
 	static struct peers *curpeers = NULL;
+	static int nb_shards = 0;
 	struct peer *newpeer = NULL;
 	const char *err;
 	struct bind_conf *bind_conf;
@@ -905,6 +906,13 @@
 			goto out;
 		}
 
+		if (nb_shards && curpeers->peers_fe->srv->shard > nb_shards) {
+			ha_warning("parsing [%s:%d] : '%s %s' : %d peer shard greater value than %d shards value is ignored.\n",
+			           file, linenum, args[0], args[1], curpeers->peers_fe->srv->shard, nb_shards);
+			curpeers->peers_fe->srv->shard = 0;
+			err_code |= ERR_WARN;
+		}
+
 		if (curpeers->peers_fe->srv->init_addr_methods || curpeers->peers_fe->srv->resolvers_id ||
 		    curpeers->peers_fe->srv->do_check || curpeers->peers_fe->srv->do_agent) {
 			ha_warning("parsing [%s:%d] : '%s %s' : init_addr, resolvers, check and agent are ignored for peers.\n", file, linenum, args[0], args[1]);
@@ -966,6 +974,32 @@
 		l->options |= LI_O_UNLIMITED; /* don't make the peers subject to global limits */
 		global.maxsock++; /* for the listening socket */
 	}
+	else if (strcmp(args[0], "shards") == 0) {
+		char *endptr;
+
+		if (!*args[1]) {
+			ha_alert("parsing [%s:%d] : '%s' : missing value\n", file, linenum, args[0]);
+			err_code |= ERR_FATAL;
+			goto out;
+		}
+
+		curpeers->nb_shards = strtol(args[1], &endptr, 10);
+		if (*endptr != '\0') {
+			ha_alert("parsing [%s:%d] : '%s' : expects an integer argument, found '%s'\n",
+			         file, linenum, args[0], args[1]);
+			err_code |= ERR_FATAL;
+			goto out;
+		}
+
+		if (!curpeers->nb_shards) {
+			ha_alert("parsing [%s:%d] : '%s' : expects a strictly positive integer argument\n",
+			         file, linenum, args[0]);
+			err_code |= ERR_FATAL;
+			goto out;
+		}
+
+		nb_shards = curpeers->nb_shards;
+	}
 	else if (strcmp(args[0], "table") == 0) {
 		struct stktable *t, *other;
 		char *id;
@@ -4373,6 +4407,7 @@
 		 */
 		last = &cfg_peers;
 		while (*last) {
+			struct peer *peer;
 			struct stktable *t;
 			curpeers = *last;
 
@@ -4464,6 +4499,26 @@
 					break;
 				}
 				last = &curpeers->next;
+
+				/* Ignore the peer shard greater than the number of peer shard for this section.
+				 * Also ignore the peer shard of the local peer.
+				 */
+				for (peer = curpeers->remote; peer; peer = peer->next) {
+					if (peer == curpeers->local) {
+						if (peer->srv->shard) {
+							ha_warning("Peers section '%s': shard ignored for '%s' local peer\n",
+									   curpeers->id, peer->id);
+							peer->srv->shard = 0;
+						}
+					}
+					else if (peer->srv->shard > curpeers->nb_shards) {
+						ha_warning("Peers section '%s': shard ignored for '%s' local peer because "
+								   "%d shard value is greater than the section number of shards (%d)\n",
+								   curpeers->id, peer->id, peer->srv->shard, curpeers->nb_shards);
+						peer->srv->shard = 0;
+					}
+				}
+
 				continue;
 			}
 
diff --git a/src/peers.c b/src/peers.c
index e0fedbd..8e6c8b0 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -1597,6 +1597,13 @@
 		}
 
 		updateid = ts->upd.key;
+		if (p->srv->shard && ts->shard != p->srv->shard) {
+			/* Skip this entry */
+			st->last_pushed = updateid;
+			new_pushed = 1;
+			continue;
+		}
+
 		ts->ref_cnt++;
 		HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->lock);
 
diff --git a/src/server.c b/src/server.c
index 4b59e90..437f3c2 100644
--- a/src/server.c
+++ b/src/server.c
@@ -807,6 +807,14 @@
 	return srv_disable_pp_flags(newsrv, SRV_PP_V2);
 }
 
+/* Parse the "shard" server keyword */
+static int srv_parse_shard(char **args, int *cur_arg,
+                           struct proxy *curproxy, struct server *newsrv, char **err)
+{
+	newsrv->shard = atol(args[*cur_arg + 1]);
+	return 0;
+}
+
 /* Parse the "no-tfo" server keyword */
 static int srv_parse_no_tfo(char **args, int *cur_arg,
                             struct proxy *curproxy, struct server *newsrv, char **err)
@@ -1791,6 +1799,7 @@
 	{ "resolvers",           srv_parse_resolvers,           1,  1,  0 }, /* Configure the resolver to use for name resolution */
 	{ "send-proxy",          srv_parse_send_proxy,          0,  1,  1 }, /* Enforce use of PROXY V1 protocol */
 	{ "send-proxy-v2",       srv_parse_send_proxy_v2,       0,  1,  1 }, /* Enforce use of PROXY V2 protocol */
+	{ "shard",               srv_parse_shard,               1,  1,  1 }, /* Server shard (only in peers protocol context) */
 	{ "slowstart",           srv_parse_slowstart,           1,  1,  1 }, /* Set the warm-up timer for a previously failed server */
 	{ "source",              srv_parse_source,             -1,  1,  1 }, /* Set the source address to be used to connect to the server */
 	{ "stick",               srv_parse_stick,               0,  1,  0 }, /* Enable stick-table persistence */
diff --git a/src/stick_table.c b/src/stick_table.c
index c6fe243..77a5ba5 100644
--- a/src/stick_table.c
+++ b/src/stick_table.c
@@ -17,6 +17,7 @@
 #include <import/ebmbtree.h>
 #include <import/ebsttree.h>
 #include <import/ebistree.h>
+#include <import/xxhash.h>
 
 #include <haproxy/api.h>
 #include <haproxy/applet.h>
@@ -155,8 +156,50 @@
 	}
 }
 
+/*
+ * Initialize or update the key hash in the sticky session <ts> present in table <t>
+ * from the value present in <key>.
+ */
+static unsigned long long stksess_getkey_hash(struct stktable *t,
+                                              struct stksess *ts,
+                                              struct stktable_key *key)
+{
+	struct buffer *buf;
+	size_t keylen;
+
+	/* Copy the stick-table id into <buf> */
+	buf = get_trash_chunk();
+	memcpy(b_tail(buf), t->id, t->idlen);
+	b_add(buf, t->idlen);
+	/* Copy the key into <buf> */
+	if (t->type == SMP_T_STR)
+		keylen = key->key_len;
+	else
+		keylen = t->key_size;
+	memcpy(b_tail(buf), key->key, keylen);
+	b_add(buf, keylen);
+
+	return XXH64(b_head(buf), b_data(buf), 0);
+}
 
 /*
+ * Set the shard for <key> key of <ts> sticky session attached to <t> stick table.
+ * Do nothing for stick-table without peers synchronisation.
+ */
+static void stksess_setkey_shard(struct stktable *t, struct stksess *ts,
+                                 struct stktable_key *key)
+{
+	if (!t->peers.p)
+		/* This stick-table is not attached to any peers section */
+		return;
+
+	if (!t->peers.p->nb_shards)
+		ts->shard = 0;
+	else
+		ts->shard = stksess_getkey_hash(t, ts, key) % t->peers.p->nb_shards + 1;
+}
+
+/*
  * Init sticky session <ts> of table <t>. The data parts are cleared and <ts>
  * is returned.
  */
@@ -282,8 +325,10 @@
 	if (ts) {
 		ts = (void *)ts + round_ptr_size(t->data_size);
 		__stksess_init(t, ts);
-		if (key)
+		if (key) {
 			stksess_setkey(t, ts, key);
+			stksess_setkey_shard(t, ts, key);
+		}
 	}
 
 	return ts;
@@ -851,6 +896,7 @@
 	}
 
 	t->id =  id;
+	t->idlen = strlen(id);
 	t->nid =  nid;
 	t->type = (unsigned int)-1;
 	t->conf.file = file;