MAJOR: raw_sock: extract raw_sock_to_buf() from raw_sock_read()
This is the start of the stream connection iterator which calls the
data-layer reader. This still looks a bit tricky but is OK. Splicing
is not handled at all at the moment.
diff --git a/include/types/connection.h b/include/types/connection.h
index 8dd4f20..6d402cf 100644
--- a/include/types/connection.h
+++ b/include/types/connection.h
@@ -73,6 +73,13 @@
*/
CO_FL_POLL_SOCK = CO_FL_HANDSHAKE | CO_FL_WAIT_L4_CONN | CO_FL_WAIT_L6_CONN,
+ /* These flags are used by data layers to indicate to their iterators
+ * whether they had to stop due to missing data or missing room. Their
+ * callers must reset them before calling the data layer handlers.
+ */
+ CO_FL_WAIT_DATA = 0x00004000, /* data source is empty */
+ CO_FL_WAIT_ROOM = 0x00008000, /* data sink is full */
+
/* flags used to remember what shutdown have been performed/reported */
CO_FL_DATA_RD_SH = 0x00010000, /* DATA layer was notified about shutr/read0 */
CO_FL_DATA_WR_SH = 0x00020000, /* DATA layer asked for shutw */
diff --git a/src/raw_sock.c b/src/raw_sock.c
index 28ee222..2c96cbb 100644
--- a/src/raw_sock.c
+++ b/src/raw_sock.c
@@ -209,19 +209,91 @@
#endif /* CONFIG_HAP_LINUX_SPLICE */
+/* Receive up to <count> bytes from connection <conn>'s socket and store them
+ * into buffer <buf>. The caller must ensure that <count> is always smaller
+ * than the buffer's size. Only one call to recv() is performed, unless the
+ * buffer wraps, in which case a second call may be performed. The connection's
+ * flags are updated with whatever special event is detected (error, read0,
+ * empty). The caller is responsible for taking care of those events and
+ * avoiding the call if inappropriate. The function does not call the
+ * connection's polling update function, so the caller is responsible for this.
+ */
+static int raw_sock_to_buf(struct connection *conn, struct buffer *buf, int count)
+{
+ int ret, done = 0;
+ int try = count;
+
+ /* stop here if we reached the end of data */
+ if ((fdtab[conn->t.sock.fd].ev & (FD_POLL_IN|FD_POLL_HUP)) == FD_POLL_HUP)
+ goto read0;
+
+ /* compute the maximum block size we can read at once. */
+ if (buffer_empty(buf)) {
+ /* let's realign the buffer to optimize I/O */
+ buf->p = buf->data;
+ }
+ else if (buf->data + buf->o < buf->p &&
+ buf->p + buf->i < buf->data + buf->size) {
+ /* remaining space wraps at the end, with a moving limit */
+ if (try > buf->data + buf->size - (buf->p + buf->i))
+ try = buf->data + buf->size - (buf->p + buf->i);
+ }
+
+ /* read the largest possible block. For this, we perform only one call
+ * to recv() unless the buffer wraps and we exactly fill the first hunk,
+ * in which case we accept to do it once again. A new attempt is made on
+ * EINTR too.
+ */
+ while (try) {
+ ret = recv(conn->t.sock.fd, bi_end(buf), try, 0);
+
+ if (ret > 0) {
+ buf->i += ret;
+ done += ret;
+ if (ret < try) {
+ /* unfortunately, on level-triggered events, POLL_HUP
+ * is generally delivered AFTER the system buffer is
+ * empty, so this one might never match.
+ */
+ if (fdtab[conn->t.sock.fd].ev & FD_POLL_HUP)
+ goto read0;
+ break;
+ }
+ count -= ret;
+ try = count;
+ }
+ else if (ret == 0) {
+ goto read0;
+ }
+ else if (errno == EAGAIN) {
+ conn->flags |= CO_FL_WAIT_DATA;
+ break;
+ }
+ else if (errno != EINTR) {
+ conn->flags |= CO_FL_ERROR;
+ break;
+ }
+ }
+ return done;
+
+ read0:
+ conn_sock_read0(conn);
+ return done;
+}
+
+
/*
* this function is called on a read event from a stream socket.
*/
static void sock_raw_read(struct connection *conn)
{
- int fd = conn->t.sock.fd;
struct stream_interface *si = container_of(conn, struct stream_interface, conn);
struct channel *b = si->ib;
int ret, max, cur_read;
int read_poll = MAX_READ_POLL_LOOPS;
#ifdef DEBUG_FULL
- fprintf(stderr,"sock_raw_read : fd=%d, ev=0x%02x, owner=%p\n", fd, fdtab[fd].ev, fdtab[fd].owner);
+ fprintf(stderr,"sock_raw_read : fd=%d, ev=0x%02x, owner=%p\n", conn->t.sock.fd, fdtab[conn->t.sock.fd].ev, fdtab[conn->t.sock.fd].owner);
#endif
/* stop immediately on errors. Note that we DON'T want to stop on
* POLL_ERR, as the poller might report a write error while there
@@ -233,7 +305,7 @@
goto out_error;
/* stop here if we reached the end of data */
- if ((fdtab[fd].ev & (FD_POLL_IN|FD_POLL_HUP)) == FD_POLL_HUP)
+ if (conn_data_read0_pending(conn))
goto out_shutdown_r;
/* maybe we were called immediately after an asynchronous shutr */
@@ -247,7 +319,7 @@
* Since older splice() implementations were buggy and returned
* EAGAIN on end of read, let's bypass the call to splice() now.
*/
- if (fdtab[fd].ev & FD_POLL_HUP)
+ if (fdtab[conn->t.sock.fd].ev & FD_POLL_HUP)
goto out_shutdown_r;
if (sock_raw_splice_in(b, si) >= 0) {
@@ -261,7 +333,8 @@
}
#endif
cur_read = 0;
- while (1) {
+ conn->flags &= ~(CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM);
+ while (!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_DATA | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {
max = bi_avail(b);
if (!max) {
@@ -269,173 +342,134 @@
si->flags |= SI_FL_WAIT_ROOM;
break;
}
-
- /*
- * 1. compute the maximum block size we can read at once.
- */
- if (buffer_empty(&b->buf)) {
- /* let's realign the buffer to optimize I/O */
- b->buf.p = b->buf.data;
- }
- else if (b->buf.data + b->buf.o < b->buf.p &&
- b->buf.p + b->buf.i < b->buf.data + b->buf.size) {
- /* remaining space wraps at the end, with a moving limit */
- if (max > b->buf.data + b->buf.size - (b->buf.p + b->buf.i))
- max = b->buf.data + b->buf.size - (b->buf.p + b->buf.i);
- }
- /* else max is already OK */
- /*
- * 2. read the largest possible block
- */
- ret = recv(fd, bi_end(&b->buf), max, 0);
+ ret = raw_sock_to_buf(conn, &b->buf, max);
+ if (ret <= 0)
+ break;
- if (ret > 0) {
- b->buf.i += ret;
- cur_read += ret;
+ cur_read += ret;
- /* if we're allowed to directly forward data, we must update ->o */
- if (b->to_forward && !(b->flags & (BF_SHUTW|BF_SHUTW_NOW))) {
- unsigned long fwd = ret;
- if (b->to_forward != BUF_INFINITE_FORWARD) {
- if (fwd > b->to_forward)
- fwd = b->to_forward;
- b->to_forward -= fwd;
- }
- b_adv(b, fwd);
+ /* if we're allowed to directly forward data, we must update ->o */
+ if (b->to_forward && !(b->flags & (BF_SHUTW|BF_SHUTW_NOW))) {
+ unsigned long fwd = ret;
+ if (b->to_forward != BUF_INFINITE_FORWARD) {
+ if (fwd > b->to_forward)
+ fwd = b->to_forward;
+ b->to_forward -= fwd;
}
+ b_adv(b, fwd);
+ }
- if (conn->flags & CO_FL_WAIT_L4_CONN) {
- conn->flags &= ~CO_FL_WAIT_L4_CONN;
- si->exp = TICK_ETERNITY;
- }
+ if (conn->flags & CO_FL_WAIT_L4_CONN) {
+ conn->flags &= ~CO_FL_WAIT_L4_CONN;
+ si->exp = TICK_ETERNITY;
+ }
- b->flags |= BF_READ_PARTIAL;
- b->total += ret;
+ b->flags |= BF_READ_PARTIAL;
+ b->total += ret;
- if (bi_full(b)) {
- /* The buffer is now full, there's no point in going through
- * the loop again.
- */
- if (!(b->flags & BF_STREAMER_FAST) && (cur_read == buffer_len(&b->buf))) {
- b->xfer_small = 0;
- b->xfer_large++;
- if (b->xfer_large >= 3) {
- /* we call this buffer a fast streamer if it manages
- * to be filled in one call 3 consecutive times.
- */
- b->flags |= (BF_STREAMER | BF_STREAMER_FAST);
- //fputc('+', stderr);
- }
- }
- else if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
- (cur_read <= b->buf.size / 2)) {
- b->xfer_large = 0;
- b->xfer_small++;
- if (b->xfer_small >= 2) {
- /* if the buffer has been at least half full twice,
- * we receive faster than we send, so at least it
- * is not a "fast streamer".
- */
- b->flags &= ~BF_STREAMER_FAST;
- //fputc('-', stderr);
- }
- }
- else {
- b->xfer_small = 0;
- b->xfer_large = 0;
+ if (bi_full(b)) {
+ /* The buffer is now full, there's no point in going through
+ * the loop again.
+ */
+ if (!(b->flags & BF_STREAMER_FAST) && (cur_read == buffer_len(&b->buf))) {
+ b->xfer_small = 0;
+ b->xfer_large++;
+ if (b->xfer_large >= 3) {
+ /* we call this buffer a fast streamer if it manages
+ * to be filled in one call 3 consecutive times.
+ */
+ b->flags |= (BF_STREAMER | BF_STREAMER_FAST);
+ //fputc('+', stderr);
}
-
- b->flags |= BF_FULL;
- si->flags |= SI_FL_WAIT_ROOM;
- break;
}
-
- /* if too many bytes were missing from last read, it means that
- * it's pointless trying to read again because the system does
- * not have them in buffers. BTW, if FD_POLL_HUP was present,
- * it means that we have reached the end and that the connection
- * is closed.
- */
- if (ret < max) {
- if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
- (cur_read <= b->buf.size / 2)) {
- b->xfer_large = 0;
- b->xfer_small++;
- if (b->xfer_small >= 3) {
- /* we have read less than half of the buffer in
- * one pass, and this happened at least 3 times.
- * This is definitely not a streamer.
- */
- b->flags &= ~(BF_STREAMER | BF_STREAMER_FAST);
- //fputc('!', stderr);
- }
+ else if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
+ (cur_read <= b->buf.size / 2)) {
+ b->xfer_large = 0;
+ b->xfer_small++;
+ if (b->xfer_small >= 2) {
+ /* if the buffer has been at least half full twice,
+ * we receive faster than we send, so at least it
+ * is not a "fast streamer".
+ */
+ b->flags &= ~BF_STREAMER_FAST;
+ //fputc('-', stderr);
}
- /* unfortunately, on level-triggered events, POLL_HUP
- * is generally delivered AFTER the system buffer is
- * empty, so this one might never match.
- */
- if (fdtab[fd].ev & FD_POLL_HUP)
- goto out_shutdown_r;
+ }
+ else {
+ b->xfer_small = 0;
+ b->xfer_large = 0;
+ }
- /* if a streamer has read few data, it may be because we
- * have exhausted system buffers. It's not worth trying
- * again.
- */
- if (b->flags & BF_STREAMER)
- break;
+ b->flags |= BF_FULL;
+ si->flags |= SI_FL_WAIT_ROOM;
+ break;
+ }
- /* generally if we read something smaller than 1 or 2 MSS,
- * it means that either we have exhausted the system's
- * buffers (streamer or question-response protocol) or
- * that the connection will be closed. Streamers are
- * easily detected so we return early. For other cases,
- * it's still better to perform a last read to be sure,
- * because it may save one complete poll/read/wakeup cycle
- * in case of shutdown.
- */
- if (ret < MIN_RET_FOR_READ_LOOP && b->flags & BF_STREAMER)
- break;
+ if ((b->flags & BF_READ_DONTWAIT) || --read_poll <= 0)
+ break;
- /* if we read a large block smaller than what we requested,
- * it's almost certain we'll never get anything more.
- */
- if (ret >= global.tune.recv_enough)
- break;
+ /* if too many bytes were missing from last read, it means that
+ * it's pointless trying to read again because the system does
+ * not have them in buffers.
+ */
+ if (ret < max) {
+ if ((b->flags & (BF_STREAMER | BF_STREAMER_FAST)) &&
+ (cur_read <= b->buf.size / 2)) {
+ b->xfer_large = 0;
+ b->xfer_small++;
+ if (b->xfer_small >= 3) {
+ /* we have read less than half of the buffer in
+ * one pass, and this happened at least 3 times.
+ * This is definitely not a streamer.
+ */
+ b->flags &= ~(BF_STREAMER | BF_STREAMER_FAST);
+ //fputc('!', stderr);
+ }
}
- if ((b->flags & BF_READ_DONTWAIT) || --read_poll <= 0)
+ /* if a streamer has read few data, it may be because we
+ * have exhausted system buffers. It's not worth trying
+ * again.
+ */
+ if (b->flags & BF_STREAMER)
break;
- }
- else if (ret == 0) {
- /* connection closed */
- goto out_shutdown_r;
- }
- else if (errno == EAGAIN) {
- /* Ignore EAGAIN but inform the poller that there is
- * nothing to read left if we did not read much, ie
- * less than what we were still expecting to read.
- * But we may have done some work justifying to notify
- * the task.
+
+ /* if we read a large block smaller than what we requested,
+ * it's almost certain we'll never get anything more.
*/
- if (cur_read < MIN_RET_FOR_READ_LOOP)
- conn_data_poll_recv(conn);
- break;
- }
- else {
- goto out_error;
+ if (ret >= global.tune.recv_enough)
+ break;
}
- } /* while (1) */
+ } /* while !flags */
+
+ if (conn->flags & CO_FL_ERROR)
+ goto out_error;
+
+ if (conn->flags & CO_FL_WAIT_DATA) {
+ /* we don't automatically ask for polling if we have
+ * read enough data, as it saves some syscalls with
+ * speculative pollers.
+ */
+ if (cur_read < MIN_RET_FOR_READ_LOOP)
+ __conn_data_poll_recv(conn);
+ else
+ __conn_data_want_recv(conn);
+ }
+
+ if (conn_data_read0_pending(conn))
+ /* connection closed */
+ goto out_shutdown_r;
return;
out_shutdown_r:
/* we received a shutdown */
- fdtab[fd].ev &= ~FD_POLL_HUP;
b->flags |= BF_READ_NULL;
if (b->flags & BF_AUTO_CLOSE)
buffer_shutw_now(b);
stream_sock_read0(si);
+ conn_data_read0(conn);
return;
out_error: