BUG/MEDIUM: connections: Let the xprt layer know a takeover happened.

When we takeover a connection, let the xprt layer know. If it has its own
tasklet, and it is already scheduled, then it has to be destroyed, otherwise
it may run the new mux tasklet on the old thread.

Note that we only do this for the ssl xprt for now, because the only other
one that might wake the mux up is the handshake one, which is supposed to
disappear before idle connections exist.

No backport is needed, this is for 2.2.
diff --git a/include/haproxy/connection-t.h b/include/haproxy/connection-t.h
index e7e9c0d..c00b94e 100644
--- a/include/haproxy/connection-t.h
+++ b/include/haproxy/connection-t.h
@@ -354,6 +354,7 @@
 	int  (*prepare_srv)(struct server *srv);    /* prepare a server context */
 	void (*destroy_srv)(struct server *srv);    /* destroy a server context */
 	int  (*get_alpn)(const struct connection *conn, void *xprt_ctx, const char **str, int *len); /* get application layer name */
+	int (*takeover)(struct connection *conn, void *xprt_ctx, int orig_tid); /* Let the xprt know the fd have been taken over */
 	char name[8];                               /* transport layer name, zero-terminated */
 	int (*subscribe)(struct connection *conn, void *xprt_ctx, int event_type, struct wait_event *es); /* Subscribe <es> to events, such as "being able to send" */
 	int (*unsubscribe)(struct connection *conn, void *xprt_ctx, int event_type, struct wait_event *es); /* Unsubscribe <es> from events */
diff --git a/src/mux_fcgi.c b/src/mux_fcgi.c
index 9e8eb9a..a0509e6 100644
--- a/src/mux_fcgi.c
+++ b/src/mux_fcgi.c
@@ -4091,6 +4091,18 @@
 
 	if (fd_takeover(conn->handle.fd, conn) != 0)
 		return -1;
+
+	if (conn->xprt->takeover && conn->xprt->takeover(conn, conn->xprt_ctx, orig_tid) != 0) {
+		/* We failed to takeover the xprt, even if the connection may
+		 * still be valid, flag it as error'd, as we have already
+		 * taken over the fd, and wake the tasklet, so that it will
+		 * destroy it.
+		 */
+		conn->flags |= CO_FL_ERROR;
+		tasklet_wakeup_on(fcgi->wait_event.tasklet, orig_tid);
+		return -1;
+	}
+
 	if (fcgi->wait_event.events)
 		fcgi->conn->xprt->unsubscribe(fcgi->conn, fcgi->conn->xprt_ctx,
 		    fcgi->wait_event.events, &fcgi->wait_event);
diff --git a/src/mux_h1.c b/src/mux_h1.c
index 0111f19..1a166a4 100644
--- a/src/mux_h1.c
+++ b/src/mux_h1.c
@@ -2929,6 +2929,18 @@
 
 	if (fd_takeover(conn->handle.fd, conn) != 0)
 		return -1;
+
+	if (conn->xprt->takeover && conn->xprt->takeover(conn, conn->xprt_ctx, orig_tid) != 0) {
+		/* We failed to takeover the xprt, even if the connection may
+		 * still be valid, flag it as error'd, as we have already
+		 * taken over the fd, and wake the tasklet, so that it will
+		 * destroy it.
+		 */
+		conn->flags |= CO_FL_ERROR;
+		tasklet_wakeup_on(h1c->wait_event.tasklet, orig_tid);
+		return -1;
+	}
+
 	if (h1c->wait_event.events)
 		h1c->conn->xprt->unsubscribe(h1c->conn, h1c->conn->xprt_ctx,
 		    h1c->wait_event.events, &h1c->wait_event);
diff --git a/src/mux_h2.c b/src/mux_h2.c
index 22212af..08120e9 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -6062,6 +6062,18 @@
 
 	if (fd_takeover(conn->handle.fd, conn) != 0)
 		return -1;
+
+	if (conn->xprt->takeover && conn->xprt->takeover(conn, conn->xprt_ctx, orig_tid) != 0) {
+		/* We failed to takeover the xprt, even if the connection may
+		 * still be valid, flag it as error'd, as we have already
+		 * taken over the fd, and wake the tasklet, so that it will
+		 * destroy it.
+		 */
+		conn->flags |= CO_FL_ERROR;
+		tasklet_wakeup_on(h2c->wait_event.tasklet, orig_tid);
+		return -1;
+	}
+
 	if (h2c->wait_event.events)
 		h2c->conn->xprt->unsubscribe(h2c->conn, h2c->conn->xprt_ctx,
 		    h2c->wait_event.events, &h2c->wait_event);
diff --git a/src/ssl_sock.c b/src/ssl_sock.c
index dce5c64..a32db1a 100644
--- a/src/ssl_sock.c
+++ b/src/ssl_sock.c
@@ -5425,6 +5425,27 @@
 	return 0;
 }
 
+/* The connection has been taken over, so destroy the old tasklet and create
+ * a new one. The original thread ID must be passed into orig_tid
+ * It should be called with the takeover lock for the old thread held.
+ * Returns 0 on success, and -1 on failure
+ */
+static int ssl_takeover(struct connection *conn, void *xprt_ctx, int orig_tid)
+{
+	struct ssl_sock_ctx *ctx = xprt_ctx;
+	struct tasklet *tl = tasklet_new();
+
+	if (!tl)
+		return -1;
+
+	ctx->wait_event.tasklet->context = NULL;
+	tasklet_wakeup_on(ctx->wait_event.tasklet, orig_tid);
+	ctx->wait_event.tasklet = tl;
+	ctx->wait_event.tasklet->process = ssl_sock_io_cb;
+	ctx->wait_event.tasklet->context = ctx;
+	return 0;
+}
+
 /* Use the provided XPRT as an underlying XPRT, and provide the old one.
  * Returns 0 on success, and non-zero on failure.
  */
@@ -5459,8 +5480,23 @@
 
 static struct task *ssl_sock_io_cb(struct task *t, void *context, unsigned short state)
 {
+	struct tasklet *tl = (struct tasklet *)t;
 	struct ssl_sock_ctx *ctx = context;
+	struct connection *conn;
+	int conn_in_list;
+	int ret = 0;
 
+	HA_SPIN_LOCK(OTHER_LOCK, &idle_conns[tid].takeover_lock);
+	if (tl->context == NULL) {
+		HA_SPIN_UNLOCK(OTHER_LOCK, &idle_conns[tid].takeover_lock);
+		tasklet_free(tl);
+		return NULL;
+	}
+	conn = ctx->conn;
+	conn_in_list = conn->flags & CO_FL_LIST_MASK;
+	if (conn_in_list)
+		MT_LIST_DEL(&conn->list);
+	HA_SPIN_UNLOCK(OTHER_LOCK, &idle_conns[tid].takeover_lock);
 	/* First if we're doing an handshake, try that */
 	if (ctx->conn->flags & CO_FL_SSL_WAIT_HS)
 		ssl_sock_handshake(ctx->conn, CO_FL_SSL_WAIT_HS);
@@ -5471,7 +5507,6 @@
 	 */
 	if ((ctx->conn->flags & CO_FL_ERROR) ||
 	    !(ctx->conn->flags & CO_FL_SSL_WAIT_HS)) {
-		int ret = 0;
 		int woke = 0;
 
 		/* On error, wake any waiter */
@@ -5491,8 +5526,8 @@
 			if (!ctx->conn->mux)
 				ret = conn_create_mux(ctx->conn);
 			if (ret >= 0 && !woke && ctx->conn->mux && ctx->conn->mux->wake)
-				ctx->conn->mux->wake(ctx->conn);
-			return NULL;
+				ret = ctx->conn->mux->wake(ctx->conn);
+			goto leave;
 		}
 	}
 #if (HA_OPENSSL_VERSION_NUMBER >= 0x10101000L)
@@ -5505,6 +5540,15 @@
 			ctx->subs = NULL;
 	}
 #endif
+leave:
+	if (!ret && conn_in_list) {
+		struct server *srv = objt_server(conn->target);
+
+		if (conn_in_list == CO_FL_SAFE_LIST)
+			MT_LIST_ADDQ(&srv->safe_conns[tid], &conn->list);
+		else
+			MT_LIST_ADDQ(&srv->idle_conns[tid], &conn->list);
+	}
 	return NULL;
 }
 
@@ -6494,6 +6538,7 @@
 	.prepare_srv = ssl_sock_prepare_srv_ctx,
 	.destroy_srv = ssl_sock_free_srv_ctx,
 	.get_alpn = ssl_sock_get_alpn,
+	.takeover = ssl_takeover,
 	.name     = "SSL",
 };