MEDIUM: threads/xref: Convert xref function to a thread safe model

Ensure that the unlink is done safely between thread and that
the peer struct will not destroy between the usage of the peer.
diff --git a/include/common/xref.h b/include/common/xref.h
index b020280..6dfa7b6 100644
--- a/include/common/xref.h
+++ b/include/common/xref.h
@@ -1,6 +1,8 @@
 #ifndef __XREF_H__
 #define __XREF_H__
 
+#include <common/hathreads.h>
+
 /* xref is used to create relation between two elements.
  * Once an element is released, it breaks the relation. If the
  * relation is already broken, it frees the xref struct.
@@ -13,25 +15,64 @@
 	struct xref *peer;
 };
 
+#define XREF_BUSY ((struct xref *)1)
+
 static inline void xref_create(struct xref *xref_a, struct xref *xref_b)
 {
 	xref_a->peer = xref_b;
 	xref_b->peer = xref_a;
 }
 
-static inline struct xref *xref_get_peer(struct xref *xref)
+static inline struct xref *xref_get_peer_and_lock(struct xref *xref)
 {
-	if (!xref->peer)
-		return NULL;
-	return xref->peer;
+	struct xref *local;
+	struct xref *remote;
+
+	while (1) {
+
+		/* Get the local pointer to the peer. */
+		local = HA_ATOMIC_XCHG(&xref->peer, XREF_BUSY);
+
+		/* If the local pointer is NULL, the peer no longer exists. */
+		if (local == NULL) {
+			xref->peer = NULL;
+			return NULL;
+		}
+
+		/* If the local pointeru is BUSY, the peer try to acquire the
+		 * lock. We retry the process.
+		 */
+		if (local == XREF_BUSY)
+			continue;
+
+		/* We are locked, the peer cant disapear, try to acquire
+		 * the pper's lock. Note that remote can't be NULL.
+		 */
+		remote = HA_ATOMIC_XCHG(&local->peer, XREF_BUSY);
+
+		/* The remote lock is BUSY, We retry the process. */
+		if (remote == XREF_BUSY) {
+			xref->peer = local;
+			continue;
+		}
+
+		/* We have the lock, we return the value of the xref. */
+		return local;
+	}
 }
 
-static inline void xref_disconnect(struct xref *xref)
+static inline void xref_unlock(struct xref *xref, struct xref *peer)
 {
-	if (!xref->peer)
-		return;
+	/* Release the peer. */
+	peer->peer = xref;
 
-	xref->peer->peer = NULL;
+	/* Release myself. */
+	xref->peer = peer;
+}
+
+static inline void xref_disconnect(struct xref *xref, struct xref *peer)
+{
+	peer->peer = NULL;
 	xref->peer = NULL;
 }
 
diff --git a/src/hlua.c b/src/hlua.c
index 1d14e54..6edb08a 100644
--- a/src/hlua.c
+++ b/src/hlua.c
@@ -1579,8 +1579,12 @@
  */
 static void hlua_socket_release(struct appctx *appctx)
 {
+	struct xref *peer;
+
 	/* Remove my link in the original object. */
-	xref_disconnect(&appctx->ctx.hlua_cosocket.xref);
+	peer = xref_get_peer_and_lock(&appctx->ctx.hlua_cosocket.xref);
+	if (peer)
+		xref_disconnect(&appctx->ctx.hlua_cosocket.xref, peer);
 
 	/* Wake all the task waiting for me. */
 	notification_wake(&appctx->ctx.hlua_cosocket.wake_on_read);
@@ -1602,11 +1606,9 @@
 	MAY_LJMP(check_args(L, 1, "__gc"));
 
 	socket = MAY_LJMP(hlua_checksocket(L, 1));
-	peer = xref_get_peer(&socket->xref);
-	if (!peer) {
-		xref_disconnect(&socket->xref);
+	peer = xref_get_peer_and_lock(&socket->xref);
+	if (!peer)
 		return 0;
-	}
 	appctx = container_of(peer, struct appctx, ctx.hlua_cosocket.xref);
 
 	/* Set the flag which destroy the session. */
@@ -1614,7 +1616,7 @@
 	appctx_wakeup(appctx);
 
 	/* Remove all reference between the Lua stack and the coroutine stream. */
-	xref_disconnect(&socket->xref);
+	xref_disconnect(&socket->xref, peer);
 	return 0;
 }
 
@@ -1637,11 +1639,9 @@
 	if (socket->tid != tid)
 		WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
 
-	peer = xref_get_peer(&socket->xref);
-	if (!peer) {
-		xref_disconnect(&socket->xref);
+	peer = xref_get_peer_and_lock(&socket->xref);
+	if (!peer)
 		return 0;
-	}
 	appctx = container_of(peer, struct appctx, ctx.hlua_cosocket.xref);
 
 	/* Set the flag which destroy the session. */
@@ -1649,7 +1649,7 @@
 	appctx_wakeup(appctx);
 
 	/* Remove all reference between the Lua stack and the coroutine stream. */
-	xref_disconnect(&socket->xref);
+	xref_disconnect(&socket->xref, peer);
 	return 0;
 }
 
@@ -1694,11 +1694,9 @@
 		WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
 
 	/* check for connection break. If some data where read, return it. */
-	peer = xref_get_peer(&socket->xref);
-	if (!peer) {
-		xref_disconnect(&socket->xref);
-		goto connection_closed;
-	}
+	peer = xref_get_peer_and_lock(&socket->xref);
+	if (!peer)
+		goto no_peer;
 	appctx = container_of(peer, struct appctx, ctx.hlua_cosocket.xref);
 	si = appctx->owner;
 	s = si_strm(si);
@@ -1784,10 +1782,15 @@
 
 	/* Return result. */
 	luaL_pushresult(&socket->b);
+	xref_unlock(&socket->xref, peer);
 	return 1;
 
 connection_closed:
 
+	xref_unlock(&socket->xref, peer);
+
+no_peer:
+
 	/* If the buffer containds data. */
 	if (socket->b.n > 0) {
 		luaL_pushresult(&socket->b);
@@ -1800,8 +1803,11 @@
 connection_empty:
 
 	appctx = objt_appctx(s->si[0].end);
-	if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_read, hlua->task))
+	if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_read, hlua->task)) {
+		xref_unlock(&socket->xref, peer);
 		WILL_LJMP(luaL_error(L, "out of memory"));
+	}
+	xref_unlock(&socket->xref, peer);
 	WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_receive_yield, TICK_ETERNITY, 0));
 	return 0;
 }
@@ -1915,9 +1921,8 @@
 		WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
 
 	/* check for connection break. If some data where read, return it. */
-	peer = xref_get_peer(&socket->xref);
+	peer = xref_get_peer_and_lock(&socket->xref);
 	if (!peer) {
-		xref_disconnect(&socket->xref);
 		lua_pushinteger(L, -1);
 		return 1;
 	}
@@ -1927,6 +1932,7 @@
 
 	/* Check for connection close. */
 	if (channel_output_closed(&s->req)) {
+		xref_unlock(&socket->xref, peer);
 		lua_pushinteger(L, -1);
 		return 1;
 	}
@@ -1936,8 +1942,10 @@
 	send_len = buf_len - sent;
 
 	/* All the data are sent. */
-	if (sent >= buf_len)
+	if (sent >= buf_len) {
+		xref_unlock(&socket->xref, peer);
 		return 1; /* Implicitly return the length sent. */
+	}
 
 	/* Check if the buffer is avalaible because HAProxy doesn't allocate
 	 * the request buffer if its not required.
@@ -1952,8 +1960,10 @@
 	len = buffer_total_space(s->req.buf);
 	if (len <= 0) {
 		appctx = objt_appctx(s->si[0].end);
-		if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task))
+		if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task)) {
+			xref_unlock(&socket->xref, peer);
 			WILL_LJMP(luaL_error(L, "out of memory"));
+		}
 		goto hlua_socket_write_yield_return;
 	}
 
@@ -1974,6 +1984,7 @@
 		MAY_LJMP(hlua_socket_close(L));
 		lua_pop(L, 1);
 		lua_pushinteger(L, -1);
+		xref_unlock(&socket->xref, peer);
 		return 1;
 	}
 
@@ -1989,10 +2000,13 @@
 	lua_pushinteger(L, sent + len);
 
 	/* All the data buffer is sent ? */
-	if (sent + len >= buf_len)
+	if (sent + len >= buf_len) {
+		xref_unlock(&socket->xref, peer);
 		return 1;
+	}
 
 hlua_socket_write_yield_return:
+	xref_unlock(&socket->xref, peer);
 	WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_write_yield, TICK_ETERNITY, 0));
 	return 0;
 }
@@ -2131,6 +2145,7 @@
 	struct appctx *appctx;
 	struct stream_interface *si;
 	struct stream *s;
+	int ret;
 
 	MAY_LJMP(check_args(L, 1, "getpeername"));
 
@@ -2143,9 +2158,8 @@
 		WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
 
 	/* check for connection break. If some data where read, return it. */
-	peer = xref_get_peer(&socket->xref);
+	peer = xref_get_peer_and_lock(&socket->xref);
 	if (!peer) {
-		xref_disconnect(&socket->xref);
 		lua_pushnil(L);
 		return 1;
 	}
@@ -2155,17 +2169,21 @@
 
 	conn = objt_conn(s->si[1].end);
 	if (!conn) {
+		xref_unlock(&socket->xref, peer);
 		lua_pushnil(L);
 		return 1;
 	}
 
 	conn_get_to_addr(conn);
 	if (!(conn->flags & CO_FL_ADDR_TO_SET)) {
+		xref_unlock(&socket->xref, peer);
 		lua_pushnil(L);
 		return 1;
 	}
 
-	return MAY_LJMP(hlua_socket_info(L, &conn->addr.to));
+	ret = MAY_LJMP(hlua_socket_info(L, &conn->addr.to));
+	xref_unlock(&socket->xref, peer);
+	return ret;
 }
 
 /* Returns information about my connection side. */
@@ -2177,6 +2195,7 @@
 	struct xref *peer;
 	struct stream_interface *si;
 	struct stream *s;
+	int ret;
 
 	MAY_LJMP(check_args(L, 1, "getsockname"));
 
@@ -2189,9 +2208,8 @@
 		WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
 
 	/* check for connection break. If some data where read, return it. */
-	peer = xref_get_peer(&socket->xref);
+	peer = xref_get_peer_and_lock(&socket->xref);
 	if (!peer) {
-		xref_disconnect(&socket->xref);
 		lua_pushnil(L);
 		return 1;
 	}
@@ -2201,17 +2219,21 @@
 
 	conn = objt_conn(s->si[1].end);
 	if (!conn) {
+		xref_unlock(&socket->xref, peer);
 		lua_pushnil(L);
 		return 1;
 	}
 
 	conn_get_from_addr(conn);
 	if (!(conn->flags & CO_FL_ADDR_FROM_SET)) {
+		xref_unlock(&socket->xref, peer);
 		lua_pushnil(L);
 		return 1;
 	}
 
-	return hlua_socket_info(L, &conn->addr.from);
+	ret = hlua_socket_info(L, &conn->addr.from);
+	xref_unlock(&socket->xref, peer);
+	return ret;
 }
 
 /* This struct define the applet. */
@@ -2238,9 +2260,8 @@
 		WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
 
 	/* check for connection break. If some data where read, return it. */
-	peer = xref_get_peer(&socket->xref);
+	peer = xref_get_peer_and_lock(&socket->xref);
 	if (!peer) {
-		xref_disconnect(&socket->xref);
 		lua_pushnil(L);
 		lua_pushstring(L, "Can't connect");
 		return 2;
@@ -2252,11 +2273,14 @@
 	/* Check if we run on the same thread than the xreator thread.
 	 * We cannot access to the socket if the thread is different.
 	 */
-	if (socket->tid != tid)
+	if (socket->tid != tid) {
+		xref_unlock(&socket->xref, peer);
 		WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
+	}
 
 	/* Check for connection close. */
 	if (!hlua || channel_output_closed(&s->req)) {
+		xref_unlock(&socket->xref, peer);
 		lua_pushnil(L);
 		lua_pushstring(L, "Can't connect");
 		return 2;
@@ -2266,12 +2290,16 @@
 
 	/* Check for connection established. */
 	if (appctx->ctx.hlua_cosocket.connected) {
+		xref_unlock(&socket->xref, peer);
 		lua_pushinteger(L, 1);
 		return 1;
 	}
 
-	if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task))
+	if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task)) {
+		xref_unlock(&socket->xref, peer);
 		WILL_LJMP(luaL_error(L, "out of memory error"));
+	}
+	xref_unlock(&socket->xref, peer);
 	WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_connect_yield, TICK_ETERNITY, 0));
 	return 0;
 }
@@ -2308,9 +2336,8 @@
 		port = MAY_LJMP(luaL_checkinteger(L, 3));
 
 	/* check for connection break. If some data where read, return it. */
-	peer = xref_get_peer(&socket->xref);
+	peer = xref_get_peer_and_lock(&socket->xref);
 	if (!peer) {
-		xref_disconnect(&socket->xref);
 		lua_pushnil(L);
 		return 1;
 	}
@@ -2320,29 +2347,39 @@
 
 	/* Initialise connection. */
 	conn = si_alloc_conn(&s->si[1]);
-	if (!conn)
+	if (!conn) {
+		xref_unlock(&socket->xref, peer);
 		WILL_LJMP(luaL_error(L, "connect: internal error"));
+	}
 
 	/* needed for the connection not to be closed */
 	conn->target = s->target;
 
 	/* Parse ip address. */
 	addr = str2sa_range(ip, NULL, &low, &high, NULL, NULL, NULL, 0);
-	if (!addr)
+	if (!addr) {
+		xref_unlock(&socket->xref, peer);
 		WILL_LJMP(luaL_error(L, "connect: cannot parse destination address '%s'", ip));
-	if (low != high)
+	}
+	if (low != high) {
+		xref_unlock(&socket->xref, peer);
 		WILL_LJMP(luaL_error(L, "connect: port ranges not supported : address '%s'", ip));
+	}
 	memcpy(&conn->addr.to, addr, sizeof(struct sockaddr_storage));
 
 	/* Set port. */
 	if (low == 0) {
 		if (conn->addr.to.ss_family == AF_INET) {
-			if (port == -1)
+			if (port == -1) {
+				xref_unlock(&socket->xref, peer);
 				WILL_LJMP(luaL_error(L, "connect: port missing"));
+			}
 			((struct sockaddr_in *)&conn->addr.to)->sin_port = htons(port);
 		} else if (conn->addr.to.ss_family == AF_INET6) {
-			if (port == -1)
+			if (port == -1) {
+				xref_unlock(&socket->xref, peer);
 				WILL_LJMP(luaL_error(L, "connect: port missing"));
+			}
 			((struct sockaddr_in6 *)&conn->addr.to)->sin6_port = htons(port);
 		}
 	}
@@ -2359,8 +2396,11 @@
 
 	hlua->flags |= HLUA_MUST_GC;
 
-	if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task))
+	if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task)) {
+		xref_unlock(&socket->xref, peer);
 		WILL_LJMP(luaL_error(L, "out of memory"));
+	}
+	xref_unlock(&socket->xref, peer);
 	WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_connect_yield, TICK_ETERNITY, 0));
 
 	return 0;
@@ -2379,9 +2419,8 @@
 	socket  = MAY_LJMP(hlua_checksocket(L, 1));
 
 	/* check for connection break. If some data where read, return it. */
-	peer = xref_get_peer(&socket->xref);
+	peer = xref_get_peer_and_lock(&socket->xref);
 	if (!peer) {
-		xref_disconnect(&socket->xref);
 		lua_pushnil(L);
 		return 1;
 	}
@@ -2390,6 +2429,7 @@
 	s = si_strm(si);
 
 	s->target = &socket_ssl.obj_type;
+	xref_unlock(&socket->xref, peer);
 	return MAY_LJMP(hlua_socket_connect(L));
 }
 #endif
@@ -2420,9 +2460,8 @@
 		WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
 
 	/* check for connection break. If some data where read, return it. */
-	peer = xref_get_peer(&socket->xref);
+	peer = xref_get_peer_and_lock(&socket->xref);
 	if (!peer) {
-		xref_disconnect(&socket->xref);
 		hlua_pusherror(L, "socket: not yet initialised, you can't set timeouts.");
 		WILL_LJMP(lua_error(L));
 		return 0;
@@ -2435,6 +2474,7 @@
 	s->req.wto = tmout;
 	s->res.rto = tmout;
 	s->res.wto = tmout;
+	xref_unlock(&socket->xref, peer);
 
 	return 0;
 }