MINOR: spoe: Improve implementation of the payload fragmentation
Now, when a payload is fragmented, the first frame must define the frame type
and the following ones must use the special type SPOE_FRM_T_UNSET. This way, it
is easy to know whether a fragment is the first one or not. Of course, all
frames must still share the same stream-id and frame-id.
Update SPOA example accordingly.
diff --git a/contrib/spoa_example/spoa.c b/contrib/spoa_example/spoa.c
index e245984..7c6d504 100644
--- a/contrib/spoa_example/spoa.c
+++ b/contrib/spoa_example/spoa.c
@@ -401,6 +401,42 @@
return ret;
}
+static int
+acc_payload(struct spoe_frame *frame)
+{
+ struct client *client = frame->client;
+ char *buf;
+ size_t len = frame->len - frame->offset;
+ int ret = frame->offset;
+
+ /* No need to accumulate the payload */
+ if (frame->fragmented == false)
+ return ret;
+
+ buf = realloc(frame->frag_buf, frame->frag_len + len);
+ if (buf == NULL) {
+ client->status_code = SPOE_FRM_ERR_RES;
+ return -1;
+ }
+ memcpy(buf + frame->frag_len, frame->buf + frame->offset, len);
+ frame->frag_buf = buf;
+ frame->frag_len += len;
+
+ if (!(frame->flags & SPOE_FRM_FL_FIN)) {
+ /* Wait for next parts */
+ frame->buf = (char *)(frame->data);
+ frame->offset = 0;
+ frame->len = 0;
+ frame->flags = 0;
+ return 1;
+ }
+
+ frame->buf = frame->frag_buf;
+ frame->len = frame->frag_len;
+ frame->offset = 0;
+ return ret;
+}
+
/* Check disconnect status code. It returns -1 if an error occurred, the number
* of read bytes otherwise. */
static int
@@ -454,6 +490,8 @@
return ret;
}
+
+
/* Decode a HELLO frame received from HAProxy. It returns -1 if an error
* occurred, otherwise the number of read bytes. HELLO frame cannot be
* ignored and having another frame than a HELLO frame is an error. */
@@ -664,7 +702,7 @@
memcpy((char *)&(frame->flags), p, 4);
p += 4;
- /* Fragmentation is not supported for DISCONNECT frame */
+ /* Fragmentation is not supported */
if (!(frame->flags & SPOE_FRM_FL_FIN) && fragmentation == false) {
client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
goto error;
@@ -676,44 +714,87 @@
if (spoe_decode_varint(&p, end, &frame_id) == -1)
goto ignore;
- if (frame->fragmented == true) {
- if (frame->stream_id != (unsigned int)stream_id ||
- frame->frame_id != (unsigned int)frame_id) {
- client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES;
- goto error;
- }
+ frame->stream_id = (unsigned int)stream_id;
+ frame->frame_id = (unsigned int)frame_id;
- if (frame->flags & SPOE_FRM_FL_ABRT) {
- DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
- " - Abort processing of a fragmented frame"
- " - frag_len=%u - len=%u - offset=%ld",
- client->id, frame->stream_id, frame->frame_id,
- frame->frag_len, frame->len, p - frame->buf);
- goto ignore;
- }
- DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
- " - %s fragment of a fragmented frame received"
- " - frag_len=%u - len=%u - offset=%ld",
- client->id, frame->stream_id, frame->frame_id,
- (frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next",
- frame->frag_len, frame->len, p - frame->buf);
+ DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+ " - %s frame received"
+ " - frag_len=%u - len=%u - offset=%ld",
+ client->id, frame->stream_id, frame->frame_id,
+ (frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented",
+ frame->frag_len, frame->len, p - frame->buf);
+
+ frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN);
+ frame->offset = (p - frame->buf);
+ return acc_payload(frame);
+
+ ignore:
+ return 0;
+
+ error:
+ return -1;
+}
+
+/* Decode next part of a fragmented frame received from HAProxy. It returns -1
+ * if an error occurred, 0 if it must be ignored, otherwise the number
+ * of read bytes. */
+static int
+handle_hafrag(struct spoe_frame *frame)
+{
+ struct client *client = frame->client;
+ char *p, *end;
+ uint64_t stream_id, frame_id;
+
+ p = frame->buf;
+ end = frame->buf + frame->len;
+
+ /* Check frame type */
+ if (*p++ != SPOE_FRM_T_UNSET)
+ goto ignore;
+
+ DEBUG(frame->worker, "<%lu> Decode Next part of a fragmented frame", client->id);
+
+ /* Fragmentation is not supported */
+ if (fragmentation == false) {
+ client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+ goto error;
}
- else {
- frame->stream_id = (unsigned int)stream_id;
- frame->frame_id = (unsigned int)frame_id;
+
+ /* Retrieve flags */
+ memcpy((char *)&(frame->flags), p, 4);
+ p+= 4;
+
+ /* Read the stream-id and frame-id */
+ if (spoe_decode_varint(&p, end, &stream_id) == -1)
+ goto ignore;
+ if (spoe_decode_varint(&p, end, &frame_id) == -1)
+ goto ignore;
+ if (frame->fragmented == false ||
+ frame->stream_id != (unsigned int)stream_id ||
+ frame->frame_id != (unsigned int)frame_id) {
+ client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES;
+ goto error;
+ }
+
+ if (frame->flags & SPOE_FRM_FL_ABRT) {
DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
- " - %s frame received"
+ " - Abort processing of a fragmented frame"
" - frag_len=%u - len=%u - offset=%ld",
client->id, frame->stream_id, frame->frame_id,
- (frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented",
frame->frag_len, frame->len, p - frame->buf);
-
- frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN);
+ goto ignore;
}
+ DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+ " - %s fragment of a fragmented frame received"
+ " - frag_len=%u - len=%u - offset=%ld",
+ client->id, frame->stream_id, frame->frame_id,
+ (frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next",
+ frame->frag_len, frame->len, p - frame->buf);
+
frame->offset = (p - frame->buf);
- return frame->offset;
+ return acc_payload(frame);
ignore:
return 0;
@@ -1356,7 +1437,11 @@
client->state = SPOA_ST_DISCONNECTING;
goto disconnecting;
}
- n = handle_hanotify(frame);
+ if (frame->buf[0] == SPOE_FRM_T_UNSET)
+ n = handle_hafrag(frame);
+ else
+ n = handle_hanotify(frame);
+
if (n < 0) {
LOG(client->worker, "Failed to decode frame: %s",
spoe_frm_err_reasons[client->status_code]);
@@ -1366,6 +1451,8 @@
LOG(client->worker, "Ignore invalid/unknown/aborted frame");
goto ignore_frame;
}
+ else if (n == 1)
+ goto noop;
else
goto process_frame;
@@ -1382,39 +1469,14 @@
goto disconnect;
}
+ noop:
+ return;
+
ignore_frame:
reset_frame(frame);
return;
process_frame:
- if (frame->fragmented == true) {
- char *buf;
- size_t len = frame->len - frame->offset;
-
- buf = realloc(frame->frag_buf, frame->frag_len + len);
- if (buf == NULL) {
- client->status_code = SPOE_FRM_ERR_RES;
- goto disconnect;
- }
- memcpy(buf + frame->frag_len, frame->buf + frame->offset, len);
- frame->frag_buf = buf;
- frame->frag_len += len;
-
- if (!(frame->flags & SPOE_FRM_FL_FIN)) {
- /* Wait for next fragments */
- frame->buf = (char *)(frame->data);
- frame->offset = 0;
- frame->len = 0;
- frame->flags = 0;
- return;
- }
-
- frame->buf = frame->frag_buf;
- frame->len = frame->frag_len;
- frame->offset = 0;
- /* fall through */
- }
-
process_incoming_frame(frame);
client->incoming_frame = NULL;
return;