blob: 17290d2cc4a5674edc577fa5388bccf09c11aaeb [file] [log] [blame]
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001/*
2 * Stream processing offload engine management.
3 *
4 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12#include <ctype.h>
13#include <errno.h>
14
15#include <common/buffer.h>
16#include <common/cfgparse.h>
17#include <common/compat.h>
18#include <common/config.h>
19#include <common/debug.h>
20#include <common/memory.h>
21#include <common/time.h>
22
23#include <types/arg.h>
24#include <types/filters.h>
25#include <types/global.h>
26#include <types/proxy.h>
27#include <types/sample.h>
28#include <types/stream.h>
29
30#include <proto/arg.h>
31#include <proto/backend.h>
32#include <proto/filters.h>
Christopher Faulet48026722016-11-16 15:01:12 +010033#include <proto/freq_ctr.h>
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020034#include <proto/frontend.h>
35#include <proto/log.h>
36#include <proto/proto_http.h>
37#include <proto/proxy.h>
38#include <proto/sample.h>
39#include <proto/session.h>
40#include <proto/signal.h>
41#include <proto/stream.h>
42#include <proto/stream_interface.h>
43#include <proto/task.h>
44#include <proto/vars.h>
45
46#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
47#define SPOE_PRINTF(x...) fprintf(x)
48#else
49#define SPOE_PRINTF(x...)
50#endif
51
52/* Helper to get ctx inside an appctx */
53#define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe)
54
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020055/* Minimal size for a frame */
56#define MIN_FRAME_SIZE 256
57
Christopher Fauletea62c2a2016-11-14 10:54:21 +010058/* Flags set on the SPOE agent */
59#define SPOE_FL_CONT_ON_ERR 0x00000001 /* Do not stop events processing when an error occurred */
60
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020061/* Flags set on the SPOE context */
62#define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* Set after that on-client-session event was processed */
63#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set after that on-server-session event was processed */
64#define SPOE_CTX_FL_REQ_PROCESS 0x00000004 /* Set when SPOE is processing the request */
65#define SPOE_CTX_FL_RSP_PROCESS 0x00000008 /* Set when SPOE is processing the response */
66
67#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)
68
Christopher Fauleta1cda022016-12-21 08:58:06 +010069/* Flags set on the SPOE applet */
70#define SPOE_APPCTX_FL_PIPELINING 0x00000001 /* Set if pipelining is supported */
71#define SPOE_APPCTX_FL_ASYNC 0x00000002 /* Set if asynchronus frames is supported */
72#define SPOE_APPCTX_FL_PERSIST 0x00000004 /* Set if the applet is persistent */
73
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020074#define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, leave it to zero */
75#define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */
76
77/* All possible states for a SPOE context */
78enum spoe_ctx_state {
79 SPOE_CTX_ST_NONE = 0,
80 SPOE_CTX_ST_READY,
81 SPOE_CTX_ST_SENDING_MSGS,
82 SPOE_CTX_ST_WAITING_ACK,
83 SPOE_CTX_ST_DONE,
84 SPOE_CTX_ST_ERROR,
85};
86
87/* All possible states for a SPOE applet */
88enum spoe_appctx_state {
89 SPOE_APPCTX_ST_CONNECT = 0,
90 SPOE_APPCTX_ST_CONNECTING,
Christopher Fauleta1cda022016-12-21 08:58:06 +010091 SPOE_APPCTX_ST_IDLE,
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020092 SPOE_APPCTX_ST_PROCESSING,
93 SPOE_APPCTX_ST_DISCONNECT,
94 SPOE_APPCTX_ST_DISCONNECTING,
95 SPOE_APPCTX_ST_EXIT,
96 SPOE_APPCTX_ST_END,
97};
98
99/* All supported SPOE actions */
100enum spoe_action_type {
101 SPOE_ACT_T_SET_VAR = 1,
102 SPOE_ACT_T_UNSET_VAR,
103 SPOE_ACT_TYPES,
104};
105
106/* All supported SPOE events */
107enum spoe_event {
108 SPOE_EV_NONE = 0,
109
110 /* Request events */
111 SPOE_EV_ON_CLIENT_SESS = 1,
112 SPOE_EV_ON_TCP_REQ_FE,
113 SPOE_EV_ON_TCP_REQ_BE,
114 SPOE_EV_ON_HTTP_REQ_FE,
115 SPOE_EV_ON_HTTP_REQ_BE,
116
117 /* Response events */
118 SPOE_EV_ON_SERVER_SESS,
119 SPOE_EV_ON_TCP_RSP,
120 SPOE_EV_ON_HTTP_RSP,
121
122 SPOE_EV_EVENTS
123};
124
125/* Errors triggerd by SPOE applet */
126enum spoe_frame_error {
127 SPOE_FRM_ERR_NONE = 0,
128 SPOE_FRM_ERR_IO,
129 SPOE_FRM_ERR_TOUT,
130 SPOE_FRM_ERR_TOO_BIG,
131 SPOE_FRM_ERR_INVALID,
132 SPOE_FRM_ERR_NO_VSN,
133 SPOE_FRM_ERR_NO_FRAME_SIZE,
134 SPOE_FRM_ERR_NO_CAP,
135 SPOE_FRM_ERR_BAD_VSN,
136 SPOE_FRM_ERR_BAD_FRAME_SIZE,
137 SPOE_FRM_ERR_UNKNOWN = 99,
138 SPOE_FRM_ERRS,
139};
140
141/* Scopes used for variables set by agents. It is a way to be agnotic to vars
142 * scope. */
143enum spoe_vars_scope {
144 SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC */
145 SPOE_SCOPE_SESS, /* <=> SCOPE_SESS */
146 SPOE_SCOPE_TXN, /* <=> SCOPE_TXN */
147 SPOE_SCOPE_REQ, /* <=> SCOPE_REQ */
148 SPOE_SCOPE_RES, /* <=> SCOPE_RES */
149};
150
151
152/* Describe an argument that will be linked to a message. It is a sample fetch,
153 * with an optional name. */
154struct spoe_arg {
155 char *name; /* Name of the argument, may be NULL */
156 unsigned int name_len; /* The name length, 0 if NULL */
157 struct sample_expr *expr; /* Sample expression */
158 struct list list; /* Used to chain SPOE args */
159};
160
161/* Used during the config parsing only because, when a SPOE agent section is
162 * parsed, messages can be undefined. */
163struct spoe_msg_placeholder {
164 char *id; /* SPOE message placeholder id */
165 struct list list; /* Use to chain SPOE message placeholders */
166};
167
168/* Describe a message that will be sent in a NOTIFY frame. A message has a name,
169 * an argument list (see above) and it is linked to a specific event. */
170struct spoe_message {
Christopher Fauleta1cda022016-12-21 08:58:06 +0100171 char *id; /* SPOE message id */
172 unsigned int id_len; /* The message id length */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200173 struct spoe_agent *agent; /* SPOE agent owning this SPOE message */
174 struct {
Christopher Fauleta1cda022016-12-21 08:58:06 +0100175 char *file; /* file where the SPOE message appears */
176 int line; /* line where the SPOE message appears */
177 } conf; /* config information */
178 struct list args; /* Arguments added when the SPOE messages is sent */
179 struct list list; /* Used to chain SPOE messages */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200180
181 enum spoe_event event; /* SPOE_EV_* */
182};
183
184/* Describe a SPOE agent. */
185struct spoe_agent {
186 char *id; /* SPOE agent id (name) */
187 struct {
188 char *file; /* file where the SPOE agent appears */
189 int line; /* line where the SPOE agent appears */
190 } conf; /* config information */
191 union {
192 struct proxy *be; /* Backend used by this agent */
193 char *name; /* Backend name used during conf parsing */
194 } b;
195 struct {
Christopher Fauletf7a30922016-11-10 15:04:51 +0100196 unsigned int hello; /* Max time to receive AGENT-HELLO frame (in SPOE applet) */
197 unsigned int idle; /* Max Idle timeout (in SPOE applet) */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100198 unsigned int processing; /* Max time to process an event (in the main stream) */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200199 } timeout;
200
Christopher Fauleta1cda022016-12-21 08:58:06 +0100201 /* Config info */
202 char *engine_id; /* engine-id string */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200203 char *var_pfx; /* Prefix used for vars set by the agent */
Christopher Faulet985532d2016-11-16 15:36:19 +0100204 char *var_on_error; /* Variable to set when an error occured, in the TXN scope */
Christopher Fauletea62c2a2016-11-14 10:54:21 +0100205 unsigned int flags; /* SPOE_FL_* */
Christopher Fauleta1cda022016-12-21 08:58:06 +0100206 unsigned int cps_max; /* Maximum # of connections per second */
207 unsigned int eps_max; /* Maximum # of errors per second */
208 unsigned int max_frame_size; /* Maximum frame size for this agent, before any negotiation */
209 unsigned int min_applets; /* Minimum # applets alive at a time */
210 unsigned int max_fpa; /* Maximum # of frames handled per applet at once */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200211
212 struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
213 * for each supported events */
214
Christopher Fauleta1cda022016-12-21 08:58:06 +0100215 /* running info */
216 unsigned int applets_act; /* # of applets alive at a time */
217 unsigned int applets_idle; /* # of applets in the state SPOE_APPCTX_ST_IDLE */
218 unsigned int sending_rate; /* the global sending rate */
219
220 struct freq_ctr conn_per_sec; /* connections per second */
221 struct freq_ctr err_per_sec; /* connetion errors per second */
222
223 struct list applets; /* List of available SPOE applets */
224 struct list sending_queue; /* Queue of streams waiting to send data */
225 struct list waiting_queue; /* Queue of streams waiting for a ack, in async mode */
226
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200227};
228
229/* SPOE filter configuration */
230struct spoe_config {
231 struct proxy *proxy; /* Proxy owning the filter */
232 struct spoe_agent *agent; /* Agent used by this filter */
233 struct proxy agent_fe; /* Agent frontend */
234};
235
236/* SPOE context attached to a stream. It is the main structure that handles the
237 * processing offload */
238struct spoe_context {
239 struct filter *filter; /* The SPOE filter */
240 struct stream *strm; /* The stream that should be offloaded */
Christopher Fauleta1cda022016-12-21 08:58:06 +0100241
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200242 struct list *messages; /* List of messages that will be sent during the stream processing */
243 struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */
Christopher Fauleta73e59b2016-12-09 17:30:18 +0100244 struct buffer_wait buffer_wait; /* position in the list of streams waiting for a buffer */
Christopher Fauleta1cda022016-12-21 08:58:06 +0100245 struct list list;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200246
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200247 enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
248 unsigned int flags; /* SPOE_CTX_FL_* */
249
250 unsigned int stream_id; /* stream_id and frame_id are used */
251 unsigned int frame_id; /* to map NOTIFY and ACK frames */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100252 unsigned int process_exp; /* expiration date to process an event */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200253};
254
Christopher Faulet3b386a32017-02-23 10:17:15 +0100255/* SPOE filter id. Used to identify SPOE filters */
256const char *spoe_filter_id = "SPOE filter";
257
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200258/* Set if the handle on SIGUSR1 is registered */
259static int sighandler_registered = 0;
260
261/* proxy used during the parsing */
262struct proxy *curproxy = NULL;
263
264/* The name of the SPOE engine, used during the parsing */
265char *curengine = NULL;
266
267/* SPOE agent used during the parsing */
268struct spoe_agent *curagent = NULL;
269
270/* SPOE message used during the parsing */
271struct spoe_message *curmsg = NULL;
272
273/* list of SPOE messages and placeholders used during the parsing */
274struct list curmsgs;
275struct list curmps;
276
277/* Pool used to allocate new SPOE contexts */
278static struct pool_head *pool2_spoe_ctx = NULL;
279
280/* Temporary variables used to ease error processing */
281int spoe_status_code = SPOE_FRM_ERR_NONE;
282char spoe_reason[256];
283
284struct flt_ops spoe_ops;
285
Christopher Fauleta1cda022016-12-21 08:58:06 +0100286static int queue_spoe_context(struct spoe_context *ctx);
287static int acquire_spoe_buffer(struct spoe_context *ctx);
288static void release_spoe_buffer(struct spoe_context *ctx);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200289
290/********************************************************************
291 * helper functions/globals
292 ********************************************************************/
293static void
294release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp)
295{
296 if (!mp)
297 return;
298 free(mp->id);
299 free(mp);
300}
301
302
303static void
304release_spoe_message(struct spoe_message *msg)
305{
306 struct spoe_arg *arg, *back;
307
308 if (!msg)
309 return;
310 free(msg->id);
311 free(msg->conf.file);
312 list_for_each_entry_safe(arg, back, &msg->args, list) {
313 release_sample_expr(arg->expr);
314 free(arg->name);
315 LIST_DEL(&arg->list);
316 free(arg);
317 }
318 free(msg);
319}
320
321static void
322release_spoe_agent(struct spoe_agent *agent)
323{
324 struct spoe_message *msg, *back;
325 int i;
326
327 if (!agent)
328 return;
329 free(agent->id);
330 free(agent->conf.file);
331 free(agent->var_pfx);
Christopher Fauleta1cda022016-12-21 08:58:06 +0100332 free(agent->engine_id);
Christopher Faulet985532d2016-11-16 15:36:19 +0100333 free(agent->var_on_error);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200334 for (i = 0; i < SPOE_EV_EVENTS; ++i) {
335 list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
336 LIST_DEL(&msg->list);
337 release_spoe_message(msg);
338 }
339 }
340 free(agent);
341}
342
343static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
344 [SPOE_FRM_ERR_NONE] = "normal",
345 [SPOE_FRM_ERR_IO] = "I/O error",
346 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
347 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
348 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
349 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
350 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
351 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
352 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
353 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
354 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
355};
356
357static const char *spoe_event_str[SPOE_EV_EVENTS] = {
358 [SPOE_EV_ON_CLIENT_SESS] = "on-client-session",
359 [SPOE_EV_ON_TCP_REQ_FE] = "on-frontend-tcp-request",
360 [SPOE_EV_ON_TCP_REQ_BE] = "on-backend-tcp-request",
361 [SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request",
362 [SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request",
363
364 [SPOE_EV_ON_SERVER_SESS] = "on-server-session",
365 [SPOE_EV_ON_TCP_RSP] = "on-tcp-response",
366 [SPOE_EV_ON_HTTP_RSP] = "on-http-response",
367};
368
369
370#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
371
372static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
373 [SPOE_CTX_ST_NONE] = "NONE",
374 [SPOE_CTX_ST_READY] = "READY",
375 [SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
376 [SPOE_CTX_ST_WAITING_ACK] = "WAITING_ACK",
377 [SPOE_CTX_ST_DONE] = "DONE",
378 [SPOE_CTX_ST_ERROR] = "ERROR",
379};
380
381static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
382 [SPOE_APPCTX_ST_CONNECT] = "CONNECT",
383 [SPOE_APPCTX_ST_CONNECTING] = "CONNECTING",
Christopher Fauleta1cda022016-12-21 08:58:06 +0100384 [SPOE_APPCTX_ST_IDLE] = "IDLE",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200385 [SPOE_APPCTX_ST_PROCESSING] = "PROCESSING",
386 [SPOE_APPCTX_ST_DISCONNECT] = "DISCONNECT",
387 [SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
388 [SPOE_APPCTX_ST_EXIT] = "EXIT",
389 [SPOE_APPCTX_ST_END] = "END",
390};
391
392#endif
Christopher Fauleta1cda022016-12-21 08:58:06 +0100393
394static char *
395generate_pseudo_uuid()
396{
397 static int init = 0;
398
399 const char uuid_fmt[] = "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx";
400 const char uuid_chr[] = "0123456789ABCDEF-";
401 char *uuid;
402 int i;
403
404 if ((uuid = calloc(1, sizeof(uuid_fmt))) == NULL)
405 return NULL;
406
407 if (!init) {
408 srand(now_ms);
409 init = 1;
410 }
411
412 for (i = 0; i < sizeof(uuid_fmt)-1; i++) {
413 int r = rand () % 16;
414
415 switch (uuid_fmt[i]) {
416 case 'x' : uuid[i] = uuid_chr[r]; break;
417 case 'y' : uuid[i] = uuid_chr[(r & 0x03) | 0x08]; break;
418 default : uuid[i] = uuid_fmt[i]; break;
419 }
420 }
421 return uuid;
422}
423
424static inline unsigned int
425min_applets_act(struct spoe_agent *agent)
426{
427 unsigned int nbsrv;
428
429 if (agent->min_applets)
430 return agent->min_applets;
431
432 nbsrv = (agent->b.be->srv_act ? agent->b.be->srv_act : agent->b.be->srv_bck);
433 return 2*nbsrv;
434}
435
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200436/********************************************************************
437 * Functions that encode/decode SPOE frames
438 ********************************************************************/
439/* Frame Types sent by HAProxy and by agents */
440enum spoe_frame_type {
441 /* Frames sent by HAProxy */
442 SPOE_FRM_T_HAPROXY_HELLO = 1,
443 SPOE_FRM_T_HAPROXY_DISCON,
444 SPOE_FRM_T_HAPROXY_NOTIFY,
445
446 /* Frames sent by the agents */
447 SPOE_FRM_T_AGENT_HELLO = 101,
448 SPOE_FRM_T_AGENT_DISCON,
449 SPOE_FRM_T_AGENT_ACK
450};
451
452/* All supported data types */
453enum spoe_data_type {
454 SPOE_DATA_T_NULL = 0,
455 SPOE_DATA_T_BOOL,
456 SPOE_DATA_T_INT32,
457 SPOE_DATA_T_UINT32,
458 SPOE_DATA_T_INT64,
459 SPOE_DATA_T_UINT64,
460 SPOE_DATA_T_IPV4,
461 SPOE_DATA_T_IPV6,
462 SPOE_DATA_T_STR,
463 SPOE_DATA_T_BIN,
464 SPOE_DATA_TYPES
465};
466
467/* Masks to get data type or flags value */
468#define SPOE_DATA_T_MASK 0x0F
469#define SPOE_DATA_FL_MASK 0xF0
470
471/* Flags to set Boolean values */
472#define SPOE_DATA_FL_FALSE 0x00
473#define SPOE_DATA_FL_TRUE 0x10
474
475/* Helper to get static string length, excluding the terminating null byte */
476#define SLEN(str) (sizeof(str)-1)
477
478/* Predefined key used in HELLO/DISCONNECT frames */
479#define SUPPORTED_VERSIONS_KEY "supported-versions"
480#define VERSION_KEY "version"
481#define MAX_FRAME_SIZE_KEY "max-frame-size"
482#define CAPABILITIES_KEY "capabilities"
Christopher Fauleta1cda022016-12-21 08:58:06 +0100483#define ENGINE_ID_KEY "engine-id"
Christopher Fauletba7bc162016-11-07 21:07:38 +0100484#define HEALTHCHECK_KEY "healthcheck"
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200485#define STATUS_CODE_KEY "status-code"
486#define MSG_KEY "message"
487
488struct spoe_version {
489 char *str;
490 int min;
491 int max;
492};
493
494/* All supported versions */
495static struct spoe_version supported_versions[] = {
496 {"1.0", 1000, 1000},
497 {NULL, 0, 0}
498};
499
500/* Comma-separated list of supported versions */
501#define SUPPORTED_VERSIONS_VAL "1.0"
502
503/* Comma-separated list of supported capabilities (none for now) */
Christopher Fauleta1cda022016-12-21 08:58:06 +0100504//#define CAPABILITIES_VAL ""
505#define CAPABILITIES_VAL "pipelining,async"
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200506
507static int
508decode_spoe_version(const char *str, size_t len)
509{
510 char tmp[len+1], *start, *end;
511 double d;
512 int vsn = -1;
513
514 memset(tmp, 0, len+1);
515 memcpy(tmp, str, len);
516
517 start = tmp;
518 while (isspace(*start))
519 start++;
520
521 d = strtod(start, &end);
522 if (d == 0 || start == end)
523 goto out;
524
525 if (*end) {
526 while (isspace(*end))
527 end++;
528 if (*end)
529 goto out;
530 }
531 vsn = (int)(d * 1000);
532 out:
533 return vsn;
534}
535
536/* Encode a variable-length integer. This function never fails and returns the
537 * number of written bytes. */
538static int
539encode_spoe_varint(uint64_t i, char *buf)
540{
541 int idx;
542
543 if (i < 240) {
544 buf[0] = (unsigned char)i;
545 return 1;
546 }
547
548 buf[0] = (unsigned char)i | 240;
549 i = (i - 240) >> 4;
550 for (idx = 1; i >= 128; ++idx) {
551 buf[idx] = (unsigned char)i | 128;
552 i = (i - 128) >> 7;
553 }
554 buf[idx++] = (unsigned char)i;
555 return idx;
556}
557
558/* Decode a varable-length integer. If the decoding fails, -1 is returned. This
559 * happens when the buffer's end in reached. On success, the number of read
560 * bytes is returned. */
561static int
562decode_spoe_varint(const char *buf, const char *end, uint64_t *i)
563{
564 unsigned char *msg = (unsigned char *)buf;
565 int idx = 0;
566
567 if (msg > (unsigned char *)end)
568 return -1;
569
570 if (msg[0] < 240) {
571 *i = msg[0];
572 return 1;
573 }
574 *i = msg[0];
575 do {
576 ++idx;
577 if (msg+idx > (unsigned char *)end)
578 return -1;
579 *i += (uint64_t)msg[idx] << (4 + 7 * (idx-1));
580 } while (msg[idx] >= 128);
581 return (idx + 1);
582}
583
584/* Encode a string. The string will be prefix by its length, encoded as a
585 * variable-length integer. This function never fails and returns the number of
586 * written bytes. */
587static int
588encode_spoe_string(const char *str, size_t len, char *dst)
589{
590 int idx = 0;
591
592 if (!len) {
593 dst[0] = 0;
594 return 1;
595 }
596
597 idx += encode_spoe_varint(len, dst);
598 memcpy(dst+idx, str, len);
599 return (idx + len);
600}
601
602/* Decode a string. Its length is decoded first as a variable-length integer. If
603 * it succeeds, and if the string length is valid, the begin of the string is
604 * saved in <*str>, its length is saved in <*len> and the total numbre of bytes
605 * read is returned. If an error occurred, -1 is returned and <*str> remains
606 * NULL. */
607static int
608decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
609{
610 int i, idx = 0;
611
612 *str = NULL;
613 *len = 0;
614
615 if ((i = decode_spoe_varint(buf, end, len)) == -1)
616 goto error;
617 idx += i;
618 if (buf + idx + *len > end)
619 goto error;
620
621 *str = buf+idx;
622 return (idx + *len);
623
624 error:
625 return -1;
626}
627
628/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
629 * of bytes read is returned. A types data is composed of a type (1 byte) and
630 * corresponding data:
631 * - boolean: non additional data (0 bytes)
632 * - integers: a variable-length integer (see decode_spoe_varint)
633 * - ipv4: 4 bytes
634 * - ipv6: 16 bytes
635 * - binary and string: a buffer prefixed by its size, a variable-length
636 * integer (see decode_spoe_string) */
637static int
638skip_spoe_data(char *frame, char *end)
639{
640 uint64_t sz = 0;
641 int i, idx = 0;
642
643 if (frame > end)
644 return -1;
645
646 switch (frame[idx++] & SPOE_DATA_T_MASK) {
647 case SPOE_DATA_T_BOOL:
648 break;
649 case SPOE_DATA_T_INT32:
650 case SPOE_DATA_T_INT64:
651 case SPOE_DATA_T_UINT32:
652 case SPOE_DATA_T_UINT64:
653 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
654 return -1;
655 idx += i;
656 break;
657 case SPOE_DATA_T_IPV4:
658 idx += 4;
659 break;
660 case SPOE_DATA_T_IPV6:
661 idx += 16;
662 break;
663 case SPOE_DATA_T_STR:
664 case SPOE_DATA_T_BIN:
665 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
666 return -1;
667 idx += i + sz;
668 break;
669 }
670
671 if (frame+idx > end)
672 return -1;
673 return idx;
674}
675
676/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
677 * number of read bytes is returned. See skip_spoe_data for details. */
678static int
679decode_spoe_data(char *frame, char *end, struct sample *smp)
680{
681 uint64_t sz = 0;
682 int type, i, idx = 0;
683
684 if (frame > end)
685 return -1;
686
687 type = frame[idx++];
688 switch (type & SPOE_DATA_T_MASK) {
689 case SPOE_DATA_T_BOOL:
690 smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
691 smp->data.type = SMP_T_BOOL;
692 break;
693 case SPOE_DATA_T_INT32:
694 case SPOE_DATA_T_INT64:
695 case SPOE_DATA_T_UINT32:
696 case SPOE_DATA_T_UINT64:
697 if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1)
698 return -1;
699 idx += i;
700 smp->data.type = SMP_T_SINT;
701 break;
702 case SPOE_DATA_T_IPV4:
703 if (frame+idx+4 > end)
704 return -1;
705 memcpy(&smp->data.u.ipv4, frame+idx, 4);
706 smp->data.type = SMP_T_IPV4;
707 idx += 4;
708 break;
709 case SPOE_DATA_T_IPV6:
710 if (frame+idx+16 > end)
711 return -1;
712 memcpy(&smp->data.u.ipv6, frame+idx, 16);
713 smp->data.type = SMP_T_IPV6;
714 idx += 16;
715 break;
716 case SPOE_DATA_T_STR:
717 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
718 return -1;
719 idx += i;
720 if (frame+idx+sz > end)
721 return -1;
722 smp->data.u.str.str = frame+idx;
723 smp->data.u.str.len = sz;
724 smp->data.type = SMP_T_STR;
725 idx += sz;
726 break;
727 case SPOE_DATA_T_BIN:
728 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
729 return -1;
730 idx += i;
731 if (frame+idx+sz > end)
732 return -1;
733 smp->data.u.str.str = frame+idx;
734 smp->data.u.str.len = sz;
735 smp->data.type = SMP_T_BIN;
736 idx += sz;
737 break;
738 }
739
740 if (frame+idx > end)
741 return -1;
742 return idx;
743}
744
745/* Skip an action in a frame received from an agent. If an error occurred, -1 is
746 * returned, otherwise the number of read bytes is returned. An action is
747 * composed of the action type followed by a typed data. */
748static int
749skip_spoe_action(char *frame, char *end)
750{
751 int n, i, idx = 0;
752
753 if (frame+2 > end)
754 return -1;
755
756 idx++; /* Skip the action type */
757 n = frame[idx++];
758 while (n-- > 0) {
759 if ((i = skip_spoe_data(frame+idx, end)) == -1)
760 return -1;
761 idx += i;
762 }
763
764 if (frame+idx > end)
765 return -1;
766 return idx;
767}
768
769/* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on
770 * success, 0 if the frame can be ignored and -1 if an error occurred. */
771static int
772prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
773{
Christopher Fauleta1cda022016-12-21 08:58:06 +0100774 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200775 int idx = 0;
776 size_t max = (7 /* TYPE + METADATA */
777 + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
778 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
Christopher Fauleta1cda022016-12-21 08:58:06 +0100779 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL)
780 + 1 + SLEN(ENGINE_ID_KEY) + 1 + 1 + 36);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200781
782 if (size < max)
783 return -1;
784
785 /* Frame type */
786 frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
787
788 /* No flags for now */
789 memset(frame+idx, 0, 4);
790 idx += 4;
791
792 /* No stream-id and frame-id for HELLO frames */
793 frame[idx++] = 0;
794 frame[idx++] = 0;
795
796 /* There are 3 mandatory items: "supported-versions", "max-frame-size"
797 * and "capabilities" */
798
799 /* "supported-versions" K/V item */
800 idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx);
801 frame[idx++] = SPOE_DATA_T_STR;
802 idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx);
803
804 /* "max-fram-size" K/V item */
805 idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
806 frame[idx++] = SPOE_DATA_T_UINT32;
807 idx += encode_spoe_varint(APPCTX_SPOE(appctx).max_frame_size, frame+idx);
808
809 /* "capabilities" K/V item */
810 idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
811 frame[idx++] = SPOE_DATA_T_STR;
812 idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
813
Christopher Fauleta1cda022016-12-21 08:58:06 +0100814 /* "engine-id" K/V item */
815 if (agent != NULL && agent->engine_id != NULL) {
816 idx += encode_spoe_string(ENGINE_ID_KEY, SLEN(ENGINE_ID_KEY), frame+idx);
817 frame[idx++] = SPOE_DATA_T_STR;
818 idx += encode_spoe_string(agent->engine_id, strlen(agent->engine_id), frame+idx);
819 }
820
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200821 return idx;
822}
823
824/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame
825 * size on success, 0 if the frame can be ignored and -1 if an error
826 * occurred. */
827static int
828prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
829{
830 const char *reason;
831 int rlen, idx = 0;
832 size_t max = (7 /* TYPE + METADATA */
833 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
834 + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
835
836 if (size < max)
837 return -1;
838
839 /* Get the message corresponding to the status code */
840 if (spoe_status_code >= SPOE_FRM_ERRS)
841 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
842 reason = spoe_frm_err_reasons[spoe_status_code];
843 rlen = strlen(reason);
844
845 /* Frame type */
846 frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
847
848 /* No flags for now */
849 memset(frame+idx, 0, 4);
850 idx += 4;
851
852 /* No stream-id and frame-id for DISCONNECT frames */
853 frame[idx++] = 0;
854 frame[idx++] = 0;
855
856 /* There are 2 mandatory items: "status-code" and "message" */
857
858 /* "status-code" K/V item */
859 idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx);
860 frame[idx++] = SPOE_DATA_T_UINT32;
861 idx += encode_spoe_varint(spoe_status_code, frame+idx);
862
863 /* "message" K/V item */
864 idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx);
865 frame[idx++] = SPOE_DATA_T_STR;
866 idx += encode_spoe_string(reason, rlen, frame+idx);
867
868 return idx;
869}
870
871/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
872 * success, 0 if the frame can be ignored and -1 if an error occurred. */
873static int
Christopher Fauleta1cda022016-12-21 08:58:06 +0100874prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
875 char *frame, size_t size)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200876{
Christopher Fauleta1cda022016-12-21 08:58:06 +0100877 int idx = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200878
879 if (size < APPCTX_SPOE(appctx).max_frame_size)
880 return -1;
881
882 frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
883
884 /* No flags for now */
885 memset(frame+idx, 0, 4);
886 idx += 4;
887
888 /* Set stream-id and frame-id */
889 idx += encode_spoe_varint(ctx->stream_id, frame+idx);
890 idx += encode_spoe_varint(ctx->frame_id, frame+idx);
891
892 /* Copy encoded messages */
Christopher Fauleta1cda022016-12-21 08:58:06 +0100893 if (idx + ctx->buffer->i > size)
894 return 0;
895
896 /* Copy encoded messages */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200897 memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
898 idx += ctx->buffer->i;
899
900 return idx;
901}
902
903/* Decode HELLO frame sent by an agent. It returns the number of by read bytes
904 * on success, 0 if the frame can be ignored and -1 if an error occurred. */
905static int
906handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
907{
Christopher Fauleta1cda022016-12-21 08:58:06 +0100908 int vsn, max_frame_size, flags;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200909 int i, idx = 0;
910 size_t min_size = (7 /* TYPE + METADATA */
911 + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
912 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
913 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
914
915 /* Check frame type */
916 if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
917 return 0;
918
919 if (size < min_size) {
920 spoe_status_code = SPOE_FRM_ERR_INVALID;
921 return -1;
922 }
923
924 /* Skip flags: fragmentation is not supported for now */
925 idx += 4;
926
927 /* stream-id and frame-id must be cleared */
928 if (frame[idx] != 0 || frame[idx+1] != 0) {
929 spoe_status_code = SPOE_FRM_ERR_INVALID;
930 return -1;
931 }
932 idx += 2;
933
934 /* There are 3 mandatory items: "version", "max-frame-size" and
935 * "capabilities" */
936
937 /* Loop on K/V items */
Christopher Fauleta1cda022016-12-21 08:58:06 +0100938 vsn = max_frame_size = flags = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200939 while (idx < size) {
940 char *str;
941 uint64_t sz;
942
943 /* Decode the item key */
944 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
945 if (str == NULL) {
946 spoe_status_code = SPOE_FRM_ERR_INVALID;
947 return -1;
948 }
949 /* Check "version" K/V item */
950 if (!memcmp(str, VERSION_KEY, sz)) {
951 /* The value must be a string */
952 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
953 spoe_status_code = SPOE_FRM_ERR_INVALID;
954 return -1;
955 }
956 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
957 if (str == NULL) {
958 spoe_status_code = SPOE_FRM_ERR_INVALID;
959 return -1;
960 }
961
962 vsn = decode_spoe_version(str, sz);
963 if (vsn == -1) {
964 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
965 return -1;
966 }
967 for (i = 0; supported_versions[i].str != NULL; ++i) {
968 if (vsn >= supported_versions[i].min &&
969 vsn <= supported_versions[i].max)
970 break;
971 }
972 if (supported_versions[i].str == NULL) {
973 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
974 return -1;
975 }
976 }
977 /* Check "max-frame-size" K/V item */
978 else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
979 int type;
980
981 /* The value must be integer */
982 type = frame[idx++];
983 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
984 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
985 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
986 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
987 spoe_status_code = SPOE_FRM_ERR_INVALID;
988 return -1;
989 }
990 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
991 spoe_status_code = SPOE_FRM_ERR_INVALID;
992 return -1;
993 }
994 idx += i;
995 if (sz < MIN_FRAME_SIZE || sz > APPCTX_SPOE(appctx).max_frame_size) {
996 spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
997 return -1;
998 }
999 max_frame_size = sz;
1000 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01001001 /* Check "capabilities" K/V item */
1002 else if (!memcmp(str, CAPABILITIES_KEY, sz)) {
1003 int i;
1004
1005 /* The value must be a string */
1006 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1007 spoe_status_code = SPOE_FRM_ERR_INVALID;
1008 return -1;
1009 }
1010 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1011 if (str == NULL)
1012 continue;
1013
1014 i = 0;
1015 while (i < sz) {
1016 char *delim;
1017
1018 /* Skip leading spaces */
1019 for (; isspace(str[i]) && i < sz; i++);
1020
1021 if (sz - i >= 10 && !strncmp(str + i, "pipelining", 10)) {
1022 i += 10;
1023 if (sz == i || isspace(str[i]) || str[i] == ',')
1024 flags |= SPOE_APPCTX_FL_PIPELINING;
1025 }
1026 else if (sz - i >= 5 && !strncmp(str + i, "async", 5)) {
1027 i += 5;
1028 if (sz == i || isspace(str[i]) || str[i] == ',')
1029 flags |= SPOE_APPCTX_FL_ASYNC;
1030 }
1031
1032 if (sz == i || (delim = memchr(str + i, ',', sz-i)) == NULL)
1033 break;
1034 i = (delim - str) + 1;
1035 }
1036 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001037 else {
1038 /* Silently ignore unknown item */
1039 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1040 spoe_status_code = SPOE_FRM_ERR_INVALID;
1041 return -1;
1042 }
1043 idx += i;
1044 }
1045 }
1046
1047 /* Final checks */
1048 if (!vsn) {
1049 spoe_status_code = SPOE_FRM_ERR_NO_VSN;
1050 return -1;
1051 }
1052 if (!max_frame_size) {
1053 spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
1054 return -1;
1055 }
1056
1057 APPCTX_SPOE(appctx).version = (unsigned int)vsn;
1058 APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001059 APPCTX_SPOE(appctx).flags |= flags;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001060 return idx;
1061}
1062
1063/* Decode DISCONNECT frame sent by an agent. It returns the number of by read
1064 * bytes on success, 0 if the frame can be ignored and -1 if an error
1065 * occurred. */
1066static int
1067handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
1068{
1069 int i, idx = 0;
1070 size_t min_size = (7 /* TYPE + METADATA */
1071 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
1072 + 1 + SLEN(MSG_KEY) + 1 + 1);
1073
1074 /* Check frame type */
1075 if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
1076 return 0;
1077
1078 if (size < min_size) {
1079 spoe_status_code = SPOE_FRM_ERR_INVALID;
1080 return -1;
1081 }
1082
1083 /* Skip flags: fragmentation is not supported for now */
1084 idx += 4;
1085
1086 /* stream-id and frame-id must be cleared */
1087 if (frame[idx] != 0 || frame[idx+1] != 0) {
1088 spoe_status_code = SPOE_FRM_ERR_INVALID;
1089 return -1;
1090 }
1091 idx += 2;
1092
1093 /* There are 2 mandatory items: "status-code" and "message" */
1094
1095 /* Loop on K/V items */
1096 while (idx < size) {
1097 char *str;
1098 uint64_t sz;
1099
1100 /* Decode the item key */
1101 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1102 if (str == NULL) {
1103 spoe_status_code = SPOE_FRM_ERR_INVALID;
1104 return -1;
1105 }
1106
1107 /* Check "status-code" K/V item */
1108 if (!memcmp(str, STATUS_CODE_KEY, sz)) {
1109 int type;
1110
1111 /* The value must be an integer */
1112 type = frame[idx++];
1113 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
1114 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
1115 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
1116 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
1117 spoe_status_code = SPOE_FRM_ERR_INVALID;
1118 return -1;
1119 }
1120 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
1121 spoe_status_code = SPOE_FRM_ERR_INVALID;
1122 return -1;
1123 }
1124 idx += i;
1125 spoe_status_code = sz;
1126 }
1127
1128 /* Check "message" K/V item */
1129 else if (sz && !memcmp(str, MSG_KEY, sz)) {
1130 /* The value must be a string */
1131 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1132 spoe_status_code = SPOE_FRM_ERR_INVALID;
1133 return -1;
1134 }
1135 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1136 if (str == NULL || sz > 255) {
1137 spoe_status_code = SPOE_FRM_ERR_INVALID;
1138 return -1;
1139 }
1140 memcpy(spoe_reason, str, sz);
1141 spoe_reason[sz] = 0;
1142 }
1143 else {
1144 /* Silently ignore unknown item */
1145 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1146 spoe_status_code = SPOE_FRM_ERR_INVALID;
1147 return -1;
1148 }
1149 idx += i;
1150 }
1151 }
1152
1153 return idx;
1154}
1155
1156
Christopher Fauleta1cda022016-12-21 08:58:06 +01001157/* Decode ACK frame sent by an agent. It returns the number of read bytes on
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001158 * success, 0 if the frame can be ignored and -1 if an error occurred. */
1159static int
1160handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
1161{
Christopher Fauleta1cda022016-12-21 08:58:06 +01001162 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1163 struct spoe_context *ctx, *back;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001164 uint64_t stream_id, frame_id;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001165 int i, idx = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001166 size_t min_size = (7 /* TYPE + METADATA */);
1167
1168 /* Check frame type */
1169 if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
1170 return 0;
1171
1172 if (size < min_size) {
1173 spoe_status_code = SPOE_FRM_ERR_INVALID;
1174 return -1;
1175 }
1176
1177 /* Skip flags: fragmentation is not supported for now */
1178 idx += 4;
1179
1180 /* Get the stream-id and the frame-id */
Christopher Fauleta1cda022016-12-21 08:58:06 +01001181 if ((i = decode_spoe_varint(frame+idx, frame+size, &stream_id)) == -1)
1182 return 0;
1183 idx += i;
1184 if ((i= decode_spoe_varint(frame+idx, frame+size, &frame_id)) == -1)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001185 return 0;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001186 idx += i;
1187
1188 if (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_ASYNC) {
1189 list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
1190 if (ctx->stream_id == (unsigned int)stream_id &&
1191 ctx->frame_id == (unsigned int)frame_id)
1192 goto found;
1193 }
1194 }
1195 else {
1196 list_for_each_entry_safe(ctx, back, &APPCTX_SPOE(appctx).waiting_queue, list) {
1197 if (ctx->stream_id == (unsigned int)stream_id &&
1198 ctx->frame_id == (unsigned int)frame_id)
1199 goto found;
1200 }
1201 }
1202
1203 /* No Stream found, ignore the frame */
1204 return 0;
1205
1206 found:
1207 if (acquire_spoe_buffer(ctx) <= 0)
1208 return 1; /* Retry later */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001209
1210 /* Copy encoded actions */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001211 memcpy(ctx->buffer->p, frame+idx, size-idx);
1212 ctx->buffer->i = size-idx;
1213
Christopher Fauleta1cda022016-12-21 08:58:06 +01001214 /* Notify the stream */
1215 LIST_DEL(&ctx->list);
1216 LIST_INIT(&ctx->list);
1217 ctx->state = SPOE_CTX_ST_DONE;
1218 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1219
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001220 return idx;
1221}
1222
Christopher Fauletba7bc162016-11-07 21:07:38 +01001223/* This function is used in cfgparse.c and declared in proto/checks.h. It
1224 * prepare the request to send to agents during a healthcheck. It returns 0 on
1225 * success and -1 if an error occurred. */
1226int
1227prepare_spoe_healthcheck_request(char **req, int *len)
1228{
1229 struct appctx a;
1230 char *frame, buf[global.tune.bufsize];
1231 unsigned int framesz;
1232 int idx;
1233
1234 memset(&a, 0, sizeof(a));
1235 memset(buf, 0, sizeof(buf));
Christopher Fauleta1cda022016-12-21 08:58:06 +01001236 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize-4;
Christopher Fauletba7bc162016-11-07 21:07:38 +01001237
1238 frame = buf+4;
1239 idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4);
1240 if (idx <= 0)
1241 return -1;
1242 if (idx + SLEN(HEALTHCHECK_KEY) + 1 > global.tune.bufsize-4)
1243 return -1;
1244
1245 /* "healthcheck" K/V item */
1246 idx += encode_spoe_string(HEALTHCHECK_KEY, SLEN(HEALTHCHECK_KEY), frame+idx);
1247 frame[idx++] = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE);
1248
1249 framesz = htonl(idx);
1250 memcpy(buf, (char *)&framesz, 4);
1251
1252 if ((*req = malloc(idx+4)) == NULL)
1253 return -1;
1254 memcpy(*req, buf, idx+4);
1255 *len = idx+4;
1256 return 0;
1257}
1258
1259/* This function is used in checks.c and declared in proto/checks.h. It decode
1260 * the response received from an agent during a healthcheck. It returns 0 on
1261 * success and -1 if an error occurred. */
1262int
1263handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen)
1264{
1265 struct appctx a;
1266 int r;
1267
1268 memset(&a, 0, sizeof(a));
Christopher Fauleta1cda022016-12-21 08:58:06 +01001269 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize-4;
Christopher Fauletba7bc162016-11-07 21:07:38 +01001270
1271 if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0)
1272 goto error;
1273 if ((r = handle_spoe_agenthello_frame(&a, frame, size)) <= 0) {
1274 if (r == 0)
1275 spoe_status_code = SPOE_FRM_ERR_INVALID;
1276 goto error;
1277 }
1278
1279 return 0;
1280
1281 error:
1282 if (spoe_status_code >= SPOE_FRM_ERRS)
1283 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
1284 strncpy(err, spoe_frm_err_reasons[spoe_status_code], errlen);
1285 return -1;
1286}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001287
Christopher Fauleta1cda022016-12-21 08:58:06 +01001288/* Send a SPOE frame to an agent. It returns -1 when an error occurred, 0 when
1289 * the frame can be ignored, 1 to retry later, and the frame legnth on
1290 * success. */
1291static int
1292send_spoe_frame(struct appctx *appctx, char *buf, size_t framesz)
1293{
1294 struct stream_interface *si = appctx->owner;
1295 int ret;
1296 uint32_t netint;
1297
1298 if (si_ic(si)->buf == &buf_empty)
1299 return 1;
1300
1301 netint = htonl(framesz);
1302 memcpy(buf, (char *)&netint, 4);
1303 ret = bi_putblk(si_ic(si), buf, framesz+4);
1304
1305 if (ret <= 0) {
1306 if (ret == -1)
1307 return 1; /* retry */
1308 return -1; /* error */
1309 }
1310 return framesz;
1311}
1312
1313/* Receive a SPOE frame from an agent. It return -1 when an error occurred, 0
1314 * when the frame can be ignored, 1 to retry later and the frame length on
1315 * success. */
1316static int
1317recv_spoe_frame(struct appctx *appctx, char *buf, size_t framesz)
1318{
1319 struct stream_interface *si = appctx->owner;
1320 int ret;
1321 uint32_t netint;
1322
1323 if (si_oc(si)->buf == &buf_empty)
1324 return 1;
1325
1326 ret = bo_getblk(si_oc(si), (char *)&netint, 4, 0);
1327 if (ret > 0) {
1328 framesz = ntohl(netint);
1329 if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
1330 spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
1331 return -1;
1332 }
1333 ret = bo_getblk(si_oc(si), trash.str, framesz, 4);
1334 }
1335 if (ret <= 0) {
1336 if (ret == 0)
1337 return 1; /* retry */
1338 spoe_status_code = SPOE_FRM_ERR_IO;
1339 return -1; /* error */
1340 }
1341 return framesz;
1342}
1343
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001344/********************************************************************
1345 * Functions that manage the SPOE applet
1346 ********************************************************************/
1347/* Callback function that catches applet timeouts. If a timeout occurred, we set
1348 * <appctx->st1> flag and the SPOE applet is woken up. */
1349static struct task *
1350process_spoe_applet(struct task * task)
1351{
1352 struct appctx *appctx = task->context;
1353
1354 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1355 if (tick_is_expired(task->expire, now_ms)) {
1356 task->expire = TICK_ETERNITY;
1357 appctx->st1 = SPOE_APPCTX_ERR_TOUT;
1358 }
1359 si_applet_want_get(appctx->owner);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001360 si_applet_want_put(appctx->owner);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001361 appctx_wakeup(appctx);
1362 return task;
1363}
1364
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001365/* Callback function that releases a SPOE applet. This happens when the
1366 * connection with the agent is closed. */
1367static void
1368release_spoe_applet(struct appctx *appctx)
1369{
1370 struct stream_interface *si = appctx->owner;
1371 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001372 struct spoe_context *ctx, *back;
1373
1374 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
1375 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1376 __FUNCTION__, appctx);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001377
Christopher Fauleta1cda022016-12-21 08:58:06 +01001378 agent->applets_act--;
1379 if (!LIST_ISEMPTY(&APPCTX_SPOE(appctx).list)) {
1380 LIST_DEL(&APPCTX_SPOE(appctx).list);
1381 LIST_INIT(&APPCTX_SPOE(appctx).list);
1382 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001383
1384 if (appctx->st0 != SPOE_APPCTX_ST_END) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01001385 if (appctx->st0 == SPOE_APPCTX_ST_IDLE)
1386 agent->applets_idle--;
1387
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001388 si_shutw(si);
1389 si_shutr(si);
1390 si_ic(si)->flags |= CF_READ_NULL;
1391 appctx->st0 = SPOE_APPCTX_ST_END;
1392 }
1393
Christopher Fauleta1cda022016-12-21 08:58:06 +01001394 if (APPCTX_SPOE(appctx).task) {
1395 task_delete(APPCTX_SPOE(appctx).task);
1396 task_free(APPCTX_SPOE(appctx).task);
1397 }
1398
1399 list_for_each_entry_safe(ctx, back, &APPCTX_SPOE(appctx).waiting_queue, list) {
1400 LIST_DEL(&ctx->list);
1401 LIST_INIT(&ctx->list);
1402 ctx->state = SPOE_CTX_ST_ERROR;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001403 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001404 }
1405
Christopher Fauleta1cda022016-12-21 08:58:06 +01001406 if (!LIST_ISEMPTY(&agent->applets))
1407 return;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001408
Christopher Fauleta1cda022016-12-21 08:58:06 +01001409 list_for_each_entry_safe(ctx, back, &agent->sending_queue, list) {
1410 LIST_DEL(&ctx->list);
1411 LIST_INIT(&ctx->list);
1412 ctx->state = SPOE_CTX_ST_ERROR;
1413 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001414 }
1415
Christopher Fauleta1cda022016-12-21 08:58:06 +01001416 list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
1417 LIST_DEL(&ctx->list);
1418 LIST_INIT(&ctx->list);
1419 ctx->state = SPOE_CTX_ST_ERROR;
1420 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1421 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001422}
1423
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001424static int
Christopher Fauleta1cda022016-12-21 08:58:06 +01001425handle_connect_spoe_applet(struct appctx *appctx)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001426{
Christopher Fauleta1cda022016-12-21 08:58:06 +01001427 struct stream_interface *si = appctx->owner;
1428 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1429 char *frame = trash.str;
1430 int ret;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001431
Christopher Fauleta1cda022016-12-21 08:58:06 +01001432 if (si->state <= SI_ST_CON) {
1433 si_applet_want_put(si);
1434 task_wakeup(si_strm(si)->task, TASK_WOKEN_MSG);
1435 goto stop;
1436 }
1437 if (si->state != SI_ST_EST)
1438 goto exit;
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001439
Christopher Fauleta1cda022016-12-21 08:58:06 +01001440 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1441 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n",
1442 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx);
1443 goto exit;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001444 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001445
Christopher Fauleta1cda022016-12-21 08:58:06 +01001446 if (APPCTX_SPOE(appctx).task->expire == TICK_ETERNITY)
1447 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
1448
1449 ret = prepare_spoe_hahello_frame(appctx, frame+4, APPCTX_SPOE(appctx).max_frame_size);
1450 if (ret > 1)
1451 ret = send_spoe_frame(appctx, frame, ret);
1452
1453 switch (ret) {
1454 case -1: /* error */
1455 goto exit;
1456
1457 case 0: /* ignore => an error, cannot be ignored */
1458 goto exit;
1459
1460 case 1: /* retry later */
1461 si_applet_cant_put(si);
1462 goto stop;
1463
1464 default: /* CONNECT frame successfully sent */
1465 appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
1466 goto next;
1467 }
1468
1469 next:
1470 return 0;
1471 stop:
1472 return 1;
1473 exit:
1474 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1475 return 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001476}
1477
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001478static int
Christopher Fauleta1cda022016-12-21 08:58:06 +01001479handle_connecting_spoe_applet(struct appctx *appctx)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001480{
Christopher Fauleta1cda022016-12-21 08:58:06 +01001481 struct stream_interface *si = appctx->owner;
1482 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1483 char *frame = trash.str;
1484 int ret, framesz = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001485
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001486
Christopher Fauleta1cda022016-12-21 08:58:06 +01001487 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
1488 goto exit;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001489
Christopher Fauleta1cda022016-12-21 08:58:06 +01001490 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1491 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n",
1492 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx);
1493 goto exit;
1494 }
1495
1496 ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size);
1497 if (ret > 1) {
1498 if (*frame == SPOE_FRM_T_AGENT_DISCON) {
1499 appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
1500 goto next;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001501 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01001502 framesz = ret;
1503 ret = handle_spoe_agenthello_frame(appctx, frame, framesz);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001504 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001505
Christopher Fauleta1cda022016-12-21 08:58:06 +01001506 switch (ret) {
1507 case -1: /* error */
1508 if (framesz)
1509 bo_skip(si_oc(si), framesz+4);
1510 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1511 goto next;
1512
1513 case 0: /* ignore */
1514 if (framesz)
1515 bo_skip(si_oc(si), framesz+4);
1516 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1517 goto next;
1518
1519 case 1: /* retry later */
1520 goto stop;
1521
1522 default:
1523 /* hello handshake is finished, set the idle timeout,
1524 * Add the appctx in the agent cache, decrease the
1525 * number of new applets and wake up waiting streams. */
1526 if (framesz)
1527 bo_skip(si_oc(si), framesz+4);
1528 agent->applets_idle++;
1529 appctx->st0 = SPOE_APPCTX_ST_IDLE;
1530 goto next;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001531 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001532
Christopher Fauleta1cda022016-12-21 08:58:06 +01001533 next:
1534 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1535 return 0;
1536 stop:
1537 return 1;
1538 exit:
1539 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1540 return 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001541}
1542
Christopher Fauleta1cda022016-12-21 08:58:06 +01001543static int
1544handle_processing_spoe_applet(struct appctx *appctx)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001545{
1546 struct stream_interface *si = appctx->owner;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001547 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001548 struct spoe_context *ctx;
1549 char *frame = trash.str;
1550 unsigned int fpa = 0;
1551 int ret, framesz = 0, skip_sending = 0, skip_receiving = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001552
Christopher Fauleta1cda022016-12-21 08:58:06 +01001553 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
1554 goto exit;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001555
Christopher Fauleta1cda022016-12-21 08:58:06 +01001556 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1557 spoe_status_code = SPOE_FRM_ERR_TOUT;
1558 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1559 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1560 goto next;
1561 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001562
Christopher Fauleta1cda022016-12-21 08:58:06 +01001563 process:
1564 if (fpa > agent->max_fpa || (skip_sending && skip_receiving))
1565 goto stop;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001566
Christopher Fauleta1cda022016-12-21 08:58:06 +01001567 /* Frames must be handled synchronously and a the applet is waiting for
1568 * a ACK frame */
1569 if (!(APPCTX_SPOE(appctx).flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) &&
1570 !LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) {
1571 if (skip_receiving)
1572 goto stop;
1573 goto recv_frame;
1574 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001575
Christopher Fauleta1cda022016-12-21 08:58:06 +01001576 if (LIST_ISEMPTY(&agent->sending_queue) || skip_sending) {
1577 skip_sending = 1;
1578 goto recv_frame;
1579 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001580
Christopher Fauleta1cda022016-12-21 08:58:06 +01001581 ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
1582 ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, APPCTX_SPOE(appctx).max_frame_size);
1583 if (ret > 1)
1584 ret = send_spoe_frame(appctx, frame, ret);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001585
Christopher Fauleta1cda022016-12-21 08:58:06 +01001586 switch (ret) {
1587 case -1: /* error */
1588 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1589 goto next;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001590
Christopher Fauleta1cda022016-12-21 08:58:06 +01001591 case 0: /* ignore */
1592 agent->sending_rate++;
1593 ctx->state = SPOE_CTX_ST_ERROR;
1594 release_spoe_buffer(ctx);
1595 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1596 LIST_DEL(&ctx->list);
1597 LIST_INIT(&ctx->list);
1598 fpa++;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001599 break;
1600
Christopher Fauleta1cda022016-12-21 08:58:06 +01001601 case 1: /* retry */
1602 si_applet_cant_put(si);
1603 skip_sending = 1;
1604 break;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001605
Christopher Fauleta1cda022016-12-21 08:58:06 +01001606 default:
1607 agent->sending_rate++;
1608 ctx->state = SPOE_CTX_ST_WAITING_ACK;
1609 release_spoe_buffer(ctx);
1610 LIST_DEL(&ctx->list);
1611 LIST_INIT(&ctx->list);
1612 if (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_ASYNC)
1613 LIST_ADDQ(&agent->waiting_queue, &ctx->list);
1614 else
1615 LIST_ADDQ(&APPCTX_SPOE(appctx).waiting_queue, &ctx->list);
1616 fpa++;
1617 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001618
Christopher Fauleta1cda022016-12-21 08:58:06 +01001619 if (fpa > agent->max_fpa)
1620 goto stop;
1621
1622 recv_frame:
1623 if (skip_receiving)
1624 goto process;
1625
1626 framesz = 0;
1627 ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size);
1628 if (ret > 1) {
1629 if (*frame == SPOE_FRM_T_AGENT_DISCON) {
1630 appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
1631 goto next;
1632 }
1633 framesz = ret;
1634 ret = handle_spoe_agentack_frame(appctx, frame, framesz);
1635 }
1636
1637 switch (ret) {
1638 case -1: /* error */
1639 if (framesz)
1640 bo_skip(si_oc(si), framesz+4);
1641 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1642 goto next;
1643
1644 case 0: /* ignore */
1645 if (framesz)
1646 bo_skip(si_oc(si), framesz+4);
1647 fpa++;
1648 break;
1649
1650 case 1: /* retry */
1651 skip_receiving = 1;
1652 break;
1653
1654 default:
1655 if (framesz)
1656 bo_skip(si_oc(si), framesz+4);
1657 fpa++;
1658 }
1659 goto process;
1660
1661 next:
1662 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1663 return 0;
1664 stop:
1665 if ((APPCTX_SPOE(appctx).flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) ||
1666 LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) {
1667 agent->applets_idle++;
1668 appctx->st0 = SPOE_APPCTX_ST_IDLE;
1669 }
1670 if (fpa || (APPCTX_SPOE(appctx).flags & SPOE_APPCTX_FL_PERSIST)) {
1671 LIST_DEL(&APPCTX_SPOE(appctx).list);
1672 LIST_ADD(&agent->applets, &APPCTX_SPOE(appctx).list);
1673 if (fpa)
1674 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1675 }
1676 return 1;
1677
1678 exit:
1679 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1680 return 0;
1681}
1682
1683static int
1684handle_disconnect_spoe_applet(struct appctx *appctx)
1685{
1686 struct stream_interface *si = appctx->owner;
1687 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1688 char *frame = trash.str;
1689 int ret;
1690
1691 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
1692 goto exit;
1693
1694 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT)
1695 goto exit;
1696
1697 ret = prepare_spoe_hadiscon_frame(appctx, frame+4, APPCTX_SPOE(appctx).max_frame_size);
1698 if (ret > 1)
1699 ret = send_spoe_frame(appctx, frame, ret);
1700
1701 switch (ret) {
1702 case -1: /* error */
1703 goto exit;
1704
1705 case 0: /* ignore */
1706 goto exit;
1707
1708 case 1: /* retry */
1709 si_applet_cant_put(si);
1710 goto stop;
1711
1712 default:
1713 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1714 " - disconnected by HAProxy (%d): %s\n",
1715 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1716 __FUNCTION__, appctx, spoe_status_code,
1717 spoe_frm_err_reasons[spoe_status_code]);
1718
1719 appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
1720 goto next;
1721 }
1722
1723 next:
1724 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1725 return 0;
1726 stop:
1727 return 1;
1728 exit:
1729 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1730 return 0;
1731}
1732
1733static int
1734handle_disconnecting_spoe_applet(struct appctx *appctx)
1735{
1736 struct stream_interface *si = appctx->owner;
1737 char *frame = trash.str;
1738 int ret, framesz = 0;
1739
1740 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
1741 goto exit;
1742
1743 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT)
1744 goto exit;
1745
1746 framesz = 0;
1747 ret = recv_spoe_frame(appctx, frame, APPCTX_SPOE(appctx).max_frame_size);
1748 if (ret > 1) {
1749 framesz = ret;
1750 ret = handle_spoe_agentdiscon_frame(appctx, frame, framesz);
1751 }
1752
1753 switch (ret) {
1754 case -1: /* error */
1755 if (framesz)
1756 bo_skip(si_oc(si), framesz+4);
1757 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1758 " - error on frame (%s)\n",
1759 (int)now.tv_sec, (int)now.tv_usec,
1760 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1761 __FUNCTION__, appctx,
1762 spoe_frm_err_reasons[spoe_status_code]);
1763 goto exit;
1764
1765 case 0: /* ignore */
1766 if (framesz)
1767 bo_skip(si_oc(si), framesz+4);
1768 goto next;
1769
1770 case 1: /* retry */
1771 goto stop;
1772
1773 default:
1774 if (framesz)
1775 bo_skip(si_oc(si), framesz+4);
1776 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1777 " - disconnected by peer (%d): %s\n",
1778 (int)now.tv_sec, (int)now.tv_usec,
1779 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1780 __FUNCTION__, appctx, spoe_status_code,
1781 spoe_reason);
1782 goto exit;
1783 }
1784
1785 next:
1786 return 0;
1787 stop:
1788 return 1;
1789 exit:
1790 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1791 return 0;
1792}
1793
1794/* I/O Handler processing messages exchanged with the agent */
1795static void
1796handle_spoe_applet(struct appctx *appctx)
1797{
1798 struct stream_interface *si = appctx->owner;
1799 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1800
1801 switchstate:
1802 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1803 " - appctx-state=%s\n",
1804 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1805 __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);
1806
1807 switch (appctx->st0) {
1808 case SPOE_APPCTX_ST_CONNECT:
1809 spoe_status_code = SPOE_FRM_ERR_NONE;
1810 if (handle_connect_spoe_applet(appctx))
1811 goto out;
1812 goto switchstate;
1813
1814 case SPOE_APPCTX_ST_CONNECTING:
1815 if (handle_connecting_spoe_applet(appctx))
1816 goto out;
1817 goto switchstate;
1818
1819 case SPOE_APPCTX_ST_IDLE:
1820 if (stopping &&
1821 LIST_ISEMPTY(&agent->sending_queue) &&
1822 LIST_ISEMPTY(&APPCTX_SPOE(appctx).waiting_queue)) {
1823 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1824 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001825 goto switchstate;
1826 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01001827 agent->applets_idle--;
1828 appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
1829 /* fall through */
1830
1831 case SPOE_APPCTX_ST_PROCESSING:
1832 if (handle_processing_spoe_applet(appctx))
1833 goto out;
1834 goto switchstate;
1835
1836 case SPOE_APPCTX_ST_DISCONNECT:
1837 if (handle_disconnect_spoe_applet(appctx))
1838 goto out;
1839 goto switchstate;
1840
1841 case SPOE_APPCTX_ST_DISCONNECTING:
1842 if (handle_disconnecting_spoe_applet(appctx))
1843 goto out;
1844 goto switchstate;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001845
1846 case SPOE_APPCTX_ST_EXIT:
1847 si_shutw(si);
1848 si_shutr(si);
1849 si_ic(si)->flags |= CF_READ_NULL;
1850 appctx->st0 = SPOE_APPCTX_ST_END;
1851 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1852 /* fall through */
1853
1854 case SPOE_APPCTX_ST_END:
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001855 return;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001856 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01001857 out:
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001858 if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
1859 task_queue(APPCTX_SPOE(appctx).task);
1860 si_oc(si)->flags |= CF_READ_DONTWAIT;
1861 task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001862}
1863
1864struct applet spoe_applet = {
1865 .obj_type = OBJ_TYPE_APPLET,
1866 .name = "<SPOE>", /* used for logging */
1867 .fct = handle_spoe_applet,
1868 .release = release_spoe_applet,
1869};
1870
1871/* Create a SPOE applet. On success, the created applet is returned, else
1872 * NULL. */
1873static struct appctx *
1874create_spoe_appctx(struct spoe_config *conf)
1875{
1876 struct appctx *appctx;
1877 struct session *sess;
1878 struct task *task;
1879 struct stream *strm;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001880
1881 if ((appctx = appctx_new(&spoe_applet)) == NULL)
1882 goto out_error;
1883
1884 appctx->st0 = SPOE_APPCTX_ST_CONNECT;
1885 if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
1886 goto out_free_appctx;
1887 APPCTX_SPOE(appctx).task->process = process_spoe_applet;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001888 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;//tick_add_ifset(now_ms, conf->agent->timeout.hello);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001889 APPCTX_SPOE(appctx).task->context = appctx;
1890 APPCTX_SPOE(appctx).agent = conf->agent;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001891 APPCTX_SPOE(appctx).version = 0;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001892 APPCTX_SPOE(appctx).max_frame_size = conf->agent->max_frame_size;
1893 APPCTX_SPOE(appctx).flags = 0;
1894
1895 LIST_INIT(&APPCTX_SPOE(appctx).list);
1896 LIST_INIT(&APPCTX_SPOE(appctx).waiting_queue);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001897
Willy Tarreau5820a362016-12-22 15:59:02 +01001898 sess = session_new(&conf->agent_fe, NULL, &appctx->obj_type);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001899 if (!sess)
1900 goto out_free_spoe;
1901
1902 if ((task = task_new()) == NULL)
1903 goto out_free_sess;
1904
1905 if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
1906 goto out_free_task;
1907
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001908 stream_set_backend(strm, conf->agent->b.be);
1909
1910 /* applet is waiting for data */
1911 si_applet_cant_get(&strm->si[0]);
1912 appctx_wakeup(appctx);
1913
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001914 strm->do_log = NULL;
1915 strm->res.flags |= CF_READ_DONTWAIT;
1916
1917 conf->agent_fe.feconn++;
1918 jobs++;
1919 totalconn++;
1920
Christopher Fauleta1cda022016-12-21 08:58:06 +01001921 task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
1922 LIST_ADDQ(&conf->agent->applets, &APPCTX_SPOE(appctx).list);
1923 conf->agent->applets_act++;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001924 return appctx;
1925
1926 /* Error unrolling */
1927 out_free_task:
1928 task_free(task);
1929 out_free_sess:
1930 session_free(sess);
1931 out_free_spoe:
1932 task_free(APPCTX_SPOE(appctx).task);
1933 out_free_appctx:
1934 appctx_free(appctx);
1935 out_error:
1936 return NULL;
1937}
1938
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001939static int
Christopher Fauleta1cda022016-12-21 08:58:06 +01001940queue_spoe_context(struct spoe_context *ctx)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001941{
1942 struct spoe_config *conf = FLT_CONF(ctx->filter);
1943 struct spoe_agent *agent = conf->agent;
1944 struct appctx *appctx;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001945 unsigned int min_applets;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001946
Christopher Fauleta1cda022016-12-21 08:58:06 +01001947 min_applets = min_applets_act(agent);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001948
Christopher Fauleta1cda022016-12-21 08:58:06 +01001949 /* Check if we need to create a new SPOE applet or not. */
1950 if (agent->applets_act >= min_applets && agent->applets_idle && agent->sending_rate)
1951 goto end;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001952
1953 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
Christopher Fauleta1cda022016-12-21 08:58:06 +01001954 " - try to create new SPOE appctx\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001955 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
1956 ctx->strm);
1957
Christopher Fauleta1cda022016-12-21 08:58:06 +01001958 /* Do not try to create a new applet if there is no server up for the
1959 * agent's backend. */
1960 if (!agent->b.be->srv_act && !agent->b.be->srv_bck) {
1961 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1962 " - cannot create SPOE appctx: no server up\n",
1963 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1964 __FUNCTION__, ctx->strm);
1965 goto end;
1966 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001967
Christopher Fauleta1cda022016-12-21 08:58:06 +01001968 /* Do not try to create a new applet if we have reached the maximum of
1969 * connection per seconds */
Christopher Faulet48026722016-11-16 15:01:12 +01001970 if (agent->cps_max > 0) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01001971 if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0)) {
1972 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1973 " - cannot create SPOE appctx: max CPS reached\n",
1974 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1975 __FUNCTION__, ctx->strm);
1976 goto end;
1977 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001978 }
1979
Christopher Fauleta1cda022016-12-21 08:58:06 +01001980 appctx = create_spoe_appctx(conf);
1981 if (appctx == NULL) {
1982 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1983 " - failed to create SPOE appctx\n",
1984 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1985 __FUNCTION__, ctx->strm);
1986 goto end;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001987 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01001988 if (agent->applets_act <= min_applets)
1989 APPCTX_SPOE(appctx).flags |= SPOE_APPCTX_FL_PERSIST;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001990
Christopher Fauleta1cda022016-12-21 08:58:06 +01001991 /* Increase the per-process number of cumulated connections */
1992 if (agent->cps_max > 0)
1993 update_freq_ctr(&agent->conn_per_sec, 1);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001994
Christopher Fauleta1cda022016-12-21 08:58:06 +01001995 end:
1996 /* The only reason to return an error is when there is no applet */
1997 if (LIST_ISEMPTY(&agent->applets))
1998 return 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001999
Christopher Fauleta1cda022016-12-21 08:58:06 +01002000 /* Add the SPOE context in the sending queue and update all running
2001 * info */
2002 LIST_ADDQ(&agent->sending_queue, &ctx->list);
2003 if (agent->sending_rate)
2004 agent->sending_rate--;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002005
2006 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
Christopher Fauleta1cda022016-12-21 08:58:06 +01002007 " - Add stream in sending queue - applets_act=%u - applets_idle=%u"
2008 " - sending_rate=%u\n",
2009 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
2010 ctx->strm, agent->applets_act, agent->applets_idle, agent->sending_rate);
Christopher Fauletf7a30922016-11-10 15:04:51 +01002011
Christopher Fauleta1cda022016-12-21 08:58:06 +01002012 /* Finally try to wakeup the first IDLE applet found and move it at the
2013 * end of the list. */
2014 list_for_each_entry(appctx, &agent->applets, ctx.spoe.list) {
2015 if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
2016 si_applet_want_get(appctx->owner);
2017 si_applet_want_put(appctx->owner);
2018 appctx_wakeup(appctx);
2019 LIST_DEL(&APPCTX_SPOE(appctx).list);
2020 LIST_ADDQ(&agent->applets, &APPCTX_SPOE(appctx).list);
2021 break;
2022 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002023 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01002024 return 1;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002025}
2026
2027/***************************************************************************
2028 * Functions that process SPOE messages and actions
2029 **************************************************************************/
2030/* Process SPOE messages for a specific event. During the processing, it returns
2031 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
2032 * is returned. */
2033static int
2034process_spoe_messages(struct stream *s, struct spoe_context *ctx,
2035 struct list *messages, int dir)
2036{
Christopher Fauleta1cda022016-12-21 08:58:06 +01002037 struct spoe_config *conf = FLT_CONF(ctx->filter);
2038 struct spoe_agent *agent = conf->agent;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002039 struct spoe_message *msg;
2040 struct sample *smp;
2041 struct spoe_arg *arg;
2042 char *p;
2043 size_t max_size;
2044 int off, flag, idx = 0;
2045
2046 /* Reserve 32 bytes from the frame Metadata */
Christopher Fauleta1cda022016-12-21 08:58:06 +01002047 max_size = agent->max_frame_size - 32;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002048
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002049 p = ctx->buffer->p;
2050
2051 /* Loop on messages */
2052 list_for_each_entry(msg, messages, list) {
2053 if (idx + msg->id_len + 1 > max_size)
2054 goto skip;
2055
2056 /* Set the message name */
2057 idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
2058
2059 /* Save offset where to store the number of arguments for this
2060 * message */
2061 off = idx++;
2062 p[off] = 0;
2063
2064 /* Loop on arguments */
2065 list_for_each_entry(arg, &msg->args, list) {
2066 p[off]++; /* Increment the number of arguments */
2067
2068 if (idx + arg->name_len + 1 > max_size)
2069 goto skip;
2070
2071 /* Encode the arguement name as a string. It can by NULL */
2072 idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
2073
2074 /* Fetch the arguement value */
2075 smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
2076 if (!smp) {
2077 /* If no value is available, set it to NULL */
2078 p[idx++] = SPOE_DATA_T_NULL;
2079 continue;
2080 }
2081
2082 /* Else, encode the arguement value */
2083 switch (smp->data.type) {
2084 case SMP_T_BOOL:
2085 flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
2086 p[idx++] = (SPOE_DATA_T_BOOL | flag);
2087 break;
2088 case SMP_T_SINT:
2089 p[idx++] = SPOE_DATA_T_INT64;
2090 if (idx + 8 > max_size)
2091 goto skip;
2092 idx += encode_spoe_varint(smp->data.u.sint, p+idx);
2093 break;
2094 case SMP_T_IPV4:
2095 p[idx++] = SPOE_DATA_T_IPV4;
2096 if (idx + 4 > max_size)
2097 goto skip;
2098 memcpy(p+idx, &smp->data.u.ipv4, 4);
2099 idx += 4;
2100 break;
2101 case SMP_T_IPV6:
2102 p[idx++] = SPOE_DATA_T_IPV6;
2103 if (idx + 16 > max_size)
2104 goto skip;
2105 memcpy(p+idx, &smp->data.u.ipv6, 16);
2106 idx += 16;
2107 break;
2108 case SMP_T_STR:
2109 p[idx++] = SPOE_DATA_T_STR;
2110 if (idx + smp->data.u.str.len > max_size)
2111 goto skip;
2112 idx += encode_spoe_string(smp->data.u.str.str,
2113 smp->data.u.str.len,
2114 p+idx);
2115 break;
2116 case SMP_T_BIN:
2117 p[idx++] = SPOE_DATA_T_BIN;
2118 if (idx + smp->data.u.str.len > max_size)
2119 goto skip;
2120 idx += encode_spoe_string(smp->data.u.str.str,
2121 smp->data.u.str.len,
2122 p+idx);
2123 break;
2124 case SMP_T_METH:
2125 if (smp->data.u.meth.meth == HTTP_METH_OTHER) {
2126 p[idx++] = SPOE_DATA_T_STR;
2127 if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size)
2128 goto skip;
2129 idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name,
2130 http_known_methods[smp->data.u.meth.meth].len,
2131 p+idx);
2132 }
2133 else {
2134 p[idx++] = SPOE_DATA_T_STR;
2135 if (idx + smp->data.u.str.len > max_size)
2136 goto skip;
2137 idx += encode_spoe_string(smp->data.u.meth.str.str,
2138 smp->data.u.meth.str.len,
2139 p+idx);
2140 }
2141 break;
2142 default:
2143 p[idx++] = SPOE_DATA_T_NULL;
2144 }
2145 }
2146 }
2147 ctx->buffer->i = idx;
2148 return 1;
2149
2150 skip:
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002151 return 0;
2152}
2153
2154/* Helper function to set a variable */
2155static void
2156set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
2157 struct sample *smp)
2158{
2159 struct spoe_config *conf = FLT_CONF(ctx->filter);
2160 struct spoe_agent *agent = conf->agent;
2161 char varname[64];
2162
2163 memset(varname, 0, sizeof(varname));
2164 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
2165 scope, agent->var_pfx, len, name);
2166 vars_set_by_name_ifexist(varname, len, smp);
2167}
2168
2169/* Helper function to unset a variable */
2170static void
2171unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
2172 struct sample *smp)
2173{
2174 struct spoe_config *conf = FLT_CONF(ctx->filter);
2175 struct spoe_agent *agent = conf->agent;
2176 char varname[64];
2177
2178 memset(varname, 0, sizeof(varname));
2179 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
2180 scope, agent->var_pfx, len, name);
2181 vars_unset_by_name_ifexist(varname, len, smp);
2182}
2183
2184
2185/* Process SPOE actions for a specific event. During the processing, it returns
2186 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
2187 * is returned. */
2188static int
2189process_spoe_actions(struct stream *s, struct spoe_context *ctx,
2190 enum spoe_event ev, int dir)
2191{
2192 char *p;
2193 size_t size;
2194 int off, i, idx = 0;
2195
2196 p = ctx->buffer->p;
2197 size = ctx->buffer->i;
2198
2199 while (idx < size) {
2200 char *str;
2201 uint64_t sz;
2202 struct sample smp;
2203 enum spoe_action_type type;
2204
2205 off = idx;
2206 if (idx+2 > size)
2207 goto skip;
2208
2209 type = p[idx++];
2210 switch (type) {
2211 case SPOE_ACT_T_SET_VAR: {
2212 char *scope;
2213
2214 if (p[idx++] != 3)
2215 goto skip_action;
2216
2217 switch (p[idx++]) {
2218 case SPOE_SCOPE_PROC: scope = "proc"; break;
2219 case SPOE_SCOPE_SESS: scope = "sess"; break;
2220 case SPOE_SCOPE_TXN : scope = "txn"; break;
2221 case SPOE_SCOPE_REQ : scope = "req"; break;
2222 case SPOE_SCOPE_RES : scope = "res"; break;
2223 default: goto skip;
2224 }
2225
2226 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2227 if (str == NULL)
2228 goto skip;
2229 memset(&smp, 0, sizeof(smp));
2230 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
Christopher Fauletb5cff602016-11-24 14:53:22 +01002231
2232 if ((i = decode_spoe_data(p+idx, p+size, &smp)) == -1)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002233 goto skip;
Christopher Fauletb5cff602016-11-24 14:53:22 +01002234 idx += i;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002235
2236 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2237 " - set-var '%s.%s.%.*s'\n",
2238 (int)now.tv_sec, (int)now.tv_usec,
2239 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2240 __FUNCTION__, s, scope,
2241 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2242 (int)sz, str);
2243
2244 set_spoe_var(ctx, scope, str, sz, &smp);
2245 break;
2246 }
2247
2248 case SPOE_ACT_T_UNSET_VAR: {
2249 char *scope;
2250
2251 if (p[idx++] != 2)
2252 goto skip_action;
2253
2254 switch (p[idx++]) {
2255 case SPOE_SCOPE_PROC: scope = "proc"; break;
2256 case SPOE_SCOPE_SESS: scope = "sess"; break;
2257 case SPOE_SCOPE_TXN : scope = "txn"; break;
2258 case SPOE_SCOPE_REQ : scope = "req"; break;
2259 case SPOE_SCOPE_RES : scope = "res"; break;
2260 default: goto skip;
2261 }
2262
2263 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2264 if (str == NULL)
2265 goto skip;
2266 memset(&smp, 0, sizeof(smp));
2267 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2268
2269 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2270 " - unset-var '%s.%s.%.*s'\n",
2271 (int)now.tv_sec, (int)now.tv_usec,
2272 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2273 __FUNCTION__, s, scope,
2274 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2275 (int)sz, str);
2276
2277 unset_spoe_var(ctx, scope, str, sz, &smp);
2278 break;
2279 }
2280
2281 default:
2282 skip_action:
2283 if ((i = skip_spoe_action(p+off, p+size)) == -1)
2284 goto skip;
2285 idx += i;
2286 }
2287 }
2288
2289 return 1;
2290 skip:
2291 return 0;
2292}
2293
Christopher Fauleta1cda022016-12-21 08:58:06 +01002294static int
2295start_event_processing(struct spoe_context *ctx, int dir)
2296{
2297 int ret;
2298 /* If a process is already started for this SPOE context, retry
2299 * later. */
2300 if (ctx->flags & SPOE_CTX_FL_PROCESS)
2301 goto wait;
2302
2303 ret = acquire_spoe_buffer(ctx);
2304 if (ret <= 0)
2305 return ret;
2306
2307 /* Set the right flag to prevent request and response processing
2308 * in same time. */
2309 ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
2310 ? SPOE_CTX_FL_REQ_PROCESS
2311 : SPOE_CTX_FL_RSP_PROCESS);
2312
2313 return 1;
2314
2315 wait:
2316 return 0;
2317}
2318
2319static void
2320stop_event_processing(struct spoe_context *ctx)
2321{
2322 /* Reset the flag to allow next processing */
2323 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2324
2325 /* Reset processing timer */
2326 ctx->process_exp = TICK_ETERNITY;
2327
2328 release_spoe_buffer(ctx);
2329
2330 if (!LIST_ISEMPTY(&ctx->list)) {
2331 LIST_DEL(&ctx->list);
2332 LIST_INIT(&ctx->list);
2333 }
2334}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002335
2336/* Process a SPOE event. First, this functions will process messages attached to
2337 * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
2338 * ACK frame to process corresponding actions. During all the processing, it
2339 * returns 0 and it returns 1 when the processing is finished. If an error
2340 * occurred, -1 is returned. */
2341static int
2342process_spoe_event(struct stream *s, struct spoe_context *ctx,
2343 enum spoe_event ev)
2344{
Christopher Fauletf7a30922016-11-10 15:04:51 +01002345 struct spoe_config *conf = FLT_CONF(ctx->filter);
2346 struct spoe_agent *agent = conf->agent;
2347 int dir, ret = 1;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002348
2349 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2350 " - ctx-state=%s - event=%s\n",
2351 (int)now.tv_sec, (int)now.tv_usec,
Christopher Fauletf7a30922016-11-10 15:04:51 +01002352 agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002353 spoe_event_str[ev]);
2354
Christopher Faulet48026722016-11-16 15:01:12 +01002355
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002356 dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
2357
2358 if (LIST_ISEMPTY(&(ctx->messages[ev])))
2359 goto out;
2360
2361 if (ctx->state == SPOE_CTX_ST_ERROR)
2362 goto error;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002363
2364 if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
2365 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2366 " - failed to process event '%s': timeout\n",
2367 (int)now.tv_sec, (int)now.tv_usec,
2368 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2369 send_log(ctx->strm->be, LOG_WARNING,
2370 "failed to process event '%s': timeout.\n",
2371 spoe_event_str[ev]);
2372 goto error;
2373 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002374
2375 if (ctx->state == SPOE_CTX_ST_READY) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01002376 if (agent->eps_max > 0) {
2377 if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
2378 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2379 " - skip event '%s': max EPS reached\n",
2380 (int)now.tv_sec, (int)now.tv_usec,
2381 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2382 goto skip;
2383 }
2384 }
2385
Christopher Fauletf7a30922016-11-10 15:04:51 +01002386 if (!tick_isset(ctx->process_exp)) {
2387 ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
2388 s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
2389 ctx->process_exp);
2390 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01002391 ret = start_event_processing(ctx, dir);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002392 if (ret <= 0) {
2393 if (!ret)
2394 goto out;
2395 goto error;
2396 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002397 ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
2398 if (ret <= 0) {
2399 if (!ret)
2400 goto skip;
2401 goto error;
2402 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01002403
2404 if (!queue_spoe_context(ctx))
2405 goto error;
2406
2407 ctx->state = SPOE_CTX_ST_SENDING_MSGS;
2408 /* fall through */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002409 }
2410
Christopher Fauleta1cda022016-12-21 08:58:06 +01002411 if (ctx->state == SPOE_CTX_ST_SENDING_MSGS ||
2412 ctx->state == SPOE_CTX_ST_WAITING_ACK) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002413 ret = 0;
2414 goto out;
2415 }
2416
2417 if (ctx->state == SPOE_CTX_ST_DONE) {
2418 ret = process_spoe_actions(s, ctx, ev, dir);
2419 if (ret <= 0) {
2420 if (!ret)
2421 goto skip;
2422 goto error;
2423 }
2424 ctx->frame_id++;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002425 ctx->state = SPOE_CTX_ST_READY;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002426 goto end;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002427 }
2428
2429 out:
2430 return ret;
2431
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002432 error:
Christopher Faulet48026722016-11-16 15:01:12 +01002433 if (agent->eps_max > 0)
2434 update_freq_ctr(&agent->err_per_sec, 1);
2435
Christopher Faulet985532d2016-11-16 15:36:19 +01002436 if (agent->var_on_error) {
2437 struct sample smp;
2438
Christopher Fauleta1cda022016-12-21 08:58:06 +01002439 // FIXME: Get the error code here
Christopher Faulet985532d2016-11-16 15:36:19 +01002440 memset(&smp, 0, sizeof(smp));
2441 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2442 smp.data.u.sint = 1;
2443 smp.data.type = SMP_T_BOOL;
2444
2445 set_spoe_var(ctx, "txn", agent->var_on_error,
2446 strlen(agent->var_on_error), &smp);
2447 }
2448
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002449 ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
2450 ? SPOE_CTX_ST_READY
2451 : SPOE_CTX_ST_ERROR);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002452 ret = 1;
2453 goto end;
2454
2455 skip:
2456 ctx->state = SPOE_CTX_ST_READY;
2457 ret = 1;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002458
Christopher Fauleta1cda022016-12-21 08:58:06 +01002459 end:
2460 stop_event_processing(ctx);
2461 return ret;
2462}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002463
2464/***************************************************************************
2465 * Functions that create/destroy SPOE contexts
2466 **************************************************************************/
Christopher Fauleta1cda022016-12-21 08:58:06 +01002467static int
2468acquire_spoe_buffer(struct spoe_context *ctx)
2469{
2470 if (ctx->buffer != &buf_empty)
2471 return 1;
2472
2473 if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
2474 LIST_DEL(&ctx->buffer_wait.list);
2475 LIST_INIT(&ctx->buffer_wait.list);
2476 }
2477
2478 if (b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs))
2479 return 1;
2480
2481 LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
2482 return 0;
2483}
2484
2485static void
2486release_spoe_buffer(struct spoe_context *ctx)
2487{
2488 if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
2489 LIST_DEL(&ctx->buffer_wait.list);
2490 LIST_INIT(&ctx->buffer_wait.list);
2491 }
2492
2493 /* Release the buffer if needed */
2494 if (ctx->buffer != &buf_empty) {
2495 b_free(&ctx->buffer);
2496 offer_buffers(ctx, tasks_run_queue + applets_active_queue);
2497 }
2498}
2499
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002500static int wakeup_spoe_context(struct spoe_context *ctx)
2501{
2502 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
2503 return 1;
2504}
2505
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002506static struct spoe_context *
2507create_spoe_context(struct filter *filter)
2508{
2509 struct spoe_config *conf = FLT_CONF(filter);
2510 struct spoe_context *ctx;
2511
2512 ctx = pool_alloc_dirty(pool2_spoe_ctx);
2513 if (ctx == NULL) {
2514 return NULL;
2515 }
2516 memset(ctx, 0, sizeof(*ctx));
2517 ctx->filter = filter;
2518 ctx->state = SPOE_CTX_ST_NONE;
2519 ctx->flags = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002520 ctx->messages = conf->agent->messages;
2521 ctx->buffer = &buf_empty;
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002522 LIST_INIT(&ctx->buffer_wait.list);
2523 ctx->buffer_wait.target = ctx;
2524 ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002525 LIST_INIT(&ctx->list);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002526
Christopher Fauletf7a30922016-11-10 15:04:51 +01002527 ctx->stream_id = 0;
2528 ctx->frame_id = 1;
2529 ctx->process_exp = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002530
2531 return ctx;
2532}
2533
2534static void
2535destroy_spoe_context(struct spoe_context *ctx)
2536{
2537 if (!ctx)
2538 return;
2539
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002540 if (!LIST_ISEMPTY(&ctx->buffer_wait.list))
2541 LIST_DEL(&ctx->buffer_wait.list);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002542 if (!LIST_ISEMPTY(&ctx->list))
2543 LIST_DEL(&ctx->list);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002544 pool_free2(pool2_spoe_ctx, ctx);
2545}
2546
2547static void
2548reset_spoe_context(struct spoe_context *ctx)
2549{
2550 ctx->state = SPOE_CTX_ST_READY;
2551 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2552}
2553
2554
2555/***************************************************************************
2556 * Hooks that manage the filter lifecycle (init/check/deinit)
2557 **************************************************************************/
2558/* Signal handler: Do a soft stop, wakeup SPOE applet */
2559static void
2560sig_stop_spoe(struct sig_handler *sh)
2561{
2562 struct proxy *p;
2563
2564 p = proxy;
2565 while (p) {
2566 struct flt_conf *fconf;
2567
2568 list_for_each_entry(fconf, &p->filter_configs, list) {
Christopher Faulet3b386a32017-02-23 10:17:15 +01002569 struct spoe_config *conf;
2570 struct spoe_agent *agent;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002571 struct appctx *appctx;
2572
Christopher Faulet3b386a32017-02-23 10:17:15 +01002573 if (fconf->id != spoe_filter_id)
2574 continue;
2575
2576 conf = fconf->conf;
2577 agent = conf->agent;
2578
Christopher Fauleta1cda022016-12-21 08:58:06 +01002579 list_for_each_entry(appctx, &agent->applets, ctx.spoe.list) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002580 si_applet_want_get(appctx->owner);
2581 si_applet_want_put(appctx->owner);
2582 appctx_wakeup(appctx);
2583 }
2584 }
2585 p = p->next;
2586 }
2587}
2588
2589
2590/* Initialize the SPOE filter. Returns -1 on error, else 0. */
2591static int
2592spoe_init(struct proxy *px, struct flt_conf *fconf)
2593{
2594 struct spoe_config *conf = fconf->conf;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002595
2596 memset(&conf->agent_fe, 0, sizeof(conf->agent_fe));
2597 init_new_proxy(&conf->agent_fe);
2598 conf->agent_fe.parent = conf->agent;
2599 conf->agent_fe.last_change = now.tv_sec;
2600 conf->agent_fe.id = conf->agent->id;
2601 conf->agent_fe.cap = PR_CAP_FE;
2602 conf->agent_fe.mode = PR_MODE_TCP;
2603 conf->agent_fe.maxconn = 0;
2604 conf->agent_fe.options2 |= PR_O2_INDEPSTR;
2605 conf->agent_fe.conn_retries = CONN_RETRIES;
2606 conf->agent_fe.accept = frontend_accept;
2607 conf->agent_fe.srv = NULL;
2608 conf->agent_fe.timeout.client = TICK_ETERNITY;
2609 conf->agent_fe.default_target = &spoe_applet.obj_type;
2610 conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
2611
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002612 if (!sighandler_registered) {
2613 signal_register_fct(0, sig_stop_spoe, 0);
2614 sighandler_registered = 1;
2615 }
2616
2617 return 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002618}
2619
2620/* Free ressources allocated by the SPOE filter. */
2621static void
2622spoe_deinit(struct proxy *px, struct flt_conf *fconf)
2623{
2624 struct spoe_config *conf = fconf->conf;
2625
2626 if (conf) {
2627 struct spoe_agent *agent = conf->agent;
2628 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
2629 struct listener *, by_fe);
2630
2631 free(l);
2632 release_spoe_agent(agent);
2633 free(conf);
2634 }
2635 fconf->conf = NULL;
2636}
2637
2638/* Check configuration of a SPOE filter for a specified proxy.
2639 * Return 1 on error, else 0. */
2640static int
2641spoe_check(struct proxy *px, struct flt_conf *fconf)
2642{
2643 struct spoe_config *conf = fconf->conf;
2644 struct proxy *target;
2645
2646 target = proxy_be_by_name(conf->agent->b.name);
2647 if (target == NULL) {
2648 Alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'"
2649 " declared at %s:%d.\n",
2650 px->id, conf->agent->b.name, conf->agent->id,
2651 conf->agent->conf.file, conf->agent->conf.line);
2652 return 1;
2653 }
2654 if (target->mode != PR_MODE_TCP) {
2655 Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
2656 " at %s:%d does not support HTTP mode.\n",
2657 px->id, target->id, conf->agent->id,
2658 conf->agent->conf.file, conf->agent->conf.line);
2659 return 1;
2660 }
2661
2662 free(conf->agent->b.name);
2663 conf->agent->b.name = NULL;
2664 conf->agent->b.be = target;
2665 return 0;
2666}
2667
2668/**************************************************************************
2669 * Hooks attached to a stream
2670 *************************************************************************/
2671/* Called when a filter instance is created and attach to a stream. It creates
2672 * the context that will be used to process this stream. */
2673static int
2674spoe_start(struct stream *s, struct filter *filter)
2675{
2676 struct spoe_context *ctx;
2677
2678 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2679 (int)now.tv_sec, (int)now.tv_usec,
2680 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2681 __FUNCTION__, s);
2682
2683 ctx = create_spoe_context(filter);
2684 if (ctx == NULL) {
2685 send_log(s->be, LOG_EMERG,
2686 "failed to create SPOE context for proxy %s\n",
2687 s->be->id);
2688 return 0;
2689 }
2690
2691 ctx->strm = s;
2692 ctx->state = SPOE_CTX_ST_READY;
2693 filter->ctx = ctx;
2694
2695 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_FE]))
2696 filter->pre_analyzers |= AN_REQ_INSPECT_FE;
2697
2698 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_BE]))
2699 filter->pre_analyzers |= AN_REQ_INSPECT_BE;
2700
2701 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_RSP]))
2702 filter->pre_analyzers |= AN_RES_INSPECT;
2703
2704 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_FE]))
2705 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE;
2706
2707 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_BE]))
2708 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE;
2709
2710 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_RSP]))
2711 filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE;
2712
2713 return 1;
2714}
2715
2716/* Called when a filter instance is detached from a stream. It release the
2717 * attached SPOE context. */
2718static void
2719spoe_stop(struct stream *s, struct filter *filter)
2720{
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002721 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2722 (int)now.tv_sec, (int)now.tv_usec,
2723 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2724 __FUNCTION__, s);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002725 destroy_spoe_context(filter->ctx);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002726}
2727
Christopher Fauletf7a30922016-11-10 15:04:51 +01002728
2729/*
2730 * Called when the stream is woken up because of expired timer.
2731 */
2732static void
2733spoe_check_timeouts(struct stream *s, struct filter *filter)
2734{
2735 struct spoe_context *ctx = filter->ctx;
2736
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002737 if (tick_is_expired(ctx->process_exp, now_ms)) {
2738 s->pending_events |= TASK_WOKEN_MSG;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002739 release_spoe_buffer(ctx);
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002740 }
Christopher Fauletf7a30922016-11-10 15:04:51 +01002741}
2742
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002743/* Called when we are ready to filter data on a channel */
2744static int
2745spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2746{
2747 struct spoe_context *ctx = filter->ctx;
2748 int ret = 1;
2749
2750 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2751 " - ctx-flags=0x%08x\n",
2752 (int)now.tv_sec, (int)now.tv_usec,
2753 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2754 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2755
2756 if (!(chn->flags & CF_ISRESP)) {
2757 if (filter->pre_analyzers & AN_REQ_INSPECT_FE)
2758 chn->analysers |= AN_REQ_INSPECT_FE;
2759 if (filter->pre_analyzers & AN_REQ_INSPECT_BE)
2760 chn->analysers |= AN_REQ_INSPECT_BE;
2761
2762 if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED)
2763 goto out;
2764
2765 ctx->stream_id = s->uniq_id;
2766 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2767 ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
2768 if (ret != 1)
2769 goto out;
2770 }
2771 ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
2772 }
2773 else {
2774 if (filter->pre_analyzers & SPOE_EV_ON_TCP_RSP)
2775 chn->analysers |= AN_RES_INSPECT;
2776
2777 if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
2778 goto out;
2779
2780 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2781 ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
2782 if (ret != 1)
2783 goto out;
2784 }
2785 ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002786 if (!ret) {
2787 channel_dont_read(chn);
2788 channel_dont_close(chn);
2789 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002790 }
2791
2792 out:
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002793 return ret;
2794}
2795
2796/* Called before a processing happens on a given channel */
2797static int
2798spoe_chn_pre_analyze(struct stream *s, struct filter *filter,
2799 struct channel *chn, unsigned an_bit)
2800{
2801 struct spoe_context *ctx = filter->ctx;
2802 int ret = 1;
2803
2804 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2805 " - ctx-flags=0x%08x - ana=0x%08x\n",
2806 (int)now.tv_sec, (int)now.tv_usec,
2807 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2808 __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2809 ctx->flags, an_bit);
2810
2811 if (ctx->state == SPOE_CTX_ST_NONE || ctx->state == SPOE_CTX_ST_ERROR)
2812 goto out;
2813
2814 switch (an_bit) {
2815 case AN_REQ_INSPECT_FE:
2816 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
2817 break;
2818 case AN_REQ_INSPECT_BE:
2819 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
2820 break;
2821 case AN_RES_INSPECT:
2822 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP);
2823 break;
2824 case AN_REQ_HTTP_PROCESS_FE:
2825 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
2826 break;
2827 case AN_REQ_HTTP_PROCESS_BE:
2828 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
2829 break;
2830 case AN_RES_HTTP_PROCESS_FE:
2831 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
2832 break;
2833 }
2834
2835 out:
2836 if (!ret) {
2837 channel_dont_read(chn);
2838 channel_dont_close(chn);
2839 }
2840 return ret;
2841}
2842
2843/* Called when the filtering on the channel ends. */
2844static int
2845spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2846{
2847 struct spoe_context *ctx = filter->ctx;
2848
2849 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2850 " - ctx-flags=0x%08x\n",
2851 (int)now.tv_sec, (int)now.tv_usec,
2852 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2853 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2854
2855 if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
2856 reset_spoe_context(ctx);
2857 }
2858
2859 return 1;
2860}
2861
2862/********************************************************************
2863 * Functions that manage the filter initialization
2864 ********************************************************************/
2865struct flt_ops spoe_ops = {
2866 /* Manage SPOE filter, called for each filter declaration */
2867 .init = spoe_init,
2868 .deinit = spoe_deinit,
2869 .check = spoe_check,
2870
2871 /* Handle start/stop of SPOE */
Christopher Fauletf7a30922016-11-10 15:04:51 +01002872 .attach = spoe_start,
2873 .detach = spoe_stop,
2874 .check_timeouts = spoe_check_timeouts,
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002875
2876 /* Handle channels activity */
2877 .channel_start_analyze = spoe_start_analyze,
2878 .channel_pre_analyze = spoe_chn_pre_analyze,
2879 .channel_end_analyze = spoe_end_analyze,
2880};
2881
2882
2883static int
2884cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
2885{
2886 const char *err;
2887 int i, err_code = 0;
2888
2889 if ((cfg_scope == NULL && curengine != NULL) ||
2890 (cfg_scope != NULL && curengine == NULL) ||
2891 strcmp(curengine, cfg_scope))
2892 goto out;
2893
2894 if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */
2895 if (!*args[1]) {
2896 Alert("parsing [%s:%d] : missing name for spoe-agent section.\n",
2897 file, linenum);
2898 err_code |= ERR_ALERT | ERR_ABORT;
2899 goto out;
2900 }
2901 if (*args[2]) {
2902 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2903 file, linenum, args[2]);
2904 err_code |= ERR_ALERT | ERR_ABORT;
2905 goto out;
2906 }
2907
2908 err = invalid_char(args[1]);
2909 if (err) {
2910 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2911 file, linenum, *err, args[0], args[1]);
2912 err_code |= ERR_ALERT | ERR_ABORT;
2913 goto out;
2914 }
2915
2916 if (curagent != NULL) {
2917 Alert("parsing [%s:%d] : another spoe-agent section previously defined.\n",
2918 file, linenum);
2919 err_code |= ERR_ALERT | ERR_ABORT;
2920 goto out;
2921 }
2922 if ((curagent = calloc(1, sizeof(*curagent))) == NULL) {
2923 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2924 err_code |= ERR_ALERT | ERR_ABORT;
2925 goto out;
2926 }
2927
2928 curagent->id = strdup(args[1]);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002929
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002930 curagent->conf.file = strdup(file);
2931 curagent->conf.line = linenum;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002932
2933 curagent->timeout.hello = TICK_ETERNITY;
2934 curagent->timeout.idle = TICK_ETERNITY;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002935 curagent->timeout.processing = TICK_ETERNITY;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002936
2937 curagent->engine_id = NULL;
2938 curagent->var_pfx = NULL;
2939 curagent->var_on_error = NULL;
2940 curagent->flags = 0;
2941 curagent->cps_max = 0;
2942 curagent->eps_max = 0;
2943 curagent->max_frame_size = global.tune.bufsize - 4;
2944 curagent->min_applets = 0;
2945 curagent->max_fpa = 100;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002946
2947 for (i = 0; i < SPOE_EV_EVENTS; ++i)
2948 LIST_INIT(&curagent->messages[i]);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002949
2950 curagent->applets_act = 0;
2951 curagent->applets_idle = 0;
2952 curagent->sending_rate = 0;
2953
2954 LIST_INIT(&curagent->applets);
2955 LIST_INIT(&curagent->sending_queue);
2956 LIST_INIT(&curagent->waiting_queue);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002957 }
2958 else if (!strcmp(args[0], "use-backend")) {
2959 if (!*args[1]) {
2960 Alert("parsing [%s:%d] : '%s' expects a backend name.\n",
2961 file, linenum, args[0]);
2962 err_code |= ERR_ALERT | ERR_FATAL;
2963 goto out;
2964 }
2965 if (*args[2]) {
2966 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2967 file, linenum, args[2]);
2968 err_code |= ERR_ALERT | ERR_ABORT;
2969 goto out;
2970 }
2971 free(curagent->b.name);
2972 curagent->b.name = strdup(args[1]);
2973 }
2974 else if (!strcmp(args[0], "messages")) {
2975 int cur_arg = 1;
2976 while (*args[cur_arg]) {
2977 struct spoe_msg_placeholder *mp = NULL;
2978
2979 list_for_each_entry(mp, &curmps, list) {
2980 if (!strcmp(mp->id, args[cur_arg])) {
2981 Alert("parsing [%s:%d]: spoe-message message '%s' already declared.\n",
2982 file, linenum, args[cur_arg]);
2983 err_code |= ERR_ALERT | ERR_FATAL;
2984 goto out;
2985 }
2986 }
2987
2988 if ((mp = calloc(1, sizeof(*mp))) == NULL) {
2989 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2990 err_code |= ERR_ALERT | ERR_ABORT;
2991 goto out;
2992 }
2993 mp->id = strdup(args[cur_arg]);
2994 LIST_ADDQ(&curmps, &mp->list);
2995 cur_arg++;
2996 }
2997 }
2998 else if (!strcmp(args[0], "timeout")) {
2999 unsigned int *tv = NULL;
3000 const char *res;
3001 unsigned timeout;
3002
3003 if (!*args[1]) {
3004 Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n",
3005 file, linenum);
3006 err_code |= ERR_ALERT | ERR_FATAL;
3007 goto out;
3008 }
3009 if (!strcmp(args[1], "hello"))
3010 tv = &curagent->timeout.hello;
3011 else if (!strcmp(args[1], "idle"))
3012 tv = &curagent->timeout.idle;
Christopher Fauletf7a30922016-11-10 15:04:51 +01003013 else if (!strcmp(args[1], "processing"))
3014 tv = &curagent->timeout.processing;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003015 else {
Christopher Faulet03a34492016-11-19 16:47:56 +01003016 Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' or 'processing' (got %s).\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003017 file, linenum, args[1]);
3018 err_code |= ERR_ALERT | ERR_FATAL;
3019 goto out;
3020 }
3021 if (!*args[2]) {
3022 Alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n",
3023 file, linenum, args[1]);
3024 err_code |= ERR_ALERT | ERR_FATAL;
3025 goto out;
3026 }
3027 res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
3028 if (res) {
3029 Alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n",
3030 file, linenum, *res, args[1]);
3031 err_code |= ERR_ALERT | ERR_ABORT;
3032 goto out;
3033 }
3034 if (*args[3]) {
3035 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
3036 file, linenum, args[3]);
3037 err_code |= ERR_ALERT | ERR_ABORT;
3038 goto out;
3039 }
3040 *tv = MS_TO_TICKS(timeout);
3041 }
3042 else if (!strcmp(args[0], "option")) {
3043 if (!*args[1]) {
3044 Alert("parsing [%s:%d]: '%s' expects an option name.\n",
3045 file, linenum, args[0]);
3046 err_code |= ERR_ALERT | ERR_FATAL;
3047 goto out;
3048 }
3049 if (!strcmp(args[1], "var-prefix")) {
3050 char *tmp;
3051
3052 if (!*args[2]) {
3053 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
3054 file, linenum, args[0],
3055 args[1]);
3056 err_code |= ERR_ALERT | ERR_FATAL;
3057 goto out;
3058 }
3059 tmp = args[2];
3060 while (*tmp) {
3061 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3062 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
3063 file, linenum, args[0], args[1]);
3064 err_code |= ERR_ALERT | ERR_FATAL;
3065 goto out;
3066 }
3067 tmp++;
3068 }
3069 curagent->var_pfx = strdup(args[2]);
3070 }
Christopher Fauletea62c2a2016-11-14 10:54:21 +01003071 else if (!strcmp(args[1], "continue-on-error")) {
3072 if (*args[2]) {
3073 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
Christopher Faulet48026722016-11-16 15:01:12 +01003074 file, linenum, args[2]);
Christopher Fauletea62c2a2016-11-14 10:54:21 +01003075 err_code |= ERR_ALERT | ERR_ABORT;
3076 goto out;
3077 }
3078 curagent->flags |= SPOE_FL_CONT_ON_ERR;
3079 }
Christopher Faulet985532d2016-11-16 15:36:19 +01003080 else if (!strcmp(args[1], "set-on-error")) {
3081 char *tmp;
3082
3083 if (!*args[2]) {
3084 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
3085 file, linenum, args[0],
3086 args[1]);
3087 err_code |= ERR_ALERT | ERR_FATAL;
3088 goto out;
3089 }
3090 tmp = args[2];
3091 while (*tmp) {
3092 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3093 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
3094 file, linenum, args[0], args[1]);
3095 err_code |= ERR_ALERT | ERR_FATAL;
3096 goto out;
3097 }
3098 tmp++;
3099 }
3100 curagent->var_on_error = strdup(args[2]);
3101 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003102 else {
3103 Alert("parsing [%s:%d]: option '%s' is not supported.\n",
3104 file, linenum, args[1]);
3105 err_code |= ERR_ALERT | ERR_FATAL;
3106 goto out;
3107 }
Christopher Faulet48026722016-11-16 15:01:12 +01003108 }
3109 else if (!strcmp(args[0], "maxconnrate")) {
3110 if (!*args[1]) {
3111 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
3112 file, linenum, args[0]);
3113 err_code |= ERR_ALERT | ERR_FATAL;
3114 goto out;
3115 }
3116 if (*args[2]) {
3117 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
3118 file, linenum, args[2]);
3119 err_code |= ERR_ALERT | ERR_ABORT;
3120 goto out;
3121 }
3122 curagent->cps_max = atol(args[1]);
3123 }
3124 else if (!strcmp(args[0], "maxerrrate")) {
3125 if (!*args[1]) {
3126 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
3127 file, linenum, args[0]);
3128 err_code |= ERR_ALERT | ERR_FATAL;
3129 goto out;
3130 }
3131 if (*args[2]) {
3132 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
3133 file, linenum, args[2]);
3134 err_code |= ERR_ALERT | ERR_ABORT;
3135 goto out;
3136 }
3137 curagent->eps_max = atol(args[1]);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003138 }
3139 else if (*args[0]) {
3140 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
3141 file, linenum, args[0]);
3142 err_code |= ERR_ALERT | ERR_FATAL;
3143 goto out;
3144 }
3145 out:
3146 return err_code;
3147}
3148
3149static int
3150cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm)
3151{
3152 struct spoe_message *msg;
3153 struct spoe_arg *arg;
3154 const char *err;
3155 char *errmsg = NULL;
3156 int err_code = 0;
3157
3158 if ((cfg_scope == NULL && curengine != NULL) ||
3159 (cfg_scope != NULL && curengine == NULL) ||
3160 strcmp(curengine, cfg_scope))
3161 goto out;
3162
3163 if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */
3164 if (!*args[1]) {
3165 Alert("parsing [%s:%d] : missing name for spoe-message section.\n",
3166 file, linenum);
3167 err_code |= ERR_ALERT | ERR_ABORT;
3168 goto out;
3169 }
3170 if (*args[2]) {
3171 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
3172 file, linenum, args[2]);
3173 err_code |= ERR_ALERT | ERR_ABORT;
3174 goto out;
3175 }
3176
3177 err = invalid_char(args[1]);
3178 if (err) {
3179 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
3180 file, linenum, *err, args[0], args[1]);
3181 err_code |= ERR_ALERT | ERR_ABORT;
3182 goto out;
3183 }
3184
3185 list_for_each_entry(msg, &curmsgs, list) {
3186 if (!strcmp(msg->id, args[1])) {
3187 Alert("parsing [%s:%d]: spoe-message section '%s' has the same"
3188 " name as another one declared at %s:%d.\n",
3189 file, linenum, args[1], msg->conf.file, msg->conf.line);
3190 err_code |= ERR_ALERT | ERR_FATAL;
3191 goto out;
3192 }
3193 }
3194
3195 if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) {
3196 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
3197 err_code |= ERR_ALERT | ERR_ABORT;
3198 goto out;
3199 }
3200
3201 curmsg->id = strdup(args[1]);
3202 curmsg->id_len = strlen(curmsg->id);
3203 curmsg->event = SPOE_EV_NONE;
3204 curmsg->conf.file = strdup(file);
3205 curmsg->conf.line = linenum;
3206 LIST_INIT(&curmsg->args);
3207 LIST_ADDQ(&curmsgs, &curmsg->list);
3208 }
3209 else if (!strcmp(args[0], "args")) {
3210 int cur_arg = 1;
3211
3212 curproxy->conf.args.ctx = ARGC_SPOE;
3213 curproxy->conf.args.file = file;
3214 curproxy->conf.args.line = linenum;
3215 while (*args[cur_arg]) {
3216 char *delim = strchr(args[cur_arg], '=');
3217 int idx = 0;
3218
3219 if ((arg = calloc(1, sizeof(*arg))) == NULL) {
3220 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
3221 err_code |= ERR_ALERT | ERR_ABORT;
3222 goto out;
3223 }
3224
3225 if (!delim) {
3226 arg->name = NULL;
3227 arg->name_len = 0;
3228 delim = args[cur_arg];
3229 }
3230 else {
3231 arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]);
3232 arg->name_len = delim - args[cur_arg];
3233 delim++;
3234 }
Christopher Fauletb0b42382017-02-23 22:41:09 +01003235 arg->expr = sample_parse_expr((char*[]){delim, NULL},
3236 &idx, file, linenum, &errmsg,
3237 &curproxy->conf.args);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003238 if (arg->expr == NULL) {
3239 Alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg);
3240 err_code |= ERR_ALERT | ERR_FATAL;
3241 free(arg->name);
3242 free(arg);
3243 goto out;
3244 }
3245 LIST_ADDQ(&curmsg->args, &arg->list);
3246 cur_arg++;
3247 }
3248 curproxy->conf.args.file = NULL;
3249 curproxy->conf.args.line = 0;
3250 }
3251 else if (!strcmp(args[0], "event")) {
3252 if (!*args[1]) {
3253 Alert("parsing [%s:%d] : missing event name.\n", file, linenum);
3254 err_code |= ERR_ALERT | ERR_ABORT;
3255 goto out;
3256 }
3257 if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS]))
3258 curmsg->event = SPOE_EV_ON_CLIENT_SESS;
3259 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS]))
3260 curmsg->event = SPOE_EV_ON_SERVER_SESS;
3261
3262 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE]))
3263 curmsg->event = SPOE_EV_ON_TCP_REQ_FE;
3264 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE]))
3265 curmsg->event = SPOE_EV_ON_TCP_REQ_BE;
3266 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP]))
3267 curmsg->event = SPOE_EV_ON_TCP_RSP;
3268
3269 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE]))
3270 curmsg->event = SPOE_EV_ON_HTTP_REQ_FE;
3271 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE]))
3272 curmsg->event = SPOE_EV_ON_HTTP_REQ_BE;
3273 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP]))
3274 curmsg->event = SPOE_EV_ON_HTTP_RSP;
3275 else {
3276 Alert("parsing [%s:%d] : unkown event '%s'.\n",
3277 file, linenum, args[1]);
3278 err_code |= ERR_ALERT | ERR_ABORT;
3279 goto out;
3280 }
3281 }
3282 else if (!*args[0]) {
3283 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n",
3284 file, linenum, args[0]);
3285 err_code |= ERR_ALERT | ERR_FATAL;
3286 goto out;
3287 }
3288 out:
3289 free(errmsg);
3290 return err_code;
3291}
3292
3293/* Return -1 on error, else 0 */
3294static int
3295parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
3296 struct flt_conf *fconf, char **err, void *private)
3297{
3298 struct list backup_sections;
3299 struct spoe_config *conf;
3300 struct spoe_message *msg, *msgback;
3301 struct spoe_msg_placeholder *mp, *mpback;
3302 char *file = NULL, *engine = NULL;
3303 int ret, pos = *cur_arg + 1;
3304
3305 conf = calloc(1, sizeof(*conf));
3306 if (conf == NULL) {
3307 memprintf(err, "%s: out of memory", args[*cur_arg]);
3308 goto error;
3309 }
3310 conf->proxy = px;
3311
3312 while (*args[pos]) {
3313 if (!strcmp(args[pos], "config")) {
3314 if (!*args[pos+1]) {
3315 memprintf(err, "'%s' : '%s' option without value",
3316 args[*cur_arg], args[pos]);
3317 goto error;
3318 }
3319 file = args[pos+1];
3320 pos += 2;
3321 }
3322 else if (!strcmp(args[pos], "engine")) {
3323 if (!*args[pos+1]) {
3324 memprintf(err, "'%s' : '%s' option without value",
3325 args[*cur_arg], args[pos]);
3326 goto error;
3327 }
3328 engine = args[pos+1];
3329 pos += 2;
3330 }
3331 else {
3332 memprintf(err, "unknown keyword '%s'", args[pos]);
3333 goto error;
3334 }
3335 }
3336 if (file == NULL) {
3337 memprintf(err, "'%s' : missing config file", args[*cur_arg]);
3338 goto error;
3339 }
3340
3341 /* backup sections and register SPOE sections */
3342 LIST_INIT(&backup_sections);
3343 cfg_backup_sections(&backup_sections);
3344 cfg_register_section("spoe-agent", cfg_parse_spoe_agent);
3345 cfg_register_section("spoe-message", cfg_parse_spoe_message);
3346
3347 /* Parse SPOE filter configuration file */
3348 curengine = engine;
3349 curproxy = px;
3350 curagent = NULL;
3351 curmsg = NULL;
3352 ret = readcfgfile(file);
3353 curproxy = NULL;
3354
3355 /* unregister SPOE sections and restore previous sections */
3356 cfg_unregister_sections();
3357 cfg_restore_sections(&backup_sections);
3358
3359 if (ret == -1) {
3360 memprintf(err, "Could not open configuration file %s : %s",
3361 file, strerror(errno));
3362 goto error;
3363 }
3364 if (ret & (ERR_ABORT|ERR_FATAL)) {
3365 memprintf(err, "Error(s) found in configuration file %s", file);
3366 goto error;
3367 }
3368
3369 /* Check SPOE agent */
3370 if (curagent == NULL) {
3371 memprintf(err, "No SPOE agent found in file %s", file);
3372 goto error;
3373 }
3374 if (curagent->b.name == NULL) {
3375 memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d",
3376 curagent->id, curagent->conf.file, curagent->conf.line);
3377 goto error;
3378 }
Christopher Fauletf7a30922016-11-10 15:04:51 +01003379 if (curagent->timeout.hello == TICK_ETERNITY ||
3380 curagent->timeout.idle == TICK_ETERNITY ||
Christopher Fauletf7a30922016-11-10 15:04:51 +01003381 curagent->timeout.processing == TICK_ETERNITY) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003382 Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
3383 " | While not properly invalid, you will certainly encounter various problems\n"
3384 " | with such a configuration. To fix this, please ensure that all following\n"
Christopher Faulet03a34492016-11-19 16:47:56 +01003385 " | timeouts are set to a non-zero value: 'hello', 'idle', 'processing'.\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003386 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3387 }
3388 if (curagent->var_pfx == NULL) {
3389 char *tmp = curagent->id;
3390
3391 while (*tmp) {
3392 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3393 memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. "
3394 "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n",
3395 curagent->id, curagent->id, curagent->conf.file, curagent->conf.line);
3396 goto error;
3397 }
3398 tmp++;
3399 }
3400 curagent->var_pfx = strdup(curagent->id);
3401 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01003402 if (curagent->engine_id == NULL)
3403 curagent->engine_id = generate_pseudo_uuid();
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003404
3405 if (LIST_ISEMPTY(&curmps)) {
3406 Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",
3407 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3408 goto finish;
3409 }
3410
3411 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3412 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3413 if (!strcmp(msg->id, mp->id)) {
3414 if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) {
3415 if (msg->event == SPOE_EV_ON_TCP_REQ_BE)
3416 msg->event = SPOE_EV_ON_TCP_REQ_FE;
3417 if (msg->event == SPOE_EV_ON_HTTP_REQ_BE)
3418 msg->event = SPOE_EV_ON_HTTP_REQ_FE;
3419 }
3420 if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS ||
3421 msg->event == SPOE_EV_ON_TCP_REQ_FE ||
3422 msg->event == SPOE_EV_ON_HTTP_REQ_FE)) {
3423 Warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n",
3424 px->id, msg->conf.file, msg->conf.line);
3425 goto next;
3426 }
3427 if (msg->event == SPOE_EV_NONE) {
3428 Warning("Proxy '%s': Ignore SPOE message without event at %s:%d.\n",
3429 px->id, msg->conf.file, msg->conf.line);
3430 goto next;
3431 }
3432 msg->agent = curagent;
3433 LIST_DEL(&msg->list);
3434 LIST_ADDQ(&curagent->messages[msg->event], &msg->list);
3435 goto next;
3436 }
3437 }
3438 memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d",
3439 curagent->id, mp->id, curagent->conf.file, curagent->conf.line);
3440 goto error;
3441 next:
3442 continue;
3443 }
3444
3445 finish:
3446 conf->agent = curagent;
3447 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3448 LIST_DEL(&mp->list);
3449 release_spoe_msg_placeholder(mp);
3450 }
3451 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3452 Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n",
3453 px->id, msg->id, msg->conf.file, msg->conf.line);
3454 LIST_DEL(&msg->list);
3455 release_spoe_message(msg);
3456 }
3457
3458 *cur_arg = pos;
Christopher Faulet3b386a32017-02-23 10:17:15 +01003459 fconf->id = spoe_filter_id;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003460 fconf->ops = &spoe_ops;
3461 fconf->conf = conf;
3462 return 0;
3463
3464 error:
3465 release_spoe_agent(curagent);
3466 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3467 LIST_DEL(&mp->list);
3468 release_spoe_msg_placeholder(mp);
3469 }
3470 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3471 LIST_DEL(&msg->list);
3472 release_spoe_message(msg);
3473 }
3474 free(conf);
3475 return -1;
3476}
3477
3478
3479/* Declare the filter parser for "spoe" keyword */
3480static struct flt_kw_list flt_kws = { "SPOE", { }, {
3481 { "spoe", parse_spoe_flt, NULL },
3482 { NULL, NULL, NULL },
3483 }
3484};
3485
3486__attribute__((constructor))
3487static void __spoe_init(void)
3488{
3489 flt_register_keywords(&flt_kws);
3490
3491 LIST_INIT(&curmsgs);
3492 LIST_INIT(&curmps);
3493 pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
3494}
3495
3496__attribute__((destructor))
3497static void
3498__spoe_deinit(void)
3499{
3500 pool_destroy2(pool2_spoe_ctx);
3501}