MAJOR: sessions: Store multiple outgoing connections in the session.
Instead of just storing the last connection in the session, store all of
the connections, for at most MAX_SRV_LIST (currently 5) targets.
That way we can do keepalive on more than 1 outgoing connection when the
client uses HTTP/2.
diff --git a/include/proto/connection.h b/include/proto/connection.h
index a1b979e..3f48b15 100644
--- a/include/proto/connection.h
+++ b/include/proto/connection.h
@@ -662,13 +662,8 @@
/* Releases a connection previously allocated by conn_new() */
static inline void conn_free(struct connection *conn)
{
- struct session *sess, *sess_back;
-
- list_for_each_entry_safe(sess, sess_back, &conn->session_list, conn_list) {
- sess->srv_conn = NULL;
- LIST_DEL(&sess->conn_list);
- LIST_INIT(&sess->conn_list);
- }
+ /* Remove ourself from the session's connections list, if any. */
+ LIST_DEL(&conn->session_list);
/* If we temporarily stored the connection as the stream_interface's
* end point, remove it.
*/
diff --git a/include/proto/session.h b/include/proto/session.h
index 68f5c0d..bc5498a 100644
--- a/include/proto/session.h
+++ b/include/proto/session.h
@@ -71,6 +71,46 @@
}
}
+static inline void session_add_conn(struct session *sess, struct connection *conn, void *target)
+{
+ int avail = -1;
+ int i;
+
+ for (i = 0; i < MAX_SRV_LIST; i++) {
+ if (sess->srv_list[i].target == target) {
+ avail = i;
+ break;
+ }
+ if (LIST_ISEMPTY(&sess->srv_list[i].list) && avail == -1)
+ avail = i;
+ }
+ if (avail == -1) {
+ struct connection *conn, *conn_back;
+ int count = 0;
+ /* We have no slot free, let's free the one with the fewer connections */
+ for (i = 0; i < MAX_SRV_LIST; i++) {
+ int count_list = 0;
+ list_for_each_entry(conn, &sess->srv_list[i].list, session_list)
+ count_list++;
+ if (count == 0 || count_list < count) {
+ count = count_list;
+ avail = i;
+ }
+ }
+ /* Now unown all the connections */
+ list_for_each_entry_safe(conn, conn_back, &sess->srv_list[avail].list, session_list) {
+ conn->owner = NULL;
+ LIST_DEL(&conn->session_list);
+ LIST_INIT(&conn->session_list);
+ if (conn->mux)
+ conn->mux->destroy(conn);
+ }
+
+ }
+ sess->srv_list[avail].target = target;
+ LIST_ADDQ(&sess->srv_list[avail].list, &conn->session_list);
+}
+
#endif /* _PROTO_SESSION_H */
diff --git a/include/types/connection.h b/include/types/connection.h
index dbf985b..85afca0 100644
--- a/include/types/connection.h
+++ b/include/types/connection.h
@@ -414,7 +414,7 @@
struct wait_event *send_wait; /* Task to wake when we're ready to send */
struct wait_event *recv_wait; /* Task to wake when we're ready to recv */
struct list list; /* attach point to various connection lists (idle, ...) */
- struct list session_list; /* List of all sessions attached to this connection */
+ struct list session_list; /* List of attached connections to a session */
int xprt_st; /* transport layer state, initialized to zero */
int tmp_early_data; /* 1st byte of early data, if any */
int sent_early_data; /* Amount of early data we sent so far */
diff --git a/include/types/session.h b/include/types/session.h
index e0e1455..334a071 100644
--- a/include/types/session.h
+++ b/include/types/session.h
@@ -37,6 +37,13 @@
#include <types/task.h>
#include <types/vars.h>
+struct sess_srv_list {
+ void *target;
+ struct list list;
+};
+
+#define MAX_SRV_LIST 5
+
struct session {
struct proxy *fe; /* the proxy this session depends on for the client side */
struct listener *listener; /* the listener by which the request arrived */
@@ -47,8 +54,7 @@
struct vars vars; /* list of variables for the session scope. */
struct task *task; /* handshake timeout processing */
long t_handshake; /* handshake duration, -1 = not completed */
- struct connection *srv_conn; /* Server connection we last used */
- struct list conn_list; /* List element for the session list in each connection */
+ struct sess_srv_list srv_list[MAX_SRV_LIST]; /* List of servers and the connections the session is currently responsible for */
};
#endif /* _TYPES_SESSION_H */
diff --git a/src/backend.c b/src/backend.c
index fa01a7d..cde3c2f 100644
--- a/src/backend.c
+++ b/src/backend.c
@@ -52,6 +52,7 @@
#include <proto/queue.h>
#include <proto/sample.h>
#include <proto/server.h>
+#include <proto/session.h>
#include <proto/stream.h>
#include <proto/stream_interface.h>
#include <proto/task.h>
@@ -559,8 +560,9 @@
{
struct connection *conn;
struct server *conn_slot;
- struct server *srv, *prev_srv;
+ struct server *srv = NULL, *prev_srv;
int err;
+ int i;
DPRINTF(stderr,"assign_server : s=%p\n",s);
@@ -584,27 +586,27 @@
srv = NULL;
s->target = NULL;
- conn = s->sess->srv_conn;
- if (conn &&
- (conn->flags & CO_FL_CONNECTED) &&
- objt_server(conn->target) && __objt_server(conn->target)->proxy == s->be &&
- (s->be->lbprm.algo & BE_LB_KIND) != BE_LB_KIND_HI &&
- ((s->txn && s->txn->flags & TX_PREFER_LAST) ||
- ((s->be->options & PR_O_PREF_LAST) &&
- (!s->be->max_ka_queue ||
- server_has_room(__objt_server(conn->target)) ||
- (__objt_server(conn->target)->nbpend + 1) < s->be->max_ka_queue))) &&
- srv_currently_usable(__objt_server(conn->target))) {
- /* This stream was relying on a server in a previous request
- * and the proxy has "option prefer-last-server" set
- * and balance algorithm dont tell us to do otherwise, so
- * let's try to reuse the same server.
- */
- srv = __objt_server(conn->target);
- s->target = &srv->obj_type;
+ for (i = 0; i < MAX_SRV_LIST; i++) {
+ list_for_each_entry(conn, &s->sess->srv_list[i].list, session_list) {
+ if (conn->flags & CO_FL_CONNECTED &&
+ objt_server(conn->target) &&
+ __objt_server(conn->target)->proxy == s->be &&
+ (s->be->lbprm.algo & BE_LB_KIND) != BE_LB_KIND_HI &&
+ ((s->txn && s->txn->flags & TX_PREFER_LAST) ||
+ ((s->be->options & PR_O_PREF_LAST) &&
+ (!s->be->max_ka_queue ||
+ server_has_room(__objt_server(conn->target)) ||
+ (__objt_server(conn->target)->nbpend + 1) < s->be->max_ka_queue))) &&
+ srv_currently_usable(__objt_server(conn->target))) {
+ srv = __objt_server(conn->target);
+ s->target = &srv->obj_type;
+ goto out_ok;
+
+ }
+ }
}
- else if (s->be->lbprm.algo & BE_LB_KIND) {
+ if (s->be->lbprm.algo & BE_LB_KIND) {
/* we must check if we have at least one server available */
if (!s->be->lbprm.tot_weight) {
@@ -748,6 +750,7 @@
goto out;
}
+out_ok:
s->flags |= SF_ASSIGNED;
err = SRV_STATUS_OK;
out:
@@ -1100,20 +1103,39 @@
int connect_server(struct stream *s)
{
struct connection *cli_conn = NULL;
- struct connection *srv_conn;
- struct connection *old_conn;
+ struct connection *srv_conn = NULL;
+ struct connection *old_conn = NULL;
struct conn_stream *srv_cs;
struct server *srv;
int reuse = 0;
int err;
+ int i;
+ for (i = 0; i < MAX_SRV_LIST; i++) {
+ if (s->sess->srv_list[i].target == s->target) {
+ list_for_each_entry(srv_conn, &s->sess->srv_list[i].list,
+ session_list) {
+ if (conn_xprt_ready(srv_conn) &&
+ srv_conn->mux && (srv_conn->mux->avail_streams(srv_conn) > 0)) {
+ reuse = 1;
+ break;
+ }
+ }
+ }
+ }
+ if (!srv_conn) {
+ for (i = 0; i < MAX_SRV_LIST; i++) {
+ if (!LIST_ISEMPTY(&s->sess->srv_list[i].list)) {
+ srv_conn = LIST_ELEM(&s->sess->srv_list[i].list,
+ struct connection *, session_list);
+ break;
+ }
+ }
+ }
+ old_conn = srv_conn;
+
srv = objt_server(s->target);
- old_conn = srv_conn = s->sess->srv_conn;
- if (srv_conn)
- reuse = (s->target == srv_conn->target) &&
- conn_xprt_ready(srv_conn) && srv_conn->mux &&
- (srv_conn->mux->avail_streams(srv_conn) > 0);
if (srv && !reuse) {
srv_conn = NULL;
@@ -1176,55 +1198,25 @@
/* We're about to use another connection, let the mux know we're
* done with this one
*/
- if (old_conn != srv_conn) {
- int did_switch = 0;
+ if (old_conn != srv_conn || !reuse) {
if (srv_conn && reuse) {
- struct session *sess;
- int count = 0;
+ struct session *sess = srv_conn->owner;
- /*
- * If we're attempting to reuse a connection, and
- * the new connection has only one user, and there
- * are no more streams available, attempt to give
- * it our old connection
- */
- list_for_each_entry(sess, &srv_conn->session_list,
- conn_list) {
- count++;
- if (count > 1)
- break;
- }
- if (count == 1) {
- sess = LIST_ELEM(srv_conn->session_list.n,
- struct session *, conn_list);
- LIST_DEL(&sess->conn_list);
+ if (sess) {
if (old_conn &&
!(old_conn->flags & CO_FL_PRIVATE) &&
old_conn->mux != NULL &&
(old_conn->mux->avail_streams(old_conn) > 0) &&
(srv_conn->mux->avail_streams(srv_conn) == 1)) {
- LIST_ADDQ(&old_conn->session_list, &sess->conn_list);
- sess->srv_conn = old_conn;
- did_switch = 1;
- } else {
- LIST_INIT(&sess->conn_list);
- sess->srv_conn = NULL;
+ LIST_DEL(&old_conn->session_list);
+ LIST_INIT(&old_conn->session_list);
+ session_add_conn(sess, old_conn, s->target);
+ old_conn->owner = sess;
}
}
}
- /*
- * We didn't manage to give our old connection, destroy it
- */
- if (old_conn && !did_switch) {
- old_conn->owner = NULL;
- LIST_DEL(&old_conn->list);
- LIST_INIT(&old_conn->list);
- if (old_conn->mux)
- old_conn->mux->destroy(old_conn);
- old_conn = NULL;
- }
}
if (!reuse) {
@@ -1242,9 +1234,8 @@
}
if (srv_conn && old_conn != srv_conn) {
srv_conn->owner = s->sess;
- s->sess->srv_conn = srv_conn;
- LIST_DEL(&s->sess->conn_list);
- LIST_ADDQ(&srv_conn->session_list, &s->sess->conn_list);
+ LIST_DEL(&srv_conn->session_list);
+ session_add_conn(s->sess, srv_conn, s->target);
}
if (!srv_conn)
diff --git a/src/session.c b/src/session.c
index c23d35c..8080c94 100644
--- a/src/session.c
+++ b/src/session.c
@@ -41,6 +41,7 @@
struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type *origin)
{
struct session *sess;
+ int i;
sess = pool_alloc(pool_head_session);
if (sess) {
@@ -53,21 +54,24 @@
vars_init(&sess->vars, SCOPE_SESS);
sess->task = NULL;
sess->t_handshake = -1; /* handshake not done yet */
- LIST_INIT(&sess->conn_list);
- sess->srv_conn = NULL;
HA_ATOMIC_UPDATE_MAX(&fe->fe_counters.conn_max,
HA_ATOMIC_ADD(&fe->feconn, 1));
if (li)
proxy_inc_fe_conn_ctr(li, fe);
HA_ATOMIC_ADD(&totalconn, 1);
HA_ATOMIC_ADD(&jobs, 1);
+ for (i = 0; i < MAX_SRV_LIST; i++) {
+ sess->srv_list[i].target = NULL;
+ LIST_INIT(&sess->srv_list[i].list);
+ }
}
return sess;
}
void session_free(struct session *sess)
{
- struct connection *conn;
+ struct connection *conn, *conn_back;
+ int i;
HA_ATOMIC_SUB(&sess->fe->feconn, 1);
if (sess->listener)
@@ -77,21 +81,25 @@
conn = objt_conn(sess->origin);
if (conn != NULL && conn->mux)
conn->mux->destroy(conn);
- conn = sess->srv_conn;
- if (conn != NULL && conn->mux) {
- LIST_DEL(&conn->list);
- LIST_INIT(&conn->list);
- conn->owner = NULL;
- conn->mux->destroy(conn);
- } else if (conn) {
- /* We have a connection, but not yet an associated mux.
- * So destroy it now.
- */
- conn_stop_tracking(conn);
- conn_full_close(conn);
- conn_free(conn);
+ for (i = 0; i < MAX_SRV_LIST; i++) {
+ int count = 0;
+ list_for_each_entry_safe(conn, conn_back, &sess->srv_list[i].list, session_list) {
+ count++;
+ if (conn->mux) {
+ LIST_DEL(&conn->session_list);
+ LIST_INIT(&conn->session_list);
+ conn->owner = NULL;
+ conn->mux->destroy(conn);
+ } else {
+ /* We have a connection, but not yet an associated mux.
+ * So destroy it now.
+ */
+ conn_stop_tracking(conn);
+ conn_full_close(conn);
+ conn_free(conn);
+ }
+ }
}
- LIST_DEL(&sess->conn_list);
pool_free(pool_head_session, sess);
HA_ATOMIC_SUB(&jobs, 1);
}