[MEDIUM] split stream_sock_process_data
It was a waste to constantly update the file descriptor's status
and timeouts during a flags update. So stream_sock_process_data
has been slit in two parts :
stream_sock_data_update() => computes updated flags
stream_sock_data_finish() => computes timeouts
Only the first one is called during flag updates. The second one
is only called upon completion. The number of calls to fd_set/fd_clr
has now significantly dropped.
Also, it's useless to check for errors and timeouts in the
process_session() loop, it's enough to check for them at the
beginning.
diff --git a/include/proto/stream_sock.h b/include/proto/stream_sock.h
index dc2b763..fe89d98 100644
--- a/include/proto/stream_sock.h
+++ b/include/proto/stream_sock.h
@@ -33,7 +33,9 @@
/* main event functions used to move data between sockets and buffers */
int stream_sock_read(int fd);
int stream_sock_write(int fd);
-int stream_sock_process_data(int fd);
+int stream_sock_data_check_errors(int fd);
+int stream_sock_data_update(int fd);
+int stream_sock_data_finish(int fd);
/* This either returns the sockname or the original destination address. Code
diff --git a/include/types/buffers.h b/include/types/buffers.h
index aee48ee..bc8a184 100644
--- a/include/types/buffers.h
+++ b/include/types/buffers.h
@@ -71,7 +71,7 @@
#define BF_MASK_INTERFACE_O (BF_EMPTY|BF_HIJACK|BF_MAY_FORWARD|BF_SHUTR|BF_SHUTW|BF_SHUTW_NOW)
#define BF_MASK_INTERFACE (BF_MASK_INTF_I | BF_MASK_INTF_O)
-#define BF_MASK_ANALYSER (BF_FULL|BF_READ_ERROR|BF_READ_TIMEOUT|BF_WRITE_ERROR|BF_SHUTW|BF_SHUTR|BF_READ_NULL)
+#define BF_MASK_ANALYSER (BF_FULL|BF_READ_NULL|BF_READ_ERROR|BF_READ_TIMEOUT|BF_SHUTR|BF_WRITE_ERROR)
/* Analysers (buffer->analysers).
* Those bits indicate that there are some processing to do on the buffer
diff --git a/src/proto_http.c b/src/proto_http.c
index b0ec9e5..d28506d 100644
--- a/src/proto_http.c
+++ b/src/proto_http.c
@@ -660,6 +660,33 @@
unsigned int rqf_srv, rpf_srv;
unsigned int rqf_req, rpf_rep;
+ /* check server-side errors during data phase */
+ if (s->req->cons->state == SI_ST_EST) {
+ stream_sock_data_check_errors(s->req->cons->fd);
+ /* When a server-side connection is released, we have to
+ * count it and check for pending connections on this server.
+ */
+ if (unlikely(s->req->cons->state == SI_ST_CLO)) {
+ /* Count server-side errors (but not timeouts). */
+ if (s->req->flags & BF_WRITE_ERROR) {
+ s->be->failed_resp++;
+ if (s->srv)
+ s->srv->failed_resp++;
+ }
+
+ if (s->srv) {
+ s->srv->cur_sess--;
+ sess_change_server(s, NULL);
+ if (may_dequeue_tasks(s->srv, s->be))
+ process_srv_queue(s->srv);
+ }
+ }
+ }
+
+ /* check client-side errors during data phase */
+ if (s->rep->cons->state == SI_ST_EST)
+ stream_sock_data_check_errors(s->rep->cons->fd);
+
/* force one first pass everywhere */
rqf_cli = rqf_srv = rqf_req = ~s->req->flags;
rpf_cli = rpf_srv = rpf_rep = ~s->rep->flags;
@@ -667,29 +694,31 @@
do {
resync = 0;
- if (((rqf_cli ^ s->req->flags) & BF_MASK_INTERFACE_I) ||
- ((rpf_cli ^ s->rep->flags) & BF_MASK_INTERFACE_O)) {
- resync = 1;
- if (s->rep->cons->state != SI_ST_CLO) {
- stream_sock_process_data(s->rep->cons->fd);
+ if (s->rep->cons->state != SI_ST_CLO) {
+ if (((rqf_cli ^ s->req->flags) & BF_MASK_INTERFACE_I) ||
+ ((rpf_cli ^ s->rep->flags) & BF_MASK_INTERFACE_O)) {
+ resync = 1;
+ stream_sock_data_update(s->rep->cons->fd);
+ rqf_cli = s->req->flags;
+ rpf_cli = s->rep->flags;
+
if (unlikely((s->rep->cons->state == SI_ST_CLO) &&
(global.mode & MODE_DEBUG) &&
(!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
int len;
len = sprintf(trash, "%08x:%s.clicls[%04x:%04x]\n",
- s->uniq_id, s->be->id, (unsigned short)s->rep->cons->fd, (unsigned short)s->req->cons->fd);
+ s->uniq_id, s->be->id, (unsigned short)s->rep->prod->fd, (unsigned short)s->req->cons->fd);
write(1, trash, len);
}
}
- rqf_cli = s->req->flags;
- rpf_cli = s->rep->flags;
}
- if (((rpf_srv ^ s->rep->flags) & BF_MASK_INTERFACE_I) ||
- ((rqf_srv ^ s->req->flags) & BF_MASK_INTERFACE_O)) {
- resync = 1;
- if (s->req->cons->state != SI_ST_CLO) {
+ if (s->req->cons->state != SI_ST_CLO) {
+ if (((rpf_srv ^ s->rep->flags) & BF_MASK_INTERFACE_I) ||
+ ((rqf_srv ^ s->req->flags) & BF_MASK_INTERFACE_O)) {
+ resync = 1;
+
if (s->req->cons->state < SI_ST_EST && s->req->flags & BF_MAY_FORWARD)
process_srv_conn(s);
@@ -704,14 +733,7 @@
buffer_shutw_now(s->req);
}
- stream_sock_process_data(s->req->cons->fd);
-
- /* Count server-side errors (but not timeouts). */
- if (s->req->flags & BF_WRITE_ERROR) {
- s->be->failed_resp++;
- if (s->srv)
- s->srv->failed_resp++;
- }
+ stream_sock_data_update(s->req->cons->fd);
/* When a server-side connection is released, we have to
* count it and check for pending connections on this server.
@@ -725,18 +747,18 @@
}
}
}
+ rqf_srv = s->req->flags;
+ rpf_srv = s->rep->flags;
if (unlikely((s->req->cons->state == SI_ST_CLO) &&
(global.mode & MODE_DEBUG) &&
(!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
int len;
len = sprintf(trash, "%08x:%s.srvcls[%04x:%04x]\n",
- s->uniq_id, s->be->id, (unsigned short)s->cli_fd, (unsigned short)s->req->cons->fd);
+ s->uniq_id, s->be->id, (unsigned short)s->req->prod->fd, (unsigned short)s->req->cons->fd);
write(1, trash, len);
}
}
- rqf_srv = s->req->flags;
- rpf_srv = s->rep->flags;
}
if ((rqf_req ^ s->req->flags) & BF_MASK_ANALYSER) {
@@ -752,7 +774,8 @@
if ((rpf_rep ^ s->rep->flags) & BF_MASK_ANALYSER) {
resync = 1;
/* the analysers must block it themselves */
- s->rep->flags |= BF_MAY_FORWARD;
+ if (s->req->cons->state >= SI_ST_EST)
+ s->rep->flags |= BF_MAY_FORWARD;
if (s->rep->analysers) {
process_response(s);
@@ -768,6 +791,12 @@
if ((s->fe->options & PR_O_CONTSTATS) && (s->flags & SN_BE_ASSIGNED))
session_process_counters(s);
+ if (s->rep->cons->state == SI_ST_EST)
+ stream_sock_data_finish(s->rep->cons->fd);
+
+ if (s->req->cons->state == SI_ST_EST)
+ stream_sock_data_finish(s->req->cons->fd);
+
s->req->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
s->rep->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
@@ -810,7 +839,7 @@
int len;
len = sprintf(trash, "%08x:%s.closed[%04x:%04x] (term_trace=0x%08x)\n",
s->uniq_id, s->be->id,
- (unsigned short)s->cli_fd, (unsigned short)s->req->cons->fd,
+ (unsigned short)s->req->prod->fd, (unsigned short)s->req->cons->fd,
s->term_trace);
write(1, trash, len);
}
@@ -1673,12 +1702,14 @@
struct buffer *req = t->req;
struct buffer *rep = t->rep;
- DPRINTF(stderr,"[%u] %s: c=%s set(r,w)=%d,%d exp(r,w)=%u,%u req=%08x rep=%08x analysers=%02x\n",
+ DPRINTF(stderr,"[%u] %s: session=%p b=%p, exp(r,w)=%u,%u bf=%08x bl=%d analysers=%02x\n",
now_ms, __FUNCTION__,
- cli_stnames[t->cli_state],
- t->cli_fd >= 0 && fdtab[t->cli_fd].state != FD_STCLOSE ? EV_FD_ISSET(t->cli_fd, DIR_RD) : 0,
- t->cli_fd >= 0 && fdtab[t->cli_fd].state != FD_STCLOSE ? EV_FD_ISSET(t->cli_fd, DIR_WR) : 0,
- req->rex, rep->wex, req->flags, rep->flags, req->analysers);
+ t,
+ req,
+ req->rex, req->wex,
+ req->flags,
+ req->l,
+ req->analysers);
/* The tcp-inspect analyser is always called alone */
if (req->analysers & AN_REQ_INSPECT) {
@@ -2692,10 +2723,14 @@
struct buffer *req = t->req;
struct buffer *rep = t->rep;
- DPRINTF(stderr,"[%u] %s: c=%s exp(r,w)=%u,%u req=%08x rep=%08x analysers=%02x\n",
+ DPRINTF(stderr,"[%u] %s: session=%p b=%p, exp(r,w)=%u,%u bf=%08x bl=%d analysers=%02x\n",
now_ms, __FUNCTION__,
- cli_stnames[t->cli_state],
- req->rex, rep->wex, req->flags, rep->flags, rep->analysers);
+ t,
+ rep,
+ rep->rex, rep->wex,
+ rep->flags,
+ rep->l,
+ rep->analysers);
if (rep->analysers & AN_RTR_HTTP_HDR) { /* receiving server headers */
/*
@@ -2838,7 +2873,7 @@
return 0;
}
/* write error to client, or close from server */
- else if (rep->flags & (BF_WRITE_ERROR|BF_SHUTW|BF_SHUTR|BF_READ_NULL)) {
+ else if (rep->flags & (BF_WRITE_ERROR|BF_SHUTR|BF_READ_NULL)) {
buffer_shutr_now(rep);
buffer_shutw_now(req);
//fd_delete(req->cons->fd);
@@ -3146,7 +3181,7 @@
#ifdef CONFIG_HAP_TCPSPLICE
if ((t->fe->options & t->be->options) & PR_O_TCPSPLICE) {
/* TCP splicing supported by both FE and BE */
- tcp_splice_splicefd(t->cli_fd, req->cons->fd, 0);
+ tcp_splice_splicefd(rep->cons->fd, rep->prod->fd, 0);
}
#endif
/* if the user wants to log as soon as possible, without counting
@@ -3556,7 +3591,7 @@
#ifdef CONFIG_HAP_TCPSPLICE
if ((t->fe->options & t->be->options) & PR_O_TCPSPLICE) {
/* TCP splicing supported by both FE and BE */
- tcp_splice_splicefd(t->cli_fd, req->cons->fd, 0);
+ tcp_splice_splicefd(req->prod->fd, req->cons->fd, 0);
}
#endif
}
@@ -5262,7 +5297,7 @@
/* The request is valid, the user is authenticated. Let's start sending
* data.
*/
- EV_FD_CLR(t->cli_fd, DIR_RD);
+ EV_FD_CLR(t->req->prod->fd, DIR_RD);
buffer_shutr(t->req);
buffer_shutr(t->rep);
buffer_set_rlim(t->req, BUFSIZE); /* no more rewrite needed */
@@ -5282,7 +5317,7 @@
{
int len, max;
len = sprintf(trash, "%08x:%s.%s[%04x:%04x]: ", t->uniq_id, t->be->id,
- dir, (unsigned short)t->cli_fd, (unsigned short)t->req->cons->fd);
+ dir, (unsigned short)t->req->prod->fd, (unsigned short)t->req->cons->fd);
max = end - start;
UBOUND(max, sizeof(trash) - len - 1);
len += strlcpy2(trash + len, start, max + 1);
diff --git a/src/stream_sock.c b/src/stream_sock.c
index a08bf9b..3f4be67 100644
--- a/src/stream_sock.c
+++ b/src/stream_sock.c
@@ -417,12 +417,11 @@
/*
- * Manages a stream_sock connection during its data phase. The file descriptor
- * status is checked, and the read and write timeouts are controlled. The
- * buffers are examined for special shutdown cases and finally the timeouts,
- * file descriptor and buffers' flags are updated accordingly.
+ * This function only has to be called once after a wakeup event during a data
+ * phase. It controls the file descriptor's status, as well as read and write
+ * timeouts.
*/
-int stream_sock_process_data(int fd)
+int stream_sock_data_check_errors(int fd)
{
struct buffer *ib = fdtab[fd].cb[DIR_RD].b;
struct buffer *ob = fdtab[fd].cb[DIR_WR].b;
@@ -436,7 +435,7 @@
ib->l, ob->l);
/* Read or write error on the file descriptor */
- if (fdtab[fd].state == FD_STERROR) {
+ if (unlikely(fdtab[fd].state == FD_STERROR)) {
//trace_term(t, TT_HTTP_SRV_6);
if (!ob->cons->err_type) {
//ob->cons->err_loc = t->srv;
@@ -453,30 +452,114 @@
return 0;
}
+ /* Read timeout */
+ if (unlikely(!(ib->flags & (BF_SHUTR|BF_READ_TIMEOUT)) && tick_is_expired(ib->rex, now_ms))) {
+ //trace_term(t, TT_HTTP_SRV_12);
+ ib->flags |= BF_READ_TIMEOUT;
+ if (!ob->cons->err_type) {
+ //ob->cons->err_loc = t->srv;
+ ob->cons->err_type = SI_ET_DATA_TO;
+ }
+ buffer_shutr(ib);
+ if (ob->flags & BF_SHUTW)
+ goto do_close_and_return;
+ EV_FD_CLR(fd, DIR_RD);
+ }
+
+ /* Write timeout */
+ if (unlikely(!(ob->flags & (BF_SHUTW|BF_WRITE_TIMEOUT)) && tick_is_expired(ob->wex, now_ms))) {
+ //trace_term(t, TT_HTTP_SRV_13);
+ ob->flags |= BF_WRITE_TIMEOUT;
+ if (!ob->cons->err_type) {
+ //ob->cons->err_loc = t->srv;
+ ob->cons->err_type = SI_ET_DATA_TO;
+ }
+ buffer_shutw(ob);
+ if (ib->flags & BF_SHUTR)
+ goto do_close_and_return;
+
+ EV_FD_CLR(fd, DIR_WR);
+ shutdown(fd, SHUT_WR);
+ }
+ return 0;
+}
+
+/*
+ * Manages a stream_sock connection during its data phase. The buffers are
+ * examined for various cases of shutdown, then file descriptor and buffers'
+ * flags are updated accordingly.
+ */
+int stream_sock_data_update(int fd)
+{
+ struct buffer *ib = fdtab[fd].cb[DIR_RD].b;
+ struct buffer *ob = fdtab[fd].cb[DIR_WR].b;
+
+ DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d\n",
+ now_ms, __FUNCTION__,
+ fd, fdtab[fd].owner,
+ ib, ob,
+ ib->rex, ob->wex,
+ ib->flags, ob->flags,
+ ib->l, ob->l);
+
/* Check if we need to close the read side */
if (!(ib->flags & BF_SHUTR)) {
/* Last read, forced read-shutdown, or other end closed */
if (ib->flags & (BF_READ_NULL|BF_SHUTR_NOW|BF_SHUTW)) {
//trace_term(t, TT_HTTP_SRV_10);
- do_close_read:
buffer_shutr(ib);
- if (ob->flags & BF_SHUTW)
- goto do_close_and_return;
-
+ if (ob->flags & BF_SHUTW) {
+ fd_delete(fd);
+ ob->cons->state = SI_ST_CLO;
+ return 0;
+ }
EV_FD_CLR(fd, DIR_RD);
}
- /* Read timeout */
- else if (unlikely(!(ib->flags & BF_READ_TIMEOUT) && tick_is_expired(ib->rex, now_ms))) {
- //trace_term(t, TT_HTTP_SRV_12);
- ib->flags |= BF_READ_TIMEOUT;
- if (!ob->cons->err_type) {
- //ob->cons->err_loc = t->srv;
- ob->cons->err_type = SI_ET_DATA_TO;
+ }
+
+ /* Check if we need to close the write side */
+ if (!(ob->flags & BF_SHUTW)) {
+ /* Forced write-shutdown or other end closed with empty buffer. */
+ if ((ob->flags & BF_SHUTW_NOW) ||
+ (ob->flags & (BF_EMPTY|BF_HIJACK|BF_MAY_FORWARD|BF_SHUTR)) == (BF_EMPTY|BF_MAY_FORWARD|BF_SHUTR)) {
+ //trace_term(t, TT_HTTP_SRV_11);
+ buffer_shutw(ob);
+ if (ib->flags & BF_SHUTR) {
+ fd_delete(fd);
+ ob->cons->state = SI_ST_CLO;
+ return 0;
}
- goto do_close_read;
+ EV_FD_CLR(fd, DIR_WR);
+ shutdown(fd, SHUT_WR);
}
+ }
+ return 0; /* other cases change nothing */
+}
+
+
+/*
+ * Updates a connected stream_sock file descriptor status and timeouts
+ * according to the buffers' flags. It should only be called once after the
+ * buffer flags have settled down, and before they are cleared. It doesn't
+ * harm to call it as often as desired (it just slightly hurts performance).
+ */
+int stream_sock_data_finish(int fd)
+{
+ struct buffer *ib = fdtab[fd].cb[DIR_RD].b;
+ struct buffer *ob = fdtab[fd].cb[DIR_WR].b;
+
+ DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d\n",
+ now_ms, __FUNCTION__,
+ fd, fdtab[fd].owner,
+ ib, ob,
+ ib->rex, ob->wex,
+ ib->flags, ob->flags,
+ ib->l, ob->l);
+
+ /* Check if we need to close the read side */
+ if (!(ib->flags & BF_SHUTR)) {
/* Read not closed, update FD status and timeout for reads */
- else if (ib->flags & (BF_FULL|BF_HIJACK)) {
+ if (ib->flags & (BF_FULL|BF_HIJACK)) {
/* stop reading */
EV_FD_COND_C(fd, DIR_RD);
ib->rex = TICK_ETERNITY;
@@ -494,30 +577,9 @@
/* Check if we need to close the write side */
if (!(ob->flags & BF_SHUTW)) {
- /* Forced write-shutdown or other end closed with empty buffer. */
- if ((ob->flags & BF_SHUTW_NOW) ||
- (ob->flags & (BF_EMPTY|BF_MAY_FORWARD|BF_SHUTR)) == (BF_EMPTY|BF_MAY_FORWARD|BF_SHUTR)) {
- //trace_term(t, TT_HTTP_SRV_11);
- do_close_write:
- buffer_shutw(ob);
- if (ib->flags & BF_SHUTR)
- goto do_close_and_return;
-
- EV_FD_CLR(fd, DIR_WR);
- shutdown(fd, SHUT_WR);
- }
- /* Write timeout */
- else if (unlikely(!(ob->flags & BF_WRITE_TIMEOUT) && tick_is_expired(ob->wex, now_ms))) {
- //trace_term(t, TT_HTTP_SRV_13);
- ob->flags |= BF_WRITE_TIMEOUT;
- if (!ob->cons->err_type) {
- //ob->cons->err_loc = t->srv;
- ob->cons->err_type = SI_ET_DATA_TO;
- }
- goto do_close_write;
- }
/* Write not closed, update FD status and timeout for writes */
- else if ((ob->flags & (BF_EMPTY|BF_MAY_FORWARD)) != BF_MAY_FORWARD) {
+ if ((ob->flags & BF_EMPTY) ||
+ (ob->flags & (BF_HIJACK|BF_MAY_FORWARD)) == 0) {
/* stop writing */
EV_FD_COND_C(fd, DIR_WR);
ob->wex = TICK_ETERNITY;
@@ -541,7 +603,7 @@
}
}
}
- return 0; /* other cases change nothing */
+ return 0;
}