blob: 09a302633f82f903769b9523304574279d4d0743 [file] [log] [blame]
Willy Tarreau67b5a162019-08-11 16:38:56 +02001/*
2 * Event sink management
3 *
4 * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation, version 2.1
9 * exclusively.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
Willy Tarreau36979d92020-06-05 17:27:29 +020021#include <import/ist.h>
Willy Tarreau4c7e4b72020-05-27 12:58:42 +020022#include <haproxy/api.h>
Willy Tarreau6be78492020-06-05 00:00:29 +020023#include <haproxy/cfgparse.h>
Willy Tarreau83487a82020-06-04 20:19:54 +020024#include <haproxy/cli.h>
Willy Tarreau36979d92020-06-05 17:27:29 +020025#include <haproxy/errors.h>
Willy Tarreau853b2972020-05-27 18:01:47 +020026#include <haproxy/list.h>
Willy Tarreauaeed4a82020-06-04 22:01:04 +020027#include <haproxy/log.h>
Willy Tarreaud2ad57c2020-06-03 19:43:35 +020028#include <haproxy/ring.h>
Willy Tarreau3727a8a2020-06-04 17:37:26 +020029#include <haproxy/signal.h>
Willy Tarreauba2f73d2020-06-03 20:02:28 +020030#include <haproxy/sink.h>
Willy Tarreau5e539c92020-06-04 20:45:39 +020031#include <haproxy/stream_interface.h>
Willy Tarreaub2551052020-06-09 09:07:15 +020032#include <haproxy/time.h>
Willy Tarreau67b5a162019-08-11 16:38:56 +020033
34struct list sink_list = LIST_HEAD_INIT(sink_list);
35
Emeric Brun99c453d2020-05-25 15:01:04 +020036struct sink *cfg_sink;
37
Willy Tarreau67b5a162019-08-11 16:38:56 +020038struct sink *sink_find(const char *name)
39{
40 struct sink *sink;
41
42 list_for_each_entry(sink, &sink_list, sink_list)
43 if (strcmp(sink->name, name) == 0)
44 return sink;
45 return NULL;
46}
47
48/* creates a new sink and adds it to the list, it's still generic and not fully
49 * initialized. Returns NULL on allocation failure. If another one already
50 * exists with the same name, it will be returned. The caller can detect it as
51 * a newly created one has type SINK_TYPE_NEW.
52 */
Emeric Brun54648852020-07-06 15:54:06 +020053static struct sink *__sink_new(const char *name, const char *desc, int fmt)
Willy Tarreau67b5a162019-08-11 16:38:56 +020054{
55 struct sink *sink;
56
57 sink = sink_find(name);
58 if (sink)
59 goto end;
60
Emeric Brun494c5052020-05-28 11:13:15 +020061 sink = calloc(1, sizeof(*sink));
Willy Tarreau67b5a162019-08-11 16:38:56 +020062 if (!sink)
63 goto end;
64
Emeric Brun99c453d2020-05-25 15:01:04 +020065 sink->name = strdup(name);
66 sink->desc = strdup(desc);
Willy Tarreau67b5a162019-08-11 16:38:56 +020067 sink->fmt = fmt;
68 sink->type = SINK_TYPE_NEW;
Christopher Fauleta63a5c22019-11-15 15:10:12 +010069 sink->maxlen = BUFSIZE;
Willy Tarreau67b5a162019-08-11 16:38:56 +020070 /* address will be filled by the caller if needed */
Willy Tarreau973e6622019-08-20 11:57:52 +020071 sink->ctx.fd = -1;
Willy Tarreau67b5a162019-08-11 16:38:56 +020072 sink->ctx.dropped = 0;
73 HA_RWLOCK_INIT(&sink->ctx.lock);
74 LIST_ADDQ(&sink_list, &sink->sink_list);
75 end:
76 return sink;
77}
78
Willy Tarreau973e6622019-08-20 11:57:52 +020079/* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>,
80 * and description <desc>. Returns NULL on allocation failure or conflict.
81 * Perfect duplicates are merged (same type, fd, and name).
82 */
Emeric Brun54648852020-07-06 15:54:06 +020083struct sink *sink_new_fd(const char *name, const char *desc, enum log_fmt fmt, int fd)
Willy Tarreau973e6622019-08-20 11:57:52 +020084{
85 struct sink *sink;
86
87 sink = __sink_new(name, desc, fmt);
88 if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd))
89 goto end;
90
91 if (sink->type != SINK_TYPE_NEW) {
92 sink = NULL;
93 goto end;
94 }
95
96 sink->type = SINK_TYPE_FD;
97 sink->ctx.fd = fd;
98 end:
99 return sink;
100}
101
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200102/* creates a sink called <name> of type BUF of size <size>, format <fmt>,
103 * and description <desc>. Returns NULL on allocation failure or conflict.
104 * Perfect duplicates are merged (same type and name). If sizes differ, the
105 * largest one is kept.
106 */
Emeric Brun54648852020-07-06 15:54:06 +0200107struct sink *sink_new_buf(const char *name, const char *desc, enum log_fmt fmt, size_t size)
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200108{
109 struct sink *sink;
110
111 sink = __sink_new(name, desc, fmt);
112 if (!sink)
113 goto fail;
114
115 if (sink->type == SINK_TYPE_BUFFER) {
116 /* such a buffer already exists, we may have to resize it */
117 if (!ring_resize(sink->ctx.ring, size))
118 goto fail;
119 goto end;
120 }
121
122 if (sink->type != SINK_TYPE_NEW) {
123 /* already exists of another type */
124 goto fail;
125 }
126
127 sink->ctx.ring = ring_new(size);
128 if (!sink->ctx.ring) {
129 LIST_DEL(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +0200130 free(sink->name);
131 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200132 free(sink);
133 goto fail;
134 }
135
136 sink->type = SINK_TYPE_BUFFER;
137 end:
138 return sink;
139 fail:
140 return NULL;
141}
142
Willy Tarreau67b5a162019-08-11 16:38:56 +0200143/* tries to send <nmsg> message parts (up to 8, ignored above) from message
Ilya Shipitsin46a030c2020-07-05 16:36:08 +0500144 * array <msg> to sink <sink>. Formatting according to the sink's preference is
Willy Tarreau8f240232019-08-27 16:41:06 +0200145 * done here. Lost messages are NOT accounted for. It is preferable to call
146 * sink_write() instead which will also try to emit the number of dropped
147 * messages when there are any. It returns >0 if it could write anything,
148 * <=0 otherwise.
Willy Tarreau67b5a162019-08-11 16:38:56 +0200149 */
Emeric Brun54648852020-07-06 15:54:06 +0200150 ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg,
151 int level, int facility, struct ist *metadata)
152 {
153 struct ist *pfx = NULL;
Willy Tarreaua1426de2019-08-27 14:21:02 +0200154 size_t npfx = 0;
Emeric Brunbd163812020-05-06 14:33:46 +0200155
Emeric Brun54648852020-07-06 15:54:06 +0200156 if (sink->fmt == LOG_FORMAT_RAW)
Emeric Brunbd163812020-05-06 14:33:46 +0200157 goto send;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200158
Emeric Brun54648852020-07-06 15:54:06 +0200159 pfx = build_log_header(sink->fmt, level, facility, metadata, &npfx);
Emeric Brunbd163812020-05-06 14:33:46 +0200160
161send:
Willy Tarreau973e6622019-08-20 11:57:52 +0200162 if (sink->type == SINK_TYPE_FD) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200163 return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200164 }
165 else if (sink->type == SINK_TYPE_BUFFER) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200166 return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
Willy Tarreau973e6622019-08-20 11:57:52 +0200167 }
Willy Tarreau8f240232019-08-27 16:41:06 +0200168 return 0;
169}
Willy Tarreau67b5a162019-08-11 16:38:56 +0200170
Willy Tarreau8f240232019-08-27 16:41:06 +0200171/* Tries to emit a message indicating the number of dropped events. In case of
172 * success, the amount of drops is reduced by as much. It's supposed to be
173 * called under an exclusive lock on the sink to avoid multiple produces doing
174 * the same. On success, >0 is returned, otherwise <=0 on failure.
175 */
Emeric Brun54648852020-07-06 15:54:06 +0200176int sink_announce_dropped(struct sink *sink, int facility)
Willy Tarreau8f240232019-08-27 16:41:06 +0200177{
Emeric Brun54648852020-07-06 15:54:06 +0200178 static THREAD_LOCAL struct ist metadata[LOG_META_FIELDS];
179 static THREAD_LOCAL pid_t curr_pid;
180 static THREAD_LOCAL char pidstr[16];
Willy Tarreau8f240232019-08-27 16:41:06 +0200181 unsigned int dropped;
182 struct buffer msg;
183 struct ist msgvec[1];
184 char logbuf[64];
185
186 while (unlikely((dropped = sink->ctx.dropped) > 0)) {
187 chunk_init(&msg, logbuf, sizeof(logbuf));
188 chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
189 msgvec[0] = ist2(msg.area, msg.data);
Emeric Brunbd163812020-05-06 14:33:46 +0200190
Emeric Brun54648852020-07-06 15:54:06 +0200191 if (!metadata[LOG_META_HOST].len) {
192 if (global.log_send_hostname)
193 metadata[LOG_META_HOST] = ist2(global.log_send_hostname, strlen(global.log_send_hostname));
194 else
195 metadata[LOG_META_HOST] = ist2(hostname, strlen(hostname));
196 }
197
198 if (!metadata[LOG_META_TAG].len)
199 metadata[LOG_META_TAG] = ist2(global.log_tag.area, global.log_tag.data);
200
201 if (unlikely(curr_pid != getpid()))
202 metadata[LOG_META_PID].len = 0;
203
204 if (!metadata[LOG_META_PID].len) {
205 curr_pid = getpid();
206 ltoa_o(curr_pid, pidstr, sizeof(pidstr));
207 metadata[LOG_META_PID] = ist2(pidstr, strlen(pidstr));
208 }
209
210 if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, metadata) <= 0)
Willy Tarreau8f240232019-08-27 16:41:06 +0200211 return 0;
212 /* success! */
213 HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
214 }
215 return 1;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200216}
217
Willy Tarreau9f830d72019-08-26 18:17:04 +0200218/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
219static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
220{
221 struct sink *sink;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200222 int arg;
Willy Tarreau9f830d72019-08-26 18:17:04 +0200223
224 args++; // make args[1] the 1st arg
225
226 if (!*args[1]) {
227 /* no arg => report the list of supported sink */
Willy Tarreau1d181e42019-08-30 11:17:01 +0200228 chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
Willy Tarreau9f830d72019-08-26 18:17:04 +0200229 list_for_each_entry(sink, &sink_list, sink_list) {
230 chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n",
231 sink->name,
232 sink->type == SINK_TYPE_NEW ? "init" :
233 sink->type == SINK_TYPE_FD ? "fd" :
234 sink->type == SINK_TYPE_BUFFER ? "buffer" : "?",
235 sink->ctx.dropped, sink->desc);
236 }
237
238 trash.area[trash.data] = 0;
239 return cli_msg(appctx, LOG_WARNING, trash.area);
240 }
241
242 if (!cli_has_level(appctx, ACCESS_LVL_OPER))
243 return 1;
244
245 sink = sink_find(args[1]);
246 if (!sink)
247 return cli_err(appctx, "No such event sink");
248
249 if (sink->type != SINK_TYPE_BUFFER)
250 return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
251
Willy Tarreau1d181e42019-08-30 11:17:01 +0200252 for (arg = 2; *args[arg]; arg++) {
253 if (strcmp(args[arg], "-w") == 0)
254 appctx->ctx.cli.i0 |= 1; // wait mode
255 else if (strcmp(args[arg], "-n") == 0)
256 appctx->ctx.cli.i0 |= 2; // seek to new
257 else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
258 appctx->ctx.cli.i0 |= 3; // seek to new + wait
259 else
260 return cli_err(appctx, "unknown option");
261 }
Willy Tarreau9f830d72019-08-26 18:17:04 +0200262 return ring_attach_cli(sink->ctx.ring, appctx);
263}
264
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500265/* Pre-configures a ring proxy to emit connections */
Emeric Brun494c5052020-05-28 11:13:15 +0200266void sink_setup_proxy(struct proxy *px)
267{
268 px->last_change = now.tv_sec;
269 px->cap = PR_CAP_FE | PR_CAP_BE;
270 px->maxconn = 0;
271 px->conn_retries = 1;
272 px->timeout.server = TICK_ETERNITY;
273 px->timeout.client = TICK_ETERNITY;
274 px->timeout.connect = TICK_ETERNITY;
275 px->accept = NULL;
276 px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
277 px->bind_proc = 0; /* will be filled by users */
278}
279
280/*
281 * IO Handler to handle message push to syslog tcp server
282 */
283static void sink_forward_io_handler(struct appctx *appctx)
284{
285 struct stream_interface *si = appctx->owner;
286 struct stream *s = si_strm(si);
287 struct sink *sink = strm_fe(s)->parent;
288 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
289 struct ring *ring = sink->ctx.ring;
290 struct buffer *buf = &ring->buf;
291 uint64_t msg_len;
292 size_t len, cnt, ofs;
293 int ret = 0;
294
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500295 /* if stopping was requested, close immediately */
Emeric Brun494c5052020-05-28 11:13:15 +0200296 if (unlikely(stopping))
297 goto close;
298
299 /* for rex because it seems reset to timeout
300 * and we don't want expire on this case
301 * with a syslog server
302 */
303 si_oc(si)->rex = TICK_ETERNITY;
304 /* rto should not change but it seems the case */
305 si_oc(si)->rto = TICK_ETERNITY;
306
307 /* an error was detected */
308 if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
309 goto close;
310
311 /* con closed by server side */
312 if ((si_oc(si)->flags & CF_SHUTW))
313 goto close;
314
315 /* if the connection is not established, inform the stream that we want
316 * to be notified whenever the connection completes.
317 */
318 if (si_opposite(si)->state < SI_ST_EST) {
319 si_cant_get(si);
320 si_rx_conn_blk(si);
321 si_rx_endp_more(si);
322 return;
323 }
324
325 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
326 if (appctx != sft->appctx) {
327 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
328 goto close;
329 }
330 ofs = sft->ofs;
331
332 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
333 LIST_DEL_INIT(&appctx->wait_entry);
334 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
335
336 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
337
338 /* explanation for the initialization below: it would be better to do
339 * this in the parsing function but this would occasionally result in
340 * dropped events because we'd take a reference on the oldest message
341 * and keep it while being scheduled. Thus instead let's take it the
342 * first time we enter here so that we have a chance to pass many
343 * existing messages before grabbing a reference to a location. This
344 * value cannot be produced after initialization.
345 */
346 if (unlikely(ofs == ~0)) {
347 ofs = 0;
348
349 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
350 ofs += ring->ofs;
351 }
352
353 /* we were already there, adjust the offset to be relative to
354 * the buffer's head and remove us from the counter.
355 */
356 ofs -= ring->ofs;
357 BUG_ON(ofs >= buf->size);
358 HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
359
360 /* in this loop, ofs always points to the counter byte that precedes
361 * the message so that we can take our reference there if we have to
362 * stop before the end (ret=0).
363 */
364 if (si_opposite(si)->state == SI_ST_EST) {
365 ret = 1;
366 while (ofs + 1 < b_data(buf)) {
367 cnt = 1;
368 len = b_peek_varint(buf, ofs + cnt, &msg_len);
369 if (!len)
370 break;
371 cnt += len;
372 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
373
374 if (unlikely(msg_len + 1 > b_size(&trash))) {
375 /* too large a message to ever fit, let's skip it */
376 ofs += cnt + msg_len;
377 continue;
378 }
379
380 chunk_reset(&trash);
381 len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
382 trash.data += len;
383 trash.area[trash.data++] = '\n';
384
385 if (ci_putchk(si_ic(si), &trash) == -1) {
386 si_rx_room_blk(si);
387 ret = 0;
388 break;
389 }
390 ofs += cnt + msg_len;
391 }
392
393 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
394 ofs += ring->ofs;
395 sft->ofs = ofs;
396 }
397 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
398
399 if (ret) {
400 /* let's be woken up once new data arrive */
401 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
402 LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
403 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
404 si_rx_endp_done(si);
405 }
406 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
407
408 /* always drain data from server */
409 co_skip(si_oc(si), si_oc(si)->output);
410 return;
411
412close:
413 si_shutw(si);
414 si_shutr(si);
415 si_ic(si)->flags |= CF_READ_NULL;
416}
417
Emeric Brun97556472020-05-30 01:42:45 +0200418/*
419 * IO Handler to handle message push to syslog tcp server
420 * using octet counting frames
421 */
422static void sink_forward_oc_io_handler(struct appctx *appctx)
423{
424 struct stream_interface *si = appctx->owner;
425 struct stream *s = si_strm(si);
426 struct sink *sink = strm_fe(s)->parent;
427 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
428 struct ring *ring = sink->ctx.ring;
429 struct buffer *buf = &ring->buf;
430 uint64_t msg_len;
431 size_t len, cnt, ofs;
432 int ret = 0;
433 char *p;
434
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500435 /* if stopping was requested, close immediately */
Emeric Brun97556472020-05-30 01:42:45 +0200436 if (unlikely(stopping))
437 goto close;
438
439 /* for rex because it seems reset to timeout
440 * and we don't want expire on this case
441 * with a syslog server
442 */
443 si_oc(si)->rex = TICK_ETERNITY;
444 /* rto should not change but it seems the case */
445 si_oc(si)->rto = TICK_ETERNITY;
446
447 /* an error was detected */
448 if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
449 goto close;
450
451 /* con closed by server side */
452 if ((si_oc(si)->flags & CF_SHUTW))
453 goto close;
454
455 /* if the connection is not established, inform the stream that we want
456 * to be notified whenever the connection completes.
457 */
458 if (si_opposite(si)->state < SI_ST_EST) {
459 si_cant_get(si);
460 si_rx_conn_blk(si);
461 si_rx_endp_more(si);
462 return;
463 }
464
465 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
466 if (appctx != sft->appctx) {
467 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
468 goto close;
469 }
470 ofs = sft->ofs;
471
472 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
473 LIST_DEL_INIT(&appctx->wait_entry);
474 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
475
476 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
477
478 /* explanation for the initialization below: it would be better to do
479 * this in the parsing function but this would occasionally result in
480 * dropped events because we'd take a reference on the oldest message
481 * and keep it while being scheduled. Thus instead let's take it the
482 * first time we enter here so that we have a chance to pass many
483 * existing messages before grabbing a reference to a location. This
484 * value cannot be produced after initialization.
485 */
486 if (unlikely(ofs == ~0)) {
487 ofs = 0;
488
489 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
490 ofs += ring->ofs;
491 }
492
493 /* we were already there, adjust the offset to be relative to
494 * the buffer's head and remove us from the counter.
495 */
496 ofs -= ring->ofs;
497 BUG_ON(ofs >= buf->size);
498 HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
499
500 /* in this loop, ofs always points to the counter byte that precedes
501 * the message so that we can take our reference there if we have to
502 * stop before the end (ret=0).
503 */
504 if (si_opposite(si)->state == SI_ST_EST) {
505 ret = 1;
506 while (ofs + 1 < b_data(buf)) {
507 cnt = 1;
508 len = b_peek_varint(buf, ofs + cnt, &msg_len);
509 if (!len)
510 break;
511 cnt += len;
512 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
513
514 chunk_reset(&trash);
515 p = ulltoa(msg_len, trash.area, b_size(&trash));
516 if (p) {
517 trash.data = (p - trash.area) + 1;
518 *p = ' ';
519 }
520
521 if (!p || (trash.data + msg_len > b_size(&trash))) {
522 /* too large a message to ever fit, let's skip it */
523 ofs += cnt + msg_len;
524 continue;
525 }
526
527 trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
528
529 if (ci_putchk(si_ic(si), &trash) == -1) {
530 si_rx_room_blk(si);
531 ret = 0;
532 break;
533 }
534 ofs += cnt + msg_len;
535 }
536
537 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
538 ofs += ring->ofs;
539 sft->ofs = ofs;
540 }
541 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
542
543 if (ret) {
544 /* let's be woken up once new data arrive */
545 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
546 LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
547 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
548 si_rx_endp_done(si);
549 }
550 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
551
552 /* always drain data from server */
553 co_skip(si_oc(si), si_oc(si)->output);
554 return;
555
556close:
557 si_shutw(si);
558 si_shutr(si);
559 si_ic(si)->flags |= CF_READ_NULL;
560}
561
Emeric Brun494c5052020-05-28 11:13:15 +0200562void __sink_forward_session_deinit(struct sink_forward_target *sft)
563{
564 struct stream_interface *si;
565 struct stream *s;
566 struct sink *sink;
567
568 if (!sft->appctx)
569 return;
570
571 si = sft->appctx->owner;
572 if (!si)
573 return;
574
575 s = si_strm(si);
576 if (!s)
577 return;
578
579 sink = strm_fe(s)->parent;
580 if (!sink)
581 return;
582
583 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
584 LIST_DEL_INIT(&sft->appctx->wait_entry);
585 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
586
587 sft->appctx = NULL;
588 task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
589}
590
591
592static void sink_forward_session_release(struct appctx *appctx)
593{
594 struct sink_forward_target *sft = appctx->ctx.peers.ptr;
595
596 if (!sft)
597 return;
598
599 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
600 if (sft->appctx == appctx)
601 __sink_forward_session_deinit(sft);
602 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
603}
604
605static struct applet sink_forward_applet = {
606 .obj_type = OBJ_TYPE_APPLET,
607 .name = "<SINKFWD>", /* used for logging */
608 .fct = sink_forward_io_handler,
609 .release = sink_forward_session_release,
610};
611
Emeric Brun97556472020-05-30 01:42:45 +0200612static struct applet sink_forward_oc_applet = {
613 .obj_type = OBJ_TYPE_APPLET,
614 .name = "<SINKFWDOC>", /* used for logging */
615 .fct = sink_forward_oc_io_handler,
616 .release = sink_forward_session_release,
617};
618
Emeric Brun494c5052020-05-28 11:13:15 +0200619/*
620 * Create a new peer session in assigned state (connect will start automatically)
621 */
622static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
623{
624 struct proxy *p = sink->forward_px;
625 struct appctx *appctx;
626 struct session *sess;
627 struct stream *s;
Emeric Brun97556472020-05-30 01:42:45 +0200628 struct applet *applet = &sink_forward_applet;
629
630 if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
631 applet = &sink_forward_oc_applet;
Emeric Brun494c5052020-05-28 11:13:15 +0200632
Emeric Brun97556472020-05-30 01:42:45 +0200633 appctx = appctx_new(applet, tid_bit);
Emeric Brun494c5052020-05-28 11:13:15 +0200634 if (!appctx)
635 goto out_close;
636
637 appctx->ctx.sft.ptr = (void *)sft;
638
639 sess = session_new(p, NULL, &appctx->obj_type);
640 if (!sess) {
641 ha_alert("out of memory in peer_session_create().\n");
642 goto out_free_appctx;
643 }
644
645 if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
646 ha_alert("Failed to initialize stream in peer_session_create().\n");
647 goto out_free_sess;
648 }
649
650
651 s->target = &sft->srv->obj_type;
Willy Tarreau9b7587a2020-10-15 07:32:10 +0200652 if (!sockaddr_alloc(&s->target_addr, &sft->srv->addr, sizeof(sft->srv->addr)))
Emeric Brun494c5052020-05-28 11:13:15 +0200653 goto out_free_strm;
Emeric Brun494c5052020-05-28 11:13:15 +0200654 s->flags = SF_ASSIGNED|SF_ADDR_SET;
655 s->si[1].flags |= SI_FL_NOLINGER;
656
657 s->do_log = NULL;
658 s->uniq_id = 0;
659
660 s->res.flags |= CF_READ_DONTWAIT;
661 /* for rto and rex to eternity to not expire on idle recv:
662 * We are using a syslog server.
663 */
664 s->res.rto = TICK_ETERNITY;
665 s->res.rex = TICK_ETERNITY;
666 sft->appctx = appctx;
667 task_wakeup(s->task, TASK_WOKEN_INIT);
668 return appctx;
669
670 /* Error unrolling */
671 out_free_strm:
672 LIST_DEL(&s->list);
673 pool_free(pool_head_stream, s);
674 out_free_sess:
675 session_free(sess);
676 out_free_appctx:
677 appctx_free(appctx);
678 out_close:
679 return NULL;
680}
681
682/*
683 * Task to handle connctions to forward servers
684 */
685static struct task *process_sink_forward(struct task * task, void *context, unsigned short state)
686{
687 struct sink *sink = (struct sink *)context;
688 struct sink_forward_target *sft = sink->sft;
689
690 task->expire = TICK_ETERNITY;
691
692 if (!stopping) {
693 while (sft) {
694 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
695 /* if appctx is NULL, start a new session */
696 if (!sft->appctx)
697 sft->appctx = sink_forward_session_create(sink, sft);
698 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
699 sft = sft->next;
700 }
701 }
702 else {
703 while (sft) {
704 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
705 /* awake applet to perform a clean close */
706 if (sft->appctx)
707 appctx_wakeup(sft->appctx);
708 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
709 sft = sft->next;
710 }
711 }
712
713 return task;
714}
715/*
716 * Init task to manage connctions to forward servers
717 *
718 * returns 0 in case of error.
719 */
720int sink_init_forward(struct sink *sink)
721{
722 sink->forward_task = task_new(MAX_THREADS_MASK);
723 if (!sink->forward_task)
724 return 0;
725
726 sink->forward_task->process = process_sink_forward;
727 sink->forward_task->context = (void *)sink;
728 sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
729 task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
730 return 1;
731}
Emeric Brun99c453d2020-05-25 15:01:04 +0200732/*
733 * Parse "ring" section and create corresponding sink buffer.
734 *
735 * The function returns 0 in success case, otherwise, it returns error
736 * flags.
737 */
738int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
739{
740 int err_code = 0;
741 const char *inv;
742 size_t size = BUFSIZE;
Emeric Brun494c5052020-05-28 11:13:15 +0200743 struct proxy *p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200744
745 if (strcmp(args[0], "ring") == 0) { /* new peers section */
746 if (!*args[1]) {
747 ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum);
748 err_code |= ERR_ALERT | ERR_FATAL;
749 goto err;
750 }
751
752 inv = invalid_char(args[1]);
753 if (inv) {
754 ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv);
755 err_code |= ERR_ALERT | ERR_FATAL;
756 goto err;
757 }
758
759 if (sink_find(args[1])) {
760 ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]);
761 err_code |= ERR_ALERT | ERR_FATAL;
762 goto err;
763 }
764
Emeric Brun54648852020-07-06 15:54:06 +0200765 cfg_sink = sink_new_buf(args[1], args[1], LOG_FORMAT_RAW, size);
Emeric Brun99c453d2020-05-25 15:01:04 +0200766 if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) {
767 ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]);
768 err_code |= ERR_ALERT | ERR_FATAL;
769 goto err;
770 }
Emeric Brun494c5052020-05-28 11:13:15 +0200771
772 /* allocate new proxy to handle forwards */
773 p = calloc(1, sizeof *p);
774 if (!p) {
775 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
776 err_code |= ERR_ALERT | ERR_FATAL;
777 goto err;
778 }
779
780 init_new_proxy(p);
781 sink_setup_proxy(p);
782 p->parent = cfg_sink;
783 p->id = strdup(args[1]);
784 p->conf.args.file = p->conf.file = strdup(file);
785 p->conf.args.line = p->conf.line = linenum;
786 cfg_sink->forward_px = p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200787 }
788 else if (strcmp(args[0], "size") == 0) {
789 size = atol(args[1]);
790 if (!size) {
791 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
792 err_code |= ERR_ALERT | ERR_FATAL;
793 goto err;
794 }
795
796 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)
797 || !ring_resize(cfg_sink->ctx.ring, size)) {
798 ha_alert("parsing [%s:%d] : fail to set sink buffer size '%s'.\n", file, linenum, args[1]);
799 err_code |= ERR_ALERT | ERR_FATAL;
800 goto err;
801 }
802 }
Emeric Brun494c5052020-05-28 11:13:15 +0200803 else if (strcmp(args[0],"server") == 0) {
Emeric Brund3db3842020-07-21 16:54:36 +0200804 err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL, 1, 0, 1);
Emeric Brun494c5052020-05-28 11:13:15 +0200805 }
806 else if (strcmp(args[0],"timeout") == 0) {
807 if (!cfg_sink || !cfg_sink->forward_px) {
808 ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
809 err_code |= ERR_ALERT | ERR_FATAL;
810 goto err;
811 }
812
813 if (strcmp(args[1], "connect") == 0 ||
814 strcmp(args[1], "server") == 0) {
815 const char *res;
816 unsigned int tout;
817
818 if (!*args[2]) {
819 ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
820 file, linenum, args[0], args[1]);
821 err_code |= ERR_ALERT | ERR_FATAL;
822 goto err;
823 }
824 res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
825 if (res == PARSE_TIME_OVER) {
826 ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
827 file, linenum, args[2], args[0], args[1]);
828 err_code |= ERR_ALERT | ERR_FATAL;
829 goto err;
830 }
831 else if (res == PARSE_TIME_UNDER) {
832 ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
833 file, linenum, args[2], args[0], args[1]);
834 err_code |= ERR_ALERT | ERR_FATAL;
835 goto err;
836 }
837 else if (res) {
838 ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
839 file, linenum, *res, args[0], args[1]);
840 err_code |= ERR_ALERT | ERR_FATAL;
841 goto err;
842 }
843 if (args[1][2] == 'c')
844 cfg_sink->forward_px->timeout.connect = tout;
845 else
846 cfg_sink->forward_px->timeout.server = tout;
847 }
848 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200849 else if (strcmp(args[0],"format") == 0) {
850 if (!cfg_sink) {
851 ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
852 err_code |= ERR_ALERT | ERR_FATAL;
853 goto err;
854 }
855
Emeric Brun54648852020-07-06 15:54:06 +0200856 cfg_sink->fmt = get_log_format(args[1]);
857 if (cfg_sink->fmt == LOG_FORMAT_UNSPEC) {
Emeric Brun99c453d2020-05-25 15:01:04 +0200858 ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]);
859 err_code |= ERR_ALERT | ERR_FATAL;
860 goto err;
861 }
862 }
863 else if (strcmp(args[0],"maxlen") == 0) {
864 if (!cfg_sink) {
865 ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]);
866 err_code |= ERR_ALERT | ERR_FATAL;
867 goto err;
868 }
869
870 cfg_sink->maxlen = atol(args[1]);
871 if (!cfg_sink->maxlen) {
872 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
873 err_code |= ERR_ALERT | ERR_FATAL;
874 goto err;
875 }
876 }
877 else if (strcmp(args[0],"description") == 0) {
878 if (!cfg_sink) {
879 ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]);
880 err_code |= ERR_ALERT | ERR_FATAL;
881 goto err;
882 }
883
884 if (!*args[1]) {
885 ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum);
886 err_code |= ERR_ALERT | ERR_FATAL;
887 goto err;
888 }
889
890 free(cfg_sink->desc);
891
892 cfg_sink->desc = strdup(args[1]);
893 if (!cfg_sink->desc) {
894 ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]);
895 err_code |= ERR_ALERT | ERR_FATAL;
896 goto err;
897 }
898 }
Emeric Brun9f2ff3a2020-05-29 15:47:52 +0200899 else {
900 ha_alert("parsing [%s:%d] : unknown statement '%s'.\n", file, linenum, args[0]);
901 err_code |= ERR_ALERT | ERR_FATAL;
902 goto err;
903 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200904
905err:
906 return err_code;
907}
908
909/*
910 * Post parsing "ring" section.
911 *
912 * The function returns 0 in success case, otherwise, it returns error
913 * flags.
914 */
915int cfg_post_parse_ring()
916{
917 int err_code = 0;
Emeric Brun494c5052020-05-28 11:13:15 +0200918 struct server *srv;
Emeric Brun99c453d2020-05-25 15:01:04 +0200919
920 if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
921 if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
922 ha_warning("ring '%s' event max length '%u' exceeds size, forced to size '%lu'.\n",
Willy Tarreau570a22b2020-06-02 12:00:46 +0200923 cfg_sink->name, cfg_sink->maxlen, (unsigned long)b_size(&cfg_sink->ctx.ring->buf));
Emeric Brun99c453d2020-05-25 15:01:04 +0200924 cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
925 err_code |= ERR_ALERT;
926 }
Emeric Brun494c5052020-05-28 11:13:15 +0200927
928 /* prepare forward server descriptors */
929 if (cfg_sink->forward_px) {
930 srv = cfg_sink->forward_px->srv;
931 while (srv) {
932 struct sink_forward_target *sft;
933 /* init ssl if needed */
934 if (srv->use_ssl == 1 && xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->prepare_srv) {
935 if (xprt_get(XPRT_SSL)->prepare_srv(srv)) {
936 ha_alert("unable to prepare SSL for server '%s' in ring '%s'.\n", srv->id, cfg_sink->name);
937 err_code |= ERR_ALERT | ERR_FATAL;
938 }
939 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200940
Emeric Brun494c5052020-05-28 11:13:15 +0200941 /* allocate sink_forward_target descriptor */
942 sft = calloc(1, sizeof(*sft));
943 if (!sft) {
944 ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
945 err_code |= ERR_ALERT | ERR_FATAL;
946 break;
947 }
948 sft->srv = srv;
949 sft->appctx = NULL;
950 sft->ofs = ~0; /* init ring offset */
951 sft->next = cfg_sink->sft;
952 HA_SPIN_INIT(&sft->lock);
953
954 /* mark server attached to the ring */
955 if (!ring_attach(cfg_sink->ctx.ring)) {
956 ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
957 err_code |= ERR_ALERT | ERR_FATAL;
958 }
959 cfg_sink->sft = sft;
960 srv = srv->next;
961 }
962 sink_init_forward(cfg_sink);
963 }
964 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200965 cfg_sink = NULL;
966
967 return err_code;
968}
969
970/* resolve sink names at end of config. Returns 0 on success otherwise error
971 * flags.
972*/
973int post_sink_resolve()
974{
Christopher Fauletfc633b62020-11-06 15:24:23 +0100975 int err_code = ERR_NONE;
Emeric Brun99c453d2020-05-25 15:01:04 +0200976 struct logsrv *logsrv, *logb;
977 struct sink *sink;
978 struct proxy *px;
979
980 list_for_each_entry_safe(logsrv, logb, &global.logsrvs, list) {
981 if (logsrv->type == LOG_TARGET_BUFFER) {
982 sink = sink_find(logsrv->ring_name);
983 if (!sink || sink->type != SINK_TYPE_BUFFER) {
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500984 ha_alert("global log server uses unknown ring named '%s'.\n", logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +0200985 err_code |= ERR_ALERT | ERR_FATAL;
986 }
987 logsrv->sink = sink;
988 }
989 }
990
991 for (px = proxies_list; px; px = px->next) {
992 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
993 if (logsrv->type == LOG_TARGET_BUFFER) {
994 sink = sink_find(logsrv->ring_name);
995 if (!sink || sink->type != SINK_TYPE_BUFFER) {
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500996 ha_alert("proxy '%s' log server uses unknown ring named '%s'.\n", px->id, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +0200997 err_code |= ERR_ALERT | ERR_FATAL;
998 }
999 logsrv->sink = sink;
1000 }
1001 }
1002 }
Emeric Brun12941c82020-07-07 14:19:42 +02001003
1004 for (px = cfg_log_forward; px; px = px->next) {
1005 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1006 if (logsrv->type == LOG_TARGET_BUFFER) {
1007 sink = sink_find(logsrv->ring_name);
1008 if (!sink || sink->type != SINK_TYPE_BUFFER) {
1009 ha_alert("log-forward '%s' log server uses unknown ring named '%s'.\n", px->id, logsrv->ring_name);
1010 err_code |= ERR_ALERT | ERR_FATAL;
1011 }
1012 logsrv->sink = sink;
1013 }
1014 }
1015 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001016 return err_code;
1017}
1018
1019
Willy Tarreau973e6622019-08-20 11:57:52 +02001020static void sink_init()
1021{
Emeric Brun54648852020-07-06 15:54:06 +02001022 sink_new_fd("stdout", "standard output (fd#1)", LOG_FORMAT_RAW, 1);
1023 sink_new_fd("stderr", "standard output (fd#2)", LOG_FORMAT_RAW, 2);
1024 sink_new_buf("buf0", "in-memory ring buffer", LOG_FORMAT_TIMED, 1048576);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001025}
1026
1027static void sink_deinit()
1028{
1029 struct sink *sink, *sb;
1030
1031 list_for_each_entry_safe(sink, sb, &sink_list, sink_list) {
1032 if (sink->type == SINK_TYPE_BUFFER)
1033 ring_free(sink->ctx.ring);
1034 LIST_DEL(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +02001035 free(sink->name);
1036 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001037 free(sink);
1038 }
Willy Tarreau973e6622019-08-20 11:57:52 +02001039}
1040
/* Hook sink_init() into the STG_REGISTER init stage and sink_deinit() into
 * the post-deinit callbacks. NOTE(review): the exact point at which each one
 * runs is defined by these macros, which are declared outside this file.
 */
1041INITCALL0(STG_REGISTER, sink_init);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001042REGISTER_POST_DEINIT(sink_deinit);
Willy Tarreau973e6622019-08-20 11:57:52 +02001043
/* CLI keywords provided by this file: "show events [<sink>]" is handled by
 * cli_parse_show_events(). The table ends with an empty entry and is passed
 * to cli_register_kw() through an STG_REGISTER init call.
 */
Willy Tarreau9f830d72019-08-26 18:17:04 +02001044static struct cli_kw_list cli_kws = {{ },{
Willy Tarreaufcf94982019-11-15 15:07:21 +01001045 { { "show", "events", NULL }, "show events [<sink>] : show event sink state", cli_parse_show_events, NULL, NULL },
Willy Tarreau9f830d72019-08-26 18:17:04 +02001046 {{},}
1047}};
1048
1049INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);
1050
Emeric Brun99c453d2020-05-25 15:01:04 +02001051/* config parsers for this section: the "ring" section is registered with
 * cfg_parse_ring() as its parser and cfg_post_parse_ring() as its post-parsing
 * function, and post_sink_resolve() is registered as a post-check function so
 * that ring names get resolved at the end of the configuration.
 */
1052REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring);
1053REGISTER_POST_CHECK(post_sink_resolve);
1054
Willy Tarreau67b5a162019-08-11 16:38:56 +02001055/*
1056 * Local variables:
1057 * c-indent-level: 8
1058 * c-basic-offset: 8
1059 * End:
1060 */