blob: 12e589ebcdf93a982f8a18abeccb36755901eb21 [file] [log] [blame]
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001/*
2 * Stream processing offload engine management.
3 *
4 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 *
11 */
12#include <ctype.h>
13#include <errno.h>
14
15#include <common/buffer.h>
16#include <common/cfgparse.h>
17#include <common/compat.h>
18#include <common/config.h>
19#include <common/debug.h>
20#include <common/memory.h>
21#include <common/time.h>
22
23#include <types/arg.h>
24#include <types/filters.h>
25#include <types/global.h>
26#include <types/proxy.h>
27#include <types/sample.h>
28#include <types/stream.h>
29
30#include <proto/arg.h>
31#include <proto/backend.h>
32#include <proto/filters.h>
33#include <proto/frontend.h>
34#include <proto/log.h>
35#include <proto/proto_http.h>
36#include <proto/proxy.h>
37#include <proto/sample.h>
38#include <proto/session.h>
39#include <proto/signal.h>
40#include <proto/stream.h>
41#include <proto/stream_interface.h>
42#include <proto/task.h>
43#include <proto/vars.h>
44
45#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
46#define SPOE_PRINTF(x...) fprintf(x)
47#else
48#define SPOE_PRINTF(x...)
49#endif
50
51/* Helper to get ctx inside an appctx */
52#define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe)
53
54/* TODO: add an option to customize these values */
55/* The maximum number of new applet waiting the end of the hello handshake */
56#define MAX_NEW_SPOE_APPLETS 5
57
58/* The maximum number of error when a stream is waiting of a SPOE applet */
59#define MAX_NEW_SPOE_APPLET_ERRS 3
60
61/* Minimal size for a frame */
62#define MIN_FRAME_SIZE 256
63
64/* Flags set on the SPOE context */
65#define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* Set after that on-client-session event was processed */
66#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set after that on-server-session event was processed */
67#define SPOE_CTX_FL_REQ_PROCESS 0x00000004 /* Set when SPOE is processing the request */
68#define SPOE_CTX_FL_RSP_PROCESS 0x00000008 /* Set when SPOE is processing the response */
69
70#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)
71
72#define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, leave it to zero */
73#define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */
74
75/* All possible states for a SPOE context */
76enum spoe_ctx_state {
77 SPOE_CTX_ST_NONE = 0,
78 SPOE_CTX_ST_READY,
79 SPOE_CTX_ST_SENDING_MSGS,
80 SPOE_CTX_ST_WAITING_ACK,
81 SPOE_CTX_ST_DONE,
82 SPOE_CTX_ST_ERROR,
83};
84
85/* All possible states for a SPOE applet */
86enum spoe_appctx_state {
87 SPOE_APPCTX_ST_CONNECT = 0,
88 SPOE_APPCTX_ST_CONNECTING,
89 SPOE_APPCTX_ST_PROCESSING,
90 SPOE_APPCTX_ST_DISCONNECT,
91 SPOE_APPCTX_ST_DISCONNECTING,
92 SPOE_APPCTX_ST_EXIT,
93 SPOE_APPCTX_ST_END,
94};
95
96/* All supported SPOE actions */
97enum spoe_action_type {
98 SPOE_ACT_T_SET_VAR = 1,
99 SPOE_ACT_T_UNSET_VAR,
100 SPOE_ACT_TYPES,
101};
102
103/* All supported SPOE events */
104enum spoe_event {
105 SPOE_EV_NONE = 0,
106
107 /* Request events */
108 SPOE_EV_ON_CLIENT_SESS = 1,
109 SPOE_EV_ON_TCP_REQ_FE,
110 SPOE_EV_ON_TCP_REQ_BE,
111 SPOE_EV_ON_HTTP_REQ_FE,
112 SPOE_EV_ON_HTTP_REQ_BE,
113
114 /* Response events */
115 SPOE_EV_ON_SERVER_SESS,
116 SPOE_EV_ON_TCP_RSP,
117 SPOE_EV_ON_HTTP_RSP,
118
119 SPOE_EV_EVENTS
120};
121
122/* Errors triggerd by SPOE applet */
123enum spoe_frame_error {
124 SPOE_FRM_ERR_NONE = 0,
125 SPOE_FRM_ERR_IO,
126 SPOE_FRM_ERR_TOUT,
127 SPOE_FRM_ERR_TOO_BIG,
128 SPOE_FRM_ERR_INVALID,
129 SPOE_FRM_ERR_NO_VSN,
130 SPOE_FRM_ERR_NO_FRAME_SIZE,
131 SPOE_FRM_ERR_NO_CAP,
132 SPOE_FRM_ERR_BAD_VSN,
133 SPOE_FRM_ERR_BAD_FRAME_SIZE,
134 SPOE_FRM_ERR_UNKNOWN = 99,
135 SPOE_FRM_ERRS,
136};
137
138/* Scopes used for variables set by agents. It is a way to be agnotic to vars
139 * scope. */
140enum spoe_vars_scope {
141 SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC */
142 SPOE_SCOPE_SESS, /* <=> SCOPE_SESS */
143 SPOE_SCOPE_TXN, /* <=> SCOPE_TXN */
144 SPOE_SCOPE_REQ, /* <=> SCOPE_REQ */
145 SPOE_SCOPE_RES, /* <=> SCOPE_RES */
146};
147
148
149/* Describe an argument that will be linked to a message. It is a sample fetch,
150 * with an optional name. */
151struct spoe_arg {
152 char *name; /* Name of the argument, may be NULL */
153 unsigned int name_len; /* The name length, 0 if NULL */
154 struct sample_expr *expr; /* Sample expression */
155 struct list list; /* Used to chain SPOE args */
156};
157
158/* Used during the config parsing only because, when a SPOE agent section is
159 * parsed, messages can be undefined. */
160struct spoe_msg_placeholder {
161 char *id; /* SPOE message placeholder id */
162 struct list list; /* Use to chain SPOE message placeholders */
163};
164
165/* Describe a message that will be sent in a NOTIFY frame. A message has a name,
166 * an argument list (see above) and it is linked to a specific event. */
167struct spoe_message {
168 char *id; /* SPOE message id */
169 unsigned int id_len; /* The message id length */
170 struct spoe_agent *agent; /* SPOE agent owning this SPOE message */
171 struct {
172 char *file; /* file where the SPOE message appears */
173 int line; /* line where the SPOE message appears */
174 } conf; /* config information */
175 struct list args; /* Arguments added when the SPOE messages is sent */
176 struct list list; /* Used to chain SPOE messages */
177
178 enum spoe_event event; /* SPOE_EV_* */
179};
180
181/* Describe a SPOE agent. */
182struct spoe_agent {
183 char *id; /* SPOE agent id (name) */
184 struct {
185 char *file; /* file where the SPOE agent appears */
186 int line; /* line where the SPOE agent appears */
187 } conf; /* config information */
188 union {
189 struct proxy *be; /* Backend used by this agent */
190 char *name; /* Backend name used during conf parsing */
191 } b;
192 struct {
193 unsigned int hello; /* Max time to receive AGENT-HELLO frame */
194 unsigned int idle; /* Max Idle timeout */
195 unsigned int ack; /* Max time to acknowledge a NOTIFY frame */
196 } timeout;
197
198 char *var_pfx; /* Prefix used for vars set by the agent */
199
200 struct list cache; /* List used to cache SPOE streams. In
201 * fact, we cache the SPOE applect ctx */
202
203 struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
204 * for each supported events */
205
206 struct list applet_wq; /* List of streams waiting for a SPOE applet */
207 unsigned int new_applets; /* The number of new SPOE applets */
208};
209
210/* SPOE filter configuration */
211struct spoe_config {
212 struct proxy *proxy; /* Proxy owning the filter */
213 struct spoe_agent *agent; /* Agent used by this filter */
214 struct proxy agent_fe; /* Agent frontend */
215};
216
217/* SPOE context attached to a stream. It is the main structure that handles the
218 * processing offload */
219struct spoe_context {
220 struct filter *filter; /* The SPOE filter */
221 struct stream *strm; /* The stream that should be offloaded */
222 struct appctx *appctx; /* The SPOE appctx */
223 struct list *messages; /* List of messages that will be sent during the stream processing */
224 struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */
225 struct list buffer_wait; /* position in the list of streams waiting for a buffer */
226 struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */
227
228 unsigned int errs; /* The number of errors to acquire a SPOE applet */
229
230 enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
231 unsigned int flags; /* SPOE_CTX_FL_* */
232
233 unsigned int stream_id; /* stream_id and frame_id are used */
234 unsigned int frame_id; /* to map NOTIFY and ACK frames */
235
236};
237
238/* Set if the handle on SIGUSR1 is registered */
239static int sighandler_registered = 0;
240
241/* proxy used during the parsing */
242struct proxy *curproxy = NULL;
243
244/* The name of the SPOE engine, used during the parsing */
245char *curengine = NULL;
246
247/* SPOE agent used during the parsing */
248struct spoe_agent *curagent = NULL;
249
250/* SPOE message used during the parsing */
251struct spoe_message *curmsg = NULL;
252
253/* list of SPOE messages and placeholders used during the parsing */
254struct list curmsgs;
255struct list curmps;
256
257/* Pool used to allocate new SPOE contexts */
258static struct pool_head *pool2_spoe_ctx = NULL;
259
260/* Temporary variables used to ease error processing */
261int spoe_status_code = SPOE_FRM_ERR_NONE;
262char spoe_reason[256];
263
264struct flt_ops spoe_ops;
265
266static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx);
267static void on_new_spoe_appctx_failure(struct spoe_agent *agent);
268static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx);
269
270/********************************************************************
271 * helper functions/globals
272 ********************************************************************/
273static void
274release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp)
275{
276 if (!mp)
277 return;
278 free(mp->id);
279 free(mp);
280}
281
282
283static void
284release_spoe_message(struct spoe_message *msg)
285{
286 struct spoe_arg *arg, *back;
287
288 if (!msg)
289 return;
290 free(msg->id);
291 free(msg->conf.file);
292 list_for_each_entry_safe(arg, back, &msg->args, list) {
293 release_sample_expr(arg->expr);
294 free(arg->name);
295 LIST_DEL(&arg->list);
296 free(arg);
297 }
298 free(msg);
299}
300
301static void
302release_spoe_agent(struct spoe_agent *agent)
303{
304 struct spoe_message *msg, *back;
305 int i;
306
307 if (!agent)
308 return;
309 free(agent->id);
310 free(agent->conf.file);
311 free(agent->var_pfx);
312 for (i = 0; i < SPOE_EV_EVENTS; ++i) {
313 list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
314 LIST_DEL(&msg->list);
315 release_spoe_message(msg);
316 }
317 }
318 free(agent);
319}
320
321static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
322 [SPOE_FRM_ERR_NONE] = "normal",
323 [SPOE_FRM_ERR_IO] = "I/O error",
324 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
325 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
326 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
327 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
328 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
329 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
330 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
331 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
332 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
333};
334
335static const char *spoe_event_str[SPOE_EV_EVENTS] = {
336 [SPOE_EV_ON_CLIENT_SESS] = "on-client-session",
337 [SPOE_EV_ON_TCP_REQ_FE] = "on-frontend-tcp-request",
338 [SPOE_EV_ON_TCP_REQ_BE] = "on-backend-tcp-request",
339 [SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request",
340 [SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request",
341
342 [SPOE_EV_ON_SERVER_SESS] = "on-server-session",
343 [SPOE_EV_ON_TCP_RSP] = "on-tcp-response",
344 [SPOE_EV_ON_HTTP_RSP] = "on-http-response",
345};
346
347
348#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
349
350static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
351 [SPOE_CTX_ST_NONE] = "NONE",
352 [SPOE_CTX_ST_READY] = "READY",
353 [SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
354 [SPOE_CTX_ST_WAITING_ACK] = "WAITING_ACK",
355 [SPOE_CTX_ST_DONE] = "DONE",
356 [SPOE_CTX_ST_ERROR] = "ERROR",
357};
358
359static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
360 [SPOE_APPCTX_ST_CONNECT] = "CONNECT",
361 [SPOE_APPCTX_ST_CONNECTING] = "CONNECTING",
362 [SPOE_APPCTX_ST_PROCESSING] = "PROCESSING",
363 [SPOE_APPCTX_ST_DISCONNECT] = "DISCONNECT",
364 [SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
365 [SPOE_APPCTX_ST_EXIT] = "EXIT",
366 [SPOE_APPCTX_ST_END] = "END",
367};
368
369#endif
370/********************************************************************
371 * Functions that encode/decode SPOE frames
372 ********************************************************************/
373/* Frame Types sent by HAProxy and by agents */
374enum spoe_frame_type {
375 /* Frames sent by HAProxy */
376 SPOE_FRM_T_HAPROXY_HELLO = 1,
377 SPOE_FRM_T_HAPROXY_DISCON,
378 SPOE_FRM_T_HAPROXY_NOTIFY,
379
380 /* Frames sent by the agents */
381 SPOE_FRM_T_AGENT_HELLO = 101,
382 SPOE_FRM_T_AGENT_DISCON,
383 SPOE_FRM_T_AGENT_ACK
384};
385
386/* All supported data types */
387enum spoe_data_type {
388 SPOE_DATA_T_NULL = 0,
389 SPOE_DATA_T_BOOL,
390 SPOE_DATA_T_INT32,
391 SPOE_DATA_T_UINT32,
392 SPOE_DATA_T_INT64,
393 SPOE_DATA_T_UINT64,
394 SPOE_DATA_T_IPV4,
395 SPOE_DATA_T_IPV6,
396 SPOE_DATA_T_STR,
397 SPOE_DATA_T_BIN,
398 SPOE_DATA_TYPES
399};
400
401/* Masks to get data type or flags value */
402#define SPOE_DATA_T_MASK 0x0F
403#define SPOE_DATA_FL_MASK 0xF0
404
405/* Flags to set Boolean values */
406#define SPOE_DATA_FL_FALSE 0x00
407#define SPOE_DATA_FL_TRUE 0x10
408
409/* Helper to get static string length, excluding the terminating null byte */
410#define SLEN(str) (sizeof(str)-1)
411
412/* Predefined key used in HELLO/DISCONNECT frames */
413#define SUPPORTED_VERSIONS_KEY "supported-versions"
414#define VERSION_KEY "version"
415#define MAX_FRAME_SIZE_KEY "max-frame-size"
416#define CAPABILITIES_KEY "capabilities"
Christopher Fauletba7bc162016-11-07 21:07:38 +0100417#define HEALTHCHECK_KEY "healthcheck"
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +0200418#define STATUS_CODE_KEY "status-code"
419#define MSG_KEY "message"
420
421struct spoe_version {
422 char *str;
423 int min;
424 int max;
425};
426
427/* All supported versions */
428static struct spoe_version supported_versions[] = {
429 {"1.0", 1000, 1000},
430 {NULL, 0, 0}
431};
432
433/* Comma-separated list of supported versions */
434#define SUPPORTED_VERSIONS_VAL "1.0"
435
436/* Comma-separated list of supported capabilities (none for now) */
437#define CAPABILITIES_VAL ""
438
439static int
440decode_spoe_version(const char *str, size_t len)
441{
442 char tmp[len+1], *start, *end;
443 double d;
444 int vsn = -1;
445
446 memset(tmp, 0, len+1);
447 memcpy(tmp, str, len);
448
449 start = tmp;
450 while (isspace(*start))
451 start++;
452
453 d = strtod(start, &end);
454 if (d == 0 || start == end)
455 goto out;
456
457 if (*end) {
458 while (isspace(*end))
459 end++;
460 if (*end)
461 goto out;
462 }
463 vsn = (int)(d * 1000);
464 out:
465 return vsn;
466}
467
468/* Encode a variable-length integer. This function never fails and returns the
469 * number of written bytes. */
470static int
471encode_spoe_varint(uint64_t i, char *buf)
472{
473 int idx;
474
475 if (i < 240) {
476 buf[0] = (unsigned char)i;
477 return 1;
478 }
479
480 buf[0] = (unsigned char)i | 240;
481 i = (i - 240) >> 4;
482 for (idx = 1; i >= 128; ++idx) {
483 buf[idx] = (unsigned char)i | 128;
484 i = (i - 128) >> 7;
485 }
486 buf[idx++] = (unsigned char)i;
487 return idx;
488}
489
490/* Decode a varable-length integer. If the decoding fails, -1 is returned. This
491 * happens when the buffer's end in reached. On success, the number of read
492 * bytes is returned. */
493static int
494decode_spoe_varint(const char *buf, const char *end, uint64_t *i)
495{
496 unsigned char *msg = (unsigned char *)buf;
497 int idx = 0;
498
499 if (msg > (unsigned char *)end)
500 return -1;
501
502 if (msg[0] < 240) {
503 *i = msg[0];
504 return 1;
505 }
506 *i = msg[0];
507 do {
508 ++idx;
509 if (msg+idx > (unsigned char *)end)
510 return -1;
511 *i += (uint64_t)msg[idx] << (4 + 7 * (idx-1));
512 } while (msg[idx] >= 128);
513 return (idx + 1);
514}
515
516/* Encode a string. The string will be prefix by its length, encoded as a
517 * variable-length integer. This function never fails and returns the number of
518 * written bytes. */
519static int
520encode_spoe_string(const char *str, size_t len, char *dst)
521{
522 int idx = 0;
523
524 if (!len) {
525 dst[0] = 0;
526 return 1;
527 }
528
529 idx += encode_spoe_varint(len, dst);
530 memcpy(dst+idx, str, len);
531 return (idx + len);
532}
533
534/* Decode a string. Its length is decoded first as a variable-length integer. If
535 * it succeeds, and if the string length is valid, the begin of the string is
536 * saved in <*str>, its length is saved in <*len> and the total numbre of bytes
537 * read is returned. If an error occurred, -1 is returned and <*str> remains
538 * NULL. */
539static int
540decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
541{
542 int i, idx = 0;
543
544 *str = NULL;
545 *len = 0;
546
547 if ((i = decode_spoe_varint(buf, end, len)) == -1)
548 goto error;
549 idx += i;
550 if (buf + idx + *len > end)
551 goto error;
552
553 *str = buf+idx;
554 return (idx + *len);
555
556 error:
557 return -1;
558}
559
560/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
561 * of bytes read is returned. A types data is composed of a type (1 byte) and
562 * corresponding data:
563 * - boolean: non additional data (0 bytes)
564 * - integers: a variable-length integer (see decode_spoe_varint)
565 * - ipv4: 4 bytes
566 * - ipv6: 16 bytes
567 * - binary and string: a buffer prefixed by its size, a variable-length
568 * integer (see decode_spoe_string) */
569static int
570skip_spoe_data(char *frame, char *end)
571{
572 uint64_t sz = 0;
573 int i, idx = 0;
574
575 if (frame > end)
576 return -1;
577
578 switch (frame[idx++] & SPOE_DATA_T_MASK) {
579 case SPOE_DATA_T_BOOL:
580 break;
581 case SPOE_DATA_T_INT32:
582 case SPOE_DATA_T_INT64:
583 case SPOE_DATA_T_UINT32:
584 case SPOE_DATA_T_UINT64:
585 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
586 return -1;
587 idx += i;
588 break;
589 case SPOE_DATA_T_IPV4:
590 idx += 4;
591 break;
592 case SPOE_DATA_T_IPV6:
593 idx += 16;
594 break;
595 case SPOE_DATA_T_STR:
596 case SPOE_DATA_T_BIN:
597 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
598 return -1;
599 idx += i + sz;
600 break;
601 }
602
603 if (frame+idx > end)
604 return -1;
605 return idx;
606}
607
608/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
609 * number of read bytes is returned. See skip_spoe_data for details. */
610static int
611decode_spoe_data(char *frame, char *end, struct sample *smp)
612{
613 uint64_t sz = 0;
614 int type, i, idx = 0;
615
616 if (frame > end)
617 return -1;
618
619 type = frame[idx++];
620 switch (type & SPOE_DATA_T_MASK) {
621 case SPOE_DATA_T_BOOL:
622 smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
623 smp->data.type = SMP_T_BOOL;
624 break;
625 case SPOE_DATA_T_INT32:
626 case SPOE_DATA_T_INT64:
627 case SPOE_DATA_T_UINT32:
628 case SPOE_DATA_T_UINT64:
629 if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1)
630 return -1;
631 idx += i;
632 smp->data.type = SMP_T_SINT;
633 break;
634 case SPOE_DATA_T_IPV4:
635 if (frame+idx+4 > end)
636 return -1;
637 memcpy(&smp->data.u.ipv4, frame+idx, 4);
638 smp->data.type = SMP_T_IPV4;
639 idx += 4;
640 break;
641 case SPOE_DATA_T_IPV6:
642 if (frame+idx+16 > end)
643 return -1;
644 memcpy(&smp->data.u.ipv6, frame+idx, 16);
645 smp->data.type = SMP_T_IPV6;
646 idx += 16;
647 break;
648 case SPOE_DATA_T_STR:
649 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
650 return -1;
651 idx += i;
652 if (frame+idx+sz > end)
653 return -1;
654 smp->data.u.str.str = frame+idx;
655 smp->data.u.str.len = sz;
656 smp->data.type = SMP_T_STR;
657 idx += sz;
658 break;
659 case SPOE_DATA_T_BIN:
660 if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
661 return -1;
662 idx += i;
663 if (frame+idx+sz > end)
664 return -1;
665 smp->data.u.str.str = frame+idx;
666 smp->data.u.str.len = sz;
667 smp->data.type = SMP_T_BIN;
668 idx += sz;
669 break;
670 }
671
672 if (frame+idx > end)
673 return -1;
674 return idx;
675}
676
677/* Skip an action in a frame received from an agent. If an error occurred, -1 is
678 * returned, otherwise the number of read bytes is returned. An action is
679 * composed of the action type followed by a typed data. */
680static int
681skip_spoe_action(char *frame, char *end)
682{
683 int n, i, idx = 0;
684
685 if (frame+2 > end)
686 return -1;
687
688 idx++; /* Skip the action type */
689 n = frame[idx++];
690 while (n-- > 0) {
691 if ((i = skip_spoe_data(frame+idx, end)) == -1)
692 return -1;
693 idx += i;
694 }
695
696 if (frame+idx > end)
697 return -1;
698 return idx;
699}
700
701/* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on
702 * success, 0 if the frame can be ignored and -1 if an error occurred. */
703static int
704prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
705{
706 int idx = 0;
707 size_t max = (7 /* TYPE + METADATA */
708 + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
709 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
710 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL));
711
712 if (size < max)
713 return -1;
714
715 /* Frame type */
716 frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
717
718 /* No flags for now */
719 memset(frame+idx, 0, 4);
720 idx += 4;
721
722 /* No stream-id and frame-id for HELLO frames */
723 frame[idx++] = 0;
724 frame[idx++] = 0;
725
726 /* There are 3 mandatory items: "supported-versions", "max-frame-size"
727 * and "capabilities" */
728
729 /* "supported-versions" K/V item */
730 idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx);
731 frame[idx++] = SPOE_DATA_T_STR;
732 idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx);
733
734 /* "max-fram-size" K/V item */
735 idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
736 frame[idx++] = SPOE_DATA_T_UINT32;
737 idx += encode_spoe_varint(APPCTX_SPOE(appctx).max_frame_size, frame+idx);
738
739 /* "capabilities" K/V item */
740 idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
741 frame[idx++] = SPOE_DATA_T_STR;
742 idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
743
744 return idx;
745}
746
747/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame
748 * size on success, 0 if the frame can be ignored and -1 if an error
749 * occurred. */
750static int
751prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
752{
753 const char *reason;
754 int rlen, idx = 0;
755 size_t max = (7 /* TYPE + METADATA */
756 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
757 + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
758
759 if (size < max)
760 return -1;
761
762 /* Get the message corresponding to the status code */
763 if (spoe_status_code >= SPOE_FRM_ERRS)
764 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
765 reason = spoe_frm_err_reasons[spoe_status_code];
766 rlen = strlen(reason);
767
768 /* Frame type */
769 frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
770
771 /* No flags for now */
772 memset(frame+idx, 0, 4);
773 idx += 4;
774
775 /* No stream-id and frame-id for DISCONNECT frames */
776 frame[idx++] = 0;
777 frame[idx++] = 0;
778
779 /* There are 2 mandatory items: "status-code" and "message" */
780
781 /* "status-code" K/V item */
782 idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx);
783 frame[idx++] = SPOE_DATA_T_UINT32;
784 idx += encode_spoe_varint(spoe_status_code, frame+idx);
785
786 /* "message" K/V item */
787 idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx);
788 frame[idx++] = SPOE_DATA_T_STR;
789 idx += encode_spoe_string(reason, rlen, frame+idx);
790
791 return idx;
792}
793
794/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
795 * success, 0 if the frame can be ignored and -1 if an error occurred. */
796static int
797prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
798{
799 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
800 int idx = 0;
801
802 if (size < APPCTX_SPOE(appctx).max_frame_size)
803 return -1;
804
805 frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
806
807 /* No flags for now */
808 memset(frame+idx, 0, 4);
809 idx += 4;
810
811 /* Set stream-id and frame-id */
812 idx += encode_spoe_varint(ctx->stream_id, frame+idx);
813 idx += encode_spoe_varint(ctx->frame_id, frame+idx);
814
815 /* Copy encoded messages */
816 memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
817 idx += ctx->buffer->i;
818
819 return idx;
820}
821
822/* Decode HELLO frame sent by an agent. It returns the number of by read bytes
823 * on success, 0 if the frame can be ignored and -1 if an error occurred. */
824static int
825handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
826{
827 int vsn, max_frame_size;
828 int i, idx = 0;
829 size_t min_size = (7 /* TYPE + METADATA */
830 + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
831 + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
832 + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
833
834 /* Check frame type */
835 if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
836 return 0;
837
838 if (size < min_size) {
839 spoe_status_code = SPOE_FRM_ERR_INVALID;
840 return -1;
841 }
842
843 /* Skip flags: fragmentation is not supported for now */
844 idx += 4;
845
846 /* stream-id and frame-id must be cleared */
847 if (frame[idx] != 0 || frame[idx+1] != 0) {
848 spoe_status_code = SPOE_FRM_ERR_INVALID;
849 return -1;
850 }
851 idx += 2;
852
853 /* There are 3 mandatory items: "version", "max-frame-size" and
854 * "capabilities" */
855
856 /* Loop on K/V items */
857 vsn = max_frame_size = 0;
858 while (idx < size) {
859 char *str;
860 uint64_t sz;
861
862 /* Decode the item key */
863 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
864 if (str == NULL) {
865 spoe_status_code = SPOE_FRM_ERR_INVALID;
866 return -1;
867 }
868 /* Check "version" K/V item */
869 if (!memcmp(str, VERSION_KEY, sz)) {
870 /* The value must be a string */
871 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
872 spoe_status_code = SPOE_FRM_ERR_INVALID;
873 return -1;
874 }
875 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
876 if (str == NULL) {
877 spoe_status_code = SPOE_FRM_ERR_INVALID;
878 return -1;
879 }
880
881 vsn = decode_spoe_version(str, sz);
882 if (vsn == -1) {
883 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
884 return -1;
885 }
886 for (i = 0; supported_versions[i].str != NULL; ++i) {
887 if (vsn >= supported_versions[i].min &&
888 vsn <= supported_versions[i].max)
889 break;
890 }
891 if (supported_versions[i].str == NULL) {
892 spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
893 return -1;
894 }
895 }
896 /* Check "max-frame-size" K/V item */
897 else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
898 int type;
899
900 /* The value must be integer */
901 type = frame[idx++];
902 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
903 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
904 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
905 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
906 spoe_status_code = SPOE_FRM_ERR_INVALID;
907 return -1;
908 }
909 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
910 spoe_status_code = SPOE_FRM_ERR_INVALID;
911 return -1;
912 }
913 idx += i;
914 if (sz < MIN_FRAME_SIZE || sz > APPCTX_SPOE(appctx).max_frame_size) {
915 spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
916 return -1;
917 }
918 max_frame_size = sz;
919 }
920 /* Skip "capabilities" K/V item for now */
921 else {
922 /* Silently ignore unknown item */
923 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
924 spoe_status_code = SPOE_FRM_ERR_INVALID;
925 return -1;
926 }
927 idx += i;
928 }
929 }
930
931 /* Final checks */
932 if (!vsn) {
933 spoe_status_code = SPOE_FRM_ERR_NO_VSN;
934 return -1;
935 }
936 if (!max_frame_size) {
937 spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
938 return -1;
939 }
940
941 APPCTX_SPOE(appctx).version = (unsigned int)vsn;
942 APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
943 return idx;
944}
945
946/* Decode DISCONNECT frame sent by an agent. It returns the number of by read
947 * bytes on success, 0 if the frame can be ignored and -1 if an error
948 * occurred. */
949static int
950handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
951{
952 int i, idx = 0;
953 size_t min_size = (7 /* TYPE + METADATA */
954 + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
955 + 1 + SLEN(MSG_KEY) + 1 + 1);
956
957 /* Check frame type */
958 if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
959 return 0;
960
961 if (size < min_size) {
962 spoe_status_code = SPOE_FRM_ERR_INVALID;
963 return -1;
964 }
965
966 /* Skip flags: fragmentation is not supported for now */
967 idx += 4;
968
969 /* stream-id and frame-id must be cleared */
970 if (frame[idx] != 0 || frame[idx+1] != 0) {
971 spoe_status_code = SPOE_FRM_ERR_INVALID;
972 return -1;
973 }
974 idx += 2;
975
976 /* There are 2 mandatory items: "status-code" and "message" */
977
978 /* Loop on K/V items */
979 while (idx < size) {
980 char *str;
981 uint64_t sz;
982
983 /* Decode the item key */
984 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
985 if (str == NULL) {
986 spoe_status_code = SPOE_FRM_ERR_INVALID;
987 return -1;
988 }
989
990 /* Check "status-code" K/V item */
991 if (!memcmp(str, STATUS_CODE_KEY, sz)) {
992 int type;
993
994 /* The value must be an integer */
995 type = frame[idx++];
996 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
997 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
998 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
999 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
1000 spoe_status_code = SPOE_FRM_ERR_INVALID;
1001 return -1;
1002 }
1003 if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
1004 spoe_status_code = SPOE_FRM_ERR_INVALID;
1005 return -1;
1006 }
1007 idx += i;
1008 spoe_status_code = sz;
1009 }
1010
1011 /* Check "message" K/V item */
1012 else if (sz && !memcmp(str, MSG_KEY, sz)) {
1013 /* The value must be a string */
1014 if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
1015 spoe_status_code = SPOE_FRM_ERR_INVALID;
1016 return -1;
1017 }
1018 idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
1019 if (str == NULL || sz > 255) {
1020 spoe_status_code = SPOE_FRM_ERR_INVALID;
1021 return -1;
1022 }
1023 memcpy(spoe_reason, str, sz);
1024 spoe_reason[sz] = 0;
1025 }
1026 else {
1027 /* Silently ignore unknown item */
1028 if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
1029 spoe_status_code = SPOE_FRM_ERR_INVALID;
1030 return -1;
1031 }
1032 idx += i;
1033 }
1034 }
1035
1036 return idx;
1037}
1038
1039
1040/* Decode ACK frame sent by an agent. It returns the number of by read bytes on
1041 * success, 0 if the frame can be ignored and -1 if an error occurred. */
1042static int
1043handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
1044{
1045 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1046 uint64_t stream_id, frame_id;
1047 int idx = 0;
1048 size_t min_size = (7 /* TYPE + METADATA */);
1049
1050 /* Check frame type */
1051 if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
1052 return 0;
1053
1054 if (size < min_size) {
1055 spoe_status_code = SPOE_FRM_ERR_INVALID;
1056 return -1;
1057 }
1058
1059 /* Skip flags: fragmentation is not supported for now */
1060 idx += 4;
1061
1062 /* Get the stream-id and the frame-id */
1063 idx += decode_spoe_varint(frame+idx, frame+size, &stream_id);
1064 idx += decode_spoe_varint(frame+idx, frame+size, &frame_id);
1065
1066 /* Check stream-id and frame-id */
1067 if (ctx->stream_id != (unsigned int)stream_id ||
1068 ctx->frame_id != (unsigned int)frame_id)
1069 return 0;
1070
1071 /* Copy encoded actions */
1072 b_reset(ctx->buffer);
1073 memcpy(ctx->buffer->p, frame+idx, size-idx);
1074 ctx->buffer->i = size-idx;
1075
1076 return idx;
1077}
1078
Christopher Fauletba7bc162016-11-07 21:07:38 +01001079/* This function is used in cfgparse.c and declared in proto/checks.h. It
1080 * prepare the request to send to agents during a healthcheck. It returns 0 on
1081 * success and -1 if an error occurred. */
1082int
1083prepare_spoe_healthcheck_request(char **req, int *len)
1084{
1085 struct appctx a;
1086 char *frame, buf[global.tune.bufsize];
1087 unsigned int framesz;
1088 int idx;
1089
1090 memset(&a, 0, sizeof(a));
1091 memset(buf, 0, sizeof(buf));
1092 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1093
1094 frame = buf+4;
1095 idx = prepare_spoe_hahello_frame(&a, frame, global.tune.bufsize-4);
1096 if (idx <= 0)
1097 return -1;
1098 if (idx + SLEN(HEALTHCHECK_KEY) + 1 > global.tune.bufsize-4)
1099 return -1;
1100
1101 /* "healthcheck" K/V item */
1102 idx += encode_spoe_string(HEALTHCHECK_KEY, SLEN(HEALTHCHECK_KEY), frame+idx);
1103 frame[idx++] = (SPOE_DATA_T_BOOL | SPOE_DATA_FL_TRUE);
1104
1105 framesz = htonl(idx);
1106 memcpy(buf, (char *)&framesz, 4);
1107
1108 if ((*req = malloc(idx+4)) == NULL)
1109 return -1;
1110 memcpy(*req, buf, idx+4);
1111 *len = idx+4;
1112 return 0;
1113}
1114
1115/* This function is used in checks.c and declared in proto/checks.h. It decode
1116 * the response received from an agent during a healthcheck. It returns 0 on
1117 * success and -1 if an error occurred. */
1118int
1119handle_spoe_healthcheck_response(char *frame, size_t size, char *err, int errlen)
1120{
1121 struct appctx a;
1122 int r;
1123
1124 memset(&a, 0, sizeof(a));
1125 APPCTX_SPOE(&a).max_frame_size = global.tune.bufsize;
1126
1127 if (handle_spoe_agentdiscon_frame(&a, frame, size) != 0)
1128 goto error;
1129 if ((r = handle_spoe_agenthello_frame(&a, frame, size)) <= 0) {
1130 if (r == 0)
1131 spoe_status_code = SPOE_FRM_ERR_INVALID;
1132 goto error;
1133 }
1134
1135 return 0;
1136
1137 error:
1138 if (spoe_status_code >= SPOE_FRM_ERRS)
1139 spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
1140 strncpy(err, spoe_frm_err_reasons[spoe_status_code], errlen);
1141 return -1;
1142}
Christopher Fauletf7e4e7e2016-10-27 22:29:49 +02001143
1144/********************************************************************
1145 * Functions that manage the SPOE applet
1146 ********************************************************************/
1147/* Callback function that catches applet timeouts. If a timeout occurred, we set
1148 * <appctx->st1> flag and the SPOE applet is woken up. */
1149static struct task *
1150process_spoe_applet(struct task * task)
1151{
1152 struct appctx *appctx = task->context;
1153
1154 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1155 if (tick_is_expired(task->expire, now_ms)) {
1156 task->expire = TICK_ETERNITY;
1157 appctx->st1 = SPOE_APPCTX_ERR_TOUT;
1158 }
1159 si_applet_want_get(appctx->owner);
1160 appctx_wakeup(appctx);
1161 return task;
1162}
1163
1164/* Remove a SPOE applet from the agent cache */
1165static void
1166remove_spoe_applet_from_cache(struct appctx *appctx)
1167{
1168 struct appctx *a, *back;
1169 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1170
1171 if (LIST_ISEMPTY(&agent->cache))
1172 return;
1173
1174 list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) {
1175 if (a == appctx) {
1176 LIST_DEL(&APPCTX_SPOE(appctx).list);
1177 break;
1178 }
1179 }
1180}
1181
1182
1183/* Callback function that releases a SPOE applet. This happens when the
1184 * connection with the agent is closed. */
1185static void
1186release_spoe_applet(struct appctx *appctx)
1187{
1188 struct stream_interface *si = appctx->owner;
1189 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1190 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1191
1192 if (appctx->st0 == SPOE_APPCTX_ST_CONNECT ||
1193 appctx->st0 == SPOE_APPCTX_ST_CONNECTING)
1194 on_new_spoe_appctx_failure(agent);
1195
1196 if (appctx->st0 != SPOE_APPCTX_ST_END) {
1197 si_shutw(si);
1198 si_shutr(si);
1199 si_ic(si)->flags |= CF_READ_NULL;
1200 appctx->st0 = SPOE_APPCTX_ST_END;
1201 }
1202
1203 if (ctx != NULL) {
1204 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1205 ctx->appctx = NULL;
1206 }
1207
1208 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
1209 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1210 __FUNCTION__, appctx);
1211
1212 /* Release the task attached to the SPOE applet */
1213 if (APPCTX_SPOE(appctx).task) {
1214 task_delete(APPCTX_SPOE(appctx).task);
1215 task_free(APPCTX_SPOE(appctx).task);
1216 }
1217
1218 /* And remove it from the agent cache */
1219 remove_spoe_applet_from_cache(appctx);
1220 APPCTX_SPOE(appctx).ctx = NULL;
1221}
1222
1223/* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when
1224 * the frame can be ignored, 0 to retry later and 1 on success. The frame is
1225 * encoded using the callback function <prepare>. */
1226static int
1227send_spoe_frame(struct appctx *appctx,
1228 int (*prepare)(struct appctx *, char *, size_t))
1229{
1230 struct stream_interface *si = appctx->owner;
1231 int framesz, ret;
1232 uint32_t netint;
1233
1234 ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
1235 if (ret <= 0)
1236 goto skip_or_error;
1237 framesz = ret;
1238 netint = htonl(framesz);
1239 ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint));
1240 if (ret > 0)
1241 ret = bi_putblk(si_ic(si), trash.str, framesz);
1242 if (ret <= 0) {
1243 if (ret == -1)
1244 return -1;
1245 return -2;
1246 }
1247 return 1;
1248
1249 skip_or_error:
1250 if (!ret)
1251 return -1;
1252 return -2;
1253}
1254
1255/* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1
1256 * when the frame can be ignored, 0 to retry later and 1 on success. The frame
1257 * is decoded using the callback function <handle>. */
1258static int
1259recv_spoe_frame(struct appctx *appctx,
1260 int (*handle)(struct appctx *, char *, size_t))
1261{
1262 struct stream_interface *si = appctx->owner;
1263 int framesz, ret;
1264 uint32_t netint;
1265
1266 ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0);
1267 if (ret <= 0)
1268 goto empty_or_error;
1269 framesz = ntohl(netint);
1270 if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
1271 spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
1272 return -2;
1273 }
1274
1275 ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint));
1276 if (ret <= 0)
1277 goto empty_or_error;
1278 bo_skip(si_oc(si), ret+sizeof(netint));
1279
1280 /* First check if the received frame is a DISCONNECT frame */
1281 ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz);
1282 if (ret != 0) {
1283 if (ret > 0) {
1284 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1285 " - disconnected by peer (%d): %s\n",
1286 (int)now.tv_sec, (int)now.tv_usec,
1287 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1288 __FUNCTION__, appctx, spoe_status_code,
1289 spoe_reason);
1290 return 2;
1291 }
1292 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1293 " - error on frame (%s)\n",
1294 (int)now.tv_sec, (int)now.tv_usec,
1295 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1296 __FUNCTION__, appctx,
1297 spoe_frm_err_reasons[spoe_status_code]);
1298 return -2;
1299 }
1300 if (handle == NULL)
1301 goto out;
1302
1303 /* If not, try to decode it */
1304 ret = handle(appctx, trash.str, framesz);
1305 if (ret <= 0) {
1306 if (!ret)
1307 return -1;
1308 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1309 " - error on frame (%s)\n",
1310 (int)now.tv_sec, (int)now.tv_usec,
1311 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1312 __FUNCTION__, appctx,
1313 spoe_frm_err_reasons[spoe_status_code]);
1314 return -2;
1315 }
1316 out:
1317 return 1;
1318
1319 empty_or_error:
1320 if (!ret)
1321 return 0;
1322 spoe_status_code = SPOE_FRM_ERR_IO;
1323 return -2;
1324}
1325
1326/* I/O Handler processing messages exchanged with the agent */
1327static void
1328handle_spoe_applet(struct appctx *appctx)
1329{
1330 struct stream_interface *si = appctx->owner;
1331 struct stream *s = si_strm(si);
1332 struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
1333 struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
1334 int ret;
1335
1336 switchstate:
1337 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1338 " - appctx-state=%s\n",
1339 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1340 __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);
1341
1342 switch (appctx->st0) {
1343 case SPOE_APPCTX_ST_CONNECT:
1344 spoe_status_code = SPOE_FRM_ERR_NONE;
1345 if (si->state <= SI_ST_CON) {
1346 si_applet_want_put(si);
1347 task_wakeup(s->task, TASK_WOKEN_MSG);
1348 break;
1349 }
1350 else if (si->state != SI_ST_EST) {
1351 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1352 on_new_spoe_appctx_failure(agent);
1353 goto switchstate;
1354 }
1355 ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame);
1356 if (ret < 0) {
1357 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1358 on_new_spoe_appctx_failure(agent);
1359 goto switchstate;
1360 }
1361 else if (!ret)
1362 goto full;
1363
1364 /* Hello frame was sent. Set the hello timeout and
1365 * wait for the reply. */
1366 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
1367 appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
1368 /* fall through */
1369
1370 case SPOE_APPCTX_ST_CONNECTING:
1371 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1372 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1373 on_new_spoe_appctx_failure(agent);
1374 goto switchstate;
1375 }
1376 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1377 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1378 " - Connection timed out\n",
1379 (int)now.tv_sec, (int)now.tv_usec,
1380 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1381 __FUNCTION__, appctx);
1382 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1383 on_new_spoe_appctx_failure(agent);
1384 goto switchstate;
1385 }
1386 ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame);
1387 if (ret < 0) {
1388 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1389 on_new_spoe_appctx_failure(agent);
1390 goto switchstate;
1391 }
1392 if (ret == 2) {
1393 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1394 on_new_spoe_appctx_failure(agent);
1395 goto switchstate;
1396 }
1397 if (!ret)
1398 goto out;
1399
1400 /* hello handshake is finished, set the idle timeout,
1401 * Add the appctx in the agent cache, decrease the
1402 * number of new applets and wake up waiting streams. */
1403 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1404 appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
1405 on_new_spoe_appctx_success(agent, appctx);
1406 break;
1407
1408 case SPOE_APPCTX_ST_PROCESSING:
1409 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1410 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1411 goto switchstate;
1412 }
1413 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1414 spoe_status_code = SPOE_FRM_ERR_TOUT;
1415 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1416 appctx->st1 = SPOE_APPCTX_ERR_NONE;
1417 goto switchstate;
1418 }
1419 if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
1420 ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame);
1421 if (ret < 0) {
1422 if (ret == -1) {
1423 ctx->state = SPOE_CTX_ST_ERROR;
1424 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1425 goto skip_notify_frame;
1426 }
1427 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1428 goto switchstate;
1429 }
1430 else if (!ret)
1431 goto full;
1432 ctx->state = SPOE_CTX_ST_WAITING_ACK;
1433 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.ack);
1434 }
1435
1436 skip_notify_frame:
1437 if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) {
1438 ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame);
1439 if (ret < 0) {
1440 if (ret == -1)
1441 goto skip_notify_frame;
1442 ctx->state = SPOE_CTX_ST_ERROR;
1443 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1444 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1445 goto switchstate;
1446 }
1447 if (!ret)
1448 goto out;
1449 if (ret == 2) {
1450 ctx->state = SPOE_CTX_ST_ERROR;
1451 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1452 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1453 goto switchstate;
1454 }
1455 ctx->state = SPOE_CTX_ST_DONE;
1456 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1457 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1458 }
1459 else {
1460 if (stopping) {
1461 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1462 goto switchstate;
1463 }
1464
1465 ret = recv_spoe_frame(appctx, NULL);
1466 if (ret < 0) {
1467 if (ret == -1)
1468 goto skip_notify_frame;
1469 appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
1470 goto switchstate;
1471 }
1472 if (!ret)
1473 goto out;
1474 if (ret == 2) {
1475 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1476 goto switchstate;
1477 }
1478 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
1479 }
1480 break;
1481
1482 case SPOE_APPCTX_ST_DISCONNECT:
1483 ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame);
1484 if (ret < 0) {
1485 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1486 goto switchstate;
1487 }
1488 else if (!ret)
1489 goto full;
1490 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
1491 " - disconnected by HAProxy (%d): %s\n",
1492 (int)now.tv_sec, (int)now.tv_usec,
1493 ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
1494 __FUNCTION__, appctx, spoe_status_code,
1495 spoe_frm_err_reasons[spoe_status_code]);
1496
1497 APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.ack);
1498 appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
1499 /* fall through */
1500
1501 case SPOE_APPCTX_ST_DISCONNECTING:
1502 if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
1503 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1504 goto switchstate;
1505 }
1506 if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
1507 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1508 goto switchstate;
1509 }
1510 ret = recv_spoe_frame(appctx, NULL);
1511 if (ret < 0 || ret == 2) {
1512 appctx->st0 = SPOE_APPCTX_ST_EXIT;
1513 goto switchstate;
1514 }
1515 break;
1516
1517 case SPOE_APPCTX_ST_EXIT:
1518 si_shutw(si);
1519 si_shutr(si);
1520 si_ic(si)->flags |= CF_READ_NULL;
1521 appctx->st0 = SPOE_APPCTX_ST_END;
1522 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1523 /* fall through */
1524
1525 case SPOE_APPCTX_ST_END:
1526 break;
1527 }
1528
1529 out:
1530 if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
1531 task_queue(APPCTX_SPOE(appctx).task);
1532 si_oc(si)->flags |= CF_READ_DONTWAIT;
1533 task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
1534 return;
1535 full:
1536 si_applet_cant_put(si);
1537 goto out;
1538}
1539
1540struct applet spoe_applet = {
1541 .obj_type = OBJ_TYPE_APPLET,
1542 .name = "<SPOE>", /* used for logging */
1543 .fct = handle_spoe_applet,
1544 .release = release_spoe_applet,
1545};
1546
1547/* Create a SPOE applet. On success, the created applet is returned, else
1548 * NULL. */
1549static struct appctx *
1550create_spoe_appctx(struct spoe_config *conf)
1551{
1552 struct appctx *appctx;
1553 struct session *sess;
1554 struct task *task;
1555 struct stream *strm;
1556 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
1557 struct listener *, by_fe);
1558
1559 if ((appctx = appctx_new(&spoe_applet)) == NULL)
1560 goto out_error;
1561
1562 appctx->st0 = SPOE_APPCTX_ST_CONNECT;
1563 if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
1564 goto out_free_appctx;
1565 APPCTX_SPOE(appctx).task->process = process_spoe_applet;
1566 APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
1567 APPCTX_SPOE(appctx).task->context = appctx;
1568 APPCTX_SPOE(appctx).agent = conf->agent;
1569 APPCTX_SPOE(appctx).ctx = NULL;
1570 APPCTX_SPOE(appctx).version = 0;
1571 APPCTX_SPOE(appctx).max_frame_size = global.tune.bufsize;
1572 task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
1573
1574 sess = session_new(&conf->agent_fe, l, &appctx->obj_type);
1575 if (!sess)
1576 goto out_free_spoe;
1577
1578 if ((task = task_new()) == NULL)
1579 goto out_free_sess;
1580
1581 if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
1582 goto out_free_task;
1583
1584 strm->target = sess->listener->default_target;
1585 strm->req.analysers |= sess->listener->analysers;
1586 stream_set_backend(strm, conf->agent->b.be);
1587
1588 /* applet is waiting for data */
1589 si_applet_cant_get(&strm->si[0]);
1590 appctx_wakeup(appctx);
1591
1592 /* Increase the number of applets waiting the end of the hello
1593 * handshake. */
1594 conf->agent->new_applets++;
1595
1596 strm->do_log = NULL;
1597 strm->res.flags |= CF_READ_DONTWAIT;
1598
1599 conf->agent_fe.feconn++;
1600 jobs++;
1601 totalconn++;
1602
1603 return appctx;
1604
1605 /* Error unrolling */
1606 out_free_task:
1607 task_free(task);
1608 out_free_sess:
1609 session_free(sess);
1610 out_free_spoe:
1611 task_free(APPCTX_SPOE(appctx).task);
1612 out_free_appctx:
1613 appctx_free(appctx);
1614 out_error:
1615 return NULL;
1616}
1617
1618/* Wake up a SPOE applet attached to a SPOE context. */
1619static void
1620wakeup_spoe_appctx(struct spoe_context *ctx)
1621{
1622 if (ctx->appctx == NULL)
1623 return;
1624 if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) {
1625 si_applet_want_get(ctx->appctx->owner);
1626 si_applet_want_put(ctx->appctx->owner);
1627 appctx_wakeup(ctx->appctx);
1628 }
1629}
1630
1631
1632/* Run across the list of pending streams waiting for a SPOE applet and wake the
1633 * first. */
1634static void
1635offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx)
1636{
1637 struct spoe_context *ctx;
1638
1639 if (LIST_ISEMPTY(&agent->applet_wq))
1640 LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
1641 else {
1642 ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait);
1643 APPCTX_SPOE(appctx).ctx = ctx;
1644 ctx->appctx = appctx;
1645 LIST_DEL(&ctx->applet_wait);
1646 LIST_INIT(&ctx->applet_wait);
1647 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1648 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1649 " - wake up stream to get available SPOE applet\n",
1650 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1651 __FUNCTION__, ctx->strm);
1652 }
1653}
1654
1655/* A failure occurred during SPOE applet creation. */
1656static void
1657on_new_spoe_appctx_failure(struct spoe_agent *agent)
1658{
1659 struct spoe_context *ctx;
1660
1661 agent->new_applets--;
1662 list_for_each_entry(ctx, &agent->applet_wq, applet_wait) {
1663 ctx->errs++;
1664 task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
1665 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1666 " - wake up stream because to SPOE applet connection failed\n",
1667 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1668 __FUNCTION__, ctx->strm);
1669 }
1670}
1671
1672static void
1673on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx)
1674{
1675 agent->new_applets--;
1676 offer_spoe_appctx(agent, appctx);
1677}
1678/* Retrieve a SPOE applet from the agent cache if possible, else create it. It
1679 * returns 1 on success, 0 to retry later and -1 if an error occurred. */
1680static int
1681acquire_spoe_appctx(struct spoe_context *ctx, int dir)
1682{
1683 struct spoe_config *conf = FLT_CONF(ctx->filter);
1684 struct spoe_agent *agent = conf->agent;
1685 struct appctx *appctx;
1686
1687 /* If a process is already started for this SPOE context, retry
1688 * later. */
1689 if (ctx->flags & SPOE_CTX_FL_PROCESS)
1690 goto wait;
1691
1692 /* If needed, initialize the buffer that will be used to encode messages
1693 * and decode actions. */
1694 if (ctx->buffer == &buf_empty) {
1695 if (!LIST_ISEMPTY(&ctx->buffer_wait)) {
1696 LIST_DEL(&ctx->buffer_wait);
1697 LIST_INIT(&ctx->buffer_wait);
1698 }
1699
1700 if (!b_alloc_margin(&ctx->buffer, 0)) {
1701 LIST_ADDQ(&buffer_wq, &ctx->buffer_wait);
1702 goto wait;
1703 }
1704 }
1705
1706 /* If the SPOE applet was already set, all is done. */
1707 if (ctx->appctx)
1708 goto success;
1709
1710 /* Else try to retrieve it from the agent cache */
1711 if (!LIST_ISEMPTY(&agent->cache)) {
1712 appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list);
1713 LIST_DEL(&APPCTX_SPOE(appctx).list);
1714 APPCTX_SPOE(appctx).ctx = ctx;
1715 ctx->appctx = appctx;
1716 goto success;
1717 }
1718
1719 /* If there is no server up for the agent's backend or it too many
1720 * failure occurred, this is an error. */
1721 if ((!agent->b.be->srv_act && !agent->b.be->srv_bck) ||
1722 ctx->errs >= MAX_NEW_SPOE_APPLET_ERRS)
1723 goto error;
1724
1725 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1726 " - waiting for available SPOE appctx\n",
1727 (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
1728 ctx->strm);
1729
1730 /* Else add the stream in the waiting queue. */
1731 if (LIST_ISEMPTY(&ctx->applet_wait))
1732 LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait);
1733
1734 /* Finally, create new SPOE applet if we can */
1735 if (agent->new_applets < MAX_NEW_SPOE_APPLETS) {
1736 if (create_spoe_appctx(conf) == NULL)
1737 goto error;
1738 }
1739
1740 wait:
1741 return 0;
1742
1743 success:
1744 /* Remove the stream from the waiting queue */
1745 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1746 LIST_DEL(&ctx->applet_wait);
1747 LIST_INIT(&ctx->applet_wait);
1748 }
1749
1750 /* Set the right flag to prevent request and response processing
1751 * in same time. */
1752 ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
1753 ? SPOE_CTX_FL_REQ_PROCESS
1754 : SPOE_CTX_FL_RSP_PROCESS);
1755 ctx->errs = 0;
1756
1757 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1758 " - acquire SPOE appctx %p from cache\n",
1759 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1760 __FUNCTION__, ctx->strm, ctx->appctx);
1761 return 1;
1762
1763 error:
1764 /* Remove the stream from the waiting queue */
1765 if (!LIST_ISEMPTY(&ctx->applet_wait)) {
1766 LIST_DEL(&ctx->applet_wait);
1767 LIST_INIT(&ctx->applet_wait);
1768 }
1769
1770 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1771 " - failed to acquire SPOE appctx errs=%u\n",
1772 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1773 __FUNCTION__, ctx->strm, ctx->errs);
1774 send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n");
1775
1776 return -1;
1777}
1778
1779/* Release a SPOE applet and push it in the agent cache. */
1780static void
1781release_spoe_appctx(struct spoe_context *ctx)
1782{
1783 struct spoe_config *conf = FLT_CONF(ctx->filter);
1784 struct spoe_agent *agent = conf->agent;
1785 struct appctx *appctx = ctx->appctx;
1786
1787 /* Reset the flag to allow next processing */
1788 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
1789
1790 /* Release the buffer if needed */
1791 if (ctx->buffer != &buf_empty) {
1792 b_free(&ctx->buffer);
1793 if (!LIST_ISEMPTY(&buffer_wq))
1794 stream_offer_buffers();
1795 }
1796
1797 /* If there is no SPOE applet, all is done */
1798 if (!appctx)
1799 return;
1800
1801 /* Else, reassign it or push it in the agent cache */
1802 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
1803 " - release SPOE appctx %p\n",
1804 (int)now.tv_sec, (int)now.tv_usec, agent->id,
1805 __FUNCTION__, ctx->strm, appctx);
1806
1807 APPCTX_SPOE(appctx).ctx = NULL;
1808 ctx->appctx = NULL;
1809 offer_spoe_appctx(agent, appctx);
1810}
1811
1812/***************************************************************************
1813 * Functions that process SPOE messages and actions
1814 **************************************************************************/
1815/* Process SPOE messages for a specific event. During the processing, it returns
1816 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1817 * is returned. */
1818static int
1819process_spoe_messages(struct stream *s, struct spoe_context *ctx,
1820 struct list *messages, int dir)
1821{
1822 struct spoe_message *msg;
1823 struct sample *smp;
1824 struct spoe_arg *arg;
1825 char *p;
1826 size_t max_size;
1827 int off, flag, idx = 0;
1828
1829 /* Reserve 32 bytes from the frame Metadata */
1830 max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32;
1831
1832 b_reset(ctx->buffer);
1833 p = ctx->buffer->p;
1834
1835 /* Loop on messages */
1836 list_for_each_entry(msg, messages, list) {
1837 if (idx + msg->id_len + 1 > max_size)
1838 goto skip;
1839
1840 /* Set the message name */
1841 idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
1842
1843 /* Save offset where to store the number of arguments for this
1844 * message */
1845 off = idx++;
1846 p[off] = 0;
1847
1848 /* Loop on arguments */
1849 list_for_each_entry(arg, &msg->args, list) {
1850 p[off]++; /* Increment the number of arguments */
1851
1852 if (idx + arg->name_len + 1 > max_size)
1853 goto skip;
1854
1855 /* Encode the arguement name as a string. It can by NULL */
1856 idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
1857
1858 /* Fetch the arguement value */
1859 smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
1860 if (!smp) {
1861 /* If no value is available, set it to NULL */
1862 p[idx++] = SPOE_DATA_T_NULL;
1863 continue;
1864 }
1865
1866 /* Else, encode the arguement value */
1867 switch (smp->data.type) {
1868 case SMP_T_BOOL:
1869 flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
1870 p[idx++] = (SPOE_DATA_T_BOOL | flag);
1871 break;
1872 case SMP_T_SINT:
1873 p[idx++] = SPOE_DATA_T_INT64;
1874 if (idx + 8 > max_size)
1875 goto skip;
1876 idx += encode_spoe_varint(smp->data.u.sint, p+idx);
1877 break;
1878 case SMP_T_IPV4:
1879 p[idx++] = SPOE_DATA_T_IPV4;
1880 if (idx + 4 > max_size)
1881 goto skip;
1882 memcpy(p+idx, &smp->data.u.ipv4, 4);
1883 idx += 4;
1884 break;
1885 case SMP_T_IPV6:
1886 p[idx++] = SPOE_DATA_T_IPV6;
1887 if (idx + 16 > max_size)
1888 goto skip;
1889 memcpy(p+idx, &smp->data.u.ipv6, 16);
1890 idx += 16;
1891 break;
1892 case SMP_T_STR:
1893 p[idx++] = SPOE_DATA_T_STR;
1894 if (idx + smp->data.u.str.len > max_size)
1895 goto skip;
1896 idx += encode_spoe_string(smp->data.u.str.str,
1897 smp->data.u.str.len,
1898 p+idx);
1899 break;
1900 case SMP_T_BIN:
1901 p[idx++] = SPOE_DATA_T_BIN;
1902 if (idx + smp->data.u.str.len > max_size)
1903 goto skip;
1904 idx += encode_spoe_string(smp->data.u.str.str,
1905 smp->data.u.str.len,
1906 p+idx);
1907 break;
1908 case SMP_T_METH:
1909 if (smp->data.u.meth.meth == HTTP_METH_OTHER) {
1910 p[idx++] = SPOE_DATA_T_STR;
1911 if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size)
1912 goto skip;
1913 idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name,
1914 http_known_methods[smp->data.u.meth.meth].len,
1915 p+idx);
1916 }
1917 else {
1918 p[idx++] = SPOE_DATA_T_STR;
1919 if (idx + smp->data.u.str.len > max_size)
1920 goto skip;
1921 idx += encode_spoe_string(smp->data.u.meth.str.str,
1922 smp->data.u.meth.str.len,
1923 p+idx);
1924 }
1925 break;
1926 default:
1927 p[idx++] = SPOE_DATA_T_NULL;
1928 }
1929 }
1930 }
1931 ctx->buffer->i = idx;
1932 return 1;
1933
1934 skip:
1935 b_reset(ctx->buffer);
1936 return 0;
1937}
1938
1939/* Helper function to set a variable */
1940static void
1941set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1942 struct sample *smp)
1943{
1944 struct spoe_config *conf = FLT_CONF(ctx->filter);
1945 struct spoe_agent *agent = conf->agent;
1946 char varname[64];
1947
1948 memset(varname, 0, sizeof(varname));
1949 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1950 scope, agent->var_pfx, len, name);
1951 vars_set_by_name_ifexist(varname, len, smp);
1952}
1953
1954/* Helper function to unset a variable */
1955static void
1956unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
1957 struct sample *smp)
1958{
1959 struct spoe_config *conf = FLT_CONF(ctx->filter);
1960 struct spoe_agent *agent = conf->agent;
1961 char varname[64];
1962
1963 memset(varname, 0, sizeof(varname));
1964 len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
1965 scope, agent->var_pfx, len, name);
1966 vars_unset_by_name_ifexist(varname, len, smp);
1967}
1968
1969
1970/* Process SPOE actions for a specific event. During the processing, it returns
1971 * 0 and it returns 1 when the processing is finished. If an error occurred, -1
1972 * is returned. */
1973static int
1974process_spoe_actions(struct stream *s, struct spoe_context *ctx,
1975 enum spoe_event ev, int dir)
1976{
1977 char *p;
1978 size_t size;
1979 int off, i, idx = 0;
1980
1981 p = ctx->buffer->p;
1982 size = ctx->buffer->i;
1983
1984 while (idx < size) {
1985 char *str;
1986 uint64_t sz;
1987 struct sample smp;
1988 enum spoe_action_type type;
1989
1990 off = idx;
1991 if (idx+2 > size)
1992 goto skip;
1993
1994 type = p[idx++];
1995 switch (type) {
1996 case SPOE_ACT_T_SET_VAR: {
1997 char *scope;
1998
1999 if (p[idx++] != 3)
2000 goto skip_action;
2001
2002 switch (p[idx++]) {
2003 case SPOE_SCOPE_PROC: scope = "proc"; break;
2004 case SPOE_SCOPE_SESS: scope = "sess"; break;
2005 case SPOE_SCOPE_TXN : scope = "txn"; break;
2006 case SPOE_SCOPE_REQ : scope = "req"; break;
2007 case SPOE_SCOPE_RES : scope = "res"; break;
2008 default: goto skip;
2009 }
2010
2011 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2012 if (str == NULL)
2013 goto skip;
2014 memset(&smp, 0, sizeof(smp));
2015 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2016 if (decode_spoe_data(p+idx, p+size, &smp) == -1)
2017 goto skip;
2018
2019 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2020 " - set-var '%s.%s.%.*s'\n",
2021 (int)now.tv_sec, (int)now.tv_usec,
2022 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2023 __FUNCTION__, s, scope,
2024 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2025 (int)sz, str);
2026
2027 set_spoe_var(ctx, scope, str, sz, &smp);
2028 break;
2029 }
2030
2031 case SPOE_ACT_T_UNSET_VAR: {
2032 char *scope;
2033
2034 if (p[idx++] != 2)
2035 goto skip_action;
2036
2037 switch (p[idx++]) {
2038 case SPOE_SCOPE_PROC: scope = "proc"; break;
2039 case SPOE_SCOPE_SESS: scope = "sess"; break;
2040 case SPOE_SCOPE_TXN : scope = "txn"; break;
2041 case SPOE_SCOPE_REQ : scope = "req"; break;
2042 case SPOE_SCOPE_RES : scope = "res"; break;
2043 default: goto skip;
2044 }
2045
2046 idx += decode_spoe_string(p+idx, p+size, &str, &sz);
2047 if (str == NULL)
2048 goto skip;
2049 memset(&smp, 0, sizeof(smp));
2050 smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
2051
2052 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2053 " - unset-var '%s.%s.%.*s'\n",
2054 (int)now.tv_sec, (int)now.tv_usec,
2055 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2056 __FUNCTION__, s, scope,
2057 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
2058 (int)sz, str);
2059
2060 unset_spoe_var(ctx, scope, str, sz, &smp);
2061 break;
2062 }
2063
2064 default:
2065 skip_action:
2066 if ((i = skip_spoe_action(p+off, p+size)) == -1)
2067 goto skip;
2068 idx += i;
2069 }
2070 }
2071
2072 return 1;
2073 skip:
2074 return 0;
2075}
2076
2077
2078/* Process a SPOE event. First, this functions will process messages attached to
2079 * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
2080 * ACK frame to process corresponding actions. During all the processing, it
2081 * returns 0 and it returns 1 when the processing is finished. If an error
2082 * occurred, -1 is returned. */
2083static int
2084process_spoe_event(struct stream *s, struct spoe_context *ctx,
2085 enum spoe_event ev)
2086{
2087 int dir, ret = 1;
2088
2089 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
2090 " - ctx-state=%s - event=%s\n",
2091 (int)now.tv_sec, (int)now.tv_usec,
2092 ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
2093 __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2094 spoe_event_str[ev]);
2095
2096 dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
2097
2098 if (LIST_ISEMPTY(&(ctx->messages[ev])))
2099 goto out;
2100
2101 if (ctx->state == SPOE_CTX_ST_ERROR)
2102 goto error;
2103
2104 if (ctx->state == SPOE_CTX_ST_READY) {
2105 ret = acquire_spoe_appctx(ctx, dir);
2106 if (ret <= 0) {
2107 if (!ret)
2108 goto out;
2109 goto error;
2110 }
2111 ctx->state = SPOE_CTX_ST_SENDING_MSGS;
2112 }
2113
2114 if (ctx->appctx == NULL)
2115 goto error;
2116
2117 if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
2118 ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
2119 if (ret <= 0) {
2120 if (!ret)
2121 goto skip;
2122 goto error;
2123 }
2124 wakeup_spoe_appctx(ctx);
2125 ret = 0;
2126 goto out;
2127 }
2128
2129 if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
2130 wakeup_spoe_appctx(ctx);
2131 ret = 0;
2132 goto out;
2133 }
2134
2135 if (ctx->state == SPOE_CTX_ST_DONE) {
2136 ret = process_spoe_actions(s, ctx, ev, dir);
2137 if (ret <= 0) {
2138 if (!ret)
2139 goto skip;
2140 goto error;
2141 }
2142 ctx->frame_id++;
2143 release_spoe_appctx(ctx);
2144 ctx->state = SPOE_CTX_ST_READY;
2145 }
2146
2147 out:
2148 return ret;
2149
2150 skip:
2151 release_spoe_appctx(ctx);
2152 ctx->state = SPOE_CTX_ST_READY;
2153 return 1;
2154
2155 error:
2156 release_spoe_appctx(ctx);
2157 ctx->state = SPOE_CTX_ST_ERROR;
2158 return 1;
2159}
2160
2161
2162/***************************************************************************
2163 * Functions that create/destroy SPOE contexts
2164 **************************************************************************/
2165static struct spoe_context *
2166create_spoe_context(struct filter *filter)
2167{
2168 struct spoe_config *conf = FLT_CONF(filter);
2169 struct spoe_context *ctx;
2170
2171 ctx = pool_alloc_dirty(pool2_spoe_ctx);
2172 if (ctx == NULL) {
2173 return NULL;
2174 }
2175 memset(ctx, 0, sizeof(*ctx));
2176 ctx->filter = filter;
2177 ctx->state = SPOE_CTX_ST_NONE;
2178 ctx->flags = 0;
2179 ctx->errs = 0;
2180 ctx->messages = conf->agent->messages;
2181 ctx->buffer = &buf_empty;
2182 LIST_INIT(&ctx->buffer_wait);
2183 LIST_INIT(&ctx->applet_wait);
2184
2185 ctx->stream_id = 0;
2186 ctx->frame_id = 1;
2187
2188 return ctx;
2189}
2190
2191static void
2192destroy_spoe_context(struct spoe_context *ctx)
2193{
2194 if (!ctx)
2195 return;
2196
2197 if (ctx->appctx)
2198 APPCTX_SPOE(ctx->appctx).ctx = NULL;
2199 if (!LIST_ISEMPTY(&ctx->buffer_wait))
2200 LIST_DEL(&ctx->buffer_wait);
2201 if (!LIST_ISEMPTY(&ctx->applet_wait))
2202 LIST_DEL(&ctx->applet_wait);
2203 pool_free2(pool2_spoe_ctx, ctx);
2204}
2205
2206static void
2207reset_spoe_context(struct spoe_context *ctx)
2208{
2209 ctx->state = SPOE_CTX_ST_READY;
2210 ctx->flags &= ~SPOE_CTX_FL_PROCESS;
2211}
2212
2213
2214/***************************************************************************
2215 * Hooks that manage the filter lifecycle (init/check/deinit)
2216 **************************************************************************/
2217/* Signal handler: Do a soft stop, wakeup SPOE applet */
2218static void
2219sig_stop_spoe(struct sig_handler *sh)
2220{
2221 struct proxy *p;
2222
2223 p = proxy;
2224 while (p) {
2225 struct flt_conf *fconf;
2226
2227 list_for_each_entry(fconf, &p->filter_configs, list) {
2228 struct spoe_config *conf = fconf->conf;
2229 struct spoe_agent *agent = conf->agent;
2230 struct appctx *appctx;
2231
2232 list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) {
2233 si_applet_want_get(appctx->owner);
2234 si_applet_want_put(appctx->owner);
2235 appctx_wakeup(appctx);
2236 }
2237 }
2238 p = p->next;
2239 }
2240}
2241
2242
2243/* Initialize the SPOE filter. Returns -1 on error, else 0. */
2244static int
2245spoe_init(struct proxy *px, struct flt_conf *fconf)
2246{
2247 struct spoe_config *conf = fconf->conf;
2248 struct listener *l;
2249
2250 memset(&conf->agent_fe, 0, sizeof(conf->agent_fe));
2251 init_new_proxy(&conf->agent_fe);
2252 conf->agent_fe.parent = conf->agent;
2253 conf->agent_fe.last_change = now.tv_sec;
2254 conf->agent_fe.id = conf->agent->id;
2255 conf->agent_fe.cap = PR_CAP_FE;
2256 conf->agent_fe.mode = PR_MODE_TCP;
2257 conf->agent_fe.maxconn = 0;
2258 conf->agent_fe.options2 |= PR_O2_INDEPSTR;
2259 conf->agent_fe.conn_retries = CONN_RETRIES;
2260 conf->agent_fe.accept = frontend_accept;
2261 conf->agent_fe.srv = NULL;
2262 conf->agent_fe.timeout.client = TICK_ETERNITY;
2263 conf->agent_fe.default_target = &spoe_applet.obj_type;
2264 conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
2265
2266 if ((l = calloc(1, sizeof(*l))) == NULL) {
2267 Alert("spoe_init : out of memory.\n");
2268 goto out_error;
2269 }
2270 l->obj_type = OBJ_TYPE_LISTENER;
2271 l->obj_type = OBJ_TYPE_LISTENER;
2272 l->frontend = &conf->agent_fe;
2273 l->state = LI_READY;
2274 l->analysers = conf->agent_fe.fe_req_ana;
2275 LIST_ADDQ(&conf->agent_fe.conf.listeners, &l->by_fe);
2276
2277 if (!sighandler_registered) {
2278 signal_register_fct(0, sig_stop_spoe, 0);
2279 sighandler_registered = 1;
2280 }
2281
2282 return 0;
2283
2284 out_error:
2285 return -1;
2286}
2287
2288/* Free ressources allocated by the SPOE filter. */
2289static void
2290spoe_deinit(struct proxy *px, struct flt_conf *fconf)
2291{
2292 struct spoe_config *conf = fconf->conf;
2293
2294 if (conf) {
2295 struct spoe_agent *agent = conf->agent;
2296 struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
2297 struct listener *, by_fe);
2298
2299 free(l);
2300 release_spoe_agent(agent);
2301 free(conf);
2302 }
2303 fconf->conf = NULL;
2304}
2305
2306/* Check configuration of a SPOE filter for a specified proxy.
2307 * Return 1 on error, else 0. */
2308static int
2309spoe_check(struct proxy *px, struct flt_conf *fconf)
2310{
2311 struct spoe_config *conf = fconf->conf;
2312 struct proxy *target;
2313
2314 target = proxy_be_by_name(conf->agent->b.name);
2315 if (target == NULL) {
2316 Alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'"
2317 " declared at %s:%d.\n",
2318 px->id, conf->agent->b.name, conf->agent->id,
2319 conf->agent->conf.file, conf->agent->conf.line);
2320 return 1;
2321 }
2322 if (target->mode != PR_MODE_TCP) {
2323 Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
2324 " at %s:%d does not support HTTP mode.\n",
2325 px->id, target->id, conf->agent->id,
2326 conf->agent->conf.file, conf->agent->conf.line);
2327 return 1;
2328 }
2329
2330 free(conf->agent->b.name);
2331 conf->agent->b.name = NULL;
2332 conf->agent->b.be = target;
2333 return 0;
2334}
2335
2336/**************************************************************************
2337 * Hooks attached to a stream
2338 *************************************************************************/
2339/* Called when a filter instance is created and attach to a stream. It creates
2340 * the context that will be used to process this stream. */
2341static int
2342spoe_start(struct stream *s, struct filter *filter)
2343{
2344 struct spoe_context *ctx;
2345
2346 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2347 (int)now.tv_sec, (int)now.tv_usec,
2348 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2349 __FUNCTION__, s);
2350
2351 ctx = create_spoe_context(filter);
2352 if (ctx == NULL) {
2353 send_log(s->be, LOG_EMERG,
2354 "failed to create SPOE context for proxy %s\n",
2355 s->be->id);
2356 return 0;
2357 }
2358
2359 ctx->strm = s;
2360 ctx->state = SPOE_CTX_ST_READY;
2361 filter->ctx = ctx;
2362
2363 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_FE]))
2364 filter->pre_analyzers |= AN_REQ_INSPECT_FE;
2365
2366 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_BE]))
2367 filter->pre_analyzers |= AN_REQ_INSPECT_BE;
2368
2369 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_RSP]))
2370 filter->pre_analyzers |= AN_RES_INSPECT;
2371
2372 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_FE]))
2373 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE;
2374
2375 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_BE]))
2376 filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE;
2377
2378 if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_RSP]))
2379 filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE;
2380
2381 return 1;
2382}
2383
2384/* Called when a filter instance is detached from a stream. It release the
2385 * attached SPOE context. */
2386static void
2387spoe_stop(struct stream *s, struct filter *filter)
2388{
2389 struct spoe_context *ctx = filter->ctx;
2390
2391 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
2392 (int)now.tv_sec, (int)now.tv_usec,
2393 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2394 __FUNCTION__, s);
2395
2396 if (ctx) {
2397 release_spoe_appctx(ctx);
2398 destroy_spoe_context(ctx);
2399 }
2400}
2401
2402/* Called when we are ready to filter data on a channel */
2403static int
2404spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2405{
2406 struct spoe_context *ctx = filter->ctx;
2407 int ret = 1;
2408
2409 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2410 " - ctx-flags=0x%08x\n",
2411 (int)now.tv_sec, (int)now.tv_usec,
2412 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2413 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2414
2415 if (!(chn->flags & CF_ISRESP)) {
2416 if (filter->pre_analyzers & AN_REQ_INSPECT_FE)
2417 chn->analysers |= AN_REQ_INSPECT_FE;
2418 if (filter->pre_analyzers & AN_REQ_INSPECT_BE)
2419 chn->analysers |= AN_REQ_INSPECT_BE;
2420
2421 if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED)
2422 goto out;
2423
2424 ctx->stream_id = s->uniq_id;
2425 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2426 ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
2427 if (ret != 1)
2428 goto out;
2429 }
2430 ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
2431 }
2432 else {
2433 if (filter->pre_analyzers & SPOE_EV_ON_TCP_RSP)
2434 chn->analysers |= AN_RES_INSPECT;
2435
2436 if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
2437 goto out;
2438
2439 if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
2440 ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
2441 if (ret != 1)
2442 goto out;
2443 }
2444 ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
2445 }
2446
2447 out:
2448 if (!ret) {
2449 channel_dont_read(chn);
2450 channel_dont_close(chn);
2451 }
2452 return ret;
2453}
2454
2455/* Called before a processing happens on a given channel */
2456static int
2457spoe_chn_pre_analyze(struct stream *s, struct filter *filter,
2458 struct channel *chn, unsigned an_bit)
2459{
2460 struct spoe_context *ctx = filter->ctx;
2461 int ret = 1;
2462
2463 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2464 " - ctx-flags=0x%08x - ana=0x%08x\n",
2465 (int)now.tv_sec, (int)now.tv_usec,
2466 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2467 __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
2468 ctx->flags, an_bit);
2469
2470 if (ctx->state == SPOE_CTX_ST_NONE || ctx->state == SPOE_CTX_ST_ERROR)
2471 goto out;
2472
2473 switch (an_bit) {
2474 case AN_REQ_INSPECT_FE:
2475 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
2476 break;
2477 case AN_REQ_INSPECT_BE:
2478 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
2479 break;
2480 case AN_RES_INSPECT:
2481 ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP);
2482 break;
2483 case AN_REQ_HTTP_PROCESS_FE:
2484 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
2485 break;
2486 case AN_REQ_HTTP_PROCESS_BE:
2487 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
2488 break;
2489 case AN_RES_HTTP_PROCESS_FE:
2490 ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
2491 break;
2492 }
2493
2494 out:
2495 if (!ret) {
2496 channel_dont_read(chn);
2497 channel_dont_close(chn);
2498 }
2499 return ret;
2500}
2501
2502/* Called when the filtering on the channel ends. */
2503static int
2504spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
2505{
2506 struct spoe_context *ctx = filter->ctx;
2507
2508 SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
2509 " - ctx-flags=0x%08x\n",
2510 (int)now.tv_sec, (int)now.tv_usec,
2511 ((struct spoe_config *)FLT_CONF(filter))->agent->id,
2512 __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
2513
2514 if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
2515 reset_spoe_context(ctx);
2516 }
2517
2518 return 1;
2519}
2520
2521/********************************************************************
2522 * Functions that manage the filter initialization
2523 ********************************************************************/
2524struct flt_ops spoe_ops = {
2525 /* Manage SPOE filter, called for each filter declaration */
2526 .init = spoe_init,
2527 .deinit = spoe_deinit,
2528 .check = spoe_check,
2529
2530 /* Handle start/stop of SPOE */
2531 .attach = spoe_start,
2532 .detach = spoe_stop,
2533
2534 /* Handle channels activity */
2535 .channel_start_analyze = spoe_start_analyze,
2536 .channel_pre_analyze = spoe_chn_pre_analyze,
2537 .channel_end_analyze = spoe_end_analyze,
2538};
2539
2540
2541static int
2542cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
2543{
2544 const char *err;
2545 int i, err_code = 0;
2546
2547 if ((cfg_scope == NULL && curengine != NULL) ||
2548 (cfg_scope != NULL && curengine == NULL) ||
2549 strcmp(curengine, cfg_scope))
2550 goto out;
2551
2552 if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */
2553 if (!*args[1]) {
2554 Alert("parsing [%s:%d] : missing name for spoe-agent section.\n",
2555 file, linenum);
2556 err_code |= ERR_ALERT | ERR_ABORT;
2557 goto out;
2558 }
2559 if (*args[2]) {
2560 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2561 file, linenum, args[2]);
2562 err_code |= ERR_ALERT | ERR_ABORT;
2563 goto out;
2564 }
2565
2566 err = invalid_char(args[1]);
2567 if (err) {
2568 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2569 file, linenum, *err, args[0], args[1]);
2570 err_code |= ERR_ALERT | ERR_ABORT;
2571 goto out;
2572 }
2573
2574 if (curagent != NULL) {
2575 Alert("parsing [%s:%d] : another spoe-agent section previously defined.\n",
2576 file, linenum);
2577 err_code |= ERR_ALERT | ERR_ABORT;
2578 goto out;
2579 }
2580 if ((curagent = calloc(1, sizeof(*curagent))) == NULL) {
2581 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2582 err_code |= ERR_ALERT | ERR_ABORT;
2583 goto out;
2584 }
2585
2586 curagent->id = strdup(args[1]);
2587 curagent->conf.file = strdup(file);
2588 curagent->conf.line = linenum;
2589 curagent->timeout.hello = TICK_ETERNITY;
2590 curagent->timeout.ack = TICK_ETERNITY;
2591 curagent->timeout.idle = TICK_ETERNITY;
2592 curagent->var_pfx = NULL;
2593 curagent->new_applets = 0;
2594
2595 for (i = 0; i < SPOE_EV_EVENTS; ++i)
2596 LIST_INIT(&curagent->messages[i]);
2597 LIST_INIT(&curagent->cache);
2598 LIST_INIT(&curagent->applet_wq);
2599 }
2600 else if (!strcmp(args[0], "use-backend")) {
2601 if (!*args[1]) {
2602 Alert("parsing [%s:%d] : '%s' expects a backend name.\n",
2603 file, linenum, args[0]);
2604 err_code |= ERR_ALERT | ERR_FATAL;
2605 goto out;
2606 }
2607 if (*args[2]) {
2608 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2609 file, linenum, args[2]);
2610 err_code |= ERR_ALERT | ERR_ABORT;
2611 goto out;
2612 }
2613 free(curagent->b.name);
2614 curagent->b.name = strdup(args[1]);
2615 }
2616 else if (!strcmp(args[0], "messages")) {
2617 int cur_arg = 1;
2618 while (*args[cur_arg]) {
2619 struct spoe_msg_placeholder *mp = NULL;
2620
2621 list_for_each_entry(mp, &curmps, list) {
2622 if (!strcmp(mp->id, args[cur_arg])) {
2623 Alert("parsing [%s:%d]: spoe-message message '%s' already declared.\n",
2624 file, linenum, args[cur_arg]);
2625 err_code |= ERR_ALERT | ERR_FATAL;
2626 goto out;
2627 }
2628 }
2629
2630 if ((mp = calloc(1, sizeof(*mp))) == NULL) {
2631 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2632 err_code |= ERR_ALERT | ERR_ABORT;
2633 goto out;
2634 }
2635 mp->id = strdup(args[cur_arg]);
2636 LIST_ADDQ(&curmps, &mp->list);
2637 cur_arg++;
2638 }
2639 }
2640 else if (!strcmp(args[0], "timeout")) {
2641 unsigned int *tv = NULL;
2642 const char *res;
2643 unsigned timeout;
2644
2645 if (!*args[1]) {
2646 Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n",
2647 file, linenum);
2648 err_code |= ERR_ALERT | ERR_FATAL;
2649 goto out;
2650 }
2651 if (!strcmp(args[1], "hello"))
2652 tv = &curagent->timeout.hello;
2653 else if (!strcmp(args[1], "idle"))
2654 tv = &curagent->timeout.idle;
2655 else if (!strcmp(args[1], "ack"))
2656 tv = &curagent->timeout.ack;
2657 else {
2658 Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' and 'ack' (got %s).\n",
2659 file, linenum, args[1]);
2660 err_code |= ERR_ALERT | ERR_FATAL;
2661 goto out;
2662 }
2663 if (!*args[2]) {
2664 Alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n",
2665 file, linenum, args[1]);
2666 err_code |= ERR_ALERT | ERR_FATAL;
2667 goto out;
2668 }
2669 res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
2670 if (res) {
2671 Alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n",
2672 file, linenum, *res, args[1]);
2673 err_code |= ERR_ALERT | ERR_ABORT;
2674 goto out;
2675 }
2676 if (*args[3]) {
2677 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2678 file, linenum, args[3]);
2679 err_code |= ERR_ALERT | ERR_ABORT;
2680 goto out;
2681 }
2682 *tv = MS_TO_TICKS(timeout);
2683 }
2684 else if (!strcmp(args[0], "option")) {
2685 if (!*args[1]) {
2686 Alert("parsing [%s:%d]: '%s' expects an option name.\n",
2687 file, linenum, args[0]);
2688 err_code |= ERR_ALERT | ERR_FATAL;
2689 goto out;
2690 }
2691 if (!strcmp(args[1], "var-prefix")) {
2692 char *tmp;
2693
2694 if (!*args[2]) {
2695 Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
2696 file, linenum, args[0],
2697 args[1]);
2698 err_code |= ERR_ALERT | ERR_FATAL;
2699 goto out;
2700 }
2701 tmp = args[2];
2702 while (*tmp) {
2703 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2704 Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
2705 file, linenum, args[0], args[1]);
2706 err_code |= ERR_ALERT | ERR_FATAL;
2707 goto out;
2708 }
2709 tmp++;
2710 }
2711 curagent->var_pfx = strdup(args[2]);
2712 }
2713 else {
2714 Alert("parsing [%s:%d]: option '%s' is not supported.\n",
2715 file, linenum, args[1]);
2716 err_code |= ERR_ALERT | ERR_FATAL;
2717 goto out;
2718 }
2719 }
2720 else if (*args[0]) {
2721 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
2722 file, linenum, args[0]);
2723 err_code |= ERR_ALERT | ERR_FATAL;
2724 goto out;
2725 }
2726 out:
2727 return err_code;
2728}
2729
2730static int
2731cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm)
2732{
2733 struct spoe_message *msg;
2734 struct spoe_arg *arg;
2735 const char *err;
2736 char *errmsg = NULL;
2737 int err_code = 0;
2738
2739 if ((cfg_scope == NULL && curengine != NULL) ||
2740 (cfg_scope != NULL && curengine == NULL) ||
2741 strcmp(curengine, cfg_scope))
2742 goto out;
2743
2744 if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */
2745 if (!*args[1]) {
2746 Alert("parsing [%s:%d] : missing name for spoe-message section.\n",
2747 file, linenum);
2748 err_code |= ERR_ALERT | ERR_ABORT;
2749 goto out;
2750 }
2751 if (*args[2]) {
2752 Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
2753 file, linenum, args[2]);
2754 err_code |= ERR_ALERT | ERR_ABORT;
2755 goto out;
2756 }
2757
2758 err = invalid_char(args[1]);
2759 if (err) {
2760 Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
2761 file, linenum, *err, args[0], args[1]);
2762 err_code |= ERR_ALERT | ERR_ABORT;
2763 goto out;
2764 }
2765
2766 list_for_each_entry(msg, &curmsgs, list) {
2767 if (!strcmp(msg->id, args[1])) {
2768 Alert("parsing [%s:%d]: spoe-message section '%s' has the same"
2769 " name as another one declared at %s:%d.\n",
2770 file, linenum, args[1], msg->conf.file, msg->conf.line);
2771 err_code |= ERR_ALERT | ERR_FATAL;
2772 goto out;
2773 }
2774 }
2775
2776 if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) {
2777 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2778 err_code |= ERR_ALERT | ERR_ABORT;
2779 goto out;
2780 }
2781
2782 curmsg->id = strdup(args[1]);
2783 curmsg->id_len = strlen(curmsg->id);
2784 curmsg->event = SPOE_EV_NONE;
2785 curmsg->conf.file = strdup(file);
2786 curmsg->conf.line = linenum;
2787 LIST_INIT(&curmsg->args);
2788 LIST_ADDQ(&curmsgs, &curmsg->list);
2789 }
2790 else if (!strcmp(args[0], "args")) {
2791 int cur_arg = 1;
2792
2793 curproxy->conf.args.ctx = ARGC_SPOE;
2794 curproxy->conf.args.file = file;
2795 curproxy->conf.args.line = linenum;
2796 while (*args[cur_arg]) {
2797 char *delim = strchr(args[cur_arg], '=');
2798 int idx = 0;
2799
2800 if ((arg = calloc(1, sizeof(*arg))) == NULL) {
2801 Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
2802 err_code |= ERR_ALERT | ERR_ABORT;
2803 goto out;
2804 }
2805
2806 if (!delim) {
2807 arg->name = NULL;
2808 arg->name_len = 0;
2809 delim = args[cur_arg];
2810 }
2811 else {
2812 arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]);
2813 arg->name_len = delim - args[cur_arg];
2814 delim++;
2815 }
2816
2817 arg->expr = sample_parse_expr(&delim, &idx, file, linenum, &errmsg, &curproxy->conf.args);
2818 if (arg->expr == NULL) {
2819 Alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg);
2820 err_code |= ERR_ALERT | ERR_FATAL;
2821 free(arg->name);
2822 free(arg);
2823 goto out;
2824 }
2825 LIST_ADDQ(&curmsg->args, &arg->list);
2826 cur_arg++;
2827 }
2828 curproxy->conf.args.file = NULL;
2829 curproxy->conf.args.line = 0;
2830 }
2831 else if (!strcmp(args[0], "event")) {
2832 if (!*args[1]) {
2833 Alert("parsing [%s:%d] : missing event name.\n", file, linenum);
2834 err_code |= ERR_ALERT | ERR_ABORT;
2835 goto out;
2836 }
2837 if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS]))
2838 curmsg->event = SPOE_EV_ON_CLIENT_SESS;
2839 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS]))
2840 curmsg->event = SPOE_EV_ON_SERVER_SESS;
2841
2842 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE]))
2843 curmsg->event = SPOE_EV_ON_TCP_REQ_FE;
2844 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE]))
2845 curmsg->event = SPOE_EV_ON_TCP_REQ_BE;
2846 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP]))
2847 curmsg->event = SPOE_EV_ON_TCP_RSP;
2848
2849 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE]))
2850 curmsg->event = SPOE_EV_ON_HTTP_REQ_FE;
2851 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE]))
2852 curmsg->event = SPOE_EV_ON_HTTP_REQ_BE;
2853 else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP]))
2854 curmsg->event = SPOE_EV_ON_HTTP_RSP;
2855 else {
2856 Alert("parsing [%s:%d] : unkown event '%s'.\n",
2857 file, linenum, args[1]);
2858 err_code |= ERR_ALERT | ERR_ABORT;
2859 goto out;
2860 }
2861 }
2862 else if (!*args[0]) {
2863 Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n",
2864 file, linenum, args[0]);
2865 err_code |= ERR_ALERT | ERR_FATAL;
2866 goto out;
2867 }
2868 out:
2869 free(errmsg);
2870 return err_code;
2871}
2872
2873/* Return -1 on error, else 0 */
2874static int
2875parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
2876 struct flt_conf *fconf, char **err, void *private)
2877{
2878 struct list backup_sections;
2879 struct spoe_config *conf;
2880 struct spoe_message *msg, *msgback;
2881 struct spoe_msg_placeholder *mp, *mpback;
2882 char *file = NULL, *engine = NULL;
2883 int ret, pos = *cur_arg + 1;
2884
2885 conf = calloc(1, sizeof(*conf));
2886 if (conf == NULL) {
2887 memprintf(err, "%s: out of memory", args[*cur_arg]);
2888 goto error;
2889 }
2890 conf->proxy = px;
2891
2892 while (*args[pos]) {
2893 if (!strcmp(args[pos], "config")) {
2894 if (!*args[pos+1]) {
2895 memprintf(err, "'%s' : '%s' option without value",
2896 args[*cur_arg], args[pos]);
2897 goto error;
2898 }
2899 file = args[pos+1];
2900 pos += 2;
2901 }
2902 else if (!strcmp(args[pos], "engine")) {
2903 if (!*args[pos+1]) {
2904 memprintf(err, "'%s' : '%s' option without value",
2905 args[*cur_arg], args[pos]);
2906 goto error;
2907 }
2908 engine = args[pos+1];
2909 pos += 2;
2910 }
2911 else {
2912 memprintf(err, "unknown keyword '%s'", args[pos]);
2913 goto error;
2914 }
2915 }
2916 if (file == NULL) {
2917 memprintf(err, "'%s' : missing config file", args[*cur_arg]);
2918 goto error;
2919 }
2920
2921 /* backup sections and register SPOE sections */
2922 LIST_INIT(&backup_sections);
2923 cfg_backup_sections(&backup_sections);
2924 cfg_register_section("spoe-agent", cfg_parse_spoe_agent);
2925 cfg_register_section("spoe-message", cfg_parse_spoe_message);
2926
2927 /* Parse SPOE filter configuration file */
2928 curengine = engine;
2929 curproxy = px;
2930 curagent = NULL;
2931 curmsg = NULL;
2932 ret = readcfgfile(file);
2933 curproxy = NULL;
2934
2935 /* unregister SPOE sections and restore previous sections */
2936 cfg_unregister_sections();
2937 cfg_restore_sections(&backup_sections);
2938
2939 if (ret == -1) {
2940 memprintf(err, "Could not open configuration file %s : %s",
2941 file, strerror(errno));
2942 goto error;
2943 }
2944 if (ret & (ERR_ABORT|ERR_FATAL)) {
2945 memprintf(err, "Error(s) found in configuration file %s", file);
2946 goto error;
2947 }
2948
2949 /* Check SPOE agent */
2950 if (curagent == NULL) {
2951 memprintf(err, "No SPOE agent found in file %s", file);
2952 goto error;
2953 }
2954 if (curagent->b.name == NULL) {
2955 memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d",
2956 curagent->id, curagent->conf.file, curagent->conf.line);
2957 goto error;
2958 }
2959 if (curagent->timeout.hello == TICK_ETERNITY ||
2960 curagent->timeout.idle == TICK_ETERNITY ||
2961 curagent->timeout.ack == TICK_ETERNITY) {
2962 Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
2963 " | While not properly invalid, you will certainly encounter various problems\n"
2964 " | with such a configuration. To fix this, please ensure that all following\n"
2965 " | timeouts are set to a non-zero value: 'hello', 'idle', 'ack'.\n",
2966 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
2967 }
2968 if (curagent->var_pfx == NULL) {
2969 char *tmp = curagent->id;
2970
2971 while (*tmp) {
2972 if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
2973 memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. "
2974 "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n",
2975 curagent->id, curagent->id, curagent->conf.file, curagent->conf.line);
2976 goto error;
2977 }
2978 tmp++;
2979 }
2980 curagent->var_pfx = strdup(curagent->id);
2981 }
2982
2983 if (LIST_ISEMPTY(&curmps)) {
2984 Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",
2985 px->id, curagent->id, curagent->conf.file, curagent->conf.line);
2986 goto finish;
2987 }
2988
2989 list_for_each_entry_safe(mp, mpback, &curmps, list) {
2990 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
2991 if (!strcmp(msg->id, mp->id)) {
2992 if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) {
2993 if (msg->event == SPOE_EV_ON_TCP_REQ_BE)
2994 msg->event = SPOE_EV_ON_TCP_REQ_FE;
2995 if (msg->event == SPOE_EV_ON_HTTP_REQ_BE)
2996 msg->event = SPOE_EV_ON_HTTP_REQ_FE;
2997 }
2998 if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS ||
2999 msg->event == SPOE_EV_ON_TCP_REQ_FE ||
3000 msg->event == SPOE_EV_ON_HTTP_REQ_FE)) {
3001 Warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n",
3002 px->id, msg->conf.file, msg->conf.line);
3003 goto next;
3004 }
3005 if (msg->event == SPOE_EV_NONE) {
3006 Warning("Proxy '%s': Ignore SPOE message without event at %s:%d.\n",
3007 px->id, msg->conf.file, msg->conf.line);
3008 goto next;
3009 }
3010 msg->agent = curagent;
3011 LIST_DEL(&msg->list);
3012 LIST_ADDQ(&curagent->messages[msg->event], &msg->list);
3013 goto next;
3014 }
3015 }
3016 memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d",
3017 curagent->id, mp->id, curagent->conf.file, curagent->conf.line);
3018 goto error;
3019 next:
3020 continue;
3021 }
3022
3023 finish:
3024 conf->agent = curagent;
3025 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3026 LIST_DEL(&mp->list);
3027 release_spoe_msg_placeholder(mp);
3028 }
3029 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3030 Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n",
3031 px->id, msg->id, msg->conf.file, msg->conf.line);
3032 LIST_DEL(&msg->list);
3033 release_spoe_message(msg);
3034 }
3035
3036 *cur_arg = pos;
3037 fconf->ops = &spoe_ops;
3038 fconf->conf = conf;
3039 return 0;
3040
3041 error:
3042 release_spoe_agent(curagent);
3043 list_for_each_entry_safe(mp, mpback, &curmps, list) {
3044 LIST_DEL(&mp->list);
3045 release_spoe_msg_placeholder(mp);
3046 }
3047 list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
3048 LIST_DEL(&msg->list);
3049 release_spoe_message(msg);
3050 }
3051 free(conf);
3052 return -1;
3053}
3054
3055
3056/* Declare the filter parser for "spoe" keyword */
3057static struct flt_kw_list flt_kws = { "SPOE", { }, {
3058 { "spoe", parse_spoe_flt, NULL },
3059 { NULL, NULL, NULL },
3060 }
3061};
3062
3063__attribute__((constructor))
3064static void __spoe_init(void)
3065{
3066 flt_register_keywords(&flt_kws);
3067
3068 LIST_INIT(&curmsgs);
3069 LIST_INIT(&curmps);
3070 pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
3071}
3072
3073__attribute__((destructor))
3074static void
3075__spoe_deinit(void)
3076{
3077 pool_destroy2(pool2_spoe_ctx);
3078}