BUG/MAJOR: listener: fix thread safety in resume_listener()

resume_listener() can be called from a thread not part of the listener's
mask after curr_conn has gone lower than a proxy's or the process's limit.
This results in fd_may_recv() being called unlocked if the listener is
bound to only one thread, and the process quickly locks up.

This patch solves this by creating a per-thread work_list dedicated to
listeners, and by modifying resume_listener() so that it bounces the
listener to the work_list of one of its owning threads and wakes that
thread up. The woken thread will then call resume_listener() again and
perform the operation on the file descriptor itself. It is important to
do it this way so that the listener's state cannot be modified while the
listener is being moved, otherwise multiple threads could take
conflicting decisions and the listener could end up back in the global
queue if it was used at the same time.
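
For reference, the work_list helpers introduced by the companion commit
"MINOR: task: introduce work lists" work roughly as sketched below. The
exact field names, list macros and wake-up flags may differ slightly;
this is only meant to illustrate the bounce mechanism, not to reproduce
the real code:

  /* one work_list per thread: a locked list plus a task bound to that
   * thread's mask.
   */
  struct work_list {
          struct list head;   /* locked list of pending elements */
          struct task *task;  /* task running on the owning thread */
          void *arg;          /* context passed back to the handler */
  };

  /* queue <item> into <work> and wake the owning thread's task so that
   * the element is processed from within that thread.
   */
  static inline void work_list_add(struct work_list *work, struct list *item)
  {
          LIST_ADDQ_LOCKED(&work->head, item);
          task_wakeup(work->task, TASK_WOKEN_OTHER);
  }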

It seems like a slightly simpler approach would be possible if the locked
list API provided the ability to return a locked element. In that case
the listener could be requeued immediately in dequeue_all_listeners()
without having to go through resume_listener() with its associated lock.

This fix must be backported to all versions having the lock-less accept
loop, which means as far back as 1.8, since deadlock fixes involving this
feature had to be backported there. The code is not expected to differ
too much there. However, the previous commit "MINOR: task: introduce work
lists" will be needed as well and should not present difficulties either.
For 1.8, the commits introducing thread_mask() and LIST_ADDED() will also
be needed; either backporting my_flsl() or switching to my_ffsl() will be
OK, and some changes will have to be made so that the init function is
properly called (and maybe the deinit one can be dropped).

In order to test for the fix, simply set up a multi-threaded frontend
with multiple bind lines, each attached to a single thread (reproduced
with 16 threads here), set a very low maxconn value on the frontend, and
inject heavy traffic on all listeners in parallel with slightly more
connections than the configured limit (typically +20%) so that the
listeners flip between limited and ready states very frequently. If the
bug is still there, at some point (5-20 seconds) the traffic will drop
sharply or even stop, with or without spinning threads.
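
For example, a configuration along the lines of the one below reproduces
it. The port numbers, thread count and maxconn value are only
illustrative, and the "process 1/<thread>" bind syntax of the 1.8-2.0
era is assumed:

  global
      nbthread 16

  defaults
      mode http
      timeout client 10s
      timeout server 10s
      timeout connect 1s

  frontend test
      # very low limit so that listeners get limited and resumed often
      maxconn 100
      # one bind line pinned to a single thread each
      bind :8001 process 1/1
      bind :8002 process 1/2
      bind :8003 process 1/3
      # ... and so on, up to ":8016 process 1/16"
      default_backend app

  backend app
      server s1 127.0.0.1:8080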

(cherry picked from commit f2cb169487ca800a4849f5961c0fb30443de343d)
Signed-off-by: Christopher Faulet <cfaulet@haproxy.com>
diff --git a/src/listener.c b/src/listener.c
index 8b167bd..40a774e 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -49,6 +49,12 @@
 
 struct xfer_sock_list *xfer_sock_list = NULL;
 
+/* there is one listener queue per thread so that a thread unblocking the
+ * global queue can wake up listeners bound only to foreign threads by
+ * moving them to the remote queues and waking up the associated task.
+ */
+static struct work_list *local_listener_queue;
+
 #if defined(USE_THREAD)
 
 struct accept_queue_ring accept_queue_rings[MAX_THREADS] __attribute__((aligned(64))) = { };
@@ -328,6 +334,12 @@
 
 	HA_SPIN_LOCK(LISTENER_LOCK, &l->lock);
 
+	/* check that another thread didn't do the job in parallel (e.g. at the
+	 * end of listener_accept() while we'd come from dequeue_all_listeners()).
+	 */
+	if (LIST_ADDED(&l->wait_queue))
+		goto end;
+
 	if ((global.mode & (MODE_DAEMON | MODE_MWORKER)) &&
 	    !(proc_mask(l->bind_conf->bind_proc) & pid_bit))
 		goto end;
@@ -370,6 +382,15 @@
 		goto end;
 	}
 
+	if (!(thread_mask(l->bind_conf->bind_thread) & tid_bit)) {
+		/* we're not allowed to touch this listener's FD, let's requeue
+		 * the listener into one of its owning thread's queue instead.
+		 */
+		int first_thread = my_flsl(thread_mask(l->bind_conf->bind_thread)) - 1;
+		work_list_add(&local_listener_queue[first_thread], &l->wait_queue);
+		goto end;
+	}
+
 	fd_want_recv(l->fd);
 	l->state = LI_READY;
   end:
@@ -1063,6 +1084,38 @@
 		dequeue_all_listeners(&fe->listener_queue);
 }
 
+/* resume listeners waiting in the local listener queue. They are still in LI_LIMITED state */
+static struct task *listener_queue_process(struct task *t, void *context, unsigned short state)
+{
+	struct work_list *wl = context;
+	struct listener *l;
+
+	while ((l = LIST_POP_LOCKED(&wl->head, struct listener *, wait_queue))) {
+		/* The listeners are still in the LI_LIMITED state */
+		resume_listener(l);
+	}
+	return t;
+}
+
+/* Initializes the listener queues. Returns 0 on success, otherwise ERR_* flags */
+static int listener_queue_init()
+{
+	local_listener_queue = work_list_create(global.nbthread, listener_queue_process, NULL);
+	if (!local_listener_queue) {
+		ha_alert("Out of memory while initializing listener queues.\n");
+		return ERR_FATAL|ERR_ABORT;
+	}
+	return 0;
+}
+
+static void listener_queue_deinit()
+{
+	work_list_destroy(local_listener_queue, global.nbthread);
+}
+
+REGISTER_CONFIG_POSTPARSER("multi-threaded listener queue", listener_queue_init);
+REGISTER_POST_DEINIT(listener_queue_deinit);
+
 /*
  * Registers the bind keyword list <kwl> as a list of valid keywords for next
  * parsing sessions.