BUG/MINOR: ring/cli: fix a race condition between the writer and the reader

The ring's CLI reader unlocks the read side of a ring and relocks it for
writing only if it needs to re-subscribe. But during this window, the
writer might have pushed data, seen nobody subscribed and hence woken
nobody, while the reader would have left after marking that the applet
had no more data. This results in a dump that does not make any forward
progress: the ring is clogged by this reader, which believes there's no
data, and the writer will never wake it up.

The right approach consists in checking, after re-attaching, whether the
writer made any progress in between, and if so in reporting that another
call is needed. Note that a jump back to the beginning would also work,
but this way provides better fairness between readers.
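
The check described above can be illustrated with a minimal single-threaded
sketch. It is not HAProxy code: struct toy_ring, toy_write() and
toy_resubscribe() are hypothetical names invented for this illustration, the
locks are omitted, and the "ofs" field is only assumed to change whenever the
writer makes progress.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for a ring (not HAProxy's struct ring). */
struct toy_ring {
	size_t ofs;     /* assumed to change each time the writer progresses */
	int waiters;    /* number of subscribed readers */
	int wakeups;    /* number of times the writer found someone to wake */
};

/* Writer side: record progress, and count a wakeup only if at least one
 * reader is subscribed at that moment.
 */
static void toy_write(struct toy_ring *r, size_t len)
{
	assert(r != NULL);
	r->ofs += len;
	if (r->waiters > 0)
		r->wakeups++;
}

/* Reader side, standing in for what would run under the write lock:
 * subscribe, then sample the offset again. Returns true when the offset
 * differs from the value sampled before the unlock, meaning the writer may
 * have run without seeing us and another read must be scheduled.
 */
static bool toy_resubscribe(struct toy_ring *r, size_t last_ofs)
{
	size_t ofs;

	assert(r != NULL);
	r->waiters++;
	ofs = r->ofs;
	return ofs != last_ofs;
}
```

If toy_write() runs between the moment last_ofs is sampled and the call to
toy_resubscribe(), no wakeup is counted, yet toy_resubscribe() returns true,
which is the case where the reader must ask to be called again rather than
report that it is done.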

This needs to be backported to 2.2. The applet API needed to signal the
availability of new data has changed a few times since then.

(cherry picked from commit b8e0fb97f339583148446711ba69880b32105c7f)
Signed-off-by: Willy Tarreau <w@1wt.eu>
(cherry picked from commit 00f5c61a988de84a42810e2c164135abbcef0951)
[wt: s/ctx->ofs/ctx.cli.o0/; s/applet_have_no_more_data/si_rx_endp_done/;
     s/applet_have_more_data/si_rx_endp_more/]
Signed-off-by: Willy Tarreau <w@1wt.eu>
(cherry picked from commit 764717ad1b0831c2adb329e99ae3595d3ab1acca)
Signed-off-by: Willy Tarreau <w@1wt.eu>
diff --git a/src/ring.c b/src/ring.c
index 4f1bca2..ba1da1b 100644
--- a/src/ring.c
+++ b/src/ring.c
@@ -280,6 +280,7 @@
 	struct ring *ring = appctx->ctx.cli.p0;
 	struct buffer *buf = &ring->buf;
 	size_t ofs = appctx->ctx.cli.o0;
+	size_t last_ofs;
 	uint64_t msg_len;
 	size_t len, cnt;
 	int ret;
@@ -353,6 +354,7 @@
 
 	HA_ATOMIC_INC(b_peek(buf, ofs));
 	ofs += ring->ofs;
+	last_ofs = ring->ofs;
 	appctx->ctx.cli.o0 = ofs;
 	HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
 
@@ -364,8 +366,16 @@
 			/* let's be woken up once new data arrive */
 			HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
 			LIST_APPEND(&ring->waiters, &appctx->wait_entry);
+			ofs = ring->ofs;
 			HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
-			si_rx_endp_done(si);
+			if (ofs != last_ofs) {
+				/* more data was added into the ring between the
+				 * unlock and the lock, and the writer might not
+				 * have seen us. We need to reschedule a read.
+				 */
+				si_rx_endp_more(si);
+			} else
+				si_rx_endp_done(si);
 			ret = 0;
 		}
 		/* always drain all the request */