MAJOR: stream-interface: restore splicing mechanism
The splicing is now provided by the data-layer rcv_pipe/snd_pipe functions
which in turn are called by the stream interface's recv and send callbacks.
The presence of the rcv_pipe/snd_pipe functions is used to attest support
for splicing at the data layer. It looks like the stream-interface's
SI_FL_CAP_SPLICE flag does not make sense anymore as it's used as a proxy
for the pointers above.
It also appears that we call chk_snd() from the recv callback and then
try to call it again in update_conn(). It is very likely that this last
function will progressively slip into the recv/send callbacks in order
to avoid duplicate check code.
The code works right now with and without splicing. Only raw_sock provides
support for it and it is automatically selected when the various splice
options are set. However it looks like splice-auto doesn't enable it, which
possibly means that the streamer detection code does not work anymore, or
that it's only called at a time where it's too late to enable splicing (in
process_session).
diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h
index ee552c2..921363f 100644
--- a/include/types/stream_interface.h
+++ b/include/types/stream_interface.h
@@ -96,6 +96,7 @@
struct proxy;
struct si_applet;
struct stream_interface;
+struct pipe;
struct target {
int type;
@@ -120,6 +121,8 @@
void (*close)(struct connection *); /* close the data channel on the connection */
int (*rcv_buf)(struct connection *conn, struct buffer *buf, int count); /* recv callback */
int (*snd_buf)(struct connection *conn, struct buffer *buf, int flags); /* send callback */
+ int (*rcv_pipe)(struct connection *conn, struct pipe *pipe, unsigned int count); /* recv-to-pipe callback */
+ int (*snd_pipe)(struct connection *conn, struct pipe *pipe); /* send-to-pipe callback */
};
/* A stream interface has 3 parts :
diff --git a/src/proto_tcp.c b/src/proto_tcp.c
index 86fd6bd..9ff5b66 100644
--- a/src/proto_tcp.c
+++ b/src/proto_tcp.c
@@ -479,7 +479,8 @@
conn_data_want_send(&si->conn); /* prepare to send data if any */
si->state = SI_ST_CON;
- si->flags |= SI_FL_CAP_SPLTCP; /* TCP supports splicing */
+ if (si->conn.data->rcv_pipe && si->conn.data->snd_pipe)
+ si->flags |= SI_FL_CAP_SPLTCP; /* TCP supports splicing */
si->exp = tick_add_ifset(now_ms, be->timeout.connect);
return SN_ERR_NONE; /* connection is OK */
diff --git a/src/raw_sock.c b/src/raw_sock.c
index 60c7ed0..035d338 100644
--- a/src/raw_sock.c
+++ b/src/raw_sock.c
@@ -43,7 +43,7 @@
#include <types/global.h>
-#if 0 && defined(CONFIG_HAP_LINUX_SPLICE)
+#if defined(CONFIG_HAP_LINUX_SPLICE)
#include <common/splice.h>
/* A pipe contains 16 segments max, and it's common to see segments of 1448 bytes
@@ -56,74 +56,29 @@
#define MAX_SPLICE_AT_ONCE (1<<30)
/* Returns :
- * -1 if splice is not possible or not possible anymore and we must switch to
- * user-land copy (eg: to_forward reached)
- * 0 otherwise, including errors and close.
- * Sets :
- * BF_READ_NULL
- * BF_READ_PARTIAL
- * BF_WRITE_PARTIAL (during copy)
- * BF_OUT_EMPTY (during copy)
- * SI_FL_ERR
- * SI_FL_WAIT_ROOM
- * (SI_FL_WAIT_RECV)
- *
- * This function automatically allocates a pipe from the pipe pool. It also
- * carefully ensures to clear b->pipe whenever it leaves the pipe empty.
+ * -1 if splice() is not supported
+ * >= 0 to report the amount of spliced bytes.
+ * connection flags are updated (error, read0, wait_room, wait_data).
+ * The caller must have previously allocated the pipe.
*/
-static int sock_raw_splice_in(struct channel *b, struct stream_interface *si)
+int raw_sock_to_pipe(struct connection *conn, struct pipe *pipe, unsigned int count)
{
static int splice_detects_close;
- int fd = si_fd(si);
int ret;
- unsigned long max;
int retval = 0;
- if (!b->to_forward)
- return -1;
-
- if (!(b->flags & BF_KERN_SPLICING))
- return -1;
-
- if (buffer_not_empty(&b->buf)) {
- /* We're embarrassed, there are already data pending in
- * the buffer and we don't want to have them at two
- * locations at a time. Let's indicate we need some
- * place and ask the consumer to hurry.
- */
- si->flags |= SI_FL_WAIT_ROOM;
- conn_data_stop_recv(&si->conn);
- b->rex = TICK_ETERNITY;
- si_chk_snd(b->cons);
- return 0;
- }
-
- if (unlikely(b->pipe == NULL)) {
- if (pipes_used >= global.maxpipes || !(b->pipe = get_pipe())) {
- b->flags &= ~BF_KERN_SPLICING;
- return -1;
- }
- }
-
- /* At this point, b->pipe is valid */
-
- while (1) {
- if (b->to_forward == BUF_INFINITE_FORWARD)
- max = MAX_SPLICE_AT_ONCE;
- else
- max = b->to_forward;
+ /* Under Linux, if FD_POLL_HUP is set, we have reached the end.
+ * Since older splice() implementations were buggy and returned
+ * EAGAIN on end of read, let's bypass the call to splice() now.
+ */
+ if ((fdtab[conn->t.sock.fd].ev & (FD_POLL_IN|FD_POLL_HUP)) == FD_POLL_HUP)
+ goto out_read0;
- if (!max) {
- /* It looks like the buffer + the pipe already contain
- * the maximum amount of data to be transferred. Try to
- * send those data immediately on the other side if it
- * is currently waiting.
- */
- retval = -1; /* end of forwarding */
- break;
- }
+ while (count) {
+ if (count > MAX_SPLICE_AT_ONCE)
+ count = MAX_SPLICE_AT_ONCE;
- ret = splice(fd, NULL, b->pipe->prod, NULL, max,
+ ret = splice(conn->t.sock.fd, NULL, pipe->prod, NULL, count,
SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
if (ret <= 0) {
@@ -133,8 +88,7 @@
* it works, we store the info for later use.
*/
splice_detects_close = 1;
- b->flags |= BF_READ_NULL;
- break;
+ goto out_read0;
}
if (errno == EAGAIN) {
@@ -142,13 +96,16 @@
* - nothing in the socket buffer (standard)
* - pipe is full
* - the connection is closed (kernel < 2.6.27.13)
- * Since we don't know if pipe is full, we'll
- * stop if the pipe is not empty. Anyway, we
- * will almost always fill/empty the pipe.
+ * The last case is annoying but know if we can detect it
+ * and if we can't then we rely on the call to recv() to
+ * get a valid verdict. The difference between the first
+ * two situations is problematic. Since we don't know if
+ * the pipe is full, we'll stop if the pipe is not empty.
+ * Anyway, we will almost always fill/empty the pipe.
*/
-
- if (b->pipe->data) {
- si->flags |= SI_FL_WAIT_ROOM;
+ if (pipe->data) {
+ /* alway stop reading until the pipe is flushed */
+ conn->flags |= CO_FL_WAIT_ROOM;
break;
}
@@ -161,48 +118,73 @@
* which will be able to deal with the situation.
*/
if (splice_detects_close)
- conn_data_poll_recv(&si->conn); /* we know for sure that it's EAGAIN */
- else
- retval = -1;
+ conn->flags |= CO_FL_WAIT_DATA; /* we know for sure that it's EAGAIN */
break;
}
-
- if (errno == ENOSYS || errno == EINVAL) {
- /* splice not supported on this end, disable it */
- b->flags &= ~BF_KERN_SPLICING;
- si->flags &= ~SI_FL_CAP_SPLICE;
- put_pipe(b->pipe);
- b->pipe = NULL;
+ else if (errno == ENOSYS || errno == EINVAL) {
+ /* splice not supported on this end, disable it.
+ * We can safely return -1 since there is no
+ * chance that any data has been piped yet.
+ */
return -1;
}
-
+ else if (errno == EINTR) {
+ /* try again */
+ continue;
+ }
/* here we have another error */
- si->flags |= SI_FL_ERR;
+ conn->flags |= CO_FL_ERROR;
break;
} /* ret <= 0 */
- if (b->to_forward != BUF_INFINITE_FORWARD)
- b->to_forward -= ret;
- b->total += ret;
- b->pipe->data += ret;
- b->flags |= BF_READ_PARTIAL;
- b->flags &= ~BF_OUT_EMPTY;
+ retval += ret;
+ pipe->data += ret;
- if (b->pipe->data >= SPLICE_FULL_HINT ||
- ret >= global.tune.recv_enough) {
- /* We've read enough of it for this time. */
+ if (pipe->data >= SPLICE_FULL_HINT || ret >= global.tune.recv_enough) {
+ /* We've read enough of it for this time, let's stop before
+ * being asked to poll.
+ */
break;
}
} /* while */
- if (unlikely(!b->pipe->data)) {
- put_pipe(b->pipe);
- b->pipe = NULL;
- }
+ return retval;
+ out_read0:
+ conn_sock_read0(conn);
return retval;
}
+/* Send as many bytes as possible from the pipe to the connection's socket.
+ */
+int raw_sock_from_pipe(struct connection *conn, struct pipe *pipe)
+{
+ int ret, done;
+
+ done = 0;
+ while (pipe->data) {
+ ret = splice(pipe->cons, NULL, conn->t.sock.fd, NULL, pipe->data,
+ SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
+
+ if (ret <= 0) {
+ if (ret == 0 || errno == EAGAIN) {
+ conn->flags |= CO_FL_WAIT_ROOM;
+ break;
+ }
+ else if (errno == EINTR)
+ continue;
+
+ /* here we have another error */
+ conn->flags |= CO_FL_ERROR;
+ break;
+ }
+
+ done += ret;
+ pipe->data -= ret;
+ }
+ return done;
+}
+
#endif /* CONFIG_HAP_LINUX_SPLICE */
@@ -347,6 +329,10 @@
.write = si_conn_send_cb,
.snd_buf = raw_sock_from_buf,
.rcv_buf = raw_sock_to_buf,
+#if defined(CONFIG_HAP_LINUX_SPLICE)
+ .rcv_pipe = raw_sock_to_pipe,
+ .snd_pipe = raw_sock_from_pipe,
+#endif
.close = NULL,
};
diff --git a/src/session.c b/src/session.c
index fa3c4dc..80415b3 100644
--- a/src/session.c
+++ b/src/session.c
@@ -179,12 +179,13 @@
if (likely(s->fe->options2 & PR_O2_INDEPSTR))
s->si[0].flags |= SI_FL_INDEP_STR;
- if (addr->ss_family == AF_INET || addr->ss_family == AF_INET6)
- s->si[0].flags = SI_FL_CAP_SPLTCP; /* TCP/TCPv6 splicing possible */
-
/* add the various callbacks */
stream_interface_prepare(&s->si[0], l->sock);
+ if ((s->si[0].conn.data->rcv_pipe && s->si[0].conn.data->snd_pipe) &&
+ (addr->ss_family == AF_INET || addr->ss_family == AF_INET6))
+ s->si[0].flags = SI_FL_CAP_SPLTCP; /* TCP/TCPv6 splicing possible */
+
/* pre-initialize the other side's stream interface to an INIT state. The
* callbacks will be initialized before attempting to connect.
*/
diff --git a/src/stream_interface.c b/src/stream_interface.c
index eef6dec..2f24c18 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -30,6 +30,7 @@
#include <proto/connection.h>
#include <proto/fd.h>
#include <proto/frontend.h>
+#include <proto/pipe.h>
#include <proto/stream_interface.h>
#include <proto/task.h>
@@ -666,42 +667,30 @@
int write_poll = MAX_WRITE_POLL_LOOPS;
int ret;
-#if 0 && defined(CONFIG_HAP_LINUX_SPLICE)
- while (b->pipe) {
- ret = splice(b->pipe->cons, NULL, si_fd(si), NULL, b->pipe->data,
- SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
- if (ret <= 0) {
- if (ret == 0 || errno == EAGAIN) {
- conn_data_poll_send(&si->conn);
- return 0;
- }
- /* here we have another error */
- return -1;
- }
+ conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
- b->flags |= BF_WRITE_PARTIAL;
- b->pipe->data -= ret;
+ if (b->pipe && conn->data->snd_pipe) {
+ ret = conn->data->snd_pipe(conn, b->pipe);
+ if (ret > 0)
+ b->flags |= BF_WRITE_PARTIAL;
if (!b->pipe->data) {
put_pipe(b->pipe);
b->pipe = NULL;
- break;
}
- if (--write_poll <= 0)
- return 0;
+ if (conn->flags & CO_FL_ERROR)
+ return -1;
- /* The only reason we did not empty the pipe is that the output
- * buffer is full.
- */
- conn_data_poll_send(&si->conn);
- return 0;
+ if (conn->flags & CO_FL_WAIT_ROOM) {
+ conn_data_poll_send(conn);
+ return 0;
+ }
}
/* At this point, the pipe is empty, but we may still have data pending
* in the normal buffer.
*/
-#endif
if (!b->buf.o) {
b->flags |= BF_OUT_EMPTY;
return 0;
@@ -710,7 +699,6 @@
/* when we're in this loop, we already know that there is no spliced
* data left, and that there are sendable buffered data.
*/
- conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_WR_SH | CO_FL_DATA_WR_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
/* check if we want to inform the kernel that we're interested in
* sending more data after this call. We want this if :
@@ -1004,34 +992,69 @@
if (b->flags & BF_SHUTR)
return;
+ cur_read = 0;
+ conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
+
-#if 0 && defined(CONFIG_HAP_LINUX_SPLICE)
- if (b->to_forward >= MIN_SPLICE_FORWARD && b->flags & BF_KERN_SPLICING) {
+ /* First, let's see if we may splice data across the channel without
+ * using a buffer.
+ */
+ if (conn->data->rcv_pipe &&
+ b->to_forward >= MIN_SPLICE_FORWARD && b->flags & BF_KERN_SPLICING) {
+ if (buffer_not_empty(&b->buf)) {
+ /* We're embarrassed, there are already data pending in
+ * the buffer and we don't want to have them at two
+ * locations at a time. Let's indicate we need some
+ * place and ask the consumer to hurry.
+ */
+ goto abort_splice;
+ }
- /* Under Linux, if FD_POLL_HUP is set, we have reached the end.
- * Since older splice() implementations were buggy and returned
- * EAGAIN on end of read, let's bypass the call to splice() now.
- */
- if (fdtab[conn->t.sock.fd].ev & FD_POLL_HUP)
- goto out_shutdown_r;
+ if (unlikely(b->pipe == NULL)) {
+ if (pipes_used >= global.maxpipes || !(b->pipe = get_pipe())) {
+ b->flags &= ~BF_KERN_SPLICING;
+ goto abort_splice;
+ }
+ }
+
+ ret = conn->data->rcv_pipe(conn, b->pipe, b->to_forward);
+ if (ret < 0) {
+ /* splice not supported on this end, let's disable it */
+ b->flags &= ~BF_KERN_SPLICING;
+ si->flags &= ~SI_FL_CAP_SPLICE;
+ goto abort_splice;
+ }
- if (sock_raw_splice_in(b, si) >= 0) {
- if (si->flags & SI_FL_ERR)
- goto out_error;
- if (b->flags & BF_READ_NULL)
- goto out_shutdown_r;
- return;
+ if (ret > 0) {
+ if (b->to_forward != BUF_INFINITE_FORWARD)
+ b->to_forward -= ret;
+ b->total += ret;
+ cur_read += ret;
+ b->flags |= BF_READ_PARTIAL;
+ b->flags &= ~BF_OUT_EMPTY;
}
+
+ if (conn_data_read0_pending(conn))
+ goto out_shutdown_r;
+
+ if (conn->flags & CO_FL_ERROR)
+ goto out_error;
+
/* splice not possible (anymore), let's go on on standard copy */
}
-#endif
- cur_read = 0;
- conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
- while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
+
+ abort_splice:
+ /* release the pipe if we can, which is almost always the case */
+ if (b->pipe && !b->pipe->data) {
+ put_pipe(b->pipe);
+ b->pipe = NULL;
+ }
+
+ while (!b->pipe && !(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
max = bi_avail(b);
if (!max) {
b->flags |= BF_FULL;
- si->flags |= SI_FL_WAIT_ROOM;
+ conn->flags |= CO_FL_WAIT_ROOM;
break;
}
@@ -1133,7 +1156,39 @@
}
} /* while !flags */
+ if (conn->flags & CO_FL_ERROR)
+ goto out_error;
+
- if (conn->flags & CO_FL_WAIT_DATA) {
+ if (conn->flags & CO_FL_WAIT_ROOM) {
+ /* We might have some data the consumer is waiting for.
+ * We can do fast-forwarding, but we avoid doing this for partial
+ * buffers, because it is very likely that it will be done again
+ * immediately afterwards once the following data is parsed (eg:
+ * HTTP chunking).
+ */
+ if (((b->flags & (BF_READ_PARTIAL|BF_OUT_EMPTY)) == BF_READ_PARTIAL) &&
+ (b->pipe /* always try to send spliced data */ ||
+ (b->buf.i == 0 && (b->cons->flags & SI_FL_WAIT_DATA)))) {
+ int last_len = b->pipe ? b->pipe->data : 0;
+
+ si_chk_snd(b->cons);
+
+ /* check if the consumer has freed some space */
+ if (!(b->flags & BF_FULL) &&
+ (!last_len || !b->pipe || b->pipe->data < last_len))
+ si->flags &= ~SI_FL_WAIT_ROOM;
+ }
+
+ if (si->flags & SI_FL_WAIT_ROOM) {
+ conn_data_stop_recv(conn);
+ b->rex = TICK_ETERNITY;
+ }
+ else if ((b->flags & (BF_SHUTR|BF_READ_PARTIAL|BF_FULL|BF_DONT_READ|BF_READ_NOEXP)) == BF_READ_PARTIAL) {
+ if (tick_isset(b->rex))
+ b->rex = tick_add_ifset(now_ms, b->rto);
+ }
+ }
+ else if (conn->flags & CO_FL_WAIT_DATA) {
/* we don't automatically ask for polling if we have
* read enough data, as it saves some syscalls with
* speculative pollers.
@@ -1144,9 +1199,6 @@
__conn_data_want_recv(conn);
}
- if (conn->flags & CO_FL_ERROR)
- goto out_error;
-
if (conn_data_read0_pending(conn))
/* connection closed */
goto out_shutdown_r;