MINOR: peers: move messages treatment code to reduce the size of the I/O handler.
Extract the code of the peer I/O handler responsible of treating
any peer protocol message to create peer_treat_awaited_msg() function.
Also rename peer_recv_updatemsg() to peer_treat_updatemsg() as this
function only parse a stick-table update message already received
by peer_recv_msg().
May be backported as far as 1.5.
diff --git a/src/peers.c b/src/peers.c
index 04cd046..7013410 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -1139,8 +1139,8 @@
* update ID.
* <totl> is the length of the stick-table update message computed upon receipt.
*/
-static int peer_recv_updatemsg(struct appctx *appctx, struct peer *p, int updt, int exp,
- char **msg_cur, char *msg_end, int msg_len, int totl)
+static int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt, int exp,
+ char **msg_cur, char *msg_end, int msg_len, int totl)
{
struct stream_interface *si = appctx->owner;
struct shared_table *st = p->remote_table;
@@ -1603,6 +1603,107 @@
return 0;
}
+
+/*
+ * Treat the awaited message with <msg_head> as header.*
+ * Return 1 if succeeded, 0 if not.
+ */
+static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *peer, unsigned char *msg_head,
+ char **msg_cur, char *msg_end, int msg_len, int totl)
+{
+ struct stream_interface *si = appctx->owner;
+ struct stream *s = si_strm(si);
+ struct peers *peers = strm_fe(s)->parent;
+
+ if (msg_head[0] == PEER_MSG_CLASS_CONTROL) {
+ if (msg_head[1] == PEER_MSG_CTRL_RESYNCREQ) {
+ struct shared_table *st;
+ /* Reset message: remote need resync */
+
+ /* prepare tables fot a global push */
+ for (st = peer->tables; st; st = st->next) {
+ st->teaching_origin = st->last_pushed = st->table->update;
+ st->flags = 0;
+ }
+
+ /* reset teaching flags to 0 */
+ peer->flags &= PEER_TEACH_RESET;
+
+ /* flag to start to teach lesson */
+ peer->flags |= PEER_F_TEACH_PROCESS;
+ }
+ else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) {
+ if (peer->flags & PEER_F_LEARN_ASSIGN) {
+ peer->flags &= ~PEER_F_LEARN_ASSIGN;
+ peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
+ peers->flags |= (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE);
+ }
+ peer->confirm++;
+ }
+ else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) {
+ if (peer->flags & PEER_F_LEARN_ASSIGN) {
+ peer->flags &= ~PEER_F_LEARN_ASSIGN;
+ peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
+
+ peer->flags |= PEER_F_LEARN_NOTUP2DATE;
+ peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
+ task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
+ }
+ peer->confirm++;
+ }
+ else if (msg_head[1] == PEER_MSG_CTRL_RESYNCCONFIRM) {
+ struct shared_table *st;
+
+ /* If stopping state */
+ if (stopping) {
+ /* Close session, push resync no more needed */
+ peer->flags |= PEER_F_TEACH_COMPLETE;
+ appctx->st0 = PEER_SESS_ST_END;
+ return 0;
+ }
+ for (st = peer->tables; st; st = st->next) {
+ st->update = st->last_pushed = st->teaching_origin;
+ st->flags = 0;
+ }
+
+ /* reset teaching flags to 0 */
+ peer->flags &= PEER_TEACH_RESET;
+ }
+ }
+ else if (msg_head[0] == PEER_MSG_CLASS_STICKTABLE) {
+ if (msg_head[1] == PEER_MSG_STKT_DEFINE) {
+ if (!peer_treat_definemsg(appctx, peer, msg_cur, msg_end, totl))
+ return 0;
+ }
+ else if (msg_head[1] == PEER_MSG_STKT_SWITCH) {
+ if (!peer_treat_switchmsg(appctx, peer, msg_cur, msg_end))
+ return 0;
+ }
+ else if (msg_head[1] == PEER_MSG_STKT_UPDATE ||
+ msg_head[1] == PEER_MSG_STKT_INCUPDATE ||
+ msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED ||
+ msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) {
+ int update, expire;
+
+ update = msg_head[1] == PEER_MSG_STKT_UPDATE || msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED;
+ expire = msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED || msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED;
+ if (!peer_treat_updatemsg(appctx, peer, update, expire,
+ msg_cur, msg_end, msg_len, totl))
+ return 0;
+
+ }
+ else if (msg_head[1] == PEER_MSG_STKT_ACK) {
+ if (!peer_treat_ackmsg(appctx, peer, msg_cur, msg_end))
+ return 0;
+ }
+ }
+ else if (msg_head[0] == PEER_MSG_CLASS_RESERVED) {
+ appctx->st0 = PEER_SESS_ST_ERRPROTO;
+ return 0;
+ }
+
+ return 1;
+}
/*
* IO Handler to handle message exchance with a peer
*/
@@ -1924,94 +2025,8 @@
}
msg_end += msg_len;
-
- if (msg_head[0] == PEER_MSG_CLASS_CONTROL) {
- if (msg_head[1] == PEER_MSG_CTRL_RESYNCREQ) {
- struct shared_table *st;
- /* Reset message: remote need resync */
-
- /* prepare tables fot a global push */
- for (st = curpeer->tables; st; st = st->next) {
- st->teaching_origin = st->last_pushed = st->table->update;
- st->flags = 0;
- }
-
- /* reset teaching flags to 0 */
- curpeer->flags &= PEER_TEACH_RESET;
-
- /* flag to start to teach lesson */
- curpeer->flags |= PEER_F_TEACH_PROCESS;
- }
- else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) {
- if (curpeer->flags & PEER_F_LEARN_ASSIGN) {
- curpeer->flags &= ~PEER_F_LEARN_ASSIGN;
- curpeers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
- curpeers->flags |= (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE);
- }
- curpeer->confirm++;
- }
- else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) {
- if (curpeer->flags & PEER_F_LEARN_ASSIGN) {
- curpeer->flags &= ~PEER_F_LEARN_ASSIGN;
- curpeers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
-
- curpeer->flags |= PEER_F_LEARN_NOTUP2DATE;
- curpeers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
- task_wakeup(curpeers->sync_task, TASK_WOKEN_MSG);
- }
- curpeer->confirm++;
- }
- else if (msg_head[1] == PEER_MSG_CTRL_RESYNCCONFIRM) {
- struct shared_table *st;
-
- /* If stopping state */
- if (stopping) {
- /* Close session, push resync no more needed */
- curpeer->flags |= PEER_F_TEACH_COMPLETE;
- appctx->st0 = PEER_SESS_ST_END;
- goto switchstate;
- }
- for (st = curpeer->tables; st; st = st->next) {
- st->update = st->last_pushed = st->teaching_origin;
- st->flags = 0;
- }
-
- /* reset teaching flags to 0 */
- curpeer->flags &= PEER_TEACH_RESET;
- }
- }
- else if (msg_head[0] == PEER_MSG_CLASS_STICKTABLE) {
- if (msg_head[1] == PEER_MSG_STKT_DEFINE) {
- if (!peer_treat_definemsg(appctx, curpeer, &msg_cur, msg_end, totl))
- goto switchstate;
- }
- else if (msg_head[1] == PEER_MSG_STKT_SWITCH) {
- if (!peer_treat_switchmsg(appctx, curpeer, &msg_cur, msg_end))
- goto switchstate;
- }
- else if (msg_head[1] == PEER_MSG_STKT_UPDATE
- || msg_head[1] == PEER_MSG_STKT_INCUPDATE
- || msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED
- || msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) {
- int update, expire;
-
- update = msg_head[1] == PEER_MSG_STKT_UPDATE || msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED;
- expire = msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED || msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED;
- if (!peer_recv_updatemsg(appctx, curpeer, update, expire,
- &msg_cur, msg_end, msg_len, totl))
- goto switchstate;
-
- }
- else if (msg_head[1] == PEER_MSG_STKT_ACK) {
- if (!peer_treat_ackmsg(appctx, curpeer, &msg_cur, msg_end))
- goto switchstate;
- }
- }
- else if (msg_head[0] == PEER_MSG_CLASS_RESERVED) {
- appctx->st0 = PEER_SESS_ST_ERRPROTO;
+ if (!peer_treat_awaited_msg(appctx, curpeer, msg_head, &msg_cur, msg_end, msg_len, totl))
goto switchstate;
- }
-
ignore_msg:
/* skip consumed message */
co_skip(si_oc(si), totl);