MEDIUM: peers: handle arrays of std types in peers protocol

This patch adds support of array data_types on the peer protocol.

The table definition message will provide an additionnal parameter
for array data-types: the number of elements of the array.

In case of array of frqp it also provides a second parameter:
the period used to compute freq counter.

The array elements are std_type values linearly encoded in
the update message.

Note: if a remote peer announces an array data_type without
parameters into the table definition message, all updates
on this table will be ignored because we can not
parse update messages consistently.
diff --git a/include/haproxy/peers-t.h b/include/haproxy/peers-t.h
index ee9d905..0c712e5 100644
--- a/include/haproxy/peers-t.h
+++ b/include/haproxy/peers-t.h
@@ -40,6 +40,7 @@
 	int remote_id;
 	int flags;
 	uint64_t remote_data;
+	unsigned int remote_data_nbelem[STKTABLE_DATA_TYPES];
 	unsigned int last_acked;
 	unsigned int last_pushed;
 	unsigned int last_get;
diff --git a/src/peers.c b/src/peers.c
index 87a1284..e14f1b8 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -715,6 +715,67 @@
 
 		data_ptr = stktable_data_ptr(st->table, ts, data_type);
 		if (data_ptr) {
+			/* in case of array all elements use
+			 * the same std_type and they are linearly
+			 * encoded.
+			 */
+			if (stktable_data_types[data_type].is_array) {
+				unsigned int idx = 0;
+
+				switch (stktable_data_types[data_type].std_type) {
+				case STD_T_SINT: {
+					int data;
+
+					do {
+						data = stktable_data_cast(data_ptr, std_t_sint);
+						intencode(data, &cursor);
+
+						data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx);
+					} while(data_ptr);
+					break;
+				}
+				case STD_T_UINT: {
+					unsigned int data;
+
+					do {
+						data = stktable_data_cast(data_ptr, std_t_uint);
+						intencode(data, &cursor);
+
+						data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx);
+					} while(data_ptr);
+					break;
+				}
+				case STD_T_ULL: {
+					unsigned long long data;
+
+					do {
+						data = stktable_data_cast(data_ptr, std_t_ull);
+						intencode(data, &cursor);
+
+						data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx);
+					} while(data_ptr);
+					break;
+				}
+				case STD_T_FRQP: {
+					struct freq_ctr *frqp;
+
+					do {
+						frqp = &stktable_data_cast(data_ptr, std_t_frqp);
+						intencode((unsigned int)(now_ms - frqp->curr_tick), &cursor);
+						intencode(frqp->curr_ctr, &cursor);
+						intencode(frqp->prev_ctr, &cursor);
+
+						data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx);
+					} while(data_ptr);
+					break;
+				}
+				}
+
+				/* array elements fully encoded
+				 * proceed next data_type.
+				 */
+				continue;
+			}
 			switch (stktable_data_types[data_type].std_type) {
 				case STD_T_SINT: {
 					int data;
@@ -854,19 +915,58 @@
 	/* encode available known data types in table */
 	for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
 		if (st->table->data_ofs[data_type]) {
-			switch (stktable_data_types[data_type].std_type) {
-				case STD_T_SINT:
-				case STD_T_UINT:
-				case STD_T_ULL:
-				case STD_T_DICT:
-					data |= 1ULL << data_type;
-					break;
-				case STD_T_FRQP:
-					data |= 1ULL << data_type;
-					intencode(data_type, &chunkq);
+			/* stored data types parameters are all linearly encoded
+			 * at the end of the 'table definition' message.
+			 *
+			 * Currently only array data_types and and data_types
+			 * using freq_counter base type have parameters:
+			 *
+			 * - array has always at least one parameter set to the
+			 *   number of elements.
+			 *
+			 * - array of base-type freq_counters has an additional
+			 *  parameter set to the period used to compute those
+			 *  freq_counters.
+			 *
+			 * - simple freq counter has a parameter set to the period
+			 *   used to compute
+			 *
+			 *  A set of parameter for a datatype MUST BE prefixed
+			 *  by the data-type id itself:
+			 *  This is useless because the data_types are ordered and
+			 *  the data_type bitfield already gives the information of
+			 *  stored types, but it was designed this way when the
+			 *  push of period parameter was added for freq counters
+			 *  and we don't want to break the compatibility.
+			 *
+			 */
+			if (stktable_data_types[data_type].is_array) {
+				/* This is an array type so we first encode
+				 * the data_type itself to prefix parameters
+				 */
+				intencode(data_type, &chunkq);
+
+				/* We encode the first parameter which is
+				 * the number of elements of this array
+				 */
+				intencode(st->table->data_nbelem[data_type], &chunkq);
+
+				/* for array of freq counters, there is an additionnal
+				 * period parameter to encode
+				 */
+				if (stktable_data_types[data_type].std_type == STD_T_FRQP)
 					intencode(st->table->data_arg[data_type].u, &chunkq);
-					break;
 			}
+			else if (stktable_data_types[data_type].std_type == STD_T_FRQP) {
+				/* this datatype is a simple freq counter not part
+				 * of an array. We encode the data_type itself
+				 * to prefix the 'period' parameter
+				 */
+				intencode(data_type, &chunkq);
+				intencode(st->table->data_arg[data_type].u, &chunkq);
+			}
+			/* set the bit corresponding to stored data type */
+			data |= 1ULL << data_type;
 		}
 	}
 	intencode(data, &cursor);
@@ -1670,10 +1770,98 @@
 
 	for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
 		uint64_t decoded_int;
+		unsigned int idx;
 
 		if (!((1ULL << data_type) & st->remote_data))
 			continue;
+		if (stktable_data_types[data_type].is_array) {
+			/* in case of array all elements
+			 * use the same std_type and they
+			 * are linearly encoded.
+			 * The number of elements was provided
+			 * by table definition message
+			 */
+			switch (stktable_data_types[data_type].std_type) {
+			case STD_T_SINT:
+				for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
+					decoded_int = intdecode(msg_cur, msg_end);
+					if (!*msg_cur) {
+						TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+						goto malformed_unlock;
+					}
 
+					data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, idx);
+					if (data_ptr)
+						stktable_data_cast(data_ptr, std_t_sint) = decoded_int;
+				}
+				break;
+			case STD_T_UINT:
+				for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
+					decoded_int = intdecode(msg_cur, msg_end);
+					if (!*msg_cur) {
+						TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+						goto malformed_unlock;
+					}
+
+					data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, idx);
+					if (data_ptr)
+						stktable_data_cast(data_ptr, std_t_uint) = decoded_int;
+				}
+				break;
+			case STD_T_ULL:
+				for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
+					decoded_int = intdecode(msg_cur, msg_end);
+					if (!*msg_cur) {
+						TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+						goto malformed_unlock;
+					}
+
+					data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, idx);
+					if (data_ptr)
+						stktable_data_cast(data_ptr, std_t_ull) = decoded_int;
+				}
+				break;
+			case STD_T_FRQP:
+				for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
+					struct freq_ctr data;
+
+					/* First bit is reserved for the freq_ctr lock
+					 * Note: here we're still protected by the stksess lock
+					 * so we don't need to update the update the freq_ctr
+					 * using its internal lock.
+					 */
+
+					decoded_int = intdecode(msg_cur, msg_end);
+					if (!*msg_cur) {
+						TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+						goto malformed_unlock;
+					}
+
+					data.curr_tick = tick_add(now_ms, -decoded_int) & ~0x1;
+					data.curr_ctr = intdecode(msg_cur, msg_end);
+					if (!*msg_cur) {
+						TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+						goto malformed_unlock;
+					}
+
+					data.prev_ctr = intdecode(msg_cur, msg_end);
+					if (!*msg_cur) {
+						TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+						goto malformed_unlock;
+					}
+
+					data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, idx);
+					if (data_ptr)
+						stktable_data_cast(data_ptr, std_t_frqp) = data;
+				}
+				break;
+			}
+
+			/* array is fully decoded
+			 * proceed next data_type.
+			 */
+			continue;
+		}
 		decoded_int = intdecode(msg_cur, msg_end);
 		if (!*msg_cur) {
 			TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
@@ -1705,7 +1893,8 @@
 			/* First bit is reserved for the freq_ctr lock
 			Note: here we're still protected by the stksess lock
 			so we don't need to update the update the freq_ctr
-			using its internal lock */
+			using its internal lock.
+			*/
 
 			data.curr_tick = tick_add(now_ms, -decoded_int) & ~0x1;
 			data.curr_ctr = intdecode(msg_cur, msg_end);
@@ -1990,6 +2179,103 @@
 		goto ignore_msg;
 	}
 
+	/* Check if there there is the additionnal expire data */
+	intdecode(msg_cur, msg_end);
+	if (*msg_cur) {
+		uint64_t data_type;
+		uint64_t type;
+
+		/* This define contains the expire data so we consider
+		 * it also contain all data_types parameters.
+		 */
+		for (data_type = 0; data_type < STKTABLE_DATA_TYPES; data_type++) {
+			if (table_data & (1ULL << data_type)) {
+				if (stktable_data_types[data_type].is_array) {
+					/* This should be an array
+					 * so we parse the data_type prefix
+					 * because we must have parameters.
+					 */
+					type = intdecode(msg_cur, msg_end);
+					if (!*msg_cur) {
+						p->remote_table = NULL;
+						TRACE_PROTO("missing meta data for array", PEERS_EV_DEFMSG, NULL, p);
+						goto ignore_msg;
+					}
+
+					/* check if the data_type match the current from the bitfield */
+					if (type != data_type) {
+						p->remote_table = NULL;
+						TRACE_PROTO("meta data missmatch type", PEERS_EV_DEFMSG, NULL, p);
+						goto ignore_msg;
+					}
+
+					/* decode the nbelem of the array */
+					p->remote_table->remote_data_nbelem[type] = intdecode(msg_cur, msg_end);
+					if (!*msg_cur) {
+						p->remote_table = NULL;
+						TRACE_PROTO("missing array size meta data for array", PEERS_EV_DEFMSG, NULL, p);
+						goto ignore_msg;
+					}
+
+					/* if it is an array of frqp, we must also have the period to decode */
+					if (stktable_data_types[data_type].std_type == STD_T_FRQP) {
+						intdecode(msg_cur, msg_end);
+						if (!*msg_cur) {
+							p->remote_table = NULL;
+							TRACE_PROTO("missing period for frqp", PEERS_EV_DEFMSG, NULL, p);
+							goto ignore_msg;
+						}
+					}
+				}
+				else if (stktable_data_types[data_type].std_type == STD_T_FRQP) {
+					/* This should be a std freq counter data_type
+					 * so we parse the data_type prefix
+					 * because we must have parameters.
+					 */
+					type = intdecode(msg_cur, msg_end);
+					if (!*msg_cur) {
+						p->remote_table = NULL;
+						TRACE_PROTO("missing meta data for frqp", PEERS_EV_DEFMSG, NULL, p);
+						goto ignore_msg;
+					}
+
+					/* check if the data_type match the current from the bitfield */
+					if (type != data_type) {
+						p->remote_table = NULL;
+						TRACE_PROTO("meta data missmatch type", PEERS_EV_DEFMSG, NULL, p);
+						goto ignore_msg;
+					}
+
+					/* decode the period */
+					intdecode(msg_cur, msg_end);
+					if (!*msg_cur) {
+						p->remote_table = NULL;
+						TRACE_PROTO("missing period for frqp", PEERS_EV_DEFMSG, NULL, p);
+						goto ignore_msg;
+					}
+				}
+			}
+		}
+	}
+	else {
+		uint64_t data_type;
+
+		/* There is not additional data but
+		 * array size parameter is mandatory to parse array
+		 * so we consider an error if an array data_type is define
+		 * but there is no additional data.
+		 */
+		for (data_type = 0; data_type < STKTABLE_DATA_TYPES; data_type++) {
+			if (table_data & (1ULL << data_type)) {
+				if (stktable_data_types[data_type].is_array) {
+					p->remote_table = NULL;
+					TRACE_PROTO("missing array size meta data for array", PEERS_EV_DEFMSG, NULL, p);
+					goto ignore_msg;
+				}
+			}
+		}
+	}
+
 	p->remote_table->remote_data = table_data;
 	p->remote_table->remote_id = table_id;
 	return 1;