MEDIUM: peers: synchronizaiton code factorization to reduce the size of the I/O handler.

Factorize the code responsible of synchronizing the peers upon startup.

May be backported as far as 1.5.
diff --git a/src/peers.c b/src/peers.c
index 1d54258..6e211e1 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -667,6 +667,197 @@
 	return peer_send_msg(st, appctx, peer_prepare_updatemsg, ts, updateid, use_identifier, use_timed);
 }
 
+
+/*
+ * Function used to lookup for recent stick-table updates associated with
+ * <st> shared stick-table when a lesson must be taught a peer (PEER_F_LEARN_ASSIGN flag set).
+ */
+static inline struct stksess *peer_teach_process_stksess_lookup(struct shared_table *st)
+{
+	struct eb32_node *eb;
+
+	eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
+	if (!eb) {
+		eb = eb32_first(&st->table->updates);
+		if (!eb || ((int)(eb->key - st->last_pushed) <= 0)) {
+			st->table->commitupdate = st->last_pushed = st->table->localupdate;
+			return NULL;
+		}
+	}
+
+	if ((int)(eb->key - st->table->localupdate) > 0) {
+		st->table->commitupdate = st->last_pushed = st->table->localupdate;
+		return NULL;
+	}
+
+	return eb32_entry(eb, struct stksess, upd);
+}
+
+/*
+ * Function used to lookup for recent stick-table updates associated with
+ * <st> shared stick-table during teach state 1 step.
+ */
+static inline struct stksess *peer_teach_stage1_stksess_lookup(struct shared_table *st)
+{
+	struct eb32_node *eb;
+
+	eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
+	if (!eb) {
+		st->flags |= SHTABLE_F_TEACH_STAGE1;
+		eb = eb32_first(&st->table->updates);
+		if (eb)
+			st->last_pushed = eb->key - 1;
+		return NULL;
+	}
+
+	return eb32_entry(eb, struct stksess, upd);
+}
+
+/*
+ * Function used to lookup for recent stick-table updates associated with
+ * <st> shared stick-table during teach state 2 step.
+ */
+static inline struct stksess *peer_teach_stage2_stksess_lookup(struct shared_table *st)
+{
+	struct eb32_node *eb;
+
+	eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
+	if (!eb || eb->key > st->teaching_origin) {
+		st->flags |= SHTABLE_F_TEACH_STAGE2;
+		return NULL;
+	}
+
+	return eb32_entry(eb, struct stksess, upd);
+}
+
+/*
+ * Generic function to emit update messages for <st> stick-table when a lesson must
+ * be taught to the peer <p>.
+ * <locked> must be set to 1 if the shared table <st> is already locked when entering
+ * this function, 0 if not.
+ *
+ * This function temporary unlock/lock <st> when it sends stick-table updates or
+ * when decrementing its refcount in case of any error when it sends this updates.
+ *
+ * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
+ * Returns -1 if there was not enough room left to send the message,
+ * any other negative returned value must  be considered as an error with an appcxt st0
+ * returned value equal to PEER_SESS_ST_END.
+ * If it returns 0 or -1, this function leave <st> locked if already locked when entering this function
+ * unlocked if not already locked when entering this function.
+ */
+static inline int peer_send_teachmsgs(struct appctx *appctx, struct peer *p,
+                                      struct stksess *(*peer_stksess_lookup)(struct shared_table *),
+                                      struct shared_table *st, int locked)
+{
+	int ret, new_pushed, use_timed;
+
+	ret = 1;
+	use_timed = 0;
+	if (st != p->last_local_table) {
+		ret = peer_send_switchmsg(st, appctx);
+		if (ret <= 0)
+			return ret;
+
+		p->last_local_table = st;
+	}
+
+	if (peer_stksess_lookup != peer_teach_process_stksess_lookup)
+		use_timed = !(p->flags & PEER_F_DWNGRD);
+
+	/* We force new pushed to 1 to force identifier in update message */
+	new_pushed = 1;
+
+	if (!locked)
+		HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
+
+	while (1) {
+		struct stksess *ts;
+		unsigned updateid;
+
+		/* push local updates */
+		ts = peer_stksess_lookup(st);
+		if (!ts)
+			break;
+
+		updateid = ts->upd.key;
+		ts->ref_cnt++;
+		HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
+
+		ret = peer_send_updatemsg(st, appctx, ts, updateid, new_pushed, use_timed);
+		if (ret <= 0) {
+			HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
+			ts->ref_cnt--;
+			if (!locked)
+				HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
+			return ret;
+		}
+
+		HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
+		ts->ref_cnt--;
+		st->last_pushed = updateid;
+
+		if (peer_stksess_lookup == peer_teach_process_stksess_lookup &&
+		    (int)(st->last_pushed - st->table->commitupdate) > 0)
+			st->table->commitupdate = st->last_pushed;
+
+		/* identifier may not needed in next update message */
+		new_pushed = 0;
+	}
+
+ out:
+	if (!locked)
+		HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
+	return 1;
+}
+
+/*
+ * Function to emit update messages for <st> stick-table when a lesson must
+ * be taught to the peer <p> (PEER_F_LEARN_ASSIGN flag set).
+ *
+ * Note that <st> shared stick-table is locked when calling this function.
+ *
+ * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
+ * Returns -1 if there was not enough room left to send the message,
+ * any other negative returned value must  be considered as an error with an appcxt st0
+ * returned value equal to PEER_SESS_ST_END.
+ */
+static inline int peer_send_teach_process_msgs(struct appctx *appctx, struct peer *p,
+                                               struct shared_table *st)
+{
+	return peer_send_teachmsgs(appctx, p, peer_teach_process_stksess_lookup, st, 1);
+}
+
+/*
+ * Function to emit update messages for <st> stick-table when a lesson must
+ * be taught to the peer <p> during teach state 1 step.
+ *
+ * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
+ * Returns -1 if there was not enough room left to send the message,
+ * any other negative returned value must  be considered as an error with an appcxt st0
+ * returned value equal to PEER_SESS_ST_END.
+ */
+static inline int peer_send_teach_stage1_msgs(struct appctx *appctx, struct peer *p,
+                                              struct shared_table *st)
+{
+	return peer_send_teachmsgs(appctx, p, peer_teach_stage1_stksess_lookup, st, 0);
+}
+
+/*
+ * Function to emit update messages for <st> stick-table when a lesson must
+ * be taught to the peer <p> during teach state 1 step.
+ *
+ * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
+ * Returns -1 if there was not enough room left to send the message,
+ * any other negative returned value must  be considered as an error with an appcxt st0
+ * returned value equal to PEER_SESS_ST_END.
+ */
+static inline int peer_send_teach_stage2_msgs(struct appctx *appctx, struct peer *p,
+                                              struct shared_table *st)
+{
+	return peer_send_teachmsgs(appctx, p, peer_teach_stage2_stksess_lookup, st, 0);
+}
+
 /*
  * IO Handler to handle message exchance with a peer
  */
@@ -1530,182 +1721,34 @@
 							HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
 							if (!(curpeer->flags & PEER_F_LEARN_ASSIGN) &&
 							    ((int)(st->last_pushed - st->table->localupdate) < 0)) {
-								struct eb32_node *eb;
-								int new_pushed;
-
-								if (st != curpeer->last_local_table) {
-									repl = peer_send_switchmsg(st, appctx);
-									if (repl <= 0) {
-										HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
-										if (repl == -1)
-											goto out;
-										goto switchstate;
-									}
-									curpeer->last_local_table = st;
-								}
-
-								/* We force new pushed to 1 to force identifier in update message */
-								new_pushed = 1;
-								while (1) {
-									struct stksess *ts;
-									unsigned updateid;
-
-									/* push local updates */
-									eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
-									if (!eb) {
-										eb = eb32_first(&st->table->updates);
-										if (!eb || ((int)(eb->key - st->last_pushed) <= 0)) {
-											st->table->commitupdate = st->last_pushed = st->table->localupdate;
-											break;
-										}
-									}
-
-									if ((int)(eb->key - st->table->localupdate) > 0) {
-										st->table->commitupdate = st->last_pushed = st->table->localupdate;
-										break;
-									}
 
-									ts = eb32_entry(eb, struct stksess, upd);
-									updateid = ts->upd.key;
-									ts->ref_cnt++;
+								repl = peer_send_teach_process_msgs(appctx, curpeer, st);
+								if (repl <= 0) {
 									HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
-
-									repl = peer_send_updatemsg(st, appctx, ts,
-									                           updateid, new_pushed, 0);
-									if (repl <= 0) {
-										HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
-										ts->ref_cnt--;
-										HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
-										if (repl == -1)
-											goto out;
-										goto switchstate;
-									}
-
-									HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
-									ts->ref_cnt--;
-									st->last_pushed = updateid;
-									if ((int)(st->last_pushed - st->table->commitupdate) > 0)
-											st->table->commitupdate = st->last_pushed;
-									/* identifier may not needed in next update message */
-									new_pushed = 0;
+									if (repl == -1)
+										goto out;
+									goto switchstate;
 								}
 							}
 							HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
 						}
 						else {
 							if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
-								struct eb32_node *eb;
-								int new_pushed;
-
-								if (st != curpeer->last_local_table) {
-									repl = peer_send_switchmsg(st, appctx);
-									if (repl <= 0) {
-										if (repl == -1)
-											goto out;
-										goto switchstate;
-									}
-									curpeer->last_local_table = st;
-								}
-
-								/* We force new pushed to 1 to force identifier in update message */
-								new_pushed = 1;
-								HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
-								while (1) {
-									struct stksess *ts;
-									int use_timed;
-									unsigned updateid;
-
-									/* push local updates */
-									eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
-									if (!eb) {
-										st->flags |= SHTABLE_F_TEACH_STAGE1;
-										eb = eb32_first(&st->table->updates);
-										if (eb)
-											st->last_pushed = eb->key - 1;
-										break;
-									}
-
-									ts = eb32_entry(eb, struct stksess, upd);
-									updateid = ts->upd.key;
-									ts->ref_cnt++;
-									HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
-
-									use_timed = !(curpeer->flags & PEER_F_DWNGRD);
-									repl = peer_send_updatemsg(st, appctx, ts,
-									                           updateid, new_pushed, use_timed);
-									if (repl <= 0) {
-										HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
-										ts->ref_cnt--;
-										HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
-										if (repl == -1)
-											goto out;
-										goto switchstate;
-									}
-
-									HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
-									ts->ref_cnt--;
-									st->last_pushed = updateid;
-									/* identifier may not needed in next update message */
-									new_pushed = 0;
+								repl = peer_send_teach_stage1_msgs(appctx, curpeer, st);
+								if (repl <= 0) {
+									if (repl == -1)
+										goto out;
+									goto switchstate;
 								}
-								HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
 							}
 
 							if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) {
-								struct eb32_node *eb;
-								int new_pushed;
-
-								if (st != curpeer->last_local_table) {
-									repl = peer_send_switchmsg(st, appctx);
-									if (repl <= 0) {
-										if (repl == -1)
-											goto out;
-										goto switchstate;
-									}
-									curpeer->last_local_table = st;
-								}
-
-								/* We force new pushed to 1 to force identifier in update message */
-								new_pushed = 1;
-								HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
-								while (1) {
-									struct stksess *ts;
-									int use_timed;
-									unsigned updateid;
-
-									/* push local updates */
-									eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
-
-									/* push local updates */
-									if (!eb || eb->key > st->teaching_origin) {
-										st->flags |= SHTABLE_F_TEACH_STAGE2;
-										break;
-									}
-
-									ts = eb32_entry(eb, struct stksess, upd);
-									updateid = ts->upd.key;
-									ts->ref_cnt++;
-									HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
-
-									use_timed = !(curpeer->flags & PEER_F_DWNGRD);
-									repl = peer_send_updatemsg(st, appctx, ts,
-									                           updateid, new_pushed, use_timed);
-									if (repl <= 0) {
-										HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
-										ts->ref_cnt--;
-										HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
-										if (repl == -1)
-											goto out;
-										goto switchstate;
-									}
-
-									HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
-									ts->ref_cnt--;
-									st->last_pushed = updateid;
-									/* identifier may not needed in next update message */
-									new_pushed = 0;
+								repl = peer_send_teach_stage2_msgs(appctx, curpeer, st);
+								if (repl <= 0) {
+									if (repl == -1)
+										goto out;
+									goto switchstate;
 								}
-								HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
 							}
 						}