[MAJOR] implement autonomous inter-socket forwarding
If an analyser sets buf->to_forward to a given value, that many
data will be forwarded between the two stream interfaces attached
to a buffer without waking the task up. The same applies once all
analysers have been released. This saves a large amount of calls
to process_session() and a number of task_dequeue/queue.
diff --git a/include/common/defaults.h b/include/common/defaults.h
index 05628e1..acba5c6 100644
--- a/include/common/defaults.h
+++ b/include/common/defaults.h
@@ -2,8 +2,8 @@
include/common/defaults.h
Miscellaneous default values.
- Copyright (C) 2000-2007 Willy Tarreau - w@1wt.eu
-
+ Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu
+
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation, version 2.1
@@ -40,6 +40,16 @@
#define MAXREWRITE (BUFSIZE / 2)
#endif
+/* FORWARD_DEFAULT_SIZE
+ * Indicates how many bytes may be forwarded at once in low-level stream-socks
+ * without waking the owner task up. This should be much larger than the buffer
+ * size. A few megabytes seem appropriate.
+ */
+#ifndef FORWARD_DEFAULT_SIZE
+#define FORWARD_DEFAULT_SIZE (16*1024*1024)
+#endif
+
+
#define REQURI_LEN 1024
#define CAPTURE_LEN 64
diff --git a/include/proto/buffers.h b/include/proto/buffers.h
index 6442796..ab0de11 100644
--- a/include/proto/buffers.h
+++ b/include/proto/buffers.h
@@ -46,6 +46,7 @@
static inline void buffer_init(struct buffer *buf)
{
buf->send_max = 0;
+ buf->to_forward = 0;
buf->l = buf->total = 0;
buf->analysers = 0;
buf->cons = NULL;
@@ -92,6 +93,7 @@
static inline void buffer_flush(struct buffer *buf)
{
buf->send_max = 0;
+ buf->to_forward = 0;
buf->r = buf->lr = buf->w = buf->data;
buf->l = 0;
buf->flags |= BF_EMPTY | BF_FULL;
diff --git a/include/types/buffers.h b/include/types/buffers.h
index d8f7118..f15d33d 100644
--- a/include/types/buffers.h
+++ b/include/types/buffers.h
@@ -130,6 +130,7 @@
char *r, *w, *lr; /* read ptr, write ptr, last read */
char *rlim; /* read limit, used for header rewriting */
unsigned int send_max; /* number of bytes the sender can consume */
+ unsigned int to_forward; /* number of bytes that can send without a wake-up, >= send_max */
unsigned int analysers; /* bit field indicating what to do on the buffer */
int analyse_exp; /* expiration date for current analysers (if set) */
void (*hijacker)(struct session *, struct buffer *); /* alternative content producer */
diff --git a/src/proto_uxst.c b/src/proto_uxst.c
index 6c13679..a6eed13 100644
--- a/src/proto_uxst.c
+++ b/src/proto_uxst.c
@@ -809,6 +809,17 @@
if (!s->req->analysers && !(s->req->flags & BF_HIJACK))
s->req->send_max = s->req->l;
+ /* if noone is interested in analysing data, let's forward everything
+ * and only wake up every 1-2 MB. We still wake up when send_max is
+ * reached though.
+ */
+ if (!s->req->send_max && s->req->prod->state >= SI_ST_EST &&
+ !s->req->analysers && !(s->req->flags & BF_HIJACK)) {
+ if (s->req->to_forward < FORWARD_DEFAULT_SIZE)
+ s->req->to_forward += FORWARD_DEFAULT_SIZE;
+ s->req->send_max = s->req->l;
+ }
+
/* reflect what the L7 analysers have seen last */
rqf_last = s->req->flags;
@@ -879,9 +890,17 @@
resync = 1;
}
- /* if noone is interested in analysing data, let's forward everything */
- if (!s->rep->analysers && !(s->rep->flags & BF_HIJACK))
+ /* if noone is interested in analysing data, let's forward everything
+ * and only wake up every 1-2 MB. We still wake up when send_max is
+ * reached though.
+ */
+ if (!s->rep->send_max && s->rep->prod->state >= SI_ST_EST &&
+ !s->rep->analysers && !(s->rep->flags & BF_HIJACK)) {
+ if (s->rep->to_forward < FORWARD_DEFAULT_SIZE) {
+ s->rep->to_forward += FORWARD_DEFAULT_SIZE;
+ }
s->rep->send_max = s->rep->l;
+ }
/* reflect what the L7 analysers have seen last */
rpf_last = s->rep->flags;
diff --git a/src/session.c b/src/session.c
index a775ba5..97ce822 100644
--- a/src/session.c
+++ b/src/session.c
@@ -746,9 +746,16 @@
resync = 1;
}
- /* if noone is interested in analysing data, let's forward everything */
- if (!s->req->analysers && !(s->req->flags & BF_HIJACK))
+ /* if noone is interested in analysing data, let's forward everything
+ * and only wake up every 1-2 MB. We still wake up when send_max is
+ * reached though.
+ */
+ if (!s->req->send_max && s->req->prod->state >= SI_ST_EST &&
+ !s->req->analysers && !(s->req->flags & BF_HIJACK)) {
+ if (s->req->to_forward < FORWARD_DEFAULT_SIZE)
+ s->req->to_forward += FORWARD_DEFAULT_SIZE;
s->req->send_max = s->req->l;
+ }
/* reflect what the L7 analysers have seen last */
rqf_last = s->req->flags;
@@ -855,9 +862,17 @@
resync = 1;
}
- /* if noone is interested in analysing data, let's forward everything */
- if (!s->rep->analysers && !(s->rep->flags & BF_HIJACK))
+ /* if noone is interested in analysing data, let's forward everything
+ * and only wake up every 1-2 MB. We still wake up when send_max is
+ * reached though.
+ */
+ if (!s->rep->send_max && s->rep->prod->state >= SI_ST_EST &&
+ !s->rep->analysers && !(s->rep->flags & BF_HIJACK)) {
+ if (s->rep->to_forward < FORWARD_DEFAULT_SIZE) {
+ s->rep->to_forward += FORWARD_DEFAULT_SIZE;
+ }
s->rep->send_max = s->rep->l;
+ }
/* reflect what the L7 analysers have seen last */
rpf_last = s->rep->flags;
@@ -870,7 +885,7 @@
* FIXME: this is probably where we should produce error responses.
*/
- /* first, let's check if the request buffer needs to shutdown(write) */
+ /* first, let's check if the response buffer needs to shutdown(write) */
if (unlikely((s->rep->flags & (BF_SHUTW|BF_SHUTW_NOW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) ==
(BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)))
buffer_shutw_now(s->rep);
diff --git a/src/stream_sock.c b/src/stream_sock.c
index fdd0dbd..82e1055 100644
--- a/src/stream_sock.c
+++ b/src/stream_sock.c
@@ -116,8 +116,8 @@
cur_read += ret;
/* if noone is interested in analysing data, let's forward everything */
- if (!b->analysers)
- b->send_max += ret;
+ if (b->to_forward > b->send_max)
+ b->send_max = MIN(b->to_forward, b->l);
if (fdtab[fd].state == FD_STCONN)
fdtab[fd].state = FD_STREADY;
@@ -251,10 +251,17 @@
goto out_skip_wakeup;
out_wakeup:
/* the consumer might be waiting for data */
- if (b->cons->flags & SI_FL_WAIT_DATA && (b->flags & BF_READ_PARTIAL))
+ if (b->cons->flags & SI_FL_WAIT_DATA && (b->flags & BF_READ_PARTIAL) && !(b->flags & BF_EMPTY))
b->cons->chk_snd(b->cons);
- task_wakeup(si->owner, TASK_WOKEN_IO);
+ /* we have to wake up if there is a special event or if we don't have
+ * any more data to forward.
+ */
+ if ((b->flags & (BF_READ_NULL|BF_READ_ERROR|BF_SHUTR)) ||
+ !b->to_forward ||
+ si->state != SI_ST_EST ||
+ b->cons->state != SI_ST_EST)
+ task_wakeup(si->owner, TASK_WOKEN_IO);
out_skip_wakeup:
fdtab[fd].ev &= ~FD_POLL_IN;
@@ -379,6 +386,13 @@
b->l -= ret;
b->w += ret;
b->send_max -= ret;
+ /* we can send up to send_max, we just want to know when
+ * to_forward has been reached.
+ */
+ if ((signed)(b->to_forward - ret) >= 0)
+ b->to_forward -= ret;
+ else
+ b->to_forward = 0;
if (fdtab[fd].state == FD_STCONN)
fdtab[fd].state = FD_STREADY;
@@ -453,10 +467,17 @@
goto out_skip_wakeup;
out_wakeup:
/* the producer might be waiting for more room to store data */
- if ((b->prod->flags & SI_FL_WAIT_ROOM) && (b->flags & BF_WRITE_PARTIAL))
+ if ((b->prod->flags & SI_FL_WAIT_ROOM) && (b->flags & BF_WRITE_PARTIAL) && !(b->flags & BF_FULL))
b->prod->chk_rcv(b->prod);
- task_wakeup(si->owner, TASK_WOKEN_IO);
+ /* we have to wake up if there is a special event or if we don't have
+ * any more data to forward.
+ */
+ if ((b->flags & (BF_WRITE_NULL|BF_WRITE_ERROR|BF_SHUTW)) ||
+ !b->to_forward ||
+ si->state != SI_ST_EST ||
+ b->prod->state != SI_ST_EST)
+ task_wakeup(si->owner, TASK_WOKEN_IO);
out_skip_wakeup:
fdtab[fd].ev &= ~FD_POLL_OUT;