REORG: conn-stream: Move cs_app_ops in conn_stream.c
Callback functions to perform shutdown for reads and writes and to trigger
I/O calls are now moved in conn_stream.c.
diff --git a/include/haproxy/stream_interface.h b/include/haproxy/stream_interface.h
index 211d63a..3875385 100644
--- a/include/haproxy/stream_interface.h
+++ b/include/haproxy/stream_interface.h
@@ -29,9 +29,6 @@
#include <haproxy/conn_stream.h>
#include <haproxy/obj_type.h>
-extern struct cs_app_ops cs_app_embedded_ops;
-extern struct cs_app_ops cs_app_conn_ops;
-extern struct cs_app_ops cs_app_applet_ops;
extern struct data_cb si_conn_cb;
extern struct data_cb check_conn_cb;
diff --git a/src/conn_stream.c b/src/conn_stream.c
index 892a582..736cfb2 100644
--- a/src/conn_stream.c
+++ b/src/conn_stream.c
@@ -21,6 +21,49 @@
DECLARE_POOL(pool_head_connstream, "conn_stream", sizeof(struct conn_stream));
DECLARE_POOL(pool_head_cs_endpoint, "cs_endpoint", sizeof(struct cs_endpoint));
+/* functions used by default on a detached conn-stream */
+static void cs_app_shutr(struct conn_stream *cs);
+static void cs_app_shutw(struct conn_stream *cs);
+static void cs_app_chk_rcv(struct conn_stream *cs);
+static void cs_app_chk_snd(struct conn_stream *cs);
+
+/* functions used on a mux-based conn-stream */
+static void cs_app_shutr_conn(struct conn_stream *cs);
+static void cs_app_shutw_conn(struct conn_stream *cs);
+static void cs_app_chk_rcv_conn(struct conn_stream *cs);
+static void cs_app_chk_snd_conn(struct conn_stream *cs);
+
+/* functions used on an applet-based conn-stream */
+static void cs_app_shutr_applet(struct conn_stream *cs);
+static void cs_app_shutw_applet(struct conn_stream *cs);
+static void cs_app_chk_rcv_applet(struct conn_stream *cs);
+static void cs_app_chk_snd_applet(struct conn_stream *cs);
+
+/* conn-stream operations for connections */
+struct cs_app_ops cs_app_conn_ops = {
+ .chk_rcv = cs_app_chk_rcv_conn,
+ .chk_snd = cs_app_chk_snd_conn,
+ .shutr = cs_app_shutr_conn,
+ .shutw = cs_app_shutw_conn,
+};
+
+/* conn-stream operations for embedded tasks */
+struct cs_app_ops cs_app_embedded_ops = {
+ .chk_rcv = cs_app_chk_rcv,
+ .chk_snd = cs_app_chk_snd,
+ .shutr = cs_app_shutr,
+ .shutw = cs_app_shutw,
+};
+
+/* conn-stream operations for connections */
+struct cs_app_ops cs_app_applet_ops = {
+ .chk_rcv = cs_app_chk_rcv_applet,
+ .chk_snd = cs_app_chk_snd_applet,
+ .shutr = cs_app_shutr_applet,
+ .shutw = cs_app_shutw_applet,
+};
+
+
void cs_endpoint_init(struct cs_endpoint *endp)
{
endp->target = NULL;
@@ -388,3 +431,512 @@
if (appctx->applet->release && !cs_state_in(cs->state, CS_SB_DIS|CS_SB_CLO))
appctx->applet->release(appctx);
}
+
+/*
+ * This function performs a shutdown-read on a detached conn-stream in a
+ * connected or init state (it does nothing for other states). It either shuts
+ * the read side or marks itself as closed. The buffer flags are updated to
+ * reflect the new state. If the stream interface has CS_FL_NOHALF, we also
+ * forward the close to the write side. The owner task is woken up if it exists.
+ */
+static void cs_app_shutr(struct conn_stream *cs)
+{
+ struct channel *ic = cs_ic(cs);
+
+ si_rx_shut_blk(cs->si);
+ if (ic->flags & CF_SHUTR)
+ return;
+ ic->flags |= CF_SHUTR;
+ ic->rex = TICK_ETERNITY;
+
+ if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
+ return;
+
+ if (cs_oc(cs)->flags & CF_SHUTW) {
+ cs->state = CS_ST_DIS;
+ __cs_strm(cs)->conn_exp = TICK_ETERNITY;
+ }
+ else if (cs->flags & CS_FL_NOHALF) {
+ /* we want to immediately forward this close to the write side */
+ return cs_app_shutw(cs);
+ }
+
+ /* note that if the task exists, it must unregister itself once it runs */
+ if (!(cs->flags & CS_FL_DONT_WAKE))
+ task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
+}
+
+/*
+ * This function performs a shutdown-write on a detached conn-stream in a
+ * connected or init state (it does nothing for other states). It either shuts
+ * the write side or marks itself as closed. The buffer flags are updated to
+ * reflect the new state. It does also close everything if the SI was marked as
+ * being in error state. The owner task is woken up if it exists.
+ */
+static void cs_app_shutw(struct conn_stream *cs)
+{
+ struct channel *ic = cs_ic(cs);
+ struct channel *oc = cs_oc(cs);
+
+ oc->flags &= ~CF_SHUTW_NOW;
+ if (oc->flags & CF_SHUTW)
+ return;
+ oc->flags |= CF_SHUTW;
+ oc->wex = TICK_ETERNITY;
+ si_done_get(cs->si);
+
+ if (tick_isset(cs->hcto)) {
+ ic->rto = cs->hcto;
+ ic->rex = tick_add(now_ms, ic->rto);
+ }
+
+ switch (cs->state) {
+ case CS_ST_RDY:
+ case CS_ST_EST:
+ /* we have to shut before closing, otherwise some short messages
+ * may never leave the system, especially when there are remaining
+ * unread data in the socket input buffer, or when nolinger is set.
+ * However, if CS_FL_NOLINGER is explicitly set, we know there is
+ * no risk so we close both sides immediately.
+ */
+ if (!(cs->endp->flags & CS_EP_ERROR) && !(cs->flags & CS_FL_NOLINGER) &&
+ !(ic->flags & (CF_SHUTR|CF_DONT_READ)))
+ return;
+
+ /* fall through */
+ case CS_ST_CON:
+ case CS_ST_CER:
+ case CS_ST_QUE:
+ case CS_ST_TAR:
+ /* Note that none of these states may happen with applets */
+ cs->state = CS_ST_DIS;
+ /* fall through */
+ default:
+ cs->flags &= ~CS_FL_NOLINGER;
+ si_rx_shut_blk(cs->si);
+ ic->flags |= CF_SHUTR;
+ ic->rex = TICK_ETERNITY;
+ __cs_strm(cs)->conn_exp = TICK_ETERNITY;
+ }
+
+ /* note that if the task exists, it must unregister itself once it runs */
+ if (!(cs->flags & CS_FL_DONT_WAKE))
+ task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
+}
+
+/* default chk_rcv function for scheduled tasks */
+static void cs_app_chk_rcv(struct conn_stream *cs)
+{
+ struct channel *ic = cs_ic(cs);
+
+ DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
+ __FUNCTION__,
+ cs, cs->state, ic->flags, cs_oc(cs)->flags);
+
+ if (ic->pipe) {
+ /* stop reading */
+ si_rx_room_blk(cs->si);
+ }
+ else {
+ /* (re)start reading */
+ if (!(cs->flags & CS_FL_DONT_WAKE))
+ task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
+ }
+}
+
+/* default chk_snd function for scheduled tasks */
+static void cs_app_chk_snd(struct conn_stream *cs)
+{
+ struct channel *oc = cs_oc(cs);
+
+ DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
+ __FUNCTION__,
+ cs, cs->state, cs_ic(cs)->flags, oc->flags);
+
+ if (unlikely(cs->state != CS_ST_EST || (oc->flags & CF_SHUTW)))
+ return;
+
+ if (!(cs->si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */
+ channel_is_empty(oc)) /* called with nothing to send ! */
+ return;
+
+ /* Otherwise there are remaining data to be sent in the buffer,
+ * so we tell the handler.
+ */
+ cs->si->flags &= ~SI_FL_WAIT_DATA;
+ if (!tick_isset(oc->wex))
+ oc->wex = tick_add_ifset(now_ms, oc->wto);
+
+ if (!(cs->flags & CS_FL_DONT_WAKE))
+ task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
+}
+
+/*
+ * This function performs a shutdown-read on a conn-stream attached to
+ * a connection in a connected or init state (it does nothing for other
+ * states). It either shuts the read side or marks itself as closed. The buffer
+ * flags are updated to reflect the new state. If the stream interface has
+ * CS_FL_NOHALF, we also forward the close to the write side. If a control
+ * layer is defined, then it is supposed to be a socket layer and file
+ * descriptors are then shutdown or closed accordingly. The function
+ * automatically disables polling if needed.
+ */
+static void cs_app_shutr_conn(struct conn_stream *cs)
+{
+ struct channel *ic = cs_ic(cs);
+
+ BUG_ON(!cs_conn(cs));
+
+ si_rx_shut_blk(cs->si);
+ if (ic->flags & CF_SHUTR)
+ return;
+ ic->flags |= CF_SHUTR;
+ ic->rex = TICK_ETERNITY;
+
+ if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
+ return;
+
+ if (cs_oc(cs)->flags & CF_SHUTW) {
+ cs_conn_close(cs);
+ cs->state = CS_ST_DIS;
+ __cs_strm(cs)->conn_exp = TICK_ETERNITY;
+ }
+ else if (cs->flags & CS_FL_NOHALF) {
+ /* we want to immediately forward this close to the write side */
+ return cs_app_shutw_conn(cs);
+ }
+}
+
+/*
+ * This function performs a shutdown-write on a conn-stream attached to
+ * a connection in a connected or init state (it does nothing for other
+ * states). It either shuts the write side or marks itself as closed. The
+ * buffer flags are updated to reflect the new state. It does also close
+ * everything if the SI was marked as being in error state. If there is a
+ * data-layer shutdown, it is called.
+ */
+static void cs_app_shutw_conn(struct conn_stream *cs)
+{
+ struct channel *ic = cs_ic(cs);
+ struct channel *oc = cs_oc(cs);
+
+ BUG_ON(!cs_conn(cs));
+
+ oc->flags &= ~CF_SHUTW_NOW;
+ if (oc->flags & CF_SHUTW)
+ return;
+ oc->flags |= CF_SHUTW;
+ oc->wex = TICK_ETERNITY;
+ si_done_get(cs->si);
+
+ if (tick_isset(cs->hcto)) {
+ ic->rto = cs->hcto;
+ ic->rex = tick_add(now_ms, ic->rto);
+ }
+
+ switch (cs->state) {
+ case CS_ST_RDY:
+ case CS_ST_EST:
+ /* we have to shut before closing, otherwise some short messages
+ * may never leave the system, especially when there are remaining
+ * unread data in the socket input buffer, or when nolinger is set.
+ * However, if CS_FL_NOLINGER is explicitly set, we know there is
+ * no risk so we close both sides immediately.
+ */
+
+ if (cs->endp->flags & CS_EP_ERROR) {
+ /* quick close, the socket is already shut anyway */
+ }
+ else if (cs->flags & CS_FL_NOLINGER) {
+ /* unclean data-layer shutdown, typically an aborted request
+ * or a forwarded shutdown from a client to a server due to
+ * option abortonclose. No need for the TLS layer to try to
+ * emit a shutdown message.
+ */
+ cs_conn_shutw(cs, CO_SHW_SILENT);
+ }
+ else {
+ /* clean data-layer shutdown. This only happens on the
+ * frontend side, or on the backend side when forwarding
+ * a client close in TCP mode or in HTTP TUNNEL mode
+ * while option abortonclose is set. We want the TLS
+ * layer to try to signal it to the peer before we close.
+ */
+ cs_conn_shutw(cs, CO_SHW_NORMAL);
+
+ if (!(ic->flags & (CF_SHUTR|CF_DONT_READ)))
+ return;
+ }
+
+ /* fall through */
+ case CS_ST_CON:
+ /* we may have to close a pending connection, and mark the
+ * response buffer as shutr
+ */
+ cs_conn_close(cs);
+ /* fall through */
+ case CS_ST_CER:
+ case CS_ST_QUE:
+ case CS_ST_TAR:
+ cs->state = CS_ST_DIS;
+ /* fall through */
+ default:
+ cs->flags &= ~CS_FL_NOLINGER;
+ si_rx_shut_blk(cs->si);
+ ic->flags |= CF_SHUTR;
+ ic->rex = TICK_ETERNITY;
+ __cs_strm(cs)->conn_exp = TICK_ETERNITY;
+ }
+}
+
+/* This function is used for inter-conn-stream calls. It is called by the
+ * consumer to inform the producer side that it may be interested in checking
+ * for free space in the buffer. Note that it intentionally does not update
+ * timeouts, so that we can still check them later at wake-up. This function is
+ * dedicated to connection-based stream interfaces.
+ */
+static void cs_app_chk_rcv_conn(struct conn_stream *cs)
+{
+ BUG_ON(!cs_conn(cs));
+
+ /* (re)start reading */
+ if (cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
+ tasklet_wakeup(cs->wait_event.tasklet);
+}
+
+
+/* This function is used for inter-conn-stream calls. It is called by the
+ * producer to inform the consumer side that it may be interested in checking
+ * for data in the buffer. Note that it intentionally does not update timeouts,
+ * so that we can still check them later at wake-up.
+ */
+static void cs_app_chk_snd_conn(struct conn_stream *cs)
+{
+ struct channel *oc = cs_oc(cs);
+
+ BUG_ON(!cs_conn(cs));
+
+ if (unlikely(!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST) ||
+ (oc->flags & CF_SHUTW)))
+ return;
+
+ if (unlikely(channel_is_empty(oc))) /* called with nothing to send ! */
+ return;
+
+ if (!oc->pipe && /* spliced data wants to be forwarded ASAP */
+ !(cs->si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */
+ return;
+
+ if (!(cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(cs_oc(cs)))
+ si_cs_send(cs);
+
+ if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(cs->si)) {
+ /* Write error on the file descriptor */
+ if (cs->state >= CS_ST_CON)
+ cs->endp->flags |= CS_EP_ERROR;
+ goto out_wakeup;
+ }
+
+ /* OK, so now we know that some data might have been sent, and that we may
+ * have to poll first. We have to do that too if the buffer is not empty.
+ */
+ if (channel_is_empty(oc)) {
+ /* the connection is established but we can't write. Either the
+ * buffer is empty, or we just refrain from sending because the
+ * ->o limit was reached. Maybe we just wrote the last
+ * chunk and need to close.
+ */
+ if (((oc->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) ==
+ (CF_AUTO_CLOSE|CF_SHUTW_NOW)) &&
+ cs_state_in(cs->state, CS_SB_RDY|CS_SB_EST)) {
+ cs_shutw(cs);
+ goto out_wakeup;
+ }
+
+ if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0)
+ cs->si->flags |= SI_FL_WAIT_DATA;
+ oc->wex = TICK_ETERNITY;
+ }
+ else {
+ /* Otherwise there are remaining data to be sent in the buffer,
+ * which means we have to poll before doing so.
+ */
+ cs->si->flags &= ~SI_FL_WAIT_DATA;
+ if (!tick_isset(oc->wex))
+ oc->wex = tick_add_ifset(now_ms, oc->wto);
+ }
+
+ if (likely(oc->flags & CF_WRITE_ACTIVITY)) {
+ struct channel *ic = cs_ic(cs);
+
+ /* update timeout if we have written something */
+ if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
+ !channel_is_empty(oc))
+ oc->wex = tick_add_ifset(now_ms, oc->wto);
+
+ if (tick_isset(ic->rex) && !(cs->flags & CS_FL_INDEP_STR)) {
+ /* Note: to prevent the client from expiring read timeouts
+ * during writes, we refresh it. We only do this if the
+ * interface is not configured for "independent streams",
+ * because for some applications it's better not to do this,
+ * for instance when continuously exchanging small amounts
+ * of data which can full the socket buffers long before a
+ * write timeout is detected.
+ */
+ ic->rex = tick_add_ifset(now_ms, ic->rto);
+ }
+ }
+
+ /* in case of special condition (error, shutdown, end of write...), we
+ * have to notify the task.
+ */
+ if (likely((oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR|CF_SHUTW)) ||
+ ((oc->flags & CF_WAKE_WRITE) &&
+ ((channel_is_empty(oc) && !oc->to_forward) ||
+ !cs_state_in(cs->state, CS_SB_EST))))) {
+ out_wakeup:
+ if (!(cs->flags & CS_FL_DONT_WAKE))
+ task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
+ }
+}
+
+/*
+ * This function performs a shutdown-read on a conn-stream attached to an
+ * applet in a connected or init state (it does nothing for other states). It
+ * either shuts the read side or marks itself as closed. The buffer flags are
+ * updated to reflect the new state. If the stream interface has CS_FL_NOHALF,
+ * we also forward the close to the write side. The owner task is woken up if
+ * it exists.
+ */
+static void cs_app_shutr_applet(struct conn_stream *cs)
+{
+ struct channel *ic = cs_ic(cs);
+
+ BUG_ON(!cs_appctx(cs));
+
+ si_rx_shut_blk(cs->si);
+ if (ic->flags & CF_SHUTR)
+ return;
+ ic->flags |= CF_SHUTR;
+ ic->rex = TICK_ETERNITY;
+
+ /* Note: on shutr, we don't call the applet */
+
+ if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
+ return;
+
+ if (cs_oc(cs)->flags & CF_SHUTW) {
+ cs_applet_release(cs);
+ cs->state = CS_ST_DIS;
+ __cs_strm(cs)->conn_exp = TICK_ETERNITY;
+ }
+ else if (cs->flags & CS_FL_NOHALF) {
+ /* we want to immediately forward this close to the write side */
+ return cs_app_shutw_applet(cs);
+ }
+}
+
+/*
+ * This function performs a shutdown-write on a conn-stream attached to an
+ * applet in a connected or init state (it does nothing for other states). It
+ * either shuts the write side or marks itself as closed. The buffer flags are
+ * updated to reflect the new state. It does also close everything if the SI
+ * was marked as being in error state. The owner task is woken up if it exists.
+ */
+static void cs_app_shutw_applet(struct conn_stream *cs)
+{
+ struct channel *ic = cs_ic(cs);
+ struct channel *oc = cs_oc(cs);
+
+ BUG_ON(!cs_appctx(cs));
+
+ oc->flags &= ~CF_SHUTW_NOW;
+ if (oc->flags & CF_SHUTW)
+ return;
+ oc->flags |= CF_SHUTW;
+ oc->wex = TICK_ETERNITY;
+ si_done_get(cs->si);
+
+ if (tick_isset(cs->hcto)) {
+ ic->rto = cs->hcto;
+ ic->rex = tick_add(now_ms, ic->rto);
+ }
+
+ /* on shutw we always wake the applet up */
+ appctx_wakeup(__cs_appctx(cs));
+
+ switch (cs->state) {
+ case CS_ST_RDY:
+ case CS_ST_EST:
+ /* we have to shut before closing, otherwise some short messages
+ * may never leave the system, especially when there are remaining
+ * unread data in the socket input buffer, or when nolinger is set.
+ * However, if CS_FL_NOLINGER is explicitly set, we know there is
+ * no risk so we close both sides immediately.
+ */
+ if (!(cs->endp->flags & CS_EP_ERROR) && !(cs->flags & CS_FL_NOLINGER) &&
+ !(ic->flags & (CF_SHUTR|CF_DONT_READ)))
+ return;
+
+ /* fall through */
+ case CS_ST_CON:
+ case CS_ST_CER:
+ case CS_ST_QUE:
+ case CS_ST_TAR:
+ /* Note that none of these states may happen with applets */
+ cs_applet_release(cs);
+ cs->state = CS_ST_DIS;
+ /* fall through */
+ default:
+ cs->flags &= ~CS_FL_NOLINGER;
+ si_rx_shut_blk(cs->si);
+ ic->flags |= CF_SHUTR;
+ ic->rex = TICK_ETERNITY;
+ __cs_strm(cs)->conn_exp = TICK_ETERNITY;
+ }
+}
+
+/* chk_rcv function for applets */
+static void cs_app_chk_rcv_applet(struct conn_stream *cs)
+{
+ struct channel *ic = cs_ic(cs);
+
+ BUG_ON(!cs_appctx(cs));
+
+ DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
+ __FUNCTION__,
+ cs, cs->state, ic->flags, cs_oc(cs)->flags);
+
+ if (!ic->pipe) {
+ /* (re)start reading */
+ appctx_wakeup(__cs_appctx(cs));
+ }
+}
+
+/* chk_snd function for applets */
+static void cs_app_chk_snd_applet(struct conn_stream *cs)
+{
+ struct channel *oc = cs_oc(cs);
+
+ BUG_ON(!cs_appctx(cs));
+
+ DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
+ __FUNCTION__,
+ cs, cs->state, cs_ic(cs)->flags, oc->flags);
+
+ if (unlikely(cs->state != CS_ST_EST || (oc->flags & CF_SHUTW)))
+ return;
+
+ /* we only wake the applet up if it was waiting for some data */
+
+ if (!(cs->si->flags & SI_FL_WAIT_DATA))
+ return;
+
+ if (!tick_isset(oc->wex))
+ oc->wex = tick_add_ifset(now_ms, oc->wto);
+
+ if (!channel_is_empty(oc)) {
+ /* (re)start sending */
+ appctx_wakeup(__cs_appctx(cs));
+ }
+}
diff --git a/src/stream_interface.c b/src/stream_interface.c
index f2dfbcd..e252cd5 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -41,56 +41,12 @@
DECLARE_POOL(pool_head_streaminterface, "stream_interface", sizeof(struct stream_interface));
-
-/* functions used by default on a detached conn-stream */
-static void cs_app_shutr(struct conn_stream *cs);
-static void cs_app_shutw(struct conn_stream *cs);
-static void cs_app_chk_rcv(struct conn_stream *cs);
-static void cs_app_chk_snd(struct conn_stream *cs);
-
-/* functions used on a mux-based conn-stream */
-static void cs_app_shutr_conn(struct conn_stream *cs);
-static void cs_app_shutw_conn(struct conn_stream *cs);
-static void cs_app_chk_rcv_conn(struct conn_stream *cs);
-static void cs_app_chk_snd_conn(struct conn_stream *cs);
-
-/* functions used on an applet-based conn-stream */
-static void cs_app_shutr_applet(struct conn_stream *cs);
-static void cs_app_shutw_applet(struct conn_stream *cs);
-static void cs_app_chk_rcv_applet(struct conn_stream *cs);
-static void cs_app_chk_snd_applet(struct conn_stream *cs);
-
/* last read notification */
static void stream_int_read0(struct stream_interface *si);
/* post-IO notification callback */
static void stream_int_notify(struct stream_interface *si);
-
-/* conn-stream operations for connections */
-struct cs_app_ops cs_app_conn_ops = {
- .chk_rcv = cs_app_chk_rcv_conn,
- .chk_snd = cs_app_chk_snd_conn,
- .shutr = cs_app_shutr_conn,
- .shutw = cs_app_shutw_conn,
-};
-
-/* conn-stream operations for embedded tasks */
-struct cs_app_ops cs_app_embedded_ops = {
- .chk_rcv = cs_app_chk_rcv,
- .chk_snd = cs_app_chk_snd,
- .shutr = cs_app_shutr,
- .shutw = cs_app_shutw,
-};
-
-/* conn-stream operations for connections */
-struct cs_app_ops cs_app_applet_ops = {
- .chk_rcv = cs_app_chk_rcv_applet,
- .chk_snd = cs_app_chk_snd_applet,
- .shutr = cs_app_shutr_applet,
- .shutw = cs_app_shutw_applet,
-};
-
struct data_cb si_conn_cb = {
.wake = si_cs_process,
.name = "STRM",
@@ -121,146 +77,6 @@
pool_free(pool_head_streaminterface, si);
}
-
-/*
- * This function performs a shutdown-read on a detached conn-stream in a
- * connected or init state (it does nothing for other states). It either shuts
- * the read side or marks itself as closed. The buffer flags are updated to
- * reflect the new state. If the stream interface has CS_FL_NOHALF, we also
- * forward the close to the write side. The owner task is woken up if it exists.
- */
-static void cs_app_shutr(struct conn_stream *cs)
-{
- struct channel *ic = cs_ic(cs);
-
- si_rx_shut_blk(cs->si);
- if (ic->flags & CF_SHUTR)
- return;
- ic->flags |= CF_SHUTR;
- ic->rex = TICK_ETERNITY;
-
- if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
- return;
-
- if (cs_oc(cs)->flags & CF_SHUTW) {
- cs->state = CS_ST_DIS;
- __cs_strm(cs)->conn_exp = TICK_ETERNITY;
- }
- else if (cs->flags & CS_FL_NOHALF) {
- /* we want to immediately forward this close to the write side */
- return cs_app_shutw(cs);
- }
-
- /* note that if the task exists, it must unregister itself once it runs */
- if (!(cs->flags & CS_FL_DONT_WAKE))
- task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
-}
-
-/*
- * This function performs a shutdown-write on a detached conn-stream in a
- * connected or init state (it does nothing for other states). It either shuts
- * the write side or marks itself as closed. The buffer flags are updated to
- * reflect the new state. It does also close everything if the SI was marked as
- * being in error state. The owner task is woken up if it exists.
- */
-static void cs_app_shutw(struct conn_stream *cs)
-{
- struct channel *ic = cs_ic(cs);
- struct channel *oc = cs_oc(cs);
-
- oc->flags &= ~CF_SHUTW_NOW;
- if (oc->flags & CF_SHUTW)
- return;
- oc->flags |= CF_SHUTW;
- oc->wex = TICK_ETERNITY;
- si_done_get(cs->si);
-
- if (tick_isset(cs->hcto)) {
- ic->rto = cs->hcto;
- ic->rex = tick_add(now_ms, ic->rto);
- }
-
- switch (cs->state) {
- case CS_ST_RDY:
- case CS_ST_EST:
- /* we have to shut before closing, otherwise some short messages
- * may never leave the system, especially when there are remaining
- * unread data in the socket input buffer, or when nolinger is set.
- * However, if CS_FL_NOLINGER is explicitly set, we know there is
- * no risk so we close both sides immediately.
- */
- if (!(cs->endp->flags & CS_EP_ERROR) && !(cs->flags & CS_FL_NOLINGER) &&
- !(ic->flags & (CF_SHUTR|CF_DONT_READ)))
- return;
-
- /* fall through */
- case CS_ST_CON:
- case CS_ST_CER:
- case CS_ST_QUE:
- case CS_ST_TAR:
- /* Note that none of these states may happen with applets */
- cs->state = CS_ST_DIS;
- /* fall through */
- default:
- cs->flags &= ~CS_FL_NOLINGER;
- si_rx_shut_blk(cs->si);
- ic->flags |= CF_SHUTR;
- ic->rex = TICK_ETERNITY;
- __cs_strm(cs)->conn_exp = TICK_ETERNITY;
- }
-
- /* note that if the task exists, it must unregister itself once it runs */
- if (!(cs->flags & CS_FL_DONT_WAKE))
- task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
-}
-
-/* default chk_rcv function for scheduled tasks */
-static void cs_app_chk_rcv(struct conn_stream *cs)
-{
- struct channel *ic = cs_ic(cs);
-
- DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
- __FUNCTION__,
- cs, cs->state, ic->flags, cs_oc(cs)->flags);
-
- if (ic->pipe) {
- /* stop reading */
- si_rx_room_blk(cs->si);
- }
- else {
- /* (re)start reading */
- if (!(cs->flags & CS_FL_DONT_WAKE))
- task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
- }
-}
-
-/* default chk_snd function for scheduled tasks */
-static void cs_app_chk_snd(struct conn_stream *cs)
-{
- struct channel *oc = cs_oc(cs);
-
- DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
- __FUNCTION__,
- cs, cs->state, cs_ic(cs)->flags, oc->flags);
-
- if (unlikely(cs->state != CS_ST_EST || (oc->flags & CF_SHUTW)))
- return;
-
- if (!(cs->si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */
- channel_is_empty(oc)) /* called with nothing to send ! */
- return;
-
- /* Otherwise there are remaining data to be sent in the buffer,
- * so we tell the handler.
- */
- cs->si->flags &= ~SI_FL_WAIT_DATA;
- if (!tick_isset(oc->wex))
- oc->wex = tick_add_ifset(now_ms, oc->wto);
-
- if (!(cs->flags & CS_FL_DONT_WAKE))
- task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
-}
-
/* This function is the equivalent to si_update() except that it's
* designed to be called from outside the stream handlers, typically the lower
* layers (applets, connections) after I/O completion. After updating the stream
@@ -838,235 +654,6 @@
}
/*
- * This function performs a shutdown-read on a conn-stream attached to
- * a connection in a connected or init state (it does nothing for other
- * states). It either shuts the read side or marks itself as closed. The buffer
- * flags are updated to reflect the new state. If the stream interface has
- * CS_FL_NOHALF, we also forward the close to the write side. If a control
- * layer is defined, then it is supposed to be a socket layer and file
- * descriptors are then shutdown or closed accordingly. The function
- * automatically disables polling if needed.
- */
-static void cs_app_shutr_conn(struct conn_stream *cs)
-{
- struct channel *ic = cs_ic(cs);
-
- BUG_ON(!cs_conn(cs));
-
- si_rx_shut_blk(cs->si);
- if (ic->flags & CF_SHUTR)
- return;
- ic->flags |= CF_SHUTR;
- ic->rex = TICK_ETERNITY;
-
- if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
- return;
-
- if (cs_oc(cs)->flags & CF_SHUTW) {
- cs_conn_close(cs);
- cs->state = CS_ST_DIS;
- __cs_strm(cs)->conn_exp = TICK_ETERNITY;
- }
- else if (cs->flags & CS_FL_NOHALF) {
- /* we want to immediately forward this close to the write side */
- return cs_app_shutw_conn(cs);
- }
-}
-
-/*
- * This function performs a shutdown-write on a conn-stream attached to
- * a connection in a connected or init state (it does nothing for other
- * states). It either shuts the write side or marks itself as closed. The
- * buffer flags are updated to reflect the new state. It does also close
- * everything if the SI was marked as being in error state. If there is a
- * data-layer shutdown, it is called.
- */
-static void cs_app_shutw_conn(struct conn_stream *cs)
-{
- struct channel *ic = cs_ic(cs);
- struct channel *oc = cs_oc(cs);
-
- BUG_ON(!cs_conn(cs));
-
- oc->flags &= ~CF_SHUTW_NOW;
- if (oc->flags & CF_SHUTW)
- return;
- oc->flags |= CF_SHUTW;
- oc->wex = TICK_ETERNITY;
- si_done_get(cs->si);
-
- if (tick_isset(cs->hcto)) {
- ic->rto = cs->hcto;
- ic->rex = tick_add(now_ms, ic->rto);
- }
-
- switch (cs->state) {
- case CS_ST_RDY:
- case CS_ST_EST:
- /* we have to shut before closing, otherwise some short messages
- * may never leave the system, especially when there are remaining
- * unread data in the socket input buffer, or when nolinger is set.
- * However, if CS_FL_NOLINGER is explicitly set, we know there is
- * no risk so we close both sides immediately.
- */
-
- if (cs->endp->flags & CS_EP_ERROR) {
- /* quick close, the socket is already shut anyway */
- }
- else if (cs->flags & CS_FL_NOLINGER) {
- /* unclean data-layer shutdown, typically an aborted request
- * or a forwarded shutdown from a client to a server due to
- * option abortonclose. No need for the TLS layer to try to
- * emit a shutdown message.
- */
- cs_conn_shutw(cs, CO_SHW_SILENT);
- }
- else {
- /* clean data-layer shutdown. This only happens on the
- * frontend side, or on the backend side when forwarding
- * a client close in TCP mode or in HTTP TUNNEL mode
- * while option abortonclose is set. We want the TLS
- * layer to try to signal it to the peer before we close.
- */
- cs_conn_shutw(cs, CO_SHW_NORMAL);
-
- if (!(ic->flags & (CF_SHUTR|CF_DONT_READ)))
- return;
- }
-
- /* fall through */
- case CS_ST_CON:
- /* we may have to close a pending connection, and mark the
- * response buffer as shutr
- */
- cs_conn_close(cs);
- /* fall through */
- case CS_ST_CER:
- case CS_ST_QUE:
- case CS_ST_TAR:
- cs->state = CS_ST_DIS;
- /* fall through */
- default:
- cs->flags &= ~CS_FL_NOLINGER;
- si_rx_shut_blk(cs->si);
- ic->flags |= CF_SHUTR;
- ic->rex = TICK_ETERNITY;
- __cs_strm(cs)->conn_exp = TICK_ETERNITY;
- }
-}
-
-/* This function is used for inter-conn-stream calls. It is called by the
- * consumer to inform the producer side that it may be interested in checking
- * for free space in the buffer. Note that it intentionally does not update
- * timeouts, so that we can still check them later at wake-up. This function is
- * dedicated to connection-based stream interfaces.
- */
-static void cs_app_chk_rcv_conn(struct conn_stream *cs)
-{
- BUG_ON(!cs_conn(cs));
-
- /* (re)start reading */
- if (cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
- tasklet_wakeup(cs->wait_event.tasklet);
-}
-
-
-/* This function is used for inter-conn-stream calls. It is called by the
- * producer to inform the consumer side that it may be interested in checking
- * for data in the buffer. Note that it intentionally does not update timeouts,
- * so that we can still check them later at wake-up.
- */
-static void cs_app_chk_snd_conn(struct conn_stream *cs)
-{
- struct channel *oc = cs_oc(cs);
-
- BUG_ON(!cs_conn(cs));
-
- if (unlikely(!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST) ||
- (oc->flags & CF_SHUTW)))
- return;
-
- if (unlikely(channel_is_empty(oc))) /* called with nothing to send ! */
- return;
-
- if (!oc->pipe && /* spliced data wants to be forwarded ASAP */
- !(cs->si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */
- return;
-
- if (!(cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(cs_oc(cs)))
- si_cs_send(cs);
-
- if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(cs->si)) {
- /* Write error on the file descriptor */
- if (cs->state >= CS_ST_CON)
- cs->endp->flags |= CS_EP_ERROR;
- goto out_wakeup;
- }
-
- /* OK, so now we know that some data might have been sent, and that we may
- * have to poll first. We have to do that too if the buffer is not empty.
- */
- if (channel_is_empty(oc)) {
- /* the connection is established but we can't write. Either the
- * buffer is empty, or we just refrain from sending because the
- * ->o limit was reached. Maybe we just wrote the last
- * chunk and need to close.
- */
- if (((oc->flags & (CF_SHUTW|CF_AUTO_CLOSE|CF_SHUTW_NOW)) ==
- (CF_AUTO_CLOSE|CF_SHUTW_NOW)) &&
- cs_state_in(cs->state, CS_SB_RDY|CS_SB_EST)) {
- cs_shutw(cs);
- goto out_wakeup;
- }
-
- if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0)
- cs->si->flags |= SI_FL_WAIT_DATA;
- oc->wex = TICK_ETERNITY;
- }
- else {
- /* Otherwise there are remaining data to be sent in the buffer,
- * which means we have to poll before doing so.
- */
- cs->si->flags &= ~SI_FL_WAIT_DATA;
- if (!tick_isset(oc->wex))
- oc->wex = tick_add_ifset(now_ms, oc->wto);
- }
-
- if (likely(oc->flags & CF_WRITE_ACTIVITY)) {
- struct channel *ic = cs_ic(cs);
-
- /* update timeout if we have written something */
- if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
- !channel_is_empty(oc))
- oc->wex = tick_add_ifset(now_ms, oc->wto);
-
- if (tick_isset(ic->rex) && !(cs->flags & CS_FL_INDEP_STR)) {
- /* Note: to prevent the client from expiring read timeouts
- * during writes, we refresh it. We only do this if the
- * interface is not configured for "independent streams",
- * because for some applications it's better not to do this,
- * for instance when continuously exchanging small amounts
- * of data which can full the socket buffers long before a
- * write timeout is detected.
- */
- ic->rex = tick_add_ifset(now_ms, ic->rto);
- }
- }
-
- /* in case of special condition (error, shutdown, end of write...), we
- * have to notify the task.
- */
- if (likely((oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR|CF_SHUTW)) ||
- ((oc->flags & CF_WAKE_WRITE) &&
- ((channel_is_empty(oc) && !oc->to_forward) ||
- !cs_state_in(cs->state, CS_SB_EST))))) {
- out_wakeup:
- if (!(cs->flags & CS_FL_DONT_WAKE))
- task_wakeup(cs_strm_task(cs), TASK_WOKEN_IO);
- }
-}
-
-/*
* This is the callback which is called by the connection layer to receive data
* into the buffer from the connection. It iterates over the mux layer's
* rcv_buf function.
@@ -1483,148 +1070,6 @@
appctx_wakeup(__cs_appctx(si->cs));
}
-
-/*
- * This function performs a shutdown-read on a conn-stream attached to an
- * applet in a connected or init state (it does nothing for other states). It
- * either shuts the read side or marks itself as closed. The buffer flags are
- * updated to reflect the new state. If the stream interface has CS_FL_NOHALF,
- * we also forward the close to the write side. The owner task is woken up if
- * it exists.
- */
-static void cs_app_shutr_applet(struct conn_stream *cs)
-{
- struct channel *ic = cs_ic(cs);
-
- BUG_ON(!cs_appctx(cs));
-
- si_rx_shut_blk(cs->si);
- if (ic->flags & CF_SHUTR)
- return;
- ic->flags |= CF_SHUTR;
- ic->rex = TICK_ETERNITY;
-
- /* Note: on shutr, we don't call the applet */
-
- if (!cs_state_in(cs->state, CS_SB_CON|CS_SB_RDY|CS_SB_EST))
- return;
-
- if (cs_oc(cs)->flags & CF_SHUTW) {
- cs_applet_release(cs);
- cs->state = CS_ST_DIS;
- __cs_strm(cs)->conn_exp = TICK_ETERNITY;
- }
- else if (cs->flags & CS_FL_NOHALF) {
- /* we want to immediately forward this close to the write side */
- return cs_app_shutw_applet(cs);
- }
-}
-
-/*
- * This function performs a shutdown-write on a conn-stream attached to an
- * applet in a connected or init state (it does nothing for other states). It
- * either shuts the write side or marks itself as closed. The buffer flags are
- * updated to reflect the new state. It does also close everything if the SI
- * was marked as being in error state. The owner task is woken up if it exists.
- */
-static void cs_app_shutw_applet(struct conn_stream *cs)
-{
- struct channel *ic = cs_ic(cs);
- struct channel *oc = cs_oc(cs);
-
- BUG_ON(!cs_appctx(cs));
-
- oc->flags &= ~CF_SHUTW_NOW;
- if (oc->flags & CF_SHUTW)
- return;
- oc->flags |= CF_SHUTW;
- oc->wex = TICK_ETERNITY;
- si_done_get(cs->si);
-
- if (tick_isset(cs->hcto)) {
- ic->rto = cs->hcto;
- ic->rex = tick_add(now_ms, ic->rto);
- }
-
- /* on shutw we always wake the applet up */
- appctx_wakeup(__cs_appctx(cs));
-
- switch (cs->state) {
- case CS_ST_RDY:
- case CS_ST_EST:
- /* we have to shut before closing, otherwise some short messages
- * may never leave the system, especially when there are remaining
- * unread data in the socket input buffer, or when nolinger is set.
- * However, if CS_FL_NOLINGER is explicitly set, we know there is
- * no risk so we close both sides immediately.
- */
- if (!(cs->endp->flags & CS_EP_ERROR) && !(cs->flags & CS_FL_NOLINGER) &&
- !(ic->flags & (CF_SHUTR|CF_DONT_READ)))
- return;
-
- /* fall through */
- case CS_ST_CON:
- case CS_ST_CER:
- case CS_ST_QUE:
- case CS_ST_TAR:
- /* Note that none of these states may happen with applets */
- cs_applet_release(cs);
- cs->state = CS_ST_DIS;
- /* fall through */
- default:
- cs->flags &= ~CS_FL_NOLINGER;
- si_rx_shut_blk(cs->si);
- ic->flags |= CF_SHUTR;
- ic->rex = TICK_ETERNITY;
- __cs_strm(cs)->conn_exp = TICK_ETERNITY;
- }
-}
-
-/* chk_rcv function for applets */
-static void cs_app_chk_rcv_applet(struct conn_stream *cs)
-{
- struct channel *ic = cs_ic(cs);
-
- BUG_ON(!cs_appctx(cs));
-
- DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
- __FUNCTION__,
- cs, cs->state, ic->flags, cs_oc(cs)->flags);
-
- if (!ic->pipe) {
- /* (re)start reading */
- appctx_wakeup(__cs_appctx(cs));
- }
-}
-
-/* chk_snd function for applets */
-static void cs_app_chk_snd_applet(struct conn_stream *cs)
-{
- struct channel *oc = cs_oc(cs);
-
- BUG_ON(!cs_appctx(cs));
-
- DPRINTF(stderr, "%s: cs=%p, cs->state=%d ic->flags=%08x oc->flags=%08x\n",
- __FUNCTION__,
- cs, cs->state, cs_ic(cs)->flags, oc->flags);
-
- if (unlikely(cs->state != CS_ST_EST || (oc->flags & CF_SHUTW)))
- return;
-
- /* we only wake the applet up if it was waiting for some data */
-
- if (!(cs->si->flags & SI_FL_WAIT_DATA))
- return;
-
- if (!tick_isset(oc->wex))
- oc->wex = tick_add_ifset(now_ms, oc->wto);
-
- if (!channel_is_empty(oc)) {
- /* (re)start sending */
- appctx_wakeup(__cs_appctx(cs));
- }
-}
-
/*
* Local variables:
* c-indent-level: 8