MEDIUM: task: remove the tasks_run_queue counter and have one per thread

This counter is solely used for reporting in the stats and is the hottest
thread contention point to date. Moving it to the scheduler and having a
separate one for the global run queue dramatically improves the performance,
showing a 12% boost on the request rate on 16 threads!
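
The approach is the classic sharded counter: every writer touches only a
counter sitting in its own thread's cache line, and the rare reader (the
stats code) sums all shards on demand. A standalone sketch of the pattern
in plain C11 atomics (simplified, made-up names, not the actual HAProxy
types):

    #include <stdatomic.h>

    #define MAX_THREADS 64

    /* illustrative only: one counter per thread, each on its own
     * cache line so that writers never contend with each other
     */
    struct shard {
        _Alignas(64) atomic_uint rq_total;
    };
    static struct shard counters[MAX_THREADS];

    /* hot path: a thread only ever touches its own cache line */
    static inline void rq_inc(int thr)
    {
        atomic_fetch_add_explicit(&counters[thr].rq_total, 1,
                                  memory_order_relaxed);
    }

    /* cold path (stats dump): sum all shards; the result is racy
     * but good enough for reporting, just like total_run_queues()
     * in the hunk below
     */
    static inline unsigned int rq_sum(int nbthread)
    {
        unsigned int total = 0;
        int thr;

        for (thr = 0; thr < nbthread; thr++)
            total += atomic_load_explicit(&counters[thr].rq_total,
                                          memory_order_relaxed);
        return total;
    }

Relaxed ordering is enough here because the sum is only used for
reporting and may be momentarily inconsistent anyway.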

In addition, the thread debugging output, which used to rely on rqueue_size,
was not totally accurate as it would only report task counts. Now we can
report the thread's exact run queue length.

It is also interesting to note that there are still a few other task/tasklet
counters in the scheduler that are not efficiently updated because some cover
a single area and others cover multiple areas. It looks like having a distinct
counter for each of the following entries would help and would keep the code
a bit cleaner:
  - global run queue (tree)
  - per-thread run queue (tree)
  - per-thread shared tasklets list
  - per-thread local lists

Maybe even splitting the shared tasklets lists between pure tasklets and
tasks, instead of mixing both in a single list, would simplify the code
because there remain a number of places where several counters have to be
updated.
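
As a purely hypothetical illustration (none of these names exist in the
tree), such a split could look like:

    /* hypothetical per-area counters, NOT part of this patch */
    struct sched_counts {
        unsigned int rq_tree;         /* per-thread run queue (tree) */
        unsigned int shared_tasklets; /* per-thread shared tasklets list */
        unsigned int local_lists;     /* per-thread local tasklet lists */
    };
    extern unsigned int grq_tree_total; /* global run queue (tree) */

with each enqueue/dequeue site updating exactly one of them.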

(cherry picked from commit 9c7b8085f4cad284642e7f67d2274f2fb568f243)
[wt: backported as it does have a significant impact on many-core machines;
 with 48 threads, the request rate is simply doubled and the response time
 halved; there were significant context updates, all verified to be OK; it
 does not seem reasonable to backport it to 2.2]
Signed-off-by: Willy Tarreau <w@1wt.eu>
diff --git a/include/haproxy/task-t.h b/include/haproxy/task-t.h
index 2403d99..13fc27a 100644
--- a/include/haproxy/task-t.h
+++ b/include/haproxy/task-t.h
@@ -83,6 +83,8 @@
 	int rqueue_size;        /* Number of elements in the per-thread run queue */
 	struct task *current;   /* current task (not tasklet) */
 	int current_queue;      /* points to current tasklet list being run, -1 if none */
+	unsigned int rq_ticks;  /* insertion counter for the run queue */
+	unsigned int rq_total;  /* total size of the run queue, prio_tree + tasklets */
 	uint8_t tl_class_mask;  /* bit mask of non-empty tasklets classes */
 	__attribute__((aligned(64))) char end[0];
 };
diff --git a/include/haproxy/task.h b/include/haproxy/task.h
index e4462c7..459cc23 100644
--- a/include/haproxy/task.h
+++ b/include/haproxy/task.h
@@ -89,8 +89,7 @@
 /* a few exported variables */
 extern unsigned int nb_tasks;     /* total number of tasks */
 extern volatile unsigned long global_tasks_mask; /* Mask of threads with tasks in the global runqueue */
-extern unsigned int tasks_run_queue;    /* run queue size */
-extern unsigned int tasks_run_queue_cur;
+extern unsigned int grq_total;    /* total number of entries in the global run queue */
 extern unsigned int nb_tasks_cur;
 extern unsigned int niced_tasks;  /* number of niced tasks in the run queue */
 extern struct pool_head *pool_head_task;
@@ -144,7 +143,22 @@
  */
 void mworker_cleantasks();
 
+/* returns the number of running tasks+tasklets on the whole process. Note
+ * that this *is* racy since a task may move from the global to a local
+ * queue for example and be counted twice. This is only for statistics
+ * reporting.
+ */
+static inline int total_run_queues()
+{
+	int thr, ret = 0;
 
+#ifdef USE_THREAD
+	ret = _HA_ATOMIC_LOAD(&grq_total);
+#endif
+	for (thr = 0; thr < global.nbthread; thr++)
+		ret += _HA_ATOMIC_LOAD(&task_per_thread[thr].rq_total);
+	return ret;
+}
 
 /* return 0 if task is in run queue, otherwise non-zero */
 static inline int task_in_rq(struct task *t)
@@ -284,21 +298,26 @@
 }
 
 /*
- * Unlink the task from the run queue. The tasks_run_queue size and number of
- * niced tasks are updated too. A pointer to the task itself is returned. The
- * task *must* already be in the run queue before calling this function. If
- * unsure, use the safer task_unlink_rq() function. Note that the pointer to the
- * next run queue entry is neither checked nor updated.
+ * Unlink the task from the run queue. The run queue size and number of niced
+ * tasks are updated too. A pointer to the task itself is returned. The task
+ * *must* already be in the run queue before calling this function. If the task
+ * is in the global run queue, the global run queue's lock must already be held.
+ * If unsure, use the safer task_unlink_rq() function. Note that the pointer to
+ * the next run queue entry is neither checked nor updated.
  */
 static inline struct task *__task_unlink_rq(struct task *t)
 {
-	_HA_ATOMIC_SUB(&tasks_run_queue, 1);
 #ifdef USE_THREAD
-	if (t->state & TASK_GLOBAL)
+	if (t->state & TASK_GLOBAL) {
+		grq_total--;
 		_HA_ATOMIC_AND(&t->state, ~TASK_GLOBAL);
+	}
 	else
 #endif
+	{
 		sched->rqueue_size--;
+		_HA_ATOMIC_SUB(&sched->rq_total, 1);
+	}
 	eb32sc_delete(&t->rq);
 	if (likely(t->nice))
 		_HA_ATOMIC_SUB(&niced_tasks, 1);
@@ -357,15 +376,16 @@
 			LIST_ADDQ(&sched->tasklets[sched->current_queue], &tl->list);
 			sched->tl_class_mask |= 1 << sched->current_queue;
 		}
+		_HA_ATOMIC_ADD(&sched->rq_total, 1);
 	} else {
 		/* this tasklet runs on a specific thread. */
 		MT_LIST_ADDQ(&task_per_thread[thr].shared_tasklet_list, (struct mt_list *)&tl->list);
+		_HA_ATOMIC_ADD(&task_per_thread[thr].rq_total, 1);
 		if (sleeping_thread_mask & (1UL << thr)) {
 			_HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr));
 			wake_thread(thr);
 		}
 	}
-	_HA_ATOMIC_ADD(&tasks_run_queue, 1);
 }
 
 /* schedules tasklet <tl> to run onto the thread designated by tl->tid, which
@@ -381,7 +401,7 @@
  */
 static inline void tasklet_insert_into_tasklet_list(struct list *list, struct tasklet *tl)
 {
-	_HA_ATOMIC_ADD(&tasks_run_queue, 1);
+	_HA_ATOMIC_ADD(&sched->rq_total, 1);
 	LIST_ADDQ(list, &tl->list);
 }
 
@@ -395,7 +415,7 @@
 {
 	if (MT_LIST_DEL((struct mt_list *)&t->list)) {
 		_HA_ATOMIC_AND(&t->state, ~TASK_IN_LIST);
-		_HA_ATOMIC_SUB(&tasks_run_queue, 1);
+		_HA_ATOMIC_SUB(&task_per_thread[t->tid >= 0 ? t->tid : tid].rq_total, 1);
 	}
 }
 
@@ -510,7 +530,7 @@
 static inline void tasklet_free(struct tasklet *tl)
 {
 	if (MT_LIST_DEL((struct mt_list *)&tl->list))
-		_HA_ATOMIC_SUB(&tasks_run_queue, 1);
+		_HA_ATOMIC_SUB(&task_per_thread[tl->tid >= 0 ? tl->tid : tid].rq_total, 1);
 
 	pool_free(pool_head_tasklet, tl);
 	if (unlikely(stopping))
diff --git a/src/debug.c b/src/debug.c
index b2a4c6b..30e1559 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -70,7 +70,7 @@
 			LIST_ISEMPTY(&task_per_thread[thr].tasklets[TL_BULK]) &&
 			MT_LIST_ISEMPTY(&task_per_thread[thr].shared_tasklet_list)),
 	              task_per_thread[thr].task_list_size,
-	              task_per_thread[thr].rqueue_size,
+	              task_per_thread[thr].rq_total,
 	              stuck,
 	              !!(task_profiling_mask & thr_bit));
 
diff --git a/src/stats.c b/src/stats.c
index 9eea148..70040f8 100644
--- a/src/stats.c
+++ b/src/stats.c
@@ -2864,7 +2864,7 @@
 	              actconn, pipes_used, pipes_used+pipes_free, read_freq_ctr(&global.conn_per_sec),
 		      bps >= 1000000000UL ? (bps / 1000000000.0) : bps >= 1000000UL ? (bps / 1000000.0) : (bps / 1000.0),
 		      bps >= 1000000000UL ? 'G' : bps >= 1000000UL ? 'M' : 'k',
-	              tasks_run_queue_cur, nb_tasks_cur, ti->idle_pct
+	              total_run_queues(), nb_tasks_cur, ti->idle_pct
 	              );
 
 	/* scope_txt = search query, appctx->ctx.stats.scope_len is always <= STAT_SCOPE_TXT_MAXLEN */
@@ -3892,7 +3892,7 @@
 	info[INF_MAX_ZLIB_MEM_USAGE]             = mkf_u32(FO_CONFIG|FN_LIMIT, global.maxzlibmem);
 #endif
 	info[INF_TASKS]                          = mkf_u32(0, nb_tasks_cur);
-	info[INF_RUN_QUEUE]                      = mkf_u32(0, tasks_run_queue_cur);
+	info[INF_RUN_QUEUE]                      = mkf_u32(0, total_run_queues());
 	info[INF_IDLE_PCT]                       = mkf_u32(FN_AVG, ti->idle_pct);
 	info[INF_NODE]                           = mkf_str(FO_CONFIG|FN_OUTPUT|FS_SERVICE, global.node);
 	if (global.desc)
diff --git a/src/task.c b/src/task.c
index 20b08ea..2959812 100644
--- a/src/task.c
+++ b/src/task.c
@@ -37,8 +37,6 @@
 
 unsigned int nb_tasks = 0;
 volatile unsigned long global_tasks_mask = 0; /* Mask of threads with tasks in the global runqueue */
-unsigned int tasks_run_queue = 0;
-unsigned int tasks_run_queue_cur = 0;    /* copy of the run queue size */
 unsigned int nb_tasks_cur = 0;     /* copy of the tasks count */
 unsigned int niced_tasks = 0;      /* number of niced tasks in the run queue */
 
@@ -48,8 +46,9 @@
 __decl_aligned_rwlock(wq_lock);   /* RW lock related to the wait queue */
 
 #ifdef USE_THREAD
-struct eb_root timers;      /* sorted timers tree, global */
-struct eb_root rqueue;      /* tree constituting the run queue */
+struct eb_root timers;      /* sorted timers tree, global, accessed under wq_lock */
+struct eb_root rqueue;      /* tree constituting the global run queue, accessed under rq_lock */
+unsigned int grq_total;     /* total number of entries in the global run queue, updated under rq_lock */
 #endif
 
 static unsigned int rqueue_ticks;  /* insertion count */
@@ -97,7 +96,7 @@
 			/* Beware: tasks that have never run don't have their ->list empty yet! */
 			MT_LIST_ADDQ(&task_per_thread[thr].shared_tasklet_list,
 			             (struct mt_list *)&((struct tasklet *)t)->list);
-			_HA_ATOMIC_ADD(&tasks_run_queue, 1);
+			_HA_ATOMIC_ADD(&task_per_thread[thr].rq_total, 1);
 			_HA_ATOMIC_ADD(&task_per_thread[thr].task_list_size, 1);
 			if (sleeping_thread_mask & (1UL << thr)) {
 				_HA_ATOMIC_AND(&sleeping_thread_mask, ~(1UL << thr));
@@ -122,17 +121,16 @@
 	if (root == &rqueue) {
 		HA_SPIN_LOCK(TASK_RQ_LOCK, &rq_lock);
 	}
-#endif
-	/* Make sure if the task isn't in the runqueue, nobody inserts it
-	 * in the meanwhile.
-	 */
-	_HA_ATOMIC_ADD(&tasks_run_queue, 1);
-#ifdef USE_THREAD
+
 	if (root == &rqueue) {
 		global_tasks_mask |= t->thread_mask;
+		grq_total++;
 		__ha_barrier_store();
-	}
+	} else
 #endif
+	{
+		_HA_ATOMIC_ADD(&sched->rq_total, 1);
+	}
 	t->rq.key = _HA_ATOMIC_ADD(&rqueue_ticks, 1);
 
 	if (likely(t->nice)) {
@@ -472,7 +470,7 @@
 		t->calls++;
 		sched->current = t;
 
-		_HA_ATOMIC_SUB(&tasks_run_queue, 1);
+		_HA_ATOMIC_SUB(&sched->rq_total, 1);
 
 		if (TASK_IS_TASKLET(t)) {
 			LIST_DEL_INIT(&((struct tasklet *)t)->list);
@@ -587,7 +585,6 @@
 		return;
 	}
 
-	tasks_run_queue_cur = tasks_run_queue; /* keep a copy for reporting */
 	nb_tasks_cur = nb_tasks;
 	max_processed = global.tune.runqueue_depth;