MEDIUM: ring: implement a wait mode for watchers
Now it is possible for a reader to subscribe and wait for new events
sent to a ring buffer. When new events are written to a ring buffer,
the applets that are subscribed are woken up to display new events.
For now we only support this with the CLI applet called by "show events"
since the I/O handler is indeed a CLI I/O handler. But it's not
complicated to add other mechanisms to consume events and forward them
to external log servers for example. The wait mode is enabled by adding
"-w" after "show events <sink>". An extra "-n" was added to directly
seek to new events only.
diff --git a/doc/management.txt b/doc/management.txt
index 6709cee..9973747 100644
--- a/doc/management.txt
+++ b/doc/management.txt
@@ -1964,10 +1964,16 @@
is the slash ('/') in header name "header/bizarre", which is not a valid
HTTP character for a header name.
-show events [<sink>]
+show events [<sink>] [-w] [-n]
With no option, this lists all known event sinks and their types. With an
option, it will dump all available events in the designated sink if it is of
- type buffer.
+ type buffer. If option "-w" is passed after the sink name, then once the end
+ of the buffer is reached, the command will wait for new events and display
+ them. It is possible to stop the operation by entering any input (which will
+ be discarded) or by closing the session. Finally, option "-n" is used to
+ directly seek to the end of the buffer, which is often convenient when
+ combined with "-w" to only report new events. For convenience, "-wn" or "-nw"
+ may be used to enable both options at once.
show fd [<fd>]
Dump the list of either all open file descriptors or just the one number <fd>
diff --git a/include/types/ring.h b/include/types/ring.h
index 3d69b22..86c507f 100644
--- a/include/types/ring.h
+++ b/include/types/ring.h
@@ -96,6 +96,7 @@
struct ring {
struct buffer buf; // storage area
size_t ofs; // absolute offset in history of the buffer's head
+ struct list waiters; // list of waiters, for now, CLI "show event"
__decl_hathreads(HA_RWLOCK_T lock);
int readers_count;
};
diff --git a/src/ring.c b/src/ring.c
index a38148c..f0886b0 100644
--- a/src/ring.c
+++ b/src/ring.c
@@ -48,6 +48,7 @@
goto fail;
HA_RWLOCK_INIT(&ring->lock);
+ LIST_INIT(&ring->waiters);
ring->readers_count = 0;
ring->ofs = 0;
ring->buf = b_make(area, size, 0, 0);
@@ -113,6 +114,7 @@
ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
{
struct buffer *buf = &ring->buf;
+ struct appctx *appctx;
size_t totlen = 0;
size_t lenlen;
size_t dellen;
@@ -187,6 +189,11 @@
*b_tail(buf) = 0; buf->data++;; // new read counter
sent = lenlen + totlen + 1;
+
+ /* notify potential readers */
+ list_for_each_entry(appctx, &ring->waiters, ctx.cli.l0)
+ appctx_wakeup(appctx);
+
done_buf:
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
return sent;
@@ -216,9 +223,12 @@
/* This function dumps all events from the ring whose pointer is in <p0> into
* the appctx's output buffer, and takes from <o0> the seek offset into the
- * buffer's history (0 for oldest known event). It returns 0 if the output
- * buffer is full and it needs to be called again, otherwise non-zero. It is
- * meant to be used with cli_release_show_ring() to clean up.
+ * buffer's history (0 for oldest known event). It looks at <i0> for boolean
+ * options: bit0 means it must wait for new data or any key to be pressed. Bit1
+ * means it must seek directly to the end to wait for new contents. It returns
+ * 0 if the output buffer or events are missing is full and it needs to be
+ * called again, otherwise non-zero. It is meant to be used with
+ * cli_release_show_ring() to clean up.
*/
int cli_io_handler_show_ring(struct appctx *appctx)
{
@@ -235,6 +245,8 @@
HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
+ LIST_DEL_INIT(&appctx->ctx.cli.l0);
+
/* explanation for the initialization below: it would be better to do
* this in the parsing function but this would occasionally result in
* dropped events because we'd take a reference on the oldest message
@@ -244,8 +256,14 @@
* value cannot be produced after initialization.
*/
if (unlikely(ofs == ~0)) {
- HA_ATOMIC_ADD(b_head(buf), 1);
- ofs = ring->ofs;
+ ofs = 0;
+
+ /* going to the end means looking at tail-1 */
+ if (appctx->ctx.cli.i0 & 2)
+ ofs += b_data(buf) - 1;
+
+ HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
+ ofs += ring->ofs;
}
/* we were already there, adjust the offset to be relative to
@@ -291,6 +309,20 @@
ofs += ring->ofs;
appctx->ctx.cli.o0 = ofs;
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
+
+ if (ret && (appctx->ctx.cli.i0 & 1)) {
+ /* we've drained everything and are configured to wait for more
+ * data or an event (keypress, close)
+ */
+ if (!si_oc(si)->output && !(si_oc(si)->flags & CF_SHUTW)) {
+ /* let's be woken up once new data arrive */
+ LIST_ADDQ(&ring->waiters, &appctx->ctx.cli.l0);
+ si_rx_endp_done(si);
+ ret = 0;
+ }
+ /* always drain all the request */
+ co_skip(si_oc(si), si_oc(si)->output);
+ }
return ret;
}
@@ -308,6 +340,7 @@
/* reader was still attached */
ofs -= ring->ofs;
BUG_ON(ofs >= b_size(&ring->buf));
+ LIST_DEL_INIT(&appctx->ctx.cli.l0);
HA_ATOMIC_SUB(b_peek(&ring->buf, ofs), 1);
}
HA_ATOMIC_SUB(&ring->readers_count, 1);
diff --git a/src/sink.c b/src/sink.c
index 8d2ce91..ed49062 100644
--- a/src/sink.c
+++ b/src/sink.c
@@ -203,12 +203,13 @@
static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
{
struct sink *sink;
+ int arg;
args++; // make args[1] the 1st arg
if (!*args[1]) {
/* no arg => report the list of supported sink */
- chunk_printf(&trash, "Supported events sinks:\n");
+ chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
list_for_each_entry(sink, &sink_list, sink_list) {
chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n",
sink->name,
@@ -232,6 +233,16 @@
if (sink->type != SINK_TYPE_BUFFER)
return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
+ for (arg = 2; *args[arg]; arg++) {
+ if (strcmp(args[arg], "-w") == 0)
+ appctx->ctx.cli.i0 |= 1; // wait mode
+ else if (strcmp(args[arg], "-n") == 0)
+ appctx->ctx.cli.i0 |= 2; // seek to new
+ else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
+ appctx->ctx.cli.i0 |= 3; // seek to new + wait
+ else
+ return cli_err(appctx, "unknown option");
+ }
return ring_attach_cli(sink->ctx.ring, appctx);
}