MEDIUM: conn-stream: Pre-allocate endpoint to create CS from muxes and applets
It is a transient commit to prepare next changes. Now, when a conn-stream is
created from an applet or a multiplexer, an endpoint is always provided. In
addition, the API to create a conn-stream was specialized to have one
function per type.
The next step will be to share the endpoint structure.
diff --git a/include/haproxy/applet.h b/include/haproxy/applet.h
index 97164ac..97b9c34 100644
--- a/include/haproxy/applet.h
+++ b/include/haproxy/applet.h
@@ -59,7 +59,7 @@
* appctx_free(). <applet> is assigned as the applet, but it can be NULL. The
* applet's task is always created on the current thread.
*/
-static inline struct appctx *appctx_new(struct applet *applet, void *owner)
+static inline struct appctx *appctx_new(struct applet *applet)
{
struct appctx *appctx;
@@ -67,7 +67,6 @@
if (likely(appctx != NULL)) {
appctx->obj_type = OBJ_TYPE_APPCTX;
appctx->applet = applet;
- appctx->owner = owner;
appctx_init(appctx);
appctx->t = task_new_here();
if (unlikely(appctx->t == NULL)) {
diff --git a/include/haproxy/conn_stream.h b/include/haproxy/conn_stream.h
index 638b0cf..e0c3b50 100644
--- a/include/haproxy/conn_stream.h
+++ b/include/haproxy/conn_stream.h
@@ -23,11 +23,13 @@
#define _HAPROXY_CONN_STREAM_H
#include <haproxy/api.h>
-#include <haproxy/applet.h>
#include <haproxy/connection.h>
#include <haproxy/conn_stream-t.h>
#include <haproxy/obj_type.h>
+struct buffer;
+struct session;
+struct appctx;
struct stream;
struct stream_interface;
struct check;
@@ -38,10 +40,16 @@
void cs_endpoint_free(struct cs_endpoint *endp);
struct conn_stream *cs_new(struct cs_endpoint *endp);
+struct conn_stream *cs_new_from_mux(struct cs_endpoint *endp, struct session *sess, struct buffer *input);
+struct conn_stream *cs_new_from_applet(struct cs_endpoint *endp, struct session *sess, struct buffer *input);
+struct conn_stream *cs_new_from_strm(struct stream *strm, unsigned int flags);
+struct conn_stream *cs_new_from_check(struct check *check, unsigned int flags);
void cs_free(struct conn_stream *cs);
-void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx);
-void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx);
-int cs_attach_app(struct conn_stream *cs, enum obj_type *app);
+
+void cs_attach_mux(struct conn_stream *cs, void *target, void *ctx);
+void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx);
+int cs_attach_strm(struct conn_stream *cs, struct stream *strm);
+
void cs_detach_endp(struct conn_stream *cs);
void cs_detach_app(struct conn_stream *cs);
diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h
index aab4687..6c97b7f 100644
--- a/include/haproxy/mux_quic.h
+++ b/include/haproxy/mux_quic.h
@@ -115,15 +115,13 @@
return NULL;
endp->target = qcs;
endp->ctx = qcs->qcc->conn;
- cs = cs_new(endp);
+ endp->flags |= CS_EP_T_MUX;
+ cs = cs_new_from_mux(endp, qcs->qcc->conn->owner, buf);
if (!cs) {
cs_endpoint_free(endp);
return NULL;
}
- cs_attach_endp_mux(cs, qcs, qcs->qcc->conn);
qcs->cs = cs;
- stream_new(qcs->qcc->conn->owner, cs, buf);
-
++qcs->qcc->nb_cs;
return cs;
diff --git a/src/backend.c b/src/backend.c
index 2f33f37..f8c13b4 100644
--- a/src/backend.c
+++ b/src/backend.c
@@ -1570,7 +1570,9 @@
return SF_ERR_INTERNAL; /* how did we get there ? */
}
- cs_attach_endp_mux(s->csb, NULL, srv_conn);
+ cs_attach_mux(s->csb, NULL, srv_conn);
+ srv_conn->ctx = s->csb;
+
#if defined(USE_OPENSSL) && defined(TLSEXT_TYPE_application_layer_protocol_negotiation)
if (!srv ||
(srv->use_ssl != 1 || (!(srv->ssl_ctx.alpn_str) && !(srv->ssl_ctx.npn_str)) ||
diff --git a/src/check.c b/src/check.c
index b26e7b9..44583c2 100644
--- a/src/check.c
+++ b/src/check.c
@@ -1391,11 +1391,9 @@
if (check->type == PR_O2_EXT_CHK)
t = task_new_on(0);
else {
- check->cs = cs_new(NULL);
+ check->cs = cs_new_from_check(check, CS_FL_NONE);
if (!check->cs)
goto fail_alloc_cs;
- if (cs_attach_app(check->cs, &check->obj_type) < 0)
- goto fail_attach_cs;
t = task_new_anywhere();
}
@@ -1420,7 +1418,6 @@
return 1;
fail_alloc_task:
- fail_attach_cs:
cs_free(check->cs);
fail_alloc_cs:
ha_alert("Starting [%s:%s] check: out of memory.\n",
diff --git a/src/conn_stream.c b/src/conn_stream.c
index 5b81774..24a22b5 100644
--- a/src/conn_stream.c
+++ b/src/conn_stream.c
@@ -75,6 +75,68 @@
return NULL;
}
+struct conn_stream *cs_new_from_mux(struct cs_endpoint *endp, struct session *sess, struct buffer *input)
+{
+ struct conn_stream *cs;
+
+ cs = cs_new(endp);
+ if (unlikely(!cs))
+ return NULL;
+ if (unlikely(!stream_new(sess, cs, input))) {
+ pool_free(pool_head_connstream, cs);
+ cs = NULL;
+ }
+ return cs;
+}
+
+struct conn_stream *cs_new_from_applet(struct cs_endpoint *endp, struct session *sess, struct buffer *input)
+{
+ struct conn_stream *cs;
+ struct appctx *appctx = endp->ctx;
+
+ cs = cs_new(endp);
+ if (unlikely(!cs))
+ return NULL;
+ appctx->owner = cs;
+ if (unlikely(!stream_new(sess, cs, input))) {
+ pool_free(pool_head_connstream, cs);
+ cs = NULL;
+ }
+ return cs;
+}
+
+struct conn_stream *cs_new_from_strm(struct stream *strm, unsigned int flags)
+{
+ struct conn_stream *cs;
+
+ cs = cs_new(NULL);
+ if (unlikely(!cs))
+ return NULL;
+ cs->flags |= flags;
+ cs->si = si_new(cs);
+ if (unlikely(!cs->si)) {
+ cs_free(cs);
+ return NULL;
+ }
+ cs->app = &strm->obj_type;
+ cs->si->ops = &si_embedded_ops;
+ cs->data_cb = NULL;
+ return cs;
+}
+
+struct conn_stream *cs_new_from_check(struct check *check, unsigned int flags)
+{
+ struct conn_stream *cs;
+
+ cs = cs_new(NULL);
+ if (unlikely(!cs))
+ return NULL;
+ cs->flags |= flags;
+ cs->app = &check->obj_type;
+ cs->data_cb = &check_conn_cb;
+ return cs;
+}
+
/* Releases a conn_stream previously allocated by cs_new(), as well as any
* buffer it would still hold.
*/
@@ -89,11 +151,11 @@
/* Attaches a conn_stream to an mux endpoint and sets the endpoint ctx */
-void cs_attach_endp_mux(struct conn_stream *cs, void *endp, void *ctx)
+void cs_attach_mux(struct conn_stream *cs, void *target, void *ctx)
{
struct connection *conn = ctx;
- cs->endp->target = endp;
+ cs->endp->target = target;
cs->endp->ctx = ctx;
cs->endp->flags |= CS_EP_T_MUX;
if (!conn->ctx)
@@ -107,11 +169,11 @@
}
/* Attaches a conn_stream to an applet endpoint and sets the endpoint ctx */
-void cs_attach_endp_app(struct conn_stream *cs, void *endp, void *ctx)
+void cs_attach_applet(struct conn_stream *cs, void *target, void *ctx)
{
- struct appctx *appctx = endp;
+ struct appctx *appctx = target;
- cs->endp->target = endp;
+ cs->endp->target = target;
cs->endp->ctx = ctx;
cs->endp->flags |= CS_EP_T_APPLET;
appctx->owner = cs;
@@ -122,31 +184,26 @@
}
/* Attaches a conn_stream to a app layer and sets the relevant callbacks */
-int cs_attach_app(struct conn_stream *cs, enum obj_type *app)
+int cs_attach_strm(struct conn_stream *cs, struct stream *strm)
{
- cs->app = app;
+ cs->app = &strm->obj_type;
- if (objt_stream(app)) {
- if (!cs->si)
- cs->si = si_new(cs);
- if (unlikely(!cs->si))
- return -1;
+ cs->si = si_new(cs);
+ if (unlikely(!cs->si))
+ return -1;
- if (cs->endp->flags & CS_EP_T_MUX) {
- cs->si->ops = &si_conn_ops;
- cs->data_cb = &si_conn_cb;
- }
- else if (cs->endp->flags & CS_EP_T_APPLET) {
- cs->si->ops = &si_applet_ops;
- cs->data_cb = NULL;
- }
- else {
- cs->si->ops = &si_embedded_ops;
- cs->data_cb = NULL;
- }
+ if (cs->endp->flags & CS_EP_T_MUX) {
+ cs->si->ops = &si_conn_ops;
+ cs->data_cb = &si_conn_cb;
}
- else if (objt_check(app))
- cs->data_cb = &check_conn_cb;
+ else if (cs->endp->flags & CS_EP_T_APPLET) {
+ cs->si->ops = &si_applet_ops;
+ cs->data_cb = NULL;
+ }
+ else {
+ cs->si->ops = &si_embedded_ops;
+ cs->data_cb = NULL;
+ }
return 0;
}
diff --git a/src/dns.c b/src/dns.c
index 461e3c7..e6f7093 100644
--- a/src/dns.c
+++ b/src/dns.c
@@ -887,19 +887,15 @@
{
struct appctx *appctx;
struct session *sess;
+ struct cs_endpoint *endp;
struct conn_stream *cs;
struct stream *s;
struct applet *applet = &dns_session_applet;
+ struct sockaddr_storage *addr = NULL;
- cs = cs_new(NULL);
- if (!cs) {
- ha_alert("out of memory in dns_session_create().\n");
- goto out_close;
- }
-
- appctx = appctx_new(applet, cs);
+ appctx = appctx_new(applet);
if (!appctx)
- goto out_free_cs;
+ goto out_close;
appctx->ctx.sft.ptr = (void *)ds;
sess = session_new(ds->dss->srv->proxy, NULL, &appctx->obj_type);
@@ -908,18 +904,28 @@
goto out_free_appctx;
}
- if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) {
- ha_alert("Failed to initialize stream in dns_session_create().\n");
+ if (!sockaddr_alloc(&addr, &ds->dss->srv->addr, sizeof(ds->dss->srv->addr)))
goto out_free_sess;
+
+ endp = cs_endpoint_new();
+ if (!endp)
+ goto out_free_addr;
+ endp->target = appctx;
+ endp->ctx = appctx;
+ endp->flags |= CS_EP_T_APPLET;
+
+ cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+ if (!cs) {
+ ha_alert("Failed to initialize stream in dns_session_create().\n");
+ cs_endpoint_free(endp);
+ goto out_free_addr;
}
+ s = DISGUISE(cs_strm(cs));
+ cs_si(s->csb)->dst = addr;
+ cs_si(s->csb)->flags |= SI_FL_NOLINGER;
s->target = &ds->dss->srv->obj_type;
- if (!sockaddr_alloc(&cs_si(s->csb)->dst, &ds->dss->srv->addr, sizeof(ds->dss->srv->addr)))
- goto out_free_strm;
-
- cs_attach_endp_app(cs, appctx, appctx);
s->flags = SF_ASSIGNED|SF_ADDR_SET;
- cs_si(s->csb)->flags |= SI_FL_NOLINGER;
s->do_log = NULL;
s->uniq_id = 0;
@@ -934,15 +940,12 @@
return appctx;
/* Error unrolling */
- out_free_strm:
- LIST_DELETE(&s->list);
- pool_free(pool_head_stream, s);
+ out_free_addr:
+ sockaddr_free(&addr);
out_free_sess:
session_free(sess);
out_free_appctx:
appctx_free(appctx);
- out_free_cs:
- cs_free(cs);
out_close:
return NULL;
}
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index e8daf1b..86fb646 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -1988,16 +1988,13 @@
{
struct appctx *appctx;
struct session *sess;
+ struct cs_endpoint *endp;
struct conn_stream *cs;
struct stream *strm;
- cs = cs_new(NULL);
- if (!cs)
+ if ((appctx = appctx_new(&spoe_applet)) == NULL)
goto out_error;
- if ((appctx = appctx_new(&spoe_applet, cs)) == NULL)
- goto out_free_cs;
-
appctx->ctx.spoe.ptr = pool_zalloc(pool_head_spoe_appctx);
if (SPOE_APPCTX(appctx) == NULL)
goto out_free_appctx;
@@ -2028,14 +2025,24 @@
if (!sess)
goto out_free_spoe;
+ endp = cs_endpoint_new();
+ if (!endp)
+ goto out_free_sess;
+ endp->target = appctx;
+ endp->ctx = appctx;
+ endp->flags |= CS_EP_T_APPLET;
+
- if ((strm = stream_new(sess, cs, &BUF_NULL)) == NULL)
+ cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+ if (!cs) {
+ cs_endpoint_free(endp);
goto out_free_sess;
+ }
- cs_attach_endp_app(cs, appctx, appctx);
+ strm = DISGUISE(cs_strm(cs));
stream_set_backend(strm, conf->agent->b.be);
/* applet is waiting for data */
- si_cant_get(strm->csf->si);
+ si_cant_get(cs_si(strm->csf));
appctx_wakeup(appctx);
strm->do_log = NULL;
@@ -2058,8 +2065,6 @@
pool_free(pool_head_spoe_appctx, SPOE_APPCTX(appctx));
out_free_appctx:
appctx_free(appctx);
- out_free_cs:
- cs_free(cs);
out_error:
return NULL;
}
diff --git a/src/hlua.c b/src/hlua.c
index 3f9b123..705c15c 100644
--- a/src/hlua.c
+++ b/src/hlua.c
@@ -2918,8 +2918,9 @@
struct hlua_socket *socket;
struct appctx *appctx;
struct session *sess;
+ struct cs_endpoint *endp;
struct conn_stream *cs;
- struct stream *strm;
+ struct stream *s;
/* Check stack size. */
if (!lua_checkstack(L, 3)) {
@@ -2944,17 +2945,11 @@
lua_rawgeti(L, LUA_REGISTRYINDEX, class_socket_ref);
lua_setmetatable(L, -2);
- cs = cs_new(NULL);
- if (!cs) {
- hlua_pusherror(L, "socket: out of memory");
- goto out_fail_conf;
- }
-
/* Create the applet context */
- appctx = appctx_new(&update_applet, cs);
+ appctx = appctx_new(&update_applet);
if (!appctx) {
hlua_pusherror(L, "socket: out of memory");
- goto out_fail_cs;
+ goto out_fail_conf;
}
appctx->ctx.hlua_cosocket.connected = 0;
@@ -2969,13 +2964,21 @@
goto out_fail_appctx;
}
+ endp = cs_endpoint_new();
+ if (!endp)
+ goto out_fail_sess;
+ endp->target = appctx;
+ endp->ctx = appctx;
+ endp->flags |= CS_EP_T_APPLET;
+
- strm = stream_new(sess, cs, &BUF_NULL);
- if (!strm) {
+ cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+ if (!cs) {
hlua_pusherror(L, "socket: out of memory");
+ cs_endpoint_free(endp);
goto out_fail_sess;
}
- cs_attach_endp_app(cs, appctx, appctx);
+ s = DISGUISE(cs_strm(cs));
/* Initialise cross reference between stream and Lua socket object. */
xref_create(&socket->xref, &appctx->ctx.hlua_cosocket.xref);
@@ -2984,11 +2987,11 @@
* and retrieve data from the server. The connection is initialized
* with the "struct server".
*/
- si_set_state(strm->csb->si, SI_ST_ASS);
+ si_set_state(cs_si(s->csb), SI_ST_ASS);
/* Force destination server. */
- strm->flags |= SF_DIRECT | SF_ASSIGNED | SF_BE_ASSIGNED;
- strm->target = &socket_tcp->obj_type;
+ s->flags |= SF_DIRECT | SF_ASSIGNED | SF_BE_ASSIGNED;
+ s->target = &socket_tcp->obj_type;
return 1;
@@ -2996,8 +2999,6 @@
session_free(sess);
out_fail_appctx:
appctx_free(appctx);
- out_fail_cs:
- cs_free(cs);
out_fail_conf:
WILL_LJMP(lua_error(L));
return 0;
diff --git a/src/http_client.c b/src/http_client.c
index dd27482..55701e5 100644
--- a/src/http_client.c
+++ b/src/http_client.c
@@ -455,8 +455,10 @@
struct applet *applet = &httpclient_applet;
struct appctx *appctx;
struct session *sess;
+ struct cs_endpoint *endp;
struct conn_stream *cs;
struct stream *s;
+ struct sockaddr_storage *addr = NULL;
int len;
struct sockaddr_storage ss_url;
struct sockaddr_storage* ss_dst;
@@ -476,17 +478,11 @@
goto out;
}
- cs = cs_new(NULL);
- if (!cs) {
- ha_alert("httpclient: out of memory in %s:%d.\n", __FUNCTION__, __LINE__);
- goto out;
- }
-
/* The HTTP client will be created in the same thread as the caller,
* avoiding threading issues */
- appctx = appctx_new(applet, cs);
+ appctx = appctx_new(applet);
if (!appctx)
- goto out_free_cs;
+ goto out;
sess = session_new(httpclient_proxy, NULL, &appctx->obj_type);
if (!sess) {
@@ -494,25 +490,33 @@
goto out_free_appctx;
}
- if ((s = stream_new(sess, cs, &hc->req.buf)) == NULL) {
- ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__);
- goto out_free_sess;
- }
-
- /* set the "timeout server" */
- s->req.wto = hc->timeout_server;
- s->res.rto = hc->timeout_server;
-
/* if httpclient_set_dst() was used, sets the alternative address */
if (hc->dst)
ss_dst = hc->dst;
else
ss_dst = &ss_url;
- if (!sockaddr_alloc(&cs_si(s->csb)->dst, ss_dst, sizeof(*hc->dst))) {
- ha_alert("httpclient: Failed to initialize stream in %s:%d.\n", __FUNCTION__, __LINE__);
- goto out_free_stream;
+ if (!sockaddr_alloc(&addr, ss_dst, sizeof(*hc->dst)))
+ goto out_free_sess;
+
+ endp = cs_endpoint_new();
+ if (!endp)
+ goto out_free_addr;
+ endp->target = appctx;
+ endp->ctx = appctx;
+ endp->flags |= CS_EP_T_APPLET;
+
+ cs = cs_new_from_applet(endp, sess, &hc->req.buf);
+ if (!cs) {
+ ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__);
+ cs_endpoint_free(endp);
+ goto out_free_addr;
}
+ s = DISGUISE(cs_strm(cs));
+
+ /* set the "timeout server" */
+ s->req.wto = hc->timeout_server;
+ s->res.rto = hc->timeout_server;
/* choose the SSL server or not */
switch (out.scheme) {
@@ -529,9 +533,9 @@
break;
}
- cs_attach_endp_app(cs, appctx, appctx);
- s->flags |= SF_ASSIGNED|SF_ADDR_SET;
+ cs_si(s->csb)->dst = addr;
cs_si(s->csb)->flags |= SI_FL_NOLINGER;
+ s->flags |= SF_ASSIGNED|SF_ADDR_SET;
s->res.flags |= CF_READ_DONTWAIT;
/* applet is waiting for data */
@@ -550,14 +554,16 @@
return appctx;
out_free_stream:
+ cs_detach_app(cs);
LIST_DELETE(&s->list);
pool_free(pool_head_stream, s);
+ cs_free(cs);
+out_free_addr:
+ sockaddr_free(&addr);
out_free_sess:
session_free(sess);
out_free_appctx:
appctx_free(appctx);
-out_free_cs:
- cs_free(cs);
out:
return NULL;
diff --git a/src/mux_fcgi.c b/src/mux_fcgi.c
index ea956b3..e18d9ab 100644
--- a/src/mux_fcgi.c
+++ b/src/mux_fcgi.c
@@ -1130,7 +1130,7 @@
TRACE_ERROR("fstream allocation failure", FCGI_EV_FSTRM_NEW|FCGI_EV_FSTRM_END|FCGI_EV_FSTRM_ERR, fconn->conn);
goto out;
}
- cs_attach_endp_mux(cs, fstrm, fconn->conn);
+ cs_attach_mux(cs, fstrm, fconn->conn);
fstrm->cs = cs;
fstrm->sess = sess;
fconn->nb_cs++;
diff --git a/src/mux_h1.c b/src/mux_h1.c
index 669b411..9ebd0ff 100644
--- a/src/mux_h1.c
+++ b/src/mux_h1.c
@@ -717,9 +717,9 @@
{
struct h1c *h1c = h1s->h1c;
struct cs_endpoint *endp;
- struct conn_stream *cs;
TRACE_ENTER(H1_EV_STRM_NEW, h1c->conn, h1s);
+
endp = cs_endpoint_new();
if (!endp) {
TRACE_ERROR("CS endp allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
@@ -727,36 +727,27 @@
}
endp->target = h1s;
endp->ctx = h1c->conn;
+ endp->flags |= CS_EP_T_MUX;
if (h1s->flags & H1S_F_NOT_FIRST)
endp->flags |= CS_EP_NOT_FIRST;
if (h1s->req.flags & H1_MF_UPG_WEBSOCKET)
endp->flags |= CS_EP_WEBSOCKET;
- cs = cs_new(endp);
- if (!cs) {
+ h1s->cs = cs_new_from_mux(endp, h1c->conn->owner, input);
+ if (!h1s->cs) {
TRACE_ERROR("CS allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
cs_endpoint_free(endp);
goto err;
}
- cs_attach_endp_mux(cs, h1s, h1c->conn);
- h1s->cs = cs;
-
- if (!stream_new(h1c->conn->owner, cs, input)) {
- TRACE_DEVEL("leaving on stream creation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
- goto err_cs;
- }
HA_ATOMIC_INC(&h1c->px_counters->open_streams);
HA_ATOMIC_INC(&h1c->px_counters->total_streams);
h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED | H1C_F_ST_READY;
TRACE_LEAVE(H1_EV_STRM_NEW, h1c->conn, h1s);
- return cs;
+ return h1s->cs;
- err_cs:
- cs_free(cs);
err:
- h1s->cs = NULL;
TRACE_DEVEL("leaving on error", H1_EV_STRM_NEW|H1_EV_STRM_ERR, h1c->conn, h1s);
return NULL;
}
@@ -856,7 +847,7 @@
if (!h1s)
goto fail;
- cs_attach_endp_mux(cs, h1s, h1c->conn);
+ cs_attach_mux(cs, h1s, h1c->conn);
h1s->flags |= H1S_F_RX_BLK;
h1s->cs = cs;
h1s->sess = sess;
@@ -1004,7 +995,7 @@
if (!h1c_frt_stream_new(h1c))
goto fail;
h1c->h1s->cs = cs;
- cs_attach_endp_mux(cs, h1c->h1s, conn);
+ cs_attach_mux(cs, h1c->h1s, conn);
/* Attach the CS but Not ready yet */
h1c->flags = (h1c->flags & ~H1C_F_ST_EMBRYONIC) | H1C_F_ST_ATTACHED;
diff --git a/src/mux_h2.c b/src/mux_h2.c
index 98a837d..39e5ece 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -1591,7 +1591,6 @@
{
struct session *sess = h2c->conn->owner;
struct cs_endpoint *endp;
- struct conn_stream *cs;
struct h2s *h2s;
TRACE_ENTER(H2_EV_H2S_NEW, h2c->conn);
@@ -1608,30 +1607,26 @@
goto out_close;
endp->target = h2s;
endp->ctx = h2c->conn;
- endp->flags |= CS_EP_NOT_FIRST;
+ endp->flags |= (CS_EP_T_MUX|CS_EP_NOT_FIRST);
/* FIXME wrong analogy between ext-connect and websocket, this need to
* be refine.
*/
if (flags & H2_SF_EXT_CONNECT_RCVD)
endp->flags |= CS_EP_WEBSOCKET;
- cs = cs_new(endp);
- if (!cs) {
- cs_endpoint_free(endp);
- goto out_close;
- }
- cs_attach_endp_mux(cs, h2s, h2c->conn);
- h2s->cs = cs;
- h2c->nb_cs++;
-
/* The stream will record the request's accept date (which is either the
* end of the connection's or the date immediately after the previous
* request) and the idle time, which is the delay since the previous
* request. We can set the value now, it will be copied by stream_new().
*/
sess->t_idle = tv_ms_elapsed(&sess->tv_accept, &now) - sess->t_handshake;
- if (!stream_new(h2c->conn->owner, cs, input))
- goto out_free_cs;
+
+ h2s->cs = cs_new_from_mux(endp, sess, input);
+ if (!h2s->cs) {
+ cs_endpoint_free(endp);
+ goto out_close;
+ }
+ h2c->nb_cs++;
/* We want the accept date presented to the next stream to be the one
* we have now, the handshake time to be null (since the next stream
@@ -1649,12 +1644,6 @@
TRACE_LEAVE(H2_EV_H2S_NEW, h2c->conn);
return h2s;
- out_free_cs:
- h2c->nb_cs--;
- if (!h2c->nb_cs)
- h2c->idle_start = now_ms;
- cs_free(cs);
- h2s->cs = NULL;
out_close:
h2s_destroy(h2s);
out:
@@ -1684,7 +1673,7 @@
if (!h2s)
goto out;
- cs_attach_endp_mux(cs, h2s, h2c->conn);
+ cs_attach_mux(cs, h2s, h2c->conn);
h2s->cs = cs;
h2s->sess = sess;
h2c->nb_cs++;
diff --git a/src/mux_pt.c b/src/mux_pt.c
index e49898e..5bd7193 100644
--- a/src/mux_pt.c
+++ b/src/mux_pt.c
@@ -297,18 +297,14 @@
goto fail_free_ctx;
endp->target = ctx;
endp->ctx = conn;
- cs = cs_new(endp);
+ endp->flags |= CS_EP_T_MUX;
+
+ cs = cs_new_from_mux(endp, sess, input);
if (!cs) {
TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
cs_endpoint_free(endp);
goto fail_free_ctx;
}
- cs_attach_endp_mux(cs, ctx, conn);
-
- if (!stream_new(conn->owner, cs, &BUF_NULL)) {
- TRACE_ERROR("stream creation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn, cs);
- goto fail_free;
- }
TRACE_POINT(PT_EV_STRM_NEW, conn, cs);
}
conn->ctx = ctx;
@@ -320,9 +316,7 @@
TRACE_LEAVE(PT_EV_CONN_NEW, conn, cs);
return 0;
- fail_free:
- cs_free(cs);
-fail_free_ctx:
+ fail_free_ctx:
if (ctx->wait_event.tasklet)
tasklet_free(ctx->wait_event.tasklet);
pool_free(pool_head_pt_ctx, ctx);
@@ -379,7 +373,7 @@
TRACE_ENTER(PT_EV_STRM_NEW, conn);
if (ctx->wait_event.events)
conn->xprt->unsubscribe(ctx->conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
- cs_attach_endp_mux(cs, ctx, conn);
+ cs_attach_mux(cs, ctx, conn);
ctx->cs = cs;
cs->flags |= CS_FL_RCV_MORE;
diff --git a/src/peers.c b/src/peers.c
index b88f4ec..9701963 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -3181,8 +3181,10 @@
struct proxy *p = peers->peers_fe; /* attached frontend */
struct appctx *appctx;
struct session *sess;
+ struct cs_endpoint *endp;
struct conn_stream *cs;
struct stream *s;
+ struct sockaddr_storage *addr = NULL;
peer->new_conn++;
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
@@ -3191,15 +3193,9 @@
peer->last_hdshk = now_ms;
s = NULL;
- cs = cs_new(NULL);
- if (!cs) {
- ha_alert("out of memory in peer_session_create().\n");
- goto out_close;
- }
-
- appctx = appctx_new(&peer_applet, cs);
+ appctx = appctx_new(&peer_applet);
if (!appctx)
- goto out_free_cs;
+ goto out_close;
appctx->st0 = PEER_SESS_ST_CONNECT;
appctx->ctx.peers.ptr = (void *)peer;
@@ -3210,23 +3206,34 @@
goto out_free_appctx;
}
- if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) {
- ha_alert("Failed to initialize stream in peer_session_create().\n");
+ if (!sockaddr_alloc(&addr, &peer->addr, sizeof(peer->addr)))
goto out_free_sess;
+
+ endp = cs_endpoint_new();
+ if (!endp)
+ goto out_free_addr;
+ endp->target = appctx;
+ endp->ctx = appctx;
+ endp->flags |= CS_EP_T_APPLET;
+
+ cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+ if (!cs) {
+ ha_alert("Failed to initialize stream in peer_session_create().\n");
+ cs_endpoint_free(endp);
+ goto out_free_addr;
}
+ s = DISGUISE(cs_strm(cs));
+
/* applet is waiting for data */
si_cant_get(cs_si(s->csf));
appctx_wakeup(appctx);
/* initiate an outgoing connection */
- s->target = peer_session_target(peer, s);
- if (!sockaddr_alloc(&(cs_si(s->csb)->dst), &peer->addr, sizeof(peer->addr)))
- goto out_free_strm;
-
- cs_attach_endp_app(cs, appctx, appctx);
- s->flags = SF_ASSIGNED|SF_ADDR_SET;
+ cs_si(s->csb)->dst = addr;
cs_si(s->csb)->flags |= SI_FL_NOLINGER;
+ s->flags = SF_ASSIGNED|SF_ADDR_SET;
+ s->target = peer_session_target(peer, s);
s->do_log = NULL;
s->uniq_id = 0;
@@ -3238,15 +3245,12 @@
return appctx;
/* Error unrolling */
- out_free_strm:
- LIST_DELETE(&s->list);
- pool_free(pool_head_stream, s);
+ out_free_addr:
+ sockaddr_free(&addr);
out_free_sess:
session_free(sess);
out_free_appctx:
appctx_free(appctx);
- out_free_cs:
- cs_free(cs);
out_close:
return NULL;
}
diff --git a/src/sink.c b/src/sink.c
index d704042..0f01689 100644
--- a/src/sink.c
+++ b/src/sink.c
@@ -636,22 +636,18 @@
struct proxy *p = sink->forward_px;
struct appctx *appctx;
struct session *sess;
+ struct cs_endpoint *endp;
struct conn_stream *cs;
struct stream *s;
struct applet *applet = &sink_forward_applet;
+ struct sockaddr_storage *addr = NULL;
if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
applet = &sink_forward_oc_applet;
- cs = cs_new(NULL);
- if (!cs) {
- ha_alert("out of memory in sink_forward_session_create");
- goto out_close;
- }
-
- appctx = appctx_new(applet, cs);
+ appctx = appctx_new(applet);
if (!appctx)
- goto out_free_cs;
+ goto out_close;
appctx->ctx.sft.ptr = (void *)sft;
@@ -661,19 +657,29 @@
goto out_free_appctx;
}
- if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) {
- ha_alert("Failed to initialize stream in sink_forward_session_create().\n");
+ if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr)))
goto out_free_sess;
+
+ endp = cs_endpoint_new();
+ if (!endp)
+ goto out_free_addr;
+ endp->target = appctx;
+ endp->ctx = appctx;
+ endp->flags |= CS_EP_T_APPLET;
+
+ cs = cs_new_from_applet(endp, sess, &BUF_NULL);
+ if (!cs) {
+ ha_alert("Failed to initialize stream in sink_forward_session_create().\n");
+ cs_endpoint_free(endp);
+ goto out_free_addr;
}
+ s = DISGUISE(cs_strm(cs));
+ cs_si(s->csb)->dst = addr;
+ cs_si(s->csb)->flags |= SI_FL_NOLINGER;
s->target = &sft->srv->obj_type;
- if (!sockaddr_alloc(&cs_si(s->csb)->dst, &sft->srv->addr, sizeof(sft->srv->addr)))
- goto out_free_strm;
-
- cs_attach_endp_app(cs, appctx, appctx);
s->flags = SF_ASSIGNED|SF_ADDR_SET;
- cs_si(s->csb)->flags |= SI_FL_NOLINGER;
s->do_log = NULL;
s->uniq_id = 0;
@@ -688,15 +694,12 @@
return appctx;
/* Error unrolling */
- out_free_strm:
- LIST_DELETE(&s->list);
- pool_free(pool_head_stream, s);
+ out_free_addr:
+ sockaddr_free(&addr);
out_free_sess:
session_free(sess);
out_free_appctx:
appctx_free(appctx);
- out_free_cs:
- cs_free(cs);
out_close:
return NULL;
}
diff --git a/src/stream.c b/src/stream.c
index e30027c..b07de9b 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -443,15 +443,13 @@
s->flags |= SF_HTX;
s->csf = cs;
- s->csb = cs_new(NULL);
+ if (cs_attach_strm(s->csf, s) < 0)
+ goto out_fail_attach_csf;
+
+ s->csb = cs_new_from_strm(s, CS_FL_NONE);
if (!s->csb)
goto out_fail_alloc_csb;
- if (cs_attach_app(s->csf, &s->obj_type) < 0)
- goto out_fail_attach_csf;
- if (cs_attach_app(s->csb, &s->obj_type) < 0)
- goto out_fail_attach_csb;
-
si_set_state(cs_si(s->csf), SI_ST_EST);
cs_si(s->csf)->hcto = sess->fe->timeout.clientfin;
diff --git a/src/stream_interface.c b/src/stream_interface.c
index 4e2f7bb..3609647 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -339,10 +339,11 @@
DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", app, si, si_task(si));
- appctx = appctx_new(app, si->cs);
+ appctx = appctx_new(app);
if (!appctx)
return NULL;
- cs_attach_endp_app(si->cs, appctx, appctx);
+ cs_attach_applet(si->cs, appctx, appctx);
+ appctx->owner = si->cs;
appctx->t->nice = si_strm(si)->task->nice;
si_cant_get(si);
appctx_wakeup(appctx);
diff --git a/src/tcpcheck.c b/src/tcpcheck.c
index ee439da..53ee771 100644
--- a/src/tcpcheck.c
+++ b/src/tcpcheck.c
@@ -1101,7 +1101,8 @@
TRACE_ERROR("conn-stream allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check);
goto out;
}
- cs_attach_endp_mux(check->cs, NULL, conn);
+ cs_attach_mux(check->cs, NULL, conn);
+ conn->ctx = check->cs;
tasklet_set_tid(check->wait_list.tasklet, tid);
conn_set_owner(conn, check->sess, NULL);