MINOR: spoe: Improve implementation of the payload fragmentation

Now, when a payload is fragmented, the first frame must define the frame type
and the followings must use the special type SPOE_FRM_T_UNSET. This way, it is
easy to know if a fragment is the first one or not. Of course, all frames must
still share the same stream-id and frame-id.

Update SPOA example accordingly.
diff --git a/contrib/spoa_example/spoa.c b/contrib/spoa_example/spoa.c
index e245984..7c6d504 100644
--- a/contrib/spoa_example/spoa.c
+++ b/contrib/spoa_example/spoa.c
@@ -401,6 +401,42 @@
 	return ret;
 }
 
+static int
+acc_payload(struct spoe_frame *frame)
+{
+	struct client *client = frame->client;
+	char          *buf;
+	size_t         len = frame->len - frame->offset;
+	int            ret = frame->offset;
+
+	/* No need to accumulation payload */
+	if (frame->fragmented == false)
+		return ret;
+
+	buf = realloc(frame->frag_buf, frame->frag_len + len);
+	if (buf == NULL) {
+		client->status_code = SPOE_FRM_ERR_RES;
+		return -1;
+	}
+	memcpy(buf + frame->frag_len, frame->buf + frame->offset, len);
+	frame->frag_buf  = buf;
+	frame->frag_len += len;
+
+	if (!(frame->flags & SPOE_FRM_FL_FIN)) {
+		/* Wait for next parts */
+		frame->buf    = (char *)(frame->data);
+		frame->offset = 0;
+		frame->len    = 0;
+		frame->flags  = 0;
+		return 1;
+	}
+
+	frame->buf    = frame->frag_buf;
+	frame->len    = frame->frag_len;
+	frame->offset = 0;
+	return ret;
+}
+
 /* Check disconnect status code. It returns -1 if an error occurred, the number
  * of read bytes otherwise. */
 static int
@@ -454,6 +490,8 @@
 	return ret;
 }
 
+
+
 /* Decode a HELLO frame received from HAProxy. It returns -1 if an error
  * occurred, otherwise the number of read bytes. HELLO frame cannot be
  * ignored and having another frame than a HELLO frame is an error. */
@@ -664,7 +702,7 @@
 	memcpy((char *)&(frame->flags), p, 4);
 	p += 4;
 
-	/* Fragmentation is not supported for DISCONNECT frame */
+	/* Fragmentation is not supported */
 	if (!(frame->flags & SPOE_FRM_FL_FIN) && fragmentation == false) {
 		client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
 		goto error;
@@ -676,44 +714,87 @@
 	if (spoe_decode_varint(&p, end, &frame_id) == -1)
 		goto ignore;
 
-	if (frame->fragmented == true) {
-		if (frame->stream_id != (unsigned int)stream_id ||
-		    frame->frame_id  != (unsigned int)frame_id) {
-			client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES;
-			goto error;
-		}
+	frame->stream_id = (unsigned int)stream_id;
+	frame->frame_id  = (unsigned int)frame_id;
 
-		if (frame->flags & SPOE_FRM_FL_ABRT) {
-			DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
-			      " - Abort processing of a fragmented frame"
-			      " - frag_len=%u - len=%u - offset=%ld",
-			      client->id, frame->stream_id, frame->frame_id,
-			      frame->frag_len, frame->len, p - frame->buf);
-			goto ignore;
-		}
-		DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
-		      " - %s fragment of a fragmented frame received"
-		      " - frag_len=%u - len=%u - offset=%ld",
-		      client->id, frame->stream_id, frame->frame_id,
-		      (frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next",
-		      frame->frag_len, frame->len, p - frame->buf);
+	DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+	      " - %s frame received"
+	      " - frag_len=%u - len=%u - offset=%ld",
+	      client->id, frame->stream_id, frame->frame_id,
+	      (frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented",
+	      frame->frag_len, frame->len, p - frame->buf);
+
+	frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN);
+	frame->offset = (p - frame->buf);
+	return acc_payload(frame);
+
+  ignore:
+	return 0;
+
+  error:
+	return -1;
+}
+
+/* Decode next part of a fragmented frame received from HAProxy. It returns -1
+ * if an error occurred, 0 if it must be must be ignored, otherwise the number
+ * of read bytes. */
+static int
+handle_hafrag(struct spoe_frame *frame)
+{
+	struct client *client = frame->client;
+	char          *p, *end;
+	uint64_t       stream_id, frame_id;
+
+	p = frame->buf;
+	end = frame->buf + frame->len;
+
+	/* Check frame type */
+	if (*p++ != SPOE_FRM_T_UNSET)
+		goto ignore;
+
+	DEBUG(frame->worker, "<%lu> Decode Next part of a fragmented frame", client->id);
+
+	/* Fragmentation is not supported */
+	if (fragmentation == false) {
+		client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+		goto error;
 	}
-	else {
-		frame->stream_id = (unsigned int)stream_id;
-		frame->frame_id  = (unsigned int)frame_id;
+
+	/* Retrieve flags */
+	memcpy((char *)&(frame->flags), p, 4);
+	p+= 4;
+
+	/* Read the stream-id and frame-id */
+	if (spoe_decode_varint(&p, end, &stream_id) == -1)
+		goto ignore;
+	if (spoe_decode_varint(&p, end, &frame_id) == -1)
+		goto ignore;
 
+	if (frame->fragmented == false                  ||
+	    frame->stream_id != (unsigned int)stream_id ||
+	    frame->frame_id  != (unsigned int)frame_id) {
+		client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES;
+		goto error;
+	}
+
+	if (frame->flags & SPOE_FRM_FL_ABRT) {
 		DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
-		      " - %s frame received"
+		      " - Abort processing of a fragmented frame"
 		      " - frag_len=%u - len=%u - offset=%ld",
 		      client->id, frame->stream_id, frame->frame_id,
-		      (frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented",
 		      frame->frag_len, frame->len, p - frame->buf);
-
-		frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN);
+		goto ignore;
 	}
 
+	DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+	      " - %s fragment of a fragmented frame received"
+	      " - frag_len=%u - len=%u - offset=%ld",
+	      client->id, frame->stream_id, frame->frame_id,
+	      (frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next",
+	      frame->frag_len, frame->len, p - frame->buf);
+
 	frame->offset = (p - frame->buf);
-	return frame->offset;
+	return acc_payload(frame);
 
   ignore:
 	return 0;
@@ -1356,7 +1437,11 @@
 				client->state = SPOA_ST_DISCONNECTING;
 				goto disconnecting;
 			}
-			n = handle_hanotify(frame);
+			if (frame->buf[0] == SPOE_FRM_T_UNSET)
+				n = handle_hafrag(frame);
+			else
+				n = handle_hanotify(frame);
+
 			if (n < 0) {
 				LOG(client->worker, "Failed to decode frame: %s",
 				    spoe_frm_err_reasons[client->status_code]);
@@ -1366,6 +1451,8 @@
 				LOG(client->worker, "Ignore invalid/unknown/aborted frame");
 				goto ignore_frame;
 			}
+			else if (n == 1)
+				goto noop;
 			else
 				goto process_frame;
 
@@ -1382,39 +1469,14 @@
 			goto disconnect;
 	}
 
+  noop:
+	return;
+
   ignore_frame:
 	reset_frame(frame);
 	return;
 
   process_frame:
-	if (frame->fragmented == true) {
-		char  *buf;
-		size_t len = frame->len - frame->offset;
-
-		buf = realloc(frame->frag_buf, frame->frag_len + len);
-		if (buf == NULL) {
-			client->status_code = SPOE_FRM_ERR_RES;
-			goto disconnect;
-		}
-		memcpy(buf + frame->frag_len, frame->buf + frame->offset, len);
-		frame->frag_buf  = buf;
-		frame->frag_len += len;
-
-		if (!(frame->flags & SPOE_FRM_FL_FIN)) {
-			/* Wait for next fragments */
-			frame->buf    = (char *)(frame->data);
-			frame->offset = 0;
-			frame->len    = 0;
-			frame->flags  = 0;
-			return;
-		}
-
-		frame->buf    = frame->frag_buf;
-		frame->len    = frame->frag_len;
-		frame->offset = 0;
-		/* fall through */
-	}
-
 	process_incoming_frame(frame);
 	client->incoming_frame = NULL;
 	return;
diff --git a/include/types/spoe.h b/include/types/spoe.h
index a6b0ffc..b65bc7a 100644
--- a/include/types/spoe.h
+++ b/include/types/spoe.h
@@ -294,6 +294,8 @@
 
 /* Frame Types sent by HAProxy and by agents */
 enum spoe_frame_type {
+	SPOE_FRM_T_UNSET = 0,
+
 	/* Frames sent by HAProxy */
 	SPOE_FRM_T_HAPROXY_HELLO = 1,
 	SPOE_FRM_T_HAPROXY_DISCON,
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 8156287..ddfb67b 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -495,30 +495,76 @@
 	p   = frame;
 	end = frame+size;
 
+	stream_id = ctx->stream_id;
+	frame_id  = ctx->frame_id;
+
+	if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) {
+		/* The fragmentation is not supported by the applet */
+		if (!(SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_FRAGMENTATION)) {
+			SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+			return -1;
+		}
+		flags = ctx->frag_ctx.flags;
+	}
+
+	/* Set Frame type */
+	*p++ = SPOE_FRM_T_HAPROXY_NOTIFY;
+
+	/* Set flags */
+	memcpy(p, (char *)&flags, 4);
+	p += 4;
+
+	/* Set stream-id and frame-id */
+	if (spoe_encode_varint(stream_id, &p, end) == -1)
+		goto too_big;
+	if (spoe_encode_varint(frame_id, &p, end) == -1)
+		goto too_big;
+
+	/* Copy encoded messages, if possible */
+	sz = ctx->buffer->i;
+	if (p + sz >= end)
+		goto too_big;
+	memcpy(p, ctx->buffer->p, sz);
+	p += sz;
+
+	return (p - frame);
+
+  too_big:
+	SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG;
+	return 0;
+}
+
+/* Encode next part of a fragmented frame sent by HAProxy to an agent. It
+ * returns the number of encoded bytes in the frame on success, 0 if an encoding
+ * error occurred and -1 if a fatal error occurred. */
+static int
+spoe_prepare_hafrag_frame(struct appctx *appctx, struct spoe_context *ctx,
+			  char *frame, size_t size)
+{
+	char        *p, *end;
+	unsigned int stream_id, frame_id;
+	unsigned int flags;
+	size_t       sz;
+
+	p   = frame;
+	end = frame+size;
+
 	/* <ctx> is null when the stream has aborted the processing of a
 	 * fragmented frame. In this case, we must notify the corresponding
 	 * agent using ids stored in <frag_ctx>. */
 	if (ctx == NULL) {
-		flags    |= SPOE_FRM_FL_ABRT;
+		flags     = (SPOE_FRM_FL_FIN|SPOE_FRM_FL_ABRT);
 		stream_id = SPOE_APPCTX(appctx)->frag_ctx.cursid;
 		frame_id  = SPOE_APPCTX(appctx)->frag_ctx.curfid;
 	}
 	else {
+		flags     = ctx->frag_ctx.flags;
 		stream_id = ctx->stream_id;
 		frame_id  = ctx->frame_id;
-
-		if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) {
-			/* The fragmentation is not supported by the applet */
-			if (!(SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_FRAGMENTATION)) {
-				SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
-				return -1;
-			}
-			flags = ctx->frag_ctx.flags;
-		}
 	}
 
 	/* Set Frame type */
-	*p++ = SPOE_FRM_T_HAPROXY_NOTIFY;
+	*p++ = SPOE_FRM_T_UNSET;
 
 	/* Set flags */
 	memcpy(p, (char *)&flags, 4);
@@ -530,13 +576,17 @@
 	if (spoe_encode_varint(frame_id, &p, end) == -1)
 		goto too_big;
 
+	if (ctx == NULL)
+		goto end;
+
 	/* Copy encoded messages, if possible */
-	sz = SPOE_APPCTX(appctx)->buffer->i;
+	sz = ctx->buffer->i;
 	if (p + sz >= end)
 		goto too_big;
-	memcpy(p, SPOE_APPCTX(appctx)->buffer->p, sz);
+	memcpy(p, ctx->buffer->p, sz);
 	p += sz;
 
+  end:
 	return (p - frame);
 
   too_big:
@@ -1150,12 +1200,13 @@
 		if (appctx->st0 == SPOE_APPCTX_ST_IDLE)
 			agent->applets_idle--;
 
-		si_shutw(si);
-		si_shutr(si);
-		si_ic(si)->flags |= CF_READ_NULL;
 		appctx->st0 = SPOE_APPCTX_ST_END;
 		if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE)
 			spoe_appctx->status_code = SPOE_FRM_ERR_IO;
+
+		si_shutw(si);
+		si_shutr(si);
+		si_ic(si)->flags |= CF_READ_NULL;
 	}
 
 	/* Destroy the task attached to this applet */
@@ -1351,19 +1402,36 @@
 	return 0;
 }
 
+
 static int
-spoe_handle_sending_frame_appctx(struct appctx *appctx, struct spoe_context *ctx,
-				 int *skip)
+spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
 {
-	struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
+	struct spoe_agent   *agent = SPOE_APPCTX(appctx)->agent;
+	struct spoe_context *ctx = NULL;
 	char *frame, *buf;
 	int   ret;
 
 	/* 4 bytes are reserved at the beginning of <buf> to store the frame
 	 * length. */
 	buf = trash.str; frame = buf+4;
-	ret = spoe_prepare_hanotify_frame(appctx, ctx, frame,
-					  SPOE_APPCTX(appctx)->max_frame_size);
+
+	if (appctx->st0 == SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY) {
+		ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx;
+		ret = spoe_prepare_hafrag_frame(appctx, ctx, frame,
+						SPOE_APPCTX(appctx)->max_frame_size);
+	}
+	else if (LIST_ISEMPTY(&agent->sending_queue)) {
+		*skip = 1;
+		ret   = 1;
+		goto end;
+	}
+	else {
+		ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
+		ret = spoe_prepare_hanotify_frame(appctx, ctx, frame,
+						  SPOE_APPCTX(appctx)->max_frame_size);
+
+	}
+
 	if (ret > 1)
 		ret = spoe_send_frame(appctx, buf, ret);
 
@@ -1376,6 +1444,7 @@
 			if (ctx == NULL)
 				goto abort_frag_frame;
 
+			spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
 			LIST_DEL(&ctx->list);
 			LIST_INIT(&ctx->list);
 			ctx->state = SPOE_CTX_ST_ERROR;
@@ -1391,6 +1460,7 @@
 			if (ctx == NULL)
 				goto abort_frag_frame;
 
+			spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
 			LIST_DEL(&ctx->list);
 			LIST_INIT(&ctx->list);
 			if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) ||
@@ -1506,7 +1576,6 @@
 {
 	struct stream_interface *si    = appctx->owner;
 	struct spoe_agent       *agent = SPOE_APPCTX(appctx)->agent;
-	struct spoe_context     *ctx = NULL;
 	unsigned int  fpa = 0;
 	int           ret, skip_sending = 0, skip_receiving = 0;
 
@@ -1531,39 +1600,21 @@
 		    skip_sending, skip_receiving,
 		    spoe_appctx_state_str[appctx->st0]);
 
-	if (fpa > agent->max_fpa || (skip_sending && skip_receiving))
+	if (fpa > agent->max_fpa)
 		goto stop;
-	else if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) {
+	else if (skip_sending || appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) {
 		if (skip_receiving)
 			goto stop;
 		goto recv_frame;
 	}
-	else if (skip_sending)
-		goto recv_frame;
-	else if (appctx->st0 == SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY) {
-		ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx;
-		goto send_frame;
-	}
-	else if (LIST_ISEMPTY(&agent->sending_queue)) {
-		skip_sending = 1;
-		goto recv_frame;
-	}
-	ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
 
-  send_frame:
-	/* Transfer the buffer ownership to the SPOE appctx */
-	if (ctx) {
-		SPOE_APPCTX(appctx)->buffer = ctx->buffer;
-		ctx->buffer = &buf_empty;
-	}
-	ret = spoe_handle_sending_frame_appctx(appctx, ctx, &skip_sending);
+	/* send_frame */
+	ret = spoe_handle_sending_frame_appctx(appctx, &skip_sending);
 	switch (ret) {
 		case -1: /* error */
 			goto next;
 
 		case 0: /* ignore */
-			spoe_release_buffer(&SPOE_APPCTX(appctx)->buffer,
-					    &SPOE_APPCTX(appctx)->buffer_wait);
 			agent->sending_rate++;
 			fpa++;
 			break;
@@ -1572,8 +1623,6 @@
 			break;
 
 		default:
-			spoe_release_buffer(&SPOE_APPCTX(appctx)->buffer,
-					    &SPOE_APPCTX(appctx)->buffer_wait);
 			agent->sending_rate++;
 			fpa++;
 			break;
@@ -2571,7 +2620,7 @@
 spoe_reset_context(struct spoe_context *ctx)
 {
 	ctx->state  = SPOE_CTX_ST_READY;
-	ctx->flags &= ~SPOE_CTX_FL_PROCESS;
+	ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED);
 }