blob: c65513a06334d8b7770495db06541b0bddb8661b [file] [log] [blame]
/*
* Event sink management
*
* Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation, version 2.1
* exclusively.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <common/cfgparse.h>
#include <common/compat.h>
#include <common/config.h>
#include <common/ist.h>
#include <common/mini-clist.h>
#include <common/time.h>
#include <proto/cli.h>
#include <proto/log.h>
#include <proto/ring.h>
#include <proto/sink.h>
#include <proto/stream_interface.h>
struct list sink_list = LIST_HEAD_INIT(sink_list);
struct sink *cfg_sink;
struct sink *sink_find(const char *name)
{
struct sink *sink;
list_for_each_entry(sink, &sink_list, sink_list)
if (strcmp(sink->name, name) == 0)
return sink;
return NULL;
}
/* creates a new sink and adds it to the list, it's still generic and not fully
* initialized. Returns NULL on allocation failure. If another one already
* exists with the same name, it will be returned. The caller can detect it as
* a newly created one has type SINK_TYPE_NEW.
*/
static struct sink *__sink_new(const char *name, const char *desc, enum sink_fmt fmt)
{
struct sink *sink;
sink = sink_find(name);
if (sink)
goto end;
sink = malloc(sizeof(*sink));
if (!sink)
goto end;
sink->name = strdup(name);
sink->desc = strdup(desc);
sink->fmt = fmt;
sink->type = SINK_TYPE_NEW;
sink->maxlen = BUFSIZE;
/* address will be filled by the caller if needed */
sink->ctx.fd = -1;
sink->ctx.dropped = 0;
HA_RWLOCK_INIT(&sink->ctx.lock);
LIST_ADDQ(&sink_list, &sink->sink_list);
end:
return sink;
}
/* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>,
* and description <desc>. Returns NULL on allocation failure or conflict.
* Perfect duplicates are merged (same type, fd, and name).
*/
struct sink *sink_new_fd(const char *name, const char *desc, enum sink_fmt fmt, int fd)
{
struct sink *sink;
sink = __sink_new(name, desc, fmt);
if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd))
goto end;
if (sink->type != SINK_TYPE_NEW) {
sink = NULL;
goto end;
}
sink->type = SINK_TYPE_FD;
sink->ctx.fd = fd;
end:
return sink;
}
/* creates a sink called <name> of type BUF of size <size>, format <fmt>,
* and description <desc>. Returns NULL on allocation failure or conflict.
* Perfect duplicates are merged (same type and name). If sizes differ, the
* largest one is kept.
*/
struct sink *sink_new_buf(const char *name, const char *desc, enum sink_fmt fmt, size_t size)
{
struct sink *sink;
sink = __sink_new(name, desc, fmt);
if (!sink)
goto fail;
if (sink->type == SINK_TYPE_BUFFER) {
/* such a buffer already exists, we may have to resize it */
if (!ring_resize(sink->ctx.ring, size))
goto fail;
goto end;
}
if (sink->type != SINK_TYPE_NEW) {
/* already exists of another type */
goto fail;
}
sink->ctx.ring = ring_new(size);
if (!sink->ctx.ring) {
LIST_DEL(&sink->sink_list);
free(sink->name);
free(sink->desc);
free(sink);
goto fail;
}
sink->type = SINK_TYPE_BUFFER;
end:
return sink;
fail:
return NULL;
}
/* tries to send <nmsg> message parts (up to 8, ignored above) from message
* array <msg> to sink <sink>. Formating according to the sink's preference is
* done here. Lost messages are NOT accounted for. It is preferable to call
* sink_write() instead which will also try to emit the number of dropped
* messages when there are any. It returns >0 if it could write anything,
* <=0 otherwise.
*/
ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg,
int level, int facility, struct ist *tag,
struct ist *pid, struct ist *sd)
{
int log_format;
char short_hdr[4];
struct ist pfx[6];
size_t npfx = 0;
char *hdr_ptr;
int fac_level;
if (sink->fmt == SINK_FMT_RAW)
goto send;
if (sink->fmt == SINK_FMT_SHORT || sink->fmt == SINK_FMT_TIMED) {
short_hdr[0] = '<';
short_hdr[1] = '0' + level;
short_hdr[2] = '>';
pfx[npfx].ptr = short_hdr;
pfx[npfx].len = 3;
npfx++;
if (sink->fmt == SINK_FMT_SHORT)
goto send;
}
if (sink->fmt == SINK_FMT_ISO || sink->fmt == SINK_FMT_TIMED) {
pfx[npfx].ptr = timeofday_as_iso_us(1);
pfx[npfx].len = 27;
npfx++;
goto send;
}
else if (sink->fmt == SINK_FMT_RFC5424) {
pfx[npfx].ptr = logheader_rfc5424;
pfx[npfx].len = update_log_hdr_rfc5424(date.tv_sec) - pfx[npfx].ptr;
log_format = LOG_FORMAT_RFC5424;
}
else {
pfx[npfx].ptr = logheader;
pfx[npfx].len = update_log_hdr(date.tv_sec) - pfx[npfx].ptr;
log_format = LOG_FORMAT_RFC3164;
sd = NULL;
}
fac_level = (facility << 3) + level;
hdr_ptr = pfx[npfx].ptr + 3; /* last digit of the log level */
do {
*hdr_ptr = '0' + fac_level % 10;
fac_level /= 10;
hdr_ptr--;
} while (fac_level && hdr_ptr > pfx[npfx].ptr);
*hdr_ptr = '<';
pfx[npfx].len -= hdr_ptr - pfx[npfx].ptr;
pfx[npfx].ptr = hdr_ptr;
npfx++;
if (tag && tag->len) {
pfx[npfx].ptr = tag->ptr;
pfx[npfx].len = tag->len;
npfx++;
}
pfx[npfx].ptr = get_format_pid_sep1(log_format, &pfx[npfx].len);
if (pfx[npfx].len)
npfx++;
if (pid && pid->len) {
pfx[npfx].ptr = pid->ptr;
pfx[npfx].len = pid->len;
npfx++;
}
pfx[npfx].ptr = get_format_pid_sep2(log_format, &pfx[npfx].len);
if (pfx[npfx].len)
npfx++;
if (sd && sd->len) {
pfx[npfx].ptr = sd->ptr;
pfx[npfx].len = sd->len;
npfx++;
}
send:
if (sink->type == SINK_TYPE_FD) {
return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1);
}
else if (sink->type == SINK_TYPE_BUFFER) {
return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg);
}
return 0;
}
/* Tries to emit a message indicating the number of dropped events. In case of
* success, the amount of drops is reduced by as much. It's supposed to be
* called under an exclusive lock on the sink to avoid multiple produces doing
* the same. On success, >0 is returned, otherwise <=0 on failure.
*/
int sink_announce_dropped(struct sink *sink, int facility, struct ist *pid)
{
unsigned int dropped;
struct buffer msg;
struct ist msgvec[1];
char logbuf[64];
struct ist sd;
struct ist tag;
while (unlikely((dropped = sink->ctx.dropped) > 0)) {
chunk_init(&msg, logbuf, sizeof(logbuf));
chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : "");
msgvec[0] = ist2(msg.area, msg.data);
sd.ptr = default_rfc5424_sd_log_format;
sd.len = 2;
tag.ptr = global.log_tag.area;
tag.len = global.log_tag.data;
if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, &tag, pid, &sd) <= 0)
return 0;
/* success! */
HA_ATOMIC_SUB(&sink->ctx.dropped, dropped);
}
return 1;
}
/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
{
struct sink *sink;
int arg;
args++; // make args[1] the 1st arg
if (!*args[1]) {
/* no arg => report the list of supported sink */
chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n");
list_for_each_entry(sink, &sink_list, sink_list) {
chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n",
sink->name,
sink->type == SINK_TYPE_NEW ? "init" :
sink->type == SINK_TYPE_FD ? "fd" :
sink->type == SINK_TYPE_BUFFER ? "buffer" : "?",
sink->ctx.dropped, sink->desc);
}
trash.area[trash.data] = 0;
return cli_msg(appctx, LOG_WARNING, trash.area);
}
if (!cli_has_level(appctx, ACCESS_LVL_OPER))
return 1;
sink = sink_find(args[1]);
if (!sink)
return cli_err(appctx, "No such event sink");
if (sink->type != SINK_TYPE_BUFFER)
return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
for (arg = 2; *args[arg]; arg++) {
if (strcmp(args[arg], "-w") == 0)
appctx->ctx.cli.i0 |= 1; // wait mode
else if (strcmp(args[arg], "-n") == 0)
appctx->ctx.cli.i0 |= 2; // seek to new
else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
appctx->ctx.cli.i0 |= 3; // seek to new + wait
else
return cli_err(appctx, "unknown option");
}
return ring_attach_cli(sink->ctx.ring, appctx);
}
/*
* Parse "ring" section and create corresponding sink buffer.
*
* The function returns 0 in success case, otherwise, it returns error
* flags.
*/
int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
{
int err_code = 0;
const char *inv;
size_t size = BUFSIZE;
if (strcmp(args[0], "ring") == 0) { /* new peers section */
if (!*args[1]) {
ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
inv = invalid_char(args[1]);
if (inv) {
ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
if (sink_find(args[1])) {
ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
cfg_sink = sink_new_buf(args[1], args[1] , SINK_FMT_RAW, size);
if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) {
ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
}
else if (strcmp(args[0], "size") == 0) {
size = atol(args[1]);
if (!size) {
ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)
|| !ring_resize(cfg_sink->ctx.ring, size)) {
ha_alert("parsing [%s:%d] : fail to set sink buffer size '%s'.\n", file, linenum, args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
}
else if (strcmp(args[0],"format") == 0) {
if (!cfg_sink) {
ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
if (strcmp(args[1], "raw") == 0) {
cfg_sink->fmt = SINK_FMT_RAW;
}
else if (strcmp(args[1], "short") == 0) {
cfg_sink->fmt = SINK_FMT_SHORT;
}
else if (strcmp(args[1], "iso") == 0) {
cfg_sink->fmt = SINK_FMT_ISO;
}
else if (strcmp(args[1], "timed") == 0) {
cfg_sink->fmt = SINK_FMT_TIMED;
}
else if (strcmp(args[1], "rfc3164") == 0) {
cfg_sink->fmt = SINK_FMT_RFC3164;
}
else if (strcmp(args[1], "rfc5424") == 0) {
cfg_sink->fmt = SINK_FMT_RFC5424;
}
else {
ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
}
else if (strcmp(args[0],"maxlen") == 0) {
if (!cfg_sink) {
ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
cfg_sink->maxlen = atol(args[1]);
if (!cfg_sink->maxlen) {
ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
}
else if (strcmp(args[0],"description") == 0) {
if (!cfg_sink) {
ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
if (!*args[1]) {
ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
free(cfg_sink->desc);
cfg_sink->desc = strdup(args[1]);
if (!cfg_sink->desc) {
ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
}
err:
return err_code;
}
/*
* Post parsing "ring" section.
*
* The function returns 0 in success case, otherwise, it returns error
* flags.
*/
int cfg_post_parse_ring()
{
int err_code = 0;
if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) {
ha_warning("ring '%s' event max length '%u' exceeds size, forced to size '%lu'.\n",
cfg_sink->name, cfg_sink->maxlen, b_size(&cfg_sink->ctx.ring->buf));
cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf);
err_code |= ERR_ALERT;
}
}
cfg_sink = NULL;
return err_code;
}
/* resolve sink names at end of config. Returns 0 on success otherwise error
* flags.
*/
int post_sink_resolve()
{
int err_code = 0;
struct logsrv *logsrv, *logb;
struct sink *sink;
struct proxy *px;
list_for_each_entry_safe(logsrv, logb, &global.logsrvs, list) {
if (logsrv->type == LOG_TARGET_BUFFER) {
sink = sink_find(logsrv->ring_name);
if (!sink || sink->type != SINK_TYPE_BUFFER) {
ha_alert("global log server uses unkown ring named '%s'.\n", logsrv->ring_name);
err_code |= ERR_ALERT | ERR_FATAL;
}
logsrv->sink = sink;
}
}
for (px = proxies_list; px; px = px->next) {
list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) {
if (logsrv->type == LOG_TARGET_BUFFER) {
sink = sink_find(logsrv->ring_name);
if (!sink || sink->type != SINK_TYPE_BUFFER) {
ha_alert("proxy '%s' log server uses unkown ring named '%s'.\n", px->id, logsrv->ring_name);
err_code |= ERR_ALERT | ERR_FATAL;
}
logsrv->sink = sink;
}
}
}
return err_code;
}
static void sink_init()
{
sink_new_fd("stdout", "standard output (fd#1)", SINK_FMT_RAW, 1);
sink_new_fd("stderr", "standard output (fd#2)", SINK_FMT_RAW, 2);
sink_new_buf("buf0", "in-memory ring buffer", SINK_FMT_TIMED, 1048576);
}
static void sink_deinit()
{
struct sink *sink, *sb;
list_for_each_entry_safe(sink, sb, &sink_list, sink_list) {
if (sink->type == SINK_TYPE_BUFFER)
ring_free(sink->ctx.ring);
LIST_DEL(&sink->sink_list);
free(sink->name);
free(sink->desc);
free(sink);
}
}
INITCALL0(STG_REGISTER, sink_init);
REGISTER_POST_DEINIT(sink_deinit);
static struct cli_kw_list cli_kws = {{ },{
{ { "show", "events", NULL }, "show events [<sink>] : show event sink state", cli_parse_show_events, NULL, NULL },
{{},}
}};
INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);
/* config parsers for this section */
REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring);
REGISTER_POST_CHECK(post_sink_resolve);
/*
* Local variables:
* c-indent-level: 8
* c-basic-offset: 8
* End:
*/