blob: 78091aa58f594ff224e6bdd06c64298c39e2144e [file] [log] [blame]
Willy Tarreau67b5a162019-08-11 16:38:56 +02001/*
2 * Event sink management
3 *
4 * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation, version 2.1
9 * exclusively.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
Willy Tarreau36979d92020-06-05 17:27:29 +020021#include <import/ist.h>
Willy Tarreau4c7e4b72020-05-27 12:58:42 +020022#include <haproxy/api.h>
Willy Tarreau6be78492020-06-05 00:00:29 +020023#include <haproxy/cfgparse.h>
Willy Tarreau83487a82020-06-04 20:19:54 +020024#include <haproxy/cli.h>
Willy Tarreau36979d92020-06-05 17:27:29 +020025#include <haproxy/errors.h>
Willy Tarreau853b2972020-05-27 18:01:47 +020026#include <haproxy/list.h>
Willy Tarreauaeed4a82020-06-04 22:01:04 +020027#include <haproxy/log.h>
Willy Tarreaud2ad57c2020-06-03 19:43:35 +020028#include <haproxy/ring.h>
Willy Tarreau3727a8a2020-06-04 17:37:26 +020029#include <haproxy/signal.h>
Willy Tarreauba2f73d2020-06-03 20:02:28 +020030#include <haproxy/sink.h>
Willy Tarreau5e539c92020-06-04 20:45:39 +020031#include <haproxy/stream_interface.h>
Willy Tarreaub2551052020-06-09 09:07:15 +020032#include <haproxy/time.h>
Willy Tarreau4bad5e22021-05-08 13:05:30 +020033#include <haproxy/tools.h>
Willy Tarreau67b5a162019-08-11 16:38:56 +020034
35struct list sink_list = LIST_HEAD_INIT(sink_list);
36
Emeric Brun99c453d2020-05-25 15:01:04 +020037struct sink *cfg_sink;
38
Willy Tarreau67b5a162019-08-11 16:38:56 +020039struct sink *sink_find(const char *name)
40{
41 struct sink *sink;
42
43 list_for_each_entry(sink, &sink_list, sink_list)
44 if (strcmp(sink->name, name) == 0)
45 return sink;
46 return NULL;
47}
48
49/* creates a new sink and adds it to the list, it's still generic and not fully
50 * initialized. Returns NULL on allocation failure. If another one already
51 * exists with the same name, it will be returned. The caller can detect it as
52 * a newly created one has type SINK_TYPE_NEW.
53 */
Emeric Brun54648852020-07-06 15:54:06 +020054static struct sink *__sink_new(const char *name, const char *desc, int fmt)
Willy Tarreau67b5a162019-08-11 16:38:56 +020055{
56 struct sink *sink;
57
58 sink = sink_find(name);
59 if (sink)
60 goto end;
61
Emeric Brun494c5052020-05-28 11:13:15 +020062 sink = calloc(1, sizeof(*sink));
Willy Tarreau67b5a162019-08-11 16:38:56 +020063 if (!sink)
64 goto end;
65
Emeric Brun99c453d2020-05-25 15:01:04 +020066 sink->name = strdup(name);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010067 if (!sink->name)
68 goto err;
69
Emeric Brun99c453d2020-05-25 15:01:04 +020070 sink->desc = strdup(desc);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010071 if (!sink->desc)
72 goto err;
73
Willy Tarreau67b5a162019-08-11 16:38:56 +020074 sink->fmt = fmt;
75 sink->type = SINK_TYPE_NEW;
Christopher Fauleta63a5c22019-11-15 15:10:12 +010076 sink->maxlen = BUFSIZE;
Willy Tarreau67b5a162019-08-11 16:38:56 +020077 /* address will be filled by the caller if needed */
Willy Tarreau973e6622019-08-20 11:57:52 +020078 sink->ctx.fd = -1;
Willy Tarreau67b5a162019-08-11 16:38:56 +020079 sink->ctx.dropped = 0;
80 HA_RWLOCK_INIT(&sink->ctx.lock);
Willy Tarreau2b718102021-04-21 07:32:39 +020081 LIST_APPEND(&sink_list, &sink->sink_list);
Willy Tarreau67b5a162019-08-11 16:38:56 +020082 end:
83 return sink;
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010084
85 err:
Willy Tarreau61cfdf42021-02-20 10:46:51 +010086 ha_free(&sink->name);
87 ha_free(&sink->desc);
88 ha_free(&sink);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010089
90 return NULL;
Willy Tarreau67b5a162019-08-11 16:38:56 +020091}
92
Willy Tarreau973e6622019-08-20 11:57:52 +020093/* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>,
94 * and description <desc>. Returns NULL on allocation failure or conflict.
95 * Perfect duplicates are merged (same type, fd, and name).
96 */
Emeric Brun54648852020-07-06 15:54:06 +020097struct sink *sink_new_fd(const char *name, const char *desc, enum log_fmt fmt, int fd)
Willy Tarreau973e6622019-08-20 11:57:52 +020098{
99 struct sink *sink;
100
101 sink = __sink_new(name, desc, fmt);
102 if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd))
103 goto end;
104
105 if (sink->type != SINK_TYPE_NEW) {
106 sink = NULL;
107 goto end;
108 }
109
110 sink->type = SINK_TYPE_FD;
111 sink->ctx.fd = fd;
112 end:
113 return sink;
114}
115
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200116/* creates a sink called <name> of type BUF of size <size>, format <fmt>,
117 * and description <desc>. Returns NULL on allocation failure or conflict.
118 * Perfect duplicates are merged (same type and name). If sizes differ, the
119 * largest one is kept.
120 */
Emeric Brun54648852020-07-06 15:54:06 +0200121struct sink *sink_new_buf(const char *name, const char *desc, enum log_fmt fmt, size_t size)
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200122{
123 struct sink *sink;
124
125 sink = __sink_new(name, desc, fmt);
126 if (!sink)
127 goto fail;
128
129 if (sink->type == SINK_TYPE_BUFFER) {
130 /* such a buffer already exists, we may have to resize it */
131 if (!ring_resize(sink->ctx.ring, size))
132 goto fail;
133 goto end;
134 }
135
136 if (sink->type != SINK_TYPE_NEW) {
137 /* already exists of another type */
138 goto fail;
139 }
140
141 sink->ctx.ring = ring_new(size);
142 if (!sink->ctx.ring) {
Willy Tarreau2b718102021-04-21 07:32:39 +0200143 LIST_DELETE(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +0200144 free(sink->name);
145 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200146 free(sink);
147 goto fail;
148 }
149
150 sink->type = SINK_TYPE_BUFFER;
151 end:
152 return sink;
153 fail:
154 return NULL;
155}
156
Willy Tarreau67b5a162019-08-11 16:38:56 +0200157/* tries to send <nmsg> message parts (up to 8, ignored above) from message
Ilya Shipitsin46a030c2020-07-05 16:36:08 +0500158 * array <msg> to sink <sink>. Formatting according to the sink's preference is
Willy Tarreau8f240232019-08-27 16:41:06 +0200159 * done here. Lost messages are NOT accounted for. It is preferable to call
160 * sink_write() instead which will also try to emit the number of dropped
161 * messages when there are any. It returns >0 if it could write anything,
162 * <=0 otherwise.
Willy Tarreau67b5a162019-08-11 16:38:56 +0200163 */
Emeric Brun54648852020-07-06 15:54:06 +0200164 ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg,
165 int level, int facility, struct ist *metadata)
166 {
167 struct ist *pfx = NULL;
Willy Tarreaua1426de2019-08-27 14:21:02 +0200168 size_t npfx = 0;
Emeric Brunbd163812020-05-06 14:33:46 +0200169
Emeric Brun54648852020-07-06 15:54:06 +0200170 if (sink->fmt == LOG_FORMAT_RAW)
Emeric Brunbd163812020-05-06 14:33:46 +0200171 goto send;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200172
Emeric Brun54648852020-07-06 15:54:06 +0200173 pfx = build_log_header(sink->fmt, level, facility, metadata, &npfx);
Emeric Brunbd163812020-05-06 14:33:46 +0200174
175send:
Willy Tarreau973e6622019-08-20 11:57:52 +0200176 if (sink->type == SINK_TYPE_FD) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200177 return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200178 }
179 else if (sink->type == SINK_TYPE_BUFFER) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200180 return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
Willy Tarreau973e6622019-08-20 11:57:52 +0200181 }
Willy Tarreau8f240232019-08-27 16:41:06 +0200182 return 0;
183}
Willy Tarreau67b5a162019-08-11 16:38:56 +0200184
Willy Tarreau8f240232019-08-27 16:41:06 +0200185/* Tries to emit a message indicating the number of dropped events. In case of
186 * success, the amount of drops is reduced by as much. It's supposed to be
187 * called under an exclusive lock on the sink to avoid multiple produces doing
188 * the same. On success, >0 is returned, otherwise <=0 on failure.
189 */
Emeric Brun54648852020-07-06 15:54:06 +0200190int sink_announce_dropped(struct sink *sink, int facility)
Willy Tarreau8f240232019-08-27 16:41:06 +0200191{
Emeric Brun54648852020-07-06 15:54:06 +0200192 static THREAD_LOCAL struct ist metadata[LOG_META_FIELDS];
193 static THREAD_LOCAL pid_t curr_pid;
194 static THREAD_LOCAL char pidstr[16];
Willy Tarreau8f240232019-08-27 16:41:06 +0200195 unsigned int dropped;
196 struct buffer msg;
197 struct ist msgvec[1];
198 char logbuf[64];
199
200 while (unlikely((dropped = sink->ctx.dropped) > 0)) {
201 chunk_init(&msg, logbuf, sizeof(logbuf));
202 chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
203 msgvec[0] = ist2(msg.area, msg.data);
Emeric Brunbd163812020-05-06 14:33:46 +0200204
Emeric Brun54648852020-07-06 15:54:06 +0200205 if (!metadata[LOG_META_HOST].len) {
206 if (global.log_send_hostname)
207 metadata[LOG_META_HOST] = ist2(global.log_send_hostname, strlen(global.log_send_hostname));
Emeric Brun54648852020-07-06 15:54:06 +0200208 }
209
210 if (!metadata[LOG_META_TAG].len)
211 metadata[LOG_META_TAG] = ist2(global.log_tag.area, global.log_tag.data);
212
213 if (unlikely(curr_pid != getpid()))
214 metadata[LOG_META_PID].len = 0;
215
216 if (!metadata[LOG_META_PID].len) {
217 curr_pid = getpid();
218 ltoa_o(curr_pid, pidstr, sizeof(pidstr));
219 metadata[LOG_META_PID] = ist2(pidstr, strlen(pidstr));
220 }
221
222 if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, metadata) <= 0)
Willy Tarreau8f240232019-08-27 16:41:06 +0200223 return 0;
224 /* success! */
225 HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
226 }
227 return 1;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200228}
229
Willy Tarreau9f830d72019-08-26 18:17:04 +0200230/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
231static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
232{
233 struct sink *sink;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200234 int arg;
Willy Tarreau9f830d72019-08-26 18:17:04 +0200235
236 args++; // make args[1] the 1st arg
237
238 if (!*args[1]) {
239 /* no arg => report the list of supported sink */
Willy Tarreau1d181e42019-08-30 11:17:01 +0200240 chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
Willy Tarreau9f830d72019-08-26 18:17:04 +0200241 list_for_each_entry(sink, &sink_list, sink_list) {
242 chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n",
243 sink->name,
244 sink->type == SINK_TYPE_NEW ? "init" :
245 sink->type == SINK_TYPE_FD ? "fd" :
246 sink->type == SINK_TYPE_BUFFER ? "buffer" : "?",
247 sink->ctx.dropped, sink->desc);
248 }
249
250 trash.area[trash.data] = 0;
251 return cli_msg(appctx, LOG_WARNING, trash.area);
252 }
253
254 if (!cli_has_level(appctx, ACCESS_LVL_OPER))
255 return 1;
256
257 sink = sink_find(args[1]);
258 if (!sink)
259 return cli_err(appctx, "No such event sink");
260
261 if (sink->type != SINK_TYPE_BUFFER)
262 return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
263
Willy Tarreau1d181e42019-08-30 11:17:01 +0200264 for (arg = 2; *args[arg]; arg++) {
265 if (strcmp(args[arg], "-w") == 0)
266 appctx->ctx.cli.i0 |= 1; // wait mode
267 else if (strcmp(args[arg], "-n") == 0)
268 appctx->ctx.cli.i0 |= 2; // seek to new
269 else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
270 appctx->ctx.cli.i0 |= 3; // seek to new + wait
271 else
272 return cli_err(appctx, "unknown option");
273 }
Willy Tarreau9f830d72019-08-26 18:17:04 +0200274 return ring_attach_cli(sink->ctx.ring, appctx);
275}
276
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500277/* Pre-configures a ring proxy to emit connections */
Emeric Brun494c5052020-05-28 11:13:15 +0200278void sink_setup_proxy(struct proxy *px)
279{
280 px->last_change = now.tv_sec;
281 px->cap = PR_CAP_FE | PR_CAP_BE;
282 px->maxconn = 0;
283 px->conn_retries = 1;
284 px->timeout.server = TICK_ETERNITY;
285 px->timeout.client = TICK_ETERNITY;
286 px->timeout.connect = TICK_ETERNITY;
287 px->accept = NULL;
288 px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
289 px->bind_proc = 0; /* will be filled by users */
290}
291
292/*
293 * IO Handler to handle message push to syslog tcp server
294 */
295static void sink_forward_io_handler(struct appctx *appctx)
296{
297 struct stream_interface *si = appctx->owner;
298 struct stream *s = si_strm(si);
299 struct sink *sink = strm_fe(s)->parent;
300 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
301 struct ring *ring = sink->ctx.ring;
302 struct buffer *buf = &ring->buf;
303 uint64_t msg_len;
304 size_t len, cnt, ofs;
305 int ret = 0;
306
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500307 /* if stopping was requested, close immediately */
Emeric Brun494c5052020-05-28 11:13:15 +0200308 if (unlikely(stopping))
309 goto close;
310
311 /* for rex because it seems reset to timeout
312 * and we don't want expire on this case
313 * with a syslog server
314 */
315 si_oc(si)->rex = TICK_ETERNITY;
316 /* rto should not change but it seems the case */
317 si_oc(si)->rto = TICK_ETERNITY;
318
319 /* an error was detected */
320 if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
321 goto close;
322
323 /* con closed by server side */
324 if ((si_oc(si)->flags & CF_SHUTW))
325 goto close;
326
327 /* if the connection is not established, inform the stream that we want
328 * to be notified whenever the connection completes.
329 */
330 if (si_opposite(si)->state < SI_ST_EST) {
331 si_cant_get(si);
332 si_rx_conn_blk(si);
333 si_rx_endp_more(si);
334 return;
335 }
336
337 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
338 if (appctx != sft->appctx) {
339 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
340 goto close;
341 }
342 ofs = sft->ofs;
343
344 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
345 LIST_DEL_INIT(&appctx->wait_entry);
346 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
347
348 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
349
350 /* explanation for the initialization below: it would be better to do
351 * this in the parsing function but this would occasionally result in
352 * dropped events because we'd take a reference on the oldest message
353 * and keep it while being scheduled. Thus instead let's take it the
354 * first time we enter here so that we have a chance to pass many
355 * existing messages before grabbing a reference to a location. This
356 * value cannot be produced after initialization.
357 */
358 if (unlikely(ofs == ~0)) {
359 ofs = 0;
360
Willy Tarreau4781b152021-04-06 13:53:36 +0200361 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun494c5052020-05-28 11:13:15 +0200362 ofs += ring->ofs;
363 }
364
Emeric Brun494c5052020-05-28 11:13:15 +0200365 /* in this loop, ofs always points to the counter byte that precedes
366 * the message so that we can take our reference there if we have to
367 * stop before the end (ret=0).
368 */
369 if (si_opposite(si)->state == SI_ST_EST) {
Emeric Brunfdabf492020-12-02 17:02:09 +0100370 /* we were already there, adjust the offset to be relative to
371 * the buffer's head and remove us from the counter.
372 */
373 ofs -= ring->ofs;
374 BUG_ON(ofs >= buf->size);
Willy Tarreau4781b152021-04-06 13:53:36 +0200375 HA_ATOMIC_DEC(b_peek(buf, ofs));
Emeric Brunfdabf492020-12-02 17:02:09 +0100376
Emeric Brun494c5052020-05-28 11:13:15 +0200377 ret = 1;
378 while (ofs + 1 < b_data(buf)) {
379 cnt = 1;
380 len = b_peek_varint(buf, ofs + cnt, &msg_len);
381 if (!len)
382 break;
383 cnt += len;
384 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
385
386 if (unlikely(msg_len + 1 > b_size(&trash))) {
387 /* too large a message to ever fit, let's skip it */
388 ofs += cnt + msg_len;
389 continue;
390 }
391
392 chunk_reset(&trash);
393 len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
394 trash.data += len;
395 trash.area[trash.data++] = '\n';
396
397 if (ci_putchk(si_ic(si), &trash) == -1) {
398 si_rx_room_blk(si);
399 ret = 0;
400 break;
401 }
402 ofs += cnt + msg_len;
403 }
404
Willy Tarreau4781b152021-04-06 13:53:36 +0200405 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun494c5052020-05-28 11:13:15 +0200406 ofs += ring->ofs;
407 sft->ofs = ofs;
408 }
409 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
410
411 if (ret) {
412 /* let's be woken up once new data arrive */
413 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau2b718102021-04-21 07:32:39 +0200414 LIST_APPEND(&ring->waiters, &appctx->wait_entry);
Emeric Brun494c5052020-05-28 11:13:15 +0200415 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
416 si_rx_endp_done(si);
417 }
418 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
419
420 /* always drain data from server */
421 co_skip(si_oc(si), si_oc(si)->output);
422 return;
423
424close:
425 si_shutw(si);
426 si_shutr(si);
427 si_ic(si)->flags |= CF_READ_NULL;
428}
429
Emeric Brun97556472020-05-30 01:42:45 +0200430/*
431 * IO Handler to handle message push to syslog tcp server
432 * using octet counting frames
433 */
434static void sink_forward_oc_io_handler(struct appctx *appctx)
435{
436 struct stream_interface *si = appctx->owner;
437 struct stream *s = si_strm(si);
438 struct sink *sink = strm_fe(s)->parent;
439 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
440 struct ring *ring = sink->ctx.ring;
441 struct buffer *buf = &ring->buf;
442 uint64_t msg_len;
443 size_t len, cnt, ofs;
444 int ret = 0;
445 char *p;
446
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500447 /* if stopping was requested, close immediately */
Emeric Brun97556472020-05-30 01:42:45 +0200448 if (unlikely(stopping))
449 goto close;
450
451 /* for rex because it seems reset to timeout
452 * and we don't want expire on this case
453 * with a syslog server
454 */
455 si_oc(si)->rex = TICK_ETERNITY;
456 /* rto should not change but it seems the case */
457 si_oc(si)->rto = TICK_ETERNITY;
458
459 /* an error was detected */
460 if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
461 goto close;
462
463 /* con closed by server side */
464 if ((si_oc(si)->flags & CF_SHUTW))
465 goto close;
466
467 /* if the connection is not established, inform the stream that we want
468 * to be notified whenever the connection completes.
469 */
470 if (si_opposite(si)->state < SI_ST_EST) {
471 si_cant_get(si);
472 si_rx_conn_blk(si);
473 si_rx_endp_more(si);
474 return;
475 }
476
477 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
478 if (appctx != sft->appctx) {
479 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
480 goto close;
481 }
482 ofs = sft->ofs;
483
484 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
485 LIST_DEL_INIT(&appctx->wait_entry);
486 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
487
488 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
489
490 /* explanation for the initialization below: it would be better to do
491 * this in the parsing function but this would occasionally result in
492 * dropped events because we'd take a reference on the oldest message
493 * and keep it while being scheduled. Thus instead let's take it the
494 * first time we enter here so that we have a chance to pass many
495 * existing messages before grabbing a reference to a location. This
496 * value cannot be produced after initialization.
497 */
498 if (unlikely(ofs == ~0)) {
499 ofs = 0;
500
Willy Tarreau4781b152021-04-06 13:53:36 +0200501 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun97556472020-05-30 01:42:45 +0200502 ofs += ring->ofs;
503 }
504
Emeric Brun97556472020-05-30 01:42:45 +0200505 /* in this loop, ofs always points to the counter byte that precedes
506 * the message so that we can take our reference there if we have to
507 * stop before the end (ret=0).
508 */
509 if (si_opposite(si)->state == SI_ST_EST) {
Emeric Brunfdabf492020-12-02 17:02:09 +0100510 /* we were already there, adjust the offset to be relative to
511 * the buffer's head and remove us from the counter.
512 */
513 ofs -= ring->ofs;
514 BUG_ON(ofs >= buf->size);
Willy Tarreau4781b152021-04-06 13:53:36 +0200515 HA_ATOMIC_DEC(b_peek(buf, ofs));
Emeric Brunfdabf492020-12-02 17:02:09 +0100516
Emeric Brun97556472020-05-30 01:42:45 +0200517 ret = 1;
518 while (ofs + 1 < b_data(buf)) {
519 cnt = 1;
520 len = b_peek_varint(buf, ofs + cnt, &msg_len);
521 if (!len)
522 break;
523 cnt += len;
524 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
525
526 chunk_reset(&trash);
527 p = ulltoa(msg_len, trash.area, b_size(&trash));
528 if (p) {
529 trash.data = (p - trash.area) + 1;
530 *p = ' ';
531 }
532
533 if (!p || (trash.data + msg_len > b_size(&trash))) {
534 /* too large a message to ever fit, let's skip it */
535 ofs += cnt + msg_len;
536 continue;
537 }
538
539 trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
540
541 if (ci_putchk(si_ic(si), &trash) == -1) {
542 si_rx_room_blk(si);
543 ret = 0;
544 break;
545 }
546 ofs += cnt + msg_len;
547 }
548
Willy Tarreau4781b152021-04-06 13:53:36 +0200549 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun97556472020-05-30 01:42:45 +0200550 ofs += ring->ofs;
551 sft->ofs = ofs;
552 }
553 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
554
555 if (ret) {
556 /* let's be woken up once new data arrive */
557 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau2b718102021-04-21 07:32:39 +0200558 LIST_APPEND(&ring->waiters, &appctx->wait_entry);
Emeric Brun97556472020-05-30 01:42:45 +0200559 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
560 si_rx_endp_done(si);
561 }
562 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
563
564 /* always drain data from server */
565 co_skip(si_oc(si), si_oc(si)->output);
566 return;
567
568close:
569 si_shutw(si);
570 si_shutr(si);
571 si_ic(si)->flags |= CF_READ_NULL;
572}
573
Emeric Brun494c5052020-05-28 11:13:15 +0200574void __sink_forward_session_deinit(struct sink_forward_target *sft)
575{
576 struct stream_interface *si;
577 struct stream *s;
578 struct sink *sink;
579
580 if (!sft->appctx)
581 return;
582
583 si = sft->appctx->owner;
584 if (!si)
585 return;
586
587 s = si_strm(si);
588 if (!s)
589 return;
590
591 sink = strm_fe(s)->parent;
592 if (!sink)
593 return;
594
595 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
596 LIST_DEL_INIT(&sft->appctx->wait_entry);
597 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
598
599 sft->appctx = NULL;
600 task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
601}
602
603
604static void sink_forward_session_release(struct appctx *appctx)
605{
606 struct sink_forward_target *sft = appctx->ctx.peers.ptr;
607
608 if (!sft)
609 return;
610
611 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
612 if (sft->appctx == appctx)
613 __sink_forward_session_deinit(sft);
614 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
615}
616
617static struct applet sink_forward_applet = {
618 .obj_type = OBJ_TYPE_APPLET,
619 .name = "<SINKFWD>", /* used for logging */
620 .fct = sink_forward_io_handler,
621 .release = sink_forward_session_release,
622};
623
Emeric Brun97556472020-05-30 01:42:45 +0200624static struct applet sink_forward_oc_applet = {
625 .obj_type = OBJ_TYPE_APPLET,
626 .name = "<SINKFWDOC>", /* used for logging */
627 .fct = sink_forward_oc_io_handler,
628 .release = sink_forward_session_release,
629};
630
Emeric Brun494c5052020-05-28 11:13:15 +0200631/*
632 * Create a new peer session in assigned state (connect will start automatically)
633 */
634static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
635{
636 struct proxy *p = sink->forward_px;
637 struct appctx *appctx;
638 struct session *sess;
639 struct stream *s;
Emeric Brun97556472020-05-30 01:42:45 +0200640 struct applet *applet = &sink_forward_applet;
641
642 if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
643 applet = &sink_forward_oc_applet;
Emeric Brun494c5052020-05-28 11:13:15 +0200644
Emeric Brun97556472020-05-30 01:42:45 +0200645 appctx = appctx_new(applet, tid_bit);
Emeric Brun494c5052020-05-28 11:13:15 +0200646 if (!appctx)
647 goto out_close;
648
649 appctx->ctx.sft.ptr = (void *)sft;
650
651 sess = session_new(p, NULL, &appctx->obj_type);
652 if (!sess) {
653 ha_alert("out of memory in peer_session_create().\n");
654 goto out_free_appctx;
655 }
656
Christopher Faulet26256f82020-09-14 11:40:13 +0200657 if ((s = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) {
Emeric Brun494c5052020-05-28 11:13:15 +0200658 ha_alert("Failed to initialize stream in peer_session_create().\n");
659 goto out_free_sess;
660 }
661
662
663 s->target = &sft->srv->obj_type;
Willy Tarreau9b7587a2020-10-15 07:32:10 +0200664 if (!sockaddr_alloc(&s->target_addr, &sft->srv->addr, sizeof(sft->srv->addr)))
Emeric Brun494c5052020-05-28 11:13:15 +0200665 goto out_free_strm;
Emeric Brun494c5052020-05-28 11:13:15 +0200666 s->flags = SF_ASSIGNED|SF_ADDR_SET;
667 s->si[1].flags |= SI_FL_NOLINGER;
668
669 s->do_log = NULL;
670 s->uniq_id = 0;
671
672 s->res.flags |= CF_READ_DONTWAIT;
673 /* for rto and rex to eternity to not expire on idle recv:
674 * We are using a syslog server.
675 */
676 s->res.rto = TICK_ETERNITY;
677 s->res.rex = TICK_ETERNITY;
678 sft->appctx = appctx;
679 task_wakeup(s->task, TASK_WOKEN_INIT);
680 return appctx;
681
682 /* Error unrolling */
683 out_free_strm:
Willy Tarreau2b718102021-04-21 07:32:39 +0200684 LIST_DELETE(&s->list);
Emeric Brun494c5052020-05-28 11:13:15 +0200685 pool_free(pool_head_stream, s);
686 out_free_sess:
687 session_free(sess);
688 out_free_appctx:
689 appctx_free(appctx);
690 out_close:
691 return NULL;
692}
693
694/*
695 * Task to handle connctions to forward servers
696 */
Willy Tarreau144f84a2021-03-02 16:09:26 +0100697static struct task *process_sink_forward(struct task * task, void *context, unsigned int state)
Emeric Brun494c5052020-05-28 11:13:15 +0200698{
699 struct sink *sink = (struct sink *)context;
700 struct sink_forward_target *sft = sink->sft;
701
702 task->expire = TICK_ETERNITY;
703
704 if (!stopping) {
705 while (sft) {
706 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
707 /* if appctx is NULL, start a new session */
708 if (!sft->appctx)
709 sft->appctx = sink_forward_session_create(sink, sft);
710 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
711 sft = sft->next;
712 }
713 }
714 else {
715 while (sft) {
716 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
717 /* awake applet to perform a clean close */
718 if (sft->appctx)
719 appctx_wakeup(sft->appctx);
720 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
721 sft = sft->next;
722 }
723 }
724
725 return task;
726}
727/*
728 * Init task to manage connctions to forward servers
729 *
730 * returns 0 in case of error.
731 */
732int sink_init_forward(struct sink *sink)
733{
734 sink->forward_task = task_new(MAX_THREADS_MASK);
735 if (!sink->forward_task)
736 return 0;
737
738 sink->forward_task->process = process_sink_forward;
739 sink->forward_task->context = (void *)sink;
740 sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
741 task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
742 return 1;
743}
Emeric Brun99c453d2020-05-25 15:01:04 +0200744/*
745 * Parse "ring" section and create corresponding sink buffer.
746 *
747 * The function returns 0 in success case, otherwise, it returns error
748 * flags.
749 */
750int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
751{
752 int err_code = 0;
753 const char *inv;
754 size_t size = BUFSIZE;
Emeric Brun494c5052020-05-28 11:13:15 +0200755 struct proxy *p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200756
757 if (strcmp(args[0], "ring") == 0) { /* new peers section */
758 if (!*args[1]) {
759 ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum);
760 err_code |= ERR_ALERT | ERR_FATAL;
761 goto err;
762 }
763
764 inv = invalid_char(args[1]);
765 if (inv) {
766 ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv);
767 err_code |= ERR_ALERT | ERR_FATAL;
768 goto err;
769 }
770
771 if (sink_find(args[1])) {
772 ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]);
773 err_code |= ERR_ALERT | ERR_FATAL;
774 goto err;
775 }
776
Emeric Brun54648852020-07-06 15:54:06 +0200777 cfg_sink = sink_new_buf(args[1], args[1], LOG_FORMAT_RAW, size);
Emeric Brun99c453d2020-05-25 15:01:04 +0200778 if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) {
779 ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]);
780 err_code |= ERR_ALERT | ERR_FATAL;
781 goto err;
782 }
Emeric Brun494c5052020-05-28 11:13:15 +0200783
784 /* allocate new proxy to handle forwards */
785 p = calloc(1, sizeof *p);
786 if (!p) {
787 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
788 err_code |= ERR_ALERT | ERR_FATAL;
789 goto err;
790 }
791
792 init_new_proxy(p);
793 sink_setup_proxy(p);
794 p->parent = cfg_sink;
795 p->id = strdup(args[1]);
796 p->conf.args.file = p->conf.file = strdup(file);
797 p->conf.args.line = p->conf.line = linenum;
798 cfg_sink->forward_px = p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200799 }
800 else if (strcmp(args[0], "size") == 0) {
801 size = atol(args[1]);
802 if (!size) {
803 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
804 err_code |= ERR_ALERT | ERR_FATAL;
805 goto err;
806 }
807
808 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)
809 || !ring_resize(cfg_sink->ctx.ring, size)) {
810 ha_alert("parsing [%s:%d] : fail to set sink buffer size '%s'.\n", file, linenum, args[1]);
811 err_code |= ERR_ALERT | ERR_FATAL;
812 goto err;
813 }
814 }
Emeric Brun494c5052020-05-28 11:13:15 +0200815 else if (strcmp(args[0],"server") == 0) {
Amaury Denoyelle30c05372021-03-08 16:36:46 +0100816 err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL,
817 SRV_PARSE_PARSE_ADDR|SRV_PARSE_INITIAL_RESOLVE);
Emeric Brun494c5052020-05-28 11:13:15 +0200818 }
819 else if (strcmp(args[0],"timeout") == 0) {
820 if (!cfg_sink || !cfg_sink->forward_px) {
821 ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
822 err_code |= ERR_ALERT | ERR_FATAL;
823 goto err;
824 }
825
826 if (strcmp(args[1], "connect") == 0 ||
827 strcmp(args[1], "server") == 0) {
828 const char *res;
829 unsigned int tout;
830
831 if (!*args[2]) {
832 ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
833 file, linenum, args[0], args[1]);
834 err_code |= ERR_ALERT | ERR_FATAL;
835 goto err;
836 }
837 res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
838 if (res == PARSE_TIME_OVER) {
839 ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
840 file, linenum, args[2], args[0], args[1]);
841 err_code |= ERR_ALERT | ERR_FATAL;
842 goto err;
843 }
844 else if (res == PARSE_TIME_UNDER) {
845 ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
846 file, linenum, args[2], args[0], args[1]);
847 err_code |= ERR_ALERT | ERR_FATAL;
848 goto err;
849 }
850 else if (res) {
851 ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
852 file, linenum, *res, args[0], args[1]);
853 err_code |= ERR_ALERT | ERR_FATAL;
854 goto err;
855 }
856 if (args[1][2] == 'c')
857 cfg_sink->forward_px->timeout.connect = tout;
858 else
859 cfg_sink->forward_px->timeout.server = tout;
860 }
861 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200862 else if (strcmp(args[0],"format") == 0) {
863 if (!cfg_sink) {
864 ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
865 err_code |= ERR_ALERT | ERR_FATAL;
866 goto err;
867 }
868
Emeric Brun54648852020-07-06 15:54:06 +0200869 cfg_sink->fmt = get_log_format(args[1]);
870 if (cfg_sink->fmt == LOG_FORMAT_UNSPEC) {
Emeric Brun99c453d2020-05-25 15:01:04 +0200871 ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]);
872 err_code |= ERR_ALERT | ERR_FATAL;
873 goto err;
874 }
875 }
876 else if (strcmp(args[0],"maxlen") == 0) {
877 if (!cfg_sink) {
878 ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]);
879 err_code |= ERR_ALERT | ERR_FATAL;
880 goto err;
881 }
882
883 cfg_sink->maxlen = atol(args[1]);
884 if (!cfg_sink->maxlen) {
885 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
886 err_code |= ERR_ALERT | ERR_FATAL;
887 goto err;
888 }
889 }
890 else if (strcmp(args[0],"description") == 0) {
891 if (!cfg_sink) {
892 ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]);
893 err_code |= ERR_ALERT | ERR_FATAL;
894 goto err;
895 }
896
897 if (!*args[1]) {
898 ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum);
899 err_code |= ERR_ALERT | ERR_FATAL;
900 goto err;
901 }
902
903 free(cfg_sink->desc);
904
905 cfg_sink->desc = strdup(args[1]);
906 if (!cfg_sink->desc) {
907 ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]);
908 err_code |= ERR_ALERT | ERR_FATAL;
909 goto err;
910 }
911 }
Emeric Brun9f2ff3a2020-05-29 15:47:52 +0200912 else {
913 ha_alert("parsing [%s:%d] : unknown statement '%s'.\n", file, linenum, args[0]);
914 err_code |= ERR_ALERT | ERR_FATAL;
915 goto err;
916 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200917
918err:
919 return err_code;
920}
921
Emeric Brun94aab062021-04-02 10:41:36 +0200922/* Creates an new sink buffer from a log server.
923 *
924 * It uses the logsrvaddress to declare a forward
925 * server for this buffer. And it initializes the
926 * forwarding.
927 *
928 * The function returns a pointer on the
929 * allocated struct sink if allocate
930 * and initialize succeed, else if it fails
931 * it returns NULL.
932 *
933 * Note: the sink is created using the name
Ilya Shipitsinb2be9a12021-04-24 13:25:42 +0500934 * specified into logsrv->ring_name
Emeric Brun94aab062021-04-02 10:41:36 +0200935 */
936struct sink *sink_new_from_logsrv(struct logsrv *logsrv)
937{
938 struct proxy *p = NULL;
939 struct sink *sink = NULL;
940 struct server *srv = NULL;
941 struct sink_forward_target *sft = NULL;
942 int i;
943
944 /* allocate new proxy to handle
945 * forward to a stream server
946 */
947 p = calloc(1, sizeof *p);
948 if (!p) {
949 goto error;
950 }
951
952 init_new_proxy(p);
953 sink_setup_proxy(p);
954 p->id = strdup(logsrv->ring_name);
955 p->conf.args.file = p->conf.file = strdup(logsrv->conf.file);
956 p->conf.args.line = p->conf.line = logsrv->conf.line;
957
958 /* allocate a new server to forward messages
959 * from ring buffer
960 */
961 srv = new_server(p);
962 if (!srv)
963 goto error;
964
965 /* init server */
966 srv->id = strdup(logsrv->ring_name);
967 srv->conf.file = strdup(logsrv->conf.file);
968 srv->conf.line = logsrv->conf.line;
969 srv->addr = logsrv->addr;
970 srv->svc_port = get_host_port(&logsrv->addr);
971 HA_SPIN_INIT(&srv->lock);
972
973 /* process per thread init */
974 srv->per_thr = calloc(global.nbthread, sizeof(*srv->per_thr));
975 if (!srv->per_thr)
976 goto error;
977
978 for (i = 0; i < global.nbthread; i++) {
979 srv->per_thr[i].idle_conns = EB_ROOT;
980 srv->per_thr[i].safe_conns = EB_ROOT;
981 srv->per_thr[i].avail_conns = EB_ROOT;
982 MT_LIST_INIT(&srv->per_thr[i].streams);
983 }
984
985 /* the servers are linked backwards
986 * first into proxy
987 */
988 p->srv = srv;
989 srv->next = p->srv;
990
991 /* allocate sink_forward_target descriptor */
992 sft = calloc(1, sizeof(*sft));
993 if (!sft)
994 goto error;
995
996 /* init sink_forward_target offset */
997 sft->srv = srv;
998 sft->appctx = NULL;
999 sft->ofs = ~0;
1000 HA_SPIN_INIT(&sft->lock);
1001
Ilya Shipitsinb2be9a12021-04-24 13:25:42 +05001002 /* prepare description for the sink */
Emeric Brun94aab062021-04-02 10:41:36 +02001003 chunk_reset(&trash);
1004 chunk_printf(&trash, "created from logserver declared into '%s' at line %d", logsrv->conf.file, logsrv->conf.line);
1005
1006 /* allocate a new sink buffer */
1007 sink = sink_new_buf(logsrv->ring_name, trash.area, logsrv->format, BUFSIZE);
1008 if (!sink || sink->type != SINK_TYPE_BUFFER) {
1009 goto error;
1010 }
1011
1012 /* link sink_forward_target to proxy */
1013 sink->forward_px = p;
1014 p->parent = sink;
1015
1016 /* insert into sink_forward_targets
1017 * list into sink
1018 */
1019 sft->next = sink->sft;
1020 sink->sft = sft;
1021
1022 /* mark server as an attached reader to the ring */
1023 if (!ring_attach(sink->ctx.ring)) {
1024 /* should never fail since there is
1025 * only one reader
1026 */
1027 goto error;
1028 }
1029
1030 /* initialize sink buffer forwarding */
1031 if (!sink_init_forward(sink))
1032 goto error;
1033
1034 /* reset familyt of logsrv to consider the ring buffer target */
1035 logsrv->addr.ss_family = AF_UNSPEC;
1036
1037 return sink;
1038error:
1039 if (p) {
1040 if (p->id)
1041 free(p->id);
1042 if (p->conf.file)
1043 free(p->conf.file);
1044
1045 free(p);
1046 }
1047
1048 if (srv) {
1049 if (srv->id)
1050 free(srv->id);
1051 if (srv->conf.file)
1052 free((void *)srv->conf.file);
1053 if (srv->per_thr)
1054 free(srv->per_thr);
1055 free(srv);
1056 }
1057
1058 if (sft)
1059 free(sft);
1060
1061 if (sink) {
1062 if (sink->ctx.ring)
1063 ring_free(sink->ctx.ring);
1064
Willy Tarreau2b718102021-04-21 07:32:39 +02001065 LIST_DELETE(&sink->sink_list);
Emeric Brun94aab062021-04-02 10:41:36 +02001066 free(sink->name);
1067 free(sink->desc);
1068 free(sink);
1069 }
1070
1071 return NULL;
1072}
1073
Emeric Brun99c453d2020-05-25 15:01:04 +02001074/*
1075 * Post parsing "ring" section.
1076 *
1077 * The function returns 0 in success case, otherwise, it returns error
1078 * flags.
1079 */
1080int cfg_post_parse_ring()
1081{
1082 int err_code = 0;
Emeric Brun494c5052020-05-28 11:13:15 +02001083 struct server *srv;
Emeric Brun99c453d2020-05-25 15:01:04 +02001084
1085 if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
1086 if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
1087 ha_warning("ring '%s' event max length '%u' exceeds size, forced to size '%lu'.\n",
Willy Tarreau570a22b2020-06-02 12:00:46 +02001088 cfg_sink->name, cfg_sink->maxlen, (unsigned long)b_size(&cfg_sink->ctx.ring->buf));
Emeric Brun99c453d2020-05-25 15:01:04 +02001089 cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
1090 err_code |= ERR_ALERT;
1091 }
Emeric Brun494c5052020-05-28 11:13:15 +02001092
1093 /* prepare forward server descriptors */
1094 if (cfg_sink->forward_px) {
1095 srv = cfg_sink->forward_px->srv;
1096 while (srv) {
1097 struct sink_forward_target *sft;
1098 /* init ssl if needed */
1099 if (srv->use_ssl == 1 && xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->prepare_srv) {
1100 if (xprt_get(XPRT_SSL)->prepare_srv(srv)) {
1101 ha_alert("unable to prepare SSL for server '%s' in ring '%s'.\n", srv->id, cfg_sink->name);
1102 err_code |= ERR_ALERT | ERR_FATAL;
1103 }
1104 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001105
Emeric Brun494c5052020-05-28 11:13:15 +02001106 /* allocate sink_forward_target descriptor */
1107 sft = calloc(1, sizeof(*sft));
1108 if (!sft) {
1109 ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
1110 err_code |= ERR_ALERT | ERR_FATAL;
1111 break;
1112 }
1113 sft->srv = srv;
1114 sft->appctx = NULL;
1115 sft->ofs = ~0; /* init ring offset */
1116 sft->next = cfg_sink->sft;
1117 HA_SPIN_INIT(&sft->lock);
1118
1119 /* mark server attached to the ring */
1120 if (!ring_attach(cfg_sink->ctx.ring)) {
1121 ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
1122 err_code |= ERR_ALERT | ERR_FATAL;
1123 }
1124 cfg_sink->sft = sft;
1125 srv = srv->next;
1126 }
1127 sink_init_forward(cfg_sink);
1128 }
1129 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001130 cfg_sink = NULL;
1131
1132 return err_code;
1133}
1134
1135/* resolve sink names at end of config. Returns 0 on success otherwise error
1136 * flags.
1137*/
1138int post_sink_resolve()
1139{
Christopher Fauletfc633b62020-11-06 15:24:23 +01001140 int err_code = ERR_NONE;
Emeric Brun99c453d2020-05-25 15:01:04 +02001141 struct logsrv *logsrv, *logb;
1142 struct sink *sink;
1143 struct proxy *px;
1144
1145 list_for_each_entry_safe(logsrv, logb, &global.logsrvs, list) {
1146 if (logsrv->type == LOG_TARGET_BUFFER) {
1147 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001148 if (!sink) {
1149 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1150 * means we must allocate a sink
1151 * buffer to send messages to this logsrv
1152 */
1153 if (logsrv->addr.ss_family != AF_UNSPEC) {
1154 sink = sink_new_from_logsrv(logsrv);
1155 if (!sink) {
1156 ha_alert("global stream log server declared in file '%s' at line %d cannot be initialized'.\n",
1157 logsrv->conf.file, logsrv->conf.line);
1158 err_code |= ERR_ALERT | ERR_FATAL;
1159 }
1160 }
1161 else {
1162 ha_alert("global log server declared in file '%s' at line %d uses unknown ring named '%s'.\n",
1163 logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1164 err_code |= ERR_ALERT | ERR_FATAL;
1165 }
1166 }
1167 else if (sink->type != SINK_TYPE_BUFFER) {
1168 ha_alert("global log server declared in file '%s' at line %d uses incompatible ring '%s'.\n",
1169 logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +02001170 err_code |= ERR_ALERT | ERR_FATAL;
1171 }
1172 logsrv->sink = sink;
1173 }
Emeric Brun94aab062021-04-02 10:41:36 +02001174
Emeric Brun99c453d2020-05-25 15:01:04 +02001175 }
1176
1177 for (px = proxies_list; px; px = px->next) {
1178 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1179 if (logsrv->type == LOG_TARGET_BUFFER) {
1180 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001181 if (!sink) {
1182 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1183 * means we must allocate a sink
1184 * buffer to send messages to this logsrv
1185 */
1186 if (logsrv->addr.ss_family != AF_UNSPEC) {
1187 sink = sink_new_from_logsrv(logsrv);
1188 if (!sink) {
1189 ha_alert("log server declared in proxy section '%s' file '%s' at line %d cannot be initialized'.\n",
1190 px->id, logsrv->conf.file, logsrv->conf.line);
1191 err_code |= ERR_ALERT | ERR_FATAL;
1192 }
1193 }
1194 else {
1195 ha_alert("log server declared in proxy section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1196 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1197 err_code |= ERR_ALERT | ERR_FATAL;
1198 }
1199 }
1200 else if (sink->type != SINK_TYPE_BUFFER) {
1201 ha_alert("log server declared in proxy section '%s' in file '%s' at line %d uses incomatible ring named '%s'.\n",
1202 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +02001203 err_code |= ERR_ALERT | ERR_FATAL;
1204 }
1205 logsrv->sink = sink;
1206 }
1207 }
1208 }
Emeric Brun12941c82020-07-07 14:19:42 +02001209
1210 for (px = cfg_log_forward; px; px = px->next) {
1211 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1212 if (logsrv->type == LOG_TARGET_BUFFER) {
1213 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001214 if (!sink) {
1215 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1216 * means we must allocate a sink
1217 * buffer to send messages to this logsrv
1218 */
1219 if (logsrv->addr.ss_family != AF_UNSPEC) {
1220 sink = sink_new_from_logsrv(logsrv);
1221 if (!sink) {
1222 ha_alert("log server declared in log-forward section '%s' file '%s' at line %d cannot be initialized'.\n",
1223 px->id, logsrv->conf.file, logsrv->conf.line);
1224 err_code |= ERR_ALERT | ERR_FATAL;
1225 }
1226 }
1227 else {
1228 ha_alert("log server declared in log-forward section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1229 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1230 err_code |= ERR_ALERT | ERR_FATAL;
1231 }
1232 }
1233 else if (sink->type != SINK_TYPE_BUFFER) {
1234 ha_alert("log server declared in log-forward section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1235 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun12941c82020-07-07 14:19:42 +02001236 err_code |= ERR_ALERT | ERR_FATAL;
1237 }
1238 logsrv->sink = sink;
1239 }
1240 }
1241 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001242 return err_code;
1243}
1244
1245
Willy Tarreau973e6622019-08-20 11:57:52 +02001246static void sink_init()
1247{
Emeric Brun54648852020-07-06 15:54:06 +02001248 sink_new_fd("stdout", "standard output (fd#1)", LOG_FORMAT_RAW, 1);
1249 sink_new_fd("stderr", "standard output (fd#2)", LOG_FORMAT_RAW, 2);
1250 sink_new_buf("buf0", "in-memory ring buffer", LOG_FORMAT_TIMED, 1048576);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001251}
1252
1253static void sink_deinit()
1254{
1255 struct sink *sink, *sb;
1256
1257 list_for_each_entry_safe(sink, sb, &sink_list, sink_list) {
1258 if (sink->type == SINK_TYPE_BUFFER)
1259 ring_free(sink->ctx.ring);
Willy Tarreau2b718102021-04-21 07:32:39 +02001260 LIST_DELETE(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +02001261 free(sink->name);
1262 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001263 free(sink);
1264 }
Willy Tarreau973e6622019-08-20 11:57:52 +02001265}
1266
1267INITCALL0(STG_REGISTER, sink_init);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001268REGISTER_POST_DEINIT(sink_deinit);
Willy Tarreau973e6622019-08-20 11:57:52 +02001269
Willy Tarreau9f830d72019-08-26 18:17:04 +02001270static struct cli_kw_list cli_kws = {{ },{
Willy Tarreaub205bfd2021-05-07 11:38:37 +02001271 { { "show", "events", NULL }, "show events [<sink>] [-w] [-n] : show event sink state", cli_parse_show_events, NULL, NULL },
Willy Tarreau9f830d72019-08-26 18:17:04 +02001272 {{},}
1273}};
1274
1275INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);
1276
Emeric Brun99c453d2020-05-25 15:01:04 +02001277/* config parsers for this section */
1278REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring);
1279REGISTER_POST_CHECK(post_sink_resolve);
1280
Willy Tarreau67b5a162019-08-11 16:38:56 +02001281/*
1282 * Local variables:
1283 * c-indent-level: 8
1284 * c-basic-offset: 8
1285 * End:
1286 */