MEDIUM: stream: rearrange the events to remove the loop
The "goto redo" at the end of process_stream() to make the states converge
is still a big source of problems and mostly stems from the very late call
to the send() functions, whose results need to be considered, while it's
being done in si_update_both() when leaving.
This patch extracts the si_sync_send() calls from si_update_both(), and
places them at the relevant places in process_stream(), which are just
after the amount of data to forward is updated and before the shutw()
calls (which were also moved). The stream-interface resynchronization
needs to go slightly upper to take into account the transition from CON
to RDY that will happen consecutive to some successful send(), and that's
all.
By doing so we can now get rid of this loop and have si_update_both()
called only to update the stream interface and channel when leaving the
function, as it was initially designed to work.
It is worth noting that a number of the remaining conditions to perform
a goto resync_XXX still seem suboptimal and would benefit from being
refined to perform les resynchronization. But what matters at this stage
is that the code remains valid and efficient.
diff --git a/src/stream.c b/src/stream.c
index 22861f8..f92a008 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -1843,7 +1843,6 @@
si_sync_recv(si_f);
si_sync_recv(si_b);
-redo:
rate = update_freq_ctr(&s->call_rate, 1);
if (rate >= 100000 && s->call_rate.prev_ctr) { // make sure to wait at least a full second
stream_dump_and_crash(&s->obj_type, read_freq_ctr(&s->call_rate));
@@ -1929,6 +1928,7 @@
}
}
+ resync_stream_interface:
/* below we may emit error messages so we have to ensure that we have
* our buffers properly allocated.
*/
@@ -2014,7 +2014,6 @@
rp_cons_last = si_f->state;
rp_prod_last = si_b->state;
- resync_stream_interface:
/* Check for connection closure */
DPRINTF(stderr,
@@ -2385,40 +2384,6 @@
/* reflect what the L7 analysers have seen last */
rqf_last = req->flags;
- /*
- * Now forward all shutdown requests between both sides of the buffer
- */
-
- /* first, let's check if the request buffer needs to shutdown(write), which may
- * happen either because the input is closed or because we want to force a close
- * once the server has begun to respond. If a half-closed timeout is set, we adjust
- * the other side's timeout as well.
- */
- if (unlikely((req->flags & (CF_SHUTW|CF_SHUTW_NOW|CF_AUTO_CLOSE|CF_SHUTR)) ==
- (CF_AUTO_CLOSE|CF_SHUTR))) {
- channel_shutw_now(req);
- }
-
- /* shutdown(write) pending */
- if (unlikely((req->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW &&
- channel_is_empty(req))) {
- if (req->flags & CF_READ_ERROR)
- si_b->flags |= SI_FL_NOLINGER;
- si_shutw(si_b);
- }
-
- /* shutdown(write) done on server side, we must stop the client too */
- if (unlikely((req->flags & (CF_SHUTW|CF_SHUTR|CF_SHUTR_NOW)) == CF_SHUTW &&
- !req->analysers))
- channel_shutr_now(req);
-
- /* shutdown(read) pending */
- if (unlikely((req->flags & (CF_SHUTR|CF_SHUTR_NOW)) == CF_SHUTR_NOW)) {
- if (si_f->flags & SI_FL_NOHALF)
- si_f->flags |= SI_FL_NOLINGER;
- si_shutr(si_f);
- }
-
/* it's possible that an upper layer has requested a connection setup or abort.
* There are 2 situations where we decide to establish a new connection :
* - there are data scheduled for emission in the buffer
@@ -2490,8 +2455,48 @@
} while (si_b->state == SI_ST_ASS);
}
+ /* Let's see if we can send the pending request now */
+ si_sync_send(si_b);
+
+ /*
+ * Now forward all shutdown requests between both sides of the request buffer
+ */
+
+ /* first, let's check if the request buffer needs to shutdown(write), which may
+ * happen either because the input is closed or because we want to force a close
+ * once the server has begun to respond. If a half-closed timeout is set, we adjust
+ * the other side's timeout as well.
+ */
+ if (unlikely((req->flags & (CF_SHUTW|CF_SHUTW_NOW|CF_AUTO_CLOSE|CF_SHUTR)) ==
+ (CF_AUTO_CLOSE|CF_SHUTR))) {
+ channel_shutw_now(req);
+ }
+
+ /* shutdown(write) pending */
+ if (unlikely((req->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW &&
+ channel_is_empty(req))) {
+ if (req->flags & CF_READ_ERROR)
+ si_b->flags |= SI_FL_NOLINGER;
+ si_shutw(si_b);
+ }
+
+ /* shutdown(write) done on server side, we must stop the client too */
+ if (unlikely((req->flags & (CF_SHUTW|CF_SHUTR|CF_SHUTR_NOW)) == CF_SHUTW &&
+ !req->analysers))
+ channel_shutr_now(req);
+
+ /* shutdown(read) pending */
+ if (unlikely((req->flags & (CF_SHUTR|CF_SHUTR_NOW)) == CF_SHUTR_NOW)) {
+ if (si_f->flags & SI_FL_NOHALF)
+ si_f->flags |= SI_FL_NOLINGER;
+ si_shutr(si_f);
+ }
+
/* Benchmarks have shown that it's optimal to do a full resync now */
- if (si_f->state == SI_ST_DIS || si_b->state == SI_ST_DIS)
+ if (si_f->state == SI_ST_DIS ||
+ si_state_in(si_b->state, SI_SB_RDY|SI_SB_DIS) ||
+ (si_f->flags & SI_FL_ERR && si_f->state != SI_ST_CLO) ||
+ (si_b->flags & SI_FL_ERR && si_b->state != SI_ST_CLO))
goto resync_stream_interface;
/* otherwise we want to check if we need to resync the req buffer or not */
@@ -2583,6 +2588,9 @@
/* reflect what the L7 analysers have seen last */
rpf_last = res->flags;
+ /* Let's see if we can send the pending response now */
+ si_sync_send(si_f);
+
/*
* Now forward all shutdown requests between both sides of the buffer
*/
@@ -2615,7 +2623,10 @@
si_shutr(si_b);
}
- if (si_f->state == SI_ST_DIS || si_b->state == SI_ST_DIS)
+ if (si_f->state == SI_ST_DIS ||
+ si_state_in(si_b->state, SI_SB_RDY|SI_SB_DIS) ||
+ (si_f->flags & SI_FL_ERR && si_f->state != SI_ST_CLO) ||
+ (si_b->flags & SI_FL_ERR && si_b->state != SI_ST_CLO))
goto resync_stream_interface;
if (req->flags != rqf_last)
@@ -2624,6 +2635,9 @@
if ((res->flags ^ rpf_last) & CF_MASK_STATIC)
goto resync_response;
+ if (((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER)
+ goto resync_request;
+
/* we're interested in getting wakeups again */
si_f->flags &= ~SI_FL_DONT_WAKE;
si_b->flags &= ~SI_FL_DONT_WAKE;
@@ -2656,33 +2670,11 @@
}
if (likely((si_f->state != SI_ST_CLO) || !si_state_in(si_b->state, SI_SB_INI|SI_SB_CLO))) {
- enum si_state si_b_prev_state, si_f_prev_state;
-
- si_f_prev_state = si_f->prev_state;
- si_b_prev_state = si_b->prev_state;
-
if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED))
stream_process_counters(s);
- /* take the exact same flags si_update_both() will have before
- * trying to update again.
- */
- rqf_last = req->flags & ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL);
- rpf_last = res->flags & ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL);
-
si_update_both(si_f, si_b);
- /* changes requiring immediate attention are processed right now */
- if (si_f->state == SI_ST_DIS || si_f->state != si_f_prev_state ||
- si_b->state == SI_ST_DIS || si_b->state != si_b_prev_state ||
- ((si_f->flags & SI_FL_ERR) && si_f->state != SI_ST_CLO) ||
- ((si_b->flags & SI_FL_ERR) && si_b->state != SI_ST_CLO))
- goto redo;
-
- /* I/O events (mostly CF_WRITE_PARTIAL) are aggregated with other I/Os */
- if (((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER)
- task_wakeup(s->task, TASK_WOKEN_IO);
-
/* Trick: if a request is being waiting for the server to respond,
* and if we know the server can timeout, we don't want the timeout
* to expire on the client side first, but we're still interested
diff --git a/src/stream_interface.c b/src/stream_interface.c
index 3bb3248..5a85b3b 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -918,11 +918,12 @@
si_rx_room_rdy(si_opposite(si));
}
-/* updates both stream ints of a same stream at once */
/* Updates at once the channel flags, and timers of both stream interfaces of a
* same stream, to complete the work after the analysers, then updates the data
* layer below. This will ensure that any synchronous update performed at the
* data layer will be reflected in the channel flags and/or stream-interface.
+ * Note that this does not change the stream interface's current state, though
+ * it updates the previous state to the current one.
*/
void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b)
{
@@ -935,9 +936,6 @@
si_f->prev_state = si_f->state;
si_b->prev_state = si_b->state;
- si_sync_send(si_f);
- si_sync_send(si_b);
-
/* let's recompute both sides states */
if (si_state_in(si_f->state, SI_SB_RDY|SI_SB_EST))
si_update(si_f);