From d32736af3b5d5eda237fd7d48f084ec773f586f9 Mon Sep 17 00:00:00 2001 From: Eric Niebler Date: Fri, 6 Mar 2026 11:50:45 -0800 Subject: [PATCH] roll back snr.cuh as a temporary fix for maxwell example perf regression --- examples/nvexec/maxwell/snr.cuh | 505 ++++++++++++++++++++++++++++++-- 1 file changed, 488 insertions(+), 17 deletions(-) diff --git a/examples/nvexec/maxwell/snr.cuh b/examples/nvexec/maxwell/snr.cuh index 931934ded..5b2aec3d6 100644 --- a/examples/nvexec/maxwell/snr.cuh +++ b/examples/nvexec/maxwell/snr.cuh @@ -18,30 +18,499 @@ #pragma once -#include // IWYU pragma: export +#include "common.cuh" +#include "stdexec/execution.hpp" // IWYU pragma: export -#include -#include - -#include "./common.cuh" +namespace ex = stdexec; #if STDEXEC_CUDA_COMPILATION() # include // IWYU pragma: export # include // IWYU pragma: export #else -namespace nv::execution +namespace nvexec { - inline constexpr bool is_on_gpu() noexcept + struct stream_receiver_base + { + using receiver_concept = ex::receiver_t; + }; + + struct stream_sender_base + { + using sender_concept = ex::sender_t; + }; + + namespace detail + { + struct stream_op_state_base + {}; + } // namespace detail + + inline auto is_on_gpu() -> bool { return false; } -} // namespace nv::execution - -namespace nvexec = nv::execution; +} // namespace nvexec #endif +#include +#include + +STDEXEC_PRAGMA_PUSH() +STDEXEC_PRAGMA_IGNORE_GNU("-Wmissing-braces") + namespace ex = stdexec; +namespace repeat_n_detail +{ + template + struct repeat_n_sender_t; +} // namespace repeat_n_detail + +struct repeat_n_t +{ + template + auto operator()(Sender __sndr, std::size_t n, Closure closure) const noexcept + -> repeat_n_detail::repeat_n_sender_t + { + return repeat_n_detail::repeat_n_sender_t{ + {}, + {closure, n}, + std::move(__sndr) + }; + } + + template + auto operator()(std::size_t n, Closure closure) const + { + return ex::__closure(*this, n, closure); + } +}; + +inline constexpr repeat_n_t repeat_n{}; + +namespace repeat_n_detail +{ + + template + class receiver_2_t + { + using Sender = OpT::child_t; + using Receiver = OpT::receiver_t; + + OpT& op_state_; + + public: + using receiver_concept = ex::receiver_t; + + void set_value() noexcept + { + using inner_op_state_t = OpT::inner_op_state_t; + + op_state_.i_++; + + if (op_state_.i_ == op_state_.n_) + { + ex::set_value(std::move(op_state_.rcvr_)); + return; + } + + auto sch = ex::get_scheduler(ex::get_env(op_state_.rcvr_)); + inner_op_state_t& inner_op_state = op_state_.inner_op_state_.emplace(ex::__emplace_from{ + [&]() noexcept + { + return ex::connect(ex::schedule(sch) | op_state_.closure_, receiver_2_t{op_state_}); + }}); + + ex::start(inner_op_state); + } + + template + void set_error(Error&& err) noexcept + { + ex::set_error(std::move(op_state_.rcvr_), static_cast(err)); + } + + void set_stopped() noexcept + { + ex::set_stopped(std::move(op_state_.rcvr_)); + } + + [[nodiscard]] + auto get_env() const noexcept -> ex::env_of_t + { + return ex::get_env(op_state_.rcvr_); + } + + explicit receiver_2_t(OpT& op_state) + : op_state_(op_state) + {} + }; + + template + class receiver_1_t + { + using Receiver = OpT::receiver_t; + + OpT& op_state_; + + public: + using receiver_concept = ex::receiver_t; + + void set_value() noexcept + { + using inner_op_state_t = OpT::inner_op_state_t; + + if (op_state_.n_) + { + inner_op_state_t& inner_op_state = op_state_.inner_op_state_.emplace( + ex::__emplace_from{[this]() noexcept + { + auto sch = ex::get_scheduler(ex::get_env(op_state_.rcvr_)); + return ex::connect(ex::schedule(sch) | op_state_.closure_, + receiver_2_t{op_state_}); + }}); + + ex::start(inner_op_state); + } + else + { + ex::set_value(std::move(op_state_.rcvr_)); + } + } + + template + void set_error(Error&& err) noexcept + { + ex::set_error(std::move(op_state_.rcvr_), static_cast(err)); + } + + void set_stopped() noexcept + { + ex::set_stopped(std::move(op_state_.rcvr_)); + } + + [[nodiscard]] + auto get_env() const noexcept -> ex::env_of_t + { + return ex::get_env(op_state_.rcvr_); + } + + explicit receiver_1_t(OpT& op_state) + : op_state_(op_state) + {} + }; + + template + struct operation_state_t + { + using receiver_t = Receiver; + using child_t = PredSender; + using Scheduler = std::invoke_result_t>; + using InnerSender = std::invoke_result_t>; + + using predecessor_op_state_t = + ex::connect_result_t>; + using inner_op_state_t = ex::connect_result_t>; + + PredSender pred_sender_; + Closure closure_; + Receiver rcvr_; + std::optional pred_op_state_; + std::optional inner_op_state_; + std::size_t n_{}; + std::size_t i_{}; + + void start() & noexcept + { + if (n_) + { + ex::start(*pred_op_state_); + } + else + { + ex::set_value(std::move(rcvr_)); + } + } + + operation_state_t(PredSender&& pred_sender, Closure closure, Receiver&& rcvr, std::size_t n) + : pred_sender_{static_cast(pred_sender)} + , closure_(closure) + , rcvr_(rcvr) + , n_(n) + { + pred_op_state_.emplace(ex::__emplace_from{ + [&]() noexcept + { return ex::connect(static_cast(pred_sender_), receiver_1_t{*this}); }}); + } + }; + + template + struct repeat_n_sender_t + { + using sender_concept = ex::sender_t; + + template + static consteval auto get_completion_signatures() noexcept + { + return ex::completion_signatures< + ex::set_value_t(), + ex::set_stopped_t(), + ex::set_error_t(std::exception_ptr) + // STDEXEC_WHEN(STDEXEC_CUDA_COMPILATION(), , ex::set_error_t(cudaError_t)) + >(); + } + + template Self, ex::receiver Receiver> + STDEXEC_EXPLICIT_THIS_BEGIN(auto connect)(this Self&& self, Receiver r) + -> repeat_n_detail::operation_state_t + { + return repeat_n_detail::operation_state_t( + static_cast(self).sender_, + static_cast(self).data_.first, + static_cast(r), + self.data_.second); + } + STDEXEC_EXPLICIT_THIS_END(connect) + + [[nodiscard]] + auto get_env() const noexcept -> ex::env_of_t + { + return ex::get_env(sender_); + } + + STDEXEC_ATTRIBUTE(no_unique_address, maybe_unused) + repeat_n_t tag_; + std::pair data_; + Sender sender_; + }; +} // namespace repeat_n_detail + +namespace STDEXEC +{ + template + inline constexpr std::size_t + __structured_binding_size_v> = 3; +} // namespace STDEXEC + +#if STDEXEC_CUDA_COMPILATION() +// A CUDA stream implementation of repeat_n +namespace nv::execution::_strm +{ + namespace repeat_n + { + template + class receiver_2_t : public stream_receiver_base + { + using Sender = OpT::PredSender; + using Receiver = OpT::Receiver; + + OpT& op_state_; + + public: + void set_value() noexcept + { + using inner_op_state_t = OpT::inner_op_state_t; + + op_state_.i_++; + + if (op_state_.i_ == op_state_.n_) + { + op_state_.propagate_completion_signal(ex::set_value); + return; + } + + inner_op_state_t& inner_op_state = op_state_.inner_op_state_.emplace(ex::__emplace_from{ + [&]() noexcept + { + return ex::connect(op_state_.closure_(ex::schedule(op_state_.scheduler_)), + receiver_2_t{op_state_}); + }}); + + ex::start(inner_op_state); + } + + template + void set_error(Error&& err) noexcept + { + op_state_.propagate_completion_signal(set_error_t(), static_cast(err)); + } + + void set_stopped() noexcept + { + op_state_.propagate_completion_signal(set_stopped_t()); + } + + auto get_env() const noexcept -> OpT::env_t + { + return op_state_.make_env(); + } + + explicit receiver_2_t(OpT& op_state) + : op_state_(op_state) + {} + }; + + template + class receiver_1_t : public stream_receiver_base + { + using Receiver = OpT::Receiver; + + OpT& op_state_; + + public: + void set_value() noexcept + { + using inner_op_state_t = OpT::inner_op_state_t; + + if (op_state_.n_) + { + inner_op_state_t& inner_op_state = op_state_.inner_op_state_.emplace(ex::__emplace_from{ + [&]() noexcept + { + return ex::connect(op_state_.closure_(ex::schedule(op_state_.scheduler_)), + receiver_2_t{op_state_}); + }}); + + ex::start(inner_op_state); + } + else + { + op_state_.propagate_completion_signal(set_value_t()); + } + } + + template + void set_error(Error&& err) noexcept + { + op_state_.propagate_completion_signal(set_error_t(), static_cast(err)); + } + + void set_stopped() noexcept + { + op_state_.propagate_completion_signal(set_stopped_t()); + } + + auto get_env() const noexcept -> OpT::env_t + { + return op_state_.make_env(); + } + + explicit receiver_1_t(OpT& op_state) + : op_state_(op_state) + {} + }; + + template + struct operation_state_t : _strm::opstate_base + { + using Scheduler = std::invoke_result_t, + ex::env_of_t, + ex::env_of_t>; + using InnerSender = std::invoke_result_t>; + + using predecessor_op_state_t = + ex::connect_result_t>; + using inner_op_state_t = ex::connect_result_t>; + + Scheduler scheduler_; + Closure closure_; + std::optional pred_op_state_; + std::optional inner_op_state_; + std::size_t n_{}; + std::size_t i_{}; + + void start() & noexcept + { + if (this->stream_provider_.status_ != cudaSuccess) + { + // Couldn't allocate memory for operation state, complete with error + this->propagate_completion_signal(ex::set_error, + std::move(this->stream_provider_.status_)); + } + else + { + if (n_) + { + ex::start(*pred_op_state_); + } + else + { + this->propagate_completion_signal(ex::set_value); + } + } + } + + operation_state_t(PredSender&& pred_sender, Closure closure, Receiver&& rcvr, std::size_t n) + : _strm::opstate_base( + static_cast(rcvr), + ex::get_completion_scheduler(ex::get_env(pred_sender), + ex::get_env(rcvr)) + .context_state_) + , scheduler_(ex::get_completion_scheduler(ex::get_env(pred_sender), + ex::get_env(rcvr))) + , closure_(closure) + , n_(n) + { + pred_op_state_.emplace(ex::__emplace_from{ + [&]() noexcept + { return ex::connect(static_cast(pred_sender), receiver_1_t{*this}); }}); + } + }; + + template + struct sender_t + { + using sender_concept = ex::sender_t; + + using completion_signatures = ex::completion_signatures; + + template Self, ex::receiver Receiver> + requires(ex::sender_to) + STDEXEC_EXPLICIT_THIS_BEGIN(auto connect)(this Self&& self, Receiver r) + -> nvexec::_strm::repeat_n::operation_state_t + { + return nvexec::_strm::repeat_n::operation_state_t( + static_cast(self).sender_, + static_cast(self).closure_, + static_cast(r), + self.n_); + } + STDEXEC_EXPLICIT_THIS_END(connect) + + [[nodiscard]] + auto get_env() const noexcept -> ex::env_of_t + { + return ex::get_env(sender_); + } + + Sender sender_; + Closure closure_; + std::size_t n_{}; + }; + } // namespace repeat_n + + template <> + struct transform_sender_for + { + template + auto operator()(Env const &, ex::__ignore, Data&& data, Sender sndr) const + { + static_assert(sizeof(Env) == 0); + auto& [closure, n] = data; + using closure_t = decltype(closure); + + return repeat_n::sender_t(static_cast(sndr), + ex::__forward_like(closure), + n); + } + }; +} // namespace nv::execution::_strm + +#endif // STDEXEC_CUDA_COMPILATION() + template [[nodiscard]] auto is_gpu_scheduler([[maybe_unused]] SchedulerT&& scheduler) -> bool @@ -58,11 +527,11 @@ auto maxwell_eqs_snr(float dt, fields_accessor accessor, ex::scheduler auto&& computer) { - return ex::on(computer, - ex::just() // - | ex::bulk(ex::par, accessor.cells, update_h(accessor)) - | ex::bulk(ex::par, accessor.cells, update_e(time, dt, accessor)) - | exec::repeat_n(n_iterations)) + return ex::just() + | repeat_n(n_iterations, + ex::on(computer, + ex::bulk(ex::par, accessor.cells, update_h(accessor)) + | ex::bulk(ex::par, accessor.cells, update_e(time, dt, accessor)))) | ex::then(dump_vtk(write_results, accessor)); } @@ -76,8 +545,8 @@ void run_snr(float dt, time_storage_t time{is_gpu_scheduler(computer)}; fields_accessor accessor = grid.accessor(); - auto init = ex::on(computer, - ex::just() | ex::bulk(ex::par, grid.cells, grid_initializer(dt, accessor))); + auto init = ex::just() + | ex::on(computer, ex::bulk(ex::par, grid.cells, grid_initializer(dt, accessor))); ex::sync_wait(init); auto snd = maxwell_eqs_snr(dt, time.get(), write_vtk, n_iterations, accessor, computer); @@ -87,3 +556,5 @@ void run_snr(float dt, scheduler_name, [&snd] { ex::sync_wait(std::move(snd)); }); } + +STDEXEC_PRAGMA_POP()