blob: 6b8c799957e4afcbefdad423e44fbaf088f9723e [file] [log] [blame]
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001/*
2 * Stream processing offload engine management.
3 *
4 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12#include <ctype.h>
13#include <errno.h>
14
15#include <common/buffer.h>
16#include <common/cfgparse.h>
17#include <common/compat.h>
18#include <common/config.h>
19#include <common/debug.h>
20#include <common/memory.h>
21#include <common/time.h>
22
23#include <types/arg.h>
24#include <types/filters.h>
25#include <types/global.h>
26#include <types/proxy.h>
27#include <types/sample.h>
28#include <types/stream.h>
29
30#include <proto/arg.h>
31#include <proto/backend.h>
32#include <proto/filters.h>
Christopher Faulet48026722016-11-16 15:01:12 +010033#include <proto/freq_ctr.h>
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020034#include <proto/frontend.h>
35#include <proto/log.h>
36#include <proto/proto_http.h>
37#include <proto/proxy.h>
38#include <proto/sample.h>
39#include <proto/session.h>
40#include <proto/signal.h>
41#include <proto/stream.h>
42#include <proto/stream_interface.h>
43#include <proto/task.h>
44#include <proto/vars.h>
45
/* Debug traces. SPOE_PRINTF() forwards all its arguments to fprintf() when the
 * file is built with DEBUG_SPOE or DEBUG_FULL. Otherwise it expands to nothing
 * and its arguments are not evaluated. */
#if !defined(DEBUG_SPOE) && !defined(DEBUG_FULL)
#define SPOE_PRINTF(...)
#else
#define SPOE_PRINTF(...) fprintf(__VA_ARGS__)
#endif
51
/* Smallest size allowed for a frame */
#define MIN_FRAME_SIZE 256

/* SPOE agent flags */
#define SPOE_FL_CONT_ON_ERR 0x00000001 /* keep processing events after an error */

/* SPOE context flags */
#define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* the on-client-session event was processed */
#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* the on-server-session event was processed */
#define SPOE_CTX_FL_REQ_PROCESS   0x00000004 /* SPOE is processing the request */
#define SPOE_CTX_FL_RSP_PROCESS   0x00000008 /* SPOE is processing the response */

/* Any of the two "processing" flags */
#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)

/* SPOE applet flags */
#define SPOE_APPCTX_FL_PIPELINING 0x00000001 /* pipelining is supported */
#define SPOE_APPCTX_FL_ASYNC      0x00000002 /* asynchronous frames are supported */
#define SPOE_APPCTX_FL_PERSIST    0x00000004 /* the applet is persistent */

/* SPOE applet errors */
#define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, must stay zero */
#define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */
73
/* States a SPOE context can be in */
enum spoe_ctx_state {
	SPOE_CTX_ST_NONE         = 0,
	SPOE_CTX_ST_READY        = 1,
	SPOE_CTX_ST_SENDING_MSGS = 2,
	SPOE_CTX_ST_WAITING_ACK  = 3,
	SPOE_CTX_ST_DONE         = 4,
	SPOE_CTX_ST_ERROR        = 5,
};
83
/* States a SPOE applet can be in */
enum spoe_appctx_state {
	SPOE_APPCTX_ST_CONNECT       = 0,
	SPOE_APPCTX_ST_CONNECTING    = 1,
	SPOE_APPCTX_ST_IDLE          = 2,
	SPOE_APPCTX_ST_PROCESSING    = 3,
	SPOE_APPCTX_ST_DISCONNECT    = 4,
	SPOE_APPCTX_ST_DISCONNECTING = 5,
	SPOE_APPCTX_ST_EXIT          = 6,
	SPOE_APPCTX_ST_END           = 7,
};
95
/* Actions supported by SPOE. SPOE_ACT_TYPES is the end marker. */
enum spoe_action_type {
	SPOE_ACT_T_SET_VAR   = 1,
	SPOE_ACT_T_UNSET_VAR = 2,
	SPOE_ACT_TYPES       = 3,
};
102
/* Events supported by SPOE. SPOE_EV_EVENTS is the number of entries and is
 * used to size per-event arrays. */
enum spoe_event {
	SPOE_EV_NONE            = 0,

	/* Events on the request path */
	SPOE_EV_ON_CLIENT_SESS  = 1,
	SPOE_EV_ON_TCP_REQ_FE   = 2,
	SPOE_EV_ON_TCP_REQ_BE   = 3,
	SPOE_EV_ON_HTTP_REQ_FE  = 4,
	SPOE_EV_ON_HTTP_REQ_BE  = 5,

	/* Events on the response path */
	SPOE_EV_ON_SERVER_SESS  = 6,
	SPOE_EV_ON_TCP_RSP      = 7,
	SPOE_EV_ON_HTTP_RSP     = 8,

	SPOE_EV_EVENTS          = 9
};
121
/* Errors triggered by the SPOE applet. Values 10 to 98 are not assigned.
 * SPOE_FRM_ERRS is the end marker. */
enum spoe_frame_error {
	SPOE_FRM_ERR_NONE           = 0,
	SPOE_FRM_ERR_IO             = 1,
	SPOE_FRM_ERR_TOUT           = 2,
	SPOE_FRM_ERR_TOO_BIG        = 3,
	SPOE_FRM_ERR_INVALID        = 4,
	SPOE_FRM_ERR_NO_VSN         = 5,
	SPOE_FRM_ERR_NO_FRAME_SIZE  = 6,
	SPOE_FRM_ERR_NO_CAP         = 7,
	SPOE_FRM_ERR_BAD_VSN        = 8,
	SPOE_FRM_ERR_BAD_FRAME_SIZE = 9,
	SPOE_FRM_ERR_UNKNOWN        = 99,
	SPOE_FRM_ERRS               = 100,
};
137
/* Scopes used for variables set by agents. It is a way to be agnostic to the
 * vars scope values. */
enum spoe_vars_scope {
	SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC */
	SPOE_SCOPE_SESS = 1, /* <=> SCOPE_SESS */
	SPOE_SCOPE_TXN  = 2, /* <=> SCOPE_TXN  */
	SPOE_SCOPE_REQ  = 3, /* <=> SCOPE_REQ  */
	SPOE_SCOPE_RES  = 4, /* <=> SCOPE_RES  */
};
147
148
149/* Describe an argument that will be linked to a message. It is a sample fetch,
150 * with an optional name. */
151struct spoe_arg {
152 char *name; /* Name of the argument, may be NULL */
153 unsigned int name_len; /* The name length, 0 if NULL */
154 struct sample_expr *expr; /* Sample expression */
155 struct list list; /* Used to chain SPOE args */
156};
157
158/* Used during the config parsing only because, when a SPOE agent section is
159 * parsed, messages can be undefined. */
160struct spoe_msg_placeholder {
161 char *id; /* SPOE message placeholder id */
162 struct list list; /* Use to chain SPOE message placeholders */
163};
164
165/* Describe a message that will be sent in a NOTIFY frame. A message has a name,
166 * an argument list (see above) and it is linked to a specific event. */
167struct spoe_message {
Christopher Fauleta1cda022016-12-21 08:58:06 +0100168 char *id; /* SPOE message id */
169 unsigned int id_len; /* The message id length */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200170 struct spoe_agent *agent; /* SPOE agent owning this SPOE message */
171 struct {
Christopher Fauleta1cda022016-12-21 08:58:06 +0100172 char *file; /* file where the SPOE message appears */
173 int line; /* line where the SPOE message appears */
174 } conf; /* config information */
175 struct list args; /* Arguments added when the SPOE messages is sent */
176 struct list list; /* Used to chain SPOE messages */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200177
178 enum spoe_event event; /* SPOE_EV_* */
179};
180
181/* Describe a SPOE agent. */
182struct spoe_agent {
183 char *id; /* SPOE agent id (name) */
184 struct {
185 char *file; /* file where the SPOE agent appears */
186 int line; /* line where the SPOE agent appears */
187 } conf; /* config information */
188 union {
189 struct proxy *be; /* Backend used by this agent */
190 char *name; /* Backend name used during conf parsing */
191 } b;
192 struct {
Christopher Fauletf7a30922016-11-10 15:04:51 +0100193 unsigned int hello; /* Max time to receive AGENT-HELLO frame (in SPOE applet) */
194 unsigned int idle; /* Max Idle timeout (in SPOE applet) */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100195 unsigned int processing; /* Max time to process an event (in the main stream) */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200196 } timeout;
197
Christopher Fauleta1cda022016-12-21 08:58:06 +0100198 /* Config info */
199 char *engine_id; /* engine-id string */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200200 char *var_pfx; /* Prefix used for vars set by the agent */
Christopher Faulet985532d2016-11-16 15:36:19 +0100201 char *var_on_error; /* Variable to set when an error occured, in the TXN scope */
Christopher Fauletea62c2a2016-11-14 10:54:21 +0100202 unsigned int flags; /* SPOE_FL_* */
Christopher Fauleta1cda022016-12-21 08:58:06 +0100203 unsigned int cps_max; /* Maximum # of connections per second */
204 unsigned int eps_max; /* Maximum # of errors per second */
205 unsigned int max_frame_size; /* Maximum frame size for this agent, before any negotiation */
206 unsigned int min_applets; /* Minimum # applets alive at a time */
207 unsigned int max_fpa; /* Maximum # of frames handled per applet at once */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200208
209 struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
210 * for each supported events */
211
Christopher Fauleta1cda022016-12-21 08:58:06 +0100212 /* running info */
213 unsigned int applets_act; /* # of applets alive at a time */
214 unsigned int applets_idle; /* # of applets in the state SPOE_APPCTX_ST_IDLE */
215 unsigned int sending_rate; /* the global sending rate */
216
217 struct freq_ctr conn_per_sec; /* connections per second */
218 struct freq_ctr err_per_sec; /* connetion errors per second */
219
220 struct list applets; /* List of available SPOE applets */
221 struct list sending_queue; /* Queue of streams waiting to send data */
222 struct list waiting_queue; /* Queue of streams waiting for a ack, in async mode */
223
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200224};
225
226/* SPOE filter configuration */
227struct spoe_config {
228 struct proxy *proxy; /* Proxy owning the filter */
229 struct spoe_agent *agent; /* Agent used by this filter */
230 struct proxy agent_fe; /* Agent frontend */
231};
232
233/* SPOE context attached to a stream. It is the main structure that handles the
234 * processing offload */
235struct spoe_context {
236 struct filter *filter; /* The SPOE filter */
237 struct stream *strm; /* The stream that should be offloaded */
Christopher Fauleta1cda022016-12-21 08:58:06 +0100238
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200239 struct list *messages; /* List of messages that will be sent during the stream processing */
240 struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */
Christopher Fauleta73e59b2016-12-09 17:30:18 +0100241 struct buffer_wait buffer_wait; /* position in the list of streams waiting for a buffer */
Christopher Fauleta1cda022016-12-21 08:58:06 +0100242 struct list list;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200243
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200244 enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
245 unsigned int flags; /* SPOE_CTX_FL_* */
246
247 unsigned int stream_id; /* stream_id and frame_id are used */
248 unsigned int frame_id; /* to map NOTIFY and ACK frames */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100249 unsigned int process_exp; /* expiration date to process an event */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200250};
251
Christopher Faulet42bfa462017-01-04 14:14:19 +0100252/* SPOE context inside a appctx */
253struct spoe_appctx {
254 struct appctx *owner; /* the owner */
255 struct task *task; /* task to handle applet timeouts */
256 struct spoe_agent *agent; /* agent on which the applet is attached */
257
258 unsigned int version; /* the negotiated version */
259 unsigned int max_frame_size; /* the negotiated max-frame-size value */
260 unsigned int flags; /* SPOE_APPCTX_FL_* */
261
262 struct list waiting_queue; /* list of streams waiting for a ACK frame, in sync and pipelining mode */
263 struct list list; /* next spoe appctx for the same agent */
264};
265
266#define SPOE_APPCTX(appctx) ((struct spoe_appctx *)((appctx)->ctx.spoe.ptr))
267
Christopher Faulet3b386a32017-02-23 10:17:15 +0100268/* SPOE filter id. Used to identify SPOE filters */
269const char *spoe_filter_id = "SPOE filter";
270
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200271/* Set if the handle on SIGUSR1 is registered */
272static int sighandler_registered = 0;
273
274/* proxy used during the parsing */
275struct proxy *curproxy = NULL;
276
277/* The name of the SPOE engine, used during the parsing */
278char *curengine = NULL;
279
280/* SPOE agent used during the parsing */
281struct spoe_agent *curagent = NULL;
282
283/* SPOE message used during the parsing */
284struct spoe_message *curmsg = NULL;
285
286/* list of SPOE messages and placeholders used during the parsing */
287struct list curmsgs;
288struct list curmps;
289
Christopher Faulet42bfa462017-01-04 14:14:19 +0100290/* Pools used to allocate SPOE structs */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200291static struct pool_head *pool2_spoe_ctx = NULL;
Christopher Faulet42bfa462017-01-04 14:14:19 +0100292static struct pool_head *pool2_spoe_appctx = NULL;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200293
294/* Temporary variables used to ease error processing */
295int spoe_status_code = SPOE_FRM_ERR_NONE;
296char spoe_reason[256];
297
298struct flt_ops spoe_ops;
299
Christopher Fauleta1cda022016-12-21 08:58:06 +0100300static int queue_spoe_context(struct spoe_context *ctx);
301static int acquire_spoe_buffer(struct spoe_context *ctx);
302static void release_spoe_buffer(struct spoe_context *ctx);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200303
304/********************************************************************
305 * helper functions/globals
306 ********************************************************************/
307static void
308release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp)
309{
310 if (!mp)
311 return;
312 free(mp->id);
313 free(mp);
314}
315
316
317static void
318release_spoe_message(struct spoe_message *msg)
319{
320 struct spoe_arg *arg, *back;
321
322 if (!msg)
323 return;
324 free(msg->id);
325 free(msg->conf.file);
326 list_for_each_entry_safe(arg, back, &msg->args, list) {
327 release_sample_expr(arg->expr);
328 free(arg->name);
329 LIST_DEL(&arg->list);
330 free(arg);
331 }
332 free(msg);
333}
334
335static void
336release_spoe_agent(struct spoe_agent *agent)
337{
338 struct spoe_message *msg, *back;
339 int i;
340
341 if (!agent)
342 return;
343 free(agent->id);
344 free(agent->conf.file);
345 free(agent->var_pfx);
Christopher Fauleta1cda022016-12-21 08:58:06 +0100346 free(agent->engine_id);
Christopher Faulet985532d2016-11-16 15:36:19 +0100347 free(agent->var_on_error);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200348 for (i = 0; i < SPOE_EV_EVENTS; ++i) {
349 list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
350 LIST_DEL(&msg->list);
351 release_spoe_message(msg);
352 }
353 }
354 free(agent);
355}
356
357static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
358 [SPOE_FRM_ERR_NONE] = "normal",
359 [SPOE_FRM_ERR_IO] = "I/O error",
360 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
361 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
362 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
363 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
364 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
365 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
366 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
367 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
368 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
369};
370
371static const char *spoe_event_str[SPOE_EV_EVENTS] = {
372 [SPOE_EV_ON_CLIENT_SESS] = "on-client-session",
373 [SPOE_EV_ON_TCP_REQ_FE] = "on-frontend-tcp-request",
374 [SPOE_EV_ON_TCP_REQ_BE] = "on-backend-tcp-request",
375 [SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request",
376 [SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request",
377
378 [SPOE_EV_ON_SERVER_SESS] = "on-server-session",
379 [SPOE_EV_ON_TCP_RSP] = "on-tcp-response",
380 [SPOE_EV_ON_HTTP_RSP] = "on-http-response",
381};
382
383
#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)

/* Name of each SPOE context state, indexed by SPOE_CTX_ST_* (debug builds
 * only) */
static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
	[SPOE_CTX_ST_NONE]         = "NONE",
	[SPOE_CTX_ST_READY]        = "READY",
	[SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
	[SPOE_CTX_ST_WAITING_ACK]  = "WAITING_ACK",
	[SPOE_CTX_ST_DONE]         = "DONE",
	[SPOE_CTX_ST_ERROR]        = "ERROR",
};

/* Name of each SPOE applet state, indexed by SPOE_APPCTX_ST_* (debug builds
 * only) */
static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
	[SPOE_APPCTX_ST_CONNECT]       = "CONNECT",
	[SPOE_APPCTX_ST_CONNECTING]    = "CONNECTING",
	[SPOE_APPCTX_ST_IDLE]          = "IDLE",
	[SPOE_APPCTX_ST_PROCESSING]    = "PROCESSING",
	[SPOE_APPCTX_ST_DISCONNECT]    = "DISCONNECT",
	[SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
	[SPOE_APPCTX_ST_EXIT]          = "EXIT",
	[SPOE_APPCTX_ST_END]           = "END",
};

#endif
Christopher Fauleta1cda022016-12-21 08:58:06 +0100407
408static char *
409generate_pseudo_uuid()
410{
411 static int init = 0;
412
413 const char uuid_fmt[] = "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx";
414 const char uuid_chr[] = "0123456789ABCDEF-";
415 char *uuid;
416 int i;
417
418 if ((uuid = calloc(1, sizeof(uuid_fmt))) == NULL)
419 return NULL;
420
421 if (!init) {
422 srand(now_ms);
423 init = 1;
424 }
425
426 for (i = 0; i < sizeof(uuid_fmt)-1; i++) {
427 int r = rand () % 16;
428
429 switch (uuid_fmt[i]) {
430 case 'x' : uuid[i] = uuid_chr[r]; break;
431 case 'y' : uuid[i] = uuid_chr[(r & 0x03) | 0x08]; break;
432 default : uuid[i] = uuid_fmt[i]; break;
433 }
434 }
435 return uuid;
436}
437
438static inline unsigned int
439min_applets_act(struct spoe_agent *agent)
440{
441 unsigned int nbsrv;
442
443 if (agent->min_applets)
444 return agent->min_applets;
445
446 nbsrv = (agent->b.be->srv_act ? agent->b.be->srv_act : agent->b.be->srv_bck);
447 return 2*nbsrv;
448}
449
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200450/********************************************************************
451 * Functions that encode/decode SPOE frames
452 ********************************************************************/
/* Types of the frames exchanged between HAProxy and the agents */
enum spoe_frame_type {
	/* Frames sent by HAProxy */
	SPOE_FRM_T_HAPROXY_HELLO  = 1,
	SPOE_FRM_T_HAPROXY_DISCON = 2,
	SPOE_FRM_T_HAPROXY_NOTIFY = 3,

	/* Frames sent by the agents */
	SPOE_FRM_T_AGENT_HELLO    = 101,
	SPOE_FRM_T_AGENT_DISCON   = 102,
	SPOE_FRM_T_AGENT_ACK      = 103
};
465
/* Types of the typed data found in frames. SPOE_DATA_TYPES is the end
 * marker. */
enum spoe_data_type {
	SPOE_DATA_T_NULL   = 0,
	SPOE_DATA_T_BOOL   = 1,
	SPOE_DATA_T_INT32  = 2,
	SPOE_DATA_T_UINT32 = 3,
	SPOE_DATA_T_INT64  = 4,
	SPOE_DATA_T_UINT64 = 5,
	SPOE_DATA_T_IPV4   = 6,
	SPOE_DATA_T_IPV6   = 7,
	SPOE_DATA_T_STR    = 8,
	SPOE_DATA_T_BIN    = 9,
	SPOE_DATA_TYPES    = 10
};
480
/* Masks splitting the first byte of a typed data into its type (low nibble)
 * and its flags (high nibble) */
#define SPOE_DATA_T_MASK  0x0F
#define SPOE_DATA_FL_MASK 0xF0

/* Flags carrying the value of a Boolean */
#define SPOE_DATA_FL_FALSE 0x00
#define SPOE_DATA_FL_TRUE  0x10

/* Length of a static string, without its terminating null byte */
#define SLEN(str) (sizeof(str)-1)

/* Names of the predefined keys used in HELLO/DISCONNECT frames */
#define SUPPORTED_VERSIONS_KEY "supported-versions"
#define VERSION_KEY            "version"
#define MAX_FRAME_SIZE_KEY     "max-frame-size"
#define CAPABILITIES_KEY       "capabilities"
#define ENGINE_ID_KEY          "engine-id"
#define HEALTHCHECK_KEY        "healthcheck"
#define STATUS_CODE_KEY        "status-code"
#define MSG_KEY                "message"
501
/* A supported version: its textual form and the range of decoded version
 * numbers (see decode_spoe_version) it covers */
struct spoe_version {
	char *str;
	int   min;
	int   max;
};

/* All supported versions. The list ends with an entry whose <str> is NULL. */
static struct spoe_version supported_versions[] = {
	{ "1.0", 1000, 1000 },
	{ NULL,  0,    0    }
};

/* Comma-separated list of supported versions */
#define SUPPORTED_VERSIONS_VAL "1.0"

/* Comma-separated list of supported capabilities */
#define CAPABILITIES_VAL "pipelining,async"
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200520
/* Decode the version found in the <len> first bytes of <str>, which does not
 * need to be NUL-terminated. The version is parsed as a decimal number,
 * optionally surrounded by spaces, and the value multiplied by 1000 is
 * returned (e.g. "1.0" gives 1000).
 *
 * -1 is returned when nothing can be parsed, when the parsed value is zero,
 * when something other than spaces follows the number, or when the value is
 * not a finite number small enough to be converted to an int. */
static int
decode_spoe_version(const char *str, size_t len)
{
	/* any value strictly between -limit and +limit still fits in a 32-bit
	 * int once multiplied by 1000 */
	const double limit = 2000000.0;
	char tmp[len+1], *start, *end;
	double d;
	int vsn = -1;

	/* strtod() needs a NUL-terminated string: work on a copy */
	memcpy(tmp, str, len);
	tmp[len] = '\0';

	/* <ctype.h> functions must not receive a negative char value */
	start = tmp;
	while (isspace((unsigned char)*start))
		start++;

	d = strtod(start, &end);
	if (d == 0 || start == end)
		goto out;

	if (*end) {
		while (isspace((unsigned char)*end))
			end++;
		if (*end)
			goto out;
	}

	/* strtod() also accepts "inf", "nan" and huge values, and converting
	 * those to an int is undefined: the test below is false for them,
	 * NaN included */
	if (!(d > -limit && d < limit))
		goto out;

	vsn = (int)(d * 1000);
 out:
	return vsn;
}
549
/* Encode <i> as a variable-length integer into <buf>. Values below 240 take a
 * single byte. Otherwise the first byte carries the 4 low-order bits (with its
 * 4 high-order bits set) and each following byte carries 7 more bits, its
 * high-order bit being set on all but the last one. This function never fails
 * and returns the number of written bytes. <buf> must be large enough (at most
 * 10 bytes are written). */
static int
encode_spoe_varint(uint64_t i, char *buf)
{
	char *p = buf;

	if (i >= 240) {
		*p++ = (unsigned char)i | 240;
		i = (i - 240) >> 4;
		while (i >= 128) {
			*p++ = (unsigned char)i | 128;
			i = (i - 128) >> 7;
		}
	}
	*p++ = (unsigned char)i;
	return (int)(p - buf);
}
571
/* Decode a variable-length integer starting at <buf> and store it in <*i>.
 * <end> points just after the last readable byte. On success, the number of
 * read bytes is returned. -1 is returned when the end of the buffer is reached
 * before the end of the integer, or when the encoding is longer than what a
 * 64-bit value needs (10 bytes). In case of error, <*i> may have been
 * modified. */
static int
decode_spoe_varint(const char *buf, const char *end, uint64_t *i)
{
	const unsigned char *msg  = (const unsigned char *)buf;
	const unsigned char *stop = (const unsigned char *)end;
	int idx = 0;

	/* <end> is not readable: at least one byte is needed before it */
	if (msg >= stop)
		return -1;

	*i = msg[0];
	if (msg[0] < 240)
		return 1;

	do {
		++idx;
		/* A 64-bit value never needs more than 9 continuation bytes.
		 * Going further would shift by 64 bits or more, which is
		 * undefined. */
		if (idx > 9 || msg+idx >= stop)
			return -1;
		*i += (uint64_t)msg[idx] << (4 + 7 * (idx-1));
	} while (msg[idx] >= 128);
	return (idx + 1);
}
597
/* Encode the <len> bytes of <str> into <dst>: the length comes first, encoded
 * as a variable-length integer, followed by the bytes themselves. An empty
 * string is encoded as a single zero byte and <str> is not read. This function
 * never fails and returns the number of written bytes. */
static int
encode_spoe_string(const char *str, size_t len, char *dst)
{
	int hdr;

	if (len == 0) {
		*dst = 0;
		return 1;
	}

	hdr = encode_spoe_varint(len, dst);
	memcpy(dst + hdr, str, len);
	return (hdr + len);
}
615
/* Decode a string starting at <buf>. <end> points just after the last readable
 * byte. The length is decoded first, as a variable-length integer. If it
 * succeeds and if the whole string fits before <end>, the beginning of the
 * string is saved in <*str>, its length in <*len> and the total number of read
 * bytes is returned. The string is not copied nor NUL-terminated. If an error
 * occurred, -1 is returned and <*str> remains NULL. */
static int
decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
{
	int hdr;

	*str = NULL;
	*len = 0;

	hdr = decode_spoe_varint(buf, end, len);
	if (hdr == -1)
		return -1;

	/* <*len> comes from the peer and may be any 64-bit value: compare it
	 * to the number of remaining bytes instead of adding it to a pointer,
	 * which could wrap and defeat the check. */
	if (buf + hdr > end || *len > (uint64_t)(end - (buf + hdr)))
		return -1;

	*str = buf + hdr;
	return (hdr + *len);
}
641
642/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
643 * of bytes read is returned. A types data is composed of a type (1 byte) and
644 * corresponding data:
645 * - boolean: non additional data (0 bytes)
646 * - integers: a variable-length integer (see decode_spoe_varint)
647 * - ipv4: 4 bytes
648 * - ipv6: 16 bytes
649 * - binary and string: a buffer prefixed by its size, a variable-length
650 * integer (see decode_spoe_string) */
651static int
652skip_spoe_data(char *frame, char *end)
653{
654 uint64_t sz = 0;
655 int i, idx = 0;
656
657 if (frame > end)
658 return -1;
659
660 switch (frame[idx++] & SPOE_DATA_T_MASK) {
661 case SPOE_DATA_T_BOOL:
662 break;
663 case SPOE_DATA_T_INT32:
664 case SPOE_DATA_T_INT64:
665 case SPOE_DATA_T_UINT32:
666 case SPOE_DATA_T_UINT64:
667 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
668 return -1;
669 idx += i;
670 break;
671 case SPOE_DATA_T_IPV4:
672 idx += 4;
673 break;
674 case SPOE_DATA_T_IPV6:
675 idx += 16;
676 break;
677 case SPOE_DATA_T_STR:
678 case SPOE_DATA_T_BIN:
679 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
680 return -1;
681 idx += i + sz;
682 break;
683 }
684
685 if (frame+idx > end)
686 return -1;
687 return idx;
688}
689
690/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
691 * number of read bytes is returned. See skip_spoe_data for details. */
692static int
693decode_spoe_data(char *frame, char *end, struct sample *smp)
694{
695 uint64_t sz = 0;
696 int type, i, idx = 0;
697
698 if (frame > end)
699 return -1;
700
701 type = frame[idx++];
702 switch (type & SPOE_DATA_T_MASK) {
703 case SPOE_DATA_T_BOOL:
704 smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
705 smp->data.type = SMP_T_BOOL;
706 break;
707 case SPOE_DATA_T_INT32:
708 case SPOE_DATA_T_INT64:
709 case SPOE_DATA_T_UINT32:
710 case SPOE_DATA_T_UINT64:
711 if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1)
712 return -1;
713 idx += i;
714 smp->data.type = SMP_T_SINT;
715 break;
716 case SPOE_DATA_T_IPV4:
717 if (frame+idx+4 > end)
718 return -1;
719 memcpy(&smp->data.u.ipv4, frame+idx, 4);
720 smp->data.type = SMP_T_IPV4;
721 idx += 4;
722 break;
723 case SPOE_DATA_T_IPV6:
724 if (frame+idx+16 > end)
725 return -1;
726 memcpy(&smp->data.u.ipv6, frame+idx, 16);
727 smp->data.type = SMP_T_IPV6;
728 idx += 16;
729 break;
730 case SPOE_DATA_T_STR:
731 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
732 return -1;
733 idx += i;
734 if (frame+idx+sz > end)
735 return -1;
736 smp->data.u.str.str = frame+idx;
737 smp->data.u.str.len = sz;
738 smp->data.type = SMP_T_STR;
739 idx += sz;
740 break;
741 case SPOE_DATA_T_BIN:
742 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
743 return -1;
744 idx += i;
745 if (frame+idx+sz > end)
746 return -1;
747 smp->data.u.str.str = frame+idx;
748 smp->data.u.str.len = sz;
749 smp->data.type = SMP_T_BIN;
750 idx += sz;
751 break;
752 }
753
754 if (frame+idx > end)
755 return -1;
756 return idx;
757}
758
/* Skip an action in a frame received from an agent. <end> points just after
 * the last readable byte. An action is made of one byte for its type, one byte
 * for its number of arguments, then that many typed data (see skip_spoe_data).
 * If an error occurred, -1 is returned, otherwise the number of read bytes is
 * returned. */
static int
skip_spoe_action(char *frame, char *end)
{
	int nbargs, len, pos;

	/* both header bytes are needed */
	if (frame+2 > end)
		return -1;

	/* byte 0 is the action type, ignored here */
	nbargs = frame[1];
	pos = 2;

	for (; nbargs > 0; nbargs--) {
		len = skip_spoe_data(frame+pos, end);
		if (len == -1)
			return -1;
		pos += len;
	}

	return (frame+pos > end) ? -1 : pos;
}
782
783/* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on
784 * success, 0 if the frame can be ignored and -1 if an error occurred. */
785static int
786prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
787{
Christopher Faulet42bfa462017-01-04 14:14:19 +0100788 struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200789 int idx = 0;
790 size_t max = (7 /* TYPE + METADATA */
791 + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
792 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
Christopher Fauleta1cda022016-12-21 08:58:06 +0100793 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL)
794 + 1 + SLEN(ENGINE_ID_KEY) + 1 + 1 + 36);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200795
796 if (size < max)
797 return -1;
798
799 /* Frame type */
800 frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
801
802 /* No flags for now */
803 memset(frame+idx, 0, 4);
804 idx += 4;
805
806 /* No stream-id and frame-id for HELLO frames */
807 frame[idx++] = 0;
808 frame[idx++] = 0;
809
810 /* There are 3 mandatory items: "supported-versions", "max-frame-size"
811 * and "capabilities" */
812
813 /* "supported-versions" K/V item */
814 idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx);
815 frame[idx++] = SPOE_DATA_T_STR;
816 idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx);
817
818 /* "max-fram-size" K/V item */
819 idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
820 frame[idx++] = SPOE_DATA_T_UINT32;
Christopher Faulet42bfa462017-01-04 14:14:19 +0100821 idx += encode_spoe_varint(SPOE_APPCTX(appctx)->max_frame_size, frame+idx);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200822
823 /* "capabilities" K/V item */
824 idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
825 frame[idx++] = SPOE_DATA_T_STR;
826 idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
827
Christopher Fauleta1cda022016-12-21 08:58:06 +0100828 /* "engine-id" K/V item */
829 if (agent != NULL && agent->engine_id != NULL) {
830 idx += encode_spoe_string(ENGINE_ID_KEY, SLEN(ENGINE_ID_KEY), frame+idx);
831 frame[idx++] = SPOE_DATA_T_STR;
832 idx += encode_spoe_string(agent->engine_id, strlen(agent->engine_id), frame+idx);
833 }
834
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200835 return idx;
836}
837
838/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame
839 * size on success, 0 if the frame can be ignored and -1 if an error
840 * occurred. */
841static int
842prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
843{
844 const char *reason;
845 int rlen, idx = 0;
846 size_t max = (7 /* TYPE + METADATA */
847 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
848 + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
849
850 if (size < max)
851 return -1;
852
853 /* Get the message corresponding to the status code */
854 if (spoe_status_code >= SPOE_FRM_ERRS)
855 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
856 reason = spoe_frm_err_reasons[spoe_status_code];
857 rlen = strlen(reason);
858
859 /* Frame type */
860 frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
861
862 /* No flags for now */
863 memset(frame+idx, 0, 4);
864 idx += 4;
865
866 /* No stream-id and frame-id for DISCONNECT frames */
867 frame[idx++] = 0;
868 frame[idx++] = 0;
869
870 /* There are 2 mandatory items: "status-code" and "message" */
871
872 /* "status-code" K/V item */
873 idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx);
874 frame[idx++] = SPOE_DATA_T_UINT32;
875 idx += encode_spoe_varint(spoe_status_code, frame+idx);
876
877 /* "message" K/V item */
878 idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx);
879 frame[idx++] = SPOE_DATA_T_STR;
880 idx += encode_spoe_string(reason, rlen, frame+idx);
881
882 return idx;
883}
884
885/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
886 * success, 0 if the frame can be ignored and -1 if an error occurred. */
887static int
Christopher Fauleta1cda022016-12-21 08:58:06 +0100888prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
889 char *frame, size_t size)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200890{
Christopher Fauleta1cda022016-12-21 08:58:06 +0100891 int idx = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200892
Christopher Faulet42bfa462017-01-04 14:14:19 +0100893 if (size < SPOE_APPCTX(appctx)->max_frame_size)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200894 return -1;
895
896 frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
897
898 /* No flags for now */
899 memset(frame+idx, 0, 4);
900 idx += 4;
901
902 /* Set stream-id and frame-id */
903 idx += encode_spoe_varint(ctx->stream_id, frame+idx);
904 idx += encode_spoe_varint(ctx->frame_id, frame+idx);
905
906 /* Copy encoded messages */
Christopher Fauleta1cda022016-12-21 08:58:06 +0100907 if (idx + ctx->buffer->i > size)
908 return 0;
909
910 /* Copy encoded messages */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200911 memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
912 idx += ctx->buffer->i;
913
914 return idx;
915}
916
917/* Decode HELLO frame sent by an agent. It returns the number of by read bytes
918 * on success, 0 if the frame can be ignored and -1 if an error occurred. */
919static int
920handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
921{
Christopher Fauleta1cda022016-12-21 08:58:06 +0100922 int vsn, max_frame_size, flags;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200923 int i, idx = 0;
924 size_t min_size = (7 /* TYPE + METADATA */
925 + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
926 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
927 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
928
929 /* Check frame type */
930 if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
931 return 0;
932
933 if (size < min_size) {
934 spoe_status_code = SPOE_FRM_ERR_INVALID;
935 return -1;
936 }
937
938 /* Skip flags: fragmentation is not supported for now */
939 idx += 4;
940
941 /* stream-id and frame-id must be cleared */
942 if (frame[idx] != 0 || frame[idx+1] != 0) {
943 spoe_status_code = SPOE_FRM_ERR_INVALID;
944 return -1;
945 }
946 idx += 2;
947
948 /* There are 3 mandatory items: "version", "max-frame-size" and
949 * "capabilities" */
950
951 /* Loop on K/V items */
Christopher Fauleta1cda022016-12-21 08:58:06 +0100952 vsn = max_frame_size = flags = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200953 while (idx < size) {
954 char *str;
955 uint64_t sz;
956
957 /* Decode the item key */
958 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
959 if (str == NULL) {
960 spoe_status_code = SPOE_FRM_ERR_INVALID;
961 return -1;
962 }
963 /* Check "version" K/V item */
964 if (!memcmp(str, VERSION_KEY, sz)) {
965 /* The value must be a string */
966 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
967 spoe_status_code = SPOE_FRM_ERR_INVALID;
968 return -1;
969 }
970 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
971 if (str == NULL) {
972 spoe_status_code = SPOE_FRM_ERR_INVALID;
973 return -1;
974 }
975
976 vsn = decode_spoe_version(str, sz);
977 if (vsn == -1) {
978 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
979 return -1;
980 }
981 for (i = 0; supported_versions[i].str != NULL; ++i) {
982 if (vsn >= supported_versions[i].min &&
983 vsn <= supported_versions[i].max)
984 break;
985 }
986 if (supported_versions[i].str == NULL) {
987 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
988 return -1;
989 }
990 }
991 /* Check "max-frame-size" K/V item */
992 else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
993 int type;
994
995 /* The value must be integer */
996 type = frame[idx++];
997 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
998 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
999 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
1000 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
1001 spoe_status_code = SPOE_FRM_ERR_INVALID;
1002 return -1;
1003 }
1004 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
1005 spoe_status_code = SPOE_FRM_ERR_INVALID;
1006 return -1;
1007 }
1008 idx += i;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001009 if (sz < MIN_FRAME_SIZE || sz > SPOE_APPCTX(appctx)->max_frame_size) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001010 spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
1011 return -1;
1012 }
1013 max_frame_size = sz;
1014 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01001015 /* Check "capabilities" K/V item */
1016 else if (!memcmp(str, CAPABILITIES_KEY, sz)) {
1017 int i;
1018
1019 /* The value must be a string */
1020 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1021 spoe_status_code = SPOE_FRM_ERR_INVALID;
1022 return -1;
1023 }
1024 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1025 if (str == NULL)
1026 continue;
1027
1028 i = 0;
1029 while (i < sz) {
1030 char *delim;
1031
1032 /* Skip leading spaces */
1033 for (; isspace(str[i]) && i < sz; i++);
1034
1035 if (sz - i >= 10 && !strncmp(str + i, "pipelining", 10)) {
1036 i += 10;
1037 if (sz == i || isspace(str[i]) || str[i] == ',')
1038 flags |= SPOE_APPCTX_FL_PIPELINING;
1039 }
1040 else if (sz - i >= 5 && !strncmp(str + i, "async", 5)) {
1041 i += 5;
1042 if (sz == i || isspace(str[i]) || str[i] == ',')
1043 flags |= SPOE_APPCTX_FL_ASYNC;
1044 }
1045
1046 if (sz == i || (delim = memchr(str + i, ',', sz-i)) == NULL)
1047 break;
1048 i = (delim - str) + 1;
1049 }
1050 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001051 else {
1052 /* Silently ignore unknown item */
1053 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1054 spoe_status_code = SPOE_FRM_ERR_INVALID;
1055 return -1;
1056 }
1057 idx += i;
1058 }
1059 }
1060
1061 /* Final checks */
1062 if (!vsn) {
1063 spoe_status_code = SPOE_FRM_ERR_NO_VSN;
1064 return -1;
1065 }
1066 if (!max_frame_size) {
1067 spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
1068 return -1;
1069 }
1070
Christopher Faulet42bfa462017-01-04 14:14:19 +01001071 SPOE_APPCTX(appctx)->version = (unsigned int)vsn;
1072 SPOE_APPCTX(appctx)->max_frame_size = (unsigned int)max_frame_size;
1073 SPOE_APPCTX(appctx)->flags |= flags;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001074 return idx;
1075}
1076
1077/* Decode DISCONNECT frame sent by an agent. It returns the number of by read
1078 * bytes on success, 0 if the frame can be ignored and -1 if an error
1079 * occurred. */
1080static int
1081handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
1082{
1083 int i, idx = 0;
1084 size_t min_size = (7 /* TYPE + METADATA */
1085 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
1086 + 1 + SLEN(MSG_KEY) + 1 + 1);
1087
1088 /* Check frame type */
1089 if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
1090 return 0;
1091
1092 if (size < min_size) {
1093 spoe_status_code = SPOE_FRM_ERR_INVALID;
1094 return -1;
1095 }
1096
1097 /* Skip flags: fragmentation is not supported for now */
1098 idx += 4;
1099
1100 /* stream-id and frame-id must be cleared */
1101 if (frame[idx] != 0 || frame[idx+1] != 0) {
1102 spoe_status_code = SPOE_FRM_ERR_INVALID;
1103 return -1;
1104 }
1105 idx += 2;
1106
1107 /* There are 2 mandatory items: "status-code" and "message" */
1108
1109 /* Loop on K/V items */
1110 while (idx < size) {
1111 char *str;
1112 uint64_t sz;
1113
1114 /* Decode the item key */
1115 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1116 if (str == NULL) {
1117 spoe_status_code = SPOE_FRM_ERR_INVALID;
1118 return -1;
1119 }
1120
1121 /* Check "status-code" K/V item */
1122 if (!memcmp(str, STATUS_CODE_KEY, sz)) {
1123 int type;
1124
1125 /* The value must be an integer */
1126 type = frame[idx++];
1127 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
1128 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
1129 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
1130 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
1131 spoe_status_code = SPOE_FRM_ERR_INVALID;
1132 return -1;
1133 }
1134 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
1135 spoe_status_code = SPOE_FRM_ERR_INVALID;
1136 return -1;
1137 }
1138 idx += i;
1139 spoe_status_code = sz;
1140 }
1141
1142 /* Check "message" K/V item */
1143 else if (sz && !memcmp(str, MSG_KEY, sz)) {
1144 /* The value must be a string */
1145 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1146 spoe_status_code = SPOE_FRM_ERR_INVALID;
1147 return -1;
1148 }
1149 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1150 if (str == NULL || sz > 255) {
1151 spoe_status_code = SPOE_FRM_ERR_INVALID;
1152 return -1;
1153 }
1154 memcpy(spoe_reason, str, sz);
1155 spoe_reason[sz] = 0;
1156 }
1157 else {
1158 /* Silently ignore unknown item */
1159 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1160 spoe_status_code = SPOE_FRM_ERR_INVALID;
1161 return -1;
1162 }
1163 idx += i;
1164 }
1165 }
1166
1167 return idx;
1168}
1169
1170
Christopher Fauleta1cda022016-12-21 08:58:06 +01001171/* Decode ACK frame sent by an agent. It returns the number of read bytes on
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001172 * success, 0 if the frame can be ignored and -1 if an error occurred. */
1173static int
1174handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
1175{
Christopher Faulet42bfa462017-01-04 14:14:19 +01001176 struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001177 struct spoe_context *ctx, *back;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001178 uint64_t stream_id, frame_id;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001179 int i, idx = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001180 size_t min_size = (7 /* TYPE + METADATA */);
1181
1182 /* Check frame type */
1183 if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
1184 return 0;
1185
1186 if (size < min_size) {
1187 spoe_status_code = SPOE_FRM_ERR_INVALID;
1188 return -1;
1189 }
1190
1191 /* Skip flags: fragmentation is not supported for now */
1192 idx += 4;
1193
1194 /* Get the stream-id and the frame-id */
Christopher Fauleta1cda022016-12-21 08:58:06 +01001195 if ((i = decode_spoe_varint(frame+idx, frame+size, &stream_id)) == -1)
1196 return 0;
1197 idx += i;
1198 if ((i= decode_spoe_varint(frame+idx, frame+size, &frame_id)) == -1)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001199 return 0;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001200 idx += i;
1201
Christopher Faulet42bfa462017-01-04 14:14:19 +01001202 if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01001203 list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
1204 if (ctx->stream_id == (unsigned int)stream_id &&
1205 ctx->frame_id == (unsigned int)frame_id)
1206 goto found;
1207 }
1208 }
1209 else {
Christopher Faulet42bfa462017-01-04 14:14:19 +01001210 list_for_each_entry_safe(ctx, back, &SPOE_APPCTX(appctx)->waiting_queue, list) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01001211 if (ctx->stream_id == (unsigned int)stream_id &&
1212 ctx->frame_id == (unsigned int)frame_id)
1213 goto found;
1214 }
1215 }
1216
1217 /* No Stream found, ignore the frame */
1218 return 0;
1219
1220 found:
1221 if (acquire_spoe_buffer(ctx) <= 0)
1222 return 1; /* Retry later */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001223
1224 /* Copy encoded actions */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001225 memcpy(ctx->buffer->p, frame+idx, size-idx);
1226 ctx->buffer->i = size-idx;
1227
Christopher Fauleta1cda022016-12-21 08:58:06 +01001228 /* Notify the stream */
1229 LIST_DEL(&ctx->list);
1230 LIST_INIT(&ctx->list);
1231 ctx->state = SPOE_CTX_ST_DONE;
1232 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1233
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001234 return idx;
1235}
1236
Christopher Fauletba7bc162016-11-07 21:07:38 +01001237/* This function is used in cfgparse.c and declared in proto/checks.h. It
1238 * prepare the request to send to agents during a healthcheck. It returns 0 on
1239 * success and -1 if an error occurred. */
1240int
1241prepare_spoe_healthcheck_request(char **req, int *len)
1242{
Christopher Faulet42bfa462017-01-04 14:14:19 +01001243 struct appctx appctx;
1244 struct spoe_appctx spoe_appctx;
1245 char *frame, buf[global.tune.bufsize];
1246 unsigned int framesz;
1247 int idx;
Christopher Fauletba7bc162016-11-07 21:07:38 +01001248
Christopher Faulet42bfa462017-01-04 14:14:19 +01001249 memset(&appctx, 0, sizeof(appctx));
1250 memset(&spoe_appctx, 0, sizeof(spoe_appctx));
Christopher Fauletba7bc162016-11-07 21:07:38 +01001251 memset(buf, 0, sizeof(buf));
Christopher Faulet42bfa462017-01-04 14:14:19 +01001252
1253 appctx.ctx.spoe.ptr = &spoe_appctx;
1254 SPOE_APPCTX(&appctx)->max_frame_size = global.tune.bufsize-4;
Christopher Fauletba7bc162016-11-07 21:07:38 +01001255
1256 frame = buf+4;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001257 idx = prepare_spoe_hahello_frame(&appctx, frame, global.tune.bufsize-4);
Christopher Fauletba7bc162016-11-07 21:07:38 +01001258 if (idx <= 0)
1259 return -1;
1260 if (idx + SLEN(HEALTHCHECK_KEY) + 1 > global.tune.bufsize-4)
1261 return -1;
1262
1263 /* "healthcheck" K/V item */
1264 idx += encode_spoe_string(HEALTHCHECK_KEY, SLEN(HEALTHCHECK_KEY), frame+idx);
1265 frame[idx++] = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE);
1266
1267 framesz = htonl(idx);
1268 memcpy(buf, (char *)&framesz, 4);
1269
1270 if ((*req = malloc(idx+4)) == NULL)
1271 return -1;
1272 memcpy(*req, buf, idx+4);
1273 *len = idx+4;
1274 return 0;
1275}
1276
1277/* This function is used in checks.c and declared in proto/checks.h. It decode
1278 * the response received from an agent during a healthcheck. It returns 0 on
1279 * success and -1 if an error occurred. */
1280int
1281handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen)
1282{
Christopher Faulet42bfa462017-01-04 14:14:19 +01001283 struct appctx appctx;
1284 struct spoe_appctx spoe_appctx;
1285 int r;
Christopher Fauletba7bc162016-11-07 21:07:38 +01001286
Christopher Faulet42bfa462017-01-04 14:14:19 +01001287 memset(&appctx, 0, sizeof(appctx));
1288 memset(&spoe_appctx, 0, sizeof(spoe_appctx));
Christopher Fauletba7bc162016-11-07 21:07:38 +01001289
Christopher Faulet42bfa462017-01-04 14:14:19 +01001290 appctx.ctx.spoe.ptr = &spoe_appctx;
1291 SPOE_APPCTX(&appctx)->max_frame_size = global.tune.bufsize-4;
1292
1293 if (handle_spoe_agentdiscon_frame(&appctx, frame, size) != 0)
Christopher Fauletba7bc162016-11-07 21:07:38 +01001294 goto error;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001295 if ((r = handle_spoe_agenthello_frame(&appctx, frame, size)) <= 0) {
Christopher Fauletba7bc162016-11-07 21:07:38 +01001296 if (r == 0)
1297 spoe_status_code = SPOE_FRM_ERR_INVALID;
1298 goto error;
1299 }
1300
1301 return 0;
1302
1303 error:
1304 if (spoe_status_code >= SPOE_FRM_ERRS)
1305 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
1306 strncpy(err, spoe_frm_err_reasons[spoe_status_code], errlen);
1307 return -1;
1308}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001309
Christopher Fauleta1cda022016-12-21 08:58:06 +01001310/* Send a SPOE frame to an agent. It returns -1 when an error occurred, 0 when
1311 * the frame can be ignored, 1 to retry later, and the frame legnth on
1312 * success. */
1313static int
1314send_spoe_frame(struct appctx *appctx, char *buf, size_t framesz)
1315{
1316 struct stream_interface *si = appctx->owner;
1317 int ret;
1318 uint32_t netint;
1319
1320 if (si_ic(si)->buf == &buf_empty)
1321 return 1;
1322
1323 netint = htonl(framesz);
1324 memcpy(buf, (char *)&netint, 4);
1325 ret = bi_putblk(si_ic(si), buf, framesz+4);
1326
1327 if (ret <= 0) {
1328 if (ret == -1)
1329 return 1; /* retry */
1330 return -1; /* error */
1331 }
1332 return framesz;
1333}
1334
1335/* Receive a SPOE frame from an agent. It return -1 when an error occurred, 0
1336 * when the frame can be ignored, 1 to retry later and the frame length on
1337 * success. */
1338static int
1339recv_spoe_frame(struct appctx *appctx, char *buf, size_t framesz)
1340{
1341 struct stream_interface *si = appctx->owner;
1342 int ret;
1343 uint32_t netint;
1344
1345 if (si_oc(si)->buf == &buf_empty)
1346 return 1;
1347
1348 ret = bo_getblk(si_oc(si), (char *)&netint, 4, 0);
1349 if (ret > 0) {
1350 framesz = ntohl(netint);
Christopher Faulet42bfa462017-01-04 14:14:19 +01001351 if (framesz > SPOE_APPCTX(appctx)->max_frame_size) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01001352 spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
1353 return -1;
1354 }
1355 ret = bo_getblk(si_oc(si), trash.str, framesz, 4);
1356 }
1357 if (ret <= 0) {
1358 if (ret == 0)
1359 return 1; /* retry */
1360 spoe_status_code = SPOE_FRM_ERR_IO;
1361 return -1; /* error */
1362 }
1363 return framesz;
1364}
1365
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001366/********************************************************************
1367 * Functions that manage the SPOE applet
1368 ********************************************************************/
1369/* Callback function that catches applet timeouts. If a timeout occurred, we set
1370 * <appctx->st1> flag and the SPOE applet is woken up. */
1371static struct task *
1372process_spoe_applet(struct task * task)
1373{
1374 struct appctx *appctx = task->context;
1375
1376 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1377 if (tick_is_expired(task->expire, now_ms)) {
1378 task->expire = TICK_ETERNITY;
1379 appctx->st1 = SPOE_APPCTX_ERR_TOUT;
1380 }
1381 si_applet_want_get(appctx->owner);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001382 si_applet_want_put(appctx->owner);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001383 appctx_wakeup(appctx);
1384 return task;
1385}
1386
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001387/* Callback function that releases a SPOE applet. This happens when the
1388 * connection with the agent is closed. */
1389static void
1390release_spoe_applet(struct appctx *appctx)
1391{
1392 struct stream_interface *si = appctx->owner;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001393 struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001394 struct spoe_context *ctx, *back;
1395
1396 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
1397 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1398 __FUNCTION__, appctx);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001399
Christopher Fauleta1cda022016-12-21 08:58:06 +01001400 agent->applets_act--;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001401 if (!LIST_ISEMPTY(&SPOE_APPCTX(appctx)->list)) {
1402 LIST_DEL(&SPOE_APPCTX(appctx)->list);
1403 LIST_INIT(&SPOE_APPCTX(appctx)->list);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001404 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001405
1406 if (appctx->st0 != SPOE_APPCTX_ST_END) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01001407 if (appctx->st0 == SPOE_APPCTX_ST_IDLE)
1408 agent->applets_idle--;
1409
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001410 si_shutw(si);
1411 si_shutr(si);
1412 si_ic(si)->flags |= CF_READ_NULL;
1413 appctx->st0 = SPOE_APPCTX_ST_END;
1414 }
1415
Christopher Faulet42bfa462017-01-04 14:14:19 +01001416 if (SPOE_APPCTX(appctx)->task) {
1417 task_delete(SPOE_APPCTX(appctx)->task);
1418 task_free(SPOE_APPCTX(appctx)->task);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001419 }
1420
Christopher Faulet42bfa462017-01-04 14:14:19 +01001421 list_for_each_entry_safe(ctx, back, &SPOE_APPCTX(appctx)->waiting_queue, list) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01001422 LIST_DEL(&ctx->list);
1423 LIST_INIT(&ctx->list);
1424 ctx->state = SPOE_CTX_ST_ERROR;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001425 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001426 }
1427
Christopher Faulet42bfa462017-01-04 14:14:19 +01001428 pool_free2(pool2_spoe_appctx, SPOE_APPCTX(appctx));
1429
Christopher Fauleta1cda022016-12-21 08:58:06 +01001430 if (!LIST_ISEMPTY(&agent->applets))
1431 return;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001432
Christopher Fauleta1cda022016-12-21 08:58:06 +01001433 list_for_each_entry_safe(ctx, back, &agent->sending_queue, list) {
1434 LIST_DEL(&ctx->list);
1435 LIST_INIT(&ctx->list);
1436 ctx->state = SPOE_CTX_ST_ERROR;
1437 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001438 }
1439
Christopher Fauleta1cda022016-12-21 08:58:06 +01001440 list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
1441 LIST_DEL(&ctx->list);
1442 LIST_INIT(&ctx->list);
1443 ctx->state = SPOE_CTX_ST_ERROR;
1444 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1445 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001446}
1447
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001448static int
Christopher Fauleta1cda022016-12-21 08:58:06 +01001449handle_connect_spoe_applet(struct appctx *appctx)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001450{
Christopher Fauleta1cda022016-12-21 08:58:06 +01001451 struct stream_interface *si = appctx->owner;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001452 struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001453 char *frame = trash.str;
1454 int ret;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001455
Christopher Fauleta1cda022016-12-21 08:58:06 +01001456 if (si->state <= SI_ST_CON) {
1457 si_applet_want_put(si);
1458 task_wakeup(si_strm(si)->task, TASK_WOKEN_MSG);
1459 goto stop;
1460 }
1461 if (si->state != SI_ST_EST)
1462 goto exit;
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001463
Christopher Fauleta1cda022016-12-21 08:58:06 +01001464 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1465 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n",
1466 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx);
1467 goto exit;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001468 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001469
Christopher Faulet42bfa462017-01-04 14:14:19 +01001470 if (SPOE_APPCTX(appctx)->task->expire == TICK_ETERNITY)
1471 SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001472
Christopher Faulet42bfa462017-01-04 14:14:19 +01001473 ret = prepare_spoe_hahello_frame(appctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001474 if (ret > 1)
1475 ret = send_spoe_frame(appctx, frame, ret);
1476
1477 switch (ret) {
1478 case -1: /* error */
1479 goto exit;
1480
1481 case 0: /* ignore => an error, cannot be ignored */
1482 goto exit;
1483
1484 case 1: /* retry later */
1485 si_applet_cant_put(si);
1486 goto stop;
1487
1488 default: /* CONNECT frame successfully sent */
1489 appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
1490 goto next;
1491 }
1492
1493 next:
1494 return 0;
1495 stop:
1496 return 1;
1497 exit:
1498 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1499 return 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001500}
1501
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001502static int
Christopher Fauleta1cda022016-12-21 08:58:06 +01001503handle_connecting_spoe_applet(struct appctx *appctx)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001504{
Christopher Fauleta1cda022016-12-21 08:58:06 +01001505 struct stream_interface *si = appctx->owner;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001506 struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001507 char *frame = trash.str;
1508 int ret, framesz = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001509
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001510
Christopher Fauleta1cda022016-12-21 08:58:06 +01001511 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
1512 goto exit;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001513
Christopher Fauleta1cda022016-12-21 08:58:06 +01001514 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1515 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n",
1516 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx);
1517 goto exit;
1518 }
1519
Christopher Faulet42bfa462017-01-04 14:14:19 +01001520 ret = recv_spoe_frame(appctx, frame, SPOE_APPCTX(appctx)->max_frame_size);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001521 if (ret > 1) {
1522 if (*frame == SPOE_FRM_T_AGENT_DISCON) {
1523 appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
1524 goto next;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001525 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01001526 framesz = ret;
1527 ret = handle_spoe_agenthello_frame(appctx, frame, framesz);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001528 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001529
Christopher Fauleta1cda022016-12-21 08:58:06 +01001530 switch (ret) {
1531 case -1: /* error */
1532 if (framesz)
1533 bo_skip(si_oc(si), framesz+4);
1534 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1535 goto next;
1536
1537 case 0: /* ignore */
1538 if (framesz)
1539 bo_skip(si_oc(si), framesz+4);
1540 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1541 goto next;
1542
1543 case 1: /* retry later */
1544 goto stop;
1545
1546 default:
1547 /* hello handshake is finished, set the idle timeout,
1548 * Add the appctx in the agent cache, decrease the
1549 * number of new applets and wake up waiting streams. */
1550 if (framesz)
1551 bo_skip(si_oc(si), framesz+4);
1552 agent->applets_idle++;
1553 appctx->st0 = SPOE_APPCTX_ST_IDLE;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001554 LIST_DEL(&SPOE_APPCTX(appctx)->list);
1555 LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001556 goto next;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001557 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001558
Christopher Fauleta1cda022016-12-21 08:58:06 +01001559 next:
Christopher Faulet42bfa462017-01-04 14:14:19 +01001560 SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001561 return 0;
1562 stop:
1563 return 1;
1564 exit:
1565 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1566 return 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001567}
1568
Christopher Fauleta1cda022016-12-21 08:58:06 +01001569static int
1570handle_processing_spoe_applet(struct appctx *appctx)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001571{
1572 struct stream_interface *si = appctx->owner;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001573 struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001574 struct spoe_context *ctx;
1575 char *frame = trash.str;
1576 unsigned int fpa = 0;
1577 int ret, framesz = 0, skip_sending = 0, skip_receiving = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001578
Christopher Fauleta1cda022016-12-21 08:58:06 +01001579 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
1580 goto exit;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001581
Christopher Fauleta1cda022016-12-21 08:58:06 +01001582 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1583 spoe_status_code = SPOE_FRM_ERR_TOUT;
1584 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1585 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1586 goto next;
1587 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001588
Christopher Fauleta1cda022016-12-21 08:58:06 +01001589 process:
1590 if (fpa > agent->max_fpa || (skip_sending && skip_receiving))
1591 goto stop;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001592
Christopher Fauleta1cda022016-12-21 08:58:06 +01001593 /* Frames must be handled synchronously and a the applet is waiting for
1594 * a ACK frame */
Christopher Faulet42bfa462017-01-04 14:14:19 +01001595 if (!(SPOE_APPCTX(appctx)->flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) &&
1596 !LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01001597 if (skip_receiving)
1598 goto stop;
1599 goto recv_frame;
1600 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001601
Christopher Fauleta1cda022016-12-21 08:58:06 +01001602 if (LIST_ISEMPTY(&agent->sending_queue) || skip_sending) {
1603 skip_sending = 1;
1604 goto recv_frame;
1605 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001606
Christopher Fauleta1cda022016-12-21 08:58:06 +01001607 ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
Christopher Faulet42bfa462017-01-04 14:14:19 +01001608 ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001609 if (ret > 1)
1610 ret = send_spoe_frame(appctx, frame, ret);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001611
Christopher Fauleta1cda022016-12-21 08:58:06 +01001612 switch (ret) {
1613 case -1: /* error */
1614 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1615 goto next;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001616
Christopher Fauleta1cda022016-12-21 08:58:06 +01001617 case 0: /* ignore */
1618 agent->sending_rate++;
1619 ctx->state = SPOE_CTX_ST_ERROR;
1620 release_spoe_buffer(ctx);
1621 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1622 LIST_DEL(&ctx->list);
1623 LIST_INIT(&ctx->list);
1624 fpa++;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001625 break;
1626
Christopher Fauleta1cda022016-12-21 08:58:06 +01001627 case 1: /* retry */
1628 si_applet_cant_put(si);
1629 skip_sending = 1;
1630 break;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001631
Christopher Fauleta1cda022016-12-21 08:58:06 +01001632 default:
1633 agent->sending_rate++;
1634 ctx->state = SPOE_CTX_ST_WAITING_ACK;
1635 release_spoe_buffer(ctx);
1636 LIST_DEL(&ctx->list);
1637 LIST_INIT(&ctx->list);
Christopher Faulet42bfa462017-01-04 14:14:19 +01001638 if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC)
Christopher Fauleta1cda022016-12-21 08:58:06 +01001639 LIST_ADDQ(&agent->waiting_queue, &ctx->list);
1640 else
Christopher Faulet42bfa462017-01-04 14:14:19 +01001641 LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001642 fpa++;
1643 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001644
Christopher Fauleta1cda022016-12-21 08:58:06 +01001645 if (fpa > agent->max_fpa)
1646 goto stop;
1647
1648 recv_frame:
1649 if (skip_receiving)
1650 goto process;
1651
1652 framesz = 0;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001653 ret = recv_spoe_frame(appctx, frame, SPOE_APPCTX(appctx)->max_frame_size);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001654 if (ret > 1) {
1655 if (*frame == SPOE_FRM_T_AGENT_DISCON) {
1656 appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
1657 goto next;
1658 }
1659 framesz = ret;
1660 ret = handle_spoe_agentack_frame(appctx, frame, framesz);
1661 }
1662
1663 switch (ret) {
1664 case -1: /* error */
1665 if (framesz)
1666 bo_skip(si_oc(si), framesz+4);
1667 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1668 goto next;
1669
1670 case 0: /* ignore */
1671 if (framesz)
1672 bo_skip(si_oc(si), framesz+4);
1673 fpa++;
1674 break;
1675
1676 case 1: /* retry */
1677 skip_receiving = 1;
1678 break;
1679
1680 default:
1681 if (framesz)
1682 bo_skip(si_oc(si), framesz+4);
1683 fpa++;
1684 }
1685 goto process;
1686
1687 next:
Christopher Faulet42bfa462017-01-04 14:14:19 +01001688 SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001689 return 0;
1690 stop:
Christopher Faulet42bfa462017-01-04 14:14:19 +01001691 if ((SPOE_APPCTX(appctx)->flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) ||
1692 LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01001693 agent->applets_idle++;
1694 appctx->st0 = SPOE_APPCTX_ST_IDLE;
1695 }
Christopher Faulet42bfa462017-01-04 14:14:19 +01001696 if (fpa || (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PERSIST)) {
1697 LIST_DEL(&SPOE_APPCTX(appctx)->list);
1698 LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001699 if (fpa)
Christopher Faulet42bfa462017-01-04 14:14:19 +01001700 SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001701 }
1702 return 1;
1703
1704 exit:
1705 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1706 return 0;
1707}
1708
1709static int
1710handle_disconnect_spoe_applet(struct appctx *appctx)
1711{
1712 struct stream_interface *si = appctx->owner;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001713 struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001714 char *frame = trash.str;
1715 int ret;
1716
1717 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
1718 goto exit;
1719
1720 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT)
1721 goto exit;
1722
Christopher Faulet42bfa462017-01-04 14:14:19 +01001723 ret = prepare_spoe_hadiscon_frame(appctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001724 if (ret > 1)
1725 ret = send_spoe_frame(appctx, frame, ret);
1726
1727 switch (ret) {
1728 case -1: /* error */
1729 goto exit;
1730
1731 case 0: /* ignore */
1732 goto exit;
1733
1734 case 1: /* retry */
1735 si_applet_cant_put(si);
1736 goto stop;
1737
1738 default:
1739 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1740 " - disconnected by HAProxy (%d): %s\n",
1741 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1742 __FUNCTION__, appctx, spoe_status_code,
1743 spoe_frm_err_reasons[spoe_status_code]);
1744
1745 appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
1746 goto next;
1747 }
1748
1749 next:
Christopher Faulet42bfa462017-01-04 14:14:19 +01001750 SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001751 return 0;
1752 stop:
1753 return 1;
1754 exit:
1755 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1756 return 0;
1757}
1758
1759static int
1760handle_disconnecting_spoe_applet(struct appctx *appctx)
1761{
1762 struct stream_interface *si = appctx->owner;
1763 char *frame = trash.str;
1764 int ret, framesz = 0;
1765
1766 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
1767 goto exit;
1768
1769 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT)
1770 goto exit;
1771
1772 framesz = 0;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001773 ret = recv_spoe_frame(appctx, frame, SPOE_APPCTX(appctx)->max_frame_size);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001774 if (ret > 1) {
1775 framesz = ret;
1776 ret = handle_spoe_agentdiscon_frame(appctx, frame, framesz);
1777 }
1778
1779 switch (ret) {
1780 case -1: /* error */
1781 if (framesz)
1782 bo_skip(si_oc(si), framesz+4);
1783 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1784 " - error on frame (%s)\n",
1785 (int)now.tv_sec, (int)now.tv_usec,
Christopher Faulet42bfa462017-01-04 14:14:19 +01001786 ((struct spoe_agent *)SPOE_APPCTX(appctx)->agent)->id,
Christopher Fauleta1cda022016-12-21 08:58:06 +01001787 __FUNCTION__, appctx,
1788 spoe_frm_err_reasons[spoe_status_code]);
1789 goto exit;
1790
1791 case 0: /* ignore */
1792 if (framesz)
1793 bo_skip(si_oc(si), framesz+4);
1794 goto next;
1795
1796 case 1: /* retry */
1797 goto stop;
1798
1799 default:
1800 if (framesz)
1801 bo_skip(si_oc(si), framesz+4);
1802 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1803 " - disconnected by peer (%d): %s\n",
1804 (int)now.tv_sec, (int)now.tv_usec,
Christopher Faulet42bfa462017-01-04 14:14:19 +01001805 ((struct spoe_agent *)SPOE_APPCTX(appctx)->agent)->id,
Christopher Fauleta1cda022016-12-21 08:58:06 +01001806 __FUNCTION__, appctx, spoe_status_code,
1807 spoe_reason);
1808 goto exit;
1809 }
1810
1811 next:
1812 return 0;
1813 stop:
1814 return 1;
1815 exit:
1816 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1817 return 0;
1818}
1819
1820/* I/O Handler processing messages exchanged with the agent */
1821static void
1822handle_spoe_applet(struct appctx *appctx)
1823{
1824 struct stream_interface *si = appctx->owner;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001825 struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001826
1827 switchstate:
1828 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1829 " - appctx-state=%s\n",
1830 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1831 __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);
1832
1833 switch (appctx->st0) {
1834 case SPOE_APPCTX_ST_CONNECT:
1835 spoe_status_code = SPOE_FRM_ERR_NONE;
1836 if (handle_connect_spoe_applet(appctx))
1837 goto out;
1838 goto switchstate;
1839
1840 case SPOE_APPCTX_ST_CONNECTING:
1841 if (handle_connecting_spoe_applet(appctx))
1842 goto out;
1843 goto switchstate;
1844
1845 case SPOE_APPCTX_ST_IDLE:
1846 if (stopping &&
1847 LIST_ISEMPTY(&agent->sending_queue) &&
Christopher Faulet42bfa462017-01-04 14:14:19 +01001848 LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
1849 SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001850 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001851 goto switchstate;
1852 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01001853 agent->applets_idle--;
1854 appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
1855 /* fall through */
1856
1857 case SPOE_APPCTX_ST_PROCESSING:
1858 if (handle_processing_spoe_applet(appctx))
1859 goto out;
1860 goto switchstate;
1861
1862 case SPOE_APPCTX_ST_DISCONNECT:
1863 if (handle_disconnect_spoe_applet(appctx))
1864 goto out;
1865 goto switchstate;
1866
1867 case SPOE_APPCTX_ST_DISCONNECTING:
1868 if (handle_disconnecting_spoe_applet(appctx))
1869 goto out;
1870 goto switchstate;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001871
1872 case SPOE_APPCTX_ST_EXIT:
1873 si_shutw(si);
1874 si_shutr(si);
1875 si_ic(si)->flags |= CF_READ_NULL;
1876 appctx->st0 = SPOE_APPCTX_ST_END;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001877 SPOE_APPCTX(appctx)->task->expire = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001878 /* fall through */
1879
1880 case SPOE_APPCTX_ST_END:
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001881 return;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001882 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01001883 out:
Christopher Faulet42bfa462017-01-04 14:14:19 +01001884 if (SPOE_APPCTX(appctx)->task->expire != TICK_ETERNITY)
1885 task_queue(SPOE_APPCTX(appctx)->task);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001886 si_oc(si)->flags |= CF_READ_DONTWAIT;
1887 task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001888}
1889
1890struct applet spoe_applet = {
1891 .obj_type = OBJ_TYPE_APPLET,
1892 .name = "<SPOE>", /* used for logging */
1893 .fct = handle_spoe_applet,
1894 .release = release_spoe_applet,
1895};
1896
1897/* Create a SPOE applet. On success, the created applet is returned, else
1898 * NULL. */
1899static struct appctx *
1900create_spoe_appctx(struct spoe_config *conf)
1901{
1902 struct appctx *appctx;
1903 struct session *sess;
1904 struct task *task;
1905 struct stream *strm;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001906
1907 if ((appctx = appctx_new(&spoe_applet)) == NULL)
1908 goto out_error;
1909
Christopher Faulet42bfa462017-01-04 14:14:19 +01001910 appctx->ctx.spoe.ptr = pool_alloc_dirty(pool2_spoe_appctx);
1911 if (SPOE_APPCTX(appctx) == NULL)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001912 goto out_free_appctx;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001913
Christopher Faulet42bfa462017-01-04 14:14:19 +01001914 appctx->st0 = SPOE_APPCTX_ST_CONNECT;
1915 if ((SPOE_APPCTX(appctx)->task = task_new()) == NULL)
1916 goto out_free_spoe_appctx;
1917
1918 SPOE_APPCTX(appctx)->owner = appctx;
1919 SPOE_APPCTX(appctx)->task->process = process_spoe_applet;
1920 SPOE_APPCTX(appctx)->task->expire = TICK_ETERNITY;
1921 SPOE_APPCTX(appctx)->task->context = appctx;
1922 SPOE_APPCTX(appctx)->agent = conf->agent;
1923 SPOE_APPCTX(appctx)->version = 0;
1924 SPOE_APPCTX(appctx)->max_frame_size = conf->agent->max_frame_size;
1925 SPOE_APPCTX(appctx)->flags = 0;
1926
1927 LIST_INIT(&SPOE_APPCTX(appctx)->list);
1928 LIST_INIT(&SPOE_APPCTX(appctx)->waiting_queue);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001929
Willy Tarreau5820a362016-12-22 15:59:02 +01001930 sess = session_new(&conf->agent_fe, NULL, &appctx->obj_type);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001931 if (!sess)
1932 goto out_free_spoe;
1933
1934 if ((task = task_new()) == NULL)
1935 goto out_free_sess;
1936
1937 if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
1938 goto out_free_task;
1939
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001940 stream_set_backend(strm, conf->agent->b.be);
1941
1942 /* applet is waiting for data */
1943 si_applet_cant_get(&strm->si[0]);
1944 appctx_wakeup(appctx);
1945
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001946 strm->do_log = NULL;
1947 strm->res.flags |= CF_READ_DONTWAIT;
1948
1949 conf->agent_fe.feconn++;
1950 jobs++;
1951 totalconn++;
1952
Christopher Faulet42bfa462017-01-04 14:14:19 +01001953 task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
1954 LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001955 conf->agent->applets_act++;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001956 return appctx;
1957
1958 /* Error unrolling */
1959 out_free_task:
1960 task_free(task);
1961 out_free_sess:
1962 session_free(sess);
1963 out_free_spoe:
Christopher Faulet42bfa462017-01-04 14:14:19 +01001964 task_free(SPOE_APPCTX(appctx)->task);
1965 out_free_spoe_appctx:
1966 pool_free2(pool2_spoe_appctx, SPOE_APPCTX(appctx));
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001967 out_free_appctx:
1968 appctx_free(appctx);
1969 out_error:
1970 return NULL;
1971}
1972
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001973static int
Christopher Fauleta1cda022016-12-21 08:58:06 +01001974queue_spoe_context(struct spoe_context *ctx)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001975{
1976 struct spoe_config *conf = FLT_CONF(ctx->filter);
1977 struct spoe_agent *agent = conf->agent;
1978 struct appctx *appctx;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001979 struct spoe_appctx *spoe_appctx;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001980 unsigned int min_applets;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001981
Christopher Fauleta1cda022016-12-21 08:58:06 +01001982 min_applets = min_applets_act(agent);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001983
Christopher Fauleta1cda022016-12-21 08:58:06 +01001984 /* Check if we need to create a new SPOE applet or not. */
1985 if (agent->applets_act >= min_applets && agent->applets_idle && agent->sending_rate)
1986 goto end;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001987
1988 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
Christopher Fauleta1cda022016-12-21 08:58:06 +01001989 " - try to create new SPOE appctx\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001990 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
1991 ctx->strm);
1992
Christopher Fauleta1cda022016-12-21 08:58:06 +01001993 /* Do not try to create a new applet if there is no server up for the
1994 * agent's backend. */
1995 if (!agent->b.be->srv_act && !agent->b.be->srv_bck) {
1996 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1997 " - cannot create SPOE appctx: no server up\n",
1998 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1999 __FUNCTION__, ctx->strm);
2000 goto end;
2001 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002002
Christopher Fauleta1cda022016-12-21 08:58:06 +01002003 /* Do not try to create a new applet if we have reached the maximum of
2004 * connection per seconds */
Christopher Faulet48026722016-11-16 15:01:12 +01002005 if (agent->cps_max > 0) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01002006 if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0)) {
2007 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2008 " - cannot create SPOE appctx: max CPS reached\n",
2009 (int)now.tv_sec, (int)now.tv_usec, agent->id,
2010 __FUNCTION__, ctx->strm);
2011 goto end;
2012 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002013 }
2014
Christopher Fauleta1cda022016-12-21 08:58:06 +01002015 appctx = create_spoe_appctx(conf);
2016 if (appctx == NULL) {
2017 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2018 " - failed to create SPOE appctx\n",
2019 (int)now.tv_sec, (int)now.tv_usec, agent->id,
2020 __FUNCTION__, ctx->strm);
2021 goto end;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002022 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01002023 if (agent->applets_act <= min_applets)
Christopher Faulet42bfa462017-01-04 14:14:19 +01002024 SPOE_APPCTX(appctx)->flags |= SPOE_APPCTX_FL_PERSIST;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002025
Christopher Fauleta1cda022016-12-21 08:58:06 +01002026 /* Increase the per-process number of cumulated connections */
2027 if (agent->cps_max > 0)
2028 update_freq_ctr(&agent->conn_per_sec, 1);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002029
Christopher Fauleta1cda022016-12-21 08:58:06 +01002030 end:
2031 /* The only reason to return an error is when there is no applet */
2032 if (LIST_ISEMPTY(&agent->applets))
2033 return 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002034
Christopher Fauleta1cda022016-12-21 08:58:06 +01002035 /* Add the SPOE context in the sending queue and update all running
2036 * info */
2037 LIST_ADDQ(&agent->sending_queue, &ctx->list);
2038 if (agent->sending_rate)
2039 agent->sending_rate--;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002040
2041 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
Christopher Fauleta1cda022016-12-21 08:58:06 +01002042 " - Add stream in sending queue - applets_act=%u - applets_idle=%u"
2043 " - sending_rate=%u\n",
2044 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
2045 ctx->strm, agent->applets_act, agent->applets_idle, agent->sending_rate);
Christopher Fauletf7a30922016-11-10 15:04:51 +01002046
Christopher Fauleta1cda022016-12-21 08:58:06 +01002047 /* Finally try to wakeup the first IDLE applet found and move it at the
2048 * end of the list. */
Christopher Faulet42bfa462017-01-04 14:14:19 +01002049 list_for_each_entry(spoe_appctx, &agent->applets, list) {
2050 appctx = spoe_appctx->owner;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002051 if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
2052 si_applet_want_get(appctx->owner);
2053 si_applet_want_put(appctx->owner);
2054 appctx_wakeup(appctx);
Christopher Faulet42bfa462017-01-04 14:14:19 +01002055 LIST_DEL(&spoe_appctx->list);
2056 LIST_ADDQ(&agent->applets, &spoe_appctx->list);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002057 break;
2058 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002059 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01002060 return 1;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002061}
2062
2063/***************************************************************************
2064 * Functions that process SPOE messages and actions
2065 **************************************************************************/
2066/* Process SPOE messages for a specific event. During the processing, it returns
2067 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
2068 * is returned. */
2069static int
2070process_spoe_messages(struct stream *s, struct spoe_context *ctx,
2071 struct list *messages, int dir)
2072{
Christopher Fauleta1cda022016-12-21 08:58:06 +01002073 struct spoe_config *conf = FLT_CONF(ctx->filter);
2074 struct spoe_agent *agent = conf->agent;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002075 struct spoe_message *msg;
2076 struct sample *smp;
2077 struct spoe_arg *arg;
2078 char *p;
2079 size_t max_size;
2080 int off, flag, idx = 0;
2081
2082 /* Reserve 32 bytes from the frame Metadata */
Christopher Fauleta1cda022016-12-21 08:58:06 +01002083 max_size = agent->max_frame_size - 32;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002084
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002085 p = ctx->buffer->p;
2086
2087 /* Loop on messages */
2088 list_for_each_entry(msg, messages, list) {
2089 if (idx + msg->id_len + 1 > max_size)
2090 goto skip;
2091
2092 /* Set the message name */
2093 idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
2094
2095 /* Save offset where to store the number of arguments for this
2096 * message */
2097 off = idx++;
2098 p[off] = 0;
2099
2100 /* Loop on arguments */
2101 list_for_each_entry(arg, &msg->args, list) {
2102 p[off]++; /* Increment the number of arguments */
2103
2104 if (idx + arg->name_len + 1 > max_size)
2105 goto skip;
2106
2107 /* Encode the arguement name as a string. It can by NULL */
2108 idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
2109
2110 /* Fetch the arguement value */
2111 smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
2112 if (!smp) {
2113 /* If no value is available, set it to NULL */
2114 p[idx++] = SPOE_DATA_T_NULL;
2115 continue;
2116 }
2117
2118 /* Else, encode the arguement value */
2119 switch (smp->data.type) {
2120 case SMP_T_BOOL:
2121 flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
2122 p[idx++] = (SPOE_DATA_T_BOOL | flag);
2123 break;
2124 case SMP_T_SINT:
2125 p[idx++] = SPOE_DATA_T_INT64;
2126 if (idx + 8 > max_size)
2127 goto skip;
2128 idx += encode_spoe_varint(smp->data.u.sint, p+idx);
2129 break;
2130 case SMP_T_IPV4:
2131 p[idx++] = SPOE_DATA_T_IPV4;
2132 if (idx + 4 > max_size)
2133 goto skip;
2134 memcpy(p+idx, &smp->data.u.ipv4, 4);
2135 idx += 4;
2136 break;
2137 case SMP_T_IPV6:
2138 p[idx++] = SPOE_DATA_T_IPV6;
2139 if (idx + 16 > max_size)
2140 goto skip;
2141 memcpy(p+idx, &smp->data.u.ipv6, 16);
2142 idx += 16;
2143 break;
2144 case SMP_T_STR:
2145 p[idx++] = SPOE_DATA_T_STR;
2146 if (idx + smp->data.u.str.len > max_size)
2147 goto skip;
2148 idx += encode_spoe_string(smp->data.u.str.str,
2149 smp->data.u.str.len,
2150 p+idx);
2151 break;
2152 case SMP_T_BIN:
2153 p[idx++] = SPOE_DATA_T_BIN;
2154 if (idx + smp->data.u.str.len > max_size)
2155 goto skip;
2156 idx += encode_spoe_string(smp->data.u.str.str,
2157 smp->data.u.str.len,
2158 p+idx);
2159 break;
2160 case SMP_T_METH:
2161 if (smp->data.u.meth.meth == HTTP_METH_OTHER) {
2162 p[idx++] = SPOE_DATA_T_STR;
2163 if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size)
2164 goto skip;
2165 idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name,
2166 http_known_methods[smp->data.u.meth.meth].len,
2167 p+idx);
2168 }
2169 else {
2170 p[idx++] = SPOE_DATA_T_STR;
2171 if (idx + smp->data.u.str.len > max_size)
2172 goto skip;
2173 idx += encode_spoe_string(smp->data.u.meth.str.str,
2174 smp->data.u.meth.str.len,
2175 p+idx);
2176 }
2177 break;
2178 default:
2179 p[idx++] = SPOE_DATA_T_NULL;
2180 }
2181 }
2182 }
2183 ctx->buffer->i = idx;
2184 return 1;
2185
2186 skip:
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002187 return 0;
2188}
2189
2190/* Helper function to set a variable */
2191static void
2192set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
2193 struct sample *smp)
2194{
2195 struct spoe_config *conf = FLT_CONF(ctx->filter);
2196 struct spoe_agent *agent = conf->agent;
2197 char varname[64];
2198
2199 memset(varname, 0, sizeof(varname));
2200 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
2201 scope, agent->var_pfx, len, name);
2202 vars_set_by_name_ifexist(varname, len, smp);
2203}
2204
2205/* Helper function to unset a variable */
2206static void
2207unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
2208 struct sample *smp)
2209{
2210 struct spoe_config *conf = FLT_CONF(ctx->filter);
2211 struct spoe_agent *agent = conf->agent;
2212 char varname[64];
2213
2214 memset(varname, 0, sizeof(varname));
2215 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
2216 scope, agent->var_pfx, len, name);
2217 vars_unset_by_name_ifexist(varname, len, smp);
2218}
2219
2220
2221/* Process SPOE actions for a specific event. During the processing, it returns
2222 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
2223 * is returned. */
2224static int
2225process_spoe_actions(struct stream *s, struct spoe_context *ctx,
2226 enum spoe_event ev, int dir)
2227{
2228 char *p;
2229 size_t size;
2230 int off, i, idx = 0;
2231
2232 p = ctx->buffer->p;
2233 size = ctx->buffer->i;
2234
2235 while (idx < size) {
2236 char *str;
2237 uint64_t sz;
2238 struct sample smp;
2239 enum spoe_action_type type;
2240
2241 off = idx;
2242 if (idx+2 > size)
2243 goto skip;
2244
2245 type = p[idx++];
2246 switch (type) {
2247 case SPOE_ACT_T_SET_VAR: {
2248 char *scope;
2249
2250 if (p[idx++] != 3)
2251 goto skip_action;
2252
2253 switch (p[idx++]) {
2254 case SPOE_SCOPE_PROC: scope = "proc"; break;
2255 case SPOE_SCOPE_SESS: scope = "sess"; break;
2256 case SPOE_SCOPE_TXN : scope = "txn"; break;
2257 case SPOE_SCOPE_REQ : scope = "req"; break;
2258 case SPOE_SCOPE_RES : scope = "res"; break;
2259 default: goto skip;
2260 }
2261
2262 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2263 if (str == NULL)
2264 goto skip;
2265 memset(&smp, 0, sizeof(smp));
2266 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
Christopher Fauletb5cff602016-11-24 14:53:22 +01002267
2268 if ((i = decode_spoe_data(p+idx, p+size, &smp)) == -1)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002269 goto skip;
Christopher Fauletb5cff602016-11-24 14:53:22 +01002270 idx += i;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002271
2272 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2273 " - set-var '%s.%s.%.*s'\n",
2274 (int)now.tv_sec, (int)now.tv_usec,
2275 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2276 __FUNCTION__, s, scope,
2277 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2278 (int)sz, str);
2279
2280 set_spoe_var(ctx, scope, str, sz, &smp);
2281 break;
2282 }
2283
2284 case SPOE_ACT_T_UNSET_VAR: {
2285 char *scope;
2286
2287 if (p[idx++] != 2)
2288 goto skip_action;
2289
2290 switch (p[idx++]) {
2291 case SPOE_SCOPE_PROC: scope = "proc"; break;
2292 case SPOE_SCOPE_SESS: scope = "sess"; break;
2293 case SPOE_SCOPE_TXN : scope = "txn"; break;
2294 case SPOE_SCOPE_REQ : scope = "req"; break;
2295 case SPOE_SCOPE_RES : scope = "res"; break;
2296 default: goto skip;
2297 }
2298
2299 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2300 if (str == NULL)
2301 goto skip;
2302 memset(&smp, 0, sizeof(smp));
2303 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2304
2305 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2306 " - unset-var '%s.%s.%.*s'\n",
2307 (int)now.tv_sec, (int)now.tv_usec,
2308 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2309 __FUNCTION__, s, scope,
2310 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2311 (int)sz, str);
2312
2313 unset_spoe_var(ctx, scope, str, sz, &smp);
2314 break;
2315 }
2316
2317 default:
2318 skip_action:
2319 if ((i = skip_spoe_action(p+off, p+size)) == -1)
2320 goto skip;
2321 idx += i;
2322 }
2323 }
2324
2325 return 1;
2326 skip:
2327 return 0;
2328}
2329
Christopher Fauleta1cda022016-12-21 08:58:06 +01002330static int
2331start_event_processing(struct spoe_context *ctx, int dir)
2332{
2333 int ret;
2334 /* If a process is already started for this SPOE context, retry
2335 * later. */
2336 if (ctx->flags & SPOE_CTX_FL_PROCESS)
2337 goto wait;
2338
2339 ret = acquire_spoe_buffer(ctx);
2340 if (ret <= 0)
2341 return ret;
2342
2343 /* Set the right flag to prevent request and response processing
2344 * in same time. */
2345 ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
2346 ? SPOE_CTX_FL_REQ_PROCESS
2347 : SPOE_CTX_FL_RSP_PROCESS);
2348
2349 return 1;
2350
2351 wait:
2352 return 0;
2353}
2354
2355static void
2356stop_event_processing(struct spoe_context *ctx)
2357{
2358 /* Reset the flag to allow next processing */
2359 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2360
2361 /* Reset processing timer */
2362 ctx->process_exp = TICK_ETERNITY;
2363
2364 release_spoe_buffer(ctx);
2365
2366 if (!LIST_ISEMPTY(&ctx->list)) {
2367 LIST_DEL(&ctx->list);
2368 LIST_INIT(&ctx->list);
2369 }
2370}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002371
2372/* Process a SPOE event. First, this functions will process messages attached to
2373 * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
2374 * ACK frame to process corresponding actions. During all the processing, it
2375 * returns 0 and it returns 1 when the processing is finished. If an error
2376 * occurred, -1 is returned. */
2377static int
2378process_spoe_event(struct stream *s, struct spoe_context *ctx,
2379 enum spoe_event ev)
2380{
Christopher Fauletf7a30922016-11-10 15:04:51 +01002381 struct spoe_config *conf = FLT_CONF(ctx->filter);
2382 struct spoe_agent *agent = conf->agent;
2383 int dir, ret = 1;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002384
2385 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2386 " - ctx-state=%s - event=%s\n",
2387 (int)now.tv_sec, (int)now.tv_usec,
Christopher Fauletf7a30922016-11-10 15:04:51 +01002388 agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002389 spoe_event_str[ev]);
2390
Christopher Faulet48026722016-11-16 15:01:12 +01002391
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002392 dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
2393
2394 if (LIST_ISEMPTY(&(ctx->messages[ev])))
2395 goto out;
2396
2397 if (ctx->state == SPOE_CTX_ST_ERROR)
2398 goto error;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002399
2400 if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
2401 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2402 " - failed to process event '%s': timeout\n",
2403 (int)now.tv_sec, (int)now.tv_usec,
2404 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2405 send_log(ctx->strm->be, LOG_WARNING,
2406 "failed to process event '%s': timeout.\n",
2407 spoe_event_str[ev]);
2408 goto error;
2409 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002410
2411 if (ctx->state == SPOE_CTX_ST_READY) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01002412 if (agent->eps_max > 0) {
2413 if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
2414 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2415 " - skip event '%s': max EPS reached\n",
2416 (int)now.tv_sec, (int)now.tv_usec,
2417 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2418 goto skip;
2419 }
2420 }
2421
Christopher Fauletf7a30922016-11-10 15:04:51 +01002422 if (!tick_isset(ctx->process_exp)) {
2423 ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
2424 s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
2425 ctx->process_exp);
2426 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01002427 ret = start_event_processing(ctx, dir);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002428 if (ret <= 0) {
2429 if (!ret)
2430 goto out;
2431 goto error;
2432 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002433 ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
2434 if (ret <= 0) {
2435 if (!ret)
2436 goto skip;
2437 goto error;
2438 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01002439
2440 if (!queue_spoe_context(ctx))
2441 goto error;
2442
2443 ctx->state = SPOE_CTX_ST_SENDING_MSGS;
2444 /* fall through */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002445 }
2446
Christopher Fauleta1cda022016-12-21 08:58:06 +01002447 if (ctx->state == SPOE_CTX_ST_SENDING_MSGS ||
2448 ctx->state == SPOE_CTX_ST_WAITING_ACK) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002449 ret = 0;
2450 goto out;
2451 }
2452
2453 if (ctx->state == SPOE_CTX_ST_DONE) {
2454 ret = process_spoe_actions(s, ctx, ev, dir);
2455 if (ret <= 0) {
2456 if (!ret)
2457 goto skip;
2458 goto error;
2459 }
2460 ctx->frame_id++;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002461 ctx->state = SPOE_CTX_ST_READY;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002462 goto end;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002463 }
2464
2465 out:
2466 return ret;
2467
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002468 error:
Christopher Faulet48026722016-11-16 15:01:12 +01002469 if (agent->eps_max > 0)
2470 update_freq_ctr(&agent->err_per_sec, 1);
2471
Christopher Faulet985532d2016-11-16 15:36:19 +01002472 if (agent->var_on_error) {
2473 struct sample smp;
2474
Christopher Fauleta1cda022016-12-21 08:58:06 +01002475 // FIXME: Get the error code here
Christopher Faulet985532d2016-11-16 15:36:19 +01002476 memset(&smp, 0, sizeof(smp));
2477 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2478 smp.data.u.sint = 1;
2479 smp.data.type = SMP_T_BOOL;
2480
2481 set_spoe_var(ctx, "txn", agent->var_on_error,
2482 strlen(agent->var_on_error), &smp);
2483 }
2484
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002485 ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
2486 ? SPOE_CTX_ST_READY
2487 : SPOE_CTX_ST_ERROR);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002488 ret = 1;
2489 goto end;
2490
2491 skip:
2492 ctx->state = SPOE_CTX_ST_READY;
2493 ret = 1;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002494
Christopher Fauleta1cda022016-12-21 08:58:06 +01002495 end:
2496 stop_event_processing(ctx);
2497 return ret;
2498}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002499
2500/***************************************************************************
2501 * Functions that create/destroy SPOE contexts
2502 **************************************************************************/
Christopher Fauleta1cda022016-12-21 08:58:06 +01002503static int
2504acquire_spoe_buffer(struct spoe_context *ctx)
2505{
2506 if (ctx->buffer != &buf_empty)
2507 return 1;
2508
2509 if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
2510 LIST_DEL(&ctx->buffer_wait.list);
2511 LIST_INIT(&ctx->buffer_wait.list);
2512 }
2513
2514 if (b_alloc_margin(&ctx->buffer, global.tune.reserved_bufs))
2515 return 1;
2516
2517 LIST_ADDQ(&buffer_wq, &ctx->buffer_wait.list);
2518 return 0;
2519}
2520
2521static void
2522release_spoe_buffer(struct spoe_context *ctx)
2523{
2524 if (!LIST_ISEMPTY(&ctx->buffer_wait.list)) {
2525 LIST_DEL(&ctx->buffer_wait.list);
2526 LIST_INIT(&ctx->buffer_wait.list);
2527 }
2528
2529 /* Release the buffer if needed */
2530 if (ctx->buffer != &buf_empty) {
2531 b_free(&ctx->buffer);
2532 offer_buffers(ctx, tasks_run_queue + applets_active_queue);
2533 }
2534}
2535
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002536static int wakeup_spoe_context(struct spoe_context *ctx)
2537{
2538 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
2539 return 1;
2540}
2541
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002542static struct spoe_context *
2543create_spoe_context(struct filter *filter)
2544{
2545 struct spoe_config *conf = FLT_CONF(filter);
2546 struct spoe_context *ctx;
2547
2548 ctx = pool_alloc_dirty(pool2_spoe_ctx);
2549 if (ctx == NULL) {
2550 return NULL;
2551 }
2552 memset(ctx, 0, sizeof(*ctx));
2553 ctx->filter = filter;
2554 ctx->state = SPOE_CTX_ST_NONE;
2555 ctx->flags = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002556 ctx->messages = conf->agent->messages;
2557 ctx->buffer = &buf_empty;
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002558 LIST_INIT(&ctx->buffer_wait.list);
2559 ctx->buffer_wait.target = ctx;
2560 ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002561 LIST_INIT(&ctx->list);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002562
Christopher Fauletf7a30922016-11-10 15:04:51 +01002563 ctx->stream_id = 0;
2564 ctx->frame_id = 1;
2565 ctx->process_exp = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002566
2567 return ctx;
2568}
2569
2570static void
2571destroy_spoe_context(struct spoe_context *ctx)
2572{
2573 if (!ctx)
2574 return;
2575
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002576 if (!LIST_ISEMPTY(&ctx->buffer_wait.list))
2577 LIST_DEL(&ctx->buffer_wait.list);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002578 if (!LIST_ISEMPTY(&ctx->list))
2579 LIST_DEL(&ctx->list);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002580 pool_free2(pool2_spoe_ctx, ctx);
2581}
2582
2583static void
2584reset_spoe_context(struct spoe_context *ctx)
2585{
2586 ctx->state = SPOE_CTX_ST_READY;
2587 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2588}
2589
2590
2591/***************************************************************************
2592 * Hooks that manage the filter lifecycle (init/check/deinit)
2593 **************************************************************************/
2594/* Signal handler: Do a soft stop, wakeup SPOE applet */
2595static void
2596sig_stop_spoe(struct sig_handler *sh)
2597{
2598 struct proxy *p;
2599
2600 p = proxy;
2601 while (p) {
2602 struct flt_conf *fconf;
2603
2604 list_for_each_entry(fconf, &p->filter_configs, list) {
Christopher Faulet3b386a32017-02-23 10:17:15 +01002605 struct spoe_config *conf;
2606 struct spoe_agent *agent;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002607 struct appctx *appctx;
Christopher Faulet42bfa462017-01-04 14:14:19 +01002608 struct spoe_appctx *spoe_appctx;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002609
Christopher Faulet3b386a32017-02-23 10:17:15 +01002610 if (fconf->id != spoe_filter_id)
2611 continue;
2612
2613 conf = fconf->conf;
2614 agent = conf->agent;
2615
Christopher Faulet42bfa462017-01-04 14:14:19 +01002616 list_for_each_entry(spoe_appctx, &agent->applets, list) {
2617 appctx = spoe_appctx->owner;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002618 si_applet_want_get(appctx->owner);
2619 si_applet_want_put(appctx->owner);
2620 appctx_wakeup(appctx);
2621 }
2622 }
2623 p = p->next;
2624 }
2625}
2626
2627
2628/* Initialize the SPOE filter. Returns -1 on error, else 0. */
2629static int
2630spoe_init(struct proxy *px, struct flt_conf *fconf)
2631{
2632 struct spoe_config *conf = fconf->conf;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002633
2634 memset(&conf->agent_fe, 0, sizeof(conf->agent_fe));
2635 init_new_proxy(&conf->agent_fe);
2636 conf->agent_fe.parent = conf->agent;
2637 conf->agent_fe.last_change = now.tv_sec;
2638 conf->agent_fe.id = conf->agent->id;
2639 conf->agent_fe.cap = PR_CAP_FE;
2640 conf->agent_fe.mode = PR_MODE_TCP;
2641 conf->agent_fe.maxconn = 0;
2642 conf->agent_fe.options2 |= PR_O2_INDEPSTR;
2643 conf->agent_fe.conn_retries = CONN_RETRIES;
2644 conf->agent_fe.accept = frontend_accept;
2645 conf->agent_fe.srv = NULL;
2646 conf->agent_fe.timeout.client = TICK_ETERNITY;
2647 conf->agent_fe.default_target = &spoe_applet.obj_type;
2648 conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
2649
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002650 if (!sighandler_registered) {
2651 signal_register_fct(0, sig_stop_spoe, 0);
2652 sighandler_registered = 1;
2653 }
2654
2655 return 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002656}
2657
2658/* Free ressources allocated by the SPOE filter. */
2659static void
2660spoe_deinit(struct proxy *px, struct flt_conf *fconf)
2661{
2662 struct spoe_config *conf = fconf->conf;
2663
2664 if (conf) {
2665 struct spoe_agent *agent = conf->agent;
2666 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
2667 struct listener *, by_fe);
2668
2669 free(l);
2670 release_spoe_agent(agent);
2671 free(conf);
2672 }
2673 fconf->conf = NULL;
2674}
2675
2676/* Check configuration of a SPOE filter for a specified proxy.
2677 * Return 1 on error, else 0. */
2678static int
2679spoe_check(struct proxy *px, struct flt_conf *fconf)
2680{
2681 struct spoe_config *conf = fconf->conf;
2682 struct proxy *target;
2683
2684 target = proxy_be_by_name(conf->agent->b.name);
2685 if (target == NULL) {
2686 Alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'"
2687 " declared at %s:%d.\n",
2688 px->id, conf->agent->b.name, conf->agent->id,
2689 conf->agent->conf.file, conf->agent->conf.line);
2690 return 1;
2691 }
2692 if (target->mode != PR_MODE_TCP) {
2693 Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
2694 " at %s:%d does not support HTTP mode.\n",
2695 px->id, target->id, conf->agent->id,
2696 conf->agent->conf.file, conf->agent->conf.line);
2697 return 1;
2698 }
2699
2700 free(conf->agent->b.name);
2701 conf->agent->b.name = NULL;
2702 conf->agent->b.be = target;
2703 return 0;
2704}
2705
2706/**************************************************************************
2707 * Hooks attached to a stream
2708 *************************************************************************/
2709/* Called when a filter instance is created and attach to a stream. It creates
2710 * the context that will be used to process this stream. */
2711static int
2712spoe_start(struct stream *s, struct filter *filter)
2713{
2714 struct spoe_context *ctx;
2715
2716 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2717 (int)now.tv_sec, (int)now.tv_usec,
2718 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2719 __FUNCTION__, s);
2720
2721 ctx = create_spoe_context(filter);
2722 if (ctx == NULL) {
2723 send_log(s->be, LOG_EMERG,
2724 "failed to create SPOE context for proxy %s\n",
2725 s->be->id);
2726 return 0;
2727 }
2728
2729 ctx->strm = s;
2730 ctx->state = SPOE_CTX_ST_READY;
2731 filter->ctx = ctx;
2732
2733 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_FE]))
2734 filter->pre_analyzers |= AN_REQ_INSPECT_FE;
2735
2736 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_BE]))
2737 filter->pre_analyzers |= AN_REQ_INSPECT_BE;
2738
2739 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_RSP]))
2740 filter->pre_analyzers |= AN_RES_INSPECT;
2741
2742 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_FE]))
2743 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE;
2744
2745 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_BE]))
2746 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE;
2747
2748 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_RSP]))
2749 filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE;
2750
2751 return 1;
2752}
2753
2754/* Called when a filter instance is detached from a stream. It release the
2755 * attached SPOE context. */
2756static void
2757spoe_stop(struct stream *s, struct filter *filter)
2758{
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002759 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2760 (int)now.tv_sec, (int)now.tv_usec,
2761 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2762 __FUNCTION__, s);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002763 destroy_spoe_context(filter->ctx);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002764}
2765
Christopher Fauletf7a30922016-11-10 15:04:51 +01002766
2767/*
2768 * Called when the stream is woken up because of expired timer.
2769 */
2770static void
2771spoe_check_timeouts(struct stream *s, struct filter *filter)
2772{
2773 struct spoe_context *ctx = filter->ctx;
2774
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002775 if (tick_is_expired(ctx->process_exp, now_ms)) {
2776 s->pending_events |= TASK_WOKEN_MSG;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002777 release_spoe_buffer(ctx);
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002778 }
Christopher Fauletf7a30922016-11-10 15:04:51 +01002779}
2780
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002781/* Called when we are ready to filter data on a channel */
2782static int
2783spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2784{
2785 struct spoe_context *ctx = filter->ctx;
2786 int ret = 1;
2787
2788 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2789 " - ctx-flags=0x%08x\n",
2790 (int)now.tv_sec, (int)now.tv_usec,
2791 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2792 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2793
2794 if (!(chn->flags & CF_ISRESP)) {
2795 if (filter->pre_analyzers & AN_REQ_INSPECT_FE)
2796 chn->analysers |= AN_REQ_INSPECT_FE;
2797 if (filter->pre_analyzers & AN_REQ_INSPECT_BE)
2798 chn->analysers |= AN_REQ_INSPECT_BE;
2799
2800 if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED)
2801 goto out;
2802
2803 ctx->stream_id = s->uniq_id;
2804 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2805 ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
2806 if (ret != 1)
2807 goto out;
2808 }
2809 ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
2810 }
2811 else {
2812 if (filter->pre_analyzers & SPOE_EV_ON_TCP_RSP)
2813 chn->analysers |= AN_RES_INSPECT;
2814
2815 if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
2816 goto out;
2817
2818 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2819 ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
2820 if (ret != 1)
2821 goto out;
2822 }
2823 ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002824 if (!ret) {
2825 channel_dont_read(chn);
2826 channel_dont_close(chn);
2827 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002828 }
2829
2830 out:
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002831 return ret;
2832}
2833
2834/* Called before a processing happens on a given channel */
2835static int
2836spoe_chn_pre_analyze(struct stream *s, struct filter *filter,
2837 struct channel *chn, unsigned an_bit)
2838{
2839 struct spoe_context *ctx = filter->ctx;
2840 int ret = 1;
2841
2842 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2843 " - ctx-flags=0x%08x - ana=0x%08x\n",
2844 (int)now.tv_sec, (int)now.tv_usec,
2845 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2846 __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2847 ctx->flags, an_bit);
2848
2849 if (ctx->state == SPOE_CTX_ST_NONE || ctx->state == SPOE_CTX_ST_ERROR)
2850 goto out;
2851
2852 switch (an_bit) {
2853 case AN_REQ_INSPECT_FE:
2854 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
2855 break;
2856 case AN_REQ_INSPECT_BE:
2857 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
2858 break;
2859 case AN_RES_INSPECT:
2860 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP);
2861 break;
2862 case AN_REQ_HTTP_PROCESS_FE:
2863 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
2864 break;
2865 case AN_REQ_HTTP_PROCESS_BE:
2866 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
2867 break;
2868 case AN_RES_HTTP_PROCESS_FE:
2869 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
2870 break;
2871 }
2872
2873 out:
2874 if (!ret) {
2875 channel_dont_read(chn);
2876 channel_dont_close(chn);
2877 }
2878 return ret;
2879}
2880
2881/* Called when the filtering on the channel ends. */
2882static int
2883spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2884{
2885 struct spoe_context *ctx = filter->ctx;
2886
2887 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2888 " - ctx-flags=0x%08x\n",
2889 (int)now.tv_sec, (int)now.tv_usec,
2890 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2891 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2892
2893 if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
2894 reset_spoe_context(ctx);
2895 }
2896
2897 return 1;
2898}
2899
2900/********************************************************************
2901 * Functions that manage the filter initialization
2902 ********************************************************************/
2903struct flt_ops spoe_ops = {
2904 /* Manage SPOE filter, called for each filter declaration */
2905 .init = spoe_init,
2906 .deinit = spoe_deinit,
2907 .check = spoe_check,
2908
2909 /* Handle start/stop of SPOE */
Christopher Fauletf7a30922016-11-10 15:04:51 +01002910 .attach = spoe_start,
2911 .detach = spoe_stop,
2912 .check_timeouts = spoe_check_timeouts,
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002913
2914 /* Handle channels activity */
2915 .channel_start_analyze = spoe_start_analyze,
2916 .channel_pre_analyze = spoe_chn_pre_analyze,
2917 .channel_end_analyze = spoe_end_analyze,
2918};
2919
2920
2921static int
2922cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
2923{
2924 const char *err;
2925 int i, err_code = 0;
2926
2927 if ((cfg_scope == NULL && curengine != NULL) ||
2928 (cfg_scope != NULL && curengine == NULL) ||
2929 strcmp(curengine, cfg_scope))
2930 goto out;
2931
2932 if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */
2933 if (!*args[1]) {
2934 Alert("parsing [%s:%d] : missing name for spoe-agent section.\n",
2935 file, linenum);
2936 err_code |= ERR_ALERT | ERR_ABORT;
2937 goto out;
2938 }
2939 if (*args[2]) {
2940 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2941 file, linenum, args[2]);
2942 err_code |= ERR_ALERT | ERR_ABORT;
2943 goto out;
2944 }
2945
2946 err = invalid_char(args[1]);
2947 if (err) {
2948 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2949 file, linenum, *err, args[0], args[1]);
2950 err_code |= ERR_ALERT | ERR_ABORT;
2951 goto out;
2952 }
2953
2954 if (curagent != NULL) {
2955 Alert("parsing [%s:%d] : another spoe-agent section previously defined.\n",
2956 file, linenum);
2957 err_code |= ERR_ALERT | ERR_ABORT;
2958 goto out;
2959 }
2960 if ((curagent = calloc(1, sizeof(*curagent))) == NULL) {
2961 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2962 err_code |= ERR_ALERT | ERR_ABORT;
2963 goto out;
2964 }
2965
2966 curagent->id = strdup(args[1]);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002967
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002968 curagent->conf.file = strdup(file);
2969 curagent->conf.line = linenum;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002970
2971 curagent->timeout.hello = TICK_ETERNITY;
2972 curagent->timeout.idle = TICK_ETERNITY;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002973 curagent->timeout.processing = TICK_ETERNITY;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002974
2975 curagent->engine_id = NULL;
2976 curagent->var_pfx = NULL;
2977 curagent->var_on_error = NULL;
2978 curagent->flags = 0;
2979 curagent->cps_max = 0;
2980 curagent->eps_max = 0;
2981 curagent->max_frame_size = global.tune.bufsize - 4;
2982 curagent->min_applets = 0;
2983 curagent->max_fpa = 100;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002984
2985 for (i = 0; i < SPOE_EV_EVENTS; ++i)
2986 LIST_INIT(&curagent->messages[i]);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002987
2988 curagent->applets_act = 0;
2989 curagent->applets_idle = 0;
2990 curagent->sending_rate = 0;
2991
2992 LIST_INIT(&curagent->applets);
2993 LIST_INIT(&curagent->sending_queue);
2994 LIST_INIT(&curagent->waiting_queue);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002995 }
2996 else if (!strcmp(args[0], "use-backend")) {
2997 if (!*args[1]) {
2998 Alert("parsing [%s:%d] : '%s' expects a backend name.\n",
2999 file, linenum, args[0]);
3000 err_code |= ERR_ALERT | ERR_FATAL;
3001 goto out;
3002 }
3003 if (*args[2]) {
3004 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
3005 file, linenum, args[2]);
3006 err_code |= ERR_ALERT | ERR_ABORT;
3007 goto out;
3008 }
3009 free(curagent->b.name);
3010 curagent->b.name = strdup(args[1]);
3011 }
3012 else if (!strcmp(args[0], "messages")) {
3013 int cur_arg = 1;
3014 while (*args[cur_arg]) {
3015 struct spoe_msg_placeholder *mp = NULL;
3016
3017 list_for_each_entry(mp, &curmps, list) {
3018 if (!strcmp(mp->id, args[cur_arg])) {
3019 Alert("parsing [%s:%d]: spoe-message message '%s' already declared.\n",
3020 file, linenum, args[cur_arg]);
3021 err_code |= ERR_ALERT | ERR_FATAL;
3022 goto out;
3023 }
3024 }
3025
3026 if ((mp = calloc(1, sizeof(*mp))) == NULL) {
3027 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
3028 err_code |= ERR_ALERT | ERR_ABORT;
3029 goto out;
3030 }
3031 mp->id = strdup(args[cur_arg]);
3032 LIST_ADDQ(&curmps, &mp->list);
3033 cur_arg++;
3034 }
3035 }
3036 else if (!strcmp(args[0], "timeout")) {
3037 unsigned int *tv = NULL;
3038 const char *res;
3039 unsigned timeout;
3040
3041 if (!*args[1]) {
3042 Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n",
3043 file, linenum);
3044 err_code |= ERR_ALERT | ERR_FATAL;
3045 goto out;
3046 }
3047 if (!strcmp(args[1], "hello"))
3048 tv = &curagent->timeout.hello;
3049 else if (!strcmp(args[1], "idle"))
3050 tv = &curagent->timeout.idle;
Christopher Fauletf7a30922016-11-10 15:04:51 +01003051 else if (!strcmp(args[1], "processing"))
3052 tv = &curagent->timeout.processing;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003053 else {
Christopher Faulet03a34492016-11-19 16:47:56 +01003054 Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' or 'processing' (got %s).\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003055 file, linenum, args[1]);
3056 err_code |= ERR_ALERT | ERR_FATAL;
3057 goto out;
3058 }
3059 if (!*args[2]) {
3060 Alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n",
3061 file, linenum, args[1]);
3062 err_code |= ERR_ALERT | ERR_FATAL;
3063 goto out;
3064 }
3065 res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
3066 if (res) {
3067 Alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n",
3068 file, linenum, *res, args[1]);
3069 err_code |= ERR_ALERT | ERR_ABORT;
3070 goto out;
3071 }
3072 if (*args[3]) {
3073 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
3074 file, linenum, args[3]);
3075 err_code |= ERR_ALERT | ERR_ABORT;
3076 goto out;
3077 }
3078 *tv = MS_TO_TICKS(timeout);
3079 }
3080 else if (!strcmp(args[0], "option")) {
3081 if (!*args[1]) {
3082 Alert("parsing [%s:%d]: '%s' expects an option name.\n",
3083 file, linenum, args[0]);
3084 err_code |= ERR_ALERT | ERR_FATAL;
3085 goto out;
3086 }
3087 if (!strcmp(args[1], "var-prefix")) {
3088 char *tmp;
3089
3090 if (!*args[2]) {
3091 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
3092 file, linenum, args[0],
3093 args[1]);
3094 err_code |= ERR_ALERT | ERR_FATAL;
3095 goto out;
3096 }
3097 tmp = args[2];
3098 while (*tmp) {
3099 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3100 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
3101 file, linenum, args[0], args[1]);
3102 err_code |= ERR_ALERT | ERR_FATAL;
3103 goto out;
3104 }
3105 tmp++;
3106 }
3107 curagent->var_pfx = strdup(args[2]);
3108 }
Christopher Fauletea62c2a2016-11-14 10:54:21 +01003109 else if (!strcmp(args[1], "continue-on-error")) {
3110 if (*args[2]) {
3111 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
Christopher Faulet48026722016-11-16 15:01:12 +01003112 file, linenum, args[2]);
Christopher Fauletea62c2a2016-11-14 10:54:21 +01003113 err_code |= ERR_ALERT | ERR_ABORT;
3114 goto out;
3115 }
3116 curagent->flags |= SPOE_FL_CONT_ON_ERR;
3117 }
Christopher Faulet985532d2016-11-16 15:36:19 +01003118 else if (!strcmp(args[1], "set-on-error")) {
3119 char *tmp;
3120
3121 if (!*args[2]) {
3122 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
3123 file, linenum, args[0],
3124 args[1]);
3125 err_code |= ERR_ALERT | ERR_FATAL;
3126 goto out;
3127 }
3128 tmp = args[2];
3129 while (*tmp) {
3130 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3131 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
3132 file, linenum, args[0], args[1]);
3133 err_code |= ERR_ALERT | ERR_FATAL;
3134 goto out;
3135 }
3136 tmp++;
3137 }
3138 curagent->var_on_error = strdup(args[2]);
3139 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003140 else {
3141 Alert("parsing [%s:%d]: option '%s' is not supported.\n",
3142 file, linenum, args[1]);
3143 err_code |= ERR_ALERT | ERR_FATAL;
3144 goto out;
3145 }
Christopher Faulet48026722016-11-16 15:01:12 +01003146 }
3147 else if (!strcmp(args[0], "maxconnrate")) {
3148 if (!*args[1]) {
3149 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
3150 file, linenum, args[0]);
3151 err_code |= ERR_ALERT | ERR_FATAL;
3152 goto out;
3153 }
3154 if (*args[2]) {
3155 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
3156 file, linenum, args[2]);
3157 err_code |= ERR_ALERT | ERR_ABORT;
3158 goto out;
3159 }
3160 curagent->cps_max = atol(args[1]);
3161 }
3162 else if (!strcmp(args[0], "maxerrrate")) {
3163 if (!*args[1]) {
3164 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
3165 file, linenum, args[0]);
3166 err_code |= ERR_ALERT | ERR_FATAL;
3167 goto out;
3168 }
3169 if (*args[2]) {
3170 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
3171 file, linenum, args[2]);
3172 err_code |= ERR_ALERT | ERR_ABORT;
3173 goto out;
3174 }
3175 curagent->eps_max = atol(args[1]);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003176 }
3177 else if (*args[0]) {
3178 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
3179 file, linenum, args[0]);
3180 err_code |= ERR_ALERT | ERR_FATAL;
3181 goto out;
3182 }
3183 out:
3184 return err_code;
3185}
3186
3187static int
3188cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm)
3189{
3190 struct spoe_message *msg;
3191 struct spoe_arg *arg;
3192 const char *err;
3193 char *errmsg = NULL;
3194 int err_code = 0;
3195
3196 if ((cfg_scope == NULL && curengine != NULL) ||
3197 (cfg_scope != NULL && curengine == NULL) ||
3198 strcmp(curengine, cfg_scope))
3199 goto out;
3200
3201 if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */
3202 if (!*args[1]) {
3203 Alert("parsing [%s:%d] : missing name for spoe-message section.\n",
3204 file, linenum);
3205 err_code |= ERR_ALERT | ERR_ABORT;
3206 goto out;
3207 }
3208 if (*args[2]) {
3209 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
3210 file, linenum, args[2]);
3211 err_code |= ERR_ALERT | ERR_ABORT;
3212 goto out;
3213 }
3214
3215 err = invalid_char(args[1]);
3216 if (err) {
3217 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
3218 file, linenum, *err, args[0], args[1]);
3219 err_code |= ERR_ALERT | ERR_ABORT;
3220 goto out;
3221 }
3222
3223 list_for_each_entry(msg, &curmsgs, list) {
3224 if (!strcmp(msg->id, args[1])) {
3225 Alert("parsing [%s:%d]: spoe-message section '%s' has the same"
3226 " name as another one declared at %s:%d.\n",
3227 file, linenum, args[1], msg->conf.file, msg->conf.line);
3228 err_code |= ERR_ALERT | ERR_FATAL;
3229 goto out;
3230 }
3231 }
3232
3233 if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) {
3234 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
3235 err_code |= ERR_ALERT | ERR_ABORT;
3236 goto out;
3237 }
3238
3239 curmsg->id = strdup(args[1]);
3240 curmsg->id_len = strlen(curmsg->id);
3241 curmsg->event = SPOE_EV_NONE;
3242 curmsg->conf.file = strdup(file);
3243 curmsg->conf.line = linenum;
3244 LIST_INIT(&curmsg->args);
3245 LIST_ADDQ(&curmsgs, &curmsg->list);
3246 }
3247 else if (!strcmp(args[0], "args")) {
3248 int cur_arg = 1;
3249
3250 curproxy->conf.args.ctx = ARGC_SPOE;
3251 curproxy->conf.args.file = file;
3252 curproxy->conf.args.line = linenum;
3253 while (*args[cur_arg]) {
3254 char *delim = strchr(args[cur_arg], '=');
3255 int idx = 0;
3256
3257 if ((arg = calloc(1, sizeof(*arg))) == NULL) {
3258 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
3259 err_code |= ERR_ALERT | ERR_ABORT;
3260 goto out;
3261 }
3262
3263 if (!delim) {
3264 arg->name = NULL;
3265 arg->name_len = 0;
3266 delim = args[cur_arg];
3267 }
3268 else {
3269 arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]);
3270 arg->name_len = delim - args[cur_arg];
3271 delim++;
3272 }
Christopher Fauletb0b42382017-02-23 22:41:09 +01003273 arg->expr = sample_parse_expr((char*[]){delim, NULL},
3274 &idx, file, linenum, &errmsg,
3275 &curproxy->conf.args);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003276 if (arg->expr == NULL) {
3277 Alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg);
3278 err_code |= ERR_ALERT | ERR_FATAL;
3279 free(arg->name);
3280 free(arg);
3281 goto out;
3282 }
3283 LIST_ADDQ(&curmsg->args, &arg->list);
3284 cur_arg++;
3285 }
3286 curproxy->conf.args.file = NULL;
3287 curproxy->conf.args.line = 0;
3288 }
3289 else if (!strcmp(args[0], "event")) {
3290 if (!*args[1]) {
3291 Alert("parsing [%s:%d] : missing event name.\n", file, linenum);
3292 err_code |= ERR_ALERT | ERR_ABORT;
3293 goto out;
3294 }
3295 if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS]))
3296 curmsg->event = SPOE_EV_ON_CLIENT_SESS;
3297 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS]))
3298 curmsg->event = SPOE_EV_ON_SERVER_SESS;
3299
3300 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE]))
3301 curmsg->event = SPOE_EV_ON_TCP_REQ_FE;
3302 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE]))
3303 curmsg->event = SPOE_EV_ON_TCP_REQ_BE;
3304 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP]))
3305 curmsg->event = SPOE_EV_ON_TCP_RSP;
3306
3307 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE]))
3308 curmsg->event = SPOE_EV_ON_HTTP_REQ_FE;
3309 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE]))
3310 curmsg->event = SPOE_EV_ON_HTTP_REQ_BE;
3311 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP]))
3312 curmsg->event = SPOE_EV_ON_HTTP_RSP;
3313 else {
3314 Alert("parsing [%s:%d] : unkown event '%s'.\n",
3315 file, linenum, args[1]);
3316 err_code |= ERR_ALERT | ERR_ABORT;
3317 goto out;
3318 }
3319 }
3320 else if (!*args[0]) {
3321 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n",
3322 file, linenum, args[0]);
3323 err_code |= ERR_ALERT | ERR_FATAL;
3324 goto out;
3325 }
3326 out:
3327 free(errmsg);
3328 return err_code;
3329}
3330
3331/* Return -1 on error, else 0 */
3332static int
3333parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
3334 struct flt_conf *fconf, char **err, void *private)
3335{
3336 struct list backup_sections;
3337 struct spoe_config *conf;
3338 struct spoe_message *msg, *msgback;
3339 struct spoe_msg_placeholder *mp, *mpback;
3340 char *file = NULL, *engine = NULL;
3341 int ret, pos = *cur_arg + 1;
3342
3343 conf = calloc(1, sizeof(*conf));
3344 if (conf == NULL) {
3345 memprintf(err, "%s: out of memory", args[*cur_arg]);
3346 goto error;
3347 }
3348 conf->proxy = px;
3349
3350 while (*args[pos]) {
3351 if (!strcmp(args[pos], "config")) {
3352 if (!*args[pos+1]) {
3353 memprintf(err, "'%s' : '%s' option without value",
3354 args[*cur_arg], args[pos]);
3355 goto error;
3356 }
3357 file = args[pos+1];
3358 pos += 2;
3359 }
3360 else if (!strcmp(args[pos], "engine")) {
3361 if (!*args[pos+1]) {
3362 memprintf(err, "'%s' : '%s' option without value",
3363 args[*cur_arg], args[pos]);
3364 goto error;
3365 }
3366 engine = args[pos+1];
3367 pos += 2;
3368 }
3369 else {
3370 memprintf(err, "unknown keyword '%s'", args[pos]);
3371 goto error;
3372 }
3373 }
3374 if (file == NULL) {
3375 memprintf(err, "'%s' : missing config file", args[*cur_arg]);
3376 goto error;
3377 }
3378
3379 /* backup sections and register SPOE sections */
3380 LIST_INIT(&backup_sections);
3381 cfg_backup_sections(&backup_sections);
3382 cfg_register_section("spoe-agent", cfg_parse_spoe_agent);
3383 cfg_register_section("spoe-message", cfg_parse_spoe_message);
3384
3385 /* Parse SPOE filter configuration file */
3386 curengine = engine;
3387 curproxy = px;
3388 curagent = NULL;
3389 curmsg = NULL;
3390 ret = readcfgfile(file);
3391 curproxy = NULL;
3392
3393 /* unregister SPOE sections and restore previous sections */
3394 cfg_unregister_sections();
3395 cfg_restore_sections(&backup_sections);
3396
3397 if (ret == -1) {
3398 memprintf(err, "Could not open configuration file %s : %s",
3399 file, strerror(errno));
3400 goto error;
3401 }
3402 if (ret & (ERR_ABORT|ERR_FATAL)) {
3403 memprintf(err, "Error(s) found in configuration file %s", file);
3404 goto error;
3405 }
3406
3407 /* Check SPOE agent */
3408 if (curagent == NULL) {
3409 memprintf(err, "No SPOE agent found in file %s", file);
3410 goto error;
3411 }
3412 if (curagent->b.name == NULL) {
3413 memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d",
3414 curagent->id, curagent->conf.file, curagent->conf.line);
3415 goto error;
3416 }
Christopher Fauletf7a30922016-11-10 15:04:51 +01003417 if (curagent->timeout.hello == TICK_ETERNITY ||
3418 curagent->timeout.idle == TICK_ETERNITY ||
Christopher Fauletf7a30922016-11-10 15:04:51 +01003419 curagent->timeout.processing == TICK_ETERNITY) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003420 Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
3421 " | While not properly invalid, you will certainly encounter various problems\n"
3422 " | with such a configuration. To fix this, please ensure that all following\n"
Christopher Faulet03a34492016-11-19 16:47:56 +01003423 " | timeouts are set to a non-zero value: 'hello', 'idle', 'processing'.\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003424 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3425 }
3426 if (curagent->var_pfx == NULL) {
3427 char *tmp = curagent->id;
3428
3429 while (*tmp) {
3430 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3431 memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. "
3432 "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n",
3433 curagent->id, curagent->id, curagent->conf.file, curagent->conf.line);
3434 goto error;
3435 }
3436 tmp++;
3437 }
3438 curagent->var_pfx = strdup(curagent->id);
3439 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01003440 if (curagent->engine_id == NULL)
3441 curagent->engine_id = generate_pseudo_uuid();
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003442
3443 if (LIST_ISEMPTY(&curmps)) {
3444 Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",
3445 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3446 goto finish;
3447 }
3448
3449 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3450 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3451 if (!strcmp(msg->id, mp->id)) {
3452 if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) {
3453 if (msg->event == SPOE_EV_ON_TCP_REQ_BE)
3454 msg->event = SPOE_EV_ON_TCP_REQ_FE;
3455 if (msg->event == SPOE_EV_ON_HTTP_REQ_BE)
3456 msg->event = SPOE_EV_ON_HTTP_REQ_FE;
3457 }
3458 if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS ||
3459 msg->event == SPOE_EV_ON_TCP_REQ_FE ||
3460 msg->event == SPOE_EV_ON_HTTP_REQ_FE)) {
3461 Warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n",
3462 px->id, msg->conf.file, msg->conf.line);
3463 goto next;
3464 }
3465 if (msg->event == SPOE_EV_NONE) {
3466 Warning("Proxy '%s': Ignore SPOE message without event at %s:%d.\n",
3467 px->id, msg->conf.file, msg->conf.line);
3468 goto next;
3469 }
3470 msg->agent = curagent;
3471 LIST_DEL(&msg->list);
3472 LIST_ADDQ(&curagent->messages[msg->event], &msg->list);
3473 goto next;
3474 }
3475 }
3476 memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d",
3477 curagent->id, mp->id, curagent->conf.file, curagent->conf.line);
3478 goto error;
3479 next:
3480 continue;
3481 }
3482
3483 finish:
3484 conf->agent = curagent;
3485 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3486 LIST_DEL(&mp->list);
3487 release_spoe_msg_placeholder(mp);
3488 }
3489 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3490 Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n",
3491 px->id, msg->id, msg->conf.file, msg->conf.line);
3492 LIST_DEL(&msg->list);
3493 release_spoe_message(msg);
3494 }
3495
3496 *cur_arg = pos;
Christopher Faulet3b386a32017-02-23 10:17:15 +01003497 fconf->id = spoe_filter_id;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003498 fconf->ops = &spoe_ops;
3499 fconf->conf = conf;
3500 return 0;
3501
3502 error:
3503 release_spoe_agent(curagent);
3504 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3505 LIST_DEL(&mp->list);
3506 release_spoe_msg_placeholder(mp);
3507 }
3508 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3509 LIST_DEL(&msg->list);
3510 release_spoe_message(msg);
3511 }
3512 free(conf);
3513 return -1;
3514}
3515
3516
3517/* Declare the filter parser for "spoe" keyword */
3518static struct flt_kw_list flt_kws = { "SPOE", { }, {
3519 { "spoe", parse_spoe_flt, NULL },
3520 { NULL, NULL, NULL },
3521 }
3522};
3523
3524__attribute__((constructor))
3525static void __spoe_init(void)
3526{
3527 flt_register_keywords(&flt_kws);
3528
3529 LIST_INIT(&curmsgs);
3530 LIST_INIT(&curmps);
3531 pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
Christopher Faulet42bfa462017-01-04 14:14:19 +01003532 pool2_spoe_appctx = create_pool("spoe_appctx", sizeof(struct spoe_appctx), MEM_F_SHARED);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003533}
3534
3535__attribute__((destructor))
3536static void
3537__spoe_deinit(void)
3538{
3539 pool_destroy2(pool2_spoe_ctx);
Christopher Faulet42bfa462017-01-04 14:14:19 +01003540 pool_destroy2(pool2_spoe_appctx);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003541}