blob: 70e5be9a4d5f9ea4c51a1fe6144c90ee6de82e7e [file] [log] [blame]
Willy Tarreau67b5a162019-08-11 16:38:56 +02001/*
2 * Event sink management
3 *
4 * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation, version 2.1
9 * exclusively.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
Willy Tarreau36979d92020-06-05 17:27:29 +020021#include <import/ist.h>
Willy Tarreau4c7e4b72020-05-27 12:58:42 +020022#include <haproxy/api.h>
Willy Tarreau6be78492020-06-05 00:00:29 +020023#include <haproxy/cfgparse.h>
Willy Tarreau83487a82020-06-04 20:19:54 +020024#include <haproxy/cli.h>
Willy Tarreau36979d92020-06-05 17:27:29 +020025#include <haproxy/errors.h>
Willy Tarreau853b2972020-05-27 18:01:47 +020026#include <haproxy/list.h>
Willy Tarreauaeed4a82020-06-04 22:01:04 +020027#include <haproxy/log.h>
Willy Tarreaud2ad57c2020-06-03 19:43:35 +020028#include <haproxy/ring.h>
Willy Tarreau3727a8a2020-06-04 17:37:26 +020029#include <haproxy/signal.h>
Willy Tarreauba2f73d2020-06-03 20:02:28 +020030#include <haproxy/sink.h>
Willy Tarreau5e539c92020-06-04 20:45:39 +020031#include <haproxy/stream_interface.h>
Willy Tarreaub2551052020-06-09 09:07:15 +020032#include <haproxy/time.h>
Willy Tarreau67b5a162019-08-11 16:38:56 +020033
34struct list sink_list = LIST_HEAD_INIT(sink_list);
35
Emeric Brun99c453d2020-05-25 15:01:04 +020036struct sink *cfg_sink;
37
Willy Tarreau67b5a162019-08-11 16:38:56 +020038struct sink *sink_find(const char *name)
39{
40 struct sink *sink;
41
42 list_for_each_entry(sink, &sink_list, sink_list)
43 if (strcmp(sink->name, name) == 0)
44 return sink;
45 return NULL;
46}
47
48/* creates a new sink and adds it to the list, it's still generic and not fully
49 * initialized. Returns NULL on allocation failure. If another one already
50 * exists with the same name, it will be returned. The caller can detect it as
51 * a newly created one has type SINK_TYPE_NEW.
52 */
Emeric Brun54648852020-07-06 15:54:06 +020053static struct sink *__sink_new(const char *name, const char *desc, int fmt)
Willy Tarreau67b5a162019-08-11 16:38:56 +020054{
55 struct sink *sink;
56
57 sink = sink_find(name);
58 if (sink)
59 goto end;
60
Emeric Brun494c5052020-05-28 11:13:15 +020061 sink = calloc(1, sizeof(*sink));
Willy Tarreau67b5a162019-08-11 16:38:56 +020062 if (!sink)
63 goto end;
64
Emeric Brun99c453d2020-05-25 15:01:04 +020065 sink->name = strdup(name);
Tim Duesterhusa9a012d2021-01-03 19:54:11 +010066 if (!sink->name)
67 goto err;
68
Emeric Brun99c453d2020-05-25 15:01:04 +020069 sink->desc = strdup(desc);
Tim Duesterhusa9a012d2021-01-03 19:54:11 +010070 if (!sink->desc)
71 goto err;
72
Willy Tarreau67b5a162019-08-11 16:38:56 +020073 sink->fmt = fmt;
74 sink->type = SINK_TYPE_NEW;
Christopher Fauleta63a5c22019-11-15 15:10:12 +010075 sink->maxlen = BUFSIZE;
Willy Tarreau67b5a162019-08-11 16:38:56 +020076 /* address will be filled by the caller if needed */
Willy Tarreau973e6622019-08-20 11:57:52 +020077 sink->ctx.fd = -1;
Willy Tarreau67b5a162019-08-11 16:38:56 +020078 sink->ctx.dropped = 0;
79 HA_RWLOCK_INIT(&sink->ctx.lock);
80 LIST_ADDQ(&sink_list, &sink->sink_list);
81 end:
82 return sink;
Tim Duesterhusa9a012d2021-01-03 19:54:11 +010083
84 err:
85 free(sink->name); sink->name = NULL;
86 free(sink->desc); sink->desc = NULL;
87 free(sink); sink = NULL;
88
89 return NULL;
Willy Tarreau67b5a162019-08-11 16:38:56 +020090}
91
Willy Tarreau973e6622019-08-20 11:57:52 +020092/* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>,
93 * and description <desc>. Returns NULL on allocation failure or conflict.
94 * Perfect duplicates are merged (same type, fd, and name).
95 */
Emeric Brun54648852020-07-06 15:54:06 +020096struct sink *sink_new_fd(const char *name, const char *desc, enum log_fmt fmt, int fd)
Willy Tarreau973e6622019-08-20 11:57:52 +020097{
98 struct sink *sink;
99
100 sink = __sink_new(name, desc, fmt);
101 if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd))
102 goto end;
103
104 if (sink->type != SINK_TYPE_NEW) {
105 sink = NULL;
106 goto end;
107 }
108
109 sink->type = SINK_TYPE_FD;
110 sink->ctx.fd = fd;
111 end:
112 return sink;
113}
114
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200115/* creates a sink called <name> of type BUF of size <size>, format <fmt>,
116 * and description <desc>. Returns NULL on allocation failure or conflict.
117 * Perfect duplicates are merged (same type and name). If sizes differ, the
118 * largest one is kept.
119 */
Emeric Brun54648852020-07-06 15:54:06 +0200120struct sink *sink_new_buf(const char *name, const char *desc, enum log_fmt fmt, size_t size)
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200121{
122 struct sink *sink;
123
124 sink = __sink_new(name, desc, fmt);
125 if (!sink)
126 goto fail;
127
128 if (sink->type == SINK_TYPE_BUFFER) {
129 /* such a buffer already exists, we may have to resize it */
130 if (!ring_resize(sink->ctx.ring, size))
131 goto fail;
132 goto end;
133 }
134
135 if (sink->type != SINK_TYPE_NEW) {
136 /* already exists of another type */
137 goto fail;
138 }
139
140 sink->ctx.ring = ring_new(size);
141 if (!sink->ctx.ring) {
142 LIST_DEL(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +0200143 free(sink->name);
144 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200145 free(sink);
146 goto fail;
147 }
148
149 sink->type = SINK_TYPE_BUFFER;
150 end:
151 return sink;
152 fail:
153 return NULL;
154}
155
Willy Tarreau67b5a162019-08-11 16:38:56 +0200156/* tries to send <nmsg> message parts (up to 8, ignored above) from message
Ilya Shipitsin46a030c2020-07-05 16:36:08 +0500157 * array <msg> to sink <sink>. Formatting according to the sink's preference is
Willy Tarreau8f240232019-08-27 16:41:06 +0200158 * done here. Lost messages are NOT accounted for. It is preferable to call
159 * sink_write() instead which will also try to emit the number of dropped
160 * messages when there are any. It returns >0 if it could write anything,
161 * <=0 otherwise.
Willy Tarreau67b5a162019-08-11 16:38:56 +0200162 */
Emeric Brun54648852020-07-06 15:54:06 +0200163 ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg,
164 int level, int facility, struct ist *metadata)
165 {
166 struct ist *pfx = NULL;
Willy Tarreaua1426de2019-08-27 14:21:02 +0200167 size_t npfx = 0;
Emeric Brunbd163812020-05-06 14:33:46 +0200168
Emeric Brun54648852020-07-06 15:54:06 +0200169 if (sink->fmt == LOG_FORMAT_RAW)
Emeric Brunbd163812020-05-06 14:33:46 +0200170 goto send;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200171
Emeric Brun54648852020-07-06 15:54:06 +0200172 pfx = build_log_header(sink->fmt, level, facility, metadata, &npfx);
Emeric Brunbd163812020-05-06 14:33:46 +0200173
174send:
Willy Tarreau973e6622019-08-20 11:57:52 +0200175 if (sink->type == SINK_TYPE_FD) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200176 return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200177 }
178 else if (sink->type == SINK_TYPE_BUFFER) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200179 return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
Willy Tarreau973e6622019-08-20 11:57:52 +0200180 }
Willy Tarreau8f240232019-08-27 16:41:06 +0200181 return 0;
182}
Willy Tarreau67b5a162019-08-11 16:38:56 +0200183
Willy Tarreau8f240232019-08-27 16:41:06 +0200184/* Tries to emit a message indicating the number of dropped events. In case of
185 * success, the amount of drops is reduced by as much. It's supposed to be
186 * called under an exclusive lock on the sink to avoid multiple produces doing
187 * the same. On success, >0 is returned, otherwise <=0 on failure.
188 */
Emeric Brun54648852020-07-06 15:54:06 +0200189int sink_announce_dropped(struct sink *sink, int facility)
Willy Tarreau8f240232019-08-27 16:41:06 +0200190{
Emeric Brun54648852020-07-06 15:54:06 +0200191 static THREAD_LOCAL struct ist metadata[LOG_META_FIELDS];
192 static THREAD_LOCAL pid_t curr_pid;
193 static THREAD_LOCAL char pidstr[16];
Willy Tarreau8f240232019-08-27 16:41:06 +0200194 unsigned int dropped;
195 struct buffer msg;
196 struct ist msgvec[1];
197 char logbuf[64];
198
199 while (unlikely((dropped = sink->ctx.dropped) > 0)) {
200 chunk_init(&msg, logbuf, sizeof(logbuf));
201 chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
202 msgvec[0] = ist2(msg.area, msg.data);
Emeric Brunbd163812020-05-06 14:33:46 +0200203
Emeric Brun54648852020-07-06 15:54:06 +0200204 if (!metadata[LOG_META_HOST].len) {
205 if (global.log_send_hostname)
206 metadata[LOG_META_HOST] = ist2(global.log_send_hostname, strlen(global.log_send_hostname));
Emeric Brun54648852020-07-06 15:54:06 +0200207 }
208
209 if (!metadata[LOG_META_TAG].len)
210 metadata[LOG_META_TAG] = ist2(global.log_tag.area, global.log_tag.data);
211
212 if (unlikely(curr_pid != getpid()))
213 metadata[LOG_META_PID].len = 0;
214
215 if (!metadata[LOG_META_PID].len) {
216 curr_pid = getpid();
217 ltoa_o(curr_pid, pidstr, sizeof(pidstr));
218 metadata[LOG_META_PID] = ist2(pidstr, strlen(pidstr));
219 }
220
221 if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, metadata) <= 0)
Willy Tarreau8f240232019-08-27 16:41:06 +0200222 return 0;
223 /* success! */
224 HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
225 }
226 return 1;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200227}
228
Willy Tarreau9f830d72019-08-26 18:17:04 +0200229/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
230static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
231{
232 struct sink *sink;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200233 int arg;
Willy Tarreau9f830d72019-08-26 18:17:04 +0200234
235 args++; // make args[1] the 1st arg
236
237 if (!*args[1]) {
238 /* no arg => report the list of supported sink */
Willy Tarreau1d181e42019-08-30 11:17:01 +0200239 chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
Willy Tarreau9f830d72019-08-26 18:17:04 +0200240 list_for_each_entry(sink, &sink_list, sink_list) {
241 chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n",
242 sink->name,
243 sink->type == SINK_TYPE_NEW ? "init" :
244 sink->type == SINK_TYPE_FD ? "fd" :
245 sink->type == SINK_TYPE_BUFFER ? "buffer" : "?",
246 sink->ctx.dropped, sink->desc);
247 }
248
249 trash.area[trash.data] = 0;
250 return cli_msg(appctx, LOG_WARNING, trash.area);
251 }
252
253 if (!cli_has_level(appctx, ACCESS_LVL_OPER))
254 return 1;
255
256 sink = sink_find(args[1]);
257 if (!sink)
258 return cli_err(appctx, "No such event sink");
259
260 if (sink->type != SINK_TYPE_BUFFER)
261 return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
262
Willy Tarreau1d181e42019-08-30 11:17:01 +0200263 for (arg = 2; *args[arg]; arg++) {
264 if (strcmp(args[arg], "-w") == 0)
265 appctx->ctx.cli.i0 |= 1; // wait mode
266 else if (strcmp(args[arg], "-n") == 0)
267 appctx->ctx.cli.i0 |= 2; // seek to new
268 else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
269 appctx->ctx.cli.i0 |= 3; // seek to new + wait
270 else
271 return cli_err(appctx, "unknown option");
272 }
Willy Tarreau9f830d72019-08-26 18:17:04 +0200273 return ring_attach_cli(sink->ctx.ring, appctx);
274}
275
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500276/* Pre-configures a ring proxy to emit connections */
Emeric Brun494c5052020-05-28 11:13:15 +0200277void sink_setup_proxy(struct proxy *px)
278{
279 px->last_change = now.tv_sec;
280 px->cap = PR_CAP_FE | PR_CAP_BE;
281 px->maxconn = 0;
282 px->conn_retries = 1;
283 px->timeout.server = TICK_ETERNITY;
284 px->timeout.client = TICK_ETERNITY;
285 px->timeout.connect = TICK_ETERNITY;
286 px->accept = NULL;
287 px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
288 px->bind_proc = 0; /* will be filled by users */
289}
290
291/*
292 * IO Handler to handle message push to syslog tcp server
293 */
294static void sink_forward_io_handler(struct appctx *appctx)
295{
296 struct stream_interface *si = appctx->owner;
297 struct stream *s = si_strm(si);
298 struct sink *sink = strm_fe(s)->parent;
299 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
300 struct ring *ring = sink->ctx.ring;
301 struct buffer *buf = &ring->buf;
302 uint64_t msg_len;
303 size_t len, cnt, ofs;
304 int ret = 0;
305
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500306 /* if stopping was requested, close immediately */
Emeric Brun494c5052020-05-28 11:13:15 +0200307 if (unlikely(stopping))
308 goto close;
309
310 /* for rex because it seems reset to timeout
311 * and we don't want expire on this case
312 * with a syslog server
313 */
314 si_oc(si)->rex = TICK_ETERNITY;
315 /* rto should not change but it seems the case */
316 si_oc(si)->rto = TICK_ETERNITY;
317
318 /* an error was detected */
319 if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
320 goto close;
321
322 /* con closed by server side */
323 if ((si_oc(si)->flags & CF_SHUTW))
324 goto close;
325
326 /* if the connection is not established, inform the stream that we want
327 * to be notified whenever the connection completes.
328 */
329 if (si_opposite(si)->state < SI_ST_EST) {
330 si_cant_get(si);
331 si_rx_conn_blk(si);
332 si_rx_endp_more(si);
333 return;
334 }
335
336 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
337 if (appctx != sft->appctx) {
338 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
339 goto close;
340 }
341 ofs = sft->ofs;
342
343 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
344 LIST_DEL_INIT(&appctx->wait_entry);
345 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
346
347 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
348
349 /* explanation for the initialization below: it would be better to do
350 * this in the parsing function but this would occasionally result in
351 * dropped events because we'd take a reference on the oldest message
352 * and keep it while being scheduled. Thus instead let's take it the
353 * first time we enter here so that we have a chance to pass many
354 * existing messages before grabbing a reference to a location. This
355 * value cannot be produced after initialization.
356 */
357 if (unlikely(ofs == ~0)) {
358 ofs = 0;
359
360 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
361 ofs += ring->ofs;
362 }
363
Emeric Brun494c5052020-05-28 11:13:15 +0200364 /* in this loop, ofs always points to the counter byte that precedes
365 * the message so that we can take our reference there if we have to
366 * stop before the end (ret=0).
367 */
368 if (si_opposite(si)->state == SI_ST_EST) {
Emeric Brun6b9c2b32020-12-02 17:02:09 +0100369 /* we were already there, adjust the offset to be relative to
370 * the buffer's head and remove us from the counter.
371 */
372 ofs -= ring->ofs;
373 BUG_ON(ofs >= buf->size);
374 HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
375
Emeric Brun494c5052020-05-28 11:13:15 +0200376 ret = 1;
377 while (ofs + 1 < b_data(buf)) {
378 cnt = 1;
379 len = b_peek_varint(buf, ofs + cnt, &msg_len);
380 if (!len)
381 break;
382 cnt += len;
383 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
384
385 if (unlikely(msg_len + 1 > b_size(&trash))) {
386 /* too large a message to ever fit, let's skip it */
387 ofs += cnt + msg_len;
388 continue;
389 }
390
391 chunk_reset(&trash);
392 len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
393 trash.data += len;
394 trash.area[trash.data++] = '\n';
395
396 if (ci_putchk(si_ic(si), &trash) == -1) {
397 si_rx_room_blk(si);
398 ret = 0;
399 break;
400 }
401 ofs += cnt + msg_len;
402 }
403
404 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
405 ofs += ring->ofs;
406 sft->ofs = ofs;
407 }
408 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
409
410 if (ret) {
411 /* let's be woken up once new data arrive */
412 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
413 LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
414 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
415 si_rx_endp_done(si);
416 }
417 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
418
419 /* always drain data from server */
420 co_skip(si_oc(si), si_oc(si)->output);
421 return;
422
423close:
424 si_shutw(si);
425 si_shutr(si);
426 si_ic(si)->flags |= CF_READ_NULL;
427}
428
Emeric Brun97556472020-05-30 01:42:45 +0200429/*
430 * IO Handler to handle message push to syslog tcp server
431 * using octet counting frames
432 */
433static void sink_forward_oc_io_handler(struct appctx *appctx)
434{
435 struct stream_interface *si = appctx->owner;
436 struct stream *s = si_strm(si);
437 struct sink *sink = strm_fe(s)->parent;
438 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
439 struct ring *ring = sink->ctx.ring;
440 struct buffer *buf = &ring->buf;
441 uint64_t msg_len;
442 size_t len, cnt, ofs;
443 int ret = 0;
444 char *p;
445
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500446 /* if stopping was requested, close immediately */
Emeric Brun97556472020-05-30 01:42:45 +0200447 if (unlikely(stopping))
448 goto close;
449
450 /* for rex because it seems reset to timeout
451 * and we don't want expire on this case
452 * with a syslog server
453 */
454 si_oc(si)->rex = TICK_ETERNITY;
455 /* rto should not change but it seems the case */
456 si_oc(si)->rto = TICK_ETERNITY;
457
458 /* an error was detected */
459 if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
460 goto close;
461
462 /* con closed by server side */
463 if ((si_oc(si)->flags & CF_SHUTW))
464 goto close;
465
466 /* if the connection is not established, inform the stream that we want
467 * to be notified whenever the connection completes.
468 */
469 if (si_opposite(si)->state < SI_ST_EST) {
470 si_cant_get(si);
471 si_rx_conn_blk(si);
472 si_rx_endp_more(si);
473 return;
474 }
475
476 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
477 if (appctx != sft->appctx) {
478 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
479 goto close;
480 }
481 ofs = sft->ofs;
482
483 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
484 LIST_DEL_INIT(&appctx->wait_entry);
485 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
486
487 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
488
489 /* explanation for the initialization below: it would be better to do
490 * this in the parsing function but this would occasionally result in
491 * dropped events because we'd take a reference on the oldest message
492 * and keep it while being scheduled. Thus instead let's take it the
493 * first time we enter here so that we have a chance to pass many
494 * existing messages before grabbing a reference to a location. This
495 * value cannot be produced after initialization.
496 */
497 if (unlikely(ofs == ~0)) {
498 ofs = 0;
499
500 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
501 ofs += ring->ofs;
502 }
503
Emeric Brun97556472020-05-30 01:42:45 +0200504 /* in this loop, ofs always points to the counter byte that precedes
505 * the message so that we can take our reference there if we have to
506 * stop before the end (ret=0).
507 */
508 if (si_opposite(si)->state == SI_ST_EST) {
Emeric Brun6b9c2b32020-12-02 17:02:09 +0100509 /* we were already there, adjust the offset to be relative to
510 * the buffer's head and remove us from the counter.
511 */
512 ofs -= ring->ofs;
513 BUG_ON(ofs >= buf->size);
514 HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
515
Emeric Brun97556472020-05-30 01:42:45 +0200516 ret = 1;
517 while (ofs + 1 < b_data(buf)) {
518 cnt = 1;
519 len = b_peek_varint(buf, ofs + cnt, &msg_len);
520 if (!len)
521 break;
522 cnt += len;
523 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
524
525 chunk_reset(&trash);
526 p = ulltoa(msg_len, trash.area, b_size(&trash));
527 if (p) {
528 trash.data = (p - trash.area) + 1;
529 *p = ' ';
530 }
531
532 if (!p || (trash.data + msg_len > b_size(&trash))) {
533 /* too large a message to ever fit, let's skip it */
534 ofs += cnt + msg_len;
535 continue;
536 }
537
538 trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
539
540 if (ci_putchk(si_ic(si), &trash) == -1) {
541 si_rx_room_blk(si);
542 ret = 0;
543 break;
544 }
545 ofs += cnt + msg_len;
546 }
547
548 HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
549 ofs += ring->ofs;
550 sft->ofs = ofs;
551 }
552 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
553
554 if (ret) {
555 /* let's be woken up once new data arrive */
556 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
557 LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
558 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
559 si_rx_endp_done(si);
560 }
561 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
562
563 /* always drain data from server */
564 co_skip(si_oc(si), si_oc(si)->output);
565 return;
566
567close:
568 si_shutw(si);
569 si_shutr(si);
570 si_ic(si)->flags |= CF_READ_NULL;
571}
572
Emeric Brun494c5052020-05-28 11:13:15 +0200573void __sink_forward_session_deinit(struct sink_forward_target *sft)
574{
575 struct stream_interface *si;
576 struct stream *s;
577 struct sink *sink;
578
579 if (!sft->appctx)
580 return;
581
582 si = sft->appctx->owner;
583 if (!si)
584 return;
585
586 s = si_strm(si);
587 if (!s)
588 return;
589
590 sink = strm_fe(s)->parent;
591 if (!sink)
592 return;
593
594 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
595 LIST_DEL_INIT(&sft->appctx->wait_entry);
596 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
597
598 sft->appctx = NULL;
599 task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
600}
601
602
603static void sink_forward_session_release(struct appctx *appctx)
604{
605 struct sink_forward_target *sft = appctx->ctx.peers.ptr;
606
607 if (!sft)
608 return;
609
610 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
611 if (sft->appctx == appctx)
612 __sink_forward_session_deinit(sft);
613 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
614}
615
616static struct applet sink_forward_applet = {
617 .obj_type = OBJ_TYPE_APPLET,
618 .name = "<SINKFWD>", /* used for logging */
619 .fct = sink_forward_io_handler,
620 .release = sink_forward_session_release,
621};
622
Emeric Brun97556472020-05-30 01:42:45 +0200623static struct applet sink_forward_oc_applet = {
624 .obj_type = OBJ_TYPE_APPLET,
625 .name = "<SINKFWDOC>", /* used for logging */
626 .fct = sink_forward_oc_io_handler,
627 .release = sink_forward_session_release,
628};
629
Emeric Brun494c5052020-05-28 11:13:15 +0200630/*
631 * Create a new peer session in assigned state (connect will start automatically)
632 */
633static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
634{
635 struct proxy *p = sink->forward_px;
636 struct appctx *appctx;
637 struct session *sess;
638 struct stream *s;
Emeric Brun97556472020-05-30 01:42:45 +0200639 struct applet *applet = &sink_forward_applet;
640
641 if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
642 applet = &sink_forward_oc_applet;
Emeric Brun494c5052020-05-28 11:13:15 +0200643
Emeric Brun97556472020-05-30 01:42:45 +0200644 appctx = appctx_new(applet, tid_bit);
Emeric Brun494c5052020-05-28 11:13:15 +0200645 if (!appctx)
646 goto out_close;
647
648 appctx->ctx.sft.ptr = (void *)sft;
649
650 sess = session_new(p, NULL, &appctx->obj_type);
651 if (!sess) {
652 ha_alert("out of memory in peer_session_create().\n");
653 goto out_free_appctx;
654 }
655
656 if ((s = stream_new(sess, &appctx->obj_type)) == NULL) {
657 ha_alert("Failed to initialize stream in peer_session_create().\n");
658 goto out_free_sess;
659 }
660
661
662 s->target = &sft->srv->obj_type;
Willy Tarreau9b7587a2020-10-15 07:32:10 +0200663 if (!sockaddr_alloc(&s->target_addr, &sft->srv->addr, sizeof(sft->srv->addr)))
Emeric Brun494c5052020-05-28 11:13:15 +0200664 goto out_free_strm;
Emeric Brun494c5052020-05-28 11:13:15 +0200665 s->flags = SF_ASSIGNED|SF_ADDR_SET;
666 s->si[1].flags |= SI_FL_NOLINGER;
667
668 s->do_log = NULL;
669 s->uniq_id = 0;
670
671 s->res.flags |= CF_READ_DONTWAIT;
672 /* for rto and rex to eternity to not expire on idle recv:
673 * We are using a syslog server.
674 */
675 s->res.rto = TICK_ETERNITY;
676 s->res.rex = TICK_ETERNITY;
677 sft->appctx = appctx;
678 task_wakeup(s->task, TASK_WOKEN_INIT);
679 return appctx;
680
681 /* Error unrolling */
682 out_free_strm:
683 LIST_DEL(&s->list);
684 pool_free(pool_head_stream, s);
685 out_free_sess:
686 session_free(sess);
687 out_free_appctx:
688 appctx_free(appctx);
689 out_close:
690 return NULL;
691}
692
693/*
694 * Task to handle connctions to forward servers
695 */
696static struct task *process_sink_forward(struct task * task, void *context, unsigned short state)
697{
698 struct sink *sink = (struct sink *)context;
699 struct sink_forward_target *sft = sink->sft;
700
701 task->expire = TICK_ETERNITY;
702
703 if (!stopping) {
704 while (sft) {
705 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
706 /* if appctx is NULL, start a new session */
707 if (!sft->appctx)
708 sft->appctx = sink_forward_session_create(sink, sft);
709 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
710 sft = sft->next;
711 }
712 }
713 else {
714 while (sft) {
715 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
716 /* awake applet to perform a clean close */
717 if (sft->appctx)
718 appctx_wakeup(sft->appctx);
719 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
720 sft = sft->next;
721 }
722 }
723
724 return task;
725}
726/*
727 * Init task to manage connctions to forward servers
728 *
729 * returns 0 in case of error.
730 */
731int sink_init_forward(struct sink *sink)
732{
733 sink->forward_task = task_new(MAX_THREADS_MASK);
734 if (!sink->forward_task)
735 return 0;
736
737 sink->forward_task->process = process_sink_forward;
738 sink->forward_task->context = (void *)sink;
739 sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
740 task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
741 return 1;
742}
Emeric Brun99c453d2020-05-25 15:01:04 +0200743/*
744 * Parse "ring" section and create corresponding sink buffer.
745 *
746 * The function returns 0 in success case, otherwise, it returns error
747 * flags.
748 */
749int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
750{
751 int err_code = 0;
752 const char *inv;
753 size_t size = BUFSIZE;
Emeric Brun494c5052020-05-28 11:13:15 +0200754 struct proxy *p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200755
756 if (strcmp(args[0], "ring") == 0) { /* new peers section */
757 if (!*args[1]) {
758 ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum);
759 err_code |= ERR_ALERT | ERR_FATAL;
760 goto err;
761 }
762
763 inv = invalid_char(args[1]);
764 if (inv) {
765 ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv);
766 err_code |= ERR_ALERT | ERR_FATAL;
767 goto err;
768 }
769
770 if (sink_find(args[1])) {
771 ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]);
772 err_code |= ERR_ALERT | ERR_FATAL;
773 goto err;
774 }
775
Emeric Brun54648852020-07-06 15:54:06 +0200776 cfg_sink = sink_new_buf(args[1], args[1], LOG_FORMAT_RAW, size);
Emeric Brun99c453d2020-05-25 15:01:04 +0200777 if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) {
778 ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]);
779 err_code |= ERR_ALERT | ERR_FATAL;
780 goto err;
781 }
Emeric Brun494c5052020-05-28 11:13:15 +0200782
783 /* allocate new proxy to handle forwards */
784 p = calloc(1, sizeof *p);
785 if (!p) {
786 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
787 err_code |= ERR_ALERT | ERR_FATAL;
788 goto err;
789 }
790
791 init_new_proxy(p);
792 sink_setup_proxy(p);
793 p->parent = cfg_sink;
794 p->id = strdup(args[1]);
795 p->conf.args.file = p->conf.file = strdup(file);
796 p->conf.args.line = p->conf.line = linenum;
797 cfg_sink->forward_px = p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200798 }
799 else if (strcmp(args[0], "size") == 0) {
800 size = atol(args[1]);
801 if (!size) {
802 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
803 err_code |= ERR_ALERT | ERR_FATAL;
804 goto err;
805 }
806
807 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)
808 || !ring_resize(cfg_sink->ctx.ring, size)) {
809 ha_alert("parsing [%s:%d] : fail to set sink buffer size '%s'.\n", file, linenum, args[1]);
810 err_code |= ERR_ALERT | ERR_FATAL;
811 goto err;
812 }
813 }
Emeric Brun494c5052020-05-28 11:13:15 +0200814 else if (strcmp(args[0],"server") == 0) {
Emeric Brund3db3842020-07-21 16:54:36 +0200815 err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL, 1, 0, 1);
Emeric Brun494c5052020-05-28 11:13:15 +0200816 }
817 else if (strcmp(args[0],"timeout") == 0) {
818 if (!cfg_sink || !cfg_sink->forward_px) {
819 ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
820 err_code |= ERR_ALERT | ERR_FATAL;
821 goto err;
822 }
823
824 if (strcmp(args[1], "connect") == 0 ||
825 strcmp(args[1], "server") == 0) {
826 const char *res;
827 unsigned int tout;
828
829 if (!*args[2]) {
830 ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
831 file, linenum, args[0], args[1]);
832 err_code |= ERR_ALERT | ERR_FATAL;
833 goto err;
834 }
835 res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
836 if (res == PARSE_TIME_OVER) {
837 ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
838 file, linenum, args[2], args[0], args[1]);
839 err_code |= ERR_ALERT | ERR_FATAL;
840 goto err;
841 }
842 else if (res == PARSE_TIME_UNDER) {
843 ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
844 file, linenum, args[2], args[0], args[1]);
845 err_code |= ERR_ALERT | ERR_FATAL;
846 goto err;
847 }
848 else if (res) {
849 ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
850 file, linenum, *res, args[0], args[1]);
851 err_code |= ERR_ALERT | ERR_FATAL;
852 goto err;
853 }
854 if (args[1][2] == 'c')
855 cfg_sink->forward_px->timeout.connect = tout;
856 else
857 cfg_sink->forward_px->timeout.server = tout;
858 }
859 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200860 else if (strcmp(args[0],"format") == 0) {
861 if (!cfg_sink) {
862 ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
863 err_code |= ERR_ALERT | ERR_FATAL;
864 goto err;
865 }
866
Emeric Brun54648852020-07-06 15:54:06 +0200867 cfg_sink->fmt = get_log_format(args[1]);
868 if (cfg_sink->fmt == LOG_FORMAT_UNSPEC) {
Emeric Brun99c453d2020-05-25 15:01:04 +0200869 ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]);
870 err_code |= ERR_ALERT | ERR_FATAL;
871 goto err;
872 }
873 }
874 else if (strcmp(args[0],"maxlen") == 0) {
875 if (!cfg_sink) {
876 ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]);
877 err_code |= ERR_ALERT | ERR_FATAL;
878 goto err;
879 }
880
881 cfg_sink->maxlen = atol(args[1]);
882 if (!cfg_sink->maxlen) {
883 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
884 err_code |= ERR_ALERT | ERR_FATAL;
885 goto err;
886 }
887 }
888 else if (strcmp(args[0],"description") == 0) {
889 if (!cfg_sink) {
890 ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]);
891 err_code |= ERR_ALERT | ERR_FATAL;
892 goto err;
893 }
894
895 if (!*args[1]) {
896 ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum);
897 err_code |= ERR_ALERT | ERR_FATAL;
898 goto err;
899 }
900
901 free(cfg_sink->desc);
902
903 cfg_sink->desc = strdup(args[1]);
904 if (!cfg_sink->desc) {
905 ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]);
906 err_code |= ERR_ALERT | ERR_FATAL;
907 goto err;
908 }
909 }
Emeric Brun9f2ff3a2020-05-29 15:47:52 +0200910 else {
911 ha_alert("parsing [%s:%d] : unknown statement '%s'.\n", file, linenum, args[0]);
912 err_code |= ERR_ALERT | ERR_FATAL;
913 goto err;
914 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200915
916err:
917 return err_code;
918}
919
920/*
921 * Post parsing "ring" section.
922 *
923 * The function returns 0 in success case, otherwise, it returns error
924 * flags.
925 */
926int cfg_post_parse_ring()
927{
928 int err_code = 0;
Emeric Brun494c5052020-05-28 11:13:15 +0200929 struct server *srv;
Emeric Brun99c453d2020-05-25 15:01:04 +0200930
931 if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
932 if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
933 ha_warning("ring '%s' event max length '%u' exceeds size, forced to size '%lu'.\n",
Willy Tarreau570a22b2020-06-02 12:00:46 +0200934 cfg_sink->name, cfg_sink->maxlen, (unsigned long)b_size(&cfg_sink->ctx.ring->buf));
Emeric Brun99c453d2020-05-25 15:01:04 +0200935 cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
936 err_code |= ERR_ALERT;
937 }
Emeric Brun494c5052020-05-28 11:13:15 +0200938
939 /* prepare forward server descriptors */
940 if (cfg_sink->forward_px) {
941 srv = cfg_sink->forward_px->srv;
942 while (srv) {
943 struct sink_forward_target *sft;
944 /* init ssl if needed */
945 if (srv->use_ssl == 1 && xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->prepare_srv) {
946 if (xprt_get(XPRT_SSL)->prepare_srv(srv)) {
947 ha_alert("unable to prepare SSL for server '%s' in ring '%s'.\n", srv->id, cfg_sink->name);
948 err_code |= ERR_ALERT | ERR_FATAL;
949 }
950 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200951
Emeric Brun494c5052020-05-28 11:13:15 +0200952 /* allocate sink_forward_target descriptor */
953 sft = calloc(1, sizeof(*sft));
954 if (!sft) {
955 ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
956 err_code |= ERR_ALERT | ERR_FATAL;
957 break;
958 }
959 sft->srv = srv;
960 sft->appctx = NULL;
961 sft->ofs = ~0; /* init ring offset */
962 sft->next = cfg_sink->sft;
963 HA_SPIN_INIT(&sft->lock);
964
965 /* mark server attached to the ring */
966 if (!ring_attach(cfg_sink->ctx.ring)) {
967 ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
968 err_code |= ERR_ALERT | ERR_FATAL;
969 }
970 cfg_sink->sft = sft;
971 srv = srv->next;
972 }
973 sink_init_forward(cfg_sink);
974 }
975 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200976 cfg_sink = NULL;
977
978 return err_code;
979}
980
981/* resolve sink names at end of config. Returns 0 on success otherwise error
982 * flags.
983*/
984int post_sink_resolve()
985{
986 int err_code = 0;
987 struct logsrv *logsrv, *logb;
988 struct sink *sink;
989 struct proxy *px;
990
991 list_for_each_entry_safe(logsrv, logb, &global.logsrvs, list) {
992 if (logsrv->type == LOG_TARGET_BUFFER) {
993 sink = sink_find(logsrv->ring_name);
994 if (!sink || sink->type != SINK_TYPE_BUFFER) {
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500995 ha_alert("global log server uses unknown ring named '%s'.\n", logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +0200996 err_code |= ERR_ALERT | ERR_FATAL;
997 }
998 logsrv->sink = sink;
999 }
1000 }
1001
1002 for (px = proxies_list; px; px = px->next) {
1003 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1004 if (logsrv->type == LOG_TARGET_BUFFER) {
1005 sink = sink_find(logsrv->ring_name);
1006 if (!sink || sink->type != SINK_TYPE_BUFFER) {
Ilya Shipitsin47d17182020-06-21 21:42:57 +05001007 ha_alert("proxy '%s' log server uses unknown ring named '%s'.\n", px->id, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +02001008 err_code |= ERR_ALERT | ERR_FATAL;
1009 }
1010 logsrv->sink = sink;
1011 }
1012 }
1013 }
Emeric Brun12941c82020-07-07 14:19:42 +02001014
1015 for (px = cfg_log_forward; px; px = px->next) {
1016 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1017 if (logsrv->type == LOG_TARGET_BUFFER) {
1018 sink = sink_find(logsrv->ring_name);
1019 if (!sink || sink->type != SINK_TYPE_BUFFER) {
1020 ha_alert("log-forward '%s' log server uses unknown ring named '%s'.\n", px->id, logsrv->ring_name);
1021 err_code |= ERR_ALERT | ERR_FATAL;
1022 }
1023 logsrv->sink = sink;
1024 }
1025 }
1026 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001027 return err_code;
1028}
1029
1030
Willy Tarreau973e6622019-08-20 11:57:52 +02001031static void sink_init()
1032{
Emeric Brun54648852020-07-06 15:54:06 +02001033 sink_new_fd("stdout", "standard output (fd#1)", LOG_FORMAT_RAW, 1);
1034 sink_new_fd("stderr", "standard output (fd#2)", LOG_FORMAT_RAW, 2);
1035 sink_new_buf("buf0", "in-memory ring buffer", LOG_FORMAT_TIMED, 1048576);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001036}
1037
1038static void sink_deinit()
1039{
1040 struct sink *sink, *sb;
1041
1042 list_for_each_entry_safe(sink, sb, &sink_list, sink_list) {
1043 if (sink->type == SINK_TYPE_BUFFER)
1044 ring_free(sink->ctx.ring);
1045 LIST_DEL(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +02001046 free(sink->name);
1047 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001048 free(sink);
1049 }
Willy Tarreau973e6622019-08-20 11:57:52 +02001050}
1051
1052INITCALL0(STG_REGISTER, sink_init);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001053REGISTER_POST_DEINIT(sink_deinit);
Willy Tarreau973e6622019-08-20 11:57:52 +02001054
Willy Tarreau9f830d72019-08-26 18:17:04 +02001055static struct cli_kw_list cli_kws = {{ },{
Willy Tarreaufcf94982019-11-15 15:07:21 +01001056 { { "show", "events", NULL }, "show events [<sink>] : show event sink state", cli_parse_show_events, NULL, NULL },
Willy Tarreau9f830d72019-08-26 18:17:04 +02001057 {{},}
1058}};
1059
1060INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);
1061
Emeric Brun99c453d2020-05-25 15:01:04 +02001062/* config parsers for this section */
1063REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring);
1064REGISTER_POST_CHECK(post_sink_resolve);
1065
Willy Tarreau67b5a162019-08-11 16:38:56 +02001066/*
1067 * Local variables:
1068 * c-indent-level: 8
1069 * c-basic-offset: 8
1070 * End:
1071 */