blob: 263cc8a0df112e45e040db3e6754b75ce706deda [file] [log] [blame]
/*
 * Stick table synchronization management.
 *
 * Copyright 2010 EXCELIANCE, Emeric Brun <ebrun@exceliance.fr>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 *
 */
12
13#include <errno.h>
14#include <fcntl.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <sys/socket.h>
20#include <sys/stat.h>
21#include <sys/types.h>
22
23#include <common/compat.h>
24#include <common/config.h>
25#include <common/time.h>
26
27#include <types/global.h>
Willy Tarreaud1d54542012-09-12 22:58:11 +020028#include <proto/listener.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020029#include <types/peers.h>
30
31#include <proto/acl.h>
Willy Tarreauc7e42382012-08-24 19:22:53 +020032#include <proto/channel.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020033#include <proto/fd.h>
34#include <proto/log.h>
35#include <proto/hdr_idx.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020036#include <proto/proto_tcp.h>
37#include <proto/proto_http.h>
38#include <proto/proxy.h>
39#include <proto/session.h>
40#include <proto/stream_interface.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020041#include <proto/task.h>
42#include <proto/stick_table.h>
43#include <proto/signal.h>
44
45
/*******************************/
/* Current peer learning state */
/*******************************/
/* (the PEER_F_LEARN_* flags are defined together with the teaching flags below) */

/******************************/
/* Current table resync state */
/******************************/
#define SHTABLE_F_RESYNC_LOCAL    0x00000001 /* Learn from local finished or no more needed */
#define SHTABLE_F_RESYNC_REMOTE   0x00000002 /* Learn from remote finished or no more needed */
#define SHTABLE_F_RESYNC_ASSIGN   0x00000004 /* A peer was assigned to learn our lesson */
#define SHTABLE_F_RESYNC_PROCESS  0x00000008 /* The assigned peer was requested for resync */
#define SHTABLE_F_DONOTSTOP       0x00010000 /* Main table sync task blocks the process during
                                                soft stop to push data to the new process */

/* The two low bits encode where the table still has to learn from */
#define SHTABLE_RESYNC_STATEMASK  (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)
#define SHTABLE_RESYNC_FROMLOCAL  0x00000000
#define SHTABLE_RESYNC_FROMREMOTE SHTABLE_F_RESYNC_LOCAL
#define SHTABLE_RESYNC_FINISHED   (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)

/******************************/
/* Remote peer teaching state */
/******************************/
#define PEER_F_TEACH_PROCESS      0x00000001 /* Teach a lesson to current peer */
#define PEER_F_TEACH_STAGE1       0x00000002 /* Teach stage 1 complete */
#define PEER_F_TEACH_STAGE2       0x00000004 /* Teach stage 2 complete */
#define PEER_F_TEACH_FINISHED     0x00000008 /* Teaching concluded (waiting for confirm) */
#define PEER_F_TEACH_COMPLETE     0x00000010 /* All that we know was already taught to current peer, used only for a local peer */
#define PEER_F_LEARN_ASSIGN       0x00000100 /* Current peer was assigned for a lesson */
#define PEER_F_LEARN_NOTUP2DATE   0x00000200 /* Learn from peer finished but peer is not up to date */

/* AND-masks clearing the teaching / learning flags. The expansions are fully
 * parenthesized so they stay a single operand wherever they are used.
 * PEER_F_TEACH_COMPLETE is deliberately not part of PEER_TEACH_RESET: it
 * should never be reset.
 */
#define PEER_TEACH_RESET          (~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_STAGE1|PEER_F_TEACH_STAGE2|PEER_F_TEACH_FINISHED))
#define PEER_LEARN_RESET          (~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE))


/**********************************/
/* Peer Session IO handler states */
/**********************************/

#define PEER_SESSION_ACCEPT       1000 /* Initial state for a session created by an accept */
#define PEER_SESSION_GETVERSION   1001 /* Validate supported protocol version */
#define PEER_SESSION_GETHOST      1002 /* Validate that host ID corresponds to local host id */
#define PEER_SESSION_GETPEER      1003 /* Validate that peer ID corresponds to a known remote peer id */
#define PEER_SESSION_GETTABLE     1004 /* Search the registered tables for a table with the same id
                                          and validate type and size */
#define PEER_SESSION_SENDSUCCESS  1005 /* Send ret code 200 (success) and wait for message */
/* next state is WAITMSG */

#define PEER_SESSION_CONNECT      2000 /* Initial state for a session created on a connect,
                                          push presentation into buffer */
#define PEER_SESSION_GETSTATUS    2001 /* Wait for the welcome message */
#define PEER_SESSION_WAITMSG      2002 /* Wait for data messages */
/* loop on WAITMSG */

#define PEER_SESSION_EXIT         10000 /* Exit with status code */
#define PEER_SESSION_END          10001 /* Killed session */
/* session ended */


/**********************************/
/* Peer Session status code       */
/**********************************/

#define PEER_SESSION_CONNECTCODE    100 /* connect in progress */
#define PEER_SESSION_CONNECTEDCODE  110 /* tcp connect success */

#define PEER_SESSION_SUCCESSCODE    200 /* accept or connect successful */

#define PEER_SESSION_TRYAGAIN       300 /* try again later */

#define PEER_SESSION_ERRPROTO       501 /* error protocol */
#define PEER_SESSION_ERRVERSION     502 /* unknown protocol version */
#define PEER_SESSION_ERRHOST        503 /* bad host name */
#define PEER_SESSION_ERRPEER        504 /* unknown peer */
#define PEER_SESSION_ERRTYPE        505 /* table key type mismatch */
#define PEER_SESSION_ERRSIZE        506 /* table key size mismatch */
#define PEER_SESSION_ERRTABLE       507 /* unknown table */

#define PEER_SESSION_PROTO_NAME "HAProxyS"
124
/* Global pointer to a struct peers, NULL until assigned elsewhere
 * (presumably the configured peers sections -- TODO confirm against the
 * configuration parser).
 */
struct peers *peers = NULL;

/* Forward declaration: peer_io_handler() calls this before its definition
 * appears in the file.
 */
static void peer_session_forceshutdown(struct session * session);
Emeric Brun2b920a12010-09-23 18:30:22 +0200127
128
129/*
130 * This prepare the data update message of the stick session <ts>, <ps> is the the peer session
131 * where the data going to be pushed, <msg> is a buffer of <size> to recieve data message content
132 */
Simon Horman96553772011-06-08 09:18:51 +0900133static int peer_prepare_datamsg(struct stksess *ts, struct peer_session *ps, char *msg, size_t size)
Emeric Brun2b920a12010-09-23 18:30:22 +0200134{
135 uint32_t netinteger;
136 int len;
137 /* construct message */
138 if (ps->lastpush && ts->upd.key > ps->lastpush && (ts->upd.key - ps->lastpush) <= 127) {
139 msg[0] = 0x80 + ts->upd.key - ps->lastpush;
140 len = sizeof(char);
141 }
142 else {
143 msg[0] = 'D';
144 netinteger = htonl(ts->upd.key);
145 memcpy(&msg[sizeof(char)], &netinteger, sizeof(netinteger));
146 len = sizeof(char) + sizeof(netinteger);
147 }
148
149 if (ps->table->table->type == STKTABLE_TYPE_STRING) {
150 int stlen = strlen((char *)ts->key.key);
151
152 netinteger = htonl(strlen((char *)ts->key.key));
153 memcpy(&msg[len], &netinteger, sizeof(netinteger));
154 memcpy(&msg[len+sizeof(netinteger)], ts->key.key, stlen);
155 len += sizeof(netinteger) + stlen;
156
157 }
158 else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
159 netinteger = htonl(*((uint32_t *)ts->key.key));
160 memcpy(&msg[len], &netinteger, sizeof(netinteger));
161 len += sizeof(netinteger);
162 }
163 else {
164 memcpy(&msg[len], ts->key.key, ps->table->table->key_size);
165 len += ps->table->table->key_size;
166 }
167
168 if (stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
169 netinteger = htonl(stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id));
170 else
171 netinteger = 0;
172
173 memcpy(&msg[len], &netinteger , sizeof(netinteger));
174 len += sizeof(netinteger);
175
176 return len;
177}
178
179
180/*
181 * Callback to release a session with a peer
182 */
Simon Horman96553772011-06-08 09:18:51 +0900183static void peer_session_release(struct stream_interface *si)
Emeric Brun2b920a12010-09-23 18:30:22 +0200184{
Aman Guptad94991d2012-04-06 17:39:26 -0700185 struct task *t = (struct task *)si->owner;
Emeric Brun2b920a12010-09-23 18:30:22 +0200186 struct session *s = (struct session *)t->context;
Willy Tarreauf2943dc2012-10-26 20:10:28 +0200187 struct peer_session *ps = (struct peer_session *)si->conn->xprt_ctx;
Emeric Brun2b920a12010-09-23 18:30:22 +0200188
Willy Tarreauf2943dc2012-10-26 20:10:28 +0200189 /* si->conn->xprt_ctx is not a peer session */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100190 if (si->applet.st0 < PEER_SESSION_SENDSUCCESS)
Emeric Brun2b920a12010-09-23 18:30:22 +0200191 return;
192
193 /* peer session identified */
194 if (ps) {
195 if (ps->session == s) {
196 ps->session = NULL;
197 if (ps->flags & PEER_F_LEARN_ASSIGN) {
198 /* unassign current peer for learning */
199 ps->flags &= ~(PEER_F_LEARN_ASSIGN);
200 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
201
202 /* reschedule a resync */
203 ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
204 }
205 /* reset teaching and learning flags to 0 */
206 ps->flags &= PEER_TEACH_RESET;
207 ps->flags &= PEER_LEARN_RESET;
208 }
209 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
210 }
211}
212
213
/*
 * IO Handler to handle message exchange with a peer
 */
Willy Tarreaub24281b2011-02-13 13:16:36 +0100217static void peer_io_handler(struct stream_interface *si)
Emeric Brun2b920a12010-09-23 18:30:22 +0200218{
219 struct task *t= (struct task *)si->owner;
220 struct session *s = (struct session *)t->context;
221 struct peers *curpeers = (struct peers *)s->fe->parent;
222 int reql = 0;
223 int repl = 0;
224
225 while (1) {
226switchstate:
Willy Tarreaubc4af052011-02-13 13:25:14 +0100227 switch(si->applet.st0) {
Emeric Brun2b920a12010-09-23 18:30:22 +0200228 case PEER_SESSION_ACCEPT:
Willy Tarreauf2943dc2012-10-26 20:10:28 +0200229 si->conn->xprt_ctx = NULL;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100230 si->applet.st0 = PEER_SESSION_GETVERSION;
Emeric Brun2b920a12010-09-23 18:30:22 +0200231 /* fall through */
232 case PEER_SESSION_GETVERSION:
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100233 reql = bo_getline(si->ob, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200234 if (reql <= 0) { /* closed or EOL not found */
235 if (reql == 0)
236 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100237 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200238 goto switchstate;
239 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100240 if (trash.str[reql-1] != '\n') {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100241 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200242 goto switchstate;
243 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100244 else if (reql > 1 && (trash.str[reql-2] == '\r'))
245 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200246 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100247 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200248
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200249 bo_skip(si->ob, reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200250
251 /* test version */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100252 if (strcmp(PEER_SESSION_PROTO_NAME " 1.0", trash.str) != 0) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100253 si->applet.st0 = PEER_SESSION_EXIT;
254 si->applet.st1 = PEER_SESSION_ERRVERSION;
Emeric Brun2b920a12010-09-23 18:30:22 +0200255 /* test protocol */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100256 if (strncmp(PEER_SESSION_PROTO_NAME " ", trash.str, strlen(PEER_SESSION_PROTO_NAME)+1) != 0)
Willy Tarreaubc4af052011-02-13 13:25:14 +0100257 si->applet.st1 = PEER_SESSION_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200258 goto switchstate;
259 }
260
Willy Tarreaubc4af052011-02-13 13:25:14 +0100261 si->applet.st0 = PEER_SESSION_GETHOST;
Emeric Brun2b920a12010-09-23 18:30:22 +0200262 /* fall through */
263 case PEER_SESSION_GETHOST:
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100264 reql = bo_getline(si->ob, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200265 if (reql <= 0) { /* closed or EOL not found */
266 if (reql == 0)
267 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100268 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200269 goto switchstate;
270 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100271 if (trash.str[reql-1] != '\n') {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100272 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200273 goto switchstate;
274 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100275 else if (reql > 1 && (trash.str[reql-2] == '\r'))
276 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200277 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100278 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200279
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200280 bo_skip(si->ob, reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200281
282 /* test hostname match */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100283 if (strcmp(localpeer, trash.str) != 0) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100284 si->applet.st0 = PEER_SESSION_EXIT;
285 si->applet.st1 = PEER_SESSION_ERRHOST;
Emeric Brun2b920a12010-09-23 18:30:22 +0200286 goto switchstate;
287 }
288
Willy Tarreaubc4af052011-02-13 13:25:14 +0100289 si->applet.st0 = PEER_SESSION_GETPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200290 /* fall through */
291 case PEER_SESSION_GETPEER: {
292 struct peer *curpeer;
293 char *p;
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100294 reql = bo_getline(si->ob, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200295 if (reql <= 0) { /* closed or EOL not found */
296 if (reql == 0)
297 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100298 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200299 goto switchstate;
300 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100301 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200302 /* Incomplete line, we quit */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100303 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200304 goto switchstate;
305 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100306 else if (reql > 1 && (trash.str[reql-2] == '\r'))
307 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200308 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100309 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200310
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200311 bo_skip(si->ob, reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200312
313 /* parse line "<peer name> <pid>" */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100314 p = strchr(trash.str, ' ');
Emeric Brun2b920a12010-09-23 18:30:22 +0200315 if (!p) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100316 si->applet.st0 = PEER_SESSION_EXIT;
317 si->applet.st1 = PEER_SESSION_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200318 goto switchstate;
319 }
320 *p = 0;
321
322 /* lookup known peer */
323 for (curpeer = curpeers->remote; curpeer; curpeer = curpeer->next) {
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100324 if (strcmp(curpeer->id, trash.str) == 0)
Emeric Brun2b920a12010-09-23 18:30:22 +0200325 break;
326 }
327
328 /* if unknown peer */
329 if (!curpeer) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100330 si->applet.st0 = PEER_SESSION_EXIT;
331 si->applet.st1 = PEER_SESSION_ERRPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200332 goto switchstate;
333 }
334
Willy Tarreauf2943dc2012-10-26 20:10:28 +0200335 si->conn->xprt_ctx = curpeer;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100336 si->applet.st0 = PEER_SESSION_GETTABLE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200337 /* fall through */
338 }
339 case PEER_SESSION_GETTABLE: {
Willy Tarreauf2943dc2012-10-26 20:10:28 +0200340 struct peer *curpeer = (struct peer *)si->conn->xprt_ctx;
Emeric Brun2b920a12010-09-23 18:30:22 +0200341 struct shared_table *st;
342 struct peer_session *ps = NULL;
343 unsigned long key_type;
344 size_t key_size;
345 char *p;
346
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100347 reql = bo_getline(si->ob, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200348 if (reql <= 0) { /* closed or EOL not found */
349 if (reql == 0)
350 goto out;
Willy Tarreauf2943dc2012-10-26 20:10:28 +0200351 si->conn->xprt_ctx = NULL;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100352 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200353 goto switchstate;
354 }
Willy Tarreauf2943dc2012-10-26 20:10:28 +0200355 /* Re init si->conn->xprt_ctx to null, to handle correctly a release case */
356 si->conn->xprt_ctx = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200357
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100358 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200359 /* Incomplete line, we quit */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100360 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200361 goto switchstate;
362 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100363 else if (reql > 1 && (trash.str[reql-2] == '\r'))
364 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200365 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100366 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200367
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200368 bo_skip(si->ob, reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200369
370 /* Parse line "<table name> <type> <size>" */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100371 p = strchr(trash.str, ' ');
Emeric Brun2b920a12010-09-23 18:30:22 +0200372 if (!p) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100373 si->applet.st0 = PEER_SESSION_EXIT;
374 si->applet.st1 = PEER_SESSION_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200375 goto switchstate;
376 }
377 *p = 0;
378 key_type = (unsigned long)atol(p+1);
379
380 p = strchr(p+1, ' ');
381 if (!p) {
Willy Tarreauf2943dc2012-10-26 20:10:28 +0200382 si->conn->xprt_ctx = NULL;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100383 si->applet.st0 = PEER_SESSION_EXIT;
384 si->applet.st1 = PEER_SESSION_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200385 goto switchstate;
386 }
387
388 key_size = (size_t)atoi(p);
389 for (st = curpeers->tables; st; st = st->next) {
390 /* If table name matches */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100391 if (strcmp(st->table->id, trash.str) == 0) {
Emeric Brun2b920a12010-09-23 18:30:22 +0200392 /* If key size mismatches */
393 if (key_size != st->table->key_size) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100394 si->applet.st0 = PEER_SESSION_EXIT;
395 si->applet.st1 = PEER_SESSION_ERRSIZE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200396 goto switchstate;
397 }
398
399 /* If key type mismatches */
400 if (key_type != st->table->type) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100401 si->applet.st0 = PEER_SESSION_EXIT;
402 si->applet.st1 = PEER_SESSION_ERRTYPE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200403 goto switchstate;
404 }
405
406 /* lookup peer session of current peer */
407 for (ps = st->sessions; ps; ps = ps->next) {
408 if (ps->peer == curpeer) {
409 /* If session already active, replaced by new one */
410 if (ps->session && ps->session != s) {
411 if (ps->peer->local) {
412 /* Local connection, reply a retry */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100413 si->applet.st0 = PEER_SESSION_EXIT;
414 si->applet.st1 = PEER_SESSION_TRYAGAIN;
Emeric Brun2b920a12010-09-23 18:30:22 +0200415 goto switchstate;
416 }
417 peer_session_forceshutdown(ps->session);
418 }
419 ps->session = s;
420 break;
421 }
422 }
423 break;
424 }
425 }
426
427 /* If table not found */
428 if (!st){
Willy Tarreaubc4af052011-02-13 13:25:14 +0100429 si->applet.st0 = PEER_SESSION_EXIT;
430 si->applet.st1 = PEER_SESSION_ERRTABLE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200431 goto switchstate;
432 }
433
434 /* If no peer session for current peer */
435 if (!ps) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100436 si->applet.st0 = PEER_SESSION_EXIT;
437 si->applet.st1 = PEER_SESSION_ERRPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200438 goto switchstate;
439 }
440
Willy Tarreauf2943dc2012-10-26 20:10:28 +0200441 si->conn->xprt_ctx = ps;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100442 si->applet.st0 = PEER_SESSION_SENDSUCCESS;
Emeric Brun2b920a12010-09-23 18:30:22 +0200443 /* fall through */
444 }
445 case PEER_SESSION_SENDSUCCESS:{
Willy Tarreauf2943dc2012-10-26 20:10:28 +0200446 struct peer_session *ps = (struct peer_session *)si->conn->xprt_ctx;
Emeric Brun2b920a12010-09-23 18:30:22 +0200447
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100448 repl = snprintf(trash.str, trash.size, "%d\n", PEER_SESSION_SUCCESSCODE);
449 repl = bi_putblk(si->ib, trash.str, repl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200450 if (repl <= 0) {
451 if (repl == -1)
452 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100453 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200454 goto switchstate;
455 }
456
457 /* Register status code */
458 ps->statuscode = PEER_SESSION_SUCCESSCODE;
459
460 /* Awake main task */
461 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
462
463 /* Init cursors */
464 ps->teaching_origin =ps->lastpush = ps->lastack = ps->pushack = 0;
465 ps->pushed = ps->update;
466
467 /* Init confirm counter */
468 ps->confirm = 0;
469
470 /* reset teaching and learning flags to 0 */
471 ps->flags &= PEER_TEACH_RESET;
472 ps->flags &= PEER_LEARN_RESET;
473
474 /* if current peer is local */
475 if (ps->peer->local) {
476 /* if table need resyncfrom local and no process assined */
477 if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL &&
478 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
479 /* assign local peer for a lesson, consider lesson already requested */
480 ps->flags |= PEER_F_LEARN_ASSIGN;
481 ps->table->flags |= (SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
482 }
483
484 }
485 else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
486 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
487 /* assign peer for a lesson */
488 ps->flags |= PEER_F_LEARN_ASSIGN;
489 ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
490 }
491 /* switch to waiting message state */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100492 si->applet.st0 = PEER_SESSION_WAITMSG;
Emeric Brun2b920a12010-09-23 18:30:22 +0200493 goto switchstate;
494 }
495 case PEER_SESSION_CONNECT: {
Willy Tarreauf2943dc2012-10-26 20:10:28 +0200496 struct peer_session *ps = (struct peer_session *)si->conn->xprt_ctx;
Emeric Brun2b920a12010-09-23 18:30:22 +0200497
498 /* Send headers */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100499 repl = snprintf(trash.str, trash.size,
Emeric Brun2b920a12010-09-23 18:30:22 +0200500 PEER_SESSION_PROTO_NAME " 1.0\n%s\n%s %d\n%s %lu %d\n",
501 ps->peer->id,
502 localpeer,
Willy Tarreau7b77c9f2012-01-07 22:52:12 +0100503 (int)getpid(),
Emeric Brun2b920a12010-09-23 18:30:22 +0200504 ps->table->table->id,
505 ps->table->table->type,
Willy Tarreaubd55e312010-11-11 10:55:09 +0100506 (int)ps->table->table->key_size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200507
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100508 if (repl >= trash.size) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100509 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200510 goto switchstate;
511 }
512
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100513 repl = bi_putblk(si->ib, trash.str, repl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200514 if (repl <= 0) {
515 if (repl == -1)
516 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100517 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200518 goto switchstate;
519 }
520
521 /* switch to the waiting statuscode state */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100522 si->applet.st0 = PEER_SESSION_GETSTATUS;
Emeric Brun2b920a12010-09-23 18:30:22 +0200523 /* fall through */
524 }
525 case PEER_SESSION_GETSTATUS: {
Willy Tarreauf2943dc2012-10-26 20:10:28 +0200526 struct peer_session *ps = (struct peer_session *)si->conn->xprt_ctx;
Emeric Brun2b920a12010-09-23 18:30:22 +0200527
Willy Tarreau03cdb7c2012-08-27 23:14:58 +0200528 if (si->ib->flags & CF_WRITE_PARTIAL)
Emeric Brun2b920a12010-09-23 18:30:22 +0200529 ps->statuscode = PEER_SESSION_CONNECTEDCODE;
530
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100531 reql = bo_getline(si->ob, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200532 if (reql <= 0) { /* closed or EOL not found */
533 if (reql == 0)
534 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100535 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200536 goto switchstate;
537 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100538 if (trash.str[reql-1] != '\n') {
Emeric Brun2b920a12010-09-23 18:30:22 +0200539 /* Incomplete line, we quit */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100540 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200541 goto switchstate;
542 }
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100543 else if (reql > 1 && (trash.str[reql-2] == '\r'))
544 trash.str[reql-2] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200545 else
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100546 trash.str[reql-1] = 0;
Emeric Brun2b920a12010-09-23 18:30:22 +0200547
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200548 bo_skip(si->ob, reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200549
550 /* Register status code */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100551 ps->statuscode = atoi(trash.str);
Emeric Brun2b920a12010-09-23 18:30:22 +0200552
553 /* Awake main task */
554 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
555
556 /* If status code is success */
557 if (ps->statuscode == PEER_SESSION_SUCCESSCODE) {
558 /* Init cursors */
559 ps->teaching_origin = ps->lastpush = ps->lastack = ps->pushack = 0;
560 ps->pushed = ps->update;
561
562 /* Init confirm counter */
563 ps->confirm = 0;
564
565 /* reset teaching and learning flags to 0 */
566 ps->flags &= PEER_TEACH_RESET;
567 ps->flags &= PEER_LEARN_RESET;
568
569 /* If current peer is local */
570 if (ps->peer->local) {
571 /* Init cursors to push a resync */
572 ps->teaching_origin = ps->pushed = ps->table->table->update;
573 /* flag to start to teach lesson */
574 ps->flags |= PEER_F_TEACH_PROCESS;
575
576 }
577 else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
578 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
579 /* If peer is remote and resync from remote is needed,
580 and no peer currently assigned */
581
582 /* assign peer for a lesson */
583 ps->flags |= PEER_F_LEARN_ASSIGN;
584 ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
585 }
586
587 }
588 else {
589 /* Status code is not success, abort */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100590 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200591 goto switchstate;
592 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100593 si->applet.st0 = PEER_SESSION_WAITMSG;
Emeric Brun2b920a12010-09-23 18:30:22 +0200594 /* fall through */
595 }
596 case PEER_SESSION_WAITMSG: {
Willy Tarreauf2943dc2012-10-26 20:10:28 +0200597 struct peer_session *ps = (struct peer_session *)si->conn->xprt_ctx;
Emeric Brun2b920a12010-09-23 18:30:22 +0200598 char c;
599 int totl = 0;
600
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200601 reql = bo_getblk(si->ob, (char *)&c, sizeof(c), totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200602 if (reql <= 0) { /* closed or EOL not found */
603 if (reql == 0) {
604 /* nothing to read */
605 goto incomplete;
606 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100607 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200608 goto switchstate;
609 }
610 totl += reql;
611
612 if ((c & 0x80) || (c == 'D')) {
613 /* Here we have data message */
614 unsigned int pushack;
615 struct stksess *ts;
616 struct stksess *newts;
617 struct stktable_key stkey;
618 int srvid;
619 uint32_t netinteger;
620
621 /* Compute update remote version */
622 if (c & 0x80) {
623 pushack = ps->pushack + (unsigned int)(c & 0x7F);
624 }
625 else {
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200626 reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200627 if (reql <= 0) { /* closed or EOL not found */
628 if (reql == 0) {
629 goto incomplete;
630 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100631 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200632 goto switchstate;
633 }
634 totl += reql;
635 pushack = ntohl(netinteger);
636 }
637
638 /* read key */
639 if (ps->table->table->type == STKTABLE_TYPE_STRING) {
640 /* type string */
641 stkey.key = stkey.data.buf;
642
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200643 reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200644 if (reql <= 0) { /* closed or EOL not found */
645 if (reql == 0) {
646 goto incomplete;
647 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100648 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200649 goto switchstate;
650 }
651 totl += reql;
652 stkey.key_len = ntohl(netinteger);
653
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200654 reql = bo_getblk(si->ob, stkey.key, stkey.key_len, totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200655 if (reql <= 0) { /* closed or EOL not found */
656 if (reql == 0) {
657 goto incomplete;
658 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100659 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200660 goto switchstate;
661 }
662 totl += reql;
663 }
664 else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
665 /* type integer */
666 stkey.key_len = (size_t)-1;
667 stkey.key = &stkey.data.integer;
668
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200669 reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200670 if (reql <= 0) { /* closed or EOL not found */
671 if (reql == 0) {
672 goto incomplete;
673 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100674 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200675 goto switchstate;
676 }
677 totl += reql;
678 stkey.data.integer = ntohl(netinteger);
679 }
680 else {
681 /* type ip */
682 stkey.key_len = (size_t)-1;
683 stkey.key = stkey.data.buf;
684
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200685 reql = bo_getblk(si->ob, (char *)&stkey.data.buf, ps->table->table->key_size, totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200686 if (reql <= 0) { /* closed or EOL not found */
687 if (reql == 0) {
688 goto incomplete;
689 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100690 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200691 goto switchstate;
692 }
693 totl += reql;
694
695 }
696
697 /* read server id */
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200698 reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200699 if (reql <= 0) { /* closed or EOL not found */
700 if (reql == 0) {
701 goto incomplete;
702 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100703 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200704 goto switchstate;
705 }
706 totl += reql;
707 srvid = ntohl(netinteger);
708
709 /* update entry */
710 newts = stksess_new(ps->table->table, &stkey);
711 if (newts) {
712 /* lookup for existing entry */
713 ts = stktable_lookup(ps->table->table, newts);
714 if (ts) {
715 /* the entry already exist, we can free ours */
716 stktable_touch(ps->table->table, ts, 0);
717 stksess_free(ps->table->table, newts);
718 }
719 else {
720 struct eb32_node *eb;
721
722 /* create new entry */
723 ts = stktable_store(ps->table->table, newts, 0);
724 ts->upd.key= (++ps->table->table->update)+(2^31);
725 eb = eb32_insert(&ps->table->table->updates, &ts->upd);
726 if (eb != &ts->upd) {
727 eb32_delete(eb);
728 eb32_insert(&ps->table->table->updates, &ts->upd);
729 }
730 }
731
732 /* update entry */
733 if (srvid && stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
734 stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id) = srvid;
735 ps->pushack = pushack;
736 }
737
738 }
739 else if (c == 'R') {
740 /* Reset message: remote need resync */
741
742 /* reinit counters for a resync */
743 ps->lastpush = 0;
744 ps->teaching_origin = ps->pushed = ps->table->table->update;
745
746 /* reset teaching flags to 0 */
747 ps->flags &= PEER_TEACH_RESET;
748
749 /* flag to start to teach lesson */
750 ps->flags |= PEER_F_TEACH_PROCESS;
751 }
752 else if (c == 'F') {
753 /* Finish message, all known updates have been pushed by remote */
754 /* and remote is up to date */
755
756 /* If resync is in progress with remote peer */
757 if (ps->flags & PEER_F_LEARN_ASSIGN) {
758
759 /* unassign current peer for learning */
760 ps->flags &= ~PEER_F_LEARN_ASSIGN;
761 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
762
763 /* Consider table is now up2date, resync resync no more needed from local neither remote */
764 ps->table->flags |= (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE);
765 }
766 /* Increase confirm counter to launch a confirm message */
767 ps->confirm++;
768 }
769 else if (c == 'c') {
770 /* confirm message, remote peer is now up to date with us */
771
772 /* If stopping state */
773 if (stopping) {
774 /* Close session, push resync no more needed */
775 ps->flags |= PEER_F_TEACH_COMPLETE;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100776 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200777 goto switchstate;
778 }
779
780 /* reset teaching flags to 0 */
781 ps->flags &= PEER_TEACH_RESET;
782 }
783 else if (c == 'C') {
784 /* Continue message, all known updates have been pushed by remote */
785 /* but remote is not up to date */
786
787 /* If resync is in progress with current peer */
788 if (ps->flags & PEER_F_LEARN_ASSIGN) {
789
790 /* unassign current peer */
791 ps->flags &= ~PEER_F_LEARN_ASSIGN;
792 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
793
794 /* flag current peer is not up 2 date to try from an other */
795 ps->flags |= PEER_F_LEARN_NOTUP2DATE;
796
797 /* reschedule a resync */
798 ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
799 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
800 }
801 ps->confirm++;
802 }
803 else if (c == 'A') {
804 /* ack message */
805 uint32_t netinteger;
806
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200807 reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200808 if (reql <= 0) { /* closed or EOL not found */
809 if (reql == 0) {
810 goto incomplete;
811 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100812 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200813 goto switchstate;
814 }
815 totl += reql;
816
817 /* Consider remote is up to date with "acked" version */
818 ps->update = ntohl(netinteger);
819 }
820 else {
821 /* Unknown message */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100822 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200823 goto switchstate;
824 }
825
826 /* skip consumed message */
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200827 bo_skip(si->ob, totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200828
829 /* loop on that state to peek next message */
830 continue;
831incomplete:
832 /* Nothing to read, now we start to write */
833
834 /* Confirm finished or partial messages */
835 while (ps->confirm) {
836 /* There is a confirm messages to send */
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200837 repl = bi_putchr(si->ib, 'c');
Emeric Brun2b920a12010-09-23 18:30:22 +0200838 if (repl <= 0) {
839 /* no more write possible */
840 if (repl == -1)
841 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100842 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200843 goto switchstate;
844 }
845 ps->confirm--;
846 }
847
848 /* Need to request a resync */
849 if ((ps->flags & PEER_F_LEARN_ASSIGN) &&
850 (ps->table->flags & SHTABLE_F_RESYNC_ASSIGN) &&
851 !(ps->table->flags & SHTABLE_F_RESYNC_PROCESS)) {
852 /* Current peer was elected to request a resync */
853
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200854 repl = bi_putchr(si->ib, 'R');
Emeric Brun2b920a12010-09-23 18:30:22 +0200855 if (repl <= 0) {
856 /* no more write possible */
857 if (repl == -1)
858 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100859 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200860 goto switchstate;
861 }
862 ps->table->flags |= SHTABLE_F_RESYNC_PROCESS;
863 }
864
865 /* It remains some updates to ack */
866 if (ps->pushack != ps->lastack) {
867 uint32_t netinteger;
868
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100869 trash.str[0] = 'A';
Emeric Brun2b920a12010-09-23 18:30:22 +0200870 netinteger = htonl(ps->pushack);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100871 memcpy(&trash.str[1], &netinteger, sizeof(netinteger));
Emeric Brun2b920a12010-09-23 18:30:22 +0200872
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100873 repl = bi_putblk(si->ib, trash.str, 1+sizeof(netinteger));
Emeric Brun2b920a12010-09-23 18:30:22 +0200874 if (repl <= 0) {
875 /* no more write possible */
876 if (repl == -1)
877 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100878 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200879 goto switchstate;
880 }
881 ps->lastack = ps->pushack;
882 }
883
884 if (ps->flags & PEER_F_TEACH_PROCESS) {
885 /* current peer was requested for a lesson */
886
887 if (!(ps->flags & PEER_F_TEACH_STAGE1)) {
888 /* lesson stage 1 not complete */
889 struct eb32_node *eb;
890
891 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
892 while (1) {
893 int msglen;
894 struct stksess *ts;
895
896 if (!eb) {
897 /* flag lesson stage1 complete */
898 ps->flags |= PEER_F_TEACH_STAGE1;
899 eb = eb32_first(&ps->table->table->updates);
900 if (eb)
901 ps->pushed = eb->key - 1;
902 break;
903 }
904
905 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100906 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200907 if (msglen) {
908 /* message to buffer */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100909 repl = bi_putblk(si->ib, trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200910 if (repl <= 0) {
911 /* no more write possible */
912 if (repl == -1)
913 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100914 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200915 goto switchstate;
916 }
917 ps->lastpush = ps->pushed = ts->upd.key;
918 }
919 eb = eb32_next(eb);
920 }
921 } /* !TEACH_STAGE1 */
922
923 if (!(ps->flags & PEER_F_TEACH_STAGE2)) {
924 /* lesson stage 2 not complete */
925 struct eb32_node *eb;
926
927 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
928 while (1) {
929 int msglen;
930 struct stksess *ts;
931
932 if (!eb || eb->key > ps->teaching_origin) {
933 /* flag lesson stage1 complete */
934 ps->flags |= PEER_F_TEACH_STAGE2;
935 ps->pushed = ps->teaching_origin;
936 break;
937 }
938
939 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100940 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200941 if (msglen) {
942 /* message to buffer */
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100943 repl = bi_putblk(si->ib, trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200944 if (repl <= 0) {
945 /* no more write possible */
946 if (repl == -1)
947 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100948 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200949 goto switchstate;
950 }
951 ps->lastpush = ps->pushed = ts->upd.key;
952 }
953 eb = eb32_next(eb);
954 }
955 } /* !TEACH_STAGE2 */
956
957 if (!(ps->flags & PEER_F_TEACH_FINISHED)) {
958 /* process final lesson message */
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200959 repl = bi_putchr(si->ib, ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FINISHED) ? 'F' : 'C');
Emeric Brun2b920a12010-09-23 18:30:22 +0200960 if (repl <= 0) {
961 /* no more write possible */
962 if (repl == -1)
963 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100964 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200965 goto switchstate;
966 }
967
968 /* flag finished message sent */
969 ps->flags |= PEER_F_TEACH_FINISHED;
970 } /* !TEACH_FINISHED */
971 } /* TEACH_PROCESS */
972
973 if (!(ps->flags & PEER_F_LEARN_ASSIGN) &&
974 (int)(ps->pushed - ps->table->table->localupdate) < 0) {
975 /* Push local updates, only if no learning in progress (to avoid ping-pong effects) */
976 struct eb32_node *eb;
977
978 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
979 while (1) {
980 int msglen;
981 struct stksess *ts;
982
983 /* push local updates */
984 if (!eb) {
985 eb = eb32_first(&ps->table->table->updates);
986 if (!eb || ((int)(eb->key - ps->pushed) <= 0)) {
987 ps->pushed = ps->table->table->localupdate;
988 break;
989 }
990 }
991
992 if ((int)(eb->key - ps->table->table->localupdate) > 0) {
993 ps->pushed = ps->table->table->localupdate;
994 break;
995 }
996
997 ts = eb32_entry(eb, struct stksess, upd);
Willy Tarreau19d14ef2012-10-29 16:51:55 +0100998 msglen = peer_prepare_datamsg(ts, ps, trash.str, trash.size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200999 if (msglen) {
1000 /* message to buffer */
Willy Tarreau19d14ef2012-10-29 16:51:55 +01001001 repl = bi_putblk(si->ib, trash.str, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +02001002 if (repl <= 0) {
1003 /* no more write possible */
1004 if (repl == -1)
1005 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +01001006 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +02001007 goto switchstate;
1008 }
1009 ps->lastpush = ps->pushed = ts->upd.key;
1010 }
1011 eb = eb32_next(eb);
1012 }
1013 } /* ! LEARN_ASSIGN */
1014 /* noting more to do */
1015 goto out;
1016 }
1017 case PEER_SESSION_EXIT:
Willy Tarreau19d14ef2012-10-29 16:51:55 +01001018 repl = snprintf(trash.str, trash.size, "%d\n", si->applet.st1);
Emeric Brun2b920a12010-09-23 18:30:22 +02001019
Willy Tarreau19d14ef2012-10-29 16:51:55 +01001020 if (bi_putblk(si->ib, trash.str, repl) == -1)
Emeric Brun2b920a12010-09-23 18:30:22 +02001021 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +01001022 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +02001023 /* fall through */
1024 case PEER_SESSION_END: {
Willy Tarreau73b013b2012-05-21 16:31:45 +02001025 si_shutw(si);
1026 si_shutr(si);
Willy Tarreau03cdb7c2012-08-27 23:14:58 +02001027 si->ib->flags |= CF_READ_NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001028 goto quit;
1029 }
1030 }
1031 }
1032out:
Willy Tarreau73b013b2012-05-21 16:31:45 +02001033 si_update(si);
Willy Tarreau03cdb7c2012-08-27 23:14:58 +02001034 si->ob->flags |= CF_READ_DONTWAIT;
Emeric Brun2b920a12010-09-23 18:30:22 +02001035 /* we don't want to expire timeouts while we're processing requests */
1036 si->ib->rex = TICK_ETERNITY;
1037 si->ob->wex = TICK_ETERNITY;
1038quit:
1039 return;
1040}
1041
Willy Tarreaub24281b2011-02-13 13:16:36 +01001042static struct si_applet peer_applet = {
1043 .name = "<PEER>", /* used for logging */
1044 .fct = peer_io_handler,
Aman Gupta9a13e842012-04-02 18:57:53 -07001045 .release = peer_session_release,
Willy Tarreaub24281b2011-02-13 13:16:36 +01001046};
Emeric Brun2b920a12010-09-23 18:30:22 +02001047
1048/*
1049 * Use this function to force a close of a peer session
1050 */
Simon Horman96553772011-06-08 09:18:51 +09001051static void peer_session_forceshutdown(struct session * session)
Emeric Brun2b920a12010-09-23 18:30:22 +02001052{
1053 struct stream_interface *oldsi;
1054
Willy Tarreauf2943dc2012-10-26 20:10:28 +02001055 if (session->si[0].conn->target.type == TARG_TYPE_APPLET &&
1056 session->si[0].conn->target.ptr.a == &peer_applet) {
Emeric Brun2b920a12010-09-23 18:30:22 +02001057 oldsi = &session->si[0];
1058 }
1059 else {
1060 oldsi = &session->si[1];
1061 }
1062
1063 /* call release to reinit resync states if needed */
1064 peer_session_release(oldsi);
Willy Tarreaubc4af052011-02-13 13:25:14 +01001065 oldsi->applet.st0 = PEER_SESSION_END;
Willy Tarreauf2943dc2012-10-26 20:10:28 +02001066 oldsi->conn->xprt_ctx = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001067 task_wakeup(session->task, TASK_WOKEN_MSG);
1068}
1069
1070/*
1071 * this function is called on a read event from a listen socket, corresponding
1072 * to an accept. It tries to accept as many connections as possible.
Willy Tarreaubd55e312010-11-11 10:55:09 +01001073 * It returns a positive value upon success, 0 if the connection needs to be
1074 * closed and ignored, or a negative value upon critical failure.
Emeric Brun2b920a12010-09-23 18:30:22 +02001075 */
1076int peer_accept(struct session *s)
1077{
1078 /* we have a dedicated I/O handler for the stats */
Willy Tarreaub24281b2011-02-13 13:16:36 +01001079 stream_int_register_handler(&s->si[1], &peer_applet);
Willy Tarreauf2943dc2012-10-26 20:10:28 +02001080 copy_target(&s->target, &s->si[1].conn->target); // for logging only
1081 s->si[1].conn->xprt_ctx = s;
Willy Tarreaubc4af052011-02-13 13:25:14 +01001082 s->si[1].applet.st0 = PEER_SESSION_ACCEPT;
Emeric Brun2b920a12010-09-23 18:30:22 +02001083
1084 tv_zero(&s->logs.tv_request);
1085 s->logs.t_queue = 0;
1086 s->logs.t_connect = 0;
1087 s->logs.t_data = 0;
1088 s->logs.t_close = 0;
1089 s->logs.bytes_in = s->logs.bytes_out = 0;
1090 s->logs.prx_queue_size = 0;/* we get the number of pending conns before us */
1091 s->logs.srv_queue_size = 0; /* we will get this number soon */
1092
Willy Tarreau03cdb7c2012-08-27 23:14:58 +02001093 s->req->flags |= CF_READ_DONTWAIT; /* we plan to read small requests */
Emeric Brun2b920a12010-09-23 18:30:22 +02001094
1095 if (s->listener->timeout) {
1096 s->req->rto = *s->listener->timeout;
1097 s->rep->wto = *s->listener->timeout;
1098 }
1099 return 1;
1100}
1101
1102/*
Willy Tarreaubd55e312010-11-11 10:55:09 +01001103 * Create a new peer session in assigned state (connect will start automatically)
Emeric Brun2b920a12010-09-23 18:30:22 +02001104 */
Simon Horman96553772011-06-08 09:18:51 +09001105static struct session *peer_session_create(struct peer *peer, struct peer_session *ps)
Emeric Brun2b920a12010-09-23 18:30:22 +02001106{
Willy Tarreau4348fad2012-09-20 16:48:07 +02001107 struct listener *l = LIST_NEXT(&peer->peers->peers_fe->conf.listeners, struct listener *, by_fe);
Emeric Brun2b920a12010-09-23 18:30:22 +02001108 struct proxy *p = (struct proxy *)l->frontend; /* attached frontend */
1109 struct session *s;
1110 struct http_txn *txn;
1111 struct task *t;
1112
1113 if ((s = pool_alloc2(pool2_session)) == NULL) { /* disable this proxy for a while */
1114 Alert("out of memory in event_accept().\n");
Emeric Brun2b920a12010-09-23 18:30:22 +02001115 goto out_close;
1116 }
1117
Willy Tarreauf2943dc2012-10-26 20:10:28 +02001118 if (unlikely((s->si[0].conn = pool_alloc2(pool2_connection)) == NULL))
1119 goto out_fail_conn0;
1120
1121 if (unlikely((s->si[1].conn = pool_alloc2(pool2_connection)) == NULL))
1122 goto out_fail_conn1;
1123
Emeric Brun2b920a12010-09-23 18:30:22 +02001124 LIST_ADDQ(&sessions, &s->list);
1125 LIST_INIT(&s->back_refs);
1126
1127 s->flags = SN_ASSIGNED|SN_ADDR_SET;
Emeric Brun2b920a12010-09-23 18:30:22 +02001128
1129 /* if this session comes from a known monitoring system, we want to ignore
1130 * it as soon as possible, which means closing it immediately for TCP.
1131 */
1132 if ((t = task_new()) == NULL) { /* disable this proxy for a while */
1133 Alert("out of memory in event_accept().\n");
Emeric Brun2b920a12010-09-23 18:30:22 +02001134 goto out_free_session;
1135 }
1136
1137 ps->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
1138 ps->statuscode = PEER_SESSION_CONNECTCODE;
1139
1140 t->process = l->handler;
1141 t->context = s;
1142 t->nice = l->nice;
1143
Willy Tarreauf2943dc2012-10-26 20:10:28 +02001144 memcpy(&s->si[1].conn->addr.to, &peer->addr, sizeof(s->si[1].conn->addr.to));
Emeric Brun2b920a12010-09-23 18:30:22 +02001145 s->task = t;
1146 s->listener = l;
1147
1148 /* Note: initially, the session's backend points to the frontend.
1149 * This changes later when switching rules are executed or
1150 * when the default backend is assigned.
1151 */
1152 s->be = s->fe = p;
1153
1154 s->req = s->rep = NULL; /* will be allocated later */
1155
Willy Tarreauf2943dc2012-10-26 20:10:28 +02001156 s->si[0].conn->t.sock.fd = -1;
1157 s->si[0].conn->flags = CO_FL_NONE;
Emeric Brun2b920a12010-09-23 18:30:22 +02001158 s->si[0].owner = t;
1159 s->si[0].state = s->si[0].prev_state = SI_ST_EST;
1160 s->si[0].err_type = SI_ET_NONE;
1161 s->si[0].err_loc = NULL;
Willy Tarreau26d8c592012-05-07 18:12:14 +02001162 s->si[0].release = NULL;
Willy Tarreau63e7fe32012-05-08 15:20:43 +02001163 s->si[0].send_proxy_ofs = 0;
Willy Tarreauf2943dc2012-10-26 20:10:28 +02001164 set_target_client(&s->si[0].conn->target, l);
Emeric Brun2b920a12010-09-23 18:30:22 +02001165 s->si[0].exp = TICK_ETERNITY;
1166 s->si[0].flags = SI_FL_NONE;
1167 if (s->fe->options2 & PR_O2_INDEPSTR)
1168 s->si[0].flags |= SI_FL_INDEP_STR;
Emeric Brun2b920a12010-09-23 18:30:22 +02001169
Willy Tarreaub24281b2011-02-13 13:16:36 +01001170 stream_int_register_handler(&s->si[0], &peer_applet);
Willy Tarreaufa6bac62012-05-31 14:16:59 +02001171 s->si[0].applet.st0 = PEER_SESSION_CONNECT;
Willy Tarreauf2943dc2012-10-26 20:10:28 +02001172 s->si[0].conn->xprt_ctx = (void *)ps;
Emeric Brun2b920a12010-09-23 18:30:22 +02001173
Willy Tarreauf2943dc2012-10-26 20:10:28 +02001174 s->si[1].conn->t.sock.fd = -1; /* just to help with debugging */
1175 s->si[1].conn->flags = CO_FL_NONE;
Emeric Brun2b920a12010-09-23 18:30:22 +02001176 s->si[1].owner = t;
1177 s->si[1].state = s->si[1].prev_state = SI_ST_ASS;
1178 s->si[1].conn_retries = p->conn_retries;
1179 s->si[1].err_type = SI_ET_NONE;
1180 s->si[1].err_loc = NULL;
Willy Tarreau26d8c592012-05-07 18:12:14 +02001181 s->si[1].release = NULL;
Willy Tarreau63e7fe32012-05-08 15:20:43 +02001182 s->si[1].send_proxy_ofs = 0;
Willy Tarreauf2943dc2012-10-26 20:10:28 +02001183 set_target_proxy(&s->si[1].conn->target, s->be);
Willy Tarreauf7bc57c2012-10-03 00:19:48 +02001184 si_prepare_conn(&s->si[1], peer->proto, peer->xprt);
Emeric Brun2b920a12010-09-23 18:30:22 +02001185 s->si[1].exp = TICK_ETERNITY;
1186 s->si[1].flags = SI_FL_NONE;
1187 if (s->be->options2 & PR_O2_INDEPSTR)
1188 s->si[1].flags |= SI_FL_INDEP_STR;
1189
Willy Tarreau9bd0d742011-07-20 00:17:39 +02001190 session_init_srv_conn(s);
Simon Horman8b7b05a2011-08-13 08:03:48 +09001191 set_target_proxy(&s->target, s->be);
Emeric Brun2b920a12010-09-23 18:30:22 +02001192 s->pend_pos = NULL;
1193
1194 /* init store persistence */
1195 s->store_count = 0;
1196 s->stkctr1_entry = NULL;
1197 s->stkctr2_entry = NULL;
1198
1199 /* FIXME: the logs are horribly complicated now, because they are
1200 * defined in <p>, <p>, and later <be> and <be>.
1201 */
1202
1203 s->logs.logwait = 0;
1204 s->do_log = NULL;
1205
1206 /* default error reporting function, may be changed by analysers */
1207 s->srv_error = default_srv_error;
1208
Emeric Brun2b920a12010-09-23 18:30:22 +02001209 s->uniq_id = 0;
Willy Tarreaubd833142012-05-08 15:51:44 +02001210 s->unique_id = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001211
1212 txn = &s->txn;
1213 /* Those variables will be checked and freed if non-NULL in
1214 * session.c:session_free(). It is important that they are
1215 * properly initialized.
1216 */
1217 txn->sessid = NULL;
1218 txn->srv_cookie = NULL;
1219 txn->cli_cookie = NULL;
1220 txn->uri = NULL;
1221 txn->req.cap = NULL;
1222 txn->rsp.cap = NULL;
1223 txn->hdr_idx.v = NULL;
1224 txn->hdr_idx.size = txn->hdr_idx.used = 0;
1225
Willy Tarreau8263d2b2012-08-28 00:06:31 +02001226 if ((s->req = pool_alloc2(pool2_channel)) == NULL)
Emeric Brun2b920a12010-09-23 18:30:22 +02001227 goto out_fail_req; /* no memory */
1228
Willy Tarreau9b28e032012-10-12 23:49:43 +02001229 if ((s->req->buf = pool_alloc2(pool2_buffer)) == NULL)
1230 goto out_fail_req_buf; /* no memory */
1231
Willy Tarreau19d14ef2012-10-29 16:51:55 +01001232 s->req->buf->size = trash.size;
Willy Tarreau8263d2b2012-08-28 00:06:31 +02001233 channel_init(s->req);
Emeric Brun2b920a12010-09-23 18:30:22 +02001234 s->req->prod = &s->si[0];
1235 s->req->cons = &s->si[1];
1236 s->si[0].ib = s->si[1].ob = s->req;
1237
Willy Tarreau03cdb7c2012-08-27 23:14:58 +02001238 s->req->flags |= CF_READ_ATTACHED; /* the producer is already connected */
Emeric Brun2b920a12010-09-23 18:30:22 +02001239
1240 /* activate default analysers enabled for this listener */
1241 s->req->analysers = l->analysers;
1242
1243 /* note: this should not happen anymore since there's always at least the switching rules */
1244 if (!s->req->analysers) {
Willy Tarreau8263d2b2012-08-28 00:06:31 +02001245 channel_auto_connect(s->req);/* don't wait to establish connection */
1246 channel_auto_close(s->req);/* let the producer forward close requests */
Emeric Brun2b920a12010-09-23 18:30:22 +02001247 }
1248
1249 s->req->rto = s->fe->timeout.client;
1250 s->req->wto = s->be->timeout.server;
1251
Willy Tarreau8263d2b2012-08-28 00:06:31 +02001252 if ((s->rep = pool_alloc2(pool2_channel)) == NULL)
Emeric Brun2b920a12010-09-23 18:30:22 +02001253 goto out_fail_rep; /* no memory */
1254
Willy Tarreau9b28e032012-10-12 23:49:43 +02001255 if ((s->rep->buf = pool_alloc2(pool2_buffer)) == NULL)
1256 goto out_fail_rep_buf; /* no memory */
1257
Willy Tarreau19d14ef2012-10-29 16:51:55 +01001258 s->rep->buf->size = trash.size;
Willy Tarreau8263d2b2012-08-28 00:06:31 +02001259 channel_init(s->rep);
Emeric Brun2b920a12010-09-23 18:30:22 +02001260 s->rep->prod = &s->si[1];
1261 s->rep->cons = &s->si[0];
1262 s->si[0].ob = s->si[1].ib = s->rep;
1263
1264 s->rep->rto = s->be->timeout.server;
1265 s->rep->wto = s->fe->timeout.client;
1266
1267 s->req->rex = TICK_ETERNITY;
1268 s->req->wex = TICK_ETERNITY;
1269 s->req->analyse_exp = TICK_ETERNITY;
1270 s->rep->rex = TICK_ETERNITY;
1271 s->rep->wex = TICK_ETERNITY;
1272 s->rep->analyse_exp = TICK_ETERNITY;
1273 t->expire = TICK_ETERNITY;
1274
Willy Tarreau03cdb7c2012-08-27 23:14:58 +02001275 s->rep->flags |= CF_READ_DONTWAIT;
Emeric Brun2b920a12010-09-23 18:30:22 +02001276 /* it is important not to call the wakeup function directly but to
1277 * pass through task_wakeup(), because this one knows how to apply
1278 * priorities to tasks.
1279 */
1280 task_wakeup(t, TASK_WOKEN_INIT);
1281
1282 l->nbconn++; /* warning! right now, it's up to the handler to decrease this */
1283 p->feconn++;/* beconn will be increased later */
1284 jobs++;
Willy Tarreau3c63fd82011-09-07 18:00:47 +02001285 if (!(s->listener->options & LI_O_UNLIMITED))
1286 actconn++;
Emeric Brun2b920a12010-09-23 18:30:22 +02001287 totalconn++;
1288
1289 return s;
1290
1291 /* Error unrolling */
Willy Tarreau9b28e032012-10-12 23:49:43 +02001292 out_fail_rep_buf:
1293 pool_free2(pool2_channel, s->rep);
Emeric Brun2b920a12010-09-23 18:30:22 +02001294 out_fail_rep:
Willy Tarreau9b28e032012-10-12 23:49:43 +02001295 pool_free2(pool2_buffer, s->req->buf);
1296 out_fail_req_buf:
Willy Tarreau8263d2b2012-08-28 00:06:31 +02001297 pool_free2(pool2_channel, s->req);
Emeric Brun2b920a12010-09-23 18:30:22 +02001298 out_fail_req:
1299 task_free(t);
1300 out_free_session:
1301 LIST_DEL(&s->list);
Willy Tarreauf2943dc2012-10-26 20:10:28 +02001302 pool_free2(pool2_connection, s->si[1].conn);
1303 out_fail_conn1:
1304 pool_free2(pool2_connection, s->si[0].conn);
1305 out_fail_conn0:
Emeric Brun2b920a12010-09-23 18:30:22 +02001306 pool_free2(pool2_session, s);
1307 out_close:
1308 return s;
1309}
1310
1311/*
1312 * Task processing function to manage re-connect and peer session
1313 * tasks wakeup on local update.
1314 */
Simon Horman96553772011-06-08 09:18:51 +09001315static struct task *process_peer_sync(struct task * task)
Emeric Brun2b920a12010-09-23 18:30:22 +02001316{
1317 struct shared_table *st = (struct shared_table *)task->context;
1318 struct peer_session *ps;
1319
1320 task->expire = TICK_ETERNITY;
1321
1322 if (!stopping) {
1323 /* Normal case (not soft stop)*/
1324 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL) &&
1325 (!nb_oldpids || tick_is_expired(st->resync_timeout, now_ms)) &&
1326 !(st->flags & SHTABLE_F_RESYNC_ASSIGN)) {
1327 /* Resync from local peer needed
1328 no peer was assigned for the lesson
1329 and no old local peer found
1330 or resync timeout expire */
1331
1332 /* flag no more resync from local, to try resync from remotes */
1333 st->flags |= SHTABLE_F_RESYNC_LOCAL;
1334
1335 /* reschedule a resync */
1336 st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
1337 }
1338
1339 /* For each session */
1340 for (ps = st->sessions; ps; ps = ps->next) {
1341 /* For each remote peers */
1342 if (!ps->peer->local) {
1343 if (!ps->session) {
1344 /* no active session */
1345 if (ps->statuscode == 0 ||
1346 ps->statuscode == PEER_SESSION_SUCCESSCODE ||
1347 ((ps->statuscode == PEER_SESSION_CONNECTCODE ||
1348 ps->statuscode == PEER_SESSION_CONNECTEDCODE) &&
1349 tick_is_expired(ps->reconnect, now_ms))) {
1350 /* connection never tried
1351 * or previous session established with success
1352 * or previous session failed during connection
1353 * and reconnection timer is expired */
1354
1355 /* retry a connect */
1356 ps->session = peer_session_create(ps->peer, ps);
1357 }
1358 else if (ps->statuscode == PEER_SESSION_CONNECTCODE ||
1359 ps->statuscode == PEER_SESSION_CONNECTEDCODE) {
1360 /* If previous session failed during connection
1361 * but reconnection timer is not expired */
1362
1363 /* reschedule task for reconnect */
1364 task->expire = tick_first(task->expire, ps->reconnect);
1365 }
1366 /* else do nothing */
1367 } /* !ps->session */
1368 else if (ps->statuscode == PEER_SESSION_SUCCESSCODE) {
1369 /* current session is active and established */
1370 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
1371 !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
1372 !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
1373 /* Resync from a remote is needed
1374 * and no peer was assigned for lesson
1375 * and current peer may be up2date */
1376
1377 /* assign peer for the lesson */
1378 ps->flags |= PEER_F_LEARN_ASSIGN;
1379 st->flags |= SHTABLE_F_RESYNC_ASSIGN;
1380
1381 /* awake peer session task to handle a request of resync */
1382 task_wakeup(ps->session->task, TASK_WOKEN_MSG);
1383 }
1384 else if ((int)(ps->pushed - ps->table->table->localupdate) < 0) {
1385 /* awake peer session task to push local updates */
1386 task_wakeup(ps->session->task, TASK_WOKEN_MSG);
1387 }
1388 /* else do nothing */
1389 } /* SUCCESSCODE */
1390 } /* !ps->peer->local */
1391 } /* for */
1392
1393 /* Resync from remotes expired: consider resync is finished */
1394 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
1395 !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
1396 tick_is_expired(st->resync_timeout, now_ms)) {
1397 /* Resync from remote peer needed
1398 * no peer was assigned for the lesson
1399 * and resync timeout expire */
1400
1401 /* flag no more resync from remote, consider resync is finished */
1402 st->flags |= SHTABLE_F_RESYNC_REMOTE;
1403 }
1404
1405 if ((st->flags & SHTABLE_RESYNC_STATEMASK) != SHTABLE_RESYNC_FINISHED) {
1406 /* Resync not finished*/
1407 /* reschedule task to resync timeout, to ended resync if needed */
1408 task->expire = tick_first(task->expire, st->resync_timeout);
1409 }
1410 } /* !stopping */
1411 else {
1412 /* soft stop case */
1413 if (task->state & TASK_WOKEN_SIGNAL) {
1414 /* We've just recieved the signal */
1415 if (!(st->flags & SHTABLE_F_DONOTSTOP)) {
1416 /* add DO NOT STOP flag if not present */
1417 jobs++;
1418 st->flags |= SHTABLE_F_DONOTSTOP;
1419 }
1420
1421 /* disconnect all connected peers */
1422 for (ps = st->sessions; ps; ps = ps->next) {
1423 if (ps->session) {
1424 peer_session_forceshutdown(ps->session);
1425 ps->session = NULL;
1426 }
1427 }
1428 }
1429 ps = st->local_session;
1430
1431 if (ps->flags & PEER_F_TEACH_COMPLETE) {
1432 if (st->flags & SHTABLE_F_DONOTSTOP) {
1433 /* resync of new process was complete, current process can die now */
1434 jobs--;
1435 st->flags &= ~SHTABLE_F_DONOTSTOP;
1436 }
1437 }
1438 else if (!ps->session) {
1439 /* If session is not active */
1440 if (ps->statuscode == 0 ||
1441 ps->statuscode == PEER_SESSION_SUCCESSCODE ||
1442 ps->statuscode == PEER_SESSION_CONNECTEDCODE ||
1443 ps->statuscode == PEER_SESSION_TRYAGAIN) {
1444 /* connection never tried
1445 * or previous session was successfully established
1446 * or previous session tcp connect success but init state incomplete
1447 * or during previous connect, peer replies a try again statuscode */
1448
1449 /* connect to the peer */
1450 ps->session = peer_session_create(ps->peer, ps);
1451 }
1452 else {
1453 /* Other error cases */
1454 if (st->flags & SHTABLE_F_DONOTSTOP) {
1455 /* unable to resync new process, current process can die now */
1456 jobs--;
1457 st->flags &= ~SHTABLE_F_DONOTSTOP;
1458 }
1459 }
1460 }
1461 else if (ps->statuscode == PEER_SESSION_SUCCESSCODE &&
1462 (int)(ps->pushed - ps->table->table->localupdate) < 0) {
1463 /* current session active and established
1464 awake session to push remaining local updates */
1465 task_wakeup(ps->session->task, TASK_WOKEN_MSG);
1466 }
1467 } /* stopping */
1468 /* Wakeup for re-connect */
1469 return task;
1470}
1471
1472/*
1473 * Function used to register a table for sync on a group of peers
1474 *
1475 */
1476void peers_register_table(struct peers *peers, struct stktable *table)
1477{
1478 struct shared_table *st;
1479 struct peer * curpeer;
1480 struct peer_session *ps;
Willy Tarreau4348fad2012-09-20 16:48:07 +02001481 struct listener *listener;
Emeric Brun2b920a12010-09-23 18:30:22 +02001482
1483 st = (struct shared_table *)calloc(1,sizeof(struct shared_table));
1484 st->table = table;
1485 st->next = peers->tables;
1486 st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
1487 peers->tables = st;
1488
1489 for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) {
1490 ps = (struct peer_session *)calloc(1,sizeof(struct peer_session));
1491 ps->table = st;
1492 ps->peer = curpeer;
1493 if (curpeer->local)
1494 st->local_session = ps;
1495 ps->next = st->sessions;
1496 ps->reconnect = now_ms;
1497 st->sessions = ps;
1498 peers->peers_fe->maxconn += 3;
1499 }
1500
Willy Tarreau4348fad2012-09-20 16:48:07 +02001501 list_for_each_entry(listener, &peers->peers_fe->conf.listeners, by_fe)
1502 listener->maxconn = peers->peers_fe->maxconn;
Emeric Brun2b920a12010-09-23 18:30:22 +02001503 st->sync_task = task_new();
1504 st->sync_task->process = process_peer_sync;
1505 st->sync_task->expire = TICK_ETERNITY;
1506 st->sync_task->context = (void *)st;
1507 table->sync_task =st->sync_task;
1508 signal_register_task(0, table->sync_task, 0);
1509 task_wakeup(st->sync_task, TASK_WOKEN_INIT);
1510}
1511