blob: 4361773381c2c7e0f9d41fb616c3675d7c7778fb [file] [log] [blame]
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001/*
2 * Stream processing offload engine management.
3 *
4 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
#include <ctype.h>
#include <errno.h>
#include <limits.h>

#include <common/buffer.h>
#include <common/cfgparse.h>
#include <common/compat.h>
#include <common/config.h>
#include <common/debug.h>
#include <common/memory.h>
#include <common/time.h>

#include <types/arg.h>
#include <types/filters.h>
#include <types/global.h>
#include <types/proxy.h>
#include <types/sample.h>
#include <types/stream.h>

#include <proto/arg.h>
#include <proto/backend.h>
#include <proto/filters.h>
#include <proto/frontend.h>
#include <proto/log.h>
#include <proto/proto_http.h>
#include <proto/proxy.h>
#include <proto/sample.h>
#include <proto/session.h>
#include <proto/signal.h>
#include <proto/stream.h>
#include <proto/stream_interface.h>
#include <proto/task.h>
#include <proto/vars.h>
44
/* Debug trace helper: forwards all its arguments to fprintf() when the file
 * is built with DEBUG_SPOE or DEBUG_FULL, and expands to nothing otherwise. */
#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
#define SPOE_PRINTF(x...) fprintf(x)
#else
#define SPOE_PRINTF(x...)
#endif
50
/* Helper to get the SPOE-specific context stored inside an appctx */
#define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe)

/* TODO: add an option to customize these values */
/* The maximum number of new applets waiting for the end of the hello handshake */
#define MAX_NEW_SPOE_APPLETS 5

/* The maximum number of errors while a stream is waiting for a SPOE applet */
#define MAX_NEW_SPOE_APPLET_ERRS 3

/* Minimal size for a frame */
#define MIN_FRAME_SIZE 256

/* Flags set on the SPOE context */
#define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* Set after the on-client-session event was processed */
#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set after the on-server-session event was processed */
#define SPOE_CTX_FL_REQ_PROCESS 0x00000004 /* Set when SPOE is processing the request */
#define SPOE_CTX_FL_RSP_PROCESS 0x00000008 /* Set when SPOE is processing the response */

/* Mask matching either processing flag (request or response) */
#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)

/* Error codes for the SPOE applet */
#define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, leave it to zero */
#define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */
/* All possible states for a SPOE context. SPOE_CTX_ST_ERROR must remain the
 * last value: it is used to size the spoe_ctx_state_str[] debug table. */
enum spoe_ctx_state {
	SPOE_CTX_ST_NONE = 0,
	SPOE_CTX_ST_READY,
	SPOE_CTX_ST_SENDING_MSGS,
	SPOE_CTX_ST_WAITING_ACK,
	SPOE_CTX_ST_DONE,
	SPOE_CTX_ST_ERROR,
};
84
/* All possible states for a SPOE applet. SPOE_APPCTX_ST_END must remain the
 * last value: it is used to size the spoe_appctx_state_str[] debug table. */
enum spoe_appctx_state {
	SPOE_APPCTX_ST_CONNECT = 0,
	SPOE_APPCTX_ST_CONNECTING,
	SPOE_APPCTX_ST_PROCESSING,
	SPOE_APPCTX_ST_DISCONNECT,
	SPOE_APPCTX_ST_DISCONNECTING,
	SPOE_APPCTX_ST_EXIT,
	SPOE_APPCTX_ST_END,
};
95
/* All supported SPOE actions. Numbering starts at 1; SPOE_ACT_TYPES is a
 * sentinel placed after the last real action. */
enum spoe_action_type {
	SPOE_ACT_T_SET_VAR = 1,
	SPOE_ACT_T_UNSET_VAR,
	SPOE_ACT_TYPES,
};
102
/* All supported SPOE events. SPOE_EV_EVENTS is the number of entries; it is
 * used to size spoe_event_str[] and the per-event message lists of an agent. */
enum spoe_event {
	SPOE_EV_NONE = 0,

	/* Request events */
	SPOE_EV_ON_CLIENT_SESS = 1,
	SPOE_EV_ON_TCP_REQ_FE,
	SPOE_EV_ON_TCP_REQ_BE,
	SPOE_EV_ON_HTTP_REQ_FE,
	SPOE_EV_ON_HTTP_REQ_BE,

	/* Response events */
	SPOE_EV_ON_SERVER_SESS,
	SPOE_EV_ON_TCP_RSP,
	SPOE_EV_ON_HTTP_RSP,

	SPOE_EV_EVENTS
};
121
/* Errors triggered by the SPOE applet. These values index
 * spoe_frm_err_reasons[], which is sized by SPOE_FRM_ERRS. Because
 * SPOE_FRM_ERR_UNKNOWN is forced to 99, values 10 to 98 are unassigned. */
enum spoe_frame_error {
	SPOE_FRM_ERR_NONE = 0,
	SPOE_FRM_ERR_IO,
	SPOE_FRM_ERR_TOUT,
	SPOE_FRM_ERR_TOO_BIG,
	SPOE_FRM_ERR_INVALID,
	SPOE_FRM_ERR_NO_VSN,
	SPOE_FRM_ERR_NO_FRAME_SIZE,
	SPOE_FRM_ERR_NO_CAP,
	SPOE_FRM_ERR_BAD_VSN,
	SPOE_FRM_ERR_BAD_FRAME_SIZE,
	SPOE_FRM_ERR_UNKNOWN = 99,
	SPOE_FRM_ERRS,
};
137
/* Scopes used for variables set by agents. It is a way to be agnostic to the
 * vars scope. */
enum spoe_vars_scope {
	SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC */
	SPOE_SCOPE_SESS, /* <=> SCOPE_SESS */
	SPOE_SCOPE_TXN, /* <=> SCOPE_TXN */
	SPOE_SCOPE_REQ, /* <=> SCOPE_REQ */
	SPOE_SCOPE_RES, /* <=> SCOPE_RES */
};
147
148
/* Describe an argument that will be linked to a message. It is a sample fetch,
 * with an optional name. The name and the expression are released by
 * release_spoe_message() together with the argument itself. */
struct spoe_arg {
	char *name; /* Name of the argument, may be NULL */
	unsigned int name_len; /* The name length, 0 if NULL */
	struct sample_expr *expr; /* Sample expression */
	struct list list; /* Used to chain SPOE args */
};
157
/* Used during the config parsing only because, when a SPOE agent section is
 * parsed, messages can be undefined. Released with
 * release_spoe_msg_placeholder(). */
struct spoe_msg_placeholder {
	char *id; /* SPOE message placeholder id */
	struct list list; /* Used to chain SPOE message placeholders */
};
164
/* Describe a message that will be sent in a NOTIFY frame. A message has a name,
 * an argument list (see struct spoe_arg) and it is linked to a specific
 * event. */
struct spoe_message {
	char *id; /* SPOE message id */
	unsigned int id_len; /* The message id length */
	struct spoe_agent *agent; /* SPOE agent owning this SPOE message */
	struct {
		char *file; /* file where the SPOE message appears */
		int line; /* line where the SPOE message appears */
	} conf; /* config information */
	struct list args; /* Arguments added when the SPOE message is sent */
	struct list list; /* Used to chain SPOE messages */

	enum spoe_event event; /* SPOE_EV_* */
};
180
/* Describe a SPOE agent. */
struct spoe_agent {
	char *id; /* SPOE agent id (name) */
	struct {
		char *file; /* file where the SPOE agent appears */
		int line; /* line where the SPOE agent appears */
	} conf; /* config information */
	union {
		struct proxy *be; /* Backend used by this agent */
		char *name; /* Backend name used during conf parsing */
	} b;
	struct {
		unsigned int hello; /* Max time to receive AGENT-HELLO frame (in SPOE applet) */
		unsigned int idle; /* Max Idle timeout (in SPOE applet) */
		unsigned int processing; /* Max time to process an event (in the main stream) */
	} timeout;

	char *var_pfx; /* Prefix used for vars set by the agent */

	struct list cache; /* List used to cache SPOE streams. In
	                    * fact, we cache the SPOE applet ctx */

	struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
	                                       * for each supported event */

	struct list applet_wq; /* List of streams waiting for a SPOE applet */
	unsigned int new_applets; /* The number of new SPOE applets */
};
209
/* SPOE filter configuration. Note that the agent frontend is embedded by
 * value, not referenced through a pointer. */
struct spoe_config {
	struct proxy *proxy; /* Proxy owning the filter */
	struct spoe_agent *agent; /* Agent used by this filter */
	struct proxy agent_fe; /* Agent frontend */
};
216
/* SPOE context attached to a stream. It is the main structure that handles the
 * processing offload */
struct spoe_context {
	struct filter *filter; /* The SPOE filter */
	struct stream *strm; /* The stream that should be offloaded */
	struct appctx *appctx; /* The SPOE appctx */
	struct list *messages; /* List of messages that will be sent during the stream processing */
	struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */
	struct list buffer_wait; /* position in the list of streams waiting for a buffer */
	struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */

	unsigned int errs; /* The number of errors to acquire a SPOE applet */

	enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
	unsigned int flags; /* SPOE_CTX_FL_* */

	unsigned int stream_id; /* stream_id and frame_id are used */
	unsigned int frame_id; /* to map NOTIFY and ACK frames */
	unsigned int process_exp; /* expiration date to process an event */
};
237
/* Set if the handler on SIGUSR1 is registered */
static int sighandler_registered = 0;

/* proxy used during the parsing */
struct proxy *curproxy = NULL;

/* The name of the SPOE engine, used during the parsing */
char *curengine = NULL;

/* SPOE agent used during the parsing */
struct spoe_agent *curagent = NULL;

/* SPOE message used during the parsing */
struct spoe_message *curmsg = NULL;

/* list of SPOE messages and placeholders used during the parsing */
struct list curmsgs;
struct list curmps;

/* Pool used to allocate new SPOE contexts */
static struct pool_head *pool2_spoe_ctx = NULL;

/* Temporary variables used to ease error processing. spoe_status_code holds a
 * SPOE_FRM_ERR_* value set by the frame handlers, or the raw "status-code"
 * received in an agent DISCONNECT frame. spoe_reason receives the
 * NUL-terminated "message" of such a frame (at most 255 bytes). */
int spoe_status_code = SPOE_FRM_ERR_NONE;
char spoe_reason[256];

struct flt_ops spoe_ops;

/* Forward declarations of functions defined later in this file */
static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx);
static void on_new_spoe_appctx_failure(struct spoe_agent *agent);
static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx);
269
270/********************************************************************
271 * helper functions/globals
272 ********************************************************************/
273static void
274release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp)
275{
276 if (!mp)
277 return;
278 free(mp->id);
279 free(mp);
280}
281
282
283static void
284release_spoe_message(struct spoe_message *msg)
285{
286 struct spoe_arg *arg, *back;
287
288 if (!msg)
289 return;
290 free(msg->id);
291 free(msg->conf.file);
292 list_for_each_entry_safe(arg, back, &msg->args, list) {
293 release_sample_expr(arg->expr);
294 free(arg->name);
295 LIST_DEL(&arg->list);
296 free(arg);
297 }
298 free(msg);
299}
300
301static void
302release_spoe_agent(struct spoe_agent *agent)
303{
304 struct spoe_message *msg, *back;
305 int i;
306
307 if (!agent)
308 return;
309 free(agent->id);
310 free(agent->conf.file);
311 free(agent->var_pfx);
312 for (i = 0; i < SPOE_EV_EVENTS; ++i) {
313 list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
314 LIST_DEL(&msg->list);
315 release_spoe_message(msg);
316 }
317 }
318 free(agent);
319}
320
/* Human-readable reason for each SPOE_FRM_ERR_* code, indexed by the code.
 * WARNING: SPOE_FRM_ERR_UNKNOWN is 99, so entries 10 to 98 have no initializer
 * and are NULL. Any lookup must check for a NULL entry before using it. */
static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
	[SPOE_FRM_ERR_NONE] = "normal",
	[SPOE_FRM_ERR_IO] = "I/O error",
	[SPOE_FRM_ERR_TOUT] = "a timeout occurred",
	[SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
	[SPOE_FRM_ERR_INVALID] = "invalid frame received",
	[SPOE_FRM_ERR_NO_VSN] = "version value not found",
	[SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
	[SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
	[SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
	[SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
	[SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
};
334
/* Name of each SPOE event, indexed by its SPOE_EV_* value. The entry for
 * SPOE_EV_NONE has no initializer and is therefore NULL. */
static const char *spoe_event_str[SPOE_EV_EVENTS] = {
	[SPOE_EV_ON_CLIENT_SESS] = "on-client-session",
	[SPOE_EV_ON_TCP_REQ_FE] = "on-frontend-tcp-request",
	[SPOE_EV_ON_TCP_REQ_BE] = "on-backend-tcp-request",
	[SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request",
	[SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request",

	[SPOE_EV_ON_SERVER_SESS] = "on-server-session",
	[SPOE_EV_ON_TCP_RSP] = "on-tcp-response",
	[SPOE_EV_ON_HTTP_RSP] = "on-http-response",
};
346
347
/* State names, only compiled in for debug builds (DEBUG_SPOE or DEBUG_FULL) */
#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)

/* Name of each SPOE context state, indexed by its SPOE_CTX_ST_* value */
static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
	[SPOE_CTX_ST_NONE] = "NONE",
	[SPOE_CTX_ST_READY] = "READY",
	[SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
	[SPOE_CTX_ST_WAITING_ACK] = "WAITING_ACK",
	[SPOE_CTX_ST_DONE] = "DONE",
	[SPOE_CTX_ST_ERROR] = "ERROR",
};

/* Name of each SPOE applet state, indexed by its SPOE_APPCTX_ST_* value */
static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
	[SPOE_APPCTX_ST_CONNECT] = "CONNECT",
	[SPOE_APPCTX_ST_CONNECTING] = "CONNECTING",
	[SPOE_APPCTX_ST_PROCESSING] = "PROCESSING",
	[SPOE_APPCTX_ST_DISCONNECT] = "DISCONNECT",
	[SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
	[SPOE_APPCTX_ST_EXIT] = "EXIT",
	[SPOE_APPCTX_ST_END] = "END",
};

#endif
370/********************************************************************
371 * Functions that encode/decode SPOE frames
372 ********************************************************************/
/* Frame types sent by HAProxy and by agents. The value is carried in the first
 * byte of every frame. */
enum spoe_frame_type {
	/* Frames sent by HAProxy */
	SPOE_FRM_T_HAPROXY_HELLO = 1,
	SPOE_FRM_T_HAPROXY_DISCON,
	SPOE_FRM_T_HAPROXY_NOTIFY,

	/* Frames sent by the agents */
	SPOE_FRM_T_AGENT_HELLO = 101,
	SPOE_FRM_T_AGENT_DISCON,
	SPOE_FRM_T_AGENT_ACK
};
385
/* All supported data types. The type is stored in the low 4 bits of the type
 * byte of a typed data (see SPOE_DATA_T_MASK). */
enum spoe_data_type {
	SPOE_DATA_T_NULL = 0,
	SPOE_DATA_T_BOOL,
	SPOE_DATA_T_INT32,
	SPOE_DATA_T_UINT32,
	SPOE_DATA_T_INT64,
	SPOE_DATA_T_UINT64,
	SPOE_DATA_T_IPV4,
	SPOE_DATA_T_IPV6,
	SPOE_DATA_T_STR,
	SPOE_DATA_T_BIN,
	SPOE_DATA_TYPES
};
400
/* Masks to get the data type or the flags value from a type byte */
#define SPOE_DATA_T_MASK 0x0F
#define SPOE_DATA_FL_MASK 0xF0

/* Flags to set Boolean values */
#define SPOE_DATA_FL_FALSE 0x00
#define SPOE_DATA_FL_TRUE 0x10

/* Helper to get static string length, excluding the terminating null byte.
 * Only valid on string literals and arrays, not on pointers. */
#define SLEN(str) (sizeof(str)-1)

/* Predefined keys used in HELLO/DISCONNECT frames */
#define SUPPORTED_VERSIONS_KEY "supported-versions"
#define VERSION_KEY "version"
#define MAX_FRAME_SIZE_KEY "max-frame-size"
#define CAPABILITIES_KEY "capabilities"
#define HEALTHCHECK_KEY "healthcheck"
#define STATUS_CODE_KEY "status-code"
#define MSG_KEY "message"
420
/* A supported protocol version, given as a range. <min> and <max> are compared
 * to the value returned by decode_spoe_version(), i.e. the version number
 * multiplied by 1000 (1000 for "1.0"). */
struct spoe_version {
	char *str;
	int min;
	int max;
};

/* All supported versions. The list ends with an entry whose <str> is NULL. */
static struct spoe_version supported_versions[] = {
	{"1.0", 1000, 1000},
	{NULL, 0, 0}
};

/* Comma-separated list of supported versions */
#define SUPPORTED_VERSIONS_VAL "1.0"

/* Comma-separated list of supported capabilities (none for now) */
#define CAPABILITIES_VAL ""
437#define CAPABILITIES_VAL ""
438
/* Parse the version found in <str>, which is <len> bytes long and need not be
 * NUL-terminated. Leading and trailing white spaces are accepted, anything else
 * around the number is an error. It returns the version multiplied by 1000 and
 * truncated to an integer ("1.0" gives 1000), or -1 if the string is not a
 * strictly positive number that fits in this representation. */
static int
decode_spoe_version(const char *str, size_t len)
{
	char tmp[len+1], *start, *end;
	double d;

	/* Work on a NUL-terminated copy, as required by strtod() */
	memcpy(tmp, str, len);
	tmp[len] = '\0';

	/* <ctype.h> functions need an unsigned char value: a negative plain
	 * char is undefined behavior */
	start = tmp;
	while (isspace((unsigned char)*start))
		start++;

	d = strtod(start, &end);
	if (start == end)
		return -1;

	while (isspace((unsigned char)*end))
		end++;
	if (*end != '\0')
		return -1;

	/* Converting an out-of-range double to int is undefined. The test is
	 * written so that NaN, infinities, zero and negative values all fail. */
	if (!(d > 0.0 && d <= (double)(INT_MAX / 1000)))
		return -1;

	return (int)(d * 1000);
}
467
/* Encode <i> as a variable-length integer into <buf>. Values below 240 use a
 * single byte. Otherwise the first byte has its 4 high bits set and carries the
 * 4 low bits of the value, and each following byte carries 7 more bits, with
 * its high bit set when another byte follows. This function never fails and
 * returns the number of written bytes (at most 10 for a 64-bit value); the
 * caller must provide enough room in <buf>. */
static int
encode_spoe_varint(uint64_t i, char *buf)
{
	int n = 0;

	if (i < 240) {
		buf[n++] = (unsigned char)i;
		return n;
	}

	buf[n++] = (unsigned char)i | 240;
	i = (i - 240) >> 4;
	while (i >= 128) {
		buf[n++] = (unsigned char)i | 128;
		i = (i - 128) >> 7;
	}
	buf[n++] = (unsigned char)i;
	return n;
}
489
/* Decode a variable-length integer starting at <buf>. <end> points just past
 * the last readable byte. On success, the value is stored in <*i> and the
 * number of read bytes is returned. If the decoding fails, -1 is returned. This
 * happens when the buffer's end is reached before the integer is complete, or
 * when the encoding is longer than what a 64-bit value needs (10 bytes). On
 * error, <*i> may have been partially updated. */
static int
decode_spoe_varint(const char *buf, const char *end, uint64_t *i)
{
	const unsigned char *msg = (const unsigned char *)buf;
	const unsigned char *stop = (const unsigned char *)end;
	int idx = 0;

	/* <stop> is exclusive: nothing can be read at or after it */
	if (msg >= stop)
		return -1;

	*i = msg[0];
	if (msg[0] < 240)
		return 1;

	do {
		++idx;
		/* The 10th byte (idx == 9) is shifted by 60 bits. One more
		 * byte would need a shift of 67 bits, which is undefined on a
		 * 64-bit integer. */
		if (idx > 9 || msg+idx >= stop)
			return -1;
		*i += (uint64_t)msg[idx] << (4 + 7 * (idx-1));
	} while (msg[idx] >= 128);
	return (idx + 1);
}
515
/* Encode the <len> bytes of <str> into <dst>. The string is prefixed by its
 * length, encoded as a variable-length integer; an empty string is encoded as
 * a single zero byte. This function never fails and returns the number of
 * written bytes; the caller must provide enough room in <dst>. */
static int
encode_spoe_string(const char *str, size_t len, char *dst)
{
	int hdr;

	if (len == 0) {
		*dst = 0;
		return 1;
	}

	hdr = encode_spoe_varint(len, dst);
	memcpy(dst + hdr, str, len);
	return (hdr + len);
}
533
/* Decode a string starting at <buf>; <end> points just past the last readable
 * byte. Its length is decoded first as a variable-length integer. If it
 * succeeds, and if the whole string fits before <end>, the beginning of the
 * string is saved in <*str>, its length is saved in <*len> and the total
 * number of bytes read (length prefix included) is returned. If an error
 * occurred, -1 is returned and <*str> remains NULL. The returned string points
 * into <buf> and is not NUL-terminated. */
static int
decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
{
	int hdr;

	*str = NULL;
	*len = 0;

	hdr = decode_spoe_varint(buf, end, len);
	if (hdr == -1)
		return -1;

	/* <*len> comes from the peer and may be any 64-bit value: compare it
	 * to the room left instead of adding it to a pointer, which could wrap
	 * and defeat the check. */
	if (buf + hdr > end || *len > (uint64_t)(end - (buf + hdr)))
		return -1;

	*str = buf + hdr;
	return (hdr + *len);
}
559
560/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
561 * of bytes read is returned. A types data is composed of a type (1 byte) and
562 * corresponding data:
563 * - boolean: non additional data (0 bytes)
564 * - integers: a variable-length integer (see decode_spoe_varint)
565 * - ipv4: 4 bytes
566 * - ipv6: 16 bytes
567 * - binary and string: a buffer prefixed by its size, a variable-length
568 * integer (see decode_spoe_string) */
569static int
570skip_spoe_data(char *frame, char *end)
571{
572 uint64_t sz = 0;
573 int i, idx = 0;
574
575 if (frame > end)
576 return -1;
577
578 switch (frame[idx++] & SPOE_DATA_T_MASK) {
579 case SPOE_DATA_T_BOOL:
580 break;
581 case SPOE_DATA_T_INT32:
582 case SPOE_DATA_T_INT64:
583 case SPOE_DATA_T_UINT32:
584 case SPOE_DATA_T_UINT64:
585 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
586 return -1;
587 idx += i;
588 break;
589 case SPOE_DATA_T_IPV4:
590 idx += 4;
591 break;
592 case SPOE_DATA_T_IPV6:
593 idx += 16;
594 break;
595 case SPOE_DATA_T_STR:
596 case SPOE_DATA_T_BIN:
597 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
598 return -1;
599 idx += i + sz;
600 break;
601 }
602
603 if (frame+idx > end)
604 return -1;
605 return idx;
606}
607
608/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
609 * number of read bytes is returned. See skip_spoe_data for details. */
610static int
611decode_spoe_data(char *frame, char *end, struct sample *smp)
612{
613 uint64_t sz = 0;
614 int type, i, idx = 0;
615
616 if (frame > end)
617 return -1;
618
619 type = frame[idx++];
620 switch (type & SPOE_DATA_T_MASK) {
621 case SPOE_DATA_T_BOOL:
622 smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
623 smp->data.type = SMP_T_BOOL;
624 break;
625 case SPOE_DATA_T_INT32:
626 case SPOE_DATA_T_INT64:
627 case SPOE_DATA_T_UINT32:
628 case SPOE_DATA_T_UINT64:
629 if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1)
630 return -1;
631 idx += i;
632 smp->data.type = SMP_T_SINT;
633 break;
634 case SPOE_DATA_T_IPV4:
635 if (frame+idx+4 > end)
636 return -1;
637 memcpy(&smp->data.u.ipv4, frame+idx, 4);
638 smp->data.type = SMP_T_IPV4;
639 idx += 4;
640 break;
641 case SPOE_DATA_T_IPV6:
642 if (frame+idx+16 > end)
643 return -1;
644 memcpy(&smp->data.u.ipv6, frame+idx, 16);
645 smp->data.type = SMP_T_IPV6;
646 idx += 16;
647 break;
648 case SPOE_DATA_T_STR:
649 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
650 return -1;
651 idx += i;
652 if (frame+idx+sz > end)
653 return -1;
654 smp->data.u.str.str = frame+idx;
655 smp->data.u.str.len = sz;
656 smp->data.type = SMP_T_STR;
657 idx += sz;
658 break;
659 case SPOE_DATA_T_BIN:
660 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
661 return -1;
662 idx += i;
663 if (frame+idx+sz > end)
664 return -1;
665 smp->data.u.str.str = frame+idx;
666 smp->data.u.str.len = sz;
667 smp->data.type = SMP_T_BIN;
668 idx += sz;
669 break;
670 }
671
672 if (frame+idx > end)
673 return -1;
674 return idx;
675}
676
/* Skip an action in a frame received from an agent; <end> points just past the
 * last readable byte. An action is made of one byte for the action type, one
 * byte for the number of arguments, then that many typed data. If an error
 * occurred, -1 is returned, otherwise the number of read bytes is returned. */
static int
skip_spoe_action(char *frame, char *end)
{
	int nbargs, len, pos;

	if (frame+2 > end)
		return -1;

	/* frame[0] is the action type (ignored), frame[1] the arguments count */
	nbargs = frame[1];
	pos = 2;
	for (; nbargs > 0; nbargs--) {
		len = skip_spoe_data(frame+pos, end);
		if (len == -1)
			return -1;
		pos += len;
	}

	return (frame+pos > end) ? -1 : pos;
}
700
701/* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on
702 * success, 0 if the frame can be ignored and -1 if an error occurred. */
703static int
704prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
705{
706 int idx = 0;
707 size_t max = (7 /* TYPE + METADATA */
708 + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
709 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
710 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL));
711
712 if (size < max)
713 return -1;
714
715 /* Frame type */
716 frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
717
718 /* No flags for now */
719 memset(frame+idx, 0, 4);
720 idx += 4;
721
722 /* No stream-id and frame-id for HELLO frames */
723 frame[idx++] = 0;
724 frame[idx++] = 0;
725
726 /* There are 3 mandatory items: "supported-versions", "max-frame-size"
727 * and "capabilities" */
728
729 /* "supported-versions" K/V item */
730 idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx);
731 frame[idx++] = SPOE_DATA_T_STR;
732 idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx);
733
734 /* "max-fram-size" K/V item */
735 idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
736 frame[idx++] = SPOE_DATA_T_UINT32;
737 idx += encode_spoe_varint(APPCTX_SPOE(appctx).max_frame_size, frame+idx);
738
739 /* "capabilities" K/V item */
740 idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
741 frame[idx++] = SPOE_DATA_T_STR;
742 idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
743
744 return idx;
745}
746
747/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame
748 * size on success, 0 if the frame can be ignored and -1 if an error
749 * occurred. */
750static int
751prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
752{
753 const char *reason;
754 int rlen, idx = 0;
755 size_t max = (7 /* TYPE + METADATA */
756 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
757 + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
758
759 if (size < max)
760 return -1;
761
762 /* Get the message corresponding to the status code */
763 if (spoe_status_code >= SPOE_FRM_ERRS)
764 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
765 reason = spoe_frm_err_reasons[spoe_status_code];
766 rlen = strlen(reason);
767
768 /* Frame type */
769 frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
770
771 /* No flags for now */
772 memset(frame+idx, 0, 4);
773 idx += 4;
774
775 /* No stream-id and frame-id for DISCONNECT frames */
776 frame[idx++] = 0;
777 frame[idx++] = 0;
778
779 /* There are 2 mandatory items: "status-code" and "message" */
780
781 /* "status-code" K/V item */
782 idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx);
783 frame[idx++] = SPOE_DATA_T_UINT32;
784 idx += encode_spoe_varint(spoe_status_code, frame+idx);
785
786 /* "message" K/V item */
787 idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx);
788 frame[idx++] = SPOE_DATA_T_STR;
789 idx += encode_spoe_string(reason, rlen, frame+idx);
790
791 return idx;
792}
793
794/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
795 * success, 0 if the frame can be ignored and -1 if an error occurred. */
796static int
797prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
798{
799 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
800 int idx = 0;
801
802 if (size < APPCTX_SPOE(appctx).max_frame_size)
803 return -1;
804
805 frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
806
807 /* No flags for now */
808 memset(frame+idx, 0, 4);
809 idx += 4;
810
811 /* Set stream-id and frame-id */
812 idx += encode_spoe_varint(ctx->stream_id, frame+idx);
813 idx += encode_spoe_varint(ctx->frame_id, frame+idx);
814
815 /* Copy encoded messages */
816 memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
817 idx += ctx->buffer->i;
818
819 return idx;
820}
821
822/* Decode HELLO frame sent by an agent. It returns the number of by read bytes
823 * on success, 0 if the frame can be ignored and -1 if an error occurred. */
824static int
825handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
826{
827 int vsn, max_frame_size;
828 int i, idx = 0;
829 size_t min_size = (7 /* TYPE + METADATA */
830 + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
831 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
832 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
833
834 /* Check frame type */
835 if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
836 return 0;
837
838 if (size < min_size) {
839 spoe_status_code = SPOE_FRM_ERR_INVALID;
840 return -1;
841 }
842
843 /* Skip flags: fragmentation is not supported for now */
844 idx += 4;
845
846 /* stream-id and frame-id must be cleared */
847 if (frame[idx] != 0 || frame[idx+1] != 0) {
848 spoe_status_code = SPOE_FRM_ERR_INVALID;
849 return -1;
850 }
851 idx += 2;
852
853 /* There are 3 mandatory items: "version", "max-frame-size" and
854 * "capabilities" */
855
856 /* Loop on K/V items */
857 vsn = max_frame_size = 0;
858 while (idx < size) {
859 char *str;
860 uint64_t sz;
861
862 /* Decode the item key */
863 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
864 if (str == NULL) {
865 spoe_status_code = SPOE_FRM_ERR_INVALID;
866 return -1;
867 }
868 /* Check "version" K/V item */
869 if (!memcmp(str, VERSION_KEY, sz)) {
870 /* The value must be a string */
871 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
872 spoe_status_code = SPOE_FRM_ERR_INVALID;
873 return -1;
874 }
875 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
876 if (str == NULL) {
877 spoe_status_code = SPOE_FRM_ERR_INVALID;
878 return -1;
879 }
880
881 vsn = decode_spoe_version(str, sz);
882 if (vsn == -1) {
883 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
884 return -1;
885 }
886 for (i = 0; supported_versions[i].str != NULL; ++i) {
887 if (vsn >= supported_versions[i].min &&
888 vsn <= supported_versions[i].max)
889 break;
890 }
891 if (supported_versions[i].str == NULL) {
892 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
893 return -1;
894 }
895 }
896 /* Check "max-frame-size" K/V item */
897 else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
898 int type;
899
900 /* The value must be integer */
901 type = frame[idx++];
902 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
903 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
904 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
905 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
906 spoe_status_code = SPOE_FRM_ERR_INVALID;
907 return -1;
908 }
909 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
910 spoe_status_code = SPOE_FRM_ERR_INVALID;
911 return -1;
912 }
913 idx += i;
914 if (sz < MIN_FRAME_SIZE || sz > APPCTX_SPOE(appctx).max_frame_size) {
915 spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
916 return -1;
917 }
918 max_frame_size = sz;
919 }
920 /* Skip "capabilities" K/V item for now */
921 else {
922 /* Silently ignore unknown item */
923 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
924 spoe_status_code = SPOE_FRM_ERR_INVALID;
925 return -1;
926 }
927 idx += i;
928 }
929 }
930
931 /* Final checks */
932 if (!vsn) {
933 spoe_status_code = SPOE_FRM_ERR_NO_VSN;
934 return -1;
935 }
936 if (!max_frame_size) {
937 spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
938 return -1;
939 }
940
941 APPCTX_SPOE(appctx).version = (unsigned int)vsn;
942 APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
943 return idx;
944}
945
946/* Decode DISCONNECT frame sent by an agent. It returns the number of by read
947 * bytes on success, 0 if the frame can be ignored and -1 if an error
948 * occurred. */
949static int
950handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
951{
952 int i, idx = 0;
953 size_t min_size = (7 /* TYPE + METADATA */
954 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
955 + 1 + SLEN(MSG_KEY) + 1 + 1);
956
957 /* Check frame type */
958 if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
959 return 0;
960
961 if (size < min_size) {
962 spoe_status_code = SPOE_FRM_ERR_INVALID;
963 return -1;
964 }
965
966 /* Skip flags: fragmentation is not supported for now */
967 idx += 4;
968
969 /* stream-id and frame-id must be cleared */
970 if (frame[idx] != 0 || frame[idx+1] != 0) {
971 spoe_status_code = SPOE_FRM_ERR_INVALID;
972 return -1;
973 }
974 idx += 2;
975
976 /* There are 2 mandatory items: "status-code" and "message" */
977
978 /* Loop on K/V items */
979 while (idx < size) {
980 char *str;
981 uint64_t sz;
982
983 /* Decode the item key */
984 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
985 if (str == NULL) {
986 spoe_status_code = SPOE_FRM_ERR_INVALID;
987 return -1;
988 }
989
990 /* Check "status-code" K/V item */
991 if (!memcmp(str, STATUS_CODE_KEY, sz)) {
992 int type;
993
994 /* The value must be an integer */
995 type = frame[idx++];
996 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
997 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
998 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
999 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
1000 spoe_status_code = SPOE_FRM_ERR_INVALID;
1001 return -1;
1002 }
1003 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
1004 spoe_status_code = SPOE_FRM_ERR_INVALID;
1005 return -1;
1006 }
1007 idx += i;
1008 spoe_status_code = sz;
1009 }
1010
1011 /* Check "message" K/V item */
1012 else if (sz && !memcmp(str, MSG_KEY, sz)) {
1013 /* The value must be a string */
1014 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1015 spoe_status_code = SPOE_FRM_ERR_INVALID;
1016 return -1;
1017 }
1018 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1019 if (str == NULL || sz > 255) {
1020 spoe_status_code = SPOE_FRM_ERR_INVALID;
1021 return -1;
1022 }
1023 memcpy(spoe_reason, str, sz);
1024 spoe_reason[sz] = 0;
1025 }
1026 else {
1027 /* Silently ignore unknown item */
1028 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1029 spoe_status_code = SPOE_FRM_ERR_INVALID;
1030 return -1;
1031 }
1032 idx += i;
1033 }
1034 }
1035
1036 return idx;
1037}
1038
1039
1040/* Decode ACK frame sent by an agent. It returns the number of by read bytes on
1041 * success, 0 if the frame can be ignored and -1 if an error occurred. */
1042static int
1043handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
1044{
1045 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1046 uint64_t stream_id, frame_id;
1047 int idx = 0;
1048 size_t min_size = (7 /* TYPE + METADATA */);
1049
1050 /* Check frame type */
1051 if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
1052 return 0;
1053
1054 if (size < min_size) {
1055 spoe_status_code = SPOE_FRM_ERR_INVALID;
1056 return -1;
1057 }
1058
1059 /* Skip flags: fragmentation is not supported for now */
1060 idx += 4;
1061
1062 /* Get the stream-id and the frame-id */
1063 idx += decode_spoe_varint(frame+idx, frame+size, &stream_id);
1064 idx += decode_spoe_varint(frame+idx, frame+size, &frame_id);
1065
1066 /* Check stream-id and frame-id */
1067 if (ctx->stream_id != (unsigned int)stream_id ||
1068 ctx->frame_id != (unsigned int)frame_id)
1069 return 0;
1070
1071 /* Copy encoded actions */
1072 b_reset(ctx->buffer);
1073 memcpy(ctx->buffer->p, frame+idx, size-idx);
1074 ctx->buffer->i = size-idx;
1075
1076 return idx;
1077}
1078
Christopher Fauletba7bc162016-11-07 21:07:38 +01001079/* This function is used in cfgparse.c and declared in proto/checks.h. It
1080 * prepare the request to send to agents during a healthcheck. It returns 0 on
1081 * success and -1 if an error occurred. */
1082int
1083prepare_spoe_healthcheck_request(char **req, int *len)
1084{
1085 struct appctx a;
1086 char *frame, buf[global.tune.bufsize];
1087 unsigned int framesz;
1088 int idx;
1089
1090 memset(&a, 0, sizeof(a));
1091 memset(buf, 0, sizeof(buf));
1092 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1093
1094 frame = buf+4;
1095 idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4);
1096 if (idx <= 0)
1097 return -1;
1098 if (idx + SLEN(HEALTHCHECK_KEY) + 1 > global.tune.bufsize-4)
1099 return -1;
1100
1101 /* "healthcheck" K/V item */
1102 idx += encode_spoe_string(HEALTHCHECK_KEY, SLEN(HEALTHCHECK_KEY), frame+idx);
1103 frame[idx++] = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE);
1104
1105 framesz = htonl(idx);
1106 memcpy(buf, (char *)&framesz, 4);
1107
1108 if ((*req = malloc(idx+4)) == NULL)
1109 return -1;
1110 memcpy(*req, buf, idx+4);
1111 *len = idx+4;
1112 return 0;
1113}
1114
1115/* This function is used in checks.c and declared in proto/checks.h. It decode
1116 * the response received from an agent during a healthcheck. It returns 0 on
1117 * success and -1 if an error occurred. */
1118int
1119handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen)
1120{
1121 struct appctx a;
1122 int r;
1123
1124 memset(&a, 0, sizeof(a));
1125 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1126
1127 if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0)
1128 goto error;
1129 if ((r = handle_spoe_agenthello_frame(&a, frame, size)) <= 0) {
1130 if (r == 0)
1131 spoe_status_code = SPOE_FRM_ERR_INVALID;
1132 goto error;
1133 }
1134
1135 return 0;
1136
1137 error:
1138 if (spoe_status_code >= SPOE_FRM_ERRS)
1139 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
1140 strncpy(err, spoe_frm_err_reasons[spoe_status_code], errlen);
1141 return -1;
1142}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001143
1144/********************************************************************
1145 * Functions that manage the SPOE applet
1146 ********************************************************************/
1147/* Callback function that catches applet timeouts. If a timeout occurred, we set
1148 * <appctx->st1> flag and the SPOE applet is woken up. */
1149static struct task *
1150process_spoe_applet(struct task * task)
1151{
1152 struct appctx *appctx = task->context;
1153
1154 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1155 if (tick_is_expired(task->expire, now_ms)) {
1156 task->expire = TICK_ETERNITY;
1157 appctx->st1 = SPOE_APPCTX_ERR_TOUT;
1158 }
1159 si_applet_want_get(appctx->owner);
1160 appctx_wakeup(appctx);
1161 return task;
1162}
1163
1164/* Remove a SPOE applet from the agent cache */
1165static void
1166remove_spoe_applet_from_cache(struct appctx *appctx)
1167{
1168 struct appctx *a, *back;
1169 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1170
1171 if (LIST_ISEMPTY(&agent->cache))
1172 return;
1173
1174 list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) {
1175 if (a == appctx) {
1176 LIST_DEL(&APPCTX_SPOE(appctx).list);
1177 break;
1178 }
1179 }
1180}
1181
1182
1183/* Callback function that releases a SPOE applet. This happens when the
1184 * connection with the agent is closed. */
1185static void
1186release_spoe_applet(struct appctx *appctx)
1187{
1188 struct stream_interface *si = appctx->owner;
1189 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1190 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1191
1192 if (appctx->st0 == SPOE_APPCTX_ST_CONNECT ||
1193 appctx->st0 == SPOE_APPCTX_ST_CONNECTING)
1194 on_new_spoe_appctx_failure(agent);
1195
1196 if (appctx->st0 != SPOE_APPCTX_ST_END) {
1197 si_shutw(si);
1198 si_shutr(si);
1199 si_ic(si)->flags |= CF_READ_NULL;
1200 appctx->st0 = SPOE_APPCTX_ST_END;
1201 }
1202
1203 if (ctx != NULL) {
1204 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1205 ctx->appctx = NULL;
1206 }
1207
1208 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
1209 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1210 __FUNCTION__, appctx);
1211
1212 /* Release the task attached to the SPOE applet */
1213 if (APPCTX_SPOE(appctx).task) {
1214 task_delete(APPCTX_SPOE(appctx).task);
1215 task_free(APPCTX_SPOE(appctx).task);
1216 }
1217
1218 /* And remove it from the agent cache */
1219 remove_spoe_applet_from_cache(appctx);
1220 APPCTX_SPOE(appctx).ctx = NULL;
1221}
1222
1223/* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when
1224 * the frame can be ignored, 0 to retry later and 1 on success. The frame is
1225 * encoded using the callback function <prepare>. */
1226static int
1227send_spoe_frame(struct appctx *appctx,
1228 int (*prepare)(struct appctx *, char *, size_t))
1229{
1230 struct stream_interface *si = appctx->owner;
1231 int framesz, ret;
1232 uint32_t netint;
1233
1234 ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
1235 if (ret <= 0)
1236 goto skip_or_error;
1237 framesz = ret;
1238 netint = htonl(framesz);
1239 ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint));
1240 if (ret > 0)
1241 ret = bi_putblk(si_ic(si), trash.str, framesz);
1242 if (ret <= 0) {
1243 if (ret == -1)
1244 return -1;
1245 return -2;
1246 }
1247 return 1;
1248
1249 skip_or_error:
1250 if (!ret)
1251 return -1;
1252 return -2;
1253}
1254
1255/* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1
1256 * when the frame can be ignored, 0 to retry later and 1 on success. The frame
1257 * is decoded using the callback function <handle>. */
1258static int
1259recv_spoe_frame(struct appctx *appctx,
1260 int (*handle)(struct appctx *, char *, size_t))
1261{
1262 struct stream_interface *si = appctx->owner;
1263 int framesz, ret;
1264 uint32_t netint;
1265
1266 ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0);
1267 if (ret <= 0)
1268 goto empty_or_error;
1269 framesz = ntohl(netint);
1270 if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
1271 spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
1272 return -2;
1273 }
1274
1275 ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint));
1276 if (ret <= 0)
1277 goto empty_or_error;
1278 bo_skip(si_oc(si), ret+sizeof(netint));
1279
1280 /* First check if the received frame is a DISCONNECT frame */
1281 ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz);
1282 if (ret != 0) {
1283 if (ret > 0) {
1284 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1285 " - disconnected by peer (%d): %s\n",
1286 (int)now.tv_sec, (int)now.tv_usec,
1287 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1288 __FUNCTION__, appctx, spoe_status_code,
1289 spoe_reason);
1290 return 2;
1291 }
1292 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1293 " - error on frame (%s)\n",
1294 (int)now.tv_sec, (int)now.tv_usec,
1295 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1296 __FUNCTION__, appctx,
1297 spoe_frm_err_reasons[spoe_status_code]);
1298 return -2;
1299 }
1300 if (handle == NULL)
1301 goto out;
1302
1303 /* If not, try to decode it */
1304 ret = handle(appctx, trash.str, framesz);
1305 if (ret <= 0) {
1306 if (!ret)
1307 return -1;
1308 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1309 " - error on frame (%s)\n",
1310 (int)now.tv_sec, (int)now.tv_usec,
1311 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1312 __FUNCTION__, appctx,
1313 spoe_frm_err_reasons[spoe_status_code]);
1314 return -2;
1315 }
1316 out:
1317 return 1;
1318
1319 empty_or_error:
1320 if (!ret)
1321 return 0;
1322 spoe_status_code = SPOE_FRM_ERR_IO;
1323 return -2;
1324}
1325
1326/* I/O Handler processing messages exchanged with the agent */
1327static void
1328handle_spoe_applet(struct appctx *appctx)
1329{
1330 struct stream_interface *si = appctx->owner;
1331 struct stream *s = si_strm(si);
1332 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1333 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1334 int ret;
1335
1336 switchstate:
1337 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1338 " - appctx-state=%s\n",
1339 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1340 __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);
1341
1342 switch (appctx->st0) {
1343 case SPOE_APPCTX_ST_CONNECT:
1344 spoe_status_code = SPOE_FRM_ERR_NONE;
1345 if (si->state <= SI_ST_CON) {
1346 si_applet_want_put(si);
1347 task_wakeup(s->task, TASK_WOKEN_MSG);
1348 break;
1349 }
1350 else if (si->state != SI_ST_EST) {
1351 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1352 on_new_spoe_appctx_failure(agent);
1353 goto switchstate;
1354 }
1355 ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame);
1356 if (ret < 0) {
1357 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1358 on_new_spoe_appctx_failure(agent);
1359 goto switchstate;
1360 }
1361 else if (!ret)
1362 goto full;
1363
1364 /* Hello frame was sent. Set the hello timeout and
1365 * wait for the reply. */
1366 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
1367 appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
1368 /* fall through */
1369
1370 case SPOE_APPCTX_ST_CONNECTING:
1371 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1372 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1373 on_new_spoe_appctx_failure(agent);
1374 goto switchstate;
1375 }
1376 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1377 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1378 " - Connection timed out\n",
1379 (int)now.tv_sec, (int)now.tv_usec,
1380 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1381 __FUNCTION__, appctx);
1382 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1383 on_new_spoe_appctx_failure(agent);
1384 goto switchstate;
1385 }
1386 ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame);
1387 if (ret < 0) {
1388 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1389 on_new_spoe_appctx_failure(agent);
1390 goto switchstate;
1391 }
1392 if (ret == 2) {
1393 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1394 on_new_spoe_appctx_failure(agent);
1395 goto switchstate;
1396 }
1397 if (!ret)
1398 goto out;
1399
1400 /* hello handshake is finished, set the idle timeout,
1401 * Add the appctx in the agent cache, decrease the
1402 * number of new applets and wake up waiting streams. */
1403 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1404 appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
1405 on_new_spoe_appctx_success(agent, appctx);
1406 break;
1407
1408 case SPOE_APPCTX_ST_PROCESSING:
1409 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1410 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1411 goto switchstate;
1412 }
1413 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1414 spoe_status_code = SPOE_FRM_ERR_TOUT;
1415 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1416 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1417 goto switchstate;
1418 }
1419 if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
1420 ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame);
1421 if (ret < 0) {
1422 if (ret == -1) {
1423 ctx->state = SPOE_CTX_ST_ERROR;
1424 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1425 goto skip_notify_frame;
1426 }
1427 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1428 goto switchstate;
1429 }
1430 else if (!ret)
1431 goto full;
1432 ctx->state = SPOE_CTX_ST_WAITING_ACK;
Christopher Faulet03a34492016-11-19 16:47:56 +01001433 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001434 }
1435
1436 skip_notify_frame:
1437 if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) {
1438 ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame);
1439 if (ret < 0) {
1440 if (ret == -1)
1441 goto skip_notify_frame;
1442 ctx->state = SPOE_CTX_ST_ERROR;
1443 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1444 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1445 goto switchstate;
1446 }
1447 if (!ret)
1448 goto out;
1449 if (ret == 2) {
1450 ctx->state = SPOE_CTX_ST_ERROR;
1451 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1452 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1453 goto switchstate;
1454 }
1455 ctx->state = SPOE_CTX_ST_DONE;
1456 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1457 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1458 }
1459 else {
1460 if (stopping) {
1461 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1462 goto switchstate;
1463 }
1464
1465 ret = recv_spoe_frame(appctx, NULL);
1466 if (ret < 0) {
1467 if (ret == -1)
1468 goto skip_notify_frame;
1469 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1470 goto switchstate;
1471 }
1472 if (!ret)
1473 goto out;
1474 if (ret == 2) {
1475 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1476 goto switchstate;
1477 }
1478 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1479 }
1480 break;
1481
1482 case SPOE_APPCTX_ST_DISCONNECT:
1483 ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame);
1484 if (ret < 0) {
1485 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1486 goto switchstate;
1487 }
1488 else if (!ret)
1489 goto full;
1490 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1491 " - disconnected by HAProxy (%d): %s\n",
1492 (int)now.tv_sec, (int)now.tv_usec,
1493 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1494 __FUNCTION__, appctx, spoe_status_code,
1495 spoe_frm_err_reasons[spoe_status_code]);
1496
Christopher Faulet03a34492016-11-19 16:47:56 +01001497 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001498 appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
1499 /* fall through */
1500
1501 case SPOE_APPCTX_ST_DISCONNECTING:
1502 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1503 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1504 goto switchstate;
1505 }
1506 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1507 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1508 goto switchstate;
1509 }
1510 ret = recv_spoe_frame(appctx, NULL);
1511 if (ret < 0 || ret == 2) {
1512 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1513 goto switchstate;
1514 }
1515 break;
1516
1517 case SPOE_APPCTX_ST_EXIT:
1518 si_shutw(si);
1519 si_shutr(si);
1520 si_ic(si)->flags |= CF_READ_NULL;
1521 appctx->st0 = SPOE_APPCTX_ST_END;
1522 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1523 /* fall through */
1524
1525 case SPOE_APPCTX_ST_END:
1526 break;
1527 }
1528
1529 out:
1530 if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
1531 task_queue(APPCTX_SPOE(appctx).task);
1532 si_oc(si)->flags |= CF_READ_DONTWAIT;
1533 task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
1534 return;
1535 full:
1536 si_applet_cant_put(si);
1537 goto out;
1538}
1539
1540struct applet spoe_applet = {
1541 .obj_type = OBJ_TYPE_APPLET,
1542 .name = "<SPOE>", /* used for logging */
1543 .fct = handle_spoe_applet,
1544 .release = release_spoe_applet,
1545};
1546
1547/* Create a SPOE applet. On success, the created applet is returned, else
1548 * NULL. */
1549static struct appctx *
1550create_spoe_appctx(struct spoe_config *conf)
1551{
1552 struct appctx *appctx;
1553 struct session *sess;
1554 struct task *task;
1555 struct stream *strm;
1556 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
1557 struct listener *, by_fe);
1558
1559 if ((appctx = appctx_new(&spoe_applet)) == NULL)
1560 goto out_error;
1561
1562 appctx->st0 = SPOE_APPCTX_ST_CONNECT;
1563 if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
1564 goto out_free_appctx;
1565 APPCTX_SPOE(appctx).task->process = process_spoe_applet;
1566 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1567 APPCTX_SPOE(appctx).task->context = appctx;
1568 APPCTX_SPOE(appctx).agent = conf->agent;
1569 APPCTX_SPOE(appctx).ctx = NULL;
1570 APPCTX_SPOE(appctx).version = 0;
1571 APPCTX_SPOE(appctx).max_frame_size = global.tune.bufsize;
1572 task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
1573
1574 sess = session_new(&conf->agent_fe, l, &appctx->obj_type);
1575 if (!sess)
1576 goto out_free_spoe;
1577
1578 if ((task = task_new()) == NULL)
1579 goto out_free_sess;
1580
1581 if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
1582 goto out_free_task;
1583
1584 strm->target = sess->listener->default_target;
1585 strm->req.analysers |= sess->listener->analysers;
1586 stream_set_backend(strm, conf->agent->b.be);
1587
1588 /* applet is waiting for data */
1589 si_applet_cant_get(&strm->si[0]);
1590 appctx_wakeup(appctx);
1591
1592 /* Increase the number of applets waiting the end of the hello
1593 * handshake. */
1594 conf->agent->new_applets++;
1595
1596 strm->do_log = NULL;
1597 strm->res.flags |= CF_READ_DONTWAIT;
1598
1599 conf->agent_fe.feconn++;
1600 jobs++;
1601 totalconn++;
1602
1603 return appctx;
1604
1605 /* Error unrolling */
1606 out_free_task:
1607 task_free(task);
1608 out_free_sess:
1609 session_free(sess);
1610 out_free_spoe:
1611 task_free(APPCTX_SPOE(appctx).task);
1612 out_free_appctx:
1613 appctx_free(appctx);
1614 out_error:
1615 return NULL;
1616}
1617
1618/* Wake up a SPOE applet attached to a SPOE context. */
1619static void
1620wakeup_spoe_appctx(struct spoe_context *ctx)
1621{
1622 if (ctx->appctx == NULL)
1623 return;
1624 if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) {
1625 si_applet_want_get(ctx->appctx->owner);
1626 si_applet_want_put(ctx->appctx->owner);
1627 appctx_wakeup(ctx->appctx);
1628 }
1629}
1630
1631
1632/* Run across the list of pending streams waiting for a SPOE applet and wake the
1633 * first. */
1634static void
1635offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx)
1636{
1637 struct spoe_context *ctx;
1638
Christopher Fauletf7a30922016-11-10 15:04:51 +01001639 if (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING)
1640 return;
1641
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001642 if (LIST_ISEMPTY(&agent->applet_wq))
1643 LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
1644 else {
1645 ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait);
1646 APPCTX_SPOE(appctx).ctx = ctx;
1647 ctx->appctx = appctx;
1648 LIST_DEL(&ctx->applet_wait);
1649 LIST_INIT(&ctx->applet_wait);
1650 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1651 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1652 " - wake up stream to get available SPOE applet\n",
1653 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1654 __FUNCTION__, ctx->strm);
1655 }
1656}
1657
1658/* A failure occurred during SPOE applet creation. */
1659static void
1660on_new_spoe_appctx_failure(struct spoe_agent *agent)
1661{
1662 struct spoe_context *ctx;
1663
1664 agent->new_applets--;
1665 list_for_each_entry(ctx, &agent->applet_wq, applet_wait) {
1666 ctx->errs++;
1667 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1668 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1669 " - wake up stream because to SPOE applet connection failed\n",
1670 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1671 __FUNCTION__, ctx->strm);
1672 }
1673}
1674
1675static void
1676on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx)
1677{
1678 agent->new_applets--;
1679 offer_spoe_appctx(agent, appctx);
1680}
1681/* Retrieve a SPOE applet from the agent cache if possible, else create it. It
1682 * returns 1 on success, 0 to retry later and -1 if an error occurred. */
1683static int
1684acquire_spoe_appctx(struct spoe_context *ctx, int dir)
1685{
1686 struct spoe_config *conf = FLT_CONF(ctx->filter);
1687 struct spoe_agent *agent = conf->agent;
1688 struct appctx *appctx;
1689
1690 /* If a process is already started for this SPOE context, retry
1691 * later. */
1692 if (ctx->flags & SPOE_CTX_FL_PROCESS)
1693 goto wait;
1694
1695 /* If needed, initialize the buffer that will be used to encode messages
1696 * and decode actions. */
1697 if (ctx->buffer == &buf_empty) {
1698 if (!LIST_ISEMPTY(&ctx->buffer_wait)) {
1699 LIST_DEL(&ctx->buffer_wait);
1700 LIST_INIT(&ctx->buffer_wait);
1701 }
1702
1703 if (!b_alloc_margin(&ctx->buffer, 0)) {
1704 LIST_ADDQ(&buffer_wq, &ctx->buffer_wait);
1705 goto wait;
1706 }
1707 }
1708
1709 /* If the SPOE applet was already set, all is done. */
1710 if (ctx->appctx)
1711 goto success;
1712
1713 /* Else try to retrieve it from the agent cache */
1714 if (!LIST_ISEMPTY(&agent->cache)) {
1715 appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list);
1716 LIST_DEL(&APPCTX_SPOE(appctx).list);
1717 APPCTX_SPOE(appctx).ctx = ctx;
1718 ctx->appctx = appctx;
1719 goto success;
1720 }
1721
1722 /* If there is no server up for the agent's backend or it too many
1723 * failure occurred, this is an error. */
1724 if ((!agent->b.be->srv_act && !agent->b.be->srv_bck) ||
1725 ctx->errs >= MAX_NEW_SPOE_APPLET_ERRS)
1726 goto error;
1727
1728 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1729 " - waiting for available SPOE appctx\n",
1730 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
1731 ctx->strm);
1732
1733 /* Else add the stream in the waiting queue. */
1734 if (LIST_ISEMPTY(&ctx->applet_wait))
1735 LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait);
1736
1737 /* Finally, create new SPOE applet if we can */
1738 if (agent->new_applets < MAX_NEW_SPOE_APPLETS) {
1739 if (create_spoe_appctx(conf) == NULL)
1740 goto error;
1741 }
1742
1743 wait:
1744 return 0;
1745
1746 success:
1747 /* Remove the stream from the waiting queue */
1748 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1749 LIST_DEL(&ctx->applet_wait);
1750 LIST_INIT(&ctx->applet_wait);
1751 }
1752
1753 /* Set the right flag to prevent request and response processing
1754 * in same time. */
1755 ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
1756 ? SPOE_CTX_FL_REQ_PROCESS
1757 : SPOE_CTX_FL_RSP_PROCESS);
1758 ctx->errs = 0;
1759
1760 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1761 " - acquire SPOE appctx %p from cache\n",
1762 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1763 __FUNCTION__, ctx->strm, ctx->appctx);
1764 return 1;
1765
1766 error:
1767 /* Remove the stream from the waiting queue */
1768 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1769 LIST_DEL(&ctx->applet_wait);
1770 LIST_INIT(&ctx->applet_wait);
1771 }
1772
1773 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1774 " - failed to acquire SPOE appctx errs=%u\n",
1775 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1776 __FUNCTION__, ctx->strm, ctx->errs);
1777 send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n");
1778
1779 return -1;
1780}
1781
1782/* Release a SPOE applet and push it in the agent cache. */
1783static void
1784release_spoe_appctx(struct spoe_context *ctx)
1785{
1786 struct spoe_config *conf = FLT_CONF(ctx->filter);
1787 struct spoe_agent *agent = conf->agent;
1788 struct appctx *appctx = ctx->appctx;
1789
1790 /* Reset the flag to allow next processing */
1791 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
1792
Christopher Fauletf7a30922016-11-10 15:04:51 +01001793 /* Reset processing timer */
1794 ctx->process_exp = TICK_ETERNITY;
1795
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001796 /* Release the buffer if needed */
1797 if (ctx->buffer != &buf_empty) {
1798 b_free(&ctx->buffer);
1799 if (!LIST_ISEMPTY(&buffer_wq))
1800 stream_offer_buffers();
1801 }
1802
1803 /* If there is no SPOE applet, all is done */
1804 if (!appctx)
1805 return;
1806
1807 /* Else, reassign it or push it in the agent cache */
1808 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1809 " - release SPOE appctx %p\n",
1810 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1811 __FUNCTION__, ctx->strm, appctx);
1812
1813 APPCTX_SPOE(appctx).ctx = NULL;
1814 ctx->appctx = NULL;
1815 offer_spoe_appctx(agent, appctx);
1816}
1817
1818/***************************************************************************
1819 * Functions that process SPOE messages and actions
1820 **************************************************************************/
1821/* Process SPOE messages for a specific event. During the processing, it returns
1822 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1823 * is returned. */
1824static int
1825process_spoe_messages(struct stream *s, struct spoe_context *ctx,
1826 struct list *messages, int dir)
1827{
1828 struct spoe_message *msg;
1829 struct sample *smp;
1830 struct spoe_arg *arg;
1831 char *p;
1832 size_t max_size;
1833 int off, flag, idx = 0;
1834
1835 /* Reserve 32 bytes from the frame Metadata */
1836 max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32;
1837
1838 b_reset(ctx->buffer);
1839 p = ctx->buffer->p;
1840
1841 /* Loop on messages */
1842 list_for_each_entry(msg, messages, list) {
1843 if (idx + msg->id_len + 1 > max_size)
1844 goto skip;
1845
1846 /* Set the message name */
1847 idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
1848
1849 /* Save offset where to store the number of arguments for this
1850 * message */
1851 off = idx++;
1852 p[off] = 0;
1853
1854 /* Loop on arguments */
1855 list_for_each_entry(arg, &msg->args, list) {
1856 p[off]++; /* Increment the number of arguments */
1857
1858 if (idx + arg->name_len + 1 > max_size)
1859 goto skip;
1860
1861 /* Encode the arguement name as a string. It can by NULL */
1862 idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
1863
1864 /* Fetch the arguement value */
1865 smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
1866 if (!smp) {
1867 /* If no value is available, set it to NULL */
1868 p[idx++] = SPOE_DATA_T_NULL;
1869 continue;
1870 }
1871
1872 /* Else, encode the arguement value */
1873 switch (smp->data.type) {
1874 case SMP_T_BOOL:
1875 flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
1876 p[idx++] = (SPOE_DATA_T_BOOL | flag);
1877 break;
1878 case SMP_T_SINT:
1879 p[idx++] = SPOE_DATA_T_INT64;
1880 if (idx + 8 > max_size)
1881 goto skip;
1882 idx += encode_spoe_varint(smp->data.u.sint, p+idx);
1883 break;
1884 case SMP_T_IPV4:
1885 p[idx++] = SPOE_DATA_T_IPV4;
1886 if (idx + 4 > max_size)
1887 goto skip;
1888 memcpy(p+idx, &smp->data.u.ipv4, 4);
1889 idx += 4;
1890 break;
1891 case SMP_T_IPV6:
1892 p[idx++] = SPOE_DATA_T_IPV6;
1893 if (idx + 16 > max_size)
1894 goto skip;
1895 memcpy(p+idx, &smp->data.u.ipv6, 16);
1896 idx += 16;
1897 break;
1898 case SMP_T_STR:
1899 p[idx++] = SPOE_DATA_T_STR;
1900 if (idx + smp->data.u.str.len > max_size)
1901 goto skip;
1902 idx += encode_spoe_string(smp->data.u.str.str,
1903 smp->data.u.str.len,
1904 p+idx);
1905 break;
1906 case SMP_T_BIN:
1907 p[idx++] = SPOE_DATA_T_BIN;
1908 if (idx + smp->data.u.str.len > max_size)
1909 goto skip;
1910 idx += encode_spoe_string(smp->data.u.str.str,
1911 smp->data.u.str.len,
1912 p+idx);
1913 break;
1914 case SMP_T_METH:
1915 if (smp->data.u.meth.meth == HTTP_METH_OTHER) {
1916 p[idx++] = SPOE_DATA_T_STR;
1917 if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size)
1918 goto skip;
1919 idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name,
1920 http_known_methods[smp->data.u.meth.meth].len,
1921 p+idx);
1922 }
1923 else {
1924 p[idx++] = SPOE_DATA_T_STR;
1925 if (idx + smp->data.u.str.len > max_size)
1926 goto skip;
1927 idx += encode_spoe_string(smp->data.u.meth.str.str,
1928 smp->data.u.meth.str.len,
1929 p+idx);
1930 }
1931 break;
1932 default:
1933 p[idx++] = SPOE_DATA_T_NULL;
1934 }
1935 }
1936 }
1937 ctx->buffer->i = idx;
1938 return 1;
1939
1940 skip:
1941 b_reset(ctx->buffer);
1942 return 0;
1943}
1944
1945/* Helper function to set a variable */
1946static void
1947set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1948 struct sample *smp)
1949{
1950 struct spoe_config *conf = FLT_CONF(ctx->filter);
1951 struct spoe_agent *agent = conf->agent;
1952 char varname[64];
1953
1954 memset(varname, 0, sizeof(varname));
1955 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1956 scope, agent->var_pfx, len, name);
1957 vars_set_by_name_ifexist(varname, len, smp);
1958}
1959
1960/* Helper function to unset a variable */
1961static void
1962unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1963 struct sample *smp)
1964{
1965 struct spoe_config *conf = FLT_CONF(ctx->filter);
1966 struct spoe_agent *agent = conf->agent;
1967 char varname[64];
1968
1969 memset(varname, 0, sizeof(varname));
1970 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1971 scope, agent->var_pfx, len, name);
1972 vars_unset_by_name_ifexist(varname, len, smp);
1973}
1974
1975
1976/* Process SPOE actions for a specific event. During the processing, it returns
1977 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1978 * is returned. */
1979static int
1980process_spoe_actions(struct stream *s, struct spoe_context *ctx,
1981 enum spoe_event ev, int dir)
1982{
1983 char *p;
1984 size_t size;
1985 int off, i, idx = 0;
1986
1987 p = ctx->buffer->p;
1988 size = ctx->buffer->i;
1989
1990 while (idx < size) {
1991 char *str;
1992 uint64_t sz;
1993 struct sample smp;
1994 enum spoe_action_type type;
1995
1996 off = idx;
1997 if (idx+2 > size)
1998 goto skip;
1999
2000 type = p[idx++];
2001 switch (type) {
2002 case SPOE_ACT_T_SET_VAR: {
2003 char *scope;
2004
2005 if (p[idx++] != 3)
2006 goto skip_action;
2007
2008 switch (p[idx++]) {
2009 case SPOE_SCOPE_PROC: scope = "proc"; break;
2010 case SPOE_SCOPE_SESS: scope = "sess"; break;
2011 case SPOE_SCOPE_TXN : scope = "txn"; break;
2012 case SPOE_SCOPE_REQ : scope = "req"; break;
2013 case SPOE_SCOPE_RES : scope = "res"; break;
2014 default: goto skip;
2015 }
2016
2017 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2018 if (str == NULL)
2019 goto skip;
2020 memset(&smp, 0, sizeof(smp));
2021 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2022 if (decode_spoe_data(p+idx, p+size, &smp) == -1)
2023 goto skip;
2024
2025 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2026 " - set-var '%s.%s.%.*s'\n",
2027 (int)now.tv_sec, (int)now.tv_usec,
2028 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2029 __FUNCTION__, s, scope,
2030 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2031 (int)sz, str);
2032
2033 set_spoe_var(ctx, scope, str, sz, &smp);
2034 break;
2035 }
2036
2037 case SPOE_ACT_T_UNSET_VAR: {
2038 char *scope;
2039
2040 if (p[idx++] != 2)
2041 goto skip_action;
2042
2043 switch (p[idx++]) {
2044 case SPOE_SCOPE_PROC: scope = "proc"; break;
2045 case SPOE_SCOPE_SESS: scope = "sess"; break;
2046 case SPOE_SCOPE_TXN : scope = "txn"; break;
2047 case SPOE_SCOPE_REQ : scope = "req"; break;
2048 case SPOE_SCOPE_RES : scope = "res"; break;
2049 default: goto skip;
2050 }
2051
2052 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2053 if (str == NULL)
2054 goto skip;
2055 memset(&smp, 0, sizeof(smp));
2056 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2057
2058 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2059 " - unset-var '%s.%s.%.*s'\n",
2060 (int)now.tv_sec, (int)now.tv_usec,
2061 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2062 __FUNCTION__, s, scope,
2063 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2064 (int)sz, str);
2065
2066 unset_spoe_var(ctx, scope, str, sz, &smp);
2067 break;
2068 }
2069
2070 default:
2071 skip_action:
2072 if ((i = skip_spoe_action(p+off, p+size)) == -1)
2073 goto skip;
2074 idx += i;
2075 }
2076 }
2077
2078 return 1;
2079 skip:
2080 return 0;
2081}
2082
2083
2084/* Process a SPOE event. First, this functions will process messages attached to
2085 * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
2086 * ACK frame to process corresponding actions. During all the processing, it
2087 * returns 0 and it returns 1 when the processing is finished. If an error
2088 * occurred, -1 is returned. */
2089static int
2090process_spoe_event(struct stream *s, struct spoe_context *ctx,
2091 enum spoe_event ev)
2092{
Christopher Fauletf7a30922016-11-10 15:04:51 +01002093 struct spoe_config *conf = FLT_CONF(ctx->filter);
2094 struct spoe_agent *agent = conf->agent;
2095 int dir, ret = 1;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002096
2097 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2098 " - ctx-state=%s - event=%s\n",
2099 (int)now.tv_sec, (int)now.tv_usec,
Christopher Fauletf7a30922016-11-10 15:04:51 +01002100 agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002101 spoe_event_str[ev]);
2102
2103 dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
2104
2105 if (LIST_ISEMPTY(&(ctx->messages[ev])))
2106 goto out;
2107
2108 if (ctx->state == SPOE_CTX_ST_ERROR)
2109 goto error;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002110
2111 if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
2112 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2113 " - failed to process event '%s': timeout\n",
2114 (int)now.tv_sec, (int)now.tv_usec,
2115 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2116 send_log(ctx->strm->be, LOG_WARNING,
2117 "failed to process event '%s': timeout.\n",
2118 spoe_event_str[ev]);
2119 goto error;
2120 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002121
2122 if (ctx->state == SPOE_CTX_ST_READY) {
Christopher Fauletf7a30922016-11-10 15:04:51 +01002123 if (!tick_isset(ctx->process_exp)) {
2124 ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
2125 s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
2126 ctx->process_exp);
2127 }
2128
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002129 ret = acquire_spoe_appctx(ctx, dir);
2130 if (ret <= 0) {
2131 if (!ret)
2132 goto out;
2133 goto error;
2134 }
2135 ctx->state = SPOE_CTX_ST_SENDING_MSGS;
2136 }
2137
2138 if (ctx->appctx == NULL)
2139 goto error;
2140
2141 if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
2142 ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
2143 if (ret <= 0) {
2144 if (!ret)
2145 goto skip;
2146 goto error;
2147 }
2148 wakeup_spoe_appctx(ctx);
2149 ret = 0;
2150 goto out;
2151 }
2152
2153 if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
2154 wakeup_spoe_appctx(ctx);
2155 ret = 0;
2156 goto out;
2157 }
2158
2159 if (ctx->state == SPOE_CTX_ST_DONE) {
2160 ret = process_spoe_actions(s, ctx, ev, dir);
2161 if (ret <= 0) {
2162 if (!ret)
2163 goto skip;
2164 goto error;
2165 }
2166 ctx->frame_id++;
2167 release_spoe_appctx(ctx);
2168 ctx->state = SPOE_CTX_ST_READY;
2169 }
2170
2171 out:
2172 return ret;
2173
2174 skip:
2175 release_spoe_appctx(ctx);
2176 ctx->state = SPOE_CTX_ST_READY;
2177 return 1;
2178
2179 error:
2180 release_spoe_appctx(ctx);
2181 ctx->state = SPOE_CTX_ST_ERROR;
2182 return 1;
2183}
2184
2185
2186/***************************************************************************
2187 * Functions that create/destroy SPOE contexts
2188 **************************************************************************/
2189static struct spoe_context *
2190create_spoe_context(struct filter *filter)
2191{
2192 struct spoe_config *conf = FLT_CONF(filter);
2193 struct spoe_context *ctx;
2194
2195 ctx = pool_alloc_dirty(pool2_spoe_ctx);
2196 if (ctx == NULL) {
2197 return NULL;
2198 }
2199 memset(ctx, 0, sizeof(*ctx));
2200 ctx->filter = filter;
2201 ctx->state = SPOE_CTX_ST_NONE;
2202 ctx->flags = 0;
2203 ctx->errs = 0;
2204 ctx->messages = conf->agent->messages;
2205 ctx->buffer = &buf_empty;
2206 LIST_INIT(&ctx->buffer_wait);
2207 LIST_INIT(&ctx->applet_wait);
2208
Christopher Fauletf7a30922016-11-10 15:04:51 +01002209 ctx->stream_id = 0;
2210 ctx->frame_id = 1;
2211 ctx->process_exp = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002212
2213 return ctx;
2214}
2215
2216static void
2217destroy_spoe_context(struct spoe_context *ctx)
2218{
2219 if (!ctx)
2220 return;
2221
2222 if (ctx->appctx)
2223 APPCTX_SPOE(ctx->appctx).ctx = NULL;
2224 if (!LIST_ISEMPTY(&ctx->buffer_wait))
2225 LIST_DEL(&ctx->buffer_wait);
2226 if (!LIST_ISEMPTY(&ctx->applet_wait))
2227 LIST_DEL(&ctx->applet_wait);
2228 pool_free2(pool2_spoe_ctx, ctx);
2229}
2230
2231static void
2232reset_spoe_context(struct spoe_context *ctx)
2233{
2234 ctx->state = SPOE_CTX_ST_READY;
2235 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2236}
2237
2238
2239/***************************************************************************
2240 * Hooks that manage the filter lifecycle (init/check/deinit)
2241 **************************************************************************/
2242/* Signal handler: Do a soft stop, wakeup SPOE applet */
2243static void
2244sig_stop_spoe(struct sig_handler *sh)
2245{
2246 struct proxy *p;
2247
2248 p = proxy;
2249 while (p) {
2250 struct flt_conf *fconf;
2251
2252 list_for_each_entry(fconf, &p->filter_configs, list) {
2253 struct spoe_config *conf = fconf->conf;
2254 struct spoe_agent *agent = conf->agent;
2255 struct appctx *appctx;
2256
2257 list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) {
2258 si_applet_want_get(appctx->owner);
2259 si_applet_want_put(appctx->owner);
2260 appctx_wakeup(appctx);
2261 }
2262 }
2263 p = p->next;
2264 }
2265}
2266
2267
2268/* Initialize the SPOE filter. Returns -1 on error, else 0. */
2269static int
2270spoe_init(struct proxy *px, struct flt_conf *fconf)
2271{
2272 struct spoe_config *conf = fconf->conf;
2273 struct listener *l;
2274
2275 memset(&conf->agent_fe, 0, sizeof(conf->agent_fe));
2276 init_new_proxy(&conf->agent_fe);
2277 conf->agent_fe.parent = conf->agent;
2278 conf->agent_fe.last_change = now.tv_sec;
2279 conf->agent_fe.id = conf->agent->id;
2280 conf->agent_fe.cap = PR_CAP_FE;
2281 conf->agent_fe.mode = PR_MODE_TCP;
2282 conf->agent_fe.maxconn = 0;
2283 conf->agent_fe.options2 |= PR_O2_INDEPSTR;
2284 conf->agent_fe.conn_retries = CONN_RETRIES;
2285 conf->agent_fe.accept = frontend_accept;
2286 conf->agent_fe.srv = NULL;
2287 conf->agent_fe.timeout.client = TICK_ETERNITY;
2288 conf->agent_fe.default_target = &spoe_applet.obj_type;
2289 conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
2290
2291 if ((l = calloc(1, sizeof(*l))) == NULL) {
2292 Alert("spoe_init : out of memory.\n");
2293 goto out_error;
2294 }
2295 l->obj_type = OBJ_TYPE_LISTENER;
2296 l->obj_type = OBJ_TYPE_LISTENER;
2297 l->frontend = &conf->agent_fe;
2298 l->state = LI_READY;
2299 l->analysers = conf->agent_fe.fe_req_ana;
2300 LIST_ADDQ(&conf->agent_fe.conf.listeners, &l->by_fe);
2301
2302 if (!sighandler_registered) {
2303 signal_register_fct(0, sig_stop_spoe, 0);
2304 sighandler_registered = 1;
2305 }
2306
2307 return 0;
2308
2309 out_error:
2310 return -1;
2311}
2312
2313/* Free ressources allocated by the SPOE filter. */
2314static void
2315spoe_deinit(struct proxy *px, struct flt_conf *fconf)
2316{
2317 struct spoe_config *conf = fconf->conf;
2318
2319 if (conf) {
2320 struct spoe_agent *agent = conf->agent;
2321 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
2322 struct listener *, by_fe);
2323
2324 free(l);
2325 release_spoe_agent(agent);
2326 free(conf);
2327 }
2328 fconf->conf = NULL;
2329}
2330
2331/* Check configuration of a SPOE filter for a specified proxy.
2332 * Return 1 on error, else 0. */
2333static int
2334spoe_check(struct proxy *px, struct flt_conf *fconf)
2335{
2336 struct spoe_config *conf = fconf->conf;
2337 struct proxy *target;
2338
2339 target = proxy_be_by_name(conf->agent->b.name);
2340 if (target == NULL) {
2341 Alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'"
2342 " declared at %s:%d.\n",
2343 px->id, conf->agent->b.name, conf->agent->id,
2344 conf->agent->conf.file, conf->agent->conf.line);
2345 return 1;
2346 }
2347 if (target->mode != PR_MODE_TCP) {
2348 Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
2349 " at %s:%d does not support HTTP mode.\n",
2350 px->id, target->id, conf->agent->id,
2351 conf->agent->conf.file, conf->agent->conf.line);
2352 return 1;
2353 }
2354
2355 free(conf->agent->b.name);
2356 conf->agent->b.name = NULL;
2357 conf->agent->b.be = target;
2358 return 0;
2359}
2360
2361/**************************************************************************
2362 * Hooks attached to a stream
2363 *************************************************************************/
2364/* Called when a filter instance is created and attach to a stream. It creates
2365 * the context that will be used to process this stream. */
2366static int
2367spoe_start(struct stream *s, struct filter *filter)
2368{
2369 struct spoe_context *ctx;
2370
2371 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2372 (int)now.tv_sec, (int)now.tv_usec,
2373 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2374 __FUNCTION__, s);
2375
2376 ctx = create_spoe_context(filter);
2377 if (ctx == NULL) {
2378 send_log(s->be, LOG_EMERG,
2379 "failed to create SPOE context for proxy %s\n",
2380 s->be->id);
2381 return 0;
2382 }
2383
2384 ctx->strm = s;
2385 ctx->state = SPOE_CTX_ST_READY;
2386 filter->ctx = ctx;
2387
2388 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_FE]))
2389 filter->pre_analyzers |= AN_REQ_INSPECT_FE;
2390
2391 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_BE]))
2392 filter->pre_analyzers |= AN_REQ_INSPECT_BE;
2393
2394 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_RSP]))
2395 filter->pre_analyzers |= AN_RES_INSPECT;
2396
2397 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_FE]))
2398 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE;
2399
2400 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_BE]))
2401 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE;
2402
2403 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_RSP]))
2404 filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE;
2405
2406 return 1;
2407}
2408
2409/* Called when a filter instance is detached from a stream. It release the
2410 * attached SPOE context. */
2411static void
2412spoe_stop(struct stream *s, struct filter *filter)
2413{
2414 struct spoe_context *ctx = filter->ctx;
2415
2416 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2417 (int)now.tv_sec, (int)now.tv_usec,
2418 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2419 __FUNCTION__, s);
2420
2421 if (ctx) {
2422 release_spoe_appctx(ctx);
2423 destroy_spoe_context(ctx);
2424 }
2425}
2426
Christopher Fauletf7a30922016-11-10 15:04:51 +01002427
2428/*
2429 * Called when the stream is woken up because of expired timer.
2430 */
2431static void
2432spoe_check_timeouts(struct stream *s, struct filter *filter)
2433{
2434 struct spoe_context *ctx = filter->ctx;
2435
2436 if (tick_is_expired(ctx->process_exp, now_ms))
2437 s->task->state |= TASK_WOKEN_MSG;
2438}
2439
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002440/* Called when we are ready to filter data on a channel */
2441static int
2442spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2443{
2444 struct spoe_context *ctx = filter->ctx;
2445 int ret = 1;
2446
2447 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2448 " - ctx-flags=0x%08x\n",
2449 (int)now.tv_sec, (int)now.tv_usec,
2450 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2451 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2452
2453 if (!(chn->flags & CF_ISRESP)) {
2454 if (filter->pre_analyzers & AN_REQ_INSPECT_FE)
2455 chn->analysers |= AN_REQ_INSPECT_FE;
2456 if (filter->pre_analyzers & AN_REQ_INSPECT_BE)
2457 chn->analysers |= AN_REQ_INSPECT_BE;
2458
2459 if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED)
2460 goto out;
2461
2462 ctx->stream_id = s->uniq_id;
2463 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2464 ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
2465 if (ret != 1)
2466 goto out;
2467 }
2468 ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
2469 }
2470 else {
2471 if (filter->pre_analyzers & SPOE_EV_ON_TCP_RSP)
2472 chn->analysers |= AN_RES_INSPECT;
2473
2474 if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
2475 goto out;
2476
2477 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2478 ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
2479 if (ret != 1)
2480 goto out;
2481 }
2482 ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
2483 }
2484
2485 out:
2486 if (!ret) {
2487 channel_dont_read(chn);
2488 channel_dont_close(chn);
2489 }
2490 return ret;
2491}
2492
2493/* Called before a processing happens on a given channel */
2494static int
2495spoe_chn_pre_analyze(struct stream *s, struct filter *filter,
2496 struct channel *chn, unsigned an_bit)
2497{
2498 struct spoe_context *ctx = filter->ctx;
2499 int ret = 1;
2500
2501 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2502 " - ctx-flags=0x%08x - ana=0x%08x\n",
2503 (int)now.tv_sec, (int)now.tv_usec,
2504 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2505 __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2506 ctx->flags, an_bit);
2507
2508 if (ctx->state == SPOE_CTX_ST_NONE || ctx->state == SPOE_CTX_ST_ERROR)
2509 goto out;
2510
2511 switch (an_bit) {
2512 case AN_REQ_INSPECT_FE:
2513 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
2514 break;
2515 case AN_REQ_INSPECT_BE:
2516 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
2517 break;
2518 case AN_RES_INSPECT:
2519 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP);
2520 break;
2521 case AN_REQ_HTTP_PROCESS_FE:
2522 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
2523 break;
2524 case AN_REQ_HTTP_PROCESS_BE:
2525 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
2526 break;
2527 case AN_RES_HTTP_PROCESS_FE:
2528 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
2529 break;
2530 }
2531
2532 out:
2533 if (!ret) {
2534 channel_dont_read(chn);
2535 channel_dont_close(chn);
2536 }
2537 return ret;
2538}
2539
2540/* Called when the filtering on the channel ends. */
2541static int
2542spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2543{
2544 struct spoe_context *ctx = filter->ctx;
2545
2546 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2547 " - ctx-flags=0x%08x\n",
2548 (int)now.tv_sec, (int)now.tv_usec,
2549 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2550 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2551
2552 if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
2553 reset_spoe_context(ctx);
2554 }
2555
2556 return 1;
2557}
2558
2559/********************************************************************
2560 * Functions that manage the filter initialization
2561 ********************************************************************/
2562struct flt_ops spoe_ops = {
2563 /* Manage SPOE filter, called for each filter declaration */
2564 .init = spoe_init,
2565 .deinit = spoe_deinit,
2566 .check = spoe_check,
2567
2568 /* Handle start/stop of SPOE */
Christopher Fauletf7a30922016-11-10 15:04:51 +01002569 .attach = spoe_start,
2570 .detach = spoe_stop,
2571 .check_timeouts = spoe_check_timeouts,
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002572
2573 /* Handle channels activity */
2574 .channel_start_analyze = spoe_start_analyze,
2575 .channel_pre_analyze = spoe_chn_pre_analyze,
2576 .channel_end_analyze = spoe_end_analyze,
2577};
2578
2579
2580static int
2581cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
2582{
2583 const char *err;
2584 int i, err_code = 0;
2585
2586 if ((cfg_scope == NULL && curengine != NULL) ||
2587 (cfg_scope != NULL && curengine == NULL) ||
2588 strcmp(curengine, cfg_scope))
2589 goto out;
2590
2591 if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */
2592 if (!*args[1]) {
2593 Alert("parsing [%s:%d] : missing name for spoe-agent section.\n",
2594 file, linenum);
2595 err_code |= ERR_ALERT | ERR_ABORT;
2596 goto out;
2597 }
2598 if (*args[2]) {
2599 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2600 file, linenum, args[2]);
2601 err_code |= ERR_ALERT | ERR_ABORT;
2602 goto out;
2603 }
2604
2605 err = invalid_char(args[1]);
2606 if (err) {
2607 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2608 file, linenum, *err, args[0], args[1]);
2609 err_code |= ERR_ALERT | ERR_ABORT;
2610 goto out;
2611 }
2612
2613 if (curagent != NULL) {
2614 Alert("parsing [%s:%d] : another spoe-agent section previously defined.\n",
2615 file, linenum);
2616 err_code |= ERR_ALERT | ERR_ABORT;
2617 goto out;
2618 }
2619 if ((curagent = calloc(1, sizeof(*curagent))) == NULL) {
2620 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2621 err_code |= ERR_ALERT | ERR_ABORT;
2622 goto out;
2623 }
2624
2625 curagent->id = strdup(args[1]);
2626 curagent->conf.file = strdup(file);
2627 curagent->conf.line = linenum;
2628 curagent->timeout.hello = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002629 curagent->timeout.idle = TICK_ETERNITY;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002630 curagent->timeout.processing = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002631 curagent->var_pfx = NULL;
2632 curagent->new_applets = 0;
2633
2634 for (i = 0; i < SPOE_EV_EVENTS; ++i)
2635 LIST_INIT(&curagent->messages[i]);
2636 LIST_INIT(&curagent->cache);
2637 LIST_INIT(&curagent->applet_wq);
2638 }
2639 else if (!strcmp(args[0], "use-backend")) {
2640 if (!*args[1]) {
2641 Alert("parsing [%s:%d] : '%s' expects a backend name.\n",
2642 file, linenum, args[0]);
2643 err_code |= ERR_ALERT | ERR_FATAL;
2644 goto out;
2645 }
2646 if (*args[2]) {
2647 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2648 file, linenum, args[2]);
2649 err_code |= ERR_ALERT | ERR_ABORT;
2650 goto out;
2651 }
2652 free(curagent->b.name);
2653 curagent->b.name = strdup(args[1]);
2654 }
2655 else if (!strcmp(args[0], "messages")) {
2656 int cur_arg = 1;
2657 while (*args[cur_arg]) {
2658 struct spoe_msg_placeholder *mp = NULL;
2659
2660 list_for_each_entry(mp, &curmps, list) {
2661 if (!strcmp(mp->id, args[cur_arg])) {
2662 Alert("parsing [%s:%d]: spoe-message message '%s' already declared.\n",
2663 file, linenum, args[cur_arg]);
2664 err_code |= ERR_ALERT | ERR_FATAL;
2665 goto out;
2666 }
2667 }
2668
2669 if ((mp = calloc(1, sizeof(*mp))) == NULL) {
2670 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2671 err_code |= ERR_ALERT | ERR_ABORT;
2672 goto out;
2673 }
2674 mp->id = strdup(args[cur_arg]);
2675 LIST_ADDQ(&curmps, &mp->list);
2676 cur_arg++;
2677 }
2678 }
2679 else if (!strcmp(args[0], "timeout")) {
2680 unsigned int *tv = NULL;
2681 const char *res;
2682 unsigned timeout;
2683
2684 if (!*args[1]) {
2685 Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n",
2686 file, linenum);
2687 err_code |= ERR_ALERT | ERR_FATAL;
2688 goto out;
2689 }
2690 if (!strcmp(args[1], "hello"))
2691 tv = &curagent->timeout.hello;
2692 else if (!strcmp(args[1], "idle"))
2693 tv = &curagent->timeout.idle;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002694 else if (!strcmp(args[1], "processing"))
2695 tv = &curagent->timeout.processing;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002696 else {
Christopher Faulet03a34492016-11-19 16:47:56 +01002697 Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' or 'processing' (got %s).\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002698 file, linenum, args[1]);
2699 err_code |= ERR_ALERT | ERR_FATAL;
2700 goto out;
2701 }
2702 if (!*args[2]) {
2703 Alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n",
2704 file, linenum, args[1]);
2705 err_code |= ERR_ALERT | ERR_FATAL;
2706 goto out;
2707 }
2708 res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
2709 if (res) {
2710 Alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n",
2711 file, linenum, *res, args[1]);
2712 err_code |= ERR_ALERT | ERR_ABORT;
2713 goto out;
2714 }
2715 if (*args[3]) {
2716 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2717 file, linenum, args[3]);
2718 err_code |= ERR_ALERT | ERR_ABORT;
2719 goto out;
2720 }
2721 *tv = MS_TO_TICKS(timeout);
2722 }
2723 else if (!strcmp(args[0], "option")) {
2724 if (!*args[1]) {
2725 Alert("parsing [%s:%d]: '%s' expects an option name.\n",
2726 file, linenum, args[0]);
2727 err_code |= ERR_ALERT | ERR_FATAL;
2728 goto out;
2729 }
2730 if (!strcmp(args[1], "var-prefix")) {
2731 char *tmp;
2732
2733 if (!*args[2]) {
2734 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2735 file, linenum, args[0],
2736 args[1]);
2737 err_code |= ERR_ALERT | ERR_FATAL;
2738 goto out;
2739 }
2740 tmp = args[2];
2741 while (*tmp) {
2742 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2743 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
2744 file, linenum, args[0], args[1]);
2745 err_code |= ERR_ALERT | ERR_FATAL;
2746 goto out;
2747 }
2748 tmp++;
2749 }
2750 curagent->var_pfx = strdup(args[2]);
2751 }
2752 else {
2753 Alert("parsing [%s:%d]: option '%s' is not supported.\n",
2754 file, linenum, args[1]);
2755 err_code |= ERR_ALERT | ERR_FATAL;
2756 goto out;
2757 }
2758 }
2759 else if (*args[0]) {
2760 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
2761 file, linenum, args[0]);
2762 err_code |= ERR_ALERT | ERR_FATAL;
2763 goto out;
2764 }
2765 out:
2766 return err_code;
2767}
2768
2769static int
2770cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm)
2771{
2772 struct spoe_message *msg;
2773 struct spoe_arg *arg;
2774 const char *err;
2775 char *errmsg = NULL;
2776 int err_code = 0;
2777
2778 if ((cfg_scope == NULL && curengine != NULL) ||
2779 (cfg_scope != NULL && curengine == NULL) ||
2780 strcmp(curengine, cfg_scope))
2781 goto out;
2782
2783 if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */
2784 if (!*args[1]) {
2785 Alert("parsing [%s:%d] : missing name for spoe-message section.\n",
2786 file, linenum);
2787 err_code |= ERR_ALERT | ERR_ABORT;
2788 goto out;
2789 }
2790 if (*args[2]) {
2791 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2792 file, linenum, args[2]);
2793 err_code |= ERR_ALERT | ERR_ABORT;
2794 goto out;
2795 }
2796
2797 err = invalid_char(args[1]);
2798 if (err) {
2799 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2800 file, linenum, *err, args[0], args[1]);
2801 err_code |= ERR_ALERT | ERR_ABORT;
2802 goto out;
2803 }
2804
2805 list_for_each_entry(msg, &curmsgs, list) {
2806 if (!strcmp(msg->id, args[1])) {
2807 Alert("parsing [%s:%d]: spoe-message section '%s' has the same"
2808 " name as another one declared at %s:%d.\n",
2809 file, linenum, args[1], msg->conf.file, msg->conf.line);
2810 err_code |= ERR_ALERT | ERR_FATAL;
2811 goto out;
2812 }
2813 }
2814
2815 if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) {
2816 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2817 err_code |= ERR_ALERT | ERR_ABORT;
2818 goto out;
2819 }
2820
2821 curmsg->id = strdup(args[1]);
2822 curmsg->id_len = strlen(curmsg->id);
2823 curmsg->event = SPOE_EV_NONE;
2824 curmsg->conf.file = strdup(file);
2825 curmsg->conf.line = linenum;
2826 LIST_INIT(&curmsg->args);
2827 LIST_ADDQ(&curmsgs, &curmsg->list);
2828 }
2829 else if (!strcmp(args[0], "args")) {
2830 int cur_arg = 1;
2831
2832 curproxy->conf.args.ctx = ARGC_SPOE;
2833 curproxy->conf.args.file = file;
2834 curproxy->conf.args.line = linenum;
2835 while (*args[cur_arg]) {
2836 char *delim = strchr(args[cur_arg], '=');
2837 int idx = 0;
2838
2839 if ((arg = calloc(1, sizeof(*arg))) == NULL) {
2840 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2841 err_code |= ERR_ALERT | ERR_ABORT;
2842 goto out;
2843 }
2844
2845 if (!delim) {
2846 arg->name = NULL;
2847 arg->name_len = 0;
2848 delim = args[cur_arg];
2849 }
2850 else {
2851 arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]);
2852 arg->name_len = delim - args[cur_arg];
2853 delim++;
2854 }
2855
2856 arg->expr = sample_parse_expr(&delim, &idx, file, linenum, &errmsg, &curproxy->conf.args);
2857 if (arg->expr == NULL) {
2858 Alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg);
2859 err_code |= ERR_ALERT | ERR_FATAL;
2860 free(arg->name);
2861 free(arg);
2862 goto out;
2863 }
2864 LIST_ADDQ(&curmsg->args, &arg->list);
2865 cur_arg++;
2866 }
2867 curproxy->conf.args.file = NULL;
2868 curproxy->conf.args.line = 0;
2869 }
2870 else if (!strcmp(args[0], "event")) {
2871 if (!*args[1]) {
2872 Alert("parsing [%s:%d] : missing event name.\n", file, linenum);
2873 err_code |= ERR_ALERT | ERR_ABORT;
2874 goto out;
2875 }
2876 if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS]))
2877 curmsg->event = SPOE_EV_ON_CLIENT_SESS;
2878 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS]))
2879 curmsg->event = SPOE_EV_ON_SERVER_SESS;
2880
2881 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE]))
2882 curmsg->event = SPOE_EV_ON_TCP_REQ_FE;
2883 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE]))
2884 curmsg->event = SPOE_EV_ON_TCP_REQ_BE;
2885 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP]))
2886 curmsg->event = SPOE_EV_ON_TCP_RSP;
2887
2888 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE]))
2889 curmsg->event = SPOE_EV_ON_HTTP_REQ_FE;
2890 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE]))
2891 curmsg->event = SPOE_EV_ON_HTTP_REQ_BE;
2892 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP]))
2893 curmsg->event = SPOE_EV_ON_HTTP_RSP;
2894 else {
2895 Alert("parsing [%s:%d] : unkown event '%s'.\n",
2896 file, linenum, args[1]);
2897 err_code |= ERR_ALERT | ERR_ABORT;
2898 goto out;
2899 }
2900 }
2901 else if (!*args[0]) {
2902 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n",
2903 file, linenum, args[0]);
2904 err_code |= ERR_ALERT | ERR_FATAL;
2905 goto out;
2906 }
2907 out:
2908 free(errmsg);
2909 return err_code;
2910}
2911
2912/* Return -1 on error, else 0 */
2913static int
2914parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
2915 struct flt_conf *fconf, char **err, void *private)
2916{
2917 struct list backup_sections;
2918 struct spoe_config *conf;
2919 struct spoe_message *msg, *msgback;
2920 struct spoe_msg_placeholder *mp, *mpback;
2921 char *file = NULL, *engine = NULL;
2922 int ret, pos = *cur_arg + 1;
2923
2924 conf = calloc(1, sizeof(*conf));
2925 if (conf == NULL) {
2926 memprintf(err, "%s: out of memory", args[*cur_arg]);
2927 goto error;
2928 }
2929 conf->proxy = px;
2930
2931 while (*args[pos]) {
2932 if (!strcmp(args[pos], "config")) {
2933 if (!*args[pos+1]) {
2934 memprintf(err, "'%s' : '%s' option without value",
2935 args[*cur_arg], args[pos]);
2936 goto error;
2937 }
2938 file = args[pos+1];
2939 pos += 2;
2940 }
2941 else if (!strcmp(args[pos], "engine")) {
2942 if (!*args[pos+1]) {
2943 memprintf(err, "'%s' : '%s' option without value",
2944 args[*cur_arg], args[pos]);
2945 goto error;
2946 }
2947 engine = args[pos+1];
2948 pos += 2;
2949 }
2950 else {
2951 memprintf(err, "unknown keyword '%s'", args[pos]);
2952 goto error;
2953 }
2954 }
2955 if (file == NULL) {
2956 memprintf(err, "'%s' : missing config file", args[*cur_arg]);
2957 goto error;
2958 }
2959
2960 /* backup sections and register SPOE sections */
2961 LIST_INIT(&backup_sections);
2962 cfg_backup_sections(&backup_sections);
2963 cfg_register_section("spoe-agent", cfg_parse_spoe_agent);
2964 cfg_register_section("spoe-message", cfg_parse_spoe_message);
2965
2966 /* Parse SPOE filter configuration file */
2967 curengine = engine;
2968 curproxy = px;
2969 curagent = NULL;
2970 curmsg = NULL;
2971 ret = readcfgfile(file);
2972 curproxy = NULL;
2973
2974 /* unregister SPOE sections and restore previous sections */
2975 cfg_unregister_sections();
2976 cfg_restore_sections(&backup_sections);
2977
2978 if (ret == -1) {
2979 memprintf(err, "Could not open configuration file %s : %s",
2980 file, strerror(errno));
2981 goto error;
2982 }
2983 if (ret & (ERR_ABORT|ERR_FATAL)) {
2984 memprintf(err, "Error(s) found in configuration file %s", file);
2985 goto error;
2986 }
2987
2988 /* Check SPOE agent */
2989 if (curagent == NULL) {
2990 memprintf(err, "No SPOE agent found in file %s", file);
2991 goto error;
2992 }
2993 if (curagent->b.name == NULL) {
2994 memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d",
2995 curagent->id, curagent->conf.file, curagent->conf.line);
2996 goto error;
2997 }
Christopher Fauletf7a30922016-11-10 15:04:51 +01002998 if (curagent->timeout.hello == TICK_ETERNITY ||
2999 curagent->timeout.idle == TICK_ETERNITY ||
Christopher Fauletf7a30922016-11-10 15:04:51 +01003000 curagent->timeout.processing == TICK_ETERNITY) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003001 Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
3002 " | While not properly invalid, you will certainly encounter various problems\n"
3003 " | with such a configuration. To fix this, please ensure that all following\n"
Christopher Faulet03a34492016-11-19 16:47:56 +01003004 " | timeouts are set to a non-zero value: 'hello', 'idle', 'processing'.\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003005 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3006 }
3007 if (curagent->var_pfx == NULL) {
3008 char *tmp = curagent->id;
3009
3010 while (*tmp) {
3011 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3012 memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. "
3013 "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n",
3014 curagent->id, curagent->id, curagent->conf.file, curagent->conf.line);
3015 goto error;
3016 }
3017 tmp++;
3018 }
3019 curagent->var_pfx = strdup(curagent->id);
3020 }
3021
3022 if (LIST_ISEMPTY(&curmps)) {
3023 Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",
3024 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3025 goto finish;
3026 }
3027
3028 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3029 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3030 if (!strcmp(msg->id, mp->id)) {
3031 if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) {
3032 if (msg->event == SPOE_EV_ON_TCP_REQ_BE)
3033 msg->event = SPOE_EV_ON_TCP_REQ_FE;
3034 if (msg->event == SPOE_EV_ON_HTTP_REQ_BE)
3035 msg->event = SPOE_EV_ON_HTTP_REQ_FE;
3036 }
3037 if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS ||
3038 msg->event == SPOE_EV_ON_TCP_REQ_FE ||
3039 msg->event == SPOE_EV_ON_HTTP_REQ_FE)) {
3040 Warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n",
3041 px->id, msg->conf.file, msg->conf.line);
3042 goto next;
3043 }
3044 if (msg->event == SPOE_EV_NONE) {
3045 Warning("Proxy '%s': Ignore SPOE message without event at %s:%d.\n",
3046 px->id, msg->conf.file, msg->conf.line);
3047 goto next;
3048 }
3049 msg->agent = curagent;
3050 LIST_DEL(&msg->list);
3051 LIST_ADDQ(&curagent->messages[msg->event], &msg->list);
3052 goto next;
3053 }
3054 }
3055 memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d",
3056 curagent->id, mp->id, curagent->conf.file, curagent->conf.line);
3057 goto error;
3058 next:
3059 continue;
3060 }
3061
3062 finish:
3063 conf->agent = curagent;
3064 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3065 LIST_DEL(&mp->list);
3066 release_spoe_msg_placeholder(mp);
3067 }
3068 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3069 Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n",
3070 px->id, msg->id, msg->conf.file, msg->conf.line);
3071 LIST_DEL(&msg->list);
3072 release_spoe_message(msg);
3073 }
3074
3075 *cur_arg = pos;
3076 fconf->ops = &spoe_ops;
3077 fconf->conf = conf;
3078 return 0;
3079
3080 error:
3081 release_spoe_agent(curagent);
3082 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3083 LIST_DEL(&mp->list);
3084 release_spoe_msg_placeholder(mp);
3085 }
3086 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3087 LIST_DEL(&msg->list);
3088 release_spoe_message(msg);
3089 }
3090 free(conf);
3091 return -1;
3092}
3093
3094
3095/* Declare the filter parser for "spoe" keyword */
3096static struct flt_kw_list flt_kws = { "SPOE", { }, {
3097 { "spoe", parse_spoe_flt, NULL },
3098 { NULL, NULL, NULL },
3099 }
3100};
3101
3102__attribute__((constructor))
3103static void __spoe_init(void)
3104{
3105 flt_register_keywords(&flt_kws);
3106
3107 LIST_INIT(&curmsgs);
3108 LIST_INIT(&curmps);
3109 pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
3110}
3111
3112__attribute__((destructor))
3113static void
3114__spoe_deinit(void)
3115{
3116 pool_destroy2(pool2_spoe_ctx);
3117}