blob: f5918dc5f029eca0c574754dac240976c00adfcd [file] [log] [blame]
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001/*
2 * Stream processing offload engine management.
3 *
4 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12#include <ctype.h>
13#include <errno.h>
14
15#include <common/buffer.h>
16#include <common/cfgparse.h>
17#include <common/compat.h>
18#include <common/config.h>
19#include <common/debug.h>
20#include <common/memory.h>
21#include <common/time.h>
22
23#include <types/arg.h>
24#include <types/filters.h>
25#include <types/global.h>
26#include <types/proxy.h>
27#include <types/sample.h>
28#include <types/stream.h>
29
30#include <proto/arg.h>
31#include <proto/backend.h>
32#include <proto/filters.h>
Christopher Faulet48026722016-11-16 15:01:12 +010033#include <proto/freq_ctr.h>
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020034#include <proto/frontend.h>
35#include <proto/log.h>
36#include <proto/proto_http.h>
37#include <proto/proxy.h>
38#include <proto/sample.h>
39#include <proto/session.h>
40#include <proto/signal.h>
41#include <proto/stream.h>
42#include <proto/stream_interface.h>
43#include <proto/task.h>
44#include <proto/vars.h>
45
/* Debug traces: SPOE_PRINTF() forwards its arguments to fprintf() when the
 * file is built with DEBUG_SPOE or DEBUG_FULL. Otherwise it expands to nothing,
 * so its arguments are not evaluated. */
#if !defined(DEBUG_SPOE) && !defined(DEBUG_FULL)
#define SPOE_PRINTF(x...)
#else
#define SPOE_PRINTF(x...) fprintf(x)
#endif
51
/* Shortcut to the SPOE-specific part (ctx.spoe) of an applet context */
#define APPCTX_SPOE(a) ((a)->ctx.spoe)
54
/* Smallest frame size that can be negotiated */
#define MIN_FRAME_SIZE            256

/* Flags set on the SPOE agent */
#define SPOE_FL_CONT_ON_ERR       0x00000001 /* Do not stop events processing when an error occurred */

/* Flags set on the SPOE context */
#define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* Set once the on-client-session event was processed */
#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set once the on-server-session event was processed */
#define SPOE_CTX_FL_REQ_PROCESS   0x00000004 /* Set while SPOE is processing the request */
#define SPOE_CTX_FL_RSP_PROCESS   0x00000008 /* Set while SPOE is processing the response */

/* Set while SPOE is processing the request or the response */
#define SPOE_CTX_FL_PROCESS       (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)

/* Errors reported on a SPOE applet */
#define SPOE_APPCTX_ERR_NONE      0x00000000 /* no error yet, leave it to zero */
#define SPOE_APPCTX_ERR_TOUT      0x00000001 /* SPOE applet timeout */
71
/* States a SPOE context can be in */
enum spoe_ctx_state {
	SPOE_CTX_ST_NONE         = 0,
	SPOE_CTX_ST_READY        = 1,
	SPOE_CTX_ST_SENDING_MSGS = 2,
	SPOE_CTX_ST_WAITING_ACK  = 3,
	SPOE_CTX_ST_DONE         = 4,
	SPOE_CTX_ST_ERROR        = 5,
};
81
/* States a SPOE applet can be in */
enum spoe_appctx_state {
	SPOE_APPCTX_ST_CONNECT       = 0,
	SPOE_APPCTX_ST_CONNECTING    = 1,
	SPOE_APPCTX_ST_PROCESSING    = 2,
	SPOE_APPCTX_ST_DISCONNECT    = 3,
	SPOE_APPCTX_ST_DISCONNECTING = 4,
	SPOE_APPCTX_ST_EXIT          = 5,
	SPOE_APPCTX_ST_END           = 6,
};
92
/* Actions an agent may request. SPOE_ACT_TYPES is not an action: it is one
 * more than the highest action value. */
enum spoe_action_type {
	SPOE_ACT_T_SET_VAR   = 1,
	SPOE_ACT_T_UNSET_VAR = 2,
	SPOE_ACT_TYPES       = 3,
};
99
/* Events messages can be attached to. SPOE_EV_EVENTS is the number of values,
 * SPOE_EV_NONE included, so it can be used to size per-event arrays. */
enum spoe_event {
	SPOE_EV_NONE           = 0,

	/* Request events */
	SPOE_EV_ON_CLIENT_SESS = 1,
	SPOE_EV_ON_TCP_REQ_FE  = 2,
	SPOE_EV_ON_TCP_REQ_BE  = 3,
	SPOE_EV_ON_HTTP_REQ_FE = 4,
	SPOE_EV_ON_HTTP_REQ_BE = 5,

	/* Response events */
	SPOE_EV_ON_SERVER_SESS = 6,
	SPOE_EV_ON_TCP_RSP     = 7,
	SPOE_EV_ON_HTTP_RSP    = 8,

	SPOE_EV_EVENTS         = 9
};
118
/* Errors triggered by the SPOE applet. The values are not contiguous:
 * SPOE_FRM_ERR_UNKNOWN is 99, and SPOE_FRM_ERRS (100) is one past it. */
enum spoe_frame_error {
	SPOE_FRM_ERR_NONE           = 0,
	SPOE_FRM_ERR_IO             = 1,
	SPOE_FRM_ERR_TOUT           = 2,
	SPOE_FRM_ERR_TOO_BIG        = 3,
	SPOE_FRM_ERR_INVALID        = 4,
	SPOE_FRM_ERR_NO_VSN         = 5,
	SPOE_FRM_ERR_NO_FRAME_SIZE  = 6,
	SPOE_FRM_ERR_NO_CAP         = 7,
	SPOE_FRM_ERR_BAD_VSN        = 8,
	SPOE_FRM_ERR_BAD_FRAME_SIZE = 9,
	SPOE_FRM_ERR_UNKNOWN        = 99,
	SPOE_FRM_ERRS               = 100,
};
134
/* Scopes used for variables set by agents. It is a way to be agnostic to the
 * vars scopes. */
enum spoe_vars_scope {
	SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC */
	SPOE_SCOPE_SESS = 1, /* <=> SCOPE_SESS */
	SPOE_SCOPE_TXN  = 2, /* <=> SCOPE_TXN  */
	SPOE_SCOPE_REQ  = 3, /* <=> SCOPE_REQ  */
	SPOE_SCOPE_RES  = 4, /* <=> SCOPE_RES  */
};
144
145
146/* Describe an argument that will be linked to a message. It is a sample fetch,
147 * with an optional name. */
148struct spoe_arg {
149 char *name; /* Name of the argument, may be NULL */
150 unsigned int name_len; /* The name length, 0 if NULL */
151 struct sample_expr *expr; /* Sample expression */
152 struct list list; /* Used to chain SPOE args */
153};
154
155/* Used during the config parsing only because, when a SPOE agent section is
156 * parsed, messages can be undefined. */
157struct spoe_msg_placeholder {
158 char *id; /* SPOE message placeholder id */
159 struct list list; /* Use to chain SPOE message placeholders */
160};
161
162/* Describe a message that will be sent in a NOTIFY frame. A message has a name,
163 * an argument list (see above) and it is linked to a specific event. */
164struct spoe_message {
165 char *id; /* SPOE message id */
166 unsigned int id_len; /* The message id length */
167 struct spoe_agent *agent; /* SPOE agent owning this SPOE message */
168 struct {
169 char *file; /* file where the SPOE message appears */
170 int line; /* line where the SPOE message appears */
171 } conf; /* config information */
172 struct list args; /* Arguments added when the SPOE messages is sent */
173 struct list list; /* Used to chain SPOE messages */
174
175 enum spoe_event event; /* SPOE_EV_* */
176};
177
178/* Describe a SPOE agent. */
179struct spoe_agent {
180 char *id; /* SPOE agent id (name) */
181 struct {
182 char *file; /* file where the SPOE agent appears */
183 int line; /* line where the SPOE agent appears */
184 } conf; /* config information */
185 union {
186 struct proxy *be; /* Backend used by this agent */
187 char *name; /* Backend name used during conf parsing */
188 } b;
189 struct {
Christopher Fauletf7a30922016-11-10 15:04:51 +0100190 unsigned int hello; /* Max time to receive AGENT-HELLO frame (in SPOE applet) */
191 unsigned int idle; /* Max Idle timeout (in SPOE applet) */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100192 unsigned int processing; /* Max time to process an event (in the main stream) */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200193 } timeout;
194
195 char *var_pfx; /* Prefix used for vars set by the agent */
Christopher Faulet985532d2016-11-16 15:36:19 +0100196 char *var_on_error; /* Variable to set when an error occured, in the TXN scope */
Christopher Fauletea62c2a2016-11-14 10:54:21 +0100197 unsigned int flags; /* SPOE_FL_* */
Christopher Faulet48026722016-11-16 15:01:12 +0100198 unsigned int cps_max; /* Maximum number of connections per second */
199 unsigned int eps_max; /* Maximum number of errors per second */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200200
201 struct list cache; /* List used to cache SPOE streams. In
202 * fact, we cache the SPOE applect ctx */
203
204 struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
205 * for each supported events */
206
207 struct list applet_wq; /* List of streams waiting for a SPOE applet */
Christopher Faulet48026722016-11-16 15:01:12 +0100208 struct freq_ctr conn_per_sec; /* connections per second */
209 struct freq_ctr err_per_sec; /* connetion errors per second */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200210};
211
212/* SPOE filter configuration */
213struct spoe_config {
214 struct proxy *proxy; /* Proxy owning the filter */
215 struct spoe_agent *agent; /* Agent used by this filter */
216 struct proxy agent_fe; /* Agent frontend */
217};
218
219/* SPOE context attached to a stream. It is the main structure that handles the
220 * processing offload */
221struct spoe_context {
222 struct filter *filter; /* The SPOE filter */
223 struct stream *strm; /* The stream that should be offloaded */
224 struct appctx *appctx; /* The SPOE appctx */
225 struct list *messages; /* List of messages that will be sent during the stream processing */
226 struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */
Christopher Fauleta73e59b2016-12-09 17:30:18 +0100227 struct buffer_wait buffer_wait; /* position in the list of streams waiting for a buffer */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200228 struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */
229
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200230 enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
231 unsigned int flags; /* SPOE_CTX_FL_* */
232
233 unsigned int stream_id; /* stream_id and frame_id are used */
234 unsigned int frame_id; /* to map NOTIFY and ACK frames */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100235 unsigned int process_exp; /* expiration date to process an event */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200236};
237
Christopher Faulet3b386a32017-02-23 10:17:15 +0100238/* SPOE filter id. Used to identify SPOE filters */
239const char *spoe_filter_id = "SPOE filter";
240
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200241/* Set if the handle on SIGUSR1 is registered */
242static int sighandler_registered = 0;
243
244/* proxy used during the parsing */
245struct proxy *curproxy = NULL;
246
247/* The name of the SPOE engine, used during the parsing */
248char *curengine = NULL;
249
250/* SPOE agent used during the parsing */
251struct spoe_agent *curagent = NULL;
252
253/* SPOE message used during the parsing */
254struct spoe_message *curmsg = NULL;
255
256/* list of SPOE messages and placeholders used during the parsing */
257struct list curmsgs;
258struct list curmps;
259
260/* Pool used to allocate new SPOE contexts */
261static struct pool_head *pool2_spoe_ctx = NULL;
262
263/* Temporary variables used to ease error processing */
264int spoe_status_code = SPOE_FRM_ERR_NONE;
265char spoe_reason[256];
266
267struct flt_ops spoe_ops;
268
269static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx);
270static void on_new_spoe_appctx_failure(struct spoe_agent *agent);
271static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx);
272
273/********************************************************************
274 * helper functions/globals
275 ********************************************************************/
276static void
277release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp)
278{
279 if (!mp)
280 return;
281 free(mp->id);
282 free(mp);
283}
284
285
286static void
287release_spoe_message(struct spoe_message *msg)
288{
289 struct spoe_arg *arg, *back;
290
291 if (!msg)
292 return;
293 free(msg->id);
294 free(msg->conf.file);
295 list_for_each_entry_safe(arg, back, &msg->args, list) {
296 release_sample_expr(arg->expr);
297 free(arg->name);
298 LIST_DEL(&arg->list);
299 free(arg);
300 }
301 free(msg);
302}
303
304static void
305release_spoe_agent(struct spoe_agent *agent)
306{
307 struct spoe_message *msg, *back;
308 int i;
309
310 if (!agent)
311 return;
312 free(agent->id);
313 free(agent->conf.file);
314 free(agent->var_pfx);
Christopher Faulet985532d2016-11-16 15:36:19 +0100315 free(agent->var_on_error);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200316 for (i = 0; i < SPOE_EV_EVENTS; ++i) {
317 list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
318 LIST_DEL(&msg->list);
319 release_spoe_message(msg);
320 }
321 }
322 free(agent);
323}
324
325static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
326 [SPOE_FRM_ERR_NONE] = "normal",
327 [SPOE_FRM_ERR_IO] = "I/O error",
328 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
329 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
330 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
331 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
332 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
333 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
334 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
335 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
336 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
337};
338
339static const char *spoe_event_str[SPOE_EV_EVENTS] = {
340 [SPOE_EV_ON_CLIENT_SESS] = "on-client-session",
341 [SPOE_EV_ON_TCP_REQ_FE] = "on-frontend-tcp-request",
342 [SPOE_EV_ON_TCP_REQ_BE] = "on-backend-tcp-request",
343 [SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request",
344 [SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request",
345
346 [SPOE_EV_ON_SERVER_SESS] = "on-server-session",
347 [SPOE_EV_ON_TCP_RSP] = "on-tcp-response",
348 [SPOE_EV_ON_HTTP_RSP] = "on-http-response",
349};
350
351
#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)

/* Names of the SPOE context states, indexed by SPOE_CTX_ST_* value. Only
 * compiled in debug builds. */
static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
	[SPOE_CTX_ST_NONE]         = "NONE",
	[SPOE_CTX_ST_READY]        = "READY",
	[SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
	[SPOE_CTX_ST_WAITING_ACK]  = "WAITING_ACK",
	[SPOE_CTX_ST_DONE]         = "DONE",
	[SPOE_CTX_ST_ERROR]        = "ERROR",
};

/* Names of the SPOE applet states, indexed by SPOE_APPCTX_ST_* value. Only
 * compiled in debug builds. */
static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
	[SPOE_APPCTX_ST_CONNECT]       = "CONNECT",
	[SPOE_APPCTX_ST_CONNECTING]    = "CONNECTING",
	[SPOE_APPCTX_ST_PROCESSING]    = "PROCESSING",
	[SPOE_APPCTX_ST_DISCONNECT]    = "DISCONNECT",
	[SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
	[SPOE_APPCTX_ST_EXIT]          = "EXIT",
	[SPOE_APPCTX_ST_END]           = "END",
};

#endif
374/********************************************************************
375 * Functions that encode/decode SPOE frames
376 ********************************************************************/
/* Types of the frames exchanged between HAProxy and the agents */
enum spoe_frame_type {
	/* Frames sent by HAProxy */
	SPOE_FRM_T_HAPROXY_HELLO  = 1,
	SPOE_FRM_T_HAPROXY_DISCON = 2,
	SPOE_FRM_T_HAPROXY_NOTIFY = 3,

	/* Frames sent by the agents */
	SPOE_FRM_T_AGENT_HELLO    = 101,
	SPOE_FRM_T_AGENT_DISCON   = 102,
	SPOE_FRM_T_AGENT_ACK      = 103
};
389
/* Types of the typed data found in frames. SPOE_DATA_TYPES is the number of
 * types. */
enum spoe_data_type {
	SPOE_DATA_T_NULL   = 0,
	SPOE_DATA_T_BOOL   = 1,
	SPOE_DATA_T_INT32  = 2,
	SPOE_DATA_T_UINT32 = 3,
	SPOE_DATA_T_INT64  = 4,
	SPOE_DATA_T_UINT64 = 5,
	SPOE_DATA_T_IPV4   = 6,
	SPOE_DATA_T_IPV6   = 7,
	SPOE_DATA_T_STR    = 8,
	SPOE_DATA_T_BIN    = 9,
	SPOE_DATA_TYPES    = 10
};
404
/* The first byte of a typed data holds its type in the low nibble and its
 * flags in the high nibble */
#define SPOE_DATA_T_MASK   0x0F
#define SPOE_DATA_FL_MASK  0xF0

/* Flags carrying the value of a Boolean */
#define SPOE_DATA_FL_FALSE 0x00
#define SPOE_DATA_FL_TRUE  0x10

/* Length of a string literal, excluding the terminating null byte */
#define SLEN(str) (sizeof(str)-1)

/* Predefined keys used in HELLO/DISCONNECT frames */
#define SUPPORTED_VERSIONS_KEY "supported-versions"
#define VERSION_KEY            "version"
#define MAX_FRAME_SIZE_KEY     "max-frame-size"
#define CAPABILITIES_KEY       "capabilities"
#define HEALTHCHECK_KEY        "healthcheck"
#define STATUS_CODE_KEY        "status-code"
#define MSG_KEY                "message"
424
/* A supported version: its textual form and the inclusive range [min, max] of
 * decoded version numbers it covers (see supported_versions). */
struct spoe_version {
	char *str; /* Textual form, NULL in the last entry of a table */
	int   min; /* Lowest decoded version number accepted */
	int   max; /* Highest decoded version number accepted */
};
430
431/* All supported versions */
432static struct spoe_version supported_versions[] = {
433 {"1.0", 1000, 1000},
434 {NULL, 0, 0}
435};
436
437/* Comma-separated list of supported versions */
438#define SUPPORTED_VERSIONS_VAL "1.0"
439
440/* Comma-separated list of supported capabilities (none for now) */
441#define CAPABILITIES_VAL ""
442
/* Decode a version given as a decimal string of <len> bytes, which does not
 * need to be NUL-terminated (e.g. "1.0"). Leading and trailing white spaces
 * are ignored. It returns the version multiplied by 1000 and rounded to the
 * nearest integer ("1.0" gives 1000), or -1 if the string is not a number, has
 * trailing garbage, is not strictly positive or does not fit into an int. */
static int
decode_spoe_version(const char *str, size_t len)
{
	char   tmp[len+1], *start, *end;
	double d;

	/* strtod() needs a NUL-terminated string */
	memcpy(tmp, str, len);
	tmp[len] = 0;

	/* <ctype.h> functions take an unsigned char value: a plain char
	 * holding a byte >= 0x80 may be negative, which is undefined. */
	start = tmp;
	while (isspace((unsigned char)*start))
		start++;

	d = strtod(start, &end);
	if (start == end || !(d > 0)) /* no number, zero, negative or NaN */
		return -1;

	while (isspace((unsigned char)*end))
		end++;
	if (*end)
		return -1;

	/* Round instead of truncating: 1.001 is stored as 1.000999..., so a
	 * plain cast of d*1000 would give 1000 and accept it as "1.0". */
	d = d * 1000 + 0.5;
	if (d >= (double)(~0U >> 1)) /* would overflow the int conversion */
		return -1;
	return (int)d;
}
471
/* Encode <i> as a variable-length integer into <buf>. A value below 240 takes
 * a single byte. Otherwise the first byte has its 4 high bits set and carries
 * the 4 low bits of the value, and each following byte carries 7 more bits,
 * its high bit being set while other bytes follow. This function never fails
 * and returns the number of written bytes. */
static int
encode_spoe_varint(uint64_t i, char *buf)
{
	int n = 0;

	if (i >= 240) {
		buf[n++] = (unsigned char)i | 240;
		for (i = (i - 240) >> 4; i >= 128; i = (i - 128) >> 7)
			buf[n++] = (unsigned char)i | 128;
	}
	buf[n++] = (unsigned char)i;
	return n;
}
493
/* Decode a variable-length integer starting at <buf>. <end> points one byte
 * past the last readable byte. On success the value is stored in <*i> and the
 * number of read bytes is returned. -1 is returned if the end of the buffer is
 * reached before the integer is complete, or if the encoding is too long to
 * fit into 64 bits; <*i> may have been modified in that case. */
static int
decode_spoe_varint(const char *buf, const char *end, uint64_t *i)
{
	const unsigned char *msg = (const unsigned char *)buf;
	int idx = 0;

	/* <end> is exclusive: nothing can be read when buf == end */
	if (buf >= end)
		return -1;

	*i = msg[0];
	if (msg[0] < 240)
		return 1;

	do {
		++idx;
		/* The shift below is 4 + 7*(idx-1): from the 10th continuation
		 * byte it would reach 64 bits, which is undefined. A 64-bit
		 * value never needs more than 9 of them. */
		if (idx > 9 || idx >= end - buf)
			return -1;
		*i += (uint64_t)msg[idx] << (4 + 7 * (idx-1));
	} while (msg[idx] >= 128);
	return (idx + 1);
}
519
/* Encode the <len> bytes of <str> into <dst>: the length comes first, encoded
 * as a variable-length integer, followed by the bytes themselves. An empty
 * string is encoded as a single null byte and <str> is not read. This function
 * never fails and returns the number of written bytes. */
static int
encode_spoe_string(const char *str, size_t len, char *dst)
{
	int hdr;

	if (len == 0) {
		*dst = 0;
		return 1;
	}

	hdr = encode_spoe_varint(len, dst);
	memcpy(dst + hdr, str, len);
	return (hdr + len);
}
537
/* Decode a string located at <buf>, <end> pointing one byte past the last
 * readable byte. Its length is decoded first as a variable-length integer. If
 * it succeeds and if the whole string lies inside the buffer, the beginning of
 * the string is saved in <*str>, its length in <*len> and the total number of
 * bytes read is returned. If an error occurred, -1 is returned and <*str>
 * remains NULL. */
static int
decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
{
	int64_t room;
	int     hdr;

	*str = NULL;
	*len = 0;

	if ((hdr = decode_spoe_varint(buf, end, len)) == -1)
		return -1;

	/* Compare sizes, not pointers: <*len> comes from the peer and may be
	 * any 64-bit value, so "buf + hdr + *len" could wrap around and look
	 * like it is inside the buffer. */
	room = (int64_t)(end - buf) - hdr;
	if (room < 0 || *len > (uint64_t)room)
		return -1;

	*str = buf + hdr;
	return (hdr + *len);
}
563
564/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
565 * of bytes read is returned. A types data is composed of a type (1 byte) and
566 * corresponding data:
567 * - boolean: non additional data (0 bytes)
568 * - integers: a variable-length integer (see decode_spoe_varint)
569 * - ipv4: 4 bytes
570 * - ipv6: 16 bytes
571 * - binary and string: a buffer prefixed by its size, a variable-length
572 * integer (see decode_spoe_string) */
573static int
574skip_spoe_data(char *frame, char *end)
575{
576 uint64_t sz = 0;
577 int i, idx = 0;
578
579 if (frame > end)
580 return -1;
581
582 switch (frame[idx++] & SPOE_DATA_T_MASK) {
583 case SPOE_DATA_T_BOOL:
584 break;
585 case SPOE_DATA_T_INT32:
586 case SPOE_DATA_T_INT64:
587 case SPOE_DATA_T_UINT32:
588 case SPOE_DATA_T_UINT64:
589 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
590 return -1;
591 idx += i;
592 break;
593 case SPOE_DATA_T_IPV4:
594 idx += 4;
595 break;
596 case SPOE_DATA_T_IPV6:
597 idx += 16;
598 break;
599 case SPOE_DATA_T_STR:
600 case SPOE_DATA_T_BIN:
601 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
602 return -1;
603 idx += i + sz;
604 break;
605 }
606
607 if (frame+idx > end)
608 return -1;
609 return idx;
610}
611
612/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
613 * number of read bytes is returned. See skip_spoe_data for details. */
614static int
615decode_spoe_data(char *frame, char *end, struct sample *smp)
616{
617 uint64_t sz = 0;
618 int type, i, idx = 0;
619
620 if (frame > end)
621 return -1;
622
623 type = frame[idx++];
624 switch (type & SPOE_DATA_T_MASK) {
625 case SPOE_DATA_T_BOOL:
626 smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
627 smp->data.type = SMP_T_BOOL;
628 break;
629 case SPOE_DATA_T_INT32:
630 case SPOE_DATA_T_INT64:
631 case SPOE_DATA_T_UINT32:
632 case SPOE_DATA_T_UINT64:
633 if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1)
634 return -1;
635 idx += i;
636 smp->data.type = SMP_T_SINT;
637 break;
638 case SPOE_DATA_T_IPV4:
639 if (frame+idx+4 > end)
640 return -1;
641 memcpy(&smp->data.u.ipv4, frame+idx, 4);
642 smp->data.type = SMP_T_IPV4;
643 idx += 4;
644 break;
645 case SPOE_DATA_T_IPV6:
646 if (frame+idx+16 > end)
647 return -1;
648 memcpy(&smp->data.u.ipv6, frame+idx, 16);
649 smp->data.type = SMP_T_IPV6;
650 idx += 16;
651 break;
652 case SPOE_DATA_T_STR:
653 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
654 return -1;
655 idx += i;
656 if (frame+idx+sz > end)
657 return -1;
658 smp->data.u.str.str = frame+idx;
659 smp->data.u.str.len = sz;
660 smp->data.type = SMP_T_STR;
661 idx += sz;
662 break;
663 case SPOE_DATA_T_BIN:
664 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
665 return -1;
666 idx += i;
667 if (frame+idx+sz > end)
668 return -1;
669 smp->data.u.str.str = frame+idx;
670 smp->data.u.str.len = sz;
671 smp->data.type = SMP_T_BIN;
672 idx += sz;
673 break;
674 }
675
676 if (frame+idx > end)
677 return -1;
678 return idx;
679}
680
/* Skip an action in a frame received from an agent. If an error occurred, -1
 * is returned, otherwise the number of read bytes is returned. An action is
 * composed of the action type (1 byte), the number of arguments (1 byte) and
 * as many typed data (see skip_spoe_data). */
static int
skip_spoe_action(char *frame, char *end)
{
	int nbargs, len, pos;

	/* The action type and the number of arguments are mandatory */
	if (end < frame + 2)
		return -1;

	nbargs = frame[1];
	pos    = 2;
	for (; nbargs > 0; nbargs--) {
		len = skip_spoe_data(frame + pos, end);
		if (len == -1)
			return -1;
		pos += len;
	}

	return (frame + pos > end) ? -1 : pos;
}
704
705/* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on
706 * success, 0 if the frame can be ignored and -1 if an error occurred. */
707static int
708prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
709{
710 int idx = 0;
711 size_t max = (7 /* TYPE + METADATA */
712 + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
713 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
714 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL));
715
716 if (size < max)
717 return -1;
718
719 /* Frame type */
720 frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
721
722 /* No flags for now */
723 memset(frame+idx, 0, 4);
724 idx += 4;
725
726 /* No stream-id and frame-id for HELLO frames */
727 frame[idx++] = 0;
728 frame[idx++] = 0;
729
730 /* There are 3 mandatory items: "supported-versions", "max-frame-size"
731 * and "capabilities" */
732
733 /* "supported-versions" K/V item */
734 idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx);
735 frame[idx++] = SPOE_DATA_T_STR;
736 idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx);
737
738 /* "max-fram-size" K/V item */
739 idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
740 frame[idx++] = SPOE_DATA_T_UINT32;
741 idx += encode_spoe_varint(APPCTX_SPOE(appctx).max_frame_size, frame+idx);
742
743 /* "capabilities" K/V item */
744 idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
745 frame[idx++] = SPOE_DATA_T_STR;
746 idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
747
748 return idx;
749}
750
751/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame
752 * size on success, 0 if the frame can be ignored and -1 if an error
753 * occurred. */
754static int
755prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
756{
757 const char *reason;
758 int rlen, idx = 0;
759 size_t max = (7 /* TYPE + METADATA */
760 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
761 + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
762
763 if (size < max)
764 return -1;
765
766 /* Get the message corresponding to the status code */
767 if (spoe_status_code >= SPOE_FRM_ERRS)
768 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
769 reason = spoe_frm_err_reasons[spoe_status_code];
770 rlen = strlen(reason);
771
772 /* Frame type */
773 frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
774
775 /* No flags for now */
776 memset(frame+idx, 0, 4);
777 idx += 4;
778
779 /* No stream-id and frame-id for DISCONNECT frames */
780 frame[idx++] = 0;
781 frame[idx++] = 0;
782
783 /* There are 2 mandatory items: "status-code" and "message" */
784
785 /* "status-code" K/V item */
786 idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx);
787 frame[idx++] = SPOE_DATA_T_UINT32;
788 idx += encode_spoe_varint(spoe_status_code, frame+idx);
789
790 /* "message" K/V item */
791 idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx);
792 frame[idx++] = SPOE_DATA_T_STR;
793 idx += encode_spoe_string(reason, rlen, frame+idx);
794
795 return idx;
796}
797
798/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
799 * success, 0 if the frame can be ignored and -1 if an error occurred. */
800static int
801prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
802{
803 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
804 int idx = 0;
805
806 if (size < APPCTX_SPOE(appctx).max_frame_size)
807 return -1;
808
809 frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
810
811 /* No flags for now */
812 memset(frame+idx, 0, 4);
813 idx += 4;
814
815 /* Set stream-id and frame-id */
816 idx += encode_spoe_varint(ctx->stream_id, frame+idx);
817 idx += encode_spoe_varint(ctx->frame_id, frame+idx);
818
819 /* Copy encoded messages */
820 memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
821 idx += ctx->buffer->i;
822
823 return idx;
824}
825
826/* Decode HELLO frame sent by an agent. It returns the number of by read bytes
827 * on success, 0 if the frame can be ignored and -1 if an error occurred. */
828static int
829handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
830{
831 int vsn, max_frame_size;
832 int i, idx = 0;
833 size_t min_size = (7 /* TYPE + METADATA */
834 + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
835 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
836 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
837
838 /* Check frame type */
839 if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
840 return 0;
841
842 if (size < min_size) {
843 spoe_status_code = SPOE_FRM_ERR_INVALID;
844 return -1;
845 }
846
847 /* Skip flags: fragmentation is not supported for now */
848 idx += 4;
849
850 /* stream-id and frame-id must be cleared */
851 if (frame[idx] != 0 || frame[idx+1] != 0) {
852 spoe_status_code = SPOE_FRM_ERR_INVALID;
853 return -1;
854 }
855 idx += 2;
856
857 /* There are 3 mandatory items: "version", "max-frame-size" and
858 * "capabilities" */
859
860 /* Loop on K/V items */
861 vsn = max_frame_size = 0;
862 while (idx < size) {
863 char *str;
864 uint64_t sz;
865
866 /* Decode the item key */
867 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
868 if (str == NULL) {
869 spoe_status_code = SPOE_FRM_ERR_INVALID;
870 return -1;
871 }
872 /* Check "version" K/V item */
873 if (!memcmp(str, VERSION_KEY, sz)) {
874 /* The value must be a string */
875 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
876 spoe_status_code = SPOE_FRM_ERR_INVALID;
877 return -1;
878 }
879 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
880 if (str == NULL) {
881 spoe_status_code = SPOE_FRM_ERR_INVALID;
882 return -1;
883 }
884
885 vsn = decode_spoe_version(str, sz);
886 if (vsn == -1) {
887 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
888 return -1;
889 }
890 for (i = 0; supported_versions[i].str != NULL; ++i) {
891 if (vsn >= supported_versions[i].min &&
892 vsn <= supported_versions[i].max)
893 break;
894 }
895 if (supported_versions[i].str == NULL) {
896 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
897 return -1;
898 }
899 }
900 /* Check "max-frame-size" K/V item */
901 else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
902 int type;
903
904 /* The value must be integer */
905 type = frame[idx++];
906 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
907 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
908 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
909 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
910 spoe_status_code = SPOE_FRM_ERR_INVALID;
911 return -1;
912 }
913 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
914 spoe_status_code = SPOE_FRM_ERR_INVALID;
915 return -1;
916 }
917 idx += i;
918 if (sz < MIN_FRAME_SIZE || sz > APPCTX_SPOE(appctx).max_frame_size) {
919 spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
920 return -1;
921 }
922 max_frame_size = sz;
923 }
924 /* Skip "capabilities" K/V item for now */
925 else {
926 /* Silently ignore unknown item */
927 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
928 spoe_status_code = SPOE_FRM_ERR_INVALID;
929 return -1;
930 }
931 idx += i;
932 }
933 }
934
935 /* Final checks */
936 if (!vsn) {
937 spoe_status_code = SPOE_FRM_ERR_NO_VSN;
938 return -1;
939 }
940 if (!max_frame_size) {
941 spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
942 return -1;
943 }
944
945 APPCTX_SPOE(appctx).version = (unsigned int)vsn;
946 APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
947 return idx;
948}
949
950/* Decode DISCONNECT frame sent by an agent. It returns the number of by read
951 * bytes on success, 0 if the frame can be ignored and -1 if an error
952 * occurred. */
953static int
954handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
955{
956 int i, idx = 0;
957 size_t min_size = (7 /* TYPE + METADATA */
958 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
959 + 1 + SLEN(MSG_KEY) + 1 + 1);
960
961 /* Check frame type */
962 if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
963 return 0;
964
965 if (size < min_size) {
966 spoe_status_code = SPOE_FRM_ERR_INVALID;
967 return -1;
968 }
969
970 /* Skip flags: fragmentation is not supported for now */
971 idx += 4;
972
973 /* stream-id and frame-id must be cleared */
974 if (frame[idx] != 0 || frame[idx+1] != 0) {
975 spoe_status_code = SPOE_FRM_ERR_INVALID;
976 return -1;
977 }
978 idx += 2;
979
980 /* There are 2 mandatory items: "status-code" and "message" */
981
982 /* Loop on K/V items */
983 while (idx < size) {
984 char *str;
985 uint64_t sz;
986
987 /* Decode the item key */
988 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
989 if (str == NULL) {
990 spoe_status_code = SPOE_FRM_ERR_INVALID;
991 return -1;
992 }
993
994 /* Check "status-code" K/V item */
995 if (!memcmp(str, STATUS_CODE_KEY, sz)) {
996 int type;
997
998 /* The value must be an integer */
999 type = frame[idx++];
1000 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
1001 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
1002 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
1003 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
1004 spoe_status_code = SPOE_FRM_ERR_INVALID;
1005 return -1;
1006 }
1007 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
1008 spoe_status_code = SPOE_FRM_ERR_INVALID;
1009 return -1;
1010 }
1011 idx += i;
1012 spoe_status_code = sz;
1013 }
1014
1015 /* Check "message" K/V item */
1016 else if (sz && !memcmp(str, MSG_KEY, sz)) {
1017 /* The value must be a string */
1018 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1019 spoe_status_code = SPOE_FRM_ERR_INVALID;
1020 return -1;
1021 }
1022 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1023 if (str == NULL || sz > 255) {
1024 spoe_status_code = SPOE_FRM_ERR_INVALID;
1025 return -1;
1026 }
1027 memcpy(spoe_reason, str, sz);
1028 spoe_reason[sz] = 0;
1029 }
1030 else {
1031 /* Silently ignore unknown item */
1032 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1033 spoe_status_code = SPOE_FRM_ERR_INVALID;
1034 return -1;
1035 }
1036 idx += i;
1037 }
1038 }
1039
1040 return idx;
1041}
1042
1043
1044/* Decode ACK frame sent by an agent. It returns the number of by read bytes on
1045 * success, 0 if the frame can be ignored and -1 if an error occurred. */
1046static int
1047handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
1048{
1049 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1050 uint64_t stream_id, frame_id;
1051 int idx = 0;
1052 size_t min_size = (7 /* TYPE + METADATA */);
1053
1054 /* Check frame type */
1055 if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
1056 return 0;
1057
1058 if (size < min_size) {
1059 spoe_status_code = SPOE_FRM_ERR_INVALID;
1060 return -1;
1061 }
1062
1063 /* Skip flags: fragmentation is not supported for now */
1064 idx += 4;
1065
1066 /* Get the stream-id and the frame-id */
1067 idx += decode_spoe_varint(frame+idx, frame+size, &stream_id);
1068 idx += decode_spoe_varint(frame+idx, frame+size, &frame_id);
1069
1070 /* Check stream-id and frame-id */
1071 if (ctx->stream_id != (unsigned int)stream_id ||
1072 ctx->frame_id != (unsigned int)frame_id)
1073 return 0;
1074
1075 /* Copy encoded actions */
1076 b_reset(ctx->buffer);
1077 memcpy(ctx->buffer->p, frame+idx, size-idx);
1078 ctx->buffer->i = size-idx;
1079
1080 return idx;
1081}
1082
Christopher Fauletba7bc162016-11-07 21:07:38 +01001083/* This function is used in cfgparse.c and declared in proto/checks.h. It
1084 * prepare the request to send to agents during a healthcheck. It returns 0 on
1085 * success and -1 if an error occurred. */
1086int
1087prepare_spoe_healthcheck_request(char **req, int *len)
1088{
1089 struct appctx a;
1090 char *frame, buf[global.tune.bufsize];
1091 unsigned int framesz;
1092 int idx;
1093
1094 memset(&a, 0, sizeof(a));
1095 memset(buf, 0, sizeof(buf));
1096 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1097
1098 frame = buf+4;
1099 idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4);
1100 if (idx <= 0)
1101 return -1;
1102 if (idx + SLEN(HEALTHCHECK_KEY) + 1 > global.tune.bufsize-4)
1103 return -1;
1104
1105 /* "healthcheck" K/V item */
1106 idx += encode_spoe_string(HEALTHCHECK_KEY, SLEN(HEALTHCHECK_KEY), frame+idx);
1107 frame[idx++] = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE);
1108
1109 framesz = htonl(idx);
1110 memcpy(buf, (char *)&framesz, 4);
1111
1112 if ((*req = malloc(idx+4)) == NULL)
1113 return -1;
1114 memcpy(*req, buf, idx+4);
1115 *len = idx+4;
1116 return 0;
1117}
1118
1119/* This function is used in checks.c and declared in proto/checks.h. It decode
1120 * the response received from an agent during a healthcheck. It returns 0 on
1121 * success and -1 if an error occurred. */
1122int
1123handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen)
1124{
1125 struct appctx a;
1126 int r;
1127
1128 memset(&a, 0, sizeof(a));
1129 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1130
1131 if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0)
1132 goto error;
1133 if ((r = handle_spoe_agenthello_frame(&a, frame, size)) <= 0) {
1134 if (r == 0)
1135 spoe_status_code = SPOE_FRM_ERR_INVALID;
1136 goto error;
1137 }
1138
1139 return 0;
1140
1141 error:
1142 if (spoe_status_code >= SPOE_FRM_ERRS)
1143 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
1144 strncpy(err, spoe_frm_err_reasons[spoe_status_code], errlen);
1145 return -1;
1146}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001147
1148/********************************************************************
1149 * Functions that manage the SPOE applet
1150 ********************************************************************/
1151/* Callback function that catches applet timeouts. If a timeout occurred, we set
1152 * <appctx->st1> flag and the SPOE applet is woken up. */
1153static struct task *
1154process_spoe_applet(struct task * task)
1155{
1156 struct appctx *appctx = task->context;
1157
1158 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1159 if (tick_is_expired(task->expire, now_ms)) {
1160 task->expire = TICK_ETERNITY;
1161 appctx->st1 = SPOE_APPCTX_ERR_TOUT;
1162 }
1163 si_applet_want_get(appctx->owner);
1164 appctx_wakeup(appctx);
1165 return task;
1166}
1167
1168/* Remove a SPOE applet from the agent cache */
1169static void
1170remove_spoe_applet_from_cache(struct appctx *appctx)
1171{
1172 struct appctx *a, *back;
1173 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1174
1175 if (LIST_ISEMPTY(&agent->cache))
1176 return;
1177
1178 list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) {
1179 if (a == appctx) {
1180 LIST_DEL(&APPCTX_SPOE(appctx).list);
1181 break;
1182 }
1183 }
1184}
1185
1186
1187/* Callback function that releases a SPOE applet. This happens when the
1188 * connection with the agent is closed. */
1189static void
1190release_spoe_applet(struct appctx *appctx)
1191{
1192 struct stream_interface *si = appctx->owner;
1193 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1194 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1195
1196 if (appctx->st0 == SPOE_APPCTX_ST_CONNECT ||
1197 appctx->st0 == SPOE_APPCTX_ST_CONNECTING)
1198 on_new_spoe_appctx_failure(agent);
1199
1200 if (appctx->st0 != SPOE_APPCTX_ST_END) {
1201 si_shutw(si);
1202 si_shutr(si);
1203 si_ic(si)->flags |= CF_READ_NULL;
1204 appctx->st0 = SPOE_APPCTX_ST_END;
1205 }
1206
1207 if (ctx != NULL) {
1208 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1209 ctx->appctx = NULL;
1210 }
1211
1212 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
1213 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1214 __FUNCTION__, appctx);
1215
1216 /* Release the task attached to the SPOE applet */
1217 if (APPCTX_SPOE(appctx).task) {
1218 task_delete(APPCTX_SPOE(appctx).task);
1219 task_free(APPCTX_SPOE(appctx).task);
1220 }
1221
1222 /* And remove it from the agent cache */
1223 remove_spoe_applet_from_cache(appctx);
1224 APPCTX_SPOE(appctx).ctx = NULL;
1225}
1226
1227/* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when
1228 * the frame can be ignored, 0 to retry later and 1 on success. The frame is
1229 * encoded using the callback function <prepare>. */
1230static int
1231send_spoe_frame(struct appctx *appctx,
1232 int (*prepare)(struct appctx *, char *, size_t))
1233{
1234 struct stream_interface *si = appctx->owner;
1235 int framesz, ret;
1236 uint32_t netint;
1237
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001238 if (si_ic(si)->buf->size == 0)
1239 return -1;
1240
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001241 ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
1242 if (ret <= 0)
1243 goto skip_or_error;
1244 framesz = ret;
1245 netint = htonl(framesz);
1246 ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint));
1247 if (ret > 0)
1248 ret = bi_putblk(si_ic(si), trash.str, framesz);
1249 if (ret <= 0) {
1250 if (ret == -1)
1251 return -1;
1252 return -2;
1253 }
1254 return 1;
1255
1256 skip_or_error:
1257 if (!ret)
1258 return -1;
1259 return -2;
1260}
1261
1262/* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1
1263 * when the frame can be ignored, 0 to retry later and 1 on success. The frame
1264 * is decoded using the callback function <handle>. */
1265static int
1266recv_spoe_frame(struct appctx *appctx,
1267 int (*handle)(struct appctx *, char *, size_t))
1268{
1269 struct stream_interface *si = appctx->owner;
1270 int framesz, ret;
1271 uint32_t netint;
1272
1273 ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0);
1274 if (ret <= 0)
1275 goto empty_or_error;
1276 framesz = ntohl(netint);
1277 if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
1278 spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
1279 return -2;
1280 }
1281
1282 ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint));
1283 if (ret <= 0)
1284 goto empty_or_error;
1285 bo_skip(si_oc(si), ret+sizeof(netint));
1286
1287 /* First check if the received frame is a DISCONNECT frame */
1288 ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz);
1289 if (ret != 0) {
1290 if (ret > 0) {
1291 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1292 " - disconnected by peer (%d): %s\n",
1293 (int)now.tv_sec, (int)now.tv_usec,
1294 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1295 __FUNCTION__, appctx, spoe_status_code,
1296 spoe_reason);
1297 return 2;
1298 }
1299 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1300 " - error on frame (%s)\n",
1301 (int)now.tv_sec, (int)now.tv_usec,
1302 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1303 __FUNCTION__, appctx,
1304 spoe_frm_err_reasons[spoe_status_code]);
1305 return -2;
1306 }
1307 if (handle == NULL)
1308 goto out;
1309
1310 /* If not, try to decode it */
1311 ret = handle(appctx, trash.str, framesz);
1312 if (ret <= 0) {
1313 if (!ret)
1314 return -1;
1315 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1316 " - error on frame (%s)\n",
1317 (int)now.tv_sec, (int)now.tv_usec,
1318 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1319 __FUNCTION__, appctx,
1320 spoe_frm_err_reasons[spoe_status_code]);
1321 return -2;
1322 }
1323 out:
1324 return 1;
1325
1326 empty_or_error:
1327 if (!ret)
1328 return 0;
1329 spoe_status_code = SPOE_FRM_ERR_IO;
1330 return -2;
1331}
1332
1333/* I/O Handler processing messages exchanged with the agent */
1334static void
1335handle_spoe_applet(struct appctx *appctx)
1336{
1337 struct stream_interface *si = appctx->owner;
1338 struct stream *s = si_strm(si);
1339 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1340 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1341 int ret;
1342
1343 switchstate:
1344 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1345 " - appctx-state=%s\n",
1346 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1347 __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);
1348
1349 switch (appctx->st0) {
1350 case SPOE_APPCTX_ST_CONNECT:
1351 spoe_status_code = SPOE_FRM_ERR_NONE;
1352 if (si->state <= SI_ST_CON) {
1353 si_applet_want_put(si);
1354 task_wakeup(s->task, TASK_WOKEN_MSG);
1355 break;
1356 }
1357 else if (si->state != SI_ST_EST) {
1358 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1359 on_new_spoe_appctx_failure(agent);
1360 goto switchstate;
1361 }
1362 ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame);
1363 if (ret < 0) {
1364 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1365 on_new_spoe_appctx_failure(agent);
1366 goto switchstate;
1367 }
1368 else if (!ret)
1369 goto full;
1370
1371 /* Hello frame was sent. Set the hello timeout and
1372 * wait for the reply. */
1373 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
1374 appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
1375 /* fall through */
1376
1377 case SPOE_APPCTX_ST_CONNECTING:
1378 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1379 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1380 on_new_spoe_appctx_failure(agent);
1381 goto switchstate;
1382 }
1383 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1384 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1385 " - Connection timed out\n",
1386 (int)now.tv_sec, (int)now.tv_usec,
1387 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1388 __FUNCTION__, appctx);
1389 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1390 on_new_spoe_appctx_failure(agent);
1391 goto switchstate;
1392 }
1393 ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame);
1394 if (ret < 0) {
1395 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1396 on_new_spoe_appctx_failure(agent);
1397 goto switchstate;
1398 }
1399 if (ret == 2) {
1400 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1401 on_new_spoe_appctx_failure(agent);
1402 goto switchstate;
1403 }
1404 if (!ret)
1405 goto out;
1406
1407 /* hello handshake is finished, set the idle timeout,
1408 * Add the appctx in the agent cache, decrease the
1409 * number of new applets and wake up waiting streams. */
1410 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1411 appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
1412 on_new_spoe_appctx_success(agent, appctx);
1413 break;
1414
1415 case SPOE_APPCTX_ST_PROCESSING:
1416 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1417 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1418 goto switchstate;
1419 }
1420 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1421 spoe_status_code = SPOE_FRM_ERR_TOUT;
1422 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1423 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1424 goto switchstate;
1425 }
1426 if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
1427 ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame);
1428 if (ret < 0) {
1429 if (ret == -1) {
1430 ctx->state = SPOE_CTX_ST_ERROR;
1431 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1432 goto skip_notify_frame;
1433 }
1434 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1435 goto switchstate;
1436 }
1437 else if (!ret)
1438 goto full;
1439 ctx->state = SPOE_CTX_ST_WAITING_ACK;
Christopher Faulet03a34492016-11-19 16:47:56 +01001440 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001441 }
1442
1443 skip_notify_frame:
1444 if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) {
1445 ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame);
1446 if (ret < 0) {
1447 if (ret == -1)
1448 goto skip_notify_frame;
1449 ctx->state = SPOE_CTX_ST_ERROR;
1450 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1451 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1452 goto switchstate;
1453 }
1454 if (!ret)
1455 goto out;
1456 if (ret == 2) {
1457 ctx->state = SPOE_CTX_ST_ERROR;
1458 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1459 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1460 goto switchstate;
1461 }
1462 ctx->state = SPOE_CTX_ST_DONE;
1463 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1464 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1465 }
1466 else {
1467 if (stopping) {
1468 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1469 goto switchstate;
1470 }
1471
1472 ret = recv_spoe_frame(appctx, NULL);
1473 if (ret < 0) {
1474 if (ret == -1)
1475 goto skip_notify_frame;
1476 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1477 goto switchstate;
1478 }
1479 if (!ret)
1480 goto out;
1481 if (ret == 2) {
1482 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1483 goto switchstate;
1484 }
1485 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1486 }
1487 break;
1488
1489 case SPOE_APPCTX_ST_DISCONNECT:
1490 ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame);
1491 if (ret < 0) {
1492 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1493 goto switchstate;
1494 }
1495 else if (!ret)
1496 goto full;
1497 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1498 " - disconnected by HAProxy (%d): %s\n",
1499 (int)now.tv_sec, (int)now.tv_usec,
1500 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1501 __FUNCTION__, appctx, spoe_status_code,
1502 spoe_frm_err_reasons[spoe_status_code]);
1503
Christopher Faulet03a34492016-11-19 16:47:56 +01001504 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001505 appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
1506 /* fall through */
1507
1508 case SPOE_APPCTX_ST_DISCONNECTING:
1509 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1510 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1511 goto switchstate;
1512 }
1513 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1514 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1515 goto switchstate;
1516 }
1517 ret = recv_spoe_frame(appctx, NULL);
1518 if (ret < 0 || ret == 2) {
1519 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1520 goto switchstate;
1521 }
1522 break;
1523
1524 case SPOE_APPCTX_ST_EXIT:
1525 si_shutw(si);
1526 si_shutr(si);
1527 si_ic(si)->flags |= CF_READ_NULL;
1528 appctx->st0 = SPOE_APPCTX_ST_END;
1529 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1530 /* fall through */
1531
1532 case SPOE_APPCTX_ST_END:
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001533 return;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001534 }
1535
1536 out:
1537 if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
1538 task_queue(APPCTX_SPOE(appctx).task);
1539 si_oc(si)->flags |= CF_READ_DONTWAIT;
1540 task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
1541 return;
1542 full:
1543 si_applet_cant_put(si);
1544 goto out;
1545}
1546
1547struct applet spoe_applet = {
1548 .obj_type = OBJ_TYPE_APPLET,
1549 .name = "<SPOE>", /* used for logging */
1550 .fct = handle_spoe_applet,
1551 .release = release_spoe_applet,
1552};
1553
1554/* Create a SPOE applet. On success, the created applet is returned, else
1555 * NULL. */
1556static struct appctx *
1557create_spoe_appctx(struct spoe_config *conf)
1558{
1559 struct appctx *appctx;
1560 struct session *sess;
1561 struct task *task;
1562 struct stream *strm;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001563
1564 if ((appctx = appctx_new(&spoe_applet)) == NULL)
1565 goto out_error;
1566
1567 appctx->st0 = SPOE_APPCTX_ST_CONNECT;
1568 if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
1569 goto out_free_appctx;
1570 APPCTX_SPOE(appctx).task->process = process_spoe_applet;
1571 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1572 APPCTX_SPOE(appctx).task->context = appctx;
1573 APPCTX_SPOE(appctx).agent = conf->agent;
1574 APPCTX_SPOE(appctx).ctx = NULL;
1575 APPCTX_SPOE(appctx).version = 0;
1576 APPCTX_SPOE(appctx).max_frame_size = global.tune.bufsize;
1577 task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
1578
Willy Tarreau5820a362016-12-22 15:59:02 +01001579 sess = session_new(&conf->agent_fe, NULL, &appctx->obj_type);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001580 if (!sess)
1581 goto out_free_spoe;
1582
1583 if ((task = task_new()) == NULL)
1584 goto out_free_sess;
1585
1586 if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
1587 goto out_free_task;
1588
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001589 stream_set_backend(strm, conf->agent->b.be);
1590
1591 /* applet is waiting for data */
1592 si_applet_cant_get(&strm->si[0]);
1593 appctx_wakeup(appctx);
1594
Christopher Faulet48026722016-11-16 15:01:12 +01001595 /* Increase the per-process number of cumulated connections */
1596 if (conf->agent->cps_max > 0)
1597 update_freq_ctr(&conf->agent->conn_per_sec, 1);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001598
1599 strm->do_log = NULL;
1600 strm->res.flags |= CF_READ_DONTWAIT;
1601
1602 conf->agent_fe.feconn++;
1603 jobs++;
1604 totalconn++;
1605
1606 return appctx;
1607
1608 /* Error unrolling */
1609 out_free_task:
1610 task_free(task);
1611 out_free_sess:
1612 session_free(sess);
1613 out_free_spoe:
1614 task_free(APPCTX_SPOE(appctx).task);
1615 out_free_appctx:
1616 appctx_free(appctx);
1617 out_error:
1618 return NULL;
1619}
1620
1621/* Wake up a SPOE applet attached to a SPOE context. */
1622static void
1623wakeup_spoe_appctx(struct spoe_context *ctx)
1624{
1625 if (ctx->appctx == NULL)
1626 return;
1627 if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) {
1628 si_applet_want_get(ctx->appctx->owner);
1629 si_applet_want_put(ctx->appctx->owner);
1630 appctx_wakeup(ctx->appctx);
1631 }
1632}
1633
1634
1635/* Run across the list of pending streams waiting for a SPOE applet and wake the
1636 * first. */
1637static void
1638offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx)
1639{
1640 struct spoe_context *ctx;
1641
Christopher Fauletf7a30922016-11-10 15:04:51 +01001642 if (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING)
1643 return;
1644
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001645 if (LIST_ISEMPTY(&agent->applet_wq))
1646 LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
1647 else {
1648 ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait);
1649 APPCTX_SPOE(appctx).ctx = ctx;
1650 ctx->appctx = appctx;
1651 LIST_DEL(&ctx->applet_wait);
1652 LIST_INIT(&ctx->applet_wait);
1653 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1654 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1655 " - wake up stream to get available SPOE applet\n",
1656 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1657 __FUNCTION__, ctx->strm);
1658 }
1659}
1660
1661/* A failure occurred during SPOE applet creation. */
1662static void
1663on_new_spoe_appctx_failure(struct spoe_agent *agent)
1664{
1665 struct spoe_context *ctx;
1666
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001667 list_for_each_entry(ctx, &agent->applet_wq, applet_wait) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001668 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1669 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1670 " - wake up stream because to SPOE applet connection failed\n",
1671 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1672 __FUNCTION__, ctx->strm);
1673 }
1674}
1675
/* Called when a new SPOE applet finished its hello handshake with the agent.
 * The applet is offered to the waiting streams, or cached (see
 * offer_spoe_appctx()). */
static void
on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx)
{
	offer_spoe_appctx(agent, appctx);
}
1681/* Retrieve a SPOE applet from the agent cache if possible, else create it. It
1682 * returns 1 on success, 0 to retry later and -1 if an error occurred. */
1683static int
1684acquire_spoe_appctx(struct spoe_context *ctx, int dir)
1685{
1686 struct spoe_config *conf = FLT_CONF(ctx->filter);
1687 struct spoe_agent *agent = conf->agent;
1688 struct appctx *appctx;
1689
1690 /* If a process is already started for this SPOE context, retry
1691 * later. */
1692 if (ctx->flags & SPOE_CTX_FL_PROCESS)
1693 goto wait;
1694
1695 /* If needed, initialize the buffer that will be used to encode messages
1696 * and decode actions. */
1697 if (ctx->buffer == &buf_empty) {
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001698 if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
1699 LIST_DEL(&ctx->buffer_wait.list);
1700 LIST_INIT(&ctx->buffer_wait.list);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001701 }
1702
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001703 if (!b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs)) {
1704 LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001705 goto wait;
1706 }
1707 }
1708
1709 /* If the SPOE applet was already set, all is done. */
1710 if (ctx->appctx)
1711 goto success;
1712
1713 /* Else try to retrieve it from the agent cache */
1714 if (!LIST_ISEMPTY(&agent->cache)) {
1715 appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list);
1716 LIST_DEL(&APPCTX_SPOE(appctx).list);
1717 APPCTX_SPOE(appctx).ctx = ctx;
1718 ctx->appctx = appctx;
1719 goto success;
1720 }
1721
Christopher Faulet48026722016-11-16 15:01:12 +01001722 /* If there is no server up for the agent's backend, this is an
1723 * error. */
1724 if (!agent->b.be->srv_act && !agent->b.be->srv_bck)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001725 goto error;
1726
1727 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1728 " - waiting for available SPOE appctx\n",
1729 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
1730 ctx->strm);
1731
1732 /* Else add the stream in the waiting queue. */
1733 if (LIST_ISEMPTY(&ctx->applet_wait))
1734 LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait);
1735
1736 /* Finally, create new SPOE applet if we can */
Christopher Faulet48026722016-11-16 15:01:12 +01001737 if (agent->cps_max > 0) {
1738 if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0))
1739 goto wait;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001740 }
Christopher Faulet48026722016-11-16 15:01:12 +01001741 if (create_spoe_appctx(conf) == NULL)
1742 goto error;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001743
1744 wait:
1745 return 0;
1746
1747 success:
1748 /* Remove the stream from the waiting queue */
1749 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1750 LIST_DEL(&ctx->applet_wait);
1751 LIST_INIT(&ctx->applet_wait);
1752 }
1753
1754 /* Set the right flag to prevent request and response processing
1755 * in same time. */
1756 ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
1757 ? SPOE_CTX_FL_REQ_PROCESS
1758 : SPOE_CTX_FL_RSP_PROCESS);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001759
1760 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1761 " - acquire SPOE appctx %p from cache\n",
1762 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1763 __FUNCTION__, ctx->strm, ctx->appctx);
1764 return 1;
1765
1766 error:
1767 /* Remove the stream from the waiting queue */
1768 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1769 LIST_DEL(&ctx->applet_wait);
1770 LIST_INIT(&ctx->applet_wait);
1771 }
1772
1773 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
Christopher Faulet48026722016-11-16 15:01:12 +01001774 " - failed to acquire SPOE appctx\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001775 (int)now.tv_sec, (int)now.tv_usec, agent->id,
Christopher Faulet48026722016-11-16 15:01:12 +01001776 __FUNCTION__, ctx->strm);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001777 send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n");
1778
1779 return -1;
1780}
1781
1782/* Release a SPOE applet and push it in the agent cache. */
1783static void
1784release_spoe_appctx(struct spoe_context *ctx)
1785{
1786 struct spoe_config *conf = FLT_CONF(ctx->filter);
1787 struct spoe_agent *agent = conf->agent;
1788 struct appctx *appctx = ctx->appctx;
1789
1790 /* Reset the flag to allow next processing */
1791 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
1792
Christopher Fauletf7a30922016-11-10 15:04:51 +01001793 /* Reset processing timer */
1794 ctx->process_exp = TICK_ETERNITY;
1795
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001796 /* Release the buffer if needed */
1797 if (ctx->buffer != &buf_empty) {
1798 b_free(&ctx->buffer);
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001799 offer_buffers(ctx, tasks_run_queue + applets_active_queue);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001800 }
1801
1802 /* If there is no SPOE applet, all is done */
1803 if (!appctx)
1804 return;
1805
1806 /* Else, reassign it or push it in the agent cache */
1807 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1808 " - release SPOE appctx %p\n",
1809 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1810 __FUNCTION__, ctx->strm, appctx);
1811
1812 APPCTX_SPOE(appctx).ctx = NULL;
1813 ctx->appctx = NULL;
1814 offer_spoe_appctx(agent, appctx);
1815}
1816
1817/***************************************************************************
1818 * Functions that process SPOE messages and actions
1819 **************************************************************************/
1820/* Process SPOE messages for a specific event. During the processing, it returns
1821 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1822 * is returned. */
1823static int
1824process_spoe_messages(struct stream *s, struct spoe_context *ctx,
1825 struct list *messages, int dir)
1826{
1827 struct spoe_message *msg;
1828 struct sample *smp;
1829 struct spoe_arg *arg;
1830 char *p;
1831 size_t max_size;
1832 int off, flag, idx = 0;
1833
1834 /* Reserve 32 bytes from the frame Metadata */
1835 max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32;
1836
1837 b_reset(ctx->buffer);
1838 p = ctx->buffer->p;
1839
1840 /* Loop on messages */
1841 list_for_each_entry(msg, messages, list) {
1842 if (idx + msg->id_len + 1 > max_size)
1843 goto skip;
1844
1845 /* Set the message name */
1846 idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
1847
1848 /* Save offset where to store the number of arguments for this
1849 * message */
1850 off = idx++;
1851 p[off] = 0;
1852
1853 /* Loop on arguments */
1854 list_for_each_entry(arg, &msg->args, list) {
1855 p[off]++; /* Increment the number of arguments */
1856
1857 if (idx + arg->name_len + 1 > max_size)
1858 goto skip;
1859
1860 /* Encode the arguement name as a string. It can by NULL */
1861 idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
1862
1863 /* Fetch the arguement value */
1864 smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
1865 if (!smp) {
1866 /* If no value is available, set it to NULL */
1867 p[idx++] = SPOE_DATA_T_NULL;
1868 continue;
1869 }
1870
1871 /* Else, encode the arguement value */
1872 switch (smp->data.type) {
1873 case SMP_T_BOOL:
1874 flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
1875 p[idx++] = (SPOE_DATA_T_BOOL | flag);
1876 break;
1877 case SMP_T_SINT:
1878 p[idx++] = SPOE_DATA_T_INT64;
1879 if (idx + 8 > max_size)
1880 goto skip;
1881 idx += encode_spoe_varint(smp->data.u.sint, p+idx);
1882 break;
1883 case SMP_T_IPV4:
1884 p[idx++] = SPOE_DATA_T_IPV4;
1885 if (idx + 4 > max_size)
1886 goto skip;
1887 memcpy(p+idx, &smp->data.u.ipv4, 4);
1888 idx += 4;
1889 break;
1890 case SMP_T_IPV6:
1891 p[idx++] = SPOE_DATA_T_IPV6;
1892 if (idx + 16 > max_size)
1893 goto skip;
1894 memcpy(p+idx, &smp->data.u.ipv6, 16);
1895 idx += 16;
1896 break;
1897 case SMP_T_STR:
1898 p[idx++] = SPOE_DATA_T_STR;
1899 if (idx + smp->data.u.str.len > max_size)
1900 goto skip;
1901 idx += encode_spoe_string(smp->data.u.str.str,
1902 smp->data.u.str.len,
1903 p+idx);
1904 break;
1905 case SMP_T_BIN:
1906 p[idx++] = SPOE_DATA_T_BIN;
1907 if (idx + smp->data.u.str.len > max_size)
1908 goto skip;
1909 idx += encode_spoe_string(smp->data.u.str.str,
1910 smp->data.u.str.len,
1911 p+idx);
1912 break;
1913 case SMP_T_METH:
1914 if (smp->data.u.meth.meth == HTTP_METH_OTHER) {
1915 p[idx++] = SPOE_DATA_T_STR;
1916 if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size)
1917 goto skip;
1918 idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name,
1919 http_known_methods[smp->data.u.meth.meth].len,
1920 p+idx);
1921 }
1922 else {
1923 p[idx++] = SPOE_DATA_T_STR;
1924 if (idx + smp->data.u.str.len > max_size)
1925 goto skip;
1926 idx += encode_spoe_string(smp->data.u.meth.str.str,
1927 smp->data.u.meth.str.len,
1928 p+idx);
1929 }
1930 break;
1931 default:
1932 p[idx++] = SPOE_DATA_T_NULL;
1933 }
1934 }
1935 }
1936 ctx->buffer->i = idx;
1937 return 1;
1938
1939 skip:
1940 b_reset(ctx->buffer);
1941 return 0;
1942}
1943
1944/* Helper function to set a variable */
1945static void
1946set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1947 struct sample *smp)
1948{
1949 struct spoe_config *conf = FLT_CONF(ctx->filter);
1950 struct spoe_agent *agent = conf->agent;
1951 char varname[64];
1952
1953 memset(varname, 0, sizeof(varname));
1954 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1955 scope, agent->var_pfx, len, name);
1956 vars_set_by_name_ifexist(varname, len, smp);
1957}
1958
1959/* Helper function to unset a variable */
1960static void
1961unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1962 struct sample *smp)
1963{
1964 struct spoe_config *conf = FLT_CONF(ctx->filter);
1965 struct spoe_agent *agent = conf->agent;
1966 char varname[64];
1967
1968 memset(varname, 0, sizeof(varname));
1969 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1970 scope, agent->var_pfx, len, name);
1971 vars_unset_by_name_ifexist(varname, len, smp);
1972}
1973
1974
1975/* Process SPOE actions for a specific event. During the processing, it returns
1976 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1977 * is returned. */
1978static int
1979process_spoe_actions(struct stream *s, struct spoe_context *ctx,
1980 enum spoe_event ev, int dir)
1981{
1982 char *p;
1983 size_t size;
1984 int off, i, idx = 0;
1985
1986 p = ctx->buffer->p;
1987 size = ctx->buffer->i;
1988
1989 while (idx < size) {
1990 char *str;
1991 uint64_t sz;
1992 struct sample smp;
1993 enum spoe_action_type type;
1994
1995 off = idx;
1996 if (idx+2 > size)
1997 goto skip;
1998
1999 type = p[idx++];
2000 switch (type) {
2001 case SPOE_ACT_T_SET_VAR: {
2002 char *scope;
2003
2004 if (p[idx++] != 3)
2005 goto skip_action;
2006
2007 switch (p[idx++]) {
2008 case SPOE_SCOPE_PROC: scope = "proc"; break;
2009 case SPOE_SCOPE_SESS: scope = "sess"; break;
2010 case SPOE_SCOPE_TXN : scope = "txn"; break;
2011 case SPOE_SCOPE_REQ : scope = "req"; break;
2012 case SPOE_SCOPE_RES : scope = "res"; break;
2013 default: goto skip;
2014 }
2015
2016 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2017 if (str == NULL)
2018 goto skip;
2019 memset(&smp, 0, sizeof(smp));
2020 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
Christopher Fauletb5cff602016-11-24 14:53:22 +01002021
2022 if ((i = decode_spoe_data(p+idx, p+size, &smp)) == -1)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002023 goto skip;
Christopher Fauletb5cff602016-11-24 14:53:22 +01002024 idx += i;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002025
2026 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2027 " - set-var '%s.%s.%.*s'\n",
2028 (int)now.tv_sec, (int)now.tv_usec,
2029 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2030 __FUNCTION__, s, scope,
2031 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2032 (int)sz, str);
2033
2034 set_spoe_var(ctx, scope, str, sz, &smp);
2035 break;
2036 }
2037
2038 case SPOE_ACT_T_UNSET_VAR: {
2039 char *scope;
2040
2041 if (p[idx++] != 2)
2042 goto skip_action;
2043
2044 switch (p[idx++]) {
2045 case SPOE_SCOPE_PROC: scope = "proc"; break;
2046 case SPOE_SCOPE_SESS: scope = "sess"; break;
2047 case SPOE_SCOPE_TXN : scope = "txn"; break;
2048 case SPOE_SCOPE_REQ : scope = "req"; break;
2049 case SPOE_SCOPE_RES : scope = "res"; break;
2050 default: goto skip;
2051 }
2052
2053 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2054 if (str == NULL)
2055 goto skip;
2056 memset(&smp, 0, sizeof(smp));
2057 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2058
2059 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2060 " - unset-var '%s.%s.%.*s'\n",
2061 (int)now.tv_sec, (int)now.tv_usec,
2062 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2063 __FUNCTION__, s, scope,
2064 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2065 (int)sz, str);
2066
2067 unset_spoe_var(ctx, scope, str, sz, &smp);
2068 break;
2069 }
2070
2071 default:
2072 skip_action:
2073 if ((i = skip_spoe_action(p+off, p+size)) == -1)
2074 goto skip;
2075 idx += i;
2076 }
2077 }
2078
2079 return 1;
2080 skip:
2081 return 0;
2082}
2083
2084
2085/* Process a SPOE event. First, this functions will process messages attached to
2086 * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
2087 * ACK frame to process corresponding actions. During all the processing, it
2088 * returns 0 and it returns 1 when the processing is finished. If an error
2089 * occurred, -1 is returned. */
2090static int
2091process_spoe_event(struct stream *s, struct spoe_context *ctx,
2092 enum spoe_event ev)
2093{
Christopher Fauletf7a30922016-11-10 15:04:51 +01002094 struct spoe_config *conf = FLT_CONF(ctx->filter);
2095 struct spoe_agent *agent = conf->agent;
2096 int dir, ret = 1;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002097
2098 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2099 " - ctx-state=%s - event=%s\n",
2100 (int)now.tv_sec, (int)now.tv_usec,
Christopher Fauletf7a30922016-11-10 15:04:51 +01002101 agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002102 spoe_event_str[ev]);
2103
Christopher Faulet48026722016-11-16 15:01:12 +01002104 if (agent->eps_max > 0) {
2105 if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
2106 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2107 " - skip event '%s': max EPS reached\n",
2108 (int)now.tv_sec, (int)now.tv_usec,
2109 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2110 goto skip;
2111 }
2112 }
2113
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002114 dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
2115
2116 if (LIST_ISEMPTY(&(ctx->messages[ev])))
2117 goto out;
2118
2119 if (ctx->state == SPOE_CTX_ST_ERROR)
2120 goto error;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002121
2122 if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
2123 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2124 " - failed to process event '%s': timeout\n",
2125 (int)now.tv_sec, (int)now.tv_usec,
2126 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2127 send_log(ctx->strm->be, LOG_WARNING,
2128 "failed to process event '%s': timeout.\n",
2129 spoe_event_str[ev]);
2130 goto error;
2131 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002132
2133 if (ctx->state == SPOE_CTX_ST_READY) {
Christopher Fauletf7a30922016-11-10 15:04:51 +01002134 if (!tick_isset(ctx->process_exp)) {
2135 ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
2136 s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
2137 ctx->process_exp);
2138 }
2139
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002140 ret = acquire_spoe_appctx(ctx, dir);
2141 if (ret <= 0) {
2142 if (!ret)
2143 goto out;
2144 goto error;
2145 }
2146 ctx->state = SPOE_CTX_ST_SENDING_MSGS;
2147 }
2148
2149 if (ctx->appctx == NULL)
2150 goto error;
2151
2152 if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
2153 ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
2154 if (ret <= 0) {
2155 if (!ret)
2156 goto skip;
2157 goto error;
2158 }
2159 wakeup_spoe_appctx(ctx);
2160 ret = 0;
2161 goto out;
2162 }
2163
2164 if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
2165 wakeup_spoe_appctx(ctx);
2166 ret = 0;
2167 goto out;
2168 }
2169
2170 if (ctx->state == SPOE_CTX_ST_DONE) {
2171 ret = process_spoe_actions(s, ctx, ev, dir);
2172 if (ret <= 0) {
2173 if (!ret)
2174 goto skip;
2175 goto error;
2176 }
2177 ctx->frame_id++;
2178 release_spoe_appctx(ctx);
2179 ctx->state = SPOE_CTX_ST_READY;
2180 }
2181
2182 out:
2183 return ret;
2184
2185 skip:
2186 release_spoe_appctx(ctx);
2187 ctx->state = SPOE_CTX_ST_READY;
2188 return 1;
2189
2190 error:
Christopher Faulet48026722016-11-16 15:01:12 +01002191 if (agent->eps_max > 0)
2192 update_freq_ctr(&agent->err_per_sec, 1);
2193
Christopher Faulet985532d2016-11-16 15:36:19 +01002194 if (agent->var_on_error) {
2195 struct sample smp;
2196
2197 memset(&smp, 0, sizeof(smp));
2198 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2199 smp.data.u.sint = 1;
2200 smp.data.type = SMP_T_BOOL;
2201
2202 set_spoe_var(ctx, "txn", agent->var_on_error,
2203 strlen(agent->var_on_error), &smp);
2204 }
2205
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002206 release_spoe_appctx(ctx);
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002207 ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
2208 ? SPOE_CTX_ST_READY
2209 : SPOE_CTX_ST_ERROR);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002210 return 1;
2211}
2212
2213
2214/***************************************************************************
2215 * Functions that create/destroy SPOE contexts
2216 **************************************************************************/
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002217static int wakeup_spoe_context(struct spoe_context *ctx)
2218{
2219 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
2220 return 1;
2221}
2222
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002223static struct spoe_context *
2224create_spoe_context(struct filter *filter)
2225{
2226 struct spoe_config *conf = FLT_CONF(filter);
2227 struct spoe_context *ctx;
2228
2229 ctx = pool_alloc_dirty(pool2_spoe_ctx);
2230 if (ctx == NULL) {
2231 return NULL;
2232 }
2233 memset(ctx, 0, sizeof(*ctx));
2234 ctx->filter = filter;
2235 ctx->state = SPOE_CTX_ST_NONE;
2236 ctx->flags = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002237 ctx->messages = conf->agent->messages;
2238 ctx->buffer = &buf_empty;
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002239 LIST_INIT(&ctx->buffer_wait.list);
2240 ctx->buffer_wait.target = ctx;
2241 ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002242 LIST_INIT(&ctx->applet_wait);
2243
Christopher Fauletf7a30922016-11-10 15:04:51 +01002244 ctx->stream_id = 0;
2245 ctx->frame_id = 1;
2246 ctx->process_exp = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002247
2248 return ctx;
2249}
2250
2251static void
2252destroy_spoe_context(struct spoe_context *ctx)
2253{
2254 if (!ctx)
2255 return;
2256
2257 if (ctx->appctx)
2258 APPCTX_SPOE(ctx->appctx).ctx = NULL;
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002259 if (!LIST_ISEMPTY(&ctx->buffer_wait.list))
2260 LIST_DEL(&ctx->buffer_wait.list);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002261 if (!LIST_ISEMPTY(&ctx->applet_wait))
2262 LIST_DEL(&ctx->applet_wait);
2263 pool_free2(pool2_spoe_ctx, ctx);
2264}
2265
2266static void
2267reset_spoe_context(struct spoe_context *ctx)
2268{
2269 ctx->state = SPOE_CTX_ST_READY;
2270 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2271}
2272
2273
2274/***************************************************************************
2275 * Hooks that manage the filter lifecycle (init/check/deinit)
2276 **************************************************************************/
2277/* Signal handler: Do a soft stop, wakeup SPOE applet */
2278static void
2279sig_stop_spoe(struct sig_handler *sh)
2280{
2281 struct proxy *p;
2282
2283 p = proxy;
2284 while (p) {
2285 struct flt_conf *fconf;
2286
2287 list_for_each_entry(fconf, &p->filter_configs, list) {
Christopher Faulet3b386a32017-02-23 10:17:15 +01002288 struct spoe_config *conf;
2289 struct spoe_agent *agent;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002290 struct appctx *appctx;
2291
Christopher Faulet3b386a32017-02-23 10:17:15 +01002292 if (fconf->id != spoe_filter_id)
2293 continue;
2294
2295 conf = fconf->conf;
2296 agent = conf->agent;
2297
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002298 list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) {
2299 si_applet_want_get(appctx->owner);
2300 si_applet_want_put(appctx->owner);
2301 appctx_wakeup(appctx);
2302 }
2303 }
2304 p = p->next;
2305 }
2306}
2307
2308
2309/* Initialize the SPOE filter. Returns -1 on error, else 0. */
2310static int
2311spoe_init(struct proxy *px, struct flt_conf *fconf)
2312{
2313 struct spoe_config *conf = fconf->conf;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002314
2315 memset(&conf->agent_fe, 0, sizeof(conf->agent_fe));
2316 init_new_proxy(&conf->agent_fe);
2317 conf->agent_fe.parent = conf->agent;
2318 conf->agent_fe.last_change = now.tv_sec;
2319 conf->agent_fe.id = conf->agent->id;
2320 conf->agent_fe.cap = PR_CAP_FE;
2321 conf->agent_fe.mode = PR_MODE_TCP;
2322 conf->agent_fe.maxconn = 0;
2323 conf->agent_fe.options2 |= PR_O2_INDEPSTR;
2324 conf->agent_fe.conn_retries = CONN_RETRIES;
2325 conf->agent_fe.accept = frontend_accept;
2326 conf->agent_fe.srv = NULL;
2327 conf->agent_fe.timeout.client = TICK_ETERNITY;
2328 conf->agent_fe.default_target = &spoe_applet.obj_type;
2329 conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
2330
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002331 if (!sighandler_registered) {
2332 signal_register_fct(0, sig_stop_spoe, 0);
2333 sighandler_registered = 1;
2334 }
2335
2336 return 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002337}
2338
2339/* Free ressources allocated by the SPOE filter. */
2340static void
2341spoe_deinit(struct proxy *px, struct flt_conf *fconf)
2342{
2343 struct spoe_config *conf = fconf->conf;
2344
2345 if (conf) {
2346 struct spoe_agent *agent = conf->agent;
2347 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
2348 struct listener *, by_fe);
2349
2350 free(l);
2351 release_spoe_agent(agent);
2352 free(conf);
2353 }
2354 fconf->conf = NULL;
2355}
2356
2357/* Check configuration of a SPOE filter for a specified proxy.
2358 * Return 1 on error, else 0. */
2359static int
2360spoe_check(struct proxy *px, struct flt_conf *fconf)
2361{
2362 struct spoe_config *conf = fconf->conf;
2363 struct proxy *target;
2364
2365 target = proxy_be_by_name(conf->agent->b.name);
2366 if (target == NULL) {
2367 Alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'"
2368 " declared at %s:%d.\n",
2369 px->id, conf->agent->b.name, conf->agent->id,
2370 conf->agent->conf.file, conf->agent->conf.line);
2371 return 1;
2372 }
2373 if (target->mode != PR_MODE_TCP) {
2374 Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
2375 " at %s:%d does not support HTTP mode.\n",
2376 px->id, target->id, conf->agent->id,
2377 conf->agent->conf.file, conf->agent->conf.line);
2378 return 1;
2379 }
2380
2381 free(conf->agent->b.name);
2382 conf->agent->b.name = NULL;
2383 conf->agent->b.be = target;
2384 return 0;
2385}
2386
2387/**************************************************************************
2388 * Hooks attached to a stream
2389 *************************************************************************/
2390/* Called when a filter instance is created and attach to a stream. It creates
2391 * the context that will be used to process this stream. */
2392static int
2393spoe_start(struct stream *s, struct filter *filter)
2394{
2395 struct spoe_context *ctx;
2396
2397 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2398 (int)now.tv_sec, (int)now.tv_usec,
2399 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2400 __FUNCTION__, s);
2401
2402 ctx = create_spoe_context(filter);
2403 if (ctx == NULL) {
2404 send_log(s->be, LOG_EMERG,
2405 "failed to create SPOE context for proxy %s\n",
2406 s->be->id);
2407 return 0;
2408 }
2409
2410 ctx->strm = s;
2411 ctx->state = SPOE_CTX_ST_READY;
2412 filter->ctx = ctx;
2413
2414 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_FE]))
2415 filter->pre_analyzers |= AN_REQ_INSPECT_FE;
2416
2417 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_BE]))
2418 filter->pre_analyzers |= AN_REQ_INSPECT_BE;
2419
2420 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_RSP]))
2421 filter->pre_analyzers |= AN_RES_INSPECT;
2422
2423 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_FE]))
2424 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE;
2425
2426 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_BE]))
2427 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE;
2428
2429 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_RSP]))
2430 filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE;
2431
2432 return 1;
2433}
2434
2435/* Called when a filter instance is detached from a stream. It release the
2436 * attached SPOE context. */
2437static void
2438spoe_stop(struct stream *s, struct filter *filter)
2439{
2440 struct spoe_context *ctx = filter->ctx;
2441
2442 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2443 (int)now.tv_sec, (int)now.tv_usec,
2444 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2445 __FUNCTION__, s);
2446
2447 if (ctx) {
2448 release_spoe_appctx(ctx);
2449 destroy_spoe_context(ctx);
2450 }
2451}
2452
Christopher Fauletf7a30922016-11-10 15:04:51 +01002453
2454/*
2455 * Called when the stream is woken up because of expired timer.
2456 */
2457static void
2458spoe_check_timeouts(struct stream *s, struct filter *filter)
2459{
2460 struct spoe_context *ctx = filter->ctx;
2461
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002462 if (tick_is_expired(ctx->process_exp, now_ms)) {
2463 s->pending_events |= TASK_WOKEN_MSG;
2464 if (ctx->buffer != &buf_empty) {
2465 b_free(&ctx->buffer);
2466 offer_buffers(ctx, tasks_run_queue + applets_active_queue);
2467 }
2468 }
Christopher Fauletf7a30922016-11-10 15:04:51 +01002469}
2470
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002471/* Called when we are ready to filter data on a channel */
2472static int
2473spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2474{
2475 struct spoe_context *ctx = filter->ctx;
2476 int ret = 1;
2477
2478 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2479 " - ctx-flags=0x%08x\n",
2480 (int)now.tv_sec, (int)now.tv_usec,
2481 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2482 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2483
2484 if (!(chn->flags & CF_ISRESP)) {
2485 if (filter->pre_analyzers & AN_REQ_INSPECT_FE)
2486 chn->analysers |= AN_REQ_INSPECT_FE;
2487 if (filter->pre_analyzers & AN_REQ_INSPECT_BE)
2488 chn->analysers |= AN_REQ_INSPECT_BE;
2489
2490 if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED)
2491 goto out;
2492
2493 ctx->stream_id = s->uniq_id;
2494 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2495 ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
2496 if (ret != 1)
2497 goto out;
2498 }
2499 ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
2500 }
2501 else {
2502 if (filter->pre_analyzers & SPOE_EV_ON_TCP_RSP)
2503 chn->analysers |= AN_RES_INSPECT;
2504
2505 if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
2506 goto out;
2507
2508 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2509 ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
2510 if (ret != 1)
2511 goto out;
2512 }
2513 ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
2514 }
2515
2516 out:
2517 if (!ret) {
2518 channel_dont_read(chn);
2519 channel_dont_close(chn);
2520 }
2521 return ret;
2522}
2523
2524/* Called before a processing happens on a given channel */
2525static int
2526spoe_chn_pre_analyze(struct stream *s, struct filter *filter,
2527 struct channel *chn, unsigned an_bit)
2528{
2529 struct spoe_context *ctx = filter->ctx;
2530 int ret = 1;
2531
2532 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2533 " - ctx-flags=0x%08x - ana=0x%08x\n",
2534 (int)now.tv_sec, (int)now.tv_usec,
2535 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2536 __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2537 ctx->flags, an_bit);
2538
2539 if (ctx->state == SPOE_CTX_ST_NONE || ctx->state == SPOE_CTX_ST_ERROR)
2540 goto out;
2541
2542 switch (an_bit) {
2543 case AN_REQ_INSPECT_FE:
2544 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
2545 break;
2546 case AN_REQ_INSPECT_BE:
2547 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
2548 break;
2549 case AN_RES_INSPECT:
2550 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP);
2551 break;
2552 case AN_REQ_HTTP_PROCESS_FE:
2553 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
2554 break;
2555 case AN_REQ_HTTP_PROCESS_BE:
2556 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
2557 break;
2558 case AN_RES_HTTP_PROCESS_FE:
2559 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
2560 break;
2561 }
2562
2563 out:
2564 if (!ret) {
2565 channel_dont_read(chn);
2566 channel_dont_close(chn);
2567 }
2568 return ret;
2569}
2570
2571/* Called when the filtering on the channel ends. */
2572static int
2573spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2574{
2575 struct spoe_context *ctx = filter->ctx;
2576
2577 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2578 " - ctx-flags=0x%08x\n",
2579 (int)now.tv_sec, (int)now.tv_usec,
2580 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2581 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2582
2583 if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
2584 reset_spoe_context(ctx);
2585 }
2586
2587 return 1;
2588}
2589
2590/********************************************************************
2591 * Functions that manage the filter initialization
2592 ********************************************************************/
2593struct flt_ops spoe_ops = {
2594 /* Manage SPOE filter, called for each filter declaration */
2595 .init = spoe_init,
2596 .deinit = spoe_deinit,
2597 .check = spoe_check,
2598
2599 /* Handle start/stop of SPOE */
Christopher Fauletf7a30922016-11-10 15:04:51 +01002600 .attach = spoe_start,
2601 .detach = spoe_stop,
2602 .check_timeouts = spoe_check_timeouts,
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002603
2604 /* Handle channels activity */
2605 .channel_start_analyze = spoe_start_analyze,
2606 .channel_pre_analyze = spoe_chn_pre_analyze,
2607 .channel_end_analyze = spoe_end_analyze,
2608};
2609
2610
2611static int
2612cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
2613{
2614 const char *err;
2615 int i, err_code = 0;
2616
2617 if ((cfg_scope == NULL && curengine != NULL) ||
2618 (cfg_scope != NULL && curengine == NULL) ||
2619 strcmp(curengine, cfg_scope))
2620 goto out;
2621
2622 if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */
2623 if (!*args[1]) {
2624 Alert("parsing [%s:%d] : missing name for spoe-agent section.\n",
2625 file, linenum);
2626 err_code |= ERR_ALERT | ERR_ABORT;
2627 goto out;
2628 }
2629 if (*args[2]) {
2630 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2631 file, linenum, args[2]);
2632 err_code |= ERR_ALERT | ERR_ABORT;
2633 goto out;
2634 }
2635
2636 err = invalid_char(args[1]);
2637 if (err) {
2638 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2639 file, linenum, *err, args[0], args[1]);
2640 err_code |= ERR_ALERT | ERR_ABORT;
2641 goto out;
2642 }
2643
2644 if (curagent != NULL) {
2645 Alert("parsing [%s:%d] : another spoe-agent section previously defined.\n",
2646 file, linenum);
2647 err_code |= ERR_ALERT | ERR_ABORT;
2648 goto out;
2649 }
2650 if ((curagent = calloc(1, sizeof(*curagent))) == NULL) {
2651 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2652 err_code |= ERR_ALERT | ERR_ABORT;
2653 goto out;
2654 }
2655
2656 curagent->id = strdup(args[1]);
2657 curagent->conf.file = strdup(file);
2658 curagent->conf.line = linenum;
2659 curagent->timeout.hello = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002660 curagent->timeout.idle = TICK_ETERNITY;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002661 curagent->timeout.processing = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002662 curagent->var_pfx = NULL;
Christopher Faulet985532d2016-11-16 15:36:19 +01002663 curagent->var_on_error = NULL;
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002664 curagent->flags = 0;
Christopher Faulet48026722016-11-16 15:01:12 +01002665 curagent->cps_max = 0;
2666 curagent->eps_max = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002667
2668 for (i = 0; i < SPOE_EV_EVENTS; ++i)
2669 LIST_INIT(&curagent->messages[i]);
2670 LIST_INIT(&curagent->cache);
2671 LIST_INIT(&curagent->applet_wq);
2672 }
2673 else if (!strcmp(args[0], "use-backend")) {
2674 if (!*args[1]) {
2675 Alert("parsing [%s:%d] : '%s' expects a backend name.\n",
2676 file, linenum, args[0]);
2677 err_code |= ERR_ALERT | ERR_FATAL;
2678 goto out;
2679 }
2680 if (*args[2]) {
2681 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2682 file, linenum, args[2]);
2683 err_code |= ERR_ALERT | ERR_ABORT;
2684 goto out;
2685 }
2686 free(curagent->b.name);
2687 curagent->b.name = strdup(args[1]);
2688 }
2689 else if (!strcmp(args[0], "messages")) {
2690 int cur_arg = 1;
2691 while (*args[cur_arg]) {
2692 struct spoe_msg_placeholder *mp = NULL;
2693
2694 list_for_each_entry(mp, &curmps, list) {
2695 if (!strcmp(mp->id, args[cur_arg])) {
2696 Alert("parsing [%s:%d]: spoe-message message '%s' already declared.\n",
2697 file, linenum, args[cur_arg]);
2698 err_code |= ERR_ALERT | ERR_FATAL;
2699 goto out;
2700 }
2701 }
2702
2703 if ((mp = calloc(1, sizeof(*mp))) == NULL) {
2704 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2705 err_code |= ERR_ALERT | ERR_ABORT;
2706 goto out;
2707 }
2708 mp->id = strdup(args[cur_arg]);
2709 LIST_ADDQ(&curmps, &mp->list);
2710 cur_arg++;
2711 }
2712 }
2713 else if (!strcmp(args[0], "timeout")) {
2714 unsigned int *tv = NULL;
2715 const char *res;
2716 unsigned timeout;
2717
2718 if (!*args[1]) {
2719 Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n",
2720 file, linenum);
2721 err_code |= ERR_ALERT | ERR_FATAL;
2722 goto out;
2723 }
2724 if (!strcmp(args[1], "hello"))
2725 tv = &curagent->timeout.hello;
2726 else if (!strcmp(args[1], "idle"))
2727 tv = &curagent->timeout.idle;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002728 else if (!strcmp(args[1], "processing"))
2729 tv = &curagent->timeout.processing;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002730 else {
Christopher Faulet03a34492016-11-19 16:47:56 +01002731 Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' or 'processing' (got %s).\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002732 file, linenum, args[1]);
2733 err_code |= ERR_ALERT | ERR_FATAL;
2734 goto out;
2735 }
2736 if (!*args[2]) {
2737 Alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n",
2738 file, linenum, args[1]);
2739 err_code |= ERR_ALERT | ERR_FATAL;
2740 goto out;
2741 }
2742 res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
2743 if (res) {
2744 Alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n",
2745 file, linenum, *res, args[1]);
2746 err_code |= ERR_ALERT | ERR_ABORT;
2747 goto out;
2748 }
2749 if (*args[3]) {
2750 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2751 file, linenum, args[3]);
2752 err_code |= ERR_ALERT | ERR_ABORT;
2753 goto out;
2754 }
2755 *tv = MS_TO_TICKS(timeout);
2756 }
2757 else if (!strcmp(args[0], "option")) {
2758 if (!*args[1]) {
2759 Alert("parsing [%s:%d]: '%s' expects an option name.\n",
2760 file, linenum, args[0]);
2761 err_code |= ERR_ALERT | ERR_FATAL;
2762 goto out;
2763 }
2764 if (!strcmp(args[1], "var-prefix")) {
2765 char *tmp;
2766
2767 if (!*args[2]) {
2768 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2769 file, linenum, args[0],
2770 args[1]);
2771 err_code |= ERR_ALERT | ERR_FATAL;
2772 goto out;
2773 }
2774 tmp = args[2];
2775 while (*tmp) {
2776 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2777 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
2778 file, linenum, args[0], args[1]);
2779 err_code |= ERR_ALERT | ERR_FATAL;
2780 goto out;
2781 }
2782 tmp++;
2783 }
2784 curagent->var_pfx = strdup(args[2]);
2785 }
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002786 else if (!strcmp(args[1], "continue-on-error")) {
2787 if (*args[2]) {
2788 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
Christopher Faulet48026722016-11-16 15:01:12 +01002789 file, linenum, args[2]);
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002790 err_code |= ERR_ALERT | ERR_ABORT;
2791 goto out;
2792 }
2793 curagent->flags |= SPOE_FL_CONT_ON_ERR;
2794 }
Christopher Faulet985532d2016-11-16 15:36:19 +01002795 else if (!strcmp(args[1], "set-on-error")) {
2796 char *tmp;
2797
2798 if (!*args[2]) {
2799 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2800 file, linenum, args[0],
2801 args[1]);
2802 err_code |= ERR_ALERT | ERR_FATAL;
2803 goto out;
2804 }
2805 tmp = args[2];
2806 while (*tmp) {
2807 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2808 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
2809 file, linenum, args[0], args[1]);
2810 err_code |= ERR_ALERT | ERR_FATAL;
2811 goto out;
2812 }
2813 tmp++;
2814 }
2815 curagent->var_on_error = strdup(args[2]);
2816 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002817 else {
2818 Alert("parsing [%s:%d]: option '%s' is not supported.\n",
2819 file, linenum, args[1]);
2820 err_code |= ERR_ALERT | ERR_FATAL;
2821 goto out;
2822 }
Christopher Faulet48026722016-11-16 15:01:12 +01002823 }
2824 else if (!strcmp(args[0], "maxconnrate")) {
2825 if (!*args[1]) {
2826 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
2827 file, linenum, args[0]);
2828 err_code |= ERR_ALERT | ERR_FATAL;
2829 goto out;
2830 }
2831 if (*args[2]) {
2832 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2833 file, linenum, args[2]);
2834 err_code |= ERR_ALERT | ERR_ABORT;
2835 goto out;
2836 }
2837 curagent->cps_max = atol(args[1]);
2838 }
2839 else if (!strcmp(args[0], "maxerrrate")) {
2840 if (!*args[1]) {
2841 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
2842 file, linenum, args[0]);
2843 err_code |= ERR_ALERT | ERR_FATAL;
2844 goto out;
2845 }
2846 if (*args[2]) {
2847 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2848 file, linenum, args[2]);
2849 err_code |= ERR_ALERT | ERR_ABORT;
2850 goto out;
2851 }
2852 curagent->eps_max = atol(args[1]);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002853 }
2854 else if (*args[0]) {
2855 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
2856 file, linenum, args[0]);
2857 err_code |= ERR_ALERT | ERR_FATAL;
2858 goto out;
2859 }
2860 out:
2861 return err_code;
2862}
2863
2864static int
2865cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm)
2866{
2867 struct spoe_message *msg;
2868 struct spoe_arg *arg;
2869 const char *err;
2870 char *errmsg = NULL;
2871 int err_code = 0;
2872
2873 if ((cfg_scope == NULL && curengine != NULL) ||
2874 (cfg_scope != NULL && curengine == NULL) ||
2875 strcmp(curengine, cfg_scope))
2876 goto out;
2877
2878 if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */
2879 if (!*args[1]) {
2880 Alert("parsing [%s:%d] : missing name for spoe-message section.\n",
2881 file, linenum);
2882 err_code |= ERR_ALERT | ERR_ABORT;
2883 goto out;
2884 }
2885 if (*args[2]) {
2886 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2887 file, linenum, args[2]);
2888 err_code |= ERR_ALERT | ERR_ABORT;
2889 goto out;
2890 }
2891
2892 err = invalid_char(args[1]);
2893 if (err) {
2894 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2895 file, linenum, *err, args[0], args[1]);
2896 err_code |= ERR_ALERT | ERR_ABORT;
2897 goto out;
2898 }
2899
2900 list_for_each_entry(msg, &curmsgs, list) {
2901 if (!strcmp(msg->id, args[1])) {
2902 Alert("parsing [%s:%d]: spoe-message section '%s' has the same"
2903 " name as another one declared at %s:%d.\n",
2904 file, linenum, args[1], msg->conf.file, msg->conf.line);
2905 err_code |= ERR_ALERT | ERR_FATAL;
2906 goto out;
2907 }
2908 }
2909
2910 if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) {
2911 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2912 err_code |= ERR_ALERT | ERR_ABORT;
2913 goto out;
2914 }
2915
2916 curmsg->id = strdup(args[1]);
2917 curmsg->id_len = strlen(curmsg->id);
2918 curmsg->event = SPOE_EV_NONE;
2919 curmsg->conf.file = strdup(file);
2920 curmsg->conf.line = linenum;
2921 LIST_INIT(&curmsg->args);
2922 LIST_ADDQ(&curmsgs, &curmsg->list);
2923 }
2924 else if (!strcmp(args[0], "args")) {
2925 int cur_arg = 1;
2926
2927 curproxy->conf.args.ctx = ARGC_SPOE;
2928 curproxy->conf.args.file = file;
2929 curproxy->conf.args.line = linenum;
2930 while (*args[cur_arg]) {
2931 char *delim = strchr(args[cur_arg], '=');
2932 int idx = 0;
2933
2934 if ((arg = calloc(1, sizeof(*arg))) == NULL) {
2935 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2936 err_code |= ERR_ALERT | ERR_ABORT;
2937 goto out;
2938 }
2939
2940 if (!delim) {
2941 arg->name = NULL;
2942 arg->name_len = 0;
2943 delim = args[cur_arg];
2944 }
2945 else {
2946 arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]);
2947 arg->name_len = delim - args[cur_arg];
2948 delim++;
2949 }
Christopher Fauletb0b42382017-02-23 22:41:09 +01002950 arg->expr = sample_parse_expr((char*[]){delim, NULL},
2951 &idx, file, linenum, &errmsg,
2952 &curproxy->conf.args);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002953 if (arg->expr == NULL) {
2954 Alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg);
2955 err_code |= ERR_ALERT | ERR_FATAL;
2956 free(arg->name);
2957 free(arg);
2958 goto out;
2959 }
2960 LIST_ADDQ(&curmsg->args, &arg->list);
2961 cur_arg++;
2962 }
2963 curproxy->conf.args.file = NULL;
2964 curproxy->conf.args.line = 0;
2965 }
2966 else if (!strcmp(args[0], "event")) {
2967 if (!*args[1]) {
2968 Alert("parsing [%s:%d] : missing event name.\n", file, linenum);
2969 err_code |= ERR_ALERT | ERR_ABORT;
2970 goto out;
2971 }
2972 if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS]))
2973 curmsg->event = SPOE_EV_ON_CLIENT_SESS;
2974 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS]))
2975 curmsg->event = SPOE_EV_ON_SERVER_SESS;
2976
2977 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE]))
2978 curmsg->event = SPOE_EV_ON_TCP_REQ_FE;
2979 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE]))
2980 curmsg->event = SPOE_EV_ON_TCP_REQ_BE;
2981 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP]))
2982 curmsg->event = SPOE_EV_ON_TCP_RSP;
2983
2984 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE]))
2985 curmsg->event = SPOE_EV_ON_HTTP_REQ_FE;
2986 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE]))
2987 curmsg->event = SPOE_EV_ON_HTTP_REQ_BE;
2988 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP]))
2989 curmsg->event = SPOE_EV_ON_HTTP_RSP;
2990 else {
2991 Alert("parsing [%s:%d] : unkown event '%s'.\n",
2992 file, linenum, args[1]);
2993 err_code |= ERR_ALERT | ERR_ABORT;
2994 goto out;
2995 }
2996 }
2997 else if (!*args[0]) {
2998 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n",
2999 file, linenum, args[0]);
3000 err_code |= ERR_ALERT | ERR_FATAL;
3001 goto out;
3002 }
3003 out:
3004 free(errmsg);
3005 return err_code;
3006}
3007
3008/* Return -1 on error, else 0 */
3009static int
3010parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
3011 struct flt_conf *fconf, char **err, void *private)
3012{
3013 struct list backup_sections;
3014 struct spoe_config *conf;
3015 struct spoe_message *msg, *msgback;
3016 struct spoe_msg_placeholder *mp, *mpback;
3017 char *file = NULL, *engine = NULL;
3018 int ret, pos = *cur_arg + 1;
3019
3020 conf = calloc(1, sizeof(*conf));
3021 if (conf == NULL) {
3022 memprintf(err, "%s: out of memory", args[*cur_arg]);
3023 goto error;
3024 }
3025 conf->proxy = px;
3026
3027 while (*args[pos]) {
3028 if (!strcmp(args[pos], "config")) {
3029 if (!*args[pos+1]) {
3030 memprintf(err, "'%s' : '%s' option without value",
3031 args[*cur_arg], args[pos]);
3032 goto error;
3033 }
3034 file = args[pos+1];
3035 pos += 2;
3036 }
3037 else if (!strcmp(args[pos], "engine")) {
3038 if (!*args[pos+1]) {
3039 memprintf(err, "'%s' : '%s' option without value",
3040 args[*cur_arg], args[pos]);
3041 goto error;
3042 }
3043 engine = args[pos+1];
3044 pos += 2;
3045 }
3046 else {
3047 memprintf(err, "unknown keyword '%s'", args[pos]);
3048 goto error;
3049 }
3050 }
3051 if (file == NULL) {
3052 memprintf(err, "'%s' : missing config file", args[*cur_arg]);
3053 goto error;
3054 }
3055
3056 /* backup sections and register SPOE sections */
3057 LIST_INIT(&backup_sections);
3058 cfg_backup_sections(&backup_sections);
3059 cfg_register_section("spoe-agent", cfg_parse_spoe_agent);
3060 cfg_register_section("spoe-message", cfg_parse_spoe_message);
3061
3062 /* Parse SPOE filter configuration file */
3063 curengine = engine;
3064 curproxy = px;
3065 curagent = NULL;
3066 curmsg = NULL;
3067 ret = readcfgfile(file);
3068 curproxy = NULL;
3069
3070 /* unregister SPOE sections and restore previous sections */
3071 cfg_unregister_sections();
3072 cfg_restore_sections(&backup_sections);
3073
3074 if (ret == -1) {
3075 memprintf(err, "Could not open configuration file %s : %s",
3076 file, strerror(errno));
3077 goto error;
3078 }
3079 if (ret & (ERR_ABORT|ERR_FATAL)) {
3080 memprintf(err, "Error(s) found in configuration file %s", file);
3081 goto error;
3082 }
3083
3084 /* Check SPOE agent */
3085 if (curagent == NULL) {
3086 memprintf(err, "No SPOE agent found in file %s", file);
3087 goto error;
3088 }
3089 if (curagent->b.name == NULL) {
3090 memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d",
3091 curagent->id, curagent->conf.file, curagent->conf.line);
3092 goto error;
3093 }
Christopher Fauletf7a30922016-11-10 15:04:51 +01003094 if (curagent->timeout.hello == TICK_ETERNITY ||
3095 curagent->timeout.idle == TICK_ETERNITY ||
Christopher Fauletf7a30922016-11-10 15:04:51 +01003096 curagent->timeout.processing == TICK_ETERNITY) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003097 Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
3098 " | While not properly invalid, you will certainly encounter various problems\n"
3099 " | with such a configuration. To fix this, please ensure that all following\n"
Christopher Faulet03a34492016-11-19 16:47:56 +01003100 " | timeouts are set to a non-zero value: 'hello', 'idle', 'processing'.\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003101 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3102 }
3103 if (curagent->var_pfx == NULL) {
3104 char *tmp = curagent->id;
3105
3106 while (*tmp) {
3107 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3108 memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. "
3109 "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n",
3110 curagent->id, curagent->id, curagent->conf.file, curagent->conf.line);
3111 goto error;
3112 }
3113 tmp++;
3114 }
3115 curagent->var_pfx = strdup(curagent->id);
3116 }
3117
3118 if (LIST_ISEMPTY(&curmps)) {
3119 Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",
3120 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3121 goto finish;
3122 }
3123
3124 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3125 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3126 if (!strcmp(msg->id, mp->id)) {
3127 if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) {
3128 if (msg->event == SPOE_EV_ON_TCP_REQ_BE)
3129 msg->event = SPOE_EV_ON_TCP_REQ_FE;
3130 if (msg->event == SPOE_EV_ON_HTTP_REQ_BE)
3131 msg->event = SPOE_EV_ON_HTTP_REQ_FE;
3132 }
3133 if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS ||
3134 msg->event == SPOE_EV_ON_TCP_REQ_FE ||
3135 msg->event == SPOE_EV_ON_HTTP_REQ_FE)) {
3136 Warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n",
3137 px->id, msg->conf.file, msg->conf.line);
3138 goto next;
3139 }
3140 if (msg->event == SPOE_EV_NONE) {
3141 Warning("Proxy '%s': Ignore SPOE message without event at %s:%d.\n",
3142 px->id, msg->conf.file, msg->conf.line);
3143 goto next;
3144 }
3145 msg->agent = curagent;
3146 LIST_DEL(&msg->list);
3147 LIST_ADDQ(&curagent->messages[msg->event], &msg->list);
3148 goto next;
3149 }
3150 }
3151 memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d",
3152 curagent->id, mp->id, curagent->conf.file, curagent->conf.line);
3153 goto error;
3154 next:
3155 continue;
3156 }
3157
3158 finish:
3159 conf->agent = curagent;
3160 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3161 LIST_DEL(&mp->list);
3162 release_spoe_msg_placeholder(mp);
3163 }
3164 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3165 Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n",
3166 px->id, msg->id, msg->conf.file, msg->conf.line);
3167 LIST_DEL(&msg->list);
3168 release_spoe_message(msg);
3169 }
3170
3171 *cur_arg = pos;
Christopher Faulet3b386a32017-02-23 10:17:15 +01003172 fconf->id = spoe_filter_id;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003173 fconf->ops = &spoe_ops;
3174 fconf->conf = conf;
3175 return 0;
3176
3177 error:
3178 release_spoe_agent(curagent);
3179 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3180 LIST_DEL(&mp->list);
3181 release_spoe_msg_placeholder(mp);
3182 }
3183 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3184 LIST_DEL(&msg->list);
3185 release_spoe_message(msg);
3186 }
3187 free(conf);
3188 return -1;
3189}
3190
3191
3192/* Declare the filter parser for "spoe" keyword */
3193static struct flt_kw_list flt_kws = { "SPOE", { }, {
3194 { "spoe", parse_spoe_flt, NULL },
3195 { NULL, NULL, NULL },
3196 }
3197};
3198
3199__attribute__((constructor))
3200static void __spoe_init(void)
3201{
3202 flt_register_keywords(&flt_kws);
3203
3204 LIST_INIT(&curmsgs);
3205 LIST_INIT(&curmps);
3206 pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
3207}
3208
3209__attribute__((destructor))
3210static void
3211__spoe_deinit(void)
3212{
3213 pool_destroy2(pool2_spoe_ctx);
3214}