MINOR: listener: make accept_queue index atomic
There has always been a race when checking the length of an accept queue
to determine which one is more loaded that another, because the head and
tail are read at two different moments. This is not required, we can merge
them as two 16 bit numbers inside a single 32-bit index that is always
accessed atomically. This way we read both values at once and always have
a consistent measurement.
diff --git a/include/haproxy/listener-t.h b/include/haproxy/listener-t.h
index f45c990..1ee17f6 100644
--- a/include/haproxy/listener-t.h
+++ b/include/haproxy/listener-t.h
@@ -289,9 +289,9 @@
/* The per-thread accept queue ring, must be a power of two minus 1 */
#define ACCEPT_QUEUE_SIZE ((1<<10) - 1)
+/* head and tail are both 16 bits so that idx can be accessed atomically */
struct accept_queue_ring {
- unsigned int head;
- unsigned int tail;
+ uint32_t idx; /* (head << 16) | tail */
struct tasklet *tasklet; /* tasklet of the thread owning this ring */
struct connection *entry[ACCEPT_QUEUE_SIZE] __attribute((aligned(64)));
};
diff --git a/include/haproxy/listener.h b/include/haproxy/listener.h
index f759952..afb4530 100644
--- a/include/haproxy/listener.h
+++ b/include/haproxy/listener.h
@@ -212,6 +212,19 @@
extern const char* li_status_st[LI_STATE_COUNT];
enum li_status get_li_status(struct listener *l);
+static inline uint accept_queue_ring_len(const struct accept_queue_ring *ring)
+{
+ uint idx, head, tail, len;
+
+ idx = _HA_ATOMIC_LOAD(&ring->idx); /* (head << 16) + tail */
+ head = idx >> 16;
+ tail = idx & 0xffff;
+ len = tail + ACCEPT_QUEUE_SIZE - head;
+ if (len >= ACCEPT_QUEUE_SIZE)
+ len -= ACCEPT_QUEUE_SIZE;
+ return len;
+}
+
#endif /* _HAPROXY_LISTENER_H */
/*
diff --git a/src/activity.c b/src/activity.c
index c47bd84..7664d0c 100644
--- a/src/activity.c
+++ b/src/activity.c
@@ -1115,7 +1115,7 @@
chunk_appendf(&trash, "accq_pushed:"); SHOW_TOT(thr, activity[thr].accq_pushed);
chunk_appendf(&trash, "accq_full:"); SHOW_TOT(thr, activity[thr].accq_full);
#ifdef USE_THREAD
- chunk_appendf(&trash, "accq_ring:"); SHOW_TOT(thr, (accept_queue_rings[thr].tail - accept_queue_rings[thr].head + ACCEPT_QUEUE_SIZE) % ACCEPT_QUEUE_SIZE);
+ chunk_appendf(&trash, "accq_ring:"); SHOW_TOT(thr, accept_queue_ring_len(&accept_queue_rings[thr]));
chunk_appendf(&trash, "fd_takeover:"); SHOW_TOT(thr, activity[thr].fd_takeover);
#endif
diff --git a/src/listener.c b/src/listener.c
index eb872e6..8215cec 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -68,10 +68,10 @@
unsigned int pos, next;
struct connection *ptr;
struct connection **e;
+ uint32_t idx = _HA_ATOMIC_LOAD(&ring->idx); /* (head << 16) + tail */
- pos = ring->head;
-
- if (pos == ring->tail)
+ pos = idx >> 16;
+ if (pos == (uint16_t)idx)
return NULL;
next = pos + 1;
@@ -93,7 +93,10 @@
*e = NULL;
__ha_barrier_store();
- ring->head = next;
+ do {
+ pos = (next << 16) | (idx & 0xffff);
+ } while (unlikely(!HA_ATOMIC_CAS(&ring->idx, &idx, pos) && __ha_cpu_relax()));
+
return ptr;
}
@@ -105,15 +108,17 @@
int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn)
{
unsigned int pos, next;
+ uint32_t idx = _HA_ATOMIC_LOAD(&ring->idx); /* (head << 16) + tail */
- pos = ring->tail;
do {
+ pos = (uint16_t)idx;
next = pos + 1;
if (next >= ACCEPT_QUEUE_SIZE)
next = 0;
- if (next == ring->head)
+ if (next == (idx >> 16))
return 0; // ring full
- } while (unlikely(!_HA_ATOMIC_CAS(&ring->tail, &pos, next)));
+ next |= (idx & 0xffff0000U);
+ } while (unlikely(!_HA_ATOMIC_CAS(&ring->idx, &idx, next) && __ha_cpu_relax()));
ring->entry[pos] = conn;
__ha_barrier_store();
@@ -1230,13 +1235,8 @@
}
/* now we have two distinct thread IDs belonging to the mask */
- q1 = accept_queue_rings[base + t1].tail - accept_queue_rings[base + t1].head + ACCEPT_QUEUE_SIZE;
- if (q1 >= ACCEPT_QUEUE_SIZE)
- q1 -= ACCEPT_QUEUE_SIZE;
-
- q2 = accept_queue_rings[base + t2].tail - accept_queue_rings[base + t2].head + ACCEPT_QUEUE_SIZE;
- if (q2 >= ACCEPT_QUEUE_SIZE)
- q2 -= ACCEPT_QUEUE_SIZE;
+ q1 = accept_queue_ring_len(&accept_queue_rings[base + t1]);
+ q2 = accept_queue_ring_len(&accept_queue_rings[base + t2]);
/* we have 3 possibilities now :
* q1 < q2 : t1 is less loaded than t2, so we pick it