/*
 * A Random IP reputation service acting as a Stream Processing Offload Agent
 *
 * This is a very simple service that implements a "random" IP reputation
 * service. It will return random scores for all checked IP addresses. It only
 * shows you how to implement an IP reputation service or such kind of services
 * using the SPOE.
 *
 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
 * Copyright 2018 OZON / Thierry Fournier <thierry.fournier@ozon.io>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 *
 */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <stdint.h>
#include <unistd.h>
#include <signal.h>
#include <pthread.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/select.h>
#include <sys/wait.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>

#include "spoa.h"
34
/* TCP port the agent listens on when -p is not given */
#define DEFAULT_PORT 12345
/* Number of workers started when -n is not given */
#define NUM_WORKERS  5

/* Length of a string literal, not counting its terminating NUL byte */
#define SLEN(str) (sizeof(str) - 1)
39
/* Frame types exchanged between HAProxy and the agents. The numeric values
 * are written as the first byte of every frame. */
enum spoe_frame_type {
	/* Frames sent by HAProxy */
	SPOE_FRM_T_HAPROXY_HELLO  = 1,
	SPOE_FRM_T_HAPROXY_DISCON = 2,
	SPOE_FRM_T_HAPROXY_NOTIFY = 3,

	/* Frames sent by the agents */
	SPOE_FRM_T_AGENT_HELLO    = 101,
	SPOE_FRM_T_AGENT_DISCON   = 102,
	SPOE_FRM_T_AGENT_ACK      = 103
};
52
/* Errors triggered by the SPOE applet. These values are used as the
 * "status-code" of DISCONNECT frames and as indexes into the table of
 * human-readable reasons. SPOE_FRM_ERRS is the size of that table. */
enum spoe_frame_error {
	SPOE_FRM_ERR_NONE           = 0,
	SPOE_FRM_ERR_IO             = 1,
	SPOE_FRM_ERR_TOUT           = 2,
	SPOE_FRM_ERR_TOO_BIG        = 3,
	SPOE_FRM_ERR_INVALID        = 4,
	SPOE_FRM_ERR_NO_VSN         = 5,
	SPOE_FRM_ERR_NO_FRAME_SIZE  = 6,
	SPOE_FRM_ERR_NO_CAP         = 7,
	SPOE_FRM_ERR_BAD_VSN        = 8,
	SPOE_FRM_ERR_BAD_FRAME_SIZE = 9,
	SPOE_FRM_ERR_UNKNOWN        = 99,
	SPOE_FRM_ERRS               = 100,
};
68
/* All supported SPOE actions. SPOE_ACT_TYPES is one past the last valid
 * action value. */
enum spoe_action_type {
	SPOE_ACT_T_SET_VAR   = 1,
	SPOE_ACT_T_UNSET_VAR = 2,
	SPOE_ACT_TYPES       = 3,
};
75
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010076
/* The first byte of a typed data carries the data type in its low nibble and
 * flags in its high nibble. These masks extract each part. */
#define SPOE_DATA_T_MASK  0x0F
#define SPOE_DATA_FL_MASK 0xF0

/* Flag values carrying the value of a boolean */
#define SPOE_DATA_FL_FALSE 0x00
#define SPOE_DATA_FL_TRUE  0x10
84static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
85 [SPOE_FRM_ERR_NONE] = "normal",
86 [SPOE_FRM_ERR_IO] = "I/O error",
87 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
88 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
89 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
90 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
91 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
92 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
93 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
94 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
95 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
96};
97
/* Debug mode switch; main() turns it on when the -d option is given */
bool debug = false;

/* Thread-specific key under which each worker stores a pointer to its own
 * worker number (see spoa_worker) */
pthread_key_t worker_id;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100100
101static void
102check_ipv4_reputation(struct worker *w, struct in_addr *ipv4)
103{
104 char str[INET_ADDRSTRLEN];
105
106 if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL)
107 return;
108
109 w->ip_score = random() % 100;
110
111 DEBUG(" IP score for %.*s is: %d", INET_ADDRSTRLEN, str, w->ip_score);
112}
113
114static void
115check_ipv6_reputation(struct worker *w, struct in6_addr *ipv6)
116{
117 char str[INET6_ADDRSTRLEN];
118
119 if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL)
120 return;
121
122 w->ip_score = random() % 100;
123
124 DEBUG(" IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, w->ip_score);
125}
126
/* Read <read_len> bytes from <sock> into <buf>, waiting with select() until
 * the socket is readable. It returns the number of bytes read, which is lower
 * than <read_len> only if the peer closed the connection first, or -1 if an
 * error occurred. Interrupted system calls (EINTR) are retried. */
static int
do_read(int sock, void *buf, int read_len)
{
	char *dst = buf;
	int total = 0;

	/* select() cannot watch descriptors outside the fd_set range */
	if (sock < 0 || sock >= FD_SETSIZE)
		return -1;

	while (total < read_len) {
		fd_set readfds;
		ssize_t n;

		/* select() modifies the set, so it is rebuilt on each pass */
		FD_ZERO(&readfds);
		FD_SET(sock, &readfds);

		if (select(sock + 1, &readfds, NULL, NULL, NULL) == -1) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		if (!FD_ISSET(sock, &readfds))
			return -1;

		n = read(sock, dst + total, (size_t)(read_len - total));
		if (n < 0) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		if (n == 0) /* connection closed by the peer */
			break;

		total += (int)n;
	}

	return total;
}
152
/* Write <write_len> bytes from <buf> to <sock>, waiting with select() until
 * the socket is writable. It returns the number of bytes written, which is
 * lower than <write_len> only if write() reported that nothing could be
 * written, or -1 if an error occurred. Interrupted system calls (EINTR) are
 * retried. */
static int
do_write(int sock, void *buf, int write_len)
{
	const char *src = buf;
	int total = 0;

	/* select() cannot watch descriptors outside the fd_set range */
	if (sock < 0 || sock >= FD_SETSIZE)
		return -1;

	while (total < write_len) {
		fd_set writefds;
		ssize_t n;

		/* select() modifies the set, so it is rebuilt on each pass */
		FD_ZERO(&writefds);
		FD_SET(sock, &writefds);

		if (select(sock + 1, NULL, &writefds, NULL, NULL) == -1) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		if (!FD_ISSET(sock, &writefds))
			return -1;

		n = write(sock, src + total, (size_t)(write_len - total));
		if (n < 0) {
			if (errno == EINTR)
				continue;
			return -1;
		}
		if (n == 0)
			break;

		total += (int)n;
	}

	return total;
}
178
179/* Receive a frame sent by HAProxy. It returns -1 if an error occurred,
180 * otherwise the number of read bytes.*/
181static int
182read_frame(int sock, struct worker *w)
183{
184 uint32_t netint;
185 unsigned int framesz;
186
187 /* Read the frame size, on 4 bytes */
188 if (do_read(sock, &netint, sizeof(netint)) != 4) {
189 w->status_code = SPOE_FRM_ERR_IO;
190 return -1;
191 }
192
193 /* Check it against the max size */
194 framesz = ntohl(netint);
195 if (framesz > w->size) {
196 w->status_code = SPOE_FRM_ERR_TOO_BIG;
197 return -1;
198 }
199
200 /* Read the frame */
201 if (do_read(sock, w->buf, framesz) != framesz) {
202 w->status_code = SPOE_FRM_ERR_IO;
203 return -1;
204 }
205
206 w->len = framesz;
207 return framesz;
208}
209
210/* Send a frame to HAProxy. It returns -1 if an error occurred, otherwise the
211 * number of written bytes. */
212static int
213write_frame(int sock, struct worker *w)
214{
215 uint32_t netint;
216
217 /* Write the frame size, on 4 bytes */
218 netint = htonl(w->len);
219 if (do_write(sock, &netint, sizeof(netint)) != 4) {
220 w->status_code = SPOE_FRM_ERR_IO;
221 return -1;
222 }
223
224 /* Write the frame */
225 if (do_write(sock, w->buf, w->len) != w->len) {
226 w->status_code = SPOE_FRM_ERR_IO;
227 return -1;
228 }
229 return w->len;
230}
231
/* Encode <i> as a variable-length integer into <buf>. Values below 240 take a
 * single byte. Larger values store their low bits in a first byte whose high
 * nibble is all ones, followed by 7-bit groups whose top bit marks that more
 * bytes follow. This function never fails and returns the number of written
 * bytes. */
static int
encode_spoe_varint(uint64_t i, char *buf)
{
	int written = 0;

	if (i < 240) {
		buf[written++] = (unsigned char)i;
		return written;
	}

	buf[written++] = (unsigned char)i | 240;
	i = (i - 240) >> 4;
	while (i >= 128) {
		buf[written++] = (unsigned char)i | 128;
		i = (i - 128) >> 7;
	}
	buf[written++] = (unsigned char)i;

	return written;
}
253
/* Decode a variable-length integer starting at <buf>. <end> points one byte
 * past the last readable byte. On success the value is stored in <*i> and the
 * number of read bytes is returned. -1 is returned if the buffer's end is
 * reached before the integer is complete, or if the encoding is too long to
 * fit in 64 bits. */
static int
decode_spoe_varint(char *buf, char *end, uint64_t *i)
{
	const unsigned char *msg  = (const unsigned char *)buf;
	const unsigned char *stop = (const unsigned char *)end;
	unsigned int shift = 4;
	int idx = 0;

	/* <end> is exclusive: there must be at least one byte to read */
	if (msg >= stop)
		return -1;

	*i = msg[0];
	if (msg[0] < 240)
		return 1;

	do {
		++idx;
		/* Stop on a truncated buffer, and before the shift count
		 * reaches the width of the type (undefined behaviour). */
		if (msg + idx >= stop || shift >= 64)
			return -1;
		*i += (uint64_t)msg[idx] << shift;
		shift += 7;
	} while (msg[idx] >= 128);

	return (idx + 1);
}
279
/* Encode the <len> bytes at <str> into <dst>, prefixed by <len> encoded as a
 * variable-length integer. An empty string is encoded as a single zero byte.
 * This function never fails and returns the number of written bytes. */
static int
encode_spoe_string(const char *str, size_t len, char *dst)
{
	int hdr;

	if (len == 0) {
		dst[0] = 0;
		return 1;
	}

	hdr = encode_spoe_varint(len, dst);
	memcpy(dst + hdr, str, len);
	return (hdr + len);
}
297
/* Decode a string. Its length is decoded first as a variable-length integer.
 * If it succeeds, and if the whole string lies before <end>, the beginning of
 * the string is saved in <*str>, its length is saved in <*len> and the total
 * number of bytes read is returned. If an error occurred, -1 is returned,
 * <*str> remains NULL and <*len> is 0. */
static int
decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
{
	size_t avail;
	int r;

	*str = NULL;
	*len = 0;

	if (buf > end)
		goto error;
	if ((r = decode_spoe_varint(buf, end, len)) == -1)
		goto error;

	/* Compare the announced length with the bytes actually left, rather
	 * than computing buf + r + *len: the length comes from the peer and
	 * can be large enough to overflow the pointer. */
	avail = (size_t)(end - buf);
	if ((size_t)r > avail || *len > avail - (size_t)r)
		goto error;

	*str = buf + r;
	return (r + (int)*len);

error:
	*len = 0;
	return -1;
}
323
324/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
325 * of bytes read is returned. A types data is composed of a type (1 byte) and
326 * corresponding data:
327 * - boolean: non additional data (0 bytes)
328 * - integers: a variable-length integer (see decode_spoe_varint)
329 * - ipv4: 4 bytes
330 * - ipv6: 16 bytes
331 * - binary and string: a buffer prefixed by its size, a variable-length
332 * integer (see decode_spoe_string) */
333static int
334skip_spoe_data(char *frame, char *end)
335{
336 uint64_t sz = 0;
337 int r, idx = 0;
338
339 if (frame > end)
340 return -1;
341
342 switch (frame[idx++] & SPOE_DATA_T_MASK) {
343 case SPOE_DATA_T_BOOL:
344 idx++;
345 break;
346 case SPOE_DATA_T_INT32:
347 case SPOE_DATA_T_INT64:
348 case SPOE_DATA_T_UINT32:
349 case SPOE_DATA_T_UINT64:
350 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
351 return -1;
352 idx += r;
353 break;
354 case SPOE_DATA_T_IPV4:
355 idx += 4;
356 break;
357 case SPOE_DATA_T_IPV6:
358 idx += 16;
359 break;
360 case SPOE_DATA_T_STR:
361 case SPOE_DATA_T_BIN:
362 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
363 return -1;
364 idx += r + sz;
365 break;
366 }
367
368 if (frame+idx > end)
369 return -1;
370 return idx;
371}
372
373/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
374 * number of read bytes is returned. See skip_spoe_data for details. */
375static int
376decode_spoe_data(char *frame, char *end, struct spoe_data *data)
377{
378 uint64_t sz = 0;
379 int type, r, idx = 0;
380
381 if (frame > end)
382 return -1;
383
384 type = frame[idx++];
385 data->type = (type & SPOE_DATA_T_MASK);
386 switch (data->type) {
387 case SPOE_DATA_T_BOOL:
388 data->u.boolean = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
389 break;
390 case SPOE_DATA_T_INT32:
391 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
392 return -1;
393 data->u.sint32 = sz;
394 idx += r;
395 break;
396 case SPOE_DATA_T_INT64:
397 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
398 return -1;
399 data->u.uint32 = sz;
400 idx += r;
401 break;
402 case SPOE_DATA_T_UINT32:
403 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
404 return -1;
405 data->u.sint64 = sz;
406 idx += r;
407 break;
408 case SPOE_DATA_T_UINT64:
409 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
410 return -1;
411 data->u.uint64 = sz;
412 idx += r;
413 break;
414 case SPOE_DATA_T_IPV4:
415 if (frame+idx+4 > end)
416 return -1;
417 memcpy(&data->u.ipv4, frame+idx, 4);
418 idx += 4;
419 break;
420 case SPOE_DATA_T_IPV6:
421 if (frame+idx+16 > end)
422 return -1;
423 memcpy(&data->u.ipv6, frame+idx, 16);
424 idx += 16;
425 break;
426 case SPOE_DATA_T_STR:
427 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
428 return -1;
429 idx += r;
430 if (frame+idx+sz > end)
431 return -1;
432 data->u.buffer.str = frame+idx;
433 data->u.buffer.len = sz;
434 idx += sz;
435 break;
436 case SPOE_DATA_T_BIN:
437 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
438 return -1;
439 idx += r;
440 if (frame+idx+sz > end)
441 return -1;
442 data->u.buffer.str = frame+idx;
443 data->u.buffer.len = sz;
444 idx += sz;
445 break;
446 default:
447 break;
448 }
449
450 if (frame+idx > end)
451 return -1;
452 return idx;
453}
454
455
456/* Check the protocol version. It returns -1 if an error occurred, the number of
457 * read bytes otherwise. */
458static int
459check_proto_version(struct worker *w, int idx)
460{
461 char *str;
462 uint64_t sz;
463
464 /* Get the list of all supported versions by HAProxy */
465 if ((w->buf[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
466 w->status_code = SPOE_FRM_ERR_INVALID;
467 return -1;
468 }
469 idx += decode_spoe_string(w->buf+idx, w->buf+w->len, &str, &sz);
470 if (str == NULL) {
471 w->status_code = SPOE_FRM_ERR_INVALID;
472 return -1;
473 }
474
475 /* TODO: Find the right verion in supported ones */
476
477 return idx;
478}
479
480/* Check max frame size value. It returns -1 if an error occurred, the number of
481 * read bytes otherwise. */
482static int
483check_max_frame_size(struct worker *w, int idx)
484{
485 uint64_t sz;
486 int type, i;
487
488 /* Get the max-frame-size value of HAProxy */
489 type = w->buf[idx++];
490 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
491 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
492 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
493 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
494 w->status_code = SPOE_FRM_ERR_INVALID;
495 return -1;
496 }
497 if ((i = decode_spoe_varint(w->buf+idx, w->buf+w->len, &sz)) == -1) {
498 w->status_code = SPOE_FRM_ERR_INVALID;
499 return -1;
500 }
501 idx += i;
502
503 /* Keep the lower value */
504 if (sz < w->size)
505 w->size = sz;
506
507 return idx;
508}
509
510/* Check healthcheck value. It returns -1 if an error occurred, the number of
511 * read bytes otherwise. */
512static int
513check_healthcheck(struct worker *w, int idx)
514{
515 int type;
516
517 /* Get the "healthcheck" value of HAProxy */
518 type = w->buf[idx++];
519 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_BOOL) {
520 w->status_code = SPOE_FRM_ERR_INVALID;
521 return -1;
522 }
523 w->healthcheck = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
524 return idx;
525}
526
527
528/* Decode a HELLO frame received from HAProxy. It returns -1 if an error
529 * occurred, 0 if the frame must be skipped, otherwise the number of read
530 * bytes. */
531static int
532handle_hahello(struct worker *w)
533{
534 char *end = w->buf+w->len;
535 int i, idx = 0;
536
537 /* Check frame type */
538 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_HELLO)
539 goto skip;
540
541 /* Skip flags */
542 idx += 4;
543
544 /* stream-id and frame-id must be cleared */
545 if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
546 w->status_code = SPOE_FRM_ERR_INVALID;
547 goto error;
548 }
549 idx += 2;
550
551 /* Loop on K/V items */
552 while (idx < w->len) {
553 char *str;
554 uint64_t sz;
555
556 /* Decode the item name */
557 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
558 if (str == NULL) {
559 w->status_code = SPOE_FRM_ERR_INVALID;
560 goto error;
561 }
562
563 /* Check "supported-versions" K/V item */
564 if (!memcmp(str, "supported-versions", sz)) {
565 if ((i = check_proto_version(w, idx)) == -1)
566 goto error;
567 idx = i;
568 }
569 /* Check "max-frame-size" K/V item "*/
570 else if (!memcmp(str, "max-frame-size", sz)) {
571 if ((i = check_max_frame_size(w, idx)) == -1)
572 goto error;
573 idx = i;
574 }
575 /* Check "healthcheck" K/V item "*/
576 else if (!memcmp(str, "healthcheck", sz)) {
577 if ((i = check_healthcheck(w, idx)) == -1)
578 goto error;
579 idx = i;
580 }
581 /* Skip "capabilities" K/V item for now */
582 else {
583 /* Silently ignore unknown item */
584 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
585 w->status_code = SPOE_FRM_ERR_INVALID;
586 goto error;
587 }
588 idx += i;
589 }
590 }
591
592 return idx;
593skip:
594 return 0;
595error:
596 return -1;
597}
598
599/* Decode a DISCONNECT frame received from HAProxy. It returns -1 if an error
600 * occurred, 0 if the frame must be skipped, otherwise the number of read
601 * bytes. */
602static int
603handle_hadiscon(struct worker *w)
604{
605 char *end = w->buf+w->len;
606 int i, idx = 0;
607
608 /* Check frame type */
609 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_DISCON)
610 goto skip;
611
612 /* Skip flags */
613 idx += 4;
614
615 /* stream-id and frame-id must be cleared */
616 if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
617 w->status_code = SPOE_FRM_ERR_INVALID;
618 goto error;
619 }
620 idx += 2;
621
622 /* Loop on K/V items */
623 while (idx < w->len) {
624 char *str;
625 uint64_t sz;
626
627 /* Decode item key */
628 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
629 if (str == NULL) {
630 w->status_code = SPOE_FRM_ERR_INVALID;
631 goto error;
632 }
633 /* Silently ignore unknown item */
634 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
635 w->status_code = SPOE_FRM_ERR_INVALID;
636 goto error;
637 }
638 idx += i;
639 }
640
641 w->status_code = SPOE_FRM_ERR_NONE;
642 return idx;
643skip:
644 return 0;
645error:
646 return -1;
647}
648
649/* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
650 * occurred, 0 if the frame must be skipped, otherwise the number of read
651 * bytes. */
652static int
653handle_hanotify(struct worker *w)
654{
655 char *end = w->buf+w->len;
656 uint64_t stream_id, frame_id;
657 int nbargs, i, idx = 0;
658
659 /* Check frame type */
660 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_NOTIFY)
661 goto skip;
662
663 /* Skip flags */
664 idx += 4;
665
666 /* Read the stream-id */
667 if ((i = decode_spoe_varint(w->buf+idx, end, &stream_id)) == -1) {
668 w->status_code = SPOE_FRM_ERR_INVALID;
669 goto error;
670 }
671 idx += i;
672
673 /* Read the frame-id */
674 if ((i = decode_spoe_varint(w->buf+idx, end, &frame_id)) == -1) {
675 w->status_code = SPOE_FRM_ERR_INVALID;
676 goto error;
677 }
678 idx += i;
679
680 w->stream_id = (unsigned int)stream_id;
681 w->frame_id = (unsigned int)frame_id;
682
683 DEBUG("Notify frame received: stream-id=%u - frame-id=%u",
684 w->stream_id, w->frame_id);
685
686 /* Loop on messages */
687 while (idx < w->len) {
688 char *str;
689 uint64_t sz;
690
691 /* Decode the message name */
692 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
693 if (str == NULL) {
694 w->status_code = SPOE_FRM_ERR_INVALID;
695 goto error;
696 }
697 DEBUG(" Message '%.*s' received", (int)sz, str);
698
699 nbargs = w->buf[idx++];
700 if (!memcmp(str, "check-client-ip", sz)) {
701 struct spoe_data data;
702
703 memset(&data, 0, sizeof(data));
704
705 if (nbargs != 1) {
706 w->status_code = SPOE_FRM_ERR_INVALID;
707 goto error;
708 }
709 if ((i = decode_spoe_string(w->buf+idx, end, &str, &sz)) == -1) {
710 w->status_code = SPOE_FRM_ERR_INVALID;
711 goto error;
712 }
713 idx += i;
714 if ((i = decode_spoe_data(w->buf+idx, end, &data)) == -1) {
715 w->status_code = SPOE_FRM_ERR_INVALID;
716 goto error;
717 }
718 idx += i;
719 if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV4)
720 check_ipv4_reputation(w, &data.u.ipv4);
721 else if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV6)
722 check_ipv6_reputation(w, &data.u.ipv6);
723 else {
724 w->status_code = SPOE_FRM_ERR_INVALID;
725 goto error;
726 }
727 }
728 else {
729 while (nbargs-- > 0) {
730 /* Silently ignore argument: its name and its value */
731 if ((i = decode_spoe_string(w->buf+idx, end, &str, &sz)) == -1) {
732 w->status_code = SPOE_FRM_ERR_INVALID;
733 goto error;
734 }
735 idx += i;
736 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
737 w->status_code = SPOE_FRM_ERR_INVALID;
738 goto error;
739 }
740 idx += i;
741 }
742 }
743 }
744
745 return idx;
746skip:
747 return 0;
748error:
749 return -1;
750}
751
752/* Encode a HELLO frame to send it to HAProxy. It returns -1 if an error
753 * occurred, the number of written bytes otherwise. */
754static int
755prepare_agenthello(struct worker *w)
756{
757 int idx = 0;
758
759 /* Frame Type */
760 w->buf[idx++] = SPOE_FRM_T_AGENT_HELLO;
761
762 /* No flags for now */
763 memset(w->buf+idx, 0, 4); /* No flags */
764 idx += 4;
765
766 /* No stream-id and frame-id for HELLO frames */
767 w->buf[idx++] = 0;
768 w->buf[idx++] = 0;
769
770 /* "version" K/V item */
771 idx += encode_spoe_string("version", 7, w->buf+idx);
772 w->buf[idx++] = SPOE_DATA_T_STR;
773 idx += encode_spoe_string(SPOP_VERSION, SLEN(SPOP_VERSION), w->buf+idx);
774
775 /* "max-frame-size" K/V item */
776 idx += encode_spoe_string("max-frame-size", 14, w->buf+idx);
777 w->buf[idx++] = SPOE_DATA_T_UINT32;
778 idx += encode_spoe_varint(w->size, w->buf+idx);
779
780 /* "capabilities" K/V item */
781 idx += encode_spoe_string("capabilities", 12, w->buf+idx);
782 w->buf[idx++] = SPOE_DATA_T_STR;
783 idx += encode_spoe_string(SPOA_CAPABILITIES, SLEN(SPOA_CAPABILITIES), w->buf+idx);
784
785 w->len = idx;
786 return idx;
787}
788
789/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred,
790 * the number of written bytes otherwise. */
791static int
792prepare_agentack(struct worker *w)
793{
794 int idx = 0;
795
796 /* Frame type */
797 w->buf[idx++] = SPOE_FRM_T_AGENT_ACK;
798
799 /* No flags for now */
800 memset(w->buf+idx, 0, 4); /* No flags */
801 idx += 4;
802
803 /* Set stream-id and frame-id for ACK frames */
804 idx += encode_spoe_varint(w->stream_id, w->buf+idx);
805 idx += encode_spoe_varint(w->frame_id, w->buf+idx);
806
807 /* Data */
808 if (w->ip_score == -1)
809 goto out;
810
811 w->buf[idx++] = SPOE_ACT_T_SET_VAR; /* Action type */
812 w->buf[idx++] = 3; /* Number of args */
813 w->buf[idx++] = SPOE_SCOPE_SESS; /* Arg 1: the scope */
814 idx += encode_spoe_string("ip_score", 8, w->buf+idx); /* Arg 2: variable name */
815 w->buf[idx++] = SPOE_DATA_T_UINT32;
816 idx += encode_spoe_varint(w->ip_score, w->buf+idx); /* Arg 3: variable value */
817out:
818 w->len = idx;
819 return idx;
820}
821
822/* Encode a DISCONNECT frame to send it to HAProxy. It returns -1 if an error
823 * occurred, the number of written bytes otherwise. */
824static int
825prepare_agentdicon(struct worker *w)
826{
827 const char *reason;
828 int rlen, idx = 0;
829
830 if (w->status_code >= SPOE_FRM_ERRS)
831 w->status_code = SPOE_FRM_ERR_UNKNOWN;
832 reason = spoe_frm_err_reasons[w->status_code];
833 rlen = strlen(reason);
834
835 /* Frame type */
836 w->buf[idx++] = SPOE_FRM_T_AGENT_DISCON;
837
838 /* No flags for now */
839 memset(w->buf+idx, 0, 4);
840 idx += 4;
841
842 /* No stream-id and frame-id for DISCONNECT frames */
843 w->buf[idx++] = 0;
844 w->buf[idx++] = 0;
845
846 /* There are 2 mandatory items: "status-code" and "message" */
847
848 /* "status-code" K/V item */
849 idx += encode_spoe_string("status-code", 11, w->buf+idx);
850 w->buf[idx++] = SPOE_DATA_T_UINT32;
851 idx += encode_spoe_varint(w->status_code, w->buf+idx);
852
853 /* "message" K/V item */
854 idx += encode_spoe_string("message", 7, w->buf+idx);
855 w->buf[idx++] = SPOE_DATA_T_STR;
856 idx += encode_spoe_string(reason, rlen, w->buf+idx);
857
858 w->len = idx;
859 return idx;
860}
861
862static int
863hello_handshake(int sock, struct worker *w)
864{
865 if (read_frame(sock, w) < 0) {
866 LOG("Failed to read Haproxy HELLO frame");
867 goto error;
868 }
869 if (handle_hahello(w) < 0) {
870 LOG("Failed to handle Haproxy HELLO frame");
871 goto error;
872 }
873 if (prepare_agenthello(w) < 0) {
874 LOG("Failed to prepare Agent HELLO frame");
875 goto error;
876 }
877 if (write_frame(sock, w) < 0) {
878 LOG("Failed to write Agent frame");
879 goto error;
880 }
881 DEBUG("Hello handshake done: version=%s - max-frame-size=%u - healthcheck=%s",
882 SPOP_VERSION, w->size, (w->healthcheck ? "true" : "false"));
883 return 0;
884error:
885 return -1;
886}
887
888static int
889notify_ack_roundtip(int sock, struct worker *w)
890{
891 if (read_frame(sock, w) < 0) {
892 LOG("Failed to read Haproxy NOTIFY frame");
893 goto error_or_quit;
894 }
895 if (handle_hadiscon(w) != 0) {
896 if (w->status_code != SPOE_FRM_ERR_NONE)
897 LOG("Failed to handle Haproxy DISCONNECT frame");
898 DEBUG("Disconnect frame received: reason=%s",
899 spoe_frm_err_reasons[w->status_code]);
900 goto error_or_quit;
901 }
902 if (handle_hanotify(w) < 0) {
903 LOG("Failed to handle Haproxy NOTIFY frame");
904 goto error_or_quit;
905 }
906 if (prepare_agentack(w) < 0) {
907 LOG("Failed to prepare Agent ACK frame");
908 goto error_or_quit;
909 }
910 if (write_frame(sock, w) < 0) {
911 LOG("Failed to write Agent ACK frame");
912 goto error_or_quit;
913 }
914 DEBUG("Ack frame sent: stream-id=%u - frame-id=%u",
915 w->stream_id, w->frame_id);
916 return 0;
917error_or_quit:
918 return -1;
919}
920
921static void *
Thierry FOURNIER5301ed12018-02-23 11:59:15 +0100922spoa_worker(void *data)
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100923{
924 struct worker w;
925 struct sockaddr_in client;
926 int *info = (int *)data;
927 int csock, lsock = info[0];
928
929 signal(SIGPIPE, SIG_IGN);
930 pthread_setspecific(worker_id, &info[1]);
931
932 while (1) {
933 socklen_t sz = sizeof(client);
934
935 if ((csock = accept(lsock, (struct sockaddr *)&client, &sz)) < 0) {
936 LOG("Failed to accept client connection: %m");
937 goto out;
938 }
939 memset(&w, 0, sizeof(w));
940 w.id = info[1];
941 w.size = MAX_FRAME_SIZE;
942
943 DEBUG("New connection from HAProxy accepted");
944
945 if (hello_handshake(csock, &w) < 0)
946 goto disconnect;
947 if (w.healthcheck == true)
948 goto close;
949 while (1) {
950 w.ip_score = -1;
951 if (notify_ack_roundtip(csock, &w) < 0)
952 break;
953 }
954
955 disconnect:
956 if (w.status_code == SPOE_FRM_ERR_IO) {
957 LOG("Close the client socket because of I/O errors");
958 goto close;
959 }
960 if (prepare_agentdicon(&w) < 0) {
961 LOG("Failed to prepare Agent DISCONNECT frame");
962 goto close;
963 }
964 if (write_frame(csock, &w) < 0) {
965 LOG("Failed to write Agent DISCONNECT frame");
966 goto close;
967 }
968 DEBUG("Disconnect frame sent: reason=%s",
969 spoe_frm_err_reasons[w.status_code]);
970
971 close:
972 close(csock);
973 }
974
975out:
976 free(info);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +0100977#if 0
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100978 pthread_exit(NULL);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +0100979#endif
980 return NULL;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100981}
982
/* Fork a child process that runs <ps>(<data>). In the parent, the child's pid
 * is stored in <*pid> and 0 is returned, or -1 if fork() failed. The child
 * never returns from this function: it exits once <ps> returns, so that it
 * cannot resume the caller's code and start forking workers itself. */
int process_create(pid_t *pid, void *(*ps)(void *), void *data)
{
	*pid = fork();
	if (*pid == -1)
		return -1;
	if (*pid > 0)
		return 0;

	/* Child process */
	ps(data);
	exit(EXIT_SUCCESS);
}
993
/* Print the command-line help on stderr. <prog> is the program name shown in
 * the synopsis. */
static void
usage(char *prog)
{
	fprintf(stderr,
		"Usage: %s [-h] [-d] [-p <port>] [-n <num-workers>]\n"
		" -h Print this message\n"
		" -d Enable the debug mode\n"
		" -p <port> Specify the port to listen on (default: 12345)\n"
		" -n <num-workers> Specify the number of workers (default: 5)\n",
		prog);
}
1003
1004int
1005main(int argc, char **argv)
1006{
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001007#if 0
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001008 pthread_t *ts = NULL;
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001009#endif
1010 pid_t *pids;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001011 struct sockaddr_in server;
1012 int i, sock, opt, nbworkers, port;
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001013 int status;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001014
1015 nbworkers = NUM_WORKERS;
1016 port = DEFAULT_PORT;
1017 while ((opt = getopt(argc, argv, "hdn:p:")) != -1) {
1018 switch (opt) {
1019 case 'h':
1020 usage(argv[0]);
1021 return EXIT_SUCCESS;
1022 case 'd':
1023 debug = true;
1024 break;
1025 case 'n':
1026 nbworkers = atoi(optarg);
1027 break;
1028 case 'p':
1029 port = atoi(optarg);
1030 break;
1031 default:
1032 usage(argv[0]);
1033 return EXIT_FAILURE;
1034 }
1035 }
1036
1037 if (nbworkers <= 0) {
1038 fprintf(stderr, "%s: Invalid number of workers '%d'\n",
1039 argv[0], nbworkers);
1040 goto error;
1041 }
1042 if (port <= 0) {
1043 fprintf(stderr, "%s: Invalid port '%d'\n", argv[0], port);
1044 goto error;
1045 }
1046
1047 if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1048 fprintf(stderr, "Failed creating socket: %m\n");
1049 goto error;
1050 }
1051
1052 setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (int []){1}, sizeof(int));
1053 setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (int []){1}, sizeof(int));
1054
1055 memset(&server, 0, sizeof(server));
1056 server.sin_family = AF_INET;
1057 server.sin_addr.s_addr = INADDR_ANY;
1058 server.sin_port = htons(port);
1059
1060 if (bind(sock, (struct sockaddr *)&server, sizeof(server)) < 0) {
1061 fprintf(stderr, "Failed to bind the socket: %m\n");
1062 goto error;
1063 }
1064
1065 if (listen(sock , 10) < 0) {
1066 fprintf(stderr, "Failed to listen on the socket: %m\n");
1067 goto error;
1068 }
1069 fprintf(stderr, "SPOA is listening on port %d\n", port);
1070
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001071 pthread_key_create(&worker_id, NULL);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001072
1073 /* Initialise the server in thread mode. This code is commented
1074 * out and not deleted, because later I expect to work with
1075 * process ansd threads. This first version just support processes.
1076 */
1077#if 0
1078 ts = calloc(nbworkers, sizeof(*ts));
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001079 for (i = 0; i < nbworkers; i++) {
1080 int *info = calloc(2, sizeof(*info));
1081
1082 info[0] = sock;
1083 info[1] = i+1;
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001084
Thierry FOURNIER5301ed12018-02-23 11:59:15 +01001085 if (pthread_create(&ts[i], NULL, spoa_worker, info) < 0) {
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001086 fprintf(stderr, "Failed to create thread %d: %m\n", i+1);
1087 goto error;
1088 }
1089 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1090 }
1091
1092 for (i = 0; i < nbworkers; i++) {
1093 pthread_join(ts[i], NULL);
1094 fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
1095 }
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001096 free(ts);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001097#endif
1098
1099 /* Start processes */
1100 pids = calloc(nbworkers, sizeof(*pids));
1101 if (!pids) {
1102 fprintf(stderr, "Out of memory error\n");
1103 goto error;
1104 }
1105 for (i = 0; i < nbworkers; i++) {
1106 int *info = calloc(2, sizeof(*info));
1107
1108 info[0] = sock;
1109 info[1] = i+1;
1110
1111 if (process_create(&pids[i], spoa_worker, info) == -1) {
1112 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1113 goto error;
1114 }
1115 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1116 }
1117 for (i = 0; i < nbworkers; i++) {
1118 waitpid(pids[0], &status, 0);
1119 fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
1120 }
1121
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001122 close(sock);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001123 pthread_key_delete(worker_id);
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001124 return EXIT_SUCCESS;
1125error:
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001126 return EXIT_FAILURE;
1127}