MINOR: stream: Handle stream HTTP upgrade in a dedicated function

The code responsible to perform an HTTP upgrade from a TCP stream is moved
in a dedicated function, stream_set_http_mode().

The stream_set_backend() function is slightly updated, especially to
correctly set the request analysers.
diff --git a/include/haproxy/stream.h b/include/haproxy/stream.h
index bfd92ec..6a49f14 100644
--- a/include/haproxy/stream.h
+++ b/include/haproxy/stream.h
@@ -60,6 +60,7 @@
 struct stream *stream_new(struct session *sess, enum obj_type *origin, struct buffer *input);
 int stream_create_from_cs(struct conn_stream *cs, struct buffer *input);
 int stream_upgrade_from_cs(struct conn_stream *cs, struct buffer *input);
+int stream_set_http_mode(struct stream *s);
 
 /* kill a stream and set the termination flags to <why> (one of SF_ERR_*) */
 void stream_shutdown(struct stream *stream, int why);
diff --git a/src/proxy.c b/src/proxy.c
index cd7ba51..fddb179 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -2116,6 +2116,8 @@
  */
 int stream_set_backend(struct stream *s, struct proxy *be)
 {
+	unsigned int req_ana;
+
 	if (s->flags & SF_BE_ASSIGNED)
 		return 1;
 
@@ -2140,69 +2142,41 @@
 	 * be more reliable to store the list of analysers that have been run,
 	 * but what we do here is OK for now.
 	 */
-	s->req.analysers |= be->be_req_ana & ~(strm_li(s) ? strm_li(s)->analysers : 0);
+	req_ana = be->be_req_ana;
+	if (!(strm_fe(s)->options & PR_O_WREQ_BODY) && be->options & PR_O_WREQ_BODY) {
+		/* The backend request to parse a request body while it was not
+		 * performed on the frontend, so add the corresponding analyser
+		 */
+		req_ana |= AN_REQ_HTTP_BODY;
+	}
+	if (IS_HTX_STRM(s) && strm_fe(s)->mode != PR_MODE_HTTP) {
+		/* The stream was already upgraded to HTTP, so remove analysers
+		 * set during the upgrade
+		 */
+		req_ana &= ~(AN_REQ_WAIT_HTTP|AN_REQ_HTTP_PROCESS_FE);
+	}
+	s->req.analysers |= req_ana & ~(strm_li(s) ? strm_li(s)->analysers : 0);
 
-	/* If the target backend requires HTTP processing, we have to allocate
-	 * the HTTP transaction if we did not have one.
-	 */
-	if (unlikely(!s->txn && be->http_needed && !http_create_txn(s)))
-			return 0;
-
-	if (s->txn) {
+	if (!IS_HTX_STRM(s) && be->mode == PR_MODE_HTTP) {
 		/* If we chain a TCP frontend to an HTX backend, we must upgrade
 		 * the client mux */
-		if (!IS_HTX_STRM(s) && be->mode == PR_MODE_HTTP) {
-			struct connection  *conn = objt_conn(strm_sess(s)->origin);
-			struct conn_stream *cs   = objt_cs(s->si[0].end);
-
-			if (conn && cs) {
-				si_rx_endp_more(&s->si[0]);
-				/* Make sure we're unsubscribed, the the new
-				 * mux will probably want to subscribe to
-				 * the underlying XPRT
-				 */
-				if (s->si[0].wait_event.events)
-					conn->mux->unsubscribe(cs, s->si[0].wait_event.events,
-					    &s->si[0].wait_event);
-				if (conn->mux->flags & MX_FL_NO_UPG)
-					return 0;
-				if (conn_upgrade_mux_fe(conn, cs, &s->req.buf, ist(""), PROTO_MODE_HTTP)  == -1)
-					return 0;
-
-				s->req.flags &= ~(CF_READ_PARTIAL|CF_AUTO_CONNECT);
-				s->req.total = 0;
-				s->flags |= SF_IGNORE;
-				if (strcmp(conn->mux->name, "H2") == 0) {
-					/* For HTTP/2, destroy the conn_stream,
-					 * disable logging, and abort the stream
-					 * process. Thus it will be silently
-					 * destroyed. The new mux will create
-					 * new streams.
-					 */
-					cs_free(cs);
-					si_detach_endpoint(&s->si[0]);
-					s->logs.logwait = 0;
-					s->logs.level = 0;
-					channel_abort(&s->req);
-					channel_abort(&s->res);
-					s->req.analysers &= AN_REQ_FLT_END;
-					s->req.analyse_exp = TICK_ETERNITY;
-					return 1;
-				}
-			}
-		}
-		else if (IS_HTX_STRM(s) && be->mode != PR_MODE_HTTP) {
-			/* If a TCP backend is assgiend to an HTX stream, return
-			 * an error. It may happens for a new stream on a
-			 * previously upgraded connections. */
-			if (!(s->flags & SF_ERR_MASK))
-				s->flags |= SF_ERR_INTERNAL;
+		if (!stream_set_http_mode(s))
 			return 0;
-		}
-
-		/* we may request to parse a request body */
-		if (be->options & PR_O_WREQ_BODY)
-			s->req.analysers |= AN_REQ_HTTP_BODY;
+	}
+	else if (IS_HTX_STRM(s) && be->mode != PR_MODE_HTTP) {
+		/* If a TCP backend is assgiend to an HTX stream, return an
+		 * error. It may happens for a new stream on a previously
+		 * upgraded connections. */
+		if (!(s->flags & SF_ERR_MASK))
+			s->flags |= SF_ERR_INTERNAL;
+		return 0;
+	}
+	else {
+		/* If the target backend requires HTTP processing, we have to allocate
+		 * the HTTP transaction if we did not have one.
+		 */
+		if (unlikely(!s->txn && be->http_needed && !http_create_txn(s)))
+			return 0;
 	}
 
 	s->flags |= SF_BE_ASSIGNED;
diff --git a/src/stream.c b/src/stream.c
index 8b8e26d..124345f 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -1478,6 +1478,66 @@
 	return 1;
 }
 
+/* Set the stream to HTTP mode, if necessary. The minimal request HTTP analysers
+ * are set and the client mux is upgraded. It returns 1 if the stream processing
+ * may continue or 0 if it should be stopped. It happens on error or if the
+ * upgrade required a new stream.
+ */
+int stream_set_http_mode(struct stream *s)
+{
+	struct connection  *conn;
+	struct conn_stream *cs;
+
+	/* Already an HTTP stream */
+	if (IS_HTX_STRM(s))
+		return 1;
+
+	s->req.analysers |= AN_REQ_WAIT_HTTP|AN_REQ_HTTP_PROCESS_FE;
+
+	if (unlikely(!s->txn && !http_create_txn(s)))
+		return 0;
+
+	conn = objt_conn(strm_sess(s)->origin);
+	cs = objt_cs(s->si[0].end);
+	if (conn && cs) {
+		si_rx_endp_more(&s->si[0]);
+		/* Make sure we're unsubscribed, the the new
+		 * mux will probably want to subscribe to
+		 * the underlying XPRT
+		 */
+		if (s->si[0].wait_event.events)
+			conn->mux->unsubscribe(cs, s->si[0].wait_event.events,
+					       &s->si[0].wait_event);
+		if (conn->mux->flags & MX_FL_NO_UPG)
+			return 0;
+		if (conn_upgrade_mux_fe(conn, cs, &s->req.buf, ist(""), PROTO_MODE_HTTP)  == -1)
+			return 0;
+
+		s->req.flags &= ~(CF_READ_PARTIAL|CF_AUTO_CONNECT);
+		s->req.total = 0;
+		s->flags |= SF_IGNORE;
+		if (strcmp(conn->mux->name, "H2") == 0) {
+			/* For HTTP/2, destroy the conn_stream, disable logging,
+			 * and abort the stream process. Thus it will be
+			 * silently destroyed. The new mux will create new
+			 * streams.
+			 */
+			cs_free(cs);
+			si_detach_endpoint(&s->si[0]);
+			s->logs.logwait = 0;
+			s->logs.level = 0;
+			channel_abort(&s->req);
+			channel_abort(&s->res);
+			s->req.analysers &= AN_REQ_FLT_END;
+			s->req.analyse_exp = TICK_ETERNITY;
+		}
+	}
+
+	return 1;
+}
+
+
+
 /* This macro is very specific to the function below. See the comments in
  * process_stream() below to understand the logic and the tests.
  */