/*
 * Stream processing offload engine management.
 *
 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 *
 */
#include <ctype.h>
#include <errno.h>
#include <limits.h>

#include <common/buffer.h>
#include <common/cfgparse.h>
#include <common/compat.h>
#include <common/config.h>
#include <common/debug.h>
#include <common/memory.h>
#include <common/time.h>

#include <types/arg.h>
#include <types/filters.h>
#include <types/global.h>
#include <types/proxy.h>
#include <types/sample.h>
#include <types/stream.h>

#include <proto/arg.h>
#include <proto/backend.h>
#include <proto/filters.h>
#include <proto/freq_ctr.h>
#include <proto/frontend.h>
#include <proto/log.h>
#include <proto/proto_http.h>
#include <proto/proxy.h>
#include <proto/sample.h>
#include <proto/session.h>
#include <proto/signal.h>
#include <proto/stream.h>
#include <proto/stream_interface.h>
#include <proto/task.h>
#include <proto/vars.h>

45
/* Debug traces: SPOE_PRINTF() behaves like fprintf() when SPOE debugging is
 * compiled in and expands to nothing otherwise. */
#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
#define SPOE_PRINTF(...) fprintf(__VA_ARGS__)
#else
#define SPOE_PRINTF(...)
#endif

/* Helper to get ctx inside an appctx */
#define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe)

/* Minimal size for a frame */
#define MIN_FRAME_SIZE 256

/* Flags set on the SPOE agent */
#define SPOE_FL_CONT_ON_ERR       0x00000001 /* Do not stop events processing when an error occurred */

/* Flags set on the SPOE context */
#define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* Set after that on-client-session event was processed */
#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set after that on-server-session event was processed */
#define SPOE_CTX_FL_REQ_PROCESS   0x00000004 /* Set when SPOE is processing the request */
#define SPOE_CTX_FL_RSP_PROCESS   0x00000008 /* Set when SPOE is processing the response */

/* Mask matching any "processing in progress" flag */
#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)

/* Errors set on the SPOE applet */
#define SPOE_APPCTX_ERR_NONE    0x00000000 /* no error yet, leave it to zero */
#define SPOE_APPCTX_ERR_TOUT    0x00000001 /* SPOE applet timeout */
71
/* All possible states for a SPOE context */
enum spoe_ctx_state {
	SPOE_CTX_ST_NONE = 0,
	SPOE_CTX_ST_READY,
	SPOE_CTX_ST_SENDING_MSGS,
	SPOE_CTX_ST_WAITING_ACK,
	SPOE_CTX_ST_DONE,
	SPOE_CTX_ST_ERROR,
};
81
/* All possible states for a SPOE applet */
enum spoe_appctx_state {
	SPOE_APPCTX_ST_CONNECT = 0,
	SPOE_APPCTX_ST_CONNECTING,
	SPOE_APPCTX_ST_PROCESSING,
	SPOE_APPCTX_ST_DISCONNECT,
	SPOE_APPCTX_ST_DISCONNECTING,
	SPOE_APPCTX_ST_EXIT,
	SPOE_APPCTX_ST_END,
};
92
/* All supported SPOE actions. SPOE_ACT_TYPES is not an action, it only marks
 * the end of the list. */
enum spoe_action_type {
	SPOE_ACT_T_SET_VAR = 1,
	SPOE_ACT_T_UNSET_VAR,
	SPOE_ACT_TYPES,
};
99
/* All supported SPOE events. SPOE_EV_EVENTS is the number of entries and is
 * used to size per-event arrays. */
enum spoe_event {
	SPOE_EV_NONE = 0,

	/* Request events */
	SPOE_EV_ON_CLIENT_SESS = 1,
	SPOE_EV_ON_TCP_REQ_FE,
	SPOE_EV_ON_TCP_REQ_BE,
	SPOE_EV_ON_HTTP_REQ_FE,
	SPOE_EV_ON_HTTP_REQ_BE,

	/* Response events */
	SPOE_EV_ON_SERVER_SESS,
	SPOE_EV_ON_TCP_RSP,
	SPOE_EV_ON_HTTP_RSP,

	SPOE_EV_EVENTS
};
118
/* Errors triggered by SPOE applet. Values between SPOE_FRM_ERR_BAD_FRAME_SIZE
 * and SPOE_FRM_ERR_UNKNOWN are not assigned. */
enum spoe_frame_error {
	SPOE_FRM_ERR_NONE = 0,
	SPOE_FRM_ERR_IO,
	SPOE_FRM_ERR_TOUT,
	SPOE_FRM_ERR_TOO_BIG,
	SPOE_FRM_ERR_INVALID,
	SPOE_FRM_ERR_NO_VSN,
	SPOE_FRM_ERR_NO_FRAME_SIZE,
	SPOE_FRM_ERR_NO_CAP,
	SPOE_FRM_ERR_BAD_VSN,
	SPOE_FRM_ERR_BAD_FRAME_SIZE,
	SPOE_FRM_ERR_UNKNOWN = 99,
	SPOE_FRM_ERRS,
};
134
/* Scopes used for variables set by agents. It is a way to be agnostic to vars
 * scope. */
enum spoe_vars_scope {
	SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC  */
	SPOE_SCOPE_SESS,     /* <=> SCOPE_SESS */
	SPOE_SCOPE_TXN,      /* <=> SCOPE_TXN  */
	SPOE_SCOPE_REQ,      /* <=> SCOPE_REQ  */
	SPOE_SCOPE_RES,      /* <=> SCOPE_RES  */
};
144
145
146/* Describe an argument that will be linked to a message. It is a sample fetch,
147 * with an optional name. */
148struct spoe_arg {
149 char *name; /* Name of the argument, may be NULL */
150 unsigned int name_len; /* The name length, 0 if NULL */
151 struct sample_expr *expr; /* Sample expression */
152 struct list list; /* Used to chain SPOE args */
153};
154
155/* Used during the config parsing only because, when a SPOE agent section is
156 * parsed, messages can be undefined. */
157struct spoe_msg_placeholder {
158 char *id; /* SPOE message placeholder id */
159 struct list list; /* Use to chain SPOE message placeholders */
160};
161
162/* Describe a message that will be sent in a NOTIFY frame. A message has a name,
163 * an argument list (see above) and it is linked to a specific event. */
164struct spoe_message {
165 char *id; /* SPOE message id */
166 unsigned int id_len; /* The message id length */
167 struct spoe_agent *agent; /* SPOE agent owning this SPOE message */
168 struct {
169 char *file; /* file where the SPOE message appears */
170 int line; /* line where the SPOE message appears */
171 } conf; /* config information */
172 struct list args; /* Arguments added when the SPOE messages is sent */
173 struct list list; /* Used to chain SPOE messages */
174
175 enum spoe_event event; /* SPOE_EV_* */
176};
177
178/* Describe a SPOE agent. */
179struct spoe_agent {
180 char *id; /* SPOE agent id (name) */
181 struct {
182 char *file; /* file where the SPOE agent appears */
183 int line; /* line where the SPOE agent appears */
184 } conf; /* config information */
185 union {
186 struct proxy *be; /* Backend used by this agent */
187 char *name; /* Backend name used during conf parsing */
188 } b;
189 struct {
Christopher Fauletf7a30922016-11-10 15:04:51 +0100190 unsigned int hello; /* Max time to receive AGENT-HELLO frame (in SPOE applet) */
191 unsigned int idle; /* Max Idle timeout (in SPOE applet) */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100192 unsigned int processing; /* Max time to process an event (in the main stream) */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200193 } timeout;
194
195 char *var_pfx; /* Prefix used for vars set by the agent */
Christopher Fauletea62c2a2016-11-14 10:54:21 +0100196 unsigned int flags; /* SPOE_FL_* */
Christopher Faulet48026722016-11-16 15:01:12 +0100197 unsigned int cps_max; /* Maximum number of connections per second */
198 unsigned int eps_max; /* Maximum number of errors per second */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200199
200 struct list cache; /* List used to cache SPOE streams. In
201 * fact, we cache the SPOE applect ctx */
202
203 struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
204 * for each supported events */
205
206 struct list applet_wq; /* List of streams waiting for a SPOE applet */
Christopher Faulet48026722016-11-16 15:01:12 +0100207 struct freq_ctr conn_per_sec; /* connections per second */
208 struct freq_ctr err_per_sec; /* connetion errors per second */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200209};
210
211/* SPOE filter configuration */
212struct spoe_config {
213 struct proxy *proxy; /* Proxy owning the filter */
214 struct spoe_agent *agent; /* Agent used by this filter */
215 struct proxy agent_fe; /* Agent frontend */
216};
217
218/* SPOE context attached to a stream. It is the main structure that handles the
219 * processing offload */
220struct spoe_context {
221 struct filter *filter; /* The SPOE filter */
222 struct stream *strm; /* The stream that should be offloaded */
223 struct appctx *appctx; /* The SPOE appctx */
224 struct list *messages; /* List of messages that will be sent during the stream processing */
225 struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */
226 struct list buffer_wait; /* position in the list of streams waiting for a buffer */
227 struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */
228
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200229 enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
230 unsigned int flags; /* SPOE_CTX_FL_* */
231
232 unsigned int stream_id; /* stream_id and frame_id are used */
233 unsigned int frame_id; /* to map NOTIFY and ACK frames */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100234 unsigned int process_exp; /* expiration date to process an event */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200235};
236
237/* Set if the handle on SIGUSR1 is registered */
238static int sighandler_registered = 0;
239
240/* proxy used during the parsing */
241struct proxy *curproxy = NULL;
242
243/* The name of the SPOE engine, used during the parsing */
244char *curengine = NULL;
245
246/* SPOE agent used during the parsing */
247struct spoe_agent *curagent = NULL;
248
249/* SPOE message used during the parsing */
250struct spoe_message *curmsg = NULL;
251
252/* list of SPOE messages and placeholders used during the parsing */
253struct list curmsgs;
254struct list curmps;
255
256/* Pool used to allocate new SPOE contexts */
257static struct pool_head *pool2_spoe_ctx = NULL;
258
259/* Temporary variables used to ease error processing */
260int spoe_status_code = SPOE_FRM_ERR_NONE;
261char spoe_reason[256];
262
263struct flt_ops spoe_ops;
264
265static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx);
266static void on_new_spoe_appctx_failure(struct spoe_agent *agent);
267static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx);
268
269/********************************************************************
270 * helper functions/globals
271 ********************************************************************/
272static void
273release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp)
274{
275 if (!mp)
276 return;
277 free(mp->id);
278 free(mp);
279}
280
281
282static void
283release_spoe_message(struct spoe_message *msg)
284{
285 struct spoe_arg *arg, *back;
286
287 if (!msg)
288 return;
289 free(msg->id);
290 free(msg->conf.file);
291 list_for_each_entry_safe(arg, back, &msg->args, list) {
292 release_sample_expr(arg->expr);
293 free(arg->name);
294 LIST_DEL(&arg->list);
295 free(arg);
296 }
297 free(msg);
298}
299
300static void
301release_spoe_agent(struct spoe_agent *agent)
302{
303 struct spoe_message *msg, *back;
304 int i;
305
306 if (!agent)
307 return;
308 free(agent->id);
309 free(agent->conf.file);
310 free(agent->var_pfx);
311 for (i = 0; i < SPOE_EV_EVENTS; ++i) {
312 list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
313 LIST_DEL(&msg->list);
314 release_spoe_message(msg);
315 }
316 }
317 free(agent);
318}
319
320static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
321 [SPOE_FRM_ERR_NONE] = "normal",
322 [SPOE_FRM_ERR_IO] = "I/O error",
323 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
324 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
325 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
326 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
327 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
328 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
329 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
330 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
331 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
332};
333
334static const char *spoe_event_str[SPOE_EV_EVENTS] = {
335 [SPOE_EV_ON_CLIENT_SESS] = "on-client-session",
336 [SPOE_EV_ON_TCP_REQ_FE] = "on-frontend-tcp-request",
337 [SPOE_EV_ON_TCP_REQ_BE] = "on-backend-tcp-request",
338 [SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request",
339 [SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request",
340
341 [SPOE_EV_ON_SERVER_SESS] = "on-server-session",
342 [SPOE_EV_ON_TCP_RSP] = "on-tcp-response",
343 [SPOE_EV_ON_HTTP_RSP] = "on-http-response",
344};
345
346
#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)

/* Printable name of each SPOE context state (debug builds only) */
static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
	[SPOE_CTX_ST_NONE]         = "NONE",
	[SPOE_CTX_ST_READY]        = "READY",
	[SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
	[SPOE_CTX_ST_WAITING_ACK]  = "WAITING_ACK",
	[SPOE_CTX_ST_DONE]         = "DONE",
	[SPOE_CTX_ST_ERROR]        = "ERROR",
};

/* Printable name of each SPOE applet state (debug builds only) */
static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
	[SPOE_APPCTX_ST_CONNECT]       = "CONNECT",
	[SPOE_APPCTX_ST_CONNECTING]    = "CONNECTING",
	[SPOE_APPCTX_ST_PROCESSING]    = "PROCESSING",
	[SPOE_APPCTX_ST_DISCONNECT]    = "DISCONNECT",
	[SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
	[SPOE_APPCTX_ST_EXIT]          = "EXIT",
	[SPOE_APPCTX_ST_END]           = "END",
};

#endif
/********************************************************************
 * Functions that encode/decode SPOE frames
 ********************************************************************/
/* Frame Types sent by HAProxy and by agents */
enum spoe_frame_type {
	/* Frames sent by HAProxy */
	SPOE_FRM_T_HAPROXY_HELLO = 1,
	SPOE_FRM_T_HAPROXY_DISCON,
	SPOE_FRM_T_HAPROXY_NOTIFY,

	/* Frames sent by the agents */
	SPOE_FRM_T_AGENT_HELLO = 101,
	SPOE_FRM_T_AGENT_DISCON,
	SPOE_FRM_T_AGENT_ACK
};
384
/* All supported data types. The value is carried in the low nibble of the type
 * byte (see SPOE_DATA_T_MASK). SPOE_DATA_TYPES is the number of types. */
enum spoe_data_type {
	SPOE_DATA_T_NULL = 0,
	SPOE_DATA_T_BOOL,
	SPOE_DATA_T_INT32,
	SPOE_DATA_T_UINT32,
	SPOE_DATA_T_INT64,
	SPOE_DATA_T_UINT64,
	SPOE_DATA_T_IPV4,
	SPOE_DATA_T_IPV6,
	SPOE_DATA_T_STR,
	SPOE_DATA_T_BIN,
	SPOE_DATA_TYPES
};
399
/* Masks to get data type or flags value */
#define SPOE_DATA_T_MASK  0x0F
#define SPOE_DATA_FL_MASK 0xF0

/* Flags to set Boolean values */
#define SPOE_DATA_FL_FALSE 0x00
#define SPOE_DATA_FL_TRUE  0x10

/* Helper to get static string length, excluding the terminating null byte */
#define SLEN(str) (sizeof(str)-1)

/* Predefined key used in HELLO/DISCONNECT frames */
#define SUPPORTED_VERSIONS_KEY "supported-versions"
#define VERSION_KEY            "version"
#define MAX_FRAME_SIZE_KEY     "max-frame-size"
#define CAPABILITIES_KEY       "capabilities"
#define HEALTHCHECK_KEY        "healthcheck"
#define STATUS_CODE_KEY        "status-code"
#define MSG_KEY                "message"
419
/* A supported protocol version: its textual form and the inclusive range of
 * decoded version numbers (as returned by decode_spoe_version()) it matches. */
struct spoe_version {
	char *str;
	int   min;
	int   max;
};

/* All supported versions. The list ends with an entry whose <str> is NULL. */
static struct spoe_version supported_versions[] = {
	{"1.0", 1000, 1000},
	{NULL,  0, 0}
};

/* Comma-separated list of supported versions */
#define SUPPORTED_VERSIONS_VAL  "1.0"

/* Comma-separated list of supported capabilities (none for now) */
#define CAPABILITIES_VAL ""
437
/* Decode a version found in a frame. <str> points to <len> bytes that are not
 * necessarily NUL-terminated. The value is parsed as a decimal number,
 * optionally surrounded by white spaces, and is returned multiplied by 1000
 * ("1.0" => 1000). -1 is returned if the string is not a number, if it is
 * followed by anything but white spaces, or if the value is not a strictly
 * positive finite number that fits in an int once scaled. */
static int
decode_spoe_version(const char *str, size_t len)
{
	char   tmp[len+1], *start, *end;
	double d;

	/* strtod() needs a NUL-terminated string */
	memcpy(tmp, str, len);
	tmp[len] = 0;

	/* <ctype.h> functions are only defined for unsigned char values */
	start = tmp;
	while (isspace((unsigned char)*start))
		start++;

	d = strtod(start, &end);
	if (start == end)
		return -1;

	while (isspace((unsigned char)*end))
		end++;
	if (*end)
		return -1;

	/* Written as a negated range test so that NaN is rejected too. The
	 * upper bound keeps the scaled value representable as an int. */
	if (!(d > 0 && d <= INT_MAX / 1000))
		return -1;

	/* <d * 1000> may land a hair below the expected integer because of
	 * the binary representation of <d>. The small bias absorbs that error
	 * while still truncating any digit past the third decimal. */
	return (int)(d * 1000 + 1e-6);
}
466
/* Encode a variable-length integer. Values below 240 take a single byte.
 * Otherwise the first byte holds the 4 low bits with its high nibble set, and
 * each following byte holds 7 more bits, its high bit telling that another byte
 * follows. This function never fails and returns the number of written bytes
 * (at most 10 for a 64-bit value). <buf> must be large enough. */
static int
encode_spoe_varint(uint64_t i, char *buf)
{
	unsigned char *out = (unsigned char *)buf;
	int            idx = 0;

	if (i < 240) {
		out[idx++] = (unsigned char)i;
		return idx;
	}

	out[idx++] = (unsigned char)i | 240;
	i = (i - 240) >> 4;
	while (i >= 128) {
		out[idx++] = (unsigned char)i | 128;
		i = (i - 128) >> 7;
	}
	out[idx++] = (unsigned char)i;
	return idx;
}
488
/* Decode a variable-length integer. <end> points one byte past the last
 * readable byte. If the decoding fails, -1 is returned. This happens when the
 * buffer's end is reached before the integer is complete, or when the encoding
 * is longer than what a 64-bit value needs (10 bytes). On success, the decoded
 * value is stored in <*i> and the number of read bytes is returned. On failure
 * <*i> may have been partially updated. */
static int
decode_spoe_varint(const char *buf, const char *end, uint64_t *i)
{
	const unsigned char *msg   = (const unsigned char *)buf;
	const unsigned char *stop  = (const unsigned char *)end;
	unsigned int         shift = 4;
	int                  idx   = 0;

	/* <stop> itself is not readable */
	if (msg >= stop)
		return -1;

	*i = msg[0];
	if (msg[0] < 240)
		return 1;

	do {
		++idx;
		/* Never read past the buffer, and never shift by 64 or more:
		 * that would be undefined and cannot be a valid encoding */
		if (msg+idx >= stop || shift > 63)
			return -1;
		*i += (uint64_t)msg[idx] << shift;
		shift += 7;
	} while (msg[idx] >= 128);
	return (idx + 1);
}
514
/* Encode a string. The string will be prefixed by its length, encoded as a
 * variable-length integer. An empty string is encoded as a single zero byte
 * and <str> is not read in that case. This function never fails and returns
 * the number of written bytes. <dst> must be large enough. */
static int
encode_spoe_string(const char *str, size_t len, char *dst)
{
	int idx;

	idx = encode_spoe_varint(len, dst);
	if (len)
		memcpy(dst+idx, str, len);
	return (int)(idx + len);
}
532
/* Decode a string. Its length is decoded first as a variable-length integer. If
 * it succeeds, and if the string fits between <buf> and <end> (<end> being one
 * byte past the last readable byte), the beginning of the string is saved in
 * <*str>, its length is saved in <*len> and the total number of bytes read is
 * returned. The string is not copied nor NUL-terminated. If an error occurred,
 * -1 is returned and <*str> remains NULL. */
static int
decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
{
	size_t avail;
	int    i;

	*str = NULL;
	*len = 0;

	if ((i = decode_spoe_varint(buf, end, len)) == -1)
		return -1;

	/* Compare sizes, not pointers: <*len> comes from the wire and adding
	 * it to a pointer could wrap around and defeat the bound check */
	avail = (size_t)(end - buf);
	if ((size_t)i > avail || *len > avail - (size_t)i)
		return -1;

	*str = buf + i;
	return (int)(i + *len);
}
558
559/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
560 * of bytes read is returned. A types data is composed of a type (1 byte) and
561 * corresponding data:
562 * - boolean: non additional data (0 bytes)
563 * - integers: a variable-length integer (see decode_spoe_varint)
564 * - ipv4: 4 bytes
565 * - ipv6: 16 bytes
566 * - binary and string: a buffer prefixed by its size, a variable-length
567 * integer (see decode_spoe_string) */
568static int
569skip_spoe_data(char *frame, char *end)
570{
571 uint64_t sz = 0;
572 int i, idx = 0;
573
574 if (frame > end)
575 return -1;
576
577 switch (frame[idx++] & SPOE_DATA_T_MASK) {
578 case SPOE_DATA_T_BOOL:
579 break;
580 case SPOE_DATA_T_INT32:
581 case SPOE_DATA_T_INT64:
582 case SPOE_DATA_T_UINT32:
583 case SPOE_DATA_T_UINT64:
584 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
585 return -1;
586 idx += i;
587 break;
588 case SPOE_DATA_T_IPV4:
589 idx += 4;
590 break;
591 case SPOE_DATA_T_IPV6:
592 idx += 16;
593 break;
594 case SPOE_DATA_T_STR:
595 case SPOE_DATA_T_BIN:
596 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
597 return -1;
598 idx += i + sz;
599 break;
600 }
601
602 if (frame+idx > end)
603 return -1;
604 return idx;
605}
606
607/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
608 * number of read bytes is returned. See skip_spoe_data for details. */
609static int
610decode_spoe_data(char *frame, char *end, struct sample *smp)
611{
612 uint64_t sz = 0;
613 int type, i, idx = 0;
614
615 if (frame > end)
616 return -1;
617
618 type = frame[idx++];
619 switch (type & SPOE_DATA_T_MASK) {
620 case SPOE_DATA_T_BOOL:
621 smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
622 smp->data.type = SMP_T_BOOL;
623 break;
624 case SPOE_DATA_T_INT32:
625 case SPOE_DATA_T_INT64:
626 case SPOE_DATA_T_UINT32:
627 case SPOE_DATA_T_UINT64:
628 if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1)
629 return -1;
630 idx += i;
631 smp->data.type = SMP_T_SINT;
632 break;
633 case SPOE_DATA_T_IPV4:
634 if (frame+idx+4 > end)
635 return -1;
636 memcpy(&smp->data.u.ipv4, frame+idx, 4);
637 smp->data.type = SMP_T_IPV4;
638 idx += 4;
639 break;
640 case SPOE_DATA_T_IPV6:
641 if (frame+idx+16 > end)
642 return -1;
643 memcpy(&smp->data.u.ipv6, frame+idx, 16);
644 smp->data.type = SMP_T_IPV6;
645 idx += 16;
646 break;
647 case SPOE_DATA_T_STR:
648 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
649 return -1;
650 idx += i;
651 if (frame+idx+sz > end)
652 return -1;
653 smp->data.u.str.str = frame+idx;
654 smp->data.u.str.len = sz;
655 smp->data.type = SMP_T_STR;
656 idx += sz;
657 break;
658 case SPOE_DATA_T_BIN:
659 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
660 return -1;
661 idx += i;
662 if (frame+idx+sz > end)
663 return -1;
664 smp->data.u.str.str = frame+idx;
665 smp->data.u.str.len = sz;
666 smp->data.type = SMP_T_BIN;
667 idx += sz;
668 break;
669 }
670
671 if (frame+idx > end)
672 return -1;
673 return idx;
674}
675
/* Skip an action in a frame received from an agent. <end> points one byte past
 * the last readable byte. If an error occurred, -1 is returned, otherwise the
 * number of read bytes is returned. An action is composed of the action type
 * (1 byte) and the number of arguments (1 byte), followed by that many typed
 * data. */
static int
skip_spoe_action(char *frame, char *end)
{
	int n, i, idx = 0;

	/* The action type and the number of arguments must be readable */
	if (end - frame < 2)
		return -1;

	idx++; /* Skip the action type */
	n = frame[idx++];
	while (n-- > 0) {
		/* Any negative value is an error: moving backwards in the
		 * frame must never happen */
		if ((i = skip_spoe_data(frame+idx, end)) < 0)
			return -1;
		idx += i;
	}

	if (idx > end - frame)
		return -1;
	return idx;
}
699
700/* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on
701 * success, 0 if the frame can be ignored and -1 if an error occurred. */
702static int
703prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
704{
705 int idx = 0;
706 size_t max = (7 /* TYPE + METADATA */
707 + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
708 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
709 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL));
710
711 if (size < max)
712 return -1;
713
714 /* Frame type */
715 frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
716
717 /* No flags for now */
718 memset(frame+idx, 0, 4);
719 idx += 4;
720
721 /* No stream-id and frame-id for HELLO frames */
722 frame[idx++] = 0;
723 frame[idx++] = 0;
724
725 /* There are 3 mandatory items: "supported-versions", "max-frame-size"
726 * and "capabilities" */
727
728 /* "supported-versions" K/V item */
729 idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx);
730 frame[idx++] = SPOE_DATA_T_STR;
731 idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx);
732
733 /* "max-fram-size" K/V item */
734 idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
735 frame[idx++] = SPOE_DATA_T_UINT32;
736 idx += encode_spoe_varint(APPCTX_SPOE(appctx).max_frame_size, frame+idx);
737
738 /* "capabilities" K/V item */
739 idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
740 frame[idx++] = SPOE_DATA_T_STR;
741 idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
742
743 return idx;
744}
745
746/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame
747 * size on success, 0 if the frame can be ignored and -1 if an error
748 * occurred. */
749static int
750prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
751{
752 const char *reason;
753 int rlen, idx = 0;
754 size_t max = (7 /* TYPE + METADATA */
755 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
756 + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
757
758 if (size < max)
759 return -1;
760
761 /* Get the message corresponding to the status code */
762 if (spoe_status_code >= SPOE_FRM_ERRS)
763 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
764 reason = spoe_frm_err_reasons[spoe_status_code];
765 rlen = strlen(reason);
766
767 /* Frame type */
768 frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
769
770 /* No flags for now */
771 memset(frame+idx, 0, 4);
772 idx += 4;
773
774 /* No stream-id and frame-id for DISCONNECT frames */
775 frame[idx++] = 0;
776 frame[idx++] = 0;
777
778 /* There are 2 mandatory items: "status-code" and "message" */
779
780 /* "status-code" K/V item */
781 idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx);
782 frame[idx++] = SPOE_DATA_T_UINT32;
783 idx += encode_spoe_varint(spoe_status_code, frame+idx);
784
785 /* "message" K/V item */
786 idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx);
787 frame[idx++] = SPOE_DATA_T_STR;
788 idx += encode_spoe_string(reason, rlen, frame+idx);
789
790 return idx;
791}
792
793/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
794 * success, 0 if the frame can be ignored and -1 if an error occurred. */
795static int
796prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
797{
798 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
799 int idx = 0;
800
801 if (size < APPCTX_SPOE(appctx).max_frame_size)
802 return -1;
803
804 frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
805
806 /* No flags for now */
807 memset(frame+idx, 0, 4);
808 idx += 4;
809
810 /* Set stream-id and frame-id */
811 idx += encode_spoe_varint(ctx->stream_id, frame+idx);
812 idx += encode_spoe_varint(ctx->frame_id, frame+idx);
813
814 /* Copy encoded messages */
815 memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
816 idx += ctx->buffer->i;
817
818 return idx;
819}
820
821/* Decode HELLO frame sent by an agent. It returns the number of by read bytes
822 * on success, 0 if the frame can be ignored and -1 if an error occurred. */
823static int
824handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
825{
826 int vsn, max_frame_size;
827 int i, idx = 0;
828 size_t min_size = (7 /* TYPE + METADATA */
829 + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
830 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
831 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
832
833 /* Check frame type */
834 if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
835 return 0;
836
837 if (size < min_size) {
838 spoe_status_code = SPOE_FRM_ERR_INVALID;
839 return -1;
840 }
841
842 /* Skip flags: fragmentation is not supported for now */
843 idx += 4;
844
845 /* stream-id and frame-id must be cleared */
846 if (frame[idx] != 0 || frame[idx+1] != 0) {
847 spoe_status_code = SPOE_FRM_ERR_INVALID;
848 return -1;
849 }
850 idx += 2;
851
852 /* There are 3 mandatory items: "version", "max-frame-size" and
853 * "capabilities" */
854
855 /* Loop on K/V items */
856 vsn = max_frame_size = 0;
857 while (idx < size) {
858 char *str;
859 uint64_t sz;
860
861 /* Decode the item key */
862 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
863 if (str == NULL) {
864 spoe_status_code = SPOE_FRM_ERR_INVALID;
865 return -1;
866 }
867 /* Check "version" K/V item */
868 if (!memcmp(str, VERSION_KEY, sz)) {
869 /* The value must be a string */
870 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
871 spoe_status_code = SPOE_FRM_ERR_INVALID;
872 return -1;
873 }
874 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
875 if (str == NULL) {
876 spoe_status_code = SPOE_FRM_ERR_INVALID;
877 return -1;
878 }
879
880 vsn = decode_spoe_version(str, sz);
881 if (vsn == -1) {
882 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
883 return -1;
884 }
885 for (i = 0; supported_versions[i].str != NULL; ++i) {
886 if (vsn >= supported_versions[i].min &&
887 vsn <= supported_versions[i].max)
888 break;
889 }
890 if (supported_versions[i].str == NULL) {
891 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
892 return -1;
893 }
894 }
895 /* Check "max-frame-size" K/V item */
896 else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
897 int type;
898
899 /* The value must be integer */
900 type = frame[idx++];
901 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
902 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
903 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
904 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
905 spoe_status_code = SPOE_FRM_ERR_INVALID;
906 return -1;
907 }
908 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
909 spoe_status_code = SPOE_FRM_ERR_INVALID;
910 return -1;
911 }
912 idx += i;
913 if (sz < MIN_FRAME_SIZE || sz > APPCTX_SPOE(appctx).max_frame_size) {
914 spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
915 return -1;
916 }
917 max_frame_size = sz;
918 }
919 /* Skip "capabilities" K/V item for now */
920 else {
921 /* Silently ignore unknown item */
922 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
923 spoe_status_code = SPOE_FRM_ERR_INVALID;
924 return -1;
925 }
926 idx += i;
927 }
928 }
929
930 /* Final checks */
931 if (!vsn) {
932 spoe_status_code = SPOE_FRM_ERR_NO_VSN;
933 return -1;
934 }
935 if (!max_frame_size) {
936 spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
937 return -1;
938 }
939
940 APPCTX_SPOE(appctx).version = (unsigned int)vsn;
941 APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
942 return idx;
943}
944
945/* Decode DISCONNECT frame sent by an agent. It returns the number of by read
946 * bytes on success, 0 if the frame can be ignored and -1 if an error
947 * occurred. */
948static int
949handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
950{
951 int i, idx = 0;
952 size_t min_size = (7 /* TYPE + METADATA */
953 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
954 + 1 + SLEN(MSG_KEY) + 1 + 1);
955
956 /* Check frame type */
957 if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
958 return 0;
959
960 if (size < min_size) {
961 spoe_status_code = SPOE_FRM_ERR_INVALID;
962 return -1;
963 }
964
965 /* Skip flags: fragmentation is not supported for now */
966 idx += 4;
967
968 /* stream-id and frame-id must be cleared */
969 if (frame[idx] != 0 || frame[idx+1] != 0) {
970 spoe_status_code = SPOE_FRM_ERR_INVALID;
971 return -1;
972 }
973 idx += 2;
974
975 /* There are 2 mandatory items: "status-code" and "message" */
976
977 /* Loop on K/V items */
978 while (idx < size) {
979 char *str;
980 uint64_t sz;
981
982 /* Decode the item key */
983 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
984 if (str == NULL) {
985 spoe_status_code = SPOE_FRM_ERR_INVALID;
986 return -1;
987 }
988
989 /* Check "status-code" K/V item */
990 if (!memcmp(str, STATUS_CODE_KEY, sz)) {
991 int type;
992
993 /* The value must be an integer */
994 type = frame[idx++];
995 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
996 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
997 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
998 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
999 spoe_status_code = SPOE_FRM_ERR_INVALID;
1000 return -1;
1001 }
1002 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
1003 spoe_status_code = SPOE_FRM_ERR_INVALID;
1004 return -1;
1005 }
1006 idx += i;
1007 spoe_status_code = sz;
1008 }
1009
1010 /* Check "message" K/V item */
1011 else if (sz && !memcmp(str, MSG_KEY, sz)) {
1012 /* The value must be a string */
1013 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1014 spoe_status_code = SPOE_FRM_ERR_INVALID;
1015 return -1;
1016 }
1017 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1018 if (str == NULL || sz > 255) {
1019 spoe_status_code = SPOE_FRM_ERR_INVALID;
1020 return -1;
1021 }
1022 memcpy(spoe_reason, str, sz);
1023 spoe_reason[sz] = 0;
1024 }
1025 else {
1026 /* Silently ignore unknown item */
1027 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1028 spoe_status_code = SPOE_FRM_ERR_INVALID;
1029 return -1;
1030 }
1031 idx += i;
1032 }
1033 }
1034
1035 return idx;
1036}
1037
1038
1039/* Decode ACK frame sent by an agent. It returns the number of by read bytes on
1040 * success, 0 if the frame can be ignored and -1 if an error occurred. */
1041static int
1042handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
1043{
1044 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1045 uint64_t stream_id, frame_id;
1046 int idx = 0;
1047 size_t min_size = (7 /* TYPE + METADATA */);
1048
1049 /* Check frame type */
1050 if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
1051 return 0;
1052
1053 if (size < min_size) {
1054 spoe_status_code = SPOE_FRM_ERR_INVALID;
1055 return -1;
1056 }
1057
1058 /* Skip flags: fragmentation is not supported for now */
1059 idx += 4;
1060
1061 /* Get the stream-id and the frame-id */
1062 idx += decode_spoe_varint(frame+idx, frame+size, &stream_id);
1063 idx += decode_spoe_varint(frame+idx, frame+size, &frame_id);
1064
1065 /* Check stream-id and frame-id */
1066 if (ctx->stream_id != (unsigned int)stream_id ||
1067 ctx->frame_id != (unsigned int)frame_id)
1068 return 0;
1069
1070 /* Copy encoded actions */
1071 b_reset(ctx->buffer);
1072 memcpy(ctx->buffer->p, frame+idx, size-idx);
1073 ctx->buffer->i = size-idx;
1074
1075 return idx;
1076}
1077
Christopher Fauletba7bc162016-11-07 21:07:38 +01001078/* This function is used in cfgparse.c and declared in proto/checks.h. It
1079 * prepare the request to send to agents during a healthcheck. It returns 0 on
1080 * success and -1 if an error occurred. */
1081int
1082prepare_spoe_healthcheck_request(char **req, int *len)
1083{
1084 struct appctx a;
1085 char *frame, buf[global.tune.bufsize];
1086 unsigned int framesz;
1087 int idx;
1088
1089 memset(&a, 0, sizeof(a));
1090 memset(buf, 0, sizeof(buf));
1091 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1092
1093 frame = buf+4;
1094 idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4);
1095 if (idx <= 0)
1096 return -1;
1097 if (idx + SLEN(HEALTHCHECK_KEY) + 1 > global.tune.bufsize-4)
1098 return -1;
1099
1100 /* "healthcheck" K/V item */
1101 idx += encode_spoe_string(HEALTHCHECK_KEY, SLEN(HEALTHCHECK_KEY), frame+idx);
1102 frame[idx++] = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE);
1103
1104 framesz = htonl(idx);
1105 memcpy(buf, (char *)&framesz, 4);
1106
1107 if ((*req = malloc(idx+4)) == NULL)
1108 return -1;
1109 memcpy(*req, buf, idx+4);
1110 *len = idx+4;
1111 return 0;
1112}
1113
1114/* This function is used in checks.c and declared in proto/checks.h. It decode
1115 * the response received from an agent during a healthcheck. It returns 0 on
1116 * success and -1 if an error occurred. */
1117int
1118handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen)
1119{
1120 struct appctx a;
1121 int r;
1122
1123 memset(&a, 0, sizeof(a));
1124 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1125
1126 if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0)
1127 goto error;
1128 if ((r = handle_spoe_agenthello_frame(&a, frame, size)) <= 0) {
1129 if (r == 0)
1130 spoe_status_code = SPOE_FRM_ERR_INVALID;
1131 goto error;
1132 }
1133
1134 return 0;
1135
1136 error:
1137 if (spoe_status_code >= SPOE_FRM_ERRS)
1138 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
1139 strncpy(err, spoe_frm_err_reasons[spoe_status_code], errlen);
1140 return -1;
1141}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001142
1143/********************************************************************
1144 * Functions that manage the SPOE applet
1145 ********************************************************************/
1146/* Callback function that catches applet timeouts. If a timeout occurred, we set
1147 * <appctx->st1> flag and the SPOE applet is woken up. */
1148static struct task *
1149process_spoe_applet(struct task * task)
1150{
1151 struct appctx *appctx = task->context;
1152
1153 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1154 if (tick_is_expired(task->expire, now_ms)) {
1155 task->expire = TICK_ETERNITY;
1156 appctx->st1 = SPOE_APPCTX_ERR_TOUT;
1157 }
1158 si_applet_want_get(appctx->owner);
1159 appctx_wakeup(appctx);
1160 return task;
1161}
1162
1163/* Remove a SPOE applet from the agent cache */
1164static void
1165remove_spoe_applet_from_cache(struct appctx *appctx)
1166{
1167 struct appctx *a, *back;
1168 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1169
1170 if (LIST_ISEMPTY(&agent->cache))
1171 return;
1172
1173 list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) {
1174 if (a == appctx) {
1175 LIST_DEL(&APPCTX_SPOE(appctx).list);
1176 break;
1177 }
1178 }
1179}
1180
1181
1182/* Callback function that releases a SPOE applet. This happens when the
1183 * connection with the agent is closed. */
1184static void
1185release_spoe_applet(struct appctx *appctx)
1186{
1187 struct stream_interface *si = appctx->owner;
1188 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1189 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1190
1191 if (appctx->st0 == SPOE_APPCTX_ST_CONNECT ||
1192 appctx->st0 == SPOE_APPCTX_ST_CONNECTING)
1193 on_new_spoe_appctx_failure(agent);
1194
1195 if (appctx->st0 != SPOE_APPCTX_ST_END) {
1196 si_shutw(si);
1197 si_shutr(si);
1198 si_ic(si)->flags |= CF_READ_NULL;
1199 appctx->st0 = SPOE_APPCTX_ST_END;
1200 }
1201
1202 if (ctx != NULL) {
1203 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1204 ctx->appctx = NULL;
1205 }
1206
1207 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
1208 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1209 __FUNCTION__, appctx);
1210
1211 /* Release the task attached to the SPOE applet */
1212 if (APPCTX_SPOE(appctx).task) {
1213 task_delete(APPCTX_SPOE(appctx).task);
1214 task_free(APPCTX_SPOE(appctx).task);
1215 }
1216
1217 /* And remove it from the agent cache */
1218 remove_spoe_applet_from_cache(appctx);
1219 APPCTX_SPOE(appctx).ctx = NULL;
1220}
1221
1222/* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when
1223 * the frame can be ignored, 0 to retry later and 1 on success. The frame is
1224 * encoded using the callback function <prepare>. */
1225static int
1226send_spoe_frame(struct appctx *appctx,
1227 int (*prepare)(struct appctx *, char *, size_t))
1228{
1229 struct stream_interface *si = appctx->owner;
1230 int framesz, ret;
1231 uint32_t netint;
1232
1233 ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
1234 if (ret <= 0)
1235 goto skip_or_error;
1236 framesz = ret;
1237 netint = htonl(framesz);
1238 ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint));
1239 if (ret > 0)
1240 ret = bi_putblk(si_ic(si), trash.str, framesz);
1241 if (ret <= 0) {
1242 if (ret == -1)
1243 return -1;
1244 return -2;
1245 }
1246 return 1;
1247
1248 skip_or_error:
1249 if (!ret)
1250 return -1;
1251 return -2;
1252}
1253
1254/* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1
1255 * when the frame can be ignored, 0 to retry later and 1 on success. The frame
1256 * is decoded using the callback function <handle>. */
1257static int
1258recv_spoe_frame(struct appctx *appctx,
1259 int (*handle)(struct appctx *, char *, size_t))
1260{
1261 struct stream_interface *si = appctx->owner;
1262 int framesz, ret;
1263 uint32_t netint;
1264
1265 ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0);
1266 if (ret <= 0)
1267 goto empty_or_error;
1268 framesz = ntohl(netint);
1269 if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
1270 spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
1271 return -2;
1272 }
1273
1274 ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint));
1275 if (ret <= 0)
1276 goto empty_or_error;
1277 bo_skip(si_oc(si), ret+sizeof(netint));
1278
1279 /* First check if the received frame is a DISCONNECT frame */
1280 ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz);
1281 if (ret != 0) {
1282 if (ret > 0) {
1283 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1284 " - disconnected by peer (%d): %s\n",
1285 (int)now.tv_sec, (int)now.tv_usec,
1286 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1287 __FUNCTION__, appctx, spoe_status_code,
1288 spoe_reason);
1289 return 2;
1290 }
1291 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1292 " - error on frame (%s)\n",
1293 (int)now.tv_sec, (int)now.tv_usec,
1294 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1295 __FUNCTION__, appctx,
1296 spoe_frm_err_reasons[spoe_status_code]);
1297 return -2;
1298 }
1299 if (handle == NULL)
1300 goto out;
1301
1302 /* If not, try to decode it */
1303 ret = handle(appctx, trash.str, framesz);
1304 if (ret <= 0) {
1305 if (!ret)
1306 return -1;
1307 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1308 " - error on frame (%s)\n",
1309 (int)now.tv_sec, (int)now.tv_usec,
1310 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1311 __FUNCTION__, appctx,
1312 spoe_frm_err_reasons[spoe_status_code]);
1313 return -2;
1314 }
1315 out:
1316 return 1;
1317
1318 empty_or_error:
1319 if (!ret)
1320 return 0;
1321 spoe_status_code = SPOE_FRM_ERR_IO;
1322 return -2;
1323}
1324
/* I/O Handler processing messages exchanged with the agent. It is a state
 * machine driven by <appctx->st0>:
 *   CONNECT       : send the HELLO frame once the connection is established
 *   CONNECTING    : wait for the agent HELLO frame
 *   PROCESSING    : send NOTIFY frames and wait for the matching ACK frames
 *                   on behalf of the SPOE context attached to the applet
 *   DISCONNECT    : send the DISCONNECT frame
 *   DISCONNECTING : wait for the agent's reply (or a timeout)
 *   EXIT          : shut the stream interface down
 *   END           : nothing left to do
 * <appctx->st1> holds the timeout flag set by the applet's task. Note that the
 * SPOE context <ctx> is read once when the function is entered. */
static void
handle_spoe_applet(struct appctx *appctx)
{
	struct stream_interface *si = appctx->owner;
	struct stream *s = si_strm(si);
	struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
	struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
	int ret;

 switchstate:
	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
		    " - appctx-state=%s\n",
		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
		    __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);

	switch (appctx->st0) {
		case SPOE_APPCTX_ST_CONNECT:
			spoe_status_code = SPOE_FRM_ERR_NONE;
			/* Connection not established yet: wake the stream
			 * up and wait */
			if (si->state <= SI_ST_CON) {
				si_applet_want_put(si);
				task_wakeup(s->task, TASK_WOKEN_MSG);
				break;
			}
			/* Any state other than "established" is a failure */
			else if (si->state != SI_ST_EST) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame);
			if (ret < 0) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			else if (!ret)
				goto full;

			/* Hello frame was sent. Set the hello timeout and
			 * wait for the reply. */
			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
			appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
			/* fall through */

		case SPOE_APPCTX_ST_CONNECTING:
			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			/* The hello timeout fired before the agent replied */
			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
				SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
					    " - Connection timed out\n",
					    (int)now.tv_sec, (int)now.tv_usec,
					    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
					    __FUNCTION__, appctx);
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame);
			/* Error or unexpected frame: tell the agent why we
			 * disconnect */
			if (ret < 0) {
				appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			/* The agent sent a DISCONNECT frame: just leave */
			if (ret == 2) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			/* Nothing received yet, retry later */
			if (!ret)
				goto out;

			/* hello handshake is finished, set the idle timeout,
			 * Add the appctx in the agent cache, decrease the
			 * number of new applets and wake up waiting streams. */
			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
			on_new_spoe_appctx_success(agent, appctx);
			break;

		case SPOE_APPCTX_ST_PROCESSING:
			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			/* On timeout, disconnect and reset the timeout flag so
			 * that it is not seen again in DISCONNECTING state */
			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
				spoe_status_code = SPOE_FRM_ERR_TOUT;
				appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
				appctx->st1  = SPOE_APPCTX_ERR_NONE;
				goto switchstate;
			}
			/* The attached stream has messages to send */
			if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
				ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame);
				if (ret < 0) {
					/* -1: only this frame is dropped, the
					 * stream is notified of the error but
					 * the applet remains usable */
					if (ret == -1) {
						ctx->state = SPOE_CTX_ST_ERROR;
						task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
						goto skip_notify_frame;
					}
					appctx->st0 = SPOE_APPCTX_ST_EXIT;
					goto switchstate;
				}
				else if (!ret)
					goto full;
				ctx->state = SPOE_CTX_ST_WAITING_ACK;
				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			}

		  skip_notify_frame:
			/* The attached stream is waiting for an ACK frame */
			if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) {
				ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame);
				if (ret < 0) {
					/* -1: the frame was ignored, try to
					 * read the next one */
					if (ret == -1)
						goto skip_notify_frame;
					ctx->state = SPOE_CTX_ST_ERROR;
					task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
					goto switchstate;
				}
				if (!ret)
					goto out;
				/* DISCONNECT frame received from the agent */
				if (ret == 2) {
					ctx->state = SPOE_CTX_ST_ERROR;
					task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
					appctx->st0 = SPOE_APPCTX_ST_EXIT;
					goto switchstate;
				}
				/* ACK received: actions are ready to be
				 * processed by the stream */
				ctx->state = SPOE_CTX_ST_DONE;
				task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			}
			else {
				/* No exchange in progress: leave if the process
				 * is stopping, else only watch for a frame
				 * sent by the agent (no decoder is passed) */
				if (stopping) {
					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
					goto switchstate;
				}

				ret = recv_spoe_frame(appctx, NULL);
				if (ret < 0) {
					if (ret == -1)
						goto skip_notify_frame;
					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
					goto switchstate;
				}
				if (!ret)
					goto out;
				if (ret == 2) {
					appctx->st0 = SPOE_APPCTX_ST_EXIT;
					goto switchstate;
				}
				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			}
			break;

		case SPOE_APPCTX_ST_DISCONNECT:
			ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame);
			if (ret < 0) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			else if (!ret)
				goto full;
			SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
				    " - disconnected by HAProxy (%d): %s\n",
				    (int)now.tv_sec, (int)now.tv_usec,
				    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
				    __FUNCTION__, appctx, spoe_status_code,
				    spoe_frm_err_reasons[spoe_status_code]);

			/* The idle timeout bounds the wait for the agent's
			 * reply */
			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
			/* fall through */

		case SPOE_APPCTX_ST_DISCONNECTING:
			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			/* Leave on error or when the agent's DISCONNECT frame
			 * is received; any other frame is dropped */
			ret = recv_spoe_frame(appctx, NULL);
			if (ret < 0 || ret == 2) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			break;

		case SPOE_APPCTX_ST_EXIT:
			si_shutw(si);
			si_shutr(si);
			si_ic(si)->flags |= CF_READ_NULL;
			appctx->st0 = SPOE_APPCTX_ST_END;
			APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
			/* fall through */

		case SPOE_APPCTX_ST_END:
			break;
	}

 out:
	/* Requeue the timeout task if a timer is armed */
	if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
		task_queue(APPCTX_SPOE(appctx).task);
	si_oc(si)->flags |= CF_READ_DONTWAIT;
	task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
	return;
 full:
	/* No room left in the channel to send a frame */
	si_applet_cant_put(si);
	goto out;
}
1538
1539struct applet spoe_applet = {
1540 .obj_type = OBJ_TYPE_APPLET,
1541 .name = "<SPOE>", /* used for logging */
1542 .fct = handle_spoe_applet,
1543 .release = release_spoe_applet,
1544};
1545
1546/* Create a SPOE applet. On success, the created applet is returned, else
1547 * NULL. */
1548static struct appctx *
1549create_spoe_appctx(struct spoe_config *conf)
1550{
1551 struct appctx *appctx;
1552 struct session *sess;
1553 struct task *task;
1554 struct stream *strm;
1555 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
1556 struct listener *, by_fe);
1557
1558 if ((appctx = appctx_new(&spoe_applet)) == NULL)
1559 goto out_error;
1560
1561 appctx->st0 = SPOE_APPCTX_ST_CONNECT;
1562 if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
1563 goto out_free_appctx;
1564 APPCTX_SPOE(appctx).task->process = process_spoe_applet;
1565 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1566 APPCTX_SPOE(appctx).task->context = appctx;
1567 APPCTX_SPOE(appctx).agent = conf->agent;
1568 APPCTX_SPOE(appctx).ctx = NULL;
1569 APPCTX_SPOE(appctx).version = 0;
1570 APPCTX_SPOE(appctx).max_frame_size = global.tune.bufsize;
1571 task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
1572
1573 sess = session_new(&conf->agent_fe, l, &appctx->obj_type);
1574 if (!sess)
1575 goto out_free_spoe;
1576
1577 if ((task = task_new()) == NULL)
1578 goto out_free_sess;
1579
1580 if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
1581 goto out_free_task;
1582
1583 strm->target = sess->listener->default_target;
1584 strm->req.analysers |= sess->listener->analysers;
1585 stream_set_backend(strm, conf->agent->b.be);
1586
1587 /* applet is waiting for data */
1588 si_applet_cant_get(&strm->si[0]);
1589 appctx_wakeup(appctx);
1590
Christopher Faulet48026722016-11-16 15:01:12 +01001591 /* Increase the per-process number of cumulated connections */
1592 if (conf->agent->cps_max > 0)
1593 update_freq_ctr(&conf->agent->conn_per_sec, 1);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001594
1595 strm->do_log = NULL;
1596 strm->res.flags |= CF_READ_DONTWAIT;
1597
1598 conf->agent_fe.feconn++;
1599 jobs++;
1600 totalconn++;
1601
1602 return appctx;
1603
1604 /* Error unrolling */
1605 out_free_task:
1606 task_free(task);
1607 out_free_sess:
1608 session_free(sess);
1609 out_free_spoe:
1610 task_free(APPCTX_SPOE(appctx).task);
1611 out_free_appctx:
1612 appctx_free(appctx);
1613 out_error:
1614 return NULL;
1615}
1616
1617/* Wake up a SPOE applet attached to a SPOE context. */
1618static void
1619wakeup_spoe_appctx(struct spoe_context *ctx)
1620{
1621 if (ctx->appctx == NULL)
1622 return;
1623 if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) {
1624 si_applet_want_get(ctx->appctx->owner);
1625 si_applet_want_put(ctx->appctx->owner);
1626 appctx_wakeup(ctx->appctx);
1627 }
1628}
1629
1630
1631/* Run across the list of pending streams waiting for a SPOE applet and wake the
1632 * first. */
1633static void
1634offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx)
1635{
1636 struct spoe_context *ctx;
1637
Christopher Fauletf7a30922016-11-10 15:04:51 +01001638 if (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING)
1639 return;
1640
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001641 if (LIST_ISEMPTY(&agent->applet_wq))
1642 LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
1643 else {
1644 ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait);
1645 APPCTX_SPOE(appctx).ctx = ctx;
1646 ctx->appctx = appctx;
1647 LIST_DEL(&ctx->applet_wait);
1648 LIST_INIT(&ctx->applet_wait);
1649 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1650 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1651 " - wake up stream to get available SPOE applet\n",
1652 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1653 __FUNCTION__, ctx->strm);
1654 }
1655}
1656
1657/* A failure occurred during SPOE applet creation. */
1658static void
1659on_new_spoe_appctx_failure(struct spoe_agent *agent)
1660{
1661 struct spoe_context *ctx;
1662
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001663 list_for_each_entry(ctx, &agent->applet_wq, applet_wait) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001664 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1665 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1666 " - wake up stream because to SPOE applet connection failed\n",
1667 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1668 __FUNCTION__, ctx->strm);
1669 }
1670}
1671
/* Called when the hello handshake of the SPOE applet <appctx> succeeded: the
 * applet is offered to the streams waiting for one, or cached. */
static void
on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx)
{
	offer_spoe_appctx(agent, appctx);
}

1677/* Retrieve a SPOE applet from the agent cache if possible, else create it. It
1678 * returns 1 on success, 0 to retry later and -1 if an error occurred. */
1679static int
1680acquire_spoe_appctx(struct spoe_context *ctx, int dir)
1681{
1682 struct spoe_config *conf = FLT_CONF(ctx->filter);
1683 struct spoe_agent *agent = conf->agent;
1684 struct appctx *appctx;
1685
1686 /* If a process is already started for this SPOE context, retry
1687 * later. */
1688 if (ctx->flags & SPOE_CTX_FL_PROCESS)
1689 goto wait;
1690
1691 /* If needed, initialize the buffer that will be used to encode messages
1692 * and decode actions. */
1693 if (ctx->buffer == &buf_empty) {
1694 if (!LIST_ISEMPTY(&ctx->buffer_wait)) {
1695 LIST_DEL(&ctx->buffer_wait);
1696 LIST_INIT(&ctx->buffer_wait);
1697 }
1698
1699 if (!b_alloc_margin(&ctx->buffer, 0)) {
1700 LIST_ADDQ(&buffer_wq, &ctx->buffer_wait);
1701 goto wait;
1702 }
1703 }
1704
1705 /* If the SPOE applet was already set, all is done. */
1706 if (ctx->appctx)
1707 goto success;
1708
1709 /* Else try to retrieve it from the agent cache */
1710 if (!LIST_ISEMPTY(&agent->cache)) {
1711 appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list);
1712 LIST_DEL(&APPCTX_SPOE(appctx).list);
1713 APPCTX_SPOE(appctx).ctx = ctx;
1714 ctx->appctx = appctx;
1715 goto success;
1716 }
1717
Christopher Faulet48026722016-11-16 15:01:12 +01001718 /* If there is no server up for the agent's backend, this is an
1719 * error. */
1720 if (!agent->b.be->srv_act && !agent->b.be->srv_bck)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001721 goto error;
1722
1723 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1724 " - waiting for available SPOE appctx\n",
1725 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
1726 ctx->strm);
1727
1728 /* Else add the stream in the waiting queue. */
1729 if (LIST_ISEMPTY(&ctx->applet_wait))
1730 LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait);
1731
1732 /* Finally, create new SPOE applet if we can */
Christopher Faulet48026722016-11-16 15:01:12 +01001733 if (agent->cps_max > 0) {
1734 if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0))
1735 goto wait;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001736 }
Christopher Faulet48026722016-11-16 15:01:12 +01001737 if (create_spoe_appctx(conf) == NULL)
1738 goto error;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001739
1740 wait:
1741 return 0;
1742
1743 success:
1744 /* Remove the stream from the waiting queue */
1745 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1746 LIST_DEL(&ctx->applet_wait);
1747 LIST_INIT(&ctx->applet_wait);
1748 }
1749
1750 /* Set the right flag to prevent request and response processing
1751 * in same time. */
1752 ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
1753 ? SPOE_CTX_FL_REQ_PROCESS
1754 : SPOE_CTX_FL_RSP_PROCESS);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001755
1756 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1757 " - acquire SPOE appctx %p from cache\n",
1758 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1759 __FUNCTION__, ctx->strm, ctx->appctx);
1760 return 1;
1761
1762 error:
1763 /* Remove the stream from the waiting queue */
1764 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1765 LIST_DEL(&ctx->applet_wait);
1766 LIST_INIT(&ctx->applet_wait);
1767 }
1768
1769 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
Christopher Faulet48026722016-11-16 15:01:12 +01001770 " - failed to acquire SPOE appctx\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001771 (int)now.tv_sec, (int)now.tv_usec, agent->id,
Christopher Faulet48026722016-11-16 15:01:12 +01001772 __FUNCTION__, ctx->strm);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001773 send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n");
1774
1775 return -1;
1776}
1777
1778/* Release a SPOE applet and push it in the agent cache. */
1779static void
1780release_spoe_appctx(struct spoe_context *ctx)
1781{
1782 struct spoe_config *conf = FLT_CONF(ctx->filter);
1783 struct spoe_agent *agent = conf->agent;
1784 struct appctx *appctx = ctx->appctx;
1785
1786 /* Reset the flag to allow next processing */
1787 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
1788
Christopher Fauletf7a30922016-11-10 15:04:51 +01001789 /* Reset processing timer */
1790 ctx->process_exp = TICK_ETERNITY;
1791
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001792 /* Release the buffer if needed */
1793 if (ctx->buffer != &buf_empty) {
1794 b_free(&ctx->buffer);
1795 if (!LIST_ISEMPTY(&buffer_wq))
1796 stream_offer_buffers();
1797 }
1798
1799 /* If there is no SPOE applet, all is done */
1800 if (!appctx)
1801 return;
1802
1803 /* Else, reassign it or push it in the agent cache */
1804 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1805 " - release SPOE appctx %p\n",
1806 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1807 __FUNCTION__, ctx->strm, appctx);
1808
1809 APPCTX_SPOE(appctx).ctx = NULL;
1810 ctx->appctx = NULL;
1811 offer_spoe_appctx(agent, appctx);
1812}
1813
1814/***************************************************************************
1815 * Functions that process SPOE messages and actions
1816 **************************************************************************/
1817/* Process SPOE messages for a specific event. During the processing, it returns
1818 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1819 * is returned. */
1820static int
1821process_spoe_messages(struct stream *s, struct spoe_context *ctx,
1822 struct list *messages, int dir)
1823{
1824 struct spoe_message *msg;
1825 struct sample *smp;
1826 struct spoe_arg *arg;
1827 char *p;
1828 size_t max_size;
1829 int off, flag, idx = 0;
1830
1831 /* Reserve 32 bytes from the frame Metadata */
1832 max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32;
1833
1834 b_reset(ctx->buffer);
1835 p = ctx->buffer->p;
1836
1837 /* Loop on messages */
1838 list_for_each_entry(msg, messages, list) {
1839 if (idx + msg->id_len + 1 > max_size)
1840 goto skip;
1841
1842 /* Set the message name */
1843 idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
1844
1845 /* Save offset where to store the number of arguments for this
1846 * message */
1847 off = idx++;
1848 p[off] = 0;
1849
1850 /* Loop on arguments */
1851 list_for_each_entry(arg, &msg->args, list) {
1852 p[off]++; /* Increment the number of arguments */
1853
1854 if (idx + arg->name_len + 1 > max_size)
1855 goto skip;
1856
1857 /* Encode the arguement name as a string. It can by NULL */
1858 idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
1859
1860 /* Fetch the arguement value */
1861 smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
1862 if (!smp) {
1863 /* If no value is available, set it to NULL */
1864 p[idx++] = SPOE_DATA_T_NULL;
1865 continue;
1866 }
1867
1868 /* Else, encode the arguement value */
1869 switch (smp->data.type) {
1870 case SMP_T_BOOL:
1871 flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
1872 p[idx++] = (SPOE_DATA_T_BOOL | flag);
1873 break;
1874 case SMP_T_SINT:
1875 p[idx++] = SPOE_DATA_T_INT64;
1876 if (idx + 8 > max_size)
1877 goto skip;
1878 idx += encode_spoe_varint(smp->data.u.sint, p+idx);
1879 break;
1880 case SMP_T_IPV4:
1881 p[idx++] = SPOE_DATA_T_IPV4;
1882 if (idx + 4 > max_size)
1883 goto skip;
1884 memcpy(p+idx, &smp->data.u.ipv4, 4);
1885 idx += 4;
1886 break;
1887 case SMP_T_IPV6:
1888 p[idx++] = SPOE_DATA_T_IPV6;
1889 if (idx + 16 > max_size)
1890 goto skip;
1891 memcpy(p+idx, &smp->data.u.ipv6, 16);
1892 idx += 16;
1893 break;
1894 case SMP_T_STR:
1895 p[idx++] = SPOE_DATA_T_STR;
1896 if (idx + smp->data.u.str.len > max_size)
1897 goto skip;
1898 idx += encode_spoe_string(smp->data.u.str.str,
1899 smp->data.u.str.len,
1900 p+idx);
1901 break;
1902 case SMP_T_BIN:
1903 p[idx++] = SPOE_DATA_T_BIN;
1904 if (idx + smp->data.u.str.len > max_size)
1905 goto skip;
1906 idx += encode_spoe_string(smp->data.u.str.str,
1907 smp->data.u.str.len,
1908 p+idx);
1909 break;
1910 case SMP_T_METH:
1911 if (smp->data.u.meth.meth == HTTP_METH_OTHER) {
1912 p[idx++] = SPOE_DATA_T_STR;
1913 if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size)
1914 goto skip;
1915 idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name,
1916 http_known_methods[smp->data.u.meth.meth].len,
1917 p+idx);
1918 }
1919 else {
1920 p[idx++] = SPOE_DATA_T_STR;
1921 if (idx + smp->data.u.str.len > max_size)
1922 goto skip;
1923 idx += encode_spoe_string(smp->data.u.meth.str.str,
1924 smp->data.u.meth.str.len,
1925 p+idx);
1926 }
1927 break;
1928 default:
1929 p[idx++] = SPOE_DATA_T_NULL;
1930 }
1931 }
1932 }
1933 ctx->buffer->i = idx;
1934 return 1;
1935
1936 skip:
1937 b_reset(ctx->buffer);
1938 return 0;
1939}
1940
1941/* Helper function to set a variable */
1942static void
1943set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1944 struct sample *smp)
1945{
1946 struct spoe_config *conf = FLT_CONF(ctx->filter);
1947 struct spoe_agent *agent = conf->agent;
1948 char varname[64];
1949
1950 memset(varname, 0, sizeof(varname));
1951 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1952 scope, agent->var_pfx, len, name);
1953 vars_set_by_name_ifexist(varname, len, smp);
1954}
1955
1956/* Helper function to unset a variable */
1957static void
1958unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1959 struct sample *smp)
1960{
1961 struct spoe_config *conf = FLT_CONF(ctx->filter);
1962 struct spoe_agent *agent = conf->agent;
1963 char varname[64];
1964
1965 memset(varname, 0, sizeof(varname));
1966 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1967 scope, agent->var_pfx, len, name);
1968 vars_unset_by_name_ifexist(varname, len, smp);
1969}
1970
1971
1972/* Process SPOE actions for a specific event. During the processing, it returns
1973 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1974 * is returned. */
1975static int
1976process_spoe_actions(struct stream *s, struct spoe_context *ctx,
1977 enum spoe_event ev, int dir)
1978{
1979 char *p;
1980 size_t size;
1981 int off, i, idx = 0;
1982
1983 p = ctx->buffer->p;
1984 size = ctx->buffer->i;
1985
1986 while (idx < size) {
1987 char *str;
1988 uint64_t sz;
1989 struct sample smp;
1990 enum spoe_action_type type;
1991
1992 off = idx;
1993 if (idx+2 > size)
1994 goto skip;
1995
1996 type = p[idx++];
1997 switch (type) {
1998 case SPOE_ACT_T_SET_VAR: {
1999 char *scope;
2000
2001 if (p[idx++] != 3)
2002 goto skip_action;
2003
2004 switch (p[idx++]) {
2005 case SPOE_SCOPE_PROC: scope = "proc"; break;
2006 case SPOE_SCOPE_SESS: scope = "sess"; break;
2007 case SPOE_SCOPE_TXN : scope = "txn"; break;
2008 case SPOE_SCOPE_REQ : scope = "req"; break;
2009 case SPOE_SCOPE_RES : scope = "res"; break;
2010 default: goto skip;
2011 }
2012
2013 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2014 if (str == NULL)
2015 goto skip;
2016 memset(&smp, 0, sizeof(smp));
2017 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2018 if (decode_spoe_data(p+idx, p+size, &smp) == -1)
2019 goto skip;
2020
2021 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2022 " - set-var '%s.%s.%.*s'\n",
2023 (int)now.tv_sec, (int)now.tv_usec,
2024 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2025 __FUNCTION__, s, scope,
2026 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2027 (int)sz, str);
2028
2029 set_spoe_var(ctx, scope, str, sz, &smp);
2030 break;
2031 }
2032
2033 case SPOE_ACT_T_UNSET_VAR: {
2034 char *scope;
2035
2036 if (p[idx++] != 2)
2037 goto skip_action;
2038
2039 switch (p[idx++]) {
2040 case SPOE_SCOPE_PROC: scope = "proc"; break;
2041 case SPOE_SCOPE_SESS: scope = "sess"; break;
2042 case SPOE_SCOPE_TXN : scope = "txn"; break;
2043 case SPOE_SCOPE_REQ : scope = "req"; break;
2044 case SPOE_SCOPE_RES : scope = "res"; break;
2045 default: goto skip;
2046 }
2047
2048 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2049 if (str == NULL)
2050 goto skip;
2051 memset(&smp, 0, sizeof(smp));
2052 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2053
2054 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2055 " - unset-var '%s.%s.%.*s'\n",
2056 (int)now.tv_sec, (int)now.tv_usec,
2057 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2058 __FUNCTION__, s, scope,
2059 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2060 (int)sz, str);
2061
2062 unset_spoe_var(ctx, scope, str, sz, &smp);
2063 break;
2064 }
2065
2066 default:
2067 skip_action:
2068 if ((i = skip_spoe_action(p+off, p+size)) == -1)
2069 goto skip;
2070 idx += i;
2071 }
2072 }
2073
2074 return 1;
2075 skip:
2076 return 0;
2077}
2078
2079
2080/* Process a SPOE event. First, this functions will process messages attached to
2081 * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
2082 * ACK frame to process corresponding actions. During all the processing, it
2083 * returns 0 and it returns 1 when the processing is finished. If an error
2084 * occurred, -1 is returned. */
2085static int
2086process_spoe_event(struct stream *s, struct spoe_context *ctx,
2087 enum spoe_event ev)
2088{
Christopher Fauletf7a30922016-11-10 15:04:51 +01002089 struct spoe_config *conf = FLT_CONF(ctx->filter);
2090 struct spoe_agent *agent = conf->agent;
2091 int dir, ret = 1;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002092
2093 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2094 " - ctx-state=%s - event=%s\n",
2095 (int)now.tv_sec, (int)now.tv_usec,
Christopher Fauletf7a30922016-11-10 15:04:51 +01002096 agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002097 spoe_event_str[ev]);
2098
Christopher Faulet48026722016-11-16 15:01:12 +01002099 if (agent->eps_max > 0) {
2100 if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
2101 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2102 " - skip event '%s': max EPS reached\n",
2103 (int)now.tv_sec, (int)now.tv_usec,
2104 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2105 goto skip;
2106 }
2107 }
2108
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002109 dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
2110
2111 if (LIST_ISEMPTY(&(ctx->messages[ev])))
2112 goto out;
2113
2114 if (ctx->state == SPOE_CTX_ST_ERROR)
2115 goto error;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002116
2117 if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
2118 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2119 " - failed to process event '%s': timeout\n",
2120 (int)now.tv_sec, (int)now.tv_usec,
2121 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2122 send_log(ctx->strm->be, LOG_WARNING,
2123 "failed to process event '%s': timeout.\n",
2124 spoe_event_str[ev]);
2125 goto error;
2126 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002127
2128 if (ctx->state == SPOE_CTX_ST_READY) {
Christopher Fauletf7a30922016-11-10 15:04:51 +01002129 if (!tick_isset(ctx->process_exp)) {
2130 ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
2131 s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
2132 ctx->process_exp);
2133 }
2134
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002135 ret = acquire_spoe_appctx(ctx, dir);
2136 if (ret <= 0) {
2137 if (!ret)
2138 goto out;
2139 goto error;
2140 }
2141 ctx->state = SPOE_CTX_ST_SENDING_MSGS;
2142 }
2143
2144 if (ctx->appctx == NULL)
2145 goto error;
2146
2147 if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
2148 ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
2149 if (ret <= 0) {
2150 if (!ret)
2151 goto skip;
2152 goto error;
2153 }
2154 wakeup_spoe_appctx(ctx);
2155 ret = 0;
2156 goto out;
2157 }
2158
2159 if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
2160 wakeup_spoe_appctx(ctx);
2161 ret = 0;
2162 goto out;
2163 }
2164
2165 if (ctx->state == SPOE_CTX_ST_DONE) {
2166 ret = process_spoe_actions(s, ctx, ev, dir);
2167 if (ret <= 0) {
2168 if (!ret)
2169 goto skip;
2170 goto error;
2171 }
2172 ctx->frame_id++;
2173 release_spoe_appctx(ctx);
2174 ctx->state = SPOE_CTX_ST_READY;
2175 }
2176
2177 out:
2178 return ret;
2179
2180 skip:
2181 release_spoe_appctx(ctx);
2182 ctx->state = SPOE_CTX_ST_READY;
2183 return 1;
2184
2185 error:
Christopher Faulet48026722016-11-16 15:01:12 +01002186 if (agent->eps_max > 0)
2187 update_freq_ctr(&agent->err_per_sec, 1);
2188
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002189 release_spoe_appctx(ctx);
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002190 ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
2191 ? SPOE_CTX_ST_READY
2192 : SPOE_CTX_ST_ERROR);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002193 return 1;
2194}
2195
2196
2197/***************************************************************************
2198 * Functions that create/destroy SPOE contexts
2199 **************************************************************************/
2200static struct spoe_context *
2201create_spoe_context(struct filter *filter)
2202{
2203 struct spoe_config *conf = FLT_CONF(filter);
2204 struct spoe_context *ctx;
2205
2206 ctx = pool_alloc_dirty(pool2_spoe_ctx);
2207 if (ctx == NULL) {
2208 return NULL;
2209 }
2210 memset(ctx, 0, sizeof(*ctx));
2211 ctx->filter = filter;
2212 ctx->state = SPOE_CTX_ST_NONE;
2213 ctx->flags = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002214 ctx->messages = conf->agent->messages;
2215 ctx->buffer = &buf_empty;
2216 LIST_INIT(&ctx->buffer_wait);
2217 LIST_INIT(&ctx->applet_wait);
2218
Christopher Fauletf7a30922016-11-10 15:04:51 +01002219 ctx->stream_id = 0;
2220 ctx->frame_id = 1;
2221 ctx->process_exp = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002222
2223 return ctx;
2224}
2225
2226static void
2227destroy_spoe_context(struct spoe_context *ctx)
2228{
2229 if (!ctx)
2230 return;
2231
2232 if (ctx->appctx)
2233 APPCTX_SPOE(ctx->appctx).ctx = NULL;
2234 if (!LIST_ISEMPTY(&ctx->buffer_wait))
2235 LIST_DEL(&ctx->buffer_wait);
2236 if (!LIST_ISEMPTY(&ctx->applet_wait))
2237 LIST_DEL(&ctx->applet_wait);
2238 pool_free2(pool2_spoe_ctx, ctx);
2239}
2240
2241static void
2242reset_spoe_context(struct spoe_context *ctx)
2243{
2244 ctx->state = SPOE_CTX_ST_READY;
2245 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2246}
2247
2248
2249/***************************************************************************
2250 * Hooks that manage the filter lifecycle (init/check/deinit)
2251 **************************************************************************/
2252/* Signal handler: Do a soft stop, wakeup SPOE applet */
2253static void
2254sig_stop_spoe(struct sig_handler *sh)
2255{
2256 struct proxy *p;
2257
2258 p = proxy;
2259 while (p) {
2260 struct flt_conf *fconf;
2261
2262 list_for_each_entry(fconf, &p->filter_configs, list) {
2263 struct spoe_config *conf = fconf->conf;
2264 struct spoe_agent *agent = conf->agent;
2265 struct appctx *appctx;
2266
2267 list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) {
2268 si_applet_want_get(appctx->owner);
2269 si_applet_want_put(appctx->owner);
2270 appctx_wakeup(appctx);
2271 }
2272 }
2273 p = p->next;
2274 }
2275}
2276
2277
2278/* Initialize the SPOE filter. Returns -1 on error, else 0. */
2279static int
2280spoe_init(struct proxy *px, struct flt_conf *fconf)
2281{
2282 struct spoe_config *conf = fconf->conf;
2283 struct listener *l;
2284
2285 memset(&conf->agent_fe, 0, sizeof(conf->agent_fe));
2286 init_new_proxy(&conf->agent_fe);
2287 conf->agent_fe.parent = conf->agent;
2288 conf->agent_fe.last_change = now.tv_sec;
2289 conf->agent_fe.id = conf->agent->id;
2290 conf->agent_fe.cap = PR_CAP_FE;
2291 conf->agent_fe.mode = PR_MODE_TCP;
2292 conf->agent_fe.maxconn = 0;
2293 conf->agent_fe.options2 |= PR_O2_INDEPSTR;
2294 conf->agent_fe.conn_retries = CONN_RETRIES;
2295 conf->agent_fe.accept = frontend_accept;
2296 conf->agent_fe.srv = NULL;
2297 conf->agent_fe.timeout.client = TICK_ETERNITY;
2298 conf->agent_fe.default_target = &spoe_applet.obj_type;
2299 conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
2300
2301 if ((l = calloc(1, sizeof(*l))) == NULL) {
2302 Alert("spoe_init : out of memory.\n");
2303 goto out_error;
2304 }
2305 l->obj_type = OBJ_TYPE_LISTENER;
2306 l->obj_type = OBJ_TYPE_LISTENER;
2307 l->frontend = &conf->agent_fe;
2308 l->state = LI_READY;
2309 l->analysers = conf->agent_fe.fe_req_ana;
2310 LIST_ADDQ(&conf->agent_fe.conf.listeners, &l->by_fe);
2311
2312 if (!sighandler_registered) {
2313 signal_register_fct(0, sig_stop_spoe, 0);
2314 sighandler_registered = 1;
2315 }
2316
2317 return 0;
2318
2319 out_error:
2320 return -1;
2321}
2322
2323/* Free ressources allocated by the SPOE filter. */
2324static void
2325spoe_deinit(struct proxy *px, struct flt_conf *fconf)
2326{
2327 struct spoe_config *conf = fconf->conf;
2328
2329 if (conf) {
2330 struct spoe_agent *agent = conf->agent;
2331 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
2332 struct listener *, by_fe);
2333
2334 free(l);
2335 release_spoe_agent(agent);
2336 free(conf);
2337 }
2338 fconf->conf = NULL;
2339}
2340
2341/* Check configuration of a SPOE filter for a specified proxy.
2342 * Return 1 on error, else 0. */
2343static int
2344spoe_check(struct proxy *px, struct flt_conf *fconf)
2345{
2346 struct spoe_config *conf = fconf->conf;
2347 struct proxy *target;
2348
2349 target = proxy_be_by_name(conf->agent->b.name);
2350 if (target == NULL) {
2351 Alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'"
2352 " declared at %s:%d.\n",
2353 px->id, conf->agent->b.name, conf->agent->id,
2354 conf->agent->conf.file, conf->agent->conf.line);
2355 return 1;
2356 }
2357 if (target->mode != PR_MODE_TCP) {
2358 Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
2359 " at %s:%d does not support HTTP mode.\n",
2360 px->id, target->id, conf->agent->id,
2361 conf->agent->conf.file, conf->agent->conf.line);
2362 return 1;
2363 }
2364
2365 free(conf->agent->b.name);
2366 conf->agent->b.name = NULL;
2367 conf->agent->b.be = target;
2368 return 0;
2369}
2370
2371/**************************************************************************
2372 * Hooks attached to a stream
2373 *************************************************************************/
2374/* Called when a filter instance is created and attach to a stream. It creates
2375 * the context that will be used to process this stream. */
2376static int
2377spoe_start(struct stream *s, struct filter *filter)
2378{
2379 struct spoe_context *ctx;
2380
2381 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2382 (int)now.tv_sec, (int)now.tv_usec,
2383 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2384 __FUNCTION__, s);
2385
2386 ctx = create_spoe_context(filter);
2387 if (ctx == NULL) {
2388 send_log(s->be, LOG_EMERG,
2389 "failed to create SPOE context for proxy %s\n",
2390 s->be->id);
2391 return 0;
2392 }
2393
2394 ctx->strm = s;
2395 ctx->state = SPOE_CTX_ST_READY;
2396 filter->ctx = ctx;
2397
2398 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_FE]))
2399 filter->pre_analyzers |= AN_REQ_INSPECT_FE;
2400
2401 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_BE]))
2402 filter->pre_analyzers |= AN_REQ_INSPECT_BE;
2403
2404 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_RSP]))
2405 filter->pre_analyzers |= AN_RES_INSPECT;
2406
2407 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_FE]))
2408 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE;
2409
2410 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_BE]))
2411 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE;
2412
2413 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_RSP]))
2414 filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE;
2415
2416 return 1;
2417}
2418
2419/* Called when a filter instance is detached from a stream. It release the
2420 * attached SPOE context. */
2421static void
2422spoe_stop(struct stream *s, struct filter *filter)
2423{
2424 struct spoe_context *ctx = filter->ctx;
2425
2426 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2427 (int)now.tv_sec, (int)now.tv_usec,
2428 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2429 __FUNCTION__, s);
2430
2431 if (ctx) {
2432 release_spoe_appctx(ctx);
2433 destroy_spoe_context(ctx);
2434 }
2435}
2436
Christopher Fauletf7a30922016-11-10 15:04:51 +01002437
2438/*
2439 * Called when the stream is woken up because of expired timer.
2440 */
2441static void
2442spoe_check_timeouts(struct stream *s, struct filter *filter)
2443{
2444 struct spoe_context *ctx = filter->ctx;
2445
2446 if (tick_is_expired(ctx->process_exp, now_ms))
2447 s->task->state |= TASK_WOKEN_MSG;
2448}
2449
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002450/* Called when we are ready to filter data on a channel */
2451static int
2452spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2453{
2454 struct spoe_context *ctx = filter->ctx;
2455 int ret = 1;
2456
2457 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2458 " - ctx-flags=0x%08x\n",
2459 (int)now.tv_sec, (int)now.tv_usec,
2460 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2461 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2462
2463 if (!(chn->flags & CF_ISRESP)) {
2464 if (filter->pre_analyzers & AN_REQ_INSPECT_FE)
2465 chn->analysers |= AN_REQ_INSPECT_FE;
2466 if (filter->pre_analyzers & AN_REQ_INSPECT_BE)
2467 chn->analysers |= AN_REQ_INSPECT_BE;
2468
2469 if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED)
2470 goto out;
2471
2472 ctx->stream_id = s->uniq_id;
2473 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2474 ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
2475 if (ret != 1)
2476 goto out;
2477 }
2478 ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
2479 }
2480 else {
2481 if (filter->pre_analyzers & SPOE_EV_ON_TCP_RSP)
2482 chn->analysers |= AN_RES_INSPECT;
2483
2484 if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
2485 goto out;
2486
2487 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2488 ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
2489 if (ret != 1)
2490 goto out;
2491 }
2492 ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
2493 }
2494
2495 out:
2496 if (!ret) {
2497 channel_dont_read(chn);
2498 channel_dont_close(chn);
2499 }
2500 return ret;
2501}
2502
2503/* Called before a processing happens on a given channel */
2504static int
2505spoe_chn_pre_analyze(struct stream *s, struct filter *filter,
2506 struct channel *chn, unsigned an_bit)
2507{
2508 struct spoe_context *ctx = filter->ctx;
2509 int ret = 1;
2510
2511 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2512 " - ctx-flags=0x%08x - ana=0x%08x\n",
2513 (int)now.tv_sec, (int)now.tv_usec,
2514 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2515 __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2516 ctx->flags, an_bit);
2517
2518 if (ctx->state == SPOE_CTX_ST_NONE || ctx->state == SPOE_CTX_ST_ERROR)
2519 goto out;
2520
2521 switch (an_bit) {
2522 case AN_REQ_INSPECT_FE:
2523 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
2524 break;
2525 case AN_REQ_INSPECT_BE:
2526 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
2527 break;
2528 case AN_RES_INSPECT:
2529 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP);
2530 break;
2531 case AN_REQ_HTTP_PROCESS_FE:
2532 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
2533 break;
2534 case AN_REQ_HTTP_PROCESS_BE:
2535 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
2536 break;
2537 case AN_RES_HTTP_PROCESS_FE:
2538 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
2539 break;
2540 }
2541
2542 out:
2543 if (!ret) {
2544 channel_dont_read(chn);
2545 channel_dont_close(chn);
2546 }
2547 return ret;
2548}
2549
2550/* Called when the filtering on the channel ends. */
2551static int
2552spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2553{
2554 struct spoe_context *ctx = filter->ctx;
2555
2556 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2557 " - ctx-flags=0x%08x\n",
2558 (int)now.tv_sec, (int)now.tv_usec,
2559 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2560 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2561
2562 if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
2563 reset_spoe_context(ctx);
2564 }
2565
2566 return 1;
2567}
2568
2569/********************************************************************
2570 * Functions that manage the filter initialization
2571 ********************************************************************/
2572struct flt_ops spoe_ops = {
2573 /* Manage SPOE filter, called for each filter declaration */
2574 .init = spoe_init,
2575 .deinit = spoe_deinit,
2576 .check = spoe_check,
2577
2578 /* Handle start/stop of SPOE */
Christopher Fauletf7a30922016-11-10 15:04:51 +01002579 .attach = spoe_start,
2580 .detach = spoe_stop,
2581 .check_timeouts = spoe_check_timeouts,
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002582
2583 /* Handle channels activity */
2584 .channel_start_analyze = spoe_start_analyze,
2585 .channel_pre_analyze = spoe_chn_pre_analyze,
2586 .channel_end_analyze = spoe_end_analyze,
2587};
2588
2589
2590static int
2591cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
2592{
2593 const char *err;
2594 int i, err_code = 0;
2595
2596 if ((cfg_scope == NULL && curengine != NULL) ||
2597 (cfg_scope != NULL && curengine == NULL) ||
2598 strcmp(curengine, cfg_scope))
2599 goto out;
2600
2601 if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */
2602 if (!*args[1]) {
2603 Alert("parsing [%s:%d] : missing name for spoe-agent section.\n",
2604 file, linenum);
2605 err_code |= ERR_ALERT | ERR_ABORT;
2606 goto out;
2607 }
2608 if (*args[2]) {
2609 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2610 file, linenum, args[2]);
2611 err_code |= ERR_ALERT | ERR_ABORT;
2612 goto out;
2613 }
2614
2615 err = invalid_char(args[1]);
2616 if (err) {
2617 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2618 file, linenum, *err, args[0], args[1]);
2619 err_code |= ERR_ALERT | ERR_ABORT;
2620 goto out;
2621 }
2622
2623 if (curagent != NULL) {
2624 Alert("parsing [%s:%d] : another spoe-agent section previously defined.\n",
2625 file, linenum);
2626 err_code |= ERR_ALERT | ERR_ABORT;
2627 goto out;
2628 }
2629 if ((curagent = calloc(1, sizeof(*curagent))) == NULL) {
2630 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2631 err_code |= ERR_ALERT | ERR_ABORT;
2632 goto out;
2633 }
2634
2635 curagent->id = strdup(args[1]);
2636 curagent->conf.file = strdup(file);
2637 curagent->conf.line = linenum;
2638 curagent->timeout.hello = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002639 curagent->timeout.idle = TICK_ETERNITY;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002640 curagent->timeout.processing = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002641 curagent->var_pfx = NULL;
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002642 curagent->flags = 0;
Christopher Faulet48026722016-11-16 15:01:12 +01002643 curagent->cps_max = 0;
2644 curagent->eps_max = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002645
2646 for (i = 0; i < SPOE_EV_EVENTS; ++i)
2647 LIST_INIT(&curagent->messages[i]);
2648 LIST_INIT(&curagent->cache);
2649 LIST_INIT(&curagent->applet_wq);
2650 }
2651 else if (!strcmp(args[0], "use-backend")) {
2652 if (!*args[1]) {
2653 Alert("parsing [%s:%d] : '%s' expects a backend name.\n",
2654 file, linenum, args[0]);
2655 err_code |= ERR_ALERT | ERR_FATAL;
2656 goto out;
2657 }
2658 if (*args[2]) {
2659 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2660 file, linenum, args[2]);
2661 err_code |= ERR_ALERT | ERR_ABORT;
2662 goto out;
2663 }
2664 free(curagent->b.name);
2665 curagent->b.name = strdup(args[1]);
2666 }
2667 else if (!strcmp(args[0], "messages")) {
2668 int cur_arg = 1;
2669 while (*args[cur_arg]) {
2670 struct spoe_msg_placeholder *mp = NULL;
2671
2672 list_for_each_entry(mp, &curmps, list) {
2673 if (!strcmp(mp->id, args[cur_arg])) {
2674 Alert("parsing [%s:%d]: spoe-message message '%s' already declared.\n",
2675 file, linenum, args[cur_arg]);
2676 err_code |= ERR_ALERT | ERR_FATAL;
2677 goto out;
2678 }
2679 }
2680
2681 if ((mp = calloc(1, sizeof(*mp))) == NULL) {
2682 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2683 err_code |= ERR_ALERT | ERR_ABORT;
2684 goto out;
2685 }
2686 mp->id = strdup(args[cur_arg]);
2687 LIST_ADDQ(&curmps, &mp->list);
2688 cur_arg++;
2689 }
2690 }
2691 else if (!strcmp(args[0], "timeout")) {
2692 unsigned int *tv = NULL;
2693 const char *res;
2694 unsigned timeout;
2695
2696 if (!*args[1]) {
2697 Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n",
2698 file, linenum);
2699 err_code |= ERR_ALERT | ERR_FATAL;
2700 goto out;
2701 }
2702 if (!strcmp(args[1], "hello"))
2703 tv = &curagent->timeout.hello;
2704 else if (!strcmp(args[1], "idle"))
2705 tv = &curagent->timeout.idle;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002706 else if (!strcmp(args[1], "processing"))
2707 tv = &curagent->timeout.processing;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002708 else {
Christopher Faulet03a34492016-11-19 16:47:56 +01002709 Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' or 'processing' (got %s).\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002710 file, linenum, args[1]);
2711 err_code |= ERR_ALERT | ERR_FATAL;
2712 goto out;
2713 }
2714 if (!*args[2]) {
2715 Alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n",
2716 file, linenum, args[1]);
2717 err_code |= ERR_ALERT | ERR_FATAL;
2718 goto out;
2719 }
2720 res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
2721 if (res) {
2722 Alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n",
2723 file, linenum, *res, args[1]);
2724 err_code |= ERR_ALERT | ERR_ABORT;
2725 goto out;
2726 }
2727 if (*args[3]) {
2728 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2729 file, linenum, args[3]);
2730 err_code |= ERR_ALERT | ERR_ABORT;
2731 goto out;
2732 }
2733 *tv = MS_TO_TICKS(timeout);
2734 }
2735 else if (!strcmp(args[0], "option")) {
2736 if (!*args[1]) {
2737 Alert("parsing [%s:%d]: '%s' expects an option name.\n",
2738 file, linenum, args[0]);
2739 err_code |= ERR_ALERT | ERR_FATAL;
2740 goto out;
2741 }
2742 if (!strcmp(args[1], "var-prefix")) {
2743 char *tmp;
2744
2745 if (!*args[2]) {
2746 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2747 file, linenum, args[0],
2748 args[1]);
2749 err_code |= ERR_ALERT | ERR_FATAL;
2750 goto out;
2751 }
2752 tmp = args[2];
2753 while (*tmp) {
2754 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2755 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
2756 file, linenum, args[0], args[1]);
2757 err_code |= ERR_ALERT | ERR_FATAL;
2758 goto out;
2759 }
2760 tmp++;
2761 }
2762 curagent->var_pfx = strdup(args[2]);
2763 }
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002764 else if (!strcmp(args[1], "continue-on-error")) {
2765 if (*args[2]) {
2766 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
Christopher Faulet48026722016-11-16 15:01:12 +01002767 file, linenum, args[2]);
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002768 err_code |= ERR_ALERT | ERR_ABORT;
2769 goto out;
2770 }
2771 curagent->flags |= SPOE_FL_CONT_ON_ERR;
2772 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002773 else {
2774 Alert("parsing [%s:%d]: option '%s' is not supported.\n",
2775 file, linenum, args[1]);
2776 err_code |= ERR_ALERT | ERR_FATAL;
2777 goto out;
2778 }
Christopher Faulet48026722016-11-16 15:01:12 +01002779 }
2780 else if (!strcmp(args[0], "maxconnrate")) {
2781 if (!*args[1]) {
2782 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
2783 file, linenum, args[0]);
2784 err_code |= ERR_ALERT | ERR_FATAL;
2785 goto out;
2786 }
2787 if (*args[2]) {
2788 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2789 file, linenum, args[2]);
2790 err_code |= ERR_ALERT | ERR_ABORT;
2791 goto out;
2792 }
2793 curagent->cps_max = atol(args[1]);
2794 }
2795 else if (!strcmp(args[0], "maxerrrate")) {
2796 if (!*args[1]) {
2797 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
2798 file, linenum, args[0]);
2799 err_code |= ERR_ALERT | ERR_FATAL;
2800 goto out;
2801 }
2802 if (*args[2]) {
2803 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2804 file, linenum, args[2]);
2805 err_code |= ERR_ALERT | ERR_ABORT;
2806 goto out;
2807 }
2808 curagent->eps_max = atol(args[1]);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002809 }
2810 else if (*args[0]) {
2811 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
2812 file, linenum, args[0]);
2813 err_code |= ERR_ALERT | ERR_FATAL;
2814 goto out;
2815 }
2816 out:
2817 return err_code;
2818}
2819
2820static int
2821cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm)
2822{
2823 struct spoe_message *msg;
2824 struct spoe_arg *arg;
2825 const char *err;
2826 char *errmsg = NULL;
2827 int err_code = 0;
2828
2829 if ((cfg_scope == NULL && curengine != NULL) ||
2830 (cfg_scope != NULL && curengine == NULL) ||
2831 strcmp(curengine, cfg_scope))
2832 goto out;
2833
2834 if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */
2835 if (!*args[1]) {
2836 Alert("parsing [%s:%d] : missing name for spoe-message section.\n",
2837 file, linenum);
2838 err_code |= ERR_ALERT | ERR_ABORT;
2839 goto out;
2840 }
2841 if (*args[2]) {
2842 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2843 file, linenum, args[2]);
2844 err_code |= ERR_ALERT | ERR_ABORT;
2845 goto out;
2846 }
2847
2848 err = invalid_char(args[1]);
2849 if (err) {
2850 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2851 file, linenum, *err, args[0], args[1]);
2852 err_code |= ERR_ALERT | ERR_ABORT;
2853 goto out;
2854 }
2855
2856 list_for_each_entry(msg, &curmsgs, list) {
2857 if (!strcmp(msg->id, args[1])) {
2858 Alert("parsing [%s:%d]: spoe-message section '%s' has the same"
2859 " name as another one declared at %s:%d.\n",
2860 file, linenum, args[1], msg->conf.file, msg->conf.line);
2861 err_code |= ERR_ALERT | ERR_FATAL;
2862 goto out;
2863 }
2864 }
2865
2866 if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) {
2867 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2868 err_code |= ERR_ALERT | ERR_ABORT;
2869 goto out;
2870 }
2871
2872 curmsg->id = strdup(args[1]);
2873 curmsg->id_len = strlen(curmsg->id);
2874 curmsg->event = SPOE_EV_NONE;
2875 curmsg->conf.file = strdup(file);
2876 curmsg->conf.line = linenum;
2877 LIST_INIT(&curmsg->args);
2878 LIST_ADDQ(&curmsgs, &curmsg->list);
2879 }
2880 else if (!strcmp(args[0], "args")) {
2881 int cur_arg = 1;
2882
2883 curproxy->conf.args.ctx = ARGC_SPOE;
2884 curproxy->conf.args.file = file;
2885 curproxy->conf.args.line = linenum;
2886 while (*args[cur_arg]) {
2887 char *delim = strchr(args[cur_arg], '=');
2888 int idx = 0;
2889
2890 if ((arg = calloc(1, sizeof(*arg))) == NULL) {
2891 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2892 err_code |= ERR_ALERT | ERR_ABORT;
2893 goto out;
2894 }
2895
2896 if (!delim) {
2897 arg->name = NULL;
2898 arg->name_len = 0;
2899 delim = args[cur_arg];
2900 }
2901 else {
2902 arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]);
2903 arg->name_len = delim - args[cur_arg];
2904 delim++;
2905 }
2906
2907 arg->expr = sample_parse_expr(&delim, &idx, file, linenum, &errmsg, &curproxy->conf.args);
2908 if (arg->expr == NULL) {
2909 Alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg);
2910 err_code |= ERR_ALERT | ERR_FATAL;
2911 free(arg->name);
2912 free(arg);
2913 goto out;
2914 }
2915 LIST_ADDQ(&curmsg->args, &arg->list);
2916 cur_arg++;
2917 }
2918 curproxy->conf.args.file = NULL;
2919 curproxy->conf.args.line = 0;
2920 }
2921 else if (!strcmp(args[0], "event")) {
2922 if (!*args[1]) {
2923 Alert("parsing [%s:%d] : missing event name.\n", file, linenum);
2924 err_code |= ERR_ALERT | ERR_ABORT;
2925 goto out;
2926 }
2927 if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS]))
2928 curmsg->event = SPOE_EV_ON_CLIENT_SESS;
2929 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS]))
2930 curmsg->event = SPOE_EV_ON_SERVER_SESS;
2931
2932 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE]))
2933 curmsg->event = SPOE_EV_ON_TCP_REQ_FE;
2934 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE]))
2935 curmsg->event = SPOE_EV_ON_TCP_REQ_BE;
2936 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP]))
2937 curmsg->event = SPOE_EV_ON_TCP_RSP;
2938
2939 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE]))
2940 curmsg->event = SPOE_EV_ON_HTTP_REQ_FE;
2941 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE]))
2942 curmsg->event = SPOE_EV_ON_HTTP_REQ_BE;
2943 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP]))
2944 curmsg->event = SPOE_EV_ON_HTTP_RSP;
2945 else {
2946 Alert("parsing [%s:%d] : unkown event '%s'.\n",
2947 file, linenum, args[1]);
2948 err_code |= ERR_ALERT | ERR_ABORT;
2949 goto out;
2950 }
2951 }
2952 else if (!*args[0]) {
2953 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n",
2954 file, linenum, args[0]);
2955 err_code |= ERR_ALERT | ERR_FATAL;
2956 goto out;
2957 }
2958 out:
2959 free(errmsg);
2960 return err_code;
2961}
2962
2963/* Return -1 on error, else 0 */
2964static int
2965parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
2966 struct flt_conf *fconf, char **err, void *private)
2967{
2968 struct list backup_sections;
2969 struct spoe_config *conf;
2970 struct spoe_message *msg, *msgback;
2971 struct spoe_msg_placeholder *mp, *mpback;
2972 char *file = NULL, *engine = NULL;
2973 int ret, pos = *cur_arg + 1;
2974
2975 conf = calloc(1, sizeof(*conf));
2976 if (conf == NULL) {
2977 memprintf(err, "%s: out of memory", args[*cur_arg]);
2978 goto error;
2979 }
2980 conf->proxy = px;
2981
2982 while (*args[pos]) {
2983 if (!strcmp(args[pos], "config")) {
2984 if (!*args[pos+1]) {
2985 memprintf(err, "'%s' : '%s' option without value",
2986 args[*cur_arg], args[pos]);
2987 goto error;
2988 }
2989 file = args[pos+1];
2990 pos += 2;
2991 }
2992 else if (!strcmp(args[pos], "engine")) {
2993 if (!*args[pos+1]) {
2994 memprintf(err, "'%s' : '%s' option without value",
2995 args[*cur_arg], args[pos]);
2996 goto error;
2997 }
2998 engine = args[pos+1];
2999 pos += 2;
3000 }
3001 else {
3002 memprintf(err, "unknown keyword '%s'", args[pos]);
3003 goto error;
3004 }
3005 }
3006 if (file == NULL) {
3007 memprintf(err, "'%s' : missing config file", args[*cur_arg]);
3008 goto error;
3009 }
3010
3011 /* backup sections and register SPOE sections */
3012 LIST_INIT(&backup_sections);
3013 cfg_backup_sections(&backup_sections);
3014 cfg_register_section("spoe-agent", cfg_parse_spoe_agent);
3015 cfg_register_section("spoe-message", cfg_parse_spoe_message);
3016
3017 /* Parse SPOE filter configuration file */
3018 curengine = engine;
3019 curproxy = px;
3020 curagent = NULL;
3021 curmsg = NULL;
3022 ret = readcfgfile(file);
3023 curproxy = NULL;
3024
3025 /* unregister SPOE sections and restore previous sections */
3026 cfg_unregister_sections();
3027 cfg_restore_sections(&backup_sections);
3028
3029 if (ret == -1) {
3030 memprintf(err, "Could not open configuration file %s : %s",
3031 file, strerror(errno));
3032 goto error;
3033 }
3034 if (ret & (ERR_ABORT|ERR_FATAL)) {
3035 memprintf(err, "Error(s) found in configuration file %s", file);
3036 goto error;
3037 }
3038
3039 /* Check SPOE agent */
3040 if (curagent == NULL) {
3041 memprintf(err, "No SPOE agent found in file %s", file);
3042 goto error;
3043 }
3044 if (curagent->b.name == NULL) {
3045 memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d",
3046 curagent->id, curagent->conf.file, curagent->conf.line);
3047 goto error;
3048 }
Christopher Fauletf7a30922016-11-10 15:04:51 +01003049 if (curagent->timeout.hello == TICK_ETERNITY ||
3050 curagent->timeout.idle == TICK_ETERNITY ||
Christopher Fauletf7a30922016-11-10 15:04:51 +01003051 curagent->timeout.processing == TICK_ETERNITY) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003052 Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
3053 " | While not properly invalid, you will certainly encounter various problems\n"
3054 " | with such a configuration. To fix this, please ensure that all following\n"
Christopher Faulet03a34492016-11-19 16:47:56 +01003055 " | timeouts are set to a non-zero value: 'hello', 'idle', 'processing'.\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003056 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3057 }
3058 if (curagent->var_pfx == NULL) {
3059 char *tmp = curagent->id;
3060
3061 while (*tmp) {
3062 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3063 memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. "
3064 "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n",
3065 curagent->id, curagent->id, curagent->conf.file, curagent->conf.line);
3066 goto error;
3067 }
3068 tmp++;
3069 }
3070 curagent->var_pfx = strdup(curagent->id);
3071 }
3072
3073 if (LIST_ISEMPTY(&curmps)) {
3074 Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",
3075 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3076 goto finish;
3077 }
3078
3079 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3080 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3081 if (!strcmp(msg->id, mp->id)) {
3082 if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) {
3083 if (msg->event == SPOE_EV_ON_TCP_REQ_BE)
3084 msg->event = SPOE_EV_ON_TCP_REQ_FE;
3085 if (msg->event == SPOE_EV_ON_HTTP_REQ_BE)
3086 msg->event = SPOE_EV_ON_HTTP_REQ_FE;
3087 }
3088 if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS ||
3089 msg->event == SPOE_EV_ON_TCP_REQ_FE ||
3090 msg->event == SPOE_EV_ON_HTTP_REQ_FE)) {
3091 Warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n",
3092 px->id, msg->conf.file, msg->conf.line);
3093 goto next;
3094 }
3095 if (msg->event == SPOE_EV_NONE) {
3096 Warning("Proxy '%s': Ignore SPOE message without event at %s:%d.\n",
3097 px->id, msg->conf.file, msg->conf.line);
3098 goto next;
3099 }
3100 msg->agent = curagent;
3101 LIST_DEL(&msg->list);
3102 LIST_ADDQ(&curagent->messages[msg->event], &msg->list);
3103 goto next;
3104 }
3105 }
3106 memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d",
3107 curagent->id, mp->id, curagent->conf.file, curagent->conf.line);
3108 goto error;
3109 next:
3110 continue;
3111 }
3112
3113 finish:
3114 conf->agent = curagent;
3115 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3116 LIST_DEL(&mp->list);
3117 release_spoe_msg_placeholder(mp);
3118 }
3119 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3120 Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n",
3121 px->id, msg->id, msg->conf.file, msg->conf.line);
3122 LIST_DEL(&msg->list);
3123 release_spoe_message(msg);
3124 }
3125
3126 *cur_arg = pos;
3127 fconf->ops = &spoe_ops;
3128 fconf->conf = conf;
3129 return 0;
3130
3131 error:
3132 release_spoe_agent(curagent);
3133 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3134 LIST_DEL(&mp->list);
3135 release_spoe_msg_placeholder(mp);
3136 }
3137 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3138 LIST_DEL(&msg->list);
3139 release_spoe_message(msg);
3140 }
3141 free(conf);
3142 return -1;
3143}
3144
3145
3146/* Declare the filter parser for "spoe" keyword */
3147static struct flt_kw_list flt_kws = { "SPOE", { }, {
3148 { "spoe", parse_spoe_flt, NULL },
3149 { NULL, NULL, NULL },
3150 }
3151};
3152
3153__attribute__((constructor))
3154static void __spoe_init(void)
3155{
3156 flt_register_keywords(&flt_kws);
3157
3158 LIST_INIT(&curmsgs);
3159 LIST_INIT(&curmps);
3160 pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
3161}
3162
3163__attribute__((destructor))
3164static void
3165__spoe_deinit(void)
3166{
3167 pool_destroy2(pool2_spoe_ctx);
3168}