blob: 26b3e6fde220cec01684a82df492df88d87fae6c [file] [log] [blame]
/*
 * Stick table synchro management.
 *
 * Copyright 2010 EXCELIANCE, Emeric Brun <ebrun@exceliance.fr>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 *
 */
12
13#include <errno.h>
14#include <fcntl.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <sys/socket.h>
20#include <sys/stat.h>
21#include <sys/types.h>
22
23#include <common/compat.h>
24#include <common/config.h>
25#include <common/time.h>
26
27#include <types/global.h>
Willy Tarreau3fdb3662012-11-12 00:42:33 +010028#include <types/listener.h>
29#include <types/obj_type.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020030#include <types/peers.h>
31
32#include <proto/acl.h>
Willy Tarreau8a8d83b2015-04-13 13:24:54 +020033#include <proto/applet.h>
Willy Tarreauc7e42382012-08-24 19:22:53 +020034#include <proto/channel.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020035#include <proto/fd.h>
Willy Tarreaud1d48d42015-03-13 16:15:46 +010036#include <proto/frontend.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020037#include <proto/log.h>
38#include <proto/hdr_idx.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020039#include <proto/proto_tcp.h>
40#include <proto/proto_http.h>
41#include <proto/proxy.h>
Willy Tarreaufeb76402015-04-03 14:10:06 +020042#include <proto/session.h>
Willy Tarreau87b09662015-04-03 00:22:06 +020043#include <proto/stream.h>
Willy Tarreau22ec1ea2014-11-27 20:45:39 +010044#include <proto/signal.h>
45#include <proto/stick_table.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020046#include <proto/stream_interface.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020047#include <proto/task.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020048
49
50/*******************************/
51/* Current peer learning state */
52/*******************************/
53
54/******************************/
55/* Current table resync state */
56/******************************/
#define SHTABLE_F_RESYNC_LOCAL      0x00000001 /* Learn from local finished or no more needed */
#define SHTABLE_F_RESYNC_REMOTE     0x00000002 /* Learn from remote finished or no more needed */
#define SHTABLE_F_RESYNC_ASSIGN     0x00000004 /* A peer was assigned to learn our lesson */
#define SHTABLE_F_RESYNC_PROCESS    0x00000008 /* The assigned peer was requested for resync */
#define SHTABLE_F_DONOTSTOP         0x00010000 /* Main table sync task block process during soft stop
                                                  to push data to new process */

/* The resync state is made of the LOCAL and REMOTE "finished" bits:
 *   - FROMLOCAL  : neither bit set, learning from local is still pending
 *   - FROMREMOTE : LOCAL bit set only, learning from remote is still pending
 *   - FINISHED   : both bits set
 */
#define SHTABLE_RESYNC_STATEMASK    (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)
#define SHTABLE_RESYNC_FROMLOCAL    0x00000000
#define SHTABLE_RESYNC_FROMREMOTE   SHTABLE_F_RESYNC_LOCAL
#define SHTABLE_RESYNC_FINISHED     (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)
68
69/******************************/
70/* Remote peer teaching state */
71/******************************/
#define PEER_F_TEACH_PROCESS        0x00000001 /* Teach a lesson to current peer */
#define PEER_F_TEACH_STAGE1         0x00000002 /* Teach stage 1 complete */
#define PEER_F_TEACH_STAGE2         0x00000004 /* Teach stage 2 complete */
#define PEER_F_TEACH_FINISHED       0x00000008 /* Teach conclude, (wait for confirm) */
#define PEER_F_TEACH_COMPLETE       0x00000010 /* All that we know already taught to current peer, used only for a local peer */
#define PEER_F_LEARN_ASSIGN         0x00000100 /* Current peer was assigned for a lesson */
#define PEER_F_LEARN_NOTUP2DATE     0x00000200 /* Learn from peer finished but peer is not up to date */

/* Masks meant to be AND-ed with the peer flags to clear the teaching or the
 * learning state. PEER_F_TEACH_COMPLETE should never be reset, so it is
 * deliberately left out of PEER_TEACH_RESET. The expansions are fully
 * parenthesized so they stay a single operand in any expression.
 */
#define PEER_TEACH_RESET            (~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_STAGE1|PEER_F_TEACH_STAGE2|PEER_F_TEACH_FINISHED))
#define PEER_LEARN_RESET            (~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE))
82
83
84/**********************************/
85/* Peer Session IO handler states */
86/**********************************/
87
/* States of the peer session I/O handler, stored in appctx->st0. The order
 * matters: states below PEER_SESS_ST_SENDSUCCESS belong to the handshake of
 * an accepted session, during which no peer session is attached yet.
 */
enum {
	PEER_SESS_ST_ACCEPT = 0,     /* Initial state for session create by an accept, must be zero! */
	PEER_SESS_ST_GETVERSION,     /* Validate supported protocol version */
	PEER_SESS_ST_GETHOST,        /* Validate host ID correspond to local host id */
	PEER_SESS_ST_GETPEER,        /* Validate peer ID correspond to a known remote peer id */
	PEER_SESS_ST_GETTABLE,       /* Search into registered table for a table with same id and validate type and size */
	/* after this point, data were possibly exchanged */
	PEER_SESS_ST_SENDSUCCESS,    /* Send ret code 200 (success) and wait for message */
	PEER_SESS_ST_CONNECT,        /* Initial state for session create on a connect, push presentation into buffer */
	PEER_SESS_ST_GETSTATUS,      /* Wait for the welcome message */
	PEER_SESS_ST_WAITMSG,        /* Wait for data messages */
	PEER_SESS_ST_EXIT,           /* Exit with status code */
	PEER_SESS_ST_END,            /* Killed session */
};
Emeric Brun2b920a12010-09-23 18:30:22 +0200102
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100103/***************************************************/
104/* Peer Session status code - part of the protocol */
105/***************************************************/
Emeric Brun2b920a12010-09-23 18:30:22 +0200106
/* 1xx: connection establishment in progress */
#define PEER_SESS_SC_CONNECTCODE    100 /* connect in progress */
#define PEER_SESS_SC_CONNECTEDCODE  110 /* tcp connect success */

/* 2xx: success */
#define PEER_SESS_SC_SUCCESSCODE    200 /* accept or connect successful */

/* 3xx: temporary refusal */
#define PEER_SESS_SC_TRYAGAIN       300 /* try again later */

/* 5xx: errors */
#define PEER_SESS_SC_ERRPROTO       501 /* error protocol */
#define PEER_SESS_SC_ERRVERSION     502 /* unknown protocol version */
#define PEER_SESS_SC_ERRHOST        503 /* bad host name */
#define PEER_SESS_SC_ERRPEER        504 /* unknown peer */
#define PEER_SESS_SC_ERRTYPE        505 /* table key type mismatch */
#define PEER_SESS_SC_ERRSIZE        506 /* table key size mismatch */
#define PEER_SESS_SC_ERRTABLE       507 /* unknown table */

/* Protocol name opening the hello line, followed by " <version>" */
#define PEER_SESSION_PROTO_NAME     "HAProxyS"
123
/* Global peers pointer, initially NULL */
struct peers *peers = NULL;

/* Forward declaration: used by the I/O handler before its definition in this file */
static void peer_session_forceshutdown(struct stream * stream);
Emeric Brun2b920a12010-09-23 18:30:22 +0200126
127
128/*
129 * This prepare the data update message of the stick session <ts>, <ps> is the the peer session
130 * where the data going to be pushed, <msg> is a buffer of <size> to recieve data message content
131 */
Simon Horman96553772011-06-08 09:18:51 +0900132static int peer_prepare_datamsg(struct stksess *ts, struct peer_session *ps, char *msg, size_t size)
Emeric Brun2b920a12010-09-23 18:30:22 +0200133{
134 uint32_t netinteger;
135 int len;
136 /* construct message */
137 if (ps->lastpush && ts->upd.key > ps->lastpush && (ts->upd.key - ps->lastpush) <= 127) {
138 msg[0] = 0x80 + ts->upd.key - ps->lastpush;
139 len = sizeof(char);
140 }
141 else {
142 msg[0] = 'D';
143 netinteger = htonl(ts->upd.key);
144 memcpy(&msg[sizeof(char)], &netinteger, sizeof(netinteger));
145 len = sizeof(char) + sizeof(netinteger);
146 }
147
148 if (ps->table->table->type == STKTABLE_TYPE_STRING) {
149 int stlen = strlen((char *)ts->key.key);
150
151 netinteger = htonl(strlen((char *)ts->key.key));
152 memcpy(&msg[len], &netinteger, sizeof(netinteger));
153 memcpy(&msg[len+sizeof(netinteger)], ts->key.key, stlen);
154 len += sizeof(netinteger) + stlen;
155
156 }
157 else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
158 netinteger = htonl(*((uint32_t *)ts->key.key));
159 memcpy(&msg[len], &netinteger, sizeof(netinteger));
160 len += sizeof(netinteger);
161 }
162 else {
163 memcpy(&msg[len], ts->key.key, ps->table->table->key_size);
164 len += ps->table->table->key_size;
165 }
166
167 if (stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
168 netinteger = htonl(stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id));
169 else
170 netinteger = 0;
171
172 memcpy(&msg[len], &netinteger , sizeof(netinteger));
173 len += sizeof(netinteger);
174
175 return len;
176}
177
178
179/*
180 * Callback to release a session with a peer
181 */
Willy Tarreau00a37f02015-04-13 12:05:19 +0200182static void peer_session_release(struct appctx *appctx)
Emeric Brun2b920a12010-09-23 18:30:22 +0200183{
Willy Tarreau00a37f02015-04-13 12:05:19 +0200184 struct stream_interface *si = appctx->owner;
Willy Tarreau87b09662015-04-03 00:22:06 +0200185 struct stream *s = si_strm(si);
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100186 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200187
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100188 /* appctx->ctx.peers.ptr is not a peer session */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100189 if (appctx->st0 < PEER_SESS_ST_SENDSUCCESS)
Emeric Brun2b920a12010-09-23 18:30:22 +0200190 return;
191
192 /* peer session identified */
193 if (ps) {
Willy Tarreau87b09662015-04-03 00:22:06 +0200194 if (ps->stream == s) {
195 ps->stream = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200196 if (ps->flags & PEER_F_LEARN_ASSIGN) {
197 /* unassign current peer for learning */
198 ps->flags &= ~(PEER_F_LEARN_ASSIGN);
199 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
200
201 /* reschedule a resync */
202 ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
203 }
204 /* reset teaching and learning flags to 0 */
205 ps->flags &= PEER_TEACH_RESET;
206 ps->flags &= PEER_LEARN_RESET;
207 }
208 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
209 }
210}
211
212
/*
 * IO Handler to handle message exchange with a peer
 */
Willy Tarreau00a37f02015-04-13 12:05:19 +0200216static void peer_io_handler(struct appctx *appctx)
Emeric Brun2b920a12010-09-23 18:30:22 +0200217{
Willy Tarreau00a37f02015-04-13 12:05:19 +0200218 struct stream_interface *si = appctx->owner;
Willy Tarreau87b09662015-04-03 00:22:06 +0200219 struct stream *s = si_strm(si);
Willy Tarreaud0d8da92015-04-04 02:10:38 +0200220 struct peers *curpeers = (struct peers *)strm_fe(s)->parent;
Emeric Brun2b920a12010-09-23 18:30:22 +0200221 int reql = 0;
222 int repl = 0;
223
224 while (1) {
225switchstate:
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100226 switch(appctx->st0) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100227 case PEER_SESS_ST_ACCEPT:
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100228 appctx->ctx.peers.ptr = NULL;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100229 appctx->st0 = PEER_SESS_ST_GETVERSION;
Emeric Brun2b920a12010-09-23 18:30:22 +0200230 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100231 case PEER_SESS_ST_GETVERSION:
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100232 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200233 if (reql <= 0) { /* closed or EOL not found */
234 if (reql == 0)
235 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100236 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200237 goto switchstate;
238 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100239 if (trash.str[reql-1] != '\n') {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100240 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200241 goto switchstate;
242 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100243 else if (reql > 1 && (trash.str[reql-2] == '\r'))
244 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200245 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100246 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200247
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100248 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200249
250 /* test version */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100251 if (strcmp(PEER_SESSION_PROTO_NAME " 1.0", trash.str) != 0) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100252 appctx->st0 = PEER_SESS_ST_EXIT;
253 appctx->st1 = PEER_SESS_SC_ERRVERSION;
Emeric Brun2b920a12010-09-23 18:30:22 +0200254 /* test protocol */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100255 if (strncmp(PEER_SESSION_PROTO_NAME " ", trash.str, strlen(PEER_SESSION_PROTO_NAME)+1) != 0)
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100256 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200257 goto switchstate;
258 }
259
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100260 appctx->st0 = PEER_SESS_ST_GETHOST;
Emeric Brun2b920a12010-09-23 18:30:22 +0200261 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100262 case PEER_SESS_ST_GETHOST:
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100263 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200264 if (reql <= 0) { /* closed or EOL not found */
265 if (reql == 0)
266 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100267 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200268 goto switchstate;
269 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100270 if (trash.str[reql-1] != '\n') {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100271 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200272 goto switchstate;
273 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100274 else if (reql > 1 && (trash.str[reql-2] == '\r'))
275 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200276 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100277 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200278
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100279 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200280
281 /* test hostname match */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100282 if (strcmp(localpeer, trash.str) != 0) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100283 appctx->st0 = PEER_SESS_ST_EXIT;
284 appctx->st1 = PEER_SESS_SC_ERRHOST;
Emeric Brun2b920a12010-09-23 18:30:22 +0200285 goto switchstate;
286 }
287
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100288 appctx->st0 = PEER_SESS_ST_GETPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200289 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100290 case PEER_SESS_ST_GETPEER: {
Emeric Brun2b920a12010-09-23 18:30:22 +0200291 struct peer *curpeer;
292 char *p;
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100293 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200294 if (reql <= 0) { /* closed or EOL not found */
295 if (reql == 0)
296 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100297 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200298 goto switchstate;
299 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100300 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200301 /* Incomplete line, we quit */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100302 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200303 goto switchstate;
304 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100305 else if (reql > 1 && (trash.str[reql-2] == '\r'))
306 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200307 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100308 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200309
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100310 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200311
312 /* parse line "<peer name> <pid>" */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100313 p = strchr(trash.str, ' ');
Emeric Brun2b920a12010-09-23 18:30:22 +0200314 if (!p) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100315 appctx->st0 = PEER_SESS_ST_EXIT;
316 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200317 goto switchstate;
318 }
319 *p = 0;
320
321 /* lookup known peer */
322 for (curpeer = curpeers->remote; curpeer; curpeer = curpeer->next) {
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100323 if (strcmp(curpeer->id, trash.str) == 0)
Emeric Brun2b920a12010-09-23 18:30:22 +0200324 break;
325 }
326
327 /* if unknown peer */
328 if (!curpeer) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100329 appctx->st0 = PEER_SESS_ST_EXIT;
330 appctx->st1 = PEER_SESS_SC_ERRPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200331 goto switchstate;
332 }
333
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100334 appctx->ctx.peers.ptr = curpeer;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100335 appctx->st0 = PEER_SESS_ST_GETTABLE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200336 /* fall through */
337 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100338 case PEER_SESS_ST_GETTABLE: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100339 struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200340 struct shared_table *st;
341 struct peer_session *ps = NULL;
342 unsigned long key_type;
343 size_t key_size;
344 char *p;
345
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100346 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200347 if (reql <= 0) { /* closed or EOL not found */
348 if (reql == 0)
349 goto out;
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100350 appctx->ctx.peers.ptr = NULL;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100351 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200352 goto switchstate;
353 }
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100354 /* Re init appctx->ctx.peers.ptr to null, to handle correctly a release case */
355 appctx->ctx.peers.ptr = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200356
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100357 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200358 /* Incomplete line, we quit */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100359 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200360 goto switchstate;
361 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100362 else if (reql > 1 && (trash.str[reql-2] == '\r'))
363 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200364 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100365 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200366
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100367 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200368
369 /* Parse line "<table name> <type> <size>" */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100370 p = strchr(trash.str, ' ');
Emeric Brun2b920a12010-09-23 18:30:22 +0200371 if (!p) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100372 appctx->st0 = PEER_SESS_ST_EXIT;
373 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200374 goto switchstate;
375 }
376 *p = 0;
377 key_type = (unsigned long)atol(p+1);
378
379 p = strchr(p+1, ' ');
380 if (!p) {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100381 appctx->ctx.peers.ptr = NULL;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100382 appctx->st0 = PEER_SESS_ST_EXIT;
383 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200384 goto switchstate;
385 }
386
387 key_size = (size_t)atoi(p);
388 for (st = curpeers->tables; st; st = st->next) {
389 /* If table name matches */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100390 if (strcmp(st->table->id, trash.str) == 0) {
Willy Tarreau86a446e2013-11-25 23:02:37 +0100391 /* Check key size mismatches, except for strings
392 * which may be truncated as long as they fit in
393 * a buffer.
394 */
395 if (key_size != st->table->key_size &&
396 (key_type != STKTABLE_TYPE_STRING ||
397 1 + 4 + 4 + key_size - 1 >= trash.size)) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100398 appctx->st0 = PEER_SESS_ST_EXIT;
399 appctx->st1 = PEER_SESS_SC_ERRSIZE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200400 goto switchstate;
401 }
402
403 /* If key type mismatches */
404 if (key_type != st->table->type) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100405 appctx->st0 = PEER_SESS_ST_EXIT;
406 appctx->st1 = PEER_SESS_SC_ERRTYPE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200407 goto switchstate;
408 }
409
Willy Tarreau87b09662015-04-03 00:22:06 +0200410 /* lookup peer stream of current peer */
Emeric Brun2b920a12010-09-23 18:30:22 +0200411 for (ps = st->sessions; ps; ps = ps->next) {
412 if (ps->peer == curpeer) {
Willy Tarreau87b09662015-04-03 00:22:06 +0200413 /* If stream already active, replaced by new one */
414 if (ps->stream && ps->stream != s) {
Emeric Brun2b920a12010-09-23 18:30:22 +0200415 if (ps->peer->local) {
416 /* Local connection, reply a retry */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100417 appctx->st0 = PEER_SESS_ST_EXIT;
418 appctx->st1 = PEER_SESS_SC_TRYAGAIN;
Emeric Brun2b920a12010-09-23 18:30:22 +0200419 goto switchstate;
420 }
Willy Tarreau87b09662015-04-03 00:22:06 +0200421 peer_session_forceshutdown(ps->stream);
Emeric Brun2b920a12010-09-23 18:30:22 +0200422 }
Willy Tarreau87b09662015-04-03 00:22:06 +0200423 ps->stream = s;
Emeric Brun2b920a12010-09-23 18:30:22 +0200424 break;
425 }
426 }
427 break;
428 }
429 }
430
431 /* If table not found */
432 if (!st){
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100433 appctx->st0 = PEER_SESS_ST_EXIT;
434 appctx->st1 = PEER_SESS_SC_ERRTABLE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200435 goto switchstate;
436 }
437
438 /* If no peer session for current peer */
439 if (!ps) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100440 appctx->st0 = PEER_SESS_ST_EXIT;
441 appctx->st1 = PEER_SESS_SC_ERRPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200442 goto switchstate;
443 }
444
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100445 appctx->ctx.peers.ptr = ps;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100446 appctx->st0 = PEER_SESS_ST_SENDSUCCESS;
Emeric Brun2b920a12010-09-23 18:30:22 +0200447 /* fall through */
448 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100449 case PEER_SESS_ST_SENDSUCCESS: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100450 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200451
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100452 repl = snprintf(trash.str, trash.size, "%d\n", PEER_SESS_SC_SUCCESSCODE);
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100453 repl = bi_putblk(si_ic(si), trash.str, repl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200454 if (repl <= 0) {
455 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100456 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100457 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200458 goto switchstate;
459 }
460
461 /* Register status code */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100462 ps->statuscode = PEER_SESS_SC_SUCCESSCODE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200463
464 /* Awake main task */
465 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
466
467 /* Init cursors */
468 ps->teaching_origin =ps->lastpush = ps->lastack = ps->pushack = 0;
469 ps->pushed = ps->update;
470
471 /* Init confirm counter */
472 ps->confirm = 0;
473
474 /* reset teaching and learning flags to 0 */
475 ps->flags &= PEER_TEACH_RESET;
476 ps->flags &= PEER_LEARN_RESET;
477
478 /* if current peer is local */
479 if (ps->peer->local) {
480 /* if table need resyncfrom local and no process assined */
481 if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL &&
482 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
483 /* assign local peer for a lesson, consider lesson already requested */
484 ps->flags |= PEER_F_LEARN_ASSIGN;
485 ps->table->flags |= (SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
486 }
487
488 }
489 else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
490 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
491 /* assign peer for a lesson */
492 ps->flags |= PEER_F_LEARN_ASSIGN;
493 ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
494 }
495 /* switch to waiting message state */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100496 appctx->st0 = PEER_SESS_ST_WAITMSG;
Emeric Brun2b920a12010-09-23 18:30:22 +0200497 goto switchstate;
498 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100499 case PEER_SESS_ST_CONNECT: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100500 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200501
502 /* Send headers */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100503 repl = snprintf(trash.str, trash.size,
Emeric Brun2b920a12010-09-23 18:30:22 +0200504 PEER_SESSION_PROTO_NAME " 1.0\n%s\n%s %d\n%s %lu %d\n",
505 ps->peer->id,
506 localpeer,
Willy Tarreau7b77c9f2012-01-07 22:52:12 +0100507 (int)getpid(),
Emeric Brun2b920a12010-09-23 18:30:22 +0200508 ps->table->table->id,
509 ps->table->table->type,
Willy Tarreaubd55e312010-11-11 10:55:09 +0100510 (int)ps->table->table->key_size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200511
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100512 if (repl >= trash.size) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100513 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200514 goto switchstate;
515 }
516
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100517 repl = bi_putblk(si_ic(si), trash.str, repl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200518 if (repl <= 0) {
519 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100520 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100521 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200522 goto switchstate;
523 }
524
525 /* switch to the waiting statuscode state */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100526 appctx->st0 = PEER_SESS_ST_GETSTATUS;
Emeric Brun2b920a12010-09-23 18:30:22 +0200527 /* fall through */
528 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100529 case PEER_SESS_ST_GETSTATUS: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100530 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200531
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100532 if (si_ic(si)->flags & CF_WRITE_PARTIAL)
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100533 ps->statuscode = PEER_SESS_SC_CONNECTEDCODE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200534
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100535 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200536 if (reql <= 0) { /* closed or EOL not found */
537 if (reql == 0)
538 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100539 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200540 goto switchstate;
541 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100542 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200543 /* Incomplete line, we quit */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100544 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200545 goto switchstate;
546 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100547 else if (reql > 1 && (trash.str[reql-2] == '\r'))
548 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200549 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100550 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200551
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100552 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200553
554 /* Register status code */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100555 ps->statuscode = atoi(trash.str);
Emeric Brun2b920a12010-09-23 18:30:22 +0200556
557 /* Awake main task */
558 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
559
560 /* If status code is success */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100561 if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
Emeric Brun2b920a12010-09-23 18:30:22 +0200562 /* Init cursors */
563 ps->teaching_origin = ps->lastpush = ps->lastack = ps->pushack = 0;
564 ps->pushed = ps->update;
565
566 /* Init confirm counter */
567 ps->confirm = 0;
568
569 /* reset teaching and learning flags to 0 */
570 ps->flags &= PEER_TEACH_RESET;
571 ps->flags &= PEER_LEARN_RESET;
572
573 /* If current peer is local */
574 if (ps->peer->local) {
575 /* Init cursors to push a resync */
576 ps->teaching_origin = ps->pushed = ps->table->table->update;
577 /* flag to start to teach lesson */
578 ps->flags |= PEER_F_TEACH_PROCESS;
579
580 }
581 else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
582 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
583 /* If peer is remote and resync from remote is needed,
584 and no peer currently assigned */
585
586 /* assign peer for a lesson */
587 ps->flags |= PEER_F_LEARN_ASSIGN;
588 ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
589 }
590
591 }
592 else {
593 /* Status code is not success, abort */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100594 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200595 goto switchstate;
596 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100597 appctx->st0 = PEER_SESS_ST_WAITMSG;
Emeric Brun2b920a12010-09-23 18:30:22 +0200598 /* fall through */
599 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100600 case PEER_SESS_ST_WAITMSG: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100601 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200602 struct stksess *ts, *newts = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200603 char c;
604 int totl = 0;
605
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100606 reql = bo_getblk(si_oc(si), (char *)&c, sizeof(c), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200607 if (reql <= 0) /* closed or EOL not found */
608 goto incomplete;
609
Emeric Brun2b920a12010-09-23 18:30:22 +0200610 totl += reql;
611
612 if ((c & 0x80) || (c == 'D')) {
613 /* Here we have data message */
614 unsigned int pushack;
Emeric Brun2b920a12010-09-23 18:30:22 +0200615 int srvid;
616 uint32_t netinteger;
617
618 /* Compute update remote version */
619 if (c & 0x80) {
620 pushack = ps->pushack + (unsigned int)(c & 0x7F);
621 }
622 else {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100623 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200624 if (reql <= 0) /* closed or EOL not found */
625 goto incomplete;
626
Emeric Brun2b920a12010-09-23 18:30:22 +0200627 totl += reql;
628 pushack = ntohl(netinteger);
629 }
630
Willy Tarreau86a446e2013-11-25 23:02:37 +0100631 /* Read key. The string keys are read in two steps, the first step
632 * consists in reading whatever fits into the table directly into
633 * the pre-allocated key. The second step consists in simply
634 * draining all exceeding data. This can happen for example after a
635 * config reload with a smaller key size for the stick table than
636 * what was previously set, or when facing the impossibility to
637 * allocate a new stksess (for example when the table is full with
638 * "nopurge").
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200639 */
Emeric Brun2b920a12010-09-23 18:30:22 +0200640 if (ps->table->table->type == STKTABLE_TYPE_STRING) {
Willy Tarreau86a446e2013-11-25 23:02:37 +0100641 unsigned int to_read, to_store;
642
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200643 /* read size first */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100644 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200645 if (reql <= 0) /* closed or EOL not found */
646 goto incomplete;
647
Emeric Brun2b920a12010-09-23 18:30:22 +0200648 totl += reql;
Willy Tarreau86a446e2013-11-25 23:02:37 +0100649
650 to_store = 0;
651 to_read = ntohl(netinteger);
652
Willy Tarreau4e4292b2014-11-28 12:18:45 +0100653 if (to_read + totl > si_ob(si)->size) {
Willy Tarreau86a446e2013-11-25 23:02:37 +0100654 /* impossible to read a key this large, abort */
655 reql = -1;
Willy Tarreau72d6c162013-04-11 16:14:13 +0200656 goto incomplete;
Willy Tarreau86a446e2013-11-25 23:02:37 +0100657 }
Willy Tarreau72d6c162013-04-11 16:14:13 +0200658
Willy Tarreau86a446e2013-11-25 23:02:37 +0100659 newts = stksess_new(ps->table->table, NULL);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200660 if (newts)
Willy Tarreau86a446e2013-11-25 23:02:37 +0100661 to_store = MIN(to_read, ps->table->table->key_size - 1);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200662
Willy Tarreau86a446e2013-11-25 23:02:37 +0100663 /* we read up to two blocks, the first one goes into the key,
664 * the rest is drained into the trash.
665 */
666 if (to_store) {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100667 reql = bo_getblk(si_oc(si), (char *)newts->key.key, to_store, totl);
Willy Tarreau86a446e2013-11-25 23:02:37 +0100668 if (reql <= 0) /* closed or incomplete */
669 goto incomplete;
670 newts->key.key[reql] = 0;
671 totl += reql;
672 to_read -= reql;
673 }
674 if (to_read) {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100675 reql = bo_getblk(si_oc(si), trash.str, to_read, totl);
Willy Tarreau86a446e2013-11-25 23:02:37 +0100676 if (reql <= 0) /* closed or incomplete */
677 goto incomplete;
678 totl += reql;
679 }
Emeric Brun2b920a12010-09-23 18:30:22 +0200680 }
681 else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100682 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200683 if (reql <= 0) /* closed or EOL not found */
684 goto incomplete;
Cyril Bonté9a60ff92014-02-16 01:07:07 +0100685 newts = stksess_new(ps->table->table, NULL);
686 if (newts) {
687 netinteger = ntohl(netinteger);
688 memcpy(newts->key.key, &netinteger, sizeof(netinteger));
689 }
Emeric Brun2b920a12010-09-23 18:30:22 +0200690 totl += reql;
Emeric Brun2b920a12010-09-23 18:30:22 +0200691 }
692 else {
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200693 /* type ip or binary */
694 newts = stksess_new(ps->table->table, NULL);
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100695 reql = bo_getblk(si_oc(si), newts ? (char *)newts->key.key : trash.str, ps->table->table->key_size, totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200696 if (reql <= 0) /* closed or EOL not found */
697 goto incomplete;
Willy Tarreau72d6c162013-04-11 16:14:13 +0200698 totl += reql;
Emeric Brun2b920a12010-09-23 18:30:22 +0200699 }
700
701 /* read server id */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100702 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200703 if (reql <= 0) /* closed or EOL not found */
704 goto incomplete;
705
Emeric Brun2b920a12010-09-23 18:30:22 +0200706 totl += reql;
707 srvid = ntohl(netinteger);
708
709 /* update entry */
Emeric Brun2b920a12010-09-23 18:30:22 +0200710 if (newts) {
711 /* lookup for existing entry */
712 ts = stktable_lookup(ps->table->table, newts);
713 if (ts) {
714 /* the entry already exist, we can free ours */
715 stktable_touch(ps->table->table, ts, 0);
716 stksess_free(ps->table->table, newts);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200717 newts = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200718 }
719 else {
720 struct eb32_node *eb;
721
722 /* create new entry */
723 ts = stktable_store(ps->table->table, newts, 0);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200724 newts = NULL; /* don't reuse it */
725
Emeric Brun2b920a12010-09-23 18:30:22 +0200726 ts->upd.key= (++ps->table->table->update)+(2^31);
727 eb = eb32_insert(&ps->table->table->updates, &ts->upd);
728 if (eb != &ts->upd) {
729 eb32_delete(eb);
730 eb32_insert(&ps->table->table->updates, &ts->upd);
731 }
732 }
733
734 /* update entry */
735 if (srvid && stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
736 stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id) = srvid;
737 ps->pushack = pushack;
738 }
739
740 }
741 else if (c == 'R') {
742 /* Reset message: remote need resync */
743
744 /* reinit counters for a resync */
745 ps->lastpush = 0;
746 ps->teaching_origin = ps->pushed = ps->table->table->update;
747
748 /* reset teaching flags to 0 */
749 ps->flags &= PEER_TEACH_RESET;
750
751 /* flag to start to teach lesson */
752 ps->flags |= PEER_F_TEACH_PROCESS;
753 }
754 else if (c == 'F') {
755 /* Finish message, all known updates have been pushed by remote */
756 /* and remote is up to date */
757
758 /* If resync is in progress with remote peer */
759 if (ps->flags & PEER_F_LEARN_ASSIGN) {
760
761 /* unassign current peer for learning */
762 ps->flags &= ~PEER_F_LEARN_ASSIGN;
763 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
764
765 /* Consider table is now up2date, resync resync no more needed from local neither remote */
766 ps->table->flags |= (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE);
767 }
768 /* Increase confirm counter to launch a confirm message */
769 ps->confirm++;
770 }
771 else if (c == 'c') {
772 /* confirm message, remote peer is now up to date with us */
773
774 /* If stopping state */
775 if (stopping) {
776 /* Close session, push resync no more needed */
777 ps->flags |= PEER_F_TEACH_COMPLETE;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100778 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200779 goto switchstate;
780 }
781
782 /* reset teaching flags to 0 */
783 ps->flags &= PEER_TEACH_RESET;
784 }
785 else if (c == 'C') {
786 /* Continue message, all known updates have been pushed by remote */
787 /* but remote is not up to date */
788
789 /* If resync is in progress with current peer */
790 if (ps->flags & PEER_F_LEARN_ASSIGN) {
791
792 /* unassign current peer */
793 ps->flags &= ~PEER_F_LEARN_ASSIGN;
794 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
795
796 /* flag current peer is not up 2 date to try from an other */
797 ps->flags |= PEER_F_LEARN_NOTUP2DATE;
798
799 /* reschedule a resync */
800 ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
801 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
802 }
803 ps->confirm++;
804 }
805 else if (c == 'A') {
806 /* ack message */
807 uint32_t netinteger;
808
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100809 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200810 if (reql <= 0) /* closed or EOL not found */
811 goto incomplete;
812
Emeric Brun2b920a12010-09-23 18:30:22 +0200813 totl += reql;
814
815 /* Consider remote is up to date with "acked" version */
816 ps->update = ntohl(netinteger);
817 }
818 else {
819 /* Unknown message */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100820 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200821 goto switchstate;
822 }
823
824 /* skip consumed message */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100825 bo_skip(si_oc(si), totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200826
827 /* loop on that state to peek next message */
Willy Tarreau72d6c162013-04-11 16:14:13 +0200828 goto switchstate;
829
Emeric Brun2b920a12010-09-23 18:30:22 +0200830incomplete:
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200831 /* we get here when a bo_getblk() returns <= 0 in reql */
832
833 /* first, we may have to release newts */
834 if (newts) {
835 stksess_free(ps->table->table, newts);
836 newts = NULL;
837 }
838
Willy Tarreau72d6c162013-04-11 16:14:13 +0200839 if (reql < 0) {
840 /* there was an error */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100841 appctx->st0 = PEER_SESS_ST_END;
Willy Tarreau72d6c162013-04-11 16:14:13 +0200842 goto switchstate;
843 }
844
Emeric Brun2b920a12010-09-23 18:30:22 +0200845 /* Nothing to read, now we start to write */
846
847 /* Confirm finished or partial messages */
848 while (ps->confirm) {
849 /* There is a confirm messages to send */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100850 repl = bi_putchr(si_ic(si), 'c');
Emeric Brun2b920a12010-09-23 18:30:22 +0200851 if (repl <= 0) {
852 /* no more write possible */
853 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100854 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100855 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200856 goto switchstate;
857 }
858 ps->confirm--;
859 }
860
861 /* Need to request a resync */
862 if ((ps->flags & PEER_F_LEARN_ASSIGN) &&
863 (ps->table->flags & SHTABLE_F_RESYNC_ASSIGN) &&
864 !(ps->table->flags & SHTABLE_F_RESYNC_PROCESS)) {
865 /* Current peer was elected to request a resync */
866
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100867 repl = bi_putchr(si_ic(si), 'R');
Emeric Brun2b920a12010-09-23 18:30:22 +0200868 if (repl <= 0) {
869 /* no more write possible */
870 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100871 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100872 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200873 goto switchstate;
874 }
875 ps->table->flags |= SHTABLE_F_RESYNC_PROCESS;
876 }
877
878 /* It remains some updates to ack */
879 if (ps->pushack != ps->lastack) {
880 uint32_t netinteger;
881
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100882 trash.str[0] = 'A';
Emeric Brun2b920a12010-09-23 18:30:22 +0200883 netinteger = htonl(ps->pushack);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100884 memcpy(&trash.str[1], &netinteger, sizeof(netinteger));
Emeric Brun2b920a12010-09-23 18:30:22 +0200885
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100886 repl = bi_putblk(si_ic(si), trash.str, 1+sizeof(netinteger));
Emeric Brun2b920a12010-09-23 18:30:22 +0200887 if (repl <= 0) {
888 /* no more write possible */
889 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100890 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100891 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200892 goto switchstate;
893 }
894 ps->lastack = ps->pushack;
895 }
896
897 if (ps->flags & PEER_F_TEACH_PROCESS) {
898 /* current peer was requested for a lesson */
899
900 if (!(ps->flags & PEER_F_TEACH_STAGE1)) {
901 /* lesson stage 1 not complete */
902 struct eb32_node *eb;
903
904 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
905 while (1) {
906 int msglen;
907 struct stksess *ts;
908
909 if (!eb) {
910 /* flag lesson stage1 complete */
911 ps->flags |= PEER_F_TEACH_STAGE1;
912 eb = eb32_first(&ps->table->table->updates);
913 if (eb)
914 ps->pushed = eb->key - 1;
915 break;
916 }
917
918 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100919 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200920 if (msglen) {
921 /* message to buffer */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100922 repl = bi_putblk(si_ic(si), trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200923 if (repl <= 0) {
924 /* no more write possible */
925 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100926 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100927 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200928 goto switchstate;
929 }
930 ps->lastpush = ps->pushed = ts->upd.key;
931 }
932 eb = eb32_next(eb);
933 }
934 } /* !TEACH_STAGE1 */
935
936 if (!(ps->flags & PEER_F_TEACH_STAGE2)) {
937 /* lesson stage 2 not complete */
938 struct eb32_node *eb;
939
940 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
941 while (1) {
942 int msglen;
943 struct stksess *ts;
944
945 if (!eb || eb->key > ps->teaching_origin) {
946 /* flag lesson stage1 complete */
947 ps->flags |= PEER_F_TEACH_STAGE2;
948 ps->pushed = ps->teaching_origin;
949 break;
950 }
951
952 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100953 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200954 if (msglen) {
955 /* message to buffer */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100956 repl = bi_putblk(si_ic(si), trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200957 if (repl <= 0) {
958 /* no more write possible */
959 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100960 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100961 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200962 goto switchstate;
963 }
964 ps->lastpush = ps->pushed = ts->upd.key;
965 }
966 eb = eb32_next(eb);
967 }
968 } /* !TEACH_STAGE2 */
969
970 if (!(ps->flags & PEER_F_TEACH_FINISHED)) {
971 /* process final lesson message */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100972 repl = bi_putchr(si_ic(si), ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FINISHED) ? 'F' : 'C');
Emeric Brun2b920a12010-09-23 18:30:22 +0200973 if (repl <= 0) {
974 /* no more write possible */
975 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100976 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100977 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200978 goto switchstate;
979 }
980
981 /* flag finished message sent */
982 ps->flags |= PEER_F_TEACH_FINISHED;
983 } /* !TEACH_FINISHED */
984 } /* TEACH_PROCESS */
985
986 if (!(ps->flags & PEER_F_LEARN_ASSIGN) &&
987 (int)(ps->pushed - ps->table->table->localupdate) < 0) {
988 /* Push local updates, only if no learning in progress (to avoid ping-pong effects) */
989 struct eb32_node *eb;
990
991 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
992 while (1) {
993 int msglen;
994 struct stksess *ts;
995
996 /* push local updates */
997 if (!eb) {
998 eb = eb32_first(&ps->table->table->updates);
999 if (!eb || ((int)(eb->key - ps->pushed) <= 0)) {
1000 ps->pushed = ps->table->table->localupdate;
1001 break;
1002 }
1003 }
1004
1005 if ((int)(eb->key - ps->table->table->localupdate) > 0) {
1006 ps->pushed = ps->table->table->localupdate;
1007 break;
1008 }
1009
1010 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +01001011 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +02001012 if (msglen) {
1013 /* message to buffer */
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001014 repl = bi_putblk(si_ic(si), trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +02001015 if (repl <= 0) {
1016 /* no more write possible */
1017 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +01001018 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001019 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +02001020 goto switchstate;
1021 }
1022 ps->lastpush = ps->pushed = ts->upd.key;
1023 }
1024 eb = eb32_next(eb);
1025 }
1026 } /* ! LEARN_ASSIGN */
1027 /* noting more to do */
1028 goto out;
1029 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001030 case PEER_SESS_ST_EXIT:
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001031 repl = snprintf(trash.str, trash.size, "%d\n", appctx->st1);
Emeric Brun2b920a12010-09-23 18:30:22 +02001032
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001033 if (bi_putblk(si_ic(si), trash.str, repl) == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +01001034 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001035 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +02001036 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001037 case PEER_SESS_ST_END: {
Willy Tarreau73b013b2012-05-21 16:31:45 +02001038 si_shutw(si);
1039 si_shutr(si);
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001040 si_ic(si)->flags |= CF_READ_NULL;
Willy Tarreau828824a2015-04-19 17:20:03 +02001041 goto out;
Emeric Brun2b920a12010-09-23 18:30:22 +02001042 }
1043 }
1044 }
1045out:
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001046 si_oc(si)->flags |= CF_READ_DONTWAIT;
Emeric Brun2b920a12010-09-23 18:30:22 +02001047 return;
Willy Tarreaubc18da12015-03-13 14:00:47 +01001048full:
Willy Tarreaufe127932015-04-21 19:23:39 +02001049 si_applet_cant_put(si);
Willy Tarreaubc18da12015-03-13 14:00:47 +01001050 goto out;
Emeric Brun2b920a12010-09-23 18:30:22 +02001051}
1052
Willy Tarreau30576452015-04-13 13:50:30 +02001053static struct applet peer_applet = {
Willy Tarreau3fdb3662012-11-12 00:42:33 +01001054 .obj_type = OBJ_TYPE_APPLET,
Willy Tarreaub24281b2011-02-13 13:16:36 +01001055 .name = "<PEER>", /* used for logging */
1056 .fct = peer_io_handler,
Aman Gupta9a13e842012-04-02 18:57:53 -07001057 .release = peer_session_release,
Willy Tarreaub24281b2011-02-13 13:16:36 +01001058};
Emeric Brun2b920a12010-09-23 18:30:22 +02001059
1060/*
1061 * Use this function to force a close of a peer session
1062 */
Willy Tarreau87b09662015-04-03 00:22:06 +02001063static void peer_session_forceshutdown(struct stream * stream)
Emeric Brun2b920a12010-09-23 18:30:22 +02001064{
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001065 struct appctx *appctx = NULL;
1066 int i;
Emeric Brun2b920a12010-09-23 18:30:22 +02001067
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001068 for (i = 0; i <= 1; i++) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001069 appctx = objt_appctx(stream->si[i].end);
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001070 if (!appctx)
1071 continue;
1072 if (appctx->applet != &peer_applet)
1073 continue;
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001074 break;
Emeric Brun2b920a12010-09-23 18:30:22 +02001075 }
1076
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001077 if (!appctx)
1078 return;
1079
Emeric Brun2b920a12010-09-23 18:30:22 +02001080 /* call release to reinit resync states if needed */
Willy Tarreau00a37f02015-04-13 12:05:19 +02001081 peer_session_release(appctx);
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001082 appctx->st0 = PEER_SESS_ST_END;
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001083 appctx->ctx.peers.ptr = NULL;
Willy Tarreau87b09662015-04-03 00:22:06 +02001084 task_wakeup(stream->task, TASK_WOKEN_MSG);
Emeric Brun2b920a12010-09-23 18:30:22 +02001085}
1086
Willy Tarreau91d96282015-03-13 15:47:26 +01001087/* Pre-configures a peers frontend to accept incoming connections */
1088void peers_setup_frontend(struct proxy *fe)
1089{
1090 fe->last_change = now.tv_sec;
1091 fe->cap = PR_CAP_FE;
1092 fe->maxconn = 0;
1093 fe->conn_retries = CONN_RETRIES;
1094 fe->timeout.client = MS_TO_TICKS(5000);
Willy Tarreaud1d48d42015-03-13 16:15:46 +01001095 fe->accept = frontend_accept;
Willy Tarreauf87ab942015-03-13 15:55:16 +01001096 fe->default_target = &peer_applet.obj_type;
Willy Tarreau91d96282015-03-13 15:47:26 +01001097 fe->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
1098}
1099
Emeric Brun2b920a12010-09-23 18:30:22 +02001100/*
Willy Tarreaubd55e312010-11-11 10:55:09 +01001101 * Create a new peer session in assigned state (connect will start automatically)
Emeric Brun2b920a12010-09-23 18:30:22 +02001102 */
Willy Tarreau87b09662015-04-03 00:22:06 +02001103static struct stream *peer_session_create(struct peer *peer, struct peer_session *ps)
Emeric Brun2b920a12010-09-23 18:30:22 +02001104{
Willy Tarreau4348fad2012-09-20 16:48:07 +02001105 struct listener *l = LIST_NEXT(&peer->peers->peers_fe->conf.listeners, struct listener *, by_fe);
Emeric Brun2b920a12010-09-23 18:30:22 +02001106 struct proxy *p = (struct proxy *)l->frontend; /* attached frontend */
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001107 struct appctx *appctx;
Willy Tarreau15b5e142015-04-04 14:38:25 +02001108 struct session *sess;
Willy Tarreau87b09662015-04-03 00:22:06 +02001109 struct stream *s;
Emeric Brun2b920a12010-09-23 18:30:22 +02001110 struct task *t;
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001111 struct connection *conn;
Emeric Brun2b920a12010-09-23 18:30:22 +02001112
Willy Tarreaud990baf2015-04-05 00:32:03 +02001113 ps->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
1114 ps->statuscode = PEER_SESS_SC_CONNECTCODE;
1115 s = NULL;
1116
1117 appctx = appctx_new(&peer_applet);
1118 if (!appctx)
1119 goto out_close;
1120
1121 appctx->st0 = PEER_SESS_ST_CONNECT;
1122 appctx->ctx.peers.ptr = (void *)ps;
1123
Willy Tarreau4099a7c2015-04-05 00:39:55 +02001124 sess = session_new(p, l, &appctx->obj_type);
Willy Tarreau15b5e142015-04-04 14:38:25 +02001125 if (!sess) {
Godbach430f2912013-06-20 13:28:38 +08001126 Alert("out of memory in peer_session_create().\n");
Willy Tarreaud990baf2015-04-05 00:32:03 +02001127 goto out_free_appctx;
Emeric Brun2b920a12010-09-23 18:30:22 +02001128 }
1129
Willy Tarreau8baf9062015-04-05 00:46:36 +02001130 if ((t = task_new()) == NULL) {
Willy Tarreau15b5e142015-04-04 14:38:25 +02001131 Alert("out of memory in peer_session_create().\n");
1132 goto out_free_sess;
1133 }
Willy Tarreau8baf9062015-04-05 00:46:36 +02001134 t->nice = l->nice;
1135
Willy Tarreau73b65ac2015-04-08 18:26:29 +02001136 if ((s = stream_new(sess, t, &appctx->obj_type)) == NULL) {
Willy Tarreau342bfb12015-04-05 01:35:34 +02001137 Alert("Failed to initialize stream in peer_session_create().\n");
Willy Tarreau8baf9062015-04-05 00:46:36 +02001138 goto out_free_task;
1139 }
1140
Willy Tarreau342bfb12015-04-05 01:35:34 +02001141 /* The tasks below are normally what is supposed to be done by
1142 * fe->accept().
1143 */
Willy Tarreaue7dff022015-04-03 01:14:29 +02001144 s->flags = SF_ASSIGNED|SF_ADDR_SET;
Emeric Brun2b920a12010-09-23 18:30:22 +02001145
Willy Tarreau3ed35ef2013-10-24 11:51:38 +02001146 /* initiate an outgoing connection */
1147 si_set_state(&s->si[1], SI_ST_ASS);
Willy Tarreau3ed35ef2013-10-24 11:51:38 +02001148
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001149 /* automatically prepare the stream interface to connect to the
Willy Tarreaub363a1f2013-10-01 10:45:07 +02001150 * pre-initialized connection in si->conn.
1151 */
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001152 if (unlikely((conn = conn_new()) == NULL))
Willy Tarreau8baf9062015-04-05 00:46:36 +02001153 goto out_free_strm;
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001154
1155 conn_prepare(conn, peer->proto, peer->xprt);
1156 si_attach_conn(&s->si[1], conn);
1157
1158 conn->target = s->target = &s->be->obj_type;
1159 memcpy(&conn->addr.to, &peer->addr, sizeof(conn->addr.to));
Emeric Brun2b920a12010-09-23 18:30:22 +02001160 s->do_log = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001161 s->uniq_id = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +02001162
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001163 s->res.flags |= CF_READ_DONTWAIT;
Willy Tarreau696a2912014-11-24 11:36:57 +01001164
Emeric Brun2b920a12010-09-23 18:30:22 +02001165 l->nbconn++; /* warning! right now, it's up to the handler to decrease this */
1166 p->feconn++;/* beconn will be increased later */
1167 jobs++;
Willy Tarreaufb0afa72015-04-03 14:46:27 +02001168 if (!(s->sess->listener->options & LI_O_UNLIMITED))
Willy Tarreau3c63fd82011-09-07 18:00:47 +02001169 actconn++;
Emeric Brun2b920a12010-09-23 18:30:22 +02001170 totalconn++;
1171
1172 return s;
1173
1174 /* Error unrolling */
Willy Tarreau15b5e142015-04-04 14:38:25 +02001175 out_free_strm:
Emeric Brun2b920a12010-09-23 18:30:22 +02001176 LIST_DEL(&s->list);
Willy Tarreau87b09662015-04-03 00:22:06 +02001177 pool_free2(pool2_stream, s);
Willy Tarreau8baf9062015-04-05 00:46:36 +02001178 out_free_task:
1179 task_free(t);
Willy Tarreau15b5e142015-04-04 14:38:25 +02001180 out_free_sess:
Willy Tarreau11c36242015-04-04 15:54:03 +02001181 session_free(sess);
Willy Tarreaud990baf2015-04-05 00:32:03 +02001182 out_free_appctx:
1183 appctx_free(appctx);
Emeric Brun2b920a12010-09-23 18:30:22 +02001184 out_close:
1185 return s;
1186}
1187
/*
 * Task processing function to manage re-connect and peer session
 * tasks wakeup on local update.
 *
 * The shared table to work on is taken from <task>->context. The task's
 * expiration date is reset to TICK_ETERNITY on entry and then only lowered
 * to the nearest pending reconnect or resync timeout. The task itself is
 * always returned.
 *
 * Two modes are handled depending on the global <stopping> state:
 *   - normal mode: (re)connect to remote peers, elect a remote peer to learn
 *     from when a resync from remote is needed, and wake up established
 *     streams which still have local updates to push;
 *   - soft stop: shut down all peer streams once the signal is received,
 *     then deal only with the local session until PEER_F_TEACH_COMPLETE is
 *     set on it or the connection is considered impossible.
 */
static struct task *process_peer_sync(struct task * task)
{
	struct shared_table *st = (struct shared_table *)task->context;
	struct peer_session *ps;

	/* no wakeup date by default, refined below when a timer is pending */
	task->expire = TICK_ETERNITY;

	if (!stopping) {
		/* Normal case (not soft stop)*/
		if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL) &&
		    (!nb_oldpids || tick_is_expired(st->resync_timeout, now_ms)) &&
		    !(st->flags & SHTABLE_F_RESYNC_ASSIGN)) {
			/* Resync from local peer needed
			   no peer was assigned for the lesson
			   and no old local peer found
			   or resync timeout expire */

			/* flag no more resync from local, to try resync from remotes */
			st->flags |= SHTABLE_F_RESYNC_LOCAL;

			/* reschedule a resync */
			st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
		}

		/* For each session */
		for (ps = st->sessions; ps; ps = ps->next) {
			/* For each remote peers */
			if (!ps->peer->local) {
				if (!ps->stream) {
					/* no active stream */
					if (ps->statuscode == 0 ||
					    ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
					    ((ps->statuscode == PEER_SESS_SC_CONNECTCODE ||
					      ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
					     tick_is_expired(ps->reconnect, now_ms))) {
						/* connection never tried
						 * or previous stream established with success
						 * or previous stream failed during connection
						 * and reconnection timer is expired */

						/* retry a connect; ps->stream remains NULL
						 * when peer_session_create() returns NULL */
						ps->stream = peer_session_create(ps->peer, ps);
					}
					else if (ps->statuscode == PEER_SESS_SC_CONNECTCODE ||
					         ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) {
						/* If previous session failed during connection
						 * but reconnection timer is not expired */

						/* reschedule task for reconnect */
						task->expire = tick_first(task->expire, ps->reconnect);
					}
					/* else do nothing */
				} /* !ps->stream */
				else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
					/* current stream is active and established */
					if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
					    !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
					    !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
						/* Resync from a remote is needed
						 * and no peer was assigned for lesson
						 * and current peer may be up2date */

						/* assign peer for the lesson; since the ASSIGN
						 * flag is set on the table here, no other
						 * session of this loop can be elected */
						ps->flags |= PEER_F_LEARN_ASSIGN;
						st->flags |= SHTABLE_F_RESYNC_ASSIGN;

						/* awake peer stream task to handle a request of resync */
						task_wakeup(ps->stream->task, TASK_WOKEN_MSG);
					}
					else if ((int)(ps->pushed - ps->table->table->localupdate) < 0) {
						/* <pushed> is behind <localupdate>; the signed
						 * difference is presumably used so that the test
						 * stays valid when the counters wrap */

						/* awake peer stream task to push local updates */
						task_wakeup(ps->stream->task, TASK_WOKEN_MSG);
					}
					/* else do nothing */
				} /* SUCCESSCODE */
			} /* !ps->peer->local */
		} /* for */

		/* Resync from remotes expired: consider resync is finished */
		if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
		    !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
		    tick_is_expired(st->resync_timeout, now_ms)) {
			/* Resync from remote peer needed
			 * no peer was assigned for the lesson
			 * and resync timeout expire */

			/* flag no more resync from remote, consider resync is finished */
			st->flags |= SHTABLE_F_RESYNC_REMOTE;
		}

		if ((st->flags & SHTABLE_RESYNC_STATEMASK) != SHTABLE_RESYNC_FINISHED) {
			/* Resync not finished */
			/* reschedule the task at the resync timeout, to end the resync if needed */
			task->expire = tick_first(task->expire, st->resync_timeout);
		}
	} /* !stopping */
	else {
		/* soft stop case */
		if (task->state & TASK_WOKEN_SIGNAL) {
			/* We've just received the signal */
			if (!(st->flags & SHTABLE_F_DONOTSTOP)) {
				/* add DO NOT STOP flag if not present; <jobs> and the
				 * table's <syncing> counter are incremented along with
				 * it and decremented again when the flag is removed */
				jobs++;
				st->flags |= SHTABLE_F_DONOTSTOP;
				st->table->syncing++;
			}

			/* disconnect all connected peers */
			for (ps = st->sessions; ps; ps = ps->next) {
				if (ps->stream) {
					peer_session_forceshutdown(ps->stream);
					ps->stream = NULL;
				}
			}
		}

		/* From now on only the session with the local peer matters.
		 * NOTE(review): st->local_session is dereferenced without a NULL
		 * check; peers_register_table() only sets it when one of the
		 * registered peers is flagged local - confirm this is guaranteed.
		 */
		ps = st->local_session;

		if (ps->flags & PEER_F_TEACH_COMPLETE) {
			if (st->flags & SHTABLE_F_DONOTSTOP) {
				/* resync of new process was complete, current process can die now */
				jobs--;
				st->flags &= ~SHTABLE_F_DONOTSTOP;
				st->table->syncing--;
			}
		}
		else if (!ps->stream) {
			/* If stream is not active */
			if (ps->statuscode == 0 ||
			    ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
			    ps->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
			    ps->statuscode == PEER_SESS_SC_TRYAGAIN) {
				/* connection never tried
				 * or previous stream was successfully established
				 * or previous stream tcp connect success but init state incomplete
				 * or during previous connect, peer replies a try again statuscode */

				/* connect to the peer */
				ps->stream = peer_session_create(ps->peer, ps);
			}
			else {
				/* Other error cases */
				if (st->flags & SHTABLE_F_DONOTSTOP) {
					/* unable to resync new process, current process can die now */
					jobs--;
					st->flags &= ~SHTABLE_F_DONOTSTOP;
					st->table->syncing--;
				}
			}
		}
		else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE &&
		         (int)(ps->pushed - ps->table->table->localupdate) < 0) {
			/* current stream active and established
			   awake stream to push remaining local updates */
			task_wakeup(ps->stream->task, TASK_WOKEN_MSG);
		}
	} /* stopping */
	/* Wakeup for re-connect */
	return task;
}
1351
1352/*
1353 * Function used to register a table for sync on a group of peers
1354 *
1355 */
1356void peers_register_table(struct peers *peers, struct stktable *table)
1357{
1358 struct shared_table *st;
1359 struct peer * curpeer;
1360 struct peer_session *ps;
Willy Tarreau4348fad2012-09-20 16:48:07 +02001361 struct listener *listener;
Emeric Brun2b920a12010-09-23 18:30:22 +02001362
1363 st = (struct shared_table *)calloc(1,sizeof(struct shared_table));
1364 st->table = table;
1365 st->next = peers->tables;
1366 st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
1367 peers->tables = st;
1368
1369 for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) {
1370 ps = (struct peer_session *)calloc(1,sizeof(struct peer_session));
1371 ps->table = st;
1372 ps->peer = curpeer;
1373 if (curpeer->local)
1374 st->local_session = ps;
1375 ps->next = st->sessions;
1376 ps->reconnect = now_ms;
1377 st->sessions = ps;
1378 peers->peers_fe->maxconn += 3;
1379 }
1380
Willy Tarreau4348fad2012-09-20 16:48:07 +02001381 list_for_each_entry(listener, &peers->peers_fe->conf.listeners, by_fe)
1382 listener->maxconn = peers->peers_fe->maxconn;
Emeric Brun2b920a12010-09-23 18:30:22 +02001383 st->sync_task = task_new();
1384 st->sync_task->process = process_peer_sync;
1385 st->sync_task->expire = TICK_ETERNITY;
1386 st->sync_task->context = (void *)st;
1387 table->sync_task =st->sync_task;
1388 signal_register_task(0, table->sync_task, 0);
1389 task_wakeup(st->sync_task, TASK_WOKEN_INIT);
1390}
1391