MAJOR: spoe: Add an experimental Stream Processing Offload Engine
SPOE makes possible the communication with external components to retrieve some
info using an in-house binary protocol, the Stream Processing Offload Protocol
(SPOP). In the long term, its aim is to allow any kind of offloading on the
streams. This first version, besides being experimental, won't do lot of
things. The most important today is to validate the protocol design and lay the
foundations of what will, one day, be a full offload engine for the stream
processing.
So, for now, the SPOE can offload the stream processing before "tcp-request
content", "tcp-response content", "http-request" and "http-response" rules. And
it only supports variables creation/suppression. But, in spite of these limited
features, we can easily imagine to implement a SSO solution, an ip reputation
service or an ip geolocation service.
Internally, the SPOE is implemented as a filter. So, to use it, you must use
following line in a proxy proxy section:
frontend my-front
...
filter spoe [engine <name>] config <file>
...
It uses its own configuration file to keep the HAProxy configuration clean. It
is also a easy way to disable it by commenting out the filter line.
See "doc/SPOE.txt" for all details about the SPOE configuration.
diff --git a/Makefile b/Makefile
index b2b748c..b68d2c5 100644
--- a/Makefile
+++ b/Makefile
@@ -144,8 +144,8 @@
#### Debug settings
# You can enable debugging on specific code parts by setting DEBUG=-DDEBUG_xxx.
# Currently defined DEBUG macros include DEBUG_FULL, DEBUG_MEMORY, DEBUG_FSM,
-# DEBUG_HASH and DEBUG_AUTH. Please check sources for exact meaning or do not
-# use at all.
+# DEBUG_HASH, DEBUG_AUTH and DEBUG_SPOE. Please check sources for exact meaning
+# or do not use at all.
DEBUG =
#### Trace options
@@ -778,7 +778,7 @@
src/acl.o src/sample.o src/memory.o src/freq_ctr.o src/auth.o src/proto_udp.o \
src/compression.o src/payload.o src/hash.o src/pattern.o src/map.o \
src/namespace.o src/mailers.o src/dns.o src/vars.o src/filters.o \
- src/flt_http_comp.o src/flt_trace.o
+ src/flt_http_comp.o src/flt_trace.o src/flt_spoe.o
EBTREE_OBJS = $(EBTREE_DIR)/ebtree.o \
$(EBTREE_DIR)/eb32tree.o $(EBTREE_DIR)/eb64tree.o \
diff --git a/doc/SPOE.txt b/doc/SPOE.txt
new file mode 100644
index 0000000..538bb26
--- /dev/null
+++ b/doc/SPOE.txt
@@ -0,0 +1,829 @@
+ -----------------------------------------------
+ Stream Processing Offload Engine (SPOE)
+ Version 1.0
+ ( Last update: 2016-11-07 )
+ -----------------------------------------------
+ Author : Christopher Faulet
+ Contact : cfaulet at haproxy dot com
+
+
+SUMMARY
+--------
+
+ 0. Terms
+ 1. Introduction
+ 2. SPOE configuration
+ 2.1. SPOE scope
+ 2.2. "spoe-agent" section
+ 2.3. "spoe-message" section
+ 2.4. Example
+ 3. SPOP specification
+ 3.1. Data types
+ 3.2. Frames
+ 3.2.1. Frame capabilities
+ 3.2.2. Frame types overview
+ 3.2.3. Workflow
+ 3.2.4. Frame: HAPROXY-HELLO
+ 3.2.5. Frame: AGENT-HELLO
+ 3.2.6. Frame: NOTIFY
+ 3.2.7. Frame: ACK
+ 3.2.8. Frame: HAPROXY-DISCONNECT
+ 3.2.9. Frame: AGENT-DISCONNECT
+ 3.3. Events & messages
+ 3.4. Actions
+ 3.5. Error & timeouts
+
+
+0. Terms
+---------
+
+* SPOE : Stream Processing Offload Engine.
+
+ A SPOE is a filter talking to servers managed ba a SPOA to offload the
+ stream processing. An engine is attached to a proxy. A proxy can have
+ several engine. Each engine is linked to an agent and only one.
+
+* SPOA : Stream Processing Offload Agent.
+
+ A SPOA is a service that will receive info from a SPOE to offload the
+ stream processing. An agent manages several servers. It uses a backend to
+ reference all of them. By extension, these servers can also be called
+ agents.
+
+* SPOP : Stream Processing Offload Protocol, used by SPOEs to talk to SPOA
+ servers.
+
+ This protocol is used by engines to talk to agents. It is an in-house
+ binary protocol described in this documentation.
+
+
+1. Introduction
+----------------
+
+SPOE is a feature introduced in HAProxy 1.7. It makes possible the
+communication with external components to retrieve some info. The idea started
+with the problems caused by most ldap libs not working fine in event-driven
+systems (often at least the connect() is blocking). So, it is hard to properly
+implement Single Sign On solution (SSO) in HAProxy. The SPOE will ease this
+kind of processing, or we hope so.
+
+Now, the aim of SPOE is to allow any kind of offloading on the streams. First
+releases, besides being experimental, won't do lot of things. As we will see,
+there are few handled events and even less actions supported. Actually, for
+now, the SPOE can offload the processing before "tcp-request content",
+"tcp-response content", "http-request" and "http-response" rules. And it only
+supports variables definition. But, in spite of these limited features, we can
+easily imagine to implement SSO solution, ip reputation or ip geolocation
+services.
+
+
+2. SPOE configuration
+----------------------
+
+Because SPOE is implemented as a filter, To use it, you must declare a "filter
+spoe" line in a proxy section (frontend/backend/listen) :
+
+ frontend my-front
+ ...
+ filter spoe [engine <name>] config <file>
+ ...
+
+The "config" parameter is mandatory. It specififies the SPOE configuration
+file. The engine name is optional. It can be set to declare the scope to use in
+the SPOE configuration. So it is possible to use the same SPOE configuration
+for several engines. If no name is provided, the SPOE configuration must not
+contain any scope directive.
+
+We use a separate configuration file on purpose. By commenting SPOE filter
+line, you completly disable the feature, including the parsing of sections
+reserved to SPOE. This is also a way to keep the HAProxy configuration clean.
+
+A SPOE configuration file must contains, at least, the SPOA configuration
+("spoe-agent" section) and SPOE messages ("spoe-message" section) attached to
+this agent. Unused messages (not reference in "spoe-agent" section) will be
+ignored.
+
+IMPORTANT : The configuration of a SPOE filter must be located in a dedicated
+file. But the backend used by a SPOA must be declared in HAProxy configuration
+file.
+
+2.1. SPOE scope
+-------------------------
+
+If you specify an engine name on the SPOE filter line, then you need to define
+scope in the SPOE configuration with the same name. You can have several SPOE
+scope in the same file. In each scope, you must define one and only one
+"spoe-agent" section to configure the SPOA linked to your SPOE and several
+"spoe-message" sections to describe messages sent to servers mananger by your
+SPOA.
+
+A SPOE scope starts with this kind of line :
+
+ [<name>]
+
+where <name> is the same engine name specified on the SPOE filter line. The
+scope ends when the file ends or when another scope is found.
+
+ Example :
+ [my-first-engine]
+ spoe-agent my-agent
+ ...
+ spoe-message msg1
+ ...
+ spoe-message msg2
+ ...
+
+ [my-second-engine]
+ ...
+
+If no engine name is provided on the SPOE filter line, no SPOE scope must be
+found in the SPOE configuration file. All the file is considered to be in the
+same anonymous and implicit scope.
+
+2.2. "spoe-agent" section
+--------------------------
+
+For each engine, you must define one and only one "spoe-agent" section. In this
+section, you will declare SPOE messages and the backend you will use. You will
+also set timeouts and options to customize your agent's behaviour.
+
+
+spoe-agent <name>
+ Create a new SPOA with the name <name>. It must have one and only one
+ "spoe-agent" definition by SPOE scope.
+
+ Arguments :
+ <name> is the name of the agent section.
+
+ following keywords are supported :
+ - messages
+ - option var-prefix
+ - timeout hello|idle|ack
+ - use-backend
+
+
+messages <msg-name> ...
+ Declare the list of SPOE messages that an agent will handle.
+
+ Arguments :
+ <msg-name> is the name of a SPOE message.
+
+ Messages declared here must be found in the same engine scope, else an error
+ is triggered during the configuration parsing. You can have many "messages"
+ lines.
+
+ See also: "spoe-message" section.
+
+
+option var-prefix <prefix>
+ Define the prefix used when variables are set by an agent.
+
+ Arguments :
+
+ <prefix> is the prefix used to limit the scope of variables set by an
+ agent.
+
+ To avoid conflict with other variables defined by HAProxy, all variables
+ names will be prefixed. By default, the "spoe-agent" name is used. This
+ option can be used to customize it.
+
+ The prefix will be added between the variable scope and its name, separated
+ by a '.'. It may only contain characters 'a-z', 'A-Z', '0-9', '.' and '_', as
+ for variables name. In HAProxy configuration, you need to use this prefix as
+ a part of the variables name. For example, if an agent define the variable
+ "myvar" in the "txn" scope, with the prefix "my_spoe_pfx", then you should
+ use "txn.my_spoe_pfx.myvar" name in your HAProxy configuration.
+
+ An agent will never set new variables at runtime. It can only set new value
+ for existing ones.
+
+
+timeout ack <timeout>
+ Set the maximum time to wait for an agent to receive the acknowledgement to a
+ NOTIFY frame.
+
+ Arguments :
+ <timeout> is the timeout value specified in milliseconds by default, but
+ can be in any other unit if the number is suffixed by the unit,
+ as explained at the top of this document.
+
+
+timeout hello <timeout>
+ Set the maximum time to wait for an agent to receive the AGENT-HELLO frame.
+
+ Arguments :
+ <timeout> is the timeout value specified in milliseconds by default, but
+ can be in any other unit if the number is suffixed by the unit,
+ as explained at the top of this document.
+
+ This timeout is an applicative timeout. It differ from "timeout connect"
+ defined on backends.
+
+
+timeout idle <timeout>
+ Set the maximum time to wait for an agent to close an idle connection.
+
+ Arguments :
+ <timeout> is the timeout value specified in milliseconds by default, but
+ can be in any other unit if the number is suffixed by the unit,
+ as explained at the top of this document.
+
+
+use-backend <backend>
+ Specify the backend to use. It must be defined.
+
+ Arguments :
+ <backend> is the name of a valid "backend" section.
+
+
+2.3. "spoe-message" section
+----------------------------
+
+To offload the stream processing, SPOE will send messages with specific
+information at a specific moment in the stream life and will wait for
+corresponding replies to know what to do.
+
+
+spoe-message <name>
+ Create a new SPOE message with the name <name>.
+
+ Arguments :
+ <name> is the name of the SPOE message.
+
+ Here you define a message that can be referenced in a "spoe-agent"
+ section. Following keywords are supported :
+ - args
+ - event
+
+ See also: "spoe-agent" section.
+
+
+args [name=]<sample> ...
+ Define arguments passed into the SPOE message.
+
+ Arguments :
+ <sample> is a sample expression.
+
+ When the message is processed, if a sample expression is not available, it is
+ set to NULL. Arguments are processed in their declaration order and added in
+ the message in that order. It is possible to declare named arguements.
+
+ For example:
+ args frontend=fe_id src dst
+
+
+event <name>
+ Set the event that triggers sending of the message.
+
+ Argument :
+ <name> is the event name.
+
+ Supported events are:
+ - on-client-session
+ - on-server-connectiob
+ - on-frontend-tcp-request
+ - on-backend-tcp-request
+ - on-tcp-response
+ - on-frontend-http-request
+ - on-backend-http-request
+ - on-http-response
+
+ See section 3.5 about Events.
+
+2.4. Example
+-------------
+
+Here is a simple but complete example that sends client-ip address to a ip
+reputation service. This service can set the variable "ip_score" which is an
+integer between 0 and 100, indicating its reputation (100 means totally safe
+and 0 a blacklisted IP with no doubt).
+
+ ###
+ ### HAProxy configuration
+ frontend www
+ mode http
+ bind *:80
+
+ filter spoe engine ip-reputation config spoe-ip-reputation.conf
+
+ # Reject connection if the IP reputation is under 20
+ tcp-request content reject if { var(sess.iprep.ip_score) -m int lt 20 }
+
+ default_backend http-servers
+
+ backend http-servers
+ mode http
+ server http A.B.C.D:80
+
+ backend iprep-servers
+ mode tcp
+ balance roundrobin
+
+ timeout connect 5s # greater than hello timeout
+ timeout server 3m # greater than idle timeout
+
+ server iprep1 A1.B1.C1.D1:12345
+ server iprep2 A2.B2.C2.D2:12345
+
+ ####
+ ### spoe-ip-reputation.conf
+ [ip-reputation]
+
+ spoe-agent iprep-agent
+ messages get-ip-reputation
+
+ option var-prefix iprep
+
+ timeout hello 2s
+ timeout ack 10ms
+ timeout idle 2m
+
+ use-backend iprep-servers
+
+ spoe-message get-ip-reputation
+ args ip=src
+ event on-client-session
+
+
+3. SPOP specification
+----------------------
+
+3.1. Data types
+----------------
+
+Here is the bytewise representation of typed data:
+
+ TYPED-DATA : <TYPE:4 bits><FLAGS:4 bits><DATA>
+
+Supported types and their representation are:
+
+ TYPE | ID | DESCRIPTION
+ -----------------------------+-----+----------------------------------
+ NULL | 0 | NULL : <0>
+ Boolean | 1 | BOOL : <1+FLAG>
+ 32bits signed integer | 2 | INT32 : <2><VALUE:varint>
+ 32bits unsigned integer | 3 | UINT32 : <3><VALUE:varint>
+ 64bits signed integer | 4 | INT64 : <4><VALUE:varint>
+ 32bits unsigned integer | 5 | UNIT64 : <5><VALUE:varint>
+ IPV4 | 6 | IPV4 : <6><STRUCT IN_ADDR:4 bytes>
+ IPV6 | 7 | IPV6 : <7><STRUCT IN_ADDR6:16 bytes>
+ String | 8 | STRING : <8><LENGTH:varint><BYTES>
+ Binary | 9 | BINARY : <9><LENGTH:varint><BYTES>
+ 10 -> 15 unused/reserved | - | -
+ -----------------------------+-----+----------------------------------
+
+Variable-length integer (varint) are encoded using Peers encoding:
+
+
+ 0 <= X < 240 : 1 byte (7.875 bits) [ XXXX XXXX ]
+ 240 <= X < 2288 : 2 bytes (11 bits) [ 1111 XXXX ] [ 0XXX XXXX ]
+ 2288 <= X < 264432 : 3 bytes (18 bits) [ 1111 XXXX ] [ 1XXX XXXX ] [ 0XXX XXXX ]
+ 264432 <= X < 33818864 : 4 bytes (25 bits) [ 1111 XXXX ] [ 1XXX XXXX ]*2 [ 0XXX XXXX ]
+ 33818864 <= X < 4328786160 : 5 bytes (32 bits) [ 1111 XXXX ] [ 1XXX XXXX ]*3 [ 0XXX XXXX ]
+ ...
+
+For booleans, the value (true or false) is the first bit in the FLAGS
+bitfield. if this bit is set to 0, then the boolean is evaluated as false,
+otherwise, the boolean is evaluated as true.
+
+3.2. Frames
+------------
+
+Exchange between HAProxy and agents are made using FRAME packets. All frames
+must be prefixed with their size encoded on 4 bytes in network byte order:
+
+ <FRAME-LENGTH:4 bytes> <FRAME>
+
+A frame always starts with its type, on one byte, followed by metadata
+containing flags, on 4 bytes and a two variable-length integer representing the
+stream identifier and the frame identifier inside the stream:
+
+ FRAME : <FRAME-TYPE:1 byte> <METADATA> <FRAME-PAYLOAD>
+ METADATA : <FLAGS:4 bytes> <STREAM-ID:varint> <FRAME-ID:varint>
+
+Then comes the frame payload. Depending on the frame type, the payload can be
+of three types: a simple key/value list, a list of messages or a list of
+actions.
+
+ FRAME-PAYLOAD : <LIST-OF-MESSAGES> | <LIST-OF-ACTIONS> | <KV-LIST>
+
+ LIST-OF-MESSAGES : [ <MESSAGE-NAME> <NB-ARGS:1 byte> <KV-LIST> ... ]
+ MESSAGE-NAME : <STRING>
+
+ LIST-OF-ACTIONS : [ <ACTION-TYPE:1 byte> <NB-ARGS:1 byte> <ACTION-ARGS> ... ]
+ ACTION-ARGS : [ <TYPED-DATA>... ]
+
+ KV-LIST : [ <KV-NAME> <KV-VALUE> ... ]
+ KV-NAME : <STRING>
+ KV-VALUE : <TYPED-DATA>
+
+ FLAGS : 0 1-31
+ +---+-----------+
+ | F| |
+ | I| RESERVED |
+ | N| |
+ +--+------------+
+
+ FIN: Indicates that this is the final payload fragment. The first fragment
+ may also be the final fragment.
+
+Frames cannot exceed a maximum size negociated between HAProxy and agents
+during the HELLO handshake. Most of time, payload will be small enough to send
+it in one frame. But when supported by the peer, it will be possible to
+fragment huge payload on many frames. This ability is announced during the
+HELLO handshake and it can be asynmetric (supported by agents but not by
+HAProxy or the opposite). The following rules apply to fragmentation:
+
+ * An unfragemnted payload consists of a single frame with the FIN bit set.
+
+ * A fragemented payload consists of several frames with the FIN bit clear and
+ terminated by a single frame with the FIN bit set. All these frames must
+ share the same STREAM-ID and FRAME-ID. And, of course, the FRAME-TYPE must
+ be the same.
+
+Beside the support of fragmented payload by a peer, some payload must not be
+fragmented. See below for details.
+
+IMPORTANT : The maximum size supported by peers for a frame must be greater or
+equal to 256 bytes.
+
+3.2.1. Frame capabilities
+--------------------------
+
+Here are the list of official capabilities that HAProxy and agents can support:
+
+ * fragmentation: This is the abaility for a peer to support fragmented
+ payload in received frames.
+
+Unsupported or unknown capabilities are silently ignored, when possible.
+
+3.2.2. Frame types overview
+----------------------------
+
+Here are types of frame supported by SPOE. Frames sent by HAProxy come first,
+then frames sent by agents :
+
+ TYPE | ID | DESCRIPTION
+ -----------------------------+-----+-------------------------------------
+ HAPROXY-HELLO | 1 | Sent by HAProxy when it opens a
+ | | connection on an agent.
+ | |
+ HAPROXY-DISCONNECT | 2 | Sent by HAProxy when it want to close
+ | | the connection or in reply to an
+ | | AGENT-DISCONNECT frame
+ | |
+ NOTIFY | 3 | Sent by HAProxy to pass information
+ | | to an agent
+ -----------------------------+-----+-------------------------------------
+ AGENT-HELLO | 101 | Reply to a HAPROXY-HELLO frame, when
+ | | the connection is established
+ | |
+ AGENT-DISCONNECT | 102 | Sent by an agent just before closing
+ | | the connection
+ | |
+ ACK | 103 | Sent to acknowledge a NOTIFY frame
+ -----------------------------+-----+-------------------------------------
+
+Unknown frames may be silently skipped.
+
+3.2.3. Workflow
+----------------
+
+ * Successful HELLO handshake:
+
+ HAPROXY AGENT SRV
+ | HAPROXY-HELLO |
+ | --------------------------> |
+ | |
+ | AGENT-HELLO |
+ | <-------------------------- |
+ | |
+
+
+ * Error encountered by agent during the HELLO handshake:
+
+ HAPROXY AGENT SRV
+ | HAPROXY-HELLO |
+ | --------------------------> |
+ | |
+ | DISCONNECT + close() |
+ | <-------------------------- |
+ | |
+
+ * Error encountered by HAProxy during the HELLO handshake:
+
+ HAPROXY AGENT SRV
+ | HAPROXY-HELLO |
+ | --------------------------> |
+ | |
+ | AGENT-HELLO |
+ | <-------------------------- |
+ | |
+ | DISCONNECT |
+ | --------------------------> |
+ | |
+ | DISCONNECT + close() |
+ | <-------------------------- |
+ | |
+
+ * Notify / Ack exchange:
+
+ HAPROXY AGENT SRV
+ | NOTIFY |
+ | --------------------------> |
+ | |
+ | ACK |
+ | <-------------------------- |
+ | |
+
+ * Connection closed by haproxy:
+
+ HAPROXY AGENT SRV
+ | DISCONNECT |
+ | --------------------------> |
+ | |
+ | DISCONNECT + close() |
+ | <-------------------------- |
+ | |
+
+ * Connection closed by agent:
+
+ HAPROXY AGENT SRV
+ | DISCONNECT + close() |
+ | <-------------------------- |
+ | |
+
+3.2.4. Frame: HAPROXY-HELLO
+----------------------------
+
+This frame is the first one exchanged between HAProxy and an agent, when the
+connection is established. The payload of this frame is a KV-LIST. It cannot be
+fragmented. STREAM-ID and FRAME-ID are must be set 0.
+
+Following items are mandatory in the KV-LIST:
+
+ * "supported-versions" <STRING>
+
+ Last SPOP major versions supported by HAProxy. It is a comma-separated list
+ of versions, following the format "Major.Minor". Spaces must be ignored, if
+ any. When a major version is announced by HAProxy, it means it also support
+ all previous minor versions.
+
+ Example: "2.0, 1.5" means HAProxy supports SPOP 2.0 and 1.0 to 1.5
+
+ * "max-frame-size" <UINT32>
+
+ This is the maximum size allowed for a frame. The HAPROXY-HELLO frame must
+ be lower or equal to this value.
+
+ * "capabilities" <STRING>
+
+ This a comma-separated list of capabilities supported by HAProxy. Spaces
+ must be ignored, if any.
+
+To finish the HELLO handshake, the agent must return an AGENT-HELLO frame with
+its supported SPOP version, the lower value between its maximum size allowed
+for a frame and the HAProxy one and capabilities it supports. If an error
+occurs or if an incompatibility is detected with the agent configuration, an
+AGENT-DISCONNECT frame must be returned.
+
+3.2.5. Frame: AGENT-HELLO
+--------------------------
+
+This frame is sent in reply to a HAPROXY-HELLO frame to finish a HELLO
+handshake. As for HAPROXY-HELLO frame, STREAM-ID and FRAME-ID are also set
+0. The payload of this frame is a KV-LIST and it cannot be fragmented.
+
+Following items are mandatory in the KV-LIST:
+
+ * "version" <STRING>
+
+ This is the SPOP version the agent supports. It must follow the format
+ "Major.Minor" and it must be lower or equal than one of major versions
+ announced by HAProxy.
+
+ * "max-frame-size" <UINT32>
+
+ This is the maximum size allowed for a frame. It must be lower or equal to
+ the value in the HAPROXY-HELLO frame. This value will be used for all
+ subsequent frames.
+
+ * "capabilities" <STRING>
+
+ This a comma-separated list of capabilities supported by agent. Spaces must
+ be ignored, if any.
+
+At this time, if everything is ok for HAProxy (supported version and valid
+max-frame-size value), the HELLO handshake is successfully completed. Else,
+HAProxy sends a HAPROXY-DISCONNECT frame with the corresponding error.
+
+3.2.6. Frame: NOTIFY
+---------------------
+
+Information are sent to the agents inside NOTIFY frames. These frames are
+attached to a stream, so STREAM-ID and FRAME-ID must be set. The payload of
+NOTIFY frames is a LIST-OF-MESSAGES and, if supported by agents, it can be
+fragmented.
+
+NOTIFY frames must be acknowledge by agents sending an ACK frame, repeating
+right STREAM-ID and FRAME-ID.
+
+3.2.7. Frame: ACK
+------------------
+
+ACK frames must be sent by agents to reply to NOTIFY frames. STREAM-ID and
+FRAME-ID found in a NOTIFY frame must be reuse in the corresponding ACK
+frame. The payload of ACK frames is a LIST-OF-ACTIONS and, if supported by
+HAProxy, it can be fragmented.
+
+3.2.8. Frame: HAPROXY-DISCONNECT
+---------------------------------
+
+If an error occurs, at anytime, from the HAProxy side, a HAPROXY-DISCONNECT
+frame is sent with information describing the error. HAProxy will wait an
+AGENT-DISCONNECT frame in reply. All other frames will be ignored. The agent
+must then close the socket.
+
+The payload of this frame is a KV-LIST. It cannot be fragmented. STREAM-ID and
+FRAME-ID are must be set 0.
+
+Following items are mandatory in the KV-LIST:
+
+ * "status-code" <UINT32>
+
+ This is the code corresponding to the error.
+
+ * "message" <STRING>
+
+ This is a textual message describing the error.
+
+For more information about known errors, see section "Errors & timeouts"
+
+3.2.9. Frame: AGENT-DISCONNECT
+-------------------------------
+
+If an error occurs, at anytime, from the agent size, a AGENT-DISCONNECT frame
+is sent, with information desribing the error. such frame is also sent in reply
+to a HAPROXY-DISCONNECT. The agent must close the socket just after sending
+this frame.
+
+The payload of this frame is a KV-LIST. It cannot be fragmented. STREAM-ID and
+FRAME-ID are must be set 0.
+
+Following items are mandatory in the KV-LIST:
+
+ * "status-code" <UINT32>
+
+ This is the code corresponding to the error.
+
+ * "message" <STRING>
+
+ This is a textual message describing the error.
+
+For more information about known errors, see section "Errors & timeouts"
+
+3.3. Events & Messages
+-----------------------
+
+Information about streams are sent in NOTIFY frames. You can specify which kind
+of information to send by defining "spoe-message" sections in your SPOE
+configuration file. for each "spoe-message" there will be a message in a NOTIFY
+frame when the right event is triggered.
+
+A NOTIFY frame is sent for an specific event when there is at least one
+"spoe-message" attached to this event. All messages for an event will be added
+in the same NOTIFY frame.
+
+Here is the list of supported events:
+
+ * on-client-session is triggered when a new client session is created.
+ This event is only available for SPOE filters
+ declared in a frontend or a listen section.
+
+ * on-frontend-tcp-request is triggered just before the evaluation of
+ "tcp-request content" rules on the frontend side.
+ This event is only available for SPOE filters
+ declared in a frontend or a listen section.
+
+ * on-backend-tcp-request is triggered just before the evaluation of
+ "tcp-request content" rules on the backend side.
+ This event is skipped for SPOE filters declared
+ in a listen section.
+
+ * on-frontend-http-request is triggered just before the evaluation of
+ "http-request" rules on the frontend side. This
+ event is only available for SPOE filters declared
+ in a frontend or a listen section.
+
+ * on-backend-http-request is triggered just before the evaluation of
+ "http-request" rules on the backend side. This
+ event is skipped for SPOE filters declared in a
+ listen section.
+
+ * on-server-session is triggered when the session with the server is
+ established.
+
+ * on-tcp-response is triggered just before the evaluation of
+ "tcp-response content" rules.
+
+ * on-http-response is triggered just before the evaluation of
+ "http-response" rules.
+
+
+The stream processing will loop on these events, when triggered, waiting the
+agent reply.
+
+3.4. Actions
+-------------
+
+An agent must acknowledge each NOTIFY frame by sending the corresponding ACK
+frame. Actions can be added in these frames to dynamically take action on the
+processing of a stream.
+
+Here is the list of supported actions:
+
+ * set-var set the value for an existing variable. 3 arguments must be
+ attached to this action: the variable scope (proc, sess, txn,
+ req or req), the variable name (a string) and its value.
+
+ ACTION-SET-VAR : <SET-VAR:1 byte><NB-ARGS:1 byte><VAR-SCOPE:1 byte><VAR-NAME><VAR-VALUE>
+
+ SET-VAR : <1>
+ NB-ARGS : <3>
+ VAR-SCOPE : <PROCESS> | <SESSION> | <TRANSACTION> | <REQUEST> | <RESPONSE>
+ VAR-NAME : <STRING>
+ VAR-VALUE : <TYPED-DATA>
+
+ PROCESS : <0>
+ SESSION : <1>
+ TRANSACTION : <2>
+ REQUEST : <3>
+ RESERVED : <4>
+
+ * unset-var unset the value for an existing variable. 2 arguments must be
+ attached to this action: the variable scope (proc, sess, txn,
+ req or req) and the variable name (a string).
+
+ ACTION-UNSET-VAR : <SET-VAR:1 byte><NB-ARGS:1 byte><VAR-SCOPE:1 byte><VAR-NAME>
+
+ SET-VAR : <1>
+ NB-ARGS : <3>
+ VAR-SCOPE : <PROCESS> | <SESSION> | <TRANSACTION> | <REQUEST> | <RESPONSE>
+ VAR-NAME : <STRING>
+
+ PROCESS : <0>
+ SESSION : <1>
+ TRANSACTION : <2>
+ REQUEST : <3>
+ RESERVED : <4>
+
+
+NOTE: Name of the variables will be automatically prefixed by HAProxy to avoid
+ name clashes with other variables used in HAProxy. Moreover, unknown
+ variable will be silently ignored.
+
+3.5. Error & timeouts
+----------------------
+
+Here is the list of all known errors:
+
+ STATUS CODE | DESCRIPTION
+ ----------------+--------------------------------------------------------
+ 0 | normal (no error occurred)
+ 1 | I/O error
+ 2 | A timeout occurred
+ 3 | frame is too big
+ 4 | invalid frame received
+ 5 | version value not found
+ 6 | max-frame-size value not found
+ 7 | capabilities value not found
+ 8 | unsupported version
+ 9 | max-frame-size too big or too small
+ 99 | an unknown error occurrde
+ ----------------+--------------------------------------------------------
+
+An agent can define its own errors using a not yet assigned status code.
+
+IMPORTANT NOTE: For a specific stream, when an abnormal/unexpected error
+ occurs, the SPOE is disabled for all the transaction. So if you
+ have several events configured, such error on an event will
+ disabled all followings. For TCP streams, this will disable the
+ SPOE for the whole session. For HTTP streams, this will disable
+ it for the transaction (request and response).
+
+To avoid a stream to wait infinitly, you must carefully choose the
+acknowledgement timeout. In most of cases, it will be quiet low. But it depends
+on the responsivness of your service.
+
+You must also choose idle timeout carefully. Because connection with your
+service depends on the backend configuration used by the SPOA, it is important
+to use a lower value for idle timeout than the server timeout. Else the
+connection will be closed by HAProxy. The same is true for hello timeout. You
+should choose a lower value than the connect timeout.
+
+
+/*
+ * Local variables:
+ * fill-column: 79
+ * End:
+ */
diff --git a/doc/configuration.txt b/doc/configuration.txt
index 2a6183c..3139650 100644
--- a/doc/configuration.txt
+++ b/doc/configuration.txt
@@ -106,6 +106,7 @@
9. Supported filters
9.1. Trace
9.2. HTTP compression
+9.3. Stream Processing Offload Engine (SPOE)
1. Quick reminder about HTTP
@@ -16182,6 +16183,37 @@
See also : "compression"
+9.3. Stream Processing Offload Engine (SPOE)
+--------------------------------------------
+
+filter spoe [engine <name>] config <file>
+
+ Arguments :
+
+ <name> is the engine name that will be used to find the right scope in
+ the configuration file. If not provided, all the file will be
+ parsed.
+
+ <file> is the path of the engine configuration file. This file can
+ contain configuration of several engines. In this case, each
+ part must be placed in its own scope.
+
+The Stream Processing Offload Engine (SPOE) is a filter communicating with
+external components. It allows the offload of some specifics processing on the
+streams in tierce applications. These external components and information
+exchanged with them are configured in dedicated files, for the main part. It
+also requires dedicated backends, defined in HAProxy configuration.
+
+SPOE communicates with external components using an in-house binary protocol,
+the Stream Processing Offload Protocol (SPOP).
+
+For all information about the SPOE configuation and the SPOP specification, see
+"doc/SPOE.txt".
+
+Important note:
+ The SPOE filter is highly experimental for now and was not heavily
+ tested. It is really not production ready. So use it carefully.
+
/*
* Local variables:
* fill-column: 79
diff --git a/include/types/applet.h b/include/types/applet.h
index 91d7d8f..1f094e6 100644
--- a/include/types/applet.h
+++ b/include/types/applet.h
@@ -145,6 +145,14 @@
struct {
char **var;
} env;
+ struct {
+ struct task *task;
+ void *ctx;
+ void *agent;
+ unsigned int version;
+ unsigned int max_frame_size;
+ struct list list;
+ } spoe; /* used by SPOE filter */
} ctx; /* used by stats I/O handlers to dump the stats */
};
diff --git a/include/types/arg.h b/include/types/arg.h
index fef12ec..7576f8a 100644
--- a/include/types/arg.h
+++ b/include/types/arg.h
@@ -76,6 +76,7 @@
ARGC_RDR, /* redirect */
ARGC_CAP, /* capture rule */
ARGC_SRV, /* server line */
+ ARGC_SPOE, /* spoe message args */
};
/* flags used when compiling and executing regex */
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
new file mode 100644
index 0000000..1ebdbda
--- /dev/null
+++ b/src/flt_spoe.c
@@ -0,0 +1,3013 @@
+/*
+ * Stream processing offload engine management.
+ *
+ * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version
+ * 2 of the License, or (at your option) any later version.
+ *
+ */
+#include <ctype.h>
+#include <errno.h>
+
+#include <common/buffer.h>
+#include <common/cfgparse.h>
+#include <common/compat.h>
+#include <common/config.h>
+#include <common/debug.h>
+#include <common/memory.h>
+#include <common/time.h>
+
+#include <types/arg.h>
+#include <types/filters.h>
+#include <types/global.h>
+#include <types/proxy.h>
+#include <types/sample.h>
+#include <types/stream.h>
+
+#include <proto/arg.h>
+#include <proto/backend.h>
+#include <proto/filters.h>
+#include <proto/frontend.h>
+#include <proto/log.h>
+#include <proto/proto_http.h>
+#include <proto/proxy.h>
+#include <proto/sample.h>
+#include <proto/session.h>
+#include <proto/signal.h>
+#include <proto/stream.h>
+#include <proto/stream_interface.h>
+#include <proto/task.h>
+#include <proto/vars.h>
+
+#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
+#define SPOE_PRINTF(x...) fprintf(x)
+#else
+#define SPOE_PRINTF(x...)
+#endif
+
+/* Helper to get ctx inside an appctx */
+#define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe)
+
+/* TODO: add an option to customize these values */
+/* The maximum number of new applet waiting the end of the hello handshake */
+#define MAX_NEW_SPOE_APPLETS 5
+
+/* The maximum number of error when a stream is waiting of a SPOE applet */
+#define MAX_NEW_SPOE_APPLET_ERRS 3
+
+/* Minimal size for a frame */
+#define MIN_FRAME_SIZE 256
+
+/* Flags set on the SPOE context */
+#define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* Set after that on-client-session event was processed */
+#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set after that on-server-session event was processed */
+#define SPOE_CTX_FL_REQ_PROCESS 0x00000004 /* Set when SPOE is processing the request */
+#define SPOE_CTX_FL_RSP_PROCESS 0x00000008 /* Set when SPOE is processing the response */
+
+#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS)
+
+#define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, leave it to zero */
+#define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */
+
+/* All possible states for a SPOE context */
+enum spoe_ctx_state {
+ SPOE_CTX_ST_NONE = 0,
+ SPOE_CTX_ST_READY,
+ SPOE_CTX_ST_SENDING_MSGS,
+ SPOE_CTX_ST_WAITING_ACK,
+ SPOE_CTX_ST_DONE,
+ SPOE_CTX_ST_ERROR,
+};
+
+/* All possible states for a SPOE applet */
+enum spoe_appctx_state {
+ SPOE_APPCTX_ST_CONNECT = 0,
+ SPOE_APPCTX_ST_CONNECTING,
+ SPOE_APPCTX_ST_PROCESSING,
+ SPOE_APPCTX_ST_DISCONNECT,
+ SPOE_APPCTX_ST_DISCONNECTING,
+ SPOE_APPCTX_ST_EXIT,
+ SPOE_APPCTX_ST_END,
+};
+
+/* All supported SPOE actions */
+enum spoe_action_type {
+ SPOE_ACT_T_SET_VAR = 1,
+ SPOE_ACT_T_UNSET_VAR,
+ SPOE_ACT_TYPES,
+};
+
+/* All supported SPOE events */
+enum spoe_event {
+ SPOE_EV_NONE = 0,
+
+ /* Request events */
+ SPOE_EV_ON_CLIENT_SESS = 1,
+ SPOE_EV_ON_TCP_REQ_FE,
+ SPOE_EV_ON_TCP_REQ_BE,
+ SPOE_EV_ON_HTTP_REQ_FE,
+ SPOE_EV_ON_HTTP_REQ_BE,
+
+ /* Response events */
+ SPOE_EV_ON_SERVER_SESS,
+ SPOE_EV_ON_TCP_RSP,
+ SPOE_EV_ON_HTTP_RSP,
+
+ SPOE_EV_EVENTS
+};
+
+/* Errors triggerd by SPOE applet */
+enum spoe_frame_error {
+ SPOE_FRM_ERR_NONE = 0,
+ SPOE_FRM_ERR_IO,
+ SPOE_FRM_ERR_TOUT,
+ SPOE_FRM_ERR_TOO_BIG,
+ SPOE_FRM_ERR_INVALID,
+ SPOE_FRM_ERR_NO_VSN,
+ SPOE_FRM_ERR_NO_FRAME_SIZE,
+ SPOE_FRM_ERR_NO_CAP,
+ SPOE_FRM_ERR_BAD_VSN,
+ SPOE_FRM_ERR_BAD_FRAME_SIZE,
+ SPOE_FRM_ERR_UNKNOWN = 99,
+ SPOE_FRM_ERRS,
+};
+
+/* Scopes used for variables set by agents. It is a way to be agnotic to vars
+ * scope. */
+enum spoe_vars_scope {
+ SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC */
+ SPOE_SCOPE_SESS, /* <=> SCOPE_SESS */
+ SPOE_SCOPE_TXN, /* <=> SCOPE_TXN */
+ SPOE_SCOPE_REQ, /* <=> SCOPE_REQ */
+ SPOE_SCOPE_RES, /* <=> SCOPE_RES */
+};
+
+
+/* Describe an argument that will be linked to a message. It is a sample fetch,
+ * with an optional name. */
+struct spoe_arg {
+ char *name; /* Name of the argument, may be NULL */
+ unsigned int name_len; /* The name length, 0 if NULL */
+ struct sample_expr *expr; /* Sample expression */
+ struct list list; /* Used to chain SPOE args */
+};
+
+/* Used during the config parsing only because, when a SPOE agent section is
+ * parsed, messages can be undefined. */
+struct spoe_msg_placeholder {
+ char *id; /* SPOE message placeholder id */
+ struct list list; /* Use to chain SPOE message placeholders */
+};
+
+/* Describe a message that will be sent in a NOTIFY frame. A message has a name,
+ * an argument list (see above) and it is linked to a specific event. */
+struct spoe_message {
+ char *id; /* SPOE message id */
+ unsigned int id_len; /* The message id length */
+ struct spoe_agent *agent; /* SPOE agent owning this SPOE message */
+ struct {
+ char *file; /* file where the SPOE message appears */
+ int line; /* line where the SPOE message appears */
+ } conf; /* config information */
+ struct list args; /* Arguments added when the SPOE messages is sent */
+ struct list list; /* Used to chain SPOE messages */
+
+ enum spoe_event event; /* SPOE_EV_* */
+};
+
+/* Describe a SPOE agent. */
+struct spoe_agent {
+ char *id; /* SPOE agent id (name) */
+ struct {
+ char *file; /* file where the SPOE agent appears */
+ int line; /* line where the SPOE agent appears */
+ } conf; /* config information */
+ union {
+ struct proxy *be; /* Backend used by this agent */
+ char *name; /* Backend name used during conf parsing */
+ } b;
+ struct {
+ unsigned int hello; /* Max time to receive AGENT-HELLO frame */
+ unsigned int idle; /* Max Idle timeout */
+ unsigned int ack; /* Max time to acknowledge a NOTIFY frame */
+ } timeout;
+
+ char *var_pfx; /* Prefix used for vars set by the agent */
+
+ struct list cache; /* List used to cache SPOE streams. In
+ * fact, we cache the SPOE applect ctx */
+
+ struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent
+ * for each supported events */
+
+ struct list applet_wq; /* List of streams waiting for a SPOE applet */
+ unsigned int new_applets; /* The number of new SPOE applets */
+};
+
+/* SPOE filter configuration */
+struct spoe_config {
+ struct proxy *proxy; /* Proxy owning the filter */
+ struct spoe_agent *agent; /* Agent used by this filter */
+ struct proxy agent_fe; /* Agent frontend */
+};
+
+/* SPOE context attached to a stream. It is the main structure that handles the
+ * processing offload */
+struct spoe_context {
+ struct filter *filter; /* The SPOE filter */
+ struct stream *strm; /* The stream that should be offloaded */
+ struct appctx *appctx; /* The SPOE appctx */
+ struct list *messages; /* List of messages that will be sent during the stream processing */
+ struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */
+ struct list buffer_wait; /* position in the list of streams waiting for a buffer */
+ struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */
+
+ unsigned int errs; /* The number of errors to acquire a SPOE applet */
+
+ enum spoe_ctx_state state; /* SPOE_CTX_ST_* */
+ unsigned int flags; /* SPOE_CTX_FL_* */
+
+ unsigned int stream_id; /* stream_id and frame_id are used */
+ unsigned int frame_id; /* to map NOTIFY and ACK frames */
+
+};
+
+/* Set if the handle on SIGUSR1 is registered */
+static int sighandler_registered = 0;
+
+/* proxy used during the parsing */
+struct proxy *curproxy = NULL;
+
+/* The name of the SPOE engine, used during the parsing */
+char *curengine = NULL;
+
+/* SPOE agent used during the parsing */
+struct spoe_agent *curagent = NULL;
+
+/* SPOE message used during the parsing */
+struct spoe_message *curmsg = NULL;
+
+/* list of SPOE messages and placeholders used during the parsing */
+struct list curmsgs;
+struct list curmps;
+
+/* Pool used to allocate new SPOE contexts */
+static struct pool_head *pool2_spoe_ctx = NULL;
+
+/* Temporary variables used to ease error processing */
+int spoe_status_code = SPOE_FRM_ERR_NONE;
+char spoe_reason[256];
+
+struct flt_ops spoe_ops;
+
+static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx);
+static void on_new_spoe_appctx_failure(struct spoe_agent *agent);
+static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx);
+
+/********************************************************************
+ * helper functions/globals
+ ********************************************************************/
+static void
+release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp)
+{
+ if (!mp)
+ return;
+ free(mp->id);
+ free(mp);
+}
+
+
+static void
+release_spoe_message(struct spoe_message *msg)
+{
+ struct spoe_arg *arg, *back;
+
+ if (!msg)
+ return;
+ free(msg->id);
+ free(msg->conf.file);
+ list_for_each_entry_safe(arg, back, &msg->args, list) {
+ release_sample_expr(arg->expr);
+ free(arg->name);
+ LIST_DEL(&arg->list);
+ free(arg);
+ }
+ free(msg);
+}
+
+static void
+release_spoe_agent(struct spoe_agent *agent)
+{
+ struct spoe_message *msg, *back;
+ int i;
+
+ if (!agent)
+ return;
+ free(agent->id);
+ free(agent->conf.file);
+ free(agent->var_pfx);
+ for (i = 0; i < SPOE_EV_EVENTS; ++i) {
+ list_for_each_entry_safe(msg, back, &agent->messages[i], list) {
+ LIST_DEL(&msg->list);
+ release_spoe_message(msg);
+ }
+ }
+ free(agent);
+}
+
+static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
+ [SPOE_FRM_ERR_NONE] = "normal",
+ [SPOE_FRM_ERR_IO] = "I/O error",
+ [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
+ [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
+ [SPOE_FRM_ERR_INVALID] = "invalid frame received",
+ [SPOE_FRM_ERR_NO_VSN] = "version value not found",
+ [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
+ [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
+ [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
+ [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
+ [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
+};
+
+static const char *spoe_event_str[SPOE_EV_EVENTS] = {
+ [SPOE_EV_ON_CLIENT_SESS] = "on-client-session",
+ [SPOE_EV_ON_TCP_REQ_FE] = "on-frontend-tcp-request",
+ [SPOE_EV_ON_TCP_REQ_BE] = "on-backend-tcp-request",
+ [SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request",
+ [SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request",
+
+ [SPOE_EV_ON_SERVER_SESS] = "on-server-session",
+ [SPOE_EV_ON_TCP_RSP] = "on-tcp-response",
+ [SPOE_EV_ON_HTTP_RSP] = "on-http-response",
+};
+
+
+#if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
+
+static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = {
+ [SPOE_CTX_ST_NONE] = "NONE",
+ [SPOE_CTX_ST_READY] = "READY",
+ [SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS",
+ [SPOE_CTX_ST_WAITING_ACK] = "WAITING_ACK",
+ [SPOE_CTX_ST_DONE] = "DONE",
+ [SPOE_CTX_ST_ERROR] = "ERROR",
+};
+
+static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = {
+ [SPOE_APPCTX_ST_CONNECT] = "CONNECT",
+ [SPOE_APPCTX_ST_CONNECTING] = "CONNECTING",
+ [SPOE_APPCTX_ST_PROCESSING] = "PROCESSING",
+ [SPOE_APPCTX_ST_DISCONNECT] = "DISCONNECT",
+ [SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING",
+ [SPOE_APPCTX_ST_EXIT] = "EXIT",
+ [SPOE_APPCTX_ST_END] = "END",
+};
+
+#endif
+/********************************************************************
+ * Functions that encode/decode SPOE frames
+ ********************************************************************/
+/* Frame Types sent by HAProxy and by agents */
+enum spoe_frame_type {
+ /* Frames sent by HAProxy */
+ SPOE_FRM_T_HAPROXY_HELLO = 1,
+ SPOE_FRM_T_HAPROXY_DISCON,
+ SPOE_FRM_T_HAPROXY_NOTIFY,
+
+ /* Frames sent by the agents */
+ SPOE_FRM_T_AGENT_HELLO = 101,
+ SPOE_FRM_T_AGENT_DISCON,
+ SPOE_FRM_T_AGENT_ACK
+};
+
+/* All supported data types */
+enum spoe_data_type {
+ SPOE_DATA_T_NULL = 0,
+ SPOE_DATA_T_BOOL,
+ SPOE_DATA_T_INT32,
+ SPOE_DATA_T_UINT32,
+ SPOE_DATA_T_INT64,
+ SPOE_DATA_T_UINT64,
+ SPOE_DATA_T_IPV4,
+ SPOE_DATA_T_IPV6,
+ SPOE_DATA_T_STR,
+ SPOE_DATA_T_BIN,
+ SPOE_DATA_TYPES
+};
+
+/* Masks to get data type or flags value */
+#define SPOE_DATA_T_MASK 0x0F
+#define SPOE_DATA_FL_MASK 0xF0
+
+/* Flags to set Boolean values */
+#define SPOE_DATA_FL_FALSE 0x00
+#define SPOE_DATA_FL_TRUE 0x10
+
+/* Helper to get static string length, excluding the terminating null byte */
+#define SLEN(str) (sizeof(str)-1)
+
+/* Predefined key used in HELLO/DISCONNECT frames */
+#define SUPPORTED_VERSIONS_KEY "supported-versions"
+#define VERSION_KEY "version"
+#define MAX_FRAME_SIZE_KEY "max-frame-size"
+#define CAPABILITIES_KEY "capabilities"
+#define STATUS_CODE_KEY "status-code"
+#define MSG_KEY "message"
+
+struct spoe_version {
+ char *str;
+ int min;
+ int max;
+};
+
+/* All supported versions */
+static struct spoe_version supported_versions[] = {
+ {"1.0", 1000, 1000},
+ {NULL, 0, 0}
+};
+
+/* Comma-separated list of supported versions */
+#define SUPPORTED_VERSIONS_VAL "1.0"
+
+/* Comma-separated list of supported capabilities (none for now) */
+#define CAPABILITIES_VAL ""
+
+static int
+decode_spoe_version(const char *str, size_t len)
+{
+ char tmp[len+1], *start, *end;
+ double d;
+ int vsn = -1;
+
+ memset(tmp, 0, len+1);
+ memcpy(tmp, str, len);
+
+ start = tmp;
+ while (isspace(*start))
+ start++;
+
+ d = strtod(start, &end);
+ if (d == 0 || start == end)
+ goto out;
+
+ if (*end) {
+ while (isspace(*end))
+ end++;
+ if (*end)
+ goto out;
+ }
+ vsn = (int)(d * 1000);
+ out:
+ return vsn;
+}
+
+/* Encode a variable-length integer. This function never fails and returns the
+ * number of written bytes. */
+static int
+encode_spoe_varint(uint64_t i, char *buf)
+{
+ int idx;
+
+ if (i < 240) {
+ buf[0] = (unsigned char)i;
+ return 1;
+ }
+
+ buf[0] = (unsigned char)i | 240;
+ i = (i - 240) >> 4;
+ for (idx = 1; i >= 128; ++idx) {
+ buf[idx] = (unsigned char)i | 128;
+ i = (i - 128) >> 7;
+ }
+ buf[idx++] = (unsigned char)i;
+ return idx;
+}
+
+/* Decode a varable-length integer. If the decoding fails, -1 is returned. This
+ * happens when the buffer's end in reached. On success, the number of read
+ * bytes is returned. */
+static int
+decode_spoe_varint(const char *buf, const char *end, uint64_t *i)
+{
+ unsigned char *msg = (unsigned char *)buf;
+ int idx = 0;
+
+ if (msg > (unsigned char *)end)
+ return -1;
+
+ if (msg[0] < 240) {
+ *i = msg[0];
+ return 1;
+ }
+ *i = msg[0];
+ do {
+ ++idx;
+ if (msg+idx > (unsigned char *)end)
+ return -1;
+ *i += (uint64_t)msg[idx] << (4 + 7 * (idx-1));
+ } while (msg[idx] >= 128);
+ return (idx + 1);
+}
+
+/* Encode a string. The string will be prefix by its length, encoded as a
+ * variable-length integer. This function never fails and returns the number of
+ * written bytes. */
+static int
+encode_spoe_string(const char *str, size_t len, char *dst)
+{
+ int idx = 0;
+
+ if (!len) {
+ dst[0] = 0;
+ return 1;
+ }
+
+ idx += encode_spoe_varint(len, dst);
+ memcpy(dst+idx, str, len);
+ return (idx + len);
+}
+
+/* Decode a string. Its length is decoded first as a variable-length integer. If
+ * it succeeds, and if the string length is valid, the begin of the string is
+ * saved in <*str>, its length is saved in <*len> and the total numbre of bytes
+ * read is returned. If an error occurred, -1 is returned and <*str> remains
+ * NULL. */
+static int
+decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
+{
+ int i, idx = 0;
+
+ *str = NULL;
+ *len = 0;
+
+ if ((i = decode_spoe_varint(buf, end, len)) == -1)
+ goto error;
+ idx += i;
+ if (buf + idx + *len > end)
+ goto error;
+
+ *str = buf+idx;
+ return (idx + *len);
+
+ error:
+ return -1;
+}
+
+/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
+ * of bytes read is returned. A types data is composed of a type (1 byte) and
+ * corresponding data:
+ * - boolean: non additional data (0 bytes)
+ * - integers: a variable-length integer (see decode_spoe_varint)
+ * - ipv4: 4 bytes
+ * - ipv6: 16 bytes
+ * - binary and string: a buffer prefixed by its size, a variable-length
+ * integer (see decode_spoe_string) */
+static int
+skip_spoe_data(char *frame, char *end)
+{
+ uint64_t sz = 0;
+ int i, idx = 0;
+
+ if (frame > end)
+ return -1;
+
+ switch (frame[idx++] & SPOE_DATA_T_MASK) {
+ case SPOE_DATA_T_BOOL:
+ break;
+ case SPOE_DATA_T_INT32:
+ case SPOE_DATA_T_INT64:
+ case SPOE_DATA_T_UINT32:
+ case SPOE_DATA_T_UINT64:
+ if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
+ return -1;
+ idx += i;
+ break;
+ case SPOE_DATA_T_IPV4:
+ idx += 4;
+ break;
+ case SPOE_DATA_T_IPV6:
+ idx += 16;
+ break;
+ case SPOE_DATA_T_STR:
+ case SPOE_DATA_T_BIN:
+ if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
+ return -1;
+ idx += i + sz;
+ break;
+ }
+
+ if (frame+idx > end)
+ return -1;
+ return idx;
+}
+
+/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
+ * number of read bytes is returned. See skip_spoe_data for details. */
+static int
+decode_spoe_data(char *frame, char *end, struct sample *smp)
+{
+ uint64_t sz = 0;
+ int type, i, idx = 0;
+
+ if (frame > end)
+ return -1;
+
+ type = frame[idx++];
+ switch (type & SPOE_DATA_T_MASK) {
+ case SPOE_DATA_T_BOOL:
+ smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
+ smp->data.type = SMP_T_BOOL;
+ break;
+ case SPOE_DATA_T_INT32:
+ case SPOE_DATA_T_INT64:
+ case SPOE_DATA_T_UINT32:
+ case SPOE_DATA_T_UINT64:
+ if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1)
+ return -1;
+ idx += i;
+ smp->data.type = SMP_T_SINT;
+ break;
+ case SPOE_DATA_T_IPV4:
+ if (frame+idx+4 > end)
+ return -1;
+ memcpy(&smp->data.u.ipv4, frame+idx, 4);
+ smp->data.type = SMP_T_IPV4;
+ idx += 4;
+ break;
+ case SPOE_DATA_T_IPV6:
+ if (frame+idx+16 > end)
+ return -1;
+ memcpy(&smp->data.u.ipv6, frame+idx, 16);
+ smp->data.type = SMP_T_IPV6;
+ idx += 16;
+ break;
+ case SPOE_DATA_T_STR:
+ if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
+ return -1;
+ idx += i;
+ if (frame+idx+sz > end)
+ return -1;
+ smp->data.u.str.str = frame+idx;
+ smp->data.u.str.len = sz;
+ smp->data.type = SMP_T_STR;
+ idx += sz;
+ break;
+ case SPOE_DATA_T_BIN:
+ if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1)
+ return -1;
+ idx += i;
+ if (frame+idx+sz > end)
+ return -1;
+ smp->data.u.str.str = frame+idx;
+ smp->data.u.str.len = sz;
+ smp->data.type = SMP_T_BIN;
+ idx += sz;
+ break;
+ }
+
+ if (frame+idx > end)
+ return -1;
+ return idx;
+}
+
+/* Skip an action in a frame received from an agent. If an error occurred, -1 is
+ * returned, otherwise the number of read bytes is returned. An action is
+ * composed of the action type followed by a typed data. */
+static int
+skip_spoe_action(char *frame, char *end)
+{
+ int n, i, idx = 0;
+
+ if (frame+2 > end)
+ return -1;
+
+ idx++; /* Skip the action type */
+ n = frame[idx++];
+ while (n-- > 0) {
+ if ((i = skip_spoe_data(frame+idx, end)) == -1)
+ return -1;
+ idx += i;
+ }
+
+ if (frame+idx > end)
+ return -1;
+ return idx;
+}
+
+/* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on
+ * success, 0 if the frame can be ignored and -1 if an error occurred. */
+static int
+prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size)
+{
+ int idx = 0;
+ size_t max = (7 /* TYPE + METADATA */
+ + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL)
+ + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4
+ + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL));
+
+ if (size < max)
+ return -1;
+
+ /* Frame type */
+ frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO;
+
+ /* No flags for now */
+ memset(frame+idx, 0, 4);
+ idx += 4;
+
+ /* No stream-id and frame-id for HELLO frames */
+ frame[idx++] = 0;
+ frame[idx++] = 0;
+
+ /* There are 3 mandatory items: "supported-versions", "max-frame-size"
+ * and "capabilities" */
+
+ /* "supported-versions" K/V item */
+ idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx);
+ frame[idx++] = SPOE_DATA_T_STR;
+ idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx);
+
+ /* "max-fram-size" K/V item */
+ idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx);
+ frame[idx++] = SPOE_DATA_T_UINT32;
+ idx += encode_spoe_varint(APPCTX_SPOE(appctx).max_frame_size, frame+idx);
+
+ /* "capabilities" K/V item */
+ idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx);
+ frame[idx++] = SPOE_DATA_T_STR;
+ idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx);
+
+ return idx;
+}
+
+/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame
+ * size on success, 0 if the frame can be ignored and -1 if an error
+ * occurred. */
+static int
+prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size)
+{
+ const char *reason;
+ int rlen, idx = 0;
+ size_t max = (7 /* TYPE + METADATA */
+ + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2
+ + 1 + SLEN(MSG_KEY) + 1 + 2 + 255);
+
+ if (size < max)
+ return -1;
+
+ /* Get the message corresponding to the status code */
+ if (spoe_status_code >= SPOE_FRM_ERRS)
+ spoe_status_code = SPOE_FRM_ERR_UNKNOWN;
+ reason = spoe_frm_err_reasons[spoe_status_code];
+ rlen = strlen(reason);
+
+ /* Frame type */
+ frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON;
+
+ /* No flags for now */
+ memset(frame+idx, 0, 4);
+ idx += 4;
+
+ /* No stream-id and frame-id for DISCONNECT frames */
+ frame[idx++] = 0;
+ frame[idx++] = 0;
+
+ /* There are 2 mandatory items: "status-code" and "message" */
+
+ /* "status-code" K/V item */
+ idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx);
+ frame[idx++] = SPOE_DATA_T_UINT32;
+ idx += encode_spoe_varint(spoe_status_code, frame+idx);
+
+ /* "message" K/V item */
+ idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx);
+ frame[idx++] = SPOE_DATA_T_STR;
+ idx += encode_spoe_string(reason, rlen, frame+idx);
+
+ return idx;
+}
+
+/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on
+ * success, 0 if the frame can be ignored and -1 if an error occurred. */
+static int
+prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size)
+{
+ struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
+ int idx = 0;
+
+ if (size < APPCTX_SPOE(appctx).max_frame_size)
+ return -1;
+
+ frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY;
+
+ /* No flags for now */
+ memset(frame+idx, 0, 4);
+ idx += 4;
+
+ /* Set stream-id and frame-id */
+ idx += encode_spoe_varint(ctx->stream_id, frame+idx);
+ idx += encode_spoe_varint(ctx->frame_id, frame+idx);
+
+ /* Copy encoded messages */
+ memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i);
+ idx += ctx->buffer->i;
+
+ return idx;
+}
+
+/* Decode HELLO frame sent by an agent. It returns the number of by read bytes
+ * on success, 0 if the frame can be ignored and -1 if an error occurred. */
+static int
+handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size)
+{
+ int vsn, max_frame_size;
+ int i, idx = 0;
+ size_t min_size = (7 /* TYPE + METADATA */
+ + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3
+ + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1
+ + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0);
+
+ /* Check frame type */
+ if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO)
+ return 0;
+
+ if (size < min_size) {
+ spoe_status_code = SPOE_FRM_ERR_INVALID;
+ return -1;
+ }
+
+ /* Skip flags: fragmentation is not supported for now */
+ idx += 4;
+
+ /* stream-id and frame-id must be cleared */
+ if (frame[idx] != 0 || frame[idx+1] != 0) {
+ spoe_status_code = SPOE_FRM_ERR_INVALID;
+ return -1;
+ }
+ idx += 2;
+
+ /* There are 3 mandatory items: "version", "max-frame-size" and
+ * "capabilities" */
+
+ /* Loop on K/V items */
+ vsn = max_frame_size = 0;
+ while (idx < size) {
+ char *str;
+ uint64_t sz;
+
+ /* Decode the item key */
+ idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
+ if (str == NULL) {
+ spoe_status_code = SPOE_FRM_ERR_INVALID;
+ return -1;
+ }
+ /* Check "version" K/V item */
+ if (!memcmp(str, VERSION_KEY, sz)) {
+ /* The value must be a string */
+ if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
+ spoe_status_code = SPOE_FRM_ERR_INVALID;
+ return -1;
+ }
+ idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
+ if (str == NULL) {
+ spoe_status_code = SPOE_FRM_ERR_INVALID;
+ return -1;
+ }
+
+ vsn = decode_spoe_version(str, sz);
+ if (vsn == -1) {
+ spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
+ return -1;
+ }
+ for (i = 0; supported_versions[i].str != NULL; ++i) {
+ if (vsn >= supported_versions[i].min &&
+ vsn <= supported_versions[i].max)
+ break;
+ }
+ if (supported_versions[i].str == NULL) {
+ spoe_status_code = SPOE_FRM_ERR_BAD_VSN;
+ return -1;
+ }
+ }
+ /* Check "max-frame-size" K/V item */
+ else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) {
+ int type;
+
+ /* The value must be integer */
+ type = frame[idx++];
+ if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
+ (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
+ (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
+ (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
+ spoe_status_code = SPOE_FRM_ERR_INVALID;
+ return -1;
+ }
+ if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
+ spoe_status_code = SPOE_FRM_ERR_INVALID;
+ return -1;
+ }
+ idx += i;
+ if (sz < MIN_FRAME_SIZE || sz > APPCTX_SPOE(appctx).max_frame_size) {
+ spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE;
+ return -1;
+ }
+ max_frame_size = sz;
+ }
+ /* Skip "capabilities" K/V item for now */
+ else {
+ /* Silently ignore unknown item */
+ if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
+ spoe_status_code = SPOE_FRM_ERR_INVALID;
+ return -1;
+ }
+ idx += i;
+ }
+ }
+
+ /* Final checks */
+ if (!vsn) {
+ spoe_status_code = SPOE_FRM_ERR_NO_VSN;
+ return -1;
+ }
+ if (!max_frame_size) {
+ spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE;
+ return -1;
+ }
+
+ APPCTX_SPOE(appctx).version = (unsigned int)vsn;
+ APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size;
+ return idx;
+}
+
+/* Decode DISCONNECT frame sent by an agent. It returns the number of by read
+ * bytes on success, 0 if the frame can be ignored and -1 if an error
+ * occurred. */
+static int
+handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size)
+{
+ int i, idx = 0;
+ size_t min_size = (7 /* TYPE + METADATA */
+ + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1
+ + 1 + SLEN(MSG_KEY) + 1 + 1);
+
+ /* Check frame type */
+ if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON)
+ return 0;
+
+ if (size < min_size) {
+ spoe_status_code = SPOE_FRM_ERR_INVALID;
+ return -1;
+ }
+
+ /* Skip flags: fragmentation is not supported for now */
+ idx += 4;
+
+ /* stream-id and frame-id must be cleared */
+ if (frame[idx] != 0 || frame[idx+1] != 0) {
+ spoe_status_code = SPOE_FRM_ERR_INVALID;
+ return -1;
+ }
+ idx += 2;
+
+ /* There are 2 mandatory items: "status-code" and "message" */
+
+ /* Loop on K/V items */
+ while (idx < size) {
+ char *str;
+ uint64_t sz;
+
+ /* Decode the item key */
+ idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
+ if (str == NULL) {
+ spoe_status_code = SPOE_FRM_ERR_INVALID;
+ return -1;
+ }
+
+ /* Check "status-code" K/V item */
+ if (!memcmp(str, STATUS_CODE_KEY, sz)) {
+ int type;
+
+ /* The value must be an integer */
+ type = frame[idx++];
+ if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
+ (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
+ (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
+ (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
+ spoe_status_code = SPOE_FRM_ERR_INVALID;
+ return -1;
+ }
+ if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) {
+ spoe_status_code = SPOE_FRM_ERR_INVALID;
+ return -1;
+ }
+ idx += i;
+ spoe_status_code = sz;
+ }
+
+ /* Check "message" K/V item */
+ else if (sz && !memcmp(str, MSG_KEY, sz)) {
+ /* The value must be a string */
+ if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
+ spoe_status_code = SPOE_FRM_ERR_INVALID;
+ return -1;
+ }
+ idx += decode_spoe_string(frame+idx, frame+size, &str, &sz);
+ if (str == NULL || sz > 255) {
+ spoe_status_code = SPOE_FRM_ERR_INVALID;
+ return -1;
+ }
+ memcpy(spoe_reason, str, sz);
+ spoe_reason[sz] = 0;
+ }
+ else {
+ /* Silently ignore unknown item */
+ if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) {
+ spoe_status_code = SPOE_FRM_ERR_INVALID;
+ return -1;
+ }
+ idx += i;
+ }
+ }
+
+ return idx;
+}
+
+
+/* Decode ACK frame sent by an agent. It returns the number of by read bytes on
+ * success, 0 if the frame can be ignored and -1 if an error occurred. */
+static int
+handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size)
+{
+ struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
+ uint64_t stream_id, frame_id;
+ int idx = 0;
+ size_t min_size = (7 /* TYPE + METADATA */);
+
+ /* Check frame type */
+ if (frame[idx++] != SPOE_FRM_T_AGENT_ACK)
+ return 0;
+
+ if (size < min_size) {
+ spoe_status_code = SPOE_FRM_ERR_INVALID;
+ return -1;
+ }
+
+ /* Skip flags: fragmentation is not supported for now */
+ idx += 4;
+
+ /* Get the stream-id and the frame-id */
+ idx += decode_spoe_varint(frame+idx, frame+size, &stream_id);
+ idx += decode_spoe_varint(frame+idx, frame+size, &frame_id);
+
+ /* Check stream-id and frame-id */
+ if (ctx->stream_id != (unsigned int)stream_id ||
+ ctx->frame_id != (unsigned int)frame_id)
+ return 0;
+
+ /* Copy encoded actions */
+ b_reset(ctx->buffer);
+ memcpy(ctx->buffer->p, frame+idx, size-idx);
+ ctx->buffer->i = size-idx;
+
+ return idx;
+}
+
+
+/********************************************************************
+ * Functions that manage the SPOE applet
+ ********************************************************************/
+/* Callback function that catches applet timeouts. If a timeout occurred, we set
+ * <appctx->st1> flag and the SPOE applet is woken up. */
+static struct task *
+process_spoe_applet(struct task * task)
+{
+ struct appctx *appctx = task->context;
+
+ appctx->st1 = SPOE_APPCTX_ERR_NONE;
+ if (tick_is_expired(task->expire, now_ms)) {
+ task->expire = TICK_ETERNITY;
+ appctx->st1 = SPOE_APPCTX_ERR_TOUT;
+ }
+ si_applet_want_get(appctx->owner);
+ appctx_wakeup(appctx);
+ return task;
+}
+
+/* Remove a SPOE applet from the agent cache */
+static void
+remove_spoe_applet_from_cache(struct appctx *appctx)
+{
+ struct appctx *a, *back;
+ struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
+
+ if (LIST_ISEMPTY(&agent->cache))
+ return;
+
+ list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) {
+ if (a == appctx) {
+ LIST_DEL(&APPCTX_SPOE(appctx).list);
+ break;
+ }
+ }
+}
+
+
+/* Callback function that releases a SPOE applet. This happens when the
+ * connection with the agent is closed. */
+static void
+release_spoe_applet(struct appctx *appctx)
+{
+ struct stream_interface *si = appctx->owner;
+ struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
+ struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
+
+ if (appctx->st0 == SPOE_APPCTX_ST_CONNECT ||
+ appctx->st0 == SPOE_APPCTX_ST_CONNECTING)
+ on_new_spoe_appctx_failure(agent);
+
+ if (appctx->st0 != SPOE_APPCTX_ST_END) {
+ si_shutw(si);
+ si_shutr(si);
+ si_ic(si)->flags |= CF_READ_NULL;
+ appctx->st0 = SPOE_APPCTX_ST_END;
+ }
+
+ if (ctx != NULL) {
+ task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+ ctx->appctx = NULL;
+ }
+
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, appctx);
+
+ /* Release the task attached to the SPOE applet */
+ if (APPCTX_SPOE(appctx).task) {
+ task_delete(APPCTX_SPOE(appctx).task);
+ task_free(APPCTX_SPOE(appctx).task);
+ }
+
+ /* And remove it from the agent cache */
+ remove_spoe_applet_from_cache(appctx);
+ APPCTX_SPOE(appctx).ctx = NULL;
+}
+
+/* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when
+ * the frame can be ignored, 0 to retry later and 1 on success. The frame is
+ * encoded using the callback function <prepare>. */
+static int
+send_spoe_frame(struct appctx *appctx,
+ int (*prepare)(struct appctx *, char *, size_t))
+{
+ struct stream_interface *si = appctx->owner;
+ int framesz, ret;
+ uint32_t netint;
+
+ ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size);
+ if (ret <= 0)
+ goto skip_or_error;
+ framesz = ret;
+ netint = htonl(framesz);
+ ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint));
+ if (ret > 0)
+ ret = bi_putblk(si_ic(si), trash.str, framesz);
+ if (ret <= 0) {
+ if (ret == -1)
+ return -1;
+ return -2;
+ }
+ return 1;
+
+ skip_or_error:
+ if (!ret)
+ return -1;
+ return -2;
+}
+
+/* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1
+ * when the frame can be ignored, 0 to retry later and 1 on success. The frame
+ * is decoded using the callback function <handle>. */
+static int
+recv_spoe_frame(struct appctx *appctx,
+ int (*handle)(struct appctx *, char *, size_t))
+{
+ struct stream_interface *si = appctx->owner;
+ int framesz, ret;
+ uint32_t netint;
+
+ ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0);
+ if (ret <= 0)
+ goto empty_or_error;
+ framesz = ntohl(netint);
+ if (framesz > APPCTX_SPOE(appctx).max_frame_size) {
+ spoe_status_code = SPOE_FRM_ERR_TOO_BIG;
+ return -2;
+ }
+
+ ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint));
+ if (ret <= 0)
+ goto empty_or_error;
+ bo_skip(si_oc(si), ret+sizeof(netint));
+
+ /* First check if the received frame is a DISCONNECT frame */
+ ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz);
+ if (ret != 0) {
+ if (ret > 0) {
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+ " - disconnected by peer (%d): %s\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
+ __FUNCTION__, appctx, spoe_status_code,
+ spoe_reason);
+ return 2;
+ }
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+ " - error on frame (%s)\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
+ __FUNCTION__, appctx,
+ spoe_frm_err_reasons[spoe_status_code]);
+ return -2;
+ }
+ if (handle == NULL)
+ goto out;
+
+ /* If not, try to decode it */
+ ret = handle(appctx, trash.str, framesz);
+ if (ret <= 0) {
+ if (!ret)
+ return -1;
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+ " - error on frame (%s)\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
+ __FUNCTION__, appctx,
+ spoe_frm_err_reasons[spoe_status_code]);
+ return -2;
+ }
+ out:
+ return 1;
+
+ empty_or_error:
+ if (!ret)
+ return 0;
+ spoe_status_code = SPOE_FRM_ERR_IO;
+ return -2;
+}
+
+/* I/O Handler processing messages exchanged with the agent */
+static void
+handle_spoe_applet(struct appctx *appctx)
+{
+ struct stream_interface *si = appctx->owner;
+ struct stream *s = si_strm(si);
+ struct spoe_agent *agent = APPCTX_SPOE(appctx).agent;
+ struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx;
+ int ret;
+
+ switchstate:
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+ " - appctx-state=%s\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]);
+
+ switch (appctx->st0) {
+ case SPOE_APPCTX_ST_CONNECT:
+ spoe_status_code = SPOE_FRM_ERR_NONE;
+ if (si->state <= SI_ST_CON) {
+ si_applet_want_put(si);
+ task_wakeup(s->task, TASK_WOKEN_MSG);
+ break;
+ }
+ else if (si->state != SI_ST_EST) {
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ on_new_spoe_appctx_failure(agent);
+ goto switchstate;
+ }
+ ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame);
+ if (ret < 0) {
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ on_new_spoe_appctx_failure(agent);
+ goto switchstate;
+ }
+ else if (!ret)
+ goto full;
+
+ /* Hello frame was sent. Set the hello timeout and
+ * wait for the reply. */
+ APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello);
+ appctx->st0 = SPOE_APPCTX_ST_CONNECTING;
+ /* fall through */
+
+ case SPOE_APPCTX_ST_CONNECTING:
+ if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ on_new_spoe_appctx_failure(agent);
+ goto switchstate;
+ }
+ if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+ " - Connection timed out\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
+ __FUNCTION__, appctx);
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ on_new_spoe_appctx_failure(agent);
+ goto switchstate;
+ }
+ ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame);
+ if (ret < 0) {
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+ on_new_spoe_appctx_failure(agent);
+ goto switchstate;
+ }
+ if (ret == 2) {
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ on_new_spoe_appctx_failure(agent);
+ goto switchstate;
+ }
+ if (!ret)
+ goto out;
+
+ /* hello handshake is finished, set the idle timeout,
+ * Add the appctx in the agent cache, decrease the
+ * number of new applets and wake up waiting streams. */
+ APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+ appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
+ on_new_spoe_appctx_success(agent, appctx);
+ break;
+
+ case SPOE_APPCTX_ST_PROCESSING:
+ if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ goto switchstate;
+ }
+ if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
+ spoe_status_code = SPOE_FRM_ERR_TOUT;
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+ appctx->st1 = SPOE_APPCTX_ERR_NONE;
+ goto switchstate;
+ }
+ if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
+ ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame);
+ if (ret < 0) {
+ if (ret == -1) {
+ ctx->state = SPOE_CTX_ST_ERROR;
+ task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+ goto skip_notify_frame;
+ }
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ goto switchstate;
+ }
+ else if (!ret)
+ goto full;
+ ctx->state = SPOE_CTX_ST_WAITING_ACK;
+ APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.ack);
+ }
+
+ skip_notify_frame:
+ if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) {
+ ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame);
+ if (ret < 0) {
+ if (ret == -1)
+ goto skip_notify_frame;
+ ctx->state = SPOE_CTX_ST_ERROR;
+ task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+ goto switchstate;
+ }
+ if (!ret)
+ goto out;
+ if (ret == 2) {
+ ctx->state = SPOE_CTX_ST_ERROR;
+ task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ goto switchstate;
+ }
+ ctx->state = SPOE_CTX_ST_DONE;
+ task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+ APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+ }
+ else {
+ if (stopping) {
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+ goto switchstate;
+ }
+
+ ret = recv_spoe_frame(appctx, NULL);
+ if (ret < 0) {
+ if (ret == -1)
+ goto skip_notify_frame;
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
+ goto switchstate;
+ }
+ if (!ret)
+ goto out;
+ if (ret == 2) {
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ goto switchstate;
+ }
+ APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
+ }
+ break;
+
+ case SPOE_APPCTX_ST_DISCONNECT:
+ ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame);
+ if (ret < 0) {
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ goto switchstate;
+ }
+ else if (!ret)
+ goto full;
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
+ " - disconnected by HAProxy (%d): %s\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id,
+ __FUNCTION__, appctx, spoe_status_code,
+ spoe_frm_err_reasons[spoe_status_code]);
+
+ APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.ack);
+ appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
+ /* fall through */
+
+ case SPOE_APPCTX_ST_DISCONNECTING:
+ if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) {
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ goto switchstate;
+ }
+ if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) {
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ goto switchstate;
+ }
+ ret = recv_spoe_frame(appctx, NULL);
+ if (ret < 0 || ret == 2) {
+ appctx->st0 = SPOE_APPCTX_ST_EXIT;
+ goto switchstate;
+ }
+ break;
+
+ case SPOE_APPCTX_ST_EXIT:
+ si_shutw(si);
+ si_shutr(si);
+ si_ic(si)->flags |= CF_READ_NULL;
+ appctx->st0 = SPOE_APPCTX_ST_END;
+ APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
+ /* fall through */
+
+ case SPOE_APPCTX_ST_END:
+ break;
+ }
+
+ out:
+ if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY)
+ task_queue(APPCTX_SPOE(appctx).task);
+ si_oc(si)->flags |= CF_READ_DONTWAIT;
+ task_wakeup(si_strm(si)->task, TASK_WOKEN_IO);
+ return;
+ full:
+ si_applet_cant_put(si);
+ goto out;
+}
+
+struct applet spoe_applet = {
+ .obj_type = OBJ_TYPE_APPLET,
+ .name = "<SPOE>", /* used for logging */
+ .fct = handle_spoe_applet,
+ .release = release_spoe_applet,
+};
+
+/* Create a SPOE applet. On success, the created applet is returned, else
+ * NULL. */
+static struct appctx *
+create_spoe_appctx(struct spoe_config *conf)
+{
+ struct appctx *appctx;
+ struct session *sess;
+ struct task *task;
+ struct stream *strm;
+ struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
+ struct listener *, by_fe);
+
+ if ((appctx = appctx_new(&spoe_applet)) == NULL)
+ goto out_error;
+
+ appctx->st0 = SPOE_APPCTX_ST_CONNECT;
+ if ((APPCTX_SPOE(appctx).task = task_new()) == NULL)
+ goto out_free_appctx;
+ APPCTX_SPOE(appctx).task->process = process_spoe_applet;
+ APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY;
+ APPCTX_SPOE(appctx).task->context = appctx;
+ APPCTX_SPOE(appctx).agent = conf->agent;
+ APPCTX_SPOE(appctx).ctx = NULL;
+ APPCTX_SPOE(appctx).version = 0;
+ APPCTX_SPOE(appctx).max_frame_size = global.tune.bufsize;
+ task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT);
+
+ sess = session_new(&conf->agent_fe, l, &appctx->obj_type);
+ if (!sess)
+ goto out_free_spoe;
+
+ if ((task = task_new()) == NULL)
+ goto out_free_sess;
+
+ if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL)
+ goto out_free_task;
+
+ strm->target = sess->listener->default_target;
+ strm->req.analysers |= sess->listener->analysers;
+ stream_set_backend(strm, conf->agent->b.be);
+
+ /* applet is waiting for data */
+ si_applet_cant_get(&strm->si[0]);
+ appctx_wakeup(appctx);
+
+ /* Increase the number of applets waiting the end of the hello
+ * handshake. */
+ conf->agent->new_applets++;
+
+ strm->do_log = NULL;
+ strm->res.flags |= CF_READ_DONTWAIT;
+
+ conf->agent_fe.feconn++;
+ jobs++;
+ totalconn++;
+
+ return appctx;
+
+ /* Error unrolling */
+ out_free_task:
+ task_free(task);
+ out_free_sess:
+ session_free(sess);
+ out_free_spoe:
+ task_free(APPCTX_SPOE(appctx).task);
+ out_free_appctx:
+ appctx_free(appctx);
+ out_error:
+ return NULL;
+}
+
+/* Wake up a SPOE applet attached to a SPOE context. */
+static void
+wakeup_spoe_appctx(struct spoe_context *ctx)
+{
+ if (ctx->appctx == NULL)
+ return;
+ if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) {
+ si_applet_want_get(ctx->appctx->owner);
+ si_applet_want_put(ctx->appctx->owner);
+ appctx_wakeup(ctx->appctx);
+ }
+}
+
+
+/* Run across the list of pending streams waiting for a SPOE applet and wake the
+ * first. */
+static void
+offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx)
+{
+ struct spoe_context *ctx;
+
+ if (LIST_ISEMPTY(&agent->applet_wq))
+ LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list);
+ else {
+ ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait);
+ APPCTX_SPOE(appctx).ctx = ctx;
+ ctx->appctx = appctx;
+ LIST_DEL(&ctx->applet_wait);
+ LIST_INIT(&ctx->applet_wait);
+ task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - wake up stream to get available SPOE applet\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, ctx->strm);
+ }
+}
+
+/* A failure occurred during SPOE applet creation. */
+static void
+on_new_spoe_appctx_failure(struct spoe_agent *agent)
+{
+ struct spoe_context *ctx;
+
+ agent->new_applets--;
+ list_for_each_entry(ctx, &agent->applet_wq, applet_wait) {
+ ctx->errs++;
+ task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - wake up stream because to SPOE applet connection failed\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, ctx->strm);
+ }
+}
+
+static void
+on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx)
+{
+ agent->new_applets--;
+ offer_spoe_appctx(agent, appctx);
+}
+/* Retrieve a SPOE applet from the agent cache if possible, else create it. It
+ * returns 1 on success, 0 to retry later and -1 if an error occurred. */
+static int
+acquire_spoe_appctx(struct spoe_context *ctx, int dir)
+{
+ struct spoe_config *conf = FLT_CONF(ctx->filter);
+ struct spoe_agent *agent = conf->agent;
+ struct appctx *appctx;
+
+ /* If a process is already started for this SPOE context, retry
+ * later. */
+ if (ctx->flags & SPOE_CTX_FL_PROCESS)
+ goto wait;
+
+ /* If needed, initialize the buffer that will be used to encode messages
+ * and decode actions. */
+ if (ctx->buffer == &buf_empty) {
+ if (!LIST_ISEMPTY(&ctx->buffer_wait)) {
+ LIST_DEL(&ctx->buffer_wait);
+ LIST_INIT(&ctx->buffer_wait);
+ }
+
+ if (!b_alloc_margin(&ctx->buffer, 0)) {
+ LIST_ADDQ(&buffer_wq, &ctx->buffer_wait);
+ goto wait;
+ }
+ }
+
+ /* If the SPOE applet was already set, all is done. */
+ if (ctx->appctx)
+ goto success;
+
+ /* Else try to retrieve it from the agent cache */
+ if (!LIST_ISEMPTY(&agent->cache)) {
+ appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list);
+ LIST_DEL(&APPCTX_SPOE(appctx).list);
+ APPCTX_SPOE(appctx).ctx = ctx;
+ ctx->appctx = appctx;
+ goto success;
+ }
+
+ /* If there is no server up for the agent's backend or it too many
+ * failure occurred, this is an error. */
+ if ((!agent->b.be->srv_act && !agent->b.be->srv_bck) ||
+ ctx->errs >= MAX_NEW_SPOE_APPLET_ERRS)
+ goto error;
+
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - waiting for available SPOE appctx\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
+ ctx->strm);
+
+ /* Else add the stream in the waiting queue. */
+ if (LIST_ISEMPTY(&ctx->applet_wait))
+ LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait);
+
+ /* Finally, create new SPOE applet if we can */
+ if (agent->new_applets < MAX_NEW_SPOE_APPLETS) {
+ if (create_spoe_appctx(conf) == NULL)
+ goto error;
+ }
+
+ wait:
+ return 0;
+
+ success:
+ /* Remove the stream from the waiting queue */
+ if (!LIST_ISEMPTY(&ctx->applet_wait)) {
+ LIST_DEL(&ctx->applet_wait);
+ LIST_INIT(&ctx->applet_wait);
+ }
+
+ /* Set the right flag to prevent request and response processing
+ * in same time. */
+ ctx->flags |= ((dir == SMP_OPT_DIR_REQ)
+ ? SPOE_CTX_FL_REQ_PROCESS
+ : SPOE_CTX_FL_RSP_PROCESS);
+ ctx->errs = 0;
+
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - acquire SPOE appctx %p from cache\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, ctx->strm, ctx->appctx);
+ return 1;
+
+ error:
+ /* Remove the stream from the waiting queue */
+ if (!LIST_ISEMPTY(&ctx->applet_wait)) {
+ LIST_DEL(&ctx->applet_wait);
+ LIST_INIT(&ctx->applet_wait);
+ }
+
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - failed to acquire SPOE appctx errs=%u\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, ctx->strm, ctx->errs);
+ send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n");
+
+ return -1;
+}
+
+/* Release a SPOE applet and push it in the agent cache. */
+static void
+release_spoe_appctx(struct spoe_context *ctx)
+{
+ struct spoe_config *conf = FLT_CONF(ctx->filter);
+ struct spoe_agent *agent = conf->agent;
+ struct appctx *appctx = ctx->appctx;
+
+ /* Reset the flag to allow next processing */
+ ctx->flags &= ~SPOE_CTX_FL_PROCESS;
+
+ /* Release the buffer if needed */
+ if (ctx->buffer != &buf_empty) {
+ b_free(&ctx->buffer);
+ if (!LIST_ISEMPTY(&buffer_wq))
+ stream_offer_buffers();
+ }
+
+ /* If there is no SPOE applet, all is done */
+ if (!appctx)
+ return;
+
+ /* Else, reassign it or push it in the agent cache */
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - release SPOE appctx %p\n",
+ (int)now.tv_sec, (int)now.tv_usec, agent->id,
+ __FUNCTION__, ctx->strm, appctx);
+
+ APPCTX_SPOE(appctx).ctx = NULL;
+ ctx->appctx = NULL;
+ offer_spoe_appctx(agent, appctx);
+}
+
+/***************************************************************************
+ * Functions that process SPOE messages and actions
+ **************************************************************************/
+/* Process SPOE messages for a specific event. During the processing, it returns
+ * 0 and it returns 1 when the processing is finished. If an error occurred, -1
+ * is returned. */
+static int
+process_spoe_messages(struct stream *s, struct spoe_context *ctx,
+ struct list *messages, int dir)
+{
+ struct spoe_message *msg;
+ struct sample *smp;
+ struct spoe_arg *arg;
+ char *p;
+ size_t max_size;
+ int off, flag, idx = 0;
+
+ /* Reserve 32 bytes from the frame Metadata */
+ max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32;
+
+ b_reset(ctx->buffer);
+ p = ctx->buffer->p;
+
+ /* Loop on messages */
+ list_for_each_entry(msg, messages, list) {
+ if (idx + msg->id_len + 1 > max_size)
+ goto skip;
+
+ /* Set the message name */
+ idx += encode_spoe_string(msg->id, msg->id_len, p+idx);
+
+ /* Save offset where to store the number of arguments for this
+ * message */
+ off = idx++;
+ p[off] = 0;
+
+ /* Loop on arguments */
+ list_for_each_entry(arg, &msg->args, list) {
+ p[off]++; /* Increment the number of arguments */
+
+ if (idx + arg->name_len + 1 > max_size)
+ goto skip;
+
+ /* Encode the arguement name as a string. It can by NULL */
+ idx += encode_spoe_string(arg->name, arg->name_len, p+idx);
+
+ /* Fetch the arguement value */
+ smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL);
+ if (!smp) {
+ /* If no value is available, set it to NULL */
+ p[idx++] = SPOE_DATA_T_NULL;
+ continue;
+ }
+
+ /* Else, encode the arguement value */
+ switch (smp->data.type) {
+ case SMP_T_BOOL:
+ flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE);
+ p[idx++] = (SPOE_DATA_T_BOOL | flag);
+ break;
+ case SMP_T_SINT:
+ p[idx++] = SPOE_DATA_T_INT64;
+ if (idx + 8 > max_size)
+ goto skip;
+ idx += encode_spoe_varint(smp->data.u.sint, p+idx);
+ break;
+ case SMP_T_IPV4:
+ p[idx++] = SPOE_DATA_T_IPV4;
+ if (idx + 4 > max_size)
+ goto skip;
+ memcpy(p+idx, &smp->data.u.ipv4, 4);
+ idx += 4;
+ break;
+ case SMP_T_IPV6:
+ p[idx++] = SPOE_DATA_T_IPV6;
+ if (idx + 16 > max_size)
+ goto skip;
+ memcpy(p+idx, &smp->data.u.ipv6, 16);
+ idx += 16;
+ break;
+ case SMP_T_STR:
+ p[idx++] = SPOE_DATA_T_STR;
+ if (idx + smp->data.u.str.len > max_size)
+ goto skip;
+ idx += encode_spoe_string(smp->data.u.str.str,
+ smp->data.u.str.len,
+ p+idx);
+ break;
+ case SMP_T_BIN:
+ p[idx++] = SPOE_DATA_T_BIN;
+ if (idx + smp->data.u.str.len > max_size)
+ goto skip;
+ idx += encode_spoe_string(smp->data.u.str.str,
+ smp->data.u.str.len,
+ p+idx);
+ break;
+ case SMP_T_METH:
+ if (smp->data.u.meth.meth == HTTP_METH_OTHER) {
+ p[idx++] = SPOE_DATA_T_STR;
+ if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size)
+ goto skip;
+ idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name,
+ http_known_methods[smp->data.u.meth.meth].len,
+ p+idx);
+ }
+ else {
+ p[idx++] = SPOE_DATA_T_STR;
+ if (idx + smp->data.u.str.len > max_size)
+ goto skip;
+ idx += encode_spoe_string(smp->data.u.meth.str.str,
+ smp->data.u.meth.str.len,
+ p+idx);
+ }
+ break;
+ default:
+ p[idx++] = SPOE_DATA_T_NULL;
+ }
+ }
+ }
+ ctx->buffer->i = idx;
+ return 1;
+
+ skip:
+ b_reset(ctx->buffer);
+ return 0;
+}
+
+/* Helper function to set a variable */
+static void
+set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
+ struct sample *smp)
+{
+ struct spoe_config *conf = FLT_CONF(ctx->filter);
+ struct spoe_agent *agent = conf->agent;
+ char varname[64];
+
+ memset(varname, 0, sizeof(varname));
+ len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
+ scope, agent->var_pfx, len, name);
+ vars_set_by_name_ifexist(varname, len, smp);
+}
+
+/* Helper function to unset a variable */
+static void
+unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len,
+ struct sample *smp)
+{
+ struct spoe_config *conf = FLT_CONF(ctx->filter);
+ struct spoe_agent *agent = conf->agent;
+ char varname[64];
+
+ memset(varname, 0, sizeof(varname));
+ len = snprintf(varname, sizeof(varname), "%s.%s.%.*s",
+ scope, agent->var_pfx, len, name);
+ vars_unset_by_name_ifexist(varname, len, smp);
+}
+
+
+/* Process SPOE actions for a specific event. During the processing, it returns
+ * 0 and it returns 1 when the processing is finished. If an error occurred, -1
+ * is returned. */
+static int
+process_spoe_actions(struct stream *s, struct spoe_context *ctx,
+ enum spoe_event ev, int dir)
+{
+ char *p;
+ size_t size;
+ int off, i, idx = 0;
+
+ p = ctx->buffer->p;
+ size = ctx->buffer->i;
+
+ while (idx < size) {
+ char *str;
+ uint64_t sz;
+ struct sample smp;
+ enum spoe_action_type type;
+
+ off = idx;
+ if (idx+2 > size)
+ goto skip;
+
+ type = p[idx++];
+ switch (type) {
+ case SPOE_ACT_T_SET_VAR: {
+ char *scope;
+
+ if (p[idx++] != 3)
+ goto skip_action;
+
+ switch (p[idx++]) {
+ case SPOE_SCOPE_PROC: scope = "proc"; break;
+ case SPOE_SCOPE_SESS: scope = "sess"; break;
+ case SPOE_SCOPE_TXN : scope = "txn"; break;
+ case SPOE_SCOPE_REQ : scope = "req"; break;
+ case SPOE_SCOPE_RES : scope = "res"; break;
+ default: goto skip;
+ }
+
+ idx += decode_spoe_string(p+idx, p+size, &str, &sz);
+ if (str == NULL)
+ goto skip;
+ memset(&smp, 0, sizeof(smp));
+ smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
+ if (decode_spoe_data(p+idx, p+size, &smp) == -1)
+ goto skip;
+
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - set-var '%s.%s.%.*s'\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
+ __FUNCTION__, s, scope,
+ ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
+ (int)sz, str);
+
+ set_spoe_var(ctx, scope, str, sz, &smp);
+ break;
+ }
+
+ case SPOE_ACT_T_UNSET_VAR: {
+ char *scope;
+
+ if (p[idx++] != 2)
+ goto skip_action;
+
+ switch (p[idx++]) {
+ case SPOE_SCOPE_PROC: scope = "proc"; break;
+ case SPOE_SCOPE_SESS: scope = "sess"; break;
+ case SPOE_SCOPE_TXN : scope = "txn"; break;
+ case SPOE_SCOPE_REQ : scope = "req"; break;
+ case SPOE_SCOPE_RES : scope = "res"; break;
+ default: goto skip;
+ }
+
+ idx += decode_spoe_string(p+idx, p+size, &str, &sz);
+ if (str == NULL)
+ goto skip;
+ memset(&smp, 0, sizeof(smp));
+ smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL);
+
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - unset-var '%s.%s.%.*s'\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
+ __FUNCTION__, s, scope,
+ ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx,
+ (int)sz, str);
+
+ unset_spoe_var(ctx, scope, str, sz, &smp);
+ break;
+ }
+
+ default:
+ skip_action:
+ if ((i = skip_spoe_action(p+off, p+size)) == -1)
+ goto skip;
+ idx += i;
+ }
+ }
+
+ return 1;
+ skip:
+ return 0;
+}
+
+
+/* Process a SPOE event. First, this functions will process messages attached to
+ * this event and send them to an agent in a NOTIFY frame. Then, it will wait a
+ * ACK frame to process corresponding actions. During all the processing, it
+ * returns 0 and it returns 1 when the processing is finished. If an error
+ * occurred, -1 is returned. */
+static int
+process_spoe_event(struct stream *s, struct spoe_context *ctx,
+ enum spoe_event ev)
+{
+ int dir, ret = 1;
+
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
+ " - ctx-state=%s - event=%s\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id,
+ __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
+ spoe_event_str[ev]);
+
+ dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES);
+
+ if (LIST_ISEMPTY(&(ctx->messages[ev])))
+ goto out;
+
+ if (ctx->state == SPOE_CTX_ST_ERROR)
+ goto error;
+
+ if (ctx->state == SPOE_CTX_ST_READY) {
+ ret = acquire_spoe_appctx(ctx, dir);
+ if (ret <= 0) {
+ if (!ret)
+ goto out;
+ goto error;
+ }
+ ctx->state = SPOE_CTX_ST_SENDING_MSGS;
+ }
+
+ if (ctx->appctx == NULL)
+ goto error;
+
+ if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) {
+ ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir);
+ if (ret <= 0) {
+ if (!ret)
+ goto skip;
+ goto error;
+ }
+ wakeup_spoe_appctx(ctx);
+ ret = 0;
+ goto out;
+ }
+
+ if (ctx->state == SPOE_CTX_ST_WAITING_ACK) {
+ wakeup_spoe_appctx(ctx);
+ ret = 0;
+ goto out;
+ }
+
+ if (ctx->state == SPOE_CTX_ST_DONE) {
+ ret = process_spoe_actions(s, ctx, ev, dir);
+ if (ret <= 0) {
+ if (!ret)
+ goto skip;
+ goto error;
+ }
+ ctx->frame_id++;
+ release_spoe_appctx(ctx);
+ ctx->state = SPOE_CTX_ST_READY;
+ }
+
+ out:
+ return ret;
+
+ skip:
+ release_spoe_appctx(ctx);
+ ctx->state = SPOE_CTX_ST_READY;
+ return 1;
+
+ error:
+ release_spoe_appctx(ctx);
+ ctx->state = SPOE_CTX_ST_ERROR;
+ return 1;
+}
+
+
+/***************************************************************************
+ * Functions that create/destroy SPOE contexts
+ **************************************************************************/
+static struct spoe_context *
+create_spoe_context(struct filter *filter)
+{
+ struct spoe_config *conf = FLT_CONF(filter);
+ struct spoe_context *ctx;
+
+ ctx = pool_alloc_dirty(pool2_spoe_ctx);
+ if (ctx == NULL) {
+ return NULL;
+ }
+ memset(ctx, 0, sizeof(*ctx));
+ ctx->filter = filter;
+ ctx->state = SPOE_CTX_ST_NONE;
+ ctx->flags = 0;
+ ctx->errs = 0;
+ ctx->messages = conf->agent->messages;
+ ctx->buffer = &buf_empty;
+ LIST_INIT(&ctx->buffer_wait);
+ LIST_INIT(&ctx->applet_wait);
+
+ ctx->stream_id = 0;
+ ctx->frame_id = 1;
+
+ return ctx;
+}
+
+static void
+destroy_spoe_context(struct spoe_context *ctx)
+{
+ if (!ctx)
+ return;
+
+ if (ctx->appctx)
+ APPCTX_SPOE(ctx->appctx).ctx = NULL;
+ if (!LIST_ISEMPTY(&ctx->buffer_wait))
+ LIST_DEL(&ctx->buffer_wait);
+ if (!LIST_ISEMPTY(&ctx->applet_wait))
+ LIST_DEL(&ctx->applet_wait);
+ pool_free2(pool2_spoe_ctx, ctx);
+}
+
+static void
+reset_spoe_context(struct spoe_context *ctx)
+{
+ ctx->state = SPOE_CTX_ST_READY;
+ ctx->flags &= ~SPOE_CTX_FL_PROCESS;
+}
+
+
+/***************************************************************************
+ * Hooks that manage the filter lifecycle (init/check/deinit)
+ **************************************************************************/
+/* Signal handler: Do a soft stop, wakeup SPOE applet */
+static void
+sig_stop_spoe(struct sig_handler *sh)
+{
+ struct proxy *p;
+
+ p = proxy;
+ while (p) {
+ struct flt_conf *fconf;
+
+ list_for_each_entry(fconf, &p->filter_configs, list) {
+ struct spoe_config *conf = fconf->conf;
+ struct spoe_agent *agent = conf->agent;
+ struct appctx *appctx;
+
+ list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) {
+ si_applet_want_get(appctx->owner);
+ si_applet_want_put(appctx->owner);
+ appctx_wakeup(appctx);
+ }
+ }
+ p = p->next;
+ }
+}
+
+
+/* Initialize the SPOE filter. Returns -1 on error, else 0. */
+static int
+spoe_init(struct proxy *px, struct flt_conf *fconf)
+{
+ struct spoe_config *conf = fconf->conf;
+ struct listener *l;
+
+ memset(&conf->agent_fe, 0, sizeof(conf->agent_fe));
+ init_new_proxy(&conf->agent_fe);
+ conf->agent_fe.parent = conf->agent;
+ conf->agent_fe.last_change = now.tv_sec;
+ conf->agent_fe.id = conf->agent->id;
+ conf->agent_fe.cap = PR_CAP_FE;
+ conf->agent_fe.mode = PR_MODE_TCP;
+ conf->agent_fe.maxconn = 0;
+ conf->agent_fe.options2 |= PR_O2_INDEPSTR;
+ conf->agent_fe.conn_retries = CONN_RETRIES;
+ conf->agent_fe.accept = frontend_accept;
+ conf->agent_fe.srv = NULL;
+ conf->agent_fe.timeout.client = TICK_ETERNITY;
+ conf->agent_fe.default_target = &spoe_applet.obj_type;
+ conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES;
+
+ if ((l = calloc(1, sizeof(*l))) == NULL) {
+ Alert("spoe_init : out of memory.\n");
+ goto out_error;
+ }
+ l->obj_type = OBJ_TYPE_LISTENER;
+ l->obj_type = OBJ_TYPE_LISTENER;
+ l->frontend = &conf->agent_fe;
+ l->state = LI_READY;
+ l->analysers = conf->agent_fe.fe_req_ana;
+ LIST_ADDQ(&conf->agent_fe.conf.listeners, &l->by_fe);
+
+ if (!sighandler_registered) {
+ signal_register_fct(0, sig_stop_spoe, 0);
+ sighandler_registered = 1;
+ }
+
+ return 0;
+
+ out_error:
+ return -1;
+}
+
+/* Free ressources allocated by the SPOE filter. */
+static void
+spoe_deinit(struct proxy *px, struct flt_conf *fconf)
+{
+ struct spoe_config *conf = fconf->conf;
+
+ if (conf) {
+ struct spoe_agent *agent = conf->agent;
+ struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners,
+ struct listener *, by_fe);
+
+ free(l);
+ release_spoe_agent(agent);
+ free(conf);
+ }
+ fconf->conf = NULL;
+}
+
+/* Check configuration of a SPOE filter for a specified proxy.
+ * Return 1 on error, else 0. */
+static int
+spoe_check(struct proxy *px, struct flt_conf *fconf)
+{
+ struct spoe_config *conf = fconf->conf;
+ struct proxy *target;
+
+ target = proxy_be_by_name(conf->agent->b.name);
+ if (target == NULL) {
+ Alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'"
+ " declared at %s:%d.\n",
+ px->id, conf->agent->b.name, conf->agent->id,
+ conf->agent->conf.file, conf->agent->conf.line);
+ return 1;
+ }
+ if (target->mode != PR_MODE_TCP) {
+ Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared"
+ " at %s:%d does not support HTTP mode.\n",
+ px->id, target->id, conf->agent->id,
+ conf->agent->conf.file, conf->agent->conf.line);
+ return 1;
+ }
+
+ free(conf->agent->b.name);
+ conf->agent->b.name = NULL;
+ conf->agent->b.be = target;
+ return 0;
+}
+
+/**************************************************************************
+ * Hooks attached to a stream
+ *************************************************************************/
+/* Called when a filter instance is created and attach to a stream. It creates
+ * the context that will be used to process this stream. */
+static int
+spoe_start(struct stream *s, struct filter *filter)
+{
+ struct spoe_context *ctx;
+
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ ((struct spoe_config *)FLT_CONF(filter))->agent->id,
+ __FUNCTION__, s);
+
+ ctx = create_spoe_context(filter);
+ if (ctx == NULL) {
+ send_log(s->be, LOG_EMERG,
+ "failed to create SPOE context for proxy %s\n",
+ s->be->id);
+ return 0;
+ }
+
+ ctx->strm = s;
+ ctx->state = SPOE_CTX_ST_READY;
+ filter->ctx = ctx;
+
+ if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_FE]))
+ filter->pre_analyzers |= AN_REQ_INSPECT_FE;
+
+ if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_BE]))
+ filter->pre_analyzers |= AN_REQ_INSPECT_BE;
+
+ if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_RSP]))
+ filter->pre_analyzers |= AN_RES_INSPECT;
+
+ if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_FE]))
+ filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE;
+
+ if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_BE]))
+ filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE;
+
+ if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_RSP]))
+ filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE;
+
+ return 1;
+}
+
+/* Called when a filter instance is detached from a stream. It release the
+ * attached SPOE context. */
+static void
+spoe_stop(struct stream *s, struct filter *filter)
+{
+ struct spoe_context *ctx = filter->ctx;
+
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ ((struct spoe_config *)FLT_CONF(filter))->agent->id,
+ __FUNCTION__, s);
+
+ if (ctx) {
+ release_spoe_appctx(ctx);
+ destroy_spoe_context(ctx);
+ }
+}
+
+/* Called when we are ready to filter data on a channel */
+static int
+spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
+{
+ struct spoe_context *ctx = filter->ctx;
+ int ret = 1;
+
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
+ " - ctx-flags=0x%08x\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ ((struct spoe_config *)FLT_CONF(filter))->agent->id,
+ __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
+
+ if (!(chn->flags & CF_ISRESP)) {
+ if (filter->pre_analyzers & AN_REQ_INSPECT_FE)
+ chn->analysers |= AN_REQ_INSPECT_FE;
+ if (filter->pre_analyzers & AN_REQ_INSPECT_BE)
+ chn->analysers |= AN_REQ_INSPECT_BE;
+
+ if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED)
+ goto out;
+
+ ctx->stream_id = s->uniq_id;
+ if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
+ ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS);
+ if (ret != 1)
+ goto out;
+ }
+ ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED;
+ }
+ else {
+ if (filter->pre_analyzers & SPOE_EV_ON_TCP_RSP)
+ chn->analysers |= AN_RES_INSPECT;
+
+ if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED)
+ goto out;
+
+ if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) {
+ ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS);
+ if (ret != 1)
+ goto out;
+ }
+ ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED;
+ }
+
+ out:
+ if (!ret) {
+ channel_dont_read(chn);
+ channel_dont_close(chn);
+ }
+ return ret;
+}
+
+/* Called before a processing happens on a given channel */
+static int
+spoe_chn_pre_analyze(struct stream *s, struct filter *filter,
+ struct channel *chn, unsigned an_bit)
+{
+ struct spoe_context *ctx = filter->ctx;
+ int ret = 1;
+
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
+ " - ctx-flags=0x%08x - ana=0x%08x\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ ((struct spoe_config *)FLT_CONF(filter))->agent->id,
+ __FUNCTION__, s, spoe_ctx_state_str[ctx->state],
+ ctx->flags, an_bit);
+
+ if (ctx->state == SPOE_CTX_ST_NONE || ctx->state == SPOE_CTX_ST_ERROR)
+ goto out;
+
+ switch (an_bit) {
+ case AN_REQ_INSPECT_FE:
+ ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE);
+ break;
+ case AN_REQ_INSPECT_BE:
+ ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE);
+ break;
+ case AN_RES_INSPECT:
+ ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP);
+ break;
+ case AN_REQ_HTTP_PROCESS_FE:
+ ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE);
+ break;
+ case AN_REQ_HTTP_PROCESS_BE:
+ ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE);
+ break;
+ case AN_RES_HTTP_PROCESS_FE:
+ ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP);
+ break;
+ }
+
+ out:
+ if (!ret) {
+ channel_dont_read(chn);
+ channel_dont_close(chn);
+ }
+ return ret;
+}
+
+/* Called when the filtering on the channel ends. */
+static int
+spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn)
+{
+ struct spoe_context *ctx = filter->ctx;
+
+ SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s"
+ " - ctx-flags=0x%08x\n",
+ (int)now.tv_sec, (int)now.tv_usec,
+ ((struct spoe_config *)FLT_CONF(filter))->agent->id,
+ __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags);
+
+ if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) {
+ reset_spoe_context(ctx);
+ }
+
+ return 1;
+}
+
+/********************************************************************
+ * Functions that manage the filter initialization
+ ********************************************************************/
+struct flt_ops spoe_ops = {
+ /* Manage SPOE filter, called for each filter declaration */
+ .init = spoe_init,
+ .deinit = spoe_deinit,
+ .check = spoe_check,
+
+ /* Handle start/stop of SPOE */
+ .attach = spoe_start,
+ .detach = spoe_stop,
+
+ /* Handle channels activity */
+ .channel_start_analyze = spoe_start_analyze,
+ .channel_pre_analyze = spoe_chn_pre_analyze,
+ .channel_end_analyze = spoe_end_analyze,
+};
+
+
+static int
+cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm)
+{
+ const char *err;
+ int i, err_code = 0;
+
+ if ((cfg_scope == NULL && curengine != NULL) ||
+ (cfg_scope != NULL && curengine == NULL) ||
+ strcmp(curengine, cfg_scope))
+ goto out;
+
+ if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */
+ if (!*args[1]) {
+ Alert("parsing [%s:%d] : missing name for spoe-agent section.\n",
+ file, linenum);
+ err_code |= ERR_ALERT | ERR_ABORT;
+ goto out;
+ }
+ if (*args[2]) {
+ Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
+ file, linenum, args[2]);
+ err_code |= ERR_ALERT | ERR_ABORT;
+ goto out;
+ }
+
+ err = invalid_char(args[1]);
+ if (err) {
+ Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
+ file, linenum, *err, args[0], args[1]);
+ err_code |= ERR_ALERT | ERR_ABORT;
+ goto out;
+ }
+
+ if (curagent != NULL) {
+ Alert("parsing [%s:%d] : another spoe-agent section previously defined.\n",
+ file, linenum);
+ err_code |= ERR_ALERT | ERR_ABORT;
+ goto out;
+ }
+ if ((curagent = calloc(1, sizeof(*curagent))) == NULL) {
+ Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
+ err_code |= ERR_ALERT | ERR_ABORT;
+ goto out;
+ }
+
+ curagent->id = strdup(args[1]);
+ curagent->conf.file = strdup(file);
+ curagent->conf.line = linenum;
+ curagent->timeout.hello = TICK_ETERNITY;
+ curagent->timeout.ack = TICK_ETERNITY;
+ curagent->timeout.idle = TICK_ETERNITY;
+ curagent->var_pfx = NULL;
+ curagent->new_applets = 0;
+
+ for (i = 0; i < SPOE_EV_EVENTS; ++i)
+ LIST_INIT(&curagent->messages[i]);
+ LIST_INIT(&curagent->cache);
+ LIST_INIT(&curagent->applet_wq);
+ }
+ else if (!strcmp(args[0], "use-backend")) {
+ if (!*args[1]) {
+ Alert("parsing [%s:%d] : '%s' expects a backend name.\n",
+ file, linenum, args[0]);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto out;
+ }
+ if (*args[2]) {
+ Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
+ file, linenum, args[2]);
+ err_code |= ERR_ALERT | ERR_ABORT;
+ goto out;
+ }
+ free(curagent->b.name);
+ curagent->b.name = strdup(args[1]);
+ }
+ else if (!strcmp(args[0], "messages")) {
+ int cur_arg = 1;
+ while (*args[cur_arg]) {
+ struct spoe_msg_placeholder *mp = NULL;
+
+ list_for_each_entry(mp, &curmps, list) {
+ if (!strcmp(mp->id, args[cur_arg])) {
+ Alert("parsing [%s:%d]: spoe-message message '%s' already declared.\n",
+ file, linenum, args[cur_arg]);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto out;
+ }
+ }
+
+ if ((mp = calloc(1, sizeof(*mp))) == NULL) {
+ Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
+ err_code |= ERR_ALERT | ERR_ABORT;
+ goto out;
+ }
+ mp->id = strdup(args[cur_arg]);
+ LIST_ADDQ(&curmps, &mp->list);
+ cur_arg++;
+ }
+ }
+ else if (!strcmp(args[0], "timeout")) {
+ unsigned int *tv = NULL;
+ const char *res;
+ unsigned timeout;
+
+ if (!*args[1]) {
+ Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n",
+ file, linenum);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto out;
+ }
+ if (!strcmp(args[1], "hello"))
+ tv = &curagent->timeout.hello;
+ else if (!strcmp(args[1], "idle"))
+ tv = &curagent->timeout.idle;
+ else if (!strcmp(args[1], "ack"))
+ tv = &curagent->timeout.ack;
+ else {
+ Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' and 'ack' (got %s).\n",
+ file, linenum, args[1]);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto out;
+ }
+ if (!*args[2]) {
+ Alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n",
+ file, linenum, args[1]);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto out;
+ }
+ res = parse_time_err(args[2], &timeout, TIME_UNIT_MS);
+ if (res) {
+ Alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n",
+ file, linenum, *res, args[1]);
+ err_code |= ERR_ALERT | ERR_ABORT;
+ goto out;
+ }
+ if (*args[3]) {
+ Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
+ file, linenum, args[3]);
+ err_code |= ERR_ALERT | ERR_ABORT;
+ goto out;
+ }
+ *tv = MS_TO_TICKS(timeout);
+ }
+ else if (!strcmp(args[0], "option")) {
+ if (!*args[1]) {
+ Alert("parsing [%s:%d]: '%s' expects an option name.\n",
+ file, linenum, args[0]);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto out;
+ }
+ if (!strcmp(args[1], "var-prefix")) {
+ char *tmp;
+
+ if (!*args[2]) {
+ Alert("parsing [%s:%d]: '%s %s' expects a value.\n",
+ file, linenum, args[0],
+ args[1]);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto out;
+ }
+ tmp = args[2];
+ while (*tmp) {
+ if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
+ Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n",
+ file, linenum, args[0], args[1]);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto out;
+ }
+ tmp++;
+ }
+ curagent->var_pfx = strdup(args[2]);
+ }
+ else {
+ Alert("parsing [%s:%d]: option '%s' is not supported.\n",
+ file, linenum, args[1]);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto out;
+ }
+ }
+ else if (*args[0]) {
+ Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n",
+ file, linenum, args[0]);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto out;
+ }
+ out:
+ return err_code;
+}
+
+static int
+cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm)
+{
+ struct spoe_message *msg;
+ struct spoe_arg *arg;
+ const char *err;
+ char *errmsg = NULL;
+ int err_code = 0;
+
+ if ((cfg_scope == NULL && curengine != NULL) ||
+ (cfg_scope != NULL && curengine == NULL) ||
+ strcmp(curengine, cfg_scope))
+ goto out;
+
+ if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */
+ if (!*args[1]) {
+ Alert("parsing [%s:%d] : missing name for spoe-message section.\n",
+ file, linenum);
+ err_code |= ERR_ALERT | ERR_ABORT;
+ goto out;
+ }
+ if (*args[2]) {
+ Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n",
+ file, linenum, args[2]);
+ err_code |= ERR_ALERT | ERR_ABORT;
+ goto out;
+ }
+
+ err = invalid_char(args[1]);
+ if (err) {
+ Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n",
+ file, linenum, *err, args[0], args[1]);
+ err_code |= ERR_ALERT | ERR_ABORT;
+ goto out;
+ }
+
+ list_for_each_entry(msg, &curmsgs, list) {
+ if (!strcmp(msg->id, args[1])) {
+ Alert("parsing [%s:%d]: spoe-message section '%s' has the same"
+ " name as another one declared at %s:%d.\n",
+ file, linenum, args[1], msg->conf.file, msg->conf.line);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto out;
+ }
+ }
+
+ if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) {
+ Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
+ err_code |= ERR_ALERT | ERR_ABORT;
+ goto out;
+ }
+
+ curmsg->id = strdup(args[1]);
+ curmsg->id_len = strlen(curmsg->id);
+ curmsg->event = SPOE_EV_NONE;
+ curmsg->conf.file = strdup(file);
+ curmsg->conf.line = linenum;
+ LIST_INIT(&curmsg->args);
+ LIST_ADDQ(&curmsgs, &curmsg->list);
+ }
+ else if (!strcmp(args[0], "args")) {
+ int cur_arg = 1;
+
+ curproxy->conf.args.ctx = ARGC_SPOE;
+ curproxy->conf.args.file = file;
+ curproxy->conf.args.line = linenum;
+ while (*args[cur_arg]) {
+ char *delim = strchr(args[cur_arg], '=');
+ int idx = 0;
+
+ if ((arg = calloc(1, sizeof(*arg))) == NULL) {
+ Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
+ err_code |= ERR_ALERT | ERR_ABORT;
+ goto out;
+ }
+
+ if (!delim) {
+ arg->name = NULL;
+ arg->name_len = 0;
+ delim = args[cur_arg];
+ }
+ else {
+ arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]);
+ arg->name_len = delim - args[cur_arg];
+ delim++;
+ }
+
+ arg->expr = sample_parse_expr(&delim, &idx, file, linenum, &errmsg, &curproxy->conf.args);
+ if (arg->expr == NULL) {
+ Alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ free(arg->name);
+ free(arg);
+ goto out;
+ }
+ LIST_ADDQ(&curmsg->args, &arg->list);
+ cur_arg++;
+ }
+ curproxy->conf.args.file = NULL;
+ curproxy->conf.args.line = 0;
+ }
+ else if (!strcmp(args[0], "event")) {
+ if (!*args[1]) {
+ Alert("parsing [%s:%d] : missing event name.\n", file, linenum);
+ err_code |= ERR_ALERT | ERR_ABORT;
+ goto out;
+ }
+ if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS]))
+ curmsg->event = SPOE_EV_ON_CLIENT_SESS;
+ else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS]))
+ curmsg->event = SPOE_EV_ON_SERVER_SESS;
+
+ else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE]))
+ curmsg->event = SPOE_EV_ON_TCP_REQ_FE;
+ else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE]))
+ curmsg->event = SPOE_EV_ON_TCP_REQ_BE;
+ else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP]))
+ curmsg->event = SPOE_EV_ON_TCP_RSP;
+
+ else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE]))
+ curmsg->event = SPOE_EV_ON_HTTP_REQ_FE;
+ else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE]))
+ curmsg->event = SPOE_EV_ON_HTTP_REQ_BE;
+ else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP]))
+ curmsg->event = SPOE_EV_ON_HTTP_RSP;
+ else {
+ Alert("parsing [%s:%d] : unkown event '%s'.\n",
+ file, linenum, args[1]);
+ err_code |= ERR_ALERT | ERR_ABORT;
+ goto out;
+ }
+ }
+ else if (!*args[0]) {
+ Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n",
+ file, linenum, args[0]);
+ err_code |= ERR_ALERT | ERR_FATAL;
+ goto out;
+ }
+ out:
+ free(errmsg);
+ return err_code;
+}
+
+/* Return -1 on error, else 0 */
+static int
+parse_spoe_flt(char **args, int *cur_arg, struct proxy *px,
+ struct flt_conf *fconf, char **err, void *private)
+{
+ struct list backup_sections;
+ struct spoe_config *conf;
+ struct spoe_message *msg, *msgback;
+ struct spoe_msg_placeholder *mp, *mpback;
+ char *file = NULL, *engine = NULL;
+ int ret, pos = *cur_arg + 1;
+
+ conf = calloc(1, sizeof(*conf));
+ if (conf == NULL) {
+ memprintf(err, "%s: out of memory", args[*cur_arg]);
+ goto error;
+ }
+ conf->proxy = px;
+
+ while (*args[pos]) {
+ if (!strcmp(args[pos], "config")) {
+ if (!*args[pos+1]) {
+ memprintf(err, "'%s' : '%s' option without value",
+ args[*cur_arg], args[pos]);
+ goto error;
+ }
+ file = args[pos+1];
+ pos += 2;
+ }
+ else if (!strcmp(args[pos], "engine")) {
+ if (!*args[pos+1]) {
+ memprintf(err, "'%s' : '%s' option without value",
+ args[*cur_arg], args[pos]);
+ goto error;
+ }
+ engine = args[pos+1];
+ pos += 2;
+ }
+ else {
+ memprintf(err, "unknown keyword '%s'", args[pos]);
+ goto error;
+ }
+ }
+ if (file == NULL) {
+ memprintf(err, "'%s' : missing config file", args[*cur_arg]);
+ goto error;
+ }
+
+ /* backup sections and register SPOE sections */
+ LIST_INIT(&backup_sections);
+ cfg_backup_sections(&backup_sections);
+ cfg_register_section("spoe-agent", cfg_parse_spoe_agent);
+ cfg_register_section("spoe-message", cfg_parse_spoe_message);
+
+ /* Parse SPOE filter configuration file */
+ curengine = engine;
+ curproxy = px;
+ curagent = NULL;
+ curmsg = NULL;
+ ret = readcfgfile(file);
+ curproxy = NULL;
+
+ /* unregister SPOE sections and restore previous sections */
+ cfg_unregister_sections();
+ cfg_restore_sections(&backup_sections);
+
+ if (ret == -1) {
+ memprintf(err, "Could not open configuration file %s : %s",
+ file, strerror(errno));
+ goto error;
+ }
+ if (ret & (ERR_ABORT|ERR_FATAL)) {
+ memprintf(err, "Error(s) found in configuration file %s", file);
+ goto error;
+ }
+
+ /* Check SPOE agent */
+ if (curagent == NULL) {
+ memprintf(err, "No SPOE agent found in file %s", file);
+ goto error;
+ }
+ if (curagent->b.name == NULL) {
+ memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d",
+ curagent->id, curagent->conf.file, curagent->conf.line);
+ goto error;
+ }
+ if (curagent->timeout.hello == TICK_ETERNITY ||
+ curagent->timeout.idle == TICK_ETERNITY ||
+ curagent->timeout.ack == TICK_ETERNITY) {
+ Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n"
+ " | While not properly invalid, you will certainly encounter various problems\n"
+ " | with such a configuration. To fix this, please ensure that all following\n"
+ " | timeouts are set to a non-zero value: 'hello', 'idle', 'ack'.\n",
+ px->id, curagent->id, curagent->conf.file, curagent->conf.line);
+ }
+ if (curagent->var_pfx == NULL) {
+ char *tmp = curagent->id;
+
+ while (*tmp) {
+ if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') {
+ memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. "
+ "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n",
+ curagent->id, curagent->id, curagent->conf.file, curagent->conf.line);
+ goto error;
+ }
+ tmp++;
+ }
+ curagent->var_pfx = strdup(curagent->id);
+ }
+
+ if (LIST_ISEMPTY(&curmps)) {
+ Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n",
+ px->id, curagent->id, curagent->conf.file, curagent->conf.line);
+ goto finish;
+ }
+
+ list_for_each_entry_safe(mp, mpback, &curmps, list) {
+ list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
+ if (!strcmp(msg->id, mp->id)) {
+ if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) {
+ if (msg->event == SPOE_EV_ON_TCP_REQ_BE)
+ msg->event = SPOE_EV_ON_TCP_REQ_FE;
+ if (msg->event == SPOE_EV_ON_HTTP_REQ_BE)
+ msg->event = SPOE_EV_ON_HTTP_REQ_FE;
+ }
+ if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS ||
+ msg->event == SPOE_EV_ON_TCP_REQ_FE ||
+ msg->event == SPOE_EV_ON_HTTP_REQ_FE)) {
+ Warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n",
+ px->id, msg->conf.file, msg->conf.line);
+ goto next;
+ }
+ if (msg->event == SPOE_EV_NONE) {
+ Warning("Proxy '%s': Ignore SPOE message without event at %s:%d.\n",
+ px->id, msg->conf.file, msg->conf.line);
+ goto next;
+ }
+ msg->agent = curagent;
+ LIST_DEL(&msg->list);
+ LIST_ADDQ(&curagent->messages[msg->event], &msg->list);
+ goto next;
+ }
+ }
+ memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d",
+ curagent->id, mp->id, curagent->conf.file, curagent->conf.line);
+ goto error;
+ next:
+ continue;
+ }
+
+ finish:
+ conf->agent = curagent;
+ list_for_each_entry_safe(mp, mpback, &curmps, list) {
+ LIST_DEL(&mp->list);
+ release_spoe_msg_placeholder(mp);
+ }
+ list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
+ Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n",
+ px->id, msg->id, msg->conf.file, msg->conf.line);
+ LIST_DEL(&msg->list);
+ release_spoe_message(msg);
+ }
+
+ *cur_arg = pos;
+ fconf->ops = &spoe_ops;
+ fconf->conf = conf;
+ return 0;
+
+ error:
+ release_spoe_agent(curagent);
+ list_for_each_entry_safe(mp, mpback, &curmps, list) {
+ LIST_DEL(&mp->list);
+ release_spoe_msg_placeholder(mp);
+ }
+ list_for_each_entry_safe(msg, msgback, &curmsgs, list) {
+ LIST_DEL(&msg->list);
+ release_spoe_message(msg);
+ }
+ free(conf);
+ return -1;
+}
+
+
+/* Declare the filter parser for "spoe" keyword */
+static struct flt_kw_list flt_kws = { "SPOE", { }, {
+ { "spoe", parse_spoe_flt, NULL },
+ { NULL, NULL, NULL },
+ }
+};
+
+__attribute__((constructor))
+static void __spoe_init(void)
+{
+ flt_register_keywords(&flt_kws);
+
+ LIST_INIT(&curmsgs);
+ LIST_INIT(&curmps);
+ pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED);
+}
+
+__attribute__((destructor))
+static void
+__spoe_deinit(void)
+{
+ pool_destroy2(pool2_spoe_ctx);
+}
diff --git a/src/log.c b/src/log.c
index 64c7922..1232988 100644
--- a/src/log.c
+++ b/src/log.c
@@ -267,6 +267,8 @@
return "capture";
case ARGC_SRV:
return "server";
+ case ARGC_SPOE:
+ return "spoe-message";
default:
return "undefined(please report this bug)"; /* must never happen */
}
diff --git a/src/sample.c b/src/sample.c
index 35b5913..51e6183 100644
--- a/src/sample.c
+++ b/src/sample.c
@@ -1127,6 +1127,7 @@
case ARGC_CAP: where = "in capture rule in"; break;
case ARGC_ACL: ctx = "ACL keyword"; break;
case ARGC_SRV: where = "in server directive in"; break;
+ case ARGC_SPOE: where = "in spoe-message directive in"; break;
}
/* set a few default settings */