MINOR: connection: Be prepared to handle conn-stream with no connection
The conn-stream will progressively replace the stream-interface. Thus, a
stream will have to allocate the backend conn-stream during its
creation. This means it will be possible to have a conn-stream with no
connection. To prepare this change, we test the conn-stream's connection
when we retrieve it.
diff --git a/include/haproxy/connection.h b/include/haproxy/connection.h
index 7cc852f..e0af4c3 100644
--- a/include/haproxy/connection.h
+++ b/include/haproxy/connection.h
@@ -246,10 +246,16 @@
c->xprt->shutw(c, c->xprt_ctx, 0);
}
+/* Returns the conn from a cs. If cs is NULL, returns NULL */
+static inline struct connection *cs_conn(const struct conn_stream *cs)
+{
+ return cs ? cs->conn : NULL;
+}
+
/* shut read */
static inline void cs_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
{
- if (cs->flags & CS_FL_SHR)
+ if (!cs_conn(cs) || cs->flags & CS_FL_SHR)
return;
/* clean data-layer shutdown */
@@ -261,7 +267,7 @@
/* shut write */
static inline void cs_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
{
- if (cs->flags & CS_FL_SHW)
+ if (!cs_conn(cs) || cs->flags & CS_FL_SHW)
return;
/* clean data-layer shutdown */
@@ -387,29 +393,25 @@
/* Release a conn_stream */
static inline void cs_destroy(struct conn_stream *cs)
{
- if (cs->conn->mux)
- cs->conn->mux->detach(cs);
- else {
- /* It's too early to have a mux, let's just destroy
- * the connection
- */
- struct connection *conn = cs->conn;
+ if (cs_conn(cs)) {
+ if (cs->conn->mux)
+ cs->conn->mux->detach(cs);
+ else {
+ /* It's too early to have a mux, let's just destroy
+ * the connection
+ */
+ struct connection *conn = cs->conn;
- conn_stop_tracking(conn);
- conn_full_close(conn);
- if (conn->destroy_cb)
- conn->destroy_cb(conn);
- conn_free(conn);
+ conn_stop_tracking(conn);
+ conn_full_close(conn);
+ if (conn->destroy_cb)
+ conn->destroy_cb(conn);
+ conn_free(conn);
+ }
}
cs_free(cs);
}
-/* Returns the conn from a cs. If cs is NULL, returns NULL */
-static inline struct connection *cs_conn(const struct conn_stream *cs)
-{
- return cs ? cs->conn : NULL;
-}
-
/* Returns the source address of the connection or NULL if not set */
static inline const struct sockaddr_storage *conn_src(struct connection *conn)
{
diff --git a/include/haproxy/stream_interface.h b/include/haproxy/stream_interface.h
index c1c2b03..55c622b 100644
--- a/include/haproxy/stream_interface.h
+++ b/include/haproxy/stream_interface.h
@@ -183,9 +183,9 @@
appctx_free(appctx);
}
else if ((cs = objt_cs(si->end))) {
- if (si->wait_event.events != 0)
+ if (cs_conn(cs) && si->wait_event.events != 0)
cs->conn->mux->unsubscribe(cs, si->wait_event.events,
- &si->wait_event);
+ &si->wait_event);
cs_destroy(cs);
}
si_detach_endpoint(si);
@@ -481,7 +481,7 @@
return 0;
cs = objt_cs(si->end);
- if (!cs || !cs->conn->mux)
+ if (!cs_conn(cs) || !cs->conn->mux)
return 0; // only conn_streams are supported
if (si->wait_event.events & SUB_RETRY_RECV)
@@ -578,7 +578,7 @@
else {
struct conn_stream *cs = objt_cs(si->end);
- if (cs && cs->conn)
+ if (cs_conn(cs))
return conn_src(cs->conn);
}
return NULL;
@@ -598,7 +598,7 @@
else {
struct conn_stream *cs = objt_cs(si->end);
- if (cs && cs->conn)
+ if (cs_conn(cs))
return conn_dst(cs->conn);
}
return NULL;
@@ -622,7 +622,7 @@
else {
struct conn_stream *cs = objt_cs(si->end);
- if (cs && cs->conn)
+ if (cs_conn(cs))
src = conn_src(cs->conn);
}
if (!src)
@@ -653,7 +653,7 @@
else {
struct conn_stream *cs = objt_cs(si->end);
- if (cs && cs->conn)
+ if (cs_conn(cs))
dst = conn_dst(cs->conn);
}
if (!dst)
diff --git a/src/backend.c b/src/backend.c
index fb63131..6a793a0 100644
--- a/src/backend.c
+++ b/src/backend.c
@@ -2220,8 +2220,6 @@
void back_handle_st_cer(struct stream *s)
{
struct stream_interface *si = &s->si[1];
- struct conn_stream *cs = objt_cs(si->end);
- struct connection *conn = cs_conn(cs);
DBG_TRACE_ENTER(STRM_EV_STRM_PROC|STRM_EV_SI_ST, s);
@@ -2230,6 +2228,8 @@
/* we probably have to release last stream from the server */
if (objt_server(s->target)) {
+ struct connection *conn = cs_conn(objt_cs(si->end));
+
health_adjust(__objt_server(s->target), HANA_STATUS_L4_ERR);
if (s->flags & SF_CURR_SESS) {
diff --git a/src/check.c b/src/check.c
index cb1be9b..97d340a 100644
--- a/src/check.c
+++ b/src/check.c
@@ -233,7 +233,9 @@
if (check->cs) {
- chunk_appendf(&trace_buf, " - conn=%p(0x%08x)", check->cs->conn, check->cs->conn->flags);
+ struct connection *conn = cs_conn(check->cs);
+
+ chunk_appendf(&trace_buf, " - conn=%p(0x%08x)", conn, conn ? conn->flags : 0);
chunk_appendf(&trace_buf, " cs=%p(0x%08x)", check->cs, check->cs->flags);
}
@@ -791,7 +793,7 @@
retrieve_errno_from_socket(conn);
if (conn && !(conn->flags & CO_FL_ERROR) &&
- !(cs->flags & CS_FL_ERROR) && !expired)
+ cs && !(cs->flags & CS_FL_ERROR) && !expired)
return;
TRACE_ENTER(CHK_EV_HCHK_END|CHK_EV_HCHK_ERR, check, 0, 0, (size_t[]){expired});
@@ -904,7 +906,7 @@
set_server_check_status(check, HCHK_STATUS_SOCKERR, err_msg);
}
- if (!conn || !conn->ctrl) {
+ if (!cs || !conn || !conn->ctrl) {
/* error before any connection attempt (connection allocation error or no control layer) */
set_server_check_status(check, HCHK_STATUS_SOCKERR, err_msg);
}
@@ -1016,7 +1018,7 @@
*/
static int wake_srv_chk(struct conn_stream *cs)
{
- struct connection *conn = cs->conn;
+ struct connection *conn;
struct check *check = cs->data;
struct email_alertq *q = container_of(check, typeof(*q), check);
int ret = 0;
@@ -1031,9 +1033,9 @@
ret = tcpcheck_main(check);
cs = check->cs;
- conn = cs->conn;
+ conn = cs_conn(cs);
- if (unlikely(conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)) {
+ if (unlikely(!conn || !cs || conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)) {
/* We may get error reports bypassing the I/O handlers, typically
* the case when sending a pure TCP check which fails, then the I/O
* handlers above are not called. This is completely handled by the
@@ -1053,7 +1055,7 @@
ret = -1;
if (check->wait_list.events)
- cs->conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
+ conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
/* We may have been scheduled to run, and the
* I/O handler expects to have a cs, so remove
@@ -1171,6 +1173,8 @@
TRACE_STATE("health-check complete or aborted", CHK_EV_TASK_WAKE|CHK_EV_HCHK_END, check);
check->current_step = NULL;
+ cs = check->cs;
+ conn = cs_conn(cs);
if (conn && conn->xprt) {
/* The check was aborted and the connection was not yet closed.
@@ -1182,8 +1186,8 @@
}
if (cs) {
- if (check->wait_list.events)
- cs->conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
+ if (conn && check->wait_list.events)
+ conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
/* We may have been scheduled to run, and the
* I/O handler expects to have a cs, so remove
* the tasklet
@@ -1352,7 +1356,10 @@
check_release_buf(check, &check->bi);
check_release_buf(check, &check->bo);
if (check->cs) {
- ha_free(&check->cs->conn);
+ struct connection *conn = cs_conn(check->cs);
+
+ if (conn)
+ conn_free(conn);
cs_free(check->cs);
check->cs = NULL;
}
diff --git a/src/http_ana.c b/src/http_ana.c
index c2d9d9b..6cb248c 100644
--- a/src/http_ana.c
+++ b/src/http_ana.c
@@ -1325,10 +1325,7 @@
if (unlikely(htx_is_empty(htx) || htx->first == -1)) {
/* 1: have we encountered a read error ? */
if (rep->flags & CF_READ_ERROR) {
- struct connection *conn = NULL;
-
- if (objt_cs(s->si[1].end))
- conn = __objt_cs(s->si[1].end)->conn;
+ struct connection *conn = cs_conn(objt_cs(s->si[1].end));
/* Perform a L7 retry because server refuses the early data. */
if ((si_b->flags & SI_FL_L7_RETRY) &&
@@ -5007,7 +5004,7 @@
chunk_printf(&trash, "%08x:%s.%s[%04x:%04x]: ", s->uniq_id, s->be->id,
dir,
objt_conn(sess->origin) ? (unsigned short)__objt_conn(sess->origin)->handle.fd : -1,
- objt_cs(s->si[1].end) ? (unsigned short)__objt_cs(s->si[1].end)->conn->handle.fd : -1);
+ cs_conn(objt_cs(s->si[1].end)) ? (unsigned short)(cs_conn(__objt_cs(s->si[1].end)))->handle.fd : -1);
max = HTX_SL_P1_LEN(sl);
UBOUND(max, trash.size - trash.data - 3);
@@ -5038,7 +5035,7 @@
chunk_printf(&trash, "%08x:%s.%s[%04x:%04x]: ", s->uniq_id, s->be->id,
dir,
objt_conn(sess->origin) ? (unsigned short)__objt_conn(sess->origin)->handle.fd : -1,
- objt_cs(s->si[1].end) ? (unsigned short)__objt_cs(s->si[1].end)->conn->handle.fd : -1);
+ cs_conn(objt_cs(s->si[1].end)) ? (unsigned short)(cs_conn(__objt_cs(s->si[1].end)))->handle.fd : -1);
max = n.len;
UBOUND(max, trash.size - trash.data - 3);
diff --git a/src/stream.c b/src/stream.c
index 628bdf5..98e34a9 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -460,7 +460,7 @@
si_set_state(&s->si[0], SI_ST_EST);
s->si[0].hcto = sess->fe->timeout.clientfin;
- if (cs && cs->conn->mux) {
+ if (cs_conn(cs) && cs->conn->mux) {
if (cs->conn->mux->flags & MX_FL_CLEAN_ABRT)
s->si[0].flags |= SI_FL_CLEAN_ABRT;
if (cs->conn->mux->flags & MX_FL_HTX)
@@ -883,8 +883,7 @@
static void back_establish(struct stream *s)
{
struct stream_interface *si = &s->si[1];
- struct conn_stream *srv_cs = objt_cs(si->end);
- struct connection *conn = srv_cs ? srv_cs->conn : objt_conn(si->end);
+ struct connection *conn = cs_conn(objt_cs(si->end));
struct channel *req = &s->req;
struct channel *rep = &s->res;
@@ -930,7 +929,7 @@
si_rx_endp_more(si);
rep->flags |= CF_READ_ATTACHED; /* producer is now attached */
- if (objt_cs(si->end)) {
+ if (conn) {
/* real connections have timeouts
* if already defined, it means that a set-timeout rule has
* been executed so do not overwrite them
@@ -2164,9 +2163,9 @@
if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
req->to_forward &&
(global.tune.options & GTUNE_USE_SPLICE) &&
- (objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->rcv_pipe &&
+ (cs_conn(objt_cs(si_f->end)) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->rcv_pipe &&
__objt_cs(si_f->end)->conn->mux && __objt_cs(si_f->end)->conn->mux->rcv_pipe) &&
- (objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->snd_pipe &&
+ (cs_conn(objt_cs(si_b->end)) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->snd_pipe &&
__objt_cs(si_b->end)->conn->mux && __objt_cs(si_b->end)->conn->mux->snd_pipe) &&
(pipes_used < global.maxpipes) &&
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) ||
@@ -2357,9 +2356,9 @@
if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
res->to_forward &&
(global.tune.options & GTUNE_USE_SPLICE) &&
- (objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->snd_pipe &&
+ (cs_conn(objt_cs(si_f->end)) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->snd_pipe &&
__objt_cs(si_f->end)->conn->mux && __objt_cs(si_f->end)->conn->mux->snd_pipe) &&
- (objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->rcv_pipe &&
+ (cs_conn(objt_cs(si_b->end)) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->rcv_pipe &&
__objt_cs(si_b->end)->conn->mux && __objt_cs(si_b->end)->conn->mux->rcv_pipe) &&
(pipes_used < global.maxpipes) &&
(((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) ||
@@ -2436,18 +2435,18 @@
if (si_b->state == SI_ST_CLO &&
si_b->prev_state == SI_ST_EST) {
chunk_printf(&trash, "%08x:%s.srvcls[%04x:%04x]\n",
- s->uniq_id, s->be->id,
- objt_cs(si_f->end) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
- objt_cs(si_b->end) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
+ s->uniq_id, s->be->id,
+ cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
+ cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
DISGUISE(write(1, trash.area, trash.data));
}
if (si_f->state == SI_ST_CLO &&
si_f->prev_state == SI_ST_EST) {
chunk_printf(&trash, "%08x:%s.clicls[%04x:%04x]\n",
- s->uniq_id, s->be->id,
- objt_cs(si_f->end) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
- objt_cs(si_b->end) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
+ s->uniq_id, s->be->id,
+ cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
+ cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
DISGUISE(write(1, trash.area, trash.data));
}
}
@@ -2513,9 +2512,9 @@
if (unlikely((global.mode & MODE_DEBUG) &&
(!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
chunk_printf(&trash, "%08x:%s.closed[%04x:%04x]\n",
- s->uniq_id, s->be->id,
- objt_cs(si_f->end) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
- objt_cs(si_b->end) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
+ s->uniq_id, s->be->id,
+ cs_conn(objt_cs(si_f->end)) ? (unsigned short)__objt_cs(si_f->end)->conn->handle.fd : -1,
+ cs_conn(objt_cs(si_b->end)) ? (unsigned short)__objt_cs(si_b->end)->conn->handle.fd : -1);
DISGUISE(write(1, trash.area, trash.data));
}
@@ -3291,7 +3290,8 @@
TICKS_TO_MS(1000)) : "<NEVER>",
strm->si[1].err_type, strm->si[1].wait_event.events);
- if ((cs = objt_cs(strm->si[0].end)) != NULL) {
+ if (cs_conn(objt_cs(strm->si[0].end)) != NULL) {
+ cs = __objt_cs(strm->si[0].end);
conn = cs->conn;
chunk_appendf(&trash,
@@ -3327,7 +3327,8 @@
(unsigned long long)tmpctx->t->cpu_time, (unsigned long long)tmpctx->t->lat_time);
}
- if ((cs = objt_cs(strm->si[1].end)) != NULL) {
+ if (cs_conn(objt_cs(strm->si[1].end)) != NULL) {
+ cs = __objt_cs(strm->si[1].end);
conn = cs->conn;
chunk_appendf(&trash,
diff --git a/src/stream_interface.c b/src/stream_interface.c
index c32d566..e6254de 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -354,12 +354,11 @@
if (cs && cs->data_cb == &si_conn_cb) {
struct stream_interface *si = cs->data;
- struct conn_stream *remote_cs = objt_cs(si_opposite(si)->end);
struct stream *strm = si_strm(si);
ret = make_proxy_line(trash.area, trash.size,
objt_server(conn->target),
- remote_cs ? remote_cs->conn : NULL,
+ cs_conn(objt_cs(si_opposite(si)->end)),
strm);
}
else {
@@ -434,7 +433,7 @@
/* process consumer side */
if (channel_is_empty(oc)) {
- struct connection *conn = objt_cs(si->end) ? __objt_cs(si->end)->conn : NULL;
+ struct connection *conn = cs_conn(objt_cs(si->end));
if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) &&
(si->state == SI_ST_EST) && (!conn || !(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS))))
@@ -800,7 +799,7 @@
struct conn_stream *cs = objt_cs(si->end);
int ret = 0;
- if (!cs)
+ if (!cs_conn(cs))
return t;
if (!(si->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si)))
@@ -927,7 +926,7 @@
return;
cs = objt_cs(si->end);
- if (!cs || !cs->conn->mux)
+ if (!cs_conn(cs) || !cs->conn->mux)
return;
si_cs_send(cs);