[MEDIUM] stats: replace the stats socket analyser with an SI applet
We can get rid of the stats analyser by moving all the stats code
to a stream interface applet. Above being cleaner, it provides new
advantages such as the ability to process requests and responses
from the same function and work only with simple state machines.
There's no need for any hijack hack anymore.
The direct advantage for the user are the interactive mode and the
ability to chain several commands delimited by a semi-colon. Now if
the user types "prompt", he gets a prompt from which he can send
as many requests as he wants. All outputs are terminated by a
blank line followed by a new prompt, so this can be used from
external tools too.
The code is not very clean, it needs some rework, but some part
of the dirty parts are due to the remnants of the hijack mode used
in the old functions we call.
The old AN_REQ_STATS_SOCK analyser flag is now unused and has been
removed.
diff --git a/src/dumpstats.c b/src/dumpstats.c
index 97de0e9..390f853 100644
--- a/src/dumpstats.c
+++ b/src/dumpstats.c
@@ -54,11 +54,14 @@
const char stats_sock_usage_msg[] =
"Unknown command. Please enter one of the following commands only :\n"
+ " help : this message\n"
+ " prompt : toggle interactive mode with prompt\n"
+ " quit : disconnect\n"
" show info : report information about the running process\n"
" show stat : report counters for each proxy and server\n"
" show errors : report last request and response errors for each proxy\n"
" show sess : report the list of current sessions\n"
- "\n";
+ "";
const struct chunk stats_sock_usage = {
.str = (char *)&stats_sock_usage_msg,
@@ -122,7 +125,7 @@
global.stats_sock.options = LI_O_NONE;
global.stats_sock.accept = uxst_event_accept;
global.stats_sock.handler = process_session;
- global.stats_sock.analysers = AN_REQ_STATS_SOCK;
+ global.stats_sock.analysers = 0;
global.stats_sock.nice = -64; /* we want to boost priority for local stats */
global.stats_sock.private = global.stats_fe; /* must point to the frontend */
@@ -228,12 +231,15 @@
"\n");
}
-/* Parses the request line in <cmd> and possibly starts dumping stats on
- * s->rep with the hijack bit set. Returns 1 if OK, 0 in case of any error.
- * The line is modified after parsing.
+/* Processes the stats interpreter on the statistics socket. This function is
+ * called from an applet running in a stream interface. Right now we still
+ * support older functions which used to emulate servers and to set
+ * STATS_ST_CLOSE upon completion, but we also support a new interactive mode.
+ * The function returns 1 if the request was understood, otherwise zero.
*/
-int stats_sock_parse_request(struct session *s, char *line)
+int stats_sock_parse_request(struct stream_interface *si, char *line)
{
+ struct session *s = si->private;
char *args[MAX_STATS_ARGS + 1];
int arg;
@@ -260,6 +266,8 @@
while (++arg <= MAX_STATS_ARGS)
args[arg] = line;
+ si->st0 = 0;
+ s->data_ctx.stats.flags = 0;
if (strcmp(args[0], "show") == 0) {
if (strcmp(args[1], "stat") == 0) {
if (*args[2] && *args[3] && *args[4]) {
@@ -271,21 +279,21 @@
s->data_ctx.stats.flags |= STAT_SHOW_STAT;
s->data_ctx.stats.flags |= STAT_FMT_CSV;
+ s->data_state = DATA_ST_INIT;
s->ana_state = STATS_ST_REP;
- stream_int_retnclose(s->rep->cons, NULL);
- buffer_install_hijacker(s, s->rep, stats_dump_raw_to_buffer);
+ si->st0 = 3; // stats_dump_raw_to_buffer
}
else if (strcmp(args[1], "info") == 0) {
s->data_ctx.stats.flags |= STAT_SHOW_INFO;
s->data_ctx.stats.flags |= STAT_FMT_CSV;
+ s->data_state = DATA_ST_INIT;
s->ana_state = STATS_ST_REP;
- stream_int_retnclose(s->rep->cons, NULL);
- buffer_install_hijacker(s, s->rep, stats_dump_raw_to_buffer);
+ si->st0 = 3; // stats_dump_raw_to_buffer
}
else if (strcmp(args[1], "sess") == 0) {
+ s->data_state = DATA_ST_INIT;
s->ana_state = STATS_ST_REP;
- stream_int_retnclose(s->rep->cons, NULL);
- buffer_install_hijacker(s, s->rep, stats_dump_sess_to_buffer);
+ si->st0 = 4; // stats_dump_sess_to_buffer
}
else if (strcmp(args[1], "errors") == 0) {
if (*args[2])
@@ -293,9 +301,9 @@
else
s->data_ctx.errors.iid = -1;
s->data_ctx.errors.px = NULL;
+ s->data_state = DATA_ST_INIT;
s->ana_state = STATS_ST_REP;
- stream_int_retnclose(s->rep->cons, NULL);
- buffer_install_hijacker(s, s->rep, stats_dump_errors_to_buffer);
+ si->st0 = 5; // stats_dump_errors_to_buffer
}
else { /* neither "stat" nor "info" nor "sess" */
return 0;
@@ -307,70 +315,207 @@
return 1;
}
-/* Processes the stats interpreter on the statistics socket.
- * In order to ease the transition, we simply simulate the server status
- * for now. It only knows states STATS_ST_INIT, STATS_ST_REQ, STATS_ST_REP, and
- * STATS_ST_CLOSE. It removes its analyser bit from req->analysers once done.
- * It always returns 0.
+/* This I/O handler runs as an applet embedded in a stream interface. It is
+ * used to processes I/O from/to the stats unix socket. Right now we still
+ * support older functions which used to emulate servers and to set
+ * STATS_ST_CLOSE upon completion, but we also support a new interactive mode.
+ * The system relies on a request/response flip-flop state machine. We read
+ * a request, then we process it and send the response. Then we can read again.
+ * This could be enhanced a lot but we're still bound to support older output
+ * functions which were designed to work as hijackers.
+ * At the moment, we use si->st0 as the output type, and si->st1 to indicate
+ * whether we're in prompt mode or not.
*/
-int stats_sock_req_analyser(struct session *s, struct buffer *req, int an_bit)
+void stats_io_handler(struct stream_interface *si)
{
- char *line, *p;
+ struct session *s = si->private;
+ struct buffer *req = si->ob;
+ struct buffer *res = si->ib;
+ int reql;
+ int len;
- switch (s->ana_state) {
- case STATS_ST_INIT:
- /* Stats output not initialized yet */
- memset(&s->data_ctx.stats, 0, sizeof(s->data_ctx.stats));
- s->data_source = DATA_SRC_STATS;
- s->ana_state = STATS_ST_REQ;
- buffer_dont_connect(s->req);
- /* fall through */
+ if (unlikely(si->state == SI_ST_DIS || si->state == SI_ST_CLO))
+ goto out;
- case STATS_ST_REQ:
- /* Now, stats are initialized, hijack is not set, and
- * we are waiting for a complete request line.
- */
+ while (1) {
+ if (s->ana_state == STATS_ST_INIT) {
+ /* Stats output not initialized yet */
+ memset(&s->data_ctx.stats, 0, sizeof(s->data_ctx.stats));
+ s->data_source = DATA_SRC_STATS;
+ s->ana_state = STATS_ST_REQ;
+ }
+ else if (s->ana_state == STATS_ST_REQ) {
+ reql = buffer_si_peekline(si->ob, trash, sizeof(trash));
+ if (reql <= 0) { /* closed or EOL not found */
+ if (reql == 0)
+ break;
+ s->ana_state = STATS_ST_CLOSE;
+ continue;
+ }
- line = s->req->data;
- p = memchr(line, '\n', s->req->l);
+ /* seek for a possible semi-colon. If we find one, we
+ * replace it with an LF and skip only this part.
+ */
+ for (len = 0; len < reql; len++)
+ if (trash[len] == ';') {
+ trash[len] = '\n';
+ reql = len + 1;
+ break;
+ }
- if (p) {
- *p = '\0';
- if (!stats_sock_parse_request(s, line)) {
- /* invalid request */
- stream_int_retnclose(s->req->prod, &stats_sock_usage);
- s->ana_state = 0;
- req->analysers = 0;
- return 0;
+ /* ensure we have a full line */
+ if (trash[reql-1] != '\n') {
+ s->ana_state = STATS_ST_CLOSE;
+ continue;
}
+
+ len = reql - 1;
+ trash[len] = '\0';
+
+ si->st0 = 1; // default to prompt
+ if (len) {
+ if (strcmp(trash, "quit") == 0) {
+ s->ana_state = STATS_ST_CLOSE;
+ continue;
+ }
+ else if (strcmp(trash, "prompt") == 0)
+ si->st1 = !si->st1;
+ else if (strcmp(trash, "help") == 0 ||
+ !stats_sock_parse_request(si, trash))
+ si->st0 = 2; // help
+ }
+ else if (!si->st1) {
+ /* if prompt is disabled, print help on empty lines,
+ * so that the user at least knows how to enable
+ * prompt and find help.
+ */
+ si->st0 = 2;
+ }
+
+ /* re-adjust req buffer */
+ buffer_skip(si->ob, reql);
+
+ s->ana_state = STATS_ST_REP;
+ req->flags |= BF_READ_DONTWAIT; /* we plan to read small requests */
}
+ else if (s->ana_state == STATS_ST_REP) {
+ if (res->flags & (BF_SHUTR_NOW|BF_SHUTR)) {
+ s->ana_state = STATS_ST_CLOSE;
+ continue;
+ }
+
+ switch (si->st0) {
+ case 2: /* help */
+ if (buffer_feed(si->ib, stats_sock_usage.str, stats_sock_usage.len) < 0)
+ /* message sent or too large for buffer (!) */
+ si->st0 = 1; // send prompt
+ break;
+ case 3: /* stats/info dump, should be split later ? */
+ stats_dump_raw_to_buffer(s, res);
+ si->ib->flags |= BF_READ_PARTIAL; /* remove this once we use buffer_feed */
+ if (s->ana_state == STATS_ST_CLOSE)
+ si->st0 = 1; // end of command, send prompt
+ break;
+ case 4: /* sessions dump */
+ stats_dump_sess_to_buffer(s, res);
+ si->ib->flags |= BF_READ_PARTIAL; /* remove this once we use buffer_feed */
+ if (s->ana_state == STATS_ST_CLOSE)
+ si->st0 = 1; // end of command, send prompt
+ break;
+ case 5: /* errors dump */
+ stats_dump_errors_to_buffer(s, res);
+ si->ib->flags |= BF_READ_PARTIAL; /* remove this once we use buffer_feed */
+ if (s->ana_state == STATS_ST_CLOSE)
+ si->st0 = 1; // end of command, send prompt
+ break;
+ default: /* abnormal state or lack of space for prompt */
+ si->st0 = 1; // return to prompt
+ break;
+ }
- /* processing a valid or incomplete request */
- if ((req->flags & BF_FULL) || /* invalid request */
- (req->flags & BF_READ_ERROR) || /* input error */
- (req->flags & BF_READ_TIMEOUT) || /* read timeout */
- tick_is_expired(req->analyse_exp, now_ms) || /* request timeout */
- (req->flags & BF_SHUTR)) { /* input closed */
- buffer_shutw_now(s->rep);
+ if (si->st0 == 1) { /* post-command prompt (LF or LF + '> ') */
+ if (!si->st1) { /* non-interactive mode */
+ if (buffer_feed(si->ib, "\n", 1) < 0)
+ si->st0 = 0; // end of output
+ }
+ else { /* interactive mode */
+ if (buffer_feed(si->ib, "\n> ", 3) < 0)
+ si->st0 = 0; // end of output
+ }
+ }
+
+ /* If the output functions are still there, it means
+ * they require more room.
+ */
+ if (si->st0 > 0) {
+ s->ana_state = STATS_ST_REP; /* some old applets still force CLOSE */
+ si->flags |= SI_FL_WAIT_ROOM;
+ break;
+ }
+
+ /* Now we close the output if one of the writers did so,
+ * or if we're not in interactive mode and the request
+ * buffer is empty. This still allows pipelined requests
+ * to be sent in non-interactive mode.
+ */
+ if ((res->flags & (BF_SHUTW|BF_SHUTW_NOW)) || (!si->st1 && !req->send_max)) {
+ s->ana_state = STATS_ST_CLOSE;
+ continue;
+ }
+
+ /* switch state back to ST_REQ to read next requests */
+ s->ana_state = STATS_ST_REQ;
+ }
+ else if (s->ana_state == STATS_ST_CLOSE) {
+ /* let's close for real now. Note that we may as well
+ * call shutw+shutr, but this is enough since the shut
+ * conditions below will complete.
+ */
+ buffer_shutw(si->ob);
s->ana_state = 0;
- req->analysers = 0;
- return 0;
+ break;
}
- /* don't forward nor abort */
- req->flags |= BF_READ_DONTWAIT; /* we plan to read small requests */
- return 0;
+ }
- case STATS_ST_REP:
- /* do nothing while response is being processed */
- return 0;
+ if ((res->flags & BF_SHUTR) && (si->state == SI_ST_EST) && (s->ana_state != STATS_ST_REQ)) {
+ DPRINTF(stderr, "%s@%d: si to buf closed. req=%08x, res=%08x, st=%d\n",
+ __FUNCTION__, __LINE__, req->flags, res->flags, si->state);
+ /* Other size has closed, let's abort if we have no more processing to do
+ * and nothing more to consume. This is comparable to a broken pipe, so
+ * we forward the close to the request side so that it flows upstream to
+ * the client.
+ */
+ si->shutw(si);
+ }
+
+ if ((req->flags & BF_SHUTW) && (si->state == SI_ST_EST) && (s->ana_state != STATS_ST_REP)) {
+ DPRINTF(stderr, "%s@%d: buf to si closed. req=%08x, res=%08x, st=%d\n",
+ __FUNCTION__, __LINE__, req->flags, res->flags, si->state);
+ /* We have no more processing to do, and nothing more to send, and
+ * the client side has closed. So we'll forward this state downstream
+ * on the response buffer.
+ */
+ si->shutr(si);
+ res->flags |= BF_READ_NULL;
+ }
+
+ /* update all other flags and resync with the other side */
+ si->update(si);
+
+ /* we don't want to expire timeouts while we're processing requests */
+ si->ib->rex = TICK_ETERNITY;
+ si->ob->wex = TICK_ETERNITY;
- case STATS_ST_CLOSE:
- /* end of dump */
- s->req->analysers &= ~an_bit;
+ out:
+ DPRINTF(stderr, "%s@%d: st=%d, rqf=%x, rpf=%x, rql=%d, rqs=%d, rl=%d, rs=%d\n",
+ __FUNCTION__, __LINE__,
+ si->state, req->flags, res->flags, req->l, req->send_max, res->l, res->send_max);
+
+ if (unlikely(si->state == SI_ST_DIS || si->state == SI_ST_CLO)) {
+ /* check that we have released everything then unregister */
+ stream_int_unregister_handler(si);
s->ana_state = 0;
- break;
}
- return 0;
}
/*
diff --git a/src/proto_uxst.c b/src/proto_uxst.c
index 686d854..695f407 100644
--- a/src/proto_uxst.c
+++ b/src/proto_uxst.c
@@ -41,6 +41,7 @@
#include <proto/acl.h>
#include <proto/backend.h>
#include <proto/buffers.h>
+#include <proto/dumpstats.h>
#include <proto/fd.h>
#include <proto/log.h>
#include <proto/protocols.h>
@@ -460,16 +461,12 @@
s->si[1].err_type = SI_ET_NONE;
s->si[1].err_loc = NULL;
s->si[1].owner = t;
- s->si[1].update = stream_sock_data_finish;
- s->si[1].shutr = stream_sock_shutr;
- s->si[1].shutw = stream_sock_shutw;
- s->si[1].chk_rcv = stream_sock_chk_rcv;
- s->si[1].chk_snd = stream_sock_chk_snd;
- s->si[1].connect = NULL;
- s->si[1].iohandler = NULL;
s->si[1].exp = TICK_ETERNITY;
s->si[1].fd = -1; /* just to help with debugging */
s->si[1].flags = SI_FL_NONE;
+ stream_int_register_handler(&s->si[1], stats_io_handler);
+ s->si[1].private = s;
+ s->si[1].st0 = s->si[1].st1 = 0;
s->srv = s->prev_srv = s->srv_conn = NULL;
s->pend_pos = NULL;
diff --git a/src/session.c b/src/session.c
index f9b7157..e359eb1 100644
--- a/src/session.c
+++ b/src/session.c
@@ -847,12 +847,6 @@
break;
}
- if (s->req->analysers & AN_REQ_STATS_SOCK) {
- last_ana |= AN_REQ_STATS_SOCK;
- if (!stats_sock_req_analyser(s, s->req, AN_REQ_STATS_SOCK))
- break;
- }
-
if (s->req->analysers & AN_REQ_PRST_RDP_COOKIE) {
last_ana |= AN_REQ_PRST_RDP_COOKIE;
if (!tcp_persist_rdp_cookie(s, s->req, AN_REQ_PRST_RDP_COOKIE))