blob: b7f111cc766ab03d7804e561087d5cf08e57befc [file] [log] [blame]
Willy Tarreau67b5a162019-08-11 16:38:56 +02001/*
2 * Event sink management
3 *
4 * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation, version 2.1
9 * exclusively.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
Willy Tarreau36979d92020-06-05 17:27:29 +020021#include <import/ist.h>
Willy Tarreau4c7e4b72020-05-27 12:58:42 +020022#include <haproxy/api.h>
Willy Tarreau6be78492020-06-05 00:00:29 +020023#include <haproxy/cfgparse.h>
Willy Tarreau83487a82020-06-04 20:19:54 +020024#include <haproxy/cli.h>
Willy Tarreau36979d92020-06-05 17:27:29 +020025#include <haproxy/errors.h>
Willy Tarreau853b2972020-05-27 18:01:47 +020026#include <haproxy/list.h>
Willy Tarreauaeed4a82020-06-04 22:01:04 +020027#include <haproxy/log.h>
Willy Tarreau817538e2021-05-08 20:20:21 +020028#include <haproxy/proxy.h>
Willy Tarreaud2ad57c2020-06-03 19:43:35 +020029#include <haproxy/ring.h>
Willy Tarreau3727a8a2020-06-04 17:37:26 +020030#include <haproxy/signal.h>
Willy Tarreauba2f73d2020-06-03 20:02:28 +020031#include <haproxy/sink.h>
Willy Tarreau5e539c92020-06-04 20:45:39 +020032#include <haproxy/stream_interface.h>
Willy Tarreaub2551052020-06-09 09:07:15 +020033#include <haproxy/time.h>
Willy Tarreau4bad5e22021-05-08 13:05:30 +020034#include <haproxy/tools.h>
Willy Tarreau67b5a162019-08-11 16:38:56 +020035
36struct list sink_list = LIST_HEAD_INIT(sink_list);
37
Emeric Brunfd667082022-09-13 16:16:30 +020038/* sink proxies list */
39struct proxy *sink_proxies_list;
40
Emeric Brun99c453d2020-05-25 15:01:04 +020041struct sink *cfg_sink;
42
Willy Tarreau67b5a162019-08-11 16:38:56 +020043struct sink *sink_find(const char *name)
44{
45 struct sink *sink;
46
47 list_for_each_entry(sink, &sink_list, sink_list)
48 if (strcmp(sink->name, name) == 0)
49 return sink;
50 return NULL;
51}
52
53/* creates a new sink and adds it to the list, it's still generic and not fully
54 * initialized. Returns NULL on allocation failure. If another one already
55 * exists with the same name, it will be returned. The caller can detect it as
56 * a newly created one has type SINK_TYPE_NEW.
57 */
Emeric Brun54648852020-07-06 15:54:06 +020058static struct sink *__sink_new(const char *name, const char *desc, int fmt)
Willy Tarreau67b5a162019-08-11 16:38:56 +020059{
60 struct sink *sink;
61
62 sink = sink_find(name);
63 if (sink)
64 goto end;
65
Emeric Brun494c5052020-05-28 11:13:15 +020066 sink = calloc(1, sizeof(*sink));
Willy Tarreau67b5a162019-08-11 16:38:56 +020067 if (!sink)
68 goto end;
69
Emeric Brun99c453d2020-05-25 15:01:04 +020070 sink->name = strdup(name);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010071 if (!sink->name)
72 goto err;
73
Emeric Brun99c453d2020-05-25 15:01:04 +020074 sink->desc = strdup(desc);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010075 if (!sink->desc)
76 goto err;
77
Willy Tarreau67b5a162019-08-11 16:38:56 +020078 sink->fmt = fmt;
79 sink->type = SINK_TYPE_NEW;
Christopher Fauleta63a5c22019-11-15 15:10:12 +010080 sink->maxlen = BUFSIZE;
Willy Tarreau67b5a162019-08-11 16:38:56 +020081 /* address will be filled by the caller if needed */
Willy Tarreau973e6622019-08-20 11:57:52 +020082 sink->ctx.fd = -1;
Willy Tarreau67b5a162019-08-11 16:38:56 +020083 sink->ctx.dropped = 0;
84 HA_RWLOCK_INIT(&sink->ctx.lock);
Willy Tarreau2b718102021-04-21 07:32:39 +020085 LIST_APPEND(&sink_list, &sink->sink_list);
Willy Tarreau67b5a162019-08-11 16:38:56 +020086 end:
87 return sink;
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010088
89 err:
Willy Tarreau61cfdf42021-02-20 10:46:51 +010090 ha_free(&sink->name);
91 ha_free(&sink->desc);
92 ha_free(&sink);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010093
94 return NULL;
Willy Tarreau67b5a162019-08-11 16:38:56 +020095}
96
Willy Tarreau973e6622019-08-20 11:57:52 +020097/* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>,
98 * and description <desc>. Returns NULL on allocation failure or conflict.
99 * Perfect duplicates are merged (same type, fd, and name).
100 */
Emeric Brun54648852020-07-06 15:54:06 +0200101struct sink *sink_new_fd(const char *name, const char *desc, enum log_fmt fmt, int fd)
Willy Tarreau973e6622019-08-20 11:57:52 +0200102{
103 struct sink *sink;
104
105 sink = __sink_new(name, desc, fmt);
106 if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd))
107 goto end;
108
109 if (sink->type != SINK_TYPE_NEW) {
110 sink = NULL;
111 goto end;
112 }
113
114 sink->type = SINK_TYPE_FD;
115 sink->ctx.fd = fd;
116 end:
117 return sink;
118}
119
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200120/* creates a sink called <name> of type BUF of size <size>, format <fmt>,
121 * and description <desc>. Returns NULL on allocation failure or conflict.
122 * Perfect duplicates are merged (same type and name). If sizes differ, the
123 * largest one is kept.
124 */
Emeric Brun54648852020-07-06 15:54:06 +0200125struct sink *sink_new_buf(const char *name, const char *desc, enum log_fmt fmt, size_t size)
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200126{
127 struct sink *sink;
128
129 sink = __sink_new(name, desc, fmt);
130 if (!sink)
131 goto fail;
132
133 if (sink->type == SINK_TYPE_BUFFER) {
134 /* such a buffer already exists, we may have to resize it */
135 if (!ring_resize(sink->ctx.ring, size))
136 goto fail;
137 goto end;
138 }
139
140 if (sink->type != SINK_TYPE_NEW) {
141 /* already exists of another type */
142 goto fail;
143 }
144
145 sink->ctx.ring = ring_new(size);
146 if (!sink->ctx.ring) {
Willy Tarreau2b718102021-04-21 07:32:39 +0200147 LIST_DELETE(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +0200148 free(sink->name);
149 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200150 free(sink);
151 goto fail;
152 }
153
154 sink->type = SINK_TYPE_BUFFER;
155 end:
156 return sink;
157 fail:
158 return NULL;
159}
160
Willy Tarreau67b5a162019-08-11 16:38:56 +0200161/* tries to send <nmsg> message parts (up to 8, ignored above) from message
Ilya Shipitsin46a030c2020-07-05 16:36:08 +0500162 * array <msg> to sink <sink>. Formatting according to the sink's preference is
Willy Tarreau8f240232019-08-27 16:41:06 +0200163 * done here. Lost messages are NOT accounted for. It is preferable to call
164 * sink_write() instead which will also try to emit the number of dropped
165 * messages when there are any. It returns >0 if it could write anything,
166 * <=0 otherwise.
Willy Tarreau67b5a162019-08-11 16:38:56 +0200167 */
Emeric Brun54648852020-07-06 15:54:06 +0200168 ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg,
169 int level, int facility, struct ist *metadata)
170 {
171 struct ist *pfx = NULL;
Willy Tarreaua1426de2019-08-27 14:21:02 +0200172 size_t npfx = 0;
Emeric Brunbd163812020-05-06 14:33:46 +0200173
Emeric Brun54648852020-07-06 15:54:06 +0200174 if (sink->fmt == LOG_FORMAT_RAW)
Emeric Brunbd163812020-05-06 14:33:46 +0200175 goto send;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200176
Emeric Brun54648852020-07-06 15:54:06 +0200177 pfx = build_log_header(sink->fmt, level, facility, metadata, &npfx);
Emeric Brunbd163812020-05-06 14:33:46 +0200178
179send:
Willy Tarreau973e6622019-08-20 11:57:52 +0200180 if (sink->type == SINK_TYPE_FD) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200181 return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200182 }
183 else if (sink->type == SINK_TYPE_BUFFER) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200184 return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
Willy Tarreau973e6622019-08-20 11:57:52 +0200185 }
Willy Tarreau8f240232019-08-27 16:41:06 +0200186 return 0;
187}
Willy Tarreau67b5a162019-08-11 16:38:56 +0200188
Willy Tarreau8f240232019-08-27 16:41:06 +0200189/* Tries to emit a message indicating the number of dropped events. In case of
190 * success, the amount of drops is reduced by as much. It's supposed to be
191 * called under an exclusive lock on the sink to avoid multiple produces doing
192 * the same. On success, >0 is returned, otherwise <=0 on failure.
193 */
Emeric Brun54648852020-07-06 15:54:06 +0200194int sink_announce_dropped(struct sink *sink, int facility)
Willy Tarreau8f240232019-08-27 16:41:06 +0200195{
Emeric Brun54648852020-07-06 15:54:06 +0200196 static THREAD_LOCAL struct ist metadata[LOG_META_FIELDS];
197 static THREAD_LOCAL pid_t curr_pid;
198 static THREAD_LOCAL char pidstr[16];
Willy Tarreau8f240232019-08-27 16:41:06 +0200199 unsigned int dropped;
200 struct buffer msg;
201 struct ist msgvec[1];
202 char logbuf[64];
203
204 while (unlikely((dropped = sink->ctx.dropped) > 0)) {
205 chunk_init(&msg, logbuf, sizeof(logbuf));
206 chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
207 msgvec[0] = ist2(msg.area, msg.data);
Emeric Brunbd163812020-05-06 14:33:46 +0200208
Emeric Brun54648852020-07-06 15:54:06 +0200209 if (!metadata[LOG_META_HOST].len) {
210 if (global.log_send_hostname)
211 metadata[LOG_META_HOST] = ist2(global.log_send_hostname, strlen(global.log_send_hostname));
Emeric Brun54648852020-07-06 15:54:06 +0200212 }
213
214 if (!metadata[LOG_META_TAG].len)
215 metadata[LOG_META_TAG] = ist2(global.log_tag.area, global.log_tag.data);
216
217 if (unlikely(curr_pid != getpid()))
218 metadata[LOG_META_PID].len = 0;
219
220 if (!metadata[LOG_META_PID].len) {
221 curr_pid = getpid();
222 ltoa_o(curr_pid, pidstr, sizeof(pidstr));
223 metadata[LOG_META_PID] = ist2(pidstr, strlen(pidstr));
224 }
225
226 if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, metadata) <= 0)
Willy Tarreau8f240232019-08-27 16:41:06 +0200227 return 0;
228 /* success! */
229 HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
230 }
231 return 1;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200232}
233
Willy Tarreau9f830d72019-08-26 18:17:04 +0200234/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
235static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
236{
237 struct sink *sink;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200238 int arg;
Willy Tarreau9f830d72019-08-26 18:17:04 +0200239
240 args++; // make args[1] the 1st arg
241
242 if (!*args[1]) {
243 /* no arg => report the list of supported sink */
Willy Tarreau1d181e42019-08-30 11:17:01 +0200244 chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
Willy Tarreau9f830d72019-08-26 18:17:04 +0200245 list_for_each_entry(sink, &sink_list, sink_list) {
246 chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n",
247 sink->name,
248 sink->type == SINK_TYPE_NEW ? "init" :
249 sink->type == SINK_TYPE_FD ? "fd" :
250 sink->type == SINK_TYPE_BUFFER ? "buffer" : "?",
251 sink->ctx.dropped, sink->desc);
252 }
253
254 trash.area[trash.data] = 0;
255 return cli_msg(appctx, LOG_WARNING, trash.area);
256 }
257
258 if (!cli_has_level(appctx, ACCESS_LVL_OPER))
259 return 1;
260
261 sink = sink_find(args[1]);
262 if (!sink)
263 return cli_err(appctx, "No such event sink");
264
265 if (sink->type != SINK_TYPE_BUFFER)
266 return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
267
Willy Tarreau1d181e42019-08-30 11:17:01 +0200268 for (arg = 2; *args[arg]; arg++) {
269 if (strcmp(args[arg], "-w") == 0)
270 appctx->ctx.cli.i0 |= 1; // wait mode
271 else if (strcmp(args[arg], "-n") == 0)
272 appctx->ctx.cli.i0 |= 2; // seek to new
273 else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
274 appctx->ctx.cli.i0 |= 3; // seek to new + wait
275 else
276 return cli_err(appctx, "unknown option");
277 }
Willy Tarreau9f830d72019-08-26 18:17:04 +0200278 return ring_attach_cli(sink->ctx.ring, appctx);
279}
280
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500281/* Pre-configures a ring proxy to emit connections */
Emeric Brun494c5052020-05-28 11:13:15 +0200282void sink_setup_proxy(struct proxy *px)
283{
284 px->last_change = now.tv_sec;
Christopher Faulet1e25eb12022-10-24 15:10:18 +0200285 px->cap = PR_CAP_BE;
Emeric Brun494c5052020-05-28 11:13:15 +0200286 px->maxconn = 0;
287 px->conn_retries = 1;
288 px->timeout.server = TICK_ETERNITY;
289 px->timeout.client = TICK_ETERNITY;
290 px->timeout.connect = TICK_ETERNITY;
291 px->accept = NULL;
292 px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
293 px->bind_proc = 0; /* will be filled by users */
Emeric Brunfd667082022-09-13 16:16:30 +0200294 px->next = sink_proxies_list;
295 sink_proxies_list = px;
Emeric Brun494c5052020-05-28 11:13:15 +0200296}
297
298/*
299 * IO Handler to handle message push to syslog tcp server
300 */
301static void sink_forward_io_handler(struct appctx *appctx)
302{
303 struct stream_interface *si = appctx->owner;
304 struct stream *s = si_strm(si);
305 struct sink *sink = strm_fe(s)->parent;
306 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
307 struct ring *ring = sink->ctx.ring;
308 struct buffer *buf = &ring->buf;
309 uint64_t msg_len;
Willy Tarreaub2cd3ed2022-08-04 17:18:54 +0200310 size_t len, cnt, ofs, last_ofs;
Emeric Brun494c5052020-05-28 11:13:15 +0200311 int ret = 0;
312
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500313 /* if stopping was requested, close immediately */
Emeric Brun494c5052020-05-28 11:13:15 +0200314 if (unlikely(stopping))
315 goto close;
316
317 /* for rex because it seems reset to timeout
318 * and we don't want expire on this case
319 * with a syslog server
320 */
321 si_oc(si)->rex = TICK_ETERNITY;
322 /* rto should not change but it seems the case */
323 si_oc(si)->rto = TICK_ETERNITY;
324
325 /* an error was detected */
326 if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
327 goto close;
328
329 /* con closed by server side */
330 if ((si_oc(si)->flags & CF_SHUTW))
331 goto close;
332
333 /* if the connection is not established, inform the stream that we want
334 * to be notified whenever the connection completes.
335 */
336 if (si_opposite(si)->state < SI_ST_EST) {
337 si_cant_get(si);
338 si_rx_conn_blk(si);
339 si_rx_endp_more(si);
340 return;
341 }
342
343 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
344 if (appctx != sft->appctx) {
345 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
346 goto close;
347 }
348 ofs = sft->ofs;
349
350 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
351 LIST_DEL_INIT(&appctx->wait_entry);
352 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
353
354 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
355
356 /* explanation for the initialization below: it would be better to do
357 * this in the parsing function but this would occasionally result in
358 * dropped events because we'd take a reference on the oldest message
359 * and keep it while being scheduled. Thus instead let's take it the
360 * first time we enter here so that we have a chance to pass many
361 * existing messages before grabbing a reference to a location. This
362 * value cannot be produced after initialization.
363 */
364 if (unlikely(ofs == ~0)) {
365 ofs = 0;
366
Willy Tarreau4781b152021-04-06 13:53:36 +0200367 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun494c5052020-05-28 11:13:15 +0200368 ofs += ring->ofs;
369 }
370
Emeric Brun494c5052020-05-28 11:13:15 +0200371 /* in this loop, ofs always points to the counter byte that precedes
372 * the message so that we can take our reference there if we have to
373 * stop before the end (ret=0).
374 */
375 if (si_opposite(si)->state == SI_ST_EST) {
Emeric Brunfdabf492020-12-02 17:02:09 +0100376 /* we were already there, adjust the offset to be relative to
377 * the buffer's head and remove us from the counter.
378 */
379 ofs -= ring->ofs;
380 BUG_ON(ofs >= buf->size);
Willy Tarreau4781b152021-04-06 13:53:36 +0200381 HA_ATOMIC_DEC(b_peek(buf, ofs));
Emeric Brunfdabf492020-12-02 17:02:09 +0100382
Emeric Brun494c5052020-05-28 11:13:15 +0200383 ret = 1;
384 while (ofs + 1 < b_data(buf)) {
385 cnt = 1;
386 len = b_peek_varint(buf, ofs + cnt, &msg_len);
387 if (!len)
388 break;
389 cnt += len;
390 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
391
392 if (unlikely(msg_len + 1 > b_size(&trash))) {
393 /* too large a message to ever fit, let's skip it */
394 ofs += cnt + msg_len;
395 continue;
396 }
397
398 chunk_reset(&trash);
399 len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
400 trash.data += len;
401 trash.area[trash.data++] = '\n';
402
403 if (ci_putchk(si_ic(si), &trash) == -1) {
404 si_rx_room_blk(si);
405 ret = 0;
406 break;
407 }
408 ofs += cnt + msg_len;
409 }
410
Willy Tarreau4781b152021-04-06 13:53:36 +0200411 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun494c5052020-05-28 11:13:15 +0200412 ofs += ring->ofs;
413 sft->ofs = ofs;
Willy Tarreaub2cd3ed2022-08-04 17:18:54 +0200414 last_ofs = ring->ofs;
Emeric Brun494c5052020-05-28 11:13:15 +0200415 }
416 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
417
418 if (ret) {
419 /* let's be woken up once new data arrive */
420 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau2b718102021-04-21 07:32:39 +0200421 LIST_APPEND(&ring->waiters, &appctx->wait_entry);
Willy Tarreaub2cd3ed2022-08-04 17:18:54 +0200422 ofs = ring->ofs;
Emeric Brun494c5052020-05-28 11:13:15 +0200423 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreaub2cd3ed2022-08-04 17:18:54 +0200424 if (ofs != last_ofs) {
425 /* more data was added into the ring between the
426 * unlock and the lock, and the writer might not
427 * have seen us. We need to reschedule a read.
428 */
429 si_rx_endp_more(si);
430 } else
431 si_rx_endp_done(si);
Emeric Brun494c5052020-05-28 11:13:15 +0200432 }
433 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
434
435 /* always drain data from server */
436 co_skip(si_oc(si), si_oc(si)->output);
437 return;
438
439close:
440 si_shutw(si);
441 si_shutr(si);
442 si_ic(si)->flags |= CF_READ_NULL;
443}
444
Emeric Brun97556472020-05-30 01:42:45 +0200445/*
446 * IO Handler to handle message push to syslog tcp server
447 * using octet counting frames
448 */
449static void sink_forward_oc_io_handler(struct appctx *appctx)
450{
451 struct stream_interface *si = appctx->owner;
452 struct stream *s = si_strm(si);
453 struct sink *sink = strm_fe(s)->parent;
454 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
455 struct ring *ring = sink->ctx.ring;
456 struct buffer *buf = &ring->buf;
457 uint64_t msg_len;
458 size_t len, cnt, ofs;
459 int ret = 0;
460 char *p;
461
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500462 /* if stopping was requested, close immediately */
Emeric Brun97556472020-05-30 01:42:45 +0200463 if (unlikely(stopping))
464 goto close;
465
466 /* for rex because it seems reset to timeout
467 * and we don't want expire on this case
468 * with a syslog server
469 */
470 si_oc(si)->rex = TICK_ETERNITY;
471 /* rto should not change but it seems the case */
472 si_oc(si)->rto = TICK_ETERNITY;
473
474 /* an error was detected */
475 if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
476 goto close;
477
478 /* con closed by server side */
479 if ((si_oc(si)->flags & CF_SHUTW))
480 goto close;
481
482 /* if the connection is not established, inform the stream that we want
483 * to be notified whenever the connection completes.
484 */
485 if (si_opposite(si)->state < SI_ST_EST) {
486 si_cant_get(si);
487 si_rx_conn_blk(si);
488 si_rx_endp_more(si);
489 return;
490 }
491
492 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
493 if (appctx != sft->appctx) {
494 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
495 goto close;
496 }
497 ofs = sft->ofs;
498
499 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
500 LIST_DEL_INIT(&appctx->wait_entry);
501 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
502
503 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
504
505 /* explanation for the initialization below: it would be better to do
506 * this in the parsing function but this would occasionally result in
507 * dropped events because we'd take a reference on the oldest message
508 * and keep it while being scheduled. Thus instead let's take it the
509 * first time we enter here so that we have a chance to pass many
510 * existing messages before grabbing a reference to a location. This
511 * value cannot be produced after initialization.
512 */
513 if (unlikely(ofs == ~0)) {
514 ofs = 0;
515
Willy Tarreau4781b152021-04-06 13:53:36 +0200516 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun97556472020-05-30 01:42:45 +0200517 ofs += ring->ofs;
518 }
519
Emeric Brun97556472020-05-30 01:42:45 +0200520 /* in this loop, ofs always points to the counter byte that precedes
521 * the message so that we can take our reference there if we have to
522 * stop before the end (ret=0).
523 */
524 if (si_opposite(si)->state == SI_ST_EST) {
Emeric Brunfdabf492020-12-02 17:02:09 +0100525 /* we were already there, adjust the offset to be relative to
526 * the buffer's head and remove us from the counter.
527 */
528 ofs -= ring->ofs;
529 BUG_ON(ofs >= buf->size);
Willy Tarreau4781b152021-04-06 13:53:36 +0200530 HA_ATOMIC_DEC(b_peek(buf, ofs));
Emeric Brunfdabf492020-12-02 17:02:09 +0100531
Emeric Brun97556472020-05-30 01:42:45 +0200532 ret = 1;
533 while (ofs + 1 < b_data(buf)) {
534 cnt = 1;
535 len = b_peek_varint(buf, ofs + cnt, &msg_len);
536 if (!len)
537 break;
538 cnt += len;
539 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
540
541 chunk_reset(&trash);
542 p = ulltoa(msg_len, trash.area, b_size(&trash));
543 if (p) {
544 trash.data = (p - trash.area) + 1;
545 *p = ' ';
546 }
547
548 if (!p || (trash.data + msg_len > b_size(&trash))) {
549 /* too large a message to ever fit, let's skip it */
550 ofs += cnt + msg_len;
551 continue;
552 }
553
554 trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
555
556 if (ci_putchk(si_ic(si), &trash) == -1) {
557 si_rx_room_blk(si);
558 ret = 0;
559 break;
560 }
561 ofs += cnt + msg_len;
562 }
563
Willy Tarreau4781b152021-04-06 13:53:36 +0200564 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun97556472020-05-30 01:42:45 +0200565 ofs += ring->ofs;
566 sft->ofs = ofs;
567 }
568 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
569
570 if (ret) {
571 /* let's be woken up once new data arrive */
572 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau2b718102021-04-21 07:32:39 +0200573 LIST_APPEND(&ring->waiters, &appctx->wait_entry);
Emeric Brun97556472020-05-30 01:42:45 +0200574 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
575 si_rx_endp_done(si);
576 }
577 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
578
579 /* always drain data from server */
580 co_skip(si_oc(si), si_oc(si)->output);
581 return;
582
583close:
584 si_shutw(si);
585 si_shutr(si);
586 si_ic(si)->flags |= CF_READ_NULL;
587}
588
Emeric Brun494c5052020-05-28 11:13:15 +0200589void __sink_forward_session_deinit(struct sink_forward_target *sft)
590{
591 struct stream_interface *si;
592 struct stream *s;
593 struct sink *sink;
594
595 if (!sft->appctx)
596 return;
597
598 si = sft->appctx->owner;
599 if (!si)
600 return;
601
602 s = si_strm(si);
603 if (!s)
604 return;
605
606 sink = strm_fe(s)->parent;
607 if (!sink)
608 return;
609
610 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
611 LIST_DEL_INIT(&sft->appctx->wait_entry);
612 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
613
614 sft->appctx = NULL;
615 task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
616}
617
618
619static void sink_forward_session_release(struct appctx *appctx)
620{
Christopher Fauletefebfda2022-01-14 15:03:22 +0100621 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
Emeric Brun494c5052020-05-28 11:13:15 +0200622
623 if (!sft)
624 return;
625
626 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
627 if (sft->appctx == appctx)
628 __sink_forward_session_deinit(sft);
629 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
630}
631
632static struct applet sink_forward_applet = {
633 .obj_type = OBJ_TYPE_APPLET,
634 .name = "<SINKFWD>", /* used for logging */
635 .fct = sink_forward_io_handler,
636 .release = sink_forward_session_release,
637};
638
Emeric Brun97556472020-05-30 01:42:45 +0200639static struct applet sink_forward_oc_applet = {
640 .obj_type = OBJ_TYPE_APPLET,
641 .name = "<SINKFWDOC>", /* used for logging */
642 .fct = sink_forward_oc_io_handler,
643 .release = sink_forward_session_release,
644};
645
Emeric Brun494c5052020-05-28 11:13:15 +0200646/*
647 * Create a new peer session in assigned state (connect will start automatically)
648 */
649static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
650{
651 struct proxy *p = sink->forward_px;
652 struct appctx *appctx;
653 struct session *sess;
654 struct stream *s;
Emeric Brun97556472020-05-30 01:42:45 +0200655 struct applet *applet = &sink_forward_applet;
656
657 if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
658 applet = &sink_forward_oc_applet;
Emeric Brun494c5052020-05-28 11:13:15 +0200659
Emeric Brun97556472020-05-30 01:42:45 +0200660 appctx = appctx_new(applet, tid_bit);
Emeric Brun494c5052020-05-28 11:13:15 +0200661 if (!appctx)
662 goto out_close;
663
664 appctx->ctx.sft.ptr = (void *)sft;
665
666 sess = session_new(p, NULL, &appctx->obj_type);
667 if (!sess) {
668 ha_alert("out of memory in peer_session_create().\n");
669 goto out_free_appctx;
670 }
671
Christopher Faulet26256f82020-09-14 11:40:13 +0200672 if ((s = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) {
Emeric Brun494c5052020-05-28 11:13:15 +0200673 ha_alert("Failed to initialize stream in peer_session_create().\n");
674 goto out_free_sess;
675 }
676
677
678 s->target = &sft->srv->obj_type;
Willy Tarreau9b7587a2020-10-15 07:32:10 +0200679 if (!sockaddr_alloc(&s->target_addr, &sft->srv->addr, sizeof(sft->srv->addr)))
Emeric Brun494c5052020-05-28 11:13:15 +0200680 goto out_free_strm;
Emeric Brun494c5052020-05-28 11:13:15 +0200681 s->flags = SF_ASSIGNED|SF_ADDR_SET;
682 s->si[1].flags |= SI_FL_NOLINGER;
683
684 s->do_log = NULL;
685 s->uniq_id = 0;
686
687 s->res.flags |= CF_READ_DONTWAIT;
688 /* for rto and rex to eternity to not expire on idle recv:
689 * We are using a syslog server.
690 */
691 s->res.rto = TICK_ETERNITY;
692 s->res.rex = TICK_ETERNITY;
693 sft->appctx = appctx;
694 task_wakeup(s->task, TASK_WOKEN_INIT);
695 return appctx;
696
697 /* Error unrolling */
698 out_free_strm:
Willy Tarreau2b718102021-04-21 07:32:39 +0200699 LIST_DELETE(&s->list);
Emeric Brun494c5052020-05-28 11:13:15 +0200700 pool_free(pool_head_stream, s);
701 out_free_sess:
702 session_free(sess);
703 out_free_appctx:
704 appctx_free(appctx);
705 out_close:
706 return NULL;
707}
708
709/*
710 * Task to handle connctions to forward servers
711 */
Willy Tarreau144f84a2021-03-02 16:09:26 +0100712static struct task *process_sink_forward(struct task * task, void *context, unsigned int state)
Emeric Brun494c5052020-05-28 11:13:15 +0200713{
714 struct sink *sink = (struct sink *)context;
715 struct sink_forward_target *sft = sink->sft;
716
717 task->expire = TICK_ETERNITY;
718
719 if (!stopping) {
720 while (sft) {
721 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
722 /* if appctx is NULL, start a new session */
723 if (!sft->appctx)
724 sft->appctx = sink_forward_session_create(sink, sft);
725 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
726 sft = sft->next;
727 }
728 }
729 else {
730 while (sft) {
731 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
732 /* awake applet to perform a clean close */
733 if (sft->appctx)
734 appctx_wakeup(sft->appctx);
735 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
736 sft = sft->next;
737 }
738 }
739
740 return task;
741}
742/*
743 * Init task to manage connctions to forward servers
744 *
745 * returns 0 in case of error.
746 */
747int sink_init_forward(struct sink *sink)
748{
749 sink->forward_task = task_new(MAX_THREADS_MASK);
750 if (!sink->forward_task)
751 return 0;
752
753 sink->forward_task->process = process_sink_forward;
754 sink->forward_task->context = (void *)sink;
755 sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
756 task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
757 return 1;
758}
Emeric Brun99c453d2020-05-25 15:01:04 +0200759/*
760 * Parse "ring" section and create corresponding sink buffer.
761 *
762 * The function returns 0 in success case, otherwise, it returns error
763 * flags.
764 */
765int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
766{
767 int err_code = 0;
768 const char *inv;
769 size_t size = BUFSIZE;
Emeric Brun494c5052020-05-28 11:13:15 +0200770 struct proxy *p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200771
Willy Tarreau9d675b62022-08-11 16:12:11 +0200772 if (strcmp(args[0], "ring") == 0) { /* new ring section */
Emeric Brun99c453d2020-05-25 15:01:04 +0200773 if (!*args[1]) {
774 ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum);
775 err_code |= ERR_ALERT | ERR_FATAL;
776 goto err;
777 }
778
779 inv = invalid_char(args[1]);
780 if (inv) {
781 ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv);
782 err_code |= ERR_ALERT | ERR_FATAL;
783 goto err;
784 }
785
786 if (sink_find(args[1])) {
787 ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]);
788 err_code |= ERR_ALERT | ERR_FATAL;
789 goto err;
790 }
791
Emeric Brun54648852020-07-06 15:54:06 +0200792 cfg_sink = sink_new_buf(args[1], args[1], LOG_FORMAT_RAW, size);
Emeric Brun99c453d2020-05-25 15:01:04 +0200793 if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) {
794 ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]);
795 err_code |= ERR_ALERT | ERR_FATAL;
796 goto err;
797 }
Emeric Brun494c5052020-05-28 11:13:15 +0200798
799 /* allocate new proxy to handle forwards */
800 p = calloc(1, sizeof *p);
801 if (!p) {
802 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
803 err_code |= ERR_ALERT | ERR_FATAL;
804 goto err;
805 }
806
807 init_new_proxy(p);
808 sink_setup_proxy(p);
809 p->parent = cfg_sink;
810 p->id = strdup(args[1]);
811 p->conf.args.file = p->conf.file = strdup(file);
812 p->conf.args.line = p->conf.line = linenum;
813 cfg_sink->forward_px = p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200814 }
815 else if (strcmp(args[0], "size") == 0) {
Willy Tarreau9d675b62022-08-11 16:12:11 +0200816 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) {
817 ha_alert("parsing [%s:%d] : 'size' directive not usable with this type of sink.\n", file, linenum);
818 err_code |= ERR_ALERT | ERR_FATAL;
819 goto err;
820 }
821
Emeric Brun99c453d2020-05-25 15:01:04 +0200822 size = atol(args[1]);
823 if (!size) {
824 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
825 err_code |= ERR_ALERT | ERR_FATAL;
826 goto err;
827 }
828
Willy Tarreau9d675b62022-08-11 16:12:11 +0200829 if (size < cfg_sink->ctx.ring->buf.size) {
830 ha_warning("parsing [%s:%d] : ignoring new size '%llu' that is smaller than current size '%llu' for ring '%s'.\n",
831 file, linenum, (ullong)size, (ullong)cfg_sink->ctx.ring->buf.size, cfg_sink->name);
832 err_code |= ERR_ALERT | ERR_FATAL;
833 goto err;
834 }
835
836 if (!ring_resize(cfg_sink->ctx.ring, size)) {
837 ha_alert("parsing [%s:%d] : fail to set sink buffer size '%llu' for ring '%s'.\n", file, linenum,
838 (ullong)cfg_sink->ctx.ring->buf.size, cfg_sink->name);
Emeric Brun99c453d2020-05-25 15:01:04 +0200839 err_code |= ERR_ALERT | ERR_FATAL;
840 goto err;
841 }
842 }
Emeric Brun494c5052020-05-28 11:13:15 +0200843 else if (strcmp(args[0],"server") == 0) {
Willy Tarreau28ae98a2022-11-16 18:56:34 +0100844 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) {
845 ha_alert("parsing [%s:%d] : unable to create server '%s'.\n", file, linenum, args[1]);
846 err_code |= ERR_ALERT | ERR_FATAL;
847 goto err;
848 }
849
Amaury Denoyelle30c05372021-03-08 16:36:46 +0100850 err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL,
851 SRV_PARSE_PARSE_ADDR|SRV_PARSE_INITIAL_RESOLVE);
Emeric Brun494c5052020-05-28 11:13:15 +0200852 }
853 else if (strcmp(args[0],"timeout") == 0) {
854 if (!cfg_sink || !cfg_sink->forward_px) {
855 ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
856 err_code |= ERR_ALERT | ERR_FATAL;
857 goto err;
858 }
859
860 if (strcmp(args[1], "connect") == 0 ||
861 strcmp(args[1], "server") == 0) {
862 const char *res;
863 unsigned int tout;
864
865 if (!*args[2]) {
866 ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
867 file, linenum, args[0], args[1]);
868 err_code |= ERR_ALERT | ERR_FATAL;
869 goto err;
870 }
871 res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
872 if (res == PARSE_TIME_OVER) {
873 ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
874 file, linenum, args[2], args[0], args[1]);
875 err_code |= ERR_ALERT | ERR_FATAL;
876 goto err;
877 }
878 else if (res == PARSE_TIME_UNDER) {
879 ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
880 file, linenum, args[2], args[0], args[1]);
881 err_code |= ERR_ALERT | ERR_FATAL;
882 goto err;
883 }
884 else if (res) {
885 ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
886 file, linenum, *res, args[0], args[1]);
887 err_code |= ERR_ALERT | ERR_FATAL;
888 goto err;
889 }
Christopher Faulet737b3fd2022-10-19 16:26:21 +0200890 if (args[1][0] == 'c')
Emeric Brun494c5052020-05-28 11:13:15 +0200891 cfg_sink->forward_px->timeout.connect = tout;
892 else
893 cfg_sink->forward_px->timeout.server = tout;
894 }
895 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200896 else if (strcmp(args[0],"format") == 0) {
897 if (!cfg_sink) {
898 ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
899 err_code |= ERR_ALERT | ERR_FATAL;
900 goto err;
901 }
902
Emeric Brun54648852020-07-06 15:54:06 +0200903 cfg_sink->fmt = get_log_format(args[1]);
904 if (cfg_sink->fmt == LOG_FORMAT_UNSPEC) {
Emeric Brun99c453d2020-05-25 15:01:04 +0200905 ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]);
906 err_code |= ERR_ALERT | ERR_FATAL;
907 goto err;
908 }
909 }
910 else if (strcmp(args[0],"maxlen") == 0) {
911 if (!cfg_sink) {
912 ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]);
913 err_code |= ERR_ALERT | ERR_FATAL;
914 goto err;
915 }
916
917 cfg_sink->maxlen = atol(args[1]);
918 if (!cfg_sink->maxlen) {
919 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
920 err_code |= ERR_ALERT | ERR_FATAL;
921 goto err;
922 }
923 }
924 else if (strcmp(args[0],"description") == 0) {
925 if (!cfg_sink) {
926 ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]);
927 err_code |= ERR_ALERT | ERR_FATAL;
928 goto err;
929 }
930
931 if (!*args[1]) {
932 ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum);
933 err_code |= ERR_ALERT | ERR_FATAL;
934 goto err;
935 }
936
937 free(cfg_sink->desc);
938
939 cfg_sink->desc = strdup(args[1]);
940 if (!cfg_sink->desc) {
941 ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]);
942 err_code |= ERR_ALERT | ERR_FATAL;
943 goto err;
944 }
945 }
Emeric Brun9f2ff3a2020-05-29 15:47:52 +0200946 else {
947 ha_alert("parsing [%s:%d] : unknown statement '%s'.\n", file, linenum, args[0]);
948 err_code |= ERR_ALERT | ERR_FATAL;
949 goto err;
950 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200951
952err:
953 return err_code;
954}
955
Emeric Brun94aab062021-04-02 10:41:36 +0200956/* Creates an new sink buffer from a log server.
957 *
958 * It uses the logsrvaddress to declare a forward
959 * server for this buffer. And it initializes the
960 * forwarding.
961 *
962 * The function returns a pointer on the
963 * allocated struct sink if allocate
964 * and initialize succeed, else if it fails
965 * it returns NULL.
966 *
967 * Note: the sink is created using the name
Ilya Shipitsinb2be9a12021-04-24 13:25:42 +0500968 * specified into logsrv->ring_name
Emeric Brun94aab062021-04-02 10:41:36 +0200969 */
970struct sink *sink_new_from_logsrv(struct logsrv *logsrv)
971{
972 struct proxy *p = NULL;
973 struct sink *sink = NULL;
974 struct server *srv = NULL;
975 struct sink_forward_target *sft = NULL;
976 int i;
977
978 /* allocate new proxy to handle
979 * forward to a stream server
980 */
981 p = calloc(1, sizeof *p);
982 if (!p) {
983 goto error;
984 }
985
986 init_new_proxy(p);
987 sink_setup_proxy(p);
988 p->id = strdup(logsrv->ring_name);
989 p->conf.args.file = p->conf.file = strdup(logsrv->conf.file);
990 p->conf.args.line = p->conf.line = logsrv->conf.line;
991
Christopher Faulet46da0412022-10-24 15:53:01 +0200992 /* Set default connect and server timeout */
993 p->timeout.connect = MS_TO_TICKS(1000);
994 p->timeout.server = MS_TO_TICKS(5000);
995
Emeric Brun94aab062021-04-02 10:41:36 +0200996 /* allocate a new server to forward messages
997 * from ring buffer
998 */
999 srv = new_server(p);
1000 if (!srv)
1001 goto error;
1002
1003 /* init server */
1004 srv->id = strdup(logsrv->ring_name);
1005 srv->conf.file = strdup(logsrv->conf.file);
1006 srv->conf.line = logsrv->conf.line;
1007 srv->addr = logsrv->addr;
1008 srv->svc_port = get_host_port(&logsrv->addr);
1009 HA_SPIN_INIT(&srv->lock);
1010
1011 /* process per thread init */
1012 srv->per_thr = calloc(global.nbthread, sizeof(*srv->per_thr));
1013 if (!srv->per_thr)
1014 goto error;
1015
1016 for (i = 0; i < global.nbthread; i++) {
1017 srv->per_thr[i].idle_conns = EB_ROOT;
1018 srv->per_thr[i].safe_conns = EB_ROOT;
1019 srv->per_thr[i].avail_conns = EB_ROOT;
1020 MT_LIST_INIT(&srv->per_thr[i].streams);
1021 }
1022
1023 /* the servers are linked backwards
1024 * first into proxy
1025 */
1026 p->srv = srv;
1027 srv->next = p->srv;
1028
1029 /* allocate sink_forward_target descriptor */
1030 sft = calloc(1, sizeof(*sft));
1031 if (!sft)
1032 goto error;
1033
1034 /* init sink_forward_target offset */
1035 sft->srv = srv;
1036 sft->appctx = NULL;
1037 sft->ofs = ~0;
1038 HA_SPIN_INIT(&sft->lock);
1039
Ilya Shipitsinb2be9a12021-04-24 13:25:42 +05001040 /* prepare description for the sink */
Emeric Brun94aab062021-04-02 10:41:36 +02001041 chunk_reset(&trash);
1042 chunk_printf(&trash, "created from logserver declared into '%s' at line %d", logsrv->conf.file, logsrv->conf.line);
1043
1044 /* allocate a new sink buffer */
1045 sink = sink_new_buf(logsrv->ring_name, trash.area, logsrv->format, BUFSIZE);
1046 if (!sink || sink->type != SINK_TYPE_BUFFER) {
1047 goto error;
1048 }
1049
1050 /* link sink_forward_target to proxy */
1051 sink->forward_px = p;
1052 p->parent = sink;
1053
1054 /* insert into sink_forward_targets
1055 * list into sink
1056 */
1057 sft->next = sink->sft;
1058 sink->sft = sft;
1059
1060 /* mark server as an attached reader to the ring */
1061 if (!ring_attach(sink->ctx.ring)) {
1062 /* should never fail since there is
1063 * only one reader
1064 */
1065 goto error;
1066 }
1067
1068 /* initialize sink buffer forwarding */
1069 if (!sink_init_forward(sink))
1070 goto error;
1071
1072 /* reset familyt of logsrv to consider the ring buffer target */
1073 logsrv->addr.ss_family = AF_UNSPEC;
1074
1075 return sink;
1076error:
1077 if (p) {
1078 if (p->id)
1079 free(p->id);
1080 if (p->conf.file)
1081 free(p->conf.file);
1082
1083 free(p);
1084 }
1085
1086 if (srv) {
1087 if (srv->id)
1088 free(srv->id);
1089 if (srv->conf.file)
1090 free((void *)srv->conf.file);
1091 if (srv->per_thr)
1092 free(srv->per_thr);
1093 free(srv);
1094 }
1095
1096 if (sft)
1097 free(sft);
1098
1099 if (sink) {
1100 if (sink->ctx.ring)
1101 ring_free(sink->ctx.ring);
1102
Willy Tarreau2b718102021-04-21 07:32:39 +02001103 LIST_DELETE(&sink->sink_list);
Emeric Brun94aab062021-04-02 10:41:36 +02001104 free(sink->name);
1105 free(sink->desc);
1106 free(sink);
1107 }
1108
1109 return NULL;
1110}
1111
Emeric Brun99c453d2020-05-25 15:01:04 +02001112/*
1113 * Post parsing "ring" section.
1114 *
1115 * The function returns 0 in success case, otherwise, it returns error
1116 * flags.
1117 */
1118int cfg_post_parse_ring()
1119{
1120 int err_code = 0;
Emeric Brun494c5052020-05-28 11:13:15 +02001121 struct server *srv;
Emeric Brun99c453d2020-05-25 15:01:04 +02001122
1123 if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
1124 if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
1125 ha_warning("ring '%s' event max length '%u' exceeds size, forced to size '%lu'.\n",
Willy Tarreau570a22b2020-06-02 12:00:46 +02001126 cfg_sink->name, cfg_sink->maxlen, (unsigned long)b_size(&cfg_sink->ctx.ring->buf));
Emeric Brun99c453d2020-05-25 15:01:04 +02001127 cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
1128 err_code |= ERR_ALERT;
1129 }
Emeric Brun494c5052020-05-28 11:13:15 +02001130
1131 /* prepare forward server descriptors */
1132 if (cfg_sink->forward_px) {
1133 srv = cfg_sink->forward_px->srv;
1134 while (srv) {
1135 struct sink_forward_target *sft;
Emeric Brun99c453d2020-05-25 15:01:04 +02001136
Emeric Brun494c5052020-05-28 11:13:15 +02001137 /* allocate sink_forward_target descriptor */
1138 sft = calloc(1, sizeof(*sft));
1139 if (!sft) {
1140 ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
1141 err_code |= ERR_ALERT | ERR_FATAL;
1142 break;
1143 }
1144 sft->srv = srv;
1145 sft->appctx = NULL;
1146 sft->ofs = ~0; /* init ring offset */
1147 sft->next = cfg_sink->sft;
1148 HA_SPIN_INIT(&sft->lock);
1149
1150 /* mark server attached to the ring */
1151 if (!ring_attach(cfg_sink->ctx.ring)) {
1152 ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
1153 err_code |= ERR_ALERT | ERR_FATAL;
1154 }
1155 cfg_sink->sft = sft;
1156 srv = srv->next;
1157 }
1158 sink_init_forward(cfg_sink);
1159 }
1160 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001161 cfg_sink = NULL;
1162
1163 return err_code;
1164}
1165
1166/* resolve sink names at end of config. Returns 0 on success otherwise error
1167 * flags.
1168*/
1169int post_sink_resolve()
1170{
Christopher Fauletfc633b62020-11-06 15:24:23 +01001171 int err_code = ERR_NONE;
Emeric Brun99c453d2020-05-25 15:01:04 +02001172 struct logsrv *logsrv, *logb;
1173 struct sink *sink;
1174 struct proxy *px;
1175
1176 list_for_each_entry_safe(logsrv, logb, &global.logsrvs, list) {
1177 if (logsrv->type == LOG_TARGET_BUFFER) {
1178 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001179 if (!sink) {
1180 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1181 * means we must allocate a sink
1182 * buffer to send messages to this logsrv
1183 */
1184 if (logsrv->addr.ss_family != AF_UNSPEC) {
1185 sink = sink_new_from_logsrv(logsrv);
1186 if (!sink) {
1187 ha_alert("global stream log server declared in file '%s' at line %d cannot be initialized'.\n",
1188 logsrv->conf.file, logsrv->conf.line);
1189 err_code |= ERR_ALERT | ERR_FATAL;
1190 }
1191 }
1192 else {
1193 ha_alert("global log server declared in file '%s' at line %d uses unknown ring named '%s'.\n",
1194 logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1195 err_code |= ERR_ALERT | ERR_FATAL;
1196 }
1197 }
1198 else if (sink->type != SINK_TYPE_BUFFER) {
1199 ha_alert("global log server declared in file '%s' at line %d uses incompatible ring '%s'.\n",
1200 logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +02001201 err_code |= ERR_ALERT | ERR_FATAL;
1202 }
1203 logsrv->sink = sink;
1204 }
Emeric Brun94aab062021-04-02 10:41:36 +02001205
Emeric Brun99c453d2020-05-25 15:01:04 +02001206 }
1207
1208 for (px = proxies_list; px; px = px->next) {
1209 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1210 if (logsrv->type == LOG_TARGET_BUFFER) {
1211 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001212 if (!sink) {
1213 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1214 * means we must allocate a sink
1215 * buffer to send messages to this logsrv
1216 */
1217 if (logsrv->addr.ss_family != AF_UNSPEC) {
1218 sink = sink_new_from_logsrv(logsrv);
1219 if (!sink) {
1220 ha_alert("log server declared in proxy section '%s' file '%s' at line %d cannot be initialized'.\n",
1221 px->id, logsrv->conf.file, logsrv->conf.line);
1222 err_code |= ERR_ALERT | ERR_FATAL;
1223 }
1224 }
1225 else {
1226 ha_alert("log server declared in proxy section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1227 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1228 err_code |= ERR_ALERT | ERR_FATAL;
1229 }
1230 }
1231 else if (sink->type != SINK_TYPE_BUFFER) {
1232 ha_alert("log server declared in proxy section '%s' in file '%s' at line %d uses incomatible ring named '%s'.\n",
1233 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +02001234 err_code |= ERR_ALERT | ERR_FATAL;
1235 }
1236 logsrv->sink = sink;
1237 }
1238 }
1239 }
Emeric Brun12941c82020-07-07 14:19:42 +02001240
1241 for (px = cfg_log_forward; px; px = px->next) {
1242 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1243 if (logsrv->type == LOG_TARGET_BUFFER) {
1244 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001245 if (!sink) {
1246 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1247 * means we must allocate a sink
1248 * buffer to send messages to this logsrv
1249 */
1250 if (logsrv->addr.ss_family != AF_UNSPEC) {
1251 sink = sink_new_from_logsrv(logsrv);
1252 if (!sink) {
1253 ha_alert("log server declared in log-forward section '%s' file '%s' at line %d cannot be initialized'.\n",
1254 px->id, logsrv->conf.file, logsrv->conf.line);
1255 err_code |= ERR_ALERT | ERR_FATAL;
1256 }
1257 }
1258 else {
1259 ha_alert("log server declared in log-forward section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1260 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1261 err_code |= ERR_ALERT | ERR_FATAL;
1262 }
1263 }
1264 else if (sink->type != SINK_TYPE_BUFFER) {
1265 ha_alert("log server declared in log-forward section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1266 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun12941c82020-07-07 14:19:42 +02001267 err_code |= ERR_ALERT | ERR_FATAL;
1268 }
1269 logsrv->sink = sink;
1270 }
1271 }
1272 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001273 return err_code;
1274}
1275
1276
Willy Tarreau973e6622019-08-20 11:57:52 +02001277static void sink_init()
1278{
Emeric Brun54648852020-07-06 15:54:06 +02001279 sink_new_fd("stdout", "standard output (fd#1)", LOG_FORMAT_RAW, 1);
1280 sink_new_fd("stderr", "standard output (fd#2)", LOG_FORMAT_RAW, 2);
1281 sink_new_buf("buf0", "in-memory ring buffer", LOG_FORMAT_TIMED, 1048576);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001282}
1283
1284static void sink_deinit()
1285{
1286 struct sink *sink, *sb;
1287
1288 list_for_each_entry_safe(sink, sb, &sink_list, sink_list) {
1289 if (sink->type == SINK_TYPE_BUFFER)
1290 ring_free(sink->ctx.ring);
Willy Tarreau2b718102021-04-21 07:32:39 +02001291 LIST_DELETE(&sink->sink_list);
Willy Tarreaudb5fd2b2023-01-26 15:46:08 +01001292 task_destroy(sink->forward_task);
Emeric Brun99c453d2020-05-25 15:01:04 +02001293 free(sink->name);
1294 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001295 free(sink);
1296 }
Willy Tarreau973e6622019-08-20 11:57:52 +02001297}
1298
1299INITCALL0(STG_REGISTER, sink_init);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001300REGISTER_POST_DEINIT(sink_deinit);
Willy Tarreau973e6622019-08-20 11:57:52 +02001301
Willy Tarreau9f830d72019-08-26 18:17:04 +02001302static struct cli_kw_list cli_kws = {{ },{
Willy Tarreaub205bfd2021-05-07 11:38:37 +02001303 { { "show", "events", NULL }, "show events [<sink>] [-w] [-n] : show event sink state", cli_parse_show_events, NULL, NULL },
Willy Tarreau9f830d72019-08-26 18:17:04 +02001304 {{},}
1305}};
1306
1307INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);
1308
Emeric Brun99c453d2020-05-25 15:01:04 +02001309/* config parsers for this section */
1310REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring);
1311REGISTER_POST_CHECK(post_sink_resolve);
1312
Willy Tarreau67b5a162019-08-11 16:38:56 +02001313/*
1314 * Local variables:
1315 * c-indent-level: 8
1316 * c-basic-offset: 8
1317 * End:
1318 */