MEDIUM: fd/poller: turn update_mask to group-local IDs
From now on, the FD's update_mask only refers to local thread IDs. However,
there remains a limitation, in updt_fd_polling(), we temporarily have to
check and set shared FDs against .thread_mask, which still contains global
ones. As such, nbtgroups > 1 may break (but this is not yet supported without
special build options).
diff --git a/include/haproxy/fd.h b/include/haproxy/fd.h
index 601419e..9321777 100644
--- a/include/haproxy/fd.h
+++ b/include/haproxy/fd.h
@@ -131,17 +131,17 @@
{
unsigned long update_mask;
- update_mask = _HA_ATOMIC_AND_FETCH(&fdtab[fd].update_mask, ~tid_bit);
- while ((update_mask & all_threads_mask)== 0) {
+ update_mask = _HA_ATOMIC_AND_FETCH(&fdtab[fd].update_mask, ~ti->ltid_bit);
+ while ((update_mask & tg->threads_enabled) == 0) {
/* If we were the last one that had to update that entry, remove it from the list */
fd_rm_from_fd_list(&update_list[tgid - 1], fd);
- update_mask = (volatile unsigned long)fdtab[fd].update_mask;
- if ((update_mask & all_threads_mask) != 0) {
+ update_mask = _HA_ATOMIC_LOAD(&fdtab[fd].update_mask);
+ if ((update_mask & tg->threads_enabled) != 0) {
/* Maybe it's been re-updated in the meanwhile, and we
* wrongly removed it from the list, if so, re-add it
*/
fd_add_to_fd_list(&update_list[tgid - 1], fd);
- update_mask = (volatile unsigned long)(fdtab[fd].update_mask);
+ update_mask = _HA_ATOMIC_LOAD(&fdtab[fd].update_mask);
/* And then check again, just in case after all it
* should be removed, even if it's very unlikely, given
* the current thread wouldn't have been able to take
diff --git a/src/ev_epoll.c b/src/ev_epoll.c
index 6a71650..13ebc4f 100644
--- a/src/ev_epoll.c
+++ b/src/ev_epoll.c
@@ -169,7 +169,7 @@
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
- _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
+ _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~ti->ltid_bit);
if (!fdtab[fd].owner) {
activity[tid].poll_drop_fd++;
continue;
@@ -188,7 +188,7 @@
fd = -fd -4;
if (fd == -1)
break;
- if (fdtab[fd].update_mask & tid_bit)
+ if (fdtab[fd].update_mask & ti->ltid_bit)
done_update_polling(fd);
else
continue;
diff --git a/src/ev_evports.c b/src/ev_evports.c
index 8129e9e..a8fbc13 100644
--- a/src/ev_evports.c
+++ b/src/ev_evports.c
@@ -126,7 +126,7 @@
for (i = 0; i < fd_nbupdt; i++) {
fd = fd_updt[i];
- _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
+ _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~ti->ltid_bit);
if (fdtab[fd].owner == NULL) {
activity[tid].poll_drop_fd++;
continue;
@@ -145,7 +145,7 @@
fd = -fd -4;
if (fd == -1)
break;
- if (fdtab[fd].update_mask & tid_bit)
+ if (fdtab[fd].update_mask & ti->ltid_bit)
done_update_polling(fd);
else
continue;
diff --git a/src/ev_kqueue.c b/src/ev_kqueue.c
index 79ddf7d..70ee2ad 100644
--- a/src/ev_kqueue.c
+++ b/src/ev_kqueue.c
@@ -102,7 +102,7 @@
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
- _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
+ _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~ti->ltid_bit);
if (!fdtab[fd].owner) {
activity[tid].poll_drop_fd++;
continue;
@@ -119,7 +119,7 @@
fd = -fd -4;
if (fd == -1)
break;
- if (fdtab[fd].update_mask & tid_bit)
+ if (fdtab[fd].update_mask & ti->ltid_bit)
done_update_polling(fd);
else
continue;
diff --git a/src/ev_poll.c b/src/ev_poll.c
index a7fadea..2147813 100644
--- a/src/ev_poll.c
+++ b/src/ev_poll.c
@@ -116,7 +116,7 @@
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
- _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
+ _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~ti->ltid_bit);
if (!fdtab[fd].owner) {
activity[tid].poll_drop_fd++;
continue;
@@ -134,12 +134,12 @@
fd = -fd -4;
if (fd == -1)
break;
- if (fdtab[fd].update_mask & tid_bit) {
+ if (fdtab[fd].update_mask & ti->ltid_bit) {
/* Cheat a bit, as the state is global to all pollers
* we don't need every thread to take care of the
* update.
*/
- _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~all_threads_mask);
+ _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tg->threads_enabled);
done_update_polling(fd);
} else
continue;
diff --git a/src/ev_select.c b/src/ev_select.c
index 2b39a81..eadd588 100644
--- a/src/ev_select.c
+++ b/src/ev_select.c
@@ -108,7 +108,7 @@
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
- _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tid_bit);
+ _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~ti->ltid_bit);
if (!fdtab[fd].owner) {
activity[tid].poll_drop_fd++;
continue;
@@ -125,12 +125,12 @@
fd = -fd -4;
if (fd == -1)
break;
- if (fdtab[fd].update_mask & tid_bit) {
+ if (fdtab[fd].update_mask & ti->ltid_bit) {
/* Cheat a bit, as the state is global to all pollers
* we don't need every thread to take care of the
* update.
*/
- _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~all_threads_mask);
+ _HA_ATOMIC_AND(&fdtab[fd].update_mask, ~tg->threads_enabled);
done_update_polling(fd);
} else
continue;
diff --git a/src/fd.c b/src/fd.c
index 6a7043c..45cc5f4 100644
--- a/src/fd.c
+++ b/src/fd.c
@@ -459,14 +459,14 @@
void updt_fd_polling(const int fd)
{
if (all_threads_mask == 1UL || (fdtab[fd].thread_mask & all_threads_mask) == tid_bit) {
- if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+ if (HA_ATOMIC_BTS(&fdtab[fd].update_mask, ti->ltid))
return;
fd_updt[fd_nbupdt++] = fd;
} else {
unsigned long update_mask = fdtab[fd].update_mask;
do {
- if (update_mask == fdtab[fd].thread_mask)
+ if (update_mask == fdtab[fd].thread_mask) // FIXME: this works only on thread-groups 1
return;
} while (!_HA_ATOMIC_CAS(&fdtab[fd].update_mask, &update_mask, fdtab[fd].thread_mask));
@@ -525,7 +525,7 @@
activity[tid].poll_skip_fd++;
/* Let the poller know this FD was lost */
- if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+ if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, ti->ltid))
fd_updt[fd_nbupdt++] = fd;
fd_drop_tgid(fd);
@@ -603,10 +603,10 @@
/* we had to stop this FD and it still must be stopped after the I/O
* cb's changes, so let's program an update for this.
*/
- if (must_stop && !(fdtab[fd].update_mask & tid_bit)) {
+ if (must_stop && !(fdtab[fd].update_mask & ti->ltid_bit)) {
if (((must_stop & FD_POLL_IN) && !fd_recv_active(fd)) ||
((must_stop & FD_POLL_OUT) && !fd_send_active(fd)))
- if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, tid))
+ if (!HA_ATOMIC_BTS(&fdtab[fd].update_mask, ti->ltid))
fd_updt[fd_nbupdt++] = fd;
}
diff --git a/src/stream.c b/src/stream.c
index 66529d4..4208885 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -3330,7 +3330,7 @@
conn->flags,
conn_fd(conn),
conn_fd(conn) >= 0 ? fdtab[conn->handle.fd].state : 0,
- conn_fd(conn) >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0,
+ conn_fd(conn) >= 0 ? !!(fdtab[conn->handle.fd].update_mask & ti->ltid_bit) : 0,
conn_fd(conn) >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
}
@@ -3368,7 +3368,7 @@
conn->flags,
conn_fd(conn),
conn_fd(conn) >= 0 ? fdtab[conn->handle.fd].state : 0,
- conn_fd(conn) >= 0 ? !!(fdtab[conn->handle.fd].update_mask & tid_bit) : 0,
+ conn_fd(conn) >= 0 ? !!(fdtab[conn->handle.fd].update_mask & ti->ltid_bit) : 0,
conn_fd(conn) >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
}