MEDIUM: stream_interface: Make recv() subscribe when more data is needed.
Refactor the code so that si_cs_recv() subscribes to receive events.
diff --git a/src/stream_interface.c b/src/stream_interface.c
index 46e57d4..e5ddee6 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -51,10 +51,10 @@
static void stream_int_shutw_applet(struct stream_interface *si);
static void stream_int_chk_rcv_applet(struct stream_interface *si);
static void stream_int_chk_snd_applet(struct stream_interface *si);
-static void si_cs_recv(struct conn_stream *cs);
+static int si_cs_recv(struct conn_stream *cs);
static int si_cs_wake_cb(struct conn_stream *cs);
static int si_idle_conn_wake_cb(struct conn_stream *cs);
-static struct task * si_cs_send(struct conn_stream *cs);
+static int si_cs_send(struct conn_stream *cs);
/* stream-interface operations for embedded tasks */
struct si_ops si_embedded_ops = {
@@ -631,7 +631,7 @@
* caller to commit polling changes. The caller should check conn->flags
* for errors.
*/
-static struct task * si_cs_send(struct conn_stream *cs)
+static int si_cs_send(struct conn_stream *cs)
{
struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
@@ -641,21 +641,21 @@
/* We're already waiting to be able to send, give up */
if (si->wait_list.wait_reason & SUB_CAN_SEND)
- return NULL;
+ return 0;
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
- return NULL;
+ return 0;
if (conn->flags & CO_FL_HANDSHAKE) {
/* a handshake was requested */
/* Schedule ourself to be woken up once the handshake is done */
conn->xprt->subscribe(conn, SUB_CAN_SEND, &si->wait_list);
- return NULL;
+ return 0;
}
/* we might have been called just after an asynchronous shutw */
if (si_oc(si)->flags & CF_SHUTW)
- return NULL;
+ return 0;
/* ensure it's only set if a write attempt has succeeded */
oc->flags &= ~CF_WRITE_PARTIAL;
@@ -673,7 +673,7 @@
}
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
- return NULL;
+ return 0;
}
/* At this point, the pipe is empty, but we may still have data pending
@@ -753,20 +753,24 @@
}
}
- return NULL;
+ return did_send;
}
struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state)
{
struct stream_interface *si = ctx;
struct conn_stream *cs = objt_cs(si->end);
+ int ret = 0;
if (!cs)
return NULL;
- if (!(si->wait_list.wait_reason & SUB_CAN_SEND)) {
- si_cs_send(cs);
+ if (!(si->wait_list.wait_reason & SUB_CAN_SEND))
+ ret = si_cs_send(cs);
+ if (!(si->wait_list.wait_reason & SUB_CAN_RECV))
+ ret |= si_cs_recv(cs);
+ if (ret != 0)
si_cs_wake_cb(cs);
- }
+
return (NULL);
}
@@ -1138,12 +1142,12 @@
* into the buffer from the connection. It iterates over the mux layer's
* rcv_buf function.
*/
-static void si_cs_recv(struct conn_stream *cs)
+static int si_cs_recv(struct conn_stream *cs)
{
struct connection *conn = cs->conn;
struct stream_interface *si = cs->data;
struct channel *ic = si_ic(si);
- int ret, max, cur_read;
+ int ret, max, cur_read = 0;
int read_poll = MAX_READ_POLL_LOOPS;
/* stop immediately on errors. Note that we DON'T want to stop on
@@ -1153,18 +1157,22 @@
* which rejects it before reading it all.
*/
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
- return;
+ return 0;
+
+ /* If another call to si_cs_recv() failed, and we subscribed to
+ * recv events already, give up now.
+ */
+ if (si->wait_list.wait_reason & SUB_CAN_RECV)
+ return 0;
/* maybe we were called immediately after an asynchronous shutr */
if (ic->flags & CF_SHUTR)
- return;
+ return 0;
/* stop here if we reached the end of data */
if (cs->flags & CS_FL_EOS)
goto out_shutdown_r;
- cur_read = 0;
-
if ((ic->flags & (CF_STREAMER | CF_STREAMER_FAST)) && !co_data(ic) &&
global.tune.idle_timer &&
(unsigned short)(now_ms - ic->last_read) >= global.tune.idle_timer) {
@@ -1218,7 +1226,7 @@
goto out_shutdown_r;
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
- return;
+ return cur_read != 0;
if (conn->flags & CO_FL_WAIT_ROOM) {
/* the pipe is full or we have read enough data that it
@@ -1377,13 +1385,16 @@
end_recv:
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
- return;
+ return cur_read != 0;
if (cs->flags & CS_FL_EOS)
/* connection closed */
goto out_shutdown_r;
- return;
+ /* Subscribe to receive events */
+ conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_list);
+
+ return cur_read != 0;
out_shutdown_r:
/* we received a shutdown */
@@ -1391,7 +1402,7 @@
if (ic->flags & CF_AUTO_CLOSE)
channel_shutw_now(ic);
stream_sock_read0(si);
- return;
+ return cur_read != 0;
}
/*