MINOR: connection: Be prepared to handle conn-stream with no connection

The conn-stream will progressively replace the stream-interface. Thus, a
stream will have to allocate the backend conn-stream during its
creation. This means it will be possible to have a conn-stream with no
connection. To prepare this change, we test the conn-stream's connection
when we retrieve it.
diff --git a/include/haproxy/connection.h b/include/haproxy/connection.h
index 7cc852f..e0af4c3 100644
--- a/include/haproxy/connection.h
+++ b/include/haproxy/connection.h
@@ -246,10 +246,16 @@
 		c->xprt->shutw(c, c->xprt_ctx, 0);
 }
 
+/* Returns the conn from a cs. If cs is NULL, returns NULL */
+static inline struct connection *cs_conn(const struct conn_stream *cs)
+{
+	return cs ? cs->conn : NULL;
+}
+
 /* shut read */
 static inline void cs_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
 {
-	if (cs->flags & CS_FL_SHR)
+	if (!cs_conn(cs) || cs->flags & CS_FL_SHR)
 		return;
 
 	/* clean data-layer shutdown */
@@ -261,7 +267,7 @@
 /* shut write */
 static inline void cs_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
 {
-	if (cs->flags & CS_FL_SHW)
+	if (!cs_conn(cs) || cs->flags & CS_FL_SHW)
 		return;
 
 	/* clean data-layer shutdown */
@@ -387,29 +393,25 @@
 /* Release a conn_stream */
 static inline void cs_destroy(struct conn_stream *cs)
 {
-	if (cs->conn->mux)
-		cs->conn->mux->detach(cs);
-	else {
-		/* It's too early to have a mux, let's just destroy
-		 * the connection
-		 */
-		struct connection *conn = cs->conn;
+	if (cs_conn(cs)) {
+		if (cs->conn->mux)
+			cs->conn->mux->detach(cs);
+		else {
+			/* It's too early to have a mux, let's just destroy
+			 * the connection
+			 */
+			struct connection *conn = cs->conn;
 
-		conn_stop_tracking(conn);
-		conn_full_close(conn);
-		if (conn->destroy_cb)
-			conn->destroy_cb(conn);
-		conn_free(conn);
+			conn_stop_tracking(conn);
+			conn_full_close(conn);
+			if (conn->destroy_cb)
+				conn->destroy_cb(conn);
+			conn_free(conn);
+		}
 	}
 	cs_free(cs);
 }
 
-/* Returns the conn from a cs. If cs is NULL, returns NULL */
-static inline struct connection *cs_conn(const struct conn_stream *cs)
-{
-	return cs ? cs->conn : NULL;
-}
-
 /* Returns the source address of the connection or NULL if not set */
 static inline const struct sockaddr_storage *conn_src(struct connection *conn)
 {
diff --git a/include/haproxy/stream_interface.h b/include/haproxy/stream_interface.h
index c1c2b03..55c622b 100644
--- a/include/haproxy/stream_interface.h
+++ b/include/haproxy/stream_interface.h
@@ -183,9 +183,9 @@
 		appctx_free(appctx);
 	}
 	else if ((cs = objt_cs(si->end))) {
-		if (si->wait_event.events != 0)
+		if (cs_conn(cs) && si->wait_event.events != 0)
 			cs->conn->mux->unsubscribe(cs, si->wait_event.events,
-			    &si->wait_event);
+						   &si->wait_event);
 		cs_destroy(cs);
 	}
 	si_detach_endpoint(si);
@@ -481,7 +481,7 @@
 		return 0;
 
 	cs = objt_cs(si->end);
-	if (!cs || !cs->conn->mux)
+	if (!cs_conn(cs) || !cs->conn->mux)
 		return 0; // only conn_streams are supported
 
 	if (si->wait_event.events & SUB_RETRY_RECV)
@@ -578,7 +578,7 @@
 	else {
 		struct conn_stream *cs = objt_cs(si->end);
 
-		if (cs && cs->conn)
+		if (cs_conn(cs))
 			return conn_src(cs->conn);
 	}
 	return NULL;
@@ -598,7 +598,7 @@
 	else {
 		struct conn_stream *cs = objt_cs(si->end);
 
-		if (cs && cs->conn)
+		if (cs_conn(cs))
 			return conn_dst(cs->conn);
 	}
 	return NULL;
@@ -622,7 +622,7 @@
 	else {
 		struct conn_stream *cs = objt_cs(si->end);
 
-		if (cs && cs->conn)
+		if (cs_conn(cs))
 			src = conn_src(cs->conn);
 	}
 	if (!src)
@@ -653,7 +653,7 @@
 	else {
 		struct conn_stream *cs = objt_cs(si->end);
 
-		if (cs && cs->conn)
+		if (cs_conn(cs))
 			dst = conn_dst(cs->conn);
 	}
 	if (!dst)
diff --git a/src/backend.c b/src/backend.c
index fb63131..6a793a0 100644
--- a/src/backend.c
+++ b/src/backend.c
@@ -2220,8 +2220,6 @@
 void back_handle_st_cer(struct stream *s)
 {
 	struct stream_interface *si = &s->si[1];
-	struct conn_stream *cs = objt_cs(si->end);
-	struct connection *conn = cs_conn(cs);
 
 	DBG_TRACE_ENTER(STRM_EV_STRM_PROC|STRM_EV_SI_ST, s);
 
@@ -2230,6 +2228,8 @@
 
 	/* we probably have to release last stream from the server */
 	if (objt_server(s->target)) {
+		struct connection *conn = cs_conn(objt_cs(si->end));
+
 		health_adjust(__objt_server(s->target), HANA_STATUS_L4_ERR);
 
 		if (s->flags & SF_CURR_SESS) {
diff --git a/src/check.c b/src/check.c
index cb1be9b..97d340a 100644
--- a/src/check.c
+++ b/src/check.c
@@ -233,7 +233,9 @@
 
 
 	if (check->cs) {
-		chunk_appendf(&trace_buf, " - conn=%p(0x%08x)", check->cs->conn, check->cs->conn->flags);
+		struct connection *conn = cs_conn(check->cs);
+
+		chunk_appendf(&trace_buf, " - conn=%p(0x%08x)", conn, conn ? conn->flags : 0);
 		chunk_appendf(&trace_buf, " cs=%p(0x%08x)", check->cs, check->cs->flags);
 	}
 
@@ -791,7 +793,7 @@
 		retrieve_errno_from_socket(conn);
 
 	if (conn && !(conn->flags & CO_FL_ERROR) &&
-	    !(cs->flags & CS_FL_ERROR) && !expired)
+	    cs && !(cs->flags & CS_FL_ERROR) && !expired)
 		return;
 
 	TRACE_ENTER(CHK_EV_HCHK_END|CHK_EV_HCHK_ERR, check, 0, 0, (size_t[]){expired});
@@ -904,7 +906,7 @@
 		set_server_check_status(check, HCHK_STATUS_SOCKERR, err_msg);
 	}
 
-	if (!conn || !conn->ctrl) {
+	if (!cs || !conn || !conn->ctrl) {
 		/* error before any connection attempt (connection allocation error or no control layer) */
 		set_server_check_status(check, HCHK_STATUS_SOCKERR, err_msg);
 	}
@@ -1016,7 +1018,7 @@
  */
 static int wake_srv_chk(struct conn_stream *cs)
 {
-	struct connection *conn = cs->conn;
+	struct connection *conn;
 	struct check *check = cs->data;
 	struct email_alertq *q = container_of(check, typeof(*q), check);
 	int ret = 0;
@@ -1031,9 +1033,9 @@
 	ret = tcpcheck_main(check);
 
 	cs = check->cs;
-	conn = cs->conn;
+	conn = cs_conn(cs);
 
-	if (unlikely(conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)) {
+	if (unlikely(!conn || !cs || conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)) {
 		/* We may get error reports bypassing the I/O handlers, typically
 		 * the case when sending a pure TCP check which fails, then the I/O
 		 * handlers above are not called. This is completely handled by the
@@ -1053,7 +1055,7 @@
 		ret = -1;
 
 		if (check->wait_list.events)
-			cs->conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
+			conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
 
 		/* We may have been scheduled to run, and the
 		 * I/O handler expects to have a cs, so remove
@@ -1171,6 +1173,8 @@
 	TRACE_STATE("health-check complete or aborted", CHK_EV_TASK_WAKE|CHK_EV_HCHK_END, check);
 
 	check->current_step = NULL;
+	cs = check->cs;
+	conn = cs_conn(cs);
 
 	if (conn && conn->xprt) {
 		/* The check was aborted and the connection was not yet closed.
@@ -1182,8 +1186,8 @@
 	}
 
 	if (cs) {
-		if (check->wait_list.events)
-			cs->conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
+		if (conn && check->wait_list.events)
+			conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
 		/* We may have been scheduled to run, and the
 		 * I/O handler expects to have a cs, so remove
 		 * the tasklet
@@ -1352,7 +1356,10 @@
 	check_release_buf(check, &check->bi);
 	check_release_buf(check, &check->bo);
 	if (check->cs) {
-		ha_free(&check->cs->conn);
+		struct connection *conn = cs_conn(check->cs);
+
+		if (conn)
+			conn_free(conn);
 		cs_free(check->cs);
 		check->cs = NULL;
 	}
diff --git a/src/http_ana.c b/src/http_ana.c
index c2d9d9b..6cb248c 100644
--- a/src/http_ana.c
+++ b/src/http_ana.c
@@ -1325,10 +1325,7 @@
 	if (unlikely(htx_is_empty(htx) || htx->first == -1)) {
 		/* 1: have we encountered a read error ? */
 		if (rep->flags & CF_READ_ERROR) {
-			struct connection *conn = NULL;
-
-			if (objt_cs(s->si[1].end))
-				conn = __objt_cs(s->si[1].end)->conn;
+			struct connection *conn = cs_conn(objt_cs(s->si[1].end));
 
 			/* Perform a L7 retry because server refuses the early data. */
 			if ((si_b->flags & SI_FL_L7_RETRY) &&
@@ -5007,7 +5004,7 @@
         chunk_printf(&trash, "%08x:%s.%s[%04x:%04x]: ", s->uniq_id, s->be->id,
                      dir,
                      objt_conn(sess->origin) ? (unsigned short)__objt_conn(sess->origin)->handle.fd : -1,
-                     objt_cs(s->si[1].end) ? (unsigned short)__objt_cs(s->si[1].end)->conn->handle.fd : -1);
+                     cs_conn(objt_cs(s->si[1].end)) ? (unsigned short)(cs_conn(__objt_cs(s->si[1].end)))->handle.fd : -1);
 
         max = HTX_SL_P1_LEN(sl);
         UBOUND(max, trash.size - trash.data - 3);
@@ -5038,7 +5035,7 @@
         chunk_printf(&trash, "%08x:%s.%s[%04x:%04x]: ", s->uniq_id, s->be->id,
                      dir,
                      objt_conn(sess->origin) ? (unsigned short)__objt_conn(sess->origin)->handle.fd : -1,
-                     objt_cs(s->si[1].end) ? (unsigned short)__objt_cs(s->si[1].end)->conn->handle.fd : -1);
+                     cs_conn(objt_cs(s->si[1].end)) ? (unsigned short)(cs_conn(__objt_cs(s->si[1].end)))->handle.fd : -1);
 
         max = n.len;
         UBOUND(max, trash.size - trash.data - 3);
diff --git a/src/stream.c b/src/stream.c
index 628bdf5..98e34a9 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -460,7 +460,7 @@
 	si_set_state(&s->si[0], SI_ST_EST);
 	s->si[0].hcto = sess->fe->timeout.clientfin;
 
-	if (cs && cs->conn->mux) {
+	if (cs_conn(cs) && cs->conn->mux) {
 		if (cs->conn->mux->flags & MX_FL_CLEAN_ABRT)
 			s->si[0].flags |= SI_FL_CLEAN_ABRT;
 		if (cs->conn->mux->flags & MX_FL_HTX)
@@ -883,8 +883,7 @@
 static void back_establish(struct stream *s)
 {
 	struct stream_interface *si = &s->si[1];
-	struct conn_stream *srv_cs = objt_cs(si->end);
-	struct connection *conn = srv_cs ? srv_cs->conn : objt_conn(si->end);
+	struct connection *conn = cs_conn(objt_cs(si->end));
 	struct channel *req = &s->req;
 	struct channel *rep = &s->res;
 
@@ -930,7 +929,7 @@
 
 	si_rx_endp_more(si);
 	rep->flags |= CF_READ_ATTACHED; /* producer is now attached */
-	if (objt_cs(si->end)) {
+	if (conn) {
 		/* real connections have timeouts
 		 * if already defined, it means that a set-timeout rule has
 		 * been executed so do not overwrite them
@@ -2164,9 +2163,9 @@
 	if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
 	    req->to_forward &&
 	    (global.tune.options & GTUNE_USE_SPLICE) &&
-	    (objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->rcv_pipe &&
+	    (cs_conn(objt_cs(si_f->end)) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->rcv_pipe &&
 	     __objt_cs(si_f->end)->conn->mux && __objt_cs(si_f->end)->conn->mux->rcv_pipe) &&
-	    (objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->snd_pipe &&
+	    (cs_conn(objt_cs(si_b->end)) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->snd_pipe &&
 	     __objt_cs(si_b->end)->conn->mux && __objt_cs(si_b->end)->conn->mux->snd_pipe) &&
 	    (pipes_used < global.maxpipes) &&
 	    (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) ||
@@ -2357,9 +2356,9 @@
 	if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
 	    res->to_forward &&
 	    (global.tune.options & GTUNE_USE_SPLICE) &&
-	    (objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->snd_pipe &&
+	    (cs_conn(objt_cs(si_f->end)) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->snd_pipe &&
 	     __objt_cs(si_f->end)->conn->mux && __objt_cs(si_f->end)->conn->mux->snd_pipe) &&
-	    (objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->rcv_pipe &&
+	    (cs_conn(objt_cs(si_b->end)) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->rcv_pipe &&
 	     __objt_cs(si_b->end)->conn->mux && __objt_cs(si_b->end)->conn->mux->rcv_pipe) &&
 	    (pipes_used < global.maxpipes) &&
 	    (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) ||
@@ -2436,18 +2435,18 @@
 		if (si_b->state == SI_ST_CLO &&
 		    si_b->prev_state == SI_ST_EST) {
 			chunk_printf(&trash, "%08x:%s.srvcls[%04x:%04x]\n",
-				      s->uniq_id, s->be->id,
-			              objt_cs(si_f->end) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
-			              objt_cs(si_b->end) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
+				     s->uniq_id, s->be->id,
+				     cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
+				     cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
 			DISGUISE(write(1, trash.area, trash.data));
 		}
 
 		if (si_f->state == SI_ST_CLO &&
 		    si_f->prev_state == SI_ST_EST) {
 			chunk_printf(&trash, "%08x:%s.clicls[%04x:%04x]\n",
-				      s->uniq_id, s->be->id,
-			              objt_cs(si_f->end) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
-			              objt_cs(si_b->end) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
+				     s->uniq_id, s->be->id,
+				     cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
+				     cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
 			DISGUISE(write(1, trash.area, trash.data));
 		}
 	}
@@ -2513,9 +2512,9 @@
 	if (unlikely((global.mode & MODE_DEBUG) &&
 		     (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
 		chunk_printf(&trash, "%08x:%s.closed[%04x:%04x]\n",
-			      s->uniq_id, s->be->id,
-		              objt_cs(si_f->end) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
-		              objt_cs(si_b->end) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
+			     s->uniq_id, s->be->id,
+			     cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
+			     cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
 		DISGUISE(write(1, trash.area, trash.data));
 	}
 
@@ -3291,7 +3290,8 @@
 			                     TICKS_TO_MS(1000)) : "<NEVER>",
 			     strm->si[1].err_type, strm->si[1].wait_event.events);
 
-		if ((cs = objt_cs(strm->si[0].end)) != NULL) {
+		if (cs_conn(objt_cs(strm->si[0].end)) != NULL) {
+			cs = __objt_cs(strm->si[0].end);
 			conn = cs->conn;
 
 			chunk_appendf(&trash,
@@ -3327,7 +3327,8 @@
 			              (unsigned long long)tmpctx->t->cpu_time, (unsigned long long)tmpctx->t->lat_time);
 		}
 
-		if ((cs = objt_cs(strm->si[1].end)) != NULL) {
+		if (cs_conn(objt_cs(strm->si[1].end)) != NULL) {
+			cs = __objt_cs(strm->si[1].end);
 			conn = cs->conn;
 
 			chunk_appendf(&trash,
diff --git a/src/stream_interface.c b/src/stream_interface.c
index c32d566..e6254de 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -354,12 +354,11 @@
 
 		if (cs && cs->data_cb == &si_conn_cb) {
 			struct stream_interface *si = cs->data;
-			struct conn_stream *remote_cs = objt_cs(si_opposite(si)->end);
 			struct stream *strm = si_strm(si);
 
 			ret = make_proxy_line(trash.area, trash.size,
 					      objt_server(conn->target),
-					      remote_cs ? remote_cs->conn : NULL,
+					      cs_conn(objt_cs(si_opposite(si)->end)),
 					      strm);
 		}
 		else {
@@ -434,7 +433,7 @@
 
 	/* process consumer side */
 	if (channel_is_empty(oc)) {
-		struct connection *conn = objt_cs(si->end) ? __objt_cs(si->end)->conn : NULL;
+		struct connection *conn = cs_conn(objt_cs(si->end));
 
 		if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) &&
 		    (si->state == SI_ST_EST) && (!conn || !(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS))))
@@ -800,7 +799,7 @@
 	struct conn_stream *cs = objt_cs(si->end);
 	int ret = 0;
 
-	if (!cs)
+	if (!cs_conn(cs))
 		return t;
 
 	if (!(si->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si)))
@@ -927,7 +926,7 @@
 		return;
 
 	cs = objt_cs(si->end);
-	if (!cs || !cs->conn->mux)
+	if (!cs_conn(cs) || !cs->conn->mux)
 		return;
 
 	si_cs_send(cs);