From 572394a1f7ff235a0ed3ae83e21a29dc94e3d6c0 Mon Sep 17 00:00:00 2001 From: Steve Gerbino Date: Thu, 25 Jun 2026 00:02:15 +0200 Subject: [PATCH] fix(thread_pool): re-read outstanding work under lock before stopping testJoinDrainsWork could intermittently fail: join() returned with posted tasks still queued and never run. on_work_finished() decremented outstanding_work_ lock-free and decided to stop from that decrement alone. A worker could observe the count transiently reach zero, get preempted before taking the mutex, and then latch stop_ after more work had been posted and join() had begun waiting; join() woke and abandoned the still-outstanding work. The same hole strands a task that suspends and is resumed after the count briefly hits zero, since its run queue is empty while it is in flight. Keep outstanding_work_ atomic and lock-free on the start path, but have the worker that drives the count to zero re-read it under mutex_ before latching stop_. The re-read observes any on_work_started() that landed in the window after the lock-free decrement, so work started before the decision is never stranded; work whose count is raised after the decision is post-drain and abandoned as before. join() still blocks until the count reaches zero. Also correct the class example: a bare post() does not register outstanding work, so join() does not wait for it. Use run_async, which holds a work guard for the operation, and document the contract. --- include/boost/capy/ex/thread_pool.hpp | 7 ++++++- src/ex/thread_pool.cpp | 22 ++++++++++++++++------ 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/include/boost/capy/ex/thread_pool.hpp b/include/boost/capy/ex/thread_pool.hpp index f6e1b9625..6aa377296 100644 --- a/include/boost/capy/ex/thread_pool.hpp +++ b/include/boost/capy/ex/thread_pool.hpp @@ -36,10 +36,15 @@ namespace capy { @code thread_pool pool(4); // 4 worker threads auto ex = pool.get_executor(); - ex.post(some_coroutine); + run_async(ex)(some_task()); // launch work; tracked so join() waits for it pool.join(); // wait for outstanding work to complete // pool destructor stops the pool, discarding any pending work @endcode + + @note `join()` waits only for work that holds outstanding-work + counting, which `run_async` (and `make_work_guard`) provide. A bare + `executor_type::post()` does not register outstanding work, so + `join()` will not wait for it. */ class BOOST_CAPY_DECL thread_pool diff --git a/src/ex/thread_pool.cpp b/src/ex/thread_pool.cpp index 393a397c9..de03b922c 100644 --- a/src/ex/thread_pool.cpp +++ b/src/ex/thread_pool.cpp @@ -35,9 +35,13 @@ thread is named with a configurable prefix plus index for debugger visibility. - Work tracking: on_work_started/on_work_finished maintain an atomic - outstanding_work_ counter. join() blocks until this counter reaches - zero, then signals workers to stop and joins threads. + Work tracking: on_work_started/on_work_finished maintain the atomic + outstanding_work_ counter. on_work_started is lock-free; the worker + that drives the count to zero takes mutex_ and re-reads the count + before deciding to stop, so the count and the stop decision stay + consistent even if work is started in between. join() blocks until + this counter reaches zero, then signals workers to stop and joins + threads. Two shutdown paths: - join(): waits for outstanding work to drain, then stops workers. @@ -160,11 +164,17 @@ class thread_pool::impl if(outstanding_work_.fetch_sub( 1, std::memory_order_acq_rel) == 1) { + // fetch_sub's result can be stale: a concurrent + // on_work_started() may raise the count before we take the + // lock, so re-read it here rather than trust the decrement. std::lock_guard lock(mutex_); - if(joined_ && !stop_) + if(outstanding_work_.load( + std::memory_order_acquire) == 0 && joined_ && !stop_) + { stop_ = true; - done_cv_.notify_all(); - work_cv_.notify_all(); + done_cv_.notify_all(); + work_cv_.notify_all(); + } } }