MINOR: stream: Handle stream HTTP upgrade in a dedicated function
The code responsible to perform an HTTP upgrade from a TCP stream is moved
in a dedicated function, stream_set_http_mode().
The stream_set_backend() function is slightly updated, especially to
correctly set the request analysers.
diff --git a/include/haproxy/stream.h b/include/haproxy/stream.h
index bfd92ec..6a49f14 100644
--- a/include/haproxy/stream.h
+++ b/include/haproxy/stream.h
@@ -60,6 +60,7 @@
struct stream *stream_new(struct session *sess, enum obj_type *origin, struct buffer *input);
int stream_create_from_cs(struct conn_stream *cs, struct buffer *input);
int stream_upgrade_from_cs(struct conn_stream *cs, struct buffer *input);
+int stream_set_http_mode(struct stream *s);
/* kill a stream and set the termination flags to <why> (one of SF_ERR_*) */
void stream_shutdown(struct stream *stream, int why);
diff --git a/src/proxy.c b/src/proxy.c
index cd7ba51..fddb179 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -2116,6 +2116,8 @@
*/
int stream_set_backend(struct stream *s, struct proxy *be)
{
+ unsigned int req_ana;
+
if (s->flags & SF_BE_ASSIGNED)
return 1;
@@ -2140,69 +2142,41 @@
* be more reliable to store the list of analysers that have been run,
* but what we do here is OK for now.
*/
- s->req.analysers |= be->be_req_ana & ~(strm_li(s) ? strm_li(s)->analysers : 0);
+ req_ana = be->be_req_ana;
+ if (!(strm_fe(s)->options & PR_O_WREQ_BODY) && be->options & PR_O_WREQ_BODY) {
+ /* The backend request to parse a request body while it was not
+ * performed on the frontend, so add the corresponding analyser
+ */
+ req_ana |= AN_REQ_HTTP_BODY;
+ }
+ if (IS_HTX_STRM(s) && strm_fe(s)->mode != PR_MODE_HTTP) {
+ /* The stream was already upgraded to HTTP, so remove analysers
+ * set during the upgrade
+ */
+ req_ana &= ~(AN_REQ_WAIT_HTTP|AN_REQ_HTTP_PROCESS_FE);
+ }
+ s->req.analysers |= req_ana & ~(strm_li(s) ? strm_li(s)->analysers : 0);
- /* If the target backend requires HTTP processing, we have to allocate
- * the HTTP transaction if we did not have one.
- */
- if (unlikely(!s->txn && be->http_needed && !http_create_txn(s)))
- return 0;
-
- if (s->txn) {
+ if (!IS_HTX_STRM(s) && be->mode == PR_MODE_HTTP) {
/* If we chain a TCP frontend to an HTX backend, we must upgrade
* the client mux */
- if (!IS_HTX_STRM(s) && be->mode == PR_MODE_HTTP) {
- struct connection *conn = objt_conn(strm_sess(s)->origin);
- struct conn_stream *cs = objt_cs(s->si[0].end);
-
- if (conn && cs) {
- si_rx_endp_more(&s->si[0]);
- /* Make sure we're unsubscribed, the the new
- * mux will probably want to subscribe to
- * the underlying XPRT
- */
- if (s->si[0].wait_event.events)
- conn->mux->unsubscribe(cs, s->si[0].wait_event.events,
- &s->si[0].wait_event);
- if (conn->mux->flags & MX_FL_NO_UPG)
- return 0;
- if (conn_upgrade_mux_fe(conn, cs, &s->req.buf, ist(""), PROTO_MODE_HTTP) == -1)
- return 0;
-
- s->req.flags &= ~(CF_READ_PARTIAL|CF_AUTO_CONNECT);
- s->req.total = 0;
- s->flags |= SF_IGNORE;
- if (strcmp(conn->mux->name, "H2") == 0) {
- /* For HTTP/2, destroy the conn_stream,
- * disable logging, and abort the stream
- * process. Thus it will be silently
- * destroyed. The new mux will create
- * new streams.
- */
- cs_free(cs);
- si_detach_endpoint(&s->si[0]);
- s->logs.logwait = 0;
- s->logs.level = 0;
- channel_abort(&s->req);
- channel_abort(&s->res);
- s->req.analysers &= AN_REQ_FLT_END;
- s->req.analyse_exp = TICK_ETERNITY;
- return 1;
- }
- }
- }
- else if (IS_HTX_STRM(s) && be->mode != PR_MODE_HTTP) {
- /* If a TCP backend is assgiend to an HTX stream, return
- * an error. It may happens for a new stream on a
- * previously upgraded connections. */
- if (!(s->flags & SF_ERR_MASK))
- s->flags |= SF_ERR_INTERNAL;
+ if (!stream_set_http_mode(s))
return 0;
- }
-
- /* we may request to parse a request body */
- if (be->options & PR_O_WREQ_BODY)
- s->req.analysers |= AN_REQ_HTTP_BODY;
+ }
+ else if (IS_HTX_STRM(s) && be->mode != PR_MODE_HTTP) {
+ /* If a TCP backend is assgiend to an HTX stream, return an
+ * error. It may happens for a new stream on a previously
+ * upgraded connections. */
+ if (!(s->flags & SF_ERR_MASK))
+ s->flags |= SF_ERR_INTERNAL;
+ return 0;
+ }
+ else {
+ /* If the target backend requires HTTP processing, we have to allocate
+ * the HTTP transaction if we did not have one.
+ */
+ if (unlikely(!s->txn && be->http_needed && !http_create_txn(s)))
+ return 0;
}
s->flags |= SF_BE_ASSIGNED;
diff --git a/src/stream.c b/src/stream.c
index 8b8e26d..124345f 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -1478,6 +1478,66 @@
return 1;
}
+/* Set the stream to HTTP mode, if necessary. The minimal request HTTP analysers
+ * are set and the client mux is upgraded. It returns 1 if the stream processing
+ * may continue or 0 if it should be stopped. It happens on error or if the
+ * upgrade required a new stream.
+ */
+int stream_set_http_mode(struct stream *s)
+{
+ struct connection *conn;
+ struct conn_stream *cs;
+
+ /* Already an HTTP stream */
+ if (IS_HTX_STRM(s))
+ return 1;
+
+ s->req.analysers |= AN_REQ_WAIT_HTTP|AN_REQ_HTTP_PROCESS_FE;
+
+ if (unlikely(!s->txn && !http_create_txn(s)))
+ return 0;
+
+ conn = objt_conn(strm_sess(s)->origin);
+ cs = objt_cs(s->si[0].end);
+ if (conn && cs) {
+ si_rx_endp_more(&s->si[0]);
+ /* Make sure we're unsubscribed, the the new
+ * mux will probably want to subscribe to
+ * the underlying XPRT
+ */
+ if (s->si[0].wait_event.events)
+ conn->mux->unsubscribe(cs, s->si[0].wait_event.events,
+ &s->si[0].wait_event);
+ if (conn->mux->flags & MX_FL_NO_UPG)
+ return 0;
+ if (conn_upgrade_mux_fe(conn, cs, &s->req.buf, ist(""), PROTO_MODE_HTTP) == -1)
+ return 0;
+
+ s->req.flags &= ~(CF_READ_PARTIAL|CF_AUTO_CONNECT);
+ s->req.total = 0;
+ s->flags |= SF_IGNORE;
+ if (strcmp(conn->mux->name, "H2") == 0) {
+ /* For HTTP/2, destroy the conn_stream, disable logging,
+ * and abort the stream process. Thus it will be
+ * silently destroyed. The new mux will create new
+ * streams.
+ */
+ cs_free(cs);
+ si_detach_endpoint(&s->si[0]);
+ s->logs.logwait = 0;
+ s->logs.level = 0;
+ channel_abort(&s->req);
+ channel_abort(&s->res);
+ s->req.analysers &= AN_REQ_FLT_END;
+ s->req.analyse_exp = TICK_ETERNITY;
+ }
+ }
+
+ return 1;
+}
+
+
+
/* This macro is very specific to the function below. See the comments in
* process_stream() below to understand the logic and the tests.
*/