Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion include/boost/capy/ex/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,15 @@ namespace capy {
@code
thread_pool pool(4); // 4 worker threads
auto ex = pool.get_executor();
ex.post(some_coroutine);
run_async(ex)(some_task()); // launch work; tracked so join() waits for it
pool.join(); // wait for outstanding work to complete
// pool destructor stops the pool, discarding any pending work
@endcode

@note `join()` waits only for work that holds outstanding-work
counting, which `run_async` (and `make_work_guard`) provide. A bare
`executor_type::post()` does not register outstanding work, so
`join()` will not wait for it.
*/
class BOOST_CAPY_DECL
thread_pool
Expand Down
22 changes: 16 additions & 6 deletions src/ex/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@
thread is named with a configurable prefix plus index for debugger
visibility.

Work tracking: on_work_started/on_work_finished maintain an atomic
outstanding_work_ counter. join() blocks until this counter reaches
zero, then signals workers to stop and joins threads.
Work tracking: on_work_started/on_work_finished maintain the atomic
outstanding_work_ counter. on_work_started is lock-free; the worker
that drives the count to zero takes mutex_ and re-reads the count
before deciding to stop, so the count and the stop decision stay
consistent even if work is started in between. join() blocks until
this counter reaches zero, then signals workers to stop and joins
threads.

Two shutdown paths:
- join(): waits for outstanding work to drain, then stops workers.
Expand Down Expand Up @@ -160,11 +164,17 @@ class thread_pool::impl
if(outstanding_work_.fetch_sub(
1, std::memory_order_acq_rel) == 1)
{
// fetch_sub's result can be stale: a concurrent
// on_work_started() may raise the count before we take the
// lock, so re-read it here rather than trust the decrement.
std::lock_guard<std::mutex> lock(mutex_);
if(joined_ && !stop_)
if(outstanding_work_.load(
std::memory_order_acquire) == 0 && joined_ && !stop_)
{
stop_ = true;
done_cv_.notify_all();
work_cv_.notify_all();
done_cv_.notify_all();
work_cv_.notify_all();
}
}
}

Expand Down
Loading