MAJOR: fd/threads: Make the fdcache mostly lockless.
Create a local, per-thread fdcache for file descriptors that belong to only
one thread, and make the global fd cache mostly lockless, as we can get a lot
of contention on the fd cache lock.
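
The state of an fd with respect to the cache is encoded directly in the
next/prev fields of its fdcache_entry, so no flag and no global lock are
needed anymore. As a rough illustration (not part of the patch, the helper
name below is made up), the meaning of the next field can be summed up as:

    /* Sketch only: how fdtab[fd].fdcache_entry.next is interpreted. */
    static inline const char *fd_cache_next_state(int next)
    {
            if (next >= 0)
                    return "in the cache, <next> is the following fd";
            if (next == -1)
                    return "in the cache, last element of the list";
            if (next == -2)
                    return "entry locked by a concurrent add/remove";
            /* next <= -3: not in the cache; on removal the old successor is
             * stored as -(old_next + 4) so that a concurrent walker can
             * decode it back with -next - 4.
             */
            return "not in the cache";
    }
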
diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 1dabf3c..a8fdf15 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -259,7 +259,6 @@
/* WARNING!!! if you update this enum, please also keep lock_label() up to date below */
enum lock_label {
THREAD_SYNC_LOCK = 0,
- FDCACHE_LOCK,
FD_LOCK,
TASK_RQ_LOCK,
TASK_WQ_LOCK,
@@ -376,7 +375,6 @@
{
switch (label) {
case THREAD_SYNC_LOCK: return "THREAD_SYNC";
- case FDCACHE_LOCK: return "FDCACHE";
case FD_LOCK: return "FD";
case TASK_RQ_LOCK: return "TASK_RQ";
case TASK_WQ_LOCK: return "TASK_WQ";
diff --git a/include/proto/fd.h b/include/proto/fd.h
index a7e70b7..595af90 100644
--- a/include/proto/fd.h
+++ b/include/proto/fd.h
@@ -33,8 +33,9 @@
/* public variables */
-extern unsigned int *fd_cache; // FD events cache
-extern int fd_cache_num; // number of events in the cache
+extern volatile struct fdlist fd_cache;
+extern volatile struct fdlist fd_cache_local[MAX_THREADS];
+
extern unsigned long fd_cache_mask; // Mask of threads with events in the cache
extern THREAD_LOCAL int *fd_updt; // FD updates list
@@ -104,45 +105,224 @@
fd_updt[fd_nbupdt++] = fd;
}
+
+#define _GET_NEXT(fd) fdtab[fd].fdcache_entry.next
+#define _GET_PREV(fd) fdtab[fd].fdcache_entry.prev
+
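+/* Appends fd <fd> at the end of the fd cache list <list>, using only atomic
+ * operations, so no lock is needed. Does nothing if the fd is already in the
+ * cache or currently locked by another thread.
+ */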
+static inline void fd_add_to_fd_list(volatile struct fdlist *list, int fd)
+{
+ int next;
+ int new;
+ int old;
+ int last;
+
+redo_next:
+ next = _GET_NEXT(fd);
+ /*
+ * Check that we're not already in the cache, and if not, lock us.
+	 * <= -3 means not in the cache, -2 means locked, -1 means we're
+	 * in the cache and are the last element, and >= 0 gives the FD of
+	 * the next element in the cache.
+ */
+ if (next >= -2)
+ goto done;
+ if (!HA_ATOMIC_CAS(&_GET_NEXT(fd), &next, -2))
+ goto redo_next;
+ __ha_barrier_store();
+redo_last:
+ /* First, insert in the linked list */
+ last = list->last;
+ old = -1;
+ new = fd;
+ if (unlikely(last == -1)) {
+ /* list is empty, try to add ourselves alone so that list->last=fd */
+
+ _GET_PREV(fd) = last;
+
+ /* Make sure the "prev" store is visible before we update the last entry */
+ __ha_barrier_store();
+ if (unlikely(!HA_ATOMIC_CAS(&list->last, &old, new)))
+ goto redo_last;
+
+		/* list->first was necessarily -1, we're guaranteed to be alone here */
+ list->first = fd;
+
+		/* since we're alone at the end of the list and still locked (-2),
+		 * we know no one tried to add past us. Mark the end of the list.
+ */
+ _GET_NEXT(fd) = -1;
+ goto done; /* We're done ! */
+ } else {
+ /* non-empty list, add past the tail */
+ do {
+ new = fd;
+ old = -1;
+ _GET_PREV(fd) = last;
+
+ __ha_barrier_store();
+
+ /* adding ourselves past the last element
+ * The CAS will only succeed if its next is -1,
+		 * which means it's in the cache and is the last element.
+ */
+ if (likely(HA_ATOMIC_CAS(&_GET_NEXT(last), &old, new)))
+ break;
+ goto redo_last;
+ } while (1);
+ }
+ /* Then, update the last entry */
+redo_fd_cache:
+ last = list->last;
+ __ha_barrier_load();
+
+ if (unlikely(!HA_ATOMIC_CAS(&list->last, &last, fd)))
+ goto redo_fd_cache;
+ __ha_barrier_store();
+ _GET_NEXT(fd) = -1;
+ __ha_barrier_store();
+done:
+ return;
+}
/* Allocates a cache entry for a file descriptor if it does not yet have one.
* This can be done at any time.
*/
static inline void fd_alloc_cache_entry(const int fd)
{
- HA_RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
- if (fdtab[fd].cache)
- goto end;
- fd_cache_num++;
- fd_cache_mask |= fdtab[fd].thread_mask;
- fdtab[fd].cache = fd_cache_num;
- fd_cache[fd_cache_num-1] = fd;
- end:
- HA_RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
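+	/* thread_mask & (thread_mask - 1) clears the lowest set bit, so it is
+	 * zero when at most one bit is set, i.e. when the fd may only be
+	 * handled by a single thread: use that thread's local cache, otherwise
+	 * fall back to the shared one.
+	 */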
+ if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
+ fd_add_to_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd);
+ else
+ fd_add_to_fd_list(&fd_cache, fd);
+}
+
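+/* Removes fd <fd> from the fd cache list <list>, using only atomic
+ * operations, so no lock is needed. Does nothing if the fd is not in the
+ * cache. The entry and its neighbours' links are locked (set to -2) for the
+ * duration of the unlink.
+ */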
+static inline void fd_rm_from_fd_list(volatile struct fdlist *list, int fd)
+{
+#if defined(HA_HAVE_CAS_DW) || defined(HA_CAS_IS_8B)
+ volatile struct fdlist_entry cur_list, next_list;
+#endif
+ int old;
+ int new = -2;
+ volatile int prev;
+ volatile int next;
+ int last;
+
+lock_self:
+#if (defined(HA_CAS_IS_8B) || defined(HA_HAVE_CAS_DW))
+ next_list.next = next_list.prev = -2;
+ cur_list.prev = _GET_PREV(fd);
+ cur_list.next = _GET_NEXT(fd);
+ /* First, attempt to lock our own entries */
+ do {
+ /* The FD is not in the FD cache, give up */
+ if (unlikely(cur_list.next <= -3))
+ return;
+ if (unlikely(cur_list.prev == -2 || cur_list.next == -2))
+ goto lock_self;
+ } while (
+#ifdef HA_CAS_IS_8B
+ unlikely(!HA_ATOMIC_CAS(((void **)(void *)&_GET_NEXT(fd)), ((void **)(void *)&cur_list), (*(void **)(void *)&next_list))))
+#else
+ unlikely(!__ha_cas_dw((void *)&_GET_NEXT(fd), (void *)&cur_list, (void *)&next_list)))
+#endif
+ ;
+ next = cur_list.next;
+ prev = cur_list.prev;
+
+#else
+lock_self_next:
+ next = _GET_NEXT(fd);
+ if (next == -2)
+ goto lock_self_next;
+ if (next <= -3)
+ goto done;
+ if (unlikely(!HA_ATOMIC_CAS(&_GET_NEXT(fd), &next, -2)))
+ goto lock_self_next;
+lock_self_prev:
+ prev = _GET_PREV(fd);
+ if (prev == -2)
+ goto lock_self_prev;
+ if (unlikely(!HA_ATOMIC_CAS(&_GET_PREV(fd), &prev, -2)))
+ goto lock_self_prev;
+#endif
+ __ha_barrier_store();
+
+ /* Now, lock the entries of our neighbours */
+ if (likely(prev != -1)) {
+redo_prev:
+ old = fd;
+
+ if (unlikely(!HA_ATOMIC_CAS(&_GET_NEXT(prev), &old, new))) {
+ if (unlikely(old == -2)) {
+ /* Neighbour already locked, give up and
+				 * retry once it's done
+ */
+ _GET_PREV(fd) = prev;
+ __ha_barrier_store();
+ _GET_NEXT(fd) = next;
+ __ha_barrier_store();
+ goto lock_self;
+ }
+ goto redo_prev;
+ }
+ }
+ if (likely(next != -1)) {
+redo_next:
+ old = fd;
+ if (unlikely(!HA_ATOMIC_CAS(&_GET_PREV(next), &old, new))) {
+ if (unlikely(old == -2)) {
+ /* Neighbour already locked, give up and
+				 * retry once it's done
+ */
+ if (prev != -1) {
+ _GET_NEXT(prev) = fd;
+ __ha_barrier_store();
+ }
+ _GET_PREV(fd) = prev;
+ __ha_barrier_store();
+ _GET_NEXT(fd) = next;
+ __ha_barrier_store();
+ goto lock_self;
+ }
+ goto redo_next;
+ }
+ }
+ if (list->first == fd)
+ list->first = next;
+ __ha_barrier_store();
+ last = list->last;
+ while (unlikely(last == fd && (!HA_ATOMIC_CAS(&list->last, &last, prev))))
+ __ha_compiler_barrier();
+ /* Make sure we let other threads know we're no longer in cache,
+ * before releasing our neighbours.
+ */
+ __ha_barrier_store();
+ if (likely(prev != -1))
+ _GET_NEXT(prev) = next;
+ __ha_barrier_store();
+ if (likely(next != -1))
+ _GET_PREV(next) = prev;
+ __ha_barrier_store();
+ /* Ok, now we're out of the fd cache */
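+	/* Store -(next + 4), which is always <= -3, so that a concurrent walker
+	 * that already fetched this entry can decode the old successor back
+	 * with -value - 4 and keep scanning from there.
+	 */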
+ _GET_NEXT(fd) = -(next + 4);
+ __ha_barrier_store();
+done:
+ return;
}
+#undef _GET_NEXT
+#undef _GET_PREV
+
+
/* Removes entry used by fd <fd> from the FD cache and replaces it with the
- * last one. The fdtab.cache is adjusted to match the back reference if needed.
+ * last one.
* If the fd has no entry assigned, return immediately.
*/
static inline void fd_release_cache_entry(int fd)
{
- unsigned int pos;
-
- HA_RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
- pos = fdtab[fd].cache;
- if (!pos)
- goto end;
- fdtab[fd].cache = 0;
- fd_cache_num--;
- if (likely(pos <= fd_cache_num)) {
- /* was not the last entry */
- fd = fd_cache[fd_cache_num];
- fd_cache[pos - 1] = fd;
- fdtab[fd].cache = pos;
- }
- end:
- HA_RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
+ if (!(fdtab[fd].thread_mask & (fdtab[fd].thread_mask - 1)))
+ fd_rm_from_fd_list(&fd_cache_local[my_ffsl(fdtab[fd].thread_mask) - 1], fd);
+ else
+ fd_rm_from_fd_list(&fd_cache, fd);
}
/* Computes the new polled status based on the active and ready statuses, for
@@ -402,7 +582,6 @@
fdtab[fd].update_mask &= ~tid_bit;
fdtab[fd].linger_risk = 0;
fdtab[fd].cloned = 0;
- fdtab[fd].cache = 0;
fdtab[fd].thread_mask = thread_mask;
/* note: do not reset polled_mask here as it indicates which poller
* still knows this FD from a possible previous round.
diff --git a/include/types/fd.h b/include/types/fd.h
index e04ea67..8edf26b 100644
--- a/include/types/fd.h
+++ b/include/types/fd.h
@@ -90,15 +90,25 @@
*/
#define DEAD_FD_MAGIC 0xFDDEADFD
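+/* The cache position of an fd: its next/prev neighbours in an fdlist. Kept
+ * 8-byte aligned so that both fields can be locked at once with a single
+ * double-word CAS in fd_rm_from_fd_list().
+ */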
+struct fdlist_entry {
+ volatile int next;
+ volatile int prev;
+} __attribute__ ((aligned(8)));
+
+struct fdlist {
+ volatile int first;
+ volatile int last;
+} __attribute__ ((aligned(8)));
+
/* info about one given fd */
struct fdtab {
__decl_hathreads(HA_SPINLOCK_T lock);
unsigned long thread_mask; /* mask of thread IDs authorized to process the task */
unsigned long polled_mask; /* mask of thread IDs currently polling this fd */
unsigned long update_mask; /* mask of thread IDs having an update for fd */
+ struct fdlist_entry fdcache_entry; /* Entry in the fdcache */
void (*iocb)(int fd); /* I/O handler */
void *owner; /* the connection or listener associated with this fd, NULL if closed */
- unsigned int cache; /* position+1 in the FD cache. 0=not in cache. */
unsigned char state; /* FD state for read and write directions (2*3 bits) */
unsigned char ev; /* event seen in return of poll() : FD_POLL_* */
unsigned char linger_risk:1; /* 1 if we must kill lingering before closing */
diff --git a/src/cli.c b/src/cli.c
index ed8cc5b..85d3567 100644
--- a/src/cli.c
+++ b/src/cli.c
@@ -811,7 +811,7 @@
(fdt.ev & FD_POLL_IN) ? 'I' : 'i',
fdt.linger_risk ? 'L' : 'l',
fdt.cloned ? 'C' : 'c',
- fdt.cache,
+ fdt.fdcache_entry.next >= -2 ? 1 : 0,
fdt.owner,
fdt.iocb,
(fdt.iocb == conn_fd_handler) ? "conn_fd_handler" :
diff --git a/src/fd.c b/src/fd.c
index 0995040..2cd79fb 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -167,15 +167,14 @@
struct poller cur_poller;
int nbpollers = 0;
-unsigned int *fd_cache = NULL; // FD events cache
-int fd_cache_num = 0; // number of events in the cache
+volatile struct fdlist fd_cache; // FD events cache
+volatile struct fdlist fd_cache_local[MAX_THREADS]; // per-thread FD events cache
+
unsigned long fd_cache_mask = 0; // Mask of threads with events in the cache
THREAD_LOCAL int *fd_updt = NULL; // FD updates list
THREAD_LOCAL int fd_nbupdt = 0; // number of updates in the list
-__decl_hathreads(HA_RWLOCK_T fdcache_lock); /* global lock to protect fd_cache array */
-
/* Deletes an FD from the fdsets.
* The file descriptor is also closed.
*/
@@ -221,33 +220,30 @@
fd_dodelete(fd, 0);
}
-/* Scan and process the cached events. This should be called right after
- * the poller. The loop may cause new entries to be created, for example
- * if a listener causes an accept() to initiate a new incoming connection
- * wanting to attempt an recv().
- */
-void fd_process_cached_events()
+static inline void fdlist_process_cached_events(volatile struct fdlist *fdlist)
{
- int fd, entry, e;
+ int fd, old_fd, e;
- HA_RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
- fd_cache_mask &= ~tid_bit;
- for (entry = 0; entry < fd_cache_num; ) {
- fd = fd_cache[entry];
-
- if (!(fdtab[fd].thread_mask & tid_bit)) {
- activity[tid].fd_skip++;
- goto next;
- }
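+	/* The list may change while we walk it: a next value of -2 means the
+	 * link out of the previous entry is locked by a concurrent add/remove,
+	 * so re-read it; a value <= -3 means that entry was just removed and
+	 * encodes its old successor as -fd - 4.
+	 */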
+ for (old_fd = fd = fdlist->first; fd != -1; fd = fdtab[fd].fdcache_entry.next) {
+ if (fd == -2) {
+ fd = old_fd;
+ continue;
+ } else if (fd <= -3)
+ fd = -fd - 4;
+ if (fd == -1)
+ break;
+ old_fd = fd;
+ if (!(fdtab[fd].thread_mask & tid_bit))
+ continue;
+ if (fdtab[fd].fdcache_entry.next < -3)
+ continue;
- fd_cache_mask |= tid_bit;
+ HA_ATOMIC_OR(&fd_cache_mask, tid_bit);
if (HA_SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock)) {
activity[tid].fd_lock++;
- goto next;
+ continue;
}
- HA_RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
-
e = fdtab[fd].state;
fdtab[fd].ev &= FD_POLL_STICKY;
@@ -265,19 +261,19 @@
fd_release_cache_entry(fd);
HA_SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
-
- HA_RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
- /* If the fd was removed from the cache, it has been
- * replaced by the next one that we don't want to skip !
- */
- if (entry < fd_cache_num && fd_cache[entry] != fd) {
- activity[tid].fd_del++;
- continue;
- }
- next:
- entry++;
}
- HA_RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
+}
+
+/* Scan and process the cached events. This should be called right after
+ * the poller. The loop may cause new entries to be created, for example
+ * if a listener causes an accept() to initiate a new incoming connection
+ * wanting to attempt an recv().
+ */
+void fd_process_cached_events()
+{
+ HA_ATOMIC_AND(&fd_cache_mask, ~tid_bit);
+ fdlist_process_cached_events(&fd_cache_local[tid]);
+ fdlist_process_cached_events(&fd_cache);
}
/* disable the specified poller */
@@ -320,16 +316,19 @@
if ((fdinfo = calloc(global.maxsock, sizeof(struct fdinfo))) == NULL)
goto fail_info;
- if ((fd_cache = calloc(global.maxsock, sizeof(*fd_cache))) == NULL)
- goto fail_cache;
-
+ fd_cache.first = fd_cache.last = -1;
hap_register_per_thread_init(init_pollers_per_thread);
hap_register_per_thread_deinit(deinit_pollers_per_thread);
- for (p = 0; p < global.maxsock; p++)
+ for (p = 0; p < global.maxsock; p++) {
HA_SPIN_INIT(&fdtab[p].lock);
+ /* Mark the fd as out of the fd cache */
+ fdtab[p].fdcache_entry.next = -3;
+		fdtab[p].fdcache_entry.prev = -3;
+ }
+ for (p = 0; p < global.nbthread; p++)
+ fd_cache_local[p].first = fd_cache_local[p].last = -1;
- HA_RWLOCK_INIT(&fdcache_lock);
do {
bp = NULL;
for (p = 0; p < nbpollers; p++)
@@ -372,11 +371,8 @@
bp->term(bp);
}
- free(fd_cache); fd_cache = NULL;
free(fdinfo); fdinfo = NULL;
free(fdtab); fdtab = NULL;
-
- HA_RWLOCK_DESTROY(&fdcache_lock);
}
/*
diff --git a/src/stream.c b/src/stream.c
index 92f9c0a..e710d0d 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -2905,7 +2905,7 @@
conn->flags,
conn->handle.fd,
conn->handle.fd >= 0 ? fdtab[conn->handle.fd].state : 0,
- conn->handle.fd >= 0 ? fdtab[conn->handle.fd].cache : 0,
+ conn->handle.fd >= 0 ? fdtab[conn->handle.fd].fdcache_entry.next >= -2 : 0,
conn->handle.fd >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0,
conn->handle.fd >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
}
@@ -2938,7 +2938,7 @@
conn->flags,
conn->handle.fd,
conn->handle.fd >= 0 ? fdtab[conn->handle.fd].state : 0,
- conn->handle.fd >= 0 ? fdtab[conn->handle.fd].cache : 0,
+ conn->handle.fd >= 0 ? fdtab[conn->handle.fd].fdcache_entry.next >= -2 : 0,
conn->handle.fd >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0,
conn->handle.fd >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
}