blob: 3cd4412727d0ea517c9c604ca87df5dc5d6c1665 [file] [log] [blame]
Emeric Brun2b920a12010-09-23 18:30:22 +02001/*
Willy Tarreaubd55e312010-11-11 10:55:09 +01002 * Stick table synchro management.
Emeric Brun2b920a12010-09-23 18:30:22 +02003 *
4 * Copyright 2010 EXCELIANCE, Emeric Brun <ebrun@exceliance.fr>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12
13#include <errno.h>
14#include <fcntl.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <sys/socket.h>
20#include <sys/stat.h>
21#include <sys/types.h>
22
23#include <common/compat.h>
24#include <common/config.h>
25#include <common/time.h>
26
27#include <types/global.h>
Willy Tarreau3fdb3662012-11-12 00:42:33 +010028#include <types/listener.h>
29#include <types/obj_type.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020030#include <types/peers.h>
31
32#include <proto/acl.h>
Willy Tarreauc7e42382012-08-24 19:22:53 +020033#include <proto/channel.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020034#include <proto/fd.h>
Willy Tarreaud1d48d42015-03-13 16:15:46 +010035#include <proto/frontend.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020036#include <proto/log.h>
37#include <proto/hdr_idx.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020038#include <proto/proto_tcp.h>
39#include <proto/proto_http.h>
40#include <proto/proxy.h>
41#include <proto/session.h>
Willy Tarreau22ec1ea2014-11-27 20:45:39 +010042#include <proto/signal.h>
43#include <proto/stick_table.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020044#include <proto/stream_interface.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020045#include <proto/task.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020046
47
48/*******************************/
49/* Current peer learning state */
50/*******************************/
51
52/******************************/
53/* Current table resync state */
54/******************************/
/*
 * Flags stored in a shared table's <flags> field. The two low bits record
 * which learning phases are over; together they form the resync state.
 */
#define SHTABLE_F_RESYNC_LOCAL      0x00000001 /* learning from the local peer is finished or no longer needed */
#define SHTABLE_F_RESYNC_REMOTE     0x00000002 /* learning from a remote peer is finished or no longer needed */
#define SHTABLE_F_RESYNC_ASSIGN     0x00000004 /* a peer has been assigned to teach us */
#define SHTABLE_F_RESYNC_PROCESS    0x00000008 /* the assigned peer has been asked for a resync */
#define SHTABLE_F_DONOTSTOP         0x00010000 /* the main table sync task blocks the process during
                                                * a soft stop in order to push data to the new process */

/* Resync state: value of (flags & SHTABLE_RESYNC_STATEMASK) */
#define SHTABLE_RESYNC_STATEMASK    (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)
#define SHTABLE_RESYNC_FROMLOCAL    0x00000000                                        /* nothing learned yet */
#define SHTABLE_RESYNC_FROMREMOTE   SHTABLE_F_RESYNC_LOCAL                            /* local phase over, remote phase pending */
#define SHTABLE_RESYNC_FINISHED     (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)  /* both phases over */
66
67/******************************/
68/* Remote peer teaching state */
69/******************************/
/* Flags stored in a peer session's <flags> field. */
#define PEER_F_TEACH_PROCESS    0x00000001 /* Teach a lesson to current peer */
#define PEER_F_TEACH_STAGE1     0x00000002 /* Teach stage 1 complete */
#define PEER_F_TEACH_STAGE2     0x00000004 /* Teach stage 2 complete */
#define PEER_F_TEACH_FINISHED   0x00000008 /* Teach concluded (waiting for confirm) */
#define PEER_F_TEACH_COMPLETE   0x00000010 /* All that we know was already taught to current peer, used only for a local peer */
#define PEER_F_LEARN_ASSIGN     0x00000100 /* Current peer was assigned for a lesson */
#define PEER_F_LEARN_NOTUP2DATE 0x00000200 /* Learn from peer finished but peer is not up to date */

/*
 * Masks to AND into <flags> in order to clear the teaching (resp. learning)
 * state. PEER_F_TEACH_COMPLETE is deliberately absent from PEER_TEACH_RESET:
 * it must never be reset. The expansions are fully parenthesised so that the
 * masks can be combined in any expression.
 */
#define PEER_TEACH_RESET        (~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_STAGE1|PEER_F_TEACH_STAGE2|PEER_F_TEACH_FINISHED))
#define PEER_LEARN_RESET        (~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE))
80
81
82/**********************************/
83/* Peer Session IO handler states */
84/**********************************/
85
/*
 * States of the peer session I/O handler (kept in the applet's <st0>).
 * The ordering matters: states below PEER_SESS_ST_SENDSUCCESS belong to the
 * handshake of an accepted connection, before any data may have been
 * exchanged.
 */
enum {
	/* session created by an accept starts here; must be zero! */
	PEER_SESS_ST_ACCEPT = 0,
	/* validate the supported protocol version */
	PEER_SESS_ST_GETVERSION,
	/* validate that the host ID corresponds to the local host id */
	PEER_SESS_ST_GETHOST,
	/* validate that the peer ID corresponds to a known remote peer id */
	PEER_SESS_ST_GETPEER,
	/* look up a registered table with the same id, validate type and size */
	PEER_SESS_ST_GETTABLE,

	/* after this point, data were possibly exchanged */

	/* send return code 200 (success) and wait for messages */
	PEER_SESS_ST_SENDSUCCESS,
	/* session created by a connect starts here; push presentation into buffer */
	PEER_SESS_ST_CONNECT,
	/* wait for the welcome message */
	PEER_SESS_ST_GETSTATUS,
	/* wait for data messages */
	PEER_SESS_ST_WAITMSG,
	/* exit with a status code */
	PEER_SESS_ST_EXIT,
	/* killed session */
	PEER_SESS_ST_END,
};
Emeric Brun2b920a12010-09-23 18:30:22 +0200100
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100101/***************************************************/
102/* Peer Session status code - part of the protocol */
103/***************************************************/
Emeric Brun2b920a12010-09-23 18:30:22 +0200104
/* 1xx: connection establishment */
#define PEER_SESS_SC_CONNECTCODE    100 /* connect in progress */
#define PEER_SESS_SC_CONNECTEDCODE  110 /* tcp connect success */

/* 2xx: success */
#define PEER_SESS_SC_SUCCESSCODE    200 /* accept or connect successful */

/* 3xx: temporary refusal */
#define PEER_SESS_SC_TRYAGAIN       300 /* try again later */

/* 5xx: errors */
#define PEER_SESS_SC_ERRPROTO       501 /* protocol error */
#define PEER_SESS_SC_ERRVERSION     502 /* unknown protocol version */
#define PEER_SESS_SC_ERRHOST        503 /* bad host name */
#define PEER_SESS_SC_ERRPEER        504 /* unknown peer */
#define PEER_SESS_SC_ERRTYPE        505 /* table key type mismatch */
#define PEER_SESS_SC_ERRSIZE        506 /* table key size mismatch */
#define PEER_SESS_SC_ERRTABLE       507 /* unknown table */

/* Protocol name sent at the start of the first handshake line */
#define PEER_SESSION_PROTO_NAME "HAProxyS"
121
/* Global pointer to the peers sections; NULL until one is assigned. */
struct peers *peers = NULL;

/* Defined later in this file; declared early because the I/O handler calls it. */
static void peer_session_forceshutdown(struct session *session);
Emeric Brun2b920a12010-09-23 18:30:22 +0200124
125
126/*
127 * This prepare the data update message of the stick session <ts>, <ps> is the the peer session
128 * where the data going to be pushed, <msg> is a buffer of <size> to recieve data message content
129 */
Simon Horman96553772011-06-08 09:18:51 +0900130static int peer_prepare_datamsg(struct stksess *ts, struct peer_session *ps, char *msg, size_t size)
Emeric Brun2b920a12010-09-23 18:30:22 +0200131{
132 uint32_t netinteger;
133 int len;
134 /* construct message */
135 if (ps->lastpush && ts->upd.key > ps->lastpush && (ts->upd.key - ps->lastpush) <= 127) {
136 msg[0] = 0x80 + ts->upd.key - ps->lastpush;
137 len = sizeof(char);
138 }
139 else {
140 msg[0] = 'D';
141 netinteger = htonl(ts->upd.key);
142 memcpy(&msg[sizeof(char)], &netinteger, sizeof(netinteger));
143 len = sizeof(char) + sizeof(netinteger);
144 }
145
146 if (ps->table->table->type == STKTABLE_TYPE_STRING) {
147 int stlen = strlen((char *)ts->key.key);
148
149 netinteger = htonl(strlen((char *)ts->key.key));
150 memcpy(&msg[len], &netinteger, sizeof(netinteger));
151 memcpy(&msg[len+sizeof(netinteger)], ts->key.key, stlen);
152 len += sizeof(netinteger) + stlen;
153
154 }
155 else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
156 netinteger = htonl(*((uint32_t *)ts->key.key));
157 memcpy(&msg[len], &netinteger, sizeof(netinteger));
158 len += sizeof(netinteger);
159 }
160 else {
161 memcpy(&msg[len], ts->key.key, ps->table->table->key_size);
162 len += ps->table->table->key_size;
163 }
164
165 if (stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
166 netinteger = htonl(stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id));
167 else
168 netinteger = 0;
169
170 memcpy(&msg[len], &netinteger , sizeof(netinteger));
171 len += sizeof(netinteger);
172
173 return len;
174}
175
176
177/*
178 * Callback to release a session with a peer
179 */
Simon Horman96553772011-06-08 09:18:51 +0900180static void peer_session_release(struct stream_interface *si)
Emeric Brun2b920a12010-09-23 18:30:22 +0200181{
Willy Tarreau3c23a852014-12-28 12:19:57 +0100182 struct session *s = si_sess(si);
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100183 struct appctx *appctx = objt_appctx(si->end);
184 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200185
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100186 /* appctx->ctx.peers.ptr is not a peer session */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100187 if (appctx->st0 < PEER_SESS_ST_SENDSUCCESS)
Emeric Brun2b920a12010-09-23 18:30:22 +0200188 return;
189
190 /* peer session identified */
191 if (ps) {
192 if (ps->session == s) {
193 ps->session = NULL;
194 if (ps->flags & PEER_F_LEARN_ASSIGN) {
195 /* unassign current peer for learning */
196 ps->flags &= ~(PEER_F_LEARN_ASSIGN);
197 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
198
199 /* reschedule a resync */
200 ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
201 }
202 /* reset teaching and learning flags to 0 */
203 ps->flags &= PEER_TEACH_RESET;
204 ps->flags &= PEER_LEARN_RESET;
205 }
206 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
207 }
208}
209
210
/*
 * IO handler managing the message exchange with a peer
 */
Willy Tarreaub24281b2011-02-13 13:16:36 +0100214static void peer_io_handler(struct stream_interface *si)
Emeric Brun2b920a12010-09-23 18:30:22 +0200215{
Willy Tarreau3c23a852014-12-28 12:19:57 +0100216 struct session *s = si_sess(si);
Emeric Brun2b920a12010-09-23 18:30:22 +0200217 struct peers *curpeers = (struct peers *)s->fe->parent;
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100218 struct appctx *appctx = objt_appctx(si->end);
Emeric Brun2b920a12010-09-23 18:30:22 +0200219 int reql = 0;
220 int repl = 0;
221
222 while (1) {
223switchstate:
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100224 switch(appctx->st0) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100225 case PEER_SESS_ST_ACCEPT:
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100226 appctx->ctx.peers.ptr = NULL;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100227 appctx->st0 = PEER_SESS_ST_GETVERSION;
Emeric Brun2b920a12010-09-23 18:30:22 +0200228 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100229 case PEER_SESS_ST_GETVERSION:
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100230 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200231 if (reql <= 0) { /* closed or EOL not found */
232 if (reql == 0)
233 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100234 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200235 goto switchstate;
236 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100237 if (trash.str[reql-1] != '\n') {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100238 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200239 goto switchstate;
240 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100241 else if (reql > 1 && (trash.str[reql-2] == '\r'))
242 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200243 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100244 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200245
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100246 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200247
248 /* test version */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100249 if (strcmp(PEER_SESSION_PROTO_NAME " 1.0", trash.str) != 0) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100250 appctx->st0 = PEER_SESS_ST_EXIT;
251 appctx->st1 = PEER_SESS_SC_ERRVERSION;
Emeric Brun2b920a12010-09-23 18:30:22 +0200252 /* test protocol */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100253 if (strncmp(PEER_SESSION_PROTO_NAME " ", trash.str, strlen(PEER_SESSION_PROTO_NAME)+1) != 0)
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100254 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200255 goto switchstate;
256 }
257
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100258 appctx->st0 = PEER_SESS_ST_GETHOST;
Emeric Brun2b920a12010-09-23 18:30:22 +0200259 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100260 case PEER_SESS_ST_GETHOST:
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100261 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200262 if (reql <= 0) { /* closed or EOL not found */
263 if (reql == 0)
264 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100265 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200266 goto switchstate;
267 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100268 if (trash.str[reql-1] != '\n') {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100269 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200270 goto switchstate;
271 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100272 else if (reql > 1 && (trash.str[reql-2] == '\r'))
273 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200274 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100275 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200276
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100277 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200278
279 /* test hostname match */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100280 if (strcmp(localpeer, trash.str) != 0) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100281 appctx->st0 = PEER_SESS_ST_EXIT;
282 appctx->st1 = PEER_SESS_SC_ERRHOST;
Emeric Brun2b920a12010-09-23 18:30:22 +0200283 goto switchstate;
284 }
285
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100286 appctx->st0 = PEER_SESS_ST_GETPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200287 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100288 case PEER_SESS_ST_GETPEER: {
Emeric Brun2b920a12010-09-23 18:30:22 +0200289 struct peer *curpeer;
290 char *p;
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100291 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200292 if (reql <= 0) { /* closed or EOL not found */
293 if (reql == 0)
294 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100295 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200296 goto switchstate;
297 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100298 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200299 /* Incomplete line, we quit */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100300 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200301 goto switchstate;
302 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100303 else if (reql > 1 && (trash.str[reql-2] == '\r'))
304 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200305 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100306 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200307
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100308 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200309
310 /* parse line "<peer name> <pid>" */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100311 p = strchr(trash.str, ' ');
Emeric Brun2b920a12010-09-23 18:30:22 +0200312 if (!p) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100313 appctx->st0 = PEER_SESS_ST_EXIT;
314 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200315 goto switchstate;
316 }
317 *p = 0;
318
319 /* lookup known peer */
320 for (curpeer = curpeers->remote; curpeer; curpeer = curpeer->next) {
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100321 if (strcmp(curpeer->id, trash.str) == 0)
Emeric Brun2b920a12010-09-23 18:30:22 +0200322 break;
323 }
324
325 /* if unknown peer */
326 if (!curpeer) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100327 appctx->st0 = PEER_SESS_ST_EXIT;
328 appctx->st1 = PEER_SESS_SC_ERRPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200329 goto switchstate;
330 }
331
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100332 appctx->ctx.peers.ptr = curpeer;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100333 appctx->st0 = PEER_SESS_ST_GETTABLE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200334 /* fall through */
335 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100336 case PEER_SESS_ST_GETTABLE: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100337 struct peer *curpeer = (struct peer *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200338 struct shared_table *st;
339 struct peer_session *ps = NULL;
340 unsigned long key_type;
341 size_t key_size;
342 char *p;
343
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100344 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200345 if (reql <= 0) { /* closed or EOL not found */
346 if (reql == 0)
347 goto out;
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100348 appctx->ctx.peers.ptr = NULL;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100349 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200350 goto switchstate;
351 }
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100352 /* Re init appctx->ctx.peers.ptr to null, to handle correctly a release case */
353 appctx->ctx.peers.ptr = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200354
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100355 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200356 /* Incomplete line, we quit */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100357 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200358 goto switchstate;
359 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100360 else if (reql > 1 && (trash.str[reql-2] == '\r'))
361 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200362 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100363 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200364
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100365 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200366
367 /* Parse line "<table name> <type> <size>" */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100368 p = strchr(trash.str, ' ');
Emeric Brun2b920a12010-09-23 18:30:22 +0200369 if (!p) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100370 appctx->st0 = PEER_SESS_ST_EXIT;
371 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200372 goto switchstate;
373 }
374 *p = 0;
375 key_type = (unsigned long)atol(p+1);
376
377 p = strchr(p+1, ' ');
378 if (!p) {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100379 appctx->ctx.peers.ptr = NULL;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100380 appctx->st0 = PEER_SESS_ST_EXIT;
381 appctx->st1 = PEER_SESS_SC_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200382 goto switchstate;
383 }
384
385 key_size = (size_t)atoi(p);
386 for (st = curpeers->tables; st; st = st->next) {
387 /* If table name matches */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100388 if (strcmp(st->table->id, trash.str) == 0) {
Willy Tarreau86a446e2013-11-25 23:02:37 +0100389 /* Check key size mismatches, except for strings
390 * which may be truncated as long as they fit in
391 * a buffer.
392 */
393 if (key_size != st->table->key_size &&
394 (key_type != STKTABLE_TYPE_STRING ||
395 1 + 4 + 4 + key_size - 1 >= trash.size)) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100396 appctx->st0 = PEER_SESS_ST_EXIT;
397 appctx->st1 = PEER_SESS_SC_ERRSIZE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200398 goto switchstate;
399 }
400
401 /* If key type mismatches */
402 if (key_type != st->table->type) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100403 appctx->st0 = PEER_SESS_ST_EXIT;
404 appctx->st1 = PEER_SESS_SC_ERRTYPE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200405 goto switchstate;
406 }
407
408 /* lookup peer session of current peer */
409 for (ps = st->sessions; ps; ps = ps->next) {
410 if (ps->peer == curpeer) {
411 /* If session already active, replaced by new one */
412 if (ps->session && ps->session != s) {
413 if (ps->peer->local) {
414 /* Local connection, reply a retry */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100415 appctx->st0 = PEER_SESS_ST_EXIT;
416 appctx->st1 = PEER_SESS_SC_TRYAGAIN;
Emeric Brun2b920a12010-09-23 18:30:22 +0200417 goto switchstate;
418 }
419 peer_session_forceshutdown(ps->session);
420 }
421 ps->session = s;
422 break;
423 }
424 }
425 break;
426 }
427 }
428
429 /* If table not found */
430 if (!st){
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100431 appctx->st0 = PEER_SESS_ST_EXIT;
432 appctx->st1 = PEER_SESS_SC_ERRTABLE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200433 goto switchstate;
434 }
435
436 /* If no peer session for current peer */
437 if (!ps) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100438 appctx->st0 = PEER_SESS_ST_EXIT;
439 appctx->st1 = PEER_SESS_SC_ERRPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200440 goto switchstate;
441 }
442
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100443 appctx->ctx.peers.ptr = ps;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100444 appctx->st0 = PEER_SESS_ST_SENDSUCCESS;
Emeric Brun2b920a12010-09-23 18:30:22 +0200445 /* fall through */
446 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100447 case PEER_SESS_ST_SENDSUCCESS: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100448 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200449
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100450 repl = snprintf(trash.str, trash.size, "%d\n", PEER_SESS_SC_SUCCESSCODE);
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100451 repl = bi_putblk(si_ic(si), trash.str, repl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200452 if (repl <= 0) {
453 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100454 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100455 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200456 goto switchstate;
457 }
458
459 /* Register status code */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100460 ps->statuscode = PEER_SESS_SC_SUCCESSCODE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200461
462 /* Awake main task */
463 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
464
465 /* Init cursors */
466 ps->teaching_origin =ps->lastpush = ps->lastack = ps->pushack = 0;
467 ps->pushed = ps->update;
468
469 /* Init confirm counter */
470 ps->confirm = 0;
471
472 /* reset teaching and learning flags to 0 */
473 ps->flags &= PEER_TEACH_RESET;
474 ps->flags &= PEER_LEARN_RESET;
475
476 /* if current peer is local */
477 if (ps->peer->local) {
478 /* if table need resyncfrom local and no process assined */
479 if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL &&
480 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
481 /* assign local peer for a lesson, consider lesson already requested */
482 ps->flags |= PEER_F_LEARN_ASSIGN;
483 ps->table->flags |= (SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
484 }
485
486 }
487 else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
488 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
489 /* assign peer for a lesson */
490 ps->flags |= PEER_F_LEARN_ASSIGN;
491 ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
492 }
493 /* switch to waiting message state */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100494 appctx->st0 = PEER_SESS_ST_WAITMSG;
Emeric Brun2b920a12010-09-23 18:30:22 +0200495 goto switchstate;
496 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100497 case PEER_SESS_ST_CONNECT: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100498 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200499
500 /* Send headers */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100501 repl = snprintf(trash.str, trash.size,
Emeric Brun2b920a12010-09-23 18:30:22 +0200502 PEER_SESSION_PROTO_NAME " 1.0\n%s\n%s %d\n%s %lu %d\n",
503 ps->peer->id,
504 localpeer,
Willy Tarreau7b77c9f2012-01-07 22:52:12 +0100505 (int)getpid(),
Emeric Brun2b920a12010-09-23 18:30:22 +0200506 ps->table->table->id,
507 ps->table->table->type,
Willy Tarreaubd55e312010-11-11 10:55:09 +0100508 (int)ps->table->table->key_size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200509
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100510 if (repl >= trash.size) {
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100511 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200512 goto switchstate;
513 }
514
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100515 repl = bi_putblk(si_ic(si), trash.str, repl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200516 if (repl <= 0) {
517 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100518 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100519 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200520 goto switchstate;
521 }
522
523 /* switch to the waiting statuscode state */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100524 appctx->st0 = PEER_SESS_ST_GETSTATUS;
Emeric Brun2b920a12010-09-23 18:30:22 +0200525 /* fall through */
526 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100527 case PEER_SESS_ST_GETSTATUS: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100528 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200529
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100530 if (si_ic(si)->flags & CF_WRITE_PARTIAL)
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100531 ps->statuscode = PEER_SESS_SC_CONNECTEDCODE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200532
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100533 reql = bo_getline(si_oc(si), trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200534 if (reql <= 0) { /* closed or EOL not found */
535 if (reql == 0)
536 goto out;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100537 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200538 goto switchstate;
539 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100540 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200541 /* Incomplete line, we quit */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100542 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200543 goto switchstate;
544 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100545 else if (reql > 1 && (trash.str[reql-2] == '\r'))
546 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200547 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100548 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200549
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100550 bo_skip(si_oc(si), reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200551
552 /* Register status code */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100553 ps->statuscode = atoi(trash.str);
Emeric Brun2b920a12010-09-23 18:30:22 +0200554
555 /* Awake main task */
556 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
557
558 /* If status code is success */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100559 if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
Emeric Brun2b920a12010-09-23 18:30:22 +0200560 /* Init cursors */
561 ps->teaching_origin = ps->lastpush = ps->lastack = ps->pushack = 0;
562 ps->pushed = ps->update;
563
564 /* Init confirm counter */
565 ps->confirm = 0;
566
567 /* reset teaching and learning flags to 0 */
568 ps->flags &= PEER_TEACH_RESET;
569 ps->flags &= PEER_LEARN_RESET;
570
571 /* If current peer is local */
572 if (ps->peer->local) {
573 /* Init cursors to push a resync */
574 ps->teaching_origin = ps->pushed = ps->table->table->update;
575 /* flag to start to teach lesson */
576 ps->flags |= PEER_F_TEACH_PROCESS;
577
578 }
579 else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
580 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
581 /* If peer is remote and resync from remote is needed,
582 and no peer currently assigned */
583
584 /* assign peer for a lesson */
585 ps->flags |= PEER_F_LEARN_ASSIGN;
586 ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
587 }
588
589 }
590 else {
591 /* Status code is not success, abort */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100592 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200593 goto switchstate;
594 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100595 appctx->st0 = PEER_SESS_ST_WAITMSG;
Emeric Brun2b920a12010-09-23 18:30:22 +0200596 /* fall through */
597 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100598 case PEER_SESS_ST_WAITMSG: {
Willy Tarreau7b4b4992013-12-01 09:15:12 +0100599 struct peer_session *ps = (struct peer_session *)appctx->ctx.peers.ptr;
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200600 struct stksess *ts, *newts = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200601 char c;
602 int totl = 0;
603
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100604 reql = bo_getblk(si_oc(si), (char *)&c, sizeof(c), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200605 if (reql <= 0) /* closed or EOL not found */
606 goto incomplete;
607
Emeric Brun2b920a12010-09-23 18:30:22 +0200608 totl += reql;
609
610 if ((c & 0x80) || (c == 'D')) {
611 /* Here we have data message */
612 unsigned int pushack;
Emeric Brun2b920a12010-09-23 18:30:22 +0200613 int srvid;
614 uint32_t netinteger;
615
616 /* Compute update remote version */
617 if (c & 0x80) {
618 pushack = ps->pushack + (unsigned int)(c & 0x7F);
619 }
620 else {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100621 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200622 if (reql <= 0) /* closed or EOL not found */
623 goto incomplete;
624
Emeric Brun2b920a12010-09-23 18:30:22 +0200625 totl += reql;
626 pushack = ntohl(netinteger);
627 }
628
Willy Tarreau86a446e2013-11-25 23:02:37 +0100629 /* Read key. The string keys are read in two steps, the first step
630 * consists in reading whatever fits into the table directly into
631 * the pre-allocated key. The second step consists in simply
632 * draining all exceeding data. This can happen for example after a
633 * config reload with a smaller key size for the stick table than
634 * what was previously set, or when facing the impossibility to
635 * allocate a new stksess (for example when the table is full with
636 * "nopurge").
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200637 */
Emeric Brun2b920a12010-09-23 18:30:22 +0200638 if (ps->table->table->type == STKTABLE_TYPE_STRING) {
Willy Tarreau86a446e2013-11-25 23:02:37 +0100639 unsigned int to_read, to_store;
640
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200641 /* read size first */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100642 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200643 if (reql <= 0) /* closed or EOL not found */
644 goto incomplete;
645
Emeric Brun2b920a12010-09-23 18:30:22 +0200646 totl += reql;
Willy Tarreau86a446e2013-11-25 23:02:37 +0100647
648 to_store = 0;
649 to_read = ntohl(netinteger);
650
Willy Tarreau4e4292b2014-11-28 12:18:45 +0100651 if (to_read + totl > si_ob(si)->size) {
Willy Tarreau86a446e2013-11-25 23:02:37 +0100652 /* impossible to read a key this large, abort */
653 reql = -1;
Willy Tarreau72d6c162013-04-11 16:14:13 +0200654 goto incomplete;
Willy Tarreau86a446e2013-11-25 23:02:37 +0100655 }
Willy Tarreau72d6c162013-04-11 16:14:13 +0200656
Willy Tarreau86a446e2013-11-25 23:02:37 +0100657 newts = stksess_new(ps->table->table, NULL);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200658 if (newts)
Willy Tarreau86a446e2013-11-25 23:02:37 +0100659 to_store = MIN(to_read, ps->table->table->key_size - 1);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200660
Willy Tarreau86a446e2013-11-25 23:02:37 +0100661 /* we read up to two blocks, the first one goes into the key,
662 * the rest is drained into the trash.
663 */
664 if (to_store) {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100665 reql = bo_getblk(si_oc(si), (char *)newts->key.key, to_store, totl);
Willy Tarreau86a446e2013-11-25 23:02:37 +0100666 if (reql <= 0) /* closed or incomplete */
667 goto incomplete;
668 newts->key.key[reql] = 0;
669 totl += reql;
670 to_read -= reql;
671 }
672 if (to_read) {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100673 reql = bo_getblk(si_oc(si), trash.str, to_read, totl);
Willy Tarreau86a446e2013-11-25 23:02:37 +0100674 if (reql <= 0) /* closed or incomplete */
675 goto incomplete;
676 totl += reql;
677 }
Emeric Brun2b920a12010-09-23 18:30:22 +0200678 }
679 else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100680 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200681 if (reql <= 0) /* closed or EOL not found */
682 goto incomplete;
Cyril Bonté9a60ff92014-02-16 01:07:07 +0100683 newts = stksess_new(ps->table->table, NULL);
684 if (newts) {
685 netinteger = ntohl(netinteger);
686 memcpy(newts->key.key, &netinteger, sizeof(netinteger));
687 }
Emeric Brun2b920a12010-09-23 18:30:22 +0200688 totl += reql;
Emeric Brun2b920a12010-09-23 18:30:22 +0200689 }
690 else {
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200691 /* type ip or binary */
692 newts = stksess_new(ps->table->table, NULL);
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100693 reql = bo_getblk(si_oc(si), newts ? (char *)newts->key.key : trash.str, ps->table->table->key_size, totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200694 if (reql <= 0) /* closed or EOL not found */
695 goto incomplete;
Willy Tarreau72d6c162013-04-11 16:14:13 +0200696 totl += reql;
Emeric Brun2b920a12010-09-23 18:30:22 +0200697 }
698
699 /* read server id */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100700 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200701 if (reql <= 0) /* closed or EOL not found */
702 goto incomplete;
703
Emeric Brun2b920a12010-09-23 18:30:22 +0200704 totl += reql;
705 srvid = ntohl(netinteger);
706
707 /* update entry */
Emeric Brun2b920a12010-09-23 18:30:22 +0200708 if (newts) {
709 /* lookup for existing entry */
710 ts = stktable_lookup(ps->table->table, newts);
711 if (ts) {
712 /* the entry already exist, we can free ours */
713 stktable_touch(ps->table->table, ts, 0);
714 stksess_free(ps->table->table, newts);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200715 newts = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200716 }
717 else {
718 struct eb32_node *eb;
719
720 /* create new entry */
721 ts = stktable_store(ps->table->table, newts, 0);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200722 newts = NULL; /* don't reuse it */
723
Emeric Brun2b920a12010-09-23 18:30:22 +0200724 ts->upd.key= (++ps->table->table->update)+(2^31);
725 eb = eb32_insert(&ps->table->table->updates, &ts->upd);
726 if (eb != &ts->upd) {
727 eb32_delete(eb);
728 eb32_insert(&ps->table->table->updates, &ts->upd);
729 }
730 }
731
732 /* update entry */
733 if (srvid && stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
734 stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id) = srvid;
735 ps->pushack = pushack;
736 }
737
738 }
739 else if (c == 'R') {
740 /* Reset message: remote need resync */
741
742 /* reinit counters for a resync */
743 ps->lastpush = 0;
744 ps->teaching_origin = ps->pushed = ps->table->table->update;
745
746 /* reset teaching flags to 0 */
747 ps->flags &= PEER_TEACH_RESET;
748
749 /* flag to start to teach lesson */
750 ps->flags |= PEER_F_TEACH_PROCESS;
751 }
752 else if (c == 'F') {
753 /* Finish message, all known updates have been pushed by remote */
754 /* and remote is up to date */
755
756 /* If resync is in progress with remote peer */
757 if (ps->flags & PEER_F_LEARN_ASSIGN) {
758
759 /* unassign current peer for learning */
760 ps->flags &= ~PEER_F_LEARN_ASSIGN;
761 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
762
763 /* Consider table is now up2date, resync resync no more needed from local neither remote */
764 ps->table->flags |= (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE);
765 }
766 /* Increase confirm counter to launch a confirm message */
767 ps->confirm++;
768 }
769 else if (c == 'c') {
770 /* confirm message, remote peer is now up to date with us */
771
772 /* If stopping state */
773 if (stopping) {
774 /* Close session, push resync no more needed */
775 ps->flags |= PEER_F_TEACH_COMPLETE;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100776 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200777 goto switchstate;
778 }
779
780 /* reset teaching flags to 0 */
781 ps->flags &= PEER_TEACH_RESET;
782 }
783 else if (c == 'C') {
784 /* Continue message, all known updates have been pushed by remote */
785 /* but remote is not up to date */
786
787 /* If resync is in progress with current peer */
788 if (ps->flags & PEER_F_LEARN_ASSIGN) {
789
790 /* unassign current peer */
791 ps->flags &= ~PEER_F_LEARN_ASSIGN;
792 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
793
794 /* flag current peer is not up 2 date to try from an other */
795 ps->flags |= PEER_F_LEARN_NOTUP2DATE;
796
797 /* reschedule a resync */
798 ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
799 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
800 }
801 ps->confirm++;
802 }
803 else if (c == 'A') {
804 /* ack message */
805 uint32_t netinteger;
806
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100807 reql = bo_getblk(si_oc(si), (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200808 if (reql <= 0) /* closed or EOL not found */
809 goto incomplete;
810
Emeric Brun2b920a12010-09-23 18:30:22 +0200811 totl += reql;
812
813 /* Consider remote is up to date with "acked" version */
814 ps->update = ntohl(netinteger);
815 }
816 else {
817 /* Unknown message */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100818 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200819 goto switchstate;
820 }
821
822 /* skip consumed message */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100823 bo_skip(si_oc(si), totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200824
825 /* loop on that state to peek next message */
Willy Tarreau72d6c162013-04-11 16:14:13 +0200826 goto switchstate;
827
Emeric Brun2b920a12010-09-23 18:30:22 +0200828incomplete:
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200829 /* we get here when a bo_getblk() returns <= 0 in reql */
830
831 /* first, we may have to release newts */
832 if (newts) {
833 stksess_free(ps->table->table, newts);
834 newts = NULL;
835 }
836
Willy Tarreau72d6c162013-04-11 16:14:13 +0200837 if (reql < 0) {
838 /* there was an error */
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100839 appctx->st0 = PEER_SESS_ST_END;
Willy Tarreau72d6c162013-04-11 16:14:13 +0200840 goto switchstate;
841 }
842
Emeric Brun2b920a12010-09-23 18:30:22 +0200843 /* Nothing to read, now we start to write */
844
845 /* Confirm finished or partial messages */
846 while (ps->confirm) {
847 /* There is a confirm messages to send */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100848 repl = bi_putchr(si_ic(si), 'c');
Emeric Brun2b920a12010-09-23 18:30:22 +0200849 if (repl <= 0) {
850 /* no more write possible */
851 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100852 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100853 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200854 goto switchstate;
855 }
856 ps->confirm--;
857 }
858
859 /* Need to request a resync */
860 if ((ps->flags & PEER_F_LEARN_ASSIGN) &&
861 (ps->table->flags & SHTABLE_F_RESYNC_ASSIGN) &&
862 !(ps->table->flags & SHTABLE_F_RESYNC_PROCESS)) {
863 /* Current peer was elected to request a resync */
864
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100865 repl = bi_putchr(si_ic(si), 'R');
Emeric Brun2b920a12010-09-23 18:30:22 +0200866 if (repl <= 0) {
867 /* no more write possible */
868 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100869 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100870 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200871 goto switchstate;
872 }
873 ps->table->flags |= SHTABLE_F_RESYNC_PROCESS;
874 }
875
876 /* It remains some updates to ack */
877 if (ps->pushack != ps->lastack) {
878 uint32_t netinteger;
879
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100880 trash.str[0] = 'A';
Emeric Brun2b920a12010-09-23 18:30:22 +0200881 netinteger = htonl(ps->pushack);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100882 memcpy(&trash.str[1], &netinteger, sizeof(netinteger));
Emeric Brun2b920a12010-09-23 18:30:22 +0200883
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100884 repl = bi_putblk(si_ic(si), trash.str, 1+sizeof(netinteger));
Emeric Brun2b920a12010-09-23 18:30:22 +0200885 if (repl <= 0) {
886 /* no more write possible */
887 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100888 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100889 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200890 goto switchstate;
891 }
892 ps->lastack = ps->pushack;
893 }
894
895 if (ps->flags & PEER_F_TEACH_PROCESS) {
896 /* current peer was requested for a lesson */
897
898 if (!(ps->flags & PEER_F_TEACH_STAGE1)) {
899 /* lesson stage 1 not complete */
900 struct eb32_node *eb;
901
902 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
903 while (1) {
904 int msglen;
905 struct stksess *ts;
906
907 if (!eb) {
908 /* flag lesson stage1 complete */
909 ps->flags |= PEER_F_TEACH_STAGE1;
910 eb = eb32_first(&ps->table->table->updates);
911 if (eb)
912 ps->pushed = eb->key - 1;
913 break;
914 }
915
916 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100917 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200918 if (msglen) {
919 /* message to buffer */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100920 repl = bi_putblk(si_ic(si), trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200921 if (repl <= 0) {
922 /* no more write possible */
923 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100924 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100925 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200926 goto switchstate;
927 }
928 ps->lastpush = ps->pushed = ts->upd.key;
929 }
930 eb = eb32_next(eb);
931 }
932 } /* !TEACH_STAGE1 */
933
934 if (!(ps->flags & PEER_F_TEACH_STAGE2)) {
935 /* lesson stage 2 not complete */
936 struct eb32_node *eb;
937
938 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
939 while (1) {
940 int msglen;
941 struct stksess *ts;
942
943 if (!eb || eb->key > ps->teaching_origin) {
944 /* flag lesson stage1 complete */
945 ps->flags |= PEER_F_TEACH_STAGE2;
946 ps->pushed = ps->teaching_origin;
947 break;
948 }
949
950 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100951 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200952 if (msglen) {
953 /* message to buffer */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100954 repl = bi_putblk(si_ic(si), trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200955 if (repl <= 0) {
956 /* no more write possible */
957 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100958 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100959 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200960 goto switchstate;
961 }
962 ps->lastpush = ps->pushed = ts->upd.key;
963 }
964 eb = eb32_next(eb);
965 }
966 } /* !TEACH_STAGE2 */
967
968 if (!(ps->flags & PEER_F_TEACH_FINISHED)) {
969 /* process final lesson message */
Willy Tarreau2bb4a962014-11-28 11:11:05 +0100970 repl = bi_putchr(si_ic(si), ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FINISHED) ? 'F' : 'C');
Emeric Brun2b920a12010-09-23 18:30:22 +0200971 if (repl <= 0) {
972 /* no more write possible */
973 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +0100974 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +0100975 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200976 goto switchstate;
977 }
978
979 /* flag finished message sent */
980 ps->flags |= PEER_F_TEACH_FINISHED;
981 } /* !TEACH_FINISHED */
982 } /* TEACH_PROCESS */
983
984 if (!(ps->flags & PEER_F_LEARN_ASSIGN) &&
985 (int)(ps->pushed - ps->table->table->localupdate) < 0) {
986 /* Push local updates, only if no learning in progress (to avoid ping-pong effects) */
987 struct eb32_node *eb;
988
989 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
990 while (1) {
991 int msglen;
992 struct stksess *ts;
993
994 /* push local updates */
995 if (!eb) {
996 eb = eb32_first(&ps->table->table->updates);
997 if (!eb || ((int)(eb->key - ps->pushed) <= 0)) {
998 ps->pushed = ps->table->table->localupdate;
999 break;
1000 }
1001 }
1002
1003 if ((int)(eb->key - ps->table->table->localupdate) > 0) {
1004 ps->pushed = ps->table->table->localupdate;
1005 break;
1006 }
1007
1008 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +01001009 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +02001010 if (msglen) {
1011 /* message to buffer */
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001012 repl = bi_putblk(si_ic(si), trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +02001013 if (repl <= 0) {
1014 /* no more write possible */
1015 if (repl == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +01001016 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001017 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +02001018 goto switchstate;
1019 }
1020 ps->lastpush = ps->pushed = ts->upd.key;
1021 }
1022 eb = eb32_next(eb);
1023 }
1024 } /* ! LEARN_ASSIGN */
1025 /* noting more to do */
1026 goto out;
1027 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001028 case PEER_SESS_ST_EXIT:
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001029 repl = snprintf(trash.str, trash.size, "%d\n", appctx->st1);
Emeric Brun2b920a12010-09-23 18:30:22 +02001030
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001031 if (bi_putblk(si_ic(si), trash.str, repl) == -1)
Willy Tarreaubc18da12015-03-13 14:00:47 +01001032 goto full;
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001033 appctx->st0 = PEER_SESS_ST_END;
Emeric Brun2b920a12010-09-23 18:30:22 +02001034 /* fall through */
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001035 case PEER_SESS_ST_END: {
Willy Tarreau73b013b2012-05-21 16:31:45 +02001036 si_shutw(si);
1037 si_shutr(si);
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001038 si_ic(si)->flags |= CF_READ_NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001039 goto quit;
1040 }
1041 }
1042 }
1043out:
Willy Tarreau73b013b2012-05-21 16:31:45 +02001044 si_update(si);
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001045 si_oc(si)->flags |= CF_READ_DONTWAIT;
Emeric Brun2b920a12010-09-23 18:30:22 +02001046 /* we don't want to expire timeouts while we're processing requests */
Willy Tarreau2bb4a962014-11-28 11:11:05 +01001047 si_ic(si)->rex = TICK_ETERNITY;
1048 si_oc(si)->wex = TICK_ETERNITY;
Emeric Brun2b920a12010-09-23 18:30:22 +02001049quit:
1050 return;
Willy Tarreaubc18da12015-03-13 14:00:47 +01001051full:
1052 si->flags |= SI_FL_WAIT_ROOM;
1053 goto out;
Emeric Brun2b920a12010-09-23 18:30:22 +02001054}
1055
Willy Tarreaub24281b2011-02-13 13:16:36 +01001056static struct si_applet peer_applet = {
Willy Tarreau3fdb3662012-11-12 00:42:33 +01001057 .obj_type = OBJ_TYPE_APPLET,
Willy Tarreaub24281b2011-02-13 13:16:36 +01001058 .name = "<PEER>", /* used for logging */
1059 .fct = peer_io_handler,
Aman Gupta9a13e842012-04-02 18:57:53 -07001060 .release = peer_session_release,
Willy Tarreaub24281b2011-02-13 13:16:36 +01001061};
Emeric Brun2b920a12010-09-23 18:30:22 +02001062
1063/*
1064 * Use this function to force a close of a peer session
1065 */
Simon Horman96553772011-06-08 09:18:51 +09001066static void peer_session_forceshutdown(struct session * session)
Emeric Brun2b920a12010-09-23 18:30:22 +02001067{
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001068 struct stream_interface *oldsi = NULL;
1069 struct appctx *appctx = NULL;
1070 int i;
Emeric Brun2b920a12010-09-23 18:30:22 +02001071
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001072 for (i = 0; i <= 1; i++) {
1073 appctx = objt_appctx(session->si[i].end);
1074 if (!appctx)
1075 continue;
1076 if (appctx->applet != &peer_applet)
1077 continue;
1078
1079 oldsi = &session->si[i];
1080 break;
Emeric Brun2b920a12010-09-23 18:30:22 +02001081 }
1082
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001083 if (!appctx)
1084 return;
1085
Emeric Brun2b920a12010-09-23 18:30:22 +02001086 /* call release to reinit resync states if needed */
1087 peer_session_release(oldsi);
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001088 appctx->st0 = PEER_SESS_ST_END;
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001089 appctx->ctx.peers.ptr = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001090 task_wakeup(session->task, TASK_WOKEN_MSG);
1091}
1092
Willy Tarreau91d96282015-03-13 15:47:26 +01001093/* Pre-configures a peers frontend to accept incoming connections */
1094void peers_setup_frontend(struct proxy *fe)
1095{
1096 fe->last_change = now.tv_sec;
1097 fe->cap = PR_CAP_FE;
1098 fe->maxconn = 0;
1099 fe->conn_retries = CONN_RETRIES;
1100 fe->timeout.client = MS_TO_TICKS(5000);
Willy Tarreaud1d48d42015-03-13 16:15:46 +01001101 fe->accept = frontend_accept;
Willy Tarreauf87ab942015-03-13 15:55:16 +01001102 fe->default_target = &peer_applet.obj_type;
Willy Tarreau91d96282015-03-13 15:47:26 +01001103 fe->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
1104}
1105
Emeric Brun2b920a12010-09-23 18:30:22 +02001106/*
Willy Tarreaubd55e312010-11-11 10:55:09 +01001107 * Create a new peer session in assigned state (connect will start automatically)
Emeric Brun2b920a12010-09-23 18:30:22 +02001108 */
Simon Horman96553772011-06-08 09:18:51 +09001109static struct session *peer_session_create(struct peer *peer, struct peer_session *ps)
Emeric Brun2b920a12010-09-23 18:30:22 +02001110{
Willy Tarreau4348fad2012-09-20 16:48:07 +02001111 struct listener *l = LIST_NEXT(&peer->peers->peers_fe->conf.listeners, struct listener *, by_fe);
Emeric Brun2b920a12010-09-23 18:30:22 +02001112 struct proxy *p = (struct proxy *)l->frontend; /* attached frontend */
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001113 struct appctx *appctx;
Emeric Brun2b920a12010-09-23 18:30:22 +02001114 struct session *s;
1115 struct http_txn *txn;
1116 struct task *t;
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001117 struct connection *conn;
Emeric Brun2b920a12010-09-23 18:30:22 +02001118
1119 if ((s = pool_alloc2(pool2_session)) == NULL) { /* disable this proxy for a while */
Godbach430f2912013-06-20 13:28:38 +08001120 Alert("out of memory in peer_session_create().\n");
Emeric Brun2b920a12010-09-23 18:30:22 +02001121 goto out_close;
1122 }
1123
1124 LIST_ADDQ(&sessions, &s->list);
1125 LIST_INIT(&s->back_refs);
Willy Tarreau2d7ec462015-02-14 14:14:57 +01001126 LIST_INIT(&s->buffer_wait);
Emeric Brun2b920a12010-09-23 18:30:22 +02001127
1128 s->flags = SN_ASSIGNED|SN_ADDR_SET;
Emeric Brun2b920a12010-09-23 18:30:22 +02001129
1130 /* if this session comes from a known monitoring system, we want to ignore
1131 * it as soon as possible, which means closing it immediately for TCP.
1132 */
1133 if ((t = task_new()) == NULL) { /* disable this proxy for a while */
Godbach430f2912013-06-20 13:28:38 +08001134 Alert("out of memory in peer_session_create().\n");
Emeric Brun2b920a12010-09-23 18:30:22 +02001135 goto out_free_session;
1136 }
1137
1138 ps->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001139 ps->statuscode = PEER_SESS_SC_CONNECTCODE;
Emeric Brun2b920a12010-09-23 18:30:22 +02001140
1141 t->process = l->handler;
1142 t->context = s;
1143 t->nice = l->nice;
1144
Emeric Brun2b920a12010-09-23 18:30:22 +02001145 s->task = t;
1146 s->listener = l;
1147
1148 /* Note: initially, the session's backend points to the frontend.
1149 * This changes later when switching rules are executed or
1150 * when the default backend is assigned.
1151 */
1152 s->be = s->fe = p;
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001153 s->req.buf = s->res.buf = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001154
Willy Tarreaua5f5d8d2014-11-28 11:26:07 +01001155 s->si[0].flags = SI_FL_NONE;
1156 s->si[1].flags = SI_FL_ISBACK;
1157
Willy Tarreau819d3322014-11-28 12:12:34 +01001158 si_reset(&s->si[0]);
Willy Tarreau3ed35ef2013-10-24 11:51:38 +02001159 si_set_state(&s->si[0], SI_ST_EST);
1160
Emeric Brun2b920a12010-09-23 18:30:22 +02001161 if (s->fe->options2 & PR_O2_INDEPSTR)
1162 s->si[0].flags |= SI_FL_INDEP_STR;
Emeric Brun2b920a12010-09-23 18:30:22 +02001163
Willy Tarreau1fbe1c92013-12-01 09:35:41 +01001164 appctx = stream_int_register_handler(&s->si[0], &peer_applet);
1165 if (!appctx)
1166 goto out_fail_conn1;
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001167 appctx->st0 = PEER_SESS_ST_CONNECT;
Willy Tarreau7b4b4992013-12-01 09:15:12 +01001168 appctx->ctx.peers.ptr = (void *)ps;
Emeric Brun2b920a12010-09-23 18:30:22 +02001169
Willy Tarreau819d3322014-11-28 12:12:34 +01001170 si_reset(&s->si[1]);
Willy Tarreau3ed35ef2013-10-24 11:51:38 +02001171
1172 /* initiate an outgoing connection */
1173 si_set_state(&s->si[1], SI_ST_ASS);
Emeric Brun2b920a12010-09-23 18:30:22 +02001174 s->si[1].conn_retries = p->conn_retries;
Willy Tarreau3ed35ef2013-10-24 11:51:38 +02001175
Emeric Brun2b920a12010-09-23 18:30:22 +02001176 if (s->be->options2 & PR_O2_INDEPSTR)
1177 s->si[1].flags |= SI_FL_INDEP_STR;
1178
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001179 /* automatically prepare the stream interface to connect to the
Willy Tarreaub363a1f2013-10-01 10:45:07 +02001180 * pre-initialized connection in si->conn.
1181 */
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001182 if (unlikely((conn = conn_new()) == NULL))
1183 goto out_fail_conn1;
1184
1185 conn_prepare(conn, peer->proto, peer->xprt);
1186 si_attach_conn(&s->si[1], conn);
1187
1188 conn->target = s->target = &s->be->obj_type;
1189 memcpy(&conn->addr.to, &peer->addr, sizeof(conn->addr.to));
Willy Tarreaub363a1f2013-10-01 10:45:07 +02001190
Willy Tarreau9bd0d742011-07-20 00:17:39 +02001191 session_init_srv_conn(s);
Emeric Brun2b920a12010-09-23 18:30:22 +02001192 s->pend_pos = NULL;
1193
1194 /* init store persistence */
1195 s->store_count = 0;
Willy Tarreaud5ca9ab2013-05-28 17:40:25 +02001196 memset(s->stkctr, 0, sizeof(s->stkctr));
Emeric Brun2b920a12010-09-23 18:30:22 +02001197
1198 /* FIXME: the logs are horribly complicated now, because they are
Willy Tarreauae727bf2013-10-01 17:06:10 +02001199 * defined in <p>, <p>, and later <be> and <be>. We still initialize
1200 * a few of them to help troubleshooting (eg: show sess shows them).
Emeric Brun2b920a12010-09-23 18:30:22 +02001201 */
1202
1203 s->logs.logwait = 0;
Willy Tarreauabcd5142013-06-11 17:18:02 +02001204 s->logs.level = 0;
Willy Tarreauae727bf2013-10-01 17:06:10 +02001205 s->logs.accept_date = date; /* user-visible date for logging */
1206 s->logs.tv_accept = now; /* corrected date for internal use */
Emeric Brun2b920a12010-09-23 18:30:22 +02001207 s->do_log = NULL;
1208
1209 /* default error reporting function, may be changed by analysers */
1210 s->srv_error = default_srv_error;
1211
Emeric Brun2b920a12010-09-23 18:30:22 +02001212 s->uniq_id = 0;
Willy Tarreaubd833142012-05-08 15:51:44 +02001213 s->unique_id = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001214
1215 txn = &s->txn;
1216 /* Those variables will be checked and freed if non-NULL in
1217 * session.c:session_free(). It is important that they are
1218 * properly initialized.
1219 */
1220 txn->sessid = NULL;
1221 txn->srv_cookie = NULL;
1222 txn->cli_cookie = NULL;
1223 txn->uri = NULL;
1224 txn->req.cap = NULL;
1225 txn->rsp.cap = NULL;
1226 txn->hdr_idx.v = NULL;
1227 txn->hdr_idx.size = txn->hdr_idx.used = 0;
1228
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001229 channel_init(&s->req);
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001230 s->req.flags |= CF_READ_ATTACHED; /* the producer is already connected */
Emeric Brun2b920a12010-09-23 18:30:22 +02001231
1232 /* activate default analysers enabled for this listener */
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001233 s->req.analysers = l->analysers;
Emeric Brun2b920a12010-09-23 18:30:22 +02001234
1235 /* note: this should not happen anymore since there's always at least the switching rules */
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001236 if (!s->req.analysers) {
1237 channel_auto_connect(&s->req);/* don't wait to establish connection */
1238 channel_auto_close(&s->req);/* let the producer forward close requests */
Emeric Brun2b920a12010-09-23 18:30:22 +02001239 }
1240
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001241 s->req.rto = s->fe->timeout.client;
1242 s->req.wto = s->be->timeout.server;
Emeric Brun2b920a12010-09-23 18:30:22 +02001243
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001244 channel_init(&s->res);
Willy Tarreauef573c02014-11-28 14:17:09 +01001245 s->res.flags |= CF_ISRESP;
1246
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001247 s->res.rto = s->be->timeout.server;
1248 s->res.wto = s->fe->timeout.client;
Emeric Brun2b920a12010-09-23 18:30:22 +02001249
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001250 s->req.rex = TICK_ETERNITY;
1251 s->req.wex = TICK_ETERNITY;
1252 s->req.analyse_exp = TICK_ETERNITY;
1253 s->res.rex = TICK_ETERNITY;
1254 s->res.wex = TICK_ETERNITY;
1255 s->res.analyse_exp = TICK_ETERNITY;
Emeric Brun2b920a12010-09-23 18:30:22 +02001256 t->expire = TICK_ETERNITY;
1257
Willy Tarreau22ec1ea2014-11-27 20:45:39 +01001258 s->res.flags |= CF_READ_DONTWAIT;
Willy Tarreau696a2912014-11-24 11:36:57 +01001259
Emeric Brun2b920a12010-09-23 18:30:22 +02001260 /* it is important not to call the wakeup function directly but to
1261 * pass through task_wakeup(), because this one knows how to apply
1262 * priorities to tasks.
1263 */
1264 task_wakeup(t, TASK_WOKEN_INIT);
1265
1266 l->nbconn++; /* warning! right now, it's up to the handler to decrease this */
1267 p->feconn++;/* beconn will be increased later */
1268 jobs++;
Willy Tarreau3c63fd82011-09-07 18:00:47 +02001269 if (!(s->listener->options & LI_O_UNLIMITED))
1270 actconn++;
Emeric Brun2b920a12010-09-23 18:30:22 +02001271 totalconn++;
1272
1273 return s;
1274
1275 /* Error unrolling */
Willy Tarreau32e3c6a2013-10-11 19:34:20 +02001276 out_fail_conn1:
Emeric Brun2b920a12010-09-23 18:30:22 +02001277 task_free(t);
1278 out_free_session:
1279 LIST_DEL(&s->list);
1280 pool_free2(pool2_session, s);
1281 out_close:
1282 return s;
1283}
1284
1285/*
1286 * Task processing function to manage re-connect and peer session
1287 * tasks wakeup on local update.
1288 */
Simon Horman96553772011-06-08 09:18:51 +09001289static struct task *process_peer_sync(struct task * task)
Emeric Brun2b920a12010-09-23 18:30:22 +02001290{
1291 struct shared_table *st = (struct shared_table *)task->context;
1292 struct peer_session *ps;
1293
1294 task->expire = TICK_ETERNITY;
1295
1296 if (!stopping) {
1297 /* Normal case (not soft stop)*/
1298 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL) &&
1299 (!nb_oldpids || tick_is_expired(st->resync_timeout, now_ms)) &&
1300 !(st->flags & SHTABLE_F_RESYNC_ASSIGN)) {
1301 /* Resync from local peer needed
1302 no peer was assigned for the lesson
1303 and no old local peer found
1304 or resync timeout expire */
1305
1306 /* flag no more resync from local, to try resync from remotes */
1307 st->flags |= SHTABLE_F_RESYNC_LOCAL;
1308
1309 /* reschedule a resync */
1310 st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
1311 }
1312
1313 /* For each session */
1314 for (ps = st->sessions; ps; ps = ps->next) {
1315 /* For each remote peers */
1316 if (!ps->peer->local) {
1317 if (!ps->session) {
1318 /* no active session */
1319 if (ps->statuscode == 0 ||
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001320 ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
1321 ((ps->statuscode == PEER_SESS_SC_CONNECTCODE ||
1322 ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
Emeric Brun2b920a12010-09-23 18:30:22 +02001323 tick_is_expired(ps->reconnect, now_ms))) {
1324 /* connection never tried
1325 * or previous session established with success
1326 * or previous session failed during connection
1327 * and reconnection timer is expired */
1328
1329 /* retry a connect */
1330 ps->session = peer_session_create(ps->peer, ps);
1331 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001332 else if (ps->statuscode == PEER_SESS_SC_CONNECTCODE ||
1333 ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) {
Emeric Brun2b920a12010-09-23 18:30:22 +02001334 /* If previous session failed during connection
1335 * but reconnection timer is not expired */
1336
1337 /* reschedule task for reconnect */
1338 task->expire = tick_first(task->expire, ps->reconnect);
1339 }
1340 /* else do nothing */
1341 } /* !ps->session */
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001342 else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
Emeric Brun2b920a12010-09-23 18:30:22 +02001343 /* current session is active and established */
1344 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
1345 !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
1346 !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
1347 /* Resync from a remote is needed
1348 * and no peer was assigned for lesson
1349 * and current peer may be up2date */
1350
1351 /* assign peer for the lesson */
1352 ps->flags |= PEER_F_LEARN_ASSIGN;
1353 st->flags |= SHTABLE_F_RESYNC_ASSIGN;
1354
1355 /* awake peer session task to handle a request of resync */
1356 task_wakeup(ps->session->task, TASK_WOKEN_MSG);
1357 }
1358 else if ((int)(ps->pushed - ps->table->table->localupdate) < 0) {
1359 /* awake peer session task to push local updates */
1360 task_wakeup(ps->session->task, TASK_WOKEN_MSG);
1361 }
1362 /* else do nothing */
1363 } /* SUCCESSCODE */
1364 } /* !ps->peer->local */
1365 } /* for */
1366
1367 /* Resync from remotes expired: consider resync is finished */
1368 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
1369 !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
1370 tick_is_expired(st->resync_timeout, now_ms)) {
1371 /* Resync from remote peer needed
1372 * no peer was assigned for the lesson
1373 * and resync timeout expire */
1374
1375 /* flag no more resync from remote, consider resync is finished */
1376 st->flags |= SHTABLE_F_RESYNC_REMOTE;
1377 }
1378
1379 if ((st->flags & SHTABLE_RESYNC_STATEMASK) != SHTABLE_RESYNC_FINISHED) {
1380 /* Resync not finished*/
1381 /* reschedule task to resync timeout, to ended resync if needed */
1382 task->expire = tick_first(task->expire, st->resync_timeout);
1383 }
1384 } /* !stopping */
1385 else {
1386 /* soft stop case */
1387 if (task->state & TASK_WOKEN_SIGNAL) {
1388 /* We've just recieved the signal */
1389 if (!(st->flags & SHTABLE_F_DONOTSTOP)) {
1390 /* add DO NOT STOP flag if not present */
1391 jobs++;
1392 st->flags |= SHTABLE_F_DONOTSTOP;
Willy Tarreau3a925c12013-09-04 17:54:01 +02001393 st->table->syncing++;
Emeric Brun2b920a12010-09-23 18:30:22 +02001394 }
1395
1396 /* disconnect all connected peers */
1397 for (ps = st->sessions; ps; ps = ps->next) {
1398 if (ps->session) {
1399 peer_session_forceshutdown(ps->session);
1400 ps->session = NULL;
1401 }
1402 }
1403 }
1404 ps = st->local_session;
1405
1406 if (ps->flags & PEER_F_TEACH_COMPLETE) {
1407 if (st->flags & SHTABLE_F_DONOTSTOP) {
1408 /* resync of new process was complete, current process can die now */
1409 jobs--;
1410 st->flags &= ~SHTABLE_F_DONOTSTOP;
Willy Tarreau3a925c12013-09-04 17:54:01 +02001411 st->table->syncing--;
Emeric Brun2b920a12010-09-23 18:30:22 +02001412 }
1413 }
1414 else if (!ps->session) {
1415 /* If session is not active */
1416 if (ps->statuscode == 0 ||
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001417 ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
1418 ps->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
1419 ps->statuscode == PEER_SESS_SC_TRYAGAIN) {
Emeric Brun2b920a12010-09-23 18:30:22 +02001420 /* connection never tried
1421 * or previous session was successfully established
1422 * or previous session tcp connect success but init state incomplete
1423 * or during previous connect, peer replies a try again statuscode */
1424
1425 /* connect to the peer */
1426 ps->session = peer_session_create(ps->peer, ps);
1427 }
1428 else {
1429 /* Other error cases */
1430 if (st->flags & SHTABLE_F_DONOTSTOP) {
1431 /* unable to resync new process, current process can die now */
1432 jobs--;
1433 st->flags &= ~SHTABLE_F_DONOTSTOP;
Willy Tarreau3a925c12013-09-04 17:54:01 +02001434 st->table->syncing--;
Emeric Brun2b920a12010-09-23 18:30:22 +02001435 }
1436 }
1437 }
Willy Tarreaue4d927a2013-12-01 12:47:35 +01001438 else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE &&
Emeric Brun2b920a12010-09-23 18:30:22 +02001439 (int)(ps->pushed - ps->table->table->localupdate) < 0) {
1440 /* current session active and established
1441 awake session to push remaining local updates */
1442 task_wakeup(ps->session->task, TASK_WOKEN_MSG);
1443 }
1444 } /* stopping */
1445 /* Wakeup for re-connect */
1446 return task;
1447}
1448
1449/*
1450 * Function used to register a table for sync on a group of peers
1451 *
1452 */
1453void peers_register_table(struct peers *peers, struct stktable *table)
1454{
1455 struct shared_table *st;
1456 struct peer * curpeer;
1457 struct peer_session *ps;
Willy Tarreau4348fad2012-09-20 16:48:07 +02001458 struct listener *listener;
Emeric Brun2b920a12010-09-23 18:30:22 +02001459
1460 st = (struct shared_table *)calloc(1,sizeof(struct shared_table));
1461 st->table = table;
1462 st->next = peers->tables;
1463 st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
1464 peers->tables = st;
1465
1466 for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) {
1467 ps = (struct peer_session *)calloc(1,sizeof(struct peer_session));
1468 ps->table = st;
1469 ps->peer = curpeer;
1470 if (curpeer->local)
1471 st->local_session = ps;
1472 ps->next = st->sessions;
1473 ps->reconnect = now_ms;
1474 st->sessions = ps;
1475 peers->peers_fe->maxconn += 3;
1476 }
1477
Willy Tarreau4348fad2012-09-20 16:48:07 +02001478 list_for_each_entry(listener, &peers->peers_fe->conf.listeners, by_fe)
1479 listener->maxconn = peers->peers_fe->maxconn;
Emeric Brun2b920a12010-09-23 18:30:22 +02001480 st->sync_task = task_new();
1481 st->sync_task->process = process_peer_sync;
1482 st->sync_task->expire = TICK_ETERNITY;
1483 st->sync_task->context = (void *)st;
1484 table->sync_task =st->sync_task;
1485 signal_register_task(0, table->sync_task, 0);
1486 task_wakeup(st->sync_task, TASK_WOKEN_INIT);
1487}
1488