blob: e484315f658e826b4269dd732423177d749d2377 [file] [log] [blame]
/*
 * A Random IP reputation service acting as a Stream Processing Offload Agent
 *
 * This is a very simple service that implements a "random" IP reputation
 * service. It will return random scores for all checked IP addresses. It only
 * shows you how to implement an IP reputation service or such kind of services
 * using the SPOE.
 *
 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
 * Copyright 2018 OZON / Thierry Fournier <thierry.fournier@ozon.io>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 *
 */
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <stdbool.h>
22#include <unistd.h>
23#include <signal.h>
24#include <pthread.h>
25#include <sys/time.h>
26#include <sys/types.h>
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +010027#include <sys/wait.h>
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010028#include <sys/socket.h>
29#include <netinet/in.h>
30#include <netinet/tcp.h>
31#include <arpa/inet.h>
32
Thierry FOURNIER4aec0a42018-02-23 11:42:57 +010033#include "spoa.h"
34
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010035#define DEFAULT_PORT 12345
36#define NUM_WORKERS 5
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010037
38#define SLEN(str) (sizeof(str)-1)
39
/* Frame type identifiers, grouped by sender: 1..3 are sent by HAProxy,
 * 101..103 are sent by agents. */
enum spoe_frame_type {
	/* HAProxy -> agent */
	SPOE_FRM_T_HAPROXY_HELLO  = 1,
	SPOE_FRM_T_HAPROXY_DISCON = 2,
	SPOE_FRM_T_HAPROXY_NOTIFY = 3,

	/* agent -> HAProxy */
	SPOE_FRM_T_AGENT_HELLO    = 101,
	SPOE_FRM_T_AGENT_DISCON   = 102,
	SPOE_FRM_T_AGENT_ACK      = 103
};
52
/* Errors triggered by the SPOE applet. These are the values stored in a
 * worker's status code and used to index spoe_frm_err_reasons[]. */
enum spoe_frame_error {
	SPOE_FRM_ERR_NONE           = 0,
	SPOE_FRM_ERR_IO             = 1,
	SPOE_FRM_ERR_TOUT           = 2,
	SPOE_FRM_ERR_TOO_BIG        = 3,
	SPOE_FRM_ERR_INVALID        = 4,
	SPOE_FRM_ERR_NO_VSN         = 5,
	SPOE_FRM_ERR_NO_FRAME_SIZE  = 6,
	SPOE_FRM_ERR_NO_CAP         = 7,
	SPOE_FRM_ERR_BAD_VSN        = 8,
	SPOE_FRM_ERR_BAD_FRAME_SIZE = 9,
	SPOE_FRM_ERR_UNKNOWN        = 99,
	SPOE_FRM_ERRS               = 100, /* one past the highest code */
};
68
/* All supported SPOE actions. SPOE_ACT_TYPES is the value following the last
 * real action. */
enum spoe_action_type {
	SPOE_ACT_T_SET_VAR   = 1,
	SPOE_ACT_T_UNSET_VAR = 2,
	SPOE_ACT_TYPES       = 3,
};
75
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010076
/* The first byte of a typed data holds the data type in its low nibble and
 * flags in its high nibble; these masks extract each half. */
#define SPOE_DATA_T_MASK   0x0f
#define SPOE_DATA_FL_MASK  0xf0

/* Flag values carrying a boolean */
#define SPOE_DATA_FL_FALSE 0x00
#define SPOE_DATA_FL_TRUE  0x10
84static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
85 [SPOE_FRM_ERR_NONE] = "normal",
86 [SPOE_FRM_ERR_IO] = "I/O error",
87 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
88 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
89 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
90 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
91 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
92 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
93 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
94 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
95 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
96};
97
/* Debug mode flag; main() turns it on for the -d option. */
bool debug;

/* Thread-specific key through which each worker publishes its id. */
pthread_key_t worker_id;

/* Heads of the registered processor and message lists (newest first). */
static struct ps *ps_list;
static struct ps_message *ps_messages;

/* Files given with -f: <nfiles> heap-allocated names stored in <files>. */
static int nfiles;
static char **files;
104
105static inline void add_file(const char *file)
106{
107 nfiles++;
108 files = realloc(files, sizeof(*files) * nfiles);
109 if (files == NULL) {
110 fprintf(stderr, "Out of memory error\n");
111 exit(EXIT_FAILURE);
112 }
113 files[nfiles - 1] = strdup(file);
114 if (files[nfiles - 1] == NULL) {
115 fprintf(stderr, "Out of memory error\n");
116 exit(EXIT_FAILURE);
117 }
118}
Thierry FOURNIER64eaa332018-02-23 14:58:40 +0100119
120void ps_register(struct ps *ps)
121{
122 ps->next = ps_list;
123 ps_list = ps;
124}
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100125
Thierry FOURNIER892f6642018-02-23 14:27:05 +0100126void ps_register_message(struct ps *ps, const char *name, void *ref)
127{
128 struct ps_message *msg;
129
130 /* Look for already registered name */
131 for (msg = ps_messages; msg; msg = msg->next) {
132 if (strcmp(name, msg->name) == 0) {
133 LOG("Message \"%s\" already registered\n", name);
134 exit(EXIT_FAILURE);
135 }
136 }
137
138 msg = calloc(1, sizeof(*msg));
139 if (msg == NULL) {
140 LOG("Out of memory error\n");
141 exit(EXIT_FAILURE);
142 }
143
144 msg->next = ps_messages;
145 ps_messages = msg;
146 msg->name = strdup(name);
147 if (msg->name == NULL) {
148 LOG("Out of memory error\n");
149 exit(EXIT_FAILURE);
150 }
151 msg->ref = ref;
152 msg->ps = ps;
153}
154
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100155static void
156check_ipv4_reputation(struct worker *w, struct in_addr *ipv4)
157{
158 char str[INET_ADDRSTRLEN];
159
160 if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL)
161 return;
162
163 w->ip_score = random() % 100;
164
165 DEBUG(" IP score for %.*s is: %d", INET_ADDRSTRLEN, str, w->ip_score);
166}
167
168static void
169check_ipv6_reputation(struct worker *w, struct in6_addr *ipv6)
170{
171 char str[INET6_ADDRSTRLEN];
172
173 if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL)
174 return;
175
176 w->ip_score = random() % 100;
177
178 DEBUG(" IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, w->ip_score);
179}
180
/* Read up to <read_len> bytes from <sock> into <buf>, waiting with select()
 * until data is available. Reading stops early on end-of-file. Returns the
 * number of bytes actually read (possibly fewer than <read_len>), or -1 if
 * select() or read() failed. */
static int
do_read(int sock, void *buf, int read_len)
{
	char *dst = buf; /* byte pointer: arithmetic on void * is not ISO C */
	fd_set readfds;
	int n = 0, total = 0;

	while (total < read_len) {
		/* select() overwrites the set, so re-arm it on every pass */
		FD_ZERO(&readfds);
		FD_SET(sock, &readfds);

		if (select(sock + 1, &readfds, NULL, NULL, NULL) == -1)
			return -1;
		if (!FD_ISSET(sock, &readfds))
			return -1;

		n = read(sock, dst + total, read_len - total);
		if (n <= 0)
			break;

		total += n;
	}

	return (n == -1) ? -1 : total;
}
206
/* Write <write_len> bytes from <buf> to <sock>, waiting with select() until
 * the descriptor is writable. Returns the number of bytes actually written
 * (fewer than <write_len> only if write() reported 0), or -1 if select() or
 * write() failed. */
static int
do_write(int sock, void *buf, int write_len)
{
	const char *src = buf; /* byte pointer: arithmetic on void * is not ISO C */
	fd_set writefds;
	int n = 0, total = 0;

	while (total < write_len) {
		/* select() overwrites the set, so re-arm it on every pass */
		FD_ZERO(&writefds);
		FD_SET(sock, &writefds);

		if (select(sock + 1, NULL, &writefds, NULL, NULL) == -1)
			return -1;
		if (!FD_ISSET(sock, &writefds))
			return -1;

		n = write(sock, src + total, write_len - total);
		if (n <= 0)
			break;

		total += n;
	}

	return (n == -1) ? -1 : total;
}
232
233/* Receive a frame sent by HAProxy. It returns -1 if an error occurred,
234 * otherwise the number of read bytes.*/
235static int
236read_frame(int sock, struct worker *w)
237{
238 uint32_t netint;
239 unsigned int framesz;
240
241 /* Read the frame size, on 4 bytes */
242 if (do_read(sock, &netint, sizeof(netint)) != 4) {
243 w->status_code = SPOE_FRM_ERR_IO;
244 return -1;
245 }
246
247 /* Check it against the max size */
248 framesz = ntohl(netint);
249 if (framesz > w->size) {
250 w->status_code = SPOE_FRM_ERR_TOO_BIG;
251 return -1;
252 }
253
254 /* Read the frame */
255 if (do_read(sock, w->buf, framesz) != framesz) {
256 w->status_code = SPOE_FRM_ERR_IO;
257 return -1;
258 }
259
260 w->len = framesz;
261 return framesz;
262}
263
264/* Send a frame to HAProxy. It returns -1 if an error occurred, otherwise the
265 * number of written bytes. */
266static int
267write_frame(int sock, struct worker *w)
268{
269 uint32_t netint;
270
271 /* Write the frame size, on 4 bytes */
272 netint = htonl(w->len);
273 if (do_write(sock, &netint, sizeof(netint)) != 4) {
274 w->status_code = SPOE_FRM_ERR_IO;
275 return -1;
276 }
277
278 /* Write the frame */
279 if (do_write(sock, w->buf, w->len) != w->len) {
280 w->status_code = SPOE_FRM_ERR_IO;
281 return -1;
282 }
283 return w->len;
284}
285
/* Encode <i> as a variable-length integer into <buf>. Values below 240 take a
 * single byte. Larger values store their low 4 bits in the first byte (whose
 * high nibble is all ones), then 7 bits per following byte, every byte but
 * the last having its top bit set. The caller must provide enough room (at
 * most 10 bytes are produced for a 64-bit value). This function never fails
 * and returns the number of written bytes. */
static int
encode_spoe_varint(uint64_t i, char *buf)
{
	/* Bytes >= 128 are produced: store them through unsigned char so the
	 * result does not depend on the signedness of plain char. */
	unsigned char *out = (unsigned char *)buf;
	int idx;

	if (i < 240) {
		out[0] = (unsigned char)i;
		return 1;
	}

	out[0] = (unsigned char)i | 240;
	i = (i - 240) >> 4;
	for (idx = 1; i >= 128; ++idx) {
		out[idx] = (unsigned char)i | 128;
		i = (i - 128) >> 7;
	}
	out[idx++] = (unsigned char)i;
	return idx;
}
307
/* Decode a variable-length integer starting at <buf> into <*i>. <end> points
 * one byte past the last readable byte. If the decoding fails, -1 is returned;
 * this happens when the buffer's end is reached before the integer is complete
 * or when the encoding is too long to fit in 64 bits. On success, the number
 * of read bytes is returned. */
static int
decode_spoe_varint(char *buf, char *end, uint64_t *i)
{
	unsigned char *msg = (unsigned char *)buf;
	unsigned char *limit = (unsigned char *)end;
	unsigned int shift = 4;
	int idx = 0;

	/* <end> itself is not readable */
	if (msg >= limit)
		return -1;

	*i = msg[0];
	if (msg[0] < 240)
		return 1;

	do {
		++idx;
		if (msg + idx >= limit)
			return -1;
		/* a 64-bit value never needs a shift beyond 60 */
		if (shift >= 64)
			return -1;
		*i += (uint64_t)msg[idx] << shift;
		shift += 7;
	} while (msg[idx] >= 128);

	return (idx + 1);
}
333
/* Encode the <len> bytes at <str> into <dst>: first the length as a
 * variable-length integer, then the bytes themselves. An empty string is
 * encoded as a single zero byte. This function never fails and returns the
 * number of written bytes. */
static int
encode_spoe_string(const char *str, size_t len, char *dst)
{
	int hdr;

	if (len == 0) {
		*dst = 0;
		return 1;
	}

	hdr = encode_spoe_varint(len, dst);
	memcpy(dst + hdr, str, len);
	return (hdr + len);
}
351
/* Decode a string located at <buf>; <end> points one byte past the last
 * readable byte. Its length is decoded first as a variable-length integer. If
 * it succeeds, and if the whole string fits before <end>, the beginning of the
 * string is saved in <*str>, its length is saved in <*len> and the total
 * number of bytes read (length prefix + string) is returned. If an error
 * occurred, -1 is returned and <*str> remains NULL. */
static int
decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
{
	char *payload;
	int r;

	*str = NULL;
	*len = 0;

	if ((r = decode_spoe_varint(buf, end, len)) == -1)
		return -1;

	/* Compare the length with the room left instead of computing
	 * <payload + *len>, which could wrap for a huge decoded length. */
	payload = buf + r;
	if (payload > end || *len > (uint64_t)(end - payload))
		return -1;

	*str = payload;
	return (r + (int)*len);
}
377
378/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
379 * of bytes read is returned. A types data is composed of a type (1 byte) and
380 * corresponding data:
381 * - boolean: non additional data (0 bytes)
382 * - integers: a variable-length integer (see decode_spoe_varint)
383 * - ipv4: 4 bytes
384 * - ipv6: 16 bytes
385 * - binary and string: a buffer prefixed by its size, a variable-length
386 * integer (see decode_spoe_string) */
387static int
388skip_spoe_data(char *frame, char *end)
389{
390 uint64_t sz = 0;
391 int r, idx = 0;
392
393 if (frame > end)
394 return -1;
395
396 switch (frame[idx++] & SPOE_DATA_T_MASK) {
397 case SPOE_DATA_T_BOOL:
398 idx++;
399 break;
400 case SPOE_DATA_T_INT32:
401 case SPOE_DATA_T_INT64:
402 case SPOE_DATA_T_UINT32:
403 case SPOE_DATA_T_UINT64:
404 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
405 return -1;
406 idx += r;
407 break;
408 case SPOE_DATA_T_IPV4:
409 idx += 4;
410 break;
411 case SPOE_DATA_T_IPV6:
412 idx += 16;
413 break;
414 case SPOE_DATA_T_STR:
415 case SPOE_DATA_T_BIN:
416 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
417 return -1;
418 idx += r + sz;
419 break;
420 }
421
422 if (frame+idx > end)
423 return -1;
424 return idx;
425}
426
427/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
428 * number of read bytes is returned. See skip_spoe_data for details. */
429static int
430decode_spoe_data(char *frame, char *end, struct spoe_data *data)
431{
432 uint64_t sz = 0;
433 int type, r, idx = 0;
434
435 if (frame > end)
436 return -1;
437
438 type = frame[idx++];
439 data->type = (type & SPOE_DATA_T_MASK);
440 switch (data->type) {
441 case SPOE_DATA_T_BOOL:
442 data->u.boolean = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
443 break;
444 case SPOE_DATA_T_INT32:
445 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
446 return -1;
447 data->u.sint32 = sz;
448 idx += r;
449 break;
450 case SPOE_DATA_T_INT64:
451 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
452 return -1;
453 data->u.uint32 = sz;
454 idx += r;
455 break;
456 case SPOE_DATA_T_UINT32:
457 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
458 return -1;
459 data->u.sint64 = sz;
460 idx += r;
461 break;
462 case SPOE_DATA_T_UINT64:
463 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
464 return -1;
465 data->u.uint64 = sz;
466 idx += r;
467 break;
468 case SPOE_DATA_T_IPV4:
469 if (frame+idx+4 > end)
470 return -1;
471 memcpy(&data->u.ipv4, frame+idx, 4);
472 idx += 4;
473 break;
474 case SPOE_DATA_T_IPV6:
475 if (frame+idx+16 > end)
476 return -1;
477 memcpy(&data->u.ipv6, frame+idx, 16);
478 idx += 16;
479 break;
480 case SPOE_DATA_T_STR:
481 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
482 return -1;
483 idx += r;
484 if (frame+idx+sz > end)
485 return -1;
486 data->u.buffer.str = frame+idx;
487 data->u.buffer.len = sz;
488 idx += sz;
489 break;
490 case SPOE_DATA_T_BIN:
491 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
492 return -1;
493 idx += r;
494 if (frame+idx+sz > end)
495 return -1;
496 data->u.buffer.str = frame+idx;
497 data->u.buffer.len = sz;
498 idx += sz;
499 break;
500 default:
501 break;
502 }
503
504 if (frame+idx > end)
505 return -1;
506 return idx;
507}
508
509
510/* Check the protocol version. It returns -1 if an error occurred, the number of
511 * read bytes otherwise. */
512static int
513check_proto_version(struct worker *w, int idx)
514{
515 char *str;
516 uint64_t sz;
517
518 /* Get the list of all supported versions by HAProxy */
519 if ((w->buf[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
520 w->status_code = SPOE_FRM_ERR_INVALID;
521 return -1;
522 }
523 idx += decode_spoe_string(w->buf+idx, w->buf+w->len, &str, &sz);
524 if (str == NULL) {
525 w->status_code = SPOE_FRM_ERR_INVALID;
526 return -1;
527 }
528
529 /* TODO: Find the right verion in supported ones */
530
531 return idx;
532}
533
534/* Check max frame size value. It returns -1 if an error occurred, the number of
535 * read bytes otherwise. */
536static int
537check_max_frame_size(struct worker *w, int idx)
538{
539 uint64_t sz;
540 int type, i;
541
542 /* Get the max-frame-size value of HAProxy */
543 type = w->buf[idx++];
544 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
545 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
546 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
547 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
548 w->status_code = SPOE_FRM_ERR_INVALID;
549 return -1;
550 }
551 if ((i = decode_spoe_varint(w->buf+idx, w->buf+w->len, &sz)) == -1) {
552 w->status_code = SPOE_FRM_ERR_INVALID;
553 return -1;
554 }
555 idx += i;
556
557 /* Keep the lower value */
558 if (sz < w->size)
559 w->size = sz;
560
561 return idx;
562}
563
564/* Check healthcheck value. It returns -1 if an error occurred, the number of
565 * read bytes otherwise. */
566static int
567check_healthcheck(struct worker *w, int idx)
568{
569 int type;
570
571 /* Get the "healthcheck" value of HAProxy */
572 type = w->buf[idx++];
573 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_BOOL) {
574 w->status_code = SPOE_FRM_ERR_INVALID;
575 return -1;
576 }
577 w->healthcheck = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
578 return idx;
579}
580
581
582/* Decode a HELLO frame received from HAProxy. It returns -1 if an error
583 * occurred, 0 if the frame must be skipped, otherwise the number of read
584 * bytes. */
585static int
586handle_hahello(struct worker *w)
587{
588 char *end = w->buf+w->len;
589 int i, idx = 0;
590
591 /* Check frame type */
592 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_HELLO)
593 goto skip;
594
595 /* Skip flags */
596 idx += 4;
597
598 /* stream-id and frame-id must be cleared */
599 if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
600 w->status_code = SPOE_FRM_ERR_INVALID;
601 goto error;
602 }
603 idx += 2;
604
605 /* Loop on K/V items */
606 while (idx < w->len) {
607 char *str;
608 uint64_t sz;
609
610 /* Decode the item name */
611 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
612 if (str == NULL) {
613 w->status_code = SPOE_FRM_ERR_INVALID;
614 goto error;
615 }
616
617 /* Check "supported-versions" K/V item */
618 if (!memcmp(str, "supported-versions", sz)) {
619 if ((i = check_proto_version(w, idx)) == -1)
620 goto error;
621 idx = i;
622 }
623 /* Check "max-frame-size" K/V item "*/
624 else if (!memcmp(str, "max-frame-size", sz)) {
625 if ((i = check_max_frame_size(w, idx)) == -1)
626 goto error;
627 idx = i;
628 }
629 /* Check "healthcheck" K/V item "*/
630 else if (!memcmp(str, "healthcheck", sz)) {
631 if ((i = check_healthcheck(w, idx)) == -1)
632 goto error;
633 idx = i;
634 }
635 /* Skip "capabilities" K/V item for now */
636 else {
637 /* Silently ignore unknown item */
638 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
639 w->status_code = SPOE_FRM_ERR_INVALID;
640 goto error;
641 }
642 idx += i;
643 }
644 }
645
646 return idx;
647skip:
648 return 0;
649error:
650 return -1;
651}
652
653/* Decode a DISCONNECT frame received from HAProxy. It returns -1 if an error
654 * occurred, 0 if the frame must be skipped, otherwise the number of read
655 * bytes. */
656static int
657handle_hadiscon(struct worker *w)
658{
659 char *end = w->buf+w->len;
660 int i, idx = 0;
661
662 /* Check frame type */
663 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_DISCON)
664 goto skip;
665
666 /* Skip flags */
667 idx += 4;
668
669 /* stream-id and frame-id must be cleared */
670 if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
671 w->status_code = SPOE_FRM_ERR_INVALID;
672 goto error;
673 }
674 idx += 2;
675
676 /* Loop on K/V items */
677 while (idx < w->len) {
678 char *str;
679 uint64_t sz;
680
681 /* Decode item key */
682 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
683 if (str == NULL) {
684 w->status_code = SPOE_FRM_ERR_INVALID;
685 goto error;
686 }
687 /* Silently ignore unknown item */
688 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
689 w->status_code = SPOE_FRM_ERR_INVALID;
690 goto error;
691 }
692 idx += i;
693 }
694
695 w->status_code = SPOE_FRM_ERR_NONE;
696 return idx;
697skip:
698 return 0;
699error:
700 return -1;
701}
702
703/* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
704 * occurred, 0 if the frame must be skipped, otherwise the number of read
705 * bytes. */
706static int
707handle_hanotify(struct worker *w)
708{
709 char *end = w->buf+w->len;
710 uint64_t stream_id, frame_id;
711 int nbargs, i, idx = 0;
712
713 /* Check frame type */
714 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_NOTIFY)
715 goto skip;
716
717 /* Skip flags */
718 idx += 4;
719
720 /* Read the stream-id */
721 if ((i = decode_spoe_varint(w->buf+idx, end, &stream_id)) == -1) {
722 w->status_code = SPOE_FRM_ERR_INVALID;
723 goto error;
724 }
725 idx += i;
726
727 /* Read the frame-id */
728 if ((i = decode_spoe_varint(w->buf+idx, end, &frame_id)) == -1) {
729 w->status_code = SPOE_FRM_ERR_INVALID;
730 goto error;
731 }
732 idx += i;
733
734 w->stream_id = (unsigned int)stream_id;
735 w->frame_id = (unsigned int)frame_id;
736
737 DEBUG("Notify frame received: stream-id=%u - frame-id=%u",
738 w->stream_id, w->frame_id);
739
740 /* Loop on messages */
741 while (idx < w->len) {
742 char *str;
743 uint64_t sz;
744
745 /* Decode the message name */
746 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
747 if (str == NULL) {
748 w->status_code = SPOE_FRM_ERR_INVALID;
749 goto error;
750 }
751 DEBUG(" Message '%.*s' received", (int)sz, str);
752
753 nbargs = w->buf[idx++];
754 if (!memcmp(str, "check-client-ip", sz)) {
755 struct spoe_data data;
756
757 memset(&data, 0, sizeof(data));
758
759 if (nbargs != 1) {
760 w->status_code = SPOE_FRM_ERR_INVALID;
761 goto error;
762 }
763 if ((i = decode_spoe_string(w->buf+idx, end, &str, &sz)) == -1) {
764 w->status_code = SPOE_FRM_ERR_INVALID;
765 goto error;
766 }
767 idx += i;
768 if ((i = decode_spoe_data(w->buf+idx, end, &data)) == -1) {
769 w->status_code = SPOE_FRM_ERR_INVALID;
770 goto error;
771 }
772 idx += i;
773 if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV4)
774 check_ipv4_reputation(w, &data.u.ipv4);
775 else if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV6)
776 check_ipv6_reputation(w, &data.u.ipv6);
777 else {
778 w->status_code = SPOE_FRM_ERR_INVALID;
779 goto error;
780 }
781 }
782 else {
783 while (nbargs-- > 0) {
784 /* Silently ignore argument: its name and its value */
785 if ((i = decode_spoe_string(w->buf+idx, end, &str, &sz)) == -1) {
786 w->status_code = SPOE_FRM_ERR_INVALID;
787 goto error;
788 }
789 idx += i;
790 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
791 w->status_code = SPOE_FRM_ERR_INVALID;
792 goto error;
793 }
794 idx += i;
795 }
796 }
797 }
798
799 return idx;
800skip:
801 return 0;
802error:
803 return -1;
804}
805
806/* Encode a HELLO frame to send it to HAProxy. It returns -1 if an error
807 * occurred, the number of written bytes otherwise. */
808static int
809prepare_agenthello(struct worker *w)
810{
811 int idx = 0;
812
813 /* Frame Type */
814 w->buf[idx++] = SPOE_FRM_T_AGENT_HELLO;
815
816 /* No flags for now */
817 memset(w->buf+idx, 0, 4); /* No flags */
818 idx += 4;
819
820 /* No stream-id and frame-id for HELLO frames */
821 w->buf[idx++] = 0;
822 w->buf[idx++] = 0;
823
824 /* "version" K/V item */
825 idx += encode_spoe_string("version", 7, w->buf+idx);
826 w->buf[idx++] = SPOE_DATA_T_STR;
827 idx += encode_spoe_string(SPOP_VERSION, SLEN(SPOP_VERSION), w->buf+idx);
828
829 /* "max-frame-size" K/V item */
830 idx += encode_spoe_string("max-frame-size", 14, w->buf+idx);
831 w->buf[idx++] = SPOE_DATA_T_UINT32;
832 idx += encode_spoe_varint(w->size, w->buf+idx);
833
834 /* "capabilities" K/V item */
835 idx += encode_spoe_string("capabilities", 12, w->buf+idx);
836 w->buf[idx++] = SPOE_DATA_T_STR;
837 idx += encode_spoe_string(SPOA_CAPABILITIES, SLEN(SPOA_CAPABILITIES), w->buf+idx);
838
839 w->len = idx;
840 return idx;
841}
842
843/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred,
844 * the number of written bytes otherwise. */
845static int
846prepare_agentack(struct worker *w)
847{
848 int idx = 0;
849
850 /* Frame type */
851 w->buf[idx++] = SPOE_FRM_T_AGENT_ACK;
852
853 /* No flags for now */
854 memset(w->buf+idx, 0, 4); /* No flags */
855 idx += 4;
856
857 /* Set stream-id and frame-id for ACK frames */
858 idx += encode_spoe_varint(w->stream_id, w->buf+idx);
859 idx += encode_spoe_varint(w->frame_id, w->buf+idx);
860
861 /* Data */
862 if (w->ip_score == -1)
863 goto out;
864
865 w->buf[idx++] = SPOE_ACT_T_SET_VAR; /* Action type */
866 w->buf[idx++] = 3; /* Number of args */
867 w->buf[idx++] = SPOE_SCOPE_SESS; /* Arg 1: the scope */
868 idx += encode_spoe_string("ip_score", 8, w->buf+idx); /* Arg 2: variable name */
869 w->buf[idx++] = SPOE_DATA_T_UINT32;
870 idx += encode_spoe_varint(w->ip_score, w->buf+idx); /* Arg 3: variable value */
871out:
872 w->len = idx;
873 return idx;
874}
875
876/* Encode a DISCONNECT frame to send it to HAProxy. It returns -1 if an error
877 * occurred, the number of written bytes otherwise. */
878static int
879prepare_agentdicon(struct worker *w)
880{
881 const char *reason;
882 int rlen, idx = 0;
883
884 if (w->status_code >= SPOE_FRM_ERRS)
885 w->status_code = SPOE_FRM_ERR_UNKNOWN;
886 reason = spoe_frm_err_reasons[w->status_code];
887 rlen = strlen(reason);
888
889 /* Frame type */
890 w->buf[idx++] = SPOE_FRM_T_AGENT_DISCON;
891
892 /* No flags for now */
893 memset(w->buf+idx, 0, 4);
894 idx += 4;
895
896 /* No stream-id and frame-id for DISCONNECT frames */
897 w->buf[idx++] = 0;
898 w->buf[idx++] = 0;
899
900 /* There are 2 mandatory items: "status-code" and "message" */
901
902 /* "status-code" K/V item */
903 idx += encode_spoe_string("status-code", 11, w->buf+idx);
904 w->buf[idx++] = SPOE_DATA_T_UINT32;
905 idx += encode_spoe_varint(w->status_code, w->buf+idx);
906
907 /* "message" K/V item */
908 idx += encode_spoe_string("message", 7, w->buf+idx);
909 w->buf[idx++] = SPOE_DATA_T_STR;
910 idx += encode_spoe_string(reason, rlen, w->buf+idx);
911
912 w->len = idx;
913 return idx;
914}
915
916static int
917hello_handshake(int sock, struct worker *w)
918{
919 if (read_frame(sock, w) < 0) {
920 LOG("Failed to read Haproxy HELLO frame");
921 goto error;
922 }
923 if (handle_hahello(w) < 0) {
924 LOG("Failed to handle Haproxy HELLO frame");
925 goto error;
926 }
927 if (prepare_agenthello(w) < 0) {
928 LOG("Failed to prepare Agent HELLO frame");
929 goto error;
930 }
931 if (write_frame(sock, w) < 0) {
932 LOG("Failed to write Agent frame");
933 goto error;
934 }
935 DEBUG("Hello handshake done: version=%s - max-frame-size=%u - healthcheck=%s",
936 SPOP_VERSION, w->size, (w->healthcheck ? "true" : "false"));
937 return 0;
938error:
939 return -1;
940}
941
942static int
943notify_ack_roundtip(int sock, struct worker *w)
944{
945 if (read_frame(sock, w) < 0) {
946 LOG("Failed to read Haproxy NOTIFY frame");
947 goto error_or_quit;
948 }
949 if (handle_hadiscon(w) != 0) {
950 if (w->status_code != SPOE_FRM_ERR_NONE)
951 LOG("Failed to handle Haproxy DISCONNECT frame");
952 DEBUG("Disconnect frame received: reason=%s",
953 spoe_frm_err_reasons[w->status_code]);
954 goto error_or_quit;
955 }
956 if (handle_hanotify(w) < 0) {
957 LOG("Failed to handle Haproxy NOTIFY frame");
958 goto error_or_quit;
959 }
960 if (prepare_agentack(w) < 0) {
961 LOG("Failed to prepare Agent ACK frame");
962 goto error_or_quit;
963 }
964 if (write_frame(sock, w) < 0) {
965 LOG("Failed to write Agent ACK frame");
966 goto error_or_quit;
967 }
968 DEBUG("Ack frame sent: stream-id=%u - frame-id=%u",
969 w->stream_id, w->frame_id);
970 return 0;
971error_or_quit:
972 return -1;
973}
974
975static void *
Thierry FOURNIER5301ed12018-02-23 11:59:15 +0100976spoa_worker(void *data)
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100977{
978 struct worker w;
979 struct sockaddr_in client;
980 int *info = (int *)data;
981 int csock, lsock = info[0];
Thierry FOURNIER64eaa332018-02-23 14:58:40 +0100982 struct ps *ps;
Thierry FOURNIER8b9a73b2018-02-23 15:12:55 +0100983 int i;
984 int len;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100985
986 signal(SIGPIPE, SIG_IGN);
987 pthread_setspecific(worker_id, &info[1]);
988
Thierry FOURNIER64eaa332018-02-23 14:58:40 +0100989 /* Init registered processors */
990 for (ps = ps_list; ps != NULL; ps = ps->next)
991 ps->init_worker(&w);
992
Thierry FOURNIER8b9a73b2018-02-23 15:12:55 +0100993 /* Load files */
994 for (i = 0; i < nfiles; i++) {
995 len = strlen(files[i]);
996 for (ps = ps_list; ps != NULL; ps = ps->next)
997 if (strcmp(files[i] + len - strlen(ps->ext), ps->ext) == 0)
998 break;
999 if (ps == NULL) {
1000 LOG("Can't load file \"%s\"\n", files[i]);
1001 goto out;
1002 }
1003 if (!ps->load_file(&w, files[i]))
1004 goto out;
1005 }
1006
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001007 while (1) {
1008 socklen_t sz = sizeof(client);
1009
1010 if ((csock = accept(lsock, (struct sockaddr *)&client, &sz)) < 0) {
1011 LOG("Failed to accept client connection: %m");
1012 goto out;
1013 }
1014 memset(&w, 0, sizeof(w));
1015 w.id = info[1];
1016 w.size = MAX_FRAME_SIZE;
1017
1018 DEBUG("New connection from HAProxy accepted");
1019
1020 if (hello_handshake(csock, &w) < 0)
1021 goto disconnect;
1022 if (w.healthcheck == true)
1023 goto close;
1024 while (1) {
1025 w.ip_score = -1;
1026 if (notify_ack_roundtip(csock, &w) < 0)
1027 break;
1028 }
1029
1030 disconnect:
1031 if (w.status_code == SPOE_FRM_ERR_IO) {
1032 LOG("Close the client socket because of I/O errors");
1033 goto close;
1034 }
1035 if (prepare_agentdicon(&w) < 0) {
1036 LOG("Failed to prepare Agent DISCONNECT frame");
1037 goto close;
1038 }
1039 if (write_frame(csock, &w) < 0) {
1040 LOG("Failed to write Agent DISCONNECT frame");
1041 goto close;
1042 }
1043 DEBUG("Disconnect frame sent: reason=%s",
1044 spoe_frm_err_reasons[w.status_code]);
1045
1046 close:
1047 close(csock);
1048 }
1049
1050out:
1051 free(info);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001052#if 0
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001053 pthread_exit(NULL);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001054#endif
1055 return NULL;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001056}
1057
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001058int process_create(pid_t *pid, void *(*ps)(void *), void *data)
1059{
Thierry FOURNIER786e9e62018-02-23 19:11:47 +01001060 if (debug) {
1061 ps(data);
1062 exit(EXIT_SUCCESS);
1063 }
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001064 *pid = fork();
1065 if (*pid == -1)
1066 return -1;
1067 if (*pid > 0)
1068 return 0;
1069 ps(data);
1070 return 0;
1071}
1072
/* Print the command line help on stderr. <prog> is the program name shown in
 * the synopsis. */
static void
usage(char *prog)
{
	fprintf(stderr, "Usage: %s [-h] [-d] [-p <port>] [-n <num-workers>] -f <file>\n", prog);
	fprintf(stderr, " -h Print this message\n");
	fprintf(stderr, " -d Enable the debug mode\n");
	fprintf(stderr, " -p <port> Specify the port to listen on (default: 12345)\n");
	fprintf(stderr, " -n <num-workers> Specify the number of workers (default: 5)\n");
	fprintf(stderr, " -f <file> Specify the file which contains the processing code.\n");
	fprintf(stderr, " This argument can be specified more than once.\n");
}
1084
1085int
1086main(int argc, char **argv)
1087{
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001088#if 0
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001089 pthread_t *ts = NULL;
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001090#endif
1091 pid_t *pids;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001092 struct sockaddr_in server;
1093 int i, sock, opt, nbworkers, port;
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001094 int status;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001095
1096 nbworkers = NUM_WORKERS;
1097 port = DEFAULT_PORT;
Thierry FOURNIER8b9a73b2018-02-23 15:12:55 +01001098 while ((opt = getopt(argc, argv, "hdn:p:f:")) != -1) {
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001099 switch (opt) {
1100 case 'h':
1101 usage(argv[0]);
1102 return EXIT_SUCCESS;
1103 case 'd':
1104 debug = true;
1105 break;
1106 case 'n':
1107 nbworkers = atoi(optarg);
1108 break;
1109 case 'p':
1110 port = atoi(optarg);
1111 break;
Thierry FOURNIER8b9a73b2018-02-23 15:12:55 +01001112 case 'f':
1113 add_file(optarg);
1114 break;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001115 default:
1116 usage(argv[0]);
1117 return EXIT_FAILURE;
1118 }
1119 }
1120
1121 if (nbworkers <= 0) {
1122 fprintf(stderr, "%s: Invalid number of workers '%d'\n",
1123 argv[0], nbworkers);
1124 goto error;
1125 }
1126 if (port <= 0) {
1127 fprintf(stderr, "%s: Invalid port '%d'\n", argv[0], port);
1128 goto error;
1129 }
1130
1131 if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1132 fprintf(stderr, "Failed creating socket: %m\n");
1133 goto error;
1134 }
1135
1136 setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (int []){1}, sizeof(int));
1137 setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (int []){1}, sizeof(int));
1138
1139 memset(&server, 0, sizeof(server));
1140 server.sin_family = AF_INET;
1141 server.sin_addr.s_addr = INADDR_ANY;
1142 server.sin_port = htons(port);
1143
1144 if (bind(sock, (struct sockaddr *)&server, sizeof(server)) < 0) {
1145 fprintf(stderr, "Failed to bind the socket: %m\n");
1146 goto error;
1147 }
1148
1149 if (listen(sock , 10) < 0) {
1150 fprintf(stderr, "Failed to listen on the socket: %m\n");
1151 goto error;
1152 }
1153 fprintf(stderr, "SPOA is listening on port %d\n", port);
1154
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001155 pthread_key_create(&worker_id, NULL);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001156
1157 /* Initialise the server in thread mode. This code is commented
1158 * out and not deleted, because later I expect to work with
1159 * process ansd threads. This first version just support processes.
1160 */
1161#if 0
1162 ts = calloc(nbworkers, sizeof(*ts));
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001163 for (i = 0; i < nbworkers; i++) {
1164 int *info = calloc(2, sizeof(*info));
1165
1166 info[0] = sock;
1167 info[1] = i+1;
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001168
Thierry FOURNIER5301ed12018-02-23 11:59:15 +01001169 if (pthread_create(&ts[i], NULL, spoa_worker, info) < 0) {
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001170 fprintf(stderr, "Failed to create thread %d: %m\n", i+1);
1171 goto error;
1172 }
1173 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1174 }
1175
1176 for (i = 0; i < nbworkers; i++) {
1177 pthread_join(ts[i], NULL);
1178 fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
1179 }
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001180 free(ts);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001181#endif
1182
1183 /* Start processes */
1184 pids = calloc(nbworkers, sizeof(*pids));
1185 if (!pids) {
1186 fprintf(stderr, "Out of memory error\n");
1187 goto error;
1188 }
1189 for (i = 0; i < nbworkers; i++) {
1190 int *info = calloc(2, sizeof(*info));
1191
1192 info[0] = sock;
1193 info[1] = i+1;
1194
1195 if (process_create(&pids[i], spoa_worker, info) == -1) {
1196 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1197 goto error;
1198 }
1199 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1200 }
1201 for (i = 0; i < nbworkers; i++) {
1202 waitpid(pids[0], &status, 0);
1203 fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
1204 }
1205
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001206 close(sock);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001207 pthread_key_delete(worker_id);
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001208 return EXIT_SUCCESS;
1209error:
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001210 return EXIT_FAILURE;
1211}