BUG/MEDIUM: process_stream: Don't use si_cs_io_cb() in process_stream().

Instead of using si_cs_io_cb() in process_stream()  use si_cs_send/si_cs_recv
instead, as si_cs_io_cb() may lead to process_stream being woken up when it
shouldn't be, and thus timeout would never get triggered.
diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h
index 74537b9..4a38de8 100644
--- a/include/proto/stream_interface.h
+++ b/include/proto/stream_interface.h
@@ -53,6 +53,8 @@
 void stream_int_update_conn(struct stream_interface *si);
 void stream_int_update_applet(struct stream_interface *si);
 void stream_int_notify(struct stream_interface *si);
+int si_cs_recv(struct conn_stream *cs);
+int si_cs_send(struct conn_stream *cs);
 struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state);
 
 /* returns the channel which receives data from this stream interface (input channel) */
diff --git a/src/stream.c b/src/stream.c
index b7eb5db..e75491e 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -1646,6 +1646,8 @@
 	unsigned int req_ana_back;
 	struct channel *req, *res;
 	struct stream_interface *si_f, *si_b;
+	struct conn_stream *cs;
+	int ret;
 
 	activity[tid].stream++;
 
@@ -1656,8 +1658,16 @@
 	si_b = &s->si[1];
 
 	/* First, attempd to do I/Os */
-	si_cs_io_cb(NULL, si_f, 0);
-	si_cs_io_cb(NULL, si_b, 0);
+	cs = objt_cs(si_f->end);
+	if (cs) {
+		si_cs_send(cs);
+		si_cs_recv(cs);
+	}
+	cs = objt_cs(si_b->end);
+	if (cs) {
+		si_cs_send(cs);
+		si_cs_recv(cs);
+	}
 
 	//DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__,
 	//        si_f->state, si_b->state, si_b->err_type, req->flags, res->flags);
@@ -2489,8 +2499,22 @@
 		s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
 		stream_release_buffers(s);
 		/* We may have free'd some space in buffers, or have more to send/recv, try again */
-		si_cs_io_cb(NULL, si_f, 0);
-		si_cs_io_cb(NULL, si_b, 0);
+		cs = objt_cs(si_f->end);
+		ret = 0;
+		if (cs && !(cs->conn->flags & CO_FL_ERROR)) {
+			ret |= si_cs_send(cs);
+			si_cs_recv(cs);
+			ret |= (ci_data(si_ic(si_f)) != 0 ) | (cs->conn->flags & CO_FL_ERROR);
+		}
+		cs = objt_cs(si_b->end);
+		if (cs && !(cs->conn->flags & CO_FL_ERROR)) {
+			ret |= si_cs_send(cs);
+			si_cs_recv(cs);
+			ret |= (ci_data(si_ic(si_b)) != 0 ) | (cs->conn->flags & CO_FL_ERROR);
+
+		}
+		if (ret)
+			task_wakeup(t, TASK_WOKEN_IO);
 		return t; /* nothing more to do */
 	}
 
diff --git a/src/stream_interface.c b/src/stream_interface.c
index c918a7c..7f787c1 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -51,10 +51,10 @@
 static void stream_int_shutw_applet(struct stream_interface *si);
 static void stream_int_chk_rcv_applet(struct stream_interface *si);
 static void stream_int_chk_snd_applet(struct stream_interface *si);
-static int si_cs_recv(struct conn_stream *cs);
+int si_cs_recv(struct conn_stream *cs);
 static int si_cs_process(struct conn_stream *cs);
 static int si_idle_conn_wake_cb(struct conn_stream *cs);
-static int si_cs_send(struct conn_stream *cs);
+int si_cs_send(struct conn_stream *cs);
 
 /* stream-interface operations for embedded tasks */
 struct si_ops si_embedded_ops = {
@@ -622,7 +622,7 @@
  * caller to commit polling changes. The caller should check conn->flags
  * for errors.
  */
-static int si_cs_send(struct conn_stream *cs)
+int si_cs_send(struct conn_stream *cs)
 {
 	struct connection *conn = cs->conn;
 	struct stream_interface *si = cs->data;
@@ -1114,7 +1114,7 @@
  * into the buffer from the connection. It iterates over the mux layer's
  * rcv_buf function.
  */
-static int si_cs_recv(struct conn_stream *cs)
+int si_cs_recv(struct conn_stream *cs)
 {
 	struct connection *conn = cs->conn;
 	struct stream_interface *si = cs->data;
@@ -1128,6 +1128,8 @@
 	 * happens when we send too large a request to a backend server
 	 * which rejects it before reading it all.
 	 */
+	if (!conn_xprt_ready(conn))
+		return 0;
 	if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
 		return 1; // We want to make sure si_cs_wake() is called, so that process_strema is woken up, on failure