MINOR: spoe: Add support for fragmentation capability in the SPOA example
This is just an example, so be careful not to send really huge payloads, because
that would eat all your memory.
diff --git a/contrib/spoa_example/spoa.c b/contrib/spoa_example/spoa.c
index 5c3a453..8e234b5 100644
--- a/contrib/spoa_example/spoa.c
+++ b/contrib/spoa_example/spoa.c
@@ -98,6 +98,9 @@
SPOE_FRM_ERR_NO_CAP,
SPOE_FRM_ERR_BAD_VSN,
SPOE_FRM_ERR_BAD_FRAME_SIZE,
+ SPOE_FRM_ERR_FRAG_NOT_SUPPORTED,
+ SPOE_FRM_ERR_INTERLACED_FRAMES,
+ SPOE_FRM_ERR_RES,
SPOE_FRM_ERR_UNKNOWN = 99,
SPOE_FRM_ERRS,
};
@@ -131,6 +134,10 @@
SPOA_FRM_T_AGENT,
};
+/* Flags set on the SPOE frame */
+#define SPOE_FRM_FL_FIN 0x00000001
+#define SPOE_FRM_FL_ABRT 0x00000002
+
/* Masks to get data type or flags value */
#define SPOE_DATA_T_MASK 0x0F
#define SPOE_DATA_FL_MASK 0xF0
@@ -157,8 +164,10 @@
unsigned int stream_id;
unsigned int frame_id;
- bool hcheck; /* true is the CONNECT frame is a healthcheck */
- int ip_score; /* -1 if unset, else between 0 and 100 */
+ unsigned int flags;
+ bool hcheck; /* true is the CONNECT frame is a healthcheck */
+ bool fragmented; /* true if the frame is fragmented */
+ int ip_score; /* -1 if unset, else between 0 and 100 */
struct event process_frame_event;
struct worker *worker;
@@ -166,6 +175,9 @@
struct client *client;
struct list list;
+ char *frag_buf; /* used to accumulate payload of a fragmented frame */
+ unsigned int frag_len;
+
char data[0];
};
@@ -190,6 +202,7 @@
struct spoe_engine *engine;
bool pipelining;
bool async;
+ bool fragmentation;
struct worker *worker;
struct list by_worker;
@@ -244,20 +257,24 @@
static bool debug = false;
static bool pipelining = false;
static bool async = false;
+static bool fragmentation = false;
static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
- [SPOE_FRM_ERR_NONE] = "normal",
- [SPOE_FRM_ERR_IO] = "I/O error",
- [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
- [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
- [SPOE_FRM_ERR_INVALID] = "invalid frame received",
- [SPOE_FRM_ERR_NO_VSN] = "version value not found",
- [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
- [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
- [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
- [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
- [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
+ [SPOE_FRM_ERR_NONE] = "normal",
+ [SPOE_FRM_ERR_IO] = "I/O error",
+ [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
+ [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
+ [SPOE_FRM_ERR_INVALID] = "invalid frame received",
+ [SPOE_FRM_ERR_NO_VSN] = "version value not found",
+ [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
+ [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
+ [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
+ [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
+ [SPOE_FRM_ERR_FRAG_NOT_SUPPORTED] = "fragmentation not supported",
+ [SPOE_FRM_ERR_INTERLACED_FRAMES] = "invalid interlaced frames",
+ [SPOE_FRM_ERR_RES] = "resource allocation error",
+ [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
};
static void signal_cb(evutil_socket_t, short, void *);
@@ -640,6 +657,15 @@
client->async = true;
}
}
+ else if (sz - i >= 13 && !strncmp(str + i, "fragmentation", 13)) {
+ i += 13; /* skip the whole "fragmentation" token (13 chars), not 5 as for "async" */
+ if (sz == i || isspace(str[i]) || str[i] == ',') {
+ DEBUG(frame->worker,
+ "<%lu> HAProxy supports fragmented frame",
+ client->id);
+ client->fragmentation = true;
+ }
+ }
if (sz == i || (delim = memchr(str + i, ',', sz-i)) == NULL)
break;
@@ -740,9 +766,16 @@
DEBUG(frame->worker, "<%lu> Decode HAProxy HELLO frame", client->id);
- /* Skip flags */
+ /* Retrieve flags */
+ memcpy((char *)&(frame->flags), buf+idx, 4);
idx += 4;
+ /* Fragmentation is not supported for HELLO frame */
+ if (!(frame->flags & SPOE_FRM_FL_FIN)) {
+ client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+ goto error;
+ }
+
/* stream-id and frame-id must be cleared */
if (buf[idx] != 0 || buf[idx+1] != 0) {
client->status_code = SPOE_FRM_ERR_INVALID;
@@ -846,9 +879,16 @@
DEBUG(frame->worker, "<%lu> Decode HAProxy DISCONNECT frame", client->id);
- /* Skip flags */
+ /* Retrieve flags */
+ memcpy((char *)&(frame->flags), buf+idx, 4);
idx += 4;
+ /* Fragmentation is not supported for DISCONNECT frame */
+ if (!(frame->flags & SPOE_FRM_FL_FIN)) {
+ client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+ goto error;
+ }
+
/* stream-id and frame-id must be cleared */
if (buf[idx] != 0 || buf[idx+1] != 0) {
client->status_code = SPOE_FRM_ERR_INVALID;
@@ -906,8 +946,8 @@
}
/* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
- * occurred or if the frame must be ignored, 0 if the frame must be ack without
- * any processing, otherwise the number of read bytes (always > 0). */
+ * occurred, 0 if it must be ignored, otherwise the number of read
+ * bytes. */
static int
handle_hanotify(struct spoe_frame *frame)
{
@@ -923,9 +963,16 @@
DEBUG(frame->worker, "<%lu> Decode HAProxy NOTIFY frame", client->id);
- /* Skip flags */
+ /* Retrieve flags */
+ memcpy((char *)&(frame->flags), buf+idx, 4);
idx += 4;
+ /* A fragmented NOTIFY frame is rejected unless fragmentation is enabled */
+ if (!(frame->flags & SPOE_FRM_FL_FIN) && fragmentation == false) {
+ client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+ goto error;
+ }
+
/* Read the stream-id */
if ((i = decode_spoe_varint(buf+idx, end, &stream_id)) == -1)
goto ignore;
@@ -936,20 +983,49 @@
goto ignore;
idx += i;
- frame->stream_id = (unsigned int)stream_id;
- frame->frame_id = (unsigned int)frame_id;
+ if (frame->fragmented == true) {
+ if (frame->stream_id != (unsigned int)stream_id ||
+ frame->frame_id != (unsigned int)frame_id) {
+ client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES;
+ goto error;
+ }
- DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u",
- client->id, frame->stream_id, frame->frame_id);
+ if (frame->flags & SPOE_FRM_FL_ABRT) {
+ DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+ " - Abort processing of a fragmented frame"
+ " - frag_len=%u - len=%u - offset=%u",
+ client->id, frame->stream_id, frame->frame_id,
+ frame->frag_len, frame->len, idx);
+ goto ignore;
+ }
+ DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+ " - %s fragment of a fragmented frame received"
+ " - frag_len=%u - len=%u - offset=%u",
+ client->id, frame->stream_id, frame->frame_id,
+ (frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next",
+ frame->frag_len, frame->len, idx);
+ }
+ else {
+ frame->stream_id = (unsigned int)stream_id;
+ frame->frame_id = (unsigned int)frame_id;
- if (buf + idx == end) {
- return 0;
+ DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+ " - %s frame received"
+ " - frag_len=%u - len=%u - offset=%u",
+ client->id, frame->stream_id, frame->frame_id,
+ (frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented",
+ frame->frag_len, frame->len, idx);
+
+ frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN);
}
frame->offset = idx;
return idx;
ignore:
+ return 0;
+
+ error:
return -1;
}
@@ -960,7 +1036,9 @@
{
struct client *client = frame->client;
char *buf = frame->buf;
- int idx = 0;
+ char capabilities[64];
+ int n, idx = 0;
+ unsigned int flags = SPOE_FRM_FL_FIN;
DEBUG(frame->worker, "<%lu> Encode Agent HELLO frame", client->id);
frame->type = SPOA_FRM_T_AGENT;
@@ -968,8 +1046,8 @@
/* Frame Type */
buf[idx++] = SPOE_FRM_T_AGENT_HELLO;
- /* No flags for now */
- memset(buf+idx, 0, 4); /* No flags */
+ /* Set flags */
+ memcpy(buf+idx, (char *)&flags, 4);
idx += 4;
/* No stream-id and frame-id for HELLO frames */
@@ -994,18 +1072,41 @@
/* "capabilities" K/V item */
idx += encode_spoe_string("capabilities", 12, buf+idx);
buf[idx++] = SPOE_DATA_T_STR;
- if (client->pipelining == true && client->async == true)
- idx += encode_spoe_string("pipelining,async", 16, buf+idx);
- else if (client->pipelining == true)
- idx += encode_spoe_string("pipelining", 10, buf+idx);
- else if (client->async == true)
- idx += encode_spoe_string("async", 5, buf+idx);
+
+ memset(capabilities, 0, sizeof(capabilities));
+ n = 0;
+
+ /* 1. Fragmentation capability ? */
+ if (fragmentation == true) {
+ memcpy(capabilities, "fragmentation", 13);
+ n += 13;
+ }
+ /* 2. Pipelining capability ? */
+ if (client->pipelining == true && n != 0) {
+ memcpy(capabilities + n, ", pipelining", 12);
+ n += 12;
+ }
+ else if (client->pipelining == true) {
+ memcpy(capabilities, "pipelining", 10);
+ n += 10;
+ }
+ /* 3. Async capability ? */
+ if (client->async == true && n != 0) {
+ memcpy(capabilities + n, ", async", 7);
+ n += 7;
+ }
+ else if (client->async == true) {
+ memcpy(capabilities, "async", 5);
+ n += 5;
+ }
+ /* 4. Encode capabilities string */
+ if (n != 0)
+ idx += encode_spoe_string(capabilities, n, buf+idx);
else
idx += encode_spoe_string(NULL, 0, buf+idx);
- DEBUG(frame->worker, "<%lu> Agent capabilities : %s %s",
- client->id, (client->pipelining?"pipelining":""),
- (client->async?"async":""));
+ DEBUG(frame->worker, "<%lu> Agent capabilities : %.*s",
+ client->id, n, capabilities);
frame->len = idx;
return idx;
@@ -1020,6 +1121,7 @@
char *buf = frame->buf;
const char *reason;
int rlen, idx = 0;
+ unsigned int flags = SPOE_FRM_FL_FIN;
DEBUG(frame->worker, "<%lu> Encode Agent DISCONNECT frame", client->id);
frame->type = SPOA_FRM_T_AGENT;
@@ -1032,8 +1134,8 @@
/* Frame type */
buf[idx++] = SPOE_FRM_T_AGENT_DISCON;
- /* No flags for now */
- memset(buf+idx, 0, 4);
+ /* Set flags */
+ memcpy(buf+idx, (char *)&flags, 4);
idx += 4;
/* No stream-id and frame-id for DISCONNECT frames */
@@ -1065,8 +1167,9 @@
static int
prepare_agentack(struct spoe_frame *frame)
{
- char *buf = frame->buf;
- int idx = 0;
+ char *buf = frame->buf;
+ int idx = 0;
+ unsigned int flags = SPOE_FRM_FL_FIN;
/* Be careful here, in async mode, frame->client can be NULL */
@@ -1076,8 +1179,8 @@
/* Frame type */
buf[idx++] = SPOE_FRM_T_AGENT_ACK;
- /* No flags for now */
- memset(buf+idx, 0, 4); /* No flags */
+ /* Set flags */
+ memcpy(buf+idx, (char *)&flags, 4);
idx += 4;
/* Set stream-id and frame-id for ACK frames */
@@ -1140,6 +1243,8 @@
worker = frame->worker;
LIST_DEL(&frame->list);
+ if (frame->frag_buf)
+ free(frame->frag_buf);
memset(frame, 0, sizeof(*frame)+max_frame_size+4);
LIST_ADDQ(&worker->frames, &frame->list);
}
@@ -1186,14 +1291,21 @@
if (frame == NULL)
return;
- frame->type = SPOA_FRM_T_UNKNOWN;
- frame->buf = (char *)(frame->data);
- frame->offset = 0;
- frame->len = 0;
- frame->stream_id = 0;
- frame->frame_id = 0;
- frame->hcheck = false;
- frame->ip_score = -1;
+ if (frame->frag_buf)
+ free(frame->frag_buf);
+
+ frame->type = SPOA_FRM_T_UNKNOWN;
+ frame->buf = (char *)(frame->data);
+ frame->offset = 0;
+ frame->len = 0;
+ frame->stream_id = 0;
+ frame->frame_id = 0;
+ frame->flags = 0;
+ frame->hcheck = false;
+ frame->fragmented = false;
+ frame->ip_score = -1;
+ frame->frag_buf = NULL;
+ frame->frag_len = 0;
LIST_INIT(&frame->list);
}
@@ -1414,8 +1526,8 @@
int idx = frame->offset;
DEBUG(frame->worker,
- "Process frame messages : STREAM-ID=%u - FRAME-ID=%u",
- frame->stream_id, frame->frame_id);
+ "Process frame messages : STREAM-ID=%u - FRAME-ID=%u - length=%u bytes",
+ frame->stream_id, frame->frame_id, frame->len - frame->offset);
/* Loop on messages */
while (buf+idx < end) {
@@ -1471,7 +1583,10 @@
stop_processing:
/* Prepare agent ACK frame */
+ frame->buf = (char *)(frame->data) + 4;
frame->offset = 0;
+ frame->len = 0;
+ frame->flags = 0;
idx = prepare_agentack(frame);
if (frame->ip_score != -1) {
@@ -1547,20 +1662,19 @@
goto write_frame;
case SPOA_ST_PROCESSING:
- n = handle_hanotify(frame);
- if (n < 0 && frame->buf[0] == SPOE_FRM_T_HAPROXY_DISCON) {
+ if (frame->buf[0] == SPOE_FRM_T_HAPROXY_DISCON) {
client->state = SPOA_ST_DISCONNECTING;
goto disconnecting;
}
+ n = handle_hanotify(frame);
if (n < 0) {
- LOG(client->worker, "Ignore invalid or unknown frame");
- goto ignore_frame;
+ LOG(client->worker, "Failed to decode frame: %s",
+ spoe_frm_err_reasons[client->status_code]);
+ goto disconnect;
}
if (n == 0) {
- DEBUG(client->worker, "<%lu> No message found, ack it now",
- client->id);
- prepare_agentack(frame);
- goto write_frame;
+ LOG(client->worker, "Ignore invalid/unknown/aborted frame");
+ goto ignore_frame;
}
else
goto process_frame;
@@ -1583,6 +1697,34 @@
return;
process_frame:
+ if (frame->fragmented == true) {
+ char *buf;
+ size_t len = frame->len - frame->offset;
+
+ buf = realloc(frame->frag_buf, frame->frag_len + len);
+ if (buf == NULL) {
+ client->status_code = SPOE_FRM_ERR_RES;
+ goto disconnect;
+ }
+ memcpy(buf + frame->frag_len, frame->buf + frame->offset, len);
+ frame->frag_buf = buf;
+ frame->frag_len += len;
+
+ if (!(frame->flags & SPOE_FRM_FL_FIN)) {
+ /* Wait for next fragments */
+ frame->buf = (char *)(frame->data);
+ frame->offset = 0;
+ frame->len = 0;
+ frame->flags = 0;
+ return;
+ }
+
+ frame->buf = frame->frag_buf;
+ frame->len = frame->frag_len;
+ frame->offset = 0;
+ /* fall through */
+ }
+
process_incoming_frame(frame);
client->incoming_frame = NULL;
return;
@@ -1829,7 +1971,7 @@
" but can be in any other unit if the number is suffixed\n"
" by a unit (us, ms, s)\n"
"\n"
- " Supported capabilities: pipelining, async\n",
+ " Supported capabilities: fragmentation, pipelining, async\n",
prog, MAX_FRAME_SIZE, DEFAULT_PORT, NUM_WORKERS);
}
@@ -1863,6 +2005,8 @@
pipelining = true;
else if (!strcmp(optarg, "async"))
async = true;
+ else if (!strcmp(optarg, "fragmentation"))
+ fragmentation = true;
else
fprintf(stderr, "WARNING: unsupported capability '%s'\n", optarg);
break;
@@ -1964,8 +2108,8 @@
DEBUG(&null_worker,
"Server is ready"
- " [pipelining=%s - async=%s - debug=%s - max-frame-size=%u]",
- (pipelining?"true":"false"), (async?"true":"false"),
+ " [fragmentation=%s - pipelining=%s - async=%s - debug=%s - max-frame-size=%u]",
+ (fragmentation?"true":"false"), (pipelining?"true":"false"), (async?"true":"false"),
(debug?"true":"false"), max_frame_size);
event_base_dispatch(base);