MEDIUM: filters: Optimize the HTTP compression for chunk-encoded responses

Instead of compressing each chunk as it arrives, we now store the chunks in a
temporary buffer, and the compression happens during the forwarding phase. This
change speeds up the compression of chunked responses.
diff --git a/src/flt_http_comp.c b/src/flt_http_comp.c
index 3fee538..d9a1338 100644
--- a/src/flt_http_comp.c
+++ b/src/flt_http_comp.c
@@ -33,13 +33,16 @@
 struct flt_ops comp_ops;
 
 static struct buffer *tmpbuf = &buf_empty;
+static struct buffer *zbuf   = &buf_empty;
 
 struct comp_state {
 	struct comp_ctx  *comp_ctx;   /* compression context */
 	struct comp_algo *comp_algo;  /* compression algorithm if not NULL */
-	int sov;
+	int hdrs_len;
+	int tlrs_len;
 	int consumed;
 	int initialized;
+	int finished;
 };
 
 static int select_compression_request_header(struct comp_state *st,
@@ -62,14 +65,10 @@
 comp_flt_init(struct proxy *px, struct filter *filter)
 {
 
-	/* We need a compression buffer in the DATA state to put the output of
-	 * compressed data, and in CRLF state to let the TRAILERS state finish
-	 * the job of removing the trailing CRLF.
-	 */
-	if (!tmpbuf->size) {
-		if (b_alloc(&tmpbuf) == NULL)
-			return -1;
-	}
+	if (!tmpbuf->size && b_alloc(&tmpbuf) == NULL)
+		return -1;
+	if (!zbuf->size && b_alloc(&zbuf) == NULL)
+		return -1;
 	return 0;
 }
 
@@ -78,6 +77,8 @@
 {
 	if (tmpbuf->size)
 		b_free(&tmpbuf);
+	if (zbuf->size)
+		b_free(&zbuf);
 }
 
 static int
@@ -91,9 +92,11 @@
 
 		st->comp_algo   = NULL;
 		st->comp_ctx    = NULL;
-		st->sov         = 0;
+		st->hdrs_len    = 0;
+		st->tlrs_len    = 0;
 		st->consumed    = 0;
 		st->initialized = 0;
+		st->finished    = 0;
 		filter->ctx     = st;
 	}
 	return 1;
@@ -114,8 +117,8 @@
 		else {
 			select_compression_response_header(st, s, &s->txn->rsp);
 			if (st->comp_algo) {
-				st->sov = s->txn->rsp.sov;
 				register_data_filter(s, chn, filter);
+				st->hdrs_len = s->txn->rsp.sov;
 			}
 		}
 	}
@@ -164,21 +167,38 @@
 		return len;
 
 	if (!st->initialized) {
-		unsigned int fwd = flt_rsp_fwd(filter) + st->sov;
+		unsigned int fwd = flt_rsp_fwd(filter) + st->hdrs_len;
 
+		b_reset(tmpbuf);
 		b_adv(buf, fwd);
-		ret = http_compression_buffer_init(buf, tmpbuf);
+		ret = http_compression_buffer_init(buf, zbuf);
 		b_rew(buf, fwd);
 		if (ret < 0) {
 			msg->chn->flags |= CF_WAKE_WRITE;
 			return 0;
 		}
 	}
-	b_adv(buf, *nxt);
-	ret = http_compression_buffer_add_data(st, buf, tmpbuf, len);
-	b_rew(buf, *nxt);
-	if (ret < 0)
-		return ret;
+
+	if (msg->flags & HTTP_MSGF_TE_CHNK) {
+		int block = bi_contig_data(buf);
+
+		len = MIN(tmpbuf->size - buffer_len(tmpbuf), len);
+		if (len > block) {
+			memcpy(bi_end(tmpbuf), b_ptr(buf, *nxt), block);
+			memcpy(bi_end(tmpbuf)+block, buf->data, len - block);
+		}
+		else
+			memcpy(bi_end(tmpbuf), b_ptr(buf, *nxt), len);
+		tmpbuf->i += len;
+		ret        = len;
+	}
+	else {
+		b_adv(buf, *nxt);
+		ret = http_compression_buffer_add_data(st, buf, zbuf, len);
+		b_rew(buf, *nxt);
+		if (ret < 0)
+			return ret;
+	}
 
 	st->initialized = 1;
 	msg->next      += ret;
@@ -192,22 +212,20 @@
 			 struct http_msg *msg)
 {
 	struct comp_state *st = filter->ctx;
-	int                ret;
 
-	if (!st->initialized)
-		return 1;
-
-	st->consumed = msg->next - st->sov;
-	b_adv(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->sov);
-	ret = http_compression_buffer_end(st, s, &msg->chn->buf, &tmpbuf, 1);
-	if (ret < 0)
-		return ret;
+	if (!st->initialized) {
+		if (!st->finished) {
+			struct buffer *buf = msg->chn->buf;
+			unsigned int   fwd = flt_rsp_fwd(filter) + st->hdrs_len;
 
-	st->initialized = 0;
-	st->sov         = 0;
-	msg->next       = ret;
-	FLT_NXT(filter, msg->chn) = ret;
-	FLT_FWD(filter, msg->chn) = 0;
+			b_reset(tmpbuf);
+			b_adv(buf, fwd);
+			http_compression_buffer_init(buf, zbuf);
+			b_rew(buf, fwd);
+			st->initialized = 1;
+		}
+	}
+	st->tlrs_len = msg->sol;
 	return 1;
 }
 
@@ -220,29 +238,65 @@
 	int                ret;
 
 	/* To work, previous filters MUST forward all data */
-	if (FLT_FWD(filter, msg->chn) + len != FLT_NXT(filter, msg->chn)) {
+	if (flt_rsp_fwd(filter) + len != flt_rsp_nxt(filter)) {
 		Warning("HTTP compression failed: unexpected behavior of previous filters\n");
 		return -1;
 	}
 
 	if (!st->initialized) {
-		ret = len;
-		st->sov = ((st->sov > ret) ?  (st->sov-ret) : 0);
+		if (!len) {
+			/* Nothing to forward */
+			ret = len;
+		}
+		else if (st->hdrs_len > len) {
+			/* Forward part of headers */
+			ret           = len;
+			st->hdrs_len -= len;
+		}
+		else if (st->hdrs_len > 0) {
+			/* Forward remaining headers */
+			ret          = st->hdrs_len;
+			st->hdrs_len = 0;
+		}
+		else if (msg->msg_state < HTTP_MSG_TRAILERS) {
+			/* Do not forward anything for now. This only happens
+			 * with chunk-encoded responses. Waiting data are part
+			 * of the chunk envelope (the chunk size or the chunk
+			 * CRLF). These data will be skipped during the
+			 * compression. */
+			ret = 0;
+		}
+		else {
+			/* Forward trailers data */
+			ret = len;
+		}
 		return ret;
 	}
 
+	if (msg->flags & HTTP_MSGF_TE_CHNK) {
+		ret = http_compression_buffer_add_data(st, tmpbuf, zbuf, tmpbuf->i);
+		if (ret != tmpbuf->i) {
+			Warning("HTTP compression failed: Must consume %d bytes but only %d bytes consumed\n",
+				tmpbuf->i, ret);
+			return -1;
+		}
+	}
+
-	st->consumed = len - st->sov;
-	b_adv(msg->chn->buf, FLT_FWD(filter, msg->chn) + st->sov);
-	ret = http_compression_buffer_end(st, s, &msg->chn->buf, &tmpbuf,
-					  msg->msg_state == HTTP_MSG_ENDING);
+	st->consumed = len - st->hdrs_len - st->tlrs_len;
+	b_adv(msg->chn->buf, flt_rsp_fwd(filter) + st->hdrs_len);
+	ret = http_compression_buffer_end(st, s, &msg->chn->buf, &zbuf, msg->msg_state >= HTTP_MSG_TRAILERS);
+	b_rew(msg->chn->buf, flt_rsp_fwd(filter) + st->hdrs_len);
 	if (ret < 0)
 		return ret;
 
+	flt_change_forward_size(filter, msg->chn, ret - st->consumed);
+	msg->next += (ret - st->consumed);
+	ret += st->hdrs_len + st->tlrs_len;
+
 	st->initialized = 0;
-	st->sov         = 0;
-	msg->next       = ret;
-	FLT_NXT(filter, msg->chn) = ret;
-	FLT_FWD(filter, msg->chn) = 0;
+	st->finished    = (msg->msg_state >= HTTP_MSG_TRAILERS);
+	st->hdrs_len    = 0;
+	st->tlrs_len    = 0;
 	return ret;
 }
 
@@ -594,7 +648,6 @@
 		return -1; /* flush failed */
 
 #endif /* USE_ZLIB */
-
 	if (ob->i == 10) {
 		/* No data were appended, let's drop the output buffer and
 		 * keep the input buffer unchanged.
@@ -651,7 +704,7 @@
 
 		memcpy(tail, "0\r\n", 3);
 		tail += 3;
-		if (msg->msg_state == HTTP_MSG_ENDING) {
+		if (!(msg->flags & HTTP_MSGF_TE_CHNK)) {
 			memcpy(tail, "\r\n", 2);
 			tail += 2;
 		}