blob: 52d1c914db35d9839fb3e1e107b8a90934d8f959 [file] [log] [blame]
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001/*
2 * A Random IP reputation service acting as a Stream Processing Offload Agent
3 *
 * This is a very simple service that implements a "random" IP reputation
 * service. It returns random scores for all checked IP addresses. It only
 * shows you how to implement an IP reputation service, or similar kinds of
 * services, using the SPOE.
8 *
9 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
Thierry FOURNIER4aec0a42018-02-23 11:42:57 +010010 * Copyright 2018 OZON / Thierry Fournier <thierry.fournier@ozon.io>
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010011 *
12 * This program is free software; you can redistribute it and/or
13 * modify it under the terms of the GNU General Public License
14 * as published by the Free Software Foundation; either version
15 * 2 of the License, or (at your option) any later version.
16 *
17 */
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <stdbool.h>
22#include <unistd.h>
23#include <signal.h>
24#include <pthread.h>
25#include <sys/time.h>
26#include <sys/types.h>
27#include <sys/socket.h>
28#include <netinet/in.h>
29#include <netinet/tcp.h>
30#include <arpa/inet.h>
31
Thierry FOURNIER4aec0a42018-02-23 11:42:57 +010032#include "spoa.h"
33
/* Port the agent listens on when -p is not given on the command line. */
#define DEFAULT_PORT 12345
/* Number of worker threads started when -n is not given. */
#define NUM_WORKERS  5

/* Length of a string literal (or char array), without its trailing NUL. */
#define SLEN(str) (sizeof(str) - 1)
38
/* Write one timestamped line to stderr, prefixed with the id of the calling
 * worker. <fmt> must be a string literal because it is concatenated with the
 * prefix and the trailing newline.
 *
 * The worker id is looked up through the thread-specific <worker_id> key. A
 * thread that has not registered an id is reported as worker 00 instead of
 * dereferencing a NULL pointer. The timeval fields are cast to long so they
 * always match the "%ld" conversions, whatever time_t/suseconds_t are. */
#define LOG(fmt, args...)                                                   \
	do {                                                                \
		struct timeval now;                                         \
		int *wid_ptr = pthread_getspecific(worker_id);              \
		int  wid     = (wid_ptr != NULL) ? *wid_ptr : 0;            \
		                                                            \
		gettimeofday(&now, NULL);                                   \
		fprintf(stderr, "%ld.%06ld [%02d] " fmt "\n",               \
		        (long)now.tv_sec, (long)now.tv_usec, wid, ##args);  \
	} while (0)
48
/* Same as LOG(), but the line is only emitted when <debug> is set. */
#define DEBUG(...)                   \
	do {                         \
		if (!debug)          \
			break;       \
		LOG(__VA_ARGS__);    \
	} while (0)
54
/* Frame types exchanged between HAProxy and the agents. */
enum spoe_frame_type {
	/* Sent by HAProxy */
	SPOE_FRM_T_HAPROXY_HELLO  = 1,
	SPOE_FRM_T_HAPROXY_DISCON = 2,
	SPOE_FRM_T_HAPROXY_NOTIFY = 3,

	/* Sent by the agents */
	SPOE_FRM_T_AGENT_HELLO    = 101,
	SPOE_FRM_T_AGENT_DISCON   = 102,
	SPOE_FRM_T_AGENT_ACK      = 103
};
67
/* Status codes reported in DISCONNECT frames. SPOE_FRM_ERRS is not a status
 * code: it is one past the highest code and sizes the reason table. */
enum spoe_frame_error {
	SPOE_FRM_ERR_NONE           = 0,
	SPOE_FRM_ERR_IO             = 1,
	SPOE_FRM_ERR_TOUT           = 2,
	SPOE_FRM_ERR_TOO_BIG        = 3,
	SPOE_FRM_ERR_INVALID        = 4,
	SPOE_FRM_ERR_NO_VSN         = 5,
	SPOE_FRM_ERR_NO_FRAME_SIZE  = 6,
	SPOE_FRM_ERR_NO_CAP         = 7,
	SPOE_FRM_ERR_BAD_VSN        = 8,
	SPOE_FRM_ERR_BAD_FRAME_SIZE = 9,
	SPOE_FRM_ERR_UNKNOWN        = 99,
	SPOE_FRM_ERRS               = 100,
};
83
/* Actions an agent can return in an ACK frame. SPOE_ACT_TYPES is one past
 * the last action type. */
enum spoe_action_type {
	SPOE_ACT_T_SET_VAR   = 1,
	SPOE_ACT_T_UNSET_VAR = 2,
	SPOE_ACT_TYPES       = 3,
};
90
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010091
/* The first byte of a typed data carries the type in its low nibble and the
 * flags in its high nibble. */
#define SPOE_DATA_T_MASK   0x0F
#define SPOE_DATA_FL_MASK  0xF0

/* Flag values encoding a boolean directly in the type byte */
#define SPOE_DATA_FL_FALSE 0x00
#define SPOE_DATA_FL_TRUE  0x10

99static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
100 [SPOE_FRM_ERR_NONE] = "normal",
101 [SPOE_FRM_ERR_IO] = "I/O error",
102 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
103 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
104 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
105 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
106 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
107 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
108 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
109 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
110 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
111};
112
/* Debug mode switch, off by default; DEBUG() only logs when it is set. */
static bool debug;
/* Thread-specific key under which each worker stores a pointer to its id. */
static pthread_key_t worker_id;
115
116static void
117check_ipv4_reputation(struct worker *w, struct in_addr *ipv4)
118{
119 char str[INET_ADDRSTRLEN];
120
121 if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL)
122 return;
123
124 w->ip_score = random() % 100;
125
126 DEBUG(" IP score for %.*s is: %d", INET_ADDRSTRLEN, str, w->ip_score);
127}
128
129static void
130check_ipv6_reputation(struct worker *w, struct in6_addr *ipv6)
131{
132 char str[INET6_ADDRSTRLEN];
133
134 if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL)
135 return;
136
137 w->ip_score = random() % 100;
138
139 DEBUG(" IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, w->ip_score);
140}
141
/* Read up to <read_len> bytes from <sock> into <buf>, waiting with select()
 * for the descriptor to become readable before every read().
 *
 * Returns the number of bytes read. This is lower than <read_len> when
 * read() reported end-of-file first. Returns -1 if <sock> cannot be stored
 * in an fd_set, or if select() or read() failed. */
static int
do_read(int sock, void *buf, int read_len)
{
	char  *dst = buf;
	fd_set readfds;
	int    n = 0, total = 0;

	/* FD_SET() on a descriptor outside [0, FD_SETSIZE) is undefined */
	if (sock < 0 || sock >= FD_SETSIZE)
		return -1;

	while (total < read_len) {
		/* select() modifies the set in place: rebuild it every time */
		FD_ZERO(&readfds);
		FD_SET(sock, &readfds);

		if (select(sock + 1, &readfds, NULL, NULL, NULL) == -1)
			return -1;
		if (!FD_ISSET(sock, &readfds))
			return -1;

		n = read(sock, dst + total, read_len - total);
		if (n <= 0)
			break;

		total += n;
	}

	return (n == -1) ? -1 : total;
}
167
/* Write <write_len> bytes from <buf> to <sock>, waiting with select() for
 * the descriptor to become writable before every write().
 *
 * Returns the number of bytes written. This is lower than <write_len> when
 * write() returned 0. Returns -1 if <sock> cannot be stored in an fd_set,
 * or if select() or write() failed. */
static int
do_write(int sock, void *buf, int write_len)
{
	const char *src = buf;
	fd_set      writefds;
	int         n = 0, total = 0;

	/* FD_SET() on a descriptor outside [0, FD_SETSIZE) is undefined */
	if (sock < 0 || sock >= FD_SETSIZE)
		return -1;

	while (total < write_len) {
		/* select() modifies the set in place: rebuild it every time */
		FD_ZERO(&writefds);
		FD_SET(sock, &writefds);

		if (select(sock + 1, NULL, &writefds, NULL, NULL) == -1)
			return -1;
		if (!FD_ISSET(sock, &writefds))
			return -1;

		n = write(sock, src + total, write_len - total);
		if (n <= 0)
			break;

		total += n;
	}

	return (n == -1) ? -1 : total;
}
193
194/* Receive a frame sent by HAProxy. It returns -1 if an error occurred,
195 * otherwise the number of read bytes.*/
196static int
197read_frame(int sock, struct worker *w)
198{
199 uint32_t netint;
200 unsigned int framesz;
201
202 /* Read the frame size, on 4 bytes */
203 if (do_read(sock, &netint, sizeof(netint)) != 4) {
204 w->status_code = SPOE_FRM_ERR_IO;
205 return -1;
206 }
207
208 /* Check it against the max size */
209 framesz = ntohl(netint);
210 if (framesz > w->size) {
211 w->status_code = SPOE_FRM_ERR_TOO_BIG;
212 return -1;
213 }
214
215 /* Read the frame */
216 if (do_read(sock, w->buf, framesz) != framesz) {
217 w->status_code = SPOE_FRM_ERR_IO;
218 return -1;
219 }
220
221 w->len = framesz;
222 return framesz;
223}
224
225/* Send a frame to HAProxy. It returns -1 if an error occurred, otherwise the
226 * number of written bytes. */
227static int
228write_frame(int sock, struct worker *w)
229{
230 uint32_t netint;
231
232 /* Write the frame size, on 4 bytes */
233 netint = htonl(w->len);
234 if (do_write(sock, &netint, sizeof(netint)) != 4) {
235 w->status_code = SPOE_FRM_ERR_IO;
236 return -1;
237 }
238
239 /* Write the frame */
240 if (do_write(sock, w->buf, w->len) != w->len) {
241 w->status_code = SPOE_FRM_ERR_IO;
242 return -1;
243 }
244 return w->len;
245}
246
/* Encode <i> as a variable-length integer into <buf>. Values below 240 take
 * a single byte. Larger values put their low 4 bits in the first byte (whose
 * high nibble is all ones) and continue with 7 bits per byte, the top bit of
 * a byte telling that another one follows. The caller must provide enough
 * room. This function never fails and returns the number of written bytes. */
static int
encode_spoe_varint(uint64_t i, char *buf)
{
	unsigned char *out = (unsigned char *)buf;
	int n = 0;

	if (i < 240) {
		out[n++] = (unsigned char)i;
		return n;
	}

	out[n++] = (unsigned char)i | 240;
	i = (i - 240) >> 4;
	while (i >= 128) {
		out[n++] = (unsigned char)i | 128;
		i = (i - 128) >> 7;
	}
	out[n++] = (unsigned char)i;
	return n;
}
268
/* Decode a variable-length integer starting at <buf> into <*i>. <end> points
 * one byte past the last readable byte. On success, the number of read bytes
 * is returned. -1 is returned when the end of the buffer is reached before
 * the integer is complete, or when the encoding is longer than what a 64-bit
 * value can need. <*i> may hold a partial value in that case. */
static int
decode_spoe_varint(char *buf, char *end, uint64_t *i)
{
	unsigned char *msg = (unsigned char *)buf;
	unsigned char *lim = (unsigned char *)end;
	int idx = 0;

	/* <end> is exclusive: nothing can be read when buf == end */
	if (msg >= lim)
		return -1;

	*i = msg[0];
	if (msg[0] < 240)
		return 1;

	do {
		int shift;

		++idx;
		shift = 4 + 7 * (idx - 1);
		/* Shifting a 64-bit value by 64 or more is undefined: an
		 * over-long encoding is rejected rather than decoded */
		if (msg + idx >= lim || shift > 63)
			return -1;
		*i += (uint64_t)msg[idx] << shift;
	} while (msg[idx] >= 128);
	return (idx + 1);
}
294
/* Encode the <len> bytes at <str> into <dst>: the length comes first, as a
 * variable-length integer, followed by the bytes themselves. An empty string
 * is encoded as the single byte 0. The caller must provide enough room. This
 * function never fails and returns the number of written bytes. */
static int
encode_spoe_string(const char *str, size_t len, char *dst)
{
	int hdr;

	if (len == 0) {
		dst[0] = 0;
		return 1;
	}

	hdr = encode_spoe_varint(len, dst);
	memcpy(dst + hdr, str, len);
	return (hdr + len);
}
312
313/* Decode a string. Its length is decoded first as a variable-length integer. If
314 * it succeeds, and if the string length is valid, the begin of the string is
315 * saved in <*str>, its length is saved in <*len> and the total numbre of bytes
316 * read is returned. If an error occurred, -1 is returned and <*str> remains
317 * NULL. */
318static int
319decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
320{
321 int r, idx = 0;
322
323 *str = NULL;
324 *len = 0;
325
326 if ((r = decode_spoe_varint(buf, end, len)) == -1)
327 goto error;
328 idx += r;
329 if (buf + idx + *len > end)
330 goto error;
331
332 *str = buf+idx;
333 return (idx + *len);
334
335error:
336 return -1;
337}
338
339/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
340 * of bytes read is returned. A types data is composed of a type (1 byte) and
341 * corresponding data:
342 * - boolean: non additional data (0 bytes)
343 * - integers: a variable-length integer (see decode_spoe_varint)
344 * - ipv4: 4 bytes
345 * - ipv6: 16 bytes
346 * - binary and string: a buffer prefixed by its size, a variable-length
347 * integer (see decode_spoe_string) */
348static int
349skip_spoe_data(char *frame, char *end)
350{
351 uint64_t sz = 0;
352 int r, idx = 0;
353
354 if (frame > end)
355 return -1;
356
357 switch (frame[idx++] & SPOE_DATA_T_MASK) {
358 case SPOE_DATA_T_BOOL:
359 idx++;
360 break;
361 case SPOE_DATA_T_INT32:
362 case SPOE_DATA_T_INT64:
363 case SPOE_DATA_T_UINT32:
364 case SPOE_DATA_T_UINT64:
365 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
366 return -1;
367 idx += r;
368 break;
369 case SPOE_DATA_T_IPV4:
370 idx += 4;
371 break;
372 case SPOE_DATA_T_IPV6:
373 idx += 16;
374 break;
375 case SPOE_DATA_T_STR:
376 case SPOE_DATA_T_BIN:
377 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
378 return -1;
379 idx += r + sz;
380 break;
381 }
382
383 if (frame+idx > end)
384 return -1;
385 return idx;
386}
387
388/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
389 * number of read bytes is returned. See skip_spoe_data for details. */
390static int
391decode_spoe_data(char *frame, char *end, struct spoe_data *data)
392{
393 uint64_t sz = 0;
394 int type, r, idx = 0;
395
396 if (frame > end)
397 return -1;
398
399 type = frame[idx++];
400 data->type = (type & SPOE_DATA_T_MASK);
401 switch (data->type) {
402 case SPOE_DATA_T_BOOL:
403 data->u.boolean = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
404 break;
405 case SPOE_DATA_T_INT32:
406 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
407 return -1;
408 data->u.sint32 = sz;
409 idx += r;
410 break;
411 case SPOE_DATA_T_INT64:
412 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
413 return -1;
414 data->u.uint32 = sz;
415 idx += r;
416 break;
417 case SPOE_DATA_T_UINT32:
418 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
419 return -1;
420 data->u.sint64 = sz;
421 idx += r;
422 break;
423 case SPOE_DATA_T_UINT64:
424 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
425 return -1;
426 data->u.uint64 = sz;
427 idx += r;
428 break;
429 case SPOE_DATA_T_IPV4:
430 if (frame+idx+4 > end)
431 return -1;
432 memcpy(&data->u.ipv4, frame+idx, 4);
433 idx += 4;
434 break;
435 case SPOE_DATA_T_IPV6:
436 if (frame+idx+16 > end)
437 return -1;
438 memcpy(&data->u.ipv6, frame+idx, 16);
439 idx += 16;
440 break;
441 case SPOE_DATA_T_STR:
442 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
443 return -1;
444 idx += r;
445 if (frame+idx+sz > end)
446 return -1;
447 data->u.buffer.str = frame+idx;
448 data->u.buffer.len = sz;
449 idx += sz;
450 break;
451 case SPOE_DATA_T_BIN:
452 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
453 return -1;
454 idx += r;
455 if (frame+idx+sz > end)
456 return -1;
457 data->u.buffer.str = frame+idx;
458 data->u.buffer.len = sz;
459 idx += sz;
460 break;
461 default:
462 break;
463 }
464
465 if (frame+idx > end)
466 return -1;
467 return idx;
468}
469
470
471/* Check the protocol version. It returns -1 if an error occurred, the number of
472 * read bytes otherwise. */
473static int
474check_proto_version(struct worker *w, int idx)
475{
476 char *str;
477 uint64_t sz;
478
479 /* Get the list of all supported versions by HAProxy */
480 if ((w->buf[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
481 w->status_code = SPOE_FRM_ERR_INVALID;
482 return -1;
483 }
484 idx += decode_spoe_string(w->buf+idx, w->buf+w->len, &str, &sz);
485 if (str == NULL) {
486 w->status_code = SPOE_FRM_ERR_INVALID;
487 return -1;
488 }
489
490 /* TODO: Find the right verion in supported ones */
491
492 return idx;
493}
494
495/* Check max frame size value. It returns -1 if an error occurred, the number of
496 * read bytes otherwise. */
497static int
498check_max_frame_size(struct worker *w, int idx)
499{
500 uint64_t sz;
501 int type, i;
502
503 /* Get the max-frame-size value of HAProxy */
504 type = w->buf[idx++];
505 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
506 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
507 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
508 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
509 w->status_code = SPOE_FRM_ERR_INVALID;
510 return -1;
511 }
512 if ((i = decode_spoe_varint(w->buf+idx, w->buf+w->len, &sz)) == -1) {
513 w->status_code = SPOE_FRM_ERR_INVALID;
514 return -1;
515 }
516 idx += i;
517
518 /* Keep the lower value */
519 if (sz < w->size)
520 w->size = sz;
521
522 return idx;
523}
524
525/* Check healthcheck value. It returns -1 if an error occurred, the number of
526 * read bytes otherwise. */
527static int
528check_healthcheck(struct worker *w, int idx)
529{
530 int type;
531
532 /* Get the "healthcheck" value of HAProxy */
533 type = w->buf[idx++];
534 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_BOOL) {
535 w->status_code = SPOE_FRM_ERR_INVALID;
536 return -1;
537 }
538 w->healthcheck = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
539 return idx;
540}
541
542
543/* Decode a HELLO frame received from HAProxy. It returns -1 if an error
544 * occurred, 0 if the frame must be skipped, otherwise the number of read
545 * bytes. */
546static int
547handle_hahello(struct worker *w)
548{
549 char *end = w->buf+w->len;
550 int i, idx = 0;
551
552 /* Check frame type */
553 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_HELLO)
554 goto skip;
555
556 /* Skip flags */
557 idx += 4;
558
559 /* stream-id and frame-id must be cleared */
560 if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
561 w->status_code = SPOE_FRM_ERR_INVALID;
562 goto error;
563 }
564 idx += 2;
565
566 /* Loop on K/V items */
567 while (idx < w->len) {
568 char *str;
569 uint64_t sz;
570
571 /* Decode the item name */
572 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
573 if (str == NULL) {
574 w->status_code = SPOE_FRM_ERR_INVALID;
575 goto error;
576 }
577
578 /* Check "supported-versions" K/V item */
579 if (!memcmp(str, "supported-versions", sz)) {
580 if ((i = check_proto_version(w, idx)) == -1)
581 goto error;
582 idx = i;
583 }
584 /* Check "max-frame-size" K/V item "*/
585 else if (!memcmp(str, "max-frame-size", sz)) {
586 if ((i = check_max_frame_size(w, idx)) == -1)
587 goto error;
588 idx = i;
589 }
590 /* Check "healthcheck" K/V item "*/
591 else if (!memcmp(str, "healthcheck", sz)) {
592 if ((i = check_healthcheck(w, idx)) == -1)
593 goto error;
594 idx = i;
595 }
596 /* Skip "capabilities" K/V item for now */
597 else {
598 /* Silently ignore unknown item */
599 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
600 w->status_code = SPOE_FRM_ERR_INVALID;
601 goto error;
602 }
603 idx += i;
604 }
605 }
606
607 return idx;
608skip:
609 return 0;
610error:
611 return -1;
612}
613
614/* Decode a DISCONNECT frame received from HAProxy. It returns -1 if an error
615 * occurred, 0 if the frame must be skipped, otherwise the number of read
616 * bytes. */
617static int
618handle_hadiscon(struct worker *w)
619{
620 char *end = w->buf+w->len;
621 int i, idx = 0;
622
623 /* Check frame type */
624 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_DISCON)
625 goto skip;
626
627 /* Skip flags */
628 idx += 4;
629
630 /* stream-id and frame-id must be cleared */
631 if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
632 w->status_code = SPOE_FRM_ERR_INVALID;
633 goto error;
634 }
635 idx += 2;
636
637 /* Loop on K/V items */
638 while (idx < w->len) {
639 char *str;
640 uint64_t sz;
641
642 /* Decode item key */
643 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
644 if (str == NULL) {
645 w->status_code = SPOE_FRM_ERR_INVALID;
646 goto error;
647 }
648 /* Silently ignore unknown item */
649 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
650 w->status_code = SPOE_FRM_ERR_INVALID;
651 goto error;
652 }
653 idx += i;
654 }
655
656 w->status_code = SPOE_FRM_ERR_NONE;
657 return idx;
658skip:
659 return 0;
660error:
661 return -1;
662}
663
664/* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
665 * occurred, 0 if the frame must be skipped, otherwise the number of read
666 * bytes. */
667static int
668handle_hanotify(struct worker *w)
669{
670 char *end = w->buf+w->len;
671 uint64_t stream_id, frame_id;
672 int nbargs, i, idx = 0;
673
674 /* Check frame type */
675 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_NOTIFY)
676 goto skip;
677
678 /* Skip flags */
679 idx += 4;
680
681 /* Read the stream-id */
682 if ((i = decode_spoe_varint(w->buf+idx, end, &stream_id)) == -1) {
683 w->status_code = SPOE_FRM_ERR_INVALID;
684 goto error;
685 }
686 idx += i;
687
688 /* Read the frame-id */
689 if ((i = decode_spoe_varint(w->buf+idx, end, &frame_id)) == -1) {
690 w->status_code = SPOE_FRM_ERR_INVALID;
691 goto error;
692 }
693 idx += i;
694
695 w->stream_id = (unsigned int)stream_id;
696 w->frame_id = (unsigned int)frame_id;
697
698 DEBUG("Notify frame received: stream-id=%u - frame-id=%u",
699 w->stream_id, w->frame_id);
700
701 /* Loop on messages */
702 while (idx < w->len) {
703 char *str;
704 uint64_t sz;
705
706 /* Decode the message name */
707 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
708 if (str == NULL) {
709 w->status_code = SPOE_FRM_ERR_INVALID;
710 goto error;
711 }
712 DEBUG(" Message '%.*s' received", (int)sz, str);
713
714 nbargs = w->buf[idx++];
715 if (!memcmp(str, "check-client-ip", sz)) {
716 struct spoe_data data;
717
718 memset(&data, 0, sizeof(data));
719
720 if (nbargs != 1) {
721 w->status_code = SPOE_FRM_ERR_INVALID;
722 goto error;
723 }
724 if ((i = decode_spoe_string(w->buf+idx, end, &str, &sz)) == -1) {
725 w->status_code = SPOE_FRM_ERR_INVALID;
726 goto error;
727 }
728 idx += i;
729 if ((i = decode_spoe_data(w->buf+idx, end, &data)) == -1) {
730 w->status_code = SPOE_FRM_ERR_INVALID;
731 goto error;
732 }
733 idx += i;
734 if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV4)
735 check_ipv4_reputation(w, &data.u.ipv4);
736 else if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV6)
737 check_ipv6_reputation(w, &data.u.ipv6);
738 else {
739 w->status_code = SPOE_FRM_ERR_INVALID;
740 goto error;
741 }
742 }
743 else {
744 while (nbargs-- > 0) {
745 /* Silently ignore argument: its name and its value */
746 if ((i = decode_spoe_string(w->buf+idx, end, &str, &sz)) == -1) {
747 w->status_code = SPOE_FRM_ERR_INVALID;
748 goto error;
749 }
750 idx += i;
751 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
752 w->status_code = SPOE_FRM_ERR_INVALID;
753 goto error;
754 }
755 idx += i;
756 }
757 }
758 }
759
760 return idx;
761skip:
762 return 0;
763error:
764 return -1;
765}
766
767/* Encode a HELLO frame to send it to HAProxy. It returns -1 if an error
768 * occurred, the number of written bytes otherwise. */
769static int
770prepare_agenthello(struct worker *w)
771{
772 int idx = 0;
773
774 /* Frame Type */
775 w->buf[idx++] = SPOE_FRM_T_AGENT_HELLO;
776
777 /* No flags for now */
778 memset(w->buf+idx, 0, 4); /* No flags */
779 idx += 4;
780
781 /* No stream-id and frame-id for HELLO frames */
782 w->buf[idx++] = 0;
783 w->buf[idx++] = 0;
784
785 /* "version" K/V item */
786 idx += encode_spoe_string("version", 7, w->buf+idx);
787 w->buf[idx++] = SPOE_DATA_T_STR;
788 idx += encode_spoe_string(SPOP_VERSION, SLEN(SPOP_VERSION), w->buf+idx);
789
790 /* "max-frame-size" K/V item */
791 idx += encode_spoe_string("max-frame-size", 14, w->buf+idx);
792 w->buf[idx++] = SPOE_DATA_T_UINT32;
793 idx += encode_spoe_varint(w->size, w->buf+idx);
794
795 /* "capabilities" K/V item */
796 idx += encode_spoe_string("capabilities", 12, w->buf+idx);
797 w->buf[idx++] = SPOE_DATA_T_STR;
798 idx += encode_spoe_string(SPOA_CAPABILITIES, SLEN(SPOA_CAPABILITIES), w->buf+idx);
799
800 w->len = idx;
801 return idx;
802}
803
804/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred,
805 * the number of written bytes otherwise. */
806static int
807prepare_agentack(struct worker *w)
808{
809 int idx = 0;
810
811 /* Frame type */
812 w->buf[idx++] = SPOE_FRM_T_AGENT_ACK;
813
814 /* No flags for now */
815 memset(w->buf+idx, 0, 4); /* No flags */
816 idx += 4;
817
818 /* Set stream-id and frame-id for ACK frames */
819 idx += encode_spoe_varint(w->stream_id, w->buf+idx);
820 idx += encode_spoe_varint(w->frame_id, w->buf+idx);
821
822 /* Data */
823 if (w->ip_score == -1)
824 goto out;
825
826 w->buf[idx++] = SPOE_ACT_T_SET_VAR; /* Action type */
827 w->buf[idx++] = 3; /* Number of args */
828 w->buf[idx++] = SPOE_SCOPE_SESS; /* Arg 1: the scope */
829 idx += encode_spoe_string("ip_score", 8, w->buf+idx); /* Arg 2: variable name */
830 w->buf[idx++] = SPOE_DATA_T_UINT32;
831 idx += encode_spoe_varint(w->ip_score, w->buf+idx); /* Arg 3: variable value */
832out:
833 w->len = idx;
834 return idx;
835}
836
837/* Encode a DISCONNECT frame to send it to HAProxy. It returns -1 if an error
838 * occurred, the number of written bytes otherwise. */
839static int
840prepare_agentdicon(struct worker *w)
841{
842 const char *reason;
843 int rlen, idx = 0;
844
845 if (w->status_code >= SPOE_FRM_ERRS)
846 w->status_code = SPOE_FRM_ERR_UNKNOWN;
847 reason = spoe_frm_err_reasons[w->status_code];
848 rlen = strlen(reason);
849
850 /* Frame type */
851 w->buf[idx++] = SPOE_FRM_T_AGENT_DISCON;
852
853 /* No flags for now */
854 memset(w->buf+idx, 0, 4);
855 idx += 4;
856
857 /* No stream-id and frame-id for DISCONNECT frames */
858 w->buf[idx++] = 0;
859 w->buf[idx++] = 0;
860
861 /* There are 2 mandatory items: "status-code" and "message" */
862
863 /* "status-code" K/V item */
864 idx += encode_spoe_string("status-code", 11, w->buf+idx);
865 w->buf[idx++] = SPOE_DATA_T_UINT32;
866 idx += encode_spoe_varint(w->status_code, w->buf+idx);
867
868 /* "message" K/V item */
869 idx += encode_spoe_string("message", 7, w->buf+idx);
870 w->buf[idx++] = SPOE_DATA_T_STR;
871 idx += encode_spoe_string(reason, rlen, w->buf+idx);
872
873 w->len = idx;
874 return idx;
875}
876
877static int
878hello_handshake(int sock, struct worker *w)
879{
880 if (read_frame(sock, w) < 0) {
881 LOG("Failed to read Haproxy HELLO frame");
882 goto error;
883 }
884 if (handle_hahello(w) < 0) {
885 LOG("Failed to handle Haproxy HELLO frame");
886 goto error;
887 }
888 if (prepare_agenthello(w) < 0) {
889 LOG("Failed to prepare Agent HELLO frame");
890 goto error;
891 }
892 if (write_frame(sock, w) < 0) {
893 LOG("Failed to write Agent frame");
894 goto error;
895 }
896 DEBUG("Hello handshake done: version=%s - max-frame-size=%u - healthcheck=%s",
897 SPOP_VERSION, w->size, (w->healthcheck ? "true" : "false"));
898 return 0;
899error:
900 return -1;
901}
902
903static int
904notify_ack_roundtip(int sock, struct worker *w)
905{
906 if (read_frame(sock, w) < 0) {
907 LOG("Failed to read Haproxy NOTIFY frame");
908 goto error_or_quit;
909 }
910 if (handle_hadiscon(w) != 0) {
911 if (w->status_code != SPOE_FRM_ERR_NONE)
912 LOG("Failed to handle Haproxy DISCONNECT frame");
913 DEBUG("Disconnect frame received: reason=%s",
914 spoe_frm_err_reasons[w->status_code]);
915 goto error_or_quit;
916 }
917 if (handle_hanotify(w) < 0) {
918 LOG("Failed to handle Haproxy NOTIFY frame");
919 goto error_or_quit;
920 }
921 if (prepare_agentack(w) < 0) {
922 LOG("Failed to prepare Agent ACK frame");
923 goto error_or_quit;
924 }
925 if (write_frame(sock, w) < 0) {
926 LOG("Failed to write Agent ACK frame");
927 goto error_or_quit;
928 }
929 DEBUG("Ack frame sent: stream-id=%u - frame-id=%u",
930 w->stream_id, w->frame_id);
931 return 0;
932error_or_quit:
933 return -1;
934}
935
936static void *
937worker(void *data)
938{
939 struct worker w;
940 struct sockaddr_in client;
941 int *info = (int *)data;
942 int csock, lsock = info[0];
943
944 signal(SIGPIPE, SIG_IGN);
945 pthread_setspecific(worker_id, &info[1]);
946
947 while (1) {
948 socklen_t sz = sizeof(client);
949
950 if ((csock = accept(lsock, (struct sockaddr *)&client, &sz)) < 0) {
951 LOG("Failed to accept client connection: %m");
952 goto out;
953 }
954 memset(&w, 0, sizeof(w));
955 w.id = info[1];
956 w.size = MAX_FRAME_SIZE;
957
958 DEBUG("New connection from HAProxy accepted");
959
960 if (hello_handshake(csock, &w) < 0)
961 goto disconnect;
962 if (w.healthcheck == true)
963 goto close;
964 while (1) {
965 w.ip_score = -1;
966 if (notify_ack_roundtip(csock, &w) < 0)
967 break;
968 }
969
970 disconnect:
971 if (w.status_code == SPOE_FRM_ERR_IO) {
972 LOG("Close the client socket because of I/O errors");
973 goto close;
974 }
975 if (prepare_agentdicon(&w) < 0) {
976 LOG("Failed to prepare Agent DISCONNECT frame");
977 goto close;
978 }
979 if (write_frame(csock, &w) < 0) {
980 LOG("Failed to write Agent DISCONNECT frame");
981 goto close;
982 }
983 DEBUG("Disconnect frame sent: reason=%s",
984 spoe_frm_err_reasons[w.status_code]);
985
986 close:
987 close(csock);
988 }
989
990out:
991 free(info);
992 pthread_exit(NULL);
993}
994
/* Print the command line help on stderr. <prog> is the program name shown in
 * the synopsis. */
static void
usage(char *prog)
{
	static const char options[] =
		" -h Print this message\n"
		" -d Enable the debug mode\n"
		" -p <port> Specify the port to listen on (default: 12345)\n"
		" -n <num-workers> Specify the number of workers (default: 5)\n";

	fprintf(stderr, "Usage: %s [-h] [-d] [-p <port>] [-n <num-workers>]\n", prog);
	fputs(options, stderr);
}
1004
1005int
1006main(int argc, char **argv)
1007{
1008 pthread_t *ts = NULL;
1009 struct sockaddr_in server;
1010 int i, sock, opt, nbworkers, port;
1011
1012 nbworkers = NUM_WORKERS;
1013 port = DEFAULT_PORT;
1014 while ((opt = getopt(argc, argv, "hdn:p:")) != -1) {
1015 switch (opt) {
1016 case 'h':
1017 usage(argv[0]);
1018 return EXIT_SUCCESS;
1019 case 'd':
1020 debug = true;
1021 break;
1022 case 'n':
1023 nbworkers = atoi(optarg);
1024 break;
1025 case 'p':
1026 port = atoi(optarg);
1027 break;
1028 default:
1029 usage(argv[0]);
1030 return EXIT_FAILURE;
1031 }
1032 }
1033
1034 if (nbworkers <= 0) {
1035 fprintf(stderr, "%s: Invalid number of workers '%d'\n",
1036 argv[0], nbworkers);
1037 goto error;
1038 }
1039 if (port <= 0) {
1040 fprintf(stderr, "%s: Invalid port '%d'\n", argv[0], port);
1041 goto error;
1042 }
1043
1044 if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1045 fprintf(stderr, "Failed creating socket: %m\n");
1046 goto error;
1047 }
1048
1049 setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (int []){1}, sizeof(int));
1050 setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (int []){1}, sizeof(int));
1051
1052 memset(&server, 0, sizeof(server));
1053 server.sin_family = AF_INET;
1054 server.sin_addr.s_addr = INADDR_ANY;
1055 server.sin_port = htons(port);
1056
1057 if (bind(sock, (struct sockaddr *)&server, sizeof(server)) < 0) {
1058 fprintf(stderr, "Failed to bind the socket: %m\n");
1059 goto error;
1060 }
1061
1062 if (listen(sock , 10) < 0) {
1063 fprintf(stderr, "Failed to listen on the socket: %m\n");
1064 goto error;
1065 }
1066 fprintf(stderr, "SPOA is listening on port %d\n", port);
1067
1068 ts = calloc(nbworkers, sizeof(*ts));
1069 pthread_key_create(&worker_id, NULL);
1070 for (i = 0; i < nbworkers; i++) {
1071 int *info = calloc(2, sizeof(*info));
1072
1073 info[0] = sock;
1074 info[1] = i+1;
1075 if (pthread_create(&ts[i], NULL, worker, info) < 0) {
1076 fprintf(stderr, "Failed to create thread %d: %m\n", i+1);
1077 goto error;
1078 }
1079 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1080 }
1081
1082 for (i = 0; i < nbworkers; i++) {
1083 pthread_join(ts[i], NULL);
1084 fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
1085 }
1086 pthread_key_delete(worker_id);
1087 free(ts);
1088 close(sock);
1089 return EXIT_SUCCESS;
1090error:
1091 free(ts);
1092 return EXIT_FAILURE;
1093}