blob: 894502ac7723cf92830792c50ab98b79d7113ea3 [file] [log] [blame]
Emeric Brun2b920a12010-09-23 18:30:22 +02001/*
Willy Tarreaubd55e312010-11-11 10:55:09 +01002 * Stick table synchro management.
Emeric Brun2b920a12010-09-23 18:30:22 +02003 *
4 * Copyright 2010 EXCELIANCE, Emeric Brun <ebrun@exceliance.fr>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12
13#include <errno.h>
14#include <fcntl.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <sys/socket.h>
20#include <sys/stat.h>
21#include <sys/types.h>
22
23#include <common/compat.h>
24#include <common/config.h>
25#include <common/time.h>
26
27#include <types/global.h>
28#include <types/peers.h>
29
30#include <proto/acl.h>
Willy Tarreauc7e42382012-08-24 19:22:53 +020031#include <proto/channel.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020032#include <proto/fd.h>
33#include <proto/log.h>
34#include <proto/hdr_idx.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020035#include <proto/protocols.h>
36#include <proto/proto_tcp.h>
37#include <proto/proto_http.h>
38#include <proto/proxy.h>
39#include <proto/session.h>
40#include <proto/stream_interface.h>
Emeric Brun2b920a12010-09-23 18:30:22 +020041#include <proto/task.h>
42#include <proto/stick_table.h>
43#include <proto/signal.h>
44
45
46/*******************************/
47/* Current peer learning state */
48/*******************************/
49
50/******************************/
51/* Current table resync state */
52/******************************/
53#define SHTABLE_F_RESYNC_LOCAL 0x00000001 /* Learn from local finished or no more needed */
54#define SHTABLE_F_RESYNC_REMOTE 0x00000002 /* Learn from remote finished or no more needed */
55#define SHTABLE_F_RESYNC_ASSIGN 0x00000004 /* A peer was assigned to learn our lesson */
56#define SHTABLE_F_RESYNC_PROCESS 0x00000008 /* The assigned peer was requested for resync */
57#define SHTABLE_F_DONOTSTOP 0x00010000 /* Main table sync task block process during soft stop
58 to push data to new process */
59
60#define SHTABLE_RESYNC_STATEMASK (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)
61#define SHTABLE_RESYNC_FROMLOCAL 0x00000000
62#define SHTABLE_RESYNC_FROMREMOTE SHTABLE_F_RESYNC_LOCAL
63#define SHTABLE_RESYNC_FINISHED (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)
64
65/******************************/
66/* Remote peer teaching state */
67/******************************/
68#define PEER_F_TEACH_PROCESS 0x00000001 /* Teach a lesson to current peer */
69#define PEER_F_TEACH_STAGE1 0x00000002 /* Teach state 1 complete */
70#define PEER_F_TEACH_STAGE2 0x00000004 /* Teach stage 2 complete */
71#define PEER_F_TEACH_FINISHED 0x00000008 /* Teach conclude, (wait for confirm) */
72#define PEER_F_TEACH_COMPLETE 0x00000010 /* All that we know already taught to current peer, used only for a local peer */
73#define PEER_F_LEARN_ASSIGN 0x00000100 /* Current peer was assigned for a lesson */
74#define PEER_F_LEARN_NOTUP2DATE 0x00000200 /* Learn from peer finished but peer is not up to date */
75
76#define PEER_TEACH_RESET ~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_STAGE1|PEER_F_TEACH_STAGE2|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */
77#define PEER_LEARN_RESET ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE)
78
79
80/**********************************/
81/* Peer Session IO handler states */
82/**********************************/
83
84#define PEER_SESSION_ACCEPT 1000 /* Initial state for session create by an accept */
85#define PEER_SESSION_GETVERSION 1001 /* Validate supported protocol version*/
86#define PEER_SESSION_GETHOST 1002 /* Validate host ID correspond to local host id */
87#define PEER_SESSION_GETPEER 1003 /* Validate peer ID correspond to a known remote peer id */
88#define PEER_SESSION_GETTABLE 1004 /* Search into registered table for a table with same id and
89 validate type and size */
90#define PEER_SESSION_SENDSUCCESS 1005 /* Send ret code 200 (success) and wait for message */
91/* next state is WAITMSG */
92
93#define PEER_SESSION_CONNECT 2000 /* Initial state for session create on a connect,
94 push presentation into buffer */
95#define PEER_SESSION_GETSTATUS 2001 /* Wait for the welcome message */
96#define PEER_SESSION_WAITMSG 2002 /* Wait for datamessages*/
97/* loop on WAITMSG */
98
99#define PEER_SESSION_EXIT 10000 /* Exit with status code */
100#define PEER_SESSION_END 10001 /* Killed session */
101/* session ended */
102
103
104/**********************************/
105/* Peer Session status code */
106/**********************************/
107
108#define PEER_SESSION_CONNECTCODE 100 /* connect in progress */
109#define PEER_SESSION_CONNECTEDCODE 110 /* tcp connect success */
110
111#define PEER_SESSION_SUCCESSCODE 200 /* accept or connect successful */
112
113#define PEER_SESSION_TRYAGAIN 300 /* try again later */
114
115#define PEER_SESSION_ERRPROTO 501 /* error protocol */
116#define PEER_SESSION_ERRVERSION 502 /* unknown protocol version */
117#define PEER_SESSION_ERRHOST 503 /* bad host name */
118#define PEER_SESSION_ERRPEER 504 /* unknown peer */
119#define PEER_SESSION_ERRTYPE 505 /* table key type mismatch */
120#define PEER_SESSION_ERRSIZE 506 /* table key size mismatch */
121#define PEER_SESSION_ERRTABLE 507 /* unknown table */
122
123#define PEER_SESSION_PROTO_NAME "HAProxyS"
124
125struct peers *peers = NULL;
Simon Horman96553772011-06-08 09:18:51 +0900126static void peer_session_forceshutdown(struct session * session);
Emeric Brun2b920a12010-09-23 18:30:22 +0200127
128
129/*
130 * This prepare the data update message of the stick session <ts>, <ps> is the the peer session
131 * where the data going to be pushed, <msg> is a buffer of <size> to recieve data message content
132 */
Simon Horman96553772011-06-08 09:18:51 +0900133static int peer_prepare_datamsg(struct stksess *ts, struct peer_session *ps, char *msg, size_t size)
Emeric Brun2b920a12010-09-23 18:30:22 +0200134{
135 uint32_t netinteger;
136 int len;
137 /* construct message */
138 if (ps->lastpush && ts->upd.key > ps->lastpush && (ts->upd.key - ps->lastpush) <= 127) {
139 msg[0] = 0x80 + ts->upd.key - ps->lastpush;
140 len = sizeof(char);
141 }
142 else {
143 msg[0] = 'D';
144 netinteger = htonl(ts->upd.key);
145 memcpy(&msg[sizeof(char)], &netinteger, sizeof(netinteger));
146 len = sizeof(char) + sizeof(netinteger);
147 }
148
149 if (ps->table->table->type == STKTABLE_TYPE_STRING) {
150 int stlen = strlen((char *)ts->key.key);
151
152 netinteger = htonl(strlen((char *)ts->key.key));
153 memcpy(&msg[len], &netinteger, sizeof(netinteger));
154 memcpy(&msg[len+sizeof(netinteger)], ts->key.key, stlen);
155 len += sizeof(netinteger) + stlen;
156
157 }
158 else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
159 netinteger = htonl(*((uint32_t *)ts->key.key));
160 memcpy(&msg[len], &netinteger, sizeof(netinteger));
161 len += sizeof(netinteger);
162 }
163 else {
164 memcpy(&msg[len], ts->key.key, ps->table->table->key_size);
165 len += ps->table->table->key_size;
166 }
167
168 if (stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
169 netinteger = htonl(stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id));
170 else
171 netinteger = 0;
172
173 memcpy(&msg[len], &netinteger , sizeof(netinteger));
174 len += sizeof(netinteger);
175
176 return len;
177}
178
179
180/*
181 * Callback to release a session with a peer
182 */
Simon Horman96553772011-06-08 09:18:51 +0900183static void peer_session_release(struct stream_interface *si)
Emeric Brun2b920a12010-09-23 18:30:22 +0200184{
Aman Guptad94991d2012-04-06 17:39:26 -0700185 struct task *t = (struct task *)si->owner;
Emeric Brun2b920a12010-09-23 18:30:22 +0200186 struct session *s = (struct session *)t->context;
Willy Tarreau94981132012-05-21 17:09:48 +0200187 struct peer_session *ps = (struct peer_session *)si->conn.data_ctx;
Emeric Brun2b920a12010-09-23 18:30:22 +0200188
Willy Tarreau94981132012-05-21 17:09:48 +0200189 /* si->conn.data_ctx is not a peer session */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100190 if (si->applet.st0 < PEER_SESSION_SENDSUCCESS)
Emeric Brun2b920a12010-09-23 18:30:22 +0200191 return;
192
193 /* peer session identified */
194 if (ps) {
195 if (ps->session == s) {
196 ps->session = NULL;
197 if (ps->flags & PEER_F_LEARN_ASSIGN) {
198 /* unassign current peer for learning */
199 ps->flags &= ~(PEER_F_LEARN_ASSIGN);
200 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
201
202 /* reschedule a resync */
203 ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
204 }
205 /* reset teaching and learning flags to 0 */
206 ps->flags &= PEER_TEACH_RESET;
207 ps->flags &= PEER_LEARN_RESET;
208 }
209 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
210 }
211}
212
213
214/*
215 * IO Handler to handle message exchance with a peer
216 */
Willy Tarreaub24281b2011-02-13 13:16:36 +0100217static void peer_io_handler(struct stream_interface *si)
Emeric Brun2b920a12010-09-23 18:30:22 +0200218{
219 struct task *t= (struct task *)si->owner;
220 struct session *s = (struct session *)t->context;
221 struct peers *curpeers = (struct peers *)s->fe->parent;
222 int reql = 0;
223 int repl = 0;
224
225 while (1) {
226switchstate:
Willy Tarreaubc4af052011-02-13 13:25:14 +0100227 switch(si->applet.st0) {
Emeric Brun2b920a12010-09-23 18:30:22 +0200228 case PEER_SESSION_ACCEPT:
Willy Tarreau94981132012-05-21 17:09:48 +0200229 si->conn.data_ctx = NULL;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100230 si->applet.st0 = PEER_SESSION_GETVERSION;
Emeric Brun2b920a12010-09-23 18:30:22 +0200231 /* fall through */
232 case PEER_SESSION_GETVERSION:
David du Colombier7af46052012-05-16 14:16:48 +0200233 reql = bo_getline(si->ob, trash, trashlen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200234 if (reql <= 0) { /* closed or EOL not found */
235 if (reql == 0)
236 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100237 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200238 goto switchstate;
239 }
240 if (trash[reql-1] != '\n') {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100241 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200242 goto switchstate;
243 }
244 else if (reql > 1 && (trash[reql-2] == '\r'))
245 trash[reql-2] = 0;
246 else
247 trash[reql-1] = 0;
248
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200249 bo_skip(si->ob, reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200250
251 /* test version */
252 if (strcmp(PEER_SESSION_PROTO_NAME " 1.0", trash) != 0) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100253 si->applet.st0 = PEER_SESSION_EXIT;
254 si->applet.st1 = PEER_SESSION_ERRVERSION;
Emeric Brun2b920a12010-09-23 18:30:22 +0200255 /* test protocol */
256 if (strncmp(PEER_SESSION_PROTO_NAME " ", trash, strlen(PEER_SESSION_PROTO_NAME)+1) != 0)
Willy Tarreaubc4af052011-02-13 13:25:14 +0100257 si->applet.st1 = PEER_SESSION_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200258 goto switchstate;
259 }
260
Willy Tarreaubc4af052011-02-13 13:25:14 +0100261 si->applet.st0 = PEER_SESSION_GETHOST;
Emeric Brun2b920a12010-09-23 18:30:22 +0200262 /* fall through */
263 case PEER_SESSION_GETHOST:
David du Colombier7af46052012-05-16 14:16:48 +0200264 reql = bo_getline(si->ob, trash, trashlen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200265 if (reql <= 0) { /* closed or EOL not found */
266 if (reql == 0)
267 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100268 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200269 goto switchstate;
270 }
271 if (trash[reql-1] != '\n') {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100272 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200273 goto switchstate;
274 }
275 else if (reql > 1 && (trash[reql-2] == '\r'))
276 trash[reql-2] = 0;
277 else
278 trash[reql-1] = 0;
279
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200280 bo_skip(si->ob, reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200281
282 /* test hostname match */
283 if (strcmp(localpeer, trash) != 0) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100284 si->applet.st0 = PEER_SESSION_EXIT;
285 si->applet.st1 = PEER_SESSION_ERRHOST;
Emeric Brun2b920a12010-09-23 18:30:22 +0200286 goto switchstate;
287 }
288
Willy Tarreaubc4af052011-02-13 13:25:14 +0100289 si->applet.st0 = PEER_SESSION_GETPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200290 /* fall through */
291 case PEER_SESSION_GETPEER: {
292 struct peer *curpeer;
293 char *p;
David du Colombier7af46052012-05-16 14:16:48 +0200294 reql = bo_getline(si->ob, trash, trashlen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200295 if (reql <= 0) { /* closed or EOL not found */
296 if (reql == 0)
297 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100298 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200299 goto switchstate;
300 }
301 if (trash[reql-1] != '\n') {
302 /* Incomplete line, we quit */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100303 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200304 goto switchstate;
305 }
306 else if (reql > 1 && (trash[reql-2] == '\r'))
307 trash[reql-2] = 0;
308 else
309 trash[reql-1] = 0;
310
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200311 bo_skip(si->ob, reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200312
313 /* parse line "<peer name> <pid>" */
314 p = strchr(trash, ' ');
315 if (!p) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100316 si->applet.st0 = PEER_SESSION_EXIT;
317 si->applet.st1 = PEER_SESSION_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200318 goto switchstate;
319 }
320 *p = 0;
321
322 /* lookup known peer */
323 for (curpeer = curpeers->remote; curpeer; curpeer = curpeer->next) {
324 if (strcmp(curpeer->id, trash) == 0)
325 break;
326 }
327
328 /* if unknown peer */
329 if (!curpeer) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100330 si->applet.st0 = PEER_SESSION_EXIT;
331 si->applet.st1 = PEER_SESSION_ERRPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200332 goto switchstate;
333 }
334
Willy Tarreau94981132012-05-21 17:09:48 +0200335 si->conn.data_ctx = curpeer;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100336 si->applet.st0 = PEER_SESSION_GETTABLE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200337 /* fall through */
338 }
339 case PEER_SESSION_GETTABLE: {
Willy Tarreau94981132012-05-21 17:09:48 +0200340 struct peer *curpeer = (struct peer *)si->conn.data_ctx;
Emeric Brun2b920a12010-09-23 18:30:22 +0200341 struct shared_table *st;
342 struct peer_session *ps = NULL;
343 unsigned long key_type;
344 size_t key_size;
345 char *p;
346
David du Colombier7af46052012-05-16 14:16:48 +0200347 reql = bo_getline(si->ob, trash, trashlen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200348 if (reql <= 0) { /* closed or EOL not found */
349 if (reql == 0)
350 goto out;
Willy Tarreau94981132012-05-21 17:09:48 +0200351 si->conn.data_ctx = NULL;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100352 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200353 goto switchstate;
354 }
Willy Tarreau94981132012-05-21 17:09:48 +0200355 /* Re init si->conn.data_ctx to null, to handle correctly a release case */
356 si->conn.data_ctx = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200357
358 if (trash[reql-1] != '\n') {
359 /* Incomplete line, we quit */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100360 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200361 goto switchstate;
362 }
363 else if (reql > 1 && (trash[reql-2] == '\r'))
364 trash[reql-2] = 0;
365 else
366 trash[reql-1] = 0;
367
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200368 bo_skip(si->ob, reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200369
370 /* Parse line "<table name> <type> <size>" */
371 p = strchr(trash, ' ');
372 if (!p) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100373 si->applet.st0 = PEER_SESSION_EXIT;
374 si->applet.st1 = PEER_SESSION_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200375 goto switchstate;
376 }
377 *p = 0;
378 key_type = (unsigned long)atol(p+1);
379
380 p = strchr(p+1, ' ');
381 if (!p) {
Willy Tarreau94981132012-05-21 17:09:48 +0200382 si->conn.data_ctx = NULL;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100383 si->applet.st0 = PEER_SESSION_EXIT;
384 si->applet.st1 = PEER_SESSION_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200385 goto switchstate;
386 }
387
388 key_size = (size_t)atoi(p);
389 for (st = curpeers->tables; st; st = st->next) {
390 /* If table name matches */
391 if (strcmp(st->table->id, trash) == 0) {
392 /* If key size mismatches */
393 if (key_size != st->table->key_size) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100394 si->applet.st0 = PEER_SESSION_EXIT;
395 si->applet.st1 = PEER_SESSION_ERRSIZE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200396 goto switchstate;
397 }
398
399 /* If key type mismatches */
400 if (key_type != st->table->type) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100401 si->applet.st0 = PEER_SESSION_EXIT;
402 si->applet.st1 = PEER_SESSION_ERRTYPE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200403 goto switchstate;
404 }
405
406 /* lookup peer session of current peer */
407 for (ps = st->sessions; ps; ps = ps->next) {
408 if (ps->peer == curpeer) {
409 /* If session already active, replaced by new one */
410 if (ps->session && ps->session != s) {
411 if (ps->peer->local) {
412 /* Local connection, reply a retry */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100413 si->applet.st0 = PEER_SESSION_EXIT;
414 si->applet.st1 = PEER_SESSION_TRYAGAIN;
Emeric Brun2b920a12010-09-23 18:30:22 +0200415 goto switchstate;
416 }
417 peer_session_forceshutdown(ps->session);
418 }
419 ps->session = s;
420 break;
421 }
422 }
423 break;
424 }
425 }
426
427 /* If table not found */
428 if (!st){
Willy Tarreaubc4af052011-02-13 13:25:14 +0100429 si->applet.st0 = PEER_SESSION_EXIT;
430 si->applet.st1 = PEER_SESSION_ERRTABLE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200431 goto switchstate;
432 }
433
434 /* If no peer session for current peer */
435 if (!ps) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100436 si->applet.st0 = PEER_SESSION_EXIT;
437 si->applet.st1 = PEER_SESSION_ERRPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200438 goto switchstate;
439 }
440
Willy Tarreau94981132012-05-21 17:09:48 +0200441 si->conn.data_ctx = ps;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100442 si->applet.st0 = PEER_SESSION_SENDSUCCESS;
Emeric Brun2b920a12010-09-23 18:30:22 +0200443 /* fall through */
444 }
445 case PEER_SESSION_SENDSUCCESS:{
Willy Tarreau94981132012-05-21 17:09:48 +0200446 struct peer_session *ps = (struct peer_session *)si->conn.data_ctx;
Emeric Brun2b920a12010-09-23 18:30:22 +0200447
David du Colombier7af46052012-05-16 14:16:48 +0200448 repl = snprintf(trash, trashlen, "%d\n", PEER_SESSION_SUCCESSCODE);
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200449 repl = bi_putblk(si->ib, trash, repl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200450 if (repl <= 0) {
451 if (repl == -1)
452 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100453 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200454 goto switchstate;
455 }
456
457 /* Register status code */
458 ps->statuscode = PEER_SESSION_SUCCESSCODE;
459
460 /* Awake main task */
461 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
462
463 /* Init cursors */
464 ps->teaching_origin =ps->lastpush = ps->lastack = ps->pushack = 0;
465 ps->pushed = ps->update;
466
467 /* Init confirm counter */
468 ps->confirm = 0;
469
470 /* reset teaching and learning flags to 0 */
471 ps->flags &= PEER_TEACH_RESET;
472 ps->flags &= PEER_LEARN_RESET;
473
474 /* if current peer is local */
475 if (ps->peer->local) {
476 /* if table need resyncfrom local and no process assined */
477 if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL &&
478 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
479 /* assign local peer for a lesson, consider lesson already requested */
480 ps->flags |= PEER_F_LEARN_ASSIGN;
481 ps->table->flags |= (SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
482 }
483
484 }
485 else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
486 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
487 /* assign peer for a lesson */
488 ps->flags |= PEER_F_LEARN_ASSIGN;
489 ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
490 }
491 /* switch to waiting message state */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100492 si->applet.st0 = PEER_SESSION_WAITMSG;
Emeric Brun2b920a12010-09-23 18:30:22 +0200493 goto switchstate;
494 }
495 case PEER_SESSION_CONNECT: {
Willy Tarreau94981132012-05-21 17:09:48 +0200496 struct peer_session *ps = (struct peer_session *)si->conn.data_ctx;
Emeric Brun2b920a12010-09-23 18:30:22 +0200497
498 /* Send headers */
David du Colombier7af46052012-05-16 14:16:48 +0200499 repl = snprintf(trash, trashlen,
Emeric Brun2b920a12010-09-23 18:30:22 +0200500 PEER_SESSION_PROTO_NAME " 1.0\n%s\n%s %d\n%s %lu %d\n",
501 ps->peer->id,
502 localpeer,
Willy Tarreau7b77c9f2012-01-07 22:52:12 +0100503 (int)getpid(),
Emeric Brun2b920a12010-09-23 18:30:22 +0200504 ps->table->table->id,
505 ps->table->table->type,
Willy Tarreaubd55e312010-11-11 10:55:09 +0100506 (int)ps->table->table->key_size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200507
David du Colombier7af46052012-05-16 14:16:48 +0200508 if (repl >= trashlen) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100509 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200510 goto switchstate;
511 }
512
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200513 repl = bi_putblk(si->ib, trash, repl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200514 if (repl <= 0) {
515 if (repl == -1)
516 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100517 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200518 goto switchstate;
519 }
520
521 /* switch to the waiting statuscode state */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100522 si->applet.st0 = PEER_SESSION_GETSTATUS;
Emeric Brun2b920a12010-09-23 18:30:22 +0200523 /* fall through */
524 }
525 case PEER_SESSION_GETSTATUS: {
Willy Tarreau94981132012-05-21 17:09:48 +0200526 struct peer_session *ps = (struct peer_session *)si->conn.data_ctx;
Emeric Brun2b920a12010-09-23 18:30:22 +0200527
528 if (si->ib->flags & BF_WRITE_PARTIAL)
529 ps->statuscode = PEER_SESSION_CONNECTEDCODE;
530
David du Colombier7af46052012-05-16 14:16:48 +0200531 reql = bo_getline(si->ob, trash, trashlen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200532 if (reql <= 0) { /* closed or EOL not found */
533 if (reql == 0)
534 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100535 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200536 goto switchstate;
537 }
538 if (trash[reql-1] != '\n') {
539 /* Incomplete line, we quit */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100540 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200541 goto switchstate;
542 }
543 else if (reql > 1 && (trash[reql-2] == '\r'))
544 trash[reql-2] = 0;
545 else
546 trash[reql-1] = 0;
547
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200548 bo_skip(si->ob, reql);
Emeric Brun2b920a12010-09-23 18:30:22 +0200549
550 /* Register status code */
551 ps->statuscode = atoi(trash);
552
553 /* Awake main task */
554 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
555
556 /* If status code is success */
557 if (ps->statuscode == PEER_SESSION_SUCCESSCODE) {
558 /* Init cursors */
559 ps->teaching_origin = ps->lastpush = ps->lastack = ps->pushack = 0;
560 ps->pushed = ps->update;
561
562 /* Init confirm counter */
563 ps->confirm = 0;
564
565 /* reset teaching and learning flags to 0 */
566 ps->flags &= PEER_TEACH_RESET;
567 ps->flags &= PEER_LEARN_RESET;
568
569 /* If current peer is local */
570 if (ps->peer->local) {
571 /* Init cursors to push a resync */
572 ps->teaching_origin = ps->pushed = ps->table->table->update;
573 /* flag to start to teach lesson */
574 ps->flags |= PEER_F_TEACH_PROCESS;
575
576 }
577 else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
578 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
579 /* If peer is remote and resync from remote is needed,
580 and no peer currently assigned */
581
582 /* assign peer for a lesson */
583 ps->flags |= PEER_F_LEARN_ASSIGN;
584 ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
585 }
586
587 }
588 else {
589 /* Status code is not success, abort */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100590 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200591 goto switchstate;
592 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100593 si->applet.st0 = PEER_SESSION_WAITMSG;
Emeric Brun2b920a12010-09-23 18:30:22 +0200594 /* fall through */
595 }
596 case PEER_SESSION_WAITMSG: {
Willy Tarreau94981132012-05-21 17:09:48 +0200597 struct peer_session *ps = (struct peer_session *)si->conn.data_ctx;
Emeric Brun2b920a12010-09-23 18:30:22 +0200598 char c;
599 int totl = 0;
600
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200601 reql = bo_getblk(si->ob, (char *)&c, sizeof(c), totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200602 if (reql <= 0) { /* closed or EOL not found */
603 if (reql == 0) {
604 /* nothing to read */
605 goto incomplete;
606 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100607 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200608 goto switchstate;
609 }
610 totl += reql;
611
612 if ((c & 0x80) || (c == 'D')) {
613 /* Here we have data message */
614 unsigned int pushack;
615 struct stksess *ts;
616 struct stksess *newts;
617 struct stktable_key stkey;
618 int srvid;
619 uint32_t netinteger;
620
621 /* Compute update remote version */
622 if (c & 0x80) {
623 pushack = ps->pushack + (unsigned int)(c & 0x7F);
624 }
625 else {
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200626 reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200627 if (reql <= 0) { /* closed or EOL not found */
628 if (reql == 0) {
629 goto incomplete;
630 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100631 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200632 goto switchstate;
633 }
634 totl += reql;
635 pushack = ntohl(netinteger);
636 }
637
638 /* read key */
639 if (ps->table->table->type == STKTABLE_TYPE_STRING) {
640 /* type string */
641 stkey.key = stkey.data.buf;
642
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200643 reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200644 if (reql <= 0) { /* closed or EOL not found */
645 if (reql == 0) {
646 goto incomplete;
647 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100648 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200649 goto switchstate;
650 }
651 totl += reql;
652 stkey.key_len = ntohl(netinteger);
653
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200654 reql = bo_getblk(si->ob, stkey.key, stkey.key_len, totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200655 if (reql <= 0) { /* closed or EOL not found */
656 if (reql == 0) {
657 goto incomplete;
658 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100659 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200660 goto switchstate;
661 }
662 totl += reql;
663 }
664 else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
665 /* type integer */
666 stkey.key_len = (size_t)-1;
667 stkey.key = &stkey.data.integer;
668
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200669 reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200670 if (reql <= 0) { /* closed or EOL not found */
671 if (reql == 0) {
672 goto incomplete;
673 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100674 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200675 goto switchstate;
676 }
677 totl += reql;
678 stkey.data.integer = ntohl(netinteger);
679 }
680 else {
681 /* type ip */
682 stkey.key_len = (size_t)-1;
683 stkey.key = stkey.data.buf;
684
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200685 reql = bo_getblk(si->ob, (char *)&stkey.data.buf, ps->table->table->key_size, totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200686 if (reql <= 0) { /* closed or EOL not found */
687 if (reql == 0) {
688 goto incomplete;
689 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100690 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200691 goto switchstate;
692 }
693 totl += reql;
694
695 }
696
697 /* read server id */
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200698 reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200699 if (reql <= 0) { /* closed or EOL not found */
700 if (reql == 0) {
701 goto incomplete;
702 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100703 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200704 goto switchstate;
705 }
706 totl += reql;
707 srvid = ntohl(netinteger);
708
709 /* update entry */
710 newts = stksess_new(ps->table->table, &stkey);
711 if (newts) {
712 /* lookup for existing entry */
713 ts = stktable_lookup(ps->table->table, newts);
714 if (ts) {
715 /* the entry already exist, we can free ours */
716 stktable_touch(ps->table->table, ts, 0);
717 stksess_free(ps->table->table, newts);
718 }
719 else {
720 struct eb32_node *eb;
721
722 /* create new entry */
723 ts = stktable_store(ps->table->table, newts, 0);
724 ts->upd.key= (++ps->table->table->update)+(2^31);
725 eb = eb32_insert(&ps->table->table->updates, &ts->upd);
726 if (eb != &ts->upd) {
727 eb32_delete(eb);
728 eb32_insert(&ps->table->table->updates, &ts->upd);
729 }
730 }
731
732 /* update entry */
733 if (srvid && stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
734 stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id) = srvid;
735 ps->pushack = pushack;
736 }
737
738 }
739 else if (c == 'R') {
740 /* Reset message: remote need resync */
741
742 /* reinit counters for a resync */
743 ps->lastpush = 0;
744 ps->teaching_origin = ps->pushed = ps->table->table->update;
745
746 /* reset teaching flags to 0 */
747 ps->flags &= PEER_TEACH_RESET;
748
749 /* flag to start to teach lesson */
750 ps->flags |= PEER_F_TEACH_PROCESS;
751 }
752 else if (c == 'F') {
753 /* Finish message, all known updates have been pushed by remote */
754 /* and remote is up to date */
755
756 /* If resync is in progress with remote peer */
757 if (ps->flags & PEER_F_LEARN_ASSIGN) {
758
759 /* unassign current peer for learning */
760 ps->flags &= ~PEER_F_LEARN_ASSIGN;
761 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
762
763 /* Consider table is now up2date, resync resync no more needed from local neither remote */
764 ps->table->flags |= (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE);
765 }
766 /* Increase confirm counter to launch a confirm message */
767 ps->confirm++;
768 }
769 else if (c == 'c') {
770 /* confirm message, remote peer is now up to date with us */
771
772 /* If stopping state */
773 if (stopping) {
774 /* Close session, push resync no more needed */
775 ps->flags |= PEER_F_TEACH_COMPLETE;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100776 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200777 goto switchstate;
778 }
779
780 /* reset teaching flags to 0 */
781 ps->flags &= PEER_TEACH_RESET;
782 }
783 else if (c == 'C') {
784 /* Continue message, all known updates have been pushed by remote */
785 /* but remote is not up to date */
786
787 /* If resync is in progress with current peer */
788 if (ps->flags & PEER_F_LEARN_ASSIGN) {
789
790 /* unassign current peer */
791 ps->flags &= ~PEER_F_LEARN_ASSIGN;
792 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
793
794 /* flag current peer is not up 2 date to try from an other */
795 ps->flags |= PEER_F_LEARN_NOTUP2DATE;
796
797 /* reschedule a resync */
798 ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
799 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
800 }
801 ps->confirm++;
802 }
803 else if (c == 'A') {
804 /* ack message */
805 uint32_t netinteger;
806
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200807 reql = bo_getblk(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200808 if (reql <= 0) { /* closed or EOL not found */
809 if (reql == 0) {
810 goto incomplete;
811 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100812 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200813 goto switchstate;
814 }
815 totl += reql;
816
817 /* Consider remote is up to date with "acked" version */
818 ps->update = ntohl(netinteger);
819 }
820 else {
821 /* Unknown message */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100822 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200823 goto switchstate;
824 }
825
826 /* skip consumed message */
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200827 bo_skip(si->ob, totl);
Emeric Brun2b920a12010-09-23 18:30:22 +0200828
829 /* loop on that state to peek next message */
830 continue;
831incomplete:
832 /* Nothing to read, now we start to write */
833
834 /* Confirm finished or partial messages */
835 while (ps->confirm) {
836 /* There is a confirm messages to send */
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200837 repl = bi_putchr(si->ib, 'c');
Emeric Brun2b920a12010-09-23 18:30:22 +0200838 if (repl <= 0) {
839 /* no more write possible */
840 if (repl == -1)
841 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100842 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200843 goto switchstate;
844 }
845 ps->confirm--;
846 }
847
848 /* Need to request a resync */
849 if ((ps->flags & PEER_F_LEARN_ASSIGN) &&
850 (ps->table->flags & SHTABLE_F_RESYNC_ASSIGN) &&
851 !(ps->table->flags & SHTABLE_F_RESYNC_PROCESS)) {
852 /* Current peer was elected to request a resync */
853
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200854 repl = bi_putchr(si->ib, 'R');
Emeric Brun2b920a12010-09-23 18:30:22 +0200855 if (repl <= 0) {
856 /* no more write possible */
857 if (repl == -1)
858 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100859 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200860 goto switchstate;
861 }
862 ps->table->flags |= SHTABLE_F_RESYNC_PROCESS;
863 }
864
865 /* It remains some updates to ack */
866 if (ps->pushack != ps->lastack) {
867 uint32_t netinteger;
868
869 trash[0] = 'A';
870 netinteger = htonl(ps->pushack);
871 memcpy(&trash[1], &netinteger, sizeof(netinteger));
872
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200873 repl = bi_putblk(si->ib, trash, 1+sizeof(netinteger));
Emeric Brun2b920a12010-09-23 18:30:22 +0200874 if (repl <= 0) {
875 /* no more write possible */
876 if (repl == -1)
877 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100878 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200879 goto switchstate;
880 }
881 ps->lastack = ps->pushack;
882 }
883
884 if (ps->flags & PEER_F_TEACH_PROCESS) {
885 /* current peer was requested for a lesson */
886
887 if (!(ps->flags & PEER_F_TEACH_STAGE1)) {
888 /* lesson stage 1 not complete */
889 struct eb32_node *eb;
890
891 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
892 while (1) {
893 int msglen;
894 struct stksess *ts;
895
896 if (!eb) {
897 /* flag lesson stage1 complete */
898 ps->flags |= PEER_F_TEACH_STAGE1;
899 eb = eb32_first(&ps->table->table->updates);
900 if (eb)
901 ps->pushed = eb->key - 1;
902 break;
903 }
904
905 ts = eb32_entry(eb, struct stksess, upd);
David du Colombier7af46052012-05-16 14:16:48 +0200906 msglen = peer_prepare_datamsg(ts, ps, trash, trashlen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200907 if (msglen) {
908 /* message to buffer */
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200909 repl = bi_putblk(si->ib, trash, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200910 if (repl <= 0) {
911 /* no more write possible */
912 if (repl == -1)
913 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100914 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200915 goto switchstate;
916 }
917 ps->lastpush = ps->pushed = ts->upd.key;
918 }
919 eb = eb32_next(eb);
920 }
921 } /* !TEACH_STAGE1 */
922
923 if (!(ps->flags & PEER_F_TEACH_STAGE2)) {
924 /* lesson stage 2 not complete */
925 struct eb32_node *eb;
926
927 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
928 while (1) {
929 int msglen;
930 struct stksess *ts;
931
932 if (!eb || eb->key > ps->teaching_origin) {
933 /* flag lesson stage1 complete */
934 ps->flags |= PEER_F_TEACH_STAGE2;
935 ps->pushed = ps->teaching_origin;
936 break;
937 }
938
939 ts = eb32_entry(eb, struct stksess, upd);
David du Colombier7af46052012-05-16 14:16:48 +0200940 msglen = peer_prepare_datamsg(ts, ps, trash, trashlen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200941 if (msglen) {
942 /* message to buffer */
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200943 repl = bi_putblk(si->ib, trash, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200944 if (repl <= 0) {
945 /* no more write possible */
946 if (repl == -1)
947 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100948 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200949 goto switchstate;
950 }
951 ps->lastpush = ps->pushed = ts->upd.key;
952 }
953 eb = eb32_next(eb);
954 }
955 } /* !TEACH_STAGE2 */
956
957 if (!(ps->flags & PEER_F_TEACH_FINISHED)) {
958 /* process final lesson message */
Willy Tarreau9dab5fc2012-05-07 11:56:55 +0200959 repl = bi_putchr(si->ib, ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FINISHED) ? 'F' : 'C');
Emeric Brun2b920a12010-09-23 18:30:22 +0200960 if (repl <= 0) {
961 /* no more write possible */
962 if (repl == -1)
963 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100964 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200965 goto switchstate;
966 }
967
968 /* flag finished message sent */
969 ps->flags |= PEER_F_TEACH_FINISHED;
970 } /* !TEACH_FINISHED */
971 } /* TEACH_PROCESS */
972
973 if (!(ps->flags & PEER_F_LEARN_ASSIGN) &&
974 (int)(ps->pushed - ps->table->table->localupdate) < 0) {
975 /* Push local updates, only if no learning in progress (to avoid ping-pong effects) */
976 struct eb32_node *eb;
977
978 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
979 while (1) {
980 int msglen;
981 struct stksess *ts;
982
983 /* push local updates */
984 if (!eb) {
985 eb = eb32_first(&ps->table->table->updates);
986 if (!eb || ((int)(eb->key - ps->pushed) <= 0)) {
987 ps->pushed = ps->table->table->localupdate;
988 break;
989 }
990 }
991
992 if ((int)(eb->key - ps->table->table->localupdate) > 0) {
993 ps->pushed = ps->table->table->localupdate;
994 break;
995 }
996
997 ts = eb32_entry(eb, struct stksess, upd);
David du Colombier7af46052012-05-16 14:16:48 +0200998 msglen = peer_prepare_datamsg(ts, ps, trash, trashlen);
Emeric Brun2b920a12010-09-23 18:30:22 +0200999 if (msglen) {
1000 /* message to buffer */
Willy Tarreau9dab5fc2012-05-07 11:56:55 +02001001 repl = bi_putblk(si->ib, trash, msglen);
Emeric Brun2b920a12010-09-23 18:30:22 +02001002 if (repl <= 0) {
1003 /* no more write possible */
1004 if (repl == -1)
1005 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +01001006 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +02001007 goto switchstate;
1008 }
1009 ps->lastpush = ps->pushed = ts->upd.key;
1010 }
1011 eb = eb32_next(eb);
1012 }
1013 } /* ! LEARN_ASSIGN */
1014 /* noting more to do */
1015 goto out;
1016 }
1017 case PEER_SESSION_EXIT:
David du Colombier7af46052012-05-16 14:16:48 +02001018 repl = snprintf(trash, trashlen, "%d\n", si->applet.st1);
Emeric Brun2b920a12010-09-23 18:30:22 +02001019
Willy Tarreau9dab5fc2012-05-07 11:56:55 +02001020 if (bi_putblk(si->ib, trash, repl) == -1)
Emeric Brun2b920a12010-09-23 18:30:22 +02001021 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +01001022 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +02001023 /* fall through */
1024 case PEER_SESSION_END: {
Willy Tarreau73b013b2012-05-21 16:31:45 +02001025 si_shutw(si);
1026 si_shutr(si);
Emeric Brun2b920a12010-09-23 18:30:22 +02001027 si->ib->flags |= BF_READ_NULL;
1028 goto quit;
1029 }
1030 }
1031 }
1032out:
Willy Tarreau73b013b2012-05-21 16:31:45 +02001033 si_update(si);
Emeric Brun2b920a12010-09-23 18:30:22 +02001034 si->ob->flags |= BF_READ_DONTWAIT;
1035 /* we don't want to expire timeouts while we're processing requests */
1036 si->ib->rex = TICK_ETERNITY;
1037 si->ob->wex = TICK_ETERNITY;
1038quit:
1039 return;
1040}
1041
Willy Tarreaub24281b2011-02-13 13:16:36 +01001042static struct si_applet peer_applet = {
1043 .name = "<PEER>", /* used for logging */
1044 .fct = peer_io_handler,
Aman Gupta9a13e842012-04-02 18:57:53 -07001045 .release = peer_session_release,
Willy Tarreaub24281b2011-02-13 13:16:36 +01001046};
Emeric Brun2b920a12010-09-23 18:30:22 +02001047
1048/*
1049 * Use this function to force a close of a peer session
1050 */
Simon Horman96553772011-06-08 09:18:51 +09001051static void peer_session_forceshutdown(struct session * session)
Emeric Brun2b920a12010-09-23 18:30:22 +02001052{
1053 struct stream_interface *oldsi;
1054
Willy Tarreau7c0a1512011-03-10 11:17:02 +01001055 if (session->si[0].target.type == TARG_TYPE_APPLET &&
1056 session->si[0].target.ptr.a == &peer_applet) {
Emeric Brun2b920a12010-09-23 18:30:22 +02001057 oldsi = &session->si[0];
1058 }
1059 else {
1060 oldsi = &session->si[1];
1061 }
1062
1063 /* call release to reinit resync states if needed */
1064 peer_session_release(oldsi);
Willy Tarreaubc4af052011-02-13 13:25:14 +01001065 oldsi->applet.st0 = PEER_SESSION_END;
Willy Tarreau94981132012-05-21 17:09:48 +02001066 oldsi->conn.data_ctx = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001067 task_wakeup(session->task, TASK_WOKEN_MSG);
1068}
1069
1070/*
1071 * this function is called on a read event from a listen socket, corresponding
1072 * to an accept. It tries to accept as many connections as possible.
Willy Tarreaubd55e312010-11-11 10:55:09 +01001073 * It returns a positive value upon success, 0 if the connection needs to be
1074 * closed and ignored, or a negative value upon critical failure.
Emeric Brun2b920a12010-09-23 18:30:22 +02001075 */
1076int peer_accept(struct session *s)
1077{
1078 /* we have a dedicated I/O handler for the stats */
Willy Tarreaub24281b2011-02-13 13:16:36 +01001079 stream_int_register_handler(&s->si[1], &peer_applet);
Willy Tarreau7b7a8e92011-03-27 19:53:06 +02001080 copy_target(&s->target, &s->si[1].target); // for logging only
Willy Tarreau94981132012-05-21 17:09:48 +02001081 s->si[1].conn.data_ctx = s;
Willy Tarreaubc4af052011-02-13 13:25:14 +01001082 s->si[1].applet.st0 = PEER_SESSION_ACCEPT;
Emeric Brun2b920a12010-09-23 18:30:22 +02001083
1084 tv_zero(&s->logs.tv_request);
1085 s->logs.t_queue = 0;
1086 s->logs.t_connect = 0;
1087 s->logs.t_data = 0;
1088 s->logs.t_close = 0;
1089 s->logs.bytes_in = s->logs.bytes_out = 0;
1090 s->logs.prx_queue_size = 0;/* we get the number of pending conns before us */
1091 s->logs.srv_queue_size = 0; /* we will get this number soon */
1092
1093 s->req->flags |= BF_READ_DONTWAIT; /* we plan to read small requests */
1094
1095 if (s->listener->timeout) {
1096 s->req->rto = *s->listener->timeout;
1097 s->rep->wto = *s->listener->timeout;
1098 }
1099 return 1;
1100}
1101
1102/*
Willy Tarreaubd55e312010-11-11 10:55:09 +01001103 * Create a new peer session in assigned state (connect will start automatically)
Emeric Brun2b920a12010-09-23 18:30:22 +02001104 */
Simon Horman96553772011-06-08 09:18:51 +09001105static struct session *peer_session_create(struct peer *peer, struct peer_session *ps)
Emeric Brun2b920a12010-09-23 18:30:22 +02001106{
1107 struct listener *l = ((struct proxy *)peer->peers->peers_fe)->listen;
1108 struct proxy *p = (struct proxy *)l->frontend; /* attached frontend */
1109 struct session *s;
1110 struct http_txn *txn;
1111 struct task *t;
1112
1113 if ((s = pool_alloc2(pool2_session)) == NULL) { /* disable this proxy for a while */
1114 Alert("out of memory in event_accept().\n");
Emeric Brun2b920a12010-09-23 18:30:22 +02001115 goto out_close;
1116 }
1117
1118 LIST_ADDQ(&sessions, &s->list);
1119 LIST_INIT(&s->back_refs);
1120
1121 s->flags = SN_ASSIGNED|SN_ADDR_SET;
1122 s->term_trace = 0;
1123
1124 /* if this session comes from a known monitoring system, we want to ignore
1125 * it as soon as possible, which means closing it immediately for TCP.
1126 */
1127 if ((t = task_new()) == NULL) { /* disable this proxy for a while */
1128 Alert("out of memory in event_accept().\n");
Emeric Brun2b920a12010-09-23 18:30:22 +02001129 goto out_free_session;
1130 }
1131
1132 ps->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
1133 ps->statuscode = PEER_SESSION_CONNECTCODE;
1134
1135 t->process = l->handler;
1136 t->context = s;
1137 t->nice = l->nice;
1138
Willy Tarreau6471afb2011-09-23 10:54:59 +02001139 memcpy(&s->si[1].addr.to, &peer->addr, sizeof(s->si[1].addr.to));
Emeric Brun2b920a12010-09-23 18:30:22 +02001140 s->task = t;
1141 s->listener = l;
1142
1143 /* Note: initially, the session's backend points to the frontend.
1144 * This changes later when switching rules are executed or
1145 * when the default backend is assigned.
1146 */
1147 s->be = s->fe = p;
1148
1149 s->req = s->rep = NULL; /* will be allocated later */
1150
Willy Tarreau96596ae2012-06-08 22:57:36 +02001151 s->si[0].conn.peeraddr = NULL;
1152 s->si[0].conn.peerlen = 0;
Willy Tarreaufb7508a2012-05-21 16:47:54 +02001153 s->si[0].conn.t.sock.fd = -1;
Willy Tarreau505e34a2012-07-06 10:17:53 +02001154 s->si[0].conn.flags = CO_FL_NONE;
Emeric Brun2b920a12010-09-23 18:30:22 +02001155 s->si[0].owner = t;
1156 s->si[0].state = s->si[0].prev_state = SI_ST_EST;
1157 s->si[0].err_type = SI_ET_NONE;
1158 s->si[0].err_loc = NULL;
Willy Tarreau26d8c592012-05-07 18:12:14 +02001159 s->si[0].release = NULL;
Willy Tarreau63e7fe32012-05-08 15:20:43 +02001160 s->si[0].send_proxy_ofs = 0;
Emeric Brun21adb022012-05-18 16:32:13 +02001161 set_target_client(&s->si[0].target, l);
Emeric Brun2b920a12010-09-23 18:30:22 +02001162 s->si[0].exp = TICK_ETERNITY;
1163 s->si[0].flags = SI_FL_NONE;
1164 if (s->fe->options2 & PR_O2_INDEPSTR)
1165 s->si[0].flags |= SI_FL_INDEP_STR;
Emeric Brun2b920a12010-09-23 18:30:22 +02001166
Willy Tarreaub24281b2011-02-13 13:16:36 +01001167 stream_int_register_handler(&s->si[0], &peer_applet);
Willy Tarreaufa6bac62012-05-31 14:16:59 +02001168 s->si[0].applet.st0 = PEER_SESSION_CONNECT;
1169 s->si[0].conn.data_ctx = (void *)ps;
Emeric Brun2b920a12010-09-23 18:30:22 +02001170
Willy Tarreau96596ae2012-06-08 22:57:36 +02001171 s->si[1].conn.peeraddr = NULL;
1172 s->si[1].conn.peerlen = 0;
Willy Tarreaufb7508a2012-05-21 16:47:54 +02001173 s->si[1].conn.t.sock.fd = -1; /* just to help with debugging */
Willy Tarreau505e34a2012-07-06 10:17:53 +02001174 s->si[1].conn.flags = CO_FL_NONE;
Emeric Brun2b920a12010-09-23 18:30:22 +02001175 s->si[1].owner = t;
1176 s->si[1].state = s->si[1].prev_state = SI_ST_ASS;
1177 s->si[1].conn_retries = p->conn_retries;
1178 s->si[1].err_type = SI_ET_NONE;
1179 s->si[1].err_loc = NULL;
Willy Tarreau26d8c592012-05-07 18:12:14 +02001180 s->si[1].release = NULL;
Willy Tarreau63e7fe32012-05-08 15:20:43 +02001181 s->si[1].send_proxy_ofs = 0;
Willy Tarreau9e000c62011-03-10 14:03:36 +01001182 set_target_proxy(&s->si[1].target, s->be);
Willy Tarreauc5788912012-08-24 18:12:41 +02001183 si_prepare_conn(&s->si[1], peer->proto, peer->data);
Emeric Brun2b920a12010-09-23 18:30:22 +02001184 s->si[1].exp = TICK_ETERNITY;
1185 s->si[1].flags = SI_FL_NONE;
1186 if (s->be->options2 & PR_O2_INDEPSTR)
1187 s->si[1].flags |= SI_FL_INDEP_STR;
1188
Willy Tarreau9bd0d742011-07-20 00:17:39 +02001189 session_init_srv_conn(s);
Simon Horman8b7b05a2011-08-13 08:03:48 +09001190 set_target_proxy(&s->target, s->be);
Emeric Brun2b920a12010-09-23 18:30:22 +02001191 s->pend_pos = NULL;
1192
1193 /* init store persistence */
1194 s->store_count = 0;
1195 s->stkctr1_entry = NULL;
1196 s->stkctr2_entry = NULL;
1197
1198 /* FIXME: the logs are horribly complicated now, because they are
1199 * defined in <p>, <p>, and later <be> and <be>.
1200 */
1201
1202 s->logs.logwait = 0;
1203 s->do_log = NULL;
1204
1205 /* default error reporting function, may be changed by analysers */
1206 s->srv_error = default_srv_error;
1207
Emeric Brun2b920a12010-09-23 18:30:22 +02001208 s->uniq_id = 0;
Willy Tarreaubd833142012-05-08 15:51:44 +02001209 s->unique_id = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001210
1211 txn = &s->txn;
1212 /* Those variables will be checked and freed if non-NULL in
1213 * session.c:session_free(). It is important that they are
1214 * properly initialized.
1215 */
1216 txn->sessid = NULL;
1217 txn->srv_cookie = NULL;
1218 txn->cli_cookie = NULL;
1219 txn->uri = NULL;
1220 txn->req.cap = NULL;
1221 txn->rsp.cap = NULL;
1222 txn->hdr_idx.v = NULL;
1223 txn->hdr_idx.size = txn->hdr_idx.used = 0;
1224
1225 if ((s->req = pool_alloc2(pool2_buffer)) == NULL)
1226 goto out_fail_req; /* no memory */
1227
Willy Tarreau572bf902012-07-02 17:01:20 +02001228 s->req->buf.size = global.tune.bufsize;
Emeric Brun2b920a12010-09-23 18:30:22 +02001229 buffer_init(s->req);
1230 s->req->prod = &s->si[0];
1231 s->req->cons = &s->si[1];
1232 s->si[0].ib = s->si[1].ob = s->req;
1233
1234 s->req->flags |= BF_READ_ATTACHED; /* the producer is already connected */
1235
1236 /* activate default analysers enabled for this listener */
1237 s->req->analysers = l->analysers;
1238
1239 /* note: this should not happen anymore since there's always at least the switching rules */
1240 if (!s->req->analysers) {
1241 buffer_auto_connect(s->req);/* don't wait to establish connection */
1242 buffer_auto_close(s->req);/* let the producer forward close requests */
1243 }
1244
1245 s->req->rto = s->fe->timeout.client;
1246 s->req->wto = s->be->timeout.server;
1247
1248 if ((s->rep = pool_alloc2(pool2_buffer)) == NULL)
1249 goto out_fail_rep; /* no memory */
1250
Willy Tarreau572bf902012-07-02 17:01:20 +02001251 s->rep->buf.size = global.tune.bufsize;
Emeric Brun2b920a12010-09-23 18:30:22 +02001252 buffer_init(s->rep);
1253 s->rep->prod = &s->si[1];
1254 s->rep->cons = &s->si[0];
1255 s->si[0].ob = s->si[1].ib = s->rep;
1256
1257 s->rep->rto = s->be->timeout.server;
1258 s->rep->wto = s->fe->timeout.client;
1259
1260 s->req->rex = TICK_ETERNITY;
1261 s->req->wex = TICK_ETERNITY;
1262 s->req->analyse_exp = TICK_ETERNITY;
1263 s->rep->rex = TICK_ETERNITY;
1264 s->rep->wex = TICK_ETERNITY;
1265 s->rep->analyse_exp = TICK_ETERNITY;
1266 t->expire = TICK_ETERNITY;
1267
1268 s->rep->flags |= BF_READ_DONTWAIT;
1269 /* it is important not to call the wakeup function directly but to
1270 * pass through task_wakeup(), because this one knows how to apply
1271 * priorities to tasks.
1272 */
1273 task_wakeup(t, TASK_WOKEN_INIT);
1274
1275 l->nbconn++; /* warning! right now, it's up to the handler to decrease this */
1276 p->feconn++;/* beconn will be increased later */
1277 jobs++;
Willy Tarreau3c63fd82011-09-07 18:00:47 +02001278 if (!(s->listener->options & LI_O_UNLIMITED))
1279 actconn++;
Emeric Brun2b920a12010-09-23 18:30:22 +02001280 totalconn++;
1281
1282 return s;
1283
1284 /* Error unrolling */
1285 out_fail_rep:
1286 pool_free2(pool2_buffer, s->req);
1287 out_fail_req:
1288 task_free(t);
1289 out_free_session:
1290 LIST_DEL(&s->list);
1291 pool_free2(pool2_session, s);
1292 out_close:
1293 return s;
1294}
1295
1296/*
1297 * Task processing function to manage re-connect and peer session
1298 * tasks wakeup on local update.
1299 */
Simon Horman96553772011-06-08 09:18:51 +09001300static struct task *process_peer_sync(struct task * task)
Emeric Brun2b920a12010-09-23 18:30:22 +02001301{
1302 struct shared_table *st = (struct shared_table *)task->context;
1303 struct peer_session *ps;
1304
1305 task->expire = TICK_ETERNITY;
1306
1307 if (!stopping) {
1308 /* Normal case (not soft stop)*/
1309 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL) &&
1310 (!nb_oldpids || tick_is_expired(st->resync_timeout, now_ms)) &&
1311 !(st->flags & SHTABLE_F_RESYNC_ASSIGN)) {
1312 /* Resync from local peer needed
1313 no peer was assigned for the lesson
1314 and no old local peer found
1315 or resync timeout expire */
1316
1317 /* flag no more resync from local, to try resync from remotes */
1318 st->flags |= SHTABLE_F_RESYNC_LOCAL;
1319
1320 /* reschedule a resync */
1321 st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
1322 }
1323
1324 /* For each session */
1325 for (ps = st->sessions; ps; ps = ps->next) {
1326 /* For each remote peers */
1327 if (!ps->peer->local) {
1328 if (!ps->session) {
1329 /* no active session */
1330 if (ps->statuscode == 0 ||
1331 ps->statuscode == PEER_SESSION_SUCCESSCODE ||
1332 ((ps->statuscode == PEER_SESSION_CONNECTCODE ||
1333 ps->statuscode == PEER_SESSION_CONNECTEDCODE) &&
1334 tick_is_expired(ps->reconnect, now_ms))) {
1335 /* connection never tried
1336 * or previous session established with success
1337 * or previous session failed during connection
1338 * and reconnection timer is expired */
1339
1340 /* retry a connect */
1341 ps->session = peer_session_create(ps->peer, ps);
1342 }
1343 else if (ps->statuscode == PEER_SESSION_CONNECTCODE ||
1344 ps->statuscode == PEER_SESSION_CONNECTEDCODE) {
1345 /* If previous session failed during connection
1346 * but reconnection timer is not expired */
1347
1348 /* reschedule task for reconnect */
1349 task->expire = tick_first(task->expire, ps->reconnect);
1350 }
1351 /* else do nothing */
1352 } /* !ps->session */
1353 else if (ps->statuscode == PEER_SESSION_SUCCESSCODE) {
1354 /* current session is active and established */
1355 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
1356 !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
1357 !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
1358 /* Resync from a remote is needed
1359 * and no peer was assigned for lesson
1360 * and current peer may be up2date */
1361
1362 /* assign peer for the lesson */
1363 ps->flags |= PEER_F_LEARN_ASSIGN;
1364 st->flags |= SHTABLE_F_RESYNC_ASSIGN;
1365
1366 /* awake peer session task to handle a request of resync */
1367 task_wakeup(ps->session->task, TASK_WOKEN_MSG);
1368 }
1369 else if ((int)(ps->pushed - ps->table->table->localupdate) < 0) {
1370 /* awake peer session task to push local updates */
1371 task_wakeup(ps->session->task, TASK_WOKEN_MSG);
1372 }
1373 /* else do nothing */
1374 } /* SUCCESSCODE */
1375 } /* !ps->peer->local */
1376 } /* for */
1377
1378 /* Resync from remotes expired: consider resync is finished */
1379 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
1380 !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
1381 tick_is_expired(st->resync_timeout, now_ms)) {
1382 /* Resync from remote peer needed
1383 * no peer was assigned for the lesson
1384 * and resync timeout expire */
1385
1386 /* flag no more resync from remote, consider resync is finished */
1387 st->flags |= SHTABLE_F_RESYNC_REMOTE;
1388 }
1389
1390 if ((st->flags & SHTABLE_RESYNC_STATEMASK) != SHTABLE_RESYNC_FINISHED) {
1391 /* Resync not finished*/
1392 /* reschedule task to resync timeout, to ended resync if needed */
1393 task->expire = tick_first(task->expire, st->resync_timeout);
1394 }
1395 } /* !stopping */
1396 else {
1397 /* soft stop case */
1398 if (task->state & TASK_WOKEN_SIGNAL) {
1399 /* We've just recieved the signal */
1400 if (!(st->flags & SHTABLE_F_DONOTSTOP)) {
1401 /* add DO NOT STOP flag if not present */
1402 jobs++;
1403 st->flags |= SHTABLE_F_DONOTSTOP;
1404 }
1405
1406 /* disconnect all connected peers */
1407 for (ps = st->sessions; ps; ps = ps->next) {
1408 if (ps->session) {
1409 peer_session_forceshutdown(ps->session);
1410 ps->session = NULL;
1411 }
1412 }
1413 }
1414 ps = st->local_session;
1415
1416 if (ps->flags & PEER_F_TEACH_COMPLETE) {
1417 if (st->flags & SHTABLE_F_DONOTSTOP) {
1418 /* resync of new process was complete, current process can die now */
1419 jobs--;
1420 st->flags &= ~SHTABLE_F_DONOTSTOP;
1421 }
1422 }
1423 else if (!ps->session) {
1424 /* If session is not active */
1425 if (ps->statuscode == 0 ||
1426 ps->statuscode == PEER_SESSION_SUCCESSCODE ||
1427 ps->statuscode == PEER_SESSION_CONNECTEDCODE ||
1428 ps->statuscode == PEER_SESSION_TRYAGAIN) {
1429 /* connection never tried
1430 * or previous session was successfully established
1431 * or previous session tcp connect success but init state incomplete
1432 * or during previous connect, peer replies a try again statuscode */
1433
1434 /* connect to the peer */
1435 ps->session = peer_session_create(ps->peer, ps);
1436 }
1437 else {
1438 /* Other error cases */
1439 if (st->flags & SHTABLE_F_DONOTSTOP) {
1440 /* unable to resync new process, current process can die now */
1441 jobs--;
1442 st->flags &= ~SHTABLE_F_DONOTSTOP;
1443 }
1444 }
1445 }
1446 else if (ps->statuscode == PEER_SESSION_SUCCESSCODE &&
1447 (int)(ps->pushed - ps->table->table->localupdate) < 0) {
1448 /* current session active and established
1449 awake session to push remaining local updates */
1450 task_wakeup(ps->session->task, TASK_WOKEN_MSG);
1451 }
1452 } /* stopping */
1453 /* Wakeup for re-connect */
1454 return task;
1455}
1456
1457/*
1458 * Function used to register a table for sync on a group of peers
1459 *
1460 */
1461void peers_register_table(struct peers *peers, struct stktable *table)
1462{
1463 struct shared_table *st;
1464 struct peer * curpeer;
1465 struct peer_session *ps;
1466
1467 st = (struct shared_table *)calloc(1,sizeof(struct shared_table));
1468 st->table = table;
1469 st->next = peers->tables;
1470 st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
1471 peers->tables = st;
1472
1473 for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) {
1474 ps = (struct peer_session *)calloc(1,sizeof(struct peer_session));
1475 ps->table = st;
1476 ps->peer = curpeer;
1477 if (curpeer->local)
1478 st->local_session = ps;
1479 ps->next = st->sessions;
1480 ps->reconnect = now_ms;
1481 st->sessions = ps;
1482 peers->peers_fe->maxconn += 3;
1483 }
1484
1485 peers->peers_fe->listen->maxconn = peers->peers_fe->maxconn;
1486 st->sync_task = task_new();
1487 st->sync_task->process = process_peer_sync;
1488 st->sync_task->expire = TICK_ETERNITY;
1489 st->sync_task->context = (void *)st;
1490 table->sync_task =st->sync_task;
1491 signal_register_task(0, table->sync_task, 0);
1492 task_wakeup(st->sync_task, TASK_WOKEN_INIT);
1493}
1494