blob: d637e88d6e53e57114b00912b64ba4cd5bbb9772 [file] [log] [blame]
Emeric Brun2b920a12010-09-23 18:30:22 +02001/*
Willy Tarreaubd55e312010-11-11 10:55:09 +01002 * Stick table synchro management.
Emeric Brun2b920a12010-09-23 18:30:22 +02003 *
4 * Copyright 2010 EXCELIANCE, Emeric Brun <ebrun@exceliance.fr>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12
13#include <errno.h>
14#include <fcntl.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <sys/socket.h>
20#include <sys/stat.h>
21#include <sys/types.h>
22
23#include <common/compat.h>
24#include <common/config.h>
25#include <common/time.h>
26
27#include <types/global.h>
Willy Tarreau3fdb3662012-11-12 00:42:33 +010028#include <types/listener.h>
29#include <types/obj_type.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020030#include <types/peers.h>
31
32#include <proto/acl.h>
Willy Tarreau8a8d83b2015-04-13 13:24:54 +020033#include <proto/applet.h>
Willy Tarreauc7e42382012-08-24 19:22:53 +020034#include <proto/channel.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020035#include <proto/fd.h>
Willy Tarreaud1d48d42015-03-13 16:15:46 +010036#include <proto/frontend.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020037#include <proto/log.h>
38#include <proto/hdr_idx.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020039#include <proto/proto_tcp.h>
40#include <proto/proto_http.h>
41#include <proto/proxy.h>
Willy Tarreaufeb76402015-04-03 14:10:06 +020042#include <proto/session.h>
Willy Tarreau87b09662015-04-03 00:22:06 +020043#include <proto/stream.h>
Willy Tarreau22ec1ea2014-11-27 20:45:39 +010044#include <proto/signal.h>
45#include <proto/stick_table.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020046#include <proto/stream_interface.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020047#include <proto/task.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020048
49
50/*******************************/
51/* Current peer learning state */
52/*******************************/
53
54/******************************/
55/* Current table resync state */
56/******************************/
57#define SHTABLE_F_RESYNC_LOCAL 0x00000001 /* Learn from local finished or no more needed */
58#define SHTABLE_F_RESYNC_REMOTE 0x00000002 /* Learn from remote finished or no more needed */
59#define SHTABLE_F_RESYNC_ASSIGN 0x00000004 /* A peer was assigned to learn our lesson */
60#define SHTABLE_F_RESYNC_PROCESS 0x00000008 /* The assigned peer was requested for resync */
61#define SHTABLE_F_DONOTSTOP 0x00010000 /* Main table sync task block process during soft stop
62 to push data to new process */
63
64#define SHTABLE_RESYNC_STATEMASK (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)
65#define SHTABLE_RESYNC_FROMLOCAL 0x00000000
66#define SHTABLE_RESYNC_FROMREMOTE SHTABLE_F_RESYNC_LOCAL
67#define SHTABLE_RESYNC_FINISHED (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)
68
69/******************************/
70/* Remote peer teaching state */
71/******************************/
72#define PEER_F_TEACH_PROCESS 0x00000001 /* Teach a lesson to current peer */
73#define PEER_F_TEACH_STAGE1 0x00000002 /* Teach state 1 complete */
74#define PEER_F_TEACH_STAGE2 0x00000004 /* Teach stage 2 complete */
75#define PEER_F_TEACH_FINISHED 0x00000008 /* Teach conclude, (wait for confirm) */
76#define PEER_F_TEACH_COMPLETE 0x00000010 /* All that we know already taught to current peer, used only for a local peer */
77#define PEER_F_LEARN_ASSIGN 0x00000100 /* Current peer was assigned for a lesson */
78#define PEER_F_LEARN_NOTUP2DATE 0x00000200 /* Learn from peer finished but peer is not up to date */
79
80#define PEER_TEACH_RESET ~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_STAGE1|PEER_F_TEACH_STAGE2|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */
81#define PEER_LEARN_RESET ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE)
82
83
84/**********************************/
85/* Peer Session IO handler states */
86/**********************************/
87
Willy Tarreaue4d927a2013-12-01 12:47:35 +010088enum {
89 PEER_SESS_ST_ACCEPT = 0, /* Initial state for session create by an accept, must be zero! */
90 PEER_SESS_ST_GETVERSION, /* Validate supported protocol version */
91 PEER_SESS_ST_GETHOST, /* Validate host ID correspond to local host id */
92 PEER_SESS_ST_GETPEER, /* Validate peer ID correspond to a known remote peer id */
93 PEER_SESS_ST_GETTABLE, /* Search into registered table for a table with same id and validate type and size */
94 /* after this point, data were possibly exchanged */
95 PEER_SESS_ST_SENDSUCCESS, /* Send ret code 200 (success) and wait for message */
96 PEER_SESS_ST_CONNECT, /* Initial state for session create on a connect, push presentation into buffer */
97 PEER_SESS_ST_GETSTATUS, /* Wait for the welcome message */
98 PEER_SESS_ST_WAITMSG, /* Wait for data messages */
99 PEER_SESS_ST_EXIT, /* Exit with status code */
100 PEER_SESS_ST_END, /* Killed session */
101};
Emeric Brun2b920a12010-09-23 18:30:22 +0200102
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100103/***************************************************/
104/* Peer Session status code - part of the protocol */
105/***************************************************/
Emeric Brun2b920a12010-09-23 18:30:22 +0200106
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100107#define PEER_SESS_SC_CONNECTCODE 100 /* connect in progress */
108#define PEER_SESS_SC_CONNECTEDCODE 110 /* tcp connect success */
Emeric Brun2b920a12010-09-23 18:30:22 +0200109
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100110#define PEER_SESS_SC_SUCCESSCODE 200 /* accept or connect successful */
Emeric Brun2b920a12010-09-23 18:30:22 +0200111
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100112#define PEER_SESS_SC_TRYAGAIN 300 /* try again later */
Emeric Brun2b920a12010-09-23 18:30:22 +0200113
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100114#define PEER_SESS_SC_ERRPROTO 501 /* error protocol */
115#define PEER_SESS_SC_ERRVERSION 502 /* unknown protocol version */
116#define PEER_SESS_SC_ERRHOST 503 /* bad host name */
117#define PEER_SESS_SC_ERRPEER 504 /* unknown peer */
118#define PEER_SESS_SC_ERRTYPE 505 /* table key type mismatch */
119#define PEER_SESS_SC_ERRSIZE 506 /* table key size mismatch */
120#define PEER_SESS_SC_ERRTABLE 507 /* unknown table */
Emeric Brun2b920a12010-09-23 18:30:22 +0200121
122#define PEER_SESSION_PROTO_NAME "HAProxyS"
123
124struct peers *peers = NULL;
Willy Tarreau87b09662015-04-03 00:22:06 +0200125static void peer_session_forceshutdown(struct stream * stream);
Emeric Brun2b920a12010-09-23 18:30:22 +0200126
127
128/*
129 * This prepare the data update message of the stick session <ts>, <ps> is the the peer session
130 * where the data going to be pushed, <msg> is a buffer of <size> to recieve data message content
131 */
Simon Horman96553772011-06-08 09:18:51 +0900132static int peer_prepare_datamsg(struct stksess *ts, struct peer_session *ps, char *msg, size_t size)
Emeric Brun2b920a12010-09-23 18:30:22 +0200133{
134 uint32_t netinteger;
135 int len;
136 /* construct message */
137 if (ps->lastpush && ts->upd.key > ps->lastpush && (ts->upd.key - ps->lastpush) <= 127) {
138 msg[0] = 0x80 + ts->upd.key - ps->lastpush;
139 len = sizeof(char);
140 }
141 else {
142 msg[0] = 'D';
143 netinteger = htonl(ts->upd.key);
144 memcpy(&msg[sizeof(char)], &netinteger, sizeof(netinteger));
145 len = sizeof(char) + sizeof(netinteger);
146 }
147
148 if (ps->table->table->type == STKTABLE_TYPE_STRING) {
149 int stlen = strlen((char *)ts->key.key);
150
151 netinteger = htonl(strlen((char *)ts->key.key));
152 memcpy(&msg[len], &netinteger, sizeof(netinteger));
153 memcpy(&msg[len+sizeof(netinteger)], ts->key.key, stlen);
154 len += sizeof(netinteger) + stlen;
155
156 }
157 else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
158 netinteger = htonl(*((uint32_t *)ts->key.key));
159 memcpy(&msg[len], &netinteger, sizeof(netinteger));
160 len += sizeof(netinteger);
161 }
162 else {
163 memcpy(&msg[len], ts->key.key, ps->table->table->key_size);
164 len += ps->table->table->key_size;
165 }
166
167 if (stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
168 netinteger = htonl(stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id));
169 else
170 netinteger = 0;
171
172 memcpy(&msg[len], &netinteger , sizeof(netinteger));
173 len += sizeof(netinteger);
174
175 return len;
176}
177
178
179/*
180 * Callback to release a session with a peer
181 */
Willy Tarreau00a37f02015-04-13 12:05:19 +0200182static void peer_session_release(struct appctx *appctx)
Emeric Brun2b920a12010-09-23 18:30:22 +0200183{
Willy Tarreau00a37f02015-04-13 12:05:19 +0200184 struct stream_interface *si = appctx->owner;
Willy Tarreau87b09662015-04-03 00:22:06 +0200185 struct stream *s = si_strm(si);
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100186 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200187
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100188 /* appctx->ctx.peers.ptr is not a peer session */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100189 if (appctx->st0 < PEER_SESS_ST_SENDSUCCESS)
Emeric Brun2b920a12010-09-23 18:30:22 +0200190 return;
191
192 /* peer session identified */
193 if (ps) {
Willy Tarreau87b09662015-04-03 00:22:06 +0200194 if (ps->stream == s) {
195 ps->stream = NULL;
Willy Tarreaue5843b32015-04-27 18:40:14 +0200196 ps->appctx = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200197 if (ps->flags & PEER_F_LEARN_ASSIGN) {
198 /* unassign current peer for learning */
199 ps->flags &= ~(PEER_F_LEARN_ASSIGN);
200 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
201
202 /* reschedule a resync */
203 ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
204 }
205 /* reset teaching and learning flags to 0 */
206 ps->flags &= PEER_TEACH_RESET;
207 ps->flags &= PEER_LEARN_RESET;
208 }
209 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
210 }
211}
212
213
214/*
215 * IO Handler to handle message exchance with a peer
216 */
Willy Tarreau00a37f02015-04-13 12:05:19 +0200217static void peer_io_handler(struct appctx *appctx)
Emeric Brun2b920a12010-09-23 18:30:22 +0200218{
Willy Tarreau00a37f02015-04-13 12:05:19 +0200219 struct stream_interface *si = appctx->owner;
Willy Tarreau87b09662015-04-03 00:22:06 +0200220 struct stream *s = si_strm(si);
Willy Tarreaud0d8da92015-04-04 02:10:38 +0200221 struct peers *curpeers = (struct peers *)strm_fe(s)->parent;
Emeric Brun2b920a12010-09-23 18:30:22 +0200222 int reql = 0;
223 int repl = 0;
224
225 while (1) {
226switchstate:
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100227 switch(appctx->st0) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100228 case PEER_SESS_ST_ACCEPT:
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100229 appctx->ctx.peers.ptr = NULL;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100230 appctx->st0 = PEER_SESS_ST_GETVERSION;
Emeric Brun2b920a12010-09-23 18:30:22 +0200231 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100232 case PEER_SESS_ST_GETVERSION:
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100233 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200234 if (reql <= 0) { /* closed or EOL not found */
235 if (reql == 0)
236 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100237 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200238 goto switchstate;
239 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100240 if (trash.str[reql-1] != '\n') {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100241 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200242 goto switchstate;
243 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100244 else if (reql > 1 && (trash.str[reql-2] == '\r'))
245 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200246 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100247 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200248
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100249 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200250
251 /* test version */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100252 if (strcmp(PEER_SESSION_PROTO_NAME " 1.0", trash.str) != 0) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100253 appctx->st0 = PEER_SESS_ST_EXIT;
254 appctx->st1 = PEER_SESS_SC_ERRVERSION;
Emeric Brun2b920a12010-09-23 18:30:22 +0200255 /* test protocol */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100256 if (strncmp(PEER_SESSION_PROTO_NAME " ", trash.str, strlen(PEER_SESSION_PROTO_NAME)+1) != 0)
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100257 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200258 goto switchstate;
259 }
260
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100261 appctx->st0 = PEER_SESS_ST_GETHOST;
Emeric Brun2b920a12010-09-23 18:30:22 +0200262 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100263 case PEER_SESS_ST_GETHOST:
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100264 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200265 if (reql <= 0) { /* closed or EOL not found */
266 if (reql == 0)
267 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100268 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200269 goto switchstate;
270 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100271 if (trash.str[reql-1] != '\n') {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100272 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200273 goto switchstate;
274 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100275 else if (reql > 1 && (trash.str[reql-2] == '\r'))
276 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200277 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100278 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200279
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100280 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200281
282 /* test hostname match */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100283 if (strcmp(localpeer, trash.str) != 0) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100284 appctx->st0 = PEER_SESS_ST_EXIT;
285 appctx->st1 = PEER_SESS_SC_ERRHOST;
Emeric Brun2b920a12010-09-23 18:30:22 +0200286 goto switchstate;
287 }
288
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100289 appctx->st0 = PEER_SESS_ST_GETPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200290 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100291 case PEER_SESS_ST_GETPEER: {
Emeric Brun2b920a12010-09-23 18:30:22 +0200292 struct peer *curpeer;
293 char *p;
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100294 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200295 if (reql <= 0) { /* closed or EOL not found */
296 if (reql == 0)
297 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100298 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200299 goto switchstate;
300 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100301 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200302 /* Incomplete line, we quit */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100303 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200304 goto switchstate;
305 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100306 else if (reql > 1 && (trash.str[reql-2] == '\r'))
307 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200308 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100309 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200310
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100311 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200312
313 /* parse line "<peer name> <pid>" */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100314 p = strchr(trash.str, ' ');
Emeric Brun2b920a12010-09-23 18:30:22 +0200315 if (!p) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100316 appctx->st0 = PEER_SESS_ST_EXIT;
317 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200318 goto switchstate;
319 }
320 *p = 0;
321
322 /* lookup known peer */
323 for (curpeer = curpeers->remote; curpeer; curpeer = curpeer->next) {
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100324 if (strcmp(curpeer->id, trash.str) == 0)
Emeric Brun2b920a12010-09-23 18:30:22 +0200325 break;
326 }
327
328 /* if unknown peer */
329 if (!curpeer) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100330 appctx->st0 = PEER_SESS_ST_EXIT;
331 appctx->st1 = PEER_SESS_SC_ERRPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200332 goto switchstate;
333 }
334
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100335 appctx->ctx.peers.ptr = curpeer;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100336 appctx->st0 = PEER_SESS_ST_GETTABLE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200337 /* fall through */
338 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100339 case PEER_SESS_ST_GETTABLE: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100340 struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200341 struct shared_table *st;
342 struct peer_session *ps = NULL;
343 unsigned long key_type;
344 size_t key_size;
345 char *p;
346
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100347 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200348 if (reql <= 0) { /* closed or EOL not found */
349 if (reql == 0)
350 goto out;
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100351 appctx->ctx.peers.ptr = NULL;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100352 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200353 goto switchstate;
354 }
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100355 /* Re init appctx->ctx.peers.ptr to null, to handle correctly a release case */
356 appctx->ctx.peers.ptr = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200357
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100358 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200359 /* Incomplete line, we quit */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100360 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200361 goto switchstate;
362 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100363 else if (reql > 1 && (trash.str[reql-2] == '\r'))
364 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200365 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100366 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200367
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100368 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200369
370 /* Parse line "<table name> <type> <size>" */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100371 p = strchr(trash.str, ' ');
Emeric Brun2b920a12010-09-23 18:30:22 +0200372 if (!p) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100373 appctx->st0 = PEER_SESS_ST_EXIT;
374 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200375 goto switchstate;
376 }
377 *p = 0;
378 key_type = (unsigned long)atol(p+1);
379
380 p = strchr(p+1, ' ');
381 if (!p) {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100382 appctx->ctx.peers.ptr = NULL;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100383 appctx->st0 = PEER_SESS_ST_EXIT;
384 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200385 goto switchstate;
386 }
387
388 key_size = (size_t)atoi(p);
389 for (st = curpeers->tables; st; st = st->next) {
390 /* If table name matches */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100391 if (strcmp(st->table->id, trash.str) == 0) {
Willy Tarreau86a446e2013-11-25 23:02:37 +0100392 /* Check key size mismatches, except for strings
393 * which may be truncated as long as they fit in
394 * a buffer.
395 */
396 if (key_size != st->table->key_size &&
397 (key_type != STKTABLE_TYPE_STRING ||
398 1 + 4 + 4 + key_size - 1 >= trash.size)) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100399 appctx->st0 = PEER_SESS_ST_EXIT;
400 appctx->st1 = PEER_SESS_SC_ERRSIZE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200401 goto switchstate;
402 }
403
404 /* If key type mismatches */
405 if (key_type != st->table->type) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100406 appctx->st0 = PEER_SESS_ST_EXIT;
407 appctx->st1 = PEER_SESS_SC_ERRTYPE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200408 goto switchstate;
409 }
410
Willy Tarreau87b09662015-04-03 00:22:06 +0200411 /* lookup peer stream of current peer */
Emeric Brun2b920a12010-09-23 18:30:22 +0200412 for (ps = st->sessions; ps; ps = ps->next) {
413 if (ps->peer == curpeer) {
Willy Tarreau87b09662015-04-03 00:22:06 +0200414 /* If stream already active, replaced by new one */
415 if (ps->stream && ps->stream != s) {
Emeric Brun2b920a12010-09-23 18:30:22 +0200416 if (ps->peer->local) {
417 /* Local connection, reply a retry */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100418 appctx->st0 = PEER_SESS_ST_EXIT;
419 appctx->st1 = PEER_SESS_SC_TRYAGAIN;
Emeric Brun2b920a12010-09-23 18:30:22 +0200420 goto switchstate;
421 }
Willy Tarreau87b09662015-04-03 00:22:06 +0200422 peer_session_forceshutdown(ps->stream);
Emeric Brun2b920a12010-09-23 18:30:22 +0200423 }
Willy Tarreau87b09662015-04-03 00:22:06 +0200424 ps->stream = s;
Willy Tarreaue5843b32015-04-27 18:40:14 +0200425 ps->appctx = appctx;
Emeric Brun2b920a12010-09-23 18:30:22 +0200426 break;
427 }
428 }
429 break;
430 }
431 }
432
433 /* If table not found */
434 if (!st){
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100435 appctx->st0 = PEER_SESS_ST_EXIT;
436 appctx->st1 = PEER_SESS_SC_ERRTABLE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200437 goto switchstate;
438 }
439
440 /* If no peer session for current peer */
441 if (!ps) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100442 appctx->st0 = PEER_SESS_ST_EXIT;
443 appctx->st1 = PEER_SESS_SC_ERRPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200444 goto switchstate;
445 }
446
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100447 appctx->ctx.peers.ptr = ps;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100448 appctx->st0 = PEER_SESS_ST_SENDSUCCESS;
Emeric Brun2b920a12010-09-23 18:30:22 +0200449 /* fall through */
450 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100451 case PEER_SESS_ST_SENDSUCCESS: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100452 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200453
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100454 repl = snprintf(trash.str, trash.size, "%d\n", PEER_SESS_SC_SUCCESSCODE);
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100455 repl = bi_putblk(si_ic(si), trash.str, repl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200456 if (repl <= 0) {
457 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100458 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100459 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200460 goto switchstate;
461 }
462
463 /* Register status code */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100464 ps->statuscode = PEER_SESS_SC_SUCCESSCODE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200465
466 /* Awake main task */
467 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
468
469 /* Init cursors */
470 ps->teaching_origin =ps->lastpush = ps->lastack = ps->pushack = 0;
471 ps->pushed = ps->update;
472
473 /* Init confirm counter */
474 ps->confirm = 0;
475
476 /* reset teaching and learning flags to 0 */
477 ps->flags &= PEER_TEACH_RESET;
478 ps->flags &= PEER_LEARN_RESET;
479
480 /* if current peer is local */
481 if (ps->peer->local) {
482 /* if table need resyncfrom local and no process assined */
483 if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL &&
484 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
485 /* assign local peer for a lesson, consider lesson already requested */
486 ps->flags |= PEER_F_LEARN_ASSIGN;
487 ps->table->flags |= (SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
488 }
489
490 }
491 else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
492 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
493 /* assign peer for a lesson */
494 ps->flags |= PEER_F_LEARN_ASSIGN;
495 ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
496 }
497 /* switch to waiting message state */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100498 appctx->st0 = PEER_SESS_ST_WAITMSG;
Emeric Brun2b920a12010-09-23 18:30:22 +0200499 goto switchstate;
500 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100501 case PEER_SESS_ST_CONNECT: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100502 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200503
504 /* Send headers */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100505 repl = snprintf(trash.str, trash.size,
Emeric Brun2b920a12010-09-23 18:30:22 +0200506 PEER_SESSION_PROTO_NAME " 1.0\n%s\n%s %d\n%s %lu %d\n",
507 ps->peer->id,
508 localpeer,
Willy Tarreau7b77c9f2012-01-07 22:52:12 +0100509 (int)getpid(),
Emeric Brun2b920a12010-09-23 18:30:22 +0200510 ps->table->table->id,
511 ps->table->table->type,
Willy Tarreaubd55e312010-11-11 10:55:09 +0100512 (int)ps->table->table->key_size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200513
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100514 if (repl >= trash.size) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100515 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200516 goto switchstate;
517 }
518
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100519 repl = bi_putblk(si_ic(si), trash.str, repl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200520 if (repl <= 0) {
521 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100522 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100523 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200524 goto switchstate;
525 }
526
527 /* switch to the waiting statuscode state */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100528 appctx->st0 = PEER_SESS_ST_GETSTATUS;
Emeric Brun2b920a12010-09-23 18:30:22 +0200529 /* fall through */
530 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100531 case PEER_SESS_ST_GETSTATUS: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100532 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200533
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100534 if (si_ic(si)->flags & CF_WRITE_PARTIAL)
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100535 ps->statuscode = PEER_SESS_SC_CONNECTEDCODE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200536
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100537 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200538 if (reql <= 0) { /* closed or EOL not found */
539 if (reql == 0)
540 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100541 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200542 goto switchstate;
543 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100544 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200545 /* Incomplete line, we quit */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100546 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200547 goto switchstate;
548 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100549 else if (reql > 1 && (trash.str[reql-2] == '\r'))
550 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200551 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100552 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200553
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100554 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200555
556 /* Register status code */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100557 ps->statuscode = atoi(trash.str);
Emeric Brun2b920a12010-09-23 18:30:22 +0200558
559 /* Awake main task */
560 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
561
562 /* If status code is success */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100563 if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
Emeric Brun2b920a12010-09-23 18:30:22 +0200564 /* Init cursors */
565 ps->teaching_origin = ps->lastpush = ps->lastack = ps->pushack = 0;
566 ps->pushed = ps->update;
567
568 /* Init confirm counter */
569 ps->confirm = 0;
570
571 /* reset teaching and learning flags to 0 */
572 ps->flags &= PEER_TEACH_RESET;
573 ps->flags &= PEER_LEARN_RESET;
574
575 /* If current peer is local */
576 if (ps->peer->local) {
577 /* Init cursors to push a resync */
578 ps->teaching_origin = ps->pushed = ps->table->table->update;
579 /* flag to start to teach lesson */
580 ps->flags |= PEER_F_TEACH_PROCESS;
581
582 }
583 else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
584 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
585 /* If peer is remote and resync from remote is needed,
586 and no peer currently assigned */
587
588 /* assign peer for a lesson */
589 ps->flags |= PEER_F_LEARN_ASSIGN;
590 ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
591 }
592
593 }
594 else {
595 /* Status code is not success, abort */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100596 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200597 goto switchstate;
598 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100599 appctx->st0 = PEER_SESS_ST_WAITMSG;
Emeric Brun2b920a12010-09-23 18:30:22 +0200600 /* fall through */
601 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100602 case PEER_SESS_ST_WAITMSG: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100603 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200604 struct stksess *ts, *newts = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200605 char c;
606 int totl = 0;
607
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100608 reql = bo_getblk(si_oc(si), (char *)&c, sizeof(c), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200609 if (reql <= 0) /* closed or EOL not found */
610 goto incomplete;
611
Emeric Brun2b920a12010-09-23 18:30:22 +0200612 totl += reql;
613
614 if ((c & 0x80) || (c == 'D')) {
615 /* Here we have data message */
616 unsigned int pushack;
Emeric Brun2b920a12010-09-23 18:30:22 +0200617 int srvid;
618 uint32_t netinteger;
619
620 /* Compute update remote version */
621 if (c & 0x80) {
622 pushack = ps->pushack + (unsigned int)(c & 0x7F);
623 }
624 else {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100625 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200626 if (reql <= 0) /* closed or EOL not found */
627 goto incomplete;
628
Emeric Brun2b920a12010-09-23 18:30:22 +0200629 totl += reql;
630 pushack = ntohl(netinteger);
631 }
632
Willy Tarreau86a446e2013-11-25 23:02:37 +0100633 /* Read key. The string keys are read in two steps, the first step
634 * consists in reading whatever fits into the table directly into
635 * the pre-allocated key. The second step consists in simply
636 * draining all exceeding data. This can happen for example after a
637 * config reload with a smaller key size for the stick table than
638 * what was previously set, or when facing the impossibility to
639 * allocate a new stksess (for example when the table is full with
640 * "nopurge").
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200641 */
Emeric Brun2b920a12010-09-23 18:30:22 +0200642 if (ps->table->table->type == STKTABLE_TYPE_STRING) {
Willy Tarreau86a446e2013-11-25 23:02:37 +0100643 unsigned int to_read, to_store;
644
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200645 /* read size first */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100646 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200647 if (reql <= 0) /* closed or EOL not found */
648 goto incomplete;
649
Emeric Brun2b920a12010-09-23 18:30:22 +0200650 totl += reql;
Willy Tarreau86a446e2013-11-25 23:02:37 +0100651
652 to_store = 0;
653 to_read = ntohl(netinteger);
654
Willy Tarreau4e4292b2014-11-28 12:18:45 +0100655 if (to_read + totl > si_ob(si)->size) {
Willy Tarreau86a446e2013-11-25 23:02:37 +0100656 /* impossible to read a key this large, abort */
657 reql = -1;
Willy Tarreau72d6c162013-04-11 16:14:13 +0200658 goto incomplete;
Willy Tarreau86a446e2013-11-25 23:02:37 +0100659 }
Willy Tarreau72d6c162013-04-11 16:14:13 +0200660
Willy Tarreau86a446e2013-11-25 23:02:37 +0100661 newts = stksess_new(ps->table->table, NULL);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200662 if (newts)
Willy Tarreau86a446e2013-11-25 23:02:37 +0100663 to_store = MIN(to_read, ps->table->table->key_size - 1);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200664
Willy Tarreau86a446e2013-11-25 23:02:37 +0100665 /* we read up to two blocks, the first one goes into the key,
666 * the rest is drained into the trash.
667 */
668 if (to_store) {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100669 reql = bo_getblk(si_oc(si), (char *)newts->key.key, to_store, totl);
Willy Tarreau86a446e2013-11-25 23:02:37 +0100670 if (reql <= 0) /* closed or incomplete */
671 goto incomplete;
672 newts->key.key[reql] = 0;
673 totl += reql;
674 to_read -= reql;
675 }
676 if (to_read) {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100677 reql = bo_getblk(si_oc(si), trash.str, to_read, totl);
Willy Tarreau86a446e2013-11-25 23:02:37 +0100678 if (reql <= 0) /* closed or incomplete */
679 goto incomplete;
680 totl += reql;
681 }
Emeric Brun2b920a12010-09-23 18:30:22 +0200682 }
683 else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100684 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200685 if (reql <= 0) /* closed or EOL not found */
686 goto incomplete;
Cyril Bonté9a60ff92014-02-16 01:07:07 +0100687 newts = stksess_new(ps->table->table, NULL);
688 if (newts) {
689 netinteger = ntohl(netinteger);
690 memcpy(newts->key.key, &netinteger, sizeof(netinteger));
691 }
Emeric Brun2b920a12010-09-23 18:30:22 +0200692 totl += reql;
Emeric Brun2b920a12010-09-23 18:30:22 +0200693 }
694 else {
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200695 /* type ip or binary */
696 newts = stksess_new(ps->table->table, NULL);
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100697 reql = bo_getblk(si_oc(si), newts ? (char *)newts->key.key : trash.str, ps->table->table->key_size, totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200698 if (reql <= 0) /* closed or EOL not found */
699 goto incomplete;
Willy Tarreau72d6c162013-04-11 16:14:13 +0200700 totl += reql;
Emeric Brun2b920a12010-09-23 18:30:22 +0200701 }
702
703 /* read server id */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100704 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200705 if (reql <= 0) /* closed or EOL not found */
706 goto incomplete;
707
Emeric Brun2b920a12010-09-23 18:30:22 +0200708 totl += reql;
709 srvid = ntohl(netinteger);
710
711 /* update entry */
Emeric Brun2b920a12010-09-23 18:30:22 +0200712 if (newts) {
713 /* lookup for existing entry */
714 ts = stktable_lookup(ps->table->table, newts);
715 if (ts) {
716 /* the entry already exist, we can free ours */
717 stktable_touch(ps->table->table, ts, 0);
718 stksess_free(ps->table->table, newts);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200719 newts = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200720 }
721 else {
722 struct eb32_node *eb;
723
724 /* create new entry */
725 ts = stktable_store(ps->table->table, newts, 0);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200726 newts = NULL; /* don't reuse it */
727
Emeric Brun2b920a12010-09-23 18:30:22 +0200728 ts->upd.key= (++ps->table->table->update)+(2^31);
729 eb = eb32_insert(&ps->table->table->updates, &ts->upd);
730 if (eb != &ts->upd) {
731 eb32_delete(eb);
732 eb32_insert(&ps->table->table->updates, &ts->upd);
733 }
734 }
735
736 /* update entry */
737 if (srvid && stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
738 stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id) = srvid;
739 ps->pushack = pushack;
740 }
741
742 }
743 else if (c == 'R') {
744 /* Reset message: remote need resync */
745
746 /* reinit counters for a resync */
747 ps->lastpush = 0;
748 ps->teaching_origin = ps->pushed = ps->table->table->update;
749
750 /* reset teaching flags to 0 */
751 ps->flags &= PEER_TEACH_RESET;
752
753 /* flag to start to teach lesson */
754 ps->flags |= PEER_F_TEACH_PROCESS;
755 }
756 else if (c == 'F') {
757 /* Finish message, all known updates have been pushed by remote */
758 /* and remote is up to date */
759
760 /* If resync is in progress with remote peer */
761 if (ps->flags & PEER_F_LEARN_ASSIGN) {
762
763 /* unassign current peer for learning */
764 ps->flags &= ~PEER_F_LEARN_ASSIGN;
765 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
766
767 /* Consider table is now up2date, resync resync no more needed from local neither remote */
768 ps->table->flags |= (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE);
769 }
770 /* Increase confirm counter to launch a confirm message */
771 ps->confirm++;
772 }
773 else if (c == 'c') {
774 /* confirm message, remote peer is now up to date with us */
775
776 /* If stopping state */
777 if (stopping) {
778 /* Close session, push resync no more needed */
779 ps->flags |= PEER_F_TEACH_COMPLETE;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100780 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200781 goto switchstate;
782 }
783
784 /* reset teaching flags to 0 */
785 ps->flags &= PEER_TEACH_RESET;
786 }
787 else if (c == 'C') {
788 /* Continue message, all known updates have been pushed by remote */
789 /* but remote is not up to date */
790
791 /* If resync is in progress with current peer */
792 if (ps->flags & PEER_F_LEARN_ASSIGN) {
793
794 /* unassign current peer */
795 ps->flags &= ~PEER_F_LEARN_ASSIGN;
796 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
797
798 /* flag current peer is not up 2 date to try from an other */
799 ps->flags |= PEER_F_LEARN_NOTUP2DATE;
800
801 /* reschedule a resync */
802 ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
803 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
804 }
805 ps->confirm++;
806 }
807 else if (c == 'A') {
808 /* ack message */
809 uint32_t netinteger;
810
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100811 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200812 if (reql <= 0) /* closed or EOL not found */
813 goto incomplete;
814
Emeric Brun2b920a12010-09-23 18:30:22 +0200815 totl += reql;
816
817 /* Consider remote is up to date with "acked" version */
818 ps->update = ntohl(netinteger);
819 }
820 else {
821 /* Unknown message */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100822 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200823 goto switchstate;
824 }
825
826 /* skip consumed message */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100827 bo_skip(si_oc(si), totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200828
829 /* loop on that state to peek next message */
Willy Tarreau72d6c162013-04-11 16:14:13 +0200830 goto switchstate;
831
Emeric Brun2b920a12010-09-23 18:30:22 +0200832incomplete:
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200833 /* we get here when a bo_getblk() returns <= 0 in reql */
834
835 /* first, we may have to release newts */
836 if (newts) {
837 stksess_free(ps->table->table, newts);
838 newts = NULL;
839 }
840
Willy Tarreau72d6c162013-04-11 16:14:13 +0200841 if (reql < 0) {
842 /* there was an error */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100843 appctx->st0 = PEER_SESS_ST_END;
Willy Tarreau72d6c162013-04-11 16:14:13 +0200844 goto switchstate;
845 }
846
Emeric Brun2b920a12010-09-23 18:30:22 +0200847 /* Nothing to read, now we start to write */
848
849 /* Confirm finished or partial messages */
850 while (ps->confirm) {
851 /* There is a confirm messages to send */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100852 repl = bi_putchr(si_ic(si), 'c');
Emeric Brun2b920a12010-09-23 18:30:22 +0200853 if (repl <= 0) {
854 /* no more write possible */
855 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100856 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100857 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200858 goto switchstate;
859 }
860 ps->confirm--;
861 }
862
863 /* Need to request a resync */
864 if ((ps->flags & PEER_F_LEARN_ASSIGN) &&
865 (ps->table->flags & SHTABLE_F_RESYNC_ASSIGN) &&
866 !(ps->table->flags & SHTABLE_F_RESYNC_PROCESS)) {
867 /* Current peer was elected to request a resync */
868
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100869 repl = bi_putchr(si_ic(si), 'R');
Emeric Brun2b920a12010-09-23 18:30:22 +0200870 if (repl <= 0) {
871 /* no more write possible */
872 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100873 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100874 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200875 goto switchstate;
876 }
877 ps->table->flags |= SHTABLE_F_RESYNC_PROCESS;
878 }
879
880 /* It remains some updates to ack */
881 if (ps->pushack != ps->lastack) {
882 uint32_t netinteger;
883
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100884 trash.str[0] = 'A';
Emeric Brun2b920a12010-09-23 18:30:22 +0200885 netinteger = htonl(ps->pushack);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100886 memcpy(&trash.str[1], &netinteger, sizeof(netinteger));
Emeric Brun2b920a12010-09-23 18:30:22 +0200887
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100888 repl = bi_putblk(si_ic(si), trash.str, 1+sizeof(netinteger));
Emeric Brun2b920a12010-09-23 18:30:22 +0200889 if (repl <= 0) {
890 /* no more write possible */
891 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100892 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100893 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200894 goto switchstate;
895 }
896 ps->lastack = ps->pushack;
897 }
898
899 if (ps->flags & PEER_F_TEACH_PROCESS) {
900 /* current peer was requested for a lesson */
901
902 if (!(ps->flags & PEER_F_TEACH_STAGE1)) {
903 /* lesson stage 1 not complete */
904 struct eb32_node *eb;
905
906 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
907 while (1) {
908 int msglen;
909 struct stksess *ts;
910
911 if (!eb) {
912 /* flag lesson stage1 complete */
913 ps->flags |= PEER_F_TEACH_STAGE1;
914 eb = eb32_first(&ps->table->table->updates);
915 if (eb)
916 ps->pushed = eb->key - 1;
917 break;
918 }
919
920 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100921 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200922 if (msglen) {
923 /* message to buffer */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100924 repl = bi_putblk(si_ic(si), trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200925 if (repl <= 0) {
926 /* no more write possible */
927 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100928 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100929 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200930 goto switchstate;
931 }
932 ps->lastpush = ps->pushed = ts->upd.key;
933 }
934 eb = eb32_next(eb);
935 }
936 } /* !TEACH_STAGE1 */
937
938 if (!(ps->flags & PEER_F_TEACH_STAGE2)) {
939 /* lesson stage 2 not complete */
940 struct eb32_node *eb;
941
942 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
943 while (1) {
944 int msglen;
945 struct stksess *ts;
946
947 if (!eb || eb->key > ps->teaching_origin) {
948 /* flag lesson stage1 complete */
949 ps->flags |= PEER_F_TEACH_STAGE2;
950 ps->pushed = ps->teaching_origin;
951 break;
952 }
953
954 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100955 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200956 if (msglen) {
957 /* message to buffer */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100958 repl = bi_putblk(si_ic(si), trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200959 if (repl <= 0) {
960 /* no more write possible */
961 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100962 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100963 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200964 goto switchstate;
965 }
966 ps->lastpush = ps->pushed = ts->upd.key;
967 }
968 eb = eb32_next(eb);
969 }
970 } /* !TEACH_STAGE2 */
971
972 if (!(ps->flags & PEER_F_TEACH_FINISHED)) {
973 /* process final lesson message */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100974 repl = bi_putchr(si_ic(si), ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FINISHED) ? 'F' : 'C');
Emeric Brun2b920a12010-09-23 18:30:22 +0200975 if (repl <= 0) {
976 /* no more write possible */
977 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100978 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100979 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200980 goto switchstate;
981 }
982
983 /* flag finished message sent */
984 ps->flags |= PEER_F_TEACH_FINISHED;
985 } /* !TEACH_FINISHED */
986 } /* TEACH_PROCESS */
987
988 if (!(ps->flags & PEER_F_LEARN_ASSIGN) &&
989 (int)(ps->pushed - ps->table->table->localupdate) < 0) {
990 /* Push local updates, only if no learning in progress (to avoid ping-pong effects) */
991 struct eb32_node *eb;
992
993 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
994 while (1) {
995 int msglen;
996 struct stksess *ts;
997
998 /* push local updates */
999 if (!eb) {
1000 eb = eb32_first(&ps->table->table->updates);
1001 if (!eb || ((int)(eb->key - ps->pushed) <= 0)) {
1002 ps->pushed = ps->table->table->localupdate;
1003 break;
1004 }
1005 }
1006
1007 if ((int)(eb->key - ps->table->table->localupdate) > 0) {
1008 ps->pushed = ps->table->table->localupdate;
1009 break;
1010 }
1011
1012 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +01001013 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +02001014 if (msglen) {
1015 /* message to buffer */
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001016 repl = bi_putblk(si_ic(si), trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +02001017 if (repl <= 0) {
1018 /* no more write possible */
1019 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +01001020 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001021 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +02001022 goto switchstate;
1023 }
1024 ps->lastpush = ps->pushed = ts->upd.key;
1025 }
1026 eb = eb32_next(eb);
1027 }
1028 } /* ! LEARN_ASSIGN */
1029 /* noting more to do */
1030 goto out;
1031 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001032 case PEER_SESS_ST_EXIT:
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001033 repl = snprintf(trash.str, trash.size, "%d\n", appctx->st1);
Emeric Brun2b920a12010-09-23 18:30:22 +02001034
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001035 if (bi_putblk(si_ic(si), trash.str, repl) == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +01001036 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001037 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +02001038 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001039 case PEER_SESS_ST_END: {
Willy Tarreau73b013b2012-05-21 16:31:45 +02001040 si_shutw(si);
1041 si_shutr(si);
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001042 si_ic(si)->flags |= CF_READ_NULL;
Willy Tarreau828824a2015-04-19 17:20:03 +02001043 goto out;
Emeric Brun2b920a12010-09-23 18:30:22 +02001044 }
1045 }
1046 }
1047out:
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001048 si_oc(si)->flags |= CF_READ_DONTWAIT;
Emeric Brun2b920a12010-09-23 18:30:22 +02001049 return;
Willy Tarreaubc18da12015-03-13 14:00:47 +01001050full:
Willy Tarreaufe127932015-04-21 19:23:39 +02001051 si_applet_cant_put(si);
Willy Tarreaubc18da12015-03-13 14:00:47 +01001052 goto out;
Emeric Brun2b920a12010-09-23 18:30:22 +02001053}
1054
Willy Tarreau30576452015-04-13 13:50:30 +02001055static struct applet peer_applet = {
Willy Tarreau3fdb3662012-11-12 00:42:33 +01001056 .obj_type = OBJ_TYPE_APPLET,
Willy Tarreaub24281b2011-02-13 13:16:36 +01001057 .name = "<PEER>", /* used for logging */
1058 .fct = peer_io_handler,
Aman Gupta9a13e842012-04-02 18:57:53 -07001059 .release = peer_session_release,
Willy Tarreaub24281b2011-02-13 13:16:36 +01001060};
Emeric Brun2b920a12010-09-23 18:30:22 +02001061
1062/*
1063 * Use this function to force a close of a peer session
1064 */
Willy Tarreau87b09662015-04-03 00:22:06 +02001065static void peer_session_forceshutdown(struct stream * stream)
Emeric Brun2b920a12010-09-23 18:30:22 +02001066{
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001067 struct appctx *appctx = NULL;
1068 int i;
Emeric Brun2b920a12010-09-23 18:30:22 +02001069
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001070 for (i = 0; i <= 1; i++) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001071 appctx = objt_appctx(stream->si[i].end);
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001072 if (!appctx)
1073 continue;
1074 if (appctx->applet != &peer_applet)
1075 continue;
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001076 break;
Emeric Brun2b920a12010-09-23 18:30:22 +02001077 }
1078
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001079 if (!appctx)
1080 return;
1081
Emeric Brun2b920a12010-09-23 18:30:22 +02001082 /* call release to reinit resync states if needed */
Willy Tarreau00a37f02015-04-13 12:05:19 +02001083 peer_session_release(appctx);
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001084 appctx->st0 = PEER_SESS_ST_END;
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001085 appctx->ctx.peers.ptr = NULL;
Willy Tarreau87b09662015-04-03 00:22:06 +02001086 task_wakeup(stream->task, TASK_WOKEN_MSG);
Emeric Brun2b920a12010-09-23 18:30:22 +02001087}
1088
Willy Tarreau91d96282015-03-13 15:47:26 +01001089/* Pre-configures a peers frontend to accept incoming connections */
1090void peers_setup_frontend(struct proxy *fe)
1091{
1092 fe->last_change = now.tv_sec;
1093 fe->cap = PR_CAP_FE;
1094 fe->maxconn = 0;
1095 fe->conn_retries = CONN_RETRIES;
1096 fe->timeout.client = MS_TO_TICKS(5000);
Willy Tarreaud1d48d42015-03-13 16:15:46 +01001097 fe->accept = frontend_accept;
Willy Tarreauf87ab942015-03-13 15:55:16 +01001098 fe->default_target = &peer_applet.obj_type;
Willy Tarreau91d96282015-03-13 15:47:26 +01001099 fe->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
1100}
1101
Emeric Brun2b920a12010-09-23 18:30:22 +02001102/*
Willy Tarreaubd55e312010-11-11 10:55:09 +01001103 * Create a new peer session in assigned state (connect will start automatically)
Emeric Brun2b920a12010-09-23 18:30:22 +02001104 */
Willy Tarreau87b09662015-04-03 00:22:06 +02001105static struct stream *peer_session_create(struct peer *peer, struct peer_session *ps)
Emeric Brun2b920a12010-09-23 18:30:22 +02001106{
Willy Tarreau4348fad2012-09-20 16:48:07 +02001107 struct listener *l = LIST_NEXT(&peer->peers->peers_fe->conf.listeners, struct listener *, by_fe);
Emeric Brun2b920a12010-09-23 18:30:22 +02001108 struct proxy *p = (struct proxy *)l->frontend; /* attached frontend */
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001109 struct appctx *appctx;
Willy Tarreau15b5e142015-04-04 14:38:25 +02001110 struct session *sess;
Willy Tarreau87b09662015-04-03 00:22:06 +02001111 struct stream *s;
Emeric Brun2b920a12010-09-23 18:30:22 +02001112 struct task *t;
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001113 struct connection *conn;
Emeric Brun2b920a12010-09-23 18:30:22 +02001114
Willy Tarreaud990baf2015-04-05 00:32:03 +02001115 ps->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
1116 ps->statuscode = PEER_SESS_SC_CONNECTCODE;
1117 s = NULL;
1118
1119 appctx = appctx_new(&peer_applet);
1120 if (!appctx)
1121 goto out_close;
1122
1123 appctx->st0 = PEER_SESS_ST_CONNECT;
1124 appctx->ctx.peers.ptr = (void *)ps;
1125
Willy Tarreau4099a7c2015-04-05 00:39:55 +02001126 sess = session_new(p, l, &appctx->obj_type);
Willy Tarreau15b5e142015-04-04 14:38:25 +02001127 if (!sess) {
Godbach430f2912013-06-20 13:28:38 +08001128 Alert("out of memory in peer_session_create().\n");
Willy Tarreaud990baf2015-04-05 00:32:03 +02001129 goto out_free_appctx;
Emeric Brun2b920a12010-09-23 18:30:22 +02001130 }
1131
Willy Tarreau8baf9062015-04-05 00:46:36 +02001132 if ((t = task_new()) == NULL) {
Willy Tarreau15b5e142015-04-04 14:38:25 +02001133 Alert("out of memory in peer_session_create().\n");
1134 goto out_free_sess;
1135 }
Willy Tarreau8baf9062015-04-05 00:46:36 +02001136 t->nice = l->nice;
1137
Willy Tarreau73b65ac2015-04-08 18:26:29 +02001138 if ((s = stream_new(sess, t, &appctx->obj_type)) == NULL) {
Willy Tarreau342bfb12015-04-05 01:35:34 +02001139 Alert("Failed to initialize stream in peer_session_create().\n");
Willy Tarreau8baf9062015-04-05 00:46:36 +02001140 goto out_free_task;
1141 }
1142
Willy Tarreau342bfb12015-04-05 01:35:34 +02001143 /* The tasks below are normally what is supposed to be done by
1144 * fe->accept().
1145 */
Willy Tarreaue7dff022015-04-03 01:14:29 +02001146 s->flags = SF_ASSIGNED|SF_ADDR_SET;
Emeric Brun2b920a12010-09-23 18:30:22 +02001147
Willy Tarreau6e2979c2015-04-27 13:21:15 +02001148 /* applet is waiting for data */
1149 si_applet_cant_get(&s->si[0]);
1150 appctx_wakeup(appctx);
1151
Willy Tarreau3ed35ef2013-10-24 11:51:38 +02001152 /* initiate an outgoing connection */
1153 si_set_state(&s->si[1], SI_ST_ASS);
Willy Tarreau3ed35ef2013-10-24 11:51:38 +02001154
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001155 /* automatically prepare the stream interface to connect to the
Willy Tarreaub363a1f2013-10-01 10:45:07 +02001156 * pre-initialized connection in si->conn.
1157 */
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001158 if (unlikely((conn = conn_new()) == NULL))
Willy Tarreau8baf9062015-04-05 00:46:36 +02001159 goto out_free_strm;
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001160
1161 conn_prepare(conn, peer->proto, peer->xprt);
1162 si_attach_conn(&s->si[1], conn);
1163
1164 conn->target = s->target = &s->be->obj_type;
1165 memcpy(&conn->addr.to, &peer->addr, sizeof(conn->addr.to));
Emeric Brun2b920a12010-09-23 18:30:22 +02001166 s->do_log = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001167 s->uniq_id = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +02001168
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001169 s->res.flags |= CF_READ_DONTWAIT;
Willy Tarreau696a2912014-11-24 11:36:57 +01001170
Emeric Brun2b920a12010-09-23 18:30:22 +02001171 l->nbconn++; /* warning! right now, it's up to the handler to decrease this */
1172 p->feconn++;/* beconn will be increased later */
1173 jobs++;
Willy Tarreaufb0afa72015-04-03 14:46:27 +02001174 if (!(s->sess->listener->options & LI_O_UNLIMITED))
Willy Tarreau3c63fd82011-09-07 18:00:47 +02001175 actconn++;
Emeric Brun2b920a12010-09-23 18:30:22 +02001176 totalconn++;
1177
Willy Tarreaue5843b32015-04-27 18:40:14 +02001178 ps->appctx = appctx;
1179 ps->stream = s;
Emeric Brun2b920a12010-09-23 18:30:22 +02001180 return s;
1181
1182 /* Error unrolling */
Willy Tarreau15b5e142015-04-04 14:38:25 +02001183 out_free_strm:
Emeric Brun2b920a12010-09-23 18:30:22 +02001184 LIST_DEL(&s->list);
Willy Tarreau87b09662015-04-03 00:22:06 +02001185 pool_free2(pool2_stream, s);
Willy Tarreau8baf9062015-04-05 00:46:36 +02001186 out_free_task:
1187 task_free(t);
Willy Tarreau15b5e142015-04-04 14:38:25 +02001188 out_free_sess:
Willy Tarreau11c36242015-04-04 15:54:03 +02001189 session_free(sess);
Willy Tarreaud990baf2015-04-05 00:32:03 +02001190 out_free_appctx:
1191 appctx_free(appctx);
Emeric Brun2b920a12010-09-23 18:30:22 +02001192 out_close:
1193 return s;
1194}
1195
1196/*
1197 * Task processing function to manage re-connect and peer session
1198 * tasks wakeup on local update.
1199 */
Simon Horman96553772011-06-08 09:18:51 +09001200static struct task *process_peer_sync(struct task * task)
Emeric Brun2b920a12010-09-23 18:30:22 +02001201{
1202 struct shared_table *st = (struct shared_table *)task->context;
1203 struct peer_session *ps;
1204
1205 task->expire = TICK_ETERNITY;
1206
1207 if (!stopping) {
1208 /* Normal case (not soft stop)*/
1209 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL) &&
1210 (!nb_oldpids || tick_is_expired(st->resync_timeout, now_ms)) &&
1211 !(st->flags & SHTABLE_F_RESYNC_ASSIGN)) {
1212 /* Resync from local peer needed
1213 no peer was assigned for the lesson
1214 and no old local peer found
1215 or resync timeout expire */
1216
1217 /* flag no more resync from local, to try resync from remotes */
1218 st->flags |= SHTABLE_F_RESYNC_LOCAL;
1219
1220 /* reschedule a resync */
1221 st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
1222 }
1223
1224 /* For each session */
1225 for (ps = st->sessions; ps; ps = ps->next) {
1226 /* For each remote peers */
1227 if (!ps->peer->local) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001228 if (!ps->stream) {
1229 /* no active stream */
Emeric Brun2b920a12010-09-23 18:30:22 +02001230 if (ps->statuscode == 0 ||
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001231 ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
1232 ((ps->statuscode == PEER_SESS_SC_CONNECTCODE ||
1233 ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
Emeric Brun2b920a12010-09-23 18:30:22 +02001234 tick_is_expired(ps->reconnect, now_ms))) {
1235 /* connection never tried
Willy Tarreau87b09662015-04-03 00:22:06 +02001236 * or previous stream established with success
1237 * or previous stream failed during connection
Emeric Brun2b920a12010-09-23 18:30:22 +02001238 * and reconnection timer is expired */
1239
1240 /* retry a connect */
Willy Tarreau87b09662015-04-03 00:22:06 +02001241 ps->stream = peer_session_create(ps->peer, ps);
Emeric Brun2b920a12010-09-23 18:30:22 +02001242 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001243 else if (ps->statuscode == PEER_SESS_SC_CONNECTCODE ||
1244 ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) {
Emeric Brun2b920a12010-09-23 18:30:22 +02001245 /* If previous session failed during connection
1246 * but reconnection timer is not expired */
1247
1248 /* reschedule task for reconnect */
1249 task->expire = tick_first(task->expire, ps->reconnect);
1250 }
1251 /* else do nothing */
Willy Tarreau87b09662015-04-03 00:22:06 +02001252 } /* !ps->stream */
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001253 else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001254 /* current stream is active and established */
Emeric Brun2b920a12010-09-23 18:30:22 +02001255 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
1256 !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
1257 !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
1258 /* Resync from a remote is needed
1259 * and no peer was assigned for lesson
1260 * and current peer may be up2date */
1261
1262 /* assign peer for the lesson */
1263 ps->flags |= PEER_F_LEARN_ASSIGN;
1264 st->flags |= SHTABLE_F_RESYNC_ASSIGN;
1265
Willy Tarreau87b09662015-04-03 00:22:06 +02001266 /* awake peer stream task to handle a request of resync */
Willy Tarreaue5843b32015-04-27 18:40:14 +02001267 appctx_wakeup(ps->appctx);
Emeric Brun2b920a12010-09-23 18:30:22 +02001268 }
1269 else if ((int)(ps->pushed - ps->table->table->localupdate) < 0) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001270 /* awake peer stream task to push local updates */
Willy Tarreaue5843b32015-04-27 18:40:14 +02001271 appctx_wakeup(ps->appctx);
Emeric Brun2b920a12010-09-23 18:30:22 +02001272 }
1273 /* else do nothing */
1274 } /* SUCCESSCODE */
1275 } /* !ps->peer->local */
1276 } /* for */
1277
1278 /* Resync from remotes expired: consider resync is finished */
1279 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
1280 !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
1281 tick_is_expired(st->resync_timeout, now_ms)) {
1282 /* Resync from remote peer needed
1283 * no peer was assigned for the lesson
1284 * and resync timeout expire */
1285
1286 /* flag no more resync from remote, consider resync is finished */
1287 st->flags |= SHTABLE_F_RESYNC_REMOTE;
1288 }
1289
1290 if ((st->flags & SHTABLE_RESYNC_STATEMASK) != SHTABLE_RESYNC_FINISHED) {
1291 /* Resync not finished*/
1292 /* reschedule task to resync timeout, to ended resync if needed */
1293 task->expire = tick_first(task->expire, st->resync_timeout);
1294 }
1295 } /* !stopping */
1296 else {
1297 /* soft stop case */
1298 if (task->state & TASK_WOKEN_SIGNAL) {
1299 /* We've just recieved the signal */
1300 if (!(st->flags & SHTABLE_F_DONOTSTOP)) {
1301 /* add DO NOT STOP flag if not present */
1302 jobs++;
1303 st->flags |= SHTABLE_F_DONOTSTOP;
Willy Tarreau3a925c12013-09-04 17:54:01 +02001304 st->table->syncing++;
Emeric Brun2b920a12010-09-23 18:30:22 +02001305 }
1306
1307 /* disconnect all connected peers */
1308 for (ps = st->sessions; ps; ps = ps->next) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001309 if (ps->stream) {
1310 peer_session_forceshutdown(ps->stream);
1311 ps->stream = NULL;
Willy Tarreaue5843b32015-04-27 18:40:14 +02001312 ps->appctx = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001313 }
1314 }
1315 }
1316 ps = st->local_session;
1317
1318 if (ps->flags & PEER_F_TEACH_COMPLETE) {
1319 if (st->flags & SHTABLE_F_DONOTSTOP) {
1320 /* resync of new process was complete, current process can die now */
1321 jobs--;
1322 st->flags &= ~SHTABLE_F_DONOTSTOP;
Willy Tarreau3a925c12013-09-04 17:54:01 +02001323 st->table->syncing--;
Emeric Brun2b920a12010-09-23 18:30:22 +02001324 }
1325 }
Willy Tarreau87b09662015-04-03 00:22:06 +02001326 else if (!ps->stream) {
1327 /* If stream is not active */
Emeric Brun2b920a12010-09-23 18:30:22 +02001328 if (ps->statuscode == 0 ||
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001329 ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
1330 ps->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
1331 ps->statuscode == PEER_SESS_SC_TRYAGAIN) {
Emeric Brun2b920a12010-09-23 18:30:22 +02001332 /* connection never tried
Willy Tarreau87b09662015-04-03 00:22:06 +02001333 * or previous stream was successfully established
1334 * or previous stream tcp connect success but init state incomplete
Emeric Brun2b920a12010-09-23 18:30:22 +02001335 * or during previous connect, peer replies a try again statuscode */
1336
1337 /* connect to the peer */
Willy Tarreaue5843b32015-04-27 18:40:14 +02001338 peer_session_create(ps->peer, ps);
Emeric Brun2b920a12010-09-23 18:30:22 +02001339 }
1340 else {
1341 /* Other error cases */
1342 if (st->flags & SHTABLE_F_DONOTSTOP) {
1343 /* unable to resync new process, current process can die now */
1344 jobs--;
1345 st->flags &= ~SHTABLE_F_DONOTSTOP;
Willy Tarreau3a925c12013-09-04 17:54:01 +02001346 st->table->syncing--;
Emeric Brun2b920a12010-09-23 18:30:22 +02001347 }
1348 }
1349 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001350 else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE &&
Emeric Brun2b920a12010-09-23 18:30:22 +02001351 (int)(ps->pushed - ps->table->table->localupdate) < 0) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001352 /* current stream active and established
1353 awake stream to push remaining local updates */
Willy Tarreaue5843b32015-04-27 18:40:14 +02001354 appctx_wakeup(ps->appctx);
Emeric Brun2b920a12010-09-23 18:30:22 +02001355 }
1356 } /* stopping */
1357 /* Wakeup for re-connect */
1358 return task;
1359}
1360
1361/*
1362 * Function used to register a table for sync on a group of peers
1363 *
1364 */
1365void peers_register_table(struct peers *peers, struct stktable *table)
1366{
1367 struct shared_table *st;
1368 struct peer * curpeer;
1369 struct peer_session *ps;
Willy Tarreau4348fad2012-09-20 16:48:07 +02001370 struct listener *listener;
Emeric Brun2b920a12010-09-23 18:30:22 +02001371
1372 st = (struct shared_table *)calloc(1,sizeof(struct shared_table));
1373 st->table = table;
1374 st->next = peers->tables;
1375 st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
1376 peers->tables = st;
1377
1378 for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) {
1379 ps = (struct peer_session *)calloc(1,sizeof(struct peer_session));
1380 ps->table = st;
1381 ps->peer = curpeer;
1382 if (curpeer->local)
1383 st->local_session = ps;
1384 ps->next = st->sessions;
1385 ps->reconnect = now_ms;
1386 st->sessions = ps;
1387 peers->peers_fe->maxconn += 3;
1388 }
1389
Willy Tarreau4348fad2012-09-20 16:48:07 +02001390 list_for_each_entry(listener, &peers->peers_fe->conf.listeners, by_fe)
1391 listener->maxconn = peers->peers_fe->maxconn;
Emeric Brun2b920a12010-09-23 18:30:22 +02001392 st->sync_task = task_new();
1393 st->sync_task->process = process_peer_sync;
1394 st->sync_task->expire = TICK_ETERNITY;
1395 st->sync_task->context = (void *)st;
1396 table->sync_task =st->sync_task;
1397 signal_register_task(0, table->sync_task, 0);
1398 task_wakeup(st->sync_task, TASK_WOKEN_INIT);
1399}
1400