blob: b3f2ef3a370032243df889ebe488e5323357c809 [file] [log] [blame]
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001/*
2 * Stream processing offload engine management.
3 *
4 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12#include <ctype.h>
13#include <errno.h>
14
15#include <common/buffer.h>
16#include <common/cfgparse.h>
17#include <common/compat.h>
18#include <common/config.h>
19#include <common/debug.h>
20#include <common/memory.h>
21#include <common/time.h>
22
23#include <types/arg.h>
24#include <types/filters.h>
25#include <types/global.h>
26#include <types/proxy.h>
27#include <types/sample.h>
28#include <types/stream.h>
29
30#include <proto/arg.h>
31#include <proto/backend.h>
32#include <proto/filters.h>
33#include <proto/frontend.h>
34#include <proto/log.h>
35#include <proto/proto_http.h>
36#include <proto/proxy.h>
37#include <proto/sample.h>
38#include <proto/session.h>
39#include <proto/signal.h>
40#include <proto/stream.h>
41#include <proto/stream_interface.h>
42#include <proto/task.h>
43#include <proto/vars.h>
44
45#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
46#define SPOE_PRINTF(x...) fprintf(x)
47#else
48#define SPOE_PRINTF(x...)
49#endif
50
51/* Helper to get ctx inside an appctx */
52#define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe)
53
54/* TODO: add an option to customize these values */
55/* The maximum number of new applet waiting the end of the hello handshake */
56#define MAX_NEW_SPOE_APPLETS 5
57
58/* The maximum number of error when a stream is waiting of a SPOE applet */
59#define MAX_NEW_SPOE_APPLET_ERRS 3
60
61/* Minimal size for a frame */
62#define MIN_FRAME_SIZE 256
63
64/* Flags set on the SPOE context */
65#define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* Set after that on-client-session event was processed */
66#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set after that on-server-session event was processed */
67#define SPOE_CTX_FL_REQ_PROCESS 0x00000004 /* Set when SPOE is processing the request */
68#define SPOE_CTX_FL_RSP_PROCESS 0x00000008 /* Set when SPOE is processing the response */
69
70#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)
71
72#define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, leave it to zero */
73#define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */
74
75/* All possible states for a SPOE context */
76enum spoe_ctx_state {
77 SPOE_CTX_ST_NONE = 0,
78 SPOE_CTX_ST_READY,
79 SPOE_CTX_ST_SENDING_MSGS,
80 SPOE_CTX_ST_WAITING_ACK,
81 SPOE_CTX_ST_DONE,
82 SPOE_CTX_ST_ERROR,
83};
84
85/* All possible states for a SPOE applet */
86enum spoe_appctx_state {
87 SPOE_APPCTX_ST_CONNECT = 0,
88 SPOE_APPCTX_ST_CONNECTING,
89 SPOE_APPCTX_ST_PROCESSING,
90 SPOE_APPCTX_ST_DISCONNECT,
91 SPOE_APPCTX_ST_DISCONNECTING,
92 SPOE_APPCTX_ST_EXIT,
93 SPOE_APPCTX_ST_END,
94};
95
96/* All supported SPOE actions */
97enum spoe_action_type {
98 SPOE_ACT_T_SET_VAR = 1,
99 SPOE_ACT_T_UNSET_VAR,
100 SPOE_ACT_TYPES,
101};
102
103/* All supported SPOE events */
104enum spoe_event {
105 SPOE_EV_NONE = 0,
106
107 /* Request events */
108 SPOE_EV_ON_CLIENT_SESS = 1,
109 SPOE_EV_ON_TCP_REQ_FE,
110 SPOE_EV_ON_TCP_REQ_BE,
111 SPOE_EV_ON_HTTP_REQ_FE,
112 SPOE_EV_ON_HTTP_REQ_BE,
113
114 /* Response events */
115 SPOE_EV_ON_SERVER_SESS,
116 SPOE_EV_ON_TCP_RSP,
117 SPOE_EV_ON_HTTP_RSP,
118
119 SPOE_EV_EVENTS
120};
121
122/* Errors triggerd by SPOE applet */
123enum spoe_frame_error {
124 SPOE_FRM_ERR_NONE = 0,
125 SPOE_FRM_ERR_IO,
126 SPOE_FRM_ERR_TOUT,
127 SPOE_FRM_ERR_TOO_BIG,
128 SPOE_FRM_ERR_INVALID,
129 SPOE_FRM_ERR_NO_VSN,
130 SPOE_FRM_ERR_NO_FRAME_SIZE,
131 SPOE_FRM_ERR_NO_CAP,
132 SPOE_FRM_ERR_BAD_VSN,
133 SPOE_FRM_ERR_BAD_FRAME_SIZE,
134 SPOE_FRM_ERR_UNKNOWN = 99,
135 SPOE_FRM_ERRS,
136};
137
138/* Scopes used for variables set by agents. It is a way to be agnotic to vars
139 * scope. */
140enum spoe_vars_scope {
141 SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC */
142 SPOE_SCOPE_SESS, /* <=> SCOPE_SESS */
143 SPOE_SCOPE_TXN, /* <=> SCOPE_TXN */
144 SPOE_SCOPE_REQ, /* <=> SCOPE_REQ */
145 SPOE_SCOPE_RES, /* <=> SCOPE_RES */
146};
147
148
149/* Describe an argument that will be linked to a message. It is a sample fetch,
150 * with an optional name. */
151struct spoe_arg {
152 char *name; /* Name of the argument, may be NULL */
153 unsigned int name_len; /* The name length, 0 if NULL */
154 struct sample_expr *expr; /* Sample expression */
155 struct list list; /* Used to chain SPOE args */
156};
157
158/* Used during the config parsing only because, when a SPOE agent section is
159 * parsed, messages can be undefined. */
160struct spoe_msg_placeholder {
161 char *id; /* SPOE message placeholder id */
162 struct list list; /* Use to chain SPOE message placeholders */
163};
164
165/* Describe a message that will be sent in a NOTIFY frame. A message has a name,
166 * an argument list (see above) and it is linked to a specific event. */
167struct spoe_message {
168 char *id; /* SPOE message id */
169 unsigned int id_len; /* The message id length */
170 struct spoe_agent *agent; /* SPOE agent owning this SPOE message */
171 struct {
172 char *file; /* file where the SPOE message appears */
173 int line; /* line where the SPOE message appears */
174 } conf; /* config information */
175 struct list args; /* Arguments added when the SPOE messages is sent */
176 struct list list; /* Used to chain SPOE messages */
177
178 enum spoe_event event; /* SPOE_EV_* */
179};
180
181/* Describe a SPOE agent. */
182struct spoe_agent {
183 char *id; /* SPOE agent id (name) */
184 struct {
185 char *file; /* file where the SPOE agent appears */
186 int line; /* line where the SPOE agent appears */
187 } conf; /* config information */
188 union {
189 struct proxy *be; /* Backend used by this agent */
190 char *name; /* Backend name used during conf parsing */
191 } b;
192 struct {
Christopher Fauletf7a30922016-11-10 15:04:51 +0100193 unsigned int hello; /* Max time to receive AGENT-HELLO frame (in SPOE applet) */
194 unsigned int idle; /* Max Idle timeout (in SPOE applet) */
195 unsigned int ack; /* Max time to acknowledge a NOTIFY frame (in SPOE applet)*/
196 unsigned int processing; /* Max time to process an event (in the main stream) */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200197 } timeout;
198
199 char *var_pfx; /* Prefix used for vars set by the agent */
200
201 struct list cache; /* List used to cache SPOE streams. In
202 * fact, we cache the SPOE applect ctx */
203
204 struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
205 * for each supported events */
206
207 struct list applet_wq; /* List of streams waiting for a SPOE applet */
208 unsigned int new_applets; /* The number of new SPOE applets */
209};
210
211/* SPOE filter configuration */
212struct spoe_config {
213 struct proxy *proxy; /* Proxy owning the filter */
214 struct spoe_agent *agent; /* Agent used by this filter */
215 struct proxy agent_fe; /* Agent frontend */
216};
217
218/* SPOE context attached to a stream. It is the main structure that handles the
219 * processing offload */
220struct spoe_context {
221 struct filter *filter; /* The SPOE filter */
222 struct stream *strm; /* The stream that should be offloaded */
223 struct appctx *appctx; /* The SPOE appctx */
224 struct list *messages; /* List of messages that will be sent during the stream processing */
225 struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */
226 struct list buffer_wait; /* position in the list of streams waiting for a buffer */
227 struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */
228
229 unsigned int errs; /* The number of errors to acquire a SPOE applet */
230
231 enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
232 unsigned int flags; /* SPOE_CTX_FL_* */
233
234 unsigned int stream_id; /* stream_id and frame_id are used */
235 unsigned int frame_id; /* to map NOTIFY and ACK frames */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100236 unsigned int process_exp; /* expiration date to process an event */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200237};
238
239/* Set if the handle on SIGUSR1 is registered */
240static int sighandler_registered = 0;
241
242/* proxy used during the parsing */
243struct proxy *curproxy = NULL;
244
245/* The name of the SPOE engine, used during the parsing */
246char *curengine = NULL;
247
248/* SPOE agent used during the parsing */
249struct spoe_agent *curagent = NULL;
250
251/* SPOE message used during the parsing */
252struct spoe_message *curmsg = NULL;
253
254/* list of SPOE messages and placeholders used during the parsing */
255struct list curmsgs;
256struct list curmps;
257
258/* Pool used to allocate new SPOE contexts */
259static struct pool_head *pool2_spoe_ctx = NULL;
260
261/* Temporary variables used to ease error processing */
262int spoe_status_code = SPOE_FRM_ERR_NONE;
263char spoe_reason[256];
264
265struct flt_ops spoe_ops;
266
267static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx);
268static void on_new_spoe_appctx_failure(struct spoe_agent *agent);
269static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx);
270
271/********************************************************************
272 * helper functions/globals
273 ********************************************************************/
274static void
275release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp)
276{
277 if (!mp)
278 return;
279 free(mp->id);
280 free(mp);
281}
282
283
284static void
285release_spoe_message(struct spoe_message *msg)
286{
287 struct spoe_arg *arg, *back;
288
289 if (!msg)
290 return;
291 free(msg->id);
292 free(msg->conf.file);
293 list_for_each_entry_safe(arg, back, &msg->args, list) {
294 release_sample_expr(arg->expr);
295 free(arg->name);
296 LIST_DEL(&arg->list);
297 free(arg);
298 }
299 free(msg);
300}
301
302static void
303release_spoe_agent(struct spoe_agent *agent)
304{
305 struct spoe_message *msg, *back;
306 int i;
307
308 if (!agent)
309 return;
310 free(agent->id);
311 free(agent->conf.file);
312 free(agent->var_pfx);
313 for (i = 0; i < SPOE_EV_EVENTS; ++i) {
314 list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
315 LIST_DEL(&msg->list);
316 release_spoe_message(msg);
317 }
318 }
319 free(agent);
320}
321
322static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
323 [SPOE_FRM_ERR_NONE] = "normal",
324 [SPOE_FRM_ERR_IO] = "I/O error",
325 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
326 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
327 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
328 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
329 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
330 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
331 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
332 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
333 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
334};
335
336static const char *spoe_event_str[SPOE_EV_EVENTS] = {
337 [SPOE_EV_ON_CLIENT_SESS] = "on-client-session",
338 [SPOE_EV_ON_TCP_REQ_FE] = "on-frontend-tcp-request",
339 [SPOE_EV_ON_TCP_REQ_BE] = "on-backend-tcp-request",
340 [SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request",
341 [SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request",
342
343 [SPOE_EV_ON_SERVER_SESS] = "on-server-session",
344 [SPOE_EV_ON_TCP_RSP] = "on-tcp-response",
345 [SPOE_EV_ON_HTTP_RSP] = "on-http-response",
346};
347
348
349#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
350
351static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
352 [SPOE_CTX_ST_NONE] = "NONE",
353 [SPOE_CTX_ST_READY] = "READY",
354 [SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
355 [SPOE_CTX_ST_WAITING_ACK] = "WAITING_ACK",
356 [SPOE_CTX_ST_DONE] = "DONE",
357 [SPOE_CTX_ST_ERROR] = "ERROR",
358};
359
360static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
361 [SPOE_APPCTX_ST_CONNECT] = "CONNECT",
362 [SPOE_APPCTX_ST_CONNECTING] = "CONNECTING",
363 [SPOE_APPCTX_ST_PROCESSING] = "PROCESSING",
364 [SPOE_APPCTX_ST_DISCONNECT] = "DISCONNECT",
365 [SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
366 [SPOE_APPCTX_ST_EXIT] = "EXIT",
367 [SPOE_APPCTX_ST_END] = "END",
368};
369
370#endif
371/********************************************************************
372 * Functions that encode/decode SPOE frames
373 ********************************************************************/
374/* Frame Types sent by HAProxy and by agents */
375enum spoe_frame_type {
376 /* Frames sent by HAProxy */
377 SPOE_FRM_T_HAPROXY_HELLO = 1,
378 SPOE_FRM_T_HAPROXY_DISCON,
379 SPOE_FRM_T_HAPROXY_NOTIFY,
380
381 /* Frames sent by the agents */
382 SPOE_FRM_T_AGENT_HELLO = 101,
383 SPOE_FRM_T_AGENT_DISCON,
384 SPOE_FRM_T_AGENT_ACK
385};
386
387/* All supported data types */
388enum spoe_data_type {
389 SPOE_DATA_T_NULL = 0,
390 SPOE_DATA_T_BOOL,
391 SPOE_DATA_T_INT32,
392 SPOE_DATA_T_UINT32,
393 SPOE_DATA_T_INT64,
394 SPOE_DATA_T_UINT64,
395 SPOE_DATA_T_IPV4,
396 SPOE_DATA_T_IPV6,
397 SPOE_DATA_T_STR,
398 SPOE_DATA_T_BIN,
399 SPOE_DATA_TYPES
400};
401
402/* Masks to get data type or flags value */
403#define SPOE_DATA_T_MASK 0x0F
404#define SPOE_DATA_FL_MASK 0xF0
405
406/* Flags to set Boolean values */
407#define SPOE_DATA_FL_FALSE 0x00
408#define SPOE_DATA_FL_TRUE 0x10
409
410/* Helper to get static string length, excluding the terminating null byte */
411#define SLEN(str) (sizeof(str)-1)
412
413/* Predefined key used in HELLO/DISCONNECT frames */
414#define SUPPORTED_VERSIONS_KEY "supported-versions"
415#define VERSION_KEY "version"
416#define MAX_FRAME_SIZE_KEY "max-frame-size"
417#define CAPABILITIES_KEY "capabilities"
Christopher Fauletba7bc162016-11-07 21:07:38 +0100418#define HEALTHCHECK_KEY "healthcheck"
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200419#define STATUS_CODE_KEY "status-code"
420#define MSG_KEY "message"
421
422struct spoe_version {
423 char *str;
424 int min;
425 int max;
426};
427
428/* All supported versions */
429static struct spoe_version supported_versions[] = {
430 {"1.0", 1000, 1000},
431 {NULL, 0, 0}
432};
433
434/* Comma-separated list of supported versions */
435#define SUPPORTED_VERSIONS_VAL "1.0"
436
437/* Comma-separated list of supported capabilities (none for now) */
438#define CAPABILITIES_VAL ""
439
440static int
441decode_spoe_version(const char *str, size_t len)
442{
443 char tmp[len+1], *start, *end;
444 double d;
445 int vsn = -1;
446
447 memset(tmp, 0, len+1);
448 memcpy(tmp, str, len);
449
450 start = tmp;
451 while (isspace(*start))
452 start++;
453
454 d = strtod(start, &end);
455 if (d == 0 || start == end)
456 goto out;
457
458 if (*end) {
459 while (isspace(*end))
460 end++;
461 if (*end)
462 goto out;
463 }
464 vsn = (int)(d * 1000);
465 out:
466 return vsn;
467}
468
469/* Encode a variable-length integer. This function never fails and returns the
470 * number of written bytes. */
471static int
472encode_spoe_varint(uint64_t i, char *buf)
473{
474 int idx;
475
476 if (i < 240) {
477 buf[0] = (unsigned char)i;
478 return 1;
479 }
480
481 buf[0] = (unsigned char)i | 240;
482 i = (i - 240) >> 4;
483 for (idx = 1; i >= 128; ++idx) {
484 buf[idx] = (unsigned char)i | 128;
485 i = (i - 128) >> 7;
486 }
487 buf[idx++] = (unsigned char)i;
488 return idx;
489}
490
491/* Decode a varable-length integer. If the decoding fails, -1 is returned. This
492 * happens when the buffer's end in reached. On success, the number of read
493 * bytes is returned. */
494static int
495decode_spoe_varint(const char *buf, const char *end, uint64_t *i)
496{
497 unsigned char *msg = (unsigned char *)buf;
498 int idx = 0;
499
500 if (msg > (unsigned char *)end)
501 return -1;
502
503 if (msg[0] < 240) {
504 *i = msg[0];
505 return 1;
506 }
507 *i = msg[0];
508 do {
509 ++idx;
510 if (msg+idx > (unsigned char *)end)
511 return -1;
512 *i += (uint64_t)msg[idx] << (4 + 7 * (idx-1));
513 } while (msg[idx] >= 128);
514 return (idx + 1);
515}
516
517/* Encode a string. The string will be prefix by its length, encoded as a
518 * variable-length integer. This function never fails and returns the number of
519 * written bytes. */
520static int
521encode_spoe_string(const char *str, size_t len, char *dst)
522{
523 int idx = 0;
524
525 if (!len) {
526 dst[0] = 0;
527 return 1;
528 }
529
530 idx += encode_spoe_varint(len, dst);
531 memcpy(dst+idx, str, len);
532 return (idx + len);
533}
534
535/* Decode a string. Its length is decoded first as a variable-length integer. If
536 * it succeeds, and if the string length is valid, the begin of the string is
537 * saved in <*str>, its length is saved in <*len> and the total numbre of bytes
538 * read is returned. If an error occurred, -1 is returned and <*str> remains
539 * NULL. */
540static int
541decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
542{
543 int i, idx = 0;
544
545 *str = NULL;
546 *len = 0;
547
548 if ((i = decode_spoe_varint(buf, end, len)) == -1)
549 goto error;
550 idx += i;
551 if (buf + idx + *len > end)
552 goto error;
553
554 *str = buf+idx;
555 return (idx + *len);
556
557 error:
558 return -1;
559}
560
561/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
562 * of bytes read is returned. A types data is composed of a type (1 byte) and
563 * corresponding data:
564 * - boolean: non additional data (0 bytes)
565 * - integers: a variable-length integer (see decode_spoe_varint)
566 * - ipv4: 4 bytes
567 * - ipv6: 16 bytes
568 * - binary and string: a buffer prefixed by its size, a variable-length
569 * integer (see decode_spoe_string) */
570static int
571skip_spoe_data(char *frame, char *end)
572{
573 uint64_t sz = 0;
574 int i, idx = 0;
575
576 if (frame > end)
577 return -1;
578
579 switch (frame[idx++] & SPOE_DATA_T_MASK) {
580 case SPOE_DATA_T_BOOL:
581 break;
582 case SPOE_DATA_T_INT32:
583 case SPOE_DATA_T_INT64:
584 case SPOE_DATA_T_UINT32:
585 case SPOE_DATA_T_UINT64:
586 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
587 return -1;
588 idx += i;
589 break;
590 case SPOE_DATA_T_IPV4:
591 idx += 4;
592 break;
593 case SPOE_DATA_T_IPV6:
594 idx += 16;
595 break;
596 case SPOE_DATA_T_STR:
597 case SPOE_DATA_T_BIN:
598 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
599 return -1;
600 idx += i + sz;
601 break;
602 }
603
604 if (frame+idx > end)
605 return -1;
606 return idx;
607}
608
609/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
610 * number of read bytes is returned. See skip_spoe_data for details. */
611static int
612decode_spoe_data(char *frame, char *end, struct sample *smp)
613{
614 uint64_t sz = 0;
615 int type, i, idx = 0;
616
617 if (frame > end)
618 return -1;
619
620 type = frame[idx++];
621 switch (type & SPOE_DATA_T_MASK) {
622 case SPOE_DATA_T_BOOL:
623 smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
624 smp->data.type = SMP_T_BOOL;
625 break;
626 case SPOE_DATA_T_INT32:
627 case SPOE_DATA_T_INT64:
628 case SPOE_DATA_T_UINT32:
629 case SPOE_DATA_T_UINT64:
630 if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1)
631 return -1;
632 idx += i;
633 smp->data.type = SMP_T_SINT;
634 break;
635 case SPOE_DATA_T_IPV4:
636 if (frame+idx+4 > end)
637 return -1;
638 memcpy(&smp->data.u.ipv4, frame+idx, 4);
639 smp->data.type = SMP_T_IPV4;
640 idx += 4;
641 break;
642 case SPOE_DATA_T_IPV6:
643 if (frame+idx+16 > end)
644 return -1;
645 memcpy(&smp->data.u.ipv6, frame+idx, 16);
646 smp->data.type = SMP_T_IPV6;
647 idx += 16;
648 break;
649 case SPOE_DATA_T_STR:
650 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
651 return -1;
652 idx += i;
653 if (frame+idx+sz > end)
654 return -1;
655 smp->data.u.str.str = frame+idx;
656 smp->data.u.str.len = sz;
657 smp->data.type = SMP_T_STR;
658 idx += sz;
659 break;
660 case SPOE_DATA_T_BIN:
661 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
662 return -1;
663 idx += i;
664 if (frame+idx+sz > end)
665 return -1;
666 smp->data.u.str.str = frame+idx;
667 smp->data.u.str.len = sz;
668 smp->data.type = SMP_T_BIN;
669 idx += sz;
670 break;
671 }
672
673 if (frame+idx > end)
674 return -1;
675 return idx;
676}
677
678/* Skip an action in a frame received from an agent. If an error occurred, -1 is
679 * returned, otherwise the number of read bytes is returned. An action is
680 * composed of the action type followed by a typed data. */
681static int
682skip_spoe_action(char *frame, char *end)
683{
684 int n, i, idx = 0;
685
686 if (frame+2 > end)
687 return -1;
688
689 idx++; /* Skip the action type */
690 n = frame[idx++];
691 while (n-- > 0) {
692 if ((i = skip_spoe_data(frame+idx, end)) == -1)
693 return -1;
694 idx += i;
695 }
696
697 if (frame+idx > end)
698 return -1;
699 return idx;
700}
701
702/* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on
703 * success, 0 if the frame can be ignored and -1 if an error occurred. */
704static int
705prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
706{
707 int idx = 0;
708 size_t max = (7 /* TYPE + METADATA */
709 + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
710 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
711 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL));
712
713 if (size < max)
714 return -1;
715
716 /* Frame type */
717 frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
718
719 /* No flags for now */
720 memset(frame+idx, 0, 4);
721 idx += 4;
722
723 /* No stream-id and frame-id for HELLO frames */
724 frame[idx++] = 0;
725 frame[idx++] = 0;
726
727 /* There are 3 mandatory items: "supported-versions", "max-frame-size"
728 * and "capabilities" */
729
730 /* "supported-versions" K/V item */
731 idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx);
732 frame[idx++] = SPOE_DATA_T_STR;
733 idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx);
734
735 /* "max-fram-size" K/V item */
736 idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
737 frame[idx++] = SPOE_DATA_T_UINT32;
738 idx += encode_spoe_varint(APPCTX_SPOE(appctx).max_frame_size, frame+idx);
739
740 /* "capabilities" K/V item */
741 idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
742 frame[idx++] = SPOE_DATA_T_STR;
743 idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
744
745 return idx;
746}
747
748/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame
749 * size on success, 0 if the frame can be ignored and -1 if an error
750 * occurred. */
751static int
752prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
753{
754 const char *reason;
755 int rlen, idx = 0;
756 size_t max = (7 /* TYPE + METADATA */
757 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
758 + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
759
760 if (size < max)
761 return -1;
762
763 /* Get the message corresponding to the status code */
764 if (spoe_status_code >= SPOE_FRM_ERRS)
765 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
766 reason = spoe_frm_err_reasons[spoe_status_code];
767 rlen = strlen(reason);
768
769 /* Frame type */
770 frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
771
772 /* No flags for now */
773 memset(frame+idx, 0, 4);
774 idx += 4;
775
776 /* No stream-id and frame-id for DISCONNECT frames */
777 frame[idx++] = 0;
778 frame[idx++] = 0;
779
780 /* There are 2 mandatory items: "status-code" and "message" */
781
782 /* "status-code" K/V item */
783 idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx);
784 frame[idx++] = SPOE_DATA_T_UINT32;
785 idx += encode_spoe_varint(spoe_status_code, frame+idx);
786
787 /* "message" K/V item */
788 idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx);
789 frame[idx++] = SPOE_DATA_T_STR;
790 idx += encode_spoe_string(reason, rlen, frame+idx);
791
792 return idx;
793}
794
795/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
796 * success, 0 if the frame can be ignored and -1 if an error occurred. */
797static int
798prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
799{
800 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
801 int idx = 0;
802
803 if (size < APPCTX_SPOE(appctx).max_frame_size)
804 return -1;
805
806 frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
807
808 /* No flags for now */
809 memset(frame+idx, 0, 4);
810 idx += 4;
811
812 /* Set stream-id and frame-id */
813 idx += encode_spoe_varint(ctx->stream_id, frame+idx);
814 idx += encode_spoe_varint(ctx->frame_id, frame+idx);
815
816 /* Copy encoded messages */
817 memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
818 idx += ctx->buffer->i;
819
820 return idx;
821}
822
823/* Decode HELLO frame sent by an agent. It returns the number of by read bytes
824 * on success, 0 if the frame can be ignored and -1 if an error occurred. */
825static int
826handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
827{
828 int vsn, max_frame_size;
829 int i, idx = 0;
830 size_t min_size = (7 /* TYPE + METADATA */
831 + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
832 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
833 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
834
835 /* Check frame type */
836 if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
837 return 0;
838
839 if (size < min_size) {
840 spoe_status_code = SPOE_FRM_ERR_INVALID;
841 return -1;
842 }
843
844 /* Skip flags: fragmentation is not supported for now */
845 idx += 4;
846
847 /* stream-id and frame-id must be cleared */
848 if (frame[idx] != 0 || frame[idx+1] != 0) {
849 spoe_status_code = SPOE_FRM_ERR_INVALID;
850 return -1;
851 }
852 idx += 2;
853
854 /* There are 3 mandatory items: "version", "max-frame-size" and
855 * "capabilities" */
856
857 /* Loop on K/V items */
858 vsn = max_frame_size = 0;
859 while (idx < size) {
860 char *str;
861 uint64_t sz;
862
863 /* Decode the item key */
864 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
865 if (str == NULL) {
866 spoe_status_code = SPOE_FRM_ERR_INVALID;
867 return -1;
868 }
869 /* Check "version" K/V item */
870 if (!memcmp(str, VERSION_KEY, sz)) {
871 /* The value must be a string */
872 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
873 spoe_status_code = SPOE_FRM_ERR_INVALID;
874 return -1;
875 }
876 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
877 if (str == NULL) {
878 spoe_status_code = SPOE_FRM_ERR_INVALID;
879 return -1;
880 }
881
882 vsn = decode_spoe_version(str, sz);
883 if (vsn == -1) {
884 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
885 return -1;
886 }
887 for (i = 0; supported_versions[i].str != NULL; ++i) {
888 if (vsn >= supported_versions[i].min &&
889 vsn <= supported_versions[i].max)
890 break;
891 }
892 if (supported_versions[i].str == NULL) {
893 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
894 return -1;
895 }
896 }
897 /* Check "max-frame-size" K/V item */
898 else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
899 int type;
900
901 /* The value must be integer */
902 type = frame[idx++];
903 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
904 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
905 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
906 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
907 spoe_status_code = SPOE_FRM_ERR_INVALID;
908 return -1;
909 }
910 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
911 spoe_status_code = SPOE_FRM_ERR_INVALID;
912 return -1;
913 }
914 idx += i;
915 if (sz < MIN_FRAME_SIZE || sz > APPCTX_SPOE(appctx).max_frame_size) {
916 spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
917 return -1;
918 }
919 max_frame_size = sz;
920 }
921 /* Skip "capabilities" K/V item for now */
922 else {
923 /* Silently ignore unknown item */
924 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
925 spoe_status_code = SPOE_FRM_ERR_INVALID;
926 return -1;
927 }
928 idx += i;
929 }
930 }
931
932 /* Final checks */
933 if (!vsn) {
934 spoe_status_code = SPOE_FRM_ERR_NO_VSN;
935 return -1;
936 }
937 if (!max_frame_size) {
938 spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
939 return -1;
940 }
941
942 APPCTX_SPOE(appctx).version = (unsigned int)vsn;
943 APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
944 return idx;
945}
946
947/* Decode DISCONNECT frame sent by an agent. It returns the number of by read
948 * bytes on success, 0 if the frame can be ignored and -1 if an error
949 * occurred. */
950static int
951handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
952{
953 int i, idx = 0;
954 size_t min_size = (7 /* TYPE + METADATA */
955 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
956 + 1 + SLEN(MSG_KEY) + 1 + 1);
957
958 /* Check frame type */
959 if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
960 return 0;
961
962 if (size < min_size) {
963 spoe_status_code = SPOE_FRM_ERR_INVALID;
964 return -1;
965 }
966
967 /* Skip flags: fragmentation is not supported for now */
968 idx += 4;
969
970 /* stream-id and frame-id must be cleared */
971 if (frame[idx] != 0 || frame[idx+1] != 0) {
972 spoe_status_code = SPOE_FRM_ERR_INVALID;
973 return -1;
974 }
975 idx += 2;
976
977 /* There are 2 mandatory items: "status-code" and "message" */
978
979 /* Loop on K/V items */
980 while (idx < size) {
981 char *str;
982 uint64_t sz;
983
984 /* Decode the item key */
985 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
986 if (str == NULL) {
987 spoe_status_code = SPOE_FRM_ERR_INVALID;
988 return -1;
989 }
990
991 /* Check "status-code" K/V item */
992 if (!memcmp(str, STATUS_CODE_KEY, sz)) {
993 int type;
994
995 /* The value must be an integer */
996 type = frame[idx++];
997 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
998 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
999 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
1000 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
1001 spoe_status_code = SPOE_FRM_ERR_INVALID;
1002 return -1;
1003 }
1004 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
1005 spoe_status_code = SPOE_FRM_ERR_INVALID;
1006 return -1;
1007 }
1008 idx += i;
1009 spoe_status_code = sz;
1010 }
1011
1012 /* Check "message" K/V item */
1013 else if (sz && !memcmp(str, MSG_KEY, sz)) {
1014 /* The value must be a string */
1015 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1016 spoe_status_code = SPOE_FRM_ERR_INVALID;
1017 return -1;
1018 }
1019 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1020 if (str == NULL || sz > 255) {
1021 spoe_status_code = SPOE_FRM_ERR_INVALID;
1022 return -1;
1023 }
1024 memcpy(spoe_reason, str, sz);
1025 spoe_reason[sz] = 0;
1026 }
1027 else {
1028 /* Silently ignore unknown item */
1029 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1030 spoe_status_code = SPOE_FRM_ERR_INVALID;
1031 return -1;
1032 }
1033 idx += i;
1034 }
1035 }
1036
1037 return idx;
1038}
1039
1040
1041/* Decode ACK frame sent by an agent. It returns the number of by read bytes on
1042 * success, 0 if the frame can be ignored and -1 if an error occurred. */
1043static int
1044handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
1045{
1046 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1047 uint64_t stream_id, frame_id;
1048 int idx = 0;
1049 size_t min_size = (7 /* TYPE + METADATA */);
1050
1051 /* Check frame type */
1052 if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
1053 return 0;
1054
1055 if (size < min_size) {
1056 spoe_status_code = SPOE_FRM_ERR_INVALID;
1057 return -1;
1058 }
1059
1060 /* Skip flags: fragmentation is not supported for now */
1061 idx += 4;
1062
1063 /* Get the stream-id and the frame-id */
1064 idx += decode_spoe_varint(frame+idx, frame+size, &stream_id);
1065 idx += decode_spoe_varint(frame+idx, frame+size, &frame_id);
1066
1067 /* Check stream-id and frame-id */
1068 if (ctx->stream_id != (unsigned int)stream_id ||
1069 ctx->frame_id != (unsigned int)frame_id)
1070 return 0;
1071
1072 /* Copy encoded actions */
1073 b_reset(ctx->buffer);
1074 memcpy(ctx->buffer->p, frame+idx, size-idx);
1075 ctx->buffer->i = size-idx;
1076
1077 return idx;
1078}
1079
Christopher Fauletba7bc162016-11-07 21:07:38 +01001080/* This function is used in cfgparse.c and declared in proto/checks.h. It
1081 * prepare the request to send to agents during a healthcheck. It returns 0 on
1082 * success and -1 if an error occurred. */
1083int
1084prepare_spoe_healthcheck_request(char **req, int *len)
1085{
1086 struct appctx a;
1087 char *frame, buf[global.tune.bufsize];
1088 unsigned int framesz;
1089 int idx;
1090
1091 memset(&a, 0, sizeof(a));
1092 memset(buf, 0, sizeof(buf));
1093 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1094
1095 frame = buf+4;
1096 idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4);
1097 if (idx <= 0)
1098 return -1;
1099 if (idx + SLEN(HEALTHCHECK_KEY) + 1 > global.tune.bufsize-4)
1100 return -1;
1101
1102 /* "healthcheck" K/V item */
1103 idx += encode_spoe_string(HEALTHCHECK_KEY, SLEN(HEALTHCHECK_KEY), frame+idx);
1104 frame[idx++] = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE);
1105
1106 framesz = htonl(idx);
1107 memcpy(buf, (char *)&framesz, 4);
1108
1109 if ((*req = malloc(idx+4)) == NULL)
1110 return -1;
1111 memcpy(*req, buf, idx+4);
1112 *len = idx+4;
1113 return 0;
1114}
1115
1116/* This function is used in checks.c and declared in proto/checks.h. It decode
1117 * the response received from an agent during a healthcheck. It returns 0 on
1118 * success and -1 if an error occurred. */
1119int
1120handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen)
1121{
1122 struct appctx a;
1123 int r;
1124
1125 memset(&a, 0, sizeof(a));
1126 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1127
1128 if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0)
1129 goto error;
1130 if ((r = handle_spoe_agenthello_frame(&a, frame, size)) <= 0) {
1131 if (r == 0)
1132 spoe_status_code = SPOE_FRM_ERR_INVALID;
1133 goto error;
1134 }
1135
1136 return 0;
1137
1138 error:
1139 if (spoe_status_code >= SPOE_FRM_ERRS)
1140 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
1141 strncpy(err, spoe_frm_err_reasons[spoe_status_code], errlen);
1142 return -1;
1143}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001144
1145/********************************************************************
1146 * Functions that manage the SPOE applet
1147 ********************************************************************/
1148/* Callback function that catches applet timeouts. If a timeout occurred, we set
1149 * <appctx->st1> flag and the SPOE applet is woken up. */
1150static struct task *
1151process_spoe_applet(struct task * task)
1152{
1153 struct appctx *appctx = task->context;
1154
1155 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1156 if (tick_is_expired(task->expire, now_ms)) {
1157 task->expire = TICK_ETERNITY;
1158 appctx->st1 = SPOE_APPCTX_ERR_TOUT;
1159 }
1160 si_applet_want_get(appctx->owner);
1161 appctx_wakeup(appctx);
1162 return task;
1163}
1164
1165/* Remove a SPOE applet from the agent cache */
1166static void
1167remove_spoe_applet_from_cache(struct appctx *appctx)
1168{
1169 struct appctx *a, *back;
1170 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1171
1172 if (LIST_ISEMPTY(&agent->cache))
1173 return;
1174
1175 list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) {
1176 if (a == appctx) {
1177 LIST_DEL(&APPCTX_SPOE(appctx).list);
1178 break;
1179 }
1180 }
1181}
1182
1183
1184/* Callback function that releases a SPOE applet. This happens when the
1185 * connection with the agent is closed. */
1186static void
1187release_spoe_applet(struct appctx *appctx)
1188{
1189 struct stream_interface *si = appctx->owner;
1190 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1191 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1192
1193 if (appctx->st0 == SPOE_APPCTX_ST_CONNECT ||
1194 appctx->st0 == SPOE_APPCTX_ST_CONNECTING)
1195 on_new_spoe_appctx_failure(agent);
1196
1197 if (appctx->st0 != SPOE_APPCTX_ST_END) {
1198 si_shutw(si);
1199 si_shutr(si);
1200 si_ic(si)->flags |= CF_READ_NULL;
1201 appctx->st0 = SPOE_APPCTX_ST_END;
1202 }
1203
1204 if (ctx != NULL) {
1205 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1206 ctx->appctx = NULL;
1207 }
1208
1209 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
1210 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1211 __FUNCTION__, appctx);
1212
1213 /* Release the task attached to the SPOE applet */
1214 if (APPCTX_SPOE(appctx).task) {
1215 task_delete(APPCTX_SPOE(appctx).task);
1216 task_free(APPCTX_SPOE(appctx).task);
1217 }
1218
1219 /* And remove it from the agent cache */
1220 remove_spoe_applet_from_cache(appctx);
1221 APPCTX_SPOE(appctx).ctx = NULL;
1222}
1223
1224/* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when
1225 * the frame can be ignored, 0 to retry later and 1 on success. The frame is
1226 * encoded using the callback function <prepare>. */
1227static int
1228send_spoe_frame(struct appctx *appctx,
1229 int (*prepare)(struct appctx *, char *, size_t))
1230{
1231 struct stream_interface *si = appctx->owner;
1232 int framesz, ret;
1233 uint32_t netint;
1234
1235 ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
1236 if (ret <= 0)
1237 goto skip_or_error;
1238 framesz = ret;
1239 netint = htonl(framesz);
1240 ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint));
1241 if (ret > 0)
1242 ret = bi_putblk(si_ic(si), trash.str, framesz);
1243 if (ret <= 0) {
1244 if (ret == -1)
1245 return -1;
1246 return -2;
1247 }
1248 return 1;
1249
1250 skip_or_error:
1251 if (!ret)
1252 return -1;
1253 return -2;
1254}
1255
1256/* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1
1257 * when the frame can be ignored, 0 to retry later and 1 on success. The frame
1258 * is decoded using the callback function <handle>. */
1259static int
1260recv_spoe_frame(struct appctx *appctx,
1261 int (*handle)(struct appctx *, char *, size_t))
1262{
1263 struct stream_interface *si = appctx->owner;
1264 int framesz, ret;
1265 uint32_t netint;
1266
1267 ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0);
1268 if (ret <= 0)
1269 goto empty_or_error;
1270 framesz = ntohl(netint);
1271 if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
1272 spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
1273 return -2;
1274 }
1275
1276 ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint));
1277 if (ret <= 0)
1278 goto empty_or_error;
1279 bo_skip(si_oc(si), ret+sizeof(netint));
1280
1281 /* First check if the received frame is a DISCONNECT frame */
1282 ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz);
1283 if (ret != 0) {
1284 if (ret > 0) {
1285 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1286 " - disconnected by peer (%d): %s\n",
1287 (int)now.tv_sec, (int)now.tv_usec,
1288 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1289 __FUNCTION__, appctx, spoe_status_code,
1290 spoe_reason);
1291 return 2;
1292 }
1293 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1294 " - error on frame (%s)\n",
1295 (int)now.tv_sec, (int)now.tv_usec,
1296 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1297 __FUNCTION__, appctx,
1298 spoe_frm_err_reasons[spoe_status_code]);
1299 return -2;
1300 }
1301 if (handle == NULL)
1302 goto out;
1303
1304 /* If not, try to decode it */
1305 ret = handle(appctx, trash.str, framesz);
1306 if (ret <= 0) {
1307 if (!ret)
1308 return -1;
1309 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1310 " - error on frame (%s)\n",
1311 (int)now.tv_sec, (int)now.tv_usec,
1312 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1313 __FUNCTION__, appctx,
1314 spoe_frm_err_reasons[spoe_status_code]);
1315 return -2;
1316 }
1317 out:
1318 return 1;
1319
1320 empty_or_error:
1321 if (!ret)
1322 return 0;
1323 spoe_status_code = SPOE_FRM_ERR_IO;
1324 return -2;
1325}
1326
1327/* I/O Handler processing messages exchanged with the agent */
1328static void
1329handle_spoe_applet(struct appctx *appctx)
1330{
1331 struct stream_interface *si = appctx->owner;
1332 struct stream *s = si_strm(si);
1333 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1334 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1335 int ret;
1336
1337 switchstate:
1338 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1339 " - appctx-state=%s\n",
1340 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1341 __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);
1342
1343 switch (appctx->st0) {
1344 case SPOE_APPCTX_ST_CONNECT:
1345 spoe_status_code = SPOE_FRM_ERR_NONE;
1346 if (si->state <= SI_ST_CON) {
1347 si_applet_want_put(si);
1348 task_wakeup(s->task, TASK_WOKEN_MSG);
1349 break;
1350 }
1351 else if (si->state != SI_ST_EST) {
1352 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1353 on_new_spoe_appctx_failure(agent);
1354 goto switchstate;
1355 }
1356 ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame);
1357 if (ret < 0) {
1358 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1359 on_new_spoe_appctx_failure(agent);
1360 goto switchstate;
1361 }
1362 else if (!ret)
1363 goto full;
1364
1365 /* Hello frame was sent. Set the hello timeout and
1366 * wait for the reply. */
1367 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
1368 appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
1369 /* fall through */
1370
1371 case SPOE_APPCTX_ST_CONNECTING:
1372 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1373 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1374 on_new_spoe_appctx_failure(agent);
1375 goto switchstate;
1376 }
1377 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1378 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1379 " - Connection timed out\n",
1380 (int)now.tv_sec, (int)now.tv_usec,
1381 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1382 __FUNCTION__, appctx);
1383 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1384 on_new_spoe_appctx_failure(agent);
1385 goto switchstate;
1386 }
1387 ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame);
1388 if (ret < 0) {
1389 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1390 on_new_spoe_appctx_failure(agent);
1391 goto switchstate;
1392 }
1393 if (ret == 2) {
1394 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1395 on_new_spoe_appctx_failure(agent);
1396 goto switchstate;
1397 }
1398 if (!ret)
1399 goto out;
1400
1401 /* hello handshake is finished, set the idle timeout,
1402 * Add the appctx in the agent cache, decrease the
1403 * number of new applets and wake up waiting streams. */
1404 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1405 appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
1406 on_new_spoe_appctx_success(agent, appctx);
1407 break;
1408
1409 case SPOE_APPCTX_ST_PROCESSING:
1410 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1411 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1412 goto switchstate;
1413 }
1414 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1415 spoe_status_code = SPOE_FRM_ERR_TOUT;
1416 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1417 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1418 goto switchstate;
1419 }
1420 if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
1421 ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame);
1422 if (ret < 0) {
1423 if (ret == -1) {
1424 ctx->state = SPOE_CTX_ST_ERROR;
1425 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1426 goto skip_notify_frame;
1427 }
1428 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1429 goto switchstate;
1430 }
1431 else if (!ret)
1432 goto full;
1433 ctx->state = SPOE_CTX_ST_WAITING_ACK;
1434 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.ack);
1435 }
1436
1437 skip_notify_frame:
1438 if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) {
1439 ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame);
1440 if (ret < 0) {
1441 if (ret == -1)
1442 goto skip_notify_frame;
1443 ctx->state = SPOE_CTX_ST_ERROR;
1444 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1445 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1446 goto switchstate;
1447 }
1448 if (!ret)
1449 goto out;
1450 if (ret == 2) {
1451 ctx->state = SPOE_CTX_ST_ERROR;
1452 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1453 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1454 goto switchstate;
1455 }
1456 ctx->state = SPOE_CTX_ST_DONE;
1457 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1458 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1459 }
1460 else {
1461 if (stopping) {
1462 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1463 goto switchstate;
1464 }
1465
1466 ret = recv_spoe_frame(appctx, NULL);
1467 if (ret < 0) {
1468 if (ret == -1)
1469 goto skip_notify_frame;
1470 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1471 goto switchstate;
1472 }
1473 if (!ret)
1474 goto out;
1475 if (ret == 2) {
1476 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1477 goto switchstate;
1478 }
1479 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1480 }
1481 break;
1482
1483 case SPOE_APPCTX_ST_DISCONNECT:
1484 ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame);
1485 if (ret < 0) {
1486 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1487 goto switchstate;
1488 }
1489 else if (!ret)
1490 goto full;
1491 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1492 " - disconnected by HAProxy (%d): %s\n",
1493 (int)now.tv_sec, (int)now.tv_usec,
1494 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1495 __FUNCTION__, appctx, spoe_status_code,
1496 spoe_frm_err_reasons[spoe_status_code]);
1497
1498 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.ack);
1499 appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
1500 /* fall through */
1501
1502 case SPOE_APPCTX_ST_DISCONNECTING:
1503 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1504 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1505 goto switchstate;
1506 }
1507 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1508 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1509 goto switchstate;
1510 }
1511 ret = recv_spoe_frame(appctx, NULL);
1512 if (ret < 0 || ret == 2) {
1513 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1514 goto switchstate;
1515 }
1516 break;
1517
1518 case SPOE_APPCTX_ST_EXIT:
1519 si_shutw(si);
1520 si_shutr(si);
1521 si_ic(si)->flags |= CF_READ_NULL;
1522 appctx->st0 = SPOE_APPCTX_ST_END;
1523 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1524 /* fall through */
1525
1526 case SPOE_APPCTX_ST_END:
1527 break;
1528 }
1529
1530 out:
1531 if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
1532 task_queue(APPCTX_SPOE(appctx).task);
1533 si_oc(si)->flags |= CF_READ_DONTWAIT;
1534 task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
1535 return;
1536 full:
1537 si_applet_cant_put(si);
1538 goto out;
1539}
1540
1541struct applet spoe_applet = {
1542 .obj_type = OBJ_TYPE_APPLET,
1543 .name = "<SPOE>", /* used for logging */
1544 .fct = handle_spoe_applet,
1545 .release = release_spoe_applet,
1546};
1547
1548/* Create a SPOE applet. On success, the created applet is returned, else
1549 * NULL. */
1550static struct appctx *
1551create_spoe_appctx(struct spoe_config *conf)
1552{
1553 struct appctx *appctx;
1554 struct session *sess;
1555 struct task *task;
1556 struct stream *strm;
1557 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
1558 struct listener *, by_fe);
1559
1560 if ((appctx = appctx_new(&spoe_applet)) == NULL)
1561 goto out_error;
1562
1563 appctx->st0 = SPOE_APPCTX_ST_CONNECT;
1564 if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
1565 goto out_free_appctx;
1566 APPCTX_SPOE(appctx).task->process = process_spoe_applet;
1567 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1568 APPCTX_SPOE(appctx).task->context = appctx;
1569 APPCTX_SPOE(appctx).agent = conf->agent;
1570 APPCTX_SPOE(appctx).ctx = NULL;
1571 APPCTX_SPOE(appctx).version = 0;
1572 APPCTX_SPOE(appctx).max_frame_size = global.tune.bufsize;
1573 task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
1574
1575 sess = session_new(&conf->agent_fe, l, &appctx->obj_type);
1576 if (!sess)
1577 goto out_free_spoe;
1578
1579 if ((task = task_new()) == NULL)
1580 goto out_free_sess;
1581
1582 if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
1583 goto out_free_task;
1584
1585 strm->target = sess->listener->default_target;
1586 strm->req.analysers |= sess->listener->analysers;
1587 stream_set_backend(strm, conf->agent->b.be);
1588
1589 /* applet is waiting for data */
1590 si_applet_cant_get(&strm->si[0]);
1591 appctx_wakeup(appctx);
1592
1593 /* Increase the number of applets waiting the end of the hello
1594 * handshake. */
1595 conf->agent->new_applets++;
1596
1597 strm->do_log = NULL;
1598 strm->res.flags |= CF_READ_DONTWAIT;
1599
1600 conf->agent_fe.feconn++;
1601 jobs++;
1602 totalconn++;
1603
1604 return appctx;
1605
1606 /* Error unrolling */
1607 out_free_task:
1608 task_free(task);
1609 out_free_sess:
1610 session_free(sess);
1611 out_free_spoe:
1612 task_free(APPCTX_SPOE(appctx).task);
1613 out_free_appctx:
1614 appctx_free(appctx);
1615 out_error:
1616 return NULL;
1617}
1618
1619/* Wake up a SPOE applet attached to a SPOE context. */
1620static void
1621wakeup_spoe_appctx(struct spoe_context *ctx)
1622{
1623 if (ctx->appctx == NULL)
1624 return;
1625 if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) {
1626 si_applet_want_get(ctx->appctx->owner);
1627 si_applet_want_put(ctx->appctx->owner);
1628 appctx_wakeup(ctx->appctx);
1629 }
1630}
1631
1632
1633/* Run across the list of pending streams waiting for a SPOE applet and wake the
1634 * first. */
1635static void
1636offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx)
1637{
1638 struct spoe_context *ctx;
1639
Christopher Fauletf7a30922016-11-10 15:04:51 +01001640 if (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING)
1641 return;
1642
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001643 if (LIST_ISEMPTY(&agent->applet_wq))
1644 LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
1645 else {
1646 ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait);
1647 APPCTX_SPOE(appctx).ctx = ctx;
1648 ctx->appctx = appctx;
1649 LIST_DEL(&ctx->applet_wait);
1650 LIST_INIT(&ctx->applet_wait);
1651 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1652 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1653 " - wake up stream to get available SPOE applet\n",
1654 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1655 __FUNCTION__, ctx->strm);
1656 }
1657}
1658
1659/* A failure occurred during SPOE applet creation. */
1660static void
1661on_new_spoe_appctx_failure(struct spoe_agent *agent)
1662{
1663 struct spoe_context *ctx;
1664
1665 agent->new_applets--;
1666 list_for_each_entry(ctx, &agent->applet_wq, applet_wait) {
1667 ctx->errs++;
1668 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1669 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1670 " - wake up stream because to SPOE applet connection failed\n",
1671 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1672 __FUNCTION__, ctx->strm);
1673 }
1674}
1675
1676static void
1677on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx)
1678{
1679 agent->new_applets--;
1680 offer_spoe_appctx(agent, appctx);
1681}
1682/* Retrieve a SPOE applet from the agent cache if possible, else create it. It
1683 * returns 1 on success, 0 to retry later and -1 if an error occurred. */
1684static int
1685acquire_spoe_appctx(struct spoe_context *ctx, int dir)
1686{
1687 struct spoe_config *conf = FLT_CONF(ctx->filter);
1688 struct spoe_agent *agent = conf->agent;
1689 struct appctx *appctx;
1690
1691 /* If a process is already started for this SPOE context, retry
1692 * later. */
1693 if (ctx->flags & SPOE_CTX_FL_PROCESS)
1694 goto wait;
1695
1696 /* If needed, initialize the buffer that will be used to encode messages
1697 * and decode actions. */
1698 if (ctx->buffer == &buf_empty) {
1699 if (!LIST_ISEMPTY(&ctx->buffer_wait)) {
1700 LIST_DEL(&ctx->buffer_wait);
1701 LIST_INIT(&ctx->buffer_wait);
1702 }
1703
1704 if (!b_alloc_margin(&ctx->buffer, 0)) {
1705 LIST_ADDQ(&buffer_wq, &ctx->buffer_wait);
1706 goto wait;
1707 }
1708 }
1709
1710 /* If the SPOE applet was already set, all is done. */
1711 if (ctx->appctx)
1712 goto success;
1713
1714 /* Else try to retrieve it from the agent cache */
1715 if (!LIST_ISEMPTY(&agent->cache)) {
1716 appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list);
1717 LIST_DEL(&APPCTX_SPOE(appctx).list);
1718 APPCTX_SPOE(appctx).ctx = ctx;
1719 ctx->appctx = appctx;
1720 goto success;
1721 }
1722
1723 /* If there is no server up for the agent's backend or it too many
1724 * failure occurred, this is an error. */
1725 if ((!agent->b.be->srv_act && !agent->b.be->srv_bck) ||
1726 ctx->errs >= MAX_NEW_SPOE_APPLET_ERRS)
1727 goto error;
1728
1729 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1730 " - waiting for available SPOE appctx\n",
1731 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
1732 ctx->strm);
1733
1734 /* Else add the stream in the waiting queue. */
1735 if (LIST_ISEMPTY(&ctx->applet_wait))
1736 LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait);
1737
1738 /* Finally, create new SPOE applet if we can */
1739 if (agent->new_applets < MAX_NEW_SPOE_APPLETS) {
1740 if (create_spoe_appctx(conf) == NULL)
1741 goto error;
1742 }
1743
1744 wait:
1745 return 0;
1746
1747 success:
1748 /* Remove the stream from the waiting queue */
1749 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1750 LIST_DEL(&ctx->applet_wait);
1751 LIST_INIT(&ctx->applet_wait);
1752 }
1753
1754 /* Set the right flag to prevent request and response processing
1755 * in same time. */
1756 ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
1757 ? SPOE_CTX_FL_REQ_PROCESS
1758 : SPOE_CTX_FL_RSP_PROCESS);
1759 ctx->errs = 0;
1760
1761 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1762 " - acquire SPOE appctx %p from cache\n",
1763 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1764 __FUNCTION__, ctx->strm, ctx->appctx);
1765 return 1;
1766
1767 error:
1768 /* Remove the stream from the waiting queue */
1769 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1770 LIST_DEL(&ctx->applet_wait);
1771 LIST_INIT(&ctx->applet_wait);
1772 }
1773
1774 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1775 " - failed to acquire SPOE appctx errs=%u\n",
1776 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1777 __FUNCTION__, ctx->strm, ctx->errs);
1778 send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n");
1779
1780 return -1;
1781}
1782
1783/* Release a SPOE applet and push it in the agent cache. */
1784static void
1785release_spoe_appctx(struct spoe_context *ctx)
1786{
1787 struct spoe_config *conf = FLT_CONF(ctx->filter);
1788 struct spoe_agent *agent = conf->agent;
1789 struct appctx *appctx = ctx->appctx;
1790
1791 /* Reset the flag to allow next processing */
1792 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
1793
Christopher Fauletf7a30922016-11-10 15:04:51 +01001794 /* Reset processing timer */
1795 ctx->process_exp = TICK_ETERNITY;
1796
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001797 /* Release the buffer if needed */
1798 if (ctx->buffer != &buf_empty) {
1799 b_free(&ctx->buffer);
1800 if (!LIST_ISEMPTY(&buffer_wq))
1801 stream_offer_buffers();
1802 }
1803
1804 /* If there is no SPOE applet, all is done */
1805 if (!appctx)
1806 return;
1807
1808 /* Else, reassign it or push it in the agent cache */
1809 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1810 " - release SPOE appctx %p\n",
1811 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1812 __FUNCTION__, ctx->strm, appctx);
1813
1814 APPCTX_SPOE(appctx).ctx = NULL;
1815 ctx->appctx = NULL;
1816 offer_spoe_appctx(agent, appctx);
1817}
1818
1819/***************************************************************************
1820 * Functions that process SPOE messages and actions
1821 **************************************************************************/
1822/* Process SPOE messages for a specific event. During the processing, it returns
1823 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1824 * is returned. */
1825static int
1826process_spoe_messages(struct stream *s, struct spoe_context *ctx,
1827 struct list *messages, int dir)
1828{
1829 struct spoe_message *msg;
1830 struct sample *smp;
1831 struct spoe_arg *arg;
1832 char *p;
1833 size_t max_size;
1834 int off, flag, idx = 0;
1835
1836 /* Reserve 32 bytes from the frame Metadata */
1837 max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32;
1838
1839 b_reset(ctx->buffer);
1840 p = ctx->buffer->p;
1841
1842 /* Loop on messages */
1843 list_for_each_entry(msg, messages, list) {
1844 if (idx + msg->id_len + 1 > max_size)
1845 goto skip;
1846
1847 /* Set the message name */
1848 idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
1849
1850 /* Save offset where to store the number of arguments for this
1851 * message */
1852 off = idx++;
1853 p[off] = 0;
1854
1855 /* Loop on arguments */
1856 list_for_each_entry(arg, &msg->args, list) {
1857 p[off]++; /* Increment the number of arguments */
1858
1859 if (idx + arg->name_len + 1 > max_size)
1860 goto skip;
1861
1862 /* Encode the arguement name as a string. It can by NULL */
1863 idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
1864
1865 /* Fetch the arguement value */
1866 smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
1867 if (!smp) {
1868 /* If no value is available, set it to NULL */
1869 p[idx++] = SPOE_DATA_T_NULL;
1870 continue;
1871 }
1872
1873 /* Else, encode the arguement value */
1874 switch (smp->data.type) {
1875 case SMP_T_BOOL:
1876 flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
1877 p[idx++] = (SPOE_DATA_T_BOOL | flag);
1878 break;
1879 case SMP_T_SINT:
1880 p[idx++] = SPOE_DATA_T_INT64;
1881 if (idx + 8 > max_size)
1882 goto skip;
1883 idx += encode_spoe_varint(smp->data.u.sint, p+idx);
1884 break;
1885 case SMP_T_IPV4:
1886 p[idx++] = SPOE_DATA_T_IPV4;
1887 if (idx + 4 > max_size)
1888 goto skip;
1889 memcpy(p+idx, &smp->data.u.ipv4, 4);
1890 idx += 4;
1891 break;
1892 case SMP_T_IPV6:
1893 p[idx++] = SPOE_DATA_T_IPV6;
1894 if (idx + 16 > max_size)
1895 goto skip;
1896 memcpy(p+idx, &smp->data.u.ipv6, 16);
1897 idx += 16;
1898 break;
1899 case SMP_T_STR:
1900 p[idx++] = SPOE_DATA_T_STR;
1901 if (idx + smp->data.u.str.len > max_size)
1902 goto skip;
1903 idx += encode_spoe_string(smp->data.u.str.str,
1904 smp->data.u.str.len,
1905 p+idx);
1906 break;
1907 case SMP_T_BIN:
1908 p[idx++] = SPOE_DATA_T_BIN;
1909 if (idx + smp->data.u.str.len > max_size)
1910 goto skip;
1911 idx += encode_spoe_string(smp->data.u.str.str,
1912 smp->data.u.str.len,
1913 p+idx);
1914 break;
1915 case SMP_T_METH:
1916 if (smp->data.u.meth.meth == HTTP_METH_OTHER) {
1917 p[idx++] = SPOE_DATA_T_STR;
1918 if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size)
1919 goto skip;
1920 idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name,
1921 http_known_methods[smp->data.u.meth.meth].len,
1922 p+idx);
1923 }
1924 else {
1925 p[idx++] = SPOE_DATA_T_STR;
1926 if (idx + smp->data.u.str.len > max_size)
1927 goto skip;
1928 idx += encode_spoe_string(smp->data.u.meth.str.str,
1929 smp->data.u.meth.str.len,
1930 p+idx);
1931 }
1932 break;
1933 default:
1934 p[idx++] = SPOE_DATA_T_NULL;
1935 }
1936 }
1937 }
1938 ctx->buffer->i = idx;
1939 return 1;
1940
1941 skip:
1942 b_reset(ctx->buffer);
1943 return 0;
1944}
1945
1946/* Helper function to set a variable */
1947static void
1948set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1949 struct sample *smp)
1950{
1951 struct spoe_config *conf = FLT_CONF(ctx->filter);
1952 struct spoe_agent *agent = conf->agent;
1953 char varname[64];
1954
1955 memset(varname, 0, sizeof(varname));
1956 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1957 scope, agent->var_pfx, len, name);
1958 vars_set_by_name_ifexist(varname, len, smp);
1959}
1960
1961/* Helper function to unset a variable */
1962static void
1963unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1964 struct sample *smp)
1965{
1966 struct spoe_config *conf = FLT_CONF(ctx->filter);
1967 struct spoe_agent *agent = conf->agent;
1968 char varname[64];
1969
1970 memset(varname, 0, sizeof(varname));
1971 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1972 scope, agent->var_pfx, len, name);
1973 vars_unset_by_name_ifexist(varname, len, smp);
1974}
1975
1976
1977/* Process SPOE actions for a specific event. During the processing, it returns
1978 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1979 * is returned. */
1980static int
1981process_spoe_actions(struct stream *s, struct spoe_context *ctx,
1982 enum spoe_event ev, int dir)
1983{
1984 char *p;
1985 size_t size;
1986 int off, i, idx = 0;
1987
1988 p = ctx->buffer->p;
1989 size = ctx->buffer->i;
1990
1991 while (idx < size) {
1992 char *str;
1993 uint64_t sz;
1994 struct sample smp;
1995 enum spoe_action_type type;
1996
1997 off = idx;
1998 if (idx+2 > size)
1999 goto skip;
2000
2001 type = p[idx++];
2002 switch (type) {
2003 case SPOE_ACT_T_SET_VAR: {
2004 char *scope;
2005
2006 if (p[idx++] != 3)
2007 goto skip_action;
2008
2009 switch (p[idx++]) {
2010 case SPOE_SCOPE_PROC: scope = "proc"; break;
2011 case SPOE_SCOPE_SESS: scope = "sess"; break;
2012 case SPOE_SCOPE_TXN : scope = "txn"; break;
2013 case SPOE_SCOPE_REQ : scope = "req"; break;
2014 case SPOE_SCOPE_RES : scope = "res"; break;
2015 default: goto skip;
2016 }
2017
2018 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2019 if (str == NULL)
2020 goto skip;
2021 memset(&smp, 0, sizeof(smp));
2022 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2023 if (decode_spoe_data(p+idx, p+size, &smp) == -1)
2024 goto skip;
2025
2026 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2027 " - set-var '%s.%s.%.*s'\n",
2028 (int)now.tv_sec, (int)now.tv_usec,
2029 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2030 __FUNCTION__, s, scope,
2031 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2032 (int)sz, str);
2033
2034 set_spoe_var(ctx, scope, str, sz, &smp);
2035 break;
2036 }
2037
2038 case SPOE_ACT_T_UNSET_VAR: {
2039 char *scope;
2040
2041 if (p[idx++] != 2)
2042 goto skip_action;
2043
2044 switch (p[idx++]) {
2045 case SPOE_SCOPE_PROC: scope = "proc"; break;
2046 case SPOE_SCOPE_SESS: scope = "sess"; break;
2047 case SPOE_SCOPE_TXN : scope = "txn"; break;
2048 case SPOE_SCOPE_REQ : scope = "req"; break;
2049 case SPOE_SCOPE_RES : scope = "res"; break;
2050 default: goto skip;
2051 }
2052
2053 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2054 if (str == NULL)
2055 goto skip;
2056 memset(&smp, 0, sizeof(smp));
2057 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2058
2059 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2060 " - unset-var '%s.%s.%.*s'\n",
2061 (int)now.tv_sec, (int)now.tv_usec,
2062 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2063 __FUNCTION__, s, scope,
2064 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2065 (int)sz, str);
2066
2067 unset_spoe_var(ctx, scope, str, sz, &smp);
2068 break;
2069 }
2070
2071 default:
2072 skip_action:
2073 if ((i = skip_spoe_action(p+off, p+size)) == -1)
2074 goto skip;
2075 idx += i;
2076 }
2077 }
2078
2079 return 1;
2080 skip:
2081 return 0;
2082}
2083
2084
2085/* Process a SPOE event. First, this functions will process messages attached to
2086 * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
2087 * ACK frame to process corresponding actions. During all the processing, it
2088 * returns 0 and it returns 1 when the processing is finished. If an error
2089 * occurred, -1 is returned. */
2090static int
2091process_spoe_event(struct stream *s, struct spoe_context *ctx,
2092 enum spoe_event ev)
2093{
Christopher Fauletf7a30922016-11-10 15:04:51 +01002094 struct spoe_config *conf = FLT_CONF(ctx->filter);
2095 struct spoe_agent *agent = conf->agent;
2096 int dir, ret = 1;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002097
2098 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2099 " - ctx-state=%s - event=%s\n",
2100 (int)now.tv_sec, (int)now.tv_usec,
Christopher Fauletf7a30922016-11-10 15:04:51 +01002101 agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002102 spoe_event_str[ev]);
2103
2104 dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
2105
2106 if (LIST_ISEMPTY(&(ctx->messages[ev])))
2107 goto out;
2108
2109 if (ctx->state == SPOE_CTX_ST_ERROR)
2110 goto error;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002111
2112 if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
2113 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2114 " - failed to process event '%s': timeout\n",
2115 (int)now.tv_sec, (int)now.tv_usec,
2116 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2117 send_log(ctx->strm->be, LOG_WARNING,
2118 "failed to process event '%s': timeout.\n",
2119 spoe_event_str[ev]);
2120 goto error;
2121 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002122
2123 if (ctx->state == SPOE_CTX_ST_READY) {
Christopher Fauletf7a30922016-11-10 15:04:51 +01002124 if (!tick_isset(ctx->process_exp)) {
2125 ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
2126 s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
2127 ctx->process_exp);
2128 }
2129
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002130 ret = acquire_spoe_appctx(ctx, dir);
2131 if (ret <= 0) {
2132 if (!ret)
2133 goto out;
2134 goto error;
2135 }
2136 ctx->state = SPOE_CTX_ST_SENDING_MSGS;
2137 }
2138
2139 if (ctx->appctx == NULL)
2140 goto error;
2141
2142 if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
2143 ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
2144 if (ret <= 0) {
2145 if (!ret)
2146 goto skip;
2147 goto error;
2148 }
2149 wakeup_spoe_appctx(ctx);
2150 ret = 0;
2151 goto out;
2152 }
2153
2154 if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
2155 wakeup_spoe_appctx(ctx);
2156 ret = 0;
2157 goto out;
2158 }
2159
2160 if (ctx->state == SPOE_CTX_ST_DONE) {
2161 ret = process_spoe_actions(s, ctx, ev, dir);
2162 if (ret <= 0) {
2163 if (!ret)
2164 goto skip;
2165 goto error;
2166 }
2167 ctx->frame_id++;
2168 release_spoe_appctx(ctx);
2169 ctx->state = SPOE_CTX_ST_READY;
2170 }
2171
2172 out:
2173 return ret;
2174
2175 skip:
2176 release_spoe_appctx(ctx);
2177 ctx->state = SPOE_CTX_ST_READY;
2178 return 1;
2179
2180 error:
2181 release_spoe_appctx(ctx);
2182 ctx->state = SPOE_CTX_ST_ERROR;
2183 return 1;
2184}
2185
2186
2187/***************************************************************************
2188 * Functions that create/destroy SPOE contexts
2189 **************************************************************************/
2190static struct spoe_context *
2191create_spoe_context(struct filter *filter)
2192{
2193 struct spoe_config *conf = FLT_CONF(filter);
2194 struct spoe_context *ctx;
2195
2196 ctx = pool_alloc_dirty(pool2_spoe_ctx);
2197 if (ctx == NULL) {
2198 return NULL;
2199 }
2200 memset(ctx, 0, sizeof(*ctx));
2201 ctx->filter = filter;
2202 ctx->state = SPOE_CTX_ST_NONE;
2203 ctx->flags = 0;
2204 ctx->errs = 0;
2205 ctx->messages = conf->agent->messages;
2206 ctx->buffer = &buf_empty;
2207 LIST_INIT(&ctx->buffer_wait);
2208 LIST_INIT(&ctx->applet_wait);
2209
Christopher Fauletf7a30922016-11-10 15:04:51 +01002210 ctx->stream_id = 0;
2211 ctx->frame_id = 1;
2212 ctx->process_exp = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002213
2214 return ctx;
2215}
2216
2217static void
2218destroy_spoe_context(struct spoe_context *ctx)
2219{
2220 if (!ctx)
2221 return;
2222
2223 if (ctx->appctx)
2224 APPCTX_SPOE(ctx->appctx).ctx = NULL;
2225 if (!LIST_ISEMPTY(&ctx->buffer_wait))
2226 LIST_DEL(&ctx->buffer_wait);
2227 if (!LIST_ISEMPTY(&ctx->applet_wait))
2228 LIST_DEL(&ctx->applet_wait);
2229 pool_free2(pool2_spoe_ctx, ctx);
2230}
2231
2232static void
2233reset_spoe_context(struct spoe_context *ctx)
2234{
2235 ctx->state = SPOE_CTX_ST_READY;
2236 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2237}
2238
2239
2240/***************************************************************************
2241 * Hooks that manage the filter lifecycle (init/check/deinit)
2242 **************************************************************************/
2243/* Signal handler: Do a soft stop, wakeup SPOE applet */
2244static void
2245sig_stop_spoe(struct sig_handler *sh)
2246{
2247 struct proxy *p;
2248
2249 p = proxy;
2250 while (p) {
2251 struct flt_conf *fconf;
2252
2253 list_for_each_entry(fconf, &p->filter_configs, list) {
2254 struct spoe_config *conf = fconf->conf;
2255 struct spoe_agent *agent = conf->agent;
2256 struct appctx *appctx;
2257
2258 list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) {
2259 si_applet_want_get(appctx->owner);
2260 si_applet_want_put(appctx->owner);
2261 appctx_wakeup(appctx);
2262 }
2263 }
2264 p = p->next;
2265 }
2266}
2267
2268
2269/* Initialize the SPOE filter. Returns -1 on error, else 0. */
2270static int
2271spoe_init(struct proxy *px, struct flt_conf *fconf)
2272{
2273 struct spoe_config *conf = fconf->conf;
2274 struct listener *l;
2275
2276 memset(&conf->agent_fe, 0, sizeof(conf->agent_fe));
2277 init_new_proxy(&conf->agent_fe);
2278 conf->agent_fe.parent = conf->agent;
2279 conf->agent_fe.last_change = now.tv_sec;
2280 conf->agent_fe.id = conf->agent->id;
2281 conf->agent_fe.cap = PR_CAP_FE;
2282 conf->agent_fe.mode = PR_MODE_TCP;
2283 conf->agent_fe.maxconn = 0;
2284 conf->agent_fe.options2 |= PR_O2_INDEPSTR;
2285 conf->agent_fe.conn_retries = CONN_RETRIES;
2286 conf->agent_fe.accept = frontend_accept;
2287 conf->agent_fe.srv = NULL;
2288 conf->agent_fe.timeout.client = TICK_ETERNITY;
2289 conf->agent_fe.default_target = &spoe_applet.obj_type;
2290 conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
2291
2292 if ((l = calloc(1, sizeof(*l))) == NULL) {
2293 Alert("spoe_init : out of memory.\n");
2294 goto out_error;
2295 }
2296 l->obj_type = OBJ_TYPE_LISTENER;
2297 l->obj_type = OBJ_TYPE_LISTENER;
2298 l->frontend = &conf->agent_fe;
2299 l->state = LI_READY;
2300 l->analysers = conf->agent_fe.fe_req_ana;
2301 LIST_ADDQ(&conf->agent_fe.conf.listeners, &l->by_fe);
2302
2303 if (!sighandler_registered) {
2304 signal_register_fct(0, sig_stop_spoe, 0);
2305 sighandler_registered = 1;
2306 }
2307
2308 return 0;
2309
2310 out_error:
2311 return -1;
2312}
2313
2314/* Free ressources allocated by the SPOE filter. */
2315static void
2316spoe_deinit(struct proxy *px, struct flt_conf *fconf)
2317{
2318 struct spoe_config *conf = fconf->conf;
2319
2320 if (conf) {
2321 struct spoe_agent *agent = conf->agent;
2322 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
2323 struct listener *, by_fe);
2324
2325 free(l);
2326 release_spoe_agent(agent);
2327 free(conf);
2328 }
2329 fconf->conf = NULL;
2330}
2331
2332/* Check configuration of a SPOE filter for a specified proxy.
2333 * Return 1 on error, else 0. */
2334static int
2335spoe_check(struct proxy *px, struct flt_conf *fconf)
2336{
2337 struct spoe_config *conf = fconf->conf;
2338 struct proxy *target;
2339
2340 target = proxy_be_by_name(conf->agent->b.name);
2341 if (target == NULL) {
2342 Alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'"
2343 " declared at %s:%d.\n",
2344 px->id, conf->agent->b.name, conf->agent->id,
2345 conf->agent->conf.file, conf->agent->conf.line);
2346 return 1;
2347 }
2348 if (target->mode != PR_MODE_TCP) {
2349 Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
2350 " at %s:%d does not support HTTP mode.\n",
2351 px->id, target->id, conf->agent->id,
2352 conf->agent->conf.file, conf->agent->conf.line);
2353 return 1;
2354 }
2355
2356 free(conf->agent->b.name);
2357 conf->agent->b.name = NULL;
2358 conf->agent->b.be = target;
2359 return 0;
2360}
2361
2362/**************************************************************************
2363 * Hooks attached to a stream
2364 *************************************************************************/
2365/* Called when a filter instance is created and attach to a stream. It creates
2366 * the context that will be used to process this stream. */
2367static int
2368spoe_start(struct stream *s, struct filter *filter)
2369{
2370 struct spoe_context *ctx;
2371
2372 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2373 (int)now.tv_sec, (int)now.tv_usec,
2374 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2375 __FUNCTION__, s);
2376
2377 ctx = create_spoe_context(filter);
2378 if (ctx == NULL) {
2379 send_log(s->be, LOG_EMERG,
2380 "failed to create SPOE context for proxy %s\n",
2381 s->be->id);
2382 return 0;
2383 }
2384
2385 ctx->strm = s;
2386 ctx->state = SPOE_CTX_ST_READY;
2387 filter->ctx = ctx;
2388
2389 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_FE]))
2390 filter->pre_analyzers |= AN_REQ_INSPECT_FE;
2391
2392 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_BE]))
2393 filter->pre_analyzers |= AN_REQ_INSPECT_BE;
2394
2395 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_RSP]))
2396 filter->pre_analyzers |= AN_RES_INSPECT;
2397
2398 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_FE]))
2399 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE;
2400
2401 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_BE]))
2402 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE;
2403
2404 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_RSP]))
2405 filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE;
2406
2407 return 1;
2408}
2409
2410/* Called when a filter instance is detached from a stream. It release the
2411 * attached SPOE context. */
2412static void
2413spoe_stop(struct stream *s, struct filter *filter)
2414{
2415 struct spoe_context *ctx = filter->ctx;
2416
2417 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2418 (int)now.tv_sec, (int)now.tv_usec,
2419 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2420 __FUNCTION__, s);
2421
2422 if (ctx) {
2423 release_spoe_appctx(ctx);
2424 destroy_spoe_context(ctx);
2425 }
2426}
2427
Christopher Fauletf7a30922016-11-10 15:04:51 +01002428
2429/*
2430 * Called when the stream is woken up because of expired timer.
2431 */
2432static void
2433spoe_check_timeouts(struct stream *s, struct filter *filter)
2434{
2435 struct spoe_context *ctx = filter->ctx;
2436
2437 if (tick_is_expired(ctx->process_exp, now_ms))
2438 s->task->state |= TASK_WOKEN_MSG;
2439}
2440
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002441/* Called when we are ready to filter data on a channel */
2442static int
2443spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2444{
2445 struct spoe_context *ctx = filter->ctx;
2446 int ret = 1;
2447
2448 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2449 " - ctx-flags=0x%08x\n",
2450 (int)now.tv_sec, (int)now.tv_usec,
2451 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2452 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2453
2454 if (!(chn->flags & CF_ISRESP)) {
2455 if (filter->pre_analyzers & AN_REQ_INSPECT_FE)
2456 chn->analysers |= AN_REQ_INSPECT_FE;
2457 if (filter->pre_analyzers & AN_REQ_INSPECT_BE)
2458 chn->analysers |= AN_REQ_INSPECT_BE;
2459
2460 if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED)
2461 goto out;
2462
2463 ctx->stream_id = s->uniq_id;
2464 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2465 ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
2466 if (ret != 1)
2467 goto out;
2468 }
2469 ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
2470 }
2471 else {
2472 if (filter->pre_analyzers & SPOE_EV_ON_TCP_RSP)
2473 chn->analysers |= AN_RES_INSPECT;
2474
2475 if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
2476 goto out;
2477
2478 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2479 ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
2480 if (ret != 1)
2481 goto out;
2482 }
2483 ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
2484 }
2485
2486 out:
2487 if (!ret) {
2488 channel_dont_read(chn);
2489 channel_dont_close(chn);
2490 }
2491 return ret;
2492}
2493
2494/* Called before a processing happens on a given channel */
2495static int
2496spoe_chn_pre_analyze(struct stream *s, struct filter *filter,
2497 struct channel *chn, unsigned an_bit)
2498{
2499 struct spoe_context *ctx = filter->ctx;
2500 int ret = 1;
2501
2502 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2503 " - ctx-flags=0x%08x - ana=0x%08x\n",
2504 (int)now.tv_sec, (int)now.tv_usec,
2505 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2506 __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2507 ctx->flags, an_bit);
2508
2509 if (ctx->state == SPOE_CTX_ST_NONE || ctx->state == SPOE_CTX_ST_ERROR)
2510 goto out;
2511
2512 switch (an_bit) {
2513 case AN_REQ_INSPECT_FE:
2514 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
2515 break;
2516 case AN_REQ_INSPECT_BE:
2517 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
2518 break;
2519 case AN_RES_INSPECT:
2520 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP);
2521 break;
2522 case AN_REQ_HTTP_PROCESS_FE:
2523 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
2524 break;
2525 case AN_REQ_HTTP_PROCESS_BE:
2526 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
2527 break;
2528 case AN_RES_HTTP_PROCESS_FE:
2529 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
2530 break;
2531 }
2532
2533 out:
2534 if (!ret) {
2535 channel_dont_read(chn);
2536 channel_dont_close(chn);
2537 }
2538 return ret;
2539}
2540
2541/* Called when the filtering on the channel ends. */
2542static int
2543spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2544{
2545 struct spoe_context *ctx = filter->ctx;
2546
2547 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2548 " - ctx-flags=0x%08x\n",
2549 (int)now.tv_sec, (int)now.tv_usec,
2550 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2551 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2552
2553 if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
2554 reset_spoe_context(ctx);
2555 }
2556
2557 return 1;
2558}
2559
2560/********************************************************************
2561 * Functions that manage the filter initialization
2562 ********************************************************************/
2563struct flt_ops spoe_ops = {
2564 /* Manage SPOE filter, called for each filter declaration */
2565 .init = spoe_init,
2566 .deinit = spoe_deinit,
2567 .check = spoe_check,
2568
2569 /* Handle start/stop of SPOE */
Christopher Fauletf7a30922016-11-10 15:04:51 +01002570 .attach = spoe_start,
2571 .detach = spoe_stop,
2572 .check_timeouts = spoe_check_timeouts,
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002573
2574 /* Handle channels activity */
2575 .channel_start_analyze = spoe_start_analyze,
2576 .channel_pre_analyze = spoe_chn_pre_analyze,
2577 .channel_end_analyze = spoe_end_analyze,
2578};
2579
2580
2581static int
2582cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
2583{
2584 const char *err;
2585 int i, err_code = 0;
2586
2587 if ((cfg_scope == NULL && curengine != NULL) ||
2588 (cfg_scope != NULL && curengine == NULL) ||
2589 strcmp(curengine, cfg_scope))
2590 goto out;
2591
2592 if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */
2593 if (!*args[1]) {
2594 Alert("parsing [%s:%d] : missing name for spoe-agent section.\n",
2595 file, linenum);
2596 err_code |= ERR_ALERT | ERR_ABORT;
2597 goto out;
2598 }
2599 if (*args[2]) {
2600 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2601 file, linenum, args[2]);
2602 err_code |= ERR_ALERT | ERR_ABORT;
2603 goto out;
2604 }
2605
2606 err = invalid_char(args[1]);
2607 if (err) {
2608 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2609 file, linenum, *err, args[0], args[1]);
2610 err_code |= ERR_ALERT | ERR_ABORT;
2611 goto out;
2612 }
2613
2614 if (curagent != NULL) {
2615 Alert("parsing [%s:%d] : another spoe-agent section previously defined.\n",
2616 file, linenum);
2617 err_code |= ERR_ALERT | ERR_ABORT;
2618 goto out;
2619 }
2620 if ((curagent = calloc(1, sizeof(*curagent))) == NULL) {
2621 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2622 err_code |= ERR_ALERT | ERR_ABORT;
2623 goto out;
2624 }
2625
2626 curagent->id = strdup(args[1]);
2627 curagent->conf.file = strdup(file);
2628 curagent->conf.line = linenum;
2629 curagent->timeout.hello = TICK_ETERNITY;
2630 curagent->timeout.ack = TICK_ETERNITY;
2631 curagent->timeout.idle = TICK_ETERNITY;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002632 curagent->timeout.processing = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002633 curagent->var_pfx = NULL;
2634 curagent->new_applets = 0;
2635
2636 for (i = 0; i < SPOE_EV_EVENTS; ++i)
2637 LIST_INIT(&curagent->messages[i]);
2638 LIST_INIT(&curagent->cache);
2639 LIST_INIT(&curagent->applet_wq);
2640 }
2641 else if (!strcmp(args[0], "use-backend")) {
2642 if (!*args[1]) {
2643 Alert("parsing [%s:%d] : '%s' expects a backend name.\n",
2644 file, linenum, args[0]);
2645 err_code |= ERR_ALERT | ERR_FATAL;
2646 goto out;
2647 }
2648 if (*args[2]) {
2649 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2650 file, linenum, args[2]);
2651 err_code |= ERR_ALERT | ERR_ABORT;
2652 goto out;
2653 }
2654 free(curagent->b.name);
2655 curagent->b.name = strdup(args[1]);
2656 }
2657 else if (!strcmp(args[0], "messages")) {
2658 int cur_arg = 1;
2659 while (*args[cur_arg]) {
2660 struct spoe_msg_placeholder *mp = NULL;
2661
2662 list_for_each_entry(mp, &curmps, list) {
2663 if (!strcmp(mp->id, args[cur_arg])) {
2664 Alert("parsing [%s:%d]: spoe-message message '%s' already declared.\n",
2665 file, linenum, args[cur_arg]);
2666 err_code |= ERR_ALERT | ERR_FATAL;
2667 goto out;
2668 }
2669 }
2670
2671 if ((mp = calloc(1, sizeof(*mp))) == NULL) {
2672 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2673 err_code |= ERR_ALERT | ERR_ABORT;
2674 goto out;
2675 }
2676 mp->id = strdup(args[cur_arg]);
2677 LIST_ADDQ(&curmps, &mp->list);
2678 cur_arg++;
2679 }
2680 }
2681 else if (!strcmp(args[0], "timeout")) {
2682 unsigned int *tv = NULL;
2683 const char *res;
2684 unsigned timeout;
2685
2686 if (!*args[1]) {
2687 Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n",
2688 file, linenum);
2689 err_code |= ERR_ALERT | ERR_FATAL;
2690 goto out;
2691 }
2692 if (!strcmp(args[1], "hello"))
2693 tv = &curagent->timeout.hello;
2694 else if (!strcmp(args[1], "idle"))
2695 tv = &curagent->timeout.idle;
2696 else if (!strcmp(args[1], "ack"))
2697 tv = &curagent->timeout.ack;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002698 else if (!strcmp(args[1], "processing"))
2699 tv = &curagent->timeout.processing;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002700 else {
Christopher Fauletf7a30922016-11-10 15:04:51 +01002701 Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle', 'ack' or 'processing' (got %s).\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002702 file, linenum, args[1]);
2703 err_code |= ERR_ALERT | ERR_FATAL;
2704 goto out;
2705 }
2706 if (!*args[2]) {
2707 Alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n",
2708 file, linenum, args[1]);
2709 err_code |= ERR_ALERT | ERR_FATAL;
2710 goto out;
2711 }
2712 res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
2713 if (res) {
2714 Alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n",
2715 file, linenum, *res, args[1]);
2716 err_code |= ERR_ALERT | ERR_ABORT;
2717 goto out;
2718 }
2719 if (*args[3]) {
2720 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2721 file, linenum, args[3]);
2722 err_code |= ERR_ALERT | ERR_ABORT;
2723 goto out;
2724 }
2725 *tv = MS_TO_TICKS(timeout);
2726 }
2727 else if (!strcmp(args[0], "option")) {
2728 if (!*args[1]) {
2729 Alert("parsing [%s:%d]: '%s' expects an option name.\n",
2730 file, linenum, args[0]);
2731 err_code |= ERR_ALERT | ERR_FATAL;
2732 goto out;
2733 }
2734 if (!strcmp(args[1], "var-prefix")) {
2735 char *tmp;
2736
2737 if (!*args[2]) {
2738 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2739 file, linenum, args[0],
2740 args[1]);
2741 err_code |= ERR_ALERT | ERR_FATAL;
2742 goto out;
2743 }
2744 tmp = args[2];
2745 while (*tmp) {
2746 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2747 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
2748 file, linenum, args[0], args[1]);
2749 err_code |= ERR_ALERT | ERR_FATAL;
2750 goto out;
2751 }
2752 tmp++;
2753 }
2754 curagent->var_pfx = strdup(args[2]);
2755 }
2756 else {
2757 Alert("parsing [%s:%d]: option '%s' is not supported.\n",
2758 file, linenum, args[1]);
2759 err_code |= ERR_ALERT | ERR_FATAL;
2760 goto out;
2761 }
2762 }
2763 else if (*args[0]) {
2764 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
2765 file, linenum, args[0]);
2766 err_code |= ERR_ALERT | ERR_FATAL;
2767 goto out;
2768 }
2769 out:
2770 return err_code;
2771}
2772
2773static int
2774cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm)
2775{
2776 struct spoe_message *msg;
2777 struct spoe_arg *arg;
2778 const char *err;
2779 char *errmsg = NULL;
2780 int err_code = 0;
2781
2782 if ((cfg_scope == NULL && curengine != NULL) ||
2783 (cfg_scope != NULL && curengine == NULL) ||
2784 strcmp(curengine, cfg_scope))
2785 goto out;
2786
2787 if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */
2788 if (!*args[1]) {
2789 Alert("parsing [%s:%d] : missing name for spoe-message section.\n",
2790 file, linenum);
2791 err_code |= ERR_ALERT | ERR_ABORT;
2792 goto out;
2793 }
2794 if (*args[2]) {
2795 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2796 file, linenum, args[2]);
2797 err_code |= ERR_ALERT | ERR_ABORT;
2798 goto out;
2799 }
2800
2801 err = invalid_char(args[1]);
2802 if (err) {
2803 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2804 file, linenum, *err, args[0], args[1]);
2805 err_code |= ERR_ALERT | ERR_ABORT;
2806 goto out;
2807 }
2808
2809 list_for_each_entry(msg, &curmsgs, list) {
2810 if (!strcmp(msg->id, args[1])) {
2811 Alert("parsing [%s:%d]: spoe-message section '%s' has the same"
2812 " name as another one declared at %s:%d.\n",
2813 file, linenum, args[1], msg->conf.file, msg->conf.line);
2814 err_code |= ERR_ALERT | ERR_FATAL;
2815 goto out;
2816 }
2817 }
2818
2819 if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) {
2820 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2821 err_code |= ERR_ALERT | ERR_ABORT;
2822 goto out;
2823 }
2824
2825 curmsg->id = strdup(args[1]);
2826 curmsg->id_len = strlen(curmsg->id);
2827 curmsg->event = SPOE_EV_NONE;
2828 curmsg->conf.file = strdup(file);
2829 curmsg->conf.line = linenum;
2830 LIST_INIT(&curmsg->args);
2831 LIST_ADDQ(&curmsgs, &curmsg->list);
2832 }
2833 else if (!strcmp(args[0], "args")) {
2834 int cur_arg = 1;
2835
2836 curproxy->conf.args.ctx = ARGC_SPOE;
2837 curproxy->conf.args.file = file;
2838 curproxy->conf.args.line = linenum;
2839 while (*args[cur_arg]) {
2840 char *delim = strchr(args[cur_arg], '=');
2841 int idx = 0;
2842
2843 if ((arg = calloc(1, sizeof(*arg))) == NULL) {
2844 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2845 err_code |= ERR_ALERT | ERR_ABORT;
2846 goto out;
2847 }
2848
2849 if (!delim) {
2850 arg->name = NULL;
2851 arg->name_len = 0;
2852 delim = args[cur_arg];
2853 }
2854 else {
2855 arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]);
2856 arg->name_len = delim - args[cur_arg];
2857 delim++;
2858 }
2859
2860 arg->expr = sample_parse_expr(&delim, &idx, file, linenum, &errmsg, &curproxy->conf.args);
2861 if (arg->expr == NULL) {
2862 Alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg);
2863 err_code |= ERR_ALERT | ERR_FATAL;
2864 free(arg->name);
2865 free(arg);
2866 goto out;
2867 }
2868 LIST_ADDQ(&curmsg->args, &arg->list);
2869 cur_arg++;
2870 }
2871 curproxy->conf.args.file = NULL;
2872 curproxy->conf.args.line = 0;
2873 }
2874 else if (!strcmp(args[0], "event")) {
2875 if (!*args[1]) {
2876 Alert("parsing [%s:%d] : missing event name.\n", file, linenum);
2877 err_code |= ERR_ALERT | ERR_ABORT;
2878 goto out;
2879 }
2880 if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS]))
2881 curmsg->event = SPOE_EV_ON_CLIENT_SESS;
2882 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS]))
2883 curmsg->event = SPOE_EV_ON_SERVER_SESS;
2884
2885 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE]))
2886 curmsg->event = SPOE_EV_ON_TCP_REQ_FE;
2887 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE]))
2888 curmsg->event = SPOE_EV_ON_TCP_REQ_BE;
2889 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP]))
2890 curmsg->event = SPOE_EV_ON_TCP_RSP;
2891
2892 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE]))
2893 curmsg->event = SPOE_EV_ON_HTTP_REQ_FE;
2894 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE]))
2895 curmsg->event = SPOE_EV_ON_HTTP_REQ_BE;
2896 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP]))
2897 curmsg->event = SPOE_EV_ON_HTTP_RSP;
2898 else {
2899 Alert("parsing [%s:%d] : unkown event '%s'.\n",
2900 file, linenum, args[1]);
2901 err_code |= ERR_ALERT | ERR_ABORT;
2902 goto out;
2903 }
2904 }
2905 else if (!*args[0]) {
2906 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n",
2907 file, linenum, args[0]);
2908 err_code |= ERR_ALERT | ERR_FATAL;
2909 goto out;
2910 }
2911 out:
2912 free(errmsg);
2913 return err_code;
2914}
2915
2916/* Return -1 on error, else 0 */
2917static int
2918parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
2919 struct flt_conf *fconf, char **err, void *private)
2920{
2921 struct list backup_sections;
2922 struct spoe_config *conf;
2923 struct spoe_message *msg, *msgback;
2924 struct spoe_msg_placeholder *mp, *mpback;
2925 char *file = NULL, *engine = NULL;
2926 int ret, pos = *cur_arg + 1;
2927
2928 conf = calloc(1, sizeof(*conf));
2929 if (conf == NULL) {
2930 memprintf(err, "%s: out of memory", args[*cur_arg]);
2931 goto error;
2932 }
2933 conf->proxy = px;
2934
2935 while (*args[pos]) {
2936 if (!strcmp(args[pos], "config")) {
2937 if (!*args[pos+1]) {
2938 memprintf(err, "'%s' : '%s' option without value",
2939 args[*cur_arg], args[pos]);
2940 goto error;
2941 }
2942 file = args[pos+1];
2943 pos += 2;
2944 }
2945 else if (!strcmp(args[pos], "engine")) {
2946 if (!*args[pos+1]) {
2947 memprintf(err, "'%s' : '%s' option without value",
2948 args[*cur_arg], args[pos]);
2949 goto error;
2950 }
2951 engine = args[pos+1];
2952 pos += 2;
2953 }
2954 else {
2955 memprintf(err, "unknown keyword '%s'", args[pos]);
2956 goto error;
2957 }
2958 }
2959 if (file == NULL) {
2960 memprintf(err, "'%s' : missing config file", args[*cur_arg]);
2961 goto error;
2962 }
2963
2964 /* backup sections and register SPOE sections */
2965 LIST_INIT(&backup_sections);
2966 cfg_backup_sections(&backup_sections);
2967 cfg_register_section("spoe-agent", cfg_parse_spoe_agent);
2968 cfg_register_section("spoe-message", cfg_parse_spoe_message);
2969
2970 /* Parse SPOE filter configuration file */
2971 curengine = engine;
2972 curproxy = px;
2973 curagent = NULL;
2974 curmsg = NULL;
2975 ret = readcfgfile(file);
2976 curproxy = NULL;
2977
2978 /* unregister SPOE sections and restore previous sections */
2979 cfg_unregister_sections();
2980 cfg_restore_sections(&backup_sections);
2981
2982 if (ret == -1) {
2983 memprintf(err, "Could not open configuration file %s : %s",
2984 file, strerror(errno));
2985 goto error;
2986 }
2987 if (ret & (ERR_ABORT|ERR_FATAL)) {
2988 memprintf(err, "Error(s) found in configuration file %s", file);
2989 goto error;
2990 }
2991
2992 /* Check SPOE agent */
2993 if (curagent == NULL) {
2994 memprintf(err, "No SPOE agent found in file %s", file);
2995 goto error;
2996 }
2997 if (curagent->b.name == NULL) {
2998 memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d",
2999 curagent->id, curagent->conf.file, curagent->conf.line);
3000 goto error;
3001 }
Christopher Fauletf7a30922016-11-10 15:04:51 +01003002 if (curagent->timeout.hello == TICK_ETERNITY ||
3003 curagent->timeout.idle == TICK_ETERNITY ||
3004 curagent->timeout.ack == TICK_ETERNITY ||
3005 curagent->timeout.processing == TICK_ETERNITY) {
3006 if (curagent->timeout.ack == TICK_ETERNITY)
3007 curagent->timeout.ack = curagent->timeout.idle;
3008
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003009 Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
3010 " | While not properly invalid, you will certainly encounter various problems\n"
3011 " | with such a configuration. To fix this, please ensure that all following\n"
Christopher Fauletf7a30922016-11-10 15:04:51 +01003012 " | timeouts are set to a non-zero value: 'hello', 'idle', 'ack', 'processing'.\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003013 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3014 }
3015 if (curagent->var_pfx == NULL) {
3016 char *tmp = curagent->id;
3017
3018 while (*tmp) {
3019 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3020 memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. "
3021 "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n",
3022 curagent->id, curagent->id, curagent->conf.file, curagent->conf.line);
3023 goto error;
3024 }
3025 tmp++;
3026 }
3027 curagent->var_pfx = strdup(curagent->id);
3028 }
3029
3030 if (LIST_ISEMPTY(&curmps)) {
3031 Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",
3032 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3033 goto finish;
3034 }
3035
3036 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3037 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3038 if (!strcmp(msg->id, mp->id)) {
3039 if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) {
3040 if (msg->event == SPOE_EV_ON_TCP_REQ_BE)
3041 msg->event = SPOE_EV_ON_TCP_REQ_FE;
3042 if (msg->event == SPOE_EV_ON_HTTP_REQ_BE)
3043 msg->event = SPOE_EV_ON_HTTP_REQ_FE;
3044 }
3045 if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS ||
3046 msg->event == SPOE_EV_ON_TCP_REQ_FE ||
3047 msg->event == SPOE_EV_ON_HTTP_REQ_FE)) {
3048 Warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n",
3049 px->id, msg->conf.file, msg->conf.line);
3050 goto next;
3051 }
3052 if (msg->event == SPOE_EV_NONE) {
3053 Warning("Proxy '%s': Ignore SPOE message without event at %s:%d.\n",
3054 px->id, msg->conf.file, msg->conf.line);
3055 goto next;
3056 }
3057 msg->agent = curagent;
3058 LIST_DEL(&msg->list);
3059 LIST_ADDQ(&curagent->messages[msg->event], &msg->list);
3060 goto next;
3061 }
3062 }
3063 memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d",
3064 curagent->id, mp->id, curagent->conf.file, curagent->conf.line);
3065 goto error;
3066 next:
3067 continue;
3068 }
3069
3070 finish:
3071 conf->agent = curagent;
3072 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3073 LIST_DEL(&mp->list);
3074 release_spoe_msg_placeholder(mp);
3075 }
3076 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3077 Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n",
3078 px->id, msg->id, msg->conf.file, msg->conf.line);
3079 LIST_DEL(&msg->list);
3080 release_spoe_message(msg);
3081 }
3082
3083 *cur_arg = pos;
3084 fconf->ops = &spoe_ops;
3085 fconf->conf = conf;
3086 return 0;
3087
3088 error:
3089 release_spoe_agent(curagent);
3090 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3091 LIST_DEL(&mp->list);
3092 release_spoe_msg_placeholder(mp);
3093 }
3094 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3095 LIST_DEL(&msg->list);
3096 release_spoe_message(msg);
3097 }
3098 free(conf);
3099 return -1;
3100}
3101
3102
3103/* Declare the filter parser for "spoe" keyword */
3104static struct flt_kw_list flt_kws = { "SPOE", { }, {
3105 { "spoe", parse_spoe_flt, NULL },
3106 { NULL, NULL, NULL },
3107 }
3108};
3109
3110__attribute__((constructor))
3111static void __spoe_init(void)
3112{
3113 flt_register_keywords(&flt_kws);
3114
3115 LIST_INIT(&curmsgs);
3116 LIST_INIT(&curmps);
3117 pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
3118}
3119
3120__attribute__((destructor))
3121static void
3122__spoe_deinit(void)
3123{
3124 pool_destroy2(pool2_spoe_ctx);
3125}