[MAJOR] replaced rbtree with ul2tree.
The rbtree-based wait queue consumes a lot of CPU. Use the ul2tree
instead. Many cleanups and code reorganizations made it possible
to shrink the task struct and simplify the code a bit.
diff --git a/src/appsession.c b/src/appsession.c
index 62b096f..0adbc33 100644
--- a/src/appsession.c
+++ b/src/appsession.c
@@ -114,7 +114,7 @@
if ((t = pool_alloc(task)) == NULL)
return -1;
t->wq = NULL;
- t->rqnext = NULL;
+ t->qlist.p = NULL;
t->state = TASK_IDLE;
t->context = NULL;
tv_delayfrom(&t->expire, &now, TBLCHKINT);
diff --git a/src/backend.c b/src/backend.c
index 0eb97ed..f096874 100644
--- a/src/backend.c
+++ b/src/backend.c
@@ -572,7 +572,7 @@
* we have to inform the server that it may be used by another session.
*/
if (may_dequeue_tasks(t->srv, t->be))
- task_wakeup(&rq, t->srv->queue_mgt);
+ task_wakeup(t->srv->queue_mgt);
return 1;
}
return 0;
@@ -611,7 +611,7 @@
t->be->failed_conns++;
/* release other sessions waiting for this server */
if (may_dequeue_tasks(t->srv, t->be))
- task_wakeup(&rq, t->srv->queue_mgt);
+ task_wakeup(t->srv->queue_mgt);
return 1;
}
/* ensure that we have enough retries left */
@@ -625,7 +625,7 @@
*/
/* let's try to offer this slot to anybody */
if (may_dequeue_tasks(t->srv, t->be))
- task_wakeup(&rq, t->srv->queue_mgt);
+ task_wakeup(t->srv->queue_mgt);
if (t->srv)
t->srv->failed_conns++;
@@ -691,7 +691,7 @@
/* release other sessions waiting for this server */
if (may_dequeue_tasks(t->srv, t->be))
- task_wakeup(&rq, t->srv->queue_mgt);
+ task_wakeup(t->srv->queue_mgt);
return 1;
}
/* if we get here, it's because we got SRV_STATUS_OK, which also
diff --git a/src/cfgparse.c b/src/cfgparse.c
index 33670f1..289d634 100644
--- a/src/cfgparse.c
+++ b/src/cfgparse.c
@@ -2415,7 +2415,7 @@
return -1;
}
- t->rqnext = NULL;
+ t->qlist.p = NULL;
t->wq = NULL;
t->state = TASK_IDLE;
t->process = process_srv_queue;
@@ -2463,7 +2463,7 @@
}
t->wq = NULL;
- t->rqnext = NULL;
+ t->qlist.p = NULL;
t->state = TASK_IDLE;
t->process = process_chk;
t->context = newsrv;
diff --git a/src/checks.c b/src/checks.c
index 309d0c4..f9bc5c5 100644
--- a/src/checks.c
+++ b/src/checks.c
@@ -75,7 +75,7 @@
sess->srv = NULL; /* it's left to the dispatcher to choose a server */
http_flush_cookie_flags(&sess->txn);
pendconn_free(pc);
- task_wakeup(&rq, sess->task);
+ task_wakeup(sess->task);
xferred++;
}
}
@@ -167,7 +167,7 @@
}
}
out_wakeup:
- task_wakeup(&rq, t);
+ task_wakeup(t);
out_nowake:
EV_FD_CLR(fd, DIR_WR); /* nothing more to write */
fdtab[fd].ev &= ~FD_POLL_WR;
@@ -237,7 +237,7 @@
out_wakeup:
EV_FD_CLR(fd, DIR_RD);
- task_wakeup(&rq, t);
+ task_wakeup(t);
fdtab[fd].ev &= ~FD_POLL_RD;
return 1;
}
@@ -436,7 +436,7 @@
p->sess->srv = s;
sess = p->sess;
pendconn_free(p);
- task_wakeup(&rq, sess->task);
+ task_wakeup(sess->task);
}
sprintf(trash,
diff --git a/src/client.c b/src/client.c
index 521b860..028282e 100644
--- a/src/client.c
+++ b/src/client.c
@@ -152,7 +152,7 @@
setsockopt(cfd, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one));
t->wq = NULL;
- t->rqnext = NULL;
+ t->qlist.p = NULL;
t->state = TASK_IDLE;
t->process = process_session;
t->context = s;
@@ -422,7 +422,7 @@
task_queue(t);
if (p->mode != PR_MODE_HEALTH)
- task_wakeup(&rq, t);
+ task_wakeup(t);
p->feconn++; /* beconn will be increased later */
if (p->feconn > p->feconn_max)
diff --git a/src/haproxy.c b/src/haproxy.c
index f8946f9..5776d18 100644
--- a/src/haproxy.c
+++ b/src/haproxy.c
@@ -269,6 +269,7 @@
void dump(int sig)
{
+#if 0
struct task *t;
struct session *s;
struct rb_node *node;
@@ -290,6 +291,7 @@
s->req->l, s->rep?s->rep->l:0, s->cli_fd
);
}
+#endif
}
#ifdef DEBUG_MEMORY
diff --git a/src/proto_http.c b/src/proto_http.c
index e9b3b56..00277f5 100644
--- a/src/proto_http.c
+++ b/src/proto_http.c
@@ -2353,7 +2353,7 @@
*/
/* let's try to offer this slot to anybody */
if (may_dequeue_tasks(t->srv, t->be))
- task_wakeup(&rq, t->srv->queue_mgt);
+ task_wakeup(t->srv->queue_mgt);
if (t->srv)
t->srv->failed_conns++;
@@ -2539,7 +2539,7 @@
* we have to inform the server that it may be used by another session.
*/
if (t->srv && may_dequeue_tasks(t->srv, t->be))
- task_wakeup(&rq, t->srv->queue_mgt);
+ task_wakeup(t->srv->queue_mgt);
return 1;
}
@@ -2581,7 +2581,7 @@
* we have to inform the server that it may be used by another session.
*/
if (t->srv && may_dequeue_tasks(t->srv, t->be))
- task_wakeup(&rq, t->srv->queue_mgt);
+ task_wakeup(t->srv->queue_mgt);
return 1;
}
@@ -2753,7 +2753,7 @@
* we have to inform the server that it may be used by another session.
*/
if (t->srv && may_dequeue_tasks(t->srv, cur_proxy))
- task_wakeup(&rq, t->srv->queue_mgt);
+ task_wakeup(t->srv->queue_mgt);
return 1;
}
}
@@ -2988,7 +2988,7 @@
* we have to inform the server that it may be used by another session.
*/
if (may_dequeue_tasks(t->srv, t->be))
- task_wakeup(&rq, t->srv->queue_mgt);
+ task_wakeup(t->srv->queue_mgt);
return 1;
}
@@ -3101,7 +3101,7 @@
* we have to inform the server that it may be used by another session.
*/
if (may_dequeue_tasks(t->srv, t->be))
- task_wakeup(&rq, t->srv->queue_mgt);
+ task_wakeup(t->srv->queue_mgt);
return 1;
}
@@ -3117,7 +3117,7 @@
* we have to inform the server that it may be used by another session.
*/
if (may_dequeue_tasks(t->srv, t->be))
- task_wakeup(&rq, t->srv->queue_mgt);
+ task_wakeup(t->srv->queue_mgt);
return 1;
}
@@ -3137,7 +3137,7 @@
* we have to inform the server that it may be used by another session.
*/
if (may_dequeue_tasks(t->srv, t->be))
- task_wakeup(&rq, t->srv->queue_mgt);
+ task_wakeup(t->srv->queue_mgt);
return 1;
}
@@ -3182,7 +3182,7 @@
* we have to inform the server that it may be used by another session.
*/
if (may_dequeue_tasks(t->srv, t->be))
- task_wakeup(&rq, t->srv->queue_mgt);
+ task_wakeup(t->srv->queue_mgt);
return 1;
}
@@ -3198,7 +3198,7 @@
* we have to inform the server that it may be used by another session.
*/
if (may_dequeue_tasks(t->srv, t->be))
- task_wakeup(&rq, t->srv->queue_mgt);
+ task_wakeup(t->srv->queue_mgt);
return 1;
}
@@ -3218,7 +3218,7 @@
* we have to inform the server that it may be used by another session.
*/
if (may_dequeue_tasks(t->srv, t->be))
- task_wakeup(&rq, t->srv->queue_mgt);
+ task_wakeup(t->srv->queue_mgt);
return 1;
}
diff --git a/src/queue.c b/src/queue.c
index 54fd6a3..9b53687 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -62,7 +62,7 @@
sess = pendconn_get_next_sess(s, p);
if (sess == NULL)
break;
- task_wakeup(&rq, sess->task);
+ task_wakeup(sess->task);
}
return TIME_ETERNITY;
diff --git a/src/stream_sock.c b/src/stream_sock.c
index 2a7275b..77eda02 100644
--- a/src/stream_sock.c
+++ b/src/stream_sock.c
@@ -156,7 +156,7 @@
else
tv_eternity(&b->rex);
- task_wakeup(&rq, fdtab[fd].owner);
+ task_wakeup(fdtab[fd].owner);
}
fdtab[fd].ev &= ~FD_POLL_RD;
@@ -291,7 +291,7 @@
}
}
- task_wakeup(&rq, fdtab[fd].owner);
+ task_wakeup(fdtab[fd].owner);
fdtab[fd].ev &= ~FD_POLL_WR;
return retval;
}
diff --git a/src/task.c b/src/task.c
index ca262d6..a0bf7af 100644
--- a/src/task.c
+++ b/src/task.c
@@ -1,7 +1,7 @@
/*
* Task management functions.
*
- * Copyright 2000-2006 Willy Tarreau <w@1wt.eu>
+ * Copyright 2000-2007 Willy Tarreau <w@1wt.eu>
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
@@ -13,95 +13,117 @@
#include <common/config.h>
#include <common/mini-clist.h>
#include <common/time.h>
+#include <common/standard.h>
#include <proto/task.h>
+#include <types/task.h>
+// FIXME: check 8bitops.c for faster FLS
+#include <import/bitops.h>
+#include <import/tree.h>
+
/* FIXME : this should be removed very quickly ! */
extern int maintain_proxies(void);
void **pool_task= NULL;
-struct task *rq = NULL; /* global run queue */
+void **pool_tree64 = NULL;
+static struct ultree *stack[LLONGBITS];
-struct rb_root wait_queue[2] = {
- RB_ROOT,
- RB_ROOT,
-};
+UL2TREE_HEAD(timer_wq);
+void *eternity_queue = NULL;
+void *run_queue = NULL;
-
-static inline void __rb_insert_task_queue(struct task *newtask)
+struct ultree *ul2tree_insert(struct ultree *root, unsigned long h, unsigned long l)
{
- struct rb_node **p = &newtask->wq->rb_node;
- struct rb_node *parent = NULL;
- struct task * task;
+ return __ul2tree_insert(root, h, l);
+}
- while (*p)
- {
- parent = *p;
- task = rb_entry(parent, struct task, rb_node);
- if (tv_cmp_ge2(&task->expire, &newtask->expire))
- p = &(*p)->rb_left;
- else
- p = &(*p)->rb_right;
- }
- rb_link_node(&newtask->rb_node, parent, p);
+void *tree_delete(void *node) {
+ return __tree_delete(node);
}
-static void rb_insert_task_queue(struct task *newtask)
+/*
+ * task_queue()
+ *
+ * Inserts a task into the wait queue at the position given by its expiration
+ * date.
+ *
+ */
+struct task *task_queue(struct task *task)
{
- __rb_insert_task_queue(newtask);
- rb_insert_color(&newtask->rb_node, newtask->wq);
+ if (unlikely(task->qlist.p != NULL)) {
+ DLIST_DEL(&task->qlist);
+ task->qlist.p = NULL;
+ }
+
+ if (unlikely(task->wq)) {
+ tree_delete(task->wq);
+ task->wq = NULL;
+ }
+
+ if (unlikely(tv_iseternity(&task->expire))) {
+ task->wq = NULL;
+ DLIST_ADD(eternity_queue, &task->qlist);
+ return task;
+ }
+
+ task->wq = ul2tree_insert(&timer_wq, task->expire.tv_sec, task->expire.tv_usec);
+ DLIST_ADD(task->wq->data, &task->qlist);
+ return task;
}
-struct task *task_queue(struct task *task)
+/*
+ * Extract all expired timers from the wait queue, and wakes up all
+ * associated tasks.
+ * Returns the time to wait for next task (next_time).
+ *
+ * FIXME: Use an alternative queue for ETERNITY tasks.
+ *
+ */
+int wake_expired_tasks()
{
- struct rb_node *node;
- struct task *next, *prev;
+ int slen;
+ struct task *task;
+ void *data;
+ int next_time;
- if (tv_iseternity(&task->expire)) {
- if (task->wq) {
- if (task->wq == &wait_queue[1])
- return task;
- else
- task_delete(task);
- }
- task->wq = &wait_queue[1];
- rb_insert_task_queue(task);
- return task;
- } else {
- if (task->wq != &wait_queue[0]) {
- if (task->wq)
- task_delete(task);
- task->wq = &wait_queue[0];
- rb_insert_task_queue(task);
- return task;
- }
+ /*
+ * Hint: tasks are *rarely* expired. So we can try to optimize
+ * by not scanning the tree at all in most cases.
+ */
+
+ if (likely(timer_wq.data != NULL)) {
+ task = LIST_ELEM(timer_wq.data, struct task *, qlist);
+ if (likely(tv_cmp_ge(&task->expire, &now) > 0))
+ return tv_remain(&now, &task->expire);
+ }
+
+ /* OK we lose. Let's scan the tree then. */
+ next_time = TIME_ETERNITY;
- // check whether task should be re insert
- node = rb_prev(&task->rb_node);
- if (node) {
- prev = rb_entry(node, struct task, rb_node);
- if (tv_cmp_ge(&prev->expire, &task->expire)) {
- task_delete(task);
- task->wq = &wait_queue[0];
- rb_insert_task_queue(task);
- return task;
- }
+ tree64_foreach(&timer_wq, data, stack, slen) {
+ task = LIST_ELEM(data, struct task *, qlist);
+
+ if (unlikely(tv_cmp_ge(&task->expire, &now) > 0)) {
+ next_time = tv_remain(&now, &task->expire);
+ break;
}
- node = rb_next(&task->rb_node);
- if (node) {
- next = rb_entry(node, struct task, rb_node);
- if (tv_cmp_ge(&task->expire, &next->expire)) {
- task_delete(task);
- task->wq = &wait_queue[0];
- rb_insert_task_queue(task);
- return task;
- }
+ /*
+ * OK, all tasks linked to this node will be unlinked, as well
+ * as the node itself, so we do not need to care about correct
+ * unlinking.
+ */
+ foreach_dlist_item(task, data, struct task *, qlist) {
+ DLIST_DEL(&task->qlist);
+ task->wq = NULL;
+ DLIST_ADD(run_queue, &task->qlist);
+ task->state = TASK_RUNNING;
}
- return task;
}
+ return next_time;
}
/*
@@ -117,35 +139,23 @@
int next_time;
int time2;
struct task *t;
- struct rb_node *node;
-
- next_time = TIME_ETERNITY;
- for (node = rb_first(&wait_queue[0]);
- node != NULL; node = rb_next(node)) {
- t = rb_entry(node, struct task, rb_node);
- if (t->state & TASK_RUNNING)
- continue;
- if (tv_iseternity(&t->expire))
- continue;
- if (tv_cmp_ms(&t->expire, &now) <= 0) {
- task_wakeup(&rq, t);
- } else {
- int temp_time = tv_remain(&now, &t->expire);
- if (temp_time)
- next_time = temp_time;
- break;
- }
- }
+ void *queue;
+ next_time = wake_expired_tasks();
/* process each task in the run queue now. Each task may be deleted
* since we only use the run queue's head. Note that any task can be
* woken up by any other task and it will be processed immediately
* after as it will be queued on the run queue's head !
*/
- while ((t = rq) != NULL) {
+
+ queue = run_queue;
+ foreach_dlist_item(t, queue, struct task *, qlist) {
int temp_time;
- task_sleep(&rq, t);
+ DLIST_DEL(&t->qlist);
+ t->qlist.p = NULL;
+
+ t->state = TASK_IDLE;
temp_time = t->process(t);
next_time = MINTIME(temp_time, next_time);
}