blob: f4da4ddd7caa82b183dd2a3a11f6f96c0916fe7c [file] [log] [blame]
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001/*
2 * Stream processing offload engine management.
3 *
4 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12#include <ctype.h>
13#include <errno.h>
14
15#include <common/buffer.h>
16#include <common/cfgparse.h>
17#include <common/compat.h>
18#include <common/config.h>
19#include <common/debug.h>
20#include <common/memory.h>
21#include <common/time.h>
22
23#include <types/arg.h>
24#include <types/filters.h>
25#include <types/global.h>
26#include <types/proxy.h>
27#include <types/sample.h>
28#include <types/stream.h>
29
30#include <proto/arg.h>
31#include <proto/backend.h>
32#include <proto/filters.h>
Christopher Faulet48026722016-11-16 15:01:12 +010033#include <proto/freq_ctr.h>
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020034#include <proto/frontend.h>
35#include <proto/log.h>
36#include <proto/proto_http.h>
37#include <proto/proxy.h>
38#include <proto/sample.h>
39#include <proto/session.h>
40#include <proto/signal.h>
41#include <proto/stream.h>
42#include <proto/stream_interface.h>
43#include <proto/task.h>
44#include <proto/vars.h>
45
46#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
47#define SPOE_PRINTF(x...) fprintf(x)
48#else
49#define SPOE_PRINTF(x...)
50#endif
51
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020052/* Minimal size for a frame */
53#define MIN_FRAME_SIZE 256
54
Christopher Fauletea62c2a2016-11-14 10:54:21 +010055/* Flags set on the SPOE agent */
56#define SPOE_FL_CONT_ON_ERR 0x00000001 /* Do not stop events processing when an error occurred */
57
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020058/* Flags set on the SPOE context */
59#define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* Set after that on-client-session event was processed */
60#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set after that on-server-session event was processed */
61#define SPOE_CTX_FL_REQ_PROCESS 0x00000004 /* Set when SPOE is processing the request */
62#define SPOE_CTX_FL_RSP_PROCESS 0x00000008 /* Set when SPOE is processing the response */
63
64#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)
65
Christopher Fauleta1cda022016-12-21 08:58:06 +010066/* Flags set on the SPOE applet */
67#define SPOE_APPCTX_FL_PIPELINING 0x00000001 /* Set if pipelining is supported */
68#define SPOE_APPCTX_FL_ASYNC 0x00000002 /* Set if asynchronus frames is supported */
69#define SPOE_APPCTX_FL_PERSIST 0x00000004 /* Set if the applet is persistent */
70
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020071#define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, leave it to zero */
72#define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */
73
74/* All possible states for a SPOE context */
75enum spoe_ctx_state {
76 SPOE_CTX_ST_NONE = 0,
77 SPOE_CTX_ST_READY,
78 SPOE_CTX_ST_SENDING_MSGS,
79 SPOE_CTX_ST_WAITING_ACK,
80 SPOE_CTX_ST_DONE,
81 SPOE_CTX_ST_ERROR,
82};
83
84/* All possible states for a SPOE applet */
85enum spoe_appctx_state {
86 SPOE_APPCTX_ST_CONNECT = 0,
87 SPOE_APPCTX_ST_CONNECTING,
Christopher Fauleta1cda022016-12-21 08:58:06 +010088 SPOE_APPCTX_ST_IDLE,
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +020089 SPOE_APPCTX_ST_PROCESSING,
90 SPOE_APPCTX_ST_DISCONNECT,
91 SPOE_APPCTX_ST_DISCONNECTING,
92 SPOE_APPCTX_ST_EXIT,
93 SPOE_APPCTX_ST_END,
94};
95
96/* All supported SPOE actions */
97enum spoe_action_type {
98 SPOE_ACT_T_SET_VAR = 1,
99 SPOE_ACT_T_UNSET_VAR,
100 SPOE_ACT_TYPES,
101};
102
103/* All supported SPOE events */
104enum spoe_event {
105 SPOE_EV_NONE = 0,
106
107 /* Request events */
108 SPOE_EV_ON_CLIENT_SESS = 1,
109 SPOE_EV_ON_TCP_REQ_FE,
110 SPOE_EV_ON_TCP_REQ_BE,
111 SPOE_EV_ON_HTTP_REQ_FE,
112 SPOE_EV_ON_HTTP_REQ_BE,
113
114 /* Response events */
115 SPOE_EV_ON_SERVER_SESS,
116 SPOE_EV_ON_TCP_RSP,
117 SPOE_EV_ON_HTTP_RSP,
118
119 SPOE_EV_EVENTS
120};
121
Christopher Fauletb067b062017-01-04 16:39:11 +0100122/* Errors triggered by streams */
123enum spoe_context_error {
124 SPOE_CTX_ERR_NONE = 0,
125 SPOE_CTX_ERR_TOUT,
126 SPOE_CTX_ERR_RES,
127 SPOE_CTX_ERR_UNKNOWN = 255,
128 SPOE_CTX_ERRS,
129};
130
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200131/* Errors triggerd by SPOE applet */
132enum spoe_frame_error {
133 SPOE_FRM_ERR_NONE = 0,
134 SPOE_FRM_ERR_IO,
135 SPOE_FRM_ERR_TOUT,
136 SPOE_FRM_ERR_TOO_BIG,
137 SPOE_FRM_ERR_INVALID,
138 SPOE_FRM_ERR_NO_VSN,
139 SPOE_FRM_ERR_NO_FRAME_SIZE,
140 SPOE_FRM_ERR_NO_CAP,
141 SPOE_FRM_ERR_BAD_VSN,
142 SPOE_FRM_ERR_BAD_FRAME_SIZE,
143 SPOE_FRM_ERR_UNKNOWN = 99,
144 SPOE_FRM_ERRS,
145};
146
147/* Scopes used for variables set by agents. It is a way to be agnotic to vars
148 * scope. */
149enum spoe_vars_scope {
150 SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC */
151 SPOE_SCOPE_SESS, /* <=> SCOPE_SESS */
152 SPOE_SCOPE_TXN, /* <=> SCOPE_TXN */
153 SPOE_SCOPE_REQ, /* <=> SCOPE_REQ */
154 SPOE_SCOPE_RES, /* <=> SCOPE_RES */
155};
156
157
158/* Describe an argument that will be linked to a message. It is a sample fetch,
159 * with an optional name. */
160struct spoe_arg {
161 char *name; /* Name of the argument, may be NULL */
162 unsigned int name_len; /* The name length, 0 if NULL */
163 struct sample_expr *expr; /* Sample expression */
164 struct list list; /* Used to chain SPOE args */
165};
166
167/* Used during the config parsing only because, when a SPOE agent section is
168 * parsed, messages can be undefined. */
169struct spoe_msg_placeholder {
170 char *id; /* SPOE message placeholder id */
171 struct list list; /* Use to chain SPOE message placeholders */
172};
173
174/* Describe a message that will be sent in a NOTIFY frame. A message has a name,
175 * an argument list (see above) and it is linked to a specific event. */
176struct spoe_message {
Christopher Fauleta1cda022016-12-21 08:58:06 +0100177 char *id; /* SPOE message id */
178 unsigned int id_len; /* The message id length */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200179 struct spoe_agent *agent; /* SPOE agent owning this SPOE message */
180 struct {
Christopher Fauleta1cda022016-12-21 08:58:06 +0100181 char *file; /* file where the SPOE message appears */
182 int line; /* line where the SPOE message appears */
183 } conf; /* config information */
184 struct list args; /* Arguments added when the SPOE messages is sent */
185 struct list list; /* Used to chain SPOE messages */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200186
187 enum spoe_event event; /* SPOE_EV_* */
188};
189
190/* Describe a SPOE agent. */
191struct spoe_agent {
192 char *id; /* SPOE agent id (name) */
193 struct {
194 char *file; /* file where the SPOE agent appears */
195 int line; /* line where the SPOE agent appears */
196 } conf; /* config information */
197 union {
198 struct proxy *be; /* Backend used by this agent */
199 char *name; /* Backend name used during conf parsing */
200 } b;
201 struct {
Christopher Fauletf7a30922016-11-10 15:04:51 +0100202 unsigned int hello; /* Max time to receive AGENT-HELLO frame (in SPOE applet) */
203 unsigned int idle; /* Max Idle timeout (in SPOE applet) */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100204 unsigned int processing; /* Max time to process an event (in the main stream) */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200205 } timeout;
206
Christopher Fauleta1cda022016-12-21 08:58:06 +0100207 /* Config info */
208 char *engine_id; /* engine-id string */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200209 char *var_pfx; /* Prefix used for vars set by the agent */
Christopher Faulet985532d2016-11-16 15:36:19 +0100210 char *var_on_error; /* Variable to set when an error occured, in the TXN scope */
Christopher Fauletea62c2a2016-11-14 10:54:21 +0100211 unsigned int flags; /* SPOE_FL_* */
Christopher Fauleta1cda022016-12-21 08:58:06 +0100212 unsigned int cps_max; /* Maximum # of connections per second */
213 unsigned int eps_max; /* Maximum # of errors per second */
214 unsigned int max_frame_size; /* Maximum frame size for this agent, before any negotiation */
215 unsigned int min_applets; /* Minimum # applets alive at a time */
216 unsigned int max_fpa; /* Maximum # of frames handled per applet at once */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200217
218 struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
219 * for each supported events */
220
Christopher Fauleta1cda022016-12-21 08:58:06 +0100221 /* running info */
222 unsigned int applets_act; /* # of applets alive at a time */
223 unsigned int applets_idle; /* # of applets in the state SPOE_APPCTX_ST_IDLE */
224 unsigned int sending_rate; /* the global sending rate */
225
226 struct freq_ctr conn_per_sec; /* connections per second */
227 struct freq_ctr err_per_sec; /* connetion errors per second */
228
229 struct list applets; /* List of available SPOE applets */
230 struct list sending_queue; /* Queue of streams waiting to send data */
231 struct list waiting_queue; /* Queue of streams waiting for a ack, in async mode */
232
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200233};
234
235/* SPOE filter configuration */
236struct spoe_config {
237 struct proxy *proxy; /* Proxy owning the filter */
238 struct spoe_agent *agent; /* Agent used by this filter */
239 struct proxy agent_fe; /* Agent frontend */
240};
241
242/* SPOE context attached to a stream. It is the main structure that handles the
243 * processing offload */
244struct spoe_context {
245 struct filter *filter; /* The SPOE filter */
246 struct stream *strm; /* The stream that should be offloaded */
Christopher Fauleta1cda022016-12-21 08:58:06 +0100247
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200248 struct list *messages; /* List of messages that will be sent during the stream processing */
Christopher Faulet4596fb72017-01-11 14:05:19 +0100249 struct buffer *buffer; /* Buffer used to store a encoded messages */
250 struct buffer_wait buffer_wait; /* position in the list of ressources waiting for a buffer */
Christopher Fauleta1cda022016-12-21 08:58:06 +0100251 struct list list;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200252
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200253 enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
254 unsigned int flags; /* SPOE_CTX_FL_* */
Christopher Fauletb067b062017-01-04 16:39:11 +0100255 unsigned int status_code; /* SPOE_CTX_ERR_* */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200256
257 unsigned int stream_id; /* stream_id and frame_id are used */
258 unsigned int frame_id; /* to map NOTIFY and ACK frames */
Christopher Fauletf7a30922016-11-10 15:04:51 +0100259 unsigned int process_exp; /* expiration date to process an event */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200260};
261
Christopher Faulet42bfa462017-01-04 14:14:19 +0100262/* SPOE context inside a appctx */
263struct spoe_appctx {
264 struct appctx *owner; /* the owner */
265 struct task *task; /* task to handle applet timeouts */
266 struct spoe_agent *agent; /* agent on which the applet is attached */
267
268 unsigned int version; /* the negotiated version */
269 unsigned int max_frame_size; /* the negotiated max-frame-size value */
270 unsigned int flags; /* SPOE_APPCTX_FL_* */
271
Christopher Fauletb067b062017-01-04 16:39:11 +0100272 unsigned int status_code; /* SPOE_FRM_ERR_* */
Christopher Faulet4596fb72017-01-11 14:05:19 +0100273 struct buffer *buffer; /* Buffer used to store a encoded messages */
274 struct buffer_wait buffer_wait; /* position in the list of ressources waiting for a buffer */
Christopher Faulet42bfa462017-01-04 14:14:19 +0100275 struct list waiting_queue; /* list of streams waiting for a ACK frame, in sync and pipelining mode */
276 struct list list; /* next spoe appctx for the same agent */
277};
278
279#define SPOE_APPCTX(appctx) ((struct spoe_appctx *)((appctx)->ctx.spoe.ptr))
280
Christopher Faulet3b386a32017-02-23 10:17:15 +0100281/* SPOE filter id. Used to identify SPOE filters */
282const char *spoe_filter_id = "SPOE filter";
283
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200284/* Set if the handle on SIGUSR1 is registered */
285static int sighandler_registered = 0;
286
287/* proxy used during the parsing */
288struct proxy *curproxy = NULL;
289
290/* The name of the SPOE engine, used during the parsing */
291char *curengine = NULL;
292
293/* SPOE agent used during the parsing */
294struct spoe_agent *curagent = NULL;
295
296/* SPOE message used during the parsing */
297struct spoe_message *curmsg = NULL;
298
299/* list of SPOE messages and placeholders used during the parsing */
300struct list curmsgs;
301struct list curmps;
302
Christopher Faulet42bfa462017-01-04 14:14:19 +0100303/* Pools used to allocate SPOE structs */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200304static struct pool_head *pool2_spoe_ctx = NULL;
Christopher Faulet42bfa462017-01-04 14:14:19 +0100305static struct pool_head *pool2_spoe_appctx = NULL;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200306
307/* Temporary variables used to ease error processing */
308int spoe_status_code = SPOE_FRM_ERR_NONE;
309char spoe_reason[256];
310
311struct flt_ops spoe_ops;
312
Christopher Fauleta1cda022016-12-21 08:58:06 +0100313static int queue_spoe_context(struct spoe_context *ctx);
Christopher Faulet4596fb72017-01-11 14:05:19 +0100314static int acquire_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait);
315static void release_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200316
317/********************************************************************
318 * helper functions/globals
319 ********************************************************************/
320static void
321release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp)
322{
323 if (!mp)
324 return;
325 free(mp->id);
326 free(mp);
327}
328
329
330static void
331release_spoe_message(struct spoe_message *msg)
332{
333 struct spoe_arg *arg, *back;
334
335 if (!msg)
336 return;
337 free(msg->id);
338 free(msg->conf.file);
339 list_for_each_entry_safe(arg, back, &msg->args, list) {
340 release_sample_expr(arg->expr);
341 free(arg->name);
342 LIST_DEL(&arg->list);
343 free(arg);
344 }
345 free(msg);
346}
347
348static void
349release_spoe_agent(struct spoe_agent *agent)
350{
351 struct spoe_message *msg, *back;
352 int i;
353
354 if (!agent)
355 return;
356 free(agent->id);
357 free(agent->conf.file);
358 free(agent->var_pfx);
Christopher Fauleta1cda022016-12-21 08:58:06 +0100359 free(agent->engine_id);
Christopher Faulet985532d2016-11-16 15:36:19 +0100360 free(agent->var_on_error);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200361 for (i = 0; i < SPOE_EV_EVENTS; ++i) {
362 list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
363 LIST_DEL(&msg->list);
364 release_spoe_message(msg);
365 }
366 }
367 free(agent);
368}
369
370static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
371 [SPOE_FRM_ERR_NONE] = "normal",
372 [SPOE_FRM_ERR_IO] = "I/O error",
373 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
374 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
375 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
376 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
377 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
378 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
379 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
380 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
381 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
382};
383
384static const char *spoe_event_str[SPOE_EV_EVENTS] = {
385 [SPOE_EV_ON_CLIENT_SESS] = "on-client-session",
386 [SPOE_EV_ON_TCP_REQ_FE] = "on-frontend-tcp-request",
387 [SPOE_EV_ON_TCP_REQ_BE] = "on-backend-tcp-request",
388 [SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request",
389 [SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request",
390
391 [SPOE_EV_ON_SERVER_SESS] = "on-server-session",
392 [SPOE_EV_ON_TCP_RSP] = "on-tcp-response",
393 [SPOE_EV_ON_HTTP_RSP] = "on-http-response",
394};
395
396
397#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
398
399static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
400 [SPOE_CTX_ST_NONE] = "NONE",
401 [SPOE_CTX_ST_READY] = "READY",
402 [SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
403 [SPOE_CTX_ST_WAITING_ACK] = "WAITING_ACK",
404 [SPOE_CTX_ST_DONE] = "DONE",
405 [SPOE_CTX_ST_ERROR] = "ERROR",
406};
407
408static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
409 [SPOE_APPCTX_ST_CONNECT] = "CONNECT",
410 [SPOE_APPCTX_ST_CONNECTING] = "CONNECTING",
Christopher Fauleta1cda022016-12-21 08:58:06 +0100411 [SPOE_APPCTX_ST_IDLE] = "IDLE",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200412 [SPOE_APPCTX_ST_PROCESSING] = "PROCESSING",
413 [SPOE_APPCTX_ST_DISCONNECT] = "DISCONNECT",
414 [SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
415 [SPOE_APPCTX_ST_EXIT] = "EXIT",
416 [SPOE_APPCTX_ST_END] = "END",
417};
418
419#endif
Christopher Fauleta1cda022016-12-21 08:58:06 +0100420
421static char *
422generate_pseudo_uuid()
423{
424 static int init = 0;
425
426 const char uuid_fmt[] = "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx";
427 const char uuid_chr[] = "0123456789ABCDEF-";
428 char *uuid;
429 int i;
430
431 if ((uuid = calloc(1, sizeof(uuid_fmt))) == NULL)
432 return NULL;
433
434 if (!init) {
435 srand(now_ms);
436 init = 1;
437 }
438
439 for (i = 0; i < sizeof(uuid_fmt)-1; i++) {
440 int r = rand () % 16;
441
442 switch (uuid_fmt[i]) {
443 case 'x' : uuid[i] = uuid_chr[r]; break;
444 case 'y' : uuid[i] = uuid_chr[(r & 0x03) | 0x08]; break;
445 default : uuid[i] = uuid_fmt[i]; break;
446 }
447 }
448 return uuid;
449}
450
451static inline unsigned int
452min_applets_act(struct spoe_agent *agent)
453{
454 unsigned int nbsrv;
455
456 if (agent->min_applets)
457 return agent->min_applets;
458
459 nbsrv = (agent->b.be->srv_act ? agent->b.be->srv_act : agent->b.be->srv_bck);
460 return 2*nbsrv;
461}
462
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200463/********************************************************************
464 * Functions that encode/decode SPOE frames
465 ********************************************************************/
466/* Frame Types sent by HAProxy and by agents */
467enum spoe_frame_type {
468 /* Frames sent by HAProxy */
469 SPOE_FRM_T_HAPROXY_HELLO = 1,
470 SPOE_FRM_T_HAPROXY_DISCON,
471 SPOE_FRM_T_HAPROXY_NOTIFY,
472
473 /* Frames sent by the agents */
474 SPOE_FRM_T_AGENT_HELLO = 101,
475 SPOE_FRM_T_AGENT_DISCON,
476 SPOE_FRM_T_AGENT_ACK
477};
478
479/* All supported data types */
480enum spoe_data_type {
481 SPOE_DATA_T_NULL = 0,
482 SPOE_DATA_T_BOOL,
483 SPOE_DATA_T_INT32,
484 SPOE_DATA_T_UINT32,
485 SPOE_DATA_T_INT64,
486 SPOE_DATA_T_UINT64,
487 SPOE_DATA_T_IPV4,
488 SPOE_DATA_T_IPV6,
489 SPOE_DATA_T_STR,
490 SPOE_DATA_T_BIN,
491 SPOE_DATA_TYPES
492};
493
494/* Masks to get data type or flags value */
495#define SPOE_DATA_T_MASK 0x0F
496#define SPOE_DATA_FL_MASK 0xF0
497
498/* Flags to set Boolean values */
499#define SPOE_DATA_FL_FALSE 0x00
500#define SPOE_DATA_FL_TRUE 0x10
501
502/* Helper to get static string length, excluding the terminating null byte */
503#define SLEN(str) (sizeof(str)-1)
504
505/* Predefined key used in HELLO/DISCONNECT frames */
506#define SUPPORTED_VERSIONS_KEY "supported-versions"
507#define VERSION_KEY "version"
508#define MAX_FRAME_SIZE_KEY "max-frame-size"
509#define CAPABILITIES_KEY "capabilities"
Christopher Fauleta1cda022016-12-21 08:58:06 +0100510#define ENGINE_ID_KEY "engine-id"
Christopher Fauletba7bc162016-11-07 21:07:38 +0100511#define HEALTHCHECK_KEY "healthcheck"
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200512#define STATUS_CODE_KEY "status-code"
513#define MSG_KEY "message"
514
515struct spoe_version {
516 char *str;
517 int min;
518 int max;
519};
520
521/* All supported versions */
522static struct spoe_version supported_versions[] = {
523 {"1.0", 1000, 1000},
524 {NULL, 0, 0}
525};
526
527/* Comma-separated list of supported versions */
528#define SUPPORTED_VERSIONS_VAL "1.0"
529
530/* Comma-separated list of supported capabilities (none for now) */
Christopher Fauleta1cda022016-12-21 08:58:06 +0100531//#define CAPABILITIES_VAL ""
532#define CAPABILITIES_VAL "pipelining,async"
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200533
534static int
535decode_spoe_version(const char *str, size_t len)
536{
537 char tmp[len+1], *start, *end;
538 double d;
539 int vsn = -1;
540
541 memset(tmp, 0, len+1);
542 memcpy(tmp, str, len);
543
544 start = tmp;
545 while (isspace(*start))
546 start++;
547
548 d = strtod(start, &end);
549 if (d == 0 || start == end)
550 goto out;
551
552 if (*end) {
553 while (isspace(*end))
554 end++;
555 if (*end)
556 goto out;
557 }
558 vsn = (int)(d * 1000);
559 out:
560 return vsn;
561}
562
563/* Encode a variable-length integer. This function never fails and returns the
564 * number of written bytes. */
565static int
566encode_spoe_varint(uint64_t i, char *buf)
567{
568 int idx;
569
570 if (i < 240) {
571 buf[0] = (unsigned char)i;
572 return 1;
573 }
574
575 buf[0] = (unsigned char)i | 240;
576 i = (i - 240) >> 4;
577 for (idx = 1; i >= 128; ++idx) {
578 buf[idx] = (unsigned char)i | 128;
579 i = (i - 128) >> 7;
580 }
581 buf[idx++] = (unsigned char)i;
582 return idx;
583}
584
585/* Decode a varable-length integer. If the decoding fails, -1 is returned. This
586 * happens when the buffer's end in reached. On success, the number of read
587 * bytes is returned. */
588static int
589decode_spoe_varint(const char *buf, const char *end, uint64_t *i)
590{
591 unsigned char *msg = (unsigned char *)buf;
592 int idx = 0;
593
594 if (msg > (unsigned char *)end)
595 return -1;
596
597 if (msg[0] < 240) {
598 *i = msg[0];
599 return 1;
600 }
601 *i = msg[0];
602 do {
603 ++idx;
604 if (msg+idx > (unsigned char *)end)
605 return -1;
606 *i += (uint64_t)msg[idx] << (4 + 7 * (idx-1));
607 } while (msg[idx] >= 128);
608 return (idx + 1);
609}
610
611/* Encode a string. The string will be prefix by its length, encoded as a
612 * variable-length integer. This function never fails and returns the number of
613 * written bytes. */
614static int
615encode_spoe_string(const char *str, size_t len, char *dst)
616{
617 int idx = 0;
618
619 if (!len) {
620 dst[0] = 0;
621 return 1;
622 }
623
624 idx += encode_spoe_varint(len, dst);
625 memcpy(dst+idx, str, len);
626 return (idx + len);
627}
628
629/* Decode a string. Its length is decoded first as a variable-length integer. If
630 * it succeeds, and if the string length is valid, the begin of the string is
631 * saved in <*str>, its length is saved in <*len> and the total numbre of bytes
632 * read is returned. If an error occurred, -1 is returned and <*str> remains
633 * NULL. */
634static int
635decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
636{
637 int i, idx = 0;
638
639 *str = NULL;
640 *len = 0;
641
642 if ((i = decode_spoe_varint(buf, end, len)) == -1)
643 goto error;
644 idx += i;
645 if (buf + idx + *len > end)
646 goto error;
647
648 *str = buf+idx;
649 return (idx + *len);
650
651 error:
652 return -1;
653}
654
655/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
656 * of bytes read is returned. A types data is composed of a type (1 byte) and
657 * corresponding data:
658 * - boolean: non additional data (0 bytes)
659 * - integers: a variable-length integer (see decode_spoe_varint)
660 * - ipv4: 4 bytes
661 * - ipv6: 16 bytes
662 * - binary and string: a buffer prefixed by its size, a variable-length
663 * integer (see decode_spoe_string) */
664static int
665skip_spoe_data(char *frame, char *end)
666{
667 uint64_t sz = 0;
668 int i, idx = 0;
669
670 if (frame > end)
671 return -1;
672
673 switch (frame[idx++] & SPOE_DATA_T_MASK) {
674 case SPOE_DATA_T_BOOL:
675 break;
676 case SPOE_DATA_T_INT32:
677 case SPOE_DATA_T_INT64:
678 case SPOE_DATA_T_UINT32:
679 case SPOE_DATA_T_UINT64:
680 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
681 return -1;
682 idx += i;
683 break;
684 case SPOE_DATA_T_IPV4:
685 idx += 4;
686 break;
687 case SPOE_DATA_T_IPV6:
688 idx += 16;
689 break;
690 case SPOE_DATA_T_STR:
691 case SPOE_DATA_T_BIN:
692 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
693 return -1;
694 idx += i + sz;
695 break;
696 }
697
698 if (frame+idx > end)
699 return -1;
700 return idx;
701}
702
703/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
704 * number of read bytes is returned. See skip_spoe_data for details. */
705static int
706decode_spoe_data(char *frame, char *end, struct sample *smp)
707{
708 uint64_t sz = 0;
709 int type, i, idx = 0;
710
711 if (frame > end)
712 return -1;
713
714 type = frame[idx++];
715 switch (type & SPOE_DATA_T_MASK) {
716 case SPOE_DATA_T_BOOL:
717 smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
718 smp->data.type = SMP_T_BOOL;
719 break;
720 case SPOE_DATA_T_INT32:
721 case SPOE_DATA_T_INT64:
722 case SPOE_DATA_T_UINT32:
723 case SPOE_DATA_T_UINT64:
724 if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1)
725 return -1;
726 idx += i;
727 smp->data.type = SMP_T_SINT;
728 break;
729 case SPOE_DATA_T_IPV4:
730 if (frame+idx+4 > end)
731 return -1;
732 memcpy(&smp->data.u.ipv4, frame+idx, 4);
733 smp->data.type = SMP_T_IPV4;
734 idx += 4;
735 break;
736 case SPOE_DATA_T_IPV6:
737 if (frame+idx+16 > end)
738 return -1;
739 memcpy(&smp->data.u.ipv6, frame+idx, 16);
740 smp->data.type = SMP_T_IPV6;
741 idx += 16;
742 break;
743 case SPOE_DATA_T_STR:
744 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
745 return -1;
746 idx += i;
747 if (frame+idx+sz > end)
748 return -1;
749 smp->data.u.str.str = frame+idx;
750 smp->data.u.str.len = sz;
751 smp->data.type = SMP_T_STR;
752 idx += sz;
753 break;
754 case SPOE_DATA_T_BIN:
755 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
756 return -1;
757 idx += i;
758 if (frame+idx+sz > end)
759 return -1;
760 smp->data.u.str.str = frame+idx;
761 smp->data.u.str.len = sz;
762 smp->data.type = SMP_T_BIN;
763 idx += sz;
764 break;
765 }
766
767 if (frame+idx > end)
768 return -1;
769 return idx;
770}
771
772/* Skip an action in a frame received from an agent. If an error occurred, -1 is
773 * returned, otherwise the number of read bytes is returned. An action is
774 * composed of the action type followed by a typed data. */
775static int
776skip_spoe_action(char *frame, char *end)
777{
778 int n, i, idx = 0;
779
780 if (frame+2 > end)
781 return -1;
782
783 idx++; /* Skip the action type */
784 n = frame[idx++];
785 while (n-- > 0) {
786 if ((i = skip_spoe_data(frame+idx, end)) == -1)
787 return -1;
788 idx += i;
789 }
790
791 if (frame+idx > end)
792 return -1;
793 return idx;
794}
795
796/* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on
797 * success, 0 if the frame can be ignored and -1 if an error occurred. */
798static int
799prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
800{
Christopher Faulet42bfa462017-01-04 14:14:19 +0100801 struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200802 int idx = 0;
803 size_t max = (7 /* TYPE + METADATA */
804 + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
805 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
Christopher Fauleta1cda022016-12-21 08:58:06 +0100806 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL)
807 + 1 + SLEN(ENGINE_ID_KEY) + 1 + 1 + 36);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200808
Christopher Fauletb067b062017-01-04 16:39:11 +0100809 if (size < max) {
810 spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200811 return -1;
Christopher Fauletb067b062017-01-04 16:39:11 +0100812 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200813
814 /* Frame type */
815 frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
816
817 /* No flags for now */
818 memset(frame+idx, 0, 4);
819 idx += 4;
820
821 /* No stream-id and frame-id for HELLO frames */
822 frame[idx++] = 0;
823 frame[idx++] = 0;
824
825 /* There are 3 mandatory items: "supported-versions", "max-frame-size"
826 * and "capabilities" */
827
828 /* "supported-versions" K/V item */
829 idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx);
830 frame[idx++] = SPOE_DATA_T_STR;
831 idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx);
832
833 /* "max-fram-size" K/V item */
834 idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
835 frame[idx++] = SPOE_DATA_T_UINT32;
Christopher Faulet42bfa462017-01-04 14:14:19 +0100836 idx += encode_spoe_varint(SPOE_APPCTX(appctx)->max_frame_size, frame+idx);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200837
838 /* "capabilities" K/V item */
839 idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
840 frame[idx++] = SPOE_DATA_T_STR;
841 idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
842
Christopher Fauleta1cda022016-12-21 08:58:06 +0100843 /* "engine-id" K/V item */
844 if (agent != NULL && agent->engine_id != NULL) {
845 idx += encode_spoe_string(ENGINE_ID_KEY, SLEN(ENGINE_ID_KEY), frame+idx);
846 frame[idx++] = SPOE_DATA_T_STR;
847 idx += encode_spoe_string(agent->engine_id, strlen(agent->engine_id), frame+idx);
848 }
849
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200850 return idx;
851}
852
853/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame
854 * size on success, 0 if the frame can be ignored and -1 if an error
855 * occurred. */
856static int
857prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
858{
859 const char *reason;
860 int rlen, idx = 0;
861 size_t max = (7 /* TYPE + METADATA */
862 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
863 + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
864
865 if (size < max)
866 return -1;
867
868 /* Get the message corresponding to the status code */
869 if (spoe_status_code >= SPOE_FRM_ERRS)
870 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
871 reason = spoe_frm_err_reasons[spoe_status_code];
872 rlen = strlen(reason);
873
874 /* Frame type */
875 frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
876
877 /* No flags for now */
878 memset(frame+idx, 0, 4);
879 idx += 4;
880
881 /* No stream-id and frame-id for DISCONNECT frames */
882 frame[idx++] = 0;
883 frame[idx++] = 0;
884
885 /* There are 2 mandatory items: "status-code" and "message" */
886
887 /* "status-code" K/V item */
888 idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx);
889 frame[idx++] = SPOE_DATA_T_UINT32;
890 idx += encode_spoe_varint(spoe_status_code, frame+idx);
891
892 /* "message" K/V item */
893 idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx);
894 frame[idx++] = SPOE_DATA_T_STR;
895 idx += encode_spoe_string(reason, rlen, frame+idx);
896
897 return idx;
898}
899
900/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
901 * success, 0 if the frame can be ignored and -1 if an error occurred. */
902static int
Christopher Fauleta1cda022016-12-21 08:58:06 +0100903prepare_spoe_hanotify_frame(struct appctx *appctx, struct spoe_context *ctx,
904 char *frame, size_t size)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200905{
Christopher Fauleta1cda022016-12-21 08:58:06 +0100906 int idx = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200907
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200908 frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
909
910 /* No flags for now */
911 memset(frame+idx, 0, 4);
912 idx += 4;
913
914 /* Set stream-id and frame-id */
915 idx += encode_spoe_varint(ctx->stream_id, frame+idx);
916 idx += encode_spoe_varint(ctx->frame_id, frame+idx);
917
Christopher Faulet4596fb72017-01-11 14:05:19 +0100918 /* check the buffer size */
919 if (idx + SPOE_APPCTX(appctx)->buffer->i > size) {
Christopher Fauletb067b062017-01-04 16:39:11 +0100920 spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
Christopher Fauleta1cda022016-12-21 08:58:06 +0100921 return 0;
Christopher Fauletb067b062017-01-04 16:39:11 +0100922 }
Christopher Fauleta1cda022016-12-21 08:58:06 +0100923
924 /* Copy encoded messages */
Christopher Faulet4596fb72017-01-11 14:05:19 +0100925 memcpy(frame+idx, SPOE_APPCTX(appctx)->buffer->p, SPOE_APPCTX(appctx)->buffer->i);
926 idx += SPOE_APPCTX(appctx)->buffer->i;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200927
928 return idx;
929}
930
931/* Decode HELLO frame sent by an agent. It returns the number of by read bytes
932 * on success, 0 if the frame can be ignored and -1 if an error occurred. */
933static int
934handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
935{
Christopher Fauleta1cda022016-12-21 08:58:06 +0100936 int vsn, max_frame_size, flags;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200937 int i, idx = 0;
938 size_t min_size = (7 /* TYPE + METADATA */
939 + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
940 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
941 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
942
943 /* Check frame type */
944 if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
945 return 0;
946
947 if (size < min_size) {
948 spoe_status_code = SPOE_FRM_ERR_INVALID;
949 return -1;
950 }
951
952 /* Skip flags: fragmentation is not supported for now */
953 idx += 4;
954
955 /* stream-id and frame-id must be cleared */
956 if (frame[idx] != 0 || frame[idx+1] != 0) {
957 spoe_status_code = SPOE_FRM_ERR_INVALID;
958 return -1;
959 }
960 idx += 2;
961
962 /* There are 3 mandatory items: "version", "max-frame-size" and
963 * "capabilities" */
964
965 /* Loop on K/V items */
Christopher Fauleta1cda022016-12-21 08:58:06 +0100966 vsn = max_frame_size = flags = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200967 while (idx < size) {
968 char *str;
969 uint64_t sz;
970
971 /* Decode the item key */
972 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
973 if (str == NULL) {
974 spoe_status_code = SPOE_FRM_ERR_INVALID;
975 return -1;
976 }
977 /* Check "version" K/V item */
978 if (!memcmp(str, VERSION_KEY, sz)) {
979 /* The value must be a string */
980 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
981 spoe_status_code = SPOE_FRM_ERR_INVALID;
982 return -1;
983 }
984 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
985 if (str == NULL) {
986 spoe_status_code = SPOE_FRM_ERR_INVALID;
987 return -1;
988 }
989
990 vsn = decode_spoe_version(str, sz);
991 if (vsn == -1) {
992 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
993 return -1;
994 }
995 for (i = 0; supported_versions[i].str != NULL; ++i) {
996 if (vsn >= supported_versions[i].min &&
997 vsn <= supported_versions[i].max)
998 break;
999 }
1000 if (supported_versions[i].str == NULL) {
1001 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
1002 return -1;
1003 }
1004 }
1005 /* Check "max-frame-size" K/V item */
1006 else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
1007 int type;
1008
1009 /* The value must be integer */
1010 type = frame[idx++];
1011 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
1012 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
1013 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
1014 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
1015 spoe_status_code = SPOE_FRM_ERR_INVALID;
1016 return -1;
1017 }
1018 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
1019 spoe_status_code = SPOE_FRM_ERR_INVALID;
1020 return -1;
1021 }
1022 idx += i;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001023 if (sz < MIN_FRAME_SIZE || sz > SPOE_APPCTX(appctx)->max_frame_size) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001024 spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
1025 return -1;
1026 }
1027 max_frame_size = sz;
1028 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01001029 /* Check "capabilities" K/V item */
1030 else if (!memcmp(str, CAPABILITIES_KEY, sz)) {
1031 int i;
1032
1033 /* The value must be a string */
1034 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1035 spoe_status_code = SPOE_FRM_ERR_INVALID;
1036 return -1;
1037 }
1038 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1039 if (str == NULL)
1040 continue;
1041
1042 i = 0;
1043 while (i < sz) {
1044 char *delim;
1045
1046 /* Skip leading spaces */
1047 for (; isspace(str[i]) && i < sz; i++);
1048
1049 if (sz - i >= 10 && !strncmp(str + i, "pipelining", 10)) {
1050 i += 10;
1051 if (sz == i || isspace(str[i]) || str[i] == ',')
1052 flags |= SPOE_APPCTX_FL_PIPELINING;
1053 }
1054 else if (sz - i >= 5 && !strncmp(str + i, "async", 5)) {
1055 i += 5;
1056 if (sz == i || isspace(str[i]) || str[i] == ',')
1057 flags |= SPOE_APPCTX_FL_ASYNC;
1058 }
1059
1060 if (sz == i || (delim = memchr(str + i, ',', sz-i)) == NULL)
1061 break;
1062 i = (delim - str) + 1;
1063 }
1064 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001065 else {
1066 /* Silently ignore unknown item */
1067 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1068 spoe_status_code = SPOE_FRM_ERR_INVALID;
1069 return -1;
1070 }
1071 idx += i;
1072 }
1073 }
1074
1075 /* Final checks */
1076 if (!vsn) {
1077 spoe_status_code = SPOE_FRM_ERR_NO_VSN;
1078 return -1;
1079 }
1080 if (!max_frame_size) {
1081 spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
1082 return -1;
1083 }
1084
Christopher Faulet42bfa462017-01-04 14:14:19 +01001085 SPOE_APPCTX(appctx)->version = (unsigned int)vsn;
1086 SPOE_APPCTX(appctx)->max_frame_size = (unsigned int)max_frame_size;
1087 SPOE_APPCTX(appctx)->flags |= flags;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001088 return idx;
1089}
1090
1091/* Decode DISCONNECT frame sent by an agent. It returns the number of by read
1092 * bytes on success, 0 if the frame can be ignored and -1 if an error
1093 * occurred. */
1094static int
1095handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
1096{
1097 int i, idx = 0;
1098 size_t min_size = (7 /* TYPE + METADATA */
1099 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
1100 + 1 + SLEN(MSG_KEY) + 1 + 1);
1101
1102 /* Check frame type */
1103 if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
1104 return 0;
1105
1106 if (size < min_size) {
1107 spoe_status_code = SPOE_FRM_ERR_INVALID;
1108 return -1;
1109 }
1110
1111 /* Skip flags: fragmentation is not supported for now */
1112 idx += 4;
1113
1114 /* stream-id and frame-id must be cleared */
1115 if (frame[idx] != 0 || frame[idx+1] != 0) {
1116 spoe_status_code = SPOE_FRM_ERR_INVALID;
1117 return -1;
1118 }
1119 idx += 2;
1120
1121 /* There are 2 mandatory items: "status-code" and "message" */
1122
1123 /* Loop on K/V items */
1124 while (idx < size) {
1125 char *str;
1126 uint64_t sz;
1127
1128 /* Decode the item key */
1129 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1130 if (str == NULL) {
1131 spoe_status_code = SPOE_FRM_ERR_INVALID;
1132 return -1;
1133 }
1134
1135 /* Check "status-code" K/V item */
1136 if (!memcmp(str, STATUS_CODE_KEY, sz)) {
1137 int type;
1138
1139 /* The value must be an integer */
1140 type = frame[idx++];
1141 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
1142 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
1143 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
1144 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
1145 spoe_status_code = SPOE_FRM_ERR_INVALID;
1146 return -1;
1147 }
1148 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
1149 spoe_status_code = SPOE_FRM_ERR_INVALID;
1150 return -1;
1151 }
1152 idx += i;
1153 spoe_status_code = sz;
1154 }
1155
1156 /* Check "message" K/V item */
1157 else if (sz && !memcmp(str, MSG_KEY, sz)) {
1158 /* The value must be a string */
1159 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1160 spoe_status_code = SPOE_FRM_ERR_INVALID;
1161 return -1;
1162 }
1163 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1164 if (str == NULL || sz > 255) {
1165 spoe_status_code = SPOE_FRM_ERR_INVALID;
1166 return -1;
1167 }
1168 memcpy(spoe_reason, str, sz);
1169 spoe_reason[sz] = 0;
1170 }
1171 else {
1172 /* Silently ignore unknown item */
1173 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1174 spoe_status_code = SPOE_FRM_ERR_INVALID;
1175 return -1;
1176 }
1177 idx += i;
1178 }
1179 }
1180
1181 return idx;
1182}
1183
1184
Christopher Fauleta1cda022016-12-21 08:58:06 +01001185/* Decode ACK frame sent by an agent. It returns the number of read bytes on
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001186 * success, 0 if the frame can be ignored and -1 if an error occurred. */
1187static int
1188handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
1189{
Christopher Faulet42bfa462017-01-04 14:14:19 +01001190 struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001191 struct spoe_context *ctx, *back;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001192 uint64_t stream_id, frame_id;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001193 int i, idx = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001194 size_t min_size = (7 /* TYPE + METADATA */);
1195
1196 /* Check frame type */
1197 if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
1198 return 0;
1199
1200 if (size < min_size) {
1201 spoe_status_code = SPOE_FRM_ERR_INVALID;
1202 return -1;
1203 }
1204
1205 /* Skip flags: fragmentation is not supported for now */
1206 idx += 4;
1207
1208 /* Get the stream-id and the frame-id */
Christopher Fauleta1cda022016-12-21 08:58:06 +01001209 if ((i = decode_spoe_varint(frame+idx, frame+size, &stream_id)) == -1)
1210 return 0;
1211 idx += i;
1212 if ((i= decode_spoe_varint(frame+idx, frame+size, &frame_id)) == -1)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001213 return 0;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001214 idx += i;
1215
Christopher Faulet42bfa462017-01-04 14:14:19 +01001216 if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01001217 list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
1218 if (ctx->stream_id == (unsigned int)stream_id &&
1219 ctx->frame_id == (unsigned int)frame_id)
1220 goto found;
1221 }
1222 }
1223 else {
Christopher Faulet42bfa462017-01-04 14:14:19 +01001224 list_for_each_entry_safe(ctx, back, &SPOE_APPCTX(appctx)->waiting_queue, list) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01001225 if (ctx->stream_id == (unsigned int)stream_id &&
1226 ctx->frame_id == (unsigned int)frame_id)
1227 goto found;
1228 }
1229 }
1230
1231 /* No Stream found, ignore the frame */
1232 return 0;
1233
1234 found:
Christopher Faulet4596fb72017-01-11 14:05:19 +01001235 if (!acquire_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait))
Christopher Fauleta1cda022016-12-21 08:58:06 +01001236 return 1; /* Retry later */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001237
Christopher Faulet4596fb72017-01-11 14:05:19 +01001238 /* Transfer the buffer ownership to the SPOE context */
1239 ctx->buffer = SPOE_APPCTX(appctx)->buffer;
1240 SPOE_APPCTX(appctx)->buffer = &buf_empty;
1241
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001242 /* Copy encoded actions */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001243 memcpy(ctx->buffer->p, frame+idx, size-idx);
1244 ctx->buffer->i = size-idx;
1245
Christopher Fauleta1cda022016-12-21 08:58:06 +01001246 /* Notify the stream */
1247 LIST_DEL(&ctx->list);
1248 LIST_INIT(&ctx->list);
1249 ctx->state = SPOE_CTX_ST_DONE;
1250 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1251
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001252 return idx;
1253}
1254
Christopher Fauletba7bc162016-11-07 21:07:38 +01001255/* This function is used in cfgparse.c and declared in proto/checks.h. It
1256 * prepare the request to send to agents during a healthcheck. It returns 0 on
1257 * success and -1 if an error occurred. */
1258int
1259prepare_spoe_healthcheck_request(char **req, int *len)
1260{
Christopher Faulet42bfa462017-01-04 14:14:19 +01001261 struct appctx appctx;
1262 struct spoe_appctx spoe_appctx;
1263 char *frame, buf[global.tune.bufsize];
1264 unsigned int framesz;
1265 int idx;
Christopher Fauletba7bc162016-11-07 21:07:38 +01001266
Christopher Faulet42bfa462017-01-04 14:14:19 +01001267 memset(&appctx, 0, sizeof(appctx));
1268 memset(&spoe_appctx, 0, sizeof(spoe_appctx));
Christopher Fauletba7bc162016-11-07 21:07:38 +01001269 memset(buf, 0, sizeof(buf));
Christopher Faulet42bfa462017-01-04 14:14:19 +01001270
1271 appctx.ctx.spoe.ptr = &spoe_appctx;
1272 SPOE_APPCTX(&appctx)->max_frame_size = global.tune.bufsize-4;
Christopher Fauletba7bc162016-11-07 21:07:38 +01001273
1274 frame = buf+4;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001275 idx = prepare_spoe_hahello_frame(&appctx, frame, global.tune.bufsize-4);
Christopher Fauletba7bc162016-11-07 21:07:38 +01001276 if (idx <= 0)
1277 return -1;
1278 if (idx + SLEN(HEALTHCHECK_KEY) + 1 > global.tune.bufsize-4)
1279 return -1;
1280
1281 /* "healthcheck" K/V item */
1282 idx += encode_spoe_string(HEALTHCHECK_KEY, SLEN(HEALTHCHECK_KEY), frame+idx);
1283 frame[idx++] = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE);
1284
1285 framesz = htonl(idx);
1286 memcpy(buf, (char *)&framesz, 4);
1287
1288 if ((*req = malloc(idx+4)) == NULL)
1289 return -1;
1290 memcpy(*req, buf, idx+4);
1291 *len = idx+4;
1292 return 0;
1293}
1294
1295/* This function is used in checks.c and declared in proto/checks.h. It decode
1296 * the response received from an agent during a healthcheck. It returns 0 on
1297 * success and -1 if an error occurred. */
1298int
1299handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen)
1300{
Christopher Faulet42bfa462017-01-04 14:14:19 +01001301 struct appctx appctx;
1302 struct spoe_appctx spoe_appctx;
1303 int r;
Christopher Fauletba7bc162016-11-07 21:07:38 +01001304
Christopher Faulet42bfa462017-01-04 14:14:19 +01001305 memset(&appctx, 0, sizeof(appctx));
1306 memset(&spoe_appctx, 0, sizeof(spoe_appctx));
Christopher Fauletba7bc162016-11-07 21:07:38 +01001307
Christopher Faulet42bfa462017-01-04 14:14:19 +01001308 appctx.ctx.spoe.ptr = &spoe_appctx;
1309 SPOE_APPCTX(&appctx)->max_frame_size = global.tune.bufsize-4;
1310
1311 if (handle_spoe_agentdiscon_frame(&appctx, frame, size) != 0)
Christopher Fauletba7bc162016-11-07 21:07:38 +01001312 goto error;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001313 if ((r = handle_spoe_agenthello_frame(&appctx, frame, size)) <= 0) {
Christopher Fauletba7bc162016-11-07 21:07:38 +01001314 if (r == 0)
1315 spoe_status_code = SPOE_FRM_ERR_INVALID;
1316 goto error;
1317 }
1318
1319 return 0;
1320
1321 error:
1322 if (spoe_status_code >= SPOE_FRM_ERRS)
1323 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
1324 strncpy(err, spoe_frm_err_reasons[spoe_status_code], errlen);
1325 return -1;
1326}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001327
Christopher Fauleta1cda022016-12-21 08:58:06 +01001328/* Send a SPOE frame to an agent. It returns -1 when an error occurred, 0 when
1329 * the frame can be ignored, 1 to retry later, and the frame legnth on
1330 * success. */
1331static int
1332send_spoe_frame(struct appctx *appctx, char *buf, size_t framesz)
1333{
1334 struct stream_interface *si = appctx->owner;
1335 int ret;
1336 uint32_t netint;
1337
1338 if (si_ic(si)->buf == &buf_empty)
1339 return 1;
1340
1341 netint = htonl(framesz);
1342 memcpy(buf, (char *)&netint, 4);
1343 ret = bi_putblk(si_ic(si), buf, framesz+4);
1344
1345 if (ret <= 0) {
1346 if (ret == -1)
1347 return 1; /* retry */
Christopher Fauletb067b062017-01-04 16:39:11 +01001348 spoe_status_code = SPOE_FRM_ERR_IO;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001349 return -1; /* error */
1350 }
1351 return framesz;
1352}
1353
1354/* Receive a SPOE frame from an agent. It return -1 when an error occurred, 0
1355 * when the frame can be ignored, 1 to retry later and the frame length on
1356 * success. */
1357static int
1358recv_spoe_frame(struct appctx *appctx, char *buf, size_t framesz)
1359{
1360 struct stream_interface *si = appctx->owner;
1361 int ret;
1362 uint32_t netint;
1363
1364 if (si_oc(si)->buf == &buf_empty)
1365 return 1;
1366
1367 ret = bo_getblk(si_oc(si), (char *)&netint, 4, 0);
1368 if (ret > 0) {
1369 framesz = ntohl(netint);
Christopher Faulet42bfa462017-01-04 14:14:19 +01001370 if (framesz > SPOE_APPCTX(appctx)->max_frame_size) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01001371 spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
1372 return -1;
1373 }
1374 ret = bo_getblk(si_oc(si), trash.str, framesz, 4);
1375 }
1376 if (ret <= 0) {
1377 if (ret == 0)
1378 return 1; /* retry */
1379 spoe_status_code = SPOE_FRM_ERR_IO;
1380 return -1; /* error */
1381 }
1382 return framesz;
1383}
1384
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001385/********************************************************************
1386 * Functions that manage the SPOE applet
1387 ********************************************************************/
Christopher Faulet4596fb72017-01-11 14:05:19 +01001388static int
1389wakeup_spoe_appctx(struct appctx *appctx)
1390{
1391 si_applet_want_get(appctx->owner);
1392 si_applet_want_put(appctx->owner);
1393 appctx_wakeup(appctx);
1394 return 1;
1395}
1396
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001397/* Callback function that catches applet timeouts. If a timeout occurred, we set
1398 * <appctx->st1> flag and the SPOE applet is woken up. */
1399static struct task *
1400process_spoe_applet(struct task * task)
1401{
1402 struct appctx *appctx = task->context;
1403
1404 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1405 if (tick_is_expired(task->expire, now_ms)) {
1406 task->expire = TICK_ETERNITY;
1407 appctx->st1 = SPOE_APPCTX_ERR_TOUT;
1408 }
Christopher Faulet4596fb72017-01-11 14:05:19 +01001409 wakeup_spoe_appctx(appctx);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001410 return task;
1411}
1412
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001413/* Callback function that releases a SPOE applet. This happens when the
1414 * connection with the agent is closed. */
1415static void
1416release_spoe_applet(struct appctx *appctx)
1417{
1418 struct stream_interface *si = appctx->owner;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001419 struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001420 struct spoe_context *ctx, *back;
1421
1422 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
1423 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1424 __FUNCTION__, appctx);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001425
Christopher Fauleta1cda022016-12-21 08:58:06 +01001426 agent->applets_act--;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001427 if (!LIST_ISEMPTY(&SPOE_APPCTX(appctx)->list)) {
1428 LIST_DEL(&SPOE_APPCTX(appctx)->list);
1429 LIST_INIT(&SPOE_APPCTX(appctx)->list);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001430 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001431
1432 if (appctx->st0 != SPOE_APPCTX_ST_END) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01001433 if (appctx->st0 == SPOE_APPCTX_ST_IDLE)
1434 agent->applets_idle--;
1435
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001436 si_shutw(si);
1437 si_shutr(si);
1438 si_ic(si)->flags |= CF_READ_NULL;
1439 appctx->st0 = SPOE_APPCTX_ST_END;
Christopher Fauletb067b062017-01-04 16:39:11 +01001440 if (SPOE_APPCTX(appctx)->status_code == SPOE_FRM_ERR_NONE)
1441 SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_IO;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001442 }
1443
Christopher Faulet42bfa462017-01-04 14:14:19 +01001444 if (SPOE_APPCTX(appctx)->task) {
1445 task_delete(SPOE_APPCTX(appctx)->task);
1446 task_free(SPOE_APPCTX(appctx)->task);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001447 }
1448
Christopher Faulet42bfa462017-01-04 14:14:19 +01001449 list_for_each_entry_safe(ctx, back, &SPOE_APPCTX(appctx)->waiting_queue, list) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01001450 LIST_DEL(&ctx->list);
1451 LIST_INIT(&ctx->list);
1452 ctx->state = SPOE_CTX_ST_ERROR;
Christopher Fauletb067b062017-01-04 16:39:11 +01001453 ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001454 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001455 }
1456
Christopher Faulet4596fb72017-01-11 14:05:19 +01001457 release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
Christopher Faulet42bfa462017-01-04 14:14:19 +01001458 pool_free2(pool2_spoe_appctx, SPOE_APPCTX(appctx));
1459
Christopher Fauleta1cda022016-12-21 08:58:06 +01001460 if (!LIST_ISEMPTY(&agent->applets))
1461 return;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001462
Christopher Fauleta1cda022016-12-21 08:58:06 +01001463 list_for_each_entry_safe(ctx, back, &agent->sending_queue, list) {
1464 LIST_DEL(&ctx->list);
1465 LIST_INIT(&ctx->list);
1466 ctx->state = SPOE_CTX_ST_ERROR;
Christopher Fauletb067b062017-01-04 16:39:11 +01001467 ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001468 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001469 }
1470
Christopher Fauleta1cda022016-12-21 08:58:06 +01001471 list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
1472 LIST_DEL(&ctx->list);
1473 LIST_INIT(&ctx->list);
1474 ctx->state = SPOE_CTX_ST_ERROR;
Christopher Fauletb067b062017-01-04 16:39:11 +01001475 ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001476 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1477 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001478}
1479
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001480static int
Christopher Fauleta1cda022016-12-21 08:58:06 +01001481handle_connect_spoe_applet(struct appctx *appctx)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001482{
Christopher Fauleta1cda022016-12-21 08:58:06 +01001483 struct stream_interface *si = appctx->owner;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001484 struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001485 char *frame = trash.str;
1486 int ret;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001487
Christopher Fauleta1cda022016-12-21 08:58:06 +01001488 if (si->state <= SI_ST_CON) {
1489 si_applet_want_put(si);
1490 task_wakeup(si_strm(si)->task, TASK_WOKEN_MSG);
1491 goto stop;
1492 }
Christopher Fauletb067b062017-01-04 16:39:11 +01001493 if (si->state != SI_ST_EST) {
1494 spoe_status_code = SPOE_FRM_ERR_IO;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001495 goto exit;
Christopher Fauletb067b062017-01-04 16:39:11 +01001496 }
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001497
Christopher Fauleta1cda022016-12-21 08:58:06 +01001498 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1499 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n",
1500 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx);
Christopher Fauletb067b062017-01-04 16:39:11 +01001501 spoe_status_code = SPOE_FRM_ERR_TOUT;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001502 goto exit;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001503 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001504
Christopher Faulet42bfa462017-01-04 14:14:19 +01001505 if (SPOE_APPCTX(appctx)->task->expire == TICK_ETERNITY)
1506 SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001507
Christopher Faulet42bfa462017-01-04 14:14:19 +01001508 ret = prepare_spoe_hahello_frame(appctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001509 if (ret > 1)
1510 ret = send_spoe_frame(appctx, frame, ret);
1511
1512 switch (ret) {
1513 case -1: /* error */
1514 goto exit;
1515
1516 case 0: /* ignore => an error, cannot be ignored */
1517 goto exit;
1518
1519 case 1: /* retry later */
1520 si_applet_cant_put(si);
1521 goto stop;
1522
1523 default: /* CONNECT frame successfully sent */
1524 appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
1525 goto next;
1526 }
1527
1528 next:
1529 return 0;
1530 stop:
1531 return 1;
1532 exit:
Christopher Fauletb067b062017-01-04 16:39:11 +01001533 SPOE_APPCTX(appctx)->status_code = spoe_status_code;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001534 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1535 return 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001536}
1537
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001538static int
Christopher Fauleta1cda022016-12-21 08:58:06 +01001539handle_connecting_spoe_applet(struct appctx *appctx)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001540{
Christopher Fauleta1cda022016-12-21 08:58:06 +01001541 struct stream_interface *si = appctx->owner;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001542 struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001543 char *frame = trash.str;
1544 int ret, framesz = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001545
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001546
Christopher Fauletb067b062017-01-04 16:39:11 +01001547 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1548 spoe_status_code = SPOE_FRM_ERR_IO;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001549 goto exit;
Christopher Fauletb067b062017-01-04 16:39:11 +01001550 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001551
Christopher Fauleta1cda022016-12-21 08:58:06 +01001552 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1553 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p - Connection timed out\n",
1554 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, appctx);
Christopher Fauletb067b062017-01-04 16:39:11 +01001555 spoe_status_code = SPOE_FRM_ERR_TOUT;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001556 goto exit;
1557 }
1558
Christopher Faulet42bfa462017-01-04 14:14:19 +01001559 ret = recv_spoe_frame(appctx, frame, SPOE_APPCTX(appctx)->max_frame_size);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001560 if (ret > 1) {
1561 if (*frame == SPOE_FRM_T_AGENT_DISCON) {
1562 appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
1563 goto next;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001564 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01001565 framesz = ret;
1566 ret = handle_spoe_agenthello_frame(appctx, frame, framesz);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001567 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001568
Christopher Fauleta1cda022016-12-21 08:58:06 +01001569 switch (ret) {
1570 case -1: /* error */
1571 if (framesz)
1572 bo_skip(si_oc(si), framesz+4);
1573 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1574 goto next;
1575
1576 case 0: /* ignore */
1577 if (framesz)
1578 bo_skip(si_oc(si), framesz+4);
1579 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1580 goto next;
1581
1582 case 1: /* retry later */
1583 goto stop;
1584
1585 default:
1586 /* hello handshake is finished, set the idle timeout,
1587 * Add the appctx in the agent cache, decrease the
1588 * number of new applets and wake up waiting streams. */
1589 if (framesz)
1590 bo_skip(si_oc(si), framesz+4);
1591 agent->applets_idle++;
1592 appctx->st0 = SPOE_APPCTX_ST_IDLE;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001593 LIST_DEL(&SPOE_APPCTX(appctx)->list);
1594 LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001595 goto next;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001596 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001597
Christopher Fauleta1cda022016-12-21 08:58:06 +01001598 next:
Christopher Faulet42bfa462017-01-04 14:14:19 +01001599 SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001600 return 0;
1601 stop:
1602 return 1;
1603 exit:
Christopher Fauletb067b062017-01-04 16:39:11 +01001604 SPOE_APPCTX(appctx)->status_code = spoe_status_code;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001605 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1606 return 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001607}
1608
Christopher Fauleta1cda022016-12-21 08:58:06 +01001609static int
1610handle_processing_spoe_applet(struct appctx *appctx)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001611{
1612 struct stream_interface *si = appctx->owner;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001613 struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001614 struct spoe_context *ctx;
1615 char *frame = trash.str;
1616 unsigned int fpa = 0;
1617 int ret, framesz = 0, skip_sending = 0, skip_receiving = 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001618
Christopher Fauletb067b062017-01-04 16:39:11 +01001619 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1620 spoe_status_code = SPOE_FRM_ERR_IO;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001621 goto exit;
Christopher Fauletb067b062017-01-04 16:39:11 +01001622 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001623
Christopher Fauleta1cda022016-12-21 08:58:06 +01001624 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1625 spoe_status_code = SPOE_FRM_ERR_TOUT;
1626 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1627 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1628 goto next;
1629 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001630
Christopher Fauleta1cda022016-12-21 08:58:06 +01001631 process:
1632 if (fpa > agent->max_fpa || (skip_sending && skip_receiving))
1633 goto stop;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001634
Christopher Fauleta1cda022016-12-21 08:58:06 +01001635 /* Frames must be handled synchronously and a the applet is waiting for
1636 * a ACK frame */
Christopher Faulet42bfa462017-01-04 14:14:19 +01001637 if (!(SPOE_APPCTX(appctx)->flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) &&
1638 !LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01001639 if (skip_receiving)
1640 goto stop;
1641 goto recv_frame;
1642 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001643
Christopher Fauleta1cda022016-12-21 08:58:06 +01001644 if (LIST_ISEMPTY(&agent->sending_queue) || skip_sending) {
1645 skip_sending = 1;
1646 goto recv_frame;
1647 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001648
Christopher Fauleta1cda022016-12-21 08:58:06 +01001649 ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
Christopher Faulet4596fb72017-01-11 14:05:19 +01001650
1651 /* Transfer the buffer ownership to the SPOE appctx */
1652 SPOE_APPCTX(appctx)->buffer = ctx->buffer;
1653 ctx->buffer = &buf_empty;
1654
Christopher Faulet42bfa462017-01-04 14:14:19 +01001655 ret = prepare_spoe_hanotify_frame(appctx, ctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001656 if (ret > 1)
1657 ret = send_spoe_frame(appctx, frame, ret);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001658
Christopher Fauleta1cda022016-12-21 08:58:06 +01001659 switch (ret) {
1660 case -1: /* error */
1661 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1662 goto next;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001663
Christopher Fauleta1cda022016-12-21 08:58:06 +01001664 case 0: /* ignore */
1665 agent->sending_rate++;
1666 ctx->state = SPOE_CTX_ST_ERROR;
Christopher Fauletb067b062017-01-04 16:39:11 +01001667 ctx->status_code = (spoe_status_code + 0x100);
Christopher Faulet4596fb72017-01-11 14:05:19 +01001668 release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001669 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1670 LIST_DEL(&ctx->list);
1671 LIST_INIT(&ctx->list);
1672 fpa++;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001673 break;
1674
Christopher Fauleta1cda022016-12-21 08:58:06 +01001675 case 1: /* retry */
1676 si_applet_cant_put(si);
1677 skip_sending = 1;
1678 break;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001679
Christopher Fauleta1cda022016-12-21 08:58:06 +01001680 default:
1681 agent->sending_rate++;
1682 ctx->state = SPOE_CTX_ST_WAITING_ACK;
Christopher Faulet4596fb72017-01-11 14:05:19 +01001683 release_spoe_buffer(&SPOE_APPCTX(appctx)->buffer, &SPOE_APPCTX(appctx)->buffer_wait);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001684 LIST_DEL(&ctx->list);
1685 LIST_INIT(&ctx->list);
Christopher Faulet42bfa462017-01-04 14:14:19 +01001686 if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC)
Christopher Fauleta1cda022016-12-21 08:58:06 +01001687 LIST_ADDQ(&agent->waiting_queue, &ctx->list);
1688 else
Christopher Faulet42bfa462017-01-04 14:14:19 +01001689 LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001690 fpa++;
1691 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001692
Christopher Fauleta1cda022016-12-21 08:58:06 +01001693 if (fpa > agent->max_fpa)
1694 goto stop;
1695
1696 recv_frame:
1697 if (skip_receiving)
1698 goto process;
1699
1700 framesz = 0;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001701 ret = recv_spoe_frame(appctx, frame, SPOE_APPCTX(appctx)->max_frame_size);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001702 if (ret > 1) {
1703 if (*frame == SPOE_FRM_T_AGENT_DISCON) {
1704 appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
1705 goto next;
1706 }
1707 framesz = ret;
1708 ret = handle_spoe_agentack_frame(appctx, frame, framesz);
1709 }
1710
1711 switch (ret) {
1712 case -1: /* error */
1713 if (framesz)
1714 bo_skip(si_oc(si), framesz+4);
1715 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1716 goto next;
1717
1718 case 0: /* ignore */
1719 if (framesz)
1720 bo_skip(si_oc(si), framesz+4);
1721 fpa++;
1722 break;
1723
1724 case 1: /* retry */
1725 skip_receiving = 1;
1726 break;
1727
1728 default:
1729 if (framesz)
1730 bo_skip(si_oc(si), framesz+4);
1731 fpa++;
1732 }
1733 goto process;
1734
1735 next:
Christopher Faulet42bfa462017-01-04 14:14:19 +01001736 SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001737 return 0;
1738 stop:
Christopher Faulet42bfa462017-01-04 14:14:19 +01001739 if ((SPOE_APPCTX(appctx)->flags & (SPOE_APPCTX_FL_ASYNC|SPOE_APPCTX_FL_PIPELINING)) ||
1740 LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01001741 agent->applets_idle++;
1742 appctx->st0 = SPOE_APPCTX_ST_IDLE;
1743 }
Christopher Faulet42bfa462017-01-04 14:14:19 +01001744 if (fpa || (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PERSIST)) {
1745 LIST_DEL(&SPOE_APPCTX(appctx)->list);
1746 LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001747 if (fpa)
Christopher Faulet42bfa462017-01-04 14:14:19 +01001748 SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001749 }
1750 return 1;
1751
1752 exit:
Christopher Fauletb067b062017-01-04 16:39:11 +01001753 SPOE_APPCTX(appctx)->status_code = spoe_status_code;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001754 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1755 return 0;
1756}
1757
1758static int
1759handle_disconnect_spoe_applet(struct appctx *appctx)
1760{
1761 struct stream_interface *si = appctx->owner;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001762 struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001763 char *frame = trash.str;
1764 int ret;
1765
Christopher Fauletb067b062017-01-04 16:39:11 +01001766 SPOE_APPCTX(appctx)->status_code = spoe_status_code;
1767
Christopher Fauleta1cda022016-12-21 08:58:06 +01001768 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO)
1769 goto exit;
1770
1771 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT)
1772 goto exit;
1773
Christopher Faulet42bfa462017-01-04 14:14:19 +01001774 ret = prepare_spoe_hadiscon_frame(appctx, frame+4, SPOE_APPCTX(appctx)->max_frame_size);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001775 if (ret > 1)
1776 ret = send_spoe_frame(appctx, frame, ret);
1777
1778 switch (ret) {
1779 case -1: /* error */
1780 goto exit;
1781
1782 case 0: /* ignore */
1783 goto exit;
1784
1785 case 1: /* retry */
1786 si_applet_cant_put(si);
1787 goto stop;
1788
1789 default:
1790 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1791 " - disconnected by HAProxy (%d): %s\n",
1792 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1793 __FUNCTION__, appctx, spoe_status_code,
1794 spoe_frm_err_reasons[spoe_status_code]);
1795
1796 appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
1797 goto next;
1798 }
1799
1800 next:
Christopher Faulet42bfa462017-01-04 14:14:19 +01001801 SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001802 return 0;
1803 stop:
1804 return 1;
1805 exit:
1806 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1807 return 0;
1808}
1809
1810static int
1811handle_disconnecting_spoe_applet(struct appctx *appctx)
1812{
1813 struct stream_interface *si = appctx->owner;
1814 char *frame = trash.str;
1815 int ret, framesz = 0;
1816
Christopher Fauletb067b062017-01-04 16:39:11 +01001817 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1818 spoe_status_code = SPOE_FRM_ERR_IO;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001819 goto exit;
Christopher Fauletb067b062017-01-04 16:39:11 +01001820 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01001821
Christopher Fauletb067b062017-01-04 16:39:11 +01001822 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1823 spoe_status_code = SPOE_FRM_ERR_TOUT;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001824 goto exit;
Christopher Fauletb067b062017-01-04 16:39:11 +01001825 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01001826
1827 framesz = 0;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001828 ret = recv_spoe_frame(appctx, frame, SPOE_APPCTX(appctx)->max_frame_size);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001829 if (ret > 1) {
1830 framesz = ret;
1831 ret = handle_spoe_agentdiscon_frame(appctx, frame, framesz);
1832 }
1833
1834 switch (ret) {
1835 case -1: /* error */
1836 if (framesz)
1837 bo_skip(si_oc(si), framesz+4);
1838 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1839 " - error on frame (%s)\n",
1840 (int)now.tv_sec, (int)now.tv_usec,
Christopher Faulet42bfa462017-01-04 14:14:19 +01001841 ((struct spoe_agent *)SPOE_APPCTX(appctx)->agent)->id,
Christopher Fauleta1cda022016-12-21 08:58:06 +01001842 __FUNCTION__, appctx,
1843 spoe_frm_err_reasons[spoe_status_code]);
1844 goto exit;
1845
1846 case 0: /* ignore */
1847 if (framesz)
1848 bo_skip(si_oc(si), framesz+4);
1849 goto next;
1850
1851 case 1: /* retry */
1852 goto stop;
1853
1854 default:
1855 if (framesz)
1856 bo_skip(si_oc(si), framesz+4);
1857 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1858 " - disconnected by peer (%d): %s\n",
1859 (int)now.tv_sec, (int)now.tv_usec,
Christopher Faulet42bfa462017-01-04 14:14:19 +01001860 ((struct spoe_agent *)SPOE_APPCTX(appctx)->agent)->id,
Christopher Fauleta1cda022016-12-21 08:58:06 +01001861 __FUNCTION__, appctx, spoe_status_code,
1862 spoe_reason);
1863 goto exit;
1864 }
1865
1866 next:
1867 return 0;
1868 stop:
1869 return 1;
1870 exit:
Christopher Fauletb067b062017-01-04 16:39:11 +01001871 if (SPOE_APPCTX(appctx)->status_code == SPOE_FRM_ERR_NONE)
1872 SPOE_APPCTX(appctx)->status_code = spoe_status_code;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001873 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1874 return 0;
1875}
1876
1877/* I/O Handler processing messages exchanged with the agent */
1878static void
1879handle_spoe_applet(struct appctx *appctx)
1880{
1881 struct stream_interface *si = appctx->owner;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001882 struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001883
Christopher Fauletb067b062017-01-04 16:39:11 +01001884 spoe_status_code = SPOE_FRM_ERR_NONE;
1885
Christopher Fauleta1cda022016-12-21 08:58:06 +01001886 switchstate:
1887 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1888 " - appctx-state=%s\n",
1889 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1890 __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);
1891
1892 switch (appctx->st0) {
1893 case SPOE_APPCTX_ST_CONNECT:
Christopher Fauleta1cda022016-12-21 08:58:06 +01001894 if (handle_connect_spoe_applet(appctx))
1895 goto out;
1896 goto switchstate;
1897
1898 case SPOE_APPCTX_ST_CONNECTING:
1899 if (handle_connecting_spoe_applet(appctx))
1900 goto out;
1901 goto switchstate;
1902
1903 case SPOE_APPCTX_ST_IDLE:
1904 if (stopping &&
1905 LIST_ISEMPTY(&agent->sending_queue) &&
Christopher Faulet42bfa462017-01-04 14:14:19 +01001906 LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
1907 SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
Christopher Fauleta1cda022016-12-21 08:58:06 +01001908 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001909 goto switchstate;
1910 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01001911 agent->applets_idle--;
1912 appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
1913 /* fall through */
1914
1915 case SPOE_APPCTX_ST_PROCESSING:
1916 if (handle_processing_spoe_applet(appctx))
1917 goto out;
1918 goto switchstate;
1919
1920 case SPOE_APPCTX_ST_DISCONNECT:
1921 if (handle_disconnect_spoe_applet(appctx))
1922 goto out;
1923 goto switchstate;
1924
1925 case SPOE_APPCTX_ST_DISCONNECTING:
1926 if (handle_disconnecting_spoe_applet(appctx))
1927 goto out;
1928 goto switchstate;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001929
1930 case SPOE_APPCTX_ST_EXIT:
1931 si_shutw(si);
1932 si_shutr(si);
1933 si_ic(si)->flags |= CF_READ_NULL;
1934 appctx->st0 = SPOE_APPCTX_ST_END;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001935 SPOE_APPCTX(appctx)->task->expire = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001936 /* fall through */
1937
1938 case SPOE_APPCTX_ST_END:
Christopher Fauleta73e59b2016-12-09 17:30:18 +01001939 return;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001940 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01001941 out:
Christopher Faulet42bfa462017-01-04 14:14:19 +01001942 if (SPOE_APPCTX(appctx)->task->expire != TICK_ETERNITY)
1943 task_queue(SPOE_APPCTX(appctx)->task);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001944 si_oc(si)->flags |= CF_READ_DONTWAIT;
1945 task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001946}
1947
1948struct applet spoe_applet = {
1949 .obj_type = OBJ_TYPE_APPLET,
1950 .name = "<SPOE>", /* used for logging */
1951 .fct = handle_spoe_applet,
1952 .release = release_spoe_applet,
1953};
1954
1955/* Create a SPOE applet. On success, the created applet is returned, else
1956 * NULL. */
1957static struct appctx *
1958create_spoe_appctx(struct spoe_config *conf)
1959{
1960 struct appctx *appctx;
1961 struct session *sess;
1962 struct task *task;
1963 struct stream *strm;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001964
1965 if ((appctx = appctx_new(&spoe_applet)) == NULL)
1966 goto out_error;
1967
Christopher Faulet42bfa462017-01-04 14:14:19 +01001968 appctx->ctx.spoe.ptr = pool_alloc_dirty(pool2_spoe_appctx);
1969 if (SPOE_APPCTX(appctx) == NULL)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001970 goto out_free_appctx;
Christopher Fauleta1cda022016-12-21 08:58:06 +01001971
Christopher Faulet42bfa462017-01-04 14:14:19 +01001972 appctx->st0 = SPOE_APPCTX_ST_CONNECT;
1973 if ((SPOE_APPCTX(appctx)->task = task_new()) == NULL)
1974 goto out_free_spoe_appctx;
1975
1976 SPOE_APPCTX(appctx)->owner = appctx;
1977 SPOE_APPCTX(appctx)->task->process = process_spoe_applet;
1978 SPOE_APPCTX(appctx)->task->expire = TICK_ETERNITY;
1979 SPOE_APPCTX(appctx)->task->context = appctx;
1980 SPOE_APPCTX(appctx)->agent = conf->agent;
1981 SPOE_APPCTX(appctx)->version = 0;
1982 SPOE_APPCTX(appctx)->max_frame_size = conf->agent->max_frame_size;
1983 SPOE_APPCTX(appctx)->flags = 0;
Christopher Fauletb067b062017-01-04 16:39:11 +01001984 SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NONE;
Christopher Faulet4596fb72017-01-11 14:05:19 +01001985 SPOE_APPCTX(appctx)->buffer = &buf_empty;
1986
1987 LIST_INIT(&SPOE_APPCTX(appctx)->buffer_wait.list);
1988 SPOE_APPCTX(appctx)->buffer_wait.target = appctx;
1989 SPOE_APPCTX(appctx)->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_appctx;
Christopher Faulet42bfa462017-01-04 14:14:19 +01001990
1991 LIST_INIT(&SPOE_APPCTX(appctx)->list);
1992 LIST_INIT(&SPOE_APPCTX(appctx)->waiting_queue);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001993
Willy Tarreau5820a362016-12-22 15:59:02 +01001994 sess = session_new(&conf->agent_fe, NULL, &appctx->obj_type);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001995 if (!sess)
1996 goto out_free_spoe;
1997
1998 if ((task = task_new()) == NULL)
1999 goto out_free_sess;
2000
2001 if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
2002 goto out_free_task;
2003
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002004 stream_set_backend(strm, conf->agent->b.be);
2005
2006 /* applet is waiting for data */
2007 si_applet_cant_get(&strm->si[0]);
2008 appctx_wakeup(appctx);
2009
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002010 strm->do_log = NULL;
2011 strm->res.flags |= CF_READ_DONTWAIT;
2012
2013 conf->agent_fe.feconn++;
2014 jobs++;
2015 totalconn++;
2016
Christopher Faulet42bfa462017-01-04 14:14:19 +01002017 task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
2018 LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002019 conf->agent->applets_act++;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002020 return appctx;
2021
2022 /* Error unrolling */
2023 out_free_task:
2024 task_free(task);
2025 out_free_sess:
2026 session_free(sess);
2027 out_free_spoe:
Christopher Faulet42bfa462017-01-04 14:14:19 +01002028 task_free(SPOE_APPCTX(appctx)->task);
2029 out_free_spoe_appctx:
2030 pool_free2(pool2_spoe_appctx, SPOE_APPCTX(appctx));
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002031 out_free_appctx:
2032 appctx_free(appctx);
2033 out_error:
2034 return NULL;
2035}
2036
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002037static int
Christopher Fauleta1cda022016-12-21 08:58:06 +01002038queue_spoe_context(struct spoe_context *ctx)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002039{
2040 struct spoe_config *conf = FLT_CONF(ctx->filter);
2041 struct spoe_agent *agent = conf->agent;
2042 struct appctx *appctx;
Christopher Faulet42bfa462017-01-04 14:14:19 +01002043 struct spoe_appctx *spoe_appctx;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002044 unsigned int min_applets;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002045
Christopher Fauleta1cda022016-12-21 08:58:06 +01002046 min_applets = min_applets_act(agent);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002047
Christopher Fauleta1cda022016-12-21 08:58:06 +01002048 /* Check if we need to create a new SPOE applet or not. */
2049 if (agent->applets_act >= min_applets && agent->applets_idle && agent->sending_rate)
2050 goto end;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002051
2052 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
Christopher Fauleta1cda022016-12-21 08:58:06 +01002053 " - try to create new SPOE appctx\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002054 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
2055 ctx->strm);
2056
Christopher Fauleta1cda022016-12-21 08:58:06 +01002057 /* Do not try to create a new applet if there is no server up for the
2058 * agent's backend. */
2059 if (!agent->b.be->srv_act && !agent->b.be->srv_bck) {
2060 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2061 " - cannot create SPOE appctx: no server up\n",
2062 (int)now.tv_sec, (int)now.tv_usec, agent->id,
2063 __FUNCTION__, ctx->strm);
2064 goto end;
2065 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002066
Christopher Fauleta1cda022016-12-21 08:58:06 +01002067 /* Do not try to create a new applet if we have reached the maximum of
2068 * connection per seconds */
Christopher Faulet48026722016-11-16 15:01:12 +01002069 if (agent->cps_max > 0) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01002070 if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0)) {
2071 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2072 " - cannot create SPOE appctx: max CPS reached\n",
2073 (int)now.tv_sec, (int)now.tv_usec, agent->id,
2074 __FUNCTION__, ctx->strm);
2075 goto end;
2076 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002077 }
2078
Christopher Fauleta1cda022016-12-21 08:58:06 +01002079 appctx = create_spoe_appctx(conf);
2080 if (appctx == NULL) {
2081 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2082 " - failed to create SPOE appctx\n",
2083 (int)now.tv_sec, (int)now.tv_usec, agent->id,
2084 __FUNCTION__, ctx->strm);
Christopher Faulet72bcc472017-01-04 16:39:41 +01002085 send_log(ctx->strm->be, LOG_EMERG,
2086 "SPOE: [%s] failed to create SPOE applet\n",
2087 agent->id);
2088
Christopher Fauleta1cda022016-12-21 08:58:06 +01002089 goto end;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002090 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01002091 if (agent->applets_act <= min_applets)
Christopher Faulet42bfa462017-01-04 14:14:19 +01002092 SPOE_APPCTX(appctx)->flags |= SPOE_APPCTX_FL_PERSIST;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002093
Christopher Fauleta1cda022016-12-21 08:58:06 +01002094 /* Increase the per-process number of cumulated connections */
2095 if (agent->cps_max > 0)
2096 update_freq_ctr(&agent->conn_per_sec, 1);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002097
Christopher Fauleta1cda022016-12-21 08:58:06 +01002098 end:
2099 /* The only reason to return an error is when there is no applet */
2100 if (LIST_ISEMPTY(&agent->applets))
2101 return 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002102
Christopher Fauleta1cda022016-12-21 08:58:06 +01002103 /* Add the SPOE context in the sending queue and update all running
2104 * info */
2105 LIST_ADDQ(&agent->sending_queue, &ctx->list);
2106 if (agent->sending_rate)
2107 agent->sending_rate--;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002108
2109 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
Christopher Fauleta1cda022016-12-21 08:58:06 +01002110 " - Add stream in sending queue - applets_act=%u - applets_idle=%u"
2111 " - sending_rate=%u\n",
2112 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
2113 ctx->strm, agent->applets_act, agent->applets_idle, agent->sending_rate);
Christopher Fauletf7a30922016-11-10 15:04:51 +01002114
Christopher Fauleta1cda022016-12-21 08:58:06 +01002115 /* Finally try to wakeup the first IDLE applet found and move it at the
2116 * end of the list. */
Christopher Faulet42bfa462017-01-04 14:14:19 +01002117 list_for_each_entry(spoe_appctx, &agent->applets, list) {
2118 appctx = spoe_appctx->owner;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002119 if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
Christopher Faulet4596fb72017-01-11 14:05:19 +01002120 wakeup_spoe_appctx(appctx);
Christopher Faulet42bfa462017-01-04 14:14:19 +01002121 LIST_DEL(&spoe_appctx->list);
2122 LIST_ADDQ(&agent->applets, &spoe_appctx->list);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002123 break;
2124 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002125 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01002126 return 1;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002127}
2128
2129/***************************************************************************
2130 * Functions that process SPOE messages and actions
2131 **************************************************************************/
2132/* Process SPOE messages for a specific event. During the processing, it returns
2133 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
2134 * is returned. */
2135static int
2136process_spoe_messages(struct stream *s, struct spoe_context *ctx,
2137 struct list *messages, int dir)
2138{
Christopher Fauleta1cda022016-12-21 08:58:06 +01002139 struct spoe_config *conf = FLT_CONF(ctx->filter);
2140 struct spoe_agent *agent = conf->agent;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002141 struct spoe_message *msg;
2142 struct sample *smp;
2143 struct spoe_arg *arg;
2144 char *p;
2145 size_t max_size;
2146 int off, flag, idx = 0;
2147
2148 /* Reserve 32 bytes from the frame Metadata */
Christopher Fauleta1cda022016-12-21 08:58:06 +01002149 max_size = agent->max_frame_size - 32;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002150
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002151 p = ctx->buffer->p;
2152
2153 /* Loop on messages */
2154 list_for_each_entry(msg, messages, list) {
2155 if (idx + msg->id_len + 1 > max_size)
2156 goto skip;
2157
2158 /* Set the message name */
2159 idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
2160
2161 /* Save offset where to store the number of arguments for this
2162 * message */
2163 off = idx++;
2164 p[off] = 0;
2165
2166 /* Loop on arguments */
2167 list_for_each_entry(arg, &msg->args, list) {
2168 p[off]++; /* Increment the number of arguments */
2169
2170 if (idx + arg->name_len + 1 > max_size)
2171 goto skip;
2172
2173 /* Encode the arguement name as a string. It can by NULL */
2174 idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
2175
2176 /* Fetch the arguement value */
2177 smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
2178 if (!smp) {
2179 /* If no value is available, set it to NULL */
2180 p[idx++] = SPOE_DATA_T_NULL;
2181 continue;
2182 }
2183
2184 /* Else, encode the arguement value */
2185 switch (smp->data.type) {
2186 case SMP_T_BOOL:
2187 flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
2188 p[idx++] = (SPOE_DATA_T_BOOL | flag);
2189 break;
2190 case SMP_T_SINT:
2191 p[idx++] = SPOE_DATA_T_INT64;
2192 if (idx + 8 > max_size)
2193 goto skip;
2194 idx += encode_spoe_varint(smp->data.u.sint, p+idx);
2195 break;
2196 case SMP_T_IPV4:
2197 p[idx++] = SPOE_DATA_T_IPV4;
2198 if (idx + 4 > max_size)
2199 goto skip;
2200 memcpy(p+idx, &smp->data.u.ipv4, 4);
2201 idx += 4;
2202 break;
2203 case SMP_T_IPV6:
2204 p[idx++] = SPOE_DATA_T_IPV6;
2205 if (idx + 16 > max_size)
2206 goto skip;
2207 memcpy(p+idx, &smp->data.u.ipv6, 16);
2208 idx += 16;
2209 break;
2210 case SMP_T_STR:
2211 p[idx++] = SPOE_DATA_T_STR;
2212 if (idx + smp->data.u.str.len > max_size)
2213 goto skip;
2214 idx += encode_spoe_string(smp->data.u.str.str,
2215 smp->data.u.str.len,
2216 p+idx);
2217 break;
2218 case SMP_T_BIN:
2219 p[idx++] = SPOE_DATA_T_BIN;
2220 if (idx + smp->data.u.str.len > max_size)
2221 goto skip;
2222 idx += encode_spoe_string(smp->data.u.str.str,
2223 smp->data.u.str.len,
2224 p+idx);
2225 break;
2226 case SMP_T_METH:
2227 if (smp->data.u.meth.meth == HTTP_METH_OTHER) {
2228 p[idx++] = SPOE_DATA_T_STR;
2229 if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size)
2230 goto skip;
2231 idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name,
2232 http_known_methods[smp->data.u.meth.meth].len,
2233 p+idx);
2234 }
2235 else {
2236 p[idx++] = SPOE_DATA_T_STR;
2237 if (idx + smp->data.u.str.len > max_size)
2238 goto skip;
2239 idx += encode_spoe_string(smp->data.u.meth.str.str,
2240 smp->data.u.meth.str.len,
2241 p+idx);
2242 }
2243 break;
2244 default:
2245 p[idx++] = SPOE_DATA_T_NULL;
2246 }
2247 }
2248 }
2249 ctx->buffer->i = idx;
2250 return 1;
2251
2252 skip:
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002253 return 0;
2254}
2255
2256/* Helper function to set a variable */
2257static void
2258set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
2259 struct sample *smp)
2260{
2261 struct spoe_config *conf = FLT_CONF(ctx->filter);
2262 struct spoe_agent *agent = conf->agent;
2263 char varname[64];
2264
2265 memset(varname, 0, sizeof(varname));
2266 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
2267 scope, agent->var_pfx, len, name);
2268 vars_set_by_name_ifexist(varname, len, smp);
2269}
2270
2271/* Helper function to unset a variable */
2272static void
2273unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
2274 struct sample *smp)
2275{
2276 struct spoe_config *conf = FLT_CONF(ctx->filter);
2277 struct spoe_agent *agent = conf->agent;
2278 char varname[64];
2279
2280 memset(varname, 0, sizeof(varname));
2281 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
2282 scope, agent->var_pfx, len, name);
2283 vars_unset_by_name_ifexist(varname, len, smp);
2284}
2285
2286
2287/* Process SPOE actions for a specific event. During the processing, it returns
2288 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
2289 * is returned. */
2290static int
2291process_spoe_actions(struct stream *s, struct spoe_context *ctx,
2292 enum spoe_event ev, int dir)
2293{
2294 char *p;
2295 size_t size;
2296 int off, i, idx = 0;
2297
2298 p = ctx->buffer->p;
2299 size = ctx->buffer->i;
2300
2301 while (idx < size) {
2302 char *str;
2303 uint64_t sz;
2304 struct sample smp;
2305 enum spoe_action_type type;
2306
2307 off = idx;
2308 if (idx+2 > size)
2309 goto skip;
2310
2311 type = p[idx++];
2312 switch (type) {
2313 case SPOE_ACT_T_SET_VAR: {
2314 char *scope;
2315
2316 if (p[idx++] != 3)
2317 goto skip_action;
2318
2319 switch (p[idx++]) {
2320 case SPOE_SCOPE_PROC: scope = "proc"; break;
2321 case SPOE_SCOPE_SESS: scope = "sess"; break;
2322 case SPOE_SCOPE_TXN : scope = "txn"; break;
2323 case SPOE_SCOPE_REQ : scope = "req"; break;
2324 case SPOE_SCOPE_RES : scope = "res"; break;
2325 default: goto skip;
2326 }
2327
2328 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2329 if (str == NULL)
2330 goto skip;
2331 memset(&smp, 0, sizeof(smp));
2332 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
Christopher Fauletb5cff602016-11-24 14:53:22 +01002333
2334 if ((i = decode_spoe_data(p+idx, p+size, &smp)) == -1)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002335 goto skip;
Christopher Fauletb5cff602016-11-24 14:53:22 +01002336 idx += i;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002337
2338 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2339 " - set-var '%s.%s.%.*s'\n",
2340 (int)now.tv_sec, (int)now.tv_usec,
2341 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2342 __FUNCTION__, s, scope,
2343 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2344 (int)sz, str);
2345
2346 set_spoe_var(ctx, scope, str, sz, &smp);
2347 break;
2348 }
2349
2350 case SPOE_ACT_T_UNSET_VAR: {
2351 char *scope;
2352
2353 if (p[idx++] != 2)
2354 goto skip_action;
2355
2356 switch (p[idx++]) {
2357 case SPOE_SCOPE_PROC: scope = "proc"; break;
2358 case SPOE_SCOPE_SESS: scope = "sess"; break;
2359 case SPOE_SCOPE_TXN : scope = "txn"; break;
2360 case SPOE_SCOPE_REQ : scope = "req"; break;
2361 case SPOE_SCOPE_RES : scope = "res"; break;
2362 default: goto skip;
2363 }
2364
2365 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2366 if (str == NULL)
2367 goto skip;
2368 memset(&smp, 0, sizeof(smp));
2369 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2370
2371 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2372 " - unset-var '%s.%s.%.*s'\n",
2373 (int)now.tv_sec, (int)now.tv_usec,
2374 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2375 __FUNCTION__, s, scope,
2376 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2377 (int)sz, str);
2378
2379 unset_spoe_var(ctx, scope, str, sz, &smp);
2380 break;
2381 }
2382
2383 default:
2384 skip_action:
2385 if ((i = skip_spoe_action(p+off, p+size)) == -1)
2386 goto skip;
2387 idx += i;
2388 }
2389 }
2390
2391 return 1;
2392 skip:
2393 return 0;
2394}
2395
Christopher Fauleta1cda022016-12-21 08:58:06 +01002396static int
2397start_event_processing(struct spoe_context *ctx, int dir)
2398{
Christopher Fauleta1cda022016-12-21 08:58:06 +01002399 /* If a process is already started for this SPOE context, retry
2400 * later. */
2401 if (ctx->flags & SPOE_CTX_FL_PROCESS)
2402 goto wait;
2403
Christopher Faulet4596fb72017-01-11 14:05:19 +01002404 if (!acquire_spoe_buffer(&ctx->buffer, &ctx->buffer_wait))
Christopher Fauletb067b062017-01-04 16:39:11 +01002405 goto wait;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002406
2407 /* Set the right flag to prevent request and response processing
2408 * in same time. */
2409 ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
2410 ? SPOE_CTX_FL_REQ_PROCESS
2411 : SPOE_CTX_FL_RSP_PROCESS);
2412
2413 return 1;
2414
2415 wait:
2416 return 0;
2417}
2418
2419static void
2420stop_event_processing(struct spoe_context *ctx)
2421{
2422 /* Reset the flag to allow next processing */
2423 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2424
Christopher Fauletb067b062017-01-04 16:39:11 +01002425 ctx->status_code = 0;
2426
Christopher Fauleta1cda022016-12-21 08:58:06 +01002427 /* Reset processing timer */
2428 ctx->process_exp = TICK_ETERNITY;
2429
Christopher Faulet4596fb72017-01-11 14:05:19 +01002430 release_spoe_buffer(&ctx->buffer, &ctx->buffer_wait);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002431
2432 if (!LIST_ISEMPTY(&ctx->list)) {
2433 LIST_DEL(&ctx->list);
2434 LIST_INIT(&ctx->list);
2435 }
2436}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002437
2438/* Process a SPOE event. First, this functions will process messages attached to
2439 * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
2440 * ACK frame to process corresponding actions. During all the processing, it
2441 * returns 0 and it returns 1 when the processing is finished. If an error
2442 * occurred, -1 is returned. */
2443static int
2444process_spoe_event(struct stream *s, struct spoe_context *ctx,
2445 enum spoe_event ev)
2446{
Christopher Fauletf7a30922016-11-10 15:04:51 +01002447 struct spoe_config *conf = FLT_CONF(ctx->filter);
2448 struct spoe_agent *agent = conf->agent;
2449 int dir, ret = 1;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002450
2451 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2452 " - ctx-state=%s - event=%s\n",
2453 (int)now.tv_sec, (int)now.tv_usec,
Christopher Fauletf7a30922016-11-10 15:04:51 +01002454 agent->id, __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002455 spoe_event_str[ev]);
2456
Christopher Faulet48026722016-11-16 15:01:12 +01002457
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002458 dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
2459
2460 if (LIST_ISEMPTY(&(ctx->messages[ev])))
2461 goto out;
2462
2463 if (ctx->state == SPOE_CTX_ST_ERROR)
2464 goto error;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002465
2466 if (tick_is_expired(ctx->process_exp, now_ms) && ctx->state != SPOE_CTX_ST_DONE) {
2467 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2468 " - failed to process event '%s': timeout\n",
2469 (int)now.tv_sec, (int)now.tv_usec,
2470 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
Christopher Fauletb067b062017-01-04 16:39:11 +01002471 ctx->status_code = SPOE_CTX_ERR_TOUT;
Christopher Fauletf7a30922016-11-10 15:04:51 +01002472 goto error;
2473 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002474
2475 if (ctx->state == SPOE_CTX_ST_READY) {
Christopher Fauleta1cda022016-12-21 08:58:06 +01002476 if (agent->eps_max > 0) {
2477 if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
2478 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2479 " - skip event '%s': max EPS reached\n",
2480 (int)now.tv_sec, (int)now.tv_usec,
2481 agent->id, __FUNCTION__, s, spoe_event_str[ev]);
2482 goto skip;
2483 }
2484 }
2485
Christopher Fauletf7a30922016-11-10 15:04:51 +01002486 if (!tick_isset(ctx->process_exp)) {
2487 ctx->process_exp = tick_add_ifset(now_ms, agent->timeout.processing);
2488 s->task->expire = tick_first((tick_is_expired(s->task->expire, now_ms) ? 0 : s->task->expire),
2489 ctx->process_exp);
2490 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01002491 ret = start_event_processing(ctx, dir);
Christopher Fauletb067b062017-01-04 16:39:11 +01002492 if (!ret)
2493 goto out;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002494 ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
Christopher Fauletb067b062017-01-04 16:39:11 +01002495 if (!ret)
2496 goto skip;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002497
Christopher Fauletb067b062017-01-04 16:39:11 +01002498 if (!queue_spoe_context(ctx)) {
2499 ctx->status_code = SPOE_CTX_ERR_RES;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002500 goto error;
Christopher Fauletb067b062017-01-04 16:39:11 +01002501 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01002502
2503 ctx->state = SPOE_CTX_ST_SENDING_MSGS;
2504 /* fall through */
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002505 }
2506
Christopher Fauleta1cda022016-12-21 08:58:06 +01002507 if (ctx->state == SPOE_CTX_ST_SENDING_MSGS ||
2508 ctx->state == SPOE_CTX_ST_WAITING_ACK) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002509 ret = 0;
2510 goto out;
2511 }
2512
2513 if (ctx->state == SPOE_CTX_ST_DONE) {
2514 ret = process_spoe_actions(s, ctx, ev, dir);
Christopher Fauletb067b062017-01-04 16:39:11 +01002515 if (!ret)
2516 goto skip;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002517 ctx->frame_id++;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002518 ctx->state = SPOE_CTX_ST_READY;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002519 goto end;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002520 }
2521
2522 out:
2523 return ret;
2524
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002525 error:
Christopher Faulet48026722016-11-16 15:01:12 +01002526 if (agent->eps_max > 0)
2527 update_freq_ctr(&agent->err_per_sec, 1);
2528
Christopher Faulet985532d2016-11-16 15:36:19 +01002529 if (agent->var_on_error) {
2530 struct sample smp;
2531
Christopher Fauleta1cda022016-12-21 08:58:06 +01002532 // FIXME: Get the error code here
Christopher Faulet985532d2016-11-16 15:36:19 +01002533 memset(&smp, 0, sizeof(smp));
2534 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
Christopher Fauletb067b062017-01-04 16:39:11 +01002535 smp.data.u.sint = ctx->status_code;
Christopher Faulet985532d2016-11-16 15:36:19 +01002536 smp.data.type = SMP_T_BOOL;
2537
2538 set_spoe_var(ctx, "txn", agent->var_on_error,
2539 strlen(agent->var_on_error), &smp);
2540 }
Christopher Faulet72bcc472017-01-04 16:39:41 +01002541 send_log(ctx->strm->be, LOG_WARNING,
2542 "SPOE: [%s] failed to process event '%s': code=%u\n",
2543 agent->id, spoe_event_str[ev], ctx->status_code);
Christopher Faulet985532d2016-11-16 15:36:19 +01002544
Christopher Fauletea62c2a2016-11-14 10:54:21 +01002545 ctx->state = ((agent->flags & SPOE_FL_CONT_ON_ERR)
2546 ? SPOE_CTX_ST_READY
Christopher Fauletb067b062017-01-04 16:39:11 +01002547 : SPOE_CTX_ST_NONE);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002548 ret = 1;
2549 goto end;
2550
2551 skip:
2552 ctx->state = SPOE_CTX_ST_READY;
2553 ret = 1;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002554
Christopher Fauleta1cda022016-12-21 08:58:06 +01002555 end:
2556 stop_event_processing(ctx);
2557 return ret;
2558}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002559
2560/***************************************************************************
2561 * Functions that create/destroy SPOE contexts
2562 **************************************************************************/
Christopher Fauleta1cda022016-12-21 08:58:06 +01002563static int
Christopher Faulet4596fb72017-01-11 14:05:19 +01002564acquire_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
Christopher Fauleta1cda022016-12-21 08:58:06 +01002565{
Christopher Faulet4596fb72017-01-11 14:05:19 +01002566 if (*buf != &buf_empty)
Christopher Fauleta1cda022016-12-21 08:58:06 +01002567 return 1;
2568
Christopher Faulet4596fb72017-01-11 14:05:19 +01002569 if (!LIST_ISEMPTY(&buffer_wait->list)) {
2570 LIST_DEL(&buffer_wait->list);
2571 LIST_INIT(&buffer_wait->list);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002572 }
2573
Christopher Faulet4596fb72017-01-11 14:05:19 +01002574 if (b_alloc_margin(buf, global.tune.reserved_bufs))
Christopher Fauleta1cda022016-12-21 08:58:06 +01002575 return 1;
2576
Christopher Faulet4596fb72017-01-11 14:05:19 +01002577 LIST_ADDQ(&buffer_wq, &buffer_wait->list);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002578 return 0;
2579}
2580
2581static void
Christopher Faulet4596fb72017-01-11 14:05:19 +01002582release_spoe_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
Christopher Fauleta1cda022016-12-21 08:58:06 +01002583{
Christopher Faulet4596fb72017-01-11 14:05:19 +01002584 if (!LIST_ISEMPTY(&buffer_wait->list)) {
2585 LIST_DEL(&buffer_wait->list);
2586 LIST_INIT(&buffer_wait->list);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002587 }
2588
2589 /* Release the buffer if needed */
Christopher Faulet4596fb72017-01-11 14:05:19 +01002590 if (*buf != &buf_empty) {
2591 b_free(buf);
2592 offer_buffers(buffer_wait->target,
2593 tasks_run_queue + applets_active_queue);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002594 }
2595}
2596
Christopher Faulet4596fb72017-01-11 14:05:19 +01002597static int
2598wakeup_spoe_context(struct spoe_context *ctx)
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002599{
2600 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
2601 return 1;
2602}
2603
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002604static struct spoe_context *
2605create_spoe_context(struct filter *filter)
2606{
2607 struct spoe_config *conf = FLT_CONF(filter);
2608 struct spoe_context *ctx;
2609
2610 ctx = pool_alloc_dirty(pool2_spoe_ctx);
2611 if (ctx == NULL) {
2612 return NULL;
2613 }
2614 memset(ctx, 0, sizeof(*ctx));
Christopher Fauletb067b062017-01-04 16:39:11 +01002615 ctx->filter = filter;
2616 ctx->state = SPOE_CTX_ST_NONE;
2617 ctx->status_code = SPOE_CTX_ERR_NONE;
2618 ctx->flags = 0;
2619 ctx->messages = conf->agent->messages;
2620 ctx->buffer = &buf_empty;
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002621 LIST_INIT(&ctx->buffer_wait.list);
2622 ctx->buffer_wait.target = ctx;
2623 ctx->buffer_wait.wakeup_cb = (int (*)(void *))wakeup_spoe_context;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002624 LIST_INIT(&ctx->list);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002625
Christopher Fauletf7a30922016-11-10 15:04:51 +01002626 ctx->stream_id = 0;
2627 ctx->frame_id = 1;
2628 ctx->process_exp = TICK_ETERNITY;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002629
2630 return ctx;
2631}
2632
2633static void
2634destroy_spoe_context(struct spoe_context *ctx)
2635{
2636 if (!ctx)
2637 return;
2638
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002639 if (!LIST_ISEMPTY(&ctx->buffer_wait.list))
2640 LIST_DEL(&ctx->buffer_wait.list);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002641 if (!LIST_ISEMPTY(&ctx->list))
2642 LIST_DEL(&ctx->list);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002643 pool_free2(pool2_spoe_ctx, ctx);
2644}
2645
2646static void
2647reset_spoe_context(struct spoe_context *ctx)
2648{
2649 ctx->state = SPOE_CTX_ST_READY;
2650 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2651}
2652
2653
2654/***************************************************************************
2655 * Hooks that manage the filter lifecycle (init/check/deinit)
2656 **************************************************************************/
2657/* Signal handler: Do a soft stop, wakeup SPOE applet */
2658static void
2659sig_stop_spoe(struct sig_handler *sh)
2660{
2661 struct proxy *p;
2662
2663 p = proxy;
2664 while (p) {
2665 struct flt_conf *fconf;
2666
2667 list_for_each_entry(fconf, &p->filter_configs, list) {
Christopher Faulet3b386a32017-02-23 10:17:15 +01002668 struct spoe_config *conf;
2669 struct spoe_agent *agent;
Christopher Faulet42bfa462017-01-04 14:14:19 +01002670 struct spoe_appctx *spoe_appctx;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002671
Christopher Faulet3b386a32017-02-23 10:17:15 +01002672 if (fconf->id != spoe_filter_id)
2673 continue;
2674
2675 conf = fconf->conf;
2676 agent = conf->agent;
2677
Christopher Faulet42bfa462017-01-04 14:14:19 +01002678 list_for_each_entry(spoe_appctx, &agent->applets, list) {
Christopher Faulet4596fb72017-01-11 14:05:19 +01002679 wakeup_spoe_appctx(spoe_appctx->owner);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002680 }
2681 }
2682 p = p->next;
2683 }
2684}
2685
2686
2687/* Initialize the SPOE filter. Returns -1 on error, else 0. */
2688static int
2689spoe_init(struct proxy *px, struct flt_conf *fconf)
2690{
2691 struct spoe_config *conf = fconf->conf;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002692
2693 memset(&conf->agent_fe, 0, sizeof(conf->agent_fe));
2694 init_new_proxy(&conf->agent_fe);
2695 conf->agent_fe.parent = conf->agent;
2696 conf->agent_fe.last_change = now.tv_sec;
2697 conf->agent_fe.id = conf->agent->id;
2698 conf->agent_fe.cap = PR_CAP_FE;
2699 conf->agent_fe.mode = PR_MODE_TCP;
2700 conf->agent_fe.maxconn = 0;
2701 conf->agent_fe.options2 |= PR_O2_INDEPSTR;
2702 conf->agent_fe.conn_retries = CONN_RETRIES;
2703 conf->agent_fe.accept = frontend_accept;
2704 conf->agent_fe.srv = NULL;
2705 conf->agent_fe.timeout.client = TICK_ETERNITY;
2706 conf->agent_fe.default_target = &spoe_applet.obj_type;
2707 conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
2708
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002709 if (!sighandler_registered) {
2710 signal_register_fct(0, sig_stop_spoe, 0);
2711 sighandler_registered = 1;
2712 }
2713
2714 return 0;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002715}
2716
2717/* Free ressources allocated by the SPOE filter. */
2718static void
2719spoe_deinit(struct proxy *px, struct flt_conf *fconf)
2720{
2721 struct spoe_config *conf = fconf->conf;
2722
2723 if (conf) {
2724 struct spoe_agent *agent = conf->agent;
2725 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
2726 struct listener *, by_fe);
2727
2728 free(l);
2729 release_spoe_agent(agent);
2730 free(conf);
2731 }
2732 fconf->conf = NULL;
2733}
2734
2735/* Check configuration of a SPOE filter for a specified proxy.
2736 * Return 1 on error, else 0. */
2737static int
2738spoe_check(struct proxy *px, struct flt_conf *fconf)
2739{
2740 struct spoe_config *conf = fconf->conf;
2741 struct proxy *target;
2742
2743 target = proxy_be_by_name(conf->agent->b.name);
2744 if (target == NULL) {
2745 Alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'"
2746 " declared at %s:%d.\n",
2747 px->id, conf->agent->b.name, conf->agent->id,
2748 conf->agent->conf.file, conf->agent->conf.line);
2749 return 1;
2750 }
2751 if (target->mode != PR_MODE_TCP) {
2752 Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
2753 " at %s:%d does not support HTTP mode.\n",
2754 px->id, target->id, conf->agent->id,
2755 conf->agent->conf.file, conf->agent->conf.line);
2756 return 1;
2757 }
2758
2759 free(conf->agent->b.name);
2760 conf->agent->b.name = NULL;
2761 conf->agent->b.be = target;
2762 return 0;
2763}
2764
2765/**************************************************************************
2766 * Hooks attached to a stream
2767 *************************************************************************/
2768/* Called when a filter instance is created and attach to a stream. It creates
2769 * the context that will be used to process this stream. */
2770static int
2771spoe_start(struct stream *s, struct filter *filter)
2772{
Christopher Faulet72bcc472017-01-04 16:39:41 +01002773 struct spoe_config *conf = FLT_CONF(filter);
2774 struct spoe_agent *agent = conf->agent;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002775 struct spoe_context *ctx;
2776
2777 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
Christopher Faulet72bcc472017-01-04 16:39:41 +01002778 (int)now.tv_sec, (int)now.tv_usec, agent->id,
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002779 __FUNCTION__, s);
2780
2781 ctx = create_spoe_context(filter);
2782 if (ctx == NULL) {
Christopher Faulet72bcc472017-01-04 16:39:41 +01002783 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2784 " - failed to create SPOE context\n",
2785 (int)now.tv_sec, (int)now.tv_usec, agent->id,
2786 __FUNCTION__, ctx->strm);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002787 send_log(s->be, LOG_EMERG,
Christopher Faulet72bcc472017-01-04 16:39:41 +01002788 "SPOE: [%s] failed to create SPOE context\n",
2789 agent->id);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002790 return 0;
2791 }
2792
2793 ctx->strm = s;
2794 ctx->state = SPOE_CTX_ST_READY;
2795 filter->ctx = ctx;
2796
2797 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_FE]))
2798 filter->pre_analyzers |= AN_REQ_INSPECT_FE;
2799
2800 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_BE]))
2801 filter->pre_analyzers |= AN_REQ_INSPECT_BE;
2802
2803 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_RSP]))
2804 filter->pre_analyzers |= AN_RES_INSPECT;
2805
2806 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_FE]))
2807 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE;
2808
2809 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_BE]))
2810 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE;
2811
2812 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_RSP]))
2813 filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE;
2814
2815 return 1;
2816}
2817
2818/* Called when a filter instance is detached from a stream. It release the
2819 * attached SPOE context. */
2820static void
2821spoe_stop(struct stream *s, struct filter *filter)
2822{
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002823 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2824 (int)now.tv_sec, (int)now.tv_usec,
2825 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2826 __FUNCTION__, s);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002827 destroy_spoe_context(filter->ctx);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002828}
2829
Christopher Fauletf7a30922016-11-10 15:04:51 +01002830
2831/*
2832 * Called when the stream is woken up because of expired timer.
2833 */
2834static void
2835spoe_check_timeouts(struct stream *s, struct filter *filter)
2836{
2837 struct spoe_context *ctx = filter->ctx;
2838
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002839 if (tick_is_expired(ctx->process_exp, now_ms)) {
2840 s->pending_events |= TASK_WOKEN_MSG;
Christopher Faulet4596fb72017-01-11 14:05:19 +01002841 release_spoe_buffer(&ctx->buffer, &ctx->buffer_wait);
Christopher Fauleta73e59b2016-12-09 17:30:18 +01002842 }
Christopher Fauletf7a30922016-11-10 15:04:51 +01002843}
2844
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002845/* Called when we are ready to filter data on a channel */
2846static int
2847spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2848{
2849 struct spoe_context *ctx = filter->ctx;
2850 int ret = 1;
2851
2852 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2853 " - ctx-flags=0x%08x\n",
2854 (int)now.tv_sec, (int)now.tv_usec,
2855 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2856 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2857
Christopher Fauletb067b062017-01-04 16:39:11 +01002858 if (ctx->state == SPOE_CTX_ST_NONE)
2859 goto out;
2860
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002861 if (!(chn->flags & CF_ISRESP)) {
2862 if (filter->pre_analyzers & AN_REQ_INSPECT_FE)
2863 chn->analysers |= AN_REQ_INSPECT_FE;
2864 if (filter->pre_analyzers & AN_REQ_INSPECT_BE)
2865 chn->analysers |= AN_REQ_INSPECT_BE;
2866
2867 if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED)
2868 goto out;
2869
2870 ctx->stream_id = s->uniq_id;
Christopher Fauletb067b062017-01-04 16:39:11 +01002871 ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
2872 if (!ret)
2873 goto out;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002874 ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
2875 }
2876 else {
2877 if (filter->pre_analyzers & SPOE_EV_ON_TCP_RSP)
2878 chn->analysers |= AN_RES_INSPECT;
2879
2880 if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
2881 goto out;
2882
Christopher Fauletb067b062017-01-04 16:39:11 +01002883 ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
Christopher Fauleta1cda022016-12-21 08:58:06 +01002884 if (!ret) {
2885 channel_dont_read(chn);
2886 channel_dont_close(chn);
Christopher Fauletb067b062017-01-04 16:39:11 +01002887 goto out;
Christopher Fauleta1cda022016-12-21 08:58:06 +01002888 }
Christopher Fauletb067b062017-01-04 16:39:11 +01002889 ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002890 }
2891
2892 out:
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002893 return ret;
2894}
2895
2896/* Called before a processing happens on a given channel */
2897static int
2898spoe_chn_pre_analyze(struct stream *s, struct filter *filter,
2899 struct channel *chn, unsigned an_bit)
2900{
2901 struct spoe_context *ctx = filter->ctx;
2902 int ret = 1;
2903
2904 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2905 " - ctx-flags=0x%08x - ana=0x%08x\n",
2906 (int)now.tv_sec, (int)now.tv_usec,
2907 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2908 __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2909 ctx->flags, an_bit);
2910
Christopher Fauletb067b062017-01-04 16:39:11 +01002911 if (ctx->state == SPOE_CTX_ST_NONE)
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002912 goto out;
2913
2914 switch (an_bit) {
2915 case AN_REQ_INSPECT_FE:
2916 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
2917 break;
2918 case AN_REQ_INSPECT_BE:
2919 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
2920 break;
2921 case AN_RES_INSPECT:
2922 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP);
2923 break;
2924 case AN_REQ_HTTP_PROCESS_FE:
2925 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
2926 break;
2927 case AN_REQ_HTTP_PROCESS_BE:
2928 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
2929 break;
2930 case AN_RES_HTTP_PROCESS_FE:
2931 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
2932 break;
2933 }
2934
2935 out:
2936 if (!ret) {
2937 channel_dont_read(chn);
2938 channel_dont_close(chn);
2939 }
2940 return ret;
2941}
2942
2943/* Called when the filtering on the channel ends. */
2944static int
2945spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2946{
2947 struct spoe_context *ctx = filter->ctx;
2948
2949 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2950 " - ctx-flags=0x%08x\n",
2951 (int)now.tv_sec, (int)now.tv_usec,
2952 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2953 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2954
2955 if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
2956 reset_spoe_context(ctx);
2957 }
2958
2959 return 1;
2960}
2961
2962/********************************************************************
2963 * Functions that manage the filter initialization
2964 ********************************************************************/
2965struct flt_ops spoe_ops = {
2966 /* Manage SPOE filter, called for each filter declaration */
2967 .init = spoe_init,
2968 .deinit = spoe_deinit,
2969 .check = spoe_check,
2970
2971 /* Handle start/stop of SPOE */
Christopher Fauletf7a30922016-11-10 15:04:51 +01002972 .attach = spoe_start,
2973 .detach = spoe_stop,
2974 .check_timeouts = spoe_check_timeouts,
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02002975
2976 /* Handle channels activity */
2977 .channel_start_analyze = spoe_start_analyze,
2978 .channel_pre_analyze = spoe_chn_pre_analyze,
2979 .channel_end_analyze = spoe_end_analyze,
2980};
2981
2982
2983static int
2984cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
2985{
2986 const char *err;
2987 int i, err_code = 0;
2988
2989 if ((cfg_scope == NULL && curengine != NULL) ||
2990 (cfg_scope != NULL && curengine == NULL) ||
2991 strcmp(curengine, cfg_scope))
2992 goto out;
2993
2994 if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */
2995 if (!*args[1]) {
2996 Alert("parsing [%s:%d] : missing name for spoe-agent section.\n",
2997 file, linenum);
2998 err_code |= ERR_ALERT | ERR_ABORT;
2999 goto out;
3000 }
3001 if (*args[2]) {
3002 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
3003 file, linenum, args[2]);
3004 err_code |= ERR_ALERT | ERR_ABORT;
3005 goto out;
3006 }
3007
3008 err = invalid_char(args[1]);
3009 if (err) {
3010 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
3011 file, linenum, *err, args[0], args[1]);
3012 err_code |= ERR_ALERT | ERR_ABORT;
3013 goto out;
3014 }
3015
3016 if (curagent != NULL) {
3017 Alert("parsing [%s:%d] : another spoe-agent section previously defined.\n",
3018 file, linenum);
3019 err_code |= ERR_ALERT | ERR_ABORT;
3020 goto out;
3021 }
3022 if ((curagent = calloc(1, sizeof(*curagent))) == NULL) {
3023 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
3024 err_code |= ERR_ALERT | ERR_ABORT;
3025 goto out;
3026 }
3027
3028 curagent->id = strdup(args[1]);
Christopher Fauleta1cda022016-12-21 08:58:06 +01003029
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003030 curagent->conf.file = strdup(file);
3031 curagent->conf.line = linenum;
Christopher Fauleta1cda022016-12-21 08:58:06 +01003032
3033 curagent->timeout.hello = TICK_ETERNITY;
3034 curagent->timeout.idle = TICK_ETERNITY;
Christopher Fauletf7a30922016-11-10 15:04:51 +01003035 curagent->timeout.processing = TICK_ETERNITY;
Christopher Fauleta1cda022016-12-21 08:58:06 +01003036
3037 curagent->engine_id = NULL;
3038 curagent->var_pfx = NULL;
3039 curagent->var_on_error = NULL;
3040 curagent->flags = 0;
3041 curagent->cps_max = 0;
3042 curagent->eps_max = 0;
3043 curagent->max_frame_size = global.tune.bufsize - 4;
3044 curagent->min_applets = 0;
3045 curagent->max_fpa = 100;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003046
3047 for (i = 0; i < SPOE_EV_EVENTS; ++i)
3048 LIST_INIT(&curagent->messages[i]);
Christopher Fauleta1cda022016-12-21 08:58:06 +01003049
3050 curagent->applets_act = 0;
3051 curagent->applets_idle = 0;
3052 curagent->sending_rate = 0;
3053
3054 LIST_INIT(&curagent->applets);
3055 LIST_INIT(&curagent->sending_queue);
3056 LIST_INIT(&curagent->waiting_queue);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003057 }
3058 else if (!strcmp(args[0], "use-backend")) {
3059 if (!*args[1]) {
3060 Alert("parsing [%s:%d] : '%s' expects a backend name.\n",
3061 file, linenum, args[0]);
3062 err_code |= ERR_ALERT | ERR_FATAL;
3063 goto out;
3064 }
3065 if (*args[2]) {
3066 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
3067 file, linenum, args[2]);
3068 err_code |= ERR_ALERT | ERR_ABORT;
3069 goto out;
3070 }
3071 free(curagent->b.name);
3072 curagent->b.name = strdup(args[1]);
3073 }
3074 else if (!strcmp(args[0], "messages")) {
3075 int cur_arg = 1;
3076 while (*args[cur_arg]) {
3077 struct spoe_msg_placeholder *mp = NULL;
3078
3079 list_for_each_entry(mp, &curmps, list) {
3080 if (!strcmp(mp->id, args[cur_arg])) {
3081 Alert("parsing [%s:%d]: spoe-message message '%s' already declared.\n",
3082 file, linenum, args[cur_arg]);
3083 err_code |= ERR_ALERT | ERR_FATAL;
3084 goto out;
3085 }
3086 }
3087
3088 if ((mp = calloc(1, sizeof(*mp))) == NULL) {
3089 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
3090 err_code |= ERR_ALERT | ERR_ABORT;
3091 goto out;
3092 }
3093 mp->id = strdup(args[cur_arg]);
3094 LIST_ADDQ(&curmps, &mp->list);
3095 cur_arg++;
3096 }
3097 }
3098 else if (!strcmp(args[0], "timeout")) {
3099 unsigned int *tv = NULL;
3100 const char *res;
3101 unsigned timeout;
3102
3103 if (!*args[1]) {
3104 Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n",
3105 file, linenum);
3106 err_code |= ERR_ALERT | ERR_FATAL;
3107 goto out;
3108 }
3109 if (!strcmp(args[1], "hello"))
3110 tv = &curagent->timeout.hello;
3111 else if (!strcmp(args[1], "idle"))
3112 tv = &curagent->timeout.idle;
Christopher Fauletf7a30922016-11-10 15:04:51 +01003113 else if (!strcmp(args[1], "processing"))
3114 tv = &curagent->timeout.processing;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003115 else {
Christopher Faulet03a34492016-11-19 16:47:56 +01003116 Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' or 'processing' (got %s).\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003117 file, linenum, args[1]);
3118 err_code |= ERR_ALERT | ERR_FATAL;
3119 goto out;
3120 }
3121 if (!*args[2]) {
3122 Alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n",
3123 file, linenum, args[1]);
3124 err_code |= ERR_ALERT | ERR_FATAL;
3125 goto out;
3126 }
3127 res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
3128 if (res) {
3129 Alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n",
3130 file, linenum, *res, args[1]);
3131 err_code |= ERR_ALERT | ERR_ABORT;
3132 goto out;
3133 }
3134 if (*args[3]) {
3135 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
3136 file, linenum, args[3]);
3137 err_code |= ERR_ALERT | ERR_ABORT;
3138 goto out;
3139 }
3140 *tv = MS_TO_TICKS(timeout);
3141 }
3142 else if (!strcmp(args[0], "option")) {
3143 if (!*args[1]) {
3144 Alert("parsing [%s:%d]: '%s' expects an option name.\n",
3145 file, linenum, args[0]);
3146 err_code |= ERR_ALERT | ERR_FATAL;
3147 goto out;
3148 }
3149 if (!strcmp(args[1], "var-prefix")) {
3150 char *tmp;
3151
3152 if (!*args[2]) {
3153 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
3154 file, linenum, args[0],
3155 args[1]);
3156 err_code |= ERR_ALERT | ERR_FATAL;
3157 goto out;
3158 }
3159 tmp = args[2];
3160 while (*tmp) {
3161 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3162 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
3163 file, linenum, args[0], args[1]);
3164 err_code |= ERR_ALERT | ERR_FATAL;
3165 goto out;
3166 }
3167 tmp++;
3168 }
3169 curagent->var_pfx = strdup(args[2]);
3170 }
Christopher Fauletea62c2a2016-11-14 10:54:21 +01003171 else if (!strcmp(args[1], "continue-on-error")) {
3172 if (*args[2]) {
3173 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
Christopher Faulet48026722016-11-16 15:01:12 +01003174 file, linenum, args[2]);
Christopher Fauletea62c2a2016-11-14 10:54:21 +01003175 err_code |= ERR_ALERT | ERR_ABORT;
3176 goto out;
3177 }
3178 curagent->flags |= SPOE_FL_CONT_ON_ERR;
3179 }
Christopher Faulet985532d2016-11-16 15:36:19 +01003180 else if (!strcmp(args[1], "set-on-error")) {
3181 char *tmp;
3182
3183 if (!*args[2]) {
3184 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
3185 file, linenum, args[0],
3186 args[1]);
3187 err_code |= ERR_ALERT | ERR_FATAL;
3188 goto out;
3189 }
3190 tmp = args[2];
3191 while (*tmp) {
3192 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3193 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
3194 file, linenum, args[0], args[1]);
3195 err_code |= ERR_ALERT | ERR_FATAL;
3196 goto out;
3197 }
3198 tmp++;
3199 }
3200 curagent->var_on_error = strdup(args[2]);
3201 }
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003202 else {
3203 Alert("parsing [%s:%d]: option '%s' is not supported.\n",
3204 file, linenum, args[1]);
3205 err_code |= ERR_ALERT | ERR_FATAL;
3206 goto out;
3207 }
Christopher Faulet48026722016-11-16 15:01:12 +01003208 }
3209 else if (!strcmp(args[0], "maxconnrate")) {
3210 if (!*args[1]) {
3211 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
3212 file, linenum, args[0]);
3213 err_code |= ERR_ALERT | ERR_FATAL;
3214 goto out;
3215 }
3216 if (*args[2]) {
3217 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
3218 file, linenum, args[2]);
3219 err_code |= ERR_ALERT | ERR_ABORT;
3220 goto out;
3221 }
3222 curagent->cps_max = atol(args[1]);
3223 }
3224 else if (!strcmp(args[0], "maxerrrate")) {
3225 if (!*args[1]) {
3226 Alert("parsing [%s:%d] : '%s' expects an integer argument.\n",
3227 file, linenum, args[0]);
3228 err_code |= ERR_ALERT | ERR_FATAL;
3229 goto out;
3230 }
3231 if (*args[2]) {
3232 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
3233 file, linenum, args[2]);
3234 err_code |= ERR_ALERT | ERR_ABORT;
3235 goto out;
3236 }
3237 curagent->eps_max = atol(args[1]);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003238 }
3239 else if (*args[0]) {
3240 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
3241 file, linenum, args[0]);
3242 err_code |= ERR_ALERT | ERR_FATAL;
3243 goto out;
3244 }
3245 out:
3246 return err_code;
3247}
3248
3249static int
3250cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm)
3251{
3252 struct spoe_message *msg;
3253 struct spoe_arg *arg;
3254 const char *err;
3255 char *errmsg = NULL;
3256 int err_code = 0;
3257
3258 if ((cfg_scope == NULL && curengine != NULL) ||
3259 (cfg_scope != NULL && curengine == NULL) ||
3260 strcmp(curengine, cfg_scope))
3261 goto out;
3262
3263 if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */
3264 if (!*args[1]) {
3265 Alert("parsing [%s:%d] : missing name for spoe-message section.\n",
3266 file, linenum);
3267 err_code |= ERR_ALERT | ERR_ABORT;
3268 goto out;
3269 }
3270 if (*args[2]) {
3271 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
3272 file, linenum, args[2]);
3273 err_code |= ERR_ALERT | ERR_ABORT;
3274 goto out;
3275 }
3276
3277 err = invalid_char(args[1]);
3278 if (err) {
3279 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
3280 file, linenum, *err, args[0], args[1]);
3281 err_code |= ERR_ALERT | ERR_ABORT;
3282 goto out;
3283 }
3284
3285 list_for_each_entry(msg, &curmsgs, list) {
3286 if (!strcmp(msg->id, args[1])) {
3287 Alert("parsing [%s:%d]: spoe-message section '%s' has the same"
3288 " name as another one declared at %s:%d.\n",
3289 file, linenum, args[1], msg->conf.file, msg->conf.line);
3290 err_code |= ERR_ALERT | ERR_FATAL;
3291 goto out;
3292 }
3293 }
3294
3295 if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) {
3296 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
3297 err_code |= ERR_ALERT | ERR_ABORT;
3298 goto out;
3299 }
3300
3301 curmsg->id = strdup(args[1]);
3302 curmsg->id_len = strlen(curmsg->id);
3303 curmsg->event = SPOE_EV_NONE;
3304 curmsg->conf.file = strdup(file);
3305 curmsg->conf.line = linenum;
3306 LIST_INIT(&curmsg->args);
3307 LIST_ADDQ(&curmsgs, &curmsg->list);
3308 }
3309 else if (!strcmp(args[0], "args")) {
3310 int cur_arg = 1;
3311
3312 curproxy->conf.args.ctx = ARGC_SPOE;
3313 curproxy->conf.args.file = file;
3314 curproxy->conf.args.line = linenum;
3315 while (*args[cur_arg]) {
3316 char *delim = strchr(args[cur_arg], '=');
3317 int idx = 0;
3318
3319 if ((arg = calloc(1, sizeof(*arg))) == NULL) {
3320 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
3321 err_code |= ERR_ALERT | ERR_ABORT;
3322 goto out;
3323 }
3324
3325 if (!delim) {
3326 arg->name = NULL;
3327 arg->name_len = 0;
3328 delim = args[cur_arg];
3329 }
3330 else {
3331 arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]);
3332 arg->name_len = delim - args[cur_arg];
3333 delim++;
3334 }
Christopher Fauletb0b42382017-02-23 22:41:09 +01003335 arg->expr = sample_parse_expr((char*[]){delim, NULL},
3336 &idx, file, linenum, &errmsg,
3337 &curproxy->conf.args);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003338 if (arg->expr == NULL) {
3339 Alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg);
3340 err_code |= ERR_ALERT | ERR_FATAL;
3341 free(arg->name);
3342 free(arg);
3343 goto out;
3344 }
3345 LIST_ADDQ(&curmsg->args, &arg->list);
3346 cur_arg++;
3347 }
3348 curproxy->conf.args.file = NULL;
3349 curproxy->conf.args.line = 0;
3350 }
3351 else if (!strcmp(args[0], "event")) {
3352 if (!*args[1]) {
3353 Alert("parsing [%s:%d] : missing event name.\n", file, linenum);
3354 err_code |= ERR_ALERT | ERR_ABORT;
3355 goto out;
3356 }
3357 if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS]))
3358 curmsg->event = SPOE_EV_ON_CLIENT_SESS;
3359 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS]))
3360 curmsg->event = SPOE_EV_ON_SERVER_SESS;
3361
3362 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE]))
3363 curmsg->event = SPOE_EV_ON_TCP_REQ_FE;
3364 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE]))
3365 curmsg->event = SPOE_EV_ON_TCP_REQ_BE;
3366 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP]))
3367 curmsg->event = SPOE_EV_ON_TCP_RSP;
3368
3369 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE]))
3370 curmsg->event = SPOE_EV_ON_HTTP_REQ_FE;
3371 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE]))
3372 curmsg->event = SPOE_EV_ON_HTTP_REQ_BE;
3373 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP]))
3374 curmsg->event = SPOE_EV_ON_HTTP_RSP;
3375 else {
3376 Alert("parsing [%s:%d] : unkown event '%s'.\n",
3377 file, linenum, args[1]);
3378 err_code |= ERR_ALERT | ERR_ABORT;
3379 goto out;
3380 }
3381 }
3382 else if (!*args[0]) {
3383 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n",
3384 file, linenum, args[0]);
3385 err_code |= ERR_ALERT | ERR_FATAL;
3386 goto out;
3387 }
3388 out:
3389 free(errmsg);
3390 return err_code;
3391}
3392
3393/* Return -1 on error, else 0 */
3394static int
3395parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
3396 struct flt_conf *fconf, char **err, void *private)
3397{
3398 struct list backup_sections;
3399 struct spoe_config *conf;
3400 struct spoe_message *msg, *msgback;
3401 struct spoe_msg_placeholder *mp, *mpback;
3402 char *file = NULL, *engine = NULL;
3403 int ret, pos = *cur_arg + 1;
3404
3405 conf = calloc(1, sizeof(*conf));
3406 if (conf == NULL) {
3407 memprintf(err, "%s: out of memory", args[*cur_arg]);
3408 goto error;
3409 }
3410 conf->proxy = px;
3411
3412 while (*args[pos]) {
3413 if (!strcmp(args[pos], "config")) {
3414 if (!*args[pos+1]) {
3415 memprintf(err, "'%s' : '%s' option without value",
3416 args[*cur_arg], args[pos]);
3417 goto error;
3418 }
3419 file = args[pos+1];
3420 pos += 2;
3421 }
3422 else if (!strcmp(args[pos], "engine")) {
3423 if (!*args[pos+1]) {
3424 memprintf(err, "'%s' : '%s' option without value",
3425 args[*cur_arg], args[pos]);
3426 goto error;
3427 }
3428 engine = args[pos+1];
3429 pos += 2;
3430 }
3431 else {
3432 memprintf(err, "unknown keyword '%s'", args[pos]);
3433 goto error;
3434 }
3435 }
3436 if (file == NULL) {
3437 memprintf(err, "'%s' : missing config file", args[*cur_arg]);
3438 goto error;
3439 }
3440
3441 /* backup sections and register SPOE sections */
3442 LIST_INIT(&backup_sections);
3443 cfg_backup_sections(&backup_sections);
3444 cfg_register_section("spoe-agent", cfg_parse_spoe_agent);
3445 cfg_register_section("spoe-message", cfg_parse_spoe_message);
3446
3447 /* Parse SPOE filter configuration file */
3448 curengine = engine;
3449 curproxy = px;
3450 curagent = NULL;
3451 curmsg = NULL;
3452 ret = readcfgfile(file);
3453 curproxy = NULL;
3454
3455 /* unregister SPOE sections and restore previous sections */
3456 cfg_unregister_sections();
3457 cfg_restore_sections(&backup_sections);
3458
3459 if (ret == -1) {
3460 memprintf(err, "Could not open configuration file %s : %s",
3461 file, strerror(errno));
3462 goto error;
3463 }
3464 if (ret & (ERR_ABORT|ERR_FATAL)) {
3465 memprintf(err, "Error(s) found in configuration file %s", file);
3466 goto error;
3467 }
3468
3469 /* Check SPOE agent */
3470 if (curagent == NULL) {
3471 memprintf(err, "No SPOE agent found in file %s", file);
3472 goto error;
3473 }
3474 if (curagent->b.name == NULL) {
3475 memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d",
3476 curagent->id, curagent->conf.file, curagent->conf.line);
3477 goto error;
3478 }
Christopher Fauletf7a30922016-11-10 15:04:51 +01003479 if (curagent->timeout.hello == TICK_ETERNITY ||
3480 curagent->timeout.idle == TICK_ETERNITY ||
Christopher Fauletf7a30922016-11-10 15:04:51 +01003481 curagent->timeout.processing == TICK_ETERNITY) {
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003482 Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
3483 " | While not properly invalid, you will certainly encounter various problems\n"
3484 " | with such a configuration. To fix this, please ensure that all following\n"
Christopher Faulet03a34492016-11-19 16:47:56 +01003485 " | timeouts are set to a non-zero value: 'hello', 'idle', 'processing'.\n",
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003486 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3487 }
3488 if (curagent->var_pfx == NULL) {
3489 char *tmp = curagent->id;
3490
3491 while (*tmp) {
3492 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
3493 memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. "
3494 "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n",
3495 curagent->id, curagent->id, curagent->conf.file, curagent->conf.line);
3496 goto error;
3497 }
3498 tmp++;
3499 }
3500 curagent->var_pfx = strdup(curagent->id);
3501 }
Christopher Fauleta1cda022016-12-21 08:58:06 +01003502 if (curagent->engine_id == NULL)
3503 curagent->engine_id = generate_pseudo_uuid();
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003504
3505 if (LIST_ISEMPTY(&curmps)) {
3506 Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",
3507 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
3508 goto finish;
3509 }
3510
3511 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3512 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
Christopher Fauleta21b0642017-01-09 16:56:23 +01003513 struct spoe_arg *arg;
3514 unsigned int where;
3515
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003516 if (!strcmp(msg->id, mp->id)) {
3517 if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) {
3518 if (msg->event == SPOE_EV_ON_TCP_REQ_BE)
3519 msg->event = SPOE_EV_ON_TCP_REQ_FE;
3520 if (msg->event == SPOE_EV_ON_HTTP_REQ_BE)
3521 msg->event = SPOE_EV_ON_HTTP_REQ_FE;
3522 }
3523 if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS ||
3524 msg->event == SPOE_EV_ON_TCP_REQ_FE ||
3525 msg->event == SPOE_EV_ON_HTTP_REQ_FE)) {
3526 Warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n",
3527 px->id, msg->conf.file, msg->conf.line);
3528 goto next;
3529 }
3530 if (msg->event == SPOE_EV_NONE) {
3531 Warning("Proxy '%s': Ignore SPOE message without event at %s:%d.\n",
3532 px->id, msg->conf.file, msg->conf.line);
3533 goto next;
3534 }
Christopher Fauleta21b0642017-01-09 16:56:23 +01003535
3536 where = 0;
3537 switch (msg->event) {
3538 case SPOE_EV_ON_CLIENT_SESS:
3539 where |= SMP_VAL_FE_CON_ACC;
3540 break;
3541
3542 case SPOE_EV_ON_TCP_REQ_FE:
3543 where |= SMP_VAL_FE_REQ_CNT;
3544 break;
3545
3546 case SPOE_EV_ON_HTTP_REQ_FE:
3547 where |= SMP_VAL_FE_HRQ_HDR;
3548 break;
3549
3550 case SPOE_EV_ON_TCP_REQ_BE:
3551 if (px->cap & PR_CAP_FE)
3552 where |= SMP_VAL_FE_REQ_CNT;
3553 if (px->cap & PR_CAP_BE)
3554 where |= SMP_VAL_BE_REQ_CNT;
3555 break;
3556
3557 case SPOE_EV_ON_HTTP_REQ_BE:
3558 if (px->cap & PR_CAP_FE)
3559 where |= SMP_VAL_FE_HRQ_HDR;
3560 if (px->cap & PR_CAP_BE)
3561 where |= SMP_VAL_BE_HRQ_HDR;
3562 break;
3563
3564 case SPOE_EV_ON_SERVER_SESS:
3565 where |= SMP_VAL_BE_SRV_CON;
3566 break;
3567
3568 case SPOE_EV_ON_TCP_RSP:
3569 if (px->cap & PR_CAP_FE)
3570 where |= SMP_VAL_FE_RES_CNT;
3571 if (px->cap & PR_CAP_BE)
3572 where |= SMP_VAL_BE_RES_CNT;
3573 break;
3574
3575 case SPOE_EV_ON_HTTP_RSP:
3576 if (px->cap & PR_CAP_FE)
3577 where |= SMP_VAL_FE_HRS_HDR;
3578 if (px->cap & PR_CAP_BE)
3579 where |= SMP_VAL_BE_HRS_HDR;
3580 break;
3581
3582 default:
3583 break;
3584 }
3585
3586 list_for_each_entry(arg, &msg->args, list) {
3587 if (!(arg->expr->fetch->val & where)) {
3588 Warning("Proxy '%s': Ignore SPOE message at %s:%d: "
3589 "some args extract information from '%s', "
3590 "none of which is available here ('%s').\n",
3591 px->id, msg->conf.file, msg->conf.line,
3592 sample_ckp_names(arg->expr->fetch->use),
3593 sample_ckp_names(where));
3594 goto next;
3595 }
3596 }
3597
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003598 msg->agent = curagent;
3599 LIST_DEL(&msg->list);
3600 LIST_ADDQ(&curagent->messages[msg->event], &msg->list);
3601 goto next;
3602 }
3603 }
3604 memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d",
3605 curagent->id, mp->id, curagent->conf.file, curagent->conf.line);
3606 goto error;
3607 next:
3608 continue;
3609 }
3610
3611 finish:
3612 conf->agent = curagent;
3613 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3614 LIST_DEL(&mp->list);
3615 release_spoe_msg_placeholder(mp);
3616 }
3617 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3618 Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n",
3619 px->id, msg->id, msg->conf.file, msg->conf.line);
3620 LIST_DEL(&msg->list);
3621 release_spoe_message(msg);
3622 }
3623
3624 *cur_arg = pos;
Christopher Faulet3b386a32017-02-23 10:17:15 +01003625 fconf->id = spoe_filter_id;
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003626 fconf->ops = &spoe_ops;
3627 fconf->conf = conf;
3628 return 0;
3629
3630 error:
3631 release_spoe_agent(curagent);
3632 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3633 LIST_DEL(&mp->list);
3634 release_spoe_msg_placeholder(mp);
3635 }
3636 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3637 LIST_DEL(&msg->list);
3638 release_spoe_message(msg);
3639 }
3640 free(conf);
3641 return -1;
3642}
3643
3644
3645/* Declare the filter parser for "spoe" keyword */
3646static struct flt_kw_list flt_kws = { "SPOE", { }, {
3647 { "spoe", parse_spoe_flt, NULL },
3648 { NULL, NULL, NULL },
3649 }
3650};
3651
3652__attribute__((constructor))
3653static void __spoe_init(void)
3654{
3655 flt_register_keywords(&flt_kws);
3656
3657 LIST_INIT(&curmsgs);
3658 LIST_INIT(&curmps);
3659 pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
Christopher Faulet42bfa462017-01-04 14:14:19 +01003660 pool2_spoe_appctx = create_pool("spoe_appctx", sizeof(struct spoe_appctx), MEM_F_SHARED);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003661}
3662
3663__attribute__((destructor))
3664static void
3665__spoe_deinit(void)
3666{
3667 pool_destroy2(pool2_spoe_ctx);
Christopher Faulet42bfa462017-01-04 14:14:19 +01003668 pool_destroy2(pool2_spoe_appctx);
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02003669}