MINOR: peers: Move update receive code to reduce the size of the I/O handler.

This patch implements a new function to treat the stick-table
update messages so that to reduce the size of the peer I/O handler
by ~200 lines.

May be backported as far as 1.5.
diff --git a/src/peers.c b/src/peers.c
index 6e211e1..2fb51d7 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -858,6 +858,232 @@
 	return peer_send_teachmsgs(appctx, p, peer_teach_stage2_stksess_lookup, st, 0);
 }
 
+
+/*
+ * Function used to parse a stick-table update message after it has been received
+ * by <p> peer with <msg_cur> as address of the pointer to the position in the
+ * receipt buffer with <msg_end> being position of the end of the stick-table message.
+ * Update <msg_curr> accordingly to the peer protocol specs if no peer protocol error
+ * was encountered.
+ * <exp> must be set if the stick-table entry expires.
+ * <updt> must be set for  PEER_MSG_STKT_UPDATE or PEER_MSG_STKT_UPDATE_TIMED stick-table
+ * messages, in this case the stick-table udpate message is received with a stick-table
+ * update ID.
+ * <totl> is the length of the stick-table update message computed upon receipt.
+ */
+static int peer_recv_updatemsg(struct appctx *appctx, struct peer *p, int updt, int exp,
+                               char **msg_cur, char *msg_end, int msg_len, int totl)
+{
+	struct stream_interface *si = appctx->owner;
+	struct shared_table *st = p->remote_table;
+	struct stksess *ts, *newts;
+	uint32_t update;
+	int expire;
+	unsigned int data_type;
+	void *data_ptr;
+
+	/* Here we have data message */
+	if (!st)
+		goto ignore_msg;
+
+	expire = MS_TO_TICKS(st->table->expire);
+
+	if (updt) {
+		if (msg_len < sizeof(update)) {
+			/* malformed message */
+			appctx->st0 = PEER_SESS_ST_ERRPROTO;
+			return 0;
+		}
+		memcpy(&update, *msg_cur, sizeof(update));
+		*msg_cur += sizeof(update);
+		st->last_get = htonl(update);
+	}
+	else {
+		st->last_get++;
+	}
+
+	if (exp) {
+		size_t expire_sz = sizeof expire;
+
+		if (*msg_cur + expire_sz > msg_end) {
+			appctx->st0 = PEER_SESS_ST_ERRPROTO;
+			return 0;
+		}
+		memcpy(&expire, *msg_cur, expire_sz);
+		*msg_cur += expire_sz;
+		expire = ntohl(expire);
+	}
+
+	newts = stksess_new(st->table, NULL);
+	if (!newts)
+		goto ignore_msg;
+
+	if (st->table->type == SMP_T_STR) {
+		unsigned int to_read, to_store;
+
+		to_read = intdecode(msg_cur, msg_end);
+		if (!*msg_cur) {
+			/* malformed message */
+			stksess_free(st->table, newts);
+			appctx->st0 = PEER_SESS_ST_ERRPROTO;
+			return 0;
+		}
+		to_store = MIN(to_read, st->table->key_size - 1);
+		if (*msg_cur + to_store > msg_end) {
+			/* malformed message */
+			stksess_free(st->table, newts);
+			appctx->st0 = PEER_SESS_ST_ERRPROTO;
+			return 0;
+		}
+
+		memcpy(newts->key.key, *msg_cur, to_store);
+		newts->key.key[to_store] = 0;
+		*msg_cur += to_read;
+	}
+	else if (st->table->type == SMP_T_SINT) {
+		unsigned int netinteger;
+
+		if (*msg_cur + sizeof(netinteger) > msg_end) {
+			/* malformed message */
+			stksess_free(st->table, newts);
+			appctx->st0 = PEER_SESS_ST_ERRPROTO;
+			return 0;
+		}
+		memcpy(&netinteger, *msg_cur, sizeof(netinteger));
+		netinteger = ntohl(netinteger);
+		memcpy(newts->key.key, &netinteger, sizeof(netinteger));
+		*msg_cur += sizeof(netinteger);
+	}
+	else {
+		if (*msg_cur + st->table->key_size > msg_end) {
+			/* malformed message */
+			stksess_free(st->table, newts);
+			appctx->st0 = PEER_SESS_ST_ERRPROTO;
+			return 0;
+		}
+		memcpy(newts->key.key, *msg_cur, st->table->key_size);
+		*msg_cur += st->table->key_size;
+	}
+
+	/* lookup for existing entry */
+	ts = stktable_set_entry(st->table, newts);
+	if (ts != newts) {
+		stksess_free(st->table, newts);
+		newts = NULL;
+	}
+
+	HA_RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
+
+	for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
+
+		if (!((1 << data_type) & st->remote_data))
+			continue;
+
+		switch (stktable_data_types[data_type].std_type) {
+		case STD_T_SINT: {
+			int data;
+
+			data = intdecode(msg_cur, msg_end);
+			if (!*msg_cur) {
+				/* malformed message */
+				HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
+				stktable_touch_remote(st->table, ts, 1);
+				appctx->st0 = PEER_SESS_ST_ERRPROTO;
+				return 0;
+			}
+
+			data_ptr = stktable_data_ptr(st->table, ts, data_type);
+			if (data_ptr)
+				stktable_data_cast(data_ptr, std_t_sint) = data;
+			break;
+		}
+		case STD_T_UINT: {
+			unsigned int data;
+
+			data = intdecode(msg_cur, msg_end);
+			if (!*msg_cur) {
+				/* malformed message */
+				HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
+				stktable_touch_remote(st->table, ts, 1);
+				appctx->st0 = PEER_SESS_ST_ERRPROTO;
+				return 0;
+			}
+
+			data_ptr = stktable_data_ptr(st->table, ts, data_type);
+			if (data_ptr)
+				stktable_data_cast(data_ptr, std_t_uint) = data;
+			break;
+		}
+		case STD_T_ULL: {
+			unsigned long long  data;
+
+			data = intdecode(msg_cur, msg_end);
+			if (!*msg_cur) {
+				/* malformed message */
+				HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
+				stktable_touch_remote(st->table, ts, 1);
+				appctx->st0 = PEER_SESS_ST_ERRPROTO;
+				return 0;
+			}
+
+			data_ptr = stktable_data_ptr(st->table, ts, data_type);
+			if (data_ptr)
+				stktable_data_cast(data_ptr, std_t_ull) = data;
+			break;
+		}
+		case STD_T_FRQP: {
+			struct freq_ctr_period data;
+
+			/* First bit is reserved for the freq_ctr_period lock
+			Note: here we're still protected by the stksess lock
+			so we don't need to update the update the freq_ctr_period
+			using its internal lock */
+
+			data.curr_tick = tick_add(now_ms, -intdecode(msg_cur, msg_end)) & ~0x1;
+			if (!*msg_cur) {
+				/* malformed message */
+				HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
+				stktable_touch_remote(st->table, ts, 1);
+				appctx->st0 = PEER_SESS_ST_ERRPROTO;
+				return 0;
+			}
+			data.curr_ctr = intdecode(msg_cur, msg_end);
+			if (!*msg_cur) {
+				/* malformed message */
+				HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
+				stktable_touch_remote(st->table, ts, 1);
+				appctx->st0 = PEER_SESS_ST_ERRPROTO;
+				return 0;
+			}
+			data.prev_ctr = intdecode(msg_cur, msg_end);
+			if (!*msg_cur) {
+				/* malformed message */
+				HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
+				stktable_touch_remote(st->table, ts, 1);
+				appctx->st0 = PEER_SESS_ST_ERRPROTO;
+				return 0;
+			}
+
+			data_ptr = stktable_data_ptr(st->table, ts, data_type);
+			if (data_ptr)
+				stktable_data_cast(data_ptr, std_t_frqp) = data;
+			break;
+		}
+		}
+	}
+	/* Force new expiration */
+	ts->expire = tick_add(now_ms, expire);
+
+	HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
+	stktable_touch_remote(st->table, ts, 1);
+	return 1;
+
+ ignore_msg:
+	/* skip consumed message */
+	co_skip(si_oc(si), totl);
+	return 0;
+}
+
 /*
  * IO Handler to handle message exchance with a peer
  */
@@ -1172,7 +1398,6 @@
 				/* fall through */
 			}
 			case PEER_SESS_ST_WAITMSG: {
-				struct stksess *ts, *newts = NULL;
 				uint32_t msg_len = 0;
 				char *msg_cur = trash.area;
 				char *msg_end = trash.area;
@@ -1420,208 +1645,13 @@
 						 || msg_head[1] == PEER_MSG_STKT_INCUPDATE
 						 || msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED
 						 || msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) {
-						struct shared_table *st = curpeer->remote_table;
-						uint32_t update;
-						int expire;
-						unsigned int data_type;
-						void *data_ptr;
-
-						/* Here we have data message */
-						if (!st)
-							goto ignore_msg;
+						int update, expire;
 
-						expire = MS_TO_TICKS(st->table->expire);
-
-						if (msg_head[1] == PEER_MSG_STKT_UPDATE ||
-						    msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED) {
-							if (msg_len < sizeof(update)) {
-								/* malformed message */
-								appctx->st0 = PEER_SESS_ST_ERRPROTO;
-								goto switchstate;
-							}
-							memcpy(&update, msg_cur, sizeof(update));
-							msg_cur += sizeof(update);
-							st->last_get = htonl(update);
-						}
-						else {
-							st->last_get++;
-						}
-
-						if (msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED ||
-						    msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) {
-							size_t expire_sz = sizeof expire;
-
-							if (msg_cur + expire_sz > msg_end) {
-								appctx->st0 = PEER_SESS_ST_ERRPROTO;
-								goto switchstate;
-							}
-							memcpy(&expire, msg_cur, expire_sz);
-							msg_cur += expire_sz;
-							expire = ntohl(expire);
-						}
-
-						newts = stksess_new(st->table, NULL);
-						if (!newts)
-							goto ignore_msg;
-
-						if (st->table->type == SMP_T_STR) {
-							unsigned int to_read, to_store;
-
-							to_read = intdecode(&msg_cur, msg_end);
-							if (!msg_cur) {
-								/* malformed message */
-								stksess_free(st->table, newts);
-								appctx->st0 = PEER_SESS_ST_ERRPROTO;
-								goto switchstate;
-							}
-							to_store = MIN(to_read, st->table->key_size - 1);
-							if (msg_cur + to_store > msg_end) {
-								/* malformed message */
-								stksess_free(st->table, newts);
-								appctx->st0 = PEER_SESS_ST_ERRPROTO;
-								goto switchstate;
-							}
-
-							memcpy(newts->key.key, msg_cur, to_store);
-							newts->key.key[to_store] = 0;
-							msg_cur += to_read;
-						}
-						else if (st->table->type == SMP_T_SINT) {
-							unsigned int netinteger;
-
-							if (msg_cur + sizeof(netinteger) > msg_end) {
-								/* malformed message */
-								stksess_free(st->table, newts);
-								appctx->st0 = PEER_SESS_ST_ERRPROTO;
-								goto switchstate;
-							}
-							memcpy(&netinteger, msg_cur, sizeof(netinteger));
-							netinteger = ntohl(netinteger);
-							memcpy(newts->key.key, &netinteger, sizeof(netinteger));
-							msg_cur += sizeof(netinteger);
-						}
-						else {
-							if (msg_cur + st->table->key_size > msg_end) {
-								/* malformed message */
-								stksess_free(st->table, newts);
-								appctx->st0 = PEER_SESS_ST_ERRPROTO;
-								goto switchstate;
-							}
-							memcpy(newts->key.key, msg_cur, st->table->key_size);
-							msg_cur += st->table->key_size;
-						}
-
-						/* lookup for existing entry */
-						ts = stktable_set_entry(st->table, newts);
-						if (ts != newts) {
-							stksess_free(st->table, newts);
-							newts = NULL;
-						}
-
-						HA_RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
-
-						for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
-
-							if ((1 << data_type) & st->remote_data) {
-								switch (stktable_data_types[data_type].std_type) {
-									case STD_T_SINT: {
-										int data;
-
-										data = intdecode(&msg_cur, msg_end);
-										if (!msg_cur) {
-											/* malformed message */
-											HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
-											stktable_touch_remote(st->table, ts, 1);
-											appctx->st0 = PEER_SESS_ST_ERRPROTO;
-											goto switchstate;
-										}
-
-										data_ptr = stktable_data_ptr(st->table, ts, data_type);
-										if (data_ptr)
-											stktable_data_cast(data_ptr, std_t_sint) = data;
-										break;
-									}
-									case STD_T_UINT: {
-										unsigned int data;
-
-										data = intdecode(&msg_cur, msg_end);
-										if (!msg_cur) {
-											/* malformed message */
-											HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
-											stktable_touch_remote(st->table, ts, 1);
-											appctx->st0 = PEER_SESS_ST_ERRPROTO;
-											goto switchstate;
-										}
-
-										data_ptr = stktable_data_ptr(st->table, ts, data_type);
-										if (data_ptr)
-											stktable_data_cast(data_ptr, std_t_uint) = data;
-										break;
-									}
-									case STD_T_ULL: {
-										unsigned long long  data;
-
-										data = intdecode(&msg_cur, msg_end);
-										if (!msg_cur) {
-											/* malformed message */
-											HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
-											stktable_touch_remote(st->table, ts, 1);
-											appctx->st0 = PEER_SESS_ST_ERRPROTO;
-											goto switchstate;
-										}
-
-										data_ptr = stktable_data_ptr(st->table, ts, data_type);
-										if (data_ptr)
-											stktable_data_cast(data_ptr, std_t_ull) = data;
-										break;
-									}
-									case STD_T_FRQP: {
-										struct freq_ctr_period data;
-
-										/* First bit is reserved for the freq_ctr_period lock
-										   Note: here we're still protected by the stksess lock
-										   so we don't need to update the update the freq_ctr_period
-										   using its internal lock */
-
-										data.curr_tick = tick_add(now_ms, -intdecode(&msg_cur, msg_end)) & ~0x1;
-										if (!msg_cur) {
-											/* malformed message */
-											HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
-											stktable_touch_remote(st->table, ts, 1);
-											appctx->st0 = PEER_SESS_ST_ERRPROTO;
-											goto switchstate;
-										}
-										data.curr_ctr = intdecode(&msg_cur, msg_end);
-										if (!msg_cur) {
-											/* malformed message */
-											HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
-											stktable_touch_remote(st->table, ts, 1);
-											appctx->st0 = PEER_SESS_ST_ERRPROTO;
-											goto switchstate;
-										}
-										data.prev_ctr = intdecode(&msg_cur, msg_end);
-										if (!msg_cur) {
-											/* malformed message */
-											HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
-											stktable_touch_remote(st->table, ts, 1);
-											appctx->st0 = PEER_SESS_ST_ERRPROTO;
-											goto switchstate;
-										}
-
-										data_ptr = stktable_data_ptr(st->table, ts, data_type);
-										if (data_ptr)
-											stktable_data_cast(data_ptr, std_t_frqp) = data;
-										break;
-									}
-								}
-							}
-						}
-
-						/* Force new expiration */
-						ts->expire = tick_add(now_ms, expire);
-
-						HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
-						stktable_touch_remote(st->table, ts, 1);
+						update = msg_head[1] == PEER_MSG_STKT_UPDATE || msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED;
+						expire = msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED || msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED;
+						if (!peer_recv_updatemsg(appctx, curpeer, update, expire,
+						                          &msg_cur, msg_end, msg_len, totl))
+							goto switchstate;
 
 					}
 					else if (msg_head[1] == PEER_MSG_STKT_ACK) {