MEDIUM: peers: synchronizaiton code factorization to reduce the size of the I/O handler.
Factorize the code responsible of synchronizing the peers upon startup.
May be backported as far as 1.5.
diff --git a/src/peers.c b/src/peers.c
index 1d54258..6e211e1 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -667,6 +667,197 @@
return peer_send_msg(st, appctx, peer_prepare_updatemsg, ts, updateid, use_identifier, use_timed);
}
+
+/*
+ * Function used to lookup for recent stick-table updates associated with
+ * <st> shared stick-table when a lesson must be taught a peer (PEER_F_LEARN_ASSIGN flag set).
+ */
+static inline struct stksess *peer_teach_process_stksess_lookup(struct shared_table *st)
+{
+ struct eb32_node *eb;
+
+ eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
+ if (!eb) {
+ eb = eb32_first(&st->table->updates);
+ if (!eb || ((int)(eb->key - st->last_pushed) <= 0)) {
+ st->table->commitupdate = st->last_pushed = st->table->localupdate;
+ return NULL;
+ }
+ }
+
+ if ((int)(eb->key - st->table->localupdate) > 0) {
+ st->table->commitupdate = st->last_pushed = st->table->localupdate;
+ return NULL;
+ }
+
+ return eb32_entry(eb, struct stksess, upd);
+}
+
+/*
+ * Function used to lookup for recent stick-table updates associated with
+ * <st> shared stick-table during teach state 1 step.
+ */
+static inline struct stksess *peer_teach_stage1_stksess_lookup(struct shared_table *st)
+{
+ struct eb32_node *eb;
+
+ eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
+ if (!eb) {
+ st->flags |= SHTABLE_F_TEACH_STAGE1;
+ eb = eb32_first(&st->table->updates);
+ if (eb)
+ st->last_pushed = eb->key - 1;
+ return NULL;
+ }
+
+ return eb32_entry(eb, struct stksess, upd);
+}
+
+/*
+ * Function used to lookup for recent stick-table updates associated with
+ * <st> shared stick-table during teach state 2 step.
+ */
+static inline struct stksess *peer_teach_stage2_stksess_lookup(struct shared_table *st)
+{
+ struct eb32_node *eb;
+
+ eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
+ if (!eb || eb->key > st->teaching_origin) {
+ st->flags |= SHTABLE_F_TEACH_STAGE2;
+ return NULL;
+ }
+
+ return eb32_entry(eb, struct stksess, upd);
+}
+
+/*
+ * Generic function to emit update messages for <st> stick-table when a lesson must
+ * be taught to the peer <p>.
+ * <locked> must be set to 1 if the shared table <st> is already locked when entering
+ * this function, 0 if not.
+ *
+ * This function temporary unlock/lock <st> when it sends stick-table updates or
+ * when decrementing its refcount in case of any error when it sends this updates.
+ *
+ * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
+ * Returns -1 if there was not enough room left to send the message,
+ * any other negative returned value must be considered as an error with an appcxt st0
+ * returned value equal to PEER_SESS_ST_END.
+ * If it returns 0 or -1, this function leave <st> locked if already locked when entering this function
+ * unlocked if not already locked when entering this function.
+ */
+static inline int peer_send_teachmsgs(struct appctx *appctx, struct peer *p,
+ struct stksess *(*peer_stksess_lookup)(struct shared_table *),
+ struct shared_table *st, int locked)
+{
+ int ret, new_pushed, use_timed;
+
+ ret = 1;
+ use_timed = 0;
+ if (st != p->last_local_table) {
+ ret = peer_send_switchmsg(st, appctx);
+ if (ret <= 0)
+ return ret;
+
+ p->last_local_table = st;
+ }
+
+ if (peer_stksess_lookup != peer_teach_process_stksess_lookup)
+ use_timed = !(p->flags & PEER_F_DWNGRD);
+
+ /* We force new pushed to 1 to force identifier in update message */
+ new_pushed = 1;
+
+ if (!locked)
+ HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
+
+ while (1) {
+ struct stksess *ts;
+ unsigned updateid;
+
+ /* push local updates */
+ ts = peer_stksess_lookup(st);
+ if (!ts)
+ break;
+
+ updateid = ts->upd.key;
+ ts->ref_cnt++;
+ HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
+
+ ret = peer_send_updatemsg(st, appctx, ts, updateid, new_pushed, use_timed);
+ if (ret <= 0) {
+ HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
+ ts->ref_cnt--;
+ if (!locked)
+ HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
+ return ret;
+ }
+
+ HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
+ ts->ref_cnt--;
+ st->last_pushed = updateid;
+
+ if (peer_stksess_lookup == peer_teach_process_stksess_lookup &&
+ (int)(st->last_pushed - st->table->commitupdate) > 0)
+ st->table->commitupdate = st->last_pushed;
+
+ /* identifier may not needed in next update message */
+ new_pushed = 0;
+ }
+
+ out:
+ if (!locked)
+ HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
+ return 1;
+}
+
+/*
+ * Function to emit update messages for <st> stick-table when a lesson must
+ * be taught to the peer <p> (PEER_F_LEARN_ASSIGN flag set).
+ *
+ * Note that <st> shared stick-table is locked when calling this function.
+ *
+ * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
+ * Returns -1 if there was not enough room left to send the message,
+ * any other negative returned value must be considered as an error with an appcxt st0
+ * returned value equal to PEER_SESS_ST_END.
+ */
+static inline int peer_send_teach_process_msgs(struct appctx *appctx, struct peer *p,
+ struct shared_table *st)
+{
+ return peer_send_teachmsgs(appctx, p, peer_teach_process_stksess_lookup, st, 1);
+}
+
+/*
+ * Function to emit update messages for <st> stick-table when a lesson must
+ * be taught to the peer <p> during teach state 1 step.
+ *
+ * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
+ * Returns -1 if there was not enough room left to send the message,
+ * any other negative returned value must be considered as an error with an appcxt st0
+ * returned value equal to PEER_SESS_ST_END.
+ */
+static inline int peer_send_teach_stage1_msgs(struct appctx *appctx, struct peer *p,
+ struct shared_table *st)
+{
+ return peer_send_teachmsgs(appctx, p, peer_teach_stage1_stksess_lookup, st, 0);
+}
+
+/*
+ * Function to emit update messages for <st> stick-table when a lesson must
+ * be taught to the peer <p> during teach state 1 step.
+ *
+ * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
+ * Returns -1 if there was not enough room left to send the message,
+ * any other negative returned value must be considered as an error with an appcxt st0
+ * returned value equal to PEER_SESS_ST_END.
+ */
+static inline int peer_send_teach_stage2_msgs(struct appctx *appctx, struct peer *p,
+ struct shared_table *st)
+{
+ return peer_send_teachmsgs(appctx, p, peer_teach_stage2_stksess_lookup, st, 0);
+}
+
/*
* IO Handler to handle message exchance with a peer
*/
@@ -1530,182 +1721,34 @@
HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
if (!(curpeer->flags & PEER_F_LEARN_ASSIGN) &&
((int)(st->last_pushed - st->table->localupdate) < 0)) {
- struct eb32_node *eb;
- int new_pushed;
-
- if (st != curpeer->last_local_table) {
- repl = peer_send_switchmsg(st, appctx);
- if (repl <= 0) {
- HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
- if (repl == -1)
- goto out;
- goto switchstate;
- }
- curpeer->last_local_table = st;
- }
-
- /* We force new pushed to 1 to force identifier in update message */
- new_pushed = 1;
- while (1) {
- struct stksess *ts;
- unsigned updateid;
-
- /* push local updates */
- eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
- if (!eb) {
- eb = eb32_first(&st->table->updates);
- if (!eb || ((int)(eb->key - st->last_pushed) <= 0)) {
- st->table->commitupdate = st->last_pushed = st->table->localupdate;
- break;
- }
- }
-
- if ((int)(eb->key - st->table->localupdate) > 0) {
- st->table->commitupdate = st->last_pushed = st->table->localupdate;
- break;
- }
- ts = eb32_entry(eb, struct stksess, upd);
- updateid = ts->upd.key;
- ts->ref_cnt++;
+ repl = peer_send_teach_process_msgs(appctx, curpeer, st);
+ if (repl <= 0) {
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
-
- repl = peer_send_updatemsg(st, appctx, ts,
- updateid, new_pushed, 0);
- if (repl <= 0) {
- HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
- ts->ref_cnt--;
- HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
- if (repl == -1)
- goto out;
- goto switchstate;
- }
-
- HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
- ts->ref_cnt--;
- st->last_pushed = updateid;
- if ((int)(st->last_pushed - st->table->commitupdate) > 0)
- st->table->commitupdate = st->last_pushed;
- /* identifier may not needed in next update message */
- new_pushed = 0;
+ if (repl == -1)
+ goto out;
+ goto switchstate;
}
}
HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
}
else {
if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
- struct eb32_node *eb;
- int new_pushed;
-
- if (st != curpeer->last_local_table) {
- repl = peer_send_switchmsg(st, appctx);
- if (repl <= 0) {
- if (repl == -1)
- goto out;
- goto switchstate;
- }
- curpeer->last_local_table = st;
- }
-
- /* We force new pushed to 1 to force identifier in update message */
- new_pushed = 1;
- HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
- while (1) {
- struct stksess *ts;
- int use_timed;
- unsigned updateid;
-
- /* push local updates */
- eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
- if (!eb) {
- st->flags |= SHTABLE_F_TEACH_STAGE1;
- eb = eb32_first(&st->table->updates);
- if (eb)
- st->last_pushed = eb->key - 1;
- break;
- }
-
- ts = eb32_entry(eb, struct stksess, upd);
- updateid = ts->upd.key;
- ts->ref_cnt++;
- HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
-
- use_timed = !(curpeer->flags & PEER_F_DWNGRD);
- repl = peer_send_updatemsg(st, appctx, ts,
- updateid, new_pushed, use_timed);
- if (repl <= 0) {
- HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
- ts->ref_cnt--;
- HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
- if (repl == -1)
- goto out;
- goto switchstate;
- }
-
- HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
- ts->ref_cnt--;
- st->last_pushed = updateid;
- /* identifier may not needed in next update message */
- new_pushed = 0;
+ repl = peer_send_teach_stage1_msgs(appctx, curpeer, st);
+ if (repl <= 0) {
+ if (repl == -1)
+ goto out;
+ goto switchstate;
}
- HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
}
if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) {
- struct eb32_node *eb;
- int new_pushed;
-
- if (st != curpeer->last_local_table) {
- repl = peer_send_switchmsg(st, appctx);
- if (repl <= 0) {
- if (repl == -1)
- goto out;
- goto switchstate;
- }
- curpeer->last_local_table = st;
- }
-
- /* We force new pushed to 1 to force identifier in update message */
- new_pushed = 1;
- HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
- while (1) {
- struct stksess *ts;
- int use_timed;
- unsigned updateid;
-
- /* push local updates */
- eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
-
- /* push local updates */
- if (!eb || eb->key > st->teaching_origin) {
- st->flags |= SHTABLE_F_TEACH_STAGE2;
- break;
- }
-
- ts = eb32_entry(eb, struct stksess, upd);
- updateid = ts->upd.key;
- ts->ref_cnt++;
- HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
-
- use_timed = !(curpeer->flags & PEER_F_DWNGRD);
- repl = peer_send_updatemsg(st, appctx, ts,
- updateid, new_pushed, use_timed);
- if (repl <= 0) {
- HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
- ts->ref_cnt--;
- HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
- if (repl == -1)
- goto out;
- goto switchstate;
- }
-
- HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
- ts->ref_cnt--;
- st->last_pushed = updateid;
- /* identifier may not needed in next update message */
- new_pushed = 0;
+ repl = peer_send_teach_stage2_msgs(appctx, curpeer, st);
+ if (repl <= 0) {
+ if (repl == -1)
+ goto out;
+ goto switchstate;
}
- HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
}
}