MEDIUM: connection: close idling front connections on soft-stop

Implement a safe mechanism to close idling front connections which
prevent the soft-stop from completing. Every h1/h2 front connection is
added into a new per-thread list instance. On shutdown, a new task is
woken up which calls the wake mux operation on every connection still
present in the new list.

A new stopping_list attach point has been added in the connection
structure. As this member is only used for frontend connections, it
shares the same union as the session_list reserved for backend
connections.
diff --git a/src/connection.c b/src/connection.c
index c0ba815..67b2124 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -41,6 +41,8 @@
         .list = LIST_HEAD_INIT(mux_proto_list.list)
 };
 
+struct mux_stopping_data mux_stopping_data[MAX_THREADS];
+
 /* disables sending of proxy-protocol-v2's LOCAL command */
 static int pp2_never_send_local;
 
diff --git a/src/haproxy.c b/src/haproxy.c
index 5d3e735..f149fc5 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -2370,11 +2370,31 @@
 	exit(status);
 }
 
+/* Handler of the task of mux_stopping_data.
+ * Called on soft-stop.
+ */
+struct task *mux_stopping_process(struct task *t, void *ctx, unsigned int state)
+{
+	struct connection *conn, *back;
+
+	list_for_each_entry_safe(conn, back, &mux_stopping_data[tid].list, stopping_list) {
+		if (conn->mux && conn->mux->wake)
+			conn->mux->wake(conn);
+	}
+
+	return t;
+}
+
 /* Runs the polling loop */
 void run_poll_loop()
 {
 	int next, wake;
 
+	/* allocate the thread-bound mux_stopping_data task */
+	mux_stopping_data[tid].task = task_new(tid_bit);
+	mux_stopping_data[tid].task->process = mux_stopping_process;
+	LIST_INIT(&mux_stopping_data[tid].list);
+
 	tv_update_date(0,1);
 	while (1) {
 		wake_expired_tasks();
@@ -2411,6 +2431,12 @@
 			int i;
 
 			if (stopping) {
+				/* stop muxes before acknowledging stopping */
+				if (!(stopping_thread_mask & tid_bit)) {
+					task_wakeup(mux_stopping_data[tid].task, TASK_WOKEN_OTHER);
+					wake = 1;
+				}
+
 				if (_HA_ATOMIC_OR_FETCH(&stopping_thread_mask, tid_bit) == tid_bit) {
 					/* notify all threads that stopping was just set */
 					for (i = 0; i < global.nbthread; i++)
@@ -2438,6 +2464,8 @@
 
 		activity[tid].loops++;
 	}
+
+	task_destroy(mux_stopping_data[tid].task);
 }
 
 static void *run_thread_poll_loop(void *data)
diff --git a/src/mux_h1.c b/src/mux_h1.c
index 940b754..62bd477 100644
--- a/src/mux_h1.c
+++ b/src/mux_h1.c
@@ -816,6 +816,9 @@
 		h1c->shut_timeout = h1c->timeout = proxy->timeout.client;
 		if (tick_isset(proxy->timeout.clientfin))
 			h1c->shut_timeout = proxy->timeout.clientfin;
+
+		LIST_APPEND(&mux_stopping_data[tid].list,
+		            &h1c->conn->stopping_list);
 	}
 	if (tick_isset(h1c->timeout)) {
 		t = task_new(tid_bit);
@@ -936,6 +939,9 @@
 	}
 
 	if (conn) {
+		if (!conn_is_back(conn))
+			LIST_DEL_INIT(&conn->stopping_list);
+
 		conn->mux = NULL;
 		conn->ctx = NULL;
 		TRACE_DEVEL("freeing conn", H1_EV_H1C_END, conn);
diff --git a/src/mux_h2.c b/src/mux_h2.c
index ebae438..1374cd7 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -955,6 +955,15 @@
 	h2c->wait_event.tasklet->process = h2_io_cb;
 	h2c->wait_event.tasklet->context = h2c;
 	h2c->wait_event.events = 0;
+	if (!conn_is_back(conn)) {
+		/* Connection might already be in the stopping_list if subject
+		 * to h1->h2 upgrade.
+		 */
+		if (!LIST_INLIST(&conn->stopping_list)) {
+			LIST_APPEND(&mux_stopping_data[tid].list,
+			            &conn->stopping_list);
+		}
+	}
 
 	h2c->ddht = hpack_dht_alloc();
 	if (!h2c->ddht)
@@ -1097,6 +1106,9 @@
 	}
 
 	if (conn) {
+		if (!conn_is_back(conn))
+			LIST_DEL_INIT(&conn->stopping_list);
+
 		conn->mux = NULL;
 		conn->ctx = NULL;
 		TRACE_DEVEL("freeing conn", H2_EV_H2C_END, conn);