blob: 776848e487be86afd2148de0df27b59239a90f08 [file] [log] [blame]
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001/*
2 * Stream processing offload engine management.
3 *
4 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12#include <ctype.h>
13#include <errno.h>
14
15#include <common/buffer.h>
16#include <common/cfgparse.h>
17#include <common/compat.h>
18#include <common/config.h>
19#include <common/debug.h>
20#include <common/memory.h>
21#include <common/time.h>
22
23#include <types/arg.h>
24#include <types/filters.h>
25#include <types/global.h>
26#include <types/proxy.h>
27#include <types/sample.h>
28#include <types/stream.h>
29
30#include <proto/arg.h>
31#include <proto/backend.h>
32#include <proto/filters.h>
Christopher Faulet48026722016-11-16 15:01:12 +010033#include <proto/freq_ctr.h>
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020034#include <proto/frontend.h>
35#include <proto/log.h>
36#include <proto/proto_http.h>
37#include <proto/proxy.h>
38#include <proto/sample.h>
39#include <proto/session.h>
40#include <proto/signal.h>
41#include <proto/stream.h>
42#include <proto/stream_interface.h>
43#include <proto/task.h>
44#include <proto/vars.h>
45
46#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
47#define SPOE_PRINTF(x...) fprintf(x)
48#else
49#define SPOE_PRINTF(x...)
50#endif
51
52/* Helper to get ctx inside an appctx */
53#define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe)
54
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020055/* Minimal size for a frame */
56#define MIN_FRAME_SIZE 256
57
Christopher Fauletea62c2a2016-11-14 10:54:21 +010058/* Flags set on the SPOE agent */
59#define SPOE_FL_CONT_ON_ERR 0x00000001 /* Do not stop events processing when an error occurred */
60
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020061/* Flags set on the SPOE context */
62#define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* Set after that on-client-session event was processed */
63#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set after that on-server-session event was processed */
64#define SPOE_CTX_FL_REQ_PROCESS 0x00000004 /* Set when SPOE is processing the request */
65#define SPOE_CTX_FL_RSP_PROCESS 0x00000008 /* Set when SPOE is processing the response */
66
67#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)
68
69#define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, leave it to zero */
70#define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */
71
72/* All possible states for a SPOE context */
73enum spoe_ctx_state {
74 SPOE_CTX_ST_NONE = 0,
75 SPOE_CTX_ST_READY,
76 SPOE_CTX_ST_SENDING_MSGS,
77 SPOE_CTX_ST_WAITING_ACK,
78 SPOE_CTX_ST_DONE,
79 SPOE_CTX_ST_ERROR,
80};
81
82/* All possible states for a SPOE applet */
83enum spoe_appctx_state {
84 SPOE_APPCTX_ST_CONNECT = 0,
85 SPOE_APPCTX_ST_CONNECTING,
86 SPOE_APPCTX_ST_PROCESSING,
87 SPOE_APPCTX_ST_DISCONNECT,
88 SPOE_APPCTX_ST_DISCONNECTING,
89 SPOE_APPCTX_ST_EXIT,
90 SPOE_APPCTX_ST_END,
91};
92
93/* All supported SPOE actions */
94enum spoe_action_type {
95 SPOE_ACT_T_SET_VAR = 1,
96 SPOE_ACT_T_UNSET_VAR,
97 SPOE_ACT_TYPES,
98};
99
100/* All supported SPOE events */
101enum spoe_event {
102 SPOE_EV_NONE = 0,
103
104 /* Request events */
105 SPOE_EV_ON_CLIENT_SESS = 1,
106 SPOE_EV_ON_TCP_REQ_FE,
107 SPOE_EV_ON_TCP_REQ_BE,
108 SPOE_EV_ON_HTTP_REQ_FE,
109 SPOE_EV_ON_HTTP_REQ_BE,
110
111 /* Response events */
112 SPOE_EV_ON_SERVER_SESS,
113 SPOE_EV_ON_TCP_RSP,
114 SPOE_EV_ON_HTTP_RSP,
115
116 SPOE_EV_EVENTS
117};
118
119/* Errors triggerd by SPOE applet */
120enum spoe_frame_error {
121 SPOE_FRM_ERR_NONE = 0,
122 SPOE_FRM_ERR_IO,
123 SPOE_FRM_ERR_TOUT,
124 SPOE_FRM_ERR_TOO_BIG,
125 SPOE_FRM_ERR_INVALID,
126 SPOE_FRM_ERR_NO_VSN,
127 SPOE_FRM_ERR_NO_FRAME_SIZE,
128 SPOE_FRM_ERR_NO_CAP,
129 SPOE_FRM_ERR_BAD_VSN,
130 SPOE_FRM_ERR_BAD_FRAME_SIZE,
131 SPOE_FRM_ERR_UNKNOWN = 99,
132 SPOE_FRM_ERRS,
133};
134
135/* Scopes used for variables set by agents. It is a way to be agnotic to vars
136 * scope. */
137enum spoe_vars_scope {
138 SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC */
139 SPOE_SCOPE_SESS, /* <=> SCOPE_SESS */
140 SPOE_SCOPE_TXN, /* <=> SCOPE_TXN */
141 SPOE_SCOPE_REQ, /* <=> SCOPE_REQ */
142 SPOE_SCOPE_RES, /* <=> SCOPE_RES */
143};
144
145
146/* Describe an argument that will be linked to a message. It is a sample fetch,
147 * with an optional name. */
148struct spoe_arg {
149 char *name; /* Name of the argument, may be NULL */
150 unsigned int name_len; /* The name length, 0 if NULL */
151 struct sample_expr *expr; /* Sample expression */
152 struct list list; /* Used to chain SPOE args */
153};
154
155/* Used during the config parsing only because, when a SPOE agent section is
156 * parsed, messages can be undefined. */
157struct spoe_msg_placeholder {
158 char *id; /* SPOE message placeholder id */
159 struct list list; /* Use to chain SPOE message placeholders */
160};
161
162/* Describe a message that will be sent in a NOTIFY frame. A message has a name,
163 * an argument list (see above) and it is linked to a specific event. */
164struct spoe_message {
165 char *id; /* SPOE message id */
166 unsigned int id_len; /* The message id length */
167 struct spoe_agent *agent; /* SPOE agent owning this SPOE message */
168 struct {
169 char *file; /* file where the SPOE message appears */
170 int line; /* line where the SPOE message appears */
171 } conf; /* config information */
172 struct list args; /* Arguments added when the SPOE messages is sent */
173 struct list list; /* Used to chain SPOE messages */
174
175 enum spoe_event event; /* SPOE_EV_* */
176};
177
178/* Describe a SPOE agent. */
179struct spoe_agent {
180 char *id; /* SPOE agent id (name) */
181 struct {
182 char *file; /* file where the SPOE agent appears */
183 int line; /* line where the SPOE agent appears */
184 } conf; /* config information */
185 union {
186 struct proxy *be; /* Backend used by this agent */
187 char *name; /* Backend name used during conf parsing */
188 } b;
189 struct {
Christopher Fauletf7a30922016-11-10 15:04:51 +0100190 unsigned int hello; /* Max time to receive AGENT-HELLO frame (in SPOE applet) */
191 unsigned int idle; /* Max Idle timeout (in SPOE applet) */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100192 unsigned int processing; /* Max time to process an event (in the main stream) */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200193 } timeout;
194
195 char *var_pfx; /* Prefix used for vars set by the agent */
Christopher Faulet985532d2016-11-16 15:36:19 +0100196 char *var_on_error; /* Variable to set when an error occured, in the TXN scope */
Christopher Fauletea62c2a2016-11-14 10:54:21 +0100197 unsigned int flags; /* SPOE_FL_* */
Christopher Faulet48026722016-11-16 15:01:12 +0100198 unsigned int cps_max; /* Maximum number of connections per second */
199 unsigned int eps_max; /* Maximum number of errors per second */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200200
201 struct list cache; /* List used to cache SPOE streams. In
202 * fact, we cache the SPOE applect ctx */
203
204 struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
205 * for each supported events */
206
207 struct list applet_wq; /* List of streams waiting for a SPOE applet */
Christopher Faulet48026722016-11-16 15:01:12 +0100208 struct freq_ctr conn_per_sec; /* connections per second */
209 struct freq_ctr err_per_sec; /* connetion errors per second */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200210};
211
212/* SPOE filter configuration */
213struct spoe_config {
214 struct proxy *proxy; /* Proxy owning the filter */
215 struct spoe_agent *agent; /* Agent used by this filter */
216 struct proxy agent_fe; /* Agent frontend */
217};
218
219/* SPOE context attached to a stream. It is the main structure that handles the
220 * processing offload */
221struct spoe_context {
222 struct filter *filter; /* The SPOE filter */
223 struct stream *strm; /* The stream that should be offloaded */
224 struct appctx *appctx; /* The SPOE appctx */
225 struct list *messages; /* List of messages that will be sent during the stream processing */
226 struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */
227 struct list buffer_wait; /* position in the list of streams waiting for a buffer */
228 struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */
229
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200230 enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
231 unsigned int flags; /* SPOE_CTX_FL_* */
232
233 unsigned int stream_id; /* stream_id and frame_id are used */
234 unsigned int frame_id; /* to map NOTIFY and ACK frames */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100235 unsigned int process_exp; /* expiration date to process an event */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200236};
237
238/* Set if the handle on SIGUSR1 is registered */
239static int sighandler_registered = 0;
240
241/* proxy used during the parsing */
242struct proxy *curproxy = NULL;
243
244/* The name of the SPOE engine, used during the parsing */
245char *curengine = NULL;
246
247/* SPOE agent used during the parsing */
248struct spoe_agent *curagent = NULL;
249
250/* SPOE message used during the parsing */
251struct spoe_message *curmsg = NULL;
252
253/* list of SPOE messages and placeholders used during the parsing */
254struct list curmsgs;
255struct list curmps;
256
257/* Pool used to allocate new SPOE contexts */
258static struct pool_head *pool2_spoe_ctx = NULL;
259
260/* Temporary variables used to ease error processing */
261int spoe_status_code = SPOE_FRM_ERR_NONE;
262char spoe_reason[256];
263
264struct flt_ops spoe_ops;
265
266static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx);
267static void on_new_spoe_appctx_failure(struct spoe_agent *agent);
268static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx);
269
270/********************************************************************
271 * helper functions/globals
272 ********************************************************************/
273static void
274release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp)
275{
276 if (!mp)
277 return;
278 free(mp->id);
279 free(mp);
280}
281
282
283static void
284release_spoe_message(struct spoe_message *msg)
285{
286 struct spoe_arg *arg, *back;
287
288 if (!msg)
289 return;
290 free(msg->id);
291 free(msg->conf.file);
292 list_for_each_entry_safe(arg, back, &msg->args, list) {
293 release_sample_expr(arg->expr);
294 free(arg->name);
295 LIST_DEL(&arg->list);
296 free(arg);
297 }
298 free(msg);
299}
300
301static void
302release_spoe_agent(struct spoe_agent *agent)
303{
304 struct spoe_message *msg, *back;
305 int i;
306
307 if (!agent)
308 return;
309 free(agent->id);
310 free(agent->conf.file);
311 free(agent->var_pfx);
Christopher Faulet985532d2016-11-16 15:36:19 +0100312 free(agent->var_on_error);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200313 for (i = 0; i < SPOE_EV_EVENTS; ++i) {
314 list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
315 LIST_DEL(&msg->list);
316 release_spoe_message(msg);
317 }
318 }
319 free(agent);
320}
321
322static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
323 [SPOE_FRM_ERR_NONE] = "normal",
324 [SPOE_FRM_ERR_IO] = "I/O error",
325 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
326 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
327 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
328 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
329 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
330 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
331 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
332 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
333 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
334};
335
336static const char *spoe_event_str[SPOE_EV_EVENTS] = {
337 [SPOE_EV_ON_CLIENT_SESS] = "on-client-session",
338 [SPOE_EV_ON_TCP_REQ_FE] = "on-frontend-tcp-request",
339 [SPOE_EV_ON_TCP_REQ_BE] = "on-backend-tcp-request",
340 [SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request",
341 [SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request",
342
343 [SPOE_EV_ON_SERVER_SESS] = "on-server-session",
344 [SPOE_EV_ON_TCP_RSP] = "on-tcp-response",
345 [SPOE_EV_ON_HTTP_RSP] = "on-http-response",
346};
347
348
349#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
350
351static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
352 [SPOE_CTX_ST_NONE] = "NONE",
353 [SPOE_CTX_ST_READY] = "READY",
354 [SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
355 [SPOE_CTX_ST_WAITING_ACK] = "WAITING_ACK",
356 [SPOE_CTX_ST_DONE] = "DONE",
357 [SPOE_CTX_ST_ERROR] = "ERROR",
358};
359
360static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
361 [SPOE_APPCTX_ST_CONNECT] = "CONNECT",
362 [SPOE_APPCTX_ST_CONNECTING] = "CONNECTING",
363 [SPOE_APPCTX_ST_PROCESSING] = "PROCESSING",
364 [SPOE_APPCTX_ST_DISCONNECT] = "DISCONNECT",
365 [SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
366 [SPOE_APPCTX_ST_EXIT] = "EXIT",
367 [SPOE_APPCTX_ST_END] = "END",
368};
369
370#endif
371/********************************************************************
372 * Functions that encode/decode SPOE frames
373 ********************************************************************/
374/* Frame Types sent by HAProxy and by agents */
375enum spoe_frame_type {
376 /* Frames sent by HAProxy */
377 SPOE_FRM_T_HAPROXY_HELLO = 1,
378 SPOE_FRM_T_HAPROXY_DISCON,
379 SPOE_FRM_T_HAPROXY_NOTIFY,
380
381 /* Frames sent by the agents */
382 SPOE_FRM_T_AGENT_HELLO = 101,
383 SPOE_FRM_T_AGENT_DISCON,
384 SPOE_FRM_T_AGENT_ACK
385};
386
387/* All supported data types */
388enum spoe_data_type {
389 SPOE_DATA_T_NULL = 0,
390 SPOE_DATA_T_BOOL,
391 SPOE_DATA_T_INT32,
392 SPOE_DATA_T_UINT32,
393 SPOE_DATA_T_INT64,
394 SPOE_DATA_T_UINT64,
395 SPOE_DATA_T_IPV4,
396 SPOE_DATA_T_IPV6,
397 SPOE_DATA_T_STR,
398 SPOE_DATA_T_BIN,
399 SPOE_DATA_TYPES
400};
401
402/* Masks to get data type or flags value */
403#define SPOE_DATA_T_MASK 0x0F
404#define SPOE_DATA_FL_MASK 0xF0
405
406/* Flags to set Boolean values */
407#define SPOE_DATA_FL_FALSE 0x00
408#define SPOE_DATA_FL_TRUE 0x10
409
410/* Helper to get static string length, excluding the terminating null byte */
411#define SLEN(str) (sizeof(str)-1)
412
413/* Predefined key used in HELLO/DISCONNECT frames */
414#define SUPPORTED_VERSIONS_KEY "supported-versions"
415#define VERSION_KEY "version"
416#define MAX_FRAME_SIZE_KEY "max-frame-size"
417#define CAPABILITIES_KEY "capabilities"
Christopher Fauletba7bc162016-11-07 21:07:38 +0100418#define HEALTHCHECK_KEY "healthcheck"
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200419#define STATUS_CODE_KEY "status-code"
420#define MSG_KEY "message"
421
422struct spoe_version {
423 char *str;
424 int min;
425 int max;
426};
427
428/* All supported versions */
429static struct spoe_version supported_versions[] = {
430 {"1.0", 1000, 1000},
431 {NULL, 0, 0}
432};
433
434/* Comma-separated list of supported versions */
435#define SUPPORTED_VERSIONS_VAL "1.0"
436
437/* Comma-separated list of supported capabilities (none for now) */
438#define CAPABILITIES_VAL ""
439
440static int
441decode_spoe_version(const char *str, size_t len)
442{
443 char tmp[len+1], *start, *end;
444 double d;
445 int vsn = -1;
446
447 memset(tmp, 0, len+1);
448 memcpy(tmp, str, len);
449
450 start = tmp;
451 while (isspace(*start))
452 start++;
453
454 d = strtod(start, &end);
455 if (d == 0 || start == end)
456 goto out;
457
458 if (*end) {
459 while (isspace(*end))
460 end++;
461 if (*end)
462 goto out;
463 }
464 vsn = (int)(d * 1000);
465 out:
466 return vsn;
467}
468
469/* Encode a variable-length integer. This function never fails and returns the
470 * number of written bytes. */
471static int
472encode_spoe_varint(uint64_t i, char *buf)
473{
474 int idx;
475
476 if (i < 240) {
477 buf[0] = (unsigned char)i;
478 return 1;
479 }
480
481 buf[0] = (unsigned char)i | 240;
482 i = (i - 240) >> 4;
483 for (idx = 1; i >= 128; ++idx) {
484 buf[idx] = (unsigned char)i | 128;
485 i = (i - 128) >> 7;
486 }
487 buf[idx++] = (unsigned char)i;
488 return idx;
489}
490
491/* Decode a varable-length integer. If the decoding fails, -1 is returned. This
492 * happens when the buffer's end in reached. On success, the number of read
493 * bytes is returned. */
494static int
495decode_spoe_varint(const char *buf, const char *end, uint64_t *i)
496{
497 unsigned char *msg = (unsigned char *)buf;
498 int idx = 0;
499
500 if (msg > (unsigned char *)end)
501 return -1;
502
503 if (msg[0] < 240) {
504 *i = msg[0];
505 return 1;
506 }
507 *i = msg[0];
508 do {
509 ++idx;
510 if (msg+idx > (unsigned char *)end)
511 return -1;
512 *i += (uint64_t)msg[idx] << (4 + 7 * (idx-1));
513 } while (msg[idx] >= 128);
514 return (idx + 1);
515}
516
517/* Encode a string. The string will be prefix by its length, encoded as a
518 * variable-length integer. This function never fails and returns the number of
519 * written bytes. */
520static int
521encode_spoe_string(const char *str, size_t len, char *dst)
522{
523 int idx = 0;
524
525 if (!len) {
526 dst[0] = 0;
527 return 1;
528 }
529
530 idx += encode_spoe_varint(len, dst);
531 memcpy(dst+idx, str, len);
532 return (idx + len);
533}
534
535/* Decode a string. Its length is decoded first as a variable-length integer. If
536 * it succeeds, and if the string length is valid, the begin of the string is
537 * saved in <*str>, its length is saved in <*len> and the total numbre of bytes
538 * read is returned. If an error occurred, -1 is returned and <*str> remains
539 * NULL. */
540static int
541decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
542{
543 int i, idx = 0;
544
545 *str = NULL;
546 *len = 0;
547
548 if ((i = decode_spoe_varint(buf, end, len)) == -1)
549 goto error;
550 idx += i;
551 if (buf + idx + *len > end)
552 goto error;
553
554 *str = buf+idx;
555 return (idx + *len);
556
557 error:
558 return -1;
559}
560
561/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
562 * of bytes read is returned. A types data is composed of a type (1 byte) and
563 * corresponding data:
564 * - boolean: non additional data (0 bytes)
565 * - integers: a variable-length integer (see decode_spoe_varint)
566 * - ipv4: 4 bytes
567 * - ipv6: 16 bytes
568 * - binary and string: a buffer prefixed by its size, a variable-length
569 * integer (see decode_spoe_string) */
570static int
571skip_spoe_data(char *frame, char *end)
572{
573 uint64_t sz = 0;
574 int i, idx = 0;
575
576 if (frame > end)
577 return -1;
578
579 switch (frame[idx++] & SPOE_DATA_T_MASK) {
580 case SPOE_DATA_T_BOOL:
581 break;
582 case SPOE_DATA_T_INT32:
583 case SPOE_DATA_T_INT64:
584 case SPOE_DATA_T_UINT32:
585 case SPOE_DATA_T_UINT64:
586 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
587 return -1;
588 idx += i;
589 break;
590 case SPOE_DATA_T_IPV4:
591 idx += 4;
592 break;
593 case SPOE_DATA_T_IPV6:
594 idx += 16;
595 break;
596 case SPOE_DATA_T_STR:
597 case SPOE_DATA_T_BIN:
598 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
599 return -1;
600 idx += i + sz;
601 break;
602 }
603
604 if (frame+idx > end)
605 return -1;
606 return idx;
607}
608
609/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
610 * number of read bytes is returned. See skip_spoe_data for details. */
611static int
612decode_spoe_data(char *frame, char *end, struct sample *smp)
613{
614 uint64_t sz = 0;
615 int type, i, idx = 0;
616
617 if (frame > end)
618 return -1;
619
620 type = frame[idx++];
621 switch (type & SPOE_DATA_T_MASK) {
622 case SPOE_DATA_T_BOOL:
623 smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
624 smp->data.type = SMP_T_BOOL;
625 break;
626 case SPOE_DATA_T_INT32:
627 case SPOE_DATA_T_INT64:
628 case SPOE_DATA_T_UINT32:
629 case SPOE_DATA_T_UINT64:
630 if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1)
631 return -1;
632 idx += i;
633 smp->data.type = SMP_T_SINT;
634 break;
635 case SPOE_DATA_T_IPV4:
636 if (frame+idx+4 > end)
637 return -1;
638 memcpy(&smp->data.u.ipv4, frame+idx, 4);
639 smp->data.type = SMP_T_IPV4;
640 idx += 4;
641 break;
642 case SPOE_DATA_T_IPV6:
643 if (frame+idx+16 > end)
644 return -1;
645 memcpy(&smp->data.u.ipv6, frame+idx, 16);
646 smp->data.type = SMP_T_IPV6;
647 idx += 16;
648 break;
649 case SPOE_DATA_T_STR:
650 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
651 return -1;
652 idx += i;
653 if (frame+idx+sz > end)
654 return -1;
655 smp->data.u.str.str = frame+idx;
656 smp->data.u.str.len = sz;
657 smp->data.type = SMP_T_STR;
658 idx += sz;
659 break;
660 case SPOE_DATA_T_BIN:
661 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
662 return -1;
663 idx += i;
664 if (frame+idx+sz > end)
665 return -1;
666 smp->data.u.str.str = frame+idx;
667 smp->data.u.str.len = sz;
668 smp->data.type = SMP_T_BIN;
669 idx += sz;
670 break;
671 }
672
673 if (frame+idx > end)
674 return -1;
675 return idx;
676}
677
678/* Skip an action in a frame received from an agent. If an error occurred, -1 is
679 * returned, otherwise the number of read bytes is returned. An action is
680 * composed of the action type followed by a typed data. */
681static int
682skip_spoe_action(char *frame, char *end)
683{
684 int n, i, idx = 0;
685
686 if (frame+2 > end)
687 return -1;
688
689 idx++; /* Skip the action type */
690 n = frame[idx++];
691 while (n-- > 0) {
692 if ((i = skip_spoe_data(frame+idx, end)) == -1)
693 return -1;
694 idx += i;
695 }
696
697 if (frame+idx > end)
698 return -1;
699 return idx;
700}
701
702/* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on
703 * success, 0 if the frame can be ignored and -1 if an error occurred. */
704static int
705prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
706{
707 int idx = 0;
708 size_t max = (7 /* TYPE + METADATA */
709 + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
710 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
711 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL));
712
713 if (size < max)
714 return -1;
715
716 /* Frame type */
717 frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
718
719 /* No flags for now */
720 memset(frame+idx, 0, 4);
721 idx += 4;
722
723 /* No stream-id and frame-id for HELLO frames */
724 frame[idx++] = 0;
725 frame[idx++] = 0;
726
727 /* There are 3 mandatory items: "supported-versions", "max-frame-size"
728 * and "capabilities" */
729
730 /* "supported-versions" K/V item */
731 idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx);
732 frame[idx++] = SPOE_DATA_T_STR;
733 idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx);
734
735 /* "max-fram-size" K/V item */
736 idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
737 frame[idx++] = SPOE_DATA_T_UINT32;
738 idx += encode_spoe_varint(APPCTX_SPOE(appctx).max_frame_size, frame+idx);
739
740 /* "capabilities" K/V item */
741 idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
742 frame[idx++] = SPOE_DATA_T_STR;
743 idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
744
745 return idx;
746}
747
748/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame
749 * size on success, 0 if the frame can be ignored and -1 if an error
750 * occurred. */
751static int
752prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
753{
754 const char *reason;
755 int rlen, idx = 0;
756 size_t max = (7 /* TYPE + METADATA */
757 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
758 + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
759
760 if (size < max)
761 return -1;
762
763 /* Get the message corresponding to the status code */
764 if (spoe_status_code >= SPOE_FRM_ERRS)
765 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
766 reason = spoe_frm_err_reasons[spoe_status_code];
767 rlen = strlen(reason);
768
769 /* Frame type */
770 frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
771
772 /* No flags for now */
773 memset(frame+idx, 0, 4);
774 idx += 4;
775
776 /* No stream-id and frame-id for DISCONNECT frames */
777 frame[idx++] = 0;
778 frame[idx++] = 0;
779
780 /* There are 2 mandatory items: "status-code" and "message" */
781
782 /* "status-code" K/V item */
783 idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx);
784 frame[idx++] = SPOE_DATA_T_UINT32;
785 idx += encode_spoe_varint(spoe_status_code, frame+idx);
786
787 /* "message" K/V item */
788 idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx);
789 frame[idx++] = SPOE_DATA_T_STR;
790 idx += encode_spoe_string(reason, rlen, frame+idx);
791
792 return idx;
793}
794
795/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
796 * success, 0 if the frame can be ignored and -1 if an error occurred. */
797static int
798prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
799{
800 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
801 int idx = 0;
802
803 if (size < APPCTX_SPOE(appctx).max_frame_size)
804 return -1;
805
806 frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
807
808 /* No flags for now */
809 memset(frame+idx, 0, 4);
810 idx += 4;
811
812 /* Set stream-id and frame-id */
813 idx += encode_spoe_varint(ctx->stream_id, frame+idx);
814 idx += encode_spoe_varint(ctx->frame_id, frame+idx);
815
816 /* Copy encoded messages */
817 memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
818 idx += ctx->buffer->i;
819
820 return idx;
821}
822
823/* Decode HELLO frame sent by an agent. It returns the number of by read bytes
824 * on success, 0 if the frame can be ignored and -1 if an error occurred. */
825static int
826handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
827{
828 int vsn, max_frame_size;
829 int i, idx = 0;
830 size_t min_size = (7 /* TYPE + METADATA */
831 + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
832 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
833 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
834
835 /* Check frame type */
836 if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
837 return 0;
838
839 if (size < min_size) {
840 spoe_status_code = SPOE_FRM_ERR_INVALID;
841 return -1;
842 }
843
844 /* Skip flags: fragmentation is not supported for now */
845 idx += 4;
846
847 /* stream-id and frame-id must be cleared */
848 if (frame[idx] != 0 || frame[idx+1] != 0) {
849 spoe_status_code = SPOE_FRM_ERR_INVALID;
850 return -1;
851 }
852 idx += 2;
853
854 /* There are 3 mandatory items: "version", "max-frame-size" and
855 * "capabilities" */
856
857 /* Loop on K/V items */
858 vsn = max_frame_size = 0;
859 while (idx < size) {
860 char *str;
861 uint64_t sz;
862
863 /* Decode the item key */
864 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
865 if (str == NULL) {
866 spoe_status_code = SPOE_FRM_ERR_INVALID;
867 return -1;
868 }
869 /* Check "version" K/V item */
870 if (!memcmp(str, VERSION_KEY, sz)) {
871 /* The value must be a string */
872 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
873 spoe_status_code = SPOE_FRM_ERR_INVALID;
874 return -1;
875 }
876 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
877 if (str == NULL) {
878 spoe_status_code = SPOE_FRM_ERR_INVALID;
879 return -1;
880 }
881
882 vsn = decode_spoe_version(str, sz);
883 if (vsn == -1) {
884 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
885 return -1;
886 }
887 for (i = 0; supported_versions[i].str != NULL; ++i) {
888 if (vsn >= supported_versions[i].min &&
889 vsn <= supported_versions[i].max)
890 break;
891 }
892 if (supported_versions[i].str == NULL) {
893 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
894 return -1;
895 }
896 }
897 /* Check "max-frame-size" K/V item */
898 else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
899 int type;
900
901 /* The value must be integer */
902 type = frame[idx++];
903 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
904 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
905 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
906 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
907 spoe_status_code = SPOE_FRM_ERR_INVALID;
908 return -1;
909 }
910 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
911 spoe_status_code = SPOE_FRM_ERR_INVALID;
912 return -1;
913 }
914 idx += i;
915 if (sz < MIN_FRAME_SIZE || sz > APPCTX_SPOE(appctx).max_frame_size) {
916 spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
917 return -1;
918 }
919 max_frame_size = sz;
920 }
921 /* Skip "capabilities" K/V item for now */
922 else {
923 /* Silently ignore unknown item */
924 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
925 spoe_status_code = SPOE_FRM_ERR_INVALID;
926 return -1;
927 }
928 idx += i;
929 }
930 }
931
932 /* Final checks */
933 if (!vsn) {
934 spoe_status_code = SPOE_FRM_ERR_NO_VSN;
935 return -1;
936 }
937 if (!max_frame_size) {
938 spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
939 return -1;
940 }
941
942 APPCTX_SPOE(appctx).version = (unsigned int)vsn;
943 APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
944 return idx;
945}
946
947/* Decode DISCONNECT frame sent by an agent. It returns the number of by read
948 * bytes on success, 0 if the frame can be ignored and -1 if an error
949 * occurred. */
950static int
951handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
952{
953 int i, idx = 0;
954 size_t min_size = (7 /* TYPE + METADATA */
955 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
956 + 1 + SLEN(MSG_KEY) + 1 + 1);
957
958 /* Check frame type */
959 if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
960 return 0;
961
962 if (size < min_size) {
963 spoe_status_code = SPOE_FRM_ERR_INVALID;
964 return -1;
965 }
966
967 /* Skip flags: fragmentation is not supported for now */
968 idx += 4;
969
970 /* stream-id and frame-id must be cleared */
971 if (frame[idx] != 0 || frame[idx+1] != 0) {
972 spoe_status_code = SPOE_FRM_ERR_INVALID;
973 return -1;
974 }
975 idx += 2;
976
977 /* There are 2 mandatory items: "status-code" and "message" */
978
979 /* Loop on K/V items */
980 while (idx < size) {
981 char *str;
982 uint64_t sz;
983
984 /* Decode the item key */
985 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
986 if (str == NULL) {
987 spoe_status_code = SPOE_FRM_ERR_INVALID;
988 return -1;
989 }
990
991 /* Check "status-code" K/V item */
992 if (!memcmp(str, STATUS_CODE_KEY, sz)) {
993 int type;
994
995 /* The value must be an integer */
996 type = frame[idx++];
997 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
998 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
999 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
1000 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
1001 spoe_status_code = SPOE_FRM_ERR_INVALID;
1002 return -1;
1003 }
1004 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
1005 spoe_status_code = SPOE_FRM_ERR_INVALID;
1006 return -1;
1007 }
1008 idx += i;
1009 spoe_status_code = sz;
1010 }
1011
1012 /* Check "message" K/V item */
1013 else if (sz && !memcmp(str, MSG_KEY, sz)) {
1014 /* The value must be a string */
1015 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1016 spoe_status_code = SPOE_FRM_ERR_INVALID;
1017 return -1;
1018 }
1019 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1020 if (str == NULL || sz > 255) {
1021 spoe_status_code = SPOE_FRM_ERR_INVALID;
1022 return -1;
1023 }
1024 memcpy(spoe_reason, str, sz);
1025 spoe_reason[sz] = 0;
1026 }
1027 else {
1028 /* Silently ignore unknown item */
1029 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1030 spoe_status_code = SPOE_FRM_ERR_INVALID;
1031 return -1;
1032 }
1033 idx += i;
1034 }
1035 }
1036
1037 return idx;
1038}
1039
1040
1041/* Decode ACK frame sent by an agent. It returns the number of by read bytes on
1042 * success, 0 if the frame can be ignored and -1 if an error occurred. */
1043static int
1044handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
1045{
1046 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1047 uint64_t stream_id, frame_id;
1048 int idx = 0;
1049 size_t min_size = (7 /* TYPE + METADATA */);
1050
1051 /* Check frame type */
1052 if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
1053 return 0;
1054
1055 if (size < min_size) {
1056 spoe_status_code = SPOE_FRM_ERR_INVALID;
1057 return -1;
1058 }
1059
1060 /* Skip flags: fragmentation is not supported for now */
1061 idx += 4;
1062
1063 /* Get the stream-id and the frame-id */
1064 idx += decode_spoe_varint(frame+idx, frame+size, &stream_id);
1065 idx += decode_spoe_varint(frame+idx, frame+size, &frame_id);
1066
1067 /* Check stream-id and frame-id */
1068 if (ctx->stream_id != (unsigned int)stream_id ||
1069 ctx->frame_id != (unsigned int)frame_id)
1070 return 0;
1071
1072 /* Copy encoded actions */
1073 b_reset(ctx->buffer);
1074 memcpy(ctx->buffer->p, frame+idx, size-idx);
1075 ctx->buffer->i = size-idx;
1076
1077 return idx;
1078}
1079
Christopher Fauletba7bc162016-11-07 21:07:38 +01001080/* This function is used in cfgparse.c and declared in proto/checks.h. It
1081 * prepare the request to send to agents during a healthcheck. It returns 0 on
1082 * success and -1 if an error occurred. */
1083int
1084prepare_spoe_healthcheck_request(char **req, int *len)
1085{
1086 struct appctx a;
1087 char *frame, buf[global.tune.bufsize];
1088 unsigned int framesz;
1089 int idx;
1090
1091 memset(&a, 0, sizeof(a));
1092 memset(buf, 0, sizeof(buf));
1093 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1094
1095 frame = buf+4;
1096 idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4);
1097 if (idx <= 0)
1098 return -1;
1099 if (idx + SLEN(HEALTHCHECK_KEY) + 1 > global.tune.bufsize-4)
1100 return -1;
1101
1102 /* "healthcheck" K/V item */
1103 idx += encode_spoe_string(HEALTHCHECK_KEY, SLEN(HEALTHCHECK_KEY), frame+idx);
1104 frame[idx++] = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE);
1105
1106 framesz = htonl(idx);
1107 memcpy(buf, (char *)&framesz, 4);
1108
1109 if ((*req = malloc(idx+4)) == NULL)
1110 return -1;
1111 memcpy(*req, buf, idx+4);
1112 *len = idx+4;
1113 return 0;
1114}
1115
1116/* This function is used in checks.c and declared in proto/checks.h. It decode
1117 * the response received from an agent during a healthcheck. It returns 0 on
1118 * success and -1 if an error occurred. */
1119int
1120handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen)
1121{
1122 struct appctx a;
1123 int r;
1124
1125 memset(&a, 0, sizeof(a));
1126 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1127
1128 if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0)
1129 goto error;
1130 if ((r = handle_spoe_agenthello_frame(&a, frame, size)) <= 0) {
1131 if (r == 0)
1132 spoe_status_code = SPOE_FRM_ERR_INVALID;
1133 goto error;
1134 }
1135
1136 return 0;
1137
1138 error:
1139 if (spoe_status_code >= SPOE_FRM_ERRS)
1140 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
1141 strncpy(err, spoe_frm_err_reasons[spoe_status_code], errlen);
1142 return -1;
1143}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001144
1145/********************************************************************
1146 * Functions that manage the SPOE applet
1147 ********************************************************************/
1148/* Callback function that catches applet timeouts. If a timeout occurred, we set
1149 * <appctx->st1> flag and the SPOE applet is woken up. */
1150static struct task *
1151process_spoe_applet(struct task * task)
1152{
1153 struct appctx *appctx = task->context;
1154
1155 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1156 if (tick_is_expired(task->expire, now_ms)) {
1157 task->expire = TICK_ETERNITY;
1158 appctx->st1 = SPOE_APPCTX_ERR_TOUT;
1159 }
1160 si_applet_want_get(appctx->owner);
1161 appctx_wakeup(appctx);
1162 return task;
1163}
1164
1165/* Remove a SPOE applet from the agent cache */
1166static void
1167remove_spoe_applet_from_cache(struct appctx *appctx)
1168{
1169 struct appctx *a, *back;
1170 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1171
1172 if (LIST_ISEMPTY(&agent->cache))
1173 return;
1174
1175 list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) {
1176 if (a == appctx) {
1177 LIST_DEL(&APPCTX_SPOE(appctx).list);
1178 break;
1179 }
1180 }
1181}
1182
1183
1184/* Callback function that releases a SPOE applet. This happens when the
1185 * connection with the agent is closed. */
1186static void
1187release_spoe_applet(struct appctx *appctx)
1188{
1189 struct stream_interface *si = appctx->owner;
1190 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1191 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1192
1193 if (appctx->st0 == SPOE_APPCTX_ST_CONNECT ||
1194 appctx->st0 == SPOE_APPCTX_ST_CONNECTING)
1195 on_new_spoe_appctx_failure(agent);
1196
1197 if (appctx->st0 != SPOE_APPCTX_ST_END) {
1198 si_shutw(si);
1199 si_shutr(si);
1200 si_ic(si)->flags |= CF_READ_NULL;
1201 appctx->st0 = SPOE_APPCTX_ST_END;
1202 }
1203
1204 if (ctx != NULL) {
1205 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1206 ctx->appctx = NULL;
1207 }
1208
1209 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
1210 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1211 __FUNCTION__, appctx);
1212
1213 /* Release the task attached to the SPOE applet */
1214 if (APPCTX_SPOE(appctx).task) {
1215 task_delete(APPCTX_SPOE(appctx).task);
1216 task_free(APPCTX_SPOE(appctx).task);
1217 }
1218
1219 /* And remove it from the agent cache */
1220 remove_spoe_applet_from_cache(appctx);
1221 APPCTX_SPOE(appctx).ctx = NULL;
1222}
1223
1224/* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when
1225 * the frame can be ignored, 0 to retry later and 1 on success. The frame is
1226 * encoded using the callback function <prepare>. */
1227static int
1228send_spoe_frame(struct appctx *appctx,
1229 int (*prepare)(struct appctx *, char *, size_t))
1230{
1231 struct stream_interface *si = appctx->owner;
1232 int framesz, ret;
1233 uint32_t netint;
1234
1235 ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
1236 if (ret <= 0)
1237 goto skip_or_error;
1238 framesz = ret;
1239 netint = htonl(framesz);
1240 ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint));
1241 if (ret > 0)
1242 ret = bi_putblk(si_ic(si), trash.str, framesz);
1243 if (ret <= 0) {
1244 if (ret == -1)
1245 return -1;
1246 return -2;
1247 }
1248 return 1;
1249
1250 skip_or_error:
1251 if (!ret)
1252 return -1;
1253 return -2;
1254}
1255
1256/* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1
1257 * when the frame can be ignored, 0 to retry later and 1 on success. The frame
1258 * is decoded using the callback function <handle>. */
1259static int
1260recv_spoe_frame(struct appctx *appctx,
1261 int (*handle)(struct appctx *, char *, size_t))
1262{
1263 struct stream_interface *si = appctx->owner;
1264 int framesz, ret;
1265 uint32_t netint;
1266
1267 ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0);
1268 if (ret <= 0)
1269 goto empty_or_error;
1270 framesz = ntohl(netint);
1271 if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
1272 spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
1273 return -2;
1274 }
1275
1276 ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint));
1277 if (ret <= 0)
1278 goto empty_or_error;
1279 bo_skip(si_oc(si), ret+sizeof(netint));
1280
1281 /* First check if the received frame is a DISCONNECT frame */
1282 ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz);
1283 if (ret != 0) {
1284 if (ret > 0) {
1285 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1286 " - disconnected by peer (%d): %s\n",
1287 (int)now.tv_sec, (int)now.tv_usec,
1288 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1289 __FUNCTION__, appctx, spoe_status_code,
1290 spoe_reason);
1291 return 2;
1292 }
1293 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1294 " - error on frame (%s)\n",
1295 (int)now.tv_sec, (int)now.tv_usec,
1296 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1297 __FUNCTION__, appctx,
1298 spoe_frm_err_reasons[spoe_status_code]);
1299 return -2;
1300 }
1301 if (handle == NULL)
1302 goto out;
1303
1304 /* If not, try to decode it */
1305 ret = handle(appctx, trash.str, framesz);
1306 if (ret <= 0) {
1307 if (!ret)
1308 return -1;
1309 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1310 " - error on frame (%s)\n",
1311 (int)now.tv_sec, (int)now.tv_usec,
1312 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1313 __FUNCTION__, appctx,
1314 spoe_frm_err_reasons[spoe_status_code]);
1315 return -2;
1316 }
1317 out:
1318 return 1;
1319
1320 empty_or_error:
1321 if (!ret)
1322 return 0;
1323 spoe_status_code = SPOE_FRM_ERR_IO;
1324 return -2;
1325}
1326
1327/* I/O Handler processing messages exchanged with the agent */
1328static void
1329handle_spoe_applet(struct appctx *appctx)
1330{
1331 struct stream_interface *si = appctx->owner;
1332 struct stream *s = si_strm(si);
1333 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1334 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1335 int ret;
1336
1337 switchstate:
1338 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1339 " - appctx-state=%s\n",
1340 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1341 __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);
1342
1343 switch (appctx->st0) {
1344 case SPOE_APPCTX_ST_CONNECT:
1345 spoe_status_code = SPOE_FRM_ERR_NONE;
1346 if (si->state <= SI_ST_CON) {
1347 si_applet_want_put(si);
1348 task_wakeup(s->task, TASK_WOKEN_MSG);
1349 break;
1350 }
1351 else if (si->state != SI_ST_EST) {
1352 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1353 on_new_spoe_appctx_failure(agent);
1354 goto switchstate;
1355 }
1356 ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame);
1357 if (ret < 0) {
1358 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1359 on_new_spoe_appctx_failure(agent);
1360 goto switchstate;
1361 }
1362 else if (!ret)
1363 goto full;
1364
1365 /* Hello frame was sent. Set the hello timeout and
1366 * wait for the reply. */
1367 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
1368 appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
1369 /* fall through */
1370
1371 case SPOE_APPCTX_ST_CONNECTING:
1372 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1373 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1374 on_new_spoe_appctx_failure(agent);
1375 goto switchstate;
1376 }
1377 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1378 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1379 " - Connection timed out\n",
1380 (int)now.tv_sec, (int)now.tv_usec,
1381 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1382 __FUNCTION__, appctx);
1383 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1384 on_new_spoe_appctx_failure(agent);
1385 goto switchstate;
1386 }
1387 ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame);
1388 if (ret < 0) {
1389 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1390 on_new_spoe_appctx_failure(agent);
1391 goto switchstate;
1392 }
1393 if (ret == 2) {
1394 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1395 on_new_spoe_appctx_failure(agent);
1396 goto switchstate;
1397 }
1398 if (!ret)
1399 goto out;
1400
1401 /* hello handshake is finished, set the idle timeout,
1402 * Add the appctx in the agent cache, decrease the
1403 * number of new applets and wake up waiting streams. */
1404 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1405 appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
1406 on_new_spoe_appctx_success(agent, appctx);
1407 break;
1408
1409 case SPOE_APPCTX_ST_PROCESSING:
1410 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1411 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1412 goto switchstate;
1413 }
1414 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1415 spoe_status_code = SPOE_FRM_ERR_TOUT;
1416 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1417 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1418 goto switchstate;
1419 }
1420 if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
1421 ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame);
1422 if (ret < 0) {
1423 if (ret == -1) {
1424 ctx->state = SPOE_CTX_ST_ERROR;
1425 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1426 goto skip_notify_frame;
1427 }
1428 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1429 goto switchstate;
1430 }
1431 else if (!ret)
1432 goto full;
1433 ctx->state = SPOE_CTX_ST_WAITING_ACK;
Christopher Faulet03a34492016-11-19 16:47:56 +01001434 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001435 }
1436
1437 skip_notify_frame:
1438 if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) {
1439 ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame);
1440 if (ret < 0) {
1441 if (ret == -1)
1442 goto skip_notify_frame;
1443 ctx->state = SPOE_CTX_ST_ERROR;
1444 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1445 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1446 goto switchstate;
1447 }
1448 if (!ret)
1449 goto out;
1450 if (ret == 2) {
1451 ctx->state = SPOE_CTX_ST_ERROR;
1452 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1453 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1454 goto switchstate;
1455 }
1456 ctx->state = SPOE_CTX_ST_DONE;
1457 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1458 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1459 }
1460 else {
1461 if (stopping) {
1462 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1463 goto switchstate;
1464 }
1465
1466 ret = recv_spoe_frame(appctx, NULL);
1467 if (ret < 0) {
1468 if (ret == -1)
1469 goto skip_notify_frame;
1470 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1471 goto switchstate;
1472 }
1473 if (!ret)
1474 goto out;
1475 if (ret == 2) {
1476 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1477 goto switchstate;
1478 }
1479 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1480 }
1481 break;
1482
1483 case SPOE_APPCTX_ST_DISCONNECT:
1484 ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame);
1485 if (ret < 0) {
1486 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1487 goto switchstate;
1488 }
1489 else if (!ret)
1490 goto full;
1491 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1492 " - disconnected by HAProxy (%d): %s\n",
1493 (int)now.tv_sec, (int)now.tv_usec,
1494 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1495 __FUNCTION__, appctx, spoe_status_code,
1496 spoe_frm_err_reasons[spoe_status_code]);
1497
Christopher Faulet03a34492016-11-19 16:47:56 +01001498 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001499 appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
1500 /* fall through */
1501
1502 case SPOE_APPCTX_ST_DISCONNECTING:
1503 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1504 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1505 goto switchstate;
1506 }
1507 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1508 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1509 goto switchstate;
1510 }
1511 ret = recv_spoe_frame(appctx, NULL);
1512 if (ret < 0 || ret == 2) {
1513 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1514 goto switchstate;
1515 }
1516 break;
1517
1518 case SPOE_APPCTX_ST_EXIT:
1519 si_shutw(si);
1520 si_shutr(si);
1521 si_ic(si)->flags |= CF_READ_NULL;
1522 appctx->st0 = SPOE_APPCTX_ST_END;
1523 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1524 /* fall through */
1525
1526 case SPOE_APPCTX_ST_END:
1527 break;
1528 }
1529
1530 out:
1531 if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
1532 task_queue(APPCTX_SPOE(appctx).task);
1533 si_oc(si)->flags |= CF_READ_DONTWAIT;
1534 task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
1535 return;
1536 full:
1537 si_applet_cant_put(si);
1538 goto out;
1539}
1540
1541struct applet spoe_applet = {
1542 .obj_type = OBJ_TYPE_APPLET,
1543 .name = "<SPOE>", /* used for logging */
1544 .fct = handle_spoe_applet,
1545 .release = release_spoe_applet,
1546};
1547
1548/* Create a SPOE applet. On success, the created applet is returned, else
1549 * NULL. */
1550static struct appctx *
1551create_spoe_appctx(struct spoe_config *conf)
1552{
1553 struct appctx *appctx;
1554 struct session *sess;
1555 struct task *task;
1556 struct stream *strm;
1557 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
1558 struct listener *, by_fe);
1559
1560 if ((appctx = appctx_new(&spoe_applet)) == NULL)
1561 goto out_error;
1562
1563 appctx->st0 = SPOE_APPCTX_ST_CONNECT;
1564 if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
1565 goto out_free_appctx;
1566 APPCTX_SPOE(appctx).task->process = process_spoe_applet;
1567 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1568 APPCTX_SPOE(appctx).task->context = appctx;
1569 APPCTX_SPOE(appctx).agent = conf->agent;
1570 APPCTX_SPOE(appctx).ctx = NULL;
1571 APPCTX_SPOE(appctx).version = 0;
1572 APPCTX_SPOE(appctx).max_frame_size = global.tune.bufsize;
1573 task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
1574
1575 sess = session_new(&conf->agent_fe, l, &appctx->obj_type);
1576 if (!sess)
1577 goto out_free_spoe;
1578
1579 if ((task = task_new()) == NULL)
1580 goto out_free_sess;
1581
1582 if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
1583 goto out_free_task;
1584
1585 strm->target = sess->listener->default_target;
1586 strm->req.analysers |= sess->listener->analysers;
1587 stream_set_backend(strm, conf->agent->b.be);
1588
1589 /* applet is waiting for data */
1590 si_applet_cant_get(&strm->si[0]);
1591 appctx_wakeup(appctx);
1592
Christopher Faulet48026722016-11-16 15:01:12 +01001593 /* Increase the per-process number of cumulated connections */
1594 if (conf->agent->cps_max > 0)
1595 update_freq_ctr(&conf->agent->conn_per_sec, 1);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001596
1597 strm->do_log = NULL;
1598 strm->res.flags |= CF_READ_DONTWAIT;
1599
1600 conf->agent_fe.feconn++;
1601 jobs++;
1602 totalconn++;
1603
1604 return appctx;
1605
1606 /* Error unrolling */
1607 out_free_task:
1608 task_free(task);
1609 out_free_sess:
1610 session_free(sess);
1611 out_free_spoe:
1612 task_free(APPCTX_SPOE(appctx).task);
1613 out_free_appctx:
1614 appctx_free(appctx);
1615 out_error:
1616 return NULL;
1617}
1618
1619/* Wake up a SPOE applet attached to a SPOE context. */
1620static void
1621wakeup_spoe_appctx(struct spoe_context *ctx)
1622{
1623 if (ctx->appctx == NULL)
1624 return;
1625 if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) {
1626 si_applet_want_get(ctx->appctx->owner);
1627 si_applet_want_put(ctx->appctx->owner);
1628 appctx_wakeup(ctx->appctx);
1629 }
1630}
1631
1632
1633/* Run across the list of pending streams waiting for a SPOE applet and wake the
1634 * first. */
1635static void
1636offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx)
1637{
1638 struct spoe_context *ctx;
1639
Christopher Fauletf7a30922016-11-10 15:04:51 +01001640 if (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING)
1641 return;
1642
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001643 if (LIST_ISEMPTY(&agent->applet_wq))
1644 LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
1645 else {
1646 ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait);
1647 APPCTX_SPOE(appctx).ctx = ctx;
1648 ctx->appctx = appctx;
1649 LIST_DEL(&ctx->applet_wait);
1650 LIST_INIT(&ctx->applet_wait);
1651 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1652 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1653 " - wake up stream to get available SPOE applet\n",
1654 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1655 __FUNCTION__, ctx->strm);
1656 }
1657}
1658
1659/* A failure occurred during SPOE applet creation. */
1660static void
1661on_new_spoe_appctx_failure(struct spoe_agent *agent)
1662{
1663 struct spoe_context *ctx;
1664
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001665 list_for_each_entry(ctx, &agent->applet_wq, applet_wait) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001666 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1667 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1668 " - wake up stream because to SPOE applet connection failed\n",
1669 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1670 __FUNCTION__, ctx->strm);
1671 }
1672}
1673
1674static void
1675on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx)
1676{
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001677 offer_spoe_appctx(agent, appctx);
1678}
1679/* Retrieve a SPOE applet from the agent cache if possible, else create it. It
1680 * returns 1 on success, 0 to retry later and -1 if an error occurred. */
1681static int
1682acquire_spoe_appctx(struct spoe_context *ctx, int dir)
1683{
1684 struct spoe_config *conf = FLT_CONF(ctx->filter);
1685 struct spoe_agent *agent = conf->agent;
1686 struct appctx *appctx;
1687
1688 /* If a process is already started for this SPOE context, retry
1689 * later. */
1690 if (ctx->flags & SPOE_CTX_FL_PROCESS)
1691 goto wait;
1692
1693 /* If needed, initialize the buffer that will be used to encode messages
1694 * and decode actions. */
1695 if (ctx->buffer == &buf_empty) {
1696 if (!LIST_ISEMPTY(&ctx->buffer_wait)) {
1697 LIST_DEL(&ctx->buffer_wait);
1698 LIST_INIT(&ctx->buffer_wait);
1699 }
1700
1701 if (!b_alloc_margin(&ctx->buffer, 0)) {
1702 LIST_ADDQ(&buffer_wq, &ctx->buffer_wait);
1703 goto wait;
1704 }
1705 }
1706
1707 /* If the SPOE applet was already set, all is done. */
1708 if (ctx->appctx)
1709 goto success;
1710
1711 /* Else try to retrieve it from the agent cache */
1712 if (!LIST_ISEMPTY(&agent->cache)) {
1713 appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list);
1714 LIST_DEL(&APPCTX_SPOE(appctx).list);
1715 APPCTX_SPOE(appctx).ctx = ctx;
1716 ctx->appctx = appctx;
1717 goto success;
1718 }
1719
Christopher Faulet48026722016-11-16 15:01:12 +01001720 /* If there is no server up for the agent's backend, this is an
1721 * error. */
1722 if (!agent->b.be->srv_act && !agent->b.be->srv_bck)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001723 goto error;
1724
1725 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1726 " - waiting for available SPOE appctx\n",
1727 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
1728 ctx->strm);
1729
1730 /* Else add the stream in the waiting queue. */
1731 if (LIST_ISEMPTY(&ctx->applet_wait))
1732 LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait);
1733
1734 /* Finally, create new SPOE applet if we can */
Christopher Faulet48026722016-11-16 15:01:12 +01001735 if (agent->cps_max > 0) {
1736 if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0))
1737 goto wait;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001738 }
Christopher Faulet48026722016-11-16 15:01:12 +01001739 if (create_spoe_appctx(conf) == NULL)
1740 goto error;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001741
1742 wait:
1743 return 0;
1744
1745 success:
1746 /* Remove the stream from the waiting queue */
1747 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1748 LIST_DEL(&ctx->applet_wait);
1749 LIST_INIT(&ctx->applet_wait);
1750 }
1751
1752 /* Set the right flag to prevent request and response processing
1753 * in same time. */
1754 ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
1755 ? SPOE_CTX_FL_REQ_PROCESS
1756 : SPOE_CTX_FL_RSP_PROCESS);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001757
1758 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1759 " - acquire SPOE appctx %p from cache\n",
1760 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1761 __FUNCTION__, ctx->strm, ctx->appctx);
1762 return 1;
1763
1764 error:
1765 /* Remove the stream from the waiting queue */
1766 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1767 LIST_DEL(&ctx->applet_wait);
1768 LIST_INIT(&ctx->applet_wait);
1769 }
1770
1771 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
Christopher Faulet48026722016-11-16 15:01:12 +01001772 " - failed to acquire SPOE appctx\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001773 (int)now.tv_sec, (int)now.tv_usec, agent->id,
Christopher Faulet48026722016-11-16 15:01:12 +01001774 __FUNCTION__, ctx->strm);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001775 send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n");
1776
1777 return -1;
1778}
1779
1780/* Release a SPOE applet and push it in the agent cache. */
1781static void
1782release_spoe_appctx(struct spoe_context *ctx)
1783{
1784 struct spoe_config *conf = FLT_CONF(ctx->filter);
1785 struct spoe_agent *agent = conf->agent;
1786 struct appctx *appctx = ctx->appctx;
1787
1788 /* Reset the flag to allow next processing */
1789 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
1790
Christopher Fauletf7a30922016-11-10 15:04:51 +01001791 /* Reset processing timer */
1792 ctx->process_exp = TICK_ETERNITY;
1793
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001794 /* Release the buffer if needed */
1795 if (ctx->buffer != &buf_empty) {
1796 b_free(&ctx->buffer);
1797 if (!LIST_ISEMPTY(&buffer_wq))
1798 stream_offer_buffers();
1799 }
1800
1801 /* If there is no SPOE applet, all is done */
1802 if (!appctx)
1803 return;
1804
1805 /* Else, reassign it or push it in the agent cache */
1806 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1807 " - release SPOE appctx %p\n",
1808 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1809 __FUNCTION__, ctx->strm, appctx);
1810
1811 APPCTX_SPOE(appctx).ctx = NULL;
1812 ctx->appctx = NULL;
1813 offer_spoe_appctx(agent, appctx);
1814}
1815
1816/***************************************************************************
1817 * Functions that process SPOE messages and actions
1818 **************************************************************************/
1819/* Process SPOE messages for a specific event. During the processing, it returns
1820 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1821 * is returned. */
1822static int
1823process_spoe_messages(struct stream *s, struct spoe_context *ctx,
1824 struct list *messages, int dir)
1825{
1826 struct spoe_message *msg;
1827 struct sample *smp;
1828 struct spoe_arg *arg;
1829 char *p;
1830 size_t max_size;
1831 int off, flag, idx = 0;
1832
1833 /* Reserve 32 bytes from the frame Metadata */
1834 max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32;
1835
1836 b_reset(ctx->buffer);
1837 p = ctx->buffer->p;
1838
1839 /* Loop on messages */
1840 list_for_each_entry(msg, messages, list) {
1841 if (idx + msg->id_len + 1 > max_size)
1842 goto skip;
1843
1844 /* Set the message name */
1845 idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
1846
1847 /* Save offset where to store the number of arguments for this
1848 * message */
1849 off = idx++;
1850 p[off] = 0;
1851
1852 /* Loop on arguments */
1853 list_for_each_entry(arg, &msg->args, list) {
1854 p[off]++; /* Increment the number of arguments */
1855
1856 if (idx + arg->name_len + 1 > max_size)
1857 goto skip;
1858
1859 /* Encode the arguement name as a string. It can by NULL */
1860 idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
1861
1862 /* Fetch the arguement value */
1863 smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
1864 if (!smp) {
1865 /* If no value is available, set it to NULL */
1866 p[idx++] = SPOE_DATA_T_NULL;
1867 continue;
1868 }
1869
1870 /* Else, encode the arguement value */
1871 switch (smp->data.type) {
1872 case SMP_T_BOOL:
1873 flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
1874 p[idx++] = (SPOE_DATA_T_BOOL | flag);
1875 break;
1876 case SMP_T_SINT:
1877 p[idx++] = SPOE_DATA_T_INT64;
1878 if (idx + 8 > max_size)
1879 goto skip;
1880 idx += encode_spoe_varint(smp->data.u.sint, p+idx);
1881 break;
1882 case SMP_T_IPV4:
1883 p[idx++] = SPOE_DATA_T_IPV4;
1884 if (idx + 4 > max_size)
1885 goto skip;
1886 memcpy(p+idx, &smp->data.u.ipv4, 4);
1887 idx += 4;
1888 break;
1889 case SMP_T_IPV6:
1890 p[idx++] = SPOE_DATA_T_IPV6;
1891 if (idx + 16 > max_size)
1892 goto skip;
1893 memcpy(p+idx, &smp->data.u.ipv6, 16);
1894 idx += 16;
1895 break;
1896 case SMP_T_STR:
1897 p[idx++] = SPOE_DATA_T_STR;
1898 if (idx + smp->data.u.str.len > max_size)
1899 goto skip;
1900 idx += encode_spoe_string(smp->data.u.str.str,
1901 smp->data.u.str.len,
1902 p+idx);
1903 break;
1904 case SMP_T_BIN:
1905 p[idx++] = SPOE_DATA_T_BIN;
1906 if (idx + smp->data.u.str.len > max_size)
1907 goto skip;
1908 idx += encode_spoe_string(smp->data.u.str.str,
1909 smp->data.u.str.len,
1910 p+idx);
1911 break;
1912 case SMP_T_METH:
1913 if (smp->data.u.meth.meth == HTTP_METH_OTHER) {
1914 p[idx++] = SPOE_DATA_T_STR;
1915 if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size)
1916 goto skip;
1917 idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name,
1918 http_known_methods[smp->data.u.meth.meth].len,
1919 p+idx);
1920 }
1921 else {
1922 p[idx++] = SPOE_DATA_T_STR;
1923 if (idx + smp->data.u.str.len > max_size)
1924 goto skip;
1925 idx += encode_spoe_string(smp->data.u.meth.str.str,
1926 smp->data.u.meth.str.len,
1927 p+idx);
1928 }
1929 break;
1930 default:
1931 p[idx++] = SPOE_DATA_T_NULL;
1932 }
1933 }
1934 }
1935 ctx->buffer->i = idx;
1936 return 1;
1937
1938 skip:
1939 b_reset(ctx->buffer);
1940 return 0;
1941}
1942
1943/* Helper function to set a variable */
1944static void
1945set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1946 struct sample *smp)
1947{
1948 struct spoe_config *conf = FLT_CONF(ctx->filter);
1949 struct spoe_agent *agent = conf->agent;
1950 char varname[64];
1951
1952 memset(varname, 0, sizeof(varname));
1953 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1954 scope, agent->var_pfx, len, name);
1955 vars_set_by_name_ifexist(varname, len, smp);
1956}
1957
1958/* Helper function to unset a variable */
1959static void
1960unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1961 struct sample *smp)
1962{
1963 struct spoe_config *conf = FLT_CONF(ctx->filter);
1964 struct spoe_agent *agent = conf->agent;
1965 char varname[64];
1966
1967 memset(varname, 0, sizeof(varname));
1968 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1969 scope, agent->var_pfx, len, name);
1970 vars_unset_by_name_ifexist(varname, len, smp);
1971}
1972
1973
1974/* Process SPOE actions for a specific event. During the processing, it returns
1975 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1976 * is returned. */
1977static int
1978process_spoe_actions(struct stream *s, struct spoe_context *ctx,
1979 enum spoe_event ev, int dir)
1980{
1981 char *p;
1982 size_t size;
1983 int off, i, idx = 0;
1984
1985 p = ctx->buffer->p;
1986 size = ctx->buffer->i;
1987
1988 while (idx < size) {
1989 char *str;
1990 uint64_t sz;
1991 struct sample smp;
1992 enum spoe_action_type type;
1993
1994 off = idx;
1995 if (idx+2 > size)
1996 goto skip;
1997
1998 type = p[idx++];
1999 switch (type) {
2000 case SPOE_ACT_T_SET_VAR: {
2001 char *scope;
2002
2003 if (p[idx++] != 3)
2004 goto skip_action;
2005
2006 switch (p[idx++]) {
2007 case SPOE_SCOPE_PROC: scope = "proc"; break;
2008 case SPOE_SCOPE_SESS: scope = "sess"; break;
2009 case SPOE_SCOPE_TXN : scope = "txn"; break;
2010 case SPOE_SCOPE_REQ : scope = "req"; break;
2011 case SPOE_SCOPE_RES : scope = "res"; break;
2012 default: goto skip;
2013 }
2014
2015 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2016 if (str == NULL)
2017 goto skip;
2018 memset(&smp, 0, sizeof(smp));
2019 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
Christopher Fauletb5cff602016-11-24 14:53:22 +01002020
2021 if ((i = decode_spoe_data(p+idx, p+size, &smp)) == -1)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002022 goto skip;
Christopher Fauletb5cff602016-11-24 14:53:22 +01002023 idx += i;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002024
2025 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2026 " - set-var '%s.%s.%.*s'\n",
2027 (int)now.tv_sec, (int)now.tv_usec,
2028 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2029 __FUNCTION__, s, scope,
2030 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2031 (int)sz, str);
2032
2033 set_spoe_var(ctx, scope, str, sz, &smp);
2034 break;
2035 }
2036
2037 case SPOE_ACT_T_UNSET_VAR: {
2038 char *scope;
2039
2040 if (p[idx++] != 2)
2041 goto skip_action;
2042
2043 switch (p[idx++]) {
2044 case SPOE_SCOPE_PROC: scope = "proc"; break;
2045 case SPOE_SCOPE_SESS: scope = "sess"; break;
2046 case SPOE_SCOPE_TXN : scope = "txn"; break;
2047 case SPOE_SCOPE_REQ : scope = "req"; break;
2048 case SPOE_SCOPE_RES : scope = "res"; break;
2049 default: goto skip;
2050 }
2051
2052 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2053 if (str == NULL)
2054 goto skip;
2055 memset(&smp, 0, sizeof(smp));
2056 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2057
2058 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2059 " - unset-var '%s.%s.%.*s'\n",
2060 (int)now.tv_sec, (int)now.tv_usec,
2061 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2062 __FUNCTION__, s, scope,
2063 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2064 (int)sz, str);
2065
2066 unset_spoe_var(ctx, scope, str, sz, &smp);
2067 break;
2068 }
2069
2070 default:
2071 skip_action:
2072 if ((i = skip_spoe_action(p+off, p+size)) == -1)
2073 goto skip;
2074 idx += i;
2075 }
2076 }
2077
2078 return 1;
2079 skip:
2080 return 0;
2081}
2082
2083
2084/* Process a SPOE event. First, this functions will process messages attached to
2085 * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
2086 * ACK frame to process corresponding actions. During all the processing, it
2087 * returns 0 and it returns 1 when the processing is finished. If an error
2088 * occurred, -1 is returned. */
2089static int
2090process_spoe_event(struct stream *s, struct spoe_context *ctx,
2091 enum spoe_event ev)
2092{
Christopher Fauletf7a30922016-11-10 15:04:51 +01002093 struct spoe_config *conf = FLT_CONF(ctx->filter);
2094 struct spoe_agent *agent = conf->agent;
2095 int dir, ret = 1;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002096
2097 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2098 " - ctx-state=%s - event=%s\n",
2099 (int)now.tv_sec, (int)now.tv_usec,
Christopher Fauletf7a30922016-11-10 15:04:51 +01002100 agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002101 spoe_event_str[ev]);
2102
Christopher Faulet48026722016-11-16 15:01:12 +01002103 if (agent->eps_max > 0) {
2104 if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
2105 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2106 " - skip event '%s': max EPS reached\n",
2107 (int)now.tv_sec, (int)now.tv_usec,
2108 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2109 goto skip;
2110 }
2111 }
2112
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002113 dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
2114
2115 if (LIST_ISEMPTY(&(ctx->messages[ev])))
2116 goto out;
2117
2118 if (ctx->state == SPOE_CTX_ST_ERROR)
2119 goto error;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002120
2121 if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
2122 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2123 " - failed to process event '%s': timeout\n",
2124 (int)now.tv_sec, (int)now.tv_usec,
2125 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2126 send_log(ctx->strm->be, LOG_WARNING,
2127 "failed to process event '%s': timeout.\n",
2128 spoe_event_str[ev]);
2129 goto error;
2130 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002131
2132 if (ctx->state == SPOE_CTX_ST_READY) {
Christopher Fauletf7a30922016-11-10 15:04:51 +01002133 if (!tick_isset(ctx->process_exp)) {
2134 ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
2135 s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
2136 ctx->process_exp);
2137 }
2138
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002139 ret = acquire_spoe_appctx(ctx, dir);
2140 if (ret <= 0) {
2141 if (!ret)
2142 goto out;
2143 goto error;
2144 }
2145 ctx->state = SPOE_CTX_ST_SENDING_MSGS;
2146 }
2147
2148 if (ctx->appctx == NULL)
2149 goto error;
2150
2151 if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
2152 ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
2153 if (ret <= 0) {
2154 if (!ret)
2155 goto skip;
2156 goto error;
2157 }
2158 wakeup_spoe_appctx(ctx);
2159 ret = 0;
2160 goto out;
2161 }
2162
2163 if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
2164 wakeup_spoe_appctx(ctx);
2165 ret = 0;
2166 goto out;
2167 }
2168
2169 if (ctx->state == SPOE_CTX_ST_DONE) {
2170 ret = process_spoe_actions(s, ctx, ev, dir);
2171 if (ret <= 0) {
2172 if (!ret)
2173 goto skip;
2174 goto error;
2175 }
2176 ctx->frame_id++;
2177 release_spoe_appctx(ctx);
2178 ctx->state = SPOE_CTX_ST_READY;
2179 }
2180
2181 out:
2182 return ret;
2183
2184 skip:
2185 release_spoe_appctx(ctx);
2186 ctx->state = SPOE_CTX_ST_READY;
2187 return 1;
2188
2189 error:
Christopher Faulet48026722016-11-16 15:01:12 +01002190 if (agent->eps_max > 0)
2191 update_freq_ctr(&agent->err_per_sec, 1);
2192
Christopher Faulet985532d2016-11-16 15:36:19 +01002193 if (agent->var_on_error) {
2194 struct sample smp;
2195
2196 memset(&smp, 0, sizeof(smp));
2197 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2198 smp.data.u.sint = 1;
2199 smp.data.type = SMP_T_BOOL;
2200
2201 set_spoe_var(ctx, "txn", agent->var_on_error,
2202 strlen(agent->var_on_error), &smp);
2203 }
2204
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002205 release_spoe_appctx(ctx);
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002206 ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
2207 ? SPOE_CTX_ST_READY
2208 : SPOE_CTX_ST_ERROR);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002209 return 1;
2210}
2211
2212
2213/***************************************************************************
2214 * Functions that create/destroy SPOE contexts
2215 **************************************************************************/
2216static struct spoe_context *
2217create_spoe_context(struct filter *filter)
2218{
2219 struct spoe_config *conf = FLT_CONF(filter);
2220 struct spoe_context *ctx;
2221
2222 ctx = pool_alloc_dirty(pool2_spoe_ctx);
2223 if (ctx == NULL) {
2224 return NULL;
2225 }
2226 memset(ctx, 0, sizeof(*ctx));
2227 ctx->filter = filter;
2228 ctx->state = SPOE_CTX_ST_NONE;
2229 ctx->flags = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002230 ctx->messages = conf->agent->messages;
2231 ctx->buffer = &buf_empty;
2232 LIST_INIT(&ctx->buffer_wait);
2233 LIST_INIT(&ctx->applet_wait);
2234
Christopher Fauletf7a30922016-11-10 15:04:51 +01002235 ctx->stream_id = 0;
2236 ctx->frame_id = 1;
2237 ctx->process_exp = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002238
2239 return ctx;
2240}
2241
2242static void
2243destroy_spoe_context(struct spoe_context *ctx)
2244{
2245 if (!ctx)
2246 return;
2247
2248 if (ctx->appctx)
2249 APPCTX_SPOE(ctx->appctx).ctx = NULL;
2250 if (!LIST_ISEMPTY(&ctx->buffer_wait))
2251 LIST_DEL(&ctx->buffer_wait);
2252 if (!LIST_ISEMPTY(&ctx->applet_wait))
2253 LIST_DEL(&ctx->applet_wait);
2254 pool_free2(pool2_spoe_ctx, ctx);
2255}
2256
2257static void
2258reset_spoe_context(struct spoe_context *ctx)
2259{
2260 ctx->state = SPOE_CTX_ST_READY;
2261 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2262}
2263
2264
2265/***************************************************************************
2266 * Hooks that manage the filter lifecycle (init/check/deinit)
2267 **************************************************************************/
2268/* Signal handler: Do a soft stop, wakeup SPOE applet */
2269static void
2270sig_stop_spoe(struct sig_handler *sh)
2271{
2272 struct proxy *p;
2273
2274 p = proxy;
2275 while (p) {
2276 struct flt_conf *fconf;
2277
2278 list_for_each_entry(fconf, &p->filter_configs, list) {
2279 struct spoe_config *conf = fconf->conf;
2280 struct spoe_agent *agent = conf->agent;
2281 struct appctx *appctx;
2282
2283 list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) {
2284 si_applet_want_get(appctx->owner);
2285 si_applet_want_put(appctx->owner);
2286 appctx_wakeup(appctx);
2287 }
2288 }
2289 p = p->next;
2290 }
2291}
2292
2293
2294/* Initialize the SPOE filter. Returns -1 on error, else 0. */
2295static int
2296spoe_init(struct proxy *px, struct flt_conf *fconf)
2297{
2298 struct spoe_config *conf = fconf->conf;
2299 struct listener *l;
2300
2301 memset(&conf->agent_fe, 0, sizeof(conf->agent_fe));
2302 init_new_proxy(&conf->agent_fe);
2303 conf->agent_fe.parent = conf->agent;
2304 conf->agent_fe.last_change = now.tv_sec;
2305 conf->agent_fe.id = conf->agent->id;
2306 conf->agent_fe.cap = PR_CAP_FE;
2307 conf->agent_fe.mode = PR_MODE_TCP;
2308 conf->agent_fe.maxconn = 0;
2309 conf->agent_fe.options2 |= PR_O2_INDEPSTR;
2310 conf->agent_fe.conn_retries = CONN_RETRIES;
2311 conf->agent_fe.accept = frontend_accept;
2312 conf->agent_fe.srv = NULL;
2313 conf->agent_fe.timeout.client = TICK_ETERNITY;
2314 conf->agent_fe.default_target = &spoe_applet.obj_type;
2315 conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
2316
2317 if ((l = calloc(1, sizeof(*l))) == NULL) {
2318 Alert("spoe_init : out of memory.\n");
2319 goto out_error;
2320 }
2321 l->obj_type = OBJ_TYPE_LISTENER;
2322 l->obj_type = OBJ_TYPE_LISTENER;
2323 l->frontend = &conf->agent_fe;
2324 l->state = LI_READY;
2325 l->analysers = conf->agent_fe.fe_req_ana;
2326 LIST_ADDQ(&conf->agent_fe.conf.listeners, &l->by_fe);
2327
2328 if (!sighandler_registered) {
2329 signal_register_fct(0, sig_stop_spoe, 0);
2330 sighandler_registered = 1;
2331 }
2332
2333 return 0;
2334
2335 out_error:
2336 return -1;
2337}
2338
2339/* Free ressources allocated by the SPOE filter. */
2340static void
2341spoe_deinit(struct proxy *px, struct flt_conf *fconf)
2342{
2343 struct spoe_config *conf = fconf->conf;
2344
2345 if (conf) {
2346 struct spoe_agent *agent = conf->agent;
2347 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
2348 struct listener *, by_fe);
2349
2350 free(l);
2351 release_spoe_agent(agent);
2352 free(conf);
2353 }
2354 fconf->conf = NULL;
2355}
2356
2357/* Check configuration of a SPOE filter for a specified proxy.
2358 * Return 1 on error, else 0. */
2359static int
2360spoe_check(struct proxy *px, struct flt_conf *fconf)
2361{
2362 struct spoe_config *conf = fconf->conf;
2363 struct proxy *target;
2364
2365 target = proxy_be_by_name(conf->agent->b.name);
2366 if (target == NULL) {
2367 Alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'"
2368 " declared at %s:%d.\n",
2369 px->id, conf->agent->b.name, conf->agent->id,
2370 conf->agent->conf.file, conf->agent->conf.line);
2371 return 1;
2372 }
2373 if (target->mode != PR_MODE_TCP) {
2374 Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
2375 " at %s:%d does not support HTTP mode.\n",
2376 px->id, target->id, conf->agent->id,
2377 conf->agent->conf.file, conf->agent->conf.line);
2378 return 1;
2379 }
2380
2381 free(conf->agent->b.name);
2382 conf->agent->b.name = NULL;
2383 conf->agent->b.be = target;
2384 return 0;
2385}
2386
2387/**************************************************************************
2388 * Hooks attached to a stream
2389 *************************************************************************/
2390/* Called when a filter instance is created and attach to a stream. It creates
2391 * the context that will be used to process this stream. */
2392static int
2393spoe_start(struct stream *s, struct filter *filter)
2394{
2395 struct spoe_context *ctx;
2396
2397 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2398 (int)now.tv_sec, (int)now.tv_usec,
2399 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2400 __FUNCTION__, s);
2401
2402 ctx = create_spoe_context(filter);
2403 if (ctx == NULL) {
2404 send_log(s->be, LOG_EMERG,
2405 "failed to create SPOE context for proxy %s\n",
2406 s->be->id);
2407 return 0;
2408 }
2409
2410 ctx->strm = s;
2411 ctx->state = SPOE_CTX_ST_READY;
2412 filter->ctx = ctx;
2413
2414 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_FE]))
2415 filter->pre_analyzers |= AN_REQ_INSPECT_FE;
2416
2417 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_BE]))
2418 filter->pre_analyzers |= AN_REQ_INSPECT_BE;
2419
2420 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_RSP]))
2421 filter->pre_analyzers |= AN_RES_INSPECT;
2422
2423 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_FE]))
2424 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE;
2425
2426 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_BE]))
2427 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE;
2428
2429 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_RSP]))
2430 filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE;
2431
2432 return 1;
2433}
2434
2435/* Called when a filter instance is detached from a stream. It release the
2436 * attached SPOE context. */
2437static void
2438spoe_stop(struct stream *s, struct filter *filter)
2439{
2440 struct spoe_context *ctx = filter->ctx;
2441
2442 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2443 (int)now.tv_sec, (int)now.tv_usec,
2444 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2445 __FUNCTION__, s);
2446
2447 if (ctx) {
2448 release_spoe_appctx(ctx);
2449 destroy_spoe_context(ctx);
2450 }
2451}
2452
Christopher Fauletf7a30922016-11-10 15:04:51 +01002453
2454/*
2455 * Called when the stream is woken up because of expired timer.
2456 */
2457static void
2458spoe_check_timeouts(struct stream *s, struct filter *filter)
2459{
2460 struct spoe_context *ctx = filter->ctx;
2461
2462 if (tick_is_expired(ctx->process_exp, now_ms))
2463 s->task->state |= TASK_WOKEN_MSG;
2464}
2465
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002466/* Called when we are ready to filter data on a channel */
2467static int
2468spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2469{
2470 struct spoe_context *ctx = filter->ctx;
2471 int ret = 1;
2472
2473 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2474 " - ctx-flags=0x%08x\n",
2475 (int)now.tv_sec, (int)now.tv_usec,
2476 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2477 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2478
2479 if (!(chn->flags & CF_ISRESP)) {
2480 if (filter->pre_analyzers & AN_REQ_INSPECT_FE)
2481 chn->analysers |= AN_REQ_INSPECT_FE;
2482 if (filter->pre_analyzers & AN_REQ_INSPECT_BE)
2483 chn->analysers |= AN_REQ_INSPECT_BE;
2484
2485 if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED)
2486 goto out;
2487
2488 ctx->stream_id = s->uniq_id;
2489 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2490 ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
2491 if (ret != 1)
2492 goto out;
2493 }
2494 ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
2495 }
2496 else {
2497 if (filter->pre_analyzers & SPOE_EV_ON_TCP_RSP)
2498 chn->analysers |= AN_RES_INSPECT;
2499
2500 if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
2501 goto out;
2502
2503 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2504 ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
2505 if (ret != 1)
2506 goto out;
2507 }
2508 ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
2509 }
2510
2511 out:
2512 if (!ret) {
2513 channel_dont_read(chn);
2514 channel_dont_close(chn);
2515 }
2516 return ret;
2517}
2518
2519/* Called before a processing happens on a given channel */
2520static int
2521spoe_chn_pre_analyze(struct stream *s, struct filter *filter,
2522 struct channel *chn, unsigned an_bit)
2523{
2524 struct spoe_context *ctx = filter->ctx;
2525 int ret = 1;
2526
2527 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2528 " - ctx-flags=0x%08x - ana=0x%08x\n",
2529 (int)now.tv_sec, (int)now.tv_usec,
2530 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2531 __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2532 ctx->flags, an_bit);
2533
2534 if (ctx->state == SPOE_CTX_ST_NONE || ctx->state == SPOE_CTX_ST_ERROR)
2535 goto out;
2536
2537 switch (an_bit) {
2538 case AN_REQ_INSPECT_FE:
2539 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
2540 break;
2541 case AN_REQ_INSPECT_BE:
2542 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
2543 break;
2544 case AN_RES_INSPECT:
2545 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP);
2546 break;
2547 case AN_REQ_HTTP_PROCESS_FE:
2548 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
2549 break;
2550 case AN_REQ_HTTP_PROCESS_BE:
2551 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
2552 break;
2553 case AN_RES_HTTP_PROCESS_FE:
2554 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
2555 break;
2556 }
2557
2558 out:
2559 if (!ret) {
2560 channel_dont_read(chn);
2561 channel_dont_close(chn);
2562 }
2563 return ret;
2564}
2565
2566/* Called when the filtering on the channel ends. */
2567static int
2568spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2569{
2570 struct spoe_context *ctx = filter->ctx;
2571
2572 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2573 " - ctx-flags=0x%08x\n",
2574 (int)now.tv_sec, (int)now.tv_usec,
2575 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2576 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2577
2578 if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
2579 reset_spoe_context(ctx);
2580 }
2581
2582 return 1;
2583}
2584
2585/********************************************************************
2586 * Functions that manage the filter initialization
2587 ********************************************************************/
2588struct flt_ops spoe_ops = {
2589 /* Manage SPOE filter, called for each filter declaration */
2590 .init = spoe_init,
2591 .deinit = spoe_deinit,
2592 .check = spoe_check,
2593
2594 /* Handle start/stop of SPOE */
Christopher Fauletf7a30922016-11-10 15:04:51 +01002595 .attach = spoe_start,
2596 .detach = spoe_stop,
2597 .check_timeouts = spoe_check_timeouts,
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002598
2599 /* Handle channels activity */
2600 .channel_start_analyze = spoe_start_analyze,
2601 .channel_pre_analyze = spoe_chn_pre_analyze,
2602 .channel_end_analyze = spoe_end_analyze,
2603};
2604
2605
2606static int
2607cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
2608{
2609 const char *err;
2610 int i, err_code = 0;
2611
2612 if ((cfg_scope == NULL && curengine != NULL) ||
2613 (cfg_scope != NULL && curengine == NULL) ||
2614 strcmp(curengine, cfg_scope))
2615 goto out;
2616
2617 if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */
2618 if (!*args[1]) {
2619 Alert("parsing [%s:%d] : missing name for spoe-agent section.\n",
2620 file, linenum);
2621 err_code |= ERR_ALERT | ERR_ABORT;
2622 goto out;
2623 }
2624 if (*args[2]) {
2625 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2626 file, linenum, args[2]);
2627 err_code |= ERR_ALERT | ERR_ABORT;
2628 goto out;
2629 }
2630
2631 err = invalid_char(args[1]);
2632 if (err) {
2633 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2634 file, linenum, *err, args[0], args[1]);
2635 err_code |= ERR_ALERT | ERR_ABORT;
2636 goto out;
2637 }
2638
2639 if (curagent != NULL) {
2640 Alert("parsing [%s:%d] : another spoe-agent section previously defined.\n",
2641 file, linenum);
2642 err_code |= ERR_ALERT | ERR_ABORT;
2643 goto out;
2644 }
2645 if ((curagent = calloc(1, sizeof(*curagent))) == NULL) {
2646 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2647 err_code |= ERR_ALERT | ERR_ABORT;
2648 goto out;
2649 }
2650
2651 curagent->id = strdup(args[1]);
2652 curagent->conf.file = strdup(file);
2653 curagent->conf.line = linenum;
2654 curagent->timeout.hello = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002655 curagent->timeout.idle = TICK_ETERNITY;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002656 curagent->timeout.processing = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002657 curagent->var_pfx = NULL;
Christopher Faulet985532d2016-11-16 15:36:19 +01002658 curagent->var_on_error = NULL;
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002659 curagent->flags = 0;
Christopher Faulet48026722016-11-16 15:01:12 +01002660 curagent->cps_max = 0;
2661 curagent->eps_max = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002662
2663 for (i = 0; i < SPOE_EV_EVENTS; ++i)
2664 LIST_INIT(&curagent->messages[i]);
2665 LIST_INIT(&curagent->cache);
2666 LIST_INIT(&curagent->applet_wq);
2667 }
2668 else if (!strcmp(args[0], "use-backend")) {
2669 if (!*args[1]) {
2670 Alert("parsing [%s:%d] : '%s' expects a backend name.\n",
2671 file, linenum, args[0]);
2672 err_code |= ERR_ALERT | ERR_FATAL;
2673 goto out;
2674 }
2675 if (*args[2]) {
2676 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2677 file, linenum, args[2]);
2678 err_code |= ERR_ALERT | ERR_ABORT;
2679 goto out;
2680 }
2681 free(curagent->b.name);
2682 curagent->b.name = strdup(args[1]);
2683 }
2684 else if (!strcmp(args[0], "messages")) {
2685 int cur_arg = 1;
2686 while (*args[cur_arg]) {
2687 struct spoe_msg_placeholder *mp = NULL;
2688
2689 list_for_each_entry(mp, &curmps, list) {
2690 if (!strcmp(mp->id, args[cur_arg])) {
2691 Alert("parsing [%s:%d]: spoe-message message '%s' already declared.\n",
2692 file, linenum, args[cur_arg]);
2693 err_code |= ERR_ALERT | ERR_FATAL;
2694 goto out;
2695 }
2696 }
2697
2698 if ((mp = calloc(1, sizeof(*mp))) == NULL) {
2699 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2700 err_code |= ERR_ALERT | ERR_ABORT;
2701 goto out;
2702 }
2703 mp->id = strdup(args[cur_arg]);
2704 LIST_ADDQ(&curmps, &mp->list);
2705 cur_arg++;
2706 }
2707 }
2708 else if (!strcmp(args[0], "timeout")) {
2709 unsigned int *tv = NULL;
2710 const char *res;
2711 unsigned timeout;
2712
2713 if (!*args[1]) {
2714 Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n",
2715 file, linenum);
2716 err_code |= ERR_ALERT | ERR_FATAL;
2717 goto out;
2718 }
2719 if (!strcmp(args[1], "hello"))
2720 tv = &curagent->timeout.hello;
2721 else if (!strcmp(args[1], "idle"))
2722 tv = &curagent->timeout.idle;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002723 else if (!strcmp(args[1], "processing"))
2724 tv = &curagent->timeout.processing;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002725 else {
Christopher Faulet03a34492016-11-19 16:47:56 +01002726 Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' or 'processing' (got %s).\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002727 file, linenum, args[1]);
2728 err_code |= ERR_ALERT | ERR_FATAL;
2729 goto out;
2730 }
2731 if (!*args[2]) {
2732 Alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n",
2733 file, linenum, args[1]);
2734 err_code |= ERR_ALERT | ERR_FATAL;
2735 goto out;
2736 }
2737 res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
2738 if (res) {
2739 Alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n",
2740 file, linenum, *res, args[1]);
2741 err_code |= ERR_ALERT | ERR_ABORT;
2742 goto out;
2743 }
2744 if (*args[3]) {
2745 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2746 file, linenum, args[3]);
2747 err_code |= ERR_ALERT | ERR_ABORT;
2748 goto out;
2749 }
2750 *tv = MS_TO_TICKS(timeout);
2751 }
2752 else if (!strcmp(args[0], "option")) {
2753 if (!*args[1]) {
2754 Alert("parsing [%s:%d]: '%s' expects an option name.\n",
2755 file, linenum, args[0]);
2756 err_code |= ERR_ALERT | ERR_FATAL;
2757 goto out;
2758 }
2759 if (!strcmp(args[1], "var-prefix")) {
2760 char *tmp;
2761
2762 if (!*args[2]) {
2763 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2764 file, linenum, args[0],
2765 args[1]);
2766 err_code |= ERR_ALERT | ERR_FATAL;
2767 goto out;
2768 }
2769 tmp = args[2];
2770 while (*tmp) {
2771 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2772 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
2773 file, linenum, args[0], args[1]);
2774 err_code |= ERR_ALERT | ERR_FATAL;
2775 goto out;
2776 }
2777 tmp++;
2778 }
2779 curagent->var_pfx = strdup(args[2]);
2780 }
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002781 else if (!strcmp(args[1], "continue-on-error")) {
2782 if (*args[2]) {
2783 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
Christopher Faulet48026722016-11-16 15:01:12 +01002784 file, linenum, args[2]);
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002785 err_code |= ERR_ALERT | ERR_ABORT;
2786 goto out;
2787 }
2788 curagent->flags |= SPOE_FL_CONT_ON_ERR;
2789 }
Christopher Faulet985532d2016-11-16 15:36:19 +01002790 else if (!strcmp(args[1], "set-on-error")) {
2791 char *tmp;
2792
2793 if (!*args[2]) {
2794 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2795 file, linenum, args[0],
2796 args[1]);
2797 err_code |= ERR_ALERT | ERR_FATAL;
2798 goto out;
2799 }
2800 tmp = args[2];
2801 while (*tmp) {
2802 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2803 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
2804 file, linenum, args[0], args[1]);
2805 err_code |= ERR_ALERT | ERR_FATAL;
2806 goto out;
2807 }
2808 tmp++;
2809 }
2810 curagent->var_on_error = strdup(args[2]);
2811 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002812 else {
2813 Alert("parsing [%s:%d]: option '%s' is not supported.\n",
2814 file, linenum, args[1]);
2815 err_code |= ERR_ALERT | ERR_FATAL;
2816 goto out;
2817 }
Christopher Faulet48026722016-11-16 15:01:12 +01002818 }
2819 else if (!strcmp(args[0], "maxconnrate")) {
2820 if (!*args[1]) {
2821 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
2822 file, linenum, args[0]);
2823 err_code |= ERR_ALERT | ERR_FATAL;
2824 goto out;
2825 }
2826 if (*args[2]) {
2827 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2828 file, linenum, args[2]);
2829 err_code |= ERR_ALERT | ERR_ABORT;
2830 goto out;
2831 }
2832 curagent->cps_max = atol(args[1]);
2833 }
2834 else if (!strcmp(args[0], "maxerrrate")) {
2835 if (!*args[1]) {
2836 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
2837 file, linenum, args[0]);
2838 err_code |= ERR_ALERT | ERR_FATAL;
2839 goto out;
2840 }
2841 if (*args[2]) {
2842 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2843 file, linenum, args[2]);
2844 err_code |= ERR_ALERT | ERR_ABORT;
2845 goto out;
2846 }
2847 curagent->eps_max = atol(args[1]);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002848 }
2849 else if (*args[0]) {
2850 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
2851 file, linenum, args[0]);
2852 err_code |= ERR_ALERT | ERR_FATAL;
2853 goto out;
2854 }
2855 out:
2856 return err_code;
2857}
2858
2859static int
2860cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm)
2861{
2862 struct spoe_message *msg;
2863 struct spoe_arg *arg;
2864 const char *err;
2865 char *errmsg = NULL;
2866 int err_code = 0;
2867
2868 if ((cfg_scope == NULL && curengine != NULL) ||
2869 (cfg_scope != NULL && curengine == NULL) ||
2870 strcmp(curengine, cfg_scope))
2871 goto out;
2872
2873 if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */
2874 if (!*args[1]) {
2875 Alert("parsing [%s:%d] : missing name for spoe-message section.\n",
2876 file, linenum);
2877 err_code |= ERR_ALERT | ERR_ABORT;
2878 goto out;
2879 }
2880 if (*args[2]) {
2881 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2882 file, linenum, args[2]);
2883 err_code |= ERR_ALERT | ERR_ABORT;
2884 goto out;
2885 }
2886
2887 err = invalid_char(args[1]);
2888 if (err) {
2889 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2890 file, linenum, *err, args[0], args[1]);
2891 err_code |= ERR_ALERT | ERR_ABORT;
2892 goto out;
2893 }
2894
2895 list_for_each_entry(msg, &curmsgs, list) {
2896 if (!strcmp(msg->id, args[1])) {
2897 Alert("parsing [%s:%d]: spoe-message section '%s' has the same"
2898 " name as another one declared at %s:%d.\n",
2899 file, linenum, args[1], msg->conf.file, msg->conf.line);
2900 err_code |= ERR_ALERT | ERR_FATAL;
2901 goto out;
2902 }
2903 }
2904
2905 if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) {
2906 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2907 err_code |= ERR_ALERT | ERR_ABORT;
2908 goto out;
2909 }
2910
2911 curmsg->id = strdup(args[1]);
2912 curmsg->id_len = strlen(curmsg->id);
2913 curmsg->event = SPOE_EV_NONE;
2914 curmsg->conf.file = strdup(file);
2915 curmsg->conf.line = linenum;
2916 LIST_INIT(&curmsg->args);
2917 LIST_ADDQ(&curmsgs, &curmsg->list);
2918 }
2919 else if (!strcmp(args[0], "args")) {
2920 int cur_arg = 1;
2921
2922 curproxy->conf.args.ctx = ARGC_SPOE;
2923 curproxy->conf.args.file = file;
2924 curproxy->conf.args.line = linenum;
2925 while (*args[cur_arg]) {
2926 char *delim = strchr(args[cur_arg], '=');
2927 int idx = 0;
2928
2929 if ((arg = calloc(1, sizeof(*arg))) == NULL) {
2930 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2931 err_code |= ERR_ALERT | ERR_ABORT;
2932 goto out;
2933 }
2934
2935 if (!delim) {
2936 arg->name = NULL;
2937 arg->name_len = 0;
2938 delim = args[cur_arg];
2939 }
2940 else {
2941 arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]);
2942 arg->name_len = delim - args[cur_arg];
2943 delim++;
2944 }
2945
2946 arg->expr = sample_parse_expr(&delim, &idx, file, linenum, &errmsg, &curproxy->conf.args);
2947 if (arg->expr == NULL) {
2948 Alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg);
2949 err_code |= ERR_ALERT | ERR_FATAL;
2950 free(arg->name);
2951 free(arg);
2952 goto out;
2953 }
2954 LIST_ADDQ(&curmsg->args, &arg->list);
2955 cur_arg++;
2956 }
2957 curproxy->conf.args.file = NULL;
2958 curproxy->conf.args.line = 0;
2959 }
2960 else if (!strcmp(args[0], "event")) {
2961 if (!*args[1]) {
2962 Alert("parsing [%s:%d] : missing event name.\n", file, linenum);
2963 err_code |= ERR_ALERT | ERR_ABORT;
2964 goto out;
2965 }
2966 if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS]))
2967 curmsg->event = SPOE_EV_ON_CLIENT_SESS;
2968 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS]))
2969 curmsg->event = SPOE_EV_ON_SERVER_SESS;
2970
2971 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE]))
2972 curmsg->event = SPOE_EV_ON_TCP_REQ_FE;
2973 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE]))
2974 curmsg->event = SPOE_EV_ON_TCP_REQ_BE;
2975 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP]))
2976 curmsg->event = SPOE_EV_ON_TCP_RSP;
2977
2978 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE]))
2979 curmsg->event = SPOE_EV_ON_HTTP_REQ_FE;
2980 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE]))
2981 curmsg->event = SPOE_EV_ON_HTTP_REQ_BE;
2982 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP]))
2983 curmsg->event = SPOE_EV_ON_HTTP_RSP;
2984 else {
2985 Alert("parsing [%s:%d] : unkown event '%s'.\n",
2986 file, linenum, args[1]);
2987 err_code |= ERR_ALERT | ERR_ABORT;
2988 goto out;
2989 }
2990 }
2991 else if (!*args[0]) {
2992 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n",
2993 file, linenum, args[0]);
2994 err_code |= ERR_ALERT | ERR_FATAL;
2995 goto out;
2996 }
2997 out:
2998 free(errmsg);
2999 return err_code;
3000}
3001
3002/* Return -1 on error, else 0 */
3003static int
3004parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
3005 struct flt_conf *fconf, char **err, void *private)
3006{
3007 struct list backup_sections;
3008 struct spoe_config *conf;
3009 struct spoe_message *msg, *msgback;
3010 struct spoe_msg_placeholder *mp, *mpback;
3011 char *file = NULL, *engine = NULL;
3012 int ret, pos = *cur_arg + 1;
3013
3014 conf = calloc(1, sizeof(*conf));
3015 if (conf == NULL) {
3016 memprintf(err, "%s: out of memory", args[*cur_arg]);
3017 goto error;
3018 }
3019 conf->proxy = px;
3020
3021 while (*args[pos]) {
3022 if (!strcmp(args[pos], "config")) {
3023 if (!*args[pos+1]) {
3024 memprintf(err, "'%s' : '%s' option without value",
3025 args[*cur_arg], args[pos]);
3026 goto error;
3027 }
3028 file = args[pos+1];
3029 pos += 2;
3030 }
3031 else if (!strcmp(args[pos], "engine")) {
3032 if (!*args[pos+1]) {
3033 memprintf(err, "'%s' : '%s' option without value",
3034 args[*cur_arg], args[pos]);
3035 goto error;
3036 }
3037 engine = args[pos+1];
3038 pos += 2;
3039 }
3040 else {
3041 memprintf(err, "unknown keyword '%s'", args[pos]);
3042 goto error;
3043 }
3044 }
3045 if (file == NULL) {
3046 memprintf(err, "'%s' : missing config file", args[*cur_arg]);
3047 goto error;
3048 }
3049
3050 /* backup sections and register SPOE sections */
3051 LIST_INIT(&backup_sections);
3052 cfg_backup_sections(&backup_sections);
3053 cfg_register_section("spoe-agent", cfg_parse_spoe_agent);
3054 cfg_register_section("spoe-message", cfg_parse_spoe_message);
3055
3056 /* Parse SPOE filter configuration file */
3057 curengine = engine;
3058 curproxy = px;
3059 curagent = NULL;
3060 curmsg = NULL;
3061 ret = readcfgfile(file);
3062 curproxy = NULL;
3063
3064 /* unregister SPOE sections and restore previous sections */
3065 cfg_unregister_sections();
3066 cfg_restore_sections(&backup_sections);
3067
3068 if (ret == -1) {
3069 memprintf(err, "Could not open configuration file %s : %s",
3070 file, strerror(errno));
3071 goto error;
3072 }
3073 if (ret & (ERR_ABORT|ERR_FATAL)) {
3074 memprintf(err, "Error(s) found in configuration file %s", file);
3075 goto error;
3076 }
3077
3078 /* Check SPOE agent */
3079 if (curagent == NULL) {
3080 memprintf(err, "No SPOE agent found in file %s", file);
3081 goto error;
3082 }
3083 if (curagent->b.name == NULL) {
3084 memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d",
3085 curagent->id, curagent->conf.file, curagent->conf.line);
3086 goto error;
3087 }
Christopher Fauletf7a30922016-11-10 15:04:51 +01003088 if (curagent->timeout.hello == TICK_ETERNITY ||
3089 curagent->timeout.idle == TICK_ETERNITY ||
Christopher Fauletf7a30922016-11-10 15:04:51 +01003090 curagent->timeout.processing == TICK_ETERNITY) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003091 Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
3092 " | While not properly invalid, you will certainly encounter various problems\n"
3093 " | with such a configuration. To fix this, please ensure that all following\n"
Christopher Faulet03a34492016-11-19 16:47:56 +01003094 " | timeouts are set to a non-zero value: 'hello', 'idle', 'processing'.\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003095 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3096 }
3097 if (curagent->var_pfx == NULL) {
3098 char *tmp = curagent->id;
3099
3100 while (*tmp) {
3101 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3102 memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. "
3103 "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n",
3104 curagent->id, curagent->id, curagent->conf.file, curagent->conf.line);
3105 goto error;
3106 }
3107 tmp++;
3108 }
3109 curagent->var_pfx = strdup(curagent->id);
3110 }
3111
3112 if (LIST_ISEMPTY(&curmps)) {
3113 Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",
3114 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3115 goto finish;
3116 }
3117
3118 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3119 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3120 if (!strcmp(msg->id, mp->id)) {
3121 if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) {
3122 if (msg->event == SPOE_EV_ON_TCP_REQ_BE)
3123 msg->event = SPOE_EV_ON_TCP_REQ_FE;
3124 if (msg->event == SPOE_EV_ON_HTTP_REQ_BE)
3125 msg->event = SPOE_EV_ON_HTTP_REQ_FE;
3126 }
3127 if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS ||
3128 msg->event == SPOE_EV_ON_TCP_REQ_FE ||
3129 msg->event == SPOE_EV_ON_HTTP_REQ_FE)) {
3130 Warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n",
3131 px->id, msg->conf.file, msg->conf.line);
3132 goto next;
3133 }
3134 if (msg->event == SPOE_EV_NONE) {
3135 Warning("Proxy '%s': Ignore SPOE message without event at %s:%d.\n",
3136 px->id, msg->conf.file, msg->conf.line);
3137 goto next;
3138 }
3139 msg->agent = curagent;
3140 LIST_DEL(&msg->list);
3141 LIST_ADDQ(&curagent->messages[msg->event], &msg->list);
3142 goto next;
3143 }
3144 }
3145 memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d",
3146 curagent->id, mp->id, curagent->conf.file, curagent->conf.line);
3147 goto error;
3148 next:
3149 continue;
3150 }
3151
3152 finish:
3153 conf->agent = curagent;
3154 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3155 LIST_DEL(&mp->list);
3156 release_spoe_msg_placeholder(mp);
3157 }
3158 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3159 Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n",
3160 px->id, msg->id, msg->conf.file, msg->conf.line);
3161 LIST_DEL(&msg->list);
3162 release_spoe_message(msg);
3163 }
3164
3165 *cur_arg = pos;
3166 fconf->ops = &spoe_ops;
3167 fconf->conf = conf;
3168 return 0;
3169
3170 error:
3171 release_spoe_agent(curagent);
3172 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3173 LIST_DEL(&mp->list);
3174 release_spoe_msg_placeholder(mp);
3175 }
3176 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3177 LIST_DEL(&msg->list);
3178 release_spoe_message(msg);
3179 }
3180 free(conf);
3181 return -1;
3182}
3183
3184
3185/* Declare the filter parser for "spoe" keyword */
3186static struct flt_kw_list flt_kws = { "SPOE", { }, {
3187 { "spoe", parse_spoe_flt, NULL },
3188 { NULL, NULL, NULL },
3189 }
3190};
3191
3192__attribute__((constructor))
3193static void __spoe_init(void)
3194{
3195 flt_register_keywords(&flt_kws);
3196
3197 LIST_INIT(&curmsgs);
3198 LIST_INIT(&curmps);
3199 pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
3200}
3201
3202__attribute__((destructor))
3203static void
3204__spoe_deinit(void)
3205{
3206 pool_destroy2(pool2_spoe_ctx);
3207}