MINOR: peers: Move update receive code to reduce the size of the I/O handler.
This patch implements a new function to handle the stick-table
update messages in order to reduce the size of the peer I/O handler
by ~200 lines.
May be backported as far as 1.5.
diff --git a/src/peers.c b/src/peers.c
index 6e211e1..2fb51d7 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -858,6 +858,232 @@
return peer_send_teachmsgs(appctx, p, peer_teach_stage2_stksess_lookup, st, 0);
}
+
+/*
+ * Parse a stick-table update message received by peer <p>. <msg_cur> is the address
+ * of the pointer to the current position in the receipt buffer and <msg_end> the end
+ * of the message; *<msg_cur> is advanced as fields are parsed.
+ * <updt> must be set for PEER_MSG_STKT_UPDATE or PEER_MSG_STKT_UPDATE_TIMED messages,
+ * which carry a stick-table update ID; <msg_len> is checked against that ID's size.
+ * <exp> must be set if the message carries an expiration value.
+ * <totl> is the length of the message computed upon receipt; that many bytes are
+ * skipped with co_skip() when the message is ignored.
+ * Returns 1 on success. Returns 0 if the message was ignored (no remote table, or no
+ * new entry) or malformed (appctx->st0 is then set to PEER_SESS_ST_ERRPROTO).
+ */
+static int peer_recv_updatemsg(struct appctx *appctx, struct peer *p, int updt, int exp,
+ char **msg_cur, char *msg_end, int msg_len, int totl)
+{
+ struct stream_interface *si = appctx->owner;
+ struct shared_table *st = p->remote_table;
+ struct stksess *ts, *newts;
+ uint32_t update;
+ int expire;
+ unsigned int data_type;
+ void *data_ptr;
+
+ /* Data message: ignore it if no remote table is set for this peer */
+ if (!st)
+ goto ignore_msg;
+
+ expire = MS_TO_TICKS(st->table->expire); /* default, overridden below when <exp> is set */
+
+ if (updt) {
+ if (msg_len < sizeof(update)) {
+ /* malformed message */
+ appctx->st0 = PEER_SESS_ST_ERRPROTO;
+ return 0;
+ }
+ memcpy(&update, *msg_cur, sizeof(update));
+ *msg_cur += sizeof(update);
+ st->last_get = htonl(update); /* update ID taken from the message */
+ }
+ else {
+ st->last_get++; /* no ID in the message: use the next one */
+ }
+
+ if (exp) {
+ size_t expire_sz = sizeof expire;
+
+ if (*msg_cur + expire_sz > msg_end) { /* malformed message */
+ appctx->st0 = PEER_SESS_ST_ERRPROTO;
+ return 0;
+ }
+ memcpy(&expire, *msg_cur, expire_sz);
+ *msg_cur += expire_sz;
+ expire = ntohl(expire);
+ }
+
+ newts = stksess_new(st->table, NULL);
+ if (!newts) /* could not get a new entry: ignore the message */
+ goto ignore_msg;
+
+ if (st->table->type == SMP_T_STR) {
+ unsigned int to_read, to_store;
+
+ to_read = intdecode(msg_cur, msg_end);
+ if (!*msg_cur) {
+ /* malformed message */
+ stksess_free(st->table, newts);
+ appctx->st0 = PEER_SESS_ST_ERRPROTO;
+ return 0;
+ }
+ to_store = MIN(to_read, st->table->key_size - 1); /* keep room for the trailing 0 */
+ if (*msg_cur + to_store > msg_end) {
+ /* malformed message */
+ stksess_free(st->table, newts);
+ appctx->st0 = PEER_SESS_ST_ERRPROTO;
+ return 0;
+ }
+
+ memcpy(newts->key.key, *msg_cur, to_store);
+ newts->key.key[to_store] = 0;
+ *msg_cur += to_read; /* skip the full key as sent, even if it was truncated */
+ }
+ else if (st->table->type == SMP_T_SINT) {
+ unsigned int netinteger;
+
+ if (*msg_cur + sizeof(netinteger) > msg_end) {
+ /* malformed message */
+ stksess_free(st->table, newts);
+ appctx->st0 = PEER_SESS_ST_ERRPROTO;
+ return 0;
+ }
+ memcpy(&netinteger, *msg_cur, sizeof(netinteger));
+ netinteger = ntohl(netinteger); /* key is converted with ntohl() before storage */
+ memcpy(newts->key.key, &netinteger, sizeof(netinteger));
+ *msg_cur += sizeof(netinteger);
+ }
+ else {
+ if (*msg_cur + st->table->key_size > msg_end) {
+ /* malformed message */
+ stksess_free(st->table, newts);
+ appctx->st0 = PEER_SESS_ST_ERRPROTO;
+ return 0;
+ }
+ memcpy(newts->key.key, *msg_cur, st->table->key_size);
+ *msg_cur += st->table->key_size;
+ }
+
+ /* look up an existing entry; free ours if another one is returned */
+ ts = stktable_set_entry(st->table, newts);
+ if (ts != newts) {
+ stksess_free(st->table, newts);
+ newts = NULL;
+ }
+
+ HA_RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
+
+ for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
+
+ if (!((1 << data_type) & st->remote_data)) /* only decode types flagged in remote_data */
+ continue;
+
+ switch (stktable_data_types[data_type].std_type) {
+ case STD_T_SINT: {
+ int data;
+
+ data = intdecode(msg_cur, msg_end);
+ if (!*msg_cur) {
+ /* malformed message */
+ HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
+ stktable_touch_remote(st->table, ts, 1);
+ appctx->st0 = PEER_SESS_ST_ERRPROTO;
+ return 0;
+ }
+
+ data_ptr = stktable_data_ptr(st->table, ts, data_type);
+ if (data_ptr)
+ stktable_data_cast(data_ptr, std_t_sint) = data;
+ break;
+ }
+ case STD_T_UINT: {
+ unsigned int data;
+
+ data = intdecode(msg_cur, msg_end);
+ if (!*msg_cur) {
+ /* malformed message */
+ HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
+ stktable_touch_remote(st->table, ts, 1);
+ appctx->st0 = PEER_SESS_ST_ERRPROTO;
+ return 0;
+ }
+
+ data_ptr = stktable_data_ptr(st->table, ts, data_type);
+ if (data_ptr)
+ stktable_data_cast(data_ptr, std_t_uint) = data;
+ break;
+ }
+ case STD_T_ULL: {
+ unsigned long long data;
+
+ data = intdecode(msg_cur, msg_end);
+ if (!*msg_cur) {
+ /* malformed message */
+ HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
+ stktable_touch_remote(st->table, ts, 1);
+ appctx->st0 = PEER_SESS_ST_ERRPROTO;
+ return 0;
+ }
+
+ data_ptr = stktable_data_ptr(st->table, ts, data_type);
+ if (data_ptr)
+ stktable_data_cast(data_ptr, std_t_ull) = data;
+ break;
+ }
+ case STD_T_FRQP: {
+ struct freq_ctr_period data;
+
+ /* First bit is reserved for the freq_ctr_period lock
+ Note: here we're still protected by the stksess lock
+ so we don't need to update the freq_ctr_period
+ using its internal lock */
+
+ data.curr_tick = tick_add(now_ms, -intdecode(msg_cur, msg_end)) & ~0x1;
+ if (!*msg_cur) {
+ /* malformed message */
+ HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
+ stktable_touch_remote(st->table, ts, 1);
+ appctx->st0 = PEER_SESS_ST_ERRPROTO;
+ return 0;
+ }
+ data.curr_ctr = intdecode(msg_cur, msg_end);
+ if (!*msg_cur) {
+ /* malformed message */
+ HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
+ stktable_touch_remote(st->table, ts, 1);
+ appctx->st0 = PEER_SESS_ST_ERRPROTO;
+ return 0;
+ }
+ data.prev_ctr = intdecode(msg_cur, msg_end);
+ if (!*msg_cur) {
+ /* malformed message */
+ HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
+ stktable_touch_remote(st->table, ts, 1);
+ appctx->st0 = PEER_SESS_ST_ERRPROTO;
+ return 0;
+ }
+
+ data_ptr = stktable_data_ptr(st->table, ts, data_type);
+ if (data_ptr)
+ stktable_data_cast(data_ptr, std_t_frqp) = data;
+ break;
+ }
+ }
+ }
+ /* Force new expiration */
+ ts->expire = tick_add(now_ms, expire);
+
+ HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
+ stktable_touch_remote(st->table, ts, 1);
+ return 1;
+
+ ignore_msg:
+ /* skip consumed message */
+ co_skip(si_oc(si), totl);
+ return 0;
+}
+
/*
* IO Handler to handle message exchance with a peer
*/
@@ -1172,7 +1398,6 @@
/* fall through */
}
case PEER_SESS_ST_WAITMSG: {
- struct stksess *ts, *newts = NULL;
uint32_t msg_len = 0;
char *msg_cur = trash.area;
char *msg_end = trash.area;
@@ -1420,208 +1645,13 @@
|| msg_head[1] == PEER_MSG_STKT_INCUPDATE
|| msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED
|| msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) {
- struct shared_table *st = curpeer->remote_table;
- uint32_t update;
- int expire;
- unsigned int data_type;
- void *data_ptr;
-
- /* Here we have data message */
- if (!st)
- goto ignore_msg;
+ int update, expire;
- expire = MS_TO_TICKS(st->table->expire);
-
- if (msg_head[1] == PEER_MSG_STKT_UPDATE ||
- msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED) {
- if (msg_len < sizeof(update)) {
- /* malformed message */
- appctx->st0 = PEER_SESS_ST_ERRPROTO;
- goto switchstate;
- }
- memcpy(&update, msg_cur, sizeof(update));
- msg_cur += sizeof(update);
- st->last_get = htonl(update);
- }
- else {
- st->last_get++;
- }
-
- if (msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED ||
- msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) {
- size_t expire_sz = sizeof expire;
-
- if (msg_cur + expire_sz > msg_end) {
- appctx->st0 = PEER_SESS_ST_ERRPROTO;
- goto switchstate;
- }
- memcpy(&expire, msg_cur, expire_sz);
- msg_cur += expire_sz;
- expire = ntohl(expire);
- }
-
- newts = stksess_new(st->table, NULL);
- if (!newts)
- goto ignore_msg;
-
- if (st->table->type == SMP_T_STR) {
- unsigned int to_read, to_store;
-
- to_read = intdecode(&msg_cur, msg_end);
- if (!msg_cur) {
- /* malformed message */
- stksess_free(st->table, newts);
- appctx->st0 = PEER_SESS_ST_ERRPROTO;
- goto switchstate;
- }
- to_store = MIN(to_read, st->table->key_size - 1);
- if (msg_cur + to_store > msg_end) {
- /* malformed message */
- stksess_free(st->table, newts);
- appctx->st0 = PEER_SESS_ST_ERRPROTO;
- goto switchstate;
- }
-
- memcpy(newts->key.key, msg_cur, to_store);
- newts->key.key[to_store] = 0;
- msg_cur += to_read;
- }
- else if (st->table->type == SMP_T_SINT) {
- unsigned int netinteger;
-
- if (msg_cur + sizeof(netinteger) > msg_end) {
- /* malformed message */
- stksess_free(st->table, newts);
- appctx->st0 = PEER_SESS_ST_ERRPROTO;
- goto switchstate;
- }
- memcpy(&netinteger, msg_cur, sizeof(netinteger));
- netinteger = ntohl(netinteger);
- memcpy(newts->key.key, &netinteger, sizeof(netinteger));
- msg_cur += sizeof(netinteger);
- }
- else {
- if (msg_cur + st->table->key_size > msg_end) {
- /* malformed message */
- stksess_free(st->table, newts);
- appctx->st0 = PEER_SESS_ST_ERRPROTO;
- goto switchstate;
- }
- memcpy(newts->key.key, msg_cur, st->table->key_size);
- msg_cur += st->table->key_size;
- }
-
- /* lookup for existing entry */
- ts = stktable_set_entry(st->table, newts);
- if (ts != newts) {
- stksess_free(st->table, newts);
- newts = NULL;
- }
-
- HA_RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
-
- for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
-
- if ((1 << data_type) & st->remote_data) {
- switch (stktable_data_types[data_type].std_type) {
- case STD_T_SINT: {
- int data;
-
- data = intdecode(&msg_cur, msg_end);
- if (!msg_cur) {
- /* malformed message */
- HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
- stktable_touch_remote(st->table, ts, 1);
- appctx->st0 = PEER_SESS_ST_ERRPROTO;
- goto switchstate;
- }
-
- data_ptr = stktable_data_ptr(st->table, ts, data_type);
- if (data_ptr)
- stktable_data_cast(data_ptr, std_t_sint) = data;
- break;
- }
- case STD_T_UINT: {
- unsigned int data;
-
- data = intdecode(&msg_cur, msg_end);
- if (!msg_cur) {
- /* malformed message */
- HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
- stktable_touch_remote(st->table, ts, 1);
- appctx->st0 = PEER_SESS_ST_ERRPROTO;
- goto switchstate;
- }
-
- data_ptr = stktable_data_ptr(st->table, ts, data_type);
- if (data_ptr)
- stktable_data_cast(data_ptr, std_t_uint) = data;
- break;
- }
- case STD_T_ULL: {
- unsigned long long data;
-
- data = intdecode(&msg_cur, msg_end);
- if (!msg_cur) {
- /* malformed message */
- HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
- stktable_touch_remote(st->table, ts, 1);
- appctx->st0 = PEER_SESS_ST_ERRPROTO;
- goto switchstate;
- }
-
- data_ptr = stktable_data_ptr(st->table, ts, data_type);
- if (data_ptr)
- stktable_data_cast(data_ptr, std_t_ull) = data;
- break;
- }
- case STD_T_FRQP: {
- struct freq_ctr_period data;
-
- /* First bit is reserved for the freq_ctr_period lock
- Note: here we're still protected by the stksess lock
- so we don't need to update the update the freq_ctr_period
- using its internal lock */
-
- data.curr_tick = tick_add(now_ms, -intdecode(&msg_cur, msg_end)) & ~0x1;
- if (!msg_cur) {
- /* malformed message */
- HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
- stktable_touch_remote(st->table, ts, 1);
- appctx->st0 = PEER_SESS_ST_ERRPROTO;
- goto switchstate;
- }
- data.curr_ctr = intdecode(&msg_cur, msg_end);
- if (!msg_cur) {
- /* malformed message */
- HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
- stktable_touch_remote(st->table, ts, 1);
- appctx->st0 = PEER_SESS_ST_ERRPROTO;
- goto switchstate;
- }
- data.prev_ctr = intdecode(&msg_cur, msg_end);
- if (!msg_cur) {
- /* malformed message */
- HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
- stktable_touch_remote(st->table, ts, 1);
- appctx->st0 = PEER_SESS_ST_ERRPROTO;
- goto switchstate;
- }
-
- data_ptr = stktable_data_ptr(st->table, ts, data_type);
- if (data_ptr)
- stktable_data_cast(data_ptr, std_t_frqp) = data;
- break;
- }
- }
- }
- }
-
- /* Force new expiration */
- ts->expire = tick_add(now_ms, expire);
-
- HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
- stktable_touch_remote(st->table, ts, 1);
+ update = msg_head[1] == PEER_MSG_STKT_UPDATE || msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED;
+ expire = msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED || msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED;
+ if (!peer_recv_updatemsg(appctx, curpeer, update, expire,
+ &msg_cur, msg_end, msg_len, totl))
+ goto switchstate;
}
else if (msg_head[1] == PEER_MSG_STKT_ACK) {