MEDIUM: ring: add server statement to forward messages from a ring

This patch adds a new "server" statement to the ring section, along with
the related "timeout connect" and "timeout server" statements.

server <name> <address> [param*]
  Used to configure a syslog TCP server to which the messages from the ring
  buffer are forwarded. This supports all "server" parameters found in
  section 5.2. Some of these parameters are irrelevant for "ring" sections.

timeout connect <timeout>
  Set the maximum time to wait for a connection attempt to a server to succeed.

  Arguments :
    <timeout> is the timeout value specified in milliseconds by default, but
              can be in any other unit if the number is suffixed by the unit,
              as explained at the top of this document.

timeout server <timeout>
  Set the maximum time pending data may stay in the output buffer.

  Arguments :
    <timeout> is the timeout value specified in milliseconds by default, but
              can be in any other unit if the number is suffixed by the unit,
              as explained at the top of this document.

  Example:
    global
        log ring@myring local7

    ring myring
        description "My local buffer"
        format rfc3164
        maxlen 1200
        size 32764
        timeout connect 5s
        timeout server 10s
        server mysyslogsrv 127.0.0.1:6514
diff --git a/src/sink.c b/src/sink.c
index 50f0352..e7a0c02 100644
--- a/src/sink.c
+++ b/src/sink.c
@@ -27,6 +27,7 @@
 #include <proto/cli.h>
 #include <proto/log.h>
 #include <proto/ring.h>
+#include <proto/signal.h>
 #include <proto/sink.h>
 #include <proto/stream_interface.h>
 
@@ -57,7 +58,7 @@
 	if (sink)
 		goto end;
 
-	sink = malloc(sizeof(*sink));
+	sink = calloc(1, sizeof(*sink));
 	if (!sink)
 		goto end;
 
@@ -316,6 +317,319 @@
 	return ring_attach_cli(sink->ctx.ring, appctx);
 }
 
+/* Presets the fields of ring proxy <px> so that it can emit connections */
+void sink_setup_proxy(struct proxy *px)
+{
+	px->cap = PR_CAP_FE | PR_CAP_BE;
+	px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
+	px->accept = NULL;
+	px->bind_proc = 0; /* will be filled by users */
+	px->maxconn = 0;
+	px->conn_retries = 1;
+	px->timeout.connect = TICK_ETERNITY;
+	px->timeout.server = TICK_ETERNITY;
+	px->timeout.client = TICK_ETERNITY;
+	px->last_change = now.tv_sec;
+}
+
+/*
+ * IO handler pushing the ring's pending messages, LF-terminated, to a syslog tcp server
+ */
+static void sink_forward_io_handler(struct appctx *appctx)
+{
+	struct stream_interface *si = appctx->owner;
+	struct stream *s = si_strm(si);
+	struct sink *sink = strm_fe(s)->parent;
+	struct sink_forward_target *sft = appctx->ctx.sft.ptr;
+	struct ring *ring = sink->ctx.ring;
+	struct buffer *buf = &ring->buf;
+	uint64_t msg_len;
+	size_t len, cnt, ofs;
+	int ret = 0;
+
+	/* if stopping was requested, close immediately */
+	if (unlikely(stopping))
+		goto close;
+
+	/* force rex to eternity because it seems to be reset to a timeout
+	 * and we don't want to expire in this case
+	 * with a syslog server
+	 */
+	si_oc(si)->rex = TICK_ETERNITY;
+	/* rto should not change, but it seems that it does */
+	si_oc(si)->rto = TICK_ETERNITY;
+
+	/* an error was detected */
+	if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
+		goto close;
+
+	/* connection closed by server side */
+	if ((si_oc(si)->flags & CF_SHUTW))
+		goto close;
+
+	/* if the connection is not established, inform the stream that we want
+	 * to be notified whenever the connection completes.
+	 */
+	if (si_opposite(si)->state < SI_ST_EST) {
+		si_cant_get(si);
+		si_rx_conn_blk(si);
+		si_rx_endp_more(si);
+		return;
+	}
+
+	HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
+	if (appctx != sft->appctx) { /* the target no longer refers to this applet */
+		HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+		goto close;
+	}
+	ofs = sft->ofs;
+
+	HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
+	LIST_DEL_INIT(&appctx->wait_entry);
+	HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
+
+	HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
+
+	/* explanation for the initialization below: it would be better to do
+	 * this in the parsing function but this would occasionally result in
+	 * dropped events because we'd take a reference on the oldest message
+	 * and keep it while being scheduled. Thus instead let's take it the
+	 * first time we enter here so that we have a chance to pass many
+	 * existing messages before grabbing a reference to a location. This
+	 * value cannot be produced after initialization.
+	 */
+	if (unlikely(ofs == ~0)) {
+		ofs = 0;
+
+		HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
+		ofs += ring->ofs;
+	}
+
+	/* we were already there, adjust the offset to be relative to
+	 * the buffer's head and remove us from the counter.
+	 */
+	ofs -= ring->ofs;
+	BUG_ON(ofs >= buf->size);
+	HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
+
+	/* in this loop, ofs always points to the counter byte that precedes
+	 * the message so that we can take our reference there if we have to
+	 * stop before the end (ret=0).
+	 */
+	if (si_opposite(si)->state == SI_ST_EST) {
+		ret = 1;
+		while (ofs + 1 < b_data(buf)) {
+			cnt = 1;
+			len = b_peek_varint(buf, ofs + cnt, &msg_len);
+			if (!len)
+				break;
+			cnt += len;
+			BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
+
+			if (unlikely(msg_len + 1 > b_size(&trash))) {
+				/* too large a message to ever fit, let's skip it */
+				ofs += cnt + msg_len;
+				continue;
+			}
+
+			chunk_reset(&trash);
+			len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
+			trash.data += len;
+			trash.area[trash.data++] = '\n';
+
+			if (ci_putchk(si_ic(si), &trash) == -1) {
+				si_rx_room_blk(si);
+				ret = 0;
+				break;
+			}
+			ofs += cnt + msg_len;
+		}
+
+		HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
+		ofs += ring->ofs;
+		sft->ofs = ofs;
+	}
+	HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
+
+	if (ret) {
+		/* let's be woken up once new data arrive */
+		HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
+		LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
+		HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
+		si_rx_endp_done(si);
+	}
+	HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+
+	/* always drain data from server */
+	co_skip(si_oc(si), si_oc(si)->output);
+	return;
+
+close:
+	si_shutw(si);
+	si_shutr(si);
+	si_ic(si)->flags |= CF_READ_NULL;
+}
+
+/* Detaches <sft>'s applet from its ring; sink_forward_session_release() calls it with sft->lock held */
+void __sink_forward_session_deinit(struct sink_forward_target *sft)
+{
+	struct stream_interface *owner;
+	struct stream *strm;
+	struct sink *snk;
+
+	/* nothing to do unless an applet is attached */
+	if (sft->appctx == NULL)
+		return;
+	owner = sft->appctx->owner;
+	if (owner == NULL)
+		return;
+	strm = si_strm(owner);
+	if (strm == NULL)
+		return;
+	snk = strm_fe(strm)->parent;
+	if (snk == NULL)
+		return;
+
+	/* unlink the applet's wait entry under the ring's write lock */
+	HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &snk->ctx.ring->lock);
+	LIST_DEL_INIT(&sft->appctx->wait_entry);
+	HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &snk->ctx.ring->lock);
+
+	/* forget the applet, then notify the sink's forward task */
+	sft->appctx = NULL;
+	task_wakeup(snk->forward_task, TASK_WOKEN_MSG);
+}
+
+/* Applet release callback: detaches <appctx> from its forward target, if any */
+static void sink_forward_session_release(struct appctx *appctx)
+{
+	struct sink_forward_target *sft = appctx->ctx.sft.ptr; /* set by sink_forward_session_create() */
+
+	if (!sft)
+		return;
+	HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
+	if (sft->appctx == appctx) /* skip if the target no longer refers to this applet */
+		__sink_forward_session_deinit(sft);
+	HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
+}
+
+static struct applet sink_forward_applet = { /* applet forwarding ring messages to a server */
+	.obj_type = OBJ_TYPE_APPLET,
+	.name = "<SINKFWD>", /* used for logging */
+	.fct = sink_forward_io_handler, /* pushes the ring's messages to the stream */
+	.release = sink_forward_session_release, /* detaches the applet from its target */
+};
+
+/* Create a new forward session in assigned state (connect will start
+ * automatically). Returns the new appctx, or NULL on failure.
+ */
+static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
+{
+	struct proxy *p = sink->forward_px;
+	struct appctx *appctx;
+	struct session *sess;
+	struct stream *s;
+
+	appctx = appctx_new(&sink_forward_applet, tid_bit);
+	if (!appctx)
+		goto out_close;
+
+	appctx->ctx.sft.ptr = (void *)sft;
+
+	sess = session_new(p, NULL, &appctx->obj_type);
+	if (!sess) {
+		ha_alert("out of memory in sink_forward_session_create().\n");
+		goto out_free_appctx;
+	}
+
+	if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
+		ha_alert("Failed to initialize stream in sink_forward_session_create().\n");
+		goto out_free_sess;
+	}
+
+	/* the stream is pre-assigned to the target's server and address */
+	s->target = &sft->srv->obj_type;
+	if (!sockaddr_alloc(&s->target_addr))
+		goto out_free_strm;
+	*s->target_addr = sft->srv->addr;
+	s->flags = SF_ASSIGNED|SF_ADDR_SET;
+	s->si[1].flags |= SI_FL_NOLINGER;
+
+	s->do_log = NULL;
+	s->uniq_id = 0;
+
+	s->res.flags |= CF_READ_DONTWAIT;
+	/* force rto and rex to eternity so that we do not expire on idle recv:
+	 * we are talking to a syslog server.
+	 */
+	s->res.rto = TICK_ETERNITY;
+	s->res.rex = TICK_ETERNITY;
+	sft->appctx = appctx;
+	task_wakeup(s->task, TASK_WOKEN_INIT);
+	return appctx;
+
+	/* Error unrolling */
+ out_free_strm:
+	LIST_DEL(&s->list);
+	pool_free(pool_head_stream, s);
+ out_free_sess:
+	session_free(sess);
+ out_free_appctx:
+	appctx_free(appctx);
+ out_close:
+	return NULL;
+}
+
+/*
+ * Task handling connections to the forward servers: starts the missing
+ * sessions while running, wakes the existing applets up once stopping.
+ */
+static struct task *process_sink_forward(struct task * task, void *context, unsigned short state)
+{
+	struct sink *sink = context;
+	struct sink_forward_target *target;
+
+	/* the task carries no expiration date */
+	task->expire = TICK_ETERNITY;
+
+	if (stopping) {
+		for (target = sink->sft; target; target = target->next) {
+			HA_SPIN_LOCK(SFT_LOCK, &target->lock);
+			/* awake applet to perform a clean close */
+			if (target->appctx)
+				appctx_wakeup(target->appctx);
+			HA_SPIN_UNLOCK(SFT_LOCK, &target->lock);
+		}
+		return task;
+	}
+
+	for (target = sink->sft; target; target = target->next) {
+		HA_SPIN_LOCK(SFT_LOCK, &target->lock);
+		/* no applet attached to this target: start a new session */
+		if (!target->appctx)
+			target->appctx = sink_forward_session_create(sink, target);
+		HA_SPIN_UNLOCK(SFT_LOCK, &target->lock);
+	}
+
+	return task;
+}
+/*
+ * Creates and starts the task managing connections to the forward servers.
+ * Returns 0 in case of error, 1 otherwise.
+ */
+int sink_init_forward(struct sink *sink)
+{
+	struct task *t = task_new(MAX_THREADS_MASK);
+
+	sink->forward_task = t;
+	if (!t)
+		return 0;
+	t->process = process_sink_forward;
+	t->context = (void *)sink;
+	sink->forward_sighandler = signal_register_task(0, t, 0);
+	task_wakeup(t, TASK_WOKEN_INIT);
+	return 1;
+}
 /*
  * Parse "ring" section and create corresponding sink buffer.
  *
@@ -327,6 +641,7 @@
 	int err_code = 0;
 	const char *inv;
 	size_t size = BUFSIZE;
+	struct proxy *p;
 
 	if (strcmp(args[0], "ring") == 0) { /* new peers section */
 		if (!*args[1]) {
@@ -354,6 +669,22 @@
 			err_code |= ERR_ALERT | ERR_FATAL;
 			goto err;
 		}
+
+		/* allocate new proxy to handle forwards */
+		p = calloc(1, sizeof *p);
+		if (!p) {
+			ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
+			err_code |= ERR_ALERT | ERR_FATAL;
+			goto err;
+		}
+
+		init_new_proxy(p);
+		sink_setup_proxy(p);
+		p->parent = cfg_sink;
+		p->id = strdup(args[1]);
+		p->conf.args.file = p->conf.file = strdup(file);
+		p->conf.args.line = p->conf.line = linenum;
+		cfg_sink->forward_px = p;
 	}
 	else if (strcmp(args[0], "size") == 0) {
 		size = atol(args[1]);
@@ -370,6 +701,52 @@
 			goto err;
 		}
 	}
+	else if (strcmp(args[0],"server") == 0) {
+		if (!cfg_sink || !cfg_sink->forward_px) {
+			ha_alert("parsing [%s:%d] : unable to parse server '%s'.\n", file, linenum, args[1]);
+			err_code |= ERR_ALERT | ERR_FATAL;
+			goto err;
+		}
+		err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL, 1, 0);
+	}
+	else if (strcmp(args[0],"timeout") == 0) {
+		if (!cfg_sink || !cfg_sink->forward_px) {
+			ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
+			err_code |= ERR_ALERT | ERR_FATAL;
+			goto err;
+		}
+
+		/* only "connect" and "server" are handled, other keywords are silently ignored */
+		if (strcmp(args[1], "connect") == 0 || strcmp(args[1], "server") == 0) {
+			const char *res;
+			unsigned int tout;
+
+			if (!*args[2]) {
+				ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
+					 file, linenum, args[0], args[1]);
+				err_code |= ERR_ALERT | ERR_FATAL;
+				goto err;
+			}
+			res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
+			if (res) {
+				if (res == PARSE_TIME_OVER)
+					ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
+						 file, linenum, args[2], args[0], args[1]);
+				else if (res == PARSE_TIME_UNDER)
+					ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
+						 file, linenum, args[2], args[0], args[1]);
+				else
+					ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
+						 file, linenum, *res, args[0], args[1]);
+				err_code |= ERR_ALERT | ERR_FATAL;
+				goto err;
+			}
+			/* "connect" and "server" already differ on their first character */
+			if (args[1][0] == 'c')
+				cfg_sink->forward_px->timeout.connect = tout;
+			else
+				cfg_sink->forward_px->timeout.server = tout;
+		}
+	}
 	else if (strcmp(args[0],"format") == 0) {
 		if (!cfg_sink) {
 			ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
@@ -456,6 +833,7 @@
 int cfg_post_parse_ring()
 {
 	int err_code = 0;
+	struct server *srv;
 
 	if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
 		if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
@@ -464,8 +842,44 @@
 			cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
 			err_code |= ERR_ALERT;
 		}
-	}
+
+		/* prepare forward server descriptors */
+		if (cfg_sink->forward_px) {
+			srv = cfg_sink->forward_px->srv;
+			while (srv) {
+				struct sink_forward_target *sft;
+				/* init ssl if needed */
+				if (srv->use_ssl == 1 && xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->prepare_srv) {
+					if (xprt_get(XPRT_SSL)->prepare_srv(srv)) {
+						ha_alert("unable to prepare SSL for server '%s' in ring '%s'.\n", srv->id, cfg_sink->name);
+						err_code |= ERR_ALERT | ERR_FATAL;
+					}
+				}
 
+				/* allocate sink_forward_target descriptor */
+				sft = calloc(1, sizeof(*sft));
+				if (!sft) {
+					ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
+					err_code |= ERR_ALERT | ERR_FATAL;
+					break;
+				}
+				sft->srv = srv;
+				sft->appctx = NULL;
+				sft->ofs = ~0; /* init ring offset */
+				sft->next = cfg_sink->sft;
+				HA_SPIN_INIT(&sft->lock);
+
+				/* mark server attached to the ring */
+				if (!ring_attach(cfg_sink->ctx.ring)) {
+					ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
+					err_code |= ERR_ALERT | ERR_FATAL;
+				}
+				cfg_sink->sft = sft;
+				srv = srv->next;
+			}
+			sink_init_forward(cfg_sink);
+		}
+	}
 	cfg_sink = NULL;
 
 	return err_code;