MEDIUM: conn-stream: Pre-allocate endpoint to create CS from muxes and applets

It is a transient commit to prepare next changes. Now, when a conn-stream is
created from an applet or a multiplexer, an endpoint is always provided. In
addition, the API to create a conn-stream was specialized to have one
function per type.

The next step will be to share the endpoint structure.
diff --git a/include/haproxy/applet.h b/include/haproxy/applet.h
index 97164ac..97b9c34 100644
--- a/include/haproxy/applet.h
+++ b/include/haproxy/applet.h
@@ -59,7 +59,7 @@
  * appctx_free(). <applet> is assigned as the applet, but it can be NULL. The
  * applet's task is always created on the current thread.
  */
-static inline struct appctx *appctx_new(struct applet *applet, void *owner)
+static inline struct appctx *appctx_new(struct applet *applet)
 {
 	struct appctx *appctx;
 
@@ -67,7 +67,6 @@
 	if (likely(appctx != NULL)) {
 		appctx->obj_type = OBJ_TYPE_APPCTX;
 		appctx->applet = applet;
-		appctx->owner = owner;
 		appctx_init(appctx);
 		appctx->t = task_new_here();
 		if (unlikely(appctx->t == NULL)) {
diff --git a/include/haproxy/conn_stream.h b/include/haproxy/conn_stream.h
index 638b0cf..e0c3b50 100644
--- a/include/haproxy/conn_stream.h
+++ b/include/haproxy/conn_stream.h
@@ -23,11 +23,13 @@
 #define _HAPROXY_CONN_STREAM_H
 
 #include <haproxy/api.h>
-#include <haproxy/applet.h>
 #include <haproxy/connection.h>
 #include <haproxy/conn_stream-t.h>
 #include <haproxy/obj_type.h>
 
+struct buffer;
+struct session;
+struct appctx;
 struct stream;
 struct stream_interface;
 struct check;
@@ -38,10 +40,16 @@
 void cs_endpoint_free(struct cs_endpoint *endp);
 
 struct conn_stream *cs_new(struct cs_endpoint *endp);
+struct conn_stream *cs_new_from_mux(struct cs_endpoint *endp, struct session *sess, struct buffer *input);
+struct conn_stream *cs_new_from_applet(struct cs_endpoint *endp, struct session *sess, struct buffer *input);
+struct conn_stream *cs_new_from_strm(struct stream *strm, unsigned int flags);
+struct conn_stream *cs_new_from_check(struct check *check, unsigned int flags);
 void cs_free(struct conn_stream *cs);
-void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx);
-void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx);
-int cs_attach_app(struct conn_stream *cs, enum obj_type *app);
+
+void cs_attach_mux(struct conn_stream *cs, void *target, void *ctx);
+void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx);
+int cs_attach_strm(struct conn_stream *cs, struct stream *strm);
+
 void cs_detach_endp(struct conn_stream *cs);
 void cs_detach_app(struct conn_stream *cs);
 
diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h
index aab4687..6c97b7f 100644
--- a/include/haproxy/mux_quic.h
+++ b/include/haproxy/mux_quic.h
@@ -115,15 +115,13 @@
 		return NULL;
 	endp->target = qcs;
 	endp->ctx = qcs->qcc->conn;
-	cs = cs_new(endp);
+	endp->flags |= CS_EP_T_MUX;
+	cs = cs_new_from_mux(endp, qcs->qcc->conn->owner, buf);
 	if (!cs) {
 		cs_endpoint_free(endp);
 		return NULL;
 	}
-	cs_attach_endp_mux(cs, qcs, qcs->qcc->conn);
 	qcs->cs = cs;
-	stream_new(qcs->qcc->conn->owner, cs, buf);
-
 	++qcs->qcc->nb_cs;
 
 	return cs;
diff --git a/src/backend.c b/src/backend.c
index 2f33f37..f8c13b4 100644
--- a/src/backend.c
+++ b/src/backend.c
@@ -1570,7 +1570,9 @@
 			return SF_ERR_INTERNAL;  /* how did we get there ? */
 		}
 
-		cs_attach_endp_mux(s->csb, NULL, srv_conn);
+		cs_attach_mux(s->csb, NULL, srv_conn);
+		srv_conn->ctx = s->csb;
+
 #if defined(USE_OPENSSL) && defined(TLSEXT_TYPE_application_layer_protocol_negotiation)
 		if (!srv ||
 		    (srv->use_ssl != 1 || (!(srv->ssl_ctx.alpn_str) && !(srv->ssl_ctx.npn_str)) ||
diff --git a/src/check.c b/src/check.c
index b26e7b9..44583c2 100644
--- a/src/check.c
+++ b/src/check.c
@@ -1391,11 +1391,9 @@
 	if (check->type == PR_O2_EXT_CHK)
 		t = task_new_on(0);
 	else {
-		check->cs = cs_new(NULL);
+		check->cs = cs_new_from_check(check, CS_FL_NONE);
 		if (!check->cs)
 			goto fail_alloc_cs;
-		if (cs_attach_app(check->cs, &check->obj_type) < 0)
-			goto fail_attach_cs;
 		t = task_new_anywhere();
 	}
 
@@ -1420,7 +1418,6 @@
 	return 1;
 
   fail_alloc_task:
-  fail_attach_cs:
 	cs_free(check->cs);
   fail_alloc_cs:
 	ha_alert("Starting [%s:%s] check: out of memory.\n",
diff --git a/src/conn_stream.c b/src/conn_stream.c
index 5b81774..24a22b5 100644
--- a/src/conn_stream.c
+++ b/src/conn_stream.c
@@ -75,6 +75,68 @@
 	return NULL;
 }
 
+struct conn_stream *cs_new_from_mux(struct cs_endpoint *endp, struct session *sess, struct buffer *input)
+{
+	struct conn_stream *cs;
+
+	cs = cs_new(endp);
+	if (unlikely(!cs))
+		return NULL;
+	if (unlikely(!stream_new(sess, cs, input))) {
+		pool_free(pool_head_connstream, cs);
+		cs = NULL;
+	}
+	return cs;
+}
+
+struct conn_stream *cs_new_from_applet(struct cs_endpoint *endp, struct session *sess, struct buffer *input)
+{
+	struct conn_stream *cs;
+	struct appctx *appctx = endp->ctx;
+
+	cs = cs_new(endp);
+	if (unlikely(!cs))
+		return NULL;
+	appctx->owner = cs;
+	if (unlikely(!stream_new(sess, cs, input))) {
+		pool_free(pool_head_connstream, cs);
+		cs = NULL;
+	}
+	return cs;
+}
+
+struct conn_stream *cs_new_from_strm(struct stream *strm, unsigned int flags)
+{
+	struct conn_stream *cs;
+
+	cs = cs_new(NULL);
+	if (unlikely(!cs))
+		return NULL;
+	cs->flags |= flags;
+	cs->si = si_new(cs);
+	if (unlikely(!cs->si)) {
+		cs_free(cs);
+		return NULL;
+	}
+	cs->app = &strm->obj_type;
+	cs->si->ops = &si_embedded_ops;
+	cs->data_cb = NULL;
+	return cs;
+}
+
+struct conn_stream *cs_new_from_check(struct check *check, unsigned int flags)
+{
+	struct conn_stream *cs;
+
+	cs = cs_new(NULL);
+	if (unlikely(!cs))
+		return NULL;
+	cs->flags |= flags;
+	cs->app = &check->obj_type;
+	cs->data_cb = &check_conn_cb;
+	return cs;
+}
+
 /* Releases a conn_stream previously allocated by cs_new(), as well as any
  * buffer it would still hold.
  */
@@ -89,11 +151,11 @@
 
 
 /* Attaches a conn_stream to an mux endpoint and sets the endpoint ctx */
-void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx)
+void cs_attach_mux(struct conn_stream *cs, void *target, void *ctx)
 {
 	struct connection *conn = ctx;
 
-	cs->endp->target = endp;
+	cs->endp->target = target;
 	cs->endp->ctx = ctx;
 	cs->endp->flags |= CS_EP_T_MUX;
 	if (!conn->ctx)
@@ -107,11 +169,11 @@
 }
 
 /* Attaches a conn_stream to an applet endpoint and sets the endpoint ctx */
-void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx)
+void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx)
 {
-	struct appctx *appctx = endp;
+	struct appctx *appctx = target;
 
-	cs->endp->target = endp;
+	cs->endp->target = target;
 	cs->endp->ctx = ctx;
 	cs->endp->flags |= CS_EP_T_APPLET;
 	appctx->owner = cs;
@@ -122,31 +184,26 @@
 }
 
 /* Attaches a conn_stream to a app layer and sets the relevant callbacks */
-int cs_attach_app(struct conn_stream *cs, enum obj_type *app)
+int cs_attach_strm(struct conn_stream *cs, struct stream *strm)
 {
-	cs->app = app;
+	cs->app = &strm->obj_type;
 
-	if (objt_stream(app)) {
-		if (!cs->si)
-			cs->si = si_new(cs);
-		if (unlikely(!cs->si))
-			return -1;
+	cs->si = si_new(cs);
+	if (unlikely(!cs->si))
+		return -1;
 
-		if (cs->endp->flags & CS_EP_T_MUX) {
-			cs->si->ops = &si_conn_ops;
-			cs->data_cb = &si_conn_cb;
-		}
-		else if (cs->endp->flags & CS_EP_T_APPLET) {
-			cs->si->ops = &si_applet_ops;
-			cs->data_cb = NULL;
-		}
-		else {
-			cs->si->ops = &si_embedded_ops;
-			cs->data_cb = NULL;
-		}
+	if (cs->endp->flags & CS_EP_T_MUX) {
+		cs->si->ops = &si_conn_ops;
+		cs->data_cb = &si_conn_cb;
 	}
-	else if (objt_check(app))
-		cs->data_cb = &check_conn_cb;
+	else if (cs->endp->flags & CS_EP_T_APPLET) {
+		cs->si->ops = &si_applet_ops;
+		cs->data_cb = NULL;
+	}
+	else {
+		cs->si->ops = &si_embedded_ops;
+		cs->data_cb = NULL;
+	}
 	return 0;
 }
 
diff --git a/src/dns.c b/src/dns.c
index 461e3c7..e6f7093 100644
--- a/src/dns.c
+++ b/src/dns.c
@@ -887,19 +887,15 @@
 {
 	struct appctx *appctx;
 	struct session *sess;
+	struct cs_endpoint *endp;
 	struct conn_stream *cs;
 	struct stream *s;
 	struct applet *applet = &dns_session_applet;
+	struct sockaddr_storage *addr = NULL;
 
-	cs = cs_new(NULL);
-	if (!cs) {
-		ha_alert("out of memory in dns_session_create().\n");
-		goto out_close;
-	}
-
-	appctx = appctx_new(applet, cs);
+	appctx = appctx_new(applet);
 	if (!appctx)
-		goto out_free_cs;
+		goto out_close;
 	appctx->ctx.sft.ptr = (void *)ds;
 
 	sess = session_new(ds->dss->srv->proxy, NULL, &appctx->obj_type);
@@ -908,18 +904,28 @@
 		goto out_free_appctx;
 	}
 
-	if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) {
-		ha_alert("Failed to initialize stream in dns_session_create().\n");
+	if (!sockaddr_alloc(&addr, &ds->dss->srv->addr, sizeof(ds->dss->srv->addr)))
 		goto out_free_sess;
+
+	endp = cs_endpoint_new();
+	if (!endp)
+		goto out_free_addr;
+	endp->target = appctx;
+	endp->ctx = appctx;
+	endp->flags |= CS_EP_T_APPLET;
+
+	cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+	if (!cs) {
+		ha_alert("Failed to initialize stream in dns_session_create().\n");
+		cs_endpoint_free(endp);
+		goto out_free_addr;
 	}
 
+	s = DISGUISE(cs_strm(cs));
+	cs_si(s->csb)->dst = addr;
+	cs_si(s->csb)->flags |= SI_FL_NOLINGER;
 	s->target = &ds->dss->srv->obj_type;
-	if (!sockaddr_alloc(&cs_si(s->csb)->dst, &ds->dss->srv->addr, sizeof(ds->dss->srv->addr)))
-		goto out_free_strm;
-
-	cs_attach_endp_app(cs, appctx, appctx);
 	s->flags = SF_ASSIGNED|SF_ADDR_SET;
-	cs_si(s->csb)->flags |= SI_FL_NOLINGER;
 
 	s->do_log = NULL;
 	s->uniq_id = 0;
@@ -934,15 +940,12 @@
 	return appctx;
 
 	/* Error unrolling */
- out_free_strm:
-	LIST_DELETE(&s->list);
-	pool_free(pool_head_stream, s);
+ out_free_addr:
+	sockaddr_free(&addr);
  out_free_sess:
 	session_free(sess);
  out_free_appctx:
 	appctx_free(appctx);
- out_free_cs:
-	cs_free(cs);
  out_close:
 	return NULL;
 }
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index e8daf1b..86fb646 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -1988,16 +1988,13 @@
 {
 	struct appctx      *appctx;
 	struct session     *sess;
+	struct cs_endpoint *endp;
 	struct conn_stream *cs;
 	struct stream      *strm;
 
-	cs = cs_new(NULL);
-	if (!cs)
+	if ((appctx = appctx_new(&spoe_applet)) == NULL)
 		goto out_error;
 
-	if ((appctx = appctx_new(&spoe_applet, cs)) == NULL)
-		goto out_free_cs;
-
 	appctx->ctx.spoe.ptr = pool_zalloc(pool_head_spoe_appctx);
 	if (SPOE_APPCTX(appctx) == NULL)
 		goto out_free_appctx;
@@ -2028,14 +2025,24 @@
 	if (!sess)
 		goto out_free_spoe;
 
+	endp = cs_endpoint_new();
+	if (!endp)
+		goto out_free_sess;
+	endp->target = appctx;
+	endp->ctx = appctx;
+	endp->flags |= CS_EP_T_APPLET;
+
-	if ((strm = stream_new(sess, cs, &BUF_NULL)) == NULL)
+	cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+	if (!cs) {
+		cs_endpoint_free(endp);
 		goto out_free_sess;
+	}
 
-	cs_attach_endp_app(cs, appctx, appctx);
+	strm = DISGUISE(cs_strm(cs));
 	stream_set_backend(strm, conf->agent->b.be);
 
 	/* applet is waiting for data */
-	si_cant_get(strm->csf->si);
+	si_cant_get(cs_si(strm->csf));
 	appctx_wakeup(appctx);
 
 	strm->do_log = NULL;
@@ -2058,8 +2065,6 @@
 	pool_free(pool_head_spoe_appctx, SPOE_APPCTX(appctx));
  out_free_appctx:
 	appctx_free(appctx);
- out_free_cs:
-	cs_free(cs);
  out_error:
 	return NULL;
 }
diff --git a/src/hlua.c b/src/hlua.c
index 3f9b123..705c15c 100644
--- a/src/hlua.c
+++ b/src/hlua.c
@@ -2918,8 +2918,9 @@
 	struct hlua_socket *socket;
 	struct appctx *appctx;
 	struct session *sess;
+	struct cs_endpoint *endp;
 	struct conn_stream *cs;
-	struct stream *strm;
+	struct stream *s;
 
 	/* Check stack size. */
 	if (!lua_checkstack(L, 3)) {
@@ -2944,17 +2945,11 @@
 	lua_rawgeti(L, LUA_REGISTRYINDEX, class_socket_ref);
 	lua_setmetatable(L, -2);
 
-	cs = cs_new(NULL);
-	if (!cs) {
-		hlua_pusherror(L, "socket: out of memory");
-		goto out_fail_conf;
-	}
-
 	/* Create the applet context */
-	appctx = appctx_new(&update_applet, cs);
+	appctx = appctx_new(&update_applet);
 	if (!appctx) {
 		hlua_pusherror(L, "socket: out of memory");
-		goto out_fail_cs;
+		goto out_fail_conf;
 	}
 
 	appctx->ctx.hlua_cosocket.connected = 0;
@@ -2969,13 +2964,21 @@
 		goto out_fail_appctx;
 	}
 
+	endp = cs_endpoint_new();
+	if (!endp)
+		goto out_fail_sess;
+	endp->target = appctx;
+	endp->ctx = appctx;
+	endp->flags |= CS_EP_T_APPLET;
+
-	strm = stream_new(sess, cs, &BUF_NULL);
-	if (!strm) {
+	cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+	if (!cs) {
 		hlua_pusherror(L, "socket: out of memory");
+		cs_endpoint_free(endp);
 		goto out_fail_sess;
 	}
 
-	cs_attach_endp_app(cs, appctx, appctx);
+	s = DISGUISE(cs_strm(cs));
 
 	/* Initialise cross reference between stream and Lua socket object. */
 	xref_create(&socket->xref, &appctx->ctx.hlua_cosocket.xref);
@@ -2984,11 +2987,11 @@
 	 * and retrieve data from the server. The connection is initialized
 	 * with the "struct server".
 	 */
-	si_set_state(strm->csb->si, SI_ST_ASS);
+	si_set_state(cs_si(s->csb), SI_ST_ASS);
 
 	/* Force destination server. */
-	strm->flags |= SF_DIRECT | SF_ASSIGNED | SF_BE_ASSIGNED;
-	strm->target = &socket_tcp->obj_type;
+	s->flags |= SF_DIRECT | SF_ASSIGNED | SF_BE_ASSIGNED;
+	s->target = &socket_tcp->obj_type;
 
 	return 1;
 
@@ -2996,8 +2999,6 @@
 	session_free(sess);
  out_fail_appctx:
 	appctx_free(appctx);
- out_fail_cs:
-	cs_free(cs);
  out_fail_conf:
 	WILL_LJMP(lua_error(L));
 	return 0;
diff --git a/src/http_client.c b/src/http_client.c
index dd27482..55701e5 100644
--- a/src/http_client.c
+++ b/src/http_client.c
@@ -455,8 +455,10 @@
 	struct applet *applet = &httpclient_applet;
 	struct appctx *appctx;
 	struct session *sess;
+	struct cs_endpoint *endp;
 	struct conn_stream *cs;
 	struct stream *s;
+	struct sockaddr_storage *addr = NULL;
 	int len;
 	struct sockaddr_storage ss_url;
 	struct sockaddr_storage* ss_dst;
@@ -476,17 +478,11 @@
 		goto out;
 	}
 
-	cs = cs_new(NULL);
-	if (!cs) {
-		ha_alert("httpclient: out of memory in %s:%d.\n", __FUNCTION__, __LINE__);
-		goto out;
-	}
-
 	/* The HTTP client will be created in the same thread as the caller,
 	 * avoiding threading issues */
-	appctx = appctx_new(applet, cs);
+	appctx = appctx_new(applet);
 	if (!appctx)
-		goto out_free_cs;
+		goto out;
 
 	sess = session_new(httpclient_proxy, NULL, &appctx->obj_type);
 	if (!sess) {
@@ -494,25 +490,33 @@
 		goto out_free_appctx;
 	}
 
-	if ((s = stream_new(sess, cs, &hc->req.buf)) == NULL) {
-		ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__);
-		goto out_free_sess;
-	}
-
-	/* set the "timeout server" */
-	s->req.wto = hc->timeout_server;
-	s->res.rto = hc->timeout_server;
-
 	/* if httpclient_set_dst() was used, sets the alternative address */
 	if (hc->dst)
 		ss_dst = hc->dst;
 	else
 		ss_dst = &ss_url;
 
-	if (!sockaddr_alloc(&cs_si(s->csb)->dst, ss_dst, sizeof(*hc->dst))) {
-		ha_alert("httpclient: Failed to initialize stream in %s:%d.\n", __FUNCTION__, __LINE__);
-		goto out_free_stream;
+	if (!sockaddr_alloc(&addr, ss_dst, sizeof(*hc->dst)))
+		goto out_free_sess;
+
+	endp = cs_endpoint_new();
+	if (!endp)
+		goto out_free_addr;
+	endp->target = appctx;
+	endp->ctx = appctx;
+	endp->flags |= CS_EP_T_APPLET;
+
+	cs = cs_new_from_applet(endp, sess, &hc->req.buf);
+	if (!cs) {
+		ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__);
+		cs_endpoint_free(endp);
+		goto out_free_addr;
 	}
+	s = DISGUISE(cs_strm(cs));
+
+	/* set the "timeout server" */
+	s->req.wto = hc->timeout_server;
+	s->res.rto = hc->timeout_server;
 
 	/* choose the SSL server or not */
 	switch (out.scheme) {
@@ -529,9 +533,9 @@
 			break;
 	}
 
-	cs_attach_endp_app(cs, appctx, appctx);
-	s->flags |= SF_ASSIGNED|SF_ADDR_SET;
+	cs_si(s->csb)->dst = addr;
 	cs_si(s->csb)->flags |= SI_FL_NOLINGER;
+	s->flags |= SF_ASSIGNED|SF_ADDR_SET;
 	s->res.flags |= CF_READ_DONTWAIT;
 
 	/* applet is waiting for data */
@@ -550,14 +554,16 @@
 	return appctx;
 
 out_free_stream:
+	cs_detach_app(cs);
 	LIST_DELETE(&s->list);
 	pool_free(pool_head_stream, s);
+	cs_free(cs);
+out_free_addr:
+	sockaddr_free(&addr);
 out_free_sess:
 	session_free(sess);
 out_free_appctx:
 	appctx_free(appctx);
-out_free_cs:
-	cs_free(cs);
 out:
 
 	return NULL;
diff --git a/src/mux_fcgi.c b/src/mux_fcgi.c
index ea956b3..e18d9ab 100644
--- a/src/mux_fcgi.c
+++ b/src/mux_fcgi.c
@@ -1130,7 +1130,7 @@
 		TRACE_ERROR("fstream allocation failure", FCGI_EV_FSTRM_NEW|FCGI_EV_FSTRM_END|FCGI_EV_FSTRM_ERR, fconn->conn);
 		goto out;
 	}
-	cs_attach_endp_mux(cs, fstrm, fconn->conn);
+	cs_attach_mux(cs, fstrm, fconn->conn);
 	fstrm->cs = cs;
 	fstrm->sess = sess;
 	fconn->nb_cs++;
diff --git a/src/mux_h1.c b/src/mux_h1.c
index 669b411..9ebd0ff 100644
--- a/src/mux_h1.c
+++ b/src/mux_h1.c
@@ -717,9 +717,9 @@
 {
 	struct h1c *h1c = h1s->h1c;
 	struct cs_endpoint *endp;
-	struct conn_stream *cs;
 
 	TRACE_ENTER(H1_EV_STRM_NEW, h1c->conn, h1s);
+
 	endp = cs_endpoint_new();
 	if (!endp) {
 		TRACE_ERROR("CS endp allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
@@ -727,36 +727,27 @@
 	}
 	endp->target = h1s;
 	endp->ctx = h1c->conn;
+	endp->flags |= CS_EP_T_MUX;
 	if (h1s->flags & H1S_F_NOT_FIRST)
 		endp->flags |= CS_EP_NOT_FIRST;
 	if (h1s->req.flags & H1_MF_UPG_WEBSOCKET)
 		endp->flags |= CS_EP_WEBSOCKET;
 
-	cs = cs_new(endp);
-	if (!cs) {
+	h1s->cs = cs_new_from_mux(endp, h1c->conn->owner, input);
+	if (!h1s->cs) {
 		TRACE_ERROR("CS allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
 		cs_endpoint_free(endp);
 		goto err;
 	}
-	cs_attach_endp_mux(cs, h1s, h1c->conn);
-	h1s->cs = cs;
-
-	if (!stream_new(h1c->conn->owner, cs, input)) {
-		TRACE_DEVEL("leaving on stream creation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
-		goto err_cs;
-	}
 
 	HA_ATOMIC_INC(&h1c->px_counters->open_streams);
 	HA_ATOMIC_INC(&h1c->px_counters->total_streams);
 
 	h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED | H1C_F_ST_READY;
 	TRACE_LEAVE(H1_EV_STRM_NEW, h1c->conn, h1s);
-	return cs;
+	return h1s->cs;
 
-  err_cs:
-	cs_free(cs);
   err:
-	h1s->cs = NULL;
 	TRACE_DEVEL("leaving on error", H1_EV_STRM_NEW|H1_EV_STRM_ERR, h1c->conn, h1s);
 	return NULL;
 }
@@ -856,7 +847,7 @@
 	if (!h1s)
 		goto fail;
 
-	cs_attach_endp_mux(cs, h1s, h1c->conn);
+	cs_attach_mux(cs, h1s, h1c->conn);
 	h1s->flags |= H1S_F_RX_BLK;
 	h1s->cs = cs;
 	h1s->sess = sess;
@@ -1004,7 +995,7 @@
 		if (!h1c_frt_stream_new(h1c))
 			goto fail;
 		h1c->h1s->cs = cs;
-		cs_attach_endp_mux(cs, h1c->h1s, conn);
+		cs_attach_mux(cs, h1c->h1s, conn);
 
 		/* Attach the CS but Not ready yet */
 		h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED;
diff --git a/src/mux_h2.c b/src/mux_h2.c
index 98a837d..39e5ece 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -1591,7 +1591,6 @@
 {
 	struct session *sess = h2c->conn->owner;
 	struct cs_endpoint *endp;
-	struct conn_stream *cs;
 	struct h2s *h2s;
 
 	TRACE_ENTER(H2_EV_H2S_NEW, h2c->conn);
@@ -1608,30 +1607,26 @@
 		goto out_close;
 	endp->target = h2s;
 	endp->ctx = h2c->conn;
-	endp->flags |= CS_EP_NOT_FIRST;
+	endp->flags |= (CS_EP_T_MUX|CS_EP_NOT_FIRST);
 	/* FIXME wrong analogy between ext-connect and websocket, this need to
 	 * be refine.
 	 */
 	if (flags & H2_SF_EXT_CONNECT_RCVD)
 		endp->flags |= CS_EP_WEBSOCKET;
 
-	cs = cs_new(endp);
-	if (!cs) {
-		cs_endpoint_free(endp);
-		goto out_close;
-	}
-	cs_attach_endp_mux(cs, h2s, h2c->conn);
-	h2s->cs = cs;
-	h2c->nb_cs++;
-
 	/* The stream will record the request's accept date (which is either the
 	 * end of the connection's or the date immediately after the previous
 	 * request) and the idle time, which is the delay since the previous
 	 * request. We can set the value now, it will be copied by stream_new().
 	 */
 	sess->t_idle = tv_ms_elapsed(&sess->tv_accept, &now) - sess->t_handshake;
-	if (!stream_new(h2c->conn->owner, cs, input))
-		goto out_free_cs;
+
+	h2s->cs = cs_new_from_mux(endp, sess, input);
+	if (!h2s->cs) {
+		cs_endpoint_free(endp);
+		goto out_close;
+	}
+	h2c->nb_cs++;
 
 	/* We want the accept date presented to the next stream to be the one
 	 * we have now, the handshake time to be null (since the next stream
@@ -1649,12 +1644,6 @@
 	TRACE_LEAVE(H2_EV_H2S_NEW, h2c->conn);
 	return h2s;
 
- out_free_cs:
-	h2c->nb_cs--;
-	if (!h2c->nb_cs)
-		h2c->idle_start = now_ms;
-	cs_free(cs);
-	h2s->cs = NULL;
  out_close:
 	h2s_destroy(h2s);
  out:
@@ -1684,7 +1673,7 @@
 	if (!h2s)
 		goto out;
 
-	cs_attach_endp_mux(cs, h2s, h2c->conn);
+	cs_attach_mux(cs, h2s, h2c->conn);
 	h2s->cs = cs;
 	h2s->sess = sess;
 	h2c->nb_cs++;
diff --git a/src/mux_pt.c b/src/mux_pt.c
index e49898e..5bd7193 100644
--- a/src/mux_pt.c
+++ b/src/mux_pt.c
@@ -297,18 +297,14 @@
 			goto fail_free_ctx;
 		endp->target = ctx;
 		endp->ctx = conn;
-		cs = cs_new(endp);
+		endp->flags |= CS_EP_T_MUX;
+
+		cs = cs_new_from_mux(endp, sess, input);
 		if (!cs) {
 			TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
 			cs_endpoint_free(endp);
 			goto fail_free_ctx;
 		}
-		cs_attach_endp_mux(cs, ctx, conn);
-
-		if (!stream_new(conn->owner, cs, &BUF_NULL)) {
-			TRACE_ERROR("stream creation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn, cs);
-			goto fail_free;
-		}
 		TRACE_POINT(PT_EV_STRM_NEW, conn, cs);
 	}
 	conn->ctx = ctx;
@@ -320,9 +316,7 @@
 	TRACE_LEAVE(PT_EV_CONN_NEW, conn, cs);
 	return 0;
 
- fail_free:
-	cs_free(cs);
-fail_free_ctx:
+ fail_free_ctx:
 	if (ctx->wait_event.tasklet)
 		tasklet_free(ctx->wait_event.tasklet);
 	pool_free(pool_head_pt_ctx, ctx);
@@ -379,7 +373,7 @@
 	TRACE_ENTER(PT_EV_STRM_NEW, conn);
 	if (ctx->wait_event.events)
 		conn->xprt->unsubscribe(ctx->conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
-	cs_attach_endp_mux(cs, ctx, conn);
+	cs_attach_mux(cs, ctx, conn);
 	ctx->cs = cs;
 	cs->flags |= CS_FL_RCV_MORE;
 
diff --git a/src/peers.c b/src/peers.c
index b88f4ec..9701963 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -3181,8 +3181,10 @@
 	struct proxy *p = peers->peers_fe; /* attached frontend */
 	struct appctx *appctx;
 	struct session *sess;
+	struct cs_endpoint *endp;
 	struct conn_stream *cs;
 	struct stream *s;
+	struct sockaddr_storage *addr = NULL;
 
 	peer->new_conn++;
 	peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
@@ -3191,15 +3193,9 @@
 	peer->last_hdshk = now_ms;
 	s = NULL;
 
-	cs = cs_new(NULL);
-	if (!cs) {
-		ha_alert("out of memory in peer_session_create().\n");
-		goto out_close;
-	}
-
-	appctx = appctx_new(&peer_applet, cs);
+	appctx = appctx_new(&peer_applet);
 	if (!appctx)
-		goto out_free_cs;
+		goto out_close;
 
 	appctx->st0 = PEER_SESS_ST_CONNECT;
 	appctx->ctx.peers.ptr = (void *)peer;
@@ -3210,23 +3206,34 @@
 		goto out_free_appctx;
 	}
 
-	if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) {
-		ha_alert("Failed to initialize stream in peer_session_create().\n");
+	if (!sockaddr_alloc(&addr, &peer->addr, sizeof(peer->addr)))
 		goto out_free_sess;
+
+	endp = cs_endpoint_new();
+	if (!endp)
+		goto out_free_addr;
+	endp->target = appctx;
+	endp->ctx = appctx;
+	endp->flags |= CS_EP_T_APPLET;
+
+	cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+	if (!cs) {
+		ha_alert("Failed to initialize stream in peer_session_create().\n");
+		cs_endpoint_free(endp);
+		goto out_free_addr;
 	}
 
+	s = DISGUISE(cs_strm(cs));
+
 	/* applet is waiting for data */
 	si_cant_get(cs_si(s->csf));
 	appctx_wakeup(appctx);
 
 	/* initiate an outgoing connection */
-	s->target = peer_session_target(peer, s);
-	if (!sockaddr_alloc(&(cs_si(s->csb)->dst), &peer->addr, sizeof(peer->addr)))
-		goto out_free_strm;
-
-	cs_attach_endp_app(cs, appctx, appctx);
-	s->flags = SF_ASSIGNED|SF_ADDR_SET;
+	cs_si(s->csb)->dst = addr;
 	cs_si(s->csb)->flags |= SI_FL_NOLINGER;
+	s->flags = SF_ASSIGNED|SF_ADDR_SET;
+	s->target = peer_session_target(peer, s);
 
 	s->do_log = NULL;
 	s->uniq_id = 0;
@@ -3238,15 +3245,12 @@
 	return appctx;
 
 	/* Error unrolling */
- out_free_strm:
-	LIST_DELETE(&s->list);
-	pool_free(pool_head_stream, s);
+ out_free_addr:
+	sockaddr_free(&addr);
  out_free_sess:
 	session_free(sess);
  out_free_appctx:
 	appctx_free(appctx);
- out_free_cs:
-	cs_free(cs);
  out_close:
 	return NULL;
 }
diff --git a/src/sink.c b/src/sink.c
index d704042..0f01689 100644
--- a/src/sink.c
+++ b/src/sink.c
@@ -636,22 +636,18 @@
 	struct proxy *p = sink->forward_px;
 	struct appctx *appctx;
 	struct session *sess;
+	struct cs_endpoint *endp;
 	struct conn_stream *cs;
 	struct stream *s;
 	struct applet *applet = &sink_forward_applet;
+	struct sockaddr_storage *addr = NULL;
 
 	if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
 		applet = &sink_forward_oc_applet;
 
-	cs = cs_new(NULL);
-	if (!cs) {
-		ha_alert("out of memory in sink_forward_session_create");
-		goto out_close;
-	}
-
-	appctx = appctx_new(applet, cs);
+	appctx = appctx_new(applet);
 	if (!appctx)
-		goto out_free_cs;
+		goto out_close;
 
 	appctx->ctx.sft.ptr = (void *)sft;
 
@@ -661,19 +657,29 @@
 		goto out_free_appctx;
 	}
 
-	if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) {
-		ha_alert("Failed to initialize stream in sink_forward_session_create().\n");
+	if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr)))
 		goto out_free_sess;
+
+	endp = cs_endpoint_new();
+	if (!endp)
+		goto out_free_addr;
+	endp->target = appctx;
+	endp->ctx = appctx;
+	endp->flags |= CS_EP_T_APPLET;
+
+	cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+	if (!cs) {
+		ha_alert("Failed to initialize stream in sink_forward_session_create().\n");
+		cs_endpoint_free(endp);
+		goto out_free_addr;
 	}
+	s = DISGUISE(cs_strm(cs));
 
+	cs_si(s->csb)->dst = addr;
+	cs_si(s->csb)->flags |= SI_FL_NOLINGER;
 
 	s->target = &sft->srv->obj_type;
-	if (!sockaddr_alloc(&cs_si(s->csb)->dst, &sft->srv->addr, sizeof(sft->srv->addr)))
-		goto out_free_strm;
-
-	cs_attach_endp_app(cs, appctx, appctx);
 	s->flags = SF_ASSIGNED|SF_ADDR_SET;
-	cs_si(s->csb)->flags |= SI_FL_NOLINGER;
 
 	s->do_log = NULL;
 	s->uniq_id = 0;
@@ -688,15 +694,12 @@
 	return appctx;
 
 	/* Error unrolling */
- out_free_strm:
-	LIST_DELETE(&s->list);
-	pool_free(pool_head_stream, s);
+ out_free_addr:
+	sockaddr_free(&addr);
  out_free_sess:
 	session_free(sess);
  out_free_appctx:
 	appctx_free(appctx);
- out_free_cs:
-	cs_free(cs);
  out_close:
 	return NULL;
 }
diff --git a/src/stream.c b/src/stream.c
index e30027c..b07de9b 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -443,15 +443,13 @@
 		s->flags |= SF_HTX;
 
 	s->csf = cs;
-	s->csb = cs_new(NULL);
+	if (cs_attach_strm(s->csf, s) < 0)
+		goto out_fail_attach_csf;
+
+	s->csb = cs_new_from_strm(s, CS_FL_NONE);
 	if (!s->csb)
 		goto out_fail_alloc_csb;
 
-	if (cs_attach_app(s->csf, &s->obj_type) < 0)
-		goto out_fail_attach_csf;
-	if (cs_attach_app(s->csb, &s->obj_type) < 0)
-		goto out_fail_attach_csb;
-
 	si_set_state(cs_si(s->csf), SI_ST_EST);
 	cs_si(s->csf)->hcto = sess->fe->timeout.clientfin;
 
diff --git a/src/stream_interface.c b/src/stream_interface.c
index 4e2f7bb..3609647 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -339,10 +339,11 @@
 
 	DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si_task(si));
 
-	appctx = appctx_new(app, si->cs);
+	appctx = appctx_new(app);
 	if (!appctx)
 		return NULL;
-	cs_attach_endp_app(si->cs, appctx, appctx);
+	cs_attach_applet(si->cs, appctx, appctx);
+	appctx->owner = si->cs;
 	appctx->t->nice = si_strm(si)->task->nice;
 	si_cant_get(si);
 	appctx_wakeup(appctx);
diff --git a/src/tcpcheck.c b/src/tcpcheck.c
index ee439da..53ee771 100644
--- a/src/tcpcheck.c
+++ b/src/tcpcheck.c
@@ -1101,7 +1101,8 @@
 		TRACE_ERROR("conn-stream allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check);
 		goto out;
 	}
-	cs_attach_endp_mux(check->cs, NULL, conn);
+	cs_attach_mux(check->cs, NULL, conn);
+	conn->ctx = check->cs;
 	tasklet_set_tid(check->wait_list.tasklet, tid);
 	conn_set_owner(conn, check->sess, NULL);