MINOR: spoa-server: Prepare responses
This patch adds SPOP responses managament. It provides SPOP
encoding primitives. It also move the example function
ip_reputation to this new behavior.
diff --git a/contrib/spoa_server/spoa.c b/contrib/spoa_server/spoa.c
index e484315..53fc759 100644
--- a/contrib/spoa_server/spoa.c
+++ b/contrib/spoa_server/spoa.c
@@ -156,26 +156,30 @@
check_ipv4_reputation(struct worker *w, struct in_addr *ipv4)
{
char str[INET_ADDRSTRLEN];
+ unsigned int score;
if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL)
return;
- w->ip_score = random() % 100;
+ score = random() % 100;
+ set_var_uint32(w, "ip_score", 8, SPOE_SCOPE_SESS, score);
- DEBUG(" IP score for %.*s is: %d", INET_ADDRSTRLEN, str, w->ip_score);
+ DEBUG(" IP score for %.*s is: %d", INET_ADDRSTRLEN, str, score);
}
static void
check_ipv6_reputation(struct worker *w, struct in6_addr *ipv6)
{
char str[INET6_ADDRSTRLEN];
+ unsigned int score;
if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL)
return;
- w->ip_score = random() % 100;
+ score = random() % 100;
+ set_var_uint32(w, "ip_score", 8, SPOE_SCOPE_SESS, score);
- DEBUG(" IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, w->ip_score);
+ DEBUG(" IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, score);
}
static int
@@ -700,6 +704,159 @@
return -1;
}
+/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred,
+ * the number of written bytes otherwise. */
+static void prepare_agentack(struct worker *w)
+{
+ w->ack_len = 0;
+
+ /* Frame type */
+ w->ack[w->ack_len++] = SPOE_FRM_T_AGENT_ACK;
+
+ /* No flags for now */
+ memset(w->ack + w->ack_len, 0, 4); /* No flags */
+ w->ack_len += 4;
+
+ /* Set stream-id and frame-id for ACK frames */
+ w->ack_len += encode_spoe_varint(w->stream_id, w->ack + w->ack_len);
+ w->ack_len += encode_spoe_varint(w->frame_id, w->ack + w->ack_len);
+}
+
+static inline
+int set_var_name(struct worker *w, const char *name, int name_len, unsigned char scope)
+{
+ w->ack[w->ack_len++] = SPOE_ACT_T_SET_VAR; /* Action type */
+ w->ack[w->ack_len++] = 3; /* Number of args */
+ w->ack[w->ack_len++] = scope; /* Arg 1: the scope */
+ w->ack_len += encode_spoe_string(name, name_len, w->ack+w->ack_len); /* Arg 2: variable name */
+ return 1;
+}
+
+int set_var_null(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope)
+{
+ if (!set_var_name(w, name, name_len, scope))
+ return 0;
+ w->ack[w->ack_len++] = SPOE_DATA_T_NULL;
+ return 1;
+}
+
+int set_var_bool(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope, bool value)
+{
+ if (!set_var_name(w, name, name_len, scope))
+ return 0;
+ w->ack[w->ack_len++] = SPOE_DATA_T_BOOL | (!!value << 4);
+ return 1;
+}
+
+static inline
+int set_var_int(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope, int type, uint64_t value)
+{
+ if (!set_var_name(w, name, name_len, scope))
+ return 0;
+ w->ack[w->ack_len++] = SPOE_DATA_T_UINT32;
+ w->ack_len += encode_spoe_varint(value, w->ack+w->ack_len); /* Arg 3: variable value */
+ return 1;
+}
+
+int set_var_uint32(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope, uint32_t value)
+{
+ return set_var_int(w, name, name_len, scope, SPOE_DATA_T_UINT32, value);
+}
+
+int set_var_int32(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope, int32_t value)
+{
+ return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value);
+}
+
+int set_var_uint64(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope, uint64_t value)
+{
+ return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value);
+}
+
+int set_var_int64(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope, int64_t value)
+{
+ return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value);
+}
+
+int set_var_ipv4(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope,
+ struct in_addr *ipv4)
+{
+ if (!set_var_name(w, name, name_len, scope))
+ return 0;
+ w->ack[w->ack_len++] = SPOE_DATA_T_IPV4;
+ memcpy(w->ack+w->ack_len, ipv4, 4);
+ w->ack_len += 4;
+ return 1;
+}
+
+int set_var_ipv6(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope,
+ struct in6_addr *ipv6)
+{
+ if (!set_var_name(w, name, name_len, scope))
+ return 0;
+ w->ack[w->ack_len++] = SPOE_DATA_T_IPV6;
+ memcpy(w->ack+w->ack_len, ipv6, 16);
+ w->ack_len += 16;
+ return 1;
+}
+
+static inline
+int set_var_buf(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope, int type,
+ const char *str, int str_len)
+{
+ if (!set_var_name(w, name, name_len, scope))
+ return 0;
+ w->ack[w->ack_len++] = type;
+ w->ack_len += encode_spoe_string(str, str_len, w->ack+w->ack_len);
+ return 1;
+}
+
+int set_var_string(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope,
+ const char *str, int strlen)
+{
+ return set_var_buf(w, name, name_len, scope, SPOE_DATA_T_STR, str, strlen);
+}
+
+int set_var_bin(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope,
+ const char *str, int strlen)
+{
+ return set_var_buf(w, name, name_len, scope, SPOE_DATA_T_BIN, str, strlen);
+}
+
+/* This function is a little bit ugly,
+ * TODO: improve the response without copying the bufer
+ */
+static int commit_agentack(struct worker *w)
+{
+ memcpy(w->buf, w->ack, w->ack_len);
+ w->len = w->ack_len;
+ return 1;
+}
+
/* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
* occurred, 0 if the frame must be skipped, otherwise the number of read
* bytes. */
@@ -737,6 +894,9 @@
DEBUG("Notify frame received: stream-id=%u - frame-id=%u",
w->stream_id, w->frame_id);
+ /* Prepara ack, if the processing fails tha ack will be cancelled */
+ prepare_agentack(w);
+
/* Loop on messages */
while (idx < w->len) {
char *str;
@@ -840,39 +1000,6 @@
return idx;
}
-/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred,
- * the number of written bytes otherwise. */
-static int
-prepare_agentack(struct worker *w)
-{
- int idx = 0;
-
- /* Frame type */
- w->buf[idx++] = SPOE_FRM_T_AGENT_ACK;
-
- /* No flags for now */
- memset(w->buf+idx, 0, 4); /* No flags */
- idx += 4;
-
- /* Set stream-id and frame-id for ACK frames */
- idx += encode_spoe_varint(w->stream_id, w->buf+idx);
- idx += encode_spoe_varint(w->frame_id, w->buf+idx);
-
- /* Data */
- if (w->ip_score == -1)
- goto out;
-
- w->buf[idx++] = SPOE_ACT_T_SET_VAR; /* Action type */
- w->buf[idx++] = 3; /* Number of args */
- w->buf[idx++] = SPOE_SCOPE_SESS; /* Arg 1: the scope */
- idx += encode_spoe_string("ip_score", 8, w->buf+idx); /* Arg 2: variable name */
- w->buf[idx++] = SPOE_DATA_T_UINT32;
- idx += encode_spoe_varint(w->ip_score, w->buf+idx); /* Arg 3: variable value */
-out:
- w->len = idx;
- return idx;
-}
-
/* Encode a DISCONNECT frame to send it to HAProxy. It returns -1 if an error
* occurred, the number of written bytes otherwise. */
static int
@@ -957,7 +1084,7 @@
LOG("Failed to handle Haproxy NOTIFY frame");
goto error_or_quit;
}
- if (prepare_agentack(w) < 0) {
+ if (commit_agentack(w) < 0) {
LOG("Failed to prepare Agent ACK frame");
goto error_or_quit;
}
@@ -1022,7 +1149,6 @@
if (w.healthcheck == true)
goto close;
while (1) {
- w.ip_score = -1;
if (notify_ack_roundtip(csock, &w) < 0)
break;
}
diff --git a/contrib/spoa_server/spoa.h b/contrib/spoa_server/spoa.h
index ca85181..ee19f37 100644
--- a/contrib/spoa_server/spoa.h
+++ b/contrib/spoa_server/spoa.h
@@ -55,7 +55,8 @@
unsigned int stream_id;
unsigned int frame_id;
bool healthcheck;
- int ip_score; /* -1 if unset, else between 0 and 100 */
+ char ack[MAX_FRAME_SIZE];
+ unsigned int ack_len;
};
struct chunk {
@@ -106,6 +107,41 @@
void ps_register(struct ps *ps);
void ps_register_message(struct ps *ps, const char *name, void *ref);
+int set_var_null(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope);
+int set_var_bool(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope, bool value);
+int set_var_uint32(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope, uint32_t value);
+int set_var_int32(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope, int32_t value);
+int set_var_uint64(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope, uint64_t value);
+int set_var_int64(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope, int64_t value);
+int set_var_ipv4(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope,
+ struct in_addr *ipv4);
+int set_var_ipv6(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope,
+ struct in6_addr *ipv6);
+int set_var_string(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope,
+ const char *str, int strlen);
+int set_var_bin(struct worker *w,
+ const char *name, int name_len,
+ unsigned char scope,
+ const char *str, int strlen);
+
#define LOG(fmt, args...) \
do { \
struct timeval now; \