MAJOR: stream/conn_stream: Move the stream-interface into the conn-stream
Thanks to all previous changes, it is now possible to move the
stream-interface into the conn-stream. To do so, some SI functions are
removed and their conn-stream counterparts are added. In addition, the
conn-stream is now responsible to create and release the
stream-interface. While the stream-interfaces were inlined in the stream
structure, there is now a pointer in the conn-stream. stream-interfaces are
now dynamically allocated. Thus a dedicated pool is added. It is a temporary
change because, at the end, the stream-interface structure will most
probably disappear.
diff --git a/include/haproxy/channel.h b/include/haproxy/channel.h
index 07127bd..8de8e17 100644
--- a/include/haproxy/channel.h
+++ b/include/haproxy/channel.h
@@ -68,18 +68,18 @@
static inline struct stream_interface *chn_prod(const struct channel *chn)
{
if (chn->flags & CF_ISRESP)
- return &LIST_ELEM(chn, struct stream *, res)->si[1];
+ return LIST_ELEM(chn, struct stream *, res)->csb->si;
else
- return &LIST_ELEM(chn, struct stream *, req)->si[0];
+ return LIST_ELEM(chn, struct stream *, req)->csf->si;
}
/* returns a pointer to the stream interface consuming the channel (producer) */
static inline struct stream_interface *chn_cons(const struct channel *chn)
{
if (chn->flags & CF_ISRESP)
- return &LIST_ELEM(chn, struct stream *, res)->si[0];
+ return LIST_ELEM(chn, struct stream *, res)->csf->si;
else
- return &LIST_ELEM(chn, struct stream *, req)->si[1];
+ return LIST_ELEM(chn, struct stream *, req)->csb->si;
}
/* c_orig() : returns the pointer to the channel buffer's origin */
diff --git a/include/haproxy/conn_stream-t.h b/include/haproxy/conn_stream-t.h
index 88775e9..580d104 100644
--- a/include/haproxy/conn_stream-t.h
+++ b/include/haproxy/conn_stream-t.h
@@ -25,6 +25,8 @@
#include <haproxy/obj_type-t.h>
+struct stream_interface;
+
/* conn_stream flags */
enum {
CS_FL_NONE = 0x00000000, /* Just for initialization purposes */
@@ -92,7 +94,7 @@
unsigned int flags; /* CS_FL_* */
enum obj_type *end; /* points to the end point (connection or appctx) */
enum obj_type *app; /* points to the applicative point (stream or check) */
- void *data; /* pointer to upper layer's entity (eg: stream interface) */
+ struct stream_interface *si;
const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */
void *ctx; /* mux-specific context */
};
diff --git a/include/haproxy/conn_stream.h b/include/haproxy/conn_stream.h
index 730a29b..5be833d 100644
--- a/include/haproxy/conn_stream.h
+++ b/include/haproxy/conn_stream.h
@@ -29,13 +29,16 @@
#include <haproxy/obj_type.h>
struct stream;
+struct stream_interface;
struct check;
#define IS_HTX_CS(cs) (cs_conn(cs) && IS_HTX_CONN(cs_conn(cs)))
-struct conn_stream *cs_new(enum obj_type *endp, void *ctx, enum obj_type *app, void *data, const struct data_cb *data_cb);
+struct conn_stream *cs_new();
void cs_free(struct conn_stream *cs);
-
+void cs_attach_endp(struct conn_stream *cs, enum obj_type *endp, void *ctx);
+int cs_attach_app(struct conn_stream *cs, enum obj_type *app);
+void cs_detach_endp(struct conn_stream *cs);
/*
* Initializes all required fields for a new conn_strema.
@@ -47,7 +50,7 @@
cs->end = NULL;
cs->app = NULL;
cs->ctx = NULL;
- cs->data = NULL;
+ cs->si = NULL;
cs->data_cb = NULL;
}
@@ -77,11 +80,6 @@
return (cs ? objt_appctx(cs->end) : NULL);
}
-static inline struct stream_interface *cs_si(const struct conn_stream *cs)
-{
- return (cs ? cs->data : NULL);
-}
-
static inline struct stream *cs_strm(const struct conn_stream *cs)
{
return (cs ? objt_stream(cs->app) : NULL);
@@ -92,57 +90,9 @@
return (cs ? objt_check(cs->app) : NULL);
}
-/* Attaches a conn_stream to an endpoint and sets the endpoint ctx */
-static inline void cs_attach_endp(struct conn_stream *cs, enum obj_type *endp, void *ctx)
-{
- cs->end = endp;
- cs->ctx = ctx;
-}
-
-/* Attaches a conn_stream to a app layer and sets the relevant callbacks */
-static inline void cs_attach_app(struct conn_stream *cs, enum obj_type *app, void *data, const struct data_cb *data_cb)
-{
- cs->app = app;
- cs->data = data;
- cs->data_cb = data_cb;
-}
-
-/* Detach the conn_stream from the endpoint, if any. For a connecrion, if a mux
- * owns the connection ->detach() callback is called. Otherwise, it means the
- * conn-stream owns the connection. In this case the connection is closed and
- * released. For an applet, the appctx is released. At the end, the conn-stream
- * is not released but some fields a reset.
- */
-static inline void cs_detach_endp(struct conn_stream *cs)
+static inline struct stream_interface *cs_si(const struct conn_stream *cs)
{
- struct connection *conn;
- struct appctx *appctx;
-
- if ((conn = cs_conn(cs))) {
- if (conn->mux)
- conn->mux->detach(cs);
- else {
- /* It's too early to have a mux, let's just destroy
- * the connection
- */
- conn_stop_tracking(conn);
- conn_full_close(conn);
- if (conn->destroy_cb)
- conn->destroy_cb(conn);
- conn_free(conn);
- }
- }
- else if ((appctx = cs_appctx(cs))) {
- if (appctx->applet->release)
- appctx->applet->release(appctx);
- appctx_free(appctx);
- }
-
- /* Rest CS */
- cs->flags = CS_FL_NONE;
- cs->end = NULL;
- cs->ctx = NULL;
- cs->data_cb = NULL;
+ return (cs_strm(cs) ? cs->si : NULL);
}
/* Release a conn_stream */
diff --git a/include/haproxy/stream-t.h b/include/haproxy/stream-t.h
index 7e8a257..f75a181 100644
--- a/include/haproxy/stream-t.h
+++ b/include/haproxy/stream-t.h
@@ -167,7 +167,6 @@
struct conn_stream *csf; /* frontend conn-stream */
struct conn_stream *csb; /* backend conn-stream */
- struct stream_interface si[2]; /* client and server stream interfaces */
struct strm_logs logs; /* logs for this stream */
void (*do_log)(struct stream *s); /* the function to call in order to log (or NULL) */
diff --git a/include/haproxy/stream_interface.h b/include/haproxy/stream_interface.h
index 5236ffd..a30f345 100644
--- a/include/haproxy/stream_interface.h
+++ b/include/haproxy/stream_interface.h
@@ -33,6 +33,10 @@
extern struct si_ops si_conn_ops;
extern struct si_ops si_applet_ops;
extern struct data_cb si_conn_cb;
+extern struct data_cb check_conn_cb;
+
+struct stream_interface *si_new(struct conn_stream *cs);
+void si_free(struct stream_interface *si);
/* main event functions used to move data between sockets and buffers */
int si_check_timeouts(struct stream_interface *si);
@@ -87,7 +91,7 @@
/* returns the stream interface on the other side. Used during forwarding. */
static inline struct stream_interface *si_opposite(struct stream_interface *si)
{
- return ((si->flags & SI_FL_ISBACK) ? &(cs_strm(si->cs)->si[0]) : &(cs_strm(si->cs)->si[1]));
+ return ((si->flags & SI_FL_ISBACK) ? cs_strm(si->cs)->csf->si : cs_strm(si->cs)->csb->si);
}
/* initializes a stream interface in the SI_ST_INI state. It's detached from
@@ -105,6 +109,7 @@
si->cs = NULL;
si->state = si->prev_state = SI_ST_INI;
si->ops = &si_embedded_ops;
+ si->l7_buffer = BUF_NULL;
si->wait_event.tasklet = tasklet_new();
if (!si->wait_event.tasklet)
return -1;
@@ -137,81 +142,6 @@
return !!(si_state_bit(state) & mask);
}
-/* Reset the endpoint detaching it from the conn-stream. For a connection
- * attached to a mux, it is unsubscribe from any event.
- */
-static inline void si_reset_endpoint(struct stream_interface *si)
-{
- if (!si->cs)
- return;
-
- if (cs_conn_mux(si->cs) && si->wait_event.events != 0)
- (cs_conn_mux(si->cs))->unsubscribe(si->cs, si->wait_event.events, &si->wait_event);
-
- cs_detach_endp(si->cs);
- si->ops = &si_embedded_ops;
-}
-
-/* Release the endpoint if it's a connection or an applet, then nullify it.
- * Note: released connections are closed then freed.
- */
-static inline void si_release_endpoint(struct stream_interface *si)
-{
- if (!si->cs)
- return;
- si_reset_endpoint(si);
- cs_free(si->cs);
- si->cs = NULL;
- si->ops = &si_embedded_ops;
-
-}
-
-/* Attach conn_stream <cs> to the stream interface <si>. */
-static inline void si_attach_cs(struct stream_interface *si, struct conn_stream *cs)
-{
- si->cs = cs;
- if (cs_conn(cs)) {
- si->ops = &si_conn_ops;
- cs_attach_app(cs, &si_strm(si)->obj_type, si, &si_conn_cb);
- }
- else if (cs_appctx(cs)) {
- struct appctx *appctx = cs_appctx(cs);
-
- si->ops = &si_applet_ops;
- appctx->owner = cs;
- cs_attach_app(cs, &si_strm(si)->obj_type, si, NULL);
- }
- else {
- si->ops = &si_embedded_ops;
- cs_attach_app(cs, &si_strm(si)->obj_type, si, NULL);
- }
-}
-
-/* Attach connection <conn> to the stream interface <si>. The stream interface
- * is configured to work with a connection context.
- */
-static inline void si_attach_conn(struct stream_interface *si, struct connection *conn)
-{
- si_reset_endpoint(si);
- if (!conn->ctx)
- conn->ctx = si->cs;
- si->ops = &si_conn_ops;
- cs_attach_endp(si->cs, &conn->obj_type, conn);
- cs_attach_app(si->cs, &si_strm(si)->obj_type, si, &si_conn_cb);
-}
-
-/* Attach appctx <appctx> to the stream interface <si>. The stream interface
- * is configured to work with an applet context.
- */
-static inline void si_attach_appctx(struct stream_interface *si, struct appctx *appctx)
-{
- si_reset_endpoint(si);
- appctx->owner = si->cs;
- si->ops = &si_applet_ops;
- cs_attach_endp(si->cs, &appctx->obj_type, appctx);
- cs_attach_app(si->cs, &si_strm(si)->obj_type, si, NULL);
-}
-
/* call the applet's release function if any. Needs to be called upon close() */
static inline void si_applet_release(struct stream_interface *si)
{
diff --git a/src/backend.c b/src/backend.c
index 3da12f8..0fed42a 100644
--- a/src/backend.c
+++ b/src/backend.c
@@ -1495,9 +1495,9 @@
}
if (avail >= 1) {
- si_attach_conn(cs_si(s->csb), srv_conn);
+ cs_attach_endp(s->csb, &srv_conn->obj_type, srv_conn);
if (srv_conn->mux->attach(srv_conn, s->csb, s->sess) == -1) {
- si_reset_endpoint(cs_si(s->csb));
+ cs_detach_endp(s->csb);
srv_conn = NULL;
}
}
@@ -1571,7 +1571,7 @@
return SF_ERR_INTERNAL; /* how did we get there ? */
}
- si_attach_conn(cs_si(s->csb), srv_conn);
+ cs_attach_endp(s->csb, &srv_conn->obj_type, srv_conn);
#if defined(USE_OPENSSL) && defined(TLSEXT_TYPE_application_layer_protocol_negotiation)
if (!srv ||
(srv->use_ssl != 1 || (!(srv->ssl_ctx.alpn_str) && !(srv->ssl_ctx.npn_str)) ||
@@ -2289,7 +2289,7 @@
* Note: the stream-interface will be switched to ST_REQ, ST_ASS or
* ST_TAR and SI_FL_ERR and SI_FL_EXP flags will be unset.
*/
- si_reset_endpoint(cs_si(s->csb));
+ cs_detach_endp(s->csb);
stream_choose_redispatch(s);
diff --git a/src/cli.c b/src/cli.c
index e0f9736..63ca4c6 100644
--- a/src/cli.c
+++ b/src/cli.c
@@ -2715,7 +2715,7 @@
* connection.
*/
if (!si_conn_ready(cs_si(s->csb))) {
- si_reset_endpoint(cs_si(s->csb));
+ cs_detach_endp(s->csb);
s->srv_conn = NULL;
}
diff --git a/src/conn_stream.c b/src/conn_stream.c
index 41d4a2e..0c7bf9b 100644
--- a/src/conn_stream.c
+++ b/src/conn_stream.c
@@ -14,7 +14,7 @@
#include <haproxy/connection.h>
#include <haproxy/conn_stream.h>
#include <haproxy/pool.h>
-//#include <haproxy/stream_interface.h>
+#include <haproxy/stream_interface.h>
DECLARE_POOL(pool_head_connstream, "conn_stream", sizeof(struct conn_stream));
@@ -22,7 +22,7 @@
/* Tries to allocate a new conn_stream and initialize its main fields. On
* failure, nothing is allocated and NULL is returned.
*/
-struct conn_stream *cs_new(enum obj_type *endp, void *ctx, enum obj_type *app, void *data, const struct data_cb *data_cb)
+struct conn_stream *cs_new()
{
struct conn_stream *cs;
@@ -30,8 +30,6 @@
if (unlikely(!cs))
return NULL;
cs_init(cs);
- cs_attach_endp(cs, endp, ctx);
- cs_attach_app(cs, app, data, data_cb);
return cs;
}
@@ -40,5 +38,106 @@
*/
void cs_free(struct conn_stream *cs)
{
+ si_free(cs->si);
pool_free(pool_head_connstream, cs);
}
+
+
+/* Attaches a conn_stream to an endpoint and sets the endpoint ctx */
+void cs_attach_endp(struct conn_stream *cs, enum obj_type *endp, void *ctx)
+{
+ struct connection *conn;
+ struct appctx *appctx;
+
+ cs->end = endp;
+ cs->ctx = ctx;
+ if ((conn = objt_conn(endp)) != NULL) {
+ if (!conn->ctx)
+ conn->ctx = cs;
+ if (cs_strm(cs)) {
+ cs->si->ops = &si_conn_ops;
+ cs->data_cb = &si_conn_cb;
+ }
+ else if (cs_check(cs))
+ cs->data_cb = &check_conn_cb;
+ }
+ else if ((appctx = objt_appctx(endp)) != NULL) {
+ appctx->owner = cs;
+ if (cs->si) {
+ cs->si->ops = &si_applet_ops;
+ cs->data_cb = NULL;
+ }
+ }
+}
+
+/* Attaches a conn_stream to a app layer and sets the relevant callbacks */
+int cs_attach_app(struct conn_stream *cs, enum obj_type *app)
+{
+ cs->app = app;
+
+ if (objt_stream(app)) {
+ if (!cs->si)
+ cs->si = si_new(cs);
+ if (unlikely(!cs->si))
+ return -1;
+
+ if (cs_conn(cs)) {
+ cs->si->ops = &si_conn_ops;
+ cs->data_cb = &si_conn_cb;
+ }
+ else if (cs_appctx(cs)) {
+ cs->si->ops = &si_applet_ops;
+ cs->data_cb = NULL;
+ }
+ else {
+ cs->si->ops = &si_embedded_ops;
+ cs->data_cb = NULL;
+ }
+ }
+ else if (objt_check(app))
+ cs->data_cb = &check_conn_cb;
+ return 0;
+}
+
+/* Detach the conn_stream from the endpoint, if any. For a connecrion, if a mux
+ * owns the connection ->detach() callback is called. Otherwise, it means the
+ * conn-stream owns the connection. In this case the connection is closed and
+ * released. For an applet, the appctx is released. At the end, the conn-stream
+ * is not released but some fields a reset.
+ */
+void cs_detach_endp(struct conn_stream *cs)
+{
+ struct connection *conn;
+ struct appctx *appctx;
+
+ if ((conn = cs_conn(cs))) {
+ if (conn->mux) {
+ if (cs->si && cs->si->wait_event.events != 0)
+ conn->mux->unsubscribe(cs, cs->si->wait_event.events, &cs->si->wait_event);
+ conn->mux->detach(cs);
+ }
+ else {
+ /* It's too early to have a mux, let's just destroy
+ * the connection
+ */
+ conn_stop_tracking(conn);
+ conn_full_close(conn);
+ if (conn->destroy_cb)
+ conn->destroy_cb(conn);
+ conn_free(conn);
+ }
+ }
+ else if ((appctx = cs_appctx(cs))) {
+ if (appctx->applet->release)
+ appctx->applet->release(appctx);
+ appctx_free(appctx);
+ }
+
+ /* Rest CS */
+ cs->flags = CS_FL_NONE;
+ cs->end = NULL;
+ cs->ctx = NULL;
+ if (cs->si)
+ cs->si->ops = &si_embedded_ops;
+ cs->data_cb = NULL;
+}
diff --git a/src/connection.c b/src/connection.c
index 60cbd22..1d53eed 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -1738,8 +1738,8 @@
memcpy(hdr->sig, pp2_signature, PP2_SIGNATURE_LEN);
if (strm) {
- src = si_src(&strm->si[0]);
- dst = si_dst(&strm->si[0]);
+ src = si_src(strm->csf->si);
+ dst = si_dst(strm->csf->si);
}
else if (remote && conn_get_src(remote) && conn_get_dst(remote)) {
src = conn_src(remote);
@@ -1937,8 +1937,8 @@
const struct sockaddr_storage *dst = NULL;
if (strm) {
- src = si_src(&strm->si[0]);
- dst = si_dst(&strm->si[0]);
+ src = si_src(strm->csf->si);
+ dst = si_dst(strm->csf->si);
}
else if (remote && conn_get_src(remote) && conn_get_dst(remote)) {
src = conn_src(remote);
diff --git a/src/dns.c b/src/dns.c
index 0bcecec..249db22 100644
--- a/src/dns.c
+++ b/src/dns.c
@@ -903,11 +903,12 @@
goto out_free_appctx;
}
- cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL);
+ cs = cs_new();
if (!cs) {
ha_alert("out of memory in dns_session_create().\n");
goto out_free_sess;
}
+ cs_attach_endp(cs, &appctx->obj_type, appctx);
if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) {
ha_alert("Failed to initialize stream in dns_session_create().\n");
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 6c4c9f8..5d0a535 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -2024,9 +2024,10 @@
if (!sess)
goto out_free_spoe;
- cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL);
+ cs = cs_new();
if (!cs)
goto out_free_sess;
+ cs_attach_endp(cs, &appctx->obj_type, appctx);
if ((strm = stream_new(sess, cs, &BUF_NULL)) == NULL)
goto out_free_cs;
@@ -2034,7 +2035,7 @@
stream_set_backend(strm, conf->agent->b.be);
/* applet is waiting for data */
- si_cant_get(&strm->si[0]);
+ si_cant_get(strm->csf->si);
appctx_wakeup(appctx);
strm->do_log = NULL;
diff --git a/src/h3.c b/src/h3.c
index bda2a46..b5699bf 100644
--- a/src/h3.c
+++ b/src/h3.c
@@ -176,9 +176,10 @@
if (fin)
htx->flags |= HTX_FL_EOM;
- cs = cs_new(qcs->qcc->conn->obj_type);
+ cs = cs_new();
if (!cs)
return 1;
+ cs_attach_endp(&qcs->qcc->conn->obj_type, qcs);
cs->flags |= CS_FL_NOT_FIRST;
cs->ctx = qcs;
diff --git a/src/hlua.c b/src/hlua.c
index a6cc839..3b2bd73 100644
--- a/src/hlua.c
+++ b/src/hlua.c
@@ -2961,11 +2961,12 @@
goto out_fail_appctx;
}
- cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL);
+ cs = cs_new();
if (!cs) {
hlua_pusherror(L, "socket: out of memory");
goto out_fail_sess;
}
+ cs_attach_endp(cs, &appctx->obj_type, appctx);
strm = stream_new(sess, cs, &BUF_NULL);
if (!strm) {
@@ -2980,7 +2981,7 @@
* and retrieve data from the server. The connection is initialized
* with the "struct server".
*/
- si_set_state(&strm->si[1], SI_ST_ASS);
+ si_set_state(strm->csb->si, SI_ST_ASS);
/* Force destination server. */
strm->flags |= SF_DIRECT | SF_ASSIGNED | SF_BE_ASSIGNED;
diff --git a/src/hq_interop.c b/src/hq_interop.c
index 376779d..c9d8c9a 100644
--- a/src/hq_interop.c
+++ b/src/hq_interop.c
@@ -72,10 +72,10 @@
htx_add_endof(htx, HTX_BLK_EOH);
htx_to_buf(htx, &htx_buf);
- cs = cs_new(&qcs->qcc->conn->obj_type);
+ cs = cs_new();
if (!cs)
return -1;
-
+ cs_attach_endp(cs, &qcs->qcc->conn->obj_type, qcs);
cs->ctx = qcs;
stream_create_from_cs(cs, &htx_buf);
diff --git a/src/http_ana.c b/src/http_ana.c
index c502d43..a3fd15c 100644
--- a/src/http_ana.c
+++ b/src/http_ana.c
@@ -1257,7 +1257,7 @@
res->to_forward = 0;
res->analyse_exp = TICK_ETERNITY;
res->total = 0;
- si_reset_endpoint(cs_si(s->csb));
+ cs_detach_endp(s->csb);
b_free(&req->buf);
/* Swap the L7 buffer with the channel buffer */
diff --git a/src/http_client.c b/src/http_client.c
index 9cbecf0..b84e617 100644
--- a/src/http_client.c
+++ b/src/http_client.c
@@ -486,11 +486,12 @@
ha_alert("httpclient: out of memory in %s:%d.\n", __FUNCTION__, __LINE__);
goto out_free_appctx;
}
- cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL);
+ cs = cs_new();
if (!cs) {
ha_alert("httpclient: out of memory in %s:%d.\n", __FUNCTION__, __LINE__);
goto out_free_sess;
}
+ cs_attach_endp(cs, &appctx->obj_type, appctx);
if ((s = stream_new(sess, cs, &hc->req.buf)) == NULL) {
ha_alert("httpclient: Failed to initialize stream %s:%d.\n", __FUNCTION__, __LINE__);
goto out_free_cs;
diff --git a/src/http_fetch.c b/src/http_fetch.c
index 99dc89a..22919d3 100644
--- a/src/http_fetch.c
+++ b/src/http_fetch.c
@@ -1186,7 +1186,7 @@
*/
static int smp_fetch_base32_src(const struct arg *args, struct sample *smp, const char *kw, void *private)
{
- const struct sockaddr_storage *src = (smp->strm ? si_src(&smp->strm->si[0]) : NULL);
+ const struct sockaddr_storage *src = (smp->strm ? si_src(smp->strm->csf->si) : NULL);
struct buffer *temp;
if (!src)
@@ -2053,7 +2053,7 @@
*/
static int smp_fetch_url32_src(const struct arg *args, struct sample *smp, const char *kw, void *private)
{
- const struct sockaddr_storage *src = (smp->strm ? si_src(&smp->strm->si[0]) : NULL);
+ const struct sockaddr_storage *src = (smp->strm ? si_src(smp->strm->csf->si) : NULL);
struct buffer *temp;
if (!src)
diff --git a/src/mux_h1.c b/src/mux_h1.c
index 48ba0a3..bfa923a 100644
--- a/src/mux_h1.c
+++ b/src/mux_h1.c
@@ -682,11 +682,12 @@
struct conn_stream *cs;
TRACE_ENTER(H1_EV_STRM_NEW, h1c->conn, h1s);
- cs = cs_new(&h1c->conn->obj_type, h1s, NULL, NULL, NULL);
+ cs = cs_new();
if (!cs) {
TRACE_ERROR("CS allocation failure", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, h1c->conn, h1s);
goto err;
}
+ cs_attach_endp(cs, &h1c->conn->obj_type, h1s);
h1s->cs = cs;
if (h1s->flags & H1S_F_NOT_FIRST)
diff --git a/src/mux_h2.c b/src/mux_h2.c
index 2049b51..4fed9b2 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -1529,11 +1529,11 @@
if (!h2s)
goto out;
- cs = cs_new(&h2c->conn->obj_type, h2s, NULL, NULL, NULL);
+ cs = cs_new();
if (!cs)
goto out_close;
-
cs->flags |= CS_FL_NOT_FIRST;
+ cs_attach_endp(cs, &h2c->conn->obj_type, h2s);
h2s->cs = cs;
h2c->nb_cs++;
diff --git a/src/mux_pt.c b/src/mux_pt.c
index 7ba9da9..9f1aaa8 100644
--- a/src/mux_pt.c
+++ b/src/mux_pt.c
@@ -291,11 +291,12 @@
ctx->conn = conn;
if (!cs) {
- cs = cs_new(&conn->obj_type, NULL, NULL, NULL, NULL);
+ cs = cs_new();
if (!cs) {
TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
goto fail_free_ctx;
}
+ cs_attach_endp(cs, &conn->obj_type, NULL);
if (!stream_new(conn->owner, cs, &BUF_NULL)) {
TRACE_ERROR("stream creation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn, cs);
diff --git a/src/peers.c b/src/peers.c
index 8e7246d..5172214 100644
--- a/src/peers.c
+++ b/src/peers.c
@@ -3204,11 +3204,12 @@
goto out_free_appctx;
}
- cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL);
+ cs = cs_new();
if (!cs) {
ha_alert("out of memory in peer_session_create().\n");
goto out_free_sess;
}
+ cs_attach_endp(cs, &appctx->obj_type, appctx);
if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) {
ha_alert("Failed to initialize stream in peer_session_create().\n");
diff --git a/src/sink.c b/src/sink.c
index 7a4751b..aa0ecfa 100644
--- a/src/sink.c
+++ b/src/sink.c
@@ -655,11 +655,12 @@
goto out_free_appctx;
}
- cs = cs_new(&appctx->obj_type, appctx, NULL, NULL, NULL);
+ cs = cs_new();
if (!cs) {
ha_alert("out of memory in sink_forward_session_create");
goto out_free_sess;
}
+ cs_attach_endp(cs, &appctx->obj_type, appctx);
if ((s = stream_new(sess, cs, &BUF_NULL)) == NULL) {
ha_alert("Failed to initialize stream in sink_forward_session_create().\n");
diff --git a/src/stream.c b/src/stream.c
index f7c2f6f..9dbf965 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -438,30 +438,26 @@
if (sess->fe->mode == PR_MODE_HTTP)
s->flags |= SF_HTX;
- cs->app = &s->obj_type;
s->csf = cs;
- s->csb = cs_new(NULL, NULL, &s->obj_type, &s->si[1], NULL);
+ s->csb = cs_new();
if (!s->csb)
- goto out_fail_alloc_cs;
+ goto out_fail_alloc_csb;
- s->si[0].flags = SI_FL_NONE;
- if (si_reset(&s->si[0]) < 0)
- goto out_fail_reset_si0;
- si_attach_cs(&s->si[0], s->csf);
- si_set_state(&s->si[0], SI_ST_EST);
- s->si[0].hcto = sess->fe->timeout.clientfin;
+ if (cs_attach_app(s->csf, &s->obj_type) < 0)
+ goto out_fail_attach_csf;
+ if (cs_attach_app(s->csb, &s->obj_type) < 0)
+ goto out_fail_attach_csb;
- if (likely(sess->fe->options2 & PR_O2_INDEPSTR))
- s->si[0].flags |= SI_FL_INDEP_STR;
+ si_set_state(cs_si(s->csf), SI_ST_EST);
+ cs_si(s->csf)->hcto = sess->fe->timeout.clientfin;
- s->si[1].flags = SI_FL_ISBACK;
- if (si_reset(&s->si[1]) < 0)
- goto out_fail_reset_si1;
- si_attach_cs(&s->si[1], s->csb);
- s->si[1].hcto = TICK_ETERNITY;
+ if (likely(sess->fe->options2 & PR_O2_INDEPSTR))
+ cs_si(s->csf)->flags |= SI_FL_INDEP_STR;
+ cs_si(s->csb)->flags = SI_FL_ISBACK;
+ cs_si(s->csb)->hcto = TICK_ETERNITY;
if (likely(sess->fe->options2 & PR_O2_INDEPSTR))
- s->si[1].flags |= SI_FL_INDEP_STR;
+ cs_si(s->csb)->flags |= SI_FL_INDEP_STR;
if (cs->flags & CS_FL_WEBSOCKET)
s->flags |= SF_WEBSOCKET;
@@ -470,7 +466,7 @@
if (mux) {
if (mux->flags & MX_FL_CLEAN_ABRT)
- s->si[0].flags |= SI_FL_CLEAN_ABRT;
+ cs_si(s->csf)->flags |= SI_FL_CLEAN_ABRT;
if (mux->flags & MX_FL_HTX)
s->flags |= SF_HTX;
}
@@ -539,10 +535,9 @@
if (flt_stream_init(s) < 0 || flt_stream_start(s) < 0)
goto out_fail_accept;
- s->si[1].l7_buffer = BUF_NULL;
/* finish initialization of the accepted file descriptor */
if (cs_appctx(cs))
- si_want_get(&s->si[0]);
+ si_want_get(cs_si(s->csf));
if (sess->fe->accept && sess->fe->accept(s) < 0)
goto out_fail_accept;
@@ -571,13 +566,12 @@
/* Error unrolling */
out_fail_accept:
flt_stream_release(s, 0);
- tasklet_free(s->si[1].wait_event.tasklet);
LIST_DELETE(&s->list);
- out_fail_reset_si1:
- tasklet_free(s->si[0].wait_event.tasklet);
- out_fail_reset_si0:
- si_release_endpoint(&s->si[1]);
- out_fail_alloc_cs:
+ out_fail_attach_csb:
+ si_free(cs_si(s->csf));
+ out_fail_attach_csf:
+ cs_free(s->csb);
+ out_fail_alloc_csb:
task_destroy(t);
out_fail_alloc:
pool_free(pool_head_stream, s);
@@ -722,23 +716,15 @@
/* FIXME: Handle it in appctx_free ??? */
must_free_sess = objt_appctx(sess->origin) && sess->origin == s->csf->end;
+ /* FIXME: ATTENTION, si CSF est librérer avant, ça plante !!!! */
+ cs_destroy(s->csb);
+ cs_destroy(s->csf);
- si_release_endpoint(cs_si(s->csb));
- si_release_endpoint(cs_si(s->csf));
-
- tasklet_free(s->si[0].wait_event.tasklet);
- tasklet_free(s->si[1].wait_event.tasklet);
-
- b_free(&s->si[1].l7_buffer);
if (must_free_sess) {
sess->origin = NULL;
session_free(sess);
}
- sockaddr_free(&s->si[0].src);
- sockaddr_free(&s->si[0].dst);
- sockaddr_free(&s->si[1].src);
- sockaddr_free(&s->si[1].dst);
pool_free(pool_head_stream, s);
/* We may want to free the maximum amount of pools if the proxy is stopping */
@@ -2187,7 +2173,7 @@
}
}
else {
- si_reset_endpoint(si_b);
+ cs_detach_endp(s->csb);
si_b->state = SI_ST_CLO; /* shutw+ini = abort */
channel_shutw_now(req); /* fix buffer flags upon abort */
channel_shutr_now(res);
@@ -3157,7 +3143,7 @@
chunk_appendf(&trash,
" flags=0x%x, conn_retries=%d, srv_conn=%p, pend_pos=%p waiting=%d epoch=%#x\n",
- strm->flags, strm->si[1].conn_retries, strm->srv_conn, strm->pend_pos,
+ strm->flags, strm->csb->si->conn_retries, strm->srv_conn, strm->pend_pos,
LIST_INLIST(&strm->buffer_wait.list), strm->stream_epoch);
chunk_appendf(&trash,
@@ -3253,29 +3239,29 @@
chunk_appendf(&trash,
" si[0]=%p (state=%s flags=0x%02x endp0=%s:%p exp=%s et=0x%03x sub=%d)\n",
- &strm->si[0],
- si_state_str(strm->si[0].state),
- strm->si[0].flags,
+ strm->csf->si,
+ si_state_str(strm->csf->si->state),
+ strm->csf->si->flags,
obj_type_name(strm->csf->end),
obj_base_ptr(strm->csf->end),
- strm->si[0].exp ?
- tick_is_expired(strm->si[0].exp, now_ms) ? "<PAST>" :
- human_time(TICKS_TO_MS(strm->si[0].exp - now_ms),
+ strm->csf->si->exp ?
+ tick_is_expired(strm->csf->si->exp, now_ms) ? "<PAST>" :
+ human_time(TICKS_TO_MS(strm->csf->si->exp - now_ms),
TICKS_TO_MS(1000)) : "<NEVER>",
- strm->si[0].err_type, strm->si[0].wait_event.events);
+ strm->csf->si->err_type, strm->csf->si->wait_event.events);
chunk_appendf(&trash,
" si[1]=%p (state=%s flags=0x%02x endp1=%s:%p exp=%s et=0x%03x sub=%d)\n",
- &strm->si[1],
- si_state_str(strm->si[1].state),
- strm->si[1].flags,
+ strm->csb->si,
+ si_state_str(strm->csb->si->state),
+ strm->csb->si->flags,
obj_type_name(strm->csb->end),
obj_base_ptr(strm->csb->end),
- strm->si[1].exp ?
- tick_is_expired(strm->si[1].exp, now_ms) ? "<PAST>" :
- human_time(TICKS_TO_MS(strm->si[1].exp - now_ms),
+ strm->csb->si->exp ?
+ tick_is_expired(strm->csb->si->exp, now_ms) ? "<PAST>" :
+ human_time(TICKS_TO_MS(strm->csb->si->exp - now_ms),
TICKS_TO_MS(1000)) : "<NEVER>",
- strm->si[1].err_type, strm->si[1].wait_event.events);
+ strm->csb->si->err_type, strm->csb->si->wait_event.events);
cs = strm->csf;
chunk_appendf(&trash, " cs=%p csf=0x%08x ctx=%p\n", cs, cs->flags, cs->ctx);
@@ -3650,21 +3636,21 @@
conn = cs_conn(curr_strm->csf);
chunk_appendf(&trash,
" s0=[%d,%1xh,fd=%d,ex=%s]",
- curr_strm->si[0].state,
- curr_strm->si[0].flags,
+ curr_strm->csf->si->state,
+ curr_strm->csf->si->flags,
conn ? conn->handle.fd : -1,
- curr_strm->si[0].exp ?
- human_time(TICKS_TO_MS(curr_strm->si[0].exp - now_ms),
+ curr_strm->csf->si->exp ?
+ human_time(TICKS_TO_MS(curr_strm->csf->si->exp - now_ms),
TICKS_TO_MS(1000)) : "");
conn = cs_conn(curr_strm->csb);
chunk_appendf(&trash,
" s1=[%d,%1xh,fd=%d,ex=%s]",
- curr_strm->si[1].state,
- curr_strm->si[1].flags,
+ curr_strm->csb->si->state,
+ curr_strm->csb->si->flags,
conn ? conn->handle.fd : -1,
- curr_strm->si[1].exp ?
- human_time(TICKS_TO_MS(curr_strm->si[1].exp - now_ms),
+ curr_strm->csb->si->exp ?
+ human_time(TICKS_TO_MS(curr_strm->csb->si->exp - now_ms),
TICKS_TO_MS(1000)) : "");
chunk_appendf(&trash,
diff --git a/src/stream_interface.c b/src/stream_interface.c
index 5b86bf8..aa3b6d8 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -28,6 +28,7 @@
#include <haproxy/http_htx.h>
#include <haproxy/pipe-t.h>
#include <haproxy/pipe.h>
+#include <haproxy/pool.h>
#include <haproxy/proxy.h>
#include <haproxy/stream-t.h>
#include <haproxy/stream_interface.h>
@@ -36,6 +37,9 @@
#include <haproxy/tools.h>
+DECLARE_POOL(pool_head_streaminterface, "stream_interface", sizeof(struct stream_interface));
+
+
/* functions used by default on a detached stream-interface */
static void stream_int_shutr(struct stream_interface *si);
static void stream_int_shutw(struct stream_interface *si);
@@ -98,6 +102,35 @@
.name = "STRM",
};
+
+struct stream_interface *si_new(struct conn_stream *cs)
+{
+ struct stream_interface *si;
+
+ si = pool_alloc(pool_head_streaminterface);
+ if (unlikely(!si))
+ return NULL;
+ si->flags = SI_FL_NONE;
+ if (si_reset(si) < 0) {
+ pool_free(pool_head_streaminterface, si);
+ return NULL;
+ }
+ si->cs = cs;
+ return si;
+}
+
+void si_free(struct stream_interface *si)
+{
+ if (!si)
+ return;
+
+ b_free(&si->l7_buffer);
+ tasklet_free(si->wait_event.tasklet);
+ sockaddr_free(&si->src);
+ sockaddr_free(&si->dst);
+ pool_free(pool_head_streaminterface, si);
+}
+
/*
* This function only has to be called once after a wakeup event in case of
* suspected timeout. It controls the stream interface timeouts and sets
@@ -309,7 +342,7 @@
appctx = appctx_new(app);
if (!appctx)
return NULL;
- si_attach_appctx(si, appctx);
+ cs_attach_endp(si->cs, &appctx->obj_type, appctx);
appctx->t->nice = si_strm(si)->task->nice;
si_cant_get(si);
appctx_wakeup(appctx);
diff --git a/src/tcp_act.c b/src/tcp_act.c
index a2ed7d0..ecb33d9 100644
--- a/src/tcp_act.c
+++ b/src/tcp_act.c
@@ -288,7 +288,7 @@
* is present, returning with ERR will cause lingering to be disabled.
*/
if (strm)
- strm->si[0].flags |= SI_FL_NOLINGER;
+ strm->csf->si->flags |= SI_FL_NOLINGER;
/* We're on the client-facing side, we must force to disable lingering to
* ensure we will use an RST exclusively and kill any pending data.
diff --git a/src/tcp_sample.c b/src/tcp_sample.c
index 4a77036..895a130 100644
--- a/src/tcp_sample.c
+++ b/src/tcp_sample.c
@@ -65,7 +65,7 @@
src = conn_src(conn);
}
else /* src */
- src = (smp->strm ? si_src(&smp->strm->si[0]) : sess_src(smp->sess));
+ src = (smp->strm ? si_src(smp->strm->csf->si) : sess_src(smp->sess));
if (!src)
return 0;
@@ -109,7 +109,7 @@
src = conn_src(conn);
}
else /* src_port */
- src = (smp->strm ? si_src(&smp->strm->si[0]) : sess_src(smp->sess));
+ src = (smp->strm ? si_src(smp->strm->csf->si) : sess_src(smp->sess));
if (!src)
return 0;
@@ -144,7 +144,7 @@
dst = conn_dst(conn);
}
else /* dst */
- dst = (smp->strm ? si_dst(&smp->strm->si[0]) : sess_dst(smp->sess));
+ dst = (smp->strm ? si_dst(smp->strm->csf->si) : sess_dst(smp->sess));
if (!dst)
return 0;
@@ -181,7 +181,7 @@
dst = conn_dst(conn);
}
else /* dst_is_local */
- dst = (smp->strm ? si_dst(&smp->strm->si[0]) : sess_dst(smp->sess));
+ dst = (smp->strm ? si_dst(smp->strm->csf->si) : sess_dst(smp->sess));
if (!dst)
return 0;
@@ -207,7 +207,7 @@
src = conn_src(conn);
}
else /* src_is_local */
- src = (smp->strm ? si_src(&smp->strm->si[0]) : sess_src(smp->sess));
+ src = (smp->strm ? si_src(smp->strm->csf->si) : sess_src(smp->sess));
if (!src)
return 0;
@@ -240,7 +240,7 @@
dst = conn_dst(conn);
}
else /* dst_port */
- dst = (smp->strm ? si_dst(&smp->strm->si[0]) : sess_dst(smp->sess));
+ dst = (smp->strm ? si_dst(smp->strm->csf->si) : sess_dst(smp->sess));
if (!dst)
return 0;
diff --git a/src/tcpcheck.c b/src/tcpcheck.c
index 3697f41..0aab10f 100644
--- a/src/tcpcheck.c
+++ b/src/tcpcheck.c
@@ -1093,7 +1093,7 @@
/* No connection, prepare a new one */
conn = conn_new((s ? &s->obj_type : &proxy->obj_type));
if (conn)
- cs = cs_new(&conn->obj_type, conn, &check->obj_type, NULL, &check_conn_cb);
+ cs = cs_new();
if (!conn || !cs) {
chunk_printf(&trash, "TCPCHK error allocating connection at step %d",
tcpcheck_get_step_id(check, rule));
@@ -1106,7 +1106,18 @@
conn_free(conn);
goto out;
}
-
+ cs_attach_endp(cs, &conn->obj_type, conn);
+ if (cs_attach_app(cs, &check->obj_type) < 0) {
+ chunk_printf(&trash, "TCPCHK error allocating connection at step %d",
+ tcpcheck_get_step_id(check, rule));
+ if (rule->comment)
+ chunk_appendf(&trash, " comment: '%s'", rule->comment);
+ set_server_check_status(check, HCHK_STATUS_SOCKERR, trash.area);
+ ret = TCPCHK_EVAL_STOP;
+ TRACE_ERROR("conn-stream allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check);
+ cs_destroy(cs);
+ goto out;
+ }
tasklet_set_tid(check->wait_list.tasklet, tid);
check->cs = cs;