blob: 4899ee7a0a5dfaf4e38792f92e612e02ff3cbf0b [file] [log] [blame]
Emeric Brun2b920a12010-09-23 18:30:22 +02001/*
Willy Tarreaubd55e312010-11-11 10:55:09 +01002 * Stick table synchro management.
Emeric Brun2b920a12010-09-23 18:30:22 +02003 *
4 * Copyright 2010 EXCELIANCE, Emeric Brun <ebrun@exceliance.fr>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12
13#include <errno.h>
14#include <fcntl.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <sys/socket.h>
20#include <sys/stat.h>
21#include <sys/types.h>
22
23#include <common/compat.h>
24#include <common/config.h>
25#include <common/time.h>
26
27#include <types/global.h>
Willy Tarreau3fdb3662012-11-12 00:42:33 +010028#include <types/listener.h>
29#include <types/obj_type.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020030#include <types/peers.h>
31
32#include <proto/acl.h>
Willy Tarreauc7e42382012-08-24 19:22:53 +020033#include <proto/channel.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020034#include <proto/fd.h>
Willy Tarreaud1d48d42015-03-13 16:15:46 +010035#include <proto/frontend.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020036#include <proto/log.h>
37#include <proto/hdr_idx.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020038#include <proto/proto_tcp.h>
39#include <proto/proto_http.h>
40#include <proto/proxy.h>
Willy Tarreaufeb76402015-04-03 14:10:06 +020041#include <proto/session.h>
Willy Tarreau87b09662015-04-03 00:22:06 +020042#include <proto/stream.h>
Willy Tarreau22ec1ea2014-11-27 20:45:39 +010043#include <proto/signal.h>
44#include <proto/stick_table.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020045#include <proto/stream_interface.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020046#include <proto/task.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020047
48
49/*******************************/
50/* Current peer learning state */
51/*******************************/
52
53/******************************/
54/* Current table resync state */
55/******************************/
/* Bits stored in a shared table's <flags> field (accessed below as
 * ps->table->flags). The two lowest bits together form the resync state,
 * which is extracted with SHTABLE_RESYNC_STATEMASK.
 */
56#define SHTABLE_F_RESYNC_LOCAL 0x00000001 /* Learning from the local peer is finished or no longer needed */
57#define SHTABLE_F_RESYNC_REMOTE 0x00000002 /* Learning from a remote peer is finished or no longer needed */
58#define SHTABLE_F_RESYNC_ASSIGN 0x00000004 /* A peer was assigned to teach us a lesson */
59#define SHTABLE_F_RESYNC_PROCESS 0x00000008 /* The assigned peer was asked for a resync */
60#define SHTABLE_F_DONOTSTOP 0x00010000 /* Main table sync task blocks the process during soft stop to push data to the new process */
62
/* Resync states, compared against (flags & SHTABLE_RESYNC_STATEMASK):
 *   FROMLOCAL  : neither bit set, a resync from the local peer is still wanted
 *   FROMREMOTE : only the LOCAL bit set, a resync from a remote peer is wanted
 *   FINISHED   : both bits set, nothing left to learn
 */
63#define SHTABLE_RESYNC_STATEMASK (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)
64#define SHTABLE_RESYNC_FROMLOCAL 0x00000000
65#define SHTABLE_RESYNC_FROMREMOTE SHTABLE_F_RESYNC_LOCAL
66#define SHTABLE_RESYNC_FINISHED (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)
67
68/******************************/
69/* Remote peer teaching state */
70/******************************/
/* Bits stored in a peer session's <flags> field (accessed below as ps->flags).
 * The low byte tracks what we teach to the peer, bits 8-9 track what we learn
 * from it.
 */
71#define PEER_F_TEACH_PROCESS 0x00000001 /* Teach a lesson to current peer */
72#define PEER_F_TEACH_STAGE1 0x00000002 /* Teach stage 1 complete */
73#define PEER_F_TEACH_STAGE2 0x00000004 /* Teach stage 2 complete */
74#define PEER_F_TEACH_FINISHED 0x00000008 /* Teaching concluded (waiting for confirm) */
75#define PEER_F_TEACH_COMPLETE 0x00000010 /* All that we know was already taught to current peer, used only for a local peer */
76#define PEER_F_LEARN_ASSIGN 0x00000100 /* Current peer was assigned for a lesson */
77#define PEER_F_LEARN_NOTUP2DATE 0x00000200 /* Learning from peer finished but peer is not up to date */
78
/* AND-masks applied to ps->flags to clear the teaching, respectively learning,
 * bits when a session is (re)established or released.
 */
79#define PEER_TEACH_RESET ~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_STAGE1|PEER_F_TEACH_STAGE2|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */
80#define PEER_LEARN_RESET ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE)
81
82
83/**********************************/
84/* Peer Session IO handler states */
85/**********************************/
86
/* States of the peer session I/O handler, stored in appctx->st0.
 * The numeric order matters: peer_session_release() treats every state below
 * PEER_SESS_ST_SENDSUCCESS as one where appctx->ctx.peers.ptr does not yet
 * point to a peer session.
 */
Willy Tarreaue4d927a2013-12-01 12:47:35 +010087enum {
88 PEER_SESS_ST_ACCEPT = 0, /* Initial state for a session created by an accept, must be zero! */
89 PEER_SESS_ST_GETVERSION, /* Validate the supported protocol version */
90 PEER_SESS_ST_GETHOST, /* Validate that the host ID corresponds to the local host ID */
91 PEER_SESS_ST_GETPEER, /* Validate that the peer ID corresponds to a known remote peer ID */
92 PEER_SESS_ST_GETTABLE, /* Search the registered tables for one with the same ID and validate type and size */
93 /* after this point, data were possibly exchanged */
94 PEER_SESS_ST_SENDSUCCESS, /* Send return code 200 (success) and wait for messages */
95 PEER_SESS_ST_CONNECT, /* Initial state for a session created on a connect, push the presentation into the buffer */
96 PEER_SESS_ST_GETSTATUS, /* Wait for the welcome message */
97 PEER_SESS_ST_WAITMSG, /* Wait for data messages */
98 PEER_SESS_ST_EXIT, /* Exit with a status code */
99 PEER_SESS_ST_END, /* Killed session */
100};
Emeric Brun2b920a12010-09-23 18:30:22 +0200101
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100102/***************************************************/
103/* Peer Session status code - part of the protocol */
104/***************************************************/
Emeric Brun2b920a12010-09-23 18:30:22 +0200105
/* Status codes. The I/O handler stores them in appctx->st1 and ps->statuscode;
 * the success code is sent to the peer as a decimal text line and the reply
 * received on connect is parsed back with atoi().
 */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100106#define PEER_SESS_SC_CONNECTCODE 100 /* connect in progress */
107#define PEER_SESS_SC_CONNECTEDCODE 110 /* tcp connect success */
Emeric Brun2b920a12010-09-23 18:30:22 +0200108
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100109#define PEER_SESS_SC_SUCCESSCODE 200 /* accept or connect successful */
Emeric Brun2b920a12010-09-23 18:30:22 +0200110
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100111#define PEER_SESS_SC_TRYAGAIN 300 /* try again later */
Emeric Brun2b920a12010-09-23 18:30:22 +0200112
/* 5xx: the session is refused */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100113#define PEER_SESS_SC_ERRPROTO 501 /* protocol error */
114#define PEER_SESS_SC_ERRVERSION 502 /* unknown protocol version */
115#define PEER_SESS_SC_ERRHOST 503 /* bad host name */
116#define PEER_SESS_SC_ERRPEER 504 /* unknown peer */
117#define PEER_SESS_SC_ERRTYPE 505 /* table key type mismatch */
118#define PEER_SESS_SC_ERRSIZE 506 /* table key size mismatch */
119#define PEER_SESS_SC_ERRTABLE 507 /* unknown table */
Emeric Brun2b920a12010-09-23 18:30:22 +0200120
/* Protocol name opening the first line of a session; the I/O handler sends and
 * expects it followed by " 1.0".
 */
121#define PEER_SESSION_PROTO_NAME "HAProxyS"
122
/* NOTE(review): presumably the head of the configured peers sections; it is
 * not referenced in the part of the file visible here - confirm.
 */
123struct peers *peers = NULL;
/* Forward declaration (body not visible here); the I/O handler calls it on the
 * previous stream of a peer session when a new stream replaces it.
 */
Willy Tarreau87b09662015-04-03 00:22:06 +0200124static void peer_session_forceshutdown(struct stream * stream);
Emeric Brun2b920a12010-09-23 18:30:22 +0200125
126
127/*
128 * This prepare the data update message of the stick session <ts>, <ps> is the the peer session
129 * where the data going to be pushed, <msg> is a buffer of <size> to recieve data message content
130 */
Simon Horman96553772011-06-08 09:18:51 +0900131static int peer_prepare_datamsg(struct stksess *ts, struct peer_session *ps, char *msg, size_t size)
Emeric Brun2b920a12010-09-23 18:30:22 +0200132{
133 uint32_t netinteger;
134 int len;
135 /* construct message */
136 if (ps->lastpush && ts->upd.key > ps->lastpush && (ts->upd.key - ps->lastpush) <= 127) {
137 msg[0] = 0x80 + ts->upd.key - ps->lastpush;
138 len = sizeof(char);
139 }
140 else {
141 msg[0] = 'D';
142 netinteger = htonl(ts->upd.key);
143 memcpy(&msg[sizeof(char)], &netinteger, sizeof(netinteger));
144 len = sizeof(char) + sizeof(netinteger);
145 }
146
147 if (ps->table->table->type == STKTABLE_TYPE_STRING) {
148 int stlen = strlen((char *)ts->key.key);
149
150 netinteger = htonl(strlen((char *)ts->key.key));
151 memcpy(&msg[len], &netinteger, sizeof(netinteger));
152 memcpy(&msg[len+sizeof(netinteger)], ts->key.key, stlen);
153 len += sizeof(netinteger) + stlen;
154
155 }
156 else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
157 netinteger = htonl(*((uint32_t *)ts->key.key));
158 memcpy(&msg[len], &netinteger, sizeof(netinteger));
159 len += sizeof(netinteger);
160 }
161 else {
162 memcpy(&msg[len], ts->key.key, ps->table->table->key_size);
163 len += ps->table->table->key_size;
164 }
165
166 if (stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
167 netinteger = htonl(stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id));
168 else
169 netinteger = 0;
170
171 memcpy(&msg[len], &netinteger , sizeof(netinteger));
172 len += sizeof(netinteger);
173
174 return len;
175}
176
177
178/*
179 * Callback to release a session with a peer
180 */
Simon Horman96553772011-06-08 09:18:51 +0900181static void peer_session_release(struct stream_interface *si)
Emeric Brun2b920a12010-09-23 18:30:22 +0200182{
Willy Tarreau87b09662015-04-03 00:22:06 +0200183 struct stream *s = si_strm(si);
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100184 struct appctx *appctx = objt_appctx(si->end);
185 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200186
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100187 /* appctx->ctx.peers.ptr is not a peer session */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100188 if (appctx->st0 < PEER_SESS_ST_SENDSUCCESS)
Emeric Brun2b920a12010-09-23 18:30:22 +0200189 return;
190
191 /* peer session identified */
192 if (ps) {
Willy Tarreau87b09662015-04-03 00:22:06 +0200193 if (ps->stream == s) {
194 ps->stream = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200195 if (ps->flags & PEER_F_LEARN_ASSIGN) {
196 /* unassign current peer for learning */
197 ps->flags &= ~(PEER_F_LEARN_ASSIGN);
198 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
199
200 /* reschedule a resync */
201 ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
202 }
203 /* reset teaching and learning flags to 0 */
204 ps->flags &= PEER_TEACH_RESET;
205 ps->flags &= PEER_LEARN_RESET;
206 }
207 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
208 }
209}
210
211
212/*
213 * IO Handler to handle message exchange with a peer
214 */
Willy Tarreaub24281b2011-02-13 13:16:36 +0100215static void peer_io_handler(struct stream_interface *si)
Emeric Brun2b920a12010-09-23 18:30:22 +0200216{
Willy Tarreau87b09662015-04-03 00:22:06 +0200217 struct stream *s = si_strm(si);
Willy Tarreaud0d8da92015-04-04 02:10:38 +0200218 struct peers *curpeers = (struct peers *)strm_fe(s)->parent;
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100219 struct appctx *appctx = objt_appctx(si->end);
Emeric Brun2b920a12010-09-23 18:30:22 +0200220 int reql = 0;
221 int repl = 0;
222
223 while (1) {
224switchstate:
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100225 switch(appctx->st0) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100226 case PEER_SESS_ST_ACCEPT:
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100227 appctx->ctx.peers.ptr = NULL;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100228 appctx->st0 = PEER_SESS_ST_GETVERSION;
Emeric Brun2b920a12010-09-23 18:30:22 +0200229 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100230 case PEER_SESS_ST_GETVERSION:
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100231 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200232 if (reql <= 0) { /* closed or EOL not found */
233 if (reql == 0)
234 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100235 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200236 goto switchstate;
237 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100238 if (trash.str[reql-1] != '\n') {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100239 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200240 goto switchstate;
241 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100242 else if (reql > 1 && (trash.str[reql-2] == '\r'))
243 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200244 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100245 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200246
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100247 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200248
249 /* test version */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100250 if (strcmp(PEER_SESSION_PROTO_NAME " 1.0", trash.str) != 0) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100251 appctx->st0 = PEER_SESS_ST_EXIT;
252 appctx->st1 = PEER_SESS_SC_ERRVERSION;
Emeric Brun2b920a12010-09-23 18:30:22 +0200253 /* test protocol */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100254 if (strncmp(PEER_SESSION_PROTO_NAME " ", trash.str, strlen(PEER_SESSION_PROTO_NAME)+1) != 0)
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100255 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200256 goto switchstate;
257 }
258
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100259 appctx->st0 = PEER_SESS_ST_GETHOST;
Emeric Brun2b920a12010-09-23 18:30:22 +0200260 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100261 case PEER_SESS_ST_GETHOST:
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100262 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200263 if (reql <= 0) { /* closed or EOL not found */
264 if (reql == 0)
265 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100266 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200267 goto switchstate;
268 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100269 if (trash.str[reql-1] != '\n') {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100270 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200271 goto switchstate;
272 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100273 else if (reql > 1 && (trash.str[reql-2] == '\r'))
274 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200275 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100276 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200277
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100278 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200279
280 /* test hostname match */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100281 if (strcmp(localpeer, trash.str) != 0) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100282 appctx->st0 = PEER_SESS_ST_EXIT;
283 appctx->st1 = PEER_SESS_SC_ERRHOST;
Emeric Brun2b920a12010-09-23 18:30:22 +0200284 goto switchstate;
285 }
286
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100287 appctx->st0 = PEER_SESS_ST_GETPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200288 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100289 case PEER_SESS_ST_GETPEER: {
Emeric Brun2b920a12010-09-23 18:30:22 +0200290 struct peer *curpeer;
291 char *p;
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100292 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200293 if (reql <= 0) { /* closed or EOL not found */
294 if (reql == 0)
295 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100296 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200297 goto switchstate;
298 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100299 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200300 /* Incomplete line, we quit */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100301 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200302 goto switchstate;
303 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100304 else if (reql > 1 && (trash.str[reql-2] == '\r'))
305 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200306 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100307 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200308
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100309 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200310
311 /* parse line "<peer name> <pid>" */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100312 p = strchr(trash.str, ' ');
Emeric Brun2b920a12010-09-23 18:30:22 +0200313 if (!p) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100314 appctx->st0 = PEER_SESS_ST_EXIT;
315 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200316 goto switchstate;
317 }
318 *p = 0;
319
320 /* lookup known peer */
321 for (curpeer = curpeers->remote; curpeer; curpeer = curpeer->next) {
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100322 if (strcmp(curpeer->id, trash.str) == 0)
Emeric Brun2b920a12010-09-23 18:30:22 +0200323 break;
324 }
325
326 /* if unknown peer */
327 if (!curpeer) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100328 appctx->st0 = PEER_SESS_ST_EXIT;
329 appctx->st1 = PEER_SESS_SC_ERRPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200330 goto switchstate;
331 }
332
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100333 appctx->ctx.peers.ptr = curpeer;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100334 appctx->st0 = PEER_SESS_ST_GETTABLE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200335 /* fall through */
336 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100337 case PEER_SESS_ST_GETTABLE: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100338 struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200339 struct shared_table *st;
340 struct peer_session *ps = NULL;
341 unsigned long key_type;
342 size_t key_size;
343 char *p;
344
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100345 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200346 if (reql <= 0) { /* closed or EOL not found */
347 if (reql == 0)
348 goto out;
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100349 appctx->ctx.peers.ptr = NULL;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100350 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200351 goto switchstate;
352 }
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100353 /* Re init appctx->ctx.peers.ptr to null, to handle correctly a release case */
354 appctx->ctx.peers.ptr = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200355
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100356 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200357 /* Incomplete line, we quit */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100358 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200359 goto switchstate;
360 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100361 else if (reql > 1 && (trash.str[reql-2] == '\r'))
362 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200363 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100364 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200365
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100366 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200367
368 /* Parse line "<table name> <type> <size>" */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100369 p = strchr(trash.str, ' ');
Emeric Brun2b920a12010-09-23 18:30:22 +0200370 if (!p) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100371 appctx->st0 = PEER_SESS_ST_EXIT;
372 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200373 goto switchstate;
374 }
375 *p = 0;
376 key_type = (unsigned long)atol(p+1);
377
378 p = strchr(p+1, ' ');
379 if (!p) {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100380 appctx->ctx.peers.ptr = NULL;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100381 appctx->st0 = PEER_SESS_ST_EXIT;
382 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200383 goto switchstate;
384 }
385
386 key_size = (size_t)atoi(p);
387 for (st = curpeers->tables; st; st = st->next) {
388 /* If table name matches */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100389 if (strcmp(st->table->id, trash.str) == 0) {
Willy Tarreau86a446e2013-11-25 23:02:37 +0100390 /* Check key size mismatches, except for strings
391 * which may be truncated as long as they fit in
392 * a buffer.
393 */
394 if (key_size != st->table->key_size &&
395 (key_type != STKTABLE_TYPE_STRING ||
396 1 + 4 + 4 + key_size - 1 >= trash.size)) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100397 appctx->st0 = PEER_SESS_ST_EXIT;
398 appctx->st1 = PEER_SESS_SC_ERRSIZE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200399 goto switchstate;
400 }
401
402 /* If key type mismatches */
403 if (key_type != st->table->type) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100404 appctx->st0 = PEER_SESS_ST_EXIT;
405 appctx->st1 = PEER_SESS_SC_ERRTYPE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200406 goto switchstate;
407 }
408
Willy Tarreau87b09662015-04-03 00:22:06 +0200409 /* lookup peer stream of current peer */
Emeric Brun2b920a12010-09-23 18:30:22 +0200410 for (ps = st->sessions; ps; ps = ps->next) {
411 if (ps->peer == curpeer) {
Willy Tarreau87b09662015-04-03 00:22:06 +0200412 /* If stream already active, replaced by new one */
413 if (ps->stream && ps->stream != s) {
Emeric Brun2b920a12010-09-23 18:30:22 +0200414 if (ps->peer->local) {
415 /* Local connection, reply a retry */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100416 appctx->st0 = PEER_SESS_ST_EXIT;
417 appctx->st1 = PEER_SESS_SC_TRYAGAIN;
Emeric Brun2b920a12010-09-23 18:30:22 +0200418 goto switchstate;
419 }
Willy Tarreau87b09662015-04-03 00:22:06 +0200420 peer_session_forceshutdown(ps->stream);
Emeric Brun2b920a12010-09-23 18:30:22 +0200421 }
Willy Tarreau87b09662015-04-03 00:22:06 +0200422 ps->stream = s;
Emeric Brun2b920a12010-09-23 18:30:22 +0200423 break;
424 }
425 }
426 break;
427 }
428 }
429
430 /* If table not found */
431 if (!st){
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100432 appctx->st0 = PEER_SESS_ST_EXIT;
433 appctx->st1 = PEER_SESS_SC_ERRTABLE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200434 goto switchstate;
435 }
436
437 /* If no peer session for current peer */
438 if (!ps) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100439 appctx->st0 = PEER_SESS_ST_EXIT;
440 appctx->st1 = PEER_SESS_SC_ERRPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200441 goto switchstate;
442 }
443
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100444 appctx->ctx.peers.ptr = ps;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100445 appctx->st0 = PEER_SESS_ST_SENDSUCCESS;
Emeric Brun2b920a12010-09-23 18:30:22 +0200446 /* fall through */
447 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100448 case PEER_SESS_ST_SENDSUCCESS: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100449 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200450
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100451 repl = snprintf(trash.str, trash.size, "%d\n", PEER_SESS_SC_SUCCESSCODE);
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100452 repl = bi_putblk(si_ic(si), trash.str, repl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200453 if (repl <= 0) {
454 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100455 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100456 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200457 goto switchstate;
458 }
459
460 /* Register status code */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100461 ps->statuscode = PEER_SESS_SC_SUCCESSCODE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200462
463 /* Awake main task */
464 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
465
466 /* Init cursors */
467 ps->teaching_origin =ps->lastpush = ps->lastack = ps->pushack = 0;
468 ps->pushed = ps->update;
469
470 /* Init confirm counter */
471 ps->confirm = 0;
472
473 /* reset teaching and learning flags to 0 */
474 ps->flags &= PEER_TEACH_RESET;
475 ps->flags &= PEER_LEARN_RESET;
476
477 /* if current peer is local */
478 if (ps->peer->local) {
479 /* if table need resyncfrom local and no process assined */
480 if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL &&
481 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
482 /* assign local peer for a lesson, consider lesson already requested */
483 ps->flags |= PEER_F_LEARN_ASSIGN;
484 ps->table->flags |= (SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
485 }
486
487 }
488 else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
489 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
490 /* assign peer for a lesson */
491 ps->flags |= PEER_F_LEARN_ASSIGN;
492 ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
493 }
494 /* switch to waiting message state */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100495 appctx->st0 = PEER_SESS_ST_WAITMSG;
Emeric Brun2b920a12010-09-23 18:30:22 +0200496 goto switchstate;
497 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100498 case PEER_SESS_ST_CONNECT: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100499 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200500
501 /* Send headers */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100502 repl = snprintf(trash.str, trash.size,
Emeric Brun2b920a12010-09-23 18:30:22 +0200503 PEER_SESSION_PROTO_NAME " 1.0\n%s\n%s %d\n%s %lu %d\n",
504 ps->peer->id,
505 localpeer,
Willy Tarreau7b77c9f2012-01-07 22:52:12 +0100506 (int)getpid(),
Emeric Brun2b920a12010-09-23 18:30:22 +0200507 ps->table->table->id,
508 ps->table->table->type,
Willy Tarreaubd55e312010-11-11 10:55:09 +0100509 (int)ps->table->table->key_size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200510
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100511 if (repl >= trash.size) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100512 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200513 goto switchstate;
514 }
515
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100516 repl = bi_putblk(si_ic(si), trash.str, repl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200517 if (repl <= 0) {
518 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100519 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100520 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200521 goto switchstate;
522 }
523
524 /* switch to the waiting statuscode state */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100525 appctx->st0 = PEER_SESS_ST_GETSTATUS;
Emeric Brun2b920a12010-09-23 18:30:22 +0200526 /* fall through */
527 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100528 case PEER_SESS_ST_GETSTATUS: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100529 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200530
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100531 if (si_ic(si)->flags & CF_WRITE_PARTIAL)
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100532 ps->statuscode = PEER_SESS_SC_CONNECTEDCODE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200533
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100534 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200535 if (reql <= 0) { /* closed or EOL not found */
536 if (reql == 0)
537 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100538 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200539 goto switchstate;
540 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100541 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200542 /* Incomplete line, we quit */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100543 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200544 goto switchstate;
545 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100546 else if (reql > 1 && (trash.str[reql-2] == '\r'))
547 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200548 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100549 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200550
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100551 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200552
553 /* Register status code */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100554 ps->statuscode = atoi(trash.str);
Emeric Brun2b920a12010-09-23 18:30:22 +0200555
556 /* Awake main task */
557 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
558
559 /* If status code is success */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100560 if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
Emeric Brun2b920a12010-09-23 18:30:22 +0200561 /* Init cursors */
562 ps->teaching_origin = ps->lastpush = ps->lastack = ps->pushack = 0;
563 ps->pushed = ps->update;
564
565 /* Init confirm counter */
566 ps->confirm = 0;
567
568 /* reset teaching and learning flags to 0 */
569 ps->flags &= PEER_TEACH_RESET;
570 ps->flags &= PEER_LEARN_RESET;
571
572 /* If current peer is local */
573 if (ps->peer->local) {
574 /* Init cursors to push a resync */
575 ps->teaching_origin = ps->pushed = ps->table->table->update;
576 /* flag to start to teach lesson */
577 ps->flags |= PEER_F_TEACH_PROCESS;
578
579 }
580 else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
581 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
582 /* If peer is remote and resync from remote is needed,
583 and no peer currently assigned */
584
585 /* assign peer for a lesson */
586 ps->flags |= PEER_F_LEARN_ASSIGN;
587 ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
588 }
589
590 }
591 else {
592 /* Status code is not success, abort */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100593 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200594 goto switchstate;
595 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100596 appctx->st0 = PEER_SESS_ST_WAITMSG;
Emeric Brun2b920a12010-09-23 18:30:22 +0200597 /* fall through */
598 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100599 case PEER_SESS_ST_WAITMSG: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100600 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200601 struct stksess *ts, *newts = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200602 char c;
603 int totl = 0;
604
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100605 reql = bo_getblk(si_oc(si), (char *)&c, sizeof(c), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200606 if (reql <= 0) /* closed or EOL not found */
607 goto incomplete;
608
Emeric Brun2b920a12010-09-23 18:30:22 +0200609 totl += reql;
610
611 if ((c & 0x80) || (c == 'D')) {
612 /* Here we have data message */
613 unsigned int pushack;
Emeric Brun2b920a12010-09-23 18:30:22 +0200614 int srvid;
615 uint32_t netinteger;
616
617 /* Compute update remote version */
618 if (c & 0x80) {
619 pushack = ps->pushack + (unsigned int)(c & 0x7F);
620 }
621 else {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100622 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200623 if (reql <= 0) /* closed or EOL not found */
624 goto incomplete;
625
Emeric Brun2b920a12010-09-23 18:30:22 +0200626 totl += reql;
627 pushack = ntohl(netinteger);
628 }
629
Willy Tarreau86a446e2013-11-25 23:02:37 +0100630 /* Read key. The string keys are read in two steps, the first step
631 * consists in reading whatever fits into the table directly into
632 * the pre-allocated key. The second step consists in simply
633 * draining all exceeding data. This can happen for example after a
634 * config reload with a smaller key size for the stick table than
635 * what was previously set, or when facing the impossibility to
636 * allocate a new stksess (for example when the table is full with
637 * "nopurge").
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200638 */
Emeric Brun2b920a12010-09-23 18:30:22 +0200639 if (ps->table->table->type == STKTABLE_TYPE_STRING) {
Willy Tarreau86a446e2013-11-25 23:02:37 +0100640 unsigned int to_read, to_store;
641
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200642 /* read size first */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100643 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200644 if (reql <= 0) /* closed or EOL not found */
645 goto incomplete;
646
Emeric Brun2b920a12010-09-23 18:30:22 +0200647 totl += reql;
Willy Tarreau86a446e2013-11-25 23:02:37 +0100648
649 to_store = 0;
650 to_read = ntohl(netinteger);
651
Willy Tarreau4e4292b2014-11-28 12:18:45 +0100652 if (to_read + totl > si_ob(si)->size) {
Willy Tarreau86a446e2013-11-25 23:02:37 +0100653 /* impossible to read a key this large, abort */
654 reql = -1;
Willy Tarreau72d6c162013-04-11 16:14:13 +0200655 goto incomplete;
Willy Tarreau86a446e2013-11-25 23:02:37 +0100656 }
Willy Tarreau72d6c162013-04-11 16:14:13 +0200657
Willy Tarreau86a446e2013-11-25 23:02:37 +0100658 newts = stksess_new(ps->table->table, NULL);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200659 if (newts)
Willy Tarreau86a446e2013-11-25 23:02:37 +0100660 to_store = MIN(to_read, ps->table->table->key_size - 1);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200661
Willy Tarreau86a446e2013-11-25 23:02:37 +0100662 /* we read up to two blocks, the first one goes into the key,
663 * the rest is drained into the trash.
664 */
665 if (to_store) {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100666 reql = bo_getblk(si_oc(si), (char *)newts->key.key, to_store, totl);
Willy Tarreau86a446e2013-11-25 23:02:37 +0100667 if (reql <= 0) /* closed or incomplete */
668 goto incomplete;
669 newts->key.key[reql] = 0;
670 totl += reql;
671 to_read -= reql;
672 }
673 if (to_read) {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100674 reql = bo_getblk(si_oc(si), trash.str, to_read, totl);
Willy Tarreau86a446e2013-11-25 23:02:37 +0100675 if (reql <= 0) /* closed or incomplete */
676 goto incomplete;
677 totl += reql;
678 }
Emeric Brun2b920a12010-09-23 18:30:22 +0200679 }
680 else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100681 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200682 if (reql <= 0) /* closed or EOL not found */
683 goto incomplete;
Cyril Bonté9a60ff92014-02-16 01:07:07 +0100684 newts = stksess_new(ps->table->table, NULL);
685 if (newts) {
686 netinteger = ntohl(netinteger);
687 memcpy(newts->key.key, &netinteger, sizeof(netinteger));
688 }
Emeric Brun2b920a12010-09-23 18:30:22 +0200689 totl += reql;
Emeric Brun2b920a12010-09-23 18:30:22 +0200690 }
691 else {
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200692 /* type ip or binary */
693 newts = stksess_new(ps->table->table, NULL);
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100694 reql = bo_getblk(si_oc(si), newts ? (char *)newts->key.key : trash.str, ps->table->table->key_size, totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200695 if (reql <= 0) /* closed or EOL not found */
696 goto incomplete;
Willy Tarreau72d6c162013-04-11 16:14:13 +0200697 totl += reql;
Emeric Brun2b920a12010-09-23 18:30:22 +0200698 }
699
700 /* read server id */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100701 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200702 if (reql <= 0) /* closed or EOL not found */
703 goto incomplete;
704
Emeric Brun2b920a12010-09-23 18:30:22 +0200705 totl += reql;
706 srvid = ntohl(netinteger);
707
708 /* update entry */
Emeric Brun2b920a12010-09-23 18:30:22 +0200709 if (newts) {
710 /* lookup for existing entry */
711 ts = stktable_lookup(ps->table->table, newts);
712 if (ts) {
713 /* the entry already exist, we can free ours */
714 stktable_touch(ps->table->table, ts, 0);
715 stksess_free(ps->table->table, newts);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200716 newts = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200717 }
718 else {
719 struct eb32_node *eb;
720
721 /* create new entry */
722 ts = stktable_store(ps->table->table, newts, 0);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200723 newts = NULL; /* don't reuse it */
724
Emeric Brun2b920a12010-09-23 18:30:22 +0200725 ts->upd.key= (++ps->table->table->update)+(2^31);
726 eb = eb32_insert(&ps->table->table->updates, &ts->upd);
727 if (eb != &ts->upd) {
728 eb32_delete(eb);
729 eb32_insert(&ps->table->table->updates, &ts->upd);
730 }
731 }
732
733 /* update entry */
734 if (srvid && stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
735 stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id) = srvid;
736 ps->pushack = pushack;
737 }
738
739 }
740 else if (c == 'R') {
741 /* Reset message: remote need resync */
742
743 /* reinit counters for a resync */
744 ps->lastpush = 0;
745 ps->teaching_origin = ps->pushed = ps->table->table->update;
746
747 /* reset teaching flags to 0 */
748 ps->flags &= PEER_TEACH_RESET;
749
750 /* flag to start to teach lesson */
751 ps->flags |= PEER_F_TEACH_PROCESS;
752 }
753 else if (c == 'F') {
754 /* Finish message, all known updates have been pushed by remote */
755 /* and remote is up to date */
756
757 /* If resync is in progress with remote peer */
758 if (ps->flags & PEER_F_LEARN_ASSIGN) {
759
760 /* unassign current peer for learning */
761 ps->flags &= ~PEER_F_LEARN_ASSIGN;
762 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
763
764 /* Consider table is now up2date, resync resync no more needed from local neither remote */
765 ps->table->flags |= (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE);
766 }
767 /* Increase confirm counter to launch a confirm message */
768 ps->confirm++;
769 }
770 else if (c == 'c') {
771 /* confirm message, remote peer is now up to date with us */
772
773 /* If stopping state */
774 if (stopping) {
775 /* Close session, push resync no more needed */
776 ps->flags |= PEER_F_TEACH_COMPLETE;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100777 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200778 goto switchstate;
779 }
780
781 /* reset teaching flags to 0 */
782 ps->flags &= PEER_TEACH_RESET;
783 }
784 else if (c == 'C') {
785 /* Continue message, all known updates have been pushed by remote */
786 /* but remote is not up to date */
787
788 /* If resync is in progress with current peer */
789 if (ps->flags & PEER_F_LEARN_ASSIGN) {
790
791 /* unassign current peer */
792 ps->flags &= ~PEER_F_LEARN_ASSIGN;
793 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
794
795 /* flag current peer is not up 2 date to try from an other */
796 ps->flags |= PEER_F_LEARN_NOTUP2DATE;
797
798 /* reschedule a resync */
799 ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
800 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
801 }
802 ps->confirm++;
803 }
804 else if (c == 'A') {
805 /* ack message */
806 uint32_t netinteger;
807
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100808 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200809 if (reql <= 0) /* closed or EOL not found */
810 goto incomplete;
811
Emeric Brun2b920a12010-09-23 18:30:22 +0200812 totl += reql;
813
814 /* Consider remote is up to date with "acked" version */
815 ps->update = ntohl(netinteger);
816 }
817 else {
818 /* Unknown message */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100819 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200820 goto switchstate;
821 }
822
823 /* skip consumed message */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100824 bo_skip(si_oc(si), totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200825
826 /* loop on that state to peek next message */
Willy Tarreau72d6c162013-04-11 16:14:13 +0200827 goto switchstate;
828
Emeric Brun2b920a12010-09-23 18:30:22 +0200829incomplete:
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200830 /* we get here when a bo_getblk() returns <= 0 in reql */
831
832 /* first, we may have to release newts */
833 if (newts) {
834 stksess_free(ps->table->table, newts);
835 newts = NULL;
836 }
837
Willy Tarreau72d6c162013-04-11 16:14:13 +0200838 if (reql < 0) {
839 /* there was an error */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100840 appctx->st0 = PEER_SESS_ST_END;
Willy Tarreau72d6c162013-04-11 16:14:13 +0200841 goto switchstate;
842 }
843
Emeric Brun2b920a12010-09-23 18:30:22 +0200844 /* Nothing to read, now we start to write */
845
846 /* Confirm finished or partial messages */
847 while (ps->confirm) {
848 /* There is a confirm messages to send */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100849 repl = bi_putchr(si_ic(si), 'c');
Emeric Brun2b920a12010-09-23 18:30:22 +0200850 if (repl <= 0) {
851 /* no more write possible */
852 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100853 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100854 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200855 goto switchstate;
856 }
857 ps->confirm--;
858 }
859
860 /* Need to request a resync */
861 if ((ps->flags & PEER_F_LEARN_ASSIGN) &&
862 (ps->table->flags & SHTABLE_F_RESYNC_ASSIGN) &&
863 !(ps->table->flags & SHTABLE_F_RESYNC_PROCESS)) {
864 /* Current peer was elected to request a resync */
865
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100866 repl = bi_putchr(si_ic(si), 'R');
Emeric Brun2b920a12010-09-23 18:30:22 +0200867 if (repl <= 0) {
868 /* no more write possible */
869 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100870 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100871 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200872 goto switchstate;
873 }
874 ps->table->flags |= SHTABLE_F_RESYNC_PROCESS;
875 }
876
877 /* It remains some updates to ack */
878 if (ps->pushack != ps->lastack) {
879 uint32_t netinteger;
880
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100881 trash.str[0] = 'A';
Emeric Brun2b920a12010-09-23 18:30:22 +0200882 netinteger = htonl(ps->pushack);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100883 memcpy(&trash.str[1], &netinteger, sizeof(netinteger));
Emeric Brun2b920a12010-09-23 18:30:22 +0200884
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100885 repl = bi_putblk(si_ic(si), trash.str, 1+sizeof(netinteger));
Emeric Brun2b920a12010-09-23 18:30:22 +0200886 if (repl <= 0) {
887 /* no more write possible */
888 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100889 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100890 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200891 goto switchstate;
892 }
893 ps->lastack = ps->pushack;
894 }
895
896 if (ps->flags & PEER_F_TEACH_PROCESS) {
897 /* current peer was requested for a lesson */
898
899 if (!(ps->flags & PEER_F_TEACH_STAGE1)) {
900 /* lesson stage 1 not complete */
901 struct eb32_node *eb;
902
903 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
904 while (1) {
905 int msglen;
906 struct stksess *ts;
907
908 if (!eb) {
909 /* flag lesson stage1 complete */
910 ps->flags |= PEER_F_TEACH_STAGE1;
911 eb = eb32_first(&ps->table->table->updates);
912 if (eb)
913 ps->pushed = eb->key - 1;
914 break;
915 }
916
917 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100918 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200919 if (msglen) {
920 /* message to buffer */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100921 repl = bi_putblk(si_ic(si), trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200922 if (repl <= 0) {
923 /* no more write possible */
924 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100925 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100926 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200927 goto switchstate;
928 }
929 ps->lastpush = ps->pushed = ts->upd.key;
930 }
931 eb = eb32_next(eb);
932 }
933 } /* !TEACH_STAGE1 */
934
935 if (!(ps->flags & PEER_F_TEACH_STAGE2)) {
936 /* lesson stage 2 not complete */
937 struct eb32_node *eb;
938
939 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
940 while (1) {
941 int msglen;
942 struct stksess *ts;
943
944 if (!eb || eb->key > ps->teaching_origin) {
945 /* flag lesson stage1 complete */
946 ps->flags |= PEER_F_TEACH_STAGE2;
947 ps->pushed = ps->teaching_origin;
948 break;
949 }
950
951 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100952 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200953 if (msglen) {
954 /* message to buffer */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100955 repl = bi_putblk(si_ic(si), trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200956 if (repl <= 0) {
957 /* no more write possible */
958 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100959 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100960 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200961 goto switchstate;
962 }
963 ps->lastpush = ps->pushed = ts->upd.key;
964 }
965 eb = eb32_next(eb);
966 }
967 } /* !TEACH_STAGE2 */
968
969 if (!(ps->flags & PEER_F_TEACH_FINISHED)) {
970 /* process final lesson message */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100971 repl = bi_putchr(si_ic(si), ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FINISHED) ? 'F' : 'C');
Emeric Brun2b920a12010-09-23 18:30:22 +0200972 if (repl <= 0) {
973 /* no more write possible */
974 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100975 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100976 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200977 goto switchstate;
978 }
979
980 /* flag finished message sent */
981 ps->flags |= PEER_F_TEACH_FINISHED;
982 } /* !TEACH_FINISHED */
983 } /* TEACH_PROCESS */
984
985 if (!(ps->flags & PEER_F_LEARN_ASSIGN) &&
986 (int)(ps->pushed - ps->table->table->localupdate) < 0) {
987 /* Push local updates, only if no learning in progress (to avoid ping-pong effects) */
988 struct eb32_node *eb;
989
990 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
991 while (1) {
992 int msglen;
993 struct stksess *ts;
994
995 /* push local updates */
996 if (!eb) {
997 eb = eb32_first(&ps->table->table->updates);
998 if (!eb || ((int)(eb->key - ps->pushed) <= 0)) {
999 ps->pushed = ps->table->table->localupdate;
1000 break;
1001 }
1002 }
1003
1004 if ((int)(eb->key - ps->table->table->localupdate) > 0) {
1005 ps->pushed = ps->table->table->localupdate;
1006 break;
1007 }
1008
1009 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +01001010 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +02001011 if (msglen) {
1012 /* message to buffer */
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001013 repl = bi_putblk(si_ic(si), trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +02001014 if (repl <= 0) {
1015 /* no more write possible */
1016 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +01001017 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001018 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +02001019 goto switchstate;
1020 }
1021 ps->lastpush = ps->pushed = ts->upd.key;
1022 }
1023 eb = eb32_next(eb);
1024 }
1025 } /* ! LEARN_ASSIGN */
1026 /* noting more to do */
1027 goto out;
1028 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001029 case PEER_SESS_ST_EXIT:
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001030 repl = snprintf(trash.str, trash.size, "%d\n", appctx->st1);
Emeric Brun2b920a12010-09-23 18:30:22 +02001031
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001032 if (bi_putblk(si_ic(si), trash.str, repl) == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +01001033 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001034 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +02001035 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001036 case PEER_SESS_ST_END: {
Willy Tarreau73b013b2012-05-21 16:31:45 +02001037 si_shutw(si);
1038 si_shutr(si);
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001039 si_ic(si)->flags |= CF_READ_NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001040 goto quit;
1041 }
1042 }
1043 }
1044out:
Willy Tarreau73b013b2012-05-21 16:31:45 +02001045 si_update(si);
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001046 si_oc(si)->flags |= CF_READ_DONTWAIT;
Emeric Brun2b920a12010-09-23 18:30:22 +02001047 /* we don't want to expire timeouts while we're processing requests */
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001048 si_ic(si)->rex = TICK_ETERNITY;
1049 si_oc(si)->wex = TICK_ETERNITY;
Emeric Brun2b920a12010-09-23 18:30:22 +02001050quit:
1051 return;
Willy Tarreaubc18da12015-03-13 14:00:47 +01001052full:
1053 si->flags |= SI_FL_WAIT_ROOM;
1054 goto out;
Emeric Brun2b920a12010-09-23 18:30:22 +02001055}
1056
Willy Tarreaub24281b2011-02-13 13:16:36 +01001057static struct si_applet peer_applet = {
Willy Tarreau3fdb3662012-11-12 00:42:33 +01001058 .obj_type = OBJ_TYPE_APPLET,
Willy Tarreaub24281b2011-02-13 13:16:36 +01001059 .name = "<PEER>", /* used for logging */
1060 .fct = peer_io_handler,
Aman Gupta9a13e842012-04-02 18:57:53 -07001061 .release = peer_session_release,
Willy Tarreaub24281b2011-02-13 13:16:36 +01001062};
Emeric Brun2b920a12010-09-23 18:30:22 +02001063
1064/*
1065 * Use this function to force a close of a peer session
1066 */
Willy Tarreau87b09662015-04-03 00:22:06 +02001067static void peer_session_forceshutdown(struct stream * stream)
Emeric Brun2b920a12010-09-23 18:30:22 +02001068{
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001069 struct stream_interface *oldsi = NULL;
1070 struct appctx *appctx = NULL;
1071 int i;
Emeric Brun2b920a12010-09-23 18:30:22 +02001072
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001073 for (i = 0; i <= 1; i++) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001074 appctx = objt_appctx(stream->si[i].end);
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001075 if (!appctx)
1076 continue;
1077 if (appctx->applet != &peer_applet)
1078 continue;
1079
Willy Tarreau87b09662015-04-03 00:22:06 +02001080 oldsi = &stream->si[i];
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001081 break;
Emeric Brun2b920a12010-09-23 18:30:22 +02001082 }
1083
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001084 if (!appctx)
1085 return;
1086
Emeric Brun2b920a12010-09-23 18:30:22 +02001087 /* call release to reinit resync states if needed */
1088 peer_session_release(oldsi);
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001089 appctx->st0 = PEER_SESS_ST_END;
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001090 appctx->ctx.peers.ptr = NULL;
Willy Tarreau87b09662015-04-03 00:22:06 +02001091 task_wakeup(stream->task, TASK_WOKEN_MSG);
Emeric Brun2b920a12010-09-23 18:30:22 +02001092}
1093
Willy Tarreau91d96282015-03-13 15:47:26 +01001094/* Pre-configures a peers frontend to accept incoming connections */
1095void peers_setup_frontend(struct proxy *fe)
1096{
1097 fe->last_change = now.tv_sec;
1098 fe->cap = PR_CAP_FE;
1099 fe->maxconn = 0;
1100 fe->conn_retries = CONN_RETRIES;
1101 fe->timeout.client = MS_TO_TICKS(5000);
Willy Tarreaud1d48d42015-03-13 16:15:46 +01001102 fe->accept = frontend_accept;
Willy Tarreauf87ab942015-03-13 15:55:16 +01001103 fe->default_target = &peer_applet.obj_type;
Willy Tarreau91d96282015-03-13 15:47:26 +01001104 fe->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
1105}
1106
Emeric Brun2b920a12010-09-23 18:30:22 +02001107/*
Willy Tarreaubd55e312010-11-11 10:55:09 +01001108 * Create a new peer session in assigned state (connect will start automatically)
Emeric Brun2b920a12010-09-23 18:30:22 +02001109 */
Willy Tarreau87b09662015-04-03 00:22:06 +02001110static struct stream *peer_session_create(struct peer *peer, struct peer_session *ps)
Emeric Brun2b920a12010-09-23 18:30:22 +02001111{
Willy Tarreau4348fad2012-09-20 16:48:07 +02001112 struct listener *l = LIST_NEXT(&peer->peers->peers_fe->conf.listeners, struct listener *, by_fe);
Emeric Brun2b920a12010-09-23 18:30:22 +02001113 struct proxy *p = (struct proxy *)l->frontend; /* attached frontend */
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001114 struct appctx *appctx;
Willy Tarreau15b5e142015-04-04 14:38:25 +02001115 struct session *sess;
Willy Tarreau87b09662015-04-03 00:22:06 +02001116 struct stream *s;
Emeric Brun2b920a12010-09-23 18:30:22 +02001117 struct task *t;
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001118 struct connection *conn;
Emeric Brun2b920a12010-09-23 18:30:22 +02001119
Willy Tarreaud990baf2015-04-05 00:32:03 +02001120 ps->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
1121 ps->statuscode = PEER_SESS_SC_CONNECTCODE;
1122 s = NULL;
1123
1124 appctx = appctx_new(&peer_applet);
1125 if (!appctx)
1126 goto out_close;
1127
1128 appctx->st0 = PEER_SESS_ST_CONNECT;
1129 appctx->ctx.peers.ptr = (void *)ps;
1130
Willy Tarreau4099a7c2015-04-05 00:39:55 +02001131 sess = session_new(p, l, &appctx->obj_type);
Willy Tarreau15b5e142015-04-04 14:38:25 +02001132 if (!sess) {
Godbach430f2912013-06-20 13:28:38 +08001133 Alert("out of memory in peer_session_create().\n");
Willy Tarreaud990baf2015-04-05 00:32:03 +02001134 goto out_free_appctx;
Emeric Brun2b920a12010-09-23 18:30:22 +02001135 }
1136
Willy Tarreau8baf9062015-04-05 00:46:36 +02001137 if ((t = task_new()) == NULL) {
Willy Tarreau15b5e142015-04-04 14:38:25 +02001138 Alert("out of memory in peer_session_create().\n");
1139 goto out_free_sess;
1140 }
Willy Tarreau8baf9062015-04-05 00:46:36 +02001141 t->nice = l->nice;
1142
Willy Tarreau02d86382015-04-05 12:00:52 +02001143 if ((s = stream_new(sess, t)) == NULL) {
Willy Tarreau342bfb12015-04-05 01:35:34 +02001144 Alert("Failed to initialize stream in peer_session_create().\n");
Willy Tarreau8baf9062015-04-05 00:46:36 +02001145 goto out_free_task;
1146 }
1147
Willy Tarreau342bfb12015-04-05 01:35:34 +02001148 /* The tasks below are normally what is supposed to be done by
1149 * fe->accept().
1150 */
Willy Tarreaue7dff022015-04-03 01:14:29 +02001151 s->flags = SF_ASSIGNED|SF_ADDR_SET;
Emeric Brun2b920a12010-09-23 18:30:22 +02001152
Willy Tarreau3ed35ef2013-10-24 11:51:38 +02001153 /* initiate an outgoing connection */
1154 si_set_state(&s->si[1], SI_ST_ASS);
Emeric Brun2b920a12010-09-23 18:30:22 +02001155 s->si[1].conn_retries = p->conn_retries;
Willy Tarreau3ed35ef2013-10-24 11:51:38 +02001156
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001157 /* automatically prepare the stream interface to connect to the
Willy Tarreaub363a1f2013-10-01 10:45:07 +02001158 * pre-initialized connection in si->conn.
1159 */
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001160 if (unlikely((conn = conn_new()) == NULL))
Willy Tarreau8baf9062015-04-05 00:46:36 +02001161 goto out_free_strm;
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001162
1163 conn_prepare(conn, peer->proto, peer->xprt);
1164 si_attach_conn(&s->si[1], conn);
1165
1166 conn->target = s->target = &s->be->obj_type;
1167 memcpy(&conn->addr.to, &peer->addr, sizeof(conn->addr.to));
Emeric Brun2b920a12010-09-23 18:30:22 +02001168 s->do_log = NULL;
1169
1170 /* default error reporting function, may be changed by analysers */
1171 s->srv_error = default_srv_error;
Emeric Brun2b920a12010-09-23 18:30:22 +02001172 s->uniq_id = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +02001173
1174 /* note: this should not happen anymore since there's always at least the switching rules */
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001175 if (!s->req.analysers) {
1176 channel_auto_connect(&s->req);/* don't wait to establish connection */
Willy Tarreau342bfb12015-04-05 01:35:34 +02001177 channel_auto_close(&s->req); /* let the producer forward close requests */
Emeric Brun2b920a12010-09-23 18:30:22 +02001178 }
1179
Willy Tarreaue36cbcb2015-04-03 15:40:56 +02001180 s->req.rto = s->sess->fe->timeout.client;
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001181 s->req.wto = s->be->timeout.server;
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001182 s->res.rto = s->be->timeout.server;
Willy Tarreaue36cbcb2015-04-03 15:40:56 +02001183 s->res.wto = s->sess->fe->timeout.client;
Emeric Brun2b920a12010-09-23 18:30:22 +02001184
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001185 s->res.flags |= CF_READ_DONTWAIT;
Willy Tarreau696a2912014-11-24 11:36:57 +01001186
Emeric Brun2b920a12010-09-23 18:30:22 +02001187 l->nbconn++; /* warning! right now, it's up to the handler to decrease this */
1188 p->feconn++;/* beconn will be increased later */
1189 jobs++;
Willy Tarreaufb0afa72015-04-03 14:46:27 +02001190 if (!(s->sess->listener->options & LI_O_UNLIMITED))
Willy Tarreau3c63fd82011-09-07 18:00:47 +02001191 actconn++;
Emeric Brun2b920a12010-09-23 18:30:22 +02001192 totalconn++;
1193
1194 return s;
1195
1196 /* Error unrolling */
Willy Tarreau15b5e142015-04-04 14:38:25 +02001197 out_free_strm:
Emeric Brun2b920a12010-09-23 18:30:22 +02001198 LIST_DEL(&s->list);
Willy Tarreau87b09662015-04-03 00:22:06 +02001199 pool_free2(pool2_stream, s);
Willy Tarreau8baf9062015-04-05 00:46:36 +02001200 out_free_task:
1201 task_free(t);
Willy Tarreau15b5e142015-04-04 14:38:25 +02001202 out_free_sess:
Willy Tarreau11c36242015-04-04 15:54:03 +02001203 session_free(sess);
Willy Tarreaud990baf2015-04-05 00:32:03 +02001204 out_free_appctx:
1205 appctx_free(appctx);
Emeric Brun2b920a12010-09-23 18:30:22 +02001206 out_close:
1207 return s;
1208}
1209
1210/*
1211 * Task processing function to manage re-connect and peer session
1212 * tasks wakeup on local update.
1213 */
Simon Horman96553772011-06-08 09:18:51 +09001214static struct task *process_peer_sync(struct task * task)
Emeric Brun2b920a12010-09-23 18:30:22 +02001215{
1216 struct shared_table *st = (struct shared_table *)task->context;
1217 struct peer_session *ps;
1218
1219 task->expire = TICK_ETERNITY;
1220
1221 if (!stopping) {
1222 /* Normal case (not soft stop)*/
1223 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL) &&
1224 (!nb_oldpids || tick_is_expired(st->resync_timeout, now_ms)) &&
1225 !(st->flags & SHTABLE_F_RESYNC_ASSIGN)) {
1226 /* Resync from local peer needed
1227 no peer was assigned for the lesson
1228 and no old local peer found
1229 or resync timeout expire */
1230
1231 /* flag no more resync from local, to try resync from remotes */
1232 st->flags |= SHTABLE_F_RESYNC_LOCAL;
1233
1234 /* reschedule a resync */
1235 st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
1236 }
1237
1238 /* For each session */
1239 for (ps = st->sessions; ps; ps = ps->next) {
1240 /* For each remote peers */
1241 if (!ps->peer->local) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001242 if (!ps->stream) {
1243 /* no active stream */
Emeric Brun2b920a12010-09-23 18:30:22 +02001244 if (ps->statuscode == 0 ||
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001245 ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
1246 ((ps->statuscode == PEER_SESS_SC_CONNECTCODE ||
1247 ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
Emeric Brun2b920a12010-09-23 18:30:22 +02001248 tick_is_expired(ps->reconnect, now_ms))) {
1249 /* connection never tried
Willy Tarreau87b09662015-04-03 00:22:06 +02001250 * or previous stream established with success
1251 * or previous stream failed during connection
Emeric Brun2b920a12010-09-23 18:30:22 +02001252 * and reconnection timer is expired */
1253
1254 /* retry a connect */
Willy Tarreau87b09662015-04-03 00:22:06 +02001255 ps->stream = peer_session_create(ps->peer, ps);
Emeric Brun2b920a12010-09-23 18:30:22 +02001256 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001257 else if (ps->statuscode == PEER_SESS_SC_CONNECTCODE ||
1258 ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) {
Emeric Brun2b920a12010-09-23 18:30:22 +02001259 /* If previous session failed during connection
1260 * but reconnection timer is not expired */
1261
1262 /* reschedule task for reconnect */
1263 task->expire = tick_first(task->expire, ps->reconnect);
1264 }
1265 /* else do nothing */
Willy Tarreau87b09662015-04-03 00:22:06 +02001266 } /* !ps->stream */
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001267 else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001268 /* current stream is active and established */
Emeric Brun2b920a12010-09-23 18:30:22 +02001269 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
1270 !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
1271 !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
1272 /* Resync from a remote is needed
1273 * and no peer was assigned for lesson
1274 * and current peer may be up2date */
1275
1276 /* assign peer for the lesson */
1277 ps->flags |= PEER_F_LEARN_ASSIGN;
1278 st->flags |= SHTABLE_F_RESYNC_ASSIGN;
1279
Willy Tarreau87b09662015-04-03 00:22:06 +02001280 /* awake peer stream task to handle a request of resync */
1281 task_wakeup(ps->stream->task, TASK_WOKEN_MSG);
Emeric Brun2b920a12010-09-23 18:30:22 +02001282 }
1283 else if ((int)(ps->pushed - ps->table->table->localupdate) < 0) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001284 /* awake peer stream task to push local updates */
1285 task_wakeup(ps->stream->task, TASK_WOKEN_MSG);
Emeric Brun2b920a12010-09-23 18:30:22 +02001286 }
1287 /* else do nothing */
1288 } /* SUCCESSCODE */
1289 } /* !ps->peer->local */
1290 } /* for */
1291
1292 /* Resync from remotes expired: consider resync is finished */
1293 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
1294 !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
1295 tick_is_expired(st->resync_timeout, now_ms)) {
1296 /* Resync from remote peer needed
1297 * no peer was assigned for the lesson
1298 * and resync timeout expire */
1299
1300 /* flag no more resync from remote, consider resync is finished */
1301 st->flags |= SHTABLE_F_RESYNC_REMOTE;
1302 }
1303
1304 if ((st->flags & SHTABLE_RESYNC_STATEMASK) != SHTABLE_RESYNC_FINISHED) {
1305 /* Resync not finished*/
1306 /* reschedule task to resync timeout, to ended resync if needed */
1307 task->expire = tick_first(task->expire, st->resync_timeout);
1308 }
1309 } /* !stopping */
1310 else {
1311 /* soft stop case */
1312 if (task->state & TASK_WOKEN_SIGNAL) {
1313 /* We've just recieved the signal */
1314 if (!(st->flags & SHTABLE_F_DONOTSTOP)) {
1315 /* add DO NOT STOP flag if not present */
1316 jobs++;
1317 st->flags |= SHTABLE_F_DONOTSTOP;
Willy Tarreau3a925c12013-09-04 17:54:01 +02001318 st->table->syncing++;
Emeric Brun2b920a12010-09-23 18:30:22 +02001319 }
1320
1321 /* disconnect all connected peers */
1322 for (ps = st->sessions; ps; ps = ps->next) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001323 if (ps->stream) {
1324 peer_session_forceshutdown(ps->stream);
1325 ps->stream = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001326 }
1327 }
1328 }
1329 ps = st->local_session;
1330
1331 if (ps->flags & PEER_F_TEACH_COMPLETE) {
1332 if (st->flags & SHTABLE_F_DONOTSTOP) {
1333 /* resync of new process was complete, current process can die now */
1334 jobs--;
1335 st->flags &= ~SHTABLE_F_DONOTSTOP;
Willy Tarreau3a925c12013-09-04 17:54:01 +02001336 st->table->syncing--;
Emeric Brun2b920a12010-09-23 18:30:22 +02001337 }
1338 }
Willy Tarreau87b09662015-04-03 00:22:06 +02001339 else if (!ps->stream) {
1340 /* If stream is not active */
Emeric Brun2b920a12010-09-23 18:30:22 +02001341 if (ps->statuscode == 0 ||
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001342 ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
1343 ps->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
1344 ps->statuscode == PEER_SESS_SC_TRYAGAIN) {
Emeric Brun2b920a12010-09-23 18:30:22 +02001345 /* connection never tried
Willy Tarreau87b09662015-04-03 00:22:06 +02001346 * or previous stream was successfully established
1347 * or previous stream tcp connect success but init state incomplete
Emeric Brun2b920a12010-09-23 18:30:22 +02001348 * or during previous connect, peer replies a try again statuscode */
1349
1350 /* connect to the peer */
Willy Tarreau87b09662015-04-03 00:22:06 +02001351 ps->stream = peer_session_create(ps->peer, ps);
Emeric Brun2b920a12010-09-23 18:30:22 +02001352 }
1353 else {
1354 /* Other error cases */
1355 if (st->flags & SHTABLE_F_DONOTSTOP) {
1356 /* unable to resync new process, current process can die now */
1357 jobs--;
1358 st->flags &= ~SHTABLE_F_DONOTSTOP;
Willy Tarreau3a925c12013-09-04 17:54:01 +02001359 st->table->syncing--;
Emeric Brun2b920a12010-09-23 18:30:22 +02001360 }
1361 }
1362 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001363 else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE &&
Emeric Brun2b920a12010-09-23 18:30:22 +02001364 (int)(ps->pushed - ps->table->table->localupdate) < 0) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001365 /* current stream active and established
1366 awake stream to push remaining local updates */
1367 task_wakeup(ps->stream->task, TASK_WOKEN_MSG);
Emeric Brun2b920a12010-09-23 18:30:22 +02001368 }
1369 } /* stopping */
1370 /* Wakeup for re-connect */
1371 return task;
1372}
1373
1374/*
1375 * Function used to register a table for sync on a group of peers
1376 *
1377 */
1378void peers_register_table(struct peers *peers, struct stktable *table)
1379{
1380 struct shared_table *st;
1381 struct peer * curpeer;
1382 struct peer_session *ps;
Willy Tarreau4348fad2012-09-20 16:48:07 +02001383 struct listener *listener;
Emeric Brun2b920a12010-09-23 18:30:22 +02001384
1385 st = (struct shared_table *)calloc(1,sizeof(struct shared_table));
1386 st->table = table;
1387 st->next = peers->tables;
1388 st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
1389 peers->tables = st;
1390
1391 for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) {
1392 ps = (struct peer_session *)calloc(1,sizeof(struct peer_session));
1393 ps->table = st;
1394 ps->peer = curpeer;
1395 if (curpeer->local)
1396 st->local_session = ps;
1397 ps->next = st->sessions;
1398 ps->reconnect = now_ms;
1399 st->sessions = ps;
1400 peers->peers_fe->maxconn += 3;
1401 }
1402
Willy Tarreau4348fad2012-09-20 16:48:07 +02001403 list_for_each_entry(listener, &peers->peers_fe->conf.listeners, by_fe)
1404 listener->maxconn = peers->peers_fe->maxconn;
Emeric Brun2b920a12010-09-23 18:30:22 +02001405 st->sync_task = task_new();
1406 st->sync_task->process = process_peer_sync;
1407 st->sync_task->expire = TICK_ETERNITY;
1408 st->sync_task->context = (void *)st;
1409 table->sync_task =st->sync_task;
1410 signal_register_task(0, table->sync_task, 0);
1411 task_wakeup(st->sync_task, TASK_WOKEN_INIT);
1412}
1413