MINOR: spoe: Improve implementation of the payload fragmentation
Now, when a payload is fragmented, the first frame must define the frame type
and the followings must use the special type SPOE_FRM_T_UNSET. This way, it is
easy to know if a fragment is the first one or not. Of course, all frames must
still share the same stream-id and frame-id.
Update SPOA example accordingly.
diff --git a/contrib/spoa_example/spoa.c b/contrib/spoa_example/spoa.c
index e245984..7c6d504 100644
--- a/contrib/spoa_example/spoa.c
+++ b/contrib/spoa_example/spoa.c
@@ -401,6 +401,42 @@
return ret;
}
+static int
+acc_payload(struct spoe_frame *frame)
+{
+ struct client *client = frame->client;
+ char *buf;
+ size_t len = frame->len - frame->offset;
+ int ret = frame->offset;
+
+ /* No need to accumulation payload */
+ if (frame->fragmented == false)
+ return ret;
+
+ buf = realloc(frame->frag_buf, frame->frag_len + len);
+ if (buf == NULL) {
+ client->status_code = SPOE_FRM_ERR_RES;
+ return -1;
+ }
+ memcpy(buf + frame->frag_len, frame->buf + frame->offset, len);
+ frame->frag_buf = buf;
+ frame->frag_len += len;
+
+ if (!(frame->flags & SPOE_FRM_FL_FIN)) {
+ /* Wait for next parts */
+ frame->buf = (char *)(frame->data);
+ frame->offset = 0;
+ frame->len = 0;
+ frame->flags = 0;
+ return 1;
+ }
+
+ frame->buf = frame->frag_buf;
+ frame->len = frame->frag_len;
+ frame->offset = 0;
+ return ret;
+}
+
/* Check disconnect status code. It returns -1 if an error occurred, the number
* of read bytes otherwise. */
static int
@@ -454,6 +490,8 @@
return ret;
}
+
+
/* Decode a HELLO frame received from HAProxy. It returns -1 if an error
* occurred, otherwise the number of read bytes. HELLO frame cannot be
* ignored and having another frame than a HELLO frame is an error. */
@@ -664,7 +702,7 @@
memcpy((char *)&(frame->flags), p, 4);
p += 4;
- /* Fragmentation is not supported for DISCONNECT frame */
+ /* Fragmentation is not supported */
if (!(frame->flags & SPOE_FRM_FL_FIN) && fragmentation == false) {
client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
goto error;
@@ -676,44 +714,87 @@
if (spoe_decode_varint(&p, end, &frame_id) == -1)
goto ignore;
- if (frame->fragmented == true) {
- if (frame->stream_id != (unsigned int)stream_id ||
- frame->frame_id != (unsigned int)frame_id) {
- client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES;
- goto error;
- }
+ frame->stream_id = (unsigned int)stream_id;
+ frame->frame_id = (unsigned int)frame_id;
- if (frame->flags & SPOE_FRM_FL_ABRT) {
- DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
- " - Abort processing of a fragmented frame"
- " - frag_len=%u - len=%u - offset=%ld",
- client->id, frame->stream_id, frame->frame_id,
- frame->frag_len, frame->len, p - frame->buf);
- goto ignore;
- }
- DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
- " - %s fragment of a fragmented frame received"
- " - frag_len=%u - len=%u - offset=%ld",
- client->id, frame->stream_id, frame->frame_id,
- (frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next",
- frame->frag_len, frame->len, p - frame->buf);
+ DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+ " - %s frame received"
+ " - frag_len=%u - len=%u - offset=%ld",
+ client->id, frame->stream_id, frame->frame_id,
+ (frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented",
+ frame->frag_len, frame->len, p - frame->buf);
+
+ frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN);
+ frame->offset = (p - frame->buf);
+ return acc_payload(frame);
+
+ ignore:
+ return 0;
+
+ error:
+ return -1;
+}
+
+/* Decode next part of a fragmented frame received from HAProxy. It returns -1
+ * if an error occurred, 0 if it must be must be ignored, otherwise the number
+ * of read bytes. */
+static int
+handle_hafrag(struct spoe_frame *frame)
+{
+ struct client *client = frame->client;
+ char *p, *end;
+ uint64_t stream_id, frame_id;
+
+ p = frame->buf;
+ end = frame->buf + frame->len;
+
+ /* Check frame type */
+ if (*p++ != SPOE_FRM_T_UNSET)
+ goto ignore;
+
+ DEBUG(frame->worker, "<%lu> Decode Next part of a fragmented frame", client->id);
+
+ /* Fragmentation is not supported */
+ if (fragmentation == false) {
+ client->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+ goto error;
}
- else {
- frame->stream_id = (unsigned int)stream_id;
- frame->frame_id = (unsigned int)frame_id;
+
+ /* Retrieve flags */
+ memcpy((char *)&(frame->flags), p, 4);
+ p+= 4;
+
+ /* Read the stream-id and frame-id */
+ if (spoe_decode_varint(&p, end, &stream_id) == -1)
+ goto ignore;
+ if (spoe_decode_varint(&p, end, &frame_id) == -1)
+ goto ignore;
+ if (frame->fragmented == false ||
+ frame->stream_id != (unsigned int)stream_id ||
+ frame->frame_id != (unsigned int)frame_id) {
+ client->status_code = SPOE_FRM_ERR_INTERLACED_FRAMES;
+ goto error;
+ }
+
+ if (frame->flags & SPOE_FRM_FL_ABRT) {
DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
- " - %s frame received"
+ " - Abort processing of a fragmented frame"
" - frag_len=%u - len=%u - offset=%ld",
client->id, frame->stream_id, frame->frame_id,
- (frame->flags & SPOE_FRM_FL_FIN) ? "unfragmented" : "fragmented",
frame->frag_len, frame->len, p - frame->buf);
-
- frame->fragmented = !(frame->flags & SPOE_FRM_FL_FIN);
+ goto ignore;
}
+ DEBUG(frame->worker, "<%lu> STREAM-ID=%u - FRAME-ID=%u"
+ " - %s fragment of a fragmented frame received"
+ " - frag_len=%u - len=%u - offset=%ld",
+ client->id, frame->stream_id, frame->frame_id,
+ (frame->flags & SPOE_FRM_FL_FIN) ? "last" : "next",
+ frame->frag_len, frame->len, p - frame->buf);
+
frame->offset = (p - frame->buf);
- return frame->offset;
+ return acc_payload(frame);
ignore:
return 0;
@@ -1356,7 +1437,11 @@
client->state = SPOA_ST_DISCONNECTING;
goto disconnecting;
}
- n = handle_hanotify(frame);
+ if (frame->buf[0] == SPOE_FRM_T_UNSET)
+ n = handle_hafrag(frame);
+ else
+ n = handle_hanotify(frame);
+
if (n < 0) {
LOG(client->worker, "Failed to decode frame: %s",
spoe_frm_err_reasons[client->status_code]);
@@ -1366,6 +1451,8 @@
LOG(client->worker, "Ignore invalid/unknown/aborted frame");
goto ignore_frame;
}
+ else if (n == 1)
+ goto noop;
else
goto process_frame;
@@ -1382,39 +1469,14 @@
goto disconnect;
}
+ noop:
+ return;
+
ignore_frame:
reset_frame(frame);
return;
process_frame:
- if (frame->fragmented == true) {
- char *buf;
- size_t len = frame->len - frame->offset;
-
- buf = realloc(frame->frag_buf, frame->frag_len + len);
- if (buf == NULL) {
- client->status_code = SPOE_FRM_ERR_RES;
- goto disconnect;
- }
- memcpy(buf + frame->frag_len, frame->buf + frame->offset, len);
- frame->frag_buf = buf;
- frame->frag_len += len;
-
- if (!(frame->flags & SPOE_FRM_FL_FIN)) {
- /* Wait for next fragments */
- frame->buf = (char *)(frame->data);
- frame->offset = 0;
- frame->len = 0;
- frame->flags = 0;
- return;
- }
-
- frame->buf = frame->frag_buf;
- frame->len = frame->frag_len;
- frame->offset = 0;
- /* fall through */
- }
-
process_incoming_frame(frame);
client->incoming_frame = NULL;
return;
diff --git a/include/types/spoe.h b/include/types/spoe.h
index a6b0ffc..b65bc7a 100644
--- a/include/types/spoe.h
+++ b/include/types/spoe.h
@@ -294,6 +294,8 @@
/* Frame Types sent by HAProxy and by agents */
enum spoe_frame_type {
+ SPOE_FRM_T_UNSET = 0,
+
/* Frames sent by HAProxy */
SPOE_FRM_T_HAPROXY_HELLO = 1,
SPOE_FRM_T_HAPROXY_DISCON,
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 8156287..ddfb67b 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -495,30 +495,76 @@
p = frame;
end = frame+size;
+ stream_id = ctx->stream_id;
+ frame_id = ctx->frame_id;
+
+ if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) {
+ /* The fragmentation is not supported by the applet */
+ if (!(SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_FRAGMENTATION)) {
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
+ return -1;
+ }
+ flags = ctx->frag_ctx.flags;
+ }
+
+ /* Set Frame type */
+ *p++ = SPOE_FRM_T_HAPROXY_NOTIFY;
+
+ /* Set flags */
+ memcpy(p, (char *)&flags, 4);
+ p += 4;
+
+ /* Set stream-id and frame-id */
+ if (spoe_encode_varint(stream_id, &p, end) == -1)
+ goto too_big;
+ if (spoe_encode_varint(frame_id, &p, end) == -1)
+ goto too_big;
+
+ /* Copy encoded messages, if possible */
+ sz = ctx->buffer->i;
+ if (p + sz >= end)
+ goto too_big;
+ memcpy(p, ctx->buffer->p, sz);
+ p += sz;
+
+ return (p - frame);
+
+ too_big:
+ SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_TOO_BIG;
+ return 0;
+}
+
+/* Encode next part of a fragmented frame sent by HAProxy to an agent. It
+ * returns the number of encoded bytes in the frame on success, 0 if an encoding
+ * error occurred and -1 if a fatal error occurred. */
+static int
+spoe_prepare_hafrag_frame(struct appctx *appctx, struct spoe_context *ctx,
+ char *frame, size_t size)
+{
+ char *p, *end;
+ unsigned int stream_id, frame_id;
+ unsigned int flags;
+ size_t sz;
+
+ p = frame;
+ end = frame+size;
+
/* <ctx> is null when the stream has aborted the processing of a
* fragmented frame. In this case, we must notify the corresponding
* agent using ids stored in <frag_ctx>. */
if (ctx == NULL) {
- flags |= SPOE_FRM_FL_ABRT;
+ flags = (SPOE_FRM_FL_FIN|SPOE_FRM_FL_ABRT);
stream_id = SPOE_APPCTX(appctx)->frag_ctx.cursid;
frame_id = SPOE_APPCTX(appctx)->frag_ctx.curfid;
}
else {
+ flags = ctx->frag_ctx.flags;
stream_id = ctx->stream_id;
frame_id = ctx->frame_id;
-
- if (ctx->flags & SPOE_CTX_FL_FRAGMENTED) {
- /* The fragmentation is not supported by the applet */
- if (!(SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_FRAGMENTATION)) {
- SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_FRAG_NOT_SUPPORTED;
- return -1;
- }
- flags = ctx->frag_ctx.flags;
- }
}
/* Set Frame type */
- *p++ = SPOE_FRM_T_HAPROXY_NOTIFY;
+ *p++ = SPOE_FRM_T_UNSET;
/* Set flags */
memcpy(p, (char *)&flags, 4);
@@ -530,13 +576,17 @@
if (spoe_encode_varint(frame_id, &p, end) == -1)
goto too_big;
+ if (ctx == NULL)
+ goto end;
+
/* Copy encoded messages, if possible */
- sz = SPOE_APPCTX(appctx)->buffer->i;
+ sz = ctx->buffer->i;
if (p + sz >= end)
goto too_big;
- memcpy(p, SPOE_APPCTX(appctx)->buffer->p, sz);
+ memcpy(p, ctx->buffer->p, sz);
p += sz;
+ end:
return (p - frame);
too_big:
@@ -1150,12 +1200,13 @@
if (appctx->st0 == SPOE_APPCTX_ST_IDLE)
agent->applets_idle--;
- si_shutw(si);
- si_shutr(si);
- si_ic(si)->flags |= CF_READ_NULL;
appctx->st0 = SPOE_APPCTX_ST_END;
if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE)
spoe_appctx->status_code = SPOE_FRM_ERR_IO;
+
+ si_shutw(si);
+ si_shutr(si);
+ si_ic(si)->flags |= CF_READ_NULL;
}
/* Destroy the task attached to this applet */
@@ -1351,19 +1402,36 @@
return 0;
}
+
static int
-spoe_handle_sending_frame_appctx(struct appctx *appctx, struct spoe_context *ctx,
- int *skip)
+spoe_handle_sending_frame_appctx(struct appctx *appctx, int *skip)
{
- struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
+ struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
+ struct spoe_context *ctx = NULL;
char *frame, *buf;
int ret;
/* 4 bytes are reserved at the beginning of <buf> to store the frame
* length. */
buf = trash.str; frame = buf+4;
- ret = spoe_prepare_hanotify_frame(appctx, ctx, frame,
- SPOE_APPCTX(appctx)->max_frame_size);
+
+ if (appctx->st0 == SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY) {
+ ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx;
+ ret = spoe_prepare_hafrag_frame(appctx, ctx, frame,
+ SPOE_APPCTX(appctx)->max_frame_size);
+ }
+ else if (LIST_ISEMPTY(&agent->sending_queue)) {
+ *skip = 1;
+ ret = 1;
+ goto end;
+ }
+ else {
+ ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
+ ret = spoe_prepare_hanotify_frame(appctx, ctx, frame,
+ SPOE_APPCTX(appctx)->max_frame_size);
+
+ }
+
if (ret > 1)
ret = spoe_send_frame(appctx, buf, ret);
@@ -1376,6 +1444,7 @@
if (ctx == NULL)
goto abort_frag_frame;
+ spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
ctx->state = SPOE_CTX_ST_ERROR;
@@ -1391,6 +1460,7 @@
if (ctx == NULL)
goto abort_frag_frame;
+ spoe_release_buffer(&ctx->buffer, &ctx->buffer_wait);
LIST_DEL(&ctx->list);
LIST_INIT(&ctx->list);
if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) ||
@@ -1506,7 +1576,6 @@
{
struct stream_interface *si = appctx->owner;
struct spoe_agent *agent = SPOE_APPCTX(appctx)->agent;
- struct spoe_context *ctx = NULL;
unsigned int fpa = 0;
int ret, skip_sending = 0, skip_receiving = 0;
@@ -1531,39 +1600,21 @@
skip_sending, skip_receiving,
spoe_appctx_state_str[appctx->st0]);
- if (fpa > agent->max_fpa || (skip_sending && skip_receiving))
+ if (fpa > agent->max_fpa)
goto stop;
- else if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) {
+ else if (skip_sending || appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK) {
if (skip_receiving)
goto stop;
goto recv_frame;
}
- else if (skip_sending)
- goto recv_frame;
- else if (appctx->st0 == SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY) {
- ctx = SPOE_APPCTX(appctx)->frag_ctx.ctx;
- goto send_frame;
- }
- else if (LIST_ISEMPTY(&agent->sending_queue)) {
- skip_sending = 1;
- goto recv_frame;
- }
- ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
- send_frame:
- /* Transfer the buffer ownership to the SPOE appctx */
- if (ctx) {
- SPOE_APPCTX(appctx)->buffer = ctx->buffer;
- ctx->buffer = &buf_empty;
- }
- ret = spoe_handle_sending_frame_appctx(appctx, ctx, &skip_sending);
+ /* send_frame */
+ ret = spoe_handle_sending_frame_appctx(appctx, &skip_sending);
switch (ret) {
case -1: /* error */
goto next;
case 0: /* ignore */
- spoe_release_buffer(&SPOE_APPCTX(appctx)->buffer,
- &SPOE_APPCTX(appctx)->buffer_wait);
agent->sending_rate++;
fpa++;
break;
@@ -1572,8 +1623,6 @@
break;
default:
- spoe_release_buffer(&SPOE_APPCTX(appctx)->buffer,
- &SPOE_APPCTX(appctx)->buffer_wait);
agent->sending_rate++;
fpa++;
break;
@@ -2571,7 +2620,7 @@
spoe_reset_context(struct spoe_context *ctx)
{
ctx->state = SPOE_CTX_ST_READY;
- ctx->flags &= ~SPOE_CTX_FL_PROCESS;
+ ctx->flags &= ~(SPOE_CTX_FL_PROCESS|SPOE_CTX_FL_FRAGMENTED);
}