MINOR: stream-int: implement the stream_int_notify() function
stream_int_notify() was taken from the common part between si_conn_wake_cb()
and si_applet_done(). It is designed to report activity to a stream from
outside its handler. It'll generally be used by lower layers to report I/O
completion but may also be used by remote streams if the buffer processing
is shared.
diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h
index d1d75ba..22d05be 100644
--- a/include/proto/stream_interface.h
+++ b/include/proto/stream_interface.h
@@ -51,6 +51,7 @@
void stream_int_update(struct stream_interface *si);
void stream_int_update_conn(struct stream_interface *si);
void stream_int_update_applet(struct stream_interface *si);
+void stream_int_notify(struct stream_interface *si);
/* returns the channel which receives data from this stream interface (input channel) */
static inline struct channel *si_ic(struct stream_interface *si)
diff --git a/src/stream_interface.c b/src/stream_interface.c
index 1da5248..dbc481f 100644
--- a/src/stream_interface.c
+++ b/src/stream_interface.c
@@ -519,6 +519,115 @@
return 0;
}
+/* This function is the equivalent to stream_int_update() except that it's
+ * designed to be called from outside the stream handlers, typically the lower
+ * layers (applets, connections) after I/O completion. After updating the stream
+ * interface and timeouts, it will try to forward what can be forwarded, then to
+ * wake the associated task up if an important event requires special handling.
+ * It should not be called from within the stream itself, stream_int_update()
+ * is designed for this.
+ */
+void stream_int_notify(struct stream_interface *si)
+{
+ struct channel *ic = si_ic(si);
+ struct channel *oc = si_oc(si);
+
+ /* process consumer side */
+ if (channel_is_empty(oc)) {
+ if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) &&
+ (si->state == SI_ST_EST))
+ si_shutw(si);
+ oc->wex = TICK_ETERNITY;
+ }
+
+ /* indicate that we may be waiting for data from the output channel */
+ if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc))
+ si->flags |= SI_FL_WAIT_DATA;
+
+ /* update OC timeouts and wake the other side up if it's waiting for room */
+ if (oc->flags & CF_WRITE_ACTIVITY) {
+ if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
+ !channel_is_empty(oc))
+ if (tick_isset(oc->wex))
+ oc->wex = tick_add_ifset(now_ms, oc->wto);
+
+ if (!(si->flags & SI_FL_INDEP_STR))
+ if (tick_isset(ic->rex))
+ ic->rex = tick_add_ifset(now_ms, ic->rto);
+
+ if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL &&
+ channel_may_recv(oc) &&
+ (si_opposite(si)->flags & SI_FL_WAIT_ROOM)))
+ si_chk_rcv(si_opposite(si));
+ }
+
+ /* Notify the other side when we've injected data into the IC that
+ * needs to be forwarded. We can do fast-forwarding as soon as there
+ * are output data, but we avoid doing this if some of the data are
+ * not yet scheduled for being forwarded, because it is very likely
+ * that it will be done again immediately afterwards once the following
+ * data are parsed (eg: HTTP chunking). We only SI_FL_WAIT_ROOM once
+ * we've emptied *some* of the output buffer, and not just when there
+ * is available room, because applets are often forced to stop before
+ * the buffer is full. We must not stop based on input data alone because
+ * an HTTP parser might need more data to complete the parsing.
+ */
+ if (!channel_is_empty(ic) &&
+ (si_opposite(si)->flags & SI_FL_WAIT_DATA) &&
+ (ic->buf->i == 0 || ic->pipe)) {
+ int new_len, last_len;
+
+ last_len = ic->buf->o;
+ if (ic->pipe)
+ last_len += ic->pipe->data;
+
+ si_chk_snd(si_opposite(si));
+
+ new_len = ic->buf->o;
+ if (ic->pipe)
+ new_len += ic->pipe->data;
+
+ /* check if the consumer has freed some space either in the
+ * buffer or in the pipe.
+ */
+ if (channel_may_recv(ic) && new_len < last_len)
+ si->flags &= ~SI_FL_WAIT_ROOM;
+ }
+
+ if (si->flags & SI_FL_WAIT_ROOM) {
+ ic->rex = TICK_ETERNITY;
+ }
+ else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
+ channel_may_recv(ic)) {
+ /* we must re-enable reading if si_chk_snd() has freed some space */
+ if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex))
+ ic->rex = tick_add_ifset(now_ms, ic->rto);
+ }
+
+ /* wake the task up only when needed */
+ if (/* changes on the production side */
+ (ic->flags & (CF_READ_NULL|CF_READ_ERROR)) ||
+ si->state != SI_ST_EST ||
+ (si->flags & SI_FL_ERR) ||
+ ((ic->flags & CF_READ_PARTIAL) &&
+ (!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) ||
+
+ /* changes on the consumption side */
+ (oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
+ ((oc->flags & CF_WRITE_ACTIVITY) &&
+ ((oc->flags & CF_SHUTW) ||
+ ((oc->flags & CF_WAKE_WRITE) &&
+ (si_opposite(si)->state != SI_ST_EST ||
+ (channel_is_empty(oc) && !oc->to_forward)))))) {
+ task_wakeup(si_task(si), TASK_WOKEN_IO);
+ }
+ if (ic->flags & CF_READ_ACTIVITY)
+ ic->flags &= ~CF_READ_DONTWAIT;
+
+ stream_release_buffers(si_strm(si));
+}
+
+
/* Callback to be used by connection I/O handlers upon completion. It differs from
* the update function in that it is designed to be called by lower layers after I/O
* events have been completed. It will also try to wake the associated task up if