MEDIUM: thread/spoe: Make the SPOE thread-safe

Because there is no migration mechanism yet, all runtime information about an
SPOE agent is thread-local and async exchanges with agents are disabled when we
have several threads. However, pipelining is still available. So for now, the
thread part of the SPOE is pretty simple.
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 7fc4ed8..938faab 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -153,6 +153,7 @@
 {
 	struct spoe_message *msg, *msgback;
 	struct spoe_group   *grp, *grpback;
+	int                  i;
 
 	if (!agent)
 		return;
@@ -169,6 +170,9 @@
 		LIST_DEL(&grp->list);
 		spoe_release_group(grp);
 	}
+	for (i = 0; i < global.nbthread; ++i)
+		SPIN_DESTROY(&agent->rt[i].lock);
+	free(agent->rt);
 	free(agent);
 }
 
@@ -974,7 +978,7 @@
 
 	/* Try to find the corresponding SPOE context */
 	if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
-		list_for_each_entry((*ctx), &agent->waiting_queue, list) {
+		list_for_each_entry((*ctx), &agent->rt[tid].waiting_queue, list) {
 			if ((*ctx)->stream_id == (unsigned int)stream_id &&
 			    (*ctx)->frame_id  == (unsigned int)frame_id)
 				goto found;
@@ -1234,7 +1238,7 @@
 		    __FUNCTION__, appctx);
 
 	/* Remove applet from the list of running applets */
-	agent->applets_act--;
+	agent->rt[tid].applets_act--;
 	if (!LIST_ISEMPTY(&spoe_appctx->list)) {
 		LIST_DEL(&spoe_appctx->list);
 		LIST_INIT(&spoe_appctx->list);
@@ -1243,7 +1247,7 @@
 	/* Shutdown the server connection, if needed */
 	if (appctx->st0 != SPOE_APPCTX_ST_END) {
 		if (appctx->st0 == SPOE_APPCTX_ST_IDLE)
-			agent->applets_idle--;
+			agent->rt[tid].applets_idle--;
 
 		appctx->st0 = SPOE_APPCTX_ST_END;
 		if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE)
@@ -1284,18 +1288,18 @@
 			    &spoe_appctx->buffer_wait);
 	pool_free2(pool2_spoe_appctx, spoe_appctx);
 
-	if (!LIST_ISEMPTY(&agent->applets))
+	if (!LIST_ISEMPTY(&agent->rt[tid].applets))
 		goto end;
 
 	/* If this was the last running applet, notify all waiting streams */
-	list_for_each_entry_safe(ctx, back, &agent->sending_queue, list) {
+	list_for_each_entry_safe(ctx, back, &agent->rt[tid].sending_queue, list) {
 		LIST_DEL(&ctx->list);
 		LIST_INIT(&ctx->list);
 		ctx->state = SPOE_CTX_ST_ERROR;
 		ctx->status_code = (spoe_appctx->status_code + 0x100);
 		task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 	}
-	list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
+	list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) {
 		LIST_DEL(&ctx->list);
 		LIST_INIT(&ctx->list);
 		ctx->state = SPOE_CTX_ST_ERROR;
@@ -1305,10 +1309,9 @@
 
   end:
 	/* Update runtinme agent info */
-	agent->frame_size = agent->max_frame_size;
-	list_for_each_entry(spoe_appctx, &agent->applets, list)
-		agent->frame_size = MIN(spoe_appctx->max_frame_size,
-					agent->frame_size);
+	agent->rt[tid].frame_size = agent->max_frame_size;
+	list_for_each_entry(spoe_appctx, &agent->rt[tid].applets, list)
+		HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, spoe_appctx->max_frame_size);
 }
 
 static int
@@ -1421,14 +1424,15 @@
 		default:
 			/* HELLO handshake is finished, set the idle timeout and
 			 * add the applet in the list of running applets. */
-			agent->applets_idle++;
+			agent->rt[tid].applets_idle++;
 			appctx->st0 = SPOE_APPCTX_ST_IDLE;
+			SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 			LIST_DEL(&SPOE_APPCTX(appctx)->list);
-			LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list);
+			LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
+			SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 
 			/* Update runtinme agent info */
-			agent->frame_size = MIN(SPOE_APPCTX(appctx)->max_frame_size,
-						agent->frame_size);
+			HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, SPOE_APPCTX(appctx)->max_frame_size);
 			goto next;
 	}
 
@@ -1465,13 +1469,13 @@
 		ret = spoe_prepare_hafrag_frame(appctx, ctx, frame,
 						SPOE_APPCTX(appctx)->max_frame_size);
 	}
-	else if (LIST_ISEMPTY(&agent->sending_queue)) {
+	else if (LIST_ISEMPTY(&agent->rt[tid].sending_queue)) {
 		*skip = 1;
 		ret   = 1;
 		goto end;
 	}
 	else {
-		ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
+		ctx = LIST_NEXT(&agent->rt[tid].sending_queue, typeof(ctx), list);
 		ret = spoe_prepare_hanotify_frame(appctx, ctx, frame,
 						  SPOE_APPCTX(appctx)->max_frame_size);
 
@@ -1532,7 +1536,7 @@
   no_frag_frame_sent:
 	if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
 		appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
-		LIST_ADDQ(&agent->waiting_queue, &ctx->list);
+		LIST_ADDQ(&agent->rt[tid].waiting_queue, &ctx->list);
 	}
 	else if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) {
 		appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
@@ -1660,7 +1664,7 @@
 			goto next;
 
 		case 0: /* ignore */
-			agent->sending_rate++;
+			agent->rt[tid].sending_rate++;
 			fpa++;
 			break;
 
@@ -1668,7 +1672,7 @@
 			break;
 
 		default:
-			agent->sending_rate++;
+			agent->rt[tid].sending_rate++;
 			fpa++;
 			break;
 	}
@@ -1703,11 +1707,13 @@
   stop:
 	if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING) {
 		appctx->st0 = SPOE_APPCTX_ST_IDLE;
-		agent->applets_idle++;
+		agent->rt[tid].applets_idle++;
 	}
 	if (fpa || (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PERSIST)) {
+		SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 		LIST_DEL(&SPOE_APPCTX(appctx)->list);
-		LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list);
+		LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
+		SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 		if (fpa)
 			SPOE_APPCTX(appctx)->task->expire =
 				tick_add_ifset(now_ms, agent->timeout.idle);
@@ -1868,14 +1874,14 @@
 
 		case SPOE_APPCTX_ST_IDLE:
 			if (stopping &&
-			    LIST_ISEMPTY(&agent->sending_queue) &&
+			    LIST_ISEMPTY(&agent->rt[tid].sending_queue) &&
 			    LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
 				SPOE_APPCTX(appctx)->task->expire =
 					tick_add_ifset(now_ms, agent->timeout.idle);
 				appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
 				goto switchstate;
 			}
-			agent->applets_idle--;
+			agent->rt[tid].applets_idle--;
 			appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
 			/* fall through */
 
@@ -1909,6 +1915,9 @@
 			return;
 	}
   out:
+	if (stopping)
+		spoe_wakeup_appctx(appctx);
+
 	if (SPOE_APPCTX(appctx)->task->expire != TICK_ETERNITY)
 		task_queue(SPOE_APPCTX(appctx)->task);
 	si_oc(si)->flags |= CF_READ_DONTWAIT;
@@ -1940,7 +1949,7 @@
 	memset(appctx->ctx.spoe.ptr, 0, pool2_spoe_appctx->size);
 
 	appctx->st0 = SPOE_APPCTX_ST_CONNECT;
-	if ((SPOE_APPCTX(appctx)->task = task_new(MAX_THREADS_MASK)) == NULL)
+	if ((SPOE_APPCTX(appctx)->task = task_new(1UL << tid)) == NULL)
 		goto out_free_spoe_appctx;
 
 	SPOE_APPCTX(appctx)->owner           = appctx;
@@ -1976,8 +1985,10 @@
 	strm->do_log = NULL;
 	strm->res.flags |= CF_READ_DONTWAIT;
 
-	LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list);
-	conf->agent->applets_act++;
+	SPIN_LOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
+	LIST_ADDQ(&conf->agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
+	SPIN_UNLOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
+	conf->agent->rt[tid].applets_act++;
 
 	task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
 	task_wakeup(strm->task, TASK_WOKEN_INIT);
@@ -2008,9 +2019,9 @@
 	min_applets = min_applets_act(agent);
 
 	/* Check if we need to create a new SPOE applet or not. */
-	if (agent->applets_act >= min_applets &&
-	    agent->applets_idle &&
-	    agent->sending_rate)
+	if (agent->rt[tid].applets_act >= min_applets &&
+	    agent->rt[tid].applets_idle &&
+	    agent->rt[tid].sending_rate)
 		goto end;
 
 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
@@ -2031,7 +2042,7 @@
 	/* Do not try to create a new applet if we have reached the maximum of
 	 * connection per seconds */
 	if (agent->cps_max > 0) {
-		if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0)) {
+		if (!freq_ctr_remain(&agent->rt[tid].conn_per_sec, agent->cps_max, 0)) {
 			SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 				    " - cannot create SPOE appctx: max CPS reached\n",
 				    (int)now.tv_sec, (int)now.tv_usec, agent->id,
@@ -2052,41 +2063,43 @@
 
 		goto end;
 	}
-	if (agent->applets_act <= min_applets)
+	if (agent->rt[tid].applets_act <= min_applets)
 		SPOE_APPCTX(appctx)->flags |= SPOE_APPCTX_FL_PERSIST;
 
 	/* Increase the per-process number of cumulated connections */
 	if (agent->cps_max > 0)
-		update_freq_ctr(&agent->conn_per_sec, 1);
+		update_freq_ctr(&agent->rt[tid].conn_per_sec, 1);
 
   end:
 	/* The only reason to return an error is when there is no applet */
-	if (LIST_ISEMPTY(&agent->applets)) {
+	if (LIST_ISEMPTY(&agent->rt[tid].applets)) {
 		ctx->status_code = SPOE_CTX_ERR_RES;
 		return -1;
 	}
 
 	/* Add the SPOE context in the sending queue and update all running
 	 * info */
-	LIST_ADDQ(&agent->sending_queue, &ctx->list);
-	if (agent->sending_rate)
-		agent->sending_rate--;
+	LIST_ADDQ(&agent->rt[tid].sending_queue, &ctx->list);
+	if (agent->rt[tid].sending_rate)
+		agent->rt[tid].sending_rate--;
 
 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 		    " - Add stream in sending queue"
 		    " - applets_act=%u - applets_idle=%u - sending_rate=%u\n",
 		    (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
-		    ctx->strm, agent->applets_act, agent->applets_idle,
-		    agent->sending_rate);
+		    ctx->strm, agent->rt[tid].applets_act, agent->rt[tid].applets_idle,
+		    agent->rt[tid].sending_rate);
 
 	/* Finally try to wakeup the first IDLE applet found and move it at the
 	 * end of the list. */
-	list_for_each_entry(spoe_appctx, &agent->applets, list) {
+	list_for_each_entry(spoe_appctx, &agent->rt[tid].applets, list) {
 		appctx = spoe_appctx->owner;
 		if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
 			spoe_wakeup_appctx(appctx);
+			SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 			LIST_DEL(&spoe_appctx->list);
-			LIST_ADDQ(&agent->applets, &spoe_appctx->list);
+			LIST_ADDQ(&agent->rt[tid].applets, &spoe_appctx->list);
+			SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 			break;
 		}
 	}
@@ -2189,7 +2202,7 @@
 	char   *p, *end;
 
 	p   = ctx->buffer->p;
-	end =  p + agent->frame_size - FRAME_HDR_SIZE;
+	end =  p + agent->rt[tid].frame_size - FRAME_HDR_SIZE;
 
 	if (type == SPOE_MSGS_BY_EVENT) { /* Loop on messages by event */
 		/* Resume encoding of a SPOE message */
@@ -2239,7 +2252,7 @@
 		    (int)now.tv_sec, (int)now.tv_usec,
 		    agent->id, __FUNCTION__, s,
 		    ((ctx->flags & SPOE_CTX_FL_FRAGMENTED) ? "last fragment of" : "unfragmented"),
-		    ctx->frag_ctx.spoe_appctx, (agent->frame_size - FRAME_HDR_SIZE),
+		    ctx->frag_ctx.spoe_appctx, (agent->rt[tid].frame_size - FRAME_HDR_SIZE),
 		    p - ctx->buffer->p);
 
 	ctx->buffer->i = p - ctx->buffer->p;
@@ -2263,7 +2276,7 @@
 		    (int)now.tv_sec, (int)now.tv_usec,
 		    agent->id, __FUNCTION__, s, ctx->frag_ctx.spoe_appctx,
 		    ctx->frag_ctx.curmsg, ctx->frag_ctx.curarg, ctx->frag_ctx.curoff,
-		    (agent->frame_size - FRAME_HDR_SIZE), p - ctx->buffer->p);
+		    (agent->rt[tid].frame_size - FRAME_HDR_SIZE), p - ctx->buffer->p);
 
 	ctx->buffer->i = p - ctx->buffer->p;
 	ctx->flags |= SPOE_CTX_FL_FRAGMENTED;
@@ -2504,7 +2517,7 @@
 			     struct spoe_context *ctx, int dir)
 {
 	if (agent->eps_max > 0)
-		update_freq_ctr(&agent->err_per_sec, 1);
+		update_freq_ctr(&agent->rt[tid].err_per_sec, 1);
 
 	if (agent->var_on_error) {
 		struct sample smp;
@@ -2557,7 +2570,7 @@
 
 	if (ctx->state == SPOE_CTX_ST_READY) {
 		if (agent->eps_max > 0) {
-			if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
+			if (!freq_ctr_remain(&agent->rt[tid].err_per_sec, agent->eps_max, 0)) {
 				SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 					    " - skip processing of messages: max EPS reached\n",
 					    (int)now.tv_sec, (int)now.tv_usec,
@@ -2791,6 +2804,7 @@
 			struct spoe_config *conf;
 			struct spoe_agent  *agent;
 			struct spoe_appctx *spoe_appctx;
+			int i;
 
 			if (fconf->id != spoe_filter_id)
 				continue;
@@ -2798,8 +2812,11 @@
 			conf  = fconf->conf;
 			agent = conf->agent;
 
-			list_for_each_entry(spoe_appctx, &agent->applets, list) {
-				spoe_wakeup_appctx(spoe_appctx->owner);
+			for (i = 0; i < global.nbthread; ++i) {
+				SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[i].lock);
+				list_for_each_entry(spoe_appctx, &agent->rt[i].applets, list)
+					spoe_wakeup_appctx(spoe_appctx->owner);
+				SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[i].lock);
 			}
 		}
 		p = p->next;
@@ -3177,7 +3194,9 @@
 		curagent->engine_id      = NULL;
 		curagent->var_pfx        = NULL;
 		curagent->var_on_error   = NULL;
-		curagent->flags          = (SPOE_FL_PIPELINING | SPOE_FL_ASYNC | SPOE_FL_SND_FRAGMENTATION);
+		curagent->flags          = (SPOE_FL_PIPELINING | SPOE_FL_SND_FRAGMENTATION);
+		if (global.nbthread == 1)
+			curagent->flags |= SPOE_FL_ASYNC;
 		curagent->cps_max        = 0;
 		curagent->eps_max        = 0;
 		curagent->max_frame_size = MAX_FRAME_SIZE;
@@ -3189,14 +3208,21 @@
 		LIST_INIT(&curagent->groups);
 		LIST_INIT(&curagent->messages);
 
-		curagent->frame_size   = curagent->max_frame_size;
-		curagent->applets_act  = 0;
-		curagent->applets_idle = 0;
-		curagent->sending_rate = 0;
-
-		LIST_INIT(&curagent->applets);
-		LIST_INIT(&curagent->sending_queue);
-		LIST_INIT(&curagent->waiting_queue);
+		if ((curagent->rt = calloc(global.nbthread, sizeof(*curagent->rt))) == NULL) {
+			Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
+			err_code |= ERR_ALERT | ERR_ABORT;
+			goto out;
+		}
+		for (i = 0; i < global.nbthread; ++i) {
+			curagent->rt[i].frame_size   = curagent->max_frame_size;
+			curagent->rt[i].applets_act  = 0;
+			curagent->rt[i].applets_idle = 0;
+			curagent->rt[i].sending_rate = 0;
+			LIST_INIT(&curagent->rt[i].applets);
+			LIST_INIT(&curagent->rt[i].sending_queue);
+			LIST_INIT(&curagent->rt[i].waiting_queue);
+			SPIN_INIT(&curagent->rt[i].lock);
+		}
 	}
 	else if (!strcmp(args[0], "use-backend")) {
 		if (!*args[1]) {
@@ -3320,8 +3346,15 @@
 				goto out;
 			if (kwm == 1)
 				curagent->flags &= ~SPOE_FL_ASYNC;
-			else
-				curagent->flags |= SPOE_FL_ASYNC;
+			else {
+				if (global.nbthread == 1)
+					curagent->flags |= SPOE_FL_ASYNC;
+				else {
+					Warning("parsing [%s:%d] Async option is not supported with threads.\n",
+						file, linenum);
+					err_code |= ERR_WARN;
+				}
+			}
 			goto out;
 		}
 		else if (!strcmp(args[1], "send-frag-payload")) {