| /* |
| * Event sink management |
| * |
| * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu |
| * |
| * This library is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU Lesser General Public |
| * License as published by the Free Software Foundation, version 2.1 |
| * exclusively. |
| * |
| * This library is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| * Lesser General Public License for more details. |
| * |
| * You should have received a copy of the GNU Lesser General Public |
| * License along with this library; if not, write to the Free Software |
| * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
| */ |
| |
| #include <import/ist.h> |
| #include <haproxy/api.h> |
| #include <haproxy/cfgparse.h> |
| #include <haproxy/cli.h> |
| #include <haproxy/errors.h> |
| #include <haproxy/list.h> |
| #include <haproxy/log.h> |
| #include <haproxy/ring.h> |
| #include <haproxy/signal.h> |
| #include <haproxy/sink.h> |
| #include <haproxy/stream_interface.h> |
| #include <haproxy/time.h> |
| |
| struct list sink_list = LIST_HEAD_INIT(sink_list); |
| |
| struct sink *cfg_sink; |
| |
| struct sink *sink_find(const char *name) |
| { |
| struct sink *sink; |
| |
| list_for_each_entry(sink, &sink_list, sink_list) |
| if (strcmp(sink->name, name) == 0) |
| return sink; |
| return NULL; |
| } |
| |
| /* creates a new sink and adds it to the list, it's still generic and not fully |
| * initialized. Returns NULL on allocation failure. If another one already |
| * exists with the same name, it will be returned. The caller can detect it as |
| * a newly created one has type SINK_TYPE_NEW. |
| */ |
| static struct sink *__sink_new(const char *name, const char *desc, enum sink_fmt fmt) |
| { |
| struct sink *sink; |
| |
| sink = sink_find(name); |
| if (sink) |
| goto end; |
| |
| sink = calloc(1, sizeof(*sink)); |
| if (!sink) |
| goto end; |
| |
| sink->name = strdup(name); |
| sink->desc = strdup(desc); |
| sink->fmt = fmt; |
| sink->type = SINK_TYPE_NEW; |
| sink->maxlen = BUFSIZE; |
| /* address will be filled by the caller if needed */ |
| sink->ctx.fd = -1; |
| sink->ctx.dropped = 0; |
| HA_RWLOCK_INIT(&sink->ctx.lock); |
| LIST_ADDQ(&sink_list, &sink->sink_list); |
| end: |
| return sink; |
| } |
| |
| /* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>, |
| * and description <desc>. Returns NULL on allocation failure or conflict. |
| * Perfect duplicates are merged (same type, fd, and name). |
| */ |
| struct sink *sink_new_fd(const char *name, const char *desc, enum sink_fmt fmt, int fd) |
| { |
| struct sink *sink; |
| |
| sink = __sink_new(name, desc, fmt); |
| if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd)) |
| goto end; |
| |
| if (sink->type != SINK_TYPE_NEW) { |
| sink = NULL; |
| goto end; |
| } |
| |
| sink->type = SINK_TYPE_FD; |
| sink->ctx.fd = fd; |
| end: |
| return sink; |
| } |
| |
| /* creates a sink called <name> of type BUF of size <size>, format <fmt>, |
| * and description <desc>. Returns NULL on allocation failure or conflict. |
| * Perfect duplicates are merged (same type and name). If sizes differ, the |
| * largest one is kept. |
| */ |
| struct sink *sink_new_buf(const char *name, const char *desc, enum sink_fmt fmt, size_t size) |
| { |
| struct sink *sink; |
| |
| sink = __sink_new(name, desc, fmt); |
| if (!sink) |
| goto fail; |
| |
| if (sink->type == SINK_TYPE_BUFFER) { |
| /* such a buffer already exists, we may have to resize it */ |
| if (!ring_resize(sink->ctx.ring, size)) |
| goto fail; |
| goto end; |
| } |
| |
| if (sink->type != SINK_TYPE_NEW) { |
| /* already exists of another type */ |
| goto fail; |
| } |
| |
| sink->ctx.ring = ring_new(size); |
| if (!sink->ctx.ring) { |
| LIST_DEL(&sink->sink_list); |
| free(sink->name); |
| free(sink->desc); |
| free(sink); |
| goto fail; |
| } |
| |
| sink->type = SINK_TYPE_BUFFER; |
| end: |
| return sink; |
| fail: |
| return NULL; |
| } |
| |
| /* tries to send <nmsg> message parts (up to 8, ignored above) from message |
| * array <msg> to sink <sink>. Formatting according to the sink's preference is |
| * done here. Lost messages are NOT accounted for. It is preferable to call |
| * sink_write() instead which will also try to emit the number of dropped |
| * messages when there are any. It returns >0 if it could write anything, |
| * <=0 otherwise. |
| */ |
| ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg, |
| int level, int facility, struct ist *tag, |
| struct ist *pid, struct ist *sd) |
| { |
| int log_format; |
| char short_hdr[4]; |
| struct ist pfx[6]; |
| size_t npfx = 0; |
| char *hdr_ptr; |
| int fac_level; |
| |
| if (sink->fmt == SINK_FMT_RAW) |
| goto send; |
| |
| if (sink->fmt == SINK_FMT_SHORT || sink->fmt == SINK_FMT_TIMED) { |
| short_hdr[0] = '<'; |
| short_hdr[1] = '0' + level; |
| short_hdr[2] = '>'; |
| |
| pfx[npfx].ptr = short_hdr; |
| pfx[npfx].len = 3; |
| npfx++; |
| if (sink->fmt == SINK_FMT_SHORT) |
| goto send; |
| } |
| |
| |
| if (sink->fmt == SINK_FMT_ISO || sink->fmt == SINK_FMT_TIMED) { |
| pfx[npfx].ptr = timeofday_as_iso_us(1); |
| pfx[npfx].len = 33; |
| npfx++; |
| goto send; |
| } |
| else if (sink->fmt == SINK_FMT_RFC5424) { |
| pfx[npfx].ptr = logheader_rfc5424; |
| pfx[npfx].len = update_log_hdr_rfc5424(date.tv_sec, date.tv_usec) - pfx[npfx].ptr; |
| log_format = LOG_FORMAT_RFC5424; |
| } |
| else { |
| pfx[npfx].ptr = logheader; |
| pfx[npfx].len = update_log_hdr(date.tv_sec) - pfx[npfx].ptr; |
| log_format = LOG_FORMAT_RFC3164; |
| sd = NULL; |
| } |
| |
| fac_level = (facility << 3) + level; |
| hdr_ptr = pfx[npfx].ptr + 3; /* last digit of the log level */ |
| do { |
| *hdr_ptr = '0' + fac_level % 10; |
| fac_level /= 10; |
| hdr_ptr--; |
| } while (fac_level && hdr_ptr > pfx[npfx].ptr); |
| *hdr_ptr = '<'; |
| pfx[npfx].len -= hdr_ptr - pfx[npfx].ptr; |
| pfx[npfx].ptr = hdr_ptr; |
| npfx++; |
| |
| if (tag && tag->len) { |
| pfx[npfx].ptr = tag->ptr; |
| pfx[npfx].len = tag->len; |
| npfx++; |
| } |
| pfx[npfx].ptr = get_format_pid_sep1(log_format, &pfx[npfx].len); |
| if (pfx[npfx].len) |
| npfx++; |
| |
| if (pid && pid->len) { |
| pfx[npfx].ptr = pid->ptr; |
| pfx[npfx].len = pid->len; |
| npfx++; |
| } |
| |
| pfx[npfx].ptr = get_format_pid_sep2(log_format, &pfx[npfx].len); |
| if (pfx[npfx].len) |
| npfx++; |
| |
| if (sd && sd->len) { |
| pfx[npfx].ptr = sd->ptr; |
| pfx[npfx].len = sd->len; |
| npfx++; |
| } |
| |
| send: |
| if (sink->type == SINK_TYPE_FD) { |
| return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1); |
| } |
| else if (sink->type == SINK_TYPE_BUFFER) { |
| return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg); |
| } |
| return 0; |
| } |
| |
| /* Tries to emit a message indicating the number of dropped events. In case of |
| * success, the amount of drops is reduced by as much. It's supposed to be |
| * called under an exclusive lock on the sink to avoid multiple produces doing |
| * the same. On success, >0 is returned, otherwise <=0 on failure. |
| */ |
| int sink_announce_dropped(struct sink *sink, int facility, struct ist *pid) |
| { |
| unsigned int dropped; |
| struct buffer msg; |
| struct ist msgvec[1]; |
| char logbuf[64]; |
| struct ist sd; |
| struct ist tag; |
| |
| while (unlikely((dropped = sink->ctx.dropped) > 0)) { |
| chunk_init(&msg, logbuf, sizeof(logbuf)); |
| chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : ""); |
| msgvec[0] = ist2(msg.area, msg.data); |
| |
| sd.ptr = default_rfc5424_sd_log_format; |
| sd.len = 2; |
| tag.ptr = global.log_tag.area; |
| tag.len = global.log_tag.data; |
| if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, &tag, pid, &sd) <= 0) |
| return 0; |
| /* success! */ |
| HA_ATOMIC_SUB(&sink->ctx.dropped, dropped); |
| } |
| return 1; |
| } |
| |
| /* parse the "show events" command, returns 1 if a message is returned, otherwise zero */ |
| static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private) |
| { |
| struct sink *sink; |
| int arg; |
| |
| args++; // make args[1] the 1st arg |
| |
| if (!*args[1]) { |
| /* no arg => report the list of supported sink */ |
| chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n"); |
| list_for_each_entry(sink, &sink_list, sink_list) { |
| chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n", |
| sink->name, |
| sink->type == SINK_TYPE_NEW ? "init" : |
| sink->type == SINK_TYPE_FD ? "fd" : |
| sink->type == SINK_TYPE_BUFFER ? "buffer" : "?", |
| sink->ctx.dropped, sink->desc); |
| } |
| |
| trash.area[trash.data] = 0; |
| return cli_msg(appctx, LOG_WARNING, trash.area); |
| } |
| |
| if (!cli_has_level(appctx, ACCESS_LVL_OPER)) |
| return 1; |
| |
| sink = sink_find(args[1]); |
| if (!sink) |
| return cli_err(appctx, "No such event sink"); |
| |
| if (sink->type != SINK_TYPE_BUFFER) |
| return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink"); |
| |
| for (arg = 2; *args[arg]; arg++) { |
| if (strcmp(args[arg], "-w") == 0) |
| appctx->ctx.cli.i0 |= 1; // wait mode |
| else if (strcmp(args[arg], "-n") == 0) |
| appctx->ctx.cli.i0 |= 2; // seek to new |
| else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0) |
| appctx->ctx.cli.i0 |= 3; // seek to new + wait |
| else |
| return cli_err(appctx, "unknown option"); |
| } |
| return ring_attach_cli(sink->ctx.ring, appctx); |
| } |
| |
| /* Pre-configures a ring proxy to emit connections */ |
| void sink_setup_proxy(struct proxy *px) |
| { |
| px->last_change = now.tv_sec; |
| px->cap = PR_CAP_FE | PR_CAP_BE; |
| px->maxconn = 0; |
| px->conn_retries = 1; |
| px->timeout.server = TICK_ETERNITY; |
| px->timeout.client = TICK_ETERNITY; |
| px->timeout.connect = TICK_ETERNITY; |
| px->accept = NULL; |
| px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC; |
| px->bind_proc = 0; /* will be filled by users */ |
| } |
| |
| /* |
| * IO Handler to handle message push to syslog tcp server |
| */ |
| static void sink_forward_io_handler(struct appctx *appctx) |
| { |
| struct stream_interface *si = appctx->owner; |
| struct stream *s = si_strm(si); |
| struct sink *sink = strm_fe(s)->parent; |
| struct sink_forward_target *sft = appctx->ctx.sft.ptr; |
| struct ring *ring = sink->ctx.ring; |
| struct buffer *buf = &ring->buf; |
| uint64_t msg_len; |
| size_t len, cnt, ofs; |
| int ret = 0; |
| |
| /* if stopping was requested, close immediately */ |
| if (unlikely(stopping)) |
| goto close; |
| |
| /* for rex because it seems reset to timeout |
| * and we don't want expire on this case |
| * with a syslog server |
| */ |
| si_oc(si)->rex = TICK_ETERNITY; |
| /* rto should not change but it seems the case */ |
| si_oc(si)->rto = TICK_ETERNITY; |
| |
| /* an error was detected */ |
| if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW))) |
| goto close; |
| |
| /* con closed by server side */ |
| if ((si_oc(si)->flags & CF_SHUTW)) |
| goto close; |
| |
| /* if the connection is not established, inform the stream that we want |
| * to be notified whenever the connection completes. |
| */ |
| if (si_opposite(si)->state < SI_ST_EST) { |
| si_cant_get(si); |
| si_rx_conn_blk(si); |
| si_rx_endp_more(si); |
| return; |
| } |
| |
| HA_SPIN_LOCK(SFT_LOCK, &sft->lock); |
| if (appctx != sft->appctx) { |
| HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); |
| goto close; |
| } |
| ofs = sft->ofs; |
| |
| HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); |
| LIST_DEL_INIT(&appctx->wait_entry); |
| HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); |
| |
| HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock); |
| |
| /* explanation for the initialization below: it would be better to do |
| * this in the parsing function but this would occasionally result in |
| * dropped events because we'd take a reference on the oldest message |
| * and keep it while being scheduled. Thus instead let's take it the |
| * first time we enter here so that we have a chance to pass many |
| * existing messages before grabbing a reference to a location. This |
| * value cannot be produced after initialization. |
| */ |
| if (unlikely(ofs == ~0)) { |
| ofs = 0; |
| |
| HA_ATOMIC_ADD(b_peek(buf, ofs), 1); |
| ofs += ring->ofs; |
| } |
| |
| /* we were already there, adjust the offset to be relative to |
| * the buffer's head and remove us from the counter. |
| */ |
| ofs -= ring->ofs; |
| BUG_ON(ofs >= buf->size); |
| HA_ATOMIC_SUB(b_peek(buf, ofs), 1); |
| |
| /* in this loop, ofs always points to the counter byte that precedes |
| * the message so that we can take our reference there if we have to |
| * stop before the end (ret=0). |
| */ |
| if (si_opposite(si)->state == SI_ST_EST) { |
| ret = 1; |
| while (ofs + 1 < b_data(buf)) { |
| cnt = 1; |
| len = b_peek_varint(buf, ofs + cnt, &msg_len); |
| if (!len) |
| break; |
| cnt += len; |
| BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); |
| |
| if (unlikely(msg_len + 1 > b_size(&trash))) { |
| /* too large a message to ever fit, let's skip it */ |
| ofs += cnt + msg_len; |
| continue; |
| } |
| |
| chunk_reset(&trash); |
| len = b_getblk(buf, trash.area, msg_len, ofs + cnt); |
| trash.data += len; |
| trash.area[trash.data++] = '\n'; |
| |
| if (ci_putchk(si_ic(si), &trash) == -1) { |
| si_rx_room_blk(si); |
| ret = 0; |
| break; |
| } |
| ofs += cnt + msg_len; |
| } |
| |
| HA_ATOMIC_ADD(b_peek(buf, ofs), 1); |
| ofs += ring->ofs; |
| sft->ofs = ofs; |
| } |
| HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock); |
| |
| if (ret) { |
| /* let's be woken up once new data arrive */ |
| HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); |
| LIST_ADDQ(&ring->waiters, &appctx->wait_entry); |
| HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); |
| si_rx_endp_done(si); |
| } |
| HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); |
| |
| /* always drain data from server */ |
| co_skip(si_oc(si), si_oc(si)->output); |
| return; |
| |
| close: |
| si_shutw(si); |
| si_shutr(si); |
| si_ic(si)->flags |= CF_READ_NULL; |
| } |
| |
| /* |
| * IO Handler to handle message push to syslog tcp server |
| * using octet counting frames |
| */ |
| static void sink_forward_oc_io_handler(struct appctx *appctx) |
| { |
| struct stream_interface *si = appctx->owner; |
| struct stream *s = si_strm(si); |
| struct sink *sink = strm_fe(s)->parent; |
| struct sink_forward_target *sft = appctx->ctx.sft.ptr; |
| struct ring *ring = sink->ctx.ring; |
| struct buffer *buf = &ring->buf; |
| uint64_t msg_len; |
| size_t len, cnt, ofs; |
| int ret = 0; |
| char *p; |
| |
| /* if stopping was requested, close immediately */ |
| if (unlikely(stopping)) |
| goto close; |
| |
| /* for rex because it seems reset to timeout |
| * and we don't want expire on this case |
| * with a syslog server |
| */ |
| si_oc(si)->rex = TICK_ETERNITY; |
| /* rto should not change but it seems the case */ |
| si_oc(si)->rto = TICK_ETERNITY; |
| |
| /* an error was detected */ |
| if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW))) |
| goto close; |
| |
| /* con closed by server side */ |
| if ((si_oc(si)->flags & CF_SHUTW)) |
| goto close; |
| |
| /* if the connection is not established, inform the stream that we want |
| * to be notified whenever the connection completes. |
| */ |
| if (si_opposite(si)->state < SI_ST_EST) { |
| si_cant_get(si); |
| si_rx_conn_blk(si); |
| si_rx_endp_more(si); |
| return; |
| } |
| |
| HA_SPIN_LOCK(SFT_LOCK, &sft->lock); |
| if (appctx != sft->appctx) { |
| HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); |
| goto close; |
| } |
| ofs = sft->ofs; |
| |
| HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); |
| LIST_DEL_INIT(&appctx->wait_entry); |
| HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); |
| |
| HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock); |
| |
| /* explanation for the initialization below: it would be better to do |
| * this in the parsing function but this would occasionally result in |
| * dropped events because we'd take a reference on the oldest message |
| * and keep it while being scheduled. Thus instead let's take it the |
| * first time we enter here so that we have a chance to pass many |
| * existing messages before grabbing a reference to a location. This |
| * value cannot be produced after initialization. |
| */ |
| if (unlikely(ofs == ~0)) { |
| ofs = 0; |
| |
| HA_ATOMIC_ADD(b_peek(buf, ofs), 1); |
| ofs += ring->ofs; |
| } |
| |
| /* we were already there, adjust the offset to be relative to |
| * the buffer's head and remove us from the counter. |
| */ |
| ofs -= ring->ofs; |
| BUG_ON(ofs >= buf->size); |
| HA_ATOMIC_SUB(b_peek(buf, ofs), 1); |
| |
| /* in this loop, ofs always points to the counter byte that precedes |
| * the message so that we can take our reference there if we have to |
| * stop before the end (ret=0). |
| */ |
| if (si_opposite(si)->state == SI_ST_EST) { |
| ret = 1; |
| while (ofs + 1 < b_data(buf)) { |
| cnt = 1; |
| len = b_peek_varint(buf, ofs + cnt, &msg_len); |
| if (!len) |
| break; |
| cnt += len; |
| BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); |
| |
| chunk_reset(&trash); |
| p = ulltoa(msg_len, trash.area, b_size(&trash)); |
| if (p) { |
| trash.data = (p - trash.area) + 1; |
| *p = ' '; |
| } |
| |
| if (!p || (trash.data + msg_len > b_size(&trash))) { |
| /* too large a message to ever fit, let's skip it */ |
| ofs += cnt + msg_len; |
| continue; |
| } |
| |
| trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt); |
| |
| if (ci_putchk(si_ic(si), &trash) == -1) { |
| si_rx_room_blk(si); |
| ret = 0; |
| break; |
| } |
| ofs += cnt + msg_len; |
| } |
| |
| HA_ATOMIC_ADD(b_peek(buf, ofs), 1); |
| ofs += ring->ofs; |
| sft->ofs = ofs; |
| } |
| HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock); |
| |
| if (ret) { |
| /* let's be woken up once new data arrive */ |
| HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); |
| LIST_ADDQ(&ring->waiters, &appctx->wait_entry); |
| HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); |
| si_rx_endp_done(si); |
| } |
| HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); |
| |
| /* always drain data from server */ |
| co_skip(si_oc(si), si_oc(si)->output); |
| return; |
| |
| close: |
| si_shutw(si); |
| si_shutr(si); |
| si_ic(si)->flags |= CF_READ_NULL; |
| } |
| |
| void __sink_forward_session_deinit(struct sink_forward_target *sft) |
| { |
| struct stream_interface *si; |
| struct stream *s; |
| struct sink *sink; |
| |
| if (!sft->appctx) |
| return; |
| |
| si = sft->appctx->owner; |
| if (!si) |
| return; |
| |
| s = si_strm(si); |
| if (!s) |
| return; |
| |
| sink = strm_fe(s)->parent; |
| if (!sink) |
| return; |
| |
| HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock); |
| LIST_DEL_INIT(&sft->appctx->wait_entry); |
| HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock); |
| |
| sft->appctx = NULL; |
| task_wakeup(sink->forward_task, TASK_WOKEN_MSG); |
| } |
| |
| |
| static void sink_forward_session_release(struct appctx *appctx) |
| { |
| struct sink_forward_target *sft = appctx->ctx.peers.ptr; |
| |
| if (!sft) |
| return; |
| |
| HA_SPIN_LOCK(SFT_LOCK, &sft->lock); |
| if (sft->appctx == appctx) |
| __sink_forward_session_deinit(sft); |
| HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); |
| } |
| |
| static struct applet sink_forward_applet = { |
| .obj_type = OBJ_TYPE_APPLET, |
| .name = "<SINKFWD>", /* used for logging */ |
| .fct = sink_forward_io_handler, |
| .release = sink_forward_session_release, |
| }; |
| |
| static struct applet sink_forward_oc_applet = { |
| .obj_type = OBJ_TYPE_APPLET, |
| .name = "<SINKFWDOC>", /* used for logging */ |
| .fct = sink_forward_oc_io_handler, |
| .release = sink_forward_session_release, |
| }; |
| |
| /* |
| * Create a new peer session in assigned state (connect will start automatically) |
| */ |
| static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft) |
| { |
| struct proxy *p = sink->forward_px; |
| struct appctx *appctx; |
| struct session *sess; |
| struct stream *s; |
| struct applet *applet = &sink_forward_applet; |
| |
| if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING) |
| applet = &sink_forward_oc_applet; |
| |
| appctx = appctx_new(applet, tid_bit); |
| if (!appctx) |
| goto out_close; |
| |
| appctx->ctx.sft.ptr = (void *)sft; |
| |
| sess = session_new(p, NULL, &appctx->obj_type); |
| if (!sess) { |
| ha_alert("out of memory in peer_session_create().\n"); |
| goto out_free_appctx; |
| } |
| |
| if ((s = stream_new(sess, &appctx->obj_type)) == NULL) { |
| ha_alert("Failed to initialize stream in peer_session_create().\n"); |
| goto out_free_sess; |
| } |
| |
| |
| s->target = &sft->srv->obj_type; |
| if (!sockaddr_alloc(&s->target_addr)) |
| goto out_free_strm; |
| *s->target_addr = sft->srv->addr; |
| s->flags = SF_ASSIGNED|SF_ADDR_SET; |
| s->si[1].flags |= SI_FL_NOLINGER; |
| |
| s->do_log = NULL; |
| s->uniq_id = 0; |
| |
| s->res.flags |= CF_READ_DONTWAIT; |
| /* for rto and rex to eternity to not expire on idle recv: |
| * We are using a syslog server. |
| */ |
| s->res.rto = TICK_ETERNITY; |
| s->res.rex = TICK_ETERNITY; |
| sft->appctx = appctx; |
| task_wakeup(s->task, TASK_WOKEN_INIT); |
| return appctx; |
| |
| /* Error unrolling */ |
| out_free_strm: |
| LIST_DEL(&s->list); |
| pool_free(pool_head_stream, s); |
| out_free_sess: |
| session_free(sess); |
| out_free_appctx: |
| appctx_free(appctx); |
| out_close: |
| return NULL; |
| } |
| |
| /* |
| * Task to handle connctions to forward servers |
| */ |
| static struct task *process_sink_forward(struct task * task, void *context, unsigned short state) |
| { |
| struct sink *sink = (struct sink *)context; |
| struct sink_forward_target *sft = sink->sft; |
| |
| task->expire = TICK_ETERNITY; |
| |
| if (!stopping) { |
| while (sft) { |
| HA_SPIN_LOCK(SFT_LOCK, &sft->lock); |
| /* if appctx is NULL, start a new session */ |
| if (!sft->appctx) |
| sft->appctx = sink_forward_session_create(sink, sft); |
| HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); |
| sft = sft->next; |
| } |
| } |
| else { |
| while (sft) { |
| HA_SPIN_LOCK(SFT_LOCK, &sft->lock); |
| /* awake applet to perform a clean close */ |
| if (sft->appctx) |
| appctx_wakeup(sft->appctx); |
| HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); |
| sft = sft->next; |
| } |
| } |
| |
| return task; |
| } |
| /* |
| * Init task to manage connctions to forward servers |
| * |
| * returns 0 in case of error. |
| */ |
| int sink_init_forward(struct sink *sink) |
| { |
| sink->forward_task = task_new(MAX_THREADS_MASK); |
| if (!sink->forward_task) |
| return 0; |
| |
| sink->forward_task->process = process_sink_forward; |
| sink->forward_task->context = (void *)sink; |
| sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0); |
| task_wakeup(sink->forward_task, TASK_WOKEN_INIT); |
| return 1; |
| } |
| /* |
| * Parse "ring" section and create corresponding sink buffer. |
| * |
| * The function returns 0 in success case, otherwise, it returns error |
| * flags. |
| */ |
| int cfg_parse_ring(const char *file, int linenum, char **args, int kwm) |
| { |
| int err_code = 0; |
| const char *inv; |
| size_t size = BUFSIZE; |
| struct proxy *p; |
| |
| if (strcmp(args[0], "ring") == 0) { /* new peers section */ |
| if (!*args[1]) { |
| ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| inv = invalid_char(args[1]); |
| if (inv) { |
| ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| if (sink_find(args[1])) { |
| ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| cfg_sink = sink_new_buf(args[1], args[1] , SINK_FMT_RAW, size); |
| if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) { |
| ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| /* allocate new proxy to handle forwards */ |
| p = calloc(1, sizeof *p); |
| if (!p) { |
| ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| init_new_proxy(p); |
| sink_setup_proxy(p); |
| p->parent = cfg_sink; |
| p->id = strdup(args[1]); |
| p->conf.args.file = p->conf.file = strdup(file); |
| p->conf.args.line = p->conf.line = linenum; |
| cfg_sink->forward_px = p; |
| } |
| else if (strcmp(args[0], "size") == 0) { |
| size = atol(args[1]); |
| if (!size) { |
| ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER) |
| || !ring_resize(cfg_sink->ctx.ring, size)) { |
| ha_alert("parsing [%s:%d] : fail to set sink buffer size '%s'.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| } |
| else if (strcmp(args[0],"server") == 0) { |
| err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL, 1, 0); |
| } |
| else if (strcmp(args[0],"timeout") == 0) { |
| if (!cfg_sink || !cfg_sink->forward_px) { |
| ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| if (strcmp(args[1], "connect") == 0 || |
| strcmp(args[1], "server") == 0) { |
| const char *res; |
| unsigned int tout; |
| |
| if (!*args[2]) { |
| ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n", |
| file, linenum, args[0], args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| res = parse_time_err(args[2], &tout, TIME_UNIT_MS); |
| if (res == PARSE_TIME_OVER) { |
| ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n", |
| file, linenum, args[2], args[0], args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| else if (res == PARSE_TIME_UNDER) { |
| ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n", |
| file, linenum, args[2], args[0], args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| else if (res) { |
| ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n", |
| file, linenum, *res, args[0], args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| if (args[1][2] == 'c') |
| cfg_sink->forward_px->timeout.connect = tout; |
| else |
| cfg_sink->forward_px->timeout.server = tout; |
| } |
| } |
| else if (strcmp(args[0],"format") == 0) { |
| if (!cfg_sink) { |
| ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| if (strcmp(args[1], "raw") == 0) { |
| cfg_sink->fmt = SINK_FMT_RAW; |
| } |
| else if (strcmp(args[1], "short") == 0) { |
| cfg_sink->fmt = SINK_FMT_SHORT; |
| } |
| else if (strcmp(args[1], "iso") == 0) { |
| cfg_sink->fmt = SINK_FMT_ISO; |
| } |
| else if (strcmp(args[1], "timed") == 0) { |
| cfg_sink->fmt = SINK_FMT_TIMED; |
| } |
| else if (strcmp(args[1], "rfc3164") == 0) { |
| cfg_sink->fmt = SINK_FMT_RFC3164; |
| } |
| else if (strcmp(args[1], "rfc5424") == 0) { |
| cfg_sink->fmt = SINK_FMT_RFC5424; |
| } |
| else { |
| ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| } |
| else if (strcmp(args[0],"maxlen") == 0) { |
| if (!cfg_sink) { |
| ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| cfg_sink->maxlen = atol(args[1]); |
| if (!cfg_sink->maxlen) { |
| ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| } |
| else if (strcmp(args[0],"description") == 0) { |
| if (!cfg_sink) { |
| ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| if (!*args[1]) { |
| ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| free(cfg_sink->desc); |
| |
| cfg_sink->desc = strdup(args[1]); |
| if (!cfg_sink->desc) { |
| ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| } |
| else { |
| ha_alert("parsing [%s:%d] : unknown statement '%s'.\n", file, linenum, args[0]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| err: |
| return err_code; |
| } |
| |
| /* |
| * Post parsing "ring" section. |
| * |
| * The function returns 0 in success case, otherwise, it returns error |
| * flags. |
| */ |
| int cfg_post_parse_ring() |
| { |
| int err_code = 0; |
| struct server *srv; |
| |
| if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) { |
| if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) { |
| ha_warning("ring '%s' event max length '%u' exceeds size, forced to size '%lu'.\n", |
| cfg_sink->name, cfg_sink->maxlen, (unsigned long)b_size(&cfg_sink->ctx.ring->buf)); |
| cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf); |
| err_code |= ERR_ALERT; |
| } |
| |
| /* prepare forward server descriptors */ |
| if (cfg_sink->forward_px) { |
| srv = cfg_sink->forward_px->srv; |
| while (srv) { |
| struct sink_forward_target *sft; |
| /* init ssl if needed */ |
| if (srv->use_ssl == 1 && xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->prepare_srv) { |
| if (xprt_get(XPRT_SSL)->prepare_srv(srv)) { |
| ha_alert("unable to prepare SSL for server '%s' in ring '%s'.\n", srv->id, cfg_sink->name); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| } |
| } |
| |
| /* allocate sink_forward_target descriptor */ |
| sft = calloc(1, sizeof(*sft)); |
| if (!sft) { |
| ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| break; |
| } |
| sft->srv = srv; |
| sft->appctx = NULL; |
| sft->ofs = ~0; /* init ring offset */ |
| sft->next = cfg_sink->sft; |
| HA_SPIN_INIT(&sft->lock); |
| |
| /* mark server attached to the ring */ |
| if (!ring_attach(cfg_sink->ctx.ring)) { |
| ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| } |
| cfg_sink->sft = sft; |
| srv = srv->next; |
| } |
| sink_init_forward(cfg_sink); |
| } |
| } |
| cfg_sink = NULL; |
| |
| return err_code; |
| } |
| |
| /* resolve sink names at end of config. Returns 0 on success otherwise error |
| * flags. |
| */ |
| int post_sink_resolve() |
| { |
| int err_code = 0; |
| struct logsrv *logsrv, *logb; |
| struct sink *sink; |
| struct proxy *px; |
| |
| list_for_each_entry_safe(logsrv, logb, &global.logsrvs, list) { |
| if (logsrv->type == LOG_TARGET_BUFFER) { |
| sink = sink_find(logsrv->ring_name); |
| if (!sink || sink->type != SINK_TYPE_BUFFER) { |
| ha_alert("global log server uses unknown ring named '%s'.\n", logsrv->ring_name); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| } |
| logsrv->sink = sink; |
| } |
| } |
| |
| for (px = proxies_list; px; px = px->next) { |
| list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) { |
| if (logsrv->type == LOG_TARGET_BUFFER) { |
| sink = sink_find(logsrv->ring_name); |
| if (!sink || sink->type != SINK_TYPE_BUFFER) { |
| ha_alert("proxy '%s' log server uses unknown ring named '%s'.\n", px->id, logsrv->ring_name); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| } |
| logsrv->sink = sink; |
| } |
| } |
| } |
| return err_code; |
| } |
| |
| |
| static void sink_init() |
| { |
| sink_new_fd("stdout", "standard output (fd#1)", SINK_FMT_RAW, 1); |
| sink_new_fd("stderr", "standard output (fd#2)", SINK_FMT_RAW, 2); |
| sink_new_buf("buf0", "in-memory ring buffer", SINK_FMT_TIMED, 1048576); |
| } |
| |
| static void sink_deinit() |
| { |
| struct sink *sink, *sb; |
| |
| list_for_each_entry_safe(sink, sb, &sink_list, sink_list) { |
| if (sink->type == SINK_TYPE_BUFFER) |
| ring_free(sink->ctx.ring); |
| LIST_DEL(&sink->sink_list); |
| free(sink->name); |
| free(sink->desc); |
| free(sink); |
| } |
| } |
| |
| INITCALL0(STG_REGISTER, sink_init); |
| REGISTER_POST_DEINIT(sink_deinit); |
| |
| static struct cli_kw_list cli_kws = {{ },{ |
| { { "show", "events", NULL }, "show events [<sink>] : show event sink state", cli_parse_show_events, NULL, NULL }, |
| {{},} |
| }}; |
| |
| INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws); |
| |
| /* config parsers for this section */ |
| REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring); |
| REGISTER_POST_CHECK(post_sink_resolve); |
| |
| /* |
| * Local variables: |
| * c-indent-level: 8 |
| * c-basic-offset: 8 |
| * End: |
| */ |