MEDIUM: quic: implement thread affinity rebinding

Implement a new function qc_set_tid_affinity(). This function is
responsible for rebinding a quic_conn instance to a new thread.

This operation consists mostly of releasing existing tasks and tasklet
and allocating new instances on the new thread. If the quic_conn uses
its own socket, it is also migrated to the new thread. The migration
is finally completed by updating the CID TID to the new thread. After
this step, the connection is thus accessible to the new thread and
cannot be accessed anymore on the old one without risking a race
condition.

To ensure rebinding is either done completely or not at all, tasks and
tasklet are pre-allocated before all operations. If this fails, an error
is returned and rebinding is not done.

To destroy the older tasklet, its context is set to NULL before wake up.
In I/O callbacks, a new function qc_process() is used to check context
and free the tasklet if NULL.

The thread rebinding can cause a race condition if the older thread
quic_dghdlrs::dgrams list contains datagrams for the connection after
rebinding is done. To prevent this, quic_rx_pkt_retrieve_conn() always
checks if the packet CID is still associated with the current thread or
not. In the latter case, no connection is returned and the new thread is
returned to allow redispatching the datagram to the new thread in a
thread-safe way.

This should be backported up to 2.7 after a period of observation.
diff --git a/include/haproxy/quic_conn-t.h b/include/haproxy/quic_conn-t.h
index 51337b1..052b8c8 100644
--- a/include/haproxy/quic_conn-t.h
+++ b/include/haproxy/quic_conn-t.h
@@ -231,6 +231,7 @@
 #define           QUIC_EV_CONN_RCV       (1ULL << 48)
 #define           QUIC_EV_CONN_KILL      (1ULL << 49)
 #define           QUIC_EV_CONN_KP        (1ULL << 50)
+#define           QUIC_EV_CONN_SET_AFFINITY (1ULL << 52)
 
 /* Similar to kernel min()/max() definitions. */
 #define QUIC_MIN(a, b) ({ \
diff --git a/include/haproxy/quic_conn.h b/include/haproxy/quic_conn.h
index a3cd423..a25d5ef 100644
--- a/include/haproxy/quic_conn.h
+++ b/include/haproxy/quic_conn.h
@@ -700,5 +700,7 @@
 	}
 }
 
+int qc_set_tid_affinity(struct quic_conn *qc, uint tid);
+
 #endif /* USE_QUIC */
 #endif /* _HAPROXY_QUIC_CONN_H */
diff --git a/src/quic_conn.c b/src/quic_conn.c
index 74a5f32..31dc1b9 100644
--- a/src/quic_conn.c
+++ b/src/quic_conn.c
@@ -177,6 +177,7 @@
 	{ .mask = QUIC_EV_CONN_IDLE_TIMER, .name = "idle_timer",     .desc = "idle timer task"},
 	{ .mask = QUIC_EV_CONN_SUB,      .name = "xprt_sub",         .desc = "RX/TX subcription or unsubscription to QUIC xprt"},
 	{ .mask = QUIC_EV_CONN_RCV,      .name = "conn_recv",        .desc = "RX on connection" },
+	{ .mask = QUIC_EV_CONN_SET_AFFINITY, .name = "conn_set_affinity", .desc = "set connection thread affinity" },
 	{ /* end */ }
 };
 
@@ -4968,9 +4969,9 @@
 	struct quic_conn *qc = context;
 	struct quic_enc_level *qel;
 
-	qel = &qc->els[QUIC_TLS_ENC_LEVEL_APP];
-
 	TRACE_ENTER(QUIC_EV_CONN_IO_CB, qc);
+
+	qel = &qc->els[QUIC_TLS_ENC_LEVEL_APP];
 	TRACE_STATE("connection handshake state", QUIC_EV_CONN_IO_CB, qc, &qc->state);
 
 	if (qc_test_fd(qc))
@@ -5045,6 +5046,7 @@
 	int st, zero_rtt;
 
 	TRACE_ENTER(QUIC_EV_CONN_IO_CB, qc);
+
 	eqel = &qc->els[QUIC_TLS_ENC_LEVEL_EARLY_DATA];
 	st = qc->state;
 	TRACE_PROTO("connection state", QUIC_EV_CONN_IO_CB, qc, &st);
@@ -6726,7 +6728,11 @@
  * this is an Initial packet. <dgram> is the datagram containing the packet and
  * <l> is the listener instance on which it was received.
  *
- * Returns the quic-conn instance or NULL.
+ * By default, <new_tid> is set to -1. However, if thread affinity has been
+ * changed, it will be set to its new thread ID.
+ *
+ * Returns the quic-conn instance or NULL if not found or thread affinity
+ * changed.
  */
 static struct quic_conn *quic_rx_pkt_retrieve_conn(struct quic_rx_packet *pkt,
                                                    struct quic_dgram *dgram,
@@ -6747,7 +6753,7 @@
 
 	qc = retrieve_qc_conn_from_cid(pkt, l, &dgram->saddr, new_tid);
 
-	/* If connection already created on another thread. */
+	/* If connection already created or rebound on another thread. */
         if (!qc && *new_tid != -1 && tid != *new_tid)
 		goto out;
 
@@ -8234,7 +8240,7 @@
 
 			qc = from_qc ? from_qc : quic_rx_pkt_retrieve_conn(pkt, dgram, li, &new_tid);
 			/* qc is NULL if receiving a non Initial packet for an
-			 * unknown connection.
+			 * unknown connection or on connection affinity rebind.
 			 */
 			if (!qc) {
 				if (new_tid >= 0) {
@@ -8434,6 +8440,89 @@
 	return 0;
 }
 
+/* Move a <qc> QUIC connection and its resources (idle timer task, timer task if
+ * any, I/O tasklet, FD if qc_test_fd() is true, thread list entry, CID TID)
+ * from the current thread to <new_tid>. After a successful call, the connection
+ * cannot be dereferenced anymore on the current thread.
+ * Returns 0 on success, or 1 on allocation failure (nothing is rebound then).
+ */
+int qc_set_tid_affinity(struct quic_conn *qc, uint new_tid)
+{
+	struct task *t1 = NULL, *t2 = NULL;
+	struct tasklet *t3 = NULL;
+
+	struct quic_connection_id *conn_id;
+	struct eb64_node *node;
+
+	TRACE_ENTER(QUIC_EV_CONN_SET_AFFINITY, qc);
+
+	/* Pre-allocate all required resources (t1: idle timer, t2: timer task if
+	 * any, t3: I/O tasklet) so the connection is never left half rebound.
+	 */
+	if (((t1 = task_new_on(new_tid)) == NULL) ||
+	    (qc->timer_task && (t2 = task_new_on(new_tid)) == NULL) ||
+	    (t3 = tasklet_new()) == NULL) {
+		goto err;
+	}
+
+	/* Reinit idle timer task on the new thread, preserving its expiration date. */
+	task_kill(qc->idle_timer_task);
+	t1->expire = qc->idle_timer_task->expire;
+	qc->idle_timer_task = t1;
+	qc->idle_timer_task->process = qc_idle_timer_task;
+	qc->idle_timer_task->context = qc;
+
+	/* Reinit timer task if allocated. Its expiration date is not carried over. */
+	if (qc->timer_task) {
+		task_kill(qc->timer_task);
+		qc->timer_task = t2;
+		qc->timer_task->process = qc_process_timer;
+		qc->timer_task->context = qc;
+	}
+
+	/* Reinit IO tasklet: kill the old one, reuse its callback, reset wait_event.events. */
+	tasklet_kill(qc->wait_event.tasklet);
+	/* In most cases quic_conn_app_io_cb is used but for 0-RTT quic_conn_io_cb can be still activated. */
+	t3->process = qc->wait_event.tasklet->process;
+	qc->wait_event.tasklet = t3;
+	qc->wait_event.tasklet->tid = new_tid;
+	qc->wait_event.tasklet->context = qc;
+	qc->wait_event.events = 0;
+
+	/* Rebind the connection FD when qc_test_fd() is true. */
+	if (qc_test_fd(qc)) {
+		fd_migrate_on(qc->fd, new_tid);
+		/* TODO need to reactivate reading on the new thread. */
+	}
+
+	/* Remove conn from per-thread list instance, then append it to the <new_tid> one below. */
+	qc_detach_th_ctx_list(qc, 0);
+	/* Connection must not be closing or else it must be inserted in quic_conns_clo list instance instead. */
+	BUG_ON(qc->flags & (QUIC_FL_CONN_CLOSING|QUIC_FL_CONN_DRAINING));
+	LIST_APPEND(&ha_thread_ctx[new_tid].quic_conns, &qc->el_th_ctx);
+	qc->qc_epoch = HA_ATOMIC_LOAD(&qc_epoch);
+
+	node = eb64_first(&qc->cids);
+	BUG_ON(!node || eb64_next(node)); /* One and only one CID must be present before affinity rebind. */
+	conn_id = eb64_entry(node, struct quic_connection_id, seq_num);
+	/* Rebinding is considered done when CID points to the new thread. No
+	 * access must be done to the quic-conn after it: <qc> is thus set to NULL.
+	 */
+	HA_ATOMIC_STORE(&conn_id->tid, new_tid);
+	qc = NULL;
+
+	TRACE_LEAVE(QUIC_EV_CONN_SET_AFFINITY, NULL);
+	return 0;
+
+ err:
+	task_destroy(t1);
+	task_destroy(t2);
+	if (t3)
+		tasklet_free(t3);
+
+	TRACE_DEVEL("leaving on error", QUIC_EV_CONN_SET_AFFINITY, qc);
+	return 1;
+}
 
 /* appctx context used by "show quic" command */
 struct show_quic_ctx {