MEDIUM: peers: limit the number of updates sent at once

As seen in GH issue #1770, peers synchronization does not cope well with
very large buffers because by default the only two reasons for stopping
the processing of updates are either that the end was reached or that
the buffer is full. This can cause high latencies, and even rightfully
trigger the watchdog when the operations are numerous and slowed down
by competition on the stick-table lock.

This patch introduces a limit to the number of messages one may send
at once, which now defaults to 200, regardless of the buffer size. This
means taking and releasing the lock up to 400 times in a row, which is
costly enough to let some other parts work.

After some observation this could be backported to 2.6. If so, however,
the previous commits "BUG/MEDIUM: applet: fix incorrect check for abnormal
return condition from handler" and "BUG/MINOR: applet: make the call_rate
only count the no-progress calls" must also be backported, otherwise the
call rate might trigger the looping protection.
diff --git a/doc/configuration.txt b/doc/configuration.txt
index d5a1ec8..fc4f3a4 100644
--- a/doc/configuration.txt
+++ b/doc/configuration.txt
@@ -1122,6 +1122,7 @@
    - tune.maxpollevents
    - tune.maxrewrite
    - tune.pattern.cache-size
+   - tune.peers.max-updates-at-once
    - tune.pipesize
    - tune.pool-high-fd-ratio
    - tune.pool-low-fd-ratio
@@ -2931,6 +2932,15 @@
   aging components. If this is not acceptable, the cache can be disabled by
   setting this parameter to 0.
 
+tune.peers.max-updates-at-once <number>
+  Sets the maximum number of stick-table updates that haproxy will try to
+  process at once when sending messages. Retrieving the data for these updates
+  requires some locking operations which can be CPU intensive on highly
+  threaded machines if unbounded, and may also increase the traffic latency
+  during the initial batched transfer between an older and a newer process.
+  Conversely, low values may also incur higher CPU overhead, and take longer
+  to complete. The default value is 200 and it is suggested not to change it.
+
 tune.pipesize <number>
   Sets the kernel pipe buffer size to this size (in bytes). By default, pipes
   are the default size for the system. But sometimes when using TCP splicing,
diff --git a/src/peers.c b/src/peers.c
index 3df4999..642ad1d 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -25,6 +25,7 @@
 
 #include <haproxy/api.h>
 #include <haproxy/applet.h>
+#include <haproxy/cfgparse.h>
 #include <haproxy/channel.h>
 #include <haproxy/cli.h>
 #include <haproxy/dict.h>
@@ -105,6 +106,9 @@
 #define PEER_LOCAL_RECONNECT_TIMEOUT 500 /* 500ms */
 #define PEER_HEARTBEAT_TIMEOUT      3000 /* 3 seconds */
 
+/* default maximum of updates sent at once */
+#define PEER_DEF_MAX_UPDATES_AT_ONCE      200
+
 /* flags for "show peers" */
 #define PEERS_SHOW_F_DICT           0x00000001 /* also show the contents of the dictionary */
 
@@ -302,6 +306,7 @@
 
 static size_t proto_len = sizeof(PEER_SESSION_PROTO_NAME) - 1;
 struct peers *cfg_peers = NULL;
+static int peers_max_updates_at_once = PEER_DEF_MAX_UPDATES_AT_ONCE;
 static void peer_session_forceshutdown(struct peer *peer);
 
 static struct ebpt_node *dcache_tx_insert(struct dcache *dc,
@@ -1559,6 +1564,7 @@
                                       struct shared_table *st, int locked)
 {
 	int ret, new_pushed, use_timed;
+	int updates_sent = 0;
 
 	ret = 1;
 	use_timed = 0;
@@ -1585,8 +1591,10 @@
 
 		/* push local updates */
 		ts = peer_stksess_lookup(st);
-		if (!ts)
+		if (!ts) {
+			ret = 1; // done
 			break;
+		}
 
 		updateid = ts->upd.key;
 		ts->ref_cnt++;
@@ -1596,9 +1604,7 @@
 		if (ret <= 0) {
 			HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
 			ts->ref_cnt--;
-			if (!locked)
-				HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
-			return ret;
+			break;
 		}
 
 		HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
@@ -1611,12 +1617,22 @@
 
 		/* identifier may not needed in next update message */
 		new_pushed = 0;
+
+		updates_sent++;
+		if (updates_sent >= peers_max_updates_at_once) {
+			/* pretend we're full so that we get back ASAP */
+			struct stconn *sc = appctx_sc(appctx);
+
+			sc_need_room(sc);
+			ret = -1;
+			break;
+		}
 	}
 
  out:
 	if (!locked)
 		HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
-	return 1;
+	return ret;
 }
 
 /*
@@ -2548,6 +2564,7 @@
 	if (peer->tables) {
 		struct shared_table *st;
 		struct shared_table *last_local_table;
+		int updates_sent = 0;
 
 		last_local_table = peer->last_local_table;
 		if (!last_local_table)
@@ -2597,6 +2614,15 @@
 			if (st == last_local_table)
 				break;
 			st = st->next;
+
+			updates_sent++;
+			if (updates_sent >= peers_max_updates_at_once) {
+				/* pretend we're full so that we get back ASAP */
+				struct stconn *sc = appctx_sc(appctx);
+
+				sc_need_room(sc);
+				return -1;
+			}
 		}
 	}
 
@@ -3997,6 +4023,36 @@
 	return ret;
 }
 
+/* config parser for global "tune.peers.max-updates-at-once" */
+static int cfg_parse_max_updt_at_once(char **args, int section_type, struct proxy *curpx,
+                                      const struct proxy *defpx, const char *file, int line,
+                                      char **err)
+{
+	int arg = -1;
+
+	if (too_many_args(1, args, err, NULL))
+		return -1;
+
+	if (*(args[1]) != 0)
+		arg = atoi(args[1]);
+
+	if (arg < 1) {
+		memprintf(err, "'%s' expects an integer argument greater than 0.", args[0]);
+		return -1;
+	}
+
+	peers_max_updates_at_once = arg;
+	return 0;
+}
+
+/* config keyword parsers */
+static struct cfg_kw_list cfg_kws = {ILH, {
+	{ CFG_GLOBAL, "tune.peers.max-updates-at-once",  cfg_parse_max_updt_at_once },
+	{ 0, NULL, NULL }
+}};
+
+INITCALL1(STG_REGISTER, cfg_register_keywords, &cfg_kws);
+
 /*
  * CLI keywords.
  */