blob: ff498b24d87850710e2bbac77552417d8962a1b9 [file] [log] [blame]
/*
 * Event sink management
 *
 * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation, version 2.1
 * exclusively.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 */
20
Emeric Brun99c453d2020-05-25 15:01:04 +020021#include <common/cfgparse.h>
Willy Tarreau67b5a162019-08-11 16:38:56 +020022#include <common/compat.h>
23#include <common/config.h>
24#include <common/ist.h>
25#include <common/mini-clist.h>
Willy Tarreau53ba9d92019-09-26 08:03:58 +020026#include <common/time.h>
Willy Tarreau9f830d72019-08-26 18:17:04 +020027#include <proto/cli.h>
Willy Tarreau67b5a162019-08-11 16:38:56 +020028#include <proto/log.h>
Willy Tarreau4ed23ca2019-08-23 15:47:49 +020029#include <proto/ring.h>
Emeric Brun494c5052020-05-28 11:13:15 +020030#include <proto/signal.h>
Willy Tarreau67b5a162019-08-11 16:38:56 +020031#include <proto/sink.h>
Willy Tarreau9f830d72019-08-26 18:17:04 +020032#include <proto/stream_interface.h>
Willy Tarreau67b5a162019-08-11 16:38:56 +020033
34struct list sink_list = LIST_HEAD_INIT(sink_list);
35
Emeric Brun99c453d2020-05-25 15:01:04 +020036struct sink *cfg_sink;
37
Willy Tarreau67b5a162019-08-11 16:38:56 +020038struct sink *sink_find(const char *name)
39{
40 struct sink *sink;
41
42 list_for_each_entry(sink, &sink_list, sink_list)
43 if (strcmp(sink->name, name) == 0)
44 return sink;
45 return NULL;
46}
47
48/* creates a new sink and adds it to the list, it's still generic and not fully
49 * initialized. Returns NULL on allocation failure. If another one already
50 * exists with the same name, it will be returned. The caller can detect it as
51 * a newly created one has type SINK_TYPE_NEW.
52 */
Willy Tarreau973e6622019-08-20 11:57:52 +020053static struct sink *__sink_new(const char *name, const char *desc, enum sink_fmt fmt)
Willy Tarreau67b5a162019-08-11 16:38:56 +020054{
55 struct sink *sink;
56
57 sink = sink_find(name);
58 if (sink)
59 goto end;
60
Emeric Brun494c5052020-05-28 11:13:15 +020061 sink = calloc(1, sizeof(*sink));
Willy Tarreau67b5a162019-08-11 16:38:56 +020062 if (!sink)
63 goto end;
64
Emeric Brun99c453d2020-05-25 15:01:04 +020065 sink->name = strdup(name);
66 sink->desc = strdup(desc);
Willy Tarreau67b5a162019-08-11 16:38:56 +020067 sink->fmt = fmt;
68 sink->type = SINK_TYPE_NEW;
Christopher Fauleta63a5c22019-11-15 15:10:12 +010069 sink->maxlen = BUFSIZE;
Willy Tarreau67b5a162019-08-11 16:38:56 +020070 /* address will be filled by the caller if needed */
Willy Tarreau973e6622019-08-20 11:57:52 +020071 sink->ctx.fd = -1;
Willy Tarreau67b5a162019-08-11 16:38:56 +020072 sink->ctx.dropped = 0;
73 HA_RWLOCK_INIT(&sink->ctx.lock);
74 LIST_ADDQ(&sink_list, &sink->sink_list);
75 end:
76 return sink;
77}
78
Willy Tarreau973e6622019-08-20 11:57:52 +020079/* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>,
80 * and description <desc>. Returns NULL on allocation failure or conflict.
81 * Perfect duplicates are merged (same type, fd, and name).
82 */
83struct sink *sink_new_fd(const char *name, const char *desc, enum sink_fmt fmt, int fd)
84{
85 struct sink *sink;
86
87 sink = __sink_new(name, desc, fmt);
88 if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd))
89 goto end;
90
91 if (sink->type != SINK_TYPE_NEW) {
92 sink = NULL;
93 goto end;
94 }
95
96 sink->type = SINK_TYPE_FD;
97 sink->ctx.fd = fd;
98 end:
99 return sink;
100}
101
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200102/* creates a sink called <name> of type BUF of size <size>, format <fmt>,
103 * and description <desc>. Returns NULL on allocation failure or conflict.
104 * Perfect duplicates are merged (same type and name). If sizes differ, the
105 * largest one is kept.
106 */
107struct sink *sink_new_buf(const char *name, const char *desc, enum sink_fmt fmt, size_t size)
108{
109 struct sink *sink;
110
111 sink = __sink_new(name, desc, fmt);
112 if (!sink)
113 goto fail;
114
115 if (sink->type == SINK_TYPE_BUFFER) {
116 /* such a buffer already exists, we may have to resize it */
117 if (!ring_resize(sink->ctx.ring, size))
118 goto fail;
119 goto end;
120 }
121
122 if (sink->type != SINK_TYPE_NEW) {
123 /* already exists of another type */
124 goto fail;
125 }
126
127 sink->ctx.ring = ring_new(size);
128 if (!sink->ctx.ring) {
129 LIST_DEL(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +0200130 free(sink->name);
131 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200132 free(sink);
133 goto fail;
134 }
135
136 sink->type = SINK_TYPE_BUFFER;
137 end:
138 return sink;
139 fail:
140 return NULL;
141}
142
Willy Tarreau67b5a162019-08-11 16:38:56 +0200143/* tries to send <nmsg> message parts (up to 8, ignored above) from message
144 * array <msg> to sink <sink>. Formating according to the sink's preference is
Willy Tarreau8f240232019-08-27 16:41:06 +0200145 * done here. Lost messages are NOT accounted for. It is preferable to call
146 * sink_write() instead which will also try to emit the number of dropped
147 * messages when there are any. It returns >0 if it could write anything,
148 * <=0 otherwise.
Willy Tarreau67b5a162019-08-11 16:38:56 +0200149 */
Emeric Brunbd163812020-05-06 14:33:46 +0200150ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg,
151 int level, int facility, struct ist *tag,
152 struct ist *pid, struct ist *sd)
Willy Tarreau67b5a162019-08-11 16:38:56 +0200153{
Emeric Brunbd163812020-05-06 14:33:46 +0200154 int log_format;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200155 char short_hdr[4];
Emeric Brunbd163812020-05-06 14:33:46 +0200156 struct ist pfx[6];
Willy Tarreaua1426de2019-08-27 14:21:02 +0200157 size_t npfx = 0;
Emeric Brunbd163812020-05-06 14:33:46 +0200158 char *hdr_ptr;
159 int fac_level;
160
161 if (sink->fmt == SINK_FMT_RAW)
162 goto send;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200163
Willy Tarreau53ba9d92019-09-26 08:03:58 +0200164 if (sink->fmt == SINK_FMT_SHORT || sink->fmt == SINK_FMT_TIMED) {
Willy Tarreau67b5a162019-08-11 16:38:56 +0200165 short_hdr[0] = '<';
Emeric Brunbd163812020-05-06 14:33:46 +0200166 short_hdr[1] = '0' + level;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200167 short_hdr[2] = '>';
168
Willy Tarreaua1426de2019-08-27 14:21:02 +0200169 pfx[npfx].ptr = short_hdr;
170 pfx[npfx].len = 3;
171 npfx++;
Emeric Brunbd163812020-05-06 14:33:46 +0200172 if (sink->fmt == SINK_FMT_SHORT)
173 goto send;
Willy Tarreaua1426de2019-08-27 14:21:02 +0200174 }
Willy Tarreau67b5a162019-08-11 16:38:56 +0200175
Emeric Brunbd163812020-05-06 14:33:46 +0200176
Willy Tarreau53ba9d92019-09-26 08:03:58 +0200177 if (sink->fmt == SINK_FMT_ISO || sink->fmt == SINK_FMT_TIMED) {
178 pfx[npfx].ptr = timeofday_as_iso_us(1);
179 pfx[npfx].len = 27;
180 npfx++;
Emeric Brunbd163812020-05-06 14:33:46 +0200181 goto send;
Willy Tarreau53ba9d92019-09-26 08:03:58 +0200182 }
Emeric Brunbd163812020-05-06 14:33:46 +0200183 else if (sink->fmt == SINK_FMT_RFC5424) {
184 pfx[npfx].ptr = logheader_rfc5424;
185 pfx[npfx].len = update_log_hdr_rfc5424(date.tv_sec) - pfx[npfx].ptr;
186 log_format = LOG_FORMAT_RFC5424;
187 }
188 else {
189 pfx[npfx].ptr = logheader;
190 pfx[npfx].len = update_log_hdr(date.tv_sec) - pfx[npfx].ptr;
191 log_format = LOG_FORMAT_RFC3164;
192 sd = NULL;
193 }
194
195 fac_level = (facility << 3) + level;
196 hdr_ptr = pfx[npfx].ptr + 3; /* last digit of the log level */
197 do {
198 *hdr_ptr = '0' + fac_level % 10;
199 fac_level /= 10;
200 hdr_ptr--;
201 } while (fac_level && hdr_ptr > pfx[npfx].ptr);
202 *hdr_ptr = '<';
203 pfx[npfx].len -= hdr_ptr - pfx[npfx].ptr;
204 pfx[npfx].ptr = hdr_ptr;
205 npfx++;
206
207 if (tag && tag->len) {
208 pfx[npfx].ptr = tag->ptr;
209 pfx[npfx].len = tag->len;
210 npfx++;
211 }
212 pfx[npfx].ptr = get_format_pid_sep1(log_format, &pfx[npfx].len);
213 if (pfx[npfx].len)
214 npfx++;
Willy Tarreau53ba9d92019-09-26 08:03:58 +0200215
Emeric Brunbd163812020-05-06 14:33:46 +0200216 if (pid && pid->len) {
217 pfx[npfx].ptr = pid->ptr;
218 pfx[npfx].len = pid->len;
219 npfx++;
220 }
221
222 pfx[npfx].ptr = get_format_pid_sep2(log_format, &pfx[npfx].len);
223 if (pfx[npfx].len)
224 npfx++;
225
226 if (sd && sd->len) {
227 pfx[npfx].ptr = sd->ptr;
228 pfx[npfx].len = sd->len;
229 npfx++;
230 }
231
232send:
Willy Tarreau973e6622019-08-20 11:57:52 +0200233 if (sink->type == SINK_TYPE_FD) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200234 return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200235 }
236 else if (sink->type == SINK_TYPE_BUFFER) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200237 return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
Willy Tarreau973e6622019-08-20 11:57:52 +0200238 }
Willy Tarreau8f240232019-08-27 16:41:06 +0200239 return 0;
240}
Willy Tarreau67b5a162019-08-11 16:38:56 +0200241
Willy Tarreau8f240232019-08-27 16:41:06 +0200242/* Tries to emit a message indicating the number of dropped events. In case of
243 * success, the amount of drops is reduced by as much. It's supposed to be
244 * called under an exclusive lock on the sink to avoid multiple produces doing
245 * the same. On success, >0 is returned, otherwise <=0 on failure.
246 */
Emeric Brunbd163812020-05-06 14:33:46 +0200247int sink_announce_dropped(struct sink *sink, int facility, struct ist *pid)
Willy Tarreau8f240232019-08-27 16:41:06 +0200248{
249 unsigned int dropped;
250 struct buffer msg;
251 struct ist msgvec[1];
252 char logbuf[64];
Emeric Brunbd163812020-05-06 14:33:46 +0200253 struct ist sd;
254 struct ist tag;
Willy Tarreau8f240232019-08-27 16:41:06 +0200255
256 while (unlikely((dropped = sink->ctx.dropped) > 0)) {
257 chunk_init(&msg, logbuf, sizeof(logbuf));
258 chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
259 msgvec[0] = ist2(msg.area, msg.data);
Emeric Brunbd163812020-05-06 14:33:46 +0200260
261 sd.ptr = default_rfc5424_sd_log_format;
262 sd.len = 2;
263 tag.ptr = global.log_tag.area;
264 tag.len = global.log_tag.data;
265 if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, &tag, pid, &sd) <= 0)
Willy Tarreau8f240232019-08-27 16:41:06 +0200266 return 0;
267 /* success! */
268 HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
269 }
270 return 1;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200271}
272
Willy Tarreau9f830d72019-08-26 18:17:04 +0200273/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
274static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
275{
276 struct sink *sink;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200277 int arg;
Willy Tarreau9f830d72019-08-26 18:17:04 +0200278
279 args++; // make args[1] the 1st arg
280
281 if (!*args[1]) {
282 /* no arg => report the list of supported sink */
Willy Tarreau1d181e42019-08-30 11:17:01 +0200283 chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
Willy Tarreau9f830d72019-08-26 18:17:04 +0200284 list_for_each_entry(sink, &sink_list, sink_list) {
285 chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n",
286 sink->name,
287 sink->type == SINK_TYPE_NEW ? "init" :
288 sink->type == SINK_TYPE_FD ? "fd" :
289 sink->type == SINK_TYPE_BUFFER ? "buffer" : "?",
290 sink->ctx.dropped, sink->desc);
291 }
292
293 trash.area[trash.data] = 0;
294 return cli_msg(appctx, LOG_WARNING, trash.area);
295 }
296
297 if (!cli_has_level(appctx, ACCESS_LVL_OPER))
298 return 1;
299
300 sink = sink_find(args[1]);
301 if (!sink)
302 return cli_err(appctx, "No such event sink");
303
304 if (sink->type != SINK_TYPE_BUFFER)
305 return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
306
Willy Tarreau1d181e42019-08-30 11:17:01 +0200307 for (arg = 2; *args[arg]; arg++) {
308 if (strcmp(args[arg], "-w") == 0)
309 appctx->ctx.cli.i0 |= 1; // wait mode
310 else if (strcmp(args[arg], "-n") == 0)
311 appctx->ctx.cli.i0 |= 2; // seek to new
312 else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
313 appctx->ctx.cli.i0 |= 3; // seek to new + wait
314 else
315 return cli_err(appctx, "unknown option");
316 }
Willy Tarreau9f830d72019-08-26 18:17:04 +0200317 return ring_attach_cli(sink->ctx.ring, appctx);
318}
319
Emeric Brun494c5052020-05-28 11:13:15 +0200320/* Pre-configures a ring proxy to emmit connections */
321void sink_setup_proxy(struct proxy *px)
322{
323 px->last_change = now.tv_sec;
324 px->cap = PR_CAP_FE | PR_CAP_BE;
325 px->maxconn = 0;
326 px->conn_retries = 1;
327 px->timeout.server = TICK_ETERNITY;
328 px->timeout.client = TICK_ETERNITY;
329 px->timeout.connect = TICK_ETERNITY;
330 px->accept = NULL;
331 px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
332 px->bind_proc = 0; /* will be filled by users */
333}
334
335/*
336 * IO Handler to handle message push to syslog tcp server
337 */
338static void sink_forward_io_handler(struct appctx *appctx)
339{
340 struct stream_interface *si = appctx->owner;
341 struct stream *s = si_strm(si);
342 struct sink *sink = strm_fe(s)->parent;
343 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
344 struct ring *ring = sink->ctx.ring;
345 struct buffer *buf = &ring->buf;
346 uint64_t msg_len;
347 size_t len, cnt, ofs;
348 int ret = 0;
349
350 /* if stopping was requested, close immediatly */
351 if (unlikely(stopping))
352 goto close;
353
354 /* for rex because it seems reset to timeout
355 * and we don't want expire on this case
356 * with a syslog server
357 */
358 si_oc(si)->rex = TICK_ETERNITY;
359 /* rto should not change but it seems the case */
360 si_oc(si)->rto = TICK_ETERNITY;
361
362 /* an error was detected */
363 if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
364 goto close;
365
366 /* con closed by server side */
367 if ((si_oc(si)->flags & CF_SHUTW))
368 goto close;
369
370 /* if the connection is not established, inform the stream that we want
371 * to be notified whenever the connection completes.
372 */
373 if (si_opposite(si)->state < SI_ST_EST) {
374 si_cant_get(si);
375 si_rx_conn_blk(si);
376 si_rx_endp_more(si);
377 return;
378 }
379
380 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
381 if (appctx != sft->appctx) {
382 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
383 goto close;
384 }
385 ofs = sft->ofs;
386
387 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
388 LIST_DEL_INIT(&appctx->wait_entry);
389 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
390
391 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
392
393 /* explanation for the initialization below: it would be better to do
394 * this in the parsing function but this would occasionally result in
395 * dropped events because we'd take a reference on the oldest message
396 * and keep it while being scheduled. Thus instead let's take it the
397 * first time we enter here so that we have a chance to pass many
398 * existing messages before grabbing a reference to a location. This
399 * value cannot be produced after initialization.
400 */
401 if (unlikely(ofs == ~0)) {
402 ofs = 0;
403
404 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
405 ofs += ring->ofs;
406 }
407
408 /* we were already there, adjust the offset to be relative to
409 * the buffer's head and remove us from the counter.
410 */
411 ofs -= ring->ofs;
412 BUG_ON(ofs >= buf->size);
413 HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
414
415 /* in this loop, ofs always points to the counter byte that precedes
416 * the message so that we can take our reference there if we have to
417 * stop before the end (ret=0).
418 */
419 if (si_opposite(si)->state == SI_ST_EST) {
420 ret = 1;
421 while (ofs + 1 < b_data(buf)) {
422 cnt = 1;
423 len = b_peek_varint(buf, ofs + cnt, &msg_len);
424 if (!len)
425 break;
426 cnt += len;
427 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
428
429 if (unlikely(msg_len + 1 > b_size(&trash))) {
430 /* too large a message to ever fit, let's skip it */
431 ofs += cnt + msg_len;
432 continue;
433 }
434
435 chunk_reset(&trash);
436 len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
437 trash.data += len;
438 trash.area[trash.data++] = '\n';
439
440 if (ci_putchk(si_ic(si), &trash) == -1) {
441 si_rx_room_blk(si);
442 ret = 0;
443 break;
444 }
445 ofs += cnt + msg_len;
446 }
447
448 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
449 ofs += ring->ofs;
450 sft->ofs = ofs;
451 }
452 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
453
454 if (ret) {
455 /* let's be woken up once new data arrive */
456 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
457 LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
458 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
459 si_rx_endp_done(si);
460 }
461 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
462
463 /* always drain data from server */
464 co_skip(si_oc(si), si_oc(si)->output);
465 return;
466
467close:
468 si_shutw(si);
469 si_shutr(si);
470 si_ic(si)->flags |= CF_READ_NULL;
471}
472
Emeric Brun97556472020-05-30 01:42:45 +0200473/*
474 * IO Handler to handle message push to syslog tcp server
475 * using octet counting frames
476 */
477static void sink_forward_oc_io_handler(struct appctx *appctx)
478{
479 struct stream_interface *si = appctx->owner;
480 struct stream *s = si_strm(si);
481 struct sink *sink = strm_fe(s)->parent;
482 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
483 struct ring *ring = sink->ctx.ring;
484 struct buffer *buf = &ring->buf;
485 uint64_t msg_len;
486 size_t len, cnt, ofs;
487 int ret = 0;
488 char *p;
489
490 /* if stopping was requested, close immediatly */
491 if (unlikely(stopping))
492 goto close;
493
494 /* for rex because it seems reset to timeout
495 * and we don't want expire on this case
496 * with a syslog server
497 */
498 si_oc(si)->rex = TICK_ETERNITY;
499 /* rto should not change but it seems the case */
500 si_oc(si)->rto = TICK_ETERNITY;
501
502 /* an error was detected */
503 if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
504 goto close;
505
506 /* con closed by server side */
507 if ((si_oc(si)->flags & CF_SHUTW))
508 goto close;
509
510 /* if the connection is not established, inform the stream that we want
511 * to be notified whenever the connection completes.
512 */
513 if (si_opposite(si)->state < SI_ST_EST) {
514 si_cant_get(si);
515 si_rx_conn_blk(si);
516 si_rx_endp_more(si);
517 return;
518 }
519
520 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
521 if (appctx != sft->appctx) {
522 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
523 goto close;
524 }
525 ofs = sft->ofs;
526
527 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
528 LIST_DEL_INIT(&appctx->wait_entry);
529 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
530
531 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
532
533 /* explanation for the initialization below: it would be better to do
534 * this in the parsing function but this would occasionally result in
535 * dropped events because we'd take a reference on the oldest message
536 * and keep it while being scheduled. Thus instead let's take it the
537 * first time we enter here so that we have a chance to pass many
538 * existing messages before grabbing a reference to a location. This
539 * value cannot be produced after initialization.
540 */
541 if (unlikely(ofs == ~0)) {
542 ofs = 0;
543
544 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
545 ofs += ring->ofs;
546 }
547
548 /* we were already there, adjust the offset to be relative to
549 * the buffer's head and remove us from the counter.
550 */
551 ofs -= ring->ofs;
552 BUG_ON(ofs >= buf->size);
553 HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
554
555 /* in this loop, ofs always points to the counter byte that precedes
556 * the message so that we can take our reference there if we have to
557 * stop before the end (ret=0).
558 */
559 if (si_opposite(si)->state == SI_ST_EST) {
560 ret = 1;
561 while (ofs + 1 < b_data(buf)) {
562 cnt = 1;
563 len = b_peek_varint(buf, ofs + cnt, &msg_len);
564 if (!len)
565 break;
566 cnt += len;
567 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
568
569 chunk_reset(&trash);
570 p = ulltoa(msg_len, trash.area, b_size(&trash));
571 if (p) {
572 trash.data = (p - trash.area) + 1;
573 *p = ' ';
574 }
575
576 if (!p || (trash.data + msg_len > b_size(&trash))) {
577 /* too large a message to ever fit, let's skip it */
578 ofs += cnt + msg_len;
579 continue;
580 }
581
582 trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
583
584 if (ci_putchk(si_ic(si), &trash) == -1) {
585 si_rx_room_blk(si);
586 ret = 0;
587 break;
588 }
589 ofs += cnt + msg_len;
590 }
591
592 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
593 ofs += ring->ofs;
594 sft->ofs = ofs;
595 }
596 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
597
598 if (ret) {
599 /* let's be woken up once new data arrive */
600 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
601 LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
602 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
603 si_rx_endp_done(si);
604 }
605 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
606
607 /* always drain data from server */
608 co_skip(si_oc(si), si_oc(si)->output);
609 return;
610
611close:
612 si_shutw(si);
613 si_shutr(si);
614 si_ic(si)->flags |= CF_READ_NULL;
615}
616
Emeric Brun494c5052020-05-28 11:13:15 +0200617void __sink_forward_session_deinit(struct sink_forward_target *sft)
618{
619 struct stream_interface *si;
620 struct stream *s;
621 struct sink *sink;
622
623 if (!sft->appctx)
624 return;
625
626 si = sft->appctx->owner;
627 if (!si)
628 return;
629
630 s = si_strm(si);
631 if (!s)
632 return;
633
634 sink = strm_fe(s)->parent;
635 if (!sink)
636 return;
637
638 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
639 LIST_DEL_INIT(&sft->appctx->wait_entry);
640 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
641
642 sft->appctx = NULL;
643 task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
644}
645
646
647static void sink_forward_session_release(struct appctx *appctx)
648{
649 struct sink_forward_target *sft = appctx->ctx.peers.ptr;
650
651 if (!sft)
652 return;
653
654 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
655 if (sft->appctx == appctx)
656 __sink_forward_session_deinit(sft);
657 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
658}
659
660static struct applet sink_forward_applet = {
661 .obj_type = OBJ_TYPE_APPLET,
662 .name = "<SINKFWD>", /* used for logging */
663 .fct = sink_forward_io_handler,
664 .release = sink_forward_session_release,
665};
666
Emeric Brun97556472020-05-30 01:42:45 +0200667static struct applet sink_forward_oc_applet = {
668 .obj_type = OBJ_TYPE_APPLET,
669 .name = "<SINKFWDOC>", /* used for logging */
670 .fct = sink_forward_oc_io_handler,
671 .release = sink_forward_session_release,
672};
673
Emeric Brun494c5052020-05-28 11:13:15 +0200674/*
675 * Create a new peer session in assigned state (connect will start automatically)
676 */
677static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
678{
679 struct proxy *p = sink->forward_px;
680 struct appctx *appctx;
681 struct session *sess;
682 struct stream *s;
Emeric Brun97556472020-05-30 01:42:45 +0200683 struct applet *applet = &sink_forward_applet;
684
685 if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
686 applet = &sink_forward_oc_applet;
Emeric Brun494c5052020-05-28 11:13:15 +0200687
Emeric Brun97556472020-05-30 01:42:45 +0200688 appctx = appctx_new(applet, tid_bit);
Emeric Brun494c5052020-05-28 11:13:15 +0200689 if (!appctx)
690 goto out_close;
691
692 appctx->ctx.sft.ptr = (void *)sft;
693
694 sess = session_new(p, NULL, &appctx->obj_type);
695 if (!sess) {
696 ha_alert("out of memory in peer_session_create().\n");
697 goto out_free_appctx;
698 }
699
700 if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
701 ha_alert("Failed to initialize stream in peer_session_create().\n");
702 goto out_free_sess;
703 }
704
705
706 s->target = &sft->srv->obj_type;
707 if (!sockaddr_alloc(&s->target_addr))
708 goto out_free_strm;
709 *s->target_addr = sft->srv->addr;
710 s->flags = SF_ASSIGNED|SF_ADDR_SET;
711 s->si[1].flags |= SI_FL_NOLINGER;
712
713 s->do_log = NULL;
714 s->uniq_id = 0;
715
716 s->res.flags |= CF_READ_DONTWAIT;
717 /* for rto and rex to eternity to not expire on idle recv:
718 * We are using a syslog server.
719 */
720 s->res.rto = TICK_ETERNITY;
721 s->res.rex = TICK_ETERNITY;
722 sft->appctx = appctx;
723 task_wakeup(s->task, TASK_WOKEN_INIT);
724 return appctx;
725
726 /* Error unrolling */
727 out_free_strm:
728 LIST_DEL(&s->list);
729 pool_free(pool_head_stream, s);
730 out_free_sess:
731 session_free(sess);
732 out_free_appctx:
733 appctx_free(appctx);
734 out_close:
735 return NULL;
736}
737
738/*
739 * Task to handle connctions to forward servers
740 */
741static struct task *process_sink_forward(struct task * task, void *context, unsigned short state)
742{
743 struct sink *sink = (struct sink *)context;
744 struct sink_forward_target *sft = sink->sft;
745
746 task->expire = TICK_ETERNITY;
747
748 if (!stopping) {
749 while (sft) {
750 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
751 /* if appctx is NULL, start a new session */
752 if (!sft->appctx)
753 sft->appctx = sink_forward_session_create(sink, sft);
754 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
755 sft = sft->next;
756 }
757 }
758 else {
759 while (sft) {
760 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
761 /* awake applet to perform a clean close */
762 if (sft->appctx)
763 appctx_wakeup(sft->appctx);
764 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
765 sft = sft->next;
766 }
767 }
768
769 return task;
770}
771/*
772 * Init task to manage connctions to forward servers
773 *
774 * returns 0 in case of error.
775 */
776int sink_init_forward(struct sink *sink)
777{
778 sink->forward_task = task_new(MAX_THREADS_MASK);
779 if (!sink->forward_task)
780 return 0;
781
782 sink->forward_task->process = process_sink_forward;
783 sink->forward_task->context = (void *)sink;
784 sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
785 task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
786 return 1;
787}
Emeric Brun99c453d2020-05-25 15:01:04 +0200788/*
789 * Parse "ring" section and create corresponding sink buffer.
790 *
791 * The function returns 0 in success case, otherwise, it returns error
792 * flags.
793 */
794int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
795{
796 int err_code = 0;
797 const char *inv;
798 size_t size = BUFSIZE;
Emeric Brun494c5052020-05-28 11:13:15 +0200799 struct proxy *p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200800
801 if (strcmp(args[0], "ring") == 0) { /* new peers section */
802 if (!*args[1]) {
803 ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum);
804 err_code |= ERR_ALERT | ERR_FATAL;
805 goto err;
806 }
807
808 inv = invalid_char(args[1]);
809 if (inv) {
810 ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv);
811 err_code |= ERR_ALERT | ERR_FATAL;
812 goto err;
813 }
814
815 if (sink_find(args[1])) {
816 ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]);
817 err_code |= ERR_ALERT | ERR_FATAL;
818 goto err;
819 }
820
821 cfg_sink = sink_new_buf(args[1], args[1] , SINK_FMT_RAW, size);
822 if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) {
823 ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]);
824 err_code |= ERR_ALERT | ERR_FATAL;
825 goto err;
826 }
Emeric Brun494c5052020-05-28 11:13:15 +0200827
828 /* allocate new proxy to handle forwards */
829 p = calloc(1, sizeof *p);
830 if (!p) {
831 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
832 err_code |= ERR_ALERT | ERR_FATAL;
833 goto err;
834 }
835
836 init_new_proxy(p);
837 sink_setup_proxy(p);
838 p->parent = cfg_sink;
839 p->id = strdup(args[1]);
840 p->conf.args.file = p->conf.file = strdup(file);
841 p->conf.args.line = p->conf.line = linenum;
842 cfg_sink->forward_px = p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200843 }
844 else if (strcmp(args[0], "size") == 0) {
845 size = atol(args[1]);
846 if (!size) {
847 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
848 err_code |= ERR_ALERT | ERR_FATAL;
849 goto err;
850 }
851
852 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)
853 || !ring_resize(cfg_sink->ctx.ring, size)) {
854 ha_alert("parsing [%s:%d] : fail to set sink buffer size '%s'.\n", file, linenum, args[1]);
855 err_code |= ERR_ALERT | ERR_FATAL;
856 goto err;
857 }
858 }
Emeric Brun494c5052020-05-28 11:13:15 +0200859 else if (strcmp(args[0],"server") == 0) {
860 err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL, 1, 0);
861 }
862 else if (strcmp(args[0],"timeout") == 0) {
863 if (!cfg_sink || !cfg_sink->forward_px) {
864 ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
865 err_code |= ERR_ALERT | ERR_FATAL;
866 goto err;
867 }
868
869 if (strcmp(args[1], "connect") == 0 ||
870 strcmp(args[1], "server") == 0) {
871 const char *res;
872 unsigned int tout;
873
874 if (!*args[2]) {
875 ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
876 file, linenum, args[0], args[1]);
877 err_code |= ERR_ALERT | ERR_FATAL;
878 goto err;
879 }
880 res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
881 if (res == PARSE_TIME_OVER) {
882 ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
883 file, linenum, args[2], args[0], args[1]);
884 err_code |= ERR_ALERT | ERR_FATAL;
885 goto err;
886 }
887 else if (res == PARSE_TIME_UNDER) {
888 ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
889 file, linenum, args[2], args[0], args[1]);
890 err_code |= ERR_ALERT | ERR_FATAL;
891 goto err;
892 }
893 else if (res) {
894 ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
895 file, linenum, *res, args[0], args[1]);
896 err_code |= ERR_ALERT | ERR_FATAL;
897 goto err;
898 }
899 if (args[1][2] == 'c')
900 cfg_sink->forward_px->timeout.connect = tout;
901 else
902 cfg_sink->forward_px->timeout.server = tout;
903 }
904 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200905 else if (strcmp(args[0],"format") == 0) {
906 if (!cfg_sink) {
907 ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
908 err_code |= ERR_ALERT | ERR_FATAL;
909 goto err;
910 }
911
912 if (strcmp(args[1], "raw") == 0) {
913 cfg_sink->fmt = SINK_FMT_RAW;
914 }
915 else if (strcmp(args[1], "short") == 0) {
916 cfg_sink->fmt = SINK_FMT_SHORT;
917 }
918 else if (strcmp(args[1], "iso") == 0) {
919 cfg_sink->fmt = SINK_FMT_ISO;
920 }
921 else if (strcmp(args[1], "timed") == 0) {
922 cfg_sink->fmt = SINK_FMT_TIMED;
923 }
924 else if (strcmp(args[1], "rfc3164") == 0) {
925 cfg_sink->fmt = SINK_FMT_RFC3164;
926 }
927 else if (strcmp(args[1], "rfc5424") == 0) {
928 cfg_sink->fmt = SINK_FMT_RFC5424;
929 }
930 else {
931 ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]);
932 err_code |= ERR_ALERT | ERR_FATAL;
933 goto err;
934 }
935 }
936 else if (strcmp(args[0],"maxlen") == 0) {
937 if (!cfg_sink) {
938 ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]);
939 err_code |= ERR_ALERT | ERR_FATAL;
940 goto err;
941 }
942
943 cfg_sink->maxlen = atol(args[1]);
944 if (!cfg_sink->maxlen) {
945 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
946 err_code |= ERR_ALERT | ERR_FATAL;
947 goto err;
948 }
949 }
950 else if (strcmp(args[0],"description") == 0) {
951 if (!cfg_sink) {
952 ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]);
953 err_code |= ERR_ALERT | ERR_FATAL;
954 goto err;
955 }
956
957 if (!*args[1]) {
958 ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum);
959 err_code |= ERR_ALERT | ERR_FATAL;
960 goto err;
961 }
962
963 free(cfg_sink->desc);
964
965 cfg_sink->desc = strdup(args[1]);
966 if (!cfg_sink->desc) {
967 ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]);
968 err_code |= ERR_ALERT | ERR_FATAL;
969 goto err;
970 }
971 }
Emeric Brun9f2ff3a2020-05-29 15:47:52 +0200972 else {
973 ha_alert("parsing [%s:%d] : unknown statement '%s'.\n", file, linenum, args[0]);
974 err_code |= ERR_ALERT | ERR_FATAL;
975 goto err;
976 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200977
978err:
979 return err_code;
980}
981
982/*
983 * Post parsing "ring" section.
984 *
985 * The function returns 0 in success case, otherwise, it returns error
986 * flags.
987 */
988int cfg_post_parse_ring()
989{
990 int err_code = 0;
Emeric Brun494c5052020-05-28 11:13:15 +0200991 struct server *srv;
Emeric Brun99c453d2020-05-25 15:01:04 +0200992
993 if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
994 if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
995 ha_warning("ring '%s' event max length '%u' exceeds size, forced to size '%lu'.\n",
Willy Tarreau570a22b2020-06-02 12:00:46 +0200996 cfg_sink->name, cfg_sink->maxlen, (unsigned long)b_size(&cfg_sink->ctx.ring->buf));
Emeric Brun99c453d2020-05-25 15:01:04 +0200997 cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
998 err_code |= ERR_ALERT;
999 }
Emeric Brun494c5052020-05-28 11:13:15 +02001000
1001 /* prepare forward server descriptors */
1002 if (cfg_sink->forward_px) {
1003 srv = cfg_sink->forward_px->srv;
1004 while (srv) {
1005 struct sink_forward_target *sft;
1006 /* init ssl if needed */
1007 if (srv->use_ssl == 1 && xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->prepare_srv) {
1008 if (xprt_get(XPRT_SSL)->prepare_srv(srv)) {
1009 ha_alert("unable to prepare SSL for server '%s' in ring '%s'.\n", srv->id, cfg_sink->name);
1010 err_code |= ERR_ALERT | ERR_FATAL;
1011 }
1012 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001013
Emeric Brun494c5052020-05-28 11:13:15 +02001014 /* allocate sink_forward_target descriptor */
1015 sft = calloc(1, sizeof(*sft));
1016 if (!sft) {
1017 ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
1018 err_code |= ERR_ALERT | ERR_FATAL;
1019 break;
1020 }
1021 sft->srv = srv;
1022 sft->appctx = NULL;
1023 sft->ofs = ~0; /* init ring offset */
1024 sft->next = cfg_sink->sft;
1025 HA_SPIN_INIT(&sft->lock);
1026
1027 /* mark server attached to the ring */
1028 if (!ring_attach(cfg_sink->ctx.ring)) {
1029 ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
1030 err_code |= ERR_ALERT | ERR_FATAL;
1031 }
1032 cfg_sink->sft = sft;
1033 srv = srv->next;
1034 }
1035 sink_init_forward(cfg_sink);
1036 }
1037 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001038 cfg_sink = NULL;
1039
1040 return err_code;
1041}
1042
1043/* resolve sink names at end of config. Returns 0 on success otherwise error
1044 * flags.
1045*/
1046int post_sink_resolve()
1047{
1048 int err_code = 0;
1049 struct logsrv *logsrv, *logb;
1050 struct sink *sink;
1051 struct proxy *px;
1052
1053 list_for_each_entry_safe(logsrv, logb, &global.logsrvs, list) {
1054 if (logsrv->type == LOG_TARGET_BUFFER) {
1055 sink = sink_find(logsrv->ring_name);
1056 if (!sink || sink->type != SINK_TYPE_BUFFER) {
1057 ha_alert("global log server uses unkown ring named '%s'.\n", logsrv->ring_name);
1058 err_code |= ERR_ALERT | ERR_FATAL;
1059 }
1060 logsrv->sink = sink;
1061 }
1062 }
1063
1064 for (px = proxies_list; px; px = px->next) {
1065 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1066 if (logsrv->type == LOG_TARGET_BUFFER) {
1067 sink = sink_find(logsrv->ring_name);
1068 if (!sink || sink->type != SINK_TYPE_BUFFER) {
1069 ha_alert("proxy '%s' log server uses unkown ring named '%s'.\n", px->id, logsrv->ring_name);
1070 err_code |= ERR_ALERT | ERR_FATAL;
1071 }
1072 logsrv->sink = sink;
1073 }
1074 }
1075 }
1076 return err_code;
1077}
1078
1079
Willy Tarreau973e6622019-08-20 11:57:52 +02001080static void sink_init()
1081{
1082 sink_new_fd("stdout", "standard output (fd#1)", SINK_FMT_RAW, 1);
1083 sink_new_fd("stderr", "standard output (fd#2)", SINK_FMT_RAW, 2);
Willy Tarreauf8340e32019-09-26 08:05:15 +02001084 sink_new_buf("buf0", "in-memory ring buffer", SINK_FMT_TIMED, 1048576);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001085}
1086
1087static void sink_deinit()
1088{
1089 struct sink *sink, *sb;
1090
1091 list_for_each_entry_safe(sink, sb, &sink_list, sink_list) {
1092 if (sink->type == SINK_TYPE_BUFFER)
1093 ring_free(sink->ctx.ring);
1094 LIST_DEL(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +02001095 free(sink->name);
1096 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001097 free(sink);
1098 }
Willy Tarreau973e6622019-08-20 11:57:52 +02001099}
1100
/* Hook the sink life cycle into the process: sink_init() is registered as an
 * init call for the STG_REGISTER stage, and sink_deinit() is registered as a
 * post-deinit callback.
 */
1101INITCALL0(STG_REGISTER, sink_init);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001102REGISTER_POST_DEINIT(sink_deinit);
Willy Tarreau973e6622019-08-20 11:57:52 +02001103
/* CLI keywords provided by this file. The only command is
 * "show events [<sink>]", parsed by cli_parse_show_events(); the remaining
 * two handlers of the entry are left NULL. The table is terminated by an
 * empty entry.
 */
Willy Tarreau9f830d72019-08-26 18:17:04 +02001104static struct cli_kw_list cli_kws = {{ },{
Willy Tarreaufcf94982019-11-15 15:07:21 +01001105 { { "show", "events", NULL }, "show events [<sink>] : show event sink state", cli_parse_show_events, NULL, NULL },
Willy Tarreau9f830d72019-08-26 18:17:04 +02001106 {{},}
1107}};
1108
/* pass the keyword list above to cli_register_kw() as an init call for the
 * STG_REGISTER stage.
 */
1109INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);
1110
Emeric Brun99c453d2020-05-25 15:01:04 +02001111/* config parsers for this section: the "ring" section is registered with
 * cfg_parse_ring() as its parser and cfg_post_parse_ring() as its
 * post-section parser. post_sink_resolve(), which resolves the ring names
 * used by log servers, is registered as a post-check callback.
 */
1112REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring);
1113REGISTER_POST_CHECK(post_sink_resolve);
1114
Willy Tarreau67b5a162019-08-11 16:38:56 +02001115/*
1116 * Local variables:
1117 * c-indent-level: 8
1118 * c-basic-offset: 8
1119 * End:
1120 */