blob: 7257a4ae82b25be81bff7d5ef4968d9dfda169e0 [file] [log] [blame]
/*
 * Event sink management
 *
 * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation, version 2.1
 * exclusively.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 */
20
Willy Tarreau36979d92020-06-05 17:27:29 +020021#include <import/ist.h>
Willy Tarreau4c7e4b72020-05-27 12:58:42 +020022#include <haproxy/api.h>
Willy Tarreau6be78492020-06-05 00:00:29 +020023#include <haproxy/cfgparse.h>
Willy Tarreau83487a82020-06-04 20:19:54 +020024#include <haproxy/cli.h>
Willy Tarreau36979d92020-06-05 17:27:29 +020025#include <haproxy/errors.h>
Willy Tarreau853b2972020-05-27 18:01:47 +020026#include <haproxy/list.h>
Willy Tarreauaeed4a82020-06-04 22:01:04 +020027#include <haproxy/log.h>
Willy Tarreaud2ad57c2020-06-03 19:43:35 +020028#include <haproxy/ring.h>
Willy Tarreau3727a8a2020-06-04 17:37:26 +020029#include <haproxy/signal.h>
Willy Tarreauba2f73d2020-06-03 20:02:28 +020030#include <haproxy/sink.h>
Willy Tarreau5e539c92020-06-04 20:45:39 +020031#include <haproxy/stream_interface.h>
Willy Tarreaub2551052020-06-09 09:07:15 +020032#include <haproxy/time.h>
Willy Tarreau67b5a162019-08-11 16:38:56 +020033
34struct list sink_list = LIST_HEAD_INIT(sink_list);
35
Emeric Brun99c453d2020-05-25 15:01:04 +020036struct sink *cfg_sink;
37
Willy Tarreau67b5a162019-08-11 16:38:56 +020038struct sink *sink_find(const char *name)
39{
40 struct sink *sink;
41
42 list_for_each_entry(sink, &sink_list, sink_list)
43 if (strcmp(sink->name, name) == 0)
44 return sink;
45 return NULL;
46}
47
48/* creates a new sink and adds it to the list, it's still generic and not fully
49 * initialized. Returns NULL on allocation failure. If another one already
50 * exists with the same name, it will be returned. The caller can detect it as
51 * a newly created one has type SINK_TYPE_NEW.
52 */
Emeric Brun54648852020-07-06 15:54:06 +020053static struct sink *__sink_new(const char *name, const char *desc, int fmt)
Willy Tarreau67b5a162019-08-11 16:38:56 +020054{
55 struct sink *sink;
56
57 sink = sink_find(name);
58 if (sink)
59 goto end;
60
Emeric Brun494c5052020-05-28 11:13:15 +020061 sink = calloc(1, sizeof(*sink));
Willy Tarreau67b5a162019-08-11 16:38:56 +020062 if (!sink)
63 goto end;
64
Emeric Brun99c453d2020-05-25 15:01:04 +020065 sink->name = strdup(name);
66 sink->desc = strdup(desc);
Willy Tarreau67b5a162019-08-11 16:38:56 +020067 sink->fmt = fmt;
68 sink->type = SINK_TYPE_NEW;
Christopher Fauleta63a5c22019-11-15 15:10:12 +010069 sink->maxlen = BUFSIZE;
Willy Tarreau67b5a162019-08-11 16:38:56 +020070 /* address will be filled by the caller if needed */
Willy Tarreau973e6622019-08-20 11:57:52 +020071 sink->ctx.fd = -1;
Willy Tarreau67b5a162019-08-11 16:38:56 +020072 sink->ctx.dropped = 0;
73 HA_RWLOCK_INIT(&sink->ctx.lock);
74 LIST_ADDQ(&sink_list, &sink->sink_list);
75 end:
76 return sink;
77}
78
Willy Tarreau973e6622019-08-20 11:57:52 +020079/* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>,
80 * and description <desc>. Returns NULL on allocation failure or conflict.
81 * Perfect duplicates are merged (same type, fd, and name).
82 */
Emeric Brun54648852020-07-06 15:54:06 +020083struct sink *sink_new_fd(const char *name, const char *desc, enum log_fmt fmt, int fd)
Willy Tarreau973e6622019-08-20 11:57:52 +020084{
85 struct sink *sink;
86
87 sink = __sink_new(name, desc, fmt);
88 if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd))
89 goto end;
90
91 if (sink->type != SINK_TYPE_NEW) {
92 sink = NULL;
93 goto end;
94 }
95
96 sink->type = SINK_TYPE_FD;
97 sink->ctx.fd = fd;
98 end:
99 return sink;
100}
101
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200102/* creates a sink called <name> of type BUF of size <size>, format <fmt>,
103 * and description <desc>. Returns NULL on allocation failure or conflict.
104 * Perfect duplicates are merged (same type and name). If sizes differ, the
105 * largest one is kept.
106 */
Emeric Brun54648852020-07-06 15:54:06 +0200107struct sink *sink_new_buf(const char *name, const char *desc, enum log_fmt fmt, size_t size)
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200108{
109 struct sink *sink;
110
111 sink = __sink_new(name, desc, fmt);
112 if (!sink)
113 goto fail;
114
115 if (sink->type == SINK_TYPE_BUFFER) {
116 /* such a buffer already exists, we may have to resize it */
117 if (!ring_resize(sink->ctx.ring, size))
118 goto fail;
119 goto end;
120 }
121
122 if (sink->type != SINK_TYPE_NEW) {
123 /* already exists of another type */
124 goto fail;
125 }
126
127 sink->ctx.ring = ring_new(size);
128 if (!sink->ctx.ring) {
129 LIST_DEL(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +0200130 free(sink->name);
131 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200132 free(sink);
133 goto fail;
134 }
135
136 sink->type = SINK_TYPE_BUFFER;
137 end:
138 return sink;
139 fail:
140 return NULL;
141}
142
Willy Tarreau67b5a162019-08-11 16:38:56 +0200143/* tries to send <nmsg> message parts (up to 8, ignored above) from message
Ilya Shipitsin46a030c2020-07-05 16:36:08 +0500144 * array <msg> to sink <sink>. Formatting according to the sink's preference is
Willy Tarreau8f240232019-08-27 16:41:06 +0200145 * done here. Lost messages are NOT accounted for. It is preferable to call
146 * sink_write() instead which will also try to emit the number of dropped
147 * messages when there are any. It returns >0 if it could write anything,
148 * <=0 otherwise.
Willy Tarreau67b5a162019-08-11 16:38:56 +0200149 */
Emeric Brun54648852020-07-06 15:54:06 +0200150 ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg,
151 int level, int facility, struct ist *metadata)
152 {
153 struct ist *pfx = NULL;
Willy Tarreaua1426de2019-08-27 14:21:02 +0200154 size_t npfx = 0;
Emeric Brunbd163812020-05-06 14:33:46 +0200155
Emeric Brun54648852020-07-06 15:54:06 +0200156 if (sink->fmt == LOG_FORMAT_RAW)
Emeric Brunbd163812020-05-06 14:33:46 +0200157 goto send;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200158
Emeric Brun54648852020-07-06 15:54:06 +0200159 pfx = build_log_header(sink->fmt, level, facility, metadata, &npfx);
Emeric Brunbd163812020-05-06 14:33:46 +0200160
161send:
Willy Tarreau973e6622019-08-20 11:57:52 +0200162 if (sink->type == SINK_TYPE_FD) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200163 return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200164 }
165 else if (sink->type == SINK_TYPE_BUFFER) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200166 return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
Willy Tarreau973e6622019-08-20 11:57:52 +0200167 }
Willy Tarreau8f240232019-08-27 16:41:06 +0200168 return 0;
169}
Willy Tarreau67b5a162019-08-11 16:38:56 +0200170
Willy Tarreau8f240232019-08-27 16:41:06 +0200171/* Tries to emit a message indicating the number of dropped events. In case of
172 * success, the amount of drops is reduced by as much. It's supposed to be
173 * called under an exclusive lock on the sink to avoid multiple produces doing
174 * the same. On success, >0 is returned, otherwise <=0 on failure.
175 */
Emeric Brun54648852020-07-06 15:54:06 +0200176int sink_announce_dropped(struct sink *sink, int facility)
Willy Tarreau8f240232019-08-27 16:41:06 +0200177{
Emeric Brun54648852020-07-06 15:54:06 +0200178 static THREAD_LOCAL struct ist metadata[LOG_META_FIELDS];
179 static THREAD_LOCAL pid_t curr_pid;
180 static THREAD_LOCAL char pidstr[16];
Willy Tarreau8f240232019-08-27 16:41:06 +0200181 unsigned int dropped;
182 struct buffer msg;
183 struct ist msgvec[1];
184 char logbuf[64];
185
186 while (unlikely((dropped = sink->ctx.dropped) > 0)) {
187 chunk_init(&msg, logbuf, sizeof(logbuf));
188 chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
189 msgvec[0] = ist2(msg.area, msg.data);
Emeric Brunbd163812020-05-06 14:33:46 +0200190
Emeric Brun54648852020-07-06 15:54:06 +0200191 if (!metadata[LOG_META_HOST].len) {
192 if (global.log_send_hostname)
193 metadata[LOG_META_HOST] = ist2(global.log_send_hostname, strlen(global.log_send_hostname));
Emeric Brun54648852020-07-06 15:54:06 +0200194 }
195
196 if (!metadata[LOG_META_TAG].len)
197 metadata[LOG_META_TAG] = ist2(global.log_tag.area, global.log_tag.data);
198
199 if (unlikely(curr_pid != getpid()))
200 metadata[LOG_META_PID].len = 0;
201
202 if (!metadata[LOG_META_PID].len) {
203 curr_pid = getpid();
204 ltoa_o(curr_pid, pidstr, sizeof(pidstr));
205 metadata[LOG_META_PID] = ist2(pidstr, strlen(pidstr));
206 }
207
208 if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, metadata) <= 0)
Willy Tarreau8f240232019-08-27 16:41:06 +0200209 return 0;
210 /* success! */
211 HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
212 }
213 return 1;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200214}
215
Willy Tarreau9f830d72019-08-26 18:17:04 +0200216/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
217static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
218{
219 struct sink *sink;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200220 int arg;
Willy Tarreau9f830d72019-08-26 18:17:04 +0200221
222 args++; // make args[1] the 1st arg
223
224 if (!*args[1]) {
225 /* no arg => report the list of supported sink */
Willy Tarreau1d181e42019-08-30 11:17:01 +0200226 chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
Willy Tarreau9f830d72019-08-26 18:17:04 +0200227 list_for_each_entry(sink, &sink_list, sink_list) {
228 chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n",
229 sink->name,
230 sink->type == SINK_TYPE_NEW ? "init" :
231 sink->type == SINK_TYPE_FD ? "fd" :
232 sink->type == SINK_TYPE_BUFFER ? "buffer" : "?",
233 sink->ctx.dropped, sink->desc);
234 }
235
236 trash.area[trash.data] = 0;
237 return cli_msg(appctx, LOG_WARNING, trash.area);
238 }
239
240 if (!cli_has_level(appctx, ACCESS_LVL_OPER))
241 return 1;
242
243 sink = sink_find(args[1]);
244 if (!sink)
245 return cli_err(appctx, "No such event sink");
246
247 if (sink->type != SINK_TYPE_BUFFER)
248 return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
249
Willy Tarreau1d181e42019-08-30 11:17:01 +0200250 for (arg = 2; *args[arg]; arg++) {
251 if (strcmp(args[arg], "-w") == 0)
252 appctx->ctx.cli.i0 |= 1; // wait mode
253 else if (strcmp(args[arg], "-n") == 0)
254 appctx->ctx.cli.i0 |= 2; // seek to new
255 else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
256 appctx->ctx.cli.i0 |= 3; // seek to new + wait
257 else
258 return cli_err(appctx, "unknown option");
259 }
Willy Tarreau9f830d72019-08-26 18:17:04 +0200260 return ring_attach_cli(sink->ctx.ring, appctx);
261}
262
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500263/* Pre-configures a ring proxy to emit connections */
Emeric Brun494c5052020-05-28 11:13:15 +0200264void sink_setup_proxy(struct proxy *px)
265{
266 px->last_change = now.tv_sec;
267 px->cap = PR_CAP_FE | PR_CAP_BE;
268 px->maxconn = 0;
269 px->conn_retries = 1;
270 px->timeout.server = TICK_ETERNITY;
271 px->timeout.client = TICK_ETERNITY;
272 px->timeout.connect = TICK_ETERNITY;
273 px->accept = NULL;
274 px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
275 px->bind_proc = 0; /* will be filled by users */
276}
277
278/*
279 * IO Handler to handle message push to syslog tcp server
280 */
281static void sink_forward_io_handler(struct appctx *appctx)
282{
283 struct stream_interface *si = appctx->owner;
284 struct stream *s = si_strm(si);
285 struct sink *sink = strm_fe(s)->parent;
286 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
287 struct ring *ring = sink->ctx.ring;
288 struct buffer *buf = &ring->buf;
289 uint64_t msg_len;
290 size_t len, cnt, ofs;
291 int ret = 0;
292
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500293 /* if stopping was requested, close immediately */
Emeric Brun494c5052020-05-28 11:13:15 +0200294 if (unlikely(stopping))
295 goto close;
296
297 /* for rex because it seems reset to timeout
298 * and we don't want expire on this case
299 * with a syslog server
300 */
301 si_oc(si)->rex = TICK_ETERNITY;
302 /* rto should not change but it seems the case */
303 si_oc(si)->rto = TICK_ETERNITY;
304
305 /* an error was detected */
306 if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
307 goto close;
308
309 /* con closed by server side */
310 if ((si_oc(si)->flags & CF_SHUTW))
311 goto close;
312
313 /* if the connection is not established, inform the stream that we want
314 * to be notified whenever the connection completes.
315 */
316 if (si_opposite(si)->state < SI_ST_EST) {
317 si_cant_get(si);
318 si_rx_conn_blk(si);
319 si_rx_endp_more(si);
320 return;
321 }
322
323 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
324 if (appctx != sft->appctx) {
325 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
326 goto close;
327 }
328 ofs = sft->ofs;
329
330 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
331 LIST_DEL_INIT(&appctx->wait_entry);
332 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
333
334 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
335
336 /* explanation for the initialization below: it would be better to do
337 * this in the parsing function but this would occasionally result in
338 * dropped events because we'd take a reference on the oldest message
339 * and keep it while being scheduled. Thus instead let's take it the
340 * first time we enter here so that we have a chance to pass many
341 * existing messages before grabbing a reference to a location. This
342 * value cannot be produced after initialization.
343 */
344 if (unlikely(ofs == ~0)) {
345 ofs = 0;
346
347 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
348 ofs += ring->ofs;
349 }
350
351 /* we were already there, adjust the offset to be relative to
352 * the buffer's head and remove us from the counter.
353 */
354 ofs -= ring->ofs;
355 BUG_ON(ofs >= buf->size);
356 HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
357
358 /* in this loop, ofs always points to the counter byte that precedes
359 * the message so that we can take our reference there if we have to
360 * stop before the end (ret=0).
361 */
362 if (si_opposite(si)->state == SI_ST_EST) {
363 ret = 1;
364 while (ofs + 1 < b_data(buf)) {
365 cnt = 1;
366 len = b_peek_varint(buf, ofs + cnt, &msg_len);
367 if (!len)
368 break;
369 cnt += len;
370 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
371
372 if (unlikely(msg_len + 1 > b_size(&trash))) {
373 /* too large a message to ever fit, let's skip it */
374 ofs += cnt + msg_len;
375 continue;
376 }
377
378 chunk_reset(&trash);
379 len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
380 trash.data += len;
381 trash.area[trash.data++] = '\n';
382
383 if (ci_putchk(si_ic(si), &trash) == -1) {
384 si_rx_room_blk(si);
385 ret = 0;
386 break;
387 }
388 ofs += cnt + msg_len;
389 }
390
391 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
392 ofs += ring->ofs;
393 sft->ofs = ofs;
394 }
395 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
396
397 if (ret) {
398 /* let's be woken up once new data arrive */
399 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
400 LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
401 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
402 si_rx_endp_done(si);
403 }
404 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
405
406 /* always drain data from server */
407 co_skip(si_oc(si), si_oc(si)->output);
408 return;
409
410close:
411 si_shutw(si);
412 si_shutr(si);
413 si_ic(si)->flags |= CF_READ_NULL;
414}
415
Emeric Brun97556472020-05-30 01:42:45 +0200416/*
417 * IO Handler to handle message push to syslog tcp server
418 * using octet counting frames
419 */
420static void sink_forward_oc_io_handler(struct appctx *appctx)
421{
422 struct stream_interface *si = appctx->owner;
423 struct stream *s = si_strm(si);
424 struct sink *sink = strm_fe(s)->parent;
425 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
426 struct ring *ring = sink->ctx.ring;
427 struct buffer *buf = &ring->buf;
428 uint64_t msg_len;
429 size_t len, cnt, ofs;
430 int ret = 0;
431 char *p;
432
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500433 /* if stopping was requested, close immediately */
Emeric Brun97556472020-05-30 01:42:45 +0200434 if (unlikely(stopping))
435 goto close;
436
437 /* for rex because it seems reset to timeout
438 * and we don't want expire on this case
439 * with a syslog server
440 */
441 si_oc(si)->rex = TICK_ETERNITY;
442 /* rto should not change but it seems the case */
443 si_oc(si)->rto = TICK_ETERNITY;
444
445 /* an error was detected */
446 if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
447 goto close;
448
449 /* con closed by server side */
450 if ((si_oc(si)->flags & CF_SHUTW))
451 goto close;
452
453 /* if the connection is not established, inform the stream that we want
454 * to be notified whenever the connection completes.
455 */
456 if (si_opposite(si)->state < SI_ST_EST) {
457 si_cant_get(si);
458 si_rx_conn_blk(si);
459 si_rx_endp_more(si);
460 return;
461 }
462
463 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
464 if (appctx != sft->appctx) {
465 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
466 goto close;
467 }
468 ofs = sft->ofs;
469
470 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
471 LIST_DEL_INIT(&appctx->wait_entry);
472 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
473
474 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
475
476 /* explanation for the initialization below: it would be better to do
477 * this in the parsing function but this would occasionally result in
478 * dropped events because we'd take a reference on the oldest message
479 * and keep it while being scheduled. Thus instead let's take it the
480 * first time we enter here so that we have a chance to pass many
481 * existing messages before grabbing a reference to a location. This
482 * value cannot be produced after initialization.
483 */
484 if (unlikely(ofs == ~0)) {
485 ofs = 0;
486
487 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
488 ofs += ring->ofs;
489 }
490
491 /* we were already there, adjust the offset to be relative to
492 * the buffer's head and remove us from the counter.
493 */
494 ofs -= ring->ofs;
495 BUG_ON(ofs >= buf->size);
496 HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
497
498 /* in this loop, ofs always points to the counter byte that precedes
499 * the message so that we can take our reference there if we have to
500 * stop before the end (ret=0).
501 */
502 if (si_opposite(si)->state == SI_ST_EST) {
503 ret = 1;
504 while (ofs + 1 < b_data(buf)) {
505 cnt = 1;
506 len = b_peek_varint(buf, ofs + cnt, &msg_len);
507 if (!len)
508 break;
509 cnt += len;
510 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
511
512 chunk_reset(&trash);
513 p = ulltoa(msg_len, trash.area, b_size(&trash));
514 if (p) {
515 trash.data = (p - trash.area) + 1;
516 *p = ' ';
517 }
518
519 if (!p || (trash.data + msg_len > b_size(&trash))) {
520 /* too large a message to ever fit, let's skip it */
521 ofs += cnt + msg_len;
522 continue;
523 }
524
525 trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
526
527 if (ci_putchk(si_ic(si), &trash) == -1) {
528 si_rx_room_blk(si);
529 ret = 0;
530 break;
531 }
532 ofs += cnt + msg_len;
533 }
534
535 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
536 ofs += ring->ofs;
537 sft->ofs = ofs;
538 }
539 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
540
541 if (ret) {
542 /* let's be woken up once new data arrive */
543 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
544 LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
545 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
546 si_rx_endp_done(si);
547 }
548 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
549
550 /* always drain data from server */
551 co_skip(si_oc(si), si_oc(si)->output);
552 return;
553
554close:
555 si_shutw(si);
556 si_shutr(si);
557 si_ic(si)->flags |= CF_READ_NULL;
558}
559
Emeric Brun494c5052020-05-28 11:13:15 +0200560void __sink_forward_session_deinit(struct sink_forward_target *sft)
561{
562 struct stream_interface *si;
563 struct stream *s;
564 struct sink *sink;
565
566 if (!sft->appctx)
567 return;
568
569 si = sft->appctx->owner;
570 if (!si)
571 return;
572
573 s = si_strm(si);
574 if (!s)
575 return;
576
577 sink = strm_fe(s)->parent;
578 if (!sink)
579 return;
580
581 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
582 LIST_DEL_INIT(&sft->appctx->wait_entry);
583 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
584
585 sft->appctx = NULL;
586 task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
587}
588
589
590static void sink_forward_session_release(struct appctx *appctx)
591{
592 struct sink_forward_target *sft = appctx->ctx.peers.ptr;
593
594 if (!sft)
595 return;
596
597 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
598 if (sft->appctx == appctx)
599 __sink_forward_session_deinit(sft);
600 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
601}
602
603static struct applet sink_forward_applet = {
604 .obj_type = OBJ_TYPE_APPLET,
605 .name = "<SINKFWD>", /* used for logging */
606 .fct = sink_forward_io_handler,
607 .release = sink_forward_session_release,
608};
609
Emeric Brun97556472020-05-30 01:42:45 +0200610static struct applet sink_forward_oc_applet = {
611 .obj_type = OBJ_TYPE_APPLET,
612 .name = "<SINKFWDOC>", /* used for logging */
613 .fct = sink_forward_oc_io_handler,
614 .release = sink_forward_session_release,
615};
616
Emeric Brun494c5052020-05-28 11:13:15 +0200617/*
618 * Create a new peer session in assigned state (connect will start automatically)
619 */
620static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
621{
622 struct proxy *p = sink->forward_px;
623 struct appctx *appctx;
624 struct session *sess;
625 struct stream *s;
Emeric Brun97556472020-05-30 01:42:45 +0200626 struct applet *applet = &sink_forward_applet;
627
628 if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
629 applet = &sink_forward_oc_applet;
Emeric Brun494c5052020-05-28 11:13:15 +0200630
Emeric Brun97556472020-05-30 01:42:45 +0200631 appctx = appctx_new(applet, tid_bit);
Emeric Brun494c5052020-05-28 11:13:15 +0200632 if (!appctx)
633 goto out_close;
634
635 appctx->ctx.sft.ptr = (void *)sft;
636
637 sess = session_new(p, NULL, &appctx->obj_type);
638 if (!sess) {
639 ha_alert("out of memory in peer_session_create().\n");
640 goto out_free_appctx;
641 }
642
643 if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
644 ha_alert("Failed to initialize stream in peer_session_create().\n");
645 goto out_free_sess;
646 }
647
648
649 s->target = &sft->srv->obj_type;
Willy Tarreau9b7587a2020-10-15 07:32:10 +0200650 if (!sockaddr_alloc(&s->target_addr, &sft->srv->addr, sizeof(sft->srv->addr)))
Emeric Brun494c5052020-05-28 11:13:15 +0200651 goto out_free_strm;
Emeric Brun494c5052020-05-28 11:13:15 +0200652 s->flags = SF_ASSIGNED|SF_ADDR_SET;
653 s->si[1].flags |= SI_FL_NOLINGER;
654
655 s->do_log = NULL;
656 s->uniq_id = 0;
657
658 s->res.flags |= CF_READ_DONTWAIT;
659 /* for rto and rex to eternity to not expire on idle recv:
660 * We are using a syslog server.
661 */
662 s->res.rto = TICK_ETERNITY;
663 s->res.rex = TICK_ETERNITY;
664 sft->appctx = appctx;
665 task_wakeup(s->task, TASK_WOKEN_INIT);
666 return appctx;
667
668 /* Error unrolling */
669 out_free_strm:
670 LIST_DEL(&s->list);
671 pool_free(pool_head_stream, s);
672 out_free_sess:
673 session_free(sess);
674 out_free_appctx:
675 appctx_free(appctx);
676 out_close:
677 return NULL;
678}
679
680/*
681 * Task to handle connctions to forward servers
682 */
683static struct task *process_sink_forward(struct task * task, void *context, unsigned short state)
684{
685 struct sink *sink = (struct sink *)context;
686 struct sink_forward_target *sft = sink->sft;
687
688 task->expire = TICK_ETERNITY;
689
690 if (!stopping) {
691 while (sft) {
692 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
693 /* if appctx is NULL, start a new session */
694 if (!sft->appctx)
695 sft->appctx = sink_forward_session_create(sink, sft);
696 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
697 sft = sft->next;
698 }
699 }
700 else {
701 while (sft) {
702 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
703 /* awake applet to perform a clean close */
704 if (sft->appctx)
705 appctx_wakeup(sft->appctx);
706 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
707 sft = sft->next;
708 }
709 }
710
711 return task;
712}
713/*
714 * Init task to manage connctions to forward servers
715 *
716 * returns 0 in case of error.
717 */
718int sink_init_forward(struct sink *sink)
719{
720 sink->forward_task = task_new(MAX_THREADS_MASK);
721 if (!sink->forward_task)
722 return 0;
723
724 sink->forward_task->process = process_sink_forward;
725 sink->forward_task->context = (void *)sink;
726 sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
727 task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
728 return 1;
729}
Emeric Brun99c453d2020-05-25 15:01:04 +0200730/*
731 * Parse "ring" section and create corresponding sink buffer.
732 *
733 * The function returns 0 in success case, otherwise, it returns error
734 * flags.
735 */
736int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
737{
738 int err_code = 0;
739 const char *inv;
740 size_t size = BUFSIZE;
Emeric Brun494c5052020-05-28 11:13:15 +0200741 struct proxy *p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200742
743 if (strcmp(args[0], "ring") == 0) { /* new peers section */
744 if (!*args[1]) {
745 ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum);
746 err_code |= ERR_ALERT | ERR_FATAL;
747 goto err;
748 }
749
750 inv = invalid_char(args[1]);
751 if (inv) {
752 ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv);
753 err_code |= ERR_ALERT | ERR_FATAL;
754 goto err;
755 }
756
757 if (sink_find(args[1])) {
758 ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]);
759 err_code |= ERR_ALERT | ERR_FATAL;
760 goto err;
761 }
762
Emeric Brun54648852020-07-06 15:54:06 +0200763 cfg_sink = sink_new_buf(args[1], args[1], LOG_FORMAT_RAW, size);
Emeric Brun99c453d2020-05-25 15:01:04 +0200764 if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) {
765 ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]);
766 err_code |= ERR_ALERT | ERR_FATAL;
767 goto err;
768 }
Emeric Brun494c5052020-05-28 11:13:15 +0200769
770 /* allocate new proxy to handle forwards */
771 p = calloc(1, sizeof *p);
772 if (!p) {
773 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
774 err_code |= ERR_ALERT | ERR_FATAL;
775 goto err;
776 }
777
778 init_new_proxy(p);
779 sink_setup_proxy(p);
780 p->parent = cfg_sink;
781 p->id = strdup(args[1]);
782 p->conf.args.file = p->conf.file = strdup(file);
783 p->conf.args.line = p->conf.line = linenum;
784 cfg_sink->forward_px = p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200785 }
786 else if (strcmp(args[0], "size") == 0) {
787 size = atol(args[1]);
788 if (!size) {
789 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
790 err_code |= ERR_ALERT | ERR_FATAL;
791 goto err;
792 }
793
794 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)
795 || !ring_resize(cfg_sink->ctx.ring, size)) {
796 ha_alert("parsing [%s:%d] : fail to set sink buffer size '%s'.\n", file, linenum, args[1]);
797 err_code |= ERR_ALERT | ERR_FATAL;
798 goto err;
799 }
800 }
Emeric Brun494c5052020-05-28 11:13:15 +0200801 else if (strcmp(args[0],"server") == 0) {
Emeric Brund3db3842020-07-21 16:54:36 +0200802 err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL, 1, 0, 1);
Emeric Brun494c5052020-05-28 11:13:15 +0200803 }
804 else if (strcmp(args[0],"timeout") == 0) {
805 if (!cfg_sink || !cfg_sink->forward_px) {
806 ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
807 err_code |= ERR_ALERT | ERR_FATAL;
808 goto err;
809 }
810
811 if (strcmp(args[1], "connect") == 0 ||
812 strcmp(args[1], "server") == 0) {
813 const char *res;
814 unsigned int tout;
815
816 if (!*args[2]) {
817 ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
818 file, linenum, args[0], args[1]);
819 err_code |= ERR_ALERT | ERR_FATAL;
820 goto err;
821 }
822 res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
823 if (res == PARSE_TIME_OVER) {
824 ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
825 file, linenum, args[2], args[0], args[1]);
826 err_code |= ERR_ALERT | ERR_FATAL;
827 goto err;
828 }
829 else if (res == PARSE_TIME_UNDER) {
830 ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
831 file, linenum, args[2], args[0], args[1]);
832 err_code |= ERR_ALERT | ERR_FATAL;
833 goto err;
834 }
835 else if (res) {
836 ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
837 file, linenum, *res, args[0], args[1]);
838 err_code |= ERR_ALERT | ERR_FATAL;
839 goto err;
840 }
841 if (args[1][2] == 'c')
842 cfg_sink->forward_px->timeout.connect = tout;
843 else
844 cfg_sink->forward_px->timeout.server = tout;
845 }
846 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200847 else if (strcmp(args[0],"format") == 0) {
848 if (!cfg_sink) {
849 ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
850 err_code |= ERR_ALERT | ERR_FATAL;
851 goto err;
852 }
853
Emeric Brun54648852020-07-06 15:54:06 +0200854 cfg_sink->fmt = get_log_format(args[1]);
855 if (cfg_sink->fmt == LOG_FORMAT_UNSPEC) {
Emeric Brun99c453d2020-05-25 15:01:04 +0200856 ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]);
857 err_code |= ERR_ALERT | ERR_FATAL;
858 goto err;
859 }
860 }
861 else if (strcmp(args[0],"maxlen") == 0) {
862 if (!cfg_sink) {
863 ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]);
864 err_code |= ERR_ALERT | ERR_FATAL;
865 goto err;
866 }
867
868 cfg_sink->maxlen = atol(args[1]);
869 if (!cfg_sink->maxlen) {
870 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
871 err_code |= ERR_ALERT | ERR_FATAL;
872 goto err;
873 }
874 }
875 else if (strcmp(args[0],"description") == 0) {
876 if (!cfg_sink) {
877 ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]);
878 err_code |= ERR_ALERT | ERR_FATAL;
879 goto err;
880 }
881
882 if (!*args[1]) {
883 ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum);
884 err_code |= ERR_ALERT | ERR_FATAL;
885 goto err;
886 }
887
888 free(cfg_sink->desc);
889
890 cfg_sink->desc = strdup(args[1]);
891 if (!cfg_sink->desc) {
892 ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]);
893 err_code |= ERR_ALERT | ERR_FATAL;
894 goto err;
895 }
896 }
Emeric Brun9f2ff3a2020-05-29 15:47:52 +0200897 else {
898 ha_alert("parsing [%s:%d] : unknown statement '%s'.\n", file, linenum, args[0]);
899 err_code |= ERR_ALERT | ERR_FATAL;
900 goto err;
901 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200902
903err:
904 return err_code;
905}
906
907/*
908 * Post parsing "ring" section.
909 *
910 * The function returns 0 in success case, otherwise, it returns error
911 * flags.
912 */
913int cfg_post_parse_ring()
914{
915 int err_code = 0;
Emeric Brun494c5052020-05-28 11:13:15 +0200916 struct server *srv;
Emeric Brun99c453d2020-05-25 15:01:04 +0200917
918 if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
919 if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
920 ha_warning("ring '%s' event max length '%u' exceeds size, forced to size '%lu'.\n",
Willy Tarreau570a22b2020-06-02 12:00:46 +0200921 cfg_sink->name, cfg_sink->maxlen, (unsigned long)b_size(&cfg_sink->ctx.ring->buf));
Emeric Brun99c453d2020-05-25 15:01:04 +0200922 cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
923 err_code |= ERR_ALERT;
924 }
Emeric Brun494c5052020-05-28 11:13:15 +0200925
926 /* prepare forward server descriptors */
927 if (cfg_sink->forward_px) {
928 srv = cfg_sink->forward_px->srv;
929 while (srv) {
930 struct sink_forward_target *sft;
931 /* init ssl if needed */
932 if (srv->use_ssl == 1 && xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->prepare_srv) {
933 if (xprt_get(XPRT_SSL)->prepare_srv(srv)) {
934 ha_alert("unable to prepare SSL for server '%s' in ring '%s'.\n", srv->id, cfg_sink->name);
935 err_code |= ERR_ALERT | ERR_FATAL;
936 }
937 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200938
Emeric Brun494c5052020-05-28 11:13:15 +0200939 /* allocate sink_forward_target descriptor */
940 sft = calloc(1, sizeof(*sft));
941 if (!sft) {
942 ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
943 err_code |= ERR_ALERT | ERR_FATAL;
944 break;
945 }
946 sft->srv = srv;
947 sft->appctx = NULL;
948 sft->ofs = ~0; /* init ring offset */
949 sft->next = cfg_sink->sft;
950 HA_SPIN_INIT(&sft->lock);
951
952 /* mark server attached to the ring */
953 if (!ring_attach(cfg_sink->ctx.ring)) {
954 ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
955 err_code |= ERR_ALERT | ERR_FATAL;
956 }
957 cfg_sink->sft = sft;
958 srv = srv->next;
959 }
960 sink_init_forward(cfg_sink);
961 }
962 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200963 cfg_sink = NULL;
964
965 return err_code;
966}
967
968/* resolve sink names at end of config. Returns 0 on success otherwise error
969 * flags.
970*/
971int post_sink_resolve()
972{
973 int err_code = 0;
974 struct logsrv *logsrv, *logb;
975 struct sink *sink;
976 struct proxy *px;
977
978 list_for_each_entry_safe(logsrv, logb, &global.logsrvs, list) {
979 if (logsrv->type == LOG_TARGET_BUFFER) {
980 sink = sink_find(logsrv->ring_name);
981 if (!sink || sink->type != SINK_TYPE_BUFFER) {
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500982 ha_alert("global log server uses unknown ring named '%s'.\n", logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +0200983 err_code |= ERR_ALERT | ERR_FATAL;
984 }
985 logsrv->sink = sink;
986 }
987 }
988
989 for (px = proxies_list; px; px = px->next) {
990 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
991 if (logsrv->type == LOG_TARGET_BUFFER) {
992 sink = sink_find(logsrv->ring_name);
993 if (!sink || sink->type != SINK_TYPE_BUFFER) {
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500994 ha_alert("proxy '%s' log server uses unknown ring named '%s'.\n", px->id, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +0200995 err_code |= ERR_ALERT | ERR_FATAL;
996 }
997 logsrv->sink = sink;
998 }
999 }
1000 }
Emeric Brun12941c82020-07-07 14:19:42 +02001001
1002 for (px = cfg_log_forward; px; px = px->next) {
1003 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1004 if (logsrv->type == LOG_TARGET_BUFFER) {
1005 sink = sink_find(logsrv->ring_name);
1006 if (!sink || sink->type != SINK_TYPE_BUFFER) {
1007 ha_alert("log-forward '%s' log server uses unknown ring named '%s'.\n", px->id, logsrv->ring_name);
1008 err_code |= ERR_ALERT | ERR_FATAL;
1009 }
1010 logsrv->sink = sink;
1011 }
1012 }
1013 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001014 return err_code;
1015}
1016
1017
Willy Tarreau973e6622019-08-20 11:57:52 +02001018static void sink_init()
1019{
Emeric Brun54648852020-07-06 15:54:06 +02001020 sink_new_fd("stdout", "standard output (fd#1)", LOG_FORMAT_RAW, 1);
1021 sink_new_fd("stderr", "standard output (fd#2)", LOG_FORMAT_RAW, 2);
1022 sink_new_buf("buf0", "in-memory ring buffer", LOG_FORMAT_TIMED, 1048576);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001023}
1024
1025static void sink_deinit()
1026{
1027 struct sink *sink, *sb;
1028
1029 list_for_each_entry_safe(sink, sb, &sink_list, sink_list) {
1030 if (sink->type == SINK_TYPE_BUFFER)
1031 ring_free(sink->ctx.ring);
1032 LIST_DEL(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +02001033 free(sink->name);
1034 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001035 free(sink);
1036 }
Willy Tarreau973e6622019-08-20 11:57:52 +02001037}
1038
/* sink_init() is registered as an init call for the STG_REGISTER stage, and
 * sink_deinit() as a post-deinit callback.
 */
INITCALL0(STG_REGISTER, sink_init);
REGISTER_POST_DEINIT(sink_deinit);
Willy Tarreau973e6622019-08-20 11:57:52 +02001041
/* CLI keywords provided by this file: "show events", parsed by
 * cli_parse_show_events(). The list ends with an empty entry.
 */
static struct cli_kw_list cli_kws = {{ },{
	{ { "show", "events", NULL }, "show events [<sink>] : show event sink state", cli_parse_show_events, NULL, NULL },
	{{},}
}};

/* the keyword list is passed to cli_register_kw() at the STG_REGISTER stage */
INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);
1048
/* config parsers for this section: the "ring" section is registered with
 * cfg_parse_ring() as its parser and cfg_post_parse_ring() as its post-section
 * parser; post_sink_resolve() is registered as a post-check function.
 */
REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring);
REGISTER_POST_CHECK(post_sink_resolve);
1052
Willy Tarreau67b5a162019-08-11 16:38:56 +02001053/*
1054 * Local variables:
1055 * c-indent-level: 8
1056 * c-basic-offset: 8
1057 * End:
1058 */