MINOR: spoe: Add support for fragmentation capability in the SPOA example

This is just an example. So be careful to not send really huge payload because
it would eat all your memory.
diff --git a/contrib/spoa_example/spoa.c b/contrib/spoa_example/spoa.c
index 5c3a453..8e234b5 100644
--- a/contrib/spoa_example/spoa.c
+++ b/contrib/spoa_example/spoa.c
@@ -98,6 +98,9 @@
 	SPOE_FRM_ERR_NO_CAP,
 	SPOE_FRM_ERR_BAD_VSN,
 	SPOE_FRM_ERR_BAD_FRAME_SIZE,
+	SPOE_FRM_ERR_FRAG_NOT_SUPPORTED,
+	SPOE_FRM_ERR_INTERLACED_FRAMES,
+	SPOE_FRM_ERR_RES,
 	SPOE_FRM_ERR_UNKNOWN = 99,
 	SPOE_FRM_ERRS,
 };
@@ -131,6 +134,10 @@
 	SPOA_FRM_T_AGENT,
 };
 
+/* Flags set on the SPOE frame */
+#define SPOE_FRM_FL_FIN         0x00000001
+#define SPOE_FRM_FL_ABRT        0x00000002
+
 /* Masks to get data type or flags value */
 #define SPOE_DATA_T_MASK  0x0F
 #define SPOE_DATA_FL_MASK 0xF0
@@ -157,8 +164,10 @@
 
 	unsigned int         stream_id;
 	unsigned int         frame_id;
-	bool                 hcheck;    /* true is the CONNECT frame is a healthcheck */
-	int                  ip_score;  /* -1 if unset, else between 0 and 100 */
+	unsigned int         flags;
+	bool                 hcheck;     /* true is the CONNECT frame is a healthcheck */
+	bool                 fragmented; /* true if the frame is fragmented */
+	int                  ip_score;   /* -1 if unset, else between 0 and 100 */
 
 	struct event         process_frame_event;
 	struct worker       *worker;
@@ -166,6 +175,9 @@
 	struct client       *client;
 	struct list          list;
 
+	char                *frag_buf; /* used to accumulate payload of a fragmented frame */
+	unsigned int         frag_len;
+
 	char                 data[0];
 };
 
@@ -190,6 +202,7 @@
 	struct spoe_engine *engine;
 	bool                pipelining;
 	bool                async;
+	bool                fragmentation;
 
 	struct worker      *worker;
 	struct list         by_worker;
@@ -244,20 +257,24 @@
 static bool           debug            = false;
 static bool           pipelining       = false;
 static bool           async            = false;
+static bool           fragmentation    = false;
 
 
 static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
-	[SPOE_FRM_ERR_NONE]           = "normal",
-	[SPOE_FRM_ERR_IO]             = "I/O error",
-	[SPOE_FRM_ERR_TOUT]           = "a timeout occurred",
-	[SPOE_FRM_ERR_TOO_BIG]        = "frame is too big",
-	[SPOE_FRM_ERR_INVALID]        = "invalid frame received",
-	[SPOE_FRM_ERR_NO_VSN]         = "version value not found",
-	[SPOE_FRM_ERR_NO_FRAME_SIZE]  = "max-frame-size value not found",
-	[SPOE_FRM_ERR_NO_CAP]         = "capabilities value not found",
-	[SPOE_FRM_ERR_BAD_VSN]        = "unsupported version",
-	[SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
-	[SPOE_FRM_ERR_UNKNOWN]        = "an unknown error occurred",
+	[SPOE_FRM_ERR_NONE]               = "normal",
+	[SPOE_FRM_ERR_IO]                 = "I/O error",
+	[SPOE_FRM_ERR_TOUT]               = "a timeout occurred",
+	[SPOE_FRM_ERR_TOO_BIG]            = "frame is too big",
+	[SPOE_FRM_ERR_INVALID]            = "invalid frame received",
+	[SPOE_FRM_ERR_NO_VSN]             = "version value not found",
+	[SPOE_FRM_ERR_NO_FRAME_SIZE]      = "max-frame-size value not found",
+	[SPOE_FRM_ERR_NO_CAP]             = "capabilities value not found",
+	[SPOE_FRM_ERR_BAD_VSN]            = "unsupported version",
+	[SPOE_FRM_ERR_BAD_FRAME_SIZE]     = "max-frame-size too big or too small",
+	[SPOE_FRM_ERR_FRAG_NOT_SUPPORTED] = "fragmentation not supported",
+	[SPOE_FRM_ERR_INTERLACED_FRAMES]  = "invalid interlaced frames",
+	[SPOE_FRM_ERR_RES]                = "resource allocation error",
+	[SPOE_FRM_ERR_UNKNOWN]            = "an unknown error occurred",
 };
 
 static void signal_cb(evutil_socket_t, short, void *);
@@ -640,6 +657,15 @@
 				client->async = true;
 			}
 		}
+		else if (sz - i >= 13 && !strncmp(str + i, "fragmentation", 13)) {
+			i += 5;
+			if (sz == i || isspace(str[i]) || str[i] == ',') {
+				DEBUG(frame->worker,
+				      "<%lu> HAProxy supports fragmented frame",
+				      client->id);
+				client->fragmentation = true;
+			}
+		}
 
 		if (sz == i || (delim = memchr(str + i, ',', sz-i)) == NULL)
 			break;
@@ -740,9 +766,16 @@
 
 	DEBUG(frame->worker, "<%lu> Decode HAProxy HELLO frame", client->id);
 
-	/* Skip flags */
+	/* Retrieve flags */
+	memcpy((char *)&(frame->flags), buf+idx, 4);
 	idx += 4;
 
+	/* Fragmentation is not supported for HELLO frame */
+	if (!(frame->flags & SPOE_FRM_FL_FIN)) {
+		client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+		goto error;
+	}
+
 	/* stream-id and frame-id must be cleared */
 	if (buf[idx] != 0 || buf[idx+1] != 0) {
 		client->status_code = SPOE_FRM_ERR_INVALID;
@@ -846,9 +879,16 @@
 
 	DEBUG(frame->worker, "<%lu> Decode HAProxy DISCONNECT frame", client->id);
 
-	/* Skip flags */
+	/* Retrieve flags */
+	memcpy((char *)&(frame->flags), buf+idx, 4);
 	idx += 4;
 
+	/* Fragmentation is not supported for DISCONNECT frame */
+	if (!(frame->flags & SPOE_FRM_FL_FIN)) {
+		client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+		goto error;
+	}
+
 	/* stream-id and frame-id must be cleared */
 	if (buf[idx] != 0 || buf[idx+1] != 0) {
 		client->status_code = SPOE_FRM_ERR_INVALID;
@@ -906,8 +946,8 @@
 }
 
 /* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
- * occurred or if the frame must be ignored, 0 if the frame must be ack without
- * any processing, otherwise the number of read bytes (always > 0). */
+ * occurred, 0 if it must be must be ignored, otherwise the number of read
+ * bytes. */
 static int
 handle_hanotify(struct spoe_frame *frame)
 {
@@ -923,9 +963,16 @@
 
 	DEBUG(frame->worker, "<%lu> Decode HAProxy NOTIFY frame", client->id);
 
-	/* Skip flags */
+	/* Retrieve flags */
+	memcpy((char *)&(frame->flags), buf+idx, 4);
 	idx += 4;
 
+	/* Fragmentation is not supported for DISCONNECT frame */
+	if (!(frame->flags & SPOE_FRM_FL_FIN) && fragmentation == false) {
+		client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+		goto error;
+	}
+
 	/* Read the stream-id */
 	if ((i = decode_spoe_varint(buf+idx, end, &stream_id)) == -1)
 		goto ignore;
@@ -936,20 +983,49 @@
 		goto ignore;
 	idx += i;
 
-	frame->stream_id = (unsigned int)stream_id;
-	frame->frame_id  = (unsigned int)frame_id;
+	if (frame->fragmented == true) {
+		if (frame->stream_id != (unsigned int)stream_id ||
+		    frame->frame_id  != (unsigned int)frame_id) {
+			client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES;
+			goto error;
+		}
 
-	DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u",
-	      client->id, frame->stream_id, frame->frame_id);
+		if (frame->flags & SPOE_FRM_FL_ABRT) {
+			DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+			      " - Abort processing of a fragmented frame"
+			      " - frag_len=%u - len=%u - offset=%u",
+			      client->id, frame->stream_id, frame->frame_id,
+			      frame->frag_len, frame->len, idx);
+			goto ignore;
+		}
+		DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+		      " - %s fragment of a fragmented frame received"
+		      " - frag_len=%u - len=%u - offset=%u",
+		      client->id, frame->stream_id, frame->frame_id,
+		      (frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next",
+		      frame->frag_len, frame->len, idx);
+	}
+	else {
+		frame->stream_id = (unsigned int)stream_id;
+		frame->frame_id  = (unsigned int)frame_id;
 
-	if (buf + idx == end) {
-		return 0;
+		DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+		      " - %s frame received"
+		      " - frag_len=%u - len=%u - offset=%u",
+		      client->id, frame->stream_id, frame->frame_id,
+		      (frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented",
+		      frame->frag_len, frame->len, idx);
+
+		frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN);
 	}
 
 	frame->offset = idx;
 	return idx;
 
   ignore:
+	return 0;
+
+  error:
 	return -1;
 }
 
@@ -960,7 +1036,9 @@
 {
 	struct client *client = frame->client;
 	char          *buf    = frame->buf;
-	int            idx    = 0;
+	char           capabilities[64];
+	int            n, idx = 0;
+	unsigned int   flags  = SPOE_FRM_FL_FIN;
 
 	DEBUG(frame->worker, "<%lu> Encode Agent HELLO frame", client->id);
 	frame->type = SPOA_FRM_T_AGENT;
@@ -968,8 +1046,8 @@
 	/* Frame Type */
 	buf[idx++] = SPOE_FRM_T_AGENT_HELLO;
 
-	/* No flags for now */
-	memset(buf+idx, 0, 4); /* No flags */
+	/* Set flags */
+	memcpy(buf+idx, (char *)&flags, 4);
 	idx += 4;
 
 	/* No stream-id and frame-id for HELLO frames */
@@ -994,18 +1072,41 @@
 	/* "capabilities" K/V item */
 	idx += encode_spoe_string("capabilities", 12, buf+idx);
 	buf[idx++] = SPOE_DATA_T_STR;
-	if (client->pipelining == true && client->async == true)
-		idx += encode_spoe_string("pipelining,async", 16, buf+idx);
-	else if (client->pipelining == true)
-		idx += encode_spoe_string("pipelining", 10, buf+idx);
-	else if (client->async == true)
-		idx += encode_spoe_string("async", 5, buf+idx);
+
+	memset(capabilities, 0, sizeof(capabilities));
+	n = 0;
+
+	/*     1. Fragmentation capability ? */
+	if (fragmentation == true) {
+		memcpy(capabilities, "fragmentation", 13);
+		n += 13;
+	}
+	/*     2. Pipelining capability ? */
+	if (client->pipelining == true && n != 0) {
+		memcpy(capabilities + n, ", pipelining", 12);
+		n += 12;
+	}
+	else if (client->pipelining == true) {
+		memcpy(capabilities, "pipelining", 10);
+		n += 10;
+	}
+	/*     3. Async capability ? */
+	if (client->async == true && n != 0) {
+		memcpy(capabilities + n, ", async", 7);
+		n += 7;
+	}
+	else if (client->async == true) {
+		memcpy(capabilities, "async", 5);
+		n += 5;
+	}
+	/*     4. Encode capabilities string */
+	if (n != 0)
+		idx += encode_spoe_string(capabilities, n, buf+idx);
 	else
 		idx += encode_spoe_string(NULL, 0, buf+idx);
 
-	DEBUG(frame->worker, "<%lu> Agent capabilities : %s %s",
-	      client->id, (client->pipelining?"pipelining":""),
-	      (client->async?"async":""));
+	DEBUG(frame->worker, "<%lu> Agent capabilities : %.*s",
+	      client->id, n, capabilities);
 
 	frame->len = idx;
 	return idx;
@@ -1020,6 +1121,7 @@
 	char           *buf   = frame->buf;
 	const char     *reason;
 	int             rlen, idx = 0;
+	unsigned int    flags  = SPOE_FRM_FL_FIN;
 
 	DEBUG(frame->worker, "<%lu> Encode Agent DISCONNECT frame", client->id);
 	frame->type = SPOA_FRM_T_AGENT;
@@ -1032,8 +1134,8 @@
 	/* Frame type */
 	buf[idx++] = SPOE_FRM_T_AGENT_DISCON;
 
-	/* No flags for now */
-	memset(buf+idx, 0, 4);
+	/* Set flags */
+	memcpy(buf+idx, (char *)&flags, 4);
 	idx += 4;
 
 	/* No stream-id and frame-id for DISCONNECT frames */
@@ -1065,8 +1167,9 @@
 static int
 prepare_agentack(struct spoe_frame *frame)
 {
-	char *buf = frame->buf;
-	int   idx = 0;
+	char        *buf = frame->buf;
+	int          idx = 0;
+	unsigned int flags  = SPOE_FRM_FL_FIN;
 
 	/* Be careful here, in async mode, frame->client can be NULL */
 
@@ -1076,8 +1179,8 @@
 	/* Frame type */
 	buf[idx++] = SPOE_FRM_T_AGENT_ACK;
 
-	/* No flags for now */
-	memset(buf+idx, 0, 4); /* No flags */
+	/* Set flags */
+	memcpy(buf+idx, (char *)&flags, 4);
 	idx += 4;
 
 	/* Set stream-id and frame-id for ACK frames */
@@ -1140,6 +1243,8 @@
 
 	worker = frame->worker;
 	LIST_DEL(&frame->list);
+	if (frame->frag_buf)
+		free(frame->frag_buf);
 	memset(frame, 0, sizeof(*frame)+max_frame_size+4);
 	LIST_ADDQ(&worker->frames, &frame->list);
 }
@@ -1186,14 +1291,21 @@
 	if (frame == NULL)
 		return;
 
-	frame->type      = SPOA_FRM_T_UNKNOWN;
-	frame->buf       = (char *)(frame->data);
-	frame->offset    = 0;
-	frame->len       = 0;
-	frame->stream_id = 0;
-	frame->frame_id  = 0;
-	frame->hcheck    = false;
-	frame->ip_score  = -1;
+	if (frame->frag_buf)
+		free(frame->frag_buf);
+
+	frame->type       = SPOA_FRM_T_UNKNOWN;
+	frame->buf        = (char *)(frame->data);
+	frame->offset     = 0;
+	frame->len        = 0;
+	frame->stream_id  = 0;
+	frame->frame_id   = 0;
+	frame->flags      = 0;
+	frame->hcheck     = false;
+	frame->fragmented = false;
+	frame->ip_score   = -1;
+	frame->frag_buf   = NULL;
+	frame->frag_len   = 0;
 	LIST_INIT(&frame->list);
 }
 
@@ -1414,8 +1526,8 @@
 	int                idx    = frame->offset;
 
 	DEBUG(frame->worker,
-	      "Process frame messages : STREAM-ID=%u - FRAME-ID=%u",
-	      frame->stream_id, frame->frame_id);
+	      "Process frame messages : STREAM-ID=%u - FRAME-ID=%u - length=%u bytes",
+	      frame->stream_id, frame->frame_id, frame->len - frame->offset);
 
 	/* Loop on messages */
 	while (buf+idx < end) {
@@ -1471,7 +1583,10 @@
 
   stop_processing:
 	/* Prepare agent ACK frame */
+	frame->buf    = (char *)(frame->data) + 4;
 	frame->offset = 0;
+	frame->len    = 0;
+	frame->flags  = 0;
 	idx = prepare_agentack(frame);
 
 	if (frame->ip_score != -1) {
@@ -1547,20 +1662,19 @@
 			goto write_frame;
 
 		case SPOA_ST_PROCESSING:
-			n = handle_hanotify(frame);
-			if (n < 0 && frame->buf[0] == SPOE_FRM_T_HAPROXY_DISCON) {
+			if (frame->buf[0] == SPOE_FRM_T_HAPROXY_DISCON) {
 				client->state = SPOA_ST_DISCONNECTING;
 				goto disconnecting;
 			}
+			n = handle_hanotify(frame);
 			if (n < 0) {
-				LOG(client->worker, "Ignore invalid or unknown frame");
-				goto ignore_frame;
+				LOG(client->worker, "Failed to decode frame: %s",
+				    spoe_frm_err_reasons[client->status_code]);
+				goto disconnect;
 			}
 			if (n == 0) {
-				DEBUG(client->worker, "<%lu> No message found, ack it now",
-				      client->id);
-				prepare_agentack(frame);
-				goto write_frame;
+				LOG(client->worker, "Ignore invalid/unknown/aborted frame");
+				goto ignore_frame;
 			}
 			else
 				goto process_frame;
@@ -1583,6 +1697,34 @@
 	return;
 
   process_frame:
+	if (frame->fragmented == true) {
+		char  *buf;
+		size_t len = frame->len - frame->offset;
+
+		buf = realloc(frame->frag_buf, frame->frag_len + len);
+		if (buf == NULL) {
+			client->status_code = SPOE_FRM_ERR_RES;
+			goto disconnect;
+		}
+		memcpy(buf + frame->frag_len, frame->buf + frame->offset, len);
+		frame->frag_buf  = buf;
+		frame->frag_len += len;
+
+		if (!(frame->flags & SPOE_FRM_FL_FIN)) {
+			/* Wait for next fragments */
+			frame->buf    = (char *)(frame->data);
+			frame->offset = 0;
+			frame->len    = 0;
+			frame->flags  = 0;
+			return;
+		}
+
+		frame->buf    = frame->frag_buf;
+		frame->len    = frame->frag_len;
+		frame->offset = 0;
+		/* fall through */
+	}
+
 	process_incoming_frame(frame);
 	client->incoming_frame = NULL;
 	return;
@@ -1829,7 +1971,7 @@
 		"                           but can be in any other unit if the number is suffixed\n"
 		"                           by a unit (us, ms, s)\n"
 		"\n"
-		"    Supported capabilities: pipelining, async\n",
+		"    Supported capabilities: fragmentation, pipelining, async\n",
 		prog, MAX_FRAME_SIZE, DEFAULT_PORT, NUM_WORKERS);
 }
 
@@ -1863,6 +2005,8 @@
 					pipelining = true;
 				else if (!strcmp(optarg, "async"))
 					async = true;
+				else if (!strcmp(optarg, "fragmentation"))
+					fragmentation = true;
 				else
 					fprintf(stderr, "WARNING: unsupported capability '%s'\n", optarg);
 				break;
@@ -1964,8 +2108,8 @@
 
 	DEBUG(&null_worker,
 	      "Server is ready"
-	      " [pipelining=%s - async=%s - debug=%s - max-frame-size=%u]",
-	      (pipelining?"true":"false"), (async?"true":"false"),
+	      " [fragmentation=%s - pipelining=%s - async=%s - debug=%s - max-frame-size=%u]",
+	      (fragmentation?"true":"false"), (pipelining?"true":"false"), (async?"true":"false"),
 	      (debug?"true":"false"), max_frame_size);
 	event_base_dispatch(base);