MAJOR: stream-interface: restore splicing mechanism

The splicing is now provided by the data-layer rcv_pipe/snd_pipe functions
which in turn are called by the stream interface's recv and send callbacks.

The presence of the rcv_pipe/snd_pipe functions is used to attest support
for splicing at the data layer. It looks like the stream-interface's
SI_FL_CAP_SPLICE flag does not make sense anymore as it's used as a proxy
for the pointers above.

It also appears that we call chk_snd() from the recv callback and then
try to call it again in update_conn(). It is very likely that this last
function will progressively slip into the recv/send callbacks in order
to avoid duplicate check code.

The code works right now with and without splicing. Only raw_sock provides
support for it and it is automatically selected when the various splice
options are set. However it looks like splice-auto doesn't enable it, which
possibly means that the streamer detection code does not work anymore, or
that it's only called at a time where it's too late to enable splicing (in
process_session).
diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h
index ee552c2..921363f 100644
--- a/include/types/stream_interface.h
+++ b/include/types/stream_interface.h
@@ -96,6 +96,7 @@
 struct proxy;
 struct si_applet;
 struct stream_interface;
+struct pipe;
 
 struct target {
 	int type;
@@ -120,6 +121,8 @@
 	void (*close)(struct connection *);         /* close the data channel on the connection */
 	int  (*rcv_buf)(struct connection *conn, struct buffer *buf, int count); /* recv callback */
 	int  (*snd_buf)(struct connection *conn, struct buffer *buf, int flags); /* send callback */
+	int  (*rcv_pipe)(struct connection *conn, struct pipe *pipe, unsigned int count); /* recv-to-pipe callback */
+	int  (*snd_pipe)(struct connection *conn, struct pipe *pipe); /* send-to-pipe callback */
 };
 
 /* A stream interface has 3 parts :
diff --git a/src/proto_tcp.c b/src/proto_tcp.c
index 86fd6bd..9ff5b66 100644
--- a/src/proto_tcp.c
+++ b/src/proto_tcp.c
@@ -479,7 +479,8 @@
 		conn_data_want_send(&si->conn);  /* prepare to send data if any */
 
 	si->state = SI_ST_CON;
-	si->flags |= SI_FL_CAP_SPLTCP; /* TCP supports splicing */
+	if (si->conn.data->rcv_pipe && si->conn.data->snd_pipe)
+		si->flags |= SI_FL_CAP_SPLTCP; /* TCP supports splicing */
 	si->exp = tick_add_ifset(now_ms, be->timeout.connect);
 
 	return SN_ERR_NONE;  /* connection is OK */
diff --git a/src/raw_sock.c b/src/raw_sock.c
index 60c7ed0..035d338 100644
--- a/src/raw_sock.c
+++ b/src/raw_sock.c
@@ -43,7 +43,7 @@
 #include <types/global.h>
 
 
-#if 0 && defined(CONFIG_HAP_LINUX_SPLICE)
+#if defined(CONFIG_HAP_LINUX_SPLICE)
 #include <common/splice.h>
 
 /* A pipe contains 16 segments max, and it's common to see segments of 1448 bytes
@@ -56,74 +56,29 @@
 #define MAX_SPLICE_AT_ONCE	(1<<30)
 
 /* Returns :
- *   -1 if splice is not possible or not possible anymore and we must switch to
- *      user-land copy (eg: to_forward reached)
- *    0 otherwise, including errors and close.
- * Sets :
- *   BF_READ_NULL
- *   BF_READ_PARTIAL
- *   BF_WRITE_PARTIAL (during copy)
- *   BF_OUT_EMPTY (during copy)
- *   SI_FL_ERR
- *   SI_FL_WAIT_ROOM
- *   (SI_FL_WAIT_RECV)
- *
- * This function automatically allocates a pipe from the pipe pool. It also
- * carefully ensures to clear b->pipe whenever it leaves the pipe empty.
+ *   -1 if splice() is not supported
+ *   >= 0 to report the amount of spliced bytes.
+ *   connection flags are updated (error, read0, wait_room, wait_data).
+ *   The caller must have previously allocated the pipe.
  */
-static int sock_raw_splice_in(struct channel *b, struct stream_interface *si)
+int raw_sock_to_pipe(struct connection *conn, struct pipe *pipe, unsigned int count)
 {
 	static int splice_detects_close;
-	int fd = si_fd(si);
 	int ret;
-	unsigned long max;
 	int retval = 0;
 
-	if (!b->to_forward)
-		return -1;
-
-	if (!(b->flags & BF_KERN_SPLICING))
-		return -1;
-
-	if (buffer_not_empty(&b->buf)) {
-		/* We're embarrassed, there are already data pending in
-		 * the buffer and we don't want to have them at two
-		 * locations at a time. Let's indicate we need some
-		 * place and ask the consumer to hurry.
-		 */
-		si->flags |= SI_FL_WAIT_ROOM;
-		conn_data_stop_recv(&si->conn);
-		b->rex = TICK_ETERNITY;
-		si_chk_snd(b->cons);
-		return 0;
-	}
-
-	if (unlikely(b->pipe == NULL)) {
-		if (pipes_used >= global.maxpipes || !(b->pipe = get_pipe())) {
-			b->flags &= ~BF_KERN_SPLICING;
-			return -1;
-		}
-	}
-
-	/* At this point, b->pipe is valid */
-
-	while (1) {
-		if (b->to_forward == BUF_INFINITE_FORWARD)
-			max = MAX_SPLICE_AT_ONCE;
-		else
-			max = b->to_forward;
+	/* Under Linux, if FD_POLL_HUP is set, we have reached the end.
+	 * Since older splice() implementations were buggy and returned
+	 * EAGAIN on end of read, let's bypass the call to splice() now.
+	 */
+	if ((fdtab[conn->t.sock.fd].ev & (FD_POLL_IN|FD_POLL_HUP)) == FD_POLL_HUP)
+		goto out_read0;
 
-		if (!max) {
-			/* It looks like the buffer + the pipe already contain
-			 * the maximum amount of data to be transferred. Try to
-			 * send those data immediately on the other side if it
-			 * is currently waiting.
-			 */
-			retval = -1; /* end of forwarding */
-			break;
-		}
+	while (count) {
+		if (count > MAX_SPLICE_AT_ONCE)
+			count = MAX_SPLICE_AT_ONCE;
 
-		ret = splice(fd, NULL, b->pipe->prod, NULL, max,
+		ret = splice(conn->t.sock.fd, NULL, pipe->prod, NULL, count,
 			     SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
 
 		if (ret <= 0) {
@@ -133,8 +88,7 @@
 				 * it works, we store the info for later use.
 				 */
 				splice_detects_close = 1;
-				b->flags |= BF_READ_NULL;
-				break;
+				goto out_read0;
 			}
 
 			if (errno == EAGAIN) {
@@ -142,13 +96,16 @@
 				 *   - nothing in the socket buffer (standard)
 				 *   - pipe is full
 				 *   - the connection is closed (kernel < 2.6.27.13)
-				 * Since we don't know if pipe is full, we'll
-				 * stop if the pipe is not empty. Anyway, we
-				 * will almost always fill/empty the pipe.
+				 * The last case is annoying but know if we can detect it
+				 * and if we can't then we rely on the call to recv() to
+				 * get a valid verdict. The difference between the first
+				 * two situations is problematic. Since we don't know if
+				 * the pipe is full, we'll stop if the pipe is not empty.
+				 * Anyway, we will almost always fill/empty the pipe.
 				 */
-
-				if (b->pipe->data) {
-					si->flags |= SI_FL_WAIT_ROOM;
+				if (pipe->data) {
+					/* alway stop reading until the pipe is flushed */
+					conn->flags |= CO_FL_WAIT_ROOM;
 					break;
 				}
 
@@ -161,48 +118,73 @@
 				 * which will be able to deal with the situation.
 				 */
 				if (splice_detects_close)
-					conn_data_poll_recv(&si->conn); /* we know for sure that it's EAGAIN */
-				else
-					retval = -1;
+					conn->flags |= CO_FL_WAIT_DATA; /* we know for sure that it's EAGAIN */
 				break;
 			}
-
-			if (errno == ENOSYS || errno == EINVAL) {
-				/* splice not supported on this end, disable it */
-				b->flags &= ~BF_KERN_SPLICING;
-				si->flags &= ~SI_FL_CAP_SPLICE;
-				put_pipe(b->pipe);
-				b->pipe = NULL;
+			else if (errno == ENOSYS || errno == EINVAL) {
+				/* splice not supported on this end, disable it.
+				 * We can safely return -1 since there is no
+				 * chance that any data has been piped yet.
+				 */
 				return -1;
 			}
-
+			else if (errno == EINTR) {
+				/* try again */
+				continue;
+			}
 			/* here we have another error */
-			si->flags |= SI_FL_ERR;
+			conn->flags |= CO_FL_ERROR;
 			break;
 		} /* ret <= 0 */
 
-		if (b->to_forward != BUF_INFINITE_FORWARD)
-			b->to_forward -= ret;
-		b->total += ret;
-		b->pipe->data += ret;
-		b->flags |= BF_READ_PARTIAL;
-		b->flags &= ~BF_OUT_EMPTY;
+		retval += ret;
+		pipe->data += ret;
 
-		if (b->pipe->data >= SPLICE_FULL_HINT ||
-		    ret >= global.tune.recv_enough) {
-			/* We've read enough of it for this time. */
+		if (pipe->data >= SPLICE_FULL_HINT || ret >= global.tune.recv_enough) {
+			/* We've read enough of it for this time, let's stop before
+			 * being asked to poll.
+			 */
 			break;
 		}
 	} /* while */
 
-	if (unlikely(!b->pipe->data)) {
-		put_pipe(b->pipe);
-		b->pipe = NULL;
-	}
+	return retval;
 
+ out_read0:
+	conn_sock_read0(conn);
 	return retval;
 }
 
+/* Send as many bytes as possible from the pipe to the connection's socket.
+ */
+int raw_sock_from_pipe(struct connection *conn, struct pipe *pipe)
+{
+	int ret, done;
+
+	done = 0;
+	while (pipe->data) {
+		ret = splice(pipe->cons, NULL, conn->t.sock.fd, NULL, pipe->data,
+			     SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
+
+		if (ret <= 0) {
+			if (ret == 0 || errno == EAGAIN) {
+				conn->flags |= CO_FL_WAIT_ROOM;
+				break;
+			}
+			else if (errno == EINTR)
+				continue;
+
+			/* here we have another error */
+			conn->flags |= CO_FL_ERROR;
+			break;
+		}
+
+		done += ret;
+		pipe->data -= ret;
+	}
+	return done;
+}
+
 #endif /* CONFIG_HAP_LINUX_SPLICE */
 
 
@@ -347,6 +329,10 @@
 	.write   = si_conn_send_cb,
 	.snd_buf = raw_sock_from_buf,
 	.rcv_buf = raw_sock_to_buf,
+#if defined(CONFIG_HAP_LINUX_SPLICE)
+	.rcv_pipe = raw_sock_to_pipe,
+	.snd_pipe = raw_sock_from_pipe,
+#endif
 	.close   = NULL,
 };
 
diff --git a/src/session.c b/src/session.c
index fa3c4dc..80415b3 100644
--- a/src/session.c
+++ b/src/session.c
@@ -179,12 +179,13 @@
 	if (likely(s->fe->options2 & PR_O2_INDEPSTR))
 		s->si[0].flags |= SI_FL_INDEP_STR;
 
-	if (addr->ss_family == AF_INET || addr->ss_family == AF_INET6)
-		s->si[0].flags = SI_FL_CAP_SPLTCP; /* TCP/TCPv6 splicing possible */
-
 	/* add the various callbacks */
 	stream_interface_prepare(&s->si[0], l->sock);
 
+	if ((s->si[0].conn.data->rcv_pipe && s->si[0].conn.data->snd_pipe) &&
+	    (addr->ss_family == AF_INET || addr->ss_family == AF_INET6))
+		s->si[0].flags = SI_FL_CAP_SPLTCP; /* TCP/TCPv6 splicing possible */
+
 	/* pre-initialize the other side's stream interface to an INIT state. The
 	 * callbacks will be initialized before attempting to connect.
 	 */
diff --git a/src/stream_interface.c b/src/stream_interface.c
index eef6dec..2f24c18 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -30,6 +30,7 @@
 #include <proto/connection.h>
 #include <proto/fd.h>
 #include <proto/frontend.h>
+#include <proto/pipe.h>
 #include <proto/stream_interface.h>
 #include <proto/task.h>
 
@@ -666,42 +667,30 @@
 	int write_poll = MAX_WRITE_POLL_LOOPS;
 	int ret;
 
-#if 0 && defined(CONFIG_HAP_LINUX_SPLICE)
-	while (b->pipe) {
-		ret = splice(b->pipe->cons, NULL, si_fd(si), NULL, b->pipe->data,
-			     SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
-		if (ret <= 0) {
-			if (ret == 0 || errno == EAGAIN) {
-				conn_data_poll_send(&si->conn);
-				return 0;
-			}
-			/* here we have another error */
-			return -1;
-		}
+	conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
 
-		b->flags |= BF_WRITE_PARTIAL;
-		b->pipe->data -= ret;
+	if (b->pipe && conn->data->snd_pipe) {
+		ret = conn->data->snd_pipe(conn, b->pipe);
+		if (ret > 0)
+			b->flags |= BF_WRITE_PARTIAL;
 
 		if (!b->pipe->data) {
 			put_pipe(b->pipe);
 			b->pipe = NULL;
-			break;
 		}
 
-		if (--write_poll <= 0)
-			return 0;
+		if (conn->flags & CO_FL_ERROR)
+			return -1;
 
-		/* The only reason we did not empty the pipe is that the output
-		 * buffer is full.
-		 */
-		conn_data_poll_send(&si->conn);
-		return 0;
+		if (conn->flags & CO_FL_WAIT_ROOM) {
+			conn_data_poll_send(conn);
+			return 0;
+		}
 	}
 
 	/* At this point, the pipe is empty, but we may still have data pending
 	 * in the normal buffer.
 	 */
-#endif
 	if (!b->buf.o) {
 		b->flags |= BF_OUT_EMPTY;
 		return 0;
@@ -710,7 +699,6 @@
 	/* when we're in this loop, we already know that there is no spliced
 	 * data left, and that there are sendable buffered data.
 	 */
-	conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
 	while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_WR_SH | CO_FL_DATA_WR_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
 		/* check if we want to inform the kernel that we're interested in
 		 * sending more data after this call. We want this if :
@@ -1004,34 +992,69 @@
 	if (b->flags & BF_SHUTR)
 		return;
 
+	cur_read = 0;
+	conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
+
-#if 0 && defined(CONFIG_HAP_LINUX_SPLICE)
-	if (b->to_forward >= MIN_SPLICE_FORWARD && b->flags & BF_KERN_SPLICING) {
+	/* First, let's see if we may splice data across the channel without
+	 * using a buffer.
+	 */
+	if (conn->data->rcv_pipe &&
+	    b->to_forward >= MIN_SPLICE_FORWARD && b->flags & BF_KERN_SPLICING) {
+		if (buffer_not_empty(&b->buf)) {
+			/* We're embarrassed, there are already data pending in
+			 * the buffer and we don't want to have them at two
+			 * locations at a time. Let's indicate we need some
+			 * place and ask the consumer to hurry.
+			 */
+			goto abort_splice;
+		}
 
-		/* Under Linux, if FD_POLL_HUP is set, we have reached the end.
-		 * Since older splice() implementations were buggy and returned
-		 * EAGAIN on end of read, let's bypass the call to splice() now.
-		 */
-		if (fdtab[conn->t.sock.fd].ev & FD_POLL_HUP)
-			goto out_shutdown_r;
+		if (unlikely(b->pipe == NULL)) {
+			if (pipes_used >= global.maxpipes || !(b->pipe = get_pipe())) {
+				b->flags &= ~BF_KERN_SPLICING;
+				goto abort_splice;
+			}
+		}
+
+		ret = conn->data->rcv_pipe(conn, b->pipe, b->to_forward);
+		if (ret < 0) {
+			/* splice not supported on this end, let's disable it */
+			b->flags &= ~BF_KERN_SPLICING;
+			si->flags &= ~SI_FL_CAP_SPLICE;
+			goto abort_splice;
+		}
 
-		if (sock_raw_splice_in(b, si) >= 0) {
-			if (si->flags & SI_FL_ERR)
-				goto out_error;
-			if (b->flags & BF_READ_NULL)
-				goto out_shutdown_r;
-			return;
+		if (ret > 0) {
+			if (b->to_forward != BUF_INFINITE_FORWARD)
+				b->to_forward -= ret;
+			b->total += ret;
+			cur_read += ret;
+			b->flags |= BF_READ_PARTIAL;
+			b->flags &= ~BF_OUT_EMPTY;
 		}
+
+		if (conn_data_read0_pending(conn))
+			goto out_shutdown_r;
+
+		if (conn->flags & CO_FL_ERROR)
+			goto out_error;
+
 		/* splice not possible (anymore), let's go on on standard copy */
 	}
-#endif
-	cur_read = 0;
-	conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
-	while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
+
+ abort_splice:
+	/* release the pipe if we can, which is almost always the case */
+	if (b->pipe && !b->pipe->data) {
+		put_pipe(b->pipe);
+		b->pipe = NULL;
+	}
+
+	while (!b->pipe && !(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
 		max = bi_avail(b);
 
 		if (!max) {
 			b->flags |= BF_FULL;
-			si->flags |= SI_FL_WAIT_ROOM;
+			conn->flags |= CO_FL_WAIT_ROOM;
 			break;
 		}
 
@@ -1133,7 +1156,39 @@
 		}
 	} /* while !flags */
 
+	if (conn->flags & CO_FL_ERROR)
+		goto out_error;
+
-	if (conn->flags & CO_FL_WAIT_DATA) {
+	if (conn->flags & CO_FL_WAIT_ROOM) {
+		/* We might have some data the consumer is waiting for.
+		 * We can do fast-forwarding, but we avoid doing this for partial
+		 * buffers, because it is very likely that it will be done again
+		 * immediately afterwards once the following data is parsed (eg:
+		 * HTTP chunking).
+		 */
+		if (((b->flags & (BF_READ_PARTIAL|BF_OUT_EMPTY)) == BF_READ_PARTIAL) &&
+		    (b->pipe /* always try to send spliced data */ ||
+		     (b->buf.i == 0 && (b->cons->flags & SI_FL_WAIT_DATA)))) {
+			int last_len = b->pipe ? b->pipe->data : 0;
+
+			si_chk_snd(b->cons);
+
+			/* check if the consumer has freed some space */
+			if (!(b->flags & BF_FULL) &&
+			    (!last_len || !b->pipe || b->pipe->data < last_len))
+				si->flags &= ~SI_FL_WAIT_ROOM;
+		}
+
+		if (si->flags & SI_FL_WAIT_ROOM) {
+			conn_data_stop_recv(conn);
+			b->rex = TICK_ETERNITY;
+		}
+		else if ((b->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) {
+			if (tick_isset(b->rex))
+				b->rex = tick_add_ifset(now_ms, b->rto);
+		}
+	}
+	else if (conn->flags & CO_FL_WAIT_DATA) {
 		/* we don't automatically ask for polling if we have
 		 * read enough data, as it saves some syscalls with
 		 * speculative pollers.
@@ -1144,9 +1199,6 @@
 			__conn_data_want_recv(conn);
 	}
 
-	if (conn->flags & CO_FL_ERROR)
-		goto out_error;
-
 	if (conn_data_read0_pending(conn))
 		/* connection closed */
 		goto out_shutdown_r;