MAJOR: connection : Split struct connection into struct connection and struct conn_stream.

All the references to connections in the data path from streams and
stream_interfaces were changed to use conn_streams. Most functions named
"something_conn" were renamed to "something_cs" for this. Sometimes the
connection still is what matters (eg during a connection establishment)
and were not always renamed. The change is significant and minimal at the
same time, and was quite thoroughly tested now. As of this patch, all
accesses to the connection from upper layers go through the pass-through
mux.
diff --git a/include/proto/connection.h b/include/proto/connection.h
index a2bcc97..e55ec8b 100644
--- a/include/proto/connection.h
+++ b/include/proto/connection.h
@@ -315,25 +315,25 @@
 	c->flags &= ~CO_FL_XPRT_RD_ENA;
 }
 
-static inline void __cs_data_want_recv(struct conn_stream *cs)
+static inline void __cs_want_recv(struct conn_stream *cs)
 {
 	cs->flags |= CS_FL_DATA_RD_ENA;
 }
 
-static inline void __cs_data_stop_recv(struct conn_stream *cs)
+static inline void __cs_stop_recv(struct conn_stream *cs)
 {
 	cs->flags &= ~CS_FL_DATA_RD_ENA;
 }
 
-static inline void cs_data_want_recv(struct conn_stream *cs)
+static inline void cs_want_recv(struct conn_stream *cs)
 {
-	__cs_data_want_recv(cs);
+	__cs_want_recv(cs);
 	cs_update_mux_polling(cs);
 }
 
-static inline void cs_data_stop_recv(struct conn_stream *cs)
+static inline void cs_stop_recv(struct conn_stream *cs)
 {
-	__cs_data_stop_recv(cs);
+	__cs_stop_recv(cs);
 	cs_update_mux_polling(cs);
 }
 
@@ -366,36 +366,36 @@
 	c->flags &= ~(CO_FL_XPRT_WR_ENA | CO_FL_XPRT_RD_ENA);
 }
 
-static inline void __cs_data_want_send(struct conn_stream *cs)
+static inline void __cs_want_send(struct conn_stream *cs)
 {
 	cs->flags |= CS_FL_DATA_WR_ENA;
 }
 
-static inline void __cs_data_stop_send(struct conn_stream *cs)
+static inline void __cs_stop_send(struct conn_stream *cs)
 {
 	cs->flags &= ~CS_FL_DATA_WR_ENA;
 }
 
-static inline void cs_data_stop_send(struct conn_stream *cs)
+static inline void cs_stop_send(struct conn_stream *cs)
 {
-	__cs_data_stop_send(cs);
+	__cs_stop_send(cs);
 	cs_update_mux_polling(cs);
 }
 
-static inline void cs_data_want_send(struct conn_stream *cs)
+static inline void cs_want_send(struct conn_stream *cs)
 {
-	__cs_data_want_send(cs);
+	__cs_want_send(cs);
 	cs_update_mux_polling(cs);
 }
 
-static inline void __cs_data_stop_both(struct conn_stream *cs)
+static inline void __cs_stop_both(struct conn_stream *cs)
 {
 	cs->flags &= ~(CS_FL_DATA_WR_ENA | CS_FL_DATA_RD_ENA);
 }
 
-static inline void cs_data_stop_both(struct conn_stream *cs)
+static inline void cs_stop_both(struct conn_stream *cs)
 {
-	__cs_data_stop_both(cs);
+	__cs_stop_both(cs);
 	cs_update_mux_polling(cs);
 }
 
@@ -537,6 +537,45 @@
 		c->xprt->shutw(c, 0);
 }
 
+/* shut read after draining possibly pending data */
+static inline void cs_shutr(struct conn_stream *cs)
+{
+	__cs_stop_recv(cs);
+
+	/* clean data-layer shutdown */
+	if (cs->conn->mux && cs->conn->mux->shutr)
+		cs->conn->mux->shutr(cs, 1);
+}
+
+/* shut read after disabling lingering */
+static inline void cs_shutr_hard(struct conn_stream *cs)
+{
+	__cs_stop_recv(cs);
+
+	/* clean data-layer shutdown */
+	if (cs->conn->mux && cs->conn->mux->shutr)
+		cs->conn->mux->shutr(cs, 0);
+}
+
+static inline void cs_shutw(struct conn_stream *cs)
+{
+	__cs_stop_send(cs);
+
+	/* clean data-layer shutdown */
+	if (cs->conn->mux && cs->conn->mux->shutw)
+		cs->conn->mux->shutw(cs, 1);
+}
+
+static inline void cs_shutw_hard(struct conn_stream *cs)
+{
+	__cs_stop_send(cs);
+
+	/* unclean data-layer shutdown */
+	if (cs->conn->mux && cs->conn->mux->shutw)
+		cs->conn->mux->shutw(cs, 0);
+}
+
+
 /* detect sock->data read0 transition */
 static inline int conn_xprt_read0_pending(struct connection *c)
 {
@@ -576,7 +615,6 @@
 {
 	conn->obj_type = OBJ_TYPE_CONN;
 	conn->flags = CO_FL_NONE;
-	conn->data = NULL;
 	conn->tmp_early_data = -1;
 	conn->mux = NULL;
 	conn->mux_ctx = NULL;
@@ -622,31 +660,43 @@
 	return conn;
 }
 
-/* Tries to allocate a new conn_stream and initialize its main fields. The
- * connection is returned on success, NULL on failure. The connection must
- * be released using pool_free2() or conn_free().
+/* Releases a conn_stream previously allocated by cs_new() */
+static inline void cs_free(struct conn_stream *cs)
+{
+	pool_free2(pool2_connstream, cs);
+}
+
+/* Tries to allocate a new conn_stream and initialize its main fields. If
+ * <conn> is NULL, then a new connection is allocated on the fly, initialized,
+ * and assigned to cs->conn ; this connection will then have to be released
+ * using pool_free2() or conn_free(). The conn_stream is initialized and added
+ * to the mux's stream list on success, then returned. On failure, nothing is
+ * allocated and NULL is returned.
  */
 static inline struct conn_stream *cs_new(struct connection *conn)
 {
 	struct conn_stream *cs;
 
 	cs = pool_alloc2(pool2_connstream);
-	if (likely(cs != NULL))
-		cs_init(cs, conn);
-	return cs;
-}
+	if (!likely(cs))
+		return NULL;
 
-/* Releases a conn_stream previously allocated by cs_new() */
-static inline void cs_free(struct conn_stream *cs)
-{
-	pool_free2(pool2_connstream, cs);
+	if (!conn) {
+		conn = conn_new();
+		if (!likely(conn)) {
+			cs_free(cs);
+			return NULL;
+		}
+		conn_init(conn);
+	}
+
+	cs_init(cs, conn);
+	return cs;
 }
 
 /* Releases a connection previously allocated by conn_new() */
 static inline void conn_free(struct connection *conn)
 {
-	if (conn->mux && conn->mux->release)
-		conn->mux->release(conn);
 	pool_free2(pool2_connection, conn);
 }
 
@@ -700,11 +750,11 @@
 	conn->flags |= CO_FL_ADDR_TO_SET;
 }
 
-/* Attaches a connection to an owner and assigns a data layer */
-static inline void conn_attach(struct connection *conn, void *owner, const struct data_cb *data)
+/* Attaches a conn_stream to a data layer and sets the relevant callbacks */
+static inline void cs_attach(struct conn_stream *cs, void *data, const struct data_cb *data_cb)
 {
-	conn->data = data;
-	conn->owner = owner;
+	cs->data_cb = data_cb;
+	cs->data = data;
 }
 
 /* Installs the connection's mux layer for upper context <ctx>.
@@ -789,11 +839,11 @@
 	return conn->mux->name;
 }
 
-static inline const char *conn_get_data_name(const struct connection *conn)
+static inline const char *cs_get_data_name(const struct conn_stream *cs)
 {
-	if (!conn->data)
+	if (!cs->data_cb)
 		return "NONE";
-	return conn->data->name;
+	return cs->data_cb->name;
 }
 
 /* registers pointer to transport layer <id> (XPRT_*) */
diff --git a/include/proto/stream.h b/include/proto/stream.h
index 3efb42b..f0edc2e 100644
--- a/include/proto/stream.h
+++ b/include/proto/stream.h
@@ -36,7 +36,7 @@
 extern struct data_cb sess_conn_cb;
 
 struct stream *stream_new(struct session *sess, enum obj_type *origin);
-int stream_create_from_conn(struct connection *conn);
+int stream_create_from_cs(struct conn_stream *cs);
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
 int init_stream();
diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h
index c6578ef..ee1fa12 100644
--- a/include/proto/stream_interface.h
+++ b/include/proto/stream_interface.h
@@ -152,18 +152,14 @@
  */
 static inline void si_release_endpoint(struct stream_interface *si)
 {
-	struct connection *conn;
+	struct conn_stream *cs;
 	struct appctx *appctx;
 
 	if (!si->end)
 		return;
 
-	if ((conn = objt_conn(si->end))) {
-		LIST_DEL(&conn->list);
-		conn_stop_tracking(conn);
-		conn_full_close(conn);
-		conn_free(conn);
-	}
+	if ((cs = objt_cs(si->end)))
+		cs_destroy(cs);
 	else if ((appctx = objt_appctx(si->end))) {
 		if (appctx->applet->release && si->state < SI_ST_DIS)
 			appctx->applet->release(appctx);
@@ -178,26 +174,27 @@
  * connection will also be added at the head of this list. This connection
  * remains assigned to the stream interface it is currently attached to.
  */
-static inline void si_idle_conn(struct stream_interface *si, struct list *pool)
+static inline void si_idle_cs(struct stream_interface *si, struct list *pool)
 {
-	struct connection *conn = __objt_conn(si->end);
+	struct conn_stream *cs = __objt_cs(si->end);
+	struct connection *conn = cs->conn;
 
 	if (pool)
 		LIST_ADD(pool, &conn->list);
 
-	conn_attach(conn, si, &si_idle_conn_cb);
-	conn_xprt_want_recv(conn);
+	cs_attach(cs, si, &si_idle_conn_cb);
+	cs_want_recv(cs);
 }
 
-/* Attach connection <conn> to the stream interface <si>. The stream interface
+/* Attach conn_stream <cs> to the stream interface <si>. The stream interface
  * is configured to work with a connection and the connection it configured
  * with a stream interface data layer.
  */
-static inline void si_attach_conn(struct stream_interface *si, struct connection *conn)
+static inline void si_attach_cs(struct stream_interface *si, struct conn_stream *cs)
 {
 	si->ops = &si_conn_ops;
-	si->end = &conn->obj_type;
-	conn_attach(conn, si, &si_conn_cb);
+	si->end = &cs->obj_type;
+	cs_attach(cs, si, &si_conn_cb);
 }
 
 /* Returns true if a connection is attached to the stream interface <si> and
@@ -205,7 +202,7 @@
  */
 static inline int si_conn_ready(struct stream_interface *si)
 {
-	struct connection *conn = objt_conn(si->end);
+	struct connection *conn = cs_conn(objt_cs(si->end));
 
 	return conn && conn_ctrl_ready(conn) && conn_xprt_ready(conn);
 }
@@ -276,22 +273,22 @@
 	si->flags &= ~SI_FL_WANT_GET;
 }
 
-/* Try to allocate a new connection and assign it to the interface. If
+/* Try to allocate a new conn_stream and assign it to the interface. If
  * an endpoint was previously allocated, it is released first. The newly
- * allocated connection is initialized, assigned to the stream interface,
+ * allocated conn_stream is initialized, assigned to the stream interface,
  * and returned.
  */
-static inline struct connection *si_alloc_conn(struct stream_interface *si)
+static inline struct conn_stream *si_alloc_cs(struct stream_interface *si, struct connection *conn)
 {
-	struct connection *conn;
+	struct conn_stream *cs;
 
 	si_release_endpoint(si);
 
-	conn = conn_new();
-	if (conn)
-		si_attach_conn(si, conn);
+	cs = cs_new(conn);
+	if (cs)
+		si_attach_cs(si, cs);
 
-	return conn;
+	return cs;
 }
 
 /* Release the interface's existing endpoint (connection or appctx) and
@@ -346,7 +343,8 @@
 /* Calls chk_snd on the connection using the ctrl layer */
 static inline int si_connect(struct stream_interface *si)
 {
-	struct connection *conn = objt_conn(si->end);
+	struct conn_stream *cs = objt_cs(si->end);
+	struct connection *conn = cs_conn(cs);
 	int ret = SF_ERR_NONE;
 
 	if (unlikely(!conn || !conn->ctrl || !conn->ctrl->connect))
@@ -364,7 +362,7 @@
 		/* reuse the existing connection */
 		if (!channel_is_empty(si_oc(si))) {
 			/* we'll have to send a request there. */
-			conn_xprt_want_send(conn);
+			cs_want_send(cs);
 		}
 
 		/* the connection is established */
diff --git a/include/types/checks.h b/include/types/checks.h
index 3559f2d..ac3e7b6 100644
--- a/include/types/checks.h
+++ b/include/types/checks.h
@@ -157,7 +157,7 @@
 
 struct check {
 	struct xprt_ops *xprt;			/* transport layer operations for health checks */
-	struct connection *conn;		/* connection state for health checks */
+	struct conn_stream *cs;			/* conn_stream state for health checks */
 	unsigned short port;			/* the port to use for the health checks */
 	struct buffer *bi, *bo;			/* input and output buffers to send/recv check */
 	struct task *task;			/* the task associated to the health check processing, NULL if disabled */
diff --git a/include/types/connection.h b/include/types/connection.h
index 66ec1b6..0d62efc 100644
--- a/include/types/connection.h
+++ b/include/types/connection.h
@@ -288,9 +288,9 @@
  * data movement. It may abort a connection by returning < 0.
  */
 struct data_cb {
-	void (*recv)(struct connection *conn);  /* data-layer recv callback */
-	void (*send)(struct connection *conn);  /* data-layer send callback */
-	int  (*wake)(struct connection *conn);  /* data-layer callback to report activity */
+	void (*recv)(struct conn_stream *cs);  /* data-layer recv callback */
+	void (*send)(struct conn_stream *cs);  /* data-layer send callback */
+	int  (*wake)(struct conn_stream *cs);  /* data-layer callback to report activity */
 	char name[8];                           /* data layer name, zero-terminated */
 };
 
@@ -347,10 +347,9 @@
 	const struct protocol *ctrl;  /* operations at the socket layer */
 	const struct xprt_ops *xprt;  /* operations at the transport layer */
 	const struct mux_ops  *mux;   /* mux layer opreations. Must be set before xprt->init() */
-	const struct data_cb  *data;  /* data layer callbacks. Must be set before xprt->init() */
 	void *xprt_ctx;               /* general purpose pointer, initialized to NULL */
 	void *mux_ctx;                /* mux-specific context, initialized to NULL */
-	void *owner;                  /* pointer to upper layer's entity (eg: session, stream interface) */
+	void *owner;                  /* pointer to the owner session for incoming connections, or NULL */
 	int xprt_st;                  /* transport layer state, initialized to zero */
 	int tmp_early_data;           /* 1st byte of early data, if any */
 	union conn_handle handle;     /* connection handle at the socket layer */
diff --git a/src/backend.c b/src/backend.c
index 4d44e54..9dbbd91 100644
--- a/src/backend.c
+++ b/src/backend.c
@@ -567,7 +567,7 @@
 
 	srv = NULL;
 	s->target = NULL;
-	conn = objt_conn(s->si[1].end);
+	conn = cs_conn(objt_cs(s->si[1].end));
 
 	if (conn &&
 	    (conn->flags & CO_FL_CONNECTED) &&
@@ -720,8 +720,7 @@
 		s->target = &s->be->obj_type;
 	}
 	else if ((s->be->options & PR_O_HTTP_PROXY) &&
-		 (conn = objt_conn(s->si[1].end)) &&
-		 is_addr(&conn->addr.to)) {
+		 conn && is_addr(&conn->addr.to)) {
 		/* in proxy mode, we need a valid destination address */
 		s->target = &s->be->obj_type;
 	}
@@ -769,7 +768,7 @@
 int assign_server_address(struct stream *s)
 {
 	struct connection *cli_conn = objt_conn(strm_orig(s));
-	struct connection *srv_conn = objt_conn(s->si[1].end);
+	struct connection *srv_conn = cs_conn(objt_cs(s->si[1].end));
 
 #ifdef DEBUG_FULL
 	fprintf(stderr,"assign_server_address : s=%p\n",s);
@@ -973,7 +972,7 @@
 	struct server *srv = objt_server(s->target);
 	struct conn_src *src;
 	struct connection *cli_conn;
-	struct connection *srv_conn = objt_conn(s->si[1].end);
+	struct connection *srv_conn = cs_conn(objt_cs(s->si[1].end));
 
 	if (srv && srv->conn_src.opts & CO_SRC_BIND)
 		src = &srv->conn_src;
@@ -1041,21 +1040,23 @@
 {
 	struct connection *cli_conn;
 	struct connection *srv_conn;
-	struct connection *old_conn;
+	struct conn_stream *srv_cs;
+	struct conn_stream *old_cs;
 	struct server *srv;
 	int reuse = 0;
 	int err;
 
 	srv = objt_server(s->target);
-	srv_conn = objt_conn(s->si[1].end);
+	srv_cs = objt_cs(s->si[1].end);
+	srv_conn = cs_conn(srv_cs);
 	if (srv_conn)
 		reuse = s->target == srv_conn->target;
 
 	if (srv && !reuse) {
-		old_conn = srv_conn;
-		if (old_conn) {
+		old_cs = srv_cs;
+		if (old_cs) {
 			srv_conn = NULL;
-			old_conn->owner = NULL;
+			srv_cs->data = NULL;
 			si_detach_endpoint(&s->si[1]);
 			/* note: if the connection was in a server's idle
 			 * queue, it doesn't get dequeued.
@@ -1101,23 +1102,25 @@
 			LIST_DEL(&srv_conn->list);
 			LIST_INIT(&srv_conn->list);
 
-			if (srv_conn->owner) {
-				si_detach_endpoint(srv_conn->owner);
-				if (old_conn && !(old_conn->flags & CO_FL_PRIVATE)) {
-					si_attach_conn(srv_conn->owner, old_conn);
-					si_idle_conn(srv_conn->owner, NULL);
+			/* XXX cognet: this assumes only 1 conn_stream per
+			 * connection, has to be revisited later
+			 */
+			srv_cs = srv_conn->mux_ctx;
+
+			if (srv_conn->mux == &mux_pt_ops && srv_cs->data) {
+				si_detach_endpoint(srv_cs->data);
+				if (old_cs && !(old_cs->conn->flags & CO_FL_PRIVATE)) {
+					si_attach_cs(srv_cs->data, old_cs);
+					si_idle_cs(srv_cs->data, NULL);
 				}
 			}
-			si_attach_conn(&s->si[1], srv_conn);
+			si_attach_cs(&s->si[1], srv_cs);
 			reuse = 1;
 		}
 
 		/* we may have to release our connection if we couldn't swap it */
-		if (old_conn && !old_conn->owner) {
-			LIST_DEL(&old_conn->list);
-			conn_full_close(old_conn);
-			conn_free(old_conn);
-		}
+		if (old_cs && !old_cs->data)
+			cs_destroy(old_cs);
 	}
 
 	if (reuse) {
@@ -1136,15 +1139,16 @@
 		}
 	}
 
-	if (!reuse)
-		srv_conn = si_alloc_conn(&s->si[1]);
-	else {
+	if (!reuse) {
+		srv_cs = si_alloc_cs(&s->si[1], NULL);
+		srv_conn = cs_conn(srv_cs);
+	} else {
 		/* reusing our connection, take it out of the idle list */
 		LIST_DEL(&srv_conn->list);
 		LIST_INIT(&srv_conn->list);
 	}
 
-	if (!srv_conn)
+	if (!srv_cs)
 		return SF_ERR_RESOURCE;
 
 	if (!(s->flags & SF_ADDR_SET)) {
@@ -1160,14 +1164,16 @@
 		/* set the correct protocol on the output stream interface */
 		if (srv) {
 			conn_prepare(srv_conn, protocol_by_family(srv_conn->addr.to.ss_family), srv->xprt);
-			conn_install_mux(srv_conn, &mux_pt_ops, srv_conn);
+			/* XXX: Pick the right mux, when we finally have one */
+			conn_install_mux(srv_conn, &mux_pt_ops, srv_cs);
 		}
 		else if (obj_type(s->target) == OBJ_TYPE_PROXY) {
 			/* proxies exclusively run on raw_sock right now */
 			conn_prepare(srv_conn, protocol_by_family(srv_conn->addr.to.ss_family), xprt_get(XPRT_RAW));
-			if (!objt_conn(s->si[1].end) || !objt_conn(s->si[1].end)->ctrl)
+			if (!objt_cs(s->si[1].end) || !objt_cs(s->si[1].end)->conn->ctrl)
 				return SF_ERR_INTERNAL;
-			conn_install_mux(srv_conn, &mux_pt_ops, srv_conn);
+			/* XXX: Pick the right mux, when we finally have one */
+			conn_install_mux(srv_conn, &mux_pt_ops, srv_cs);
 		}
 		else
 			return SF_ERR_INTERNAL;  /* how did we get there ? */
@@ -1182,13 +1188,13 @@
 				conn_get_to_addr(cli_conn);
 		}
 
-		si_attach_conn(&s->si[1], srv_conn);
+		si_attach_cs(&s->si[1], srv_cs);
 
 		assign_tproxy_address(s);
 	}
 	else {
 		/* the connection is being reused, just re-attach it */
-		si_attach_conn(&s->si[1], srv_conn);
+		si_attach_cs(&s->si[1], srv_cs);
 		s->flags |= SF_SRV_REUSED;
 	}
 
diff --git a/src/checks.c b/src/checks.c
index b717d38..aff5ff3 100644
--- a/src/checks.c
+++ b/src/checks.c
@@ -582,7 +582,8 @@
  */
 static void chk_report_conn_err(struct check *check, int errno_bck, int expired)
 {
-	struct connection *conn = check->conn;
+	struct conn_stream *cs = check->cs;
+	struct connection *conn = cs_conn(cs);
 	const char *err_msg;
 	struct chunk *chk;
 	int step;
@@ -705,9 +706,10 @@
  * it sends the request. In other cases, it calls set_server_check_status()
  * to set check->status, check->duration and check->result.
  */
-static void event_srv_chk_w(struct connection *conn)
+static void event_srv_chk_w(struct conn_stream *cs)
 {
-	struct check *check = conn->owner;
+	struct connection *conn = cs->conn;
+	struct check *check = cs->data;
 	struct server *s = check->server;
 	struct task *t = check->task;
 
@@ -719,7 +721,7 @@
 
 	if (retrieve_errno_from_socket(conn)) {
 		chk_report_conn_err(check, errno, 0);
-		__conn_xprt_stop_both(conn);
+		__cs_stop_both(cs);
 		goto out_wakeup;
 	}
 
@@ -741,10 +743,10 @@
 		return;
 
 	if (check->bo->o) {
-		conn->xprt->snd_buf(conn, check->bo, 0);
+		conn->mux->snd_buf(cs, check->bo, 0);
 		if (conn->flags & CO_FL_ERROR) {
 			chk_report_conn_err(check, errno, 0);
-			__conn_xprt_stop_both(conn);
+			__cs_stop_both(cs);
 			goto out_wakeup;
 		}
 		if (check->bo->o)
@@ -761,7 +763,7 @@
  out_wakeup:
 	task_wakeup(t, TASK_WOKEN_IO);
  out_nowake:
-	__conn_xprt_stop_send(conn);   /* nothing more to write */
+	__cs_stop_send(cs);   /* nothing more to write */
 }
 
 /*
@@ -778,9 +780,10 @@
  * call it with a proper error status like HCHK_STATUS_L7STS, HCHK_STATUS_L6RSP,
  * etc.
  */
-static void event_srv_chk_r(struct connection *conn)
+static void event_srv_chk_r(struct conn_stream *cs)
 {
-	struct check *check = conn->owner;
+	struct connection *conn = cs->conn;
+	struct check *check = cs->data;
 	struct server *s = check->server;
 	struct task *t = check->task;
 	char *desc;
@@ -815,7 +818,7 @@
 
 	done = 0;
 
-	conn->xprt->rcv_buf(conn, check->bi, check->bi->size);
+	conn->mux->rcv_buf(cs, check->bi, check->bi->size);
 	if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) {
 		done = 1;
 		if ((conn->flags & CO_FL_ERROR) && !check->bi->i) {
@@ -1339,8 +1342,8 @@
 	 * range quickly.  To avoid sending RSTs all the time, we first try to
 	 * drain pending data.
 	 */
-	__conn_xprt_stop_both(conn);
-	conn_xprt_shutw(conn);
+	__cs_stop_both(cs);
+	cs_shutw(cs);
 
 	/* OK, let's not stay here forever */
 	if (check->result == CHK_RES_FAILED)
@@ -1350,7 +1353,7 @@
 	return;
 
  wait_more_data:
-	__conn_xprt_want_recv(conn);
+	__cs_want_recv(cs);
 }
 
 /*
@@ -1359,15 +1362,17 @@
  * It returns 0 on normal cases, <0 if at least one close() has happened on the
  * connection (eg: reconnect).
  */
-static int wake_srv_chk(struct connection *conn)
+static int wake_srv_chk(struct conn_stream *cs)
 {
-	struct check *check = conn->owner;
+	struct connection *conn = cs->conn;
+	struct check *check = cs->data;
 	int ret = 0;
 
 	/* we may have to make progress on the TCP checks */
 	if (check->type == PR_O2_TCPCHK_CHK) {
 		ret = tcpcheck_main(check);
-		conn = check->conn;
+		cs = check->cs;
+		conn = cs_conn(cs);
 	}
 
 	if (unlikely(conn->flags & CO_FL_ERROR)) {
@@ -1378,8 +1383,7 @@
 		 * we expect errno to still be valid.
 		 */
 		chk_report_conn_err(check, errno, 0);
-
-		__conn_xprt_stop_both(conn);
+		__cs_stop_both(cs);
 		task_wakeup(check->task, TASK_WOKEN_IO);
 	}
 	else if (!(conn->flags & (CO_FL_XPRT_RD_ENA|CO_FL_XPRT_WR_ENA|CO_FL_HANDSHAKE))) {
@@ -1478,7 +1482,8 @@
 {
 	struct check *check = t->context;
 	struct server *s = check->server;
-	struct connection *conn = check->conn;
+	struct conn_stream *cs = check->cs;
+	struct connection *conn = cs_conn(cs);
 	struct protocol *proto;
 	struct tcpcheck_rule *tcp_rule = NULL;
 	int ret;
@@ -1535,9 +1540,10 @@
 	}
 
 	/* prepare a new connection */
-	conn = check->conn = conn_new();
-	if (!check->conn)
+	cs = check->cs = cs_new(NULL);
+	if (!check->cs)
 		return SF_ERR_RESOURCE;
+	conn = cs->conn;
 
 	if (is_addr(&check->addr)) {
 		/* we'll connect to the check addr specified on the server */
@@ -1553,7 +1559,7 @@
 
 		i = srv_check_healthcheck_port(check);
 		if (i == 0) {
-			conn->owner = check;
+			cs->data = check;
 			return SF_ERR_CHK_PORT;
 		}
 
@@ -1563,8 +1569,8 @@
 	proto = protocol_by_family(conn->addr.to.ss_family);
 
 	conn_prepare(conn, proto, check->xprt);
-	conn_install_mux(conn, &mux_pt_ops, conn);
-	conn_attach(conn, check, &check_conn_cb);
+	conn_install_mux(conn, &mux_pt_ops, cs);
+	cs_attach(cs, check, &check_conn_cb);
 	conn->target = &s->obj_type;
 
 	/* no client address */
@@ -2077,7 +2083,8 @@
 {
 	struct check *check = t->context;
 	struct server *s = check->server;
-	struct connection *conn = check->conn;
+	struct conn_stream *cs = check->cs;
+	struct connection *conn = cs_conn(cs);
 	int rv;
 	int ret;
 	int expired = tick_is_expired(t->expire, now_ms);
@@ -2105,7 +2112,8 @@
 		check->bo->o = 0;
 
 		ret = connect_conn_chk(t);
-		conn = check->conn;
+		cs = check->cs;
+		conn = cs_conn(cs);
 
 		switch (ret) {
 		case SF_ERR_UP:
@@ -2123,7 +2131,7 @@
 			}
 
 			if (check->type)
-				conn_xprt_want_recv(conn);   /* prepare for reading a possible reply */
+				cs_want_recv(cs);   /* prepare for reading a possible reply */
 
 			task_set_affinity(t, tid_bit);
 			goto reschedule;
@@ -2147,9 +2155,10 @@
 		}
 
 		/* here, we have seen a synchronous error, no fd was allocated */
-		if (conn) {
-			conn_free(conn);
-			check->conn = conn = NULL;
+		if (cs) {
+			cs_destroy(cs);
+			cs = check->cs = NULL;
+			conn = NULL;
 		}
 
 		check->state &= ~CHK_ST_INPROGRESS;
@@ -2201,8 +2210,9 @@
 		}
 
 		if (conn) {
-			conn_free(conn);
-			check->conn = conn = NULL;
+			cs_destroy(cs);
+			cs = check->cs = NULL;
+			conn = NULL;
 		}
 
 		if (check->result == CHK_RES_FAILED) {
@@ -2550,7 +2560,8 @@
 	char *contentptr, *comment;
 	struct tcpcheck_rule *next;
 	int done = 0, ret = 0, step = 0;
-	struct connection *conn = check->conn;
+	struct conn_stream *cs = check->cs;
+	struct connection *conn = cs_conn(cs);
 	struct server *s = check->server;
 	struct task *t = check->task;
 	struct list *head = check->tcpcheck_rules;
@@ -2619,8 +2630,8 @@
 	}
 
 	/* It's only the rules which will enable send/recv */
-	if (conn)
-		__conn_xprt_stop_both(conn);
+	if (cs)
+		cs_stop_both(cs);
 
 	while (1) {
 		/* We have to try to flush the output buffer before reading, at
@@ -2633,11 +2644,11 @@
 		     check->current_step->action != TCPCHK_ACT_SEND ||
 		     check->current_step->string_len >= buffer_total_space(check->bo))) {
 
-			__conn_xprt_want_send(conn);
-			if (conn->xprt->snd_buf(conn, check->bo, 0) <= 0) {
+			__cs_want_send(cs);
+			if (conn->mux->snd_buf(cs, check->bo, 0) <= 0) {
 				if (conn->flags & CO_FL_ERROR) {
 					chk_report_conn_err(check, errno, 0);
-					__conn_xprt_stop_both(conn);
+					__cs_stop_both(cs);
 					goto out_end_tcpcheck;
 				}
 				break;
@@ -2673,8 +2684,9 @@
 			 *   2: try to get a new connection
 			 *   3: release and replace the old one on success
 			 */
-			if (check->conn) {
-				conn_full_close(check->conn);
+			if (check->cs) {
+				/* XXX: need to kill all CS here as well but not to free them yet */
+				conn_full_close(check->cs->conn);
 				retcode = -1; /* do not reuse the fd! */
 			}
 
@@ -2682,8 +2694,8 @@
 			check->last_started_step = check->current_step;
 
 			/* prepare new connection */
-			conn = conn_new();
-			if (!conn) {
+			cs = cs_new(NULL);
+			if (!cs) {
 				step = tcpcheck_get_step_id(check);
 				chunk_printf(&trash, "TCPCHK error allocating connection at step %d", step);
 				comment = tcpcheck_get_step_comment(check, step);
@@ -2694,11 +2706,15 @@
 				return retcode;
 			}
 
-			if (check->conn)
-				conn_free(check->conn);
-			check->conn = conn;
+			if (check->cs) {
+				if (check->cs->conn)
+					conn_free(check->cs->conn);
+				cs_free(check->cs);
+			}
 
-			conn_attach(conn, check, &check_conn_cb);
+			check->cs = cs;
+			conn = cs->conn;
+			cs_attach(cs, check, &check_conn_cb);
 			conn->target = &s->obj_type;
 
 			/* no client address */
@@ -2727,7 +2743,7 @@
 				xprt = xprt_get(XPRT_RAW);
 			}
 			conn_prepare(conn, proto, xprt);
-			conn_install_mux(conn, &mux_pt_ops, conn);
+			conn_install_mux(conn, &mux_pt_ops, cs);
 
 			ret = SF_ERR_INTERNAL;
 			if (proto->connect)
@@ -2860,8 +2876,8 @@
 			if (unlikely(check->result == CHK_RES_FAILED))
 				goto out_end_tcpcheck;
 
-			__conn_xprt_want_recv(conn);
-			if (conn->xprt->rcv_buf(conn, check->bi, check->bi->size) <= 0) {
+			__cs_want_recv(cs);
+			if (conn->mux->rcv_buf(cs, check->bi, check->bi->size) <= 0) {
 				if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) {
 					done = 1;
 					if ((conn->flags & CO_FL_ERROR) && !check->bi->i) {
@@ -2958,7 +2974,7 @@
 
 					if (check->current_step->action == TCPCHK_ACT_EXPECT)
 						goto tcpcheck_expect;
-					__conn_xprt_stop_recv(conn);
+					__cs_stop_recv(cs);
 				}
 			}
 			else {
@@ -2978,7 +2994,7 @@
 
 					if (check->current_step->action == TCPCHK_ACT_EXPECT)
 						goto tcpcheck_expect;
-					__conn_xprt_stop_recv(conn);
+					__cs_stop_recv(cs);
 				}
 				/* not matched but was supposed to => ERROR */
 				else {
@@ -3012,11 +3028,11 @@
 
 	/* warning, current_step may now point to the head */
 	if (check->bo->o)
-		__conn_xprt_want_send(conn);
+		__cs_want_send(cs);
 
 	if (&check->current_step->list != head &&
 	    check->current_step->action == TCPCHK_ACT_EXPECT)
-		__conn_xprt_want_recv(conn);
+		__cs_want_recv(cs);
 	return retcode;
 
  out_end_tcpcheck:
@@ -3030,7 +3046,7 @@
 	if (check->result == CHK_RES_FAILED)
 		conn->flags |= CO_FL_ERROR;
 
-	__conn_xprt_stop_both(conn);
+	__cs_stop_both(cs);
 	return retcode;
 }
 
@@ -3049,7 +3065,6 @@
 		return "out of memory while allocating check buffer";
 	}
 	check->bo->size = global.tune.chksize;
-
 	return NULL;
 }
 
@@ -3059,8 +3074,10 @@
 	check->bi = NULL;
 	free(check->bo);
 	check->bo = NULL;
-	free(check->conn);
-	check->conn = NULL;
+	free(check->cs->conn);
+	check->cs->conn = NULL;
+	cs_free(check->cs);
+	check->cs = NULL;
 }
 
 void email_alert_free(struct email_alert *alert)
diff --git a/src/cli.c b/src/cli.c
index 9ca8e1f..b546fd0 100644
--- a/src/cli.c
+++ b/src/cli.c
@@ -1268,7 +1268,7 @@
 	struct cmsghdr *cmsg;
 	struct stream_interface *si = appctx->owner;
 	struct stream *s = si_strm(si);
-	struct connection *remote = objt_conn(si_opposite(si)->end);
+	struct connection *remote = cs_conn(objt_cs(si_opposite(si)->end));
 	struct msghdr msghdr;
 	struct iovec iov;
 	struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };
diff --git a/src/frontend.c b/src/frontend.c
index e03e099..24fc0c1 100644
--- a/src/frontend.c
+++ b/src/frontend.c
@@ -101,7 +101,7 @@
 
 		/* try to report the ALPN value when available (also works for NPN) */
 
-		if (conn && conn->owner == &s->si[0]) {
+		if (conn && conn == cs_conn(objt_cs(s->si[0].end))) {
 			if (conn_get_alpn(conn, &alpn_str, &alpn_len) && alpn_str) {
 				int len = MIN(alpn_len, sizeof(alpn) - 1);
 				memcpy(alpn, alpn_str, len);
diff --git a/src/hlua.c b/src/hlua.c
index 4911204..761fa7f 100644
--- a/src/hlua.c
+++ b/src/hlua.c
@@ -1521,7 +1521,7 @@
 static void hlua_socket_handler(struct appctx *appctx)
 {
 	struct stream_interface *si = appctx->owner;
-	struct connection *c = objt_conn(si_opposite(si)->end);
+	struct connection *c = cs_conn(objt_cs(si_opposite(si)->end));
 
 	if (appctx->ctx.hlua_cosocket.die) {
 		si_shutw(si);
@@ -2167,7 +2167,7 @@
 	si = appctx->owner;
 	s = si_strm(si);
 
-	conn = objt_conn(s->si[1].end);
+	conn = cs_conn(objt_cs(s->si[1].end));
 	if (!conn) {
 		xref_unlock(&socket->xref, peer);
 		lua_pushnil(L);
@@ -2217,7 +2217,7 @@
 	si = appctx->owner;
 	s = si_strm(si);
 
-	conn = objt_conn(s->si[1].end);
+	conn = cs_conn(objt_cs(s->si[1].end));
 	if (!conn) {
 		xref_unlock(&socket->xref, peer);
 		lua_pushnil(L);
@@ -2346,7 +2346,7 @@
 	s = si_strm(si);
 
 	/* Initialise connection. */
-	conn = si_alloc_conn(&s->si[1]);
+	conn = cs_conn(si_alloc_cs(&s->si[1], NULL));
 	if (!conn) {
 		xref_unlock(&socket->xref, peer);
 		WILL_LJMP(luaL_error(L, "connect: internal error"));
diff --git a/src/log.c b/src/log.c
index 773662b..88e0d07 100644
--- a/src/log.c
+++ b/src/log.c
@@ -1525,7 +1525,7 @@
 				break;
 
 			case LOG_FMT_BACKENDIP:  // %bi
-				conn = objt_conn(s->si[1].end);
+				conn = cs_conn(objt_cs(s->si[1].end));
 				if (conn)
 					ret = lf_ip(tmplog, (struct sockaddr *)&conn->addr.from, dst + maxsize - tmplog, tmp);
 				else
@@ -1538,7 +1538,7 @@
 				break;
 
 			case LOG_FMT_BACKENDPORT:  // %bp
-				conn = objt_conn(s->si[1].end);
+				conn = cs_conn(objt_cs(s->si[1].end));
 				if (conn)
 					ret = lf_port(tmplog, (struct sockaddr *)&conn->addr.from, dst + maxsize - tmplog, tmp);
 				else
@@ -1551,7 +1551,7 @@
 				break;
 
 			case LOG_FMT_SERVERIP: // %si
-				conn = objt_conn(s->si[1].end);
+				conn = cs_conn(objt_cs(s->si[1].end));
 				if (conn)
 					ret = lf_ip(tmplog, (struct sockaddr *)&conn->addr.to, dst + maxsize - tmplog, tmp);
 				else
@@ -1564,7 +1564,7 @@
 				break;
 
 			case LOG_FMT_SERVERPORT: // %sp
-				conn = objt_conn(s->si[1].end);
+				conn = cs_conn(objt_cs(s->si[1].end));
 				if (conn)
 					ret = lf_port(tmplog, (struct sockaddr *)&conn->addr.to, dst + maxsize - tmplog, tmp);
 				else
diff --git a/src/mux_pt.c b/src/mux_pt.c
index 48a676f..2a83eb4 100644
--- a/src/mux_pt.c
+++ b/src/mux_pt.c
@@ -14,17 +14,31 @@
 #include <proto/connection.h>
 #include <proto/stream.h>
 
-/* Initialize the mux once it's attached. If conn->mux_ctx is NULL, it is
- * assumed that no data layer has yet been instanciated so the mux is
- * attached to an incoming connection and will instanciate a new stream. If
- * conn->mux_ctx exists, it is assumed that it is an outgoing connection
- * requested for this context. Returns < 0 on error.
+/* Initialize the mux once it's attached. It is expected that conn->mux_ctx
+ * points to the existing conn_stream (for outgoing connections) or NULL (for
+ * incoming ones, in which case one will be allocated and a new stream will be
+ * instanciated). Returns < 0 on error.
  */
 static int mux_pt_init(struct connection *conn)
 {
-	if (!conn->mux_ctx)
-		return stream_create_from_conn(conn);
+	struct conn_stream *cs = conn->mux_ctx;
+
+	if (!cs) {
+		cs = cs_new(conn);
+		if (!cs)
+			goto fail;
+
+		if (stream_create_from_cs(cs) < 0)
+			goto fail_free;
+
+		conn->mux_ctx = cs;
+	}
 	return 0;
+
+ fail_free:
+	cs_free(cs);
+ fail:
+	return -1;
 }
 
 /* callback to be used by default for the pass-through mux. It calls the data
@@ -32,7 +46,13 @@
  */
 static int mux_pt_wake(struct connection *conn)
 {
-	return conn->data->wake ? conn->data->wake(conn) : 0;
+	struct conn_stream *cs = conn->mux_ctx;
+	int ret;
+
+	ret = cs->data_cb->wake ? cs->data_cb->wake(cs) : 0;
+
+	cs_update_mux_polling(cs);
+	return (ret);
 }
 
 /* callback used to update the mux's polling flags after changing a cs' status.
@@ -60,7 +80,12 @@
  */
 static void mux_pt_recv(struct connection *conn)
 {
-	conn->data->recv(conn);
+	struct conn_stream *cs = conn->mux_ctx;
+
+	if (conn_xprt_read0_pending(conn))
+		cs->flags |= CS_FL_EOS;
+	cs->data_cb->recv(cs);
+	cs_update_mux_polling(cs);
 }
 
 /* callback to be used by default for the pass-through mux. It simply calls the
@@ -68,7 +93,10 @@
  */
 static void mux_pt_send(struct connection *conn)
 {
-	conn->data->send(conn);
+	struct conn_stream *cs = conn->mux_ctx;
+
+	cs->data_cb->send(cs);
+	cs_update_mux_polling(cs);
 }
 
 /*
diff --git a/src/peers.c b/src/peers.c
index d7705ea..9419afe 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -1871,6 +1871,7 @@
 	struct session *sess;
 	struct stream *s;
 	struct connection *conn;
+	struct conn_stream *cs;
 
 	peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
 	peer->statuscode = PEER_SESS_SC_CONNECTCODE;
@@ -1912,9 +1913,12 @@
 	if (unlikely((conn = conn_new()) == NULL))
 		goto out_free_strm;
 
+	if (unlikely((cs = cs_new(conn)) == NULL))
+		goto out_free_conn;
+
 	conn_prepare(conn, peer->proto, peer->xprt);
-	conn_install_mux(conn, &mux_pt_ops, conn);
-	si_attach_conn(&s->si[1], conn);
+	conn_install_mux(conn, &mux_pt_ops, cs);
+	si_attach_cs(&s->si[1], cs);
 
 	conn->target = s->target = &s->be->obj_type;
 	memcpy(&conn->addr.to, &peer->addr, sizeof(conn->addr.to));
@@ -1928,6 +1932,8 @@
 	return appctx;
 
 	/* Error unrolling */
+ out_free_conn:
+	conn_free(conn);
  out_free_strm:
 	LIST_DEL(&s->list);
 	pool_free2(pool2_stream, s);
diff --git a/src/proto_http.c b/src/proto_http.c
index ca99188..8d813d3 100644
--- a/src/proto_http.c
+++ b/src/proto_http.c
@@ -3662,7 +3662,7 @@
 		char *path;
 
 		/* Note that for now we don't reuse existing proxy connections */
-		if (unlikely((conn = si_alloc_conn(&s->si[1])) == NULL)) {
+		if (unlikely((conn = cs_conn(si_alloc_cs(&s->si[1], NULL))) == NULL)) {
 			txn->req.err_state = txn->req.msg_state;
 			txn->req.msg_state = HTTP_MSG_ERROR;
 			txn->status = 500;
@@ -4212,6 +4212,7 @@
 	int prev_status = s->txn->status;
 	struct proxy *fe = strm_fe(s);
 	struct proxy *be = s->be;
+	struct conn_stream *cs;
 	struct connection *srv_conn;
 	struct server *srv;
 	unsigned int prev_flags = s->txn->flags;
@@ -4221,7 +4222,14 @@
 	 * flags. We also need a more accurate method for computing per-request
 	 * data.
 	 */
-	srv_conn = objt_conn(s->si[1].end);
+	/*
+	 * XXX cognet: This is probably wrong, this is killing a whole
+	 * connection, in the new world order, we probably want to just kill
+	 * the stream, this is to be revisited the day we handle multiple
+	 * streams in one server connection.
+	 */
+	cs = objt_cs(s->si[1].end);
+	srv_conn = cs_conn(cs);
 
 	/* unless we're doing keep-alive, we want to quickly close the connection
 	 * to the server.
@@ -4364,17 +4372,17 @@
 	if (srv_conn && LIST_ISEMPTY(&srv_conn->list)) {
 		srv = objt_server(srv_conn->target);
 		if (!srv)
-			si_idle_conn(&s->si[1], NULL);
+			si_idle_cs(&s->si[1], NULL);
 		else if (srv_conn->flags & CO_FL_PRIVATE)
-			si_idle_conn(&s->si[1], (srv->priv_conns ? &srv->priv_conns[tid] : NULL));
+			si_idle_cs(&s->si[1], (srv->priv_conns ? &srv->priv_conns[tid] : NULL));
 		else if (prev_flags & TX_NOT_FIRST)
 			/* note: we check the request, not the connection, but
 			 * this is valid for strategies SAFE and AGGR, and in
 			 * case of ALWS, we don't care anyway.
 			 */
-			si_idle_conn(&s->si[1], (srv->safe_conns ? &srv->safe_conns[tid] : NULL));
+			si_idle_cs(&s->si[1], (srv->safe_conns ? &srv->safe_conns[tid] : NULL));
 		else
-			si_idle_conn(&s->si[1], (srv->idle_conns ? &srv->idle_conns[tid] : NULL));
+			si_idle_cs(&s->si[1], (srv->idle_conns ? &srv->idle_conns[tid] : NULL));
 	}
 	s->req.analysers = strm_li(s) ? strm_li(s)->analysers : 0;
 	s->res.analysers = 0;
@@ -7936,7 +7944,7 @@
 	chunk_printf(&trash, "%08x:%s.%s[%04x:%04x]: ", s->uniq_id, s->be->id,
 		      dir,
 		     objt_conn(sess->origin) ? (unsigned short)objt_conn(sess->origin)->handle.fd : -1,
-		     objt_conn(s->si[1].end) ? (unsigned short)objt_conn(s->si[1].end)->handle.fd : -1);
+		     objt_cs(s->si[1].end) ? (unsigned short)objt_cs(s->si[1].end)->conn->handle.fd : -1);
 
 	for (max = 0; start + max < end; max++)
 		if (start[max] == '\r' || start[max] == '\n')
diff --git a/src/proto_tcp.c b/src/proto_tcp.c
index d5345ee..5badda7 100644
--- a/src/proto_tcp.c
+++ b/src/proto_tcp.c
@@ -1598,11 +1598,11 @@
 	/* get the object associated with the stream interface.The
 	 * object can be other thing than a connection. For example,
 	 * it be a appctx. */
-	conn = objt_conn(smp->strm->si[dir].end);
+	conn = cs_conn(objt_cs(smp->strm->si[dir].end));
 	if (!conn)
 		return 0;
 
-	/* The fd may not be avalaible for the tcp_info struct, and the
+	/* The fd may not be available for the tcp_info struct, and the
 	  syscal can fail. */
 	optlen = sizeof(info);
 	if (getsockopt(conn->handle.fd, SOL_TCP, TCP_INFO, &info, &optlen) == -1)
diff --git a/src/stream.c b/src/stream.c
index 4808cfa..722d2f4 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -74,11 +74,11 @@
  * valid right after the handshake, before the connection's data layer is
  * initialized, because it relies on the session to be in conn->owner.
  */
-int stream_create_from_conn(struct connection *conn)
+int stream_create_from_cs(struct conn_stream *cs)
 {
 	struct stream *strm;
 
-	strm = stream_new(conn->owner, &conn->obj_type);
+	strm = stream_new(cs->conn->owner, &cs->obj_type);
 	if (strm == NULL)
 		return -1;
 
@@ -99,7 +99,7 @@
 {
 	struct stream *s;
 	struct task *t;
-	struct connection *conn = objt_conn(origin);
+	struct conn_stream *cs  = objt_cs(origin);
 	struct appctx *appctx   = objt_appctx(origin);
 
 	if (unlikely((s = pool_alloc2(pool2_stream)) == NULL))
@@ -198,8 +198,8 @@
 	s->si[0].hcto = sess->fe->timeout.clientfin;
 
 	/* attach the incoming connection to the stream interface now. */
-	if (conn)
-		si_attach_conn(&s->si[0], conn);
+	if (cs)
+		si_attach_cs(&s->si[0], cs);
 	else if (appctx)
 		si_attach_appctx(&s->si[0], appctx);
 
@@ -261,8 +261,8 @@
 		goto out_fail_accept;
 
 	/* finish initialization of the accepted file descriptor */
-	if (conn)
-		conn_xprt_want_recv(conn);
+	if (cs)
+		cs_want_recv(cs);
 	else if (appctx)
 		si_applet_want_get(&s->si[0]);
 
@@ -295,7 +295,8 @@
 	struct session *sess = strm_sess(s);
 	struct proxy *fe = sess->fe;
 	struct bref *bref, *back;
-	struct connection *cli_conn = objt_conn(s->si[0].end);
+	struct conn_stream *cli_cs = objt_cs(s->si[0].end);
+	struct connection *cli_conn = cs_conn(cli_cs);
 	int i;
 
 	if (s->pend_pos)
@@ -343,6 +344,7 @@
 		http_end_txn(s);
 
 	/* ensure the client-side transport layer is destroyed */
+	/* XXX cognet: wrong for multiple streams in one connection */
 	if (cli_conn) {
 		conn_stop_tracking(cli_conn);
 		conn_full_close(cli_conn);
@@ -577,7 +579,7 @@
 	struct stream_interface *si = &s->si[1];
 	struct channel *req = &s->req;
 	struct channel *rep = &s->res;
-	struct connection *srv_conn = __objt_conn(si->end);
+	struct connection *srv_conn = __objt_cs(si->end)->conn;
 
 	/* If we got an error, or if nothing happened and the connection timed
 	 * out, we must give up. The CER state handler will take care of retry
@@ -597,6 +599,9 @@
 		si->exp   = TICK_ETERNITY;
 		si->state = SI_ST_CER;
 
+		/* XXX cognet: do we really want to kill the connection here ?
+		 * Probably not for multiple streams.
+		 */
 		conn_full_close(srv_conn);
 
 		if (si->err_type)
@@ -647,7 +652,8 @@
 static int sess_update_st_cer(struct stream *s)
 {
 	struct stream_interface *si = &s->si[1];
-	struct connection *conn = objt_conn(si->end);
+	struct conn_stream *cs = objt_cs(si->end);
+	struct connection *conn = cs_conn(cs);
 
 	/* we probably have to release last stream from the server */
 	if (objt_server(s->target)) {
@@ -812,7 +818,7 @@
 		req->flags |= CF_WAKE_ONCE;
 		req->flags &= ~CF_WAKE_CONNECT;
 	}
-	if (objt_conn(si->end)) {
+	if (objt_cs(si->end)) {
 		/* real connections have timeouts */
 		req->wto = s->be->timeout.server;
 		rep->rto = s->be->timeout.server;
@@ -2111,8 +2117,8 @@
 	if (!(req->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
 	    req->to_forward &&
 	    (global.tune.options & GTUNE_USE_SPLICE) &&
-	    (objt_conn(si_f->end) && __objt_conn(si_f->end)->xprt && __objt_conn(si_f->end)->xprt->rcv_pipe) &&
-	    (objt_conn(si_b->end) && __objt_conn(si_b->end)->xprt && __objt_conn(si_b->end)->xprt->snd_pipe) &&
+	    (objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->rcv_pipe) &&
+	    (objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->snd_pipe) &&
 	    (pipes_used < global.maxpipes) &&
 	    (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_REQ) ||
 	     (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) &&
@@ -2292,8 +2298,8 @@
 	if (!(res->flags & (CF_KERN_SPLICING|CF_SHUTR)) &&
 	    res->to_forward &&
 	    (global.tune.options & GTUNE_USE_SPLICE) &&
-	    (objt_conn(si_f->end) && __objt_conn(si_f->end)->xprt && __objt_conn(si_f->end)->xprt->snd_pipe) &&
-	    (objt_conn(si_b->end) && __objt_conn(si_b->end)->xprt && __objt_conn(si_b->end)->xprt->rcv_pipe) &&
+	    (objt_cs(si_f->end) && __objt_cs(si_f->end)->conn->xprt && __objt_cs(si_f->end)->conn->xprt->snd_pipe) &&
+	    (objt_cs(si_b->end) && __objt_cs(si_b->end)->conn->xprt && __objt_cs(si_b->end)->conn->xprt->rcv_pipe) &&
 	    (pipes_used < global.maxpipes) &&
 	    (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_RTR) ||
 	     (((sess->fe->options2|s->be->options2) & PR_O2_SPLIC_AUT) &&
@@ -2361,8 +2367,8 @@
 		    si_b->prev_state == SI_ST_EST) {
 			chunk_printf(&trash, "%08x:%s.srvcls[%04x:%04x]\n",
 				      s->uniq_id, s->be->id,
-			              objt_conn(si_f->end) ? (unsigned short)objt_conn(si_f->end)->handle.fd : -1,
-			              objt_conn(si_b->end) ? (unsigned short)objt_conn(si_b->end)->handle.fd : -1);
+			              objt_cs(si_f->end) ? (unsigned short)objt_cs(si_f->end)->conn->handle.fd : -1,
+			              objt_cs(si_b->end) ? (unsigned short)objt_cs(si_b->end)->conn->handle.fd : -1);
 			shut_your_big_mouth_gcc(write(1, trash.str, trash.len));
 		}
 
@@ -2370,8 +2376,8 @@
 		    si_f->prev_state == SI_ST_EST) {
 			chunk_printf(&trash, "%08x:%s.clicls[%04x:%04x]\n",
 				      s->uniq_id, s->be->id,
-			              objt_conn(si_f->end) ? (unsigned short)objt_conn(si_f->end)->handle.fd : -1,
-			              objt_conn(si_b->end) ? (unsigned short)objt_conn(si_b->end)->handle.fd : -1);
+			              objt_cs(si_f->end) ? (unsigned short)objt_cs(si_f->end)->conn->handle.fd : -1,
+			              objt_cs(si_b->end) ? (unsigned short)objt_cs(si_b->end)->conn->handle.fd : -1);
 			shut_your_big_mouth_gcc(write(1, trash.str, trash.len));
 		}
 	}
@@ -2460,8 +2466,8 @@
 		     (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
 		chunk_printf(&trash, "%08x:%s.closed[%04x:%04x]\n",
 			      s->uniq_id, s->be->id,
-		              objt_conn(si_f->end) ? (unsigned short)objt_conn(si_f->end)->handle.fd : -1,
-		              objt_conn(si_b->end) ? (unsigned short)objt_conn(si_b->end)->handle.fd : -1);
+		              objt_cs(si_f->end) ? (unsigned short)objt_cs(si_f->end)->conn->handle.fd : -1,
+		              objt_cs(si_b->end) ? (unsigned short)objt_cs(si_b->end)->conn->handle.fd : -1);
 		shut_your_big_mouth_gcc(write(1, trash.str, trash.len));
 	}
 
@@ -2692,6 +2698,7 @@
 	struct tm tm;
 	extern const char *monthname[12];
 	char pn[INET6_ADDRSTRLEN];
+	struct conn_stream *cs;
 	struct connection *conn;
 	struct appctx *tmpctx;
 
@@ -2777,7 +2784,9 @@
 		else
 			chunk_appendf(&trash, "  backend=<NONE> (id=-1 mode=-)");
 
+		cs = objt_cs(strm->si[1].end);
+		conn = cs_conn(cs);
+
-		conn = objt_conn(strm->si[1].end);
 		if (conn)
 			conn_get_from_addr(conn);
 
@@ -2869,14 +2878,16 @@
 			                     TICKS_TO_MS(1000)) : "<NEVER>",
 			     strm->si[1].err_type);
 
-		if ((conn = objt_conn(strm->si[0].end)) != NULL) {
+		if ((cs = objt_cs(strm->si[0].end)) != NULL) {
+			conn = cs->conn;
+
 			chunk_appendf(&trash,
 			              "  co0=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",
 				      conn,
 				      conn_get_ctrl_name(conn),
 				      conn_get_xprt_name(conn),
 				      conn_get_mux_name(conn),
-				      conn_get_data_name(conn),
+				      cs_get_data_name(cs),
 			              obj_type_name(conn->target),
 			              obj_base_ptr(conn->target));
 
@@ -2898,14 +2909,16 @@
 			              tmpctx->applet->name);
 		}
 
-		if ((conn = objt_conn(strm->si[1].end)) != NULL) {
+		if ((cs = objt_cs(strm->si[1].end)) != NULL) {
+			conn = cs->conn;
+
 			chunk_appendf(&trash,
 			              "  co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n",
 				      conn,
 				      conn_get_ctrl_name(conn),
 				      conn_get_xprt_name(conn),
 				      conn_get_mux_name(conn),
-				      conn_get_data_name(conn),
+				      cs_get_data_name(cs),
 			              obj_type_name(conn->target),
 			              obj_base_ptr(conn->target));
 
@@ -3171,7 +3184,7 @@
 				     human_time(TICKS_TO_MS(curr_strm->res.analyse_exp - now_ms),
 						TICKS_TO_MS(1000)) : "");
 
-			conn = objt_conn(curr_strm->si[0].end);
+			conn = cs_conn(objt_cs(curr_strm->si[0].end));
 			chunk_appendf(&trash,
 				     " s0=[%d,%1xh,fd=%d,ex=%s]",
 				     curr_strm->si[0].state,
@@ -3181,7 +3194,7 @@
 				     human_time(TICKS_TO_MS(curr_strm->si[0].exp - now_ms),
 						TICKS_TO_MS(1000)) : "");
 
-			conn = objt_conn(curr_strm->si[1].end);
+			conn = cs_conn(objt_cs(curr_strm->si[1].end));
 			chunk_appendf(&trash,
 				     " s1=[%d,%1xh,fd=%d,ex=%s]",
 				     curr_strm->si[1].state,
diff --git a/src/stream_interface.c b/src/stream_interface.c
index a5463b7..e9bc831 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -30,6 +30,7 @@
 #include <proto/applet.h>
 #include <proto/channel.h>
 #include <proto/connection.h>
+#include <proto/mux_pt.h>
 #include <proto/pipe.h>
 #include <proto/stream.h>
 #include <proto/stream_interface.h>
@@ -50,11 +51,11 @@
 static void stream_int_shutw_applet(struct stream_interface *si);
 static void stream_int_chk_rcv_applet(struct stream_interface *si);
 static void stream_int_chk_snd_applet(struct stream_interface *si);
-static void si_conn_recv_cb(struct connection *conn);
-static void si_conn_send_cb(struct connection *conn);
-static int si_conn_wake_cb(struct connection *conn);
-static int si_idle_conn_wake_cb(struct connection *conn);
-static void si_idle_conn_null_cb(struct connection *conn);
+static void si_cs_recv_cb(struct conn_stream *cs);
+static void si_cs_send_cb(struct conn_stream *cs);
+static int si_cs_wake_cb(struct conn_stream *cs);
+static int si_idle_conn_wake_cb(struct conn_stream *cs);
+static void si_idle_conn_null_cb(struct conn_stream *cs);
 
 /* stream-interface operations for embedded tasks */
 struct si_ops si_embedded_ops = {
@@ -83,9 +84,9 @@
 };
 
 struct data_cb si_conn_cb = {
-	.recv    = si_conn_recv_cb,
-	.send    = si_conn_send_cb,
-	.wake    = si_conn_wake_cb,
+	.recv    = si_cs_recv_cb,
+	.send    = si_cs_send_cb,
+	.wake    = si_cs_wake_cb,
 	.name    = "STRM",
 };
 
@@ -337,8 +338,10 @@
 	 * we've sent the whole proxy line. Otherwise we use connect().
 	 */
 	while (conn->send_proxy_ofs) {
+		struct conn_stream *cs;
 		int ret;
 
+		cs = conn->mux_ctx;
 		/* The target server expects a PROXY line to be sent first.
 		 * If the send_proxy_ofs is negative, it corresponds to the
 		 * offset to start sending from then end of the proxy string
@@ -348,10 +351,10 @@
 		 * is attached to a stream interface. Otherwise we can only
 		 * send a LOCAL line (eg: for use with health checks).
 		 */
-		if (conn->data == &si_conn_cb) {
-			struct stream_interface *si = conn->owner;
-			struct connection *remote = objt_conn(si_opposite(si)->end);
-			ret = make_proxy_line(trash.str, trash.size, objt_server(conn->target), remote);
+		if (conn->mux == &mux_pt_ops && cs->data_cb == &si_conn_cb) {
+			struct stream_interface *si = cs->data;
+			struct conn_stream *remote_cs = objt_cs(si_opposite(si)->end);
+			ret = make_proxy_line(trash.str, trash.size, objt_server(conn->target), remote_cs ? remote_cs->conn : NULL);
 		}
 		else {
 			/* The target server expects a LOCAL line to be sent first. Retrieving
@@ -414,9 +417,9 @@
  * It simply sets the CO_FL_SOCK_RD_SH flag so that si_idle_conn_wake_cb()
  * is notified and can kill the connection.
  */
-static void si_idle_conn_null_cb(struct connection *conn)
+static void si_idle_conn_null_cb(struct conn_stream *cs)
 {
-	conn_sock_drain(conn);
+	conn_sock_drain(cs->conn);
 }
 
 /* Callback to be used by connection I/O handlers when some activity is detected
@@ -424,9 +427,10 @@
  * a close was detected on it. It returns 0 if it did nothing serious, or -1 if
  * it killed the connection.
  */
-static int si_idle_conn_wake_cb(struct connection *conn)
+static int si_idle_conn_wake_cb(struct conn_stream *cs)
 {
-	struct stream_interface *si = conn->owner;
+	struct connection *conn = cs->conn;
+	struct stream_interface *si = cs->data;
 
 	if (!conn_ctrl_ready(conn))
 		return 0;
@@ -560,9 +564,10 @@
  * connection's polling based on the channels and stream interface's final
  * states. The function always returns 0.
  */
-static int si_conn_wake_cb(struct connection *conn)
+static int si_cs_wake_cb(struct conn_stream *cs)
 {
-	struct stream_interface *si = conn->owner;
+	struct connection *conn = cs->conn;
+	struct stream_interface *si = cs->data;
 	struct channel *ic = si_ic(si);
 	struct channel *oc = si_oc(si);
 
@@ -599,36 +604,37 @@
 	 * was done above (eg: maybe some buffers got emptied).
 	 */
 	if (channel_is_empty(oc))
-		__conn_xprt_stop_send(conn);
+		__cs_stop_send(cs);
 
 
 	if (si->flags & SI_FL_WAIT_ROOM) {
-		__conn_xprt_stop_recv(conn);
+		__cs_stop_recv(cs);
 	}
 	else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
 		 channel_may_recv(ic)) {
-		__conn_xprt_want_recv(conn);
+		__cs_want_recv(cs);
 	}
 	return 0;
 }
 
 /*
  * This function is called to send buffer data to a stream socket.
- * It calls the transport layer's snd_buf function. It relies on the
+ * It calls the mux layer's snd_buf function. It relies on the
  * caller to commit polling changes. The caller should check conn->flags
  * for errors.
  */
-static void si_conn_send(struct connection *conn)
+static void si_cs_send(struct conn_stream *cs)
 {
-	struct stream_interface *si = conn->owner;
+	struct connection *conn = cs->conn;
+	struct stream_interface *si = cs->data;
 	struct channel *oc = si_oc(si);
 	int ret;
 
 	/* ensure it's only set if a write attempt has succeeded */
 	oc->flags &= ~CF_WRITE_PARTIAL;
 
-	if (oc->pipe && conn->xprt->snd_pipe) {
-		ret = conn->xprt->snd_pipe(conn, oc->pipe);
+	if (oc->pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) {
+		ret = conn->mux->snd_pipe(cs, oc->pipe);
 		if (ret > 0)
 			oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA;
 
@@ -672,7 +678,7 @@
 		if (oc->flags & CF_STREAMER)
 			send_flag |= CO_SFL_STREAMER;
 
-		ret = conn->xprt->snd_buf(conn, oc->buf, send_flag);
+		ret = conn->mux->snd_buf(cs, oc->buf, send_flag);
 		if (ret > 0) {
 			oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA;
 
@@ -766,25 +772,25 @@
 {
 	struct channel *ic = si_ic(si);
 	struct channel *oc = si_oc(si);
-	struct connection *conn = __objt_conn(si->end);
+	struct conn_stream *cs = __objt_cs(si->end);
 
 	if (!(ic->flags & CF_SHUTR)) {
 		/* Read not closed */
 		if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic))
-			__conn_xprt_stop_recv(conn);
+			__cs_stop_recv(cs);
 		else
-			__conn_xprt_want_recv(conn);
+			__cs_want_recv(cs);
 	}
 
 	if (!(oc->flags & CF_SHUTW)) {
 		/* Write not closed */
 		if (channel_is_empty(oc))
-			__conn_xprt_stop_send(conn);
+			__cs_stop_send(cs);
 		else
-			__conn_xprt_want_send(conn);
+			__cs_want_send(cs);
 	}
 
-	conn_cond_update_xprt_polling(conn);
+	cs_update_mux_polling(cs);
 }
 
 /*
@@ -799,7 +805,8 @@
  */
 static void stream_int_shutr_conn(struct stream_interface *si)
 {
-	struct connection *conn = __objt_conn(si->end);
+	struct conn_stream *cs = __objt_cs(si->end);
+	struct connection *conn = cs->conn;
 	struct channel *ic = si_ic(si);
 
 	ic->flags &= ~CF_SHUTR_NOW;
@@ -813,6 +820,7 @@
 		return;
 
 	if (si_oc(si)->flags & CF_SHUTW) {
+		/* XXX: should just close cs ? */
 		conn_full_close(conn);
 		si->state = SI_ST_DIS;
 		si->exp = TICK_ETERNITY;
@@ -823,7 +831,7 @@
 	}
 	else if (conn->ctrl) {
 		/* we want the caller to disable polling on this FD */
-		conn_xprt_stop_recv(conn);
+		cs_stop_recv(cs);
 	}
 }
 
@@ -837,7 +845,8 @@
  */
 static void stream_int_shutw_conn(struct stream_interface *si)
 {
-	struct connection *conn = __objt_conn(si->end);
+	struct conn_stream *cs = __objt_cs(si->end);
+	struct connection *conn = cs->conn;
 	struct channel *ic = si_ic(si);
 	struct channel *oc = si_oc(si);
 
@@ -865,13 +874,21 @@
 			/* quick close, the socket is alredy shut anyway */
 		}
 		else if (si->flags & SI_FL_NOLINGER) {
-			/* unclean data-layer shutdown */
-			conn_xprt_shutw_hard(conn);
+			/* unclean data-layer shutdown, typically an aborted request
+			 * or a forwarded shutdown from a client to a server due to
+			 * option abortonclose. No need for the TLS layer to try to
+			 * emit a shutdown message.
+			 */
+			cs_shutw_hard(cs);
 		}
 		else {
-			/* clean data-layer shutdown */
-			conn_xprt_shutw(conn);
-			conn_sock_shutw(conn);
+			/* clean data-layer shutdown. This only happens on the
+			 * frontend side, or on the backend side when forwarding
+			 * a client close in TCP mode or in HTTP TUNNEL mode
+			 * while option abortonclose is set. We want the TLS
+			 * layer to try to signal it to the peer before we close.
+			 */
+			cs_shutw(cs);
 
 			/* If the stream interface is configured to disable half-open
 			 * connections, we'll skip the shutdown(), but only if the
@@ -920,25 +937,23 @@
 static void stream_int_chk_rcv_conn(struct stream_interface *si)
 {
 	struct channel *ic = si_ic(si);
-	struct connection *conn = __objt_conn(si->end);
+	struct conn_stream *cs = __objt_cs(si->end);
 
 	if (unlikely(si->state > SI_ST_EST || (ic->flags & CF_SHUTR)))
 		return;
 
-	conn_refresh_polling_flags(conn);
-
 	if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) {
 		/* stop reading */
 		if (!(ic->flags & CF_DONT_READ)) /* full */
 			si->flags |= SI_FL_WAIT_ROOM;
-		__conn_xprt_stop_recv(conn);
+		__cs_stop_recv(cs);
 	}
 	else {
 		/* (re)start reading */
 		si->flags &= ~SI_FL_WAIT_ROOM;
-		__conn_xprt_want_recv(conn);
+		__cs_want_recv(cs);
 	}
-	conn_cond_update_xprt_polling(conn);
+	cs_update_mux_polling(cs);
 }
 
 
@@ -950,7 +965,7 @@
 static void stream_int_chk_snd_conn(struct stream_interface *si)
 {
 	struct channel *oc = si_oc(si);
-	struct connection *conn = __objt_conn(si->end);
+	struct conn_stream *cs = __objt_cs(si->end);
 
 	/* ensure it's only set if a write attempt has succeeded */
 	oc->flags &= ~CF_WRITE_PARTIAL;
@@ -965,7 +980,7 @@
 	    !(si->flags & SI_FL_WAIT_DATA))       /* not waiting for data */
 		return;
 
-	if (conn->flags & CO_FL_XPRT_WR_ENA) {
+	if (cs->flags & CS_FL_DATA_WR_ENA) {
 		/* already subscribed to write notifications, will be called
 		 * anyway, so let's avoid calling it especially if the reader
 		 * is not ready.
@@ -973,16 +988,12 @@
 		return;
 	}
 
-	/* Before calling the data-level operations, we have to prepare
-	 * the polling flags to ensure we properly detect changes.
-	 */
-	conn_refresh_polling_flags(conn);
-	__conn_xprt_want_send(conn);
+	__cs_want_send(cs);
 
-	si_conn_send(conn);
-	if (conn->flags & CO_FL_ERROR) {
+	si_cs_send(cs);
+	if (cs->conn->flags & CO_FL_ERROR) {
 		/* Write error on the file descriptor */
-		__conn_xprt_stop_both(conn);
+		__cs_stop_both(cs);
 		si->flags |= SI_FL_ERR;
 		goto out_wakeup;
 	}
@@ -996,7 +1007,7 @@
 		 * ->o limit was reached. Maybe we just wrote the last
 		 * chunk and need to close.
 		 */
-		__conn_xprt_stop_send(conn);
+		__cs_stop_send(cs);
 		if (((oc->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) ==
 		     (CF_AUTO_CLOSE|CF_SHUTW_NOW)) &&
 		    (si->state == SI_ST_EST)) {
@@ -1012,7 +1023,7 @@
 		/* Otherwise there are remaining data to be sent in the buffer,
 		 * which means we have to poll before doing so.
 		 */
-		__conn_xprt_want_send(conn);
+		__cs_want_send(cs);
 		si->flags &= ~SI_FL_WAIT_DATA;
 		if (!tick_isset(oc->wex))
 			oc->wex = tick_add_ifset(now_ms, oc->wto);
@@ -1052,17 +1063,18 @@
 	}
 
 	/* commit possible polling changes */
-	conn_cond_update_polling(conn);
+	cs_update_mux_polling(cs);
 }
 
 /*
  * This is the callback which is called by the connection layer to receive data
- * into the buffer from the connection. It iterates over the transport layer's
+ * into the buffer from the connection. It iterates over the mux layer's
  * rcv_buf function.
  */
-static void si_conn_recv_cb(struct connection *conn)
+static void si_cs_recv_cb(struct conn_stream *cs)
 {
-	struct stream_interface *si = conn->owner;
+	struct connection *conn = cs->conn;
+	struct stream_interface *si = cs->data;
 	struct channel *ic = si_ic(si);
 	int ret, max, cur_read;
 	int read_poll = MAX_READ_POLL_LOOPS;
@@ -1081,7 +1093,7 @@
 		return;
 
 	/* stop here if we reached the end of data */
-	if (conn_xprt_read0_pending(conn))
+	if (cs->flags & CS_FL_EOS)
 		goto out_shutdown_r;
 
 	cur_read = 0;
@@ -1101,7 +1113,7 @@
 	/* First, let's see if we may splice data across the channel without
 	 * using a buffer.
 	 */
-	if (conn->xprt->rcv_pipe &&
+	if (conn->xprt->rcv_pipe && conn->mux->rcv_pipe &&
 	    (ic->pipe || ic->to_forward >= MIN_SPLICE_FORWARD) &&
 	    ic->flags & CF_KERN_SPLICING) {
 		if (buffer_not_empty(ic->buf)) {
@@ -1120,7 +1132,7 @@
 			}
 		}
 
-		ret = conn->xprt->rcv_pipe(conn, ic->pipe, ic->to_forward);
+		ret = conn->mux->rcv_pipe(cs, ic->pipe, ic->to_forward);
 		if (ret < 0) {
 			/* splice not supported on this end, let's disable it */
 			ic->flags &= ~CF_KERN_SPLICING;
@@ -1135,7 +1147,7 @@
 			ic->flags |= CF_READ_PARTIAL;
 		}
 
-		if (conn_xprt_read0_pending(conn))
+		if (cs->flags & CS_FL_EOS)
 			goto out_shutdown_r;
 
 		if (conn->flags & CO_FL_ERROR)
@@ -1146,7 +1158,7 @@
 			 * could soon be full. Let's stop before needing to poll.
 			 */
 			si->flags |= SI_FL_WAIT_ROOM;
-			__conn_xprt_stop_recv(conn);
+			__cs_stop_recv(cs);
 		}
 
 		/* splice not possible (anymore), let's go on on standard copy */
@@ -1177,7 +1189,7 @@
 			break;
 		}
 
-		ret = conn->xprt->rcv_buf(conn, ic->buf, max);
+		ret = conn->mux->rcv_buf(cs, ic->buf, max);
 		if (ret <= 0)
 			break;
 
@@ -1203,9 +1215,12 @@
 		}
 
 		if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) {
-			if (__conn_xprt_done_recv(conn))
-				si->flags |= SI_FL_WAIT_ROOM;
-			break;
+			/*
+			 * This used to be __conn_xprt_done_recv()
+			 * This was changed to accomodate with the mux code,
+			 * but we may have lost a worthwhile optimization.
+			 */
+			__cs_stop_recv(cs);
 		}
 
 		/* if too many bytes were missing from last read, it means that
@@ -1271,7 +1286,7 @@
 	if (conn->flags & CO_FL_ERROR)
 		return;
 
-	if (conn_xprt_read0_pending(conn))
+	if (cs->flags & CS_FL_EOS)
 		/* connection closed */
 		goto out_shutdown_r;
 
@@ -1291,9 +1306,10 @@
  * from the buffer to the connection. It iterates over the transport layer's
  * snd_buf function.
  */
-static void si_conn_send_cb(struct connection *conn)
+static void si_cs_send_cb(struct conn_stream *cs)
 {
-	struct stream_interface *si = conn->owner;
+	struct connection *conn = cs->conn;
+	struct stream_interface *si = cs->data;
 
 	if (conn->flags & CO_FL_ERROR)
 		return;
@@ -1307,7 +1323,7 @@
 		return;
 
 	/* OK there are data waiting to be sent */
-	si_conn_send(conn);
+	si_cs_send(cs);
 
 	/* OK all done */
 	return;
@@ -1320,7 +1336,7 @@
  */
 void stream_sock_read0(struct stream_interface *si)
 {
-	struct connection *conn = __objt_conn(si->end);
+	struct conn_stream *cs = __objt_cs(si->end);
 	struct channel *ic = si_ic(si);
 	struct channel *oc = si_oc(si);
 
@@ -1340,17 +1356,17 @@
 	if (si->flags & SI_FL_NOHALF) {
 		/* we want to immediately forward this close to the write side */
 		/* force flag on ssl to keep stream in cache */
-		conn_xprt_shutw_hard(conn);
+		cs_shutw_hard(cs);
 		goto do_close;
 	}
 
 	/* otherwise that's just a normal read shutdown */
-	__conn_xprt_stop_recv(conn);
+	__cs_stop_recv(cs);
 	return;
 
  do_close:
 	/* OK we completely close the socket here just as if we went through si_shut[rw]() */
-	conn_full_close(conn);
+	conn_full_close(cs->conn);
 
 	ic->flags &= ~CF_SHUTR_NOW;
 	ic->flags |= CF_SHUTR;