MEDIUM: ring: implement a wait mode for watchers

Now it is possible for a reader to subscribe and wait for new events
sent to a ring buffer. When new events are written to a ring buffer,
the applets that are subscribed are woken up to display new events.
For now we only support this with the CLI applet called by "show events"
since the I/O handler is indeed a CLI I/O handler. But it's not
complicated to add other mechanisms to consume events and forward them
to external log servers for example. The wait mode is enabled by adding
"-w" after "show events <sink>". An extra "-n" was added to directly
seek to new events only.
diff --git a/doc/management.txt b/doc/management.txt
index 6709cee..9973747 100644
--- a/doc/management.txt
+++ b/doc/management.txt
@@ -1964,10 +1964,16 @@
     is the slash ('/') in header name "header/bizarre", which is not a valid
     HTTP character for a header name.
 
-show events [<sink>]
+show events [<sink>] [-w] [-n]
   With no option, this lists all known event sinks and their types. With an
   option, it will dump all available events in the designated sink if it is of
-  type buffer.
+  type buffer. If option "-w" is passed after the sink name, then once the end
+  of the buffer is reached, the command will wait for new events and display
+  them. It is possible to stop the operation by entering any input (which will
+  be discarded) or by closing the session. Finally, option "-n" is used to
+  directly seek to the end of the buffer, which is often convenient when
+  combined with "-w" to only report new events. For convenience, "-wn" or "-nw"
+  may be used to enable both options at once.
 
 show fd [<fd>]
   Dump the list of either all open file descriptors or just the one number <fd>
diff --git a/include/types/ring.h b/include/types/ring.h
index 3d69b22..86c507f 100644
--- a/include/types/ring.h
+++ b/include/types/ring.h
@@ -96,6 +96,7 @@
 struct ring {
 	struct buffer buf;   // storage area
 	size_t ofs;          // absolute offset in history of the buffer's head
+	struct list waiters; // list of waiters, for now, CLI "show event"
 	__decl_hathreads(HA_RWLOCK_T lock);
 	int readers_count;
 };
diff --git a/src/ring.c b/src/ring.c
index a38148c..f0886b0 100644
--- a/src/ring.c
+++ b/src/ring.c
@@ -48,6 +48,7 @@
 		goto fail;
 
 	HA_RWLOCK_INIT(&ring->lock);
+	LIST_INIT(&ring->waiters);
 	ring->readers_count = 0;
 	ring->ofs = 0;
 	ring->buf = b_make(area, size, 0, 0);
@@ -113,6 +114,7 @@
 ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
 {
 	struct buffer *buf = &ring->buf;
+	struct appctx *appctx;
 	size_t totlen = 0;
 	size_t lenlen;
 	size_t dellen;
@@ -187,6 +189,11 @@
 
 	*b_tail(buf) = 0; buf->data++;; // new read counter
 	sent = lenlen + totlen + 1;
+
+	/* notify potential readers */
+	list_for_each_entry(appctx, &ring->waiters, ctx.cli.l0)
+		appctx_wakeup(appctx);
+
  done_buf:
 	HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
 	return sent;
@@ -216,9 +223,12 @@
 
 /* This function dumps all events from the ring whose pointer is in <p0> into
  * the appctx's output buffer, and takes from <o0> the seek offset into the
- * buffer's history (0 for oldest known event). It returns 0 if the output
- * buffer is full and it needs to be called again, otherwise non-zero. It is
- * meant to be used with cli_release_show_ring() to clean up.
+ * buffer's history (0 for oldest known event). It looks at <i0> for boolean
+ * options: bit0 means it must wait for new data or any key to be pressed. Bit1
+ * means it must seek directly to the end to wait for new contents. It returns
+ * 0 if the output buffer or events are missing is full and it needs to be
+ * called again, otherwise non-zero. It is meant to be used with
+ * cli_release_show_ring() to clean up.
  */
 int cli_io_handler_show_ring(struct appctx *appctx)
 {
@@ -235,6 +245,8 @@
 
 	HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
 
+	LIST_DEL_INIT(&appctx->ctx.cli.l0);
+
 	/* explanation for the initialization below: it would be better to do
 	 * this in the parsing function but this would occasionally result in
 	 * dropped events because we'd take a reference on the oldest message
@@ -244,8 +256,14 @@
 	 * value cannot be produced after initialization.
 	 */
 	if (unlikely(ofs == ~0)) {
-		HA_ATOMIC_ADD(b_head(buf), 1);
-		ofs = ring->ofs;
+		ofs = 0;
+
+		/* going to the end means looking at tail-1 */
+		if (appctx->ctx.cli.i0 & 2)
+			ofs += b_data(buf) - 1;
+
+		HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
+		ofs += ring->ofs;
 	}
 
 	/* we were already there, adjust the offset to be relative to
@@ -291,6 +309,20 @@
 	ofs += ring->ofs;
 	appctx->ctx.cli.o0 = ofs;
 	HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
+
+	if (ret && (appctx->ctx.cli.i0 & 1)) {
+		/* we've drained everything and are configured to wait for more
+		 * data or an event (keypress, close)
+		 */
+		if (!si_oc(si)->output && !(si_oc(si)->flags & CF_SHUTW)) {
+			/* let's be woken up once new data arrive */
+			LIST_ADDQ(&ring->waiters, &appctx->ctx.cli.l0);
+			si_rx_endp_done(si);
+			ret = 0;
+		}
+		/* always drain all the request */
+		co_skip(si_oc(si), si_oc(si)->output);
+	}
 	return ret;
 }
 
@@ -308,6 +340,7 @@
 		/* reader was still attached */
 		ofs -= ring->ofs;
 		BUG_ON(ofs >= b_size(&ring->buf));
+		LIST_DEL_INIT(&appctx->ctx.cli.l0);
 		HA_ATOMIC_SUB(b_peek(&ring->buf, ofs), 1);
 	}
 	HA_ATOMIC_SUB(&ring->readers_count, 1);
diff --git a/src/sink.c b/src/sink.c
index 8d2ce91..ed49062 100644
--- a/src/sink.c
+++ b/src/sink.c
@@ -203,12 +203,13 @@
 static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
 {
 	struct sink *sink;
+	int arg;
 
 	args++; // make args[1] the 1st arg
 
 	if (!*args[1]) {
 		/* no arg => report the list of supported sink */
-		chunk_printf(&trash, "Supported events sinks:\n");
+		chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
 		list_for_each_entry(sink, &sink_list, sink_list) {
 			chunk_appendf(&trash, "    %-10s : type=%s, %u dropped, %s\n",
 				      sink->name,
@@ -232,6 +233,16 @@
 	if (sink->type != SINK_TYPE_BUFFER)
 		return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
 
+	for (arg = 2; *args[arg]; arg++) {
+		if (strcmp(args[arg], "-w") == 0)
+			appctx->ctx.cli.i0 |= 1; // wait mode
+		else if (strcmp(args[arg], "-n") == 0)
+			appctx->ctx.cli.i0 |= 2; // seek to new
+		else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
+			appctx->ctx.cli.i0 |= 3; // seek to new + wait
+		else
+			return cli_err(appctx, "unknown option");
+	}
 	return ring_attach_cli(sink->ctx.ring, appctx);
 }