MINOR: peers: move send code to reduce the size of the I/O handler.
This patch extracts the code responsible of sending peer protocol
messages from the peer I/O handler to create a new function and to
reduce the size of this handler.
May be backported as far as 1.5.
diff --git a/src/peers.c b/src/peers.c
index 7013410..48faa9d 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -1704,6 +1704,110 @@
return 1;
}
+
+
+/*
+ * Send any message to <peer> peer.
+ * Returns 1 if succeeded, or -1 or 0 if failed.
+ * -1 means an internal error occured, 0 is for a peer protocol error leading
+ * to a peer state change (from the peer I/O handler point of view).
+ */
+static inline int peer_send_msgs(struct appctx *appctx, struct peer *peer)
+{
+ int repl;
+ struct stream_interface *si = appctx->owner;
+ struct stream *s = si_strm(si);
+ struct peers *peers = strm_fe(s)->parent;
+
+ /* Need to request a resync */
+ if ((peer->flags & PEER_F_LEARN_ASSIGN) &&
+ (peers->flags & PEERS_F_RESYNC_ASSIGN) &&
+ !(peers->flags & PEERS_F_RESYNC_PROCESS)) {
+
+ repl = peer_send_resync_reqmsg(appctx);
+ if (repl <= 0)
+ return repl;
+
+ peers->flags |= PEERS_F_RESYNC_PROCESS;
+ }
+
+ /* Nothing to read, now we start to write */
+ if (peer->tables) {
+ struct shared_table *st;
+ struct shared_table *last_local_table;
+
+ last_local_table = peer->last_local_table;
+ if (!last_local_table)
+ last_local_table = peer->tables;
+ st = last_local_table->next;
+
+ while (1) {
+ if (!st)
+ st = peer->tables;
+
+ /* It remains some updates to ack */
+ if (st->last_get != st->last_acked) {
+ repl = peer_send_ackmsg(st, appctx);
+ if (repl <= 0)
+ return repl;
+
+ st->last_acked = st->last_get;
+ }
+
+ if (!(peer->flags & PEER_F_TEACH_PROCESS)) {
+ HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
+ if (!(peer->flags & PEER_F_LEARN_ASSIGN) &&
+ ((int)(st->last_pushed - st->table->localupdate) < 0)) {
+
+ repl = peer_send_teach_process_msgs(appctx, peer, st);
+ if (repl <= 0) {
+ HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
+ return repl;
+ }
+ }
+ HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
+ }
+ else {
+ if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
+ repl = peer_send_teach_stage1_msgs(appctx, peer, st);
+ if (repl <= 0)
+ return repl;
+ }
+
+ if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) {
+ repl = peer_send_teach_stage2_msgs(appctx, peer, st);
+ if (repl <= 0)
+ return repl;
+ }
+ }
+
+ if (st == last_local_table)
+ break;
+ st = st->next;
+ }
+ }
+
+ if ((peer->flags & PEER_F_TEACH_PROCESS) && !(peer->flags & PEER_F_TEACH_FINISHED)) {
+ repl = peer_send_resync_finishedmsg(appctx, peer);
+ if (repl <= 0)
+ return repl;
+
+ /* flag finished message sent */
+ peer->flags |= PEER_F_TEACH_FINISHED;
+ }
+
+ /* Confirm finished or partial messages */
+ while (peer->confirm) {
+ repl = peer_send_resync_confirmsg(appctx);
+ if (repl <= 0)
+ return repl;
+
+ peer->confirm--;
+ }
+
+ return 1;
+}
+
/*
* IO Handler to handle message exchance with a peer
*/
@@ -2042,111 +2146,11 @@
goto switchstate;
}
-
-
-
- /* Need to request a resync */
- if ((curpeer->flags & PEER_F_LEARN_ASSIGN) &&
- (curpeers->flags & PEERS_F_RESYNC_ASSIGN) &&
- !(curpeers->flags & PEERS_F_RESYNC_PROCESS)) {
-
- repl = peer_send_resync_reqmsg(appctx);
- if (repl <= 0) {
- if (repl == -1)
- goto out;
- goto switchstate;
- }
-
- curpeers->flags |= PEERS_F_RESYNC_PROCESS;
- }
-
- /* Nothing to read, now we start to write */
- if (curpeer->tables) {
- struct shared_table *st;
- struct shared_table *last_local_table;
-
- last_local_table = curpeer->last_local_table;
- if (!last_local_table)
- last_local_table = curpeer->tables;
- st = last_local_table->next;
-
- while (1) {
- if (!st)
- st = curpeer->tables;
-
- /* It remains some updates to ack */
- if (st->last_get != st->last_acked) {
- repl = peer_send_ackmsg(st, appctx);
- if (repl <= 0) {
- if (repl == -1)
- goto out;
- goto switchstate;
- }
- st->last_acked = st->last_get;
- }
-
- if (!(curpeer->flags & PEER_F_TEACH_PROCESS)) {
- HA_SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
- if (!(curpeer->flags & PEER_F_LEARN_ASSIGN) &&
- ((int)(st->last_pushed - st->table->localupdate) < 0)) {
-
- repl = peer_send_teach_process_msgs(appctx, curpeer, st);
- if (repl <= 0) {
- HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
- if (repl == -1)
- goto out;
- goto switchstate;
- }
- }
- HA_SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
- }
- else {
- if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
- repl = peer_send_teach_stage1_msgs(appctx, curpeer, st);
- if (repl <= 0) {
- if (repl == -1)
- goto out;
- goto switchstate;
- }
- }
-
- if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) {
- repl = peer_send_teach_stage2_msgs(appctx, curpeer, st);
- if (repl <= 0) {
- if (repl == -1)
- goto out;
- goto switchstate;
- }
- }
- }
-
- if (st == last_local_table)
- break;
- st = st->next;
- }
- }
-
-
- if ((curpeer->flags & PEER_F_TEACH_PROCESS) && !(curpeer->flags & PEER_F_TEACH_FINISHED)) {
- repl = peer_send_resync_finishedmsg(appctx, curpeer);
- if (repl <= 0) {
- if (repl == -1)
- goto out;
- goto switchstate;
- }
- /* flag finished message sent */
- curpeer->flags |= PEER_F_TEACH_FINISHED;
- }
-
- /* Confirm finished or partial messages */
- while (curpeer->confirm) {
- repl = peer_send_resync_confirmsg(appctx);
- if (repl <= 0) {
- if (repl == -1)
- goto out;
- goto switchstate;
- }
- curpeer->confirm--;
+ repl = peer_send_msgs(appctx, curpeer);
+ if (repl <= 0) {
+ if (repl == -1)
+ goto out;
+ goto switchstate;
}
/* noting more to do */