blob: 9a8ffd0a00367ac9d0f9f9af92d927fc419e51a2 [file] [log] [blame]
/*
 * A Random IP reputation service acting as a Stream Processing Offload Agent
 *
 * This is a very simple service that implements a "random" IP reputation
 * service. It returns random scores for all checked IP addresses. It only
 * shows how to implement an IP reputation service, or similar kinds of
 * services, using the SPOE.
 *
 * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
 * Copyright 2018 OZON / Thierry Fournier <thierry.fournier@ozon.io>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 *
 */
Thierry FOURNIERa09df3f2018-02-23 14:42:46 +010018#include <limits.h>
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010019#include <stdio.h>
20#include <stdlib.h>
21#include <string.h>
22#include <stdbool.h>
23#include <unistd.h>
24#include <signal.h>
25#include <pthread.h>
26#include <sys/time.h>
27#include <sys/types.h>
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +010028#include <sys/wait.h>
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010029#include <sys/socket.h>
30#include <netinet/in.h>
31#include <netinet/tcp.h>
32#include <arpa/inet.h>
33
Thierry FOURNIER4aec0a42018-02-23 11:42:57 +010034#include "spoa.h"
35
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010036#define DEFAULT_PORT 12345
37#define NUM_WORKERS 5
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010038
39#define SLEN(str) (sizeof(str)-1)
40
/* Frame types exchanged between HAProxy and the agents. The value is the
 * first byte of every frame. */
enum spoe_frame_type {
	/* Sent by HAProxy */
	SPOE_FRM_T_HAPROXY_HELLO  = 1,
	SPOE_FRM_T_HAPROXY_DISCON = 2,
	SPOE_FRM_T_HAPROXY_NOTIFY = 3,

	/* Sent by the agents */
	SPOE_FRM_T_AGENT_HELLO    = 101,
	SPOE_FRM_T_AGENT_DISCON   = 102,
	SPOE_FRM_T_AGENT_ACK      = 103
};
53
/* Error codes triggered by the SPOE applet. The codes are contiguous from 0
 * to 9; SPOE_FRM_ERR_UNKNOWN is pinned at 99 and SPOE_FRM_ERRS (100) is the
 * number of slots needed to index a table by error code. */
enum spoe_frame_error {
	SPOE_FRM_ERR_NONE           = 0,
	SPOE_FRM_ERR_IO             = 1,
	SPOE_FRM_ERR_TOUT           = 2,
	SPOE_FRM_ERR_TOO_BIG        = 3,
	SPOE_FRM_ERR_INVALID        = 4,
	SPOE_FRM_ERR_NO_VSN         = 5,
	SPOE_FRM_ERR_NO_FRAME_SIZE  = 6,
	SPOE_FRM_ERR_NO_CAP         = 7,
	SPOE_FRM_ERR_BAD_VSN        = 8,
	SPOE_FRM_ERR_BAD_FRAME_SIZE = 9,
	SPOE_FRM_ERR_UNKNOWN        = 99,
	SPOE_FRM_ERRS               = 100,
};
69
/* Actions an agent can request in an ACK frame. SPOE_ACT_TYPES is the
 * end-of-list marker, not an action. */
enum spoe_action_type {
	SPOE_ACT_T_SET_VAR   = 1,
	SPOE_ACT_T_UNSET_VAR = 2,
	SPOE_ACT_TYPES       = 3,
};
76
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +010077
/* A typed-data byte packs the data type in its low nibble and flags in its
 * high nibble; these masks extract each half. */
#define SPOE_DATA_T_MASK  0x0F
#define SPOE_DATA_FL_MASK 0xF0

/* Flag values used to carry a boolean inside the type byte */
#define SPOE_DATA_FL_FALSE 0x00
#define SPOE_DATA_FL_TRUE  0x10
85static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
86 [SPOE_FRM_ERR_NONE] = "normal",
87 [SPOE_FRM_ERR_IO] = "I/O error",
88 [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
89 [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
90 [SPOE_FRM_ERR_INVALID] = "invalid frame received",
91 [SPOE_FRM_ERR_NO_VSN] = "version value not found",
92 [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
93 [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
94 [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
95 [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
96 [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
97};
98
/* Debug switch; has external linkage. */
bool debug = false;

/* Thread-specific data key; has external linkage. */
pthread_key_t worker_id;

/* Head of the list built by ps_register() */
static struct ps *ps_list = NULL;

/* Head of the list built by ps_register_message() */
static struct ps_message *ps_messages = NULL;

/* File names collected by add_file(): <nfiles> entries stored in <files> */
static int nfiles = 0;
static char **files = NULL;
105
106static inline void add_file(const char *file)
107{
108 nfiles++;
109 files = realloc(files, sizeof(*files) * nfiles);
110 if (files == NULL) {
111 fprintf(stderr, "Out of memory error\n");
112 exit(EXIT_FAILURE);
113 }
114 files[nfiles - 1] = strdup(file);
115 if (files[nfiles - 1] == NULL) {
116 fprintf(stderr, "Out of memory error\n");
117 exit(EXIT_FAILURE);
118 }
119}
Thierry FOURNIER64eaa332018-02-23 14:58:40 +0100120
121void ps_register(struct ps *ps)
122{
123 ps->next = ps_list;
124 ps_list = ps;
125}
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100126
Thierry FOURNIER892f6642018-02-23 14:27:05 +0100127void ps_register_message(struct ps *ps, const char *name, void *ref)
128{
129 struct ps_message *msg;
130
131 /* Look for already registered name */
132 for (msg = ps_messages; msg; msg = msg->next) {
133 if (strcmp(name, msg->name) == 0) {
134 LOG("Message \"%s\" already registered\n", name);
135 exit(EXIT_FAILURE);
136 }
137 }
138
139 msg = calloc(1, sizeof(*msg));
140 if (msg == NULL) {
141 LOG("Out of memory error\n");
142 exit(EXIT_FAILURE);
143 }
144
145 msg->next = ps_messages;
146 ps_messages = msg;
147 msg->name = strdup(name);
148 if (msg->name == NULL) {
149 LOG("Out of memory error\n");
150 exit(EXIT_FAILURE);
151 }
152 msg->ref = ref;
153 msg->ps = ps;
154}
155
/* Read up to <read_len> bytes from <sock> into <buf>, blocking in select()
 * until the descriptor is readable and looping until everything was received.
 * It returns the number of bytes read, which is lower than <read_len> when
 * read() reported end-of-file first, or -1 if select() or read() failed or if
 * <sock> cannot be stored in an fd_set. */
static int
do_read(int sock, void *buf, int read_len)
{
	char *dst = buf; /* arithmetic on "void *" is not standard C */
	fd_set readfds;
	int n = 0, total = 0;

	/* FD_SET() is undefined for descriptors outside [0, FD_SETSIZE) */
	if (sock < 0 || sock >= FD_SETSIZE)
		return -1;

	while (total < read_len) {
		/* select() overwrites the set, so rebuild it on every pass */
		FD_ZERO(&readfds);
		FD_SET(sock, &readfds);

		if (select(sock + 1, &readfds, NULL, NULL, NULL) == -1)
			return -1;
		if (!FD_ISSET(sock, &readfds))
			return -1;

		n = read(sock, dst + total, read_len - total);
		if (n <= 0)
			break;

		total += n;
	}

	return (n == -1) ? -1 : total;
}
181
/* Write <write_len> bytes from <buf> to <sock>, blocking in select() until the
 * descriptor is writable and looping until everything was sent. It returns the
 * number of bytes written, which is lower than <write_len> if write() returned
 * 0, or -1 if select() or write() failed or if <sock> cannot be stored in an
 * fd_set. */
static int
do_write(int sock, void *buf, int write_len)
{
	const char *src = buf; /* arithmetic on "void *" is not standard C */
	fd_set writefds;
	int n = 0, total = 0;

	/* FD_SET() is undefined for descriptors outside [0, FD_SETSIZE) */
	if (sock < 0 || sock >= FD_SETSIZE)
		return -1;

	while (total < write_len) {
		/* select() overwrites the set, so rebuild it on every pass */
		FD_ZERO(&writefds);
		FD_SET(sock, &writefds);

		if (select(sock + 1, NULL, &writefds, NULL, NULL) == -1)
			return -1;
		if (!FD_ISSET(sock, &writefds))
			return -1;

		n = write(sock, src + total, write_len - total);
		if (n <= 0)
			break;

		total += n;
	}

	return (n == -1) ? -1 : total;
}
207
208/* Receive a frame sent by HAProxy. It returns -1 if an error occurred,
209 * otherwise the number of read bytes.*/
210static int
211read_frame(int sock, struct worker *w)
212{
213 uint32_t netint;
214 unsigned int framesz;
215
216 /* Read the frame size, on 4 bytes */
217 if (do_read(sock, &netint, sizeof(netint)) != 4) {
218 w->status_code = SPOE_FRM_ERR_IO;
219 return -1;
220 }
221
222 /* Check it against the max size */
223 framesz = ntohl(netint);
224 if (framesz > w->size) {
225 w->status_code = SPOE_FRM_ERR_TOO_BIG;
226 return -1;
227 }
228
229 /* Read the frame */
230 if (do_read(sock, w->buf, framesz) != framesz) {
231 w->status_code = SPOE_FRM_ERR_IO;
232 return -1;
233 }
234
235 w->len = framesz;
236 return framesz;
237}
238
239/* Send a frame to HAProxy. It returns -1 if an error occurred, otherwise the
240 * number of written bytes. */
241static int
242write_frame(int sock, struct worker *w)
243{
244 uint32_t netint;
245
246 /* Write the frame size, on 4 bytes */
247 netint = htonl(w->len);
248 if (do_write(sock, &netint, sizeof(netint)) != 4) {
249 w->status_code = SPOE_FRM_ERR_IO;
250 return -1;
251 }
252
253 /* Write the frame */
254 if (do_write(sock, w->buf, w->len) != w->len) {
255 w->status_code = SPOE_FRM_ERR_IO;
256 return -1;
257 }
258 return w->len;
259}
260
/* Encode <i> as a variable-length integer into <buf>. Values below 240 take a
 * single byte. Larger values store their low 4 bits in a first byte whose
 * high nibble is all ones, then 7 bits per following byte, every byte but the
 * last one having its top bit set. This function never fails and returns the
 * number of written bytes. */
static int
encode_spoe_varint(uint64_t i, char *buf)
{
	unsigned char *out = (unsigned char *)buf;
	int n = 0;

	if (i < 240) {
		out[n++] = (unsigned char)i;
		return n;
	}

	out[n++] = (unsigned char)i | 240;
	i = (i - 240) >> 4;
	while (i >= 128) {
		out[n++] = (unsigned char)i | 128;
		i = (i - 128) >> 7;
	}
	out[n++] = (unsigned char)i;
	return n;
}
282
/* Decode a variable-length integer found at <buf> into <*i>. <end> points one
 * byte past the last readable byte. On success, the number of read bytes is
 * returned. -1 is returned when the end of the buffer is reached before the
 * integer is complete, or when the encoding is too long to fit in 64 bits; in
 * that case <*i> may hold a partial value. */
static int
decode_spoe_varint(char *buf, char *end, uint64_t *i)
{
	unsigned char *msg = (unsigned char *)buf;
	unsigned char *stop = (unsigned char *)end;
	unsigned int shift = 4;
	int idx = 0;

	/* <end> itself is not readable, hence ">=" */
	if (msg >= stop)
		return -1;

	*i = msg[0];
	if (msg[0] < 240)
		return 1;

	do {
		++idx;
		/* shifting a 64-bit value by 64 or more is undefined, and no
		 * valid encoding needs it */
		if (msg + idx >= stop || shift >= 64)
			return -1;
		*i += (uint64_t)msg[idx] << shift;
		shift += 7;
	} while (msg[idx] >= 128);
	return (idx + 1);
}
308
/* Encode the <len> bytes at <str> into <dst> as a length-prefixed string: the
 * length is written first as a variable-length integer, then the bytes. An
 * empty string is a single zero byte. This function never fails and returns
 * the number of written bytes. */
static int
encode_spoe_string(const char *str, size_t len, char *dst)
{
	int prefix_len;

	if (len == 0) {
		*dst = 0;
		return 1;
	}

	prefix_len = encode_spoe_varint(len, dst);
	memcpy(dst + prefix_len, str, len);
	return (prefix_len + len);
}
326
/* Decode a string. Its length is decoded first as a variable-length integer.
 * If that succeeds and the whole string fits before <end>, the beginning of
 * the string is saved in <*str>, its length in <*len>, and the total number of
 * bytes read (length prefix plus string) is returned. If an error occurred, -1
 * is returned and <*str> remains NULL. The string is not copied and is not
 * NUL-terminated: <*str> points inside <buf>. */
static int
decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
{
	char *payload;
	int r;

	*str = NULL;
	*len = 0;

	r = decode_spoe_varint(buf, end, len);
	if (r == -1)
		return -1;
	payload = buf + r;

	/* Compare the length with the room left instead of computing
	 * payload + *len: the length comes from the peer and a huge value
	 * would make the pointer wrap and defeat the check. */
	if (payload > end || *len > (uint64_t)(end - payload))
		return -1;

	*str = payload;
	return (r + (int)*len);
}
352
353/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
354 * of bytes read is returned. A types data is composed of a type (1 byte) and
355 * corresponding data:
356 * - boolean: non additional data (0 bytes)
357 * - integers: a variable-length integer (see decode_spoe_varint)
358 * - ipv4: 4 bytes
359 * - ipv6: 16 bytes
360 * - binary and string: a buffer prefixed by its size, a variable-length
361 * integer (see decode_spoe_string) */
362static int
363skip_spoe_data(char *frame, char *end)
364{
365 uint64_t sz = 0;
366 int r, idx = 0;
367
368 if (frame > end)
369 return -1;
370
371 switch (frame[idx++] & SPOE_DATA_T_MASK) {
372 case SPOE_DATA_T_BOOL:
373 idx++;
374 break;
375 case SPOE_DATA_T_INT32:
376 case SPOE_DATA_T_INT64:
377 case SPOE_DATA_T_UINT32:
378 case SPOE_DATA_T_UINT64:
379 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
380 return -1;
381 idx += r;
382 break;
383 case SPOE_DATA_T_IPV4:
384 idx += 4;
385 break;
386 case SPOE_DATA_T_IPV6:
387 idx += 16;
388 break;
389 case SPOE_DATA_T_STR:
390 case SPOE_DATA_T_BIN:
391 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
392 return -1;
393 idx += r + sz;
394 break;
395 }
396
397 if (frame+idx > end)
398 return -1;
399 return idx;
400}
401
402/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
403 * number of read bytes is returned. See skip_spoe_data for details. */
404static int
405decode_spoe_data(char *frame, char *end, struct spoe_data *data)
406{
407 uint64_t sz = 0;
408 int type, r, idx = 0;
409
410 if (frame > end)
411 return -1;
412
413 type = frame[idx++];
414 data->type = (type & SPOE_DATA_T_MASK);
415 switch (data->type) {
416 case SPOE_DATA_T_BOOL:
417 data->u.boolean = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
418 break;
419 case SPOE_DATA_T_INT32:
420 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
421 return -1;
422 data->u.sint32 = sz;
423 idx += r;
424 break;
425 case SPOE_DATA_T_INT64:
426 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
427 return -1;
428 data->u.uint32 = sz;
429 idx += r;
430 break;
431 case SPOE_DATA_T_UINT32:
432 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
433 return -1;
434 data->u.sint64 = sz;
435 idx += r;
436 break;
437 case SPOE_DATA_T_UINT64:
438 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
439 return -1;
440 data->u.uint64 = sz;
441 idx += r;
442 break;
443 case SPOE_DATA_T_IPV4:
444 if (frame+idx+4 > end)
445 return -1;
446 memcpy(&data->u.ipv4, frame+idx, 4);
447 idx += 4;
448 break;
449 case SPOE_DATA_T_IPV6:
450 if (frame+idx+16 > end)
451 return -1;
452 memcpy(&data->u.ipv6, frame+idx, 16);
453 idx += 16;
454 break;
455 case SPOE_DATA_T_STR:
456 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
457 return -1;
458 idx += r;
459 if (frame+idx+sz > end)
460 return -1;
461 data->u.buffer.str = frame+idx;
462 data->u.buffer.len = sz;
463 idx += sz;
464 break;
465 case SPOE_DATA_T_BIN:
466 if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
467 return -1;
468 idx += r;
469 if (frame+idx+sz > end)
470 return -1;
471 data->u.buffer.str = frame+idx;
472 data->u.buffer.len = sz;
473 idx += sz;
474 break;
475 default:
476 break;
477 }
478
479 if (frame+idx > end)
480 return -1;
481 return idx;
482}
483
484
485/* Check the protocol version. It returns -1 if an error occurred, the number of
486 * read bytes otherwise. */
487static int
488check_proto_version(struct worker *w, int idx)
489{
490 char *str;
491 uint64_t sz;
492
493 /* Get the list of all supported versions by HAProxy */
494 if ((w->buf[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
495 w->status_code = SPOE_FRM_ERR_INVALID;
496 return -1;
497 }
498 idx += decode_spoe_string(w->buf+idx, w->buf+w->len, &str, &sz);
499 if (str == NULL) {
500 w->status_code = SPOE_FRM_ERR_INVALID;
501 return -1;
502 }
503
Ilya Shipitsince7b00f2020-03-23 22:28:40 +0500504 /* TODO: Find the right version in supported ones */
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100505
506 return idx;
507}
508
509/* Check max frame size value. It returns -1 if an error occurred, the number of
510 * read bytes otherwise. */
511static int
512check_max_frame_size(struct worker *w, int idx)
513{
514 uint64_t sz;
515 int type, i;
516
517 /* Get the max-frame-size value of HAProxy */
518 type = w->buf[idx++];
519 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
520 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
521 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
522 (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
523 w->status_code = SPOE_FRM_ERR_INVALID;
524 return -1;
525 }
526 if ((i = decode_spoe_varint(w->buf+idx, w->buf+w->len, &sz)) == -1) {
527 w->status_code = SPOE_FRM_ERR_INVALID;
528 return -1;
529 }
530 idx += i;
531
532 /* Keep the lower value */
533 if (sz < w->size)
534 w->size = sz;
535
536 return idx;
537}
538
539/* Check healthcheck value. It returns -1 if an error occurred, the number of
540 * read bytes otherwise. */
541static int
542check_healthcheck(struct worker *w, int idx)
543{
544 int type;
545
546 /* Get the "healthcheck" value of HAProxy */
547 type = w->buf[idx++];
548 if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_BOOL) {
549 w->status_code = SPOE_FRM_ERR_INVALID;
550 return -1;
551 }
552 w->healthcheck = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
553 return idx;
554}
555
556
557/* Decode a HELLO frame received from HAProxy. It returns -1 if an error
558 * occurred, 0 if the frame must be skipped, otherwise the number of read
559 * bytes. */
560static int
561handle_hahello(struct worker *w)
562{
563 char *end = w->buf+w->len;
564 int i, idx = 0;
565
566 /* Check frame type */
567 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_HELLO)
568 goto skip;
569
570 /* Skip flags */
571 idx += 4;
572
573 /* stream-id and frame-id must be cleared */
574 if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
575 w->status_code = SPOE_FRM_ERR_INVALID;
576 goto error;
577 }
578 idx += 2;
579
580 /* Loop on K/V items */
581 while (idx < w->len) {
582 char *str;
583 uint64_t sz;
584
585 /* Decode the item name */
586 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
587 if (str == NULL) {
588 w->status_code = SPOE_FRM_ERR_INVALID;
589 goto error;
590 }
591
592 /* Check "supported-versions" K/V item */
593 if (!memcmp(str, "supported-versions", sz)) {
594 if ((i = check_proto_version(w, idx)) == -1)
595 goto error;
596 idx = i;
597 }
598 /* Check "max-frame-size" K/V item "*/
599 else if (!memcmp(str, "max-frame-size", sz)) {
600 if ((i = check_max_frame_size(w, idx)) == -1)
601 goto error;
602 idx = i;
603 }
604 /* Check "healthcheck" K/V item "*/
605 else if (!memcmp(str, "healthcheck", sz)) {
606 if ((i = check_healthcheck(w, idx)) == -1)
607 goto error;
608 idx = i;
609 }
610 /* Skip "capabilities" K/V item for now */
611 else {
612 /* Silently ignore unknown item */
613 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
614 w->status_code = SPOE_FRM_ERR_INVALID;
615 goto error;
616 }
617 idx += i;
618 }
619 }
620
621 return idx;
622skip:
623 return 0;
624error:
625 return -1;
626}
627
628/* Decode a DISCONNECT frame received from HAProxy. It returns -1 if an error
629 * occurred, 0 if the frame must be skipped, otherwise the number of read
630 * bytes. */
631static int
632handle_hadiscon(struct worker *w)
633{
634 char *end = w->buf+w->len;
635 int i, idx = 0;
636
637 /* Check frame type */
638 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_DISCON)
639 goto skip;
640
641 /* Skip flags */
642 idx += 4;
643
644 /* stream-id and frame-id must be cleared */
645 if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
646 w->status_code = SPOE_FRM_ERR_INVALID;
647 goto error;
648 }
649 idx += 2;
650
651 /* Loop on K/V items */
652 while (idx < w->len) {
653 char *str;
654 uint64_t sz;
655
656 /* Decode item key */
657 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
658 if (str == NULL) {
659 w->status_code = SPOE_FRM_ERR_INVALID;
660 goto error;
661 }
662 /* Silently ignore unknown item */
663 if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
664 w->status_code = SPOE_FRM_ERR_INVALID;
665 goto error;
666 }
667 idx += i;
668 }
669
670 w->status_code = SPOE_FRM_ERR_NONE;
671 return idx;
672skip:
673 return 0;
674error:
675 return -1;
676}
677
Thierry FOURNIERfbd38242018-02-23 18:24:10 +0100678/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred,
679 * the number of written bytes otherwise. */
680static void prepare_agentack(struct worker *w)
681{
Daniel Corbett4e0fa552019-06-11 09:46:27 -0400682 unsigned int flags = 0;
683
Thierry FOURNIERfbd38242018-02-23 18:24:10 +0100684 w->ack_len = 0;
685
686 /* Frame type */
687 w->ack[w->ack_len++] = SPOE_FRM_T_AGENT_ACK;
688
Daniel Corbett4e0fa552019-06-11 09:46:27 -0400689 /* Set flags */
690 flags |= htonl(SPOE_FRM_FL_FIN);
691 memcpy(w->ack + w->ack_len, &flags, 4);
Thierry FOURNIERfbd38242018-02-23 18:24:10 +0100692 w->ack_len += 4;
693
694 /* Set stream-id and frame-id for ACK frames */
695 w->ack_len += encode_spoe_varint(w->stream_id, w->ack + w->ack_len);
696 w->ack_len += encode_spoe_varint(w->frame_id, w->ack + w->ack_len);
697}
698
699static inline
700int set_var_name(struct worker *w, const char *name, int name_len, unsigned char scope)
701{
702 w->ack[w->ack_len++] = SPOE_ACT_T_SET_VAR; /* Action type */
703 w->ack[w->ack_len++] = 3; /* Number of args */
704 w->ack[w->ack_len++] = scope; /* Arg 1: the scope */
705 w->ack_len += encode_spoe_string(name, name_len, w->ack+w->ack_len); /* Arg 2: variable name */
706 return 1;
707}
708
709int set_var_null(struct worker *w,
710 const char *name, int name_len,
711 unsigned char scope)
712{
713 if (!set_var_name(w, name, name_len, scope))
714 return 0;
715 w->ack[w->ack_len++] = SPOE_DATA_T_NULL;
716 return 1;
717}
718
719int set_var_bool(struct worker *w,
720 const char *name, int name_len,
721 unsigned char scope, bool value)
722{
723 if (!set_var_name(w, name, name_len, scope))
724 return 0;
725 w->ack[w->ack_len++] = SPOE_DATA_T_BOOL | (!!value << 4);
726 return 1;
727}
728
729static inline
730int set_var_int(struct worker *w,
731 const char *name, int name_len,
732 unsigned char scope, int type, uint64_t value)
733{
734 if (!set_var_name(w, name, name_len, scope))
735 return 0;
736 w->ack[w->ack_len++] = SPOE_DATA_T_UINT32;
737 w->ack_len += encode_spoe_varint(value, w->ack+w->ack_len); /* Arg 3: variable value */
738 return 1;
739}
740
741int set_var_uint32(struct worker *w,
742 const char *name, int name_len,
743 unsigned char scope, uint32_t value)
744{
745 return set_var_int(w, name, name_len, scope, SPOE_DATA_T_UINT32, value);
746}
747
748int set_var_int32(struct worker *w,
749 const char *name, int name_len,
750 unsigned char scope, int32_t value)
751{
752 return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value);
753}
754
755int set_var_uint64(struct worker *w,
756 const char *name, int name_len,
757 unsigned char scope, uint64_t value)
758{
759 return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value);
760}
761
762int set_var_int64(struct worker *w,
763 const char *name, int name_len,
764 unsigned char scope, int64_t value)
765{
766 return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value);
767}
768
769int set_var_ipv4(struct worker *w,
770 const char *name, int name_len,
771 unsigned char scope,
772 struct in_addr *ipv4)
773{
774 if (!set_var_name(w, name, name_len, scope))
775 return 0;
776 w->ack[w->ack_len++] = SPOE_DATA_T_IPV4;
777 memcpy(w->ack+w->ack_len, ipv4, 4);
778 w->ack_len += 4;
779 return 1;
780}
781
782int set_var_ipv6(struct worker *w,
783 const char *name, int name_len,
784 unsigned char scope,
785 struct in6_addr *ipv6)
786{
787 if (!set_var_name(w, name, name_len, scope))
788 return 0;
789 w->ack[w->ack_len++] = SPOE_DATA_T_IPV6;
790 memcpy(w->ack+w->ack_len, ipv6, 16);
791 w->ack_len += 16;
792 return 1;
793}
794
795static inline
796int set_var_buf(struct worker *w,
797 const char *name, int name_len,
798 unsigned char scope, int type,
799 const char *str, int str_len)
800{
801 if (!set_var_name(w, name, name_len, scope))
802 return 0;
803 w->ack[w->ack_len++] = type;
804 w->ack_len += encode_spoe_string(str, str_len, w->ack+w->ack_len);
805 return 1;
806}
807
808int set_var_string(struct worker *w,
809 const char *name, int name_len,
810 unsigned char scope,
811 const char *str, int strlen)
812{
813 return set_var_buf(w, name, name_len, scope, SPOE_DATA_T_STR, str, strlen);
814}
815
816int set_var_bin(struct worker *w,
817 const char *name, int name_len,
818 unsigned char scope,
819 const char *str, int strlen)
820{
821 return set_var_buf(w, name, name_len, scope, SPOE_DATA_T_BIN, str, strlen);
822}
823
824/* This function is a little bit ugly,
Ilya Shipitsince7b00f2020-03-23 22:28:40 +0500825 * TODO: improve the response without copying the buffer
Thierry FOURNIERfbd38242018-02-23 18:24:10 +0100826 */
827static int commit_agentack(struct worker *w)
828{
829 memcpy(w->buf, w->ack, w->ack_len);
830 w->len = w->ack_len;
831 return 1;
832}
833
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100834/* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
835 * occurred, 0 if the frame must be skipped, otherwise the number of read
836 * bytes. */
837static int
838handle_hanotify(struct worker *w)
839{
840 char *end = w->buf+w->len;
841 uint64_t stream_id, frame_id;
842 int nbargs, i, idx = 0;
Thierry FOURNIERa09df3f2018-02-23 14:42:46 +0100843 int index;
844 struct spoe_kv args[256];
845 uint64_t length;
846 struct ps_message *msg;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100847
848 /* Check frame type */
849 if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_NOTIFY)
850 goto skip;
851
852 /* Skip flags */
853 idx += 4;
854
855 /* Read the stream-id */
856 if ((i = decode_spoe_varint(w->buf+idx, end, &stream_id)) == -1) {
857 w->status_code = SPOE_FRM_ERR_INVALID;
858 goto error;
859 }
860 idx += i;
861
862 /* Read the frame-id */
863 if ((i = decode_spoe_varint(w->buf+idx, end, &frame_id)) == -1) {
864 w->status_code = SPOE_FRM_ERR_INVALID;
865 goto error;
866 }
867 idx += i;
868
869 w->stream_id = (unsigned int)stream_id;
870 w->frame_id = (unsigned int)frame_id;
871
872 DEBUG("Notify frame received: stream-id=%u - frame-id=%u",
873 w->stream_id, w->frame_id);
874
Ilya Shipitsince7b00f2020-03-23 22:28:40 +0500875 /* Prepare ack, if the processing fails the ack will be cancelled */
Thierry FOURNIERfbd38242018-02-23 18:24:10 +0100876 prepare_agentack(w);
877
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100878 /* Loop on messages */
879 while (idx < w->len) {
880 char *str;
881 uint64_t sz;
882
883 /* Decode the message name */
884 idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
885 if (str == NULL) {
886 w->status_code = SPOE_FRM_ERR_INVALID;
887 goto error;
888 }
889 DEBUG(" Message '%.*s' received", (int)sz, str);
890
Thierry FOURNIERa09df3f2018-02-23 14:42:46 +0100891 /* Decode all SPOE data */
892 nbargs = (unsigned char)w->buf[idx++];
893 for (index = 0; index < nbargs; index++) {
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100894
Thierry FOURNIERa09df3f2018-02-23 14:42:46 +0100895 /* Read the key name */
896 if ((i = decode_spoe_string(w->buf+idx, end,
897 &args[index].name.str,
898 &length)) == -1) {
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100899 w->status_code = SPOE_FRM_ERR_INVALID;
900 goto error;
901 }
Thierry FOURNIERa09df3f2018-02-23 14:42:46 +0100902 if (length > INT_MAX) {
903 w->status_code = SPOE_FRM_ERR_TOO_BIG;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100904 goto error;
905 }
Thierry FOURNIERa09df3f2018-02-23 14:42:46 +0100906 args[index].name.len = length;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100907 idx += i;
Thierry FOURNIERa09df3f2018-02-23 14:42:46 +0100908
909 /* Read the value */
910 memset(&args[index].value, 0, sizeof(args[index].value));
911 if ((i = decode_spoe_data(w->buf+idx, end, &args[index].value)) == -1) {
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100912 w->status_code = SPOE_FRM_ERR_INVALID;
913 goto error;
914 }
915 idx += i;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100916 }
Thierry FOURNIERa09df3f2018-02-23 14:42:46 +0100917
Ilya Shipitsince7b00f2020-03-23 22:28:40 +0500918 /* Lookup for existing bindings. If no existing message
Thierry FOURNIERa09df3f2018-02-23 14:42:46 +0100919 * where found, does nothing.
920 */
921 for (msg = ps_messages; msg; msg = msg->next)
922 if (sz == strlen(msg->name) && strncmp(str, msg->name, sz) == 0)
923 break;
924 if (msg == NULL || msg->ps->exec_message == NULL) {
925 DEBUG(" Message '%.*s' have no bindings registered", (int)sz, str);
926 continue;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100927 }
Thierry FOURNIERa09df3f2018-02-23 14:42:46 +0100928
929 /* Process the message */
930 msg->ps->exec_message(w, msg->ref, nbargs, args);
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100931 }
932
933 return idx;
934skip:
935 return 0;
936error:
937 return -1;
938}
939
940/* Encode a HELLO frame to send it to HAProxy. It returns -1 if an error
941 * occurred, the number of written bytes otherwise. */
942static int
943prepare_agenthello(struct worker *w)
944{
945 int idx = 0;
Daniel Corbett4e0fa552019-06-11 09:46:27 -0400946 unsigned int flags = 0;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100947
948 /* Frame Type */
949 w->buf[idx++] = SPOE_FRM_T_AGENT_HELLO;
950
Daniel Corbett4e0fa552019-06-11 09:46:27 -0400951 /* Set flags */
952 flags |= htonl(SPOE_FRM_FL_FIN);
953 memcpy(w->buf+idx, &flags, 4);
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100954 idx += 4;
955
956 /* No stream-id and frame-id for HELLO frames */
957 w->buf[idx++] = 0;
958 w->buf[idx++] = 0;
959
960 /* "version" K/V item */
961 idx += encode_spoe_string("version", 7, w->buf+idx);
962 w->buf[idx++] = SPOE_DATA_T_STR;
963 idx += encode_spoe_string(SPOP_VERSION, SLEN(SPOP_VERSION), w->buf+idx);
964
965 /* "max-frame-size" K/V item */
966 idx += encode_spoe_string("max-frame-size", 14, w->buf+idx);
967 w->buf[idx++] = SPOE_DATA_T_UINT32;
968 idx += encode_spoe_varint(w->size, w->buf+idx);
969
970 /* "capabilities" K/V item */
971 idx += encode_spoe_string("capabilities", 12, w->buf+idx);
972 w->buf[idx++] = SPOE_DATA_T_STR;
973 idx += encode_spoe_string(SPOA_CAPABILITIES, SLEN(SPOA_CAPABILITIES), w->buf+idx);
974
975 w->len = idx;
976 return idx;
977}
978
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100979/* Encode a DISCONNECT frame to send it to HAProxy. It returns -1 if an error
980 * occurred, the number of written bytes otherwise. */
981static int
982prepare_agentdicon(struct worker *w)
983{
984 const char *reason;
985 int rlen, idx = 0;
Daniel Corbett4e0fa552019-06-11 09:46:27 -0400986 unsigned int flags = 0;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100987
988 if (w->status_code >= SPOE_FRM_ERRS)
989 w->status_code = SPOE_FRM_ERR_UNKNOWN;
990 reason = spoe_frm_err_reasons[w->status_code];
991 rlen = strlen(reason);
992
993 /* Frame type */
994 w->buf[idx++] = SPOE_FRM_T_AGENT_DISCON;
995
Daniel Corbett4e0fa552019-06-11 09:46:27 -0400996 /* Set flags */
997 flags |= htonl(SPOE_FRM_FL_FIN);
998 memcpy(w->buf+idx, &flags, 4);
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +0100999 idx += 4;
1000
1001 /* No stream-id and frame-id for DISCONNECT frames */
1002 w->buf[idx++] = 0;
1003 w->buf[idx++] = 0;
1004
1005 /* There are 2 mandatory items: "status-code" and "message" */
1006
1007 /* "status-code" K/V item */
1008 idx += encode_spoe_string("status-code", 11, w->buf+idx);
1009 w->buf[idx++] = SPOE_DATA_T_UINT32;
1010 idx += encode_spoe_varint(w->status_code, w->buf+idx);
1011
1012 /* "message" K/V item */
1013 idx += encode_spoe_string("message", 7, w->buf+idx);
1014 w->buf[idx++] = SPOE_DATA_T_STR;
1015 idx += encode_spoe_string(reason, rlen, w->buf+idx);
1016
1017 w->len = idx;
1018 return idx;
1019}
1020
1021static int
1022hello_handshake(int sock, struct worker *w)
1023{
1024 if (read_frame(sock, w) < 0) {
1025 LOG("Failed to read Haproxy HELLO frame");
1026 goto error;
1027 }
1028 if (handle_hahello(w) < 0) {
1029 LOG("Failed to handle Haproxy HELLO frame");
1030 goto error;
1031 }
1032 if (prepare_agenthello(w) < 0) {
1033 LOG("Failed to prepare Agent HELLO frame");
1034 goto error;
1035 }
1036 if (write_frame(sock, w) < 0) {
1037 LOG("Failed to write Agent frame");
1038 goto error;
1039 }
1040 DEBUG("Hello handshake done: version=%s - max-frame-size=%u - healthcheck=%s",
1041 SPOP_VERSION, w->size, (w->healthcheck ? "true" : "false"));
1042 return 0;
1043error:
1044 return -1;
1045}
1046
1047static int
1048notify_ack_roundtip(int sock, struct worker *w)
1049{
1050 if (read_frame(sock, w) < 0) {
1051 LOG("Failed to read Haproxy NOTIFY frame");
1052 goto error_or_quit;
1053 }
1054 if (handle_hadiscon(w) != 0) {
1055 if (w->status_code != SPOE_FRM_ERR_NONE)
1056 LOG("Failed to handle Haproxy DISCONNECT frame");
1057 DEBUG("Disconnect frame received: reason=%s",
1058 spoe_frm_err_reasons[w->status_code]);
1059 goto error_or_quit;
1060 }
1061 if (handle_hanotify(w) < 0) {
1062 LOG("Failed to handle Haproxy NOTIFY frame");
1063 goto error_or_quit;
1064 }
Thierry FOURNIERfbd38242018-02-23 18:24:10 +01001065 if (commit_agentack(w) < 0) {
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001066 LOG("Failed to prepare Agent ACK frame");
1067 goto error_or_quit;
1068 }
1069 if (write_frame(sock, w) < 0) {
1070 LOG("Failed to write Agent ACK frame");
1071 goto error_or_quit;
1072 }
1073 DEBUG("Ack frame sent: stream-id=%u - frame-id=%u",
1074 w->stream_id, w->frame_id);
1075 return 0;
1076error_or_quit:
1077 return -1;
1078}
1079
1080static void *
Thierry FOURNIER5301ed12018-02-23 11:59:15 +01001081spoa_worker(void *data)
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001082{
1083 struct worker w;
1084 struct sockaddr_in client;
1085 int *info = (int *)data;
1086 int csock, lsock = info[0];
Thierry FOURNIER64eaa332018-02-23 14:58:40 +01001087 struct ps *ps;
Thierry FOURNIER8b9a73b2018-02-23 15:12:55 +01001088 int i;
1089 int len;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001090
1091 signal(SIGPIPE, SIG_IGN);
1092 pthread_setspecific(worker_id, &info[1]);
1093
Thierry FOURNIER64eaa332018-02-23 14:58:40 +01001094 /* Init registered processors */
1095 for (ps = ps_list; ps != NULL; ps = ps->next)
1096 ps->init_worker(&w);
1097
Thierry FOURNIER8b9a73b2018-02-23 15:12:55 +01001098 /* Load files */
1099 for (i = 0; i < nfiles; i++) {
1100 len = strlen(files[i]);
1101 for (ps = ps_list; ps != NULL; ps = ps->next)
1102 if (strcmp(files[i] + len - strlen(ps->ext), ps->ext) == 0)
1103 break;
1104 if (ps == NULL) {
1105 LOG("Can't load file \"%s\"\n", files[i]);
1106 goto out;
1107 }
1108 if (!ps->load_file(&w, files[i]))
1109 goto out;
1110 }
1111
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001112 while (1) {
1113 socklen_t sz = sizeof(client);
1114
1115 if ((csock = accept(lsock, (struct sockaddr *)&client, &sz)) < 0) {
1116 LOG("Failed to accept client connection: %m");
1117 goto out;
1118 }
1119 memset(&w, 0, sizeof(w));
1120 w.id = info[1];
1121 w.size = MAX_FRAME_SIZE;
1122
1123 DEBUG("New connection from HAProxy accepted");
1124
1125 if (hello_handshake(csock, &w) < 0)
1126 goto disconnect;
1127 if (w.healthcheck == true)
1128 goto close;
1129 while (1) {
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001130 if (notify_ack_roundtip(csock, &w) < 0)
1131 break;
1132 }
1133
1134 disconnect:
1135 if (w.status_code == SPOE_FRM_ERR_IO) {
1136 LOG("Close the client socket because of I/O errors");
1137 goto close;
1138 }
1139 if (prepare_agentdicon(&w) < 0) {
1140 LOG("Failed to prepare Agent DISCONNECT frame");
1141 goto close;
1142 }
1143 if (write_frame(csock, &w) < 0) {
1144 LOG("Failed to write Agent DISCONNECT frame");
1145 goto close;
1146 }
1147 DEBUG("Disconnect frame sent: reason=%s",
1148 spoe_frm_err_reasons[w.status_code]);
1149
1150 close:
1151 close(csock);
1152 }
1153
1154out:
1155 free(info);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001156#if 0
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001157 pthread_exit(NULL);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001158#endif
1159 return NULL;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001160}
1161
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001162int process_create(pid_t *pid, void *(*ps)(void *), void *data)
1163{
Thierry FOURNIER786e9e62018-02-23 19:11:47 +01001164 if (debug) {
1165 ps(data);
1166 exit(EXIT_SUCCESS);
1167 }
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001168 *pid = fork();
1169 if (*pid == -1)
1170 return -1;
1171 if (*pid > 0)
1172 return 0;
1173 ps(data);
1174 return 0;
1175}
1176
/*
 * Print the command-line help on stderr. <prog> is the program name shown
 * in the "Usage:" line.
 */
static void
usage(char *prog)
{
	fprintf(stderr, "Usage: %s [-h] [-d] [-p <port>] [-n <num-workers>] -f <file>\n", prog);
	fprintf(stderr, " -h Print this message\n");
	fprintf(stderr, " -d Enable the debug mode\n");
	fprintf(stderr, " -p <port> Specify the port to listen on (default: 12345)\n");
	fprintf(stderr, " -n <num-workers> Specify the number of workers (default: 5)\n");
	fprintf(stderr, " -f <file> Specify the file which contains the processing code.\n");
	fprintf(stderr, " This argument can be specified more than once.\n");
}
1188
1189int
1190main(int argc, char **argv)
1191{
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001192#if 0
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001193 pthread_t *ts = NULL;
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001194#endif
1195 pid_t *pids;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001196 struct sockaddr_in server;
1197 int i, sock, opt, nbworkers, port;
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001198 int status;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001199
1200 nbworkers = NUM_WORKERS;
1201 port = DEFAULT_PORT;
Thierry FOURNIER8b9a73b2018-02-23 15:12:55 +01001202 while ((opt = getopt(argc, argv, "hdn:p:f:")) != -1) {
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001203 switch (opt) {
1204 case 'h':
1205 usage(argv[0]);
1206 return EXIT_SUCCESS;
1207 case 'd':
1208 debug = true;
1209 break;
1210 case 'n':
1211 nbworkers = atoi(optarg);
1212 break;
1213 case 'p':
1214 port = atoi(optarg);
1215 break;
Thierry FOURNIER8b9a73b2018-02-23 15:12:55 +01001216 case 'f':
1217 add_file(optarg);
1218 break;
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001219 default:
1220 usage(argv[0]);
1221 return EXIT_FAILURE;
1222 }
1223 }
1224
1225 if (nbworkers <= 0) {
1226 fprintf(stderr, "%s: Invalid number of workers '%d'\n",
1227 argv[0], nbworkers);
1228 goto error;
1229 }
1230 if (port <= 0) {
1231 fprintf(stderr, "%s: Invalid port '%d'\n", argv[0], port);
1232 goto error;
1233 }
1234
1235 if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1236 fprintf(stderr, "Failed creating socket: %m\n");
1237 goto error;
1238 }
1239
1240 setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (int []){1}, sizeof(int));
1241 setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (int []){1}, sizeof(int));
1242
1243 memset(&server, 0, sizeof(server));
1244 server.sin_family = AF_INET;
1245 server.sin_addr.s_addr = INADDR_ANY;
1246 server.sin_port = htons(port);
1247
1248 if (bind(sock, (struct sockaddr *)&server, sizeof(server)) < 0) {
1249 fprintf(stderr, "Failed to bind the socket: %m\n");
1250 goto error;
1251 }
1252
1253 if (listen(sock , 10) < 0) {
1254 fprintf(stderr, "Failed to listen on the socket: %m\n");
1255 goto error;
1256 }
1257 fprintf(stderr, "SPOA is listening on port %d\n", port);
1258
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001259 pthread_key_create(&worker_id, NULL);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001260
1261 /* Initialise the server in thread mode. This code is commented
1262 * out and not deleted, because later I expect to work with
1263 * process ansd threads. This first version just support processes.
1264 */
1265#if 0
1266 ts = calloc(nbworkers, sizeof(*ts));
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001267 for (i = 0; i < nbworkers; i++) {
1268 int *info = calloc(2, sizeof(*info));
1269
1270 info[0] = sock;
1271 info[1] = i+1;
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001272
Thierry FOURNIER5301ed12018-02-23 11:59:15 +01001273 if (pthread_create(&ts[i], NULL, spoa_worker, info) < 0) {
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001274 fprintf(stderr, "Failed to create thread %d: %m\n", i+1);
1275 goto error;
1276 }
1277 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1278 }
1279
1280 for (i = 0; i < nbworkers; i++) {
1281 pthread_join(ts[i], NULL);
1282 fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
1283 }
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001284 free(ts);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001285#endif
1286
1287 /* Start processes */
1288 pids = calloc(nbworkers, sizeof(*pids));
1289 if (!pids) {
1290 fprintf(stderr, "Out of memory error\n");
1291 goto error;
1292 }
1293 for (i = 0; i < nbworkers; i++) {
1294 int *info = calloc(2, sizeof(*info));
1295
1296 info[0] = sock;
1297 info[1] = i+1;
1298
1299 if (process_create(&pids[i], spoa_worker, info) == -1) {
1300 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1301 goto error;
1302 }
1303 fprintf(stderr, "SPOA worker %02d started\n", i+1);
1304 }
1305 for (i = 0; i < nbworkers; i++) {
1306 waitpid(pids[0], &status, 0);
1307 fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
1308 }
1309
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001310 close(sock);
Thierry FOURNIER7de6fc62018-02-23 13:50:26 +01001311 pthread_key_delete(worker_id);
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001312 return EXIT_SUCCESS;
Thierry FOURNIERa09df3f2018-02-23 14:42:46 +01001313
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001314error:
Thierry FOURNIERd8b5c772018-02-23 11:40:03 +01001315 return EXIT_FAILURE;
1316}