MEDIUM: server/backend: implement websocket protocol selection

Handle properly websocket streams if the server uses an ALPN with both
h1 and h2. Add a new field h2_ws in the server structure. If set to off,
reuse is automatically disable on backend and ALPN is forced to http1.x
if possible. Nothing is done if on.

Implement a mechanism to be able to use a different http version for
websocket streams. A new server member <ws> represents the algorithm to
select the protocol. This can overrides the server <proto>
configuration. If the connection uses ALPN for proto selection, it is
updated for websocket streams to select the right protocol.

Three mode of selection are implemented :
- auto : use the same protocol between non-ws and ws streams. If ALPN is
  use, try to update it to "http/1.1"; this is only done if the server
  ALPN contains "http/1.1".
- h1 : use http/1.1
- h2 : use http/2.0; this requires the server to support RFC8441 or an
  error will be returned by haproxy.

(cherry picked from commit 9c3251d1087061f3a0c88f69b8f62ec1f9324bd7)
[ad: adjusted context: skip reuse move in connect_server; refcount field
 in server structure renamed]
Signed-off-by: Amaury Denoyelle <adenoyelle@haproxy.com>
diff --git a/include/haproxy/server-t.h b/include/haproxy/server-t.h
index 7fd4288..4291953 100644
--- a/include/haproxy/server-t.h
+++ b/include/haproxy/server-t.h
@@ -212,6 +212,13 @@
 	struct eb_root avail_conns;             /* Connections in use, but with still new streams available */
 };
 
+/* Configure the protocol selection for websocket */
+enum __attribute__((__packed__)) srv_ws_mode {
+	SRV_WS_AUTO = 0,
+	SRV_WS_H1,
+	SRV_WS_H2,
+};
+
 struct proxy;
 struct server {
 	/* mostly config or admin stuff, doesn't change often */
@@ -258,6 +265,9 @@
 	unsigned cumulative_weight;		/* weight of servers prior to this one in the same group, for chash balancing */
 	int maxqueue;				/* maximum number of pending connections allowed */
 
+	enum srv_ws_mode ws;                    /* configure the protocol selection for websocket */
+	/* 3 bytes hole here */
+
 	uint refcount_dynsrv;                   /* refcount used for dynamic servers */
 
 	/* The elements below may be changed on every single request by any
diff --git a/include/haproxy/server.h b/include/haproxy/server.h
index 625f68b..c927de3 100644
--- a/include/haproxy/server.h
+++ b/include/haproxy/server.h
@@ -157,6 +157,9 @@
  */
 void srv_set_dyncookie(struct server *s);
 
+int srv_check_reuse_ws(struct server *srv);
+const struct mux_ops *srv_get_ws_proto(struct server *srv);
+
 /* increase the number of cumulated connections on the designated server */
 static inline void srv_inc_sess_ctr(struct server *s)
 {
diff --git a/src/backend.c b/src/backend.c
index 1701304..7bb1685 100644
--- a/src/backend.c
+++ b/src/backend.c
@@ -1343,6 +1343,16 @@
 	if (!IS_HTX_STRM(s))
 		goto skip_reuse;
 
+	/* disable reuse if websocket stream and the protocol to use is not the
+	 * same as the main protocol of the server.
+	 */
+	if (unlikely(s->flags & SF_WEBSOCKET) && srv) {
+		if (!srv_check_reuse_ws(srv)) {
+			DBG_TRACE_STATE("skip idle connections reuse: websocket stream", STRM_EV_STRM_PROC|STRM_EV_SI_ST, s);
+			goto skip_reuse;
+		}
+	}
+
 	/* first, search for a matching connection in the session's idle conns */
 	srv_conn = session_get_conn(s->sess, s->target, hash);
 	if (srv_conn)
@@ -1596,6 +1606,33 @@
 			srv_conn->send_proxy_ofs = 1;
 			srv_conn->flags |= CO_FL_SOCKS4;
 		}
+
+#if defined(USE_OPENSSL) && defined(TLSEXT_TYPE_application_layer_protocol_negotiation)
+		/* if websocket stream, try to update connection ALPN. */
+		if (unlikely(s->flags & SF_WEBSOCKET) &&
+		    srv && srv->use_ssl && srv->ssl_ctx.alpn_str) {
+			char *alpn = "";
+			int force = 0;
+
+			switch (srv->ws) {
+			case SRV_WS_AUTO:
+				alpn = "\x08http/1.1";
+				force = 0;
+				break;
+			case SRV_WS_H1:
+				alpn = "\x08http/1.1";
+				force = 1;
+				break;
+			case SRV_WS_H2:
+				alpn = "\x02h2";
+				force = 1;
+				break;
+			}
+
+			if (!conn_update_alpn(srv_conn, ist(alpn), force))
+				DBG_TRACE_STATE("update alpn for websocket", STRM_EV_STRM_PROC|STRM_EV_SI_ST, s);
+		}
+#endif
 	}
 	else {
 		/* Currently there seems to be no known cases of xprt ready
@@ -1654,7 +1691,9 @@
 	 * fail, and flag the connection as CO_FL_ERROR.
 	 */
 	if (init_mux) {
-		if (conn_install_mux_be(srv_conn, srv_cs, s->sess, NULL) < 0) {
+		const struct mux_ops *alt_mux =
+		  likely(!(s->flags & SF_WEBSOCKET)) ? NULL : srv_get_ws_proto(srv);
+		if (conn_install_mux_be(srv_conn, srv_cs, s->sess, alt_mux) < 0) {
 			conn_full_close(srv_conn);
 			return SF_ERR_INTERNAL;
 		}
diff --git a/src/server.c b/src/server.c
index 69aa514..6989f85 100644
--- a/src/server.c
+++ b/src/server.c
@@ -197,6 +197,94 @@
 	HA_RWLOCK_RDUNLOCK(PROXY_LOCK, &p->lock);
 }
 
+/* Returns true if it's possible to reuse an idle connection from server <srv>
+ * for a websocket stream. This is the case if server is configured to use the
+ * same protocol for both HTTP and websocket streams. This depends on the value
+ * of "proto", "alpn" and "ws" keywords.
+ */
+int srv_check_reuse_ws(struct server *srv)
+{
+	if (srv->mux_proto || srv->use_ssl != 1 || !srv->ssl_ctx.alpn_str) {
+		/* explicit srv.mux_proto or no ALPN : srv.mux_proto is used
+		 * for mux selection.
+		 */
+		const struct ist srv_mux = srv->mux_proto ?
+		                           srv->mux_proto->token : IST_NULL;
+
+		switch (srv->ws) {
+		/* "auto" means use the same protocol : reuse is possible. */
+		case SRV_WS_AUTO:
+			return 1;
+
+		/* "h2" means use h2 for websocket : reuse is possible if
+		 * server mux is h2.
+		 */
+		case SRV_WS_H2:
+			if (srv->mux_proto && isteq(srv_mux, ist("h2")))
+				return 1;
+			break;
+
+		/* "h1" means use h1 for websocket : reuse is possible if
+		 * server mux is h1.
+		 */
+		case SRV_WS_H1:
+			if (!srv->mux_proto || isteq(srv_mux, ist("h1")))
+				return 1;
+			break;
+		}
+	}
+	else {
+		/* ALPN selection.
+		 * Based on the assumption that only "h2" and "http/1.1" token
+		 * are used on server ALPN.
+		 */
+		const struct ist alpn = ist2(srv->ssl_ctx.alpn_str,
+		                             srv->ssl_ctx.alpn_len);
+
+		switch (srv->ws) {
+		case SRV_WS_AUTO:
+			/* for auto mode, consider reuse as possible if the
+			 * server uses a single protocol ALPN
+			 */
+			if (!istchr(alpn, ','))
+				return 1;
+			break;
+
+		case SRV_WS_H2:
+			return isteq(alpn, ist("\x02h2"));
+
+		case SRV_WS_H1:
+			return isteq(alpn, ist("\x08http/1.1"));
+		}
+	}
+
+	return 0;
+}
+
+/* Return the proto to used for a websocket stream on <srv> without ALPN. NULL
+ * is a valid value indicating to use the fallback mux.
+ */
+const struct mux_ops *srv_get_ws_proto(struct server *srv)
+{
+	const struct mux_proto_list *mux = NULL;
+
+	switch (srv->ws) {
+	case SRV_WS_AUTO:
+		mux = srv->mux_proto;
+		break;
+
+	case SRV_WS_H1:
+		mux = get_mux_proto(ist("h1"));
+		break;
+
+	case SRV_WS_H2:
+		mux = get_mux_proto(ist("h2"));
+		break;
+	}
+
+	return mux ? mux->mux : NULL;
+}
+
 /*
  * Must be called with the server lock held. The server is first removed from
  * the proxy tree if it was already attached. If <reattach> is true, the server
@@ -2094,6 +2182,7 @@
 	srv->agent.fastinter          = src->agent.fastinter;
 	srv->agent.downinter          = src->agent.downinter;
 	srv->maxqueue                 = src->maxqueue;
+	srv->ws                       = src->ws;
 	srv->minconn                  = src->minconn;
 	srv->maxconn                  = src->maxconn;
 	srv->slowstart                = src->slowstart;