BUG/MEDIUM: port_range: Make the ring buffer lock-free.
Port range uses a ring buffer, and unfortunately, when making haproxy
multithreaded, it's been overlooked, and the ring buffer is not thread-safe.
When specifying a source range, 2 or more threads could pick the same
port, and of course only one of them could use the port, the others would
always fail the connection.
To fix this, make it a lock-free ring buffer. This is easier than usual
because we know the ring buffer can never be full.
This should be backported to 1.8 and 1.9.
diff --git a/include/proto/port_range.h b/include/proto/port_range.h
index 8c63fac..c651fa8 100644
--- a/include/proto/port_range.h
+++ b/include/proto/port_range.h
@@ -24,18 +24,24 @@
#include <types/port_range.h>
+#define GET_NEXT_OFF(range, off) ((off) == (range)->size - 1 ? 0 : (off) + 1)
+
/* return an available port from range <range>, or zero if none is left */
static inline int port_range_alloc_port(struct port_range *range)
{
int ret;
+ int get;
+ int put;
- if (!range->avail)
- return 0;
- ret = range->ports[range->get];
- range->get++;
- if (range->get >= range->size)
- range->get = 0;
- range->avail--;
+ get = _HA_ATOMIC_LOAD(&range->get);
+ do {
+ /* barrier ot make sure get is loaded before put */
+ __ha_barrier_atomic_load();
+ put = _HA_ATOMIC_LOAD(&range->put_t);
+ if (unlikely(put == get))
+ return 0;
+ ret = range->ports[get];
+ } while (!(_HA_ATOMIC_CAS(&range->get, &get, GET_NEXT_OFF(range, get))));
return ret;
}
@@ -45,14 +51,30 @@
*/
static inline void port_range_release_port(struct port_range *range, int port)
{
+ int put;
+
if (!port || !range)
return;
- range->ports[range->put] = port;
- range->avail++;
- range->put++;
- if (range->put >= range->size)
- range->put = 0;
+ put = range->put_h;
+ /* put_h is reserved for producers, so that they can each get a
+ * free slot, put_t is what is used by consumers to know if there's
+ * elements available or not
+ */
+ /* First reserve or slot, we know the ring buffer can't be full,
+ * as we will only ever release port we allocated before
+ */
+ while (!(_HA_ATOMIC_CAS(&range->put_h, &put, GET_NEXT_OFF(range, put))));
+ _HA_ATOMIC_STORE(&range->ports[put], port);
+ /* Barrier to make sure the new port is visible before we change put_t */
+ __ha_barrier_atomic_store();
+ /* Wait until all the threads that got a slot before us are done */
+ while ((volatile int)range->put_t != put)
+ __ha_compiler_barrier();
+ /* Let the world know we're done, and any potential consumer they
+ * can use that port.
+ */
+ _HA_ATOMIC_STORE(&range->put_t, GET_NEXT_OFF(range, put));
}
/* return a new initialized port range of N ports. The ports are not
@@ -62,8 +84,10 @@
{
struct port_range *ret;
ret = calloc(1, sizeof(struct port_range) +
- n * sizeof(((struct port_range *)0)->ports[0]));
- ret->size = ret->avail = n;
+ (n + 1) * sizeof(((struct port_range *)0)->ports[0]));
+ ret->size = n + 1;
+ /* Start at the first free element */
+ ret->put_h = ret->put_t = n;
return ret;
}