MEDIUM: threads/compression: Make HTTP compression thread-safe
diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index df5b9ed..6cb28fc 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -164,6 +164,7 @@
PATEXP_LOCK,
PATLRU_LOCK,
VARS_LOCK,
+ COMP_POOL_LOCK,
LOCK_LABELS
};
struct lock_stat {
@@ -251,7 +252,7 @@
"LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
"UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS",
"APPLETS", "PEER", "BUF_WQ", "STREAMS", "SSL", "SSL_GEN_CERTS",
- "PATREF", "PATEXP", "PATLRU", "VARS" };
+ "PATREF", "PATEXP", "PATLRU", "VARS", "COMP_POOL" };
int lbl;
for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
diff --git a/src/compression.c b/src/compression.c
index ecd2fa5..ead9934 100644
--- a/src/compression.c
+++ b/src/compression.c
@@ -29,6 +29,7 @@
#include <common/cfgparse.h>
#include <common/compat.h>
#include <common/memory.h>
+#include <common/hathreads.h>
#include <types/global.h>
#include <types/compression.h>
@@ -40,6 +41,10 @@
#include <proto/stream.h>
+#ifdef USE_THREAD
+static HA_SPINLOCK_T comp_pool_lock;
+#endif
+
#ifdef USE_ZLIB
static void *alloc_zlib(void *opaque, unsigned int items, unsigned int size);
@@ -154,8 +159,12 @@
return -1;
#endif
- if (unlikely(pool_comp_ctx == NULL))
- pool_comp_ctx = create_pool("comp_ctx", sizeof(struct comp_ctx), MEM_F_SHARED);
+ if (unlikely(pool_comp_ctx == NULL)) {
+ SPIN_LOCK(COMP_POOL_LOCK, &comp_pool_lock);
+ if (unlikely(pool_comp_ctx == NULL))
+ pool_comp_ctx = create_pool("comp_ctx", sizeof(struct comp_ctx), MEM_F_SHARED);
+ SPIN_UNLOCK(COMP_POOL_LOCK, &comp_pool_lock);
+ }
*comp_ctx = pool_alloc2(pool_comp_ctx);
if (*comp_ctx == NULL)
@@ -165,7 +174,7 @@
(*comp_ctx)->direct_len = 0;
(*comp_ctx)->queued = NULL;
#elif defined(USE_ZLIB)
- zlib_used_memory += sizeof(struct comp_ctx);
+ HA_ATOMIC_ADD(&zlib_used_memory, sizeof(struct comp_ctx));
strm = &(*comp_ctx)->strm;
strm->zalloc = alloc_zlib;
@@ -187,7 +196,7 @@
*comp_ctx = NULL;
#ifdef USE_ZLIB
- zlib_used_memory -= sizeof(struct comp_ctx);
+ HA_ATOMIC_SUB(&zlib_used_memory, sizeof(struct comp_ctx));
#endif
return 0;
}
@@ -282,7 +291,7 @@
*/
static int rfc195x_add_data(struct comp_ctx *comp_ctx, const char *in_data, int in_len, struct buffer *out)
{
- static struct buffer *tmpbuf = &buf_empty;
+ static THREAD_LOCAL struct buffer *tmpbuf = &buf_empty;
if (in_len <= 0)
return 0;
@@ -393,7 +402,7 @@
static void *alloc_zlib(void *opaque, unsigned int items, unsigned int size)
{
struct comp_ctx *ctx = opaque;
- static char round = 0; /* order in deflateInit2 */
+ static THREAD_LOCAL char round = 0; /* order in deflateInit2 */
void *buf = NULL;
struct pool_head *pool = NULL;
@@ -402,42 +411,62 @@
switch (round) {
case 0:
- if (zlib_pool_deflate_state == NULL)
- zlib_pool_deflate_state = create_pool("zlib_state", size * items, MEM_F_SHARED);
+ if (zlib_pool_deflate_state == NULL) {
+ SPIN_LOCK(COMP_POOL_LOCK, &comp_pool_lock);
+ if (zlib_pool_deflate_state == NULL)
+ zlib_pool_deflate_state = create_pool("zlib_state", size * items, MEM_F_SHARED);
+ SPIN_UNLOCK(COMP_POOL_LOCK, &comp_pool_lock);
+ }
pool = zlib_pool_deflate_state;
ctx->zlib_deflate_state = buf = pool_alloc2(pool);
break;
case 1:
- if (zlib_pool_window == NULL)
- zlib_pool_window = create_pool("zlib_window", size * items, MEM_F_SHARED);
+ if (zlib_pool_window == NULL) {
+ SPIN_LOCK(COMP_POOL_LOCK, &comp_pool_lock);
+ if (zlib_pool_window == NULL)
+ zlib_pool_window = create_pool("zlib_window", size * items, MEM_F_SHARED);
+ SPIN_UNLOCK(COMP_POOL_LOCK, &comp_pool_lock);
+ }
pool = zlib_pool_window;
ctx->zlib_window = buf = pool_alloc2(pool);
break;
case 2:
- if (zlib_pool_prev == NULL)
- zlib_pool_prev = create_pool("zlib_prev", size * items, MEM_F_SHARED);
+ if (zlib_pool_prev == NULL) {
+ SPIN_LOCK(COMP_POOL_LOCK, &comp_pool_lock);
+ if (zlib_pool_prev == NULL)
+ zlib_pool_prev = create_pool("zlib_prev", size * items, MEM_F_SHARED);
+ SPIN_UNLOCK(COMP_POOL_LOCK, &comp_pool_lock);
+ }
pool = zlib_pool_prev;
ctx->zlib_prev = buf = pool_alloc2(pool);
break;
case 3:
- if (zlib_pool_head == NULL)
- zlib_pool_head = create_pool("zlib_head", size * items, MEM_F_SHARED);
+ if (zlib_pool_head == NULL) {
+ SPIN_LOCK(COMP_POOL_LOCK, &comp_pool_lock);
+ if (zlib_pool_head == NULL)
+ zlib_pool_head = create_pool("zlib_head", size * items, MEM_F_SHARED);
+ SPIN_UNLOCK(COMP_POOL_LOCK, &comp_pool_lock);
+ }
pool = zlib_pool_head;
ctx->zlib_head = buf = pool_alloc2(pool);
break;
case 4:
- if (zlib_pool_pending_buf == NULL)
- zlib_pool_pending_buf = create_pool("zlib_pending_buf", size * items, MEM_F_SHARED);
+ if (zlib_pool_pending_buf == NULL) {
+ SPIN_LOCK(COMP_POOL_LOCK, &comp_pool_lock);
+ if (zlib_pool_pending_buf == NULL)
+ zlib_pool_pending_buf = create_pool("zlib_pending_buf", size * items, MEM_F_SHARED);
+ SPIN_UNLOCK(COMP_POOL_LOCK, &comp_pool_lock);
+ }
pool = zlib_pool_pending_buf;
ctx->zlib_pending_buf = buf = pool_alloc2(pool);
break;
}
if (buf != NULL)
- zlib_used_memory += pool->size;
+ HA_ATOMIC_ADD(&zlib_used_memory, pool->size);
end:
@@ -468,7 +497,7 @@
pool = zlib_pool_pending_buf;
pool_free2(pool, ptr);
- zlib_used_memory -= pool->size;
+ HA_ATOMIC_SUB(&zlib_used_memory, pool->size);
}
/**************************
@@ -692,6 +721,7 @@
global.tune.maxzlibmem = DEFAULT_MAXZLIBMEM * 1024U * 1024U,
#endif
#ifdef USE_ZLIB
+ SPIN_INIT(&comp_pool_lock);
memprintf(&ptr, "Built with zlib version : " ZLIB_VERSION);
memprintf(&ptr, "%s\nRunning on zlib version : %s", ptr, zlibVersion());
#elif defined(USE_SLZ)
diff --git a/src/flt_http_comp.c b/src/flt_http_comp.c
index 7a44f2d..7cbd3a7 100644
--- a/src/flt_http_comp.c
+++ b/src/flt_http_comp.c
@@ -36,8 +36,8 @@
/* Pools used to allocate comp_state structs */
static struct pool_head *pool2_comp_state = NULL;
-static struct buffer *tmpbuf = &buf_empty;
-static struct buffer *zbuf = &buf_empty;
+static THREAD_LOCAL struct buffer *tmpbuf = &buf_empty;
+static THREAD_LOCAL struct buffer *zbuf = &buf_empty;
struct comp_state {
struct comp_ctx *comp_ctx; /* compression context */
@@ -66,9 +66,8 @@
/***********************************************************************/
static int
-comp_flt_init(struct proxy *px, struct flt_conf *fconf)
+comp_flt_init_per_thread(struct proxy *px, struct flt_conf *fconf)
{
-
if (!tmpbuf->size && b_alloc(&tmpbuf) == NULL)
return -1;
if (!zbuf->size && b_alloc(&zbuf) == NULL)
@@ -77,7 +76,7 @@
}
static void
-comp_flt_deinit(struct proxy *px, struct flt_conf *fconf)
+comp_flt_deinit_per_thread(struct proxy *px, struct flt_conf *fconf)
{
if (tmpbuf->size)
b_free(&tmpbuf);
@@ -88,6 +87,7 @@
static int
comp_start_analyze(struct stream *s, struct filter *filter, struct channel *chn)
{
+
if (filter->ctx == NULL) {
struct comp_state *st;
@@ -790,8 +790,8 @@
/***********************************************************************/
struct flt_ops comp_ops = {
- .init = comp_flt_init,
- .deinit = comp_flt_deinit,
+ .init_per_thread = comp_flt_init_per_thread,
+ .deinit_per_thread = comp_flt_deinit_per_thread,
.channel_start_analyze = comp_start_analyze,
.channel_end_analyze = comp_end_analyze,