SPSC: fix double close with MultiWaiter

This commit is contained in:
Miroslav Crnic
2025-09-30 16:00:57 +00:00
committed by Miroslav Crnic
parent 0d4b7ec885
commit 86fc9fd03b

View File

@@ -35,7 +35,7 @@ public:
for (; unlikely(queuesWithWork == 0); queuesWithWork = _queuesWithWork.load(std::memory_order_acquire)) {
if (timeout == 0) {
return false;
}
}
timespec spec = timeout.timespec();
long ret = syscall(SYS_futex, &_queuesWithWork, FUTEX_WAIT_PRIVATE, 0, timeout < 0 ? nullptr : &spec, nullptr, 0);
if (unlikely(ret != 0)) {
@@ -98,16 +98,21 @@ public:
// This will interrupt pullers.
void close() {
if (_size.fetch_or(1ull<<31, std::memory_order_relaxed) > 0) {
if (MultiWaiter) {
_waiter->removeWork();
}
auto prev = _size.fetch_or(1ull<<31, std::memory_order_relaxed);
if (prev & (1ull<<31)) {
// already closed nothing to do
return;
}
if (!MultiWaiter) {
long ret = syscall(SYS_futex, &_size, FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0);
if (unlikely(ret < 0)) {
throw SYSCALL_EXCEPTION("futex");
if (MultiWaiter) {
if (prev > 0) {
_waiter->removeWork();
}
} else {
if (prev == 0) {
long ret = syscall(SYS_futex, &_size, FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0);
if (unlikely(ret < 0)) {
throw SYSCALL_EXCEPTION("futex");
}
}
}
}
@@ -158,7 +163,7 @@ public:
for (; unlikely(sz == 0); sz = _size.load(std::memory_order_acquire)) {
if (timeout == 0) {
return 0;
}
}
timespec spec = timeout.timespec();
long ret = syscall(SYS_futex, &_size, FUTEX_WAIT_PRIVATE, 0, timeout < 0 ? nullptr : &spec, nullptr, 0);
if (unlikely(ret != 0)) {