MEDIUM: proto_htx/filters: Add data filtering during the forwarding

If there is data filters registered on the stream, the function
flt_http_payload() is called before forwarding any data. And the function
flt_http_end() is called when all data are forwarded. While at least one data
filter reamins registered on the stream, no fast forwarding is used.
diff --git a/src/proto_htx.c b/src/proto_htx.c
index 90e743b..05c9ea0 100644
--- a/src/proto_htx.c
+++ b/src/proto_htx.c
@@ -1167,7 +1167,7 @@
 	struct http_txn *txn = s->txn;
 	struct http_msg *msg = &txn->req;
 	struct htx *htx;
-	//int ret;
+	int ret;
 
 	DPRINTF(stderr,"[%u] %s: stream=%p b=%p, exp(r,w)=%u,%u bf=%08x bh=%lu analysers=%02x\n",
 		now_ms, __FUNCTION__,
@@ -1225,21 +1225,31 @@
 
 	if (msg->msg_state >= HTTP_MSG_DONE)
 		goto done;
-
-	/* Forward all input data. We get it by removing all outgoing data not
-	 * forwarded yet from HTX data size.
+	/* Forward input data. We get it by removing all outgoing data not
+	 * forwarded yet from HTX data size. If there are some data filters, we
+	 * let them decide the amount of data to forward.
 	 */
-	c_adv(req, htx->data - co_data(req));
+	if (HAS_REQ_DATA_FILTERS(s)) {
+		ret  = flt_http_payload(s, msg, htx->data);
+		if (ret < 0)
+			goto return_bad_req;
+		c_adv(req, ret);
+		if (htx->data != co_data(req) || htx->extra)
+			goto missing_data_or_waiting;
+	}
+	else {
+		c_adv(req, htx->data - co_data(req));
 
-	/* To let the function channel_forward work as expected we must update
-	 * the channel's buffer to pretend there is no more input data. The
-	 * right length is then restored. We must do that, because when an HTX
-	 * message is stored into a buffer, it appears as full.
-	 */
-	b_set_data(&req->buf, co_data(req));
-	if (msg->flags & HTTP_MSGF_XFER_LEN)
-		htx->extra -= channel_forward(req, htx->extra);
-	b_set_data(&req->buf, b_size(&req->buf));
+		/* To let the function channel_forward work as expected we must update
+		 * the channel's buffer to pretend there is no more input data. The
+		 * right length is then restored. We must do that, because when an HTX
+		 * message is stored into a buffer, it appears as full.
+		 */
+		b_set_data(&req->buf, co_data(req));
+		if (msg->flags & HTTP_MSGF_XFER_LEN)
+			htx->extra -= channel_forward(req, htx->extra);
+		b_set_data(&req->buf, b_size(&req->buf));
+	}
 
 	/* Check if the end-of-message is reached and if so, switch the message
 	 * in HTTP_MSG_DONE state.
@@ -1255,6 +1265,15 @@
 	if ((txn->flags & TX_CON_WANT_MSK) != TX_CON_WANT_TUN)
 		channel_dont_close(req);
 
+	if (HAS_REQ_DATA_FILTERS(s)) {
+		ret = flt_http_end(s, msg);
+		if (ret <= 0) {
+			if (!ret)
+				goto missing_data_or_waiting;
+			goto return_bad_req;
+		}
+	}
+
 	htx_end_request(s);
 	if (!(req->analysers & an_bit)) {
 		htx_end_response(s);
@@ -1647,7 +1666,7 @@
 	 */
 	if (txn->status < 200 &&
 	    (txn->status == 100 || txn->status >= 102)) {
-		//FLT_STRM_CB(s, flt_htx_reset(s, http, htx));
+		FLT_STRM_CB(s, flt_http_reset(s, msg));
 		c_adv(rep, htx->data);
 		msg->msg_state = HTTP_MSG_RPBEFORE;
 		txn->status = 0;
@@ -2106,7 +2125,7 @@
 	struct http_txn *txn = s->txn;
 	struct http_msg *msg = &s->txn->rsp;
 	struct htx *htx;
-	//int ret;
+	int ret;
 
 	DPRINTF(stderr,"[%u] %s: stream=%p b=%p, exp(r,w)=%u,%u bf=%08x bh=%lu analysers=%02x\n",
 		now_ms, __FUNCTION__,
@@ -2146,24 +2165,35 @@
 	if (msg->msg_state >= HTTP_MSG_DONE)
 		goto done;
 
-	/* Forward all input data. We get it by removing all outgoing data not
-	 * forwarded yet from HTX data size.
+	/* Forward input data. We get it by removing all outgoing data not
+	 * forwarded yet from HTX data size. If there are some data filters, we
+	 * let them decide the amount of data to forward.
 	 */
-	c_adv(res, htx->data - co_data(res));
+	if (HAS_RSP_DATA_FILTERS(s)) {
+		ret  = flt_http_payload(s, msg, htx->data);
+		if (ret < 0)
+			goto return_bad_res;
+		c_adv(res, ret);
+		if (htx->data != co_data(res) || htx->extra)
+			goto missing_data_or_waiting;
+	}
+	else {
+		c_adv(res, htx->data - co_data(res));
 
-	/* To let the function channel_forward work as expected we must update
-	 * the channel's buffer to pretend there is no more input data. The
-	 * right length is then restored. We must do that, because when an HTX
-	 * message is stored into a buffer, it appears as full.
-	 */
-	b_set_data(&res->buf, co_data(res));
-	if (msg->flags & HTTP_MSGF_XFER_LEN)
-		htx->extra -= channel_forward(res, htx->extra);
-	b_set_data(&res->buf, b_size(&res->buf));
+		/* To let the function channel_forward work as expected we must update
+		 * the channel's buffer to pretend there is no more input data. The
+		 * right length is then restored. We must do that, because when an HTX
+		 * message is stored into a buffer, it appears as full.
+		 */
+		b_set_data(&res->buf, co_data(res));
+		if (msg->flags & HTTP_MSGF_XFER_LEN)
+			htx->extra -= channel_forward(res, htx->extra);
+		b_set_data(&res->buf, b_size(&res->buf));
+	}
 
 	if (!(msg->flags & HTTP_MSGF_XFER_LEN)) {
 		/* The server still sending data that should be filtered */
-		if (res->flags & CF_SHUTR || !HAS_DATA_FILTERS(s, res)) {
+		if (res->flags & CF_SHUTR || !HAS_RSP_DATA_FILTERS(s)) {
 			msg->msg_state = HTTP_MSG_TUNNEL;
 			goto done;
 		}
@@ -2181,6 +2211,15 @@
 	/* other states, DONE...TUNNEL */
 	channel_dont_close(res);
 
+	if (HAS_RSP_DATA_FILTERS(s)) {
+		ret = flt_http_end(s, msg);
+		if (ret <= 0) {
+			if (!ret)
+				goto missing_data_or_waiting;
+			goto return_bad_res;
+		}
+	}
+
 	htx_end_response(s);
 	if (!(res->analysers & an_bit)) {
 		htx_end_request(s);
@@ -2228,7 +2267,7 @@
 	 * are filters registered on the stream, we don't want to forward a
 	 * close
 	 */
-	if ((msg->flags & HTTP_MSGF_XFER_LEN) || HAS_DATA_FILTERS(s, res))
+	if ((msg->flags & HTTP_MSGF_XFER_LEN) || HAS_RSP_DATA_FILTERS(s))
 		channel_dont_close(res);
 
 	/* We know that more data are expected, but we couldn't send more that
@@ -5290,7 +5329,7 @@
 		struct channel *chn = si_ic(si);
 		struct htx *htx;
 
-		//FLT_STRM_CB(s, flt_htx_reply(s, s->txn->status, htx));
+		FLT_STRM_CB(s, flt_http_reply(s, s->txn->status, msg));
 		chn->buf.data = msg->data;
 		memcpy(chn->buf.area, msg->area, msg->data);
 		htx = htx_from_buf(&chn->buf);
@@ -5321,7 +5360,7 @@
 		struct channel *chn = &s->res;
 		struct htx *htx;
 
-		//FLT_STRM_CB(s, flt_htx_reply(s, s->txn->status, htx));
+		FLT_STRM_CB(s, flt_http_reply(s, s->txn->status, msg));
 		chn->buf.data = msg->data;
 		memcpy(chn->buf.area, msg->area, msg->data);
 		htx = htx_from_buf(&chn->buf);