BUG/MAJOR: fd/threads, task/threads: ensure all spin locks are unlocked
Calculate whether the fd or task should be locked once, before taking the
lock, and reuse that calculation when determining whether to unlock.
Fixes a race condition added in 87d54a9a for fds, and b20aa9ee for tasks,
released in 1.9-dev4. When one thread modifies thread_mask to be a single
thread for a task or fd while a second thread has locked or is waiting on a
lock for that task or fd, the second thread will not unlock it. For FDs,
this is observable when a listener is polled by multiple threads, and is
closed while those threads have events pending. For tasks, this seems
possible, where task_set_affinity is called, but I did not observe it.
This must be backported to 1.9.
diff --git a/include/proto/fd.h b/include/proto/fd.h
index 14c2220..d227ae0 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -282,6 +282,7 @@
static inline void fd_stop_recv(int fd)
{
unsigned char old, new;
+ unsigned long locked;
old = fdtab[fd].state;
do {
@@ -294,10 +295,11 @@
if ((old ^ new) & FD_EV_POLLED_R)
updt_fd_polling(fd);
- if (atleast2(fdtab[fd].thread_mask))
+ locked = atleast2(fdtab[fd].thread_mask);
+ if (locked)
HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fd_update_cache(fd); /* need an update entry to change the state */
- if (atleast2(fdtab[fd].thread_mask))
+ if (locked)
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
@@ -305,6 +307,7 @@
static inline void fd_stop_send(int fd)
{
unsigned char old, new;
+ unsigned long locked;
old = fdtab[fd].state;
do {
@@ -317,10 +320,11 @@
if ((old ^ new) & FD_EV_POLLED_W)
updt_fd_polling(fd);
- if (atleast2(fdtab[fd].thread_mask))
+ locked = atleast2(fdtab[fd].thread_mask);
+ if (locked)
HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fd_update_cache(fd); /* need an update entry to change the state */
- if (atleast2(fdtab[fd].thread_mask))
+ if (locked)
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
@@ -328,6 +332,7 @@
static inline void fd_stop_both(int fd)
{
unsigned char old, new;
+ unsigned long locked;
old = fdtab[fd].state;
do {
@@ -340,10 +345,11 @@
if ((old ^ new) & FD_EV_POLLED_RW)
updt_fd_polling(fd);
- if (atleast2(fdtab[fd].thread_mask))
+ locked = atleast2(fdtab[fd].thread_mask);
+ if (locked)
HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fd_update_cache(fd); /* need an update entry to change the state */
- if (atleast2(fdtab[fd].thread_mask))
+ if (locked)
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
@@ -351,6 +357,7 @@
static inline void fd_cant_recv(const int fd)
{
unsigned char old, new;
+ unsigned long locked;
old = fdtab[fd].state;
do {
@@ -364,23 +371,27 @@
if ((old ^ new) & FD_EV_POLLED_R)
updt_fd_polling(fd);
- if (atleast2(fdtab[fd].thread_mask))
+ locked = atleast2(fdtab[fd].thread_mask);
+ if (locked)
HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fd_update_cache(fd); /* need an update entry to change the state */
- if (atleast2(fdtab[fd].thread_mask))
+ if (locked)
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Report that FD <fd> can receive anymore without polling. */
static inline void fd_may_recv(const int fd)
{
+ unsigned long locked;
+
/* marking ready never changes polled status */
HA_ATOMIC_OR(&fdtab[fd].state, FD_EV_READY_R);
- if (atleast2(fdtab[fd].thread_mask))
+ locked = atleast2(fdtab[fd].thread_mask);
+ if (locked)
HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fd_update_cache(fd); /* need an update entry to change the state */
- if (atleast2(fdtab[fd].thread_mask))
+ if (locked)
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
@@ -392,6 +403,7 @@
static inline void fd_done_recv(const int fd)
{
unsigned char old, new;
+ unsigned long locked;
old = fdtab[fd].state;
do {
@@ -405,10 +417,11 @@
if ((old ^ new) & FD_EV_POLLED_R)
updt_fd_polling(fd);
- if (atleast2(fdtab[fd].thread_mask))
+ locked = atleast2(fdtab[fd].thread_mask);
+ if (locked)
HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fd_update_cache(fd); /* need an update entry to change the state */
- if (atleast2(fdtab[fd].thread_mask))
+ if (locked)
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
@@ -416,6 +429,7 @@
static inline void fd_cant_send(const int fd)
{
unsigned char old, new;
+ unsigned long locked;
old = fdtab[fd].state;
do {
@@ -429,23 +443,27 @@
if ((old ^ new) & FD_EV_POLLED_W)
updt_fd_polling(fd);
- if (atleast2(fdtab[fd].thread_mask))
+ locked = atleast2(fdtab[fd].thread_mask);
+ if (locked)
HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fd_update_cache(fd); /* need an update entry to change the state */
- if (atleast2(fdtab[fd].thread_mask))
+ if (locked)
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Report that FD <fd> can send anymore without polling (EAGAIN detected). */
static inline void fd_may_send(const int fd)
{
+ unsigned long locked;
+
/* marking ready never changes polled status */
HA_ATOMIC_OR(&fdtab[fd].state, FD_EV_READY_W);
- if (atleast2(fdtab[fd].thread_mask))
+ locked = atleast2(fdtab[fd].thread_mask);
+ if (locked)
HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fd_update_cache(fd); /* need an update entry to change the state */
- if (atleast2(fdtab[fd].thread_mask))
+ if (locked)
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
@@ -453,6 +471,7 @@
static inline void fd_want_recv(int fd)
{
unsigned char old, new;
+ unsigned long locked;
old = fdtab[fd].state;
do {
@@ -466,10 +485,11 @@
if ((old ^ new) & FD_EV_POLLED_R)
updt_fd_polling(fd);
- if (atleast2(fdtab[fd].thread_mask))
+ locked = atleast2(fdtab[fd].thread_mask);
+ if (locked)
HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fd_update_cache(fd); /* need an update entry to change the state */
- if (atleast2(fdtab[fd].thread_mask))
+ if (locked)
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
@@ -477,6 +497,7 @@
static inline void fd_want_send(int fd)
{
unsigned char old, new;
+ unsigned long locked;
old = fdtab[fd].state;
do {
@@ -490,10 +511,11 @@
if ((old ^ new) & FD_EV_POLLED_W)
updt_fd_polling(fd);
- if (atleast2(fdtab[fd].thread_mask))
+ locked = atleast2(fdtab[fd].thread_mask);
+ if (locked)
HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fd_update_cache(fd); /* need an update entry to change the state */
- if (atleast2(fdtab[fd].thread_mask))
+ if (locked)
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
@@ -501,11 +523,13 @@
* by the poller to set FD_POLL_* flags. */
static inline void fd_update_events(int fd, int evts)
{
- if (atleast2(fdtab[fd].thread_mask))
+ unsigned long locked = atleast2(fdtab[fd].thread_mask);
+
+ if (locked)
HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fdtab[fd].ev &= FD_POLL_STICKY;
fdtab[fd].ev |= evts;
- if (atleast2(fdtab[fd].thread_mask))
+ if (locked)
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
@@ -518,7 +542,9 @@
/* Prepares <fd> for being polled */
static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned long thread_mask)
{
- if (atleast2(thread_mask))
+ unsigned long locked = atleast2(thread_mask);
+
+ if (locked)
HA_SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fdtab[fd].owner = owner;
fdtab[fd].iocb = iocb;
@@ -529,7 +555,7 @@
/* note: do not reset polled_mask here as it indicates which poller
* still knows this FD from a possible previous round.
*/
- if (atleast2(thread_mask))
+ if (locked)
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
diff --git a/include/proto/task.h b/include/proto/task.h
index 0177c52..eddbf92 100644
--- a/include/proto/task.h
+++ b/include/proto/task.h
@@ -184,11 +184,14 @@
*/
static inline struct task *task_unlink_wq(struct task *t)
{
+ unsigned long locked;
+
if (likely(task_in_wq(t))) {
- if (atleast2(t->thread_mask))
+ locked = atleast2(t->thread_mask);
+ if (locked)
HA_SPIN_LOCK(TASK_WQ_LOCK, &wq_lock);
__task_unlink_wq(t);
- if (atleast2(t->thread_mask))
+ if (locked)
HA_SPIN_UNLOCK(TASK_WQ_LOCK, &wq_lock);
}
return t;
diff --git a/src/fd.c b/src/fd.c
index 7fdd56b..314e10f 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -411,6 +411,7 @@
static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
{
int fd, old_fd, e;
+ unsigned long locked;
for (old_fd = fd = fdlist->first; fd != -1; fd = fdtab[fd].cache.next) {
if (fd == -2) {
@@ -427,7 +428,8 @@
continue;
HA_ATOMIC_OR(&fd_cache_mask, tid_bit);
- if (atleast2(fdtab[fd].thread_mask) && HA_SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock)) {
+ locked = atleast2(fdtab[fd].thread_mask);
+ if (locked && HA_SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock)) {
activity[tid].fd_lock++;
continue;
}
@@ -442,13 +444,13 @@
fdtab[fd].ev |= FD_POLL_OUT;
if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev) {
- if (atleast2(fdtab[fd].thread_mask))
+ if (locked)
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
fdtab[fd].iocb(fd);
}
else {
fd_release_cache_entry(fd);
- if (atleast2(fdtab[fd].thread_mask))
+ if (locked)
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
}