BUG/MINOR: sink: fix a race condition between the writer and the reader

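This is the same issue as the one just fixed in b8e0fb97f ("BUG/MINOR:
ring/cli: fix a race condition between the writer and the reader"), but
this time for sinks. They also read from the ring and are exposed to the
same race under high write loads: data may be added to the ring between
the reader's unlock and its registration as a waiter, in which case the
writer might not have seen the reader and will not wake it up.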
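
The window can be illustrated with a minimal single-threaded sketch. It
is not the sink code: toy_ring, toy_write() and toy_read_pass() are
hypothetical names, and the locks are reduced to comments. It only shows
why the ring offset is sampled before registering as a waiter and
compared afterwards.

```c
#include <stddef.h>
#include <stdbool.h>

/* Hypothetical toy ring: only the fields needed to show the race. */
struct toy_ring {
	size_t ofs;        /* advances each time the writer adds data */
	bool has_waiter;   /* set once the reader is registered */
	int wakeups;       /* wakeups delivered to a registered reader */
};

/* Writer: adds data and wakes the reader only if it is registered. */
static void toy_write(struct toy_ring *r)
{
	r->ofs++;
	if (r->has_waiter)
		r->wakeups++;
}

/* Reader pass. <racing_writes> writes are injected in the window between
 * the end of the read and the registration as a waiter. Returns true if
 * the reader must reschedule itself instead of sleeping.
 */
static bool toy_read_pass(struct toy_ring *r, int racing_writes)
{
	size_t last_ofs = r->ofs;    /* sampled under the read lock */

	while (racing_writes-- > 0)
		toy_write(r);        /* the writer does not see us yet */

	r->has_waiter = true;        /* done under the write lock */
	return r->ofs != last_ofs;   /* data slipped in: read again */
}
```

Without the final comparison, a reader hit by racing writes would sleep
with unread data in the ring and no wakeup pending.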

This must be backported to 2.2 as well. See comments in the aforementioned
commit for backport hints if needed.

(cherry picked from commit 53bfab080c124181e555f1c8b1f150d673847738)
Signed-off-by: Willy Tarreau <w@1wt.eu>
(cherry picked from commit aeb71723a16e0ea492a565ac923c6d058e45bfe1)
[wt: s/applet_have_no_more_data/si_rx_endp_done/;
     s/applet_have_more_data/si_rx_endp_more/]
Signed-off-by: Willy Tarreau <w@1wt.eu>
(cherry picked from commit 49f25f5413d502357d6ef1da17e13da2fe0be2d6)
Signed-off-by: Willy Tarreau <w@1wt.eu>
diff --git a/src/sink.c b/src/sink.c
index 0f421cc..4213e69 100644
--- a/src/sink.c
+++ b/src/sink.c
@@ -302,7 +302,7 @@
 	struct ring *ring = sink->ctx.ring;
 	struct buffer *buf = &ring->buf;
 	uint64_t msg_len;
-	size_t len, cnt, ofs;
+	size_t len, cnt, ofs, last_ofs;
 	int ret = 0;
 
 	/* if stopping was requested, close immediately */
@@ -406,6 +406,7 @@
 		HA_ATOMIC_INC(b_peek(buf, ofs));
 		ofs += ring->ofs;
 		sft->ofs = ofs;
+		last_ofs = ring->ofs;
 	}
 	HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
 
@@ -413,8 +414,16 @@
 		/* let's be woken up once new data arrive */
 		HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
 		LIST_APPEND(&ring->waiters, &appctx->wait_entry);
+		ofs = ring->ofs;
 		HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
-		si_rx_endp_done(si);
+		if (ofs != last_ofs) {
+			/* more data was added into the ring between the
+			 * unlock and the lock, and the writer might not
+			 * have seen us. We need to reschedule a read.
+			 */
+			si_rx_endp_more(si);
+		} else
+			si_rx_endp_done(si);
 	}
 	HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);