blob: b89ebb925c48c61ca9b4836bdc3c7ed7fa171166 [file] [log] [blame]
Willy Tarreau67b5a162019-08-11 16:38:56 +02001/*
2 * Event sink management
3 *
4 * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation, version 2.1
9 * exclusively.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
Willy Tarreau36979d92020-06-05 17:27:29 +020021#include <import/ist.h>
Willy Tarreau4c7e4b72020-05-27 12:58:42 +020022#include <haproxy/api.h>
Christopher Faulet6b0a0fb2022-04-04 11:29:28 +020023#include <haproxy/applet.h>
Willy Tarreau6be78492020-06-05 00:00:29 +020024#include <haproxy/cfgparse.h>
Willy Tarreau83487a82020-06-04 20:19:54 +020025#include <haproxy/cli.h>
Christopher Faulet908628c2022-03-25 16:43:49 +010026#include <haproxy/conn_stream.h>
27#include <haproxy/cs_utils.h>
Willy Tarreau36979d92020-06-05 17:27:29 +020028#include <haproxy/errors.h>
Willy Tarreau853b2972020-05-27 18:01:47 +020029#include <haproxy/list.h>
Willy Tarreauaeed4a82020-06-04 22:01:04 +020030#include <haproxy/log.h>
Willy Tarreau817538e2021-05-08 20:20:21 +020031#include <haproxy/proxy.h>
Willy Tarreaud2ad57c2020-06-03 19:43:35 +020032#include <haproxy/ring.h>
Willy Tarreau3727a8a2020-06-04 17:37:26 +020033#include <haproxy/signal.h>
Willy Tarreauba2f73d2020-06-03 20:02:28 +020034#include <haproxy/sink.h>
Willy Tarreaub2551052020-06-09 09:07:15 +020035#include <haproxy/time.h>
Willy Tarreau4bad5e22021-05-08 13:05:30 +020036#include <haproxy/tools.h>
Willy Tarreau67b5a162019-08-11 16:38:56 +020037
38struct list sink_list = LIST_HEAD_INIT(sink_list);
39
Emeric Brun99c453d2020-05-25 15:01:04 +020040struct sink *cfg_sink;
41
Willy Tarreau67b5a162019-08-11 16:38:56 +020042struct sink *sink_find(const char *name)
43{
44 struct sink *sink;
45
46 list_for_each_entry(sink, &sink_list, sink_list)
47 if (strcmp(sink->name, name) == 0)
48 return sink;
49 return NULL;
50}
51
52/* creates a new sink and adds it to the list, it's still generic and not fully
53 * initialized. Returns NULL on allocation failure. If another one already
54 * exists with the same name, it will be returned. The caller can detect it as
55 * a newly created one has type SINK_TYPE_NEW.
56 */
Emeric Brun54648852020-07-06 15:54:06 +020057static struct sink *__sink_new(const char *name, const char *desc, int fmt)
Willy Tarreau67b5a162019-08-11 16:38:56 +020058{
59 struct sink *sink;
60
61 sink = sink_find(name);
62 if (sink)
63 goto end;
64
Emeric Brun494c5052020-05-28 11:13:15 +020065 sink = calloc(1, sizeof(*sink));
Willy Tarreau67b5a162019-08-11 16:38:56 +020066 if (!sink)
67 goto end;
68
Emeric Brun99c453d2020-05-25 15:01:04 +020069 sink->name = strdup(name);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010070 if (!sink->name)
71 goto err;
72
Emeric Brun99c453d2020-05-25 15:01:04 +020073 sink->desc = strdup(desc);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010074 if (!sink->desc)
75 goto err;
76
Willy Tarreau67b5a162019-08-11 16:38:56 +020077 sink->fmt = fmt;
78 sink->type = SINK_TYPE_NEW;
Christopher Fauleta63a5c22019-11-15 15:10:12 +010079 sink->maxlen = BUFSIZE;
Willy Tarreau67b5a162019-08-11 16:38:56 +020080 /* address will be filled by the caller if needed */
Willy Tarreau973e6622019-08-20 11:57:52 +020081 sink->ctx.fd = -1;
Willy Tarreau67b5a162019-08-11 16:38:56 +020082 sink->ctx.dropped = 0;
83 HA_RWLOCK_INIT(&sink->ctx.lock);
Willy Tarreau2b718102021-04-21 07:32:39 +020084 LIST_APPEND(&sink_list, &sink->sink_list);
Willy Tarreau67b5a162019-08-11 16:38:56 +020085 end:
86 return sink;
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010087
88 err:
Willy Tarreau61cfdf42021-02-20 10:46:51 +010089 ha_free(&sink->name);
90 ha_free(&sink->desc);
91 ha_free(&sink);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010092
93 return NULL;
Willy Tarreau67b5a162019-08-11 16:38:56 +020094}
95
Willy Tarreau973e6622019-08-20 11:57:52 +020096/* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>,
97 * and description <desc>. Returns NULL on allocation failure or conflict.
98 * Perfect duplicates are merged (same type, fd, and name).
99 */
Emeric Brun54648852020-07-06 15:54:06 +0200100struct sink *sink_new_fd(const char *name, const char *desc, enum log_fmt fmt, int fd)
Willy Tarreau973e6622019-08-20 11:57:52 +0200101{
102 struct sink *sink;
103
104 sink = __sink_new(name, desc, fmt);
105 if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd))
106 goto end;
107
108 if (sink->type != SINK_TYPE_NEW) {
109 sink = NULL;
110 goto end;
111 }
112
113 sink->type = SINK_TYPE_FD;
114 sink->ctx.fd = fd;
115 end:
116 return sink;
117}
118
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200119/* creates a sink called <name> of type BUF of size <size>, format <fmt>,
120 * and description <desc>. Returns NULL on allocation failure or conflict.
121 * Perfect duplicates are merged (same type and name). If sizes differ, the
122 * largest one is kept.
123 */
Emeric Brun54648852020-07-06 15:54:06 +0200124struct sink *sink_new_buf(const char *name, const char *desc, enum log_fmt fmt, size_t size)
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200125{
126 struct sink *sink;
127
128 sink = __sink_new(name, desc, fmt);
129 if (!sink)
130 goto fail;
131
132 if (sink->type == SINK_TYPE_BUFFER) {
133 /* such a buffer already exists, we may have to resize it */
134 if (!ring_resize(sink->ctx.ring, size))
135 goto fail;
136 goto end;
137 }
138
139 if (sink->type != SINK_TYPE_NEW) {
140 /* already exists of another type */
141 goto fail;
142 }
143
144 sink->ctx.ring = ring_new(size);
145 if (!sink->ctx.ring) {
Willy Tarreau2b718102021-04-21 07:32:39 +0200146 LIST_DELETE(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +0200147 free(sink->name);
148 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200149 free(sink);
150 goto fail;
151 }
152
153 sink->type = SINK_TYPE_BUFFER;
154 end:
155 return sink;
156 fail:
157 return NULL;
158}
159
Willy Tarreau67b5a162019-08-11 16:38:56 +0200160/* tries to send <nmsg> message parts (up to 8, ignored above) from message
Ilya Shipitsin46a030c2020-07-05 16:36:08 +0500161 * array <msg> to sink <sink>. Formatting according to the sink's preference is
Willy Tarreau8f240232019-08-27 16:41:06 +0200162 * done here. Lost messages are NOT accounted for. It is preferable to call
163 * sink_write() instead which will also try to emit the number of dropped
164 * messages when there are any. It returns >0 if it could write anything,
165 * <=0 otherwise.
Willy Tarreau67b5a162019-08-11 16:38:56 +0200166 */
Emeric Brun54648852020-07-06 15:54:06 +0200167 ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg,
168 int level, int facility, struct ist *metadata)
169 {
170 struct ist *pfx = NULL;
Willy Tarreaua1426de2019-08-27 14:21:02 +0200171 size_t npfx = 0;
Emeric Brunbd163812020-05-06 14:33:46 +0200172
Emeric Brun54648852020-07-06 15:54:06 +0200173 if (sink->fmt == LOG_FORMAT_RAW)
Emeric Brunbd163812020-05-06 14:33:46 +0200174 goto send;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200175
Emeric Brun54648852020-07-06 15:54:06 +0200176 pfx = build_log_header(sink->fmt, level, facility, metadata, &npfx);
Emeric Brunbd163812020-05-06 14:33:46 +0200177
178send:
Willy Tarreau973e6622019-08-20 11:57:52 +0200179 if (sink->type == SINK_TYPE_FD) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200180 return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200181 }
182 else if (sink->type == SINK_TYPE_BUFFER) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200183 return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
Willy Tarreau973e6622019-08-20 11:57:52 +0200184 }
Willy Tarreau8f240232019-08-27 16:41:06 +0200185 return 0;
186}
Willy Tarreau67b5a162019-08-11 16:38:56 +0200187
Willy Tarreau8f240232019-08-27 16:41:06 +0200188/* Tries to emit a message indicating the number of dropped events. In case of
189 * success, the amount of drops is reduced by as much. It's supposed to be
190 * called under an exclusive lock on the sink to avoid multiple produces doing
191 * the same. On success, >0 is returned, otherwise <=0 on failure.
192 */
Emeric Brun54648852020-07-06 15:54:06 +0200193int sink_announce_dropped(struct sink *sink, int facility)
Willy Tarreau8f240232019-08-27 16:41:06 +0200194{
Emeric Brun54648852020-07-06 15:54:06 +0200195 static THREAD_LOCAL struct ist metadata[LOG_META_FIELDS];
196 static THREAD_LOCAL pid_t curr_pid;
197 static THREAD_LOCAL char pidstr[16];
Willy Tarreau8f240232019-08-27 16:41:06 +0200198 unsigned int dropped;
199 struct buffer msg;
200 struct ist msgvec[1];
201 char logbuf[64];
202
203 while (unlikely((dropped = sink->ctx.dropped) > 0)) {
204 chunk_init(&msg, logbuf, sizeof(logbuf));
205 chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
206 msgvec[0] = ist2(msg.area, msg.data);
Emeric Brunbd163812020-05-06 14:33:46 +0200207
Emeric Brun54648852020-07-06 15:54:06 +0200208 if (!metadata[LOG_META_HOST].len) {
209 if (global.log_send_hostname)
Tim Duesterhus77508502022-03-15 13:11:06 +0100210 metadata[LOG_META_HOST] = ist(global.log_send_hostname);
Emeric Brun54648852020-07-06 15:54:06 +0200211 }
212
213 if (!metadata[LOG_META_TAG].len)
214 metadata[LOG_META_TAG] = ist2(global.log_tag.area, global.log_tag.data);
215
216 if (unlikely(curr_pid != getpid()))
217 metadata[LOG_META_PID].len = 0;
218
219 if (!metadata[LOG_META_PID].len) {
220 curr_pid = getpid();
221 ltoa_o(curr_pid, pidstr, sizeof(pidstr));
222 metadata[LOG_META_PID] = ist2(pidstr, strlen(pidstr));
223 }
224
225 if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, metadata) <= 0)
Willy Tarreau8f240232019-08-27 16:41:06 +0200226 return 0;
227 /* success! */
228 HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
229 }
230 return 1;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200231}
232
Willy Tarreau9f830d72019-08-26 18:17:04 +0200233/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
234static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
235{
236 struct sink *sink;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200237 int arg;
Willy Tarreau9f830d72019-08-26 18:17:04 +0200238
239 args++; // make args[1] the 1st arg
240
241 if (!*args[1]) {
242 /* no arg => report the list of supported sink */
Willy Tarreau1d181e42019-08-30 11:17:01 +0200243 chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
Willy Tarreau9f830d72019-08-26 18:17:04 +0200244 list_for_each_entry(sink, &sink_list, sink_list) {
245 chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n",
246 sink->name,
247 sink->type == SINK_TYPE_NEW ? "init" :
248 sink->type == SINK_TYPE_FD ? "fd" :
249 sink->type == SINK_TYPE_BUFFER ? "buffer" : "?",
250 sink->ctx.dropped, sink->desc);
251 }
252
253 trash.area[trash.data] = 0;
254 return cli_msg(appctx, LOG_WARNING, trash.area);
255 }
256
257 if (!cli_has_level(appctx, ACCESS_LVL_OPER))
258 return 1;
259
260 sink = sink_find(args[1]);
261 if (!sink)
262 return cli_err(appctx, "No such event sink");
263
264 if (sink->type != SINK_TYPE_BUFFER)
265 return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
266
Willy Tarreau1d181e42019-08-30 11:17:01 +0200267 for (arg = 2; *args[arg]; arg++) {
268 if (strcmp(args[arg], "-w") == 0)
269 appctx->ctx.cli.i0 |= 1; // wait mode
270 else if (strcmp(args[arg], "-n") == 0)
271 appctx->ctx.cli.i0 |= 2; // seek to new
272 else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
273 appctx->ctx.cli.i0 |= 3; // seek to new + wait
274 else
275 return cli_err(appctx, "unknown option");
276 }
Willy Tarreau9f830d72019-08-26 18:17:04 +0200277 return ring_attach_cli(sink->ctx.ring, appctx);
278}
279
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500280/* Pre-configures a ring proxy to emit connections */
Emeric Brun494c5052020-05-28 11:13:15 +0200281void sink_setup_proxy(struct proxy *px)
282{
283 px->last_change = now.tv_sec;
284 px->cap = PR_CAP_FE | PR_CAP_BE;
285 px->maxconn = 0;
286 px->conn_retries = 1;
287 px->timeout.server = TICK_ETERNITY;
288 px->timeout.client = TICK_ETERNITY;
289 px->timeout.connect = TICK_ETERNITY;
290 px->accept = NULL;
291 px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
Emeric Brun494c5052020-05-28 11:13:15 +0200292}
293
294/*
295 * IO Handler to handle message push to syslog tcp server
296 */
297static void sink_forward_io_handler(struct appctx *appctx)
298{
Christopher Faulet908628c2022-03-25 16:43:49 +0100299 struct conn_stream *cs = appctx->owner;
300 struct stream *s = __cs_strm(cs);
Emeric Brun494c5052020-05-28 11:13:15 +0200301 struct sink *sink = strm_fe(s)->parent;
302 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
303 struct ring *ring = sink->ctx.ring;
304 struct buffer *buf = &ring->buf;
305 uint64_t msg_len;
306 size_t len, cnt, ofs;
307 int ret = 0;
308
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500309 /* if stopping was requested, close immediately */
Emeric Brun494c5052020-05-28 11:13:15 +0200310 if (unlikely(stopping))
311 goto close;
312
313 /* for rex because it seems reset to timeout
314 * and we don't want expire on this case
315 * with a syslog server
316 */
Christopher Faulet908628c2022-03-25 16:43:49 +0100317 cs_oc(cs)->rex = TICK_ETERNITY;
Emeric Brun494c5052020-05-28 11:13:15 +0200318 /* rto should not change but it seems the case */
Christopher Faulet908628c2022-03-25 16:43:49 +0100319 cs_oc(cs)->rto = TICK_ETERNITY;
Emeric Brun494c5052020-05-28 11:13:15 +0200320
321 /* an error was detected */
Christopher Faulet908628c2022-03-25 16:43:49 +0100322 if (unlikely(cs_ic(cs)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
Emeric Brun494c5052020-05-28 11:13:15 +0200323 goto close;
324
325 /* con closed by server side */
Christopher Faulet908628c2022-03-25 16:43:49 +0100326 if ((cs_oc(cs)->flags & CF_SHUTW))
Emeric Brun494c5052020-05-28 11:13:15 +0200327 goto close;
328
329 /* if the connection is not established, inform the stream that we want
330 * to be notified whenever the connection completes.
331 */
Christopher Faulet62e75742022-03-31 09:16:34 +0200332 if (cs_opposite(cs)->state < CS_ST_EST) {
Christopher Fauleta0bdec32022-04-04 07:51:21 +0200333 cs_cant_get(cs);
334 cs_rx_conn_blk(cs);
335 cs_rx_endp_more(cs);
Emeric Brun494c5052020-05-28 11:13:15 +0200336 return;
337 }
338
339 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
340 if (appctx != sft->appctx) {
341 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
342 goto close;
343 }
344 ofs = sft->ofs;
345
346 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
347 LIST_DEL_INIT(&appctx->wait_entry);
348 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
349
350 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
351
352 /* explanation for the initialization below: it would be better to do
353 * this in the parsing function but this would occasionally result in
354 * dropped events because we'd take a reference on the oldest message
355 * and keep it while being scheduled. Thus instead let's take it the
356 * first time we enter here so that we have a chance to pass many
357 * existing messages before grabbing a reference to a location. This
358 * value cannot be produced after initialization.
359 */
360 if (unlikely(ofs == ~0)) {
361 ofs = 0;
362
Willy Tarreau4781b152021-04-06 13:53:36 +0200363 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun494c5052020-05-28 11:13:15 +0200364 ofs += ring->ofs;
365 }
366
Emeric Brun494c5052020-05-28 11:13:15 +0200367 /* in this loop, ofs always points to the counter byte that precedes
368 * the message so that we can take our reference there if we have to
369 * stop before the end (ret=0).
370 */
Christopher Faulet62e75742022-03-31 09:16:34 +0200371 if (cs_opposite(cs)->state == CS_ST_EST) {
Emeric Brunfdabf492020-12-02 17:02:09 +0100372 /* we were already there, adjust the offset to be relative to
373 * the buffer's head and remove us from the counter.
374 */
375 ofs -= ring->ofs;
376 BUG_ON(ofs >= buf->size);
Willy Tarreau4781b152021-04-06 13:53:36 +0200377 HA_ATOMIC_DEC(b_peek(buf, ofs));
Emeric Brunfdabf492020-12-02 17:02:09 +0100378
Emeric Brun494c5052020-05-28 11:13:15 +0200379 ret = 1;
380 while (ofs + 1 < b_data(buf)) {
381 cnt = 1;
382 len = b_peek_varint(buf, ofs + cnt, &msg_len);
383 if (!len)
384 break;
385 cnt += len;
386 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
387
388 if (unlikely(msg_len + 1 > b_size(&trash))) {
389 /* too large a message to ever fit, let's skip it */
390 ofs += cnt + msg_len;
391 continue;
392 }
393
394 chunk_reset(&trash);
395 len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
396 trash.data += len;
397 trash.area[trash.data++] = '\n';
398
Christopher Faulet908628c2022-03-25 16:43:49 +0100399 if (ci_putchk(cs_ic(cs), &trash) == -1) {
Christopher Fauleta0bdec32022-04-04 07:51:21 +0200400 cs_rx_room_blk(cs);
Emeric Brun494c5052020-05-28 11:13:15 +0200401 ret = 0;
402 break;
403 }
404 ofs += cnt + msg_len;
405 }
406
Willy Tarreau4781b152021-04-06 13:53:36 +0200407 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun494c5052020-05-28 11:13:15 +0200408 ofs += ring->ofs;
409 sft->ofs = ofs;
410 }
411 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
412
413 if (ret) {
414 /* let's be woken up once new data arrive */
415 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau2b718102021-04-21 07:32:39 +0200416 LIST_APPEND(&ring->waiters, &appctx->wait_entry);
Emeric Brun494c5052020-05-28 11:13:15 +0200417 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
Christopher Fauleta0bdec32022-04-04 07:51:21 +0200418 cs_rx_endp_done(cs);
Emeric Brun494c5052020-05-28 11:13:15 +0200419 }
420 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
421
422 /* always drain data from server */
Christopher Faulet908628c2022-03-25 16:43:49 +0100423 co_skip(cs_oc(cs), cs_oc(cs)->output);
Emeric Brun494c5052020-05-28 11:13:15 +0200424 return;
425
426close:
Christopher Fauletda098e62022-03-31 17:44:45 +0200427 cs_shutw(cs);
428 cs_shutr(cs);
Christopher Faulet908628c2022-03-25 16:43:49 +0100429 cs_ic(cs)->flags |= CF_READ_NULL;
Emeric Brun494c5052020-05-28 11:13:15 +0200430}
431
Emeric Brun97556472020-05-30 01:42:45 +0200432/*
433 * IO Handler to handle message push to syslog tcp server
434 * using octet counting frames
435 */
436static void sink_forward_oc_io_handler(struct appctx *appctx)
437{
Christopher Faulet908628c2022-03-25 16:43:49 +0100438 struct conn_stream *cs = appctx->owner;
439 struct stream *s = __cs_strm(cs);
Emeric Brun97556472020-05-30 01:42:45 +0200440 struct sink *sink = strm_fe(s)->parent;
441 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
442 struct ring *ring = sink->ctx.ring;
443 struct buffer *buf = &ring->buf;
444 uint64_t msg_len;
445 size_t len, cnt, ofs;
446 int ret = 0;
447 char *p;
448
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500449 /* if stopping was requested, close immediately */
Emeric Brun97556472020-05-30 01:42:45 +0200450 if (unlikely(stopping))
451 goto close;
452
453 /* for rex because it seems reset to timeout
454 * and we don't want expire on this case
455 * with a syslog server
456 */
Christopher Faulet908628c2022-03-25 16:43:49 +0100457 cs_oc(cs)->rex = TICK_ETERNITY;
Emeric Brun97556472020-05-30 01:42:45 +0200458 /* rto should not change but it seems the case */
Christopher Faulet908628c2022-03-25 16:43:49 +0100459 cs_oc(cs)->rto = TICK_ETERNITY;
Emeric Brun97556472020-05-30 01:42:45 +0200460
461 /* an error was detected */
Christopher Faulet908628c2022-03-25 16:43:49 +0100462 if (unlikely(cs_ic(cs)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
Emeric Brun97556472020-05-30 01:42:45 +0200463 goto close;
464
465 /* con closed by server side */
Christopher Faulet908628c2022-03-25 16:43:49 +0100466 if ((cs_oc(cs)->flags & CF_SHUTW))
Emeric Brun97556472020-05-30 01:42:45 +0200467 goto close;
468
469 /* if the connection is not established, inform the stream that we want
470 * to be notified whenever the connection completes.
471 */
Christopher Faulet62e75742022-03-31 09:16:34 +0200472 if (cs_opposite(cs)->state < CS_ST_EST) {
Christopher Fauleta0bdec32022-04-04 07:51:21 +0200473 cs_cant_get(cs);
474 cs_rx_conn_blk(cs);
475 cs_rx_endp_more(cs);
Emeric Brun97556472020-05-30 01:42:45 +0200476 return;
477 }
478
479 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
480 if (appctx != sft->appctx) {
481 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
482 goto close;
483 }
484 ofs = sft->ofs;
485
486 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
487 LIST_DEL_INIT(&appctx->wait_entry);
488 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
489
490 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
491
492 /* explanation for the initialization below: it would be better to do
493 * this in the parsing function but this would occasionally result in
494 * dropped events because we'd take a reference on the oldest message
495 * and keep it while being scheduled. Thus instead let's take it the
496 * first time we enter here so that we have a chance to pass many
497 * existing messages before grabbing a reference to a location. This
498 * value cannot be produced after initialization.
499 */
500 if (unlikely(ofs == ~0)) {
501 ofs = 0;
502
Willy Tarreau4781b152021-04-06 13:53:36 +0200503 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun97556472020-05-30 01:42:45 +0200504 ofs += ring->ofs;
505 }
506
Emeric Brun97556472020-05-30 01:42:45 +0200507 /* in this loop, ofs always points to the counter byte that precedes
508 * the message so that we can take our reference there if we have to
509 * stop before the end (ret=0).
510 */
Christopher Faulet62e75742022-03-31 09:16:34 +0200511 if (cs_opposite(cs)->state == CS_ST_EST) {
Emeric Brunfdabf492020-12-02 17:02:09 +0100512 /* we were already there, adjust the offset to be relative to
513 * the buffer's head and remove us from the counter.
514 */
515 ofs -= ring->ofs;
516 BUG_ON(ofs >= buf->size);
Willy Tarreau4781b152021-04-06 13:53:36 +0200517 HA_ATOMIC_DEC(b_peek(buf, ofs));
Emeric Brunfdabf492020-12-02 17:02:09 +0100518
Emeric Brun97556472020-05-30 01:42:45 +0200519 ret = 1;
520 while (ofs + 1 < b_data(buf)) {
521 cnt = 1;
522 len = b_peek_varint(buf, ofs + cnt, &msg_len);
523 if (!len)
524 break;
525 cnt += len;
526 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
527
528 chunk_reset(&trash);
529 p = ulltoa(msg_len, trash.area, b_size(&trash));
530 if (p) {
531 trash.data = (p - trash.area) + 1;
532 *p = ' ';
533 }
534
535 if (!p || (trash.data + msg_len > b_size(&trash))) {
536 /* too large a message to ever fit, let's skip it */
537 ofs += cnt + msg_len;
538 continue;
539 }
540
541 trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
542
Christopher Faulet908628c2022-03-25 16:43:49 +0100543 if (ci_putchk(cs_ic(cs), &trash) == -1) {
Christopher Fauleta0bdec32022-04-04 07:51:21 +0200544 cs_rx_room_blk(cs);
Emeric Brun97556472020-05-30 01:42:45 +0200545 ret = 0;
546 break;
547 }
548 ofs += cnt + msg_len;
549 }
550
Willy Tarreau4781b152021-04-06 13:53:36 +0200551 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun97556472020-05-30 01:42:45 +0200552 ofs += ring->ofs;
553 sft->ofs = ofs;
554 }
555 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
556
557 if (ret) {
558 /* let's be woken up once new data arrive */
559 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau2b718102021-04-21 07:32:39 +0200560 LIST_APPEND(&ring->waiters, &appctx->wait_entry);
Emeric Brun97556472020-05-30 01:42:45 +0200561 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
Christopher Fauleta0bdec32022-04-04 07:51:21 +0200562 cs_rx_endp_done(cs);
Emeric Brun97556472020-05-30 01:42:45 +0200563 }
564 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
565
566 /* always drain data from server */
Christopher Faulet908628c2022-03-25 16:43:49 +0100567 co_skip(cs_oc(cs), cs_oc(cs)->output);
Emeric Brun97556472020-05-30 01:42:45 +0200568 return;
569
570close:
Christopher Fauletda098e62022-03-31 17:44:45 +0200571 cs_shutw(cs);
572 cs_shutr(cs);
Christopher Faulet908628c2022-03-25 16:43:49 +0100573 cs_ic(cs)->flags |= CF_READ_NULL;
Emeric Brun97556472020-05-30 01:42:45 +0200574}
575
Emeric Brun494c5052020-05-28 11:13:15 +0200576void __sink_forward_session_deinit(struct sink_forward_target *sft)
577{
Christopher Faulet908628c2022-03-25 16:43:49 +0100578 struct stream *s = __cs_strm(sft->appctx->owner);
Emeric Brun494c5052020-05-28 11:13:15 +0200579 struct sink *sink;
580
Emeric Brun494c5052020-05-28 11:13:15 +0200581 sink = strm_fe(s)->parent;
582 if (!sink)
583 return;
584
585 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
586 LIST_DEL_INIT(&sft->appctx->wait_entry);
587 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
588
589 sft->appctx = NULL;
590 task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
591}
592
593
594static void sink_forward_session_release(struct appctx *appctx)
595{
Christopher Fauletdd0b1442022-01-14 15:03:22 +0100596 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
Emeric Brun494c5052020-05-28 11:13:15 +0200597
598 if (!sft)
599 return;
600
601 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
602 if (sft->appctx == appctx)
603 __sink_forward_session_deinit(sft);
604 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
605}
606
607static struct applet sink_forward_applet = {
608 .obj_type = OBJ_TYPE_APPLET,
609 .name = "<SINKFWD>", /* used for logging */
610 .fct = sink_forward_io_handler,
611 .release = sink_forward_session_release,
612};
613
Emeric Brun97556472020-05-30 01:42:45 +0200614static struct applet sink_forward_oc_applet = {
615 .obj_type = OBJ_TYPE_APPLET,
616 .name = "<SINKFWDOC>", /* used for logging */
617 .fct = sink_forward_oc_io_handler,
618 .release = sink_forward_session_release,
619};
620
Emeric Brun494c5052020-05-28 11:13:15 +0200621/*
622 * Create a new peer session in assigned state (connect will start automatically)
623 */
624static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
625{
626 struct proxy *p = sink->forward_px;
627 struct appctx *appctx;
628 struct session *sess;
Christopher Faulet13a35e52021-12-20 15:34:16 +0100629 struct conn_stream *cs;
Emeric Brun494c5052020-05-28 11:13:15 +0200630 struct stream *s;
Emeric Brun97556472020-05-30 01:42:45 +0200631 struct applet *applet = &sink_forward_applet;
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100632 struct sockaddr_storage *addr = NULL;
Emeric Brun97556472020-05-30 01:42:45 +0200633
634 if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
635 applet = &sink_forward_oc_applet;
Emeric Brun494c5052020-05-28 11:13:15 +0200636
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100637 appctx = appctx_new(applet, NULL);
Christopher Faulet2479e5f2022-01-19 14:50:11 +0100638 if (!appctx)
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100639 goto out_close;
Emeric Brun494c5052020-05-28 11:13:15 +0200640
641 appctx->ctx.sft.ptr = (void *)sft;
642
643 sess = session_new(p, NULL, &appctx->obj_type);
644 if (!sess) {
Christopher Faulet13a35e52021-12-20 15:34:16 +0100645 ha_alert("out of memory in sink_forward_session_create().\n");
Emeric Brun494c5052020-05-28 11:13:15 +0200646 goto out_free_appctx;
647 }
648
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100649 if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr)))
Christopher Faulet2479e5f2022-01-19 14:50:11 +0100650 goto out_free_sess;
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100651
Christopher Faulet9ec2f4d2022-03-23 15:15:29 +0100652 cs = cs_new_from_applet(appctx->endp, sess, &BUF_NULL);
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100653 if (!cs) {
654 ha_alert("Failed to initialize stream in sink_forward_session_create().\n");
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100655 goto out_free_addr;
Christopher Faulet13a35e52021-12-20 15:34:16 +0100656 }
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100657 s = DISGUISE(cs_strm(cs));
Christopher Faulet13a35e52021-12-20 15:34:16 +0100658
Christopher Faulet8da67aa2022-03-29 17:53:09 +0200659 s->csb->dst = addr;
Christopher Faulet8abe7122022-03-30 15:10:18 +0200660 s->csb->flags |= CS_FL_NOLINGER;
Emeric Brun494c5052020-05-28 11:13:15 +0200661
662 s->target = &sft->srv->obj_type;
Emeric Brun494c5052020-05-28 11:13:15 +0200663 s->flags = SF_ASSIGNED|SF_ADDR_SET;
Emeric Brun494c5052020-05-28 11:13:15 +0200664
665 s->do_log = NULL;
666 s->uniq_id = 0;
667
668 s->res.flags |= CF_READ_DONTWAIT;
669 /* for rto and rex to eternity to not expire on idle recv:
670 * We are using a syslog server.
671 */
672 s->res.rto = TICK_ETERNITY;
673 s->res.rex = TICK_ETERNITY;
674 sft->appctx = appctx;
Emeric Brun494c5052020-05-28 11:13:15 +0200675 return appctx;
676
677 /* Error unrolling */
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100678 out_free_addr:
679 sockaddr_free(&addr);
Emeric Brun494c5052020-05-28 11:13:15 +0200680 out_free_sess:
681 session_free(sess);
682 out_free_appctx:
683 appctx_free(appctx);
684 out_close:
685 return NULL;
686}
687
688/*
689 * Task to handle connctions to forward servers
690 */
Willy Tarreau144f84a2021-03-02 16:09:26 +0100691static struct task *process_sink_forward(struct task * task, void *context, unsigned int state)
Emeric Brun494c5052020-05-28 11:13:15 +0200692{
693 struct sink *sink = (struct sink *)context;
694 struct sink_forward_target *sft = sink->sft;
695
696 task->expire = TICK_ETERNITY;
697
698 if (!stopping) {
699 while (sft) {
700 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
701 /* if appctx is NULL, start a new session */
702 if (!sft->appctx)
703 sft->appctx = sink_forward_session_create(sink, sft);
704 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
705 sft = sft->next;
706 }
707 }
708 else {
709 while (sft) {
710 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
711 /* awake applet to perform a clean close */
712 if (sft->appctx)
713 appctx_wakeup(sft->appctx);
714 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
715 sft = sft->next;
716 }
717 }
718
719 return task;
720}
721/*
722 * Init task to manage connctions to forward servers
723 *
724 * returns 0 in case of error.
725 */
726int sink_init_forward(struct sink *sink)
727{
Willy Tarreaubeeabf52021-10-01 18:23:30 +0200728 sink->forward_task = task_new_anywhere();
Emeric Brun494c5052020-05-28 11:13:15 +0200729 if (!sink->forward_task)
730 return 0;
731
732 sink->forward_task->process = process_sink_forward;
733 sink->forward_task->context = (void *)sink;
734 sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
735 task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
736 return 1;
737}
Emeric Brun99c453d2020-05-25 15:01:04 +0200738/*
739 * Parse "ring" section and create corresponding sink buffer.
740 *
741 * The function returns 0 in success case, otherwise, it returns error
742 * flags.
743 */
744int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
745{
746 int err_code = 0;
747 const char *inv;
748 size_t size = BUFSIZE;
Emeric Brun494c5052020-05-28 11:13:15 +0200749 struct proxy *p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200750
751 if (strcmp(args[0], "ring") == 0) { /* new peers section */
752 if (!*args[1]) {
753 ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum);
754 err_code |= ERR_ALERT | ERR_FATAL;
755 goto err;
756 }
757
758 inv = invalid_char(args[1]);
759 if (inv) {
760 ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv);
761 err_code |= ERR_ALERT | ERR_FATAL;
762 goto err;
763 }
764
765 if (sink_find(args[1])) {
766 ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]);
767 err_code |= ERR_ALERT | ERR_FATAL;
768 goto err;
769 }
770
Emeric Brun54648852020-07-06 15:54:06 +0200771 cfg_sink = sink_new_buf(args[1], args[1], LOG_FORMAT_RAW, size);
Emeric Brun99c453d2020-05-25 15:01:04 +0200772 if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) {
773 ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]);
774 err_code |= ERR_ALERT | ERR_FATAL;
775 goto err;
776 }
Emeric Brun494c5052020-05-28 11:13:15 +0200777
778 /* allocate new proxy to handle forwards */
779 p = calloc(1, sizeof *p);
780 if (!p) {
781 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
782 err_code |= ERR_ALERT | ERR_FATAL;
783 goto err;
784 }
785
786 init_new_proxy(p);
787 sink_setup_proxy(p);
788 p->parent = cfg_sink;
789 p->id = strdup(args[1]);
790 p->conf.args.file = p->conf.file = strdup(file);
791 p->conf.args.line = p->conf.line = linenum;
792 cfg_sink->forward_px = p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200793 }
794 else if (strcmp(args[0], "size") == 0) {
795 size = atol(args[1]);
796 if (!size) {
797 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
798 err_code |= ERR_ALERT | ERR_FATAL;
799 goto err;
800 }
801
802 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)
803 || !ring_resize(cfg_sink->ctx.ring, size)) {
804 ha_alert("parsing [%s:%d] : fail to set sink buffer size '%s'.\n", file, linenum, args[1]);
805 err_code |= ERR_ALERT | ERR_FATAL;
806 goto err;
807 }
808 }
Emeric Brun494c5052020-05-28 11:13:15 +0200809 else if (strcmp(args[0],"server") == 0) {
Amaury Denoyelle30c05372021-03-08 16:36:46 +0100810 err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL,
811 SRV_PARSE_PARSE_ADDR|SRV_PARSE_INITIAL_RESOLVE);
Emeric Brun494c5052020-05-28 11:13:15 +0200812 }
813 else if (strcmp(args[0],"timeout") == 0) {
814 if (!cfg_sink || !cfg_sink->forward_px) {
815 ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
816 err_code |= ERR_ALERT | ERR_FATAL;
817 goto err;
818 }
819
820 if (strcmp(args[1], "connect") == 0 ||
821 strcmp(args[1], "server") == 0) {
822 const char *res;
823 unsigned int tout;
824
825 if (!*args[2]) {
826 ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
827 file, linenum, args[0], args[1]);
828 err_code |= ERR_ALERT | ERR_FATAL;
829 goto err;
830 }
831 res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
832 if (res == PARSE_TIME_OVER) {
833 ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
834 file, linenum, args[2], args[0], args[1]);
835 err_code |= ERR_ALERT | ERR_FATAL;
836 goto err;
837 }
838 else if (res == PARSE_TIME_UNDER) {
839 ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
840 file, linenum, args[2], args[0], args[1]);
841 err_code |= ERR_ALERT | ERR_FATAL;
842 goto err;
843 }
844 else if (res) {
845 ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
846 file, linenum, *res, args[0], args[1]);
847 err_code |= ERR_ALERT | ERR_FATAL;
848 goto err;
849 }
850 if (args[1][2] == 'c')
851 cfg_sink->forward_px->timeout.connect = tout;
852 else
853 cfg_sink->forward_px->timeout.server = tout;
854 }
855 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200856 else if (strcmp(args[0],"format") == 0) {
857 if (!cfg_sink) {
858 ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
859 err_code |= ERR_ALERT | ERR_FATAL;
860 goto err;
861 }
862
Emeric Brun54648852020-07-06 15:54:06 +0200863 cfg_sink->fmt = get_log_format(args[1]);
864 if (cfg_sink->fmt == LOG_FORMAT_UNSPEC) {
Emeric Brun99c453d2020-05-25 15:01:04 +0200865 ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]);
866 err_code |= ERR_ALERT | ERR_FATAL;
867 goto err;
868 }
869 }
870 else if (strcmp(args[0],"maxlen") == 0) {
871 if (!cfg_sink) {
872 ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]);
873 err_code |= ERR_ALERT | ERR_FATAL;
874 goto err;
875 }
876
877 cfg_sink->maxlen = atol(args[1]);
878 if (!cfg_sink->maxlen) {
879 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
880 err_code |= ERR_ALERT | ERR_FATAL;
881 goto err;
882 }
883 }
884 else if (strcmp(args[0],"description") == 0) {
885 if (!cfg_sink) {
886 ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]);
887 err_code |= ERR_ALERT | ERR_FATAL;
888 goto err;
889 }
890
891 if (!*args[1]) {
892 ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum);
893 err_code |= ERR_ALERT | ERR_FATAL;
894 goto err;
895 }
896
897 free(cfg_sink->desc);
898
899 cfg_sink->desc = strdup(args[1]);
900 if (!cfg_sink->desc) {
901 ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]);
902 err_code |= ERR_ALERT | ERR_FATAL;
903 goto err;
904 }
905 }
Emeric Brun9f2ff3a2020-05-29 15:47:52 +0200906 else {
907 ha_alert("parsing [%s:%d] : unknown statement '%s'.\n", file, linenum, args[0]);
908 err_code |= ERR_ALERT | ERR_FATAL;
909 goto err;
910 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200911
912err:
913 return err_code;
914}
915
Emeric Brun94aab062021-04-02 10:41:36 +0200916/* Creates an new sink buffer from a log server.
917 *
918 * It uses the logsrvaddress to declare a forward
919 * server for this buffer. And it initializes the
920 * forwarding.
921 *
922 * The function returns a pointer on the
923 * allocated struct sink if allocate
924 * and initialize succeed, else if it fails
925 * it returns NULL.
926 *
927 * Note: the sink is created using the name
Ilya Shipitsinb2be9a12021-04-24 13:25:42 +0500928 * specified into logsrv->ring_name
Emeric Brun94aab062021-04-02 10:41:36 +0200929 */
930struct sink *sink_new_from_logsrv(struct logsrv *logsrv)
931{
932 struct proxy *p = NULL;
933 struct sink *sink = NULL;
934 struct server *srv = NULL;
935 struct sink_forward_target *sft = NULL;
Emeric Brun94aab062021-04-02 10:41:36 +0200936
937 /* allocate new proxy to handle
938 * forward to a stream server
939 */
940 p = calloc(1, sizeof *p);
941 if (!p) {
942 goto error;
943 }
944
945 init_new_proxy(p);
946 sink_setup_proxy(p);
947 p->id = strdup(logsrv->ring_name);
948 p->conf.args.file = p->conf.file = strdup(logsrv->conf.file);
949 p->conf.args.line = p->conf.line = logsrv->conf.line;
950
951 /* allocate a new server to forward messages
952 * from ring buffer
953 */
954 srv = new_server(p);
955 if (!srv)
956 goto error;
957
958 /* init server */
959 srv->id = strdup(logsrv->ring_name);
960 srv->conf.file = strdup(logsrv->conf.file);
961 srv->conf.line = logsrv->conf.line;
962 srv->addr = logsrv->addr;
963 srv->svc_port = get_host_port(&logsrv->addr);
964 HA_SPIN_INIT(&srv->lock);
965
966 /* process per thread init */
Miroslav Zagorac8a8f2702021-06-15 15:33:20 +0200967 if (srv_init_per_thr(srv) == -1)
Emeric Brun94aab062021-04-02 10:41:36 +0200968 goto error;
969
Emeric Brun94aab062021-04-02 10:41:36 +0200970 /* the servers are linked backwards
971 * first into proxy
972 */
973 p->srv = srv;
974 srv->next = p->srv;
975
976 /* allocate sink_forward_target descriptor */
977 sft = calloc(1, sizeof(*sft));
978 if (!sft)
979 goto error;
980
981 /* init sink_forward_target offset */
982 sft->srv = srv;
983 sft->appctx = NULL;
984 sft->ofs = ~0;
985 HA_SPIN_INIT(&sft->lock);
986
Ilya Shipitsinb2be9a12021-04-24 13:25:42 +0500987 /* prepare description for the sink */
Emeric Brun94aab062021-04-02 10:41:36 +0200988 chunk_reset(&trash);
989 chunk_printf(&trash, "created from logserver declared into '%s' at line %d", logsrv->conf.file, logsrv->conf.line);
990
991 /* allocate a new sink buffer */
992 sink = sink_new_buf(logsrv->ring_name, trash.area, logsrv->format, BUFSIZE);
993 if (!sink || sink->type != SINK_TYPE_BUFFER) {
994 goto error;
995 }
996
997 /* link sink_forward_target to proxy */
998 sink->forward_px = p;
999 p->parent = sink;
1000
1001 /* insert into sink_forward_targets
1002 * list into sink
1003 */
1004 sft->next = sink->sft;
1005 sink->sft = sft;
1006
1007 /* mark server as an attached reader to the ring */
1008 if (!ring_attach(sink->ctx.ring)) {
1009 /* should never fail since there is
1010 * only one reader
1011 */
1012 goto error;
1013 }
1014
1015 /* initialize sink buffer forwarding */
1016 if (!sink_init_forward(sink))
1017 goto error;
1018
1019 /* reset familyt of logsrv to consider the ring buffer target */
1020 logsrv->addr.ss_family = AF_UNSPEC;
1021
1022 return sink;
1023error:
1024 if (p) {
1025 if (p->id)
1026 free(p->id);
1027 if (p->conf.file)
1028 free(p->conf.file);
1029
1030 free(p);
1031 }
1032
1033 if (srv) {
1034 if (srv->id)
1035 free(srv->id);
1036 if (srv->conf.file)
1037 free((void *)srv->conf.file);
1038 if (srv->per_thr)
1039 free(srv->per_thr);
1040 free(srv);
1041 }
1042
1043 if (sft)
1044 free(sft);
1045
1046 if (sink) {
1047 if (sink->ctx.ring)
1048 ring_free(sink->ctx.ring);
1049
Willy Tarreau2b718102021-04-21 07:32:39 +02001050 LIST_DELETE(&sink->sink_list);
Emeric Brun94aab062021-04-02 10:41:36 +02001051 free(sink->name);
1052 free(sink->desc);
1053 free(sink);
1054 }
1055
1056 return NULL;
1057}
1058
Emeric Brun99c453d2020-05-25 15:01:04 +02001059/*
1060 * Post parsing "ring" section.
1061 *
1062 * The function returns 0 in success case, otherwise, it returns error
1063 * flags.
1064 */
1065int cfg_post_parse_ring()
1066{
1067 int err_code = 0;
Emeric Brun494c5052020-05-28 11:13:15 +02001068 struct server *srv;
Emeric Brun99c453d2020-05-25 15:01:04 +02001069
1070 if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
1071 if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
1072 ha_warning("ring '%s' event max length '%u' exceeds size, forced to size '%lu'.\n",
Willy Tarreau570a22b2020-06-02 12:00:46 +02001073 cfg_sink->name, cfg_sink->maxlen, (unsigned long)b_size(&cfg_sink->ctx.ring->buf));
Emeric Brun99c453d2020-05-25 15:01:04 +02001074 cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
1075 err_code |= ERR_ALERT;
1076 }
Emeric Brun494c5052020-05-28 11:13:15 +02001077
1078 /* prepare forward server descriptors */
1079 if (cfg_sink->forward_px) {
1080 srv = cfg_sink->forward_px->srv;
1081 while (srv) {
1082 struct sink_forward_target *sft;
1083 /* init ssl if needed */
1084 if (srv->use_ssl == 1 && xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->prepare_srv) {
1085 if (xprt_get(XPRT_SSL)->prepare_srv(srv)) {
1086 ha_alert("unable to prepare SSL for server '%s' in ring '%s'.\n", srv->id, cfg_sink->name);
1087 err_code |= ERR_ALERT | ERR_FATAL;
1088 }
1089 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001090
Emeric Brun494c5052020-05-28 11:13:15 +02001091 /* allocate sink_forward_target descriptor */
1092 sft = calloc(1, sizeof(*sft));
1093 if (!sft) {
1094 ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
1095 err_code |= ERR_ALERT | ERR_FATAL;
1096 break;
1097 }
1098 sft->srv = srv;
1099 sft->appctx = NULL;
1100 sft->ofs = ~0; /* init ring offset */
1101 sft->next = cfg_sink->sft;
1102 HA_SPIN_INIT(&sft->lock);
1103
1104 /* mark server attached to the ring */
1105 if (!ring_attach(cfg_sink->ctx.ring)) {
1106 ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
1107 err_code |= ERR_ALERT | ERR_FATAL;
1108 }
1109 cfg_sink->sft = sft;
1110 srv = srv->next;
1111 }
1112 sink_init_forward(cfg_sink);
1113 }
1114 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001115 cfg_sink = NULL;
1116
1117 return err_code;
1118}
1119
1120/* resolve sink names at end of config. Returns 0 on success otherwise error
1121 * flags.
1122*/
1123int post_sink_resolve()
1124{
Christopher Fauletfc633b62020-11-06 15:24:23 +01001125 int err_code = ERR_NONE;
Emeric Brun99c453d2020-05-25 15:01:04 +02001126 struct logsrv *logsrv, *logb;
1127 struct sink *sink;
1128 struct proxy *px;
1129
1130 list_for_each_entry_safe(logsrv, logb, &global.logsrvs, list) {
1131 if (logsrv->type == LOG_TARGET_BUFFER) {
1132 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001133 if (!sink) {
1134 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1135 * means we must allocate a sink
1136 * buffer to send messages to this logsrv
1137 */
1138 if (logsrv->addr.ss_family != AF_UNSPEC) {
1139 sink = sink_new_from_logsrv(logsrv);
1140 if (!sink) {
1141 ha_alert("global stream log server declared in file '%s' at line %d cannot be initialized'.\n",
1142 logsrv->conf.file, logsrv->conf.line);
1143 err_code |= ERR_ALERT | ERR_FATAL;
1144 }
1145 }
1146 else {
1147 ha_alert("global log server declared in file '%s' at line %d uses unknown ring named '%s'.\n",
1148 logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1149 err_code |= ERR_ALERT | ERR_FATAL;
1150 }
1151 }
1152 else if (sink->type != SINK_TYPE_BUFFER) {
1153 ha_alert("global log server declared in file '%s' at line %d uses incompatible ring '%s'.\n",
1154 logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +02001155 err_code |= ERR_ALERT | ERR_FATAL;
1156 }
1157 logsrv->sink = sink;
1158 }
Emeric Brun94aab062021-04-02 10:41:36 +02001159
Emeric Brun99c453d2020-05-25 15:01:04 +02001160 }
1161
1162 for (px = proxies_list; px; px = px->next) {
1163 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1164 if (logsrv->type == LOG_TARGET_BUFFER) {
1165 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001166 if (!sink) {
1167 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1168 * means we must allocate a sink
1169 * buffer to send messages to this logsrv
1170 */
1171 if (logsrv->addr.ss_family != AF_UNSPEC) {
1172 sink = sink_new_from_logsrv(logsrv);
1173 if (!sink) {
1174 ha_alert("log server declared in proxy section '%s' file '%s' at line %d cannot be initialized'.\n",
1175 px->id, logsrv->conf.file, logsrv->conf.line);
1176 err_code |= ERR_ALERT | ERR_FATAL;
1177 }
1178 }
1179 else {
1180 ha_alert("log server declared in proxy section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1181 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1182 err_code |= ERR_ALERT | ERR_FATAL;
1183 }
1184 }
1185 else if (sink->type != SINK_TYPE_BUFFER) {
1186 ha_alert("log server declared in proxy section '%s' in file '%s' at line %d uses incomatible ring named '%s'.\n",
1187 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +02001188 err_code |= ERR_ALERT | ERR_FATAL;
1189 }
1190 logsrv->sink = sink;
1191 }
1192 }
1193 }
Emeric Brun12941c82020-07-07 14:19:42 +02001194
1195 for (px = cfg_log_forward; px; px = px->next) {
1196 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1197 if (logsrv->type == LOG_TARGET_BUFFER) {
1198 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001199 if (!sink) {
1200 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1201 * means we must allocate a sink
1202 * buffer to send messages to this logsrv
1203 */
1204 if (logsrv->addr.ss_family != AF_UNSPEC) {
1205 sink = sink_new_from_logsrv(logsrv);
1206 if (!sink) {
1207 ha_alert("log server declared in log-forward section '%s' file '%s' at line %d cannot be initialized'.\n",
1208 px->id, logsrv->conf.file, logsrv->conf.line);
1209 err_code |= ERR_ALERT | ERR_FATAL;
1210 }
1211 }
1212 else {
1213 ha_alert("log server declared in log-forward section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1214 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1215 err_code |= ERR_ALERT | ERR_FATAL;
1216 }
1217 }
1218 else if (sink->type != SINK_TYPE_BUFFER) {
1219 ha_alert("log server declared in log-forward section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1220 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun12941c82020-07-07 14:19:42 +02001221 err_code |= ERR_ALERT | ERR_FATAL;
1222 }
1223 logsrv->sink = sink;
1224 }
1225 }
1226 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001227 return err_code;
1228}
1229
1230
Willy Tarreau973e6622019-08-20 11:57:52 +02001231static void sink_init()
1232{
Emeric Brun54648852020-07-06 15:54:06 +02001233 sink_new_fd("stdout", "standard output (fd#1)", LOG_FORMAT_RAW, 1);
1234 sink_new_fd("stderr", "standard output (fd#2)", LOG_FORMAT_RAW, 2);
1235 sink_new_buf("buf0", "in-memory ring buffer", LOG_FORMAT_TIMED, 1048576);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001236}
1237
1238static void sink_deinit()
1239{
1240 struct sink *sink, *sb;
1241
1242 list_for_each_entry_safe(sink, sb, &sink_list, sink_list) {
1243 if (sink->type == SINK_TYPE_BUFFER)
1244 ring_free(sink->ctx.ring);
Willy Tarreau2b718102021-04-21 07:32:39 +02001245 LIST_DELETE(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +02001246 free(sink->name);
1247 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001248 free(sink);
1249 }
Willy Tarreau973e6622019-08-20 11:57:52 +02001250}
1251
1252INITCALL0(STG_REGISTER, sink_init);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001253REGISTER_POST_DEINIT(sink_deinit);
Willy Tarreau973e6622019-08-20 11:57:52 +02001254
Willy Tarreau9f830d72019-08-26 18:17:04 +02001255static struct cli_kw_list cli_kws = {{ },{
Willy Tarreaub205bfd2021-05-07 11:38:37 +02001256 { { "show", "events", NULL }, "show events [<sink>] [-w] [-n] : show event sink state", cli_parse_show_events, NULL, NULL },
Willy Tarreau9f830d72019-08-26 18:17:04 +02001257 {{},}
1258}};
1259
1260INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);
1261
Emeric Brun99c453d2020-05-25 15:01:04 +02001262/* config parsers for this section */
1263REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring);
1264REGISTER_POST_CHECK(post_sink_resolve);
1265
Willy Tarreau67b5a162019-08-11 16:38:56 +02001266/*
1267 * Local variables:
1268 * c-indent-level: 8
1269 * c-basic-offset: 8
1270 * End:
1271 */