/*
 * Stick table synchro management.
 *
 * Copyright 2010 EXCELIANCE, Emeric Brun <ebrun@exceliance.fr>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 *
 */
12
13#include <errno.h>
14#include <fcntl.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <sys/socket.h>
20#include <sys/stat.h>
21#include <sys/types.h>
22
23#include <common/compat.h>
24#include <common/config.h>
25#include <common/time.h>
26
27#include <types/global.h>
Willy Tarreau3fdb3662012-11-12 00:42:33 +010028#include <types/listener.h>
29#include <types/obj_type.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020030#include <types/peers.h>
31
32#include <proto/acl.h>
Willy Tarreauc7e42382012-08-24 19:22:53 +020033#include <proto/channel.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020034#include <proto/fd.h>
Willy Tarreaud1d48d42015-03-13 16:15:46 +010035#include <proto/frontend.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020036#include <proto/log.h>
37#include <proto/hdr_idx.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020038#include <proto/proto_tcp.h>
39#include <proto/proto_http.h>
40#include <proto/proxy.h>
Willy Tarreaufeb76402015-04-03 14:10:06 +020041#include <proto/session.h>
Willy Tarreau87b09662015-04-03 00:22:06 +020042#include <proto/stream.h>
Willy Tarreau22ec1ea2014-11-27 20:45:39 +010043#include <proto/signal.h>
44#include <proto/stick_table.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020045#include <proto/stream_interface.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020046#include <proto/task.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020047
48
49/*******************************/
50/* Current peer learning state */
51/*******************************/
52
/******************************/
/* Current table resync state */
/******************************/
#define SHTABLE_F_RESYNC_LOCAL    0x00000001 /* Learn from local finished or no more needed */
#define SHTABLE_F_RESYNC_REMOTE   0x00000002 /* Learn from remote finished or no more needed */
#define SHTABLE_F_RESYNC_ASSIGN   0x00000004 /* A peer was assigned to learn our lesson */
#define SHTABLE_F_RESYNC_PROCESS  0x00000008 /* The assigned peer was requested for resync */
#define SHTABLE_F_DONOTSTOP       0x00010000 /* Main table sync task blocks the process during
                                              * soft stop to push data to the new process */

/*
 * The resync state is the pair (RESYNC_LOCAL, RESYNC_REMOTE):
 *   - neither bit set : still has to learn from the local peer
 *   - LOCAL only      : local learning done, still has to learn from a remote peer
 *   - both bits set   : resync finished
 */
#define SHTABLE_RESYNC_STATEMASK  (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)
#define SHTABLE_RESYNC_FROMLOCAL  0x00000000
#define SHTABLE_RESYNC_FROMREMOTE SHTABLE_F_RESYNC_LOCAL
#define SHTABLE_RESYNC_FINISHED   (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)
67
/******************************/
/* Remote peer teaching state */
/******************************/
#define PEER_F_TEACH_PROCESS    0x00000001 /* Teach a lesson to current peer */
#define PEER_F_TEACH_STAGE1     0x00000002 /* Teach stage 1 complete */
#define PEER_F_TEACH_STAGE2     0x00000004 /* Teach stage 2 complete */
#define PEER_F_TEACH_FINISHED   0x00000008 /* Teaching concluded (wait for confirm) */
#define PEER_F_TEACH_COMPLETE   0x00000010 /* All that we know already taught to current peer, used only for a local peer */
#define PEER_F_LEARN_ASSIGN     0x00000100 /* Current peer was assigned for a lesson */
#define PEER_F_LEARN_NOTUP2DATE 0x00000200 /* Learn from peer finished but peer is not up to date */

/*
 * Masks to AND into the peer flags to clear the teaching, respectively the
 * learning, flags. PEER_F_TEACH_COMPLETE is deliberately left out of
 * PEER_TEACH_RESET: it should never be reset. Both expansions are fully
 * parenthesised so that they stay a single operand wherever they are used.
 */
#define PEER_TEACH_RESET        (~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_STAGE1|PEER_F_TEACH_STAGE2|PEER_F_TEACH_FINISHED))
#define PEER_LEARN_RESET        (~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE))
81
82
/**********************************/
/* Peer Session IO handler states */
/**********************************/

/*
 * The order of these states matters: the release callback treats any state
 * below PEER_SESS_ST_SENDSUCCESS as "the applet context pointer is not yet a
 * peer session".
 */
enum {
	PEER_SESS_ST_ACCEPT = 0,     /* Initial state for a session created by an accept, must be zero! */
	PEER_SESS_ST_GETVERSION,     /* Validate supported protocol version */
	PEER_SESS_ST_GETHOST,        /* Validate host ID corresponds to local host id */
	PEER_SESS_ST_GETPEER,        /* Validate peer ID corresponds to a known remote peer id */
	PEER_SESS_ST_GETTABLE,       /* Search the registered tables for a table with same id and validate type and size */
	/* after this point, data were possibly exchanged */
	PEER_SESS_ST_SENDSUCCESS,    /* Send ret code 200 (success) and wait for message */
	PEER_SESS_ST_CONNECT,        /* Initial state for a session created on a connect, push presentation into buffer */
	PEER_SESS_ST_GETSTATUS,      /* Wait for the welcome message */
	PEER_SESS_ST_WAITMSG,        /* Wait for data messages */
	PEER_SESS_ST_EXIT,           /* Exit with status code */
	PEER_SESS_ST_END,            /* Killed session */
};
Emeric Brun2b920a12010-09-23 18:30:22 +0200101
/***************************************************/
/* Peer Session status code - part of the protocol */
/***************************************************/

/* 1xx: connection establishment */
#define PEER_SESS_SC_CONNECTCODE    100 /* connect in progress */
#define PEER_SESS_SC_CONNECTEDCODE  110 /* tcp connect success */

/* 2xx: success */
#define PEER_SESS_SC_SUCCESSCODE    200 /* accept or connect successful */

/* 3xx: temporary refusal */
#define PEER_SESS_SC_TRYAGAIN       300 /* try again later */

/* 5xx: errors */
#define PEER_SESS_SC_ERRPROTO       501 /* error protocol */
#define PEER_SESS_SC_ERRVERSION     502 /* unknown protocol version */
#define PEER_SESS_SC_ERRHOST        503 /* bad host name */
#define PEER_SESS_SC_ERRPEER        504 /* unknown peer */
#define PEER_SESS_SC_ERRTYPE        505 /* table key type mismatch */
#define PEER_SESS_SC_ERRSIZE        506 /* table key size mismatch */
#define PEER_SESS_SC_ERRTABLE       507 /* unknown table */
Emeric Brun2b920a12010-09-23 18:30:22 +0200120
/* Protocol name; the hello line is this name followed by " 1.0". */
#define PEER_SESSION_PROTO_NAME "HAProxyS"

/* Global peers pointer, NULL until it is set elsewhere. */
struct peers *peers = NULL;

/* Defined later in this file; needed by the IO handler before its definition. */
static void peer_session_forceshutdown(struct stream *stream);
Emeric Brun2b920a12010-09-23 18:30:22 +0200125
126
127/*
128 * This prepare the data update message of the stick session <ts>, <ps> is the the peer session
129 * where the data going to be pushed, <msg> is a buffer of <size> to recieve data message content
130 */
Simon Horman96553772011-06-08 09:18:51 +0900131static int peer_prepare_datamsg(struct stksess *ts, struct peer_session *ps, char *msg, size_t size)
Emeric Brun2b920a12010-09-23 18:30:22 +0200132{
133 uint32_t netinteger;
134 int len;
135 /* construct message */
136 if (ps->lastpush && ts->upd.key > ps->lastpush && (ts->upd.key - ps->lastpush) <= 127) {
137 msg[0] = 0x80 + ts->upd.key - ps->lastpush;
138 len = sizeof(char);
139 }
140 else {
141 msg[0] = 'D';
142 netinteger = htonl(ts->upd.key);
143 memcpy(&msg[sizeof(char)], &netinteger, sizeof(netinteger));
144 len = sizeof(char) + sizeof(netinteger);
145 }
146
147 if (ps->table->table->type == STKTABLE_TYPE_STRING) {
148 int stlen = strlen((char *)ts->key.key);
149
150 netinteger = htonl(strlen((char *)ts->key.key));
151 memcpy(&msg[len], &netinteger, sizeof(netinteger));
152 memcpy(&msg[len+sizeof(netinteger)], ts->key.key, stlen);
153 len += sizeof(netinteger) + stlen;
154
155 }
156 else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
157 netinteger = htonl(*((uint32_t *)ts->key.key));
158 memcpy(&msg[len], &netinteger, sizeof(netinteger));
159 len += sizeof(netinteger);
160 }
161 else {
162 memcpy(&msg[len], ts->key.key, ps->table->table->key_size);
163 len += ps->table->table->key_size;
164 }
165
166 if (stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
167 netinteger = htonl(stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id));
168 else
169 netinteger = 0;
170
171 memcpy(&msg[len], &netinteger , sizeof(netinteger));
172 len += sizeof(netinteger);
173
174 return len;
175}
176
177
178/*
179 * Callback to release a session with a peer
180 */
Simon Horman96553772011-06-08 09:18:51 +0900181static void peer_session_release(struct stream_interface *si)
Emeric Brun2b920a12010-09-23 18:30:22 +0200182{
Willy Tarreau87b09662015-04-03 00:22:06 +0200183 struct stream *s = si_strm(si);
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100184 struct appctx *appctx = objt_appctx(si->end);
185 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200186
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100187 /* appctx->ctx.peers.ptr is not a peer session */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100188 if (appctx->st0 < PEER_SESS_ST_SENDSUCCESS)
Emeric Brun2b920a12010-09-23 18:30:22 +0200189 return;
190
191 /* peer session identified */
192 if (ps) {
Willy Tarreau87b09662015-04-03 00:22:06 +0200193 if (ps->stream == s) {
194 ps->stream = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200195 if (ps->flags & PEER_F_LEARN_ASSIGN) {
196 /* unassign current peer for learning */
197 ps->flags &= ~(PEER_F_LEARN_ASSIGN);
198 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
199
200 /* reschedule a resync */
201 ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
202 }
203 /* reset teaching and learning flags to 0 */
204 ps->flags &= PEER_TEACH_RESET;
205 ps->flags &= PEER_LEARN_RESET;
206 }
207 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
208 }
209}
210
211
/*
 * IO Handler to handle message exchange with a peer
 */
Willy Tarreaub24281b2011-02-13 13:16:36 +0100215static void peer_io_handler(struct stream_interface *si)
Emeric Brun2b920a12010-09-23 18:30:22 +0200216{
Willy Tarreau87b09662015-04-03 00:22:06 +0200217 struct stream *s = si_strm(si);
Willy Tarreaud0d8da92015-04-04 02:10:38 +0200218 struct peers *curpeers = (struct peers *)strm_fe(s)->parent;
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100219 struct appctx *appctx = objt_appctx(si->end);
Emeric Brun2b920a12010-09-23 18:30:22 +0200220 int reql = 0;
221 int repl = 0;
222
223 while (1) {
224switchstate:
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100225 switch(appctx->st0) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100226 case PEER_SESS_ST_ACCEPT:
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100227 appctx->ctx.peers.ptr = NULL;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100228 appctx->st0 = PEER_SESS_ST_GETVERSION;
Emeric Brun2b920a12010-09-23 18:30:22 +0200229 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100230 case PEER_SESS_ST_GETVERSION:
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100231 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200232 if (reql <= 0) { /* closed or EOL not found */
233 if (reql == 0)
234 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100235 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200236 goto switchstate;
237 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100238 if (trash.str[reql-1] != '\n') {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100239 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200240 goto switchstate;
241 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100242 else if (reql > 1 && (trash.str[reql-2] == '\r'))
243 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200244 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100245 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200246
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100247 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200248
249 /* test version */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100250 if (strcmp(PEER_SESSION_PROTO_NAME " 1.0", trash.str) != 0) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100251 appctx->st0 = PEER_SESS_ST_EXIT;
252 appctx->st1 = PEER_SESS_SC_ERRVERSION;
Emeric Brun2b920a12010-09-23 18:30:22 +0200253 /* test protocol */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100254 if (strncmp(PEER_SESSION_PROTO_NAME " ", trash.str, strlen(PEER_SESSION_PROTO_NAME)+1) != 0)
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100255 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200256 goto switchstate;
257 }
258
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100259 appctx->st0 = PEER_SESS_ST_GETHOST;
Emeric Brun2b920a12010-09-23 18:30:22 +0200260 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100261 case PEER_SESS_ST_GETHOST:
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100262 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200263 if (reql <= 0) { /* closed or EOL not found */
264 if (reql == 0)
265 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100266 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200267 goto switchstate;
268 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100269 if (trash.str[reql-1] != '\n') {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100270 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200271 goto switchstate;
272 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100273 else if (reql > 1 && (trash.str[reql-2] == '\r'))
274 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200275 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100276 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200277
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100278 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200279
280 /* test hostname match */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100281 if (strcmp(localpeer, trash.str) != 0) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100282 appctx->st0 = PEER_SESS_ST_EXIT;
283 appctx->st1 = PEER_SESS_SC_ERRHOST;
Emeric Brun2b920a12010-09-23 18:30:22 +0200284 goto switchstate;
285 }
286
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100287 appctx->st0 = PEER_SESS_ST_GETPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200288 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100289 case PEER_SESS_ST_GETPEER: {
Emeric Brun2b920a12010-09-23 18:30:22 +0200290 struct peer *curpeer;
291 char *p;
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100292 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200293 if (reql <= 0) { /* closed or EOL not found */
294 if (reql == 0)
295 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100296 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200297 goto switchstate;
298 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100299 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200300 /* Incomplete line, we quit */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100301 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200302 goto switchstate;
303 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100304 else if (reql > 1 && (trash.str[reql-2] == '\r'))
305 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200306 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100307 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200308
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100309 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200310
311 /* parse line "<peer name> <pid>" */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100312 p = strchr(trash.str, ' ');
Emeric Brun2b920a12010-09-23 18:30:22 +0200313 if (!p) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100314 appctx->st0 = PEER_SESS_ST_EXIT;
315 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200316 goto switchstate;
317 }
318 *p = 0;
319
320 /* lookup known peer */
321 for (curpeer = curpeers->remote; curpeer; curpeer = curpeer->next) {
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100322 if (strcmp(curpeer->id, trash.str) == 0)
Emeric Brun2b920a12010-09-23 18:30:22 +0200323 break;
324 }
325
326 /* if unknown peer */
327 if (!curpeer) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100328 appctx->st0 = PEER_SESS_ST_EXIT;
329 appctx->st1 = PEER_SESS_SC_ERRPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200330 goto switchstate;
331 }
332
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100333 appctx->ctx.peers.ptr = curpeer;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100334 appctx->st0 = PEER_SESS_ST_GETTABLE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200335 /* fall through */
336 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100337 case PEER_SESS_ST_GETTABLE: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100338 struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200339 struct shared_table *st;
340 struct peer_session *ps = NULL;
341 unsigned long key_type;
342 size_t key_size;
343 char *p;
344
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100345 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200346 if (reql <= 0) { /* closed or EOL not found */
347 if (reql == 0)
348 goto out;
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100349 appctx->ctx.peers.ptr = NULL;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100350 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200351 goto switchstate;
352 }
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100353 /* Re init appctx->ctx.peers.ptr to null, to handle correctly a release case */
354 appctx->ctx.peers.ptr = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200355
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100356 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200357 /* Incomplete line, we quit */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100358 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200359 goto switchstate;
360 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100361 else if (reql > 1 && (trash.str[reql-2] == '\r'))
362 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200363 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100364 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200365
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100366 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200367
368 /* Parse line "<table name> <type> <size>" */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100369 p = strchr(trash.str, ' ');
Emeric Brun2b920a12010-09-23 18:30:22 +0200370 if (!p) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100371 appctx->st0 = PEER_SESS_ST_EXIT;
372 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200373 goto switchstate;
374 }
375 *p = 0;
376 key_type = (unsigned long)atol(p+1);
377
378 p = strchr(p+1, ' ');
379 if (!p) {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100380 appctx->ctx.peers.ptr = NULL;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100381 appctx->st0 = PEER_SESS_ST_EXIT;
382 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200383 goto switchstate;
384 }
385
386 key_size = (size_t)atoi(p);
387 for (st = curpeers->tables; st; st = st->next) {
388 /* If table name matches */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100389 if (strcmp(st->table->id, trash.str) == 0) {
Willy Tarreau86a446e2013-11-25 23:02:37 +0100390 /* Check key size mismatches, except for strings
391 * which may be truncated as long as they fit in
392 * a buffer.
393 */
394 if (key_size != st->table->key_size &&
395 (key_type != STKTABLE_TYPE_STRING ||
396 1 + 4 + 4 + key_size - 1 >= trash.size)) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100397 appctx->st0 = PEER_SESS_ST_EXIT;
398 appctx->st1 = PEER_SESS_SC_ERRSIZE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200399 goto switchstate;
400 }
401
402 /* If key type mismatches */
403 if (key_type != st->table->type) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100404 appctx->st0 = PEER_SESS_ST_EXIT;
405 appctx->st1 = PEER_SESS_SC_ERRTYPE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200406 goto switchstate;
407 }
408
Willy Tarreau87b09662015-04-03 00:22:06 +0200409 /* lookup peer stream of current peer */
Emeric Brun2b920a12010-09-23 18:30:22 +0200410 for (ps = st->sessions; ps; ps = ps->next) {
411 if (ps->peer == curpeer) {
Willy Tarreau87b09662015-04-03 00:22:06 +0200412 /* If stream already active, replaced by new one */
413 if (ps->stream && ps->stream != s) {
Emeric Brun2b920a12010-09-23 18:30:22 +0200414 if (ps->peer->local) {
415 /* Local connection, reply a retry */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100416 appctx->st0 = PEER_SESS_ST_EXIT;
417 appctx->st1 = PEER_SESS_SC_TRYAGAIN;
Emeric Brun2b920a12010-09-23 18:30:22 +0200418 goto switchstate;
419 }
Willy Tarreau87b09662015-04-03 00:22:06 +0200420 peer_session_forceshutdown(ps->stream);
Emeric Brun2b920a12010-09-23 18:30:22 +0200421 }
Willy Tarreau87b09662015-04-03 00:22:06 +0200422 ps->stream = s;
Emeric Brun2b920a12010-09-23 18:30:22 +0200423 break;
424 }
425 }
426 break;
427 }
428 }
429
430 /* If table not found */
431 if (!st){
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100432 appctx->st0 = PEER_SESS_ST_EXIT;
433 appctx->st1 = PEER_SESS_SC_ERRTABLE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200434 goto switchstate;
435 }
436
437 /* If no peer session for current peer */
438 if (!ps) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100439 appctx->st0 = PEER_SESS_ST_EXIT;
440 appctx->st1 = PEER_SESS_SC_ERRPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200441 goto switchstate;
442 }
443
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100444 appctx->ctx.peers.ptr = ps;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100445 appctx->st0 = PEER_SESS_ST_SENDSUCCESS;
Emeric Brun2b920a12010-09-23 18:30:22 +0200446 /* fall through */
447 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100448 case PEER_SESS_ST_SENDSUCCESS: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100449 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200450
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100451 repl = snprintf(trash.str, trash.size, "%d\n", PEER_SESS_SC_SUCCESSCODE);
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100452 repl = bi_putblk(si_ic(si), trash.str, repl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200453 if (repl <= 0) {
454 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100455 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100456 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200457 goto switchstate;
458 }
459
460 /* Register status code */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100461 ps->statuscode = PEER_SESS_SC_SUCCESSCODE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200462
463 /* Awake main task */
464 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
465
466 /* Init cursors */
467 ps->teaching_origin =ps->lastpush = ps->lastack = ps->pushack = 0;
468 ps->pushed = ps->update;
469
470 /* Init confirm counter */
471 ps->confirm = 0;
472
473 /* reset teaching and learning flags to 0 */
474 ps->flags &= PEER_TEACH_RESET;
475 ps->flags &= PEER_LEARN_RESET;
476
477 /* if current peer is local */
478 if (ps->peer->local) {
479 /* if table need resyncfrom local and no process assined */
480 if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL &&
481 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
482 /* assign local peer for a lesson, consider lesson already requested */
483 ps->flags |= PEER_F_LEARN_ASSIGN;
484 ps->table->flags |= (SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
485 }
486
487 }
488 else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
489 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
490 /* assign peer for a lesson */
491 ps->flags |= PEER_F_LEARN_ASSIGN;
492 ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
493 }
494 /* switch to waiting message state */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100495 appctx->st0 = PEER_SESS_ST_WAITMSG;
Emeric Brun2b920a12010-09-23 18:30:22 +0200496 goto switchstate;
497 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100498 case PEER_SESS_ST_CONNECT: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100499 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200500
501 /* Send headers */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100502 repl = snprintf(trash.str, trash.size,
Emeric Brun2b920a12010-09-23 18:30:22 +0200503 PEER_SESSION_PROTO_NAME " 1.0\n%s\n%s %d\n%s %lu %d\n",
504 ps->peer->id,
505 localpeer,
Willy Tarreau7b77c9f2012-01-07 22:52:12 +0100506 (int)getpid(),
Emeric Brun2b920a12010-09-23 18:30:22 +0200507 ps->table->table->id,
508 ps->table->table->type,
Willy Tarreaubd55e312010-11-11 10:55:09 +0100509 (int)ps->table->table->key_size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200510
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100511 if (repl >= trash.size) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100512 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200513 goto switchstate;
514 }
515
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100516 repl = bi_putblk(si_ic(si), trash.str, repl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200517 if (repl <= 0) {
518 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100519 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100520 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200521 goto switchstate;
522 }
523
524 /* switch to the waiting statuscode state */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100525 appctx->st0 = PEER_SESS_ST_GETSTATUS;
Emeric Brun2b920a12010-09-23 18:30:22 +0200526 /* fall through */
527 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100528 case PEER_SESS_ST_GETSTATUS: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100529 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200530
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100531 if (si_ic(si)->flags & CF_WRITE_PARTIAL)
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100532 ps->statuscode = PEER_SESS_SC_CONNECTEDCODE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200533
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100534 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200535 if (reql <= 0) { /* closed or EOL not found */
536 if (reql == 0)
537 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100538 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200539 goto switchstate;
540 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100541 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200542 /* Incomplete line, we quit */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100543 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200544 goto switchstate;
545 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100546 else if (reql > 1 && (trash.str[reql-2] == '\r'))
547 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200548 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100549 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200550
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100551 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200552
553 /* Register status code */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100554 ps->statuscode = atoi(trash.str);
Emeric Brun2b920a12010-09-23 18:30:22 +0200555
556 /* Awake main task */
557 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
558
559 /* If status code is success */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100560 if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
Emeric Brun2b920a12010-09-23 18:30:22 +0200561 /* Init cursors */
562 ps->teaching_origin = ps->lastpush = ps->lastack = ps->pushack = 0;
563 ps->pushed = ps->update;
564
565 /* Init confirm counter */
566 ps->confirm = 0;
567
568 /* reset teaching and learning flags to 0 */
569 ps->flags &= PEER_TEACH_RESET;
570 ps->flags &= PEER_LEARN_RESET;
571
572 /* If current peer is local */
573 if (ps->peer->local) {
574 /* Init cursors to push a resync */
575 ps->teaching_origin = ps->pushed = ps->table->table->update;
576 /* flag to start to teach lesson */
577 ps->flags |= PEER_F_TEACH_PROCESS;
578
579 }
580 else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
581 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
582 /* If peer is remote and resync from remote is needed,
583 and no peer currently assigned */
584
585 /* assign peer for a lesson */
586 ps->flags |= PEER_F_LEARN_ASSIGN;
587 ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
588 }
589
590 }
591 else {
592 /* Status code is not success, abort */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100593 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200594 goto switchstate;
595 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100596 appctx->st0 = PEER_SESS_ST_WAITMSG;
Emeric Brun2b920a12010-09-23 18:30:22 +0200597 /* fall through */
598 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100599 case PEER_SESS_ST_WAITMSG: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100600 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200601 struct stksess *ts, *newts = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200602 char c;
603 int totl = 0;
604
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100605 reql = bo_getblk(si_oc(si), (char *)&c, sizeof(c), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200606 if (reql <= 0) /* closed or EOL not found */
607 goto incomplete;
608
Emeric Brun2b920a12010-09-23 18:30:22 +0200609 totl += reql;
610
611 if ((c & 0x80) || (c == 'D')) {
612 /* Here we have data message */
613 unsigned int pushack;
Emeric Brun2b920a12010-09-23 18:30:22 +0200614 int srvid;
615 uint32_t netinteger;
616
617 /* Compute update remote version */
618 if (c & 0x80) {
619 pushack = ps->pushack + (unsigned int)(c & 0x7F);
620 }
621 else {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100622 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200623 if (reql <= 0) /* closed or EOL not found */
624 goto incomplete;
625
Emeric Brun2b920a12010-09-23 18:30:22 +0200626 totl += reql;
627 pushack = ntohl(netinteger);
628 }
629
Willy Tarreau86a446e2013-11-25 23:02:37 +0100630 /* Read key. The string keys are read in two steps, the first step
631 * consists in reading whatever fits into the table directly into
632 * the pre-allocated key. The second step consists in simply
633 * draining all exceeding data. This can happen for example after a
634 * config reload with a smaller key size for the stick table than
635 * what was previously set, or when facing the impossibility to
636 * allocate a new stksess (for example when the table is full with
637 * "nopurge").
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200638 */
Emeric Brun2b920a12010-09-23 18:30:22 +0200639 if (ps->table->table->type == STKTABLE_TYPE_STRING) {
Willy Tarreau86a446e2013-11-25 23:02:37 +0100640 unsigned int to_read, to_store;
641
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200642 /* read size first */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100643 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200644 if (reql <= 0) /* closed or EOL not found */
645 goto incomplete;
646
Emeric Brun2b920a12010-09-23 18:30:22 +0200647 totl += reql;
Willy Tarreau86a446e2013-11-25 23:02:37 +0100648
649 to_store = 0;
650 to_read = ntohl(netinteger);
651
Willy Tarreau4e4292b2014-11-28 12:18:45 +0100652 if (to_read + totl > si_ob(si)->size) {
Willy Tarreau86a446e2013-11-25 23:02:37 +0100653 /* impossible to read a key this large, abort */
654 reql = -1;
Willy Tarreau72d6c162013-04-11 16:14:13 +0200655 goto incomplete;
Willy Tarreau86a446e2013-11-25 23:02:37 +0100656 }
Willy Tarreau72d6c162013-04-11 16:14:13 +0200657
Willy Tarreau86a446e2013-11-25 23:02:37 +0100658 newts = stksess_new(ps->table->table, NULL);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200659 if (newts)
Willy Tarreau86a446e2013-11-25 23:02:37 +0100660 to_store = MIN(to_read, ps->table->table->key_size - 1);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200661
Willy Tarreau86a446e2013-11-25 23:02:37 +0100662 /* we read up to two blocks, the first one goes into the key,
663 * the rest is drained into the trash.
664 */
665 if (to_store) {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100666 reql = bo_getblk(si_oc(si), (char *)newts->key.key, to_store, totl);
Willy Tarreau86a446e2013-11-25 23:02:37 +0100667 if (reql <= 0) /* closed or incomplete */
668 goto incomplete;
669 newts->key.key[reql] = 0;
670 totl += reql;
671 to_read -= reql;
672 }
673 if (to_read) {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100674 reql = bo_getblk(si_oc(si), trash.str, to_read, totl);
Willy Tarreau86a446e2013-11-25 23:02:37 +0100675 if (reql <= 0) /* closed or incomplete */
676 goto incomplete;
677 totl += reql;
678 }
Emeric Brun2b920a12010-09-23 18:30:22 +0200679 }
680 else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100681 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200682 if (reql <= 0) /* closed or EOL not found */
683 goto incomplete;
Cyril Bonté9a60ff92014-02-16 01:07:07 +0100684 newts = stksess_new(ps->table->table, NULL);
685 if (newts) {
686 netinteger = ntohl(netinteger);
687 memcpy(newts->key.key, &netinteger, sizeof(netinteger));
688 }
Emeric Brun2b920a12010-09-23 18:30:22 +0200689 totl += reql;
Emeric Brun2b920a12010-09-23 18:30:22 +0200690 }
691 else {
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200692 /* type ip or binary */
693 newts = stksess_new(ps->table->table, NULL);
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100694 reql = bo_getblk(si_oc(si), newts ? (char *)newts->key.key : trash.str, ps->table->table->key_size, totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200695 if (reql <= 0) /* closed or EOL not found */
696 goto incomplete;
Willy Tarreau72d6c162013-04-11 16:14:13 +0200697 totl += reql;
Emeric Brun2b920a12010-09-23 18:30:22 +0200698 }
699
700 /* read server id */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100701 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200702 if (reql <= 0) /* closed or EOL not found */
703 goto incomplete;
704
Emeric Brun2b920a12010-09-23 18:30:22 +0200705 totl += reql;
706 srvid = ntohl(netinteger);
707
708 /* update entry */
Emeric Brun2b920a12010-09-23 18:30:22 +0200709 if (newts) {
710 /* lookup for existing entry */
711 ts = stktable_lookup(ps->table->table, newts);
712 if (ts) {
713 /* the entry already exist, we can free ours */
714 stktable_touch(ps->table->table, ts, 0);
715 stksess_free(ps->table->table, newts);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200716 newts = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200717 }
718 else {
719 struct eb32_node *eb;
720
721 /* create new entry */
722 ts = stktable_store(ps->table->table, newts, 0);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200723 newts = NULL; /* don't reuse it */
724
Emeric Brun2b920a12010-09-23 18:30:22 +0200725 ts->upd.key= (++ps->table->table->update)+(2^31);
726 eb = eb32_insert(&ps->table->table->updates, &ts->upd);
727 if (eb != &ts->upd) {
728 eb32_delete(eb);
729 eb32_insert(&ps->table->table->updates, &ts->upd);
730 }
731 }
732
733 /* update entry */
734 if (srvid && stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
735 stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id) = srvid;
736 ps->pushack = pushack;
737 }
738
739 }
740 else if (c == 'R') {
741 /* Reset message: remote need resync */
742
743 /* reinit counters for a resync */
744 ps->lastpush = 0;
745 ps->teaching_origin = ps->pushed = ps->table->table->update;
746
747 /* reset teaching flags to 0 */
748 ps->flags &= PEER_TEACH_RESET;
749
750 /* flag to start to teach lesson */
751 ps->flags |= PEER_F_TEACH_PROCESS;
752 }
753 else if (c == 'F') {
754 /* Finish message, all known updates have been pushed by remote */
755 /* and remote is up to date */
756
757 /* If resync is in progress with remote peer */
758 if (ps->flags & PEER_F_LEARN_ASSIGN) {
759
760 /* unassign current peer for learning */
761 ps->flags &= ~PEER_F_LEARN_ASSIGN;
762 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
763
764 /* Consider table is now up2date, resync resync no more needed from local neither remote */
765 ps->table->flags |= (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE);
766 }
767 /* Increase confirm counter to launch a confirm message */
768 ps->confirm++;
769 }
770 else if (c == 'c') {
771 /* confirm message, remote peer is now up to date with us */
772
773 /* If stopping state */
774 if (stopping) {
775 /* Close session, push resync no more needed */
776 ps->flags |= PEER_F_TEACH_COMPLETE;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100777 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200778 goto switchstate;
779 }
780
781 /* reset teaching flags to 0 */
782 ps->flags &= PEER_TEACH_RESET;
783 }
784 else if (c == 'C') {
785 /* Continue message, all known updates have been pushed by remote */
786 /* but remote is not up to date */
787
788 /* If resync is in progress with current peer */
789 if (ps->flags & PEER_F_LEARN_ASSIGN) {
790
791 /* unassign current peer */
792 ps->flags &= ~PEER_F_LEARN_ASSIGN;
793 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
794
795 /* flag current peer is not up 2 date to try from an other */
796 ps->flags |= PEER_F_LEARN_NOTUP2DATE;
797
798 /* reschedule a resync */
799 ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
800 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
801 }
802 ps->confirm++;
803 }
804 else if (c == 'A') {
805 /* ack message */
806 uint32_t netinteger;
807
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100808 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200809 if (reql <= 0) /* closed or EOL not found */
810 goto incomplete;
811
Emeric Brun2b920a12010-09-23 18:30:22 +0200812 totl += reql;
813
814 /* Consider remote is up to date with "acked" version */
815 ps->update = ntohl(netinteger);
816 }
817 else {
818 /* Unknown message */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100819 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200820 goto switchstate;
821 }
822
823 /* skip consumed message */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100824 bo_skip(si_oc(si), totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200825
826 /* loop on that state to peek next message */
Willy Tarreau72d6c162013-04-11 16:14:13 +0200827 goto switchstate;
828
Emeric Brun2b920a12010-09-23 18:30:22 +0200829incomplete:
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200830 /* we get here when a bo_getblk() returns <= 0 in reql */
831
832 /* first, we may have to release newts */
833 if (newts) {
834 stksess_free(ps->table->table, newts);
835 newts = NULL;
836 }
837
Willy Tarreau72d6c162013-04-11 16:14:13 +0200838 if (reql < 0) {
839 /* there was an error */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100840 appctx->st0 = PEER_SESS_ST_END;
Willy Tarreau72d6c162013-04-11 16:14:13 +0200841 goto switchstate;
842 }
843
Emeric Brun2b920a12010-09-23 18:30:22 +0200844 /* Nothing to read, now we start to write */
845
846 /* Confirm finished or partial messages */
847 while (ps->confirm) {
848 /* There is a confirm messages to send */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100849 repl = bi_putchr(si_ic(si), 'c');
Emeric Brun2b920a12010-09-23 18:30:22 +0200850 if (repl <= 0) {
851 /* no more write possible */
852 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100853 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100854 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200855 goto switchstate;
856 }
857 ps->confirm--;
858 }
859
860 /* Need to request a resync */
861 if ((ps->flags & PEER_F_LEARN_ASSIGN) &&
862 (ps->table->flags & SHTABLE_F_RESYNC_ASSIGN) &&
863 !(ps->table->flags & SHTABLE_F_RESYNC_PROCESS)) {
864 /* Current peer was elected to request a resync */
865
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100866 repl = bi_putchr(si_ic(si), 'R');
Emeric Brun2b920a12010-09-23 18:30:22 +0200867 if (repl <= 0) {
868 /* no more write possible */
869 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100870 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100871 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200872 goto switchstate;
873 }
874 ps->table->flags |= SHTABLE_F_RESYNC_PROCESS;
875 }
876
877 /* It remains some updates to ack */
878 if (ps->pushack != ps->lastack) {
879 uint32_t netinteger;
880
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100881 trash.str[0] = 'A';
Emeric Brun2b920a12010-09-23 18:30:22 +0200882 netinteger = htonl(ps->pushack);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100883 memcpy(&trash.str[1], &netinteger, sizeof(netinteger));
Emeric Brun2b920a12010-09-23 18:30:22 +0200884
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100885 repl = bi_putblk(si_ic(si), trash.str, 1+sizeof(netinteger));
Emeric Brun2b920a12010-09-23 18:30:22 +0200886 if (repl <= 0) {
887 /* no more write possible */
888 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100889 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100890 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200891 goto switchstate;
892 }
893 ps->lastack = ps->pushack;
894 }
895
896 if (ps->flags & PEER_F_TEACH_PROCESS) {
897 /* current peer was requested for a lesson */
898
899 if (!(ps->flags & PEER_F_TEACH_STAGE1)) {
900 /* lesson stage 1 not complete */
901 struct eb32_node *eb;
902
903 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
904 while (1) {
905 int msglen;
906 struct stksess *ts;
907
908 if (!eb) {
909 /* flag lesson stage1 complete */
910 ps->flags |= PEER_F_TEACH_STAGE1;
911 eb = eb32_first(&ps->table->table->updates);
912 if (eb)
913 ps->pushed = eb->key - 1;
914 break;
915 }
916
917 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100918 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200919 if (msglen) {
920 /* message to buffer */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100921 repl = bi_putblk(si_ic(si), trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200922 if (repl <= 0) {
923 /* no more write possible */
924 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100925 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100926 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200927 goto switchstate;
928 }
929 ps->lastpush = ps->pushed = ts->upd.key;
930 }
931 eb = eb32_next(eb);
932 }
933 } /* !TEACH_STAGE1 */
934
935 if (!(ps->flags & PEER_F_TEACH_STAGE2)) {
936 /* lesson stage 2 not complete */
937 struct eb32_node *eb;
938
939 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
940 while (1) {
941 int msglen;
942 struct stksess *ts;
943
944 if (!eb || eb->key > ps->teaching_origin) {
945 /* flag lesson stage2 complete */
946 ps->flags |= PEER_F_TEACH_STAGE2;
947 ps->pushed = ps->teaching_origin;
948 break;
949 }
950
951 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100952 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200953 if (msglen) {
954 /* message to buffer */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100955 repl = bi_putblk(si_ic(si), trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200956 if (repl <= 0) {
957 /* no more write possible */
958 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100959 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100960 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200961 goto switchstate;
962 }
963 ps->lastpush = ps->pushed = ts->upd.key;
964 }
965 eb = eb32_next(eb);
966 }
967 } /* !TEACH_STAGE2 */
968
969 if (!(ps->flags & PEER_F_TEACH_FINISHED)) {
970 /* process final lesson message */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100971 repl = bi_putchr(si_ic(si), ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FINISHED) ? 'F' : 'C');
Emeric Brun2b920a12010-09-23 18:30:22 +0200972 if (repl <= 0) {
973 /* no more write possible */
974 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100975 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100976 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200977 goto switchstate;
978 }
979
980 /* flag finished message sent */
981 ps->flags |= PEER_F_TEACH_FINISHED;
982 } /* !TEACH_FINISHED */
983 } /* TEACH_PROCESS */
984
985 if (!(ps->flags & PEER_F_LEARN_ASSIGN) &&
986 (int)(ps->pushed - ps->table->table->localupdate) < 0) {
987 /* Push local updates, only if no learning in progress (to avoid ping-pong effects) */
988 struct eb32_node *eb;
989
990 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
991 while (1) {
992 int msglen;
993 struct stksess *ts;
994
995 /* push local updates */
996 if (!eb) {
997 eb = eb32_first(&ps->table->table->updates);
998 if (!eb || ((int)(eb->key - ps->pushed) <= 0)) {
999 ps->pushed = ps->table->table->localupdate;
1000 break;
1001 }
1002 }
1003
1004 if ((int)(eb->key - ps->table->table->localupdate) > 0) {
1005 ps->pushed = ps->table->table->localupdate;
1006 break;
1007 }
1008
1009 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +01001010 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +02001011 if (msglen) {
1012 /* message to buffer */
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001013 repl = bi_putblk(si_ic(si), trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +02001014 if (repl <= 0) {
1015 /* no more write possible */
1016 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +01001017 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001018 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +02001019 goto switchstate;
1020 }
1021 ps->lastpush = ps->pushed = ts->upd.key;
1022 }
1023 eb = eb32_next(eb);
1024 }
1025 } /* ! LEARN_ASSIGN */
1026 /* nothing more to do */
1027 goto out;
1028 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001029 case PEER_SESS_ST_EXIT:
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001030 repl = snprintf(trash.str, trash.size, "%d\n", appctx->st1);
Emeric Brun2b920a12010-09-23 18:30:22 +02001031
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001032 if (bi_putblk(si_ic(si), trash.str, repl) == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +01001033 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001034 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +02001035 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001036 case PEER_SESS_ST_END: {
Willy Tarreau73b013b2012-05-21 16:31:45 +02001037 si_shutw(si);
1038 si_shutr(si);
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001039 si_ic(si)->flags |= CF_READ_NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001040 goto quit;
1041 }
1042 }
1043 }
1044out:
Willy Tarreau73b013b2012-05-21 16:31:45 +02001045 si_update(si);
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001046 si_oc(si)->flags |= CF_READ_DONTWAIT;
Emeric Brun2b920a12010-09-23 18:30:22 +02001047 /* we don't want to expire timeouts while we're processing requests */
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001048 si_ic(si)->rex = TICK_ETERNITY;
1049 si_oc(si)->wex = TICK_ETERNITY;
Emeric Brun2b920a12010-09-23 18:30:22 +02001050quit:
1051 return;
Willy Tarreaubc18da12015-03-13 14:00:47 +01001052full:
1053 si->flags |= SI_FL_WAIT_ROOM;
1054 goto out;
Emeric Brun2b920a12010-09-23 18:30:22 +02001055}
1056
Willy Tarreaub24281b2011-02-13 13:16:36 +01001057static struct si_applet peer_applet = {
Willy Tarreau3fdb3662012-11-12 00:42:33 +01001058 .obj_type = OBJ_TYPE_APPLET,
Willy Tarreaub24281b2011-02-13 13:16:36 +01001059 .name = "<PEER>", /* used for logging */
1060 .fct = peer_io_handler,
Aman Gupta9a13e842012-04-02 18:57:53 -07001061 .release = peer_session_release,
Willy Tarreaub24281b2011-02-13 13:16:36 +01001062};
Emeric Brun2b920a12010-09-23 18:30:22 +02001063
1064/*
1065 * Use this function to force a close of a peer session
1066 */
Willy Tarreau87b09662015-04-03 00:22:06 +02001067static void peer_session_forceshutdown(struct stream * stream)
Emeric Brun2b920a12010-09-23 18:30:22 +02001068{
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001069 struct stream_interface *oldsi = NULL;
1070 struct appctx *appctx = NULL;
1071 int i;
Emeric Brun2b920a12010-09-23 18:30:22 +02001072
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001073 for (i = 0; i <= 1; i++) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001074 appctx = objt_appctx(stream->si[i].end);
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001075 if (!appctx)
1076 continue;
1077 if (appctx->applet != &peer_applet)
1078 continue;
1079
Willy Tarreau87b09662015-04-03 00:22:06 +02001080 oldsi = &stream->si[i];
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001081 break;
Emeric Brun2b920a12010-09-23 18:30:22 +02001082 }
1083
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001084 if (!appctx)
1085 return;
1086
Emeric Brun2b920a12010-09-23 18:30:22 +02001087 /* call release to reinit resync states if needed */
1088 peer_session_release(oldsi);
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001089 appctx->st0 = PEER_SESS_ST_END;
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001090 appctx->ctx.peers.ptr = NULL;
Willy Tarreau87b09662015-04-03 00:22:06 +02001091 task_wakeup(stream->task, TASK_WOKEN_MSG);
Emeric Brun2b920a12010-09-23 18:30:22 +02001092}
1093
Willy Tarreau91d96282015-03-13 15:47:26 +01001094/* Pre-configures a peers frontend to accept incoming connections */
1095void peers_setup_frontend(struct proxy *fe)
1096{
1097 fe->last_change = now.tv_sec;
1098 fe->cap = PR_CAP_FE;
1099 fe->maxconn = 0;
1100 fe->conn_retries = CONN_RETRIES;
1101 fe->timeout.client = MS_TO_TICKS(5000);
Willy Tarreaud1d48d42015-03-13 16:15:46 +01001102 fe->accept = frontend_accept;
Willy Tarreauf87ab942015-03-13 15:55:16 +01001103 fe->default_target = &peer_applet.obj_type;
Willy Tarreau91d96282015-03-13 15:47:26 +01001104 fe->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
1105}
1106
Emeric Brun2b920a12010-09-23 18:30:22 +02001107/*
Willy Tarreaubd55e312010-11-11 10:55:09 +01001108 * Create a new peer session in assigned state (connect will start automatically)
Emeric Brun2b920a12010-09-23 18:30:22 +02001109 */
Willy Tarreau87b09662015-04-03 00:22:06 +02001110static struct stream *peer_session_create(struct peer *peer, struct peer_session *ps)
Emeric Brun2b920a12010-09-23 18:30:22 +02001111{
Willy Tarreau4348fad2012-09-20 16:48:07 +02001112 struct listener *l = LIST_NEXT(&peer->peers->peers_fe->conf.listeners, struct listener *, by_fe);
Emeric Brun2b920a12010-09-23 18:30:22 +02001113 struct proxy *p = (struct proxy *)l->frontend; /* attached frontend */
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001114 struct appctx *appctx;
Willy Tarreau15b5e142015-04-04 14:38:25 +02001115 struct session *sess;
Willy Tarreau87b09662015-04-03 00:22:06 +02001116 struct stream *s;
Emeric Brun2b920a12010-09-23 18:30:22 +02001117 struct task *t;
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001118 struct connection *conn;
Emeric Brun2b920a12010-09-23 18:30:22 +02001119
Willy Tarreau15b5e142015-04-04 14:38:25 +02001120 sess = pool_alloc2(pool2_session);
1121 if (!sess) {
Godbach430f2912013-06-20 13:28:38 +08001122 Alert("out of memory in peer_session_create().\n");
Emeric Brun2b920a12010-09-23 18:30:22 +02001123 goto out_close;
1124 }
1125
Willy Tarreau15b5e142015-04-04 14:38:25 +02001126 sess->listener = l;
1127 sess->fe = p;
Willy Tarreau7ea671b2015-04-04 14:46:56 +02001128 sess->accept_date = date; /* user-visible date for logging */
1129 sess->tv_accept = now; /* corrected date for internal use */
Willy Tarreaub2bf8332015-04-04 15:58:58 +02001130 memset(sess->stkctr, 0, sizeof(sess->stkctr));
Willy Tarreau15b5e142015-04-04 14:38:25 +02001131
1132 if ((s = pool_alloc2(pool2_stream)) == NULL) { /* disable this proxy for a while */
1133 Alert("out of memory in peer_session_create().\n");
1134 goto out_free_sess;
1135 }
1136
Willy Tarreau87b09662015-04-03 00:22:06 +02001137 LIST_ADDQ(&streams, &s->list);
Emeric Brun2b920a12010-09-23 18:30:22 +02001138 LIST_INIT(&s->back_refs);
Willy Tarreau2d7ec462015-02-14 14:14:57 +01001139 LIST_INIT(&s->buffer_wait);
Emeric Brun2b920a12010-09-23 18:30:22 +02001140
Willy Tarreaue7dff022015-04-03 01:14:29 +02001141 s->flags = SF_ASSIGNED|SF_ADDR_SET;
Emeric Brun2b920a12010-09-23 18:30:22 +02001142
1143 /* if this session comes from a known monitoring system, we want to ignore
1144 * it as soon as possible, which means closing it immediately for TCP.
1145 */
1146 if ((t = task_new()) == NULL) { /* disable this proxy for a while */
Godbach430f2912013-06-20 13:28:38 +08001147 Alert("out of memory in peer_session_create().\n");
Willy Tarreau15b5e142015-04-04 14:38:25 +02001148 goto out_free_strm;
Emeric Brun2b920a12010-09-23 18:30:22 +02001149 }
1150
1151 ps->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001152 ps->statuscode = PEER_SESS_SC_CONNECTCODE;
Emeric Brun2b920a12010-09-23 18:30:22 +02001153
1154 t->process = l->handler;
1155 t->context = s;
1156 t->nice = l->nice;
1157
Emeric Brun2b920a12010-09-23 18:30:22 +02001158 s->task = t;
Willy Tarreau15b5e142015-04-04 14:38:25 +02001159 s->sess = sess;
Willy Tarreaue36cbcb2015-04-03 15:40:56 +02001160 s->be = s->sess->fe;
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001161 s->req.buf = s->res.buf = NULL;
Willy Tarreaucb7dd012015-04-03 22:16:32 +02001162 s->req_cap = NULL;
1163 s->res_cap = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001164
Willy Tarreaua5f5d8d2014-11-28 11:26:07 +01001165 s->si[0].flags = SI_FL_NONE;
1166 s->si[1].flags = SI_FL_ISBACK;
1167
Willy Tarreau819d3322014-11-28 12:12:34 +01001168 si_reset(&s->si[0]);
Willy Tarreau3ed35ef2013-10-24 11:51:38 +02001169 si_set_state(&s->si[0], SI_ST_EST);
1170
Willy Tarreaue36cbcb2015-04-03 15:40:56 +02001171 if (s->sess->fe->options2 & PR_O2_INDEPSTR)
Emeric Brun2b920a12010-09-23 18:30:22 +02001172 s->si[0].flags |= SI_FL_INDEP_STR;
Emeric Brun2b920a12010-09-23 18:30:22 +02001173
Willy Tarreau1fbe1c92013-12-01 09:35:41 +01001174 appctx = stream_int_register_handler(&s->si[0], &peer_applet);
1175 if (!appctx)
Willy Tarreau15b5e142015-04-04 14:38:25 +02001176 goto out_free_task;
1177
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001178 appctx->st0 = PEER_SESS_ST_CONNECT;
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001179 appctx->ctx.peers.ptr = (void *)ps;
Willy Tarreau40606ab2015-04-03 18:08:29 +02001180 s->sess->origin = &appctx->obj_type;
Emeric Brun2b920a12010-09-23 18:30:22 +02001181
Willy Tarreau819d3322014-11-28 12:12:34 +01001182 si_reset(&s->si[1]);
Willy Tarreau3ed35ef2013-10-24 11:51:38 +02001183
1184 /* initiate an outgoing connection */
1185 si_set_state(&s->si[1], SI_ST_ASS);
Emeric Brun2b920a12010-09-23 18:30:22 +02001186 s->si[1].conn_retries = p->conn_retries;
Willy Tarreau3ed35ef2013-10-24 11:51:38 +02001187
Emeric Brun2b920a12010-09-23 18:30:22 +02001188 if (s->be->options2 & PR_O2_INDEPSTR)
1189 s->si[1].flags |= SI_FL_INDEP_STR;
1190
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001191 /* automatically prepare the stream interface to connect to the
Willy Tarreaub363a1f2013-10-01 10:45:07 +02001192 * pre-initialized connection in si->conn.
1193 */
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001194 if (unlikely((conn = conn_new()) == NULL))
Willy Tarreau15b5e142015-04-04 14:38:25 +02001195 goto out_free_strm;
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001196
1197 conn_prepare(conn, peer->proto, peer->xprt);
1198 si_attach_conn(&s->si[1], conn);
1199
1200 conn->target = s->target = &s->be->obj_type;
1201 memcpy(&conn->addr.to, &peer->addr, sizeof(conn->addr.to));
Willy Tarreaub363a1f2013-10-01 10:45:07 +02001202
Willy Tarreau87b09662015-04-03 00:22:06 +02001203 stream_init_srv_conn(s);
Emeric Brun2b920a12010-09-23 18:30:22 +02001204 s->pend_pos = NULL;
1205
1206 /* init store persistence */
1207 s->store_count = 0;
Willy Tarreaud5ca9ab2013-05-28 17:40:25 +02001208 memset(s->stkctr, 0, sizeof(s->stkctr));
Emeric Brun2b920a12010-09-23 18:30:22 +02001209
1210 /* FIXME: the logs are horribly complicated now, because they are
Willy Tarreauae727bf2013-10-01 17:06:10 +02001211 * defined in <p>, <p>, and later <be> and <be>. We still initialize
1212 * a few of them to help troubleshooting (eg: show sess shows them).
Emeric Brun2b920a12010-09-23 18:30:22 +02001213 */
1214
1215 s->logs.logwait = 0;
Willy Tarreauabcd5142013-06-11 17:18:02 +02001216 s->logs.level = 0;
Willy Tarreau7ea671b2015-04-04 14:46:56 +02001217 s->logs.accept_date = sess->accept_date; /* user-visible date for logging */
1218 s->logs.tv_accept = sess->tv_accept; /* corrected date for internal use */
Emeric Brun2b920a12010-09-23 18:30:22 +02001219 s->do_log = NULL;
1220
1221 /* default error reporting function, may be changed by analysers */
1222 s->srv_error = default_srv_error;
1223
Emeric Brun2b920a12010-09-23 18:30:22 +02001224 s->uniq_id = 0;
Willy Tarreaubd833142012-05-08 15:51:44 +02001225 s->unique_id = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001226
Willy Tarreaueee5b512015-04-03 23:46:31 +02001227 s->txn = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001228
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001229 channel_init(&s->req);
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001230 s->req.flags |= CF_READ_ATTACHED; /* the producer is already connected */
Emeric Brun2b920a12010-09-23 18:30:22 +02001231
1232 /* activate default analysers enabled for this listener */
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001233 s->req.analysers = l->analysers;
Emeric Brun2b920a12010-09-23 18:30:22 +02001234
1235 /* note: this should not happen anymore since there's always at least the switching rules */
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001236 if (!s->req.analysers) {
1237 channel_auto_connect(&s->req);/* don't wait to establish connection */
1238 channel_auto_close(&s->req);/* let the producer forward close requests */
Emeric Brun2b920a12010-09-23 18:30:22 +02001239 }
1240
Willy Tarreaue36cbcb2015-04-03 15:40:56 +02001241 s->req.rto = s->sess->fe->timeout.client;
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001242 s->req.wto = s->be->timeout.server;
Emeric Brun2b920a12010-09-23 18:30:22 +02001243
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001244 channel_init(&s->res);
Willy Tarreauef573c02014-11-28 14:17:09 +01001245 s->res.flags |= CF_ISRESP;
1246
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001247 s->res.rto = s->be->timeout.server;
Willy Tarreaue36cbcb2015-04-03 15:40:56 +02001248 s->res.wto = s->sess->fe->timeout.client;
Emeric Brun2b920a12010-09-23 18:30:22 +02001249
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001250 s->req.rex = TICK_ETERNITY;
1251 s->req.wex = TICK_ETERNITY;
1252 s->req.analyse_exp = TICK_ETERNITY;
1253 s->res.rex = TICK_ETERNITY;
1254 s->res.wex = TICK_ETERNITY;
1255 s->res.analyse_exp = TICK_ETERNITY;
Emeric Brun2b920a12010-09-23 18:30:22 +02001256 t->expire = TICK_ETERNITY;
1257
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001258 s->res.flags |= CF_READ_DONTWAIT;
Willy Tarreau696a2912014-11-24 11:36:57 +01001259
Emeric Brun2b920a12010-09-23 18:30:22 +02001260 /* it is important not to call the wakeup function directly but to
1261 * pass through task_wakeup(), because this one knows how to apply
1262 * priorities to tasks.
1263 */
1264 task_wakeup(t, TASK_WOKEN_INIT);
1265
1266 l->nbconn++; /* warning! right now, it's up to the handler to decrease this */
1267 p->feconn++;/* beconn will be increased later */
1268 jobs++;
Willy Tarreaufb0afa72015-04-03 14:46:27 +02001269 if (!(s->sess->listener->options & LI_O_UNLIMITED))
Willy Tarreau3c63fd82011-09-07 18:00:47 +02001270 actconn++;
Emeric Brun2b920a12010-09-23 18:30:22 +02001271 totalconn++;
1272
1273 return s;
1274
1275 /* Error unrolling */
Willy Tarreaufeb76402015-04-03 14:10:06 +02001276 out_free_task:
Emeric Brun2b920a12010-09-23 18:30:22 +02001277 task_free(t);
Willy Tarreau15b5e142015-04-04 14:38:25 +02001278 out_free_strm:
Emeric Brun2b920a12010-09-23 18:30:22 +02001279 LIST_DEL(&s->list);
Willy Tarreau87b09662015-04-03 00:22:06 +02001280 pool_free2(pool2_stream, s);
Willy Tarreau15b5e142015-04-04 14:38:25 +02001281 out_free_sess:
Willy Tarreau11c36242015-04-04 15:54:03 +02001282 session_free(sess);
Emeric Brun2b920a12010-09-23 18:30:22 +02001283 out_close:
1284 return s;
1285}
1286
1287/*
1288 * Task processing function to manage re-connect and peer session
1289 * tasks wakeup on local update.
1290 */
Simon Horman96553772011-06-08 09:18:51 +09001291static struct task *process_peer_sync(struct task * task)
Emeric Brun2b920a12010-09-23 18:30:22 +02001292{
1293 struct shared_table *st = (struct shared_table *)task->context;
1294 struct peer_session *ps;
1295
1296 task->expire = TICK_ETERNITY;
1297
1298 if (!stopping) {
1299 /* Normal case (not soft stop)*/
1300 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL) &&
1301 (!nb_oldpids || tick_is_expired(st->resync_timeout, now_ms)) &&
1302 !(st->flags & SHTABLE_F_RESYNC_ASSIGN)) {
1303 /* Resync from local peer needed
1304 no peer was assigned for the lesson
1305 and no old local peer found
1306 or resync timeout expire */
1307
1308 /* flag no more resync from local, to try resync from remotes */
1309 st->flags |= SHTABLE_F_RESYNC_LOCAL;
1310
1311 /* reschedule a resync */
1312 st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
1313 }
1314
1315 /* For each session */
1316 for (ps = st->sessions; ps; ps = ps->next) {
1317 /* For each remote peers */
1318 if (!ps->peer->local) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001319 if (!ps->stream) {
1320 /* no active stream */
Emeric Brun2b920a12010-09-23 18:30:22 +02001321 if (ps->statuscode == 0 ||
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001322 ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
1323 ((ps->statuscode == PEER_SESS_SC_CONNECTCODE ||
1324 ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
Emeric Brun2b920a12010-09-23 18:30:22 +02001325 tick_is_expired(ps->reconnect, now_ms))) {
1326 /* connection never tried
Willy Tarreau87b09662015-04-03 00:22:06 +02001327 * or previous stream established with success
1328 * or previous stream failed during connection
Emeric Brun2b920a12010-09-23 18:30:22 +02001329 * and reconnection timer is expired */
1330
1331 /* retry a connect */
Willy Tarreau87b09662015-04-03 00:22:06 +02001332 ps->stream = peer_session_create(ps->peer, ps);
Emeric Brun2b920a12010-09-23 18:30:22 +02001333 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001334 else if (ps->statuscode == PEER_SESS_SC_CONNECTCODE ||
1335 ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) {
Emeric Brun2b920a12010-09-23 18:30:22 +02001336 /* If previous session failed during connection
1337 * but reconnection timer is not expired */
1338
1339 /* reschedule task for reconnect */
1340 task->expire = tick_first(task->expire, ps->reconnect);
1341 }
1342 /* else do nothing */
Willy Tarreau87b09662015-04-03 00:22:06 +02001343 } /* !ps->stream */
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001344 else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001345 /* current stream is active and established */
Emeric Brun2b920a12010-09-23 18:30:22 +02001346 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
1347 !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
1348 !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
1349 /* Resync from a remote is needed
1350 * and no peer was assigned for lesson
1351 * and current peer may be up2date */
1352
1353 /* assign peer for the lesson */
1354 ps->flags |= PEER_F_LEARN_ASSIGN;
1355 st->flags |= SHTABLE_F_RESYNC_ASSIGN;
1356
Willy Tarreau87b09662015-04-03 00:22:06 +02001357 /* awake peer stream task to handle a request of resync */
1358 task_wakeup(ps->stream->task, TASK_WOKEN_MSG);
Emeric Brun2b920a12010-09-23 18:30:22 +02001359 }
1360 else if ((int)(ps->pushed - ps->table->table->localupdate) < 0) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001361 /* awake peer stream task to push local updates */
1362 task_wakeup(ps->stream->task, TASK_WOKEN_MSG);
Emeric Brun2b920a12010-09-23 18:30:22 +02001363 }
1364 /* else do nothing */
1365 } /* SUCCESSCODE */
1366 } /* !ps->peer->local */
1367 } /* for */
1368
1369 /* Resync from remotes expired: consider resync is finished */
1370 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
1371 !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
1372 tick_is_expired(st->resync_timeout, now_ms)) {
1373 /* Resync from remote peer needed
1374 * no peer was assigned for the lesson
1375 * and resync timeout expire */
1376
1377 /* flag no more resync from remote, consider resync is finished */
1378 st->flags |= SHTABLE_F_RESYNC_REMOTE;
1379 }
1380
1381 if ((st->flags & SHTABLE_RESYNC_STATEMASK) != SHTABLE_RESYNC_FINISHED) {
1382 /* Resync not finished*/
1383 /* reschedule task to resync timeout, to ended resync if needed */
1384 task->expire = tick_first(task->expire, st->resync_timeout);
1385 }
1386 } /* !stopping */
1387 else {
1388 /* soft stop case */
1389 if (task->state & TASK_WOKEN_SIGNAL) {
1390 /* We've just received the signal */
1391 if (!(st->flags & SHTABLE_F_DONOTSTOP)) {
1392 /* add DO NOT STOP flag if not present */
1393 jobs++;
1394 st->flags |= SHTABLE_F_DONOTSTOP;
Willy Tarreau3a925c12013-09-04 17:54:01 +02001395 st->table->syncing++;
Emeric Brun2b920a12010-09-23 18:30:22 +02001396 }
1397
1398 /* disconnect all connected peers */
1399 for (ps = st->sessions; ps; ps = ps->next) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001400 if (ps->stream) {
1401 peer_session_forceshutdown(ps->stream);
1402 ps->stream = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001403 }
1404 }
1405 }
1406 ps = st->local_session;
1407
1408 if (ps->flags & PEER_F_TEACH_COMPLETE) {
1409 if (st->flags & SHTABLE_F_DONOTSTOP) {
1410 /* resync of new process was complete, current process can die now */
1411 jobs--;
1412 st->flags &= ~SHTABLE_F_DONOTSTOP;
Willy Tarreau3a925c12013-09-04 17:54:01 +02001413 st->table->syncing--;
Emeric Brun2b920a12010-09-23 18:30:22 +02001414 }
1415 }
Willy Tarreau87b09662015-04-03 00:22:06 +02001416 else if (!ps->stream) {
1417 /* If stream is not active */
Emeric Brun2b920a12010-09-23 18:30:22 +02001418 if (ps->statuscode == 0 ||
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001419 ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
1420 ps->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
1421 ps->statuscode == PEER_SESS_SC_TRYAGAIN) {
Emeric Brun2b920a12010-09-23 18:30:22 +02001422 /* connection never tried
Willy Tarreau87b09662015-04-03 00:22:06 +02001423 * or previous stream was successfully established
1424 * or previous stream tcp connect success but init state incomplete
Emeric Brun2b920a12010-09-23 18:30:22 +02001425 * or during previous connect, peer replies a try again statuscode */
1426
1427 /* connect to the peer */
Willy Tarreau87b09662015-04-03 00:22:06 +02001428 ps->stream = peer_session_create(ps->peer, ps);
Emeric Brun2b920a12010-09-23 18:30:22 +02001429 }
1430 else {
1431 /* Other error cases */
1432 if (st->flags & SHTABLE_F_DONOTSTOP) {
1433 /* unable to resync new process, current process can die now */
1434 jobs--;
1435 st->flags &= ~SHTABLE_F_DONOTSTOP;
Willy Tarreau3a925c12013-09-04 17:54:01 +02001436 st->table->syncing--;
Emeric Brun2b920a12010-09-23 18:30:22 +02001437 }
1438 }
1439 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001440 else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE &&
Emeric Brun2b920a12010-09-23 18:30:22 +02001441 (int)(ps->pushed - ps->table->table->localupdate) < 0) {
Willy Tarreau87b09662015-04-03 00:22:06 +02001442 /* current stream active and established
1443 awake stream to push remaining local updates */
1444 task_wakeup(ps->stream->task, TASK_WOKEN_MSG);
Emeric Brun2b920a12010-09-23 18:30:22 +02001445 }
1446 } /* stopping */
1447 /* Wakeup for re-connect */
1448 return task;
1449}
1450
1451/*
1452 * Function used to register a table for sync on a group of peers
1453 *
1454 */
1455void peers_register_table(struct peers *peers, struct stktable *table)
1456{
1457 struct shared_table *st;
1458 struct peer * curpeer;
1459 struct peer_session *ps;
Willy Tarreau4348fad2012-09-20 16:48:07 +02001460 struct listener *listener;
Emeric Brun2b920a12010-09-23 18:30:22 +02001461
1462 st = (struct shared_table *)calloc(1,sizeof(struct shared_table));
1463 st->table = table;
1464 st->next = peers->tables;
1465 st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
1466 peers->tables = st;
1467
1468 for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) {
1469 ps = (struct peer_session *)calloc(1,sizeof(struct peer_session));
1470 ps->table = st;
1471 ps->peer = curpeer;
1472 if (curpeer->local)
1473 st->local_session = ps;
1474 ps->next = st->sessions;
1475 ps->reconnect = now_ms;
1476 st->sessions = ps;
1477 peers->peers_fe->maxconn += 3;
1478 }
1479
Willy Tarreau4348fad2012-09-20 16:48:07 +02001480 list_for_each_entry(listener, &peers->peers_fe->conf.listeners, by_fe)
1481 listener->maxconn = peers->peers_fe->maxconn;
Emeric Brun2b920a12010-09-23 18:30:22 +02001482 st->sync_task = task_new();
1483 st->sync_task->process = process_peer_sync;
1484 st->sync_task->expire = TICK_ETERNITY;
1485 st->sync_task->context = (void *)st;
1486 table->sync_task =st->sync_task;
1487 signal_register_task(0, table->sync_task, 0);
1488 task_wakeup(st->sync_task, TASK_WOKEN_INIT);
1489}
1490