/*
 * A Random IP reputation service acting as a Stream Processing Offload Agent
 *
 * This is a very simple service that implements a "random" IP reputation
 * service. It will return random scores for all checked IP addresses. It only
 * shows you how to implement an IP reputation service or a similar kind of
 * service using the SPOE.
 *
 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
 * Copyright 2018 OZON / Thierry Fournier <thierry.fournier@ozon.io>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 *
 */
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <stdbool.h>
22#include <unistd.h>
23#include <signal.h>
24#include <pthread.h>
25#include <sys/time.h>
26#include <sys/types.h>
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +010027#include <sys/wait.h>
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010028#include <sys/socket.h>
29#include <netinet/in.h>
30#include <netinet/tcp.h>
31#include <arpa/inet.h>
32
Thierry FOURNIER4aec0a42018-02-23 11:42:57 +010033#include "spoa.h"
34
/* Default listening port and default number of workers */
#define DEFAULT_PORT    12345
#define NUM_WORKERS     5

/* Length of a string literal, terminating NUL excluded */
#define SLEN(str) (sizeof(str)-1)
39
/* Frame types exchanged between HAProxy and the agents. The value is the
 * first byte of every frame. */
enum spoe_frame_type {
	/* Frames sent by HAProxy */
	SPOE_FRM_T_HAPROXY_HELLO  = 1,
	SPOE_FRM_T_HAPROXY_DISCON = 2,
	SPOE_FRM_T_HAPROXY_NOTIFY = 3,

	/* Frames sent by the agents */
	SPOE_FRM_T_AGENT_HELLO    = 101,
	SPOE_FRM_T_AGENT_DISCON   = 102,
	SPOE_FRM_T_AGENT_ACK      = 103
};
52
/* Errors triggered by the SPOE applet. SPOE_FRM_ERRS is not an error code: it
 * is one more than the highest code and is used to size lookup tables. */
enum spoe_frame_error {
	SPOE_FRM_ERR_NONE           = 0,
	SPOE_FRM_ERR_IO             = 1,
	SPOE_FRM_ERR_TOUT           = 2,
	SPOE_FRM_ERR_TOO_BIG        = 3,
	SPOE_FRM_ERR_INVALID        = 4,
	SPOE_FRM_ERR_NO_VSN         = 5,
	SPOE_FRM_ERR_NO_FRAME_SIZE  = 6,
	SPOE_FRM_ERR_NO_CAP         = 7,
	SPOE_FRM_ERR_BAD_VSN        = 8,
	SPOE_FRM_ERR_BAD_FRAME_SIZE = 9,
	SPOE_FRM_ERR_UNKNOWN        = 99,
	SPOE_FRM_ERRS               = 100,
};
68
/* All supported SPOE actions. SPOE_ACT_TYPES only marks the end of the list. */
enum spoe_action_type {
	SPOE_ACT_T_SET_VAR   = 1,
	SPOE_ACT_T_UNSET_VAR = 2,
	SPOE_ACT_TYPES       = 3,
};
75
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010076
/* A typed-data byte carries the type in its low nibble and flags in its high
 * nibble; these masks extract each half. */
#define SPOE_DATA_T_MASK   0x0F
#define SPOE_DATA_FL_MASK  0xF0

/* Flag values used to carry a Boolean inside the type byte */
#define SPOE_DATA_FL_FALSE 0x00
#define SPOE_DATA_FL_TRUE  0x10
84static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
85 [SPOE_FRM_ERR_NONE] = "normal",
86 [SPOE_FRM_ERR_IO] = "I/O error",
87 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
88 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
89 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
90 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
91 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
92 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
93 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
94 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
95 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
96};
97
/* When set, process_create() runs the worker in the calling process instead
 * of forking. */
bool debug = false;

/* Thread-specific key; each worker stores a pointer to its own id under it */
pthread_key_t worker_id;

/* Heads of the registered processors and registered messages lists */
static struct ps *ps_list;
static struct ps_message *ps_messages;

/* Files given to add_file(), loaded by every worker at startup */
static int nfiles;
static char **files;
104
105static inline void add_file(const char *file)
106{
107 nfiles++;
108 files = realloc(files, sizeof(*files) * nfiles);
109 if (files == NULL) {
110 fprintf(stderr, "Out of memory error\n");
111 exit(EXIT_FAILURE);
112 }
113 files[nfiles - 1] = strdup(file);
114 if (files[nfiles - 1] == NULL) {
115 fprintf(stderr, "Out of memory error\n");
116 exit(EXIT_FAILURE);
117 }
118}
Thierry FOURNIER64eaa332018-02-23 14:58:40 +0100119
120void ps_register(struct ps *ps)
121{
122 ps->next = ps_list;
123 ps_list = ps;
124}
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100125
Thierry FOURNIER892f6642018-02-23 14:27:05 +0100126void ps_register_message(struct ps *ps, const char *name, void *ref)
127{
128 struct ps_message *msg;
129
130 /* Look for already registered name */
131 for (msg = ps_messages; msg; msg = msg->next) {
132 if (strcmp(name, msg->name) == 0) {
133 LOG("Message \"%s\" already registered\n", name);
134 exit(EXIT_FAILURE);
135 }
136 }
137
138 msg = calloc(1, sizeof(*msg));
139 if (msg == NULL) {
140 LOG("Out of memory error\n");
141 exit(EXIT_FAILURE);
142 }
143
144 msg->next = ps_messages;
145 ps_messages = msg;
146 msg->name = strdup(name);
147 if (msg->name == NULL) {
148 LOG("Out of memory error\n");
149 exit(EXIT_FAILURE);
150 }
151 msg->ref = ref;
152 msg->ps = ps;
153}
154
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100155static void
156check_ipv4_reputation(struct worker *w, struct in_addr *ipv4)
157{
158 char str[INET_ADDRSTRLEN];
Thierry FOURNIERfbd38242018-02-23 18:24:10 +0100159 unsigned int score;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100160
161 if (inet_ntop(AF_INET, ipv4, str, INET_ADDRSTRLEN) == NULL)
162 return;
163
Thierry FOURNIERfbd38242018-02-23 18:24:10 +0100164 score = random() % 100;
165 set_var_uint32(w, "ip_score", 8, SPOE_SCOPE_SESS, score);
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100166
Thierry FOURNIERfbd38242018-02-23 18:24:10 +0100167 DEBUG(" IP score for %.*s is: %d", INET_ADDRSTRLEN, str, score);
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100168}
169
170static void
171check_ipv6_reputation(struct worker *w, struct in6_addr *ipv6)
172{
173 char str[INET6_ADDRSTRLEN];
Thierry FOURNIERfbd38242018-02-23 18:24:10 +0100174 unsigned int score;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100175
176 if (inet_ntop(AF_INET6, ipv6, str, INET6_ADDRSTRLEN) == NULL)
177 return;
178
Thierry FOURNIERfbd38242018-02-23 18:24:10 +0100179 score = random() % 100;
180 set_var_uint32(w, "ip_score", 8, SPOE_SCOPE_SESS, score);
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100181
Thierry FOURNIERfbd38242018-02-23 18:24:10 +0100182 DEBUG(" IP score for %.*s is: %d", INET6_ADDRSTRLEN, str, score);
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100183}
184
/* Read up to <read_len> bytes from <sock> into <buf>, waiting with select()
 * until data is available. It returns -1 on error (including a descriptor
 * that cannot be used with select()), otherwise the number of bytes read,
 * which is lower than <read_len> only if the peer closed the connection
 * first. */
static int
do_read(int sock, void *buf, int read_len)
{
	char *dst = buf; /* ISO C has no arithmetic on void pointers */
	fd_set readfds;
	int n = 0, total = 0;

	/* FD_SET() is undefined for descriptors outside [0, FD_SETSIZE) */
	if (sock < 0 || sock >= FD_SETSIZE)
		return -1;

	while (total < read_len) {
		/* select() overwrites the set: arm it again before each wait */
		FD_ZERO(&readfds);
		FD_SET(sock, &readfds);

		if (select(sock + 1, &readfds, NULL, NULL, NULL) == -1)
			return -1;
		if (!FD_ISSET(sock, &readfds))
			return -1;

		n = read(sock, dst + total, read_len - total);
		if (n <= 0)
			break;

		total += n;
	}

	return (n == -1) ? -1 : total;
}
210
/* Write <write_len> bytes from <buf> to <sock>, waiting with select() until
 * the socket is writable. It returns -1 on error (including a descriptor that
 * cannot be used with select()), otherwise the number of bytes written, which
 * is lower than <write_len> only if write() reported that nothing more could
 * be written. */
static int
do_write(int sock, void *buf, int write_len)
{
	const char *src = buf; /* ISO C has no arithmetic on void pointers */
	fd_set writefds;
	int n = 0, total = 0;

	/* FD_SET() is undefined for descriptors outside [0, FD_SETSIZE) */
	if (sock < 0 || sock >= FD_SETSIZE)
		return -1;

	while (total < write_len) {
		/* select() overwrites the set: arm it again before each wait */
		FD_ZERO(&writefds);
		FD_SET(sock, &writefds);

		if (select(sock + 1, NULL, &writefds, NULL, NULL) == -1)
			return -1;
		if (!FD_ISSET(sock, &writefds))
			return -1;

		n = write(sock, src + total, write_len - total);
		if (n <= 0)
			break;

		total += n;
	}

	return (n == -1) ? -1 : total;
}
236
237/* Receive a frame sent by HAProxy. It returns -1 if an error occurred,
238 * otherwise the number of read bytes.*/
239static int
240read_frame(int sock, struct worker *w)
241{
242 uint32_t netint;
243 unsigned int framesz;
244
245 /* Read the frame size, on 4 bytes */
246 if (do_read(sock, &netint, sizeof(netint)) != 4) {
247 w->status_code = SPOE_FRM_ERR_IO;
248 return -1;
249 }
250
251 /* Check it against the max size */
252 framesz = ntohl(netint);
253 if (framesz > w->size) {
254 w->status_code = SPOE_FRM_ERR_TOO_BIG;
255 return -1;
256 }
257
258 /* Read the frame */
259 if (do_read(sock, w->buf, framesz) != framesz) {
260 w->status_code = SPOE_FRM_ERR_IO;
261 return -1;
262 }
263
264 w->len = framesz;
265 return framesz;
266}
267
268/* Send a frame to HAProxy. It returns -1 if an error occurred, otherwise the
269 * number of written bytes. */
270static int
271write_frame(int sock, struct worker *w)
272{
273 uint32_t netint;
274
275 /* Write the frame size, on 4 bytes */
276 netint = htonl(w->len);
277 if (do_write(sock, &netint, sizeof(netint)) != 4) {
278 w->status_code = SPOE_FRM_ERR_IO;
279 return -1;
280 }
281
282 /* Write the frame */
283 if (do_write(sock, w->buf, w->len) != w->len) {
284 w->status_code = SPOE_FRM_ERR_IO;
285 return -1;
286 }
287 return w->len;
288}
289
/* Encode <i> as a variable-length integer into <buf>. Values below 240 take a
 * single byte. Larger ones store the low bits OR-ed with 240 in the first
 * byte, then 7 bits per following byte with the top bit set on every byte but
 * the last. This function never fails and returns the number of bytes
 * written; <buf> must be large enough. */
static int
encode_spoe_varint(uint64_t i, char *buf)
{
	int pos = 0;

	if (i < 240) {
		buf[pos++] = (unsigned char)i;
		return pos;
	}

	buf[pos++] = (unsigned char)i | 240;
	i = (i - 240) >> 4;
	while (i >= 128) {
		buf[pos++] = (unsigned char)i | 128;
		i = (i - 128) >> 7;
	}
	buf[pos++] = (unsigned char)i;
	return pos;
}
311
/* Decode a variable-length integer starting at <buf> into <*i>. <end> points
 * one past the last readable byte. It returns the number of bytes read, or -1
 * if the buffer's end is reached before the integer is complete or if the
 * encoding is longer than a 64-bit value can need. <*i> may hold a partial
 * value when -1 is returned. */
static int
decode_spoe_varint(char *buf, char *end, uint64_t *i)
{
	unsigned char *msg = (unsigned char *)buf;
	unsigned char *lim = (unsigned char *)end;
	int idx = 0;

	/* <end> itself is not readable, so an empty range holds no integer */
	if (msg >= lim)
		return -1;

	*i = msg[0];
	if (msg[0] < 240)
		return 1;

	do {
		++idx;
		if (msg + idx >= lim)
			return -1;
		/* The shift below is 4 + 7 * (idx - 1): from the 10th
		 * continuation byte on it would reach 64 bits, which is
		 * undefined. No valid 64-bit value needs that many bytes. */
		if (idx > 9)
			return -1;
		*i += (uint64_t)msg[idx] << (4 + 7 * (idx - 1));
	} while (msg[idx] >= 128);
	return (idx + 1);
}
337
/* Encode the <len> bytes at <str> into <dst> as a string: the length as a
 * variable-length integer, then the bytes themselves. An empty string is the
 * single byte 0. This function never fails and returns the number of bytes
 * written; <dst> must be large enough. */
static int
encode_spoe_string(const char *str, size_t len, char *dst)
{
	int hdr;

	if (len == 0) {
		*dst = 0;
		return 1;
	}

	hdr = encode_spoe_varint(len, dst);
	memcpy(dst + hdr, str, len);
	return (hdr + len);
}
355
/* Decode a string starting at <buf>; <end> points one past the last readable
 * byte. The length is decoded first as a variable-length integer. If that
 * succeeds and the announced length fits in the remaining bytes, the start of
 * the string is saved in <*str>, its length in <*len>, and the total number
 * of bytes consumed (length prefix included) is returned. Otherwise -1 is
 * returned and <*str> remains NULL. The string is not copied and is not
 * NUL-terminated. */
static int
decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
{
	int r;

	*str = NULL;
	*len = 0;

	if ((r = decode_spoe_varint(buf, end, len)) == -1)
		return -1;

	/* Compare the announced length with the room that is left instead of
	 * building <buf + r + *len>: that pointer can wrap around for a huge
	 * length and slip past an end-of-buffer comparison. */
	if (buf + r > end || *len > (uint64_t)(end - (buf + r)))
		return -1;

	*str = buf + r;
	return (r + *len);
}
381
382/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
383 * of bytes read is returned. A types data is composed of a type (1 byte) and
384 * corresponding data:
385 * - boolean: non additional data (0 bytes)
386 * - integers: a variable-length integer (see decode_spoe_varint)
387 * - ipv4: 4 bytes
388 * - ipv6: 16 bytes
389 * - binary and string: a buffer prefixed by its size, a variable-length
390 * integer (see decode_spoe_string) */
391static int
392skip_spoe_data(char *frame, char *end)
393{
394 uint64_t sz = 0;
395 int r, idx = 0;
396
397 if (frame > end)
398 return -1;
399
400 switch (frame[idx++] & SPOE_DATA_T_MASK) {
401 case SPOE_DATA_T_BOOL:
402 idx++;
403 break;
404 case SPOE_DATA_T_INT32:
405 case SPOE_DATA_T_INT64:
406 case SPOE_DATA_T_UINT32:
407 case SPOE_DATA_T_UINT64:
408 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
409 return -1;
410 idx += r;
411 break;
412 case SPOE_DATA_T_IPV4:
413 idx += 4;
414 break;
415 case SPOE_DATA_T_IPV6:
416 idx += 16;
417 break;
418 case SPOE_DATA_T_STR:
419 case SPOE_DATA_T_BIN:
420 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
421 return -1;
422 idx += r + sz;
423 break;
424 }
425
426 if (frame+idx > end)
427 return -1;
428 return idx;
429}
430
431/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
432 * number of read bytes is returned. See skip_spoe_data for details. */
433static int
434decode_spoe_data(char *frame, char *end, struct spoe_data *data)
435{
436 uint64_t sz = 0;
437 int type, r, idx = 0;
438
439 if (frame > end)
440 return -1;
441
442 type = frame[idx++];
443 data->type = (type & SPOE_DATA_T_MASK);
444 switch (data->type) {
445 case SPOE_DATA_T_BOOL:
446 data->u.boolean = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
447 break;
448 case SPOE_DATA_T_INT32:
449 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
450 return -1;
451 data->u.sint32 = sz;
452 idx += r;
453 break;
454 case SPOE_DATA_T_INT64:
455 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
456 return -1;
457 data->u.uint32 = sz;
458 idx += r;
459 break;
460 case SPOE_DATA_T_UINT32:
461 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
462 return -1;
463 data->u.sint64 = sz;
464 idx += r;
465 break;
466 case SPOE_DATA_T_UINT64:
467 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
468 return -1;
469 data->u.uint64 = sz;
470 idx += r;
471 break;
472 case SPOE_DATA_T_IPV4:
473 if (frame+idx+4 > end)
474 return -1;
475 memcpy(&data->u.ipv4, frame+idx, 4);
476 idx += 4;
477 break;
478 case SPOE_DATA_T_IPV6:
479 if (frame+idx+16 > end)
480 return -1;
481 memcpy(&data->u.ipv6, frame+idx, 16);
482 idx += 16;
483 break;
484 case SPOE_DATA_T_STR:
485 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
486 return -1;
487 idx += r;
488 if (frame+idx+sz > end)
489 return -1;
490 data->u.buffer.str = frame+idx;
491 data->u.buffer.len = sz;
492 idx += sz;
493 break;
494 case SPOE_DATA_T_BIN:
495 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
496 return -1;
497 idx += r;
498 if (frame+idx+sz > end)
499 return -1;
500 data->u.buffer.str = frame+idx;
501 data->u.buffer.len = sz;
502 idx += sz;
503 break;
504 default:
505 break;
506 }
507
508 if (frame+idx > end)
509 return -1;
510 return idx;
511}
512
513
514/* Check the protocol version. It returns -1 if an error occurred, the number of
515 * read bytes otherwise. */
516static int
517check_proto_version(struct worker *w, int idx)
518{
519 char *str;
520 uint64_t sz;
521
522 /* Get the list of all supported versions by HAProxy */
523 if ((w->buf[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
524 w->status_code = SPOE_FRM_ERR_INVALID;
525 return -1;
526 }
527 idx += decode_spoe_string(w->buf+idx, w->buf+w->len, &str, &sz);
528 if (str == NULL) {
529 w->status_code = SPOE_FRM_ERR_INVALID;
530 return -1;
531 }
532
533 /* TODO: Find the right verion in supported ones */
534
535 return idx;
536}
537
538/* Check max frame size value. It returns -1 if an error occurred, the number of
539 * read bytes otherwise. */
540static int
541check_max_frame_size(struct worker *w, int idx)
542{
543 uint64_t sz;
544 int type, i;
545
546 /* Get the max-frame-size value of HAProxy */
547 type = w->buf[idx++];
548 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
549 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
550 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
551 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
552 w->status_code = SPOE_FRM_ERR_INVALID;
553 return -1;
554 }
555 if ((i = decode_spoe_varint(w->buf+idx, w->buf+w->len, &sz)) == -1) {
556 w->status_code = SPOE_FRM_ERR_INVALID;
557 return -1;
558 }
559 idx += i;
560
561 /* Keep the lower value */
562 if (sz < w->size)
563 w->size = sz;
564
565 return idx;
566}
567
568/* Check healthcheck value. It returns -1 if an error occurred, the number of
569 * read bytes otherwise. */
570static int
571check_healthcheck(struct worker *w, int idx)
572{
573 int type;
574
575 /* Get the "healthcheck" value of HAProxy */
576 type = w->buf[idx++];
577 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_BOOL) {
578 w->status_code = SPOE_FRM_ERR_INVALID;
579 return -1;
580 }
581 w->healthcheck = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
582 return idx;
583}
584
585
586/* Decode a HELLO frame received from HAProxy. It returns -1 if an error
587 * occurred, 0 if the frame must be skipped, otherwise the number of read
588 * bytes. */
589static int
590handle_hahello(struct worker *w)
591{
592 char *end = w->buf+w->len;
593 int i, idx = 0;
594
595 /* Check frame type */
596 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_HELLO)
597 goto skip;
598
599 /* Skip flags */
600 idx += 4;
601
602 /* stream-id and frame-id must be cleared */
603 if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
604 w->status_code = SPOE_FRM_ERR_INVALID;
605 goto error;
606 }
607 idx += 2;
608
609 /* Loop on K/V items */
610 while (idx < w->len) {
611 char *str;
612 uint64_t sz;
613
614 /* Decode the item name */
615 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
616 if (str == NULL) {
617 w->status_code = SPOE_FRM_ERR_INVALID;
618 goto error;
619 }
620
621 /* Check "supported-versions" K/V item */
622 if (!memcmp(str, "supported-versions", sz)) {
623 if ((i = check_proto_version(w, idx)) == -1)
624 goto error;
625 idx = i;
626 }
627 /* Check "max-frame-size" K/V item "*/
628 else if (!memcmp(str, "max-frame-size", sz)) {
629 if ((i = check_max_frame_size(w, idx)) == -1)
630 goto error;
631 idx = i;
632 }
633 /* Check "healthcheck" K/V item "*/
634 else if (!memcmp(str, "healthcheck", sz)) {
635 if ((i = check_healthcheck(w, idx)) == -1)
636 goto error;
637 idx = i;
638 }
639 /* Skip "capabilities" K/V item for now */
640 else {
641 /* Silently ignore unknown item */
642 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
643 w->status_code = SPOE_FRM_ERR_INVALID;
644 goto error;
645 }
646 idx += i;
647 }
648 }
649
650 return idx;
651skip:
652 return 0;
653error:
654 return -1;
655}
656
657/* Decode a DISCONNECT frame received from HAProxy. It returns -1 if an error
658 * occurred, 0 if the frame must be skipped, otherwise the number of read
659 * bytes. */
660static int
661handle_hadiscon(struct worker *w)
662{
663 char *end = w->buf+w->len;
664 int i, idx = 0;
665
666 /* Check frame type */
667 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_DISCON)
668 goto skip;
669
670 /* Skip flags */
671 idx += 4;
672
673 /* stream-id and frame-id must be cleared */
674 if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
675 w->status_code = SPOE_FRM_ERR_INVALID;
676 goto error;
677 }
678 idx += 2;
679
680 /* Loop on K/V items */
681 while (idx < w->len) {
682 char *str;
683 uint64_t sz;
684
685 /* Decode item key */
686 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
687 if (str == NULL) {
688 w->status_code = SPOE_FRM_ERR_INVALID;
689 goto error;
690 }
691 /* Silently ignore unknown item */
692 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
693 w->status_code = SPOE_FRM_ERR_INVALID;
694 goto error;
695 }
696 idx += i;
697 }
698
699 w->status_code = SPOE_FRM_ERR_NONE;
700 return idx;
701skip:
702 return 0;
703error:
704 return -1;
705}
706
Thierry FOURNIERfbd38242018-02-23 18:24:10 +0100707/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred,
708 * the number of written bytes otherwise. */
709static void prepare_agentack(struct worker *w)
710{
711 w->ack_len = 0;
712
713 /* Frame type */
714 w->ack[w->ack_len++] = SPOE_FRM_T_AGENT_ACK;
715
716 /* No flags for now */
717 memset(w->ack + w->ack_len, 0, 4); /* No flags */
718 w->ack_len += 4;
719
720 /* Set stream-id and frame-id for ACK frames */
721 w->ack_len += encode_spoe_varint(w->stream_id, w->ack + w->ack_len);
722 w->ack_len += encode_spoe_varint(w->frame_id, w->ack + w->ack_len);
723}
724
725static inline
726int set_var_name(struct worker *w, const char *name, int name_len, unsigned char scope)
727{
728 w->ack[w->ack_len++] = SPOE_ACT_T_SET_VAR; /* Action type */
729 w->ack[w->ack_len++] = 3; /* Number of args */
730 w->ack[w->ack_len++] = scope; /* Arg 1: the scope */
731 w->ack_len += encode_spoe_string(name, name_len, w->ack+w->ack_len); /* Arg 2: variable name */
732 return 1;
733}
734
735int set_var_null(struct worker *w,
736 const char *name, int name_len,
737 unsigned char scope)
738{
739 if (!set_var_name(w, name, name_len, scope))
740 return 0;
741 w->ack[w->ack_len++] = SPOE_DATA_T_NULL;
742 return 1;
743}
744
745int set_var_bool(struct worker *w,
746 const char *name, int name_len,
747 unsigned char scope, bool value)
748{
749 if (!set_var_name(w, name, name_len, scope))
750 return 0;
751 w->ack[w->ack_len++] = SPOE_DATA_T_BOOL | (!!value << 4);
752 return 1;
753}
754
755static inline
756int set_var_int(struct worker *w,
757 const char *name, int name_len,
758 unsigned char scope, int type, uint64_t value)
759{
760 if (!set_var_name(w, name, name_len, scope))
761 return 0;
762 w->ack[w->ack_len++] = SPOE_DATA_T_UINT32;
763 w->ack_len += encode_spoe_varint(value, w->ack+w->ack_len); /* Arg 3: variable value */
764 return 1;
765}
766
767int set_var_uint32(struct worker *w,
768 const char *name, int name_len,
769 unsigned char scope, uint32_t value)
770{
771 return set_var_int(w, name, name_len, scope, SPOE_DATA_T_UINT32, value);
772}
773
774int set_var_int32(struct worker *w,
775 const char *name, int name_len,
776 unsigned char scope, int32_t value)
777{
778 return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value);
779}
780
781int set_var_uint64(struct worker *w,
782 const char *name, int name_len,
783 unsigned char scope, uint64_t value)
784{
785 return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value);
786}
787
788int set_var_int64(struct worker *w,
789 const char *name, int name_len,
790 unsigned char scope, int64_t value)
791{
792 return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value);
793}
794
795int set_var_ipv4(struct worker *w,
796 const char *name, int name_len,
797 unsigned char scope,
798 struct in_addr *ipv4)
799{
800 if (!set_var_name(w, name, name_len, scope))
801 return 0;
802 w->ack[w->ack_len++] = SPOE_DATA_T_IPV4;
803 memcpy(w->ack+w->ack_len, ipv4, 4);
804 w->ack_len += 4;
805 return 1;
806}
807
808int set_var_ipv6(struct worker *w,
809 const char *name, int name_len,
810 unsigned char scope,
811 struct in6_addr *ipv6)
812{
813 if (!set_var_name(w, name, name_len, scope))
814 return 0;
815 w->ack[w->ack_len++] = SPOE_DATA_T_IPV6;
816 memcpy(w->ack+w->ack_len, ipv6, 16);
817 w->ack_len += 16;
818 return 1;
819}
820
821static inline
822int set_var_buf(struct worker *w,
823 const char *name, int name_len,
824 unsigned char scope, int type,
825 const char *str, int str_len)
826{
827 if (!set_var_name(w, name, name_len, scope))
828 return 0;
829 w->ack[w->ack_len++] = type;
830 w->ack_len += encode_spoe_string(str, str_len, w->ack+w->ack_len);
831 return 1;
832}
833
834int set_var_string(struct worker *w,
835 const char *name, int name_len,
836 unsigned char scope,
837 const char *str, int strlen)
838{
839 return set_var_buf(w, name, name_len, scope, SPOE_DATA_T_STR, str, strlen);
840}
841
842int set_var_bin(struct worker *w,
843 const char *name, int name_len,
844 unsigned char scope,
845 const char *str, int strlen)
846{
847 return set_var_buf(w, name, name_len, scope, SPOE_DATA_T_BIN, str, strlen);
848}
849
850/* This function is a little bit ugly,
851 * TODO: improve the response without copying the bufer
852 */
853static int commit_agentack(struct worker *w)
854{
855 memcpy(w->buf, w->ack, w->ack_len);
856 w->len = w->ack_len;
857 return 1;
858}
859
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100860/* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
861 * occurred, 0 if the frame must be skipped, otherwise the number of read
862 * bytes. */
863static int
864handle_hanotify(struct worker *w)
865{
866 char *end = w->buf+w->len;
867 uint64_t stream_id, frame_id;
868 int nbargs, i, idx = 0;
869
870 /* Check frame type */
871 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_NOTIFY)
872 goto skip;
873
874 /* Skip flags */
875 idx += 4;
876
877 /* Read the stream-id */
878 if ((i = decode_spoe_varint(w->buf+idx, end, &stream_id)) == -1) {
879 w->status_code = SPOE_FRM_ERR_INVALID;
880 goto error;
881 }
882 idx += i;
883
884 /* Read the frame-id */
885 if ((i = decode_spoe_varint(w->buf+idx, end, &frame_id)) == -1) {
886 w->status_code = SPOE_FRM_ERR_INVALID;
887 goto error;
888 }
889 idx += i;
890
891 w->stream_id = (unsigned int)stream_id;
892 w->frame_id = (unsigned int)frame_id;
893
894 DEBUG("Notify frame received: stream-id=%u - frame-id=%u",
895 w->stream_id, w->frame_id);
896
Thierry FOURNIERfbd38242018-02-23 18:24:10 +0100897 /* Prepara ack, if the processing fails tha ack will be cancelled */
898 prepare_agentack(w);
899
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100900 /* Loop on messages */
901 while (idx < w->len) {
902 char *str;
903 uint64_t sz;
904
905 /* Decode the message name */
906 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
907 if (str == NULL) {
908 w->status_code = SPOE_FRM_ERR_INVALID;
909 goto error;
910 }
911 DEBUG(" Message '%.*s' received", (int)sz, str);
912
913 nbargs = w->buf[idx++];
914 if (!memcmp(str, "check-client-ip", sz)) {
915 struct spoe_data data;
916
917 memset(&data, 0, sizeof(data));
918
919 if (nbargs != 1) {
920 w->status_code = SPOE_FRM_ERR_INVALID;
921 goto error;
922 }
923 if ((i = decode_spoe_string(w->buf+idx, end, &str, &sz)) == -1) {
924 w->status_code = SPOE_FRM_ERR_INVALID;
925 goto error;
926 }
927 idx += i;
928 if ((i = decode_spoe_data(w->buf+idx, end, &data)) == -1) {
929 w->status_code = SPOE_FRM_ERR_INVALID;
930 goto error;
931 }
932 idx += i;
933 if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV4)
934 check_ipv4_reputation(w, &data.u.ipv4);
935 else if ((data.type & SPOE_DATA_T_MASK) == SPOE_DATA_T_IPV6)
936 check_ipv6_reputation(w, &data.u.ipv6);
937 else {
938 w->status_code = SPOE_FRM_ERR_INVALID;
939 goto error;
940 }
941 }
942 else {
943 while (nbargs-- > 0) {
944 /* Silently ignore argument: its name and its value */
945 if ((i = decode_spoe_string(w->buf+idx, end, &str, &sz)) == -1) {
946 w->status_code = SPOE_FRM_ERR_INVALID;
947 goto error;
948 }
949 idx += i;
950 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
951 w->status_code = SPOE_FRM_ERR_INVALID;
952 goto error;
953 }
954 idx += i;
955 }
956 }
957 }
958
959 return idx;
960skip:
961 return 0;
962error:
963 return -1;
964}
965
966/* Encode a HELLO frame to send it to HAProxy. It returns -1 if an error
967 * occurred, the number of written bytes otherwise. */
968static int
969prepare_agenthello(struct worker *w)
970{
971 int idx = 0;
972
973 /* Frame Type */
974 w->buf[idx++] = SPOE_FRM_T_AGENT_HELLO;
975
976 /* No flags for now */
977 memset(w->buf+idx, 0, 4); /* No flags */
978 idx += 4;
979
980 /* No stream-id and frame-id for HELLO frames */
981 w->buf[idx++] = 0;
982 w->buf[idx++] = 0;
983
984 /* "version" K/V item */
985 idx += encode_spoe_string("version", 7, w->buf+idx);
986 w->buf[idx++] = SPOE_DATA_T_STR;
987 idx += encode_spoe_string(SPOP_VERSION, SLEN(SPOP_VERSION), w->buf+idx);
988
989 /* "max-frame-size" K/V item */
990 idx += encode_spoe_string("max-frame-size", 14, w->buf+idx);
991 w->buf[idx++] = SPOE_DATA_T_UINT32;
992 idx += encode_spoe_varint(w->size, w->buf+idx);
993
994 /* "capabilities" K/V item */
995 idx += encode_spoe_string("capabilities", 12, w->buf+idx);
996 w->buf[idx++] = SPOE_DATA_T_STR;
997 idx += encode_spoe_string(SPOA_CAPABILITIES, SLEN(SPOA_CAPABILITIES), w->buf+idx);
998
999 w->len = idx;
1000 return idx;
1001}
1002
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001003/* Encode a DISCONNECT frame to send it to HAProxy. It returns -1 if an error
1004 * occurred, the number of written bytes otherwise. */
1005static int
1006prepare_agentdicon(struct worker *w)
1007{
1008 const char *reason;
1009 int rlen, idx = 0;
1010
1011 if (w->status_code >= SPOE_FRM_ERRS)
1012 w->status_code = SPOE_FRM_ERR_UNKNOWN;
1013 reason = spoe_frm_err_reasons[w->status_code];
1014 rlen = strlen(reason);
1015
1016 /* Frame type */
1017 w->buf[idx++] = SPOE_FRM_T_AGENT_DISCON;
1018
1019 /* No flags for now */
1020 memset(w->buf+idx, 0, 4);
1021 idx += 4;
1022
1023 /* No stream-id and frame-id for DISCONNECT frames */
1024 w->buf[idx++] = 0;
1025 w->buf[idx++] = 0;
1026
1027 /* There are 2 mandatory items: "status-code" and "message" */
1028
1029 /* "status-code" K/V item */
1030 idx += encode_spoe_string("status-code", 11, w->buf+idx);
1031 w->buf[idx++] = SPOE_DATA_T_UINT32;
1032 idx += encode_spoe_varint(w->status_code, w->buf+idx);
1033
1034 /* "message" K/V item */
1035 idx += encode_spoe_string("message", 7, w->buf+idx);
1036 w->buf[idx++] = SPOE_DATA_T_STR;
1037 idx += encode_spoe_string(reason, rlen, w->buf+idx);
1038
1039 w->len = idx;
1040 return idx;
1041}
1042
1043static int
1044hello_handshake(int sock, struct worker *w)
1045{
1046 if (read_frame(sock, w) < 0) {
1047 LOG("Failed to read Haproxy HELLO frame");
1048 goto error;
1049 }
1050 if (handle_hahello(w) < 0) {
1051 LOG("Failed to handle Haproxy HELLO frame");
1052 goto error;
1053 }
1054 if (prepare_agenthello(w) < 0) {
1055 LOG("Failed to prepare Agent HELLO frame");
1056 goto error;
1057 }
1058 if (write_frame(sock, w) < 0) {
1059 LOG("Failed to write Agent frame");
1060 goto error;
1061 }
1062 DEBUG("Hello handshake done: version=%s - max-frame-size=%u - healthcheck=%s",
1063 SPOP_VERSION, w->size, (w->healthcheck ? "true" : "false"));
1064 return 0;
1065error:
1066 return -1;
1067}
1068
1069static int
1070notify_ack_roundtip(int sock, struct worker *w)
1071{
1072 if (read_frame(sock, w) < 0) {
1073 LOG("Failed to read Haproxy NOTIFY frame");
1074 goto error_or_quit;
1075 }
1076 if (handle_hadiscon(w) != 0) {
1077 if (w->status_code != SPOE_FRM_ERR_NONE)
1078 LOG("Failed to handle Haproxy DISCONNECT frame");
1079 DEBUG("Disconnect frame received: reason=%s",
1080 spoe_frm_err_reasons[w->status_code]);
1081 goto error_or_quit;
1082 }
1083 if (handle_hanotify(w) < 0) {
1084 LOG("Failed to handle Haproxy NOTIFY frame");
1085 goto error_or_quit;
1086 }
Thierry FOURNIERfbd38242018-02-23 18:24:10 +01001087 if (commit_agentack(w) < 0) {
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001088 LOG("Failed to prepare Agent ACK frame");
1089 goto error_or_quit;
1090 }
1091 if (write_frame(sock, w) < 0) {
1092 LOG("Failed to write Agent ACK frame");
1093 goto error_or_quit;
1094 }
1095 DEBUG("Ack frame sent: stream-id=%u - frame-id=%u",
1096 w->stream_id, w->frame_id);
1097 return 0;
1098error_or_quit:
1099 return -1;
1100}
1101
1102static void *
Thierry FOURNIER5301ed12018-02-23 11:59:15 +01001103spoa_worker(void *data)
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001104{
1105 struct worker w;
1106 struct sockaddr_in client;
1107 int *info = (int *)data;
1108 int csock, lsock = info[0];
Thierry FOURNIER64eaa332018-02-23 14:58:40 +01001109 struct ps *ps;
Thierry FOURNIER8b9a73b2018-02-23 15:12:55 +01001110 int i;
1111 int len;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001112
1113 signal(SIGPIPE, SIG_IGN);
1114 pthread_setspecific(worker_id, &info[1]);
1115
Thierry FOURNIER64eaa332018-02-23 14:58:40 +01001116 /* Init registered processors */
1117 for (ps = ps_list; ps != NULL; ps = ps->next)
1118 ps->init_worker(&w);
1119
Thierry FOURNIER8b9a73b2018-02-23 15:12:55 +01001120 /* Load files */
1121 for (i = 0; i < nfiles; i++) {
1122 len = strlen(files[i]);
1123 for (ps = ps_list; ps != NULL; ps = ps->next)
1124 if (strcmp(files[i] + len - strlen(ps->ext), ps->ext) == 0)
1125 break;
1126 if (ps == NULL) {
1127 LOG("Can't load file \"%s\"\n", files[i]);
1128 goto out;
1129 }
1130 if (!ps->load_file(&w, files[i]))
1131 goto out;
1132 }
1133
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001134 while (1) {
1135 socklen_t sz = sizeof(client);
1136
1137 if ((csock = accept(lsock, (struct sockaddr *)&client, &sz)) < 0) {
1138 LOG("Failed to accept client connection: %m");
1139 goto out;
1140 }
1141 memset(&w, 0, sizeof(w));
1142 w.id = info[1];
1143 w.size = MAX_FRAME_SIZE;
1144
1145 DEBUG("New connection from HAProxy accepted");
1146
1147 if (hello_handshake(csock, &w) < 0)
1148 goto disconnect;
1149 if (w.healthcheck == true)
1150 goto close;
1151 while (1) {
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001152 if (notify_ack_roundtip(csock, &w) < 0)
1153 break;
1154 }
1155
1156 disconnect:
1157 if (w.status_code == SPOE_FRM_ERR_IO) {
1158 LOG("Close the client socket because of I/O errors");
1159 goto close;
1160 }
1161 if (prepare_agentdicon(&w) < 0) {
1162 LOG("Failed to prepare Agent DISCONNECT frame");
1163 goto close;
1164 }
1165 if (write_frame(csock, &w) < 0) {
1166 LOG("Failed to write Agent DISCONNECT frame");
1167 goto close;
1168 }
1169 DEBUG("Disconnect frame sent: reason=%s",
1170 spoe_frm_err_reasons[w.status_code]);
1171
1172 close:
1173 close(csock);
1174 }
1175
1176out:
1177 free(info);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001178#if 0
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001179 pthread_exit(NULL);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001180#endif
1181 return NULL;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001182}
1183
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001184int process_create(pid_t *pid, void *(*ps)(void *), void *data)
1185{
Thierry FOURNIER786e9e62018-02-23 19:11:47 +01001186 if (debug) {
1187 ps(data);
1188 exit(EXIT_SUCCESS);
1189 }
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001190 *pid = fork();
1191 if (*pid == -1)
1192 return -1;
1193 if (*pid > 0)
1194 return 0;
1195 ps(data);
1196 return 0;
1197}
1198
/* Prints the command line help on stderr. <prog> is the program name shown
 * on the "Usage:" line.
 */
static void
usage(char *prog)
{
	fprintf(stderr, "Usage: %s [-h] [-d] [-p <port>] [-n <num-workers>] -f <file>\n", prog);
	fprintf(stderr, " -h Print this message\n");
	fprintf(stderr, " -d Enable the debug mode\n");
	fprintf(stderr, " -p <port> Specify the port to listen on (default: 12345)\n");
	fprintf(stderr, " -n <num-workers> Specify the number of workers (default: 5)\n");
	fprintf(stderr, " -f <file> Specify the file which contains the processing code.\n");
	fprintf(stderr, " This argument can be specified more than once.\n");
}
1210
1211int
1212main(int argc, char **argv)
1213{
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001214#if 0
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001215 pthread_t *ts = NULL;
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001216#endif
1217 pid_t *pids;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001218 struct sockaddr_in server;
1219 int i, sock, opt, nbworkers, port;
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001220 int status;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001221
1222 nbworkers = NUM_WORKERS;
1223 port = DEFAULT_PORT;
Thierry FOURNIER8b9a73b2018-02-23 15:12:55 +01001224 while ((opt = getopt(argc, argv, "hdn:p:f:")) != -1) {
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001225 switch (opt) {
1226 case 'h':
1227 usage(argv[0]);
1228 return EXIT_SUCCESS;
1229 case 'd':
1230 debug = true;
1231 break;
1232 case 'n':
1233 nbworkers = atoi(optarg);
1234 break;
1235 case 'p':
1236 port = atoi(optarg);
1237 break;
Thierry FOURNIER8b9a73b2018-02-23 15:12:55 +01001238 case 'f':
1239 add_file(optarg);
1240 break;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001241 default:
1242 usage(argv[0]);
1243 return EXIT_FAILURE;
1244 }
1245 }
1246
1247 if (nbworkers <= 0) {
1248 fprintf(stderr, "%s: Invalid number of workers '%d'\n",
1249 argv[0], nbworkers);
1250 goto error;
1251 }
1252 if (port <= 0) {
1253 fprintf(stderr, "%s: Invalid port '%d'\n", argv[0], port);
1254 goto error;
1255 }
1256
1257 if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1258 fprintf(stderr, "Failed creating socket: %m\n");
1259 goto error;
1260 }
1261
1262 setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (int []){1}, sizeof(int));
1263 setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (int []){1}, sizeof(int));
1264
1265 memset(&server, 0, sizeof(server));
1266 server.sin_family = AF_INET;
1267 server.sin_addr.s_addr = INADDR_ANY;
1268 server.sin_port = htons(port);
1269
1270 if (bind(sock, (struct sockaddr *)&server, sizeof(server)) < 0) {
1271 fprintf(stderr, "Failed to bind the socket: %m\n");
1272 goto error;
1273 }
1274
1275 if (listen(sock , 10) < 0) {
1276 fprintf(stderr, "Failed to listen on the socket: %m\n");
1277 goto error;
1278 }
1279 fprintf(stderr, "SPOA is listening on port %d\n", port);
1280
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001281 pthread_key_create(&worker_id, NULL);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001282
1283 /* Initialise the server in thread mode. This code is commented
1284 * out and not deleted, because later I expect to work with
1285 * process ansd threads. This first version just support processes.
1286 */
1287#if 0
1288 ts = calloc(nbworkers, sizeof(*ts));
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001289 for (i = 0; i < nbworkers; i++) {
1290 int *info = calloc(2, sizeof(*info));
1291
1292 info[0] = sock;
1293 info[1] = i+1;
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001294
Thierry FOURNIER5301ed12018-02-23 11:59:15 +01001295 if (pthread_create(&ts[i], NULL, spoa_worker, info) < 0) {
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001296 fprintf(stderr, "Failed to create thread %d: %m\n", i+1);
1297 goto error;
1298 }
1299 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1300 }
1301
1302 for (i = 0; i < nbworkers; i++) {
1303 pthread_join(ts[i], NULL);
1304 fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
1305 }
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001306 free(ts);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001307#endif
1308
1309 /* Start processes */
1310 pids = calloc(nbworkers, sizeof(*pids));
1311 if (!pids) {
1312 fprintf(stderr, "Out of memory error\n");
1313 goto error;
1314 }
1315 for (i = 0; i < nbworkers; i++) {
1316 int *info = calloc(2, sizeof(*info));
1317
1318 info[0] = sock;
1319 info[1] = i+1;
1320
1321 if (process_create(&pids[i], spoa_worker, info) == -1) {
1322 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1323 goto error;
1324 }
1325 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1326 }
1327 for (i = 0; i < nbworkers; i++) {
1328 waitpid(pids[0], &status, 0);
1329 fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
1330 }
1331
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001332 close(sock);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001333 pthread_key_delete(worker_id);
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001334 return EXIT_SUCCESS;
1335error:
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001336 return EXIT_FAILURE;
1337}