MEDIUM: peers: support any stick-table data type for sync

It is now possible to propagate stick-table entries of any data type between
several haproxy instances over TCP connections in a multi-master fashion. Each
instance pushes its local updates and insertions to remote peers. The pushed
values overwrite remote ones without aggregation. Interrupted exchanges are
automatically detected and resumed from the last known point.
diff --git a/src/peers.c b/src/peers.c
index d05d653..d447a92 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -242,6 +242,8 @@
 	uint32_t netinteger;
 	unsigned short datalen;
 	char *cursor, *datamsg;
+	unsigned int data_type;
+	void *data_ptr;
 
 	cursor = datamsg = msg + 1 + 5;
 
@@ -278,11 +280,43 @@
 	}
 
 	/* encode values */
-	if (stktable_data_ptr(st->table, ts, STKTABLE_DT_SERVER_ID)) {
-		int srvid;
+	for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
 
-		srvid = stktable_data_cast(stktable_data_ptr(st->table, ts, STKTABLE_DT_SERVER_ID), server_id);
-		intencode(srvid, &cursor);
+		data_ptr = stktable_data_ptr(st->table, ts, data_type);
+		if (data_ptr) {
+			switch (stktable_data_types[data_type].std_type) {
+				case STD_T_SINT: {
+					int data;
+
+					data = stktable_data_cast(data_ptr, std_t_sint);
+					intencode(data, &cursor);
+					break;
+				}
+				case STD_T_UINT: {
+					unsigned int data;
+
+					data = stktable_data_cast(data_ptr, std_t_uint);
+					intencode(data, &cursor);
+					break;
+				}
+				case STD_T_ULL: {
+					unsigned long long data;
+
+					data = stktable_data_cast(data_ptr, std_t_ull);
+					intencode(data, &cursor);
+					break;
+				}
+				case STD_T_FRQP: {
+					struct freq_ctr_period *frqp;
+
+					frqp = &stktable_data_cast(data_ptr, std_t_frqp);
+					intencode((unsigned int)(now_ms - frqp->curr_tick), &cursor);
+					intencode(frqp->curr_ctr, &cursor);
+					intencode(frqp->prev_ctr, &cursor);
+					break;
+				}
+			}
+		}
 	}
 
 	/* Compute datalen */
@@ -317,6 +351,7 @@
 	unsigned short datalen;
 	char *cursor, *datamsg;
 	uint64_t data = 0;
+	unsigned int data_type;
 
 	cursor = datamsg = msg + 2 + 5;
 
@@ -338,9 +373,18 @@
 	/* encode table key size */
 	intencode(st->table->key_size, &cursor);
 
-	/* encode available data types in table */
-	if (st->table->data_ofs[STKTABLE_DT_SERVER_ID]) {
-		data |= 1 << STKTABLE_DT_SERVER_ID;
+	/* encode available known data types in table */
+	for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
+		if (st->table->data_ofs[data_type]) {
+			switch (stktable_data_types[data_type].std_type) {
+				case STD_T_SINT:
+				case STD_T_UINT:
+				case STD_T_ULL:
+				case STD_T_FRQP:
+					data |= 1 << data_type;
+					break;
+			}
+		}
 	}
 	intencode(data, &cursor);
 
@@ -959,6 +1003,8 @@
 						 || msg_head[1] == PEER_MSG_STKT_INCUPDATE) {
 						struct shared_table *st = curpeer->remote_table;
 						uint32_t update;
+						unsigned int data_type;
+						void *data_ptr;
 
 						/* Here we have data message */
 						if (!st)
@@ -1052,18 +1098,83 @@
 							}
 						}
 
-						if ((1 << STKTABLE_DT_SERVER_ID) & st->remote_data) {
-							int srvid;
+						for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
 
-							srvid = intdecode(&msg_cur, msg_end);
-							if (!msg_cur) {
-								/* malformed message */
-								appctx->st0 = PEER_SESS_ST_ERRPROTO;
-								goto switchstate;
-							}
+							if ((1 << data_type) & st->remote_data) {
+								switch (stktable_data_types[data_type].std_type) {
+									case STD_T_SINT: {
+										int data;
+
+										data = intdecode(&msg_cur, msg_end);
+										if (!msg_cur) {
+											/* malformed message */
+											appctx->st0 = PEER_SESS_ST_ERRPROTO;
+											goto switchstate;
+										}
+
+										data_ptr = stktable_data_ptr(st->table, ts, data_type);
+										if (data_ptr)
+											stktable_data_cast(data_ptr, std_t_sint) = data;
+										break;
+									}
+									case STD_T_UINT: {
+										unsigned int data;
+
+										data = intdecode(&msg_cur, msg_end);
+										if (!msg_cur) {
+											/* malformed message */
+											appctx->st0 = PEER_SESS_ST_ERRPROTO;
+											goto switchstate;
+										}
+
+										data_ptr = stktable_data_ptr(st->table, ts, data_type);
+										if (data_ptr)
+											stktable_data_cast(data_ptr, std_t_uint) = data;
+										break;
+									}
+									case STD_T_ULL: {
+										unsigned long long  data;
+
+										data = intdecode(&msg_cur, msg_end);
+										if (!msg_cur) {
+											/* malformed message */
+											appctx->st0 = PEER_SESS_ST_ERRPROTO;
+											goto switchstate;
+										}
+
+										data_ptr = stktable_data_ptr(st->table, ts, data_type);
+										if (data_ptr)
+											stktable_data_cast(data_ptr, std_t_ull) = data;
+										break;
+									}
+									case STD_T_FRQP: {
+										struct freq_ctr_period data;
+
+										data.curr_tick = tick_add(now_ms, intdecode(&msg_cur, msg_end));
+										if (!msg_cur) {
+											/* malformed message */
+											appctx->st0 = PEER_SESS_ST_ERRPROTO;
+											goto switchstate;
+										}
+										data.curr_ctr = intdecode(&msg_cur, msg_end);
+										if (!msg_cur) {
+											/* malformed message */
+											appctx->st0 = PEER_SESS_ST_ERRPROTO;
+											goto switchstate;
+										}
+										data.prev_ctr = intdecode(&msg_cur, msg_end);
+										if (!msg_cur) {
+											/* malformed message */
+											appctx->st0 = PEER_SESS_ST_ERRPROTO;
+											goto switchstate;
+										}
 
-							if (stktable_data_ptr(st->table, ts, STKTABLE_DT_SERVER_ID)) {
-								stktable_data_cast(stktable_data_ptr(st->table, ts, STKTABLE_DT_SERVER_ID), server_id) = srvid;
+										data_ptr = stktable_data_ptr(st->table, ts, data_type);
+										if (data_ptr)
+											stktable_data_cast(data_ptr, std_t_frqp) = data;
+										break;
+									}
+								}
 							}
 						}
 					}