MINOR: stream-int: use si_rx_blocked()/si_tx_blocked() to check readiness
This way we don't limit ourselves to random flags only and the code
is more readable and safer for the long term.
diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h
index 9100778..15b276c 100644
--- a/include/proto/stream_interface.h
+++ b/include/proto/stream_interface.h
@@ -389,15 +389,14 @@
}
/* This is to be used after making some room available in a channel. It will
- * return without doing anything if {SI_FL_RX_WAIT_EP,SI_FL_RXBLK_ROOM} != {0,0}.
+ * return without doing anything if the stream interface's RX path is blocked.
+ * It will automatically mark the stream interface as busy processing the end
+ * point in order to avoid useless repeated wakeups.
* It will then call ->chk_rcv() to enable receipt of new data.
*/
static inline void si_chk_rcv(struct stream_interface *si)
{
- if (si->flags & SI_FL_RXBLK_ROOM)
- return;
-
- if (si->flags & SI_FL_RX_WAIT_EP)
+ if (si_rx_blocked(si) || !si_rx_endp_ready(si))
return;
if (si->state > SI_ST_EST)
diff --git a/src/stream_interface.c b/src/stream_interface.c
index bb1dc7a..f4beccb 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -861,13 +861,13 @@
* handled at the latest moment.
*/
if (obj_type(si_f->end) == OBJ_TYPE_APPCTX &&
- (((si_f->flags & (SI_FL_RX_WAIT_EP|SI_FL_RXBLK_ROOM)) == 0) ||
- ((si_f->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) == SI_FL_WANT_GET)))
+ ((si_rx_endp_ready(si_f) && !si_rx_blocked(si_f)) ||
+ (si_tx_endp_ready(si_f) && !si_tx_blocked(si_f))))
appctx_wakeup(si_appctx(si_f));
if (obj_type(si_b->end) == OBJ_TYPE_APPCTX &&
- (((si_b->flags & (SI_FL_RX_WAIT_EP|SI_FL_RXBLK_ROOM)) == 0) ||
- ((si_b->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) == SI_FL_WANT_GET)))
+ ((si_rx_endp_ready(si_b) && !si_rx_blocked(si_b)) ||
+ (si_tx_endp_ready(si_b) && !si_tx_blocked(si_b))))
appctx_wakeup(si_appctx(si_b));
}
@@ -1342,11 +1342,11 @@
/* connection closed */
goto out_shutdown_r;
- /* Subscribe to receive events */
- if (!(si->flags & SI_FL_RXBLK_ROOM))
+ /* Subscribe to receive events if we're blocking on I/O */
+ if (!si_rx_blocked(si))
conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_event);
- return (cur_read != 0 || (si->flags & SI_FL_RXBLK_ROOM));
+ return (cur_read != 0) || si_rx_blocked(si);
out_shutdown_r:
/* we received a shutdown */
@@ -1424,14 +1424,14 @@
/* update the stream-int, channels, and possibly wake the stream up */
stream_int_notify(si);
- /* stream_int_notify may pass through checksnd and released some
- * WAIT_ROOM flags. The process_stream will consider those flags
- * to wakeup the appctx but in the case the task is not in runqueue
- * we may have to wakeup the appctx immediately.
+ /* stream_int_notify may have passed through chk_snd and released some
+ * RXBLK flags. Process_stream will consider those flags to wake up the
+ * appctx but in the case the task is not in runqueue we may have to
+ * wakeup the appctx immediately.
*/
if (!task_in_rq(si_task(si)) &&
- (((si->flags & (SI_FL_RX_WAIT_EP|SI_FL_RXBLK_ROOM)) == 0) ||
- ((si->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) == SI_FL_WANT_GET)))
+ ((si_rx_endp_ready(si) && !si_rx_blocked(si)) ||
+ (si_tx_endp_ready(si) && !si_tx_blocked(si))))
appctx_wakeup(si_appctx(si));
}