blob: 4d811ebb00b3cebe373009ff17b09a57334b6762 [file] [log] [blame]
Willy Tarreau67b5a162019-08-11 16:38:56 +02001/*
2 * Event sink management
3 *
4 * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation, version 2.1
9 * exclusively.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
Willy Tarreau36979d92020-06-05 17:27:29 +020021#include <import/ist.h>
Willy Tarreau4c7e4b72020-05-27 12:58:42 +020022#include <haproxy/api.h>
Willy Tarreau6be78492020-06-05 00:00:29 +020023#include <haproxy/cfgparse.h>
Willy Tarreau83487a82020-06-04 20:19:54 +020024#include <haproxy/cli.h>
Willy Tarreau36979d92020-06-05 17:27:29 +020025#include <haproxy/errors.h>
Willy Tarreau853b2972020-05-27 18:01:47 +020026#include <haproxy/list.h>
Willy Tarreauaeed4a82020-06-04 22:01:04 +020027#include <haproxy/log.h>
Willy Tarreau817538e2021-05-08 20:20:21 +020028#include <haproxy/proxy.h>
Willy Tarreaud2ad57c2020-06-03 19:43:35 +020029#include <haproxy/ring.h>
Willy Tarreau3727a8a2020-06-04 17:37:26 +020030#include <haproxy/signal.h>
Willy Tarreauba2f73d2020-06-03 20:02:28 +020031#include <haproxy/sink.h>
Willy Tarreau5e539c92020-06-04 20:45:39 +020032#include <haproxy/stream_interface.h>
Willy Tarreaub2551052020-06-09 09:07:15 +020033#include <haproxy/time.h>
Willy Tarreau4bad5e22021-05-08 13:05:30 +020034#include <haproxy/tools.h>
Willy Tarreau67b5a162019-08-11 16:38:56 +020035
36struct list sink_list = LIST_HEAD_INIT(sink_list);
37
Emeric Brunfd667082022-09-13 16:16:30 +020038/* sink proxies list */
39struct proxy *sink_proxies_list;
40
Emeric Brun99c453d2020-05-25 15:01:04 +020041struct sink *cfg_sink;
42
Willy Tarreau67b5a162019-08-11 16:38:56 +020043struct sink *sink_find(const char *name)
44{
45 struct sink *sink;
46
47 list_for_each_entry(sink, &sink_list, sink_list)
48 if (strcmp(sink->name, name) == 0)
49 return sink;
50 return NULL;
51}
52
53/* creates a new sink and adds it to the list, it's still generic and not fully
54 * initialized. Returns NULL on allocation failure. If another one already
55 * exists with the same name, it will be returned. The caller can detect it as
56 * a newly created one has type SINK_TYPE_NEW.
57 */
Emeric Brun54648852020-07-06 15:54:06 +020058static struct sink *__sink_new(const char *name, const char *desc, int fmt)
Willy Tarreau67b5a162019-08-11 16:38:56 +020059{
60 struct sink *sink;
61
62 sink = sink_find(name);
63 if (sink)
64 goto end;
65
Emeric Brun494c5052020-05-28 11:13:15 +020066 sink = calloc(1, sizeof(*sink));
Willy Tarreau67b5a162019-08-11 16:38:56 +020067 if (!sink)
68 goto end;
69
Emeric Brun99c453d2020-05-25 15:01:04 +020070 sink->name = strdup(name);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010071 if (!sink->name)
72 goto err;
73
Emeric Brun99c453d2020-05-25 15:01:04 +020074 sink->desc = strdup(desc);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010075 if (!sink->desc)
76 goto err;
77
Willy Tarreau67b5a162019-08-11 16:38:56 +020078 sink->fmt = fmt;
79 sink->type = SINK_TYPE_NEW;
Christopher Fauleta63a5c22019-11-15 15:10:12 +010080 sink->maxlen = BUFSIZE;
Willy Tarreau67b5a162019-08-11 16:38:56 +020081 /* address will be filled by the caller if needed */
Willy Tarreau973e6622019-08-20 11:57:52 +020082 sink->ctx.fd = -1;
Willy Tarreau67b5a162019-08-11 16:38:56 +020083 sink->ctx.dropped = 0;
84 HA_RWLOCK_INIT(&sink->ctx.lock);
Willy Tarreau2b718102021-04-21 07:32:39 +020085 LIST_APPEND(&sink_list, &sink->sink_list);
Willy Tarreau67b5a162019-08-11 16:38:56 +020086 end:
87 return sink;
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010088
89 err:
Willy Tarreau61cfdf42021-02-20 10:46:51 +010090 ha_free(&sink->name);
91 ha_free(&sink->desc);
92 ha_free(&sink);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010093
94 return NULL;
Willy Tarreau67b5a162019-08-11 16:38:56 +020095}
96
Willy Tarreau973e6622019-08-20 11:57:52 +020097/* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>,
98 * and description <desc>. Returns NULL on allocation failure or conflict.
99 * Perfect duplicates are merged (same type, fd, and name).
100 */
Emeric Brun54648852020-07-06 15:54:06 +0200101struct sink *sink_new_fd(const char *name, const char *desc, enum log_fmt fmt, int fd)
Willy Tarreau973e6622019-08-20 11:57:52 +0200102{
103 struct sink *sink;
104
105 sink = __sink_new(name, desc, fmt);
106 if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd))
107 goto end;
108
109 if (sink->type != SINK_TYPE_NEW) {
110 sink = NULL;
111 goto end;
112 }
113
114 sink->type = SINK_TYPE_FD;
115 sink->ctx.fd = fd;
116 end:
117 return sink;
118}
119
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200120/* creates a sink called <name> of type BUF of size <size>, format <fmt>,
121 * and description <desc>. Returns NULL on allocation failure or conflict.
122 * Perfect duplicates are merged (same type and name). If sizes differ, the
123 * largest one is kept.
124 */
Emeric Brun54648852020-07-06 15:54:06 +0200125struct sink *sink_new_buf(const char *name, const char *desc, enum log_fmt fmt, size_t size)
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200126{
127 struct sink *sink;
128
129 sink = __sink_new(name, desc, fmt);
130 if (!sink)
131 goto fail;
132
133 if (sink->type == SINK_TYPE_BUFFER) {
134 /* such a buffer already exists, we may have to resize it */
135 if (!ring_resize(sink->ctx.ring, size))
136 goto fail;
137 goto end;
138 }
139
140 if (sink->type != SINK_TYPE_NEW) {
141 /* already exists of another type */
142 goto fail;
143 }
144
145 sink->ctx.ring = ring_new(size);
146 if (!sink->ctx.ring) {
Willy Tarreau2b718102021-04-21 07:32:39 +0200147 LIST_DELETE(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +0200148 free(sink->name);
149 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200150 free(sink);
151 goto fail;
152 }
153
154 sink->type = SINK_TYPE_BUFFER;
155 end:
156 return sink;
157 fail:
158 return NULL;
159}
160
Willy Tarreau67b5a162019-08-11 16:38:56 +0200161/* tries to send <nmsg> message parts (up to 8, ignored above) from message
Ilya Shipitsin46a030c2020-07-05 16:36:08 +0500162 * array <msg> to sink <sink>. Formatting according to the sink's preference is
Willy Tarreau8f240232019-08-27 16:41:06 +0200163 * done here. Lost messages are NOT accounted for. It is preferable to call
164 * sink_write() instead which will also try to emit the number of dropped
Aurelien DARRAGONc2423132023-06-26 16:44:41 +0200165 * messages when there are any. It will stop writing at <maxlen> instead of
166 * sink->maxlen if <maxlen> is positive and inferior to sink->maxlen.
167 *
168 * It returns >0 if it could write anything, <=0 otherwise.
Willy Tarreau67b5a162019-08-11 16:38:56 +0200169 */
Aurelien DARRAGONc2423132023-06-26 16:44:41 +0200170 ssize_t __sink_write(struct sink *sink, size_t maxlen,
171 const struct ist msg[], size_t nmsg,
Emeric Brun54648852020-07-06 15:54:06 +0200172 int level, int facility, struct ist *metadata)
173 {
174 struct ist *pfx = NULL;
Willy Tarreaua1426de2019-08-27 14:21:02 +0200175 size_t npfx = 0;
Emeric Brunbd163812020-05-06 14:33:46 +0200176
Emeric Brun54648852020-07-06 15:54:06 +0200177 if (sink->fmt == LOG_FORMAT_RAW)
Emeric Brunbd163812020-05-06 14:33:46 +0200178 goto send;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200179
Emeric Brun54648852020-07-06 15:54:06 +0200180 pfx = build_log_header(sink->fmt, level, facility, metadata, &npfx);
Emeric Brunbd163812020-05-06 14:33:46 +0200181
182send:
Aurelien DARRAGONc2423132023-06-26 16:44:41 +0200183 if (!maxlen)
184 maxlen = ~0;
Willy Tarreau973e6622019-08-20 11:57:52 +0200185 if (sink->type == SINK_TYPE_FD) {
Aurelien DARRAGONc2423132023-06-26 16:44:41 +0200186 return fd_write_frag_line(sink->ctx.fd, MIN(maxlen, sink->maxlen), pfx, npfx, msg, nmsg, 1);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200187 }
188 else if (sink->type == SINK_TYPE_BUFFER) {
Aurelien DARRAGONc2423132023-06-26 16:44:41 +0200189 return ring_write(sink->ctx.ring, MIN(maxlen, sink->maxlen), pfx, npfx, msg, nmsg);
Willy Tarreau973e6622019-08-20 11:57:52 +0200190 }
Willy Tarreau8f240232019-08-27 16:41:06 +0200191 return 0;
192}
Willy Tarreau67b5a162019-08-11 16:38:56 +0200193
Willy Tarreau8f240232019-08-27 16:41:06 +0200194/* Tries to emit a message indicating the number of dropped events. In case of
195 * success, the amount of drops is reduced by as much. It's supposed to be
196 * called under an exclusive lock on the sink to avoid multiple produces doing
197 * the same. On success, >0 is returned, otherwise <=0 on failure.
198 */
Emeric Brun54648852020-07-06 15:54:06 +0200199int sink_announce_dropped(struct sink *sink, int facility)
Willy Tarreau8f240232019-08-27 16:41:06 +0200200{
Emeric Brun54648852020-07-06 15:54:06 +0200201 static THREAD_LOCAL struct ist metadata[LOG_META_FIELDS];
202 static THREAD_LOCAL pid_t curr_pid;
203 static THREAD_LOCAL char pidstr[16];
Willy Tarreau8f240232019-08-27 16:41:06 +0200204 unsigned int dropped;
205 struct buffer msg;
206 struct ist msgvec[1];
207 char logbuf[64];
208
209 while (unlikely((dropped = sink->ctx.dropped) > 0)) {
210 chunk_init(&msg, logbuf, sizeof(logbuf));
211 chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
212 msgvec[0] = ist2(msg.area, msg.data);
Emeric Brunbd163812020-05-06 14:33:46 +0200213
Emeric Brun54648852020-07-06 15:54:06 +0200214 if (!metadata[LOG_META_HOST].len) {
215 if (global.log_send_hostname)
216 metadata[LOG_META_HOST] = ist2(global.log_send_hostname, strlen(global.log_send_hostname));
Emeric Brun54648852020-07-06 15:54:06 +0200217 }
218
219 if (!metadata[LOG_META_TAG].len)
220 metadata[LOG_META_TAG] = ist2(global.log_tag.area, global.log_tag.data);
221
222 if (unlikely(curr_pid != getpid()))
223 metadata[LOG_META_PID].len = 0;
224
225 if (!metadata[LOG_META_PID].len) {
226 curr_pid = getpid();
227 ltoa_o(curr_pid, pidstr, sizeof(pidstr));
228 metadata[LOG_META_PID] = ist2(pidstr, strlen(pidstr));
229 }
230
Aurelien DARRAGONc2423132023-06-26 16:44:41 +0200231 if (__sink_write(sink, 0, msgvec, 1, LOG_NOTICE, facility, metadata) <= 0)
Willy Tarreau8f240232019-08-27 16:41:06 +0200232 return 0;
233 /* success! */
234 HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
235 }
236 return 1;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200237}
238
Willy Tarreau9f830d72019-08-26 18:17:04 +0200239/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
240static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
241{
242 struct sink *sink;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200243 int arg;
Willy Tarreau9f830d72019-08-26 18:17:04 +0200244
245 args++; // make args[1] the 1st arg
246
247 if (!*args[1]) {
248 /* no arg => report the list of supported sink */
Willy Tarreau1d181e42019-08-30 11:17:01 +0200249 chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
Willy Tarreau9f830d72019-08-26 18:17:04 +0200250 list_for_each_entry(sink, &sink_list, sink_list) {
251 chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n",
252 sink->name,
253 sink->type == SINK_TYPE_NEW ? "init" :
254 sink->type == SINK_TYPE_FD ? "fd" :
255 sink->type == SINK_TYPE_BUFFER ? "buffer" : "?",
256 sink->ctx.dropped, sink->desc);
257 }
258
259 trash.area[trash.data] = 0;
260 return cli_msg(appctx, LOG_WARNING, trash.area);
261 }
262
263 if (!cli_has_level(appctx, ACCESS_LVL_OPER))
264 return 1;
265
266 sink = sink_find(args[1]);
267 if (!sink)
268 return cli_err(appctx, "No such event sink");
269
270 if (sink->type != SINK_TYPE_BUFFER)
271 return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
272
Willy Tarreau1d181e42019-08-30 11:17:01 +0200273 for (arg = 2; *args[arg]; arg++) {
274 if (strcmp(args[arg], "-w") == 0)
275 appctx->ctx.cli.i0 |= 1; // wait mode
276 else if (strcmp(args[arg], "-n") == 0)
277 appctx->ctx.cli.i0 |= 2; // seek to new
278 else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
279 appctx->ctx.cli.i0 |= 3; // seek to new + wait
280 else
281 return cli_err(appctx, "unknown option");
282 }
Willy Tarreau9f830d72019-08-26 18:17:04 +0200283 return ring_attach_cli(sink->ctx.ring, appctx);
284}
285
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500286/* Pre-configures a ring proxy to emit connections */
Emeric Brun494c5052020-05-28 11:13:15 +0200287void sink_setup_proxy(struct proxy *px)
288{
289 px->last_change = now.tv_sec;
Christopher Faulet1e25eb12022-10-24 15:10:18 +0200290 px->cap = PR_CAP_BE;
Emeric Brun494c5052020-05-28 11:13:15 +0200291 px->maxconn = 0;
292 px->conn_retries = 1;
293 px->timeout.server = TICK_ETERNITY;
294 px->timeout.client = TICK_ETERNITY;
295 px->timeout.connect = TICK_ETERNITY;
296 px->accept = NULL;
297 px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
298 px->bind_proc = 0; /* will be filled by users */
Emeric Brunfd667082022-09-13 16:16:30 +0200299 px->next = sink_proxies_list;
300 sink_proxies_list = px;
Emeric Brun494c5052020-05-28 11:13:15 +0200301}
302
303/*
304 * IO Handler to handle message push to syslog tcp server
305 */
306static void sink_forward_io_handler(struct appctx *appctx)
307{
308 struct stream_interface *si = appctx->owner;
309 struct stream *s = si_strm(si);
310 struct sink *sink = strm_fe(s)->parent;
311 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
312 struct ring *ring = sink->ctx.ring;
313 struct buffer *buf = &ring->buf;
314 uint64_t msg_len;
Willy Tarreaub2cd3ed2022-08-04 17:18:54 +0200315 size_t len, cnt, ofs, last_ofs;
Emeric Brun494c5052020-05-28 11:13:15 +0200316 int ret = 0;
317
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500318 /* if stopping was requested, close immediately */
Emeric Brun494c5052020-05-28 11:13:15 +0200319 if (unlikely(stopping))
320 goto close;
321
322 /* for rex because it seems reset to timeout
323 * and we don't want expire on this case
324 * with a syslog server
325 */
326 si_oc(si)->rex = TICK_ETERNITY;
327 /* rto should not change but it seems the case */
328 si_oc(si)->rto = TICK_ETERNITY;
329
330 /* an error was detected */
331 if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
332 goto close;
333
334 /* con closed by server side */
335 if ((si_oc(si)->flags & CF_SHUTW))
336 goto close;
337
338 /* if the connection is not established, inform the stream that we want
339 * to be notified whenever the connection completes.
340 */
341 if (si_opposite(si)->state < SI_ST_EST) {
342 si_cant_get(si);
343 si_rx_conn_blk(si);
344 si_rx_endp_more(si);
345 return;
346 }
347
348 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
349 if (appctx != sft->appctx) {
350 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
351 goto close;
352 }
353 ofs = sft->ofs;
354
355 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
356 LIST_DEL_INIT(&appctx->wait_entry);
357 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
358
359 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
360
361 /* explanation for the initialization below: it would be better to do
362 * this in the parsing function but this would occasionally result in
363 * dropped events because we'd take a reference on the oldest message
364 * and keep it while being scheduled. Thus instead let's take it the
365 * first time we enter here so that we have a chance to pass many
366 * existing messages before grabbing a reference to a location. This
367 * value cannot be produced after initialization.
368 */
369 if (unlikely(ofs == ~0)) {
370 ofs = 0;
371
Willy Tarreau4781b152021-04-06 13:53:36 +0200372 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun494c5052020-05-28 11:13:15 +0200373 ofs += ring->ofs;
374 }
375
Emeric Brun494c5052020-05-28 11:13:15 +0200376 /* in this loop, ofs always points to the counter byte that precedes
377 * the message so that we can take our reference there if we have to
378 * stop before the end (ret=0).
379 */
380 if (si_opposite(si)->state == SI_ST_EST) {
Emeric Brunfdabf492020-12-02 17:02:09 +0100381 /* we were already there, adjust the offset to be relative to
382 * the buffer's head and remove us from the counter.
383 */
384 ofs -= ring->ofs;
385 BUG_ON(ofs >= buf->size);
Willy Tarreau4781b152021-04-06 13:53:36 +0200386 HA_ATOMIC_DEC(b_peek(buf, ofs));
Emeric Brunfdabf492020-12-02 17:02:09 +0100387
Emeric Brun494c5052020-05-28 11:13:15 +0200388 ret = 1;
389 while (ofs + 1 < b_data(buf)) {
390 cnt = 1;
391 len = b_peek_varint(buf, ofs + cnt, &msg_len);
392 if (!len)
393 break;
394 cnt += len;
395 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
396
397 if (unlikely(msg_len + 1 > b_size(&trash))) {
398 /* too large a message to ever fit, let's skip it */
399 ofs += cnt + msg_len;
400 continue;
401 }
402
403 chunk_reset(&trash);
404 len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
405 trash.data += len;
406 trash.area[trash.data++] = '\n';
407
408 if (ci_putchk(si_ic(si), &trash) == -1) {
409 si_rx_room_blk(si);
410 ret = 0;
411 break;
412 }
413 ofs += cnt + msg_len;
414 }
415
Willy Tarreau4781b152021-04-06 13:53:36 +0200416 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun494c5052020-05-28 11:13:15 +0200417 ofs += ring->ofs;
418 sft->ofs = ofs;
Willy Tarreaub2cd3ed2022-08-04 17:18:54 +0200419 last_ofs = ring->ofs;
Emeric Brun494c5052020-05-28 11:13:15 +0200420 }
421 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
422
423 if (ret) {
424 /* let's be woken up once new data arrive */
425 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau2b718102021-04-21 07:32:39 +0200426 LIST_APPEND(&ring->waiters, &appctx->wait_entry);
Willy Tarreaub2cd3ed2022-08-04 17:18:54 +0200427 ofs = ring->ofs;
Emeric Brun494c5052020-05-28 11:13:15 +0200428 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreaub2cd3ed2022-08-04 17:18:54 +0200429 if (ofs != last_ofs) {
430 /* more data was added into the ring between the
431 * unlock and the lock, and the writer might not
432 * have seen us. We need to reschedule a read.
433 */
434 si_rx_endp_more(si);
435 } else
436 si_rx_endp_done(si);
Emeric Brun494c5052020-05-28 11:13:15 +0200437 }
438 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
439
440 /* always drain data from server */
441 co_skip(si_oc(si), si_oc(si)->output);
442 return;
443
444close:
445 si_shutw(si);
446 si_shutr(si);
447 si_ic(si)->flags |= CF_READ_NULL;
448}
449
Emeric Brun97556472020-05-30 01:42:45 +0200450/*
451 * IO Handler to handle message push to syslog tcp server
452 * using octet counting frames
453 */
454static void sink_forward_oc_io_handler(struct appctx *appctx)
455{
456 struct stream_interface *si = appctx->owner;
457 struct stream *s = si_strm(si);
458 struct sink *sink = strm_fe(s)->parent;
459 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
460 struct ring *ring = sink->ctx.ring;
461 struct buffer *buf = &ring->buf;
462 uint64_t msg_len;
463 size_t len, cnt, ofs;
464 int ret = 0;
465 char *p;
466
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500467 /* if stopping was requested, close immediately */
Emeric Brun97556472020-05-30 01:42:45 +0200468 if (unlikely(stopping))
469 goto close;
470
471 /* for rex because it seems reset to timeout
472 * and we don't want expire on this case
473 * with a syslog server
474 */
475 si_oc(si)->rex = TICK_ETERNITY;
476 /* rto should not change but it seems the case */
477 si_oc(si)->rto = TICK_ETERNITY;
478
479 /* an error was detected */
480 if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
481 goto close;
482
483 /* con closed by server side */
484 if ((si_oc(si)->flags & CF_SHUTW))
485 goto close;
486
487 /* if the connection is not established, inform the stream that we want
488 * to be notified whenever the connection completes.
489 */
490 if (si_opposite(si)->state < SI_ST_EST) {
491 si_cant_get(si);
492 si_rx_conn_blk(si);
493 si_rx_endp_more(si);
494 return;
495 }
496
497 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
498 if (appctx != sft->appctx) {
499 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
500 goto close;
501 }
502 ofs = sft->ofs;
503
504 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
505 LIST_DEL_INIT(&appctx->wait_entry);
506 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
507
508 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
509
510 /* explanation for the initialization below: it would be better to do
511 * this in the parsing function but this would occasionally result in
512 * dropped events because we'd take a reference on the oldest message
513 * and keep it while being scheduled. Thus instead let's take it the
514 * first time we enter here so that we have a chance to pass many
515 * existing messages before grabbing a reference to a location. This
516 * value cannot be produced after initialization.
517 */
518 if (unlikely(ofs == ~0)) {
519 ofs = 0;
520
Willy Tarreau4781b152021-04-06 13:53:36 +0200521 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun97556472020-05-30 01:42:45 +0200522 ofs += ring->ofs;
523 }
524
Emeric Brun97556472020-05-30 01:42:45 +0200525 /* in this loop, ofs always points to the counter byte that precedes
526 * the message so that we can take our reference there if we have to
527 * stop before the end (ret=0).
528 */
529 if (si_opposite(si)->state == SI_ST_EST) {
Emeric Brunfdabf492020-12-02 17:02:09 +0100530 /* we were already there, adjust the offset to be relative to
531 * the buffer's head and remove us from the counter.
532 */
533 ofs -= ring->ofs;
534 BUG_ON(ofs >= buf->size);
Willy Tarreau4781b152021-04-06 13:53:36 +0200535 HA_ATOMIC_DEC(b_peek(buf, ofs));
Emeric Brunfdabf492020-12-02 17:02:09 +0100536
Emeric Brun97556472020-05-30 01:42:45 +0200537 ret = 1;
538 while (ofs + 1 < b_data(buf)) {
539 cnt = 1;
540 len = b_peek_varint(buf, ofs + cnt, &msg_len);
541 if (!len)
542 break;
543 cnt += len;
544 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
545
546 chunk_reset(&trash);
547 p = ulltoa(msg_len, trash.area, b_size(&trash));
548 if (p) {
549 trash.data = (p - trash.area) + 1;
550 *p = ' ';
551 }
552
553 if (!p || (trash.data + msg_len > b_size(&trash))) {
554 /* too large a message to ever fit, let's skip it */
555 ofs += cnt + msg_len;
556 continue;
557 }
558
559 trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
560
561 if (ci_putchk(si_ic(si), &trash) == -1) {
562 si_rx_room_blk(si);
563 ret = 0;
564 break;
565 }
566 ofs += cnt + msg_len;
567 }
568
Willy Tarreau4781b152021-04-06 13:53:36 +0200569 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun97556472020-05-30 01:42:45 +0200570 ofs += ring->ofs;
571 sft->ofs = ofs;
572 }
573 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
574
575 if (ret) {
576 /* let's be woken up once new data arrive */
577 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau2b718102021-04-21 07:32:39 +0200578 LIST_APPEND(&ring->waiters, &appctx->wait_entry);
Emeric Brun97556472020-05-30 01:42:45 +0200579 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
580 si_rx_endp_done(si);
581 }
582 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
583
584 /* always drain data from server */
585 co_skip(si_oc(si), si_oc(si)->output);
586 return;
587
588close:
589 si_shutw(si);
590 si_shutr(si);
591 si_ic(si)->flags |= CF_READ_NULL;
592}
593
Emeric Brun494c5052020-05-28 11:13:15 +0200594void __sink_forward_session_deinit(struct sink_forward_target *sft)
595{
596 struct stream_interface *si;
597 struct stream *s;
598 struct sink *sink;
599
600 if (!sft->appctx)
601 return;
602
603 si = sft->appctx->owner;
604 if (!si)
605 return;
606
607 s = si_strm(si);
608 if (!s)
609 return;
610
611 sink = strm_fe(s)->parent;
612 if (!sink)
613 return;
614
615 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
616 LIST_DEL_INIT(&sft->appctx->wait_entry);
617 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
618
619 sft->appctx = NULL;
620 task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
621}
622
623
624static void sink_forward_session_release(struct appctx *appctx)
625{
Christopher Fauletefebfda2022-01-14 15:03:22 +0100626 struct sink_forward_target *sft = appctx->ctx.sft.ptr;
Emeric Brun494c5052020-05-28 11:13:15 +0200627
628 if (!sft)
629 return;
630
631 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
632 if (sft->appctx == appctx)
633 __sink_forward_session_deinit(sft);
634 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
635}
636
637static struct applet sink_forward_applet = {
638 .obj_type = OBJ_TYPE_APPLET,
639 .name = "<SINKFWD>", /* used for logging */
640 .fct = sink_forward_io_handler,
641 .release = sink_forward_session_release,
642};
643
Emeric Brun97556472020-05-30 01:42:45 +0200644static struct applet sink_forward_oc_applet = {
645 .obj_type = OBJ_TYPE_APPLET,
646 .name = "<SINKFWDOC>", /* used for logging */
647 .fct = sink_forward_oc_io_handler,
648 .release = sink_forward_session_release,
649};
650
Emeric Brun494c5052020-05-28 11:13:15 +0200651/*
652 * Create a new peer session in assigned state (connect will start automatically)
653 */
654static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
655{
656 struct proxy *p = sink->forward_px;
657 struct appctx *appctx;
658 struct session *sess;
659 struct stream *s;
Emeric Brun97556472020-05-30 01:42:45 +0200660 struct applet *applet = &sink_forward_applet;
661
662 if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
663 applet = &sink_forward_oc_applet;
Emeric Brun494c5052020-05-28 11:13:15 +0200664
Emeric Brun97556472020-05-30 01:42:45 +0200665 appctx = appctx_new(applet, tid_bit);
Emeric Brun494c5052020-05-28 11:13:15 +0200666 if (!appctx)
667 goto out_close;
668
669 appctx->ctx.sft.ptr = (void *)sft;
670
671 sess = session_new(p, NULL, &appctx->obj_type);
672 if (!sess) {
673 ha_alert("out of memory in peer_session_create().\n");
674 goto out_free_appctx;
675 }
676
Christopher Faulet26256f82020-09-14 11:40:13 +0200677 if ((s = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) {
Emeric Brun494c5052020-05-28 11:13:15 +0200678 ha_alert("Failed to initialize stream in peer_session_create().\n");
679 goto out_free_sess;
680 }
681
682
683 s->target = &sft->srv->obj_type;
Willy Tarreau9b7587a2020-10-15 07:32:10 +0200684 if (!sockaddr_alloc(&s->target_addr, &sft->srv->addr, sizeof(sft->srv->addr)))
Emeric Brun494c5052020-05-28 11:13:15 +0200685 goto out_free_strm;
Emeric Brun494c5052020-05-28 11:13:15 +0200686 s->flags = SF_ASSIGNED|SF_ADDR_SET;
687 s->si[1].flags |= SI_FL_NOLINGER;
688
689 s->do_log = NULL;
690 s->uniq_id = 0;
691
692 s->res.flags |= CF_READ_DONTWAIT;
693 /* for rto and rex to eternity to not expire on idle recv:
694 * We are using a syslog server.
695 */
696 s->res.rto = TICK_ETERNITY;
697 s->res.rex = TICK_ETERNITY;
698 sft->appctx = appctx;
699 task_wakeup(s->task, TASK_WOKEN_INIT);
700 return appctx;
701
702 /* Error unrolling */
703 out_free_strm:
Willy Tarreau2b718102021-04-21 07:32:39 +0200704 LIST_DELETE(&s->list);
Emeric Brun494c5052020-05-28 11:13:15 +0200705 pool_free(pool_head_stream, s);
706 out_free_sess:
707 session_free(sess);
708 out_free_appctx:
709 appctx_free(appctx);
710 out_close:
711 return NULL;
712}
713
714/*
715 * Task to handle connctions to forward servers
716 */
Willy Tarreau144f84a2021-03-02 16:09:26 +0100717static struct task *process_sink_forward(struct task * task, void *context, unsigned int state)
Emeric Brun494c5052020-05-28 11:13:15 +0200718{
719 struct sink *sink = (struct sink *)context;
720 struct sink_forward_target *sft = sink->sft;
721
722 task->expire = TICK_ETERNITY;
723
724 if (!stopping) {
725 while (sft) {
726 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
727 /* if appctx is NULL, start a new session */
728 if (!sft->appctx)
729 sft->appctx = sink_forward_session_create(sink, sft);
730 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
731 sft = sft->next;
732 }
733 }
734 else {
735 while (sft) {
736 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
737 /* awake applet to perform a clean close */
738 if (sft->appctx)
739 appctx_wakeup(sft->appctx);
740 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
741 sft = sft->next;
742 }
743 }
744
745 return task;
746}
747/*
748 * Init task to manage connctions to forward servers
749 *
750 * returns 0 in case of error.
751 */
752int sink_init_forward(struct sink *sink)
753{
754 sink->forward_task = task_new(MAX_THREADS_MASK);
755 if (!sink->forward_task)
756 return 0;
757
758 sink->forward_task->process = process_sink_forward;
759 sink->forward_task->context = (void *)sink;
760 sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
761 task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
762 return 1;
763}
Emeric Brun99c453d2020-05-25 15:01:04 +0200764/*
765 * Parse "ring" section and create corresponding sink buffer.
766 *
767 * The function returns 0 in success case, otherwise, it returns error
768 * flags.
769 */
770int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
771{
772 int err_code = 0;
773 const char *inv;
774 size_t size = BUFSIZE;
Emeric Brun494c5052020-05-28 11:13:15 +0200775 struct proxy *p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200776
Willy Tarreau9d675b62022-08-11 16:12:11 +0200777 if (strcmp(args[0], "ring") == 0) { /* new ring section */
Emeric Brun99c453d2020-05-25 15:01:04 +0200778 if (!*args[1]) {
779 ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum);
780 err_code |= ERR_ALERT | ERR_FATAL;
781 goto err;
782 }
783
784 inv = invalid_char(args[1]);
785 if (inv) {
786 ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv);
787 err_code |= ERR_ALERT | ERR_FATAL;
788 goto err;
789 }
790
791 if (sink_find(args[1])) {
792 ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]);
793 err_code |= ERR_ALERT | ERR_FATAL;
794 goto err;
795 }
796
Emeric Brun54648852020-07-06 15:54:06 +0200797 cfg_sink = sink_new_buf(args[1], args[1], LOG_FORMAT_RAW, size);
Emeric Brun99c453d2020-05-25 15:01:04 +0200798 if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) {
799 ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]);
800 err_code |= ERR_ALERT | ERR_FATAL;
801 goto err;
802 }
Emeric Brun494c5052020-05-28 11:13:15 +0200803
804 /* allocate new proxy to handle forwards */
805 p = calloc(1, sizeof *p);
806 if (!p) {
807 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
808 err_code |= ERR_ALERT | ERR_FATAL;
809 goto err;
810 }
811
812 init_new_proxy(p);
813 sink_setup_proxy(p);
814 p->parent = cfg_sink;
815 p->id = strdup(args[1]);
816 p->conf.args.file = p->conf.file = strdup(file);
817 p->conf.args.line = p->conf.line = linenum;
818 cfg_sink->forward_px = p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200819 }
820 else if (strcmp(args[0], "size") == 0) {
Willy Tarreau9d675b62022-08-11 16:12:11 +0200821 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) {
822 ha_alert("parsing [%s:%d] : 'size' directive not usable with this type of sink.\n", file, linenum);
823 err_code |= ERR_ALERT | ERR_FATAL;
824 goto err;
825 }
826
Emeric Brun99c453d2020-05-25 15:01:04 +0200827 size = atol(args[1]);
828 if (!size) {
829 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
830 err_code |= ERR_ALERT | ERR_FATAL;
831 goto err;
832 }
833
Willy Tarreau9d675b62022-08-11 16:12:11 +0200834 if (size < cfg_sink->ctx.ring->buf.size) {
835 ha_warning("parsing [%s:%d] : ignoring new size '%llu' that is smaller than current size '%llu' for ring '%s'.\n",
836 file, linenum, (ullong)size, (ullong)cfg_sink->ctx.ring->buf.size, cfg_sink->name);
Aurelien DARRAGONb3304be2023-06-22 16:57:29 +0200837 err_code |= ERR_WARN;
Willy Tarreau9d675b62022-08-11 16:12:11 +0200838 goto err;
839 }
840
841 if (!ring_resize(cfg_sink->ctx.ring, size)) {
842 ha_alert("parsing [%s:%d] : fail to set sink buffer size '%llu' for ring '%s'.\n", file, linenum,
843 (ullong)cfg_sink->ctx.ring->buf.size, cfg_sink->name);
Emeric Brun99c453d2020-05-25 15:01:04 +0200844 err_code |= ERR_ALERT | ERR_FATAL;
845 goto err;
846 }
847 }
Emeric Brun494c5052020-05-28 11:13:15 +0200848 else if (strcmp(args[0],"server") == 0) {
Willy Tarreau28ae98a2022-11-16 18:56:34 +0100849 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) {
850 ha_alert("parsing [%s:%d] : unable to create server '%s'.\n", file, linenum, args[1]);
851 err_code |= ERR_ALERT | ERR_FATAL;
852 goto err;
853 }
854
Amaury Denoyelle30c05372021-03-08 16:36:46 +0100855 err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL,
856 SRV_PARSE_PARSE_ADDR|SRV_PARSE_INITIAL_RESOLVE);
Emeric Brun494c5052020-05-28 11:13:15 +0200857 }
858 else if (strcmp(args[0],"timeout") == 0) {
859 if (!cfg_sink || !cfg_sink->forward_px) {
860 ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
861 err_code |= ERR_ALERT | ERR_FATAL;
862 goto err;
863 }
864
865 if (strcmp(args[1], "connect") == 0 ||
866 strcmp(args[1], "server") == 0) {
867 const char *res;
868 unsigned int tout;
869
870 if (!*args[2]) {
871 ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
872 file, linenum, args[0], args[1]);
873 err_code |= ERR_ALERT | ERR_FATAL;
874 goto err;
875 }
876 res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
877 if (res == PARSE_TIME_OVER) {
878 ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
879 file, linenum, args[2], args[0], args[1]);
880 err_code |= ERR_ALERT | ERR_FATAL;
881 goto err;
882 }
883 else if (res == PARSE_TIME_UNDER) {
884 ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
885 file, linenum, args[2], args[0], args[1]);
886 err_code |= ERR_ALERT | ERR_FATAL;
887 goto err;
888 }
889 else if (res) {
890 ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
891 file, linenum, *res, args[0], args[1]);
892 err_code |= ERR_ALERT | ERR_FATAL;
893 goto err;
894 }
Christopher Faulet737b3fd2022-10-19 16:26:21 +0200895 if (args[1][0] == 'c')
Emeric Brun494c5052020-05-28 11:13:15 +0200896 cfg_sink->forward_px->timeout.connect = tout;
897 else
898 cfg_sink->forward_px->timeout.server = tout;
899 }
900 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200901 else if (strcmp(args[0],"format") == 0) {
902 if (!cfg_sink) {
903 ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
904 err_code |= ERR_ALERT | ERR_FATAL;
905 goto err;
906 }
907
Emeric Brun54648852020-07-06 15:54:06 +0200908 cfg_sink->fmt = get_log_format(args[1]);
909 if (cfg_sink->fmt == LOG_FORMAT_UNSPEC) {
Emeric Brun99c453d2020-05-25 15:01:04 +0200910 ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]);
911 err_code |= ERR_ALERT | ERR_FATAL;
912 goto err;
913 }
914 }
915 else if (strcmp(args[0],"maxlen") == 0) {
916 if (!cfg_sink) {
917 ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]);
918 err_code |= ERR_ALERT | ERR_FATAL;
919 goto err;
920 }
921
922 cfg_sink->maxlen = atol(args[1]);
923 if (!cfg_sink->maxlen) {
924 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
925 err_code |= ERR_ALERT | ERR_FATAL;
926 goto err;
927 }
928 }
929 else if (strcmp(args[0],"description") == 0) {
930 if (!cfg_sink) {
931 ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]);
932 err_code |= ERR_ALERT | ERR_FATAL;
933 goto err;
934 }
935
936 if (!*args[1]) {
937 ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum);
938 err_code |= ERR_ALERT | ERR_FATAL;
939 goto err;
940 }
941
942 free(cfg_sink->desc);
943
944 cfg_sink->desc = strdup(args[1]);
945 if (!cfg_sink->desc) {
946 ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]);
947 err_code |= ERR_ALERT | ERR_FATAL;
948 goto err;
949 }
950 }
Emeric Brun9f2ff3a2020-05-29 15:47:52 +0200951 else {
952 ha_alert("parsing [%s:%d] : unknown statement '%s'.\n", file, linenum, args[0]);
953 err_code |= ERR_ALERT | ERR_FATAL;
954 goto err;
955 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200956
957err:
958 return err_code;
959}
960
Emeric Brun94aab062021-04-02 10:41:36 +0200961/* Creates an new sink buffer from a log server.
962 *
963 * It uses the logsrvaddress to declare a forward
964 * server for this buffer. And it initializes the
965 * forwarding.
966 *
967 * The function returns a pointer on the
968 * allocated struct sink if allocate
969 * and initialize succeed, else if it fails
970 * it returns NULL.
971 *
972 * Note: the sink is created using the name
Ilya Shipitsinb2be9a12021-04-24 13:25:42 +0500973 * specified into logsrv->ring_name
Emeric Brun94aab062021-04-02 10:41:36 +0200974 */
975struct sink *sink_new_from_logsrv(struct logsrv *logsrv)
976{
977 struct proxy *p = NULL;
978 struct sink *sink = NULL;
979 struct server *srv = NULL;
980 struct sink_forward_target *sft = NULL;
981 int i;
982
983 /* allocate new proxy to handle
984 * forward to a stream server
985 */
986 p = calloc(1, sizeof *p);
987 if (!p) {
988 goto error;
989 }
990
991 init_new_proxy(p);
992 sink_setup_proxy(p);
993 p->id = strdup(logsrv->ring_name);
994 p->conf.args.file = p->conf.file = strdup(logsrv->conf.file);
995 p->conf.args.line = p->conf.line = logsrv->conf.line;
996
Christopher Faulet46da0412022-10-24 15:53:01 +0200997 /* Set default connect and server timeout */
998 p->timeout.connect = MS_TO_TICKS(1000);
999 p->timeout.server = MS_TO_TICKS(5000);
1000
Emeric Brun94aab062021-04-02 10:41:36 +02001001 /* allocate a new server to forward messages
1002 * from ring buffer
1003 */
1004 srv = new_server(p);
1005 if (!srv)
1006 goto error;
1007
1008 /* init server */
1009 srv->id = strdup(logsrv->ring_name);
1010 srv->conf.file = strdup(logsrv->conf.file);
1011 srv->conf.line = logsrv->conf.line;
1012 srv->addr = logsrv->addr;
1013 srv->svc_port = get_host_port(&logsrv->addr);
1014 HA_SPIN_INIT(&srv->lock);
1015
1016 /* process per thread init */
1017 srv->per_thr = calloc(global.nbthread, sizeof(*srv->per_thr));
1018 if (!srv->per_thr)
1019 goto error;
1020
1021 for (i = 0; i < global.nbthread; i++) {
1022 srv->per_thr[i].idle_conns = EB_ROOT;
1023 srv->per_thr[i].safe_conns = EB_ROOT;
1024 srv->per_thr[i].avail_conns = EB_ROOT;
1025 MT_LIST_INIT(&srv->per_thr[i].streams);
1026 }
1027
1028 /* the servers are linked backwards
1029 * first into proxy
1030 */
Emeric Brun94aab062021-04-02 10:41:36 +02001031 srv->next = p->srv;
Aurelien DARRAGONca28e782023-07-06 14:57:32 +02001032 p->srv = srv;
Emeric Brun94aab062021-04-02 10:41:36 +02001033
1034 /* allocate sink_forward_target descriptor */
1035 sft = calloc(1, sizeof(*sft));
1036 if (!sft)
1037 goto error;
1038
1039 /* init sink_forward_target offset */
1040 sft->srv = srv;
1041 sft->appctx = NULL;
1042 sft->ofs = ~0;
1043 HA_SPIN_INIT(&sft->lock);
1044
Ilya Shipitsinb2be9a12021-04-24 13:25:42 +05001045 /* prepare description for the sink */
Emeric Brun94aab062021-04-02 10:41:36 +02001046 chunk_reset(&trash);
1047 chunk_printf(&trash, "created from logserver declared into '%s' at line %d", logsrv->conf.file, logsrv->conf.line);
1048
1049 /* allocate a new sink buffer */
1050 sink = sink_new_buf(logsrv->ring_name, trash.area, logsrv->format, BUFSIZE);
1051 if (!sink || sink->type != SINK_TYPE_BUFFER) {
1052 goto error;
1053 }
1054
1055 /* link sink_forward_target to proxy */
1056 sink->forward_px = p;
1057 p->parent = sink;
1058
1059 /* insert into sink_forward_targets
1060 * list into sink
1061 */
1062 sft->next = sink->sft;
1063 sink->sft = sft;
1064
1065 /* mark server as an attached reader to the ring */
1066 if (!ring_attach(sink->ctx.ring)) {
1067 /* should never fail since there is
1068 * only one reader
1069 */
1070 goto error;
1071 }
1072
1073 /* initialize sink buffer forwarding */
1074 if (!sink_init_forward(sink))
1075 goto error;
1076
1077 /* reset familyt of logsrv to consider the ring buffer target */
1078 logsrv->addr.ss_family = AF_UNSPEC;
1079
1080 return sink;
1081error:
Aurelien DARRAGON0e786f72023-07-11 09:31:06 +02001082 if (srv)
1083 free_server(srv);
1084
Emeric Brun94aab062021-04-02 10:41:36 +02001085 if (p) {
1086 if (p->id)
1087 free(p->id);
1088 if (p->conf.file)
1089 free(p->conf.file);
1090
1091 free(p);
1092 }
1093
Emeric Brun94aab062021-04-02 10:41:36 +02001094 if (sft)
1095 free(sft);
1096
1097 if (sink) {
1098 if (sink->ctx.ring)
1099 ring_free(sink->ctx.ring);
1100
Willy Tarreau2b718102021-04-21 07:32:39 +02001101 LIST_DELETE(&sink->sink_list);
Emeric Brun94aab062021-04-02 10:41:36 +02001102 free(sink->name);
1103 free(sink->desc);
1104 free(sink);
1105 }
1106
1107 return NULL;
1108}
1109
Emeric Brun99c453d2020-05-25 15:01:04 +02001110/*
1111 * Post parsing "ring" section.
1112 *
1113 * The function returns 0 in success case, otherwise, it returns error
1114 * flags.
1115 */
1116int cfg_post_parse_ring()
1117{
1118 int err_code = 0;
Emeric Brun494c5052020-05-28 11:13:15 +02001119 struct server *srv;
Emeric Brun99c453d2020-05-25 15:01:04 +02001120
1121 if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
1122 if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
1123 ha_warning("ring '%s' event max length '%u' exceeds size, forced to size '%lu'.\n",
Willy Tarreau570a22b2020-06-02 12:00:46 +02001124 cfg_sink->name, cfg_sink->maxlen, (unsigned long)b_size(&cfg_sink->ctx.ring->buf));
Emeric Brun99c453d2020-05-25 15:01:04 +02001125 cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
Aurelien DARRAGON6726e572023-06-26 14:22:12 +02001126 err_code |= ERR_WARN;
Emeric Brun99c453d2020-05-25 15:01:04 +02001127 }
Emeric Brun494c5052020-05-28 11:13:15 +02001128
1129 /* prepare forward server descriptors */
1130 if (cfg_sink->forward_px) {
1131 srv = cfg_sink->forward_px->srv;
1132 while (srv) {
1133 struct sink_forward_target *sft;
Emeric Brun99c453d2020-05-25 15:01:04 +02001134
Emeric Brun494c5052020-05-28 11:13:15 +02001135 /* allocate sink_forward_target descriptor */
1136 sft = calloc(1, sizeof(*sft));
1137 if (!sft) {
1138 ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
1139 err_code |= ERR_ALERT | ERR_FATAL;
1140 break;
1141 }
1142 sft->srv = srv;
1143 sft->appctx = NULL;
1144 sft->ofs = ~0; /* init ring offset */
1145 sft->next = cfg_sink->sft;
1146 HA_SPIN_INIT(&sft->lock);
1147
1148 /* mark server attached to the ring */
1149 if (!ring_attach(cfg_sink->ctx.ring)) {
1150 ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
1151 err_code |= ERR_ALERT | ERR_FATAL;
Aurelien DARRAGON39835222023-07-10 16:26:08 +02001152 ha_free(&sft);
1153 break;
Emeric Brun494c5052020-05-28 11:13:15 +02001154 }
1155 cfg_sink->sft = sft;
1156 srv = srv->next;
1157 }
Aurelien DARRAGON39835222023-07-10 16:26:08 +02001158 if (sink_init_forward(cfg_sink) == 0) {
1159 ha_alert("error when trying to initialize sink buffer forwarding.\n");
1160 err_code |= ERR_ALERT | ERR_FATAL;
1161 }
Emeric Brun494c5052020-05-28 11:13:15 +02001162 }
1163 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001164 cfg_sink = NULL;
1165
1166 return err_code;
1167}
1168
1169/* resolve sink names at end of config. Returns 0 on success otherwise error
1170 * flags.
1171*/
1172int post_sink_resolve()
1173{
Christopher Fauletfc633b62020-11-06 15:24:23 +01001174 int err_code = ERR_NONE;
Emeric Brun99c453d2020-05-25 15:01:04 +02001175 struct logsrv *logsrv, *logb;
1176 struct sink *sink;
1177 struct proxy *px;
1178
1179 list_for_each_entry_safe(logsrv, logb, &global.logsrvs, list) {
1180 if (logsrv->type == LOG_TARGET_BUFFER) {
1181 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001182 if (!sink) {
1183 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1184 * means we must allocate a sink
1185 * buffer to send messages to this logsrv
1186 */
1187 if (logsrv->addr.ss_family != AF_UNSPEC) {
1188 sink = sink_new_from_logsrv(logsrv);
1189 if (!sink) {
1190 ha_alert("global stream log server declared in file '%s' at line %d cannot be initialized'.\n",
1191 logsrv->conf.file, logsrv->conf.line);
1192 err_code |= ERR_ALERT | ERR_FATAL;
1193 }
1194 }
1195 else {
1196 ha_alert("global log server declared in file '%s' at line %d uses unknown ring named '%s'.\n",
1197 logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1198 err_code |= ERR_ALERT | ERR_FATAL;
1199 }
1200 }
1201 else if (sink->type != SINK_TYPE_BUFFER) {
1202 ha_alert("global log server declared in file '%s' at line %d uses incompatible ring '%s'.\n",
1203 logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +02001204 err_code |= ERR_ALERT | ERR_FATAL;
1205 }
1206 logsrv->sink = sink;
1207 }
Emeric Brun94aab062021-04-02 10:41:36 +02001208
Emeric Brun99c453d2020-05-25 15:01:04 +02001209 }
1210
1211 for (px = proxies_list; px; px = px->next) {
1212 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1213 if (logsrv->type == LOG_TARGET_BUFFER) {
1214 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001215 if (!sink) {
1216 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1217 * means we must allocate a sink
1218 * buffer to send messages to this logsrv
1219 */
1220 if (logsrv->addr.ss_family != AF_UNSPEC) {
1221 sink = sink_new_from_logsrv(logsrv);
1222 if (!sink) {
1223 ha_alert("log server declared in proxy section '%s' file '%s' at line %d cannot be initialized'.\n",
1224 px->id, logsrv->conf.file, logsrv->conf.line);
1225 err_code |= ERR_ALERT | ERR_FATAL;
1226 }
1227 }
1228 else {
1229 ha_alert("log server declared in proxy section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1230 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1231 err_code |= ERR_ALERT | ERR_FATAL;
1232 }
1233 }
1234 else if (sink->type != SINK_TYPE_BUFFER) {
1235 ha_alert("log server declared in proxy section '%s' in file '%s' at line %d uses incomatible ring named '%s'.\n",
1236 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +02001237 err_code |= ERR_ALERT | ERR_FATAL;
1238 }
1239 logsrv->sink = sink;
1240 }
1241 }
1242 }
Emeric Brun12941c82020-07-07 14:19:42 +02001243
1244 for (px = cfg_log_forward; px; px = px->next) {
1245 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1246 if (logsrv->type == LOG_TARGET_BUFFER) {
1247 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001248 if (!sink) {
1249 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1250 * means we must allocate a sink
1251 * buffer to send messages to this logsrv
1252 */
1253 if (logsrv->addr.ss_family != AF_UNSPEC) {
1254 sink = sink_new_from_logsrv(logsrv);
1255 if (!sink) {
1256 ha_alert("log server declared in log-forward section '%s' file '%s' at line %d cannot be initialized'.\n",
1257 px->id, logsrv->conf.file, logsrv->conf.line);
1258 err_code |= ERR_ALERT | ERR_FATAL;
1259 }
1260 }
1261 else {
1262 ha_alert("log server declared in log-forward section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1263 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1264 err_code |= ERR_ALERT | ERR_FATAL;
1265 }
1266 }
1267 else if (sink->type != SINK_TYPE_BUFFER) {
1268 ha_alert("log server declared in log-forward section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1269 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun12941c82020-07-07 14:19:42 +02001270 err_code |= ERR_ALERT | ERR_FATAL;
1271 }
1272 logsrv->sink = sink;
1273 }
1274 }
1275 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001276 return err_code;
1277}
1278
1279
Willy Tarreau973e6622019-08-20 11:57:52 +02001280static void sink_init()
1281{
Emeric Brun54648852020-07-06 15:54:06 +02001282 sink_new_fd("stdout", "standard output (fd#1)", LOG_FORMAT_RAW, 1);
1283 sink_new_fd("stderr", "standard output (fd#2)", LOG_FORMAT_RAW, 2);
1284 sink_new_buf("buf0", "in-memory ring buffer", LOG_FORMAT_TIMED, 1048576);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001285}
1286
1287static void sink_deinit()
1288{
1289 struct sink *sink, *sb;
Aurelien DARRAGONcd2bfa42023-07-10 15:17:12 +02001290 struct sink_forward_target *sft_next;
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001291
1292 list_for_each_entry_safe(sink, sb, &sink_list, sink_list) {
1293 if (sink->type == SINK_TYPE_BUFFER)
1294 ring_free(sink->ctx.ring);
Willy Tarreau2b718102021-04-21 07:32:39 +02001295 LIST_DELETE(&sink->sink_list);
Willy Tarreaudb5fd2b2023-01-26 15:46:08 +01001296 task_destroy(sink->forward_task);
Aurelien DARRAGONf8796032023-03-09 12:07:09 +01001297 free_proxy(sink->forward_px);
Emeric Brun99c453d2020-05-25 15:01:04 +02001298 free(sink->name);
1299 free(sink->desc);
Aurelien DARRAGONcd2bfa42023-07-10 15:17:12 +02001300 while (sink->sft) {
1301 sft_next = sink->sft->next;
1302 free(sink->sft);
1303 sink->sft = sft_next;
1304 }
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001305 free(sink);
1306 }
Willy Tarreau973e6622019-08-20 11:57:52 +02001307}
1308
1309INITCALL0(STG_REGISTER, sink_init);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001310REGISTER_POST_DEINIT(sink_deinit);
Willy Tarreau973e6622019-08-20 11:57:52 +02001311
Willy Tarreau9f830d72019-08-26 18:17:04 +02001312static struct cli_kw_list cli_kws = {{ },{
Willy Tarreaub205bfd2021-05-07 11:38:37 +02001313 { { "show", "events", NULL }, "show events [<sink>] [-w] [-n] : show event sink state", cli_parse_show_events, NULL, NULL },
Willy Tarreau9f830d72019-08-26 18:17:04 +02001314 {{},}
1315}};
1316
1317INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);
1318
Emeric Brun99c453d2020-05-25 15:01:04 +02001319/* config parsers for this section */
1320REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring);
1321REGISTER_POST_CHECK(post_sink_resolve);
1322
Willy Tarreau67b5a162019-08-11 16:38:56 +02001323/*
1324 * Local variables:
1325 * c-indent-level: 8
1326 * c-basic-offset: 8
1327 * End:
1328 */