BUG/MEDIUM: port_range: Make the ring buffer lock-free.

Port ranges are stored in a ring buffer. Unfortunately, this was overlooked
when haproxy was made multithreaded, and the ring buffer is not thread-safe.
When a source port range is specified, two or more threads could pick the
same port. Only one of them could then actually use the port, and the
others' connections would always fail.
To fix this, make it a lock-free ring buffer. This is easier than usual
because we know the ring buffer can never be full: only ports that were
previously allocated are ever released.

This should be backported to 1.8 and 1.9.
diff --git a/include/proto/port_range.h b/include/proto/port_range.h
index 8c63fac..c651fa8 100644
--- a/include/proto/port_range.h
+++ b/include/proto/port_range.h
@@ -24,18 +24,24 @@
 
 #include <types/port_range.h>
 
+#define GET_NEXT_OFF(range, off) ((off) == (range)->size - 1 ? 0 : (off) + 1)
+
 /* return an available port from range <range>, or zero if none is left */
 static inline int port_range_alloc_port(struct port_range *range)
 {
 	int ret;
+	int get;
+	int put;
 
-	if (!range->avail)
-		return 0;
-	ret = range->ports[range->get];
-	range->get++;
-	if (range->get >= range->size)
-		range->get = 0;
-	range->avail--;
+	get = _HA_ATOMIC_LOAD(&range->get);
+	do {
+		/* barrier to make sure get is loaded before put */
+		__ha_barrier_atomic_load();
+		put = _HA_ATOMIC_LOAD(&range->put_t);
+		if (unlikely(put == get))
+			return 0;
+		ret = range->ports[get];
+	} while (!(_HA_ATOMIC_CAS(&range->get, &get, GET_NEXT_OFF(range, get))));
 	return ret;
 }
 
@@ -45,14 +51,30 @@
  */
 static inline void port_range_release_port(struct port_range *range, int port)
 {
+	int put;
+
 	if (!port || !range)
 		return;
 
-	range->ports[range->put] = port;
-	range->avail++;
-	range->put++;
-	if (range->put >= range->size)
-		range->put = 0;
+	put = range->put_h;
+	/* put_h is reserved for producers, so that they can each get a
+	 * free slot; put_t is what consumers use to know whether there
+	 * are elements available or not.
+	 */
+	/* First reserve a slot. We know the ring buffer can't be full,
+	 * as we will only ever release ports we allocated before.
+	 */
+	while (!(_HA_ATOMIC_CAS(&range->put_h, &put, GET_NEXT_OFF(range, put))));
+	_HA_ATOMIC_STORE(&range->ports[put], port);
+	/* Barrier to make sure the new port is visible before we change put_t */
+	__ha_barrier_atomic_store();
+	/* Wait until all the threads that got a slot before us are done */
+	while ((volatile int)range->put_t != put)
+		__ha_compiler_barrier();
+	/* Let the world know we're done, and tell any potential consumer
+	 * that it can use that port.
+	 */
+	_HA_ATOMIC_STORE(&range->put_t, GET_NEXT_OFF(range, put));
 }
 
 /* return a new initialized port range of N ports. The ports are not
@@ -62,8 +84,10 @@
 {
 	struct port_range *ret;
 	ret = calloc(1, sizeof(struct port_range) +
-		     n * sizeof(((struct port_range *)0)->ports[0]));
-	ret->size = ret->avail = n;
+		     (n + 1) * sizeof(((struct port_range *)0)->ports[0]));
+	ret->size = n + 1;
+	/* Start at the first free element */
+	ret->put_h = ret->put_t = n;
 	return ret;
 }