blob: 06d6e5d250fdb0d1b829c0f286d68474540ad3af [file] [log] [blame]
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001/*
2 * Stream processing offload engine management.
3 *
4 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12#include <ctype.h>
13#include <errno.h>
14
15#include <common/buffer.h>
16#include <common/cfgparse.h>
17#include <common/compat.h>
18#include <common/config.h>
19#include <common/debug.h>
20#include <common/memory.h>
21#include <common/time.h>
22
23#include <types/arg.h>
24#include <types/filters.h>
25#include <types/global.h>
26#include <types/proxy.h>
27#include <types/sample.h>
28#include <types/stream.h>
29
30#include <proto/arg.h>
31#include <proto/backend.h>
32#include <proto/filters.h>
33#include <proto/frontend.h>
34#include <proto/log.h>
35#include <proto/proto_http.h>
36#include <proto/proxy.h>
37#include <proto/sample.h>
38#include <proto/session.h>
39#include <proto/signal.h>
40#include <proto/stream.h>
41#include <proto/stream_interface.h>
42#include <proto/task.h>
43#include <proto/vars.h>
44
/* Debug trace helper: its arguments are handed to fprintf() when SPOE (or
 * full) debugging is compiled in; otherwise it expands to nothing. */
#if !defined(DEBUG_SPOE) && !defined(DEBUG_FULL)
#define SPOE_PRINTF(x...)
#else
#define SPOE_PRINTF(x...) fprintf(x)
#endif
50
/* Accessor for the SPOE-specific part of an applet context */
#define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe)

/* TODO: add an option to customize these values */
/* Upper bound on the number of new applets still waiting for the end of the
 * HELLO handshake */
#define MAX_NEW_SPOE_APPLETS 5

/* Upper bound on the number of errors tolerated while a stream is waiting
 * for a SPOE applet */
#define MAX_NEW_SPOE_APPLET_ERRS 3

/* Smallest frame size accepted during the max-frame-size negotiation */
#define MIN_FRAME_SIZE 256
63
/* Flags set on the SPOE agent */
#define SPOE_FL_CONT_ON_ERR       0x00000001 /* Keep processing events after an error occurred */

/* Flags set on the SPOE context */
#define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* on-client-session event has been processed */
#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* on-server-session event has been processed */
#define SPOE_CTX_FL_REQ_PROCESS   0x00000004 /* SPOE is processing the request */
#define SPOE_CTX_FL_RSP_PROCESS   0x00000008 /* SPOE is processing the response */

/* Either direction is being processed */
#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)

/* Error codes set on the SPOE applet context */
#define SPOE_APPCTX_ERR_NONE    0x00000000 /* no error yet, leave it to zero */
#define SPOE_APPCTX_ERR_TOUT    0x00000001 /* SPOE applet timeout */
77
/* States a SPOE context (the per-stream side) can be in. The values are
 * contiguous and start at zero; SPOE_CTX_ST_ERROR is the last one. */
enum spoe_ctx_state {
	SPOE_CTX_ST_NONE = 0,
	SPOE_CTX_ST_READY,
	SPOE_CTX_ST_SENDING_MSGS,
	SPOE_CTX_ST_WAITING_ACK,
	SPOE_CTX_ST_DONE,
	SPOE_CTX_ST_ERROR,
};
87
/* States a SPOE applet (the agent-connection side) can be in. The values are
 * contiguous and start at zero; SPOE_APPCTX_ST_END is the last one. */
enum spoe_appctx_state {
	SPOE_APPCTX_ST_CONNECT = 0,
	SPOE_APPCTX_ST_CONNECTING,
	SPOE_APPCTX_ST_PROCESSING,
	SPOE_APPCTX_ST_DISCONNECT,
	SPOE_APPCTX_ST_DISCONNECTING,
	SPOE_APPCTX_ST_EXIT,
	SPOE_APPCTX_ST_END,
};
98
/* Action types supported by the SPOE. SPOE_ACT_TYPES is not an action, it
 * only marks the end of the list. */
enum spoe_action_type {
	SPOE_ACT_T_SET_VAR   = 1,
	SPOE_ACT_T_UNSET_VAR = 2,
	SPOE_ACT_TYPES,
};
105
/* Events on which SPOE messages can be sent. The values are contiguous, so
 * SPOE_EV_EVENTS can be used to size arrays indexed by event. */
enum spoe_event {
	SPOE_EV_NONE = 0,

	/* Events on the request side */
	SPOE_EV_ON_CLIENT_SESS = 1,
	SPOE_EV_ON_TCP_REQ_FE,
	SPOE_EV_ON_TCP_REQ_BE,
	SPOE_EV_ON_HTTP_REQ_FE,
	SPOE_EV_ON_HTTP_REQ_BE,

	/* Events on the response side */
	SPOE_EV_ON_SERVER_SESS,
	SPOE_EV_ON_TCP_RSP,
	SPOE_EV_ON_HTTP_RSP,

	SPOE_EV_EVENTS
};
124
/* Errors triggered by the SPOE applet. These values are used as the
 * "status-code" of DISCONNECT frames. The codes are not contiguous: nothing is
 * defined between SPOE_FRM_ERR_BAD_FRAME_SIZE and SPOE_FRM_ERR_UNKNOWN. */
enum spoe_frame_error {
	SPOE_FRM_ERR_NONE           = 0,
	SPOE_FRM_ERR_IO             = 1,
	SPOE_FRM_ERR_TOUT           = 2,
	SPOE_FRM_ERR_TOO_BIG        = 3,
	SPOE_FRM_ERR_INVALID        = 4,
	SPOE_FRM_ERR_NO_VSN         = 5,
	SPOE_FRM_ERR_NO_FRAME_SIZE  = 6,
	SPOE_FRM_ERR_NO_CAP         = 7,
	SPOE_FRM_ERR_BAD_VSN        = 8,
	SPOE_FRM_ERR_BAD_FRAME_SIZE = 9,
	SPOE_FRM_ERR_UNKNOWN        = 99,
	SPOE_FRM_ERRS,
};
140
/* Scopes used for variables set by agents. It is a way to be agnostic to the
 * vars scope values. */
enum spoe_vars_scope {
	SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC */
	SPOE_SCOPE_SESS,     /* <=> SCOPE_SESS */
	SPOE_SCOPE_TXN,      /* <=> SCOPE_TXN  */
	SPOE_SCOPE_REQ,      /* <=> SCOPE_REQ  */
	SPOE_SCOPE_RES,      /* <=> SCOPE_RES  */
};
150
151
152/* Describe an argument that will be linked to a message. It is a sample fetch,
153 * with an optional name. */
154struct spoe_arg {
155 char *name; /* Name of the argument, may be NULL */
156 unsigned int name_len; /* The name length, 0 if NULL */
157 struct sample_expr *expr; /* Sample expression */
158 struct list list; /* Used to chain SPOE args */
159};
160
161/* Used during the config parsing only because, when a SPOE agent section is
162 * parsed, messages can be undefined. */
163struct spoe_msg_placeholder {
164 char *id; /* SPOE message placeholder id */
165 struct list list; /* Use to chain SPOE message placeholders */
166};
167
168/* Describe a message that will be sent in a NOTIFY frame. A message has a name,
169 * an argument list (see above) and it is linked to a specific event. */
170struct spoe_message {
171 char *id; /* SPOE message id */
172 unsigned int id_len; /* The message id length */
173 struct spoe_agent *agent; /* SPOE agent owning this SPOE message */
174 struct {
175 char *file; /* file where the SPOE message appears */
176 int line; /* line where the SPOE message appears */
177 } conf; /* config information */
178 struct list args; /* Arguments added when the SPOE messages is sent */
179 struct list list; /* Used to chain SPOE messages */
180
181 enum spoe_event event; /* SPOE_EV_* */
182};
183
184/* Describe a SPOE agent. */
185struct spoe_agent {
186 char *id; /* SPOE agent id (name) */
187 struct {
188 char *file; /* file where the SPOE agent appears */
189 int line; /* line where the SPOE agent appears */
190 } conf; /* config information */
191 union {
192 struct proxy *be; /* Backend used by this agent */
193 char *name; /* Backend name used during conf parsing */
194 } b;
195 struct {
Christopher Fauletf7a30922016-11-10 15:04:51 +0100196 unsigned int hello; /* Max time to receive AGENT-HELLO frame (in SPOE applet) */
197 unsigned int idle; /* Max Idle timeout (in SPOE applet) */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100198 unsigned int processing; /* Max time to process an event (in the main stream) */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200199 } timeout;
200
201 char *var_pfx; /* Prefix used for vars set by the agent */
Christopher Fauletea62c2a2016-11-14 10:54:21 +0100202 unsigned int flags; /* SPOE_FL_* */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200203
204 struct list cache; /* List used to cache SPOE streams. In
205 * fact, we cache the SPOE applect ctx */
206
207 struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
208 * for each supported events */
209
210 struct list applet_wq; /* List of streams waiting for a SPOE applet */
211 unsigned int new_applets; /* The number of new SPOE applets */
212};
213
214/* SPOE filter configuration */
215struct spoe_config {
216 struct proxy *proxy; /* Proxy owning the filter */
217 struct spoe_agent *agent; /* Agent used by this filter */
218 struct proxy agent_fe; /* Agent frontend */
219};
220
221/* SPOE context attached to a stream. It is the main structure that handles the
222 * processing offload */
223struct spoe_context {
224 struct filter *filter; /* The SPOE filter */
225 struct stream *strm; /* The stream that should be offloaded */
226 struct appctx *appctx; /* The SPOE appctx */
227 struct list *messages; /* List of messages that will be sent during the stream processing */
228 struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */
229 struct list buffer_wait; /* position in the list of streams waiting for a buffer */
230 struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */
231
232 unsigned int errs; /* The number of errors to acquire a SPOE applet */
233
234 enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
235 unsigned int flags; /* SPOE_CTX_FL_* */
236
237 unsigned int stream_id; /* stream_id and frame_id are used */
238 unsigned int frame_id; /* to map NOTIFY and ACK frames */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100239 unsigned int process_exp; /* expiration date to process an event */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200240};
241
242/* Set if the handle on SIGUSR1 is registered */
243static int sighandler_registered = 0;
244
245/* proxy used during the parsing */
246struct proxy *curproxy = NULL;
247
248/* The name of the SPOE engine, used during the parsing */
249char *curengine = NULL;
250
251/* SPOE agent used during the parsing */
252struct spoe_agent *curagent = NULL;
253
254/* SPOE message used during the parsing */
255struct spoe_message *curmsg = NULL;
256
257/* list of SPOE messages and placeholders used during the parsing */
258struct list curmsgs;
259struct list curmps;
260
261/* Pool used to allocate new SPOE contexts */
262static struct pool_head *pool2_spoe_ctx = NULL;
263
264/* Temporary variables used to ease error processing */
265int spoe_status_code = SPOE_FRM_ERR_NONE;
266char spoe_reason[256];
267
268struct flt_ops spoe_ops;
269
270static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx);
271static void on_new_spoe_appctx_failure(struct spoe_agent *agent);
272static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx);
273
274/********************************************************************
275 * helper functions/globals
276 ********************************************************************/
277static void
278release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp)
279{
280 if (!mp)
281 return;
282 free(mp->id);
283 free(mp);
284}
285
286
287static void
288release_spoe_message(struct spoe_message *msg)
289{
290 struct spoe_arg *arg, *back;
291
292 if (!msg)
293 return;
294 free(msg->id);
295 free(msg->conf.file);
296 list_for_each_entry_safe(arg, back, &msg->args, list) {
297 release_sample_expr(arg->expr);
298 free(arg->name);
299 LIST_DEL(&arg->list);
300 free(arg);
301 }
302 free(msg);
303}
304
305static void
306release_spoe_agent(struct spoe_agent *agent)
307{
308 struct spoe_message *msg, *back;
309 int i;
310
311 if (!agent)
312 return;
313 free(agent->id);
314 free(agent->conf.file);
315 free(agent->var_pfx);
316 for (i = 0; i < SPOE_EV_EVENTS; ++i) {
317 list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
318 LIST_DEL(&msg->list);
319 release_spoe_message(msg);
320 }
321 }
322 free(agent);
323}
324
325static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
326 [SPOE_FRM_ERR_NONE] = "normal",
327 [SPOE_FRM_ERR_IO] = "I/O error",
328 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
329 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
330 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
331 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
332 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
333 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
334 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
335 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
336 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
337};
338
339static const char *spoe_event_str[SPOE_EV_EVENTS] = {
340 [SPOE_EV_ON_CLIENT_SESS] = "on-client-session",
341 [SPOE_EV_ON_TCP_REQ_FE] = "on-frontend-tcp-request",
342 [SPOE_EV_ON_TCP_REQ_BE] = "on-backend-tcp-request",
343 [SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request",
344 [SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request",
345
346 [SPOE_EV_ON_SERVER_SESS] = "on-server-session",
347 [SPOE_EV_ON_TCP_RSP] = "on-tcp-response",
348 [SPOE_EV_ON_HTTP_RSP] = "on-http-response",
349};
350
351
#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)

/* Printable name of each SPOE context state, indexed by SPOE_CTX_ST_* value.
 * Only compiled in debug builds. */
static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
	[SPOE_CTX_ST_NONE]         = "NONE",
	[SPOE_CTX_ST_READY]        = "READY",
	[SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
	[SPOE_CTX_ST_WAITING_ACK]  = "WAITING_ACK",
	[SPOE_CTX_ST_DONE]         = "DONE",
	[SPOE_CTX_ST_ERROR]        = "ERROR",
};

/* Printable name of each SPOE applet state, indexed by SPOE_APPCTX_ST_*
 * value. Only compiled in debug builds. */
static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
	[SPOE_APPCTX_ST_CONNECT]       = "CONNECT",
	[SPOE_APPCTX_ST_CONNECTING]    = "CONNECTING",
	[SPOE_APPCTX_ST_PROCESSING]    = "PROCESSING",
	[SPOE_APPCTX_ST_DISCONNECT]    = "DISCONNECT",
	[SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
	[SPOE_APPCTX_ST_EXIT]          = "EXIT",
	[SPOE_APPCTX_ST_END]           = "END",
};

#endif
374/********************************************************************
375 * Functions that encode/decode SPOE frames
376 ********************************************************************/
/* Frame types, stored in the first byte of every frame. Types sent by HAProxy
 * start at 1 and types sent by agents start at 101. */
enum spoe_frame_type {
	/* Frames sent by HAProxy */
	SPOE_FRM_T_HAPROXY_HELLO  = 1,
	SPOE_FRM_T_HAPROXY_DISCON = 2,
	SPOE_FRM_T_HAPROXY_NOTIFY = 3,

	/* Frames sent by the agents */
	SPOE_FRM_T_AGENT_HELLO    = 101,
	SPOE_FRM_T_AGENT_DISCON   = 102,
	SPOE_FRM_T_AGENT_ACK      = 103
};
389
/* Types of the typed data carried in frames. The type is stored in the low
 * nibble of the type byte (see SPOE_DATA_T_MASK). SPOE_DATA_TYPES is not a
 * type, it only marks the end of the list. */
enum spoe_data_type {
	SPOE_DATA_T_NULL   = 0,
	SPOE_DATA_T_BOOL   = 1,
	SPOE_DATA_T_INT32  = 2,
	SPOE_DATA_T_UINT32 = 3,
	SPOE_DATA_T_INT64  = 4,
	SPOE_DATA_T_UINT64 = 5,
	SPOE_DATA_T_IPV4   = 6,
	SPOE_DATA_T_IPV6   = 7,
	SPOE_DATA_T_STR    = 8,
	SPOE_DATA_T_BIN    = 9,
	SPOE_DATA_TYPES
};
404
/* Masks splitting the type byte of a typed data: the low nibble holds the
 * data type and the high nibble holds the flags */
#define SPOE_DATA_T_MASK  0x0F
#define SPOE_DATA_FL_MASK 0xF0

/* Flags carrying the value of a Boolean */
#define SPOE_DATA_FL_FALSE 0x00
#define SPOE_DATA_FL_TRUE  0x10

/* Length of a static string, excluding the terminating null byte. It must
 * only be used on string literals or arrays, never on pointers. */
#define SLEN(str) (sizeof(str)-1)

/* Predefined keys used in HELLO/DISCONNECT frames */
#define SUPPORTED_VERSIONS_KEY "supported-versions"
#define VERSION_KEY            "version"
#define MAX_FRAME_SIZE_KEY     "max-frame-size"
#define CAPABILITIES_KEY       "capabilities"
#define HEALTHCHECK_KEY        "healthcheck"
#define STATUS_CODE_KEY        "status-code"
#define MSG_KEY                "message"
424
/* A supported protocol version: its textual form and the inclusive range of
 * decoded values (see decode_spoe_version) it covers. */
struct spoe_version {
	char *str;
	int   min;
	int   max;
};

/* All supported versions. The list ends with an entry whose <str> is NULL. */
static struct spoe_version supported_versions[] = {
	{"1.0", 1000, 1000},
	{NULL,  0, 0}
};

/* Comma-separated list of supported versions, as announced to agents */
#define SUPPORTED_VERSIONS_VAL "1.0"

/* Comma-separated list of supported capabilities (none for now) */
#define CAPABILITIES_VAL ""
442
/* Decode the version found in the <len> bytes starting at <str> (the string
 * does not need to be NUL-terminated). The version is a decimal number,
 * optionally surrounded by spaces, and is returned multiplied by 1000 and
 * truncated ("1.0" => 1000). -1 is returned when the string is empty, too
 * long, not a number, followed by anything but spaces, or when the number is
 * not strictly positive or is too large to be a version.
 *
 * The input comes from the agent, so it is copied into a small fixed-size
 * buffer (no stack allocation driven by <len>) and the value is range-checked
 * before being converted to an integer (converting an out-of-range or NaN
 * double to int is undefined). */
static int
decode_spoe_version(const char *str, size_t len)
{
	char tmp[64], *start, *end;
	double d;

	/* Strip the surrounding spaces first, so that only the useful part
	 * has to fit in <tmp>. */
	while (len && isspace((unsigned char)*str)) {
		str++;
		len--;
	}
	while (len && isspace((unsigned char)str[len-1]))
		len--;

	if (!len || len >= sizeof(tmp))
		return -1;
	memcpy(tmp, str, len);
	tmp[len] = 0;

	start = tmp;
	d = strtod(start, &end);
	if (start == end)
		return -1;

	/* Only spaces may follow the number */
	while (isspace((unsigned char)*end))
		end++;
	if (*end)
		return -1;

	/* Written this way to also reject NaN */
	if (!(d > 0.0 && d < 1000000.0))
		return -1;

	return (int)(d * 1000);
}
471
/* Encode <i> as a variable-length integer into <buf> and return the number of
 * bytes written. This function never fails: the caller must provide enough
 * room (a 64-bit value needs at most 10 bytes).
 *
 * Values below 240 are stored in a single byte. Otherwise the first byte holds
 * the 4 low bits with its high nibble set, and the remaining value is emitted
 * 7 bits at a time, each byte having its high bit set except the last one. */
static int
encode_spoe_varint(uint64_t i, char *buf)
{
	int n = 0;

	if (i < 240) {
		buf[n++] = (unsigned char)i;
		return n;
	}

	buf[n++] = (unsigned char)i | 240;
	i = (i - 240) >> 4;
	while (i >= 128) {
		buf[n++] = (unsigned char)i | 128;
		i = (i - 128) >> 7;
	}
	buf[n++] = (unsigned char)i;
	return n;
}
493
/* Decode a variable-length integer starting at <buf> and store it in <*i>.
 * <end> points just after the last readable byte. On success, the number of
 * read bytes is returned. -1 is returned when the end of the buffer is reached
 * before the integer is complete, or when the encoding is too long to fit in
 * 64 bits. On error, the content of <*i> is unspecified.
 *
 * See encode_spoe_varint for the format. */
static int
decode_spoe_varint(const char *buf, const char *end, uint64_t *i)
{
	const unsigned char *msg  = (const unsigned char *)buf;
	const unsigned char *stop = (const unsigned char *)end;
	unsigned int shift = 4;
	int idx = 0;

	/* <end> is exclusive: nothing can be read at or after it */
	if (msg >= stop)
		return -1;

	*i = msg[0];
	if (msg[0] < 240)
		return 1;

	do {
		++idx;
		/* A 64-bit value never needs a shift of 64 bits or more;
		 * refusing it also avoids an undefined shift on malformed
		 * input. */
		if (msg + idx >= stop || shift >= 64)
			return -1;
		*i += (uint64_t)msg[idx] << shift;
		shift += 7;
	} while (msg[idx] >= 128);
	return (idx + 1);
}
519
/* Encode the <len> bytes of <str> into <dst>: the length comes first, encoded
 * as a variable-length integer, followed by the bytes themselves. An empty
 * string is encoded as a single zero byte and <str> is not read in that case.
 * This function never fails (the caller must provide enough room) and returns
 * the number of written bytes. */
static int
encode_spoe_string(const char *str, size_t len, char *dst)
{
	int hdr;

	if (len == 0) {
		*dst = 0;
		return 1;
	}

	hdr = encode_spoe_varint(len, dst);
	memcpy(dst + hdr, str, len);
	return (hdr + len);
}
537
/* Decode a string located at <buf>; <end> points just after the last readable
 * byte. The length is decoded first, as a variable-length integer. If it
 * succeeds and the whole string fits before <end>, the beginning of the string
 * is saved in <*str>, its length in <*len> and the total number of read bytes
 * is returned. If an error occurred, -1 is returned and <*str> remains NULL.
 * An empty string is valid: <*str> is then non-NULL and <*len> is 0. */
static int
decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
{
	int i;

	*str = NULL;
	*len = 0;

	if ((i = decode_spoe_varint(buf, end, len)) == -1)
		return -1;

	/* The length is controlled by the peer: compare it with the room left
	 * instead of adding it to a pointer, which could wrap around and
	 * defeat the check. */
	if (buf + i > end || *len > (uint64_t)(end - (buf + i)))
		return -1;

	*str = buf + i;
	return (i + *len);
}
563
564/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
565 * of bytes read is returned. A types data is composed of a type (1 byte) and
566 * corresponding data:
567 * - boolean: non additional data (0 bytes)
568 * - integers: a variable-length integer (see decode_spoe_varint)
569 * - ipv4: 4 bytes
570 * - ipv6: 16 bytes
571 * - binary and string: a buffer prefixed by its size, a variable-length
572 * integer (see decode_spoe_string) */
573static int
574skip_spoe_data(char *frame, char *end)
575{
576 uint64_t sz = 0;
577 int i, idx = 0;
578
579 if (frame > end)
580 return -1;
581
582 switch (frame[idx++] & SPOE_DATA_T_MASK) {
583 case SPOE_DATA_T_BOOL:
584 break;
585 case SPOE_DATA_T_INT32:
586 case SPOE_DATA_T_INT64:
587 case SPOE_DATA_T_UINT32:
588 case SPOE_DATA_T_UINT64:
589 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
590 return -1;
591 idx += i;
592 break;
593 case SPOE_DATA_T_IPV4:
594 idx += 4;
595 break;
596 case SPOE_DATA_T_IPV6:
597 idx += 16;
598 break;
599 case SPOE_DATA_T_STR:
600 case SPOE_DATA_T_BIN:
601 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
602 return -1;
603 idx += i + sz;
604 break;
605 }
606
607 if (frame+idx > end)
608 return -1;
609 return idx;
610}
611
612/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
613 * number of read bytes is returned. See skip_spoe_data for details. */
614static int
615decode_spoe_data(char *frame, char *end, struct sample *smp)
616{
617 uint64_t sz = 0;
618 int type, i, idx = 0;
619
620 if (frame > end)
621 return -1;
622
623 type = frame[idx++];
624 switch (type & SPOE_DATA_T_MASK) {
625 case SPOE_DATA_T_BOOL:
626 smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
627 smp->data.type = SMP_T_BOOL;
628 break;
629 case SPOE_DATA_T_INT32:
630 case SPOE_DATA_T_INT64:
631 case SPOE_DATA_T_UINT32:
632 case SPOE_DATA_T_UINT64:
633 if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1)
634 return -1;
635 idx += i;
636 smp->data.type = SMP_T_SINT;
637 break;
638 case SPOE_DATA_T_IPV4:
639 if (frame+idx+4 > end)
640 return -1;
641 memcpy(&smp->data.u.ipv4, frame+idx, 4);
642 smp->data.type = SMP_T_IPV4;
643 idx += 4;
644 break;
645 case SPOE_DATA_T_IPV6:
646 if (frame+idx+16 > end)
647 return -1;
648 memcpy(&smp->data.u.ipv6, frame+idx, 16);
649 smp->data.type = SMP_T_IPV6;
650 idx += 16;
651 break;
652 case SPOE_DATA_T_STR:
653 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
654 return -1;
655 idx += i;
656 if (frame+idx+sz > end)
657 return -1;
658 smp->data.u.str.str = frame+idx;
659 smp->data.u.str.len = sz;
660 smp->data.type = SMP_T_STR;
661 idx += sz;
662 break;
663 case SPOE_DATA_T_BIN:
664 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
665 return -1;
666 idx += i;
667 if (frame+idx+sz > end)
668 return -1;
669 smp->data.u.str.str = frame+idx;
670 smp->data.u.str.len = sz;
671 smp->data.type = SMP_T_BIN;
672 idx += sz;
673 break;
674 }
675
676 if (frame+idx > end)
677 return -1;
678 return idx;
679}
680
/* Skip an action in a frame received from an agent; <end> is the end of the
 * frame. If an error occurred, -1 is returned, otherwise the number of read
 * bytes is returned. The first byte of an action is its type (ignored here),
 * the second one is read as the number of typed data that follow, each of them
 * being skipped with skip_spoe_data. */
static int
skip_spoe_action(char *frame, char *end)
{
	int nb_args, ret, pos;

	if (frame+2 > end)
		return -1;

	nb_args = frame[1];
	pos = 2;
	for (; nb_args > 0; nb_args--) {
		ret = skip_spoe_data(frame+pos, end);
		if (ret == -1)
			return -1;
		pos += ret;
	}

	if (frame+pos > end)
		return -1;
	return pos;
}
704
705/* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on
706 * success, 0 if the frame can be ignored and -1 if an error occurred. */
707static int
708prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
709{
710 int idx = 0;
711 size_t max = (7 /* TYPE + METADATA */
712 + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
713 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
714 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL));
715
716 if (size < max)
717 return -1;
718
719 /* Frame type */
720 frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
721
722 /* No flags for now */
723 memset(frame+idx, 0, 4);
724 idx += 4;
725
726 /* No stream-id and frame-id for HELLO frames */
727 frame[idx++] = 0;
728 frame[idx++] = 0;
729
730 /* There are 3 mandatory items: "supported-versions", "max-frame-size"
731 * and "capabilities" */
732
733 /* "supported-versions" K/V item */
734 idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx);
735 frame[idx++] = SPOE_DATA_T_STR;
736 idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx);
737
738 /* "max-fram-size" K/V item */
739 idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
740 frame[idx++] = SPOE_DATA_T_UINT32;
741 idx += encode_spoe_varint(APPCTX_SPOE(appctx).max_frame_size, frame+idx);
742
743 /* "capabilities" K/V item */
744 idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
745 frame[idx++] = SPOE_DATA_T_STR;
746 idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
747
748 return idx;
749}
750
751/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame
752 * size on success, 0 if the frame can be ignored and -1 if an error
753 * occurred. */
754static int
755prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
756{
757 const char *reason;
758 int rlen, idx = 0;
759 size_t max = (7 /* TYPE + METADATA */
760 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
761 + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
762
763 if (size < max)
764 return -1;
765
766 /* Get the message corresponding to the status code */
767 if (spoe_status_code >= SPOE_FRM_ERRS)
768 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
769 reason = spoe_frm_err_reasons[spoe_status_code];
770 rlen = strlen(reason);
771
772 /* Frame type */
773 frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
774
775 /* No flags for now */
776 memset(frame+idx, 0, 4);
777 idx += 4;
778
779 /* No stream-id and frame-id for DISCONNECT frames */
780 frame[idx++] = 0;
781 frame[idx++] = 0;
782
783 /* There are 2 mandatory items: "status-code" and "message" */
784
785 /* "status-code" K/V item */
786 idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx);
787 frame[idx++] = SPOE_DATA_T_UINT32;
788 idx += encode_spoe_varint(spoe_status_code, frame+idx);
789
790 /* "message" K/V item */
791 idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx);
792 frame[idx++] = SPOE_DATA_T_STR;
793 idx += encode_spoe_string(reason, rlen, frame+idx);
794
795 return idx;
796}
797
798/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
799 * success, 0 if the frame can be ignored and -1 if an error occurred. */
800static int
801prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
802{
803 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
804 int idx = 0;
805
806 if (size < APPCTX_SPOE(appctx).max_frame_size)
807 return -1;
808
809 frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
810
811 /* No flags for now */
812 memset(frame+idx, 0, 4);
813 idx += 4;
814
815 /* Set stream-id and frame-id */
816 idx += encode_spoe_varint(ctx->stream_id, frame+idx);
817 idx += encode_spoe_varint(ctx->frame_id, frame+idx);
818
819 /* Copy encoded messages */
820 memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
821 idx += ctx->buffer->i;
822
823 return idx;
824}
825
826/* Decode HELLO frame sent by an agent. It returns the number of by read bytes
827 * on success, 0 if the frame can be ignored and -1 if an error occurred. */
828static int
829handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
830{
831 int vsn, max_frame_size;
832 int i, idx = 0;
833 size_t min_size = (7 /* TYPE + METADATA */
834 + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
835 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
836 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
837
838 /* Check frame type */
839 if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
840 return 0;
841
842 if (size < min_size) {
843 spoe_status_code = SPOE_FRM_ERR_INVALID;
844 return -1;
845 }
846
847 /* Skip flags: fragmentation is not supported for now */
848 idx += 4;
849
850 /* stream-id and frame-id must be cleared */
851 if (frame[idx] != 0 || frame[idx+1] != 0) {
852 spoe_status_code = SPOE_FRM_ERR_INVALID;
853 return -1;
854 }
855 idx += 2;
856
857 /* There are 3 mandatory items: "version", "max-frame-size" and
858 * "capabilities" */
859
860 /* Loop on K/V items */
861 vsn = max_frame_size = 0;
862 while (idx < size) {
863 char *str;
864 uint64_t sz;
865
866 /* Decode the item key */
867 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
868 if (str == NULL) {
869 spoe_status_code = SPOE_FRM_ERR_INVALID;
870 return -1;
871 }
872 /* Check "version" K/V item */
873 if (!memcmp(str, VERSION_KEY, sz)) {
874 /* The value must be a string */
875 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
876 spoe_status_code = SPOE_FRM_ERR_INVALID;
877 return -1;
878 }
879 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
880 if (str == NULL) {
881 spoe_status_code = SPOE_FRM_ERR_INVALID;
882 return -1;
883 }
884
885 vsn = decode_spoe_version(str, sz);
886 if (vsn == -1) {
887 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
888 return -1;
889 }
890 for (i = 0; supported_versions[i].str != NULL; ++i) {
891 if (vsn >= supported_versions[i].min &&
892 vsn <= supported_versions[i].max)
893 break;
894 }
895 if (supported_versions[i].str == NULL) {
896 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
897 return -1;
898 }
899 }
900 /* Check "max-frame-size" K/V item */
901 else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
902 int type;
903
904 /* The value must be integer */
905 type = frame[idx++];
906 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
907 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
908 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
909 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
910 spoe_status_code = SPOE_FRM_ERR_INVALID;
911 return -1;
912 }
913 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
914 spoe_status_code = SPOE_FRM_ERR_INVALID;
915 return -1;
916 }
917 idx += i;
918 if (sz < MIN_FRAME_SIZE || sz > APPCTX_SPOE(appctx).max_frame_size) {
919 spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
920 return -1;
921 }
922 max_frame_size = sz;
923 }
924 /* Skip "capabilities" K/V item for now */
925 else {
926 /* Silently ignore unknown item */
927 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
928 spoe_status_code = SPOE_FRM_ERR_INVALID;
929 return -1;
930 }
931 idx += i;
932 }
933 }
934
935 /* Final checks */
936 if (!vsn) {
937 spoe_status_code = SPOE_FRM_ERR_NO_VSN;
938 return -1;
939 }
940 if (!max_frame_size) {
941 spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
942 return -1;
943 }
944
945 APPCTX_SPOE(appctx).version = (unsigned int)vsn;
946 APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
947 return idx;
948}
949
950/* Decode DISCONNECT frame sent by an agent. It returns the number of by read
951 * bytes on success, 0 if the frame can be ignored and -1 if an error
952 * occurred. */
953static int
954handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
955{
956 int i, idx = 0;
957 size_t min_size = (7 /* TYPE + METADATA */
958 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
959 + 1 + SLEN(MSG_KEY) + 1 + 1);
960
961 /* Check frame type */
962 if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
963 return 0;
964
965 if (size < min_size) {
966 spoe_status_code = SPOE_FRM_ERR_INVALID;
967 return -1;
968 }
969
970 /* Skip flags: fragmentation is not supported for now */
971 idx += 4;
972
973 /* stream-id and frame-id must be cleared */
974 if (frame[idx] != 0 || frame[idx+1] != 0) {
975 spoe_status_code = SPOE_FRM_ERR_INVALID;
976 return -1;
977 }
978 idx += 2;
979
980 /* There are 2 mandatory items: "status-code" and "message" */
981
982 /* Loop on K/V items */
983 while (idx < size) {
984 char *str;
985 uint64_t sz;
986
987 /* Decode the item key */
988 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
989 if (str == NULL) {
990 spoe_status_code = SPOE_FRM_ERR_INVALID;
991 return -1;
992 }
993
994 /* Check "status-code" K/V item */
995 if (!memcmp(str, STATUS_CODE_KEY, sz)) {
996 int type;
997
998 /* The value must be an integer */
999 type = frame[idx++];
1000 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
1001 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
1002 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
1003 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
1004 spoe_status_code = SPOE_FRM_ERR_INVALID;
1005 return -1;
1006 }
1007 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
1008 spoe_status_code = SPOE_FRM_ERR_INVALID;
1009 return -1;
1010 }
1011 idx += i;
1012 spoe_status_code = sz;
1013 }
1014
1015 /* Check "message" K/V item */
1016 else if (sz && !memcmp(str, MSG_KEY, sz)) {
1017 /* The value must be a string */
1018 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1019 spoe_status_code = SPOE_FRM_ERR_INVALID;
1020 return -1;
1021 }
1022 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1023 if (str == NULL || sz > 255) {
1024 spoe_status_code = SPOE_FRM_ERR_INVALID;
1025 return -1;
1026 }
1027 memcpy(spoe_reason, str, sz);
1028 spoe_reason[sz] = 0;
1029 }
1030 else {
1031 /* Silently ignore unknown item */
1032 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1033 spoe_status_code = SPOE_FRM_ERR_INVALID;
1034 return -1;
1035 }
1036 idx += i;
1037 }
1038 }
1039
1040 return idx;
1041}
1042
1043
1044/* Decode ACK frame sent by an agent. It returns the number of by read bytes on
1045 * success, 0 if the frame can be ignored and -1 if an error occurred. */
1046static int
1047handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
1048{
1049 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1050 uint64_t stream_id, frame_id;
1051 int idx = 0;
1052 size_t min_size = (7 /* TYPE + METADATA */);
1053
1054 /* Check frame type */
1055 if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
1056 return 0;
1057
1058 if (size < min_size) {
1059 spoe_status_code = SPOE_FRM_ERR_INVALID;
1060 return -1;
1061 }
1062
1063 /* Skip flags: fragmentation is not supported for now */
1064 idx += 4;
1065
1066 /* Get the stream-id and the frame-id */
1067 idx += decode_spoe_varint(frame+idx, frame+size, &stream_id);
1068 idx += decode_spoe_varint(frame+idx, frame+size, &frame_id);
1069
1070 /* Check stream-id and frame-id */
1071 if (ctx->stream_id != (unsigned int)stream_id ||
1072 ctx->frame_id != (unsigned int)frame_id)
1073 return 0;
1074
1075 /* Copy encoded actions */
1076 b_reset(ctx->buffer);
1077 memcpy(ctx->buffer->p, frame+idx, size-idx);
1078 ctx->buffer->i = size-idx;
1079
1080 return idx;
1081}
1082
Christopher Fauletba7bc162016-11-07 21:07:38 +01001083/* This function is used in cfgparse.c and declared in proto/checks.h. It
1084 * prepare the request to send to agents during a healthcheck. It returns 0 on
1085 * success and -1 if an error occurred. */
1086int
1087prepare_spoe_healthcheck_request(char **req, int *len)
1088{
1089 struct appctx a;
1090 char *frame, buf[global.tune.bufsize];
1091 unsigned int framesz;
1092 int idx;
1093
1094 memset(&a, 0, sizeof(a));
1095 memset(buf, 0, sizeof(buf));
1096 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1097
1098 frame = buf+4;
1099 idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4);
1100 if (idx <= 0)
1101 return -1;
1102 if (idx + SLEN(HEALTHCHECK_KEY) + 1 > global.tune.bufsize-4)
1103 return -1;
1104
1105 /* "healthcheck" K/V item */
1106 idx += encode_spoe_string(HEALTHCHECK_KEY, SLEN(HEALTHCHECK_KEY), frame+idx);
1107 frame[idx++] = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE);
1108
1109 framesz = htonl(idx);
1110 memcpy(buf, (char *)&framesz, 4);
1111
1112 if ((*req = malloc(idx+4)) == NULL)
1113 return -1;
1114 memcpy(*req, buf, idx+4);
1115 *len = idx+4;
1116 return 0;
1117}
1118
1119/* This function is used in checks.c and declared in proto/checks.h. It decode
1120 * the response received from an agent during a healthcheck. It returns 0 on
1121 * success and -1 if an error occurred. */
1122int
1123handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen)
1124{
1125 struct appctx a;
1126 int r;
1127
1128 memset(&a, 0, sizeof(a));
1129 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1130
1131 if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0)
1132 goto error;
1133 if ((r = handle_spoe_agenthello_frame(&a, frame, size)) <= 0) {
1134 if (r == 0)
1135 spoe_status_code = SPOE_FRM_ERR_INVALID;
1136 goto error;
1137 }
1138
1139 return 0;
1140
1141 error:
1142 if (spoe_status_code >= SPOE_FRM_ERRS)
1143 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
1144 strncpy(err, spoe_frm_err_reasons[spoe_status_code], errlen);
1145 return -1;
1146}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001147
1148/********************************************************************
1149 * Functions that manage the SPOE applet
1150 ********************************************************************/
1151/* Callback function that catches applet timeouts. If a timeout occurred, we set
1152 * <appctx->st1> flag and the SPOE applet is woken up. */
1153static struct task *
1154process_spoe_applet(struct task * task)
1155{
1156 struct appctx *appctx = task->context;
1157
1158 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1159 if (tick_is_expired(task->expire, now_ms)) {
1160 task->expire = TICK_ETERNITY;
1161 appctx->st1 = SPOE_APPCTX_ERR_TOUT;
1162 }
1163 si_applet_want_get(appctx->owner);
1164 appctx_wakeup(appctx);
1165 return task;
1166}
1167
1168/* Remove a SPOE applet from the agent cache */
1169static void
1170remove_spoe_applet_from_cache(struct appctx *appctx)
1171{
1172 struct appctx *a, *back;
1173 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1174
1175 if (LIST_ISEMPTY(&agent->cache))
1176 return;
1177
1178 list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) {
1179 if (a == appctx) {
1180 LIST_DEL(&APPCTX_SPOE(appctx).list);
1181 break;
1182 }
1183 }
1184}
1185
1186
1187/* Callback function that releases a SPOE applet. This happens when the
1188 * connection with the agent is closed. */
1189static void
1190release_spoe_applet(struct appctx *appctx)
1191{
1192 struct stream_interface *si = appctx->owner;
1193 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1194 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1195
1196 if (appctx->st0 == SPOE_APPCTX_ST_CONNECT ||
1197 appctx->st0 == SPOE_APPCTX_ST_CONNECTING)
1198 on_new_spoe_appctx_failure(agent);
1199
1200 if (appctx->st0 != SPOE_APPCTX_ST_END) {
1201 si_shutw(si);
1202 si_shutr(si);
1203 si_ic(si)->flags |= CF_READ_NULL;
1204 appctx->st0 = SPOE_APPCTX_ST_END;
1205 }
1206
1207 if (ctx != NULL) {
1208 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1209 ctx->appctx = NULL;
1210 }
1211
1212 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
1213 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1214 __FUNCTION__, appctx);
1215
1216 /* Release the task attached to the SPOE applet */
1217 if (APPCTX_SPOE(appctx).task) {
1218 task_delete(APPCTX_SPOE(appctx).task);
1219 task_free(APPCTX_SPOE(appctx).task);
1220 }
1221
1222 /* And remove it from the agent cache */
1223 remove_spoe_applet_from_cache(appctx);
1224 APPCTX_SPOE(appctx).ctx = NULL;
1225}
1226
1227/* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when
1228 * the frame can be ignored, 0 to retry later and 1 on success. The frame is
1229 * encoded using the callback function <prepare>. */
1230static int
1231send_spoe_frame(struct appctx *appctx,
1232 int (*prepare)(struct appctx *, char *, size_t))
1233{
1234 struct stream_interface *si = appctx->owner;
1235 int framesz, ret;
1236 uint32_t netint;
1237
1238 ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
1239 if (ret <= 0)
1240 goto skip_or_error;
1241 framesz = ret;
1242 netint = htonl(framesz);
1243 ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint));
1244 if (ret > 0)
1245 ret = bi_putblk(si_ic(si), trash.str, framesz);
1246 if (ret <= 0) {
1247 if (ret == -1)
1248 return -1;
1249 return -2;
1250 }
1251 return 1;
1252
1253 skip_or_error:
1254 if (!ret)
1255 return -1;
1256 return -2;
1257}
1258
1259/* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1
1260 * when the frame can be ignored, 0 to retry later and 1 on success. The frame
1261 * is decoded using the callback function <handle>. */
1262static int
1263recv_spoe_frame(struct appctx *appctx,
1264 int (*handle)(struct appctx *, char *, size_t))
1265{
1266 struct stream_interface *si = appctx->owner;
1267 int framesz, ret;
1268 uint32_t netint;
1269
1270 ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0);
1271 if (ret <= 0)
1272 goto empty_or_error;
1273 framesz = ntohl(netint);
1274 if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
1275 spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
1276 return -2;
1277 }
1278
1279 ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint));
1280 if (ret <= 0)
1281 goto empty_or_error;
1282 bo_skip(si_oc(si), ret+sizeof(netint));
1283
1284 /* First check if the received frame is a DISCONNECT frame */
1285 ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz);
1286 if (ret != 0) {
1287 if (ret > 0) {
1288 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1289 " - disconnected by peer (%d): %s\n",
1290 (int)now.tv_sec, (int)now.tv_usec,
1291 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1292 __FUNCTION__, appctx, spoe_status_code,
1293 spoe_reason);
1294 return 2;
1295 }
1296 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1297 " - error on frame (%s)\n",
1298 (int)now.tv_sec, (int)now.tv_usec,
1299 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1300 __FUNCTION__, appctx,
1301 spoe_frm_err_reasons[spoe_status_code]);
1302 return -2;
1303 }
1304 if (handle == NULL)
1305 goto out;
1306
1307 /* If not, try to decode it */
1308 ret = handle(appctx, trash.str, framesz);
1309 if (ret <= 0) {
1310 if (!ret)
1311 return -1;
1312 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1313 " - error on frame (%s)\n",
1314 (int)now.tv_sec, (int)now.tv_usec,
1315 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1316 __FUNCTION__, appctx,
1317 spoe_frm_err_reasons[spoe_status_code]);
1318 return -2;
1319 }
1320 out:
1321 return 1;
1322
1323 empty_or_error:
1324 if (!ret)
1325 return 0;
1326 spoe_status_code = SPOE_FRM_ERR_IO;
1327 return -2;
1328}
1329
/* I/O handler of the SPOE applet. It processes the messages exchanged with the
 * agent, following the state stored in <appctx->st0>:
 *
 *   - CONNECT:       send the HAProxy HELLO frame and arm the hello timeout.
 *   - CONNECTING:    wait for the agent HELLO frame, then arm the idle
 *                    timeout and offer the applet to the waiting streams.
 *   - PROCESSING:    send the NOTIFY frame of the attached SPOE context (if
 *                    any) and wait for the corresponding ACK frame. Without
 *                    context to serve, incoming frames are just consumed.
 *   - DISCONNECT:    send the HAProxy DISCONNECT frame.
 *   - DISCONNECTING: wait for a frame from the agent, an error or a timeout
 *                    before leaving.
 *   - EXIT:          shut the stream interface down and switch to END.
 *   - END:           nothing more to do.
 *
 * When a state change must be handled immediately, the function jumps back to
 * the <switchstate> label instead of waiting for the next wakeup. */
static void
handle_spoe_applet(struct appctx *appctx)
{
	struct stream_interface *si    = appctx->owner;
	struct stream           *s     = si_strm(si);
	struct spoe_agent       *agent = APPCTX_SPOE(appctx).agent;
	/* The attached context is read once, here. It is not refreshed when
	 * the function loops on <switchstate>. */
	struct spoe_context     *ctx   = APPCTX_SPOE(appctx).ctx;
	int                      ret;

 switchstate:
	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
		    " - appctx-state=%s\n",
		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
		    __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);

	switch (appctx->st0) {
		case SPOE_APPCTX_ST_CONNECT:
			spoe_status_code = SPOE_FRM_ERR_NONE;
			if (si->state <= SI_ST_CON) {
				/* Not established yet: wake the stream up and
				 * wait for the next call. */
				si_applet_want_put(si);
				task_wakeup(s->task, TASK_WOKEN_MSG);
				break;
			}
			else if (si->state != SI_ST_EST) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame);
			if (ret < 0) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			else if (!ret)
				goto full;

			/* Hello frame was sent. Set the hello timeout and
			 * wait for the reply. */
			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
			appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
			/* fall through */

		case SPOE_APPCTX_ST_CONNECTING:
			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			/* <st1> is set by the applet's task (process_spoe_applet) */
			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
				SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
					    " - Connection timed out\n",
					    (int)now.tv_sec, (int)now.tv_usec,
					    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
					    __FUNCTION__, appctx);
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame);
			if (ret < 0) {
				/* Error or unexpected frame: tell it to the
				 * agent with a DISCONNECT frame. */
				appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			if (ret == 2) {
				/* Disconnected by the agent: leave directly */
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			if (!ret)
				goto out;

			/* hello handshake is finished, set the idle timeout,
			 * Add the appctx in the agent cache, decrease the
			 * number of new applets and wake up waiting streams. */
			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
			on_new_spoe_appctx_success(agent, appctx);
			break;

		case SPOE_APPCTX_ST_PROCESSING:
			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
				/* The timeout is reported to the agent in the
				 * DISCONNECT frame, through <spoe_status_code>. */
				spoe_status_code = SPOE_FRM_ERR_TOUT;
				appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
				appctx->st1  = SPOE_APPCTX_ERR_NONE;
				goto switchstate;
			}
			if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
				ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame);
				if (ret < 0) {
					if (ret == -1) {
						/* The frame was not sent but the
						 * applet remains usable: only the
						 * stream is notified of the error. */
						ctx->state = SPOE_CTX_ST_ERROR;
						task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
						goto skip_notify_frame;
					}
					appctx->st0 = SPOE_APPCTX_ST_EXIT;
					goto switchstate;
				}
				else if (!ret)
					goto full;
				ctx->state = SPOE_CTX_ST_WAITING_ACK;
				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			}

		  skip_notify_frame:
			if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) {
				ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame);
				if (ret < 0) {
					/* -1: the frame was ignored, try to
					 * read the next one. */
					if (ret == -1)
						goto skip_notify_frame;
					ctx->state = SPOE_CTX_ST_ERROR;
					task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
					goto switchstate;
				}
				if (!ret)
					goto out;
				if (ret == 2) {
					/* Disconnected by the agent */
					ctx->state = SPOE_CTX_ST_ERROR;
					task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
					appctx->st0 = SPOE_APPCTX_ST_EXIT;
					goto switchstate;
				}
				/* ACK received: the stream can process actions */
				ctx->state = SPOE_CTX_ST_DONE;
				task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			}
			else {
				/* No ACK is expected here */
				if (stopping) {
					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
					goto switchstate;
				}

				/* Consume incoming frames; only a DISCONNECT
				 * frame is decoded (NULL handler). */
				ret = recv_spoe_frame(appctx, NULL);
				if (ret < 0) {
					if (ret == -1)
						goto skip_notify_frame;
					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
					goto switchstate;
				}
				if (!ret)
					goto out;
				if (ret == 2) {
					appctx->st0 = SPOE_APPCTX_ST_EXIT;
					goto switchstate;
				}
				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			}
			break;

		case SPOE_APPCTX_ST_DISCONNECT:
			ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame);
			if (ret < 0) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			else if (!ret)
				goto full;
			SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
				    " - disconnected by HAProxy (%d): %s\n",
				    (int)now.tv_sec, (int)now.tv_usec,
				    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
				    __FUNCTION__, appctx, spoe_status_code,
				    spoe_frm_err_reasons[spoe_status_code]);

			/* The wait for the agent's reply is bounded by the idle
			 * timeout. */
			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
			/* fall through */

		case SPOE_APPCTX_ST_DISCONNECTING:
			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			/* Leave on error or when the agent disconnects too.
			 * Otherwise, stay in this state until the next call. */
			ret = recv_spoe_frame(appctx, NULL);
			if (ret < 0 || ret == 2) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			break;

		case SPOE_APPCTX_ST_EXIT:
			si_shutw(si);
			si_shutr(si);
			si_ic(si)->flags |= CF_READ_NULL;
			appctx->st0 = SPOE_APPCTX_ST_END;
			/* No more timeout to handle for this applet */
			APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
			/* fall through */

		case SPOE_APPCTX_ST_END:
			break;
	}

 out:
	/* (Re)queue the applet's task if a timeout is armed, then wake the
	 * applet's stream up. */
	if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
		task_queue(APPCTX_SPOE(appctx).task);
	si_oc(si)->flags |= CF_READ_DONTWAIT;
	task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
	return;
 full:
	/* Reached when send_spoe_frame() returns 0 */
	si_applet_cant_put(si);
	goto out;
}
1543
1544struct applet spoe_applet = {
1545 .obj_type = OBJ_TYPE_APPLET,
1546 .name = "<SPOE>", /* used for logging */
1547 .fct = handle_spoe_applet,
1548 .release = release_spoe_applet,
1549};
1550
1551/* Create a SPOE applet. On success, the created applet is returned, else
1552 * NULL. */
1553static struct appctx *
1554create_spoe_appctx(struct spoe_config *conf)
1555{
1556 struct appctx *appctx;
1557 struct session *sess;
1558 struct task *task;
1559 struct stream *strm;
1560 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
1561 struct listener *, by_fe);
1562
1563 if ((appctx = appctx_new(&spoe_applet)) == NULL)
1564 goto out_error;
1565
1566 appctx->st0 = SPOE_APPCTX_ST_CONNECT;
1567 if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
1568 goto out_free_appctx;
1569 APPCTX_SPOE(appctx).task->process = process_spoe_applet;
1570 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1571 APPCTX_SPOE(appctx).task->context = appctx;
1572 APPCTX_SPOE(appctx).agent = conf->agent;
1573 APPCTX_SPOE(appctx).ctx = NULL;
1574 APPCTX_SPOE(appctx).version = 0;
1575 APPCTX_SPOE(appctx).max_frame_size = global.tune.bufsize;
1576 task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
1577
1578 sess = session_new(&conf->agent_fe, l, &appctx->obj_type);
1579 if (!sess)
1580 goto out_free_spoe;
1581
1582 if ((task = task_new()) == NULL)
1583 goto out_free_sess;
1584
1585 if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
1586 goto out_free_task;
1587
1588 strm->target = sess->listener->default_target;
1589 strm->req.analysers |= sess->listener->analysers;
1590 stream_set_backend(strm, conf->agent->b.be);
1591
1592 /* applet is waiting for data */
1593 si_applet_cant_get(&strm->si[0]);
1594 appctx_wakeup(appctx);
1595
1596 /* Increase the number of applets waiting the end of the hello
1597 * handshake. */
1598 conf->agent->new_applets++;
1599
1600 strm->do_log = NULL;
1601 strm->res.flags |= CF_READ_DONTWAIT;
1602
1603 conf->agent_fe.feconn++;
1604 jobs++;
1605 totalconn++;
1606
1607 return appctx;
1608
1609 /* Error unrolling */
1610 out_free_task:
1611 task_free(task);
1612 out_free_sess:
1613 session_free(sess);
1614 out_free_spoe:
1615 task_free(APPCTX_SPOE(appctx).task);
1616 out_free_appctx:
1617 appctx_free(appctx);
1618 out_error:
1619 return NULL;
1620}
1621
1622/* Wake up a SPOE applet attached to a SPOE context. */
1623static void
1624wakeup_spoe_appctx(struct spoe_context *ctx)
1625{
1626 if (ctx->appctx == NULL)
1627 return;
1628 if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) {
1629 si_applet_want_get(ctx->appctx->owner);
1630 si_applet_want_put(ctx->appctx->owner);
1631 appctx_wakeup(ctx->appctx);
1632 }
1633}
1634
1635
1636/* Run across the list of pending streams waiting for a SPOE applet and wake the
1637 * first. */
1638static void
1639offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx)
1640{
1641 struct spoe_context *ctx;
1642
Christopher Fauletf7a30922016-11-10 15:04:51 +01001643 if (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING)
1644 return;
1645
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001646 if (LIST_ISEMPTY(&agent->applet_wq))
1647 LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
1648 else {
1649 ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait);
1650 APPCTX_SPOE(appctx).ctx = ctx;
1651 ctx->appctx = appctx;
1652 LIST_DEL(&ctx->applet_wait);
1653 LIST_INIT(&ctx->applet_wait);
1654 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1655 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1656 " - wake up stream to get available SPOE applet\n",
1657 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1658 __FUNCTION__, ctx->strm);
1659 }
1660}
1661
1662/* A failure occurred during SPOE applet creation. */
1663static void
1664on_new_spoe_appctx_failure(struct spoe_agent *agent)
1665{
1666 struct spoe_context *ctx;
1667
1668 agent->new_applets--;
1669 list_for_each_entry(ctx, &agent->applet_wq, applet_wait) {
1670 ctx->errs++;
1671 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1672 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1673 " - wake up stream because to SPOE applet connection failed\n",
1674 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1675 __FUNCTION__, ctx->strm);
1676 }
1677}
1678
1679static void
1680on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx)
1681{
1682 agent->new_applets--;
1683 offer_spoe_appctx(agent, appctx);
1684}
1685/* Retrieve a SPOE applet from the agent cache if possible, else create it. It
1686 * returns 1 on success, 0 to retry later and -1 if an error occurred. */
1687static int
1688acquire_spoe_appctx(struct spoe_context *ctx, int dir)
1689{
1690 struct spoe_config *conf = FLT_CONF(ctx->filter);
1691 struct spoe_agent *agent = conf->agent;
1692 struct appctx *appctx;
1693
1694 /* If a process is already started for this SPOE context, retry
1695 * later. */
1696 if (ctx->flags & SPOE_CTX_FL_PROCESS)
1697 goto wait;
1698
1699 /* If needed, initialize the buffer that will be used to encode messages
1700 * and decode actions. */
1701 if (ctx->buffer == &buf_empty) {
1702 if (!LIST_ISEMPTY(&ctx->buffer_wait)) {
1703 LIST_DEL(&ctx->buffer_wait);
1704 LIST_INIT(&ctx->buffer_wait);
1705 }
1706
1707 if (!b_alloc_margin(&ctx->buffer, 0)) {
1708 LIST_ADDQ(&buffer_wq, &ctx->buffer_wait);
1709 goto wait;
1710 }
1711 }
1712
1713 /* If the SPOE applet was already set, all is done. */
1714 if (ctx->appctx)
1715 goto success;
1716
1717 /* Else try to retrieve it from the agent cache */
1718 if (!LIST_ISEMPTY(&agent->cache)) {
1719 appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list);
1720 LIST_DEL(&APPCTX_SPOE(appctx).list);
1721 APPCTX_SPOE(appctx).ctx = ctx;
1722 ctx->appctx = appctx;
1723 goto success;
1724 }
1725
1726 /* If there is no server up for the agent's backend or it too many
1727 * failure occurred, this is an error. */
1728 if ((!agent->b.be->srv_act && !agent->b.be->srv_bck) ||
1729 ctx->errs >= MAX_NEW_SPOE_APPLET_ERRS)
1730 goto error;
1731
1732 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1733 " - waiting for available SPOE appctx\n",
1734 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
1735 ctx->strm);
1736
1737 /* Else add the stream in the waiting queue. */
1738 if (LIST_ISEMPTY(&ctx->applet_wait))
1739 LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait);
1740
1741 /* Finally, create new SPOE applet if we can */
1742 if (agent->new_applets < MAX_NEW_SPOE_APPLETS) {
1743 if (create_spoe_appctx(conf) == NULL)
1744 goto error;
1745 }
1746
1747 wait:
1748 return 0;
1749
1750 success:
1751 /* Remove the stream from the waiting queue */
1752 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1753 LIST_DEL(&ctx->applet_wait);
1754 LIST_INIT(&ctx->applet_wait);
1755 }
1756
1757 /* Set the right flag to prevent request and response processing
1758 * in same time. */
1759 ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
1760 ? SPOE_CTX_FL_REQ_PROCESS
1761 : SPOE_CTX_FL_RSP_PROCESS);
1762 ctx->errs = 0;
1763
1764 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1765 " - acquire SPOE appctx %p from cache\n",
1766 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1767 __FUNCTION__, ctx->strm, ctx->appctx);
1768 return 1;
1769
1770 error:
1771 /* Remove the stream from the waiting queue */
1772 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1773 LIST_DEL(&ctx->applet_wait);
1774 LIST_INIT(&ctx->applet_wait);
1775 }
1776
1777 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1778 " - failed to acquire SPOE appctx errs=%u\n",
1779 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1780 __FUNCTION__, ctx->strm, ctx->errs);
1781 send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n");
1782
1783 return -1;
1784}
1785
1786/* Release a SPOE applet and push it in the agent cache. */
1787static void
1788release_spoe_appctx(struct spoe_context *ctx)
1789{
1790 struct spoe_config *conf = FLT_CONF(ctx->filter);
1791 struct spoe_agent *agent = conf->agent;
1792 struct appctx *appctx = ctx->appctx;
1793
1794 /* Reset the flag to allow next processing */
1795 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
1796
Christopher Fauletf7a30922016-11-10 15:04:51 +01001797 /* Reset processing timer */
1798 ctx->process_exp = TICK_ETERNITY;
1799
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001800 /* Release the buffer if needed */
1801 if (ctx->buffer != &buf_empty) {
1802 b_free(&ctx->buffer);
1803 if (!LIST_ISEMPTY(&buffer_wq))
1804 stream_offer_buffers();
1805 }
1806
1807 /* If there is no SPOE applet, all is done */
1808 if (!appctx)
1809 return;
1810
1811 /* Else, reassign it or push it in the agent cache */
1812 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1813 " - release SPOE appctx %p\n",
1814 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1815 __FUNCTION__, ctx->strm, appctx);
1816
1817 APPCTX_SPOE(appctx).ctx = NULL;
1818 ctx->appctx = NULL;
1819 offer_spoe_appctx(agent, appctx);
1820}
1821
1822/***************************************************************************
1823 * Functions that process SPOE messages and actions
1824 **************************************************************************/
1825/* Process SPOE messages for a specific event. During the processing, it returns
1826 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1827 * is returned. */
1828static int
1829process_spoe_messages(struct stream *s, struct spoe_context *ctx,
1830 struct list *messages, int dir)
1831{
1832 struct spoe_message *msg;
1833 struct sample *smp;
1834 struct spoe_arg *arg;
1835 char *p;
1836 size_t max_size;
1837 int off, flag, idx = 0;
1838
1839 /* Reserve 32 bytes from the frame Metadata */
1840 max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32;
1841
1842 b_reset(ctx->buffer);
1843 p = ctx->buffer->p;
1844
1845 /* Loop on messages */
1846 list_for_each_entry(msg, messages, list) {
1847 if (idx + msg->id_len + 1 > max_size)
1848 goto skip;
1849
1850 /* Set the message name */
1851 idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
1852
1853 /* Save offset where to store the number of arguments for this
1854 * message */
1855 off = idx++;
1856 p[off] = 0;
1857
1858 /* Loop on arguments */
1859 list_for_each_entry(arg, &msg->args, list) {
1860 p[off]++; /* Increment the number of arguments */
1861
1862 if (idx + arg->name_len + 1 > max_size)
1863 goto skip;
1864
1865 /* Encode the arguement name as a string. It can by NULL */
1866 idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
1867
1868 /* Fetch the arguement value */
1869 smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
1870 if (!smp) {
1871 /* If no value is available, set it to NULL */
1872 p[idx++] = SPOE_DATA_T_NULL;
1873 continue;
1874 }
1875
1876 /* Else, encode the arguement value */
1877 switch (smp->data.type) {
1878 case SMP_T_BOOL:
1879 flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
1880 p[idx++] = (SPOE_DATA_T_BOOL | flag);
1881 break;
1882 case SMP_T_SINT:
1883 p[idx++] = SPOE_DATA_T_INT64;
1884 if (idx + 8 > max_size)
1885 goto skip;
1886 idx += encode_spoe_varint(smp->data.u.sint, p+idx);
1887 break;
1888 case SMP_T_IPV4:
1889 p[idx++] = SPOE_DATA_T_IPV4;
1890 if (idx + 4 > max_size)
1891 goto skip;
1892 memcpy(p+idx, &smp->data.u.ipv4, 4);
1893 idx += 4;
1894 break;
1895 case SMP_T_IPV6:
1896 p[idx++] = SPOE_DATA_T_IPV6;
1897 if (idx + 16 > max_size)
1898 goto skip;
1899 memcpy(p+idx, &smp->data.u.ipv6, 16);
1900 idx += 16;
1901 break;
1902 case SMP_T_STR:
1903 p[idx++] = SPOE_DATA_T_STR;
1904 if (idx + smp->data.u.str.len > max_size)
1905 goto skip;
1906 idx += encode_spoe_string(smp->data.u.str.str,
1907 smp->data.u.str.len,
1908 p+idx);
1909 break;
1910 case SMP_T_BIN:
1911 p[idx++] = SPOE_DATA_T_BIN;
1912 if (idx + smp->data.u.str.len > max_size)
1913 goto skip;
1914 idx += encode_spoe_string(smp->data.u.str.str,
1915 smp->data.u.str.len,
1916 p+idx);
1917 break;
1918 case SMP_T_METH:
1919 if (smp->data.u.meth.meth == HTTP_METH_OTHER) {
1920 p[idx++] = SPOE_DATA_T_STR;
1921 if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size)
1922 goto skip;
1923 idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name,
1924 http_known_methods[smp->data.u.meth.meth].len,
1925 p+idx);
1926 }
1927 else {
1928 p[idx++] = SPOE_DATA_T_STR;
1929 if (idx + smp->data.u.str.len > max_size)
1930 goto skip;
1931 idx += encode_spoe_string(smp->data.u.meth.str.str,
1932 smp->data.u.meth.str.len,
1933 p+idx);
1934 }
1935 break;
1936 default:
1937 p[idx++] = SPOE_DATA_T_NULL;
1938 }
1939 }
1940 }
1941 ctx->buffer->i = idx;
1942 return 1;
1943
1944 skip:
1945 b_reset(ctx->buffer);
1946 return 0;
1947}
1948
1949/* Helper function to set a variable */
1950static void
1951set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1952 struct sample *smp)
1953{
1954 struct spoe_config *conf = FLT_CONF(ctx->filter);
1955 struct spoe_agent *agent = conf->agent;
1956 char varname[64];
1957
1958 memset(varname, 0, sizeof(varname));
1959 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1960 scope, agent->var_pfx, len, name);
1961 vars_set_by_name_ifexist(varname, len, smp);
1962}
1963
1964/* Helper function to unset a variable */
1965static void
1966unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1967 struct sample *smp)
1968{
1969 struct spoe_config *conf = FLT_CONF(ctx->filter);
1970 struct spoe_agent *agent = conf->agent;
1971 char varname[64];
1972
1973 memset(varname, 0, sizeof(varname));
1974 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1975 scope, agent->var_pfx, len, name);
1976 vars_unset_by_name_ifexist(varname, len, smp);
1977}
1978
1979
1980/* Process SPOE actions for a specific event. During the processing, it returns
1981 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1982 * is returned. */
1983static int
1984process_spoe_actions(struct stream *s, struct spoe_context *ctx,
1985 enum spoe_event ev, int dir)
1986{
1987 char *p;
1988 size_t size;
1989 int off, i, idx = 0;
1990
1991 p = ctx->buffer->p;
1992 size = ctx->buffer->i;
1993
1994 while (idx < size) {
1995 char *str;
1996 uint64_t sz;
1997 struct sample smp;
1998 enum spoe_action_type type;
1999
2000 off = idx;
2001 if (idx+2 > size)
2002 goto skip;
2003
2004 type = p[idx++];
2005 switch (type) {
2006 case SPOE_ACT_T_SET_VAR: {
2007 char *scope;
2008
2009 if (p[idx++] != 3)
2010 goto skip_action;
2011
2012 switch (p[idx++]) {
2013 case SPOE_SCOPE_PROC: scope = "proc"; break;
2014 case SPOE_SCOPE_SESS: scope = "sess"; break;
2015 case SPOE_SCOPE_TXN : scope = "txn"; break;
2016 case SPOE_SCOPE_REQ : scope = "req"; break;
2017 case SPOE_SCOPE_RES : scope = "res"; break;
2018 default: goto skip;
2019 }
2020
2021 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2022 if (str == NULL)
2023 goto skip;
2024 memset(&smp, 0, sizeof(smp));
2025 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2026 if (decode_spoe_data(p+idx, p+size, &smp) == -1)
2027 goto skip;
2028
2029 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2030 " - set-var '%s.%s.%.*s'\n",
2031 (int)now.tv_sec, (int)now.tv_usec,
2032 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2033 __FUNCTION__, s, scope,
2034 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2035 (int)sz, str);
2036
2037 set_spoe_var(ctx, scope, str, sz, &smp);
2038 break;
2039 }
2040
2041 case SPOE_ACT_T_UNSET_VAR: {
2042 char *scope;
2043
2044 if (p[idx++] != 2)
2045 goto skip_action;
2046
2047 switch (p[idx++]) {
2048 case SPOE_SCOPE_PROC: scope = "proc"; break;
2049 case SPOE_SCOPE_SESS: scope = "sess"; break;
2050 case SPOE_SCOPE_TXN : scope = "txn"; break;
2051 case SPOE_SCOPE_REQ : scope = "req"; break;
2052 case SPOE_SCOPE_RES : scope = "res"; break;
2053 default: goto skip;
2054 }
2055
2056 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2057 if (str == NULL)
2058 goto skip;
2059 memset(&smp, 0, sizeof(smp));
2060 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2061
2062 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2063 " - unset-var '%s.%s.%.*s'\n",
2064 (int)now.tv_sec, (int)now.tv_usec,
2065 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2066 __FUNCTION__, s, scope,
2067 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2068 (int)sz, str);
2069
2070 unset_spoe_var(ctx, scope, str, sz, &smp);
2071 break;
2072 }
2073
2074 default:
2075 skip_action:
2076 if ((i = skip_spoe_action(p+off, p+size)) == -1)
2077 goto skip;
2078 idx += i;
2079 }
2080 }
2081
2082 return 1;
2083 skip:
2084 return 0;
2085}
2086
2087
2088/* Process a SPOE event. First, this functions will process messages attached to
2089 * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
2090 * ACK frame to process corresponding actions. During all the processing, it
2091 * returns 0 and it returns 1 when the processing is finished. If an error
2092 * occurred, -1 is returned. */
2093static int
2094process_spoe_event(struct stream *s, struct spoe_context *ctx,
2095 enum spoe_event ev)
2096{
Christopher Fauletf7a30922016-11-10 15:04:51 +01002097 struct spoe_config *conf = FLT_CONF(ctx->filter);
2098 struct spoe_agent *agent = conf->agent;
2099 int dir, ret = 1;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002100
2101 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2102 " - ctx-state=%s - event=%s\n",
2103 (int)now.tv_sec, (int)now.tv_usec,
Christopher Fauletf7a30922016-11-10 15:04:51 +01002104 agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002105 spoe_event_str[ev]);
2106
2107 dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
2108
2109 if (LIST_ISEMPTY(&(ctx->messages[ev])))
2110 goto out;
2111
2112 if (ctx->state == SPOE_CTX_ST_ERROR)
2113 goto error;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002114
2115 if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
2116 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2117 " - failed to process event '%s': timeout\n",
2118 (int)now.tv_sec, (int)now.tv_usec,
2119 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2120 send_log(ctx->strm->be, LOG_WARNING,
2121 "failed to process event '%s': timeout.\n",
2122 spoe_event_str[ev]);
2123 goto error;
2124 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002125
2126 if (ctx->state == SPOE_CTX_ST_READY) {
Christopher Fauletf7a30922016-11-10 15:04:51 +01002127 if (!tick_isset(ctx->process_exp)) {
2128 ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
2129 s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
2130 ctx->process_exp);
2131 }
2132
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002133 ret = acquire_spoe_appctx(ctx, dir);
2134 if (ret <= 0) {
2135 if (!ret)
2136 goto out;
2137 goto error;
2138 }
2139 ctx->state = SPOE_CTX_ST_SENDING_MSGS;
2140 }
2141
2142 if (ctx->appctx == NULL)
2143 goto error;
2144
2145 if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
2146 ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
2147 if (ret <= 0) {
2148 if (!ret)
2149 goto skip;
2150 goto error;
2151 }
2152 wakeup_spoe_appctx(ctx);
2153 ret = 0;
2154 goto out;
2155 }
2156
2157 if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
2158 wakeup_spoe_appctx(ctx);
2159 ret = 0;
2160 goto out;
2161 }
2162
2163 if (ctx->state == SPOE_CTX_ST_DONE) {
2164 ret = process_spoe_actions(s, ctx, ev, dir);
2165 if (ret <= 0) {
2166 if (!ret)
2167 goto skip;
2168 goto error;
2169 }
2170 ctx->frame_id++;
2171 release_spoe_appctx(ctx);
2172 ctx->state = SPOE_CTX_ST_READY;
2173 }
2174
2175 out:
2176 return ret;
2177
2178 skip:
2179 release_spoe_appctx(ctx);
2180 ctx->state = SPOE_CTX_ST_READY;
2181 return 1;
2182
2183 error:
2184 release_spoe_appctx(ctx);
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002185 ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
2186 ? SPOE_CTX_ST_READY
2187 : SPOE_CTX_ST_ERROR);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002188 return 1;
2189}
2190
2191
2192/***************************************************************************
2193 * Functions that create/destroy SPOE contexts
2194 **************************************************************************/
2195static struct spoe_context *
2196create_spoe_context(struct filter *filter)
2197{
2198 struct spoe_config *conf = FLT_CONF(filter);
2199 struct spoe_context *ctx;
2200
2201 ctx = pool_alloc_dirty(pool2_spoe_ctx);
2202 if (ctx == NULL) {
2203 return NULL;
2204 }
2205 memset(ctx, 0, sizeof(*ctx));
2206 ctx->filter = filter;
2207 ctx->state = SPOE_CTX_ST_NONE;
2208 ctx->flags = 0;
2209 ctx->errs = 0;
2210 ctx->messages = conf->agent->messages;
2211 ctx->buffer = &buf_empty;
2212 LIST_INIT(&ctx->buffer_wait);
2213 LIST_INIT(&ctx->applet_wait);
2214
Christopher Fauletf7a30922016-11-10 15:04:51 +01002215 ctx->stream_id = 0;
2216 ctx->frame_id = 1;
2217 ctx->process_exp = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002218
2219 return ctx;
2220}
2221
2222static void
2223destroy_spoe_context(struct spoe_context *ctx)
2224{
2225 if (!ctx)
2226 return;
2227
2228 if (ctx->appctx)
2229 APPCTX_SPOE(ctx->appctx).ctx = NULL;
2230 if (!LIST_ISEMPTY(&ctx->buffer_wait))
2231 LIST_DEL(&ctx->buffer_wait);
2232 if (!LIST_ISEMPTY(&ctx->applet_wait))
2233 LIST_DEL(&ctx->applet_wait);
2234 pool_free2(pool2_spoe_ctx, ctx);
2235}
2236
2237static void
2238reset_spoe_context(struct spoe_context *ctx)
2239{
2240 ctx->state = SPOE_CTX_ST_READY;
2241 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2242}
2243
2244
2245/***************************************************************************
2246 * Hooks that manage the filter lifecycle (init/check/deinit)
2247 **************************************************************************/
2248/* Signal handler: Do a soft stop, wakeup SPOE applet */
2249static void
2250sig_stop_spoe(struct sig_handler *sh)
2251{
2252 struct proxy *p;
2253
2254 p = proxy;
2255 while (p) {
2256 struct flt_conf *fconf;
2257
2258 list_for_each_entry(fconf, &p->filter_configs, list) {
2259 struct spoe_config *conf = fconf->conf;
2260 struct spoe_agent *agent = conf->agent;
2261 struct appctx *appctx;
2262
2263 list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) {
2264 si_applet_want_get(appctx->owner);
2265 si_applet_want_put(appctx->owner);
2266 appctx_wakeup(appctx);
2267 }
2268 }
2269 p = p->next;
2270 }
2271}
2272
2273
2274/* Initialize the SPOE filter. Returns -1 on error, else 0. */
2275static int
2276spoe_init(struct proxy *px, struct flt_conf *fconf)
2277{
2278 struct spoe_config *conf = fconf->conf;
2279 struct listener *l;
2280
2281 memset(&conf->agent_fe, 0, sizeof(conf->agent_fe));
2282 init_new_proxy(&conf->agent_fe);
2283 conf->agent_fe.parent = conf->agent;
2284 conf->agent_fe.last_change = now.tv_sec;
2285 conf->agent_fe.id = conf->agent->id;
2286 conf->agent_fe.cap = PR_CAP_FE;
2287 conf->agent_fe.mode = PR_MODE_TCP;
2288 conf->agent_fe.maxconn = 0;
2289 conf->agent_fe.options2 |= PR_O2_INDEPSTR;
2290 conf->agent_fe.conn_retries = CONN_RETRIES;
2291 conf->agent_fe.accept = frontend_accept;
2292 conf->agent_fe.srv = NULL;
2293 conf->agent_fe.timeout.client = TICK_ETERNITY;
2294 conf->agent_fe.default_target = &spoe_applet.obj_type;
2295 conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
2296
2297 if ((l = calloc(1, sizeof(*l))) == NULL) {
2298 Alert("spoe_init : out of memory.\n");
2299 goto out_error;
2300 }
2301 l->obj_type = OBJ_TYPE_LISTENER;
2302 l->obj_type = OBJ_TYPE_LISTENER;
2303 l->frontend = &conf->agent_fe;
2304 l->state = LI_READY;
2305 l->analysers = conf->agent_fe.fe_req_ana;
2306 LIST_ADDQ(&conf->agent_fe.conf.listeners, &l->by_fe);
2307
2308 if (!sighandler_registered) {
2309 signal_register_fct(0, sig_stop_spoe, 0);
2310 sighandler_registered = 1;
2311 }
2312
2313 return 0;
2314
2315 out_error:
2316 return -1;
2317}
2318
2319/* Free ressources allocated by the SPOE filter. */
2320static void
2321spoe_deinit(struct proxy *px, struct flt_conf *fconf)
2322{
2323 struct spoe_config *conf = fconf->conf;
2324
2325 if (conf) {
2326 struct spoe_agent *agent = conf->agent;
2327 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
2328 struct listener *, by_fe);
2329
2330 free(l);
2331 release_spoe_agent(agent);
2332 free(conf);
2333 }
2334 fconf->conf = NULL;
2335}
2336
2337/* Check configuration of a SPOE filter for a specified proxy.
2338 * Return 1 on error, else 0. */
2339static int
2340spoe_check(struct proxy *px, struct flt_conf *fconf)
2341{
2342 struct spoe_config *conf = fconf->conf;
2343 struct proxy *target;
2344
2345 target = proxy_be_by_name(conf->agent->b.name);
2346 if (target == NULL) {
2347 Alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'"
2348 " declared at %s:%d.\n",
2349 px->id, conf->agent->b.name, conf->agent->id,
2350 conf->agent->conf.file, conf->agent->conf.line);
2351 return 1;
2352 }
2353 if (target->mode != PR_MODE_TCP) {
2354 Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
2355 " at %s:%d does not support HTTP mode.\n",
2356 px->id, target->id, conf->agent->id,
2357 conf->agent->conf.file, conf->agent->conf.line);
2358 return 1;
2359 }
2360
2361 free(conf->agent->b.name);
2362 conf->agent->b.name = NULL;
2363 conf->agent->b.be = target;
2364 return 0;
2365}
2366
2367/**************************************************************************
2368 * Hooks attached to a stream
2369 *************************************************************************/
2370/* Called when a filter instance is created and attach to a stream. It creates
2371 * the context that will be used to process this stream. */
2372static int
2373spoe_start(struct stream *s, struct filter *filter)
2374{
2375 struct spoe_context *ctx;
2376
2377 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2378 (int)now.tv_sec, (int)now.tv_usec,
2379 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2380 __FUNCTION__, s);
2381
2382 ctx = create_spoe_context(filter);
2383 if (ctx == NULL) {
2384 send_log(s->be, LOG_EMERG,
2385 "failed to create SPOE context for proxy %s\n",
2386 s->be->id);
2387 return 0;
2388 }
2389
2390 ctx->strm = s;
2391 ctx->state = SPOE_CTX_ST_READY;
2392 filter->ctx = ctx;
2393
2394 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_FE]))
2395 filter->pre_analyzers |= AN_REQ_INSPECT_FE;
2396
2397 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_BE]))
2398 filter->pre_analyzers |= AN_REQ_INSPECT_BE;
2399
2400 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_RSP]))
2401 filter->pre_analyzers |= AN_RES_INSPECT;
2402
2403 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_FE]))
2404 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE;
2405
2406 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_BE]))
2407 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE;
2408
2409 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_RSP]))
2410 filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE;
2411
2412 return 1;
2413}
2414
2415/* Called when a filter instance is detached from a stream. It release the
2416 * attached SPOE context. */
2417static void
2418spoe_stop(struct stream *s, struct filter *filter)
2419{
2420 struct spoe_context *ctx = filter->ctx;
2421
2422 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2423 (int)now.tv_sec, (int)now.tv_usec,
2424 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2425 __FUNCTION__, s);
2426
2427 if (ctx) {
2428 release_spoe_appctx(ctx);
2429 destroy_spoe_context(ctx);
2430 }
2431}
2432
Christopher Fauletf7a30922016-11-10 15:04:51 +01002433
2434/*
2435 * Called when the stream is woken up because of expired timer.
2436 */
2437static void
2438spoe_check_timeouts(struct stream *s, struct filter *filter)
2439{
2440 struct spoe_context *ctx = filter->ctx;
2441
2442 if (tick_is_expired(ctx->process_exp, now_ms))
2443 s->task->state |= TASK_WOKEN_MSG;
2444}
2445
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002446/* Called when we are ready to filter data on a channel */
2447static int
2448spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2449{
2450 struct spoe_context *ctx = filter->ctx;
2451 int ret = 1;
2452
2453 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2454 " - ctx-flags=0x%08x\n",
2455 (int)now.tv_sec, (int)now.tv_usec,
2456 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2457 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2458
2459 if (!(chn->flags & CF_ISRESP)) {
2460 if (filter->pre_analyzers & AN_REQ_INSPECT_FE)
2461 chn->analysers |= AN_REQ_INSPECT_FE;
2462 if (filter->pre_analyzers & AN_REQ_INSPECT_BE)
2463 chn->analysers |= AN_REQ_INSPECT_BE;
2464
2465 if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED)
2466 goto out;
2467
2468 ctx->stream_id = s->uniq_id;
2469 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2470 ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
2471 if (ret != 1)
2472 goto out;
2473 }
2474 ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
2475 }
2476 else {
2477 if (filter->pre_analyzers & SPOE_EV_ON_TCP_RSP)
2478 chn->analysers |= AN_RES_INSPECT;
2479
2480 if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
2481 goto out;
2482
2483 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2484 ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
2485 if (ret != 1)
2486 goto out;
2487 }
2488 ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
2489 }
2490
2491 out:
2492 if (!ret) {
2493 channel_dont_read(chn);
2494 channel_dont_close(chn);
2495 }
2496 return ret;
2497}
2498
2499/* Called before a processing happens on a given channel */
2500static int
2501spoe_chn_pre_analyze(struct stream *s, struct filter *filter,
2502 struct channel *chn, unsigned an_bit)
2503{
2504 struct spoe_context *ctx = filter->ctx;
2505 int ret = 1;
2506
2507 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2508 " - ctx-flags=0x%08x - ana=0x%08x\n",
2509 (int)now.tv_sec, (int)now.tv_usec,
2510 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2511 __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2512 ctx->flags, an_bit);
2513
2514 if (ctx->state == SPOE_CTX_ST_NONE || ctx->state == SPOE_CTX_ST_ERROR)
2515 goto out;
2516
2517 switch (an_bit) {
2518 case AN_REQ_INSPECT_FE:
2519 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
2520 break;
2521 case AN_REQ_INSPECT_BE:
2522 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
2523 break;
2524 case AN_RES_INSPECT:
2525 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP);
2526 break;
2527 case AN_REQ_HTTP_PROCESS_FE:
2528 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
2529 break;
2530 case AN_REQ_HTTP_PROCESS_BE:
2531 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
2532 break;
2533 case AN_RES_HTTP_PROCESS_FE:
2534 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
2535 break;
2536 }
2537
2538 out:
2539 if (!ret) {
2540 channel_dont_read(chn);
2541 channel_dont_close(chn);
2542 }
2543 return ret;
2544}
2545
2546/* Called when the filtering on the channel ends. */
2547static int
2548spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2549{
2550 struct spoe_context *ctx = filter->ctx;
2551
2552 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2553 " - ctx-flags=0x%08x\n",
2554 (int)now.tv_sec, (int)now.tv_usec,
2555 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2556 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2557
2558 if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
2559 reset_spoe_context(ctx);
2560 }
2561
2562 return 1;
2563}
2564
2565/********************************************************************
2566 * Functions that manage the filter initialization
2567 ********************************************************************/
2568struct flt_ops spoe_ops = {
2569 /* Manage SPOE filter, called for each filter declaration */
2570 .init = spoe_init,
2571 .deinit = spoe_deinit,
2572 .check = spoe_check,
2573
2574 /* Handle start/stop of SPOE */
Christopher Fauletf7a30922016-11-10 15:04:51 +01002575 .attach = spoe_start,
2576 .detach = spoe_stop,
2577 .check_timeouts = spoe_check_timeouts,
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002578
2579 /* Handle channels activity */
2580 .channel_start_analyze = spoe_start_analyze,
2581 .channel_pre_analyze = spoe_chn_pre_analyze,
2582 .channel_end_analyze = spoe_end_analyze,
2583};
2584
2585
2586static int
2587cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
2588{
2589 const char *err;
2590 int i, err_code = 0;
2591
2592 if ((cfg_scope == NULL && curengine != NULL) ||
2593 (cfg_scope != NULL && curengine == NULL) ||
2594 strcmp(curengine, cfg_scope))
2595 goto out;
2596
2597 if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */
2598 if (!*args[1]) {
2599 Alert("parsing [%s:%d] : missing name for spoe-agent section.\n",
2600 file, linenum);
2601 err_code |= ERR_ALERT | ERR_ABORT;
2602 goto out;
2603 }
2604 if (*args[2]) {
2605 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2606 file, linenum, args[2]);
2607 err_code |= ERR_ALERT | ERR_ABORT;
2608 goto out;
2609 }
2610
2611 err = invalid_char(args[1]);
2612 if (err) {
2613 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2614 file, linenum, *err, args[0], args[1]);
2615 err_code |= ERR_ALERT | ERR_ABORT;
2616 goto out;
2617 }
2618
2619 if (curagent != NULL) {
2620 Alert("parsing [%s:%d] : another spoe-agent section previously defined.\n",
2621 file, linenum);
2622 err_code |= ERR_ALERT | ERR_ABORT;
2623 goto out;
2624 }
2625 if ((curagent = calloc(1, sizeof(*curagent))) == NULL) {
2626 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2627 err_code |= ERR_ALERT | ERR_ABORT;
2628 goto out;
2629 }
2630
2631 curagent->id = strdup(args[1]);
2632 curagent->conf.file = strdup(file);
2633 curagent->conf.line = linenum;
2634 curagent->timeout.hello = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002635 curagent->timeout.idle = TICK_ETERNITY;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002636 curagent->timeout.processing = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002637 curagent->var_pfx = NULL;
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002638 curagent->flags = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002639 curagent->new_applets = 0;
2640
2641 for (i = 0; i < SPOE_EV_EVENTS; ++i)
2642 LIST_INIT(&curagent->messages[i]);
2643 LIST_INIT(&curagent->cache);
2644 LIST_INIT(&curagent->applet_wq);
2645 }
2646 else if (!strcmp(args[0], "use-backend")) {
2647 if (!*args[1]) {
2648 Alert("parsing [%s:%d] : '%s' expects a backend name.\n",
2649 file, linenum, args[0]);
2650 err_code |= ERR_ALERT | ERR_FATAL;
2651 goto out;
2652 }
2653 if (*args[2]) {
2654 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2655 file, linenum, args[2]);
2656 err_code |= ERR_ALERT | ERR_ABORT;
2657 goto out;
2658 }
2659 free(curagent->b.name);
2660 curagent->b.name = strdup(args[1]);
2661 }
2662 else if (!strcmp(args[0], "messages")) {
2663 int cur_arg = 1;
2664 while (*args[cur_arg]) {
2665 struct spoe_msg_placeholder *mp = NULL;
2666
2667 list_for_each_entry(mp, &curmps, list) {
2668 if (!strcmp(mp->id, args[cur_arg])) {
2669 Alert("parsing [%s:%d]: spoe-message message '%s' already declared.\n",
2670 file, linenum, args[cur_arg]);
2671 err_code |= ERR_ALERT | ERR_FATAL;
2672 goto out;
2673 }
2674 }
2675
2676 if ((mp = calloc(1, sizeof(*mp))) == NULL) {
2677 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2678 err_code |= ERR_ALERT | ERR_ABORT;
2679 goto out;
2680 }
2681 mp->id = strdup(args[cur_arg]);
2682 LIST_ADDQ(&curmps, &mp->list);
2683 cur_arg++;
2684 }
2685 }
2686 else if (!strcmp(args[0], "timeout")) {
2687 unsigned int *tv = NULL;
2688 const char *res;
2689 unsigned timeout;
2690
2691 if (!*args[1]) {
2692 Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n",
2693 file, linenum);
2694 err_code |= ERR_ALERT | ERR_FATAL;
2695 goto out;
2696 }
2697 if (!strcmp(args[1], "hello"))
2698 tv = &curagent->timeout.hello;
2699 else if (!strcmp(args[1], "idle"))
2700 tv = &curagent->timeout.idle;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002701 else if (!strcmp(args[1], "processing"))
2702 tv = &curagent->timeout.processing;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002703 else {
Christopher Faulet03a34492016-11-19 16:47:56 +01002704 Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' or 'processing' (got %s).\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002705 file, linenum, args[1]);
2706 err_code |= ERR_ALERT | ERR_FATAL;
2707 goto out;
2708 }
2709 if (!*args[2]) {
2710 Alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n",
2711 file, linenum, args[1]);
2712 err_code |= ERR_ALERT | ERR_FATAL;
2713 goto out;
2714 }
2715 res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
2716 if (res) {
2717 Alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n",
2718 file, linenum, *res, args[1]);
2719 err_code |= ERR_ALERT | ERR_ABORT;
2720 goto out;
2721 }
2722 if (*args[3]) {
2723 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2724 file, linenum, args[3]);
2725 err_code |= ERR_ALERT | ERR_ABORT;
2726 goto out;
2727 }
2728 *tv = MS_TO_TICKS(timeout);
2729 }
2730 else if (!strcmp(args[0], "option")) {
2731 if (!*args[1]) {
2732 Alert("parsing [%s:%d]: '%s' expects an option name.\n",
2733 file, linenum, args[0]);
2734 err_code |= ERR_ALERT | ERR_FATAL;
2735 goto out;
2736 }
2737 if (!strcmp(args[1], "var-prefix")) {
2738 char *tmp;
2739
2740 if (!*args[2]) {
2741 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2742 file, linenum, args[0],
2743 args[1]);
2744 err_code |= ERR_ALERT | ERR_FATAL;
2745 goto out;
2746 }
2747 tmp = args[2];
2748 while (*tmp) {
2749 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2750 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
2751 file, linenum, args[0], args[1]);
2752 err_code |= ERR_ALERT | ERR_FATAL;
2753 goto out;
2754 }
2755 tmp++;
2756 }
2757 curagent->var_pfx = strdup(args[2]);
2758 }
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002759 else if (!strcmp(args[1], "continue-on-error")) {
2760 if (*args[2]) {
2761 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2762 file, linenum, args[3]);
2763 err_code |= ERR_ALERT | ERR_ABORT;
2764 goto out;
2765 }
2766 curagent->flags |= SPOE_FL_CONT_ON_ERR;
2767 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002768 else {
2769 Alert("parsing [%s:%d]: option '%s' is not supported.\n",
2770 file, linenum, args[1]);
2771 err_code |= ERR_ALERT | ERR_FATAL;
2772 goto out;
2773 }
2774 }
2775 else if (*args[0]) {
2776 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
2777 file, linenum, args[0]);
2778 err_code |= ERR_ALERT | ERR_FATAL;
2779 goto out;
2780 }
2781 out:
2782 return err_code;
2783}
2784
2785static int
2786cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm)
2787{
2788 struct spoe_message *msg;
2789 struct spoe_arg *arg;
2790 const char *err;
2791 char *errmsg = NULL;
2792 int err_code = 0;
2793
2794 if ((cfg_scope == NULL && curengine != NULL) ||
2795 (cfg_scope != NULL && curengine == NULL) ||
2796 strcmp(curengine, cfg_scope))
2797 goto out;
2798
2799 if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */
2800 if (!*args[1]) {
2801 Alert("parsing [%s:%d] : missing name for spoe-message section.\n",
2802 file, linenum);
2803 err_code |= ERR_ALERT | ERR_ABORT;
2804 goto out;
2805 }
2806 if (*args[2]) {
2807 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2808 file, linenum, args[2]);
2809 err_code |= ERR_ALERT | ERR_ABORT;
2810 goto out;
2811 }
2812
2813 err = invalid_char(args[1]);
2814 if (err) {
2815 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2816 file, linenum, *err, args[0], args[1]);
2817 err_code |= ERR_ALERT | ERR_ABORT;
2818 goto out;
2819 }
2820
2821 list_for_each_entry(msg, &curmsgs, list) {
2822 if (!strcmp(msg->id, args[1])) {
2823 Alert("parsing [%s:%d]: spoe-message section '%s' has the same"
2824 " name as another one declared at %s:%d.\n",
2825 file, linenum, args[1], msg->conf.file, msg->conf.line);
2826 err_code |= ERR_ALERT | ERR_FATAL;
2827 goto out;
2828 }
2829 }
2830
2831 if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) {
2832 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2833 err_code |= ERR_ALERT | ERR_ABORT;
2834 goto out;
2835 }
2836
2837 curmsg->id = strdup(args[1]);
2838 curmsg->id_len = strlen(curmsg->id);
2839 curmsg->event = SPOE_EV_NONE;
2840 curmsg->conf.file = strdup(file);
2841 curmsg->conf.line = linenum;
2842 LIST_INIT(&curmsg->args);
2843 LIST_ADDQ(&curmsgs, &curmsg->list);
2844 }
2845 else if (!strcmp(args[0], "args")) {
2846 int cur_arg = 1;
2847
2848 curproxy->conf.args.ctx = ARGC_SPOE;
2849 curproxy->conf.args.file = file;
2850 curproxy->conf.args.line = linenum;
2851 while (*args[cur_arg]) {
2852 char *delim = strchr(args[cur_arg], '=');
2853 int idx = 0;
2854
2855 if ((arg = calloc(1, sizeof(*arg))) == NULL) {
2856 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2857 err_code |= ERR_ALERT | ERR_ABORT;
2858 goto out;
2859 }
2860
2861 if (!delim) {
2862 arg->name = NULL;
2863 arg->name_len = 0;
2864 delim = args[cur_arg];
2865 }
2866 else {
2867 arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]);
2868 arg->name_len = delim - args[cur_arg];
2869 delim++;
2870 }
2871
2872 arg->expr = sample_parse_expr(&delim, &idx, file, linenum, &errmsg, &curproxy->conf.args);
2873 if (arg->expr == NULL) {
2874 Alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg);
2875 err_code |= ERR_ALERT | ERR_FATAL;
2876 free(arg->name);
2877 free(arg);
2878 goto out;
2879 }
2880 LIST_ADDQ(&curmsg->args, &arg->list);
2881 cur_arg++;
2882 }
2883 curproxy->conf.args.file = NULL;
2884 curproxy->conf.args.line = 0;
2885 }
2886 else if (!strcmp(args[0], "event")) {
2887 if (!*args[1]) {
2888 Alert("parsing [%s:%d] : missing event name.\n", file, linenum);
2889 err_code |= ERR_ALERT | ERR_ABORT;
2890 goto out;
2891 }
2892 if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS]))
2893 curmsg->event = SPOE_EV_ON_CLIENT_SESS;
2894 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS]))
2895 curmsg->event = SPOE_EV_ON_SERVER_SESS;
2896
2897 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE]))
2898 curmsg->event = SPOE_EV_ON_TCP_REQ_FE;
2899 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE]))
2900 curmsg->event = SPOE_EV_ON_TCP_REQ_BE;
2901 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP]))
2902 curmsg->event = SPOE_EV_ON_TCP_RSP;
2903
2904 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE]))
2905 curmsg->event = SPOE_EV_ON_HTTP_REQ_FE;
2906 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE]))
2907 curmsg->event = SPOE_EV_ON_HTTP_REQ_BE;
2908 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP]))
2909 curmsg->event = SPOE_EV_ON_HTTP_RSP;
2910 else {
2911 Alert("parsing [%s:%d] : unkown event '%s'.\n",
2912 file, linenum, args[1]);
2913 err_code |= ERR_ALERT | ERR_ABORT;
2914 goto out;
2915 }
2916 }
2917 else if (!*args[0]) {
2918 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n",
2919 file, linenum, args[0]);
2920 err_code |= ERR_ALERT | ERR_FATAL;
2921 goto out;
2922 }
2923 out:
2924 free(errmsg);
2925 return err_code;
2926}
2927
2928/* Return -1 on error, else 0 */
2929static int
2930parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
2931 struct flt_conf *fconf, char **err, void *private)
2932{
2933 struct list backup_sections;
2934 struct spoe_config *conf;
2935 struct spoe_message *msg, *msgback;
2936 struct spoe_msg_placeholder *mp, *mpback;
2937 char *file = NULL, *engine = NULL;
2938 int ret, pos = *cur_arg + 1;
2939
2940 conf = calloc(1, sizeof(*conf));
2941 if (conf == NULL) {
2942 memprintf(err, "%s: out of memory", args[*cur_arg]);
2943 goto error;
2944 }
2945 conf->proxy = px;
2946
2947 while (*args[pos]) {
2948 if (!strcmp(args[pos], "config")) {
2949 if (!*args[pos+1]) {
2950 memprintf(err, "'%s' : '%s' option without value",
2951 args[*cur_arg], args[pos]);
2952 goto error;
2953 }
2954 file = args[pos+1];
2955 pos += 2;
2956 }
2957 else if (!strcmp(args[pos], "engine")) {
2958 if (!*args[pos+1]) {
2959 memprintf(err, "'%s' : '%s' option without value",
2960 args[*cur_arg], args[pos]);
2961 goto error;
2962 }
2963 engine = args[pos+1];
2964 pos += 2;
2965 }
2966 else {
2967 memprintf(err, "unknown keyword '%s'", args[pos]);
2968 goto error;
2969 }
2970 }
2971 if (file == NULL) {
2972 memprintf(err, "'%s' : missing config file", args[*cur_arg]);
2973 goto error;
2974 }
2975
2976 /* backup sections and register SPOE sections */
2977 LIST_INIT(&backup_sections);
2978 cfg_backup_sections(&backup_sections);
2979 cfg_register_section("spoe-agent", cfg_parse_spoe_agent);
2980 cfg_register_section("spoe-message", cfg_parse_spoe_message);
2981
2982 /* Parse SPOE filter configuration file */
2983 curengine = engine;
2984 curproxy = px;
2985 curagent = NULL;
2986 curmsg = NULL;
2987 ret = readcfgfile(file);
2988 curproxy = NULL;
2989
2990 /* unregister SPOE sections and restore previous sections */
2991 cfg_unregister_sections();
2992 cfg_restore_sections(&backup_sections);
2993
2994 if (ret == -1) {
2995 memprintf(err, "Could not open configuration file %s : %s",
2996 file, strerror(errno));
2997 goto error;
2998 }
2999 if (ret & (ERR_ABORT|ERR_FATAL)) {
3000 memprintf(err, "Error(s) found in configuration file %s", file);
3001 goto error;
3002 }
3003
3004 /* Check SPOE agent */
3005 if (curagent == NULL) {
3006 memprintf(err, "No SPOE agent found in file %s", file);
3007 goto error;
3008 }
3009 if (curagent->b.name == NULL) {
3010 memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d",
3011 curagent->id, curagent->conf.file, curagent->conf.line);
3012 goto error;
3013 }
Christopher Fauletf7a30922016-11-10 15:04:51 +01003014 if (curagent->timeout.hello == TICK_ETERNITY ||
3015 curagent->timeout.idle == TICK_ETERNITY ||
Christopher Fauletf7a30922016-11-10 15:04:51 +01003016 curagent->timeout.processing == TICK_ETERNITY) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003017 Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
3018 " | While not properly invalid, you will certainly encounter various problems\n"
3019 " | with such a configuration. To fix this, please ensure that all following\n"
Christopher Faulet03a34492016-11-19 16:47:56 +01003020 " | timeouts are set to a non-zero value: 'hello', 'idle', 'processing'.\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003021 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3022 }
3023 if (curagent->var_pfx == NULL) {
3024 char *tmp = curagent->id;
3025
3026 while (*tmp) {
3027 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3028 memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. "
3029 "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n",
3030 curagent->id, curagent->id, curagent->conf.file, curagent->conf.line);
3031 goto error;
3032 }
3033 tmp++;
3034 }
3035 curagent->var_pfx = strdup(curagent->id);
3036 }
3037
3038 if (LIST_ISEMPTY(&curmps)) {
3039 Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",
3040 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3041 goto finish;
3042 }
3043
3044 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3045 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3046 if (!strcmp(msg->id, mp->id)) {
3047 if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) {
3048 if (msg->event == SPOE_EV_ON_TCP_REQ_BE)
3049 msg->event = SPOE_EV_ON_TCP_REQ_FE;
3050 if (msg->event == SPOE_EV_ON_HTTP_REQ_BE)
3051 msg->event = SPOE_EV_ON_HTTP_REQ_FE;
3052 }
3053 if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS ||
3054 msg->event == SPOE_EV_ON_TCP_REQ_FE ||
3055 msg->event == SPOE_EV_ON_HTTP_REQ_FE)) {
3056 Warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n",
3057 px->id, msg->conf.file, msg->conf.line);
3058 goto next;
3059 }
3060 if (msg->event == SPOE_EV_NONE) {
3061 Warning("Proxy '%s': Ignore SPOE message without event at %s:%d.\n",
3062 px->id, msg->conf.file, msg->conf.line);
3063 goto next;
3064 }
3065 msg->agent = curagent;
3066 LIST_DEL(&msg->list);
3067 LIST_ADDQ(&curagent->messages[msg->event], &msg->list);
3068 goto next;
3069 }
3070 }
3071 memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d",
3072 curagent->id, mp->id, curagent->conf.file, curagent->conf.line);
3073 goto error;
3074 next:
3075 continue;
3076 }
3077
3078 finish:
3079 conf->agent = curagent;
3080 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3081 LIST_DEL(&mp->list);
3082 release_spoe_msg_placeholder(mp);
3083 }
3084 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3085 Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n",
3086 px->id, msg->id, msg->conf.file, msg->conf.line);
3087 LIST_DEL(&msg->list);
3088 release_spoe_message(msg);
3089 }
3090
3091 *cur_arg = pos;
3092 fconf->ops = &spoe_ops;
3093 fconf->conf = conf;
3094 return 0;
3095
3096 error:
3097 release_spoe_agent(curagent);
3098 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3099 LIST_DEL(&mp->list);
3100 release_spoe_msg_placeholder(mp);
3101 }
3102 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3103 LIST_DEL(&msg->list);
3104 release_spoe_message(msg);
3105 }
3106 free(conf);
3107 return -1;
3108}
3109
3110
3111/* Declare the filter parser for "spoe" keyword */
3112static struct flt_kw_list flt_kws = { "SPOE", { }, {
3113 { "spoe", parse_spoe_flt, NULL },
3114 { NULL, NULL, NULL },
3115 }
3116};
3117
3118__attribute__((constructor))
3119static void __spoe_init(void)
3120{
3121 flt_register_keywords(&flt_kws);
3122
3123 LIST_INIT(&curmsgs);
3124 LIST_INIT(&curmps);
3125 pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
3126}
3127
3128__attribute__((destructor))
3129static void
3130__spoe_deinit(void)
3131{
3132 pool_destroy2(pool2_spoe_ctx);
3133}