blob: 94be6c05502e07bc5b551a1c5c8c02ca17a9bfea [file] [log] [blame]
/*
 * A Random IP reputation service acting as a Stream Processing Offload Agent
 *
 * This is a very simple service that implements a "random" IP reputation
 * service. It will return random scores for all checked IP addresses. It only
 * shows you how to implement an IP reputation service or similar kinds of
 * services using the SPOE.
 *
 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
 * Copyright 2018 OZON / Thierry Fournier <thierry.fournier@ozon.io>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 *
 */
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <stdbool.h>
22#include <unistd.h>
23#include <signal.h>
24#include <pthread.h>
25#include <sys/time.h>
26#include <sys/types.h>
27#include <sys/socket.h>
28#include <netinet/in.h>
29#include <netinet/tcp.h>
30#include <arpa/inet.h>
31
Thierry FOURNIER4aec0a42018-02-23 11:42:57 +010032#include "spoa.h"
33
/* TCP port the agent listens on when none is given on the command line */
#define DEFAULT_PORT 12345
/* Number of worker threads started when none is given on the command line */
#define NUM_WORKERS  5

/* Length of a string literal or char array, without its trailing NUL byte */
#define SLEN(str) (sizeof(str)-1)
38
/* Frame types exchanged between HAProxy and the agents. Each value is the
 * first byte of a frame. */
enum spoe_frame_type {
	/* Frames emitted by HAProxy */
	SPOE_FRM_T_HAPROXY_HELLO  = 1,
	SPOE_FRM_T_HAPROXY_DISCON = 2,
	SPOE_FRM_T_HAPROXY_NOTIFY = 3,

	/* Frames emitted by the agents */
	SPOE_FRM_T_AGENT_HELLO    = 101,
	SPOE_FRM_T_AGENT_DISCON   = 102,
	SPOE_FRM_T_AGENT_ACK      = 103
};
51
/* Errors triggered by the SPOE applet. These values are used as the
 * "status-code" of DISCONNECT frames and as indexes into the table of error
 * reasons. */
enum spoe_frame_error {
	SPOE_FRM_ERR_NONE           = 0,
	SPOE_FRM_ERR_IO             = 1,
	SPOE_FRM_ERR_TOUT           = 2,
	SPOE_FRM_ERR_TOO_BIG        = 3,
	SPOE_FRM_ERR_INVALID        = 4,
	SPOE_FRM_ERR_NO_VSN         = 5,
	SPOE_FRM_ERR_NO_FRAME_SIZE  = 6,
	SPOE_FRM_ERR_NO_CAP         = 7,
	SPOE_FRM_ERR_BAD_VSN        = 8,
	SPOE_FRM_ERR_BAD_FRAME_SIZE = 9,
	SPOE_FRM_ERR_UNKNOWN        = 99,
	SPOE_FRM_ERRS               = 100, /* number of slots, not an error */
};
67
/* Actions an agent may put in an ACK frame */
enum spoe_action_type {
	SPOE_ACT_T_SET_VAR   = 1,
	SPOE_ACT_T_UNSET_VAR = 2,
	SPOE_ACT_TYPES       = 3, /* one past the last action type */
};
74
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010075
/* The type byte of a typed data carries the data type in its low nibble and
 * flags in its high nibble. These masks extract each part. */
#define SPOE_DATA_T_MASK  0x0F
#define SPOE_DATA_FL_MASK 0xF0

/* Flag values encoding a Boolean directly in the type byte */
#define SPOE_DATA_FL_FALSE 0x00
#define SPOE_DATA_FL_TRUE  0x10
83static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
84 [SPOE_FRM_ERR_NONE] = "normal",
85 [SPOE_FRM_ERR_IO] = "I/O error",
86 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
87 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
88 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
89 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
90 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
91 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
92 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
93 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
94 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
95};
96
/* Set to true by the "-d" command-line option */
bool debug = false;

/* Thread-specific key; each worker stores a pointer to its identifier in it */
pthread_key_t worker_id;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010099
100static void
101check_ipv4_reputation(struct worker *w, struct in_addr *ipv4)
102{
103 char str[INET_ADDRSTRLEN];
104
105 if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL)
106 return;
107
108 w->ip_score = random() % 100;
109
110 DEBUG(" IP score for %.*s is: %d", INET_ADDRSTRLEN, str, w->ip_score);
111}
112
113static void
114check_ipv6_reputation(struct worker *w, struct in6_addr *ipv6)
115{
116 char str[INET6_ADDRSTRLEN];
117
118 if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL)
119 return;
120
121 w->ip_score = random() % 100;
122
123 DEBUG(" IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, w->ip_score);
124}
125
/* Read up to <read_len> bytes from <sock> into <buf>, waiting with select()
 * until the socket is readable before each read. It returns the number of
 * bytes read, which is lower than <read_len> when the peer closes the
 * connection first, or -1 if an error occurred. */
static int
do_read(int sock, void *buf, int read_len)
{
	char *dst = buf; /* arithmetic on "void *" is not standard C */
	fd_set readfds;
	int n = 0, total = 0;

	/* FD_SET() is undefined for descriptors outside [0, FD_SETSIZE) */
	if (sock < 0 || sock >= FD_SETSIZE)
		return -1;

	while (total < read_len) {
		/* select() overwrites the set, so rebuild it on each round */
		FD_ZERO(&readfds);
		FD_SET(sock, &readfds);

		if (select(sock + 1, &readfds, NULL, NULL, NULL) == -1)
			return -1;
		if (!FD_ISSET(sock, &readfds))
			return -1;

		n = read(sock, dst + total, read_len - total);
		if (n <= 0)
			break;

		total += n;
	}

	return (n == -1) ? -1 : total;
}
151
/* Write up to <write_len> bytes from <buf> to <sock>, waiting with select()
 * until the socket is writable before each write. It returns the number of
 * bytes written, which is lower than <write_len> if write() returned 0, or -1
 * if an error occurred. */
static int
do_write(int sock, void *buf, int write_len)
{
	const char *src = buf; /* arithmetic on "void *" is not standard C */
	fd_set writefds;
	int n = 0, total = 0;

	/* FD_SET() is undefined for descriptors outside [0, FD_SETSIZE) */
	if (sock < 0 || sock >= FD_SETSIZE)
		return -1;

	while (total < write_len) {
		/* select() overwrites the set, so rebuild it on each round */
		FD_ZERO(&writefds);
		FD_SET(sock, &writefds);

		if (select(sock + 1, NULL, &writefds, NULL, NULL) == -1)
			return -1;
		if (!FD_ISSET(sock, &writefds))
			return -1;

		n = write(sock, src + total, write_len - total);
		if (n <= 0)
			break;

		total += n;
	}

	return (n == -1) ? -1 : total;
}
177
178/* Receive a frame sent by HAProxy. It returns -1 if an error occurred,
179 * otherwise the number of read bytes.*/
180static int
181read_frame(int sock, struct worker *w)
182{
183 uint32_t netint;
184 unsigned int framesz;
185
186 /* Read the frame size, on 4 bytes */
187 if (do_read(sock, &netint, sizeof(netint)) != 4) {
188 w->status_code = SPOE_FRM_ERR_IO;
189 return -1;
190 }
191
192 /* Check it against the max size */
193 framesz = ntohl(netint);
194 if (framesz > w->size) {
195 w->status_code = SPOE_FRM_ERR_TOO_BIG;
196 return -1;
197 }
198
199 /* Read the frame */
200 if (do_read(sock, w->buf, framesz) != framesz) {
201 w->status_code = SPOE_FRM_ERR_IO;
202 return -1;
203 }
204
205 w->len = framesz;
206 return framesz;
207}
208
209/* Send a frame to HAProxy. It returns -1 if an error occurred, otherwise the
210 * number of written bytes. */
211static int
212write_frame(int sock, struct worker *w)
213{
214 uint32_t netint;
215
216 /* Write the frame size, on 4 bytes */
217 netint = htonl(w->len);
218 if (do_write(sock, &netint, sizeof(netint)) != 4) {
219 w->status_code = SPOE_FRM_ERR_IO;
220 return -1;
221 }
222
223 /* Write the frame */
224 if (do_write(sock, w->buf, w->len) != w->len) {
225 w->status_code = SPOE_FRM_ERR_IO;
226 return -1;
227 }
228 return w->len;
229}
230
/* Encode <i> as a variable-length integer into <buf>. Values below 240 take a
 * single byte. Otherwise the first byte has its 4 high bits set and each
 * following byte but the last has its high bit set. This function never fails
 * and returns the number of written bytes. */
static int
encode_spoe_varint(uint64_t i, char *buf)
{
	int pos = 0;

	if (i < 240) {
		buf[pos] = (unsigned char)i;
		return 1;
	}

	buf[pos++] = (unsigned char)i | 240;
	i = (i - 240) >> 4;
	while (i >= 128) {
		buf[pos++] = (unsigned char)i | 128;
		i = (i - 128) >> 7;
	}
	buf[pos++] = (unsigned char)i;
	return pos;
}
252
/* Decode a variable-length integer from <buf> into <*i>. <end> points one byte
 * past the last readable byte. On success, the number of read bytes is
 * returned. -1 is returned if the end of the buffer is reached before the
 * integer is complete, or if the encoding is too long to fit in 64 bits. */
static int
decode_spoe_varint(char *buf, char *end, uint64_t *i)
{
	unsigned char *msg  = (unsigned char *)buf;
	unsigned char *stop = (unsigned char *)end;
	int idx = 0;

	/* <end> is exclusive: there must be at least one byte to read */
	if (msg >= stop)
		return -1;

	*i = msg[0];
	if (msg[0] < 240)
		return 1;

	do {
		int shift;

		++idx;
		if (msg + idx >= stop)
			return -1;

		/* A 64-bit value never needs a shift of more than 60 bits.
		 * Shifting a uint64_t by 64 or more is undefined, so reject
		 * over-long encodings. */
		shift = 4 + 7 * (idx - 1);
		if (shift >= 64)
			return -1;

		*i += (uint64_t)msg[idx] << shift;
	} while (msg[idx] >= 128);
	return (idx + 1);
}
278
/* Encode the <len> bytes at <str> into <dst>: the length is written first as a
 * variable-length integer, followed by the bytes themselves. An empty string
 * is encoded as a single zero byte. This function never fails and returns the
 * number of written bytes. */
static int
encode_spoe_string(const char *str, size_t len, char *dst)
{
	int hdr;

	if (len == 0) {
		dst[0] = 0;
		return 1;
	}

	hdr = encode_spoe_varint(len, dst);
	memcpy(dst + hdr, str, len);
	return (hdr + len);
}
296
/* Decode a string. Its length is decoded first as a variable-length integer.
 * If that succeeds and the whole string fits before <end>, the beginning of
 * the string is saved in <*str>, its length in <*len>, and the total number of
 * bytes read is returned. If an error occurred, -1 is returned and <*str>
 * remains NULL. */
static int
decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
{
	char *start;
	int r;

	*str = NULL;
	*len = 0;

	r = decode_spoe_varint(buf, end, len);
	if (r == -1)
		return -1;

	/* The length comes from the peer. Compare it with the number of bytes
	 * left instead of computing "start + *len", which may overflow the
	 * pointer for huge values. */
	start = buf + r;
	if (start > end || *len > (uint64_t)(end - start))
		return -1;

	*str = start;
	return (r + *len);
}
322
323/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
324 * of bytes read is returned. A types data is composed of a type (1 byte) and
325 * corresponding data:
326 * - boolean: non additional data (0 bytes)
327 * - integers: a variable-length integer (see decode_spoe_varint)
328 * - ipv4: 4 bytes
329 * - ipv6: 16 bytes
330 * - binary and string: a buffer prefixed by its size, a variable-length
331 * integer (see decode_spoe_string) */
332static int
333skip_spoe_data(char *frame, char *end)
334{
335 uint64_t sz = 0;
336 int r, idx = 0;
337
338 if (frame > end)
339 return -1;
340
341 switch (frame[idx++] & SPOE_DATA_T_MASK) {
342 case SPOE_DATA_T_BOOL:
343 idx++;
344 break;
345 case SPOE_DATA_T_INT32:
346 case SPOE_DATA_T_INT64:
347 case SPOE_DATA_T_UINT32:
348 case SPOE_DATA_T_UINT64:
349 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
350 return -1;
351 idx += r;
352 break;
353 case SPOE_DATA_T_IPV4:
354 idx += 4;
355 break;
356 case SPOE_DATA_T_IPV6:
357 idx += 16;
358 break;
359 case SPOE_DATA_T_STR:
360 case SPOE_DATA_T_BIN:
361 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
362 return -1;
363 idx += r + sz;
364 break;
365 }
366
367 if (frame+idx > end)
368 return -1;
369 return idx;
370}
371
372/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
373 * number of read bytes is returned. See skip_spoe_data for details. */
374static int
375decode_spoe_data(char *frame, char *end, struct spoe_data *data)
376{
377 uint64_t sz = 0;
378 int type, r, idx = 0;
379
380 if (frame > end)
381 return -1;
382
383 type = frame[idx++];
384 data->type = (type & SPOE_DATA_T_MASK);
385 switch (data->type) {
386 case SPOE_DATA_T_BOOL:
387 data->u.boolean = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
388 break;
389 case SPOE_DATA_T_INT32:
390 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
391 return -1;
392 data->u.sint32 = sz;
393 idx += r;
394 break;
395 case SPOE_DATA_T_INT64:
396 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
397 return -1;
398 data->u.uint32 = sz;
399 idx += r;
400 break;
401 case SPOE_DATA_T_UINT32:
402 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
403 return -1;
404 data->u.sint64 = sz;
405 idx += r;
406 break;
407 case SPOE_DATA_T_UINT64:
408 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
409 return -1;
410 data->u.uint64 = sz;
411 idx += r;
412 break;
413 case SPOE_DATA_T_IPV4:
414 if (frame+idx+4 > end)
415 return -1;
416 memcpy(&data->u.ipv4, frame+idx, 4);
417 idx += 4;
418 break;
419 case SPOE_DATA_T_IPV6:
420 if (frame+idx+16 > end)
421 return -1;
422 memcpy(&data->u.ipv6, frame+idx, 16);
423 idx += 16;
424 break;
425 case SPOE_DATA_T_STR:
426 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
427 return -1;
428 idx += r;
429 if (frame+idx+sz > end)
430 return -1;
431 data->u.buffer.str = frame+idx;
432 data->u.buffer.len = sz;
433 idx += sz;
434 break;
435 case SPOE_DATA_T_BIN:
436 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
437 return -1;
438 idx += r;
439 if (frame+idx+sz > end)
440 return -1;
441 data->u.buffer.str = frame+idx;
442 data->u.buffer.len = sz;
443 idx += sz;
444 break;
445 default:
446 break;
447 }
448
449 if (frame+idx > end)
450 return -1;
451 return idx;
452}
453
454
455/* Check the protocol version. It returns -1 if an error occurred, the number of
456 * read bytes otherwise. */
457static int
458check_proto_version(struct worker *w, int idx)
459{
460 char *str;
461 uint64_t sz;
462
463 /* Get the list of all supported versions by HAProxy */
464 if ((w->buf[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
465 w->status_code = SPOE_FRM_ERR_INVALID;
466 return -1;
467 }
468 idx += decode_spoe_string(w->buf+idx, w->buf+w->len, &str, &sz);
469 if (str == NULL) {
470 w->status_code = SPOE_FRM_ERR_INVALID;
471 return -1;
472 }
473
474 /* TODO: Find the right verion in supported ones */
475
476 return idx;
477}
478
479/* Check max frame size value. It returns -1 if an error occurred, the number of
480 * read bytes otherwise. */
481static int
482check_max_frame_size(struct worker *w, int idx)
483{
484 uint64_t sz;
485 int type, i;
486
487 /* Get the max-frame-size value of HAProxy */
488 type = w->buf[idx++];
489 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
490 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
491 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
492 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
493 w->status_code = SPOE_FRM_ERR_INVALID;
494 return -1;
495 }
496 if ((i = decode_spoe_varint(w->buf+idx, w->buf+w->len, &sz)) == -1) {
497 w->status_code = SPOE_FRM_ERR_INVALID;
498 return -1;
499 }
500 idx += i;
501
502 /* Keep the lower value */
503 if (sz < w->size)
504 w->size = sz;
505
506 return idx;
507}
508
509/* Check healthcheck value. It returns -1 if an error occurred, the number of
510 * read bytes otherwise. */
511static int
512check_healthcheck(struct worker *w, int idx)
513{
514 int type;
515
516 /* Get the "healthcheck" value of HAProxy */
517 type = w->buf[idx++];
518 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_BOOL) {
519 w->status_code = SPOE_FRM_ERR_INVALID;
520 return -1;
521 }
522 w->healthcheck = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
523 return idx;
524}
525
526
527/* Decode a HELLO frame received from HAProxy. It returns -1 if an error
528 * occurred, 0 if the frame must be skipped, otherwise the number of read
529 * bytes. */
530static int
531handle_hahello(struct worker *w)
532{
533 char *end = w->buf+w->len;
534 int i, idx = 0;
535
536 /* Check frame type */
537 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_HELLO)
538 goto skip;
539
540 /* Skip flags */
541 idx += 4;
542
543 /* stream-id and frame-id must be cleared */
544 if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
545 w->status_code = SPOE_FRM_ERR_INVALID;
546 goto error;
547 }
548 idx += 2;
549
550 /* Loop on K/V items */
551 while (idx < w->len) {
552 char *str;
553 uint64_t sz;
554
555 /* Decode the item name */
556 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
557 if (str == NULL) {
558 w->status_code = SPOE_FRM_ERR_INVALID;
559 goto error;
560 }
561
562 /* Check "supported-versions" K/V item */
563 if (!memcmp(str, "supported-versions", sz)) {
564 if ((i = check_proto_version(w, idx)) == -1)
565 goto error;
566 idx = i;
567 }
568 /* Check "max-frame-size" K/V item "*/
569 else if (!memcmp(str, "max-frame-size", sz)) {
570 if ((i = check_max_frame_size(w, idx)) == -1)
571 goto error;
572 idx = i;
573 }
574 /* Check "healthcheck" K/V item "*/
575 else if (!memcmp(str, "healthcheck", sz)) {
576 if ((i = check_healthcheck(w, idx)) == -1)
577 goto error;
578 idx = i;
579 }
580 /* Skip "capabilities" K/V item for now */
581 else {
582 /* Silently ignore unknown item */
583 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
584 w->status_code = SPOE_FRM_ERR_INVALID;
585 goto error;
586 }
587 idx += i;
588 }
589 }
590
591 return idx;
592skip:
593 return 0;
594error:
595 return -1;
596}
597
598/* Decode a DISCONNECT frame received from HAProxy. It returns -1 if an error
599 * occurred, 0 if the frame must be skipped, otherwise the number of read
600 * bytes. */
601static int
602handle_hadiscon(struct worker *w)
603{
604 char *end = w->buf+w->len;
605 int i, idx = 0;
606
607 /* Check frame type */
608 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_DISCON)
609 goto skip;
610
611 /* Skip flags */
612 idx += 4;
613
614 /* stream-id and frame-id must be cleared */
615 if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
616 w->status_code = SPOE_FRM_ERR_INVALID;
617 goto error;
618 }
619 idx += 2;
620
621 /* Loop on K/V items */
622 while (idx < w->len) {
623 char *str;
624 uint64_t sz;
625
626 /* Decode item key */
627 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
628 if (str == NULL) {
629 w->status_code = SPOE_FRM_ERR_INVALID;
630 goto error;
631 }
632 /* Silently ignore unknown item */
633 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
634 w->status_code = SPOE_FRM_ERR_INVALID;
635 goto error;
636 }
637 idx += i;
638 }
639
640 w->status_code = SPOE_FRM_ERR_NONE;
641 return idx;
642skip:
643 return 0;
644error:
645 return -1;
646}
647
648/* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
649 * occurred, 0 if the frame must be skipped, otherwise the number of read
650 * bytes. */
651static int
652handle_hanotify(struct worker *w)
653{
654 char *end = w->buf+w->len;
655 uint64_t stream_id, frame_id;
656 int nbargs, i, idx = 0;
657
658 /* Check frame type */
659 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_NOTIFY)
660 goto skip;
661
662 /* Skip flags */
663 idx += 4;
664
665 /* Read the stream-id */
666 if ((i = decode_spoe_varint(w->buf+idx, end, &stream_id)) == -1) {
667 w->status_code = SPOE_FRM_ERR_INVALID;
668 goto error;
669 }
670 idx += i;
671
672 /* Read the frame-id */
673 if ((i = decode_spoe_varint(w->buf+idx, end, &frame_id)) == -1) {
674 w->status_code = SPOE_FRM_ERR_INVALID;
675 goto error;
676 }
677 idx += i;
678
679 w->stream_id = (unsigned int)stream_id;
680 w->frame_id = (unsigned int)frame_id;
681
682 DEBUG("Notify frame received: stream-id=%u - frame-id=%u",
683 w->stream_id, w->frame_id);
684
685 /* Loop on messages */
686 while (idx < w->len) {
687 char *str;
688 uint64_t sz;
689
690 /* Decode the message name */
691 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
692 if (str == NULL) {
693 w->status_code = SPOE_FRM_ERR_INVALID;
694 goto error;
695 }
696 DEBUG(" Message '%.*s' received", (int)sz, str);
697
698 nbargs = w->buf[idx++];
699 if (!memcmp(str, "check-client-ip", sz)) {
700 struct spoe_data data;
701
702 memset(&data, 0, sizeof(data));
703
704 if (nbargs != 1) {
705 w->status_code = SPOE_FRM_ERR_INVALID;
706 goto error;
707 }
708 if ((i = decode_spoe_string(w->buf+idx, end, &str, &sz)) == -1) {
709 w->status_code = SPOE_FRM_ERR_INVALID;
710 goto error;
711 }
712 idx += i;
713 if ((i = decode_spoe_data(w->buf+idx, end, &data)) == -1) {
714 w->status_code = SPOE_FRM_ERR_INVALID;
715 goto error;
716 }
717 idx += i;
718 if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV4)
719 check_ipv4_reputation(w, &data.u.ipv4);
720 else if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV6)
721 check_ipv6_reputation(w, &data.u.ipv6);
722 else {
723 w->status_code = SPOE_FRM_ERR_INVALID;
724 goto error;
725 }
726 }
727 else {
728 while (nbargs-- > 0) {
729 /* Silently ignore argument: its name and its value */
730 if ((i = decode_spoe_string(w->buf+idx, end, &str, &sz)) == -1) {
731 w->status_code = SPOE_FRM_ERR_INVALID;
732 goto error;
733 }
734 idx += i;
735 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
736 w->status_code = SPOE_FRM_ERR_INVALID;
737 goto error;
738 }
739 idx += i;
740 }
741 }
742 }
743
744 return idx;
745skip:
746 return 0;
747error:
748 return -1;
749}
750
751/* Encode a HELLO frame to send it to HAProxy. It returns -1 if an error
752 * occurred, the number of written bytes otherwise. */
753static int
754prepare_agenthello(struct worker *w)
755{
756 int idx = 0;
757
758 /* Frame Type */
759 w->buf[idx++] = SPOE_FRM_T_AGENT_HELLO;
760
761 /* No flags for now */
762 memset(w->buf+idx, 0, 4); /* No flags */
763 idx += 4;
764
765 /* No stream-id and frame-id for HELLO frames */
766 w->buf[idx++] = 0;
767 w->buf[idx++] = 0;
768
769 /* "version" K/V item */
770 idx += encode_spoe_string("version", 7, w->buf+idx);
771 w->buf[idx++] = SPOE_DATA_T_STR;
772 idx += encode_spoe_string(SPOP_VERSION, SLEN(SPOP_VERSION), w->buf+idx);
773
774 /* "max-frame-size" K/V item */
775 idx += encode_spoe_string("max-frame-size", 14, w->buf+idx);
776 w->buf[idx++] = SPOE_DATA_T_UINT32;
777 idx += encode_spoe_varint(w->size, w->buf+idx);
778
779 /* "capabilities" K/V item */
780 idx += encode_spoe_string("capabilities", 12, w->buf+idx);
781 w->buf[idx++] = SPOE_DATA_T_STR;
782 idx += encode_spoe_string(SPOA_CAPABILITIES, SLEN(SPOA_CAPABILITIES), w->buf+idx);
783
784 w->len = idx;
785 return idx;
786}
787
788/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred,
789 * the number of written bytes otherwise. */
790static int
791prepare_agentack(struct worker *w)
792{
793 int idx = 0;
794
795 /* Frame type */
796 w->buf[idx++] = SPOE_FRM_T_AGENT_ACK;
797
798 /* No flags for now */
799 memset(w->buf+idx, 0, 4); /* No flags */
800 idx += 4;
801
802 /* Set stream-id and frame-id for ACK frames */
803 idx += encode_spoe_varint(w->stream_id, w->buf+idx);
804 idx += encode_spoe_varint(w->frame_id, w->buf+idx);
805
806 /* Data */
807 if (w->ip_score == -1)
808 goto out;
809
810 w->buf[idx++] = SPOE_ACT_T_SET_VAR; /* Action type */
811 w->buf[idx++] = 3; /* Number of args */
812 w->buf[idx++] = SPOE_SCOPE_SESS; /* Arg 1: the scope */
813 idx += encode_spoe_string("ip_score", 8, w->buf+idx); /* Arg 2: variable name */
814 w->buf[idx++] = SPOE_DATA_T_UINT32;
815 idx += encode_spoe_varint(w->ip_score, w->buf+idx); /* Arg 3: variable value */
816out:
817 w->len = idx;
818 return idx;
819}
820
821/* Encode a DISCONNECT frame to send it to HAProxy. It returns -1 if an error
822 * occurred, the number of written bytes otherwise. */
823static int
824prepare_agentdicon(struct worker *w)
825{
826 const char *reason;
827 int rlen, idx = 0;
828
829 if (w->status_code >= SPOE_FRM_ERRS)
830 w->status_code = SPOE_FRM_ERR_UNKNOWN;
831 reason = spoe_frm_err_reasons[w->status_code];
832 rlen = strlen(reason);
833
834 /* Frame type */
835 w->buf[idx++] = SPOE_FRM_T_AGENT_DISCON;
836
837 /* No flags for now */
838 memset(w->buf+idx, 0, 4);
839 idx += 4;
840
841 /* No stream-id and frame-id for DISCONNECT frames */
842 w->buf[idx++] = 0;
843 w->buf[idx++] = 0;
844
845 /* There are 2 mandatory items: "status-code" and "message" */
846
847 /* "status-code" K/V item */
848 idx += encode_spoe_string("status-code", 11, w->buf+idx);
849 w->buf[idx++] = SPOE_DATA_T_UINT32;
850 idx += encode_spoe_varint(w->status_code, w->buf+idx);
851
852 /* "message" K/V item */
853 idx += encode_spoe_string("message", 7, w->buf+idx);
854 w->buf[idx++] = SPOE_DATA_T_STR;
855 idx += encode_spoe_string(reason, rlen, w->buf+idx);
856
857 w->len = idx;
858 return idx;
859}
860
861static int
862hello_handshake(int sock, struct worker *w)
863{
864 if (read_frame(sock, w) < 0) {
865 LOG("Failed to read Haproxy HELLO frame");
866 goto error;
867 }
868 if (handle_hahello(w) < 0) {
869 LOG("Failed to handle Haproxy HELLO frame");
870 goto error;
871 }
872 if (prepare_agenthello(w) < 0) {
873 LOG("Failed to prepare Agent HELLO frame");
874 goto error;
875 }
876 if (write_frame(sock, w) < 0) {
877 LOG("Failed to write Agent frame");
878 goto error;
879 }
880 DEBUG("Hello handshake done: version=%s - max-frame-size=%u - healthcheck=%s",
881 SPOP_VERSION, w->size, (w->healthcheck ? "true" : "false"));
882 return 0;
883error:
884 return -1;
885}
886
887static int
888notify_ack_roundtip(int sock, struct worker *w)
889{
890 if (read_frame(sock, w) < 0) {
891 LOG("Failed to read Haproxy NOTIFY frame");
892 goto error_or_quit;
893 }
894 if (handle_hadiscon(w) != 0) {
895 if (w->status_code != SPOE_FRM_ERR_NONE)
896 LOG("Failed to handle Haproxy DISCONNECT frame");
897 DEBUG("Disconnect frame received: reason=%s",
898 spoe_frm_err_reasons[w->status_code]);
899 goto error_or_quit;
900 }
901 if (handle_hanotify(w) < 0) {
902 LOG("Failed to handle Haproxy NOTIFY frame");
903 goto error_or_quit;
904 }
905 if (prepare_agentack(w) < 0) {
906 LOG("Failed to prepare Agent ACK frame");
907 goto error_or_quit;
908 }
909 if (write_frame(sock, w) < 0) {
910 LOG("Failed to write Agent ACK frame");
911 goto error_or_quit;
912 }
913 DEBUG("Ack frame sent: stream-id=%u - frame-id=%u",
914 w->stream_id, w->frame_id);
915 return 0;
916error_or_quit:
917 return -1;
918}
919
920static void *
921worker(void *data)
922{
923 struct worker w;
924 struct sockaddr_in client;
925 int *info = (int *)data;
926 int csock, lsock = info[0];
927
928 signal(SIGPIPE, SIG_IGN);
929 pthread_setspecific(worker_id, &info[1]);
930
931 while (1) {
932 socklen_t sz = sizeof(client);
933
934 if ((csock = accept(lsock, (struct sockaddr *)&client, &sz)) < 0) {
935 LOG("Failed to accept client connection: %m");
936 goto out;
937 }
938 memset(&w, 0, sizeof(w));
939 w.id = info[1];
940 w.size = MAX_FRAME_SIZE;
941
942 DEBUG("New connection from HAProxy accepted");
943
944 if (hello_handshake(csock, &w) < 0)
945 goto disconnect;
946 if (w.healthcheck == true)
947 goto close;
948 while (1) {
949 w.ip_score = -1;
950 if (notify_ack_roundtip(csock, &w) < 0)
951 break;
952 }
953
954 disconnect:
955 if (w.status_code == SPOE_FRM_ERR_IO) {
956 LOG("Close the client socket because of I/O errors");
957 goto close;
958 }
959 if (prepare_agentdicon(&w) < 0) {
960 LOG("Failed to prepare Agent DISCONNECT frame");
961 goto close;
962 }
963 if (write_frame(csock, &w) < 0) {
964 LOG("Failed to write Agent DISCONNECT frame");
965 goto close;
966 }
967 DEBUG("Disconnect frame sent: reason=%s",
968 spoe_frm_err_reasons[w.status_code]);
969
970 close:
971 close(csock);
972 }
973
974out:
975 free(info);
976 pthread_exit(NULL);
977}
978
/* Print the command-line help of the program named <prog> on stderr */
static void
usage(char *prog)
{
	static const char *const options[] = {
		" -h Print this message\n",
		" -d Enable the debug mode\n",
		" -p <port> Specify the port to listen on (default: 12345)\n",
		" -n <num-workers> Specify the number of workers (default: 5)\n",
	};
	size_t k;

	fprintf(stderr, "Usage: %s [-h] [-d] [-p <port>] [-n <num-workers>]\n", prog);
	for (k = 0; k < sizeof(options) / sizeof(options[0]); k++)
		fputs(options[k], stderr);
}
988
989int
990main(int argc, char **argv)
991{
992 pthread_t *ts = NULL;
993 struct sockaddr_in server;
994 int i, sock, opt, nbworkers, port;
995
996 nbworkers = NUM_WORKERS;
997 port = DEFAULT_PORT;
998 while ((opt = getopt(argc, argv, "hdn:p:")) != -1) {
999 switch (opt) {
1000 case 'h':
1001 usage(argv[0]);
1002 return EXIT_SUCCESS;
1003 case 'd':
1004 debug = true;
1005 break;
1006 case 'n':
1007 nbworkers = atoi(optarg);
1008 break;
1009 case 'p':
1010 port = atoi(optarg);
1011 break;
1012 default:
1013 usage(argv[0]);
1014 return EXIT_FAILURE;
1015 }
1016 }
1017
1018 if (nbworkers <= 0) {
1019 fprintf(stderr, "%s: Invalid number of workers '%d'\n",
1020 argv[0], nbworkers);
1021 goto error;
1022 }
1023 if (port <= 0) {
1024 fprintf(stderr, "%s: Invalid port '%d'\n", argv[0], port);
1025 goto error;
1026 }
1027
1028 if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1029 fprintf(stderr, "Failed creating socket: %m\n");
1030 goto error;
1031 }
1032
1033 setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (int []){1}, sizeof(int));
1034 setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (int []){1}, sizeof(int));
1035
1036 memset(&server, 0, sizeof(server));
1037 server.sin_family = AF_INET;
1038 server.sin_addr.s_addr = INADDR_ANY;
1039 server.sin_port = htons(port);
1040
1041 if (bind(sock, (struct sockaddr *)&server, sizeof(server)) < 0) {
1042 fprintf(stderr, "Failed to bind the socket: %m\n");
1043 goto error;
1044 }
1045
1046 if (listen(sock , 10) < 0) {
1047 fprintf(stderr, "Failed to listen on the socket: %m\n");
1048 goto error;
1049 }
1050 fprintf(stderr, "SPOA is listening on port %d\n", port);
1051
1052 ts = calloc(nbworkers, sizeof(*ts));
1053 pthread_key_create(&worker_id, NULL);
1054 for (i = 0; i < nbworkers; i++) {
1055 int *info = calloc(2, sizeof(*info));
1056
1057 info[0] = sock;
1058 info[1] = i+1;
1059 if (pthread_create(&ts[i], NULL, worker, info) < 0) {
1060 fprintf(stderr, "Failed to create thread %d: %m\n", i+1);
1061 goto error;
1062 }
1063 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1064 }
1065
1066 for (i = 0; i < nbworkers; i++) {
1067 pthread_join(ts[i], NULL);
1068 fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
1069 }
1070 pthread_key_delete(worker_id);
1071 free(ts);
1072 close(sock);
1073 return EXIT_SUCCESS;
1074error:
1075 free(ts);
1076 return EXIT_FAILURE;
1077}