MAJOR: connections: Detach connections from streams.
Do not destroy the connection when we're about to destroy a stream. Doing so
prevented us from doing keepalive on server connections when the client is
using HTTP/2, as a new stream is created for each request.
Instead, the session is now responsible for destroying connections.
When reusing connections, the attach() mux method is now used to create a new
conn_stream.
diff --git a/src/backend.c b/src/backend.c
index d402028..0b45d85 100644
--- a/src/backend.c
+++ b/src/backend.c
@@ -583,7 +583,7 @@
srv = NULL;
s->target = NULL;
- conn = cs_conn(objt_cs(s->si[1].end));
+ conn = s->sess->srv_conn;
if (conn &&
(conn->flags & CO_FL_CONNECTED) &&
@@ -1056,28 +1056,22 @@
{
struct connection *cli_conn = NULL;
struct connection *srv_conn;
+ struct connection *old_conn;
struct conn_stream *srv_cs;
- struct conn_stream *old_cs;
struct server *srv;
int reuse = 0;
int err;
+
srv = objt_server(s->target);
- srv_cs = objt_cs(s->si[1].end);
- srv_conn = cs_conn(srv_cs);
+ old_conn = srv_conn = s->sess->srv_conn;
if (srv_conn)
- reuse = s->target == srv_conn->target;
+ reuse = (s->target == srv_conn->target) &&
+ (srv_conn->mux->avail_streams(srv_conn) > 0) &&
+ conn_xprt_ready(srv_conn);
if (srv && !reuse) {
- old_cs = srv_cs;
- if (old_cs) {
- srv_conn = NULL;
- srv_cs->data = NULL;
- si_detach_endpoint(&s->si[1]);
- /* note: if the connection was in a server's idle
- * queue, it doesn't get dequeued.
- */
- }
+ srv_conn = NULL;
/* Below we pick connections from the safe or idle lists based
* on the strategy, the fact that this is a first or second
@@ -1114,29 +1108,8 @@
* other owner's. That way it may remain alive for others to
* pick.
*/
- if (srv_conn) {
- LIST_DEL(&srv_conn->list);
- LIST_INIT(&srv_conn->list);
-
- /* XXX cognet: this assumes only 1 conn_stream per
- * connection, has to be revisited later
- */
- srv_cs = srv_conn->mux_ctx;
-
- if (srv_cs->data) {
- si_detach_endpoint(srv_cs->data);
- if (old_cs && !(old_cs->conn->flags & CO_FL_PRIVATE)) {
- si_attach_cs(srv_cs->data, old_cs);
- si_idle_cs(srv_cs->data, NULL);
- }
- }
- si_attach_cs(&s->si[1], srv_cs);
+ if (srv_conn)
reuse = 1;
- }
-
- /* we may have to release our connection if we couldn't swap it */
- if (old_cs && !old_cs->data)
- cs_destroy(old_cs);
}
if (reuse) {
@@ -1155,13 +1128,74 @@
}
}
+ /* We're about to use another connection, let the mux know we're
+ * done with this one
+ */
+ if (old_conn != srv_conn) {
+ int did_switch = 0;
+
+ if (srv_conn && reuse) {
+ struct session *sess;
+ int count = 0;
+
+ /*
+ * If we're attempting to reuse a connection, and
+ * the new connection has only one user, and there
+ * are no more streams available, attempt to give
+ * it our old connection
+ */
+ list_for_each_entry(sess, &srv_conn->session_list,
+ conn_list) {
+ count++;
+ if (count > 1)
+ break;
+ }
+ if (count == 1) {
+ sess = LIST_ELEM(srv_conn->session_list.n,
+ struct session *, conn_list);
+ LIST_DEL(&sess->conn_list);
+ if (old_conn &&
+ !(old_conn->flags & CO_FL_PRIVATE) &&
+ (old_conn->mux->avail_streams(old_conn) > 0) &&
+ (srv_conn->mux->avail_streams(srv_conn) == 1)) {
+ LIST_ADDQ(&old_conn->session_list, &sess->conn_list);
+ sess->srv_conn = old_conn;
+ } else {
+ LIST_INIT(&sess->conn_list);
+ sess->srv_conn = NULL;
+ }
+ did_switch = 1;
+ }
+
+ }
+ /*
+ * We didn't manage to hand over our old connection, destroy it
+ */
+ if (old_conn && !did_switch) {
+ old_conn->owner = NULL;
+ old_conn->mux->destroy(old_conn);
+ old_conn = NULL;
+ }
+ }
+
if (!reuse) {
srv_cs = si_alloc_cs(&s->si[1], NULL);
srv_conn = cs_conn(srv_cs);
} else {
- /* reusing our connection, take it out of the idle list */
- LIST_DEL(&srv_conn->list);
- LIST_INIT(&srv_conn->list);
+ if (srv_conn->mux->avail_streams(srv_conn) == 1) {
+ /* We're taking the last available stream, remove it from the list */
+ LIST_DEL(&srv_conn->list);
+ LIST_INIT(&srv_conn->list);
+ }
+ srv_cs = srv_conn->mux->attach(srv_conn);
+ if (srv_cs)
+ si_attach_cs(&s->si[1], srv_cs);
+ }
+ if (srv_conn && old_conn != srv_conn) {
+ srv_conn->owner = s->sess;
+ s->sess->srv_conn = srv_conn;
+ LIST_DEL(&s->sess->conn_list);
+ LIST_ADDQ(&srv_conn->session_list, &s->sess->conn_list);
}
if (!srv_cs)
@@ -1203,15 +1237,10 @@
conn_get_to_addr(cli_conn);
}
- si_attach_cs(&s->si[1], srv_cs);
-
assign_tproxy_address(s);
}
- else {
- /* the connection is being reused, just re-attach it */
- si_attach_cs(&s->si[1], srv_cs);
+ else
s->flags |= SF_SRV_REUSED;
- }
/* flag for logging source ip/port */
if (strm_fe(s)->options2 & PR_O2_SRC_ADDR)
diff --git a/src/mux_h2.c b/src/mux_h2.c
index 87b5cb1..9e4f801 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -62,7 +62,6 @@
#define H2_CF_WAIT_FOR_HS 0x00004000 // We did check that at least a stream was waiting for handshake
#define H2_CF_IS_BACK 0x00008000 // this is an outgoing connection
-
/* H2 connection state, in h2c->st0 */
enum h2_cs {
H2_CS_PREFACE, // init done, waiting for connection preface
@@ -2264,7 +2263,7 @@
if (!b_data(buf)) {
h2_release_buf(h2c, &h2c->dbuf);
- return 0;
+ return conn_xprt_read0_pending(conn);
}
if (b_data(buf) == buf->size)
@@ -2282,7 +2281,7 @@
int sent = 0;
if (conn->flags & CO_FL_ERROR)
- return 0;
+ return 1;
if (conn->flags & (CO_FL_HANDSHAKE|CO_FL_WAIT_L4_CONN|CO_FL_WAIT_L6_CONN)) {
diff --git a/src/mux_pt.c b/src/mux_pt.c
index a0f0397..a974ec3 100644
--- a/src/mux_pt.c
+++ b/src/mux_pt.c
@@ -30,6 +30,9 @@
LIST_DEL(&conn->list);
conn_stop_tracking(conn);
conn_full_close(conn);
+ tasklet_free(ctx->wait_event.task);
+ conn->mux = NULL;
+ conn->mux_ctx = NULL;
if (conn->destroy_cb)
conn->destroy_cb(conn);
/* We don't bother unsubscribing here, as we're about to destroy
@@ -45,7 +48,7 @@
struct mux_pt_ctx *ctx = tctx;
conn_sock_drain(ctx->conn);
- if (ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH))
+ if (ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))
mux_pt_destroy(ctx);
else
ctx->conn->xprt->subscribe(ctx->conn, SUB_CAN_RECV,
@@ -135,6 +138,16 @@
*/
static struct conn_stream *mux_pt_attach(struct connection *conn)
{
+ struct conn_stream *cs;
+ struct mux_pt_ctx *ctx = conn->mux_ctx;
+
+ cs = cs_new(conn);
+ if (!cs)
+ goto fail;
+
+ ctx->cs = cs;
+ return (cs);
+fail:
return NULL;
}
@@ -149,10 +162,13 @@
return cs;
}
-/* Destroy the mux and the associated connection */
+/* Destroy the mux and the associated connection, if no longer used */
static void mux_pt_destroy_meth(struct connection *conn)
{
- mux_pt_destroy(conn->mux_ctx);
+ struct mux_pt_ctx *ctx = conn->mux_ctx;
+
+ if (!(ctx->cs))
+ mux_pt_destroy(ctx);
}
/*
@@ -164,9 +180,13 @@
struct mux_pt_ctx *ctx = cs->conn->mux_ctx;
/* Subscribe, to know if we got disconnected */
- conn->xprt->subscribe(conn, SUB_CAN_RECV, &ctx->wait_event);
- ctx->cs = NULL;
- mux_pt_destroy(ctx);
+ if (conn->owner != NULL &&
+ !(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))) {
+ ctx->cs = NULL;
+ conn->xprt->subscribe(conn, SUB_CAN_RECV, &ctx->wait_event);
+ } else
+ /* There's no session attached to that connection, destroy it */
+ mux_pt_destroy(ctx);
}
static int mux_pt_avail_streams(struct connection *conn)
@@ -184,6 +204,11 @@
cs->conn->xprt->shutr(cs->conn, (mode == CS_SHR_DRAIN));
if (cs->flags & CS_FL_SHW)
conn_full_close(cs->conn);
+ /* Maybe we've been put in the list of available idle connections,
+ * get out of here
+ */
+ LIST_DEL(&cs->conn->list);
+ LIST_INIT(&cs->conn->list);
}
static void mux_pt_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
@@ -196,6 +221,11 @@
conn_sock_shutw(cs->conn, (mode == CS_SHW_NORMAL));
else
conn_full_close(cs->conn);
+ /* Maybe we've been put in the list of available idle connections,
+ * get out of here
+ */
+ LIST_DEL(&cs->conn->list);
+ LIST_INIT(&cs->conn->list);
}
/*
diff --git a/src/proto_http.c b/src/proto_http.c
index 0ce03b3..c93afa9 100644
--- a/src/proto_http.c
+++ b/src/proto_http.c
@@ -3716,20 +3716,18 @@
* flags. We also need a more accurate method for computing per-request
* data.
*/
- /*
- * XXX cognet: This is probably wrong, this is killing a whole
- * connection, in the new world order, we probably want to just kill
- * the stream, this is to be revisited the day we handle multiple
- * streams in one server connection.
- */
cs = objt_cs(s->si[1].end);
srv_conn = cs_conn(cs);
/* unless we're doing keep-alive, we want to quickly close the connection
* to the server.
+ * XXX cognet: If the connection doesn't have an owner then it may not
+ * be referenced anywhere, just kill it now, even if it could be reused.
+ * To be revisited later when we handle connection
+ * pools properly.
*/
if (((s->txn->flags & TX_CON_WANT_MSK) != TX_CON_WANT_KAL) ||
- !si_conn_ready(&s->si[1])) {
+ !si_conn_ready(&s->si[1]) || !srv_conn->owner) {
s->si[1].flags |= SI_FL_NOLINGER | SI_FL_NOHALF;
si_shutr(&s->si[1]);
si_shutw(&s->si[1]);
@@ -3805,14 +3803,15 @@
s->target = NULL;
+
- /* only release our endpoint if we don't intend to reuse the
- * connection.
+ /* If we're doing keepalive, first call the mux detach() method
+ * to let it know we want to detach without freeing the connection.
+ * We then can call si_release_endpoint() to destroy the conn_stream
*/
if (((s->txn->flags & TX_CON_WANT_MSK) != TX_CON_WANT_KAL) ||
- !si_conn_ready(&s->si[1])) {
- si_release_endpoint(&s->si[1]);
+ !si_conn_ready(&s->si[1]) || !srv_conn->owner)
srv_conn = NULL;
- }
+ si_release_endpoint(&s->si[1]);
s->si[1].state = s->si[1].prev_state = SI_ST_INI;
s->si[1].err_type = SI_ET_NONE;
@@ -3867,18 +3866,18 @@
/* we're in keep-alive with an idle connection, monitor it if not already done */
if (srv_conn && LIST_ISEMPTY(&srv_conn->list)) {
srv = objt_server(srv_conn->target);
- if (!srv)
- si_idle_cs(&s->si[1], NULL);
- else if (srv_conn->flags & CO_FL_PRIVATE)
- si_idle_cs(&s->si[1], (srv->priv_conns ? &srv->priv_conns[tid] : NULL));
- else if (prev_flags & TX_NOT_FIRST)
- /* note: we check the request, not the connection, but
- * this is valid for strategies SAFE and AGGR, and in
- * case of ALWS, we don't care anyway.
- */
- si_idle_cs(&s->si[1], (srv->safe_conns ? &srv->safe_conns[tid] : NULL));
- else
- si_idle_cs(&s->si[1], (srv->idle_conns ? &srv->idle_conns[tid] : NULL));
+ if (srv) {
+ if (srv_conn->flags & CO_FL_PRIVATE)
+ LIST_ADD(&srv->priv_conns[tid], &srv_conn->list);
+ else if (prev_flags & TX_NOT_FIRST)
+ /* note: we check the request, not the connection, but
+ * this is valid for strategies SAFE and AGGR, and in
+ * case of ALWS, we don't care anyway.
+ */
+ LIST_ADD(&srv->safe_conns[tid], &srv_conn->list);
+ else
+ LIST_ADD(&srv->idle_conns[tid], &srv_conn->list);
+ }
}
s->req.analysers = strm_li(s) ? strm_li(s)->analysers : 0;
s->res.analysers = 0;
diff --git a/src/session.c b/src/session.c
index d8c8d36..7d21a6a 100644
--- a/src/session.c
+++ b/src/session.c
@@ -67,11 +67,19 @@
void session_free(struct session *sess)
{
+ struct connection *conn;
+
HA_ATOMIC_SUB(&sess->fe->feconn, 1);
if (sess->listener)
listener_release(sess->listener);
session_store_counters(sess);
vars_prune_per_sess(&sess->vars);
+ conn = objt_conn(sess->origin);
+ if (conn != NULL && conn->mux)
+ conn->mux->destroy(conn);
+ conn = sess->srv_conn;
+ if (conn != NULL && conn->mux)
+ conn->mux->destroy(conn);
pool_free(pool_head_session, sess);
HA_ATOMIC_SUB(&jobs, 1);
}
@@ -377,6 +385,7 @@
conn_stop_tracking(conn);
conn_full_close(conn);
conn_free(conn);
+ sess->origin = NULL;
task_delete(task);
task_free(task);
diff --git a/src/stream_interface.c b/src/stream_interface.c
index f88b432..22c329f 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -53,7 +53,6 @@
static void stream_int_chk_snd_applet(struct stream_interface *si);
int si_cs_recv(struct conn_stream *cs);
static int si_cs_process(struct conn_stream *cs);
-static int si_idle_conn_wake_cb(struct conn_stream *cs);
int si_cs_send(struct conn_stream *cs);
/* stream-interface operations for embedded tasks */
@@ -85,11 +84,6 @@
.name = "STRM",
};
-struct data_cb si_idle_conn_cb = {
- .wake = si_idle_conn_wake_cb,
- .name = "IDLE",
-};
-
/*
* This function only has to be called once after a wakeup event in case of
* suspected timeout. It controls the stream interface timeouts and sets
@@ -410,29 +404,6 @@
return 0;
}
-
-/* Callback to be used by connection I/O handlers when some activity is detected
- * on an idle server connection. Its main purpose is to kill the connection once
- * a close was detected on it. It returns 0 if it did nothing serious, or -1 if
- * it killed the connection.
- */
-static int si_idle_conn_wake_cb(struct conn_stream *cs)
-{
- struct connection *conn = cs->conn;
- struct stream_interface *si = cs->data;
-
- if (!conn_ctrl_ready(conn))
- return 0;
-
- conn_sock_drain(conn);
-
- if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH) || cs->flags & CS_FL_ERROR) {
- /* warning, we can't do anything on <conn> after this call ! */
- si_release_endpoint(si);
- return -1;
- }
- return 0;
-}
/* This function is the equivalent to stream_int_update() except that it's
* designed to be called from outside the stream handlers, typically the lower