MEDIUM: thread/spoe: Make the SPOE thread-safe

Because there is not migration mechanism yet, all runtime information about an
SPOE agent are thread-local and async exchanges with agents are disabled when we
have serveral threads. Howerver, pipelining is still available. So for now, the
thread part of the SPOE is pretty simple.
diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 774fe7b..39d8220 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -167,6 +167,7 @@
 	COMP_POOL_LOCK,
 	LUA_LOCK,
 	NOTIF_LOCK,
+	SPOE_APPLET_LOCK,
 	LOCK_LABELS
 };
 struct lock_stat {
@@ -255,7 +256,7 @@
 					   "UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS",
 					   "APPLETS", "PEER", "BUF_WQ", "STREAMS", "SSL", "SSL_GEN_CERTS",
 					   "PATREF", "PATEXP", "PATLRU", "VARS", "COMP_POOL", "LUA",
-					   "NOTIF" };
+					   "NOTIF", "SPOE_APPLET" };
 	int lbl;
 
 	for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
diff --git a/include/types/spoe.h b/include/types/spoe.h
index 108bc98..aead2ba 100644
--- a/include/types/spoe.h
+++ b/include/types/spoe.h
@@ -24,6 +24,7 @@
 
 #include <common/buffer.h>
 #include <common/mini-clist.h>
+#include <common/hathreads.h>
 
 #include <types/filters.h>
 #include <types/freq_ctr.h>
@@ -251,17 +252,23 @@
 	struct list messages;                 /* list of all messages attached to this SPOE agent */
 
 	/* running info */
-	unsigned int          frame_size;     /* current maximum frame size, only used to encode messages */
-	unsigned int          applets_act;    /* # of applets alive at a time */
-	unsigned int          applets_idle;   /* # of applets in the state SPOE_APPCTX_ST_IDLE */
-	unsigned int          sending_rate;   /* the global sending rate */
+	struct {
+		unsigned int    frame_size;     /* current maximum frame size, only used to encode messages */
+		unsigned int    applets_act;    /* # of applets alive at a time */
+		unsigned int    applets_idle;   /* # of applets in the state SPOE_APPCTX_ST_IDLE */
+		unsigned int    sending_rate;   /* the global sending rate */
+
+		struct freq_ctr conn_per_sec;   /* connections per second */
+		struct freq_ctr err_per_sec;    /* connetion errors per second */
 
-	struct freq_ctr       conn_per_sec;   /* connections per second */
-	struct freq_ctr       err_per_sec;    /* connetion errors per second */
+		struct list     applets;        /* List of available SPOE applets */
+		struct list     sending_queue;  /* Queue of streams waiting to send data */
+		struct list     waiting_queue;  /* Queue of streams waiting for a ack, in async mode */
 
-	struct list           applets;        /* List of available SPOE applets */
-	struct list           sending_queue;  /* Queue of streams waiting to send data */
-	struct list           waiting_queue;  /* Queue of streams waiting for a ack, in async mode */
+#ifdef USE_THREAD
+		HA_SPINLOCK_T   lock;
+#endif
+	} *rt;
 
 };
 
diff --git a/src/flt_spoe.c b/src/flt_spoe.c
index 7fc4ed8..938faab 100644
--- a/src/flt_spoe.c
+++ b/src/flt_spoe.c
@@ -153,6 +153,7 @@
 {
 	struct spoe_message *msg, *msgback;
 	struct spoe_group   *grp, *grpback;
+	int                  i;
 
 	if (!agent)
 		return;
@@ -169,6 +170,9 @@
 		LIST_DEL(&grp->list);
 		spoe_release_group(grp);
 	}
+	for (i = 0; i < global.nbthread; ++i)
+		SPIN_DESTROY(&agent->rt[i].lock);
+	free(agent->rt);
 	free(agent);
 }
 
@@ -974,7 +978,7 @@
 
 	/* Try to find the corresponding SPOE context */
 	if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
-		list_for_each_entry((*ctx), &agent->waiting_queue, list) {
+		list_for_each_entry((*ctx), &agent->rt[tid].waiting_queue, list) {
 			if ((*ctx)->stream_id == (unsigned int)stream_id &&
 			    (*ctx)->frame_id  == (unsigned int)frame_id)
 				goto found;
@@ -1234,7 +1238,7 @@
 		    __FUNCTION__, appctx);
 
 	/* Remove applet from the list of running applets */
-	agent->applets_act--;
+	agent->rt[tid].applets_act--;
 	if (!LIST_ISEMPTY(&spoe_appctx->list)) {
 		LIST_DEL(&spoe_appctx->list);
 		LIST_INIT(&spoe_appctx->list);
@@ -1243,7 +1247,7 @@
 	/* Shutdown the server connection, if needed */
 	if (appctx->st0 != SPOE_APPCTX_ST_END) {
 		if (appctx->st0 == SPOE_APPCTX_ST_IDLE)
-			agent->applets_idle--;
+			agent->rt[tid].applets_idle--;
 
 		appctx->st0 = SPOE_APPCTX_ST_END;
 		if (spoe_appctx->status_code == SPOE_FRM_ERR_NONE)
@@ -1284,18 +1288,18 @@
 			    &spoe_appctx->buffer_wait);
 	pool_free2(pool2_spoe_appctx, spoe_appctx);
 
-	if (!LIST_ISEMPTY(&agent->applets))
+	if (!LIST_ISEMPTY(&agent->rt[tid].applets))
 		goto end;
 
 	/* If this was the last running applet, notify all waiting streams */
-	list_for_each_entry_safe(ctx, back, &agent->sending_queue, list) {
+	list_for_each_entry_safe(ctx, back, &agent->rt[tid].sending_queue, list) {
 		LIST_DEL(&ctx->list);
 		LIST_INIT(&ctx->list);
 		ctx->state = SPOE_CTX_ST_ERROR;
 		ctx->status_code = (spoe_appctx->status_code + 0x100);
 		task_wakeup(ctx->strm->task, TASK_WOKEN_MSG);
 	}
-	list_for_each_entry_safe(ctx, back, &agent->waiting_queue, list) {
+	list_for_each_entry_safe(ctx, back, &agent->rt[tid].waiting_queue, list) {
 		LIST_DEL(&ctx->list);
 		LIST_INIT(&ctx->list);
 		ctx->state = SPOE_CTX_ST_ERROR;
@@ -1305,10 +1309,9 @@
 
   end:
 	/* Update runtinme agent info */
-	agent->frame_size = agent->max_frame_size;
-	list_for_each_entry(spoe_appctx, &agent->applets, list)
-		agent->frame_size = MIN(spoe_appctx->max_frame_size,
-					agent->frame_size);
+	agent->rt[tid].frame_size = agent->max_frame_size;
+	list_for_each_entry(spoe_appctx, &agent->rt[tid].applets, list)
+		HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, spoe_appctx->max_frame_size);
 }
 
 static int
@@ -1421,14 +1424,15 @@
 		default:
 			/* HELLO handshake is finished, set the idle timeout and
 			 * add the applet in the list of running applets. */
-			agent->applets_idle++;
+			agent->rt[tid].applets_idle++;
 			appctx->st0 = SPOE_APPCTX_ST_IDLE;
+			SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 			LIST_DEL(&SPOE_APPCTX(appctx)->list);
-			LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list);
+			LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
+			SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 
 			/* Update runtinme agent info */
-			agent->frame_size = MIN(SPOE_APPCTX(appctx)->max_frame_size,
-						agent->frame_size);
+			HA_ATOMIC_UPDATE_MIN(&agent->rt[tid].frame_size, SPOE_APPCTX(appctx)->max_frame_size);
 			goto next;
 	}
 
@@ -1465,13 +1469,13 @@
 		ret = spoe_prepare_hafrag_frame(appctx, ctx, frame,
 						SPOE_APPCTX(appctx)->max_frame_size);
 	}
-	else if (LIST_ISEMPTY(&agent->sending_queue)) {
+	else if (LIST_ISEMPTY(&agent->rt[tid].sending_queue)) {
 		*skip = 1;
 		ret   = 1;
 		goto end;
 	}
 	else {
-		ctx = LIST_NEXT(&agent->sending_queue, typeof(ctx), list);
+		ctx = LIST_NEXT(&agent->rt[tid].sending_queue, typeof(ctx), list);
 		ret = spoe_prepare_hanotify_frame(appctx, ctx, frame,
 						  SPOE_APPCTX(appctx)->max_frame_size);
 
@@ -1532,7 +1536,7 @@
   no_frag_frame_sent:
 	if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_ASYNC) {
 		appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
-		LIST_ADDQ(&agent->waiting_queue, &ctx->list);
+		LIST_ADDQ(&agent->rt[tid].waiting_queue, &ctx->list);
 	}
 	else if (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PIPELINING) {
 		appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
@@ -1660,7 +1664,7 @@
 			goto next;
 
 		case 0: /* ignore */
-			agent->sending_rate++;
+			agent->rt[tid].sending_rate++;
 			fpa++;
 			break;
 
@@ -1668,7 +1672,7 @@
 			break;
 
 		default:
-			agent->sending_rate++;
+			agent->rt[tid].sending_rate++;
 			fpa++;
 			break;
 	}
@@ -1703,11 +1707,13 @@
   stop:
 	if (appctx->st0 == SPOE_APPCTX_ST_PROCESSING) {
 		appctx->st0 = SPOE_APPCTX_ST_IDLE;
-		agent->applets_idle++;
+		agent->rt[tid].applets_idle++;
 	}
 	if (fpa || (SPOE_APPCTX(appctx)->flags & SPOE_APPCTX_FL_PERSIST)) {
+		SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 		LIST_DEL(&SPOE_APPCTX(appctx)->list);
-		LIST_ADD(&agent->applets, &SPOE_APPCTX(appctx)->list);
+		LIST_ADD(&agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
+		SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 		if (fpa)
 			SPOE_APPCTX(appctx)->task->expire =
 				tick_add_ifset(now_ms, agent->timeout.idle);
@@ -1868,14 +1874,14 @@
 
 		case SPOE_APPCTX_ST_IDLE:
 			if (stopping &&
-			    LIST_ISEMPTY(&agent->sending_queue) &&
+			    LIST_ISEMPTY(&agent->rt[tid].sending_queue) &&
 			    LIST_ISEMPTY(&SPOE_APPCTX(appctx)->waiting_queue)) {
 				SPOE_APPCTX(appctx)->task->expire =
 					tick_add_ifset(now_ms, agent->timeout.idle);
 				appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
 				goto switchstate;
 			}
-			agent->applets_idle--;
+			agent->rt[tid].applets_idle--;
 			appctx->st0 = SPOE_APPCTX_ST_PROCESSING;
 			/* fall through */
 
@@ -1909,6 +1915,9 @@
 			return;
 	}
   out:
+	if (stopping)
+		spoe_wakeup_appctx(appctx);
+
 	if (SPOE_APPCTX(appctx)->task->expire != TICK_ETERNITY)
 		task_queue(SPOE_APPCTX(appctx)->task);
 	si_oc(si)->flags |= CF_READ_DONTWAIT;
@@ -1940,7 +1949,7 @@
 	memset(appctx->ctx.spoe.ptr, 0, pool2_spoe_appctx->size);
 
 	appctx->st0 = SPOE_APPCTX_ST_CONNECT;
-	if ((SPOE_APPCTX(appctx)->task = task_new(MAX_THREADS_MASK)) == NULL)
+	if ((SPOE_APPCTX(appctx)->task = task_new(1UL << tid)) == NULL)
 		goto out_free_spoe_appctx;
 
 	SPOE_APPCTX(appctx)->owner           = appctx;
@@ -1976,8 +1985,10 @@
 	strm->do_log = NULL;
 	strm->res.flags |= CF_READ_DONTWAIT;
 
-	LIST_ADDQ(&conf->agent->applets, &SPOE_APPCTX(appctx)->list);
-	conf->agent->applets_act++;
+	SPIN_LOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
+	LIST_ADDQ(&conf->agent->rt[tid].applets, &SPOE_APPCTX(appctx)->list);
+	SPIN_UNLOCK(SPOE_APPLET_LOCK, &conf->agent->rt[tid].lock);
+	conf->agent->rt[tid].applets_act++;
 
 	task_wakeup(SPOE_APPCTX(appctx)->task, TASK_WOKEN_INIT);
 	task_wakeup(strm->task, TASK_WOKEN_INIT);
@@ -2008,9 +2019,9 @@
 	min_applets = min_applets_act(agent);
 
 	/* Check if we need to create a new SPOE applet or not. */
-	if (agent->applets_act >= min_applets &&
-	    agent->applets_idle &&
-	    agent->sending_rate)
+	if (agent->rt[tid].applets_act >= min_applets &&
+	    agent->rt[tid].applets_idle &&
+	    agent->rt[tid].sending_rate)
 		goto end;
 
 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
@@ -2031,7 +2042,7 @@
 	/* Do not try to create a new applet if we have reached the maximum of
 	 * connection per seconds */
 	if (agent->cps_max > 0) {
-		if (!freq_ctr_remain(&agent->conn_per_sec, agent->cps_max, 0)) {
+		if (!freq_ctr_remain(&agent->rt[tid].conn_per_sec, agent->cps_max, 0)) {
 			SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 				    " - cannot create SPOE appctx: max CPS reached\n",
 				    (int)now.tv_sec, (int)now.tv_usec, agent->id,
@@ -2052,41 +2063,43 @@
 
 		goto end;
 	}
-	if (agent->applets_act <= min_applets)
+	if (agent->rt[tid].applets_act <= min_applets)
 		SPOE_APPCTX(appctx)->flags |= SPOE_APPCTX_FL_PERSIST;
 
 	/* Increase the per-process number of cumulated connections */
 	if (agent->cps_max > 0)
-		update_freq_ctr(&agent->conn_per_sec, 1);
+		update_freq_ctr(&agent->rt[tid].conn_per_sec, 1);
 
   end:
 	/* The only reason to return an error is when there is no applet */
-	if (LIST_ISEMPTY(&agent->applets)) {
+	if (LIST_ISEMPTY(&agent->rt[tid].applets)) {
 		ctx->status_code = SPOE_CTX_ERR_RES;
 		return -1;
 	}
 
 	/* Add the SPOE context in the sending queue and update all running
 	 * info */
-	LIST_ADDQ(&agent->sending_queue, &ctx->list);
-	if (agent->sending_rate)
-		agent->sending_rate--;
+	LIST_ADDQ(&agent->rt[tid].sending_queue, &ctx->list);
+	if (agent->rt[tid].sending_rate)
+		agent->rt[tid].sending_rate--;
 
 	SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 		    " - Add stream in sending queue"
 		    " - applets_act=%u - applets_idle=%u - sending_rate=%u\n",
 		    (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__,
-		    ctx->strm, agent->applets_act, agent->applets_idle,
-		    agent->sending_rate);
+		    ctx->strm, agent->rt[tid].applets_act, agent->rt[tid].applets_idle,
+		    agent->rt[tid].sending_rate);
 
 	/* Finally try to wakeup the first IDLE applet found and move it at the
 	 * end of the list. */
-	list_for_each_entry(spoe_appctx, &agent->applets, list) {
+	list_for_each_entry(spoe_appctx, &agent->rt[tid].applets, list) {
 		appctx = spoe_appctx->owner;
 		if (appctx->st0 == SPOE_APPCTX_ST_IDLE) {
 			spoe_wakeup_appctx(appctx);
+			SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 			LIST_DEL(&spoe_appctx->list);
-			LIST_ADDQ(&agent->applets, &spoe_appctx->list);
+			LIST_ADDQ(&agent->rt[tid].applets, &spoe_appctx->list);
+			SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[tid].lock);
 			break;
 		}
 	}
@@ -2189,7 +2202,7 @@
 	char   *p, *end;
 
 	p   = ctx->buffer->p;
-	end =  p + agent->frame_size - FRAME_HDR_SIZE;
+	end =  p + agent->rt[tid].frame_size - FRAME_HDR_SIZE;
 
 	if (type == SPOE_MSGS_BY_EVENT) { /* Loop on messages by event */
 		/* Resume encoding of a SPOE message */
@@ -2239,7 +2252,7 @@
 		    (int)now.tv_sec, (int)now.tv_usec,
 		    agent->id, __FUNCTION__, s,
 		    ((ctx->flags & SPOE_CTX_FL_FRAGMENTED) ? "last fragment of" : "unfragmented"),
-		    ctx->frag_ctx.spoe_appctx, (agent->frame_size - FRAME_HDR_SIZE),
+		    ctx->frag_ctx.spoe_appctx, (agent->rt[tid].frame_size - FRAME_HDR_SIZE),
 		    p - ctx->buffer->p);
 
 	ctx->buffer->i = p - ctx->buffer->p;
@@ -2263,7 +2276,7 @@
 		    (int)now.tv_sec, (int)now.tv_usec,
 		    agent->id, __FUNCTION__, s, ctx->frag_ctx.spoe_appctx,
 		    ctx->frag_ctx.curmsg, ctx->frag_ctx.curarg, ctx->frag_ctx.curoff,
-		    (agent->frame_size - FRAME_HDR_SIZE), p - ctx->buffer->p);
+		    (agent->rt[tid].frame_size - FRAME_HDR_SIZE), p - ctx->buffer->p);
 
 	ctx->buffer->i = p - ctx->buffer->p;
 	ctx->flags |= SPOE_CTX_FL_FRAGMENTED;
@@ -2504,7 +2517,7 @@
 			     struct spoe_context *ctx, int dir)
 {
 	if (agent->eps_max > 0)
-		update_freq_ctr(&agent->err_per_sec, 1);
+		update_freq_ctr(&agent->rt[tid].err_per_sec, 1);
 
 	if (agent->var_on_error) {
 		struct sample smp;
@@ -2557,7 +2570,7 @@
 
 	if (ctx->state == SPOE_CTX_ST_READY) {
 		if (agent->eps_max > 0) {
-			if (!freq_ctr_remain(&agent->err_per_sec, agent->eps_max, 0)) {
+			if (!freq_ctr_remain(&agent->rt[tid].err_per_sec, agent->eps_max, 0)) {
 				SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p"
 					    " - skip processing of messages: max EPS reached\n",
 					    (int)now.tv_sec, (int)now.tv_usec,
@@ -2791,6 +2804,7 @@
 			struct spoe_config *conf;
 			struct spoe_agent  *agent;
 			struct spoe_appctx *spoe_appctx;
+			int i;
 
 			if (fconf->id != spoe_filter_id)
 				continue;
@@ -2798,8 +2812,11 @@
 			conf  = fconf->conf;
 			agent = conf->agent;
 
-			list_for_each_entry(spoe_appctx, &agent->applets, list) {
-				spoe_wakeup_appctx(spoe_appctx->owner);
+			for (i = 0; i < global.nbthread; ++i) {
+				SPIN_LOCK(SPOE_APPLET_LOCK, &agent->rt[i].lock);
+				list_for_each_entry(spoe_appctx, &agent->rt[i].applets, list)
+					spoe_wakeup_appctx(spoe_appctx->owner);
+				SPIN_UNLOCK(SPOE_APPLET_LOCK, &agent->rt[i].lock);
 			}
 		}
 		p = p->next;
@@ -3177,7 +3194,9 @@
 		curagent->engine_id      = NULL;
 		curagent->var_pfx        = NULL;
 		curagent->var_on_error   = NULL;
-		curagent->flags          = (SPOE_FL_PIPELINING | SPOE_FL_ASYNC | SPOE_FL_SND_FRAGMENTATION);
+		curagent->flags          = (SPOE_FL_PIPELINING | SPOE_FL_SND_FRAGMENTATION);
+		if (global.nbthread == 1)
+			curagent->flags |= SPOE_FL_ASYNC;
 		curagent->cps_max        = 0;
 		curagent->eps_max        = 0;
 		curagent->max_frame_size = MAX_FRAME_SIZE;
@@ -3189,14 +3208,21 @@
 		LIST_INIT(&curagent->groups);
 		LIST_INIT(&curagent->messages);
 
-		curagent->frame_size   = curagent->max_frame_size;
-		curagent->applets_act  = 0;
-		curagent->applets_idle = 0;
-		curagent->sending_rate = 0;
-
-		LIST_INIT(&curagent->applets);
-		LIST_INIT(&curagent->sending_queue);
-		LIST_INIT(&curagent->waiting_queue);
+		if ((curagent->rt = calloc(global.nbthread, sizeof(*curagent->rt))) == NULL) {
+			Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
+			err_code |= ERR_ALERT | ERR_ABORT;
+			goto out;
+		}
+		for (i = 0; i < global.nbthread; ++i) {
+			curagent->rt[i].frame_size   = curagent->max_frame_size;
+			curagent->rt[i].applets_act  = 0;
+			curagent->rt[i].applets_idle = 0;
+			curagent->rt[i].sending_rate = 0;
+			LIST_INIT(&curagent->rt[i].applets);
+			LIST_INIT(&curagent->rt[i].sending_queue);
+			LIST_INIT(&curagent->rt[i].waiting_queue);
+			SPIN_INIT(&curagent->rt[i].lock);
+		}
 	}
 	else if (!strcmp(args[0], "use-backend")) {
 		if (!*args[1]) {
@@ -3320,8 +3346,15 @@
 				goto out;
 			if (kwm == 1)
 				curagent->flags &= ~SPOE_FL_ASYNC;
-			else
-				curagent->flags |= SPOE_FL_ASYNC;
+			else {
+				if (global.nbthread == 1)
+					curagent->flags |= SPOE_FL_ASYNC;
+				else {
+					Warning("parsing [%s:%d] Async option is not supported with threads.\n",
+						file, linenum);
+					err_code |= ERR_WARN;
+				}
+			}
 			goto out;
 		}
 		else if (!strcmp(args[1], "send-frag-payload")) {