blob: 6645c8ca3c061e7dc543ba31a62e4a3e8101d8bf [file] [log] [blame]
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001/*
2 * A Random IP reputation service acting as a Stream Processing Offload Agent
3 *
4 * This is a very simple service that implement a "random" ip reputation
5 * service. It will return random scores for all checked IP addresses. It only
6 * shows you how to implement a ip reputation service or such kind of services
7 * using the SPOE.
8 *
9 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
Thierry FOURNIER4aec0a42018-02-23 11:42:57 +010010 * Copyright 2018 OZON / Thierry Fournier <thierry.fournier@ozon.io>
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010011 *
12 * This program is free software; you can redistribute it and/or
13 * modify it under the terms of the GNU General Public License
14 * as published by the Free Software Foundation; either version
15 * 2 of the License, or (at your option) any later version.
16 *
17 */
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <stdbool.h>
22#include <unistd.h>
23#include <signal.h>
24#include <pthread.h>
25#include <sys/time.h>
26#include <sys/types.h>
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +010027#include <sys/wait.h>
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010028#include <sys/socket.h>
29#include <netinet/in.h>
30#include <netinet/tcp.h>
31#include <arpa/inet.h>
32
Thierry FOURNIER4aec0a42018-02-23 11:42:57 +010033#include "spoa.h"
34
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010035#define DEFAULT_PORT 12345
36#define NUM_WORKERS 5
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010037
38#define SLEN(str) (sizeof(str)-1)
39
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010040/* Frame Types sent by HAProxy and by agents */
41enum spoe_frame_type {
42 /* Frames sent by HAProxy */
43 SPOE_FRM_T_HAPROXY_HELLO = 1,
44 SPOE_FRM_T_HAPROXY_DISCON,
45 SPOE_FRM_T_HAPROXY_NOTIFY,
46
47 /* Frames sent by the agents */
48 SPOE_FRM_T_AGENT_HELLO = 101,
49 SPOE_FRM_T_AGENT_DISCON,
50 SPOE_FRM_T_AGENT_ACK
51};
52
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010053/* Errors triggerd by SPOE applet */
54enum spoe_frame_error {
55 SPOE_FRM_ERR_NONE = 0,
56 SPOE_FRM_ERR_IO,
57 SPOE_FRM_ERR_TOUT,
58 SPOE_FRM_ERR_TOO_BIG,
59 SPOE_FRM_ERR_INVALID,
60 SPOE_FRM_ERR_NO_VSN,
61 SPOE_FRM_ERR_NO_FRAME_SIZE,
62 SPOE_FRM_ERR_NO_CAP,
63 SPOE_FRM_ERR_BAD_VSN,
64 SPOE_FRM_ERR_BAD_FRAME_SIZE,
65 SPOE_FRM_ERR_UNKNOWN = 99,
66 SPOE_FRM_ERRS,
67};
68
69/* All supported SPOE actions */
70enum spoe_action_type {
71 SPOE_ACT_T_SET_VAR = 1,
72 SPOE_ACT_T_UNSET_VAR,
73 SPOE_ACT_TYPES,
74};
75
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010076
77/* Masks to get data type or flags value */
78#define SPOE_DATA_T_MASK 0x0F
79#define SPOE_DATA_FL_MASK 0xF0
80
81/* Flags to set Boolean values */
82#define SPOE_DATA_FL_FALSE 0x00
83#define SPOE_DATA_FL_TRUE 0x10
84static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
85 [SPOE_FRM_ERR_NONE] = "normal",
86 [SPOE_FRM_ERR_IO] = "I/O error",
87 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
88 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
89 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
90 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
91 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
92 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
93 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
94 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
95 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
96};
97
Thierry FOURNIER880d7e12018-02-25 10:54:56 +010098bool debug = false;
99pthread_key_t worker_id;
Thierry FOURNIER64eaa332018-02-23 14:58:40 +0100100static struct ps *ps_list = NULL;
101
102void ps_register(struct ps *ps)
103{
104 ps->next = ps_list;
105 ps_list = ps;
106}
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100107
108static void
109check_ipv4_reputation(struct worker *w, struct in_addr *ipv4)
110{
111 char str[INET_ADDRSTRLEN];
112
113 if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL)
114 return;
115
116 w->ip_score = random() % 100;
117
118 DEBUG(" IP score for %.*s is: %d", INET_ADDRSTRLEN, str, w->ip_score);
119}
120
121static void
122check_ipv6_reputation(struct worker *w, struct in6_addr *ipv6)
123{
124 char str[INET6_ADDRSTRLEN];
125
126 if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL)
127 return;
128
129 w->ip_score = random() % 100;
130
131 DEBUG(" IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, w->ip_score);
132}
133
134static int
135do_read(int sock, void *buf, int read_len)
136{
137 fd_set readfds;
138 int n = 0, total = 0, bytesleft = read_len;
139
140 FD_ZERO(&readfds);
141 FD_SET(sock, &readfds);
142
143 while (total < read_len) {
144 if (select(FD_SETSIZE, &readfds, NULL, NULL, NULL) == -1)
145 return -1;
146 if (!FD_ISSET(sock, &readfds))
147 return -1;
148
149 n = read(sock, buf + total, bytesleft);
150 if (n <= 0)
151 break;
152
153 total += n;
154 bytesleft -= n;
155 }
156
157 return (n == -1) ? -1 : total;
158}
159
160static int
161do_write(int sock, void *buf, int write_len)
162{
163 fd_set writefds;
164 int n = 0, total = 0, bytesleft = write_len;
165
166 FD_ZERO(&writefds);
167 FD_SET(sock, &writefds);
168
169 while (total < write_len) {
170 if (select(FD_SETSIZE, NULL, &writefds, NULL, NULL) == -1)
171 return -1;
172 if (!FD_ISSET(sock, &writefds))
173 return -1;
174
175 n = write(sock, buf + total, bytesleft);
176 if (n <= 0)
177 break;
178
179 total += n;
180 bytesleft -= n;
181 }
182
183 return (n == -1) ? -1 : total;
184}
185
186/* Receive a frame sent by HAProxy. It returns -1 if an error occurred,
187 * otherwise the number of read bytes.*/
188static int
189read_frame(int sock, struct worker *w)
190{
191 uint32_t netint;
192 unsigned int framesz;
193
194 /* Read the frame size, on 4 bytes */
195 if (do_read(sock, &netint, sizeof(netint)) != 4) {
196 w->status_code = SPOE_FRM_ERR_IO;
197 return -1;
198 }
199
200 /* Check it against the max size */
201 framesz = ntohl(netint);
202 if (framesz > w->size) {
203 w->status_code = SPOE_FRM_ERR_TOO_BIG;
204 return -1;
205 }
206
207 /* Read the frame */
208 if (do_read(sock, w->buf, framesz) != framesz) {
209 w->status_code = SPOE_FRM_ERR_IO;
210 return -1;
211 }
212
213 w->len = framesz;
214 return framesz;
215}
216
217/* Send a frame to HAProxy. It returns -1 if an error occurred, otherwise the
218 * number of written bytes. */
219static int
220write_frame(int sock, struct worker *w)
221{
222 uint32_t netint;
223
224 /* Write the frame size, on 4 bytes */
225 netint = htonl(w->len);
226 if (do_write(sock, &netint, sizeof(netint)) != 4) {
227 w->status_code = SPOE_FRM_ERR_IO;
228 return -1;
229 }
230
231 /* Write the frame */
232 if (do_write(sock, w->buf, w->len) != w->len) {
233 w->status_code = SPOE_FRM_ERR_IO;
234 return -1;
235 }
236 return w->len;
237}
238
239/* Encode a variable-length integer. This function never fails and returns the
240 * number of written bytes. */
241static int
242encode_spoe_varint(uint64_t i, char *buf)
243{
244 int idx;
245
246 if (i < 240) {
247 buf[0] = (unsigned char)i;
248 return 1;
249 }
250
251 buf[0] = (unsigned char)i | 240;
252 i = (i - 240) >> 4;
253 for (idx = 1; i >= 128; ++idx) {
254 buf[idx] = (unsigned char)i | 128;
255 i = (i - 128) >> 7;
256 }
257 buf[idx++] = (unsigned char)i;
258 return idx;
259}
260
261/* Decode a varable-length integer. If the decoding fails, -1 is returned. This
262 * happens when the buffer's end in reached. On success, the number of read
263 * bytes is returned. */
264static int
265decode_spoe_varint(char *buf, char *end, uint64_t *i)
266{
267 unsigned char *msg = (unsigned char *)buf;
268 int idx = 0;
269
270 if (msg > (unsigned char *)end)
271 return -1;
272
273 if (msg[0] < 240) {
274 *i = msg[0];
275 return 1;
276 }
277 *i = msg[0];
278 do {
279 ++idx;
280 if (msg+idx > (unsigned char *)end)
281 return -1;
282 *i += (uint64_t)msg[idx] << (4 + 7 * (idx-1));
283 } while (msg[idx] >= 128);
284 return (idx + 1);
285}
286
287/* Encode a string. The string will be prefix by its length, encoded as a
288 * variable-length integer. This function never fails and returns the number of
289 * written bytes. */
290static int
291encode_spoe_string(const char *str, size_t len, char *dst)
292{
293 int idx = 0;
294
295 if (!len) {
296 dst[0] = 0;
297 return 1;
298 }
299
300 idx += encode_spoe_varint(len, dst);
301 memcpy(dst+idx, str, len);
302 return (idx + len);
303}
304
305/* Decode a string. Its length is decoded first as a variable-length integer. If
306 * it succeeds, and if the string length is valid, the begin of the string is
307 * saved in <*str>, its length is saved in <*len> and the total numbre of bytes
308 * read is returned. If an error occurred, -1 is returned and <*str> remains
309 * NULL. */
310static int
311decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
312{
313 int r, idx = 0;
314
315 *str = NULL;
316 *len = 0;
317
318 if ((r = decode_spoe_varint(buf, end, len)) == -1)
319 goto error;
320 idx += r;
321 if (buf + idx + *len > end)
322 goto error;
323
324 *str = buf+idx;
325 return (idx + *len);
326
327error:
328 return -1;
329}
330
331/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
332 * of bytes read is returned. A types data is composed of a type (1 byte) and
333 * corresponding data:
334 * - boolean: non additional data (0 bytes)
335 * - integers: a variable-length integer (see decode_spoe_varint)
336 * - ipv4: 4 bytes
337 * - ipv6: 16 bytes
338 * - binary and string: a buffer prefixed by its size, a variable-length
339 * integer (see decode_spoe_string) */
340static int
341skip_spoe_data(char *frame, char *end)
342{
343 uint64_t sz = 0;
344 int r, idx = 0;
345
346 if (frame > end)
347 return -1;
348
349 switch (frame[idx++] & SPOE_DATA_T_MASK) {
350 case SPOE_DATA_T_BOOL:
351 idx++;
352 break;
353 case SPOE_DATA_T_INT32:
354 case SPOE_DATA_T_INT64:
355 case SPOE_DATA_T_UINT32:
356 case SPOE_DATA_T_UINT64:
357 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
358 return -1;
359 idx += r;
360 break;
361 case SPOE_DATA_T_IPV4:
362 idx += 4;
363 break;
364 case SPOE_DATA_T_IPV6:
365 idx += 16;
366 break;
367 case SPOE_DATA_T_STR:
368 case SPOE_DATA_T_BIN:
369 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
370 return -1;
371 idx += r + sz;
372 break;
373 }
374
375 if (frame+idx > end)
376 return -1;
377 return idx;
378}
379
380/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
381 * number of read bytes is returned. See skip_spoe_data for details. */
382static int
383decode_spoe_data(char *frame, char *end, struct spoe_data *data)
384{
385 uint64_t sz = 0;
386 int type, r, idx = 0;
387
388 if (frame > end)
389 return -1;
390
391 type = frame[idx++];
392 data->type = (type & SPOE_DATA_T_MASK);
393 switch (data->type) {
394 case SPOE_DATA_T_BOOL:
395 data->u.boolean = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
396 break;
397 case SPOE_DATA_T_INT32:
398 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
399 return -1;
400 data->u.sint32 = sz;
401 idx += r;
402 break;
403 case SPOE_DATA_T_INT64:
404 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
405 return -1;
406 data->u.uint32 = sz;
407 idx += r;
408 break;
409 case SPOE_DATA_T_UINT32:
410 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
411 return -1;
412 data->u.sint64 = sz;
413 idx += r;
414 break;
415 case SPOE_DATA_T_UINT64:
416 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
417 return -1;
418 data->u.uint64 = sz;
419 idx += r;
420 break;
421 case SPOE_DATA_T_IPV4:
422 if (frame+idx+4 > end)
423 return -1;
424 memcpy(&data->u.ipv4, frame+idx, 4);
425 idx += 4;
426 break;
427 case SPOE_DATA_T_IPV6:
428 if (frame+idx+16 > end)
429 return -1;
430 memcpy(&data->u.ipv6, frame+idx, 16);
431 idx += 16;
432 break;
433 case SPOE_DATA_T_STR:
434 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
435 return -1;
436 idx += r;
437 if (frame+idx+sz > end)
438 return -1;
439 data->u.buffer.str = frame+idx;
440 data->u.buffer.len = sz;
441 idx += sz;
442 break;
443 case SPOE_DATA_T_BIN:
444 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
445 return -1;
446 idx += r;
447 if (frame+idx+sz > end)
448 return -1;
449 data->u.buffer.str = frame+idx;
450 data->u.buffer.len = sz;
451 idx += sz;
452 break;
453 default:
454 break;
455 }
456
457 if (frame+idx > end)
458 return -1;
459 return idx;
460}
461
462
463/* Check the protocol version. It returns -1 if an error occurred, the number of
464 * read bytes otherwise. */
465static int
466check_proto_version(struct worker *w, int idx)
467{
468 char *str;
469 uint64_t sz;
470
471 /* Get the list of all supported versions by HAProxy */
472 if ((w->buf[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
473 w->status_code = SPOE_FRM_ERR_INVALID;
474 return -1;
475 }
476 idx += decode_spoe_string(w->buf+idx, w->buf+w->len, &str, &sz);
477 if (str == NULL) {
478 w->status_code = SPOE_FRM_ERR_INVALID;
479 return -1;
480 }
481
482 /* TODO: Find the right verion in supported ones */
483
484 return idx;
485}
486
487/* Check max frame size value. It returns -1 if an error occurred, the number of
488 * read bytes otherwise. */
489static int
490check_max_frame_size(struct worker *w, int idx)
491{
492 uint64_t sz;
493 int type, i;
494
495 /* Get the max-frame-size value of HAProxy */
496 type = w->buf[idx++];
497 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
498 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
499 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
500 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
501 w->status_code = SPOE_FRM_ERR_INVALID;
502 return -1;
503 }
504 if ((i = decode_spoe_varint(w->buf+idx, w->buf+w->len, &sz)) == -1) {
505 w->status_code = SPOE_FRM_ERR_INVALID;
506 return -1;
507 }
508 idx += i;
509
510 /* Keep the lower value */
511 if (sz < w->size)
512 w->size = sz;
513
514 return idx;
515}
516
517/* Check healthcheck value. It returns -1 if an error occurred, the number of
518 * read bytes otherwise. */
519static int
520check_healthcheck(struct worker *w, int idx)
521{
522 int type;
523
524 /* Get the "healthcheck" value of HAProxy */
525 type = w->buf[idx++];
526 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_BOOL) {
527 w->status_code = SPOE_FRM_ERR_INVALID;
528 return -1;
529 }
530 w->healthcheck = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
531 return idx;
532}
533
534
535/* Decode a HELLO frame received from HAProxy. It returns -1 if an error
536 * occurred, 0 if the frame must be skipped, otherwise the number of read
537 * bytes. */
538static int
539handle_hahello(struct worker *w)
540{
541 char *end = w->buf+w->len;
542 int i, idx = 0;
543
544 /* Check frame type */
545 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_HELLO)
546 goto skip;
547
548 /* Skip flags */
549 idx += 4;
550
551 /* stream-id and frame-id must be cleared */
552 if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
553 w->status_code = SPOE_FRM_ERR_INVALID;
554 goto error;
555 }
556 idx += 2;
557
558 /* Loop on K/V items */
559 while (idx < w->len) {
560 char *str;
561 uint64_t sz;
562
563 /* Decode the item name */
564 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
565 if (str == NULL) {
566 w->status_code = SPOE_FRM_ERR_INVALID;
567 goto error;
568 }
569
570 /* Check "supported-versions" K/V item */
571 if (!memcmp(str, "supported-versions", sz)) {
572 if ((i = check_proto_version(w, idx)) == -1)
573 goto error;
574 idx = i;
575 }
576 /* Check "max-frame-size" K/V item "*/
577 else if (!memcmp(str, "max-frame-size", sz)) {
578 if ((i = check_max_frame_size(w, idx)) == -1)
579 goto error;
580 idx = i;
581 }
582 /* Check "healthcheck" K/V item "*/
583 else if (!memcmp(str, "healthcheck", sz)) {
584 if ((i = check_healthcheck(w, idx)) == -1)
585 goto error;
586 idx = i;
587 }
588 /* Skip "capabilities" K/V item for now */
589 else {
590 /* Silently ignore unknown item */
591 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
592 w->status_code = SPOE_FRM_ERR_INVALID;
593 goto error;
594 }
595 idx += i;
596 }
597 }
598
599 return idx;
600skip:
601 return 0;
602error:
603 return -1;
604}
605
606/* Decode a DISCONNECT frame received from HAProxy. It returns -1 if an error
607 * occurred, 0 if the frame must be skipped, otherwise the number of read
608 * bytes. */
609static int
610handle_hadiscon(struct worker *w)
611{
612 char *end = w->buf+w->len;
613 int i, idx = 0;
614
615 /* Check frame type */
616 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_DISCON)
617 goto skip;
618
619 /* Skip flags */
620 idx += 4;
621
622 /* stream-id and frame-id must be cleared */
623 if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
624 w->status_code = SPOE_FRM_ERR_INVALID;
625 goto error;
626 }
627 idx += 2;
628
629 /* Loop on K/V items */
630 while (idx < w->len) {
631 char *str;
632 uint64_t sz;
633
634 /* Decode item key */
635 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
636 if (str == NULL) {
637 w->status_code = SPOE_FRM_ERR_INVALID;
638 goto error;
639 }
640 /* Silently ignore unknown item */
641 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
642 w->status_code = SPOE_FRM_ERR_INVALID;
643 goto error;
644 }
645 idx += i;
646 }
647
648 w->status_code = SPOE_FRM_ERR_NONE;
649 return idx;
650skip:
651 return 0;
652error:
653 return -1;
654}
655
656/* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
657 * occurred, 0 if the frame must be skipped, otherwise the number of read
658 * bytes. */
659static int
660handle_hanotify(struct worker *w)
661{
662 char *end = w->buf+w->len;
663 uint64_t stream_id, frame_id;
664 int nbargs, i, idx = 0;
665
666 /* Check frame type */
667 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_NOTIFY)
668 goto skip;
669
670 /* Skip flags */
671 idx += 4;
672
673 /* Read the stream-id */
674 if ((i = decode_spoe_varint(w->buf+idx, end, &stream_id)) == -1) {
675 w->status_code = SPOE_FRM_ERR_INVALID;
676 goto error;
677 }
678 idx += i;
679
680 /* Read the frame-id */
681 if ((i = decode_spoe_varint(w->buf+idx, end, &frame_id)) == -1) {
682 w->status_code = SPOE_FRM_ERR_INVALID;
683 goto error;
684 }
685 idx += i;
686
687 w->stream_id = (unsigned int)stream_id;
688 w->frame_id = (unsigned int)frame_id;
689
690 DEBUG("Notify frame received: stream-id=%u - frame-id=%u",
691 w->stream_id, w->frame_id);
692
693 /* Loop on messages */
694 while (idx < w->len) {
695 char *str;
696 uint64_t sz;
697
698 /* Decode the message name */
699 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
700 if (str == NULL) {
701 w->status_code = SPOE_FRM_ERR_INVALID;
702 goto error;
703 }
704 DEBUG(" Message '%.*s' received", (int)sz, str);
705
706 nbargs = w->buf[idx++];
707 if (!memcmp(str, "check-client-ip", sz)) {
708 struct spoe_data data;
709
710 memset(&data, 0, sizeof(data));
711
712 if (nbargs != 1) {
713 w->status_code = SPOE_FRM_ERR_INVALID;
714 goto error;
715 }
716 if ((i = decode_spoe_string(w->buf+idx, end, &str, &sz)) == -1) {
717 w->status_code = SPOE_FRM_ERR_INVALID;
718 goto error;
719 }
720 idx += i;
721 if ((i = decode_spoe_data(w->buf+idx, end, &data)) == -1) {
722 w->status_code = SPOE_FRM_ERR_INVALID;
723 goto error;
724 }
725 idx += i;
726 if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV4)
727 check_ipv4_reputation(w, &data.u.ipv4);
728 else if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV6)
729 check_ipv6_reputation(w, &data.u.ipv6);
730 else {
731 w->status_code = SPOE_FRM_ERR_INVALID;
732 goto error;
733 }
734 }
735 else {
736 while (nbargs-- > 0) {
737 /* Silently ignore argument: its name and its value */
738 if ((i = decode_spoe_string(w->buf+idx, end, &str, &sz)) == -1) {
739 w->status_code = SPOE_FRM_ERR_INVALID;
740 goto error;
741 }
742 idx += i;
743 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
744 w->status_code = SPOE_FRM_ERR_INVALID;
745 goto error;
746 }
747 idx += i;
748 }
749 }
750 }
751
752 return idx;
753skip:
754 return 0;
755error:
756 return -1;
757}
758
759/* Encode a HELLO frame to send it to HAProxy. It returns -1 if an error
760 * occurred, the number of written bytes otherwise. */
761static int
762prepare_agenthello(struct worker *w)
763{
764 int idx = 0;
765
766 /* Frame Type */
767 w->buf[idx++] = SPOE_FRM_T_AGENT_HELLO;
768
769 /* No flags for now */
770 memset(w->buf+idx, 0, 4); /* No flags */
771 idx += 4;
772
773 /* No stream-id and frame-id for HELLO frames */
774 w->buf[idx++] = 0;
775 w->buf[idx++] = 0;
776
777 /* "version" K/V item */
778 idx += encode_spoe_string("version", 7, w->buf+idx);
779 w->buf[idx++] = SPOE_DATA_T_STR;
780 idx += encode_spoe_string(SPOP_VERSION, SLEN(SPOP_VERSION), w->buf+idx);
781
782 /* "max-frame-size" K/V item */
783 idx += encode_spoe_string("max-frame-size", 14, w->buf+idx);
784 w->buf[idx++] = SPOE_DATA_T_UINT32;
785 idx += encode_spoe_varint(w->size, w->buf+idx);
786
787 /* "capabilities" K/V item */
788 idx += encode_spoe_string("capabilities", 12, w->buf+idx);
789 w->buf[idx++] = SPOE_DATA_T_STR;
790 idx += encode_spoe_string(SPOA_CAPABILITIES, SLEN(SPOA_CAPABILITIES), w->buf+idx);
791
792 w->len = idx;
793 return idx;
794}
795
796/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred,
797 * the number of written bytes otherwise. */
798static int
799prepare_agentack(struct worker *w)
800{
801 int idx = 0;
802
803 /* Frame type */
804 w->buf[idx++] = SPOE_FRM_T_AGENT_ACK;
805
806 /* No flags for now */
807 memset(w->buf+idx, 0, 4); /* No flags */
808 idx += 4;
809
810 /* Set stream-id and frame-id for ACK frames */
811 idx += encode_spoe_varint(w->stream_id, w->buf+idx);
812 idx += encode_spoe_varint(w->frame_id, w->buf+idx);
813
814 /* Data */
815 if (w->ip_score == -1)
816 goto out;
817
818 w->buf[idx++] = SPOE_ACT_T_SET_VAR; /* Action type */
819 w->buf[idx++] = 3; /* Number of args */
820 w->buf[idx++] = SPOE_SCOPE_SESS; /* Arg 1: the scope */
821 idx += encode_spoe_string("ip_score", 8, w->buf+idx); /* Arg 2: variable name */
822 w->buf[idx++] = SPOE_DATA_T_UINT32;
823 idx += encode_spoe_varint(w->ip_score, w->buf+idx); /* Arg 3: variable value */
824out:
825 w->len = idx;
826 return idx;
827}
828
829/* Encode a DISCONNECT frame to send it to HAProxy. It returns -1 if an error
830 * occurred, the number of written bytes otherwise. */
831static int
832prepare_agentdicon(struct worker *w)
833{
834 const char *reason;
835 int rlen, idx = 0;
836
837 if (w->status_code >= SPOE_FRM_ERRS)
838 w->status_code = SPOE_FRM_ERR_UNKNOWN;
839 reason = spoe_frm_err_reasons[w->status_code];
840 rlen = strlen(reason);
841
842 /* Frame type */
843 w->buf[idx++] = SPOE_FRM_T_AGENT_DISCON;
844
845 /* No flags for now */
846 memset(w->buf+idx, 0, 4);
847 idx += 4;
848
849 /* No stream-id and frame-id for DISCONNECT frames */
850 w->buf[idx++] = 0;
851 w->buf[idx++] = 0;
852
853 /* There are 2 mandatory items: "status-code" and "message" */
854
855 /* "status-code" K/V item */
856 idx += encode_spoe_string("status-code", 11, w->buf+idx);
857 w->buf[idx++] = SPOE_DATA_T_UINT32;
858 idx += encode_spoe_varint(w->status_code, w->buf+idx);
859
860 /* "message" K/V item */
861 idx += encode_spoe_string("message", 7, w->buf+idx);
862 w->buf[idx++] = SPOE_DATA_T_STR;
863 idx += encode_spoe_string(reason, rlen, w->buf+idx);
864
865 w->len = idx;
866 return idx;
867}
868
869static int
870hello_handshake(int sock, struct worker *w)
871{
872 if (read_frame(sock, w) < 0) {
873 LOG("Failed to read Haproxy HELLO frame");
874 goto error;
875 }
876 if (handle_hahello(w) < 0) {
877 LOG("Failed to handle Haproxy HELLO frame");
878 goto error;
879 }
880 if (prepare_agenthello(w) < 0) {
881 LOG("Failed to prepare Agent HELLO frame");
882 goto error;
883 }
884 if (write_frame(sock, w) < 0) {
885 LOG("Failed to write Agent frame");
886 goto error;
887 }
888 DEBUG("Hello handshake done: version=%s - max-frame-size=%u - healthcheck=%s",
889 SPOP_VERSION, w->size, (w->healthcheck ? "true" : "false"));
890 return 0;
891error:
892 return -1;
893}
894
895static int
896notify_ack_roundtip(int sock, struct worker *w)
897{
898 if (read_frame(sock, w) < 0) {
899 LOG("Failed to read Haproxy NOTIFY frame");
900 goto error_or_quit;
901 }
902 if (handle_hadiscon(w) != 0) {
903 if (w->status_code != SPOE_FRM_ERR_NONE)
904 LOG("Failed to handle Haproxy DISCONNECT frame");
905 DEBUG("Disconnect frame received: reason=%s",
906 spoe_frm_err_reasons[w->status_code]);
907 goto error_or_quit;
908 }
909 if (handle_hanotify(w) < 0) {
910 LOG("Failed to handle Haproxy NOTIFY frame");
911 goto error_or_quit;
912 }
913 if (prepare_agentack(w) < 0) {
914 LOG("Failed to prepare Agent ACK frame");
915 goto error_or_quit;
916 }
917 if (write_frame(sock, w) < 0) {
918 LOG("Failed to write Agent ACK frame");
919 goto error_or_quit;
920 }
921 DEBUG("Ack frame sent: stream-id=%u - frame-id=%u",
922 w->stream_id, w->frame_id);
923 return 0;
924error_or_quit:
925 return -1;
926}
927
928static void *
Thierry FOURNIER5301ed12018-02-23 11:59:15 +0100929spoa_worker(void *data)
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100930{
931 struct worker w;
932 struct sockaddr_in client;
933 int *info = (int *)data;
934 int csock, lsock = info[0];
Thierry FOURNIER64eaa332018-02-23 14:58:40 +0100935 struct ps *ps;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100936
937 signal(SIGPIPE, SIG_IGN);
938 pthread_setspecific(worker_id, &info[1]);
939
Thierry FOURNIER64eaa332018-02-23 14:58:40 +0100940 /* Init registered processors */
941 for (ps = ps_list; ps != NULL; ps = ps->next)
942 ps->init_worker(&w);
943
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100944 while (1) {
945 socklen_t sz = sizeof(client);
946
947 if ((csock = accept(lsock, (struct sockaddr *)&client, &sz)) < 0) {
948 LOG("Failed to accept client connection: %m");
949 goto out;
950 }
951 memset(&w, 0, sizeof(w));
952 w.id = info[1];
953 w.size = MAX_FRAME_SIZE;
954
955 DEBUG("New connection from HAProxy accepted");
956
957 if (hello_handshake(csock, &w) < 0)
958 goto disconnect;
959 if (w.healthcheck == true)
960 goto close;
961 while (1) {
962 w.ip_score = -1;
963 if (notify_ack_roundtip(csock, &w) < 0)
964 break;
965 }
966
967 disconnect:
968 if (w.status_code == SPOE_FRM_ERR_IO) {
969 LOG("Close the client socket because of I/O errors");
970 goto close;
971 }
972 if (prepare_agentdicon(&w) < 0) {
973 LOG("Failed to prepare Agent DISCONNECT frame");
974 goto close;
975 }
976 if (write_frame(csock, &w) < 0) {
977 LOG("Failed to write Agent DISCONNECT frame");
978 goto close;
979 }
980 DEBUG("Disconnect frame sent: reason=%s",
981 spoe_frm_err_reasons[w.status_code]);
982
983 close:
984 close(csock);
985 }
986
987out:
988 free(info);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +0100989#if 0
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100990 pthread_exit(NULL);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +0100991#endif
992 return NULL;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100993}
994
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +0100995int process_create(pid_t *pid, void *(*ps)(void *), void *data)
996{
Thierry FOURNIER786e9e62018-02-23 19:11:47 +0100997 if (debug) {
998 ps(data);
999 exit(EXIT_SUCCESS);
1000 }
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001001 *pid = fork();
1002 if (*pid == -1)
1003 return -1;
1004 if (*pid > 0)
1005 return 0;
1006 ps(data);
1007 return 0;
1008}
1009
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001010static void
1011usage(char *prog)
1012{
1013 fprintf(stderr, "Usage: %s [-h] [-d] [-p <port>] [-n <num-workers>]\n", prog);
1014 fprintf(stderr, " -h Print this message\n");
1015 fprintf(stderr, " -d Enable the debug mode\n");
1016 fprintf(stderr, " -p <port> Specify the port to listen on (default: 12345)\n");
1017 fprintf(stderr, " -n <num-workers> Specify the number of workers (default: 5)\n");
1018}
1019
1020int
1021main(int argc, char **argv)
1022{
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001023#if 0
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001024 pthread_t *ts = NULL;
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001025#endif
1026 pid_t *pids;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001027 struct sockaddr_in server;
1028 int i, sock, opt, nbworkers, port;
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001029 int status;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001030
1031 nbworkers = NUM_WORKERS;
1032 port = DEFAULT_PORT;
1033 while ((opt = getopt(argc, argv, "hdn:p:")) != -1) {
1034 switch (opt) {
1035 case 'h':
1036 usage(argv[0]);
1037 return EXIT_SUCCESS;
1038 case 'd':
1039 debug = true;
1040 break;
1041 case 'n':
1042 nbworkers = atoi(optarg);
1043 break;
1044 case 'p':
1045 port = atoi(optarg);
1046 break;
1047 default:
1048 usage(argv[0]);
1049 return EXIT_FAILURE;
1050 }
1051 }
1052
1053 if (nbworkers <= 0) {
1054 fprintf(stderr, "%s: Invalid number of workers '%d'\n",
1055 argv[0], nbworkers);
1056 goto error;
1057 }
1058 if (port <= 0) {
1059 fprintf(stderr, "%s: Invalid port '%d'\n", argv[0], port);
1060 goto error;
1061 }
1062
1063 if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1064 fprintf(stderr, "Failed creating socket: %m\n");
1065 goto error;
1066 }
1067
1068 setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (int []){1}, sizeof(int));
1069 setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (int []){1}, sizeof(int));
1070
1071 memset(&server, 0, sizeof(server));
1072 server.sin_family = AF_INET;
1073 server.sin_addr.s_addr = INADDR_ANY;
1074 server.sin_port = htons(port);
1075
1076 if (bind(sock, (struct sockaddr *)&server, sizeof(server)) < 0) {
1077 fprintf(stderr, "Failed to bind the socket: %m\n");
1078 goto error;
1079 }
1080
1081 if (listen(sock , 10) < 0) {
1082 fprintf(stderr, "Failed to listen on the socket: %m\n");
1083 goto error;
1084 }
1085 fprintf(stderr, "SPOA is listening on port %d\n", port);
1086
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001087 pthread_key_create(&worker_id, NULL);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001088
1089 /* Initialise the server in thread mode. This code is commented
1090 * out and not deleted, because later I expect to work with
1091 * process ansd threads. This first version just support processes.
1092 */
1093#if 0
1094 ts = calloc(nbworkers, sizeof(*ts));
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001095 for (i = 0; i < nbworkers; i++) {
1096 int *info = calloc(2, sizeof(*info));
1097
1098 info[0] = sock;
1099 info[1] = i+1;
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001100
Thierry FOURNIER5301ed12018-02-23 11:59:15 +01001101 if (pthread_create(&ts[i], NULL, spoa_worker, info) < 0) {
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001102 fprintf(stderr, "Failed to create thread %d: %m\n", i+1);
1103 goto error;
1104 }
1105 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1106 }
1107
1108 for (i = 0; i < nbworkers; i++) {
1109 pthread_join(ts[i], NULL);
1110 fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
1111 }
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001112 free(ts);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001113#endif
1114
1115 /* Start processes */
1116 pids = calloc(nbworkers, sizeof(*pids));
1117 if (!pids) {
1118 fprintf(stderr, "Out of memory error\n");
1119 goto error;
1120 }
1121 for (i = 0; i < nbworkers; i++) {
1122 int *info = calloc(2, sizeof(*info));
1123
1124 info[0] = sock;
1125 info[1] = i+1;
1126
1127 if (process_create(&pids[i], spoa_worker, info) == -1) {
1128 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1129 goto error;
1130 }
1131 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1132 }
1133 for (i = 0; i < nbworkers; i++) {
1134 waitpid(pids[0], &status, 0);
1135 fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
1136 }
1137
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001138 close(sock);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001139 pthread_key_delete(worker_id);
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001140 return EXIT_SUCCESS;
1141error:
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001142 return EXIT_FAILURE;
1143}