MEDIUM: mux-h1: Rely on the H1C to deal with shutdown for reads
read0 is now handled with an H1 connection flag (H1C_F_EOS). The corresponding
flag (H1S_F_REOS) was removed from the H1 stream and we now fully rely on the
SE descriptor at the stream level.
Concretely, it means we rely on the H1 connection flags instead of the
connection ones. H1C_F_EOS is only set in h1_recv() or h1_rcv_pipe() after a
read if a read0 was detected.
diff --git a/src/mux_h1.c b/src/mux_h1.c
index 3f7e102..5f3f121 100644
--- a/src/mux_h1.c
+++ b/src/mux_h1.c
@@ -421,39 +421,35 @@
/*****************************************************/
/*
* Indicates whether or not we may receive data. The rules are the following :
- * - if an error or a shutdown for reads was detected on the connection we
+ * - if an error or a shutdown for reads was detected on the H1 connection we
* must not attempt to receive
* - if we are waiting for the connection establishment, we must not attempt
* to receive
- * - if an error was detected on the stream we must not attempt to receive
* - if reads are explicitly disabled, we must not attempt to receive
* - if the input buffer failed to be allocated or is full , we must not try
* to receive
- * - if the mux is not blocked on an input condition, we may attempt to receive
- * - otherwise must may not attempt to receive
+ * - if the mux is blocked on an input condition, we must not attempt to
+ * receive
+ * - otherwise we may attempt to receive
*/
static inline int h1_recv_allowed(const struct h1c *h1c)
{
- if (h1c->flags & H1C_F_ERROR) {
- TRACE_DEVEL("recv not allowed because of error on h1c", H1_EV_H1C_RECV|H1_EV_H1C_BLK, h1c->conn);
+ if (h1c->flags & (H1C_F_EOS|H1C_F_ERROR)) {
+ TRACE_DEVEL("recv not allowed because of (eos|error) on h1c", H1_EV_H1C_RECV|H1_EV_H1C_BLK, h1c->conn);
return 0;
}
- if (h1c->conn->flags & (CO_FL_ERROR|CO_FL_SOCK_RD_SH|CO_FL_WAIT_L4_CONN|CO_FL_WAIT_L6_CONN)) {
- TRACE_DEVEL("recv not allowed because of (error|read0|waitl4|waitl6) on connection", H1_EV_H1C_RECV|H1_EV_H1C_BLK, h1c->conn);
+ if (h1c->conn->flags & (CO_FL_WAIT_L4_CONN|CO_FL_WAIT_L6_CONN)) {
+ TRACE_DEVEL("recv not allowed because of (waitl4|waitl6) on connection", H1_EV_H1C_RECV|H1_EV_H1C_BLK, h1c->conn);
return 0;
}
- if (h1c->h1s && (h1c->h1s->flags & H1S_F_ERROR_MASK)) {
- TRACE_DEVEL("recv not allowed because of error on h1s", H1_EV_H1C_RECV|H1_EV_H1C_BLK, h1c->conn);
+ if ((h1c->flags & (H1C_F_IN_ALLOC|H1C_F_IN_FULL|H1C_F_IN_SALLOC))) {
+ TRACE_DEVEL("recv not allowed because input is blocked", H1_EV_H1C_RECV|H1_EV_H1C_BLK, h1c->conn);
return 0;
}
- if (!(h1c->flags & (H1C_F_IN_ALLOC|H1C_F_IN_FULL|H1C_F_IN_SALLOC)))
- return 1;
-
- TRACE_DEVEL("recv not allowed because input is blocked", H1_EV_H1C_RECV|H1_EV_H1C_BLK, h1c->conn);
- return 0;
+ return 1;
}
/*
@@ -861,9 +857,8 @@
TRACE_ERROR("h1s on error, set error on h1c", H1_EV_H1S_END|H1_EV_H1C_ERR, h1c->conn, h1s);
}
- if (!(h1c->flags & H1C_F_ERROR) && /* No error */
+ if (!(h1c->flags & (H1C_F_EOS|H1C_F_ERR_PENDING|H1C_F_ERROR)) && /* No error/read0 */
h1_is_alive(h1c) && /* still alive */
- !(h1c->conn->flags & (CO_FL_ERROR|CO_FL_SOCK_RD_SH|CO_FL_SOCK_WR_SH)) && /* No error/shutdown on conn */
(h1s->flags & H1S_F_WANT_KAL) && /* K/A possible */
h1s->req.state == H1_MSG_DONE && h1s->res.state == H1_MSG_DONE) { /* req/res in DONE state */
h1c->state = H1_CS_IDLE;
@@ -1899,12 +1894,14 @@
}
else {
se_fl_clr(h1s->sd, SE_FL_RCV_MORE | SE_FL_WANT_ROOM);
- if (h1s->flags & H1S_F_REOS) {
+ if (h1c->flags & H1C_F_EOS) {
se_fl_set(h1s->sd, SE_FL_EOS);
+ TRACE_STATE("report EOS to SE", H1_EV_RX_DATA, h1c->conn, h1s);
if (h1m->state >= H1_MSG_DONE || (h1m->state > H1_MSG_LAST_LF && !(h1m->flags & H1_MF_XFER_LEN))) {
/* DONE or TUNNEL or SHUTR without XFER_LEN, set
* EOI on the stream connector */
se_fl_set(h1s->sd, SE_FL_EOI);
+ TRACE_STATE("report EOI to SE", H1_EV_RX_DATA, h1c->conn, h1s);
}
else if (h1m->state < H1_MSG_DONE) {
se_fl_set(h1s->sd, SE_FL_ERROR);
@@ -2474,7 +2471,6 @@
* to the client. Switch the response to tunnel mode.
*/
h1_set_tunnel_mode(h1s);
- TRACE_STATE("switch H1 response in tunnel mode", H1_EV_TX_DATA|H1_EV_TX_HDRS, h1c->conn, h1s);
}
if (h1s->flags & H1S_F_RX_BLK) {
@@ -2492,9 +2488,9 @@
/* Unexpected error during output processing */
chn_htx->flags |= HTX_FL_PROCESSING_ERROR;
h1s->flags |= H1S_F_PROCESSING_ERROR;
- h1c->flags |= H1C_F_ERROR;
- TRACE_ERROR("processing output error, set error on h1c/h1s",
- H1_EV_TX_DATA|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
+ se_fl_set(h1s->sd, SE_FL_ERROR);
+ TRACE_ERROR("processing output error, set error on h1s",
+ H1_EV_TX_DATA|H1_EV_STRM_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
goto end;
}
@@ -2531,11 +2527,14 @@
* in the output buffer.
*/
if (h1s->req.state == H1_MSG_DONE && h1s->res.state == H1_MSG_DONE) {
+ se_fl_set(h1s->sd, SE_FL_EOI);
if (!htx_is_empty(chn_htx)) {
- h1c->flags |= H1C_F_ERROR;
- TRACE_ERROR("txn done but data waiting to be sent, set error on h1c", H1_EV_H1C_ERR, h1c->conn, h1s);
+ chn_htx->flags |= HTX_FL_PROCESSING_ERROR;
+ h1s->flags |= H1S_F_PROCESSING_ERROR;
+ se_fl_set(h1s->sd, SE_FL_ERROR);
+ TRACE_ERROR("txn done but data waiting to be sent, set error on h1s",
+ H1_EV_TX_DATA|H1_EV_STRM_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
}
- se_fl_set(h1s->sd, SE_FL_EOI);
}
TRACE_LEAVE(H1_EV_TX_DATA, h1c->conn, h1s, chn_htx, (size_t[]){total});
@@ -2815,13 +2814,23 @@
ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, &h1c->ibuf, max, flags);
HA_ATOMIC_ADD(&h1c->px_counters->bytes_in, ret);
}
+
+ if (conn_xprt_read0_pending(conn)) {
+ TRACE_DEVEL("read0 on connection", H1_EV_H1C_RECV, h1c->conn);
+ h1c->flags |= H1C_F_EOS;
+ }
+ if (h1c->conn->flags & CO_FL_ERROR) {
+ TRACE_DEVEL("connection error", H1_EV_H1C_RECV, h1c->conn);
+ h1c->flags |= H1C_F_ERROR;
+ }
+
if (max && !ret && h1_recv_allowed(h1c)) {
TRACE_STATE("failed to receive data, subscribing", H1_EV_H1C_RECV, h1c->conn);
conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
}
else {
+ TRACE_DATA("data received or pending or connection error", H1_EV_H1C_RECV, h1c->conn, 0, 0, (size_t[]){ret});
h1_wake_stream_for_recv(h1c->h1s);
- TRACE_DATA("data received", H1_EV_H1C_RECV, h1c->conn, 0, 0, (size_t[]){ret});
}
if (!b_data(&h1c->ibuf))
@@ -2832,7 +2841,7 @@
}
TRACE_LEAVE(H1_EV_H1C_RECV, h1c->conn);
- return !!ret || (conn->flags & CO_FL_ERROR) || conn_xprt_read0_pending(conn);
+ return !!ret || (h1c->flags & (H1C_F_EOS|H1C_F_ERROR));
}
@@ -2848,9 +2857,11 @@
TRACE_ENTER(H1_EV_H1C_SEND, h1c->conn);
- if (conn->flags & CO_FL_ERROR) {
- TRACE_DEVEL("leaving on connection error", H1_EV_H1C_SEND, h1c->conn);
+ if (h1c->flags & (H1C_F_ERROR|H1C_F_ERR_PENDING)) {
+ TRACE_DEVEL("leaving on H1C error|err_pending", H1_EV_H1C_SEND, h1c->conn);
b_reset(&h1c->obuf);
+ if (h1c->flags & H1C_F_EOS)
+ h1c->flags |= H1C_F_ERROR;
return 1;
}
@@ -2874,9 +2885,12 @@
sent = 1;
}
- if (conn->flags & (CO_FL_ERROR|CO_FL_SOCK_WR_SH)) {
- TRACE_DEVEL("connection error or output closed", H1_EV_H1C_SEND, h1c->conn);
- /* error or output closed, nothing to send, clear the buffer to release it */
+ if (conn->flags & CO_FL_ERROR) {
+ /* connection error, nothing to send, clear the buffer to release it */
+ TRACE_DEVEL("connection error", H1_EV_H1C_SEND, h1c->conn);
+ h1c->flags |= H1C_F_ERR_PENDING;
+ if (h1c->flags & H1C_F_EOS)
+ h1c->flags |= H1C_F_ERROR;
b_reset(&h1c->obuf);
}
@@ -2899,7 +2913,7 @@
}
TRACE_LEAVE(H1_EV_H1C_SEND, h1c->conn);
- return sent || (h1c->state == H1_CS_CLOSED);
+ return sent || (h1c->flags & (H1C_F_ERR_PENDING|H1C_F_ERROR)) || (h1c->state == H1_CS_CLOSED);
}
/* callback called on any event by the connection handler.
@@ -2915,7 +2929,7 @@
/* Try to parse now the first block of a request, creating the H1 stream if necessary */
if (b_data(&h1c->ibuf) && /* Input data to be processed */
(h1c->state < H1_CS_RUNNING) && /* IDLE, EMBRYONIC or UPGRADING */
- !(h1c->flags & (H1C_F_IN_SALLOC|H1C_F_ERROR))) { /* No allocation failure on the stream rxbuf and no ERROR on the H1C */
+ !(h1c->flags & (H1C_F_IN_SALLOC|H1C_F_ABRT_PENDING))) { /* No allocation failure on the stream rxbuf and no ERROR on the H1C */
struct h1s *h1s = h1c->h1s;
struct buffer *buf;
size_t count;
@@ -2932,11 +2946,8 @@
if (b_isteq(&h1c->ibuf, 0, b_data(&h1c->ibuf), ist(H2_CONN_PREFACE)) > 0) {
h1c->flags |= H1C_F_UPG_H2C;
if (h1c->state == H1_CS_UPGRADING) {
- /* Force the REOS here to be sure to release the SC.
- Here ATTACHED implies !READY, and h1s defined
- */
BUG_ON(!h1s);
- h1s->flags |= H1S_F_REOS;
+ se_fl_set(h1s->sd, SE_FL_EOS); /* Set EOS here to release the SC */
}
TRACE_STATE("release h1c to perform H2 upgrade ", H1_EV_RX_DATA|H1_EV_H1C_WAKE);
goto release;
@@ -2948,7 +2959,9 @@
h1s = h1c_frt_stream_new(h1c, NULL, h1c->conn->owner);
if (!h1s) {
b_reset(&h1c->ibuf);
- h1c->flags |= H1C_F_ERROR;
+ h1_handle_internal_err(h1c);
+ h1c->flags &= ~H1C_F_WAIT_NEXT_REQ;
+ TRACE_ERROR("alloc error", H1_EV_H1C_WAKE|H1_EV_H1C_ERR);
goto no_parsing;
}
}
@@ -2972,17 +2985,17 @@
no_parsing:
if (h1s->flags & H1S_F_INTERNAL_ERROR) {
h1_handle_internal_err(h1c);
- h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ERROR;
+ h1c->flags &= ~H1C_F_WAIT_NEXT_REQ;
TRACE_ERROR("internal error detected", H1_EV_H1C_WAKE|H1_EV_H1C_ERR);
}
else if (h1s->flags & H1S_F_NOT_IMPL_ERROR) {
h1_handle_not_impl_err(h1c);
- h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ERROR;
+ h1c->flags &= ~H1C_F_WAIT_NEXT_REQ;
TRACE_ERROR("not-implemented error detected", H1_EV_H1C_WAKE|H1_EV_H1C_ERR);
}
else if (h1s->flags & H1S_F_PARSING_ERROR || se_fl_test(h1s->sd, SE_FL_ERROR)) {
h1_handle_parsing_error(h1c);
- h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ERROR;
+ h1c->flags &= ~H1C_F_WAIT_NEXT_REQ;
TRACE_ERROR("parsing error detected", H1_EV_H1C_WAKE|H1_EV_H1C_ERR);
}
else if (h1c->state < H1_CS_RUNNING) {
@@ -2990,24 +3003,23 @@
h1c->conn->xprt->subscribe(h1c->conn, h1c->conn->xprt_ctx, SUB_RETRY_RECV, &h1c->wait_event);
}
}
+
h1_send(h1c);
/* H1 connection must be released ASAP if:
- * - an error occurred on the connection or the H1C or
+ * - an error occurred on the H1C or
* - a read0 was received or
* - a silent shutdown was emitted and all outgoing data sent
*/
- if ((conn->flags & CO_FL_ERROR) ||
- conn_xprt_read0_pending(conn) ||
- (h1c->flags & H1C_F_ERROR) ||
+ if ((h1c->flags & (H1C_F_EOS|H1C_F_ERROR|H1C_F_ABRT_PENDING)) ||
(h1c->state >= H1_CS_CLOSING && (h1c->flags & H1C_F_SILENT_SHUT) && !b_data(&h1c->obuf))) {
if (h1c->state != H1_CS_RUNNING) {
- /* No stream connector or not ready */
+ /* No stream connector or upgrading */
if (h1c->state < H1_CS_RUNNING && !(h1c->flags & (H1C_F_IS_BACK|H1C_F_ERROR))) {
- /* shutdown for reads and error on the frontend connection: Send an error */
+ /* shutdown for reads and no error on the frontend connection: Send an error */
if (h1_handle_parsing_error(h1c))
h1_send(h1c);
- h1c->flags = (h1c->flags & ~H1C_F_WAIT_NEXT_REQ) | H1C_F_ERROR;
+ h1c->flags &= ~H1C_F_WAIT_NEXT_REQ;
}
else if (h1c->flags & H1C_F_ABRT_PENDING) {
/* Handle pending error, if any (only possible on frontend connection) */
@@ -3031,17 +3043,14 @@
struct h1s *h1s = h1c->h1s;
/* Here there is still a H1 stream with a stream connector.
- * Report the connection state at the stream level
+ * Report an error at the stream level and wake up the stream
*/
BUG_ON(!h1s);
- if (conn_xprt_read0_pending(conn)) {
- h1s->flags |= H1S_F_REOS;
- TRACE_STATE("read0 on connection", H1_EV_H1C_RECV, conn, h1s);
+ if (h1c->flags & (H1C_F_ERR_PENDING|H1C_F_ERROR)) {
+ se_fl_set_error(h1s->sd);
+ TRACE_STATE("report (ERR_PENDING|ERROR) to SE", H1_EV_H1C_RECV, conn, h1s);
}
- if ((h1c->flags & H1C_F_ERROR) || ((conn->flags & CO_FL_ERROR) &&
- (se_fl_test(h1s->sd, SE_FL_EOI | SE_FL_EOS) || !b_data(&h1c->ibuf))))
- se_fl_set(h1s->sd, SE_FL_ERROR);
TRACE_POINT(H1_EV_STRM_WAKE, h1c->conn, h1s);
h1_alert(h1s);
}
@@ -3098,10 +3107,14 @@
* the attached SC first */
BUG_ON(!h1s);
- if (conn_xprt_read0_pending(conn) || (h1s->flags & H1S_F_REOS))
+ if (h1c->flags & H1C_F_EOS) {
se_fl_set(h1s->sd, SE_FL_EOS);
- if ((h1c->flags & H1C_F_ERROR) || (conn->flags & CO_FL_ERROR))
- se_fl_set(h1s->sd, SE_FL_ERROR);
+ TRACE_STATE("report EOS to SE", H1_EV_H1C_RECV, conn, h1s);
+ }
+ if (h1c->flags & (H1C_F_ERR_PENDING|H1C_F_ERROR)) {
+ se_fl_set_error(h1s->sd);
+ TRACE_STATE("report (ERR_PENDING|ERROR) to SE", H1_EV_H1C_RECV, conn, h1s);
+ }
h1_alert(h1s);
TRACE_DEVEL("waiting to release the SC before releasing the connection", H1_EV_H1C_WAKE);
}
@@ -3302,7 +3315,7 @@
h1c->flags &= ~H1C_F_SILENT_SHUT;
TRACE_ENTER(H1_EV_STRM_NEW, conn);
- if (h1c->flags & H1C_F_ERROR) {
+ if (h1c->flags & (H1C_F_ERR_PENDING|H1C_F_ERROR)) {
TRACE_ERROR("h1c on error", H1_EV_STRM_NEW|H1_EV_STRM_END|H1_EV_STRM_ERR, conn);
goto err;
}
@@ -3437,7 +3450,6 @@
/* We don't want to close right now unless the connection is in error or shut down for writes */
if ((h1c->flags & H1C_F_ERROR) ||
(h1c->state == H1_CS_CLOSED) ||
- (h1c->conn->flags & (CO_FL_ERROR|CO_FL_SOCK_WR_SH)) ||
(h1c->state == H1_CS_CLOSING && !b_data(&h1c->obuf)) ||
!h1c->conn->owner) {
TRACE_DEVEL("killing dead connection", H1_EV_STRM_END, h1c->conn);
@@ -3492,12 +3504,12 @@
TRACE_STATE("stream wants to kill the connection", H1_EV_STRM_SHUT, h1c->conn, h1s);
goto do_shutw;
}
- if (h1c->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH)) {
- TRACE_STATE("shutdown on connection (error|rd_sh|wr_sh)", H1_EV_STRM_SHUT, h1c->conn, h1s);
+ if (h1c->state == H1_CS_CLOSING || (h1c->flags & (H1C_F_EOS|H1C_F_ERR_PENDING|H1C_F_ERROR))) {
+ TRACE_STATE("shutdown on connection (EOS || CLOSING || ERROR)", H1_EV_STRM_SHUT, h1c->conn, h1s);
goto do_shutw;
}
- if (h1c->state == H1_CS_UPGRADING && !(h1c->flags & H1C_F_ERROR)) {
+ if (h1c->state == H1_CS_UPGRADING) {
TRACE_STATE("keep connection alive (UPGRADING)", H1_EV_STRM_SHUT, h1c->conn, h1s);
goto end;
}
@@ -3670,8 +3682,8 @@
return 0;
}
- if (h1c->flags & H1C_F_ERROR) {
- se_fl_set(h1s->sd, SE_FL_ERROR);
+ if (h1c->flags & (H1C_F_ERR_PENDING|H1C_F_ERROR)) {
+ se_fl_set_error(h1s->sd);
TRACE_ERROR("H1C on error, leaving in error", H1_EV_STRM_SEND|H1_EV_H1C_ERR|H1_EV_H1S_ERR|H1_EV_STRM_ERR, h1c->conn, h1s);
return 0;
}
@@ -3707,9 +3719,10 @@
break;
}
- if ((h1c->flags & H1C_F_ERROR) || ((h1c->conn->flags & CO_FL_ERROR) &&
- (se_fl_test(h1s->sd, SE_FL_EOI | SE_FL_EOS) || !b_data(&h1c->ibuf)))) {
- se_fl_set(h1s->sd, SE_FL_ERROR);
+ if (h1c->flags & (H1C_F_ERR_PENDING|H1C_F_ERROR)) {
+ // FIXME: following test was removed :
+ // ((h1c->conn->flags & CO_FL_ERROR) && (se_fl_test(h1s->sd, SE_FL_EOI | SE_FL_EOS) || !b_data(&h1c->ibuf)))) {
+ se_fl_set_error(h1s->sd);
TRACE_ERROR("reporting error to the app-layer stream", H1_EV_STRM_SEND|H1_EV_H1S_ERR|H1_EV_STRM_ERR, h1c->conn, h1s);
}
@@ -3753,7 +3766,6 @@
if (h1m->state == H1_MSG_DATA && (h1m->flags & H1_MF_CLEN)) {
if (ret > h1m->curr_len) {
h1s->flags |= H1S_F_PARSING_ERROR;
- h1c->flags |= H1C_F_ERROR;
se_fl_set(h1s->sd, SE_FL_ERROR);
TRACE_ERROR("too much payload, more than announced",
H1_EV_RX_DATA|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
@@ -3772,8 +3784,8 @@
end:
if (conn_xprt_read0_pending(h1c->conn)) {
- h1s->flags |= H1S_F_REOS;
- h1c->flags &= ~H1C_F_WANT_SPLICE;
+ se_fl_set(h1s->sd, SE_FL_EOS);
+ h1c->flags = (h1c->flags & ~H1C_F_WANT_SPLICE) | H1C_F_EOS;
TRACE_STATE("Allow xprt rcv_buf on read0", H1_EV_STRM_RECV, h1c->conn, h1s);
}
if (h1c->conn->flags & CO_FL_ERROR) {
@@ -3816,7 +3828,6 @@
if (h1m->state == H1_MSG_DATA && (h1m->flags & H1_MF_CLEN)) {
if (ret > h1m->curr_len) {
h1s->flags |= H1S_F_PROCESSING_ERROR;
- h1c->flags |= H1C_F_ERROR;
se_fl_set(h1s->sd, SE_FL_ERROR);
TRACE_ERROR("too much payload, more than announced",
H1_EV_TX_DATA|H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
@@ -3833,8 +3844,10 @@
end:
if (h1c->conn->flags & CO_FL_ERROR) {
- se_fl_set(h1s->sd, SE_FL_ERROR);
- h1c->flags = (h1c->flags & ~H1C_F_WANT_SPLICE) | H1C_F_ERROR;
+ h1c->flags = (h1c->flags & ~H1C_F_WANT_SPLICE) | H1C_F_ERR_PENDING;
+ if (h1c->flags & H1C_F_EOS)
+ h1c->flags |= H1C_F_ERROR;
+ se_fl_set_error(h1s->sd);
TRACE_DEVEL("connection error", H1_EV_STRM_ERR|H1_EV_H1C_ERR|H1_EV_H1S_ERR, h1c->conn, h1s);
}