diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index 0d48896dc..5585a4fd6 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -30,6 +30,7 @@ endif() if(BOOST_CAPY_BUILD_CUDA_EXAMPLES) add_subdirectory(cuda/datamovement) + add_subdirectory(cuda/notification-strategies) endif() if(BOOST_CAPY_BUILD_NVEXEC_EXAMPLES) diff --git a/example/cuda/notification-strategies/CMakeLists.txt b/example/cuda/notification-strategies/CMakeLists.txt new file mode 100644 index 000000000..16cc44483 --- /dev/null +++ b/example/cuda/notification-strategies/CMakeLists.txt @@ -0,0 +1,36 @@ +# +# Copyright (c) 2026 Steve Gerbino +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +# CUDA was enabled at the top level when the option was flipped on. +if(NOT CMAKE_CUDA_COMPILER) + message(FATAL_ERROR + "example/cuda/notification-strategies requires CUDA; " + "did you set BOOST_CAPY_BUILD_CUDA_EXAMPLES?") +endif() + +file(GLOB_RECURSE PFILES CONFIGURE_DEPENDS + *.cu *.cuh *.hpp + CMakeLists.txt + README.md) + +source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} PREFIX "" FILES ${PFILES}) + +add_executable(capy_example_cuda_notification_strategies ${PFILES}) + +set_target_properties(capy_example_cuda_notification_strategies PROPERTIES + FOLDER "examples" + CUDA_STANDARD 20 + CUDA_STANDARD_REQUIRED ON + CUDA_SEPARABLE_COMPILATION OFF) + +target_compile_features(capy_example_cuda_notification_strategies PRIVATE cxx_std_20) + +target_link_libraries(capy_example_cuda_notification_strategies PRIVATE + Boost::capy + CUDA::cudart) diff --git a/example/cuda/notification-strategies/README.md b/example/cuda/notification-strategies/README.md new file mode 100644 index 000000000..0a69765ca --- /dev/null +++ b/example/cuda/notification-strategies/README.md @@ -0,0 +1,81 @@ +# CUDA notification-strategies example + 
One GPU completion, three notification mechanisms, one protocol.

The same pipeline (fill a device buffer, copy it back to host, await the
CUDA stream) is awaited in three structurally different ways. All three are
`boost::capy::IoAwaitable`s, and all three produce an identical result
at runtime. This demonstrates that the IoAwaitable protocol is
independent of how asynchronous completion is detected: a host-function
callback is only one option, and not the best-scaling one.

## The three mechanisms

| Mechanism | How completion is detected | Continuation posted by |
|-----------|----------------------------|------------------------|
| `callback_awaitable` | `cudaLaunchHostFunc` enqueued on the stream | a CUDA driver thread, which re-posts through the executor |
| `poll_awaitable` | a service thread loops `cudaEventQuery` on a recorded event | the poll thread, once the event is ready |
| `deferred_sync_awaitable` | a service thread runs blocking `cudaStreamSynchronize` | the service thread, once the call returns |

Each awaitable captures the executor and posts the continuation through
it, so the coroutine always resumes on a worker thread, never on a CUDA
or service thread.

The callback mechanism is the only one that cannot report a stream error
through its host function: `cudaLaunchHostFunc` does not pass completion
status to the callback, so whenever `callback_awaitable` is resumed by the
callback it reports success. Worse, CUDA does not invoke the host function
at all when the context is in an error state, so after such a failure the
awaiting coroutine is never resumed. This is an inherent limitation of the
API.

### Service lifetime

`poll_service` and `sync_service` own threads that post continuations to
the worker executor. Construct them after the worker `thread_pool` and
destroy them before it, so no continuation is ever posted to a destroyed
executor. The example's `main` (not the CUDA driver) waits for every
pipeline to finish, via a `std::latch`, before shutdown, so no wait is
outstanding at teardown.

## Scaling tradeoff

This example proves the three mechanisms are *equivalent in result*.
It does not measure their *throughput under load*, which a single-GPU
developer box cannot show. That comparison needs many worker threads
driving a server-class GPU.

For that measurement, see E. Cano, M. Fila, A. Krasznahorkay,
"Scheduling for Next Generation Triggers", CHEP 2026. They
report that the CUDA host-function callback handler scales poorly as the
number of worker threads grows, while event polling and deferred
synchronization remain stable. In a multi-threaded framework, prefer the
poll or deferred-sync mechanisms; reach for the callback mechanism for
its simplicity in low-concurrency settings.

## Prerequisites

- NVIDIA GPU and driver visible to `nvidia-smi`.
- CUDA toolkit 13.x.
- clang as host and CUDA compiler (verified with clang 22).
- `CMAKE_CXX_STANDARD=20`.

## Building and running

```
CXX=clang++ cmake -S . -B build-cuda -G Ninja \
    -DCMAKE_BUILD_TYPE=Release \
    -DCMAKE_CXX_STANDARD=20 \
    -DBOOST_CAPY_BUILD_CUDA_EXAMPLES=ON \
    -DCMAKE_CUDA_COMPILER=clang++ \
    -DCMAKE_CUDA_HOST_COMPILER=clang++ \
    -DCMAKE_CUDA_ARCHITECTURES=89 \
    -DCUDAToolkit_ROOT=/opt/cuda
cmake --build build-cuda --config Release \
    --target capy_example_cuda_notification_strategies
./build-cuda/example/cuda/notification-strategies/capy_example_cuda_notification_strategies
```

Replace `89` with your GPU's compute capability
(`nvidia-smi --query-gpu=compute_cap --format=csv,noheader`).

Unlike the sibling `cuda/datamovement` example, this one is meant to be
run. The pass condition is all three mechanisms printing the same
checksum and a zero exit code.
diff --git a/example/cuda/notification-strategies/notification_strategies.cu b/example/cuda/notification-strategies/notification_strategies.cu new file mode 100644 index 000000000..733422bf0 --- /dev/null +++ b/example/cuda/notification-strategies/notification_strategies.cu @@ -0,0 +1,200 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +// One GPU completion, three notification mechanisms. +// +// The IoAwaitable protocol does not care how completion is detected. The +// same pipeline (fill a device buffer, copy it back, await the stream) +// runs once per mechanism (host-function callback, event polling, +// deferred blocking synchronize) and all three must produce the same +// checksum. See README.md for the multi-threaded scaling tradeoff, which +// a single-GPU box cannot measure. + +#include "notification_strategies.hpp" + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace capy = boost::capy; +namespace ex = boost::capy::example; + +static_assert(std::is_same_v< + decltype(ex::make_cuda_error(cudaSuccess)), std::error_code>); + +// The whole point: three different mechanisms, all IoAwaitables. 
+static_assert(capy::IoAwaitable); +static_assert(capy::IoAwaitable); +static_assert(capy::IoAwaitable); + +namespace { + +constexpr int buffer_len = 256; +constexpr int fill_value = 7; + +__global__ void +fill_kernel(int* p, int n, int v) +{ + int i = blockIdx.x * blockDim.x + threadIdx.x; + if(i < n) + p[i] = v; +} + +enum class notify +{ + callback, + poll, + deferred_sync +}; + +char const* +name_of(notify how) noexcept +{ + switch(how) + { + case notify::callback: return "callback"; + case notify::poll: return "poll"; + case notify::deferred_sync: return "deferred-sync"; + } + return "?"; +} + +// Await the stream's completion using the selected mechanism. poll waits +// on the event; callback and deferred-sync wait on the stream. +capy::task +wait(ex::cuda_stream& stream, + ex::cuda_event& event, + notify how, + ex::poll_service& poll_svc, + ex::sync_service& sync_svc) +{ + switch(how) + { + case notify::callback: + co_return co_await stream.sync_via_callback(); + case notify::poll: + co_return co_await event.sync_via_poll(poll_svc); + case notify::deferred_sync: + co_return co_await stream.sync_via_deferred(sync_svc); + } + co_return std::error_code{}; +} + +// Fill a device buffer, copy it back, await completion via `how`, and +// return the host-side checksum. Identical across mechanisms. 
+capy::task +run_pipeline(ex::cuda_stream& stream, + ex::cuda_event& event, + notify how, + ex::poll_service& poll_svc, + ex::sync_service& sync_svc) +{ + auto s = stream.native_handle(); + + int* d_buf = nullptr; + auto err = cudaMallocAsync( + reinterpret_cast(&d_buf), + buffer_len * sizeof(int), s); + if(err != cudaSuccess) + co_return -1; + + fill_kernel<<<(buffer_len + 63) / 64, 64, 0, s>>>( + d_buf, buffer_len, fill_value); + + std::vector host(buffer_len, 0); + cudaMemcpyAsync( + host.data(), d_buf, buffer_len * sizeof(int), + cudaMemcpyDeviceToHost, s); + cudaFreeAsync(d_buf, s); + event.record(s); + + auto ec = co_await wait(stream, event, how, poll_svc, sync_svc); + if(ec) + co_return -1; + + long sum = 0; + for(int v : host) + sum += v; + co_return sum; +} + +// Drive one pipeline run to completion on `pool` and return its checksum. +long +run_one(capy::thread_pool& pool, + ex::cuda_stream& stream, + ex::cuda_event& event, + notify how, + ex::poll_service& poll_svc, + ex::sync_service& sync_svc) +{ + long result = 0; + std::latch done{1}; + capy::run_async(pool.get_executor(), + [&](long r) { result = r; done.count_down(); })( + run_pipeline(stream, event, how, poll_svc, sync_svc)); + done.wait(); + return result; +} + +} // namespace + +int +main() +{ + int device_count = 0; + if(cudaGetDeviceCount(&device_count) != cudaSuccess || device_count == 0) + { + std::cout << "No CUDA device available.\n"; + return EXIT_FAILURE; + } + + // Declaration order fixes teardown: services stop before the pool + // they post to; the stream/event close first of all. 
+ capy::thread_pool pool(4); + ex::poll_service poll_svc; + ex::sync_service sync_svc; + ex::cuda_stream stream; + ex::cuda_event event; + + notify const modes[] = + { notify::callback, notify::poll, notify::deferred_sync }; + + long first = 0; + bool ok = true; + std::cout << "mechanism checksum\n"; + for(std::size_t i = 0; i < std::size(modes); ++i) + { + long r = run_one(pool, stream, event, modes[i], poll_svc, sync_svc); + std::cout << name_of(modes[i]) << " " << r << "\n"; + if(i == 0) + first = r; + else if(r != first) + ok = false; + } + + long const expected = + static_cast(buffer_len) * fill_value; + if(! ok || first != expected) + { + std::cout << "MISMATCH: mechanisms disagree or wrong result " + "(expected " << expected << ")\n"; + return EXIT_FAILURE; + } + + std::cout << "All three mechanisms produced " << first + << " (identical).\n"; + return EXIT_SUCCESS; +} diff --git a/example/cuda/notification-strategies/notification_strategies.hpp b/example/cuda/notification-strategies/notification_strategies.hpp new file mode 100644 index 000000000..5c0956e68 --- /dev/null +++ b/example/cuda/notification-strategies/notification_strategies.hpp @@ -0,0 +1,459 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#ifndef BOOST_CAPY_EXAMPLE_CUDA_NOTIFICATION_STRATEGIES_HPP +#define BOOST_CAPY_EXAMPLE_CUDA_NOTIFICATION_STRATEGIES_HPP + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace capy { +namespace example { + +/// Error category mapping `cudaError_t` values to messages. 
+class cuda_error_category + : public std::error_category +{ +public: + char const* name() const noexcept override + { + return "cuda"; + } + + std::string message(int ev) const override + { + return ::cudaGetErrorString(static_cast(ev)); + } +}; + +/// Return the singleton CUDA error category. +inline std::error_category const& cuda_category() noexcept +{ + static cuda_error_category const cat; + return cat; +} + +/// Convert a `cudaError_t` to a `std::error_code`. +inline std::error_code make_cuda_error(cudaError_t e) noexcept +{ + return std::error_code(static_cast(e), cuda_category()); +} + +/// One pending poll registration. `cont` and `ec` point into the +/// awaitable's coroutine frame, valid for the suspension's lifetime. +struct poll_entry +{ + cudaEvent_t event; + executor_ref ex; + continuation* cont; + std::error_code* ec; +}; + +/// A dedicated thread that polls CUDA events with `cudaEventQuery` and +/// posts each waiter's continuation once its event reports ready. +/// +/// This is the notification mechanism that avoids both blocking a worker +/// thread and registering a driver callback. Must be destroyed before +/// the executor it posts to; in this example no waits are outstanding at +/// teardown because the driver joins every pipeline first. +class poll_service +{ + std::mutex mtx_; + std::vector entries_; + std::jthread thread_; + + void run(std::stop_token st) + { + while(! st.stop_requested()) + { + { + std::lock_guard lock(mtx_); + for(std::size_t i = 0; i < entries_.size();) + { + auto& e = entries_[i]; + auto s = cudaEventQuery(e.event); + if(s == cudaErrorNotReady) + { + ++i; + continue; + } + *e.ec = (s == cudaSuccess) + ? 
std::error_code{} + : make_cuda_error(s); + e.ex.post(*e.cont); + entries_[i] = entries_.back(); + entries_.pop_back(); + } + } + std::this_thread::yield(); + } + } + +public: + poll_service() + : thread_([this](std::stop_token st) { run(st); }) + { + } + + /// Register a waiter; its continuation posts when `e.event` is ready. + void register_wait(poll_entry e) + { + std::lock_guard lock(mtx_); + entries_.push_back(e); + } +}; + +/// A single-thread worker that runs blocking jobs off the calling +/// thread. The deferred-sync mechanism uses it to call the blocking +/// `cudaStreamSynchronize` without stalling a worker thread, then posts +/// the continuation. Stops and joins on destruction. +class sync_service +{ + std::mutex mtx_; + std::condition_variable cv_; + std::queue> jobs_; + bool stop_ = false; + std::jthread thread_; + + void run() + { + for(;;) + { + std::function job; + { + std::unique_lock lock(mtx_); + cv_.wait(lock, [this] { return stop_ || ! jobs_.empty(); }); + if(stop_ && jobs_.empty()) + return; + job = std::move(jobs_.front()); + jobs_.pop(); + } + job(); + } + } + +public: + sync_service() + : thread_([this] { run(); }) + { + } + + ~sync_service() + { + { + std::lock_guard lock(mtx_); + stop_ = true; + } + cv_.notify_all(); + } + + /// Enqueue a job to run on the service thread. + void post(std::function job) + { + { + std::lock_guard lock(mtx_); + jobs_.push(std::move(job)); + } + cv_.notify_one(); + } +}; + +struct callback_awaitable; +class poll_awaitable; +class deferred_sync_awaitable; + +/// Owns a CUDA stream; created on construction, destroyed on teardown. 
+class cuda_stream +{ + cudaStream_t stream_ = nullptr; + +public: + cuda_stream() + { + auto err = cudaStreamCreate(&stream_); + if(err != cudaSuccess) + throw std::system_error(make_cuda_error(err)); + } + + ~cuda_stream() + { + if(stream_) + cudaStreamDestroy(stream_); + } + + cuda_stream(cuda_stream&& other) noexcept + : stream_(std::exchange(other.stream_, nullptr)) + { + } + + cuda_stream& operator=(cuda_stream&& other) noexcept + { + if(this != &other) + { + if(stream_) + cudaStreamDestroy(stream_); + stream_ = std::exchange(other.stream_, nullptr); + } + return *this; + } + + cuda_stream(cuda_stream const&) = delete; + cuda_stream& operator=(cuda_stream const&) = delete; + + /// Return the underlying CUDA stream handle. + cudaStream_t native_handle() const noexcept + { + return stream_; + } + + /// Return an IoAwaitable that completes via a host-function callback. + callback_awaitable sync_via_callback() noexcept; + + /// Return an IoAwaitable that completes after a blocking synchronize + /// runs on `svc`. + deferred_sync_awaitable sync_via_deferred(sync_service& svc) noexcept; +}; + +/// Owns a CUDA event used to observe stream progress. +class cuda_event +{ + cudaEvent_t event_ = nullptr; + +public: + cuda_event() + { + auto err = cudaEventCreateWithFlags(&event_, cudaEventDisableTiming); + if(err != cudaSuccess) + throw std::system_error(make_cuda_error(err)); + } + + ~cuda_event() + { + if(event_) + cudaEventDestroy(event_); + } + + cuda_event(cuda_event&& other) noexcept + : event_(std::exchange(other.event_, nullptr)) + { + } + + cuda_event& operator=(cuda_event&& other) noexcept + { + if(this != &other) + { + if(event_) + cudaEventDestroy(event_); + event_ = std::exchange(other.event_, nullptr); + } + return *this; + } + + cuda_event(cuda_event const&) = delete; + cuda_event& operator=(cuda_event const&) = delete; + + /// Return the underlying CUDA event handle. 
+ cudaEvent_t native_handle() const noexcept + { + return event_; + } + + /// Record this event into `stream` at the current queue position. + void record(cudaStream_t stream) + { + auto err = cudaEventRecord(event_, stream); + if(err != cudaSuccess) + throw std::system_error(make_cuda_error(err)); + } + + /// Return an IoAwaitable that completes when this event is observed + /// ready by `svc`. + poll_awaitable sync_via_poll(poll_service& svc) noexcept; +}; + +/** IoAwaitable that resumes via a CUDA host-function callback. + + `cudaLaunchHostFunc` enqueues a host callback at the stream's current + position. CUDA runs it on a driver-owned thread that must not make + CUDA calls or resume the coroutine inline, so the callback posts the + continuation through the captured executor instead. One operation is + in flight at a time, so the resume context is a member rather than a + per-operation allocation. +*/ +struct callback_awaitable +{ + cudaStream_t stream; + + struct resume_ctx + { + executor_ref ex; + continuation* cont = nullptr; + std::error_code* ec = nullptr; + }; + + continuation cont_; + std::error_code ec_; + resume_ctx ctx_; + + static void CUDART_CB + on_complete(void* arg) + { + auto* c = static_cast(arg); + *c->ec = std::error_code{}; + c->ex.post(*c->cont); + } + + bool await_ready() const noexcept + { + return false; + } + + std::coroutine_handle<> + await_suspend(std::coroutine_handle<> h, io_env const* env) + { + cont_.h = h; + ctx_ = resume_ctx{env->executor, &cont_, &ec_}; + auto err = cudaLaunchHostFunc(stream, &on_complete, &ctx_); + if(err != cudaSuccess) + { + // Could not register: resume inline so we never deadlock. 
+ ec_ = make_cuda_error(err); + return h; + } + return std::noop_coroutine(); + } + + std::error_code await_resume() const noexcept + { + return ec_; + } +}; + +inline callback_awaitable +cuda_stream::sync_via_callback() noexcept +{ + return callback_awaitable{stream_}; +} + +/// IoAwaitable that resumes when a CUDA event reports ready, detected by +/// a polling service thread rather than a callback or a blocking wait. +/// +/// @note A production awaitable should also check `env->stop_token` for +/// cancellation in `await_suspend`; these demo awaitables omit that +/// for brevity. +class poll_awaitable +{ + poll_service& svc_; + cudaEvent_t event_; + continuation cont_; + std::error_code ec_; + +public: + poll_awaitable(poll_service& svc, cudaEvent_t event) noexcept + : svc_(svc) + , event_(event) + { + } + + bool await_ready() const noexcept + { + return false; + } + + std::coroutine_handle<> + await_suspend(std::coroutine_handle<> h, io_env const* env) + { + cont_.h = h; + svc_.register_wait(poll_entry{event_, env->executor, &cont_, &ec_}); + return std::noop_coroutine(); + } + + std::error_code await_resume() const noexcept + { + return ec_; + } +}; + +inline poll_awaitable +cuda_event::sync_via_poll(poll_service& svc) noexcept +{ + return poll_awaitable{svc, event_}; +} + +/// IoAwaitable that resumes after a blocking `cudaStreamSynchronize` runs +/// on a service thread, keeping the worker thread free meanwhile. 
+class deferred_sync_awaitable +{ + sync_service& svc_; + cudaStream_t stream_; + continuation cont_; + std::error_code ec_; + +public: + deferred_sync_awaitable(sync_service& svc, cudaStream_t stream) noexcept + : svc_(svc) + , stream_(stream) + { + } + + bool await_ready() const noexcept + { + return false; + } + + std::coroutine_handle<> + await_suspend(std::coroutine_handle<> h, io_env const* env) + { + auto ex = env->executor; + auto stream = stream_; + cont_.h = h; + svc_.post([ex, stream, ec = &ec_, cont = &cont_]() mutable + { + auto err = cudaStreamSynchronize(stream); + *ec = (err == cudaSuccess) + ? std::error_code{} + : make_cuda_error(err); + ex.post(*cont); + }); + return std::noop_coroutine(); + } + + std::error_code await_resume() const noexcept + { + return ec_; + } +}; + +inline deferred_sync_awaitable +cuda_stream::sync_via_deferred(sync_service& svc) noexcept +{ + return deferred_sync_awaitable{svc, stream_}; +} + +} // namespace example +} // namespace capy +} // namespace boost + +#endif