blob: 6c6897d572b649912ba949689d28980d5de13668 [file] [log] [blame]
Willy Tarreau67b5a162019-08-11 16:38:56 +02001/*
2 * Event sink management
3 *
4 * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation, version 2.1
9 * exclusively.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
Willy Tarreau36979d92020-06-05 17:27:29 +020021#include <import/ist.h>
Willy Tarreau4c7e4b72020-05-27 12:58:42 +020022#include <haproxy/api.h>
Willy Tarreau6be78492020-06-05 00:00:29 +020023#include <haproxy/cfgparse.h>
Willy Tarreau83487a82020-06-04 20:19:54 +020024#include <haproxy/cli.h>
Willy Tarreau36979d92020-06-05 17:27:29 +020025#include <haproxy/errors.h>
Willy Tarreau853b2972020-05-27 18:01:47 +020026#include <haproxy/list.h>
Willy Tarreauaeed4a82020-06-04 22:01:04 +020027#include <haproxy/log.h>
Willy Tarreau817538e2021-05-08 20:20:21 +020028#include <haproxy/proxy.h>
Willy Tarreaud2ad57c2020-06-03 19:43:35 +020029#include <haproxy/ring.h>
Willy Tarreau3727a8a2020-06-04 17:37:26 +020030#include <haproxy/signal.h>
Willy Tarreauba2f73d2020-06-03 20:02:28 +020031#include <haproxy/sink.h>
Willy Tarreau5e539c92020-06-04 20:45:39 +020032#include <haproxy/stream_interface.h>
Willy Tarreaub2551052020-06-09 09:07:15 +020033#include <haproxy/time.h>
Willy Tarreau4bad5e22021-05-08 13:05:30 +020034#include <haproxy/tools.h>
Willy Tarreau67b5a162019-08-11 16:38:56 +020035
36struct list sink_list = LIST_HEAD_INIT(sink_list);
37
Emeric Brun99c453d2020-05-25 15:01:04 +020038struct sink *cfg_sink;
39
Willy Tarreau67b5a162019-08-11 16:38:56 +020040struct sink *sink_find(const char *name)
41{
42 struct sink *sink;
43
44 list_for_each_entry(sink, &sink_list, sink_list)
45 if (strcmp(sink->name, name) == 0)
46 return sink;
47 return NULL;
48}
49
50/* creates a new sink and adds it to the list, it's still generic and not fully
51 * initialized. Returns NULL on allocation failure. If another one already
52 * exists with the same name, it will be returned. The caller can detect it as
53 * a newly created one has type SINK_TYPE_NEW.
54 */
Emeric Brun54648852020-07-06 15:54:06 +020055static struct sink *__sink_new(const char *name, const char *desc, int fmt)
Willy Tarreau67b5a162019-08-11 16:38:56 +020056{
57 struct sink *sink;
58
59 sink = sink_find(name);
60 if (sink)
61 goto end;
62
Emeric Brun494c5052020-05-28 11:13:15 +020063 sink = calloc(1, sizeof(*sink));
Willy Tarreau67b5a162019-08-11 16:38:56 +020064 if (!sink)
65 goto end;
66
Emeric Brun99c453d2020-05-25 15:01:04 +020067 sink->name = strdup(name);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010068 if (!sink->name)
69 goto err;
70
Emeric Brun99c453d2020-05-25 15:01:04 +020071 sink->desc = strdup(desc);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010072 if (!sink->desc)
73 goto err;
74
Willy Tarreau67b5a162019-08-11 16:38:56 +020075 sink->fmt = fmt;
76 sink->type = SINK_TYPE_NEW;
Christopher Fauleta63a5c22019-11-15 15:10:12 +010077 sink->maxlen = BUFSIZE;
Willy Tarreau67b5a162019-08-11 16:38:56 +020078 /* address will be filled by the caller if needed */
Willy Tarreau973e6622019-08-20 11:57:52 +020079 sink->ctx.fd = -1;
Willy Tarreau67b5a162019-08-11 16:38:56 +020080 sink->ctx.dropped = 0;
81 HA_RWLOCK_INIT(&sink->ctx.lock);
Willy Tarreau2b718102021-04-21 07:32:39 +020082 LIST_APPEND(&sink_list, &sink->sink_list);
Willy Tarreau67b5a162019-08-11 16:38:56 +020083 end:
84 return sink;
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010085
86 err:
Willy Tarreau61cfdf42021-02-20 10:46:51 +010087 ha_free(&sink->name);
88 ha_free(&sink->desc);
89 ha_free(&sink);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010090
91 return NULL;
Willy Tarreau67b5a162019-08-11 16:38:56 +020092}
93
Willy Tarreau973e6622019-08-20 11:57:52 +020094/* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>,
95 * and description <desc>. Returns NULL on allocation failure or conflict.
96 * Perfect duplicates are merged (same type, fd, and name).
97 */
Emeric Brun54648852020-07-06 15:54:06 +020098struct sink *sink_new_fd(const char *name, const char *desc, enum log_fmt fmt, int fd)
Willy Tarreau973e6622019-08-20 11:57:52 +020099{
100 struct sink *sink;
101
102 sink = __sink_new(name, desc, fmt);
103 if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd))
104 goto end;
105
106 if (sink->type != SINK_TYPE_NEW) {
107 sink = NULL;
108 goto end;
109 }
110
111 sink->type = SINK_TYPE_FD;
112 sink->ctx.fd = fd;
113 end:
114 return sink;
115}
116
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200117/* creates a sink called <name> of type BUF of size <size>, format <fmt>,
118 * and description <desc>. Returns NULL on allocation failure or conflict.
119 * Perfect duplicates are merged (same type and name). If sizes differ, the
120 * largest one is kept.
121 */
Emeric Brun54648852020-07-06 15:54:06 +0200122struct sink *sink_new_buf(const char *name, const char *desc, enum log_fmt fmt, size_t size)
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200123{
124 struct sink *sink;
125
126 sink = __sink_new(name, desc, fmt);
127 if (!sink)
128 goto fail;
129
130 if (sink->type == SINK_TYPE_BUFFER) {
131 /* such a buffer already exists, we may have to resize it */
132 if (!ring_resize(sink->ctx.ring, size))
133 goto fail;
134 goto end;
135 }
136
137 if (sink->type != SINK_TYPE_NEW) {
138 /* already exists of another type */
139 goto fail;
140 }
141
142 sink->ctx.ring = ring_new(size);
143 if (!sink->ctx.ring) {
Willy Tarreau2b718102021-04-21 07:32:39 +0200144 LIST_DELETE(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +0200145 free(sink->name);
146 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200147 free(sink);
148 goto fail;
149 }
150
151 sink->type = SINK_TYPE_BUFFER;
152 end:
153 return sink;
154 fail:
155 return NULL;
156}
157
Willy Tarreau67b5a162019-08-11 16:38:56 +0200158/* tries to send <nmsg> message parts (up to 8, ignored above) from message
Ilya Shipitsin46a030c2020-07-05 16:36:08 +0500159 * array <msg> to sink <sink>. Formatting according to the sink's preference is
Willy Tarreau8f240232019-08-27 16:41:06 +0200160 * done here. Lost messages are NOT accounted for. It is preferable to call
161 * sink_write() instead which will also try to emit the number of dropped
162 * messages when there are any. It returns >0 if it could write anything,
163 * <=0 otherwise.
Willy Tarreau67b5a162019-08-11 16:38:56 +0200164 */
Emeric Brun54648852020-07-06 15:54:06 +0200165 ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg,
166 int level, int facility, struct ist *metadata)
167 {
168 struct ist *pfx = NULL;
Willy Tarreaua1426de2019-08-27 14:21:02 +0200169 size_t npfx = 0;
Emeric Brunbd163812020-05-06 14:33:46 +0200170
Emeric Brun54648852020-07-06 15:54:06 +0200171 if (sink->fmt == LOG_FORMAT_RAW)
Emeric Brunbd163812020-05-06 14:33:46 +0200172 goto send;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200173
Emeric Brun54648852020-07-06 15:54:06 +0200174 pfx = build_log_header(sink->fmt, level, facility, metadata, &npfx);
Emeric Brunbd163812020-05-06 14:33:46 +0200175
176send:
Willy Tarreau973e6622019-08-20 11:57:52 +0200177 if (sink->type == SINK_TYPE_FD) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200178 return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200179 }
180 else if (sink->type == SINK_TYPE_BUFFER) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200181 return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
Willy Tarreau973e6622019-08-20 11:57:52 +0200182 }
Willy Tarreau8f240232019-08-27 16:41:06 +0200183 return 0;
184}
Willy Tarreau67b5a162019-08-11 16:38:56 +0200185
Willy Tarreau8f240232019-08-27 16:41:06 +0200186/* Tries to emit a message indicating the number of dropped events. In case of
187 * success, the amount of drops is reduced by as much. It's supposed to be
188 * called under an exclusive lock on the sink to avoid multiple produces doing
189 * the same. On success, >0 is returned, otherwise <=0 on failure.
190 */
Emeric Brun54648852020-07-06 15:54:06 +0200191int sink_announce_dropped(struct sink *sink, int facility)
Willy Tarreau8f240232019-08-27 16:41:06 +0200192{
Emeric Brun54648852020-07-06 15:54:06 +0200193 static THREAD_LOCAL struct ist metadata[LOG_META_FIELDS];
194 static THREAD_LOCAL pid_t curr_pid;
195 static THREAD_LOCAL char pidstr[16];
Willy Tarreau8f240232019-08-27 16:41:06 +0200196 unsigned int dropped;
197 struct buffer msg;
198 struct ist msgvec[1];
199 char logbuf[64];
200
201 while (unlikely((dropped = sink->ctx.dropped) > 0)) {
202 chunk_init(&msg, logbuf, sizeof(logbuf));
203 chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
204 msgvec[0] = ist2(msg.area, msg.data);
Emeric Brunbd163812020-05-06 14:33:46 +0200205
Emeric Brun54648852020-07-06 15:54:06 +0200206 if (!metadata[LOG_META_HOST].len) {
207 if (global.log_send_hostname)
208 metadata[LOG_META_HOST] = ist2(global.log_send_hostname, strlen(global.log_send_hostname));
Emeric Brun54648852020-07-06 15:54:06 +0200209 }
210
211 if (!metadata[LOG_META_TAG].len)
212 metadata[LOG_META_TAG] = ist2(global.log_tag.area, global.log_tag.data);
213
214 if (unlikely(curr_pid != getpid()))
215 metadata[LOG_META_PID].len = 0;
216
217 if (!metadata[LOG_META_PID].len) {
218 curr_pid = getpid();
219 ltoa_o(curr_pid, pidstr, sizeof(pidstr));
220 metadata[LOG_META_PID] = ist2(pidstr, strlen(pidstr));
221 }
222
223 if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, metadata) <= 0)
Willy Tarreau8f240232019-08-27 16:41:06 +0200224 return 0;
225 /* success! */
226 HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
227 }
228 return 1;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200229}
230
Willy Tarreau9f830d72019-08-26 18:17:04 +0200231/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
232static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
233{
234 struct sink *sink;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200235 int arg;
Willy Tarreau9f830d72019-08-26 18:17:04 +0200236
237 args++; // make args[1] the 1st arg
238
239 if (!*args[1]) {
240 /* no arg => report the list of supported sink */
Willy Tarreau1d181e42019-08-30 11:17:01 +0200241 chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
Willy Tarreau9f830d72019-08-26 18:17:04 +0200242 list_for_each_entry(sink, &sink_list, sink_list) {
243 chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n",
244 sink->name,
245 sink->type == SINK_TYPE_NEW ? "init" :
246 sink->type == SINK_TYPE_FD ? "fd" :
247 sink->type == SINK_TYPE_BUFFER ? "buffer" : "?",
248 sink->ctx.dropped, sink->desc);
249 }
250
251 trash.area[trash.data] = 0;
252 return cli_msg(appctx, LOG_WARNING, trash.area);
253 }
254
255 if (!cli_has_level(appctx, ACCESS_LVL_OPER))
256 return 1;
257
258 sink = sink_find(args[1]);
259 if (!sink)
260 return cli_err(appctx, "No such event sink");
261
262 if (sink->type != SINK_TYPE_BUFFER)
263 return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
264
Willy Tarreau1d181e42019-08-30 11:17:01 +0200265 for (arg = 2; *args[arg]; arg++) {
266 if (strcmp(args[arg], "-w") == 0)
267 appctx->ctx.cli.i0 |= 1; // wait mode
268 else if (strcmp(args[arg], "-n") == 0)
269 appctx->ctx.cli.i0 |= 2; // seek to new
270 else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
271 appctx->ctx.cli.i0 |= 3; // seek to new + wait
272 else
273 return cli_err(appctx, "unknown option");
274 }
Willy Tarreau9f830d72019-08-26 18:17:04 +0200275 return ring_attach_cli(sink->ctx.ring, appctx);
276}
277
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500278/* Pre-configures a ring proxy to emit connections */
Emeric Brun494c5052020-05-28 11:13:15 +0200279void sink_setup_proxy(struct proxy *px)
280{
281 px->last_change = now.tv_sec;
282 px->cap = PR_CAP_FE | PR_CAP_BE;
283 px->maxconn = 0;
284 px->conn_retries = 1;
285 px->timeout.server = TICK_ETERNITY;
286 px->timeout.client = TICK_ETERNITY;
287 px->timeout.connect = TICK_ETERNITY;
288 px->accept = NULL;
289 px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
290 px->bind_proc = 0; /* will be filled by users */
291}
292
293/*
294 * IO Handler to handle message push to syslog tcp server
295 */
296static void sink_forward_io_handler(struct appctx *appctx)
297{
298 struct stream_interface *si = appctx->owner;
299 struct stream *s = si_strm(si);
300 struct sink *sink = strm_fe(s)->parent;
301 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
302 struct ring *ring = sink->ctx.ring;
303 struct buffer *buf = &ring->buf;
304 uint64_t msg_len;
Willy Tarreaub2cd3ed2022-08-04 17:18:54 +0200305 size_t len, cnt, ofs, last_ofs;
Emeric Brun494c5052020-05-28 11:13:15 +0200306 int ret = 0;
307
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500308 /* if stopping was requested, close immediately */
Emeric Brun494c5052020-05-28 11:13:15 +0200309 if (unlikely(stopping))
310 goto close;
311
312 /* for rex because it seems reset to timeout
313 * and we don't want expire on this case
314 * with a syslog server
315 */
316 si_oc(si)->rex = TICK_ETERNITY;
317 /* rto should not change but it seems the case */
318 si_oc(si)->rto = TICK_ETERNITY;
319
320 /* an error was detected */
321 if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
322 goto close;
323
324 /* con closed by server side */
325 if ((si_oc(si)->flags & CF_SHUTW))
326 goto close;
327
328 /* if the connection is not established, inform the stream that we want
329 * to be notified whenever the connection completes.
330 */
331 if (si_opposite(si)->state < SI_ST_EST) {
332 si_cant_get(si);
333 si_rx_conn_blk(si);
334 si_rx_endp_more(si);
335 return;
336 }
337
338 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
339 if (appctx != sft->appctx) {
340 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
341 goto close;
342 }
343 ofs = sft->ofs;
344
345 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
346 LIST_DEL_INIT(&appctx->wait_entry);
347 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
348
349 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
350
351 /* explanation for the initialization below: it would be better to do
352 * this in the parsing function but this would occasionally result in
353 * dropped events because we'd take a reference on the oldest message
354 * and keep it while being scheduled. Thus instead let's take it the
355 * first time we enter here so that we have a chance to pass many
356 * existing messages before grabbing a reference to a location. This
357 * value cannot be produced after initialization.
358 */
359 if (unlikely(ofs == ~0)) {
360 ofs = 0;
361
Willy Tarreau4781b152021-04-06 13:53:36 +0200362 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun494c5052020-05-28 11:13:15 +0200363 ofs += ring->ofs;
364 }
365
Emeric Brun494c5052020-05-28 11:13:15 +0200366 /* in this loop, ofs always points to the counter byte that precedes
367 * the message so that we can take our reference there if we have to
368 * stop before the end (ret=0).
369 */
370 if (si_opposite(si)->state == SI_ST_EST) {
Emeric Brunfdabf492020-12-02 17:02:09 +0100371 /* we were already there, adjust the offset to be relative to
372 * the buffer's head and remove us from the counter.
373 */
374 ofs -= ring->ofs;
375 BUG_ON(ofs >= buf->size);
Willy Tarreau4781b152021-04-06 13:53:36 +0200376 HA_ATOMIC_DEC(b_peek(buf, ofs));
Emeric Brunfdabf492020-12-02 17:02:09 +0100377
Emeric Brun494c5052020-05-28 11:13:15 +0200378 ret = 1;
379 while (ofs + 1 < b_data(buf)) {
380 cnt = 1;
381 len = b_peek_varint(buf, ofs + cnt, &msg_len);
382 if (!len)
383 break;
384 cnt += len;
385 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
386
387 if (unlikely(msg_len + 1 > b_size(&trash))) {
388 /* too large a message to ever fit, let's skip it */
389 ofs += cnt + msg_len;
390 continue;
391 }
392
393 chunk_reset(&trash);
394 len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
395 trash.data += len;
396 trash.area[trash.data++] = '\n';
397
398 if (ci_putchk(si_ic(si), &trash) == -1) {
399 si_rx_room_blk(si);
400 ret = 0;
401 break;
402 }
403 ofs += cnt + msg_len;
404 }
405
Willy Tarreau4781b152021-04-06 13:53:36 +0200406 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun494c5052020-05-28 11:13:15 +0200407 ofs += ring->ofs;
408 sft->ofs = ofs;
Willy Tarreaub2cd3ed2022-08-04 17:18:54 +0200409 last_ofs = ring->ofs;
Emeric Brun494c5052020-05-28 11:13:15 +0200410 }
411 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
412
413 if (ret) {
414 /* let's be woken up once new data arrive */
415 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau2b718102021-04-21 07:32:39 +0200416 LIST_APPEND(&ring->waiters, &appctx->wait_entry);
Willy Tarreaub2cd3ed2022-08-04 17:18:54 +0200417 ofs = ring->ofs;
Emeric Brun494c5052020-05-28 11:13:15 +0200418 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreaub2cd3ed2022-08-04 17:18:54 +0200419 if (ofs != last_ofs) {
420 /* more data was added into the ring between the
421 * unlock and the lock, and the writer might not
422 * have seen us. We need to reschedule a read.
423 */
424 si_rx_endp_more(si);
425 } else
426 si_rx_endp_done(si);
Emeric Brun494c5052020-05-28 11:13:15 +0200427 }
428 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
429
430 /* always drain data from server */
431 co_skip(si_oc(si), si_oc(si)->output);
432 return;
433
434close:
435 si_shutw(si);
436 si_shutr(si);
437 si_ic(si)->flags |= CF_READ_NULL;
438}
439
Emeric Brun97556472020-05-30 01:42:45 +0200440/*
441 * IO Handler to handle message push to syslog tcp server
442 * using octet counting frames
443 */
444static void sink_forward_oc_io_handler(struct appctx *appctx)
445{
446 struct stream_interface *si = appctx->owner;
447 struct stream *s = si_strm(si);
448 struct sink *sink = strm_fe(s)->parent;
449 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
450 struct ring *ring = sink->ctx.ring;
451 struct buffer *buf = &ring->buf;
452 uint64_t msg_len;
453 size_t len, cnt, ofs;
454 int ret = 0;
455 char *p;
456
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500457 /* if stopping was requested, close immediately */
Emeric Brun97556472020-05-30 01:42:45 +0200458 if (unlikely(stopping))
459 goto close;
460
461 /* for rex because it seems reset to timeout
462 * and we don't want expire on this case
463 * with a syslog server
464 */
465 si_oc(si)->rex = TICK_ETERNITY;
466 /* rto should not change but it seems the case */
467 si_oc(si)->rto = TICK_ETERNITY;
468
469 /* an error was detected */
470 if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
471 goto close;
472
473 /* con closed by server side */
474 if ((si_oc(si)->flags & CF_SHUTW))
475 goto close;
476
477 /* if the connection is not established, inform the stream that we want
478 * to be notified whenever the connection completes.
479 */
480 if (si_opposite(si)->state < SI_ST_EST) {
481 si_cant_get(si);
482 si_rx_conn_blk(si);
483 si_rx_endp_more(si);
484 return;
485 }
486
487 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
488 if (appctx != sft->appctx) {
489 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
490 goto close;
491 }
492 ofs = sft->ofs;
493
494 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
495 LIST_DEL_INIT(&appctx->wait_entry);
496 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
497
498 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
499
500 /* explanation for the initialization below: it would be better to do
501 * this in the parsing function but this would occasionally result in
502 * dropped events because we'd take a reference on the oldest message
503 * and keep it while being scheduled. Thus instead let's take it the
504 * first time we enter here so that we have a chance to pass many
505 * existing messages before grabbing a reference to a location. This
506 * value cannot be produced after initialization.
507 */
508 if (unlikely(ofs == ~0)) {
509 ofs = 0;
510
Willy Tarreau4781b152021-04-06 13:53:36 +0200511 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun97556472020-05-30 01:42:45 +0200512 ofs += ring->ofs;
513 }
514
Emeric Brun97556472020-05-30 01:42:45 +0200515 /* in this loop, ofs always points to the counter byte that precedes
516 * the message so that we can take our reference there if we have to
517 * stop before the end (ret=0).
518 */
519 if (si_opposite(si)->state == SI_ST_EST) {
Emeric Brunfdabf492020-12-02 17:02:09 +0100520 /* we were already there, adjust the offset to be relative to
521 * the buffer's head and remove us from the counter.
522 */
523 ofs -= ring->ofs;
524 BUG_ON(ofs >= buf->size);
Willy Tarreau4781b152021-04-06 13:53:36 +0200525 HA_ATOMIC_DEC(b_peek(buf, ofs));
Emeric Brunfdabf492020-12-02 17:02:09 +0100526
Emeric Brun97556472020-05-30 01:42:45 +0200527 ret = 1;
528 while (ofs + 1 < b_data(buf)) {
529 cnt = 1;
530 len = b_peek_varint(buf, ofs + cnt, &msg_len);
531 if (!len)
532 break;
533 cnt += len;
534 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
535
536 chunk_reset(&trash);
537 p = ulltoa(msg_len, trash.area, b_size(&trash));
538 if (p) {
539 trash.data = (p - trash.area) + 1;
540 *p = ' ';
541 }
542
543 if (!p || (trash.data + msg_len > b_size(&trash))) {
544 /* too large a message to ever fit, let's skip it */
545 ofs += cnt + msg_len;
546 continue;
547 }
548
549 trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
550
551 if (ci_putchk(si_ic(si), &trash) == -1) {
552 si_rx_room_blk(si);
553 ret = 0;
554 break;
555 }
556 ofs += cnt + msg_len;
557 }
558
Willy Tarreau4781b152021-04-06 13:53:36 +0200559 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun97556472020-05-30 01:42:45 +0200560 ofs += ring->ofs;
561 sft->ofs = ofs;
562 }
563 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
564
565 if (ret) {
566 /* let's be woken up once new data arrive */
567 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau2b718102021-04-21 07:32:39 +0200568 LIST_APPEND(&ring->waiters, &appctx->wait_entry);
Emeric Brun97556472020-05-30 01:42:45 +0200569 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
570 si_rx_endp_done(si);
571 }
572 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
573
574 /* always drain data from server */
575 co_skip(si_oc(si), si_oc(si)->output);
576 return;
577
578close:
579 si_shutw(si);
580 si_shutr(si);
581 si_ic(si)->flags |= CF_READ_NULL;
582}
583
Emeric Brun494c5052020-05-28 11:13:15 +0200584void __sink_forward_session_deinit(struct sink_forward_target *sft)
585{
586 struct stream_interface *si;
587 struct stream *s;
588 struct sink *sink;
589
590 if (!sft->appctx)
591 return;
592
593 si = sft->appctx->owner;
594 if (!si)
595 return;
596
597 s = si_strm(si);
598 if (!s)
599 return;
600
601 sink = strm_fe(s)->parent;
602 if (!sink)
603 return;
604
605 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
606 LIST_DEL_INIT(&sft->appctx->wait_entry);
607 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
608
609 sft->appctx = NULL;
610 task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
611}
612
613
614static void sink_forward_session_release(struct appctx *appctx)
615{
Christopher Fauletefebfda2022-01-14 15:03:22 +0100616 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
Emeric Brun494c5052020-05-28 11:13:15 +0200617
618 if (!sft)
619 return;
620
621 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
622 if (sft->appctx == appctx)
623 __sink_forward_session_deinit(sft);
624 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
625}
626
627static struct applet sink_forward_applet = {
628 .obj_type = OBJ_TYPE_APPLET,
629 .name = "<SINKFWD>", /* used for logging */
630 .fct = sink_forward_io_handler,
631 .release = sink_forward_session_release,
632};
633
Emeric Brun97556472020-05-30 01:42:45 +0200634static struct applet sink_forward_oc_applet = {
635 .obj_type = OBJ_TYPE_APPLET,
636 .name = "<SINKFWDOC>", /* used for logging */
637 .fct = sink_forward_oc_io_handler,
638 .release = sink_forward_session_release,
639};
640
Emeric Brun494c5052020-05-28 11:13:15 +0200641/*
642 * Create a new peer session in assigned state (connect will start automatically)
643 */
644static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
645{
646 struct proxy *p = sink->forward_px;
647 struct appctx *appctx;
648 struct session *sess;
649 struct stream *s;
Emeric Brun97556472020-05-30 01:42:45 +0200650 struct applet *applet = &sink_forward_applet;
651
652 if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
653 applet = &sink_forward_oc_applet;
Emeric Brun494c5052020-05-28 11:13:15 +0200654
Emeric Brun97556472020-05-30 01:42:45 +0200655 appctx = appctx_new(applet, tid_bit);
Emeric Brun494c5052020-05-28 11:13:15 +0200656 if (!appctx)
657 goto out_close;
658
659 appctx->ctx.sft.ptr = (void *)sft;
660
661 sess = session_new(p, NULL, &appctx->obj_type);
662 if (!sess) {
663 ha_alert("out of memory in peer_session_create().\n");
664 goto out_free_appctx;
665 }
666
Christopher Faulet26256f82020-09-14 11:40:13 +0200667 if ((s = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) {
Emeric Brun494c5052020-05-28 11:13:15 +0200668 ha_alert("Failed to initialize stream in peer_session_create().\n");
669 goto out_free_sess;
670 }
671
672
673 s->target = &sft->srv->obj_type;
Willy Tarreau9b7587a2020-10-15 07:32:10 +0200674 if (!sockaddr_alloc(&s->target_addr, &sft->srv->addr, sizeof(sft->srv->addr)))
Emeric Brun494c5052020-05-28 11:13:15 +0200675 goto out_free_strm;
Emeric Brun494c5052020-05-28 11:13:15 +0200676 s->flags = SF_ASSIGNED|SF_ADDR_SET;
677 s->si[1].flags |= SI_FL_NOLINGER;
678
679 s->do_log = NULL;
680 s->uniq_id = 0;
681
682 s->res.flags |= CF_READ_DONTWAIT;
683 /* for rto and rex to eternity to not expire on idle recv:
684 * We are using a syslog server.
685 */
686 s->res.rto = TICK_ETERNITY;
687 s->res.rex = TICK_ETERNITY;
688 sft->appctx = appctx;
689 task_wakeup(s->task, TASK_WOKEN_INIT);
690 return appctx;
691
692 /* Error unrolling */
693 out_free_strm:
Willy Tarreau2b718102021-04-21 07:32:39 +0200694 LIST_DELETE(&s->list);
Emeric Brun494c5052020-05-28 11:13:15 +0200695 pool_free(pool_head_stream, s);
696 out_free_sess:
697 session_free(sess);
698 out_free_appctx:
699 appctx_free(appctx);
700 out_close:
701 return NULL;
702}
703
704/*
705 * Task to handle connctions to forward servers
706 */
Willy Tarreau144f84a2021-03-02 16:09:26 +0100707static struct task *process_sink_forward(struct task * task, void *context, unsigned int state)
Emeric Brun494c5052020-05-28 11:13:15 +0200708{
709 struct sink *sink = (struct sink *)context;
710 struct sink_forward_target *sft = sink->sft;
711
712 task->expire = TICK_ETERNITY;
713
714 if (!stopping) {
715 while (sft) {
716 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
717 /* if appctx is NULL, start a new session */
718 if (!sft->appctx)
719 sft->appctx = sink_forward_session_create(sink, sft);
720 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
721 sft = sft->next;
722 }
723 }
724 else {
725 while (sft) {
726 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
727 /* awake applet to perform a clean close */
728 if (sft->appctx)
729 appctx_wakeup(sft->appctx);
730 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
731 sft = sft->next;
732 }
733 }
734
735 return task;
736}
737/*
738 * Init task to manage connctions to forward servers
739 *
740 * returns 0 in case of error.
741 */
742int sink_init_forward(struct sink *sink)
743{
744 sink->forward_task = task_new(MAX_THREADS_MASK);
745 if (!sink->forward_task)
746 return 0;
747
748 sink->forward_task->process = process_sink_forward;
749 sink->forward_task->context = (void *)sink;
750 sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
751 task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
752 return 1;
753}
Emeric Brun99c453d2020-05-25 15:01:04 +0200754/*
755 * Parse "ring" section and create corresponding sink buffer.
756 *
757 * The function returns 0 in success case, otherwise, it returns error
758 * flags.
759 */
760int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
761{
762 int err_code = 0;
763 const char *inv;
764 size_t size = BUFSIZE;
Emeric Brun494c5052020-05-28 11:13:15 +0200765 struct proxy *p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200766
Willy Tarreau9d675b62022-08-11 16:12:11 +0200767 if (strcmp(args[0], "ring") == 0) { /* new ring section */
Emeric Brun99c453d2020-05-25 15:01:04 +0200768 if (!*args[1]) {
769 ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum);
770 err_code |= ERR_ALERT | ERR_FATAL;
771 goto err;
772 }
773
774 inv = invalid_char(args[1]);
775 if (inv) {
776 ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv);
777 err_code |= ERR_ALERT | ERR_FATAL;
778 goto err;
779 }
780
781 if (sink_find(args[1])) {
782 ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]);
783 err_code |= ERR_ALERT | ERR_FATAL;
784 goto err;
785 }
786
Emeric Brun54648852020-07-06 15:54:06 +0200787 cfg_sink = sink_new_buf(args[1], args[1], LOG_FORMAT_RAW, size);
Emeric Brun99c453d2020-05-25 15:01:04 +0200788 if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) {
789 ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]);
790 err_code |= ERR_ALERT | ERR_FATAL;
791 goto err;
792 }
Emeric Brun494c5052020-05-28 11:13:15 +0200793
794 /* allocate new proxy to handle forwards */
795 p = calloc(1, sizeof *p);
796 if (!p) {
797 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
798 err_code |= ERR_ALERT | ERR_FATAL;
799 goto err;
800 }
801
802 init_new_proxy(p);
803 sink_setup_proxy(p);
804 p->parent = cfg_sink;
805 p->id = strdup(args[1]);
806 p->conf.args.file = p->conf.file = strdup(file);
807 p->conf.args.line = p->conf.line = linenum;
808 cfg_sink->forward_px = p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200809 }
810 else if (strcmp(args[0], "size") == 0) {
Willy Tarreau9d675b62022-08-11 16:12:11 +0200811 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) {
812 ha_alert("parsing [%s:%d] : 'size' directive not usable with this type of sink.\n", file, linenum);
813 err_code |= ERR_ALERT | ERR_FATAL;
814 goto err;
815 }
816
Emeric Brun99c453d2020-05-25 15:01:04 +0200817 size = atol(args[1]);
818 if (!size) {
819 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
820 err_code |= ERR_ALERT | ERR_FATAL;
821 goto err;
822 }
823
Willy Tarreau9d675b62022-08-11 16:12:11 +0200824 if (size < cfg_sink->ctx.ring->buf.size) {
825 ha_warning("parsing [%s:%d] : ignoring new size '%llu' that is smaller than current size '%llu' for ring '%s'.\n",
826 file, linenum, (ullong)size, (ullong)cfg_sink->ctx.ring->buf.size, cfg_sink->name);
827 err_code |= ERR_ALERT | ERR_FATAL;
828 goto err;
829 }
830
831 if (!ring_resize(cfg_sink->ctx.ring, size)) {
832 ha_alert("parsing [%s:%d] : fail to set sink buffer size '%llu' for ring '%s'.\n", file, linenum,
833 (ullong)cfg_sink->ctx.ring->buf.size, cfg_sink->name);
Emeric Brun99c453d2020-05-25 15:01:04 +0200834 err_code |= ERR_ALERT | ERR_FATAL;
835 goto err;
836 }
837 }
Emeric Brun494c5052020-05-28 11:13:15 +0200838 else if (strcmp(args[0],"server") == 0) {
Amaury Denoyelle30c05372021-03-08 16:36:46 +0100839 err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL,
840 SRV_PARSE_PARSE_ADDR|SRV_PARSE_INITIAL_RESOLVE);
Emeric Brun494c5052020-05-28 11:13:15 +0200841 }
842 else if (strcmp(args[0],"timeout") == 0) {
843 if (!cfg_sink || !cfg_sink->forward_px) {
844 ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
845 err_code |= ERR_ALERT | ERR_FATAL;
846 goto err;
847 }
848
849 if (strcmp(args[1], "connect") == 0 ||
850 strcmp(args[1], "server") == 0) {
851 const char *res;
852 unsigned int tout;
853
854 if (!*args[2]) {
855 ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
856 file, linenum, args[0], args[1]);
857 err_code |= ERR_ALERT | ERR_FATAL;
858 goto err;
859 }
860 res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
861 if (res == PARSE_TIME_OVER) {
862 ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
863 file, linenum, args[2], args[0], args[1]);
864 err_code |= ERR_ALERT | ERR_FATAL;
865 goto err;
866 }
867 else if (res == PARSE_TIME_UNDER) {
868 ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
869 file, linenum, args[2], args[0], args[1]);
870 err_code |= ERR_ALERT | ERR_FATAL;
871 goto err;
872 }
873 else if (res) {
874 ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
875 file, linenum, *res, args[0], args[1]);
876 err_code |= ERR_ALERT | ERR_FATAL;
877 goto err;
878 }
879 if (args[1][2] == 'c')
880 cfg_sink->forward_px->timeout.connect = tout;
881 else
882 cfg_sink->forward_px->timeout.server = tout;
883 }
884 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200885 else if (strcmp(args[0],"format") == 0) {
886 if (!cfg_sink) {
887 ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
888 err_code |= ERR_ALERT | ERR_FATAL;
889 goto err;
890 }
891
Emeric Brun54648852020-07-06 15:54:06 +0200892 cfg_sink->fmt = get_log_format(args[1]);
893 if (cfg_sink->fmt == LOG_FORMAT_UNSPEC) {
Emeric Brun99c453d2020-05-25 15:01:04 +0200894 ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]);
895 err_code |= ERR_ALERT | ERR_FATAL;
896 goto err;
897 }
898 }
899 else if (strcmp(args[0],"maxlen") == 0) {
900 if (!cfg_sink) {
901 ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]);
902 err_code |= ERR_ALERT | ERR_FATAL;
903 goto err;
904 }
905
906 cfg_sink->maxlen = atol(args[1]);
907 if (!cfg_sink->maxlen) {
908 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
909 err_code |= ERR_ALERT | ERR_FATAL;
910 goto err;
911 }
912 }
913 else if (strcmp(args[0],"description") == 0) {
914 if (!cfg_sink) {
915 ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]);
916 err_code |= ERR_ALERT | ERR_FATAL;
917 goto err;
918 }
919
920 if (!*args[1]) {
921 ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum);
922 err_code |= ERR_ALERT | ERR_FATAL;
923 goto err;
924 }
925
926 free(cfg_sink->desc);
927
928 cfg_sink->desc = strdup(args[1]);
929 if (!cfg_sink->desc) {
930 ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]);
931 err_code |= ERR_ALERT | ERR_FATAL;
932 goto err;
933 }
934 }
Emeric Brun9f2ff3a2020-05-29 15:47:52 +0200935 else {
936 ha_alert("parsing [%s:%d] : unknown statement '%s'.\n", file, linenum, args[0]);
937 err_code |= ERR_ALERT | ERR_FATAL;
938 goto err;
939 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200940
941err:
942 return err_code;
943}
944
Emeric Brun94aab062021-04-02 10:41:36 +0200945/* Creates an new sink buffer from a log server.
946 *
947 * It uses the logsrvaddress to declare a forward
948 * server for this buffer. And it initializes the
949 * forwarding.
950 *
951 * The function returns a pointer on the
952 * allocated struct sink if allocate
953 * and initialize succeed, else if it fails
954 * it returns NULL.
955 *
956 * Note: the sink is created using the name
Ilya Shipitsinb2be9a12021-04-24 13:25:42 +0500957 * specified into logsrv->ring_name
Emeric Brun94aab062021-04-02 10:41:36 +0200958 */
959struct sink *sink_new_from_logsrv(struct logsrv *logsrv)
960{
961 struct proxy *p = NULL;
962 struct sink *sink = NULL;
963 struct server *srv = NULL;
964 struct sink_forward_target *sft = NULL;
965 int i;
966
967 /* allocate new proxy to handle
968 * forward to a stream server
969 */
970 p = calloc(1, sizeof *p);
971 if (!p) {
972 goto error;
973 }
974
975 init_new_proxy(p);
976 sink_setup_proxy(p);
977 p->id = strdup(logsrv->ring_name);
978 p->conf.args.file = p->conf.file = strdup(logsrv->conf.file);
979 p->conf.args.line = p->conf.line = logsrv->conf.line;
980
981 /* allocate a new server to forward messages
982 * from ring buffer
983 */
984 srv = new_server(p);
985 if (!srv)
986 goto error;
987
988 /* init server */
989 srv->id = strdup(logsrv->ring_name);
990 srv->conf.file = strdup(logsrv->conf.file);
991 srv->conf.line = logsrv->conf.line;
992 srv->addr = logsrv->addr;
993 srv->svc_port = get_host_port(&logsrv->addr);
994 HA_SPIN_INIT(&srv->lock);
995
996 /* process per thread init */
997 srv->per_thr = calloc(global.nbthread, sizeof(*srv->per_thr));
998 if (!srv->per_thr)
999 goto error;
1000
1001 for (i = 0; i < global.nbthread; i++) {
1002 srv->per_thr[i].idle_conns = EB_ROOT;
1003 srv->per_thr[i].safe_conns = EB_ROOT;
1004 srv->per_thr[i].avail_conns = EB_ROOT;
1005 MT_LIST_INIT(&srv->per_thr[i].streams);
1006 }
1007
1008 /* the servers are linked backwards
1009 * first into proxy
1010 */
1011 p->srv = srv;
1012 srv->next = p->srv;
1013
1014 /* allocate sink_forward_target descriptor */
1015 sft = calloc(1, sizeof(*sft));
1016 if (!sft)
1017 goto error;
1018
1019 /* init sink_forward_target offset */
1020 sft->srv = srv;
1021 sft->appctx = NULL;
1022 sft->ofs = ~0;
1023 HA_SPIN_INIT(&sft->lock);
1024
Ilya Shipitsinb2be9a12021-04-24 13:25:42 +05001025 /* prepare description for the sink */
Emeric Brun94aab062021-04-02 10:41:36 +02001026 chunk_reset(&trash);
1027 chunk_printf(&trash, "created from logserver declared into '%s' at line %d", logsrv->conf.file, logsrv->conf.line);
1028
1029 /* allocate a new sink buffer */
1030 sink = sink_new_buf(logsrv->ring_name, trash.area, logsrv->format, BUFSIZE);
1031 if (!sink || sink->type != SINK_TYPE_BUFFER) {
1032 goto error;
1033 }
1034
1035 /* link sink_forward_target to proxy */
1036 sink->forward_px = p;
1037 p->parent = sink;
1038
1039 /* insert into sink_forward_targets
1040 * list into sink
1041 */
1042 sft->next = sink->sft;
1043 sink->sft = sft;
1044
1045 /* mark server as an attached reader to the ring */
1046 if (!ring_attach(sink->ctx.ring)) {
1047 /* should never fail since there is
1048 * only one reader
1049 */
1050 goto error;
1051 }
1052
1053 /* initialize sink buffer forwarding */
1054 if (!sink_init_forward(sink))
1055 goto error;
1056
1057 /* reset familyt of logsrv to consider the ring buffer target */
1058 logsrv->addr.ss_family = AF_UNSPEC;
1059
1060 return sink;
1061error:
1062 if (p) {
1063 if (p->id)
1064 free(p->id);
1065 if (p->conf.file)
1066 free(p->conf.file);
1067
1068 free(p);
1069 }
1070
1071 if (srv) {
1072 if (srv->id)
1073 free(srv->id);
1074 if (srv->conf.file)
1075 free((void *)srv->conf.file);
1076 if (srv->per_thr)
1077 free(srv->per_thr);
1078 free(srv);
1079 }
1080
1081 if (sft)
1082 free(sft);
1083
1084 if (sink) {
1085 if (sink->ctx.ring)
1086 ring_free(sink->ctx.ring);
1087
Willy Tarreau2b718102021-04-21 07:32:39 +02001088 LIST_DELETE(&sink->sink_list);
Emeric Brun94aab062021-04-02 10:41:36 +02001089 free(sink->name);
1090 free(sink->desc);
1091 free(sink);
1092 }
1093
1094 return NULL;
1095}
1096
Emeric Brun99c453d2020-05-25 15:01:04 +02001097/*
1098 * Post parsing "ring" section.
1099 *
1100 * The function returns 0 in success case, otherwise, it returns error
1101 * flags.
1102 */
1103int cfg_post_parse_ring()
1104{
1105 int err_code = 0;
Emeric Brun494c5052020-05-28 11:13:15 +02001106 struct server *srv;
Emeric Brun99c453d2020-05-25 15:01:04 +02001107
1108 if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
1109 if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
1110 ha_warning("ring '%s' event max length '%u' exceeds size, forced to size '%lu'.\n",
Willy Tarreau570a22b2020-06-02 12:00:46 +02001111 cfg_sink->name, cfg_sink->maxlen, (unsigned long)b_size(&cfg_sink->ctx.ring->buf));
Emeric Brun99c453d2020-05-25 15:01:04 +02001112 cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
1113 err_code |= ERR_ALERT;
1114 }
Emeric Brun494c5052020-05-28 11:13:15 +02001115
1116 /* prepare forward server descriptors */
1117 if (cfg_sink->forward_px) {
1118 srv = cfg_sink->forward_px->srv;
1119 while (srv) {
1120 struct sink_forward_target *sft;
1121 /* init ssl if needed */
1122 if (srv->use_ssl == 1 && xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->prepare_srv) {
1123 if (xprt_get(XPRT_SSL)->prepare_srv(srv)) {
1124 ha_alert("unable to prepare SSL for server '%s' in ring '%s'.\n", srv->id, cfg_sink->name);
1125 err_code |= ERR_ALERT | ERR_FATAL;
1126 }
1127 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001128
Emeric Brun494c5052020-05-28 11:13:15 +02001129 /* allocate sink_forward_target descriptor */
1130 sft = calloc(1, sizeof(*sft));
1131 if (!sft) {
1132 ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
1133 err_code |= ERR_ALERT | ERR_FATAL;
1134 break;
1135 }
1136 sft->srv = srv;
1137 sft->appctx = NULL;
1138 sft->ofs = ~0; /* init ring offset */
1139 sft->next = cfg_sink->sft;
1140 HA_SPIN_INIT(&sft->lock);
1141
1142 /* mark server attached to the ring */
1143 if (!ring_attach(cfg_sink->ctx.ring)) {
1144 ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
1145 err_code |= ERR_ALERT | ERR_FATAL;
1146 }
1147 cfg_sink->sft = sft;
1148 srv = srv->next;
1149 }
1150 sink_init_forward(cfg_sink);
1151 }
1152 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001153 cfg_sink = NULL;
1154
1155 return err_code;
1156}
1157
1158/* resolve sink names at end of config. Returns 0 on success otherwise error
1159 * flags.
1160*/
1161int post_sink_resolve()
1162{
Christopher Fauletfc633b62020-11-06 15:24:23 +01001163 int err_code = ERR_NONE;
Emeric Brun99c453d2020-05-25 15:01:04 +02001164 struct logsrv *logsrv, *logb;
1165 struct sink *sink;
1166 struct proxy *px;
1167
1168 list_for_each_entry_safe(logsrv, logb, &global.logsrvs, list) {
1169 if (logsrv->type == LOG_TARGET_BUFFER) {
1170 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001171 if (!sink) {
1172 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1173 * means we must allocate a sink
1174 * buffer to send messages to this logsrv
1175 */
1176 if (logsrv->addr.ss_family != AF_UNSPEC) {
1177 sink = sink_new_from_logsrv(logsrv);
1178 if (!sink) {
1179 ha_alert("global stream log server declared in file '%s' at line %d cannot be initialized'.\n",
1180 logsrv->conf.file, logsrv->conf.line);
1181 err_code |= ERR_ALERT | ERR_FATAL;
1182 }
1183 }
1184 else {
1185 ha_alert("global log server declared in file '%s' at line %d uses unknown ring named '%s'.\n",
1186 logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1187 err_code |= ERR_ALERT | ERR_FATAL;
1188 }
1189 }
1190 else if (sink->type != SINK_TYPE_BUFFER) {
1191 ha_alert("global log server declared in file '%s' at line %d uses incompatible ring '%s'.\n",
1192 logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +02001193 err_code |= ERR_ALERT | ERR_FATAL;
1194 }
1195 logsrv->sink = sink;
1196 }
Emeric Brun94aab062021-04-02 10:41:36 +02001197
Emeric Brun99c453d2020-05-25 15:01:04 +02001198 }
1199
1200 for (px = proxies_list; px; px = px->next) {
1201 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1202 if (logsrv->type == LOG_TARGET_BUFFER) {
1203 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001204 if (!sink) {
1205 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1206 * means we must allocate a sink
1207 * buffer to send messages to this logsrv
1208 */
1209 if (logsrv->addr.ss_family != AF_UNSPEC) {
1210 sink = sink_new_from_logsrv(logsrv);
1211 if (!sink) {
1212 ha_alert("log server declared in proxy section '%s' file '%s' at line %d cannot be initialized'.\n",
1213 px->id, logsrv->conf.file, logsrv->conf.line);
1214 err_code |= ERR_ALERT | ERR_FATAL;
1215 }
1216 }
1217 else {
1218 ha_alert("log server declared in proxy section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1219 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1220 err_code |= ERR_ALERT | ERR_FATAL;
1221 }
1222 }
1223 else if (sink->type != SINK_TYPE_BUFFER) {
1224 ha_alert("log server declared in proxy section '%s' in file '%s' at line %d uses incomatible ring named '%s'.\n",
1225 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +02001226 err_code |= ERR_ALERT | ERR_FATAL;
1227 }
1228 logsrv->sink = sink;
1229 }
1230 }
1231 }
Emeric Brun12941c82020-07-07 14:19:42 +02001232
1233 for (px = cfg_log_forward; px; px = px->next) {
1234 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1235 if (logsrv->type == LOG_TARGET_BUFFER) {
1236 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001237 if (!sink) {
1238 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1239 * means we must allocate a sink
1240 * buffer to send messages to this logsrv
1241 */
1242 if (logsrv->addr.ss_family != AF_UNSPEC) {
1243 sink = sink_new_from_logsrv(logsrv);
1244 if (!sink) {
1245 ha_alert("log server declared in log-forward section '%s' file '%s' at line %d cannot be initialized'.\n",
1246 px->id, logsrv->conf.file, logsrv->conf.line);
1247 err_code |= ERR_ALERT | ERR_FATAL;
1248 }
1249 }
1250 else {
1251 ha_alert("log server declared in log-forward section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1252 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1253 err_code |= ERR_ALERT | ERR_FATAL;
1254 }
1255 }
1256 else if (sink->type != SINK_TYPE_BUFFER) {
1257 ha_alert("log server declared in log-forward section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1258 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun12941c82020-07-07 14:19:42 +02001259 err_code |= ERR_ALERT | ERR_FATAL;
1260 }
1261 logsrv->sink = sink;
1262 }
1263 }
1264 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001265 return err_code;
1266}
1267
1268
Willy Tarreau973e6622019-08-20 11:57:52 +02001269static void sink_init()
1270{
Emeric Brun54648852020-07-06 15:54:06 +02001271 sink_new_fd("stdout", "standard output (fd#1)", LOG_FORMAT_RAW, 1);
1272 sink_new_fd("stderr", "standard output (fd#2)", LOG_FORMAT_RAW, 2);
1273 sink_new_buf("buf0", "in-memory ring buffer", LOG_FORMAT_TIMED, 1048576);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001274}
1275
1276static void sink_deinit()
1277{
1278 struct sink *sink, *sb;
1279
1280 list_for_each_entry_safe(sink, sb, &sink_list, sink_list) {
1281 if (sink->type == SINK_TYPE_BUFFER)
1282 ring_free(sink->ctx.ring);
Willy Tarreau2b718102021-04-21 07:32:39 +02001283 LIST_DELETE(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +02001284 free(sink->name);
1285 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001286 free(sink);
1287 }
Willy Tarreau973e6622019-08-20 11:57:52 +02001288}
1289
1290INITCALL0(STG_REGISTER, sink_init);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001291REGISTER_POST_DEINIT(sink_deinit);
Willy Tarreau973e6622019-08-20 11:57:52 +02001292
Willy Tarreau9f830d72019-08-26 18:17:04 +02001293static struct cli_kw_list cli_kws = {{ },{
Willy Tarreaub205bfd2021-05-07 11:38:37 +02001294 { { "show", "events", NULL }, "show events [<sink>] [-w] [-n] : show event sink state", cli_parse_show_events, NULL, NULL },
Willy Tarreau9f830d72019-08-26 18:17:04 +02001295 {{},}
1296}};
1297
1298INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);
1299
Emeric Brun99c453d2020-05-25 15:01:04 +02001300/* config parsers for this section */
1301REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring);
1302REGISTER_POST_CHECK(post_sink_resolve);
1303
Willy Tarreau67b5a162019-08-11 16:38:56 +02001304/*
1305 * Local variables:
1306 * c-indent-level: 8
1307 * c-basic-offset: 8
1308 * End:
1309 */