/*
 * Ring buffer management
 *
 * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation, version 2.1
 * exclusively.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include <stdlib.h>
#include <common/buf.h>
#include <common/compat.h>
#include <common/config.h>
#include <common/hathreads.h>
#include <types/applet.h>
#include <proto/cli.h>
#include <proto/ring.h>
#include <proto/stream_interface.h>

/* Creates and returns a ring buffer of size <size> bytes. Returns NULL on
 * allocation failure.
 */
struct ring *ring_new(size_t size)
{
	struct ring *ring = NULL;
	void *area = NULL;

	if (size < 2)
		goto fail;

	ring = malloc(sizeof(*ring));
	if (!ring)
		goto fail;

	area = malloc(size);
	if (!area)
		goto fail;

	HA_RWLOCK_INIT(&ring->lock);
	ring->readers_count = 0;
	ring->ofs = 0;
	ring->buf = b_make(area, size, 0, 0);
	/* write the initial RC byte */
	b_putchr(&ring->buf, 0);
	return ring;
 fail:
	free(area);
	free(ring);
	return NULL;
}

/* Resizes existing ring <ring> to <size>, without losing its contents. The
 * new size must be at least as large as the current one or no change will be
 * performed. The pointer to the ring is returned on success, or NULL on
 * allocation failure. This will lock the ring for writes.
 */
struct ring *ring_resize(struct ring *ring, size_t size)
{
	void *area;

	if (b_size(&ring->buf) >= size)
		return ring;

	area = malloc(size);
	if (!area)
		return NULL;

	HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);

	/* recheck the buffer's size, it may have changed during the malloc */
	if (b_size(&ring->buf) < size) {
		/* copy old contents */
		b_getblk(&ring->buf, area, ring->buf.data, 0);
		area = HA_ATOMIC_XCHG(&ring->buf.area, area);
		ring->buf.size = size;
		ring->buf.head = 0;
	}

	HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);

	free(area);
	return ring;
}

/* destroys and frees ring <ring> */
void ring_free(struct ring *ring)
{
	if (!ring)
		return;
	free(ring->buf.area);
	free(ring);
}
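
/* Illustrative lifecycle sketch (hypothetical caller, arbitrary sizes, not
 * taken from any real call site): a ring is allocated once, may be grown
 * later, and is freed together with its storage area:
 *
 *   struct ring *r = ring_new(16384);
 *
 *   if (r) {
 *           ring_resize(r, 32768);  // no-op if already at least that large
 *           ...
 *           ring_free(r);
 *   }
 */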

/* Tries to send <npfx> parts from <pfx> followed by <nmsg> parts from <msg>
 * to ring <ring>. The message is sent atomically. It may be truncated to
 * <maxlen> bytes if <maxlen> is non-null. There is no distinction between the
 * two lists, it's just a convenience to help the caller prepend some prefixes
 * when necessary. It takes the ring's write lock to make sure no other thread
 * will touch the buffer during the update. Returns the number of bytes sent,
 * or <=0 on failure.
 */
ssize_t ring_write(struct ring *ring, size_t maxlen, const struct ist pfx[], size_t npfx, const struct ist msg[], size_t nmsg)
{
	struct buffer *buf = &ring->buf;
	size_t totlen = 0;
	size_t lenlen;
	uint64_t dellen; /* must be 64-bit, b_peek_varint() takes a uint64_t* */
	int dellenlen;
	ssize_t sent = 0;
	int i;

	/* we have to find some room to add our message (the buffer is
	 * never empty and at least contains the previous counter) and
	 * to update both the buffer contents and heads at the same
	 * time (it's doable using atomic ops but not worth the
	 * trouble, let's just lock). For this we first need to know
	 * the total message's length. We cannot measure it while
	 * copying due to the varint encoding of the length.
	 */
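	/* layout reminder (inferred from the code in this file, not a new
	 * format): ring_new() stores a single reader-counter (RC) byte, and
	 * each write appends <varint len><payload> plus a fresh trailing RC:
	 *
	 *   [RC][len|payload][RC][len|payload]...[RC]
	 *
	 * readers pin a position by atomically incrementing the RC in front
	 * of the next message to dump, and the loop below refuses to delete
	 * the oldest message while its RC is non-null.
	 */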
	for (i = 0; i < npfx; i++)
		totlen += pfx[i].len;
	for (i = 0; i < nmsg; i++)
		totlen += msg[i].len;

	if (totlen > maxlen)
		totlen = maxlen;

	lenlen = varint_bytes(totlen);
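	/* a complete record needs lenlen + totlen + 1 (trailing RC) bytes,
	 * and the buffer always keeps the head's RC byte in place, hence
	 * the "+ 1 + 1" in the feasibility check below */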

	HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
	if (lenlen + totlen + 1 + 1 > b_size(buf))
		goto done_buf;

	while (b_room(buf) < lenlen + totlen + 1) {
		/* we need to delete the oldest message (at the head),
		 * and we have to stop if there's a reader stuck there.
		 * Unless there's corruption in the buffer it's guaranteed
		 * that we have enough data to find 1 counter byte, a
		 * varint-encoded length (1 byte min) and the message
		 * payload (0 bytes min).
		 */
		if (*b_head(buf))
			goto done_buf;
		dellenlen = b_peek_varint(buf, 1, &dellen);
		if (!dellenlen)
			goto done_buf;
		BUG_ON(b_data(buf) < 1 + dellenlen + dellen);

		b_del(buf, 1 + dellenlen + dellen);
		ring->ofs += 1 + dellenlen + dellen;
	}

	/* OK now we do have room */
	__b_put_varint(buf, totlen);

	totlen = 0;
	for (i = 0; i < npfx; i++) {
		size_t len = pfx[i].len;

		if (len + totlen > maxlen)
			len = maxlen - totlen;
		if (len)
			__b_putblk(buf, pfx[i].ptr, len);
		totlen += len;
	}

	for (i = 0; i < nmsg; i++) {
		size_t len = msg[i].len;

		if (len + totlen > maxlen)
			len = maxlen - totlen;
		if (len)
			__b_putblk(buf, msg[i].ptr, len);
		totlen += len;
	}

	*b_tail(buf) = 0; buf->data++; // new read counter
	sent = lenlen + totlen + 1;
 done_buf:
	HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
	return sent;
}
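
/* Usage sketch (illustrative only, the names are made up): emit one event
 * built from a constant prefix and a payload string <ev_text>, truncated to
 * at most 256 bytes:
 *
 *   const struct ist pfx[] = { ist("listener: ") };
 *   const struct ist msg[] = { ist(ev_text) };
 *
 *   if (ring_write(r, 256, pfx, 1, msg, 1) <= 0)
 *           ...; // ring full behind a stuck reader, or message never fits
 */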

/* Tries to attach CLI handler <appctx> as a new reader on ring <ring>. This is
 * meant to be used when registering a CLI function to dump a buffer, so it
 * returns zero on success, or non-zero on failure with a message in the appctx
 * CLI context.
 */
int ring_attach_cli(struct ring *ring, struct appctx *appctx)
{
	int users = ring->readers_count;

	do {
		if (users >= 255)
			return cli_err(appctx,
			               "Sorry, too many watchers (255) on this ring buffer. "
			               "What can it contain that is so interesting to attract so many watchers?");

	} while (!_HA_ATOMIC_CAS(&ring->readers_count, &users, users + 1));

	appctx->ctx.cli.p0 = ring;
	appctx->ctx.cli.p1 = 0; // start from the oldest event
	return 0;
}
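
/* A sketch of how a caller might use this from a CLI "parse" callback
 * (hypothetical names; the actual registration lives with the callers,
 * e.g. through a cli_kw_list):
 *
 *   static int cli_parse_show_myring(char **args, char *payload,
 *                                    struct appctx *appctx, void *private)
 *   {
 *           return ring_attach_cli(my_ring, appctx);
 *   }
 */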

/* This function dumps all events from the ring whose pointer is in <p0> into
 * the appctx's output buffer, and takes from <p1> the seek offset into the
 * buffer's history (0 for oldest known event). It returns 0 if the output
 * buffer is full and it needs to be called again, otherwise non-zero. It is
 * meant to be used with cli_io_release_show_ring() below to clean up.
 */
int cli_io_handler_show_ring(struct appctx *appctx)
{
	struct stream_interface *si = appctx->owner;
	struct ring *ring = appctx->ctx.cli.p0;
	struct buffer *buf = &ring->buf;
	size_t ofs = (unsigned long)appctx->ctx.cli.p1;
	uint64_t msg_len;
	size_t len, cnt;
	int ret;

	if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
		return 1;

	HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);

	/* explanation for the initialization below: it would be better to do
	 * this in the parsing function but this would occasionally result in
	 * dropped events because we'd take a reference on the oldest message
	 * and keep it while being scheduled. Thus instead let's take it the
	 * first time we enter here so that we have a chance to pass many
	 * existing messages before grabbing a reference to a location.
	 */
	if (unlikely(!ofs)) {
		HA_ATOMIC_ADD(b_head(buf), 1);
		ofs += ring->ofs;
	}

	/* we were already there, adjust the offset to be relative to
	 * the buffer's head and remove us from the counter.
	 */
	ofs -= ring->ofs;
	BUG_ON(ofs >= buf->size);
	HA_ATOMIC_SUB(b_peek(buf, ofs), 1);

	/* in this loop, ofs always points to the counter byte that precedes
	 * the message so that we can take our reference there if we have to
	 * stop before the end (ret=0).
	 */
	ret = 1;
	while (ofs + 1 < b_data(buf)) {
		cnt = 1;
		len = b_peek_varint(buf, ofs + cnt, &msg_len);
		if (!len)
			break;
		cnt += len;
		BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));

		if (unlikely(msg_len + 1 > b_size(&trash))) {
			/* too large a message to ever fit, let's skip it */
			ofs += cnt + msg_len;
			continue;
		}

		chunk_reset(&trash);
		len = b_getblk(buf, trash.area, msg_len, ofs + cnt);
		trash.data += len;
		trash.area[trash.data++] = '\n';

		if (ci_putchk(si_ic(si), &trash) == -1) {
			si_rx_room_blk(si);
			ret = 0;
			break;
		}
		ofs += cnt + msg_len;
	}

	HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
	ofs += ring->ofs;
	appctx->ctx.cli.p1 = (void *)ofs;
	HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
	return ret;
}
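
/* Worked example of the <p1> bookkeeping above (made-up numbers): suppose we
 * stop at relative offset 40 while ring->ofs is 100; p1 then stores 140. If
 * the writer later deletes a 10-byte record from the head, ring->ofs grows
 * to 110, and on re-entry 140 - 110 = 30 designates the same counter byte,
 * now 10 bytes closer to the head.
 */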

/* must be called after cli_io_handler_show_ring() above */
void cli_io_release_show_ring(struct appctx *appctx)
{
	struct ring *ring = appctx->ctx.cli.p0;
	size_t ofs = (unsigned long)appctx->ctx.cli.p1;

	if (!ring)
		return;

	HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
	ofs -= ring->ofs;
	BUG_ON(ofs >= b_size(&ring->buf));
	HA_ATOMIC_SUB(b_peek(&ring->buf, ofs), 1);
	HA_ATOMIC_SUB(&ring->readers_count, 1);
	HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
}


/*
 * Local variables:
 * c-indent-level: 8
 * c-basic-offset: 8
 * End:
 */