BUG/MEDIUM: threads: Run the poll loop on the main thread too

There was a flaw in the way the threads were created. The main one was only
used to create all the others and then wait for them to exit. Now, it is also
used to run a poll loop, so we only create nbthread-1 threads.

This also fixes a bug in the compression filter when there is only 1 thread
(nbthread == 1 or no thread support). The bug was in the way thread-local
resources were initialized: per-thread init/deinit callbacks were never called
for the main process. So, with nbthread set to 1, some buffers remained
uninitialized.
diff --git a/src/buffer.c b/src/buffer.c
index e892d1e..db2e053 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -83,19 +83,13 @@
 
 	pool_free2(pool2_buffer, buffer);
 
-	if (global.nbthread > 1) {
-		hap_register_per_thread_init(init_buffer_per_thread);
-		hap_register_per_thread_deinit(deinit_buffer_per_thread);
-	}
-	else if (!init_buffer_per_thread())
-		     return 0;
-
+	hap_register_per_thread_init(init_buffer_per_thread);
+	hap_register_per_thread_deinit(deinit_buffer_per_thread);
 	return 1;
 }
 
 void deinit_buffer()
 {
-	deinit_buffer_per_thread();
 	pool_destroy2(pool2_buffer);
 }
 
diff --git a/src/chunk.c b/src/chunk.c
index 1fdcebb..15c681a 100644
--- a/src/chunk.c
+++ b/src/chunk.c
@@ -68,23 +68,37 @@
  */
 static int alloc_trash_buffers(int bufsize)
 {
+	chunk_init(&trash, my_realloc2(trash.str, bufsize), bufsize);
 	trash_size = bufsize;
 	trash_buf1 = (char *)my_realloc2(trash_buf1, bufsize);
 	trash_buf2 = (char *)my_realloc2(trash_buf2, bufsize);
-	pool2_trash = create_pool("trash", sizeof(struct chunk) + bufsize, MEM_F_EXACT);
-	return trash_buf1 && trash_buf2 && pool2_trash;
+	return trash.str && trash_buf1 && trash_buf2;
 }
 
+static int init_trash_buffers_per_thread()
+{
+	return alloc_trash_buffers(global.tune.bufsize);
+}
+
+static void deinit_trash_buffers_per_thread()
+{
+	chunk_destroy(&trash);
+	free(trash_buf2);
+	free(trash_buf1);
+	trash_buf2 = NULL;
+	trash_buf1 = NULL;
+}
+
 /* Initialize the trash buffers. It returns 0 if an error occurred. */
-int init_trash_buffers()
+int init_trash_buffers(int first)
 {
-	if (global.nbthread > 1 && tid == (unsigned int)(-1)) {
-		hap_register_per_thread_init(init_trash_buffers);
-		hap_register_per_thread_deinit(deinit_trash_buffers);
+	if (!first) {
+		hap_register_per_thread_init(init_trash_buffers_per_thread);
+		hap_register_per_thread_deinit(deinit_trash_buffers_per_thread);
 	}
-
-	chunk_init(&trash, my_realloc2(trash.str, global.tune.bufsize), global.tune.bufsize);
-	if (!trash.str || !alloc_trash_buffers(global.tune.bufsize))
+	pool_destroy2(pool2_trash);
+	pool2_trash = create_pool("trash", sizeof(struct chunk) + global.tune.bufsize, MEM_F_EXACT);
+	if (!pool2_trash || !alloc_trash_buffers(global.tune.bufsize))
 		return 0;
 	return 1;
 }
@@ -94,11 +108,7 @@
  */
 void deinit_trash_buffers(void)
 {
-	chunk_destroy(&trash);
-	free(trash_buf2);
-	free(trash_buf1);
-	trash_buf2 = NULL;
-	trash_buf1 = NULL;
+	pool_destroy2(pool2_trash);
 }
 
 /*
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index 642b4d6..d26c3a2 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -176,6 +176,7 @@
 static void deinit_epoll_per_thread()
 {
 	free(epoll_events);
+	epoll_events = NULL;
 }
 
 /*
@@ -191,18 +192,11 @@
 	if (epoll_fd < 0)
 		goto fail_fd;
 
-	if (global.nbthread > 1) {
-		hap_register_per_thread_init(init_epoll_per_thread);
-		hap_register_per_thread_deinit(deinit_epoll_per_thread);
-	}
-	else if (!init_epoll_per_thread())
-		goto fail_ee;
+	hap_register_per_thread_init(init_epoll_per_thread);
+	hap_register_per_thread_deinit(deinit_epoll_per_thread);
 
 	return 1;
 
- fail_ee:
-	close(epoll_fd);
-	epoll_fd = -1;
  fail_fd:
 	p->pref = 0;
 	return 0;
@@ -214,14 +208,11 @@
  */
 REGPRM1 static void _do_term(struct poller *p)
 {
-	free(epoll_events);
-
 	if (epoll_fd >= 0) {
 		close(epoll_fd);
 		epoll_fd = -1;
 	}
 
-	epoll_events = NULL;
 	p->private = NULL;
 	p->pref = 0;
 }
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index 8f20db2..2ae2774 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -154,6 +154,7 @@
 static void deinit_kqueue_per_thread()
 {
 	free(kev);
+	kev = NULL;
 }
 
 /*
@@ -169,18 +170,10 @@
 	if (kqueue_fd < 0)
 		goto fail_fd;
 
-	if (global.nbthread > 1) {
-		hap_register_per_thread_init(init_kqueue_per_thread);
-		hap_register_per_thread_deinit(deinit_kqueue_per_thread);
-	}
-	else if (!init_kqueue_per_thread())
-		goto fail_kev;
-
+	hap_register_per_thread_init(init_kqueue_per_thread);
+	hap_register_per_thread_deinit(deinit_kqueue_per_thread);
 	return 1;
 
- fail_kev:
-	close(kqueue_fd);
-	kqueue_fd = -1;
  fail_fd:
 	p->pref = 0;
 	return 0;
@@ -192,8 +185,6 @@
  */
 REGPRM1 static void _do_term(struct poller *p)
 {
-	free(kev);
-
 	if (kqueue_fd >= 0) {
 		close(kqueue_fd);
 		kqueue_fd = -1;
diff --git a/src/ev_poll.c b/src/ev_poll.c
index 455c4e1..f7632b6 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -191,6 +191,7 @@
 static void deinit_poll_per_thread()
 {
 	free(poll_events);
+	poll_events = NULL;
 }
 
 /*
@@ -200,31 +201,26 @@
  */
 REGPRM1 static int _do_init(struct poller *p)
 {
-	__label__ fail_swevt, fail_srevt, fail_pe;
+	__label__ fail_swevt, fail_srevt;
 	int fd_evts_bytes;
 
 	p->private = NULL;
 	fd_evts_bytes = (global.maxsock + sizeof(**fd_evts) - 1) / sizeof(**fd_evts) * sizeof(**fd_evts);
 
-	if (global.nbthread > 1) {
-		hap_register_per_thread_init(init_poll_per_thread);
-		hap_register_per_thread_deinit(deinit_poll_per_thread);
-	}
-	else if (!init_poll_per_thread())
-		goto fail_pe;
-
 	if ((fd_evts[DIR_RD] = calloc(1, fd_evts_bytes)) == NULL)
 		goto fail_srevt;
 	if ((fd_evts[DIR_WR] = calloc(1, fd_evts_bytes)) == NULL)
 		goto fail_swevt;
 
+	hap_register_per_thread_init(init_poll_per_thread);
+	hap_register_per_thread_deinit(deinit_poll_per_thread);
+
 	return 1;
 
  fail_swevt:
 	free(fd_evts[DIR_RD]);
  fail_srevt:
 	free(poll_events);
- fail_pe:
 	p->pref = 0;
 	return 0;
 }
@@ -237,7 +233,6 @@
 {
 	free(fd_evts[DIR_WR]);
 	free(fd_evts[DIR_RD]);
-	free(poll_events);
 	p->private = NULL;
 	p->pref = 0;
 }
diff --git a/src/ev_select.c b/src/ev_select.c
index 49e980f..4f5a2d1 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -173,8 +173,8 @@
 
 static void deinit_select_per_thread()
 {
-	free(tmp_evts[DIR_WR]);
-	free(tmp_evts[DIR_RD]);
+	free(tmp_evts[DIR_WR]); tmp_evts[DIR_WR] = NULL;
+	free(tmp_evts[DIR_RD]); tmp_evts[DIR_RD] = NULL;
 }
 
 /*
@@ -193,18 +193,15 @@
 		goto fail_revt;
 
 	fd_set_bytes = sizeof(fd_set) * (global.maxsock + FD_SETSIZE - 1) / FD_SETSIZE;
-	if (global.nbthread > 1) {
-		hap_register_per_thread_init(init_select_per_thread);
-		hap_register_per_thread_deinit(deinit_select_per_thread);
-	}
-	else if (!init_select_per_thread())
-		goto fail_revt;
 
 	if ((fd_evts[DIR_RD] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
 		goto fail_srevt;
 	if ((fd_evts[DIR_WR] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
 		goto fail_swevt;
 
+	hap_register_per_thread_init(init_select_per_thread);
+	hap_register_per_thread_deinit(deinit_select_per_thread);
+
 	return 1;
 
  fail_swevt:
@@ -225,8 +222,6 @@
 {
 	free(fd_evts[DIR_WR]);
 	free(fd_evts[DIR_RD]);
-	free(tmp_evts[DIR_WR]);
-	free(tmp_evts[DIR_RD]);
 	p->private = NULL;
 	p->pref = 0;
 }
diff --git a/src/fd.c b/src/fd.c
index 58bc863..ea5d683 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -325,18 +325,12 @@
 	if ((fd_cache = calloc(global.maxsock, sizeof(*fd_cache))) == NULL)
 		goto fail_cache;
 
-	if (global.nbthread > 1) {
-		hap_register_per_thread_init(init_pollers_per_thread);
-		hap_register_per_thread_deinit(deinit_pollers_per_thread);
-	}
-	else if (!init_pollers_per_thread())
-		goto fail_updt;
+	hap_register_per_thread_init(init_pollers_per_thread);
+	hap_register_per_thread_deinit(deinit_pollers_per_thread);
 
 	for (p = 0; p < global.maxsock; p++)
 		SPIN_INIT(&fdtab[p].lock);
 
-	//memset(fd_cache, -1, global.maxsock);
-
 	SPIN_INIT(&fdtab_lock);
 	RWLOCK_INIT(&fdcache_lock);
 	SPIN_INIT(&poll_lock);
@@ -356,8 +350,6 @@
 	} while (!bp || bp->pref == 0);
 	return 0;
 
- fail_updt:
-	free(fd_cache);
  fail_cache:
 	free(fdinfo);
  fail_info:
@@ -384,7 +376,6 @@
 			bp->term(bp);
 	}
 
-	free(fd_updt);  fd_updt  = NULL;
 	free(fd_cache); fd_cache = NULL;
 	free(fdinfo);   fdinfo   = NULL;
 	free(fdtab);    fdtab    = NULL;
diff --git a/src/haproxy.c b/src/haproxy.c
index f4353e1..2d2d877 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -1179,7 +1179,7 @@
 	global.mode = MODE_STARTING;
 	next_argv = copy_argv(argc, argv);
 
-	if (!init_trash_buffers()) {
+	if (!init_trash_buffers(1)) {
 		Alert("failed to initialize trash buffers.\n");
 		exit(1);
 	}
@@ -1196,10 +1196,10 @@
 	/*
 	 * Initialize the previously static variables.
 	 */
-    
+
 	totalconn = actconn = maxfd = listeners = stopping = 0;
 	killed = 0;
-    
+
 
 #ifdef HAPROXY_MEMMAX
 	global.rlimit_memmax_all = HAPROXY_MEMMAX;
@@ -1760,16 +1760,11 @@
 		global.nbthread = 1;
 
 	/* Realloc trash buffers because global.tune.bufsize may have changed */
-	if (!init_trash_buffers()) {
+	if (!init_trash_buffers(0)) {
 		Alert("failed to initialize trash buffers.\n");
 		exit(1);
 	}
 
-	if (!init_log_buffers()) {
-		Alert("failed to initialize log buffers.\n");
-		exit(1);
-	}
-
 	/*
 	 * Note: we could register external pollers here.
 	 * Built-in pollers have been registered before main().
@@ -2194,7 +2189,6 @@
 	pool_destroy2(pool2_stream);
 	pool_destroy2(pool2_session);
 	pool_destroy2(pool2_connection);
-	pool_destroy2(pool2_trash);
 	pool_destroy2(pool2_requri);
 	pool_destroy2(pool2_task);
 	pool_destroy2(pool2_capture);
@@ -2291,7 +2285,6 @@
 	}
 }
 
-#ifdef USE_THREAD
 static void *run_thread_poll_loop(void *data)
 {
 	struct per_thread_init_fct   *ptif;
@@ -2318,9 +2311,12 @@
 	list_for_each_entry(ptdf, &per_thread_deinit_list, list)
 		ptdf->fct();
 
-	pthread_exit(NULL);
-}
+#ifdef USE_THREAD
+	if (tid > 0)
+		pthread_exit(NULL);
 #endif
+	return NULL;
+}
 
 /* This is the global management task for listeners. It enables listeners waiting
  * for global resources when there are enough free resource, or at least once in
@@ -2804,17 +2800,24 @@
 	/*
 	 * That's it : the central polling loop. Run until we stop.
 	 */
-	if (global.nbthread > 1) {
 #ifdef USE_THREAD
+	{
 		unsigned int *tids    = calloc(global.nbthread, sizeof(unsigned int));
 		pthread_t    *threads = calloc(global.nbthread, sizeof(pthread_t));
 		int          i;
 
-		THREAD_SYNC_INIT((1UL << global.nbthread) - 1);
-		for (i = 0; i < global.nbthread; i++) {
+		/* Init tids array */
+		for (i = 0; i < global.nbthread; i++)
 			tids[i] = i;
+
+		/* Create nbthread-1 threads. The first thread is the current process */
+		threads[0] = pthread_self();
+		for (i = 1; i < global.nbthread; i++)
 			pthread_create(&threads[i], NULL, &run_thread_poll_loop, &tids[i]);
+
 #ifdef USE_CPU_AFFINITY
+		/* Now set the CPU affinity for all threads */
+		for (i = 0; i < global.nbthread; i++) {
 			if (global.cpu_map[relative_pid-1])
 				global.thread_map[relative_pid-1][i] &= global.cpu_map[relative_pid-1];
 
@@ -2822,9 +2825,14 @@
 			    global.thread_map[relative_pid-1][i]) /* only do this if the thread has a THREAD map */
 				pthread_setaffinity_np(threads[i],
 						       sizeof(unsigned long), (void *)&global.thread_map[relative_pid-1][i]);
-#endif
 		}
-		for (i = 0; i < global.nbthread; i++)
+#endif /* !USE_CPU_AFFINITY */
+
+		/* Finally, start the poll loop for the first thread */
+		run_thread_poll_loop(&tids[0]);
+
+		/* Wait for the other threads to end */
+		for (i = 1; i < global.nbthread; i++)
 			pthread_join(threads[i], NULL);
 
 		free(tids);
@@ -2833,30 +2841,12 @@
 #if defined(DEBUG_THREAD) || defined(DEBUG_FULL)
 		show_lock_stats();
 #endif
-
-#endif /* USE_THREAD */
 	}
-	else {
-		tid = 0;
+#else /* ! USE_THREAD */
 
-#ifdef USE_THREAD
-#ifdef USE_CPU_AFFINITY
-		if (global.cpu_map[relative_pid-1])
-			global.thread_map[relative_pid-1][tid] &= global.cpu_map[relative_pid-1];
+	run_thread_poll_loop((int []){0});
 
-		if (global.thread_map[relative_pid-1][tid]) /* only do this if the thread has a THREAD map */
-			pthread_setaffinity_np(pthread_self(),
-					       sizeof(unsigned long), (void *)&global.thread_map[relative_pid-1][tid]);
 #endif
-#endif
-
-		if (global.mode & MODE_MWORKER)
-			mworker_pipe_register(mworker_pipe);
-
-		protocol_enable_all();
-
-		run_poll_loop();
-	}
 
 	/* Do some cleanup */
 	deinit();
diff --git a/src/log.c b/src/log.c
index 864560b..773662b 100644
--- a/src/log.c
+++ b/src/log.c
@@ -1344,14 +1344,19 @@
 	}
 }
 
+static int init_log_buffers_per_thread()
+{
+	return init_log_buffers();
+}
+
+static void deinit_log_buffers_per_thread()
+{
+	deinit_log_buffers();
+}
+
 /* Initialize log buffers used for syslog messages */
 int init_log_buffers()
 {
-	if (global.nbthread > 1 && tid == (unsigned int)(-1)) {
-		hap_register_per_thread_init(init_log_buffers);
-		hap_register_per_thread_deinit(deinit_log_buffers);
-	}
-
 	logheader = my_realloc2(logheader, global.max_syslog_len + 1);
 	logheader_rfc5424 = my_realloc2(logheader_rfc5424, global.max_syslog_len + 1);
 	logline = my_realloc2(logline, global.max_syslog_len + 1);
@@ -2413,6 +2418,8 @@
 __attribute__((constructor))
 static void __log_init(void)
 {
+	hap_register_per_thread_init(init_log_buffers_per_thread);
+	hap_register_per_thread_deinit(deinit_log_buffers_per_thread);
 	cli_register_kw(&cli_kws);
 }
 /*