MEDIUM: stconn: move the RXBLK flags to the stream connector
The following flags are not at all related to the endpoint but to the
connector itself:
- SE_FL_RXBLK_ROOM
- SE_FL_RXBLK_BUFF
- SE_FL_RXBLK_CHAN
As such they have no business staying in the endpoint descriptor and
they must move to the stream connector. They've also been renamed
accordingly to better match what they correspond to (the same name
as the function that sets them).
The rare occurrences of cs_rx_blocked() were replaced by an explicit
test on the list of flags. The reason is that cs_rx_blocked() used to
preserve some tests that are not needed at certain places since already
known. For the same reason SE_FL_RXBLK_ANY wasn't converted. As such it
will later be possible to carefully review these few locations and
eliminate the unneeded flags from the tests. No particular function
was made to test them since they're explicit enough.
It now looks like ci_putchk() and friends could very well place the flag
themselves on the connector when they detect a buffer full condition, as
this would significantly simplify the high-level API. But all usages must
first be reviewed before this simplification can be done. For now it
remains done by applet_put*() instead.
diff --git a/dev/flags/flags.c b/dev/flags/flags.c
index 19036a3..6bac4c9 100644
--- a/dev/flags/flags.c
+++ b/dev/flags/flags.c
@@ -184,9 +184,6 @@
return;
}
- SHOW_FLAG(f, SE_FL_RXBLK_CHAN);
- SHOW_FLAG(f, SE_FL_RXBLK_BUFF);
- SHOW_FLAG(f, SE_FL_RXBLK_ROOM);
SHOW_FLAG(f, SE_FL_APPLET_NEED_CONN);
SHOW_FLAG(f, SE_FL_HAVE_NO_DATA);
SHOW_FLAG(f, SE_FL_WANT_GET);
@@ -223,6 +220,9 @@
printf("0\n");
return;
}
+ SHOW_FLAG(f, SC_FL_NEED_ROOM);
+ SHOW_FLAG(f, SC_FL_NEED_BUFF);
+ SHOW_FLAG(f, SC_FL_WONT_READ);
SHOW_FLAG(f, SC_FL_INDEP_STR);
SHOW_FLAG(f, SC_FL_DONT_WAKE);
SHOW_FLAG(f, SC_FL_NOLINGER);
diff --git a/include/haproxy/applet.h b/include/haproxy/applet.h
index 5fc6249..e7c1e29 100644
--- a/include/haproxy/applet.h
+++ b/include/haproxy/applet.h
@@ -145,8 +145,8 @@
}
/* writes chunk <chunk> into the input channel of the stream attached to this
- * appctx's endpoint, and marks the RXBLK_ROOM on a channel full error. See
- * ci_putchk() for the list of return codes.
+ * appctx's endpoint, and marks the SC_FL_NEED_ROOM on a channel full error.
+ * See ci_putchk() for the list of return codes.
*/
static inline int applet_putchk(struct appctx *appctx, struct buffer *chunk)
{
@@ -155,14 +155,14 @@
ret = ci_putchk(sc_ic(se->sc), chunk);
if (ret == -1)
- se_fl_set(se, SE_FL_RXBLK_ROOM);
+ sc_need_room(se->sc);
return ret;
}
/* writes <len> chars from <blk> into the input channel of the stream attached
- * to this appctx's endpoint, and marks the RXBLK_ROOM on a channel full error.
- * See ci_putblk() for the list of return codes.
+ * to this appctx's endpoint, and marks the SC_FL_NEED_ROOM on a channel full
+ * error. See ci_putblk() for the list of return codes.
*/
static inline int applet_putblk(struct appctx *appctx, const char *blk, int len)
{
@@ -171,15 +171,15 @@
ret = ci_putblk(sc_ic(se->sc), blk, len);
if (ret == -1)
- se_fl_set(se, SE_FL_RXBLK_ROOM);
+ sc_need_room(se->sc);
return ret;
}
/* writes chars from <str> up to the trailing zero (excluded) into the input
* channel of the stream attached to this appctx's endpoint, and marks the
- * RXBLK_ROOM on a channel full error. See ci_putstr() for the list of return
- * codes.
+ * SC_FL_NEED_ROOM on a channel full error. See ci_putstr() for the list of
+ * return codes.
*/
static inline int applet_putstr(struct appctx *appctx, const char *str)
{
@@ -188,14 +188,14 @@
ret = ci_putstr(sc_ic(se->sc), str);
if (ret == -1)
- se_fl_set(se, SE_FL_RXBLK_ROOM);
+ sc_need_room(se->sc);
return ret;
}
/* writes character <chr> into the input channel of the stream attached to this
- * appctx's endpoint, and marks the RXBLK_ROOM on a channel full error. See
- * ci_putchr() for the list of return codes.
+ * appctx's endpoint, and marks the SC_FL_NEED_ROOM on a channel full error.
+ * See ci_putchr() for the list of return codes.
*/
static inline int applet_putchr(struct appctx *appctx, char chr)
{
@@ -204,7 +204,7 @@
ret = ci_putchr(sc_ic(se->sc), chr);
if (ret == -1)
- se_fl_set(se, SE_FL_RXBLK_ROOM);
+ sc_need_room(se->sc);
return ret;
}
diff --git a/include/haproxy/conn_stream-t.h b/include/haproxy/conn_stream-t.h
index 1aa6115..601018a 100644
--- a/include/haproxy/conn_stream-t.h
+++ b/include/haproxy/conn_stream-t.h
@@ -75,11 +75,10 @@
SE_FL_WAIT_DATA = 0x00800000, /* CS waits for more outgoing data to send */
SE_FL_WANT_GET = 0x01000000, /* CS would like to get some data from the buffer */
SE_FL_HAVE_NO_DATA = 0x02000000, /* the endpoint has no more data to deliver to the stream */
- SE_FL_RXBLK_CHAN = 0x04000000, /* the channel doesn't want the CS to introduce data */
- SE_FL_RXBLK_BUFF = 0x08000000, /* CS waits for a buffer allocation to complete */
- SE_FL_RXBLK_ROOM = 0x10000000, /* CS waits for more buffer room to store incoming data */
- SE_FL_RXBLK_ANY = 0x1C000000, /* any of the RXBLK flags above */
- SE_FL_APP_MASK = 0x1fe00000, /* Mask for flags set by the app layer */
+ SE_FL_APP_MASK = 0x02e00000, /* Mask for flags set by the app layer */
+ /* unused 0x04000000,*/
+ /* unused 0x08000000,*/
+ /* unused 0x10000000,*/
/* unused 0x20000000,*/
SE_FL_APPLET_NEED_CONN = 0x40000000, /* applet is waiting for the other side to (fail to) connect */
};
@@ -96,6 +95,10 @@
SC_FL_NOHALF = 0x00000010, /* no half close, close both sides at once */
SC_FL_DONT_WAKE = 0x00000020, /* resync in progress, don't wake up */
SC_FL_INDEP_STR = 0x00000040, /* independent streams = don't update rex on write */
+
+ SC_FL_WONT_READ = 0x00000080, /* SC doesn't want to read data */
+ SC_FL_NEED_BUFF = 0x00000100, /* SC waits for an rx buffer allocation to complete */
+ SC_FL_NEED_ROOM = 0x00000200, /* SC needs more room in the rx buffer to store incoming data */
};
/* A conn stream must have its own errors independently of the buffer's, so that
diff --git a/include/haproxy/conn_stream.h b/include/haproxy/conn_stream.h
index 296a853..6317e06 100644
--- a/include/haproxy/conn_stream.h
+++ b/include/haproxy/conn_stream.h
@@ -283,20 +283,14 @@
sc_conn_shutr(cs, CO_SHR_DRAIN);
}
-/* Returns non-zero if the stream connector's Rx path is blocked */
-static inline int cs_rx_blocked(const struct stconn *cs)
-{
- return !!sc_ep_test(cs, SE_FL_RXBLK_ANY);
-}
-
/* Returns non-zero if the stream connector's Rx path is blocked because of
* lack of room in the input buffer. This usually happens after applets failed
* to deliver data into the channel's buffer and reported it via sc_need_room().
*/
__attribute__((warn_unused_result))
-static inline int sc_waiting_room(const struct stconn *cs)
+static inline int sc_waiting_room(const struct stconn *sc)
{
- return !!sc_ep_test(cs, SE_FL_RXBLK_ROOM);
+ return !!(sc->flags & SC_FL_NEED_ROOM);
}
/* The stream endpoint announces it has more data to deliver to the stream's
@@ -318,18 +312,18 @@
/* The application layer informs a stream connector that it's willing to
* receive data from the endpoint.
*/
-static inline void sc_will_read(struct stconn *cs)
+static inline void sc_will_read(struct stconn *sc)
{
- sc_ep_clr(cs, SE_FL_RXBLK_CHAN);
+ sc->flags &= ~SC_FL_WONT_READ;
}
/* The application layer informs a stream connector that it will not receive
* data from the endpoint (e.g. need to flush, bw limitations etc). Usually
* it corresponds to the channel's CF_DONT_READ flag.
*/
-static inline void sc_wont_read(struct stconn *cs)
+static inline void sc_wont_read(struct stconn *sc)
{
- sc_ep_set(cs, SE_FL_RXBLK_CHAN);
+ sc->flags |= SC_FL_WONT_READ;
}
/* An frontend (applet) stream endpoint tells the connector it needs the other
@@ -345,38 +339,38 @@
/* The application layer tells the stream connector that it just got the input
* buffer it was waiting for.
*/
-static inline void sc_have_buff(struct stconn *cs)
+static inline void sc_have_buff(struct stconn *sc)
{
- sc_ep_clr(cs, SE_FL_RXBLK_BUFF);
+ sc->flags &= ~SC_FL_NEED_BUFF;
}
/* The stream connector failed to get an input buffer and is waiting for it.
* It indicates a willingness to deliver data to the buffer that will have to
* be retried. As such, callers will often automatically clear SE_FL_HAVE_NO_DATA
- * called again as soon as RXBLK_BUFF is cleared.
+ * to be called again as soon as SC_FL_NEED_BUFF is cleared.
*/
-static inline void sc_need_buff(struct stconn *cs)
+static inline void sc_need_buff(struct stconn *sc)
{
- sc_ep_set(cs, SE_FL_RXBLK_BUFF);
+ sc->flags |= SC_FL_NEED_BUFF;
}
/* Tell a stream connector some room was made in the input buffer and any
* failed attempt to inject data into it may be tried again. This is usually
* called after a successful transfer of buffer contents to the other side.
*/
-static inline void sc_have_room(struct stconn *cs)
+static inline void sc_have_room(struct stconn *sc)
{
- sc_ep_clr(cs, SE_FL_RXBLK_ROOM);
+ sc->flags &= ~SC_FL_NEED_ROOM;
}
/* The stream connector announces it failed to put data into the input buffer
* by lack of room. Since it indicates a willingness to deliver data to the
* buffer that will have to be retried. Usually the caller will also clear
- * SE_FL_HAVE_NO_DATA to be called again as soon as RXBLK_ROOM is cleared.
+ * SE_FL_HAVE_NO_DATA to be called again as soon as SC_FL_NEED_ROOM is cleared.
*/
-static inline void sc_need_room(struct stconn *cs)
+static inline void sc_need_room(struct stconn *sc)
{
- sc_ep_set(cs, SE_FL_RXBLK_ROOM);
+ sc->flags |= SC_FL_NEED_ROOM;
}
/* Returns non-zero if the stream connector's Tx path is blocked */
diff --git a/include/haproxy/cs_utils.h b/include/haproxy/cs_utils.h
index 13f29e3..b53602c 100644
--- a/include/haproxy/cs_utils.h
+++ b/include/haproxy/cs_utils.h
@@ -150,7 +150,7 @@
* channel_alloc_buffer() for this so it abides by its rules. It returns 0 on
* failure, non-zero otherwise. If no buffer is available, the requester,
* represented by the <wait> pointer, will be added in the list of objects
- * waiting for an available buffer, and SE_FL_RXBLK_BUFF will be set on the
+ * waiting for an available buffer, and SC_FL_NEED_BUFF will be set on the
* stream connector and SE_FL_HAVE_NO_DATA cleared. The requester will be responsible
* for calling this function to try again once woken up.
*/
@@ -307,7 +307,7 @@
if (sc_ep_test(sc, SE_FL_HAVE_NO_DATA))
return 0;
- return !cs_rx_blocked(sc);
+ return !(sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM));
}
/* This is to be used after making some room available in a channel. It will
diff --git a/src/applet.c b/src/applet.c
index 7330512..7e7f47d 100644
--- a/src/applet.c
+++ b/src/applet.c
@@ -167,7 +167,7 @@
struct stconn *cs = appctx_cs(appctx);
/* allocation requested ? */
- if (!se_fl_test(appctx->sedesc, SE_FL_RXBLK_BUFF))
+ if (!(cs->flags & SC_FL_NEED_BUFF))
return 0;
sc_have_buff(cs);
@@ -244,8 +244,8 @@
/* measure the call rate and check for anomalies when too high */
rate = update_freq_ctr(&app->call_rate, 1);
if (rate >= 100000 && app->call_rate.prev_ctr && // looped more than 100k times over last second
- ((b_size(sc_ib(cs)) && se_fl_test(app->sedesc, SE_FL_RXBLK_BUFF)) || // asks for a buffer which is present
- (b_size(sc_ib(cs)) && !b_data(sc_ib(cs)) && se_fl_test(app->sedesc, SE_FL_RXBLK_ROOM)) || // asks for room in an empty buffer
+ ((b_size(sc_ib(cs)) && cs->flags & SC_FL_NEED_ROOM) || // asks for a buffer which is present
+ (b_size(sc_ib(cs)) && !b_data(sc_ib(cs)) && cs->flags & SC_FL_NEED_ROOM) || // asks for room in an empty buffer
(b_data(sc_ob(cs)) && sc_is_send_allowed(cs)) || // asks for data already present
(!b_data(sc_ib(cs)) && b_data(sc_ob(cs)) && // didn't return anything ...
(sc_oc(cs)->flags & (CF_WRITE_PARTIAL|CF_SHUTW_NOW)) == CF_SHUTW_NOW))) { // ... and left data pending after a shut
diff --git a/src/conn_stream.c b/src/conn_stream.c
index 284fa52..3e89512 100644
--- a/src/conn_stream.c
+++ b/src/conn_stream.c
@@ -1031,7 +1031,7 @@
*/
sc_have_room(cs);
}
- if (sc_ep_test(cs, SE_FL_RXBLK_ANY))
+ if (cs->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM))
ic->rex = TICK_ETERNITY;
else if (!(ic->flags & CF_READ_NOEXP) && !tick_isset(ic->rex))
ic->rex = tick_add_ifset(now_ms, ic->rto);
@@ -1092,7 +1092,7 @@
* layers (applets, connections) after I/O completion. After updating the stream
* interface and timeouts, it will try to forward what can be forwarded, then to
* wake the associated task up if an important event requires special handling.
- * It may update SE_FL_WAIT_DATA and/or SE_FL_RXBLK_ROOM, that the callers are
+ * It may update SE_FL_WAIT_DATA and/or SC_FL_NEED_ROOM, that the callers are
* encouraged to watch to take appropriate action.
* It should not be called from within the stream itself, cs_update()
* is designed for this.
@@ -1144,11 +1144,12 @@
* are output data, but we avoid doing this if some of the data are
* not yet scheduled for being forwarded, because it is very likely
* that it will be done again immediately afterwards once the following
- * data are parsed (eg: HTTP chunking). We only SE_FL_RXBLK_ROOM once
- * we've emptied *some* of the output buffer, and not just when there
- * is available room, because applets are often forced to stop before
- * the buffer is full. We must not stop based on input data alone because
- * an HTTP parser might need more data to complete the parsing.
+ * data are parsed (eg: HTTP chunking). We only clear SC_FL_NEED_ROOM
+ * once we've emptied *some* of the output buffer, and not just when
+ * there is available room, because applets are often forced to stop
+ * before the buffer is full. We must not stop based on input data
+ * alone because an HTTP parser might need more data to complete the
+ * parsing.
*/
if (!channel_is_empty(ic) &&
sc_ep_test(cso, SE_FL_WAIT_DATA) &&
@@ -1178,7 +1179,8 @@
cs_chk_rcv(cs);
cs_chk_rcv(cso);
- if (ic->flags & CF_SHUTR || sc_ep_test(cs, SE_FL_APPLET_NEED_CONN) || cs_rx_blocked(cs)) {
+ if (ic->flags & CF_SHUTR || sc_ep_test(cs, SE_FL_APPLET_NEED_CONN) ||
+ (cs->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM))) {
ic->rex = TICK_ETERNITY;
}
else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL)) == CF_READ_PARTIAL) {
@@ -1521,7 +1523,7 @@
/* if we are waiting for more space, don't try to read more data
* right now.
*/
- if (cs_rx_blocked(cs))
+ if (cs->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM))
break;
} /* while !flags */
@@ -1585,7 +1587,8 @@
sc_conn_read0(cs);
ret = 1;
}
- else if (!cs_rx_blocked(cs) && !(ic->flags & CF_SHUTR)) {
+ else if (!(cs->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM)) &&
+ !(ic->flags & CF_SHUTR)) {
/* Subscribe to receive events if we're blocking on I/O */
conn->mux->subscribe(cs, SUB_RETRY_RECV, &cs->wait_event);
se_have_no_more_data(cs->sedesc);
@@ -1925,15 +1928,16 @@
/* automatically mark the applet having data available if it reported
* begin blocked by the channel.
*/
- if (cs_rx_blocked(cs) || sc_ep_test(cs, SE_FL_APPLET_NEED_CONN))
+ if ((cs->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM)) ||
+ sc_ep_test(cs, SE_FL_APPLET_NEED_CONN))
applet_have_more_data(__sc_appctx(cs));
/* update the stream connector, channels, and possibly wake the stream up */
cs_notify(cs);
stream_release_buffers(__sc_strm(cs));
- /* cs_notify may have passed through chk_snd and released some
- * RXBLK flags. Process_stream will consider those flags to wake up the
+ /* cs_notify may have passed through chk_snd and released some blocking
+ * flags. Process_stream will consider those flags to wake up the
* appctx but in the case the task is not in runqueue we may have to
* wakeup the appctx immediately.
*/
diff --git a/src/stream.c b/src/stream.c
index 913f925..c46bbe0 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -304,7 +304,7 @@
/* Callback used to wake up a stream when an input buffer is available. The
* stream <s>'s stream connectors are checked for a failed buffer allocation
- * as indicated by the presence of the SE_FL_RXBLK_ROOM flag and the lack of a
+ * as indicated by the presence of the SC_FL_NEED_BUFF flag and the lack of a
* buffer, and and input buffer is assigned there (at most one). The function
* returns 1 and wakes the stream up if a buffer was taken, otherwise zero.
* It's designed to be called from __offer_buffer().
@@ -313,10 +313,10 @@
{
struct stream *s = arg;
- if (!s->req.buf.size && !s->req.pipe && sc_ep_test(s->scf, SE_FL_RXBLK_BUFF) &&
+ if (!s->req.buf.size && !s->req.pipe && s->scf->flags & SC_FL_NEED_BUFF &&
b_alloc(&s->req.buf))
sc_have_buff(s->scf);
- else if (!s->res.buf.size && !s->res.pipe && sc_ep_test(s->scb, SE_FL_RXBLK_BUFF) &&
+ else if (!s->res.buf.size && !s->res.pipe && s->scb->flags & SC_FL_NEED_BUFF &&
b_alloc(&s->res.buf))
sc_have_buff(s->scb);
else