blob: ce59c04a08a54d060fa0ed96bc20ae9198a4ef60 [file] [log] [blame]
Christopher Faulet010fded2016-11-03 22:49:37 +01001/*
2 * A Random IP reputation service acting as a Stream Processing Offload Agent
3 *
4 * This is a very simple service that implement a "random" ip reputation
5 * service. It will return random scores for all checked IP addresses. It only
6 * shows you how to implement a ip reputation service or such kind of services
7 * using the SPOE.
8 *
9 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
10 *
11 * This program is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU General Public License
13 * as published by the Free Software Foundation; either version
14 * 2 of the License, or (at your option) any later version.
15 *
16 */
17#include <stdio.h>
18#include <stdlib.h>
19#include <string.h>
20#include <stdbool.h>
21#include <unistd.h>
22#include <signal.h>
23#include <pthread.h>
24#include <sys/time.h>
25#include <sys/types.h>
26#include <sys/socket.h>
27#include <netinet/in.h>
28#include <netinet/tcp.h>
29#include <arpa/inet.h>
30
31#define DEFAULT_PORT 12345
32#define NUM_WORKERS 5
33#define MAX_FRAME_SIZE 16384
34#define SPOP_VERSION "1.0"
35#define SPOA_CAPABILITIES ""
36
37#define SLEN(str) (sizeof(str)-1)
38
39#define LOG(fmt, args...) \
40 do { \
41 struct timeval now; \
42 int wid = *((int*)pthread_getspecific(worker_id)); \
43 \
44 gettimeofday(&now, NULL); \
45 fprintf(stderr, "%ld.%06ld [%02d] " fmt "\n", \
46 now.tv_sec, now.tv_usec, wid, ##args); \
47 } while (0)
48
49#define DEBUG(x...) \
50 do { \
51 if (debug) \
52 LOG(x); \
53 } while (0)
54
55/* Frame Types sent by HAProxy and by agents */
56enum spoe_frame_type {
57 /* Frames sent by HAProxy */
58 SPOE_FRM_T_HAPROXY_HELLO = 1,
59 SPOE_FRM_T_HAPROXY_DISCON,
60 SPOE_FRM_T_HAPROXY_NOTIFY,
61
62 /* Frames sent by the agents */
63 SPOE_FRM_T_AGENT_HELLO = 101,
64 SPOE_FRM_T_AGENT_DISCON,
65 SPOE_FRM_T_AGENT_ACK
66};
67
68/* All supported data types */
69enum spoe_data_type {
70 SPOE_DATA_T_NULL = 0,
71 SPOE_DATA_T_BOOL,
72 SPOE_DATA_T_INT32,
73 SPOE_DATA_T_UINT32,
74 SPOE_DATA_T_INT64,
75 SPOE_DATA_T_UINT64,
76 SPOE_DATA_T_IPV4,
77 SPOE_DATA_T_IPV6,
78 SPOE_DATA_T_STR,
79 SPOE_DATA_T_BIN,
80 SPOE_DATA_TYPES
81};
82
83/* Errors triggerd by SPOE applet */
84enum spoe_frame_error {
85 SPOE_FRM_ERR_NONE = 0,
86 SPOE_FRM_ERR_IO,
87 SPOE_FRM_ERR_TOUT,
88 SPOE_FRM_ERR_TOO_BIG,
89 SPOE_FRM_ERR_INVALID,
90 SPOE_FRM_ERR_NO_VSN,
91 SPOE_FRM_ERR_NO_FRAME_SIZE,
92 SPOE_FRM_ERR_NO_CAP,
93 SPOE_FRM_ERR_BAD_VSN,
94 SPOE_FRM_ERR_BAD_FRAME_SIZE,
95 SPOE_FRM_ERR_UNKNOWN = 99,
96 SPOE_FRM_ERRS,
97};
98
99/* All supported SPOE actions */
100enum spoe_action_type {
101 SPOE_ACT_T_SET_VAR = 1,
102 SPOE_ACT_T_UNSET_VAR,
103 SPOE_ACT_TYPES,
104};
105
106/* Scopes used for variables set by agents. It is a way to be agnotic to vars
107 * scope. */
108enum spoe_vars_scope {
109 SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC */
110 SPOE_SCOPE_SESS, /* <=> SCOPE_SESS */
111 SPOE_SCOPE_TXN, /* <=> SCOPE_TXN */
112 SPOE_SCOPE_REQ, /* <=> SCOPE_REQ */
113 SPOE_SCOPE_RES, /* <=> SCOPE_RES */
114};
115
116
117/* Masks to get data type or flags value */
118#define SPOE_DATA_T_MASK 0x0F
119#define SPOE_DATA_FL_MASK 0xF0
120
121/* Flags to set Boolean values */
122#define SPOE_DATA_FL_FALSE 0x00
123#define SPOE_DATA_FL_TRUE 0x10
124static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
125 [SPOE_FRM_ERR_NONE] = "normal",
126 [SPOE_FRM_ERR_IO] = "I/O error",
127 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
128 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
129 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
130 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
131 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
132 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
133 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
134 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
135 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
136};
137
138struct worker {
139 unsigned int id;
140 char buf[MAX_FRAME_SIZE];
141 unsigned int len;
142 unsigned int size;
143 int status_code;
144 unsigned int stream_id;
145 unsigned int frame_id;
Christopher Fauletba7bc162016-11-07 21:07:38 +0100146 bool healthcheck;
Christopher Faulet010fded2016-11-03 22:49:37 +0100147 int ip_score; /* -1 if unset, else between 0 and 100 */
148};
149
150struct chunk {
151 char *str; /* beginning of the string itself. Might not be 0-terminated */
152 int len; /* current size of the string from first to last char */
153};
154
155union spoe_value {
156 bool boolean; /* use for boolean */
157 int32_t sint32; /* used for signed 32bits integers */
158 uint32_t uint32; /* used for signed 32bits integers */
159 int32_t sint64; /* used for signed 64bits integers */
160 uint32_t uint64; /* used for signed 64bits integers */
161 struct in_addr ipv4; /* used for ipv4 addresses */
162 struct in6_addr ipv6; /* used for ipv6 addresses */
163 struct chunk buffer; /* used for char strings or buffers */
164};
165
166/* Used to store sample constant */
167struct spoe_data {
168 enum spoe_data_type type; /* SPOE_DATA_T_* */
169 union spoe_value u; /* spoe data value */
170};
171
172static bool debug = false;
173static pthread_key_t worker_id;
174
175static void
176check_ipv4_reputation(struct worker *w, struct in_addr *ipv4)
177{
178 char str[INET_ADDRSTRLEN];
179
180 if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL)
181 return;
182
183 w->ip_score = random() % 100;
184
185 DEBUG(" IP score for %.*s is: %d", INET_ADDRSTRLEN, str, w->ip_score);
186}
187
188static void
189check_ipv6_reputation(struct worker *w, struct in6_addr *ipv6)
190{
191 char str[INET6_ADDRSTRLEN];
192
193 if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL)
194 return;
195
196 w->ip_score = random() % 100;
197
198 DEBUG(" IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, w->ip_score);
199}
200
201static int
202do_read(int sock, void *buf, int read_len)
203{
204 fd_set readfds;
205 int n = 0, total = 0, bytesleft = read_len;
206
207 FD_ZERO(&readfds);
208 FD_SET(sock, &readfds);
209
210 while (total < read_len) {
211 if (select(FD_SETSIZE, &readfds, NULL, NULL, NULL) == -1)
212 return -1;
213 if (!FD_ISSET(sock, &readfds))
214 return -1;
215
216 n = read(sock, buf + total, bytesleft);
217 if (n <= 0)
218 break;
219
220 total += n;
221 bytesleft -= n;
222 }
223
224 return (n == -1) ? -1 : total;
225}
226
227static int
228do_write(int sock, void *buf, int write_len)
229{
230 fd_set writefds;
231 int n = 0, total = 0, bytesleft = write_len;
232
233 FD_ZERO(&writefds);
234 FD_SET(sock, &writefds);
235
236 while (total < write_len) {
237 if (select(FD_SETSIZE, NULL, &writefds, NULL, NULL) == -1)
238 return -1;
239 if (!FD_ISSET(sock, &writefds))
240 return -1;
241
242 n = write(sock, buf + total, bytesleft);
243 if (n <= 0)
244 break;
245
246 total += n;
247 bytesleft -= n;
248 }
249
250 return (n == -1) ? -1 : total;
251}
252
253/* Receive a frame sent by HAProxy. It returns -1 if an error occurred,
254 * otherwise the number of read bytes.*/
255static int
256read_frame(int sock, struct worker *w)
257{
258 uint32_t netint;
259 unsigned int framesz;
260
261 /* Read the frame size, on 4 bytes */
262 if (do_read(sock, &netint, sizeof(netint)) != 4) {
263 w->status_code = SPOE_FRM_ERR_IO;
264 return -1;
265 }
266
267 /* Check it against the max size */
268 framesz = ntohl(netint);
269 if (framesz > w->size) {
270 w->status_code = SPOE_FRM_ERR_TOO_BIG;
271 return -1;
272 }
273
274 /* Read the frame */
275 if (do_read(sock, w->buf, framesz) != framesz) {
276 w->status_code = SPOE_FRM_ERR_IO;
277 return -1;
278 }
279
280 w->len = framesz;
281 return framesz;
282}
283
284/* Send a frame to HAProxy. It returns -1 if an error occurred, otherwise the
285 * number of written bytes. */
286static int
287write_frame(int sock, struct worker *w)
288{
289 uint32_t netint;
290
291 /* Write the frame size, on 4 bytes */
292 netint = htonl(w->len);
293 if (do_write(sock, &netint, sizeof(netint)) != 4) {
294 w->status_code = SPOE_FRM_ERR_IO;
295 return -1;
296 }
297
298 /* Write the frame */
299 if (do_write(sock, w->buf, w->len) != w->len) {
300 w->status_code = SPOE_FRM_ERR_IO;
301 return -1;
302 }
303 return w->len;
304}
305
306/* Encode a variable-length integer. This function never fails and returns the
307 * number of written bytes. */
308static int
309encode_spoe_varint(uint64_t i, char *buf)
310{
311 int idx;
312
313 if (i < 240) {
314 buf[0] = (unsigned char)i;
315 return 1;
316 }
317
318 buf[0] = (unsigned char)i | 240;
319 i = (i - 240) >> 4;
320 for (idx = 1; i >= 128; ++idx) {
321 buf[idx] = (unsigned char)i | 128;
322 i = (i - 128) >> 7;
323 }
324 buf[idx++] = (unsigned char)i;
325 return idx;
326}
327
328/* Decode a varable-length integer. If the decoding fails, -1 is returned. This
329 * happens when the buffer's end in reached. On success, the number of read
330 * bytes is returned. */
331static int
332decode_spoe_varint(char *buf, char *end, uint64_t *i)
333{
334 unsigned char *msg = (unsigned char *)buf;
335 int idx = 0;
336
337 if (msg > (unsigned char *)end)
338 return -1;
339
340 if (msg[0] < 240) {
341 *i = msg[0];
342 return 1;
343 }
344 *i = msg[0];
345 do {
346 ++idx;
347 if (msg+idx > (unsigned char *)end)
348 return -1;
349 *i += (uint64_t)msg[idx] << (4 + 7 * (idx-1));
350 } while (msg[idx] >= 128);
351 return (idx + 1);
352}
353
354/* Encode a string. The string will be prefix by its length, encoded as a
355 * variable-length integer. This function never fails and returns the number of
356 * written bytes. */
357static int
358encode_spoe_string(const char *str, size_t len, char *dst)
359{
360 int idx = 0;
361
362 if (!len) {
363 dst[0] = 0;
364 return 1;
365 }
366
367 idx += encode_spoe_varint(len, dst);
368 memcpy(dst+idx, str, len);
369 return (idx + len);
370}
371
372/* Decode a string. Its length is decoded first as a variable-length integer. If
373 * it succeeds, and if the string length is valid, the begin of the string is
374 * saved in <*str>, its length is saved in <*len> and the total numbre of bytes
375 * read is returned. If an error occurred, -1 is returned and <*str> remains
376 * NULL. */
377static int
378decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
379{
380 int r, idx = 0;
381
382 *str = NULL;
383 *len = 0;
384
385 if ((r = decode_spoe_varint(buf, end, len)) == -1)
386 goto error;
387 idx += r;
388 if (buf + idx + *len > end)
389 goto error;
390
391 *str = buf+idx;
392 return (idx + *len);
393
394error:
395 return -1;
396}
397
398/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
399 * of bytes read is returned. A types data is composed of a type (1 byte) and
400 * corresponding data:
401 * - boolean: non additional data (0 bytes)
402 * - integers: a variable-length integer (see decode_spoe_varint)
403 * - ipv4: 4 bytes
404 * - ipv6: 16 bytes
405 * - binary and string: a buffer prefixed by its size, a variable-length
406 * integer (see decode_spoe_string) */
407static int
408skip_spoe_data(char *frame, char *end)
409{
410 uint64_t sz = 0;
411 int r, idx = 0;
412
413 if (frame > end)
414 return -1;
415
416 switch (frame[idx++] & SPOE_DATA_T_MASK) {
417 case SPOE_DATA_T_BOOL:
418 idx++;
419 break;
420 case SPOE_DATA_T_INT32:
421 case SPOE_DATA_T_INT64:
422 case SPOE_DATA_T_UINT32:
423 case SPOE_DATA_T_UINT64:
424 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
425 return -1;
426 idx += r;
427 break;
428 case SPOE_DATA_T_IPV4:
429 idx += 4;
430 break;
431 case SPOE_DATA_T_IPV6:
432 idx += 16;
433 break;
434 case SPOE_DATA_T_STR:
435 case SPOE_DATA_T_BIN:
436 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
437 return -1;
438 idx += r + sz;
439 break;
440 }
441
442 if (frame+idx > end)
443 return -1;
444 return idx;
445}
446
447/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
448 * number of read bytes is returned. See skip_spoe_data for details. */
449static int
450decode_spoe_data(char *frame, char *end, struct spoe_data *data)
451{
452 uint64_t sz = 0;
453 int type, r, idx = 0;
454
455 if (frame > end)
456 return -1;
457
458 type = frame[idx++];
459 data->type = (type & SPOE_DATA_T_MASK);
460 switch (data->type) {
461 case SPOE_DATA_T_BOOL:
462 data->u.boolean = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
463 break;
464 case SPOE_DATA_T_INT32:
465 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
466 return -1;
467 data->u.sint32 = sz;
468 idx += r;
469 break;
470 case SPOE_DATA_T_INT64:
471 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
472 return -1;
473 data->u.uint32 = sz;
474 idx += r;
475 break;
476 case SPOE_DATA_T_UINT32:
477 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
478 return -1;
479 data->u.sint64 = sz;
480 idx += r;
481 break;
482 case SPOE_DATA_T_UINT64:
483 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
484 return -1;
485 data->u.uint64 = sz;
486 idx += r;
487 break;
488 case SPOE_DATA_T_IPV4:
489 if (frame+idx+4 > end)
490 return -1;
491 memcpy(&data->u.ipv4, frame+idx, 4);
492 idx += 4;
493 break;
494 case SPOE_DATA_T_IPV6:
495 if (frame+idx+16 > end)
496 return -1;
497 memcpy(&data->u.ipv6, frame+idx, 16);
498 idx += 16;
499 break;
500 case SPOE_DATA_T_STR:
501 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
502 return -1;
503 idx += r;
504 if (frame+idx+sz > end)
505 return -1;
506 data->u.buffer.str = frame+idx;
507 data->u.buffer.len = sz;
508 idx += sz;
509 break;
510 case SPOE_DATA_T_BIN:
511 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
512 return -1;
513 idx += r;
514 if (frame+idx+sz > end)
515 return -1;
516 data->u.buffer.str = frame+idx;
517 data->u.buffer.len = sz;
518 idx += sz;
519 break;
520 default:
521 break;
522 }
523
524 if (frame+idx > end)
525 return -1;
526 return idx;
527}
528
529
530/* Check the protocol version. It returns -1 if an error occurred, the number of
531 * read bytes otherwise. */
532static int
533check_proto_version(struct worker *w, int idx)
534{
535 char *str;
536 uint64_t sz;
537
538 /* Get the list of all supported versions by HAProxy */
539 if ((w->buf[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
540 w->status_code = SPOE_FRM_ERR_INVALID;
541 return -1;
542 }
543 idx += decode_spoe_string(w->buf+idx, w->buf+w->len, &str, &sz);
544 if (str == NULL) {
545 w->status_code = SPOE_FRM_ERR_INVALID;
546 return -1;
547 }
548
549 /* TODO: Find the right verion in supported ones */
550
551 return idx;
552}
553
554/* Check max frame size value. It returns -1 if an error occurred, the number of
555 * read bytes otherwise. */
556static int
557check_max_frame_size(struct worker *w, int idx)
558{
559 uint64_t sz;
560 int type, i;
561
562 /* Get the max-frame-size value of HAProxy */
563 type = w->buf[idx++];
564 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
565 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
566 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
567 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
568 w->status_code = SPOE_FRM_ERR_INVALID;
569 return -1;
570 }
571 if ((i = decode_spoe_varint(w->buf+idx, w->buf+w->len, &sz)) == -1) {
572 w->status_code = SPOE_FRM_ERR_INVALID;
573 return -1;
574 }
575 idx += i;
576
577 /* Keep the lower value */
578 if (sz < w->size)
579 w->size = sz;
580
581 return idx;
582}
583
Christopher Fauletba7bc162016-11-07 21:07:38 +0100584/* Check healthcheck value. It returns -1 if an error occurred, the number of
585 * read bytes otherwise. */
586static int
587check_healthcheck(struct worker *w, int idx)
588{
589 int type;
590
591 /* Get the "healthcheck" value of HAProxy */
592 type = w->buf[idx++];
593 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_BOOL) {
594 w->status_code = SPOE_FRM_ERR_INVALID;
595 return -1;
596 }
597 w->healthcheck = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
598 return idx;
599}
600
601
Christopher Faulet010fded2016-11-03 22:49:37 +0100602/* Decode a HELLO frame received from HAProxy. It returns -1 if an error
603 * occurred, 0 if the frame must be skipped, otherwise the number of read
604 * bytes. */
605static int
606handle_hahello(struct worker *w)
607{
608 char *end = w->buf+w->len;
609 int i, idx = 0;
610
611 /* Check frame type */
612 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_HELLO)
613 goto skip;
614
615 /* Skip flags */
616 idx += 4;
617
618 /* stream-id and frame-id must be cleared */
619 if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
620 w->status_code = SPOE_FRM_ERR_INVALID;
621 goto error;
622 }
623 idx += 2;
624
625 /* Loop on K/V items */
626 while (idx < w->len) {
627 char *str;
628 uint64_t sz;
629
630 /* Decode the item name */
631 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
632 if (str == NULL) {
633 w->status_code = SPOE_FRM_ERR_INVALID;
634 goto error;
635 }
636
637 /* Check "supported-versions" K/V item */
638 if (!memcmp(str, "supported-versions", sz)) {
639 if ((i = check_proto_version(w, idx)) == -1)
640 goto error;
641 idx = i;
642 }
643 /* Check "max-frame-size" K/V item "*/
644 else if (!memcmp(str, "max-frame-size", sz)) {
645 if ((i = check_max_frame_size(w, idx)) == -1)
646 goto error;
647 idx = i;
648 }
Christopher Fauletba7bc162016-11-07 21:07:38 +0100649 /* Check "healthcheck" K/V item "*/
650 else if (!memcmp(str, "healthcheck", sz)) {
651 if ((i = check_healthcheck(w, idx)) == -1)
652 goto error;
653 idx = i;
654 }
Christopher Faulet010fded2016-11-03 22:49:37 +0100655 /* Skip "capabilities" K/V item for now */
656 else {
657 /* Silently ignore unknown item */
658 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
659 w->status_code = SPOE_FRM_ERR_INVALID;
660 goto error;
661 }
662 idx += i;
663 }
664 }
665
666 return idx;
667skip:
668 return 0;
669error:
670 return -1;
671}
672
673/* Decode a DISCONNECT frame received from HAProxy. It returns -1 if an error
674 * occurred, 0 if the frame must be skipped, otherwise the number of read
675 * bytes. */
676static int
677handle_hadiscon(struct worker *w)
678{
679 char *end = w->buf+w->len;
680 int i, idx = 0;
681
682 /* Check frame type */
683 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_DISCON)
684 goto skip;
685
686 /* Skip flags */
687 idx += 4;
688
689 /* stream-id and frame-id must be cleared */
690 if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
691 w->status_code = SPOE_FRM_ERR_INVALID;
692 goto error;
693 }
694 idx += 2;
695
696 /* Loop on K/V items */
697 while (idx < w->len) {
698 char *str;
699 uint64_t sz;
700
701 /* Decode item key */
702 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
703 if (str == NULL) {
704 w->status_code = SPOE_FRM_ERR_INVALID;
705 goto error;
706 }
707 /* Silently ignore unknown item */
708 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
709 w->status_code = SPOE_FRM_ERR_INVALID;
710 goto error;
711 }
712 idx += i;
713 }
714
715 w->status_code = SPOE_FRM_ERR_NONE;
716 return idx;
717skip:
718 return 0;
719error:
720 return -1;
721}
722
723/* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
724 * occurred, 0 if the frame must be skipped, otherwise the number of read
725 * bytes. */
726static int
727handle_hanotify(struct worker *w)
728{
729 char *end = w->buf+w->len;
730 uint64_t stream_id, frame_id;
731 int nbargs, i, idx = 0;
732
733 /* Check frame type */
734 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_NOTIFY)
735 goto skip;
736
737 /* Skip flags */
738 idx += 4;
739
740 /* Read the stream-id */
741 if ((i = decode_spoe_varint(w->buf+idx, end, &stream_id)) == -1) {
742 w->status_code = SPOE_FRM_ERR_INVALID;
743 goto error;
744 }
745 idx += i;
746
747 /* Read the frame-id */
748 if ((i = decode_spoe_varint(w->buf+idx, end, &frame_id)) == -1) {
749 w->status_code = SPOE_FRM_ERR_INVALID;
750 goto error;
751 }
752 idx += i;
753
754 w->stream_id = (unsigned int)stream_id;
755 w->frame_id = (unsigned int)frame_id;
756
757 DEBUG("Notify frame received: stream-id=%u - frame-id=%u",
758 w->stream_id, w->frame_id);
759
760 /* Loop on messages */
761 while (idx < w->len) {
762 char *str;
763 uint64_t sz;
764
765 /* Decode the message name */
766 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
767 if (str == NULL) {
768 w->status_code = SPOE_FRM_ERR_INVALID;
769 goto error;
770 }
771 DEBUG(" Message '%.*s' received", (int)sz, str);
772
773 nbargs = w->buf[idx++];
774 if (!memcmp(str, "check-client-ip", sz)) {
775 struct spoe_data data;
776
777 memset(&data, 0, sizeof(data));
778
779 if (nbargs != 1) {
780 w->status_code = SPOE_FRM_ERR_INVALID;
781 goto error;
782 }
783 if ((i = decode_spoe_string(w->buf+idx, end, &str, &sz)) == -1) {
784 w->status_code = SPOE_FRM_ERR_INVALID;
785 goto error;
786 }
787 idx += i;
788 if ((i = decode_spoe_data(w->buf+idx, end, &data)) == -1) {
789 w->status_code = SPOE_FRM_ERR_INVALID;
790 goto error;
791 }
792 idx += i;
793 if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV4)
794 check_ipv4_reputation(w, &data.u.ipv4);
795 else if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV6)
796 check_ipv6_reputation(w, &data.u.ipv6);
797 else {
798 w->status_code = SPOE_FRM_ERR_INVALID;
799 goto error;
800 }
801 }
802 else {
803 while (nbargs-- > 0) {
804 /* Silently ignore argument: its name and its value */
805 if ((i = decode_spoe_string(w->buf+idx, end, &str, &sz)) == -1) {
806 w->status_code = SPOE_FRM_ERR_INVALID;
807 goto error;
808 }
809 idx += i;
810 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
811 w->status_code = SPOE_FRM_ERR_INVALID;
812 goto error;
813 }
814 idx += i;
815 }
816 }
817 }
818
819 return idx;
820skip:
821 return 0;
822error:
823 return -1;
824}
825
826/* Encode a HELLO frame to send it to HAProxy. It returns -1 if an error
827 * occurred, the number of written bytes otherwise. */
828static int
829prepare_agenthello(struct worker *w)
830{
831 int idx = 0;
832
833 /* Frame Type */
834 w->buf[idx++] = SPOE_FRM_T_AGENT_HELLO;
835
836 /* No flags for now */
837 memset(w->buf+idx, 0, 4); /* No flags */
838 idx += 4;
839
840 /* No stream-id and frame-id for HELLO frames */
841 w->buf[idx++] = 0;
842 w->buf[idx++] = 0;
843
844 /* "version" K/V item */
845 idx += encode_spoe_string("version", 7, w->buf+idx);
846 w->buf[idx++] = SPOE_DATA_T_STR;
847 idx += encode_spoe_string(SPOP_VERSION, SLEN(SPOP_VERSION), w->buf+idx);
848
849 /* "max-frame-size" K/V item */
850 idx += encode_spoe_string("max-frame-size", 14, w->buf+idx);
851 w->buf[idx++] = SPOE_DATA_T_UINT32;
852 idx += encode_spoe_varint(w->size, w->buf+idx);
853
854 /* "capabilities" K/V item */
855 idx += encode_spoe_string("capabilities", 12, w->buf+idx);
856 w->buf[idx++] = SPOE_DATA_T_STR;
857 idx += encode_spoe_string(SPOA_CAPABILITIES, SLEN(SPOA_CAPABILITIES), w->buf+idx);
858
859 w->len = idx;
860 return idx;
861}
862
863/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred,
864 * the number of written bytes otherwise. */
865static int
866prepare_agentack(struct worker *w)
867{
868 int idx = 0;
869
870 /* Frame type */
871 w->buf[idx++] = SPOE_FRM_T_AGENT_ACK;
872
873 /* No flags for now */
874 memset(w->buf+idx, 0, 4); /* No flags */
875 idx += 4;
876
877 /* Set stream-id and frame-id for ACK frames */
878 idx += encode_spoe_varint(w->stream_id, w->buf+idx);
879 idx += encode_spoe_varint(w->frame_id, w->buf+idx);
880
881 /* Data */
882 if (w->ip_score == -1)
883 goto out;
884
885 w->buf[idx++] = SPOE_ACT_T_SET_VAR; /* Action type */
886 w->buf[idx++] = 3; /* Number of args */
887 w->buf[idx++] = SPOE_SCOPE_SESS; /* Arg 1: the scope */
888 idx += encode_spoe_string("ip_score", 8, w->buf+idx); /* Arg 2: variable name */
889 w->buf[idx++] = SPOE_DATA_T_UINT32;
890 idx += encode_spoe_varint(w->ip_score, w->buf+idx); /* Arg 3: variable value */
891out:
892 w->len = idx;
893 return idx;
894}
895
896/* Encode a DISCONNECT frame to send it to HAProxy. It returns -1 if an error
897 * occurred, the number of written bytes otherwise. */
898static int
899prepare_agentdicon(struct worker *w)
900{
901 const char *reason;
902 int rlen, idx = 0;
903
904 if (w->status_code >= SPOE_FRM_ERRS)
905 w->status_code = SPOE_FRM_ERR_UNKNOWN;
906 reason = spoe_frm_err_reasons[w->status_code];
907 rlen = strlen(reason);
908
909 /* Frame type */
910 w->buf[idx++] = SPOE_FRM_T_AGENT_DISCON;
911
912 /* No flags for now */
913 memset(w->buf+idx, 0, 4);
914 idx += 4;
915
916 /* No stream-id and frame-id for DISCONNECT frames */
917 w->buf[idx++] = 0;
918 w->buf[idx++] = 0;
919
920 /* There are 2 mandatory items: "status-code" and "message" */
921
922 /* "status-code" K/V item */
923 idx += encode_spoe_string("status-code", 11, w->buf+idx);
924 w->buf[idx++] = SPOE_DATA_T_UINT32;
925 idx += encode_spoe_varint(w->status_code, w->buf+idx);
926
927 /* "message" K/V item */
928 idx += encode_spoe_string("message", 7, w->buf+idx);
929 w->buf[idx++] = SPOE_DATA_T_STR;
930 idx += encode_spoe_string(reason, rlen, w->buf+idx);
931
932 w->len = idx;
933 return idx;
934}
935
936static int
937hello_handshake(int sock, struct worker *w)
938{
939 if (read_frame(sock, w) < 0) {
940 LOG("Failed to read Haproxy HELLO frame");
941 goto error;
942 }
943 if (handle_hahello(w) < 0) {
944 LOG("Failed to handle Haproxy HELLO frame");
945 goto error;
946 }
947 if (prepare_agenthello(w) < 0) {
948 LOG("Failed to prepare Agent HELLO frame");
949 goto error;
950 }
951 if (write_frame(sock, w) < 0) {
952 LOG("Failed to write Agent frame");
953 goto error;
954 }
Christopher Fauletba7bc162016-11-07 21:07:38 +0100955 DEBUG("Hello handshake done: version=%s - max-frame-size=%u - healthcheck=%s",
956 SPOP_VERSION, w->size, (w->healthcheck ? "true" : "false"));
Christopher Faulet010fded2016-11-03 22:49:37 +0100957 return 0;
958error:
959 return -1;
960}
961
962static int
963notify_ack_roundtip(int sock, struct worker *w)
964{
965 if (read_frame(sock, w) < 0) {
966 LOG("Failed to read Haproxy NOTIFY frame");
967 goto error_or_quit;
968 }
969 if (handle_hadiscon(w) != 0) {
970 if (w->status_code != SPOE_FRM_ERR_NONE)
971 LOG("Failed to handle Haproxy DISCONNECT frame");
972 DEBUG("Disconnect frame received: reason=%s",
973 spoe_frm_err_reasons[w->status_code]);
974 goto error_or_quit;
975 }
976 if (handle_hanotify(w) < 0) {
977 LOG("Failed to handle Haproxy NOTIFY frame");
978 goto error_or_quit;
979 }
980 if (prepare_agentack(w) < 0) {
981 LOG("Failed to prepare Agent ACK frame");
982 goto error_or_quit;
983 }
984 if (write_frame(sock, w) < 0) {
985 LOG("Failed to write Agent ACK frame");
986 goto error_or_quit;
987 }
988 DEBUG("Ack frame sent: stream-id=%u - frame-id=%u",
989 w->stream_id, w->frame_id);
990 return 0;
991error_or_quit:
992 return -1;
993}
994
995static void *
996worker(void *data)
997{
998 struct worker w;
999 struct sockaddr_in client;
1000 int *info = (int *)data;
1001 int csock, lsock = info[0];
1002
1003 signal(SIGPIPE, SIG_IGN);
1004 pthread_setspecific(worker_id, &info[1]);
1005
1006 while (1) {
1007 socklen_t sz = sizeof(client);
1008
1009 if ((csock = accept(lsock, (struct sockaddr *)&client, &sz)) < 0) {
1010 LOG("Failed to accept client connection: %m");
1011 goto out;
1012 }
1013 memset(&w, 0, sizeof(w));
1014 w.id = info[1];
1015 w.size = MAX_FRAME_SIZE;
1016
1017 DEBUG("New connection from HAProxy accepted");
1018
1019 if (hello_handshake(csock, &w) < 0)
1020 goto disconnect;
Christopher Fauletba7bc162016-11-07 21:07:38 +01001021 if (w.healthcheck == true)
1022 goto close;
Christopher Faulet010fded2016-11-03 22:49:37 +01001023 while (1) {
1024 w.ip_score = -1;
1025 if (notify_ack_roundtip(csock, &w) < 0)
1026 break;
1027 }
1028
1029 disconnect:
1030 if (w.status_code == SPOE_FRM_ERR_IO) {
1031 LOG("Close the client socket because of I/O errors");
1032 goto close;
1033 }
1034 if (prepare_agentdicon(&w) < 0) {
1035 LOG("Failed to prepare Agent DISCONNECT frame");
1036 goto close;
1037 }
1038 if (write_frame(csock, &w) < 0) {
1039 LOG("Failed to write Agent DISCONNECT frame");
1040 goto close;
1041 }
1042 DEBUG("Disconnect frame sent: reason=%s",
1043 spoe_frm_err_reasons[w.status_code]);
1044
1045 close:
1046 close(csock);
1047 }
1048
1049out:
1050 free(info);
1051 pthread_exit(NULL);
1052}
1053
1054static void
1055usage(char *prog)
1056{
1057 fprintf(stderr, "Usage: %s [-h] [-d] [-p <port>] [-n <num-workers>]\n", prog);
1058 fprintf(stderr, " -h Print this message\n");
1059 fprintf(stderr, " -d Enable the debug mode\n");
1060 fprintf(stderr, " -p <port> Specify the port to listen on (default: 12345)\n");
1061 fprintf(stderr, " -n <num-workers> Specify the number of workers (default: 5)\n");
1062}
1063
1064int
1065main(int argc, char **argv)
1066{
1067 pthread_t *ts = NULL;
1068 struct sockaddr_in server;
1069 int i, sock, opt, nbworkers, port;
1070
1071 nbworkers = NUM_WORKERS;
1072 port = DEFAULT_PORT;
1073 while ((opt = getopt(argc, argv, "hdn:p:")) != -1) {
1074 switch (opt) {
1075 case 'h':
1076 usage(argv[0]);
1077 return EXIT_SUCCESS;
1078 case 'd':
1079 debug = true;
1080 break;
1081 case 'n':
1082 nbworkers = atoi(optarg);
1083 break;
1084 case 'p':
1085 port = atoi(optarg);
1086 break;
1087 default:
1088 usage(argv[0]);
1089 return EXIT_FAILURE;
1090 }
1091 }
1092
1093 if (nbworkers <= 0) {
1094 fprintf(stderr, "%s: Invalid number of workers '%d'\n",
1095 argv[0], nbworkers);
1096 goto error;
1097 }
1098 if (port <= 0) {
1099 fprintf(stderr, "%s: Invalid port '%d'\n", argv[0], port);
1100 goto error;
1101 }
1102
1103 if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1104 fprintf(stderr, "Failed creating socket: %m\n");
1105 goto error;
1106 }
1107
1108 setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (int []){1}, sizeof(int));
1109 setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (int []){1}, sizeof(int));
1110
1111 memset(&server, 0, sizeof(server));
1112 server.sin_family = AF_INET;
1113 server.sin_addr.s_addr = INADDR_ANY;
1114 server.sin_port = htons(port);
1115
1116 if (bind(sock, (struct sockaddr *)&server, sizeof(server)) < 0) {
1117 fprintf(stderr, "Failed to bind the socket: %m\n");
1118 goto error;
1119 }
1120
1121 if (listen(sock , 10) < 0) {
1122 fprintf(stderr, "Failed to listen on the socket: %m\n");
1123 goto error;
1124 }
1125 fprintf(stderr, "SPOA is listening on port %d\n", port);
1126
1127 ts = calloc(nbworkers, sizeof(*ts));
1128 pthread_key_create(&worker_id, NULL);
1129 for (i = 0; i < nbworkers; i++) {
1130 int *info = calloc(2, sizeof(*info));
1131
1132 info[0] = sock;
1133 info[1] = i+1;
1134 if (pthread_create(&ts[i], NULL, worker, info) < 0) {
1135 fprintf(stderr, "Failed to create thread %d: %m\n", i+1);
1136 goto error;
1137 }
1138 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1139 }
1140
1141 for (i = 0; i < nbworkers; i++) {
1142 pthread_join(ts[i], NULL);
1143 fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
1144 }
1145 pthread_key_delete(worker_id);
1146 free(ts);
1147 close(sock);
1148 return EXIT_SUCCESS;
1149error:
1150 free(ts);
1151 return EXIT_FAILURE;
1152}