MEDIUM: peers: handle arrays of std types in peers protocol
This patch adds support of array data_types on the peer protocol.
The table definition message will provide an additionnal parameter
for array data-types: the number of elements of the array.
In case of array of frqp it also provides a second parameter:
the period used to compute freq counter.
The array elements are std_type values linearly encoded in
the update message.
Note: if a remote peer announces an array data_type without
parameters into the table definition message, all updates
on this table will be ignored because we can not
parse update messages consistently.
diff --git a/include/haproxy/peers-t.h b/include/haproxy/peers-t.h
index ee9d905..0c712e5 100644
--- a/include/haproxy/peers-t.h
+++ b/include/haproxy/peers-t.h
@@ -40,6 +40,7 @@
int remote_id;
int flags;
uint64_t remote_data;
+ unsigned int remote_data_nbelem[STKTABLE_DATA_TYPES];
unsigned int last_acked;
unsigned int last_pushed;
unsigned int last_get;
diff --git a/src/peers.c b/src/peers.c
index 87a1284..e14f1b8 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -715,6 +715,67 @@
data_ptr = stktable_data_ptr(st->table, ts, data_type);
if (data_ptr) {
+ /* in case of array all elements use
+ * the same std_type and they are linearly
+ * encoded.
+ */
+ if (stktable_data_types[data_type].is_array) {
+ unsigned int idx = 0;
+
+ switch (stktable_data_types[data_type].std_type) {
+ case STD_T_SINT: {
+ int data;
+
+ do {
+ data = stktable_data_cast(data_ptr, std_t_sint);
+ intencode(data, &cursor);
+
+ data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx);
+ } while(data_ptr);
+ break;
+ }
+ case STD_T_UINT: {
+ unsigned int data;
+
+ do {
+ data = stktable_data_cast(data_ptr, std_t_uint);
+ intencode(data, &cursor);
+
+ data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx);
+ } while(data_ptr);
+ break;
+ }
+ case STD_T_ULL: {
+ unsigned long long data;
+
+ do {
+ data = stktable_data_cast(data_ptr, std_t_ull);
+ intencode(data, &cursor);
+
+ data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx);
+ } while(data_ptr);
+ break;
+ }
+ case STD_T_FRQP: {
+ struct freq_ctr *frqp;
+
+ do {
+ frqp = &stktable_data_cast(data_ptr, std_t_frqp);
+ intencode((unsigned int)(now_ms - frqp->curr_tick), &cursor);
+ intencode(frqp->curr_ctr, &cursor);
+ intencode(frqp->prev_ctr, &cursor);
+
+ data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx);
+ } while(data_ptr);
+ break;
+ }
+ }
+
+ /* array elements fully encoded
+ * proceed next data_type.
+ */
+ continue;
+ }
switch (stktable_data_types[data_type].std_type) {
case STD_T_SINT: {
int data;
@@ -854,19 +915,58 @@
/* encode available known data types in table */
for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
if (st->table->data_ofs[data_type]) {
- switch (stktable_data_types[data_type].std_type) {
- case STD_T_SINT:
- case STD_T_UINT:
- case STD_T_ULL:
- case STD_T_DICT:
- data |= 1ULL << data_type;
- break;
- case STD_T_FRQP:
- data |= 1ULL << data_type;
- intencode(data_type, &chunkq);
+ /* stored data types parameters are all linearly encoded
+ * at the end of the 'table definition' message.
+ *
+ * Currently only array data_types and and data_types
+ * using freq_counter base type have parameters:
+ *
+ * - array has always at least one parameter set to the
+ * number of elements.
+ *
+ * - array of base-type freq_counters has an additional
+ * parameter set to the period used to compute those
+ * freq_counters.
+ *
+ * - simple freq counter has a parameter set to the period
+ * used to compute
+ *
+ * A set of parameter for a datatype MUST BE prefixed
+ * by the data-type id itself:
+ * This is useless because the data_types are ordered and
+ * the data_type bitfield already gives the information of
+ * stored types, but it was designed this way when the
+ * push of period parameter was added for freq counters
+ * and we don't want to break the compatibility.
+ *
+ */
+ if (stktable_data_types[data_type].is_array) {
+ /* This is an array type so we first encode
+ * the data_type itself to prefix parameters
+ */
+ intencode(data_type, &chunkq);
+
+ /* We encode the first parameter which is
+ * the number of elements of this array
+ */
+ intencode(st->table->data_nbelem[data_type], &chunkq);
+
+ /* for array of freq counters, there is an additionnal
+ * period parameter to encode
+ */
+ if (stktable_data_types[data_type].std_type == STD_T_FRQP)
intencode(st->table->data_arg[data_type].u, &chunkq);
- break;
}
+ else if (stktable_data_types[data_type].std_type == STD_T_FRQP) {
+ /* this datatype is a simple freq counter not part
+ * of an array. We encode the data_type itself
+ * to prefix the 'period' parameter
+ */
+ intencode(data_type, &chunkq);
+ intencode(st->table->data_arg[data_type].u, &chunkq);
+ }
+ /* set the bit corresponding to stored data type */
+ data |= 1ULL << data_type;
}
}
intencode(data, &cursor);
@@ -1670,10 +1770,98 @@
for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
uint64_t decoded_int;
+ unsigned int idx;
if (!((1ULL << data_type) & st->remote_data))
continue;
+ if (stktable_data_types[data_type].is_array) {
+ /* in case of array all elements
+ * use the same std_type and they
+ * are linearly encoded.
+ * The number of elements was provided
+ * by table definition message
+ */
+ switch (stktable_data_types[data_type].std_type) {
+ case STD_T_SINT:
+ for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
+ decoded_int = intdecode(msg_cur, msg_end);
+ if (!*msg_cur) {
+ TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+ goto malformed_unlock;
+ }
+ data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, idx);
+ if (data_ptr)
+ stktable_data_cast(data_ptr, std_t_sint) = decoded_int;
+ }
+ break;
+ case STD_T_UINT:
+ for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
+ decoded_int = intdecode(msg_cur, msg_end);
+ if (!*msg_cur) {
+ TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+ goto malformed_unlock;
+ }
+
+ data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, idx);
+ if (data_ptr)
+ stktable_data_cast(data_ptr, std_t_uint) = decoded_int;
+ }
+ break;
+ case STD_T_ULL:
+ for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
+ decoded_int = intdecode(msg_cur, msg_end);
+ if (!*msg_cur) {
+ TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+ goto malformed_unlock;
+ }
+
+ data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, idx);
+ if (data_ptr)
+ stktable_data_cast(data_ptr, std_t_ull) = decoded_int;
+ }
+ break;
+ case STD_T_FRQP:
+ for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
+ struct freq_ctr data;
+
+ /* First bit is reserved for the freq_ctr lock
+ * Note: here we're still protected by the stksess lock
+ * so we don't need to update the update the freq_ctr
+ * using its internal lock.
+ */
+
+ decoded_int = intdecode(msg_cur, msg_end);
+ if (!*msg_cur) {
+ TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+ goto malformed_unlock;
+ }
+
+ data.curr_tick = tick_add(now_ms, -decoded_int) & ~0x1;
+ data.curr_ctr = intdecode(msg_cur, msg_end);
+ if (!*msg_cur) {
+ TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+ goto malformed_unlock;
+ }
+
+ data.prev_ctr = intdecode(msg_cur, msg_end);
+ if (!*msg_cur) {
+ TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
+ goto malformed_unlock;
+ }
+
+ data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, idx);
+ if (data_ptr)
+ stktable_data_cast(data_ptr, std_t_frqp) = data;
+ }
+ break;
+ }
+
+ /* array is fully decoded
+ * proceed next data_type.
+ */
+ continue;
+ }
decoded_int = intdecode(msg_cur, msg_end);
if (!*msg_cur) {
TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p);
@@ -1705,7 +1893,8 @@
/* First bit is reserved for the freq_ctr lock
Note: here we're still protected by the stksess lock
so we don't need to update the update the freq_ctr
- using its internal lock */
+ using its internal lock.
+ */
data.curr_tick = tick_add(now_ms, -decoded_int) & ~0x1;
data.curr_ctr = intdecode(msg_cur, msg_end);
@@ -1990,6 +2179,103 @@
goto ignore_msg;
}
+ /* Check if there there is the additionnal expire data */
+ intdecode(msg_cur, msg_end);
+ if (*msg_cur) {
+ uint64_t data_type;
+ uint64_t type;
+
+ /* This define contains the expire data so we consider
+ * it also contain all data_types parameters.
+ */
+ for (data_type = 0; data_type < STKTABLE_DATA_TYPES; data_type++) {
+ if (table_data & (1ULL << data_type)) {
+ if (stktable_data_types[data_type].is_array) {
+ /* This should be an array
+ * so we parse the data_type prefix
+ * because we must have parameters.
+ */
+ type = intdecode(msg_cur, msg_end);
+ if (!*msg_cur) {
+ p->remote_table = NULL;
+ TRACE_PROTO("missing meta data for array", PEERS_EV_DEFMSG, NULL, p);
+ goto ignore_msg;
+ }
+
+ /* check if the data_type match the current from the bitfield */
+ if (type != data_type) {
+ p->remote_table = NULL;
+ TRACE_PROTO("meta data missmatch type", PEERS_EV_DEFMSG, NULL, p);
+ goto ignore_msg;
+ }
+
+ /* decode the nbelem of the array */
+ p->remote_table->remote_data_nbelem[type] = intdecode(msg_cur, msg_end);
+ if (!*msg_cur) {
+ p->remote_table = NULL;
+ TRACE_PROTO("missing array size meta data for array", PEERS_EV_DEFMSG, NULL, p);
+ goto ignore_msg;
+ }
+
+ /* if it is an array of frqp, we must also have the period to decode */
+ if (stktable_data_types[data_type].std_type == STD_T_FRQP) {
+ intdecode(msg_cur, msg_end);
+ if (!*msg_cur) {
+ p->remote_table = NULL;
+ TRACE_PROTO("missing period for frqp", PEERS_EV_DEFMSG, NULL, p);
+ goto ignore_msg;
+ }
+ }
+ }
+ else if (stktable_data_types[data_type].std_type == STD_T_FRQP) {
+ /* This should be a std freq counter data_type
+ * so we parse the data_type prefix
+ * because we must have parameters.
+ */
+ type = intdecode(msg_cur, msg_end);
+ if (!*msg_cur) {
+ p->remote_table = NULL;
+ TRACE_PROTO("missing meta data for frqp", PEERS_EV_DEFMSG, NULL, p);
+ goto ignore_msg;
+ }
+
+ /* check if the data_type match the current from the bitfield */
+ if (type != data_type) {
+ p->remote_table = NULL;
+ TRACE_PROTO("meta data missmatch type", PEERS_EV_DEFMSG, NULL, p);
+ goto ignore_msg;
+ }
+
+ /* decode the period */
+ intdecode(msg_cur, msg_end);
+ if (!*msg_cur) {
+ p->remote_table = NULL;
+ TRACE_PROTO("missing period for frqp", PEERS_EV_DEFMSG, NULL, p);
+ goto ignore_msg;
+ }
+ }
+ }
+ }
+ }
+ else {
+ uint64_t data_type;
+
+ /* There is not additional data but
+ * array size parameter is mandatory to parse array
+ * so we consider an error if an array data_type is define
+ * but there is no additional data.
+ */
+ for (data_type = 0; data_type < STKTABLE_DATA_TYPES; data_type++) {
+ if (table_data & (1ULL << data_type)) {
+ if (stktable_data_types[data_type].is_array) {
+ p->remote_table = NULL;
+ TRACE_PROTO("missing array size meta data for array", PEERS_EV_DEFMSG, NULL, p);
+ goto ignore_msg;
+ }
+ }
+ }
+ }
+
p->remote_table->remote_data = table_data;
p->remote_table->remote_id = table_id;
return 1;