blob: 1ebdbdaf07076cb2cee01a408b897278a56523a9 [file] [log] [blame]
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001/*
2 * Stream processing offload engine management.
3 *
4 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12#include <ctype.h>
13#include <errno.h>
14
15#include <common/buffer.h>
16#include <common/cfgparse.h>
17#include <common/compat.h>
18#include <common/config.h>
19#include <common/debug.h>
20#include <common/memory.h>
21#include <common/time.h>
22
23#include <types/arg.h>
24#include <types/filters.h>
25#include <types/global.h>
26#include <types/proxy.h>
27#include <types/sample.h>
28#include <types/stream.h>
29
30#include <proto/arg.h>
31#include <proto/backend.h>
32#include <proto/filters.h>
33#include <proto/frontend.h>
34#include <proto/log.h>
35#include <proto/proto_http.h>
36#include <proto/proxy.h>
37#include <proto/sample.h>
38#include <proto/session.h>
39#include <proto/signal.h>
40#include <proto/stream.h>
41#include <proto/stream_interface.h>
42#include <proto/task.h>
43#include <proto/vars.h>
44
/* Debug traces: SPOE_PRINTF() forwards its arguments to fprintf() when
 * DEBUG_SPOE or DEBUG_FULL is defined, and expands to nothing otherwise. */
#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
#define SPOE_PRINTF(x...) fprintf(x)
#else
#define SPOE_PRINTF(x...)
#endif

/* Helper to get the SPOE-specific part of an appctx */
#define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe)

/* TODO: add an option to customize these values */
/* The maximum number of new applets waiting for the end of the hello handshake */
#define MAX_NEW_SPOE_APPLETS 5

/* The maximum number of errors while a stream is waiting for a SPOE applet */
#define MAX_NEW_SPOE_APPLET_ERRS 3

/* Minimal size for a frame */
#define MIN_FRAME_SIZE 256

/* Flags set on the SPOE context */
#define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* Set after the on-client-session event was processed */
#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set after the on-server-session event was processed */
#define SPOE_CTX_FL_REQ_PROCESS 0x00000004 /* Set when SPOE is processing the request */
#define SPOE_CTX_FL_RSP_PROCESS 0x00000008 /* Set when SPOE is processing the response */

/* Mask covering both "processing" flags */
#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)

/* Error values stored in the <st1> field of a SPOE appctx */
#define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, leave it to zero */
#define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */
74
/* All possible states for a SPOE context (see the <state> field of
 * struct spoe_context) */
enum spoe_ctx_state {
	SPOE_CTX_ST_NONE = 0,
	SPOE_CTX_ST_READY,
	SPOE_CTX_ST_SENDING_MSGS,
	SPOE_CTX_ST_WAITING_ACK,
	SPOE_CTX_ST_DONE,
	SPOE_CTX_ST_ERROR,
};
84
/* All possible states for a SPOE applet. The current state is kept in the
 * <st0> field of the appctx. */
enum spoe_appctx_state {
	SPOE_APPCTX_ST_CONNECT = 0,
	SPOE_APPCTX_ST_CONNECTING,
	SPOE_APPCTX_ST_PROCESSING,
	SPOE_APPCTX_ST_DISCONNECT,
	SPOE_APPCTX_ST_DISCONNECTING,
	SPOE_APPCTX_ST_EXIT,
	SPOE_APPCTX_ST_END,
};
95
/* All supported SPOE actions. Numbering starts at 1; SPOE_ACT_TYPES is the
 * value following the last action type. */
enum spoe_action_type {
	SPOE_ACT_T_SET_VAR = 1,
	SPOE_ACT_T_UNSET_VAR,
	SPOE_ACT_TYPES,
};
102
/* All supported SPOE events. SPOE_EV_EVENTS is the number of entries and is
 * used to size per-event arrays (agent message lists, event names). */
enum spoe_event {
	SPOE_EV_NONE = 0,

	/* Request events */
	SPOE_EV_ON_CLIENT_SESS = 1,
	SPOE_EV_ON_TCP_REQ_FE,
	SPOE_EV_ON_TCP_REQ_BE,
	SPOE_EV_ON_HTTP_REQ_FE,
	SPOE_EV_ON_HTTP_REQ_BE,

	/* Response events */
	SPOE_EV_ON_SERVER_SESS,
	SPOE_EV_ON_TCP_RSP,
	SPOE_EV_ON_HTTP_RSP,

	SPOE_EV_EVENTS
};
121
/* Errors triggered by the SPOE applet. The numbering is sparse:
 * SPOE_FRM_ERR_UNKNOWN is forced to 99, so SPOE_FRM_ERRS is 100 and any table
 * indexed by these codes has unset entries between the last specific error and
 * SPOE_FRM_ERR_UNKNOWN. */
enum spoe_frame_error {
	SPOE_FRM_ERR_NONE = 0,
	SPOE_FRM_ERR_IO,
	SPOE_FRM_ERR_TOUT,
	SPOE_FRM_ERR_TOO_BIG,
	SPOE_FRM_ERR_INVALID,
	SPOE_FRM_ERR_NO_VSN,
	SPOE_FRM_ERR_NO_FRAME_SIZE,
	SPOE_FRM_ERR_NO_CAP,
	SPOE_FRM_ERR_BAD_VSN,
	SPOE_FRM_ERR_BAD_FRAME_SIZE,
	SPOE_FRM_ERR_UNKNOWN = 99,
	SPOE_FRM_ERRS,
};
137
/* Scopes used for variables set by agents. It is a way to be agnostic to the
 * vars scopes. */
enum spoe_vars_scope {
	SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC */
	SPOE_SCOPE_SESS, /* <=> SCOPE_SESS */
	SPOE_SCOPE_TXN, /* <=> SCOPE_TXN */
	SPOE_SCOPE_REQ, /* <=> SCOPE_REQ */
	SPOE_SCOPE_RES, /* <=> SCOPE_RES */
};
147
148
/* Describe an argument that will be linked to a message. It is a sample fetch
 * expression with an optional name. Arguments are chained through <list> (see
 * the <args> list of struct spoe_message). */
struct spoe_arg {
	char *name; /* Name of the argument, may be NULL */
	unsigned int name_len; /* The name length, 0 if NULL */
	struct sample_expr *expr; /* Sample expression */
	struct list list; /* Used to chain SPOE args */
};
157
/* Used during the config parsing only because, when a SPOE agent section is
 * parsed, the messages it refers to may not be defined yet. */
struct spoe_msg_placeholder {
	char *id; /* SPOE message placeholder id */
	struct list list; /* Used to chain SPOE message placeholders */
};
164
/* Describe a message that will be sent in a NOTIFY frame. A message has a name,
 * an argument list (see struct spoe_arg) and it is linked to a specific
 * event. */
struct spoe_message {
	char *id; /* SPOE message id */
	unsigned int id_len; /* The message id length */
	struct spoe_agent *agent; /* SPOE agent owning this SPOE message */
	struct {
		char *file; /* file where the SPOE message appears */
		int line; /* line where the SPOE message appears */
	} conf; /* config information */
	struct list args; /* Arguments added when the SPOE message is sent */
	struct list list; /* Used to chain SPOE messages */

	enum spoe_event event; /* SPOE_EV_* */
};
180
/* Describe a SPOE agent. */
struct spoe_agent {
	char *id; /* SPOE agent id (name) */
	struct {
		char *file; /* file where the SPOE agent appears */
		int line; /* line where the SPOE agent appears */
	} conf; /* config information */
	union {
		struct proxy *be; /* Backend used by this agent */
		char *name; /* Backend name used during conf parsing */
	} b; /* Both members share storage: only one is meaningful at a time */
	struct {
		unsigned int hello; /* Max time to receive AGENT-HELLO frame */
		unsigned int idle; /* Max Idle timeout */
		unsigned int ack; /* Max time to acknowledge a NOTIFY frame */
	} timeout;

	char *var_pfx; /* Prefix used for vars set by the agent */

	struct list cache; /* List used to cache SPOE streams. In
	                    * fact, we cache the SPOE applet ctx */

	struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
	                                       * for each supported event */

	struct list applet_wq; /* List of streams waiting for a SPOE applet */
	unsigned int new_applets; /* The number of new SPOE applets */
};
209
/* SPOE filter configuration. Note that <agent_fe> is an embedded proxy
 * structure, not a pointer. */
struct spoe_config {
	struct proxy *proxy; /* Proxy owning the filter */
	struct spoe_agent *agent; /* Agent used by this filter */
	struct proxy agent_fe; /* Agent frontend */
};
216
/* SPOE context attached to a stream. It is the main structure that handles the
 * processing offload. */
struct spoe_context {
	struct filter *filter; /* The SPOE filter */
	struct stream *strm; /* The stream that should be offloaded */
	struct appctx *appctx; /* The SPOE appctx, reset to NULL when the applet is released */
	struct list *messages; /* List of messages that will be sent during the stream processing */
	struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */
	struct list buffer_wait; /* position in the list of streams waiting for a buffer */
	struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */

	unsigned int errs; /* The number of errors to acquire a SPOE applet */

	enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
	unsigned int flags; /* SPOE_CTX_FL_* */

	unsigned int stream_id; /* stream_id and frame_id are used */
	unsigned int frame_id; /* to map NOTIFY and ACK frames */

};
237
/* Set if the handler on SIGUSR1 is registered */
static int sighandler_registered = 0;

/* proxy used during the parsing */
struct proxy *curproxy = NULL;

/* The name of the SPOE engine, used during the parsing */
char *curengine = NULL;

/* SPOE agent used during the parsing */
struct spoe_agent *curagent = NULL;

/* SPOE message used during the parsing */
struct spoe_message *curmsg = NULL;

/* list of SPOE messages and placeholders used during the parsing */
struct list curmsgs;
struct list curmps;

/* Pool used to allocate new SPOE contexts */
static struct pool_head *pool2_spoe_ctx = NULL;

/* Temporary variables used to ease error processing. They are shared by all
 * the frame encoding/decoding functions of this file: <spoe_status_code>
 * holds the last status code and <spoe_reason> the last message received in a
 * DISCONNECT frame (at most 255 bytes plus the trailing null byte). */
int spoe_status_code = SPOE_FRM_ERR_NONE;
char spoe_reason[256];

struct flt_ops spoe_ops;

/* Forward declarations of applet helpers defined later in this file */
static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx);
static void on_new_spoe_appctx_failure(struct spoe_agent *agent);
static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx);
269
270/********************************************************************
271 * helper functions/globals
272 ********************************************************************/
273static void
274release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp)
275{
276 if (!mp)
277 return;
278 free(mp->id);
279 free(mp);
280}
281
282
283static void
284release_spoe_message(struct spoe_message *msg)
285{
286 struct spoe_arg *arg, *back;
287
288 if (!msg)
289 return;
290 free(msg->id);
291 free(msg->conf.file);
292 list_for_each_entry_safe(arg, back, &msg->args, list) {
293 release_sample_expr(arg->expr);
294 free(arg->name);
295 LIST_DEL(&arg->list);
296 free(arg);
297 }
298 free(msg);
299}
300
301static void
302release_spoe_agent(struct spoe_agent *agent)
303{
304 struct spoe_message *msg, *back;
305 int i;
306
307 if (!agent)
308 return;
309 free(agent->id);
310 free(agent->conf.file);
311 free(agent->var_pfx);
312 for (i = 0; i < SPOE_EV_EVENTS; ++i) {
313 list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
314 LIST_DEL(&msg->list);
315 release_spoe_message(msg);
316 }
317 }
318 free(agent);
319}
320
/* Human-readable reason for each SPOE_FRM_ERR_* code, indexed by the code.
 * Because the codes are sparse (SPOE_FRM_ERR_UNKNOWN is 99), the entries that
 * have no initializer are NULL: check an entry before using it as a string. */
static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
	[SPOE_FRM_ERR_NONE] = "normal",
	[SPOE_FRM_ERR_IO] = "I/O error",
	[SPOE_FRM_ERR_TOUT] = "a timeout occurred",
	[SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
	[SPOE_FRM_ERR_INVALID] = "invalid frame received",
	[SPOE_FRM_ERR_NO_VSN] = "version value not found",
	[SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
	[SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
	[SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
	[SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
	[SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
};
334
/* Name of each SPOE event, indexed by SPOE_EV_*. There is no entry for
 * SPOE_EV_NONE, so index 0 is NULL. */
static const char *spoe_event_str[SPOE_EV_EVENTS] = {
	[SPOE_EV_ON_CLIENT_SESS] = "on-client-session",
	[SPOE_EV_ON_TCP_REQ_FE] = "on-frontend-tcp-request",
	[SPOE_EV_ON_TCP_REQ_BE] = "on-backend-tcp-request",
	[SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request",
	[SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request",

	[SPOE_EV_ON_SERVER_SESS] = "on-server-session",
	[SPOE_EV_ON_TCP_RSP] = "on-tcp-response",
	[SPOE_EV_ON_HTTP_RSP] = "on-http-response",
};
346
347
/* State names, only compiled in for debug builds */
#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)

/* Name of each SPOE context state, indexed by SPOE_CTX_ST_* */
static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
	[SPOE_CTX_ST_NONE] = "NONE",
	[SPOE_CTX_ST_READY] = "READY",
	[SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
	[SPOE_CTX_ST_WAITING_ACK] = "WAITING_ACK",
	[SPOE_CTX_ST_DONE] = "DONE",
	[SPOE_CTX_ST_ERROR] = "ERROR",
};

/* Name of each SPOE applet state, indexed by SPOE_APPCTX_ST_* */
static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
	[SPOE_APPCTX_ST_CONNECT] = "CONNECT",
	[SPOE_APPCTX_ST_CONNECTING] = "CONNECTING",
	[SPOE_APPCTX_ST_PROCESSING] = "PROCESSING",
	[SPOE_APPCTX_ST_DISCONNECT] = "DISCONNECT",
	[SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
	[SPOE_APPCTX_ST_EXIT] = "EXIT",
	[SPOE_APPCTX_ST_END] = "END",
};

#endif
370/********************************************************************
371 * Functions that encode/decode SPOE frames
372 ********************************************************************/
/* Frame types sent by HAProxy and by agents. The value is the first byte of
 * every frame. */
enum spoe_frame_type {
	/* Frames sent by HAProxy */
	SPOE_FRM_T_HAPROXY_HELLO = 1,
	SPOE_FRM_T_HAPROXY_DISCON,
	SPOE_FRM_T_HAPROXY_NOTIFY,

	/* Frames sent by the agents */
	SPOE_FRM_T_AGENT_HELLO = 101,
	SPOE_FRM_T_AGENT_DISCON,
	SPOE_FRM_T_AGENT_ACK
};
385
/* All supported data types. The type is carried in the low 4 bits of the byte
 * preceding each typed value (see SPOE_DATA_T_MASK). */
enum spoe_data_type {
	SPOE_DATA_T_NULL = 0,
	SPOE_DATA_T_BOOL,
	SPOE_DATA_T_INT32,
	SPOE_DATA_T_UINT32,
	SPOE_DATA_T_INT64,
	SPOE_DATA_T_UINT64,
	SPOE_DATA_T_IPV4,
	SPOE_DATA_T_IPV6,
	SPOE_DATA_T_STR,
	SPOE_DATA_T_BIN,
	SPOE_DATA_TYPES
};
400
/* Masks to get the data type (low nibble) or the flags (high nibble) from the
 * type byte of a typed value */
#define SPOE_DATA_T_MASK 0x0F
#define SPOE_DATA_FL_MASK 0xF0

/* Flags to set Boolean values */
#define SPOE_DATA_FL_FALSE 0x00
#define SPOE_DATA_FL_TRUE 0x10

/* Helper to get a static string length, excluding the terminating null byte.
 * It relies on sizeof, so it only works on string literals and arrays, not on
 * pointers. */
#define SLEN(str) (sizeof(str)-1)

/* Predefined keys used in HELLO/DISCONNECT frames */
#define SUPPORTED_VERSIONS_KEY "supported-versions"
#define VERSION_KEY "version"
#define MAX_FRAME_SIZE_KEY "max-frame-size"
#define CAPABILITIES_KEY "capabilities"
#define STATUS_CODE_KEY "status-code"
#define MSG_KEY "message"
419
/* A supported version: its textual form and the inclusive range of decoded
 * values (version multiplied by 1000, see decode_spoe_version) it covers. */
struct spoe_version {
	char *str;
	int min;
	int max;
};

/* All supported versions. The table ends with an entry whose <str> is NULL. */
static struct spoe_version supported_versions[] = {
	{"1.0", 1000, 1000},
	{NULL, 0, 0}
};

/* Comma-separated list of supported versions */
#define SUPPORTED_VERSIONS_VAL "1.0"

/* Comma-separated list of supported capabilities (none for now) */
#define CAPABILITIES_VAL ""
437
/* Decode a version string such as "1.0" into an integer equal to the version
 * multiplied by 1000 ("1.0" gives 1000). The string does not need to be
 * null-terminated: exactly len bytes are examined. Leading and trailing
 * white spaces are tolerated.
 *
 * Returns -1 when the string is too long to be a version, is not a number, is
 * followed by anything but white spaces, or when the value is not strictly
 * positive or too large to be scaled. */
static int
decode_spoe_version(const char *str, size_t len)
{
	/* Upper bound keeping (d * 1000) far below INT_MAX, even for 32-bit ints */
	const double max_version = 1000000.0;
	/* Fixed-size copy: the length comes from the agent, so it must not be
	 * used to size a stack array */
	char tmp[64], *start, *end;
	double d;

	if (len >= sizeof(tmp))
		return -1;

	/* strtod() needs a null-terminated string */
	if (len)
		memcpy(tmp, str, len);
	tmp[len] = 0;

	/* <ctype.h> functions require a value representable as unsigned char */
	start = tmp;
	while (isspace((unsigned char)*start))
		start++;

	d = strtod(start, &end);
	if (start == end)
		return -1;

	while (isspace((unsigned char)*end))
		end++;
	if (*end)
		return -1;

	/* Written so that NaN is rejected too: every comparison with NaN is
	 * false. Converting an out-of-range double to int is undefined. */
	if (!(d > 0.0) || d > max_version)
		return -1;

	/* Round to the nearest integer: the product may fall slightly below
	 * the expected value because of the binary representation of d */
	return (int)(d * 1000 + 0.5);
}
466
/* Encode the integer i as a variable-length integer into buf and return the
 * number of bytes written. This function never fails: the caller must provide
 * enough room (a 64-bit value needs at most 10 bytes).
 *
 * Values below 240 are stored in a single byte. Otherwise the first byte has
 * its 4 high bits set and carries the 4 low bits of the value; each following
 * byte carries 7 more bits and has its high bit set when another byte
 * follows. */
static int
encode_spoe_varint(uint64_t i, char *buf)
{
	char *out = buf;

	if (i < 240) {
		*out = (unsigned char)i;
		return 1;
	}

	*out++ = (unsigned char)i | 240;
	i = (i - 240) >> 4;
	while (i >= 128) {
		*out++ = (unsigned char)i | 128;
		i = (i - 128) >> 7;
	}
	*out++ = (unsigned char)i;

	return (int)(out - buf);
}
488
/* Decode a variable-length integer starting at buf. end points one byte past
 * the last readable byte. On success, the value is stored in *i and the number
 * of bytes read is returned. -1 is returned when the end of the buffer is
 * reached before the end of the integer, or when the encoding is longer than
 * what a 64-bit value needs; *i may then hold a partial value. */
static int
decode_spoe_varint(const char *buf, const char *end, uint64_t *i)
{
	const unsigned char *msg = (const unsigned char *)buf;
	const unsigned char *stop = (const unsigned char *)end;
	unsigned int shift = 4; /* the first byte only carries 4 bits of payload */
	int idx = 0;

	/* end is exclusive: nothing can be read when msg == end */
	if (msg >= stop)
		return -1;

	*i = msg[0];
	if (msg[0] < 240)
		return 1;

	do {
		++idx;
		/* Stop on truncated input, and before shifting by 64 bits or
		 * more, which is undefined for a 64-bit operand */
		if (msg + idx >= stop || shift >= 64)
			return -1;
		*i += (uint64_t)msg[idx] << shift;
		shift += 7;
	} while (msg[idx] >= 128);

	return (idx + 1);
}
514
/* Encode a string into dst: its length, encoded as a variable-length integer,
 * followed by its len bytes (no terminating null byte is written). An empty
 * string is encoded as a single zero byte. This function never fails and
 * returns the number of bytes written; the caller must provide enough room. */
static int
encode_spoe_string(const char *str, size_t len, char *dst)
{
	int hdr;

	if (len == 0) {
		*dst = 0;
		return 1;
	}

	hdr = encode_spoe_varint(len, dst);
	memcpy(dst + hdr, str, len);
	return (hdr + len);
}
532
/* Decode a string located at buf. end points one byte past the last readable
 * byte. The length is decoded first, as a variable-length integer. If it
 * succeeds and the whole string fits before end, the beginning of the string
 * is saved in *str, its length in *len and the total number of bytes read is
 * returned. If an error occurred, -1 is returned and *str remains NULL (*len
 * may have been modified). The string is not copied nor null-terminated. */
static int
decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
{
	int hdr;

	*str = NULL;
	*len = 0;

	hdr = decode_spoe_varint(buf, end, len);
	if (hdr == -1)
		return -1;

	/* The length comes from the peer and can be any 64-bit value: compare
	 * it with the number of remaining bytes instead of adding it to a
	 * pointer, which could overflow and defeat the check */
	if (buf + hdr > end || *len > (uint64_t)(end - (buf + hdr)))
		return -1;

	*str = buf + hdr;
	return (hdr + *len);
}
558
559/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
560 * of bytes read is returned. A types data is composed of a type (1 byte) and
561 * corresponding data:
562 * - boolean: non additional data (0 bytes)
563 * - integers: a variable-length integer (see decode_spoe_varint)
564 * - ipv4: 4 bytes
565 * - ipv6: 16 bytes
566 * - binary and string: a buffer prefixed by its size, a variable-length
567 * integer (see decode_spoe_string) */
568static int
569skip_spoe_data(char *frame, char *end)
570{
571 uint64_t sz = 0;
572 int i, idx = 0;
573
574 if (frame > end)
575 return -1;
576
577 switch (frame[idx++] & SPOE_DATA_T_MASK) {
578 case SPOE_DATA_T_BOOL:
579 break;
580 case SPOE_DATA_T_INT32:
581 case SPOE_DATA_T_INT64:
582 case SPOE_DATA_T_UINT32:
583 case SPOE_DATA_T_UINT64:
584 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
585 return -1;
586 idx += i;
587 break;
588 case SPOE_DATA_T_IPV4:
589 idx += 4;
590 break;
591 case SPOE_DATA_T_IPV6:
592 idx += 16;
593 break;
594 case SPOE_DATA_T_STR:
595 case SPOE_DATA_T_BIN:
596 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
597 return -1;
598 idx += i + sz;
599 break;
600 }
601
602 if (frame+idx > end)
603 return -1;
604 return idx;
605}
606
607/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
608 * number of read bytes is returned. See skip_spoe_data for details. */
609static int
610decode_spoe_data(char *frame, char *end, struct sample *smp)
611{
612 uint64_t sz = 0;
613 int type, i, idx = 0;
614
615 if (frame > end)
616 return -1;
617
618 type = frame[idx++];
619 switch (type & SPOE_DATA_T_MASK) {
620 case SPOE_DATA_T_BOOL:
621 smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
622 smp->data.type = SMP_T_BOOL;
623 break;
624 case SPOE_DATA_T_INT32:
625 case SPOE_DATA_T_INT64:
626 case SPOE_DATA_T_UINT32:
627 case SPOE_DATA_T_UINT64:
628 if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1)
629 return -1;
630 idx += i;
631 smp->data.type = SMP_T_SINT;
632 break;
633 case SPOE_DATA_T_IPV4:
634 if (frame+idx+4 > end)
635 return -1;
636 memcpy(&smp->data.u.ipv4, frame+idx, 4);
637 smp->data.type = SMP_T_IPV4;
638 idx += 4;
639 break;
640 case SPOE_DATA_T_IPV6:
641 if (frame+idx+16 > end)
642 return -1;
643 memcpy(&smp->data.u.ipv6, frame+idx, 16);
644 smp->data.type = SMP_T_IPV6;
645 idx += 16;
646 break;
647 case SPOE_DATA_T_STR:
648 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
649 return -1;
650 idx += i;
651 if (frame+idx+sz > end)
652 return -1;
653 smp->data.u.str.str = frame+idx;
654 smp->data.u.str.len = sz;
655 smp->data.type = SMP_T_STR;
656 idx += sz;
657 break;
658 case SPOE_DATA_T_BIN:
659 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
660 return -1;
661 idx += i;
662 if (frame+idx+sz > end)
663 return -1;
664 smp->data.u.str.str = frame+idx;
665 smp->data.u.str.len = sz;
666 smp->data.type = SMP_T_BIN;
667 idx += sz;
668 break;
669 }
670
671 if (frame+idx > end)
672 return -1;
673 return idx;
674}
675
/* Skip an action in a frame received from an agent. If an error occurred, -1
 * is returned, otherwise the number of read bytes is returned. As read here,
 * an action is made of one byte for the action type, one byte for the number
 * of arguments, then that many typed data (see skip_spoe_data). */
static int
skip_spoe_action(char *frame, char *end)
{
	int nbargs, consumed, pos;

	/* The action type and the number of arguments must be readable */
	if (frame + 2 > end)
		return -1;

	nbargs = frame[1];
	pos = 2;
	for (; nbargs > 0; nbargs--) {
		consumed = skip_spoe_data(frame + pos, end);
		if (consumed == -1)
			return -1;
		pos += consumed;
	}

	return (frame + pos > end) ? -1 : pos;
}
699
700/* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on
701 * success, 0 if the frame can be ignored and -1 if an error occurred. */
702static int
703prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
704{
705 int idx = 0;
706 size_t max = (7 /* TYPE + METADATA */
707 + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
708 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
709 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL));
710
711 if (size < max)
712 return -1;
713
714 /* Frame type */
715 frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
716
717 /* No flags for now */
718 memset(frame+idx, 0, 4);
719 idx += 4;
720
721 /* No stream-id and frame-id for HELLO frames */
722 frame[idx++] = 0;
723 frame[idx++] = 0;
724
725 /* There are 3 mandatory items: "supported-versions", "max-frame-size"
726 * and "capabilities" */
727
728 /* "supported-versions" K/V item */
729 idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx);
730 frame[idx++] = SPOE_DATA_T_STR;
731 idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx);
732
733 /* "max-fram-size" K/V item */
734 idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
735 frame[idx++] = SPOE_DATA_T_UINT32;
736 idx += encode_spoe_varint(APPCTX_SPOE(appctx).max_frame_size, frame+idx);
737
738 /* "capabilities" K/V item */
739 idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
740 frame[idx++] = SPOE_DATA_T_STR;
741 idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
742
743 return idx;
744}
745
746/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame
747 * size on success, 0 if the frame can be ignored and -1 if an error
748 * occurred. */
749static int
750prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
751{
752 const char *reason;
753 int rlen, idx = 0;
754 size_t max = (7 /* TYPE + METADATA */
755 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
756 + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
757
758 if (size < max)
759 return -1;
760
761 /* Get the message corresponding to the status code */
762 if (spoe_status_code >= SPOE_FRM_ERRS)
763 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
764 reason = spoe_frm_err_reasons[spoe_status_code];
765 rlen = strlen(reason);
766
767 /* Frame type */
768 frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
769
770 /* No flags for now */
771 memset(frame+idx, 0, 4);
772 idx += 4;
773
774 /* No stream-id and frame-id for DISCONNECT frames */
775 frame[idx++] = 0;
776 frame[idx++] = 0;
777
778 /* There are 2 mandatory items: "status-code" and "message" */
779
780 /* "status-code" K/V item */
781 idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx);
782 frame[idx++] = SPOE_DATA_T_UINT32;
783 idx += encode_spoe_varint(spoe_status_code, frame+idx);
784
785 /* "message" K/V item */
786 idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx);
787 frame[idx++] = SPOE_DATA_T_STR;
788 idx += encode_spoe_string(reason, rlen, frame+idx);
789
790 return idx;
791}
792
793/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
794 * success, 0 if the frame can be ignored and -1 if an error occurred. */
795static int
796prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
797{
798 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
799 int idx = 0;
800
801 if (size < APPCTX_SPOE(appctx).max_frame_size)
802 return -1;
803
804 frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
805
806 /* No flags for now */
807 memset(frame+idx, 0, 4);
808 idx += 4;
809
810 /* Set stream-id and frame-id */
811 idx += encode_spoe_varint(ctx->stream_id, frame+idx);
812 idx += encode_spoe_varint(ctx->frame_id, frame+idx);
813
814 /* Copy encoded messages */
815 memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
816 idx += ctx->buffer->i;
817
818 return idx;
819}
820
821/* Decode HELLO frame sent by an agent. It returns the number of by read bytes
822 * on success, 0 if the frame can be ignored and -1 if an error occurred. */
823static int
824handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
825{
826 int vsn, max_frame_size;
827 int i, idx = 0;
828 size_t min_size = (7 /* TYPE + METADATA */
829 + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
830 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
831 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
832
833 /* Check frame type */
834 if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
835 return 0;
836
837 if (size < min_size) {
838 spoe_status_code = SPOE_FRM_ERR_INVALID;
839 return -1;
840 }
841
842 /* Skip flags: fragmentation is not supported for now */
843 idx += 4;
844
845 /* stream-id and frame-id must be cleared */
846 if (frame[idx] != 0 || frame[idx+1] != 0) {
847 spoe_status_code = SPOE_FRM_ERR_INVALID;
848 return -1;
849 }
850 idx += 2;
851
852 /* There are 3 mandatory items: "version", "max-frame-size" and
853 * "capabilities" */
854
855 /* Loop on K/V items */
856 vsn = max_frame_size = 0;
857 while (idx < size) {
858 char *str;
859 uint64_t sz;
860
861 /* Decode the item key */
862 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
863 if (str == NULL) {
864 spoe_status_code = SPOE_FRM_ERR_INVALID;
865 return -1;
866 }
867 /* Check "version" K/V item */
868 if (!memcmp(str, VERSION_KEY, sz)) {
869 /* The value must be a string */
870 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
871 spoe_status_code = SPOE_FRM_ERR_INVALID;
872 return -1;
873 }
874 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
875 if (str == NULL) {
876 spoe_status_code = SPOE_FRM_ERR_INVALID;
877 return -1;
878 }
879
880 vsn = decode_spoe_version(str, sz);
881 if (vsn == -1) {
882 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
883 return -1;
884 }
885 for (i = 0; supported_versions[i].str != NULL; ++i) {
886 if (vsn >= supported_versions[i].min &&
887 vsn <= supported_versions[i].max)
888 break;
889 }
890 if (supported_versions[i].str == NULL) {
891 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
892 return -1;
893 }
894 }
895 /* Check "max-frame-size" K/V item */
896 else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
897 int type;
898
899 /* The value must be integer */
900 type = frame[idx++];
901 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
902 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
903 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
904 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
905 spoe_status_code = SPOE_FRM_ERR_INVALID;
906 return -1;
907 }
908 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
909 spoe_status_code = SPOE_FRM_ERR_INVALID;
910 return -1;
911 }
912 idx += i;
913 if (sz < MIN_FRAME_SIZE || sz > APPCTX_SPOE(appctx).max_frame_size) {
914 spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
915 return -1;
916 }
917 max_frame_size = sz;
918 }
919 /* Skip "capabilities" K/V item for now */
920 else {
921 /* Silently ignore unknown item */
922 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
923 spoe_status_code = SPOE_FRM_ERR_INVALID;
924 return -1;
925 }
926 idx += i;
927 }
928 }
929
930 /* Final checks */
931 if (!vsn) {
932 spoe_status_code = SPOE_FRM_ERR_NO_VSN;
933 return -1;
934 }
935 if (!max_frame_size) {
936 spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
937 return -1;
938 }
939
940 APPCTX_SPOE(appctx).version = (unsigned int)vsn;
941 APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
942 return idx;
943}
944
945/* Decode DISCONNECT frame sent by an agent. It returns the number of by read
946 * bytes on success, 0 if the frame can be ignored and -1 if an error
947 * occurred. */
948static int
949handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
950{
951 int i, idx = 0;
952 size_t min_size = (7 /* TYPE + METADATA */
953 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
954 + 1 + SLEN(MSG_KEY) + 1 + 1);
955
956 /* Check frame type */
957 if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
958 return 0;
959
960 if (size < min_size) {
961 spoe_status_code = SPOE_FRM_ERR_INVALID;
962 return -1;
963 }
964
965 /* Skip flags: fragmentation is not supported for now */
966 idx += 4;
967
968 /* stream-id and frame-id must be cleared */
969 if (frame[idx] != 0 || frame[idx+1] != 0) {
970 spoe_status_code = SPOE_FRM_ERR_INVALID;
971 return -1;
972 }
973 idx += 2;
974
975 /* There are 2 mandatory items: "status-code" and "message" */
976
977 /* Loop on K/V items */
978 while (idx < size) {
979 char *str;
980 uint64_t sz;
981
982 /* Decode the item key */
983 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
984 if (str == NULL) {
985 spoe_status_code = SPOE_FRM_ERR_INVALID;
986 return -1;
987 }
988
989 /* Check "status-code" K/V item */
990 if (!memcmp(str, STATUS_CODE_KEY, sz)) {
991 int type;
992
993 /* The value must be an integer */
994 type = frame[idx++];
995 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
996 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
997 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
998 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
999 spoe_status_code = SPOE_FRM_ERR_INVALID;
1000 return -1;
1001 }
1002 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
1003 spoe_status_code = SPOE_FRM_ERR_INVALID;
1004 return -1;
1005 }
1006 idx += i;
1007 spoe_status_code = sz;
1008 }
1009
1010 /* Check "message" K/V item */
1011 else if (sz && !memcmp(str, MSG_KEY, sz)) {
1012 /* The value must be a string */
1013 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1014 spoe_status_code = SPOE_FRM_ERR_INVALID;
1015 return -1;
1016 }
1017 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1018 if (str == NULL || sz > 255) {
1019 spoe_status_code = SPOE_FRM_ERR_INVALID;
1020 return -1;
1021 }
1022 memcpy(spoe_reason, str, sz);
1023 spoe_reason[sz] = 0;
1024 }
1025 else {
1026 /* Silently ignore unknown item */
1027 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1028 spoe_status_code = SPOE_FRM_ERR_INVALID;
1029 return -1;
1030 }
1031 idx += i;
1032 }
1033 }
1034
1035 return idx;
1036}
1037
1038
1039/* Decode ACK frame sent by an agent. It returns the number of by read bytes on
1040 * success, 0 if the frame can be ignored and -1 if an error occurred. */
1041static int
1042handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
1043{
1044 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1045 uint64_t stream_id, frame_id;
1046 int idx = 0;
1047 size_t min_size = (7 /* TYPE + METADATA */);
1048
1049 /* Check frame type */
1050 if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
1051 return 0;
1052
1053 if (size < min_size) {
1054 spoe_status_code = SPOE_FRM_ERR_INVALID;
1055 return -1;
1056 }
1057
1058 /* Skip flags: fragmentation is not supported for now */
1059 idx += 4;
1060
1061 /* Get the stream-id and the frame-id */
1062 idx += decode_spoe_varint(frame+idx, frame+size, &stream_id);
1063 idx += decode_spoe_varint(frame+idx, frame+size, &frame_id);
1064
1065 /* Check stream-id and frame-id */
1066 if (ctx->stream_id != (unsigned int)stream_id ||
1067 ctx->frame_id != (unsigned int)frame_id)
1068 return 0;
1069
1070 /* Copy encoded actions */
1071 b_reset(ctx->buffer);
1072 memcpy(ctx->buffer->p, frame+idx, size-idx);
1073 ctx->buffer->i = size-idx;
1074
1075 return idx;
1076}
1077
1078
1079/********************************************************************
1080 * Functions that manage the SPOE applet
1081 ********************************************************************/
1082/* Callback function that catches applet timeouts. If a timeout occurred, we set
1083 * <appctx->st1> flag and the SPOE applet is woken up. */
1084static struct task *
1085process_spoe_applet(struct task * task)
1086{
1087 struct appctx *appctx = task->context;
1088
1089 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1090 if (tick_is_expired(task->expire, now_ms)) {
1091 task->expire = TICK_ETERNITY;
1092 appctx->st1 = SPOE_APPCTX_ERR_TOUT;
1093 }
1094 si_applet_want_get(appctx->owner);
1095 appctx_wakeup(appctx);
1096 return task;
1097}
1098
1099/* Remove a SPOE applet from the agent cache */
1100static void
1101remove_spoe_applet_from_cache(struct appctx *appctx)
1102{
1103 struct appctx *a, *back;
1104 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1105
1106 if (LIST_ISEMPTY(&agent->cache))
1107 return;
1108
1109 list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) {
1110 if (a == appctx) {
1111 LIST_DEL(&APPCTX_SPOE(appctx).list);
1112 break;
1113 }
1114 }
1115}
1116
1117
1118/* Callback function that releases a SPOE applet. This happens when the
1119 * connection with the agent is closed. */
1120static void
1121release_spoe_applet(struct appctx *appctx)
1122{
1123 struct stream_interface *si = appctx->owner;
1124 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1125 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1126
1127 if (appctx->st0 == SPOE_APPCTX_ST_CONNECT ||
1128 appctx->st0 == SPOE_APPCTX_ST_CONNECTING)
1129 on_new_spoe_appctx_failure(agent);
1130
1131 if (appctx->st0 != SPOE_APPCTX_ST_END) {
1132 si_shutw(si);
1133 si_shutr(si);
1134 si_ic(si)->flags |= CF_READ_NULL;
1135 appctx->st0 = SPOE_APPCTX_ST_END;
1136 }
1137
1138 if (ctx != NULL) {
1139 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1140 ctx->appctx = NULL;
1141 }
1142
1143 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
1144 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1145 __FUNCTION__, appctx);
1146
1147 /* Release the task attached to the SPOE applet */
1148 if (APPCTX_SPOE(appctx).task) {
1149 task_delete(APPCTX_SPOE(appctx).task);
1150 task_free(APPCTX_SPOE(appctx).task);
1151 }
1152
1153 /* And remove it from the agent cache */
1154 remove_spoe_applet_from_cache(appctx);
1155 APPCTX_SPOE(appctx).ctx = NULL;
1156}
1157
1158/* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when
1159 * the frame can be ignored, 0 to retry later and 1 on success. The frame is
1160 * encoded using the callback function <prepare>. */
1161static int
1162send_spoe_frame(struct appctx *appctx,
1163 int (*prepare)(struct appctx *, char *, size_t))
1164{
1165 struct stream_interface *si = appctx->owner;
1166 int framesz, ret;
1167 uint32_t netint;
1168
1169 ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
1170 if (ret <= 0)
1171 goto skip_or_error;
1172 framesz = ret;
1173 netint = htonl(framesz);
1174 ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint));
1175 if (ret > 0)
1176 ret = bi_putblk(si_ic(si), trash.str, framesz);
1177 if (ret <= 0) {
1178 if (ret == -1)
1179 return -1;
1180 return -2;
1181 }
1182 return 1;
1183
1184 skip_or_error:
1185 if (!ret)
1186 return -1;
1187 return -2;
1188}
1189
1190/* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1
1191 * when the frame can be ignored, 0 to retry later and 1 on success. The frame
1192 * is decoded using the callback function <handle>. */
1193static int
1194recv_spoe_frame(struct appctx *appctx,
1195 int (*handle)(struct appctx *, char *, size_t))
1196{
1197 struct stream_interface *si = appctx->owner;
1198 int framesz, ret;
1199 uint32_t netint;
1200
1201 ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0);
1202 if (ret <= 0)
1203 goto empty_or_error;
1204 framesz = ntohl(netint);
1205 if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
1206 spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
1207 return -2;
1208 }
1209
1210 ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint));
1211 if (ret <= 0)
1212 goto empty_or_error;
1213 bo_skip(si_oc(si), ret+sizeof(netint));
1214
1215 /* First check if the received frame is a DISCONNECT frame */
1216 ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz);
1217 if (ret != 0) {
1218 if (ret > 0) {
1219 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1220 " - disconnected by peer (%d): %s\n",
1221 (int)now.tv_sec, (int)now.tv_usec,
1222 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1223 __FUNCTION__, appctx, spoe_status_code,
1224 spoe_reason);
1225 return 2;
1226 }
1227 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1228 " - error on frame (%s)\n",
1229 (int)now.tv_sec, (int)now.tv_usec,
1230 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1231 __FUNCTION__, appctx,
1232 spoe_frm_err_reasons[spoe_status_code]);
1233 return -2;
1234 }
1235 if (handle == NULL)
1236 goto out;
1237
1238 /* If not, try to decode it */
1239 ret = handle(appctx, trash.str, framesz);
1240 if (ret <= 0) {
1241 if (!ret)
1242 return -1;
1243 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1244 " - error on frame (%s)\n",
1245 (int)now.tv_sec, (int)now.tv_usec,
1246 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1247 __FUNCTION__, appctx,
1248 spoe_frm_err_reasons[spoe_status_code]);
1249 return -2;
1250 }
1251 out:
1252 return 1;
1253
1254 empty_or_error:
1255 if (!ret)
1256 return 0;
1257 spoe_status_code = SPOE_FRM_ERR_IO;
1258 return -2;
1259}
1260
/* I/O Handler processing messages exchanged with the agent. It is a state
 * machine driven by <appctx->st0>:
 *   - CONNECT:       wait for the connection, then send the HELLO frame;
 *   - CONNECTING:    wait for the agent's HELLO reply;
 *   - PROCESSING:    send NOTIFY frames for the attached SPOE context and
 *                    wait for the matching ACK, or just drain incoming frames
 *                    when there is nothing to do;
 *   - DISCONNECT:    send a DISCONNECT frame;
 *   - DISCONNECTING: wait for the agent's answer to the DISCONNECT frame;
 *   - EXIT:          shut the stream interface down and move to END;
 *   - END:           nothing more to do.
 * <appctx->st1> carries the timeout flag set by the applet's task. The SPOE
 * context <ctx> is read once on entry. */
static void
handle_spoe_applet(struct appctx *appctx)
{
	struct stream_interface *si    = appctx->owner;
	struct stream           *s     = si_strm(si);
	struct spoe_agent       *agent = APPCTX_SPOE(appctx).agent;
	struct spoe_context     *ctx   = APPCTX_SPOE(appctx).ctx;
	int                      ret;

 switchstate:
	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
		    " - appctx-state=%s\n",
		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
		    __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);

	switch (appctx->st0) {
		case SPOE_APPCTX_ST_CONNECT:
			spoe_status_code = SPOE_FRM_ERR_NONE;
			/* Connection not established yet: ask to be called
			 * again and let the stream make progress */
			if (si->state <= SI_ST_CON) {
				si_applet_want_put(si);
				task_wakeup(s->task, TASK_WOKEN_MSG);
				break;
			}
			/* Any state other than "established" is a failure */
			else if (si->state != SI_ST_EST) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame);
			if (ret < 0) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			else if (!ret)
				goto full;

			/* Hello frame was sent. Set the hello timeout and
			 * wait for the reply. */
			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
			appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
			/* fall through */

		case SPOE_APPCTX_ST_CONNECTING:
			/* One side of the stream is closed: give up */
			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			/* The hello timeout fired */
			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
				SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
					    " - Connection timed out\n",
					    (int)now.tv_sec, (int)now.tv_usec,
					    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
					    __FUNCTION__, appctx);
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame);
			/* Error or ignorable frame: the handshake failed, tell
			 * it to the agent with a DISCONNECT frame */
			if (ret < 0) {
				appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			/* The agent sent a DISCONNECT frame */
			if (ret == 2) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				on_new_spoe_appctx_failure(agent);
				goto switchstate;
			}
			/* Nothing to read yet */
			if (!ret)
				goto out;

			/* hello handshake is finished, set the idle timeout,
			 * Add the appctx in the agent cache, decrease the
			 * number of new applets and wake up waiting streams. */
			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
			on_new_spoe_appctx_success(agent, appctx);
			break;

		case SPOE_APPCTX_ST_PROCESSING:
			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			/* Timeout: report it in the DISCONNECT frame and clear
			 * the flag */
			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
				spoe_status_code = SPOE_FRM_ERR_TOUT;
				appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
				appctx->st1  = SPOE_APPCTX_ERR_NONE;
				goto switchstate;
			}
			/* The attached SPOE context has messages to send */
			if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
				ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame);
				if (ret < 0) {
					/* -1: only this frame failed, the
					 * context is flagged in error but the
					 * applet stays usable */
					if (ret == -1) {
						ctx->state = SPOE_CTX_ST_ERROR;
						task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
						goto skip_notify_frame;
					}
					appctx->st0 = SPOE_APPCTX_ST_EXIT;
					goto switchstate;
				}
				else if (!ret)
					goto full;
				/* NOTIFY frame sent: arm the ack timeout */
				ctx->state = SPOE_CTX_ST_WAITING_ACK;
				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.ack);
			}

		  skip_notify_frame:
			/* The attached SPOE context waits for an ACK frame */
			if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) {
				ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame);
				if (ret < 0) {
					/* Ignorable frame: try the next one */
					if (ret == -1)
						goto skip_notify_frame;
					ctx->state = SPOE_CTX_ST_ERROR;
					task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
					goto switchstate;
				}
				if (!ret)
					goto out;
				/* The agent sent a DISCONNECT frame */
				if (ret == 2) {
					ctx->state = SPOE_CTX_ST_ERROR;
					task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
					appctx->st0 = SPOE_APPCTX_ST_EXIT;
					goto switchstate;
				}
				/* ACK received: let the stream process the
				 * actions and go back to the idle timeout */
				ctx->state = SPOE_CTX_ST_DONE;
				task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			}
			else {
				/* Nothing is expected from the agent. On soft
				 * stop, close the connection. */
				if (stopping) {
					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
					goto switchstate;
				}

				/* Otherwise consume whatever the agent sends
				 * (no decoding callback is given) */
				ret = recv_spoe_frame(appctx, NULL);
				if (ret < 0) {
					if (ret == -1)
						goto skip_notify_frame;
					appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
					goto switchstate;
				}
				if (!ret)
					goto out;
				if (ret == 2) {
					appctx->st0 = SPOE_APPCTX_ST_EXIT;
					goto switchstate;
				}
				APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
			}
			break;

		case SPOE_APPCTX_ST_DISCONNECT:
			ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame);
			if (ret < 0) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			else if (!ret)
				goto full;
			SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
				    " - disconnected by HAProxy (%d): %s\n",
				    (int)now.tv_sec, (int)now.tv_usec,
				    ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
				    __FUNCTION__, appctx, spoe_status_code,
				    spoe_frm_err_reasons[spoe_status_code]);

			/* DISCONNECT frame sent: the ack timeout bounds the
			 * wait for the agent's answer */
			APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.ack);
			appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
			/* fall through */

		case SPOE_APPCTX_ST_DISCONNECTING:
			if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			/* Exit on error or when the agent's DISCONNECT frame
			 * is received; any other frame is dropped */
			ret = recv_spoe_frame(appctx, NULL);
			if (ret < 0 || ret == 2) {
				appctx->st0 = SPOE_APPCTX_ST_EXIT;
				goto switchstate;
			}
			break;

		case SPOE_APPCTX_ST_EXIT:
			/* Close both directions and disarm the timeout */
			si_shutw(si);
			si_shutr(si);
			si_ic(si)->flags |= CF_READ_NULL;
			appctx->st0 = SPOE_APPCTX_ST_END;
			APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
			/* fall through */

		case SPOE_APPCTX_ST_END:
			break;
	}

 out:
	/* Requeue the timeout task if a timeout is armed, then wake the
	 * applet's stream up */
	if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
		task_queue(APPCTX_SPOE(appctx).task);
	si_oc(si)->flags |= CF_READ_DONTWAIT;
	task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
	return;
 full:
	/* A frame could not be pushed (send_spoe_frame() returned 0) */
	si_applet_cant_put(si);
	goto out;
}
1474
/* Definition of the SPOE applet: its I/O handler is handle_spoe_applet() and
 * its release callback is release_spoe_applet(). */
struct applet spoe_applet = {
	.obj_type = OBJ_TYPE_APPLET,
	.name = "<SPOE>", /* used for logging */
	.fct = handle_spoe_applet,
	.release = release_spoe_applet,
};
1481
1482/* Create a SPOE applet. On success, the created applet is returned, else
1483 * NULL. */
1484static struct appctx *
1485create_spoe_appctx(struct spoe_config *conf)
1486{
1487 struct appctx *appctx;
1488 struct session *sess;
1489 struct task *task;
1490 struct stream *strm;
1491 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
1492 struct listener *, by_fe);
1493
1494 if ((appctx = appctx_new(&spoe_applet)) == NULL)
1495 goto out_error;
1496
1497 appctx->st0 = SPOE_APPCTX_ST_CONNECT;
1498 if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
1499 goto out_free_appctx;
1500 APPCTX_SPOE(appctx).task->process = process_spoe_applet;
1501 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1502 APPCTX_SPOE(appctx).task->context = appctx;
1503 APPCTX_SPOE(appctx).agent = conf->agent;
1504 APPCTX_SPOE(appctx).ctx = NULL;
1505 APPCTX_SPOE(appctx).version = 0;
1506 APPCTX_SPOE(appctx).max_frame_size = global.tune.bufsize;
1507 task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
1508
1509 sess = session_new(&conf->agent_fe, l, &appctx->obj_type);
1510 if (!sess)
1511 goto out_free_spoe;
1512
1513 if ((task = task_new()) == NULL)
1514 goto out_free_sess;
1515
1516 if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
1517 goto out_free_task;
1518
1519 strm->target = sess->listener->default_target;
1520 strm->req.analysers |= sess->listener->analysers;
1521 stream_set_backend(strm, conf->agent->b.be);
1522
1523 /* applet is waiting for data */
1524 si_applet_cant_get(&strm->si[0]);
1525 appctx_wakeup(appctx);
1526
1527 /* Increase the number of applets waiting the end of the hello
1528 * handshake. */
1529 conf->agent->new_applets++;
1530
1531 strm->do_log = NULL;
1532 strm->res.flags |= CF_READ_DONTWAIT;
1533
1534 conf->agent_fe.feconn++;
1535 jobs++;
1536 totalconn++;
1537
1538 return appctx;
1539
1540 /* Error unrolling */
1541 out_free_task:
1542 task_free(task);
1543 out_free_sess:
1544 session_free(sess);
1545 out_free_spoe:
1546 task_free(APPCTX_SPOE(appctx).task);
1547 out_free_appctx:
1548 appctx_free(appctx);
1549 out_error:
1550 return NULL;
1551}
1552
1553/* Wake up a SPOE applet attached to a SPOE context. */
1554static void
1555wakeup_spoe_appctx(struct spoe_context *ctx)
1556{
1557 if (ctx->appctx == NULL)
1558 return;
1559 if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) {
1560 si_applet_want_get(ctx->appctx->owner);
1561 si_applet_want_put(ctx->appctx->owner);
1562 appctx_wakeup(ctx->appctx);
1563 }
1564}
1565
1566
1567/* Run across the list of pending streams waiting for a SPOE applet and wake the
1568 * first. */
1569static void
1570offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx)
1571{
1572 struct spoe_context *ctx;
1573
1574 if (LIST_ISEMPTY(&agent->applet_wq))
1575 LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
1576 else {
1577 ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait);
1578 APPCTX_SPOE(appctx).ctx = ctx;
1579 ctx->appctx = appctx;
1580 LIST_DEL(&ctx->applet_wait);
1581 LIST_INIT(&ctx->applet_wait);
1582 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1583 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1584 " - wake up stream to get available SPOE applet\n",
1585 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1586 __FUNCTION__, ctx->strm);
1587 }
1588}
1589
1590/* A failure occurred during SPOE applet creation. */
1591static void
1592on_new_spoe_appctx_failure(struct spoe_agent *agent)
1593{
1594 struct spoe_context *ctx;
1595
1596 agent->new_applets--;
1597 list_for_each_entry(ctx, &agent->applet_wq, applet_wait) {
1598 ctx->errs++;
1599 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1600 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1601 " - wake up stream because to SPOE applet connection failed\n",
1602 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1603 __FUNCTION__, ctx->strm);
1604 }
1605}
1606
/* The creation of a SPOE applet succeeded: the number of applets waiting the
 * end of the hello handshake is decreased for <agent>, then <appctx> is offered
 * to the waiting streams (see offer_spoe_appctx()). */
static void
on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx)
{
	agent->new_applets--;
	offer_spoe_appctx(agent, appctx);
}
1613/* Retrieve a SPOE applet from the agent cache if possible, else create it. It
1614 * returns 1 on success, 0 to retry later and -1 if an error occurred. */
1615static int
1616acquire_spoe_appctx(struct spoe_context *ctx, int dir)
1617{
1618 struct spoe_config *conf = FLT_CONF(ctx->filter);
1619 struct spoe_agent *agent = conf->agent;
1620 struct appctx *appctx;
1621
1622 /* If a process is already started for this SPOE context, retry
1623 * later. */
1624 if (ctx->flags & SPOE_CTX_FL_PROCESS)
1625 goto wait;
1626
1627 /* If needed, initialize the buffer that will be used to encode messages
1628 * and decode actions. */
1629 if (ctx->buffer == &buf_empty) {
1630 if (!LIST_ISEMPTY(&ctx->buffer_wait)) {
1631 LIST_DEL(&ctx->buffer_wait);
1632 LIST_INIT(&ctx->buffer_wait);
1633 }
1634
1635 if (!b_alloc_margin(&ctx->buffer, 0)) {
1636 LIST_ADDQ(&buffer_wq, &ctx->buffer_wait);
1637 goto wait;
1638 }
1639 }
1640
1641 /* If the SPOE applet was already set, all is done. */
1642 if (ctx->appctx)
1643 goto success;
1644
1645 /* Else try to retrieve it from the agent cache */
1646 if (!LIST_ISEMPTY(&agent->cache)) {
1647 appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list);
1648 LIST_DEL(&APPCTX_SPOE(appctx).list);
1649 APPCTX_SPOE(appctx).ctx = ctx;
1650 ctx->appctx = appctx;
1651 goto success;
1652 }
1653
1654 /* If there is no server up for the agent's backend or it too many
1655 * failure occurred, this is an error. */
1656 if ((!agent->b.be->srv_act && !agent->b.be->srv_bck) ||
1657 ctx->errs >= MAX_NEW_SPOE_APPLET_ERRS)
1658 goto error;
1659
1660 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1661 " - waiting for available SPOE appctx\n",
1662 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
1663 ctx->strm);
1664
1665 /* Else add the stream in the waiting queue. */
1666 if (LIST_ISEMPTY(&ctx->applet_wait))
1667 LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait);
1668
1669 /* Finally, create new SPOE applet if we can */
1670 if (agent->new_applets < MAX_NEW_SPOE_APPLETS) {
1671 if (create_spoe_appctx(conf) == NULL)
1672 goto error;
1673 }
1674
1675 wait:
1676 return 0;
1677
1678 success:
1679 /* Remove the stream from the waiting queue */
1680 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1681 LIST_DEL(&ctx->applet_wait);
1682 LIST_INIT(&ctx->applet_wait);
1683 }
1684
1685 /* Set the right flag to prevent request and response processing
1686 * in same time. */
1687 ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
1688 ? SPOE_CTX_FL_REQ_PROCESS
1689 : SPOE_CTX_FL_RSP_PROCESS);
1690 ctx->errs = 0;
1691
1692 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1693 " - acquire SPOE appctx %p from cache\n",
1694 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1695 __FUNCTION__, ctx->strm, ctx->appctx);
1696 return 1;
1697
1698 error:
1699 /* Remove the stream from the waiting queue */
1700 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1701 LIST_DEL(&ctx->applet_wait);
1702 LIST_INIT(&ctx->applet_wait);
1703 }
1704
1705 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1706 " - failed to acquire SPOE appctx errs=%u\n",
1707 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1708 __FUNCTION__, ctx->strm, ctx->errs);
1709 send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n");
1710
1711 return -1;
1712}
1713
1714/* Release a SPOE applet and push it in the agent cache. */
1715static void
1716release_spoe_appctx(struct spoe_context *ctx)
1717{
1718 struct spoe_config *conf = FLT_CONF(ctx->filter);
1719 struct spoe_agent *agent = conf->agent;
1720 struct appctx *appctx = ctx->appctx;
1721
1722 /* Reset the flag to allow next processing */
1723 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
1724
1725 /* Release the buffer if needed */
1726 if (ctx->buffer != &buf_empty) {
1727 b_free(&ctx->buffer);
1728 if (!LIST_ISEMPTY(&buffer_wq))
1729 stream_offer_buffers();
1730 }
1731
1732 /* If there is no SPOE applet, all is done */
1733 if (!appctx)
1734 return;
1735
1736 /* Else, reassign it or push it in the agent cache */
1737 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1738 " - release SPOE appctx %p\n",
1739 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1740 __FUNCTION__, ctx->strm, appctx);
1741
1742 APPCTX_SPOE(appctx).ctx = NULL;
1743 ctx->appctx = NULL;
1744 offer_spoe_appctx(agent, appctx);
1745}
1746
1747/***************************************************************************
1748 * Functions that process SPOE messages and actions
1749 **************************************************************************/
1750/* Process SPOE messages for a specific event. During the processing, it returns
1751 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1752 * is returned. */
1753static int
1754process_spoe_messages(struct stream *s, struct spoe_context *ctx,
1755 struct list *messages, int dir)
1756{
1757 struct spoe_message *msg;
1758 struct sample *smp;
1759 struct spoe_arg *arg;
1760 char *p;
1761 size_t max_size;
1762 int off, flag, idx = 0;
1763
1764 /* Reserve 32 bytes from the frame Metadata */
1765 max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32;
1766
1767 b_reset(ctx->buffer);
1768 p = ctx->buffer->p;
1769
1770 /* Loop on messages */
1771 list_for_each_entry(msg, messages, list) {
1772 if (idx + msg->id_len + 1 > max_size)
1773 goto skip;
1774
1775 /* Set the message name */
1776 idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
1777
1778 /* Save offset where to store the number of arguments for this
1779 * message */
1780 off = idx++;
1781 p[off] = 0;
1782
1783 /* Loop on arguments */
1784 list_for_each_entry(arg, &msg->args, list) {
1785 p[off]++; /* Increment the number of arguments */
1786
1787 if (idx + arg->name_len + 1 > max_size)
1788 goto skip;
1789
1790 /* Encode the arguement name as a string. It can by NULL */
1791 idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
1792
1793 /* Fetch the arguement value */
1794 smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
1795 if (!smp) {
1796 /* If no value is available, set it to NULL */
1797 p[idx++] = SPOE_DATA_T_NULL;
1798 continue;
1799 }
1800
1801 /* Else, encode the arguement value */
1802 switch (smp->data.type) {
1803 case SMP_T_BOOL:
1804 flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
1805 p[idx++] = (SPOE_DATA_T_BOOL | flag);
1806 break;
1807 case SMP_T_SINT:
1808 p[idx++] = SPOE_DATA_T_INT64;
1809 if (idx + 8 > max_size)
1810 goto skip;
1811 idx += encode_spoe_varint(smp->data.u.sint, p+idx);
1812 break;
1813 case SMP_T_IPV4:
1814 p[idx++] = SPOE_DATA_T_IPV4;
1815 if (idx + 4 > max_size)
1816 goto skip;
1817 memcpy(p+idx, &smp->data.u.ipv4, 4);
1818 idx += 4;
1819 break;
1820 case SMP_T_IPV6:
1821 p[idx++] = SPOE_DATA_T_IPV6;
1822 if (idx + 16 > max_size)
1823 goto skip;
1824 memcpy(p+idx, &smp->data.u.ipv6, 16);
1825 idx += 16;
1826 break;
1827 case SMP_T_STR:
1828 p[idx++] = SPOE_DATA_T_STR;
1829 if (idx + smp->data.u.str.len > max_size)
1830 goto skip;
1831 idx += encode_spoe_string(smp->data.u.str.str,
1832 smp->data.u.str.len,
1833 p+idx);
1834 break;
1835 case SMP_T_BIN:
1836 p[idx++] = SPOE_DATA_T_BIN;
1837 if (idx + smp->data.u.str.len > max_size)
1838 goto skip;
1839 idx += encode_spoe_string(smp->data.u.str.str,
1840 smp->data.u.str.len,
1841 p+idx);
1842 break;
1843 case SMP_T_METH:
1844 if (smp->data.u.meth.meth == HTTP_METH_OTHER) {
1845 p[idx++] = SPOE_DATA_T_STR;
1846 if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size)
1847 goto skip;
1848 idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name,
1849 http_known_methods[smp->data.u.meth.meth].len,
1850 p+idx);
1851 }
1852 else {
1853 p[idx++] = SPOE_DATA_T_STR;
1854 if (idx + smp->data.u.str.len > max_size)
1855 goto skip;
1856 idx += encode_spoe_string(smp->data.u.meth.str.str,
1857 smp->data.u.meth.str.len,
1858 p+idx);
1859 }
1860 break;
1861 default:
1862 p[idx++] = SPOE_DATA_T_NULL;
1863 }
1864 }
1865 }
1866 ctx->buffer->i = idx;
1867 return 1;
1868
1869 skip:
1870 b_reset(ctx->buffer);
1871 return 0;
1872}
1873
1874/* Helper function to set a variable */
1875static void
1876set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1877 struct sample *smp)
1878{
1879 struct spoe_config *conf = FLT_CONF(ctx->filter);
1880 struct spoe_agent *agent = conf->agent;
1881 char varname[64];
1882
1883 memset(varname, 0, sizeof(varname));
1884 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1885 scope, agent->var_pfx, len, name);
1886 vars_set_by_name_ifexist(varname, len, smp);
1887}
1888
1889/* Helper function to unset a variable */
1890static void
1891unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1892 struct sample *smp)
1893{
1894 struct spoe_config *conf = FLT_CONF(ctx->filter);
1895 struct spoe_agent *agent = conf->agent;
1896 char varname[64];
1897
1898 memset(varname, 0, sizeof(varname));
1899 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1900 scope, agent->var_pfx, len, name);
1901 vars_unset_by_name_ifexist(varname, len, smp);
1902}
1903
1904
1905/* Process SPOE actions for a specific event. During the processing, it returns
1906 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1907 * is returned. */
1908static int
1909process_spoe_actions(struct stream *s, struct spoe_context *ctx,
1910 enum spoe_event ev, int dir)
1911{
1912 char *p;
1913 size_t size;
1914 int off, i, idx = 0;
1915
1916 p = ctx->buffer->p;
1917 size = ctx->buffer->i;
1918
1919 while (idx < size) {
1920 char *str;
1921 uint64_t sz;
1922 struct sample smp;
1923 enum spoe_action_type type;
1924
1925 off = idx;
1926 if (idx+2 > size)
1927 goto skip;
1928
1929 type = p[idx++];
1930 switch (type) {
1931 case SPOE_ACT_T_SET_VAR: {
1932 char *scope;
1933
1934 if (p[idx++] != 3)
1935 goto skip_action;
1936
1937 switch (p[idx++]) {
1938 case SPOE_SCOPE_PROC: scope = "proc"; break;
1939 case SPOE_SCOPE_SESS: scope = "sess"; break;
1940 case SPOE_SCOPE_TXN : scope = "txn"; break;
1941 case SPOE_SCOPE_REQ : scope = "req"; break;
1942 case SPOE_SCOPE_RES : scope = "res"; break;
1943 default: goto skip;
1944 }
1945
1946 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
1947 if (str == NULL)
1948 goto skip;
1949 memset(&smp, 0, sizeof(smp));
1950 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
1951 if (decode_spoe_data(p+idx, p+size, &smp) == -1)
1952 goto skip;
1953
1954 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1955 " - set-var '%s.%s.%.*s'\n",
1956 (int)now.tv_sec, (int)now.tv_usec,
1957 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
1958 __FUNCTION__, s, scope,
1959 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
1960 (int)sz, str);
1961
1962 set_spoe_var(ctx, scope, str, sz, &smp);
1963 break;
1964 }
1965
1966 case SPOE_ACT_T_UNSET_VAR: {
1967 char *scope;
1968
1969 if (p[idx++] != 2)
1970 goto skip_action;
1971
1972 switch (p[idx++]) {
1973 case SPOE_SCOPE_PROC: scope = "proc"; break;
1974 case SPOE_SCOPE_SESS: scope = "sess"; break;
1975 case SPOE_SCOPE_TXN : scope = "txn"; break;
1976 case SPOE_SCOPE_REQ : scope = "req"; break;
1977 case SPOE_SCOPE_RES : scope = "res"; break;
1978 default: goto skip;
1979 }
1980
1981 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
1982 if (str == NULL)
1983 goto skip;
1984 memset(&smp, 0, sizeof(smp));
1985 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
1986
1987 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1988 " - unset-var '%s.%s.%.*s'\n",
1989 (int)now.tv_sec, (int)now.tv_usec,
1990 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
1991 __FUNCTION__, s, scope,
1992 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
1993 (int)sz, str);
1994
1995 unset_spoe_var(ctx, scope, str, sz, &smp);
1996 break;
1997 }
1998
1999 default:
2000 skip_action:
2001 if ((i = skip_spoe_action(p+off, p+size)) == -1)
2002 goto skip;
2003 idx += i;
2004 }
2005 }
2006
2007 return 1;
2008 skip:
2009 return 0;
2010}
2011
2012
2013/* Process a SPOE event. First, this functions will process messages attached to
2014 * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
2015 * ACK frame to process corresponding actions. During all the processing, it
2016 * returns 0 and it returns 1 when the processing is finished. If an error
2017 * occurred, -1 is returned. */
2018static int
2019process_spoe_event(struct stream *s, struct spoe_context *ctx,
2020 enum spoe_event ev)
2021{
2022 int dir, ret = 1;
2023
2024 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2025 " - ctx-state=%s - event=%s\n",
2026 (int)now.tv_sec, (int)now.tv_usec,
2027 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2028 __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2029 spoe_event_str[ev]);
2030
2031 dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
2032
2033 if (LIST_ISEMPTY(&(ctx->messages[ev])))
2034 goto out;
2035
2036 if (ctx->state == SPOE_CTX_ST_ERROR)
2037 goto error;
2038
2039 if (ctx->state == SPOE_CTX_ST_READY) {
2040 ret = acquire_spoe_appctx(ctx, dir);
2041 if (ret <= 0) {
2042 if (!ret)
2043 goto out;
2044 goto error;
2045 }
2046 ctx->state = SPOE_CTX_ST_SENDING_MSGS;
2047 }
2048
2049 if (ctx->appctx == NULL)
2050 goto error;
2051
2052 if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
2053 ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
2054 if (ret <= 0) {
2055 if (!ret)
2056 goto skip;
2057 goto error;
2058 }
2059 wakeup_spoe_appctx(ctx);
2060 ret = 0;
2061 goto out;
2062 }
2063
2064 if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
2065 wakeup_spoe_appctx(ctx);
2066 ret = 0;
2067 goto out;
2068 }
2069
2070 if (ctx->state == SPOE_CTX_ST_DONE) {
2071 ret = process_spoe_actions(s, ctx, ev, dir);
2072 if (ret <= 0) {
2073 if (!ret)
2074 goto skip;
2075 goto error;
2076 }
2077 ctx->frame_id++;
2078 release_spoe_appctx(ctx);
2079 ctx->state = SPOE_CTX_ST_READY;
2080 }
2081
2082 out:
2083 return ret;
2084
2085 skip:
2086 release_spoe_appctx(ctx);
2087 ctx->state = SPOE_CTX_ST_READY;
2088 return 1;
2089
2090 error:
2091 release_spoe_appctx(ctx);
2092 ctx->state = SPOE_CTX_ST_ERROR;
2093 return 1;
2094}
2095
2096
2097/***************************************************************************
2098 * Functions that create/destroy SPOE contexts
2099 **************************************************************************/
2100static struct spoe_context *
2101create_spoe_context(struct filter *filter)
2102{
2103 struct spoe_config *conf = FLT_CONF(filter);
2104 struct spoe_context *ctx;
2105
2106 ctx = pool_alloc_dirty(pool2_spoe_ctx);
2107 if (ctx == NULL) {
2108 return NULL;
2109 }
2110 memset(ctx, 0, sizeof(*ctx));
2111 ctx->filter = filter;
2112 ctx->state = SPOE_CTX_ST_NONE;
2113 ctx->flags = 0;
2114 ctx->errs = 0;
2115 ctx->messages = conf->agent->messages;
2116 ctx->buffer = &buf_empty;
2117 LIST_INIT(&ctx->buffer_wait);
2118 LIST_INIT(&ctx->applet_wait);
2119
2120 ctx->stream_id = 0;
2121 ctx->frame_id = 1;
2122
2123 return ctx;
2124}
2125
2126static void
2127destroy_spoe_context(struct spoe_context *ctx)
2128{
2129 if (!ctx)
2130 return;
2131
2132 if (ctx->appctx)
2133 APPCTX_SPOE(ctx->appctx).ctx = NULL;
2134 if (!LIST_ISEMPTY(&ctx->buffer_wait))
2135 LIST_DEL(&ctx->buffer_wait);
2136 if (!LIST_ISEMPTY(&ctx->applet_wait))
2137 LIST_DEL(&ctx->applet_wait);
2138 pool_free2(pool2_spoe_ctx, ctx);
2139}
2140
2141static void
2142reset_spoe_context(struct spoe_context *ctx)
2143{
2144 ctx->state = SPOE_CTX_ST_READY;
2145 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2146}
2147
2148
2149/***************************************************************************
2150 * Hooks that manage the filter lifecycle (init/check/deinit)
2151 **************************************************************************/
2152/* Signal handler: Do a soft stop, wakeup SPOE applet */
2153static void
2154sig_stop_spoe(struct sig_handler *sh)
2155{
2156 struct proxy *p;
2157
2158 p = proxy;
2159 while (p) {
2160 struct flt_conf *fconf;
2161
2162 list_for_each_entry(fconf, &p->filter_configs, list) {
2163 struct spoe_config *conf = fconf->conf;
2164 struct spoe_agent *agent = conf->agent;
2165 struct appctx *appctx;
2166
2167 list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) {
2168 si_applet_want_get(appctx->owner);
2169 si_applet_want_put(appctx->owner);
2170 appctx_wakeup(appctx);
2171 }
2172 }
2173 p = p->next;
2174 }
2175}
2176
2177
2178/* Initialize the SPOE filter. Returns -1 on error, else 0. */
2179static int
2180spoe_init(struct proxy *px, struct flt_conf *fconf)
2181{
2182 struct spoe_config *conf = fconf->conf;
2183 struct listener *l;
2184
2185 memset(&conf->agent_fe, 0, sizeof(conf->agent_fe));
2186 init_new_proxy(&conf->agent_fe);
2187 conf->agent_fe.parent = conf->agent;
2188 conf->agent_fe.last_change = now.tv_sec;
2189 conf->agent_fe.id = conf->agent->id;
2190 conf->agent_fe.cap = PR_CAP_FE;
2191 conf->agent_fe.mode = PR_MODE_TCP;
2192 conf->agent_fe.maxconn = 0;
2193 conf->agent_fe.options2 |= PR_O2_INDEPSTR;
2194 conf->agent_fe.conn_retries = CONN_RETRIES;
2195 conf->agent_fe.accept = frontend_accept;
2196 conf->agent_fe.srv = NULL;
2197 conf->agent_fe.timeout.client = TICK_ETERNITY;
2198 conf->agent_fe.default_target = &spoe_applet.obj_type;
2199 conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
2200
2201 if ((l = calloc(1, sizeof(*l))) == NULL) {
2202 Alert("spoe_init : out of memory.\n");
2203 goto out_error;
2204 }
2205 l->obj_type = OBJ_TYPE_LISTENER;
2206 l->obj_type = OBJ_TYPE_LISTENER;
2207 l->frontend = &conf->agent_fe;
2208 l->state = LI_READY;
2209 l->analysers = conf->agent_fe.fe_req_ana;
2210 LIST_ADDQ(&conf->agent_fe.conf.listeners, &l->by_fe);
2211
2212 if (!sighandler_registered) {
2213 signal_register_fct(0, sig_stop_spoe, 0);
2214 sighandler_registered = 1;
2215 }
2216
2217 return 0;
2218
2219 out_error:
2220 return -1;
2221}
2222
2223/* Free ressources allocated by the SPOE filter. */
2224static void
2225spoe_deinit(struct proxy *px, struct flt_conf *fconf)
2226{
2227 struct spoe_config *conf = fconf->conf;
2228
2229 if (conf) {
2230 struct spoe_agent *agent = conf->agent;
2231 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
2232 struct listener *, by_fe);
2233
2234 free(l);
2235 release_spoe_agent(agent);
2236 free(conf);
2237 }
2238 fconf->conf = NULL;
2239}
2240
2241/* Check configuration of a SPOE filter for a specified proxy.
2242 * Return 1 on error, else 0. */
2243static int
2244spoe_check(struct proxy *px, struct flt_conf *fconf)
2245{
2246 struct spoe_config *conf = fconf->conf;
2247 struct proxy *target;
2248
2249 target = proxy_be_by_name(conf->agent->b.name);
2250 if (target == NULL) {
2251 Alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'"
2252 " declared at %s:%d.\n",
2253 px->id, conf->agent->b.name, conf->agent->id,
2254 conf->agent->conf.file, conf->agent->conf.line);
2255 return 1;
2256 }
2257 if (target->mode != PR_MODE_TCP) {
2258 Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
2259 " at %s:%d does not support HTTP mode.\n",
2260 px->id, target->id, conf->agent->id,
2261 conf->agent->conf.file, conf->agent->conf.line);
2262 return 1;
2263 }
2264
2265 free(conf->agent->b.name);
2266 conf->agent->b.name = NULL;
2267 conf->agent->b.be = target;
2268 return 0;
2269}
2270
2271/**************************************************************************
2272 * Hooks attached to a stream
2273 *************************************************************************/
2274/* Called when a filter instance is created and attach to a stream. It creates
2275 * the context that will be used to process this stream. */
2276static int
2277spoe_start(struct stream *s, struct filter *filter)
2278{
2279 struct spoe_context *ctx;
2280
2281 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2282 (int)now.tv_sec, (int)now.tv_usec,
2283 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2284 __FUNCTION__, s);
2285
2286 ctx = create_spoe_context(filter);
2287 if (ctx == NULL) {
2288 send_log(s->be, LOG_EMERG,
2289 "failed to create SPOE context for proxy %s\n",
2290 s->be->id);
2291 return 0;
2292 }
2293
2294 ctx->strm = s;
2295 ctx->state = SPOE_CTX_ST_READY;
2296 filter->ctx = ctx;
2297
2298 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_FE]))
2299 filter->pre_analyzers |= AN_REQ_INSPECT_FE;
2300
2301 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_BE]))
2302 filter->pre_analyzers |= AN_REQ_INSPECT_BE;
2303
2304 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_RSP]))
2305 filter->pre_analyzers |= AN_RES_INSPECT;
2306
2307 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_FE]))
2308 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE;
2309
2310 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_BE]))
2311 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE;
2312
2313 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_RSP]))
2314 filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE;
2315
2316 return 1;
2317}
2318
2319/* Called when a filter instance is detached from a stream. It release the
2320 * attached SPOE context. */
2321static void
2322spoe_stop(struct stream *s, struct filter *filter)
2323{
2324 struct spoe_context *ctx = filter->ctx;
2325
2326 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2327 (int)now.tv_sec, (int)now.tv_usec,
2328 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2329 __FUNCTION__, s);
2330
2331 if (ctx) {
2332 release_spoe_appctx(ctx);
2333 destroy_spoe_context(ctx);
2334 }
2335}
2336
2337/* Called when we are ready to filter data on a channel */
2338static int
2339spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2340{
2341 struct spoe_context *ctx = filter->ctx;
2342 int ret = 1;
2343
2344 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2345 " - ctx-flags=0x%08x\n",
2346 (int)now.tv_sec, (int)now.tv_usec,
2347 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2348 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2349
2350 if (!(chn->flags & CF_ISRESP)) {
2351 if (filter->pre_analyzers & AN_REQ_INSPECT_FE)
2352 chn->analysers |= AN_REQ_INSPECT_FE;
2353 if (filter->pre_analyzers & AN_REQ_INSPECT_BE)
2354 chn->analysers |= AN_REQ_INSPECT_BE;
2355
2356 if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED)
2357 goto out;
2358
2359 ctx->stream_id = s->uniq_id;
2360 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2361 ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
2362 if (ret != 1)
2363 goto out;
2364 }
2365 ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
2366 }
2367 else {
2368 if (filter->pre_analyzers & SPOE_EV_ON_TCP_RSP)
2369 chn->analysers |= AN_RES_INSPECT;
2370
2371 if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
2372 goto out;
2373
2374 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2375 ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
2376 if (ret != 1)
2377 goto out;
2378 }
2379 ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
2380 }
2381
2382 out:
2383 if (!ret) {
2384 channel_dont_read(chn);
2385 channel_dont_close(chn);
2386 }
2387 return ret;
2388}
2389
2390/* Called before a processing happens on a given channel */
2391static int
2392spoe_chn_pre_analyze(struct stream *s, struct filter *filter,
2393 struct channel *chn, unsigned an_bit)
2394{
2395 struct spoe_context *ctx = filter->ctx;
2396 int ret = 1;
2397
2398 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2399 " - ctx-flags=0x%08x - ana=0x%08x\n",
2400 (int)now.tv_sec, (int)now.tv_usec,
2401 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2402 __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2403 ctx->flags, an_bit);
2404
2405 if (ctx->state == SPOE_CTX_ST_NONE || ctx->state == SPOE_CTX_ST_ERROR)
2406 goto out;
2407
2408 switch (an_bit) {
2409 case AN_REQ_INSPECT_FE:
2410 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
2411 break;
2412 case AN_REQ_INSPECT_BE:
2413 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
2414 break;
2415 case AN_RES_INSPECT:
2416 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP);
2417 break;
2418 case AN_REQ_HTTP_PROCESS_FE:
2419 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
2420 break;
2421 case AN_REQ_HTTP_PROCESS_BE:
2422 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
2423 break;
2424 case AN_RES_HTTP_PROCESS_FE:
2425 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
2426 break;
2427 }
2428
2429 out:
2430 if (!ret) {
2431 channel_dont_read(chn);
2432 channel_dont_close(chn);
2433 }
2434 return ret;
2435}
2436
2437/* Called when the filtering on the channel ends. */
2438static int
2439spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2440{
2441 struct spoe_context *ctx = filter->ctx;
2442
2443 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2444 " - ctx-flags=0x%08x\n",
2445 (int)now.tv_sec, (int)now.tv_usec,
2446 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2447 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2448
2449 if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
2450 reset_spoe_context(ctx);
2451 }
2452
2453 return 1;
2454}
2455
2456/********************************************************************
2457 * Functions that manage the filter initialization
2458 ********************************************************************/
2459struct flt_ops spoe_ops = {
2460 /* Manage SPOE filter, called for each filter declaration */
2461 .init = spoe_init,
2462 .deinit = spoe_deinit,
2463 .check = spoe_check,
2464
2465 /* Handle start/stop of SPOE */
2466 .attach = spoe_start,
2467 .detach = spoe_stop,
2468
2469 /* Handle channels activity */
2470 .channel_start_analyze = spoe_start_analyze,
2471 .channel_pre_analyze = spoe_chn_pre_analyze,
2472 .channel_end_analyze = spoe_end_analyze,
2473};
2474
2475
2476static int
2477cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
2478{
2479 const char *err;
2480 int i, err_code = 0;
2481
2482 if ((cfg_scope == NULL && curengine != NULL) ||
2483 (cfg_scope != NULL && curengine == NULL) ||
2484 strcmp(curengine, cfg_scope))
2485 goto out;
2486
2487 if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */
2488 if (!*args[1]) {
2489 Alert("parsing [%s:%d] : missing name for spoe-agent section.\n",
2490 file, linenum);
2491 err_code |= ERR_ALERT | ERR_ABORT;
2492 goto out;
2493 }
2494 if (*args[2]) {
2495 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2496 file, linenum, args[2]);
2497 err_code |= ERR_ALERT | ERR_ABORT;
2498 goto out;
2499 }
2500
2501 err = invalid_char(args[1]);
2502 if (err) {
2503 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2504 file, linenum, *err, args[0], args[1]);
2505 err_code |= ERR_ALERT | ERR_ABORT;
2506 goto out;
2507 }
2508
2509 if (curagent != NULL) {
2510 Alert("parsing [%s:%d] : another spoe-agent section previously defined.\n",
2511 file, linenum);
2512 err_code |= ERR_ALERT | ERR_ABORT;
2513 goto out;
2514 }
2515 if ((curagent = calloc(1, sizeof(*curagent))) == NULL) {
2516 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2517 err_code |= ERR_ALERT | ERR_ABORT;
2518 goto out;
2519 }
2520
2521 curagent->id = strdup(args[1]);
2522 curagent->conf.file = strdup(file);
2523 curagent->conf.line = linenum;
2524 curagent->timeout.hello = TICK_ETERNITY;
2525 curagent->timeout.ack = TICK_ETERNITY;
2526 curagent->timeout.idle = TICK_ETERNITY;
2527 curagent->var_pfx = NULL;
2528 curagent->new_applets = 0;
2529
2530 for (i = 0; i < SPOE_EV_EVENTS; ++i)
2531 LIST_INIT(&curagent->messages[i]);
2532 LIST_INIT(&curagent->cache);
2533 LIST_INIT(&curagent->applet_wq);
2534 }
2535 else if (!strcmp(args[0], "use-backend")) {
2536 if (!*args[1]) {
2537 Alert("parsing [%s:%d] : '%s' expects a backend name.\n",
2538 file, linenum, args[0]);
2539 err_code |= ERR_ALERT | ERR_FATAL;
2540 goto out;
2541 }
2542 if (*args[2]) {
2543 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2544 file, linenum, args[2]);
2545 err_code |= ERR_ALERT | ERR_ABORT;
2546 goto out;
2547 }
2548 free(curagent->b.name);
2549 curagent->b.name = strdup(args[1]);
2550 }
2551 else if (!strcmp(args[0], "messages")) {
2552 int cur_arg = 1;
2553 while (*args[cur_arg]) {
2554 struct spoe_msg_placeholder *mp = NULL;
2555
2556 list_for_each_entry(mp, &curmps, list) {
2557 if (!strcmp(mp->id, args[cur_arg])) {
2558 Alert("parsing [%s:%d]: spoe-message message '%s' already declared.\n",
2559 file, linenum, args[cur_arg]);
2560 err_code |= ERR_ALERT | ERR_FATAL;
2561 goto out;
2562 }
2563 }
2564
2565 if ((mp = calloc(1, sizeof(*mp))) == NULL) {
2566 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2567 err_code |= ERR_ALERT | ERR_ABORT;
2568 goto out;
2569 }
2570 mp->id = strdup(args[cur_arg]);
2571 LIST_ADDQ(&curmps, &mp->list);
2572 cur_arg++;
2573 }
2574 }
2575 else if (!strcmp(args[0], "timeout")) {
2576 unsigned int *tv = NULL;
2577 const char *res;
2578 unsigned timeout;
2579
2580 if (!*args[1]) {
2581 Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n",
2582 file, linenum);
2583 err_code |= ERR_ALERT | ERR_FATAL;
2584 goto out;
2585 }
2586 if (!strcmp(args[1], "hello"))
2587 tv = &curagent->timeout.hello;
2588 else if (!strcmp(args[1], "idle"))
2589 tv = &curagent->timeout.idle;
2590 else if (!strcmp(args[1], "ack"))
2591 tv = &curagent->timeout.ack;
2592 else {
2593 Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' and 'ack' (got %s).\n",
2594 file, linenum, args[1]);
2595 err_code |= ERR_ALERT | ERR_FATAL;
2596 goto out;
2597 }
2598 if (!*args[2]) {
2599 Alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n",
2600 file, linenum, args[1]);
2601 err_code |= ERR_ALERT | ERR_FATAL;
2602 goto out;
2603 }
2604 res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
2605 if (res) {
2606 Alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n",
2607 file, linenum, *res, args[1]);
2608 err_code |= ERR_ALERT | ERR_ABORT;
2609 goto out;
2610 }
2611 if (*args[3]) {
2612 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2613 file, linenum, args[3]);
2614 err_code |= ERR_ALERT | ERR_ABORT;
2615 goto out;
2616 }
2617 *tv = MS_TO_TICKS(timeout);
2618 }
2619 else if (!strcmp(args[0], "option")) {
2620 if (!*args[1]) {
2621 Alert("parsing [%s:%d]: '%s' expects an option name.\n",
2622 file, linenum, args[0]);
2623 err_code |= ERR_ALERT | ERR_FATAL;
2624 goto out;
2625 }
2626 if (!strcmp(args[1], "var-prefix")) {
2627 char *tmp;
2628
2629 if (!*args[2]) {
2630 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2631 file, linenum, args[0],
2632 args[1]);
2633 err_code |= ERR_ALERT | ERR_FATAL;
2634 goto out;
2635 }
2636 tmp = args[2];
2637 while (*tmp) {
2638 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2639 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
2640 file, linenum, args[0], args[1]);
2641 err_code |= ERR_ALERT | ERR_FATAL;
2642 goto out;
2643 }
2644 tmp++;
2645 }
2646 curagent->var_pfx = strdup(args[2]);
2647 }
2648 else {
2649 Alert("parsing [%s:%d]: option '%s' is not supported.\n",
2650 file, linenum, args[1]);
2651 err_code |= ERR_ALERT | ERR_FATAL;
2652 goto out;
2653 }
2654 }
2655 else if (*args[0]) {
2656 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
2657 file, linenum, args[0]);
2658 err_code |= ERR_ALERT | ERR_FATAL;
2659 goto out;
2660 }
2661 out:
2662 return err_code;
2663}
2664
2665static int
2666cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm)
2667{
2668 struct spoe_message *msg;
2669 struct spoe_arg *arg;
2670 const char *err;
2671 char *errmsg = NULL;
2672 int err_code = 0;
2673
2674 if ((cfg_scope == NULL && curengine != NULL) ||
2675 (cfg_scope != NULL && curengine == NULL) ||
2676 strcmp(curengine, cfg_scope))
2677 goto out;
2678
2679 if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */
2680 if (!*args[1]) {
2681 Alert("parsing [%s:%d] : missing name for spoe-message section.\n",
2682 file, linenum);
2683 err_code |= ERR_ALERT | ERR_ABORT;
2684 goto out;
2685 }
2686 if (*args[2]) {
2687 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2688 file, linenum, args[2]);
2689 err_code |= ERR_ALERT | ERR_ABORT;
2690 goto out;
2691 }
2692
2693 err = invalid_char(args[1]);
2694 if (err) {
2695 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2696 file, linenum, *err, args[0], args[1]);
2697 err_code |= ERR_ALERT | ERR_ABORT;
2698 goto out;
2699 }
2700
2701 list_for_each_entry(msg, &curmsgs, list) {
2702 if (!strcmp(msg->id, args[1])) {
2703 Alert("parsing [%s:%d]: spoe-message section '%s' has the same"
2704 " name as another one declared at %s:%d.\n",
2705 file, linenum, args[1], msg->conf.file, msg->conf.line);
2706 err_code |= ERR_ALERT | ERR_FATAL;
2707 goto out;
2708 }
2709 }
2710
2711 if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) {
2712 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2713 err_code |= ERR_ALERT | ERR_ABORT;
2714 goto out;
2715 }
2716
2717 curmsg->id = strdup(args[1]);
2718 curmsg->id_len = strlen(curmsg->id);
2719 curmsg->event = SPOE_EV_NONE;
2720 curmsg->conf.file = strdup(file);
2721 curmsg->conf.line = linenum;
2722 LIST_INIT(&curmsg->args);
2723 LIST_ADDQ(&curmsgs, &curmsg->list);
2724 }
2725 else if (!strcmp(args[0], "args")) {
2726 int cur_arg = 1;
2727
2728 curproxy->conf.args.ctx = ARGC_SPOE;
2729 curproxy->conf.args.file = file;
2730 curproxy->conf.args.line = linenum;
2731 while (*args[cur_arg]) {
2732 char *delim = strchr(args[cur_arg], '=');
2733 int idx = 0;
2734
2735 if ((arg = calloc(1, sizeof(*arg))) == NULL) {
2736 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2737 err_code |= ERR_ALERT | ERR_ABORT;
2738 goto out;
2739 }
2740
2741 if (!delim) {
2742 arg->name = NULL;
2743 arg->name_len = 0;
2744 delim = args[cur_arg];
2745 }
2746 else {
2747 arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]);
2748 arg->name_len = delim - args[cur_arg];
2749 delim++;
2750 }
2751
2752 arg->expr = sample_parse_expr(&delim, &idx, file, linenum, &errmsg, &curproxy->conf.args);
2753 if (arg->expr == NULL) {
2754 Alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg);
2755 err_code |= ERR_ALERT | ERR_FATAL;
2756 free(arg->name);
2757 free(arg);
2758 goto out;
2759 }
2760 LIST_ADDQ(&curmsg->args, &arg->list);
2761 cur_arg++;
2762 }
2763 curproxy->conf.args.file = NULL;
2764 curproxy->conf.args.line = 0;
2765 }
2766 else if (!strcmp(args[0], "event")) {
2767 if (!*args[1]) {
2768 Alert("parsing [%s:%d] : missing event name.\n", file, linenum);
2769 err_code |= ERR_ALERT | ERR_ABORT;
2770 goto out;
2771 }
2772 if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS]))
2773 curmsg->event = SPOE_EV_ON_CLIENT_SESS;
2774 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS]))
2775 curmsg->event = SPOE_EV_ON_SERVER_SESS;
2776
2777 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE]))
2778 curmsg->event = SPOE_EV_ON_TCP_REQ_FE;
2779 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE]))
2780 curmsg->event = SPOE_EV_ON_TCP_REQ_BE;
2781 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP]))
2782 curmsg->event = SPOE_EV_ON_TCP_RSP;
2783
2784 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE]))
2785 curmsg->event = SPOE_EV_ON_HTTP_REQ_FE;
2786 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE]))
2787 curmsg->event = SPOE_EV_ON_HTTP_REQ_BE;
2788 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP]))
2789 curmsg->event = SPOE_EV_ON_HTTP_RSP;
2790 else {
2791 Alert("parsing [%s:%d] : unkown event '%s'.\n",
2792 file, linenum, args[1]);
2793 err_code |= ERR_ALERT | ERR_ABORT;
2794 goto out;
2795 }
2796 }
2797 else if (!*args[0]) {
2798 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n",
2799 file, linenum, args[0]);
2800 err_code |= ERR_ALERT | ERR_FATAL;
2801 goto out;
2802 }
2803 out:
2804 free(errmsg);
2805 return err_code;
2806}
2807
2808/* Return -1 on error, else 0 */
2809static int
2810parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
2811 struct flt_conf *fconf, char **err, void *private)
2812{
2813 struct list backup_sections;
2814 struct spoe_config *conf;
2815 struct spoe_message *msg, *msgback;
2816 struct spoe_msg_placeholder *mp, *mpback;
2817 char *file = NULL, *engine = NULL;
2818 int ret, pos = *cur_arg + 1;
2819
2820 conf = calloc(1, sizeof(*conf));
2821 if (conf == NULL) {
2822 memprintf(err, "%s: out of memory", args[*cur_arg]);
2823 goto error;
2824 }
2825 conf->proxy = px;
2826
2827 while (*args[pos]) {
2828 if (!strcmp(args[pos], "config")) {
2829 if (!*args[pos+1]) {
2830 memprintf(err, "'%s' : '%s' option without value",
2831 args[*cur_arg], args[pos]);
2832 goto error;
2833 }
2834 file = args[pos+1];
2835 pos += 2;
2836 }
2837 else if (!strcmp(args[pos], "engine")) {
2838 if (!*args[pos+1]) {
2839 memprintf(err, "'%s' : '%s' option without value",
2840 args[*cur_arg], args[pos]);
2841 goto error;
2842 }
2843 engine = args[pos+1];
2844 pos += 2;
2845 }
2846 else {
2847 memprintf(err, "unknown keyword '%s'", args[pos]);
2848 goto error;
2849 }
2850 }
2851 if (file == NULL) {
2852 memprintf(err, "'%s' : missing config file", args[*cur_arg]);
2853 goto error;
2854 }
2855
2856 /* backup sections and register SPOE sections */
2857 LIST_INIT(&backup_sections);
2858 cfg_backup_sections(&backup_sections);
2859 cfg_register_section("spoe-agent", cfg_parse_spoe_agent);
2860 cfg_register_section("spoe-message", cfg_parse_spoe_message);
2861
2862 /* Parse SPOE filter configuration file */
2863 curengine = engine;
2864 curproxy = px;
2865 curagent = NULL;
2866 curmsg = NULL;
2867 ret = readcfgfile(file);
2868 curproxy = NULL;
2869
2870 /* unregister SPOE sections and restore previous sections */
2871 cfg_unregister_sections();
2872 cfg_restore_sections(&backup_sections);
2873
2874 if (ret == -1) {
2875 memprintf(err, "Could not open configuration file %s : %s",
2876 file, strerror(errno));
2877 goto error;
2878 }
2879 if (ret & (ERR_ABORT|ERR_FATAL)) {
2880 memprintf(err, "Error(s) found in configuration file %s", file);
2881 goto error;
2882 }
2883
2884 /* Check SPOE agent */
2885 if (curagent == NULL) {
2886 memprintf(err, "No SPOE agent found in file %s", file);
2887 goto error;
2888 }
2889 if (curagent->b.name == NULL) {
2890 memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d",
2891 curagent->id, curagent->conf.file, curagent->conf.line);
2892 goto error;
2893 }
2894 if (curagent->timeout.hello == TICK_ETERNITY ||
2895 curagent->timeout.idle == TICK_ETERNITY ||
2896 curagent->timeout.ack == TICK_ETERNITY) {
2897 Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
2898 " | While not properly invalid, you will certainly encounter various problems\n"
2899 " | with such a configuration. To fix this, please ensure that all following\n"
2900 " | timeouts are set to a non-zero value: 'hello', 'idle', 'ack'.\n",
2901 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
2902 }
2903 if (curagent->var_pfx == NULL) {
2904 char *tmp = curagent->id;
2905
2906 while (*tmp) {
2907 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2908 memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. "
2909 "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n",
2910 curagent->id, curagent->id, curagent->conf.file, curagent->conf.line);
2911 goto error;
2912 }
2913 tmp++;
2914 }
2915 curagent->var_pfx = strdup(curagent->id);
2916 }
2917
2918 if (LIST_ISEMPTY(&curmps)) {
2919 Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",
2920 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
2921 goto finish;
2922 }
2923
2924 list_for_each_entry_safe(mp, mpback, &curmps, list) {
2925 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
2926 if (!strcmp(msg->id, mp->id)) {
2927 if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) {
2928 if (msg->event == SPOE_EV_ON_TCP_REQ_BE)
2929 msg->event = SPOE_EV_ON_TCP_REQ_FE;
2930 if (msg->event == SPOE_EV_ON_HTTP_REQ_BE)
2931 msg->event = SPOE_EV_ON_HTTP_REQ_FE;
2932 }
2933 if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS ||
2934 msg->event == SPOE_EV_ON_TCP_REQ_FE ||
2935 msg->event == SPOE_EV_ON_HTTP_REQ_FE)) {
2936 Warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n",
2937 px->id, msg->conf.file, msg->conf.line);
2938 goto next;
2939 }
2940 if (msg->event == SPOE_EV_NONE) {
2941 Warning("Proxy '%s': Ignore SPOE message without event at %s:%d.\n",
2942 px->id, msg->conf.file, msg->conf.line);
2943 goto next;
2944 }
2945 msg->agent = curagent;
2946 LIST_DEL(&msg->list);
2947 LIST_ADDQ(&curagent->messages[msg->event], &msg->list);
2948 goto next;
2949 }
2950 }
2951 memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d",
2952 curagent->id, mp->id, curagent->conf.file, curagent->conf.line);
2953 goto error;
2954 next:
2955 continue;
2956 }
2957
2958 finish:
2959 conf->agent = curagent;
2960 list_for_each_entry_safe(mp, mpback, &curmps, list) {
2961 LIST_DEL(&mp->list);
2962 release_spoe_msg_placeholder(mp);
2963 }
2964 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
2965 Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n",
2966 px->id, msg->id, msg->conf.file, msg->conf.line);
2967 LIST_DEL(&msg->list);
2968 release_spoe_message(msg);
2969 }
2970
2971 *cur_arg = pos;
2972 fconf->ops = &spoe_ops;
2973 fconf->conf = conf;
2974 return 0;
2975
2976 error:
2977 release_spoe_agent(curagent);
2978 list_for_each_entry_safe(mp, mpback, &curmps, list) {
2979 LIST_DEL(&mp->list);
2980 release_spoe_msg_placeholder(mp);
2981 }
2982 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
2983 LIST_DEL(&msg->list);
2984 release_spoe_message(msg);
2985 }
2986 free(conf);
2987 return -1;
2988}
2989
2990
2991/* Declare the filter parser for "spoe" keyword */
2992static struct flt_kw_list flt_kws = { "SPOE", { }, {
2993 { "spoe", parse_spoe_flt, NULL },
2994 { NULL, NULL, NULL },
2995 }
2996};
2997
2998__attribute__((constructor))
2999static void __spoe_init(void)
3000{
3001 flt_register_keywords(&flt_kws);
3002
3003 LIST_INIT(&curmsgs);
3004 LIST_INIT(&curmps);
3005 pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
3006}
3007
3008__attribute__((destructor))
3009static void
3010__spoe_deinit(void)
3011{
3012 pool_destroy2(pool2_spoe_ctx);
3013}