BUG/MEDIUM: stream: Save unprocessed events for a stream

A stream can be awakened for different reasons. During its processing, it can be
early stopped if no buffer is available. In this situation, the reason why the
stream was awakened is lost, because we rely on the task state, which is reset
after each processing loop.

In many cases, that's not a big deal. But it can be useful to accumulate the
task states if the stream processing is interrupted, especially if some filters
need to be called.

To be clearer, here is an simple example:

  1) A stream is awakened with the reason TASK_WOKEN_MSG.

  2) Because no buffer is available, the processing is interrupted, the stream
  is back to sleep. And the task state is reset.

  3) Some buffers become available, so the stream is awakened with the reason
  TASK_WOKEN_RES. At this step, the previous reason (TASK_WOKEN_MSG) is lost.

Now, the task states are saved for a stream and reset only when the stream
processing is not interrupted. The correspoing bitfield represents the pending
events for a stream. And we use this one instead of the task state during the
stream processing.

Note that TASK_WOKEN_TIMER and TASK_WOKEN_RES are always removed because these
events are always handled during the stream processing.

[wt: backport to 1.7 and 1.6]
diff --git a/src/stream.c b/src/stream.c
index 055cc23..db8702d 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -145,6 +145,7 @@
 	s->unique_id = NULL;
 
 	s->task = t;
+	s->pending_events = 0;
 	t->process = process_stream;
 	t->context = s;
 	t->expire = TICK_ETERNITY;
@@ -1584,10 +1585,13 @@
 	si_f->flags |= SI_FL_DONT_WAKE;
 	si_b->flags |= SI_FL_DONT_WAKE;
 
+	/* update pending events */
+	s->pending_events |= (t->state & TASK_WOKEN_ANY);
+
 	/* 1a: Check for low level timeouts if needed. We just set a flag on
 	 * stream interfaces when their timeouts have expired.
 	 */
-	if (unlikely(t->state & TASK_WOKEN_TIMER)) {
+	if (unlikely(s->pending_events & TASK_WOKEN_TIMER)) {
 		stream_int_check_timeouts(si_f);
 		stream_int_check_timeouts(si_b);
 
@@ -1635,7 +1639,7 @@
 		      (CF_SHUTR|CF_READ_ACTIVITY|CF_READ_TIMEOUT|CF_SHUTW|
 		       CF_WRITE_ACTIVITY|CF_WRITE_TIMEOUT|CF_ANA_TIMEOUT)) &&
 		    !((si_f->flags | si_b->flags) & (SI_FL_EXP|SI_FL_ERR)) &&
-		    ((t->state & TASK_WOKEN_ANY) == TASK_WOKEN_TIMER)) {
+		    ((s->pending_events & TASK_WOKEN_ANY) == TASK_WOKEN_TIMER)) {
 			si_f->flags &= ~SI_FL_DONT_WAKE;
 			si_b->flags &= ~SI_FL_DONT_WAKE;
 			goto update_exp_and_leave;
@@ -1769,7 +1773,7 @@
 	    ((req->flags ^ rqf_last) & CF_MASK_STATIC) ||
 	    si_f->state != rq_prod_last ||
 	    si_b->state != rq_cons_last ||
-	    s->task->state & TASK_WOKEN_MSG) {
+	    s->pending_events & TASK_WOKEN_MSG) {
 		unsigned int flags = req->flags;
 
 		if (si_f->state >= SI_ST_EST) {
@@ -1868,7 +1872,7 @@
 		 (res->flags ^ rpf_last) & CF_MASK_STATIC ||
 		 si_f->state != rp_cons_last ||
 		 si_b->state != rp_prod_last ||
-		 s->task->state & TASK_WOKEN_MSG) {
+		 s->pending_events & TASK_WOKEN_MSG) {
 		unsigned int flags = res->flags;
 
 		if ((res->flags & CF_MASK_ANALYSER) &&
@@ -2369,6 +2373,9 @@
 			req->rex = TICK_ETERNITY;
 		}
 
+		/* Reset pending events now */
+		s->pending_events = 0;
+
 	update_exp_and_leave:
 		/* Note: please ensure that if you branch here you disable SI_FL_DONT_WAKE */
 		t->expire = tick_first((tick_is_expired(t->expire, now_ms) ? 0 : t->expire),
@@ -2402,6 +2409,7 @@
 		if (!tick_isset(t->expire))
 			ABORT_NOW();
 #endif
+		s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
 		stream_release_buffers(s);
 		return t; /* nothing more to do */
 	}