MAJOR: filters: Require explicit registration to filter HTTP body and TCP data

Before, functions to filter HTTP body (and TCP data) were called from the moment
at least one filter was attached to the stream. If no filter is interested by
these data, this uselessly slows data parsing.
A good example is the HTTP compression filter. Depending of request and response
headers, the response compression can be enabled or not. So it could be really
nice to call it only when enabled.

So, now, to filter HTTP/TCP data, a filter must use the function
register_data_filter. For TCP streams, this function can be called only
once. But for HTTP streams, when needed, it must be called for each HTTP request
or HTTP response.
Only registered filters will be called during data parsing. At any time, a
filter can be unregistered by calling the function unregister_data_filter.
diff --git a/include/proto/filters.h b/include/proto/filters.h
index 4ed81a8..dd7490f 100644
--- a/include/proto/filters.h
+++ b/include/proto/filters.h
@@ -35,18 +35,27 @@
 #define FLT_NXT(flt, chn) ((flt)->next[CHN_IDX(chn)])
 #define FLT_FWD(flt, chn) ((flt)->fwd[CHN_IDX(chn)])
 
-#define HAS_FILTERS(strm) ((strm)->strm_flt.has_filters)
+#define HAS_FILTERS(strm)           ((strm)->strm_flt.flags & STRM_FLT_FL_HAS_FILTERS)
 
-#define FLT_STRM_CB_IMPL_0(strm, call)					\
+#define HAS_REQ_DATA_FILTERS(strm)  ((strm)->strm_flt.nb_req_data_filters != 0)
+#define HAS_RSP_DATA_FILTERS(strm)  ((strm)->strm_flt.nb_rsp_data_filters != 0)
+#define HAS_DATA_FILTERS(strm, chn) ((chn->flags & CF_ISRESP) ? HAS_RSP_DATA_FILTERS(strm) : HAS_REQ_DATA_FILTERS(strm))
+
+#define IS_REQ_DATA_FILTER(flt)  ((flt)->flags & FLT_FL_IS_REQ_DATA_FILTER)
+#define IS_RSP_DATA_FILTER(flt)  ((flt)->flags & FLT_FL_IS_RSP_DATA_FILTER)
+#define IS_DATA_FILTER(flt, chn) ((chn->flags & CF_ISRESP) ? IS_RSP_DATA_FILTER(flt) : IS_REQ_DATA_FILTER(flt))
+
+#define FLT_STRM_CB(strm, call)						\
 	do {								\
 		if (HAS_FILTERS(strm)) { call; }			\
 	} while (0)
-#define FLT_STRM_CB_IMPL_1(strm, call, default_ret, ...)		\
-	(HAS_FILTERS(strm) ? call : default_ret)
-#define FLT_STRM_CB_IMPL_2(strm, call, default_ret, on_error)		\
+
+#define FLT_STRM_DATA_CB_IMPL_1(strm, chn, call, default_ret)	        \
+	(HAS_DATA_FILTERS(strm, chn) ? call : default_ret)
+#define FLT_STRM_DATA_CB_IMPL_2(strm, chn, call, default_ret, on_error)	\
 	({								\
 		int _ret;						\
-		if (HAS_FILTERS(strm)) {				\
+		if (HAS_DATA_FILTERS(strm, chn)) {			\
 			_ret = call;					\
 			if (_ret < 0) { on_error; }			\
 		}							\
@@ -54,10 +63,10 @@
 			_ret = default_ret;				\
 		_ret;							\
 	})
-#define FLT_STRM_CB_IMPL_3(strm, call, default_ret, on_error, on_wait)	\
+#define FLT_STRM_DATA_CB_IMPL_3(strm, chn, call, default_ret, on_error, on_wait) \
 	({								\
 		int _ret;						\
-		if (HAS_FILTERS(strm)) {				\
+		if (HAS_DATA_FILTERS(strm, chn)) {			\
 			_ret = call;					\
 			if (_ret < 0) { on_error; }			\
 			if (!_ret)    { on_wait;  }			\
@@ -67,14 +76,14 @@
 		_ret;							\
 	})
 
-#define FLT_STRM_CB_IMPL_X(strm, call, A, B, C, CB_IMPL, ...) CB_IMPL
+#define FLT_STRM_DATA_CB_IMPL_X(strm, chn, call, A, B, C, DATA_CB_IMPL, ...) \
+	DATA_CB_IMPL
 
-#define FLT_STRM_CB(strm, call, ...)					\
-	FLT_STRM_CB_IMPL_X(strm, call, ##__VA_ARGS__,			\
-			   FLT_STRM_CB_IMPL_3(strm, call, ##__VA_ARGS__), \
-			   FLT_STRM_CB_IMPL_2(strm, call, ##__VA_ARGS__), \
-			   FLT_STRM_CB_IMPL_1(strm, call, ##__VA_ARGS__), \
-			   FLT_STRM_CB_IMPL_0(strm, call))
+#define FLT_STRM_DATA_CB(strm, chn, call, ...)				\
+	FLT_STRM_DATA_CB_IMPL_X(strm, chn, call, ##__VA_ARGS__,		\
+				FLT_STRM_DATA_CB_IMPL_3(strm, chn, call, ##__VA_ARGS__), \
+				FLT_STRM_DATA_CB_IMPL_2(strm, chn, call, ##__VA_ARGS__), \
+				FLT_STRM_DATA_CB_IMPL_1(strm, chn, call, ##__VA_ARGS__))
 
 #define CALL_FILTER_ANALYZER(analyzer, strm, chn, bit)			\
 	if (!HAS_FILTERS(strm) || analyzer((strm), (chn), bit)) ; else break
@@ -118,25 +127,41 @@
 	return &s->strm_flt;
 }
 
+/* Registers a filter to a channel. If a filter was already registered, this
+ * function do nothing. Once registered, the filter becomes a "data" filter for
+ * this channel. */
 static inline void
-flt_set_forward_data(struct filter *filter, struct channel *chn)
+register_data_filter(struct stream *s, struct channel *chn, struct filter *filter)
 {
-	filter->flags[CHN_IDX(chn)] |= FILTER_FL_FORWARD_DATA;
+	if (!IS_DATA_FILTER(filter, chn)) {
+		if (chn->flags & CF_ISRESP) {
+			filter->flags |= FLT_FL_IS_RSP_DATA_FILTER;
+			strm_flt(s)->nb_rsp_data_filters++;
+		}
+		else  {
+			filter->flags |= FLT_FL_IS_REQ_DATA_FILTER;
+			strm_flt(s)->nb_req_data_filters++;
+		}
+	}
 }
 
+/* Unregisters a "data" filter from a channel. */
 static inline void
-flt_reset_forward_data(struct filter *filter, struct channel *chn)
+unregister_data_filter(struct stream *s, struct channel *chn, struct filter *filter)
 {
-	filter->flags[CHN_IDX(chn)] &= ~FILTER_FL_FORWARD_DATA;
-}
+	if (IS_DATA_FILTER(filter, chn)) {
+		if (chn->flags & CF_ISRESP) {
+			filter->flags &= ~FLT_FL_IS_RSP_DATA_FILTER;
+			strm_flt(s)->nb_rsp_data_filters--;
 
-static inline int
-flt_want_forward_data(struct filter *filter, const struct channel *chn)
-{
-	return filter->flags[CHN_IDX(chn)] & FILTER_FL_FORWARD_DATA;
+		}
+		else  {
+			filter->flags &= ~FLT_FL_IS_REQ_DATA_FILTER;
+			strm_flt(s)->nb_req_data_filters--;
+		}
+	}
 }
 
-
 /* This function must be called when a filter alter incoming data. It updates
  * next offset value of all filter's predecessors. Do not call this function
  * when a filter change the size of incomding data leads to an undefined
diff --git a/include/types/filters.h b/include/types/filters.h
index 20b0c95..38f96d9 100644
--- a/include/types/filters.h
+++ b/include/types/filters.h
@@ -172,6 +172,15 @@
 				 unsigned int len);
 };
 
+/* Flags set on a filter instance */
+#define FLT_FL_IS_BACKEND_FILTER  0x0001 /* The filter is a backend filter */
+#define FLT_FL_IS_REQ_DATA_FILTER 0x0002 /* The filter will parse data on the request channel */
+#define FLT_FL_IS_RSP_DATA_FILTER 0x0004 /* The filter will parse data on the response channel */
+
+
+/* Flags set on the stream, common to all filters attached to its stream */
+#define STRM_FLT_FL_HAS_FILTERS          0x0001 /* The stream has at least one filter */
+
 /*
  * Structure representing the state of a filter. When attached to a proxy, only
  * <ops> and <conf> field (and optionnaly <id>) are set. All other fields are
@@ -188,8 +197,7 @@
 	struct flt_ops *ops;               /* The filter callbacks */
 	void           *conf;              /* The filter configuration */
 	void           *ctx;               /* The filter context (opaque) */
-	int             is_backend_filter; /* Flag to specify if the filter is a "backend" filter */
-	unsigned int    flags[2];          /* 0: request, 1: response */
+	unsigned short  flags;             /* FLT_FL_* */
 	unsigned int    next[2];           /* Offset, relative to buf->p, to the next byte to parse for a specific channel
 	                                    * 0: request channel, 1: response channel */
 	unsigned int    fwd[2];            /* Offset, relative to buf->p, to the next byte to forward for a specific channel
@@ -197,10 +205,18 @@
 	struct list     list;              /* Next filter for the same proxy/stream */
 };
 
+/*
+ * Structure reprensenting the "global" state of filters attached to a stream.
+ */
 struct strm_flt {
-	struct list    filters;
-	struct filter *current[2]; // 0: request, 1: response
-	int            has_filters;
+	struct list    filters;               /* List of filters attached to a stream */
+	struct filter *current[2];            /* From which filter resume processing, for a specific channel.
+	                                       * This is used for resumable callbacks only,
+	                                       * If NULL, we start from the first filter.
+	                                       * 0: request channel, 1: response channel */
+	unsigned short flags;                 /* STRM_FL_* */
+	unsigned char  nb_req_data_filters;   /* Number of data filters registerd on the request channel */
+	unsigned char  nb_rsp_data_filters;   /* Number of data filters registerd on the response channel */
 };
 
 #endif /* _TYPES_FILTERS_H */
diff --git a/src/filters.c b/src/filters.c
index 399f417..1ab2b37 100644
--- a/src/filters.c
+++ b/src/filters.c
@@ -57,23 +57,23 @@
 	do {								\
 		struct filter *filter;					\
 									\
-		if ((strm)->strm_flt.current[CHN_IDX(chn)]) {		\
-			filter = (strm)->strm_flt.current[CHN_IDX(chn)]; \
-			(strm)->strm_flt.current[CHN_IDX(chn)] = NULL;	\
+		if (strm_flt(strm)->current[CHN_IDX(chn)]) {	\
+			filter = strm_flt(strm)->current[CHN_IDX(chn)]; \
+			strm_flt(strm)->current[CHN_IDX(chn)] = NULL; \
 			goto resume_execution;				\
 		}							\
 									\
 		list_for_each_entry(filter, &strm_flt(s)->filters, list) { \
-		  resume_execution:
+		resume_execution:
 
 #define RESUME_FILTER_END					\
 		}						\
 	} while(0)
 
-#define BREAK_EXECUTION(strm, chn, label)			\
-	do {							\
-		(strm)->strm_flt.current[CHN_IDX(chn)] = filter;	\
-		goto label;					\
+#define BREAK_EXECUTION(strm, chn, label)				\
+	do {								\
+		strm_flt(strm)->current[CHN_IDX(chn)] = filter;	\
+		goto label;						\
 	} while (0)
 
 
@@ -283,8 +283,7 @@
 
 /* Attaches a filter to a stream. Returns -1 if an error occurs, 0 otherwise. */
 static int
-flt_stream_add_filter(struct stream *s, struct filter *filter,
-			  int is_backend)
+flt_stream_add_filter(struct stream *s, struct filter *filter, unsigned int flags)
 {
 	struct filter *f = pool_alloc2(pool2_filter);
 	if (!f) /* not enough memory */
@@ -293,9 +292,9 @@
 	f->id    = filter->id;
 	f->ops   = filter->ops;
 	f->conf  = filter->conf;
-	f->is_backend_filter = is_backend;
+	f->flags |= flags;
 	LIST_ADDQ(&strm_flt(s)->filters, &f->list);
-	strm_flt(s)->has_filters = 1;
+	strm_flt(s)->flags |= STRM_FLT_FL_HAS_FILTERS;
 	return 0;
 }
 
@@ -329,13 +328,13 @@
 	struct filter *filter, *back;
 
 	list_for_each_entry_safe(filter, back, &strm_flt(s)->filters, list) {
-		if (!only_backend || filter->is_backend_filter) {
+		if (!only_backend || (filter->flags & FLT_FL_IS_BACKEND_FILTER)) {
 			LIST_DEL(&filter->list);
 			pool_free2(pool2_filter, filter);
 		}
 	}
 	if (LIST_ISEMPTY(&strm_flt(s)->filters))
-		strm_flt(s)->has_filters = 0;
+		strm_flt(s)->flags &= ~STRM_FLT_FL_HAS_FILTERS;
 }
 
 /*
@@ -384,7 +383,7 @@
 		return 0;
 
 	list_for_each_entry(filter, &be->filters, list) {
-		if (flt_stream_add_filter(s, filter, 1) < 0)
+		if (flt_stream_add_filter(s, filter, FLT_FL_IS_BACKEND_FILTER) < 0)
 			return -1;
 	}
 	return 0;
@@ -410,31 +409,38 @@
 	/* Save buffer state */
 	buf_i = buf->i;
 
-	buf->i = MIN(msg->chunk_len + msg->next, buf->i);
 	list_for_each_entry(filter, &strm_flt(s)->filters, list) {
+		unsigned int *nxt;
+
+		/* Call "data" filters only */
+		if (!IS_DATA_FILTER(filter, msg->chn))
+			continue;
+
 		/* If the HTTP parser is ahead, we update the next offset of the
 		 * current filter. This happens for chunked messages, at the
 		 * begining of a new chunk. */
-		if (msg->next > FLT_NXT(filter, msg->chn))
-			FLT_NXT(filter, msg->chn) = msg->next;
-		if (filter->ops->http_data && !flt_want_forward_data(filter, msg->chn)) {
+		nxt = &FLT_NXT(filter, msg->chn);
+		if (msg->next > *nxt)
+			*nxt = msg->next;
+
+		if (filter->ops->http_data) {
 			ret = filter->ops->http_data(s, filter, msg);
-			if (ret <= 0)
+			if (ret < 0)
 				break;
 
 			/* Update the next offset of the current filter */
-			FLT_NXT(filter, msg->chn) += ret;
+			*nxt += ret;
 
 			/* And set this value as the bound for the next
 			 * filter. It will not able to parse more data than this
 			 * one. */
-			buf->i = FLT_NXT(filter, msg->chn);
+			buf->i = *nxt;
 		}
 		else {
 			/* Consume all available data and update the next offset
 			 * of the current filter. buf->i is untouched here. */
-			ret = buf->i - FLT_NXT(filter, msg->chn);
-			FLT_NXT(filter, msg->chn) = buf->i;
+			ret = MIN(msg->chunk_len + msg->next, buf->i) - *nxt;
+			*nxt += ret;
 		}
 	}
 
@@ -456,13 +462,21 @@
 flt_http_chunk_trailers(struct stream *s, struct http_msg *msg)
 {
 	struct filter *filter;
-	int ret = 1;
+	int            ret = 1;
 
 	list_for_each_entry(filter, &strm_flt(s)->filters, list) {
+		unsigned int *nxt;
+
+		/* Call "data" filters only */
+		if (!IS_DATA_FILTER(filter, msg->chn))
+			continue;
+
 		/* Be sure to set the next offset of the filter at the right
 		 * place. This is really useful when the first part of the
 		 * trailers was parsed. */
-		FLT_NXT(filter, msg->chn) = msg->next;
+		nxt = &FLT_NXT(filter, msg->chn);
+		*nxt = msg->next;
+
 		if (filter->ops->http_chunk_trailers) {
 			ret = filter->ops->http_chunk_trailers(s, filter, msg);
 			if (ret < 0)
@@ -470,7 +484,7 @@
 		}
 		/* Update the next offset of the current filter. Here all data
 		 * are always consumed. */
-		FLT_NXT(filter, msg->chn) += msg->sol;
+		*nxt += msg->sol;
 	}
 	return ret;
 }
@@ -493,7 +507,6 @@
 			if (ret <= 0)
 				BREAK_EXECUTION(s, msg->chn, end);
 		}
-		flt_reset_forward_data(filter, msg->chn);
 	} RESUME_FILTER_END;
 end:
 	return ret;
@@ -545,27 +558,35 @@
 	int            ret = len;
 
 	list_for_each_entry(filter, &strm_flt(s)->filters, list) {
+		unsigned int *nxt, *fwd;
+
+		/* Call "data" filters only */
+		if (!IS_DATA_FILTER(filter, msg->chn))
+			continue;
+
 		/* If the HTTP parser is ahead, we update the next offset of the
 		 * current filter. This happens for chunked messages, when the
 		 * chunk envelope is parsed. */
-		if (msg->next > FLT_NXT(filter, msg->chn))
-			FLT_NXT(filter, msg->chn) = msg->next;
+		nxt = &FLT_NXT(filter, msg->chn);
+		fwd = &FLT_FWD(filter, msg->chn);
+		if (msg->next > *nxt)
+			*nxt = msg->next;
+
 		if (filter->ops->http_forward_data) {
-			/*  Remove bytes that the current filter considered as
-			 *  forwarded */
-			ret = filter->ops->http_forward_data(s, filter, msg,
-							     ret - FLT_FWD(filter, msg->chn));
+			/* Remove bytes that the current filter considered as
+			 * forwarded */
+			ret = filter->ops->http_forward_data(s, filter, msg, ret - *fwd);
 			if (ret < 0)
 				goto end;
 		}
 
 		/* Adjust bytes that the current filter considers as
 		 * forwarded */
-		FLT_FWD(filter, msg->chn) += ret;
+		*fwd += ret;
 
 		/* And set this value as the bound for the next filter. It will
 		 * not able to forward more data than the current one. */
-		ret = FLT_FWD(filter, msg->chn);
+		ret = *fwd;
 	}
 
 	if (!ret)
@@ -574,6 +595,8 @@
 	/* Finally, adjust filters offsets by removing data that HAProxy will
 	 * forward. */
 	list_for_each_entry(filter, &strm_flt(s)->filters, list) {
+		if (!IS_DATA_FILTER(filter, msg->chn))
+			continue;
 		FLT_NXT(filter, msg->chn) -= ret;
 		FLT_FWD(filter, msg->chn) -= ret;
 	}
@@ -599,7 +622,7 @@
 	 * so we do not need to check the filter list's emptiness. */
 
 	RESUME_FILTER_LOOP(s, chn) {
-		if (an_bit == AN_FLT_START_BE && !filter->is_backend_filter)
+		if (an_bit == AN_FLT_START_BE && !(filter->flags & FLT_FL_IS_BACKEND_FILTER))
 			continue;
 
 		FLT_NXT(filter, chn) = 0;
@@ -649,9 +672,8 @@
 int
 flt_analyze_http_headers(struct stream *s, struct channel *chn, unsigned int an_bit)
 {
-	struct filter   *filter;
-	struct http_msg *msg;
-	int              ret = 1;
+	struct filter *filter;
+	int            ret = 1;
 
 	RESUME_FILTER_LOOP(s, chn) {
 		if (filter->ops->channel_analyze) {
@@ -665,9 +687,13 @@
 	 * headers because any filter can alter them. So the definitive size of
 	 * headers (msg->sov) is only known when all filters have been
 	 * called. */
-	msg = ((chn->flags & CF_ISRESP) ? &s->txn->rsp : &s->txn->req);
 	list_for_each_entry(filter, &strm_flt(s)->filters, list) {
-		FLT_NXT(filter, msg->chn) = msg->sov;
+		/* Handle "data" filters only */
+		if (!IS_DATA_FILTER(filter, chn))
+			continue;
+
+		FLT_NXT(filter, chn) = ((chn->flags & CF_ISRESP)
+					? s->txn->rsp.sov : s->txn->req.sov);
 	}
 
  check_result:
@@ -686,12 +712,10 @@
 {
 	int ret = 1;
 
-	/* If this function is called, this means there is at least one filter,
-	 * so we do not need to check the filter list's emptiness. */
-
 	RESUME_FILTER_LOOP(s, chn) {
 		FLT_NXT(filter, chn) = 0;
 		FLT_FWD(filter, chn) = 0;
+		unregister_data_filter(s, chn, filter);
 
 		if (filter->ops->channel_end_analyze) {
 			ret = filter->ops->channel_end_analyze(s, filter, chn);
@@ -738,7 +762,7 @@
 static int
 flt_data(struct stream *s, struct channel *chn)
 {
-	struct filter *filter = NULL;
+	struct filter *filter;
 	struct buffer *buf = chn->buf;
 	unsigned int   buf_i;
 	int            ret = 0;
@@ -747,27 +771,35 @@
 	buf_i = buf->i;
 
 	list_for_each_entry(filter, &strm_flt(s)->filters, list) {
-		if (filter->ops->tcp_data && !flt_want_forward_data(filter, chn)) {
+		unsigned int *nxt;
+
+		/* Call "data" filters only */
+		if (!IS_DATA_FILTER(filter, chn))
+			continue;
+
+		nxt = &FLT_NXT(filter, chn);
+		if (filter->ops->tcp_data) {
 			ret = filter->ops->tcp_data(s, filter, chn);
 			if (ret < 0)
 				break;
 
 			/* Increase next offset of the current filter */
-			FLT_NXT(filter, chn) += ret;
+			*nxt += ret;
 
 			/* And set this value as the bound for the next
 			 * filter. It will not able to parse more data than the
 			 * current one. */
-			buf->i = FLT_NXT(filter, chn);
+			buf->i = *nxt;
 		}
 		else {
 			/* Consume all available data */
-			FLT_NXT(filter, chn) = buf->i;
+			*nxt = buf->i;
 		}
 
 		/* Update <ret> value to be sure to have the last one when we
-		 * exit from the loop. */
-		ret = FLT_NXT(filter, chn);
+		 * exit from the loop. This value will be used to know how much
+		 * data are "forwardable" */
+		ret = *nxt;
 	}
 
 	/* Restore the original buffer state */
@@ -787,40 +819,46 @@
 static int
 flt_forward_data(struct stream *s, struct channel *chn, unsigned int len)
 {
-	struct filter *filter = NULL;
+	struct filter *filter;
 	int            ret = len;
 
 	list_for_each_entry(filter, &strm_flt(s)->filters, list) {
+		unsigned int *fwd;
+
+		/* Call "data" filters only */
+		if (!IS_DATA_FILTER(filter, chn))
+			continue;
+
+		fwd = &FLT_FWD(filter, chn);
 		if (filter->ops->tcp_forward_data) {
 			/* Remove bytes that the current filter considered as
 			 * forwarded */
-			ret = filter->ops->tcp_forward_data(s, filter, chn,
-							    ret - FLT_FWD(filter, chn));
+			ret = filter->ops->tcp_forward_data(s, filter, chn, ret - *fwd);
 			if (ret < 0)
 				goto end;
 		}
 
-		/* Adjust bytes taht the current filter considers as
+		/* Adjust bytes that the current filter considers as
 		 * forwarded */
-		FLT_FWD(filter, chn) += ret;
+		*fwd += ret;
 
 		/* And set this value as the bound for the next filter. It will
 		 * not able to forward more data than the current one. */
-		ret = FLT_FWD(filter, chn);
+		ret = *fwd;
 	}
 
 	if (!ret)
 		goto end;
 
-	/* Adjust forward counter and next offset of filters by removing data
-	 * that HAProxy will consider as forwarded. */
+	/* Finally, adjust filters offsets by removing data that HAProxy will
+	 * forward. */
 	list_for_each_entry(filter, &strm_flt(s)->filters, list) {
+		if (!IS_DATA_FILTER(filter, chn))
+			continue;
 		FLT_NXT(filter, chn) -= ret;
 		FLT_FWD(filter, chn) -= ret;
 	}
 
-	/* Consume data that all filters consider as forwarded. */
-	b_adv(chn->buf, ret);
  end:
 	return ret;
 }
@@ -838,8 +876,9 @@
 {
 	int ret = 1;
 
-	/* If this function is called, this means there is at least one filter,
-	 * so we do not need to check the filter list's emptiness. */
+	/* If there is no "data" filters, we do nothing */
+	if (!HAS_DATA_FILTERS(s, chn))
+		goto end;
 
 	/* Be sure that the output is still opened. Else we stop the data
 	 * filtering. */
@@ -857,6 +896,9 @@
 	if (ret < 0)
 		goto end;
 
+	/* Consume data that all filters consider as forwarded. */
+	b_adv(chn->buf, ret);
+
 	/* Stop waiting data if the input in closed and no data is pending or if
 	 * the output is closed. */
 	if ((chn->flags & CF_SHUTW) ||
diff --git a/src/flt_http_comp.c b/src/flt_http_comp.c
index e7d3577..12d9e33 100644
--- a/src/flt_http_comp.c
+++ b/src/flt_http_comp.c
@@ -113,8 +113,10 @@
 			select_compression_request_header(st, s, &s->txn->req);
 		else {
 			select_compression_response_header(st, s, &s->txn->rsp);
-			if (st->comp_algo)
+			if (st->comp_algo) {
 				st->sov = s->txn->rsp.sov;
+				register_data_filter(s, chn, filter);
+			}
 		}
 	}
 
@@ -155,10 +157,8 @@
 	unsigned int       len;
 	int                ret;
 
-	if (!(msg->chn->flags & CF_ISRESP) || !st->comp_algo) {
-		flt_set_forward_data(filter, msg->chn);
+	if (!(msg->chn->flags & CF_ISRESP) || !st->comp_algo)
 		return 1;
-	}
 
 	len = MIN(msg->chunk_len + msg->next, msg->chn->buf->i) - FLT_NXT(filter, msg->chn);
 	if (!len)
@@ -193,11 +193,6 @@
 	struct comp_state *st = filter->ctx;
 	int                ret;
 
-	if (!(msg->chn->flags & CF_ISRESP) || !st->comp_algo) {
-		flt_set_forward_data(filter, msg->chn);
-		return 1;
-	}
-
 	if (!st->initialized)
 		return 1;
 
@@ -223,12 +218,6 @@
 	struct comp_state *st = filter->ctx;
 	int                ret;
 
-	if (!(msg->chn->flags & CF_ISRESP) || !st->comp_algo) {
-		flt_set_forward_data(filter, msg->chn);
-		ret = len;
-		return ret;
-	}
-
 	/* To work, previous filters MUST forward all data */
 	if (FLT_FWD(filter, msg->chn) + len != FLT_NXT(filter, msg->chn)) {
 		Warning("HTTP compression failed: unexpected behavior of previous filters\n");
diff --git a/src/proto_http.c b/src/proto_http.c
index b25d18a..0e035a5 100644
--- a/src/proto_http.c
+++ b/src/proto_http.c
@@ -6800,9 +6800,9 @@
 		msg->chunk_len += len;
 		msg->body_len  += len;
 	}
-	ret = FLT_STRM_CB(s, flt_http_data(s, msg),
-			  /* default_ret */ MIN(msg->chunk_len, chn->buf->i - msg->next),
-			  /* on_error    */ goto error);
+	ret = FLT_STRM_DATA_CB(s, chn, flt_http_data(s, msg),
+			       /* default_ret */ MIN(msg->chunk_len, chn->buf->i - msg->next),
+			       /* on_error    */ goto error);
 	msg->next     += ret;
 	msg->chunk_len -= ret;
 	if (msg->chunk_len) {
@@ -6821,26 +6821,26 @@
   ending:
 	/* we may have some pending data starting at res->buf->p such as a last
 	 * chunk of data or trailers. */
-	ret = FLT_STRM_CB(s, flt_http_forward_data(s, msg, msg->next),
-			  /* default_ret */ msg->next,
-			  /* on_error    */ goto error);
+	ret = FLT_STRM_DATA_CB(s, chn, flt_http_forward_data(s, msg, msg->next),
+			       /* default_ret */ msg->next,
+			       /* on_error    */ goto error);
 	b_adv(chn->buf, ret);
 	msg->next -= ret;
 	if (msg->next)
 		goto missing_data_or_waiting;
 
-	FLT_STRM_CB(s, flt_http_end(s, msg),
-		    /* default_ret */ 1,
-		    /* on_error    */ goto error,
-		    /* on_wait     */ goto waiting);
+	FLT_STRM_DATA_CB(s, chn, flt_http_end(s, msg),
+			 /* default_ret */ 1,
+			 /* on_error    */ goto error,
+			 /* on_wait     */ goto waiting);
 	msg->msg_state = HTTP_MSG_DONE;
 	return 1;
 
   missing_data_or_waiting:
 	/* we may have some pending data starting at chn->buf->p */
-	ret = FLT_STRM_CB(s, flt_http_forward_data(s, msg, msg->next),
-			  /* default_ret */ msg->next,
-			  /* on_error    */ goto error);
+	ret = FLT_STRM_DATA_CB(s, chn, flt_http_forward_data(s, msg, msg->next),
+			       /* default_ret */ msg->next,
+			       /* on_error    */ goto error);
 	b_adv(chn->buf, ret);
 	msg->next -= ret;
 	if (!(chn->flags & CF_WROTE_DATA) || msg->sov > 0)
@@ -6866,10 +6866,10 @@
   switch_states:
 	switch (msg->msg_state) {
 		case HTTP_MSG_DATA:
-			ret = FLT_STRM_CB(s, flt_http_data(s, msg),
-					  /* default_ret */ MIN(msg->chunk_len, chn->buf->i - msg->next),
-					  /* on_error    */ goto error);
-			msg->next     += ret;
+			ret = FLT_STRM_DATA_CB(s, chn, flt_http_data(s, msg),
+					       /* default_ret */ MIN(msg->chunk_len, chn->buf->i - msg->next),
+					       /* on_error    */ goto error);
+			msg->next      += ret;
 			msg->chunk_len -= ret;
 			if (msg->chunk_len) {
 				/* input empty or output full */
@@ -6917,9 +6917,9 @@
 			ret = http_forward_trailers(msg);
 			if (ret < 0)
 				goto chunk_parsing_error;
-			FLT_STRM_CB(s, flt_http_chunk_trailers(s, msg),
-				    /* default_ret */ 1,
-				    /* on_error    */ goto error);
+			FLT_STRM_DATA_CB(s, chn, flt_http_chunk_trailers(s, msg),
+					 /* default_ret */ 1,
+					 /* on_error    */ goto error);
 			msg->next += msg->sol;
 			msg->sol   = 0;
 			if (!ret)
@@ -6938,7 +6938,7 @@
   ending:
 	/* we may have some pending data starting at res->buf->p such as a last
 	 * chunk of data or trailers. */
-	ret = FLT_STRM_CB(s, flt_http_forward_data(s, msg, msg->next),
+	ret = FLT_STRM_DATA_CB(s, chn, flt_http_forward_data(s, msg, msg->next),
 			  /* default_ret */ msg->next,
 			  /* on_error    */ goto error);
 	b_adv(chn->buf, ret);
@@ -6946,7 +6946,7 @@
 	if (msg->next)
 		goto missing_data_or_waiting;
 
-	FLT_STRM_CB(s, flt_http_end(s, msg),
+	FLT_STRM_DATA_CB(s, chn, flt_http_end(s, msg),
 		    /* default_ret */ 1,
 		    /* on_error    */ goto error,
 		    /* on_wait     */ goto waiting);
@@ -6955,7 +6955,7 @@
 
   missing_data_or_waiting:
 	/* we may have some pending data starting at chn->buf->p */
-	ret = FLT_STRM_CB(s, flt_http_forward_data(s, msg, msg->next),
+	ret = FLT_STRM_DATA_CB(s, chn, flt_http_forward_data(s, msg, msg->next),
 			  /* default_ret */ msg->next,
 			  /* on_error    */ goto error);
 	b_adv(chn->buf, ret);