MINOIR: mux-pt/trace: Register a new trace source with its events
As for the other muxes, traces are now supported in the pt mux. All parts of
the multiplexer is covered by these traces. Events are splitted by
categories (connection, stream, rx and tx).
In traces, the first argument is always a connection. So it is easy to get
the mux context (conn->ctx). The second argument is always a conn-stream and
mau be NUUL. The third one is a buffer and it may also be NULL. Depending on
the context it is the request or the response. In all cases it is owned by a
channel. Finally, the fourth argument is an integer value. Its meaning
depends on the calling context.
diff --git a/src/mux_pt.c b/src/mux_pt.c
index 834cd51..eff43d2 100644
--- a/src/mux_pt.c
+++ b/src/mux_pt.c
@@ -13,8 +13,10 @@
#include <haproxy/api.h>
#include <haproxy/buf.h>
#include <haproxy/connection.h>
+#include <haproxy/pipe-t.h>
#include <haproxy/stream.h>
#include <haproxy/task.h>
+#include <haproxy/trace.h>
struct mux_pt_ctx {
struct conn_stream *cs;
@@ -24,15 +26,181 @@
DECLARE_STATIC_POOL(pool_head_pt_ctx, "mux_pt", sizeof(struct mux_pt_ctx));
+/* trace source and events */
+static void pt_trace(enum trace_level level, uint64_t mask,
+ const struct trace_source *src,
+ const struct ist where, const struct ist func,
+ const void *a1, const void *a2, const void *a3, const void *a4);
+
+/* The event representation is split like this :
+ * pt_ctx - internal PT context
+ * strm - application layer
+ */
+static const struct trace_event pt_trace_events[] = {
+#define PT_EV_CONN_NEW (1ULL << 0)
+ { .mask = PT_EV_CONN_NEW, .name = "pt_conn_new", .desc = "new PT connection" },
+#define PT_EV_CONN_WAKE (1ULL << 1)
+ { .mask = PT_EV_CONN_WAKE, .name = "pt_conn_wake", .desc = "PT connection woken up" },
+#define PT_EV_CONN_END (1ULL << 2)
+ { .mask = PT_EV_CONN_END, .name = "pt_conn_end", .desc = "PT connection terminated" },
+#define PT_EV_CONN_ERR (1ULL << 3)
+ { .mask = PT_EV_CONN_ERR, .name = "pt_conn_err", .desc = "error on PT connection" },
+#define PT_EV_STRM_NEW (1ULL << 4)
+ { .mask = PT_EV_STRM_NEW, .name = "strm_new", .desc = "app-layer stream creation" },
+#define PT_EV_STRM_SHUT (1ULL << 5)
+ { .mask = PT_EV_STRM_SHUT, .name = "strm_shut", .desc = "stream shutdown" },
+#define PT_EV_STRM_END (1ULL << 6)
+ { .mask = PT_EV_STRM_END, .name = "strm_end", .desc = "detaching app-layer stream" },
+#define PT_EV_STRM_ERR (1ULL << 7)
+ { .mask = PT_EV_STRM_ERR, .name = "strm_err", .desc = "stream error" },
+#define PT_EV_RX_DATA (1ULL << 8)
+ { .mask = PT_EV_RX_DATA, .name = "pt_rx_data", .desc = "Rx on PT connection" },
+#define PT_EV_TX_DATA (1ULL << 9)
+ { .mask = PT_EV_TX_DATA, .name = "pt_tx_data", .desc = "Tx on PT connection" },
+
+ {}
+};
+
+
+static const struct name_desc pt_trace_decoding[] = {
+#define PT_VERB_CLEAN 1
+ { .name="clean", .desc="only user-friendly stuff, generally suitable for level \"user\"" },
+#define PT_VERB_MINIMAL 2
+ { .name="minimal", .desc="report only h1c/h1s state and flags, no real decoding" },
+#define PT_VERB_SIMPLE 3
+ { .name="simple", .desc="add request/response status line or htx info when available" },
+#define PT_VERB_ADVANCED 4
+ { .name="advanced", .desc="add header fields or frame decoding when available" },
+#define PT_VERB_COMPLETE 5
+ { .name="complete", .desc="add full data dump when available" },
+ { /* end */ }
+};
+
+static struct trace_source trace_pt = {
+ .name = IST("pt"),
+ .desc = "Passthrough multiplexer",
+ .arg_def = TRC_ARG1_CONN, // TRACE()'s first argument is always a connection
+ .default_cb = pt_trace,
+ .known_events = pt_trace_events,
+ .lockon_args = NULL,
+ .decoding = pt_trace_decoding,
+ .report_events = ~0, // report everything by default
+};
+
+#define TRACE_SOURCE &trace_pt
+INITCALL1(STG_REGISTER, trace_register_source, TRACE_SOURCE);
+
+static inline void pt_trace_buf(const struct buffer *buf, size_t ofs, size_t len)
+{
+ size_t block1, block2;
+ int line, ptr, newptr;
+
+ block1 = b_contig_data(buf, ofs);
+ block2 = 0;
+ if (block1 > len)
+ block1 = len;
+ block2 = len - block1;
+
+ ofs = b_peek_ofs(buf, ofs);
+
+ line = 0;
+ ptr = ofs;
+ while (ptr < ofs + block1) {
+ newptr = dump_text_line(&trace_buf, b_orig(buf), b_size(buf), ofs + block1, &line, ptr);
+ if (newptr == ptr)
+ break;
+ ptr = newptr;
+ }
+
+ line = ptr = 0;
+ while (ptr < block2) {
+ newptr = dump_text_line(&trace_buf, b_orig(buf), b_size(buf), block2, &line, ptr);
+ if (newptr == ptr)
+ break;
+ ptr = newptr;
+ }
+}
+
+/* the PT traces always expect that arg1, if non-null, is of type connection
+ * (from which we can derive the pt context), that arg2, if non-null, is a
+ * conn-stream, and that arg3, if non-null, is a buffer.
+ */
+static void pt_trace(enum trace_level level, uint64_t mask, const struct trace_source *src,
+ const struct ist where, const struct ist func,
+ const void *a1, const void *a2, const void *a3, const void *a4)
+{
+ const struct connection *conn = a1;
+ const struct mux_pt_ctx *ctx = conn ? conn->ctx : NULL;
+ const struct conn_stream *cs = a2;
+ const struct buffer *buf = a3;
+ const size_t *val = a4;
+
+ if (!ctx|| src->verbosity < PT_VERB_CLEAN)
+ return;
+
+ /* Display frontend/backend info by default */
+ chunk_appendf(&trace_buf, " : [%c]", (conn_is_back(conn) ? 'B' : 'F'));
+
+ if (src->verbosity == PT_VERB_CLEAN)
+ return;
+
+ /* Display the value to the 4th argument (level > STATE) */
+ if (src->level > TRACE_LEVEL_STATE && val)
+ chunk_appendf(&trace_buf, " - VAL=%lu", (long)*val);
+
+ /* Display conn and cs info, if defined (pointer + flags) */
+ chunk_appendf(&trace_buf, " - conn=%p(0x%08x)", conn, conn->flags);
+ if (cs)
+ chunk_appendf(&trace_buf, " cs=%p(0x%08x)", cs, cs->flags);
+
+ if (src->verbosity == PT_VERB_MINIMAL)
+ return;
+
+ /* Display buffer info, if defined (level > USER & verbosity > SIMPLE) */
+ if (src->level > TRACE_LEVEL_USER && buf) {
+ int full = 0, max = 3000, chunk = 1024;
+
+ /* Full info (level > STATE && verbosity > SIMPLE) */
+ if (src->level > TRACE_LEVEL_STATE) {
+ if (src->verbosity == PT_VERB_COMPLETE)
+ full = 1;
+ else if (src->verbosity == PT_VERB_ADVANCED) {
+ full = 1;
+ max = 256;
+ chunk = 64;
+ }
+ }
+
+ chunk_appendf(&trace_buf, " buf=%u@%p+%u/%u",
+ (unsigned int)b_data(buf), b_orig(buf),
+ (unsigned int)b_head_ofs(buf), (unsigned int)b_size(buf));
+
+ if (b_data(buf) && full) {
+ chunk_memcat(&trace_buf, "\n", 1);
+ if (b_data(buf) < max)
+ pt_trace_buf(buf, 0, b_data(buf));
+ else {
+ pt_trace_buf(buf, 0, chunk);
+ chunk_memcat(&trace_buf, " ...\n", 6);
+ pt_trace_buf(buf, b_data(buf) - chunk, chunk);
+ }
+ }
+ }
+}
+
static void mux_pt_destroy(struct mux_pt_ctx *ctx)
{
struct connection *conn = NULL;
+ TRACE_POINT(PT_EV_CONN_END);
+
if (ctx) {
/* The connection must be attached to this mux to be released */
if (ctx->conn && ctx->conn->ctx == ctx)
conn = ctx->conn;
+ TRACE_DEVEL("freeing pt context", PT_EV_CONN_END, conn);
+
tasklet_free(ctx->wait_event.tasklet);
if (conn && ctx->wait_event.events != 0)
@@ -44,6 +212,7 @@
if (conn) {
conn->mux = NULL;
conn->ctx = NULL;
+ TRACE_DEVEL("freeing conn", PT_EV_CONN_END, conn);
conn_stop_tracking(conn);
conn_full_close(conn);
@@ -60,6 +229,7 @@
{
struct mux_pt_ctx *ctx = tctx;
+ TRACE_ENTER(PT_EV_CONN_WAKE, ctx->conn, ctx->cs);
if (ctx->cs) {
/* There's a small race condition.
* mux_pt_io_cb() is only supposed to be called if we have no
@@ -75,17 +245,22 @@
ctx->conn->subs = NULL;
} else if (ctx->cs->data_cb->wake)
ctx->cs->data_cb->wake(ctx->cs);
+ TRACE_DEVEL("leaving waking up CS", PT_EV_CONN_WAKE, ctx->conn, ctx->cs);
return t;
}
conn_ctrl_drain(ctx->conn);
if (ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH)) {
+ TRACE_DEVEL("destroying pt context", PT_EV_CONN_WAKE, ctx->conn);
mux_pt_destroy(ctx);
t = NULL;
}
- else
+ else {
+ TRACE_DEVEL("subscribing for reads", PT_EV_CONN_WAKE, ctx->conn);
ctx->conn->xprt->subscribe(ctx->conn, ctx->conn->xprt_ctx, SUB_RETRY_RECV,
- &ctx->wait_event);
+ &ctx->wait_event);
+ }
+ TRACE_LEAVE(PT_EV_CONN_WAKE, ctx->conn);
return t;
}
@@ -100,8 +275,12 @@
struct conn_stream *cs = conn->ctx;
struct mux_pt_ctx *ctx = pool_alloc(pool_head_pt_ctx);
- if (!ctx)
+ TRACE_ENTER(PT_EV_CONN_NEW);
+
+ if (!ctx) {
+ TRACE_ERROR("PT context allocation failure", PT_EV_CONN_NEW|PT_EV_CONN_END|PT_EV_CONN_ERR);
goto fail;
+ }
ctx->wait_event.tasklet = tasklet_new();
if (!ctx->wait_event.tasklet)
@@ -113,18 +292,24 @@
if (!cs) {
cs = cs_new(conn, conn->target);
- if (!cs)
+ if (!cs) {
+ TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
goto fail_free_ctx;
+ }
- if (stream_create_from_cs(cs, &BUF_NULL) < 0)
+ if (stream_create_from_cs(cs, &BUF_NULL) < 0) {
+ TRACE_ERROR("stream creation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn, cs);
goto fail_free;
-
+ }
+ TRACE_POINT(PT_EV_STRM_NEW, conn, cs);
}
conn->ctx = ctx;
ctx->cs = cs;
cs->flags |= CS_FL_RCV_MORE;
if (global.tune.options & GTUNE_USE_SPLICE)
cs->flags |= CS_FL_MAY_SPLICE;
+
+ TRACE_LEAVE(PT_EV_CONN_NEW, conn, cs);
return 0;
fail_free:
@@ -134,6 +319,7 @@
tasklet_free(ctx->wait_event.tasklet);
pool_free(pool_head_pt_ctx, ctx);
fail:
+ TRACE_DEVEL("leaving in error", PT_EV_CONN_NEW|PT_EV_CONN_END|PT_EV_CONN_ERR);
return -1;
}
@@ -146,14 +332,18 @@
struct conn_stream *cs = ctx->cs;
int ret = 0;
+ TRACE_ENTER(PT_EV_CONN_WAKE, ctx->conn, cs);
if (cs) {
ret = cs->data_cb->wake ? cs->data_cb->wake(cs) : 0;
- if (ret < 0)
+ if (ret < 0) {
+ TRACE_DEVEL("leaving waking up CS", PT_EV_CONN_WAKE, ctx->conn, cs);
return ret;
+ }
} else {
conn_ctrl_drain(conn);
if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH)) {
+ TRACE_DEVEL("leaving destroying PT context", PT_EV_CONN_WAKE, ctx->conn);
mux_pt_destroy(ctx);
return -1;
}
@@ -165,6 +355,8 @@
if ((conn->flags & (CO_FL_EARLY_DATA | CO_FL_EARLY_SSL_HS | CO_FL_WAIT_XPRT)) ==
CO_FL_EARLY_DATA)
conn->flags &= ~CO_FL_EARLY_DATA;
+
+ TRACE_LEAVE(PT_EV_CONN_WAKE, ctx->conn);
return ret;
}
@@ -177,16 +369,22 @@
struct conn_stream *cs;
struct mux_pt_ctx *ctx = conn->ctx;
+ TRACE_ENTER(PT_EV_STRM_NEW, conn);
if (ctx->wait_event.events)
conn->xprt->unsubscribe(ctx->conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
cs = cs_new(conn, conn->target);
- if (!cs)
+ if (!cs) {
+ TRACE_ERROR("CS allocation failure", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
goto fail;
+ }
ctx->cs = cs;
cs->flags |= CS_FL_RCV_MORE;
+
+ TRACE_LEAVE(PT_EV_STRM_NEW, conn, cs);
return (cs);
fail:
+ TRACE_DEVEL("leaving on error", PT_EV_STRM_NEW|PT_EV_STRM_END|PT_EV_STRM_ERR, conn);
return NULL;
}
@@ -207,6 +405,7 @@
{
struct mux_pt_ctx *pt = ctx;
+ TRACE_POINT(PT_EV_CONN_END, pt->conn, pt->cs);
if (!(pt->cs) || !(pt->conn) || pt->conn->ctx != pt)
mux_pt_destroy(pt);
}
@@ -219,14 +418,20 @@
struct connection *conn = cs->conn;
struct mux_pt_ctx *ctx = cs->conn->ctx;
+ TRACE_ENTER(PT_EV_STRM_END, conn, cs);
+
/* Subscribe, to know if we got disconnected */
if (conn->owner != NULL &&
!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))) {
ctx->cs = NULL;
conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, &ctx->wait_event);
- } else
+ } else {
/* There's no session attached to that connection, destroy it */
+ TRACE_DEVEL("killing dead connection", PT_EV_STRM_END, conn, cs);
mux_pt_destroy(ctx);
+ }
+
+ TRACE_LEAVE(PT_EV_STRM_END);
}
/* returns the number of streams in use on a connection */
@@ -245,6 +450,8 @@
static void mux_pt_shutr(struct conn_stream *cs, enum cs_shr_mode mode)
{
+ TRACE_ENTER(PT_EV_STRM_SHUT, cs->conn, cs);
+
if (cs->flags & CS_FL_SHR)
return;
cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
@@ -255,10 +462,14 @@
conn_ctrl_drain(cs->conn);
if (cs->flags & CS_FL_SHW)
conn_full_close(cs->conn);
+
+ TRACE_LEAVE(PT_EV_STRM_SHUT, cs->conn, cs);
}
static void mux_pt_shutw(struct conn_stream *cs, enum cs_shw_mode mode)
{
+ TRACE_ENTER(PT_EV_STRM_SHUT, cs->conn, cs);
+
if (cs->flags & CS_FL_SHW)
return;
if (conn_xprt_ready(cs->conn) && cs->conn->xprt->shutw)
@@ -268,6 +479,8 @@
conn_sock_shutw(cs->conn, (mode == CS_SHW_NORMAL));
else
conn_full_close(cs->conn);
+
+ TRACE_LEAVE(PT_EV_STRM_SHUT, cs->conn, cs);
}
/*
@@ -275,22 +488,28 @@
*/
static size_t mux_pt_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
- size_t ret;
+ size_t ret = 0;
+
+ TRACE_ENTER(PT_EV_RX_DATA, cs->conn, cs, buf, (size_t[]){count});
if (!count) {
cs->flags |= (CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
- return 0;
+ goto end;
}
b_realign_if_empty(buf);
ret = cs->conn->xprt->rcv_buf(cs->conn, cs->conn->xprt_ctx, buf, count, flags);
if (conn_xprt_read0_pending(cs->conn)) {
cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
cs->flags |= CS_FL_EOS;
+ TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, cs->conn, cs);
}
if (cs->conn->flags & CO_FL_ERROR) {
cs->flags &= ~(CS_FL_RCV_MORE | CS_FL_WANT_ROOM);
cs->flags |= CS_FL_ERROR;
+ TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, cs->conn, cs);
}
+ end:
+ TRACE_LEAVE(PT_EV_RX_DATA, cs->conn, cs, buf, (size_t[]){ret});
return ret;
}
@@ -299,10 +518,14 @@
{
size_t ret;
+ TRACE_ENTER(PT_EV_TX_DATA, cs->conn, cs, buf, (size_t[]){count});
+
ret = cs->conn->xprt->snd_buf(cs->conn, cs->conn->xprt_ctx, buf, count, flags);
if (ret > 0)
b_del(buf, ret);
+
+ TRACE_LEAVE(PT_EV_TX_DATA, cs->conn, cs, buf, (size_t[]){ret});
return ret;
}
@@ -313,6 +536,7 @@
*/
static int mux_pt_subscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
+ TRACE_POINT(PT_EV_RX_DATA|PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){event_type});
return cs->conn->xprt->subscribe(cs->conn, cs->conn->xprt_ctx, event_type, es);
}
@@ -322,6 +546,7 @@
*/
static int mux_pt_unsubscribe(struct conn_stream *cs, int event_type, struct wait_event *es)
{
+ TRACE_POINT(PT_EV_RX_DATA|PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){event_type});
return cs->conn->xprt->unsubscribe(cs->conn, cs->conn->xprt_ctx, event_type, es);
}
@@ -331,17 +556,32 @@
{
int ret;
+ TRACE_ENTER(PT_EV_RX_DATA, cs->conn, cs, 0, (size_t[]){count});
+
ret = cs->conn->xprt->rcv_pipe(cs->conn, cs->conn->xprt_ctx, pipe, count);
- if (conn_xprt_read0_pending(cs->conn))
+ if (conn_xprt_read0_pending(cs->conn)) {
cs->flags |= CS_FL_EOS;
- if (cs->conn->flags & CO_FL_ERROR)
+ TRACE_DEVEL("read0 on connection", PT_EV_RX_DATA, cs->conn, cs);
+ }
+ if (cs->conn->flags & CO_FL_ERROR) {
cs->flags |= CS_FL_ERROR;
+ TRACE_DEVEL("error on connection", PT_EV_RX_DATA|PT_EV_CONN_ERR, cs->conn, cs);
+ }
+
+ TRACE_LEAVE(PT_EV_RX_DATA, cs->conn, cs, 0, (size_t[]){ret});
return (ret);
}
static int mux_pt_snd_pipe(struct conn_stream *cs, struct pipe *pipe)
{
- return (cs->conn->xprt->snd_pipe(cs->conn, cs->conn->xprt_ctx, pipe));
+ int ret;
+
+ TRACE_ENTER(PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){pipe->data});
+
+ ret = cs->conn->xprt->snd_pipe(cs->conn, cs->conn->xprt_ctx, pipe);
+
+ TRACE_LEAVE(PT_EV_TX_DATA, cs->conn, cs, 0, (size_t[]){ret});
+ return ret;
}
#endif