BUG/MEDIUM: sink/forwarder: ensure ring offset is properly readjusted to head

Since d9c7188 ("MEDIUM: ring: make the offset relative to the head/tail instead
of absolute"), ring offset calculation has changed: we don't rely on ring->ofs
absolute offset anymore.

But with the above patch, relative offset is not properly calculated in
sink_forward_oc_io_handler() and sink_forward_io_handler().

The issue here is the same as 737d10f ("BUG/MEDIUM: dns: ensure ring offset is
properly reajusted to head") since dns and sink_forward share the same
ring logic:

When the ring is becoming full, ring_write() will try to regain some space to
insert new data by calling b_del() on older messages. Here b_del() moves
buffer's head under the hood, and since ring->ofs cannot be used to "correct"
the relative offset, both sink_forward_oc_io_handler() and
sink_forward_io_handler() start to get invalid offset.
At this point, we will suffer from ring data corruption resulting in unexpected
behavior or process crashes.

This can be easily demonstrated with the following test:

    |log-forward syslog
    |  dgram-bind 127.0.0.1:5114
    |  log ring@logbuffer local0
    |
    |ring logbuffer
    |  format rfc5424
    |  size 16384
    |  server logserver 127.0.0.1:5114

Haproxy will forward incoming logs on udp@127.0.0.1:5114 to
tcp@127.0.0.1:5114

Then use the following tcp server:
  nc -l -p 5114

With the following udp log sender:
    |while [ 1 ]
    |do
    |  logger --udp  --server 127.0.0.1 -P 5114 -p user.warn "Test 7"
    |done

Once the ring buffer is full (it takes less that a second to fill the 16k
buffer) haproxy starts to misbehave and the log forwarding stops.

We apply the same fix as in 737d10f ("BUG/MEDIUM: dns: ensure ring offset is
properly reajusted to head").
Please note the ~0 case that is handled slightly differently in this patch:
this is required to properly start reading from a non-empty ring. This case
will be fixed in dns related code in the following patch.

This does not need to be backported as d9c7188 was not marked for backports.
diff --git a/src/sink.c b/src/sink.c
index c656b8d..042e244 100644
--- a/src/sink.c
+++ b/src/sink.c
@@ -344,7 +344,6 @@
 		HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
 		goto close;
 	}
-	ofs = sft->ofs;
 
 	HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
 	LIST_DEL_INIT(&appctx->wait_entry);
@@ -360,9 +359,9 @@
 	 * existing messages before grabbing a reference to a location. This
 	 * value cannot be produced after initialization.
 	 */
-	if (unlikely(ofs == ~0)) {
-		ofs = 0;
-		HA_ATOMIC_INC(b_peek(buf, ofs));
+	if (unlikely(sft->ofs == ~0)) {
+		sft->ofs = b_peek_ofs(buf, 0);
+		HA_ATOMIC_INC(b_orig(buf) + sft->ofs);
 	}
 
 	/* in this loop, ofs always points to the counter byte that precedes
@@ -373,6 +372,9 @@
 		/* we were already there, adjust the offset to be relative to
 		 * the buffer's head and remove us from the counter.
 		 */
+		ofs = sft->ofs - b_head_ofs(buf);
+		if (sft->ofs < b_head_ofs(buf))
+			ofs += b_size(buf);
 		BUG_ON(ofs >= buf->size);
 		HA_ATOMIC_DEC(b_peek(buf, ofs));
 
@@ -405,7 +407,7 @@
 
 		HA_ATOMIC_INC(b_peek(buf, ofs));
 		last_ofs = b_tail_ofs(buf);
-		sft->ofs = ofs;
+		sft->ofs = b_peek_ofs(buf, ofs);
 	}
 	HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
 
@@ -480,7 +482,6 @@
 		HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
 		goto close;
 	}
-	ofs = sft->ofs;
 
 	HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
 	LIST_DEL_INIT(&appctx->wait_entry);
@@ -496,9 +497,9 @@
 	 * existing messages before grabbing a reference to a location. This
 	 * value cannot be produced after initialization.
 	 */
-	if (unlikely(ofs == ~0)) {
-		ofs = 0;
-		HA_ATOMIC_INC(b_peek(buf, ofs));
+	if (unlikely(sft->ofs == ~0)) {
+		sft->ofs = b_peek_ofs(buf, 0);
+		HA_ATOMIC_INC(b_orig(buf) + sft->ofs);
 	}
 
 	/* in this loop, ofs always points to the counter byte that precedes
@@ -509,6 +510,9 @@
 		/* we were already there, adjust the offset to be relative to
 		 * the buffer's head and remove us from the counter.
 		 */
+		ofs = sft->ofs - b_head_ofs(buf);
+		if (sft->ofs < b_head_ofs(buf))
+			ofs += b_size(buf);
 		BUG_ON(ofs >= buf->size);
 		HA_ATOMIC_DEC(b_peek(buf, ofs));
 
@@ -544,7 +548,7 @@
 		}
 
 		HA_ATOMIC_INC(b_peek(buf, ofs));
-		sft->ofs = ofs;
+		sft->ofs = b_peek_ofs(buf, ofs);
 	}
 	HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);