MEDIUM: mux-h1: Revamp the way subscriptions are handled.
Don't always wake the tasklets subscribed to recv or send events as soon as
we had any I/O event, and don't call the wake() method if there were no
subscription, instead, wake the recv tasklet if we received data in h2_recv(),
and wake the send tasklet if we were able to send data in h2_send(), and the
buffer is not full anymore.
Only call the data_cb->wake() method if we get an error/a read 0, just in
case the stream was not subscribed to receive events.
diff --git a/src/mux_h1.c b/src/mux_h1.c
index ff54adc..6b00320 100644
--- a/src/mux_h1.c
+++ b/src/mux_h1.c
@@ -1463,14 +1463,16 @@
{
struct connection *conn = h1c->conn;
struct h1s *h1s = h1c->h1s;
- size_t ret, max;
+ size_t ret = 0, max;
int rcvd = 0;
if (h1c->wait_event.wait_reason & SUB_CAN_RECV)
return 0;
- if (!h1_recv_allowed(h1c))
+ if (!h1_recv_allowed(h1c)) {
+ rcvd = 1;
goto end;
+ }
if (h1s && (h1s->flags & (H1S_F_BUF_FLUSH|H1S_F_SPLICED_DATA))) {
rcvd = 1;
@@ -1482,7 +1484,6 @@
goto end;
}
- ret = 0;
max = b_room(&h1c->ibuf);
if (max) {
h1c->flags &= ~H1C_F_IN_FULL;
@@ -1503,6 +1504,13 @@
rcvd = 1;
end:
+ if ((ret > 0 || (conn->flags & CO_FL_ERROR) ||
+ conn_xprt_read0_pending(conn)) && h1s && h1s->recv_wait) {
+ h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
+ tasklet_wakeup(h1s->recv_wait->task);
+ h1s->recv_wait = NULL;
+
+ }
if (!b_data(&h1c->ibuf))
h1_release_buf(h1c, &h1c->ibuf);
else if (b_full(&h1c->ibuf))
@@ -1544,6 +1552,13 @@
}
end:
+ if (!(h1c->flags & H1C_F_OUT_FULL) && h1c->h1s && h1c->h1s->send_wait) {
+ struct h1s *h1s = h1c->h1s;
+
+ h1s->send_wait->wait_reason &= ~SUB_CAN_SEND;
+ tasklet_wakeup(h1s->send_wait->task);
+ h1s->send_wait = NULL;
+ }
/* We're done, no more to send */
if (!b_data(&h1c->obuf)) {
h1_release_buf(h1c, &h1c->obuf);
@@ -1557,38 +1572,6 @@
return sent;
}
-
-static void h1_wake_stream(struct h1c *h1c)
-{
- struct connection *conn = h1c->conn;
- struct h1s *h1s = h1c->h1s;
- uint32_t flags = 0;
- int dont_wake = 0;
-
- if (!h1s || !h1s->cs)
- return;
-
- if ((h1c->flags & H1C_F_CS_ERROR) || (conn->flags & CO_FL_ERROR))
- flags |= CS_FL_ERROR;
- if (conn_xprt_read0_pending(conn))
- flags |= CS_FL_REOS;
-
- h1s->cs->flags |= flags;
- if (h1s->recv_wait) {
- h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
- tasklet_wakeup(h1s->recv_wait->task);
- h1s->recv_wait = NULL;
- dont_wake = 1;
- }
- if (h1s->send_wait) {
- h1s->send_wait->wait_reason &= ~SUB_CAN_SEND;
- tasklet_wakeup(h1s->send_wait->task);
- h1s->send_wait = NULL;
- dont_wake = 1;
- }
- if (!dont_wake && h1s->cs->data_cb->wake)
- h1s->cs->data_cb->wake(h1s->cs);
-}
/* callback called on any event by the connection handler.
* It applies changes and returns zero, or < 0 if it wants immediate
@@ -1623,7 +1606,18 @@
if (b_data(&h1c->ibuf) && h1s->csinfo.t_idle == -1)
h1s->csinfo.t_idle = tv_ms_elapsed(&h1s->csinfo.tv_create, &now) - h1s->csinfo.t_handshake;
- h1_wake_stream(h1c);
+ if (!b_data(&h1c->ibuf) && h1s && h1s->cs && h1s->cs->data_cb->wake &&
+ (conn_xprt_read0_pending(conn) || h1c->flags & H1C_F_CS_ERROR ||
+ conn->flags & CO_FL_ERROR)) {
+ int flags = 0;
+
+ if (h1c->flags & H1C_F_CS_ERROR || conn->flags & CO_FL_ERROR)
+ flags |= CS_FL_ERROR;
+ if (conn_xprt_read0_pending(conn))
+ flags |= CS_FL_REOS;
+ h1s->cs->flags |= flags;
+ h1s->cs->data_cb->wake(h1s->cs);
+ }
end:
return 0;
@@ -1650,9 +1644,17 @@
static int h1_wake(struct connection *conn)
{
struct h1c *h1c = conn->mux_ctx;
+ int ret;
h1_send(h1c);
- return h1_process(h1c);
+ ret = h1_process(h1c);
+ if (ret == 0) {
+ struct h1s *h1s = h1c->h1s;
+
+ if (h1s && h1s->cs && h1s->cs->data_cb->wake)
+ ret = h1s->cs->data_cb->wake(h1s->cs);
+ }
+ return ret;
}
/*******************************************/
@@ -1921,10 +1923,6 @@
*/
if (!b_data(buf))
total = count;
- else if (total != count) {
- if (!(h1c->wait_event.wait_reason & SUB_CAN_SEND))
- cs->conn->xprt->subscribe(cs->conn, SUB_CAN_SEND, &h1c->wait_event);
- }
return total;
}