blob: 2eb050a16dabaf277b7d0bfc7915e8fe7025ae45 [file] [log] [blame]
Willy Tarreau67b5a162019-08-11 16:38:56 +02001/*
2 * Event sink management
3 *
4 * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation, version 2.1
9 * exclusively.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
Willy Tarreau0b8e9ce2022-08-11 16:38:20 +020021#include <sys/mman.h>
22#include <errno.h>
23#include <fcntl.h>
24
Willy Tarreau36979d92020-06-05 17:27:29 +020025#include <import/ist.h>
Willy Tarreau4c7e4b72020-05-27 12:58:42 +020026#include <haproxy/api.h>
Christopher Faulet6b0a0fb2022-04-04 11:29:28 +020027#include <haproxy/applet.h>
Willy Tarreau6be78492020-06-05 00:00:29 +020028#include <haproxy/cfgparse.h>
Willy Tarreau83487a82020-06-04 20:19:54 +020029#include <haproxy/cli.h>
Willy Tarreau36979d92020-06-05 17:27:29 +020030#include <haproxy/errors.h>
Willy Tarreau853b2972020-05-27 18:01:47 +020031#include <haproxy/list.h>
Willy Tarreauaeed4a82020-06-04 22:01:04 +020032#include <haproxy/log.h>
Willy Tarreau817538e2021-05-08 20:20:21 +020033#include <haproxy/proxy.h>
Willy Tarreaud2ad57c2020-06-03 19:43:35 +020034#include <haproxy/ring.h>
Willy Tarreau5edca2f2022-05-27 09:25:10 +020035#include <haproxy/sc_strm.h>
Willy Tarreau3727a8a2020-06-04 17:37:26 +020036#include <haproxy/signal.h>
Willy Tarreauba2f73d2020-06-03 20:02:28 +020037#include <haproxy/sink.h>
Willy Tarreaucb086c62022-05-27 09:47:12 +020038#include <haproxy/stconn.h>
Willy Tarreaub2551052020-06-09 09:07:15 +020039#include <haproxy/time.h>
Willy Tarreau4bad5e22021-05-08 13:05:30 +020040#include <haproxy/tools.h>
Willy Tarreau67b5a162019-08-11 16:38:56 +020041
/* Global list of all known sinks. Sinks are linked through their
 * <sink_list> member; entries are appended by __sink_new() and looked up by
 * name by sink_find().
 */
struct list sink_list = LIST_HEAD_INIT(sink_list);

/* NOTE(review): not referenced by the functions visible here; presumably the
 * sink being configured by the "ring" section parser -- confirm against
 * cfg_parse_ring().
 */
struct sink *cfg_sink;
45
Willy Tarreau67b5a162019-08-11 16:38:56 +020046struct sink *sink_find(const char *name)
47{
48 struct sink *sink;
49
50 list_for_each_entry(sink, &sink_list, sink_list)
51 if (strcmp(sink->name, name) == 0)
52 return sink;
53 return NULL;
54}
55
56/* creates a new sink and adds it to the list, it's still generic and not fully
57 * initialized. Returns NULL on allocation failure. If another one already
58 * exists with the same name, it will be returned. The caller can detect it as
59 * a newly created one has type SINK_TYPE_NEW.
60 */
Emeric Brun54648852020-07-06 15:54:06 +020061static struct sink *__sink_new(const char *name, const char *desc, int fmt)
Willy Tarreau67b5a162019-08-11 16:38:56 +020062{
63 struct sink *sink;
64
65 sink = sink_find(name);
66 if (sink)
67 goto end;
68
Emeric Brun494c5052020-05-28 11:13:15 +020069 sink = calloc(1, sizeof(*sink));
Willy Tarreau67b5a162019-08-11 16:38:56 +020070 if (!sink)
71 goto end;
72
Emeric Brun99c453d2020-05-25 15:01:04 +020073 sink->name = strdup(name);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010074 if (!sink->name)
75 goto err;
76
Emeric Brun99c453d2020-05-25 15:01:04 +020077 sink->desc = strdup(desc);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010078 if (!sink->desc)
79 goto err;
80
Willy Tarreau67b5a162019-08-11 16:38:56 +020081 sink->fmt = fmt;
82 sink->type = SINK_TYPE_NEW;
Christopher Fauleta63a5c22019-11-15 15:10:12 +010083 sink->maxlen = BUFSIZE;
Willy Tarreau67b5a162019-08-11 16:38:56 +020084 /* address will be filled by the caller if needed */
Willy Tarreau973e6622019-08-20 11:57:52 +020085 sink->ctx.fd = -1;
Willy Tarreau67b5a162019-08-11 16:38:56 +020086 sink->ctx.dropped = 0;
87 HA_RWLOCK_INIT(&sink->ctx.lock);
Willy Tarreau2b718102021-04-21 07:32:39 +020088 LIST_APPEND(&sink_list, &sink->sink_list);
Willy Tarreau67b5a162019-08-11 16:38:56 +020089 end:
90 return sink;
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010091
92 err:
Willy Tarreau61cfdf42021-02-20 10:46:51 +010093 ha_free(&sink->name);
94 ha_free(&sink->desc);
95 ha_free(&sink);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010096
97 return NULL;
Willy Tarreau67b5a162019-08-11 16:38:56 +020098}
99
Willy Tarreau973e6622019-08-20 11:57:52 +0200100/* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>,
101 * and description <desc>. Returns NULL on allocation failure or conflict.
102 * Perfect duplicates are merged (same type, fd, and name).
103 */
Emeric Brun54648852020-07-06 15:54:06 +0200104struct sink *sink_new_fd(const char *name, const char *desc, enum log_fmt fmt, int fd)
Willy Tarreau973e6622019-08-20 11:57:52 +0200105{
106 struct sink *sink;
107
108 sink = __sink_new(name, desc, fmt);
109 if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd))
110 goto end;
111
112 if (sink->type != SINK_TYPE_NEW) {
113 sink = NULL;
114 goto end;
115 }
116
117 sink->type = SINK_TYPE_FD;
118 sink->ctx.fd = fd;
119 end:
120 return sink;
121}
122
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200123/* creates a sink called <name> of type BUF of size <size>, format <fmt>,
124 * and description <desc>. Returns NULL on allocation failure or conflict.
125 * Perfect duplicates are merged (same type and name). If sizes differ, the
126 * largest one is kept.
127 */
Emeric Brun54648852020-07-06 15:54:06 +0200128struct sink *sink_new_buf(const char *name, const char *desc, enum log_fmt fmt, size_t size)
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200129{
130 struct sink *sink;
131
132 sink = __sink_new(name, desc, fmt);
133 if (!sink)
134 goto fail;
135
136 if (sink->type == SINK_TYPE_BUFFER) {
137 /* such a buffer already exists, we may have to resize it */
138 if (!ring_resize(sink->ctx.ring, size))
139 goto fail;
140 goto end;
141 }
142
143 if (sink->type != SINK_TYPE_NEW) {
144 /* already exists of another type */
145 goto fail;
146 }
147
148 sink->ctx.ring = ring_new(size);
149 if (!sink->ctx.ring) {
Willy Tarreau2b718102021-04-21 07:32:39 +0200150 LIST_DELETE(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +0200151 free(sink->name);
152 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200153 free(sink);
154 goto fail;
155 }
156
157 sink->type = SINK_TYPE_BUFFER;
158 end:
159 return sink;
160 fail:
161 return NULL;
162}
163
Willy Tarreau67b5a162019-08-11 16:38:56 +0200164/* tries to send <nmsg> message parts (up to 8, ignored above) from message
Ilya Shipitsin46a030c2020-07-05 16:36:08 +0500165 * array <msg> to sink <sink>. Formatting according to the sink's preference is
Willy Tarreau8f240232019-08-27 16:41:06 +0200166 * done here. Lost messages are NOT accounted for. It is preferable to call
167 * sink_write() instead which will also try to emit the number of dropped
168 * messages when there are any. It returns >0 if it could write anything,
169 * <=0 otherwise.
Willy Tarreau67b5a162019-08-11 16:38:56 +0200170 */
Emeric Brun54648852020-07-06 15:54:06 +0200171 ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg,
172 int level, int facility, struct ist *metadata)
173 {
174 struct ist *pfx = NULL;
Willy Tarreaua1426de2019-08-27 14:21:02 +0200175 size_t npfx = 0;
Emeric Brunbd163812020-05-06 14:33:46 +0200176
Emeric Brun54648852020-07-06 15:54:06 +0200177 if (sink->fmt == LOG_FORMAT_RAW)
Emeric Brunbd163812020-05-06 14:33:46 +0200178 goto send;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200179
Emeric Brun54648852020-07-06 15:54:06 +0200180 pfx = build_log_header(sink->fmt, level, facility, metadata, &npfx);
Emeric Brunbd163812020-05-06 14:33:46 +0200181
182send:
Willy Tarreau973e6622019-08-20 11:57:52 +0200183 if (sink->type == SINK_TYPE_FD) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200184 return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200185 }
186 else if (sink->type == SINK_TYPE_BUFFER) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200187 return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
Willy Tarreau973e6622019-08-20 11:57:52 +0200188 }
Willy Tarreau8f240232019-08-27 16:41:06 +0200189 return 0;
190}
Willy Tarreau67b5a162019-08-11 16:38:56 +0200191
Willy Tarreau8f240232019-08-27 16:41:06 +0200192/* Tries to emit a message indicating the number of dropped events. In case of
193 * success, the amount of drops is reduced by as much. It's supposed to be
194 * called under an exclusive lock on the sink to avoid multiple produces doing
195 * the same. On success, >0 is returned, otherwise <=0 on failure.
196 */
Emeric Brun54648852020-07-06 15:54:06 +0200197int sink_announce_dropped(struct sink *sink, int facility)
Willy Tarreau8f240232019-08-27 16:41:06 +0200198{
Emeric Brun54648852020-07-06 15:54:06 +0200199 static THREAD_LOCAL struct ist metadata[LOG_META_FIELDS];
200 static THREAD_LOCAL pid_t curr_pid;
201 static THREAD_LOCAL char pidstr[16];
Willy Tarreau8f240232019-08-27 16:41:06 +0200202 unsigned int dropped;
203 struct buffer msg;
204 struct ist msgvec[1];
205 char logbuf[64];
206
207 while (unlikely((dropped = sink->ctx.dropped) > 0)) {
208 chunk_init(&msg, logbuf, sizeof(logbuf));
209 chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
210 msgvec[0] = ist2(msg.area, msg.data);
Emeric Brunbd163812020-05-06 14:33:46 +0200211
Emeric Brun54648852020-07-06 15:54:06 +0200212 if (!metadata[LOG_META_HOST].len) {
213 if (global.log_send_hostname)
Tim Duesterhus77508502022-03-15 13:11:06 +0100214 metadata[LOG_META_HOST] = ist(global.log_send_hostname);
Emeric Brun54648852020-07-06 15:54:06 +0200215 }
216
217 if (!metadata[LOG_META_TAG].len)
218 metadata[LOG_META_TAG] = ist2(global.log_tag.area, global.log_tag.data);
219
220 if (unlikely(curr_pid != getpid()))
221 metadata[LOG_META_PID].len = 0;
222
223 if (!metadata[LOG_META_PID].len) {
224 curr_pid = getpid();
225 ltoa_o(curr_pid, pidstr, sizeof(pidstr));
226 metadata[LOG_META_PID] = ist2(pidstr, strlen(pidstr));
227 }
228
229 if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, metadata) <= 0)
Willy Tarreau8f240232019-08-27 16:41:06 +0200230 return 0;
231 /* success! */
232 HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
233 }
234 return 1;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200235}
236
Willy Tarreau9f830d72019-08-26 18:17:04 +0200237/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
238static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
239{
240 struct sink *sink;
Willy Tarreaucba88382022-05-05 15:18:57 +0200241 uint ring_flags;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200242 int arg;
Willy Tarreau9f830d72019-08-26 18:17:04 +0200243
244 args++; // make args[1] the 1st arg
245
246 if (!*args[1]) {
247 /* no arg => report the list of supported sink */
Willy Tarreau1d181e42019-08-30 11:17:01 +0200248 chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
Willy Tarreau9f830d72019-08-26 18:17:04 +0200249 list_for_each_entry(sink, &sink_list, sink_list) {
250 chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n",
251 sink->name,
252 sink->type == SINK_TYPE_NEW ? "init" :
253 sink->type == SINK_TYPE_FD ? "fd" :
254 sink->type == SINK_TYPE_BUFFER ? "buffer" : "?",
255 sink->ctx.dropped, sink->desc);
256 }
257
258 trash.area[trash.data] = 0;
259 return cli_msg(appctx, LOG_WARNING, trash.area);
260 }
261
262 if (!cli_has_level(appctx, ACCESS_LVL_OPER))
263 return 1;
264
265 sink = sink_find(args[1]);
266 if (!sink)
267 return cli_err(appctx, "No such event sink");
268
269 if (sink->type != SINK_TYPE_BUFFER)
270 return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
271
Willy Tarreaucba88382022-05-05 15:18:57 +0200272 ring_flags = 0;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200273 for (arg = 2; *args[arg]; arg++) {
274 if (strcmp(args[arg], "-w") == 0)
Willy Tarreaucba88382022-05-05 15:18:57 +0200275 ring_flags |= RING_WF_WAIT_MODE;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200276 else if (strcmp(args[arg], "-n") == 0)
Willy Tarreaucba88382022-05-05 15:18:57 +0200277 ring_flags |= RING_WF_SEEK_NEW;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200278 else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
Willy Tarreaucba88382022-05-05 15:18:57 +0200279 ring_flags |= RING_WF_WAIT_MODE | RING_WF_SEEK_NEW;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200280 else
281 return cli_err(appctx, "unknown option");
282 }
Willy Tarreaucba88382022-05-05 15:18:57 +0200283 return ring_attach_cli(sink->ctx.ring, appctx, ring_flags);
Willy Tarreau9f830d72019-08-26 18:17:04 +0200284}
285
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500286/* Pre-configures a ring proxy to emit connections */
Emeric Brun494c5052020-05-28 11:13:15 +0200287void sink_setup_proxy(struct proxy *px)
288{
289 px->last_change = now.tv_sec;
290 px->cap = PR_CAP_FE | PR_CAP_BE;
291 px->maxconn = 0;
292 px->conn_retries = 1;
293 px->timeout.server = TICK_ETERNITY;
294 px->timeout.client = TICK_ETERNITY;
295 px->timeout.connect = TICK_ETERNITY;
296 px->accept = NULL;
297 px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
Emeric Brun494c5052020-05-28 11:13:15 +0200298}
299
300/*
Willy Tarreau42cc8312022-05-04 20:42:23 +0200301 * IO Handler to handle message push to syslog tcp server.
302 * It takes its context from appctx->svcctx.
Emeric Brun494c5052020-05-28 11:13:15 +0200303 */
304static void sink_forward_io_handler(struct appctx *appctx)
305{
Willy Tarreauc12b3212022-05-27 11:08:15 +0200306 struct stconn *sc = appctx_sc(appctx);
Willy Tarreau0eca5392022-05-27 10:44:25 +0200307 struct stream *s = __sc_strm(sc);
Emeric Brun494c5052020-05-28 11:13:15 +0200308 struct sink *sink = strm_fe(s)->parent;
Willy Tarreau42cc8312022-05-04 20:42:23 +0200309 struct sink_forward_target *sft = appctx->svcctx;
Emeric Brun494c5052020-05-28 11:13:15 +0200310 struct ring *ring = sink->ctx.ring;
311 struct buffer *buf = &ring->buf;
312 uint64_t msg_len;
Willy Tarreau53bfab02022-08-04 17:18:54 +0200313 size_t len, cnt, ofs, last_ofs;
Emeric Brun494c5052020-05-28 11:13:15 +0200314 int ret = 0;
315
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500316 /* if stopping was requested, close immediately */
Emeric Brun494c5052020-05-28 11:13:15 +0200317 if (unlikely(stopping))
318 goto close;
319
320 /* for rex because it seems reset to timeout
321 * and we don't want expire on this case
322 * with a syslog server
323 */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200324 sc_oc(sc)->rex = TICK_ETERNITY;
Emeric Brun494c5052020-05-28 11:13:15 +0200325 /* rto should not change but it seems the case */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200326 sc_oc(sc)->rto = TICK_ETERNITY;
Emeric Brun494c5052020-05-28 11:13:15 +0200327
328 /* an error was detected */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200329 if (unlikely(sc_ic(sc)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
Emeric Brun494c5052020-05-28 11:13:15 +0200330 goto close;
331
332 /* con closed by server side */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200333 if ((sc_oc(sc)->flags & CF_SHUTW))
Emeric Brun494c5052020-05-28 11:13:15 +0200334 goto close;
335
336 /* if the connection is not established, inform the stream that we want
337 * to be notified whenever the connection completes.
338 */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200339 if (sc_opposite(sc)->state < SC_ST_EST) {
Willy Tarreau90e8b452022-05-25 18:21:43 +0200340 applet_need_more_data(appctx);
Willy Tarreaub23edc82022-05-24 16:49:03 +0200341 se_need_remote_conn(appctx->sedesc);
Willy Tarreau4164eb92022-05-25 15:42:03 +0200342 applet_have_more_data(appctx);
Emeric Brun494c5052020-05-28 11:13:15 +0200343 return;
344 }
345
346 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
347 if (appctx != sft->appctx) {
348 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
349 goto close;
350 }
351 ofs = sft->ofs;
352
353 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
354 LIST_DEL_INIT(&appctx->wait_entry);
355 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
356
357 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
358
359 /* explanation for the initialization below: it would be better to do
360 * this in the parsing function but this would occasionally result in
361 * dropped events because we'd take a reference on the oldest message
362 * and keep it while being scheduled. Thus instead let's take it the
363 * first time we enter here so that we have a chance to pass many
364 * existing messages before grabbing a reference to a location. This
365 * value cannot be produced after initialization.
366 */
367 if (unlikely(ofs == ~0)) {
368 ofs = 0;
369
Willy Tarreau4781b152021-04-06 13:53:36 +0200370 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun494c5052020-05-28 11:13:15 +0200371 ofs += ring->ofs;
372 }
373
Emeric Brun494c5052020-05-28 11:13:15 +0200374 /* in this loop, ofs always points to the counter byte that precedes
375 * the message so that we can take our reference there if we have to
376 * stop before the end (ret=0).
377 */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200378 if (sc_opposite(sc)->state == SC_ST_EST) {
Emeric Brunfdabf492020-12-02 17:02:09 +0100379 /* we were already there, adjust the offset to be relative to
380 * the buffer's head and remove us from the counter.
381 */
382 ofs -= ring->ofs;
383 BUG_ON(ofs >= buf->size);
Willy Tarreau4781b152021-04-06 13:53:36 +0200384 HA_ATOMIC_DEC(b_peek(buf, ofs));
Emeric Brunfdabf492020-12-02 17:02:09 +0100385
Emeric Brun494c5052020-05-28 11:13:15 +0200386 ret = 1;
387 while (ofs + 1 < b_data(buf)) {
388 cnt = 1;
389 len = b_peek_varint(buf, ofs + cnt, &msg_len);
390 if (!len)
391 break;
392 cnt += len;
393 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
394
395 if (unlikely(msg_len + 1 > b_size(&trash))) {
396 /* too large a message to ever fit, let's skip it */
397 ofs += cnt + msg_len;
398 continue;
399 }
400
401 chunk_reset(&trash);
402 len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
403 trash.data += len;
404 trash.area[trash.data++] = '\n';
405
Willy Tarreaud0a06d52022-05-18 15:07:19 +0200406 if (applet_putchk(appctx, &trash) == -1) {
Emeric Brun494c5052020-05-28 11:13:15 +0200407 ret = 0;
408 break;
409 }
410 ofs += cnt + msg_len;
411 }
412
Willy Tarreau4781b152021-04-06 13:53:36 +0200413 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun494c5052020-05-28 11:13:15 +0200414 ofs += ring->ofs;
415 sft->ofs = ofs;
Willy Tarreau53bfab02022-08-04 17:18:54 +0200416 last_ofs = ring->ofs;
Emeric Brun494c5052020-05-28 11:13:15 +0200417 }
418 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
419
420 if (ret) {
421 /* let's be woken up once new data arrive */
422 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau2b718102021-04-21 07:32:39 +0200423 LIST_APPEND(&ring->waiters, &appctx->wait_entry);
Willy Tarreau53bfab02022-08-04 17:18:54 +0200424 ofs = ring->ofs;
Emeric Brun494c5052020-05-28 11:13:15 +0200425 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau53bfab02022-08-04 17:18:54 +0200426 if (ofs != last_ofs) {
427 /* more data was added into the ring between the
428 * unlock and the lock, and the writer might not
429 * have seen us. We need to reschedule a read.
430 */
431 applet_have_more_data(appctx);
432 } else
433 applet_have_no_more_data(appctx);
Emeric Brun494c5052020-05-28 11:13:15 +0200434 }
435 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
436
437 /* always drain data from server */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200438 co_skip(sc_oc(sc), sc_oc(sc)->output);
Emeric Brun494c5052020-05-28 11:13:15 +0200439 return;
440
441close:
Willy Tarreau0eca5392022-05-27 10:44:25 +0200442 sc_shutw(sc);
443 sc_shutr(sc);
444 sc_ic(sc)->flags |= CF_READ_NULL;
Emeric Brun494c5052020-05-28 11:13:15 +0200445}
446
Emeric Brun97556472020-05-30 01:42:45 +0200447/*
448 * IO Handler to handle message push to syslog tcp server
449 * using octet counting frames
Willy Tarreau42cc8312022-05-04 20:42:23 +0200450 * It takes its context from appctx->svcctx.
Emeric Brun97556472020-05-30 01:42:45 +0200451 */
452static void sink_forward_oc_io_handler(struct appctx *appctx)
453{
Willy Tarreauc12b3212022-05-27 11:08:15 +0200454 struct stconn *sc = appctx_sc(appctx);
Willy Tarreau0eca5392022-05-27 10:44:25 +0200455 struct stream *s = __sc_strm(sc);
Emeric Brun97556472020-05-30 01:42:45 +0200456 struct sink *sink = strm_fe(s)->parent;
Willy Tarreau42cc8312022-05-04 20:42:23 +0200457 struct sink_forward_target *sft = appctx->svcctx;
Emeric Brun97556472020-05-30 01:42:45 +0200458 struct ring *ring = sink->ctx.ring;
459 struct buffer *buf = &ring->buf;
460 uint64_t msg_len;
461 size_t len, cnt, ofs;
462 int ret = 0;
463 char *p;
464
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500465 /* if stopping was requested, close immediately */
Emeric Brun97556472020-05-30 01:42:45 +0200466 if (unlikely(stopping))
467 goto close;
468
469 /* for rex because it seems reset to timeout
470 * and we don't want expire on this case
471 * with a syslog server
472 */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200473 sc_oc(sc)->rex = TICK_ETERNITY;
Emeric Brun97556472020-05-30 01:42:45 +0200474 /* rto should not change but it seems the case */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200475 sc_oc(sc)->rto = TICK_ETERNITY;
Emeric Brun97556472020-05-30 01:42:45 +0200476
477 /* an error was detected */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200478 if (unlikely(sc_ic(sc)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
Emeric Brun97556472020-05-30 01:42:45 +0200479 goto close;
480
481 /* con closed by server side */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200482 if ((sc_oc(sc)->flags & CF_SHUTW))
Emeric Brun97556472020-05-30 01:42:45 +0200483 goto close;
484
485 /* if the connection is not established, inform the stream that we want
486 * to be notified whenever the connection completes.
487 */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200488 if (sc_opposite(sc)->state < SC_ST_EST) {
Willy Tarreau90e8b452022-05-25 18:21:43 +0200489 applet_need_more_data(appctx);
Willy Tarreaub23edc82022-05-24 16:49:03 +0200490 se_need_remote_conn(appctx->sedesc);
Willy Tarreau4164eb92022-05-25 15:42:03 +0200491 applet_have_more_data(appctx);
Emeric Brun97556472020-05-30 01:42:45 +0200492 return;
493 }
494
495 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
496 if (appctx != sft->appctx) {
497 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
498 goto close;
499 }
500 ofs = sft->ofs;
501
502 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
503 LIST_DEL_INIT(&appctx->wait_entry);
504 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
505
506 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
507
508 /* explanation for the initialization below: it would be better to do
509 * this in the parsing function but this would occasionally result in
510 * dropped events because we'd take a reference on the oldest message
511 * and keep it while being scheduled. Thus instead let's take it the
512 * first time we enter here so that we have a chance to pass many
513 * existing messages before grabbing a reference to a location. This
514 * value cannot be produced after initialization.
515 */
516 if (unlikely(ofs == ~0)) {
517 ofs = 0;
518
Willy Tarreau4781b152021-04-06 13:53:36 +0200519 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun97556472020-05-30 01:42:45 +0200520 ofs += ring->ofs;
521 }
522
Emeric Brun97556472020-05-30 01:42:45 +0200523 /* in this loop, ofs always points to the counter byte that precedes
524 * the message so that we can take our reference there if we have to
525 * stop before the end (ret=0).
526 */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200527 if (sc_opposite(sc)->state == SC_ST_EST) {
Emeric Brunfdabf492020-12-02 17:02:09 +0100528 /* we were already there, adjust the offset to be relative to
529 * the buffer's head and remove us from the counter.
530 */
531 ofs -= ring->ofs;
532 BUG_ON(ofs >= buf->size);
Willy Tarreau4781b152021-04-06 13:53:36 +0200533 HA_ATOMIC_DEC(b_peek(buf, ofs));
Emeric Brunfdabf492020-12-02 17:02:09 +0100534
Emeric Brun97556472020-05-30 01:42:45 +0200535 ret = 1;
536 while (ofs + 1 < b_data(buf)) {
537 cnt = 1;
538 len = b_peek_varint(buf, ofs + cnt, &msg_len);
539 if (!len)
540 break;
541 cnt += len;
542 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
543
544 chunk_reset(&trash);
545 p = ulltoa(msg_len, trash.area, b_size(&trash));
546 if (p) {
547 trash.data = (p - trash.area) + 1;
548 *p = ' ';
549 }
550
551 if (!p || (trash.data + msg_len > b_size(&trash))) {
552 /* too large a message to ever fit, let's skip it */
553 ofs += cnt + msg_len;
554 continue;
555 }
556
557 trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
558
Willy Tarreaud0a06d52022-05-18 15:07:19 +0200559 if (applet_putchk(appctx, &trash) == -1) {
Emeric Brun97556472020-05-30 01:42:45 +0200560 ret = 0;
561 break;
562 }
563 ofs += cnt + msg_len;
564 }
565
Willy Tarreau4781b152021-04-06 13:53:36 +0200566 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun97556472020-05-30 01:42:45 +0200567 ofs += ring->ofs;
568 sft->ofs = ofs;
569 }
570 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
571
572 if (ret) {
573 /* let's be woken up once new data arrive */
574 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau2b718102021-04-21 07:32:39 +0200575 LIST_APPEND(&ring->waiters, &appctx->wait_entry);
Emeric Brun97556472020-05-30 01:42:45 +0200576 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau4164eb92022-05-25 15:42:03 +0200577 applet_have_no_more_data(appctx);
Emeric Brun97556472020-05-30 01:42:45 +0200578 }
579 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
580
581 /* always drain data from server */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200582 co_skip(sc_oc(sc), sc_oc(sc)->output);
Emeric Brun97556472020-05-30 01:42:45 +0200583 return;
584
585close:
Willy Tarreau0eca5392022-05-27 10:44:25 +0200586 sc_shutw(sc);
587 sc_shutr(sc);
588 sc_ic(sc)->flags |= CF_READ_NULL;
Emeric Brun97556472020-05-30 01:42:45 +0200589}
590
Emeric Brun494c5052020-05-28 11:13:15 +0200591void __sink_forward_session_deinit(struct sink_forward_target *sft)
592{
Willy Tarreau0698c802022-05-11 14:09:57 +0200593 struct stream *s = appctx_strm(sft->appctx);
Emeric Brun494c5052020-05-28 11:13:15 +0200594 struct sink *sink;
595
Emeric Brun494c5052020-05-28 11:13:15 +0200596 sink = strm_fe(s)->parent;
597 if (!sink)
598 return;
599
600 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
601 LIST_DEL_INIT(&sft->appctx->wait_entry);
602 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
603
604 sft->appctx = NULL;
605 task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
606}
607
Christopher Fauletaee57fc2022-05-12 15:34:48 +0200608static int sink_forward_session_init(struct appctx *appctx)
609{
610 struct sink_forward_target *sft = appctx->svcctx;
611 struct stream *s;
612 struct sockaddr_storage *addr = NULL;
613
614 if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr)))
615 goto out_error;
616
617 if (appctx_finalize_startup(appctx, sft->sink->forward_px, &BUF_NULL) == -1)
618 goto out_free_addr;
619
620 s = appctx_strm(appctx);
Willy Tarreau7cb9e6c2022-05-17 19:40:40 +0200621 s->scb->dst = addr;
Willy Tarreaucb041662022-05-17 19:44:42 +0200622 s->scb->flags |= SC_FL_NOLINGER;
Christopher Fauletaee57fc2022-05-12 15:34:48 +0200623
624 s->target = &sft->srv->obj_type;
625 s->flags = SF_ASSIGNED;
626
627 s->do_log = NULL;
628 s->uniq_id = 0;
629
630 s->res.flags |= CF_READ_DONTWAIT;
631 /* for rto and rex to eternity to not expire on idle recv:
632 * We are using a syslog server.
633 */
634 s->res.rto = TICK_ETERNITY;
635 s->res.rex = TICK_ETERNITY;
636 sft->appctx = appctx;
637
638 return 0;
639
640 out_free_addr:
641 sockaddr_free(&addr);
642 out_error:
643 return -1;
644}
Emeric Brun494c5052020-05-28 11:13:15 +0200645
646static void sink_forward_session_release(struct appctx *appctx)
647{
Willy Tarreau42cc8312022-05-04 20:42:23 +0200648 struct sink_forward_target *sft = appctx->svcctx;
Emeric Brun494c5052020-05-28 11:13:15 +0200649
650 if (!sft)
651 return;
652
653 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
654 if (sft->appctx == appctx)
655 __sink_forward_session_deinit(sft);
656 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
657}
658
/* Applet forwarding a sink's ring contents to a log server, one message per
 * line (see sink_forward_io_handler()). It is the default applet picked by
 * sink_forward_session_create().
 */
static struct applet sink_forward_applet = {
	.obj_type = OBJ_TYPE_APPLET,
	.name = "<SINKFWD>", /* used for logging */
	.fct = sink_forward_io_handler,
	.init = sink_forward_session_init,
	.release = sink_forward_session_release,
};
666
/* Applet forwarding a sink's ring contents to a log server using octet
 * counting frames (see sink_forward_oc_io_handler()). It is picked by
 * sink_forward_session_create() when the server's log protocol is
 * SRV_LOG_PROTO_OCTET_COUNTING.
 */
static struct applet sink_forward_oc_applet = {
	.obj_type = OBJ_TYPE_APPLET,
	.name = "<SINKFWDOC>", /* used for logging */
	.fct = sink_forward_oc_io_handler,
	.init = sink_forward_session_init,
	.release = sink_forward_session_release,
};
674
Emeric Brun494c5052020-05-28 11:13:15 +0200675/*
676 * Create a new peer session in assigned state (connect will start automatically)
Willy Tarreau42cc8312022-05-04 20:42:23 +0200677 * It sets its context into appctx->svcctx.
Emeric Brun494c5052020-05-28 11:13:15 +0200678 */
679static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
680{
Emeric Brun494c5052020-05-28 11:13:15 +0200681 struct appctx *appctx;
Emeric Brun97556472020-05-30 01:42:45 +0200682 struct applet *applet = &sink_forward_applet;
683
684 if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
685 applet = &sink_forward_oc_applet;
Emeric Brun494c5052020-05-28 11:13:15 +0200686
Christopher Faulet6095d572022-05-16 17:09:48 +0200687 appctx = appctx_new_here(applet, NULL);
Christopher Faulet2479e5f2022-01-19 14:50:11 +0100688 if (!appctx)
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100689 goto out_close;
Willy Tarreau42cc8312022-05-04 20:42:23 +0200690 appctx->svcctx = (void *)sft;
Emeric Brun494c5052020-05-28 11:13:15 +0200691
Christopher Fauletaee57fc2022-05-12 15:34:48 +0200692 if (appctx_init(appctx) == -1)
Christopher Faulet92202da2022-05-11 12:22:10 +0200693 goto out_free_appctx;
Christopher Faulet13a35e52021-12-20 15:34:16 +0100694
Emeric Brun494c5052020-05-28 11:13:15 +0200695 return appctx;
696
697 /* Error unrolling */
Emeric Brun494c5052020-05-28 11:13:15 +0200698 out_free_appctx:
Christopher Fauletaee57fc2022-05-12 15:34:48 +0200699 appctx_free_on_early_error(appctx);
Emeric Brun494c5052020-05-28 11:13:15 +0200700 out_close:
701 return NULL;
702}
703
704/*
705 * Task to handle connctions to forward servers
706 */
Willy Tarreau144f84a2021-03-02 16:09:26 +0100707static struct task *process_sink_forward(struct task * task, void *context, unsigned int state)
Emeric Brun494c5052020-05-28 11:13:15 +0200708{
709 struct sink *sink = (struct sink *)context;
710 struct sink_forward_target *sft = sink->sft;
711
712 task->expire = TICK_ETERNITY;
713
714 if (!stopping) {
715 while (sft) {
716 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
717 /* if appctx is NULL, start a new session */
718 if (!sft->appctx)
719 sft->appctx = sink_forward_session_create(sink, sft);
720 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
721 sft = sft->next;
722 }
723 }
724 else {
725 while (sft) {
726 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
727 /* awake applet to perform a clean close */
728 if (sft->appctx)
729 appctx_wakeup(sft->appctx);
730 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
731 sft = sft->next;
732 }
733 }
734
735 return task;
736}
737/*
738 * Init task to manage connctions to forward servers
739 *
740 * returns 0 in case of error.
741 */
742int sink_init_forward(struct sink *sink)
743{
Willy Tarreaubeeabf52021-10-01 18:23:30 +0200744 sink->forward_task = task_new_anywhere();
Emeric Brun494c5052020-05-28 11:13:15 +0200745 if (!sink->forward_task)
746 return 0;
747
748 sink->forward_task->process = process_sink_forward;
749 sink->forward_task->context = (void *)sink;
750 sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
751 task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
752 return 1;
753}
Willy Tarreau32872db2022-08-31 18:52:17 +0200754
755/* This tries to rotate a file-backed ring, but only if it contains contents.
756 * This way empty rings will not cause backups to be overwritten and it's safe
757 * to reload multiple times. That's only best effort, failures are silently
758 * ignored.
759 */
760void sink_rotate_file_backed_ring(const char *name)
761{
762 struct ring ring;
763 char *oldback;
764 int ret;
765 int fd;
766
767 fd = open(name, O_RDONLY);
768 if (fd < 0)
769 return;
770
771 /* check for contents validity */
772 ret = read(fd, &ring, sizeof(ring));
773 close(fd);
774
775 if (ret != sizeof(ring))
776 goto rotate;
777
778 /* contents are present, we want to keep them => rotate. Note that
779 * an empty ring buffer has one byte (the marker).
780 */
781 if (ring.buf.data > 1)
782 goto rotate;
783
784 /* nothing to keep, let's scratch the file and preserve the backup */
785 return;
786
787 rotate:
788 oldback = NULL;
789 memprintf(&oldback, "%s.bak", name);
790 if (oldback) {
791 /* try to rename any possibly existing ring file to
792 * ".bak" and delete remains of older ones. This will
793 * ensure we don't wipe useful debug info upon restart.
794 */
795 unlink(oldback);
796 if (rename(name, oldback) < 0)
797 unlink(oldback);
798 ha_free(&oldback);
799 }
800}
801
Emeric Brun99c453d2020-05-25 15:01:04 +0200802/*
803 * Parse "ring" section and create corresponding sink buffer.
804 *
805 * The function returns 0 in success case, otherwise, it returns error
806 * flags.
807 */
808int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
809{
810 int err_code = 0;
811 const char *inv;
812 size_t size = BUFSIZE;
Emeric Brun494c5052020-05-28 11:13:15 +0200813 struct proxy *p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200814
Willy Tarreau18d13062022-08-11 16:12:11 +0200815 if (strcmp(args[0], "ring") == 0) { /* new ring section */
Emeric Brun99c453d2020-05-25 15:01:04 +0200816 if (!*args[1]) {
817 ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum);
818 err_code |= ERR_ALERT | ERR_FATAL;
819 goto err;
820 }
821
822 inv = invalid_char(args[1]);
823 if (inv) {
824 ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv);
825 err_code |= ERR_ALERT | ERR_FATAL;
826 goto err;
827 }
828
829 if (sink_find(args[1])) {
830 ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]);
831 err_code |= ERR_ALERT | ERR_FATAL;
832 goto err;
833 }
834
Emeric Brun54648852020-07-06 15:54:06 +0200835 cfg_sink = sink_new_buf(args[1], args[1], LOG_FORMAT_RAW, size);
Emeric Brun99c453d2020-05-25 15:01:04 +0200836 if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) {
837 ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]);
838 err_code |= ERR_ALERT | ERR_FATAL;
839 goto err;
840 }
Emeric Brun494c5052020-05-28 11:13:15 +0200841
842 /* allocate new proxy to handle forwards */
843 p = calloc(1, sizeof *p);
844 if (!p) {
845 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
846 err_code |= ERR_ALERT | ERR_FATAL;
847 goto err;
848 }
849
850 init_new_proxy(p);
851 sink_setup_proxy(p);
852 p->parent = cfg_sink;
853 p->id = strdup(args[1]);
854 p->conf.args.file = p->conf.file = strdup(file);
855 p->conf.args.line = p->conf.line = linenum;
856 cfg_sink->forward_px = p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200857 }
858 else if (strcmp(args[0], "size") == 0) {
Willy Tarreau18d13062022-08-11 16:12:11 +0200859 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) {
860 ha_alert("parsing [%s:%d] : 'size' directive not usable with this type of sink.\n", file, linenum);
861 err_code |= ERR_ALERT | ERR_FATAL;
862 goto err;
863 }
864
Emeric Brun99c453d2020-05-25 15:01:04 +0200865 size = atol(args[1]);
866 if (!size) {
867 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
868 err_code |= ERR_ALERT | ERR_FATAL;
869 goto err;
870 }
871
Willy Tarreau0b8e9ce2022-08-11 16:38:20 +0200872 if (cfg_sink->store) {
873 ha_alert("parsing [%s:%d] : cannot resize an already mapped file, please specify 'size' before 'backing-file'.\n", file, linenum);
874 err_code |= ERR_ALERT | ERR_FATAL;
875 goto err;
876 }
877
Willy Tarreau18d13062022-08-11 16:12:11 +0200878 if (size < cfg_sink->ctx.ring->buf.size) {
879 ha_warning("parsing [%s:%d] : ignoring new size '%llu' that is smaller than current size '%llu' for ring '%s'.\n",
880 file, linenum, (ullong)size, (ullong)cfg_sink->ctx.ring->buf.size, cfg_sink->name);
881 err_code |= ERR_ALERT | ERR_FATAL;
882 goto err;
883 }
884
885 if (!ring_resize(cfg_sink->ctx.ring, size)) {
886 ha_alert("parsing [%s:%d] : fail to set sink buffer size '%llu' for ring '%s'.\n", file, linenum,
887 (ullong)cfg_sink->ctx.ring->buf.size, cfg_sink->name);
Emeric Brun99c453d2020-05-25 15:01:04 +0200888 err_code |= ERR_ALERT | ERR_FATAL;
889 goto err;
890 }
Willy Tarreau0b8e9ce2022-08-11 16:38:20 +0200891 }
892 else if (strcmp(args[0], "backing-file") == 0) {
893 /* This tries to mmap file <file> for size <size> and to use it as a backing store
894 * for ring <ring>. Existing data are delete. NULL is returned on error.
895 */
896 const char *backing = args[1];
897 size_t size;
898 void *area;
899 int fd;
900
901 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) {
902 ha_alert("parsing [%s:%d] : 'backing-file' only usable with existing rings.\n", file, linenum);
903 err_code |= ERR_ALERT | ERR_FATAL;
904 goto err;
905 }
906
907 if (cfg_sink->store) {
908 ha_alert("parsing [%s:%d] : 'backing-file' already specified for ring '%s' (was '%s').\n", file, linenum, cfg_sink->name, cfg_sink->store);
909 err_code |= ERR_ALERT | ERR_FATAL;
910 goto err;
911 }
912
Willy Tarreau32872db2022-08-31 18:52:17 +0200913 /* let's check if the file exists and is not empty. That's the
914 * only condition under which we'll trigger a rotate, so that
915 * config checks, reloads, or restarts that don't emit anything
916 * do not rotate it again.
917 */
918 sink_rotate_file_backed_ring(backing);
Willy Tarreauded77cc2022-08-12 15:38:20 +0200919
Willy Tarreau8e877052022-08-12 15:03:12 +0200920 fd = open(backing, O_RDWR | O_CREAT, 0600);
Willy Tarreau0b8e9ce2022-08-11 16:38:20 +0200921 if (fd < 0) {
922 ha_alert("parsing [%s:%d] : cannot open backing-file '%s' for ring '%s': %s.\n", file, linenum, backing, cfg_sink->name, strerror(errno));
923 err_code |= ERR_ALERT | ERR_FATAL;
924 goto err;
925 }
926
927 size = (cfg_sink->ctx.ring->buf.size + 4095UL) & -4096UL;
928 if (ftruncate(fd, size) != 0) {
929 close(fd);
930 ha_alert("parsing [%s:%d] : could not adjust size of backing-file for ring '%s': %s.\n", file, linenum, cfg_sink->name, strerror(errno));
931 err_code |= ERR_ALERT | ERR_FATAL;
932 goto err;
933 }
934
935 area = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
936 if (area == MAP_FAILED) {
937 close(fd);
938 ha_alert("parsing [%s:%d] : failed to use '%s' as a backing file for ring '%s': %s.\n", file, linenum, backing, cfg_sink->name, strerror(errno));
939 err_code |= ERR_ALERT | ERR_FATAL;
940 goto err;
941 }
942
943 /* we don't need the file anymore */
944 close(fd);
945 cfg_sink->store = strdup(backing);
946
947 /* never fails */
948 ring_free(cfg_sink->ctx.ring);
949 cfg_sink->ctx.ring = ring_make_from_area(area, size);
Emeric Brun99c453d2020-05-25 15:01:04 +0200950 }
Emeric Brun494c5052020-05-28 11:13:15 +0200951 else if (strcmp(args[0],"server") == 0) {
Amaury Denoyelle30c05372021-03-08 16:36:46 +0100952 err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL,
953 SRV_PARSE_PARSE_ADDR|SRV_PARSE_INITIAL_RESOLVE);
Emeric Brun494c5052020-05-28 11:13:15 +0200954 }
955 else if (strcmp(args[0],"timeout") == 0) {
956 if (!cfg_sink || !cfg_sink->forward_px) {
957 ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
958 err_code |= ERR_ALERT | ERR_FATAL;
959 goto err;
960 }
961
962 if (strcmp(args[1], "connect") == 0 ||
963 strcmp(args[1], "server") == 0) {
964 const char *res;
965 unsigned int tout;
966
967 if (!*args[2]) {
968 ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
969 file, linenum, args[0], args[1]);
970 err_code |= ERR_ALERT | ERR_FATAL;
971 goto err;
972 }
973 res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
974 if (res == PARSE_TIME_OVER) {
975 ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
976 file, linenum, args[2], args[0], args[1]);
977 err_code |= ERR_ALERT | ERR_FATAL;
978 goto err;
979 }
980 else if (res == PARSE_TIME_UNDER) {
981 ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
982 file, linenum, args[2], args[0], args[1]);
983 err_code |= ERR_ALERT | ERR_FATAL;
984 goto err;
985 }
986 else if (res) {
987 ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
988 file, linenum, *res, args[0], args[1]);
989 err_code |= ERR_ALERT | ERR_FATAL;
990 goto err;
991 }
992 if (args[1][2] == 'c')
993 cfg_sink->forward_px->timeout.connect = tout;
994 else
995 cfg_sink->forward_px->timeout.server = tout;
996 }
997 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200998 else if (strcmp(args[0],"format") == 0) {
999 if (!cfg_sink) {
1000 ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
1001 err_code |= ERR_ALERT | ERR_FATAL;
1002 goto err;
1003 }
1004
Emeric Brun54648852020-07-06 15:54:06 +02001005 cfg_sink->fmt = get_log_format(args[1]);
1006 if (cfg_sink->fmt == LOG_FORMAT_UNSPEC) {
Emeric Brun99c453d2020-05-25 15:01:04 +02001007 ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]);
1008 err_code |= ERR_ALERT | ERR_FATAL;
1009 goto err;
1010 }
1011 }
1012 else if (strcmp(args[0],"maxlen") == 0) {
1013 if (!cfg_sink) {
1014 ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]);
1015 err_code |= ERR_ALERT | ERR_FATAL;
1016 goto err;
1017 }
1018
1019 cfg_sink->maxlen = atol(args[1]);
1020 if (!cfg_sink->maxlen) {
1021 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
1022 err_code |= ERR_ALERT | ERR_FATAL;
1023 goto err;
1024 }
1025 }
1026 else if (strcmp(args[0],"description") == 0) {
1027 if (!cfg_sink) {
1028 ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]);
1029 err_code |= ERR_ALERT | ERR_FATAL;
1030 goto err;
1031 }
1032
1033 if (!*args[1]) {
1034 ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum);
1035 err_code |= ERR_ALERT | ERR_FATAL;
1036 goto err;
1037 }
1038
1039 free(cfg_sink->desc);
1040
1041 cfg_sink->desc = strdup(args[1]);
1042 if (!cfg_sink->desc) {
1043 ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]);
1044 err_code |= ERR_ALERT | ERR_FATAL;
1045 goto err;
1046 }
1047 }
Emeric Brun9f2ff3a2020-05-29 15:47:52 +02001048 else {
1049 ha_alert("parsing [%s:%d] : unknown statement '%s'.\n", file, linenum, args[0]);
1050 err_code |= ERR_ALERT | ERR_FATAL;
1051 goto err;
1052 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001053
1054err:
1055 return err_code;
1056}
1057
Emeric Brun94aab062021-04-02 10:41:36 +02001058/* Creates an new sink buffer from a log server.
1059 *
1060 * It uses the logsrvaddress to declare a forward
1061 * server for this buffer. And it initializes the
1062 * forwarding.
1063 *
1064 * The function returns a pointer on the
1065 * allocated struct sink if allocate
1066 * and initialize succeed, else if it fails
1067 * it returns NULL.
1068 *
1069 * Note: the sink is created using the name
Ilya Shipitsinb2be9a12021-04-24 13:25:42 +05001070 * specified into logsrv->ring_name
Emeric Brun94aab062021-04-02 10:41:36 +02001071 */
1072struct sink *sink_new_from_logsrv(struct logsrv *logsrv)
1073{
1074 struct proxy *p = NULL;
1075 struct sink *sink = NULL;
1076 struct server *srv = NULL;
1077 struct sink_forward_target *sft = NULL;
Emeric Brun94aab062021-04-02 10:41:36 +02001078
1079 /* allocate new proxy to handle
1080 * forward to a stream server
1081 */
1082 p = calloc(1, sizeof *p);
1083 if (!p) {
1084 goto error;
1085 }
1086
1087 init_new_proxy(p);
1088 sink_setup_proxy(p);
1089 p->id = strdup(logsrv->ring_name);
1090 p->conf.args.file = p->conf.file = strdup(logsrv->conf.file);
1091 p->conf.args.line = p->conf.line = logsrv->conf.line;
1092
1093 /* allocate a new server to forward messages
1094 * from ring buffer
1095 */
1096 srv = new_server(p);
1097 if (!srv)
1098 goto error;
1099
1100 /* init server */
1101 srv->id = strdup(logsrv->ring_name);
1102 srv->conf.file = strdup(logsrv->conf.file);
1103 srv->conf.line = logsrv->conf.line;
1104 srv->addr = logsrv->addr;
1105 srv->svc_port = get_host_port(&logsrv->addr);
1106 HA_SPIN_INIT(&srv->lock);
1107
1108 /* process per thread init */
Miroslav Zagorac8a8f2702021-06-15 15:33:20 +02001109 if (srv_init_per_thr(srv) == -1)
Emeric Brun94aab062021-04-02 10:41:36 +02001110 goto error;
1111
Emeric Brun94aab062021-04-02 10:41:36 +02001112 /* the servers are linked backwards
1113 * first into proxy
1114 */
1115 p->srv = srv;
1116 srv->next = p->srv;
1117
1118 /* allocate sink_forward_target descriptor */
1119 sft = calloc(1, sizeof(*sft));
1120 if (!sft)
1121 goto error;
1122
1123 /* init sink_forward_target offset */
1124 sft->srv = srv;
1125 sft->appctx = NULL;
1126 sft->ofs = ~0;
1127 HA_SPIN_INIT(&sft->lock);
1128
Ilya Shipitsinb2be9a12021-04-24 13:25:42 +05001129 /* prepare description for the sink */
Emeric Brun94aab062021-04-02 10:41:36 +02001130 chunk_reset(&trash);
1131 chunk_printf(&trash, "created from logserver declared into '%s' at line %d", logsrv->conf.file, logsrv->conf.line);
1132
1133 /* allocate a new sink buffer */
1134 sink = sink_new_buf(logsrv->ring_name, trash.area, logsrv->format, BUFSIZE);
1135 if (!sink || sink->type != SINK_TYPE_BUFFER) {
1136 goto error;
1137 }
1138
1139 /* link sink_forward_target to proxy */
1140 sink->forward_px = p;
1141 p->parent = sink;
1142
1143 /* insert into sink_forward_targets
1144 * list into sink
1145 */
Christopher Faulet2ae25ea2022-05-12 14:50:09 +02001146 sft->sink = sink;
Emeric Brun94aab062021-04-02 10:41:36 +02001147 sft->next = sink->sft;
1148 sink->sft = sft;
1149
1150 /* mark server as an attached reader to the ring */
1151 if (!ring_attach(sink->ctx.ring)) {
1152 /* should never fail since there is
1153 * only one reader
1154 */
1155 goto error;
1156 }
1157
1158 /* initialize sink buffer forwarding */
1159 if (!sink_init_forward(sink))
1160 goto error;
1161
1162 /* reset familyt of logsrv to consider the ring buffer target */
1163 logsrv->addr.ss_family = AF_UNSPEC;
1164
1165 return sink;
1166error:
1167 if (p) {
1168 if (p->id)
1169 free(p->id);
1170 if (p->conf.file)
1171 free(p->conf.file);
1172
1173 free(p);
1174 }
1175
1176 if (srv) {
1177 if (srv->id)
1178 free(srv->id);
1179 if (srv->conf.file)
1180 free((void *)srv->conf.file);
1181 if (srv->per_thr)
1182 free(srv->per_thr);
1183 free(srv);
1184 }
1185
1186 if (sft)
1187 free(sft);
1188
1189 if (sink) {
1190 if (sink->ctx.ring)
1191 ring_free(sink->ctx.ring);
1192
Willy Tarreau2b718102021-04-21 07:32:39 +02001193 LIST_DELETE(&sink->sink_list);
Emeric Brun94aab062021-04-02 10:41:36 +02001194 free(sink->name);
1195 free(sink->desc);
1196 free(sink);
1197 }
1198
1199 return NULL;
1200}
1201
Emeric Brun99c453d2020-05-25 15:01:04 +02001202/*
1203 * Post parsing "ring" section.
1204 *
1205 * The function returns 0 in success case, otherwise, it returns error
1206 * flags.
1207 */
1208int cfg_post_parse_ring()
1209{
1210 int err_code = 0;
Emeric Brun494c5052020-05-28 11:13:15 +02001211 struct server *srv;
Emeric Brun99c453d2020-05-25 15:01:04 +02001212
1213 if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
1214 if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
1215 ha_warning("ring '%s' event max length '%u' exceeds size, forced to size '%lu'.\n",
Willy Tarreau570a22b2020-06-02 12:00:46 +02001216 cfg_sink->name, cfg_sink->maxlen, (unsigned long)b_size(&cfg_sink->ctx.ring->buf));
Emeric Brun99c453d2020-05-25 15:01:04 +02001217 cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
1218 err_code |= ERR_ALERT;
1219 }
Emeric Brun494c5052020-05-28 11:13:15 +02001220
1221 /* prepare forward server descriptors */
1222 if (cfg_sink->forward_px) {
1223 srv = cfg_sink->forward_px->srv;
1224 while (srv) {
1225 struct sink_forward_target *sft;
1226 /* init ssl if needed */
1227 if (srv->use_ssl == 1 && xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->prepare_srv) {
1228 if (xprt_get(XPRT_SSL)->prepare_srv(srv)) {
1229 ha_alert("unable to prepare SSL for server '%s' in ring '%s'.\n", srv->id, cfg_sink->name);
1230 err_code |= ERR_ALERT | ERR_FATAL;
1231 }
1232 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001233
Emeric Brun494c5052020-05-28 11:13:15 +02001234 /* allocate sink_forward_target descriptor */
1235 sft = calloc(1, sizeof(*sft));
1236 if (!sft) {
1237 ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
1238 err_code |= ERR_ALERT | ERR_FATAL;
1239 break;
1240 }
1241 sft->srv = srv;
1242 sft->appctx = NULL;
1243 sft->ofs = ~0; /* init ring offset */
Christopher Faulet96417f32022-08-04 16:00:13 +02001244 sft->sink = cfg_sink;
Emeric Brun494c5052020-05-28 11:13:15 +02001245 sft->next = cfg_sink->sft;
1246 HA_SPIN_INIT(&sft->lock);
1247
1248 /* mark server attached to the ring */
1249 if (!ring_attach(cfg_sink->ctx.ring)) {
1250 ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
1251 err_code |= ERR_ALERT | ERR_FATAL;
1252 }
1253 cfg_sink->sft = sft;
1254 srv = srv->next;
1255 }
1256 sink_init_forward(cfg_sink);
1257 }
1258 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001259 cfg_sink = NULL;
1260
1261 return err_code;
1262}
1263
1264/* resolve sink names at end of config. Returns 0 on success otherwise error
1265 * flags.
1266*/
1267int post_sink_resolve()
1268{
Christopher Fauletfc633b62020-11-06 15:24:23 +01001269 int err_code = ERR_NONE;
Emeric Brun99c453d2020-05-25 15:01:04 +02001270 struct logsrv *logsrv, *logb;
1271 struct sink *sink;
1272 struct proxy *px;
1273
1274 list_for_each_entry_safe(logsrv, logb, &global.logsrvs, list) {
1275 if (logsrv->type == LOG_TARGET_BUFFER) {
1276 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001277 if (!sink) {
1278 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1279 * means we must allocate a sink
1280 * buffer to send messages to this logsrv
1281 */
1282 if (logsrv->addr.ss_family != AF_UNSPEC) {
1283 sink = sink_new_from_logsrv(logsrv);
1284 if (!sink) {
1285 ha_alert("global stream log server declared in file '%s' at line %d cannot be initialized'.\n",
1286 logsrv->conf.file, logsrv->conf.line);
1287 err_code |= ERR_ALERT | ERR_FATAL;
1288 }
1289 }
1290 else {
1291 ha_alert("global log server declared in file '%s' at line %d uses unknown ring named '%s'.\n",
1292 logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1293 err_code |= ERR_ALERT | ERR_FATAL;
1294 }
1295 }
1296 else if (sink->type != SINK_TYPE_BUFFER) {
1297 ha_alert("global log server declared in file '%s' at line %d uses incompatible ring '%s'.\n",
1298 logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +02001299 err_code |= ERR_ALERT | ERR_FATAL;
1300 }
1301 logsrv->sink = sink;
1302 }
Emeric Brun94aab062021-04-02 10:41:36 +02001303
Emeric Brun99c453d2020-05-25 15:01:04 +02001304 }
1305
1306 for (px = proxies_list; px; px = px->next) {
1307 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1308 if (logsrv->type == LOG_TARGET_BUFFER) {
1309 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001310 if (!sink) {
1311 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1312 * means we must allocate a sink
1313 * buffer to send messages to this logsrv
1314 */
1315 if (logsrv->addr.ss_family != AF_UNSPEC) {
1316 sink = sink_new_from_logsrv(logsrv);
1317 if (!sink) {
1318 ha_alert("log server declared in proxy section '%s' file '%s' at line %d cannot be initialized'.\n",
1319 px->id, logsrv->conf.file, logsrv->conf.line);
1320 err_code |= ERR_ALERT | ERR_FATAL;
1321 }
1322 }
1323 else {
1324 ha_alert("log server declared in proxy section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1325 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1326 err_code |= ERR_ALERT | ERR_FATAL;
1327 }
1328 }
1329 else if (sink->type != SINK_TYPE_BUFFER) {
1330 ha_alert("log server declared in proxy section '%s' in file '%s' at line %d uses incomatible ring named '%s'.\n",
1331 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +02001332 err_code |= ERR_ALERT | ERR_FATAL;
1333 }
1334 logsrv->sink = sink;
1335 }
1336 }
1337 }
Emeric Brun12941c82020-07-07 14:19:42 +02001338
1339 for (px = cfg_log_forward; px; px = px->next) {
1340 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1341 if (logsrv->type == LOG_TARGET_BUFFER) {
1342 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001343 if (!sink) {
1344 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1345 * means we must allocate a sink
1346 * buffer to send messages to this logsrv
1347 */
1348 if (logsrv->addr.ss_family != AF_UNSPEC) {
1349 sink = sink_new_from_logsrv(logsrv);
1350 if (!sink) {
1351 ha_alert("log server declared in log-forward section '%s' file '%s' at line %d cannot be initialized'.\n",
1352 px->id, logsrv->conf.file, logsrv->conf.line);
1353 err_code |= ERR_ALERT | ERR_FATAL;
1354 }
1355 }
1356 else {
1357 ha_alert("log server declared in log-forward section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1358 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1359 err_code |= ERR_ALERT | ERR_FATAL;
1360 }
1361 }
1362 else if (sink->type != SINK_TYPE_BUFFER) {
1363 ha_alert("log server declared in log-forward section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1364 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun12941c82020-07-07 14:19:42 +02001365 err_code |= ERR_ALERT | ERR_FATAL;
1366 }
1367 logsrv->sink = sink;
1368 }
1369 }
1370 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001371 return err_code;
1372}
1373
1374
Willy Tarreau973e6622019-08-20 11:57:52 +02001375static void sink_init()
1376{
Emeric Brun54648852020-07-06 15:54:06 +02001377 sink_new_fd("stdout", "standard output (fd#1)", LOG_FORMAT_RAW, 1);
1378 sink_new_fd("stderr", "standard output (fd#2)", LOG_FORMAT_RAW, 2);
1379 sink_new_buf("buf0", "in-memory ring buffer", LOG_FORMAT_TIMED, 1048576);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001380}
1381
1382static void sink_deinit()
1383{
1384 struct sink *sink, *sb;
1385
1386 list_for_each_entry_safe(sink, sb, &sink_list, sink_list) {
Willy Tarreau0b8e9ce2022-08-11 16:38:20 +02001387 if (sink->type == SINK_TYPE_BUFFER) {
1388 if (sink->store)
1389 munmap(sink->ctx.ring->buf.area, sink->ctx.ring->buf.size);
1390 else
1391 ring_free(sink->ctx.ring);
1392 }
Willy Tarreau2b718102021-04-21 07:32:39 +02001393 LIST_DELETE(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +02001394 free(sink->name);
1395 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001396 free(sink);
1397 }
Willy Tarreau973e6622019-08-20 11:57:52 +02001398}
1399
1400INITCALL0(STG_REGISTER, sink_init);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001401REGISTER_POST_DEINIT(sink_deinit);
Willy Tarreau973e6622019-08-20 11:57:52 +02001402
Willy Tarreau9f830d72019-08-26 18:17:04 +02001403static struct cli_kw_list cli_kws = {{ },{
Willy Tarreaub205bfd2021-05-07 11:38:37 +02001404 { { "show", "events", NULL }, "show events [<sink>] [-w] [-n] : show event sink state", cli_parse_show_events, NULL, NULL },
Willy Tarreau9f830d72019-08-26 18:17:04 +02001405 {{},}
1406}};
1407
1408INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);
1409
Emeric Brun99c453d2020-05-25 15:01:04 +02001410/* config parsers for this section */
1411REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring);
1412REGISTER_POST_CHECK(post_sink_resolve);
1413
Willy Tarreau67b5a162019-08-11 16:38:56 +02001414/*
1415 * Local variables:
1416 * c-indent-level: 8
1417 * c-basic-offset: 8
1418 * End:
1419 */