BUG/MEDIUM: proxy/sktable: prevent watchdog trigger on soft-stop

During soft-stop, manage_proxy() (p->task) will try to purge
trashable (expired and not referenced) sticktable entries,
effectively releasing the process memory to leave some space
for new processes.

This is done by calling stktable_trash_oldest(), immediately
followed by a pool_gc() to give the memory back to the OS.

As already mentioned in dfe7925 ("BUG/MEDIUM: stick-table:
limit the time spent purging old entries"), calling
stktable_trash_oldest() with a huge batch can result in the function
spending too much time searching and purging entries, and ultimately
triggering the watchdog.

Lately, an internal issue was reported in which we could see
that the watchdog is being triggered in stktable_trash_oldest()
on soft-stop (thus initiated by manage_proxy())

According to the report, the crash seems to only occur since 5938021
("BUG/MEDIUM: stick-table: do not leave entries in end of window during purge")

This could be the result of stktable_trash_oldest() now working
as expected, and thus spending a large amount of time purging
entries when called with a large enough <to_batch>.

Instead of adding new checks in stktable_trash_oldest(), here we
chose to address the issue directly in manage_proxy().

Since the stktable_trash_oldest() function is called with
<to_batch> == <p->table->current>, it's pretty obvious that it could
cause some issues during soft-stop if a large table, assuming it is
full prior to the soft-stop, suddenly sees most of its entries
becoming trashable because of the soft-stop.

Moreover, we should note that the call to stktable_trash_oldest() is
immediately followed by a call to pool_gc():

We know for sure that pool_gc(), as it involves malloc_trim() on
glibc, is rather expensive, and the more memory to reclaim,
the longer the call.

We need to ensure that both stktable_trash_oldest() + consequent
pool_gc() call both theoretically fit in a single task execution window
to avoid contention, and thus prevent the watchdog from being triggered.

To do this, we now allocate a "budget" for each purging attempt.
budget is maxed out to 32K, it means that each sticktable cleanup
attempt will trash at most 32K entries.

32K value is quite arbitrary here, and might need to be adjusted or
even deducted from other parameters if this fails to properly address
the issue without introducing new side-effects.
The goal is to find a good balance between the max duration of each
cleanup batch and the frequency of (expensive) pool_gc() calls.

If most of the budget is actually spent trashing entries, then the task
will immediately be rescheduled to continue the purge.
This way, the purge is effectively batched over multiple task runs.

This may be slowly backported to all stable versions.
[Please note that this commit depends on 6e1fe25 ("MINOR: proxy/pool:
prevent unnecessary calls to pool_gc()")]
diff --git a/src/proxy.c b/src/proxy.c
index 1e4e36c..cb3d474 100644
--- a/src/proxy.c
+++ b/src/proxy.c
@@ -2003,11 +2003,40 @@
 			 * to push to a new process and
 			 * we are free to flush the table.
 			 */
-			if (stktable_trash_oldest(p->table, p->table->current))
+			int budget;
+			int cleaned_up;
+
+			/* We purposely enforce a budget limitation since we don't want
+			 * to spend too much time purging old entries
+			 *
+			 * This is known to cause the watchdog to occasionnaly trigger if
+			 * the table is huge and all entries become available for purge
+			 * at the same time
+			 *
+			 * Moreover, we must also anticipate the pool_gc() call which
+			 * will also be much slower if there is too much work at once
+			 */
+			budget = MIN(p->table->current, (1 << 15)); /* max: 32K */
+			cleaned_up = stktable_trash_oldest(p->table, budget);
+			if (cleaned_up) {
+				/* immediately release freed memory since we are stopping */
 				pool_gc(NULL);
+				if (cleaned_up > (budget / 2)) {
+					/* most of the budget was used to purge entries,
+					 * it is very likely that there are still trashable
+					 * entries in the table, reschedule a new cleanup
+					 * attempt ASAP
+					 */
+					t->expire = TICK_ETERNITY;
+					task_wakeup(t, TASK_WOKEN_RES);
+					return t;
+				}
+			}
 		}
 		if (p->table->current) {
-			/* some entries still remain, let's recheck in one second */
+			/* some entries still remain but are not yet available
+			 * for cleanup, let's recheck in one second
+			 */
 			next = tick_first(next, tick_add(now_ms, 1000));
 		}
 	}