MEDIUM: connection: close front idling connection on soft-stop
Implement a safe mechanism to close front idling connection which
prevents the soft-stop to complete. Every h1/h2 front connection is
added in a new per-thread list instance. On shutdown, a new task is
waking up which calls wake mux operation on every connection still
present in the new list.
A new stopping_list attach point has been added in the connection
structure. As this member is only used for frontend connections, it
shared the same union as the session_list reserved for backend
connections.
diff --git a/include/haproxy/connection-t.h b/include/haproxy/connection-t.h
index 5314056..7cac07a 100644
--- a/include/haproxy/connection-t.h
+++ b/include/haproxy/connection-t.h
@@ -424,6 +424,14 @@
char name[8]; /* mux layer name, zero-terminated */
};
+/* list of frontend connections. Used to call mux wake operation on soft-stop
+ * to close idling connections.
+ */
+struct mux_stopping_data {
+ struct list list; /* list of registered frontend connections */
+ struct task *task; /* task woken up on soft-stop */
+};
+
/* data_cb describes the data layer's recv and send callbacks which are called
* when I/O activity was detected after the transport layer is ready. These
* callbacks are supposed to make use of the xprt_ops above to exchange data
@@ -528,6 +536,7 @@
struct mt_list toremove_list; /* list for connection to clean up */
union {
struct list session_list; /* used by backend conns, list of attached connections to a session */
+ struct list stopping_list; /* used by frontend conns, attach point in mux stopping list */
};
union conn_handle handle; /* connection handle at the socket layer */
const struct netns_entry *proxy_netns;
diff --git a/include/haproxy/connection.h b/include/haproxy/connection.h
index 296da89..b0b855e 100644
--- a/include/haproxy/connection.h
+++ b/include/haproxy/connection.h
@@ -44,6 +44,7 @@
extern struct pool_head *pool_head_authority;
extern struct xprt_ops *registered_xprt[XPRT_ENTRIES];
extern struct mux_proto_list mux_proto_list;
+extern struct mux_stopping_data mux_stopping_data[MAX_THREADS];
#define IS_HTX_CONN(conn) ((conn)->mux && ((conn)->mux->flags & MX_FL_HTX))
#define IS_HTX_CS(cs) (IS_HTX_CONN((cs)->conn))
@@ -373,6 +374,8 @@
MT_LIST_INIT(&conn->toremove_list);
if (conn_is_back(conn))
LIST_INIT(&conn->session_list);
+ else
+ LIST_INIT(&conn->stopping_list);
conn->subs = NULL;
conn->src = NULL;
conn->dst = NULL;
diff --git a/include/haproxy/global.h b/include/haproxy/global.h
index 9d8f789..4231ba8 100644
--- a/include/haproxy/global.h
+++ b/include/haproxy/global.h
@@ -57,6 +57,8 @@
extern unsigned char boot_seed[20]; // per-boot random seed (160 bits initially)
extern THREAD_LOCAL struct buffer trash;
+extern struct task *stopping_task[MAX_PROCS];
+
struct proxy;
struct server;
int main(int argc, char **argv);
diff --git a/src/connection.c b/src/connection.c
index c0ba815..67b2124 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -41,6 +41,8 @@
.list = LIST_HEAD_INIT(mux_proto_list.list)
};
+struct mux_stopping_data mux_stopping_data[MAX_THREADS];
+
/* disables sending of proxy-protocol-v2's LOCAL command */
static int pp2_never_send_local;
diff --git a/src/haproxy.c b/src/haproxy.c
index 5d3e735..f149fc5 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -2370,11 +2370,31 @@
exit(status);
}
+/* Handler of the task of mux_stopping_data.
+ * Called on soft-stop.
+ */
+struct task *mux_stopping_process(struct task *t, void *ctx, unsigned int state)
+{
+ struct connection *conn, *back;
+
+ list_for_each_entry_safe(conn, back, &mux_stopping_data[tid].list, stopping_list) {
+ if (conn->mux && conn->mux->wake)
+ conn->mux->wake(conn);
+ }
+
+ return t;
+}
+
/* Runs the polling loop */
void run_poll_loop()
{
int next, wake;
+ /* allocates the thread bound mux_stopping_data task */
+ mux_stopping_data[tid].task = task_new(tid_bit);
+ mux_stopping_data[tid].task->process = mux_stopping_process;
+ LIST_INIT(&mux_stopping_data[tid].list);
+
tv_update_date(0,1);
while (1) {
wake_expired_tasks();
@@ -2411,6 +2431,12 @@
int i;
if (stopping) {
+ /* stop muxes before acknowleding stopping */
+ if (!(stopping_thread_mask & tid_bit)) {
+ task_wakeup(mux_stopping_data[tid].task, TASK_WOKEN_OTHER);
+ wake = 1;
+ }
+
if (_HA_ATOMIC_OR_FETCH(&stopping_thread_mask, tid_bit) == tid_bit) {
/* notify all threads that stopping was just set */
for (i = 0; i < global.nbthread; i++)
@@ -2438,6 +2464,8 @@
activity[tid].loops++;
}
+
+ task_destroy(mux_stopping_data[tid].task);
}
static void *run_thread_poll_loop(void *data)
diff --git a/src/mux_h1.c b/src/mux_h1.c
index 940b754..62bd477 100644
--- a/src/mux_h1.c
+++ b/src/mux_h1.c
@@ -816,6 +816,9 @@
h1c->shut_timeout = h1c->timeout = proxy->timeout.client;
if (tick_isset(proxy->timeout.clientfin))
h1c->shut_timeout = proxy->timeout.clientfin;
+
+ LIST_APPEND(&mux_stopping_data[tid].list,
+ &h1c->conn->stopping_list);
}
if (tick_isset(h1c->timeout)) {
t = task_new(tid_bit);
@@ -936,6 +939,9 @@
}
if (conn) {
+ if (!conn_is_back(conn))
+ LIST_DEL_INIT(&conn->stopping_list);
+
conn->mux = NULL;
conn->ctx = NULL;
TRACE_DEVEL("freeing conn", H1_EV_H1C_END, conn);
diff --git a/src/mux_h2.c b/src/mux_h2.c
index ebae438..1374cd7 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -955,6 +955,15 @@
h2c->wait_event.tasklet->process = h2_io_cb;
h2c->wait_event.tasklet->context = h2c;
h2c->wait_event.events = 0;
+ if (!conn_is_back(conn)) {
+ /* Connection might already be in the stopping_list if subject
+ * to h1->h2 upgrade.
+ */
+ if (!LIST_INLIST(&conn->stopping_list)) {
+ LIST_APPEND(&mux_stopping_data[tid].list,
+ &conn->stopping_list);
+ }
+ }
h2c->ddht = hpack_dht_alloc();
if (!h2c->ddht)
@@ -1097,6 +1106,9 @@
}
if (conn) {
+ if (!conn_is_back(conn))
+ LIST_DEL_INIT(&conn->stopping_list);
+
conn->mux = NULL;
conn->ctx = NULL;
TRACE_DEVEL("freeing conn", H2_EV_H2C_END, conn);