MEDIUM: stream-int: unconditionally call si_chk_rcv() in update and notify
For a long time, stream_int_update() and stream_int_notify() used to only
conditionally call si_chk_rcv() based on state change detection. This
detection is not reliable and quite complex. With the new blocked flags
that si_chk_rcv() checks, it's much more reliable to always call the
function to take into account recent changes,and let it decide if it needs
to wake something up or not.
This also removes the calls to si_chk_rcv() that were performed in
si_update_both() since these ones are systematically performed in
stream_int_update() after updating the Rx flags.
diff --git a/src/stream_interface.c b/src/stream_interface.c
index df2adb3..80412e7 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -448,6 +448,7 @@
{
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
+ struct stream_interface *sio = si_opposite(si);
/* process consumer side */
if (channel_is_empty(oc)) {
@@ -479,11 +480,10 @@
ic->rex = tick_add_ifset(now_ms, ic->rto);
}
- if ((si_opposite(si)->flags & SI_FL_RXBLK_ROOM) &&
+ if ((sio->flags & SI_FL_RXBLK_ROOM) &&
((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL ||
channel_is_empty(oc))) {
- si_opposite(si)->flags &= ~SI_FL_RXBLK_ROOM;
- si_chk_rcv(si_opposite(si));
+ sio->flags &= ~SI_FL_RXBLK_ROOM;
}
/* Notify the other side when we've injected data into the IC that
@@ -498,7 +498,7 @@
* an HTTP parser might need more data to complete the parsing.
*/
if (!channel_is_empty(ic) &&
- (si_opposite(si)->flags & SI_FL_WAIT_DATA) &&
+ (sio->flags & SI_FL_WAIT_DATA) &&
(!(ic->flags & CF_EXPECT_MORE) || c_full(ic) || ci_data(ic) == 0 || ic->pipe)) {
int new_len, last_len;
@@ -506,7 +506,7 @@
if (ic->pipe)
last_len += ic->pipe->data;
- si_chk_snd(si_opposite(si));
+ si_chk_snd(sio);
new_len = co_data(ic);
if (ic->pipe)
@@ -515,12 +515,13 @@
/* check if the consumer has freed some space either in the
* buffer or in the pipe.
*/
- if (new_len < last_len) {
+ if (new_len < last_len)
si->flags &= ~SI_FL_RXBLK_ROOM;
- si_chk_rcv(si);
- }
}
+ si_chk_rcv(si);
+ si_chk_rcv(sio);
+
if (si->flags & SI_FL_RXBLK_ROOM) {
ic->rex = TICK_ETERNITY;
}
@@ -536,14 +537,14 @@
(si->state != SI_ST_EST && si->state != SI_ST_CON) ||
(si->flags & SI_FL_ERR) ||
((ic->flags & CF_READ_PARTIAL) &&
- (!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) ||
+ (!ic->to_forward || sio->state != SI_ST_EST)) ||
/* changes on the consumption side */
(oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
((oc->flags & CF_WRITE_ACTIVITY) &&
((oc->flags & CF_SHUTW) ||
((oc->flags & CF_WAKE_WRITE) &&
- (si_opposite(si)->state != SI_ST_EST ||
+ (sio->state != SI_ST_EST ||
(channel_is_empty(oc) && !oc->to_forward)))))) {
task_wakeup(si_task(si), TASK_WOKEN_IO);
}
@@ -762,10 +763,10 @@
* have updated it if there has been a completed I/O.
*/
si->flags &= ~SI_FL_RXBLK_ROOM;
- si_chk_rcv(si);
if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
}
+ si_chk_rcv(si);
}
else
si_rx_shut_blk(si);
@@ -848,10 +849,6 @@
si_f->flags &= ~SI_FL_RXBLK_ROOM;
}
- /* it's time to try to receive */
- si_chk_rcv(si_f);
- si_chk_rcv(si_b);
-
/* let's recompute both sides states */
if (si_f->state == SI_ST_EST)
stream_int_update(si_f);