MEDIUM: threads/stick-tables: handle multithreads on stick tables

The stick table API was slightly reworked:

A global spin lock on stick table was added to perform lookup and
insert in a thread safe way. The handling of refcount on entries
is now handled directly by stick tables functions under protection
of this lock and was removed from the code of callers.

The "stktable_store" function is no more externalized and users should
now use "stktable_set_entry" in any case of insertion. This last one performs
a lookup followed by a store if not found. So the code using "stktable_store"
was re-worked.

Lookup, and set_entry functions automatically increase the refcount
of the returned/stored entry.

The function "sticktable_touch" was renamed "sticktable_touch_local"
and is now able to decrease the refcount if last arg is set to true. It
is allowing to release the entry without taking the lock twice.

A new function "sticktable_touch_remote" is now used to insert
entries coming from remote peers at the right place in the update tree.
The code of peer update was re-worked to use this new function.
This function is also able to decrease the refcount if wanted.

The function "stksess_kill" also handle a parameter to decrease
the refcount on the entry.

A read/write lock is added on each entry to protect the data content
updates of the entry.
diff --git a/include/common/hathreads.h b/include/common/hathreads.h
index 9946d51..e997ea3 100644
--- a/include/common/hathreads.h
+++ b/include/common/hathreads.h
@@ -152,6 +152,8 @@
 	UPDATED_SERVERS_LOCK,
 	LBPRM_LOCK,
 	SIGNALS_LOCK,
+	STK_TABLE_LOCK,
+	STK_SESS_LOCK,
 	LOCK_LABELS
 };
 struct lock_stat {
@@ -237,7 +239,7 @@
 	const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
 					   "TASK_RQ", "TASK_WQ", "POOL",
 					   "LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
-					   "UPDATED_SERVERS", "LBPRM", "SIGNALS" };
+					   "UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS" };
 	int lbl;
 
 	for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
diff --git a/include/proto/session.h b/include/proto/session.h
index cb4deec..3dead44 100644
--- a/include/proto/session.h
+++ b/include/proto/session.h
@@ -46,19 +46,26 @@
 {
 	void *ptr;
 	int i;
+	struct stksess *ts;
 
 	for (i = 0; i < MAX_SESS_STKCTR; i++) {
 		struct stkctr *stkctr = &sess->stkctr[i];
 
-		if (!stkctr_entry(stkctr))
+		ts = stkctr_entry(stkctr);
+		if (!ts)
 			continue;
 
-		ptr = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_CONN_CUR);
-		if (ptr)
+		ptr = stktable_data_ptr(stkctr->table, ts, STKTABLE_DT_CONN_CUR);
+		if (ptr) {
+			RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
+
 			stktable_data_cast(ptr, conn_cur)--;
-		stkctr_entry(stkctr)->ref_cnt--;
-		stksess_kill_if_expired(stkctr->table, stkctr_entry(stkctr));
+
+			RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
+		}
+
 		stkctr_set_entry(stkctr, NULL);
+		stksess_kill_if_expired(stkctr->table, ts, 1);
 	}
 }
 
diff --git a/include/proto/stick_table.h b/include/proto/stick_table.h
index f48b9eb..8c9f834 100644
--- a/include/proto/stick_table.h
+++ b/include/proto/stick_table.h
@@ -34,17 +34,15 @@
 struct stksess *stksess_new(struct stktable *t, struct stktable_key *key);
 void stksess_setkey(struct stktable *t, struct stksess *ts, struct stktable_key *key);
 void stksess_free(struct stktable *t, struct stksess *ts);
-void stksess_kill(struct stktable *t, struct stksess *ts);
+int stksess_kill(struct stktable *t, struct stksess *ts, int decrefcount);
 
 int stktable_init(struct stktable *t);
 int stktable_parse_type(char **args, int *idx, unsigned long *type, size_t *key_size);
 struct stksess *stktable_get_entry(struct stktable *table, struct stktable_key *key);
-struct stksess *stktable_store(struct stktable *t, struct stksess *ts, int local);
-struct stksess *stktable_store_with_exp(struct stktable *t, struct stksess *ts,
-                                        int local, int expire);
-struct stksess *stktable_touch_with_exp(struct stktable *t, struct stksess *ts,
-                                        int local, int expire);
-struct stksess *stktable_touch(struct stktable *t, struct stksess *ts, int local);
+struct stksess *stktable_set_entry(struct stktable *table, struct stksess *nts);
+void stktable_touch_with_exp(struct stktable *t, struct stksess *ts, int decrefcount, int expire);
+void stktable_touch_remote(struct stktable *t, struct stksess *ts, int decrefcnt);
+void stktable_touch_local(struct stktable *t, struct stksess *ts, int decrefccount);
 struct stksess *stktable_lookup(struct stktable *t, struct stksess *ts);
 struct stksess *stktable_lookup_key(struct stktable *t, struct stktable_key *key);
 struct stksess *stktable_update_key(struct stktable *table, struct stktable_key *key);
@@ -52,12 +50,13 @@
 struct stktable_key *stktable_fetch_key(struct stktable *t, struct proxy *px, struct session *sess,
                                         struct stream *strm, unsigned int opt,
                                         struct sample_expr *expr, struct sample *smp);
-struct stkctr *smp_fetch_sc_stkctr(struct session *sess, struct stream *strm, const struct arg *args, const char *kw);
-struct stkctr *smp_create_src_stkctr(struct session *sess, struct stream *strm, const struct arg *args, const char *kw);
+struct stkctr *smp_fetch_sc_stkctr(struct session *sess, struct stream *strm, const struct arg *args, const char *kw, struct stkctr *stkctr);
+struct stkctr *smp_create_src_stkctr(struct session *sess, struct stream *strm, const struct arg *args, const char *kw, struct stkctr *stkctr);
 int stktable_compatible_sample(struct sample_expr *expr, unsigned long table_type);
 int stktable_register_data_store(int idx, const char *name, int std_type, int arg_type);
 int stktable_get_data_type(char *name);
 int stktable_trash_oldest(struct stktable *t, int to_batch);
+int __stksess_kill(struct stktable *t, struct stksess *ts);
 
 /* return allocation size for standard data type <type> */
 static inline int stktable_type_size(int type)
@@ -132,10 +131,29 @@
 }
 
 /* kill an entry if it's expired and its ref_cnt is zero */
-static inline void stksess_kill_if_expired(struct stktable *t, struct stksess *ts)
+static inline int __stksess_kill_if_expired(struct stktable *t, struct stksess *ts)
 {
 	if (t->expire != TICK_ETERNITY && tick_is_expired(ts->expire, now_ms))
-		stksess_kill(t, ts);
+		return __stksess_kill(t, ts);
+
+	return 0;
+}
+
+static inline int stksess_kill_if_expired(struct stktable *t, struct stksess *ts, int decrefcnt)
+{
+	int ret;
+
+	SPIN_LOCK(STK_TABLE_LOCK, &t->lock);
+
+	if (decrefcnt)
+		ts->ref_cnt--;
+
+	if (t->expire != TICK_ETERNITY && tick_is_expired(ts->expire, now_ms))
+		ret = __stksess_kill_if_expired(t, ts);
+
+	SPIN_UNLOCK(STK_TABLE_LOCK, &t->lock);
+
+	return ret;
 }
 
 /* sets the stick counter's entry pointer */
diff --git a/include/proto/stream.h b/include/proto/stream.h
index aae7d34..3efb42b 100644
--- a/include/proto/stream.h
+++ b/include/proto/stream.h
@@ -90,20 +90,26 @@
 {
 	void *ptr;
 	int i;
+	struct stksess *ts;
 
 	for (i = 0; i < MAX_SESS_STKCTR; i++) {
-		if (!stkctr_entry(&s->stkctr[i]))
+		ts = stkctr_entry(&s->stkctr[i]);
+		if (!ts)
 			continue;
 
 		if (stkctr_entry(&s->sess->stkctr[i]))
 			continue;
 
-		ptr = stktable_data_ptr(s->stkctr[i].table, stkctr_entry(&s->stkctr[i]), STKTABLE_DT_CONN_CUR);
-		if (ptr)
+		ptr = stktable_data_ptr(s->stkctr[i].table, ts, STKTABLE_DT_CONN_CUR);
+		if (ptr) {
+			RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
+
 			stktable_data_cast(ptr, conn_cur)--;
-		stkctr_entry(&s->stkctr[i])->ref_cnt--;
-		stksess_kill_if_expired(s->stkctr[i].table, stkctr_entry(&s->stkctr[i]));
+
+			RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
+		}
 		stkctr_set_entry(&s->stkctr[i], NULL);
+		stksess_kill_if_expired(s->stkctr[i].table, ts, 1);
 	}
 }
 
@@ -114,11 +120,13 @@
  */
 static inline void stream_stop_content_counters(struct stream *s)
 {
+	struct stksess *ts;
 	void *ptr;
 	int i;
 
 	for (i = 0; i < MAX_SESS_STKCTR; i++) {
-		if (!stkctr_entry(&s->stkctr[i]))
+		ts = stkctr_entry(&s->stkctr[i]);
+		if (!ts)
 			continue;
 
 		if (stkctr_entry(&s->sess->stkctr[i]))
@@ -127,12 +135,16 @@
 		if (!(stkctr_flags(&s->stkctr[i]) & STKCTR_TRACK_CONTENT))
 			continue;
 
-		ptr = stktable_data_ptr(s->stkctr[i].table, stkctr_entry(&s->stkctr[i]), STKTABLE_DT_CONN_CUR);
-		if (ptr)
+		ptr = stktable_data_ptr(s->stkctr[i].table, ts, STKTABLE_DT_CONN_CUR);
+		if (ptr) {
+			RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
+
 			stktable_data_cast(ptr, conn_cur)--;
-		stkctr_entry(&s->stkctr[i])->ref_cnt--;
-		stksess_kill_if_expired(s->stkctr[i].table, stkctr_entry(&s->stkctr[i]));
+
+			RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
+		}
 		stkctr_set_entry(&s->stkctr[i], NULL);
+		stksess_kill_if_expired(s->stkctr[i].table, ts, 1);
 	}
 }
 
@@ -144,6 +156,8 @@
 {
 	void *ptr;
 
+	RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
+
 	ptr = stktable_data_ptr(t, ts, STKTABLE_DT_CONN_CUR);
 	if (ptr)
 		stktable_data_cast(ptr, conn_cur)++;
@@ -158,6 +172,8 @@
 				       t->data_arg[STKTABLE_DT_CONN_RATE].u, 1);
 	if (tick_isset(t->expire))
 		ts->expire = tick_add(now_ms, MS_TO_TICKS(t->expire));
+
+	RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
 }
 
 /* Enable tracking of stream counters as <stkctr> on stksess <ts>. The caller is
@@ -166,10 +182,10 @@
  */
 static inline void stream_track_stkctr(struct stkctr *ctr, struct stktable *t, struct stksess *ts)
 {
+	/* Why this test ???? */
 	if (stkctr_entry(ctr))
 		return;
 
-	ts->ref_cnt++;
 	ctr->table = t;
 	stkctr_set_entry(ctr, ts);
 	stream_start_counters(t, ts);
@@ -178,26 +194,33 @@
 /* Increase the number of cumulated HTTP requests in the tracked counters */
 static void inline stream_inc_http_req_ctr(struct stream *s)
 {
+	struct stksess *ts;
 	void *ptr;
 	int i;
 
 	for (i = 0; i < MAX_SESS_STKCTR; i++) {
 		struct stkctr *stkctr = &s->stkctr[i];
 
-		if (!stkctr_entry(stkctr)) {
+		ts = stkctr_entry(stkctr);
+		if (!ts) {
 			stkctr = &s->sess->stkctr[i];
-			if (!stkctr_entry(stkctr))
+			ts = stkctr_entry(stkctr);
+			if (!ts)
 				continue;
 		}
 
-		ptr = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_HTTP_REQ_CNT);
+		RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
+
+		ptr = stktable_data_ptr(stkctr->table, ts, STKTABLE_DT_HTTP_REQ_CNT);
 		if (ptr)
 			stktable_data_cast(ptr, http_req_cnt)++;
 
-		ptr = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_HTTP_REQ_RATE);
+		ptr = stktable_data_ptr(stkctr->table, ts, STKTABLE_DT_HTTP_REQ_RATE);
 		if (ptr)
 			update_freq_ctr_period(&stktable_data_cast(ptr, http_req_rate),
 					       stkctr->table->data_arg[STKTABLE_DT_HTTP_REQ_RATE].u, 1);
+
+		RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
 	}
 }
 
@@ -206,26 +229,32 @@
  */
 static void inline stream_inc_be_http_req_ctr(struct stream *s)
 {
+	struct stksess *ts;
 	void *ptr;
 	int i;
 
 	for (i = 0; i < MAX_SESS_STKCTR; i++) {
 		struct stkctr *stkctr = &s->stkctr[i];
 
-		if (!stkctr_entry(stkctr))
+		ts = stkctr_entry(stkctr);
+		if (!ts)
 			continue;
 
 		if (!(stkctr_flags(&s->stkctr[i]) & STKCTR_TRACK_BACKEND))
 			continue;
 
-		ptr = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_HTTP_REQ_CNT);
+		RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
+
+		ptr = stktable_data_ptr(stkctr->table, ts, STKTABLE_DT_HTTP_REQ_CNT);
 		if (ptr)
 			stktable_data_cast(ptr, http_req_cnt)++;
 
-		ptr = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_HTTP_REQ_RATE);
+		ptr = stktable_data_ptr(stkctr->table, ts, STKTABLE_DT_HTTP_REQ_RATE);
 		if (ptr)
 			update_freq_ctr_period(&stktable_data_cast(ptr, http_req_rate),
 			                       stkctr->table->data_arg[STKTABLE_DT_HTTP_REQ_RATE].u, 1);
+
+		RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
 	}
 }
 
@@ -237,26 +266,33 @@
  */
 static void inline stream_inc_http_err_ctr(struct stream *s)
 {
+	struct stksess *ts;
 	void *ptr;
 	int i;
 
 	for (i = 0; i < MAX_SESS_STKCTR; i++) {
 		struct stkctr *stkctr = &s->stkctr[i];
 
-		if (!stkctr_entry(stkctr)) {
+		ts = stkctr_entry(stkctr);
+		if (!ts) {
 			stkctr = &s->sess->stkctr[i];
-			if (!stkctr_entry(stkctr))
+			ts = stkctr_entry(stkctr);
+			if (!ts)
 				continue;
 		}
 
-		ptr = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_HTTP_ERR_CNT);
+		RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
+
+		ptr = stktable_data_ptr(stkctr->table, ts, STKTABLE_DT_HTTP_ERR_CNT);
 		if (ptr)
 			stktable_data_cast(ptr, http_err_cnt)++;
 
-		ptr = stktable_data_ptr(stkctr->table, stkctr_entry(stkctr), STKTABLE_DT_HTTP_ERR_RATE);
+		ptr = stktable_data_ptr(stkctr->table, ts, STKTABLE_DT_HTTP_ERR_RATE);
 		if (ptr)
 			update_freq_ctr_period(&stktable_data_cast(ptr, http_err_rate),
 			                       stkctr->table->data_arg[STKTABLE_DT_HTTP_ERR_RATE].u, 1);
+
+		RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
 	}
 }
 
diff --git a/include/types/stick_table.h b/include/types/stick_table.h
index 77eeccd..4f5de99 100644
--- a/include/types/stick_table.h
+++ b/include/types/stick_table.h
@@ -129,6 +129,9 @@
 struct stksess {
 	unsigned int expire;      /* session expiration date */
 	unsigned int ref_cnt;     /* reference count, can only purge when zero */
+#ifdef USE_THREAD
+	HA_RWLOCK_T lock;         /* lock related to the table entry */
+#endif
 	struct eb32_node exp;     /* ebtree node used to hold the session in expiration tree */
 	struct eb32_node upd;     /* ebtree node used to hold the update sequence tree */
 	struct ebmb_node key;     /* ebtree node used to hold the session in table */
@@ -143,6 +146,9 @@
 	struct eb_root exps;      /* head of sticky session expiration tree */
 	struct eb_root updates;   /* head of sticky updates sequence tree */
 	struct pool_head *pool;   /* pool used to allocate sticky sessions */
+#ifdef USE_THREAD
+	HA_SPINLOCK_T lock;       /* spin lock related to the table */
+#endif
 	struct task *exp_task;    /* expiration task */
 	struct task *sync_task;   /* sync task */
 	unsigned int update;