blob: aa6414abfdf43370e9a2f3df4628752d97126d90 [file] [log] [blame]
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001/*
2 * Stream processing offload engine management.
3 *
4 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12#include <ctype.h>
13#include <errno.h>
14
15#include <common/buffer.h>
16#include <common/cfgparse.h>
17#include <common/compat.h>
18#include <common/config.h>
19#include <common/debug.h>
20#include <common/memory.h>
21#include <common/time.h>
22
23#include <types/arg.h>
24#include <types/filters.h>
25#include <types/global.h>
26#include <types/proxy.h>
27#include <types/sample.h>
28#include <types/stream.h>
29
30#include <proto/arg.h>
31#include <proto/backend.h>
32#include <proto/filters.h>
Christopher Faulet48026722016-11-16 15:01:12 +010033#include <proto/freq_ctr.h>
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020034#include <proto/frontend.h>
35#include <proto/log.h>
36#include <proto/proto_http.h>
37#include <proto/proxy.h>
38#include <proto/sample.h>
39#include <proto/session.h>
40#include <proto/signal.h>
41#include <proto/stream.h>
42#include <proto/stream_interface.h>
43#include <proto/task.h>
44#include <proto/vars.h>
45
/* Debug trace helper: forwards its arguments to fprintf() when the file is
 * built with DEBUG_SPOE or DEBUG_FULL, and expands to nothing otherwise. */
#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
#define SPOE_PRINTF(x...) fprintf(x)
#else
#define SPOE_PRINTF(x...)
#endif

/* Helper to get the SPOE part of the context stored inside an appctx */
#define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe)

/* Minimal size for a frame: smaller "max-frame-size" values announced by an
 * agent are rejected */
#define MIN_FRAME_SIZE 256

/* Flags set on the SPOE agent */
#define SPOE_FL_CONT_ON_ERR 0x00000001 /* Do not stop events processing when an error occurred */

/* Flags set on the SPOE context */
#define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* Set after that on-client-session event was processed */
#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set after that on-server-session event was processed */
#define SPOE_CTX_FL_REQ_PROCESS 0x00000004 /* Set when SPOE is processing the request */
#define SPOE_CTX_FL_RSP_PROCESS 0x00000008 /* Set when SPOE is processing the response */

/* Mask matching any "processing in progress" flag (request or response) */
#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)

/* Error codes for the SPOE applet */
#define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, leave it to zero */
#define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */
/* All possible states for a SPOE context. SPOE_CTX_ST_ERROR must remain the
 * last value: it is used to size the debug table of state names. */
enum spoe_ctx_state {
	SPOE_CTX_ST_NONE = 0,
	SPOE_CTX_ST_READY,
	SPOE_CTX_ST_SENDING_MSGS,
	SPOE_CTX_ST_WAITING_ACK,
	SPOE_CTX_ST_DONE,
	SPOE_CTX_ST_ERROR,
};
81
/* All possible states for a SPOE applet. SPOE_APPCTX_ST_END must remain the
 * last value: it is used to size the debug table of state names. */
enum spoe_appctx_state {
	SPOE_APPCTX_ST_CONNECT = 0,
	SPOE_APPCTX_ST_CONNECTING,
	SPOE_APPCTX_ST_PROCESSING,
	SPOE_APPCTX_ST_DISCONNECT,
	SPOE_APPCTX_ST_DISCONNECTING,
	SPOE_APPCTX_ST_EXIT,
	SPOE_APPCTX_ST_END,
};
92
/* All supported SPOE actions. Numbering starts at 1; SPOE_ACT_TYPES is a
 * sentinel equal to the last valid action type plus one. */
enum spoe_action_type {
	SPOE_ACT_T_SET_VAR = 1,
	SPOE_ACT_T_UNSET_VAR,
	SPOE_ACT_TYPES,
};
99
/* All supported SPOE events. SPOE_EV_EVENTS is not an event: it is the number
 * of values and is used to size the per-event arrays (the agent's message
 * lists and the table of event names). */
enum spoe_event {
	SPOE_EV_NONE = 0,

	/* Request events */
	SPOE_EV_ON_CLIENT_SESS = 1,
	SPOE_EV_ON_TCP_REQ_FE,
	SPOE_EV_ON_TCP_REQ_BE,
	SPOE_EV_ON_HTTP_REQ_FE,
	SPOE_EV_ON_HTTP_REQ_BE,

	/* Response events */
	SPOE_EV_ON_SERVER_SESS,
	SPOE_EV_ON_TCP_RSP,
	SPOE_EV_ON_HTTP_RSP,

	SPOE_EV_EVENTS
};
118
/* Errors triggered by the SPOE applet. These values are used as the
 * "status-code" of DISCONNECT frames and as index into the table of error
 * reasons. Note the gap in the numbering: SPOE_FRM_ERR_UNKNOWN is forced to 99,
 * so SPOE_FRM_ERRS is 100 and the codes between SPOE_FRM_ERR_BAD_FRAME_SIZE
 * and SPOE_FRM_ERR_UNKNOWN are not assigned. */
enum spoe_frame_error {
	SPOE_FRM_ERR_NONE = 0,
	SPOE_FRM_ERR_IO,
	SPOE_FRM_ERR_TOUT,
	SPOE_FRM_ERR_TOO_BIG,
	SPOE_FRM_ERR_INVALID,
	SPOE_FRM_ERR_NO_VSN,
	SPOE_FRM_ERR_NO_FRAME_SIZE,
	SPOE_FRM_ERR_NO_CAP,
	SPOE_FRM_ERR_BAD_VSN,
	SPOE_FRM_ERR_BAD_FRAME_SIZE,
	SPOE_FRM_ERR_UNKNOWN = 99,
	SPOE_FRM_ERRS,
};
134
/* Scopes used for variables set by agents. It is a way to be agnostic to the
 * vars scopes: each value maps to the scope named in its comment. */
enum spoe_vars_scope {
	SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC */
	SPOE_SCOPE_SESS,     /* <=> SCOPE_SESS */
	SPOE_SCOPE_TXN,      /* <=> SCOPE_TXN */
	SPOE_SCOPE_REQ,      /* <=> SCOPE_REQ */
	SPOE_SCOPE_RES,      /* <=> SCOPE_RES */
};
144
145
/* Describe an argument that will be linked to a message. It is a sample fetch,
 * with an optional name. Arguments are chained in the <args> list of their
 * message. */
struct spoe_arg {
	char *name;               /* Name of the argument, may be NULL */
	unsigned int name_len;    /* The name length, 0 if NULL */
	struct sample_expr *expr; /* Sample expression */
	struct list list;         /* Used to chain SPOE args */
};
154
/* Used during the config parsing only because, when a SPOE agent section is
 * parsed, messages can be undefined. A placeholder only records the id of the
 * message it stands for. */
struct spoe_msg_placeholder {
	char *id;         /* SPOE message placeholder id */
	struct list list; /* Used to chain SPOE message placeholders */
};
161
/* Describe a message that will be sent in a NOTIFY frame. A message has a name,
 * an argument list (see struct spoe_arg) and it is linked to a specific
 * event. */
struct spoe_message {
	char *id;                 /* SPOE message id */
	unsigned int id_len;      /* The message id length */
	struct spoe_agent *agent; /* SPOE agent owning this SPOE message */
	struct {
		char *file;       /* file where the SPOE message appears */
		int line;         /* line where the SPOE message appears */
	} conf;                   /* config information */
	struct list args;         /* Arguments added when the SPOE message is sent */
	struct list list;         /* Used to chain SPOE messages */

	enum spoe_event event;    /* SPOE_EV_* */
};
177
/* Describe a SPOE agent. */
struct spoe_agent {
	char *id;                         /* SPOE agent id (name) */
	struct {
		char *file;               /* file where the SPOE agent appears */
		int line;                 /* line where the SPOE agent appears */
	} conf;                           /* config information */
	union {
		struct proxy *be;         /* Backend used by this agent */
		char *name;               /* Backend name used during conf parsing */
	} b;                              /* Only one member is valid at a time */
	struct {
		unsigned int hello;       /* Max time to receive AGENT-HELLO frame (in SPOE applet) */
		unsigned int idle;        /* Max Idle timeout (in SPOE applet) */
		unsigned int processing;  /* Max time to process an event (in the main stream) */
	} timeout;

	char *var_pfx;                    /* Prefix used for vars set by the agent */
	char *var_on_error;               /* Variable to set when an error occurred, in the TXN scope */
	unsigned int flags;               /* SPOE_FL_* */
	unsigned int cps_max;             /* Maximum number of connections per second */
	unsigned int eps_max;             /* Maximum number of errors per second */

	struct list cache;                /* List used to cache SPOE streams. In
					   * fact, we cache the SPOE applet ctx */

	struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
					       * for each supported events */

	struct list applet_wq;            /* List of streams waiting for a SPOE applet */
	struct freq_ctr conn_per_sec;     /* connections per second */
	struct freq_ctr err_per_sec;      /* connection errors per second */
};
211
/* SPOE filter configuration. Note that <agent_fe> is a full proxy embedded in
 * the structure, not a pointer. */
struct spoe_config {
	struct proxy *proxy;      /* Proxy owning the filter */
	struct spoe_agent *agent; /* Agent used by this filter */
	struct proxy agent_fe;    /* Agent frontend */
};
218
/* SPOE context attached to a stream. It is the main structure that handles the
 * processing offload */
struct spoe_context {
	struct filter *filter;          /* The SPOE filter */
	struct stream *strm;            /* The stream that should be offloaded */
	struct appctx *appctx;          /* The SPOE appctx */
	struct list *messages;          /* List of messages that will be sent during the stream processing */
	struct buffer *buffer;          /* Buffer used to store a NOTIFY or ACK frame */
	struct buffer_wait buffer_wait; /* position in the list of streams waiting for a buffer */
	struct list applet_wait;        /* position in the list of streams waiting for a SPOE applet */

	enum spoe_ctx_state state;      /* SPOE_CTX_ST_* */
	unsigned int flags;             /* SPOE_CTX_FL_* */

	unsigned int stream_id;         /* stream_id and frame_id are used */
	unsigned int frame_id;          /* to map NOTIFY and ACK frames */
	unsigned int process_exp;       /* expiration date to process an event */
};
237
/* Set if the handler on SIGUSR1 is registered */
static int sighandler_registered = 0;

/* NOTE(review): the parsing state below is not declared static, so these
 * fairly generic names have external linkage -- confirm whether another
 * translation unit really needs them. */

/* proxy used during the parsing */
struct proxy *curproxy = NULL;

/* The name of the SPOE engine, used during the parsing */
char *curengine = NULL;

/* SPOE agent used during the parsing */
struct spoe_agent *curagent = NULL;

/* SPOE message used during the parsing */
struct spoe_message *curmsg = NULL;

/* list of SPOE messages and placeholders used during the parsing */
struct list curmsgs;
struct list curmps;

/* Pool used to allocate new SPOE contexts */
static struct pool_head *pool2_spoe_ctx = NULL;

/* Temporary variables used to ease error processing. <spoe_status_code> holds
 * the last SPOE_FRM_ERR_* (or the status code received from an agent) and
 * <spoe_reason> the NUL-terminated message received in a DISCONNECT frame
 * (at most 255 bytes plus the terminator). */
int spoe_status_code = SPOE_FRM_ERR_NONE;
char spoe_reason[256];

/* Filter callbacks; the definition is not part of this section */
struct flt_ops spoe_ops;

/* Forward declarations of functions defined later in the file */
static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx);
static void on_new_spoe_appctx_failure(struct spoe_agent *agent);
static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx);
269
270/********************************************************************
271 * helper functions/globals
272 ********************************************************************/
273static void
274release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp)
275{
276 if (!mp)
277 return;
278 free(mp->id);
279 free(mp);
280}
281
282
283static void
284release_spoe_message(struct spoe_message *msg)
285{
286 struct spoe_arg *arg, *back;
287
288 if (!msg)
289 return;
290 free(msg->id);
291 free(msg->conf.file);
292 list_for_each_entry_safe(arg, back, &msg->args, list) {
293 release_sample_expr(arg->expr);
294 free(arg->name);
295 LIST_DEL(&arg->list);
296 free(arg);
297 }
298 free(msg);
299}
300
301static void
302release_spoe_agent(struct spoe_agent *agent)
303{
304 struct spoe_message *msg, *back;
305 int i;
306
307 if (!agent)
308 return;
309 free(agent->id);
310 free(agent->conf.file);
311 free(agent->var_pfx);
Christopher Faulet985532d2016-11-16 15:36:19 +0100312 free(agent->var_on_error);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200313 for (i = 0; i < SPOE_EV_EVENTS; ++i) {
314 list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
315 LIST_DEL(&msg->list);
316 release_spoe_message(msg);
317 }
318 }
319 free(agent);
320}
321
/* Human readable reason for each SPOE_FRM_ERR_* code, indexed by the code.
 * WARNING: the table has SPOE_FRM_ERRS entries but only the codes listed below
 * are initialized. Because SPOE_FRM_ERR_UNKNOWN is 99, every index between
 * SPOE_FRM_ERR_BAD_FRAME_SIZE and SPOE_FRM_ERR_UNKNOWN (both excluded) holds
 * NULL: callers must check the entry before using it. */
static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
	[SPOE_FRM_ERR_NONE]           = "normal",
	[SPOE_FRM_ERR_IO]             = "I/O error",
	[SPOE_FRM_ERR_TOUT]           = "a timeout occurred",
	[SPOE_FRM_ERR_TOO_BIG]        = "frame is too big",
	[SPOE_FRM_ERR_INVALID]        = "invalid frame received",
	[SPOE_FRM_ERR_NO_VSN]         = "version value not found",
	[SPOE_FRM_ERR_NO_FRAME_SIZE]  = "max-frame-size value not found",
	[SPOE_FRM_ERR_NO_CAP]         = "capabilities value not found",
	[SPOE_FRM_ERR_BAD_VSN]        = "unsupported version",
	[SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
	[SPOE_FRM_ERR_UNKNOWN]        = "an unknown error occurred",
};
335
/* Name of each SPOE event, indexed by the SPOE_EV_* value. There is no entry
 * for SPOE_EV_NONE, so index 0 holds NULL. */
static const char *spoe_event_str[SPOE_EV_EVENTS] = {
	[SPOE_EV_ON_CLIENT_SESS] = "on-client-session",
	[SPOE_EV_ON_TCP_REQ_FE]  = "on-frontend-tcp-request",
	[SPOE_EV_ON_TCP_REQ_BE]  = "on-backend-tcp-request",
	[SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request",
	[SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request",

	[SPOE_EV_ON_SERVER_SESS] = "on-server-session",
	[SPOE_EV_ON_TCP_RSP]     = "on-tcp-response",
	[SPOE_EV_ON_HTTP_RSP]    = "on-http-response",
};
347
348
/* Names of the context and applet states. They are only compiled in debug
 * builds (DEBUG_SPOE or DEBUG_FULL). */
#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)

/* Name of each SPOE context state, indexed by the SPOE_CTX_ST_* value */
static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
	[SPOE_CTX_ST_NONE]         = "NONE",
	[SPOE_CTX_ST_READY]        = "READY",
	[SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
	[SPOE_CTX_ST_WAITING_ACK]  = "WAITING_ACK",
	[SPOE_CTX_ST_DONE]         = "DONE",
	[SPOE_CTX_ST_ERROR]        = "ERROR",
};

/* Name of each SPOE applet state, indexed by the SPOE_APPCTX_ST_* value */
static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
	[SPOE_APPCTX_ST_CONNECT]       = "CONNECT",
	[SPOE_APPCTX_ST_CONNECTING]    = "CONNECTING",
	[SPOE_APPCTX_ST_PROCESSING]    = "PROCESSING",
	[SPOE_APPCTX_ST_DISCONNECT]    = "DISCONNECT",
	[SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
	[SPOE_APPCTX_ST_EXIT]          = "EXIT",
	[SPOE_APPCTX_ST_END]           = "END",
};

#endif
371/********************************************************************
372 * Functions that encode/decode SPOE frames
373 ********************************************************************/
/* Frame types sent by HAProxy and by agents. The value is the first byte of
 * every frame. Types sent by HAProxy start at 1, those sent by agents start at
 * 101. */
enum spoe_frame_type {
	/* Frames sent by HAProxy */
	SPOE_FRM_T_HAPROXY_HELLO = 1,
	SPOE_FRM_T_HAPROXY_DISCON,
	SPOE_FRM_T_HAPROXY_NOTIFY,

	/* Frames sent by the agents */
	SPOE_FRM_T_AGENT_HELLO = 101,
	SPOE_FRM_T_AGENT_DISCON,
	SPOE_FRM_T_AGENT_ACK
};
386
/* All supported data types. The type is stored in the low 4 bits of the byte
 * preceding a typed data (see SPOE_DATA_T_MASK). SPOE_DATA_TYPES is the number
 * of types, not a type. */
enum spoe_data_type {
	SPOE_DATA_T_NULL = 0,
	SPOE_DATA_T_BOOL,
	SPOE_DATA_T_INT32,
	SPOE_DATA_T_UINT32,
	SPOE_DATA_T_INT64,
	SPOE_DATA_T_UINT64,
	SPOE_DATA_T_IPV4,
	SPOE_DATA_T_IPV6,
	SPOE_DATA_T_STR,
	SPOE_DATA_T_BIN,
	SPOE_DATA_TYPES
};
401
/* Masks to get data type or flags value from the type byte of a typed data:
 * the low nibble is the type, the high nibble carries the flags */
#define SPOE_DATA_T_MASK  0x0F
#define SPOE_DATA_FL_MASK 0xF0

/* Flags to set Boolean values (the value of a boolean is stored in the flags,
 * no data follows the type byte) */
#define SPOE_DATA_FL_FALSE 0x00
#define SPOE_DATA_FL_TRUE  0x10

/* Helper to get static string length, excluding the terminating null byte.
 * It relies on sizeof and must therefore only be used on string literals or
 * arrays, never on pointers. */
#define SLEN(str) (sizeof(str)-1)

/* Predefined keys used in HELLO/DISCONNECT frames */
#define SUPPORTED_VERSIONS_KEY "supported-versions"
#define VERSION_KEY            "version"
#define MAX_FRAME_SIZE_KEY     "max-frame-size"
#define CAPABILITIES_KEY       "capabilities"
#define HEALTHCHECK_KEY        "healthcheck"
#define STATUS_CODE_KEY        "status-code"
#define MSG_KEY                "message"
421
/* Describe a supported SPOE version as a range. <min> and <max> use the same
 * unit as the value returned by decode_spoe_version(), i.e. the version
 * multiplied by 1000 ("1.0" <=> 1000). */
struct spoe_version {
	char *str; /* Version as a string, NULL for the end-of-table marker */
	int min;   /* Lowest accepted version (inclusive) */
	int max;   /* Highest accepted version (inclusive) */
};
427
/* All supported versions. The table is terminated by an entry whose <str> is
 * NULL. */
static struct spoe_version supported_versions[] = {
	{"1.0", 1000, 1000},
	{NULL, 0, 0}
};

/* Comma-separated list of supported versions, sent in the HAPROXY-HELLO frame.
 * It must be kept in sync with the table above. */
#define SUPPORTED_VERSIONS_VAL "1.0"

/* Comma-separated list of supported capabilities (none for now) */
#define CAPABILITIES_VAL ""
439
/* Decode the version found in the <len> bytes starting at <str> (the string
 * does not need to be NUL-terminated). Leading and trailing white spaces are
 * ignored. On success, the version multiplied by 1000 is returned ("1.0" gives
 * 1000). -1 is returned if the string is not a number, if something else than
 * white spaces follows the number, or if the number is not a sane version:
 * zero, negative, NaN, infinite or >= 1000000 (such values cannot be converted
 * to an int without risking undefined behavior). */
static int
decode_spoe_version(const char *str, size_t len)
{
	char tmp[len+1], *start, *end;
	double d;
	int vsn = -1;

	/* strtod() needs a NUL-terminated string, so work on a copy */
	memcpy(tmp, str, len);
	tmp[len] = 0;

	/* <ctype.h> functions must not be called with a negative value, hence
	 * the casts to unsigned char */
	start = tmp;
	while (isspace((unsigned char)*start))
		start++;

	d = strtod(start, &end);
	if (start == end)
		goto out;

	/* "!(d > 0)" also catches NaN; the upper bound keeps d * 1000 far below
	 * the limits of an int and rejects infinity */
	if (!(d > 0) || d >= 1000000)
		goto out;

	while (isspace((unsigned char)*end))
		end++;
	if (*end)
		goto out;

	vsn = (int)(d * 1000);
 out:
	return vsn;
}
468
/* Encode <i> as a variable-length integer into <buf>. Values below 240 take a
 * single byte. Otherwise the first byte holds the 4 low bits of the value with
 * its 4 high bits set, and the remainder is emitted by groups of 7 bits, each
 * byte but the last one having its high bit set. This function never fails and
 * returns the number of written bytes. The caller must provide a large enough
 * buffer. */
static int
encode_spoe_varint(uint64_t i, char *buf)
{
	unsigned char *out = (unsigned char *)buf;
	int n = 0;

	if (i >= 240) {
		/* First byte: low bits of the value, high nibble set */
		out[n++] = (unsigned char)i | 240;
		i = (i - 240) >> 4;

		/* Continuation bytes carry the high bit */
		while (i >= 128) {
			out[n++] = (unsigned char)i | 128;
			i = (i - 128) >> 7;
		}
	}

	/* Last (or only) byte */
	out[n++] = (unsigned char)i;
	return n;
}
490
/* Decode a variable-length integer from <buf> into <*i>. <end> points to the
 * first byte after the readable area (it is never dereferenced). On success,
 * the number of read bytes is returned. -1 is returned when the end of the
 * buffer is reached before the end of the integer, or when the encoding is too
 * long to fit in 64 bits. On error, <*i> may have been partially updated. */
static int
decode_spoe_varint(const char *buf, const char *end, uint64_t *i)
{
	const unsigned char *msg  = (const unsigned char *)buf;
	const unsigned char *stop = (const unsigned char *)end;
	int idx = 0;
	int shift = 4;

	/* <stop> is exclusive: nothing can be read at or after it */
	if (msg >= stop)
		return -1;

	*i = msg[0];
	if (msg[0] < 240)
		return 1;

	do {
		++idx;
		/* Stop on truncated input and before shifting by 64 bits or
		 * more, which would be undefined. A 64-bit value never needs a
		 * shift above 60. */
		if (msg + idx >= stop || shift >= 64)
			return -1;
		*i += (uint64_t)msg[idx] << shift;
		shift += 7;
	} while (msg[idx] >= 128);
	return (idx + 1);
}
516
/* Encode the <len> bytes of <str> into <dst>: the length comes first, encoded
 * as a variable-length integer, followed by the raw bytes. An empty string is
 * encoded as a single null byte. This function never fails and returns the
 * number of written bytes. The caller must provide a large enough buffer. */
static int
encode_spoe_string(const char *str, size_t len, char *dst)
{
	int hdr;

	if (len == 0) {
		*dst = 0;
		return 1;
	}

	hdr = encode_spoe_varint(len, dst);
	memcpy(dst + hdr, str, len);
	return (hdr + len);
}
534
535/* Decode a string. Its length is decoded first as a variable-length integer. If
536 * it succeeds, and if the string length is valid, the begin of the string is
537 * saved in <*str>, its length is saved in <*len> and the total numbre of bytes
538 * read is returned. If an error occurred, -1 is returned and <*str> remains
539 * NULL. */
540static int
541decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
542{
543 int i, idx = 0;
544
545 *str = NULL;
546 *len = 0;
547
548 if ((i = decode_spoe_varint(buf, end, len)) == -1)
549 goto error;
550 idx += i;
551 if (buf + idx + *len > end)
552 goto error;
553
554 *str = buf+idx;
555 return (idx + *len);
556
557 error:
558 return -1;
559}
560
561/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
562 * of bytes read is returned. A types data is composed of a type (1 byte) and
563 * corresponding data:
564 * - boolean: non additional data (0 bytes)
565 * - integers: a variable-length integer (see decode_spoe_varint)
566 * - ipv4: 4 bytes
567 * - ipv6: 16 bytes
568 * - binary and string: a buffer prefixed by its size, a variable-length
569 * integer (see decode_spoe_string) */
570static int
571skip_spoe_data(char *frame, char *end)
572{
573 uint64_t sz = 0;
574 int i, idx = 0;
575
576 if (frame > end)
577 return -1;
578
579 switch (frame[idx++] & SPOE_DATA_T_MASK) {
580 case SPOE_DATA_T_BOOL:
581 break;
582 case SPOE_DATA_T_INT32:
583 case SPOE_DATA_T_INT64:
584 case SPOE_DATA_T_UINT32:
585 case SPOE_DATA_T_UINT64:
586 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
587 return -1;
588 idx += i;
589 break;
590 case SPOE_DATA_T_IPV4:
591 idx += 4;
592 break;
593 case SPOE_DATA_T_IPV6:
594 idx += 16;
595 break;
596 case SPOE_DATA_T_STR:
597 case SPOE_DATA_T_BIN:
598 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
599 return -1;
600 idx += i + sz;
601 break;
602 }
603
604 if (frame+idx > end)
605 return -1;
606 return idx;
607}
608
609/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
610 * number of read bytes is returned. See skip_spoe_data for details. */
611static int
612decode_spoe_data(char *frame, char *end, struct sample *smp)
613{
614 uint64_t sz = 0;
615 int type, i, idx = 0;
616
617 if (frame > end)
618 return -1;
619
620 type = frame[idx++];
621 switch (type & SPOE_DATA_T_MASK) {
622 case SPOE_DATA_T_BOOL:
623 smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
624 smp->data.type = SMP_T_BOOL;
625 break;
626 case SPOE_DATA_T_INT32:
627 case SPOE_DATA_T_INT64:
628 case SPOE_DATA_T_UINT32:
629 case SPOE_DATA_T_UINT64:
630 if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1)
631 return -1;
632 idx += i;
633 smp->data.type = SMP_T_SINT;
634 break;
635 case SPOE_DATA_T_IPV4:
636 if (frame+idx+4 > end)
637 return -1;
638 memcpy(&smp->data.u.ipv4, frame+idx, 4);
639 smp->data.type = SMP_T_IPV4;
640 idx += 4;
641 break;
642 case SPOE_DATA_T_IPV6:
643 if (frame+idx+16 > end)
644 return -1;
645 memcpy(&smp->data.u.ipv6, frame+idx, 16);
646 smp->data.type = SMP_T_IPV6;
647 idx += 16;
648 break;
649 case SPOE_DATA_T_STR:
650 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
651 return -1;
652 idx += i;
653 if (frame+idx+sz > end)
654 return -1;
655 smp->data.u.str.str = frame+idx;
656 smp->data.u.str.len = sz;
657 smp->data.type = SMP_T_STR;
658 idx += sz;
659 break;
660 case SPOE_DATA_T_BIN:
661 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
662 return -1;
663 idx += i;
664 if (frame+idx+sz > end)
665 return -1;
666 smp->data.u.str.str = frame+idx;
667 smp->data.u.str.len = sz;
668 smp->data.type = SMP_T_BIN;
669 idx += sz;
670 break;
671 }
672
673 if (frame+idx > end)
674 return -1;
675 return idx;
676}
677
/* Skip an action in a frame received from an agent. If an error occurred, -1 is
 * returned, otherwise the number of read bytes is returned. An action starts
 * with 2 bytes, the action type and the number of arguments, followed by one
 * typed data per argument (see skip_spoe_data). */
static int
skip_spoe_action(char *frame, char *end)
{
	int nbargs, len, pos;

	if (frame+2 > end)
		return -1;

	/* frame[0] is the action type (ignored here), frame[1] the number of
	 * arguments */
	nbargs = frame[1];
	pos = 2;

	for (; nbargs > 0; nbargs--) {
		len = skip_spoe_data(frame+pos, end);
		if (len == -1)
			return -1;
		pos += len;
	}

	if (frame+pos > end)
		return -1;
	return pos;
}
701
702/* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on
703 * success, 0 if the frame can be ignored and -1 if an error occurred. */
704static int
705prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
706{
707 int idx = 0;
708 size_t max = (7 /* TYPE + METADATA */
709 + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
710 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
711 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL));
712
713 if (size < max)
714 return -1;
715
716 /* Frame type */
717 frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
718
719 /* No flags for now */
720 memset(frame+idx, 0, 4);
721 idx += 4;
722
723 /* No stream-id and frame-id for HELLO frames */
724 frame[idx++] = 0;
725 frame[idx++] = 0;
726
727 /* There are 3 mandatory items: "supported-versions", "max-frame-size"
728 * and "capabilities" */
729
730 /* "supported-versions" K/V item */
731 idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx);
732 frame[idx++] = SPOE_DATA_T_STR;
733 idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx);
734
735 /* "max-fram-size" K/V item */
736 idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
737 frame[idx++] = SPOE_DATA_T_UINT32;
738 idx += encode_spoe_varint(APPCTX_SPOE(appctx).max_frame_size, frame+idx);
739
740 /* "capabilities" K/V item */
741 idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
742 frame[idx++] = SPOE_DATA_T_STR;
743 idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
744
745 return idx;
746}
747
748/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame
749 * size on success, 0 if the frame can be ignored and -1 if an error
750 * occurred. */
751static int
752prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
753{
754 const char *reason;
755 int rlen, idx = 0;
756 size_t max = (7 /* TYPE + METADATA */
757 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
758 + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
759
760 if (size < max)
761 return -1;
762
763 /* Get the message corresponding to the status code */
764 if (spoe_status_code >= SPOE_FRM_ERRS)
765 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
766 reason = spoe_frm_err_reasons[spoe_status_code];
767 rlen = strlen(reason);
768
769 /* Frame type */
770 frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
771
772 /* No flags for now */
773 memset(frame+idx, 0, 4);
774 idx += 4;
775
776 /* No stream-id and frame-id for DISCONNECT frames */
777 frame[idx++] = 0;
778 frame[idx++] = 0;
779
780 /* There are 2 mandatory items: "status-code" and "message" */
781
782 /* "status-code" K/V item */
783 idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx);
784 frame[idx++] = SPOE_DATA_T_UINT32;
785 idx += encode_spoe_varint(spoe_status_code, frame+idx);
786
787 /* "message" K/V item */
788 idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx);
789 frame[idx++] = SPOE_DATA_T_STR;
790 idx += encode_spoe_string(reason, rlen, frame+idx);
791
792 return idx;
793}
794
795/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
796 * success, 0 if the frame can be ignored and -1 if an error occurred. */
797static int
798prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
799{
800 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
801 int idx = 0;
802
803 if (size < APPCTX_SPOE(appctx).max_frame_size)
804 return -1;
805
806 frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
807
808 /* No flags for now */
809 memset(frame+idx, 0, 4);
810 idx += 4;
811
812 /* Set stream-id and frame-id */
813 idx += encode_spoe_varint(ctx->stream_id, frame+idx);
814 idx += encode_spoe_varint(ctx->frame_id, frame+idx);
815
816 /* Copy encoded messages */
817 memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
818 idx += ctx->buffer->i;
819
820 return idx;
821}
822
823/* Decode HELLO frame sent by an agent. It returns the number of by read bytes
824 * on success, 0 if the frame can be ignored and -1 if an error occurred. */
825static int
826handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
827{
828 int vsn, max_frame_size;
829 int i, idx = 0;
830 size_t min_size = (7 /* TYPE + METADATA */
831 + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
832 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
833 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
834
835 /* Check frame type */
836 if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
837 return 0;
838
839 if (size < min_size) {
840 spoe_status_code = SPOE_FRM_ERR_INVALID;
841 return -1;
842 }
843
844 /* Skip flags: fragmentation is not supported for now */
845 idx += 4;
846
847 /* stream-id and frame-id must be cleared */
848 if (frame[idx] != 0 || frame[idx+1] != 0) {
849 spoe_status_code = SPOE_FRM_ERR_INVALID;
850 return -1;
851 }
852 idx += 2;
853
854 /* There are 3 mandatory items: "version", "max-frame-size" and
855 * "capabilities" */
856
857 /* Loop on K/V items */
858 vsn = max_frame_size = 0;
859 while (idx < size) {
860 char *str;
861 uint64_t sz;
862
863 /* Decode the item key */
864 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
865 if (str == NULL) {
866 spoe_status_code = SPOE_FRM_ERR_INVALID;
867 return -1;
868 }
869 /* Check "version" K/V item */
870 if (!memcmp(str, VERSION_KEY, sz)) {
871 /* The value must be a string */
872 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
873 spoe_status_code = SPOE_FRM_ERR_INVALID;
874 return -1;
875 }
876 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
877 if (str == NULL) {
878 spoe_status_code = SPOE_FRM_ERR_INVALID;
879 return -1;
880 }
881
882 vsn = decode_spoe_version(str, sz);
883 if (vsn == -1) {
884 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
885 return -1;
886 }
887 for (i = 0; supported_versions[i].str != NULL; ++i) {
888 if (vsn >= supported_versions[i].min &&
889 vsn <= supported_versions[i].max)
890 break;
891 }
892 if (supported_versions[i].str == NULL) {
893 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
894 return -1;
895 }
896 }
897 /* Check "max-frame-size" K/V item */
898 else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
899 int type;
900
901 /* The value must be integer */
902 type = frame[idx++];
903 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
904 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
905 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
906 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
907 spoe_status_code = SPOE_FRM_ERR_INVALID;
908 return -1;
909 }
910 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
911 spoe_status_code = SPOE_FRM_ERR_INVALID;
912 return -1;
913 }
914 idx += i;
915 if (sz < MIN_FRAME_SIZE || sz > APPCTX_SPOE(appctx).max_frame_size) {
916 spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
917 return -1;
918 }
919 max_frame_size = sz;
920 }
921 /* Skip "capabilities" K/V item for now */
922 else {
923 /* Silently ignore unknown item */
924 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
925 spoe_status_code = SPOE_FRM_ERR_INVALID;
926 return -1;
927 }
928 idx += i;
929 }
930 }
931
932 /* Final checks */
933 if (!vsn) {
934 spoe_status_code = SPOE_FRM_ERR_NO_VSN;
935 return -1;
936 }
937 if (!max_frame_size) {
938 spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
939 return -1;
940 }
941
942 APPCTX_SPOE(appctx).version = (unsigned int)vsn;
943 APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
944 return idx;
945}
946
947/* Decode DISCONNECT frame sent by an agent. It returns the number of by read
948 * bytes on success, 0 if the frame can be ignored and -1 if an error
949 * occurred. */
950static int
951handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
952{
953 int i, idx = 0;
954 size_t min_size = (7 /* TYPE + METADATA */
955 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
956 + 1 + SLEN(MSG_KEY) + 1 + 1);
957
958 /* Check frame type */
959 if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
960 return 0;
961
962 if (size < min_size) {
963 spoe_status_code = SPOE_FRM_ERR_INVALID;
964 return -1;
965 }
966
967 /* Skip flags: fragmentation is not supported for now */
968 idx += 4;
969
970 /* stream-id and frame-id must be cleared */
971 if (frame[idx] != 0 || frame[idx+1] != 0) {
972 spoe_status_code = SPOE_FRM_ERR_INVALID;
973 return -1;
974 }
975 idx += 2;
976
977 /* There are 2 mandatory items: "status-code" and "message" */
978
979 /* Loop on K/V items */
980 while (idx < size) {
981 char *str;
982 uint64_t sz;
983
984 /* Decode the item key */
985 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
986 if (str == NULL) {
987 spoe_status_code = SPOE_FRM_ERR_INVALID;
988 return -1;
989 }
990
991 /* Check "status-code" K/V item */
992 if (!memcmp(str, STATUS_CODE_KEY, sz)) {
993 int type;
994
995 /* The value must be an integer */
996 type = frame[idx++];
997 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
998 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
999 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
1000 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
1001 spoe_status_code = SPOE_FRM_ERR_INVALID;
1002 return -1;
1003 }
1004 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
1005 spoe_status_code = SPOE_FRM_ERR_INVALID;
1006 return -1;
1007 }
1008 idx += i;
1009 spoe_status_code = sz;
1010 }
1011
1012 /* Check "message" K/V item */
1013 else if (sz && !memcmp(str, MSG_KEY, sz)) {
1014 /* The value must be a string */
1015 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1016 spoe_status_code = SPOE_FRM_ERR_INVALID;
1017 return -1;
1018 }
1019 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1020 if (str == NULL || sz > 255) {
1021 spoe_status_code = SPOE_FRM_ERR_INVALID;
1022 return -1;
1023 }
1024 memcpy(spoe_reason, str, sz);
1025 spoe_reason[sz] = 0;
1026 }
1027 else {
1028 /* Silently ignore unknown item */
1029 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1030 spoe_status_code = SPOE_FRM_ERR_INVALID;
1031 return -1;
1032 }
1033 idx += i;
1034 }
1035 }
1036
1037 return idx;
1038}
1039
1040
1041/* Decode ACK frame sent by an agent. It returns the number of by read bytes on
1042 * success, 0 if the frame can be ignored and -1 if an error occurred. */
1043static int
1044handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
1045{
1046 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1047 uint64_t stream_id, frame_id;
1048 int idx = 0;
1049 size_t min_size = (7 /* TYPE + METADATA */);
1050
1051 /* Check frame type */
1052 if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
1053 return 0;
1054
1055 if (size < min_size) {
1056 spoe_status_code = SPOE_FRM_ERR_INVALID;
1057 return -1;
1058 }
1059
1060 /* Skip flags: fragmentation is not supported for now */
1061 idx += 4;
1062
1063 /* Get the stream-id and the frame-id */
1064 idx += decode_spoe_varint(frame+idx, frame+size, &stream_id);
1065 idx += decode_spoe_varint(frame+idx, frame+size, &frame_id);
1066
1067 /* Check stream-id and frame-id */
1068 if (ctx->stream_id != (unsigned int)stream_id ||
1069 ctx->frame_id != (unsigned int)frame_id)
1070 return 0;
1071
1072 /* Copy encoded actions */
1073 b_reset(ctx->buffer);
1074 memcpy(ctx->buffer->p, frame+idx, size-idx);
1075 ctx->buffer->i = size-idx;
1076
1077 return idx;
1078}
1079
Christopher Fauletba7bc162016-11-07 21:07:38 +01001080/* This function is used in cfgparse.c and declared in proto/checks.h. It
1081 * prepare the request to send to agents during a healthcheck. It returns 0 on
1082 * success and -1 if an error occurred. */
1083int
1084prepare_spoe_healthcheck_request(char **req, int *len)
1085{
1086 struct appctx a;
1087 char *frame, buf[global.tune.bufsize];
1088 unsigned int framesz;
1089 int idx;
1090
1091 memset(&a, 0, sizeof(a));
1092 memset(buf, 0, sizeof(buf));
1093 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1094
1095 frame = buf+4;
1096 idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4);
1097 if (idx <= 0)
1098 return -1;
1099 if (idx + SLEN(HEALTHCHECK_KEY) + 1 > global.tune.bufsize-4)
1100 return -1;
1101
1102 /* "healthcheck" K/V item */
1103 idx += encode_spoe_string(HEALTHCHECK_KEY, SLEN(HEALTHCHECK_KEY), frame+idx);
1104 frame[idx++] = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE);
1105
1106 framesz = htonl(idx);
1107 memcpy(buf, (char *)&framesz, 4);
1108
1109 if ((*req = malloc(idx+4)) == NULL)
1110 return -1;
1111 memcpy(*req, buf, idx+4);
1112 *len = idx+4;
1113 return 0;
1114}
1115
1116/* This function is used in checks.c and declared in proto/checks.h. It decode
1117 * the response received from an agent during a healthcheck. It returns 0 on
1118 * success and -1 if an error occurred. */
1119int
1120handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen)
1121{
1122 struct appctx a;
1123 int r;
1124
1125 memset(&a, 0, sizeof(a));
1126 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1127
1128 if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0)
1129 goto error;
1130 if ((r = handle_spoe_agenthello_frame(&a, frame, size)) <= 0) {
1131 if (r == 0)
1132 spoe_status_code = SPOE_FRM_ERR_INVALID;
1133 goto error;
1134 }
1135
1136 return 0;
1137
1138 error:
1139 if (spoe_status_code >= SPOE_FRM_ERRS)
1140 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
1141 strncpy(err, spoe_frm_err_reasons[spoe_status_code], errlen);
1142 return -1;
1143}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001144
1145/********************************************************************
1146 * Functions that manage the SPOE applet
1147 ********************************************************************/
1148/* Callback function that catches applet timeouts. If a timeout occurred, we set
1149 * <appctx->st1> flag and the SPOE applet is woken up. */
1150static struct task *
1151process_spoe_applet(struct task * task)
1152{
1153 struct appctx *appctx = task->context;
1154
1155 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1156 if (tick_is_expired(task->expire, now_ms)) {
1157 task->expire = TICK_ETERNITY;
1158 appctx->st1 = SPOE_APPCTX_ERR_TOUT;
1159 }
1160 si_applet_want_get(appctx->owner);
1161 appctx_wakeup(appctx);
1162 return task;
1163}
1164
1165/* Remove a SPOE applet from the agent cache */
1166static void
1167remove_spoe_applet_from_cache(struct appctx *appctx)
1168{
1169 struct appctx *a, *back;
1170 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1171
1172 if (LIST_ISEMPTY(&agent->cache))
1173 return;
1174
1175 list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) {
1176 if (a == appctx) {
1177 LIST_DEL(&APPCTX_SPOE(appctx).list);
1178 break;
1179 }
1180 }
1181}
1182
1183
1184/* Callback function that releases a SPOE applet. This happens when the
1185 * connection with the agent is closed. */
1186static void
1187release_spoe_applet(struct appctx *appctx)
1188{
1189 struct stream_interface *si = appctx->owner;
1190 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1191 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1192
1193 if (appctx->st0 == SPOE_APPCTX_ST_CONNECT ||
1194 appctx->st0 == SPOE_APPCTX_ST_CONNECTING)
1195 on_new_spoe_appctx_failure(agent);
1196
1197 if (appctx->st0 != SPOE_APPCTX_ST_END) {
1198 si_shutw(si);
1199 si_shutr(si);
1200 si_ic(si)->flags |= CF_READ_NULL;
1201 appctx->st0 = SPOE_APPCTX_ST_END;
1202 }
1203
1204 if (ctx != NULL) {
1205 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1206 ctx->appctx = NULL;
1207 }
1208
1209 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
1210 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1211 __FUNCTION__, appctx);
1212
1213 /* Release the task attached to the SPOE applet */
1214 if (APPCTX_SPOE(appctx).task) {
1215 task_delete(APPCTX_SPOE(appctx).task);
1216 task_free(APPCTX_SPOE(appctx).task);
1217 }
1218
1219 /* And remove it from the agent cache */
1220 remove_spoe_applet_from_cache(appctx);
1221 APPCTX_SPOE(appctx).ctx = NULL;
1222}
1223
1224/* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when
1225 * the frame can be ignored, 0 to retry later and 1 on success. The frame is
1226 * encoded using the callback function <prepare>. */
1227static int
1228send_spoe_frame(struct appctx *appctx,
1229 int (*prepare)(struct appctx *, char *, size_t))
1230{
1231 struct stream_interface *si = appctx->owner;
1232 int framesz, ret;
1233 uint32_t netint;
1234
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001235 if (si_ic(si)->buf->size == 0)
1236 return -1;
1237
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001238 ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
1239 if (ret <= 0)
1240 goto skip_or_error;
1241 framesz = ret;
1242 netint = htonl(framesz);
1243 ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint));
1244 if (ret > 0)
1245 ret = bi_putblk(si_ic(si), trash.str, framesz);
1246 if (ret <= 0) {
1247 if (ret == -1)
1248 return -1;
1249 return -2;
1250 }
1251 return 1;
1252
1253 skip_or_error:
1254 if (!ret)
1255 return -1;
1256 return -2;
1257}
1258
1259/* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1
1260 * when the frame can be ignored, 0 to retry later and 1 on success. The frame
1261 * is decoded using the callback function <handle>. */
1262static int
1263recv_spoe_frame(struct appctx *appctx,
1264 int (*handle)(struct appctx *, char *, size_t))
1265{
1266 struct stream_interface *si = appctx->owner;
1267 int framesz, ret;
1268 uint32_t netint;
1269
1270 ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0);
1271 if (ret <= 0)
1272 goto empty_or_error;
1273 framesz = ntohl(netint);
1274 if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
1275 spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
1276 return -2;
1277 }
1278
1279 ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint));
1280 if (ret <= 0)
1281 goto empty_or_error;
1282 bo_skip(si_oc(si), ret+sizeof(netint));
1283
1284 /* First check if the received frame is a DISCONNECT frame */
1285 ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz);
1286 if (ret != 0) {
1287 if (ret > 0) {
1288 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1289 " - disconnected by peer (%d): %s\n",
1290 (int)now.tv_sec, (int)now.tv_usec,
1291 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1292 __FUNCTION__, appctx, spoe_status_code,
1293 spoe_reason);
1294 return 2;
1295 }
1296 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1297 " - error on frame (%s)\n",
1298 (int)now.tv_sec, (int)now.tv_usec,
1299 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1300 __FUNCTION__, appctx,
1301 spoe_frm_err_reasons[spoe_status_code]);
1302 return -2;
1303 }
1304 if (handle == NULL)
1305 goto out;
1306
1307 /* If not, try to decode it */
1308 ret = handle(appctx, trash.str, framesz);
1309 if (ret <= 0) {
1310 if (!ret)
1311 return -1;
1312 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1313 " - error on frame (%s)\n",
1314 (int)now.tv_sec, (int)now.tv_usec,
1315 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1316 __FUNCTION__, appctx,
1317 spoe_frm_err_reasons[spoe_status_code]);
1318 return -2;
1319 }
1320 out:
1321 return 1;
1322
1323 empty_or_error:
1324 if (!ret)
1325 return 0;
1326 spoe_status_code = SPOE_FRM_ERR_IO;
1327 return -2;
1328}
1329
/* I/O Handler processing messages exchanged with the agent.
 *
 * This is a state machine driven by <appctx->st0>:
 *   - CONNECT:       send the HAPROXY-HELLO frame, then move to CONNECTING
 *   - CONNECTING:    wait for the agent HELLO frame, then move to PROCESSING
 *   - PROCESSING:    send a NOTIFY frame for the attached SPOE context and
 *                    wait for the matching ACK frame, or just drain frames
 *                    when no context is attached
 *   - DISCONNECT:    send the HAPROXY-DISCONNECT frame, then move to
 *                    DISCONNECTING
 *   - DISCONNECTING: wait for a frame from the agent, an error, a timeout or a
 *                    closed stream interface, then move to EXIT
 *   - EXIT:          shut the stream interface down and move to END
 *   - END:           nothing more to do
 * <appctx->st1> carries the timeout flag set by process_spoe_applet().
 *
 * Every failure happening before the end of the handshake is reported with
 * on_new_spoe_appctx_failure() to wake up the streams waiting for an applet. */
static void
handle_spoe_applet(struct appctx *appctx)
{
	struct stream_interface *si = appctx->owner;
	struct stream *s = si_strm(si);
	struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
	struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
	int ret;

  /* Re-evaluated each time <appctx->st0> is changed by a state below */
  switchstate:
	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
		    " - appctx-state=%s\n",
		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
		    __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);

	switch (appctx->st0) {
		case SPOE_APPCTX_ST_CONNECT:
			spoe_status_code = SPOE_FRM_ERR_NONE;
			if (si->state <= SI_ST_CON) {
				/* Not established yet: wake the stream up and
				 * wait to be called again */
				si_applet_want_put(si);
				task_wakeup(s->task, TASK_WOKEN_MSG);
				break;
			}
			else if (si->state != SI_ST_EST) {
				/* Any state above SI_ST_CON other than
				 * SI_ST_EST is handled as a connection
				 * failure */
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame);
			if (ret < 0) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			else if (!ret)
				goto full;

			/* Hello frame was sent. Set the hello timeout and
			 * wait for the reply. */
			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
			appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
			/* fall through */

		case SPOE_APPCTX_ST_CONNECTING:
			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
				/* The hello timeout expired */
				SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
					    " - Connection timed out\n",
					    (int)now.tv_sec, (int)now.tv_usec,
					    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
					    __FUNCTION__, appctx);
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame);
			if (ret < 0) {
				/* Ignored or invalid frame: tell it to the
				 * agent with a DISCONNECT frame */
				appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			if (ret == 2) {
				/* The agent sent a DISCONNECT frame */
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			if (!ret)
				goto out;

			/* hello handshake is finished, set the idle timeout,
			 * Add the appctx in the agent cache, decrease the
			 * number of new applets and wake up waiting streams. */
			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
			on_new_spoe_appctx_success(agent, appctx);
			break;

		case SPOE_APPCTX_ST_PROCESSING:
			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
				/* The idle timeout expired: disconnect from
				 * the agent and consume the timeout flag */
				spoe_status_code = SPOE_FRM_ERR_TOUT;
				appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
				appctx->st1 = SPOE_APPCTX_ERR_NONE;
				goto switchstate;
			}
			/* A context is attached and has messages to send */
			if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
				ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame);
				if (ret < 0) {
					if (ret == -1) {
						/* The frame was not sent but
						 * the applet is kept: only the
						 * context is set in error */
						ctx->state = SPOE_CTX_ST_ERROR;
						task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
						goto skip_notify_frame;
					}
					appctx->st0 = SPOE_APPCTX_ST_EXIT;
					goto switchstate;
				}
				else if (!ret)
					goto full;
				ctx->state = SPOE_CTX_ST_WAITING_ACK;
				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			}

		  skip_notify_frame:
			/* A context is attached and waits for its ACK frame */
			if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) {
				ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame);
				if (ret < 0) {
					/* -1: the frame was ignored, try to
					 * read the next one */
					if (ret == -1)
						goto skip_notify_frame;
					ctx->state = SPOE_CTX_ST_ERROR;
					task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
					goto switchstate;
				}
				if (!ret)
					goto out;
				if (ret == 2) {
					/* The agent sent a DISCONNECT frame */
					ctx->state = SPOE_CTX_ST_ERROR;
					task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
					appctx->st0 = SPOE_APPCTX_ST_EXIT;
					goto switchstate;
				}
				/* ACK received: the stream can process the
				 * actions; rearm the idle timeout */
				ctx->state = SPOE_CTX_ST_DONE;
				task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			}
			else {
				/* No frame is expected here. When HAProxy is
				 * stopping, disconnect from the agent */
				if (stopping) {
					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
					goto switchstate;
				}

				/* Only DISCONNECT frames are checked, because
				 * no decoder is provided */
				ret = recv_spoe_frame(appctx, NULL);
				if (ret < 0) {
					if (ret == -1)
						goto skip_notify_frame;
					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
					goto switchstate;
				}
				if (!ret)
					goto out;
				if (ret == 2) {
					appctx->st0 = SPOE_APPCTX_ST_EXIT;
					goto switchstate;
				}
				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			}
			break;

		case SPOE_APPCTX_ST_DISCONNECT:
			ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame);
			if (ret < 0) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			else if (!ret)
				goto full;
			SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
				    " - disconnected by HAProxy (%d): %s\n",
				    (int)now.tv_sec, (int)now.tv_usec,
				    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
				    __FUNCTION__, appctx, spoe_status_code,
				    spoe_frm_err_reasons[spoe_status_code]);

			/* The idle timeout bounds the wait for the agent
			 * reply */
			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
			/* fall through */

		case SPOE_APPCTX_ST_DISCONNECTING:
			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			/* Leave on error or when the agent sent its own
			 * DISCONNECT frame; else keep waiting */
			ret = recv_spoe_frame(appctx, NULL);
			if (ret < 0 || ret == 2) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			break;

		case SPOE_APPCTX_ST_EXIT:
			/* Close both directions and disarm the timer */
			si_shutw(si);
			si_shutr(si);
			si_ic(si)->flags |= CF_READ_NULL;
			appctx->st0 = SPOE_APPCTX_ST_END;
			APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
			/* fall through */

		case SPOE_APPCTX_ST_END:
			return;
	}

  /* Common exit: requeue the timeout task if a timer is armed and wake the
   * stream owning the applet up */
  out:
	if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
		task_queue(APPCTX_SPOE(appctx).task);
	si_oc(si)->flags |= CF_READ_DONTWAIT;
	task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
	return;
  /* Reached when send_spoe_frame() returns 0: flag the stream interface as
   * unable to receive more data for now */
  full:
	si_applet_cant_put(si);
	goto out;
}
1543
1544struct applet spoe_applet = {
1545 .obj_type = OBJ_TYPE_APPLET,
1546 .name = "<SPOE>", /* used for logging */
1547 .fct = handle_spoe_applet,
1548 .release = release_spoe_applet,
1549};
1550
1551/* Create a SPOE applet. On success, the created applet is returned, else
1552 * NULL. */
1553static struct appctx *
1554create_spoe_appctx(struct spoe_config *conf)
1555{
1556 struct appctx *appctx;
1557 struct session *sess;
1558 struct task *task;
1559 struct stream *strm;
1560 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
1561 struct listener *, by_fe);
1562
1563 if ((appctx = appctx_new(&spoe_applet)) == NULL)
1564 goto out_error;
1565
1566 appctx->st0 = SPOE_APPCTX_ST_CONNECT;
1567 if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
1568 goto out_free_appctx;
1569 APPCTX_SPOE(appctx).task->process = process_spoe_applet;
1570 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1571 APPCTX_SPOE(appctx).task->context = appctx;
1572 APPCTX_SPOE(appctx).agent = conf->agent;
1573 APPCTX_SPOE(appctx).ctx = NULL;
1574 APPCTX_SPOE(appctx).version = 0;
1575 APPCTX_SPOE(appctx).max_frame_size = global.tune.bufsize;
1576 task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
1577
1578 sess = session_new(&conf->agent_fe, l, &appctx->obj_type);
1579 if (!sess)
1580 goto out_free_spoe;
1581
1582 if ((task = task_new()) == NULL)
1583 goto out_free_sess;
1584
1585 if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
1586 goto out_free_task;
1587
1588 strm->target = sess->listener->default_target;
1589 strm->req.analysers |= sess->listener->analysers;
1590 stream_set_backend(strm, conf->agent->b.be);
1591
1592 /* applet is waiting for data */
1593 si_applet_cant_get(&strm->si[0]);
1594 appctx_wakeup(appctx);
1595
Christopher Faulet48026722016-11-16 15:01:12 +01001596 /* Increase the per-process number of cumulated connections */
1597 if (conf->agent->cps_max > 0)
1598 update_freq_ctr(&conf->agent->conn_per_sec, 1);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001599
1600 strm->do_log = NULL;
1601 strm->res.flags |= CF_READ_DONTWAIT;
1602
1603 conf->agent_fe.feconn++;
1604 jobs++;
1605 totalconn++;
1606
1607 return appctx;
1608
1609 /* Error unrolling */
1610 out_free_task:
1611 task_free(task);
1612 out_free_sess:
1613 session_free(sess);
1614 out_free_spoe:
1615 task_free(APPCTX_SPOE(appctx).task);
1616 out_free_appctx:
1617 appctx_free(appctx);
1618 out_error:
1619 return NULL;
1620}
1621
1622/* Wake up a SPOE applet attached to a SPOE context. */
1623static void
1624wakeup_spoe_appctx(struct spoe_context *ctx)
1625{
1626 if (ctx->appctx == NULL)
1627 return;
1628 if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) {
1629 si_applet_want_get(ctx->appctx->owner);
1630 si_applet_want_put(ctx->appctx->owner);
1631 appctx_wakeup(ctx->appctx);
1632 }
1633}
1634
1635
1636/* Run across the list of pending streams waiting for a SPOE applet and wake the
1637 * first. */
1638static void
1639offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx)
1640{
1641 struct spoe_context *ctx;
1642
Christopher Fauletf7a30922016-11-10 15:04:51 +01001643 if (!appctx || appctx->st0 > SPOE_APPCTX_ST_PROCESSING)
1644 return;
1645
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001646 if (LIST_ISEMPTY(&agent->applet_wq))
1647 LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
1648 else {
1649 ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait);
1650 APPCTX_SPOE(appctx).ctx = ctx;
1651 ctx->appctx = appctx;
1652 LIST_DEL(&ctx->applet_wait);
1653 LIST_INIT(&ctx->applet_wait);
1654 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1655 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1656 " - wake up stream to get available SPOE applet\n",
1657 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1658 __FUNCTION__, ctx->strm);
1659 }
1660}
1661
1662/* A failure occurred during SPOE applet creation. */
1663static void
1664on_new_spoe_appctx_failure(struct spoe_agent *agent)
1665{
1666 struct spoe_context *ctx;
1667
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001668 list_for_each_entry(ctx, &agent->applet_wq, applet_wait) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001669 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1670 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1671 " - wake up stream because to SPOE applet connection failed\n",
1672 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1673 __FUNCTION__, ctx->strm);
1674 }
1675}
1676
/* Report that the new SPOE applet <appctx> is ready to be used: it is simply
 * offered to the streams waiting for an applet, or cached (see
 * offer_spoe_appctx()). */
static void
on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx)
{
	offer_spoe_appctx(agent, appctx);
}

1682/* Retrieve a SPOE applet from the agent cache if possible, else create it. It
1683 * returns 1 on success, 0 to retry later and -1 if an error occurred. */
1684static int
1685acquire_spoe_appctx(struct spoe_context *ctx, int dir)
1686{
1687 struct spoe_config *conf = FLT_CONF(ctx->filter);
1688 struct spoe_agent *agent = conf->agent;
1689 struct appctx *appctx;
1690
1691 /* If a process is already started for this SPOE context, retry
1692 * later. */
1693 if (ctx->flags & SPOE_CTX_FL_PROCESS)
1694 goto wait;
1695
1696 /* If needed, initialize the buffer that will be used to encode messages
1697 * and decode actions. */
1698 if (ctx->buffer == &buf_empty) {
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001699 if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
1700 LIST_DEL(&ctx->buffer_wait.list);
1701 LIST_INIT(&ctx->buffer_wait.list);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001702 }
1703
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001704 if (!b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs)) {
1705 LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001706 goto wait;
1707 }
1708 }
1709
1710 /* If the SPOE applet was already set, all is done. */
1711 if (ctx->appctx)
1712 goto success;
1713
1714 /* Else try to retrieve it from the agent cache */
1715 if (!LIST_ISEMPTY(&agent->cache)) {
1716 appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list);
1717 LIST_DEL(&APPCTX_SPOE(appctx).list);
1718 APPCTX_SPOE(appctx).ctx = ctx;
1719 ctx->appctx = appctx;
1720 goto success;
1721 }
1722
Christopher Faulet48026722016-11-16 15:01:12 +01001723 /* If there is no server up for the agent's backend, this is an
1724 * error. */
1725 if (!agent->b.be->srv_act && !agent->b.be->srv_bck)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001726 goto error;
1727
1728 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1729 " - waiting for available SPOE appctx\n",
1730 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
1731 ctx->strm);
1732
1733 /* Else add the stream in the waiting queue. */
1734 if (LIST_ISEMPTY(&ctx->applet_wait))
1735 LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait);
1736
1737 /* Finally, create new SPOE applet if we can */
Christopher Faulet48026722016-11-16 15:01:12 +01001738 if (agent->cps_max > 0) {
1739 if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0))
1740 goto wait;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001741 }
Christopher Faulet48026722016-11-16 15:01:12 +01001742 if (create_spoe_appctx(conf) == NULL)
1743 goto error;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001744
1745 wait:
1746 return 0;
1747
1748 success:
1749 /* Remove the stream from the waiting queue */
1750 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1751 LIST_DEL(&ctx->applet_wait);
1752 LIST_INIT(&ctx->applet_wait);
1753 }
1754
1755 /* Set the right flag to prevent request and response processing
1756 * in same time. */
1757 ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
1758 ? SPOE_CTX_FL_REQ_PROCESS
1759 : SPOE_CTX_FL_RSP_PROCESS);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001760
1761 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1762 " - acquire SPOE appctx %p from cache\n",
1763 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1764 __FUNCTION__, ctx->strm, ctx->appctx);
1765 return 1;
1766
1767 error:
1768 /* Remove the stream from the waiting queue */
1769 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1770 LIST_DEL(&ctx->applet_wait);
1771 LIST_INIT(&ctx->applet_wait);
1772 }
1773
1774 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
Christopher Faulet48026722016-11-16 15:01:12 +01001775 " - failed to acquire SPOE appctx\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001776 (int)now.tv_sec, (int)now.tv_usec, agent->id,
Christopher Faulet48026722016-11-16 15:01:12 +01001777 __FUNCTION__, ctx->strm);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001778 send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n");
1779
1780 return -1;
1781}
1782
1783/* Release a SPOE applet and push it in the agent cache. */
1784static void
1785release_spoe_appctx(struct spoe_context *ctx)
1786{
1787 struct spoe_config *conf = FLT_CONF(ctx->filter);
1788 struct spoe_agent *agent = conf->agent;
1789 struct appctx *appctx = ctx->appctx;
1790
1791 /* Reset the flag to allow next processing */
1792 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
1793
Christopher Fauletf7a30922016-11-10 15:04:51 +01001794 /* Reset processing timer */
1795 ctx->process_exp = TICK_ETERNITY;
1796
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001797 /* Release the buffer if needed */
1798 if (ctx->buffer != &buf_empty) {
1799 b_free(&ctx->buffer);
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001800 offer_buffers(ctx, tasks_run_queue + applets_active_queue);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001801 }
1802
1803 /* If there is no SPOE applet, all is done */
1804 if (!appctx)
1805 return;
1806
1807 /* Else, reassign it or push it in the agent cache */
1808 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1809 " - release SPOE appctx %p\n",
1810 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1811 __FUNCTION__, ctx->strm, appctx);
1812
1813 APPCTX_SPOE(appctx).ctx = NULL;
1814 ctx->appctx = NULL;
1815 offer_spoe_appctx(agent, appctx);
1816}
1817
1818/***************************************************************************
1819 * Functions that process SPOE messages and actions
1820 **************************************************************************/
1821/* Process SPOE messages for a specific event. During the processing, it returns
1822 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1823 * is returned. */
1824static int
1825process_spoe_messages(struct stream *s, struct spoe_context *ctx,
1826 struct list *messages, int dir)
1827{
1828 struct spoe_message *msg;
1829 struct sample *smp;
1830 struct spoe_arg *arg;
1831 char *p;
1832 size_t max_size;
1833 int off, flag, idx = 0;
1834
1835 /* Reserve 32 bytes from the frame Metadata */
1836 max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32;
1837
1838 b_reset(ctx->buffer);
1839 p = ctx->buffer->p;
1840
1841 /* Loop on messages */
1842 list_for_each_entry(msg, messages, list) {
1843 if (idx + msg->id_len + 1 > max_size)
1844 goto skip;
1845
1846 /* Set the message name */
1847 idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
1848
1849 /* Save offset where to store the number of arguments for this
1850 * message */
1851 off = idx++;
1852 p[off] = 0;
1853
1854 /* Loop on arguments */
1855 list_for_each_entry(arg, &msg->args, list) {
1856 p[off]++; /* Increment the number of arguments */
1857
1858 if (idx + arg->name_len + 1 > max_size)
1859 goto skip;
1860
1861 /* Encode the arguement name as a string. It can by NULL */
1862 idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
1863
1864 /* Fetch the arguement value */
1865 smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
1866 if (!smp) {
1867 /* If no value is available, set it to NULL */
1868 p[idx++] = SPOE_DATA_T_NULL;
1869 continue;
1870 }
1871
1872 /* Else, encode the arguement value */
1873 switch (smp->data.type) {
1874 case SMP_T_BOOL:
1875 flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
1876 p[idx++] = (SPOE_DATA_T_BOOL | flag);
1877 break;
1878 case SMP_T_SINT:
1879 p[idx++] = SPOE_DATA_T_INT64;
1880 if (idx + 8 > max_size)
1881 goto skip;
1882 idx += encode_spoe_varint(smp->data.u.sint, p+idx);
1883 break;
1884 case SMP_T_IPV4:
1885 p[idx++] = SPOE_DATA_T_IPV4;
1886 if (idx + 4 > max_size)
1887 goto skip;
1888 memcpy(p+idx, &smp->data.u.ipv4, 4);
1889 idx += 4;
1890 break;
1891 case SMP_T_IPV6:
1892 p[idx++] = SPOE_DATA_T_IPV6;
1893 if (idx + 16 > max_size)
1894 goto skip;
1895 memcpy(p+idx, &smp->data.u.ipv6, 16);
1896 idx += 16;
1897 break;
1898 case SMP_T_STR:
1899 p[idx++] = SPOE_DATA_T_STR;
1900 if (idx + smp->data.u.str.len > max_size)
1901 goto skip;
1902 idx += encode_spoe_string(smp->data.u.str.str,
1903 smp->data.u.str.len,
1904 p+idx);
1905 break;
1906 case SMP_T_BIN:
1907 p[idx++] = SPOE_DATA_T_BIN;
1908 if (idx + smp->data.u.str.len > max_size)
1909 goto skip;
1910 idx += encode_spoe_string(smp->data.u.str.str,
1911 smp->data.u.str.len,
1912 p+idx);
1913 break;
1914 case SMP_T_METH:
1915 if (smp->data.u.meth.meth == HTTP_METH_OTHER) {
1916 p[idx++] = SPOE_DATA_T_STR;
1917 if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size)
1918 goto skip;
1919 idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name,
1920 http_known_methods[smp->data.u.meth.meth].len,
1921 p+idx);
1922 }
1923 else {
1924 p[idx++] = SPOE_DATA_T_STR;
1925 if (idx + smp->data.u.str.len > max_size)
1926 goto skip;
1927 idx += encode_spoe_string(smp->data.u.meth.str.str,
1928 smp->data.u.meth.str.len,
1929 p+idx);
1930 }
1931 break;
1932 default:
1933 p[idx++] = SPOE_DATA_T_NULL;
1934 }
1935 }
1936 }
1937 ctx->buffer->i = idx;
1938 return 1;
1939
1940 skip:
1941 b_reset(ctx->buffer);
1942 return 0;
1943}
1944
1945/* Helper function to set a variable */
1946static void
1947set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1948 struct sample *smp)
1949{
1950 struct spoe_config *conf = FLT_CONF(ctx->filter);
1951 struct spoe_agent *agent = conf->agent;
1952 char varname[64];
1953
1954 memset(varname, 0, sizeof(varname));
1955 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1956 scope, agent->var_pfx, len, name);
1957 vars_set_by_name_ifexist(varname, len, smp);
1958}
1959
1960/* Helper function to unset a variable */
1961static void
1962unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1963 struct sample *smp)
1964{
1965 struct spoe_config *conf = FLT_CONF(ctx->filter);
1966 struct spoe_agent *agent = conf->agent;
1967 char varname[64];
1968
1969 memset(varname, 0, sizeof(varname));
1970 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1971 scope, agent->var_pfx, len, name);
1972 vars_unset_by_name_ifexist(varname, len, smp);
1973}
1974
1975
1976/* Process SPOE actions for a specific event. During the processing, it returns
1977 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1978 * is returned. */
1979static int
1980process_spoe_actions(struct stream *s, struct spoe_context *ctx,
1981 enum spoe_event ev, int dir)
1982{
1983 char *p;
1984 size_t size;
1985 int off, i, idx = 0;
1986
1987 p = ctx->buffer->p;
1988 size = ctx->buffer->i;
1989
1990 while (idx < size) {
1991 char *str;
1992 uint64_t sz;
1993 struct sample smp;
1994 enum spoe_action_type type;
1995
1996 off = idx;
1997 if (idx+2 > size)
1998 goto skip;
1999
2000 type = p[idx++];
2001 switch (type) {
2002 case SPOE_ACT_T_SET_VAR: {
2003 char *scope;
2004
2005 if (p[idx++] != 3)
2006 goto skip_action;
2007
2008 switch (p[idx++]) {
2009 case SPOE_SCOPE_PROC: scope = "proc"; break;
2010 case SPOE_SCOPE_SESS: scope = "sess"; break;
2011 case SPOE_SCOPE_TXN : scope = "txn"; break;
2012 case SPOE_SCOPE_REQ : scope = "req"; break;
2013 case SPOE_SCOPE_RES : scope = "res"; break;
2014 default: goto skip;
2015 }
2016
2017 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2018 if (str == NULL)
2019 goto skip;
2020 memset(&smp, 0, sizeof(smp));
2021 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
Christopher Fauletb5cff602016-11-24 14:53:22 +01002022
2023 if ((i = decode_spoe_data(p+idx, p+size, &smp)) == -1)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002024 goto skip;
Christopher Fauletb5cff602016-11-24 14:53:22 +01002025 idx += i;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002026
2027 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2028 " - set-var '%s.%s.%.*s'\n",
2029 (int)now.tv_sec, (int)now.tv_usec,
2030 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2031 __FUNCTION__, s, scope,
2032 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2033 (int)sz, str);
2034
2035 set_spoe_var(ctx, scope, str, sz, &smp);
2036 break;
2037 }
2038
2039 case SPOE_ACT_T_UNSET_VAR: {
2040 char *scope;
2041
2042 if (p[idx++] != 2)
2043 goto skip_action;
2044
2045 switch (p[idx++]) {
2046 case SPOE_SCOPE_PROC: scope = "proc"; break;
2047 case SPOE_SCOPE_SESS: scope = "sess"; break;
2048 case SPOE_SCOPE_TXN : scope = "txn"; break;
2049 case SPOE_SCOPE_REQ : scope = "req"; break;
2050 case SPOE_SCOPE_RES : scope = "res"; break;
2051 default: goto skip;
2052 }
2053
2054 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2055 if (str == NULL)
2056 goto skip;
2057 memset(&smp, 0, sizeof(smp));
2058 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2059
2060 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2061 " - unset-var '%s.%s.%.*s'\n",
2062 (int)now.tv_sec, (int)now.tv_usec,
2063 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2064 __FUNCTION__, s, scope,
2065 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2066 (int)sz, str);
2067
2068 unset_spoe_var(ctx, scope, str, sz, &smp);
2069 break;
2070 }
2071
2072 default:
2073 skip_action:
2074 if ((i = skip_spoe_action(p+off, p+size)) == -1)
2075 goto skip;
2076 idx += i;
2077 }
2078 }
2079
2080 return 1;
2081 skip:
2082 return 0;
2083}
2084
2085
2086/* Process a SPOE event. First, this functions will process messages attached to
2087 * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
2088 * ACK frame to process corresponding actions. During all the processing, it
2089 * returns 0 and it returns 1 when the processing is finished. If an error
2090 * occurred, -1 is returned. */
2091static int
2092process_spoe_event(struct stream *s, struct spoe_context *ctx,
2093 enum spoe_event ev)
2094{
Christopher Fauletf7a30922016-11-10 15:04:51 +01002095 struct spoe_config *conf = FLT_CONF(ctx->filter);
2096 struct spoe_agent *agent = conf->agent;
2097 int dir, ret = 1;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002098
2099 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2100 " - ctx-state=%s - event=%s\n",
2101 (int)now.tv_sec, (int)now.tv_usec,
Christopher Fauletf7a30922016-11-10 15:04:51 +01002102 agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002103 spoe_event_str[ev]);
2104
Christopher Faulet48026722016-11-16 15:01:12 +01002105 if (agent->eps_max > 0) {
2106 if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
2107 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2108 " - skip event '%s': max EPS reached\n",
2109 (int)now.tv_sec, (int)now.tv_usec,
2110 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2111 goto skip;
2112 }
2113 }
2114
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002115 dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
2116
2117 if (LIST_ISEMPTY(&(ctx->messages[ev])))
2118 goto out;
2119
2120 if (ctx->state == SPOE_CTX_ST_ERROR)
2121 goto error;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002122
2123 if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
2124 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2125 " - failed to process event '%s': timeout\n",
2126 (int)now.tv_sec, (int)now.tv_usec,
2127 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2128 send_log(ctx->strm->be, LOG_WARNING,
2129 "failed to process event '%s': timeout.\n",
2130 spoe_event_str[ev]);
2131 goto error;
2132 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002133
2134 if (ctx->state == SPOE_CTX_ST_READY) {
Christopher Fauletf7a30922016-11-10 15:04:51 +01002135 if (!tick_isset(ctx->process_exp)) {
2136 ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
2137 s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
2138 ctx->process_exp);
2139 }
2140
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002141 ret = acquire_spoe_appctx(ctx, dir);
2142 if (ret <= 0) {
2143 if (!ret)
2144 goto out;
2145 goto error;
2146 }
2147 ctx->state = SPOE_CTX_ST_SENDING_MSGS;
2148 }
2149
2150 if (ctx->appctx == NULL)
2151 goto error;
2152
2153 if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
2154 ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
2155 if (ret <= 0) {
2156 if (!ret)
2157 goto skip;
2158 goto error;
2159 }
2160 wakeup_spoe_appctx(ctx);
2161 ret = 0;
2162 goto out;
2163 }
2164
2165 if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
2166 wakeup_spoe_appctx(ctx);
2167 ret = 0;
2168 goto out;
2169 }
2170
2171 if (ctx->state == SPOE_CTX_ST_DONE) {
2172 ret = process_spoe_actions(s, ctx, ev, dir);
2173 if (ret <= 0) {
2174 if (!ret)
2175 goto skip;
2176 goto error;
2177 }
2178 ctx->frame_id++;
2179 release_spoe_appctx(ctx);
2180 ctx->state = SPOE_CTX_ST_READY;
2181 }
2182
2183 out:
2184 return ret;
2185
2186 skip:
2187 release_spoe_appctx(ctx);
2188 ctx->state = SPOE_CTX_ST_READY;
2189 return 1;
2190
2191 error:
Christopher Faulet48026722016-11-16 15:01:12 +01002192 if (agent->eps_max > 0)
2193 update_freq_ctr(&agent->err_per_sec, 1);
2194
Christopher Faulet985532d2016-11-16 15:36:19 +01002195 if (agent->var_on_error) {
2196 struct sample smp;
2197
2198 memset(&smp, 0, sizeof(smp));
2199 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2200 smp.data.u.sint = 1;
2201 smp.data.type = SMP_T_BOOL;
2202
2203 set_spoe_var(ctx, "txn", agent->var_on_error,
2204 strlen(agent->var_on_error), &smp);
2205 }
2206
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002207 release_spoe_appctx(ctx);
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002208 ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
2209 ? SPOE_CTX_ST_READY
2210 : SPOE_CTX_ST_ERROR);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002211 return 1;
2212}
2213
2214
2215/***************************************************************************
2216 * Functions that create/destroy SPOE contexts
2217 **************************************************************************/
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002218static int wakeup_spoe_context(struct spoe_context *ctx)
2219{
2220 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
2221 return 1;
2222}
2223
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002224static struct spoe_context *
2225create_spoe_context(struct filter *filter)
2226{
2227 struct spoe_config *conf = FLT_CONF(filter);
2228 struct spoe_context *ctx;
2229
2230 ctx = pool_alloc_dirty(pool2_spoe_ctx);
2231 if (ctx == NULL) {
2232 return NULL;
2233 }
2234 memset(ctx, 0, sizeof(*ctx));
2235 ctx->filter = filter;
2236 ctx->state = SPOE_CTX_ST_NONE;
2237 ctx->flags = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002238 ctx->messages = conf->agent->messages;
2239 ctx->buffer = &buf_empty;
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002240 LIST_INIT(&ctx->buffer_wait.list);
2241 ctx->buffer_wait.target = ctx;
2242 ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002243 LIST_INIT(&ctx->applet_wait);
2244
Christopher Fauletf7a30922016-11-10 15:04:51 +01002245 ctx->stream_id = 0;
2246 ctx->frame_id = 1;
2247 ctx->process_exp = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002248
2249 return ctx;
2250}
2251
2252static void
2253destroy_spoe_context(struct spoe_context *ctx)
2254{
2255 if (!ctx)
2256 return;
2257
2258 if (ctx->appctx)
2259 APPCTX_SPOE(ctx->appctx).ctx = NULL;
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002260 if (!LIST_ISEMPTY(&ctx->buffer_wait.list))
2261 LIST_DEL(&ctx->buffer_wait.list);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002262 if (!LIST_ISEMPTY(&ctx->applet_wait))
2263 LIST_DEL(&ctx->applet_wait);
2264 pool_free2(pool2_spoe_ctx, ctx);
2265}
2266
2267static void
2268reset_spoe_context(struct spoe_context *ctx)
2269{
2270 ctx->state = SPOE_CTX_ST_READY;
2271 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2272}
2273
2274
2275/***************************************************************************
2276 * Hooks that manage the filter lifecycle (init/check/deinit)
2277 **************************************************************************/
2278/* Signal handler: Do a soft stop, wakeup SPOE applet */
2279static void
2280sig_stop_spoe(struct sig_handler *sh)
2281{
2282 struct proxy *p;
2283
2284 p = proxy;
2285 while (p) {
2286 struct flt_conf *fconf;
2287
2288 list_for_each_entry(fconf, &p->filter_configs, list) {
2289 struct spoe_config *conf = fconf->conf;
2290 struct spoe_agent *agent = conf->agent;
2291 struct appctx *appctx;
2292
2293 list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) {
2294 si_applet_want_get(appctx->owner);
2295 si_applet_want_put(appctx->owner);
2296 appctx_wakeup(appctx);
2297 }
2298 }
2299 p = p->next;
2300 }
2301}
2302
2303
2304/* Initialize the SPOE filter. Returns -1 on error, else 0. */
2305static int
2306spoe_init(struct proxy *px, struct flt_conf *fconf)
2307{
2308 struct spoe_config *conf = fconf->conf;
2309 struct listener *l;
2310
2311 memset(&conf->agent_fe, 0, sizeof(conf->agent_fe));
2312 init_new_proxy(&conf->agent_fe);
2313 conf->agent_fe.parent = conf->agent;
2314 conf->agent_fe.last_change = now.tv_sec;
2315 conf->agent_fe.id = conf->agent->id;
2316 conf->agent_fe.cap = PR_CAP_FE;
2317 conf->agent_fe.mode = PR_MODE_TCP;
2318 conf->agent_fe.maxconn = 0;
2319 conf->agent_fe.options2 |= PR_O2_INDEPSTR;
2320 conf->agent_fe.conn_retries = CONN_RETRIES;
2321 conf->agent_fe.accept = frontend_accept;
2322 conf->agent_fe.srv = NULL;
2323 conf->agent_fe.timeout.client = TICK_ETERNITY;
2324 conf->agent_fe.default_target = &spoe_applet.obj_type;
2325 conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
2326
2327 if ((l = calloc(1, sizeof(*l))) == NULL) {
2328 Alert("spoe_init : out of memory.\n");
2329 goto out_error;
2330 }
2331 l->obj_type = OBJ_TYPE_LISTENER;
2332 l->obj_type = OBJ_TYPE_LISTENER;
2333 l->frontend = &conf->agent_fe;
2334 l->state = LI_READY;
2335 l->analysers = conf->agent_fe.fe_req_ana;
2336 LIST_ADDQ(&conf->agent_fe.conf.listeners, &l->by_fe);
2337
2338 if (!sighandler_registered) {
2339 signal_register_fct(0, sig_stop_spoe, 0);
2340 sighandler_registered = 1;
2341 }
2342
2343 return 0;
2344
2345 out_error:
2346 return -1;
2347}
2348
2349/* Free ressources allocated by the SPOE filter. */
2350static void
2351spoe_deinit(struct proxy *px, struct flt_conf *fconf)
2352{
2353 struct spoe_config *conf = fconf->conf;
2354
2355 if (conf) {
2356 struct spoe_agent *agent = conf->agent;
2357 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
2358 struct listener *, by_fe);
2359
2360 free(l);
2361 release_spoe_agent(agent);
2362 free(conf);
2363 }
2364 fconf->conf = NULL;
2365}
2366
2367/* Check configuration of a SPOE filter for a specified proxy.
2368 * Return 1 on error, else 0. */
2369static int
2370spoe_check(struct proxy *px, struct flt_conf *fconf)
2371{
2372 struct spoe_config *conf = fconf->conf;
2373 struct proxy *target;
2374
2375 target = proxy_be_by_name(conf->agent->b.name);
2376 if (target == NULL) {
2377 Alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'"
2378 " declared at %s:%d.\n",
2379 px->id, conf->agent->b.name, conf->agent->id,
2380 conf->agent->conf.file, conf->agent->conf.line);
2381 return 1;
2382 }
2383 if (target->mode != PR_MODE_TCP) {
2384 Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
2385 " at %s:%d does not support HTTP mode.\n",
2386 px->id, target->id, conf->agent->id,
2387 conf->agent->conf.file, conf->agent->conf.line);
2388 return 1;
2389 }
2390
2391 free(conf->agent->b.name);
2392 conf->agent->b.name = NULL;
2393 conf->agent->b.be = target;
2394 return 0;
2395}
2396
2397/**************************************************************************
2398 * Hooks attached to a stream
2399 *************************************************************************/
2400/* Called when a filter instance is created and attach to a stream. It creates
2401 * the context that will be used to process this stream. */
2402static int
2403spoe_start(struct stream *s, struct filter *filter)
2404{
2405 struct spoe_context *ctx;
2406
2407 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2408 (int)now.tv_sec, (int)now.tv_usec,
2409 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2410 __FUNCTION__, s);
2411
2412 ctx = create_spoe_context(filter);
2413 if (ctx == NULL) {
2414 send_log(s->be, LOG_EMERG,
2415 "failed to create SPOE context for proxy %s\n",
2416 s->be->id);
2417 return 0;
2418 }
2419
2420 ctx->strm = s;
2421 ctx->state = SPOE_CTX_ST_READY;
2422 filter->ctx = ctx;
2423
2424 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_FE]))
2425 filter->pre_analyzers |= AN_REQ_INSPECT_FE;
2426
2427 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_BE]))
2428 filter->pre_analyzers |= AN_REQ_INSPECT_BE;
2429
2430 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_RSP]))
2431 filter->pre_analyzers |= AN_RES_INSPECT;
2432
2433 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_FE]))
2434 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE;
2435
2436 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_BE]))
2437 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE;
2438
2439 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_RSP]))
2440 filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE;
2441
2442 return 1;
2443}
2444
2445/* Called when a filter instance is detached from a stream. It release the
2446 * attached SPOE context. */
2447static void
2448spoe_stop(struct stream *s, struct filter *filter)
2449{
2450 struct spoe_context *ctx = filter->ctx;
2451
2452 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2453 (int)now.tv_sec, (int)now.tv_usec,
2454 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2455 __FUNCTION__, s);
2456
2457 if (ctx) {
2458 release_spoe_appctx(ctx);
2459 destroy_spoe_context(ctx);
2460 }
2461}
2462
Christopher Fauletf7a30922016-11-10 15:04:51 +01002463
2464/*
2465 * Called when the stream is woken up because of expired timer.
2466 */
2467static void
2468spoe_check_timeouts(struct stream *s, struct filter *filter)
2469{
2470 struct spoe_context *ctx = filter->ctx;
2471
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002472 if (tick_is_expired(ctx->process_exp, now_ms)) {
2473 s->pending_events |= TASK_WOKEN_MSG;
2474 if (ctx->buffer != &buf_empty) {
2475 b_free(&ctx->buffer);
2476 offer_buffers(ctx, tasks_run_queue + applets_active_queue);
2477 }
2478 }
Christopher Fauletf7a30922016-11-10 15:04:51 +01002479}
2480
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002481/* Called when we are ready to filter data on a channel */
2482static int
2483spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2484{
2485 struct spoe_context *ctx = filter->ctx;
2486 int ret = 1;
2487
2488 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2489 " - ctx-flags=0x%08x\n",
2490 (int)now.tv_sec, (int)now.tv_usec,
2491 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2492 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2493
2494 if (!(chn->flags & CF_ISRESP)) {
2495 if (filter->pre_analyzers & AN_REQ_INSPECT_FE)
2496 chn->analysers |= AN_REQ_INSPECT_FE;
2497 if (filter->pre_analyzers & AN_REQ_INSPECT_BE)
2498 chn->analysers |= AN_REQ_INSPECT_BE;
2499
2500 if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED)
2501 goto out;
2502
2503 ctx->stream_id = s->uniq_id;
2504 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2505 ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
2506 if (ret != 1)
2507 goto out;
2508 }
2509 ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
2510 }
2511 else {
2512 if (filter->pre_analyzers & SPOE_EV_ON_TCP_RSP)
2513 chn->analysers |= AN_RES_INSPECT;
2514
2515 if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
2516 goto out;
2517
2518 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2519 ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
2520 if (ret != 1)
2521 goto out;
2522 }
2523 ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
2524 }
2525
2526 out:
2527 if (!ret) {
2528 channel_dont_read(chn);
2529 channel_dont_close(chn);
2530 }
2531 return ret;
2532}
2533
2534/* Called before a processing happens on a given channel */
2535static int
2536spoe_chn_pre_analyze(struct stream *s, struct filter *filter,
2537 struct channel *chn, unsigned an_bit)
2538{
2539 struct spoe_context *ctx = filter->ctx;
2540 int ret = 1;
2541
2542 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2543 " - ctx-flags=0x%08x - ana=0x%08x\n",
2544 (int)now.tv_sec, (int)now.tv_usec,
2545 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2546 __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2547 ctx->flags, an_bit);
2548
2549 if (ctx->state == SPOE_CTX_ST_NONE || ctx->state == SPOE_CTX_ST_ERROR)
2550 goto out;
2551
2552 switch (an_bit) {
2553 case AN_REQ_INSPECT_FE:
2554 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
2555 break;
2556 case AN_REQ_INSPECT_BE:
2557 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
2558 break;
2559 case AN_RES_INSPECT:
2560 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP);
2561 break;
2562 case AN_REQ_HTTP_PROCESS_FE:
2563 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
2564 break;
2565 case AN_REQ_HTTP_PROCESS_BE:
2566 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
2567 break;
2568 case AN_RES_HTTP_PROCESS_FE:
2569 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
2570 break;
2571 }
2572
2573 out:
2574 if (!ret) {
2575 channel_dont_read(chn);
2576 channel_dont_close(chn);
2577 }
2578 return ret;
2579}
2580
2581/* Called when the filtering on the channel ends. */
2582static int
2583spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2584{
2585 struct spoe_context *ctx = filter->ctx;
2586
2587 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2588 " - ctx-flags=0x%08x\n",
2589 (int)now.tv_sec, (int)now.tv_usec,
2590 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2591 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2592
2593 if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
2594 reset_spoe_context(ctx);
2595 }
2596
2597 return 1;
2598}
2599
2600/********************************************************************
2601 * Functions that manage the filter initialization
2602 ********************************************************************/
2603struct flt_ops spoe_ops = {
2604 /* Manage SPOE filter, called for each filter declaration */
2605 .init = spoe_init,
2606 .deinit = spoe_deinit,
2607 .check = spoe_check,
2608
2609 /* Handle start/stop of SPOE */
Christopher Fauletf7a30922016-11-10 15:04:51 +01002610 .attach = spoe_start,
2611 .detach = spoe_stop,
2612 .check_timeouts = spoe_check_timeouts,
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002613
2614 /* Handle channels activity */
2615 .channel_start_analyze = spoe_start_analyze,
2616 .channel_pre_analyze = spoe_chn_pre_analyze,
2617 .channel_end_analyze = spoe_end_analyze,
2618};
2619
2620
2621static int
2622cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
2623{
2624 const char *err;
2625 int i, err_code = 0;
2626
2627 if ((cfg_scope == NULL && curengine != NULL) ||
2628 (cfg_scope != NULL && curengine == NULL) ||
2629 strcmp(curengine, cfg_scope))
2630 goto out;
2631
2632 if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */
2633 if (!*args[1]) {
2634 Alert("parsing [%s:%d] : missing name for spoe-agent section.\n",
2635 file, linenum);
2636 err_code |= ERR_ALERT | ERR_ABORT;
2637 goto out;
2638 }
2639 if (*args[2]) {
2640 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2641 file, linenum, args[2]);
2642 err_code |= ERR_ALERT | ERR_ABORT;
2643 goto out;
2644 }
2645
2646 err = invalid_char(args[1]);
2647 if (err) {
2648 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2649 file, linenum, *err, args[0], args[1]);
2650 err_code |= ERR_ALERT | ERR_ABORT;
2651 goto out;
2652 }
2653
2654 if (curagent != NULL) {
2655 Alert("parsing [%s:%d] : another spoe-agent section previously defined.\n",
2656 file, linenum);
2657 err_code |= ERR_ALERT | ERR_ABORT;
2658 goto out;
2659 }
2660 if ((curagent = calloc(1, sizeof(*curagent))) == NULL) {
2661 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2662 err_code |= ERR_ALERT | ERR_ABORT;
2663 goto out;
2664 }
2665
2666 curagent->id = strdup(args[1]);
2667 curagent->conf.file = strdup(file);
2668 curagent->conf.line = linenum;
2669 curagent->timeout.hello = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002670 curagent->timeout.idle = TICK_ETERNITY;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002671 curagent->timeout.processing = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002672 curagent->var_pfx = NULL;
Christopher Faulet985532d2016-11-16 15:36:19 +01002673 curagent->var_on_error = NULL;
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002674 curagent->flags = 0;
Christopher Faulet48026722016-11-16 15:01:12 +01002675 curagent->cps_max = 0;
2676 curagent->eps_max = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002677
2678 for (i = 0; i < SPOE_EV_EVENTS; ++i)
2679 LIST_INIT(&curagent->messages[i]);
2680 LIST_INIT(&curagent->cache);
2681 LIST_INIT(&curagent->applet_wq);
2682 }
2683 else if (!strcmp(args[0], "use-backend")) {
2684 if (!*args[1]) {
2685 Alert("parsing [%s:%d] : '%s' expects a backend name.\n",
2686 file, linenum, args[0]);
2687 err_code |= ERR_ALERT | ERR_FATAL;
2688 goto out;
2689 }
2690 if (*args[2]) {
2691 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2692 file, linenum, args[2]);
2693 err_code |= ERR_ALERT | ERR_ABORT;
2694 goto out;
2695 }
2696 free(curagent->b.name);
2697 curagent->b.name = strdup(args[1]);
2698 }
2699 else if (!strcmp(args[0], "messages")) {
2700 int cur_arg = 1;
2701 while (*args[cur_arg]) {
2702 struct spoe_msg_placeholder *mp = NULL;
2703
2704 list_for_each_entry(mp, &curmps, list) {
2705 if (!strcmp(mp->id, args[cur_arg])) {
2706 Alert("parsing [%s:%d]: spoe-message message '%s' already declared.\n",
2707 file, linenum, args[cur_arg]);
2708 err_code |= ERR_ALERT | ERR_FATAL;
2709 goto out;
2710 }
2711 }
2712
2713 if ((mp = calloc(1, sizeof(*mp))) == NULL) {
2714 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2715 err_code |= ERR_ALERT | ERR_ABORT;
2716 goto out;
2717 }
2718 mp->id = strdup(args[cur_arg]);
2719 LIST_ADDQ(&curmps, &mp->list);
2720 cur_arg++;
2721 }
2722 }
2723 else if (!strcmp(args[0], "timeout")) {
2724 unsigned int *tv = NULL;
2725 const char *res;
2726 unsigned timeout;
2727
2728 if (!*args[1]) {
2729 Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n",
2730 file, linenum);
2731 err_code |= ERR_ALERT | ERR_FATAL;
2732 goto out;
2733 }
2734 if (!strcmp(args[1], "hello"))
2735 tv = &curagent->timeout.hello;
2736 else if (!strcmp(args[1], "idle"))
2737 tv = &curagent->timeout.idle;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002738 else if (!strcmp(args[1], "processing"))
2739 tv = &curagent->timeout.processing;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002740 else {
Christopher Faulet03a34492016-11-19 16:47:56 +01002741 Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' or 'processing' (got %s).\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002742 file, linenum, args[1]);
2743 err_code |= ERR_ALERT | ERR_FATAL;
2744 goto out;
2745 }
2746 if (!*args[2]) {
2747 Alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n",
2748 file, linenum, args[1]);
2749 err_code |= ERR_ALERT | ERR_FATAL;
2750 goto out;
2751 }
2752 res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
2753 if (res) {
2754 Alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n",
2755 file, linenum, *res, args[1]);
2756 err_code |= ERR_ALERT | ERR_ABORT;
2757 goto out;
2758 }
2759 if (*args[3]) {
2760 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2761 file, linenum, args[3]);
2762 err_code |= ERR_ALERT | ERR_ABORT;
2763 goto out;
2764 }
2765 *tv = MS_TO_TICKS(timeout);
2766 }
2767 else if (!strcmp(args[0], "option")) {
2768 if (!*args[1]) {
2769 Alert("parsing [%s:%d]: '%s' expects an option name.\n",
2770 file, linenum, args[0]);
2771 err_code |= ERR_ALERT | ERR_FATAL;
2772 goto out;
2773 }
2774 if (!strcmp(args[1], "var-prefix")) {
2775 char *tmp;
2776
2777 if (!*args[2]) {
2778 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2779 file, linenum, args[0],
2780 args[1]);
2781 err_code |= ERR_ALERT | ERR_FATAL;
2782 goto out;
2783 }
2784 tmp = args[2];
2785 while (*tmp) {
2786 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2787 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
2788 file, linenum, args[0], args[1]);
2789 err_code |= ERR_ALERT | ERR_FATAL;
2790 goto out;
2791 }
2792 tmp++;
2793 }
2794 curagent->var_pfx = strdup(args[2]);
2795 }
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002796 else if (!strcmp(args[1], "continue-on-error")) {
2797 if (*args[2]) {
2798 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
Christopher Faulet48026722016-11-16 15:01:12 +01002799 file, linenum, args[2]);
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002800 err_code |= ERR_ALERT | ERR_ABORT;
2801 goto out;
2802 }
2803 curagent->flags |= SPOE_FL_CONT_ON_ERR;
2804 }
Christopher Faulet985532d2016-11-16 15:36:19 +01002805 else if (!strcmp(args[1], "set-on-error")) {
2806 char *tmp;
2807
2808 if (!*args[2]) {
2809 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2810 file, linenum, args[0],
2811 args[1]);
2812 err_code |= ERR_ALERT | ERR_FATAL;
2813 goto out;
2814 }
2815 tmp = args[2];
2816 while (*tmp) {
2817 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2818 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
2819 file, linenum, args[0], args[1]);
2820 err_code |= ERR_ALERT | ERR_FATAL;
2821 goto out;
2822 }
2823 tmp++;
2824 }
2825 curagent->var_on_error = strdup(args[2]);
2826 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002827 else {
2828 Alert("parsing [%s:%d]: option '%s' is not supported.\n",
2829 file, linenum, args[1]);
2830 err_code |= ERR_ALERT | ERR_FATAL;
2831 goto out;
2832 }
Christopher Faulet48026722016-11-16 15:01:12 +01002833 }
2834 else if (!strcmp(args[0], "maxconnrate")) {
2835 if (!*args[1]) {
2836 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
2837 file, linenum, args[0]);
2838 err_code |= ERR_ALERT | ERR_FATAL;
2839 goto out;
2840 }
2841 if (*args[2]) {
2842 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2843 file, linenum, args[2]);
2844 err_code |= ERR_ALERT | ERR_ABORT;
2845 goto out;
2846 }
2847 curagent->cps_max = atol(args[1]);
2848 }
2849 else if (!strcmp(args[0], "maxerrrate")) {
2850 if (!*args[1]) {
2851 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
2852 file, linenum, args[0]);
2853 err_code |= ERR_ALERT | ERR_FATAL;
2854 goto out;
2855 }
2856 if (*args[2]) {
2857 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2858 file, linenum, args[2]);
2859 err_code |= ERR_ALERT | ERR_ABORT;
2860 goto out;
2861 }
2862 curagent->eps_max = atol(args[1]);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002863 }
2864 else if (*args[0]) {
2865 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
2866 file, linenum, args[0]);
2867 err_code |= ERR_ALERT | ERR_FATAL;
2868 goto out;
2869 }
2870 out:
2871 return err_code;
2872}
2873
2874static int
2875cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm)
2876{
2877 struct spoe_message *msg;
2878 struct spoe_arg *arg;
2879 const char *err;
2880 char *errmsg = NULL;
2881 int err_code = 0;
2882
2883 if ((cfg_scope == NULL && curengine != NULL) ||
2884 (cfg_scope != NULL && curengine == NULL) ||
2885 strcmp(curengine, cfg_scope))
2886 goto out;
2887
2888 if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */
2889 if (!*args[1]) {
2890 Alert("parsing [%s:%d] : missing name for spoe-message section.\n",
2891 file, linenum);
2892 err_code |= ERR_ALERT | ERR_ABORT;
2893 goto out;
2894 }
2895 if (*args[2]) {
2896 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2897 file, linenum, args[2]);
2898 err_code |= ERR_ALERT | ERR_ABORT;
2899 goto out;
2900 }
2901
2902 err = invalid_char(args[1]);
2903 if (err) {
2904 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2905 file, linenum, *err, args[0], args[1]);
2906 err_code |= ERR_ALERT | ERR_ABORT;
2907 goto out;
2908 }
2909
2910 list_for_each_entry(msg, &curmsgs, list) {
2911 if (!strcmp(msg->id, args[1])) {
2912 Alert("parsing [%s:%d]: spoe-message section '%s' has the same"
2913 " name as another one declared at %s:%d.\n",
2914 file, linenum, args[1], msg->conf.file, msg->conf.line);
2915 err_code |= ERR_ALERT | ERR_FATAL;
2916 goto out;
2917 }
2918 }
2919
2920 if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) {
2921 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2922 err_code |= ERR_ALERT | ERR_ABORT;
2923 goto out;
2924 }
2925
2926 curmsg->id = strdup(args[1]);
2927 curmsg->id_len = strlen(curmsg->id);
2928 curmsg->event = SPOE_EV_NONE;
2929 curmsg->conf.file = strdup(file);
2930 curmsg->conf.line = linenum;
2931 LIST_INIT(&curmsg->args);
2932 LIST_ADDQ(&curmsgs, &curmsg->list);
2933 }
2934 else if (!strcmp(args[0], "args")) {
2935 int cur_arg = 1;
2936
2937 curproxy->conf.args.ctx = ARGC_SPOE;
2938 curproxy->conf.args.file = file;
2939 curproxy->conf.args.line = linenum;
2940 while (*args[cur_arg]) {
2941 char *delim = strchr(args[cur_arg], '=');
2942 int idx = 0;
2943
2944 if ((arg = calloc(1, sizeof(*arg))) == NULL) {
2945 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2946 err_code |= ERR_ALERT | ERR_ABORT;
2947 goto out;
2948 }
2949
2950 if (!delim) {
2951 arg->name = NULL;
2952 arg->name_len = 0;
2953 delim = args[cur_arg];
2954 }
2955 else {
2956 arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]);
2957 arg->name_len = delim - args[cur_arg];
2958 delim++;
2959 }
2960
2961 arg->expr = sample_parse_expr(&delim, &idx, file, linenum, &errmsg, &curproxy->conf.args);
2962 if (arg->expr == NULL) {
2963 Alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg);
2964 err_code |= ERR_ALERT | ERR_FATAL;
2965 free(arg->name);
2966 free(arg);
2967 goto out;
2968 }
2969 LIST_ADDQ(&curmsg->args, &arg->list);
2970 cur_arg++;
2971 }
2972 curproxy->conf.args.file = NULL;
2973 curproxy->conf.args.line = 0;
2974 }
2975 else if (!strcmp(args[0], "event")) {
2976 if (!*args[1]) {
2977 Alert("parsing [%s:%d] : missing event name.\n", file, linenum);
2978 err_code |= ERR_ALERT | ERR_ABORT;
2979 goto out;
2980 }
2981 if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS]))
2982 curmsg->event = SPOE_EV_ON_CLIENT_SESS;
2983 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS]))
2984 curmsg->event = SPOE_EV_ON_SERVER_SESS;
2985
2986 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE]))
2987 curmsg->event = SPOE_EV_ON_TCP_REQ_FE;
2988 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE]))
2989 curmsg->event = SPOE_EV_ON_TCP_REQ_BE;
2990 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP]))
2991 curmsg->event = SPOE_EV_ON_TCP_RSP;
2992
2993 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE]))
2994 curmsg->event = SPOE_EV_ON_HTTP_REQ_FE;
2995 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE]))
2996 curmsg->event = SPOE_EV_ON_HTTP_REQ_BE;
2997 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP]))
2998 curmsg->event = SPOE_EV_ON_HTTP_RSP;
2999 else {
3000 Alert("parsing [%s:%d] : unkown event '%s'.\n",
3001 file, linenum, args[1]);
3002 err_code |= ERR_ALERT | ERR_ABORT;
3003 goto out;
3004 }
3005 }
3006 else if (!*args[0]) {
3007 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n",
3008 file, linenum, args[0]);
3009 err_code |= ERR_ALERT | ERR_FATAL;
3010 goto out;
3011 }
3012 out:
3013 free(errmsg);
3014 return err_code;
3015}
3016
3017/* Return -1 on error, else 0 */
3018static int
3019parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
3020 struct flt_conf *fconf, char **err, void *private)
3021{
3022 struct list backup_sections;
3023 struct spoe_config *conf;
3024 struct spoe_message *msg, *msgback;
3025 struct spoe_msg_placeholder *mp, *mpback;
3026 char *file = NULL, *engine = NULL;
3027 int ret, pos = *cur_arg + 1;
3028
3029 conf = calloc(1, sizeof(*conf));
3030 if (conf == NULL) {
3031 memprintf(err, "%s: out of memory", args[*cur_arg]);
3032 goto error;
3033 }
3034 conf->proxy = px;
3035
3036 while (*args[pos]) {
3037 if (!strcmp(args[pos], "config")) {
3038 if (!*args[pos+1]) {
3039 memprintf(err, "'%s' : '%s' option without value",
3040 args[*cur_arg], args[pos]);
3041 goto error;
3042 }
3043 file = args[pos+1];
3044 pos += 2;
3045 }
3046 else if (!strcmp(args[pos], "engine")) {
3047 if (!*args[pos+1]) {
3048 memprintf(err, "'%s' : '%s' option without value",
3049 args[*cur_arg], args[pos]);
3050 goto error;
3051 }
3052 engine = args[pos+1];
3053 pos += 2;
3054 }
3055 else {
3056 memprintf(err, "unknown keyword '%s'", args[pos]);
3057 goto error;
3058 }
3059 }
3060 if (file == NULL) {
3061 memprintf(err, "'%s' : missing config file", args[*cur_arg]);
3062 goto error;
3063 }
3064
3065 /* backup sections and register SPOE sections */
3066 LIST_INIT(&backup_sections);
3067 cfg_backup_sections(&backup_sections);
3068 cfg_register_section("spoe-agent", cfg_parse_spoe_agent);
3069 cfg_register_section("spoe-message", cfg_parse_spoe_message);
3070
3071 /* Parse SPOE filter configuration file */
3072 curengine = engine;
3073 curproxy = px;
3074 curagent = NULL;
3075 curmsg = NULL;
3076 ret = readcfgfile(file);
3077 curproxy = NULL;
3078
3079 /* unregister SPOE sections and restore previous sections */
3080 cfg_unregister_sections();
3081 cfg_restore_sections(&backup_sections);
3082
3083 if (ret == -1) {
3084 memprintf(err, "Could not open configuration file %s : %s",
3085 file, strerror(errno));
3086 goto error;
3087 }
3088 if (ret & (ERR_ABORT|ERR_FATAL)) {
3089 memprintf(err, "Error(s) found in configuration file %s", file);
3090 goto error;
3091 }
3092
3093 /* Check SPOE agent */
3094 if (curagent == NULL) {
3095 memprintf(err, "No SPOE agent found in file %s", file);
3096 goto error;
3097 }
3098 if (curagent->b.name == NULL) {
3099 memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d",
3100 curagent->id, curagent->conf.file, curagent->conf.line);
3101 goto error;
3102 }
Christopher Fauletf7a30922016-11-10 15:04:51 +01003103 if (curagent->timeout.hello == TICK_ETERNITY ||
3104 curagent->timeout.idle == TICK_ETERNITY ||
Christopher Fauletf7a30922016-11-10 15:04:51 +01003105 curagent->timeout.processing == TICK_ETERNITY) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003106 Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
3107 " | While not properly invalid, you will certainly encounter various problems\n"
3108 " | with such a configuration. To fix this, please ensure that all following\n"
Christopher Faulet03a34492016-11-19 16:47:56 +01003109 " | timeouts are set to a non-zero value: 'hello', 'idle', 'processing'.\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003110 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3111 }
3112 if (curagent->var_pfx == NULL) {
3113 char *tmp = curagent->id;
3114
3115 while (*tmp) {
3116 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3117 memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. "
3118 "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n",
3119 curagent->id, curagent->id, curagent->conf.file, curagent->conf.line);
3120 goto error;
3121 }
3122 tmp++;
3123 }
3124 curagent->var_pfx = strdup(curagent->id);
3125 }
3126
3127 if (LIST_ISEMPTY(&curmps)) {
3128 Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",
3129 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3130 goto finish;
3131 }
3132
3133 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3134 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3135 if (!strcmp(msg->id, mp->id)) {
3136 if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) {
3137 if (msg->event == SPOE_EV_ON_TCP_REQ_BE)
3138 msg->event = SPOE_EV_ON_TCP_REQ_FE;
3139 if (msg->event == SPOE_EV_ON_HTTP_REQ_BE)
3140 msg->event = SPOE_EV_ON_HTTP_REQ_FE;
3141 }
3142 if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS ||
3143 msg->event == SPOE_EV_ON_TCP_REQ_FE ||
3144 msg->event == SPOE_EV_ON_HTTP_REQ_FE)) {
3145 Warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n",
3146 px->id, msg->conf.file, msg->conf.line);
3147 goto next;
3148 }
3149 if (msg->event == SPOE_EV_NONE) {
3150 Warning("Proxy '%s': Ignore SPOE message without event at %s:%d.\n",
3151 px->id, msg->conf.file, msg->conf.line);
3152 goto next;
3153 }
3154 msg->agent = curagent;
3155 LIST_DEL(&msg->list);
3156 LIST_ADDQ(&curagent->messages[msg->event], &msg->list);
3157 goto next;
3158 }
3159 }
3160 memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d",
3161 curagent->id, mp->id, curagent->conf.file, curagent->conf.line);
3162 goto error;
3163 next:
3164 continue;
3165 }
3166
3167 finish:
3168 conf->agent = curagent;
3169 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3170 LIST_DEL(&mp->list);
3171 release_spoe_msg_placeholder(mp);
3172 }
3173 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3174 Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n",
3175 px->id, msg->id, msg->conf.file, msg->conf.line);
3176 LIST_DEL(&msg->list);
3177 release_spoe_message(msg);
3178 }
3179
3180 *cur_arg = pos;
3181 fconf->ops = &spoe_ops;
3182 fconf->conf = conf;
3183 return 0;
3184
3185 error:
3186 release_spoe_agent(curagent);
3187 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3188 LIST_DEL(&mp->list);
3189 release_spoe_msg_placeholder(mp);
3190 }
3191 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3192 LIST_DEL(&msg->list);
3193 release_spoe_message(msg);
3194 }
3195 free(conf);
3196 return -1;
3197}
3198
3199
3200/* Declare the filter parser for "spoe" keyword */
3201static struct flt_kw_list flt_kws = { "SPOE", { }, {
3202 { "spoe", parse_spoe_flt, NULL },
3203 { NULL, NULL, NULL },
3204 }
3205};
3206
3207__attribute__((constructor))
3208static void __spoe_init(void)
3209{
3210 flt_register_keywords(&flt_kws);
3211
3212 LIST_INIT(&curmsgs);
3213 LIST_INIT(&curmps);
3214 pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
3215}
3216
3217__attribute__((destructor))
3218static void
3219__spoe_deinit(void)
3220{
3221 pool_destroy2(pool2_spoe_ctx);
3222}