/*
 * A Random IP reputation service acting as a Stream Processing Offload Agent
 *
 * This is a very simple service that implements a "random" IP reputation
 * service. It will return random scores for all checked IP addresses. It only
 * shows you how to implement an IP reputation service, or a similar kind of
 * service, using the SPOE.
 *
 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
 * Copyright 2018 OZON / Thierry Fournier <thierry.fournier@ozon.io>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 *
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <stdint.h>
#include <errno.h>
#include <unistd.h>
#include <signal.h>
#include <pthread.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/select.h>
#include <sys/wait.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>

#include "spoa.h"

/* Defaults used by main() when -p / -n are not given */
#define DEFAULT_PORT 12345
#define NUM_WORKERS 5

/* Length of a string literal, excluding its terminating NUL */
#define SLEN(str) (sizeof(str)-1)

/* Frame types sent by HAProxy and by agents */
enum spoe_frame_type {
	/* Frames sent by HAProxy */
	SPOE_FRM_T_HAPROXY_HELLO = 1,
	SPOE_FRM_T_HAPROXY_DISCON,
	SPOE_FRM_T_HAPROXY_NOTIFY,

	/* Frames sent by the agents */
	SPOE_FRM_T_AGENT_HELLO = 101,
	SPOE_FRM_T_AGENT_DISCON,
	SPOE_FRM_T_AGENT_ACK
};

/* Errors triggered by the SPOE applet. SPOE_FRM_ERRS is one past the highest
 * code and is used to size tables indexed by error code. */
enum spoe_frame_error {
	SPOE_FRM_ERR_NONE = 0,
	SPOE_FRM_ERR_IO,
	SPOE_FRM_ERR_TOUT,
	SPOE_FRM_ERR_TOO_BIG,
	SPOE_FRM_ERR_INVALID,
	SPOE_FRM_ERR_NO_VSN,
	SPOE_FRM_ERR_NO_FRAME_SIZE,
	SPOE_FRM_ERR_NO_CAP,
	SPOE_FRM_ERR_BAD_VSN,
	SPOE_FRM_ERR_BAD_FRAME_SIZE,
	SPOE_FRM_ERR_UNKNOWN = 99,
	SPOE_FRM_ERRS,
};

/* All supported SPOE actions. SPOE_ACT_TYPES is one past the last action. */
enum spoe_action_type {
	SPOE_ACT_T_SET_VAR = 1,
	SPOE_ACT_T_UNSET_VAR,
	SPOE_ACT_TYPES,
};

/* Masks to extract the data type (low nibble) or the flags (high nibble)
 * from the leading byte of a typed data */
#define SPOE_DATA_T_MASK 0x0F
#define SPOE_DATA_FL_MASK 0xF0

/* Flags carrying Boolean values */
#define SPOE_DATA_FL_FALSE 0x00
#define SPOE_DATA_FL_TRUE 0x10

84static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
85 [SPOE_FRM_ERR_NONE] = "normal",
86 [SPOE_FRM_ERR_IO] = "I/O error",
87 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
88 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
89 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
90 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
91 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
92 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
93 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
94 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
95 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
96};
97
/* Set by the -d command line option; when true, process_create() runs the
 * worker in the current process instead of forking. */
bool debug = false;

/* Per-worker key; spoa_worker() stores a pointer to its numeric id in it. */
pthread_key_t worker_id;

/* Head of the list of processors added through ps_register() */
static struct ps *ps_list = NULL;

/* Head of the list of messages added through ps_register_message() */
static struct ps_message *ps_messages = NULL;

103void ps_register(struct ps *ps)
104{
105 ps->next = ps_list;
106 ps_list = ps;
107}
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100108
Thierry FOURNIER892f6642018-02-23 14:27:05 +0100109void ps_register_message(struct ps *ps, const char *name, void *ref)
110{
111 struct ps_message *msg;
112
113 /* Look for already registered name */
114 for (msg = ps_messages; msg; msg = msg->next) {
115 if (strcmp(name, msg->name) == 0) {
116 LOG("Message \"%s\" already registered\n", name);
117 exit(EXIT_FAILURE);
118 }
119 }
120
121 msg = calloc(1, sizeof(*msg));
122 if (msg == NULL) {
123 LOG("Out of memory error\n");
124 exit(EXIT_FAILURE);
125 }
126
127 msg->next = ps_messages;
128 ps_messages = msg;
129 msg->name = strdup(name);
130 if (msg->name == NULL) {
131 LOG("Out of memory error\n");
132 exit(EXIT_FAILURE);
133 }
134 msg->ref = ref;
135 msg->ps = ps;
136}
137
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100138static void
139check_ipv4_reputation(struct worker *w, struct in_addr *ipv4)
140{
141 char str[INET_ADDRSTRLEN];
142
143 if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL)
144 return;
145
146 w->ip_score = random() % 100;
147
148 DEBUG(" IP score for %.*s is: %d", INET_ADDRSTRLEN, str, w->ip_score);
149}
150
151static void
152check_ipv6_reputation(struct worker *w, struct in6_addr *ipv6)
153{
154 char str[INET6_ADDRSTRLEN];
155
156 if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL)
157 return;
158
159 w->ip_score = random() % 100;
160
161 DEBUG(" IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, w->ip_score);
162}
163
/* Read up to <read_len> bytes from <sock> into <buf>, waiting with select()
 * until data is available. Returns the number of bytes read, which is lower
 * than <read_len> only if the peer closed the connection first, or -1 on
 * error. Interrupted system calls (EINTR) are retried. */
static int
do_read(int sock, void *buf, int read_len)
{
	char *dst = buf; /* byte pointer: arithmetic on void * is not ISO C */
	int total = 0;

	/* FD_SET() is undefined for descriptors outside [0, FD_SETSIZE) */
	if (sock < 0 || sock >= FD_SETSIZE)
		return -1;

	while (total < read_len) {
		fd_set readfds;
		ssize_t n;

		/* select() overwrites the set, so re-arm it on every turn */
		FD_ZERO(&readfds);
		FD_SET(sock, &readfds);

		if (select(sock + 1, &readfds, NULL, NULL, NULL) == -1) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		if (!FD_ISSET(sock, &readfds))
			return -1;

		n = read(sock, dst + total, (size_t)(read_len - total));
		if (n == -1) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		if (n == 0) /* end of stream */
			break;

		total += (int)n;
	}

	return total;
}

/* Write <write_len> bytes from <buf> to <sock>, waiting with select() until
 * the socket is writable. Returns the number of bytes written (lower than
 * <write_len> only if write() reported 0), or -1 on error. Interrupted
 * system calls (EINTR) are retried. */
static int
do_write(int sock, void *buf, int write_len)
{
	const char *src = buf; /* byte pointer: arithmetic on void * is not ISO C */
	int total = 0;

	/* FD_SET() is undefined for descriptors outside [0, FD_SETSIZE) */
	if (sock < 0 || sock >= FD_SETSIZE)
		return -1;

	while (total < write_len) {
		fd_set writefds;
		ssize_t n;

		/* select() overwrites the set, so re-arm it on every turn */
		FD_ZERO(&writefds);
		FD_SET(sock, &writefds);

		if (select(sock + 1, NULL, &writefds, NULL, NULL) == -1) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		if (!FD_ISSET(sock, &writefds))
			return -1;

		n = write(sock, src + total, (size_t)(write_len - total));
		if (n == -1) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		if (n == 0)
			break;

		total += (int)n;
	}

	return total;
}

216/* Receive a frame sent by HAProxy. It returns -1 if an error occurred,
217 * otherwise the number of read bytes.*/
218static int
219read_frame(int sock, struct worker *w)
220{
221 uint32_t netint;
222 unsigned int framesz;
223
224 /* Read the frame size, on 4 bytes */
225 if (do_read(sock, &netint, sizeof(netint)) != 4) {
226 w->status_code = SPOE_FRM_ERR_IO;
227 return -1;
228 }
229
230 /* Check it against the max size */
231 framesz = ntohl(netint);
232 if (framesz > w->size) {
233 w->status_code = SPOE_FRM_ERR_TOO_BIG;
234 return -1;
235 }
236
237 /* Read the frame */
238 if (do_read(sock, w->buf, framesz) != framesz) {
239 w->status_code = SPOE_FRM_ERR_IO;
240 return -1;
241 }
242
243 w->len = framesz;
244 return framesz;
245}
246
247/* Send a frame to HAProxy. It returns -1 if an error occurred, otherwise the
248 * number of written bytes. */
249static int
250write_frame(int sock, struct worker *w)
251{
252 uint32_t netint;
253
254 /* Write the frame size, on 4 bytes */
255 netint = htonl(w->len);
256 if (do_write(sock, &netint, sizeof(netint)) != 4) {
257 w->status_code = SPOE_FRM_ERR_IO;
258 return -1;
259 }
260
261 /* Write the frame */
262 if (do_write(sock, w->buf, w->len) != w->len) {
263 w->status_code = SPOE_FRM_ERR_IO;
264 return -1;
265 }
266 return w->len;
267}
268
/* Encode the variable-length integer <i> into <buf>. Values below 240 take a
 * single byte. Larger values start with a byte whose high nibble is 0xF and
 * continue with 7-bit groups, each flagged with bit 7 except the last. This
 * function never fails and returns the number of written bytes; the caller
 * must provide enough room in <buf>. */
static int
encode_spoe_varint(uint64_t i, char *buf)
{
	int idx;

	if (i < 240) {
		buf[0] = (unsigned char)i;
		return 1;
	}

	buf[0] = (unsigned char)i | 240;
	i = (i - 240) >> 4;
	for (idx = 1; i >= 128; ++idx) {
		buf[idx] = (unsigned char)i | 128;
		i = (i - 128) >> 7;
	}
	buf[idx++] = (unsigned char)i;
	return idx;
}

/* Decode a variable-length integer located at <buf>. <end> points one past
 * the last readable byte. On success the value is stored in <*i> and the
 * number of read bytes is returned. -1 is returned, and <*i> is left
 * untouched, when the end of the buffer is reached before the integer is
 * complete or when the encoding is too long to fit in 64 bits. */
static int
decode_spoe_varint(char *buf, char *end, uint64_t *i)
{
	unsigned char *msg = (unsigned char *)buf;
	unsigned char *stop = (unsigned char *)end;
	uint64_t val;
	int idx = 0;

	/* <end> is exclusive: nothing can be read when msg == end */
	if (msg >= stop)
		return -1;

	val = msg[0];
	if (val < 240) {
		*i = val;
		return 1;
	}

	do {
		unsigned int shift;

		++idx;
		shift = 4 + 7 * (idx - 1);
		/* Shifting a 64-bit value by 64 or more is undefined, so an
		 * overlong encoding is rejected instead of being decoded. */
		if (msg + idx >= stop || shift >= 64)
			return -1;
		val += (uint64_t)msg[idx] << shift;
	} while (msg[idx] >= 128);

	*i = val;
	return (idx + 1);
}

/* Encode the <len> bytes of <str> into <dst>. The string is prefixed by its
 * length, encoded as a variable-length integer; an empty string is encoded
 * as a single zero byte. This function never fails and returns the number of
 * written bytes; the caller must provide enough room in <dst>. */
static int
encode_spoe_string(const char *str, size_t len, char *dst)
{
	int idx = 0;

	if (!len) {
		dst[0] = 0;
		return 1;
	}

	idx += encode_spoe_varint(len, dst);
	memcpy(dst+idx, str, len);
	return (idx + len);
}

/* Decode a string located at <buf>; <end> points one past the last readable
 * byte. Its length is decoded first as a variable-length integer. If it
 * succeeds, and if the string fits before <end>, the beginning of the string
 * is saved in <*str>, its length is saved in <*len> and the total number of
 * bytes read is returned. If an error occurred, -1 is returned and <*str>
 * remains NULL. The string is not NUL-terminated. */
static int
decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
{
	int r;

	*str = NULL;
	*len = 0;

	r = decode_spoe_varint(buf, end, len);
	if (r == -1)
		return -1;

	/* Compare the length with the remaining room rather than building
	 * buf + r + *len: a hostile length could make that pointer wrap. */
	if (buf + r > end || *len > (uint64_t)(end - (buf + r)))
		return -1;

	*str = buf + r;
	return (r + (int)*len);
}

361/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
362 * of bytes read is returned. A types data is composed of a type (1 byte) and
363 * corresponding data:
364 * - boolean: non additional data (0 bytes)
365 * - integers: a variable-length integer (see decode_spoe_varint)
366 * - ipv4: 4 bytes
367 * - ipv6: 16 bytes
368 * - binary and string: a buffer prefixed by its size, a variable-length
369 * integer (see decode_spoe_string) */
370static int
371skip_spoe_data(char *frame, char *end)
372{
373 uint64_t sz = 0;
374 int r, idx = 0;
375
376 if (frame > end)
377 return -1;
378
379 switch (frame[idx++] & SPOE_DATA_T_MASK) {
380 case SPOE_DATA_T_BOOL:
381 idx++;
382 break;
383 case SPOE_DATA_T_INT32:
384 case SPOE_DATA_T_INT64:
385 case SPOE_DATA_T_UINT32:
386 case SPOE_DATA_T_UINT64:
387 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
388 return -1;
389 idx += r;
390 break;
391 case SPOE_DATA_T_IPV4:
392 idx += 4;
393 break;
394 case SPOE_DATA_T_IPV6:
395 idx += 16;
396 break;
397 case SPOE_DATA_T_STR:
398 case SPOE_DATA_T_BIN:
399 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
400 return -1;
401 idx += r + sz;
402 break;
403 }
404
405 if (frame+idx > end)
406 return -1;
407 return idx;
408}
409
410/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
411 * number of read bytes is returned. See skip_spoe_data for details. */
412static int
413decode_spoe_data(char *frame, char *end, struct spoe_data *data)
414{
415 uint64_t sz = 0;
416 int type, r, idx = 0;
417
418 if (frame > end)
419 return -1;
420
421 type = frame[idx++];
422 data->type = (type & SPOE_DATA_T_MASK);
423 switch (data->type) {
424 case SPOE_DATA_T_BOOL:
425 data->u.boolean = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
426 break;
427 case SPOE_DATA_T_INT32:
428 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
429 return -1;
430 data->u.sint32 = sz;
431 idx += r;
432 break;
433 case SPOE_DATA_T_INT64:
434 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
435 return -1;
436 data->u.uint32 = sz;
437 idx += r;
438 break;
439 case SPOE_DATA_T_UINT32:
440 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
441 return -1;
442 data->u.sint64 = sz;
443 idx += r;
444 break;
445 case SPOE_DATA_T_UINT64:
446 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
447 return -1;
448 data->u.uint64 = sz;
449 idx += r;
450 break;
451 case SPOE_DATA_T_IPV4:
452 if (frame+idx+4 > end)
453 return -1;
454 memcpy(&data->u.ipv4, frame+idx, 4);
455 idx += 4;
456 break;
457 case SPOE_DATA_T_IPV6:
458 if (frame+idx+16 > end)
459 return -1;
460 memcpy(&data->u.ipv6, frame+idx, 16);
461 idx += 16;
462 break;
463 case SPOE_DATA_T_STR:
464 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
465 return -1;
466 idx += r;
467 if (frame+idx+sz > end)
468 return -1;
469 data->u.buffer.str = frame+idx;
470 data->u.buffer.len = sz;
471 idx += sz;
472 break;
473 case SPOE_DATA_T_BIN:
474 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
475 return -1;
476 idx += r;
477 if (frame+idx+sz > end)
478 return -1;
479 data->u.buffer.str = frame+idx;
480 data->u.buffer.len = sz;
481 idx += sz;
482 break;
483 default:
484 break;
485 }
486
487 if (frame+idx > end)
488 return -1;
489 return idx;
490}
491
492
493/* Check the protocol version. It returns -1 if an error occurred, the number of
494 * read bytes otherwise. */
495static int
496check_proto_version(struct worker *w, int idx)
497{
498 char *str;
499 uint64_t sz;
500
501 /* Get the list of all supported versions by HAProxy */
502 if ((w->buf[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
503 w->status_code = SPOE_FRM_ERR_INVALID;
504 return -1;
505 }
506 idx += decode_spoe_string(w->buf+idx, w->buf+w->len, &str, &sz);
507 if (str == NULL) {
508 w->status_code = SPOE_FRM_ERR_INVALID;
509 return -1;
510 }
511
512 /* TODO: Find the right verion in supported ones */
513
514 return idx;
515}
516
517/* Check max frame size value. It returns -1 if an error occurred, the number of
518 * read bytes otherwise. */
519static int
520check_max_frame_size(struct worker *w, int idx)
521{
522 uint64_t sz;
523 int type, i;
524
525 /* Get the max-frame-size value of HAProxy */
526 type = w->buf[idx++];
527 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
528 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
529 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
530 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
531 w->status_code = SPOE_FRM_ERR_INVALID;
532 return -1;
533 }
534 if ((i = decode_spoe_varint(w->buf+idx, w->buf+w->len, &sz)) == -1) {
535 w->status_code = SPOE_FRM_ERR_INVALID;
536 return -1;
537 }
538 idx += i;
539
540 /* Keep the lower value */
541 if (sz < w->size)
542 w->size = sz;
543
544 return idx;
545}
546
547/* Check healthcheck value. It returns -1 if an error occurred, the number of
548 * read bytes otherwise. */
549static int
550check_healthcheck(struct worker *w, int idx)
551{
552 int type;
553
554 /* Get the "healthcheck" value of HAProxy */
555 type = w->buf[idx++];
556 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_BOOL) {
557 w->status_code = SPOE_FRM_ERR_INVALID;
558 return -1;
559 }
560 w->healthcheck = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
561 return idx;
562}
563
564
565/* Decode a HELLO frame received from HAProxy. It returns -1 if an error
566 * occurred, 0 if the frame must be skipped, otherwise the number of read
567 * bytes. */
568static int
569handle_hahello(struct worker *w)
570{
571 char *end = w->buf+w->len;
572 int i, idx = 0;
573
574 /* Check frame type */
575 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_HELLO)
576 goto skip;
577
578 /* Skip flags */
579 idx += 4;
580
581 /* stream-id and frame-id must be cleared */
582 if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
583 w->status_code = SPOE_FRM_ERR_INVALID;
584 goto error;
585 }
586 idx += 2;
587
588 /* Loop on K/V items */
589 while (idx < w->len) {
590 char *str;
591 uint64_t sz;
592
593 /* Decode the item name */
594 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
595 if (str == NULL) {
596 w->status_code = SPOE_FRM_ERR_INVALID;
597 goto error;
598 }
599
600 /* Check "supported-versions" K/V item */
601 if (!memcmp(str, "supported-versions", sz)) {
602 if ((i = check_proto_version(w, idx)) == -1)
603 goto error;
604 idx = i;
605 }
606 /* Check "max-frame-size" K/V item "*/
607 else if (!memcmp(str, "max-frame-size", sz)) {
608 if ((i = check_max_frame_size(w, idx)) == -1)
609 goto error;
610 idx = i;
611 }
612 /* Check "healthcheck" K/V item "*/
613 else if (!memcmp(str, "healthcheck", sz)) {
614 if ((i = check_healthcheck(w, idx)) == -1)
615 goto error;
616 idx = i;
617 }
618 /* Skip "capabilities" K/V item for now */
619 else {
620 /* Silently ignore unknown item */
621 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
622 w->status_code = SPOE_FRM_ERR_INVALID;
623 goto error;
624 }
625 idx += i;
626 }
627 }
628
629 return idx;
630skip:
631 return 0;
632error:
633 return -1;
634}
635
636/* Decode a DISCONNECT frame received from HAProxy. It returns -1 if an error
637 * occurred, 0 if the frame must be skipped, otherwise the number of read
638 * bytes. */
639static int
640handle_hadiscon(struct worker *w)
641{
642 char *end = w->buf+w->len;
643 int i, idx = 0;
644
645 /* Check frame type */
646 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_DISCON)
647 goto skip;
648
649 /* Skip flags */
650 idx += 4;
651
652 /* stream-id and frame-id must be cleared */
653 if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
654 w->status_code = SPOE_FRM_ERR_INVALID;
655 goto error;
656 }
657 idx += 2;
658
659 /* Loop on K/V items */
660 while (idx < w->len) {
661 char *str;
662 uint64_t sz;
663
664 /* Decode item key */
665 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
666 if (str == NULL) {
667 w->status_code = SPOE_FRM_ERR_INVALID;
668 goto error;
669 }
670 /* Silently ignore unknown item */
671 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
672 w->status_code = SPOE_FRM_ERR_INVALID;
673 goto error;
674 }
675 idx += i;
676 }
677
678 w->status_code = SPOE_FRM_ERR_NONE;
679 return idx;
680skip:
681 return 0;
682error:
683 return -1;
684}
685
686/* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
687 * occurred, 0 if the frame must be skipped, otherwise the number of read
688 * bytes. */
689static int
690handle_hanotify(struct worker *w)
691{
692 char *end = w->buf+w->len;
693 uint64_t stream_id, frame_id;
694 int nbargs, i, idx = 0;
695
696 /* Check frame type */
697 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_NOTIFY)
698 goto skip;
699
700 /* Skip flags */
701 idx += 4;
702
703 /* Read the stream-id */
704 if ((i = decode_spoe_varint(w->buf+idx, end, &stream_id)) == -1) {
705 w->status_code = SPOE_FRM_ERR_INVALID;
706 goto error;
707 }
708 idx += i;
709
710 /* Read the frame-id */
711 if ((i = decode_spoe_varint(w->buf+idx, end, &frame_id)) == -1) {
712 w->status_code = SPOE_FRM_ERR_INVALID;
713 goto error;
714 }
715 idx += i;
716
717 w->stream_id = (unsigned int)stream_id;
718 w->frame_id = (unsigned int)frame_id;
719
720 DEBUG("Notify frame received: stream-id=%u - frame-id=%u",
721 w->stream_id, w->frame_id);
722
723 /* Loop on messages */
724 while (idx < w->len) {
725 char *str;
726 uint64_t sz;
727
728 /* Decode the message name */
729 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
730 if (str == NULL) {
731 w->status_code = SPOE_FRM_ERR_INVALID;
732 goto error;
733 }
734 DEBUG(" Message '%.*s' received", (int)sz, str);
735
736 nbargs = w->buf[idx++];
737 if (!memcmp(str, "check-client-ip", sz)) {
738 struct spoe_data data;
739
740 memset(&data, 0, sizeof(data));
741
742 if (nbargs != 1) {
743 w->status_code = SPOE_FRM_ERR_INVALID;
744 goto error;
745 }
746 if ((i = decode_spoe_string(w->buf+idx, end, &str, &sz)) == -1) {
747 w->status_code = SPOE_FRM_ERR_INVALID;
748 goto error;
749 }
750 idx += i;
751 if ((i = decode_spoe_data(w->buf+idx, end, &data)) == -1) {
752 w->status_code = SPOE_FRM_ERR_INVALID;
753 goto error;
754 }
755 idx += i;
756 if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV4)
757 check_ipv4_reputation(w, &data.u.ipv4);
758 else if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV6)
759 check_ipv6_reputation(w, &data.u.ipv6);
760 else {
761 w->status_code = SPOE_FRM_ERR_INVALID;
762 goto error;
763 }
764 }
765 else {
766 while (nbargs-- > 0) {
767 /* Silently ignore argument: its name and its value */
768 if ((i = decode_spoe_string(w->buf+idx, end, &str, &sz)) == -1) {
769 w->status_code = SPOE_FRM_ERR_INVALID;
770 goto error;
771 }
772 idx += i;
773 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
774 w->status_code = SPOE_FRM_ERR_INVALID;
775 goto error;
776 }
777 idx += i;
778 }
779 }
780 }
781
782 return idx;
783skip:
784 return 0;
785error:
786 return -1;
787}
788
789/* Encode a HELLO frame to send it to HAProxy. It returns -1 if an error
790 * occurred, the number of written bytes otherwise. */
791static int
792prepare_agenthello(struct worker *w)
793{
794 int idx = 0;
795
796 /* Frame Type */
797 w->buf[idx++] = SPOE_FRM_T_AGENT_HELLO;
798
799 /* No flags for now */
800 memset(w->buf+idx, 0, 4); /* No flags */
801 idx += 4;
802
803 /* No stream-id and frame-id for HELLO frames */
804 w->buf[idx++] = 0;
805 w->buf[idx++] = 0;
806
807 /* "version" K/V item */
808 idx += encode_spoe_string("version", 7, w->buf+idx);
809 w->buf[idx++] = SPOE_DATA_T_STR;
810 idx += encode_spoe_string(SPOP_VERSION, SLEN(SPOP_VERSION), w->buf+idx);
811
812 /* "max-frame-size" K/V item */
813 idx += encode_spoe_string("max-frame-size", 14, w->buf+idx);
814 w->buf[idx++] = SPOE_DATA_T_UINT32;
815 idx += encode_spoe_varint(w->size, w->buf+idx);
816
817 /* "capabilities" K/V item */
818 idx += encode_spoe_string("capabilities", 12, w->buf+idx);
819 w->buf[idx++] = SPOE_DATA_T_STR;
820 idx += encode_spoe_string(SPOA_CAPABILITIES, SLEN(SPOA_CAPABILITIES), w->buf+idx);
821
822 w->len = idx;
823 return idx;
824}
825
826/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred,
827 * the number of written bytes otherwise. */
828static int
829prepare_agentack(struct worker *w)
830{
831 int idx = 0;
832
833 /* Frame type */
834 w->buf[idx++] = SPOE_FRM_T_AGENT_ACK;
835
836 /* No flags for now */
837 memset(w->buf+idx, 0, 4); /* No flags */
838 idx += 4;
839
840 /* Set stream-id and frame-id for ACK frames */
841 idx += encode_spoe_varint(w->stream_id, w->buf+idx);
842 idx += encode_spoe_varint(w->frame_id, w->buf+idx);
843
844 /* Data */
845 if (w->ip_score == -1)
846 goto out;
847
848 w->buf[idx++] = SPOE_ACT_T_SET_VAR; /* Action type */
849 w->buf[idx++] = 3; /* Number of args */
850 w->buf[idx++] = SPOE_SCOPE_SESS; /* Arg 1: the scope */
851 idx += encode_spoe_string("ip_score", 8, w->buf+idx); /* Arg 2: variable name */
852 w->buf[idx++] = SPOE_DATA_T_UINT32;
853 idx += encode_spoe_varint(w->ip_score, w->buf+idx); /* Arg 3: variable value */
854out:
855 w->len = idx;
856 return idx;
857}
858
859/* Encode a DISCONNECT frame to send it to HAProxy. It returns -1 if an error
860 * occurred, the number of written bytes otherwise. */
861static int
862prepare_agentdicon(struct worker *w)
863{
864 const char *reason;
865 int rlen, idx = 0;
866
867 if (w->status_code >= SPOE_FRM_ERRS)
868 w->status_code = SPOE_FRM_ERR_UNKNOWN;
869 reason = spoe_frm_err_reasons[w->status_code];
870 rlen = strlen(reason);
871
872 /* Frame type */
873 w->buf[idx++] = SPOE_FRM_T_AGENT_DISCON;
874
875 /* No flags for now */
876 memset(w->buf+idx, 0, 4);
877 idx += 4;
878
879 /* No stream-id and frame-id for DISCONNECT frames */
880 w->buf[idx++] = 0;
881 w->buf[idx++] = 0;
882
883 /* There are 2 mandatory items: "status-code" and "message" */
884
885 /* "status-code" K/V item */
886 idx += encode_spoe_string("status-code", 11, w->buf+idx);
887 w->buf[idx++] = SPOE_DATA_T_UINT32;
888 idx += encode_spoe_varint(w->status_code, w->buf+idx);
889
890 /* "message" K/V item */
891 idx += encode_spoe_string("message", 7, w->buf+idx);
892 w->buf[idx++] = SPOE_DATA_T_STR;
893 idx += encode_spoe_string(reason, rlen, w->buf+idx);
894
895 w->len = idx;
896 return idx;
897}
898
899static int
900hello_handshake(int sock, struct worker *w)
901{
902 if (read_frame(sock, w) < 0) {
903 LOG("Failed to read Haproxy HELLO frame");
904 goto error;
905 }
906 if (handle_hahello(w) < 0) {
907 LOG("Failed to handle Haproxy HELLO frame");
908 goto error;
909 }
910 if (prepare_agenthello(w) < 0) {
911 LOG("Failed to prepare Agent HELLO frame");
912 goto error;
913 }
914 if (write_frame(sock, w) < 0) {
915 LOG("Failed to write Agent frame");
916 goto error;
917 }
918 DEBUG("Hello handshake done: version=%s - max-frame-size=%u - healthcheck=%s",
919 SPOP_VERSION, w->size, (w->healthcheck ? "true" : "false"));
920 return 0;
921error:
922 return -1;
923}
924
925static int
926notify_ack_roundtip(int sock, struct worker *w)
927{
928 if (read_frame(sock, w) < 0) {
929 LOG("Failed to read Haproxy NOTIFY frame");
930 goto error_or_quit;
931 }
932 if (handle_hadiscon(w) != 0) {
933 if (w->status_code != SPOE_FRM_ERR_NONE)
934 LOG("Failed to handle Haproxy DISCONNECT frame");
935 DEBUG("Disconnect frame received: reason=%s",
936 spoe_frm_err_reasons[w->status_code]);
937 goto error_or_quit;
938 }
939 if (handle_hanotify(w) < 0) {
940 LOG("Failed to handle Haproxy NOTIFY frame");
941 goto error_or_quit;
942 }
943 if (prepare_agentack(w) < 0) {
944 LOG("Failed to prepare Agent ACK frame");
945 goto error_or_quit;
946 }
947 if (write_frame(sock, w) < 0) {
948 LOG("Failed to write Agent ACK frame");
949 goto error_or_quit;
950 }
951 DEBUG("Ack frame sent: stream-id=%u - frame-id=%u",
952 w->stream_id, w->frame_id);
953 return 0;
954error_or_quit:
955 return -1;
956}
957
958static void *
Thierry FOURNIER5301ed12018-02-23 11:59:15 +0100959spoa_worker(void *data)
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100960{
961 struct worker w;
962 struct sockaddr_in client;
963 int *info = (int *)data;
964 int csock, lsock = info[0];
Thierry FOURNIER64eaa332018-02-23 14:58:40 +0100965 struct ps *ps;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100966
967 signal(SIGPIPE, SIG_IGN);
968 pthread_setspecific(worker_id, &info[1]);
969
Thierry FOURNIER64eaa332018-02-23 14:58:40 +0100970 /* Init registered processors */
971 for (ps = ps_list; ps != NULL; ps = ps->next)
972 ps->init_worker(&w);
973
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100974 while (1) {
975 socklen_t sz = sizeof(client);
976
977 if ((csock = accept(lsock, (struct sockaddr *)&client, &sz)) < 0) {
978 LOG("Failed to accept client connection: %m");
979 goto out;
980 }
981 memset(&w, 0, sizeof(w));
982 w.id = info[1];
983 w.size = MAX_FRAME_SIZE;
984
985 DEBUG("New connection from HAProxy accepted");
986
987 if (hello_handshake(csock, &w) < 0)
988 goto disconnect;
989 if (w.healthcheck == true)
990 goto close;
991 while (1) {
992 w.ip_score = -1;
993 if (notify_ack_roundtip(csock, &w) < 0)
994 break;
995 }
996
997 disconnect:
998 if (w.status_code == SPOE_FRM_ERR_IO) {
999 LOG("Close the client socket because of I/O errors");
1000 goto close;
1001 }
1002 if (prepare_agentdicon(&w) < 0) {
1003 LOG("Failed to prepare Agent DISCONNECT frame");
1004 goto close;
1005 }
1006 if (write_frame(csock, &w) < 0) {
1007 LOG("Failed to write Agent DISCONNECT frame");
1008 goto close;
1009 }
1010 DEBUG("Disconnect frame sent: reason=%s",
1011 spoe_frm_err_reasons[w.status_code]);
1012
1013 close:
1014 close(csock);
1015 }
1016
1017out:
1018 free(info);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001019#if 0
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001020 pthread_exit(NULL);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001021#endif
1022 return NULL;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001023}
1024
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001025int process_create(pid_t *pid, void *(*ps)(void *), void *data)
1026{
Thierry FOURNIER786e9e62018-02-23 19:11:47 +01001027 if (debug) {
1028 ps(data);
1029 exit(EXIT_SUCCESS);
1030 }
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001031 *pid = fork();
1032 if (*pid == -1)
1033 return -1;
1034 if (*pid > 0)
1035 return 0;
1036 ps(data);
1037 return 0;
1038}
1039
/* Print the command line help for the program <prog> on stderr */
static void
usage(char *prog)
{
	fprintf(stderr, "Usage: %s [-h] [-d] [-p <port>] [-n <num-workers>]\n", prog);
	fprintf(stderr, " -h Print this message\n");
	fprintf(stderr, " -d Enable the debug mode\n");
	fprintf(stderr, " -p <port> Specify the port to listen on (default: 12345)\n");
	fprintf(stderr, " -n <num-workers> Specify the number of workers (default: 5)\n");
}

1050int
1051main(int argc, char **argv)
1052{
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001053#if 0
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001054 pthread_t *ts = NULL;
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001055#endif
1056 pid_t *pids;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001057 struct sockaddr_in server;
1058 int i, sock, opt, nbworkers, port;
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001059 int status;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001060
1061 nbworkers = NUM_WORKERS;
1062 port = DEFAULT_PORT;
1063 while ((opt = getopt(argc, argv, "hdn:p:")) != -1) {
1064 switch (opt) {
1065 case 'h':
1066 usage(argv[0]);
1067 return EXIT_SUCCESS;
1068 case 'd':
1069 debug = true;
1070 break;
1071 case 'n':
1072 nbworkers = atoi(optarg);
1073 break;
1074 case 'p':
1075 port = atoi(optarg);
1076 break;
1077 default:
1078 usage(argv[0]);
1079 return EXIT_FAILURE;
1080 }
1081 }
1082
1083 if (nbworkers <= 0) {
1084 fprintf(stderr, "%s: Invalid number of workers '%d'\n",
1085 argv[0], nbworkers);
1086 goto error;
1087 }
1088 if (port <= 0) {
1089 fprintf(stderr, "%s: Invalid port '%d'\n", argv[0], port);
1090 goto error;
1091 }
1092
1093 if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1094 fprintf(stderr, "Failed creating socket: %m\n");
1095 goto error;
1096 }
1097
1098 setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (int []){1}, sizeof(int));
1099 setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (int []){1}, sizeof(int));
1100
1101 memset(&server, 0, sizeof(server));
1102 server.sin_family = AF_INET;
1103 server.sin_addr.s_addr = INADDR_ANY;
1104 server.sin_port = htons(port);
1105
1106 if (bind(sock, (struct sockaddr *)&server, sizeof(server)) < 0) {
1107 fprintf(stderr, "Failed to bind the socket: %m\n");
1108 goto error;
1109 }
1110
1111 if (listen(sock , 10) < 0) {
1112 fprintf(stderr, "Failed to listen on the socket: %m\n");
1113 goto error;
1114 }
1115 fprintf(stderr, "SPOA is listening on port %d\n", port);
1116
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001117 pthread_key_create(&worker_id, NULL);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001118
1119 /* Initialise the server in thread mode. This code is commented
1120 * out and not deleted, because later I expect to work with
1121 * process ansd threads. This first version just support processes.
1122 */
1123#if 0
1124 ts = calloc(nbworkers, sizeof(*ts));
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001125 for (i = 0; i < nbworkers; i++) {
1126 int *info = calloc(2, sizeof(*info));
1127
1128 info[0] = sock;
1129 info[1] = i+1;
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001130
Thierry FOURNIER5301ed12018-02-23 11:59:15 +01001131 if (pthread_create(&ts[i], NULL, spoa_worker, info) < 0) {
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001132 fprintf(stderr, "Failed to create thread %d: %m\n", i+1);
1133 goto error;
1134 }
1135 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1136 }
1137
1138 for (i = 0; i < nbworkers; i++) {
1139 pthread_join(ts[i], NULL);
1140 fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
1141 }
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001142 free(ts);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001143#endif
1144
1145 /* Start processes */
1146 pids = calloc(nbworkers, sizeof(*pids));
1147 if (!pids) {
1148 fprintf(stderr, "Out of memory error\n");
1149 goto error;
1150 }
1151 for (i = 0; i < nbworkers; i++) {
1152 int *info = calloc(2, sizeof(*info));
1153
1154 info[0] = sock;
1155 info[1] = i+1;
1156
1157 if (process_create(&pids[i], spoa_worker, info) == -1) {
1158 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1159 goto error;
1160 }
1161 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1162 }
1163 for (i = 0; i < nbworkers; i++) {
1164 waitpid(pids[0], &status, 0);
1165 fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
1166 }
1167
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001168 close(sock);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001169 pthread_key_delete(worker_id);
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001170 return EXIT_SUCCESS;
1171error:
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001172 return EXIT_FAILURE;
1173}