MEDIUM: connections: Get rid of the recv() method.

Remove the recv() method from mux and conn_stream.
The goal is to always receive from the upper layers, instead of waiting
for the connection later. For now, recv() is still called from the wake()
method, but that should change soon.
diff --git a/include/types/connection.h b/include/types/connection.h
index 421df3c..1fa0b73 100644
--- a/include/types/connection.h
+++ b/include/types/connection.h
@@ -310,7 +310,6 @@
  */
 struct mux_ops {
 	int  (*init)(struct connection *conn);        /* early initialization */
-	void (*recv)(struct connection *conn);        /* mux-layer recv callback */
 	int  (*wake)(struct connection *conn);        /* mux-layer callback to report activity, mandatory */
 	void (*update_poll)(struct conn_stream *cs);  /* commit cs flags to mux/conn */
 	size_t (*rcv_buf)(struct conn_stream *cs, struct buffer *buf, size_t count, int flags); /* Called from the upper layer to get data */
@@ -336,7 +335,6 @@
  * data movement. It may abort a connection by returning < 0.
  */
 struct data_cb {
-	void (*recv)(struct conn_stream *cs);  /* data-layer recv callback */
 	int  (*wake)(struct conn_stream *cs);  /* data-layer callback to report activity */
 	int (*subscribe)(struct conn_stream *cs, int event_type, void *param); /* Subscribe to events, such as "being able to send" */
 	char name[8];                           /* data layer name, zero-terminated */
diff --git a/src/checks.c b/src/checks.c
index b64d32e..2e21459 100644
--- a/src/checks.c
+++ b/src/checks.c
@@ -70,6 +70,7 @@
 static int tcpcheck_main(struct check *);
 static void __event_srv_chk_w(struct conn_stream *cs);
 static int wake_srv_chk(struct conn_stream *cs);
+static void __event_srv_chk_r(struct conn_stream *cs);
 
 static struct pool_head *pool_head_email_alert   = NULL;
 static struct pool_head *pool_head_tcpcheck_rule = NULL;
@@ -709,9 +710,15 @@
 static struct task *event_srv_chk_io(struct task *t, void *ctx, unsigned short state)
 {
 	struct conn_stream *cs = ctx;
+	struct check *check = cs->data;
 
 	if (!(cs->wait_list.wait_reason & SUB_CAN_SEND))
 		wake_srv_chk(cs);
+	if (!(cs->wait_list.wait_reason & SUB_CAN_RECV)) {
+		HA_SPIN_LOCK(SERVER_LOCK, &check->server->lock);
+		__event_srv_chk_r(cs);
+		HA_SPIN_UNLOCK(SERVER_LOCK, &check->server->lock);
+	}
 	return NULL;
 }
 
@@ -803,9 +810,11 @@
  * etc.
  *
  * Please do NOT place any return statement in this function and only leave
- * via the out_unlock label.
+ * via the out label.
+ *
+ * This must be called with the server lock held.
  */
-static void event_srv_chk_r(struct conn_stream *cs)
+static void __event_srv_chk_r(struct conn_stream *cs)
 {
 	struct connection *conn = cs->conn;
 	struct check *check = cs->data;
@@ -815,17 +824,17 @@
 	int done;
 	unsigned short msglen;
 
-	HA_SPIN_LOCK(SERVER_LOCK, &check->server->lock);
-
 	if (unlikely(check->result == CHK_RES_FAILED))
 		goto out_wakeup;
 
-	if (conn->flags & CO_FL_HANDSHAKE)
-		goto out_unlock;
+	if (conn->flags & CO_FL_HANDSHAKE) {
+		cs->conn->mux->subscribe(cs, SUB_CAN_RECV, &cs->wait_list);
+		goto out;
+	}
 
 	/* wake() will take care of calling tcpcheck_main() */
 	if (check->type == PR_O2_TCPCHK_CHK)
-		goto out_unlock;
+		goto out;
 
 	/* Warning! Linux returns EAGAIN on SO_ERROR if data are still available
 	 * but the connection was closed on the remote end. Fortunately, recv still
@@ -1372,13 +1381,13 @@
 		conn->flags |= CO_FL_ERROR;
 
 	task_wakeup(t, TASK_WOKEN_IO);
- out_unlock:
-	HA_SPIN_UNLOCK(SERVER_LOCK, &check->server->lock);
+out:
 	return;
 
  wait_more_data:
 	__cs_want_recv(cs);
-        goto out_unlock;
+	cs->conn->mux->subscribe(cs, SUB_CAN_RECV, &cs->wait_list);
+        goto out;
 }
 
 /*
@@ -1443,7 +1452,6 @@
 }
 
 struct data_cb check_conn_cb = {
-	.recv = event_srv_chk_r,
 	.wake = wake_srv_chk,
 	.name = "CHCK",
 };
@@ -2172,8 +2180,10 @@
 				t->expire = tick_first(t->expire, t_con);
 			}
 
-			if (check->type)
+			if (check->type) {
 				cs_want_recv(cs);   /* prepare for reading a possible reply */
+				__event_srv_chk_r(cs);
+			}
 
 			task_set_affinity(t, tid_bit);
 			goto reschedule;
@@ -2928,8 +2938,10 @@
 						goto out_end_tcpcheck;
 					}
 				}
-				else
+				else {
+					conn->mux->subscribe(cs, SUB_CAN_RECV, &cs->wait_list);
 					break;
+				}
 			}
 
 			/* mark the step as started */
@@ -3091,8 +3103,10 @@
 		__cs_want_send(cs);
 
 	if (&check->current_step->list != head &&
-	    check->current_step->action == TCPCHK_ACT_EXPECT)
+	    check->current_step->action == TCPCHK_ACT_EXPECT) {
 		__cs_want_recv(cs);
+		__event_srv_chk_r(cs);
+	}
 	goto out;
 
  out_end_tcpcheck:
diff --git a/src/connection.c b/src/connection.c
index 005e0e7..ad03863 100644
--- a/src/connection.c
+++ b/src/connection.c
@@ -64,7 +64,7 @@
 {
 	struct connection *conn = fdtab[fd].owner;
 	unsigned int flags;
-	int can_send = 0;
+	int io_available = 0;
 
 	if (unlikely(!conn)) {
 		activity[tid].conn_dead++;
@@ -128,7 +128,8 @@
 		 * both of which will be detected below.
 		 */
 		flags = 0;
-		can_send = LIST_ISEMPTY(&conn->send_wait_list);
+		io_available = (LIST_ISEMPTY(&conn->send_wait_list) &&
+		    LIST_ISEMPTY(&conn->sendrecv_wait_list));;
 		while (!LIST_ISEMPTY(&conn->send_wait_list)) {
 			struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n,
 			    struct wait_list *, list);
@@ -138,7 +139,7 @@
 			tasklet_wakeup(sw->task);
 		}
 		while (!(LIST_ISEMPTY(&conn->sendrecv_wait_list))) {
-			struct wait_list *sw = LIST_ELEM(conn->send_wait_list.n,
+			struct wait_list *sw = LIST_ELEM(conn->sendrecv_wait_list.n,
 			    struct wait_list *, list);
 			LIST_DEL(&sw->list);
 			LIST_INIT(&sw->list);
@@ -159,7 +160,26 @@
 		 * both of which will be detected below.
 		 */
 		flags = 0;
-		conn->mux->recv(conn);
+		io_available |= (LIST_ISEMPTY(&conn->recv_wait_list) &&
+		    LIST_ISEMPTY(&conn->sendrecv_wait_list));
+		while (!LIST_ISEMPTY(&conn->recv_wait_list)) {
+			struct wait_list *sw = LIST_ELEM(conn->recv_wait_list.n,
+			    struct wait_list *, list);
+			LIST_DEL(&sw->list);
+			LIST_INIT(&sw->list);
+			sw->wait_reason &= ~SUB_CAN_RECV;
+			tasklet_wakeup(sw->task);
+		}
+		while (!(LIST_ISEMPTY(&conn->sendrecv_wait_list))) {
+			struct wait_list *sw = LIST_ELEM(conn->sendrecv_wait_list.n,
+			    struct wait_list *, list);
+			LIST_DEL(&sw->list);
+			LIST_INIT(&sw->list);
+			LIST_ADDQ(&conn->send_wait_list, &sw->list);
+			sw->wait_reason &= ~SUB_CAN_RECV;
+			tasklet_wakeup(sw->task);
+		}
+
 	}
 
 	/* It may happen during the data phase that a handshake is
@@ -206,7 +226,7 @@
 	 * Note that the wake callback is allowed to release the connection and
 	 * the fd (and return < 0 in this case).
 	 */
-	if ((can_send || (((conn->flags ^ flags) & CO_FL_NOTIFY_DATA) ||
+	if ((io_available || (((conn->flags ^ flags) & CO_FL_NOTIFY_DATA) ||
 	     ((flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) != CO_FL_CONNECTED &&
 	      (conn->flags & (CO_FL_CONNECTED|CO_FL_HANDSHAKE)) == CO_FL_CONNECTED))) &&
 	    conn->mux->wake(conn) < 0)
diff --git a/src/mux_h2.c b/src/mux_h2.c
index 3c873e9..8922f5c 100644
--- a/src/mux_h2.c
+++ b/src/mux_h2.c
@@ -121,8 +121,6 @@
 	struct list fctl_list; /* list of streams blocked by connection's fctl */
 	struct buffer_wait buf_wait; /* wait list for buffer allocations */
 	struct list send_wait_list;  /* list of tasks to wake when we're ready to send */
-	struct list recv_wait_list;          /* list of tasks to wake when we're ready to recv */
-	struct list sendrecv_wait_list;      /* list of tasks to wake when we're ready to either send or recv */
 	struct wait_list wait_list;  /* We're in a wait list, to send */
 };
 
@@ -186,6 +184,7 @@
 	enum h2_err errcode; /* H2 err code (H2_ERR_*) */
 	enum h2_ss st;
 	struct buffer rxbuf; /* receive buffer, always valid (buf_empty or real buffer) */
+	struct wait_list *recv_wait_list; /* Somebody subscribed to be waken up on recv */
 };
 
 /* descriptor for an h2 frame header */
@@ -222,6 +221,7 @@
 
 static struct task *h2_timeout_task(struct task *t, void *context, unsigned short state);
 static void h2_send(struct h2c *h2c);
+static void h2_recv(struct h2c *h2c);
 static struct task *h2_io_cb(struct task *t, void *ctx, unsigned short state);
 static inline struct h2s *h2c_st_by_id(struct h2c *h2c, int id);
 static int h2_frt_decode_headers(struct h2s *h2s);
@@ -280,8 +280,10 @@
 
 	if ((h2c->flags & H2_CF_DEM_DALLOC) && b_alloc_margin(&h2c->dbuf, 0)) {
 		h2c->flags &= ~H2_CF_DEM_DALLOC;
-		if (h2_recv_allowed(h2c))
+		if (h2_recv_allowed(h2c)) {
 			conn_xprt_want_recv(h2c->conn);
+			h2_recv(h2c);
+		}
 		return 1;
 	}
 
@@ -292,8 +294,10 @@
 
 		if (h2c->flags & H2_CF_DEM_MROOM) {
 			h2c->flags &= ~H2_CF_DEM_MROOM;
-			if (h2_recv_allowed(h2c))
+			if (h2_recv_allowed(h2c)) {
 				conn_xprt_want_recv(h2c->conn);
+				h2_recv(h2c);
+			}
 		}
 		return 1;
 	}
@@ -302,8 +306,10 @@
 	    (h2s = h2c_st_by_id(h2c, h2c->dsi)) && h2s->cs &&
 	    b_alloc_margin(&h2s->rxbuf, 0)) {
 		h2c->flags &= ~H2_CF_DEM_SALLOC;
-		if (h2_recv_allowed(h2c))
+		if (h2_recv_allowed(h2c)) {
 			conn_xprt_want_recv(h2c->conn);
+			h2_recv(h2c);
+		}
 		return 1;
 	}
 
@@ -408,11 +414,10 @@
 		task_queue(t);
 	conn_xprt_want_recv(conn);
 	LIST_INIT(&h2c->send_wait_list);
-	LIST_INIT(&h2c->recv_wait_list);
-	LIST_INIT(&h2c->sendrecv_wait_list);
 	LIST_INIT(&h2c->wait_list.list);
 
-	/* mux->wake will be called soon to complete the operation */
+	/* Try to read, if nothing is available yet we'll just subscribe */
+	h2_recv(h2c);
 	return 0;
  fail:
 	if (t)
@@ -2227,18 +2232,17 @@
 	return (h2c->mws <= 0 || LIST_ISEMPTY(&h2c->fctl_list)) && LIST_ISEMPTY(&h2c->send_list);
 }
 
-
-/*********************************************************/
-/* functions below are I/O callbacks from the connection */
-/*********************************************************/
 
-/* callback called on recv event by the connection handler */
-static void h2_recv(struct connection *conn)
+/* Attempt to read data, and subscribe if none available */
+static void h2_recv(struct h2c *h2c)
 {
-	struct h2c *h2c = conn->mux_ctx;
+	struct connection *conn = h2c->conn;
 	struct buffer *buf;
 	int max;
 
+	if (h2c->wait_list.wait_reason & SUB_CAN_RECV)
+		return;
+
 	if (!h2_recv_allowed(h2c))
 		return;
 
@@ -2253,6 +2257,7 @@
 		conn->xprt->rcv_buf(conn, buf, max, 0);
 
 	if (!b_data(buf)) {
+		conn->xprt->subscribe(conn, SUB_CAN_RECV, &h2c->wait_list);
 		h2_release_buf(h2c, &h2c->dbuf);
 		return;
 	}
@@ -2334,20 +2339,9 @@
 			    struct wait_list *, list);
 			LIST_DEL(&sw->list);
 			LIST_INIT(&sw->list);
-			sw->wait_reason &= ~SUB_CAN_SEND;
-			tasklet_wakeup(sw->task);
-		}
-		while (!(LIST_ISEMPTY(&h2c->sendrecv_wait_list))) {
-			struct wait_list *sw = LIST_ELEM(h2c->send_wait_list.n,
-			    struct wait_list *, list);
-			LIST_DEL(&sw->list);
-			LIST_INIT(&sw->list);
-			LIST_ADDQ(&h2c->recv_wait_list, &sw->list);
 			sw->wait_reason &= ~SUB_CAN_SEND;
 			tasklet_wakeup(sw->task);
 		}
-
-
 	}
 	/* We're done, no more to send */
 	if (!b_data(&h2c->mbuf))
@@ -2364,6 +2358,8 @@
 
 	if (!(h2c->wait_list.wait_reason & SUB_CAN_SEND))
 		h2_send(h2c);
+	if (!(h2c->wait_list.wait_reason & SUB_CAN_RECV))
+		h2_recv(h2c);
 	return NULL;
 }
 
@@ -2377,6 +2373,9 @@
 	struct session *sess = conn->owner;
 
 	h2_send(h2c);
+	if (h2_recv_allowed(h2c))
+		h2_recv(h2c);
+
 	if (b_data(&h2c->dbuf) && !(h2c->flags & H2_CF_DEM_BLOCK_ANY)) {
 		h2_process_demux(h2c);
 
@@ -2436,11 +2435,11 @@
 		h2_release_buf(h2c, &h2c->dbuf);
 
 	/* stop being notified of incoming data if we can't process them */
-	if (!h2_recv_allowed(h2c)) {
+	if (!h2_recv_allowed(h2c))
 		__conn_xprt_stop_recv(conn);
-	}
 	else {
 		__conn_xprt_want_recv(conn);
+		h2_recv(h2c);
 	}
 
 	/* adjust output polling */
@@ -2554,6 +2553,7 @@
 		h2s->h2c->flags &= ~H2_CF_DEM_SFULL;
 		if (h2s->h2c->dsi == h2s->id) {
 			conn_xprt_want_recv(cs->conn);
+			h2_recv(h2s->h2c);
 			conn_xprt_want_send(cs->conn);
 		}
 	}
@@ -2605,6 +2605,7 @@
 		h2c->flags &= ~H2_CF_DEM_TOOMANY;
 		if (h2_recv_allowed(h2c)) {
 			__conn_xprt_want_recv(h2c->conn);
+			h2_recv(h2c);
 			conn_xprt_want_send(h2c->conn);
 		}
 	}
@@ -2625,6 +2626,7 @@
 		h2c->flags &= ~H2_CF_DEM_BLOCK_ANY;
 		h2c->flags &= ~H2_CF_MUX_BLOCK_ANY;
 		conn_xprt_want_recv(cs->conn);
+		h2_recv(h2c);
 		conn_xprt_want_send(cs->conn);
 	}
 
@@ -3477,30 +3479,14 @@
 		sw = param;
 		if (!(sw->wait_reason & SUB_CAN_RECV)) {
 			sw->wait_reason |= SUB_CAN_RECV;
-			/* If we're already subscribed for send(), move it
-			 * to the send+recv list
-			 */
-			if (sw->wait_reason & SUB_CAN_SEND) {
-				LIST_DEL(&sw->list);
-				LIST_INIT(&sw->list);
-				LIST_ADDQ(&h2c->sendrecv_wait_list, &sw->list);
-			} else
-				LIST_ADDQ(&h2c->recv_wait_list, &sw->list);
+			h2s->recv_wait_list = sw;
 		}
 		return 0;
 	case SUB_CAN_SEND:
 		sw = param;
 		if (!(sw->wait_reason & SUB_CAN_SEND)) {
 			sw->wait_reason |= SUB_CAN_SEND;
-			/* If we're already subscribed for recv(), move it
-			 * to the send+recv list
-			 */
-			if (sw->wait_reason & SUB_CAN_RECV) {
-				LIST_DEL(&sw->list);
-				LIST_INIT(&sw->list);
-				LIST_ADDQ(&h2c->sendrecv_wait_list, &sw->list);
-			} else
-				LIST_ADDQ(&h2c->send_wait_list, &sw->list);
+			LIST_ADDQ(&h2c->send_wait_list, &sw->list);
 		}
 		return 0;
 	default:
@@ -3710,7 +3696,6 @@
 /* The mux operations */
 const struct mux_ops h2_ops = {
 	.init = h2_init,
-	.recv = h2_recv,
 	.wake = h2_wake,
 	.update_poll = h2_update_poll,
 	.snd_buf = h2_snd_buf,
diff --git a/src/mux_pt.c b/src/mux_pt.c
index 71f26e7..466ac21 100644
--- a/src/mux_pt.c
+++ b/src/mux_pt.c
@@ -85,21 +85,6 @@
 	conn_cond_update_xprt_polling(conn);
 }
 
-/* callback to be used by default for the pass-through mux. It simply calls the
- * data layer recv() callback much must be set.
- */
-static void mux_pt_recv(struct connection *conn)
-{
-	struct conn_stream *cs = conn->mux_ctx;
-
-	if (conn->flags & CO_FL_ERROR)
-		cs->flags |= CS_FL_ERROR;
-	if (conn_xprt_read0_pending(conn))
-		cs->flags |= CS_FL_EOS;
-	cs->data_cb->recv(cs);
-	cs_update_mux_polling(cs);
-}
-
 /*
  * Attach a new stream to a connection
  * (Used for outgoing connections)
@@ -200,7 +185,6 @@
 /* The mux operations */
 const struct mux_ops mux_pt_ops = {
 	.init = mux_pt_init,
-	.recv = mux_pt_recv,
 	.wake = mux_pt_wake,
 	.update_poll = mux_pt_update_poll,
 	.rcv_buf = mux_pt_rcv_buf,
diff --git a/src/stream_interface.c b/src/stream_interface.c
index cfa613a..46e57d4 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -51,10 +51,9 @@
 static void stream_int_shutw_applet(struct stream_interface *si);
 static void stream_int_chk_rcv_applet(struct stream_interface *si);
 static void stream_int_chk_snd_applet(struct stream_interface *si);
-static void si_cs_recv_cb(struct conn_stream *cs);
+static void si_cs_recv(struct conn_stream *cs);
 static int si_cs_wake_cb(struct conn_stream *cs);
 static int si_idle_conn_wake_cb(struct conn_stream *cs);
-static void si_idle_conn_null_cb(struct conn_stream *cs);
 static struct task * si_cs_send(struct conn_stream *cs);
 
 /* stream-interface operations for embedded tasks */
@@ -84,13 +83,11 @@
 };
 
 struct data_cb si_conn_cb = {
-	.recv    = si_cs_recv_cb,
 	.wake    = si_cs_wake_cb,
 	.name    = "STRM",
 };
 
 struct data_cb si_idle_conn_cb = {
-	.recv    = si_idle_conn_null_cb,
 	.wake    = si_idle_conn_wake_cb,
 	.name    = "IDLE",
 };
@@ -417,15 +414,6 @@
 }
 
 
-/* Tiny I/O callback called on recv/send I/O events on idle connections.
- * It simply sets the CO_FL_SOCK_RD_SH flag so that si_idle_conn_wake_cb()
- * is notified and can kill the connection.
- */
-static void si_idle_conn_null_cb(struct conn_stream *cs)
-{
-	conn_sock_drain(cs->conn);
-}
-
 /* Callback to be used by connection I/O handlers when some activity is detected
  * on an idle server connection. Its main purpose is to kill the connection once
  * a close was detected on it. It returns 0 if it did nothing serious, or -1 if
@@ -439,6 +427,8 @@
 	if (!conn_ctrl_ready(conn))
 		return 0;
 
+	conn_sock_drain(conn);
+
 	if (conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH) || cs->flags & CS_FL_ERROR) {
 		/* warning, we can't do anything on <conn> after this call ! */
 		si_release_endpoint(si);
@@ -582,8 +572,8 @@
 	 * for recv() (received only an empty response).
 	 */
 	if (!(cs->flags & CS_FL_EOS) &&
-	    (cs->flags & (CS_FL_DATA_RD_ENA|CS_FL_REOS|CS_FL_RCV_MORE)) > CS_FL_DATA_RD_ENA)
-		si_cs_recv_cb(cs);
+	    (cs->flags & (CS_FL_DATA_RD_ENA)))
+		si_cs_recv(cs);
 
 	/* If we have data to send, try it now */
 	if (!channel_is_empty(oc) && objt_cs(si->end))
@@ -753,7 +743,7 @@
 			tasklet_wakeup(sw->task);
 		}
 		while (!(LIST_ISEMPTY(&cs->sendrecv_wait_list))) {
-			struct wait_list *sw = LIST_ELEM(cs->send_wait_list.n,
+			struct wait_list *sw = LIST_ELEM(cs->sendrecv_wait_list.n,
 			    struct wait_list *, list);
 			LIST_DEL(&sw->list);
 			LIST_INIT(&sw->list);
@@ -1148,7 +1138,7 @@
  * into the buffer from the connection. It iterates over the mux layer's
  * rcv_buf function.
  */
-static void si_cs_recv_cb(struct conn_stream *cs)
+static void si_cs_recv(struct conn_stream *cs)
 {
 	struct connection *conn = cs->conn;
 	struct stream_interface *si = cs->data;
@@ -1364,6 +1354,26 @@
 		}
 		ic->last_read = now_ms;
 	}
+	if (cur_read > 0) {
+		while (!LIST_ISEMPTY(&cs->recv_wait_list)) {
+			struct wait_list *sw = LIST_ELEM(cs->recv_wait_list.n,
+			    struct wait_list *, list);
+			LIST_DEL(&sw->list);
+			LIST_INIT(&sw->list);
+			sw->wait_reason &= ~SUB_CAN_RECV;
+			tasklet_wakeup(sw->task);
+		}
+		while (!(LIST_ISEMPTY(&cs->sendrecv_wait_list))) {
+			struct wait_list *sw = LIST_ELEM(cs->sendrecv_wait_list.n,
+			    struct wait_list *, list);
+			LIST_DEL(&sw->list);
+			LIST_INIT(&sw->list);
+			LIST_ADDQ(&cs->send_wait_list, &sw->list);
+			sw->wait_reason &= ~SUB_CAN_RECV;
+			tasklet_wakeup(sw->task);
+		}
+
+	}
 
  end_recv:
 	if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)