MEDIUM: mworker: replace the master pipe by socketpairs

In order to communicate with the workers, the master pipe has been
replaced by a socketpair() per worker.

The goal is to use these sockets as stats sockets and be able to access
them from the master.

When reloading, the master serializes the information about the workers and
puts it in an environment variable. Once the master has been re-executed,
it unserializes that information and is able to close the FDs of the
leaving children.
diff --git a/src/haproxy.c b/src/haproxy.c
index 0844af3..9cf6742 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -206,7 +206,20 @@
 static volatile sig_atomic_t caught_signal = 0;
 static char **next_argv = NULL;
 
-int mworker_pipe[2];
+struct list proc_list = LIST_HEAD_INIT(proc_list);
+
+int master = 0; /* 1 if in master, 0 if in child */
+
+struct mworker_proc {
+	int pid; /* PID of the worker (fork() result, or restored from the environment) */
+	int ipc_fd[2]; /* socketpair: 0 is master side, 1 is worker side */
+	int relative_pid; /* relative_pid assigned to the worker when it was created */
+	struct list list; /* element of proc_list */
+};
+
+struct mworker_proc *proc_self;
+
+
 
 /* list of the temporarily limited listeners because of lack of resource */
 struct list global_listener_queue = LIST_HEAD_INIT(global_listener_queue);
@@ -513,6 +526,65 @@
 		for (i = 0; i < global.nbproc; i++)
 			kill(children[i], sig);
 	}
+}
+
+/* Serialize the proc list as '|'-separated "type=worker;fd=..;pid=..;rpid=.."
+ * entries and export it in the HAPROXY_CHILDREN environment variable. */
+static void mworker_proc_list_to_env()
+{
+	char *msg = NULL;
+	struct mworker_proc *child;
+
+	list_for_each_entry(child, &proc_list, list) {
+		if (msg)
+			memprintf(&msg, "%s|type=worker;fd=%d;pid=%d;rpid=%d", msg, child->ipc_fd[0], child->pid, child->relative_pid);
+		else
+			memprintf(&msg, "type=worker;fd=%d;pid=%d;rpid=%d", child->ipc_fd[0], child->pid, child->relative_pid);
+	}
+	if (msg)
+		setenv("HAPROXY_CHILDREN", msg, 1);
+	free(msg); /* setenv() stored its own copy, release ours */
+}
+
+/*
+ * unserialize the proc list from HAPROXY_CHILDREN, then unset the variable
+ */
+static void mworker_env_to_proc_list()
+{
+	char *msg, *token = NULL, *s1;
+
+	msg = getenv("HAPROXY_CHILDREN");
+	if (!msg)
+		return;
+
+	while ((token = strtok_r(msg, "|", &s1))) {
+		struct mworker_proc *child;
+		char *subtoken = NULL;
+		char *s2;
+
+		msg = NULL; /* next calls continue on the same string */
+		child = calloc(1, sizeof(*child));
+		if (!child) {
+			ha_alert("Out of memory while allocating a worker process structure.\n");
+			break;
+		}
+		child->ipc_fd[0] = child->ipc_fd[1] = -1; /* 0 is a valid FD, don't default to it */
+		while ((subtoken = strtok_r(token, ";", &s2))) {
+			token = NULL;
+			if (strncmp(subtoken, "fd=", 3) == 0)
+				child->ipc_fd[0] = atoi(subtoken+3);
+			else if (strncmp(subtoken, "pid=", 4) == 0)
+				child->pid = atoi(subtoken+4);
+			else if (strncmp(subtoken, "rpid=", 5) == 0)
+				child->relative_pid = atoi(subtoken+5);
+		}
+		if (child->pid) /* an entry without a PID is dropped */
+			LIST_ADDQ(&proc_list, &child->list);
+		else
+			free(child);
+	}
+
+	unsetenv("HAPROXY_CHILDREN");
 }
 
 /*
@@ -628,6 +700,8 @@
 #endif
 	setenv("HAPROXY_MWORKER_REEXEC", "1", 1);
 
+	mworker_proc_list_to_env(); /* put the children description in the env */
+
 	/* compute length  */
 	while (next_argv[next_argc])
 		next_argc++;
@@ -710,6 +784,7 @@
 {
 	int exitpid = -1;
 	int status = 0;
+	struct mworker_proc *child, *it;
 
 restart_wait:
 
@@ -724,6 +799,16 @@
 		else
 			status = 255;
 
+		list_for_each_entry_safe(child, it, &proc_list, list) {
+			if (child->pid != exitpid)
+				continue;
+
+			LIST_DEL(&child->list);
+			close(child->ipc_fd[0]);
+			free(child);
+			break;
+		}
+
 		if (!children) {
 			ha_warning("Worker %d exited with code %d\n", exitpid, status);
 		} else {
@@ -760,6 +845,10 @@
 		sd_notifyf(0, "READY=1\nMAINPID=%lu", (unsigned long)getpid());
 #endif
 
+	master = 1;
+
+	mworker_env_to_proc_list(); /* get the info of the children in the env */
+
 	signal_register_fct(SIGTERM, mworker_catch_sigterm, SIGTERM);
 	signal_register_fct(SIGUSR1, mworker_catch_sigterm, SIGUSR1);
 	signal_register_fct(SIGINT, mworker_catch_sigterm, SIGINT);
@@ -770,6 +859,9 @@
 	mworker_unblock_signals();
 	mworker_cleanlisteners();
 
+	mworker_catch_sigchld(NULL); /* ensure we clean the children in case
+				     some SIGCHLD were lost */
+
 	tid = 0;
 
 	global.nbthread = 1;
@@ -2391,13 +2483,13 @@
 /* should only be called once per process */
 void mworker_pipe_register()
 {
-	if (fdtab[mworker_pipe[0]].owner)
+	if (fdtab[proc_self->ipc_fd[1]].owner)
 		/* already initialized */
 		return;
 
-	fcntl(mworker_pipe[0], F_SETFL, O_NONBLOCK);
-	fd_insert(mworker_pipe[0], mworker_pipe, mworker_pipe_handler, MAX_THREADS_MASK);
-	fd_want_recv(mworker_pipe[0]);
+	fcntl(proc_self->ipc_fd[1], F_SETFL, O_NONBLOCK);
+	fd_insert(proc_self->ipc_fd[1], proc_self->ipc_fd, mworker_pipe_handler, MAX_THREADS_MASK);
+	fd_want_recv(proc_self->ipc_fd[1]);
 }
 
 /* Runs the polling loop */
@@ -2466,7 +2558,7 @@
 		}
 	}
 
-	if (global.mode & MODE_MWORKER) {
+	if ((global.mode & MODE_MWORKER) && master == 0) {
 		HA_SPIN_LOCK(START_LOCK, &start_lock);
 		mworker_pipe_register();
 		HA_SPIN_UNLOCK(START_LOCK, &start_lock);
@@ -2786,33 +2878,6 @@
 				setsid();
 		}
 
-		if (global.mode & MODE_MWORKER) {
-			if ((getenv("HAPROXY_MWORKER_REEXEC") == NULL)) {
-				char *msg = NULL;
-				/* master pipe to ensure the master is still alive  */
-				ret = pipe(mworker_pipe);
-				if (ret < 0) {
-					ha_alert("[%s.main()] Cannot create master pipe.\n", argv[0]);
-					exit(EXIT_FAILURE);
-				} else {
-					memprintf(&msg, "%d", mworker_pipe[0]);
-					setenv("HAPROXY_MWORKER_PIPE_RD", msg, 1);
-					memprintf(&msg, "%d", mworker_pipe[1]);
-					setenv("HAPROXY_MWORKER_PIPE_WR", msg, 1);
-					free(msg);
-				}
-			} else {
-				char* rd = getenv("HAPROXY_MWORKER_PIPE_RD");
-				char* wr = getenv("HAPROXY_MWORKER_PIPE_WR");
-				if (!rd || !wr) {
-					ha_alert("[%s.main()] Cannot get master pipe FDs.\n", argv[0]);
-					atexit_flag = 0;// dont reexecute master process
-					exit(EXIT_FAILURE);
-				}
-				mworker_pipe[0] = atoi(rd);
-				mworker_pipe[1] = atoi(wr);
-			}
-		}
 
 		/* if in master-worker mode, write the PID of the father */
 		if (global.mode & MODE_MWORKER) {
@@ -2824,6 +2889,24 @@
 
 		/* the father launches the required number of processes */
 		for (proc = 0; proc < global.nbproc; proc++) {
+			if (global.mode & MODE_MWORKER) {
+
+				proc_self = malloc(sizeof(*proc_self));
+				if (!proc_self) {
+					ha_alert("[%s.main()] Cannot allocate process structures.\n", argv[0]);
+					exit(1);
+				}
+
+				/* master socketpair (formerly a pipe) to ensure the master is still alive */
+				ret = socketpair(AF_UNIX, SOCK_STREAM, 0, proc_self->ipc_fd);
+				if (ret < 0) {
+					ha_alert("[%s.main()] Cannot create master pipe.\n", argv[0]);
+					exit(EXIT_FAILURE);
+				} else {
+					proc_self->relative_pid = relative_pid;
+					LIST_ADDQ(&proc_list, &proc_self->list);
+				}
+			}
 			ret = fork();
 			if (ret < 0) {
 				ha_alert("[%s.main()] Cannot fork.\n", argv[0]);
@@ -2838,6 +2921,12 @@
 				snprintf(pidstr, sizeof(pidstr), "%d\n", ret);
 				shut_your_big_mouth_gcc(write(pidfd, pidstr, strlen(pidstr)));
 			}
+			if (global.mode & MODE_MWORKER) {
+				proc_self->pid = ret;
+				close(proc_self->ipc_fd[1]); /* close client side */
+				proc_self->ipc_fd[1] = -1;
+			}
+
 			relative_pid++; /* each child will get a different one */
 			pid_bit <<= 1;
 		}
@@ -2898,9 +2987,24 @@
 		/* child must never use the atexit function */
 		atexit_flag = 0;
 
-		/* close the write end of the master pipe in the children */
-		if (global.mode & MODE_MWORKER)
-			close(mworker_pipe[1]);
+		/* close useless master sockets */
+		if (global.mode & MODE_MWORKER) {
+			struct mworker_proc *child, *it;
+			master = 0;
+
+			/* free proc struct of other processes  */
+			list_for_each_entry_safe(child, it, &proc_list, list) {
+				if (child->ipc_fd[0] > -1) {
+					close(child->ipc_fd[0]);
+					child->ipc_fd[0] = -1;
+				}
+				if (child == proc_self)
+					continue;
+				close(child->ipc_fd[1]);
+				LIST_DEL(&child->list);
+				free(child);
+			}
+		}
 
 		if (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)) {
 			devnullfd = open("/dev/null", O_RDWR, 0);