MAJOR: threads/buffer: Make buffer wait queue thread safe
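Add a global spinlock, buffer_wq_lock, to protect the buffer wait queue
(buffer_wq). The lock is initialized next to the buffer pool setup in
src/buffer.c and is taken around every insertion into and removal from the
queue in src/flt_spoe.c and src/stream.c.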
diff --git a/src/buffer.c b/src/buffer.c
index 83e4e9e..e892d1e 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -33,6 +33,9 @@
/* list of objects waiting for at least one buffer */
struct list buffer_wq = LIST_HEAD_INIT(buffer_wq);
+#ifdef USE_THREAD
+HA_SPINLOCK_T buffer_wq_lock;
+#endif
/* this buffer is always the same size as standard buffers and is used for
* swapping data inside a buffer.
@@ -72,6 +75,8 @@
if (global.tune.buf_limit)
pool2_buffer->limit = global.tune.buf_limit;
+ SPIN_INIT(&buffer_wq_lock);
+
buffer = pool_refill_alloc(pool2_buffer, pool2_buffer->minavail - 1);
if (!buffer)
return 0;
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index aa3f37a..7fc4ed8 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -18,6 +18,7 @@
#include <common/debug.h>
#include <common/memory.h>
#include <common/time.h>
+#include <common/hathreads.h>
#include <types/arg.h>
#include <types/global.h>
@@ -2685,14 +2686,18 @@
return 1;
if (!LIST_ISEMPTY(&buffer_wait->list)) {
+ SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_DEL(&buffer_wait->list);
LIST_INIT(&buffer_wait->list);
+ SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
}
if (b_alloc_margin(buf, global.tune.reserved_bufs))
return 1;
+ SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_ADDQ(&buffer_wq, &buffer_wait->list);
+ SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
return 0;
}
@@ -2700,8 +2705,10 @@
spoe_release_buffer(struct buffer **buf, struct buffer_wait *buffer_wait)
{
if (!LIST_ISEMPTY(&buffer_wait->list)) {
+ SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_DEL(&buffer_wait->list);
LIST_INIT(&buffer_wait->list);
+ SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
}
/* Release the buffer if needed */
diff --git a/src/stream.c b/src/stream.c
index 8975638..51d2354 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -320,8 +320,10 @@
/* We may still be present in the buffer wait queue */
if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
+ SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock);
LIST_DEL(&s->buffer_wait.list);
LIST_INIT(&s->buffer_wait.list);
+ SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock);
}
if (s->req.buf->size || s->res.buf->size) {
b_drop(&s->req.buf);
@@ -415,14 +417,18 @@
static int stream_alloc_work_buffer(struct stream *s)
{
if (!LIST_ISEMPTY(&s->buffer_wait.list)) {
+ SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); /* take the global wait-queue lock before unlinking this stream's wait entry */
LIST_DEL(&s->buffer_wait.list);
LIST_INIT(&s->buffer_wait.list);
+ SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); /* NOTE(review): the LIST_ISEMPTY() test above runs before the lock is taken - confirm no other thread can unlink this entry concurrently */
}
if (b_alloc_margin(&s->res.buf, 0))
return 1;
+ SPIN_LOCK(BUF_WQ_LOCK, &buffer_wq_lock); /* allocation failed: lock the queue while this stream is linked into buffer_wq */
LIST_ADDQ(&buffer_wq, &s->buffer_wait.list);
+ SPIN_UNLOCK(BUF_WQ_LOCK, &buffer_wq_lock); /* lock covers only the list insertion; the function then returns 0 */
return 0;
}