blob: 6a1a8a4ae3a2fa157b5035bca8c702dd56d8c684 [file] [log] [blame]
Emeric Brun2b920a12010-09-23 18:30:22 +02001/*
Willy Tarreaubd55e312010-11-11 10:55:09 +01002 * Stick table synchro management.
Emeric Brun2b920a12010-09-23 18:30:22 +02003 *
4 * Copyright 2010 EXCELIANCE, Emeric Brun <ebrun@exceliance.fr>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12
13#include <errno.h>
14#include <fcntl.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <sys/socket.h>
20#include <sys/stat.h>
21#include <sys/types.h>
22
23#include <common/compat.h>
24#include <common/config.h>
25#include <common/time.h>
26
27#include <types/global.h>
Willy Tarreau3fdb3662012-11-12 00:42:33 +010028#include <types/listener.h>
29#include <types/obj_type.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020030#include <types/peers.h>
31
32#include <proto/acl.h>
Willy Tarreauc7e42382012-08-24 19:22:53 +020033#include <proto/channel.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020034#include <proto/fd.h>
35#include <proto/log.h>
36#include <proto/hdr_idx.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020037#include <proto/proto_tcp.h>
38#include <proto/proto_http.h>
39#include <proto/proxy.h>
40#include <proto/session.h>
41#include <proto/stream_interface.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020042#include <proto/task.h>
43#include <proto/stick_table.h>
44#include <proto/signal.h>
45
46
47/*******************************/
48/* Current peer learning state */
49/*******************************/
50
51/******************************/
52/* Current table resync state */
53/******************************/
54#define SHTABLE_F_RESYNC_LOCAL 0x00000001 /* Learn from local finished or no more needed */
55#define SHTABLE_F_RESYNC_REMOTE 0x00000002 /* Learn from remote finished or no more needed */
56#define SHTABLE_F_RESYNC_ASSIGN 0x00000004 /* A peer was assigned to learn our lesson */
57#define SHTABLE_F_RESYNC_PROCESS 0x00000008 /* The assigned peer was requested for resync */
58#define SHTABLE_F_DONOTSTOP 0x00010000 /* Main table sync task block process during soft stop
59 to push data to new process */
60
61#define SHTABLE_RESYNC_STATEMASK (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)
62#define SHTABLE_RESYNC_FROMLOCAL 0x00000000
63#define SHTABLE_RESYNC_FROMREMOTE SHTABLE_F_RESYNC_LOCAL
64#define SHTABLE_RESYNC_FINISHED (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)
65
66/******************************/
67/* Remote peer teaching state */
68/******************************/
69#define PEER_F_TEACH_PROCESS 0x00000001 /* Teach a lesson to current peer */
70#define PEER_F_TEACH_STAGE1 0x00000002 /* Teach state 1 complete */
71#define PEER_F_TEACH_STAGE2 0x00000004 /* Teach stage 2 complete */
72#define PEER_F_TEACH_FINISHED 0x00000008 /* Teach conclude, (wait for confirm) */
73#define PEER_F_TEACH_COMPLETE 0x00000010 /* All that we know already taught to current peer, used only for a local peer */
74#define PEER_F_LEARN_ASSIGN 0x00000100 /* Current peer was assigned for a lesson */
75#define PEER_F_LEARN_NOTUP2DATE 0x00000200 /* Learn from peer finished but peer is not up to date */
76
77#define PEER_TEACH_RESET ~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_STAGE1|PEER_F_TEACH_STAGE2|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */
78#define PEER_LEARN_RESET ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE)
79
80
81/**********************************/
82/* Peer Session IO handler states */
83/**********************************/
84
85#define PEER_SESSION_ACCEPT 1000 /* Initial state for session create by an accept */
86#define PEER_SESSION_GETVERSION 1001 /* Validate supported protocol version*/
87#define PEER_SESSION_GETHOST 1002 /* Validate host ID correspond to local host id */
88#define PEER_SESSION_GETPEER 1003 /* Validate peer ID correspond to a known remote peer id */
89#define PEER_SESSION_GETTABLE 1004 /* Search into registered table for a table with same id and
90 validate type and size */
91#define PEER_SESSION_SENDSUCCESS 1005 /* Send ret code 200 (success) and wait for message */
92/* next state is WAITMSG */
93
94#define PEER_SESSION_CONNECT 2000 /* Initial state for session create on a connect,
95 push presentation into buffer */
96#define PEER_SESSION_GETSTATUS 2001 /* Wait for the welcome message */
97#define PEER_SESSION_WAITMSG 2002 /* Wait for datamessages*/
98/* loop on WAITMSG */
99
100#define PEER_SESSION_EXIT 10000 /* Exit with status code */
101#define PEER_SESSION_END 10001 /* Killed session */
102/* session ended */
103
104
105/**********************************/
106/* Peer Session status code */
107/**********************************/
108
109#define PEER_SESSION_CONNECTCODE 100 /* connect in progress */
110#define PEER_SESSION_CONNECTEDCODE 110 /* tcp connect success */
111
112#define PEER_SESSION_SUCCESSCODE 200 /* accept or connect successful */
113
114#define PEER_SESSION_TRYAGAIN 300 /* try again later */
115
116#define PEER_SESSION_ERRPROTO 501 /* error protocol */
117#define PEER_SESSION_ERRVERSION 502 /* unknown protocol version */
118#define PEER_SESSION_ERRHOST 503 /* bad host name */
119#define PEER_SESSION_ERRPEER 504 /* unknown peer */
120#define PEER_SESSION_ERRTYPE 505 /* table key type mismatch */
121#define PEER_SESSION_ERRSIZE 506 /* table key size mismatch */
122#define PEER_SESSION_ERRTABLE 507 /* unknown table */
123
124#define PEER_SESSION_PROTO_NAME "HAProxyS"
125
126struct peers *peers = NULL;
Simon Horman96553772011-06-08 09:18:51 +0900127static void peer_session_forceshutdown(struct session * session);
Emeric Brun2b920a12010-09-23 18:30:22 +0200128
129
130/*
131 * This prepare the data update message of the stick session <ts>, <ps> is the the peer session
132 * where the data going to be pushed, <msg> is a buffer of <size> to recieve data message content
133 */
Simon Horman96553772011-06-08 09:18:51 +0900134static int peer_prepare_datamsg(struct stksess *ts, struct peer_session *ps, char *msg, size_t size)
Emeric Brun2b920a12010-09-23 18:30:22 +0200135{
136 uint32_t netinteger;
137 int len;
138 /* construct message */
139 if (ps->lastpush && ts->upd.key > ps->lastpush && (ts->upd.key - ps->lastpush) <= 127) {
140 msg[0] = 0x80 + ts->upd.key - ps->lastpush;
141 len = sizeof(char);
142 }
143 else {
144 msg[0] = 'D';
145 netinteger = htonl(ts->upd.key);
146 memcpy(&msg[sizeof(char)], &netinteger, sizeof(netinteger));
147 len = sizeof(char) + sizeof(netinteger);
148 }
149
150 if (ps->table->table->type == STKTABLE_TYPE_STRING) {
151 int stlen = strlen((char *)ts->key.key);
152
153 netinteger = htonl(strlen((char *)ts->key.key));
154 memcpy(&msg[len], &netinteger, sizeof(netinteger));
155 memcpy(&msg[len+sizeof(netinteger)], ts->key.key, stlen);
156 len += sizeof(netinteger) + stlen;
157
158 }
159 else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
160 netinteger = htonl(*((uint32_t *)ts->key.key));
161 memcpy(&msg[len], &netinteger, sizeof(netinteger));
162 len += sizeof(netinteger);
163 }
164 else {
165 memcpy(&msg[len], ts->key.key, ps->table->table->key_size);
166 len += ps->table->table->key_size;
167 }
168
169 if (stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
170 netinteger = htonl(stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id));
171 else
172 netinteger = 0;
173
174 memcpy(&msg[len], &netinteger , sizeof(netinteger));
175 len += sizeof(netinteger);
176
177 return len;
178}
179
180
181/*
182 * Callback to release a session with a peer
183 */
Simon Horman96553772011-06-08 09:18:51 +0900184static void peer_session_release(struct stream_interface *si)
Emeric Brun2b920a12010-09-23 18:30:22 +0200185{
Willy Tarreau9f681482013-07-08 16:05:07 +0200186 struct session *s = session_from_task(si->owner);
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100187 struct peer_session *ps = (struct peer_session *)si->appctx.ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200188
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100189 /* si->appctx.ctx.peers.ptr is not a peer session */
190 if (si->appctx.st0 < PEER_SESSION_SENDSUCCESS)
Emeric Brun2b920a12010-09-23 18:30:22 +0200191 return;
192
193 /* peer session identified */
194 if (ps) {
195 if (ps->session == s) {
196 ps->session = NULL;
197 if (ps->flags & PEER_F_LEARN_ASSIGN) {
198 /* unassign current peer for learning */
199 ps->flags &= ~(PEER_F_LEARN_ASSIGN);
200 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
201
202 /* reschedule a resync */
203 ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
204 }
205 /* reset teaching and learning flags to 0 */
206 ps->flags &= PEER_TEACH_RESET;
207 ps->flags &= PEER_LEARN_RESET;
208 }
209 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
210 }
211}
212
213
214/*
215 * IO Handler to handle message exchance with a peer
216 */
Willy Tarreaub24281b2011-02-13 13:16:36 +0100217static void peer_io_handler(struct stream_interface *si)
Emeric Brun2b920a12010-09-23 18:30:22 +0200218{
Willy Tarreau9f681482013-07-08 16:05:07 +0200219 struct session *s = session_from_task(si->owner);
Emeric Brun2b920a12010-09-23 18:30:22 +0200220 struct peers *curpeers = (struct peers *)s->fe->parent;
221 int reql = 0;
222 int repl = 0;
223
224 while (1) {
225switchstate:
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100226 switch(si->appctx.st0) {
Emeric Brun2b920a12010-09-23 18:30:22 +0200227 case PEER_SESSION_ACCEPT:
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100228 si->appctx.ctx.peers.ptr = NULL;
229 si->appctx.st0 = PEER_SESSION_GETVERSION;
Emeric Brun2b920a12010-09-23 18:30:22 +0200230 /* fall through */
231 case PEER_SESSION_GETVERSION:
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100232 reql = bo_getline(si->ob, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200233 if (reql <= 0) { /* closed or EOL not found */
234 if (reql == 0)
235 goto out;
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100236 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200237 goto switchstate;
238 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100239 if (trash.str[reql-1] != '\n') {
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100240 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200241 goto switchstate;
242 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100243 else if (reql > 1 && (trash.str[reql-2] == '\r'))
244 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200245 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100246 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200247
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200248 bo_skip(si->ob, reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200249
250 /* test version */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100251 if (strcmp(PEER_SESSION_PROTO_NAME " 1.0", trash.str) != 0) {
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100252 si->appctx.st0 = PEER_SESSION_EXIT;
253 si->appctx.st1 = PEER_SESSION_ERRVERSION;
Emeric Brun2b920a12010-09-23 18:30:22 +0200254 /* test protocol */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100255 if (strncmp(PEER_SESSION_PROTO_NAME " ", trash.str, strlen(PEER_SESSION_PROTO_NAME)+1) != 0)
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100256 si->appctx.st1 = PEER_SESSION_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200257 goto switchstate;
258 }
259
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100260 si->appctx.st0 = PEER_SESSION_GETHOST;
Emeric Brun2b920a12010-09-23 18:30:22 +0200261 /* fall through */
262 case PEER_SESSION_GETHOST:
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100263 reql = bo_getline(si->ob, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200264 if (reql <= 0) { /* closed or EOL not found */
265 if (reql == 0)
266 goto out;
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100267 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200268 goto switchstate;
269 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100270 if (trash.str[reql-1] != '\n') {
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100271 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200272 goto switchstate;
273 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100274 else if (reql > 1 && (trash.str[reql-2] == '\r'))
275 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200276 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100277 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200278
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200279 bo_skip(si->ob, reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200280
281 /* test hostname match */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100282 if (strcmp(localpeer, trash.str) != 0) {
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100283 si->appctx.st0 = PEER_SESSION_EXIT;
284 si->appctx.st1 = PEER_SESSION_ERRHOST;
Emeric Brun2b920a12010-09-23 18:30:22 +0200285 goto switchstate;
286 }
287
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100288 si->appctx.st0 = PEER_SESSION_GETPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200289 /* fall through */
290 case PEER_SESSION_GETPEER: {
291 struct peer *curpeer;
292 char *p;
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100293 reql = bo_getline(si->ob, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200294 if (reql <= 0) { /* closed or EOL not found */
295 if (reql == 0)
296 goto out;
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100297 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200298 goto switchstate;
299 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100300 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200301 /* Incomplete line, we quit */
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100302 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200303 goto switchstate;
304 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100305 else if (reql > 1 && (trash.str[reql-2] == '\r'))
306 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200307 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100308 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200309
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200310 bo_skip(si->ob, reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200311
312 /* parse line "<peer name> <pid>" */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100313 p = strchr(trash.str, ' ');
Emeric Brun2b920a12010-09-23 18:30:22 +0200314 if (!p) {
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100315 si->appctx.st0 = PEER_SESSION_EXIT;
316 si->appctx.st1 = PEER_SESSION_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200317 goto switchstate;
318 }
319 *p = 0;
320
321 /* lookup known peer */
322 for (curpeer = curpeers->remote; curpeer; curpeer = curpeer->next) {
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100323 if (strcmp(curpeer->id, trash.str) == 0)
Emeric Brun2b920a12010-09-23 18:30:22 +0200324 break;
325 }
326
327 /* if unknown peer */
328 if (!curpeer) {
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100329 si->appctx.st0 = PEER_SESSION_EXIT;
330 si->appctx.st1 = PEER_SESSION_ERRPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200331 goto switchstate;
332 }
333
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100334 si->appctx.ctx.peers.ptr = curpeer;
335 si->appctx.st0 = PEER_SESSION_GETTABLE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200336 /* fall through */
337 }
338 case PEER_SESSION_GETTABLE: {
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100339 struct peer *curpeer = (struct peer *)si->appctx.ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200340 struct shared_table *st;
341 struct peer_session *ps = NULL;
342 unsigned long key_type;
343 size_t key_size;
344 char *p;
345
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100346 reql = bo_getline(si->ob, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200347 if (reql <= 0) { /* closed or EOL not found */
348 if (reql == 0)
349 goto out;
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100350 si->appctx.ctx.peers.ptr = NULL;
351 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200352 goto switchstate;
353 }
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100354 /* Re init si->appctx.ctx.peers.ptr to null, to handle correctly a release case */
355 si->appctx.ctx.peers.ptr = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200356
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100357 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200358 /* Incomplete line, we quit */
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100359 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200360 goto switchstate;
361 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100362 else if (reql > 1 && (trash.str[reql-2] == '\r'))
363 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200364 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100365 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200366
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200367 bo_skip(si->ob, reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200368
369 /* Parse line "<table name> <type> <size>" */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100370 p = strchr(trash.str, ' ');
Emeric Brun2b920a12010-09-23 18:30:22 +0200371 if (!p) {
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100372 si->appctx.st0 = PEER_SESSION_EXIT;
373 si->appctx.st1 = PEER_SESSION_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200374 goto switchstate;
375 }
376 *p = 0;
377 key_type = (unsigned long)atol(p+1);
378
379 p = strchr(p+1, ' ');
380 if (!p) {
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100381 si->appctx.ctx.peers.ptr = NULL;
382 si->appctx.st0 = PEER_SESSION_EXIT;
383 si->appctx.st1 = PEER_SESSION_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200384 goto switchstate;
385 }
386
387 key_size = (size_t)atoi(p);
388 for (st = curpeers->tables; st; st = st->next) {
389 /* If table name matches */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100390 if (strcmp(st->table->id, trash.str) == 0) {
Willy Tarreau86a446e2013-11-25 23:02:37 +0100391 /* Check key size mismatches, except for strings
392 * which may be truncated as long as they fit in
393 * a buffer.
394 */
395 if (key_size != st->table->key_size &&
396 (key_type != STKTABLE_TYPE_STRING ||
397 1 + 4 + 4 + key_size - 1 >= trash.size)) {
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100398 si->appctx.st0 = PEER_SESSION_EXIT;
399 si->appctx.st1 = PEER_SESSION_ERRSIZE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200400 goto switchstate;
401 }
402
403 /* If key type mismatches */
404 if (key_type != st->table->type) {
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100405 si->appctx.st0 = PEER_SESSION_EXIT;
406 si->appctx.st1 = PEER_SESSION_ERRTYPE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200407 goto switchstate;
408 }
409
410 /* lookup peer session of current peer */
411 for (ps = st->sessions; ps; ps = ps->next) {
412 if (ps->peer == curpeer) {
413 /* If session already active, replaced by new one */
414 if (ps->session && ps->session != s) {
415 if (ps->peer->local) {
416 /* Local connection, reply a retry */
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100417 si->appctx.st0 = PEER_SESSION_EXIT;
418 si->appctx.st1 = PEER_SESSION_TRYAGAIN;
Emeric Brun2b920a12010-09-23 18:30:22 +0200419 goto switchstate;
420 }
421 peer_session_forceshutdown(ps->session);
422 }
423 ps->session = s;
424 break;
425 }
426 }
427 break;
428 }
429 }
430
431 /* If table not found */
432 if (!st){
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100433 si->appctx.st0 = PEER_SESSION_EXIT;
434 si->appctx.st1 = PEER_SESSION_ERRTABLE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200435 goto switchstate;
436 }
437
438 /* If no peer session for current peer */
439 if (!ps) {
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100440 si->appctx.st0 = PEER_SESSION_EXIT;
441 si->appctx.st1 = PEER_SESSION_ERRPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200442 goto switchstate;
443 }
444
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100445 si->appctx.ctx.peers.ptr = ps;
446 si->appctx.st0 = PEER_SESSION_SENDSUCCESS;
Emeric Brun2b920a12010-09-23 18:30:22 +0200447 /* fall through */
448 }
449 case PEER_SESSION_SENDSUCCESS:{
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100450 struct peer_session *ps = (struct peer_session *)si->appctx.ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200451
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100452 repl = snprintf(trash.str, trash.size, "%d\n", PEER_SESSION_SUCCESSCODE);
453 repl = bi_putblk(si->ib, trash.str, repl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200454 if (repl <= 0) {
455 if (repl == -1)
456 goto out;
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100457 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200458 goto switchstate;
459 }
460
461 /* Register status code */
462 ps->statuscode = PEER_SESSION_SUCCESSCODE;
463
464 /* Awake main task */
465 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
466
467 /* Init cursors */
468 ps->teaching_origin =ps->lastpush = ps->lastack = ps->pushack = 0;
469 ps->pushed = ps->update;
470
471 /* Init confirm counter */
472 ps->confirm = 0;
473
474 /* reset teaching and learning flags to 0 */
475 ps->flags &= PEER_TEACH_RESET;
476 ps->flags &= PEER_LEARN_RESET;
477
478 /* if current peer is local */
479 if (ps->peer->local) {
480 /* if table need resyncfrom local and no process assined */
481 if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL &&
482 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
483 /* assign local peer for a lesson, consider lesson already requested */
484 ps->flags |= PEER_F_LEARN_ASSIGN;
485 ps->table->flags |= (SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
486 }
487
488 }
489 else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
490 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
491 /* assign peer for a lesson */
492 ps->flags |= PEER_F_LEARN_ASSIGN;
493 ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
494 }
495 /* switch to waiting message state */
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100496 si->appctx.st0 = PEER_SESSION_WAITMSG;
Emeric Brun2b920a12010-09-23 18:30:22 +0200497 goto switchstate;
498 }
499 case PEER_SESSION_CONNECT: {
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100500 struct peer_session *ps = (struct peer_session *)si->appctx.ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200501
502 /* Send headers */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100503 repl = snprintf(trash.str, trash.size,
Emeric Brun2b920a12010-09-23 18:30:22 +0200504 PEER_SESSION_PROTO_NAME " 1.0\n%s\n%s %d\n%s %lu %d\n",
505 ps->peer->id,
506 localpeer,
Willy Tarreau7b77c9f2012-01-07 22:52:12 +0100507 (int)getpid(),
Emeric Brun2b920a12010-09-23 18:30:22 +0200508 ps->table->table->id,
509 ps->table->table->type,
Willy Tarreaubd55e312010-11-11 10:55:09 +0100510 (int)ps->table->table->key_size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200511
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100512 if (repl >= trash.size) {
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100513 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200514 goto switchstate;
515 }
516
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100517 repl = bi_putblk(si->ib, trash.str, repl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200518 if (repl <= 0) {
519 if (repl == -1)
520 goto out;
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100521 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200522 goto switchstate;
523 }
524
525 /* switch to the waiting statuscode state */
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100526 si->appctx.st0 = PEER_SESSION_GETSTATUS;
Emeric Brun2b920a12010-09-23 18:30:22 +0200527 /* fall through */
528 }
529 case PEER_SESSION_GETSTATUS: {
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100530 struct peer_session *ps = (struct peer_session *)si->appctx.ctx.peers.ptr;
Emeric Brun2b920a12010-09-23 18:30:22 +0200531
Willy Tarreau03cdb7c2012-08-27 23:14:58 +0200532 if (si->ib->flags & CF_WRITE_PARTIAL)
Emeric Brun2b920a12010-09-23 18:30:22 +0200533 ps->statuscode = PEER_SESSION_CONNECTEDCODE;
534
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100535 reql = bo_getline(si->ob, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200536 if (reql <= 0) { /* closed or EOL not found */
537 if (reql == 0)
538 goto out;
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100539 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200540 goto switchstate;
541 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100542 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200543 /* Incomplete line, we quit */
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100544 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200545 goto switchstate;
546 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100547 else if (reql > 1 && (trash.str[reql-2] == '\r'))
548 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200549 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100550 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200551
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200552 bo_skip(si->ob, reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200553
554 /* Register status code */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100555 ps->statuscode = atoi(trash.str);
Emeric Brun2b920a12010-09-23 18:30:22 +0200556
557 /* Awake main task */
558 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
559
560 /* If status code is success */
561 if (ps->statuscode == PEER_SESSION_SUCCESSCODE) {
562 /* Init cursors */
563 ps->teaching_origin = ps->lastpush = ps->lastack = ps->pushack = 0;
564 ps->pushed = ps->update;
565
566 /* Init confirm counter */
567 ps->confirm = 0;
568
569 /* reset teaching and learning flags to 0 */
570 ps->flags &= PEER_TEACH_RESET;
571 ps->flags &= PEER_LEARN_RESET;
572
573 /* If current peer is local */
574 if (ps->peer->local) {
575 /* Init cursors to push a resync */
576 ps->teaching_origin = ps->pushed = ps->table->table->update;
577 /* flag to start to teach lesson */
578 ps->flags |= PEER_F_TEACH_PROCESS;
579
580 }
581 else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
582 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
583 /* If peer is remote and resync from remote is needed,
584 and no peer currently assigned */
585
586 /* assign peer for a lesson */
587 ps->flags |= PEER_F_LEARN_ASSIGN;
588 ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
589 }
590
591 }
592 else {
593 /* Status code is not success, abort */
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100594 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200595 goto switchstate;
596 }
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100597 si->appctx.st0 = PEER_SESSION_WAITMSG;
Emeric Brun2b920a12010-09-23 18:30:22 +0200598 /* fall through */
599 }
600 case PEER_SESSION_WAITMSG: {
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100601 struct peer_session *ps = (struct peer_session *)si->appctx.ctx.peers.ptr;
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200602 struct stksess *ts, *newts = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200603 char c;
604 int totl = 0;
605
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200606 reql = bo_getblk(si->ob, (char *)&c, sizeof(c), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200607 if (reql <= 0) /* closed or EOL not found */
608 goto incomplete;
609
Emeric Brun2b920a12010-09-23 18:30:22 +0200610 totl += reql;
611
612 if ((c & 0x80) || (c == 'D')) {
613 /* Here we have data message */
614 unsigned int pushack;
Emeric Brun2b920a12010-09-23 18:30:22 +0200615 int srvid;
616 uint32_t netinteger;
617
618 /* Compute update remote version */
619 if (c & 0x80) {
620 pushack = ps->pushack + (unsigned int)(c & 0x7F);
621 }
622 else {
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200623 reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200624 if (reql <= 0) /* closed or EOL not found */
625 goto incomplete;
626
Emeric Brun2b920a12010-09-23 18:30:22 +0200627 totl += reql;
628 pushack = ntohl(netinteger);
629 }
630
Willy Tarreau86a446e2013-11-25 23:02:37 +0100631 /* Read key. The string keys are read in two steps, the first step
632 * consists in reading whatever fits into the table directly into
633 * the pre-allocated key. The second step consists in simply
634 * draining all exceeding data. This can happen for example after a
635 * config reload with a smaller key size for the stick table than
636 * what was previously set, or when facing the impossibility to
637 * allocate a new stksess (for example when the table is full with
638 * "nopurge").
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200639 */
Emeric Brun2b920a12010-09-23 18:30:22 +0200640 if (ps->table->table->type == STKTABLE_TYPE_STRING) {
Willy Tarreau86a446e2013-11-25 23:02:37 +0100641 unsigned int to_read, to_store;
642
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200643 /* read size first */
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200644 reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200645 if (reql <= 0) /* closed or EOL not found */
646 goto incomplete;
647
Emeric Brun2b920a12010-09-23 18:30:22 +0200648 totl += reql;
Willy Tarreau86a446e2013-11-25 23:02:37 +0100649
650 to_store = 0;
651 to_read = ntohl(netinteger);
652
653 if (to_read + totl > si->ob->buf->size) {
654 /* impossible to read a key this large, abort */
655 reql = -1;
Willy Tarreau72d6c162013-04-11 16:14:13 +0200656 goto incomplete;
Willy Tarreau86a446e2013-11-25 23:02:37 +0100657 }
Willy Tarreau72d6c162013-04-11 16:14:13 +0200658
Willy Tarreau86a446e2013-11-25 23:02:37 +0100659 newts = stksess_new(ps->table->table, NULL);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200660 if (newts)
Willy Tarreau86a446e2013-11-25 23:02:37 +0100661 to_store = MIN(to_read, ps->table->table->key_size - 1);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200662
Willy Tarreau86a446e2013-11-25 23:02:37 +0100663 /* we read up to two blocks, the first one goes into the key,
664 * the rest is drained into the trash.
665 */
666 if (to_store) {
667 reql = bo_getblk(si->ob, (char *)newts->key.key, to_store, totl);
668 if (reql <= 0) /* closed or incomplete */
669 goto incomplete;
670 newts->key.key[reql] = 0;
671 totl += reql;
672 to_read -= reql;
673 }
674 if (to_read) {
675 reql = bo_getblk(si->ob, trash.str, to_read, totl);
676 if (reql <= 0) /* closed or incomplete */
677 goto incomplete;
678 totl += reql;
679 }
Emeric Brun2b920a12010-09-23 18:30:22 +0200680 }
681 else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200682 newts = stksess_new(ps->table->table, NULL);
683 reql = bo_getblk(si->ob, newts ? (char *)newts->key.key : trash.str, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200684 if (reql <= 0) /* closed or EOL not found */
685 goto incomplete;
Emeric Brun2b920a12010-09-23 18:30:22 +0200686 totl += reql;
Emeric Brun2b920a12010-09-23 18:30:22 +0200687 }
688 else {
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200689 /* type ip or binary */
690 newts = stksess_new(ps->table->table, NULL);
691 reql = bo_getblk(si->ob, newts ? (char *)newts->key.key : trash.str, ps->table->table->key_size, totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200692 if (reql <= 0) /* closed or EOL not found */
693 goto incomplete;
Willy Tarreau72d6c162013-04-11 16:14:13 +0200694 totl += reql;
Emeric Brun2b920a12010-09-23 18:30:22 +0200695 }
696
697 /* read server id */
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200698 reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200699 if (reql <= 0) /* closed or EOL not found */
700 goto incomplete;
701
Emeric Brun2b920a12010-09-23 18:30:22 +0200702 totl += reql;
703 srvid = ntohl(netinteger);
704
705 /* update entry */
Emeric Brun2b920a12010-09-23 18:30:22 +0200706 if (newts) {
707 /* lookup for existing entry */
708 ts = stktable_lookup(ps->table->table, newts);
709 if (ts) {
710 /* the entry already exist, we can free ours */
711 stktable_touch(ps->table->table, ts, 0);
712 stksess_free(ps->table->table, newts);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200713 newts = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200714 }
715 else {
716 struct eb32_node *eb;
717
718 /* create new entry */
719 ts = stktable_store(ps->table->table, newts, 0);
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200720 newts = NULL; /* don't reuse it */
721
Emeric Brun2b920a12010-09-23 18:30:22 +0200722 ts->upd.key= (++ps->table->table->update)+(2^31);
723 eb = eb32_insert(&ps->table->table->updates, &ts->upd);
724 if (eb != &ts->upd) {
725 eb32_delete(eb);
726 eb32_insert(&ps->table->table->updates, &ts->upd);
727 }
728 }
729
730 /* update entry */
731 if (srvid && stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
732 stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id) = srvid;
733 ps->pushack = pushack;
734 }
735
736 }
737 else if (c == 'R') {
738 /* Reset message: remote need resync */
739
740 /* reinit counters for a resync */
741 ps->lastpush = 0;
742 ps->teaching_origin = ps->pushed = ps->table->table->update;
743
744 /* reset teaching flags to 0 */
745 ps->flags &= PEER_TEACH_RESET;
746
747 /* flag to start to teach lesson */
748 ps->flags |= PEER_F_TEACH_PROCESS;
749 }
750 else if (c == 'F') {
751 /* Finish message, all known updates have been pushed by remote */
752 /* and remote is up to date */
753
754 /* If resync is in progress with remote peer */
755 if (ps->flags & PEER_F_LEARN_ASSIGN) {
756
757 /* unassign current peer for learning */
758 ps->flags &= ~PEER_F_LEARN_ASSIGN;
759 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
760
761 /* Consider table is now up2date, resync resync no more needed from local neither remote */
762 ps->table->flags |= (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE);
763 }
764 /* Increase confirm counter to launch a confirm message */
765 ps->confirm++;
766 }
767 else if (c == 'c') {
768 /* confirm message, remote peer is now up to date with us */
769
770 /* If stopping state */
771 if (stopping) {
772 /* Close session, push resync no more needed */
773 ps->flags |= PEER_F_TEACH_COMPLETE;
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100774 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200775 goto switchstate;
776 }
777
778 /* reset teaching flags to 0 */
779 ps->flags &= PEER_TEACH_RESET;
780 }
781 else if (c == 'C') {
782 /* Continue message, all known updates have been pushed by remote */
783 /* but remote is not up to date */
784
785 /* If resync is in progress with current peer */
786 if (ps->flags & PEER_F_LEARN_ASSIGN) {
787
788 /* unassign current peer */
789 ps->flags &= ~PEER_F_LEARN_ASSIGN;
790 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
791
792 /* flag current peer is not up 2 date to try from an other */
793 ps->flags |= PEER_F_LEARN_NOTUP2DATE;
794
795 /* reschedule a resync */
796 ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
797 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
798 }
799 ps->confirm++;
800 }
801 else if (c == 'A') {
802 /* ack message */
803 uint32_t netinteger;
804
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200805 reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
Willy Tarreau72d6c162013-04-11 16:14:13 +0200806 if (reql <= 0) /* closed or EOL not found */
807 goto incomplete;
808
Emeric Brun2b920a12010-09-23 18:30:22 +0200809 totl += reql;
810
811 /* Consider remote is up to date with "acked" version */
812 ps->update = ntohl(netinteger);
813 }
814 else {
815 /* Unknown message */
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100816 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200817 goto switchstate;
818 }
819
820 /* skip consumed message */
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200821 bo_skip(si->ob, totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200822
823 /* loop on that state to peek next message */
Willy Tarreau72d6c162013-04-11 16:14:13 +0200824 goto switchstate;
825
Emeric Brun2b920a12010-09-23 18:30:22 +0200826incomplete:
Willy Tarreau9d9179b2013-04-11 16:56:44 +0200827 /* we get here when a bo_getblk() returns <= 0 in reql */
828
829 /* first, we may have to release newts */
830 if (newts) {
831 stksess_free(ps->table->table, newts);
832 newts = NULL;
833 }
834
Willy Tarreau72d6c162013-04-11 16:14:13 +0200835 if (reql < 0) {
836 /* there was an error */
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100837 si->appctx.st0 = PEER_SESSION_END;
Willy Tarreau72d6c162013-04-11 16:14:13 +0200838 goto switchstate;
839 }
840
Emeric Brun2b920a12010-09-23 18:30:22 +0200841 /* Nothing to read, now we start to write */
842
843 /* Confirm finished or partial messages */
844 while (ps->confirm) {
845 /* There is a confirm messages to send */
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200846 repl = bi_putchr(si->ib, 'c');
Emeric Brun2b920a12010-09-23 18:30:22 +0200847 if (repl <= 0) {
848 /* no more write possible */
849 if (repl == -1)
850 goto out;
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100851 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200852 goto switchstate;
853 }
854 ps->confirm--;
855 }
856
857 /* Need to request a resync */
858 if ((ps->flags & PEER_F_LEARN_ASSIGN) &&
859 (ps->table->flags & SHTABLE_F_RESYNC_ASSIGN) &&
860 !(ps->table->flags & SHTABLE_F_RESYNC_PROCESS)) {
861 /* Current peer was elected to request a resync */
862
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200863 repl = bi_putchr(si->ib, 'R');
Emeric Brun2b920a12010-09-23 18:30:22 +0200864 if (repl <= 0) {
865 /* no more write possible */
866 if (repl == -1)
867 goto out;
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100868 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200869 goto switchstate;
870 }
871 ps->table->flags |= SHTABLE_F_RESYNC_PROCESS;
872 }
873
874 /* It remains some updates to ack */
875 if (ps->pushack != ps->lastack) {
876 uint32_t netinteger;
877
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100878 trash.str[0] = 'A';
Emeric Brun2b920a12010-09-23 18:30:22 +0200879 netinteger = htonl(ps->pushack);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100880 memcpy(&trash.str[1], &netinteger, sizeof(netinteger));
Emeric Brun2b920a12010-09-23 18:30:22 +0200881
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100882 repl = bi_putblk(si->ib, trash.str, 1+sizeof(netinteger));
Emeric Brun2b920a12010-09-23 18:30:22 +0200883 if (repl <= 0) {
884 /* no more write possible */
885 if (repl == -1)
886 goto out;
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100887 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200888 goto switchstate;
889 }
890 ps->lastack = ps->pushack;
891 }
892
893 if (ps->flags & PEER_F_TEACH_PROCESS) {
894 /* current peer was requested for a lesson */
895
896 if (!(ps->flags & PEER_F_TEACH_STAGE1)) {
897 /* lesson stage 1 not complete */
898 struct eb32_node *eb;
899
900 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
901 while (1) {
902 int msglen;
903 struct stksess *ts;
904
905 if (!eb) {
906 /* flag lesson stage1 complete */
907 ps->flags |= PEER_F_TEACH_STAGE1;
908 eb = eb32_first(&ps->table->table->updates);
909 if (eb)
910 ps->pushed = eb->key - 1;
911 break;
912 }
913
914 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100915 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200916 if (msglen) {
917 /* message to buffer */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100918 repl = bi_putblk(si->ib, trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200919 if (repl <= 0) {
920 /* no more write possible */
921 if (repl == -1)
922 goto out;
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100923 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200924 goto switchstate;
925 }
926 ps->lastpush = ps->pushed = ts->upd.key;
927 }
928 eb = eb32_next(eb);
929 }
930 } /* !TEACH_STAGE1 */
931
932 if (!(ps->flags & PEER_F_TEACH_STAGE2)) {
933 /* lesson stage 2 not complete */
934 struct eb32_node *eb;
935
936 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
937 while (1) {
938 int msglen;
939 struct stksess *ts;
940
941 if (!eb || eb->key > ps->teaching_origin) {
942 /* flag lesson stage1 complete */
943 ps->flags |= PEER_F_TEACH_STAGE2;
944 ps->pushed = ps->teaching_origin;
945 break;
946 }
947
948 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100949 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200950 if (msglen) {
951 /* message to buffer */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100952 repl = bi_putblk(si->ib, trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200953 if (repl <= 0) {
954 /* no more write possible */
955 if (repl == -1)
956 goto out;
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100957 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200958 goto switchstate;
959 }
960 ps->lastpush = ps->pushed = ts->upd.key;
961 }
962 eb = eb32_next(eb);
963 }
964 } /* !TEACH_STAGE2 */
965
966 if (!(ps->flags & PEER_F_TEACH_FINISHED)) {
967 /* process final lesson message */
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200968 repl = bi_putchr(si->ib, ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FINISHED) ? 'F' : 'C');
Emeric Brun2b920a12010-09-23 18:30:22 +0200969 if (repl <= 0) {
970 /* no more write possible */
971 if (repl == -1)
972 goto out;
Willy Tarreau9b6c2c72013-11-24 09:38:33 +0100973 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200974 goto switchstate;
975 }
976
977 /* flag finished message sent */
978 ps->flags |= PEER_F_TEACH_FINISHED;
979 } /* !TEACH_FINISHED */
980 } /* TEACH_PROCESS */
981
982 if (!(ps->flags & PEER_F_LEARN_ASSIGN) &&
983 (int)(ps->pushed - ps->table->table->localupdate) < 0) {
984 /* Push local updates, only if no learning in progress (to avoid ping-pong effects) */
985 struct eb32_node *eb;
986
987 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
988 while (1) {
989 int msglen;
990 struct stksess *ts;
991
992 /* push local updates */
993 if (!eb) {
994 eb = eb32_first(&ps->table->table->updates);
995 if (!eb || ((int)(eb->key - ps->pushed) <= 0)) {
996 ps->pushed = ps->table->table->localupdate;
997 break;
998 }
999 }
1000
1001 if ((int)(eb->key - ps->table->table->localupdate) > 0) {
1002 ps->pushed = ps->table->table->localupdate;
1003 break;
1004 }
1005
1006 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +01001007 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +02001008 if (msglen) {
1009 /* message to buffer */
Willy Tarreau19d14ef2012-10-29 16:51:55 +01001010 repl = bi_putblk(si->ib, trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +02001011 if (repl <= 0) {
1012 /* no more write possible */
1013 if (repl == -1)
1014 goto out;
Willy Tarreau9b6c2c72013-11-24 09:38:33 +01001015 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +02001016 goto switchstate;
1017 }
1018 ps->lastpush = ps->pushed = ts->upd.key;
1019 }
1020 eb = eb32_next(eb);
1021 }
1022 } /* ! LEARN_ASSIGN */
1023 /* noting more to do */
1024 goto out;
1025 }
1026 case PEER_SESSION_EXIT:
Willy Tarreau9b6c2c72013-11-24 09:38:33 +01001027 repl = snprintf(trash.str, trash.size, "%d\n", si->appctx.st1);
Emeric Brun2b920a12010-09-23 18:30:22 +02001028
Willy Tarreau19d14ef2012-10-29 16:51:55 +01001029 if (bi_putblk(si->ib, trash.str, repl) == -1)
Emeric Brun2b920a12010-09-23 18:30:22 +02001030 goto out;
Willy Tarreau9b6c2c72013-11-24 09:38:33 +01001031 si->appctx.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +02001032 /* fall through */
1033 case PEER_SESSION_END: {
Willy Tarreau73b013b2012-05-21 16:31:45 +02001034 si_shutw(si);
1035 si_shutr(si);
Willy Tarreau03cdb7c2012-08-27 23:14:58 +02001036 si->ib->flags |= CF_READ_NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001037 goto quit;
1038 }
1039 }
1040 }
1041out:
Willy Tarreau73b013b2012-05-21 16:31:45 +02001042 si_update(si);
Willy Tarreau03cdb7c2012-08-27 23:14:58 +02001043 si->ob->flags |= CF_READ_DONTWAIT;
Emeric Brun2b920a12010-09-23 18:30:22 +02001044 /* we don't want to expire timeouts while we're processing requests */
1045 si->ib->rex = TICK_ETERNITY;
1046 si->ob->wex = TICK_ETERNITY;
1047quit:
1048 return;
1049}
1050
Willy Tarreaub24281b2011-02-13 13:16:36 +01001051static struct si_applet peer_applet = {
Willy Tarreau3fdb3662012-11-12 00:42:33 +01001052 .obj_type = OBJ_TYPE_APPLET,
Willy Tarreaub24281b2011-02-13 13:16:36 +01001053 .name = "<PEER>", /* used for logging */
1054 .fct = peer_io_handler,
Aman Gupta9a13e842012-04-02 18:57:53 -07001055 .release = peer_session_release,
Willy Tarreaub24281b2011-02-13 13:16:36 +01001056};
Emeric Brun2b920a12010-09-23 18:30:22 +02001057
1058/*
1059 * Use this function to force a close of a peer session
1060 */
Simon Horman96553772011-06-08 09:18:51 +09001061static void peer_session_forceshutdown(struct session * session)
Emeric Brun2b920a12010-09-23 18:30:22 +02001062{
1063 struct stream_interface *oldsi;
1064
Willy Tarreaucf644ed2013-09-29 17:19:56 +02001065 if (si_applet(&session->si[0]) == &peer_applet) {
Emeric Brun2b920a12010-09-23 18:30:22 +02001066 oldsi = &session->si[0];
1067 }
1068 else {
1069 oldsi = &session->si[1];
1070 }
1071
1072 /* call release to reinit resync states if needed */
1073 peer_session_release(oldsi);
Willy Tarreau9b6c2c72013-11-24 09:38:33 +01001074 oldsi->appctx.st0 = PEER_SESSION_END;
1075 oldsi->appctx.ctx.peers.ptr = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001076 task_wakeup(session->task, TASK_WOKEN_MSG);
1077}
1078
Willy Tarreaub363a1f2013-10-01 10:45:07 +02001079/* Finish a session accept() for a peer. It returns a negative value in case of
1080 * a critical failure which must cause the listener to be disabled, a positive
1081 * value in case of success, or zero if it is a success but the session must be
1082 * closed ASAP and ignored.
Emeric Brun2b920a12010-09-23 18:30:22 +02001083 */
1084int peer_accept(struct session *s)
1085{
Willy Tarreauce9dbcd2013-10-01 17:12:05 +02001086 /* we have a dedicated I/O handler for the peers, so we can safely
1087 * release the pre-allocated connection that we will never use.
1088 */
1089 pool_free2(pool2_connection, s->si[1].conn);
1090 s->si[1].conn = NULL;
1091
Willy Tarreaub24281b2011-02-13 13:16:36 +01001092 stream_int_register_handler(&s->si[1], &peer_applet);
Willy Tarreaucf644ed2013-09-29 17:19:56 +02001093 s->target = &peer_applet.obj_type; // for logging only
Willy Tarreau9b6c2c72013-11-24 09:38:33 +01001094 s->si[1].appctx.ctx.peers.ptr = s;
1095 s->si[1].appctx.st0 = PEER_SESSION_ACCEPT;
Emeric Brun2b920a12010-09-23 18:30:22 +02001096
1097 tv_zero(&s->logs.tv_request);
1098 s->logs.t_queue = 0;
1099 s->logs.t_connect = 0;
1100 s->logs.t_data = 0;
1101 s->logs.t_close = 0;
1102 s->logs.bytes_in = s->logs.bytes_out = 0;
1103 s->logs.prx_queue_size = 0;/* we get the number of pending conns before us */
1104 s->logs.srv_queue_size = 0; /* we will get this number soon */
1105
Willy Tarreau03cdb7c2012-08-27 23:14:58 +02001106 s->req->flags |= CF_READ_DONTWAIT; /* we plan to read small requests */
Emeric Brun2b920a12010-09-23 18:30:22 +02001107
1108 if (s->listener->timeout) {
1109 s->req->rto = *s->listener->timeout;
1110 s->rep->wto = *s->listener->timeout;
1111 }
1112 return 1;
1113}
1114
1115/*
Willy Tarreaubd55e312010-11-11 10:55:09 +01001116 * Create a new peer session in assigned state (connect will start automatically)
Emeric Brun2b920a12010-09-23 18:30:22 +02001117 */
Simon Horman96553772011-06-08 09:18:51 +09001118static struct session *peer_session_create(struct peer *peer, struct peer_session *ps)
Emeric Brun2b920a12010-09-23 18:30:22 +02001119{
Willy Tarreau4348fad2012-09-20 16:48:07 +02001120 struct listener *l = LIST_NEXT(&peer->peers->peers_fe->conf.listeners, struct listener *, by_fe);
Emeric Brun2b920a12010-09-23 18:30:22 +02001121 struct proxy *p = (struct proxy *)l->frontend; /* attached frontend */
1122 struct session *s;
1123 struct http_txn *txn;
1124 struct task *t;
1125
1126 if ((s = pool_alloc2(pool2_session)) == NULL) { /* disable this proxy for a while */
Godbach430f2912013-06-20 13:28:38 +08001127 Alert("out of memory in peer_session_create().\n");
Emeric Brun2b920a12010-09-23 18:30:22 +02001128 goto out_close;
1129 }
1130
Willy Tarreauf2943dc2012-10-26 20:10:28 +02001131 if (unlikely((s->si[1].conn = pool_alloc2(pool2_connection)) == NULL))
1132 goto out_fail_conn1;
1133
Emeric Brun2b920a12010-09-23 18:30:22 +02001134 LIST_ADDQ(&sessions, &s->list);
1135 LIST_INIT(&s->back_refs);
1136
1137 s->flags = SN_ASSIGNED|SN_ADDR_SET;
Emeric Brun2b920a12010-09-23 18:30:22 +02001138
1139 /* if this session comes from a known monitoring system, we want to ignore
1140 * it as soon as possible, which means closing it immediately for TCP.
1141 */
1142 if ((t = task_new()) == NULL) { /* disable this proxy for a while */
Godbach430f2912013-06-20 13:28:38 +08001143 Alert("out of memory in peer_session_create().\n");
Emeric Brun2b920a12010-09-23 18:30:22 +02001144 goto out_free_session;
1145 }
1146
1147 ps->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
1148 ps->statuscode = PEER_SESSION_CONNECTCODE;
1149
1150 t->process = l->handler;
1151 t->context = s;
1152 t->nice = l->nice;
1153
Willy Tarreauf2943dc2012-10-26 20:10:28 +02001154 memcpy(&s->si[1].conn->addr.to, &peer->addr, sizeof(s->si[1].conn->addr.to));
Emeric Brun2b920a12010-09-23 18:30:22 +02001155 s->task = t;
1156 s->listener = l;
1157
1158 /* Note: initially, the session's backend points to the frontend.
1159 * This changes later when switching rules are executed or
1160 * when the default backend is assigned.
1161 */
1162 s->be = s->fe = p;
1163
1164 s->req = s->rep = NULL; /* will be allocated later */
1165
Willy Tarreauce9dbcd2013-10-01 17:12:05 +02001166 s->si[0].conn = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001167 s->si[0].owner = t;
1168 s->si[0].state = s->si[0].prev_state = SI_ST_EST;
1169 s->si[0].err_type = SI_ET_NONE;
Willy Tarreau63e7fe32012-05-08 15:20:43 +02001170 s->si[0].send_proxy_ofs = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +02001171 s->si[0].exp = TICK_ETERNITY;
1172 s->si[0].flags = SI_FL_NONE;
1173 if (s->fe->options2 & PR_O2_INDEPSTR)
1174 s->si[0].flags |= SI_FL_INDEP_STR;
Emeric Brun2b920a12010-09-23 18:30:22 +02001175
Willy Tarreaub24281b2011-02-13 13:16:36 +01001176 stream_int_register_handler(&s->si[0], &peer_applet);
Willy Tarreau9b6c2c72013-11-24 09:38:33 +01001177 s->si[0].appctx.st0 = PEER_SESSION_CONNECT;
1178 s->si[0].appctx.ctx.peers.ptr = (void *)ps;
Emeric Brun2b920a12010-09-23 18:30:22 +02001179
Willy Tarreau1e6902f2013-09-29 10:47:38 +02001180 s->si[1].conn->obj_type = OBJ_TYPE_CONN;
Willy Tarreauf2943dc2012-10-26 20:10:28 +02001181 s->si[1].conn->t.sock.fd = -1; /* just to help with debugging */
1182 s->si[1].conn->flags = CO_FL_NONE;
Willy Tarreau14cba4b2012-11-30 17:33:05 +01001183 s->si[1].conn->err_code = CO_ER_NONE;
Willy Tarreaub363a1f2013-10-01 10:45:07 +02001184 s->si[1].conn->target = &s->be->obj_type;
1185
Emeric Brun2b920a12010-09-23 18:30:22 +02001186 s->si[1].owner = t;
1187 s->si[1].state = s->si[1].prev_state = SI_ST_ASS;
1188 s->si[1].conn_retries = p->conn_retries;
1189 s->si[1].err_type = SI_ET_NONE;
Willy Tarreau63e7fe32012-05-08 15:20:43 +02001190 s->si[1].send_proxy_ofs = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +02001191 s->si[1].exp = TICK_ETERNITY;
1192 s->si[1].flags = SI_FL_NONE;
1193 if (s->be->options2 & PR_O2_INDEPSTR)
1194 s->si[1].flags |= SI_FL_INDEP_STR;
1195
Willy Tarreaub363a1f2013-10-01 10:45:07 +02001196 /* will automatically prepare the stream interface to connect to the
1197 * pre-initialized connection in si->conn.
1198 */
1199 si_prepare_conn(&s->si[1], peer->proto, peer->xprt);
1200
Willy Tarreau9bd0d742011-07-20 00:17:39 +02001201 session_init_srv_conn(s);
Willy Tarreau3fdb3662012-11-12 00:42:33 +01001202 s->target = &s->be->obj_type;
Emeric Brun2b920a12010-09-23 18:30:22 +02001203 s->pend_pos = NULL;
1204
1205 /* init store persistence */
1206 s->store_count = 0;
Willy Tarreaud5ca9ab2013-05-28 17:40:25 +02001207 memset(s->stkctr, 0, sizeof(s->stkctr));
Emeric Brun2b920a12010-09-23 18:30:22 +02001208
1209 /* FIXME: the logs are horribly complicated now, because they are
Willy Tarreauae727bf2013-10-01 17:06:10 +02001210 * defined in <p>, <p>, and later <be> and <be>. We still initialize
1211 * a few of them to help troubleshooting (eg: show sess shows them).
Emeric Brun2b920a12010-09-23 18:30:22 +02001212 */
1213
1214 s->logs.logwait = 0;
Willy Tarreauabcd5142013-06-11 17:18:02 +02001215 s->logs.level = 0;
Willy Tarreauae727bf2013-10-01 17:06:10 +02001216 s->logs.accept_date = date; /* user-visible date for logging */
1217 s->logs.tv_accept = now; /* corrected date for internal use */
Emeric Brun2b920a12010-09-23 18:30:22 +02001218 s->do_log = NULL;
1219
1220 /* default error reporting function, may be changed by analysers */
1221 s->srv_error = default_srv_error;
1222
Emeric Brun2b920a12010-09-23 18:30:22 +02001223 s->uniq_id = 0;
Willy Tarreaubd833142012-05-08 15:51:44 +02001224 s->unique_id = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001225
1226 txn = &s->txn;
1227 /* Those variables will be checked and freed if non-NULL in
1228 * session.c:session_free(). It is important that they are
1229 * properly initialized.
1230 */
1231 txn->sessid = NULL;
1232 txn->srv_cookie = NULL;
1233 txn->cli_cookie = NULL;
1234 txn->uri = NULL;
1235 txn->req.cap = NULL;
1236 txn->rsp.cap = NULL;
1237 txn->hdr_idx.v = NULL;
1238 txn->hdr_idx.size = txn->hdr_idx.used = 0;
1239
Willy Tarreau8263d2b2012-08-28 00:06:31 +02001240 if ((s->req = pool_alloc2(pool2_channel)) == NULL)
Emeric Brun2b920a12010-09-23 18:30:22 +02001241 goto out_fail_req; /* no memory */
1242
Willy Tarreau9b28e032012-10-12 23:49:43 +02001243 if ((s->req->buf = pool_alloc2(pool2_buffer)) == NULL)
1244 goto out_fail_req_buf; /* no memory */
1245
Willy Tarreau19d14ef2012-10-29 16:51:55 +01001246 s->req->buf->size = trash.size;
Willy Tarreau8263d2b2012-08-28 00:06:31 +02001247 channel_init(s->req);
Emeric Brun2b920a12010-09-23 18:30:22 +02001248 s->req->prod = &s->si[0];
1249 s->req->cons = &s->si[1];
1250 s->si[0].ib = s->si[1].ob = s->req;
1251
Willy Tarreau03cdb7c2012-08-27 23:14:58 +02001252 s->req->flags |= CF_READ_ATTACHED; /* the producer is already connected */
Emeric Brun2b920a12010-09-23 18:30:22 +02001253
1254 /* activate default analysers enabled for this listener */
1255 s->req->analysers = l->analysers;
1256
1257 /* note: this should not happen anymore since there's always at least the switching rules */
1258 if (!s->req->analysers) {
Willy Tarreau8263d2b2012-08-28 00:06:31 +02001259 channel_auto_connect(s->req);/* don't wait to establish connection */
1260 channel_auto_close(s->req);/* let the producer forward close requests */
Emeric Brun2b920a12010-09-23 18:30:22 +02001261 }
1262
1263 s->req->rto = s->fe->timeout.client;
1264 s->req->wto = s->be->timeout.server;
1265
Willy Tarreau8263d2b2012-08-28 00:06:31 +02001266 if ((s->rep = pool_alloc2(pool2_channel)) == NULL)
Emeric Brun2b920a12010-09-23 18:30:22 +02001267 goto out_fail_rep; /* no memory */
1268
Willy Tarreau9b28e032012-10-12 23:49:43 +02001269 if ((s->rep->buf = pool_alloc2(pool2_buffer)) == NULL)
1270 goto out_fail_rep_buf; /* no memory */
1271
Willy Tarreau19d14ef2012-10-29 16:51:55 +01001272 s->rep->buf->size = trash.size;
Willy Tarreau8263d2b2012-08-28 00:06:31 +02001273 channel_init(s->rep);
Emeric Brun2b920a12010-09-23 18:30:22 +02001274 s->rep->prod = &s->si[1];
1275 s->rep->cons = &s->si[0];
1276 s->si[0].ob = s->si[1].ib = s->rep;
1277
1278 s->rep->rto = s->be->timeout.server;
1279 s->rep->wto = s->fe->timeout.client;
1280
1281 s->req->rex = TICK_ETERNITY;
1282 s->req->wex = TICK_ETERNITY;
1283 s->req->analyse_exp = TICK_ETERNITY;
1284 s->rep->rex = TICK_ETERNITY;
1285 s->rep->wex = TICK_ETERNITY;
1286 s->rep->analyse_exp = TICK_ETERNITY;
1287 t->expire = TICK_ETERNITY;
1288
Willy Tarreau03cdb7c2012-08-27 23:14:58 +02001289 s->rep->flags |= CF_READ_DONTWAIT;
Emeric Brun2b920a12010-09-23 18:30:22 +02001290 /* it is important not to call the wakeup function directly but to
1291 * pass through task_wakeup(), because this one knows how to apply
1292 * priorities to tasks.
1293 */
1294 task_wakeup(t, TASK_WOKEN_INIT);
1295
1296 l->nbconn++; /* warning! right now, it's up to the handler to decrease this */
1297 p->feconn++;/* beconn will be increased later */
1298 jobs++;
Willy Tarreau3c63fd82011-09-07 18:00:47 +02001299 if (!(s->listener->options & LI_O_UNLIMITED))
1300 actconn++;
Emeric Brun2b920a12010-09-23 18:30:22 +02001301 totalconn++;
1302
1303 return s;
1304
1305 /* Error unrolling */
Willy Tarreau9b28e032012-10-12 23:49:43 +02001306 out_fail_rep_buf:
1307 pool_free2(pool2_channel, s->rep);
Emeric Brun2b920a12010-09-23 18:30:22 +02001308 out_fail_rep:
Willy Tarreau9b28e032012-10-12 23:49:43 +02001309 pool_free2(pool2_buffer, s->req->buf);
1310 out_fail_req_buf:
Willy Tarreau8263d2b2012-08-28 00:06:31 +02001311 pool_free2(pool2_channel, s->req);
Emeric Brun2b920a12010-09-23 18:30:22 +02001312 out_fail_req:
1313 task_free(t);
1314 out_free_session:
1315 LIST_DEL(&s->list);
Willy Tarreauf2943dc2012-10-26 20:10:28 +02001316 pool_free2(pool2_connection, s->si[1].conn);
1317 out_fail_conn1:
Emeric Brun2b920a12010-09-23 18:30:22 +02001318 pool_free2(pool2_session, s);
1319 out_close:
1320 return s;
1321}
1322
1323/*
1324 * Task processing function to manage re-connect and peer session
1325 * tasks wakeup on local update.
1326 */
Simon Horman96553772011-06-08 09:18:51 +09001327static struct task *process_peer_sync(struct task * task)
Emeric Brun2b920a12010-09-23 18:30:22 +02001328{
1329 struct shared_table *st = (struct shared_table *)task->context;
1330 struct peer_session *ps;
1331
1332 task->expire = TICK_ETERNITY;
1333
1334 if (!stopping) {
1335 /* Normal case (not soft stop)*/
1336 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL) &&
1337 (!nb_oldpids || tick_is_expired(st->resync_timeout, now_ms)) &&
1338 !(st->flags & SHTABLE_F_RESYNC_ASSIGN)) {
1339 /* Resync from local peer needed
1340 no peer was assigned for the lesson
1341 and no old local peer found
1342 or resync timeout expire */
1343
1344 /* flag no more resync from local, to try resync from remotes */
1345 st->flags |= SHTABLE_F_RESYNC_LOCAL;
1346
1347 /* reschedule a resync */
1348 st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
1349 }
1350
1351 /* For each session */
1352 for (ps = st->sessions; ps; ps = ps->next) {
1353 /* For each remote peers */
1354 if (!ps->peer->local) {
1355 if (!ps->session) {
1356 /* no active session */
1357 if (ps->statuscode == 0 ||
1358 ps->statuscode == PEER_SESSION_SUCCESSCODE ||
1359 ((ps->statuscode == PEER_SESSION_CONNECTCODE ||
1360 ps->statuscode == PEER_SESSION_CONNECTEDCODE) &&
1361 tick_is_expired(ps->reconnect, now_ms))) {
1362 /* connection never tried
1363 * or previous session established with success
1364 * or previous session failed during connection
1365 * and reconnection timer is expired */
1366
1367 /* retry a connect */
1368 ps->session = peer_session_create(ps->peer, ps);
1369 }
1370 else if (ps->statuscode == PEER_SESSION_CONNECTCODE ||
1371 ps->statuscode == PEER_SESSION_CONNECTEDCODE) {
1372 /* If previous session failed during connection
1373 * but reconnection timer is not expired */
1374
1375 /* reschedule task for reconnect */
1376 task->expire = tick_first(task->expire, ps->reconnect);
1377 }
1378 /* else do nothing */
1379 } /* !ps->session */
1380 else if (ps->statuscode == PEER_SESSION_SUCCESSCODE) {
1381 /* current session is active and established */
1382 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
1383 !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
1384 !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
1385 /* Resync from a remote is needed
1386 * and no peer was assigned for lesson
1387 * and current peer may be up2date */
1388
1389 /* assign peer for the lesson */
1390 ps->flags |= PEER_F_LEARN_ASSIGN;
1391 st->flags |= SHTABLE_F_RESYNC_ASSIGN;
1392
1393 /* awake peer session task to handle a request of resync */
1394 task_wakeup(ps->session->task, TASK_WOKEN_MSG);
1395 }
1396 else if ((int)(ps->pushed - ps->table->table->localupdate) < 0) {
1397 /* awake peer session task to push local updates */
1398 task_wakeup(ps->session->task, TASK_WOKEN_MSG);
1399 }
1400 /* else do nothing */
1401 } /* SUCCESSCODE */
1402 } /* !ps->peer->local */
1403 } /* for */
1404
1405 /* Resync from remotes expired: consider resync is finished */
1406 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
1407 !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
1408 tick_is_expired(st->resync_timeout, now_ms)) {
1409 /* Resync from remote peer needed
1410 * no peer was assigned for the lesson
1411 * and resync timeout expire */
1412
1413 /* flag no more resync from remote, consider resync is finished */
1414 st->flags |= SHTABLE_F_RESYNC_REMOTE;
1415 }
1416
1417 if ((st->flags & SHTABLE_RESYNC_STATEMASK) != SHTABLE_RESYNC_FINISHED) {
1418 /* Resync not finished*/
1419 /* reschedule task to resync timeout, to ended resync if needed */
1420 task->expire = tick_first(task->expire, st->resync_timeout);
1421 }
1422 } /* !stopping */
1423 else {
1424 /* soft stop case */
1425 if (task->state & TASK_WOKEN_SIGNAL) {
1426 /* We've just recieved the signal */
1427 if (!(st->flags & SHTABLE_F_DONOTSTOP)) {
1428 /* add DO NOT STOP flag if not present */
1429 jobs++;
1430 st->flags |= SHTABLE_F_DONOTSTOP;
Willy Tarreau3a925c12013-09-04 17:54:01 +02001431 st->table->syncing++;
Emeric Brun2b920a12010-09-23 18:30:22 +02001432 }
1433
1434 /* disconnect all connected peers */
1435 for (ps = st->sessions; ps; ps = ps->next) {
1436 if (ps->session) {
1437 peer_session_forceshutdown(ps->session);
1438 ps->session = NULL;
1439 }
1440 }
1441 }
1442 ps = st->local_session;
1443
1444 if (ps->flags & PEER_F_TEACH_COMPLETE) {
1445 if (st->flags & SHTABLE_F_DONOTSTOP) {
1446 /* resync of new process was complete, current process can die now */
1447 jobs--;
1448 st->flags &= ~SHTABLE_F_DONOTSTOP;
Willy Tarreau3a925c12013-09-04 17:54:01 +02001449 st->table->syncing--;
Emeric Brun2b920a12010-09-23 18:30:22 +02001450 }
1451 }
1452 else if (!ps->session) {
1453 /* If session is not active */
1454 if (ps->statuscode == 0 ||
1455 ps->statuscode == PEER_SESSION_SUCCESSCODE ||
1456 ps->statuscode == PEER_SESSION_CONNECTEDCODE ||
1457 ps->statuscode == PEER_SESSION_TRYAGAIN) {
1458 /* connection never tried
1459 * or previous session was successfully established
1460 * or previous session tcp connect success but init state incomplete
1461 * or during previous connect, peer replies a try again statuscode */
1462
1463 /* connect to the peer */
1464 ps->session = peer_session_create(ps->peer, ps);
1465 }
1466 else {
1467 /* Other error cases */
1468 if (st->flags & SHTABLE_F_DONOTSTOP) {
1469 /* unable to resync new process, current process can die now */
1470 jobs--;
1471 st->flags &= ~SHTABLE_F_DONOTSTOP;
Willy Tarreau3a925c12013-09-04 17:54:01 +02001472 st->table->syncing--;
Emeric Brun2b920a12010-09-23 18:30:22 +02001473 }
1474 }
1475 }
1476 else if (ps->statuscode == PEER_SESSION_SUCCESSCODE &&
1477 (int)(ps->pushed - ps->table->table->localupdate) < 0) {
1478 /* current session active and established
1479 awake session to push remaining local updates */
1480 task_wakeup(ps->session->task, TASK_WOKEN_MSG);
1481 }
1482 } /* stopping */
1483 /* Wakeup for re-connect */
1484 return task;
1485}
1486
1487/*
1488 * Function used to register a table for sync on a group of peers
1489 *
1490 */
1491void peers_register_table(struct peers *peers, struct stktable *table)
1492{
1493 struct shared_table *st;
1494 struct peer * curpeer;
1495 struct peer_session *ps;
Willy Tarreau4348fad2012-09-20 16:48:07 +02001496 struct listener *listener;
Emeric Brun2b920a12010-09-23 18:30:22 +02001497
1498 st = (struct shared_table *)calloc(1,sizeof(struct shared_table));
1499 st->table = table;
1500 st->next = peers->tables;
1501 st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
1502 peers->tables = st;
1503
1504 for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) {
1505 ps = (struct peer_session *)calloc(1,sizeof(struct peer_session));
1506 ps->table = st;
1507 ps->peer = curpeer;
1508 if (curpeer->local)
1509 st->local_session = ps;
1510 ps->next = st->sessions;
1511 ps->reconnect = now_ms;
1512 st->sessions = ps;
1513 peers->peers_fe->maxconn += 3;
1514 }
1515
Willy Tarreau4348fad2012-09-20 16:48:07 +02001516 list_for_each_entry(listener, &peers->peers_fe->conf.listeners, by_fe)
1517 listener->maxconn = peers->peers_fe->maxconn;
Emeric Brun2b920a12010-09-23 18:30:22 +02001518 st->sync_task = task_new();
1519 st->sync_task->process = process_peer_sync;
1520 st->sync_task->expire = TICK_ETERNITY;
1521 st->sync_task->context = (void *)st;
1522 table->sync_task =st->sync_task;
1523 signal_register_task(0, table->sync_task, 0);
1524 task_wakeup(st->sync_task, TASK_WOKEN_INIT);
1525}
1526