blob: 0b722b6754634783355f7e31bc2462add12310b8 [file] [log] [blame]
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001/*
2 * Stream processing offload engine management.
3 *
4 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12#include <ctype.h>
13#include <errno.h>
14
15#include <common/buffer.h>
16#include <common/cfgparse.h>
17#include <common/compat.h>
18#include <common/config.h>
19#include <common/debug.h>
20#include <common/memory.h>
21#include <common/time.h>
22
23#include <types/arg.h>
24#include <types/filters.h>
25#include <types/global.h>
26#include <types/proxy.h>
27#include <types/sample.h>
28#include <types/stream.h>
29
30#include <proto/arg.h>
31#include <proto/backend.h>
32#include <proto/filters.h>
Christopher Faulet48026722016-11-16 15:01:12 +010033#include <proto/freq_ctr.h>
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020034#include <proto/frontend.h>
35#include <proto/log.h>
36#include <proto/proto_http.h>
37#include <proto/proxy.h>
38#include <proto/sample.h>
39#include <proto/session.h>
40#include <proto/signal.h>
41#include <proto/stream.h>
42#include <proto/stream_interface.h>
43#include <proto/task.h>
44#include <proto/vars.h>
45
46#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
47#define SPOE_PRINTF(x...) fprintf(x)
48#else
49#define SPOE_PRINTF(x...)
50#endif
51
52/* Helper to get ctx inside an appctx */
53#define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe)
54
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020055/* Minimal size for a frame */
56#define MIN_FRAME_SIZE 256
57
Christopher Fauletea62c2a2016-11-14 10:54:21 +010058/* Flags set on the SPOE agent */
59#define SPOE_FL_CONT_ON_ERR 0x00000001 /* Do not stop events processing when an error occurred */
60
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020061/* Flags set on the SPOE context */
62#define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* Set after that on-client-session event was processed */
63#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set after that on-server-session event was processed */
64#define SPOE_CTX_FL_REQ_PROCESS 0x00000004 /* Set when SPOE is processing the request */
65#define SPOE_CTX_FL_RSP_PROCESS 0x00000008 /* Set when SPOE is processing the response */
66
67#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)
68
69#define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, leave it to zero */
70#define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */
71
72/* All possible states for a SPOE context */
73enum spoe_ctx_state {
74 SPOE_CTX_ST_NONE = 0,
75 SPOE_CTX_ST_READY,
76 SPOE_CTX_ST_SENDING_MSGS,
77 SPOE_CTX_ST_WAITING_ACK,
78 SPOE_CTX_ST_DONE,
79 SPOE_CTX_ST_ERROR,
80};
81
82/* All possible states for a SPOE applet */
83enum spoe_appctx_state {
84 SPOE_APPCTX_ST_CONNECT = 0,
85 SPOE_APPCTX_ST_CONNECTING,
86 SPOE_APPCTX_ST_PROCESSING,
87 SPOE_APPCTX_ST_DISCONNECT,
88 SPOE_APPCTX_ST_DISCONNECTING,
89 SPOE_APPCTX_ST_EXIT,
90 SPOE_APPCTX_ST_END,
91};
92
93/* All supported SPOE actions */
94enum spoe_action_type {
95 SPOE_ACT_T_SET_VAR = 1,
96 SPOE_ACT_T_UNSET_VAR,
97 SPOE_ACT_TYPES,
98};
99
100/* All supported SPOE events */
101enum spoe_event {
102 SPOE_EV_NONE = 0,
103
104 /* Request events */
105 SPOE_EV_ON_CLIENT_SESS = 1,
106 SPOE_EV_ON_TCP_REQ_FE,
107 SPOE_EV_ON_TCP_REQ_BE,
108 SPOE_EV_ON_HTTP_REQ_FE,
109 SPOE_EV_ON_HTTP_REQ_BE,
110
111 /* Response events */
112 SPOE_EV_ON_SERVER_SESS,
113 SPOE_EV_ON_TCP_RSP,
114 SPOE_EV_ON_HTTP_RSP,
115
116 SPOE_EV_EVENTS
117};
118
119/* Errors triggerd by SPOE applet */
120enum spoe_frame_error {
121 SPOE_FRM_ERR_NONE = 0,
122 SPOE_FRM_ERR_IO,
123 SPOE_FRM_ERR_TOUT,
124 SPOE_FRM_ERR_TOO_BIG,
125 SPOE_FRM_ERR_INVALID,
126 SPOE_FRM_ERR_NO_VSN,
127 SPOE_FRM_ERR_NO_FRAME_SIZE,
128 SPOE_FRM_ERR_NO_CAP,
129 SPOE_FRM_ERR_BAD_VSN,
130 SPOE_FRM_ERR_BAD_FRAME_SIZE,
131 SPOE_FRM_ERR_UNKNOWN = 99,
132 SPOE_FRM_ERRS,
133};
134
135/* Scopes used for variables set by agents. It is a way to be agnotic to vars
136 * scope. */
137enum spoe_vars_scope {
138 SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC */
139 SPOE_SCOPE_SESS, /* <=> SCOPE_SESS */
140 SPOE_SCOPE_TXN, /* <=> SCOPE_TXN */
141 SPOE_SCOPE_REQ, /* <=> SCOPE_REQ */
142 SPOE_SCOPE_RES, /* <=> SCOPE_RES */
143};
144
145
146/* Describe an argument that will be linked to a message. It is a sample fetch,
147 * with an optional name. */
148struct spoe_arg {
149 char *name; /* Name of the argument, may be NULL */
150 unsigned int name_len; /* The name length, 0 if NULL */
151 struct sample_expr *expr; /* Sample expression */
152 struct list list; /* Used to chain SPOE args */
153};
154
155/* Used during the config parsing only because, when a SPOE agent section is
156 * parsed, messages can be undefined. */
157struct spoe_msg_placeholder {
158 char *id; /* SPOE message placeholder id */
159 struct list list; /* Use to chain SPOE message placeholders */
160};
161
162/* Describe a message that will be sent in a NOTIFY frame. A message has a name,
163 * an argument list (see above) and it is linked to a specific event. */
164struct spoe_message {
165 char *id; /* SPOE message id */
166 unsigned int id_len; /* The message id length */
167 struct spoe_agent *agent; /* SPOE agent owning this SPOE message */
168 struct {
169 char *file; /* file where the SPOE message appears */
170 int line; /* line where the SPOE message appears */
171 } conf; /* config information */
172 struct list args; /* Arguments added when the SPOE messages is sent */
173 struct list list; /* Used to chain SPOE messages */
174
175 enum spoe_event event; /* SPOE_EV_* */
176};
177
178/* Describe a SPOE agent. */
179struct spoe_agent {
180 char *id; /* SPOE agent id (name) */
181 struct {
182 char *file; /* file where the SPOE agent appears */
183 int line; /* line where the SPOE agent appears */
184 } conf; /* config information */
185 union {
186 struct proxy *be; /* Backend used by this agent */
187 char *name; /* Backend name used during conf parsing */
188 } b;
189 struct {
Christopher Fauletf7a30922016-11-10 15:04:51 +0100190 unsigned int hello; /* Max time to receive AGENT-HELLO frame (in SPOE applet) */
191 unsigned int idle; /* Max Idle timeout (in SPOE applet) */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100192 unsigned int processing; /* Max time to process an event (in the main stream) */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200193 } timeout;
194
195 char *var_pfx; /* Prefix used for vars set by the agent */
Christopher Faulet985532d2016-11-16 15:36:19 +0100196 char *var_on_error; /* Variable to set when an error occured, in the TXN scope */
Christopher Fauletea62c2a2016-11-14 10:54:21 +0100197 unsigned int flags; /* SPOE_FL_* */
Christopher Faulet48026722016-11-16 15:01:12 +0100198 unsigned int cps_max; /* Maximum number of connections per second */
199 unsigned int eps_max; /* Maximum number of errors per second */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200200
201 struct list cache; /* List used to cache SPOE streams. In
202 * fact, we cache the SPOE applect ctx */
203
204 struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
205 * for each supported events */
206
207 struct list applet_wq; /* List of streams waiting for a SPOE applet */
Christopher Faulet48026722016-11-16 15:01:12 +0100208 struct freq_ctr conn_per_sec; /* connections per second */
209 struct freq_ctr err_per_sec; /* connetion errors per second */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200210};
211
212/* SPOE filter configuration */
213struct spoe_config {
214 struct proxy *proxy; /* Proxy owning the filter */
215 struct spoe_agent *agent; /* Agent used by this filter */
216 struct proxy agent_fe; /* Agent frontend */
217};
218
219/* SPOE context attached to a stream. It is the main structure that handles the
220 * processing offload */
221struct spoe_context {
222 struct filter *filter; /* The SPOE filter */
223 struct stream *strm; /* The stream that should be offloaded */
224 struct appctx *appctx; /* The SPOE appctx */
225 struct list *messages; /* List of messages that will be sent during the stream processing */
226 struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */
227 struct list buffer_wait; /* position in the list of streams waiting for a buffer */
228 struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */
229
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200230 enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
231 unsigned int flags; /* SPOE_CTX_FL_* */
232
233 unsigned int stream_id; /* stream_id and frame_id are used */
234 unsigned int frame_id; /* to map NOTIFY and ACK frames */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100235 unsigned int process_exp; /* expiration date to process an event */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200236};
237
238/* Set if the handle on SIGUSR1 is registered */
239static int sighandler_registered = 0;
240
241/* proxy used during the parsing */
242struct proxy *curproxy = NULL;
243
244/* The name of the SPOE engine, used during the parsing */
245char *curengine = NULL;
246
247/* SPOE agent used during the parsing */
248struct spoe_agent *curagent = NULL;
249
250/* SPOE message used during the parsing */
251struct spoe_message *curmsg = NULL;
252
253/* list of SPOE messages and placeholders used during the parsing */
254struct list curmsgs;
255struct list curmps;
256
257/* Pool used to allocate new SPOE contexts */
258static struct pool_head *pool2_spoe_ctx = NULL;
259
260/* Temporary variables used to ease error processing */
261int spoe_status_code = SPOE_FRM_ERR_NONE;
262char spoe_reason[256];
263
264struct flt_ops spoe_ops;
265
266static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx);
267static void on_new_spoe_appctx_failure(struct spoe_agent *agent);
268static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx);
269
270/********************************************************************
271 * helper functions/globals
272 ********************************************************************/
273static void
274release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp)
275{
276 if (!mp)
277 return;
278 free(mp->id);
279 free(mp);
280}
281
282
283static void
284release_spoe_message(struct spoe_message *msg)
285{
286 struct spoe_arg *arg, *back;
287
288 if (!msg)
289 return;
290 free(msg->id);
291 free(msg->conf.file);
292 list_for_each_entry_safe(arg, back, &msg->args, list) {
293 release_sample_expr(arg->expr);
294 free(arg->name);
295 LIST_DEL(&arg->list);
296 free(arg);
297 }
298 free(msg);
299}
300
301static void
302release_spoe_agent(struct spoe_agent *agent)
303{
304 struct spoe_message *msg, *back;
305 int i;
306
307 if (!agent)
308 return;
309 free(agent->id);
310 free(agent->conf.file);
311 free(agent->var_pfx);
Christopher Faulet985532d2016-11-16 15:36:19 +0100312 free(agent->var_on_error);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200313 for (i = 0; i < SPOE_EV_EVENTS; ++i) {
314 list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
315 LIST_DEL(&msg->list);
316 release_spoe_message(msg);
317 }
318 }
319 free(agent);
320}
321
322static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
323 [SPOE_FRM_ERR_NONE] = "normal",
324 [SPOE_FRM_ERR_IO] = "I/O error",
325 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
326 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
327 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
328 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
329 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
330 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
331 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
332 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
333 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
334};
335
336static const char *spoe_event_str[SPOE_EV_EVENTS] = {
337 [SPOE_EV_ON_CLIENT_SESS] = "on-client-session",
338 [SPOE_EV_ON_TCP_REQ_FE] = "on-frontend-tcp-request",
339 [SPOE_EV_ON_TCP_REQ_BE] = "on-backend-tcp-request",
340 [SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request",
341 [SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request",
342
343 [SPOE_EV_ON_SERVER_SESS] = "on-server-session",
344 [SPOE_EV_ON_TCP_RSP] = "on-tcp-response",
345 [SPOE_EV_ON_HTTP_RSP] = "on-http-response",
346};
347
348
349#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
350
351static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
352 [SPOE_CTX_ST_NONE] = "NONE",
353 [SPOE_CTX_ST_READY] = "READY",
354 [SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
355 [SPOE_CTX_ST_WAITING_ACK] = "WAITING_ACK",
356 [SPOE_CTX_ST_DONE] = "DONE",
357 [SPOE_CTX_ST_ERROR] = "ERROR",
358};
359
360static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
361 [SPOE_APPCTX_ST_CONNECT] = "CONNECT",
362 [SPOE_APPCTX_ST_CONNECTING] = "CONNECTING",
363 [SPOE_APPCTX_ST_PROCESSING] = "PROCESSING",
364 [SPOE_APPCTX_ST_DISCONNECT] = "DISCONNECT",
365 [SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
366 [SPOE_APPCTX_ST_EXIT] = "EXIT",
367 [SPOE_APPCTX_ST_END] = "END",
368};
369
370#endif
371/********************************************************************
372 * Functions that encode/decode SPOE frames
373 ********************************************************************/
374/* Frame Types sent by HAProxy and by agents */
375enum spoe_frame_type {
376 /* Frames sent by HAProxy */
377 SPOE_FRM_T_HAPROXY_HELLO = 1,
378 SPOE_FRM_T_HAPROXY_DISCON,
379 SPOE_FRM_T_HAPROXY_NOTIFY,
380
381 /* Frames sent by the agents */
382 SPOE_FRM_T_AGENT_HELLO = 101,
383 SPOE_FRM_T_AGENT_DISCON,
384 SPOE_FRM_T_AGENT_ACK
385};
386
387/* All supported data types */
388enum spoe_data_type {
389 SPOE_DATA_T_NULL = 0,
390 SPOE_DATA_T_BOOL,
391 SPOE_DATA_T_INT32,
392 SPOE_DATA_T_UINT32,
393 SPOE_DATA_T_INT64,
394 SPOE_DATA_T_UINT64,
395 SPOE_DATA_T_IPV4,
396 SPOE_DATA_T_IPV6,
397 SPOE_DATA_T_STR,
398 SPOE_DATA_T_BIN,
399 SPOE_DATA_TYPES
400};
401
402/* Masks to get data type or flags value */
403#define SPOE_DATA_T_MASK 0x0F
404#define SPOE_DATA_FL_MASK 0xF0
405
406/* Flags to set Boolean values */
407#define SPOE_DATA_FL_FALSE 0x00
408#define SPOE_DATA_FL_TRUE 0x10
409
410/* Helper to get static string length, excluding the terminating null byte */
411#define SLEN(str) (sizeof(str)-1)
412
413/* Predefined key used in HELLO/DISCONNECT frames */
414#define SUPPORTED_VERSIONS_KEY "supported-versions"
415#define VERSION_KEY "version"
416#define MAX_FRAME_SIZE_KEY "max-frame-size"
417#define CAPABILITIES_KEY "capabilities"
Christopher Fauletba7bc162016-11-07 21:07:38 +0100418#define HEALTHCHECK_KEY "healthcheck"
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200419#define STATUS_CODE_KEY "status-code"
420#define MSG_KEY "message"
421
422struct spoe_version {
423 char *str;
424 int min;
425 int max;
426};
427
428/* All supported versions */
429static struct spoe_version supported_versions[] = {
430 {"1.0", 1000, 1000},
431 {NULL, 0, 0}
432};
433
434/* Comma-separated list of supported versions */
435#define SUPPORTED_VERSIONS_VAL "1.0"
436
437/* Comma-separated list of supported capabilities (none for now) */
438#define CAPABILITIES_VAL ""
439
440static int
441decode_spoe_version(const char *str, size_t len)
442{
443 char tmp[len+1], *start, *end;
444 double d;
445 int vsn = -1;
446
447 memset(tmp, 0, len+1);
448 memcpy(tmp, str, len);
449
450 start = tmp;
451 while (isspace(*start))
452 start++;
453
454 d = strtod(start, &end);
455 if (d == 0 || start == end)
456 goto out;
457
458 if (*end) {
459 while (isspace(*end))
460 end++;
461 if (*end)
462 goto out;
463 }
464 vsn = (int)(d * 1000);
465 out:
466 return vsn;
467}
468
469/* Encode a variable-length integer. This function never fails and returns the
470 * number of written bytes. */
471static int
472encode_spoe_varint(uint64_t i, char *buf)
473{
474 int idx;
475
476 if (i < 240) {
477 buf[0] = (unsigned char)i;
478 return 1;
479 }
480
481 buf[0] = (unsigned char)i | 240;
482 i = (i - 240) >> 4;
483 for (idx = 1; i >= 128; ++idx) {
484 buf[idx] = (unsigned char)i | 128;
485 i = (i - 128) >> 7;
486 }
487 buf[idx++] = (unsigned char)i;
488 return idx;
489}
490
491/* Decode a varable-length integer. If the decoding fails, -1 is returned. This
492 * happens when the buffer's end in reached. On success, the number of read
493 * bytes is returned. */
494static int
495decode_spoe_varint(const char *buf, const char *end, uint64_t *i)
496{
497 unsigned char *msg = (unsigned char *)buf;
498 int idx = 0;
499
500 if (msg > (unsigned char *)end)
501 return -1;
502
503 if (msg[0] < 240) {
504 *i = msg[0];
505 return 1;
506 }
507 *i = msg[0];
508 do {
509 ++idx;
510 if (msg+idx > (unsigned char *)end)
511 return -1;
512 *i += (uint64_t)msg[idx] << (4 + 7 * (idx-1));
513 } while (msg[idx] >= 128);
514 return (idx + 1);
515}
516
517/* Encode a string. The string will be prefix by its length, encoded as a
518 * variable-length integer. This function never fails and returns the number of
519 * written bytes. */
520static int
521encode_spoe_string(const char *str, size_t len, char *dst)
522{
523 int idx = 0;
524
525 if (!len) {
526 dst[0] = 0;
527 return 1;
528 }
529
530 idx += encode_spoe_varint(len, dst);
531 memcpy(dst+idx, str, len);
532 return (idx + len);
533}
534
535/* Decode a string. Its length is decoded first as a variable-length integer. If
536 * it succeeds, and if the string length is valid, the begin of the string is
537 * saved in <*str>, its length is saved in <*len> and the total numbre of bytes
538 * read is returned. If an error occurred, -1 is returned and <*str> remains
539 * NULL. */
540static int
541decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
542{
543 int i, idx = 0;
544
545 *str = NULL;
546 *len = 0;
547
548 if ((i = decode_spoe_varint(buf, end, len)) == -1)
549 goto error;
550 idx += i;
551 if (buf + idx + *len > end)
552 goto error;
553
554 *str = buf+idx;
555 return (idx + *len);
556
557 error:
558 return -1;
559}
560
561/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
562 * of bytes read is returned. A types data is composed of a type (1 byte) and
563 * corresponding data:
564 * - boolean: non additional data (0 bytes)
565 * - integers: a variable-length integer (see decode_spoe_varint)
566 * - ipv4: 4 bytes
567 * - ipv6: 16 bytes
568 * - binary and string: a buffer prefixed by its size, a variable-length
569 * integer (see decode_spoe_string) */
570static int
571skip_spoe_data(char *frame, char *end)
572{
573 uint64_t sz = 0;
574 int i, idx = 0;
575
576 if (frame > end)
577 return -1;
578
579 switch (frame[idx++] & SPOE_DATA_T_MASK) {
580 case SPOE_DATA_T_BOOL:
581 break;
582 case SPOE_DATA_T_INT32:
583 case SPOE_DATA_T_INT64:
584 case SPOE_DATA_T_UINT32:
585 case SPOE_DATA_T_UINT64:
586 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
587 return -1;
588 idx += i;
589 break;
590 case SPOE_DATA_T_IPV4:
591 idx += 4;
592 break;
593 case SPOE_DATA_T_IPV6:
594 idx += 16;
595 break;
596 case SPOE_DATA_T_STR:
597 case SPOE_DATA_T_BIN:
598 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
599 return -1;
600 idx += i + sz;
601 break;
602 }
603
604 if (frame+idx > end)
605 return -1;
606 return idx;
607}
608
609/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
610 * number of read bytes is returned. See skip_spoe_data for details. */
611static int
612decode_spoe_data(char *frame, char *end, struct sample *smp)
613{
614 uint64_t sz = 0;
615 int type, i, idx = 0;
616
617 if (frame > end)
618 return -1;
619
620 type = frame[idx++];
621 switch (type & SPOE_DATA_T_MASK) {
622 case SPOE_DATA_T_BOOL:
623 smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
624 smp->data.type = SMP_T_BOOL;
625 break;
626 case SPOE_DATA_T_INT32:
627 case SPOE_DATA_T_INT64:
628 case SPOE_DATA_T_UINT32:
629 case SPOE_DATA_T_UINT64:
630 if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1)
631 return -1;
632 idx += i;
633 smp->data.type = SMP_T_SINT;
634 break;
635 case SPOE_DATA_T_IPV4:
636 if (frame+idx+4 > end)
637 return -1;
638 memcpy(&smp->data.u.ipv4, frame+idx, 4);
639 smp->data.type = SMP_T_IPV4;
640 idx += 4;
641 break;
642 case SPOE_DATA_T_IPV6:
643 if (frame+idx+16 > end)
644 return -1;
645 memcpy(&smp->data.u.ipv6, frame+idx, 16);
646 smp->data.type = SMP_T_IPV6;
647 idx += 16;
648 break;
649 case SPOE_DATA_T_STR:
650 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
651 return -1;
652 idx += i;
653 if (frame+idx+sz > end)
654 return -1;
655 smp->data.u.str.str = frame+idx;
656 smp->data.u.str.len = sz;
657 smp->data.type = SMP_T_STR;
658 idx += sz;
659 break;
660 case SPOE_DATA_T_BIN:
661 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
662 return -1;
663 idx += i;
664 if (frame+idx+sz > end)
665 return -1;
666 smp->data.u.str.str = frame+idx;
667 smp->data.u.str.len = sz;
668 smp->data.type = SMP_T_BIN;
669 idx += sz;
670 break;
671 }
672
673 if (frame+idx > end)
674 return -1;
675 return idx;
676}
677
678/* Skip an action in a frame received from an agent. If an error occurred, -1 is
679 * returned, otherwise the number of read bytes is returned. An action is
680 * composed of the action type followed by a typed data. */
681static int
682skip_spoe_action(char *frame, char *end)
683{
684 int n, i, idx = 0;
685
686 if (frame+2 > end)
687 return -1;
688
689 idx++; /* Skip the action type */
690 n = frame[idx++];
691 while (n-- > 0) {
692 if ((i = skip_spoe_data(frame+idx, end)) == -1)
693 return -1;
694 idx += i;
695 }
696
697 if (frame+idx > end)
698 return -1;
699 return idx;
700}
701
702/* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on
703 * success, 0 if the frame can be ignored and -1 if an error occurred. */
704static int
705prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
706{
707 int idx = 0;
708 size_t max = (7 /* TYPE + METADATA */
709 + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
710 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
711 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL));
712
713 if (size < max)
714 return -1;
715
716 /* Frame type */
717 frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
718
719 /* No flags for now */
720 memset(frame+idx, 0, 4);
721 idx += 4;
722
723 /* No stream-id and frame-id for HELLO frames */
724 frame[idx++] = 0;
725 frame[idx++] = 0;
726
727 /* There are 3 mandatory items: "supported-versions", "max-frame-size"
728 * and "capabilities" */
729
730 /* "supported-versions" K/V item */
731 idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx);
732 frame[idx++] = SPOE_DATA_T_STR;
733 idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx);
734
735 /* "max-fram-size" K/V item */
736 idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
737 frame[idx++] = SPOE_DATA_T_UINT32;
738 idx += encode_spoe_varint(APPCTX_SPOE(appctx).max_frame_size, frame+idx);
739
740 /* "capabilities" K/V item */
741 idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
742 frame[idx++] = SPOE_DATA_T_STR;
743 idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
744
745 return idx;
746}
747
748/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame
749 * size on success, 0 if the frame can be ignored and -1 if an error
750 * occurred. */
751static int
752prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
753{
754 const char *reason;
755 int rlen, idx = 0;
756 size_t max = (7 /* TYPE + METADATA */
757 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
758 + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
759
760 if (size < max)
761 return -1;
762
763 /* Get the message corresponding to the status code */
764 if (spoe_status_code >= SPOE_FRM_ERRS)
765 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
766 reason = spoe_frm_err_reasons[spoe_status_code];
767 rlen = strlen(reason);
768
769 /* Frame type */
770 frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
771
772 /* No flags for now */
773 memset(frame+idx, 0, 4);
774 idx += 4;
775
776 /* No stream-id and frame-id for DISCONNECT frames */
777 frame[idx++] = 0;
778 frame[idx++] = 0;
779
780 /* There are 2 mandatory items: "status-code" and "message" */
781
782 /* "status-code" K/V item */
783 idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx);
784 frame[idx++] = SPOE_DATA_T_UINT32;
785 idx += encode_spoe_varint(spoe_status_code, frame+idx);
786
787 /* "message" K/V item */
788 idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx);
789 frame[idx++] = SPOE_DATA_T_STR;
790 idx += encode_spoe_string(reason, rlen, frame+idx);
791
792 return idx;
793}
794
795/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
796 * success, 0 if the frame can be ignored and -1 if an error occurred. */
797static int
798prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
799{
800 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
801 int idx = 0;
802
803 if (size < APPCTX_SPOE(appctx).max_frame_size)
804 return -1;
805
806 frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
807
808 /* No flags for now */
809 memset(frame+idx, 0, 4);
810 idx += 4;
811
812 /* Set stream-id and frame-id */
813 idx += encode_spoe_varint(ctx->stream_id, frame+idx);
814 idx += encode_spoe_varint(ctx->frame_id, frame+idx);
815
816 /* Copy encoded messages */
817 memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
818 idx += ctx->buffer->i;
819
820 return idx;
821}
822
823/* Decode HELLO frame sent by an agent. It returns the number of by read bytes
824 * on success, 0 if the frame can be ignored and -1 if an error occurred. */
825static int
826handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
827{
828 int vsn, max_frame_size;
829 int i, idx = 0;
830 size_t min_size = (7 /* TYPE + METADATA */
831 + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
832 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
833 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
834
835 /* Check frame type */
836 if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
837 return 0;
838
839 if (size < min_size) {
840 spoe_status_code = SPOE_FRM_ERR_INVALID;
841 return -1;
842 }
843
844 /* Skip flags: fragmentation is not supported for now */
845 idx += 4;
846
847 /* stream-id and frame-id must be cleared */
848 if (frame[idx] != 0 || frame[idx+1] != 0) {
849 spoe_status_code = SPOE_FRM_ERR_INVALID;
850 return -1;
851 }
852 idx += 2;
853
854 /* There are 3 mandatory items: "version", "max-frame-size" and
855 * "capabilities" */
856
857 /* Loop on K/V items */
858 vsn = max_frame_size = 0;
859 while (idx < size) {
860 char *str;
861 uint64_t sz;
862
863 /* Decode the item key */
864 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
865 if (str == NULL) {
866 spoe_status_code = SPOE_FRM_ERR_INVALID;
867 return -1;
868 }
869 /* Check "version" K/V item */
870 if (!memcmp(str, VERSION_KEY, sz)) {
871 /* The value must be a string */
872 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
873 spoe_status_code = SPOE_FRM_ERR_INVALID;
874 return -1;
875 }
876 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
877 if (str == NULL) {
878 spoe_status_code = SPOE_FRM_ERR_INVALID;
879 return -1;
880 }
881
882 vsn = decode_spoe_version(str, sz);
883 if (vsn == -1) {
884 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
885 return -1;
886 }
887 for (i = 0; supported_versions[i].str != NULL; ++i) {
888 if (vsn >= supported_versions[i].min &&
889 vsn <= supported_versions[i].max)
890 break;
891 }
892 if (supported_versions[i].str == NULL) {
893 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
894 return -1;
895 }
896 }
897 /* Check "max-frame-size" K/V item */
898 else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
899 int type;
900
901 /* The value must be integer */
902 type = frame[idx++];
903 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
904 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
905 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
906 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
907 spoe_status_code = SPOE_FRM_ERR_INVALID;
908 return -1;
909 }
910 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
911 spoe_status_code = SPOE_FRM_ERR_INVALID;
912 return -1;
913 }
914 idx += i;
915 if (sz < MIN_FRAME_SIZE || sz > APPCTX_SPOE(appctx).max_frame_size) {
916 spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
917 return -1;
918 }
919 max_frame_size = sz;
920 }
921 /* Skip "capabilities" K/V item for now */
922 else {
923 /* Silently ignore unknown item */
924 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
925 spoe_status_code = SPOE_FRM_ERR_INVALID;
926 return -1;
927 }
928 idx += i;
929 }
930 }
931
932 /* Final checks */
933 if (!vsn) {
934 spoe_status_code = SPOE_FRM_ERR_NO_VSN;
935 return -1;
936 }
937 if (!max_frame_size) {
938 spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
939 return -1;
940 }
941
942 APPCTX_SPOE(appctx).version = (unsigned int)vsn;
943 APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
944 return idx;
945}
946
947/* Decode DISCONNECT frame sent by an agent. It returns the number of by read
948 * bytes on success, 0 if the frame can be ignored and -1 if an error
949 * occurred. */
950static int
951handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
952{
953 int i, idx = 0;
954 size_t min_size = (7 /* TYPE + METADATA */
955 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
956 + 1 + SLEN(MSG_KEY) + 1 + 1);
957
958 /* Check frame type */
959 if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
960 return 0;
961
962 if (size < min_size) {
963 spoe_status_code = SPOE_FRM_ERR_INVALID;
964 return -1;
965 }
966
967 /* Skip flags: fragmentation is not supported for now */
968 idx += 4;
969
970 /* stream-id and frame-id must be cleared */
971 if (frame[idx] != 0 || frame[idx+1] != 0) {
972 spoe_status_code = SPOE_FRM_ERR_INVALID;
973 return -1;
974 }
975 idx += 2;
976
977 /* There are 2 mandatory items: "status-code" and "message" */
978
979 /* Loop on K/V items */
980 while (idx < size) {
981 char *str;
982 uint64_t sz;
983
984 /* Decode the item key */
985 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
986 if (str == NULL) {
987 spoe_status_code = SPOE_FRM_ERR_INVALID;
988 return -1;
989 }
990
991 /* Check "status-code" K/V item */
992 if (!memcmp(str, STATUS_CODE_KEY, sz)) {
993 int type;
994
995 /* The value must be an integer */
996 type = frame[idx++];
997 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
998 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
999 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
1000 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
1001 spoe_status_code = SPOE_FRM_ERR_INVALID;
1002 return -1;
1003 }
1004 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
1005 spoe_status_code = SPOE_FRM_ERR_INVALID;
1006 return -1;
1007 }
1008 idx += i;
1009 spoe_status_code = sz;
1010 }
1011
1012 /* Check "message" K/V item */
1013 else if (sz && !memcmp(str, MSG_KEY, sz)) {
1014 /* The value must be a string */
1015 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1016 spoe_status_code = SPOE_FRM_ERR_INVALID;
1017 return -1;
1018 }
1019 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1020 if (str == NULL || sz > 255) {
1021 spoe_status_code = SPOE_FRM_ERR_INVALID;
1022 return -1;
1023 }
1024 memcpy(spoe_reason, str, sz);
1025 spoe_reason[sz] = 0;
1026 }
1027 else {
1028 /* Silently ignore unknown item */
1029 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1030 spoe_status_code = SPOE_FRM_ERR_INVALID;
1031 return -1;
1032 }
1033 idx += i;
1034 }
1035 }
1036
1037 return idx;
1038}
1039
1040
1041/* Decode ACK frame sent by an agent. It returns the number of by read bytes on
1042 * success, 0 if the frame can be ignored and -1 if an error occurred. */
1043static int
1044handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
1045{
1046 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1047 uint64_t stream_id, frame_id;
1048 int idx = 0;
1049 size_t min_size = (7 /* TYPE + METADATA */);
1050
1051 /* Check frame type */
1052 if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
1053 return 0;
1054
1055 if (size < min_size) {
1056 spoe_status_code = SPOE_FRM_ERR_INVALID;
1057 return -1;
1058 }
1059
1060 /* Skip flags: fragmentation is not supported for now */
1061 idx += 4;
1062
1063 /* Get the stream-id and the frame-id */
1064 idx += decode_spoe_varint(frame+idx, frame+size, &stream_id);
1065 idx += decode_spoe_varint(frame+idx, frame+size, &frame_id);
1066
1067 /* Check stream-id and frame-id */
1068 if (ctx->stream_id != (unsigned int)stream_id ||
1069 ctx->frame_id != (unsigned int)frame_id)
1070 return 0;
1071
1072 /* Copy encoded actions */
1073 b_reset(ctx->buffer);
1074 memcpy(ctx->buffer->p, frame+idx, size-idx);
1075 ctx->buffer->i = size-idx;
1076
1077 return idx;
1078}
1079
Christopher Fauletba7bc162016-11-07 21:07:38 +01001080/* This function is used in cfgparse.c and declared in proto/checks.h. It
1081 * prepare the request to send to agents during a healthcheck. It returns 0 on
1082 * success and -1 if an error occurred. */
1083int
1084prepare_spoe_healthcheck_request(char **req, int *len)
1085{
1086 struct appctx a;
1087 char *frame, buf[global.tune.bufsize];
1088 unsigned int framesz;
1089 int idx;
1090
1091 memset(&a, 0, sizeof(a));
1092 memset(buf, 0, sizeof(buf));
1093 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1094
1095 frame = buf+4;
1096 idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4);
1097 if (idx <= 0)
1098 return -1;
1099 if (idx + SLEN(HEALTHCHECK_KEY) + 1 > global.tune.bufsize-4)
1100 return -1;
1101
1102 /* "healthcheck" K/V item */
1103 idx += encode_spoe_string(HEALTHCHECK_KEY, SLEN(HEALTHCHECK_KEY), frame+idx);
1104 frame[idx++] = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE);
1105
1106 framesz = htonl(idx);
1107 memcpy(buf, (char *)&framesz, 4);
1108
1109 if ((*req = malloc(idx+4)) == NULL)
1110 return -1;
1111 memcpy(*req, buf, idx+4);
1112 *len = idx+4;
1113 return 0;
1114}
1115
1116/* This function is used in checks.c and declared in proto/checks.h. It decode
1117 * the response received from an agent during a healthcheck. It returns 0 on
1118 * success and -1 if an error occurred. */
1119int
1120handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen)
1121{
1122 struct appctx a;
1123 int r;
1124
1125 memset(&a, 0, sizeof(a));
1126 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1127
1128 if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0)
1129 goto error;
1130 if ((r = handle_spoe_agenthello_frame(&a, frame, size)) <= 0) {
1131 if (r == 0)
1132 spoe_status_code = SPOE_FRM_ERR_INVALID;
1133 goto error;
1134 }
1135
1136 return 0;
1137
1138 error:
1139 if (spoe_status_code >= SPOE_FRM_ERRS)
1140 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
1141 strncpy(err, spoe_frm_err_reasons[spoe_status_code], errlen);
1142 return -1;
1143}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001144
1145/********************************************************************
1146 * Functions that manage the SPOE applet
1147 ********************************************************************/
1148/* Callback function that catches applet timeouts. If a timeout occurred, we set
1149 * <appctx->st1> flag and the SPOE applet is woken up. */
1150static struct task *
1151process_spoe_applet(struct task * task)
1152{
1153 struct appctx *appctx = task->context;
1154
1155 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1156 if (tick_is_expired(task->expire, now_ms)) {
1157 task->expire = TICK_ETERNITY;
1158 appctx->st1 = SPOE_APPCTX_ERR_TOUT;
1159 }
1160 si_applet_want_get(appctx->owner);
1161 appctx_wakeup(appctx);
1162 return task;
1163}
1164
1165/* Remove a SPOE applet from the agent cache */
1166static void
1167remove_spoe_applet_from_cache(struct appctx *appctx)
1168{
1169 struct appctx *a, *back;
1170 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1171
1172 if (LIST_ISEMPTY(&agent->cache))
1173 return;
1174
1175 list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) {
1176 if (a == appctx) {
1177 LIST_DEL(&APPCTX_SPOE(appctx).list);
1178 break;
1179 }
1180 }
1181}
1182
1183
1184/* Callback function that releases a SPOE applet. This happens when the
1185 * connection with the agent is closed. */
1186static void
1187release_spoe_applet(struct appctx *appctx)
1188{
1189 struct stream_interface *si = appctx->owner;
1190 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1191 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1192
1193 if (appctx->st0 == SPOE_APPCTX_ST_CONNECT ||
1194 appctx->st0 == SPOE_APPCTX_ST_CONNECTING)
1195 on_new_spoe_appctx_failure(agent);
1196
1197 if (appctx->st0 != SPOE_APPCTX_ST_END) {
1198 si_shutw(si);
1199 si_shutr(si);
1200 si_ic(si)->flags |= CF_READ_NULL;
1201 appctx->st0 = SPOE_APPCTX_ST_END;
1202 }
1203
1204 if (ctx != NULL) {
1205 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1206 ctx->appctx = NULL;
1207 }
1208
1209 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
1210 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1211 __FUNCTION__, appctx);
1212
1213 /* Release the task attached to the SPOE applet */
1214 if (APPCTX_SPOE(appctx).task) {
1215 task_delete(APPCTX_SPOE(appctx).task);
1216 task_free(APPCTX_SPOE(appctx).task);
1217 }
1218
1219 /* And remove it from the agent cache */
1220 remove_spoe_applet_from_cache(appctx);
1221 APPCTX_SPOE(appctx).ctx = NULL;
1222}
1223
1224/* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when
1225 * the frame can be ignored, 0 to retry later and 1 on success. The frame is
1226 * encoded using the callback function <prepare>. */
1227static int
1228send_spoe_frame(struct appctx *appctx,
1229 int (*prepare)(struct appctx *, char *, size_t))
1230{
1231 struct stream_interface *si = appctx->owner;
1232 int framesz, ret;
1233 uint32_t netint;
1234
1235 ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
1236 if (ret <= 0)
1237 goto skip_or_error;
1238 framesz = ret;
1239 netint = htonl(framesz);
1240 ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint));
1241 if (ret > 0)
1242 ret = bi_putblk(si_ic(si), trash.str, framesz);
1243 if (ret <= 0) {
1244 if (ret == -1)
1245 return -1;
1246 return -2;
1247 }
1248 return 1;
1249
1250 skip_or_error:
1251 if (!ret)
1252 return -1;
1253 return -2;
1254}
1255
1256/* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1
1257 * when the frame can be ignored, 0 to retry later and 1 on success. The frame
1258 * is decoded using the callback function <handle>. */
1259static int
1260recv_spoe_frame(struct appctx *appctx,
1261 int (*handle)(struct appctx *, char *, size_t))
1262{
1263 struct stream_interface *si = appctx->owner;
1264 int framesz, ret;
1265 uint32_t netint;
1266
1267 ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0);
1268 if (ret <= 0)
1269 goto empty_or_error;
1270 framesz = ntohl(netint);
1271 if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
1272 spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
1273 return -2;
1274 }
1275
1276 ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint));
1277 if (ret <= 0)
1278 goto empty_or_error;
1279 bo_skip(si_oc(si), ret+sizeof(netint));
1280
1281 /* First check if the received frame is a DISCONNECT frame */
1282 ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz);
1283 if (ret != 0) {
1284 if (ret > 0) {
1285 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1286 " - disconnected by peer (%d): %s\n",
1287 (int)now.tv_sec, (int)now.tv_usec,
1288 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1289 __FUNCTION__, appctx, spoe_status_code,
1290 spoe_reason);
1291 return 2;
1292 }
1293 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1294 " - error on frame (%s)\n",
1295 (int)now.tv_sec, (int)now.tv_usec,
1296 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1297 __FUNCTION__, appctx,
1298 spoe_frm_err_reasons[spoe_status_code]);
1299 return -2;
1300 }
1301 if (handle == NULL)
1302 goto out;
1303
1304 /* If not, try to decode it */
1305 ret = handle(appctx, trash.str, framesz);
1306 if (ret <= 0) {
1307 if (!ret)
1308 return -1;
1309 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1310 " - error on frame (%s)\n",
1311 (int)now.tv_sec, (int)now.tv_usec,
1312 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1313 __FUNCTION__, appctx,
1314 spoe_frm_err_reasons[spoe_status_code]);
1315 return -2;
1316 }
1317 out:
1318 return 1;
1319
1320 empty_or_error:
1321 if (!ret)
1322 return 0;
1323 spoe_status_code = SPOE_FRM_ERR_IO;
1324 return -2;
1325}
1326
1327/* I/O Handler processing messages exchanged with the agent */
1328static void
1329handle_spoe_applet(struct appctx *appctx)
1330{
1331 struct stream_interface *si = appctx->owner;
1332 struct stream *s = si_strm(si);
1333 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1334 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1335 int ret;
1336
1337 switchstate:
1338 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1339 " - appctx-state=%s\n",
1340 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1341 __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);
1342
1343 switch (appctx->st0) {
1344 case SPOE_APPCTX_ST_CONNECT:
1345 spoe_status_code = SPOE_FRM_ERR_NONE;
1346 if (si->state <= SI_ST_CON) {
1347 si_applet_want_put(si);
1348 task_wakeup(s->task, TASK_WOKEN_MSG);
1349 break;
1350 }
1351 else if (si->state != SI_ST_EST) {
1352 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1353 on_new_spoe_appctx_failure(agent);
1354 goto switchstate;
1355 }
1356 ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame);
1357 if (ret < 0) {
1358 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1359 on_new_spoe_appctx_failure(agent);
1360 goto switchstate;
1361 }
1362 else if (!ret)
1363 goto full;
1364
1365 /* Hello frame was sent. Set the hello timeout and
1366 * wait for the reply. */
1367 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
1368 appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
1369 /* fall through */
1370
1371 case SPOE_APPCTX_ST_CONNECTING:
1372 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1373 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1374 on_new_spoe_appctx_failure(agent);
1375 goto switchstate;
1376 }
1377 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1378 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1379 " - Connection timed out\n",
1380 (int)now.tv_sec, (int)now.tv_usec,
1381 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1382 __FUNCTION__, appctx);
1383 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1384 on_new_spoe_appctx_failure(agent);
1385 goto switchstate;
1386 }
1387 ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame);
1388 if (ret < 0) {
1389 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1390 on_new_spoe_appctx_failure(agent);
1391 goto switchstate;
1392 }
1393 if (ret == 2) {
1394 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1395 on_new_spoe_appctx_failure(agent);
1396 goto switchstate;
1397 }
1398 if (!ret)
1399 goto out;
1400
1401 /* hello handshake is finished, set the idle timeout,
1402 * Add the appctx in the agent cache, decrease the
1403 * number of new applets and wake up waiting streams. */
1404 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1405 appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
1406 on_new_spoe_appctx_success(agent, appctx);
1407 break;
1408
1409 case SPOE_APPCTX_ST_PROCESSING:
1410 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1411 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1412 goto switchstate;
1413 }
1414 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1415 spoe_status_code = SPOE_FRM_ERR_TOUT;
1416 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1417 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1418 goto switchstate;
1419 }
1420 if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
1421 ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame);
1422 if (ret < 0) {
1423 if (ret == -1) {
1424 ctx->state = SPOE_CTX_ST_ERROR;
1425 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1426 goto skip_notify_frame;
1427 }
1428 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1429 goto switchstate;
1430 }
1431 else if (!ret)
1432 goto full;
1433 ctx->state = SPOE_CTX_ST_WAITING_ACK;
Christopher Faulet03a34492016-11-19 16:47:56 +01001434 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001435 }
1436
1437 skip_notify_frame:
1438 if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) {
1439 ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame);
1440 if (ret < 0) {
1441 if (ret == -1)
1442 goto skip_notify_frame;
1443 ctx->state = SPOE_CTX_ST_ERROR;
1444 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1445 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1446 goto switchstate;
1447 }
1448 if (!ret)
1449 goto out;
1450 if (ret == 2) {
1451 ctx->state = SPOE_CTX_ST_ERROR;
1452 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1453 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1454 goto switchstate;
1455 }
1456 ctx->state = SPOE_CTX_ST_DONE;
1457 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1458 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1459 }
1460 else {
1461 if (stopping) {
1462 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1463 goto switchstate;
1464 }
1465
1466 ret = recv_spoe_frame(appctx, NULL);
1467 if (ret < 0) {
1468 if (ret == -1)
1469 goto skip_notify_frame;
1470 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1471 goto switchstate;
1472 }
1473 if (!ret)
1474 goto out;
1475 if (ret == 2) {
1476 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1477 goto switchstate;
1478 }
1479 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1480 }
1481 break;
1482
1483 case SPOE_APPCTX_ST_DISCONNECT:
1484 ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame);
1485 if (ret < 0) {
1486 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1487 goto switchstate;
1488 }
1489 else if (!ret)
1490 goto full;
1491 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1492 " - disconnected by HAProxy (%d): %s\n",
1493 (int)now.tv_sec, (int)now.tv_usec,
1494 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1495 __FUNCTION__, appctx, spoe_status_code,
1496 spoe_frm_err_reasons[spoe_status_code]);
1497
Christopher Faulet03a34492016-11-19 16:47:56 +01001498 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001499 appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
1500 /* fall through */
1501
1502 case SPOE_APPCTX_ST_DISCONNECTING:
1503 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1504 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1505 goto switchstate;
1506 }
1507 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1508 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1509 goto switchstate;
1510 }
1511 ret = recv_spoe_frame(appctx, NULL);
1512 if (ret < 0 || ret == 2) {
1513 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1514 goto switchstate;
1515 }
1516 break;
1517
1518 case SPOE_APPCTX_ST_EXIT:
1519 si_shutw(si);
1520 si_shutr(si);
1521 si_ic(si)->flags |= CF_READ_NULL;
1522 appctx->st0 = SPOE_APPCTX_ST_END;
1523 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1524 /* fall through */
1525
1526 case SPOE_APPCTX_ST_END:
1527 break;
1528 }
1529
1530 out:
1531 if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
1532 task_queue(APPCTX_SPOE(appctx).task);
1533 si_oc(si)->flags |= CF_READ_DONTWAIT;
1534 task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
1535 return;
1536 full:
1537 si_applet_cant_put(si);
1538 goto out;
1539}
1540
1541struct applet spoe_applet = {
1542 .obj_type = OBJ_TYPE_APPLET,
1543 .name = "<SPOE>", /* used for logging */
1544 .fct = handle_spoe_applet,
1545 .release = release_spoe_applet,
1546};
1547
1548/* Create a SPOE applet. On success, the created applet is returned, else
1549 * NULL. */
1550static struct appctx *
1551create_spoe_appctx(struct spoe_config *conf)
1552{
1553 struct appctx *appctx;
1554 struct session *sess;
1555 struct task *task;
1556 struct stream *strm;
1557 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
1558 struct listener *, by_fe);
1559
1560 if ((appctx = appctx_new(&spoe_applet)) == NULL)
1561 goto out_error;
1562
1563 appctx->st0 = SPOE_APPCTX_ST_CONNECT;
1564 if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
1565 goto out_free_appctx;
1566 APPCTX_SPOE(appctx).task->process = process_spoe_applet;
1567 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1568 APPCTX_SPOE(appctx).task->context = appctx;
1569 APPCTX_SPOE(appctx).agent = conf->agent;
1570 APPCTX_SPOE(appctx).ctx = NULL;
1571 APPCTX_SPOE(appctx).version = 0;
1572 APPCTX_SPOE(appctx).max_frame_size = global.tune.bufsize;
1573 task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
1574
1575 sess = session_new(&conf->agent_fe, l, &appctx->obj_type);
1576 if (!sess)
1577 goto out_free_spoe;
1578
1579 if ((task = task_new()) == NULL)
1580 goto out_free_sess;
1581
1582 if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
1583 goto out_free_task;
1584
1585 strm->target = sess->listener->default_target;
1586 strm->req.analysers |= sess->listener->analysers;
1587 stream_set_backend(strm, conf->agent->b.be);
1588
1589 /* applet is waiting for data */
1590 si_applet_cant_get(&strm->si[0]);
1591 appctx_wakeup(appctx);
1592
Christopher Faulet48026722016-11-16 15:01:12 +01001593 /* Increase the per-process number of cumulated connections */
1594 if (conf->agent->cps_max > 0)
1595 update_freq_ctr(&conf->agent->conn_per_sec, 1);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001596
1597 strm->do_log = NULL;
1598 strm->res.flags |= CF_READ_DONTWAIT;
1599
1600 conf->agent_fe.feconn++;
1601 jobs++;
1602 totalconn++;
1603
1604 return appctx;
1605
1606 /* Error unrolling */
1607 out_free_task:
1608 task_free(task);
1609 out_free_sess:
1610 session_free(sess);
1611 out_free_spoe:
1612 task_free(APPCTX_SPOE(appctx).task);
1613 out_free_appctx:
1614 appctx_free(appctx);
1615 out_error:
1616 return NULL;
1617}
1618
1619/* Wake up a SPOE applet attached to a SPOE context. */
1620static void
1621wakeup_spoe_appctx(struct spoe_context *ctx)
1622{
1623 if (ctx->appctx == NULL)
1624 return;
1625 if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) {
1626 si_applet_want_get(ctx->appctx->owner);
1627 si_applet_want_put(ctx->appctx->owner);
1628 appctx_wakeup(ctx->appctx);
1629 }
1630}
1631
1632
1633/* Run across the list of pending streams waiting for a SPOE applet and wake the
1634 * first. */
1635static void
1636offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx)
1637{
1638 struct spoe_context *ctx;
1639
Christopher Fauletf7a30922016-11-10 15:04:51 +01001640 if (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING)
1641 return;
1642
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001643 if (LIST_ISEMPTY(&agent->applet_wq))
1644 LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
1645 else {
1646 ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait);
1647 APPCTX_SPOE(appctx).ctx = ctx;
1648 ctx->appctx = appctx;
1649 LIST_DEL(&ctx->applet_wait);
1650 LIST_INIT(&ctx->applet_wait);
1651 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1652 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1653 " - wake up stream to get available SPOE applet\n",
1654 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1655 __FUNCTION__, ctx->strm);
1656 }
1657}
1658
1659/* A failure occurred during SPOE applet creation. */
1660static void
1661on_new_spoe_appctx_failure(struct spoe_agent *agent)
1662{
1663 struct spoe_context *ctx;
1664
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001665 list_for_each_entry(ctx, &agent->applet_wq, applet_wait) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001666 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1667 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1668 " - wake up stream because to SPOE applet connection failed\n",
1669 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1670 __FUNCTION__, ctx->strm);
1671 }
1672}
1673
1674static void
1675on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx)
1676{
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001677 offer_spoe_appctx(agent, appctx);
1678}
1679/* Retrieve a SPOE applet from the agent cache if possible, else create it. It
1680 * returns 1 on success, 0 to retry later and -1 if an error occurred. */
1681static int
1682acquire_spoe_appctx(struct spoe_context *ctx, int dir)
1683{
1684 struct spoe_config *conf = FLT_CONF(ctx->filter);
1685 struct spoe_agent *agent = conf->agent;
1686 struct appctx *appctx;
1687
1688 /* If a process is already started for this SPOE context, retry
1689 * later. */
1690 if (ctx->flags & SPOE_CTX_FL_PROCESS)
1691 goto wait;
1692
1693 /* If needed, initialize the buffer that will be used to encode messages
1694 * and decode actions. */
1695 if (ctx->buffer == &buf_empty) {
1696 if (!LIST_ISEMPTY(&ctx->buffer_wait)) {
1697 LIST_DEL(&ctx->buffer_wait);
1698 LIST_INIT(&ctx->buffer_wait);
1699 }
1700
1701 if (!b_alloc_margin(&ctx->buffer, 0)) {
1702 LIST_ADDQ(&buffer_wq, &ctx->buffer_wait);
1703 goto wait;
1704 }
1705 }
1706
1707 /* If the SPOE applet was already set, all is done. */
1708 if (ctx->appctx)
1709 goto success;
1710
1711 /* Else try to retrieve it from the agent cache */
1712 if (!LIST_ISEMPTY(&agent->cache)) {
1713 appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list);
1714 LIST_DEL(&APPCTX_SPOE(appctx).list);
1715 APPCTX_SPOE(appctx).ctx = ctx;
1716 ctx->appctx = appctx;
1717 goto success;
1718 }
1719
Christopher Faulet48026722016-11-16 15:01:12 +01001720 /* If there is no server up for the agent's backend, this is an
1721 * error. */
1722 if (!agent->b.be->srv_act && !agent->b.be->srv_bck)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001723 goto error;
1724
1725 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1726 " - waiting for available SPOE appctx\n",
1727 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
1728 ctx->strm);
1729
1730 /* Else add the stream in the waiting queue. */
1731 if (LIST_ISEMPTY(&ctx->applet_wait))
1732 LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait);
1733
1734 /* Finally, create new SPOE applet if we can */
Christopher Faulet48026722016-11-16 15:01:12 +01001735 if (agent->cps_max > 0) {
1736 if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0))
1737 goto wait;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001738 }
Christopher Faulet48026722016-11-16 15:01:12 +01001739 if (create_spoe_appctx(conf) == NULL)
1740 goto error;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001741
1742 wait:
1743 return 0;
1744
1745 success:
1746 /* Remove the stream from the waiting queue */
1747 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1748 LIST_DEL(&ctx->applet_wait);
1749 LIST_INIT(&ctx->applet_wait);
1750 }
1751
1752 /* Set the right flag to prevent request and response processing
1753 * in same time. */
1754 ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
1755 ? SPOE_CTX_FL_REQ_PROCESS
1756 : SPOE_CTX_FL_RSP_PROCESS);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001757
1758 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1759 " - acquire SPOE appctx %p from cache\n",
1760 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1761 __FUNCTION__, ctx->strm, ctx->appctx);
1762 return 1;
1763
1764 error:
1765 /* Remove the stream from the waiting queue */
1766 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1767 LIST_DEL(&ctx->applet_wait);
1768 LIST_INIT(&ctx->applet_wait);
1769 }
1770
1771 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
Christopher Faulet48026722016-11-16 15:01:12 +01001772 " - failed to acquire SPOE appctx\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001773 (int)now.tv_sec, (int)now.tv_usec, agent->id,
Christopher Faulet48026722016-11-16 15:01:12 +01001774 __FUNCTION__, ctx->strm);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001775 send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n");
1776
1777 return -1;
1778}
1779
1780/* Release a SPOE applet and push it in the agent cache. */
1781static void
1782release_spoe_appctx(struct spoe_context *ctx)
1783{
1784 struct spoe_config *conf = FLT_CONF(ctx->filter);
1785 struct spoe_agent *agent = conf->agent;
1786 struct appctx *appctx = ctx->appctx;
1787
1788 /* Reset the flag to allow next processing */
1789 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
1790
Christopher Fauletf7a30922016-11-10 15:04:51 +01001791 /* Reset processing timer */
1792 ctx->process_exp = TICK_ETERNITY;
1793
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001794 /* Release the buffer if needed */
1795 if (ctx->buffer != &buf_empty) {
1796 b_free(&ctx->buffer);
1797 if (!LIST_ISEMPTY(&buffer_wq))
1798 stream_offer_buffers();
1799 }
1800
1801 /* If there is no SPOE applet, all is done */
1802 if (!appctx)
1803 return;
1804
1805 /* Else, reassign it or push it in the agent cache */
1806 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1807 " - release SPOE appctx %p\n",
1808 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1809 __FUNCTION__, ctx->strm, appctx);
1810
1811 APPCTX_SPOE(appctx).ctx = NULL;
1812 ctx->appctx = NULL;
1813 offer_spoe_appctx(agent, appctx);
1814}
1815
1816/***************************************************************************
1817 * Functions that process SPOE messages and actions
1818 **************************************************************************/
1819/* Process SPOE messages for a specific event. During the processing, it returns
1820 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1821 * is returned. */
1822static int
1823process_spoe_messages(struct stream *s, struct spoe_context *ctx,
1824 struct list *messages, int dir)
1825{
1826 struct spoe_message *msg;
1827 struct sample *smp;
1828 struct spoe_arg *arg;
1829 char *p;
1830 size_t max_size;
1831 int off, flag, idx = 0;
1832
1833 /* Reserve 32 bytes from the frame Metadata */
1834 max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32;
1835
1836 b_reset(ctx->buffer);
1837 p = ctx->buffer->p;
1838
1839 /* Loop on messages */
1840 list_for_each_entry(msg, messages, list) {
1841 if (idx + msg->id_len + 1 > max_size)
1842 goto skip;
1843
1844 /* Set the message name */
1845 idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
1846
1847 /* Save offset where to store the number of arguments for this
1848 * message */
1849 off = idx++;
1850 p[off] = 0;
1851
1852 /* Loop on arguments */
1853 list_for_each_entry(arg, &msg->args, list) {
1854 p[off]++; /* Increment the number of arguments */
1855
1856 if (idx + arg->name_len + 1 > max_size)
1857 goto skip;
1858
1859 /* Encode the arguement name as a string. It can by NULL */
1860 idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
1861
1862 /* Fetch the arguement value */
1863 smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
1864 if (!smp) {
1865 /* If no value is available, set it to NULL */
1866 p[idx++] = SPOE_DATA_T_NULL;
1867 continue;
1868 }
1869
1870 /* Else, encode the arguement value */
1871 switch (smp->data.type) {
1872 case SMP_T_BOOL:
1873 flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
1874 p[idx++] = (SPOE_DATA_T_BOOL | flag);
1875 break;
1876 case SMP_T_SINT:
1877 p[idx++] = SPOE_DATA_T_INT64;
1878 if (idx + 8 > max_size)
1879 goto skip;
1880 idx += encode_spoe_varint(smp->data.u.sint, p+idx);
1881 break;
1882 case SMP_T_IPV4:
1883 p[idx++] = SPOE_DATA_T_IPV4;
1884 if (idx + 4 > max_size)
1885 goto skip;
1886 memcpy(p+idx, &smp->data.u.ipv4, 4);
1887 idx += 4;
1888 break;
1889 case SMP_T_IPV6:
1890 p[idx++] = SPOE_DATA_T_IPV6;
1891 if (idx + 16 > max_size)
1892 goto skip;
1893 memcpy(p+idx, &smp->data.u.ipv6, 16);
1894 idx += 16;
1895 break;
1896 case SMP_T_STR:
1897 p[idx++] = SPOE_DATA_T_STR;
1898 if (idx + smp->data.u.str.len > max_size)
1899 goto skip;
1900 idx += encode_spoe_string(smp->data.u.str.str,
1901 smp->data.u.str.len,
1902 p+idx);
1903 break;
1904 case SMP_T_BIN:
1905 p[idx++] = SPOE_DATA_T_BIN;
1906 if (idx + smp->data.u.str.len > max_size)
1907 goto skip;
1908 idx += encode_spoe_string(smp->data.u.str.str,
1909 smp->data.u.str.len,
1910 p+idx);
1911 break;
1912 case SMP_T_METH:
1913 if (smp->data.u.meth.meth == HTTP_METH_OTHER) {
1914 p[idx++] = SPOE_DATA_T_STR;
1915 if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size)
1916 goto skip;
1917 idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name,
1918 http_known_methods[smp->data.u.meth.meth].len,
1919 p+idx);
1920 }
1921 else {
1922 p[idx++] = SPOE_DATA_T_STR;
1923 if (idx + smp->data.u.str.len > max_size)
1924 goto skip;
1925 idx += encode_spoe_string(smp->data.u.meth.str.str,
1926 smp->data.u.meth.str.len,
1927 p+idx);
1928 }
1929 break;
1930 default:
1931 p[idx++] = SPOE_DATA_T_NULL;
1932 }
1933 }
1934 }
1935 ctx->buffer->i = idx;
1936 return 1;
1937
1938 skip:
1939 b_reset(ctx->buffer);
1940 return 0;
1941}
1942
1943/* Helper function to set a variable */
1944static void
1945set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1946 struct sample *smp)
1947{
1948 struct spoe_config *conf = FLT_CONF(ctx->filter);
1949 struct spoe_agent *agent = conf->agent;
1950 char varname[64];
1951
1952 memset(varname, 0, sizeof(varname));
1953 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1954 scope, agent->var_pfx, len, name);
1955 vars_set_by_name_ifexist(varname, len, smp);
1956}
1957
1958/* Helper function to unset a variable */
1959static void
1960unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1961 struct sample *smp)
1962{
1963 struct spoe_config *conf = FLT_CONF(ctx->filter);
1964 struct spoe_agent *agent = conf->agent;
1965 char varname[64];
1966
1967 memset(varname, 0, sizeof(varname));
1968 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1969 scope, agent->var_pfx, len, name);
1970 vars_unset_by_name_ifexist(varname, len, smp);
1971}
1972
1973
1974/* Process SPOE actions for a specific event. During the processing, it returns
1975 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1976 * is returned. */
1977static int
1978process_spoe_actions(struct stream *s, struct spoe_context *ctx,
1979 enum spoe_event ev, int dir)
1980{
1981 char *p;
1982 size_t size;
1983 int off, i, idx = 0;
1984
1985 p = ctx->buffer->p;
1986 size = ctx->buffer->i;
1987
1988 while (idx < size) {
1989 char *str;
1990 uint64_t sz;
1991 struct sample smp;
1992 enum spoe_action_type type;
1993
1994 off = idx;
1995 if (idx+2 > size)
1996 goto skip;
1997
1998 type = p[idx++];
1999 switch (type) {
2000 case SPOE_ACT_T_SET_VAR: {
2001 char *scope;
2002
2003 if (p[idx++] != 3)
2004 goto skip_action;
2005
2006 switch (p[idx++]) {
2007 case SPOE_SCOPE_PROC: scope = "proc"; break;
2008 case SPOE_SCOPE_SESS: scope = "sess"; break;
2009 case SPOE_SCOPE_TXN : scope = "txn"; break;
2010 case SPOE_SCOPE_REQ : scope = "req"; break;
2011 case SPOE_SCOPE_RES : scope = "res"; break;
2012 default: goto skip;
2013 }
2014
2015 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2016 if (str == NULL)
2017 goto skip;
2018 memset(&smp, 0, sizeof(smp));
2019 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2020 if (decode_spoe_data(p+idx, p+size, &smp) == -1)
2021 goto skip;
2022
2023 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2024 " - set-var '%s.%s.%.*s'\n",
2025 (int)now.tv_sec, (int)now.tv_usec,
2026 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2027 __FUNCTION__, s, scope,
2028 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2029 (int)sz, str);
2030
2031 set_spoe_var(ctx, scope, str, sz, &smp);
2032 break;
2033 }
2034
2035 case SPOE_ACT_T_UNSET_VAR: {
2036 char *scope;
2037
2038 if (p[idx++] != 2)
2039 goto skip_action;
2040
2041 switch (p[idx++]) {
2042 case SPOE_SCOPE_PROC: scope = "proc"; break;
2043 case SPOE_SCOPE_SESS: scope = "sess"; break;
2044 case SPOE_SCOPE_TXN : scope = "txn"; break;
2045 case SPOE_SCOPE_REQ : scope = "req"; break;
2046 case SPOE_SCOPE_RES : scope = "res"; break;
2047 default: goto skip;
2048 }
2049
2050 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2051 if (str == NULL)
2052 goto skip;
2053 memset(&smp, 0, sizeof(smp));
2054 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2055
2056 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2057 " - unset-var '%s.%s.%.*s'\n",
2058 (int)now.tv_sec, (int)now.tv_usec,
2059 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2060 __FUNCTION__, s, scope,
2061 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2062 (int)sz, str);
2063
2064 unset_spoe_var(ctx, scope, str, sz, &smp);
2065 break;
2066 }
2067
2068 default:
2069 skip_action:
2070 if ((i = skip_spoe_action(p+off, p+size)) == -1)
2071 goto skip;
2072 idx += i;
2073 }
2074 }
2075
2076 return 1;
2077 skip:
2078 return 0;
2079}
2080
2081
2082/* Process a SPOE event. First, this functions will process messages attached to
2083 * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
2084 * ACK frame to process corresponding actions. During all the processing, it
2085 * returns 0 and it returns 1 when the processing is finished. If an error
2086 * occurred, -1 is returned. */
2087static int
2088process_spoe_event(struct stream *s, struct spoe_context *ctx,
2089 enum spoe_event ev)
2090{
Christopher Fauletf7a30922016-11-10 15:04:51 +01002091 struct spoe_config *conf = FLT_CONF(ctx->filter);
2092 struct spoe_agent *agent = conf->agent;
2093 int dir, ret = 1;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002094
2095 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2096 " - ctx-state=%s - event=%s\n",
2097 (int)now.tv_sec, (int)now.tv_usec,
Christopher Fauletf7a30922016-11-10 15:04:51 +01002098 agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002099 spoe_event_str[ev]);
2100
Christopher Faulet48026722016-11-16 15:01:12 +01002101 if (agent->eps_max > 0) {
2102 if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
2103 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2104 " - skip event '%s': max EPS reached\n",
2105 (int)now.tv_sec, (int)now.tv_usec,
2106 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2107 goto skip;
2108 }
2109 }
2110
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002111 dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
2112
2113 if (LIST_ISEMPTY(&(ctx->messages[ev])))
2114 goto out;
2115
2116 if (ctx->state == SPOE_CTX_ST_ERROR)
2117 goto error;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002118
2119 if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
2120 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2121 " - failed to process event '%s': timeout\n",
2122 (int)now.tv_sec, (int)now.tv_usec,
2123 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2124 send_log(ctx->strm->be, LOG_WARNING,
2125 "failed to process event '%s': timeout.\n",
2126 spoe_event_str[ev]);
2127 goto error;
2128 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002129
2130 if (ctx->state == SPOE_CTX_ST_READY) {
Christopher Fauletf7a30922016-11-10 15:04:51 +01002131 if (!tick_isset(ctx->process_exp)) {
2132 ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
2133 s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
2134 ctx->process_exp);
2135 }
2136
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002137 ret = acquire_spoe_appctx(ctx, dir);
2138 if (ret <= 0) {
2139 if (!ret)
2140 goto out;
2141 goto error;
2142 }
2143 ctx->state = SPOE_CTX_ST_SENDING_MSGS;
2144 }
2145
2146 if (ctx->appctx == NULL)
2147 goto error;
2148
2149 if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
2150 ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
2151 if (ret <= 0) {
2152 if (!ret)
2153 goto skip;
2154 goto error;
2155 }
2156 wakeup_spoe_appctx(ctx);
2157 ret = 0;
2158 goto out;
2159 }
2160
2161 if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
2162 wakeup_spoe_appctx(ctx);
2163 ret = 0;
2164 goto out;
2165 }
2166
2167 if (ctx->state == SPOE_CTX_ST_DONE) {
2168 ret = process_spoe_actions(s, ctx, ev, dir);
2169 if (ret <= 0) {
2170 if (!ret)
2171 goto skip;
2172 goto error;
2173 }
2174 ctx->frame_id++;
2175 release_spoe_appctx(ctx);
2176 ctx->state = SPOE_CTX_ST_READY;
2177 }
2178
2179 out:
2180 return ret;
2181
2182 skip:
2183 release_spoe_appctx(ctx);
2184 ctx->state = SPOE_CTX_ST_READY;
2185 return 1;
2186
2187 error:
Christopher Faulet48026722016-11-16 15:01:12 +01002188 if (agent->eps_max > 0)
2189 update_freq_ctr(&agent->err_per_sec, 1);
2190
Christopher Faulet985532d2016-11-16 15:36:19 +01002191 if (agent->var_on_error) {
2192 struct sample smp;
2193
2194 memset(&smp, 0, sizeof(smp));
2195 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2196 smp.data.u.sint = 1;
2197 smp.data.type = SMP_T_BOOL;
2198
2199 set_spoe_var(ctx, "txn", agent->var_on_error,
2200 strlen(agent->var_on_error), &smp);
2201 }
2202
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002203 release_spoe_appctx(ctx);
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002204 ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
2205 ? SPOE_CTX_ST_READY
2206 : SPOE_CTX_ST_ERROR);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002207 return 1;
2208}
2209
2210
2211/***************************************************************************
2212 * Functions that create/destroy SPOE contexts
2213 **************************************************************************/
2214static struct spoe_context *
2215create_spoe_context(struct filter *filter)
2216{
2217 struct spoe_config *conf = FLT_CONF(filter);
2218 struct spoe_context *ctx;
2219
2220 ctx = pool_alloc_dirty(pool2_spoe_ctx);
2221 if (ctx == NULL) {
2222 return NULL;
2223 }
2224 memset(ctx, 0, sizeof(*ctx));
2225 ctx->filter = filter;
2226 ctx->state = SPOE_CTX_ST_NONE;
2227 ctx->flags = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002228 ctx->messages = conf->agent->messages;
2229 ctx->buffer = &buf_empty;
2230 LIST_INIT(&ctx->buffer_wait);
2231 LIST_INIT(&ctx->applet_wait);
2232
Christopher Fauletf7a30922016-11-10 15:04:51 +01002233 ctx->stream_id = 0;
2234 ctx->frame_id = 1;
2235 ctx->process_exp = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002236
2237 return ctx;
2238}
2239
2240static void
2241destroy_spoe_context(struct spoe_context *ctx)
2242{
2243 if (!ctx)
2244 return;
2245
2246 if (ctx->appctx)
2247 APPCTX_SPOE(ctx->appctx).ctx = NULL;
2248 if (!LIST_ISEMPTY(&ctx->buffer_wait))
2249 LIST_DEL(&ctx->buffer_wait);
2250 if (!LIST_ISEMPTY(&ctx->applet_wait))
2251 LIST_DEL(&ctx->applet_wait);
2252 pool_free2(pool2_spoe_ctx, ctx);
2253}
2254
2255static void
2256reset_spoe_context(struct spoe_context *ctx)
2257{
2258 ctx->state = SPOE_CTX_ST_READY;
2259 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2260}
2261
2262
2263/***************************************************************************
2264 * Hooks that manage the filter lifecycle (init/check/deinit)
2265 **************************************************************************/
2266/* Signal handler: Do a soft stop, wakeup SPOE applet */
2267static void
2268sig_stop_spoe(struct sig_handler *sh)
2269{
2270 struct proxy *p;
2271
2272 p = proxy;
2273 while (p) {
2274 struct flt_conf *fconf;
2275
2276 list_for_each_entry(fconf, &p->filter_configs, list) {
2277 struct spoe_config *conf = fconf->conf;
2278 struct spoe_agent *agent = conf->agent;
2279 struct appctx *appctx;
2280
2281 list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) {
2282 si_applet_want_get(appctx->owner);
2283 si_applet_want_put(appctx->owner);
2284 appctx_wakeup(appctx);
2285 }
2286 }
2287 p = p->next;
2288 }
2289}
2290
2291
2292/* Initialize the SPOE filter. Returns -1 on error, else 0. */
2293static int
2294spoe_init(struct proxy *px, struct flt_conf *fconf)
2295{
2296 struct spoe_config *conf = fconf->conf;
2297 struct listener *l;
2298
2299 memset(&conf->agent_fe, 0, sizeof(conf->agent_fe));
2300 init_new_proxy(&conf->agent_fe);
2301 conf->agent_fe.parent = conf->agent;
2302 conf->agent_fe.last_change = now.tv_sec;
2303 conf->agent_fe.id = conf->agent->id;
2304 conf->agent_fe.cap = PR_CAP_FE;
2305 conf->agent_fe.mode = PR_MODE_TCP;
2306 conf->agent_fe.maxconn = 0;
2307 conf->agent_fe.options2 |= PR_O2_INDEPSTR;
2308 conf->agent_fe.conn_retries = CONN_RETRIES;
2309 conf->agent_fe.accept = frontend_accept;
2310 conf->agent_fe.srv = NULL;
2311 conf->agent_fe.timeout.client = TICK_ETERNITY;
2312 conf->agent_fe.default_target = &spoe_applet.obj_type;
2313 conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
2314
2315 if ((l = calloc(1, sizeof(*l))) == NULL) {
2316 Alert("spoe_init : out of memory.\n");
2317 goto out_error;
2318 }
2319 l->obj_type = OBJ_TYPE_LISTENER;
2320 l->obj_type = OBJ_TYPE_LISTENER;
2321 l->frontend = &conf->agent_fe;
2322 l->state = LI_READY;
2323 l->analysers = conf->agent_fe.fe_req_ana;
2324 LIST_ADDQ(&conf->agent_fe.conf.listeners, &l->by_fe);
2325
2326 if (!sighandler_registered) {
2327 signal_register_fct(0, sig_stop_spoe, 0);
2328 sighandler_registered = 1;
2329 }
2330
2331 return 0;
2332
2333 out_error:
2334 return -1;
2335}
2336
2337/* Free ressources allocated by the SPOE filter. */
2338static void
2339spoe_deinit(struct proxy *px, struct flt_conf *fconf)
2340{
2341 struct spoe_config *conf = fconf->conf;
2342
2343 if (conf) {
2344 struct spoe_agent *agent = conf->agent;
2345 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
2346 struct listener *, by_fe);
2347
2348 free(l);
2349 release_spoe_agent(agent);
2350 free(conf);
2351 }
2352 fconf->conf = NULL;
2353}
2354
2355/* Check configuration of a SPOE filter for a specified proxy.
2356 * Return 1 on error, else 0. */
2357static int
2358spoe_check(struct proxy *px, struct flt_conf *fconf)
2359{
2360 struct spoe_config *conf = fconf->conf;
2361 struct proxy *target;
2362
2363 target = proxy_be_by_name(conf->agent->b.name);
2364 if (target == NULL) {
2365 Alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'"
2366 " declared at %s:%d.\n",
2367 px->id, conf->agent->b.name, conf->agent->id,
2368 conf->agent->conf.file, conf->agent->conf.line);
2369 return 1;
2370 }
2371 if (target->mode != PR_MODE_TCP) {
2372 Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
2373 " at %s:%d does not support HTTP mode.\n",
2374 px->id, target->id, conf->agent->id,
2375 conf->agent->conf.file, conf->agent->conf.line);
2376 return 1;
2377 }
2378
2379 free(conf->agent->b.name);
2380 conf->agent->b.name = NULL;
2381 conf->agent->b.be = target;
2382 return 0;
2383}
2384
2385/**************************************************************************
2386 * Hooks attached to a stream
2387 *************************************************************************/
2388/* Called when a filter instance is created and attach to a stream. It creates
2389 * the context that will be used to process this stream. */
2390static int
2391spoe_start(struct stream *s, struct filter *filter)
2392{
2393 struct spoe_context *ctx;
2394
2395 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2396 (int)now.tv_sec, (int)now.tv_usec,
2397 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2398 __FUNCTION__, s);
2399
2400 ctx = create_spoe_context(filter);
2401 if (ctx == NULL) {
2402 send_log(s->be, LOG_EMERG,
2403 "failed to create SPOE context for proxy %s\n",
2404 s->be->id);
2405 return 0;
2406 }
2407
2408 ctx->strm = s;
2409 ctx->state = SPOE_CTX_ST_READY;
2410 filter->ctx = ctx;
2411
2412 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_FE]))
2413 filter->pre_analyzers |= AN_REQ_INSPECT_FE;
2414
2415 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_BE]))
2416 filter->pre_analyzers |= AN_REQ_INSPECT_BE;
2417
2418 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_RSP]))
2419 filter->pre_analyzers |= AN_RES_INSPECT;
2420
2421 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_FE]))
2422 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE;
2423
2424 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_BE]))
2425 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE;
2426
2427 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_RSP]))
2428 filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE;
2429
2430 return 1;
2431}
2432
2433/* Called when a filter instance is detached from a stream. It release the
2434 * attached SPOE context. */
2435static void
2436spoe_stop(struct stream *s, struct filter *filter)
2437{
2438 struct spoe_context *ctx = filter->ctx;
2439
2440 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2441 (int)now.tv_sec, (int)now.tv_usec,
2442 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2443 __FUNCTION__, s);
2444
2445 if (ctx) {
2446 release_spoe_appctx(ctx);
2447 destroy_spoe_context(ctx);
2448 }
2449}
2450
Christopher Fauletf7a30922016-11-10 15:04:51 +01002451
2452/*
2453 * Called when the stream is woken up because of expired timer.
2454 */
2455static void
2456spoe_check_timeouts(struct stream *s, struct filter *filter)
2457{
2458 struct spoe_context *ctx = filter->ctx;
2459
2460 if (tick_is_expired(ctx->process_exp, now_ms))
2461 s->task->state |= TASK_WOKEN_MSG;
2462}
2463
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002464/* Called when we are ready to filter data on a channel */
2465static int
2466spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2467{
2468 struct spoe_context *ctx = filter->ctx;
2469 int ret = 1;
2470
2471 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2472 " - ctx-flags=0x%08x\n",
2473 (int)now.tv_sec, (int)now.tv_usec,
2474 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2475 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2476
2477 if (!(chn->flags & CF_ISRESP)) {
2478 if (filter->pre_analyzers & AN_REQ_INSPECT_FE)
2479 chn->analysers |= AN_REQ_INSPECT_FE;
2480 if (filter->pre_analyzers & AN_REQ_INSPECT_BE)
2481 chn->analysers |= AN_REQ_INSPECT_BE;
2482
2483 if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED)
2484 goto out;
2485
2486 ctx->stream_id = s->uniq_id;
2487 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2488 ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
2489 if (ret != 1)
2490 goto out;
2491 }
2492 ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
2493 }
2494 else {
2495 if (filter->pre_analyzers & SPOE_EV_ON_TCP_RSP)
2496 chn->analysers |= AN_RES_INSPECT;
2497
2498 if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
2499 goto out;
2500
2501 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2502 ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
2503 if (ret != 1)
2504 goto out;
2505 }
2506 ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
2507 }
2508
2509 out:
2510 if (!ret) {
2511 channel_dont_read(chn);
2512 channel_dont_close(chn);
2513 }
2514 return ret;
2515}
2516
2517/* Called before a processing happens on a given channel */
2518static int
2519spoe_chn_pre_analyze(struct stream *s, struct filter *filter,
2520 struct channel *chn, unsigned an_bit)
2521{
2522 struct spoe_context *ctx = filter->ctx;
2523 int ret = 1;
2524
2525 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2526 " - ctx-flags=0x%08x - ana=0x%08x\n",
2527 (int)now.tv_sec, (int)now.tv_usec,
2528 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2529 __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2530 ctx->flags, an_bit);
2531
2532 if (ctx->state == SPOE_CTX_ST_NONE || ctx->state == SPOE_CTX_ST_ERROR)
2533 goto out;
2534
2535 switch (an_bit) {
2536 case AN_REQ_INSPECT_FE:
2537 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
2538 break;
2539 case AN_REQ_INSPECT_BE:
2540 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
2541 break;
2542 case AN_RES_INSPECT:
2543 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP);
2544 break;
2545 case AN_REQ_HTTP_PROCESS_FE:
2546 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
2547 break;
2548 case AN_REQ_HTTP_PROCESS_BE:
2549 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
2550 break;
2551 case AN_RES_HTTP_PROCESS_FE:
2552 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
2553 break;
2554 }
2555
2556 out:
2557 if (!ret) {
2558 channel_dont_read(chn);
2559 channel_dont_close(chn);
2560 }
2561 return ret;
2562}
2563
2564/* Called when the filtering on the channel ends. */
2565static int
2566spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2567{
2568 struct spoe_context *ctx = filter->ctx;
2569
2570 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2571 " - ctx-flags=0x%08x\n",
2572 (int)now.tv_sec, (int)now.tv_usec,
2573 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2574 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2575
2576 if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
2577 reset_spoe_context(ctx);
2578 }
2579
2580 return 1;
2581}
2582
2583/********************************************************************
2584 * Functions that manage the filter initialization
2585 ********************************************************************/
2586struct flt_ops spoe_ops = {
2587 /* Manage SPOE filter, called for each filter declaration */
2588 .init = spoe_init,
2589 .deinit = spoe_deinit,
2590 .check = spoe_check,
2591
2592 /* Handle start/stop of SPOE */
Christopher Fauletf7a30922016-11-10 15:04:51 +01002593 .attach = spoe_start,
2594 .detach = spoe_stop,
2595 .check_timeouts = spoe_check_timeouts,
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002596
2597 /* Handle channels activity */
2598 .channel_start_analyze = spoe_start_analyze,
2599 .channel_pre_analyze = spoe_chn_pre_analyze,
2600 .channel_end_analyze = spoe_end_analyze,
2601};
2602
2603
2604static int
2605cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
2606{
2607 const char *err;
2608 int i, err_code = 0;
2609
2610 if ((cfg_scope == NULL && curengine != NULL) ||
2611 (cfg_scope != NULL && curengine == NULL) ||
2612 strcmp(curengine, cfg_scope))
2613 goto out;
2614
2615 if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */
2616 if (!*args[1]) {
2617 Alert("parsing [%s:%d] : missing name for spoe-agent section.\n",
2618 file, linenum);
2619 err_code |= ERR_ALERT | ERR_ABORT;
2620 goto out;
2621 }
2622 if (*args[2]) {
2623 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2624 file, linenum, args[2]);
2625 err_code |= ERR_ALERT | ERR_ABORT;
2626 goto out;
2627 }
2628
2629 err = invalid_char(args[1]);
2630 if (err) {
2631 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2632 file, linenum, *err, args[0], args[1]);
2633 err_code |= ERR_ALERT | ERR_ABORT;
2634 goto out;
2635 }
2636
2637 if (curagent != NULL) {
2638 Alert("parsing [%s:%d] : another spoe-agent section previously defined.\n",
2639 file, linenum);
2640 err_code |= ERR_ALERT | ERR_ABORT;
2641 goto out;
2642 }
2643 if ((curagent = calloc(1, sizeof(*curagent))) == NULL) {
2644 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2645 err_code |= ERR_ALERT | ERR_ABORT;
2646 goto out;
2647 }
2648
2649 curagent->id = strdup(args[1]);
2650 curagent->conf.file = strdup(file);
2651 curagent->conf.line = linenum;
2652 curagent->timeout.hello = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002653 curagent->timeout.idle = TICK_ETERNITY;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002654 curagent->timeout.processing = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002655 curagent->var_pfx = NULL;
Christopher Faulet985532d2016-11-16 15:36:19 +01002656 curagent->var_on_error = NULL;
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002657 curagent->flags = 0;
Christopher Faulet48026722016-11-16 15:01:12 +01002658 curagent->cps_max = 0;
2659 curagent->eps_max = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002660
2661 for (i = 0; i < SPOE_EV_EVENTS; ++i)
2662 LIST_INIT(&curagent->messages[i]);
2663 LIST_INIT(&curagent->cache);
2664 LIST_INIT(&curagent->applet_wq);
2665 }
2666 else if (!strcmp(args[0], "use-backend")) {
2667 if (!*args[1]) {
2668 Alert("parsing [%s:%d] : '%s' expects a backend name.\n",
2669 file, linenum, args[0]);
2670 err_code |= ERR_ALERT | ERR_FATAL;
2671 goto out;
2672 }
2673 if (*args[2]) {
2674 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2675 file, linenum, args[2]);
2676 err_code |= ERR_ALERT | ERR_ABORT;
2677 goto out;
2678 }
2679 free(curagent->b.name);
2680 curagent->b.name = strdup(args[1]);
2681 }
2682 else if (!strcmp(args[0], "messages")) {
2683 int cur_arg = 1;
2684 while (*args[cur_arg]) {
2685 struct spoe_msg_placeholder *mp = NULL;
2686
2687 list_for_each_entry(mp, &curmps, list) {
2688 if (!strcmp(mp->id, args[cur_arg])) {
2689 Alert("parsing [%s:%d]: spoe-message message '%s' already declared.\n",
2690 file, linenum, args[cur_arg]);
2691 err_code |= ERR_ALERT | ERR_FATAL;
2692 goto out;
2693 }
2694 }
2695
2696 if ((mp = calloc(1, sizeof(*mp))) == NULL) {
2697 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2698 err_code |= ERR_ALERT | ERR_ABORT;
2699 goto out;
2700 }
2701 mp->id = strdup(args[cur_arg]);
2702 LIST_ADDQ(&curmps, &mp->list);
2703 cur_arg++;
2704 }
2705 }
2706 else if (!strcmp(args[0], "timeout")) {
2707 unsigned int *tv = NULL;
2708 const char *res;
2709 unsigned timeout;
2710
2711 if (!*args[1]) {
2712 Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n",
2713 file, linenum);
2714 err_code |= ERR_ALERT | ERR_FATAL;
2715 goto out;
2716 }
2717 if (!strcmp(args[1], "hello"))
2718 tv = &curagent->timeout.hello;
2719 else if (!strcmp(args[1], "idle"))
2720 tv = &curagent->timeout.idle;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002721 else if (!strcmp(args[1], "processing"))
2722 tv = &curagent->timeout.processing;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002723 else {
Christopher Faulet03a34492016-11-19 16:47:56 +01002724 Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' or 'processing' (got %s).\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002725 file, linenum, args[1]);
2726 err_code |= ERR_ALERT | ERR_FATAL;
2727 goto out;
2728 }
2729 if (!*args[2]) {
2730 Alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n",
2731 file, linenum, args[1]);
2732 err_code |= ERR_ALERT | ERR_FATAL;
2733 goto out;
2734 }
2735 res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
2736 if (res) {
2737 Alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n",
2738 file, linenum, *res, args[1]);
2739 err_code |= ERR_ALERT | ERR_ABORT;
2740 goto out;
2741 }
2742 if (*args[3]) {
2743 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2744 file, linenum, args[3]);
2745 err_code |= ERR_ALERT | ERR_ABORT;
2746 goto out;
2747 }
2748 *tv = MS_TO_TICKS(timeout);
2749 }
2750 else if (!strcmp(args[0], "option")) {
2751 if (!*args[1]) {
2752 Alert("parsing [%s:%d]: '%s' expects an option name.\n",
2753 file, linenum, args[0]);
2754 err_code |= ERR_ALERT | ERR_FATAL;
2755 goto out;
2756 }
2757 if (!strcmp(args[1], "var-prefix")) {
2758 char *tmp;
2759
2760 if (!*args[2]) {
2761 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2762 file, linenum, args[0],
2763 args[1]);
2764 err_code |= ERR_ALERT | ERR_FATAL;
2765 goto out;
2766 }
2767 tmp = args[2];
2768 while (*tmp) {
2769 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2770 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
2771 file, linenum, args[0], args[1]);
2772 err_code |= ERR_ALERT | ERR_FATAL;
2773 goto out;
2774 }
2775 tmp++;
2776 }
2777 curagent->var_pfx = strdup(args[2]);
2778 }
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002779 else if (!strcmp(args[1], "continue-on-error")) {
2780 if (*args[2]) {
2781 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
Christopher Faulet48026722016-11-16 15:01:12 +01002782 file, linenum, args[2]);
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002783 err_code |= ERR_ALERT | ERR_ABORT;
2784 goto out;
2785 }
2786 curagent->flags |= SPOE_FL_CONT_ON_ERR;
2787 }
Christopher Faulet985532d2016-11-16 15:36:19 +01002788 else if (!strcmp(args[1], "set-on-error")) {
2789 char *tmp;
2790
2791 if (!*args[2]) {
2792 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2793 file, linenum, args[0],
2794 args[1]);
2795 err_code |= ERR_ALERT | ERR_FATAL;
2796 goto out;
2797 }
2798 tmp = args[2];
2799 while (*tmp) {
2800 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2801 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
2802 file, linenum, args[0], args[1]);
2803 err_code |= ERR_ALERT | ERR_FATAL;
2804 goto out;
2805 }
2806 tmp++;
2807 }
2808 curagent->var_on_error = strdup(args[2]);
2809 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002810 else {
2811 Alert("parsing [%s:%d]: option '%s' is not supported.\n",
2812 file, linenum, args[1]);
2813 err_code |= ERR_ALERT | ERR_FATAL;
2814 goto out;
2815 }
Christopher Faulet48026722016-11-16 15:01:12 +01002816 }
2817 else if (!strcmp(args[0], "maxconnrate")) {
2818 if (!*args[1]) {
2819 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
2820 file, linenum, args[0]);
2821 err_code |= ERR_ALERT | ERR_FATAL;
2822 goto out;
2823 }
2824 if (*args[2]) {
2825 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2826 file, linenum, args[2]);
2827 err_code |= ERR_ALERT | ERR_ABORT;
2828 goto out;
2829 }
2830 curagent->cps_max = atol(args[1]);
2831 }
2832 else if (!strcmp(args[0], "maxerrrate")) {
2833 if (!*args[1]) {
2834 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
2835 file, linenum, args[0]);
2836 err_code |= ERR_ALERT | ERR_FATAL;
2837 goto out;
2838 }
2839 if (*args[2]) {
2840 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2841 file, linenum, args[2]);
2842 err_code |= ERR_ALERT | ERR_ABORT;
2843 goto out;
2844 }
2845 curagent->eps_max = atol(args[1]);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002846 }
2847 else if (*args[0]) {
2848 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
2849 file, linenum, args[0]);
2850 err_code |= ERR_ALERT | ERR_FATAL;
2851 goto out;
2852 }
2853 out:
2854 return err_code;
2855}
2856
2857static int
2858cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm)
2859{
2860 struct spoe_message *msg;
2861 struct spoe_arg *arg;
2862 const char *err;
2863 char *errmsg = NULL;
2864 int err_code = 0;
2865
2866 if ((cfg_scope == NULL && curengine != NULL) ||
2867 (cfg_scope != NULL && curengine == NULL) ||
2868 strcmp(curengine, cfg_scope))
2869 goto out;
2870
2871 if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */
2872 if (!*args[1]) {
2873 Alert("parsing [%s:%d] : missing name for spoe-message section.\n",
2874 file, linenum);
2875 err_code |= ERR_ALERT | ERR_ABORT;
2876 goto out;
2877 }
2878 if (*args[2]) {
2879 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2880 file, linenum, args[2]);
2881 err_code |= ERR_ALERT | ERR_ABORT;
2882 goto out;
2883 }
2884
2885 err = invalid_char(args[1]);
2886 if (err) {
2887 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2888 file, linenum, *err, args[0], args[1]);
2889 err_code |= ERR_ALERT | ERR_ABORT;
2890 goto out;
2891 }
2892
2893 list_for_each_entry(msg, &curmsgs, list) {
2894 if (!strcmp(msg->id, args[1])) {
2895 Alert("parsing [%s:%d]: spoe-message section '%s' has the same"
2896 " name as another one declared at %s:%d.\n",
2897 file, linenum, args[1], msg->conf.file, msg->conf.line);
2898 err_code |= ERR_ALERT | ERR_FATAL;
2899 goto out;
2900 }
2901 }
2902
2903 if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) {
2904 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2905 err_code |= ERR_ALERT | ERR_ABORT;
2906 goto out;
2907 }
2908
2909 curmsg->id = strdup(args[1]);
2910 curmsg->id_len = strlen(curmsg->id);
2911 curmsg->event = SPOE_EV_NONE;
2912 curmsg->conf.file = strdup(file);
2913 curmsg->conf.line = linenum;
2914 LIST_INIT(&curmsg->args);
2915 LIST_ADDQ(&curmsgs, &curmsg->list);
2916 }
2917 else if (!strcmp(args[0], "args")) {
2918 int cur_arg = 1;
2919
2920 curproxy->conf.args.ctx = ARGC_SPOE;
2921 curproxy->conf.args.file = file;
2922 curproxy->conf.args.line = linenum;
2923 while (*args[cur_arg]) {
2924 char *delim = strchr(args[cur_arg], '=');
2925 int idx = 0;
2926
2927 if ((arg = calloc(1, sizeof(*arg))) == NULL) {
2928 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2929 err_code |= ERR_ALERT | ERR_ABORT;
2930 goto out;
2931 }
2932
2933 if (!delim) {
2934 arg->name = NULL;
2935 arg->name_len = 0;
2936 delim = args[cur_arg];
2937 }
2938 else {
2939 arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]);
2940 arg->name_len = delim - args[cur_arg];
2941 delim++;
2942 }
2943
2944 arg->expr = sample_parse_expr(&delim, &idx, file, linenum, &errmsg, &curproxy->conf.args);
2945 if (arg->expr == NULL) {
2946 Alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg);
2947 err_code |= ERR_ALERT | ERR_FATAL;
2948 free(arg->name);
2949 free(arg);
2950 goto out;
2951 }
2952 LIST_ADDQ(&curmsg->args, &arg->list);
2953 cur_arg++;
2954 }
2955 curproxy->conf.args.file = NULL;
2956 curproxy->conf.args.line = 0;
2957 }
2958 else if (!strcmp(args[0], "event")) {
2959 if (!*args[1]) {
2960 Alert("parsing [%s:%d] : missing event name.\n", file, linenum);
2961 err_code |= ERR_ALERT | ERR_ABORT;
2962 goto out;
2963 }
2964 if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS]))
2965 curmsg->event = SPOE_EV_ON_CLIENT_SESS;
2966 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS]))
2967 curmsg->event = SPOE_EV_ON_SERVER_SESS;
2968
2969 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE]))
2970 curmsg->event = SPOE_EV_ON_TCP_REQ_FE;
2971 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE]))
2972 curmsg->event = SPOE_EV_ON_TCP_REQ_BE;
2973 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP]))
2974 curmsg->event = SPOE_EV_ON_TCP_RSP;
2975
2976 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE]))
2977 curmsg->event = SPOE_EV_ON_HTTP_REQ_FE;
2978 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE]))
2979 curmsg->event = SPOE_EV_ON_HTTP_REQ_BE;
2980 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP]))
2981 curmsg->event = SPOE_EV_ON_HTTP_RSP;
2982 else {
2983 Alert("parsing [%s:%d] : unkown event '%s'.\n",
2984 file, linenum, args[1]);
2985 err_code |= ERR_ALERT | ERR_ABORT;
2986 goto out;
2987 }
2988 }
2989 else if (!*args[0]) {
2990 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n",
2991 file, linenum, args[0]);
2992 err_code |= ERR_ALERT | ERR_FATAL;
2993 goto out;
2994 }
2995 out:
2996 free(errmsg);
2997 return err_code;
2998}
2999
3000/* Return -1 on error, else 0 */
3001static int
3002parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
3003 struct flt_conf *fconf, char **err, void *private)
3004{
3005 struct list backup_sections;
3006 struct spoe_config *conf;
3007 struct spoe_message *msg, *msgback;
3008 struct spoe_msg_placeholder *mp, *mpback;
3009 char *file = NULL, *engine = NULL;
3010 int ret, pos = *cur_arg + 1;
3011
3012 conf = calloc(1, sizeof(*conf));
3013 if (conf == NULL) {
3014 memprintf(err, "%s: out of memory", args[*cur_arg]);
3015 goto error;
3016 }
3017 conf->proxy = px;
3018
3019 while (*args[pos]) {
3020 if (!strcmp(args[pos], "config")) {
3021 if (!*args[pos+1]) {
3022 memprintf(err, "'%s' : '%s' option without value",
3023 args[*cur_arg], args[pos]);
3024 goto error;
3025 }
3026 file = args[pos+1];
3027 pos += 2;
3028 }
3029 else if (!strcmp(args[pos], "engine")) {
3030 if (!*args[pos+1]) {
3031 memprintf(err, "'%s' : '%s' option without value",
3032 args[*cur_arg], args[pos]);
3033 goto error;
3034 }
3035 engine = args[pos+1];
3036 pos += 2;
3037 }
3038 else {
3039 memprintf(err, "unknown keyword '%s'", args[pos]);
3040 goto error;
3041 }
3042 }
3043 if (file == NULL) {
3044 memprintf(err, "'%s' : missing config file", args[*cur_arg]);
3045 goto error;
3046 }
3047
3048 /* backup sections and register SPOE sections */
3049 LIST_INIT(&backup_sections);
3050 cfg_backup_sections(&backup_sections);
3051 cfg_register_section("spoe-agent", cfg_parse_spoe_agent);
3052 cfg_register_section("spoe-message", cfg_parse_spoe_message);
3053
3054 /* Parse SPOE filter configuration file */
3055 curengine = engine;
3056 curproxy = px;
3057 curagent = NULL;
3058 curmsg = NULL;
3059 ret = readcfgfile(file);
3060 curproxy = NULL;
3061
3062 /* unregister SPOE sections and restore previous sections */
3063 cfg_unregister_sections();
3064 cfg_restore_sections(&backup_sections);
3065
3066 if (ret == -1) {
3067 memprintf(err, "Could not open configuration file %s : %s",
3068 file, strerror(errno));
3069 goto error;
3070 }
3071 if (ret & (ERR_ABORT|ERR_FATAL)) {
3072 memprintf(err, "Error(s) found in configuration file %s", file);
3073 goto error;
3074 }
3075
3076 /* Check SPOE agent */
3077 if (curagent == NULL) {
3078 memprintf(err, "No SPOE agent found in file %s", file);
3079 goto error;
3080 }
3081 if (curagent->b.name == NULL) {
3082 memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d",
3083 curagent->id, curagent->conf.file, curagent->conf.line);
3084 goto error;
3085 }
Christopher Fauletf7a30922016-11-10 15:04:51 +01003086 if (curagent->timeout.hello == TICK_ETERNITY ||
3087 curagent->timeout.idle == TICK_ETERNITY ||
Christopher Fauletf7a30922016-11-10 15:04:51 +01003088 curagent->timeout.processing == TICK_ETERNITY) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003089 Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
3090 " | While not properly invalid, you will certainly encounter various problems\n"
3091 " | with such a configuration. To fix this, please ensure that all following\n"
Christopher Faulet03a34492016-11-19 16:47:56 +01003092 " | timeouts are set to a non-zero value: 'hello', 'idle', 'processing'.\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003093 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3094 }
3095 if (curagent->var_pfx == NULL) {
3096 char *tmp = curagent->id;
3097
3098 while (*tmp) {
3099 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3100 memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. "
3101 "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n",
3102 curagent->id, curagent->id, curagent->conf.file, curagent->conf.line);
3103 goto error;
3104 }
3105 tmp++;
3106 }
3107 curagent->var_pfx = strdup(curagent->id);
3108 }
3109
3110 if (LIST_ISEMPTY(&curmps)) {
3111 Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",
3112 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3113 goto finish;
3114 }
3115
3116 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3117 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3118 if (!strcmp(msg->id, mp->id)) {
3119 if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) {
3120 if (msg->event == SPOE_EV_ON_TCP_REQ_BE)
3121 msg->event = SPOE_EV_ON_TCP_REQ_FE;
3122 if (msg->event == SPOE_EV_ON_HTTP_REQ_BE)
3123 msg->event = SPOE_EV_ON_HTTP_REQ_FE;
3124 }
3125 if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS ||
3126 msg->event == SPOE_EV_ON_TCP_REQ_FE ||
3127 msg->event == SPOE_EV_ON_HTTP_REQ_FE)) {
3128 Warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n",
3129 px->id, msg->conf.file, msg->conf.line);
3130 goto next;
3131 }
3132 if (msg->event == SPOE_EV_NONE) {
3133 Warning("Proxy '%s': Ignore SPOE message without event at %s:%d.\n",
3134 px->id, msg->conf.file, msg->conf.line);
3135 goto next;
3136 }
3137 msg->agent = curagent;
3138 LIST_DEL(&msg->list);
3139 LIST_ADDQ(&curagent->messages[msg->event], &msg->list);
3140 goto next;
3141 }
3142 }
3143 memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d",
3144 curagent->id, mp->id, curagent->conf.file, curagent->conf.line);
3145 goto error;
3146 next:
3147 continue;
3148 }
3149
3150 finish:
3151 conf->agent = curagent;
3152 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3153 LIST_DEL(&mp->list);
3154 release_spoe_msg_placeholder(mp);
3155 }
3156 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3157 Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n",
3158 px->id, msg->id, msg->conf.file, msg->conf.line);
3159 LIST_DEL(&msg->list);
3160 release_spoe_message(msg);
3161 }
3162
3163 *cur_arg = pos;
3164 fconf->ops = &spoe_ops;
3165 fconf->conf = conf;
3166 return 0;
3167
3168 error:
3169 release_spoe_agent(curagent);
3170 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3171 LIST_DEL(&mp->list);
3172 release_spoe_msg_placeholder(mp);
3173 }
3174 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3175 LIST_DEL(&msg->list);
3176 release_spoe_message(msg);
3177 }
3178 free(conf);
3179 return -1;
3180}
3181
3182
3183/* Declare the filter parser for "spoe" keyword */
3184static struct flt_kw_list flt_kws = { "SPOE", { }, {
3185 { "spoe", parse_spoe_flt, NULL },
3186 { NULL, NULL, NULL },
3187 }
3188};
3189
3190__attribute__((constructor))
3191static void __spoe_init(void)
3192{
3193 flt_register_keywords(&flt_kws);
3194
3195 LIST_INIT(&curmsgs);
3196 LIST_INIT(&curmps);
3197 pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
3198}
3199
3200__attribute__((destructor))
3201static void
3202__spoe_deinit(void)
3203{
3204 pool_destroy2(pool2_spoe_ctx);
3205}