MEDIUM: connections/mux: Add a recv and a send+recv wait list.
For struct connection, struct conn_stream, and for the h2 mux, add 2 new
lists, one that handles waiters for recv, and one that handles waiters for
recv and send. That way we can ask to subscribe for either recv or send.
diff --git a/include/proto/connection.h b/include/proto/connection.h
index ea6b17b..c7f2561 100644
--- a/include/proto/connection.h
+++ b/include/proto/connection.h
@@ -602,6 +602,8 @@
cs->flags = CS_FL_NONE;
LIST_INIT(&cs->wait_list.list);
LIST_INIT(&cs->send_wait_list);
+ LIST_INIT(&cs->recv_wait_list);
+ LIST_INIT(&cs->sendrecv_wait_list);
cs->conn = conn;
cs->wait_list.wait_reason = 0;
}
@@ -629,6 +631,8 @@
conn->proxy_netns = NULL;
LIST_INIT(&conn->list);
LIST_INIT(&conn->send_wait_list);
+ LIST_INIT(&conn->recv_wait_list);
+ LIST_INIT(&conn->sendrecv_wait_list);
}
/* sets <owner> as the connection's owner */
@@ -711,8 +715,19 @@
/* Releases a connection previously allocated by conn_new() */
static inline void conn_free(struct connection *conn)
{
- LIST_DEL(&conn->send_wait_list);
- LIST_INIT(&conn->send_wait_list);
+ struct wait_list *sw, *sw_back;
+ list_for_each_entry_safe(sw, sw_back, &conn->recv_wait_list, list) {
+ LIST_DEL(&sw->list);
+ LIST_INIT(&sw->list);
+ }
+ list_for_each_entry_safe(sw, sw_back, &conn->send_wait_list, list) {
+ LIST_DEL(&sw->list);
+ LIST_INIT(&sw->list);
+ }
+ list_for_each_entry_safe(sw, sw_back, &conn->sendrecv_wait_list, list) {
+ LIST_DEL(&sw->list);
+ LIST_INIT(&sw->list);
+ }
pool_free(pool_head_connection, conn);
}
diff --git a/include/types/connection.h b/include/types/connection.h
index 9a1ba96..421df3c 100644
--- a/include/types/connection.h
+++ b/include/types/connection.h
@@ -375,6 +375,8 @@
struct connection *conn; /* xprt-level connection */
struct wait_list wait_list; /* We're in a wait list for send */
struct list send_wait_list; /* list of tasks to wake when we're ready to send */
+ struct list recv_wait_list; /* list of tasks to wake when we're ready to recv */
+ struct list sendrecv_wait_list; /* list of tasks to wake when we're ready to either send or recv */
void *data; /* pointer to upper layer's entity (eg: stream interface) */
const struct data_cb *data_cb; /* data layer callbacks. Must be set before xprt->init() */
void *ctx; /* mux-specific context */
@@ -406,6 +408,8 @@
/* second cache line */
struct list send_wait_list; /* list of tasks to wake when we're ready to send */
+ struct list recv_wait_list; /* list of tasks to wake when we're ready to recv */
+ struct list sendrecv_wait_list; /* list of tasks to wake when we're ready to either send or recv */
struct list list; /* attach point to various connection lists (idle, ...) */
int xprt_st; /* transport layer state, initialized to zero */
int tmp_early_data; /* 1st byte of early data, if any */
diff --git a/src/connection.c b/src/connection.c
index e303f2c..005e0e7 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -137,6 +137,15 @@
sw->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(sw->task);
}
+ while (!(LIST_ISEMPTY(&conn->sendrecv_wait_list))) {
+ struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n,
+ struct wait_list *, list);
+ LIST_DEL(&sw->list);
+ LIST_INIT(&sw->list);
+ LIST_ADDQ(&conn->recv_wait_list, &sw->list);
+ sw->wait_reason &= ~SUB_CAN_SEND;
+ tasklet_wakeup(sw->task);
+ }
}
/* The data transfer starts here and stops on error and handshakes. Note
@@ -334,11 +343,34 @@
struct wait_list *sw;
switch (event_type) {
+ case SUB_CAN_RECV:
+ sw = param;
+ if (!(sw->wait_reason & SUB_CAN_RECV)) {
+ sw->wait_reason |= SUB_CAN_RECV;
+ /* If we're already subscribed for send(), move it
+ * to the send+recv list
+ */
+ if (sw->wait_reason & SUB_CAN_SEND) {
+ LIST_DEL(&sw->list);
+ LIST_INIT(&sw->list);
+ LIST_ADDQ(&conn->sendrecv_wait_list, &sw->list);
+ } else
+ LIST_ADDQ(&conn->recv_wait_list, &sw->list);
+ }
+ return 0;
case SUB_CAN_SEND:
sw = param;
if (!(sw->wait_reason & SUB_CAN_SEND)) {
sw->wait_reason |= SUB_CAN_SEND;
- LIST_ADDQ(&conn->send_wait_list, &sw->list);
+ /* If we're already subscribed for recv(), move it
+ * to the send+recv list
+ */
+ if (sw->wait_reason & SUB_CAN_RECV) {
+ LIST_DEL(&sw->list);
+ LIST_INIT(&sw->list);
+ LIST_ADDQ(&conn->sendrecv_wait_list, &sw->list);
+ } else
+ LIST_ADDQ(&conn->send_wait_list, &sw->list);
}
return 0;
default:
diff --git a/src/mux_h2.c b/src/mux_h2.c
index 946288d..3c873e9 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -121,6 +121,8 @@
struct list fctl_list; /* list of streams blocked by connection's fctl */
struct buffer_wait buf_wait; /* wait list for buffer allocations */
struct list send_wait_list; /* list of tasks to wake when we're ready to send */
+ struct list recv_wait_list; /* list of tasks to wake when we're ready to recv */
+ struct list sendrecv_wait_list; /* list of tasks to wake when we're ready to either send or recv */
struct wait_list wait_list; /* We're in a wait list, to send */
};
@@ -406,6 +408,8 @@
task_queue(t);
conn_xprt_want_recv(conn);
LIST_INIT(&h2c->send_wait_list);
+ LIST_INIT(&h2c->recv_wait_list);
+ LIST_INIT(&h2c->sendrecv_wait_list);
LIST_INIT(&h2c->wait_list.list);
/* mux->wake will be called soon to complete the operation */
@@ -2333,6 +2337,16 @@
sw->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(sw->task);
}
+ while (!(LIST_ISEMPTY(&h2c->sendrecv_wait_list))) {
+ struct wait_list *sw = LIST_ELEM(h2c->send_wait_list.n,
+ struct wait_list *, list);
+ LIST_DEL(&sw->list);
+ LIST_INIT(&sw->list);
+ LIST_ADDQ(&h2c->recv_wait_list, &sw->list);
+ sw->wait_reason &= ~SUB_CAN_SEND;
+ tasklet_wakeup(sw->task);
+ }
+
}
/* We're done, no more to send */
@@ -3456,14 +3470,37 @@
{
struct wait_list *sw;
struct h2s *h2s = cs->ctx;
+ struct h2c *h2c = h2s->h2c;
switch (event_type) {
+ case SUB_CAN_RECV:
+ sw = param;
+ if (!(sw->wait_reason & SUB_CAN_RECV)) {
+ sw->wait_reason |= SUB_CAN_RECV;
+ /* If we're already subscribed for send(), move it
+ * to the send+recv list
+ */
+ if (sw->wait_reason & SUB_CAN_SEND) {
+ LIST_DEL(&sw->list);
+ LIST_INIT(&sw->list);
+ LIST_ADDQ(&h2c->sendrecv_wait_list, &sw->list);
+ } else
+ LIST_ADDQ(&h2c->recv_wait_list, &sw->list);
+ }
+ return 0;
case SUB_CAN_SEND:
sw = param;
- if (LIST_ISEMPTY(&h2s->list) &&
- !(sw->wait_reason & SUB_CAN_SEND)) {
- LIST_ADDQ(&h2s->h2c->send_wait_list, &sw->list);
+ if (!(sw->wait_reason & SUB_CAN_SEND)) {
sw->wait_reason |= SUB_CAN_SEND;
+ /* If we're already subscribed for recv(), move it
+ * to the send+recv list
+ */
+ if (sw->wait_reason & SUB_CAN_RECV) {
+ LIST_DEL(&sw->list);
+ LIST_INIT(&sw->list);
+ LIST_ADDQ(&h2c->sendrecv_wait_list, &sw->list);
+ } else
+ LIST_ADDQ(&h2c->send_wait_list, &sw->list);
}
return 0;
default:
diff --git a/src/stream_interface.c b/src/stream_interface.c
index 72fec21..cfa613a 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -752,6 +752,16 @@
sw->wait_reason &= ~SUB_CAN_SEND;
tasklet_wakeup(sw->task);
}
+ while (!(LIST_ISEMPTY(&cs->sendrecv_wait_list))) {
+ struct wait_list *sw = LIST_ELEM(cs->send_wait_list.n,
+ struct wait_list *, list);
+ LIST_DEL(&sw->list);
+ LIST_INIT(&sw->list);
+ LIST_ADDQ(&cs->recv_wait_list, &sw->list);
+ sw->wait_reason &= ~SUB_CAN_SEND;
+ tasklet_wakeup(sw->task);
+ }
+
}
return NULL;
}