MEDIUM: filters: Adapt filters API to allow again TCP filtering on HTX streams
This change make the payload filtering uniform between TCP and HTTP
filters. Now, in TCP, like in HTTP, there is only one callback responsible to
forward data. Thus, old callbacks, tcp_data() and tcp_forward_data(), are
replaced by a single callback function, tcp_payload(). This new callback gets
the offset in the payload to (re)start the filtering and the maximum amount of
data it can forward. It is the filter's responsibility to be compatible with HTX
streams. If not, it must not set the flag FLT_CFG_FL_HTX.
Because of this change, nxt and fwd offsets are no longer needed. Thus they are
removed from the filter structure with their update functions,
flt_change_next_size() and flt_change_forward_size(). Moreover, the trace filter
has been updated accordingly.
This patch breaks the compatibility with the old API. Thus it should probably
not be backported. But, AFAIK, there is no TCP filter, thus the breakage is very
limited.
diff --git a/include/proto/filters.h b/include/proto/filters.h
index 1ca47c7..2ece189 100644
--- a/include/proto/filters.h
+++ b/include/proto/filters.h
@@ -45,13 +45,6 @@
#define FLT_STRM_OFF(s, chn) (strm_flt(s)->offset[CHN_IDX(chn)])
#define FLT_OFF(flt, chn) ((flt)->offset[CHN_IDX(chn)])
-#define FLT_NXT(flt, chn) ((flt)->next[CHN_IDX(chn)])
-#define FLT_FWD(flt, chn) ((flt)->fwd[CHN_IDX(chn)])
-#define flt_req_nxt(flt) ((flt)->next[0])
-#define flt_rsp_nxt(flt) ((flt)->next[1])
-#define flt_req_fwd(flt) ((flt)->fwd[0])
-#define flt_rsp_fwd(flt) ((flt)->fwd[1])
-
#define HAS_FILTERS(strm) ((strm)->strm_flt.flags & STRM_FLT_FL_HAS_FILTERS)
#define HAS_REQ_DATA_FILTERS(strm) ((strm)->strm_flt.nb_req_data_filters != 0)
@@ -174,58 +167,6 @@
}
}
-/* This function must be called when a filter alter incoming data. It updates
- * next offset value of all filter's predecessors. Do not call this function
- * when a filter change the size of incomding data leads to an undefined
- * behavior.
- *
- * This is the filter's responsiblitiy to update data itself. For now, it is
- * unclear to know how to handle data updates, so we do the minimum here. For
- * example, if you filter an HTTP message, we must update msg->next and
- * msg->chunk_len values.
- */
-static inline void
-flt_change_next_size(struct filter *filter, struct channel *chn, int len)
-{
- struct stream *s = chn_strm(chn);
- struct filter *f;
-
- list_for_each_entry(f, &strm_flt(s)->filters, list) {
- if (f == filter)
- break;
- if (IS_DATA_FILTER(filter, chn))
- FLT_NXT(f, chn) += len;
- }
-}
-
-/* This function must be called when a filter alter forwarded data. It updates
- * offset values (next and forward) of all filters. Do not call this function
- * when a filter change the size of forwarded data leads to an undefined
- * behavior.
- *
- * This is the filter's responsiblitiy to update data itself. For now, it is
- * unclear to know how to handle data updates, so we do the minimum here. For
- * example, if you filter an HTTP message, we must update msg->next and
- * msg->chunk_len values.
- */
-static inline void
-flt_change_forward_size(struct filter *filter, struct channel *chn, int len)
-{
- struct stream *s = chn_strm(chn);
- struct filter *f;
- int before = 1;
-
- list_for_each_entry(f, &strm_flt(s)->filters, list) {
- if (f == filter)
- before = 0;
- if (IS_DATA_FILTER(filter, chn)) {
- if (before)
- FLT_FWD(f, chn) += len;
- FLT_NXT(f, chn) += len;
- }
- }
-}
-
/* This function must be called when a filter alter payload data. It updates
* offsets of all previous filters and the offset of the stream. Do not call
* this function when a filter change the size of payload data leads to an
diff --git a/include/types/filters.h b/include/types/filters.h
index c91eeae..0b54951 100644
--- a/include/types/filters.h
+++ b/include/types/filters.h
@@ -138,12 +138,11 @@
* to the client (mainly, when an error or a redirect
* occur).
* Returns nothing.
- * - tcp_data : Called when unparsed data are available.
- * Returns a negative value if an error occurs, else
- * the number of consumed bytes.
- * - tcp_forward_data : Called when some data can be consumed.
+ *
+ *
+ * - tcp_payload : Called when some data can be consumed.
* Returns a negative value if an error occurs, else
- * or the number of forwarded bytes.
+ * the number of forwarded bytes.
*/
struct flt_ops {
/*
@@ -186,9 +185,8 @@
/*
* TCP callbacks
*/
- int (*tcp_data) (struct stream *s, struct filter *f, struct channel *chn);
- int (*tcp_forward_data)(struct stream *s, struct filter *f, struct channel *chn,
- unsigned int len);
+ int (*tcp_payload) (struct stream *s, struct filter *f, struct channel *chn,
+ unsigned int offset, unsigned int len);
};
/* Flags set on a filter config */
@@ -227,11 +225,8 @@
struct flt_conf *config; /* the filter's configuration */
void *ctx; /* The filter context (opaque) */
unsigned short flags; /* FLT_FL_* */
- unsigned int next[2]; /* Offset, relative to buf->p, to the next byte to parse for a specific channel
- * 0: request channel, 1: response channel */
- unsigned int fwd[2]; /* Offset, relative to buf->p, to the next byte to forward for a specific channel
+ unsigned long long offset[2]; /* Offset of input data already filtered for a specific channel
* 0: request channel, 1: response channel */
- unsigned long long offset[2];
unsigned int pre_analyzers; /* bit field indicating analyzers to pre-process */
unsigned int post_analyzers; /* bit field indicating analyzers to post-process */
struct list list; /* Next filter for the same proxy/stream */
diff --git a/src/filters.c b/src/filters.c
index c7f3ebd..090146e 100644
--- a/src/filters.c
+++ b/src/filters.c
@@ -670,9 +670,7 @@
continue;
}
- FLT_NXT(filter, chn) = 0;
- FLT_FWD(filter, chn) = 0;
-
+ FLT_OFF(filter, chn) = 0;
if (FLT_OPS(filter)->channel_start_analyze) {
DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_FLT_ANA, s);
ret = FLT_OPS(filter)->channel_start_analyze(s, filter, chn);
@@ -799,8 +797,7 @@
goto sync;
RESUME_FILTER_LOOP(s, chn) {
- FLT_NXT(filter, chn) = 0;
- FLT_FWD(filter, chn) = 0;
+ FLT_OFF(filter, chn) = 0;
unregister_data_filter(s, chn, filter);
if (FLT_OPS(filter)->channel_end_analyze) {
@@ -848,120 +845,44 @@
return ret;
}
-
-/*
- * Calls 'tcp_data' callback for all "data" filters attached to a stream. This
- * function is called when incoming data are available. It takes care to update
- * the next offset of filters and adjusts available data to be sure that a
- * filter cannot parse more data than its predecessors. A filter can choose to
- * not consume all available data. Returns -1 if an error occurs, the number of
- * consumed bytes otherwise.
- */
-static int
-flt_data(struct stream *s, struct channel *chn)
-{
- struct filter *filter;
- unsigned int buf_i;
- int delta = 0, ret = 0;
-
- /* Save buffer state */
- buf_i = ci_data(chn);
-
- list_for_each_entry(filter, &strm_flt(s)->filters, list) {
- unsigned int *nxt;
-
- /* Call "data" filters only */
- if (!IS_DATA_FILTER(filter, chn))
- continue;
-
- nxt = &FLT_NXT(filter, chn);
- if (FLT_OPS(filter)->tcp_data) {
- unsigned int i = ci_data(chn);
-
- DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
- ret = FLT_OPS(filter)->tcp_data(s, filter, chn);
- if (ret < 0)
- break;
- delta += (int)(ci_data(chn) - i);
-
- /* Increase next offset of the current filter */
- *nxt += ret;
-
- /* And set this value as the bound for the next
- * filter. It will not able to parse more data than the
- * current one. */
- b_set_data(&chn->buf, co_data(chn) + *nxt);
- }
- else {
- /* Consume all available data */
- *nxt = ci_data(chn);
- }
-
- /* Update <ret> value to be sure to have the last one when we
- * exit from the loop. This value will be used to know how much
- * data are "forwardable" */
- ret = *nxt;
- }
-
- /* Restore the original buffer state */
- b_set_data(&chn->buf, co_data(chn) + buf_i + delta);
-
- return ret;
-}
/*
- * Calls 'tcp_forward_data' callback for all "data" filters attached to a
- * stream. This function is called when some data can be forwarded. It takes
- * care to update the forward offset of filters and adjusts "forwardable" data
- * to be sure that a filter cannot forward more data than its predecessors. A
- * filter can choose to not forward all parsed data. Returns a negative value if
- * an error occurs, else the number of forwarded bytes.
+ * Calls 'tcp_payload' callback for all "data" filters attached to a
+ * stream. This function is called when some data can be forwarded in the
+ * AN_REQ_FLT_XFER_BODY and AN_RES_FLT_XFER_BODY analyzers. It takes care to
+ * update the filters and the stream offset to be sure that a filter cannot
+ * forward more data than its predecessors. A filter can choose to not forward
+ * all data. Returns a negative value if an error occurs, else the number of
+ * forwarded bytes.
*/
-static int
-flt_forward_data(struct stream *s, struct channel *chn, unsigned int len)
+int
+flt_tcp_payload(struct stream *s, struct channel *chn, unsigned int len)
{
struct filter *filter;
- int ret = len;
+ unsigned long long *strm_off = &FLT_STRM_OFF(s, chn);
+ unsigned int out = co_data(chn);
+ int ret = len - out;
+ DBG_TRACE_ENTER(STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
list_for_each_entry(filter, &strm_flt(s)->filters, list) {
- unsigned int *fwd;
-
/* Call "data" filters only */
if (!IS_DATA_FILTER(filter, chn))
continue;
+ if (FLT_OPS(filter)->tcp_payload) {
+ unsigned long long *flt_off = &FLT_OFF(filter, chn);
+ unsigned int offset = *flt_off - *strm_off;
- fwd = &FLT_FWD(filter, chn);
- if (FLT_OPS(filter)->tcp_forward_data) {
- /* Remove bytes that the current filter considered as
- * forwarded */
DBG_TRACE_DEVEL(FLT_ID(filter), STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
- ret = FLT_OPS(filter)->tcp_forward_data(s, filter, chn, ret - *fwd);
+ ret = FLT_OPS(filter)->tcp_payload(s, filter, chn, out + offset, ret - offset);
if (ret < 0)
goto end;
+ *flt_off += ret;
+ ret += offset;
}
-
- /* Adjust bytes that the current filter considers as
- * forwarded */
- *fwd += ret;
-
- /* And set this value as the bound for the next filter. It will
- * not able to forward more data than the current one. */
- ret = *fwd;
}
-
- if (!ret)
- goto end;
-
- /* Finally, adjust filters offsets by removing data that HAProxy will
- * forward. */
- list_for_each_entry(filter, &strm_flt(s)->filters, list) {
- if (!IS_DATA_FILTER(filter, chn))
- continue;
- FLT_NXT(filter, chn) -= ret;
- FLT_FWD(filter, chn) -= ret;
- }
-
+ *strm_off += ret;
end:
+ DBG_TRACE_LEAVE(STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
return ret;
}
@@ -976,12 +897,13 @@
int
flt_xfer_data(struct stream *s, struct channel *chn, unsigned int an_bit)
{
+ unsigned int len;
int ret = 1;
DBG_TRACE_ENTER(STRM_EV_STRM_ANA|STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
/* If there is no "data" filters, we do nothing */
- if (!HAS_DATA_FILTERS(s, chn) || (s->flags & SF_HTX))
+ if (!HAS_DATA_FILTERS(s, chn))
goto end;
/* Be sure that the output is still opened. Else we stop the data
@@ -990,26 +912,30 @@
((chn->flags & CF_SHUTW) && (chn->to_forward || co_data(chn))))
goto end;
- /* Let all "data" filters parsing incoming data */
- ret = flt_data(s, chn);
- if (ret < 0)
- goto end;
+ if (s->flags & SF_HTX) {
+ struct htx *htx = htxbuf(&chn->buf);
+ len = htx->data;
+ }
+ else
+ len = c_data(chn);
- /* And forward them */
- ret = flt_forward_data(s, chn, ret);
+ ret = flt_tcp_payload(s, chn, len);
if (ret < 0)
goto end;
-
- /* Consume data that all filters consider as forwarded. */
c_adv(chn, ret);
/* Stop waiting data if the input in closed and no data is pending or if
* the output is closed. */
- if ((chn->flags & CF_SHUTW) ||
- ((chn->flags & CF_SHUTR) && !ci_data(chn))) {
+ if (chn->flags & CF_SHUTW) {
ret = 1;
goto end;
}
+ if (chn->flags & CF_SHUTR) {
+ if (((s->flags & SF_HTX) && htx_is_empty(htxbuf(&chn->buf))) || c_empty(chn)) {
+ ret = 1;
+ goto end;
+ }
+ }
/* Wait for data */
DBG_TRACE_DEVEL("waiting for more data", STRM_EV_STRM_ANA|STRM_EV_TCP_ANA|STRM_EV_FLT_ANA, s);
diff --git a/src/flt_trace.c b/src/flt_trace.c
index 5ed5158..e349431 100644
--- a/src/flt_trace.c
+++ b/src/flt_trace.c
@@ -110,17 +110,17 @@
}
static void
-trace_raw_hexdump(struct buffer *buf, int len, int out)
+trace_raw_hexdump(struct buffer *buf, unsigned int offset, unsigned int len)
{
unsigned char p[len];
int block1, block2;
block1 = len;
- if (block1 > b_contig_data(buf, out))
- block1 = b_contig_data(buf, out);
+ if (block1 > b_contig_data(buf, offset))
+ block1 = b_contig_data(buf, offset);
block2 = len - block1;
- memcpy(p, b_head(buf), block1);
+ memcpy(p, b_peek(buf, offset), block1);
memcpy(p+block1, b_orig(buf), block2);
trace_hexdump(ist2(p, len));
}
@@ -153,6 +153,31 @@
}
}
+static unsigned int
+trace_get_htx_datalen(struct htx *htx, unsigned int offset, unsigned int len)
+{
+ struct htx_blk *blk;
+ uint32_t sz, data = 0;
+
+ for (blk = htx_get_first_blk(htx); blk; blk = htx_get_next_blk(htx, blk)) {
+ if (htx_get_blk_type(blk) != HTX_BLK_DATA)
+ break;
+
+ sz = htx_get_blksz(blk);
+ if (offset >= sz) {
+ offset -= sz;
+ continue;
+ }
+ data += sz - offset;
+ offset = 0;
+ if (data > len) {
+ data = len;
+ break;
+ }
+ }
+ return data;
+}
+
/***************************************************************************
* Hooks that manage the filter lifecycle (init/check/deinit)
**************************************************************************/
@@ -441,28 +466,9 @@
int ret = len;
if (ret && conf->rand_forwarding) {
- struct htx *htx = htxbuf(&msg->chn->buf);
- struct htx_blk *blk;
- uint32_t sz, data = 0;
- unsigned int off = offset;
-
- for (blk = htx_get_first_blk(htx); blk; blk = htx_get_next_blk(htx, blk)) {
- if (htx_get_blk_type(blk) != HTX_BLK_DATA)
- break;
+ unsigned int data = trace_get_htx_datalen(htxbuf(&msg->chn->buf), offset, len);
- sz = htx_get_blksz(blk);
- if (off >= sz) {
- off -= sz;
- continue;
- }
- data += sz - off;
- off = 0;
- if (data > len) {
- data = len;
- break;
- }
- }
- if (data) {
+ if (data) {
ret = random() % (ret+1);
if (!ret || ret >= data)
ret = len;
@@ -476,7 +482,7 @@
offset, len, ret);
if (conf->hexdump)
- trace_htx_hexdump(htxbuf(&msg->chn->buf), offset, len);
+ trace_htx_hexdump(htxbuf(&msg->chn->buf), offset, ret);
if (ret != len)
task_wakeup(s->task, TASK_WOKEN_MSG);
@@ -520,51 +526,51 @@
* Hooks to filter TCP data
*************************************************************************/
static int
-trace_tcp_data(struct stream *s, struct filter *filter, struct channel *chn)
+trace_tcp_payload(struct stream *s, struct filter *filter, struct channel *chn,
+ unsigned int offset, unsigned int len)
{
struct trace_config *conf = FLT_CONF(filter);
- int avail = ci_data(chn) - FLT_NXT(filter, chn);
- int ret = avail;
+ int ret = len;
- if (ret && conf->rand_parsing)
- ret = random() % (ret+1);
+ if (s->flags & SF_HTX) {
+ if (ret && conf->rand_forwarding) {
+ unsigned int data = trace_get_htx_datalen(htxbuf(&chn->buf), offset, len);
- FLT_STRM_TRACE(conf, s, "%-25s: channel=%-10s - mode=%-5s (%s) - next=%u - avail=%u - consume=%d",
- __FUNCTION__,
- channel_label(chn), proxy_mode(s), stream_pos(s),
- FLT_NXT(filter, chn), avail, ret);
+ if (data) {
+ ret = random() % (ret+1);
+ if (!ret || ret >= data)
+ ret = len;
+ }
+ }
- if (ret != avail)
- task_wakeup(s->task, TASK_WOKEN_MSG);
- return ret;
-}
+ FLT_STRM_TRACE(conf, s, "%-25s: channel=%-10s - mode=%-5s (%s) - "
+ "offset=%u - len=%u - forward=%d",
+ __FUNCTION__,
+ channel_label(chn), proxy_mode(s), stream_pos(s),
+ offset, len, ret);
-static int
-trace_tcp_forward_data(struct stream *s, struct filter *filter, struct channel *chn,
- unsigned int len)
-{
- struct trace_config *conf = FLT_CONF(filter);
- int ret = len;
+ if (conf->hexdump)
+ trace_htx_hexdump(htxbuf(&chn->buf), offset, ret);
+ }
+ else {
- if (ret && conf->rand_forwarding)
- ret = random() % (ret+1);
+ if (ret && conf->rand_forwarding)
+ ret = random() % (ret+1);
- FLT_STRM_TRACE(conf, s, "%-25s: channel=%-10s - mode=%-5s (%s) - len=%u - fwd=%u - forward=%d",
- __FUNCTION__,
- channel_label(chn), proxy_mode(s), stream_pos(s), len,
- FLT_FWD(filter, chn), ret);
+ FLT_STRM_TRACE(conf, s, "%-25s: channel=%-10s - mode=%-5s (%s) - "
+ "offset=%u - len=%u - forward=%d",
+ __FUNCTION__,
+ channel_label(chn), proxy_mode(s), stream_pos(s),
+ offset, len, ret);
- if (conf->hexdump) {
- c_adv(chn, FLT_FWD(filter, chn));
- trace_raw_hexdump(&chn->buf, ret, co_data(chn));
- c_rew(chn, FLT_FWD(filter, chn));
+ if (conf->hexdump)
+ trace_raw_hexdump(&chn->buf, offset, ret);
}
- if (ret != len)
- task_wakeup(s->task, TASK_WOKEN_MSG);
+ if (ret != len)
+ task_wakeup(s->task, TASK_WOKEN_MSG);
return ret;
}
-
/********************************************************************
* Functions that manage the filter initialization
********************************************************************/
@@ -598,8 +604,7 @@
.http_reply = trace_http_reply,
/* Filter TCP data */
- .tcp_data = trace_tcp_data,
- .tcp_forward_data = trace_tcp_forward_data,
+ .tcp_payload = trace_tcp_payload,
};
/* Return -1 on error, else 0 */