MEDIUM: spoe: Use an ebtree to manage idle applets

Instead of using a list of applets with idle ones in front, we now use an
ebtree. Applets in the tree are idle by definition, and the key is the applet's
weight. When a new frame is queued, the first idle applet (the one with the
lowest weight) is woken up and its weight is increased by one. When an applet
sends a frame to a SPOA, its weight is decremented by one.

This is empirical, but it should avoid overusing a small number of applets
and improve the balancing between idle applets.
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index ae34c3b..5848fdc 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -46,8 +46,10 @@
 
 #if defined(DEBUG_SPOE) || defined(DEBUG_FULL)
 #define SPOE_PRINTF(x...) fprintf(x)
+#define SPOE_DEBUG_STMT(statement) statement
 #else
 #define SPOE_PRINTF(x...)
+#define SPOE_DEBUG_STMT(statement)
 #endif
 
 /* Reserved 4 bytes to the frame size. So a frame and its size can be written
@@ -1212,16 +1214,20 @@
 		    __FUNCTION__, appctx);
 
 	/* Remove applet from the list of running applets */
-	agent->rt[tid].applets_act--;
+	SPOE_DEBUG_STMT(agent->rt[tid].applets_act--);
+	HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 	if (!LIST_ISEMPTY(&spoe_appctx->list)) {
 		LIST_DEL(&spoe_appctx->list);
 		LIST_INIT(&spoe_appctx->list);
 	}
+	HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 
 	/* Shutdown the server connection, if needed */
 	if (appctx->st0 != SPOE_APPCTX_ST_END) {
-		if (appctx->st0 == SPOE_APPCTX_ST_IDLE)
-			agent->rt[tid].applets_idle--;
+		if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
+			eb32_delete(&spoe_appctx->node);
+			SPOE_DEBUG_STMT(agent->rt[tid].applets_idle--);
+		}
 
 		appctx->st0 = SPOE_APPCTX_ST_END;
 		if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE)
@@ -1398,12 +1404,10 @@
 		default:
 			/* HELLO handshake is finished, set the idle timeout and
 			 * add the applet in the list of running applets. */
-			agent->rt[tid].applets_idle++;
+			SPOE_DEBUG_STMT(agent->rt[tid].applets_idle++);
 			appctx->st0 = SPOE_APPCTX_ST_IDLE;
-			HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
-			LIST_DEL(&SPOE_APPCTX(appctx)->list);
-			LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
-			HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
+			SPOE_APPCTX(appctx)->node.key = 0;
+			eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node);
 
 			/* Update runtinme agent info */
 			HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, SPOE_APPCTX(appctx)->max_frame_size);
@@ -1474,6 +1478,7 @@
 			ctx->state = SPOE_CTX_ST_ERROR;
 			ctx->status_code = (SPOE_APPCTX(appctx)->status_code + 0x100);
 			task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
+			*skip = 1;
 			break;
 
 		case 1: /* retry */
@@ -1491,15 +1496,14 @@
 			if (!(ctx->flags & SPOE_CTX_FL_FRAGMENTED) ||
 			    (ctx->frag_ctx.flags & SPOE_FRM_FL_FIN))
 				goto no_frag_frame_sent;
-			else {
-				*skip = 1;
+			else
 				goto frag_frame_sent;
-			}
 	}
 	goto end;
 
   frag_frame_sent:
 	appctx->st0 = SPOE_APPCTX_ST_SENDING_FRAG_NOTIFY;
+	*skip = 1;
 	SPOE_APPCTX(appctx)->frag_ctx.ctx    = ctx;
 	SPOE_APPCTX(appctx)->frag_ctx.cursid = ctx->stream_id;
 	SPOE_APPCTX(appctx)->frag_ctx.curfid = ctx->frame_id;
@@ -1518,6 +1522,7 @@
 	}
 	else {
 		appctx->st0 = SPOE_APPCTX_ST_WAITING_SYNC_ACK;
+		*skip = 1;
 		LIST_ADDQ(&SPOE_APPCTX(appctx)->waiting_queue, &ctx->list);
 	}
 	SPOE_APPCTX(appctx)->frag_ctx.ctx    = NULL;
@@ -1552,6 +1557,7 @@
 	if (ret > 1) {
 		if (*frame == SPOE_FRM_T_AGENT_DISCON) {
 			appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING;
+			ret = -1;
 			goto end;
 		}
 		trash.len = ret + 4;
@@ -1616,13 +1622,12 @@
 		goto next;
 	}
 
-
 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p"
-		    " - process: fpa=%u/%u - appctx-state=%s - flags=0x%08x\n",
+		    " - process: fpa=%u/%u - appctx-state=%s - weight=%u - flags=0x%08x\n",
 		    (int)now.tv_sec, (int)now.tv_usec, agent->id,
 		    __FUNCTION__, appctx, SPOE_APPCTX(appctx)->cur_fpa,
 		    agent->max_fpa, spoe_appctx_state_str[appctx->st0],
-		    SPOE_APPCTX(appctx)->flags);
+		    SPOE_APPCTX(appctx)->node.key, SPOE_APPCTX(appctx)->flags);
 
 	if (appctx->st0 == SPOE_APPCTX_ST_WAITING_SYNC_ACK)
 		skip_sending = 1;
@@ -1655,6 +1660,8 @@
 				goto next;
 
 			case 0: /* ignore */
+				if (SPOE_APPCTX(appctx)->node.key)
+					SPOE_APPCTX(appctx)->node.key--;
 				active_s++;
 				break;
 
@@ -1662,23 +1669,22 @@
 				break;
 
 			default:
+				if (SPOE_APPCTX(appctx)->node.key)
+					SPOE_APPCTX(appctx)->node.key--;
 				active_s++;
 				break;
 		}
 	}
 
 	if (active_s || active_r) {
-		HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
-		LIST_DEL(&SPOE_APPCTX(appctx)->list);
-		LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
-		HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
-
 		update_freq_ctr(&agent->rt[tid].processing_per_sec, active_s);
 		SPOE_APPCTX(appctx)->task->expire = tick_add_ifset(now_ms, agent->timeout.idle);
 	}
+
 	if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING && SPOE_APPCTX(appctx)->cur_fpa < agent->max_fpa) {
+		SPOE_DEBUG_STMT(agent->rt[tid].applets_idle++);
 		appctx->st0 = SPOE_APPCTX_ST_IDLE;
-		agent->rt[tid].applets_idle++;
+		eb32_insert(&agent->rt[tid].idle_applets, &SPOE_APPCTX(appctx)->node);
 	}
 	return 1;
 
@@ -1847,7 +1853,8 @@
 				appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
 				goto switchstate;
 			}
-			agent->rt[tid].applets_idle--;
+			SPOE_DEBUG_STMT(agent->rt[tid].applets_idle--);
+			eb32_delete(&SPOE_APPCTX(appctx)->node);
 			appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
 			/* fall through */
 
@@ -1955,7 +1962,7 @@
 	HA_SPIN_LOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
 	LIST_ADDQ(&conf->agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
 	HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
-	conf->agent->rt[tid].applets_act++;
+	SPOE_DEBUG_STMT(conf->agent->rt[tid].applets_act++);
 
 	task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
 	task_wakeup(strm->task, TASK_WOKEN_INIT);
@@ -1983,7 +1990,7 @@
 	struct spoe_appctx *spoe_appctx;
 
 	/* Check if we need to create a new SPOE applet or not. */
-	if (agent->rt[tid].applets_idle &&
+	if (!eb_is_empty(&agent->rt[tid].idle_applets) &&
 	    agent->rt[tid].processing < read_freq_ctr(&agent->rt[tid].processing_per_sec))
 		goto end;
 
@@ -2048,17 +2055,17 @@
 		    ctx->strm, agent->rt[tid].applets_act, agent->rt[tid].applets_idle,
 		    agent->rt[tid].processing);
 
-	/* Finally try to wakeup the first IDLE applet found and move it at the
-	 * end of the list. */
-	list_for_each_entry(spoe_appctx, &agent->rt[tid].applets, list) {
-		appctx = spoe_appctx->owner;
-		if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
-			spoe_wakeup_appctx(appctx);
-			HA_SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
-			LIST_DEL(&spoe_appctx->list);
-			LIST_ADDQ(&agent->rt[tid].applets, &spoe_appctx->list);
-			HA_SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
-			break;
+	/* Finally try to wakeup an IDLE applet. */
+	if (!eb_is_empty(&agent->rt[tid].idle_applets)) {
+		struct eb32_node *node;
+
+		node = eb32_first(&agent->rt[tid].idle_applets);
+		spoe_appctx = eb32_entry(node, struct spoe_appctx, node);
+		if (node && spoe_appctx) {
+			eb32_delete(&spoe_appctx->node);
+			spoe_appctx->node.key++;
+			eb32_insert(&agent->rt[tid].idle_applets, &spoe_appctx->node);
+			spoe_wakeup_appctx(spoe_appctx->owner);
 		}
 	}
 	return 1;
@@ -3169,7 +3176,7 @@
 		curagent->cps_max        = 0;
 		curagent->eps_max        = 0;
 		curagent->max_frame_size = MAX_FRAME_SIZE;
-		curagent->max_fpa        = 100;
+		curagent->max_fpa        = 20;
 
 		for (i = 0; i < SPOE_EV_EVENTS; ++i)
 			LIST_INIT(&curagent->events[i]);
@@ -3183,8 +3190,8 @@
 		}
 		for (i = 0; i < global.nbthread; ++i) {
 			curagent->rt[i].frame_size   = curagent->max_frame_size;
-			curagent->rt[i].applets_act  = 0;
-			curagent->rt[i].applets_idle = 0;
+			SPOE_DEBUG_STMT(curagent->rt[i].applets_act  = 0);
+			SPOE_DEBUG_STMT(curagent->rt[i].applets_idle = 0);
 			curagent->rt[i].processing   = 0;
 			LIST_INIT(&curagent->rt[i].applets);
 			LIST_INIT(&curagent->rt[i].sending_queue);