blob: 4b5dd5b6e0b75703c02ecb7c1c7644d93f8ba9f1 [file] [log] [blame]
/*
 * Event sink management
 *
 * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation, version 2.1
 * exclusively.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 */
20
Willy Tarreau36979d92020-06-05 17:27:29 +020021#include <import/ist.h>
Willy Tarreau4c7e4b72020-05-27 12:58:42 +020022#include <haproxy/api.h>
Willy Tarreau6be78492020-06-05 00:00:29 +020023#include <haproxy/cfgparse.h>
Willy Tarreau83487a82020-06-04 20:19:54 +020024#include <haproxy/cli.h>
Willy Tarreau36979d92020-06-05 17:27:29 +020025#include <haproxy/errors.h>
Willy Tarreau853b2972020-05-27 18:01:47 +020026#include <haproxy/list.h>
Willy Tarreauaeed4a82020-06-04 22:01:04 +020027#include <haproxy/log.h>
Willy Tarreaud2ad57c2020-06-03 19:43:35 +020028#include <haproxy/ring.h>
Willy Tarreau3727a8a2020-06-04 17:37:26 +020029#include <haproxy/signal.h>
Willy Tarreauba2f73d2020-06-03 20:02:28 +020030#include <haproxy/sink.h>
Willy Tarreau5e539c92020-06-04 20:45:39 +020031#include <haproxy/stream_interface.h>
Willy Tarreaub2551052020-06-09 09:07:15 +020032#include <haproxy/time.h>
Willy Tarreau67b5a162019-08-11 16:38:56 +020033
34struct list sink_list = LIST_HEAD_INIT(sink_list);
35
Emeric Brun99c453d2020-05-25 15:01:04 +020036struct sink *cfg_sink;
37
Willy Tarreau67b5a162019-08-11 16:38:56 +020038struct sink *sink_find(const char *name)
39{
40 struct sink *sink;
41
42 list_for_each_entry(sink, &sink_list, sink_list)
43 if (strcmp(sink->name, name) == 0)
44 return sink;
45 return NULL;
46}
47
48/* creates a new sink and adds it to the list, it's still generic and not fully
49 * initialized. Returns NULL on allocation failure. If another one already
50 * exists with the same name, it will be returned. The caller can detect it as
51 * a newly created one has type SINK_TYPE_NEW.
52 */
Emeric Brun54648852020-07-06 15:54:06 +020053static struct sink *__sink_new(const char *name, const char *desc, int fmt)
Willy Tarreau67b5a162019-08-11 16:38:56 +020054{
55 struct sink *sink;
56
57 sink = sink_find(name);
58 if (sink)
59 goto end;
60
Emeric Brun494c5052020-05-28 11:13:15 +020061 sink = calloc(1, sizeof(*sink));
Willy Tarreau67b5a162019-08-11 16:38:56 +020062 if (!sink)
63 goto end;
64
Emeric Brun99c453d2020-05-25 15:01:04 +020065 sink->name = strdup(name);
66 sink->desc = strdup(desc);
Willy Tarreau67b5a162019-08-11 16:38:56 +020067 sink->fmt = fmt;
68 sink->type = SINK_TYPE_NEW;
Christopher Fauleta63a5c22019-11-15 15:10:12 +010069 sink->maxlen = BUFSIZE;
Willy Tarreau67b5a162019-08-11 16:38:56 +020070 /* address will be filled by the caller if needed */
Willy Tarreau973e6622019-08-20 11:57:52 +020071 sink->ctx.fd = -1;
Willy Tarreau67b5a162019-08-11 16:38:56 +020072 sink->ctx.dropped = 0;
73 HA_RWLOCK_INIT(&sink->ctx.lock);
74 LIST_ADDQ(&sink_list, &sink->sink_list);
75 end:
76 return sink;
77}
78
Willy Tarreau973e6622019-08-20 11:57:52 +020079/* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>,
80 * and description <desc>. Returns NULL on allocation failure or conflict.
81 * Perfect duplicates are merged (same type, fd, and name).
82 */
Emeric Brun54648852020-07-06 15:54:06 +020083struct sink *sink_new_fd(const char *name, const char *desc, enum log_fmt fmt, int fd)
Willy Tarreau973e6622019-08-20 11:57:52 +020084{
85 struct sink *sink;
86
87 sink = __sink_new(name, desc, fmt);
88 if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd))
89 goto end;
90
91 if (sink->type != SINK_TYPE_NEW) {
92 sink = NULL;
93 goto end;
94 }
95
96 sink->type = SINK_TYPE_FD;
97 sink->ctx.fd = fd;
98 end:
99 return sink;
100}
101
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200102/* creates a sink called <name> of type BUF of size <size>, format <fmt>,
103 * and description <desc>. Returns NULL on allocation failure or conflict.
104 * Perfect duplicates are merged (same type and name). If sizes differ, the
105 * largest one is kept.
106 */
Emeric Brun54648852020-07-06 15:54:06 +0200107struct sink *sink_new_buf(const char *name, const char *desc, enum log_fmt fmt, size_t size)
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200108{
109 struct sink *sink;
110
111 sink = __sink_new(name, desc, fmt);
112 if (!sink)
113 goto fail;
114
115 if (sink->type == SINK_TYPE_BUFFER) {
116 /* such a buffer already exists, we may have to resize it */
117 if (!ring_resize(sink->ctx.ring, size))
118 goto fail;
119 goto end;
120 }
121
122 if (sink->type != SINK_TYPE_NEW) {
123 /* already exists of another type */
124 goto fail;
125 }
126
127 sink->ctx.ring = ring_new(size);
128 if (!sink->ctx.ring) {
129 LIST_DEL(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +0200130 free(sink->name);
131 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200132 free(sink);
133 goto fail;
134 }
135
136 sink->type = SINK_TYPE_BUFFER;
137 end:
138 return sink;
139 fail:
140 return NULL;
141}
142
Willy Tarreau67b5a162019-08-11 16:38:56 +0200143/* tries to send <nmsg> message parts (up to 8, ignored above) from message
Ilya Shipitsin46a030c2020-07-05 16:36:08 +0500144 * array <msg> to sink <sink>. Formatting according to the sink's preference is
Willy Tarreau8f240232019-08-27 16:41:06 +0200145 * done here. Lost messages are NOT accounted for. It is preferable to call
146 * sink_write() instead which will also try to emit the number of dropped
147 * messages when there are any. It returns >0 if it could write anything,
148 * <=0 otherwise.
Willy Tarreau67b5a162019-08-11 16:38:56 +0200149 */
Emeric Brun54648852020-07-06 15:54:06 +0200150 ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg,
151 int level, int facility, struct ist *metadata)
152 {
153 struct ist *pfx = NULL;
Willy Tarreaua1426de2019-08-27 14:21:02 +0200154 size_t npfx = 0;
Emeric Brunbd163812020-05-06 14:33:46 +0200155
Emeric Brun54648852020-07-06 15:54:06 +0200156 if (sink->fmt == LOG_FORMAT_RAW)
Emeric Brunbd163812020-05-06 14:33:46 +0200157 goto send;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200158
Emeric Brun54648852020-07-06 15:54:06 +0200159 pfx = build_log_header(sink->fmt, level, facility, metadata, &npfx);
Emeric Brunbd163812020-05-06 14:33:46 +0200160
161send:
Willy Tarreau973e6622019-08-20 11:57:52 +0200162 if (sink->type == SINK_TYPE_FD) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200163 return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200164 }
165 else if (sink->type == SINK_TYPE_BUFFER) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200166 return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
Willy Tarreau973e6622019-08-20 11:57:52 +0200167 }
Willy Tarreau8f240232019-08-27 16:41:06 +0200168 return 0;
169}
Willy Tarreau67b5a162019-08-11 16:38:56 +0200170
Willy Tarreau8f240232019-08-27 16:41:06 +0200171/* Tries to emit a message indicating the number of dropped events. In case of
172 * success, the amount of drops is reduced by as much. It's supposed to be
173 * called under an exclusive lock on the sink to avoid multiple produces doing
174 * the same. On success, >0 is returned, otherwise <=0 on failure.
175 */
Emeric Brun54648852020-07-06 15:54:06 +0200176int sink_announce_dropped(struct sink *sink, int facility)
Willy Tarreau8f240232019-08-27 16:41:06 +0200177{
Emeric Brun54648852020-07-06 15:54:06 +0200178 static THREAD_LOCAL struct ist metadata[LOG_META_FIELDS];
179 static THREAD_LOCAL pid_t curr_pid;
180 static THREAD_LOCAL char pidstr[16];
Willy Tarreau8f240232019-08-27 16:41:06 +0200181 unsigned int dropped;
182 struct buffer msg;
183 struct ist msgvec[1];
184 char logbuf[64];
185
186 while (unlikely((dropped = sink->ctx.dropped) > 0)) {
187 chunk_init(&msg, logbuf, sizeof(logbuf));
188 chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
189 msgvec[0] = ist2(msg.area, msg.data);
Emeric Brunbd163812020-05-06 14:33:46 +0200190
Emeric Brun54648852020-07-06 15:54:06 +0200191 if (!metadata[LOG_META_HOST].len) {
192 if (global.log_send_hostname)
193 metadata[LOG_META_HOST] = ist2(global.log_send_hostname, strlen(global.log_send_hostname));
194 else
195 metadata[LOG_META_HOST] = ist2(hostname, strlen(hostname));
196 }
197
198 if (!metadata[LOG_META_TAG].len)
199 metadata[LOG_META_TAG] = ist2(global.log_tag.area, global.log_tag.data);
200
201 if (unlikely(curr_pid != getpid()))
202 metadata[LOG_META_PID].len = 0;
203
204 if (!metadata[LOG_META_PID].len) {
205 curr_pid = getpid();
206 ltoa_o(curr_pid, pidstr, sizeof(pidstr));
207 metadata[LOG_META_PID] = ist2(pidstr, strlen(pidstr));
208 }
209
210 if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, metadata) <= 0)
Willy Tarreau8f240232019-08-27 16:41:06 +0200211 return 0;
212 /* success! */
213 HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
214 }
215 return 1;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200216}
217
Willy Tarreau9f830d72019-08-26 18:17:04 +0200218/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
219static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
220{
221 struct sink *sink;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200222 int arg;
Willy Tarreau9f830d72019-08-26 18:17:04 +0200223
224 args++; // make args[1] the 1st arg
225
226 if (!*args[1]) {
227 /* no arg => report the list of supported sink */
Willy Tarreau1d181e42019-08-30 11:17:01 +0200228 chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
Willy Tarreau9f830d72019-08-26 18:17:04 +0200229 list_for_each_entry(sink, &sink_list, sink_list) {
230 chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n",
231 sink->name,
232 sink->type == SINK_TYPE_NEW ? "init" :
233 sink->type == SINK_TYPE_FD ? "fd" :
234 sink->type == SINK_TYPE_BUFFER ? "buffer" : "?",
235 sink->ctx.dropped, sink->desc);
236 }
237
238 trash.area[trash.data] = 0;
239 return cli_msg(appctx, LOG_WARNING, trash.area);
240 }
241
242 if (!cli_has_level(appctx, ACCESS_LVL_OPER))
243 return 1;
244
245 sink = sink_find(args[1]);
246 if (!sink)
247 return cli_err(appctx, "No such event sink");
248
249 if (sink->type != SINK_TYPE_BUFFER)
250 return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
251
Willy Tarreau1d181e42019-08-30 11:17:01 +0200252 for (arg = 2; *args[arg]; arg++) {
253 if (strcmp(args[arg], "-w") == 0)
254 appctx->ctx.cli.i0 |= 1; // wait mode
255 else if (strcmp(args[arg], "-n") == 0)
256 appctx->ctx.cli.i0 |= 2; // seek to new
257 else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
258 appctx->ctx.cli.i0 |= 3; // seek to new + wait
259 else
260 return cli_err(appctx, "unknown option");
261 }
Willy Tarreau9f830d72019-08-26 18:17:04 +0200262 return ring_attach_cli(sink->ctx.ring, appctx);
263}
264
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500265/* Pre-configures a ring proxy to emit connections */
Emeric Brun494c5052020-05-28 11:13:15 +0200266void sink_setup_proxy(struct proxy *px)
267{
268 px->last_change = now.tv_sec;
269 px->cap = PR_CAP_FE | PR_CAP_BE;
270 px->maxconn = 0;
271 px->conn_retries = 1;
272 px->timeout.server = TICK_ETERNITY;
273 px->timeout.client = TICK_ETERNITY;
274 px->timeout.connect = TICK_ETERNITY;
275 px->accept = NULL;
276 px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
277 px->bind_proc = 0; /* will be filled by users */
278}
279
280/*
281 * IO Handler to handle message push to syslog tcp server
282 */
283static void sink_forward_io_handler(struct appctx *appctx)
284{
285 struct stream_interface *si = appctx->owner;
286 struct stream *s = si_strm(si);
287 struct sink *sink = strm_fe(s)->parent;
288 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
289 struct ring *ring = sink->ctx.ring;
290 struct buffer *buf = &ring->buf;
291 uint64_t msg_len;
292 size_t len, cnt, ofs;
293 int ret = 0;
294
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500295 /* if stopping was requested, close immediately */
Emeric Brun494c5052020-05-28 11:13:15 +0200296 if (unlikely(stopping))
297 goto close;
298
299 /* for rex because it seems reset to timeout
300 * and we don't want expire on this case
301 * with a syslog server
302 */
303 si_oc(si)->rex = TICK_ETERNITY;
304 /* rto should not change but it seems the case */
305 si_oc(si)->rto = TICK_ETERNITY;
306
307 /* an error was detected */
308 if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
309 goto close;
310
311 /* con closed by server side */
312 if ((si_oc(si)->flags & CF_SHUTW))
313 goto close;
314
315 /* if the connection is not established, inform the stream that we want
316 * to be notified whenever the connection completes.
317 */
318 if (si_opposite(si)->state < SI_ST_EST) {
319 si_cant_get(si);
320 si_rx_conn_blk(si);
321 si_rx_endp_more(si);
322 return;
323 }
324
325 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
326 if (appctx != sft->appctx) {
327 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
328 goto close;
329 }
330 ofs = sft->ofs;
331
332 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
333 LIST_DEL_INIT(&appctx->wait_entry);
334 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
335
336 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
337
338 /* explanation for the initialization below: it would be better to do
339 * this in the parsing function but this would occasionally result in
340 * dropped events because we'd take a reference on the oldest message
341 * and keep it while being scheduled. Thus instead let's take it the
342 * first time we enter here so that we have a chance to pass many
343 * existing messages before grabbing a reference to a location. This
344 * value cannot be produced after initialization.
345 */
346 if (unlikely(ofs == ~0)) {
347 ofs = 0;
348
349 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
350 ofs += ring->ofs;
351 }
352
353 /* we were already there, adjust the offset to be relative to
354 * the buffer's head and remove us from the counter.
355 */
356 ofs -= ring->ofs;
357 BUG_ON(ofs >= buf->size);
358 HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
359
360 /* in this loop, ofs always points to the counter byte that precedes
361 * the message so that we can take our reference there if we have to
362 * stop before the end (ret=0).
363 */
364 if (si_opposite(si)->state == SI_ST_EST) {
365 ret = 1;
366 while (ofs + 1 < b_data(buf)) {
367 cnt = 1;
368 len = b_peek_varint(buf, ofs + cnt, &msg_len);
369 if (!len)
370 break;
371 cnt += len;
372 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
373
374 if (unlikely(msg_len + 1 > b_size(&trash))) {
375 /* too large a message to ever fit, let's skip it */
376 ofs += cnt + msg_len;
377 continue;
378 }
379
380 chunk_reset(&trash);
381 len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
382 trash.data += len;
383 trash.area[trash.data++] = '\n';
384
385 if (ci_putchk(si_ic(si), &trash) == -1) {
386 si_rx_room_blk(si);
387 ret = 0;
388 break;
389 }
390 ofs += cnt + msg_len;
391 }
392
393 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
394 ofs += ring->ofs;
395 sft->ofs = ofs;
396 }
397 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
398
399 if (ret) {
400 /* let's be woken up once new data arrive */
401 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
402 LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
403 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
404 si_rx_endp_done(si);
405 }
406 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
407
408 /* always drain data from server */
409 co_skip(si_oc(si), si_oc(si)->output);
410 return;
411
412close:
413 si_shutw(si);
414 si_shutr(si);
415 si_ic(si)->flags |= CF_READ_NULL;
416}
417
Emeric Brun97556472020-05-30 01:42:45 +0200418/*
419 * IO Handler to handle message push to syslog tcp server
420 * using octet counting frames
421 */
422static void sink_forward_oc_io_handler(struct appctx *appctx)
423{
424 struct stream_interface *si = appctx->owner;
425 struct stream *s = si_strm(si);
426 struct sink *sink = strm_fe(s)->parent;
427 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
428 struct ring *ring = sink->ctx.ring;
429 struct buffer *buf = &ring->buf;
430 uint64_t msg_len;
431 size_t len, cnt, ofs;
432 int ret = 0;
433 char *p;
434
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500435 /* if stopping was requested, close immediately */
Emeric Brun97556472020-05-30 01:42:45 +0200436 if (unlikely(stopping))
437 goto close;
438
439 /* for rex because it seems reset to timeout
440 * and we don't want expire on this case
441 * with a syslog server
442 */
443 si_oc(si)->rex = TICK_ETERNITY;
444 /* rto should not change but it seems the case */
445 si_oc(si)->rto = TICK_ETERNITY;
446
447 /* an error was detected */
448 if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
449 goto close;
450
451 /* con closed by server side */
452 if ((si_oc(si)->flags & CF_SHUTW))
453 goto close;
454
455 /* if the connection is not established, inform the stream that we want
456 * to be notified whenever the connection completes.
457 */
458 if (si_opposite(si)->state < SI_ST_EST) {
459 si_cant_get(si);
460 si_rx_conn_blk(si);
461 si_rx_endp_more(si);
462 return;
463 }
464
465 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
466 if (appctx != sft->appctx) {
467 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
468 goto close;
469 }
470 ofs = sft->ofs;
471
472 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
473 LIST_DEL_INIT(&appctx->wait_entry);
474 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
475
476 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
477
478 /* explanation for the initialization below: it would be better to do
479 * this in the parsing function but this would occasionally result in
480 * dropped events because we'd take a reference on the oldest message
481 * and keep it while being scheduled. Thus instead let's take it the
482 * first time we enter here so that we have a chance to pass many
483 * existing messages before grabbing a reference to a location. This
484 * value cannot be produced after initialization.
485 */
486 if (unlikely(ofs == ~0)) {
487 ofs = 0;
488
489 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
490 ofs += ring->ofs;
491 }
492
493 /* we were already there, adjust the offset to be relative to
494 * the buffer's head and remove us from the counter.
495 */
496 ofs -= ring->ofs;
497 BUG_ON(ofs >= buf->size);
498 HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
499
500 /* in this loop, ofs always points to the counter byte that precedes
501 * the message so that we can take our reference there if we have to
502 * stop before the end (ret=0).
503 */
504 if (si_opposite(si)->state == SI_ST_EST) {
505 ret = 1;
506 while (ofs + 1 < b_data(buf)) {
507 cnt = 1;
508 len = b_peek_varint(buf, ofs + cnt, &msg_len);
509 if (!len)
510 break;
511 cnt += len;
512 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
513
514 chunk_reset(&trash);
515 p = ulltoa(msg_len, trash.area, b_size(&trash));
516 if (p) {
517 trash.data = (p - trash.area) + 1;
518 *p = ' ';
519 }
520
521 if (!p || (trash.data + msg_len > b_size(&trash))) {
522 /* too large a message to ever fit, let's skip it */
523 ofs += cnt + msg_len;
524 continue;
525 }
526
527 trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
528
529 if (ci_putchk(si_ic(si), &trash) == -1) {
530 si_rx_room_blk(si);
531 ret = 0;
532 break;
533 }
534 ofs += cnt + msg_len;
535 }
536
537 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
538 ofs += ring->ofs;
539 sft->ofs = ofs;
540 }
541 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
542
543 if (ret) {
544 /* let's be woken up once new data arrive */
545 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
546 LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
547 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
548 si_rx_endp_done(si);
549 }
550 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
551
552 /* always drain data from server */
553 co_skip(si_oc(si), si_oc(si)->output);
554 return;
555
556close:
557 si_shutw(si);
558 si_shutr(si);
559 si_ic(si)->flags |= CF_READ_NULL;
560}
561
Emeric Brun494c5052020-05-28 11:13:15 +0200562void __sink_forward_session_deinit(struct sink_forward_target *sft)
563{
564 struct stream_interface *si;
565 struct stream *s;
566 struct sink *sink;
567
568 if (!sft->appctx)
569 return;
570
571 si = sft->appctx->owner;
572 if (!si)
573 return;
574
575 s = si_strm(si);
576 if (!s)
577 return;
578
579 sink = strm_fe(s)->parent;
580 if (!sink)
581 return;
582
583 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
584 LIST_DEL_INIT(&sft->appctx->wait_entry);
585 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
586
587 sft->appctx = NULL;
588 task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
589}
590
591
592static void sink_forward_session_release(struct appctx *appctx)
593{
594 struct sink_forward_target *sft = appctx->ctx.peers.ptr;
595
596 if (!sft)
597 return;
598
599 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
600 if (sft->appctx == appctx)
601 __sink_forward_session_deinit(sft);
602 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
603}
604
605static struct applet sink_forward_applet = {
606 .obj_type = OBJ_TYPE_APPLET,
607 .name = "<SINKFWD>", /* used for logging */
608 .fct = sink_forward_io_handler,
609 .release = sink_forward_session_release,
610};
611
Emeric Brun97556472020-05-30 01:42:45 +0200612static struct applet sink_forward_oc_applet = {
613 .obj_type = OBJ_TYPE_APPLET,
614 .name = "<SINKFWDOC>", /* used for logging */
615 .fct = sink_forward_oc_io_handler,
616 .release = sink_forward_session_release,
617};
618
Emeric Brun494c5052020-05-28 11:13:15 +0200619/*
620 * Create a new peer session in assigned state (connect will start automatically)
621 */
622static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
623{
624 struct proxy *p = sink->forward_px;
625 struct appctx *appctx;
626 struct session *sess;
627 struct stream *s;
Emeric Brun97556472020-05-30 01:42:45 +0200628 struct applet *applet = &sink_forward_applet;
629
630 if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
631 applet = &sink_forward_oc_applet;
Emeric Brun494c5052020-05-28 11:13:15 +0200632
Emeric Brun97556472020-05-30 01:42:45 +0200633 appctx = appctx_new(applet, tid_bit);
Emeric Brun494c5052020-05-28 11:13:15 +0200634 if (!appctx)
635 goto out_close;
636
637 appctx->ctx.sft.ptr = (void *)sft;
638
639 sess = session_new(p, NULL, &appctx->obj_type);
640 if (!sess) {
641 ha_alert("out of memory in peer_session_create().\n");
642 goto out_free_appctx;
643 }
644
645 if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
646 ha_alert("Failed to initialize stream in peer_session_create().\n");
647 goto out_free_sess;
648 }
649
650
651 s->target = &sft->srv->obj_type;
652 if (!sockaddr_alloc(&s->target_addr))
653 goto out_free_strm;
654 *s->target_addr = sft->srv->addr;
655 s->flags = SF_ASSIGNED|SF_ADDR_SET;
656 s->si[1].flags |= SI_FL_NOLINGER;
657
658 s->do_log = NULL;
659 s->uniq_id = 0;
660
661 s->res.flags |= CF_READ_DONTWAIT;
662 /* for rto and rex to eternity to not expire on idle recv:
663 * We are using a syslog server.
664 */
665 s->res.rto = TICK_ETERNITY;
666 s->res.rex = TICK_ETERNITY;
667 sft->appctx = appctx;
668 task_wakeup(s->task, TASK_WOKEN_INIT);
669 return appctx;
670
671 /* Error unrolling */
672 out_free_strm:
673 LIST_DEL(&s->list);
674 pool_free(pool_head_stream, s);
675 out_free_sess:
676 session_free(sess);
677 out_free_appctx:
678 appctx_free(appctx);
679 out_close:
680 return NULL;
681}
682
683/*
684 * Task to handle connctions to forward servers
685 */
686static struct task *process_sink_forward(struct task * task, void *context, unsigned short state)
687{
688 struct sink *sink = (struct sink *)context;
689 struct sink_forward_target *sft = sink->sft;
690
691 task->expire = TICK_ETERNITY;
692
693 if (!stopping) {
694 while (sft) {
695 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
696 /* if appctx is NULL, start a new session */
697 if (!sft->appctx)
698 sft->appctx = sink_forward_session_create(sink, sft);
699 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
700 sft = sft->next;
701 }
702 }
703 else {
704 while (sft) {
705 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
706 /* awake applet to perform a clean close */
707 if (sft->appctx)
708 appctx_wakeup(sft->appctx);
709 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
710 sft = sft->next;
711 }
712 }
713
714 return task;
715}
716/*
717 * Init task to manage connctions to forward servers
718 *
719 * returns 0 in case of error.
720 */
721int sink_init_forward(struct sink *sink)
722{
723 sink->forward_task = task_new(MAX_THREADS_MASK);
724 if (!sink->forward_task)
725 return 0;
726
727 sink->forward_task->process = process_sink_forward;
728 sink->forward_task->context = (void *)sink;
729 sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
730 task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
731 return 1;
732}
Emeric Brun99c453d2020-05-25 15:01:04 +0200733/*
734 * Parse "ring" section and create corresponding sink buffer.
735 *
736 * The function returns 0 in success case, otherwise, it returns error
737 * flags.
738 */
739int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
740{
741 int err_code = 0;
742 const char *inv;
743 size_t size = BUFSIZE;
Emeric Brun494c5052020-05-28 11:13:15 +0200744 struct proxy *p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200745
746 if (strcmp(args[0], "ring") == 0) { /* new peers section */
747 if (!*args[1]) {
748 ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum);
749 err_code |= ERR_ALERT | ERR_FATAL;
750 goto err;
751 }
752
753 inv = invalid_char(args[1]);
754 if (inv) {
755 ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv);
756 err_code |= ERR_ALERT | ERR_FATAL;
757 goto err;
758 }
759
760 if (sink_find(args[1])) {
761 ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]);
762 err_code |= ERR_ALERT | ERR_FATAL;
763 goto err;
764 }
765
Emeric Brun54648852020-07-06 15:54:06 +0200766 cfg_sink = sink_new_buf(args[1], args[1], LOG_FORMAT_RAW, size);
Emeric Brun99c453d2020-05-25 15:01:04 +0200767 if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) {
768 ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]);
769 err_code |= ERR_ALERT | ERR_FATAL;
770 goto err;
771 }
Emeric Brun494c5052020-05-28 11:13:15 +0200772
773 /* allocate new proxy to handle forwards */
774 p = calloc(1, sizeof *p);
775 if (!p) {
776 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
777 err_code |= ERR_ALERT | ERR_FATAL;
778 goto err;
779 }
780
781 init_new_proxy(p);
782 sink_setup_proxy(p);
783 p->parent = cfg_sink;
784 p->id = strdup(args[1]);
785 p->conf.args.file = p->conf.file = strdup(file);
786 p->conf.args.line = p->conf.line = linenum;
787 cfg_sink->forward_px = p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200788 }
789 else if (strcmp(args[0], "size") == 0) {
790 size = atol(args[1]);
791 if (!size) {
792 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
793 err_code |= ERR_ALERT | ERR_FATAL;
794 goto err;
795 }
796
797 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)
798 || !ring_resize(cfg_sink->ctx.ring, size)) {
799 ha_alert("parsing [%s:%d] : fail to set sink buffer size '%s'.\n", file, linenum, args[1]);
800 err_code |= ERR_ALERT | ERR_FATAL;
801 goto err;
802 }
803 }
Emeric Brun494c5052020-05-28 11:13:15 +0200804 else if (strcmp(args[0],"server") == 0) {
805 err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL, 1, 0);
806 }
807 else if (strcmp(args[0],"timeout") == 0) {
808 if (!cfg_sink || !cfg_sink->forward_px) {
809 ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
810 err_code |= ERR_ALERT | ERR_FATAL;
811 goto err;
812 }
813
814 if (strcmp(args[1], "connect") == 0 ||
815 strcmp(args[1], "server") == 0) {
816 const char *res;
817 unsigned int tout;
818
819 if (!*args[2]) {
820 ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
821 file, linenum, args[0], args[1]);
822 err_code |= ERR_ALERT | ERR_FATAL;
823 goto err;
824 }
825 res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
826 if (res == PARSE_TIME_OVER) {
827 ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
828 file, linenum, args[2], args[0], args[1]);
829 err_code |= ERR_ALERT | ERR_FATAL;
830 goto err;
831 }
832 else if (res == PARSE_TIME_UNDER) {
833 ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
834 file, linenum, args[2], args[0], args[1]);
835 err_code |= ERR_ALERT | ERR_FATAL;
836 goto err;
837 }
838 else if (res) {
839 ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
840 file, linenum, *res, args[0], args[1]);
841 err_code |= ERR_ALERT | ERR_FATAL;
842 goto err;
843 }
844 if (args[1][2] == 'c')
845 cfg_sink->forward_px->timeout.connect = tout;
846 else
847 cfg_sink->forward_px->timeout.server = tout;
848 }
849 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200850 else if (strcmp(args[0],"format") == 0) {
851 if (!cfg_sink) {
852 ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
853 err_code |= ERR_ALERT | ERR_FATAL;
854 goto err;
855 }
856
Emeric Brun54648852020-07-06 15:54:06 +0200857 cfg_sink->fmt = get_log_format(args[1]);
858 if (cfg_sink->fmt == LOG_FORMAT_UNSPEC) {
Emeric Brun99c453d2020-05-25 15:01:04 +0200859 ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]);
860 err_code |= ERR_ALERT | ERR_FATAL;
861 goto err;
862 }
863 }
864 else if (strcmp(args[0],"maxlen") == 0) {
865 if (!cfg_sink) {
866 ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]);
867 err_code |= ERR_ALERT | ERR_FATAL;
868 goto err;
869 }
870
871 cfg_sink->maxlen = atol(args[1]);
872 if (!cfg_sink->maxlen) {
873 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
874 err_code |= ERR_ALERT | ERR_FATAL;
875 goto err;
876 }
877 }
878 else if (strcmp(args[0],"description") == 0) {
879 if (!cfg_sink) {
880 ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]);
881 err_code |= ERR_ALERT | ERR_FATAL;
882 goto err;
883 }
884
885 if (!*args[1]) {
886 ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum);
887 err_code |= ERR_ALERT | ERR_FATAL;
888 goto err;
889 }
890
891 free(cfg_sink->desc);
892
893 cfg_sink->desc = strdup(args[1]);
894 if (!cfg_sink->desc) {
895 ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]);
896 err_code |= ERR_ALERT | ERR_FATAL;
897 goto err;
898 }
899 }
Emeric Brun9f2ff3a2020-05-29 15:47:52 +0200900 else {
901 ha_alert("parsing [%s:%d] : unknown statement '%s'.\n", file, linenum, args[0]);
902 err_code |= ERR_ALERT | ERR_FATAL;
903 goto err;
904 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200905
906err:
907 return err_code;
908}
909
910/*
911 * Post parsing "ring" section.
912 *
913 * The function returns 0 in success case, otherwise, it returns error
914 * flags.
915 */
916int cfg_post_parse_ring()
917{
918 int err_code = 0;
Emeric Brun494c5052020-05-28 11:13:15 +0200919 struct server *srv;
Emeric Brun99c453d2020-05-25 15:01:04 +0200920
921 if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
922 if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
923 ha_warning("ring '%s' event max length '%u' exceeds size, forced to size '%lu'.\n",
Willy Tarreau570a22b2020-06-02 12:00:46 +0200924 cfg_sink->name, cfg_sink->maxlen, (unsigned long)b_size(&cfg_sink->ctx.ring->buf));
Emeric Brun99c453d2020-05-25 15:01:04 +0200925 cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
926 err_code |= ERR_ALERT;
927 }
Emeric Brun494c5052020-05-28 11:13:15 +0200928
929 /* prepare forward server descriptors */
930 if (cfg_sink->forward_px) {
931 srv = cfg_sink->forward_px->srv;
932 while (srv) {
933 struct sink_forward_target *sft;
934 /* init ssl if needed */
935 if (srv->use_ssl == 1 && xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->prepare_srv) {
936 if (xprt_get(XPRT_SSL)->prepare_srv(srv)) {
937 ha_alert("unable to prepare SSL for server '%s' in ring '%s'.\n", srv->id, cfg_sink->name);
938 err_code |= ERR_ALERT | ERR_FATAL;
939 }
940 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200941
Emeric Brun494c5052020-05-28 11:13:15 +0200942 /* allocate sink_forward_target descriptor */
943 sft = calloc(1, sizeof(*sft));
944 if (!sft) {
945 ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
946 err_code |= ERR_ALERT | ERR_FATAL;
947 break;
948 }
949 sft->srv = srv;
950 sft->appctx = NULL;
951 sft->ofs = ~0; /* init ring offset */
952 sft->next = cfg_sink->sft;
953 HA_SPIN_INIT(&sft->lock);
954
955 /* mark server attached to the ring */
956 if (!ring_attach(cfg_sink->ctx.ring)) {
957 ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
958 err_code |= ERR_ALERT | ERR_FATAL;
959 }
960 cfg_sink->sft = sft;
961 srv = srv->next;
962 }
963 sink_init_forward(cfg_sink);
964 }
965 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200966 cfg_sink = NULL;
967
968 return err_code;
969}
970
971/* resolve sink names at end of config. Returns 0 on success otherwise error
972 * flags.
973*/
974int post_sink_resolve()
975{
976 int err_code = 0;
977 struct logsrv *logsrv, *logb;
978 struct sink *sink;
979 struct proxy *px;
980
981 list_for_each_entry_safe(logsrv, logb, &global.logsrvs, list) {
982 if (logsrv->type == LOG_TARGET_BUFFER) {
983 sink = sink_find(logsrv->ring_name);
984 if (!sink || sink->type != SINK_TYPE_BUFFER) {
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500985 ha_alert("global log server uses unknown ring named '%s'.\n", logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +0200986 err_code |= ERR_ALERT | ERR_FATAL;
987 }
988 logsrv->sink = sink;
989 }
990 }
991
992 for (px = proxies_list; px; px = px->next) {
993 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
994 if (logsrv->type == LOG_TARGET_BUFFER) {
995 sink = sink_find(logsrv->ring_name);
996 if (!sink || sink->type != SINK_TYPE_BUFFER) {
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500997 ha_alert("proxy '%s' log server uses unknown ring named '%s'.\n", px->id, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +0200998 err_code |= ERR_ALERT | ERR_FATAL;
999 }
1000 logsrv->sink = sink;
1001 }
1002 }
1003 }
Emeric Brun12941c82020-07-07 14:19:42 +02001004
1005 for (px = cfg_log_forward; px; px = px->next) {
1006 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1007 if (logsrv->type == LOG_TARGET_BUFFER) {
1008 sink = sink_find(logsrv->ring_name);
1009 if (!sink || sink->type != SINK_TYPE_BUFFER) {
1010 ha_alert("log-forward '%s' log server uses unknown ring named '%s'.\n", px->id, logsrv->ring_name);
1011 err_code |= ERR_ALERT | ERR_FATAL;
1012 }
1013 logsrv->sink = sink;
1014 }
1015 }
1016 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001017 return err_code;
1018}
1019
1020
Willy Tarreau973e6622019-08-20 11:57:52 +02001021static void sink_init()
1022{
Emeric Brun54648852020-07-06 15:54:06 +02001023 sink_new_fd("stdout", "standard output (fd#1)", LOG_FORMAT_RAW, 1);
1024 sink_new_fd("stderr", "standard output (fd#2)", LOG_FORMAT_RAW, 2);
1025 sink_new_buf("buf0", "in-memory ring buffer", LOG_FORMAT_TIMED, 1048576);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001026}
1027
1028static void sink_deinit()
1029{
1030 struct sink *sink, *sb;
1031
1032 list_for_each_entry_safe(sink, sb, &sink_list, sink_list) {
1033 if (sink->type == SINK_TYPE_BUFFER)
1034 ring_free(sink->ctx.ring);
1035 LIST_DEL(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +02001036 free(sink->name);
1037 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001038 free(sink);
1039 }
Willy Tarreau973e6622019-08-20 11:57:52 +02001040}
1041
/* sink_init() is called at the STG_REGISTER init stage, and sink_deinit()
 * is registered as a post-deinit callback.
 */
INITCALL0(STG_REGISTER, sink_init);
REGISTER_POST_DEINIT(sink_deinit);
Willy Tarreau973e6622019-08-20 11:57:52 +02001044
Willy Tarreau9f830d72019-08-26 18:17:04 +02001045static struct cli_kw_list cli_kws = {{ },{
Willy Tarreaufcf94982019-11-15 15:07:21 +01001046 { { "show", "events", NULL }, "show events [<sink>] : show event sink state", cli_parse_show_events, NULL, NULL },
Willy Tarreau9f830d72019-08-26 18:17:04 +02001047 {{},}
1048}};
1049
1050INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);
1051
Emeric Brun99c453d2020-05-25 15:01:04 +02001052/* config parsers for this section */
1053REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring);
1054REGISTER_POST_CHECK(post_sink_resolve);
1055
/*
 * Local variables:
 *  c-indent-level: 8
 *  c-basic-offset: 8
 * End:
 */