MEDIUM: threads/xref: Convert xref function to a thread safe model
Ensure that the unlink is done safely between thread and that
the peer struct will not destroy between the usage of the peer.
diff --git a/include/common/xref.h b/include/common/xref.h
index b020280..6dfa7b6 100644
--- a/include/common/xref.h
+++ b/include/common/xref.h
@@ -1,6 +1,8 @@
#ifndef __XREF_H__
#define __XREF_H__
+#include <common/hathreads.h>
+
/* xref is used to create relation between two elements.
* Once an element is released, it breaks the relation. If the
* relation is already broken, it frees the xref struct.
@@ -13,25 +15,64 @@
struct xref *peer;
};
+#define XREF_BUSY ((struct xref *)1)
+
static inline void xref_create(struct xref *xref_a, struct xref *xref_b)
{
xref_a->peer = xref_b;
xref_b->peer = xref_a;
}
-static inline struct xref *xref_get_peer(struct xref *xref)
+static inline struct xref *xref_get_peer_and_lock(struct xref *xref)
{
- if (!xref->peer)
- return NULL;
- return xref->peer;
+ struct xref *local;
+ struct xref *remote;
+
+ while (1) {
+
+ /* Get the local pointer to the peer. */
+ local = HA_ATOMIC_XCHG(&xref->peer, XREF_BUSY);
+
+ /* If the local pointer is NULL, the peer no longer exists. */
+ if (local == NULL) {
+ xref->peer = NULL;
+ return NULL;
+ }
+
+ /* If the local pointeru is BUSY, the peer try to acquire the
+ * lock. We retry the process.
+ */
+ if (local == XREF_BUSY)
+ continue;
+
+ /* We are locked, the peer cant disapear, try to acquire
+ * the pper's lock. Note that remote can't be NULL.
+ */
+ remote = HA_ATOMIC_XCHG(&local->peer, XREF_BUSY);
+
+ /* The remote lock is BUSY, We retry the process. */
+ if (remote == XREF_BUSY) {
+ xref->peer = local;
+ continue;
+ }
+
+ /* We have the lock, we return the value of the xref. */
+ return local;
+ }
}
-static inline void xref_disconnect(struct xref *xref)
+static inline void xref_unlock(struct xref *xref, struct xref *peer)
{
- if (!xref->peer)
- return;
+ /* Release the peer. */
+ peer->peer = xref;
- xref->peer->peer = NULL;
+ /* Release myself. */
+ xref->peer = peer;
+}
+
+static inline void xref_disconnect(struct xref *xref, struct xref *peer)
+{
+ peer->peer = NULL;
xref->peer = NULL;
}
diff --git a/src/hlua.c b/src/hlua.c
index 1d14e54..6edb08a 100644
--- a/src/hlua.c
+++ b/src/hlua.c
@@ -1579,8 +1579,12 @@
*/
static void hlua_socket_release(struct appctx *appctx)
{
+ struct xref *peer;
+
/* Remove my link in the original object. */
- xref_disconnect(&appctx->ctx.hlua_cosocket.xref);
+ peer = xref_get_peer_and_lock(&appctx->ctx.hlua_cosocket.xref);
+ if (peer)
+ xref_disconnect(&appctx->ctx.hlua_cosocket.xref, peer);
/* Wake all the task waiting for me. */
notification_wake(&appctx->ctx.hlua_cosocket.wake_on_read);
@@ -1602,11 +1606,9 @@
MAY_LJMP(check_args(L, 1, "__gc"));
socket = MAY_LJMP(hlua_checksocket(L, 1));
- peer = xref_get_peer(&socket->xref);
- if (!peer) {
- xref_disconnect(&socket->xref);
+ peer = xref_get_peer_and_lock(&socket->xref);
+ if (!peer)
return 0;
- }
appctx = container_of(peer, struct appctx, ctx.hlua_cosocket.xref);
/* Set the flag which destroy the session. */
@@ -1614,7 +1616,7 @@
appctx_wakeup(appctx);
/* Remove all reference between the Lua stack and the coroutine stream. */
- xref_disconnect(&socket->xref);
+ xref_disconnect(&socket->xref, peer);
return 0;
}
@@ -1637,11 +1639,9 @@
if (socket->tid != tid)
WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
- peer = xref_get_peer(&socket->xref);
- if (!peer) {
- xref_disconnect(&socket->xref);
+ peer = xref_get_peer_and_lock(&socket->xref);
+ if (!peer)
return 0;
- }
appctx = container_of(peer, struct appctx, ctx.hlua_cosocket.xref);
/* Set the flag which destroy the session. */
@@ -1649,7 +1649,7 @@
appctx_wakeup(appctx);
/* Remove all reference between the Lua stack and the coroutine stream. */
- xref_disconnect(&socket->xref);
+ xref_disconnect(&socket->xref, peer);
return 0;
}
@@ -1694,11 +1694,9 @@
WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
/* check for connection break. If some data where read, return it. */
- peer = xref_get_peer(&socket->xref);
- if (!peer) {
- xref_disconnect(&socket->xref);
- goto connection_closed;
- }
+ peer = xref_get_peer_and_lock(&socket->xref);
+ if (!peer)
+ goto no_peer;
appctx = container_of(peer, struct appctx, ctx.hlua_cosocket.xref);
si = appctx->owner;
s = si_strm(si);
@@ -1784,10 +1782,15 @@
/* Return result. */
luaL_pushresult(&socket->b);
+ xref_unlock(&socket->xref, peer);
return 1;
connection_closed:
+ xref_unlock(&socket->xref, peer);
+
+no_peer:
+
/* If the buffer containds data. */
if (socket->b.n > 0) {
luaL_pushresult(&socket->b);
@@ -1800,8 +1803,11 @@
connection_empty:
appctx = objt_appctx(s->si[0].end);
- if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_read, hlua->task))
+ if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_read, hlua->task)) {
+ xref_unlock(&socket->xref, peer);
WILL_LJMP(luaL_error(L, "out of memory"));
+ }
+ xref_unlock(&socket->xref, peer);
WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_receive_yield, TICK_ETERNITY, 0));
return 0;
}
@@ -1915,9 +1921,8 @@
WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
/* check for connection break. If some data where read, return it. */
- peer = xref_get_peer(&socket->xref);
+ peer = xref_get_peer_and_lock(&socket->xref);
if (!peer) {
- xref_disconnect(&socket->xref);
lua_pushinteger(L, -1);
return 1;
}
@@ -1927,6 +1932,7 @@
/* Check for connection close. */
if (channel_output_closed(&s->req)) {
+ xref_unlock(&socket->xref, peer);
lua_pushinteger(L, -1);
return 1;
}
@@ -1936,8 +1942,10 @@
send_len = buf_len - sent;
/* All the data are sent. */
- if (sent >= buf_len)
+ if (sent >= buf_len) {
+ xref_unlock(&socket->xref, peer);
return 1; /* Implicitly return the length sent. */
+ }
/* Check if the buffer is avalaible because HAProxy doesn't allocate
* the request buffer if its not required.
@@ -1952,8 +1960,10 @@
len = buffer_total_space(s->req.buf);
if (len <= 0) {
appctx = objt_appctx(s->si[0].end);
- if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task))
+ if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task)) {
+ xref_unlock(&socket->xref, peer);
WILL_LJMP(luaL_error(L, "out of memory"));
+ }
goto hlua_socket_write_yield_return;
}
@@ -1974,6 +1984,7 @@
MAY_LJMP(hlua_socket_close(L));
lua_pop(L, 1);
lua_pushinteger(L, -1);
+ xref_unlock(&socket->xref, peer);
return 1;
}
@@ -1989,10 +2000,13 @@
lua_pushinteger(L, sent + len);
/* All the data buffer is sent ? */
- if (sent + len >= buf_len)
+ if (sent + len >= buf_len) {
+ xref_unlock(&socket->xref, peer);
return 1;
+ }
hlua_socket_write_yield_return:
+ xref_unlock(&socket->xref, peer);
WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_write_yield, TICK_ETERNITY, 0));
return 0;
}
@@ -2131,6 +2145,7 @@
struct appctx *appctx;
struct stream_interface *si;
struct stream *s;
+ int ret;
MAY_LJMP(check_args(L, 1, "getpeername"));
@@ -2143,9 +2158,8 @@
WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
/* check for connection break. If some data where read, return it. */
- peer = xref_get_peer(&socket->xref);
+ peer = xref_get_peer_and_lock(&socket->xref);
if (!peer) {
- xref_disconnect(&socket->xref);
lua_pushnil(L);
return 1;
}
@@ -2155,17 +2169,21 @@
conn = objt_conn(s->si[1].end);
if (!conn) {
+ xref_unlock(&socket->xref, peer);
lua_pushnil(L);
return 1;
}
conn_get_to_addr(conn);
if (!(conn->flags & CO_FL_ADDR_TO_SET)) {
+ xref_unlock(&socket->xref, peer);
lua_pushnil(L);
return 1;
}
- return MAY_LJMP(hlua_socket_info(L, &conn->addr.to));
+ ret = MAY_LJMP(hlua_socket_info(L, &conn->addr.to));
+ xref_unlock(&socket->xref, peer);
+ return ret;
}
/* Returns information about my connection side. */
@@ -2177,6 +2195,7 @@
struct xref *peer;
struct stream_interface *si;
struct stream *s;
+ int ret;
MAY_LJMP(check_args(L, 1, "getsockname"));
@@ -2189,9 +2208,8 @@
WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
/* check for connection break. If some data where read, return it. */
- peer = xref_get_peer(&socket->xref);
+ peer = xref_get_peer_and_lock(&socket->xref);
if (!peer) {
- xref_disconnect(&socket->xref);
lua_pushnil(L);
return 1;
}
@@ -2201,17 +2219,21 @@
conn = objt_conn(s->si[1].end);
if (!conn) {
+ xref_unlock(&socket->xref, peer);
lua_pushnil(L);
return 1;
}
conn_get_from_addr(conn);
if (!(conn->flags & CO_FL_ADDR_FROM_SET)) {
+ xref_unlock(&socket->xref, peer);
lua_pushnil(L);
return 1;
}
- return hlua_socket_info(L, &conn->addr.from);
+ ret = hlua_socket_info(L, &conn->addr.from);
+ xref_unlock(&socket->xref, peer);
+ return ret;
}
/* This struct define the applet. */
@@ -2238,9 +2260,8 @@
WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
/* check for connection break. If some data where read, return it. */
- peer = xref_get_peer(&socket->xref);
+ peer = xref_get_peer_and_lock(&socket->xref);
if (!peer) {
- xref_disconnect(&socket->xref);
lua_pushnil(L);
lua_pushstring(L, "Can't connect");
return 2;
@@ -2252,11 +2273,14 @@
/* Check if we run on the same thread than the xreator thread.
* We cannot access to the socket if the thread is different.
*/
- if (socket->tid != tid)
+ if (socket->tid != tid) {
+ xref_unlock(&socket->xref, peer);
WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
+ }
/* Check for connection close. */
if (!hlua || channel_output_closed(&s->req)) {
+ xref_unlock(&socket->xref, peer);
lua_pushnil(L);
lua_pushstring(L, "Can't connect");
return 2;
@@ -2266,12 +2290,16 @@
/* Check for connection established. */
if (appctx->ctx.hlua_cosocket.connected) {
+ xref_unlock(&socket->xref, peer);
lua_pushinteger(L, 1);
return 1;
}
- if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task))
+ if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task)) {
+ xref_unlock(&socket->xref, peer);
WILL_LJMP(luaL_error(L, "out of memory error"));
+ }
+ xref_unlock(&socket->xref, peer);
WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_connect_yield, TICK_ETERNITY, 0));
return 0;
}
@@ -2308,9 +2336,8 @@
port = MAY_LJMP(luaL_checkinteger(L, 3));
/* check for connection break. If some data where read, return it. */
- peer = xref_get_peer(&socket->xref);
+ peer = xref_get_peer_and_lock(&socket->xref);
if (!peer) {
- xref_disconnect(&socket->xref);
lua_pushnil(L);
return 1;
}
@@ -2320,29 +2347,39 @@
/* Initialise connection. */
conn = si_alloc_conn(&s->si[1]);
- if (!conn)
+ if (!conn) {
+ xref_unlock(&socket->xref, peer);
WILL_LJMP(luaL_error(L, "connect: internal error"));
+ }
/* needed for the connection not to be closed */
conn->target = s->target;
/* Parse ip address. */
addr = str2sa_range(ip, NULL, &low, &high, NULL, NULL, NULL, 0);
- if (!addr)
+ if (!addr) {
+ xref_unlock(&socket->xref, peer);
WILL_LJMP(luaL_error(L, "connect: cannot parse destination address '%s'", ip));
- if (low != high)
+ }
+ if (low != high) {
+ xref_unlock(&socket->xref, peer);
WILL_LJMP(luaL_error(L, "connect: port ranges not supported : address '%s'", ip));
+ }
memcpy(&conn->addr.to, addr, sizeof(struct sockaddr_storage));
/* Set port. */
if (low == 0) {
if (conn->addr.to.ss_family == AF_INET) {
- if (port == -1)
+ if (port == -1) {
+ xref_unlock(&socket->xref, peer);
WILL_LJMP(luaL_error(L, "connect: port missing"));
+ }
((struct sockaddr_in *)&conn->addr.to)->sin_port = htons(port);
} else if (conn->addr.to.ss_family == AF_INET6) {
- if (port == -1)
+ if (port == -1) {
+ xref_unlock(&socket->xref, peer);
WILL_LJMP(luaL_error(L, "connect: port missing"));
+ }
((struct sockaddr_in6 *)&conn->addr.to)->sin6_port = htons(port);
}
}
@@ -2359,8 +2396,11 @@
hlua->flags |= HLUA_MUST_GC;
- if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task))
+ if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task)) {
+ xref_unlock(&socket->xref, peer);
WILL_LJMP(luaL_error(L, "out of memory"));
+ }
+ xref_unlock(&socket->xref, peer);
WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_connect_yield, TICK_ETERNITY, 0));
return 0;
@@ -2379,9 +2419,8 @@
socket = MAY_LJMP(hlua_checksocket(L, 1));
/* check for connection break. If some data where read, return it. */
- peer = xref_get_peer(&socket->xref);
+ peer = xref_get_peer_and_lock(&socket->xref);
if (!peer) {
- xref_disconnect(&socket->xref);
lua_pushnil(L);
return 1;
}
@@ -2390,6 +2429,7 @@
s = si_strm(si);
s->target = &socket_ssl.obj_type;
+ xref_unlock(&socket->xref, peer);
return MAY_LJMP(hlua_socket_connect(L));
}
#endif
@@ -2420,9 +2460,8 @@
WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
/* check for connection break. If some data where read, return it. */
- peer = xref_get_peer(&socket->xref);
+ peer = xref_get_peer_and_lock(&socket->xref);
if (!peer) {
- xref_disconnect(&socket->xref);
hlua_pusherror(L, "socket: not yet initialised, you can't set timeouts.");
WILL_LJMP(lua_error(L));
return 0;
@@ -2435,6 +2474,7 @@
s->req.wto = tmout;
s->res.rto = tmout;
s->res.wto = tmout;
+ xref_unlock(&socket->xref, peer);
return 0;
}