blob: 519d6bb5f505dcea9eb6cfc6aea00e952369370a [file] [log] [blame]
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001/*
2 * Stream processing offload engine management.
3 *
4 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12#include <ctype.h>
13#include <errno.h>
14
15#include <common/buffer.h>
16#include <common/cfgparse.h>
17#include <common/compat.h>
18#include <common/config.h>
19#include <common/debug.h>
20#include <common/memory.h>
21#include <common/time.h>
22
23#include <types/arg.h>
24#include <types/filters.h>
25#include <types/global.h>
26#include <types/proxy.h>
27#include <types/sample.h>
28#include <types/stream.h>
29
30#include <proto/arg.h>
31#include <proto/backend.h>
32#include <proto/filters.h>
Christopher Faulet48026722016-11-16 15:01:12 +010033#include <proto/freq_ctr.h>
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020034#include <proto/frontend.h>
35#include <proto/log.h>
36#include <proto/proto_http.h>
37#include <proto/proxy.h>
38#include <proto/sample.h>
39#include <proto/session.h>
40#include <proto/signal.h>
41#include <proto/stream.h>
42#include <proto/stream_interface.h>
43#include <proto/task.h>
44#include <proto/vars.h>
45
/* Debug traces: SPOE_PRINTF() forwards its arguments to fprintf() when
 * DEBUG_SPOE or DEBUG_FULL is defined, and expands to nothing otherwise. */
#if !defined(DEBUG_SPOE) && !defined(DEBUG_FULL)
#define SPOE_PRINTF(x...)
#else
#define SPOE_PRINTF(x...) fprintf(x)
#endif
51
/* Access the SPOE-specific part of an applet context */
#define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe)

/* Smallest frame size accepted when it is negotiated with an agent */
#define MIN_FRAME_SIZE 256

/* SPOE agent flags (see <flags> in struct spoe_agent) */
#define SPOE_FL_CONT_ON_ERR       0x00000001 /* Do not stop events processing when an error occurred */

/* SPOE context flags (see <flags> in struct spoe_context) */
#define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* Set after that on-client-session event was processed */
#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set after that on-server-session event was processed */
#define SPOE_CTX_FL_REQ_PROCESS   0x00000004 /* Set when SPOE is processing the request */
#define SPOE_CTX_FL_RSP_PROCESS   0x00000008 /* Set when SPOE is processing the response */

/* Either direction is being processed */
#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)

/* SPOE applet error codes */
#define SPOE_APPCTX_ERR_NONE    0x00000000 /* no error yet, leave it to zero */
#define SPOE_APPCTX_ERR_TOUT    0x00000001 /* SPOE applet timeout */
71
/* States of a SPOE context, i.e. of the offload attached to one stream */
enum spoe_ctx_state {
	SPOE_CTX_ST_NONE         = 0,
	SPOE_CTX_ST_READY        = 1,
	SPOE_CTX_ST_SENDING_MSGS = 2,
	SPOE_CTX_ST_WAITING_ACK  = 3,
	SPOE_CTX_ST_DONE         = 4,
	SPOE_CTX_ST_ERROR        = 5,
};
81
/* States of a SPOE applet */
enum spoe_appctx_state {
	SPOE_APPCTX_ST_CONNECT       = 0,
	SPOE_APPCTX_ST_CONNECTING    = 1,
	SPOE_APPCTX_ST_PROCESSING    = 2,
	SPOE_APPCTX_ST_DISCONNECT    = 3,
	SPOE_APPCTX_ST_DISCONNECTING = 4,
	SPOE_APPCTX_ST_EXIT          = 5,
	SPOE_APPCTX_ST_END           = 6,
};
92
/* Action types supported by SPOE. SPOE_ACT_TYPES is the first unused value. */
enum spoe_action_type {
	SPOE_ACT_T_SET_VAR   = 1,
	SPOE_ACT_T_UNSET_VAR = 2,
	SPOE_ACT_TYPES       = 3,
};
99
/* Events supported by SPOE. SPOE_EV_EVENTS is the number of entries and is
 * used to size per-event arrays. */
enum spoe_event {
	SPOE_EV_NONE            = 0,

	/* Request events */
	SPOE_EV_ON_CLIENT_SESS  = 1,
	SPOE_EV_ON_TCP_REQ_FE   = 2,
	SPOE_EV_ON_TCP_REQ_BE   = 3,
	SPOE_EV_ON_HTTP_REQ_FE  = 4,
	SPOE_EV_ON_HTTP_REQ_BE  = 5,

	/* Response events */
	SPOE_EV_ON_SERVER_SESS  = 6,
	SPOE_EV_ON_TCP_RSP      = 7,
	SPOE_EV_ON_HTTP_RSP     = 8,

	SPOE_EV_EVENTS          = 9
};
118
/* Errors triggered by the SPOE applet. SPOE_FRM_ERRS is one past the highest
 * code and is used to size the table of reasons. */
enum spoe_frame_error {
	SPOE_FRM_ERR_NONE           = 0,
	SPOE_FRM_ERR_IO             = 1,
	SPOE_FRM_ERR_TOUT           = 2,
	SPOE_FRM_ERR_TOO_BIG        = 3,
	SPOE_FRM_ERR_INVALID        = 4,
	SPOE_FRM_ERR_NO_VSN         = 5,
	SPOE_FRM_ERR_NO_FRAME_SIZE  = 6,
	SPOE_FRM_ERR_NO_CAP         = 7,
	SPOE_FRM_ERR_BAD_VSN        = 8,
	SPOE_FRM_ERR_BAD_FRAME_SIZE = 9,
	SPOE_FRM_ERR_UNKNOWN        = 99,
	SPOE_FRM_ERRS               = 100,
};
134
/* Scopes used for variables set by agents. It is a way to be agnostic to the
 * internal vars scopes. */
enum spoe_vars_scope {
	SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC */
	SPOE_SCOPE_SESS = 1, /* <=> SCOPE_SESS */
	SPOE_SCOPE_TXN  = 2, /* <=> SCOPE_TXN  */
	SPOE_SCOPE_REQ  = 3, /* <=> SCOPE_REQ  */
	SPOE_SCOPE_RES  = 4, /* <=> SCOPE_RES  */
};
144
145
146/* Describe an argument that will be linked to a message. It is a sample fetch,
147 * with an optional name. */
148struct spoe_arg {
149 char *name; /* Name of the argument, may be NULL */
150 unsigned int name_len; /* The name length, 0 if NULL */
151 struct sample_expr *expr; /* Sample expression */
152 struct list list; /* Used to chain SPOE args */
153};
154
155/* Used during the config parsing only because, when a SPOE agent section is
156 * parsed, messages can be undefined. */
157struct spoe_msg_placeholder {
158 char *id; /* SPOE message placeholder id */
159 struct list list; /* Use to chain SPOE message placeholders */
160};
161
162/* Describe a message that will be sent in a NOTIFY frame. A message has a name,
163 * an argument list (see above) and it is linked to a specific event. */
164struct spoe_message {
165 char *id; /* SPOE message id */
166 unsigned int id_len; /* The message id length */
167 struct spoe_agent *agent; /* SPOE agent owning this SPOE message */
168 struct {
169 char *file; /* file where the SPOE message appears */
170 int line; /* line where the SPOE message appears */
171 } conf; /* config information */
172 struct list args; /* Arguments added when the SPOE messages is sent */
173 struct list list; /* Used to chain SPOE messages */
174
175 enum spoe_event event; /* SPOE_EV_* */
176};
177
178/* Describe a SPOE agent. */
179struct spoe_agent {
180 char *id; /* SPOE agent id (name) */
181 struct {
182 char *file; /* file where the SPOE agent appears */
183 int line; /* line where the SPOE agent appears */
184 } conf; /* config information */
185 union {
186 struct proxy *be; /* Backend used by this agent */
187 char *name; /* Backend name used during conf parsing */
188 } b;
189 struct {
Christopher Fauletf7a30922016-11-10 15:04:51 +0100190 unsigned int hello; /* Max time to receive AGENT-HELLO frame (in SPOE applet) */
191 unsigned int idle; /* Max Idle timeout (in SPOE applet) */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100192 unsigned int processing; /* Max time to process an event (in the main stream) */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200193 } timeout;
194
195 char *var_pfx; /* Prefix used for vars set by the agent */
Christopher Faulet985532d2016-11-16 15:36:19 +0100196 char *var_on_error; /* Variable to set when an error occured, in the TXN scope */
Christopher Fauletea62c2a2016-11-14 10:54:21 +0100197 unsigned int flags; /* SPOE_FL_* */
Christopher Faulet48026722016-11-16 15:01:12 +0100198 unsigned int cps_max; /* Maximum number of connections per second */
199 unsigned int eps_max; /* Maximum number of errors per second */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200200
201 struct list cache; /* List used to cache SPOE streams. In
202 * fact, we cache the SPOE applect ctx */
203
204 struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
205 * for each supported events */
206
207 struct list applet_wq; /* List of streams waiting for a SPOE applet */
Christopher Faulet48026722016-11-16 15:01:12 +0100208 struct freq_ctr conn_per_sec; /* connections per second */
209 struct freq_ctr err_per_sec; /* connetion errors per second */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200210};
211
212/* SPOE filter configuration */
213struct spoe_config {
214 struct proxy *proxy; /* Proxy owning the filter */
215 struct spoe_agent *agent; /* Agent used by this filter */
216 struct proxy agent_fe; /* Agent frontend */
217};
218
219/* SPOE context attached to a stream. It is the main structure that handles the
220 * processing offload */
221struct spoe_context {
222 struct filter *filter; /* The SPOE filter */
223 struct stream *strm; /* The stream that should be offloaded */
224 struct appctx *appctx; /* The SPOE appctx */
225 struct list *messages; /* List of messages that will be sent during the stream processing */
226 struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */
Christopher Fauleta73e59b2016-12-09 17:30:18 +0100227 struct buffer_wait buffer_wait; /* position in the list of streams waiting for a buffer */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200228 struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */
229
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200230 enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
231 unsigned int flags; /* SPOE_CTX_FL_* */
232
233 unsigned int stream_id; /* stream_id and frame_id are used */
234 unsigned int frame_id; /* to map NOTIFY and ACK frames */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100235 unsigned int process_exp; /* expiration date to process an event */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200236};
237
238/* Set if the handle on SIGUSR1 is registered */
239static int sighandler_registered = 0;
240
241/* proxy used during the parsing */
242struct proxy *curproxy = NULL;
243
244/* The name of the SPOE engine, used during the parsing */
245char *curengine = NULL;
246
247/* SPOE agent used during the parsing */
248struct spoe_agent *curagent = NULL;
249
250/* SPOE message used during the parsing */
251struct spoe_message *curmsg = NULL;
252
253/* list of SPOE messages and placeholders used during the parsing */
254struct list curmsgs;
255struct list curmps;
256
257/* Pool used to allocate new SPOE contexts */
258static struct pool_head *pool2_spoe_ctx = NULL;
259
260/* Temporary variables used to ease error processing */
261int spoe_status_code = SPOE_FRM_ERR_NONE;
262char spoe_reason[256];
263
264struct flt_ops spoe_ops;
265
266static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx);
267static void on_new_spoe_appctx_failure(struct spoe_agent *agent);
268static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx);
269
270/********************************************************************
271 * helper functions/globals
272 ********************************************************************/
273static void
274release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp)
275{
276 if (!mp)
277 return;
278 free(mp->id);
279 free(mp);
280}
281
282
283static void
284release_spoe_message(struct spoe_message *msg)
285{
286 struct spoe_arg *arg, *back;
287
288 if (!msg)
289 return;
290 free(msg->id);
291 free(msg->conf.file);
292 list_for_each_entry_safe(arg, back, &msg->args, list) {
293 release_sample_expr(arg->expr);
294 free(arg->name);
295 LIST_DEL(&arg->list);
296 free(arg);
297 }
298 free(msg);
299}
300
301static void
302release_spoe_agent(struct spoe_agent *agent)
303{
304 struct spoe_message *msg, *back;
305 int i;
306
307 if (!agent)
308 return;
309 free(agent->id);
310 free(agent->conf.file);
311 free(agent->var_pfx);
Christopher Faulet985532d2016-11-16 15:36:19 +0100312 free(agent->var_on_error);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200313 for (i = 0; i < SPOE_EV_EVENTS; ++i) {
314 list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
315 LIST_DEL(&msg->list);
316 release_spoe_message(msg);
317 }
318 }
319 free(agent);
320}
321
322static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
323 [SPOE_FRM_ERR_NONE] = "normal",
324 [SPOE_FRM_ERR_IO] = "I/O error",
325 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
326 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
327 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
328 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
329 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
330 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
331 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
332 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
333 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
334};
335
336static const char *spoe_event_str[SPOE_EV_EVENTS] = {
337 [SPOE_EV_ON_CLIENT_SESS] = "on-client-session",
338 [SPOE_EV_ON_TCP_REQ_FE] = "on-frontend-tcp-request",
339 [SPOE_EV_ON_TCP_REQ_BE] = "on-backend-tcp-request",
340 [SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request",
341 [SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request",
342
343 [SPOE_EV_ON_SERVER_SESS] = "on-server-session",
344 [SPOE_EV_ON_TCP_RSP] = "on-tcp-response",
345 [SPOE_EV_ON_HTTP_RSP] = "on-http-response",
346};
347
348
#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)

/* Names of the SPOE context states, only built for debug traces */
static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
	[SPOE_CTX_ST_NONE]         = "NONE",
	[SPOE_CTX_ST_READY]        = "READY",
	[SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
	[SPOE_CTX_ST_WAITING_ACK]  = "WAITING_ACK",
	[SPOE_CTX_ST_DONE]         = "DONE",
	[SPOE_CTX_ST_ERROR]        = "ERROR",
};

/* Names of the SPOE applet states, only built for debug traces */
static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
	[SPOE_APPCTX_ST_CONNECT]       = "CONNECT",
	[SPOE_APPCTX_ST_CONNECTING]    = "CONNECTING",
	[SPOE_APPCTX_ST_PROCESSING]    = "PROCESSING",
	[SPOE_APPCTX_ST_DISCONNECT]    = "DISCONNECT",
	[SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
	[SPOE_APPCTX_ST_EXIT]          = "EXIT",
	[SPOE_APPCTX_ST_END]           = "END",
};

#endif
371/********************************************************************
372 * Functions that encode/decode SPOE frames
373 ********************************************************************/
/* Frame types, as found in the first byte of a frame */
enum spoe_frame_type {
	/* Frames sent by HAProxy */
	SPOE_FRM_T_HAPROXY_HELLO  = 1,
	SPOE_FRM_T_HAPROXY_DISCON = 2,
	SPOE_FRM_T_HAPROXY_NOTIFY = 3,

	/* Frames sent by the agents */
	SPOE_FRM_T_AGENT_HELLO    = 101,
	SPOE_FRM_T_AGENT_DISCON   = 102,
	SPOE_FRM_T_AGENT_ACK      = 103
};
386
/* Types of the typed data found in frames. SPOE_DATA_TYPES is the number of
 * supported types. */
enum spoe_data_type {
	SPOE_DATA_T_NULL   = 0,
	SPOE_DATA_T_BOOL   = 1,
	SPOE_DATA_T_INT32  = 2,
	SPOE_DATA_T_UINT32 = 3,
	SPOE_DATA_T_INT64  = 4,
	SPOE_DATA_T_UINT64 = 5,
	SPOE_DATA_T_IPV4   = 6,
	SPOE_DATA_T_IPV6   = 7,
	SPOE_DATA_T_STR    = 8,
	SPOE_DATA_T_BIN    = 9,
	SPOE_DATA_TYPES    = 10
};
401
/* The type byte of a typed data holds the data type in its low nibble and
 * flags in its high nibble */
#define SPOE_DATA_T_MASK  0x0F
#define SPOE_DATA_FL_MASK 0xF0

/* Flags carrying the value of a Boolean */
#define SPOE_DATA_FL_FALSE 0x00
#define SPOE_DATA_FL_TRUE  0x10

/* Length of a static string, excluding the terminating null byte */
#define SLEN(str) (sizeof(str)-1)

/* Predefined keys used in HELLO/DISCONNECT frames */
#define SUPPORTED_VERSIONS_KEY "supported-versions"
#define VERSION_KEY            "version"
#define MAX_FRAME_SIZE_KEY     "max-frame-size"
#define CAPABILITIES_KEY       "capabilities"
#define HEALTHCHECK_KEY        "healthcheck"
#define STATUS_CODE_KEY        "status-code"
#define MSG_KEY                "message"
421
/* A supported SPOP version: its textual form and the inclusive range of
 * decoded version numbers (see decode_spoe_version) it covers. */
struct spoe_version {
	char *str; /* textual version, NULL on the terminating entry */
	int   min; /* lowest decoded version accepted */
	int   max; /* highest decoded version accepted */
};
427
428/* All supported versions */
429static struct spoe_version supported_versions[] = {
430 {"1.0", 1000, 1000},
431 {NULL, 0, 0}
432};
433
434/* Comma-separated list of supported versions */
435#define SUPPORTED_VERSIONS_VAL "1.0"
436
437/* Comma-separated list of supported capabilities (none for now) */
438#define CAPABILITIES_VAL ""
439
/* Decode the version found in the <len> first bytes of <str>, which does not
 * need to be NUL-terminated. The version is a decimal number, optionally
 * surrounded by white spaces. It returns the version multiplied by 1000 and
 * rounded to the nearest integer (so "1.0" gives 1000), or -1 if the string is
 * empty, too long, malformed, not strictly positive or out of range. */
static int
decode_spoe_version(const char *str, size_t len)
{
	char tmp[64], *start, *end;
	double d;
	int vsn = -1;

	/* <len> comes from the peer: use a bounded copy rather than a VLA */
	if (len >= sizeof(tmp))
		goto out;
	memcpy(tmp, str, len);
	tmp[len] = 0;

	/* <ctype.h> functions must not be given a negative char */
	start = tmp;
	while (isspace((unsigned char)*start))
		start++;

	d = strtod(start, &end);
	if (start == end)
		goto out;

	/* Only white spaces are allowed after the number */
	while (isspace((unsigned char)*end))
		end++;
	if (*end)
		goto out;

	/* Written this way, the test also rejects NaN and infinities, whose
	 * conversion to an integer would be undefined */
	if (!(d > 0.0 && d < 1000000.0))
		goto out;

	/* Round instead of truncating: 1.001 * 1000 is slightly below 1001 */
	vsn = (int)(d * 1000.0 + 0.5);
 out:
	return vsn;
}
468
/* Encode <i> as a variable-length integer into <buf>. Values below 240 use a
 * single byte. Otherwise the first byte holds the 4 low bits (with its high
 * nibble set) and each following byte holds 7 more bits, its high bit telling
 * that another byte follows. This function never fails and returns the number
 * of written bytes. The caller must provide enough room in <buf>. */
static int
encode_spoe_varint(uint64_t i, char *buf)
{
	char *out = buf;

	if (i < 240) {
		*out = (unsigned char)i;
		return 1;
	}

	*out++ = (unsigned char)i | 240;
	i = (i - 240) >> 4;
	while (i >= 128) {
		*out++ = (unsigned char)i | 128;
		i = (i - 128) >> 7;
	}
	*out++ = (unsigned char)i;
	return (int)(out - buf);
}
490
/* Decode a variable-length integer starting at <buf> and store it in <*i>.
 * <end> points just after the last readable byte. On success, the number of
 * read bytes is returned. -1 is returned if the end of the buffer is reached
 * before the end of the integer, or if the encoding is longer than what a
 * 64-bit value needs. */
static int
decode_spoe_varint(const char *buf, const char *end, uint64_t *i)
{
	const unsigned char *msg  = (const unsigned char *)buf;
	const unsigned char *stop = (const unsigned char *)end;
	unsigned int shift = 4;
	int idx = 0;

	/* <stop> itself is not readable */
	if (msg >= stop)
		return -1;

	*i = msg[0];
	if (msg[0] < 240)
		return 1;

	do {
		++idx;
		/* Shifting a 64-bit value by 64 or more is undefined: a valid
		 * encoding never needs it, so such input is rejected */
		if (msg+idx >= stop || shift >= 64)
			return -1;
		*i += (uint64_t)msg[idx] << shift;
		shift += 7;
	} while (msg[idx] >= 128);
	return (idx + 1);
}
516
/* Encode the <len> bytes of <str> into <dst>, prefixed by <len> encoded as a
 * variable-length integer. An empty string is encoded as a single null byte.
 * This function never fails and returns the number of written bytes. The
 * caller must provide enough room in <dst>. */
static int
encode_spoe_string(const char *str, size_t len, char *dst)
{
	int hdr;

	if (len == 0) {
		*dst = 0;
		return 1;
	}

	hdr = encode_spoe_varint(len, dst);
	memcpy(dst + hdr, str, len);
	return (hdr + len);
}
534
/* Decode a string starting at <buf>. <end> points just after the last
 * readable byte. The length is decoded first, as a variable-length integer.
 * If it succeeds and if the whole string fits before <end>, the beginning of
 * the string is saved in <*str>, its length in <*len> and the total number of
 * read bytes is returned. If an error occurred, -1 is returned and <*str>
 * remains NULL. */
static int
decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
{
	char *data;
	int n;

	*str = NULL;
	*len = 0;

	if ((n = decode_spoe_varint(buf, end, len)) == -1)
		return -1;
	data = buf + n;

	/* The length comes from the peer: compare it with the remaining room
	 * instead of adding it to a pointer, which could wrap */
	if (data > end || *len > (uint64_t)(end - data))
		return -1;

	*str = data;
	return (int)(n + *len);
}
560
561/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
562 * of bytes read is returned. A types data is composed of a type (1 byte) and
563 * corresponding data:
564 * - boolean: non additional data (0 bytes)
565 * - integers: a variable-length integer (see decode_spoe_varint)
566 * - ipv4: 4 bytes
567 * - ipv6: 16 bytes
568 * - binary and string: a buffer prefixed by its size, a variable-length
569 * integer (see decode_spoe_string) */
570static int
571skip_spoe_data(char *frame, char *end)
572{
573 uint64_t sz = 0;
574 int i, idx = 0;
575
576 if (frame > end)
577 return -1;
578
579 switch (frame[idx++] & SPOE_DATA_T_MASK) {
580 case SPOE_DATA_T_BOOL:
581 break;
582 case SPOE_DATA_T_INT32:
583 case SPOE_DATA_T_INT64:
584 case SPOE_DATA_T_UINT32:
585 case SPOE_DATA_T_UINT64:
586 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
587 return -1;
588 idx += i;
589 break;
590 case SPOE_DATA_T_IPV4:
591 idx += 4;
592 break;
593 case SPOE_DATA_T_IPV6:
594 idx += 16;
595 break;
596 case SPOE_DATA_T_STR:
597 case SPOE_DATA_T_BIN:
598 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
599 return -1;
600 idx += i + sz;
601 break;
602 }
603
604 if (frame+idx > end)
605 return -1;
606 return idx;
607}
608
609/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
610 * number of read bytes is returned. See skip_spoe_data for details. */
611static int
612decode_spoe_data(char *frame, char *end, struct sample *smp)
613{
614 uint64_t sz = 0;
615 int type, i, idx = 0;
616
617 if (frame > end)
618 return -1;
619
620 type = frame[idx++];
621 switch (type & SPOE_DATA_T_MASK) {
622 case SPOE_DATA_T_BOOL:
623 smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
624 smp->data.type = SMP_T_BOOL;
625 break;
626 case SPOE_DATA_T_INT32:
627 case SPOE_DATA_T_INT64:
628 case SPOE_DATA_T_UINT32:
629 case SPOE_DATA_T_UINT64:
630 if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1)
631 return -1;
632 idx += i;
633 smp->data.type = SMP_T_SINT;
634 break;
635 case SPOE_DATA_T_IPV4:
636 if (frame+idx+4 > end)
637 return -1;
638 memcpy(&smp->data.u.ipv4, frame+idx, 4);
639 smp->data.type = SMP_T_IPV4;
640 idx += 4;
641 break;
642 case SPOE_DATA_T_IPV6:
643 if (frame+idx+16 > end)
644 return -1;
645 memcpy(&smp->data.u.ipv6, frame+idx, 16);
646 smp->data.type = SMP_T_IPV6;
647 idx += 16;
648 break;
649 case SPOE_DATA_T_STR:
650 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
651 return -1;
652 idx += i;
653 if (frame+idx+sz > end)
654 return -1;
655 smp->data.u.str.str = frame+idx;
656 smp->data.u.str.len = sz;
657 smp->data.type = SMP_T_STR;
658 idx += sz;
659 break;
660 case SPOE_DATA_T_BIN:
661 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
662 return -1;
663 idx += i;
664 if (frame+idx+sz > end)
665 return -1;
666 smp->data.u.str.str = frame+idx;
667 smp->data.u.str.len = sz;
668 smp->data.type = SMP_T_BIN;
669 idx += sz;
670 break;
671 }
672
673 if (frame+idx > end)
674 return -1;
675 return idx;
676}
677
/* Skip an action in a frame received from an agent. If an error occurred, -1
 * is returned, otherwise the number of read bytes is returned. An action is
 * composed of the action type (1 byte), a one-byte count of typed data and
 * then that many typed data (see skip_spoe_data). */
static int
skip_spoe_action(char *frame, char *end)
{
	int nbargs, ret, pos;

	if (frame+2 > end)
		return -1;

	/* frame[0] is the action type, which is not needed to skip it */
	nbargs = frame[1];
	pos    = 2;
	for (; nbargs > 0; nbargs--) {
		ret = skip_spoe_data(frame+pos, end);
		if (ret == -1)
			return -1;
		pos += ret;
	}

	if (frame+pos > end)
		return -1;
	return pos;
}
701
702/* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on
703 * success, 0 if the frame can be ignored and -1 if an error occurred. */
704static int
705prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
706{
707 int idx = 0;
708 size_t max = (7 /* TYPE + METADATA */
709 + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
710 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
711 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL));
712
713 if (size < max)
714 return -1;
715
716 /* Frame type */
717 frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
718
719 /* No flags for now */
720 memset(frame+idx, 0, 4);
721 idx += 4;
722
723 /* No stream-id and frame-id for HELLO frames */
724 frame[idx++] = 0;
725 frame[idx++] = 0;
726
727 /* There are 3 mandatory items: "supported-versions", "max-frame-size"
728 * and "capabilities" */
729
730 /* "supported-versions" K/V item */
731 idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx);
732 frame[idx++] = SPOE_DATA_T_STR;
733 idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx);
734
735 /* "max-fram-size" K/V item */
736 idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
737 frame[idx++] = SPOE_DATA_T_UINT32;
738 idx += encode_spoe_varint(APPCTX_SPOE(appctx).max_frame_size, frame+idx);
739
740 /* "capabilities" K/V item */
741 idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
742 frame[idx++] = SPOE_DATA_T_STR;
743 idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
744
745 return idx;
746}
747
748/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame
749 * size on success, 0 if the frame can be ignored and -1 if an error
750 * occurred. */
751static int
752prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
753{
754 const char *reason;
755 int rlen, idx = 0;
756 size_t max = (7 /* TYPE + METADATA */
757 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
758 + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
759
760 if (size < max)
761 return -1;
762
763 /* Get the message corresponding to the status code */
764 if (spoe_status_code >= SPOE_FRM_ERRS)
765 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
766 reason = spoe_frm_err_reasons[spoe_status_code];
767 rlen = strlen(reason);
768
769 /* Frame type */
770 frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
771
772 /* No flags for now */
773 memset(frame+idx, 0, 4);
774 idx += 4;
775
776 /* No stream-id and frame-id for DISCONNECT frames */
777 frame[idx++] = 0;
778 frame[idx++] = 0;
779
780 /* There are 2 mandatory items: "status-code" and "message" */
781
782 /* "status-code" K/V item */
783 idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx);
784 frame[idx++] = SPOE_DATA_T_UINT32;
785 idx += encode_spoe_varint(spoe_status_code, frame+idx);
786
787 /* "message" K/V item */
788 idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx);
789 frame[idx++] = SPOE_DATA_T_STR;
790 idx += encode_spoe_string(reason, rlen, frame+idx);
791
792 return idx;
793}
794
795/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
796 * success, 0 if the frame can be ignored and -1 if an error occurred. */
797static int
798prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
799{
800 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
801 int idx = 0;
802
803 if (size < APPCTX_SPOE(appctx).max_frame_size)
804 return -1;
805
806 frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
807
808 /* No flags for now */
809 memset(frame+idx, 0, 4);
810 idx += 4;
811
812 /* Set stream-id and frame-id */
813 idx += encode_spoe_varint(ctx->stream_id, frame+idx);
814 idx += encode_spoe_varint(ctx->frame_id, frame+idx);
815
816 /* Copy encoded messages */
817 memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
818 idx += ctx->buffer->i;
819
820 return idx;
821}
822
823/* Decode HELLO frame sent by an agent. It returns the number of by read bytes
824 * on success, 0 if the frame can be ignored and -1 if an error occurred. */
825static int
826handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
827{
828 int vsn, max_frame_size;
829 int i, idx = 0;
830 size_t min_size = (7 /* TYPE + METADATA */
831 + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
832 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
833 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
834
835 /* Check frame type */
836 if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
837 return 0;
838
839 if (size < min_size) {
840 spoe_status_code = SPOE_FRM_ERR_INVALID;
841 return -1;
842 }
843
844 /* Skip flags: fragmentation is not supported for now */
845 idx += 4;
846
847 /* stream-id and frame-id must be cleared */
848 if (frame[idx] != 0 || frame[idx+1] != 0) {
849 spoe_status_code = SPOE_FRM_ERR_INVALID;
850 return -1;
851 }
852 idx += 2;
853
854 /* There are 3 mandatory items: "version", "max-frame-size" and
855 * "capabilities" */
856
857 /* Loop on K/V items */
858 vsn = max_frame_size = 0;
859 while (idx < size) {
860 char *str;
861 uint64_t sz;
862
863 /* Decode the item key */
864 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
865 if (str == NULL) {
866 spoe_status_code = SPOE_FRM_ERR_INVALID;
867 return -1;
868 }
869 /* Check "version" K/V item */
870 if (!memcmp(str, VERSION_KEY, sz)) {
871 /* The value must be a string */
872 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
873 spoe_status_code = SPOE_FRM_ERR_INVALID;
874 return -1;
875 }
876 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
877 if (str == NULL) {
878 spoe_status_code = SPOE_FRM_ERR_INVALID;
879 return -1;
880 }
881
882 vsn = decode_spoe_version(str, sz);
883 if (vsn == -1) {
884 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
885 return -1;
886 }
887 for (i = 0; supported_versions[i].str != NULL; ++i) {
888 if (vsn >= supported_versions[i].min &&
889 vsn <= supported_versions[i].max)
890 break;
891 }
892 if (supported_versions[i].str == NULL) {
893 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
894 return -1;
895 }
896 }
897 /* Check "max-frame-size" K/V item */
898 else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
899 int type;
900
901 /* The value must be integer */
902 type = frame[idx++];
903 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
904 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
905 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
906 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
907 spoe_status_code = SPOE_FRM_ERR_INVALID;
908 return -1;
909 }
910 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
911 spoe_status_code = SPOE_FRM_ERR_INVALID;
912 return -1;
913 }
914 idx += i;
915 if (sz < MIN_FRAME_SIZE || sz > APPCTX_SPOE(appctx).max_frame_size) {
916 spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
917 return -1;
918 }
919 max_frame_size = sz;
920 }
921 /* Skip "capabilities" K/V item for now */
922 else {
923 /* Silently ignore unknown item */
924 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
925 spoe_status_code = SPOE_FRM_ERR_INVALID;
926 return -1;
927 }
928 idx += i;
929 }
930 }
931
932 /* Final checks */
933 if (!vsn) {
934 spoe_status_code = SPOE_FRM_ERR_NO_VSN;
935 return -1;
936 }
937 if (!max_frame_size) {
938 spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
939 return -1;
940 }
941
942 APPCTX_SPOE(appctx).version = (unsigned int)vsn;
943 APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
944 return idx;
945}
946
947/* Decode DISCONNECT frame sent by an agent. It returns the number of by read
948 * bytes on success, 0 if the frame can be ignored and -1 if an error
949 * occurred. */
950static int
951handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
952{
953 int i, idx = 0;
954 size_t min_size = (7 /* TYPE + METADATA */
955 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
956 + 1 + SLEN(MSG_KEY) + 1 + 1);
957
958 /* Check frame type */
959 if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
960 return 0;
961
962 if (size < min_size) {
963 spoe_status_code = SPOE_FRM_ERR_INVALID;
964 return -1;
965 }
966
967 /* Skip flags: fragmentation is not supported for now */
968 idx += 4;
969
970 /* stream-id and frame-id must be cleared */
971 if (frame[idx] != 0 || frame[idx+1] != 0) {
972 spoe_status_code = SPOE_FRM_ERR_INVALID;
973 return -1;
974 }
975 idx += 2;
976
977 /* There are 2 mandatory items: "status-code" and "message" */
978
979 /* Loop on K/V items */
980 while (idx < size) {
981 char *str;
982 uint64_t sz;
983
984 /* Decode the item key */
985 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
986 if (str == NULL) {
987 spoe_status_code = SPOE_FRM_ERR_INVALID;
988 return -1;
989 }
990
991 /* Check "status-code" K/V item */
992 if (!memcmp(str, STATUS_CODE_KEY, sz)) {
993 int type;
994
995 /* The value must be an integer */
996 type = frame[idx++];
997 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
998 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
999 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
1000 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
1001 spoe_status_code = SPOE_FRM_ERR_INVALID;
1002 return -1;
1003 }
1004 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
1005 spoe_status_code = SPOE_FRM_ERR_INVALID;
1006 return -1;
1007 }
1008 idx += i;
1009 spoe_status_code = sz;
1010 }
1011
1012 /* Check "message" K/V item */
1013 else if (sz && !memcmp(str, MSG_KEY, sz)) {
1014 /* The value must be a string */
1015 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1016 spoe_status_code = SPOE_FRM_ERR_INVALID;
1017 return -1;
1018 }
1019 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1020 if (str == NULL || sz > 255) {
1021 spoe_status_code = SPOE_FRM_ERR_INVALID;
1022 return -1;
1023 }
1024 memcpy(spoe_reason, str, sz);
1025 spoe_reason[sz] = 0;
1026 }
1027 else {
1028 /* Silently ignore unknown item */
1029 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1030 spoe_status_code = SPOE_FRM_ERR_INVALID;
1031 return -1;
1032 }
1033 idx += i;
1034 }
1035 }
1036
1037 return idx;
1038}
1039
1040
1041/* Decode ACK frame sent by an agent. It returns the number of by read bytes on
1042 * success, 0 if the frame can be ignored and -1 if an error occurred. */
1043static int
1044handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
1045{
1046 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1047 uint64_t stream_id, frame_id;
1048 int idx = 0;
1049 size_t min_size = (7 /* TYPE + METADATA */);
1050
1051 /* Check frame type */
1052 if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
1053 return 0;
1054
1055 if (size < min_size) {
1056 spoe_status_code = SPOE_FRM_ERR_INVALID;
1057 return -1;
1058 }
1059
1060 /* Skip flags: fragmentation is not supported for now */
1061 idx += 4;
1062
1063 /* Get the stream-id and the frame-id */
1064 idx += decode_spoe_varint(frame+idx, frame+size, &stream_id);
1065 idx += decode_spoe_varint(frame+idx, frame+size, &frame_id);
1066
1067 /* Check stream-id and frame-id */
1068 if (ctx->stream_id != (unsigned int)stream_id ||
1069 ctx->frame_id != (unsigned int)frame_id)
1070 return 0;
1071
1072 /* Copy encoded actions */
1073 b_reset(ctx->buffer);
1074 memcpy(ctx->buffer->p, frame+idx, size-idx);
1075 ctx->buffer->i = size-idx;
1076
1077 return idx;
1078}
1079
Christopher Fauletba7bc162016-11-07 21:07:38 +01001080/* This function is used in cfgparse.c and declared in proto/checks.h. It
1081 * prepare the request to send to agents during a healthcheck. It returns 0 on
1082 * success and -1 if an error occurred. */
1083int
1084prepare_spoe_healthcheck_request(char **req, int *len)
1085{
1086 struct appctx a;
1087 char *frame, buf[global.tune.bufsize];
1088 unsigned int framesz;
1089 int idx;
1090
1091 memset(&a, 0, sizeof(a));
1092 memset(buf, 0, sizeof(buf));
1093 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1094
1095 frame = buf+4;
1096 idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4);
1097 if (idx <= 0)
1098 return -1;
1099 if (idx + SLEN(HEALTHCHECK_KEY) + 1 > global.tune.bufsize-4)
1100 return -1;
1101
1102 /* "healthcheck" K/V item */
1103 idx += encode_spoe_string(HEALTHCHECK_KEY, SLEN(HEALTHCHECK_KEY), frame+idx);
1104 frame[idx++] = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE);
1105
1106 framesz = htonl(idx);
1107 memcpy(buf, (char *)&framesz, 4);
1108
1109 if ((*req = malloc(idx+4)) == NULL)
1110 return -1;
1111 memcpy(*req, buf, idx+4);
1112 *len = idx+4;
1113 return 0;
1114}
1115
1116/* This function is used in checks.c and declared in proto/checks.h. It decode
1117 * the response received from an agent during a healthcheck. It returns 0 on
1118 * success and -1 if an error occurred. */
1119int
1120handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen)
1121{
1122 struct appctx a;
1123 int r;
1124
1125 memset(&a, 0, sizeof(a));
1126 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1127
1128 if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0)
1129 goto error;
1130 if ((r = handle_spoe_agenthello_frame(&a, frame, size)) <= 0) {
1131 if (r == 0)
1132 spoe_status_code = SPOE_FRM_ERR_INVALID;
1133 goto error;
1134 }
1135
1136 return 0;
1137
1138 error:
1139 if (spoe_status_code >= SPOE_FRM_ERRS)
1140 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
1141 strncpy(err, spoe_frm_err_reasons[spoe_status_code], errlen);
1142 return -1;
1143}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001144
1145/********************************************************************
1146 * Functions that manage the SPOE applet
1147 ********************************************************************/
1148/* Callback function that catches applet timeouts. If a timeout occurred, we set
1149 * <appctx->st1> flag and the SPOE applet is woken up. */
1150static struct task *
1151process_spoe_applet(struct task * task)
1152{
1153 struct appctx *appctx = task->context;
1154
1155 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1156 if (tick_is_expired(task->expire, now_ms)) {
1157 task->expire = TICK_ETERNITY;
1158 appctx->st1 = SPOE_APPCTX_ERR_TOUT;
1159 }
1160 si_applet_want_get(appctx->owner);
1161 appctx_wakeup(appctx);
1162 return task;
1163}
1164
1165/* Remove a SPOE applet from the agent cache */
1166static void
1167remove_spoe_applet_from_cache(struct appctx *appctx)
1168{
1169 struct appctx *a, *back;
1170 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1171
1172 if (LIST_ISEMPTY(&agent->cache))
1173 return;
1174
1175 list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) {
1176 if (a == appctx) {
1177 LIST_DEL(&APPCTX_SPOE(appctx).list);
1178 break;
1179 }
1180 }
1181}
1182
1183
1184/* Callback function that releases a SPOE applet. This happens when the
1185 * connection with the agent is closed. */
1186static void
1187release_spoe_applet(struct appctx *appctx)
1188{
1189 struct stream_interface *si = appctx->owner;
1190 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1191 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1192
1193 if (appctx->st0 == SPOE_APPCTX_ST_CONNECT ||
1194 appctx->st0 == SPOE_APPCTX_ST_CONNECTING)
1195 on_new_spoe_appctx_failure(agent);
1196
1197 if (appctx->st0 != SPOE_APPCTX_ST_END) {
1198 si_shutw(si);
1199 si_shutr(si);
1200 si_ic(si)->flags |= CF_READ_NULL;
1201 appctx->st0 = SPOE_APPCTX_ST_END;
1202 }
1203
1204 if (ctx != NULL) {
1205 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1206 ctx->appctx = NULL;
1207 }
1208
1209 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
1210 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1211 __FUNCTION__, appctx);
1212
1213 /* Release the task attached to the SPOE applet */
1214 if (APPCTX_SPOE(appctx).task) {
1215 task_delete(APPCTX_SPOE(appctx).task);
1216 task_free(APPCTX_SPOE(appctx).task);
1217 }
1218
1219 /* And remove it from the agent cache */
1220 remove_spoe_applet_from_cache(appctx);
1221 APPCTX_SPOE(appctx).ctx = NULL;
1222}
1223
1224/* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when
1225 * the frame can be ignored, 0 to retry later and 1 on success. The frame is
1226 * encoded using the callback function <prepare>. */
1227static int
1228send_spoe_frame(struct appctx *appctx,
1229 int (*prepare)(struct appctx *, char *, size_t))
1230{
1231 struct stream_interface *si = appctx->owner;
1232 int framesz, ret;
1233 uint32_t netint;
1234
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001235 if (si_ic(si)->buf->size == 0)
1236 return -1;
1237
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001238 ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
1239 if (ret <= 0)
1240 goto skip_or_error;
1241 framesz = ret;
1242 netint = htonl(framesz);
1243 ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint));
1244 if (ret > 0)
1245 ret = bi_putblk(si_ic(si), trash.str, framesz);
1246 if (ret <= 0) {
1247 if (ret == -1)
1248 return -1;
1249 return -2;
1250 }
1251 return 1;
1252
1253 skip_or_error:
1254 if (!ret)
1255 return -1;
1256 return -2;
1257}
1258
1259/* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1
1260 * when the frame can be ignored, 0 to retry later and 1 on success. The frame
1261 * is decoded using the callback function <handle>. */
1262static int
1263recv_spoe_frame(struct appctx *appctx,
1264 int (*handle)(struct appctx *, char *, size_t))
1265{
1266 struct stream_interface *si = appctx->owner;
1267 int framesz, ret;
1268 uint32_t netint;
1269
1270 ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0);
1271 if (ret <= 0)
1272 goto empty_or_error;
1273 framesz = ntohl(netint);
1274 if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
1275 spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
1276 return -2;
1277 }
1278
1279 ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint));
1280 if (ret <= 0)
1281 goto empty_or_error;
1282 bo_skip(si_oc(si), ret+sizeof(netint));
1283
1284 /* First check if the received frame is a DISCONNECT frame */
1285 ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz);
1286 if (ret != 0) {
1287 if (ret > 0) {
1288 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1289 " - disconnected by peer (%d): %s\n",
1290 (int)now.tv_sec, (int)now.tv_usec,
1291 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1292 __FUNCTION__, appctx, spoe_status_code,
1293 spoe_reason);
1294 return 2;
1295 }
1296 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1297 " - error on frame (%s)\n",
1298 (int)now.tv_sec, (int)now.tv_usec,
1299 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1300 __FUNCTION__, appctx,
1301 spoe_frm_err_reasons[spoe_status_code]);
1302 return -2;
1303 }
1304 if (handle == NULL)
1305 goto out;
1306
1307 /* If not, try to decode it */
1308 ret = handle(appctx, trash.str, framesz);
1309 if (ret <= 0) {
1310 if (!ret)
1311 return -1;
1312 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1313 " - error on frame (%s)\n",
1314 (int)now.tv_sec, (int)now.tv_usec,
1315 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1316 __FUNCTION__, appctx,
1317 spoe_frm_err_reasons[spoe_status_code]);
1318 return -2;
1319 }
1320 out:
1321 return 1;
1322
1323 empty_or_error:
1324 if (!ret)
1325 return 0;
1326 spoe_status_code = SPOE_FRM_ERR_IO;
1327 return -2;
1328}
1329
1330/* I/O Handler processing messages exchanged with the agent */
1331static void
1332handle_spoe_applet(struct appctx *appctx)
1333{
1334 struct stream_interface *si = appctx->owner;
1335 struct stream *s = si_strm(si);
1336 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1337 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1338 int ret;
1339
1340 switchstate:
1341 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1342 " - appctx-state=%s\n",
1343 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1344 __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);
1345
1346 switch (appctx->st0) {
1347 case SPOE_APPCTX_ST_CONNECT:
1348 spoe_status_code = SPOE_FRM_ERR_NONE;
1349 if (si->state <= SI_ST_CON) {
1350 si_applet_want_put(si);
1351 task_wakeup(s->task, TASK_WOKEN_MSG);
1352 break;
1353 }
1354 else if (si->state != SI_ST_EST) {
1355 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1356 on_new_spoe_appctx_failure(agent);
1357 goto switchstate;
1358 }
1359 ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame);
1360 if (ret < 0) {
1361 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1362 on_new_spoe_appctx_failure(agent);
1363 goto switchstate;
1364 }
1365 else if (!ret)
1366 goto full;
1367
1368 /* Hello frame was sent. Set the hello timeout and
1369 * wait for the reply. */
1370 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
1371 appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
1372 /* fall through */
1373
1374 case SPOE_APPCTX_ST_CONNECTING:
1375 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1376 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1377 on_new_spoe_appctx_failure(agent);
1378 goto switchstate;
1379 }
1380 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1381 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1382 " - Connection timed out\n",
1383 (int)now.tv_sec, (int)now.tv_usec,
1384 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1385 __FUNCTION__, appctx);
1386 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1387 on_new_spoe_appctx_failure(agent);
1388 goto switchstate;
1389 }
1390 ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame);
1391 if (ret < 0) {
1392 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1393 on_new_spoe_appctx_failure(agent);
1394 goto switchstate;
1395 }
1396 if (ret == 2) {
1397 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1398 on_new_spoe_appctx_failure(agent);
1399 goto switchstate;
1400 }
1401 if (!ret)
1402 goto out;
1403
1404 /* hello handshake is finished, set the idle timeout,
1405 * Add the appctx in the agent cache, decrease the
1406 * number of new applets and wake up waiting streams. */
1407 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1408 appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
1409 on_new_spoe_appctx_success(agent, appctx);
1410 break;
1411
1412 case SPOE_APPCTX_ST_PROCESSING:
1413 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1414 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1415 goto switchstate;
1416 }
1417 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1418 spoe_status_code = SPOE_FRM_ERR_TOUT;
1419 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1420 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1421 goto switchstate;
1422 }
1423 if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
1424 ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame);
1425 if (ret < 0) {
1426 if (ret == -1) {
1427 ctx->state = SPOE_CTX_ST_ERROR;
1428 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1429 goto skip_notify_frame;
1430 }
1431 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1432 goto switchstate;
1433 }
1434 else if (!ret)
1435 goto full;
1436 ctx->state = SPOE_CTX_ST_WAITING_ACK;
Christopher Faulet03a34492016-11-19 16:47:56 +01001437 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001438 }
1439
1440 skip_notify_frame:
1441 if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) {
1442 ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame);
1443 if (ret < 0) {
1444 if (ret == -1)
1445 goto skip_notify_frame;
1446 ctx->state = SPOE_CTX_ST_ERROR;
1447 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1448 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1449 goto switchstate;
1450 }
1451 if (!ret)
1452 goto out;
1453 if (ret == 2) {
1454 ctx->state = SPOE_CTX_ST_ERROR;
1455 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1456 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1457 goto switchstate;
1458 }
1459 ctx->state = SPOE_CTX_ST_DONE;
1460 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1461 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1462 }
1463 else {
1464 if (stopping) {
1465 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1466 goto switchstate;
1467 }
1468
1469 ret = recv_spoe_frame(appctx, NULL);
1470 if (ret < 0) {
1471 if (ret == -1)
1472 goto skip_notify_frame;
1473 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1474 goto switchstate;
1475 }
1476 if (!ret)
1477 goto out;
1478 if (ret == 2) {
1479 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1480 goto switchstate;
1481 }
1482 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1483 }
1484 break;
1485
1486 case SPOE_APPCTX_ST_DISCONNECT:
1487 ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame);
1488 if (ret < 0) {
1489 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1490 goto switchstate;
1491 }
1492 else if (!ret)
1493 goto full;
1494 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1495 " - disconnected by HAProxy (%d): %s\n",
1496 (int)now.tv_sec, (int)now.tv_usec,
1497 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1498 __FUNCTION__, appctx, spoe_status_code,
1499 spoe_frm_err_reasons[spoe_status_code]);
1500
Christopher Faulet03a34492016-11-19 16:47:56 +01001501 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001502 appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
1503 /* fall through */
1504
1505 case SPOE_APPCTX_ST_DISCONNECTING:
1506 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1507 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1508 goto switchstate;
1509 }
1510 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1511 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1512 goto switchstate;
1513 }
1514 ret = recv_spoe_frame(appctx, NULL);
1515 if (ret < 0 || ret == 2) {
1516 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1517 goto switchstate;
1518 }
1519 break;
1520
1521 case SPOE_APPCTX_ST_EXIT:
1522 si_shutw(si);
1523 si_shutr(si);
1524 si_ic(si)->flags |= CF_READ_NULL;
1525 appctx->st0 = SPOE_APPCTX_ST_END;
1526 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1527 /* fall through */
1528
1529 case SPOE_APPCTX_ST_END:
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001530 return;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001531 }
1532
1533 out:
1534 if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
1535 task_queue(APPCTX_SPOE(appctx).task);
1536 si_oc(si)->flags |= CF_READ_DONTWAIT;
1537 task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
1538 return;
1539 full:
1540 si_applet_cant_put(si);
1541 goto out;
1542}
1543
1544struct applet spoe_applet = {
1545 .obj_type = OBJ_TYPE_APPLET,
1546 .name = "<SPOE>", /* used for logging */
1547 .fct = handle_spoe_applet,
1548 .release = release_spoe_applet,
1549};
1550
1551/* Create a SPOE applet. On success, the created applet is returned, else
1552 * NULL. */
1553static struct appctx *
1554create_spoe_appctx(struct spoe_config *conf)
1555{
1556 struct appctx *appctx;
1557 struct session *sess;
1558 struct task *task;
1559 struct stream *strm;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001560
1561 if ((appctx = appctx_new(&spoe_applet)) == NULL)
1562 goto out_error;
1563
1564 appctx->st0 = SPOE_APPCTX_ST_CONNECT;
1565 if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
1566 goto out_free_appctx;
1567 APPCTX_SPOE(appctx).task->process = process_spoe_applet;
1568 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1569 APPCTX_SPOE(appctx).task->context = appctx;
1570 APPCTX_SPOE(appctx).agent = conf->agent;
1571 APPCTX_SPOE(appctx).ctx = NULL;
1572 APPCTX_SPOE(appctx).version = 0;
1573 APPCTX_SPOE(appctx).max_frame_size = global.tune.bufsize;
1574 task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
1575
Willy Tarreau5820a362016-12-22 15:59:02 +01001576 sess = session_new(&conf->agent_fe, NULL, &appctx->obj_type);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001577 if (!sess)
1578 goto out_free_spoe;
1579
1580 if ((task = task_new()) == NULL)
1581 goto out_free_sess;
1582
1583 if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
1584 goto out_free_task;
1585
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001586 stream_set_backend(strm, conf->agent->b.be);
1587
1588 /* applet is waiting for data */
1589 si_applet_cant_get(&strm->si[0]);
1590 appctx_wakeup(appctx);
1591
Christopher Faulet48026722016-11-16 15:01:12 +01001592 /* Increase the per-process number of cumulated connections */
1593 if (conf->agent->cps_max > 0)
1594 update_freq_ctr(&conf->agent->conn_per_sec, 1);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001595
1596 strm->do_log = NULL;
1597 strm->res.flags |= CF_READ_DONTWAIT;
1598
1599 conf->agent_fe.feconn++;
1600 jobs++;
1601 totalconn++;
1602
1603 return appctx;
1604
1605 /* Error unrolling */
1606 out_free_task:
1607 task_free(task);
1608 out_free_sess:
1609 session_free(sess);
1610 out_free_spoe:
1611 task_free(APPCTX_SPOE(appctx).task);
1612 out_free_appctx:
1613 appctx_free(appctx);
1614 out_error:
1615 return NULL;
1616}
1617
1618/* Wake up a SPOE applet attached to a SPOE context. */
1619static void
1620wakeup_spoe_appctx(struct spoe_context *ctx)
1621{
1622 if (ctx->appctx == NULL)
1623 return;
1624 if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) {
1625 si_applet_want_get(ctx->appctx->owner);
1626 si_applet_want_put(ctx->appctx->owner);
1627 appctx_wakeup(ctx->appctx);
1628 }
1629}
1630
1631
1632/* Run across the list of pending streams waiting for a SPOE applet and wake the
1633 * first. */
1634static void
1635offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx)
1636{
1637 struct spoe_context *ctx;
1638
Christopher Fauletf7a30922016-11-10 15:04:51 +01001639 if (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING)
1640 return;
1641
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001642 if (LIST_ISEMPTY(&agent->applet_wq))
1643 LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
1644 else {
1645 ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait);
1646 APPCTX_SPOE(appctx).ctx = ctx;
1647 ctx->appctx = appctx;
1648 LIST_DEL(&ctx->applet_wait);
1649 LIST_INIT(&ctx->applet_wait);
1650 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1651 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1652 " - wake up stream to get available SPOE applet\n",
1653 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1654 __FUNCTION__, ctx->strm);
1655 }
1656}
1657
1658/* A failure occurred during SPOE applet creation. */
1659static void
1660on_new_spoe_appctx_failure(struct spoe_agent *agent)
1661{
1662 struct spoe_context *ctx;
1663
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001664 list_for_each_entry(ctx, &agent->applet_wq, applet_wait) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001665 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1666 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1667 " - wake up stream because to SPOE applet connection failed\n",
1668 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1669 __FUNCTION__, ctx->strm);
1670 }
1671}
1672
/* A SPOE applet was successfully created and its HELLO handshake is finished:
 * offer it to the streams waiting for an applet (see offer_spoe_appctx()). */
static void
on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx)
{
	offer_spoe_appctx(agent, appctx);
}
1678/* Retrieve a SPOE applet from the agent cache if possible, else create it. It
1679 * returns 1 on success, 0 to retry later and -1 if an error occurred. */
1680static int
1681acquire_spoe_appctx(struct spoe_context *ctx, int dir)
1682{
1683 struct spoe_config *conf = FLT_CONF(ctx->filter);
1684 struct spoe_agent *agent = conf->agent;
1685 struct appctx *appctx;
1686
1687 /* If a process is already started for this SPOE context, retry
1688 * later. */
1689 if (ctx->flags & SPOE_CTX_FL_PROCESS)
1690 goto wait;
1691
1692 /* If needed, initialize the buffer that will be used to encode messages
1693 * and decode actions. */
1694 if (ctx->buffer == &buf_empty) {
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001695 if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
1696 LIST_DEL(&ctx->buffer_wait.list);
1697 LIST_INIT(&ctx->buffer_wait.list);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001698 }
1699
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001700 if (!b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs)) {
1701 LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001702 goto wait;
1703 }
1704 }
1705
1706 /* If the SPOE applet was already set, all is done. */
1707 if (ctx->appctx)
1708 goto success;
1709
1710 /* Else try to retrieve it from the agent cache */
1711 if (!LIST_ISEMPTY(&agent->cache)) {
1712 appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list);
1713 LIST_DEL(&APPCTX_SPOE(appctx).list);
1714 APPCTX_SPOE(appctx).ctx = ctx;
1715 ctx->appctx = appctx;
1716 goto success;
1717 }
1718
Christopher Faulet48026722016-11-16 15:01:12 +01001719 /* If there is no server up for the agent's backend, this is an
1720 * error. */
1721 if (!agent->b.be->srv_act && !agent->b.be->srv_bck)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001722 goto error;
1723
1724 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1725 " - waiting for available SPOE appctx\n",
1726 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
1727 ctx->strm);
1728
1729 /* Else add the stream in the waiting queue. */
1730 if (LIST_ISEMPTY(&ctx->applet_wait))
1731 LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait);
1732
1733 /* Finally, create new SPOE applet if we can */
Christopher Faulet48026722016-11-16 15:01:12 +01001734 if (agent->cps_max > 0) {
1735 if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0))
1736 goto wait;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001737 }
Christopher Faulet48026722016-11-16 15:01:12 +01001738 if (create_spoe_appctx(conf) == NULL)
1739 goto error;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001740
1741 wait:
1742 return 0;
1743
1744 success:
1745 /* Remove the stream from the waiting queue */
1746 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1747 LIST_DEL(&ctx->applet_wait);
1748 LIST_INIT(&ctx->applet_wait);
1749 }
1750
1751 /* Set the right flag to prevent request and response processing
1752 * in same time. */
1753 ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
1754 ? SPOE_CTX_FL_REQ_PROCESS
1755 : SPOE_CTX_FL_RSP_PROCESS);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001756
1757 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1758 " - acquire SPOE appctx %p from cache\n",
1759 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1760 __FUNCTION__, ctx->strm, ctx->appctx);
1761 return 1;
1762
1763 error:
1764 /* Remove the stream from the waiting queue */
1765 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1766 LIST_DEL(&ctx->applet_wait);
1767 LIST_INIT(&ctx->applet_wait);
1768 }
1769
1770 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
Christopher Faulet48026722016-11-16 15:01:12 +01001771 " - failed to acquire SPOE appctx\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001772 (int)now.tv_sec, (int)now.tv_usec, agent->id,
Christopher Faulet48026722016-11-16 15:01:12 +01001773 __FUNCTION__, ctx->strm);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001774 send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n");
1775
1776 return -1;
1777}
1778
1779/* Release a SPOE applet and push it in the agent cache. */
1780static void
1781release_spoe_appctx(struct spoe_context *ctx)
1782{
1783 struct spoe_config *conf = FLT_CONF(ctx->filter);
1784 struct spoe_agent *agent = conf->agent;
1785 struct appctx *appctx = ctx->appctx;
1786
1787 /* Reset the flag to allow next processing */
1788 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
1789
Christopher Fauletf7a30922016-11-10 15:04:51 +01001790 /* Reset processing timer */
1791 ctx->process_exp = TICK_ETERNITY;
1792
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001793 /* Release the buffer if needed */
1794 if (ctx->buffer != &buf_empty) {
1795 b_free(&ctx->buffer);
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001796 offer_buffers(ctx, tasks_run_queue + applets_active_queue);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001797 }
1798
1799 /* If there is no SPOE applet, all is done */
1800 if (!appctx)
1801 return;
1802
1803 /* Else, reassign it or push it in the agent cache */
1804 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1805 " - release SPOE appctx %p\n",
1806 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1807 __FUNCTION__, ctx->strm, appctx);
1808
1809 APPCTX_SPOE(appctx).ctx = NULL;
1810 ctx->appctx = NULL;
1811 offer_spoe_appctx(agent, appctx);
1812}
1813
1814/***************************************************************************
1815 * Functions that process SPOE messages and actions
1816 **************************************************************************/
1817/* Process SPOE messages for a specific event. During the processing, it returns
1818 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1819 * is returned. */
1820static int
1821process_spoe_messages(struct stream *s, struct spoe_context *ctx,
1822 struct list *messages, int dir)
1823{
1824 struct spoe_message *msg;
1825 struct sample *smp;
1826 struct spoe_arg *arg;
1827 char *p;
1828 size_t max_size;
1829 int off, flag, idx = 0;
1830
1831 /* Reserve 32 bytes from the frame Metadata */
1832 max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32;
1833
1834 b_reset(ctx->buffer);
1835 p = ctx->buffer->p;
1836
1837 /* Loop on messages */
1838 list_for_each_entry(msg, messages, list) {
1839 if (idx + msg->id_len + 1 > max_size)
1840 goto skip;
1841
1842 /* Set the message name */
1843 idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
1844
1845 /* Save offset where to store the number of arguments for this
1846 * message */
1847 off = idx++;
1848 p[off] = 0;
1849
1850 /* Loop on arguments */
1851 list_for_each_entry(arg, &msg->args, list) {
1852 p[off]++; /* Increment the number of arguments */
1853
1854 if (idx + arg->name_len + 1 > max_size)
1855 goto skip;
1856
1857 /* Encode the arguement name as a string. It can by NULL */
1858 idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
1859
1860 /* Fetch the arguement value */
1861 smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
1862 if (!smp) {
1863 /* If no value is available, set it to NULL */
1864 p[idx++] = SPOE_DATA_T_NULL;
1865 continue;
1866 }
1867
1868 /* Else, encode the arguement value */
1869 switch (smp->data.type) {
1870 case SMP_T_BOOL:
1871 flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
1872 p[idx++] = (SPOE_DATA_T_BOOL | flag);
1873 break;
1874 case SMP_T_SINT:
1875 p[idx++] = SPOE_DATA_T_INT64;
1876 if (idx + 8 > max_size)
1877 goto skip;
1878 idx += encode_spoe_varint(smp->data.u.sint, p+idx);
1879 break;
1880 case SMP_T_IPV4:
1881 p[idx++] = SPOE_DATA_T_IPV4;
1882 if (idx + 4 > max_size)
1883 goto skip;
1884 memcpy(p+idx, &smp->data.u.ipv4, 4);
1885 idx += 4;
1886 break;
1887 case SMP_T_IPV6:
1888 p[idx++] = SPOE_DATA_T_IPV6;
1889 if (idx + 16 > max_size)
1890 goto skip;
1891 memcpy(p+idx, &smp->data.u.ipv6, 16);
1892 idx += 16;
1893 break;
1894 case SMP_T_STR:
1895 p[idx++] = SPOE_DATA_T_STR;
1896 if (idx + smp->data.u.str.len > max_size)
1897 goto skip;
1898 idx += encode_spoe_string(smp->data.u.str.str,
1899 smp->data.u.str.len,
1900 p+idx);
1901 break;
1902 case SMP_T_BIN:
1903 p[idx++] = SPOE_DATA_T_BIN;
1904 if (idx + smp->data.u.str.len > max_size)
1905 goto skip;
1906 idx += encode_spoe_string(smp->data.u.str.str,
1907 smp->data.u.str.len,
1908 p+idx);
1909 break;
1910 case SMP_T_METH:
1911 if (smp->data.u.meth.meth == HTTP_METH_OTHER) {
1912 p[idx++] = SPOE_DATA_T_STR;
1913 if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size)
1914 goto skip;
1915 idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name,
1916 http_known_methods[smp->data.u.meth.meth].len,
1917 p+idx);
1918 }
1919 else {
1920 p[idx++] = SPOE_DATA_T_STR;
1921 if (idx + smp->data.u.str.len > max_size)
1922 goto skip;
1923 idx += encode_spoe_string(smp->data.u.meth.str.str,
1924 smp->data.u.meth.str.len,
1925 p+idx);
1926 }
1927 break;
1928 default:
1929 p[idx++] = SPOE_DATA_T_NULL;
1930 }
1931 }
1932 }
1933 ctx->buffer->i = idx;
1934 return 1;
1935
1936 skip:
1937 b_reset(ctx->buffer);
1938 return 0;
1939}
1940
1941/* Helper function to set a variable */
1942static void
1943set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1944 struct sample *smp)
1945{
1946 struct spoe_config *conf = FLT_CONF(ctx->filter);
1947 struct spoe_agent *agent = conf->agent;
1948 char varname[64];
1949
1950 memset(varname, 0, sizeof(varname));
1951 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1952 scope, agent->var_pfx, len, name);
1953 vars_set_by_name_ifexist(varname, len, smp);
1954}
1955
1956/* Helper function to unset a variable */
1957static void
1958unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1959 struct sample *smp)
1960{
1961 struct spoe_config *conf = FLT_CONF(ctx->filter);
1962 struct spoe_agent *agent = conf->agent;
1963 char varname[64];
1964
1965 memset(varname, 0, sizeof(varname));
1966 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1967 scope, agent->var_pfx, len, name);
1968 vars_unset_by_name_ifexist(varname, len, smp);
1969}
1970
1971
1972/* Process SPOE actions for a specific event. During the processing, it returns
1973 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1974 * is returned. */
1975static int
1976process_spoe_actions(struct stream *s, struct spoe_context *ctx,
1977 enum spoe_event ev, int dir)
1978{
1979 char *p;
1980 size_t size;
1981 int off, i, idx = 0;
1982
1983 p = ctx->buffer->p;
1984 size = ctx->buffer->i;
1985
1986 while (idx < size) {
1987 char *str;
1988 uint64_t sz;
1989 struct sample smp;
1990 enum spoe_action_type type;
1991
1992 off = idx;
1993 if (idx+2 > size)
1994 goto skip;
1995
1996 type = p[idx++];
1997 switch (type) {
1998 case SPOE_ACT_T_SET_VAR: {
1999 char *scope;
2000
2001 if (p[idx++] != 3)
2002 goto skip_action;
2003
2004 switch (p[idx++]) {
2005 case SPOE_SCOPE_PROC: scope = "proc"; break;
2006 case SPOE_SCOPE_SESS: scope = "sess"; break;
2007 case SPOE_SCOPE_TXN : scope = "txn"; break;
2008 case SPOE_SCOPE_REQ : scope = "req"; break;
2009 case SPOE_SCOPE_RES : scope = "res"; break;
2010 default: goto skip;
2011 }
2012
2013 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2014 if (str == NULL)
2015 goto skip;
2016 memset(&smp, 0, sizeof(smp));
2017 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
Christopher Fauletb5cff602016-11-24 14:53:22 +01002018
2019 if ((i = decode_spoe_data(p+idx, p+size, &smp)) == -1)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002020 goto skip;
Christopher Fauletb5cff602016-11-24 14:53:22 +01002021 idx += i;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002022
2023 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2024 " - set-var '%s.%s.%.*s'\n",
2025 (int)now.tv_sec, (int)now.tv_usec,
2026 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2027 __FUNCTION__, s, scope,
2028 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2029 (int)sz, str);
2030
2031 set_spoe_var(ctx, scope, str, sz, &smp);
2032 break;
2033 }
2034
2035 case SPOE_ACT_T_UNSET_VAR: {
2036 char *scope;
2037
2038 if (p[idx++] != 2)
2039 goto skip_action;
2040
2041 switch (p[idx++]) {
2042 case SPOE_SCOPE_PROC: scope = "proc"; break;
2043 case SPOE_SCOPE_SESS: scope = "sess"; break;
2044 case SPOE_SCOPE_TXN : scope = "txn"; break;
2045 case SPOE_SCOPE_REQ : scope = "req"; break;
2046 case SPOE_SCOPE_RES : scope = "res"; break;
2047 default: goto skip;
2048 }
2049
2050 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2051 if (str == NULL)
2052 goto skip;
2053 memset(&smp, 0, sizeof(smp));
2054 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2055
2056 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2057 " - unset-var '%s.%s.%.*s'\n",
2058 (int)now.tv_sec, (int)now.tv_usec,
2059 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2060 __FUNCTION__, s, scope,
2061 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2062 (int)sz, str);
2063
2064 unset_spoe_var(ctx, scope, str, sz, &smp);
2065 break;
2066 }
2067
2068 default:
2069 skip_action:
2070 if ((i = skip_spoe_action(p+off, p+size)) == -1)
2071 goto skip;
2072 idx += i;
2073 }
2074 }
2075
2076 return 1;
2077 skip:
2078 return 0;
2079}
2080
2081
2082/* Process a SPOE event. First, this functions will process messages attached to
2083 * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
2084 * ACK frame to process corresponding actions. During all the processing, it
2085 * returns 0 and it returns 1 when the processing is finished. If an error
2086 * occurred, -1 is returned. */
2087static int
2088process_spoe_event(struct stream *s, struct spoe_context *ctx,
2089 enum spoe_event ev)
2090{
Christopher Fauletf7a30922016-11-10 15:04:51 +01002091 struct spoe_config *conf = FLT_CONF(ctx->filter);
2092 struct spoe_agent *agent = conf->agent;
2093 int dir, ret = 1;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002094
2095 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2096 " - ctx-state=%s - event=%s\n",
2097 (int)now.tv_sec, (int)now.tv_usec,
Christopher Fauletf7a30922016-11-10 15:04:51 +01002098 agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002099 spoe_event_str[ev]);
2100
Christopher Faulet48026722016-11-16 15:01:12 +01002101 if (agent->eps_max > 0) {
2102 if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
2103 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2104 " - skip event '%s': max EPS reached\n",
2105 (int)now.tv_sec, (int)now.tv_usec,
2106 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2107 goto skip;
2108 }
2109 }
2110
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002111 dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
2112
2113 if (LIST_ISEMPTY(&(ctx->messages[ev])))
2114 goto out;
2115
2116 if (ctx->state == SPOE_CTX_ST_ERROR)
2117 goto error;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002118
2119 if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
2120 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2121 " - failed to process event '%s': timeout\n",
2122 (int)now.tv_sec, (int)now.tv_usec,
2123 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2124 send_log(ctx->strm->be, LOG_WARNING,
2125 "failed to process event '%s': timeout.\n",
2126 spoe_event_str[ev]);
2127 goto error;
2128 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002129
2130 if (ctx->state == SPOE_CTX_ST_READY) {
Christopher Fauletf7a30922016-11-10 15:04:51 +01002131 if (!tick_isset(ctx->process_exp)) {
2132 ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
2133 s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
2134 ctx->process_exp);
2135 }
2136
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002137 ret = acquire_spoe_appctx(ctx, dir);
2138 if (ret <= 0) {
2139 if (!ret)
2140 goto out;
2141 goto error;
2142 }
2143 ctx->state = SPOE_CTX_ST_SENDING_MSGS;
2144 }
2145
2146 if (ctx->appctx == NULL)
2147 goto error;
2148
2149 if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
2150 ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
2151 if (ret <= 0) {
2152 if (!ret)
2153 goto skip;
2154 goto error;
2155 }
2156 wakeup_spoe_appctx(ctx);
2157 ret = 0;
2158 goto out;
2159 }
2160
2161 if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
2162 wakeup_spoe_appctx(ctx);
2163 ret = 0;
2164 goto out;
2165 }
2166
2167 if (ctx->state == SPOE_CTX_ST_DONE) {
2168 ret = process_spoe_actions(s, ctx, ev, dir);
2169 if (ret <= 0) {
2170 if (!ret)
2171 goto skip;
2172 goto error;
2173 }
2174 ctx->frame_id++;
2175 release_spoe_appctx(ctx);
2176 ctx->state = SPOE_CTX_ST_READY;
2177 }
2178
2179 out:
2180 return ret;
2181
2182 skip:
2183 release_spoe_appctx(ctx);
2184 ctx->state = SPOE_CTX_ST_READY;
2185 return 1;
2186
2187 error:
Christopher Faulet48026722016-11-16 15:01:12 +01002188 if (agent->eps_max > 0)
2189 update_freq_ctr(&agent->err_per_sec, 1);
2190
Christopher Faulet985532d2016-11-16 15:36:19 +01002191 if (agent->var_on_error) {
2192 struct sample smp;
2193
2194 memset(&smp, 0, sizeof(smp));
2195 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2196 smp.data.u.sint = 1;
2197 smp.data.type = SMP_T_BOOL;
2198
2199 set_spoe_var(ctx, "txn", agent->var_on_error,
2200 strlen(agent->var_on_error), &smp);
2201 }
2202
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002203 release_spoe_appctx(ctx);
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002204 ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
2205 ? SPOE_CTX_ST_READY
2206 : SPOE_CTX_ST_ERROR);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002207 return 1;
2208}
2209
2210
2211/***************************************************************************
2212 * Functions that create/destroy SPOE contexts
2213 **************************************************************************/
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002214static int wakeup_spoe_context(struct spoe_context *ctx)
2215{
2216 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
2217 return 1;
2218}
2219
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002220static struct spoe_context *
2221create_spoe_context(struct filter *filter)
2222{
2223 struct spoe_config *conf = FLT_CONF(filter);
2224 struct spoe_context *ctx;
2225
2226 ctx = pool_alloc_dirty(pool2_spoe_ctx);
2227 if (ctx == NULL) {
2228 return NULL;
2229 }
2230 memset(ctx, 0, sizeof(*ctx));
2231 ctx->filter = filter;
2232 ctx->state = SPOE_CTX_ST_NONE;
2233 ctx->flags = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002234 ctx->messages = conf->agent->messages;
2235 ctx->buffer = &buf_empty;
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002236 LIST_INIT(&ctx->buffer_wait.list);
2237 ctx->buffer_wait.target = ctx;
2238 ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002239 LIST_INIT(&ctx->applet_wait);
2240
Christopher Fauletf7a30922016-11-10 15:04:51 +01002241 ctx->stream_id = 0;
2242 ctx->frame_id = 1;
2243 ctx->process_exp = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002244
2245 return ctx;
2246}
2247
2248static void
2249destroy_spoe_context(struct spoe_context *ctx)
2250{
2251 if (!ctx)
2252 return;
2253
2254 if (ctx->appctx)
2255 APPCTX_SPOE(ctx->appctx).ctx = NULL;
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002256 if (!LIST_ISEMPTY(&ctx->buffer_wait.list))
2257 LIST_DEL(&ctx->buffer_wait.list);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002258 if (!LIST_ISEMPTY(&ctx->applet_wait))
2259 LIST_DEL(&ctx->applet_wait);
2260 pool_free2(pool2_spoe_ctx, ctx);
2261}
2262
2263static void
2264reset_spoe_context(struct spoe_context *ctx)
2265{
2266 ctx->state = SPOE_CTX_ST_READY;
2267 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2268}
2269
2270
2271/***************************************************************************
2272 * Hooks that manage the filter lifecycle (init/check/deinit)
2273 **************************************************************************/
2274/* Signal handler: Do a soft stop, wakeup SPOE applet */
2275static void
2276sig_stop_spoe(struct sig_handler *sh)
2277{
2278 struct proxy *p;
2279
2280 p = proxy;
2281 while (p) {
2282 struct flt_conf *fconf;
2283
2284 list_for_each_entry(fconf, &p->filter_configs, list) {
2285 struct spoe_config *conf = fconf->conf;
2286 struct spoe_agent *agent = conf->agent;
2287 struct appctx *appctx;
2288
2289 list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) {
2290 si_applet_want_get(appctx->owner);
2291 si_applet_want_put(appctx->owner);
2292 appctx_wakeup(appctx);
2293 }
2294 }
2295 p = p->next;
2296 }
2297}
2298
2299
2300/* Initialize the SPOE filter. Returns -1 on error, else 0. */
2301static int
2302spoe_init(struct proxy *px, struct flt_conf *fconf)
2303{
2304 struct spoe_config *conf = fconf->conf;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002305
2306 memset(&conf->agent_fe, 0, sizeof(conf->agent_fe));
2307 init_new_proxy(&conf->agent_fe);
2308 conf->agent_fe.parent = conf->agent;
2309 conf->agent_fe.last_change = now.tv_sec;
2310 conf->agent_fe.id = conf->agent->id;
2311 conf->agent_fe.cap = PR_CAP_FE;
2312 conf->agent_fe.mode = PR_MODE_TCP;
2313 conf->agent_fe.maxconn = 0;
2314 conf->agent_fe.options2 |= PR_O2_INDEPSTR;
2315 conf->agent_fe.conn_retries = CONN_RETRIES;
2316 conf->agent_fe.accept = frontend_accept;
2317 conf->agent_fe.srv = NULL;
2318 conf->agent_fe.timeout.client = TICK_ETERNITY;
2319 conf->agent_fe.default_target = &spoe_applet.obj_type;
2320 conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
2321
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002322 if (!sighandler_registered) {
2323 signal_register_fct(0, sig_stop_spoe, 0);
2324 sighandler_registered = 1;
2325 }
2326
2327 return 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002328}
2329
2330/* Free ressources allocated by the SPOE filter. */
2331static void
2332spoe_deinit(struct proxy *px, struct flt_conf *fconf)
2333{
2334 struct spoe_config *conf = fconf->conf;
2335
2336 if (conf) {
2337 struct spoe_agent *agent = conf->agent;
2338 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
2339 struct listener *, by_fe);
2340
2341 free(l);
2342 release_spoe_agent(agent);
2343 free(conf);
2344 }
2345 fconf->conf = NULL;
2346}
2347
2348/* Check configuration of a SPOE filter for a specified proxy.
2349 * Return 1 on error, else 0. */
2350static int
2351spoe_check(struct proxy *px, struct flt_conf *fconf)
2352{
2353 struct spoe_config *conf = fconf->conf;
2354 struct proxy *target;
2355
2356 target = proxy_be_by_name(conf->agent->b.name);
2357 if (target == NULL) {
2358 Alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'"
2359 " declared at %s:%d.\n",
2360 px->id, conf->agent->b.name, conf->agent->id,
2361 conf->agent->conf.file, conf->agent->conf.line);
2362 return 1;
2363 }
2364 if (target->mode != PR_MODE_TCP) {
2365 Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
2366 " at %s:%d does not support HTTP mode.\n",
2367 px->id, target->id, conf->agent->id,
2368 conf->agent->conf.file, conf->agent->conf.line);
2369 return 1;
2370 }
2371
2372 free(conf->agent->b.name);
2373 conf->agent->b.name = NULL;
2374 conf->agent->b.be = target;
2375 return 0;
2376}
2377
2378/**************************************************************************
2379 * Hooks attached to a stream
2380 *************************************************************************/
2381/* Called when a filter instance is created and attach to a stream. It creates
2382 * the context that will be used to process this stream. */
2383static int
2384spoe_start(struct stream *s, struct filter *filter)
2385{
2386 struct spoe_context *ctx;
2387
2388 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2389 (int)now.tv_sec, (int)now.tv_usec,
2390 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2391 __FUNCTION__, s);
2392
2393 ctx = create_spoe_context(filter);
2394 if (ctx == NULL) {
2395 send_log(s->be, LOG_EMERG,
2396 "failed to create SPOE context for proxy %s\n",
2397 s->be->id);
2398 return 0;
2399 }
2400
2401 ctx->strm = s;
2402 ctx->state = SPOE_CTX_ST_READY;
2403 filter->ctx = ctx;
2404
2405 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_FE]))
2406 filter->pre_analyzers |= AN_REQ_INSPECT_FE;
2407
2408 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_BE]))
2409 filter->pre_analyzers |= AN_REQ_INSPECT_BE;
2410
2411 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_RSP]))
2412 filter->pre_analyzers |= AN_RES_INSPECT;
2413
2414 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_FE]))
2415 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE;
2416
2417 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_BE]))
2418 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE;
2419
2420 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_RSP]))
2421 filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE;
2422
2423 return 1;
2424}
2425
2426/* Called when a filter instance is detached from a stream. It release the
2427 * attached SPOE context. */
2428static void
2429spoe_stop(struct stream *s, struct filter *filter)
2430{
2431 struct spoe_context *ctx = filter->ctx;
2432
2433 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2434 (int)now.tv_sec, (int)now.tv_usec,
2435 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2436 __FUNCTION__, s);
2437
2438 if (ctx) {
2439 release_spoe_appctx(ctx);
2440 destroy_spoe_context(ctx);
2441 }
2442}
2443
Christopher Fauletf7a30922016-11-10 15:04:51 +01002444
2445/*
2446 * Called when the stream is woken up because of expired timer.
2447 */
2448static void
2449spoe_check_timeouts(struct stream *s, struct filter *filter)
2450{
2451 struct spoe_context *ctx = filter->ctx;
2452
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002453 if (tick_is_expired(ctx->process_exp, now_ms)) {
2454 s->pending_events |= TASK_WOKEN_MSG;
2455 if (ctx->buffer != &buf_empty) {
2456 b_free(&ctx->buffer);
2457 offer_buffers(ctx, tasks_run_queue + applets_active_queue);
2458 }
2459 }
Christopher Fauletf7a30922016-11-10 15:04:51 +01002460}
2461
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002462/* Called when we are ready to filter data on a channel */
2463static int
2464spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2465{
2466 struct spoe_context *ctx = filter->ctx;
2467 int ret = 1;
2468
2469 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2470 " - ctx-flags=0x%08x\n",
2471 (int)now.tv_sec, (int)now.tv_usec,
2472 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2473 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2474
2475 if (!(chn->flags & CF_ISRESP)) {
2476 if (filter->pre_analyzers & AN_REQ_INSPECT_FE)
2477 chn->analysers |= AN_REQ_INSPECT_FE;
2478 if (filter->pre_analyzers & AN_REQ_INSPECT_BE)
2479 chn->analysers |= AN_REQ_INSPECT_BE;
2480
2481 if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED)
2482 goto out;
2483
2484 ctx->stream_id = s->uniq_id;
2485 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2486 ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
2487 if (ret != 1)
2488 goto out;
2489 }
2490 ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
2491 }
2492 else {
2493 if (filter->pre_analyzers & SPOE_EV_ON_TCP_RSP)
2494 chn->analysers |= AN_RES_INSPECT;
2495
2496 if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
2497 goto out;
2498
2499 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2500 ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
2501 if (ret != 1)
2502 goto out;
2503 }
2504 ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
2505 }
2506
2507 out:
2508 if (!ret) {
2509 channel_dont_read(chn);
2510 channel_dont_close(chn);
2511 }
2512 return ret;
2513}
2514
2515/* Called before a processing happens on a given channel */
2516static int
2517spoe_chn_pre_analyze(struct stream *s, struct filter *filter,
2518 struct channel *chn, unsigned an_bit)
2519{
2520 struct spoe_context *ctx = filter->ctx;
2521 int ret = 1;
2522
2523 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2524 " - ctx-flags=0x%08x - ana=0x%08x\n",
2525 (int)now.tv_sec, (int)now.tv_usec,
2526 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2527 __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2528 ctx->flags, an_bit);
2529
2530 if (ctx->state == SPOE_CTX_ST_NONE || ctx->state == SPOE_CTX_ST_ERROR)
2531 goto out;
2532
2533 switch (an_bit) {
2534 case AN_REQ_INSPECT_FE:
2535 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
2536 break;
2537 case AN_REQ_INSPECT_BE:
2538 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
2539 break;
2540 case AN_RES_INSPECT:
2541 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP);
2542 break;
2543 case AN_REQ_HTTP_PROCESS_FE:
2544 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
2545 break;
2546 case AN_REQ_HTTP_PROCESS_BE:
2547 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
2548 break;
2549 case AN_RES_HTTP_PROCESS_FE:
2550 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
2551 break;
2552 }
2553
2554 out:
2555 if (!ret) {
2556 channel_dont_read(chn);
2557 channel_dont_close(chn);
2558 }
2559 return ret;
2560}
2561
2562/* Called when the filtering on the channel ends. */
2563static int
2564spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2565{
2566 struct spoe_context *ctx = filter->ctx;
2567
2568 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2569 " - ctx-flags=0x%08x\n",
2570 (int)now.tv_sec, (int)now.tv_usec,
2571 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2572 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2573
2574 if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
2575 reset_spoe_context(ctx);
2576 }
2577
2578 return 1;
2579}
2580
2581/********************************************************************
2582 * Functions that manage the filter initialization
2583 ********************************************************************/
2584struct flt_ops spoe_ops = {
2585 /* Manage SPOE filter, called for each filter declaration */
2586 .init = spoe_init,
2587 .deinit = spoe_deinit,
2588 .check = spoe_check,
2589
2590 /* Handle start/stop of SPOE */
Christopher Fauletf7a30922016-11-10 15:04:51 +01002591 .attach = spoe_start,
2592 .detach = spoe_stop,
2593 .check_timeouts = spoe_check_timeouts,
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002594
2595 /* Handle channels activity */
2596 .channel_start_analyze = spoe_start_analyze,
2597 .channel_pre_analyze = spoe_chn_pre_analyze,
2598 .channel_end_analyze = spoe_end_analyze,
2599};
2600
2601
2602static int
2603cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
2604{
2605 const char *err;
2606 int i, err_code = 0;
2607
2608 if ((cfg_scope == NULL && curengine != NULL) ||
2609 (cfg_scope != NULL && curengine == NULL) ||
2610 strcmp(curengine, cfg_scope))
2611 goto out;
2612
2613 if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */
2614 if (!*args[1]) {
2615 Alert("parsing [%s:%d] : missing name for spoe-agent section.\n",
2616 file, linenum);
2617 err_code |= ERR_ALERT | ERR_ABORT;
2618 goto out;
2619 }
2620 if (*args[2]) {
2621 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2622 file, linenum, args[2]);
2623 err_code |= ERR_ALERT | ERR_ABORT;
2624 goto out;
2625 }
2626
2627 err = invalid_char(args[1]);
2628 if (err) {
2629 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2630 file, linenum, *err, args[0], args[1]);
2631 err_code |= ERR_ALERT | ERR_ABORT;
2632 goto out;
2633 }
2634
2635 if (curagent != NULL) {
2636 Alert("parsing [%s:%d] : another spoe-agent section previously defined.\n",
2637 file, linenum);
2638 err_code |= ERR_ALERT | ERR_ABORT;
2639 goto out;
2640 }
2641 if ((curagent = calloc(1, sizeof(*curagent))) == NULL) {
2642 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2643 err_code |= ERR_ALERT | ERR_ABORT;
2644 goto out;
2645 }
2646
2647 curagent->id = strdup(args[1]);
2648 curagent->conf.file = strdup(file);
2649 curagent->conf.line = linenum;
2650 curagent->timeout.hello = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002651 curagent->timeout.idle = TICK_ETERNITY;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002652 curagent->timeout.processing = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002653 curagent->var_pfx = NULL;
Christopher Faulet985532d2016-11-16 15:36:19 +01002654 curagent->var_on_error = NULL;
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002655 curagent->flags = 0;
Christopher Faulet48026722016-11-16 15:01:12 +01002656 curagent->cps_max = 0;
2657 curagent->eps_max = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002658
2659 for (i = 0; i < SPOE_EV_EVENTS; ++i)
2660 LIST_INIT(&curagent->messages[i]);
2661 LIST_INIT(&curagent->cache);
2662 LIST_INIT(&curagent->applet_wq);
2663 }
2664 else if (!strcmp(args[0], "use-backend")) {
2665 if (!*args[1]) {
2666 Alert("parsing [%s:%d] : '%s' expects a backend name.\n",
2667 file, linenum, args[0]);
2668 err_code |= ERR_ALERT | ERR_FATAL;
2669 goto out;
2670 }
2671 if (*args[2]) {
2672 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2673 file, linenum, args[2]);
2674 err_code |= ERR_ALERT | ERR_ABORT;
2675 goto out;
2676 }
2677 free(curagent->b.name);
2678 curagent->b.name = strdup(args[1]);
2679 }
2680 else if (!strcmp(args[0], "messages")) {
2681 int cur_arg = 1;
2682 while (*args[cur_arg]) {
2683 struct spoe_msg_placeholder *mp = NULL;
2684
2685 list_for_each_entry(mp, &curmps, list) {
2686 if (!strcmp(mp->id, args[cur_arg])) {
2687 Alert("parsing [%s:%d]: spoe-message message '%s' already declared.\n",
2688 file, linenum, args[cur_arg]);
2689 err_code |= ERR_ALERT | ERR_FATAL;
2690 goto out;
2691 }
2692 }
2693
2694 if ((mp = calloc(1, sizeof(*mp))) == NULL) {
2695 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2696 err_code |= ERR_ALERT | ERR_ABORT;
2697 goto out;
2698 }
2699 mp->id = strdup(args[cur_arg]);
2700 LIST_ADDQ(&curmps, &mp->list);
2701 cur_arg++;
2702 }
2703 }
2704 else if (!strcmp(args[0], "timeout")) {
2705 unsigned int *tv = NULL;
2706 const char *res;
2707 unsigned timeout;
2708
2709 if (!*args[1]) {
2710 Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n",
2711 file, linenum);
2712 err_code |= ERR_ALERT | ERR_FATAL;
2713 goto out;
2714 }
2715 if (!strcmp(args[1], "hello"))
2716 tv = &curagent->timeout.hello;
2717 else if (!strcmp(args[1], "idle"))
2718 tv = &curagent->timeout.idle;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002719 else if (!strcmp(args[1], "processing"))
2720 tv = &curagent->timeout.processing;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002721 else {
Christopher Faulet03a34492016-11-19 16:47:56 +01002722 Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' or 'processing' (got %s).\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002723 file, linenum, args[1]);
2724 err_code |= ERR_ALERT | ERR_FATAL;
2725 goto out;
2726 }
2727 if (!*args[2]) {
2728 Alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n",
2729 file, linenum, args[1]);
2730 err_code |= ERR_ALERT | ERR_FATAL;
2731 goto out;
2732 }
2733 res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
2734 if (res) {
2735 Alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n",
2736 file, linenum, *res, args[1]);
2737 err_code |= ERR_ALERT | ERR_ABORT;
2738 goto out;
2739 }
2740 if (*args[3]) {
2741 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2742 file, linenum, args[3]);
2743 err_code |= ERR_ALERT | ERR_ABORT;
2744 goto out;
2745 }
2746 *tv = MS_TO_TICKS(timeout);
2747 }
2748 else if (!strcmp(args[0], "option")) {
2749 if (!*args[1]) {
2750 Alert("parsing [%s:%d]: '%s' expects an option name.\n",
2751 file, linenum, args[0]);
2752 err_code |= ERR_ALERT | ERR_FATAL;
2753 goto out;
2754 }
2755 if (!strcmp(args[1], "var-prefix")) {
2756 char *tmp;
2757
2758 if (!*args[2]) {
2759 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2760 file, linenum, args[0],
2761 args[1]);
2762 err_code |= ERR_ALERT | ERR_FATAL;
2763 goto out;
2764 }
2765 tmp = args[2];
2766 while (*tmp) {
2767 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2768 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
2769 file, linenum, args[0], args[1]);
2770 err_code |= ERR_ALERT | ERR_FATAL;
2771 goto out;
2772 }
2773 tmp++;
2774 }
2775 curagent->var_pfx = strdup(args[2]);
2776 }
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002777 else if (!strcmp(args[1], "continue-on-error")) {
2778 if (*args[2]) {
2779 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
Christopher Faulet48026722016-11-16 15:01:12 +01002780 file, linenum, args[2]);
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002781 err_code |= ERR_ALERT | ERR_ABORT;
2782 goto out;
2783 }
2784 curagent->flags |= SPOE_FL_CONT_ON_ERR;
2785 }
Christopher Faulet985532d2016-11-16 15:36:19 +01002786 else if (!strcmp(args[1], "set-on-error")) {
2787 char *tmp;
2788
2789 if (!*args[2]) {
2790 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2791 file, linenum, args[0],
2792 args[1]);
2793 err_code |= ERR_ALERT | ERR_FATAL;
2794 goto out;
2795 }
2796 tmp = args[2];
2797 while (*tmp) {
2798 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2799 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
2800 file, linenum, args[0], args[1]);
2801 err_code |= ERR_ALERT | ERR_FATAL;
2802 goto out;
2803 }
2804 tmp++;
2805 }
2806 curagent->var_on_error = strdup(args[2]);
2807 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002808 else {
2809 Alert("parsing [%s:%d]: option '%s' is not supported.\n",
2810 file, linenum, args[1]);
2811 err_code |= ERR_ALERT | ERR_FATAL;
2812 goto out;
2813 }
Christopher Faulet48026722016-11-16 15:01:12 +01002814 }
2815 else if (!strcmp(args[0], "maxconnrate")) {
2816 if (!*args[1]) {
2817 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
2818 file, linenum, args[0]);
2819 err_code |= ERR_ALERT | ERR_FATAL;
2820 goto out;
2821 }
2822 if (*args[2]) {
2823 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2824 file, linenum, args[2]);
2825 err_code |= ERR_ALERT | ERR_ABORT;
2826 goto out;
2827 }
2828 curagent->cps_max = atol(args[1]);
2829 }
2830 else if (!strcmp(args[0], "maxerrrate")) {
2831 if (!*args[1]) {
2832 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
2833 file, linenum, args[0]);
2834 err_code |= ERR_ALERT | ERR_FATAL;
2835 goto out;
2836 }
2837 if (*args[2]) {
2838 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2839 file, linenum, args[2]);
2840 err_code |= ERR_ALERT | ERR_ABORT;
2841 goto out;
2842 }
2843 curagent->eps_max = atol(args[1]);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002844 }
2845 else if (*args[0]) {
2846 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
2847 file, linenum, args[0]);
2848 err_code |= ERR_ALERT | ERR_FATAL;
2849 goto out;
2850 }
2851 out:
2852 return err_code;
2853}
2854
2855static int
2856cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm)
2857{
2858 struct spoe_message *msg;
2859 struct spoe_arg *arg;
2860 const char *err;
2861 char *errmsg = NULL;
2862 int err_code = 0;
2863
2864 if ((cfg_scope == NULL && curengine != NULL) ||
2865 (cfg_scope != NULL && curengine == NULL) ||
2866 strcmp(curengine, cfg_scope))
2867 goto out;
2868
2869 if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */
2870 if (!*args[1]) {
2871 Alert("parsing [%s:%d] : missing name for spoe-message section.\n",
2872 file, linenum);
2873 err_code |= ERR_ALERT | ERR_ABORT;
2874 goto out;
2875 }
2876 if (*args[2]) {
2877 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2878 file, linenum, args[2]);
2879 err_code |= ERR_ALERT | ERR_ABORT;
2880 goto out;
2881 }
2882
2883 err = invalid_char(args[1]);
2884 if (err) {
2885 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2886 file, linenum, *err, args[0], args[1]);
2887 err_code |= ERR_ALERT | ERR_ABORT;
2888 goto out;
2889 }
2890
2891 list_for_each_entry(msg, &curmsgs, list) {
2892 if (!strcmp(msg->id, args[1])) {
2893 Alert("parsing [%s:%d]: spoe-message section '%s' has the same"
2894 " name as another one declared at %s:%d.\n",
2895 file, linenum, args[1], msg->conf.file, msg->conf.line);
2896 err_code |= ERR_ALERT | ERR_FATAL;
2897 goto out;
2898 }
2899 }
2900
2901 if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) {
2902 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2903 err_code |= ERR_ALERT | ERR_ABORT;
2904 goto out;
2905 }
2906
2907 curmsg->id = strdup(args[1]);
2908 curmsg->id_len = strlen(curmsg->id);
2909 curmsg->event = SPOE_EV_NONE;
2910 curmsg->conf.file = strdup(file);
2911 curmsg->conf.line = linenum;
2912 LIST_INIT(&curmsg->args);
2913 LIST_ADDQ(&curmsgs, &curmsg->list);
2914 }
2915 else if (!strcmp(args[0], "args")) {
2916 int cur_arg = 1;
2917
2918 curproxy->conf.args.ctx = ARGC_SPOE;
2919 curproxy->conf.args.file = file;
2920 curproxy->conf.args.line = linenum;
2921 while (*args[cur_arg]) {
2922 char *delim = strchr(args[cur_arg], '=');
2923 int idx = 0;
2924
2925 if ((arg = calloc(1, sizeof(*arg))) == NULL) {
2926 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2927 err_code |= ERR_ALERT | ERR_ABORT;
2928 goto out;
2929 }
2930
2931 if (!delim) {
2932 arg->name = NULL;
2933 arg->name_len = 0;
2934 delim = args[cur_arg];
2935 }
2936 else {
2937 arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]);
2938 arg->name_len = delim - args[cur_arg];
2939 delim++;
2940 }
2941
2942 arg->expr = sample_parse_expr(&delim, &idx, file, linenum, &errmsg, &curproxy->conf.args);
2943 if (arg->expr == NULL) {
2944 Alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg);
2945 err_code |= ERR_ALERT | ERR_FATAL;
2946 free(arg->name);
2947 free(arg);
2948 goto out;
2949 }
2950 LIST_ADDQ(&curmsg->args, &arg->list);
2951 cur_arg++;
2952 }
2953 curproxy->conf.args.file = NULL;
2954 curproxy->conf.args.line = 0;
2955 }
2956 else if (!strcmp(args[0], "event")) {
2957 if (!*args[1]) {
2958 Alert("parsing [%s:%d] : missing event name.\n", file, linenum);
2959 err_code |= ERR_ALERT | ERR_ABORT;
2960 goto out;
2961 }
2962 if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS]))
2963 curmsg->event = SPOE_EV_ON_CLIENT_SESS;
2964 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS]))
2965 curmsg->event = SPOE_EV_ON_SERVER_SESS;
2966
2967 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE]))
2968 curmsg->event = SPOE_EV_ON_TCP_REQ_FE;
2969 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE]))
2970 curmsg->event = SPOE_EV_ON_TCP_REQ_BE;
2971 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP]))
2972 curmsg->event = SPOE_EV_ON_TCP_RSP;
2973
2974 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE]))
2975 curmsg->event = SPOE_EV_ON_HTTP_REQ_FE;
2976 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE]))
2977 curmsg->event = SPOE_EV_ON_HTTP_REQ_BE;
2978 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP]))
2979 curmsg->event = SPOE_EV_ON_HTTP_RSP;
2980 else {
2981 Alert("parsing [%s:%d] : unkown event '%s'.\n",
2982 file, linenum, args[1]);
2983 err_code |= ERR_ALERT | ERR_ABORT;
2984 goto out;
2985 }
2986 }
2987 else if (!*args[0]) {
2988 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n",
2989 file, linenum, args[0]);
2990 err_code |= ERR_ALERT | ERR_FATAL;
2991 goto out;
2992 }
2993 out:
2994 free(errmsg);
2995 return err_code;
2996}
2997
2998/* Return -1 on error, else 0 */
2999static int
3000parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
3001 struct flt_conf *fconf, char **err, void *private)
3002{
3003 struct list backup_sections;
3004 struct spoe_config *conf;
3005 struct spoe_message *msg, *msgback;
3006 struct spoe_msg_placeholder *mp, *mpback;
3007 char *file = NULL, *engine = NULL;
3008 int ret, pos = *cur_arg + 1;
3009
3010 conf = calloc(1, sizeof(*conf));
3011 if (conf == NULL) {
3012 memprintf(err, "%s: out of memory", args[*cur_arg]);
3013 goto error;
3014 }
3015 conf->proxy = px;
3016
3017 while (*args[pos]) {
3018 if (!strcmp(args[pos], "config")) {
3019 if (!*args[pos+1]) {
3020 memprintf(err, "'%s' : '%s' option without value",
3021 args[*cur_arg], args[pos]);
3022 goto error;
3023 }
3024 file = args[pos+1];
3025 pos += 2;
3026 }
3027 else if (!strcmp(args[pos], "engine")) {
3028 if (!*args[pos+1]) {
3029 memprintf(err, "'%s' : '%s' option without value",
3030 args[*cur_arg], args[pos]);
3031 goto error;
3032 }
3033 engine = args[pos+1];
3034 pos += 2;
3035 }
3036 else {
3037 memprintf(err, "unknown keyword '%s'", args[pos]);
3038 goto error;
3039 }
3040 }
3041 if (file == NULL) {
3042 memprintf(err, "'%s' : missing config file", args[*cur_arg]);
3043 goto error;
3044 }
3045
3046 /* backup sections and register SPOE sections */
3047 LIST_INIT(&backup_sections);
3048 cfg_backup_sections(&backup_sections);
3049 cfg_register_section("spoe-agent", cfg_parse_spoe_agent);
3050 cfg_register_section("spoe-message", cfg_parse_spoe_message);
3051
3052 /* Parse SPOE filter configuration file */
3053 curengine = engine;
3054 curproxy = px;
3055 curagent = NULL;
3056 curmsg = NULL;
3057 ret = readcfgfile(file);
3058 curproxy = NULL;
3059
3060 /* unregister SPOE sections and restore previous sections */
3061 cfg_unregister_sections();
3062 cfg_restore_sections(&backup_sections);
3063
3064 if (ret == -1) {
3065 memprintf(err, "Could not open configuration file %s : %s",
3066 file, strerror(errno));
3067 goto error;
3068 }
3069 if (ret & (ERR_ABORT|ERR_FATAL)) {
3070 memprintf(err, "Error(s) found in configuration file %s", file);
3071 goto error;
3072 }
3073
3074 /* Check SPOE agent */
3075 if (curagent == NULL) {
3076 memprintf(err, "No SPOE agent found in file %s", file);
3077 goto error;
3078 }
3079 if (curagent->b.name == NULL) {
3080 memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d",
3081 curagent->id, curagent->conf.file, curagent->conf.line);
3082 goto error;
3083 }
Christopher Fauletf7a30922016-11-10 15:04:51 +01003084 if (curagent->timeout.hello == TICK_ETERNITY ||
3085 curagent->timeout.idle == TICK_ETERNITY ||
Christopher Fauletf7a30922016-11-10 15:04:51 +01003086 curagent->timeout.processing == TICK_ETERNITY) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003087 Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
3088 " | While not properly invalid, you will certainly encounter various problems\n"
3089 " | with such a configuration. To fix this, please ensure that all following\n"
Christopher Faulet03a34492016-11-19 16:47:56 +01003090 " | timeouts are set to a non-zero value: 'hello', 'idle', 'processing'.\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003091 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3092 }
3093 if (curagent->var_pfx == NULL) {
3094 char *tmp = curagent->id;
3095
3096 while (*tmp) {
3097 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3098 memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. "
3099 "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n",
3100 curagent->id, curagent->id, curagent->conf.file, curagent->conf.line);
3101 goto error;
3102 }
3103 tmp++;
3104 }
3105 curagent->var_pfx = strdup(curagent->id);
3106 }
3107
3108 if (LIST_ISEMPTY(&curmps)) {
3109 Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",
3110 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3111 goto finish;
3112 }
3113
3114 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3115 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3116 if (!strcmp(msg->id, mp->id)) {
3117 if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) {
3118 if (msg->event == SPOE_EV_ON_TCP_REQ_BE)
3119 msg->event = SPOE_EV_ON_TCP_REQ_FE;
3120 if (msg->event == SPOE_EV_ON_HTTP_REQ_BE)
3121 msg->event = SPOE_EV_ON_HTTP_REQ_FE;
3122 }
3123 if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS ||
3124 msg->event == SPOE_EV_ON_TCP_REQ_FE ||
3125 msg->event == SPOE_EV_ON_HTTP_REQ_FE)) {
3126 Warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n",
3127 px->id, msg->conf.file, msg->conf.line);
3128 goto next;
3129 }
3130 if (msg->event == SPOE_EV_NONE) {
3131 Warning("Proxy '%s': Ignore SPOE message without event at %s:%d.\n",
3132 px->id, msg->conf.file, msg->conf.line);
3133 goto next;
3134 }
3135 msg->agent = curagent;
3136 LIST_DEL(&msg->list);
3137 LIST_ADDQ(&curagent->messages[msg->event], &msg->list);
3138 goto next;
3139 }
3140 }
3141 memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d",
3142 curagent->id, mp->id, curagent->conf.file, curagent->conf.line);
3143 goto error;
3144 next:
3145 continue;
3146 }
3147
3148 finish:
3149 conf->agent = curagent;
3150 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3151 LIST_DEL(&mp->list);
3152 release_spoe_msg_placeholder(mp);
3153 }
3154 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3155 Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n",
3156 px->id, msg->id, msg->conf.file, msg->conf.line);
3157 LIST_DEL(&msg->list);
3158 release_spoe_message(msg);
3159 }
3160
3161 *cur_arg = pos;
3162 fconf->ops = &spoe_ops;
3163 fconf->conf = conf;
3164 return 0;
3165
3166 error:
3167 release_spoe_agent(curagent);
3168 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3169 LIST_DEL(&mp->list);
3170 release_spoe_msg_placeholder(mp);
3171 }
3172 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3173 LIST_DEL(&msg->list);
3174 release_spoe_message(msg);
3175 }
3176 free(conf);
3177 return -1;
3178}
3179
3180
3181/* Declare the filter parser for "spoe" keyword */
3182static struct flt_kw_list flt_kws = { "SPOE", { }, {
3183 { "spoe", parse_spoe_flt, NULL },
3184 { NULL, NULL, NULL },
3185 }
3186};
3187
3188__attribute__((constructor))
3189static void __spoe_init(void)
3190{
3191 flt_register_keywords(&flt_kws);
3192
3193 LIST_INIT(&curmsgs);
3194 LIST_INIT(&curmps);
3195 pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
3196}
3197
3198__attribute__((destructor))
3199static void
3200__spoe_deinit(void)
3201{
3202 pool_destroy2(pool2_spoe_ctx);
3203}