[MAJOR] make stream sockets aware of the stream interface
As of now, a stream socket does not directly wake up the task
but it does contact the stream interface which itself knows the
task. This allows us to perform a few cleanups upon errors and
shutdowns, which reduces the number of calls to data_update()
from 8 per session to 2 per session, and make all the functions
called in the process_session() loop completely swappable.
Some improvements are required. We need to provide a shutw()
function on stream interfaces so that one side which closes
its read part on an empty buffer can propagate the close to
the remote side.
diff --git a/include/proto/stream_sock.h b/include/proto/stream_sock.h
index fe89d98..e104054 100644
--- a/include/proto/stream_sock.h
+++ b/include/proto/stream_sock.h
@@ -33,7 +33,7 @@
/* main event functions used to move data between sockets and buffers */
int stream_sock_read(int fd);
int stream_sock_write(int fd);
-int stream_sock_data_check_errors(int fd);
+int stream_sock_data_check_timeouts(int fd);
int stream_sock_data_update(int fd);
int stream_sock_data_finish(int fd);
diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h
index edfb758..2525f12 100644
--- a/include/types/stream_interface.h
+++ b/include/types/stream_interface.h
@@ -42,24 +42,26 @@
/* error types reported on the streams interface for more accurate reporting */
enum {
- SI_ET_NONE = 0, /* no error yet, leave it to zero */
- SI_ET_QUEUE_TO, /* queue timeout */
- SI_ET_QUEUE_ERR, /* queue error (eg: full) */
- SI_ET_QUEUE_ABRT, /* aborted in queue by external cause */
- SI_ET_CONN_TO, /* connection timeout */
- SI_ET_CONN_ERR, /* connection error (eg: no server available) */
- SI_ET_CONN_ABRT, /* connection aborted by external cause (eg: abort) */
- SI_ET_CONN_OTHER, /* connection aborted for other reason (eg: 500) */
- SI_ET_DATA_TO, /* timeout during data phase */
- SI_ET_DATA_ERR, /* error during data phase */
- SI_ET_DATA_ABRT, /* data phase aborted by external cause */
+ SI_ET_NONE = 0x0000, /* no error yet, leave it to zero */
+ SI_ET_QUEUE_TO = 0x0001, /* queue timeout */
+ SI_ET_QUEUE_ERR = 0x0002, /* queue error (eg: full) */
+ SI_ET_QUEUE_ABRT = 0x0004, /* aborted in queue by external cause */
+ SI_ET_CONN_TO = 0x0008, /* connection timeout */
+ SI_ET_CONN_ERR = 0x0010, /* connection error (eg: no server available) */
+ SI_ET_CONN_ABRT = 0x0020, /* connection aborted by external cause (eg: abort) */
+ SI_ET_CONN_OTHER = 0x0040, /* connection aborted for other reason (eg: 500) */
+ SI_ET_DATA_TO = 0x0080, /* timeout during data phase */
+ SI_ET_DATA_ERR = 0x0100, /* error during data phase */
+ SI_ET_DATA_ABRT = 0x0200, /* data phase aborted by external cause */
};
struct stream_interface {
unsigned int state; /* SI_ST* */
- int err_type; /* first error detected, one of SI_ET_* */
- void *err_loc; /* commonly the server, NULL when SI_ET_NONE */
+ unsigned int prev_state;/* SI_ST*, copy of previous state */
+ void *owner; /* generally a (struct task*) */
int fd; /* file descriptor for a stream driver when known */
+ unsigned int err_type; /* first error detected, one of SI_ET_* */
+ void *err_loc; /* commonly the server, NULL when SI_ET_NONE */
};
diff --git a/src/backend.c b/src/backend.c
index f51ac8f..16b2cc9 100644
--- a/src/backend.c
+++ b/src/backend.c
@@ -1805,7 +1805,7 @@
}
}
- fdtab[fd].owner = s->task;
+ fdtab[fd].owner = s->req->cons;
fdtab[fd].state = FD_STCONN; /* connection in progress */
fdtab[fd].cb[DIR_RD].f = &stream_sock_read;
fdtab[fd].cb[DIR_RD].b = s->rep;
diff --git a/src/client.c b/src/client.c
index 1f577d1..ef1ee09 100644
--- a/src/client.c
+++ b/src/client.c
@@ -173,12 +173,14 @@
s->si[0].state = SI_ST_EST;
s->si[0].err_type = SI_ET_NONE;
s->si[0].err_loc = NULL;
+ s->si[0].owner = t;
s->si[0].fd = cfd;
s->cli_fd = cfd;
s->si[1].state = SI_ST_INI;
s->si[1].err_type = SI_ET_NONE;
s->si[1].err_loc = NULL;
+ s->si[1].owner = t;
s->si[1].fd = -1; /* just to help with debugging */
s->srv = s->prev_srv = s->srv_conn = NULL;
@@ -373,7 +375,7 @@
t->expire = TICK_ETERNITY;
fd_insert(cfd);
- fdtab[cfd].owner = t;
+ fdtab[cfd].owner = &s->si[0];
fdtab[cfd].listener = l;
fdtab[cfd].state = FD_STREADY;
fdtab[cfd].cb[DIR_RD].f = l->proto->read;
diff --git a/src/proto_http.c b/src/proto_http.c
index bc8d3fb..b3db20d 100644
--- a/src/proto_http.c
+++ b/src/proto_http.c
@@ -660,40 +660,151 @@
unsigned int rqf_srv, rpf_srv;
unsigned int rqf_req, rpf_rep;
- /* check server-side errors during data phase */
- if (s->req->cons->state == SI_ST_EST) {
- stream_sock_data_check_errors(s->req->cons->fd);
- /* When a server-side connection is released, we have to
- * count it and check for pending connections on this server.
- */
- if (unlikely(s->req->cons->state == SI_ST_CLO)) {
- /* Count server-side errors (but not timeouts). */
- if (s->req->flags & BF_WRITE_ERROR) {
- s->be->failed_resp++;
- if (s->srv)
- s->srv->failed_resp++;
- }
+ /* Check timeouts only during data phase for now */
+ if (unlikely(t->state & TASK_WOKEN_TIMER)) {
+ if (s->rep->cons->state == SI_ST_EST)
+ stream_sock_data_check_timeouts(s->rep->cons->fd);
- if (s->srv) {
- s->srv->cur_sess--;
- sess_change_server(s, NULL);
- if (may_dequeue_tasks(s->srv, s->be))
- process_srv_queue(s->srv);
- }
+ if (s->req->cons->state == SI_ST_EST)
+ stream_sock_data_check_timeouts(s->req->cons->fd);
+ }
+
+ /* When a server-side connection is released, we have to
+ * count it and check for pending connections on this server.
+ */
+ if (unlikely(s->req->cons->state == SI_ST_CLO &&
+ s->req->cons->prev_state == SI_ST_EST)) {
+ /* Count server-side errors (but not timeouts). */
+ if (s->req->flags & BF_WRITE_ERROR) {
+ s->be->failed_resp++;
+ if (s->srv)
+ s->srv->failed_resp++;
+ }
+
+ if (s->srv) {
+ s->srv->cur_sess--;
+ sess_change_server(s, NULL);
+ if (may_dequeue_tasks(s->srv, s->be))
+ process_srv_queue(s->srv);
+ }
+
+ if (unlikely((s->req->cons->state == SI_ST_CLO) &&
+ (global.mode & MODE_DEBUG) &&
+ (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
+ int len;
+ len = sprintf(trash, "%08x:%s.srvcls[%04x:%04x]\n",
+ s->uniq_id, s->be->id, (unsigned short)s->req->prod->fd, (unsigned short)s->req->cons->fd);
+ write(1, trash, len);
+ }
+ }
+
+ if (unlikely(s->rep->cons->state == SI_ST_CLO &&
+ s->rep->cons->prev_state == SI_ST_EST)) {
+ if (unlikely((s->rep->cons->state == SI_ST_CLO) &&
+ (global.mode & MODE_DEBUG) &&
+ (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
+ int len;
+ len = sprintf(trash, "%08x:%s.clicls[%04x:%04x]\n",
+ s->uniq_id, s->be->id, (unsigned short)s->rep->prod->fd, (unsigned short)s->req->cons->fd);
+ write(1, trash, len);
+ }
+ }
+
+
+ /* Check if we need to close the write side. This can only happen
+ * when either SHUTR or EMPTY appears, because WRITE_ENA cannot appear
+ * from low level, and neither HIJACK nor SHUTW can disappear from low
+ * level. Later, this should move to stream_sock_{read,write}.
+ */
+ if ((s->req->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) == (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)) {
+ buffer_shutw(s->req);
+ if (s->rep->flags & BF_SHUTR) {
+ fd_delete(s->req->cons->fd);
+ s->req->cons->state = SI_ST_CLO;
+ }
+ else {
+ EV_FD_CLR(s->req->cons->fd, DIR_WR);
+ shutdown(s->req->cons->fd, SHUT_WR);
+ }
+ }
+
+ /* Check if we need to close the write side */
+ if ((s->rep->flags & (BF_SHUTW|BF_EMPTY|BF_HIJACK|BF_WRITE_ENA|BF_SHUTR)) == (BF_EMPTY|BF_WRITE_ENA|BF_SHUTR)) {
+ buffer_shutw(s->rep);
+ if (s->req->flags & BF_SHUTR) {
+ fd_delete(s->rep->cons->fd);
+ s->rep->cons->state = SI_ST_CLO;
+ }
+ else {
+ EV_FD_CLR(s->rep->cons->fd, DIR_WR);
+ shutdown(s->rep->cons->fd, SHUT_WR);
}
}
- /* check client-side errors during data phase */
- if (s->rep->cons->state == SI_ST_EST)
- stream_sock_data_check_errors(s->rep->cons->fd);
- /* force one first pass everywhere */
+
+ /* Dirty trick: force one first pass everywhere */
rqf_cli = rqf_srv = rqf_req = ~s->req->flags;
rpf_cli = rpf_srv = rpf_rep = ~s->rep->flags;
+ /* well, the ST_CONN state is already handled properly */
+ if (s->req->prod->state == SI_ST_EST) {
+ rqf_cli = s->req->flags;
+ rpf_cli = s->rep->flags;
+ }
+
+ if (s->req->cons->state == SI_ST_EST) {
+ rqf_srv = s->req->flags;
+ rpf_srv = s->rep->flags;
+ }
+
do {
+ DPRINTF(stderr,"[%u] %s: task=%p rq=%p, rp=%p, exp(r,w)=%u,%u rqf=%08x rpf=%08x rql=%d rpl=%d cs=%d ss=%d\n",
+ now_ms, __FUNCTION__,
+ t,
+ s->req, s->rep,
+ s->req->rex, s->rep->wex,
+ s->req->flags, s->rep->flags,
+ s->req->l, s->rep->l, s->rep->cons->state, s->req->cons->state);
+
resync = 0;
+ /* Analyse request */
+ if ((rqf_req ^ s->req->flags) & BF_MASK_ANALYSER) {
+ if (s->req->prod->state >= SI_ST_EST) {
+ resync = 1;
+ /* it's up to the analysers to reset write_ena */
+ buffer_write_ena(s->req);
+ if (s->req->analysers)
+ process_request(s);
+ rqf_req = s->req->flags;
+ }
+
+ }
+
+ /* Analyse response */
+ if (unlikely(s->rep->flags & BF_HIJACK)) {
+ /* In inject mode, we wake up everytime something has
+ * happened on the write side of the buffer.
+ */
+ if ((s->rep->flags & (BF_WRITE_PARTIAL|BF_WRITE_ERROR|BF_SHUTW)) &&
+ !(s->rep->flags & BF_FULL)) {
+ if (produce_content(s) != 0)
+ resync = 1; /* completed, better re-check flags */
+ }
+ }
+ else if (s->rep->prod->state >= SI_ST_EST) {
+ if ((rpf_rep ^ s->rep->flags) & BF_MASK_ANALYSER) {
+ resync = 1;
+ /* it's up to the analysers to reset write_ena */
+ buffer_write_ena(s->rep);
+ if (s->rep->analysers)
+ process_response(s);
+ rpf_rep = s->rep->flags;
+ }
+ }
+
+ /* Maybe resync client FD state */
if (s->rep->cons->state != SI_ST_CLO) {
if (((rqf_cli ^ s->req->flags) & BF_MASK_INTERFACE_I) ||
((rpf_cli ^ s->rep->flags) & BF_MASK_INTERFACE_O)) {
@@ -713,7 +824,7 @@
}
}
-
+ /* Maybe resync server FD state */
if (s->req->cons->state != SI_ST_CLO) {
if (((rpf_srv ^ s->rep->flags) & BF_MASK_INTERFACE_I) ||
((rqf_srv ^ s->req->flags) & BF_MASK_INTERFACE_O)) {
@@ -761,38 +872,6 @@
}
}
- if ((rqf_req ^ s->req->flags) & BF_MASK_ANALYSER) {
- /* the analysers must block it themselves */
- if (s->req->prod->state >= SI_ST_EST) {
- resync = 1;
- buffer_write_ena(s->req);
- if (s->req->analysers)
- process_request(s);
- }
- rqf_req = s->req->flags;
- }
-
- if (unlikely(s->rep->flags & BF_HIJACK)) {
- /* In inject mode, we wake up everytime something has
- * happened on the write side of the buffer.
- */
- if ((s->rep->flags & (BF_WRITE_PARTIAL|BF_WRITE_ERROR|BF_SHUTW)) &&
- !(s->rep->flags & BF_FULL)) {
- if (produce_content(s) != 0)
- resync = 1; /* completed, better re-check flags */
- }
- }
- else if (s->rep->prod->state >= SI_ST_EST) {
- if ((rpf_rep ^ s->rep->flags) & BF_MASK_ANALYSER) {
- /* the analysers must block it themselves */
- resync = 1;
- buffer_write_ena(s->rep);
- if (s->rep->analysers)
- process_response(s);
- rpf_rep = s->rep->flags;
- }
- }
-
} while (resync);
if (likely((s->rep->cons->state != SI_ST_CLO) ||
@@ -809,6 +888,8 @@
s->req->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
s->rep->flags &= BF_CLEAR_READ & BF_CLEAR_WRITE;
+ s->si[0].prev_state = s->si[0].state;
+ s->si[1].prev_state = s->si[1].state;
/* Trick: if a request is being waiting for the server to respond,
* and if we know the server can timeout, we don't want the timeout
@@ -1766,7 +1847,7 @@
* - if one rule returns KO, then return KO
*/
- if (req->flags & (BF_READ_NULL | BF_SHUTR) || tick_is_expired(req->analyse_exp, now_ms))
+ if (req->flags & BF_SHUTR || tick_is_expired(req->analyse_exp, now_ms))
partial = 0;
else
partial = ACL_PARTIAL;
@@ -1921,7 +2002,7 @@
}
/* 4: have we encountered a close ? */
- else if (req->flags & (BF_READ_NULL | BF_SHUTR)) {
+ else if (req->flags & BF_SHUTR) {
txn->status = 400;
client_retnclose(t, error_message(t, HTTP_ERR_400));
msg->msg_state = HTTP_MSG_ERROR;
@@ -2607,7 +2688,7 @@
* timeout. We just have to check that the client is still
* there and that the timeout has not expired.
*/
- if ((req->flags & (BF_READ_NULL|BF_READ_ERROR)) == 0 &&
+ if ((req->flags & (BF_SHUTR|BF_READ_ERROR)) == 0 &&
!tick_is_expired(req->analyse_exp, now_ms))
return 0;
@@ -2690,7 +2771,7 @@
* buffer closed).
*/
if (req->l - body >= limit || /* enough bytes! */
- req->flags & (BF_FULL | BF_READ_ERROR | BF_SHUTR | BF_READ_NULL | BF_READ_TIMEOUT) ||
+ req->flags & (BF_FULL | BF_READ_ERROR | BF_SHUTR | BF_READ_TIMEOUT) ||
tick_is_expired(req->analyse_exp, now_ms)) {
/* The situation will not evolve, so let's give up on the analysis. */
t->logs.tv_request = now; /* update the request timer to reflect full request */
@@ -2887,7 +2968,7 @@
return 0;
}
/* write error to client, or close from server */
- else if (rep->flags & (BF_WRITE_ERROR|BF_SHUTR|BF_READ_NULL)) {
+ else if (rep->flags & (BF_WRITE_ERROR|BF_SHUTR)) {
buffer_shutr_now(rep);
buffer_shutw_now(req);
//fd_delete(req->cons->fd);
diff --git a/src/stream_sock.c b/src/stream_sock.c
index 52860ea..14a8df2 100644
--- a/src/stream_sock.c
+++ b/src/stream_sock.c
@@ -42,6 +42,7 @@
int stream_sock_read(int fd) {
__label__ out_wakeup, out_shutdown_r, out_error;
struct buffer *b = fdtab[fd].cb[DIR_RD].b;
+ struct stream_interface *si = fdtab[fd].owner;
int ret, max, retval, cur_read;
int read_poll = MAX_READ_POLL_LOOPS;
@@ -239,16 +240,21 @@
if (!(b->flags & BF_READ_ACTIVITY))
goto out_skip_wakeup;
out_wakeup:
- task_wakeup(fdtab[fd].owner, TASK_WOKEN_IO);
+ task_wakeup(si->owner, TASK_WOKEN_IO);
out_skip_wakeup:
fdtab[fd].ev &= ~FD_POLL_IN;
return retval;
out_shutdown_r:
+ /* we received a shutdown */
fdtab[fd].ev &= ~FD_POLL_HUP;
b->flags |= BF_READ_NULL;
- b->rex = TICK_ETERNITY;
+ buffer_shutr(b);
+ /* Maybe we have to completely close the socket */
+ if (fdtab[fd].cb[DIR_WR].b->flags & BF_SHUTW)
+ goto do_close_and_return;
+ EV_FD_CLR(fd, DIR_RD);
goto out_wakeup;
out_error:
@@ -258,7 +264,27 @@
fdtab[fd].state = FD_STERROR;
fdtab[fd].ev &= ~FD_POLL_STICKY;
b->rex = TICK_ETERNITY;
- goto out_wakeup;
+
+ /* Read error on the file descriptor. We close the FD and set
+ * the error on both buffers.
+ * Note: right now we only support connected sockets.
+ */
+ if (si->state != SI_ST_EST)
+ goto out_wakeup;
+
+ if (!si->err_type)
+ si->err_type = SI_ET_DATA_ERR;
+
+ buffer_shutr(fdtab[fd].cb[DIR_RD].b);
+ fdtab[fd].cb[DIR_RD].b->flags |= BF_READ_ERROR;
+ buffer_shutw(fdtab[fd].cb[DIR_WR].b);
+ fdtab[fd].cb[DIR_WR].b->flags |= BF_WRITE_ERROR;
+
+ do_close_and_return:
+ fd_delete(fd);
+ si->state = SI_ST_CLO;
+ task_wakeup(si->owner, TASK_WOKEN_IO);
+ return 1;
}
@@ -271,6 +297,7 @@
int stream_sock_write(int fd) {
__label__ out_wakeup, out_error;
struct buffer *b = fdtab[fd].cb[DIR_WR].b;
+ struct stream_interface *si = fdtab[fd].owner;
int ret, max, retval;
int write_poll = MAX_WRITE_POLL_LOOPS;
@@ -411,7 +438,7 @@
if (!(b->flags & BF_WRITE_ACTIVITY))
goto out_skip_wakeup;
out_wakeup:
- task_wakeup(fdtab[fd].owner, TASK_WOKEN_IO);
+ task_wakeup(si->owner, TASK_WOKEN_IO);
out_skip_wakeup:
fdtab[fd].ev &= ~FD_POLL_OUT;
@@ -424,7 +451,25 @@
fdtab[fd].state = FD_STERROR;
fdtab[fd].ev &= ~FD_POLL_STICKY;
b->wex = TICK_ETERNITY;
- goto out_wakeup;
+ /* Read error on the file descriptor. We close the FD and set
+ * the error on both buffers.
+ * Note: right now we only support connected sockets.
+ */
+ if (si->state != SI_ST_EST)
+ goto out_wakeup;
+
+ if (!si->err_type)
+ si->err_type = SI_ET_DATA_ERR;
+
+ buffer_shutr(fdtab[fd].cb[DIR_RD].b);
+ fdtab[fd].cb[DIR_RD].b->flags |= BF_READ_ERROR;
+ buffer_shutw(fdtab[fd].cb[DIR_WR].b);
+ fdtab[fd].cb[DIR_WR].b->flags |= BF_WRITE_ERROR;
+
+ fd_delete(fd);
+ si->state = SI_ST_CLO;
+ task_wakeup(si->owner, TASK_WOKEN_IO);
+ return 1;
}
@@ -433,7 +478,7 @@
* phase. It controls the file descriptor's status, as well as read and write
* timeouts.
*/
-int stream_sock_data_check_errors(int fd)
+int stream_sock_data_check_timeouts(int fd)
{
struct buffer *ib = fdtab[fd].cb[DIR_RD].b;
struct buffer *ob = fdtab[fd].cb[DIR_WR].b;
@@ -446,24 +491,6 @@
ib->flags, ob->flags,
ib->l, ob->l);
- /* Read or write error on the file descriptor */
- if (unlikely(fdtab[fd].state == FD_STERROR)) {
- //trace_term(t, TT_HTTP_SRV_6);
- if (!ob->cons->err_type) {
- //ob->cons->err_loc = t->srv;
- ob->cons->err_type = SI_ET_DATA_ERR;
- }
- buffer_shutw(ob);
- ob->flags |= BF_WRITE_ERROR;
- buffer_shutr(ib);
- ib->flags |= BF_READ_ERROR;
-
- do_close_and_return:
- fd_delete(fd);
- ob->cons->state = SI_ST_CLO;
- return 0;
- }
-
/* Read timeout */
if (unlikely(!(ib->flags & (BF_SHUTR|BF_READ_TIMEOUT)) && tick_is_expired(ib->rex, now_ms))) {
//trace_term(t, TT_HTTP_SRV_12);
@@ -473,8 +500,13 @@
ob->cons->err_type = SI_ET_DATA_TO;
}
buffer_shutr(ib);
- if (ob->flags & BF_SHUTW)
- goto do_close_and_return;
+ if (ob->flags & BF_SHUTW) {
+ do_close_and_return:
+ fd_delete(fd);
+ ob->cons->state = SI_ST_CLO;
+ return 0;
+ }
+
EV_FD_CLR(fd, DIR_RD);
}
@@ -506,18 +538,18 @@
struct buffer *ib = fdtab[fd].cb[DIR_RD].b;
struct buffer *ob = fdtab[fd].cb[DIR_WR].b;
- DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d\n",
+ DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n",
now_ms, __FUNCTION__,
fd, fdtab[fd].owner,
ib, ob,
ib->rex, ob->wex,
ib->flags, ob->flags,
- ib->l, ob->l);
+ ib->l, ob->l, ob->cons->state);
/* Check if we need to close the read side */
if (!(ib->flags & BF_SHUTR)) {
/* Last read, forced read-shutdown, or other end closed */
- if (ib->flags & (BF_READ_NULL|BF_SHUTR_NOW|BF_SHUTW)) {
+ if (ib->flags & (BF_SHUTR_NOW|BF_SHUTW)) {
//trace_term(t, TT_HTTP_SRV_10);
buffer_shutr(ib);
if (ob->flags & BF_SHUTW) {
@@ -560,13 +592,13 @@
struct buffer *ib = fdtab[fd].cb[DIR_RD].b;
struct buffer *ob = fdtab[fd].cb[DIR_WR].b;
- DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d\n",
+ DPRINTF(stderr,"[%u] %s: fd=%d owner=%p ib=%p, ob=%p, exp(r,w)=%u,%u ibf=%08x obf=%08x ibl=%d obl=%d si=%d\n",
now_ms, __FUNCTION__,
fd, fdtab[fd].owner,
ib, ob,
ib->rex, ob->wex,
ib->flags, ob->flags,
- ib->l, ob->l);
+ ib->l, ob->l, ob->cons->state);
/* Check if we need to close the read side */
if (!(ib->flags & BF_SHUTR)) {