blob: 739c09b05df4ebbdb591d9f68293894b54256c49 [file] [log] [blame]
Emeric Brun2b920a12010-09-23 18:30:22 +02001/*
Willy Tarreaubd55e312010-11-11 10:55:09 +01002 * Stick table synchro management.
Emeric Brun2b920a12010-09-23 18:30:22 +02003 *
4 * Copyright 2010 EXCELIANCE, Emeric Brun <ebrun@exceliance.fr>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12
13#include <errno.h>
14#include <fcntl.h>
15#include <stdio.h>
16#include <stdlib.h>
17#include <string.h>
18
19#include <sys/socket.h>
20#include <sys/stat.h>
21#include <sys/types.h>
22
23#include <common/compat.h>
24#include <common/config.h>
25#include <common/time.h>
26
27#include <types/global.h>
28#include <types/peers.h>
29
30#include <proto/acl.h>
31#include <proto/buffers.h>
32#include <proto/fd.h>
33#include <proto/log.h>
34#include <proto/hdr_idx.h>
35#include <proto/pattern.h>
36#include <proto/protocols.h>
37#include <proto/proto_tcp.h>
38#include <proto/proto_http.h>
39#include <proto/proxy.h>
40#include <proto/session.h>
41#include <proto/stream_interface.h>
42#include <proto/stream_sock.h>
43#include <proto/task.h>
44#include <proto/stick_table.h>
45#include <proto/signal.h>
46
47
48/*******************************/
49/* Current peer learning state */
50/*******************************/
51
52/******************************/
53/* Current table resync state */
54/******************************/
/* Bit flags kept in the shared table's <flags> field (ps->table->flags in this
 * file). The two low bits record which learning phases are over; the derived
 * SHTABLE_RESYNC_* values below name the combinations of those two bits.
 */
55#define SHTABLE_F_RESYNC_LOCAL 0x00000001 /* Learning from local is finished or no longer needed */
56#define SHTABLE_F_RESYNC_REMOTE 0x00000002 /* Learning from remote is finished or no longer needed */
57#define SHTABLE_F_RESYNC_ASSIGN 0x00000004 /* A peer has been assigned to teach us its lesson */
58#define SHTABLE_F_RESYNC_PROCESS 0x00000008 /* The assigned peer has been asked for a resync */
59#define SHTABLE_F_DONOTSTOP 0x00010000 /* Main table sync task blocks the process during soft stop
60 to push data to the new process */
61
/* Resync progress is read as (flags & SHTABLE_RESYNC_STATEMASK):
 *   FROMLOCAL  - neither bit set: learning from local is still wanted
 *   FROMREMOTE - only the LOCAL bit set: local phase over, remote still wanted
 *   FINISHED   - both bits set: no resync needed anymore
 */
62#define SHTABLE_RESYNC_STATEMASK (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)
63#define SHTABLE_RESYNC_FROMLOCAL 0x00000000
64#define SHTABLE_RESYNC_FROMREMOTE SHTABLE_F_RESYNC_LOCAL
65#define SHTABLE_RESYNC_FINISHED (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE)
66
67/******************************/
68/* Remote peer teaching state */
69/******************************/
/* Bit flags kept in the peer session's <flags> field (ps->flags in this file).
 * TEACH_* flags track what we are pushing to the peer, LEARN_* flags track
 * what we expect to receive from it.
 */
70#define PEER_F_TEACH_PROCESS 0x00000001 /* Teach a lesson to current peer */
71#define PEER_F_TEACH_STAGE1 0x00000002 /* Teach stage 1 complete */
72#define PEER_F_TEACH_STAGE2 0x00000004 /* Teach stage 2 complete */
73#define PEER_F_TEACH_FINISHED 0x00000008 /* Teaching concluded (waiting for the confirm) */
74#define PEER_F_TEACH_COMPLETE 0x00000010 /* Everything we know has already been taught to current peer, used only for a local peer */
75#define PEER_F_LEARN_ASSIGN 0x00000100 /* Current peer was assigned for a lesson */
76#define PEER_F_LEARN_NOTUP2DATE 0x00000200 /* Learning from peer finished but peer is not up to date */
77
/* Masks meant to be applied with "flags &= MASK" to clear a whole group of
 * flags at once. Both expand to an unparenthesized "~(...)" expression.
 */
78#define PEER_TEACH_RESET ~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_STAGE1|PEER_F_TEACH_STAGE2|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */
79#define PEER_LEARN_RESET ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE)
80
81
82/**********************************/
83/* Peer Session IO handler states */
84/**********************************/
/* Values taken by si->applet.st0 in the peer I/O handler. The 1000 series is
 * walked by a session created on accept, the 2000 series by a session created
 * on connect; both end up looping on PEER_SESSION_WAITMSG.
 */
85
86#define PEER_SESSION_ACCEPT 1000 /* Initial state for a session created by an accept */
87#define PEER_SESSION_GETVERSION 1001 /* Validate the supported protocol version */
88#define PEER_SESSION_GETHOST 1002 /* Validate that the host ID corresponds to the local host ID */
89#define PEER_SESSION_GETPEER 1003 /* Validate that the peer ID corresponds to a known remote peer ID */
90#define PEER_SESSION_GETTABLE 1004 /* Search the registered tables for a table with the same id and
91 validate type and size */
92#define PEER_SESSION_SENDSUCCESS 1005 /* Send return code 200 (success) and wait for messages */
93/* next state is WAITMSG */
94
95#define PEER_SESSION_CONNECT 2000 /* Initial state for a session created on a connect,
96 push presentation into buffer */
97#define PEER_SESSION_GETSTATUS 2001 /* Wait for the welcome message */
98#define PEER_SESSION_WAITMSG 2002 /* Wait for data messages */
99/* loop on WAITMSG */
100
/* Terminal states. When EXIT is selected the handler also stores one of the
 * PEER_SESSION_ERR* / TRYAGAIN status codes in si->applet.st1.
 */
101#define PEER_SESSION_EXIT 10000 /* Exit with status code */
102#define PEER_SESSION_END 10001 /* Killed session */
103/* session ended */
104
105
106/**********************************/
107/* Peer Session status code */
108/**********************************/
/* Numeric status codes recorded in ps->statuscode and/or si->applet.st1.
 * The success code is also what the accepting side writes back as a decimal
 * line ("%d\n") and what the connecting side parses with atoi().
 */
109
110#define PEER_SESSION_CONNECTCODE 100 /* connect in progress */
111#define PEER_SESSION_CONNECTEDCODE 110 /* tcp connect success */
112
113#define PEER_SESSION_SUCCESSCODE 200 /* accept or connect successful */
114
115#define PEER_SESSION_TRYAGAIN 300 /* try again later */
116
117#define PEER_SESSION_ERRPROTO 501 /* protocol error */
118#define PEER_SESSION_ERRVERSION 502 /* unknown protocol version */
119#define PEER_SESSION_ERRHOST 503 /* bad host name */
120#define PEER_SESSION_ERRPEER 504 /* unknown peer */
121#define PEER_SESSION_ERRTYPE 505 /* table key type mismatch */
122#define PEER_SESSION_ERRSIZE 506 /* table key size mismatch */
123#define PEER_SESSION_ERRTABLE 507 /* unknown table */
124
/* Protocol identifier; the hello line exchanged between peers is this name
 * followed by " 1.0".
 */
125#define PEER_SESSION_PROTO_NAME "HAProxyS"
126
/* Global pointer to the configured peers; starts out NULL.
 * NOTE(review): presumably the head of the list of "peers" sections filled in
 * by the configuration parser - confirm against the parser.
 */
127struct peers *peers = NULL;
/* Forward declaration: the peer I/O handler calls this to shut down a
 * previously active session when a new session for the same peer replaces it.
 */
128void peer_session_forceshutdown(struct session * session);
129
130
131/*
132 * This prepare the data update message of the stick session <ts>, <ps> is the the peer session
133 * where the data going to be pushed, <msg> is a buffer of <size> to recieve data message content
134 */
135int peer_prepare_datamsg(struct stksess *ts, struct peer_session *ps, char *msg, size_t size)
136{
137 uint32_t netinteger;
138 int len;
139 /* construct message */
140 if (ps->lastpush && ts->upd.key > ps->lastpush && (ts->upd.key - ps->lastpush) <= 127) {
141 msg[0] = 0x80 + ts->upd.key - ps->lastpush;
142 len = sizeof(char);
143 }
144 else {
145 msg[0] = 'D';
146 netinteger = htonl(ts->upd.key);
147 memcpy(&msg[sizeof(char)], &netinteger, sizeof(netinteger));
148 len = sizeof(char) + sizeof(netinteger);
149 }
150
151 if (ps->table->table->type == STKTABLE_TYPE_STRING) {
152 int stlen = strlen((char *)ts->key.key);
153
154 netinteger = htonl(strlen((char *)ts->key.key));
155 memcpy(&msg[len], &netinteger, sizeof(netinteger));
156 memcpy(&msg[len+sizeof(netinteger)], ts->key.key, stlen);
157 len += sizeof(netinteger) + stlen;
158
159 }
160 else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
161 netinteger = htonl(*((uint32_t *)ts->key.key));
162 memcpy(&msg[len], &netinteger, sizeof(netinteger));
163 len += sizeof(netinteger);
164 }
165 else {
166 memcpy(&msg[len], ts->key.key, ps->table->table->key_size);
167 len += ps->table->table->key_size;
168 }
169
170 if (stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
171 netinteger = htonl(stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id));
172 else
173 netinteger = 0;
174
175 memcpy(&msg[len], &netinteger , sizeof(netinteger));
176 len += sizeof(netinteger);
177
178 return len;
179}
180
181
182/*
183 * Callback to release a session with a peer
184 */
185void peer_session_release(struct stream_interface *si)
186{
187 struct task *t= (struct task *)si->owner;
188 struct session *s = (struct session *)t->context;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100189 struct peer_session *ps = (struct peer_session *)si->applet.private;
Emeric Brun2b920a12010-09-23 18:30:22 +0200190
Willy Tarreaubc4af052011-02-13 13:25:14 +0100191 /* si->applet.private is not a peer session */
192 if (si->applet.st0 < PEER_SESSION_SENDSUCCESS)
Emeric Brun2b920a12010-09-23 18:30:22 +0200193 return;
194
195 /* peer session identified */
196 if (ps) {
197 if (ps->session == s) {
198 ps->session = NULL;
199 if (ps->flags & PEER_F_LEARN_ASSIGN) {
200 /* unassign current peer for learning */
201 ps->flags &= ~(PEER_F_LEARN_ASSIGN);
202 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
203
204 /* reschedule a resync */
205 ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
206 }
207 /* reset teaching and learning flags to 0 */
208 ps->flags &= PEER_TEACH_RESET;
209 ps->flags &= PEER_LEARN_RESET;
210 }
211 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
212 }
213}
214
215
216/*
217 * IO Handler to handle message exchange with a peer
218 */
Willy Tarreaub24281b2011-02-13 13:16:36 +0100219static void peer_io_handler(struct stream_interface *si)
Emeric Brun2b920a12010-09-23 18:30:22 +0200220{
221 struct task *t= (struct task *)si->owner;
222 struct session *s = (struct session *)t->context;
223 struct peers *curpeers = (struct peers *)s->fe->parent;
224 int reql = 0;
225 int repl = 0;
226
227 while (1) {
228switchstate:
Willy Tarreaubc4af052011-02-13 13:25:14 +0100229 switch(si->applet.st0) {
Emeric Brun2b920a12010-09-23 18:30:22 +0200230 case PEER_SESSION_ACCEPT:
Willy Tarreaubc4af052011-02-13 13:25:14 +0100231 si->applet.private = NULL;
232 si->applet.st0 = PEER_SESSION_GETVERSION;
Emeric Brun2b920a12010-09-23 18:30:22 +0200233 /* fall through */
234 case PEER_SESSION_GETVERSION:
235 reql = buffer_get_line(si->ob, trash, sizeof(trash));
236 if (reql <= 0) { /* closed or EOL not found */
237 if (reql == 0)
238 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100239 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200240 goto switchstate;
241 }
242 if (trash[reql-1] != '\n') {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100243 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200244 goto switchstate;
245 }
246 else if (reql > 1 && (trash[reql-2] == '\r'))
247 trash[reql-2] = 0;
248 else
249 trash[reql-1] = 0;
250
251 buffer_skip(si->ob, reql);
252
253 /* test version */
254 if (strcmp(PEER_SESSION_PROTO_NAME " 1.0", trash) != 0) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100255 si->applet.st0 = PEER_SESSION_EXIT;
256 si->applet.st1 = PEER_SESSION_ERRVERSION;
Emeric Brun2b920a12010-09-23 18:30:22 +0200257 /* test protocol */
258 if (strncmp(PEER_SESSION_PROTO_NAME " ", trash, strlen(PEER_SESSION_PROTO_NAME)+1) != 0)
Willy Tarreaubc4af052011-02-13 13:25:14 +0100259 si->applet.st1 = PEER_SESSION_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200260 goto switchstate;
261 }
262
Willy Tarreaubc4af052011-02-13 13:25:14 +0100263 si->applet.st0 = PEER_SESSION_GETHOST;
Emeric Brun2b920a12010-09-23 18:30:22 +0200264 /* fall through */
265 case PEER_SESSION_GETHOST:
266 reql = buffer_get_line(si->ob, trash, sizeof(trash));
267 if (reql <= 0) { /* closed or EOL not found */
268 if (reql == 0)
269 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100270 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200271 goto switchstate;
272 }
273 if (trash[reql-1] != '\n') {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100274 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200275 goto switchstate;
276 }
277 else if (reql > 1 && (trash[reql-2] == '\r'))
278 trash[reql-2] = 0;
279 else
280 trash[reql-1] = 0;
281
282 buffer_skip(si->ob, reql);
283
284 /* test hostname match */
285 if (strcmp(localpeer, trash) != 0) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100286 si->applet.st0 = PEER_SESSION_EXIT;
287 si->applet.st1 = PEER_SESSION_ERRHOST;
Emeric Brun2b920a12010-09-23 18:30:22 +0200288 goto switchstate;
289 }
290
Willy Tarreaubc4af052011-02-13 13:25:14 +0100291 si->applet.st0 = PEER_SESSION_GETPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200292 /* fall through */
293 case PEER_SESSION_GETPEER: {
294 struct peer *curpeer;
295 char *p;
296 reql = buffer_get_line(si->ob, trash, sizeof(trash));
297 if (reql <= 0) { /* closed or EOL not found */
298 if (reql == 0)
299 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100300 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200301 goto switchstate;
302 }
303 if (trash[reql-1] != '\n') {
304 /* Incomplete line, we quit */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100305 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200306 goto switchstate;
307 }
308 else if (reql > 1 && (trash[reql-2] == '\r'))
309 trash[reql-2] = 0;
310 else
311 trash[reql-1] = 0;
312
313 buffer_skip(si->ob, reql);
314
315 /* parse line "<peer name> <pid>" */
316 p = strchr(trash, ' ');
317 if (!p) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100318 si->applet.st0 = PEER_SESSION_EXIT;
319 si->applet.st1 = PEER_SESSION_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200320 goto switchstate;
321 }
322 *p = 0;
323
324 /* lookup known peer */
325 for (curpeer = curpeers->remote; curpeer; curpeer = curpeer->next) {
326 if (strcmp(curpeer->id, trash) == 0)
327 break;
328 }
329
330 /* if unknown peer */
331 if (!curpeer) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100332 si->applet.st0 = PEER_SESSION_EXIT;
333 si->applet.st1 = PEER_SESSION_ERRPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200334 goto switchstate;
335 }
336
Willy Tarreaubc4af052011-02-13 13:25:14 +0100337 si->applet.private = curpeer;
338 si->applet.st0 = PEER_SESSION_GETTABLE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200339 /* fall through */
340 }
341 case PEER_SESSION_GETTABLE: {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100342 struct peer *curpeer = (struct peer *)si->applet.private;
Emeric Brun2b920a12010-09-23 18:30:22 +0200343 struct shared_table *st;
344 struct peer_session *ps = NULL;
345 unsigned long key_type;
346 size_t key_size;
347 char *p;
348
349 reql = buffer_get_line(si->ob, trash, sizeof(trash));
350 if (reql <= 0) { /* closed or EOL not found */
351 if (reql == 0)
352 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100353 si->applet.private = NULL;
354 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200355 goto switchstate;
356 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100357 /* Re init si->applet.private to null, to handle correctly a release case */
358 si->applet.private = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +0200359
360 if (trash[reql-1] != '\n') {
361 /* Incomplete line, we quit */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100362 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200363 goto switchstate;
364 }
365 else if (reql > 1 && (trash[reql-2] == '\r'))
366 trash[reql-2] = 0;
367 else
368 trash[reql-1] = 0;
369
370 buffer_skip(si->ob, reql);
371
372 /* Parse line "<table name> <type> <size>" */
373 p = strchr(trash, ' ');
374 if (!p) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100375 si->applet.st0 = PEER_SESSION_EXIT;
376 si->applet.st1 = PEER_SESSION_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200377 goto switchstate;
378 }
379 *p = 0;
380 key_type = (unsigned long)atol(p+1);
381
382 p = strchr(p+1, ' ');
383 if (!p) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100384 si->applet.private = NULL;
385 si->applet.st0 = PEER_SESSION_EXIT;
386 si->applet.st1 = PEER_SESSION_ERRPROTO;
Emeric Brun2b920a12010-09-23 18:30:22 +0200387 goto switchstate;
388 }
389
390 key_size = (size_t)atoi(p);
391 for (st = curpeers->tables; st; st = st->next) {
392 /* If table name matches */
393 if (strcmp(st->table->id, trash) == 0) {
394 /* If key size mismatches */
395 if (key_size != st->table->key_size) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100396 si->applet.st0 = PEER_SESSION_EXIT;
397 si->applet.st1 = PEER_SESSION_ERRSIZE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200398 goto switchstate;
399 }
400
401 /* If key type mismatches */
402 if (key_type != st->table->type) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100403 si->applet.st0 = PEER_SESSION_EXIT;
404 si->applet.st1 = PEER_SESSION_ERRTYPE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200405 goto switchstate;
406 }
407
408 /* lookup peer session of current peer */
409 for (ps = st->sessions; ps; ps = ps->next) {
410 if (ps->peer == curpeer) {
411 /* If session already active, replaced by new one */
412 if (ps->session && ps->session != s) {
413 if (ps->peer->local) {
414 /* Local connection, reply a retry */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100415 si->applet.st0 = PEER_SESSION_EXIT;
416 si->applet.st1 = PEER_SESSION_TRYAGAIN;
Emeric Brun2b920a12010-09-23 18:30:22 +0200417 goto switchstate;
418 }
419 peer_session_forceshutdown(ps->session);
420 }
421 ps->session = s;
422 break;
423 }
424 }
425 break;
426 }
427 }
428
429 /* If table not found */
430 if (!st){
Willy Tarreaubc4af052011-02-13 13:25:14 +0100431 si->applet.st0 = PEER_SESSION_EXIT;
432 si->applet.st1 = PEER_SESSION_ERRTABLE;
Emeric Brun2b920a12010-09-23 18:30:22 +0200433 goto switchstate;
434 }
435
436 /* If no peer session for current peer */
437 if (!ps) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100438 si->applet.st0 = PEER_SESSION_EXIT;
439 si->applet.st1 = PEER_SESSION_ERRPEER;
Emeric Brun2b920a12010-09-23 18:30:22 +0200440 goto switchstate;
441 }
442
Willy Tarreaubc4af052011-02-13 13:25:14 +0100443 si->applet.private = ps;
444 si->applet.st0 = PEER_SESSION_SENDSUCCESS;
Emeric Brun2b920a12010-09-23 18:30:22 +0200445 /* fall through */
446 }
447 case PEER_SESSION_SENDSUCCESS:{
Willy Tarreaubc4af052011-02-13 13:25:14 +0100448 struct peer_session *ps = (struct peer_session *)si->applet.private;
Emeric Brun2b920a12010-09-23 18:30:22 +0200449
450 repl = snprintf(trash, sizeof(trash), "%d\n", PEER_SESSION_SUCCESSCODE);
451 repl = buffer_put_block(si->ib, trash, repl);
452 if (repl <= 0) {
453 if (repl == -1)
454 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100455 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200456 goto switchstate;
457 }
458
459 /* Register status code */
460 ps->statuscode = PEER_SESSION_SUCCESSCODE;
461
462 /* Awake main task */
463 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
464
465 /* Init cursors */
466 ps->teaching_origin =ps->lastpush = ps->lastack = ps->pushack = 0;
467 ps->pushed = ps->update;
468
469 /* Init confirm counter */
470 ps->confirm = 0;
471
472 /* reset teaching and learning flags to 0 */
473 ps->flags &= PEER_TEACH_RESET;
474 ps->flags &= PEER_LEARN_RESET;
475
476 /* if current peer is local */
477 if (ps->peer->local) {
478 /* if table need resyncfrom local and no process assined */
479 if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL &&
480 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
481 /* assign local peer for a lesson, consider lesson already requested */
482 ps->flags |= PEER_F_LEARN_ASSIGN;
483 ps->table->flags |= (SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
484 }
485
486 }
487 else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
488 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
489 /* assign peer for a lesson */
490 ps->flags |= PEER_F_LEARN_ASSIGN;
491 ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
492 }
493 /* switch to waiting message state */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100494 si->applet.st0 = PEER_SESSION_WAITMSG;
Emeric Brun2b920a12010-09-23 18:30:22 +0200495 goto switchstate;
496 }
497 case PEER_SESSION_CONNECT: {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100498 struct peer_session *ps = (struct peer_session *)si->applet.private;
Emeric Brun2b920a12010-09-23 18:30:22 +0200499
500 /* Send headers */
501 repl = snprintf(trash, sizeof(trash),
502 PEER_SESSION_PROTO_NAME " 1.0\n%s\n%s %d\n%s %lu %d\n",
503 ps->peer->id,
504 localpeer,
505 getpid(),
506 ps->table->table->id,
507 ps->table->table->type,
Willy Tarreaubd55e312010-11-11 10:55:09 +0100508 (int)ps->table->table->key_size);
Emeric Brun2b920a12010-09-23 18:30:22 +0200509
510 if (repl >= sizeof(trash)) {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100511 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200512 goto switchstate;
513 }
514
515 repl = buffer_put_block(si->ib, trash, repl);
516 if (repl <= 0) {
517 if (repl == -1)
518 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100519 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200520 goto switchstate;
521 }
522
523 /* switch to the waiting statuscode state */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100524 si->applet.st0 = PEER_SESSION_GETSTATUS;
Emeric Brun2b920a12010-09-23 18:30:22 +0200525 /* fall through */
526 }
527 case PEER_SESSION_GETSTATUS: {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100528 struct peer_session *ps = (struct peer_session *)si->applet.private;
Emeric Brun2b920a12010-09-23 18:30:22 +0200529
530 if (si->ib->flags & BF_WRITE_PARTIAL)
531 ps->statuscode = PEER_SESSION_CONNECTEDCODE;
532
533 reql = buffer_get_line(si->ob, trash, sizeof(trash));
534 if (reql <= 0) { /* closed or EOL not found */
535 if (reql == 0)
536 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100537 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200538 goto switchstate;
539 }
540 if (trash[reql-1] != '\n') {
541 /* Incomplete line, we quit */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100542 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200543 goto switchstate;
544 }
545 else if (reql > 1 && (trash[reql-2] == '\r'))
546 trash[reql-2] = 0;
547 else
548 trash[reql-1] = 0;
549
550 buffer_skip(si->ob, reql);
551
552 /* Register status code */
553 ps->statuscode = atoi(trash);
554
555 /* Awake main task */
556 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
557
558 /* If status code is success */
559 if (ps->statuscode == PEER_SESSION_SUCCESSCODE) {
560 /* Init cursors */
561 ps->teaching_origin = ps->lastpush = ps->lastack = ps->pushack = 0;
562 ps->pushed = ps->update;
563
564 /* Init confirm counter */
565 ps->confirm = 0;
566
567 /* reset teaching and learning flags to 0 */
568 ps->flags &= PEER_TEACH_RESET;
569 ps->flags &= PEER_LEARN_RESET;
570
571 /* If current peer is local */
572 if (ps->peer->local) {
573 /* Init cursors to push a resync */
574 ps->teaching_origin = ps->pushed = ps->table->table->update;
575 /* flag to start to teach lesson */
576 ps->flags |= PEER_F_TEACH_PROCESS;
577
578 }
579 else if ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE &&
580 !(ps->table->flags & SHTABLE_F_RESYNC_ASSIGN)) {
581 /* If peer is remote and resync from remote is needed,
582 and no peer currently assigned */
583
584 /* assign peer for a lesson */
585 ps->flags |= PEER_F_LEARN_ASSIGN;
586 ps->table->flags |= SHTABLE_F_RESYNC_ASSIGN;
587 }
588
589 }
590 else {
591 /* Status code is not success, abort */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100592 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200593 goto switchstate;
594 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100595 si->applet.st0 = PEER_SESSION_WAITMSG;
Emeric Brun2b920a12010-09-23 18:30:22 +0200596 /* fall through */
597 }
598 case PEER_SESSION_WAITMSG: {
Willy Tarreaubc4af052011-02-13 13:25:14 +0100599 struct peer_session *ps = (struct peer_session *)si->applet.private;
Emeric Brun2b920a12010-09-23 18:30:22 +0200600 char c;
601 int totl = 0;
602
603 reql = buffer_get_block(si->ob, (char *)&c, sizeof(c), totl);
604 if (reql <= 0) { /* closed or EOL not found */
605 if (reql == 0) {
606 /* nothing to read */
607 goto incomplete;
608 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100609 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200610 goto switchstate;
611 }
612 totl += reql;
613
614 if ((c & 0x80) || (c == 'D')) {
615 /* Here we have data message */
616 unsigned int pushack;
617 struct stksess *ts;
618 struct stksess *newts;
619 struct stktable_key stkey;
620 int srvid;
621 uint32_t netinteger;
622
623 /* Compute update remote version */
624 if (c & 0x80) {
625 pushack = ps->pushack + (unsigned int)(c & 0x7F);
626 }
627 else {
628 reql = buffer_get_block(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
629 if (reql <= 0) { /* closed or EOL not found */
630 if (reql == 0) {
631 goto incomplete;
632 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100633 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200634 goto switchstate;
635 }
636 totl += reql;
637 pushack = ntohl(netinteger);
638 }
639
640 /* read key */
641 if (ps->table->table->type == STKTABLE_TYPE_STRING) {
642 /* type string */
643 stkey.key = stkey.data.buf;
644
645 reql = buffer_get_block(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
646 if (reql <= 0) { /* closed or EOL not found */
647 if (reql == 0) {
648 goto incomplete;
649 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100650 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200651 goto switchstate;
652 }
653 totl += reql;
654 stkey.key_len = ntohl(netinteger);
655
656 reql = buffer_get_block(si->ob, stkey.key, stkey.key_len, totl);
657 if (reql <= 0) { /* closed or EOL not found */
658 if (reql == 0) {
659 goto incomplete;
660 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100661 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200662 goto switchstate;
663 }
664 totl += reql;
665 }
666 else if (ps->table->table->type == STKTABLE_TYPE_INTEGER) {
667 /* type integer */
668 stkey.key_len = (size_t)-1;
669 stkey.key = &stkey.data.integer;
670
671 reql = buffer_get_block(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
672 if (reql <= 0) { /* closed or EOL not found */
673 if (reql == 0) {
674 goto incomplete;
675 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100676 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200677 goto switchstate;
678 }
679 totl += reql;
680 stkey.data.integer = ntohl(netinteger);
681 }
682 else {
683 /* type ip */
684 stkey.key_len = (size_t)-1;
685 stkey.key = stkey.data.buf;
686
687 reql = buffer_get_block(si->ob, (char *)&stkey.data.buf, ps->table->table->key_size, totl);
688 if (reql <= 0) { /* closed or EOL not found */
689 if (reql == 0) {
690 goto incomplete;
691 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100692 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200693 goto switchstate;
694 }
695 totl += reql;
696
697 }
698
699 /* read server id */
700 reql = buffer_get_block(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
701 if (reql <= 0) { /* closed or EOL not found */
702 if (reql == 0) {
703 goto incomplete;
704 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100705 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200706 goto switchstate;
707 }
708 totl += reql;
709 srvid = ntohl(netinteger);
710
711 /* update entry */
712 newts = stksess_new(ps->table->table, &stkey);
713 if (newts) {
714 /* lookup for existing entry */
715 ts = stktable_lookup(ps->table->table, newts);
716 if (ts) {
717 /* the entry already exist, we can free ours */
718 stktable_touch(ps->table->table, ts, 0);
719 stksess_free(ps->table->table, newts);
720 }
721 else {
722 struct eb32_node *eb;
723
724 /* create new entry */
725 ts = stktable_store(ps->table->table, newts, 0);
726 ts->upd.key= (++ps->table->table->update)+(2^31);
727 eb = eb32_insert(&ps->table->table->updates, &ts->upd);
728 if (eb != &ts->upd) {
729 eb32_delete(eb);
730 eb32_insert(&ps->table->table->updates, &ts->upd);
731 }
732 }
733
734 /* update entry */
735 if (srvid && stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID))
736 stktable_data_cast(stktable_data_ptr(ps->table->table, ts, STKTABLE_DT_SERVER_ID), server_id) = srvid;
737 ps->pushack = pushack;
738 }
739
740 }
741 else if (c == 'R') {
742 /* Reset message: remote need resync */
743
744 /* reinit counters for a resync */
745 ps->lastpush = 0;
746 ps->teaching_origin = ps->pushed = ps->table->table->update;
747
748 /* reset teaching flags to 0 */
749 ps->flags &= PEER_TEACH_RESET;
750
751 /* flag to start to teach lesson */
752 ps->flags |= PEER_F_TEACH_PROCESS;
753 }
754 else if (c == 'F') {
755 /* Finish message, all known updates have been pushed by remote */
756 /* and remote is up to date */
757
758 /* If resync is in progress with remote peer */
759 if (ps->flags & PEER_F_LEARN_ASSIGN) {
760
761 /* unassign current peer for learning */
762 ps->flags &= ~PEER_F_LEARN_ASSIGN;
763 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
764
765 /* Consider table is now up2date, resync resync no more needed from local neither remote */
766 ps->table->flags |= (SHTABLE_F_RESYNC_LOCAL|SHTABLE_F_RESYNC_REMOTE);
767 }
768 /* Increase confirm counter to launch a confirm message */
769 ps->confirm++;
770 }
771 else if (c == 'c') {
772 /* confirm message, remote peer is now up to date with us */
773
774 /* If stopping state */
775 if (stopping) {
776 /* Close session, push resync no more needed */
777 ps->flags |= PEER_F_TEACH_COMPLETE;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100778 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200779 goto switchstate;
780 }
781
782 /* reset teaching flags to 0 */
783 ps->flags &= PEER_TEACH_RESET;
784 }
785 else if (c == 'C') {
786 /* Continue message, all known updates have been pushed by remote */
787 /* but remote is not up to date */
788
789 /* If resync is in progress with current peer */
790 if (ps->flags & PEER_F_LEARN_ASSIGN) {
791
792 /* unassign current peer */
793 ps->flags &= ~PEER_F_LEARN_ASSIGN;
794 ps->table->flags &= ~(SHTABLE_F_RESYNC_ASSIGN|SHTABLE_F_RESYNC_PROCESS);
795
796 /* flag current peer is not up 2 date to try from an other */
797 ps->flags |= PEER_F_LEARN_NOTUP2DATE;
798
799 /* reschedule a resync */
800 ps->table->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
801 task_wakeup(ps->table->sync_task, TASK_WOKEN_MSG);
802 }
803 ps->confirm++;
804 }
805 else if (c == 'A') {
806 /* ack message */
807 uint32_t netinteger;
808
809 reql = buffer_get_block(si->ob, (char *)&netinteger, sizeof(netinteger), totl);
810 if (reql <= 0) { /* closed or EOL not found */
811 if (reql == 0) {
812 goto incomplete;
813 }
Willy Tarreaubc4af052011-02-13 13:25:14 +0100814 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200815 goto switchstate;
816 }
817 totl += reql;
818
819 /* Consider remote is up to date with "acked" version */
820 ps->update = ntohl(netinteger);
821 }
822 else {
823 /* Unknown message */
Willy Tarreaubc4af052011-02-13 13:25:14 +0100824 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200825 goto switchstate;
826 }
827
828 /* skip consumed message */
829 buffer_skip(si->ob, totl);
830
831 /* loop on that state to peek next message */
832 continue;
833incomplete:
834 /* Nothing to read, now we start to write */
835
836 /* Confirm finished or partial messages */
837 while (ps->confirm) {
838 /* There is a confirm messages to send */
839 repl = buffer_put_char(si->ib, 'c');
840 if (repl <= 0) {
841 /* no more write possible */
842 if (repl == -1)
843 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100844 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200845 goto switchstate;
846 }
847 ps->confirm--;
848 }
849
850 /* Need to request a resync */
851 if ((ps->flags & PEER_F_LEARN_ASSIGN) &&
852 (ps->table->flags & SHTABLE_F_RESYNC_ASSIGN) &&
853 !(ps->table->flags & SHTABLE_F_RESYNC_PROCESS)) {
854 /* Current peer was elected to request a resync */
855
856 repl = buffer_put_char(si->ib, 'R');
857 if (repl <= 0) {
858 /* no more write possible */
859 if (repl == -1)
860 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100861 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200862 goto switchstate;
863 }
864 ps->table->flags |= SHTABLE_F_RESYNC_PROCESS;
865 }
866
867 /* It remains some updates to ack */
868 if (ps->pushack != ps->lastack) {
869 uint32_t netinteger;
870
871 trash[0] = 'A';
872 netinteger = htonl(ps->pushack);
873 memcpy(&trash[1], &netinteger, sizeof(netinteger));
874
875 repl = buffer_put_block(si->ib, trash, 1+sizeof(netinteger));
876 if (repl <= 0) {
877 /* no more write possible */
878 if (repl == -1)
879 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100880 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200881 goto switchstate;
882 }
883 ps->lastack = ps->pushack;
884 }
885
886 if (ps->flags & PEER_F_TEACH_PROCESS) {
887 /* current peer was requested for a lesson */
888
889 if (!(ps->flags & PEER_F_TEACH_STAGE1)) {
890 /* lesson stage 1 not complete */
891 struct eb32_node *eb;
892
893 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
894 while (1) {
895 int msglen;
896 struct stksess *ts;
897
898 if (!eb) {
899 /* flag lesson stage1 complete */
900 ps->flags |= PEER_F_TEACH_STAGE1;
901 eb = eb32_first(&ps->table->table->updates);
902 if (eb)
903 ps->pushed = eb->key - 1;
904 break;
905 }
906
907 ts = eb32_entry(eb, struct stksess, upd);
908 msglen = peer_prepare_datamsg(ts, ps, trash, sizeof(trash));
909 if (msglen) {
910 /* message to buffer */
911 repl = buffer_put_block(si->ib, trash, msglen);
912 if (repl <= 0) {
913 /* no more write possible */
914 if (repl == -1)
915 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100916 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200917 goto switchstate;
918 }
919 ps->lastpush = ps->pushed = ts->upd.key;
920 }
921 eb = eb32_next(eb);
922 }
923 } /* !TEACH_STAGE1 */
924
925 if (!(ps->flags & PEER_F_TEACH_STAGE2)) {
926 /* lesson stage 2 not complete */
927 struct eb32_node *eb;
928
929 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
930 while (1) {
931 int msglen;
932 struct stksess *ts;
933
934 if (!eb || eb->key > ps->teaching_origin) {
935 /* flag lesson stage1 complete */
936 ps->flags |= PEER_F_TEACH_STAGE2;
937 ps->pushed = ps->teaching_origin;
938 break;
939 }
940
941 ts = eb32_entry(eb, struct stksess, upd);
942 msglen = peer_prepare_datamsg(ts, ps, trash, sizeof(trash));
943 if (msglen) {
944 /* message to buffer */
945 repl = buffer_put_block(si->ib, trash, msglen);
946 if (repl <= 0) {
947 /* no more write possible */
948 if (repl == -1)
949 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100950 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200951 goto switchstate;
952 }
953 ps->lastpush = ps->pushed = ts->upd.key;
954 }
955 eb = eb32_next(eb);
956 }
957 } /* !TEACH_STAGE2 */
958
959 if (!(ps->flags & PEER_F_TEACH_FINISHED)) {
960 /* process final lesson message */
961 repl = buffer_put_char(si->ib, ((ps->table->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FINISHED) ? 'F' : 'C');
962 if (repl <= 0) {
963 /* no more write possible */
964 if (repl == -1)
965 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +0100966 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +0200967 goto switchstate;
968 }
969
970 /* flag finished message sent */
971 ps->flags |= PEER_F_TEACH_FINISHED;
972 } /* !TEACH_FINISHED */
973 } /* TEACH_PROCESS */
974
975 if (!(ps->flags & PEER_F_LEARN_ASSIGN) &&
976 (int)(ps->pushed - ps->table->table->localupdate) < 0) {
977 /* Push local updates, only if no learning in progress (to avoid ping-pong effects) */
978 struct eb32_node *eb;
979
980 eb = eb32_lookup_ge(&ps->table->table->updates, ps->pushed+1);
981 while (1) {
982 int msglen;
983 struct stksess *ts;
984
985 /* push local updates */
986 if (!eb) {
987 eb = eb32_first(&ps->table->table->updates);
988 if (!eb || ((int)(eb->key - ps->pushed) <= 0)) {
989 ps->pushed = ps->table->table->localupdate;
990 break;
991 }
992 }
993
994 if ((int)(eb->key - ps->table->table->localupdate) > 0) {
995 ps->pushed = ps->table->table->localupdate;
996 break;
997 }
998
999 ts = eb32_entry(eb, struct stksess, upd);
1000 msglen = peer_prepare_datamsg(ts, ps, trash, sizeof(trash));
1001 if (msglen) {
1002 /* message to buffer */
1003 repl = buffer_put_block(si->ib, trash, msglen);
1004 if (repl <= 0) {
1005 /* no more write possible */
1006 if (repl == -1)
1007 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +01001008 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +02001009 goto switchstate;
1010 }
1011 ps->lastpush = ps->pushed = ts->upd.key;
1012 }
1013 eb = eb32_next(eb);
1014 }
1015 } /* ! LEARN_ASSIGN */
1016 /* noting more to do */
1017 goto out;
1018 }
1019 case PEER_SESSION_EXIT:
Willy Tarreaubc4af052011-02-13 13:25:14 +01001020 repl = snprintf(trash, sizeof(trash), "%d\n", si->applet.st1);
Emeric Brun2b920a12010-09-23 18:30:22 +02001021
1022 if (buffer_put_block(si->ib, trash, repl) == -1)
1023 goto out;
Willy Tarreaubc4af052011-02-13 13:25:14 +01001024 si->applet.st0 = PEER_SESSION_END;
Emeric Brun2b920a12010-09-23 18:30:22 +02001025 /* fall through */
1026 case PEER_SESSION_END: {
1027 si->shutw(si);
1028 si->shutr(si);
1029 si->ib->flags |= BF_READ_NULL;
1030 goto quit;
1031 }
1032 }
1033 }
1034out:
1035 si->update(si);
1036 si->ob->flags |= BF_READ_DONTWAIT;
1037 /* we don't want to expire timeouts while we're processing requests */
1038 si->ib->rex = TICK_ETERNITY;
1039 si->ob->wex = TICK_ETERNITY;
1040quit:
1041 return;
1042}
1043
Willy Tarreaub24281b2011-02-13 13:16:36 +01001044static struct si_applet peer_applet = {
1045 .name = "<PEER>", /* used for logging */
1046 .fct = peer_io_handler,
1047};
Emeric Brun2b920a12010-09-23 18:30:22 +02001048
1049/*
1050 * Use this function to force a close of a peer session
1051 */
1052void peer_session_forceshutdown(struct session * session)
1053{
1054 struct stream_interface *oldsi;
1055
Willy Tarreau7c0a1512011-03-10 11:17:02 +01001056 if (session->si[0].target.type == TARG_TYPE_APPLET &&
1057 session->si[0].target.ptr.a == &peer_applet) {
Emeric Brun2b920a12010-09-23 18:30:22 +02001058 oldsi = &session->si[0];
1059 }
1060 else {
1061 oldsi = &session->si[1];
1062 }
1063
1064 /* call release to reinit resync states if needed */
1065 peer_session_release(oldsi);
Willy Tarreaubc4af052011-02-13 13:25:14 +01001066 oldsi->applet.st0 = PEER_SESSION_END;
1067 oldsi->applet.private = NULL;
Emeric Brun2b920a12010-09-23 18:30:22 +02001068 task_wakeup(session->task, TASK_WOKEN_MSG);
1069}
1070
1071/*
1072 * this function is called on a read event from a listen socket, corresponding
1073 * to an accept. It tries to accept as many connections as possible.
Willy Tarreaubd55e312010-11-11 10:55:09 +01001074 * It returns a positive value upon success, 0 if the connection needs to be
1075 * closed and ignored, or a negative value upon critical failure.
Emeric Brun2b920a12010-09-23 18:30:22 +02001076 */
1077int peer_accept(struct session *s)
1078{
1079 /* we have a dedicated I/O handler for the stats */
Willy Tarreaub24281b2011-02-13 13:16:36 +01001080 stream_int_register_handler(&s->si[1], &peer_applet);
Emeric Brun2b920a12010-09-23 18:30:22 +02001081 s->si[1].release = peer_session_release;
Willy Tarreaubc4af052011-02-13 13:25:14 +01001082 s->si[1].applet.private = s;
1083 s->si[1].applet.st0 = PEER_SESSION_ACCEPT;
Emeric Brun2b920a12010-09-23 18:30:22 +02001084
1085 tv_zero(&s->logs.tv_request);
1086 s->logs.t_queue = 0;
1087 s->logs.t_connect = 0;
1088 s->logs.t_data = 0;
1089 s->logs.t_close = 0;
1090 s->logs.bytes_in = s->logs.bytes_out = 0;
1091 s->logs.prx_queue_size = 0;/* we get the number of pending conns before us */
1092 s->logs.srv_queue_size = 0; /* we will get this number soon */
1093
1094 s->req->flags |= BF_READ_DONTWAIT; /* we plan to read small requests */
1095
1096 if (s->listener->timeout) {
1097 s->req->rto = *s->listener->timeout;
1098 s->rep->wto = *s->listener->timeout;
1099 }
1100 return 1;
1101}
1102
1103/*
Willy Tarreaubd55e312010-11-11 10:55:09 +01001104 * Create a new peer session in assigned state (connect will start automatically)
Emeric Brun2b920a12010-09-23 18:30:22 +02001105 */
1106struct session *peer_session_create(struct peer *peer, struct peer_session *ps)
1107{
1108 struct listener *l = ((struct proxy *)peer->peers->peers_fe)->listen;
1109 struct proxy *p = (struct proxy *)l->frontend; /* attached frontend */
1110 struct session *s;
1111 struct http_txn *txn;
1112 struct task *t;
1113
1114 if ((s = pool_alloc2(pool2_session)) == NULL) { /* disable this proxy for a while */
1115 Alert("out of memory in event_accept().\n");
1116 p->state = PR_STIDLE;
1117 goto out_close;
1118 }
1119
1120 LIST_ADDQ(&sessions, &s->list);
1121 LIST_INIT(&s->back_refs);
1122
1123 s->flags = SN_ASSIGNED|SN_ADDR_SET;
1124 s->term_trace = 0;
1125
1126 /* if this session comes from a known monitoring system, we want to ignore
1127 * it as soon as possible, which means closing it immediately for TCP.
1128 */
1129 if ((t = task_new()) == NULL) { /* disable this proxy for a while */
1130 Alert("out of memory in event_accept().\n");
1131 p->state = PR_STIDLE;
1132 goto out_free_session;
1133 }
1134
1135 ps->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
1136 ps->statuscode = PEER_SESSION_CONNECTCODE;
1137
1138 t->process = l->handler;
1139 t->context = s;
1140 t->nice = l->nice;
1141
Willy Tarreau957c0a52011-03-03 17:42:23 +01001142 memcpy(&s->si[1].addr.s.to, &peer->addr, sizeof(s->si[1].addr.s.to));
Emeric Brun2b920a12010-09-23 18:30:22 +02001143 s->task = t;
1144 s->listener = l;
1145
1146 /* Note: initially, the session's backend points to the frontend.
1147 * This changes later when switching rules are executed or
1148 * when the default backend is assigned.
1149 */
1150 s->be = s->fe = p;
1151
1152 s->req = s->rep = NULL; /* will be allocated later */
1153
1154 s->si[0].fd = -1;
1155 s->si[0].owner = t;
1156 s->si[0].state = s->si[0].prev_state = SI_ST_EST;
1157 s->si[0].err_type = SI_ET_NONE;
1158 s->si[0].err_loc = NULL;
Willy Tarreauac825402011-03-04 22:04:29 +01001159 s->si[0].connect = NULL;
Willy Tarreau9e000c62011-03-10 14:03:36 +01001160 clear_target(&s->si[0].target);
Emeric Brun2b920a12010-09-23 18:30:22 +02001161 s->si[0].exp = TICK_ETERNITY;
1162 s->si[0].flags = SI_FL_NONE;
1163 if (s->fe->options2 & PR_O2_INDEPSTR)
1164 s->si[0].flags |= SI_FL_INDEP_STR;
Willy Tarreaubc4af052011-02-13 13:25:14 +01001165 s->si[0].applet.private = (void *)ps;
1166 s->si[0].applet.st0 = PEER_SESSION_CONNECT;
Emeric Brun2b920a12010-09-23 18:30:22 +02001167
Willy Tarreaub24281b2011-02-13 13:16:36 +01001168 stream_int_register_handler(&s->si[0], &peer_applet);
Emeric Brun2b920a12010-09-23 18:30:22 +02001169 s->si[0].release = peer_session_release;
1170
1171 s->si[1].fd = -1; /* just to help with debugging */
1172 s->si[1].owner = t;
1173 s->si[1].state = s->si[1].prev_state = SI_ST_ASS;
1174 s->si[1].conn_retries = p->conn_retries;
1175 s->si[1].err_type = SI_ET_NONE;
1176 s->si[1].err_loc = NULL;
David du Colombier6f5ccb12011-03-10 22:26:24 +01001177 s->si[1].connect = tcp_connect_server;
Willy Tarreau9e000c62011-03-10 14:03:36 +01001178 set_target_proxy(&s->si[1].target, s->be);
Emeric Brun2b920a12010-09-23 18:30:22 +02001179 s->si[1].exp = TICK_ETERNITY;
1180 s->si[1].flags = SI_FL_NONE;
1181 if (s->be->options2 & PR_O2_INDEPSTR)
1182 s->si[1].flags |= SI_FL_INDEP_STR;
1183
1184 stream_sock_prepare_interface(&s->si[1]);
1185 s->si[1].release = NULL;
1186
Willy Tarreau827aee92011-03-10 16:55:02 +01001187 s->srv_conn = NULL;
Willy Tarreau9e000c62011-03-10 14:03:36 +01001188 clear_target(&s->target);
Emeric Brun2b920a12010-09-23 18:30:22 +02001189 s->pend_pos = NULL;
1190
1191 /* init store persistence */
1192 s->store_count = 0;
1193 s->stkctr1_entry = NULL;
1194 s->stkctr2_entry = NULL;
1195
1196 /* FIXME: the logs are horribly complicated now, because they are
1197 * defined in <p>, <p>, and later <be> and <be>.
1198 */
1199
1200 s->logs.logwait = 0;
1201 s->do_log = NULL;
1202
1203 /* default error reporting function, may be changed by analysers */
1204 s->srv_error = default_srv_error;
1205
Emeric Brun2b920a12010-09-23 18:30:22 +02001206 s->uniq_id = 0;
1207
1208 txn = &s->txn;
1209 /* Those variables will be checked and freed if non-NULL in
1210 * session.c:session_free(). It is important that they are
1211 * properly initialized.
1212 */
1213 txn->sessid = NULL;
1214 txn->srv_cookie = NULL;
1215 txn->cli_cookie = NULL;
1216 txn->uri = NULL;
1217 txn->req.cap = NULL;
1218 txn->rsp.cap = NULL;
1219 txn->hdr_idx.v = NULL;
1220 txn->hdr_idx.size = txn->hdr_idx.used = 0;
1221
1222 if ((s->req = pool_alloc2(pool2_buffer)) == NULL)
1223 goto out_fail_req; /* no memory */
1224
1225 s->req->size = global.tune.bufsize;
1226 buffer_init(s->req);
1227 s->req->prod = &s->si[0];
1228 s->req->cons = &s->si[1];
1229 s->si[0].ib = s->si[1].ob = s->req;
1230
1231 s->req->flags |= BF_READ_ATTACHED; /* the producer is already connected */
1232
1233 /* activate default analysers enabled for this listener */
1234 s->req->analysers = l->analysers;
1235
1236 /* note: this should not happen anymore since there's always at least the switching rules */
1237 if (!s->req->analysers) {
1238 buffer_auto_connect(s->req);/* don't wait to establish connection */
1239 buffer_auto_close(s->req);/* let the producer forward close requests */
1240 }
1241
1242 s->req->rto = s->fe->timeout.client;
1243 s->req->wto = s->be->timeout.server;
1244
1245 if ((s->rep = pool_alloc2(pool2_buffer)) == NULL)
1246 goto out_fail_rep; /* no memory */
1247
1248 s->rep->size = global.tune.bufsize;
1249 buffer_init(s->rep);
1250 s->rep->prod = &s->si[1];
1251 s->rep->cons = &s->si[0];
1252 s->si[0].ob = s->si[1].ib = s->rep;
1253
1254 s->rep->rto = s->be->timeout.server;
1255 s->rep->wto = s->fe->timeout.client;
1256
1257 s->req->rex = TICK_ETERNITY;
1258 s->req->wex = TICK_ETERNITY;
1259 s->req->analyse_exp = TICK_ETERNITY;
1260 s->rep->rex = TICK_ETERNITY;
1261 s->rep->wex = TICK_ETERNITY;
1262 s->rep->analyse_exp = TICK_ETERNITY;
1263 t->expire = TICK_ETERNITY;
1264
1265 s->rep->flags |= BF_READ_DONTWAIT;
1266 /* it is important not to call the wakeup function directly but to
1267 * pass through task_wakeup(), because this one knows how to apply
1268 * priorities to tasks.
1269 */
1270 task_wakeup(t, TASK_WOKEN_INIT);
1271
1272 l->nbconn++; /* warning! right now, it's up to the handler to decrease this */
1273 p->feconn++;/* beconn will be increased later */
1274 jobs++;
1275 actconn++;
1276 totalconn++;
1277
1278 return s;
1279
1280 /* Error unrolling */
1281 out_fail_rep:
1282 pool_free2(pool2_buffer, s->req);
1283 out_fail_req:
1284 task_free(t);
1285 out_free_session:
1286 LIST_DEL(&s->list);
1287 pool_free2(pool2_session, s);
1288 out_close:
1289 return s;
1290}
1291
1292/*
1293 * Task processing function to manage re-connect and peer session
1294 * tasks wakeup on local update.
1295 */
1296struct task *process_peer_sync(struct task * task)
1297{
1298 struct shared_table *st = (struct shared_table *)task->context;
1299 struct peer_session *ps;
1300
1301 task->expire = TICK_ETERNITY;
1302
1303 if (!stopping) {
1304 /* Normal case (not soft stop)*/
1305 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMLOCAL) &&
1306 (!nb_oldpids || tick_is_expired(st->resync_timeout, now_ms)) &&
1307 !(st->flags & SHTABLE_F_RESYNC_ASSIGN)) {
1308 /* Resync from local peer needed
1309 no peer was assigned for the lesson
1310 and no old local peer found
1311 or resync timeout expire */
1312
1313 /* flag no more resync from local, to try resync from remotes */
1314 st->flags |= SHTABLE_F_RESYNC_LOCAL;
1315
1316 /* reschedule a resync */
1317 st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
1318 }
1319
1320 /* For each session */
1321 for (ps = st->sessions; ps; ps = ps->next) {
1322 /* For each remote peers */
1323 if (!ps->peer->local) {
1324 if (!ps->session) {
1325 /* no active session */
1326 if (ps->statuscode == 0 ||
1327 ps->statuscode == PEER_SESSION_SUCCESSCODE ||
1328 ((ps->statuscode == PEER_SESSION_CONNECTCODE ||
1329 ps->statuscode == PEER_SESSION_CONNECTEDCODE) &&
1330 tick_is_expired(ps->reconnect, now_ms))) {
1331 /* connection never tried
1332 * or previous session established with success
1333 * or previous session failed during connection
1334 * and reconnection timer is expired */
1335
1336 /* retry a connect */
1337 ps->session = peer_session_create(ps->peer, ps);
1338 }
1339 else if (ps->statuscode == PEER_SESSION_CONNECTCODE ||
1340 ps->statuscode == PEER_SESSION_CONNECTEDCODE) {
1341 /* If previous session failed during connection
1342 * but reconnection timer is not expired */
1343
1344 /* reschedule task for reconnect */
1345 task->expire = tick_first(task->expire, ps->reconnect);
1346 }
1347 /* else do nothing */
1348 } /* !ps->session */
1349 else if (ps->statuscode == PEER_SESSION_SUCCESSCODE) {
1350 /* current session is active and established */
1351 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
1352 !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
1353 !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
1354 /* Resync from a remote is needed
1355 * and no peer was assigned for lesson
1356 * and current peer may be up2date */
1357
1358 /* assign peer for the lesson */
1359 ps->flags |= PEER_F_LEARN_ASSIGN;
1360 st->flags |= SHTABLE_F_RESYNC_ASSIGN;
1361
1362 /* awake peer session task to handle a request of resync */
1363 task_wakeup(ps->session->task, TASK_WOKEN_MSG);
1364 }
1365 else if ((int)(ps->pushed - ps->table->table->localupdate) < 0) {
1366 /* awake peer session task to push local updates */
1367 task_wakeup(ps->session->task, TASK_WOKEN_MSG);
1368 }
1369 /* else do nothing */
1370 } /* SUCCESSCODE */
1371 } /* !ps->peer->local */
1372 } /* for */
1373
1374 /* Resync from remotes expired: consider resync is finished */
1375 if (((st->flags & SHTABLE_RESYNC_STATEMASK) == SHTABLE_RESYNC_FROMREMOTE) &&
1376 !(st->flags & SHTABLE_F_RESYNC_ASSIGN) &&
1377 tick_is_expired(st->resync_timeout, now_ms)) {
1378 /* Resync from remote peer needed
1379 * no peer was assigned for the lesson
1380 * and resync timeout expire */
1381
1382 /* flag no more resync from remote, consider resync is finished */
1383 st->flags |= SHTABLE_F_RESYNC_REMOTE;
1384 }
1385
1386 if ((st->flags & SHTABLE_RESYNC_STATEMASK) != SHTABLE_RESYNC_FINISHED) {
1387 /* Resync not finished*/
1388 /* reschedule task to resync timeout, to ended resync if needed */
1389 task->expire = tick_first(task->expire, st->resync_timeout);
1390 }
1391 } /* !stopping */
1392 else {
1393 /* soft stop case */
1394 if (task->state & TASK_WOKEN_SIGNAL) {
1395 /* We've just recieved the signal */
1396 if (!(st->flags & SHTABLE_F_DONOTSTOP)) {
1397 /* add DO NOT STOP flag if not present */
1398 jobs++;
1399 st->flags |= SHTABLE_F_DONOTSTOP;
1400 }
1401
1402 /* disconnect all connected peers */
1403 for (ps = st->sessions; ps; ps = ps->next) {
1404 if (ps->session) {
1405 peer_session_forceshutdown(ps->session);
1406 ps->session = NULL;
1407 }
1408 }
1409 }
1410 ps = st->local_session;
1411
1412 if (ps->flags & PEER_F_TEACH_COMPLETE) {
1413 if (st->flags & SHTABLE_F_DONOTSTOP) {
1414 /* resync of new process was complete, current process can die now */
1415 jobs--;
1416 st->flags &= ~SHTABLE_F_DONOTSTOP;
1417 }
1418 }
1419 else if (!ps->session) {
1420 /* If session is not active */
1421 if (ps->statuscode == 0 ||
1422 ps->statuscode == PEER_SESSION_SUCCESSCODE ||
1423 ps->statuscode == PEER_SESSION_CONNECTEDCODE ||
1424 ps->statuscode == PEER_SESSION_TRYAGAIN) {
1425 /* connection never tried
1426 * or previous session was successfully established
1427 * or previous session tcp connect success but init state incomplete
1428 * or during previous connect, peer replies a try again statuscode */
1429
1430 /* connect to the peer */
1431 ps->session = peer_session_create(ps->peer, ps);
1432 }
1433 else {
1434 /* Other error cases */
1435 if (st->flags & SHTABLE_F_DONOTSTOP) {
1436 /* unable to resync new process, current process can die now */
1437 jobs--;
1438 st->flags &= ~SHTABLE_F_DONOTSTOP;
1439 }
1440 }
1441 }
1442 else if (ps->statuscode == PEER_SESSION_SUCCESSCODE &&
1443 (int)(ps->pushed - ps->table->table->localupdate) < 0) {
1444 /* current session active and established
1445 awake session to push remaining local updates */
1446 task_wakeup(ps->session->task, TASK_WOKEN_MSG);
1447 }
1448 } /* stopping */
1449 /* Wakeup for re-connect */
1450 return task;
1451}
1452
1453/*
1454 * Function used to register a table for sync on a group of peers
1455 *
1456 */
1457void peers_register_table(struct peers *peers, struct stktable *table)
1458{
1459 struct shared_table *st;
1460 struct peer * curpeer;
1461 struct peer_session *ps;
1462
1463 st = (struct shared_table *)calloc(1,sizeof(struct shared_table));
1464 st->table = table;
1465 st->next = peers->tables;
1466 st->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
1467 peers->tables = st;
1468
1469 for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) {
1470 ps = (struct peer_session *)calloc(1,sizeof(struct peer_session));
1471 ps->table = st;
1472 ps->peer = curpeer;
1473 if (curpeer->local)
1474 st->local_session = ps;
1475 ps->next = st->sessions;
1476 ps->reconnect = now_ms;
1477 st->sessions = ps;
1478 peers->peers_fe->maxconn += 3;
1479 }
1480
1481 peers->peers_fe->listen->maxconn = peers->peers_fe->maxconn;
1482 st->sync_task = task_new();
1483 st->sync_task->process = process_peer_sync;
1484 st->sync_task->expire = TICK_ETERNITY;
1485 st->sync_task->context = (void *)st;
1486 table->sync_task =st->sync_task;
1487 signal_register_task(0, table->sync_task, 0);
1488 task_wakeup(st->sync_task, TASK_WOKEN_INIT);
1489}
1490