BUG/MEDIUM: h2: Try to be fair when sending data.
On the send path, try to be fair, and make sure the first to attempt to
send data will actually be the first to send data when it's possible (ie
when the mux' buffer is not full anymore).
To do so, use a separate list element for the sending_list, and only remove
the h2s from the send_list/fctl_list if we successfully sent data. If we did
not, we'll keep our place in the list, and will be able to try again next time.
This should be backported to 1.9.
diff --git a/src/mux_h2.c b/src/mux_h2.c
index 369e78d..d91cd08 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -199,6 +199,7 @@
struct wait_event *recv_wait; /* recv wait_event the conn_stream associated is waiting on (via h2_subscribe) */
struct wait_event *send_wait; /* send wait_event the conn_stream associated is waiting on (via h2_subscribe) */
struct list list; /* To be used when adding in h2c->send_list or h2c->fctl_lsit */
+ struct list sending_list; /* To be used when adding in h2c->sending_list */
};
/* descriptor for an h2 frame header */
@@ -861,8 +862,8 @@
* reference left would be in the h2c send_list/fctl_list, and if
* we're in it, we're getting out anyway
*/
- LIST_DEL(&h2s->list);
- LIST_INIT(&h2s->list);
+ LIST_DEL_INIT(&h2s->list);
+ LIST_DEL_INIT(&h2s->sending_list);
tasklet_free(h2s->wait_event.task);
pool_free(pool_head_h2s, h2s);
}
@@ -893,6 +894,7 @@
h2s->wait_event.handle = NULL;
h2s->wait_event.events = 0;
LIST_INIT(&h2s->list);
+ LIST_INIT(&h2s->sending_list);
h2s->h2c = h2c;
h2s->cs = NULL;
h2s->mws = h2c->miw;
@@ -2531,7 +2533,7 @@
*/
static int h2_process_mux(struct h2c *h2c)
{
- struct h2s *h2s, *h2s_back;
+ struct h2s *h2s;
if (unlikely(h2c->st0 < H2_CS_FRAME_H)) {
if (unlikely(h2c->st0 == H2_CS_PREFACE && (h2c->flags & H2_CF_IS_BACK))) {
@@ -2561,31 +2563,31 @@
* blocked just on this.
*/
- list_for_each_entry_safe(h2s, h2s_back, &h2c->fctl_list, list) {
+ list_for_each_entry(h2s, &h2c->fctl_list, list) {
if (h2c->mws <= 0 || h2c->flags & H2_CF_MUX_BLOCK_ANY ||
h2c->st0 >= H2_CS_ERROR)
break;
+ if (h2s->send_wait->events & SUB_CALL_UNSUBSCRIBE)
+ continue;
h2s->flags &= ~H2_SF_BLK_ANY;
h2s->send_wait->events &= ~SUB_RETRY_SEND;
h2s->send_wait->events |= SUB_CALL_UNSUBSCRIBE;
+ LIST_ADDQ(&h2c->sending_list, &h2s->sending_list);
tasklet_wakeup(h2s->send_wait->task);
- LIST_DEL(&h2s->list);
- LIST_INIT(&h2s->list);
- LIST_ADDQ(&h2c->sending_list, &h2s->list);
}
- list_for_each_entry_safe(h2s, h2s_back, &h2c->send_list, list) {
+ list_for_each_entry(h2s, &h2c->send_list, list) {
if (h2c->st0 >= H2_CS_ERROR || h2c->flags & H2_CF_MUX_BLOCK_ANY)
break;
+ if (h2s->send_wait->events & SUB_CALL_UNSUBSCRIBE)
+ continue;
h2s->flags &= ~H2_SF_BLK_ANY;
h2s->send_wait->events &= ~SUB_RETRY_SEND;
h2s->send_wait->events |= SUB_CALL_UNSUBSCRIBE;
+ LIST_ADDQ(&h2c->sending_list, &h2s->sending_list);
tasklet_wakeup(h2s->send_wait->task);
- LIST_DEL(&h2s->list);
- LIST_INIT(&h2s->list);
- LIST_ADDQ(&h2c->sending_list, &h2s->list);
}
fail:
@@ -2601,7 +2603,7 @@
}
return 1;
}
- return (h2c->mws <= 0 || LIST_ISEMPTY(&h2c->fctl_list)) && LIST_ISEMPTY(&h2c->send_list);
+ return (1);
}
@@ -2738,15 +2740,19 @@
* for us.
*/
if (!(h2c->flags & (H2_CF_MUX_MFULL | H2_CF_DEM_MROOM))) {
- while (!LIST_ISEMPTY(&h2c->send_list)) {
- struct h2s *h2s = LIST_ELEM(h2c->send_list.n,
- struct h2s *, list);
- LIST_DEL(&h2s->list);
- LIST_INIT(&h2s->list);
- LIST_ADDQ(&h2c->sending_list, &h2s->list);
+ struct h2s *h2s;
+
+ list_for_each_entry(h2s, &h2c->send_list, list) {
+ if (h2c->st0 >= H2_CS_ERROR || h2c->flags & H2_CF_MUX_BLOCK_ANY)
+ break;
+ if (h2s->send_wait->events & SUB_CALL_UNSUBSCRIBE)
+ continue;
+
+ h2s->flags &= ~H2_SF_BLK_ANY;
h2s->send_wait->events &= ~SUB_RETRY_SEND;
h2s->send_wait->events |= SUB_CALL_UNSUBSCRIBE;
tasklet_wakeup(h2s->send_wait->task);
+ LIST_ADDQ(&h2c->sending_list, &h2s->sending_list);
}
}
/* We're done, no more to send */
@@ -5260,19 +5266,11 @@
{
struct h2s *h2s, *h2s_back;
- list_for_each_entry_safe(h2s, h2s_back, &h2c->sending_list, list) {
- /* Don't unschedule the stream if the mux is just busy waiting for more data fro mthat stream */
- if (h2c->msi == h2s_id(h2s))
- continue;
- LIST_DEL(&h2s->list);
- LIST_INIT(&h2s->list);
+ list_for_each_entry_safe(h2s, h2s_back, &h2c->sending_list, sending_list) {
+ LIST_DEL_INIT(&h2s->sending_list);
task_remove_from_task_list((struct task *)h2s->send_wait->task);
h2s->send_wait->events |= SUB_RETRY_SEND;
h2s->send_wait->events &= ~SUB_CALL_UNSUBSCRIBE;
- if (h2s->flags & H2_SF_BLK_MFCTL)
- LIST_ADDQ(&h2c->fctl_list, &h2s->list);
- else
- LIST_ADDQ(&h2c->send_list, &h2s->list);
}
}
@@ -5292,11 +5290,22 @@
uint32_t bsize;
int32_t idx;
+ /* If we were not just woken because we wanted to send but couldn't,
+ * and there's somebody else that is waiting to send, do nothing,
+ * we will subscribe later and be put at the end of the list
+ */
+ LIST_DEL_INIT(&h2s->sending_list);
+ if ((!(h2s->send_wait) || !(h2s->send_wait->events & SUB_CALL_UNSUBSCRIBE)) &&
+ (!LIST_ISEMPTY(&h2s->h2c->send_list) || !LIST_ISEMPTY(&h2s->h2c->fctl_list)))
+ return 0;
+
if (h2s->send_wait) {
+ /* We want to stay in the send_list, so prepare ourself to be
+ * eventually recalled if needed, and only remove ourself from
+ * the list if we managed to send anything.
+ */
h2s->send_wait->events &= ~SUB_CALL_UNSUBSCRIBE;
- h2s->send_wait = NULL;
- LIST_DEL(&h2s->list);
- LIST_INIT(&h2s->list);
+ h2s->send_wait->events |= SUB_RETRY_SEND;
}
if (h2s->h2c->st0 < H2_CS_FRAME_H)
return 0;
@@ -5486,6 +5495,12 @@
else
cs->flags |= CS_FL_ERR_PENDING;
}
+ if (total > 0 && h2s->send_wait) {
+ /* Ok we managed to send something, leave the send_list */
+ h2s->send_wait->events &= ~SUB_RETRY_SEND;
+ h2s->send_wait = NULL;
+ LIST_DEL_INIT(&h2s->list);
+ }
return total;
}