MAJOR: fd: get rid of the DWCAS when setting the running_mask

Right now we're using a DWCAS to atomically set the running_mask while
being constrained by the thread_mask. This DWCAS is annoying because we
may seriously need it later when adding support for thread groups, for
checking that the running_mask applies to the correct group.

It turns out that the DWCAS is not strictly necessary because we never
need it to set the thread_mask based on the running_mask, only the other
way around. And in fact, the running_mask is always cleared alone, and
the thread_mask is changed alone as well. The running_mask is only
relevant to indicate a takeover when the thread_mask matches it. Any
bit set in running_mask but not present in thread_mask indicates a transition
in progress.
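
For illustration, a minimal sketch of this invariant check, using C11
atomics rather than HAProxy's _HA_ATOMIC_* macros (fd_in_transition()
is a hypothetical helper, not a function from the tree):

    #include <stdatomic.h>

    /* A bit present in running_mask but absent from thread_mask means
     * a takeover or a delete is in progress on this fd.
     */
    static int fd_in_transition(const _Atomic unsigned long *running_mask,
                                const _Atomic unsigned long *thread_mask)
    {
        unsigned long run = atomic_load(running_mask);
        unsigned long thr = atomic_load(thread_mask);

        return (run & ~thr) != 0;
    }
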
As such, it is possible to re-arrange this by using a regular CAS around a
consistency check between running_mask and thread_mask in fd_update_events(),
and by making a CAS on running_mask followed by an atomic store to thread_mask
in fd_takeover(). The only other case is fd_delete() but that one already
sets the running_mask before clearing the thread_mask, which is compatible
with the consistency check above.
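
To make the ordering concrete, here is a self-contained C11 model of
the three paths (a sketch only: takeover(), set_running() and
delete_fd() are illustrative stand-ins for fd_takeover(),
fd_update_events() and fd_delete(), and tid_bit is passed explicitly
instead of being a per-thread global):

    #include <stdatomic.h>
    #include <stdbool.h>

    static _Atomic unsigned long running_mask;
    static _Atomic unsigned long thread_mask;

    /* fd_takeover(): claim an idle fd; only works if nobody runs on it */
    static bool takeover(unsigned long tid_bit)
    {
        unsigned long old = 0;

        if (!atomic_compare_exchange_strong(&running_mask, &old, tid_bit))
            return false;                     /* someone is using the fd */
        atomic_store(&thread_mask, tid_bit);  /* ours from now on */
        atomic_fetch_and(&running_mask, ~tid_bit);
        return true;
    }

    /* fd_update_events(): set our running bit under a consistency check */
    static bool set_running(unsigned long tid_bit)
    {
        unsigned long run, thr;

        do {
            /* re-read until thread_mask covers all bits of running_mask,
             * i.e. no transition is in progress
             */
            do {
                run = atomic_load(&running_mask);
                thr = atomic_load(&thread_mask);
            } while (run & ~thr);

            if (!(thr & tid_bit))
                return false;         /* fd was taken over or deleted */
        } while (!atomic_compare_exchange_strong(&running_mask, &run,
                                                 run | tid_bit));
        return true;
    }

    /* fd_delete(): set running first so the check above keeps working */
    static void delete_fd(unsigned long tid_bit)
    {
        atomic_fetch_or(&running_mask, tid_bit);
        atomic_store(&thread_mask, 0);
        atomic_fetch_and(&running_mask, ~tid_bit);
    }

Note the ordering in takeover(): thread_mask is updated while our
running bit is still held, so a concurrent poller always observes
run & ~thr during the switch and spins until the new owner is visible.
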
This change has happily survived 10 billion takeovers on a 16-thread
machine at 800k requests/s.
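
For completeness, a toy pthread harness (hypothetical, and far smaller
than the actual test) can exercise the model above, compiled together
with the previous sketch, by making two threads alternately poll and
steal the same fd:

    #include <pthread.h>
    #include <stdio.h>

    static void *hammer(void *arg)
    {
        unsigned long tid_bit = 1UL << (long)arg;

        for (int i = 0; i < 1000000; i++) {
            if (set_running(tid_bit)) {
                /* we own the fd: process events, then drop running */
                atomic_fetch_and(&running_mask, ~tid_bit);
            } else {
                /* fd migrated away: try to steal it back */
                takeover(tid_bit);
            }
        }
        return NULL;
    }

    int main(void)
    {
        pthread_t t1, t2;

        atomic_store(&thread_mask, 1UL << 0);   /* thread 0 owns the fd */
        pthread_create(&t1, NULL, hammer, (void *)0L);
        pthread_create(&t2, NULL, hammer, (void *)1L);
        pthread_join(t1, NULL);
        pthread_join(t2, NULL);
        printf("done\n");
        return 0;
    }
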
The fd-migration doc was updated to reflect this change.

diff --git a/doc/internals/fd-migration.txt b/doc/internals/fd-migration.txt
index 5635ec8..010431d 100644
--- a/doc/internals/fd-migration.txt
+++ b/doc/internals/fd-migration.txt
@@ -103,3 +103,36 @@
- mux->wake() is called
- if the connection previously was in the list, it's reinserted under the
idle_conns_lock.
+
+
+With the DWCAS removal between running_mask & thread_mask:
+
+fd_takeover:
+ 1 if (!CAS(&running_mask, 0, tid_bit))
+ 2 return fail;
+ 3 atomic_store(&thread_mask, tid_bit);
+ 4 atomic_and(&running_mask, ~tid_bit);
+
+poller:
+ 1 do {
+ 2 /* read consistent running_mask & thread_mask */
+ 3 do {
+ 4 run = atomic_load(&running_mask);
+ 5 thr = atomic_load(&thread_mask);
+ 6 } while (run & ~thr);
+ 7
+ 8 if (!(thr & tid_bit)) {
+ 9 /* takeover has started */
+ 10 goto disable_fd;
+ 11 }
+ 12 } while (!CAS(&running_mask, run, run | tid_bit));
+
+fd_delete:
+ 1 atomic_or(&running_mask, tid_bit);
+ 2 atomic_store(&thread_mask, 0);
+ 3 atomic_and(&running_mask, ~tid_bit);
+
+The loop in poller:3-6 is used to make sure the thread_mask we read matches
+the last updated running_mask. If nobody can give up on fd_takeover(), it
+might even be possible to spin on thread_mask only. With this change,
+late pollers will no longer set their running bit.
diff --git a/include/haproxy/fd-t.h b/include/haproxy/fd-t.h
index 1e31825..433a3f3 100644
--- a/include/haproxy/fd-t.h
+++ b/include/haproxy/fd-t.h
@@ -151,10 +151,6 @@
/* info about one given fd. Note: only align on cache lines when using threads;
* 32-bit small archs can put everything in 32-bytes when threads are disabled.
- *
- * NOTE: DO NOT REORDER THIS STRUCT AT ALL! Some code parts rely on exact field
- * ordering, for example fd_takeover() and fd_set_running() want running_mask
- * immediately followed by thread_mask to perform a double-word-CAS on them.
*/
struct fdtab {
unsigned long running_mask; /* mask of thread IDs currently using the fd */
diff --git a/src/fd.c b/src/fd.c
index 3ac7e55..138a8fd 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -372,49 +372,36 @@
*/
int fd_takeover(int fd, void *expected_owner)
{
- int ret = -1;
-
-#ifndef HA_HAVE_CAS_DW
- if (_HA_ATOMIC_OR_FETCH(&fdtab[fd].running_mask, tid_bit) == tid_bit) {
- HA_RWLOCK_WRLOCK(OTHER_LOCK, &fd_mig_lock);
- if (fdtab[fd].owner == expected_owner) {
- fdtab[fd].thread_mask = tid_bit;
- ret = 0;
- }
- HA_RWLOCK_WRUNLOCK(OTHER_LOCK, &fd_mig_lock);
- }
-#else
- unsigned long old_masks[2];
- unsigned long new_masks[2];
-
- new_masks[0] = new_masks[1] = tid_bit;
-
- old_masks[0] = _HA_ATOMIC_OR_FETCH(&fdtab[fd].running_mask, tid_bit);
- old_masks[1] = fdtab[fd].thread_mask;
+ unsigned long old;
/* protect ourself against a delete then an insert for the same fd,
* if it happens, then the owner will no longer be the expected
* connection.
*/
- if (fdtab[fd].owner == expected_owner) {
- while (old_masks[0] == tid_bit && old_masks[1]) {
- if (_HA_ATOMIC_DWCAS(&fdtab[fd].running_mask, &old_masks, &new_masks)) {
- ret = 0;
- break;
- }
- }
- }
-#endif /* HW_HAVE_CAS_DW */
+ if (fdtab[fd].owner != expected_owner)
+ return -1;
- _HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit);
+ /* we must be alone to work on this idle FD. If not, it means that its
+ * poller is currently waking up and is about to use it, likely to
+ * close it on shut/error, but maybe also to process any unexpectedly
+ * pending data.
+ */
+ old = 0;
+ if (!HA_ATOMIC_CAS(&fdtab[fd].running_mask, &old, tid_bit))
+ return -1;
+
+ /* success, from now on it's ours */
+ HA_ATOMIC_STORE(&fdtab[fd].thread_mask, tid_bit);
/* Make sure the FD doesn't have the active bit. It is possible that
* the fd is polled by the thread that used to own it, the new thread
* is supposed to call subscribe() later, to activate polling.
*/
- if (likely(ret == 0))
- fd_stop_recv(fd);
- return ret;
+ fd_stop_recv(fd);
+
+ /* we're done with it */
+ HA_ATOMIC_AND(&fdtab[fd].running_mask, ~tid_bit);
+ return 0;
}
void updt_fd_polling(const int fd)
@@ -456,16 +443,29 @@
unsigned long locked;
uint old, new;
uint new_flags, must_stop;
+ ulong rmask, tmask;
ti->flags &= ~TI_FL_STUCK; // this thread is still running
/* do nothing if the FD was taken over under us */
- if (fd_set_running(fd) == -1) {
- activity[tid].poll_skip_fd++;
- return FD_UPDT_MIGRATED;
- }
+ do {
+ /* make sure we read a synchronous copy of rmask and tmask
+ * (tmask is only up to date if it reflects all of rmask's
+ * bits).
+ */
+ do {
+ rmask = _HA_ATOMIC_LOAD(&fdtab[fd].running_mask);
+ tmask = _HA_ATOMIC_LOAD(&fdtab[fd].thread_mask);
+ } while (rmask & ~tmask);
+
+ if (!(tmask & tid_bit)) {
+ /* a takeover has started */
+ activity[tid].poll_skip_fd++;
+ return FD_UPDT_MIGRATED;
+ }
+ } while (!HA_ATOMIC_CAS(&fdtab[fd].running_mask, &rmask, rmask | tid_bit));
- locked = (fdtab[fd].thread_mask != tid_bit);
+ locked = (tmask != tid_bit);
/* OK now we are guaranteed that our thread_mask was present and
* that we're allowed to update the FD.