blob: 889acd2f4607431787f49442d8530d9856dc20c2 [file] [log] [blame]
Willy Tarreau67b5a162019-08-11 16:38:56 +02001/*
2 * Event sink management
3 *
4 * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation, version 2.1
9 * exclusively.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 */
20
Willy Tarreau36979d92020-06-05 17:27:29 +020021#include <import/ist.h>
Willy Tarreau4c7e4b72020-05-27 12:58:42 +020022#include <haproxy/api.h>
Christopher Faulet6b0a0fb2022-04-04 11:29:28 +020023#include <haproxy/applet.h>
Willy Tarreau6be78492020-06-05 00:00:29 +020024#include <haproxy/cfgparse.h>
Willy Tarreau83487a82020-06-04 20:19:54 +020025#include <haproxy/cli.h>
Willy Tarreau36979d92020-06-05 17:27:29 +020026#include <haproxy/errors.h>
Willy Tarreau853b2972020-05-27 18:01:47 +020027#include <haproxy/list.h>
Willy Tarreauaeed4a82020-06-04 22:01:04 +020028#include <haproxy/log.h>
Willy Tarreau817538e2021-05-08 20:20:21 +020029#include <haproxy/proxy.h>
Willy Tarreaud2ad57c2020-06-03 19:43:35 +020030#include <haproxy/ring.h>
Willy Tarreau5edca2f2022-05-27 09:25:10 +020031#include <haproxy/sc_strm.h>
Willy Tarreau3727a8a2020-06-04 17:37:26 +020032#include <haproxy/signal.h>
Willy Tarreauba2f73d2020-06-03 20:02:28 +020033#include <haproxy/sink.h>
Willy Tarreaucb086c62022-05-27 09:47:12 +020034#include <haproxy/stconn.h>
Willy Tarreaub2551052020-06-09 09:07:15 +020035#include <haproxy/time.h>
Willy Tarreau4bad5e22021-05-08 13:05:30 +020036#include <haproxy/tools.h>
Willy Tarreau67b5a162019-08-11 16:38:56 +020037
38struct list sink_list = LIST_HEAD_INIT(sink_list);
39
Emeric Brun99c453d2020-05-25 15:01:04 +020040struct sink *cfg_sink;
41
Willy Tarreau67b5a162019-08-11 16:38:56 +020042struct sink *sink_find(const char *name)
43{
44 struct sink *sink;
45
46 list_for_each_entry(sink, &sink_list, sink_list)
47 if (strcmp(sink->name, name) == 0)
48 return sink;
49 return NULL;
50}
51
52/* creates a new sink and adds it to the list, it's still generic and not fully
53 * initialized. Returns NULL on allocation failure. If another one already
54 * exists with the same name, it will be returned. The caller can detect it as
55 * a newly created one has type SINK_TYPE_NEW.
56 */
Emeric Brun54648852020-07-06 15:54:06 +020057static struct sink *__sink_new(const char *name, const char *desc, int fmt)
Willy Tarreau67b5a162019-08-11 16:38:56 +020058{
59 struct sink *sink;
60
61 sink = sink_find(name);
62 if (sink)
63 goto end;
64
Emeric Brun494c5052020-05-28 11:13:15 +020065 sink = calloc(1, sizeof(*sink));
Willy Tarreau67b5a162019-08-11 16:38:56 +020066 if (!sink)
67 goto end;
68
Emeric Brun99c453d2020-05-25 15:01:04 +020069 sink->name = strdup(name);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010070 if (!sink->name)
71 goto err;
72
Emeric Brun99c453d2020-05-25 15:01:04 +020073 sink->desc = strdup(desc);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010074 if (!sink->desc)
75 goto err;
76
Willy Tarreau67b5a162019-08-11 16:38:56 +020077 sink->fmt = fmt;
78 sink->type = SINK_TYPE_NEW;
Christopher Fauleta63a5c22019-11-15 15:10:12 +010079 sink->maxlen = BUFSIZE;
Willy Tarreau67b5a162019-08-11 16:38:56 +020080 /* address will be filled by the caller if needed */
Willy Tarreau973e6622019-08-20 11:57:52 +020081 sink->ctx.fd = -1;
Willy Tarreau67b5a162019-08-11 16:38:56 +020082 sink->ctx.dropped = 0;
83 HA_RWLOCK_INIT(&sink->ctx.lock);
Willy Tarreau2b718102021-04-21 07:32:39 +020084 LIST_APPEND(&sink_list, &sink->sink_list);
Willy Tarreau67b5a162019-08-11 16:38:56 +020085 end:
86 return sink;
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010087
88 err:
Willy Tarreau61cfdf42021-02-20 10:46:51 +010089 ha_free(&sink->name);
90 ha_free(&sink->desc);
91 ha_free(&sink);
Tim Duesterhusa7ebffe2021-01-03 19:54:11 +010092
93 return NULL;
Willy Tarreau67b5a162019-08-11 16:38:56 +020094}
95
Willy Tarreau973e6622019-08-20 11:57:52 +020096/* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>,
97 * and description <desc>. Returns NULL on allocation failure or conflict.
98 * Perfect duplicates are merged (same type, fd, and name).
99 */
Emeric Brun54648852020-07-06 15:54:06 +0200100struct sink *sink_new_fd(const char *name, const char *desc, enum log_fmt fmt, int fd)
Willy Tarreau973e6622019-08-20 11:57:52 +0200101{
102 struct sink *sink;
103
104 sink = __sink_new(name, desc, fmt);
105 if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd))
106 goto end;
107
108 if (sink->type != SINK_TYPE_NEW) {
109 sink = NULL;
110 goto end;
111 }
112
113 sink->type = SINK_TYPE_FD;
114 sink->ctx.fd = fd;
115 end:
116 return sink;
117}
118
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200119/* creates a sink called <name> of type BUF of size <size>, format <fmt>,
120 * and description <desc>. Returns NULL on allocation failure or conflict.
121 * Perfect duplicates are merged (same type and name). If sizes differ, the
122 * largest one is kept.
123 */
Emeric Brun54648852020-07-06 15:54:06 +0200124struct sink *sink_new_buf(const char *name, const char *desc, enum log_fmt fmt, size_t size)
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200125{
126 struct sink *sink;
127
128 sink = __sink_new(name, desc, fmt);
129 if (!sink)
130 goto fail;
131
132 if (sink->type == SINK_TYPE_BUFFER) {
133 /* such a buffer already exists, we may have to resize it */
134 if (!ring_resize(sink->ctx.ring, size))
135 goto fail;
136 goto end;
137 }
138
139 if (sink->type != SINK_TYPE_NEW) {
140 /* already exists of another type */
141 goto fail;
142 }
143
144 sink->ctx.ring = ring_new(size);
145 if (!sink->ctx.ring) {
Willy Tarreau2b718102021-04-21 07:32:39 +0200146 LIST_DELETE(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +0200147 free(sink->name);
148 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200149 free(sink);
150 goto fail;
151 }
152
153 sink->type = SINK_TYPE_BUFFER;
154 end:
155 return sink;
156 fail:
157 return NULL;
158}
159
Willy Tarreau67b5a162019-08-11 16:38:56 +0200160/* tries to send <nmsg> message parts (up to 8, ignored above) from message
Ilya Shipitsin46a030c2020-07-05 16:36:08 +0500161 * array <msg> to sink <sink>. Formatting according to the sink's preference is
Willy Tarreau8f240232019-08-27 16:41:06 +0200162 * done here. Lost messages are NOT accounted for. It is preferable to call
163 * sink_write() instead which will also try to emit the number of dropped
164 * messages when there are any. It returns >0 if it could write anything,
165 * <=0 otherwise.
Willy Tarreau67b5a162019-08-11 16:38:56 +0200166 */
Emeric Brun54648852020-07-06 15:54:06 +0200167 ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg,
168 int level, int facility, struct ist *metadata)
169 {
170 struct ist *pfx = NULL;
Willy Tarreaua1426de2019-08-27 14:21:02 +0200171 size_t npfx = 0;
Emeric Brunbd163812020-05-06 14:33:46 +0200172
Emeric Brun54648852020-07-06 15:54:06 +0200173 if (sink->fmt == LOG_FORMAT_RAW)
Emeric Brunbd163812020-05-06 14:33:46 +0200174 goto send;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200175
Emeric Brun54648852020-07-06 15:54:06 +0200176 pfx = build_log_header(sink->fmt, level, facility, metadata, &npfx);
Emeric Brunbd163812020-05-06 14:33:46 +0200177
178send:
Willy Tarreau973e6622019-08-20 11:57:52 +0200179 if (sink->type == SINK_TYPE_FD) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200180 return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +0200181 }
182 else if (sink->type == SINK_TYPE_BUFFER) {
Willy Tarreau8f240232019-08-27 16:41:06 +0200183 return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
Willy Tarreau973e6622019-08-20 11:57:52 +0200184 }
Willy Tarreau8f240232019-08-27 16:41:06 +0200185 return 0;
186}
Willy Tarreau67b5a162019-08-11 16:38:56 +0200187
Willy Tarreau8f240232019-08-27 16:41:06 +0200188/* Tries to emit a message indicating the number of dropped events. In case of
189 * success, the amount of drops is reduced by as much. It's supposed to be
190 * called under an exclusive lock on the sink to avoid multiple produces doing
191 * the same. On success, >0 is returned, otherwise <=0 on failure.
192 */
Emeric Brun54648852020-07-06 15:54:06 +0200193int sink_announce_dropped(struct sink *sink, int facility)
Willy Tarreau8f240232019-08-27 16:41:06 +0200194{
Emeric Brun54648852020-07-06 15:54:06 +0200195 static THREAD_LOCAL struct ist metadata[LOG_META_FIELDS];
196 static THREAD_LOCAL pid_t curr_pid;
197 static THREAD_LOCAL char pidstr[16];
Willy Tarreau8f240232019-08-27 16:41:06 +0200198 unsigned int dropped;
199 struct buffer msg;
200 struct ist msgvec[1];
201 char logbuf[64];
202
203 while (unlikely((dropped = sink->ctx.dropped) > 0)) {
204 chunk_init(&msg, logbuf, sizeof(logbuf));
205 chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
206 msgvec[0] = ist2(msg.area, msg.data);
Emeric Brunbd163812020-05-06 14:33:46 +0200207
Emeric Brun54648852020-07-06 15:54:06 +0200208 if (!metadata[LOG_META_HOST].len) {
209 if (global.log_send_hostname)
Tim Duesterhus77508502022-03-15 13:11:06 +0100210 metadata[LOG_META_HOST] = ist(global.log_send_hostname);
Emeric Brun54648852020-07-06 15:54:06 +0200211 }
212
213 if (!metadata[LOG_META_TAG].len)
214 metadata[LOG_META_TAG] = ist2(global.log_tag.area, global.log_tag.data);
215
216 if (unlikely(curr_pid != getpid()))
217 metadata[LOG_META_PID].len = 0;
218
219 if (!metadata[LOG_META_PID].len) {
220 curr_pid = getpid();
221 ltoa_o(curr_pid, pidstr, sizeof(pidstr));
222 metadata[LOG_META_PID] = ist2(pidstr, strlen(pidstr));
223 }
224
225 if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, metadata) <= 0)
Willy Tarreau8f240232019-08-27 16:41:06 +0200226 return 0;
227 /* success! */
228 HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
229 }
230 return 1;
Willy Tarreau67b5a162019-08-11 16:38:56 +0200231}
232
Willy Tarreau9f830d72019-08-26 18:17:04 +0200233/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
234static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
235{
236 struct sink *sink;
Willy Tarreaucba88382022-05-05 15:18:57 +0200237 uint ring_flags;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200238 int arg;
Willy Tarreau9f830d72019-08-26 18:17:04 +0200239
240 args++; // make args[1] the 1st arg
241
242 if (!*args[1]) {
243 /* no arg => report the list of supported sink */
Willy Tarreau1d181e42019-08-30 11:17:01 +0200244 chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
Willy Tarreau9f830d72019-08-26 18:17:04 +0200245 list_for_each_entry(sink, &sink_list, sink_list) {
246 chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n",
247 sink->name,
248 sink->type == SINK_TYPE_NEW ? "init" :
249 sink->type == SINK_TYPE_FD ? "fd" :
250 sink->type == SINK_TYPE_BUFFER ? "buffer" : "?",
251 sink->ctx.dropped, sink->desc);
252 }
253
254 trash.area[trash.data] = 0;
255 return cli_msg(appctx, LOG_WARNING, trash.area);
256 }
257
258 if (!cli_has_level(appctx, ACCESS_LVL_OPER))
259 return 1;
260
261 sink = sink_find(args[1]);
262 if (!sink)
263 return cli_err(appctx, "No such event sink");
264
265 if (sink->type != SINK_TYPE_BUFFER)
266 return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
267
Willy Tarreaucba88382022-05-05 15:18:57 +0200268 ring_flags = 0;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200269 for (arg = 2; *args[arg]; arg++) {
270 if (strcmp(args[arg], "-w") == 0)
Willy Tarreaucba88382022-05-05 15:18:57 +0200271 ring_flags |= RING_WF_WAIT_MODE;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200272 else if (strcmp(args[arg], "-n") == 0)
Willy Tarreaucba88382022-05-05 15:18:57 +0200273 ring_flags |= RING_WF_SEEK_NEW;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200274 else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
Willy Tarreaucba88382022-05-05 15:18:57 +0200275 ring_flags |= RING_WF_WAIT_MODE | RING_WF_SEEK_NEW;
Willy Tarreau1d181e42019-08-30 11:17:01 +0200276 else
277 return cli_err(appctx, "unknown option");
278 }
Willy Tarreaucba88382022-05-05 15:18:57 +0200279 return ring_attach_cli(sink->ctx.ring, appctx, ring_flags);
Willy Tarreau9f830d72019-08-26 18:17:04 +0200280}
281
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500282/* Pre-configures a ring proxy to emit connections */
Emeric Brun494c5052020-05-28 11:13:15 +0200283void sink_setup_proxy(struct proxy *px)
284{
285 px->last_change = now.tv_sec;
286 px->cap = PR_CAP_FE | PR_CAP_BE;
287 px->maxconn = 0;
288 px->conn_retries = 1;
289 px->timeout.server = TICK_ETERNITY;
290 px->timeout.client = TICK_ETERNITY;
291 px->timeout.connect = TICK_ETERNITY;
292 px->accept = NULL;
293 px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
Emeric Brun494c5052020-05-28 11:13:15 +0200294}
295
296/*
Willy Tarreau42cc8312022-05-04 20:42:23 +0200297 * IO Handler to handle message push to syslog tcp server.
298 * It takes its context from appctx->svcctx.
Emeric Brun494c5052020-05-28 11:13:15 +0200299 */
300static void sink_forward_io_handler(struct appctx *appctx)
301{
Willy Tarreauc12b3212022-05-27 11:08:15 +0200302 struct stconn *sc = appctx_sc(appctx);
Willy Tarreau0eca5392022-05-27 10:44:25 +0200303 struct stream *s = __sc_strm(sc);
Emeric Brun494c5052020-05-28 11:13:15 +0200304 struct sink *sink = strm_fe(s)->parent;
Willy Tarreau42cc8312022-05-04 20:42:23 +0200305 struct sink_forward_target *sft = appctx->svcctx;
Emeric Brun494c5052020-05-28 11:13:15 +0200306 struct ring *ring = sink->ctx.ring;
307 struct buffer *buf = &ring->buf;
308 uint64_t msg_len;
Willy Tarreau53bfab02022-08-04 17:18:54 +0200309 size_t len, cnt, ofs, last_ofs;
Emeric Brun494c5052020-05-28 11:13:15 +0200310 int ret = 0;
311
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500312 /* if stopping was requested, close immediately */
Emeric Brun494c5052020-05-28 11:13:15 +0200313 if (unlikely(stopping))
314 goto close;
315
316 /* for rex because it seems reset to timeout
317 * and we don't want expire on this case
318 * with a syslog server
319 */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200320 sc_oc(sc)->rex = TICK_ETERNITY;
Emeric Brun494c5052020-05-28 11:13:15 +0200321 /* rto should not change but it seems the case */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200322 sc_oc(sc)->rto = TICK_ETERNITY;
Emeric Brun494c5052020-05-28 11:13:15 +0200323
324 /* an error was detected */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200325 if (unlikely(sc_ic(sc)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
Emeric Brun494c5052020-05-28 11:13:15 +0200326 goto close;
327
328 /* con closed by server side */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200329 if ((sc_oc(sc)->flags & CF_SHUTW))
Emeric Brun494c5052020-05-28 11:13:15 +0200330 goto close;
331
332 /* if the connection is not established, inform the stream that we want
333 * to be notified whenever the connection completes.
334 */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200335 if (sc_opposite(sc)->state < SC_ST_EST) {
Willy Tarreau90e8b452022-05-25 18:21:43 +0200336 applet_need_more_data(appctx);
Willy Tarreaub23edc82022-05-24 16:49:03 +0200337 se_need_remote_conn(appctx->sedesc);
Willy Tarreau4164eb92022-05-25 15:42:03 +0200338 applet_have_more_data(appctx);
Emeric Brun494c5052020-05-28 11:13:15 +0200339 return;
340 }
341
342 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
343 if (appctx != sft->appctx) {
344 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
345 goto close;
346 }
347 ofs = sft->ofs;
348
349 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
350 LIST_DEL_INIT(&appctx->wait_entry);
351 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
352
353 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
354
355 /* explanation for the initialization below: it would be better to do
356 * this in the parsing function but this would occasionally result in
357 * dropped events because we'd take a reference on the oldest message
358 * and keep it while being scheduled. Thus instead let's take it the
359 * first time we enter here so that we have a chance to pass many
360 * existing messages before grabbing a reference to a location. This
361 * value cannot be produced after initialization.
362 */
363 if (unlikely(ofs == ~0)) {
364 ofs = 0;
365
Willy Tarreau4781b152021-04-06 13:53:36 +0200366 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun494c5052020-05-28 11:13:15 +0200367 ofs += ring->ofs;
368 }
369
Emeric Brun494c5052020-05-28 11:13:15 +0200370 /* in this loop, ofs always points to the counter byte that precedes
371 * the message so that we can take our reference there if we have to
372 * stop before the end (ret=0).
373 */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200374 if (sc_opposite(sc)->state == SC_ST_EST) {
Emeric Brunfdabf492020-12-02 17:02:09 +0100375 /* we were already there, adjust the offset to be relative to
376 * the buffer's head and remove us from the counter.
377 */
378 ofs -= ring->ofs;
379 BUG_ON(ofs >= buf->size);
Willy Tarreau4781b152021-04-06 13:53:36 +0200380 HA_ATOMIC_DEC(b_peek(buf, ofs));
Emeric Brunfdabf492020-12-02 17:02:09 +0100381
Emeric Brun494c5052020-05-28 11:13:15 +0200382 ret = 1;
383 while (ofs + 1 < b_data(buf)) {
384 cnt = 1;
385 len = b_peek_varint(buf, ofs + cnt, &msg_len);
386 if (!len)
387 break;
388 cnt += len;
389 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
390
391 if (unlikely(msg_len + 1 > b_size(&trash))) {
392 /* too large a message to ever fit, let's skip it */
393 ofs += cnt + msg_len;
394 continue;
395 }
396
397 chunk_reset(&trash);
398 len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
399 trash.data += len;
400 trash.area[trash.data++] = '\n';
401
Willy Tarreaud0a06d52022-05-18 15:07:19 +0200402 if (applet_putchk(appctx, &trash) == -1) {
Emeric Brun494c5052020-05-28 11:13:15 +0200403 ret = 0;
404 break;
405 }
406 ofs += cnt + msg_len;
407 }
408
Willy Tarreau4781b152021-04-06 13:53:36 +0200409 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun494c5052020-05-28 11:13:15 +0200410 ofs += ring->ofs;
411 sft->ofs = ofs;
Willy Tarreau53bfab02022-08-04 17:18:54 +0200412 last_ofs = ring->ofs;
Emeric Brun494c5052020-05-28 11:13:15 +0200413 }
414 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
415
416 if (ret) {
417 /* let's be woken up once new data arrive */
418 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau2b718102021-04-21 07:32:39 +0200419 LIST_APPEND(&ring->waiters, &appctx->wait_entry);
Willy Tarreau53bfab02022-08-04 17:18:54 +0200420 ofs = ring->ofs;
Emeric Brun494c5052020-05-28 11:13:15 +0200421 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau53bfab02022-08-04 17:18:54 +0200422 if (ofs != last_ofs) {
423 /* more data was added into the ring between the
424 * unlock and the lock, and the writer might not
425 * have seen us. We need to reschedule a read.
426 */
427 applet_have_more_data(appctx);
428 } else
429 applet_have_no_more_data(appctx);
Emeric Brun494c5052020-05-28 11:13:15 +0200430 }
431 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
432
433 /* always drain data from server */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200434 co_skip(sc_oc(sc), sc_oc(sc)->output);
Emeric Brun494c5052020-05-28 11:13:15 +0200435 return;
436
437close:
Willy Tarreau0eca5392022-05-27 10:44:25 +0200438 sc_shutw(sc);
439 sc_shutr(sc);
440 sc_ic(sc)->flags |= CF_READ_NULL;
Emeric Brun494c5052020-05-28 11:13:15 +0200441}
442
Emeric Brun97556472020-05-30 01:42:45 +0200443/*
444 * IO Handler to handle message push to syslog tcp server
445 * using octet counting frames
Willy Tarreau42cc8312022-05-04 20:42:23 +0200446 * It takes its context from appctx->svcctx.
Emeric Brun97556472020-05-30 01:42:45 +0200447 */
448static void sink_forward_oc_io_handler(struct appctx *appctx)
449{
Willy Tarreauc12b3212022-05-27 11:08:15 +0200450 struct stconn *sc = appctx_sc(appctx);
Willy Tarreau0eca5392022-05-27 10:44:25 +0200451 struct stream *s = __sc_strm(sc);
Emeric Brun97556472020-05-30 01:42:45 +0200452 struct sink *sink = strm_fe(s)->parent;
Willy Tarreau42cc8312022-05-04 20:42:23 +0200453 struct sink_forward_target *sft = appctx->svcctx;
Emeric Brun97556472020-05-30 01:42:45 +0200454 struct ring *ring = sink->ctx.ring;
455 struct buffer *buf = &ring->buf;
456 uint64_t msg_len;
457 size_t len, cnt, ofs;
458 int ret = 0;
459 char *p;
460
Ilya Shipitsin47d17182020-06-21 21:42:57 +0500461 /* if stopping was requested, close immediately */
Emeric Brun97556472020-05-30 01:42:45 +0200462 if (unlikely(stopping))
463 goto close;
464
465 /* for rex because it seems reset to timeout
466 * and we don't want expire on this case
467 * with a syslog server
468 */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200469 sc_oc(sc)->rex = TICK_ETERNITY;
Emeric Brun97556472020-05-30 01:42:45 +0200470 /* rto should not change but it seems the case */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200471 sc_oc(sc)->rto = TICK_ETERNITY;
Emeric Brun97556472020-05-30 01:42:45 +0200472
473 /* an error was detected */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200474 if (unlikely(sc_ic(sc)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
Emeric Brun97556472020-05-30 01:42:45 +0200475 goto close;
476
477 /* con closed by server side */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200478 if ((sc_oc(sc)->flags & CF_SHUTW))
Emeric Brun97556472020-05-30 01:42:45 +0200479 goto close;
480
481 /* if the connection is not established, inform the stream that we want
482 * to be notified whenever the connection completes.
483 */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200484 if (sc_opposite(sc)->state < SC_ST_EST) {
Willy Tarreau90e8b452022-05-25 18:21:43 +0200485 applet_need_more_data(appctx);
Willy Tarreaub23edc82022-05-24 16:49:03 +0200486 se_need_remote_conn(appctx->sedesc);
Willy Tarreau4164eb92022-05-25 15:42:03 +0200487 applet_have_more_data(appctx);
Emeric Brun97556472020-05-30 01:42:45 +0200488 return;
489 }
490
491 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
492 if (appctx != sft->appctx) {
493 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
494 goto close;
495 }
496 ofs = sft->ofs;
497
498 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
499 LIST_DEL_INIT(&appctx->wait_entry);
500 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
501
502 HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
503
504 /* explanation for the initialization below: it would be better to do
505 * this in the parsing function but this would occasionally result in
506 * dropped events because we'd take a reference on the oldest message
507 * and keep it while being scheduled. Thus instead let's take it the
508 * first time we enter here so that we have a chance to pass many
509 * existing messages before grabbing a reference to a location. This
510 * value cannot be produced after initialization.
511 */
512 if (unlikely(ofs == ~0)) {
513 ofs = 0;
514
Willy Tarreau4781b152021-04-06 13:53:36 +0200515 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun97556472020-05-30 01:42:45 +0200516 ofs += ring->ofs;
517 }
518
Emeric Brun97556472020-05-30 01:42:45 +0200519 /* in this loop, ofs always points to the counter byte that precedes
520 * the message so that we can take our reference there if we have to
521 * stop before the end (ret=0).
522 */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200523 if (sc_opposite(sc)->state == SC_ST_EST) {
Emeric Brunfdabf492020-12-02 17:02:09 +0100524 /* we were already there, adjust the offset to be relative to
525 * the buffer's head and remove us from the counter.
526 */
527 ofs -= ring->ofs;
528 BUG_ON(ofs >= buf->size);
Willy Tarreau4781b152021-04-06 13:53:36 +0200529 HA_ATOMIC_DEC(b_peek(buf, ofs));
Emeric Brunfdabf492020-12-02 17:02:09 +0100530
Emeric Brun97556472020-05-30 01:42:45 +0200531 ret = 1;
532 while (ofs + 1 < b_data(buf)) {
533 cnt = 1;
534 len = b_peek_varint(buf, ofs + cnt, &msg_len);
535 if (!len)
536 break;
537 cnt += len;
538 BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
539
540 chunk_reset(&trash);
541 p = ulltoa(msg_len, trash.area, b_size(&trash));
542 if (p) {
543 trash.data = (p - trash.area) + 1;
544 *p = ' ';
545 }
546
547 if (!p || (trash.data + msg_len > b_size(&trash))) {
548 /* too large a message to ever fit, let's skip it */
549 ofs += cnt + msg_len;
550 continue;
551 }
552
553 trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
554
Willy Tarreaud0a06d52022-05-18 15:07:19 +0200555 if (applet_putchk(appctx, &trash) == -1) {
Emeric Brun97556472020-05-30 01:42:45 +0200556 ret = 0;
557 break;
558 }
559 ofs += cnt + msg_len;
560 }
561
Willy Tarreau4781b152021-04-06 13:53:36 +0200562 HA_ATOMIC_INC(b_peek(buf, ofs));
Emeric Brun97556472020-05-30 01:42:45 +0200563 ofs += ring->ofs;
564 sft->ofs = ofs;
565 }
566 HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
567
568 if (ret) {
569 /* let's be woken up once new data arrive */
570 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau2b718102021-04-21 07:32:39 +0200571 LIST_APPEND(&ring->waiters, &appctx->wait_entry);
Emeric Brun97556472020-05-30 01:42:45 +0200572 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
Willy Tarreau4164eb92022-05-25 15:42:03 +0200573 applet_have_no_more_data(appctx);
Emeric Brun97556472020-05-30 01:42:45 +0200574 }
575 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
576
577 /* always drain data from server */
Willy Tarreau0eca5392022-05-27 10:44:25 +0200578 co_skip(sc_oc(sc), sc_oc(sc)->output);
Emeric Brun97556472020-05-30 01:42:45 +0200579 return;
580
581close:
Willy Tarreau0eca5392022-05-27 10:44:25 +0200582 sc_shutw(sc);
583 sc_shutr(sc);
584 sc_ic(sc)->flags |= CF_READ_NULL;
Emeric Brun97556472020-05-30 01:42:45 +0200585}
586
Emeric Brun494c5052020-05-28 11:13:15 +0200587void __sink_forward_session_deinit(struct sink_forward_target *sft)
588{
Willy Tarreau0698c802022-05-11 14:09:57 +0200589 struct stream *s = appctx_strm(sft->appctx);
Emeric Brun494c5052020-05-28 11:13:15 +0200590 struct sink *sink;
591
Emeric Brun494c5052020-05-28 11:13:15 +0200592 sink = strm_fe(s)->parent;
593 if (!sink)
594 return;
595
596 HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
597 LIST_DEL_INIT(&sft->appctx->wait_entry);
598 HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock);
599
600 sft->appctx = NULL;
601 task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
602}
603
Christopher Fauletaee57fc2022-05-12 15:34:48 +0200604static int sink_forward_session_init(struct appctx *appctx)
605{
606 struct sink_forward_target *sft = appctx->svcctx;
607 struct stream *s;
608 struct sockaddr_storage *addr = NULL;
609
610 if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr)))
611 goto out_error;
612
613 if (appctx_finalize_startup(appctx, sft->sink->forward_px, &BUF_NULL) == -1)
614 goto out_free_addr;
615
616 s = appctx_strm(appctx);
Willy Tarreau7cb9e6c2022-05-17 19:40:40 +0200617 s->scb->dst = addr;
Willy Tarreaucb041662022-05-17 19:44:42 +0200618 s->scb->flags |= SC_FL_NOLINGER;
Christopher Fauletaee57fc2022-05-12 15:34:48 +0200619
620 s->target = &sft->srv->obj_type;
621 s->flags = SF_ASSIGNED;
622
623 s->do_log = NULL;
624 s->uniq_id = 0;
625
626 s->res.flags |= CF_READ_DONTWAIT;
627 /* for rto and rex to eternity to not expire on idle recv:
628 * We are using a syslog server.
629 */
630 s->res.rto = TICK_ETERNITY;
631 s->res.rex = TICK_ETERNITY;
632 sft->appctx = appctx;
633
634 return 0;
635
636 out_free_addr:
637 sockaddr_free(&addr);
638 out_error:
639 return -1;
640}
Emeric Brun494c5052020-05-28 11:13:15 +0200641
642static void sink_forward_session_release(struct appctx *appctx)
643{
Willy Tarreau42cc8312022-05-04 20:42:23 +0200644 struct sink_forward_target *sft = appctx->svcctx;
Emeric Brun494c5052020-05-28 11:13:15 +0200645
646 if (!sft)
647 return;
648
649 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
650 if (sft->appctx == appctx)
651 __sink_forward_session_deinit(sft);
652 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
653}
654
655static struct applet sink_forward_applet = {
656 .obj_type = OBJ_TYPE_APPLET,
657 .name = "<SINKFWD>", /* used for logging */
658 .fct = sink_forward_io_handler,
Christopher Fauletaee57fc2022-05-12 15:34:48 +0200659 .init = sink_forward_session_init,
Emeric Brun494c5052020-05-28 11:13:15 +0200660 .release = sink_forward_session_release,
661};
662
Emeric Brun97556472020-05-30 01:42:45 +0200663static struct applet sink_forward_oc_applet = {
664 .obj_type = OBJ_TYPE_APPLET,
665 .name = "<SINKFWDOC>", /* used for logging */
666 .fct = sink_forward_oc_io_handler,
Christopher Fauletaee57fc2022-05-12 15:34:48 +0200667 .init = sink_forward_session_init,
Emeric Brun97556472020-05-30 01:42:45 +0200668 .release = sink_forward_session_release,
669};
670
Emeric Brun494c5052020-05-28 11:13:15 +0200671/*
672 * Create a new peer session in assigned state (connect will start automatically)
Willy Tarreau42cc8312022-05-04 20:42:23 +0200673 * It sets its context into appctx->svcctx.
Emeric Brun494c5052020-05-28 11:13:15 +0200674 */
675static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
676{
Emeric Brun494c5052020-05-28 11:13:15 +0200677 struct appctx *appctx;
Emeric Brun97556472020-05-30 01:42:45 +0200678 struct applet *applet = &sink_forward_applet;
679
680 if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
681 applet = &sink_forward_oc_applet;
Emeric Brun494c5052020-05-28 11:13:15 +0200682
Christopher Faulet6095d572022-05-16 17:09:48 +0200683 appctx = appctx_new_here(applet, NULL);
Christopher Faulet2479e5f2022-01-19 14:50:11 +0100684 if (!appctx)
Christopher Fauleta9e8b392022-03-23 11:01:09 +0100685 goto out_close;
Willy Tarreau42cc8312022-05-04 20:42:23 +0200686 appctx->svcctx = (void *)sft;
Emeric Brun494c5052020-05-28 11:13:15 +0200687
Christopher Fauletaee57fc2022-05-12 15:34:48 +0200688 if (appctx_init(appctx) == -1)
Christopher Faulet92202da2022-05-11 12:22:10 +0200689 goto out_free_appctx;
Christopher Faulet13a35e52021-12-20 15:34:16 +0100690
Emeric Brun494c5052020-05-28 11:13:15 +0200691 return appctx;
692
693 /* Error unrolling */
Emeric Brun494c5052020-05-28 11:13:15 +0200694 out_free_appctx:
Christopher Fauletaee57fc2022-05-12 15:34:48 +0200695 appctx_free_on_early_error(appctx);
Emeric Brun494c5052020-05-28 11:13:15 +0200696 out_close:
697 return NULL;
698}
699
700/*
701 * Task to handle connctions to forward servers
702 */
Willy Tarreau144f84a2021-03-02 16:09:26 +0100703static struct task *process_sink_forward(struct task * task, void *context, unsigned int state)
Emeric Brun494c5052020-05-28 11:13:15 +0200704{
705 struct sink *sink = (struct sink *)context;
706 struct sink_forward_target *sft = sink->sft;
707
708 task->expire = TICK_ETERNITY;
709
710 if (!stopping) {
711 while (sft) {
712 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
713 /* if appctx is NULL, start a new session */
714 if (!sft->appctx)
715 sft->appctx = sink_forward_session_create(sink, sft);
716 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
717 sft = sft->next;
718 }
719 }
720 else {
721 while (sft) {
722 HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
723 /* awake applet to perform a clean close */
724 if (sft->appctx)
725 appctx_wakeup(sft->appctx);
726 HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
727 sft = sft->next;
728 }
729 }
730
731 return task;
732}
733/*
734 * Init task to manage connctions to forward servers
735 *
736 * returns 0 in case of error.
737 */
738int sink_init_forward(struct sink *sink)
739{
Willy Tarreaubeeabf52021-10-01 18:23:30 +0200740 sink->forward_task = task_new_anywhere();
Emeric Brun494c5052020-05-28 11:13:15 +0200741 if (!sink->forward_task)
742 return 0;
743
744 sink->forward_task->process = process_sink_forward;
745 sink->forward_task->context = (void *)sink;
746 sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
747 task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
748 return 1;
749}
Emeric Brun99c453d2020-05-25 15:01:04 +0200750/*
751 * Parse "ring" section and create corresponding sink buffer.
752 *
753 * The function returns 0 in success case, otherwise, it returns error
754 * flags.
755 */
756int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
757{
758 int err_code = 0;
759 const char *inv;
760 size_t size = BUFSIZE;
Emeric Brun494c5052020-05-28 11:13:15 +0200761 struct proxy *p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200762
763 if (strcmp(args[0], "ring") == 0) { /* new peers section */
764 if (!*args[1]) {
765 ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum);
766 err_code |= ERR_ALERT | ERR_FATAL;
767 goto err;
768 }
769
770 inv = invalid_char(args[1]);
771 if (inv) {
772 ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv);
773 err_code |= ERR_ALERT | ERR_FATAL;
774 goto err;
775 }
776
777 if (sink_find(args[1])) {
778 ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]);
779 err_code |= ERR_ALERT | ERR_FATAL;
780 goto err;
781 }
782
Emeric Brun54648852020-07-06 15:54:06 +0200783 cfg_sink = sink_new_buf(args[1], args[1], LOG_FORMAT_RAW, size);
Emeric Brun99c453d2020-05-25 15:01:04 +0200784 if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) {
785 ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]);
786 err_code |= ERR_ALERT | ERR_FATAL;
787 goto err;
788 }
Emeric Brun494c5052020-05-28 11:13:15 +0200789
790 /* allocate new proxy to handle forwards */
791 p = calloc(1, sizeof *p);
792 if (!p) {
793 ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
794 err_code |= ERR_ALERT | ERR_FATAL;
795 goto err;
796 }
797
798 init_new_proxy(p);
799 sink_setup_proxy(p);
800 p->parent = cfg_sink;
801 p->id = strdup(args[1]);
802 p->conf.args.file = p->conf.file = strdup(file);
803 p->conf.args.line = p->conf.line = linenum;
804 cfg_sink->forward_px = p;
Emeric Brun99c453d2020-05-25 15:01:04 +0200805 }
806 else if (strcmp(args[0], "size") == 0) {
807 size = atol(args[1]);
808 if (!size) {
809 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
810 err_code |= ERR_ALERT | ERR_FATAL;
811 goto err;
812 }
813
814 if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)
815 || !ring_resize(cfg_sink->ctx.ring, size)) {
816 ha_alert("parsing [%s:%d] : fail to set sink buffer size '%s'.\n", file, linenum, args[1]);
817 err_code |= ERR_ALERT | ERR_FATAL;
818 goto err;
819 }
820 }
Emeric Brun494c5052020-05-28 11:13:15 +0200821 else if (strcmp(args[0],"server") == 0) {
Amaury Denoyelle30c05372021-03-08 16:36:46 +0100822 err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL,
823 SRV_PARSE_PARSE_ADDR|SRV_PARSE_INITIAL_RESOLVE);
Emeric Brun494c5052020-05-28 11:13:15 +0200824 }
825 else if (strcmp(args[0],"timeout") == 0) {
826 if (!cfg_sink || !cfg_sink->forward_px) {
827 ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
828 err_code |= ERR_ALERT | ERR_FATAL;
829 goto err;
830 }
831
832 if (strcmp(args[1], "connect") == 0 ||
833 strcmp(args[1], "server") == 0) {
834 const char *res;
835 unsigned int tout;
836
837 if (!*args[2]) {
838 ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
839 file, linenum, args[0], args[1]);
840 err_code |= ERR_ALERT | ERR_FATAL;
841 goto err;
842 }
843 res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
844 if (res == PARSE_TIME_OVER) {
845 ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
846 file, linenum, args[2], args[0], args[1]);
847 err_code |= ERR_ALERT | ERR_FATAL;
848 goto err;
849 }
850 else if (res == PARSE_TIME_UNDER) {
851 ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
852 file, linenum, args[2], args[0], args[1]);
853 err_code |= ERR_ALERT | ERR_FATAL;
854 goto err;
855 }
856 else if (res) {
857 ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
858 file, linenum, *res, args[0], args[1]);
859 err_code |= ERR_ALERT | ERR_FATAL;
860 goto err;
861 }
862 if (args[1][2] == 'c')
863 cfg_sink->forward_px->timeout.connect = tout;
864 else
865 cfg_sink->forward_px->timeout.server = tout;
866 }
867 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200868 else if (strcmp(args[0],"format") == 0) {
869 if (!cfg_sink) {
870 ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
871 err_code |= ERR_ALERT | ERR_FATAL;
872 goto err;
873 }
874
Emeric Brun54648852020-07-06 15:54:06 +0200875 cfg_sink->fmt = get_log_format(args[1]);
876 if (cfg_sink->fmt == LOG_FORMAT_UNSPEC) {
Emeric Brun99c453d2020-05-25 15:01:04 +0200877 ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]);
878 err_code |= ERR_ALERT | ERR_FATAL;
879 goto err;
880 }
881 }
882 else if (strcmp(args[0],"maxlen") == 0) {
883 if (!cfg_sink) {
884 ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]);
885 err_code |= ERR_ALERT | ERR_FATAL;
886 goto err;
887 }
888
889 cfg_sink->maxlen = atol(args[1]);
890 if (!cfg_sink->maxlen) {
891 ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
892 err_code |= ERR_ALERT | ERR_FATAL;
893 goto err;
894 }
895 }
896 else if (strcmp(args[0],"description") == 0) {
897 if (!cfg_sink) {
898 ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]);
899 err_code |= ERR_ALERT | ERR_FATAL;
900 goto err;
901 }
902
903 if (!*args[1]) {
904 ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum);
905 err_code |= ERR_ALERT | ERR_FATAL;
906 goto err;
907 }
908
909 free(cfg_sink->desc);
910
911 cfg_sink->desc = strdup(args[1]);
912 if (!cfg_sink->desc) {
913 ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]);
914 err_code |= ERR_ALERT | ERR_FATAL;
915 goto err;
916 }
917 }
Emeric Brun9f2ff3a2020-05-29 15:47:52 +0200918 else {
919 ha_alert("parsing [%s:%d] : unknown statement '%s'.\n", file, linenum, args[0]);
920 err_code |= ERR_ALERT | ERR_FATAL;
921 goto err;
922 }
Emeric Brun99c453d2020-05-25 15:01:04 +0200923
924err:
925 return err_code;
926}
927
Emeric Brun94aab062021-04-02 10:41:36 +0200928/* Creates an new sink buffer from a log server.
929 *
930 * It uses the logsrvaddress to declare a forward
931 * server for this buffer. And it initializes the
932 * forwarding.
933 *
934 * The function returns a pointer on the
935 * allocated struct sink if allocate
936 * and initialize succeed, else if it fails
937 * it returns NULL.
938 *
939 * Note: the sink is created using the name
Ilya Shipitsinb2be9a12021-04-24 13:25:42 +0500940 * specified into logsrv->ring_name
Emeric Brun94aab062021-04-02 10:41:36 +0200941 */
942struct sink *sink_new_from_logsrv(struct logsrv *logsrv)
943{
944 struct proxy *p = NULL;
945 struct sink *sink = NULL;
946 struct server *srv = NULL;
947 struct sink_forward_target *sft = NULL;
Emeric Brun94aab062021-04-02 10:41:36 +0200948
949 /* allocate new proxy to handle
950 * forward to a stream server
951 */
952 p = calloc(1, sizeof *p);
953 if (!p) {
954 goto error;
955 }
956
957 init_new_proxy(p);
958 sink_setup_proxy(p);
959 p->id = strdup(logsrv->ring_name);
960 p->conf.args.file = p->conf.file = strdup(logsrv->conf.file);
961 p->conf.args.line = p->conf.line = logsrv->conf.line;
962
963 /* allocate a new server to forward messages
964 * from ring buffer
965 */
966 srv = new_server(p);
967 if (!srv)
968 goto error;
969
970 /* init server */
971 srv->id = strdup(logsrv->ring_name);
972 srv->conf.file = strdup(logsrv->conf.file);
973 srv->conf.line = logsrv->conf.line;
974 srv->addr = logsrv->addr;
975 srv->svc_port = get_host_port(&logsrv->addr);
976 HA_SPIN_INIT(&srv->lock);
977
978 /* process per thread init */
Miroslav Zagorac8a8f2702021-06-15 15:33:20 +0200979 if (srv_init_per_thr(srv) == -1)
Emeric Brun94aab062021-04-02 10:41:36 +0200980 goto error;
981
Emeric Brun94aab062021-04-02 10:41:36 +0200982 /* the servers are linked backwards
983 * first into proxy
984 */
985 p->srv = srv;
986 srv->next = p->srv;
987
988 /* allocate sink_forward_target descriptor */
989 sft = calloc(1, sizeof(*sft));
990 if (!sft)
991 goto error;
992
993 /* init sink_forward_target offset */
994 sft->srv = srv;
995 sft->appctx = NULL;
996 sft->ofs = ~0;
997 HA_SPIN_INIT(&sft->lock);
998
Ilya Shipitsinb2be9a12021-04-24 13:25:42 +0500999 /* prepare description for the sink */
Emeric Brun94aab062021-04-02 10:41:36 +02001000 chunk_reset(&trash);
1001 chunk_printf(&trash, "created from logserver declared into '%s' at line %d", logsrv->conf.file, logsrv->conf.line);
1002
1003 /* allocate a new sink buffer */
1004 sink = sink_new_buf(logsrv->ring_name, trash.area, logsrv->format, BUFSIZE);
1005 if (!sink || sink->type != SINK_TYPE_BUFFER) {
1006 goto error;
1007 }
1008
1009 /* link sink_forward_target to proxy */
1010 sink->forward_px = p;
1011 p->parent = sink;
1012
1013 /* insert into sink_forward_targets
1014 * list into sink
1015 */
Christopher Faulet2ae25ea2022-05-12 14:50:09 +02001016 sft->sink = sink;
Emeric Brun94aab062021-04-02 10:41:36 +02001017 sft->next = sink->sft;
1018 sink->sft = sft;
1019
1020 /* mark server as an attached reader to the ring */
1021 if (!ring_attach(sink->ctx.ring)) {
1022 /* should never fail since there is
1023 * only one reader
1024 */
1025 goto error;
1026 }
1027
1028 /* initialize sink buffer forwarding */
1029 if (!sink_init_forward(sink))
1030 goto error;
1031
1032 /* reset familyt of logsrv to consider the ring buffer target */
1033 logsrv->addr.ss_family = AF_UNSPEC;
1034
1035 return sink;
1036error:
1037 if (p) {
1038 if (p->id)
1039 free(p->id);
1040 if (p->conf.file)
1041 free(p->conf.file);
1042
1043 free(p);
1044 }
1045
1046 if (srv) {
1047 if (srv->id)
1048 free(srv->id);
1049 if (srv->conf.file)
1050 free((void *)srv->conf.file);
1051 if (srv->per_thr)
1052 free(srv->per_thr);
1053 free(srv);
1054 }
1055
1056 if (sft)
1057 free(sft);
1058
1059 if (sink) {
1060 if (sink->ctx.ring)
1061 ring_free(sink->ctx.ring);
1062
Willy Tarreau2b718102021-04-21 07:32:39 +02001063 LIST_DELETE(&sink->sink_list);
Emeric Brun94aab062021-04-02 10:41:36 +02001064 free(sink->name);
1065 free(sink->desc);
1066 free(sink);
1067 }
1068
1069 return NULL;
1070}
1071
Emeric Brun99c453d2020-05-25 15:01:04 +02001072/*
1073 * Post parsing "ring" section.
1074 *
1075 * The function returns 0 in success case, otherwise, it returns error
1076 * flags.
1077 */
1078int cfg_post_parse_ring()
1079{
1080 int err_code = 0;
Emeric Brun494c5052020-05-28 11:13:15 +02001081 struct server *srv;
Emeric Brun99c453d2020-05-25 15:01:04 +02001082
1083 if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
1084 if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
1085 ha_warning("ring '%s' event max length '%u' exceeds size, forced to size '%lu'.\n",
Willy Tarreau570a22b2020-06-02 12:00:46 +02001086 cfg_sink->name, cfg_sink->maxlen, (unsigned long)b_size(&cfg_sink->ctx.ring->buf));
Emeric Brun99c453d2020-05-25 15:01:04 +02001087 cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
1088 err_code |= ERR_ALERT;
1089 }
Emeric Brun494c5052020-05-28 11:13:15 +02001090
1091 /* prepare forward server descriptors */
1092 if (cfg_sink->forward_px) {
1093 srv = cfg_sink->forward_px->srv;
1094 while (srv) {
1095 struct sink_forward_target *sft;
1096 /* init ssl if needed */
1097 if (srv->use_ssl == 1 && xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->prepare_srv) {
1098 if (xprt_get(XPRT_SSL)->prepare_srv(srv)) {
1099 ha_alert("unable to prepare SSL for server '%s' in ring '%s'.\n", srv->id, cfg_sink->name);
1100 err_code |= ERR_ALERT | ERR_FATAL;
1101 }
1102 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001103
Emeric Brun494c5052020-05-28 11:13:15 +02001104 /* allocate sink_forward_target descriptor */
1105 sft = calloc(1, sizeof(*sft));
1106 if (!sft) {
1107 ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
1108 err_code |= ERR_ALERT | ERR_FATAL;
1109 break;
1110 }
1111 sft->srv = srv;
1112 sft->appctx = NULL;
1113 sft->ofs = ~0; /* init ring offset */
Christopher Faulet96417f32022-08-04 16:00:13 +02001114 sft->sink = cfg_sink;
Emeric Brun494c5052020-05-28 11:13:15 +02001115 sft->next = cfg_sink->sft;
1116 HA_SPIN_INIT(&sft->lock);
1117
1118 /* mark server attached to the ring */
1119 if (!ring_attach(cfg_sink->ctx.ring)) {
1120 ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
1121 err_code |= ERR_ALERT | ERR_FATAL;
1122 }
1123 cfg_sink->sft = sft;
1124 srv = srv->next;
1125 }
1126 sink_init_forward(cfg_sink);
1127 }
1128 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001129 cfg_sink = NULL;
1130
1131 return err_code;
1132}
1133
1134/* resolve sink names at end of config. Returns 0 on success otherwise error
1135 * flags.
1136*/
1137int post_sink_resolve()
1138{
Christopher Fauletfc633b62020-11-06 15:24:23 +01001139 int err_code = ERR_NONE;
Emeric Brun99c453d2020-05-25 15:01:04 +02001140 struct logsrv *logsrv, *logb;
1141 struct sink *sink;
1142 struct proxy *px;
1143
1144 list_for_each_entry_safe(logsrv, logb, &global.logsrvs, list) {
1145 if (logsrv->type == LOG_TARGET_BUFFER) {
1146 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001147 if (!sink) {
1148 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1149 * means we must allocate a sink
1150 * buffer to send messages to this logsrv
1151 */
1152 if (logsrv->addr.ss_family != AF_UNSPEC) {
1153 sink = sink_new_from_logsrv(logsrv);
1154 if (!sink) {
1155 ha_alert("global stream log server declared in file '%s' at line %d cannot be initialized'.\n",
1156 logsrv->conf.file, logsrv->conf.line);
1157 err_code |= ERR_ALERT | ERR_FATAL;
1158 }
1159 }
1160 else {
1161 ha_alert("global log server declared in file '%s' at line %d uses unknown ring named '%s'.\n",
1162 logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1163 err_code |= ERR_ALERT | ERR_FATAL;
1164 }
1165 }
1166 else if (sink->type != SINK_TYPE_BUFFER) {
1167 ha_alert("global log server declared in file '%s' at line %d uses incompatible ring '%s'.\n",
1168 logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +02001169 err_code |= ERR_ALERT | ERR_FATAL;
1170 }
1171 logsrv->sink = sink;
1172 }
Emeric Brun94aab062021-04-02 10:41:36 +02001173
Emeric Brun99c453d2020-05-25 15:01:04 +02001174 }
1175
1176 for (px = proxies_list; px; px = px->next) {
1177 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1178 if (logsrv->type == LOG_TARGET_BUFFER) {
1179 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001180 if (!sink) {
1181 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1182 * means we must allocate a sink
1183 * buffer to send messages to this logsrv
1184 */
1185 if (logsrv->addr.ss_family != AF_UNSPEC) {
1186 sink = sink_new_from_logsrv(logsrv);
1187 if (!sink) {
1188 ha_alert("log server declared in proxy section '%s' file '%s' at line %d cannot be initialized'.\n",
1189 px->id, logsrv->conf.file, logsrv->conf.line);
1190 err_code |= ERR_ALERT | ERR_FATAL;
1191 }
1192 }
1193 else {
1194 ha_alert("log server declared in proxy section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1195 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1196 err_code |= ERR_ALERT | ERR_FATAL;
1197 }
1198 }
1199 else if (sink->type != SINK_TYPE_BUFFER) {
1200 ha_alert("log server declared in proxy section '%s' in file '%s' at line %d uses incomatible ring named '%s'.\n",
1201 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun99c453d2020-05-25 15:01:04 +02001202 err_code |= ERR_ALERT | ERR_FATAL;
1203 }
1204 logsrv->sink = sink;
1205 }
1206 }
1207 }
Emeric Brun12941c82020-07-07 14:19:42 +02001208
1209 for (px = cfg_log_forward; px; px = px->next) {
1210 list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
1211 if (logsrv->type == LOG_TARGET_BUFFER) {
1212 sink = sink_find(logsrv->ring_name);
Emeric Brun94aab062021-04-02 10:41:36 +02001213 if (!sink) {
1214 /* LOG_TARGET_BUFFER but !AF_UNSPEC
1215 * means we must allocate a sink
1216 * buffer to send messages to this logsrv
1217 */
1218 if (logsrv->addr.ss_family != AF_UNSPEC) {
1219 sink = sink_new_from_logsrv(logsrv);
1220 if (!sink) {
1221 ha_alert("log server declared in log-forward section '%s' file '%s' at line %d cannot be initialized'.\n",
1222 px->id, logsrv->conf.file, logsrv->conf.line);
1223 err_code |= ERR_ALERT | ERR_FATAL;
1224 }
1225 }
1226 else {
1227 ha_alert("log server declared in log-forward section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1228 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
1229 err_code |= ERR_ALERT | ERR_FATAL;
1230 }
1231 }
1232 else if (sink->type != SINK_TYPE_BUFFER) {
1233 ha_alert("log server declared in log-forward section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n",
1234 px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name);
Emeric Brun12941c82020-07-07 14:19:42 +02001235 err_code |= ERR_ALERT | ERR_FATAL;
1236 }
1237 logsrv->sink = sink;
1238 }
1239 }
1240 }
Emeric Brun99c453d2020-05-25 15:01:04 +02001241 return err_code;
1242}
1243
1244
Willy Tarreau973e6622019-08-20 11:57:52 +02001245static void sink_init()
1246{
Emeric Brun54648852020-07-06 15:54:06 +02001247 sink_new_fd("stdout", "standard output (fd#1)", LOG_FORMAT_RAW, 1);
1248 sink_new_fd("stderr", "standard output (fd#2)", LOG_FORMAT_RAW, 2);
1249 sink_new_buf("buf0", "in-memory ring buffer", LOG_FORMAT_TIMED, 1048576);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001250}
1251
1252static void sink_deinit()
1253{
1254 struct sink *sink, *sb;
1255
1256 list_for_each_entry_safe(sink, sb, &sink_list, sink_list) {
1257 if (sink->type == SINK_TYPE_BUFFER)
1258 ring_free(sink->ctx.ring);
Willy Tarreau2b718102021-04-21 07:32:39 +02001259 LIST_DELETE(&sink->sink_list);
Emeric Brun99c453d2020-05-25 15:01:04 +02001260 free(sink->name);
1261 free(sink->desc);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001262 free(sink);
1263 }
Willy Tarreau973e6622019-08-20 11:57:52 +02001264}
1265
1266INITCALL0(STG_REGISTER, sink_init);
Willy Tarreau4ed23ca2019-08-23 15:47:49 +02001267REGISTER_POST_DEINIT(sink_deinit);
Willy Tarreau973e6622019-08-20 11:57:52 +02001268
Willy Tarreau9f830d72019-08-26 18:17:04 +02001269static struct cli_kw_list cli_kws = {{ },{
Willy Tarreaub205bfd2021-05-07 11:38:37 +02001270 { { "show", "events", NULL }, "show events [<sink>] [-w] [-n] : show event sink state", cli_parse_show_events, NULL, NULL },
Willy Tarreau9f830d72019-08-26 18:17:04 +02001271 {{},}
1272}};
1273
1274INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);
1275
Emeric Brun99c453d2020-05-25 15:01:04 +02001276/* config parsers for this section */
1277REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring);
1278REGISTER_POST_CHECK(post_sink_resolve);
1279
Willy Tarreau67b5a162019-08-11 16:38:56 +02001280/*
1281 * Local variables:
1282 * c-indent-level: 8
1283 * c-basic-offset: 8
1284 * End:
1285 */