blob: ef3d0f0f216582720e182e2374230731e71a0c09 [file] [log] [blame]
Willy Tarreau67b5a162019-08-11 16:38:56 +02001/*
2 * Event sink management
3 *
4 * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation, version 2.1
9 * exclusively.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
Willy Tarreau0b8e9ce2022-08-11 16:38:20 +020021#include <sys/mman.h>
22#include <errno.h>
23#include <fcntl.h>
24
Willy Tarreau36979d92020-06-05 17:27:29 +020025#include <import/ist.h>
Willy Tarreau4c7e4b72020-05-27 12:58:42 +020026#include <haproxy/api.h>
Christopher Faulet6b0a0fb2022-04-04 11:29:28 +020027#include <haproxy/applet.h>
Willy Tarreau6be78492020-06-05 00:00:29 +020028#include <haproxy/cfgparse.h>
Willy Tarreau83487a82020-06-04 20:19:54 +020029#include <haproxy/cli.h>
Willy Tarreau36979d92020-06-05 17:27:29 +020030#include <haproxy/errors.h>
Willy Tarreau853b2972020-05-27 18:01:47 +020031#include <haproxy/list.h>
Willy Tarreauaeed4a82020-06-04 22:01:04 +020032#include <haproxy/log.h>
Willy Tarreau817538e2021-05-08 20:20:21 +020033#include <haproxy/proxy.h>
Willy Tarreaud2ad57c2020-06-03 19:43:35 +020034#include <haproxy/ring.h>
Willy Tarreau5edca2f2022-05-27 09:25:10 +020035#include <haproxy/sc_strm.h>
Willy Tarreau3727a8a2020-06-04 17:37:26 +020036#include <haproxy/signal.h>
Willy Tarreauba2f73d2020-06-03 20:02:28 +020037#include <haproxy/sink.h>
Willy Tarreaucb086c62022-05-27 09:47:12 +020038#include <haproxy/stconn.h>
Willy Tarreaub2551052020-06-09 09:07:15 +020039#include <haproxy/time.h>
Willy Tarreau4bad5e22021-05-08 13:05:30 +020040#include <haproxy/tools.h>
Willy Tarreau67b5a162019-08-11 16:38:56 +020041
42struct list sink_list = LIST_HEAD_INIT(sink_list);
43
Emeric Brund6e581d2022-09-13 16:16:30 +020044/* sink proxies list */
45struct proxy *sink_proxies_list;
46
Emeric Brun99c453d2020-05-25 15:01:04 +020047struct sink *cfg_sink;
48
Willy Tarreau67b5a162019-08-11 16:38:56 +020049struct sink *sink_find(const char *name)
50{
51 struct sink *sink;
52
53 list_for_each_entry(sink, &sink_list, sink_list)
54 if (strcmp(sink->name, name) == 0)
55 return sink;
56 return NULL;
57}
58
59/* creates a new sink and adds it to the list, it's still generic and not fully
60 * initialized. Returns NULL on allocation failure. If another one already
61 * exists with the same name, it will be returned. The caller can detect it as
62 * a newly created one has type SINK_TYPE_NEW.
63 */
Emeric Brun54648852020-07-06 15:54:06 +020064static struct sink *__sink_new(const char *name, const char *desc, int fmt)
Willy Tarreau67b5a162019-08-11 16:38:56 +020065{
66 struct sink *sink;
67
68 sink = sink_find(name);
69 if (sink)
70 goto end;
71
Emeric Brun494c5052020-05-28 11:13:15 +020072 sink = calloc(1, sizeof(*sink));
Willy Tarreau67b5a162019-08-11 16:38:56 +020073 if (!sink)
74 goto end;
75
Emeric Brun99c453d2020-05-25 15:01:04 +020076 sink->name = strdup(name);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010077 if (!sink->name)
78 goto err;
79
Emeric Brun99c453d2020-05-25 15:01:04 +020080 sink->desc = strdup(desc);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010081 if (!sink->desc)
82 goto err;
83
Willy Tarreau67b5a162019-08-11 16:38:56 +020084 sink->fmt = fmt;
85 sink->type = SINK_TYPE_NEW;
Christopher Fauleta63a5c22019-11-15 15:10:12 +010086 sink->maxlen = BUFSIZE;
Willy Tarreau67b5a162019-08-11 16:38:56 +020087 /* address will be filled by the caller if needed */
Willy Tarreau973e6622019-08-20 11:57:52 +020088 sink->ctx.fd = -1;
Willy Tarreau67b5a162019-08-11 16:38:56 +020089 sink->ctx.dropped = 0;
90 HA_RWLOCK_INIT(&sink->ctx.lock);
Willy Tarreau2b718102021-04-21 07:32:39 +020091 LIST_APPEND(&sink_list, &sink->sink_list);
Willy Tarreau67b5a162019-08-11 16:38:56 +020092 end:
93 return sink;
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010094
95 err:
Willy Tarreau61cfdf42021-02-20 10:46:51 +010096 ha_free(&sink->name);
97 ha_free(&sink->desc);
98 ha_free(&sink);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010099
100 return NULL;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200101}
102
Willy Tarreau973e6622019-08-20 11:57:52 +0200103/* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>,
104 * and description <desc>. Returns NULL on allocation failure or conflict.
105 * Perfect duplicates are merged (same type, fd, and name).
106 */
Emeric Brun54648852020-07-06 15:54:06 +0200107struct sink *sink_new_fd(const char *name, const char *desc, enum log_fmt fmt, int fd)
Willy Tarreau973e6622019-08-20 11:57:52 +0200108{
109 struct sink *sink;
110
111 sink = __sink_new(name, desc, fmt);
112 if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd))
113 goto end;
114
115 if (sink->type != SINK_TYPE_NEW) {
116 sink = NULL;
117 goto end;
118 }
119
120 sink->type = SINK_TYPE_FD;
121 sink->ctx.fd = fd;
122 end:
123 return sink;
124}
125
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200126/* creates a sink called <name> of type BUF of size <size>, format <fmt>,
127 * and description <desc>. Returns NULL on allocation failure or conflict.
128 * Perfect duplicates are merged (same type and name). If sizes differ, the
129 * largest one is kept.
130 */
Emeric Brun54648852020-07-06 15:54:06 +0200131struct sink *sink_new_buf(const char *name, const char *desc, enum log_fmt fmt, size_t size)
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200132{
133 struct sink *sink;
134
135 sink = __sink_new(name, desc, fmt);
136 if (!sink)
137 goto fail;
138
139 if (sink->type == SINK_TYPE_BUFFER) {
140 /* such a buffer already exists, we may have to resize it */
141 if (!ring_resize(sink->ctx.ring, size))
142 goto fail;
143 goto end;
144 }
145
146 if (sink->type != SINK_TYPE_NEW) {
147 /* already exists of another type */
148 goto fail;
149 }
150
151 sink->ctx.ring = ring_new(size);
152 if (!sink->ctx.ring) {
Willy Tarreau2b718102021-04-21 07:32:39 +0200153 LIST_DELETE(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +0200154 free(sink->name);
155 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200156 free(sink);
157 goto fail;
158 }
159
160 sink->type = SINK_TYPE_BUFFER;
161 end:
162 return sink;
163 fail:
164 return NULL;
165}
166
Willy Tarreau67b5a162019-08-11 16:38:56 +0200167/* tries to send <nmsg> message parts (up to 8, ignored above) from message
Ilya Shipitsin46a030c2020-07-05 16:36:08 +0500168 * array <msg> to sink <sink>. Formatting according to the sink's preference is
Willy Tarreau8f240232019-08-27 16:41:06 +0200169 * done here. Lost messages are NOT accounted for. It is preferable to call
170 * sink_write() instead which will also try to emit the number of dropped
171 * messages when there are any. It returns >0 if it could write anything,
172 * <=0 otherwise.
Willy Tarreau67b5a162019-08-11 16:38:56 +0200173 */
Emeric Brun54648852020-07-06 15:54:06 +0200174 ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg,
175 int level, int facility, struct ist *metadata)
176 {
177 struct ist *pfx = NULL;
Willy Tarreaua1426de2019-08-27 14:21:02 +0200178 size_t npfx = 0;
Emeric Brunbd163812020-05-06 14:33:46 +0200179
Emeric Brun54648852020-07-06 15:54:06 +0200180 if (sink->fmt == LOG_FORMAT_RAW)
Emeric Brunbd163812020-05-06 14:33:46 +0200181 goto send;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200182
Emeric Brun54648852020-07-06 15:54:06 +0200183 pfx = build_log_header(sink->fmt, level, facility, metadata, &npfx);
Emeric Brunbd163812020-05-06 14:33:46 +0200184
185send:
Willy Tarreau973e6622019-08-20 11:57:52 +0200186 if (sink->type == SINK_TYPE_FD) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200187 return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200188 }
189 else if (sink->type == SINK_TYPE_BUFFER) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200190 return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
Willy Tarreau973e6622019-08-20 11:57:52 +0200191 }
Willy Tarreau8f240232019-08-27 16:41:06 +0200192 return 0;
193}
Willy Tarreau67b5a162019-08-11 16:38:56 +0200194
Willy Tarreau8f240232019-08-27 16:41:06 +0200195/* Tries to emit a message indicating the number of dropped events. In case of
196 * success, the amount of drops is reduced by as much. It's supposed to be
197 * called under an exclusive lock on the sink to avoid multiple produces doing
198 * the same. On success, >0 is returned, otherwise <=0 on failure.
199 */
Emeric Brun54648852020-07-06 15:54:06 +0200200int sink_announce_dropped(struct sink *sink, int facility)
Willy Tarreau8f240232019-08-27 16:41:06 +0200201{
Emeric Brun54648852020-07-06 15:54:06 +0200202 static THREAD_LOCAL struct ist metadata[LOG_META_FIELDS];
203 static THREAD_LOCAL pid_t curr_pid;
204 static THREAD_LOCAL char pidstr[16];
Willy Tarreau8f240232019-08-27 16:41:06 +0200205 unsigned int dropped;
206 struct buffer msg;
207 struct ist msgvec[1];
208 char logbuf[64];
209
210 while (unlikely((dropped = sink->ctx.dropped) > 0)) {
211 chunk_init(&msg, logbuf, sizeof(logbuf));
212 chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
213 msgvec[0] = ist2(msg.area, msg.data);
Emeric Brunbd163812020-05-06 14:33:46 +0200214
Emeric Brun54648852020-07-06 15:54:06 +0200215 if (!metadata[LOG_META_HOST].len) {
216 if (global.log_send_hostname)
Tim Duesterhus77508502022-03-15 13:11:06 +0100217 metadata[LOG_META_HOST] = ist(global.log_send_hostname);
Emeric Brun54648852020-07-06 15:54:06 +0200218 }
219
220 if (!metadata[LOG_META_TAG].len)
221 metadata[LOG_META_TAG] = ist2(global.log_tag.area, global.log_tag.data);
222
223 if (unlikely(curr_pid != getpid()))
224 metadata[LOG_META_PID].len = 0;
225
226 if (!metadata[LOG_META_PID].len) {
227 curr_pid = getpid();
228 ltoa_o(curr_pid, pidstr, sizeof(pidstr));
229 metadata[LOG_META_PID] = ist2(pidstr, strlen(pidstr));
230 }
231
232 if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, metadata) <= 0)
Willy Tarreau8f240232019-08-27 16:41:06 +0200233 return 0;
234 /* success! */
235 HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
236 }
237 return 1;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200238}
239
Willy Tarreau9f830d72019-08-26 18:17:04 +0200240/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
241static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
242{
243 struct sink *sink;
Willy Tarreaucba88382022-05-05 15:18:57 +0200244 uint ring_flags;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200245 int arg;
Willy Tarreau9f830d72019-08-26 18:17:04 +0200246
247 args++; // make args[1] the 1st arg
248
249 if (!*args[1]) {
250 /* no arg => report the list of supported sink */
Willy Tarreau1d181e42019-08-30 11:17:01 +0200251 chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
Willy Tarreau9f830d72019-08-26 18:17:04 +0200252 list_for_each_entry(sink, &sink_list, sink_list) {
253 chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n",
254 sink->name,
255 sink->type == SINK_TYPE_NEW ? "init" :
256 sink->type == SINK_TYPE_FD ? "fd" :
257 sink->type == SINK_TYPE_BUFFER ? "buffer" : "?",
258 sink->ctx.dropped, sink->desc);
259 }
260
261 trash.area[trash.data] = 0;
262 return cli_msg(appctx, LOG_WARNING, trash.area);
263 }
264
265 if (!cli_has_level(appctx, ACCESS_LVL_OPER))
266 return 1;
267
268 sink = sink_find(args[1]);
269 if (!sink)
270 return cli_err(appctx, "No such event sink");
271
272 if (sink->type != SINK_TYPE_BUFFER)
273 return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
274
Willy Tarreaucba88382022-05-05 15:18:57 +0200275 ring_flags = 0;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200276 for (arg = 2; *args[arg]; arg++) {
277 if (strcmp(args[arg], "-w") == 0)
Willy Tarreaucba88382022-05-05 15:18:57 +0200278 ring_flags |= RING_WF_WAIT_MODE;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200279 else if (strcmp(args[arg], "-n") == 0)
Willy Tarreaucba88382022-05-05 15:18:57 +0200280 ring_flags |= RING_WF_SEEK_NEW;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200281 else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
Willy Tarreaucba88382022-05-05 15:18:57 +0200282 ring_flags |= RING_WF_WAIT_MODE | RING_WF_SEEK_NEW;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200283 else
284 return cli_err(appctx, "unknown option");
285 }
Willy Tarreaucba88382022-05-05 15:18:57 +0200286 return ring_attach_cli(sink->ctx.ring, appctx, ring_flags);
Willy Tarreau9f830d72019-08-26 18:17:04 +0200287}
288
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500289/* Pre-configures a ring proxy to emit connections */
Emeric Brun494c5052020-05-28 11:13:15 +0200290void sink_setup_proxy(struct proxy *px)
291{
292 px->last_change = now.tv_sec;
Christopher Faulet11a707a2022-10-24 15:10:18 +0200293 px->cap = PR_CAP_BE;
Emeric Brun494c5052020-05-28 11:13:15 +0200294 px->maxconn = 0;
295 px->conn_retries = 1;
296 px->timeout.server = TICK_ETERNITY;
297 px->timeout.client = TICK_ETERNITY;
298 px->timeout.connect = TICK_ETERNITY;
299 px->accept = NULL;
300 px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
Emeric Brund6e581d2022-09-13 16:16:30 +0200301 px->next = sink_proxies_list;
302 sink_proxies_list = px;
Emeric Brun494c5052020-05-28 11:13:15 +0200303}
304
305/*
Willy Tarreau42cc8312022-05-04 20:42:23 +0200306 * IO Handler to handle message push to syslog tcp server.
307 * It takes its context from appctx->svcctx.
Emeric Brun494c5052020-05-28 11:13:15 +0200308 */
309static void sink_forward_io_handler(struct appctx *appctx)
310{
Willy Tarreauc12b3212022-05-27 11:08:15 +0200311 struct stconn *sc = appctx_sc(appctx);
Willy Tarreau0eca5392022-05-27 10:44:25 +0200312 struct stream *s = __sc_strm(sc);
Emeric Brun494c5052020-05-28 11:13:15 +0200313 struct sink *sink = strm_fe(s)->parent;
Willy Tarreau42cc8312022-05-04 20:42:23 +0200314 struct sink_forward_target *sft = appctx->svcctx;
Emeric Brun494c5052020-05-28 11:13:15 +0200315 struct ring *ring = sink->ctx.ring;
316 struct buffer *buf = &ring->buf;
317 uint64_t msg_len;
Willy Tarreau53bfab02022-08-04 17:18:54 +0200318 size_t len, cnt, ofs, last_ofs;
Emeric Brun494c5052020-05-28 11:13:15 +0200319 int ret = 0;
320
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500321 /* if stopping was requested, close immediately */
Emeric Brun494c5052020-05-28 11:13:15 +0200322 if (unlikely(stopping))
323 goto close;
324
325 /* for rex because it seems reset to timeout
326 * and we don't want expire on this case
327 * with a syslog server
328 */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200329 sc_oc(sc)->rex = TICK_ETERNITY;
Emeric Brun494c5052020-05-28 11:13:15 +0200330 /* rto should not change but it seems the case */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200331 sc_oc(sc)->rto = TICK_ETERNITY;
Emeric Brun494c5052020-05-28 11:13:15 +0200332
333 /* an error was detected */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200334 if (unlikely(sc_ic(sc)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
Emeric Brun494c5052020-05-28 11:13:15 +0200335 goto close;
336
337 /* con closed by server side */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200338 if ((sc_oc(sc)->flags & CF_SHUTW))
Emeric Brun494c5052020-05-28 11:13:15 +0200339 goto close;
340
341 /* if the connection is not established, inform the stream that we want
342 * to be notified whenever the connection completes.
343 */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200344 if (sc_opposite(sc)->state < SC_ST_EST) {
Willy Tarreau90e8b452022-05-25 18:21:43 +0200345 applet_need_more_data(appctx);
Willy Tarreaub23edc82022-05-24 16:49:03 +0200346 se_need_remote_conn(appctx->sedesc);
Willy Tarreau4164eb92022-05-25 15:42:03 +0200347 applet_have_more_data(appctx);
Emeric Brun494c5052020-05-28 11:13:15 +0200348 return;
349 }
350
351 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
352 if (appctx != sft->appctx) {
353 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
354 goto close;
355 }
356 ofs = sft->ofs;
357
358 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
359 LIST_DEL_INIT(&appctx->wait_entry);
360 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
361
362 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
363
364 /* explanation for the initialization below: it would be better to do
365 * this in the parsing function but this would occasionally result in
366 * dropped events because we'd take a reference on the oldest message
367 * and keep it while being scheduled. Thus instead let's take it the
368 * first time we enter here so that we have a chance to pass many
369 * existing messages before grabbing a reference to a location. This
370 * value cannot be produced after initialization.
371 */
372 if (unlikely(ofs == ~0)) {
373 ofs = 0;
374
Willy Tarreau4781b152021-04-06 13:53:36 +0200375 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun494c5052020-05-28 11:13:15 +0200376 ofs += ring->ofs;
377 }
378
Emeric Brun494c5052020-05-28 11:13:15 +0200379 /* in this loop, ofs always points to the counter byte that precedes
380 * the message so that we can take our reference there if we have to
381 * stop before the end (ret=0).
382 */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200383 if (sc_opposite(sc)->state == SC_ST_EST) {
Emeric Brunfdabf492020-12-02 17:02:09 +0100384 /* we were already there, adjust the offset to be relative to
385 * the buffer's head and remove us from the counter.
386 */
387 ofs -= ring->ofs;
388 BUG_ON(ofs >= buf->size);
Willy Tarreau4781b152021-04-06 13:53:36 +0200389 HA_ATOMIC_DEC(b_peek(buf, ofs));
Emeric Brunfdabf492020-12-02 17:02:09 +0100390
Emeric Brun494c5052020-05-28 11:13:15 +0200391 ret = 1;
392 while (ofs + 1 < b_data(buf)) {
393 cnt = 1;
394 len = b_peek_varint(buf, ofs + cnt, &msg_len);
395 if (!len)
396 break;
397 cnt += len;
398 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
399
400 if (unlikely(msg_len + 1 > b_size(&trash))) {
401 /* too large a message to ever fit, let's skip it */
402 ofs += cnt + msg_len;
403 continue;
404 }
405
406 chunk_reset(&trash);
407 len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
408 trash.data += len;
409 trash.area[trash.data++] = '\n';
410
Willy Tarreaud0a06d52022-05-18 15:07:19 +0200411 if (applet_putchk(appctx, &trash) == -1) {
Emeric Brun494c5052020-05-28 11:13:15 +0200412 ret = 0;
413 break;
414 }
415 ofs += cnt + msg_len;
416 }
417
Willy Tarreau4781b152021-04-06 13:53:36 +0200418 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun494c5052020-05-28 11:13:15 +0200419 ofs += ring->ofs;
420 sft->ofs = ofs;
Willy Tarreau53bfab02022-08-04 17:18:54 +0200421 last_ofs = ring->ofs;
Emeric Brun494c5052020-05-28 11:13:15 +0200422 }
423 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
424
425 if (ret) {
426 /* let's be woken up once new data arrive */
427 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau2b718102021-04-21 07:32:39 +0200428 LIST_APPEND(&ring->waiters, &appctx->wait_entry);
Willy Tarreau53bfab02022-08-04 17:18:54 +0200429 ofs = ring->ofs;
Emeric Brun494c5052020-05-28 11:13:15 +0200430 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau53bfab02022-08-04 17:18:54 +0200431 if (ofs != last_ofs) {
432 /* more data was added into the ring between the
433 * unlock and the lock, and the writer might not
434 * have seen us. We need to reschedule a read.
435 */
436 applet_have_more_data(appctx);
437 } else
438 applet_have_no_more_data(appctx);
Emeric Brun494c5052020-05-28 11:13:15 +0200439 }
440 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
441
442 /* always drain data from server */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200443 co_skip(sc_oc(sc), sc_oc(sc)->output);
Emeric Brun494c5052020-05-28 11:13:15 +0200444 return;
445
446close:
Willy Tarreau0eca5392022-05-27 10:44:25 +0200447 sc_shutw(sc);
448 sc_shutr(sc);
449 sc_ic(sc)->flags |= CF_READ_NULL;
Emeric Brun494c5052020-05-28 11:13:15 +0200450}
451
Emeric Brun97556472020-05-30 01:42:45 +0200452/*
453 * IO Handler to handle message push to syslog tcp server
454 * using octet counting frames
Willy Tarreau42cc8312022-05-04 20:42:23 +0200455 * It takes its context from appctx->svcctx.
Emeric Brun97556472020-05-30 01:42:45 +0200456 */
457static void sink_forward_oc_io_handler(struct appctx *appctx)
458{
Willy Tarreauc12b3212022-05-27 11:08:15 +0200459 struct stconn *sc = appctx_sc(appctx);
Willy Tarreau0eca5392022-05-27 10:44:25 +0200460 struct stream *s = __sc_strm(sc);
Emeric Brun97556472020-05-30 01:42:45 +0200461 struct sink *sink = strm_fe(s)->parent;
Willy Tarreau42cc8312022-05-04 20:42:23 +0200462 struct sink_forward_target *sft = appctx->svcctx;
Emeric Brun97556472020-05-30 01:42:45 +0200463 struct ring *ring = sink->ctx.ring;
464 struct buffer *buf = &ring->buf;
465 uint64_t msg_len;
466 size_t len, cnt, ofs;
467 int ret = 0;
468 char *p;
469
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500470 /* if stopping was requested, close immediately */
Emeric Brun97556472020-05-30 01:42:45 +0200471 if (unlikely(stopping))
472 goto close;
473
474 /* for rex because it seems reset to timeout
475 * and we don't want expire on this case
476 * with a syslog server
477 */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200478 sc_oc(sc)->rex = TICK_ETERNITY;
Emeric Brun97556472020-05-30 01:42:45 +0200479 /* rto should not change but it seems the case */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200480 sc_oc(sc)->rto = TICK_ETERNITY;
Emeric Brun97556472020-05-30 01:42:45 +0200481
482 /* an error was detected */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200483 if (unlikely(sc_ic(sc)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
Emeric Brun97556472020-05-30 01:42:45 +0200484 goto close;
485
486 /* con closed by server side */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200487 if ((sc_oc(sc)->flags & CF_SHUTW))
Emeric Brun97556472020-05-30 01:42:45 +0200488 goto close;
489
490 /* if the connection is not established, inform the stream that we want
491 * to be notified whenever the connection completes.
492 */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200493 if (sc_opposite(sc)->state < SC_ST_EST) {
Willy Tarreau90e8b452022-05-25 18:21:43 +0200494 applet_need_more_data(appctx);
Willy Tarreaub23edc82022-05-24 16:49:03 +0200495 se_need_remote_conn(appctx->sedesc);
Willy Tarreau4164eb92022-05-25 15:42:03 +0200496 applet_have_more_data(appctx);
Emeric Brun97556472020-05-30 01:42:45 +0200497 return;
498 }
499
500 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
501 if (appctx != sft->appctx) {
502 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
503 goto close;
504 }
505 ofs = sft->ofs;
506
507 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
508 LIST_DEL_INIT(&appctx->wait_entry);
509 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
510
511 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
512
513 /* explanation for the initialization below: it would be better to do
514 * this in the parsing function but this would occasionally result in
515 * dropped events because we'd take a reference on the oldest message
516 * and keep it while being scheduled. Thus instead let's take it the
517 * first time we enter here so that we have a chance to pass many
518 * existing messages before grabbing a reference to a location. This
519 * value cannot be produced after initialization.
520 */
521 if (unlikely(ofs == ~0)) {
522 ofs = 0;
523
Willy Tarreau4781b152021-04-06 13:53:36 +0200524 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun97556472020-05-30 01:42:45 +0200525 ofs += ring->ofs;
526 }
527
Emeric Brun97556472020-05-30 01:42:45 +0200528 /* in this loop, ofs always points to the counter byte that precedes
529 * the message so that we can take our reference there if we have to
530 * stop before the end (ret=0).
531 */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200532 if (sc_opposite(sc)->state == SC_ST_EST) {
Emeric Brunfdabf492020-12-02 17:02:09 +0100533 /* we were already there, adjust the offset to be relative to
534 * the buffer's head and remove us from the counter.
535 */
536 ofs -= ring->ofs;
537 BUG_ON(ofs >= buf->size);
Willy Tarreau4781b152021-04-06 13:53:36 +0200538 HA_ATOMIC_DEC(b_peek(buf, ofs));
Emeric Brunfdabf492020-12-02 17:02:09 +0100539
Emeric Brun97556472020-05-30 01:42:45 +0200540 ret = 1;
541 while (ofs + 1 < b_data(buf)) {
542 cnt = 1;
543 len = b_peek_varint(buf, ofs + cnt, &msg_len);
544 if (!len)
545 break;
546 cnt += len;
547 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
548
549 chunk_reset(&trash);
550 p = ulltoa(msg_len, trash.area, b_size(&trash));
551 if (p) {
552 trash.data = (p - trash.area) + 1;
553 *p = ' ';
554 }
555
556 if (!p || (trash.data + msg_len > b_size(&trash))) {
557 /* too large a message to ever fit, let's skip it */
558 ofs += cnt + msg_len;
559 continue;
560 }
561
562 trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
563
Willy Tarreaud0a06d52022-05-18 15:07:19 +0200564 if (applet_putchk(appctx, &trash) == -1) {
Emeric Brun97556472020-05-30 01:42:45 +0200565 ret = 0;
566 break;
567 }
568 ofs += cnt + msg_len;
569 }
570
Willy Tarreau4781b152021-04-06 13:53:36 +0200571 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun97556472020-05-30 01:42:45 +0200572 ofs += ring->ofs;
573 sft->ofs = ofs;
574 }
575 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
576
577 if (ret) {
578 /* let's be woken up once new data arrive */
579 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau2b718102021-04-21 07:32:39 +0200580 LIST_APPEND(&ring->waiters, &appctx->wait_entry);
Emeric Brun97556472020-05-30 01:42:45 +0200581 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau4164eb92022-05-25 15:42:03 +0200582 applet_have_no_more_data(appctx);
Emeric Brun97556472020-05-30 01:42:45 +0200583 }
584 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
585
586 /* always drain data from server */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200587 co_skip(sc_oc(sc), sc_oc(sc)->output);
Emeric Brun97556472020-05-30 01:42:45 +0200588 return;
589
590close:
Willy Tarreau0eca5392022-05-27 10:44:25 +0200591 sc_shutw(sc);
592 sc_shutr(sc);
593 sc_ic(sc)->flags |= CF_READ_NULL;
Emeric Brun97556472020-05-30 01:42:45 +0200594}
595
Emeric Brun494c5052020-05-28 11:13:15 +0200596void __sink_forward_session_deinit(struct sink_forward_target *sft)
597{
Willy Tarreau0698c802022-05-11 14:09:57 +0200598 struct stream *s = appctx_strm(sft->appctx);
Emeric Brun494c5052020-05-28 11:13:15 +0200599 struct sink *sink;
600
Emeric Brun494c5052020-05-28 11:13:15 +0200601 sink = strm_fe(s)->parent;
602 if (!sink)
603 return;
604
605 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
606 LIST_DEL_INIT(&sft->appctx->wait_entry);
607 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
608
609 sft->appctx = NULL;
610 task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
611}
612
Christopher Fauletaee57fc2022-05-12 15:34:48 +0200613static int sink_forward_session_init(struct appctx *appctx)
614{
615 struct sink_forward_target *sft = appctx->svcctx;
616 struct stream *s;
617 struct sockaddr_storage *addr = NULL;
618
619 if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr)))
620 goto out_error;
621
622 if (appctx_finalize_startup(appctx, sft->sink->forward_px, &BUF_NULL) == -1)
623 goto out_free_addr;
624
625 s = appctx_strm(appctx);
Willy Tarreau7cb9e6c2022-05-17 19:40:40 +0200626 s->scb->dst = addr;
Willy Tarreaucb041662022-05-17 19:44:42 +0200627 s->scb->flags |= SC_FL_NOLINGER;
Christopher Fauletaee57fc2022-05-12 15:34:48 +0200628
629 s->target = &sft->srv->obj_type;
630 s->flags = SF_ASSIGNED;
631
632 s->do_log = NULL;
633 s->uniq_id = 0;
634
635 s->res.flags |= CF_READ_DONTWAIT;
636 /* for rto and rex to eternity to not expire on idle recv:
637 * We are using a syslog server.
638 */
639 s->res.rto = TICK_ETERNITY;
640 s->res.rex = TICK_ETERNITY;
641 sft->appctx = appctx;
642
643 return 0;
644
645 out_free_addr:
646 sockaddr_free(&addr);
647 out_error:
648 return -1;
649}
Emeric Brun494c5052020-05-28 11:13:15 +0200650
651static void sink_forward_session_release(struct appctx *appctx)
652{
Willy Tarreau42cc8312022-05-04 20:42:23 +0200653 struct sink_forward_target *sft = appctx->svcctx;
Emeric Brun494c5052020-05-28 11:13:15 +0200654
655 if (!sft)
656 return;
657
658 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
659 if (sft->appctx == appctx)
660 __sink_forward_session_deinit(sft);
661 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
662}
663
664static struct applet sink_forward_applet = {
665 .obj_type = OBJ_TYPE_APPLET,
666 .name = "<SINKFWD>", /* used for logging */
667 .fct = sink_forward_io_handler,
Christopher Fauletaee57fc2022-05-12 15:34:48 +0200668 .init = sink_forward_session_init,
Emeric Brun494c5052020-05-28 11:13:15 +0200669 .release = sink_forward_session_release,
670};
671
Emeric Brun97556472020-05-30 01:42:45 +0200672static struct applet sink_forward_oc_applet = {
673 .obj_type = OBJ_TYPE_APPLET,
674 .name = "<SINKFWDOC>", /* used for logging */
675 .fct = sink_forward_oc_io_handler,
Christopher Fauletaee57fc2022-05-12 15:34:48 +0200676 .init = sink_forward_session_init,
Emeric Brun97556472020-05-30 01:42:45 +0200677 .release = sink_forward_session_release,
678};
679
Emeric Brun494c5052020-05-28 11:13:15 +0200680/*
681 * Create a new peer session in assigned state (connect will start automatically)
Willy Tarreau42cc8312022-05-04 20:42:23 +0200682 * It sets its context into appctx->svcctx.
Emeric Brun494c5052020-05-28 11:13:15 +0200683 */
684static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
685{
Emeric Brun494c5052020-05-28 11:13:15 +0200686 struct appctx *appctx;
Emeric Brun97556472020-05-30 01:42:45 +0200687 struct applet *applet = &sink_forward_applet;
688
689 if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
690 applet = &sink_forward_oc_applet;
Emeric Brun494c5052020-05-28 11:13:15 +0200691
Christopher Faulet6095d572022-05-16 17:09:48 +0200692 appctx = appctx_new_here(applet, NULL);
Christopher Faulet2479e5f2022-01-19 14:50:11 +0100693 if (!appctx)
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100694 goto out_close;
Willy Tarreau42cc8312022-05-04 20:42:23 +0200695 appctx->svcctx = (void *)sft;
Emeric Brun494c5052020-05-28 11:13:15 +0200696
Christopher Fauletaee57fc2022-05-12 15:34:48 +0200697 if (appctx_init(appctx) == -1)
Christopher Faulet92202da2022-05-11 12:22:10 +0200698 goto out_free_appctx;
Christopher Faulet13a35e52021-12-20 15:34:16 +0100699
Emeric Brun494c5052020-05-28 11:13:15 +0200700 return appctx;
701
702 /* Error unrolling */
Emeric Brun494c5052020-05-28 11:13:15 +0200703 out_free_appctx:
Christopher Fauletaee57fc2022-05-12 15:34:48 +0200704 appctx_free_on_early_error(appctx);
Emeric Brun494c5052020-05-28 11:13:15 +0200705 out_close:
706 return NULL;
707}
708
709/*
710 * Task to handle connctions to forward servers
711 */
Willy Tarreau144f84a2021-03-02 16:09:26 +0100712static struct task *process_sink_forward(struct task * task, void *context, unsigned int state)
Emeric Brun494c5052020-05-28 11:13:15 +0200713{
714 struct sink *sink = (struct sink *)context;
715 struct sink_forward_target *sft = sink->sft;
716
717 task->expire = TICK_ETERNITY;
718
719 if (!stopping) {
720 while (sft) {
721 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
722 /* if appctx is NULL, start a new session */
723 if (!sft->appctx)
724 sft->appctx = sink_forward_session_create(sink, sft);
725 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
726 sft = sft->next;
727 }
728 }
729 else {
730 while (sft) {
731 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
732 /* awake applet to perform a clean close */
733 if (sft->appctx)
734 appctx_wakeup(sft->appctx);
735 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
736 sft = sft->next;
737 }
738 }
739
740 return task;
741}
742/*
743 * Init task to manage connctions to forward servers
744 *
745 * returns 0 in case of error.
746 */
747int sink_init_forward(struct sink *sink)
748{
Willy Tarreaubeeabf52021-10-01 18:23:30 +0200749 sink->forward_task = task_new_anywhere();
Emeric Brun494c5052020-05-28 11:13:15 +0200750 if (!sink->forward_task)
751 return 0;
752
753 sink->forward_task->process = process_sink_forward;
754 sink->forward_task->context = (void *)sink;
755 sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
756 task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
757 return 1;
758}
Willy Tarreau32872db2022-08-31 18:52:17 +0200759
760/* This tries to rotate a file-backed ring, but only if it contains contents.
761 * This way empty rings will not cause backups to be overwritten and it's safe
762 * to reload multiple times. That's only best effort, failures are silently
763 * ignored.
764 */
765void sink_rotate_file_backed_ring(const char *name)
766{
767 struct ring ring;
768 char *oldback;
769 int ret;
770 int fd;
771
772 fd = open(name, O_RDONLY);
773 if (fd < 0)
774 return;
775
776 /* check for contents validity */
777 ret = read(fd, &ring, sizeof(ring));
778 close(fd);
779
780 if (ret != sizeof(ring))
781 goto rotate;
782
783 /* contents are present, we want to keep them => rotate. Note that
784 * an empty ring buffer has one byte (the marker).
785 */
786 if (ring.buf.data > 1)
787 goto rotate;
788
789 /* nothing to keep, let's scratch the file and preserve the backup */
790 return;
791
792 rotate:
793 oldback = NULL;
794 memprintf(&oldback, "%s.bak", name);
795 if (oldback) {
796 /* try to rename any possibly existing ring file to
797 * ".bak" and delete remains of older ones. This will
798 * ensure we don't wipe useful debug info upon restart.
799 */
800 unlink(oldback);
801 if (rename(name, oldback) < 0)
802 unlink(oldback);
803 ha_free(&oldback);
804 }
805}
806
Emeric Brun99c453d2020-05-25 15:01:04 +0200807/*
808 * Parse "ring" section and create corresponding sink buffer.
809 *
810 * The function returns 0 in success case, otherwise, it returns error
811 * flags.
812 */
813int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
814{
815 int err_code = 0;
816 const char *inv;
817 size_t size = BUFSIZE;
Emeric Brun494c5052020-05-28 11:13:15 +0200818 struct proxy *p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200819
Willy Tarreau18d13062022-08-11 16:12:11 +0200820 if (strcmp(args[0], "ring") == 0) { /* new ring section */
Emeric Brun99c453d2020-05-25 15:01:04 +0200821 if (!*args[1]) {
822 ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum);
823 err_code |= ERR_ALERT | ERR_FATAL;
824 goto err;
825 }
826
827 inv = invalid_char(args[1]);
828 if (inv) {
829 ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv);
830 err_code |= ERR_ALERT | ERR_FATAL;
831 goto err;
832 }
833
834 if (sink_find(args[1])) {
835 ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]);
836 err_code |= ERR_ALERT | ERR_FATAL;
837 goto err;
838 }
839
Emeric Brun54648852020-07-06 15:54:06 +0200840 cfg_sink = sink_new_buf(args[1], args[1], LOG_FORMAT_RAW, size);
Emeric Brun99c453d2020-05-25 15:01:04 +0200841 if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) {
842 ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]);
843 err_code |= ERR_ALERT | ERR_FATAL;
844 goto err;
845 }
Emeric Brun494c5052020-05-28 11:13:15 +0200846
847 /* allocate new proxy to handle forwards */
848 p = calloc(1, sizeof *p);
849 if (!p) {
850 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
851 err_code |= ERR_ALERT | ERR_FATAL;
852 goto err;
853 }
854
855 init_new_proxy(p);
856 sink_setup_proxy(p);
857 p->parent = cfg_sink;
858 p->id = strdup(args[1]);
859 p->conf.args.file = p->conf.file = strdup(file);
860 p->conf.args.line = p->conf.line = linenum;
861 cfg_sink->forward_px = p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200862 }
863 else if (strcmp(args[0], "size") == 0) {
Willy Tarreau18d13062022-08-11 16:12:11 +0200864 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) {
865 ha_alert("parsing [%s:%d] : 'size' directive not usable with this type of sink.\n", file, linenum);
866 err_code |= ERR_ALERT | ERR_FATAL;
867 goto err;
868 }
869
Emeric Brun99c453d2020-05-25 15:01:04 +0200870 size = atol(args[1]);
871 if (!size) {
872 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
873 err_code |= ERR_ALERT | ERR_FATAL;
874 goto err;
875 }
876
Willy Tarreau0b8e9ce2022-08-11 16:38:20 +0200877 if (cfg_sink->store) {
878 ha_alert("parsing [%s:%d] : cannot resize an already mapped file, please specify 'size' before 'backing-file'.\n", file, linenum);
879 err_code |= ERR_ALERT | ERR_FATAL;
880 goto err;
881 }
882
Willy Tarreau18d13062022-08-11 16:12:11 +0200883 if (size < cfg_sink->ctx.ring->buf.size) {
884 ha_warning("parsing [%s:%d] : ignoring new size '%llu' that is smaller than current size '%llu' for ring '%s'.\n",
885 file, linenum, (ullong)size, (ullong)cfg_sink->ctx.ring->buf.size, cfg_sink->name);
886 err_code |= ERR_ALERT | ERR_FATAL;
887 goto err;
888 }
889
890 if (!ring_resize(cfg_sink->ctx.ring, size)) {
891 ha_alert("parsing [%s:%d] : fail to set sink buffer size '%llu' for ring '%s'.\n", file, linenum,
892 (ullong)cfg_sink->ctx.ring->buf.size, cfg_sink->name);
Emeric Brun99c453d2020-05-25 15:01:04 +0200893 err_code |= ERR_ALERT | ERR_FATAL;
894 goto err;
895 }
Willy Tarreau0b8e9ce2022-08-11 16:38:20 +0200896 }
897 else if (strcmp(args[0], "backing-file") == 0) {
898 /* This tries to mmap file <file> for size <size> and to use it as a backing store
899 * for ring <ring>. Existing data are delete. NULL is returned on error.
900 */
901 const char *backing = args[1];
902 size_t size;
903 void *area;
904 int fd;
905
906 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) {
907 ha_alert("parsing [%s:%d] : 'backing-file' only usable with existing rings.\n", file, linenum);
908 err_code |= ERR_ALERT | ERR_FATAL;
909 goto err;
910 }
911
912 if (cfg_sink->store) {
913 ha_alert("parsing [%s:%d] : 'backing-file' already specified for ring '%s' (was '%s').\n", file, linenum, cfg_sink->name, cfg_sink->store);
914 err_code |= ERR_ALERT | ERR_FATAL;
915 goto err;
916 }
917
Willy Tarreau32872db2022-08-31 18:52:17 +0200918 /* let's check if the file exists and is not empty. That's the
919 * only condition under which we'll trigger a rotate, so that
920 * config checks, reloads, or restarts that don't emit anything
921 * do not rotate it again.
922 */
923 sink_rotate_file_backed_ring(backing);
Willy Tarreauded77cc2022-08-12 15:38:20 +0200924
Willy Tarreau8e877052022-08-12 15:03:12 +0200925 fd = open(backing, O_RDWR | O_CREAT, 0600);
Willy Tarreau0b8e9ce2022-08-11 16:38:20 +0200926 if (fd < 0) {
927 ha_alert("parsing [%s:%d] : cannot open backing-file '%s' for ring '%s': %s.\n", file, linenum, backing, cfg_sink->name, strerror(errno));
928 err_code |= ERR_ALERT | ERR_FATAL;
929 goto err;
930 }
931
932 size = (cfg_sink->ctx.ring->buf.size + 4095UL) & -4096UL;
933 if (ftruncate(fd, size) != 0) {
934 close(fd);
935 ha_alert("parsing [%s:%d] : could not adjust size of backing-file for ring '%s': %s.\n", file, linenum, cfg_sink->name, strerror(errno));
936 err_code |= ERR_ALERT | ERR_FATAL;
937 goto err;
938 }
939
940 area = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
941 if (area == MAP_FAILED) {
942 close(fd);
943 ha_alert("parsing [%s:%d] : failed to use '%s' as a backing file for ring '%s': %s.\n", file, linenum, backing, cfg_sink->name, strerror(errno));
944 err_code |= ERR_ALERT | ERR_FATAL;
945 goto err;
946 }
947
948 /* we don't need the file anymore */
949 close(fd);
950 cfg_sink->store = strdup(backing);
951
952 /* never fails */
953 ring_free(cfg_sink->ctx.ring);
954 cfg_sink->ctx.ring = ring_make_from_area(area, size);
Emeric Brun99c453d2020-05-25 15:01:04 +0200955 }
Emeric Brun494c5052020-05-28 11:13:15 +0200956 else if (strcmp(args[0],"server") == 0) {
Willy Tarreau1b662aa2022-11-16 18:56:34 +0100957 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) {
958 ha_alert("parsing [%s:%d] : unable to create server '%s'.\n", file, linenum, args[1]);
959 err_code |= ERR_ALERT | ERR_FATAL;
960 goto err;
961 }
962
Amaury Denoyelle30c05372021-03-08 16:36:46 +0100963 err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL,
964 SRV_PARSE_PARSE_ADDR|SRV_PARSE_INITIAL_RESOLVE);
Emeric Brun494c5052020-05-28 11:13:15 +0200965 }
966 else if (strcmp(args[0],"timeout") == 0) {
967 if (!cfg_sink || !cfg_sink->forward_px) {
968 ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
969 err_code |= ERR_ALERT | ERR_FATAL;
970 goto err;
971 }
972
973 if (strcmp(args[1], "connect") == 0 ||
974 strcmp(args[1], "server") == 0) {
975 const char *res;
976 unsigned int tout;
977
978 if (!*args[2]) {
979 ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
980 file, linenum, args[0], args[1]);
981 err_code |= ERR_ALERT | ERR_FATAL;
982 goto err;
983 }
984 res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
985 if (res == PARSE_TIME_OVER) {
986 ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
987 file, linenum, args[2], args[0], args[1]);
988 err_code |= ERR_ALERT | ERR_FATAL;
989 goto err;
990 }
991 else if (res == PARSE_TIME_UNDER) {
992 ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
993 file, linenum, args[2], args[0], args[1]);
994 err_code |= ERR_ALERT | ERR_FATAL;
995 goto err;
996 }
997 else if (res) {
998 ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
999 file, linenum, *res, args[0], args[1]);
1000 err_code |= ERR_ALERT | ERR_FATAL;
1001 goto err;
1002 }
Christopher Faulet321d1002022-10-19 16:26:21 +02001003 if (args[1][0] == 'c')
Emeric Brun494c5052020-05-28 11:13:15 +02001004 cfg_sink->forward_px->timeout.connect = tout;
1005 else
1006 cfg_sink->forward_px->timeout.server = tout;
1007 }
1008 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001009 else if (strcmp(args[0],"format") == 0) {
1010 if (!cfg_sink) {
1011 ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
1012 err_code |= ERR_ALERT | ERR_FATAL;
1013 goto err;
1014 }
1015
Emeric Brun54648852020-07-06 15:54:06 +02001016 cfg_sink->fmt = get_log_format(args[1]);
1017 if (cfg_sink->fmt == LOG_FORMAT_UNSPEC) {
Emeric Brun99c453d2020-05-25 15:01:04 +02001018 ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]);
1019 err_code |= ERR_ALERT | ERR_FATAL;
1020 goto err;
1021 }
1022 }
1023 else if (strcmp(args[0],"maxlen") == 0) {
1024 if (!cfg_sink) {
1025 ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]);
1026 err_code |= ERR_ALERT | ERR_FATAL;
1027 goto err;
1028 }
1029
1030 cfg_sink->maxlen = atol(args[1]);
1031 if (!cfg_sink->maxlen) {
1032 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
1033 err_code |= ERR_ALERT | ERR_FATAL;
1034 goto err;
1035 }
1036 }
1037 else if (strcmp(args[0],"description") == 0) {
1038 if (!cfg_sink) {
1039 ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]);
1040 err_code |= ERR_ALERT | ERR_FATAL;
1041 goto err;
1042 }
1043
1044 if (!*args[1]) {
1045 ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum);
1046 err_code |= ERR_ALERT | ERR_FATAL;
1047 goto err;
1048 }
1049
1050 free(cfg_sink->desc);
1051
1052 cfg_sink->desc = strdup(args[1]);
1053 if (!cfg_sink->desc) {
1054 ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]);
1055 err_code |= ERR_ALERT | ERR_FATAL;
1056 goto err;
1057 }
1058 }
Emeric Brun9f2ff3a2020-05-29 15:47:52 +02001059 else {
1060 ha_alert("parsing [%s:%d] : unknown statement '%s'.\n", file, linenum, args[0]);
1061 err_code |= ERR_ALERT | ERR_FATAL;
1062 goto err;
1063 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001064
1065err:
1066 return err_code;
1067}
1068
Emeric Brun94aab062021-04-02 10:41:36 +02001069/* Creates an new sink buffer from a log server.
1070 *
1071 * It uses the logsrvaddress to declare a forward
1072 * server for this buffer. And it initializes the
1073 * forwarding.
1074 *
1075 * The function returns a pointer on the
1076 * allocated struct sink if allocate
1077 * and initialize succeed, else if it fails
1078 * it returns NULL.
1079 *
1080 * Note: the sink is created using the name
Ilya Shipitsinb2be9a12021-04-24 13:25:42 +05001081 * specified into logsrv->ring_name
Emeric Brun94aab062021-04-02 10:41:36 +02001082 */
1083struct sink *sink_new_from_logsrv(struct logsrv *logsrv)
1084{
1085 struct proxy *p = NULL;
1086 struct sink *sink = NULL;
1087 struct server *srv = NULL;
1088 struct sink_forward_target *sft = NULL;
Emeric Brun94aab062021-04-02 10:41:36 +02001089
1090 /* allocate new proxy to handle
1091 * forward to a stream server
1092 */
1093 p = calloc(1, sizeof *p);
1094 if (!p) {
1095 goto error;
1096 }
1097
1098 init_new_proxy(p);
1099 sink_setup_proxy(p);
1100 p->id = strdup(logsrv->ring_name);
1101 p->conf.args.file = p->conf.file = strdup(logsrv->conf.file);
1102 p->conf.args.line = p->conf.line = logsrv->conf.line;
1103
Christopher Fauletd08a25b2022-10-24 15:53:01 +02001104 /* Set default connect and server timeout */
1105 p->timeout.connect = MS_TO_TICKS(1000);
1106 p->timeout.server = MS_TO_TICKS(5000);
1107
Emeric Brun94aab062021-04-02 10:41:36 +02001108 /* allocate a new server to forward messages
1109 * from ring buffer
1110 */
1111 srv = new_server(p);
1112 if (!srv)
1113 goto error;
1114
1115 /* init server */
1116 srv->id = strdup(logsrv->ring_name);
1117 srv->conf.file = strdup(logsrv->conf.file);
1118 srv->conf.line = logsrv->conf.line;
1119 srv->addr = logsrv->addr;
1120 srv->svc_port = get_host_port(&logsrv->addr);
1121 HA_SPIN_INIT(&srv->lock);
1122
1123 /* process per thread init */
Miroslav Zagorac8a8f2702021-06-15 15:33:20 +02001124 if (srv_init_per_thr(srv) == -1)
Emeric Brun94aab062021-04-02 10:41:36 +02001125 goto error;
1126
Emeric Brun94aab062021-04-02 10:41:36 +02001127 /* the servers are linked backwards
1128 * first into proxy
1129 */
1130 p->srv = srv;
1131 srv->next = p->srv;
1132
1133 /* allocate sink_forward_target descriptor */
1134 sft = calloc(1, sizeof(*sft));
1135 if (!sft)
1136 goto error;
1137
1138 /* init sink_forward_target offset */
1139 sft->srv = srv;
1140 sft->appctx = NULL;
1141 sft->ofs = ~0;
1142 HA_SPIN_INIT(&sft->lock);
1143
Ilya Shipitsinb2be9a12021-04-24 13:25:42 +05001144 /* prepare description for the sink */
Emeric Brun94aab062021-04-02 10:41:36 +02001145 chunk_reset(&trash);
1146 chunk_printf(&trash, "created from logserver declared into '%s' at line %d", logsrv->conf.file, logsrv->conf.line);
1147
1148 /* allocate a new sink buffer */
1149 sink = sink_new_buf(logsrv->ring_name, trash.area, logsrv->format, BUFSIZE);
1150 if (!sink || sink->type != SINK_TYPE_BUFFER) {
1151 goto error;
1152 }
1153
1154 /* link sink_forward_target to proxy */
1155 sink->forward_px = p;
1156 p->parent = sink;
1157
1158 /* insert into sink_forward_targets
1159 * list into sink
1160 */
Christopher Faulet2ae25ea2022-05-12 14:50:09 +02001161 sft->sink = sink;
Emeric Brun94aab062021-04-02 10:41:36 +02001162 sft->next = sink->sft;
1163 sink->sft = sft;
1164
1165 /* mark server as an attached reader to the ring */
1166 if (!ring_attach(sink->ctx.ring)) {
1167 /* should never fail since there is
1168 * only one reader
1169 */
1170 goto error;
1171 }
1172
1173 /* initialize sink buffer forwarding */
1174 if (!sink_init_forward(sink))
1175 goto error;
1176
1177 /* reset familyt of logsrv to consider the ring buffer target */
1178 logsrv->addr.ss_family = AF_UNSPEC;
1179
1180 return sink;
1181error:
1182 if (p) {
1183 if (p->id)
1184 free(p->id);
1185 if (p->conf.file)
1186 free(p->conf.file);
1187
1188 free(p);
1189 }
1190
1191 if (srv) {
1192 if (srv->id)
1193 free(srv->id);
1194 if (srv->conf.file)
1195 free((void *)srv->conf.file);
1196 if (srv->per_thr)
1197 free(srv->per_thr);
1198 free(srv);
1199 }
1200
1201 if (sft)
1202 free(sft);
1203
1204 if (sink) {
1205 if (sink->ctx.ring)
1206 ring_free(sink->ctx.ring);
1207
Willy Tarreau2b718102021-04-21 07:32:39 +02001208 LIST_DELETE(&sink->sink_list);
Emeric Brun94aab062021-04-02 10:41:36 +02001209 free(sink->name);
1210 free(sink->desc);
1211 free(sink);
1212 }
1213
1214 return NULL;
1215}
1216
Emeric Brun99c453d2020-05-25 15:01:04 +02001217/*
1218 * Post parsing "ring" section.
1219 *
1220 * The function returns 0 in success case, otherwise, it returns error
1221 * flags.
1222 */
1223int cfg_post_parse_ring()
1224{
1225 int err_code = 0;
Emeric Brun494c5052020-05-28 11:13:15 +02001226 struct server *srv;
Emeric Brun99c453d2020-05-25 15:01:04 +02001227
1228 if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
1229 if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
1230 ha_warning("ring '%s' event max length '%u' exceeds size, forced to size '%lu'.\n",
Willy Tarreau570a22b2020-06-02 12:00:46 +02001231 cfg_sink->name, cfg_sink->maxlen, (unsigned long)b_size(&cfg_sink->ctx.ring->buf));
Emeric Brun99c453d2020-05-25 15:01:04 +02001232 cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
1233 err_code |= ERR_ALERT;
1234 }
Emeric Brun494c5052020-05-28 11:13:15 +02001235
1236 /* prepare forward server descriptors */
1237 if (cfg_sink->forward_px) {
1238 srv = cfg_sink->forward_px->srv;
1239 while (srv) {
1240 struct sink_forward_target *sft;
Emeric Brun99c453d2020-05-25 15:01:04 +02001241
Emeric Brun494c5052020-05-28 11:13:15 +02001242 /* allocate sink_forward_target descriptor */
1243 sft = calloc(1, sizeof(*sft));
1244 if (!sft) {
1245 ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
1246 err_code |= ERR_ALERT | ERR_FATAL;
1247 break;
1248 }
1249 sft->srv = srv;
1250 sft->appctx = NULL;
1251 sft->ofs = ~0; /* init ring offset */
Christopher Faulet96417f32022-08-04 16:00:13 +02001252 sft->sink = cfg_sink;
Emeric Brun494c5052020-05-28 11:13:15 +02001253 sft->next = cfg_sink->sft;
1254 HA_SPIN_INIT(&sft->lock);
1255
1256 /* mark server attached to the ring */
1257 if (!ring_attach(cfg_sink->ctx.ring)) {
1258 ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
1259 err_code |= ERR_ALERT | ERR_FATAL;
1260 }
1261 cfg_sink->sft = sft;
1262 srv = srv->next;
1263 }
1264 sink_init_forward(cfg_sink);
1265 }
1266 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001267 cfg_sink = NULL;
1268
1269 return err_code;
1270}
1271
1272/* resolve sink names at end of config. Returns 0 on success otherwise error
1273 * flags.
1274*/
1275int post_sink_resolve()
1276{
Christopher Fauletfc633b62020-11-06 15:24:23 +01001277 int err_code = ERR_NONE;
Emeric Brun99c453d2020-05-25 15:01:04 +02001278 struct logsrv *logsrv, *logb;
1279 struct sink *sink;
1280 struct proxy *px;
1281
1282 list_for_each_entry_safe(logsrv, logb, &global.logsrvs, list) {
1283 if (logsrv->type == LOG_TARGET_BUFFER) {
1284 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001285 if (!sink) {
1286 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1287 * means we must allocate a sink
1288 * buffer to send messages to this logsrv
1289 */
1290 if (logsrv->addr.ss_family != AF_UNSPEC) {
1291 sink = sink_new_from_logsrv(logsrv);
1292 if (!sink) {
1293 ha_alert("global stream log server declared in file '%s' at line %d cannot be initialized'.\n",
1294 logsrv->conf.file, logsrv->conf.line);
1295 err_code |= ERR_ALERT | ERR_FATAL;
1296 }
1297 }
1298 else {
1299 ha_alert("global log server declared in file '%s' at line %d uses unknown ring named '%s'.\n",
1300 logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1301 err_code |= ERR_ALERT | ERR_FATAL;
1302 }
1303 }
1304 else if (sink->type != SINK_TYPE_BUFFER) {
1305 ha_alert("global log server declared in file '%s' at line %d uses incompatible ring '%s'.\n",
1306 logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +02001307 err_code |= ERR_ALERT | ERR_FATAL;
1308 }
1309 logsrv->sink = sink;
1310 }
Emeric Brun94aab062021-04-02 10:41:36 +02001311
Emeric Brun99c453d2020-05-25 15:01:04 +02001312 }
1313
1314 for (px = proxies_list; px; px = px->next) {
1315 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1316 if (logsrv->type == LOG_TARGET_BUFFER) {
1317 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001318 if (!sink) {
1319 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1320 * means we must allocate a sink
1321 * buffer to send messages to this logsrv
1322 */
1323 if (logsrv->addr.ss_family != AF_UNSPEC) {
1324 sink = sink_new_from_logsrv(logsrv);
1325 if (!sink) {
1326 ha_alert("log server declared in proxy section '%s' file '%s' at line %d cannot be initialized'.\n",
1327 px->id, logsrv->conf.file, logsrv->conf.line);
1328 err_code |= ERR_ALERT | ERR_FATAL;
1329 }
1330 }
1331 else {
1332 ha_alert("log server declared in proxy section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1333 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1334 err_code |= ERR_ALERT | ERR_FATAL;
1335 }
1336 }
1337 else if (sink->type != SINK_TYPE_BUFFER) {
1338 ha_alert("log server declared in proxy section '%s' in file '%s' at line %d uses incomatible ring named '%s'.\n",
1339 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +02001340 err_code |= ERR_ALERT | ERR_FATAL;
1341 }
1342 logsrv->sink = sink;
1343 }
1344 }
1345 }
Emeric Brun12941c82020-07-07 14:19:42 +02001346
1347 for (px = cfg_log_forward; px; px = px->next) {
1348 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1349 if (logsrv->type == LOG_TARGET_BUFFER) {
1350 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001351 if (!sink) {
1352 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1353 * means we must allocate a sink
1354 * buffer to send messages to this logsrv
1355 */
1356 if (logsrv->addr.ss_family != AF_UNSPEC) {
1357 sink = sink_new_from_logsrv(logsrv);
1358 if (!sink) {
1359 ha_alert("log server declared in log-forward section '%s' file '%s' at line %d cannot be initialized'.\n",
1360 px->id, logsrv->conf.file, logsrv->conf.line);
1361 err_code |= ERR_ALERT | ERR_FATAL;
1362 }
1363 }
1364 else {
1365 ha_alert("log server declared in log-forward section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1366 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1367 err_code |= ERR_ALERT | ERR_FATAL;
1368 }
1369 }
1370 else if (sink->type != SINK_TYPE_BUFFER) {
1371 ha_alert("log server declared in log-forward section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1372 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun12941c82020-07-07 14:19:42 +02001373 err_code |= ERR_ALERT | ERR_FATAL;
1374 }
1375 logsrv->sink = sink;
1376 }
1377 }
1378 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001379 return err_code;
1380}
1381
1382
Willy Tarreau973e6622019-08-20 11:57:52 +02001383static void sink_init()
1384{
Emeric Brun54648852020-07-06 15:54:06 +02001385 sink_new_fd("stdout", "standard output (fd#1)", LOG_FORMAT_RAW, 1);
1386 sink_new_fd("stderr", "standard output (fd#2)", LOG_FORMAT_RAW, 2);
1387 sink_new_buf("buf0", "in-memory ring buffer", LOG_FORMAT_TIMED, 1048576);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001388}
1389
1390static void sink_deinit()
1391{
1392 struct sink *sink, *sb;
1393
1394 list_for_each_entry_safe(sink, sb, &sink_list, sink_list) {
Willy Tarreau0b8e9ce2022-08-11 16:38:20 +02001395 if (sink->type == SINK_TYPE_BUFFER) {
1396 if (sink->store)
1397 munmap(sink->ctx.ring->buf.area, sink->ctx.ring->buf.size);
1398 else
1399 ring_free(sink->ctx.ring);
1400 }
Willy Tarreau2b718102021-04-21 07:32:39 +02001401 LIST_DELETE(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +02001402 free(sink->name);
1403 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001404 free(sink);
1405 }
Willy Tarreau973e6622019-08-20 11:57:52 +02001406}
1407
1408INITCALL0(STG_REGISTER, sink_init);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001409REGISTER_POST_DEINIT(sink_deinit);
Willy Tarreau973e6622019-08-20 11:57:52 +02001410
Willy Tarreau9f830d72019-08-26 18:17:04 +02001411static struct cli_kw_list cli_kws = {{ },{
Willy Tarreaub205bfd2021-05-07 11:38:37 +02001412 { { "show", "events", NULL }, "show events [<sink>] [-w] [-n] : show event sink state", cli_parse_show_events, NULL, NULL },
Willy Tarreau9f830d72019-08-26 18:17:04 +02001413 {{},}
1414}};
1415
1416INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);
1417
Emeric Brun99c453d2020-05-25 15:01:04 +02001418/* config parsers for this section */
1419REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring);
1420REGISTER_POST_CHECK(post_sink_resolve);
1421
Willy Tarreau67b5a162019-08-11 16:38:56 +02001422/*
1423 * Local variables:
1424 * c-indent-level: 8
1425 * c-basic-offset: 8
1426 * End:
1427 */