MINOR: cache: Add entry to the tree as soon as possible

When many concurrent requests targeting the same resource were seen, the
cache could sometimes be filled with so many partial responses that none
of them could be cached in full. This happened because the actual tree
insertion was only performed once the whole payload of a response had
been seen. Until then, every response was added to the cache because
none of the streams knew that a similar request/response was already
being processed.
This patch inserts the cache_entry into the tree as soon as possible
(right after the first packet) so that other responses with the same
primary key do not get cached as well.
A "complete" flag is also added to the cache_entry so that we know if
all the payload is already stored in the entry or if it is still being
processed.
diff --git a/src/cache.c b/src/cache.c
index 1b86ccd..420a009 100644
--- a/src/cache.c
+++ b/src/cache.c
@@ -105,6 +105,7 @@
 };
 
 struct cache_entry {
+	unsigned int complete;    /* An entry is not valid until complete is non-zero. */
 	unsigned int latest_validation;     /* latest validation date */
 	unsigned int expire;      /* expiration date */
 	unsigned int age;         /* Origin server "Age" header value */
@@ -478,13 +479,9 @@
 
 		object = (struct cache_entry *)st->first_block->data;
 
-		/* does not need to test if the insertion worked, if it
-		 * doesn't, the blocks will be reused anyway */
-
 		shctx_lock(shctx);
-		if (eb32_insert(&cache->entries, &object->eb) != &object->eb) {
-			object->eb.key = 0;
-		}
+		/* The whole payload was cached, the entry can now be used. */
+		object->complete = 1;
 		/* remove from the hotlist */
 		shctx_row_dec_hot(shctx, st->first_block);
 		shctx_unlock(shctx);
@@ -700,7 +697,6 @@
 enum act_return http_action_store_cache(struct act_rule *rule, struct proxy *px,
 					struct session *sess, struct stream *s, int flags)
 {
-	unsigned int age;
 	long long hdr_age;
 	struct http_txn *txn = s->txn;
 	struct http_msg *msg = &txn->rsp;
@@ -716,10 +712,7 @@
 	struct http_hdr_ctx ctx;
 	size_t hdrs_len = 0;
 	int32_t pos;
-	unsigned int etag_length = 0;
-	unsigned int etag_offset = 0;
 	struct ist header_name = IST_NULL;
-	time_t last_modified = 0;
 	unsigned int vary_signature = 0;
 
 	/* Don't cache if the response came from a cache */
@@ -771,6 +764,8 @@
 	if (cache->vary_processing_enabled) {
 		if (!http_check_vary_header(htx, &vary_signature))
 			goto out;
+		if (vary_signature)
+			http_request_reduce_secondary_key(vary_signature, txn->cache_secondary_hash);
 	}
 	else if (http_find_header(htx, ist("Vary"), &ctx, 0)) {
 		goto out;
@@ -780,21 +775,72 @@
 
 	if (!(txn->flags & TX_CACHEABLE) || !(txn->flags & TX_CACHE_COOK) || (txn->flags & TX_CACHE_IGNORE))
 		goto out;
+
+	shctx_lock(shctx);
+	old = entry_exist(cache, txn->cache_hash);
+	if (old) {
+		if (vary_signature)
+			old = secondary_entry_exist(cconf->c.cache, old,
+						    txn->cache_secondary_hash);
+		if (old) {
+			if (!old->complete) {
+				/* An entry with the same primary key is already being
+				 * created, we should not try to store the current
+				 * response because it will waste space in the cache. */
+				shctx_unlock(shctx);
+				goto out;
+			}
+			eb32_delete(&old->eb);
+			old->eb.key = 0;
+		}
+	}
+	first = shctx_row_reserve_hot(shctx, NULL, sizeof(struct cache_entry));
+	if (!first) {
+		shctx_unlock(shctx);
+		goto out;
+	}
+	/* the received memory is not initialized, we need at least to mark
+	 * the object as not indexed yet.
+	 */
+	object = (struct cache_entry *)first->data;
+	memset(object, 0, sizeof(*object));
+	object->eb.key = key;
+	object->secondary_key_signature = vary_signature;
+	/* We need to temporarily set a valid expiring time until the actual one
+	 * is set by the end of this function (in case of concurrent accesses to
+	 * the same resource). This way the second access will find an existing
+	 * but not yet usable entry in the tree and will avoid storing its data. */
+	object->expire = now.tv_sec + 2;
+
+	memcpy(object->hash, txn->cache_hash, sizeof(object->hash));
+	if (vary_signature)
+		memcpy(object->secondary_key, txn->cache_secondary_hash, HTTP_CACHE_SEC_KEY_LEN);
+
+	/* Insert the entry in the tree even if the payload is not cached yet. */
+	if (eb32_insert(&cache->entries, &object->eb) != &object->eb) {
+		object->eb.key = 0;
+		shctx_unlock(shctx);
+		goto out;
+	}
+	shctx_unlock(shctx);
+
+	/* reserve space for the cache_entry structure */
+	first->len = sizeof(struct cache_entry);
+	first->last_append = NULL;
 
-	age = 0;
 	ctx.blk = NULL;
 	if (http_find_header(htx, ist("Age"), &ctx, 0)) {
 		if (!strl2llrc(ctx.value.ptr, ctx.value.len, &hdr_age) && hdr_age > 0) {
 			if (unlikely(hdr_age > CACHE_ENTRY_MAX_AGE))
 				hdr_age = CACHE_ENTRY_MAX_AGE;
-			age = hdr_age;
+			object->age = hdr_age;
 		}
 		http_remove_header(htx, &ctx);
 	}
 
 	/* Build a last-modified time that will be stored in the cache_entry and
 	 * compared to a future If-Modified-Since client header. */
-	last_modified = get_last_modified_time(htx);
+	object->last_modified = get_last_modified_time(htx);
 
 	chunk_reset(&trash);
 	for (pos = htx_get_first(htx); pos != -1; pos = htx_get_next(htx, pos)) {
@@ -813,8 +859,8 @@
 		if (type == HTX_BLK_HDR) {
 			header_name = htx_get_blk_name(htx, blk);
 			if (isteq(header_name, ist("etag"))) {
-				etag_length = sz - istlen(header_name);
-				etag_offset = sizeof(struct cache_entry) + b_data(&trash) - sz + istlen(header_name);
+				object->etag_length = sz - istlen(header_name);
+				object->etag_offset = sizeof(struct cache_entry) + b_data(&trash) - sz + istlen(header_name);
 			}
 		}
 		if (type == HTX_BLK_EOH)
@@ -826,36 +872,17 @@
 		goto out;
 
 	shctx_lock(shctx);
-	first = shctx_row_reserve_hot(shctx, NULL, sizeof(struct cache_entry) + trash.data);
-	if (!first) {
+	if (!shctx_row_reserve_hot(shctx, first, trash.data)) {
 		shctx_unlock(shctx);
 		goto out;
 	}
 	shctx_unlock(shctx);
 
-	/* the received memory is not initialized, we need at least to mark
-	 * the object as not indexed yet.
-	 */
-	object = (struct cache_entry *)first->data;
-	object->eb.node.leaf_p = NULL;
-	object->eb.key = 0;
-	object->age = age;
-	object->last_modified = last_modified;
-	object->secondary_key_signature = vary_signature;
-
-	/* reserve space for the cache_entry structure */
-	first->len = sizeof(struct cache_entry);
-	first->last_append = NULL;
 	/* cache the headers in a http action because it allows to chose what
 	 * to cache, for example you might want to cache a response before
 	 * modifying some HTTP headers, or on the contrary after modifying
 	 * those headers.
 	 */
-
-	/* Write the ETag information in the cache_entry if needed. */
-	object->etag_length = etag_length;
-	object->etag_offset = etag_offset;
-
 	/* does not need to be locked because it's in the "hot" list,
 	 * copy the headers */
 	if (shctx_row_data_append(shctx, first, NULL, (unsigned char *)trash.area, trash.data) < 0)
@@ -865,36 +892,10 @@
 	if (cache_ctx) {
 		cache_ctx->first_block = first;
 
-		object->eb.key = key;
-
-		memcpy(object->hash, txn->cache_hash, sizeof(object->hash));
-
-		/* Add the current request's secondary key to the buffer if needed. */
-		if (vary_signature) {
-			http_request_reduce_secondary_key(vary_signature, txn->cache_secondary_hash);
-			memcpy(object->secondary_key, txn->cache_secondary_hash, HTTP_CACHE_SEC_KEY_LEN);
-		}
-
-		/* Insert the node later on caching success */
-
-		shctx_lock(shctx);
-
-		old = entry_exist(cconf->c.cache, txn->cache_hash);
-		if (old) {
-			if (vary_signature)
-				old = secondary_entry_exist(cconf->c.cache, old,
-							    txn->cache_secondary_hash);
-
-			if (old) {
-				eb32_delete(&old->eb);
-				old->eb.key = 0;
-			}
-		}
-		shctx_unlock(shctx);
-
 		/* store latest value and expiration time */
 		object->latest_validation = now.tv_sec;
-		object->expire = now.tv_sec + http_calc_maxage(s, cconf->c.cache);
+		object->expire = now.tv_sec + http_calc_maxage(s, cache);
+
 		return ACT_RET_CONT;
 	}
 
@@ -903,6 +904,8 @@
 	if (first) {
 		shctx_lock(shctx);
 		first->len = 0;
+		if (object->eb.key)
+			eb32_delete(&object->eb);
 		object->eb.key = 0;
 		shctx_row_dec_hot(shctx, first);
 		shctx_unlock(shctx);
@@ -1453,7 +1456,8 @@
 
 	shctx_lock(shctx_ptr(cache));
 	res = entry_exist(cache, s->txn->cache_hash);
-	if (res) {
+	/* We must not use an entry that is not complete. */
+	if (res && res->complete) {
 		struct appctx *appctx;
 		entry_block = block_ptr(res);
 		shctx_row_inc_hot(shctx_ptr(cache), entry_block);