| /* |
| * Event sink management |
| * |
| * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu |
| * |
| * This library is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU Lesser General Public |
| * License as published by the Free Software Foundation, version 2.1 |
| * exclusively. |
| * |
| * This library is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| * Lesser General Public License for more details. |
| * |
| * You should have received a copy of the GNU Lesser General Public |
| * License along with this library; if not, write to the Free Software |
| * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
| */ |
| |
| #include <import/ist.h> |
| #include <haproxy/api.h> |
| #include <haproxy/cfgparse.h> |
| #include <haproxy/cli.h> |
| #include <haproxy/errors.h> |
| #include <haproxy/list.h> |
| #include <haproxy/log.h> |
| #include <haproxy/proxy.h> |
| #include <haproxy/ring.h> |
| #include <haproxy/signal.h> |
| #include <haproxy/sink.h> |
| #include <haproxy/stream_interface.h> |
| #include <haproxy/time.h> |
| #include <haproxy/tools.h> |
| |
| struct list sink_list = LIST_HEAD_INIT(sink_list); |
| |
| /* sink proxies list */ |
| struct proxy *sink_proxies_list; |
| |
| struct sink *cfg_sink; |
| |
| struct sink *sink_find(const char *name) |
| { |
| struct sink *sink; |
| |
| list_for_each_entry(sink, &sink_list, sink_list) |
| if (strcmp(sink->name, name) == 0) |
| return sink; |
| return NULL; |
| } |
| |
| /* creates a new sink and adds it to the list, it's still generic and not fully |
| * initialized. Returns NULL on allocation failure. If another one already |
| * exists with the same name, it will be returned. The caller can detect it as |
| * a newly created one has type SINK_TYPE_NEW. |
| */ |
| static struct sink *__sink_new(const char *name, const char *desc, int fmt) |
| { |
| struct sink *sink; |
| |
| sink = sink_find(name); |
| if (sink) |
| goto end; |
| |
| sink = calloc(1, sizeof(*sink)); |
| if (!sink) |
| goto end; |
| |
| sink->name = strdup(name); |
| if (!sink->name) |
| goto err; |
| |
| sink->desc = strdup(desc); |
| if (!sink->desc) |
| goto err; |
| |
| sink->fmt = fmt; |
| sink->type = SINK_TYPE_NEW; |
| sink->maxlen = BUFSIZE; |
| /* address will be filled by the caller if needed */ |
| sink->ctx.fd = -1; |
| sink->ctx.dropped = 0; |
| HA_RWLOCK_INIT(&sink->ctx.lock); |
| LIST_APPEND(&sink_list, &sink->sink_list); |
| end: |
| return sink; |
| |
| err: |
| ha_free(&sink->name); |
| ha_free(&sink->desc); |
| ha_free(&sink); |
| |
| return NULL; |
| } |
| |
| /* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>, |
| * and description <desc>. Returns NULL on allocation failure or conflict. |
| * Perfect duplicates are merged (same type, fd, and name). |
| */ |
| struct sink *sink_new_fd(const char *name, const char *desc, enum log_fmt fmt, int fd) |
| { |
| struct sink *sink; |
| |
| sink = __sink_new(name, desc, fmt); |
| if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd)) |
| goto end; |
| |
| if (sink->type != SINK_TYPE_NEW) { |
| sink = NULL; |
| goto end; |
| } |
| |
| sink->type = SINK_TYPE_FD; |
| sink->ctx.fd = fd; |
| end: |
| return sink; |
| } |
| |
| /* creates a sink called <name> of type BUF of size <size>, format <fmt>, |
| * and description <desc>. Returns NULL on allocation failure or conflict. |
| * Perfect duplicates are merged (same type and name). If sizes differ, the |
| * largest one is kept. |
| */ |
| struct sink *sink_new_buf(const char *name, const char *desc, enum log_fmt fmt, size_t size) |
| { |
| struct sink *sink; |
| |
| sink = __sink_new(name, desc, fmt); |
| if (!sink) |
| goto fail; |
| |
| if (sink->type == SINK_TYPE_BUFFER) { |
| /* such a buffer already exists, we may have to resize it */ |
| if (!ring_resize(sink->ctx.ring, size)) |
| goto fail; |
| goto end; |
| } |
| |
| if (sink->type != SINK_TYPE_NEW) { |
| /* already exists of another type */ |
| goto fail; |
| } |
| |
| sink->ctx.ring = ring_new(size); |
| if (!sink->ctx.ring) { |
| LIST_DELETE(&sink->sink_list); |
| free(sink->name); |
| free(sink->desc); |
| free(sink); |
| goto fail; |
| } |
| |
| sink->type = SINK_TYPE_BUFFER; |
| end: |
| return sink; |
| fail: |
| return NULL; |
| } |
| |
| /* tries to send <nmsg> message parts (up to 8, ignored above) from message |
| * array <msg> to sink <sink>. Formatting according to the sink's preference is |
| * done here. Lost messages are NOT accounted for. It is preferable to call |
| * sink_write() instead which will also try to emit the number of dropped |
| * messages when there are any. It returns >0 if it could write anything, |
| * <=0 otherwise. |
| */ |
| ssize_t __sink_write(struct sink *sink, const struct ist msg[], size_t nmsg, |
| int level, int facility, struct ist *metadata) |
| { |
| struct ist *pfx = NULL; |
| size_t npfx = 0; |
| |
| if (sink->fmt == LOG_FORMAT_RAW) |
| goto send; |
| |
| pfx = build_log_header(sink->fmt, level, facility, metadata, &npfx); |
| |
| send: |
| if (sink->type == SINK_TYPE_FD) { |
| return fd_write_frag_line(sink->ctx.fd, sink->maxlen, pfx, npfx, msg, nmsg, 1); |
| } |
| else if (sink->type == SINK_TYPE_BUFFER) { |
| return ring_write(sink->ctx.ring, sink->maxlen, pfx, npfx, msg, nmsg); |
| } |
| return 0; |
| } |
| |
| /* Tries to emit a message indicating the number of dropped events. In case of |
| * success, the amount of drops is reduced by as much. It's supposed to be |
| * called under an exclusive lock on the sink to avoid multiple produces doing |
| * the same. On success, >0 is returned, otherwise <=0 on failure. |
| */ |
| int sink_announce_dropped(struct sink *sink, int facility) |
| { |
| static THREAD_LOCAL struct ist metadata[LOG_META_FIELDS]; |
| static THREAD_LOCAL pid_t curr_pid; |
| static THREAD_LOCAL char pidstr[16]; |
| unsigned int dropped; |
| struct buffer msg; |
| struct ist msgvec[1]; |
| char logbuf[64]; |
| |
| while (unlikely((dropped = sink->ctx.dropped) > 0)) { |
| chunk_init(&msg, logbuf, sizeof(logbuf)); |
| chunk_printf(&msg, "%u event%s dropped", dropped, dropped > 1 ? "s" : ""); |
| msgvec[0] = ist2(msg.area, msg.data); |
| |
| if (!metadata[LOG_META_HOST].len) { |
| if (global.log_send_hostname) |
| metadata[LOG_META_HOST] = ist2(global.log_send_hostname, strlen(global.log_send_hostname)); |
| } |
| |
| if (!metadata[LOG_META_TAG].len) |
| metadata[LOG_META_TAG] = ist2(global.log_tag.area, global.log_tag.data); |
| |
| if (unlikely(curr_pid != getpid())) |
| metadata[LOG_META_PID].len = 0; |
| |
| if (!metadata[LOG_META_PID].len) { |
| curr_pid = getpid(); |
| ltoa_o(curr_pid, pidstr, sizeof(pidstr)); |
| metadata[LOG_META_PID] = ist2(pidstr, strlen(pidstr)); |
| } |
| |
| if (__sink_write(sink, msgvec, 1, LOG_NOTICE, facility, metadata) <= 0) |
| return 0; |
| /* success! */ |
| HA_ATOMIC_SUB(&sink->ctx.dropped, dropped); |
| } |
| return 1; |
| } |
| |
| /* parse the "show events" command, returns 1 if a message is returned, otherwise zero */ |
| static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private) |
| { |
| struct sink *sink; |
| int arg; |
| |
| args++; // make args[1] the 1st arg |
| |
| if (!*args[1]) { |
| /* no arg => report the list of supported sink */ |
| chunk_printf(&trash, "Supported events sinks are listed below. Add -w(wait), -n(new). Any key to stop\n"); |
| list_for_each_entry(sink, &sink_list, sink_list) { |
| chunk_appendf(&trash, " %-10s : type=%s, %u dropped, %s\n", |
| sink->name, |
| sink->type == SINK_TYPE_NEW ? "init" : |
| sink->type == SINK_TYPE_FD ? "fd" : |
| sink->type == SINK_TYPE_BUFFER ? "buffer" : "?", |
| sink->ctx.dropped, sink->desc); |
| } |
| |
| trash.area[trash.data] = 0; |
| return cli_msg(appctx, LOG_WARNING, trash.area); |
| } |
| |
| if (!cli_has_level(appctx, ACCESS_LVL_OPER)) |
| return 1; |
| |
| sink = sink_find(args[1]); |
| if (!sink) |
| return cli_err(appctx, "No such event sink"); |
| |
| if (sink->type != SINK_TYPE_BUFFER) |
| return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink"); |
| |
| for (arg = 2; *args[arg]; arg++) { |
| if (strcmp(args[arg], "-w") == 0) |
| appctx->ctx.cli.i0 |= 1; // wait mode |
| else if (strcmp(args[arg], "-n") == 0) |
| appctx->ctx.cli.i0 |= 2; // seek to new |
| else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0) |
| appctx->ctx.cli.i0 |= 3; // seek to new + wait |
| else |
| return cli_err(appctx, "unknown option"); |
| } |
| return ring_attach_cli(sink->ctx.ring, appctx); |
| } |
| |
| /* Pre-configures a ring proxy to emit connections */ |
| void sink_setup_proxy(struct proxy *px) |
| { |
| px->last_change = now.tv_sec; |
| px->cap = PR_CAP_FE | PR_CAP_BE; |
| px->maxconn = 0; |
| px->conn_retries = 1; |
| px->timeout.server = TICK_ETERNITY; |
| px->timeout.client = TICK_ETERNITY; |
| px->timeout.connect = TICK_ETERNITY; |
| px->accept = NULL; |
| px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC; |
| px->bind_proc = 0; /* will be filled by users */ |
| px->next = sink_proxies_list; |
| sink_proxies_list = px; |
| } |
| |
| /* |
| * IO Handler to handle message push to syslog tcp server |
| */ |
| static void sink_forward_io_handler(struct appctx *appctx) |
| { |
| struct stream_interface *si = appctx->owner; |
| struct stream *s = si_strm(si); |
| struct sink *sink = strm_fe(s)->parent; |
| struct sink_forward_target *sft = appctx->ctx.sft.ptr; |
| struct ring *ring = sink->ctx.ring; |
| struct buffer *buf = &ring->buf; |
| uint64_t msg_len; |
| size_t len, cnt, ofs, last_ofs; |
| int ret = 0; |
| |
| /* if stopping was requested, close immediately */ |
| if (unlikely(stopping)) |
| goto close; |
| |
| /* for rex because it seems reset to timeout |
| * and we don't want expire on this case |
| * with a syslog server |
| */ |
| si_oc(si)->rex = TICK_ETERNITY; |
| /* rto should not change but it seems the case */ |
| si_oc(si)->rto = TICK_ETERNITY; |
| |
| /* an error was detected */ |
| if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW))) |
| goto close; |
| |
| /* con closed by server side */ |
| if ((si_oc(si)->flags & CF_SHUTW)) |
| goto close; |
| |
| /* if the connection is not established, inform the stream that we want |
| * to be notified whenever the connection completes. |
| */ |
| if (si_opposite(si)->state < SI_ST_EST) { |
| si_cant_get(si); |
| si_rx_conn_blk(si); |
| si_rx_endp_more(si); |
| return; |
| } |
| |
| HA_SPIN_LOCK(SFT_LOCK, &sft->lock); |
| if (appctx != sft->appctx) { |
| HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); |
| goto close; |
| } |
| ofs = sft->ofs; |
| |
| HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); |
| LIST_DEL_INIT(&appctx->wait_entry); |
| HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); |
| |
| HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock); |
| |
| /* explanation for the initialization below: it would be better to do |
| * this in the parsing function but this would occasionally result in |
| * dropped events because we'd take a reference on the oldest message |
| * and keep it while being scheduled. Thus instead let's take it the |
| * first time we enter here so that we have a chance to pass many |
| * existing messages before grabbing a reference to a location. This |
| * value cannot be produced after initialization. |
| */ |
| if (unlikely(ofs == ~0)) { |
| ofs = 0; |
| |
| HA_ATOMIC_INC(b_peek(buf, ofs)); |
| ofs += ring->ofs; |
| } |
| |
| /* in this loop, ofs always points to the counter byte that precedes |
| * the message so that we can take our reference there if we have to |
| * stop before the end (ret=0). |
| */ |
| if (si_opposite(si)->state == SI_ST_EST) { |
| /* we were already there, adjust the offset to be relative to |
| * the buffer's head and remove us from the counter. |
| */ |
| ofs -= ring->ofs; |
| BUG_ON(ofs >= buf->size); |
| HA_ATOMIC_DEC(b_peek(buf, ofs)); |
| |
| ret = 1; |
| while (ofs + 1 < b_data(buf)) { |
| cnt = 1; |
| len = b_peek_varint(buf, ofs + cnt, &msg_len); |
| if (!len) |
| break; |
| cnt += len; |
| BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); |
| |
| if (unlikely(msg_len + 1 > b_size(&trash))) { |
| /* too large a message to ever fit, let's skip it */ |
| ofs += cnt + msg_len; |
| continue; |
| } |
| |
| chunk_reset(&trash); |
| len = b_getblk(buf, trash.area, msg_len, ofs + cnt); |
| trash.data += len; |
| trash.area[trash.data++] = '\n'; |
| |
| if (ci_putchk(si_ic(si), &trash) == -1) { |
| si_rx_room_blk(si); |
| ret = 0; |
| break; |
| } |
| ofs += cnt + msg_len; |
| } |
| |
| HA_ATOMIC_INC(b_peek(buf, ofs)); |
| ofs += ring->ofs; |
| sft->ofs = ofs; |
| last_ofs = ring->ofs; |
| } |
| HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock); |
| |
| if (ret) { |
| /* let's be woken up once new data arrive */ |
| HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); |
| LIST_APPEND(&ring->waiters, &appctx->wait_entry); |
| ofs = ring->ofs; |
| HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); |
| if (ofs != last_ofs) { |
| /* more data was added into the ring between the |
| * unlock and the lock, and the writer might not |
| * have seen us. We need to reschedule a read. |
| */ |
| si_rx_endp_more(si); |
| } else |
| si_rx_endp_done(si); |
| } |
| HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); |
| |
| /* always drain data from server */ |
| co_skip(si_oc(si), si_oc(si)->output); |
| return; |
| |
| close: |
| si_shutw(si); |
| si_shutr(si); |
| si_ic(si)->flags |= CF_READ_NULL; |
| } |
| |
| /* |
| * IO Handler to handle message push to syslog tcp server |
| * using octet counting frames |
| */ |
| static void sink_forward_oc_io_handler(struct appctx *appctx) |
| { |
| struct stream_interface *si = appctx->owner; |
| struct stream *s = si_strm(si); |
| struct sink *sink = strm_fe(s)->parent; |
| struct sink_forward_target *sft = appctx->ctx.sft.ptr; |
| struct ring *ring = sink->ctx.ring; |
| struct buffer *buf = &ring->buf; |
| uint64_t msg_len; |
| size_t len, cnt, ofs; |
| int ret = 0; |
| char *p; |
| |
| /* if stopping was requested, close immediately */ |
| if (unlikely(stopping)) |
| goto close; |
| |
| /* for rex because it seems reset to timeout |
| * and we don't want expire on this case |
| * with a syslog server |
| */ |
| si_oc(si)->rex = TICK_ETERNITY; |
| /* rto should not change but it seems the case */ |
| si_oc(si)->rto = TICK_ETERNITY; |
| |
| /* an error was detected */ |
| if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW))) |
| goto close; |
| |
| /* con closed by server side */ |
| if ((si_oc(si)->flags & CF_SHUTW)) |
| goto close; |
| |
| /* if the connection is not established, inform the stream that we want |
| * to be notified whenever the connection completes. |
| */ |
| if (si_opposite(si)->state < SI_ST_EST) { |
| si_cant_get(si); |
| si_rx_conn_blk(si); |
| si_rx_endp_more(si); |
| return; |
| } |
| |
| HA_SPIN_LOCK(SFT_LOCK, &sft->lock); |
| if (appctx != sft->appctx) { |
| HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); |
| goto close; |
| } |
| ofs = sft->ofs; |
| |
| HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); |
| LIST_DEL_INIT(&appctx->wait_entry); |
| HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); |
| |
| HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock); |
| |
| /* explanation for the initialization below: it would be better to do |
| * this in the parsing function but this would occasionally result in |
| * dropped events because we'd take a reference on the oldest message |
| * and keep it while being scheduled. Thus instead let's take it the |
| * first time we enter here so that we have a chance to pass many |
| * existing messages before grabbing a reference to a location. This |
| * value cannot be produced after initialization. |
| */ |
| if (unlikely(ofs == ~0)) { |
| ofs = 0; |
| |
| HA_ATOMIC_INC(b_peek(buf, ofs)); |
| ofs += ring->ofs; |
| } |
| |
| /* in this loop, ofs always points to the counter byte that precedes |
| * the message so that we can take our reference there if we have to |
| * stop before the end (ret=0). |
| */ |
| if (si_opposite(si)->state == SI_ST_EST) { |
| /* we were already there, adjust the offset to be relative to |
| * the buffer's head and remove us from the counter. |
| */ |
| ofs -= ring->ofs; |
| BUG_ON(ofs >= buf->size); |
| HA_ATOMIC_DEC(b_peek(buf, ofs)); |
| |
| ret = 1; |
| while (ofs + 1 < b_data(buf)) { |
| cnt = 1; |
| len = b_peek_varint(buf, ofs + cnt, &msg_len); |
| if (!len) |
| break; |
| cnt += len; |
| BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf)); |
| |
| chunk_reset(&trash); |
| p = ulltoa(msg_len, trash.area, b_size(&trash)); |
| if (p) { |
| trash.data = (p - trash.area) + 1; |
| *p = ' '; |
| } |
| |
| if (!p || (trash.data + msg_len > b_size(&trash))) { |
| /* too large a message to ever fit, let's skip it */ |
| ofs += cnt + msg_len; |
| continue; |
| } |
| |
| trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt); |
| |
| if (ci_putchk(si_ic(si), &trash) == -1) { |
| si_rx_room_blk(si); |
| ret = 0; |
| break; |
| } |
| ofs += cnt + msg_len; |
| } |
| |
| HA_ATOMIC_INC(b_peek(buf, ofs)); |
| ofs += ring->ofs; |
| sft->ofs = ofs; |
| } |
| HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock); |
| |
| if (ret) { |
| /* let's be woken up once new data arrive */ |
| HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock); |
| LIST_APPEND(&ring->waiters, &appctx->wait_entry); |
| HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock); |
| si_rx_endp_done(si); |
| } |
| HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); |
| |
| /* always drain data from server */ |
| co_skip(si_oc(si), si_oc(si)->output); |
| return; |
| |
| close: |
| si_shutw(si); |
| si_shutr(si); |
| si_ic(si)->flags |= CF_READ_NULL; |
| } |
| |
| void __sink_forward_session_deinit(struct sink_forward_target *sft) |
| { |
| struct stream_interface *si; |
| struct stream *s; |
| struct sink *sink; |
| |
| if (!sft->appctx) |
| return; |
| |
| si = sft->appctx->owner; |
| if (!si) |
| return; |
| |
| s = si_strm(si); |
| if (!s) |
| return; |
| |
| sink = strm_fe(s)->parent; |
| if (!sink) |
| return; |
| |
| HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock); |
| LIST_DEL_INIT(&sft->appctx->wait_entry); |
| HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &sink->ctx.ring->lock); |
| |
| sft->appctx = NULL; |
| task_wakeup(sink->forward_task, TASK_WOKEN_MSG); |
| } |
| |
| |
| static void sink_forward_session_release(struct appctx *appctx) |
| { |
| struct sink_forward_target *sft = appctx->ctx.sft.ptr; |
| |
| if (!sft) |
| return; |
| |
| HA_SPIN_LOCK(SFT_LOCK, &sft->lock); |
| if (sft->appctx == appctx) |
| __sink_forward_session_deinit(sft); |
| HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); |
| } |
| |
| static struct applet sink_forward_applet = { |
| .obj_type = OBJ_TYPE_APPLET, |
| .name = "<SINKFWD>", /* used for logging */ |
| .fct = sink_forward_io_handler, |
| .release = sink_forward_session_release, |
| }; |
| |
| static struct applet sink_forward_oc_applet = { |
| .obj_type = OBJ_TYPE_APPLET, |
| .name = "<SINKFWDOC>", /* used for logging */ |
| .fct = sink_forward_oc_io_handler, |
| .release = sink_forward_session_release, |
| }; |
| |
| /* |
| * Create a new peer session in assigned state (connect will start automatically) |
| */ |
| static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft) |
| { |
| struct proxy *p = sink->forward_px; |
| struct appctx *appctx; |
| struct session *sess; |
| struct stream *s; |
| struct applet *applet = &sink_forward_applet; |
| |
| if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING) |
| applet = &sink_forward_oc_applet; |
| |
| appctx = appctx_new(applet, tid_bit); |
| if (!appctx) |
| goto out_close; |
| |
| appctx->ctx.sft.ptr = (void *)sft; |
| |
| sess = session_new(p, NULL, &appctx->obj_type); |
| if (!sess) { |
| ha_alert("out of memory in peer_session_create().\n"); |
| goto out_free_appctx; |
| } |
| |
| if ((s = stream_new(sess, &appctx->obj_type, &BUF_NULL)) == NULL) { |
| ha_alert("Failed to initialize stream in peer_session_create().\n"); |
| goto out_free_sess; |
| } |
| |
| |
| s->target = &sft->srv->obj_type; |
| if (!sockaddr_alloc(&s->target_addr, &sft->srv->addr, sizeof(sft->srv->addr))) |
| goto out_free_strm; |
| s->flags = SF_ASSIGNED|SF_ADDR_SET; |
| s->si[1].flags |= SI_FL_NOLINGER; |
| |
| s->do_log = NULL; |
| s->uniq_id = 0; |
| |
| s->res.flags |= CF_READ_DONTWAIT; |
| /* for rto and rex to eternity to not expire on idle recv: |
| * We are using a syslog server. |
| */ |
| s->res.rto = TICK_ETERNITY; |
| s->res.rex = TICK_ETERNITY; |
| sft->appctx = appctx; |
| task_wakeup(s->task, TASK_WOKEN_INIT); |
| return appctx; |
| |
| /* Error unrolling */ |
| out_free_strm: |
| LIST_DELETE(&s->list); |
| pool_free(pool_head_stream, s); |
| out_free_sess: |
| session_free(sess); |
| out_free_appctx: |
| appctx_free(appctx); |
| out_close: |
| return NULL; |
| } |
| |
| /* |
| * Task to handle connctions to forward servers |
| */ |
| static struct task *process_sink_forward(struct task * task, void *context, unsigned int state) |
| { |
| struct sink *sink = (struct sink *)context; |
| struct sink_forward_target *sft = sink->sft; |
| |
| task->expire = TICK_ETERNITY; |
| |
| if (!stopping) { |
| while (sft) { |
| HA_SPIN_LOCK(SFT_LOCK, &sft->lock); |
| /* if appctx is NULL, start a new session */ |
| if (!sft->appctx) |
| sft->appctx = sink_forward_session_create(sink, sft); |
| HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); |
| sft = sft->next; |
| } |
| } |
| else { |
| while (sft) { |
| HA_SPIN_LOCK(SFT_LOCK, &sft->lock); |
| /* awake applet to perform a clean close */ |
| if (sft->appctx) |
| appctx_wakeup(sft->appctx); |
| HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); |
| sft = sft->next; |
| } |
| } |
| |
| return task; |
| } |
| /* |
| * Init task to manage connctions to forward servers |
| * |
| * returns 0 in case of error. |
| */ |
| int sink_init_forward(struct sink *sink) |
| { |
| sink->forward_task = task_new(MAX_THREADS_MASK); |
| if (!sink->forward_task) |
| return 0; |
| |
| sink->forward_task->process = process_sink_forward; |
| sink->forward_task->context = (void *)sink; |
| sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0); |
| task_wakeup(sink->forward_task, TASK_WOKEN_INIT); |
| return 1; |
| } |
| /* |
| * Parse "ring" section and create corresponding sink buffer. |
| * |
| * The function returns 0 in success case, otherwise, it returns error |
| * flags. |
| */ |
| int cfg_parse_ring(const char *file, int linenum, char **args, int kwm) |
| { |
| int err_code = 0; |
| const char *inv; |
| size_t size = BUFSIZE; |
| struct proxy *p; |
| |
| if (strcmp(args[0], "ring") == 0) { /* new ring section */ |
| if (!*args[1]) { |
| ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| inv = invalid_char(args[1]); |
| if (inv) { |
| ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| if (sink_find(args[1])) { |
| ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| cfg_sink = sink_new_buf(args[1], args[1], LOG_FORMAT_RAW, size); |
| if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) { |
| ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| /* allocate new proxy to handle forwards */ |
| p = calloc(1, sizeof *p); |
| if (!p) { |
| ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| init_new_proxy(p); |
| sink_setup_proxy(p); |
| p->parent = cfg_sink; |
| p->id = strdup(args[1]); |
| p->conf.args.file = p->conf.file = strdup(file); |
| p->conf.args.line = p->conf.line = linenum; |
| cfg_sink->forward_px = p; |
| } |
| else if (strcmp(args[0], "size") == 0) { |
| if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) { |
| ha_alert("parsing [%s:%d] : 'size' directive not usable with this type of sink.\n", file, linenum); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| size = atol(args[1]); |
| if (!size) { |
| ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| if (size < cfg_sink->ctx.ring->buf.size) { |
| ha_warning("parsing [%s:%d] : ignoring new size '%llu' that is smaller than current size '%llu' for ring '%s'.\n", |
| file, linenum, (ullong)size, (ullong)cfg_sink->ctx.ring->buf.size, cfg_sink->name); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| if (!ring_resize(cfg_sink->ctx.ring, size)) { |
| ha_alert("parsing [%s:%d] : fail to set sink buffer size '%llu' for ring '%s'.\n", file, linenum, |
| (ullong)cfg_sink->ctx.ring->buf.size, cfg_sink->name); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| } |
| else if (strcmp(args[0],"server") == 0) { |
| err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL, |
| SRV_PARSE_PARSE_ADDR|SRV_PARSE_INITIAL_RESOLVE); |
| } |
| else if (strcmp(args[0],"timeout") == 0) { |
| if (!cfg_sink || !cfg_sink->forward_px) { |
| ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| if (strcmp(args[1], "connect") == 0 || |
| strcmp(args[1], "server") == 0) { |
| const char *res; |
| unsigned int tout; |
| |
| if (!*args[2]) { |
| ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n", |
| file, linenum, args[0], args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| res = parse_time_err(args[2], &tout, TIME_UNIT_MS); |
| if (res == PARSE_TIME_OVER) { |
| ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n", |
| file, linenum, args[2], args[0], args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| else if (res == PARSE_TIME_UNDER) { |
| ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n", |
| file, linenum, args[2], args[0], args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| else if (res) { |
| ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n", |
| file, linenum, *res, args[0], args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| if (args[1][0] == 'c') |
| cfg_sink->forward_px->timeout.connect = tout; |
| else |
| cfg_sink->forward_px->timeout.server = tout; |
| } |
| } |
| else if (strcmp(args[0],"format") == 0) { |
| if (!cfg_sink) { |
| ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| cfg_sink->fmt = get_log_format(args[1]); |
| if (cfg_sink->fmt == LOG_FORMAT_UNSPEC) { |
| ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| } |
| else if (strcmp(args[0],"maxlen") == 0) { |
| if (!cfg_sink) { |
| ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| cfg_sink->maxlen = atol(args[1]); |
| if (!cfg_sink->maxlen) { |
| ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| } |
| else if (strcmp(args[0],"description") == 0) { |
| if (!cfg_sink) { |
| ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| if (!*args[1]) { |
| ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| free(cfg_sink->desc); |
| |
| cfg_sink->desc = strdup(args[1]); |
| if (!cfg_sink->desc) { |
| ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| } |
| else { |
| ha_alert("parsing [%s:%d] : unknown statement '%s'.\n", file, linenum, args[0]); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| goto err; |
| } |
| |
| err: |
| return err_code; |
| } |
| |
| /* Creates an new sink buffer from a log server. |
| * |
| * It uses the logsrvaddress to declare a forward |
| * server for this buffer. And it initializes the |
| * forwarding. |
| * |
| * The function returns a pointer on the |
| * allocated struct sink if allocate |
| * and initialize succeed, else if it fails |
| * it returns NULL. |
| * |
| * Note: the sink is created using the name |
| * specified into logsrv->ring_name |
| */ |
| struct sink *sink_new_from_logsrv(struct logsrv *logsrv) |
| { |
| struct proxy *p = NULL; |
| struct sink *sink = NULL; |
| struct server *srv = NULL; |
| struct sink_forward_target *sft = NULL; |
| int i; |
| |
| /* allocate new proxy to handle |
| * forward to a stream server |
| */ |
| p = calloc(1, sizeof *p); |
| if (!p) { |
| goto error; |
| } |
| |
| init_new_proxy(p); |
| sink_setup_proxy(p); |
| p->id = strdup(logsrv->ring_name); |
| p->conf.args.file = p->conf.file = strdup(logsrv->conf.file); |
| p->conf.args.line = p->conf.line = logsrv->conf.line; |
| |
| /* allocate a new server to forward messages |
| * from ring buffer |
| */ |
| srv = new_server(p); |
| if (!srv) |
| goto error; |
| |
| /* init server */ |
| srv->id = strdup(logsrv->ring_name); |
| srv->conf.file = strdup(logsrv->conf.file); |
| srv->conf.line = logsrv->conf.line; |
| srv->addr = logsrv->addr; |
| srv->svc_port = get_host_port(&logsrv->addr); |
| HA_SPIN_INIT(&srv->lock); |
| |
| /* process per thread init */ |
| srv->per_thr = calloc(global.nbthread, sizeof(*srv->per_thr)); |
| if (!srv->per_thr) |
| goto error; |
| |
| for (i = 0; i < global.nbthread; i++) { |
| srv->per_thr[i].idle_conns = EB_ROOT; |
| srv->per_thr[i].safe_conns = EB_ROOT; |
| srv->per_thr[i].avail_conns = EB_ROOT; |
| MT_LIST_INIT(&srv->per_thr[i].streams); |
| } |
| |
| /* the servers are linked backwards |
| * first into proxy |
| */ |
| p->srv = srv; |
| srv->next = p->srv; |
| |
| /* allocate sink_forward_target descriptor */ |
| sft = calloc(1, sizeof(*sft)); |
| if (!sft) |
| goto error; |
| |
| /* init sink_forward_target offset */ |
| sft->srv = srv; |
| sft->appctx = NULL; |
| sft->ofs = ~0; |
| HA_SPIN_INIT(&sft->lock); |
| |
| /* prepare description for the sink */ |
| chunk_reset(&trash); |
| chunk_printf(&trash, "created from logserver declared into '%s' at line %d", logsrv->conf.file, logsrv->conf.line); |
| |
| /* allocate a new sink buffer */ |
| sink = sink_new_buf(logsrv->ring_name, trash.area, logsrv->format, BUFSIZE); |
| if (!sink || sink->type != SINK_TYPE_BUFFER) { |
| goto error; |
| } |
| |
| /* link sink_forward_target to proxy */ |
| sink->forward_px = p; |
| p->parent = sink; |
| |
| /* insert into sink_forward_targets |
| * list into sink |
| */ |
| sft->next = sink->sft; |
| sink->sft = sft; |
| |
| /* mark server as an attached reader to the ring */ |
| if (!ring_attach(sink->ctx.ring)) { |
| /* should never fail since there is |
| * only one reader |
| */ |
| goto error; |
| } |
| |
| /* initialize sink buffer forwarding */ |
| if (!sink_init_forward(sink)) |
| goto error; |
| |
| /* reset familyt of logsrv to consider the ring buffer target */ |
| logsrv->addr.ss_family = AF_UNSPEC; |
| |
| return sink; |
| error: |
| if (p) { |
| if (p->id) |
| free(p->id); |
| if (p->conf.file) |
| free(p->conf.file); |
| |
| free(p); |
| } |
| |
| if (srv) { |
| if (srv->id) |
| free(srv->id); |
| if (srv->conf.file) |
| free((void *)srv->conf.file); |
| if (srv->per_thr) |
| free(srv->per_thr); |
| free(srv); |
| } |
| |
| if (sft) |
| free(sft); |
| |
| if (sink) { |
| if (sink->ctx.ring) |
| ring_free(sink->ctx.ring); |
| |
| LIST_DELETE(&sink->sink_list); |
| free(sink->name); |
| free(sink->desc); |
| free(sink); |
| } |
| |
| return NULL; |
| } |
| |
| /* |
| * Post parsing "ring" section. |
| * |
| * The function returns 0 in success case, otherwise, it returns error |
| * flags. |
| */ |
| int cfg_post_parse_ring() |
| { |
| int err_code = 0; |
| struct server *srv; |
| |
| if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) { |
| if (cfg_sink->maxlen > b_size(&cfg_sink->ctx.ring->buf)) { |
| ha_warning("ring '%s' event max length '%u' exceeds size, forced to size '%lu'.\n", |
| cfg_sink->name, cfg_sink->maxlen, (unsigned long)b_size(&cfg_sink->ctx.ring->buf)); |
| cfg_sink->maxlen = b_size(&cfg_sink->ctx.ring->buf); |
| err_code |= ERR_ALERT; |
| } |
| |
| /* prepare forward server descriptors */ |
| if (cfg_sink->forward_px) { |
| srv = cfg_sink->forward_px->srv; |
| while (srv) { |
| struct sink_forward_target *sft; |
| |
| /* allocate sink_forward_target descriptor */ |
| sft = calloc(1, sizeof(*sft)); |
| if (!sft) { |
| ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| break; |
| } |
| sft->srv = srv; |
| sft->appctx = NULL; |
| sft->ofs = ~0; /* init ring offset */ |
| sft->next = cfg_sink->sft; |
| HA_SPIN_INIT(&sft->lock); |
| |
| /* mark server attached to the ring */ |
| if (!ring_attach(cfg_sink->ctx.ring)) { |
| ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| } |
| cfg_sink->sft = sft; |
| srv = srv->next; |
| } |
| sink_init_forward(cfg_sink); |
| } |
| } |
| cfg_sink = NULL; |
| |
| return err_code; |
| } |
| |
| /* resolve sink names at end of config. Returns 0 on success otherwise error |
| * flags. |
| */ |
| int post_sink_resolve() |
| { |
| int err_code = ERR_NONE; |
| struct logsrv *logsrv, *logb; |
| struct sink *sink; |
| struct proxy *px; |
| |
| list_for_each_entry_safe(logsrv, logb, &global.logsrvs, list) { |
| if (logsrv->type == LOG_TARGET_BUFFER) { |
| sink = sink_find(logsrv->ring_name); |
| if (!sink) { |
| /* LOG_TARGET_BUFFER but !AF_UNSPEC |
| * means we must allocate a sink |
| * buffer to send messages to this logsrv |
| */ |
| if (logsrv->addr.ss_family != AF_UNSPEC) { |
| sink = sink_new_from_logsrv(logsrv); |
| if (!sink) { |
| ha_alert("global stream log server declared in file '%s' at line %d cannot be initialized'.\n", |
| logsrv->conf.file, logsrv->conf.line); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| } |
| } |
| else { |
| ha_alert("global log server declared in file '%s' at line %d uses unknown ring named '%s'.\n", |
| logsrv->conf.file, logsrv->conf.line, logsrv->ring_name); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| } |
| } |
| else if (sink->type != SINK_TYPE_BUFFER) { |
| ha_alert("global log server declared in file '%s' at line %d uses incompatible ring '%s'.\n", |
| logsrv->conf.file, logsrv->conf.line, logsrv->ring_name); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| } |
| logsrv->sink = sink; |
| } |
| |
| } |
| |
| for (px = proxies_list; px; px = px->next) { |
| list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) { |
| if (logsrv->type == LOG_TARGET_BUFFER) { |
| sink = sink_find(logsrv->ring_name); |
| if (!sink) { |
| /* LOG_TARGET_BUFFER but !AF_UNSPEC |
| * means we must allocate a sink |
| * buffer to send messages to this logsrv |
| */ |
| if (logsrv->addr.ss_family != AF_UNSPEC) { |
| sink = sink_new_from_logsrv(logsrv); |
| if (!sink) { |
| ha_alert("log server declared in proxy section '%s' file '%s' at line %d cannot be initialized'.\n", |
| px->id, logsrv->conf.file, logsrv->conf.line); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| } |
| } |
| else { |
| ha_alert("log server declared in proxy section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n", |
| px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| } |
| } |
| else if (sink->type != SINK_TYPE_BUFFER) { |
| ha_alert("log server declared in proxy section '%s' in file '%s' at line %d uses incomatible ring named '%s'.\n", |
| px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| } |
| logsrv->sink = sink; |
| } |
| } |
| } |
| |
| for (px = cfg_log_forward; px; px = px->next) { |
| list_for_each_entry_safe(logsrv, logb, &px->logsrvs, list) { |
| if (logsrv->type == LOG_TARGET_BUFFER) { |
| sink = sink_find(logsrv->ring_name); |
| if (!sink) { |
| /* LOG_TARGET_BUFFER but !AF_UNSPEC |
| * means we must allocate a sink |
| * buffer to send messages to this logsrv |
| */ |
| if (logsrv->addr.ss_family != AF_UNSPEC) { |
| sink = sink_new_from_logsrv(logsrv); |
| if (!sink) { |
| ha_alert("log server declared in log-forward section '%s' file '%s' at line %d cannot be initialized'.\n", |
| px->id, logsrv->conf.file, logsrv->conf.line); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| } |
| } |
| else { |
| ha_alert("log server declared in log-forward section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n", |
| px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| } |
| } |
| else if (sink->type != SINK_TYPE_BUFFER) { |
| ha_alert("log server declared in log-forward section '%s' in file '%s' at line %d uses unknown ring named '%s'.\n", |
| px->id, logsrv->conf.file, logsrv->conf.line, logsrv->ring_name); |
| err_code |= ERR_ALERT | ERR_FATAL; |
| } |
| logsrv->sink = sink; |
| } |
| } |
| } |
| return err_code; |
| } |
| |
| |
| static void sink_init() |
| { |
| sink_new_fd("stdout", "standard output (fd#1)", LOG_FORMAT_RAW, 1); |
| sink_new_fd("stderr", "standard output (fd#2)", LOG_FORMAT_RAW, 2); |
| sink_new_buf("buf0", "in-memory ring buffer", LOG_FORMAT_TIMED, 1048576); |
| } |
| |
| static void sink_deinit() |
| { |
| struct sink *sink, *sb; |
| |
| list_for_each_entry_safe(sink, sb, &sink_list, sink_list) { |
| if (sink->type == SINK_TYPE_BUFFER) |
| ring_free(sink->ctx.ring); |
| LIST_DELETE(&sink->sink_list); |
| free(sink->name); |
| free(sink->desc); |
| free(sink); |
| } |
| } |
| |
| INITCALL0(STG_REGISTER, sink_init); |
| REGISTER_POST_DEINIT(sink_deinit); |
| |
| static struct cli_kw_list cli_kws = {{ },{ |
| { { "show", "events", NULL }, "show events [<sink>] [-w] [-n] : show event sink state", cli_parse_show_events, NULL, NULL }, |
| {{},} |
| }}; |
| |
| INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws); |
| |
| /* config parsers for this section */ |
| REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring); |
| REGISTER_POST_CHECK(post_sink_resolve); |
| |
| /* |
| * Local variables: |
| * c-indent-level: 8 |
| * c-basic-offset: 8 |
| * End: |
| */ |