LCOV - code coverage report
Current view: top level - capy - when_all.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 98.6 % 148 146 2
Test Date: 2026-03-12 18:24:42 Functions: 94.1 % 623 586 37

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_WHEN_ALL_HPP
      11                 : #define BOOST_CAPY_WHEN_ALL_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/detail/void_to_monostate.hpp>
      15                 : #include <boost/capy/concept/executor.hpp>
      16                 : #include <boost/capy/concept/io_awaitable.hpp>
      17                 : #include <coroutine>
      18                 : #include <boost/capy/ex/io_env.hpp>
      19                 : #include <boost/capy/ex/frame_allocator.hpp>
      20                 : #include <boost/capy/task.hpp>
      21                 : 
      22                 : #include <array>
      23                 : #include <atomic>
      24                 : #include <exception>
      25                 : #include <optional>
      26                 : #include <ranges>
      27                 : #include <stdexcept>
      28                 : #include <stop_token>
      29                 : #include <tuple>
      30                 : #include <type_traits>
      31                 : #include <utility>
      32                 : #include <vector>
      33                 : 
      34                 : namespace boost {
      35                 : namespace capy {
      36                 : 
      37                 : namespace detail {
      38                 : 
      39                 : /** Holds the result of a single task within when_all.
      40                 : */
      41                 : template<typename T>
      42                 : struct result_holder
      43                 : {
      44                 :     std::optional<T> value_;
      45                 : 
      46 HIT          64 :     void set(T v)
      47                 :     {
      48              64 :         value_ = std::move(v);
      49              64 :     }
      50                 : 
      51              57 :     T get() &&
      52                 :     {
      53              57 :         return std::move(*value_);
      54                 :     }
      55                 : };
      56                 : 
      57                 : /** Specialization for void tasks - returns monostate to preserve index mapping.
      58                 : */
      59                 : template<>
      60                 : struct result_holder<void>
      61                 : {
      62              43 :     std::monostate get() && { return {}; }
      63                 : };
      64                 : 
      65                 : /** Core shared state for when_all operations.
      66                 : 
      67                 :     Contains all members and methods common to both heterogeneous (variadic)
      68                 :     and homogeneous (range) when_all implementations. State classes embed
      69                 :     this via composition to avoid CRTP destructor ordering issues.
      70                 : 
      71                 :     @par Thread Safety
      72                 :     Atomic operations protect exception capture and completion count.
      73                 : */
      74                 : struct when_all_core
      75                 : {
      76                 :     std::atomic<std::size_t> remaining_count_;
      77                 : 
      78                 :     // Exception storage - first error wins, others discarded
      79                 :     std::atomic<bool> has_exception_{false};
      80                 :     std::exception_ptr first_exception_;
      81                 : 
      82                 :     std::stop_source stop_source_;
      83                 : 
      84                 :     // Bridges parent's stop token to our stop_source
      85                 :     struct stop_callback_fn
      86                 :     {
      87                 :         std::stop_source* source_;
      88               4 :         void operator()() const { source_->request_stop(); }
      89                 :     };
      90                 :     using stop_callback_t = std::stop_callback<stop_callback_fn>;
      91                 :     std::optional<stop_callback_t> parent_stop_callback_;
      92                 : 
      93                 :     std::coroutine_handle<> continuation_;
      94                 :     io_env const* caller_env_ = nullptr;
      95                 : 
      96              71 :     explicit when_all_core(std::size_t count) noexcept
      97              71 :         : remaining_count_(count)
      98                 :     {
      99              71 :     }
     100                 : 
     101                 :     /** Capture an exception (first one wins). */
     102              23 :     void capture_exception(std::exception_ptr ep)
     103                 :     {
     104              23 :         bool expected = false;
     105              23 :         if(has_exception_.compare_exchange_strong(
     106                 :             expected, true, std::memory_order_relaxed))
     107              20 :             first_exception_ = ep;
     108              23 :     }
     109                 : };
     110                 : 
     111                 : /** Shared state for heterogeneous when_all (variadic overload).
     112                 : 
     113                 :     @tparam Ts The result types of the tasks.
     114                 : */
     115                 : template<typename... Ts>
     116                 : struct when_all_state
     117                 : {
     118                 :     static constexpr std::size_t task_count = sizeof...(Ts);
     119                 : 
     120                 :     when_all_core core_;
     121                 :     std::tuple<result_holder<Ts>...> results_;
     122                 :     std::array<std::coroutine_handle<>, task_count> runner_handles_{};
     123                 : 
     124              62 :     when_all_state()
     125              62 :         : core_(task_count)
     126                 :     {
     127              62 :     }
     128                 : };
     129                 : 
     130                 : /** Shared state for homogeneous when_all (range overload).
     131                 : 
     132                 :     Stores all results in a vector indexed by task position.
     133                 : 
     134                 :     @tparam T The common result type of all tasks.
     135                 : */
     136                 : template<typename T>
     137                 : struct when_all_homogeneous_state
     138                 : {
     139                 :     when_all_core core_;
     140                 :     std::vector<std::optional<T>> results_;
     141                 :     std::vector<std::coroutine_handle<>> runner_handles_;
     142                 : 
     143               7 :     explicit when_all_homogeneous_state(std::size_t count)
     144               7 :         : core_(count)
     145              14 :         , results_(count)
     146              14 :         , runner_handles_(count)
     147                 :     {
     148               7 :     }
     149                 : 
     150              16 :     void set_result(std::size_t index, T value)
     151                 :     {
     152              16 :         results_[index].emplace(std::move(value));
     153              16 :     }
     154                 : };
     155                 : 
     156                 : /** Specialization for void tasks (no result storage). */
     157                 : template<>
     158                 : struct when_all_homogeneous_state<void>
     159                 : {
     160                 :     when_all_core core_;
     161                 :     std::vector<std::coroutine_handle<>> runner_handles_;
     162                 : 
     163               2 :     explicit when_all_homogeneous_state(std::size_t count)
     164               2 :         : core_(count)
     165               4 :         , runner_handles_(count)
     166                 :     {
     167               2 :     }
     168                 : };
     169                 : 
     170                 : /** Wrapper coroutine that intercepts task completion for when_all.
     171                 : 
     172                 :     Parameterized on StateType to work with both heterogeneous (variadic)
     173                 :     and homogeneous (range) state types. All state types expose their
     174                 :     shared members through a `core_` member of type when_all_core.
     175                 : 
     176                 :     @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
     177                 : */
     178                 : template<typename StateType>
     179                 : struct when_all_runner
     180                 : {
     181                 :     struct promise_type
     182                 :     {
     183                 :         StateType* state_ = nullptr;
     184                 :         std::size_t index_ = 0;
     185                 :         io_env env_;
     186                 : 
     187             159 :         when_all_runner get_return_object() noexcept
     188                 :         {
     189                 :             return when_all_runner(
     190             159 :                 std::coroutine_handle<promise_type>::from_promise(*this));
     191                 :         }
     192                 : 
     193             159 :         std::suspend_always initial_suspend() noexcept
     194                 :         {
     195             159 :             return {};
     196                 :         }
     197                 : 
     198             159 :         auto final_suspend() noexcept
     199                 :         {
     200                 :             struct awaiter
     201                 :             {
     202                 :                 promise_type* p_;
     203             159 :                 bool await_ready() const noexcept { return false; }
     204             159 :                 auto await_suspend(std::coroutine_handle<> h) noexcept
     205                 :                 {
     206             159 :                     auto& core = p_->state_->core_;
     207             159 :                     auto* counter = &core.remaining_count_;
     208             159 :                     auto* caller_env = core.caller_env_;
     209             159 :                     auto cont = core.continuation_;
     210                 : 
     211             159 :                     h.destroy();
     212                 : 
     213             159 :                     auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
     214             159 :                     if(remaining == 1)
     215              71 :                         return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
     216              88 :                     return detail::symmetric_transfer(std::noop_coroutine());
     217                 :                 }
     218 MIS           0 :                 void await_resume() const noexcept {}
     219                 :             };
     220 HIT         159 :             return awaiter{this};
     221                 :         }
     222                 : 
     223             136 :         void return_void() noexcept {}
     224                 : 
     225              23 :         void unhandled_exception()
     226                 :         {
     227              23 :             state_->core_.capture_exception(std::current_exception());
     228              23 :             state_->core_.stop_source_.request_stop();
     229              23 :         }
     230                 : 
     231                 :         template<class Awaitable>
     232                 :         struct transform_awaiter
     233                 :         {
     234                 :             std::decay_t<Awaitable> a_;
     235                 :             promise_type* p_;
     236                 : 
     237             159 :             bool await_ready() { return a_.await_ready(); }
     238             159 :             decltype(auto) await_resume() { return a_.await_resume(); }
     239                 : 
     240                 :             template<class Promise>
     241             158 :             auto await_suspend(std::coroutine_handle<Promise> h)
     242                 :             {
     243                 :                 using R = decltype(a_.await_suspend(h, &p_->env_));
     244                 :                 if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
     245             158 :                     return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
     246                 :                 else
     247                 :                     return a_.await_suspend(h, &p_->env_);
     248                 :             }
     249                 :         };
     250                 : 
     251                 :         template<class Awaitable>
     252             159 :         auto await_transform(Awaitable&& a)
     253                 :         {
     254                 :             using A = std::decay_t<Awaitable>;
     255                 :             if constexpr (IoAwaitable<A>)
     256                 :             {
     257                 :                 return transform_awaiter<Awaitable>{
     258             318 :                     std::forward<Awaitable>(a), this};
     259                 :             }
     260                 :             else
     261                 :             {
     262                 :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     263                 :             }
     264             159 :         }
     265                 :     };
     266                 : 
     267                 :     std::coroutine_handle<promise_type> h_;
     268                 : 
     269             159 :     explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
     270             159 :         : h_(h)
     271                 :     {
     272             159 :     }
     273                 : 
     274                 :     // Move constructor is always provided: some clang versions require it.
     275                 :     when_all_runner(when_all_runner&& other) noexcept
     276                 :         : h_(std::exchange(other.h_, nullptr))
     277                 :     {
     278                 :     }
     279                 : 
     280                 :     when_all_runner(when_all_runner const&) = delete;
     281                 :     when_all_runner& operator=(when_all_runner const&) = delete;
     282                 :     when_all_runner& operator=(when_all_runner&&) = delete;
     283                 : 
     284             159 :     auto release() noexcept
     285                 :     {
     286             159 :         return std::exchange(h_, nullptr);
     287                 :     }
     288                 : };
     289                 : 
     290                 : /** Create a runner coroutine for a single awaitable (variadic path).
     291                 : 
     292                 :     Uses compile-time index for tuple-based result storage.
     293                 : */
     294                 : template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
     295                 : when_all_runner<when_all_state<Ts...>>
     296             136 : make_when_all_runner(Awaitable inner, when_all_state<Ts...>* state)
     297                 : {
     298                 :     using T = awaitable_result_t<Awaitable>;
     299                 :     if constexpr (std::is_void_v<T>)
     300                 :     {
     301                 :         co_await std::move(inner);
     302                 :     }
     303                 :     else
     304                 :     {
     305                 :         std::get<Index>(state->results_).set(co_await std::move(inner));
     306                 :     }
     307             272 : }
     308                 : 
     309                 : /** Create a runner coroutine for a single awaitable (range path).
     310                 : 
     311                 :     Uses runtime index for vector-based result storage.
     312                 : */
     313                 : template<IoAwaitable Awaitable, typename StateType>
     314                 : when_all_runner<StateType>
     315              23 : make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
     316                 : {
     317                 :     using T = awaitable_result_t<Awaitable>;
     318                 :     if constexpr (std::is_void_v<T>)
     319                 :     {
     320                 :         co_await std::move(inner);
     321                 :     }
     322                 :     else
     323                 :     {
     324                 :         state->set_result(index, co_await std::move(inner));
     325                 :     }
     326              46 : }
     327                 : 
     328                 : /** Internal awaitable that launches all variadic runner coroutines.
     329                 : 
     330                 :     CRITICAL: If the last task finishes synchronously then the parent
     331                 :     coroutine resumes, destroying its frame, and destroying this object
     332                 :     prior to the completion of await_suspend. Therefore, await_suspend
     333                 :     must ensure `this` cannot be referenced after calling `launch_one`
     334                 :     for the last time.
     335                 : */
     336                 : template<IoAwaitable... Awaitables>
     337                 : class when_all_launcher
     338                 : {
     339                 :     using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
     340                 : 
     341                 :     std::tuple<Awaitables...>* awaitables_;
     342                 :     state_type* state_;
     343                 : 
     344                 : public:
     345              62 :     when_all_launcher(
     346                 :         std::tuple<Awaitables...>* awaitables,
     347                 :         state_type* state)
     348              62 :         : awaitables_(awaitables)
     349              62 :         , state_(state)
     350                 :     {
     351              62 :     }
     352                 : 
     353              62 :     bool await_ready() const noexcept
     354                 :     {
     355              62 :         return sizeof...(Awaitables) == 0;
     356                 :     }
     357                 : 
     358              62 :     std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
     359                 :     {
     360              62 :         state_->core_.continuation_ = continuation;
     361              62 :         state_->core_.caller_env_ = caller_env;
     362                 : 
     363              62 :         if(caller_env->stop_token.stop_possible())
     364                 :         {
     365              16 :             state_->core_.parent_stop_callback_.emplace(
     366               8 :                 caller_env->stop_token,
     367               8 :                 when_all_core::stop_callback_fn{&state_->core_.stop_source_});
     368                 : 
     369               8 :             if(caller_env->stop_token.stop_requested())
     370               4 :                 state_->core_.stop_source_.request_stop();
     371                 :         }
     372                 : 
     373              62 :         auto token = state_->core_.stop_source_.get_token();
     374              64 :         [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     375              62 :             (..., launch_one<Is>(caller_env->executor, token));
     376              62 :         }(std::index_sequence_for<Awaitables...>{});
     377                 : 
     378             124 :         return std::noop_coroutine();
     379              62 :     }
     380                 : 
     381              62 :     void await_resume() const noexcept
     382                 :     {
     383              62 :     }
     384                 : 
     385                 : private:
     386                 :     template<std::size_t I>
     387             136 :     void launch_one(executor_ref caller_ex, std::stop_token token)
     388                 :     {
     389             136 :         auto runner = make_when_all_runner<I>(
     390             136 :             std::move(std::get<I>(*awaitables_)), state_);
     391                 : 
     392             136 :         auto h = runner.release();
     393             136 :         h.promise().state_ = state_;
     394             136 :         h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator};
     395                 : 
     396             136 :         std::coroutine_handle<> ch{h};
     397             136 :         state_->runner_handles_[I] = ch;
     398             136 :         state_->core_.caller_env_->executor.post(ch);
     399             272 :     }
     400                 : };
     401                 : 
     402                 : /** Helper to extract a single result from state.
     403                 :     This is a separate function to work around a GCC-11 ICE that occurs
     404                 :     when using nested immediately-invoked lambdas with pack expansion.
     405                 : */
     406                 : template<std::size_t I, typename... Ts>
     407             100 : auto extract_single_result(when_all_state<Ts...>& state)
     408                 : {
     409             100 :     return std::move(std::get<I>(state.results_)).get();
     410                 : }
     411                 : 
     412                 : /** Extract all results from state as a tuple.
     413                 : */
     414                 : template<typename... Ts>
     415              45 : auto extract_results(when_all_state<Ts...>& state)
     416                 : {
     417              69 :     return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     418              45 :         return std::tuple(extract_single_result<Is>(state)...);
     419              90 :     }(std::index_sequence_for<Ts...>{});
     420                 : }
     421                 : 
     422                 : /** Launches all homogeneous runners concurrently.
     423                 : 
     424                 :     Two-phase approach: create all runners first, then post all.
     425                 :     This avoids lifetime issues if a task completes synchronously.
     426                 : */
     427                 : template<typename Range>
     428                 : class when_all_homogeneous_launcher
     429                 : {
     430                 :     using Awaitable = std::ranges::range_value_t<Range>;
     431                 :     using T = awaitable_result_t<Awaitable>;
     432                 : 
     433                 :     Range* range_;
     434                 :     when_all_homogeneous_state<T>* state_;
     435                 : 
     436                 : public:
     437               9 :     when_all_homogeneous_launcher(
     438                 :         Range* range,
     439                 :         when_all_homogeneous_state<T>* state)
     440               9 :         : range_(range)
     441               9 :         , state_(state)
     442                 :     {
     443               9 :     }
     444                 : 
     445               9 :     bool await_ready() const noexcept
     446                 :     {
     447               9 :         return std::ranges::empty(*range_);
     448                 :     }
     449                 : 
     450               9 :     std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
     451                 :     {
     452               9 :         state_->core_.continuation_ = continuation;
     453               9 :         state_->core_.caller_env_ = caller_env;
     454                 : 
     455               9 :         if(caller_env->stop_token.stop_possible())
     456                 :         {
     457               2 :             state_->core_.parent_stop_callback_.emplace(
     458               1 :                 caller_env->stop_token,
     459               1 :                 when_all_core::stop_callback_fn{&state_->core_.stop_source_});
     460                 : 
     461               1 :             if(caller_env->stop_token.stop_requested())
     462 MIS           0 :                 state_->core_.stop_source_.request_stop();
     463                 :         }
     464                 : 
     465 HIT           9 :         auto token = state_->core_.stop_source_.get_token();
     466                 : 
     467                 :         // Phase 1: Create all runners without posting any of them.
     468               9 :         std::size_t index = 0;
     469              32 :         for(auto&& a : *range_)
     470                 :         {
     471              23 :             auto runner = make_when_all_homogeneous_runner(
     472              23 :                 std::move(a), state_, index);
     473                 : 
     474              23 :             auto h = runner.release();
     475              23 :             h.promise().state_ = state_;
     476              23 :             h.promise().index_ = index;
     477              23 :             h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
     478                 : 
     479              23 :             state_->runner_handles_[index] = std::coroutine_handle<>{h};
     480              23 :             ++index;
     481                 :         }
     482                 : 
     483                 :         // Phase 2: Post all runners. Any may complete synchronously.
     484                 :         // After last post, state_ and this may be destroyed.
     485               9 :         std::coroutine_handle<>* handles = state_->runner_handles_.data();
     486               9 :         std::size_t count = state_->runner_handles_.size();
     487              32 :         for(std::size_t i = 0; i < count; ++i)
     488              23 :             caller_env->executor.post(handles[i]);
     489                 : 
     490              18 :         return std::noop_coroutine();
     491              32 :     }
     492                 : 
     493               9 :     void await_resume() const noexcept
     494                 :     {
     495               9 :     }
     496                 : };
     497                 : 
     498                 : } // namespace detail
     499                 : 
     500                 : /** Compute the when_all result tuple type.
     501                 : 
     502                 :     Void-returning tasks contribute std::monostate to preserve the
     503                 :     task-index-to-result-index mapping, matching when_any's approach.
     504                 : 
     505                 :     Example: when_all_result_t<int, void, string> = std::tuple<int, std::monostate, string>
     506                 :     Example: when_all_result_t<void, void> = std::tuple<std::monostate, std::monostate>
     507                 : */
     508                 : template<typename... Ts>
     509                 : using when_all_result_t = std::tuple<void_to_monostate_t<Ts>...>;
     510                 : 
     511                 : /** Execute multiple awaitables concurrently and collect their results.
     512                 : 
     513                 :     Launches all awaitables simultaneously and waits for all to complete
     514                 :     before returning. Results are collected in input order. If any
     515                 :     awaitable throws, cancellation is requested for siblings and the first
     516                 :     exception is rethrown after all awaitables complete.
     517                 : 
     518                 :     @li All child awaitables run concurrently on the caller's executor
     519                 :     @li Results are returned as a tuple in input order
     520                 :     @li Void-returning awaitables contribute std::monostate to the
     521                 :         result tuple, preserving the task-index-to-result-index mapping
     522                 :     @li First exception wins; subsequent exceptions are discarded
     523                 :     @li Stop is requested for siblings on first error
     524                 :     @li Completes only after all children have finished
     525                 : 
     526                 :     @par Thread Safety
     527                 :     The returned task must be awaited from a single execution context.
     528                 :     Child awaitables execute concurrently but complete through the caller's
     529                 :     executor.
     530                 : 
     531                 :     @param awaitables The awaitables to execute concurrently. Each must
     532                 :         satisfy @ref IoAwaitable and is consumed (moved-from) when
     533                 :         `when_all` is awaited.
     534                 : 
     535                 :     @return A task yielding a tuple of results in input order. Void tasks
     536                 :         contribute std::monostate to preserve index correspondence.
     537                 : 
     538                 :     @par Example
     539                 : 
     540                 :     @code
     541                 :     task<> example()
     542                 :     {
     543                 :         // Concurrent fetch, results collected in order
     544                 :         auto [user, posts] = co_await when_all(
     545                 :             fetch_user( id ),      // task<User>
     546                 :             fetch_posts( id )      // task<std::vector<Post>>
     547                 :         );
     548                 : 
     549                 :         // Void awaitables contribute monostate
     550                 :         auto [a, _, b] = co_await when_all(
     551                 :             fetch_int(),           // task<int>
     552                 :             log_event( "start" ),  // task<void>  → monostate
     553                 :             fetch_str()            // task<string>
     554                 :         );
     555                 :         // a is int, _ is monostate, b is string
     556                 :     }
     557                 :     @endcode
     558                 : 
     559                 :     @see IoAwaitable, task
     560                 : */
     561                 : template<IoAwaitable... As>
     562              62 : [[nodiscard]] auto when_all(As... awaitables)
     563                 :     -> task<when_all_result_t<awaitable_result_t<As>...>>
     564                 : {
     565                 :     // State is stored in the coroutine frame, using the frame allocator
     566                 :     detail::when_all_state<awaitable_result_t<As>...> state;
     567                 : 
     568                 :     // Move the by-value awaitables into a frame-local tuple the launcher can point at
     569                 :     std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
     570                 : 
     571                 :     // Launch all awaitables and wait for completion; the launcher gets pointers to the tuple and the state
     572                 :     co_await detail::when_all_launcher<As...>(&awaitable_tuple, &state);
     573                 : 
     574                 :     // Propagate first exception if any.
     575                 :     // Safe without explicit acquire: capture_exception() is sequenced-before
     576                 :     // signal_completion()'s acq_rel fetch_sub, which synchronizes-with the
     577                 :     // last task's decrement that resumes this coroutine.
     578                 :     if(state.core_.first_exception_)
     579                 :         std::rethrow_exception(state.core_.first_exception_);
     580                 :     // No exception was recorded: build the task's result from the state via detail::extract_results
     581                 :     co_return detail::extract_results(state);
     582             124 : }
     583                 : 
     584                 : /** Execute a range of awaitables concurrently and collect their results.
     585                 : 
     586                 :     Launches all awaitables in the range simultaneously and waits for all
     587                 :     to complete. Results are collected in a vector preserving input order.
     588                 :     If any awaitable throws, cancellation is requested for siblings and
     589                 :     the first exception is rethrown after all awaitables complete.
     590                 : 
     591                 :     @li All child awaitables run concurrently on the caller's executor
     592                 :     @li Results are returned as a vector in input order
     593                 :     @li First exception wins; subsequent exceptions are discarded
     594                 :     @li Stop is requested for siblings on first error
     595                 :     @li Completes only after all children have finished
     596                 : 
     597                 :     @par Thread Safety
     598                 :     The returned task must be awaited from a single execution context.
     599                 :     Child awaitables execute concurrently but complete through the caller's
     600                 :     executor.
     601                 : 
     602                 :     @param awaitables Non-empty range whose elements satisfy @ref IoAwaitable.
     603                 :         Held by reference until the coroutine body runs, which then moves
     604                 :         (rvalue) or copies (lvalue) it into storage owned by the coroutine.
     605                 : 
     606                 :     @return A task yielding a vector where each element is the result of
     607                 :         the corresponding awaitable, in input order.
     608                 : 
     609                 :     @throws std::invalid_argument if the range is empty; raised at the start
     610                 :         of the coroutine body, before any child awaitable is launched.
     611                 :     @throws Rethrows the first child exception after all children
     612                 :         complete.
     613                 : 
     614                 :     @par Example
     615                 :     @code
     616                 :     task<void> example()
     617                 :     {
     618                 :         std::vector<task<Response>> requests;
     619                 :         for (auto const& url : urls)
     620                 :             requests.push_back(fetch(url));
     621                 : 
     622                 :         auto responses = co_await when_all(std::move(requests));
     623                 :     }
     624                 :     @endcode
     625                 : 
     626                 :     @see IoAwaitableRange, when_all
     627                 : */
     628                 : template<IoAwaitableRange R>
     629                 :     requires (!std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>)
     630               8 : [[nodiscard]] auto when_all(R&& awaitables)
     631                 :     -> task<std::vector<awaitable_result_t<std::ranges::range_value_t<R>>>>
     632                 : {
     633                 :     using Awaitable = std::ranges::range_value_t<R>;
     634                 :     using T = awaitable_result_t<Awaitable>;
     635                 :     using OwnedRange = std::remove_cvref_t<R>;
     636                 :     // Read the size first: the range is forwarded (and possibly moved from) below
     637                 :     auto count = std::ranges::size(awaitables);
     638                 :     if(count == 0)
     639                 :         throw std::invalid_argument("when_all requires at least one awaitable");
     640                 :     // Take ownership: std::forward moves an rvalue range and copies an lvalue one
     641                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     642                 : 
     643                 :     detail::when_all_homogeneous_state<T> state(count);
     644                 :     // Launch all awaitables and wait for completion; the launcher gets pointers to the range and the state
     645                 :     co_await detail::when_all_homogeneous_launcher<OwnedRange>(
     646                 :         &owned_awaitables, &state);
     647                 :     // Propagate first exception if any.
     648                 :     if(state.core_.first_exception_)
     649                 :         std::rethrow_exception(state.core_.first_exception_);
     650                 :     // Move each result out of its slot in results_ order; every slot is dereferenced unconditionally
     651                 :     std::vector<T> results;
     652                 :     results.reserve(count);
     653                 :     for(auto& opt : state.results_)
     654                 :         results.push_back(std::move(*opt));
     655                 : 
     656                 :     co_return results;
     657              16 : }
     658                 : 
     659                 : /** Execute a range of void awaitables concurrently.
     660                 : 
     661                 :     Launches all awaitables in the range simultaneously and waits for all
     662                 :     to complete. Since all awaitables return void, no results are collected.
     663                 :     If any awaitable throws, cancellation is requested for siblings and
     664                 :     the first exception is rethrown after all awaitables complete.
     665                 : 
     666                 :     @li All child awaitables run concurrently on the caller's executor
     667                 :     @li First exception wins; subsequent exceptions are discarded
     668                 :     @li Stop is requested for siblings on first error
     669                 :     @li Completes only after all children have finished
     670                 : 
     671                 :     @par Thread Safety
     672                 :     The returned task must be awaited from a single execution context.
     673                 :     Child awaitables execute concurrently but complete through the caller's
     674                 :     executor.
     675                 : 
     676                 :     @param awaitables Non-empty range of void awaitables; moved (rvalue) or
     677                 :         copied (lvalue) into the coroutine when its body runs.
     678                 : 
     679                 :     @throws std::invalid_argument if the range is empty; raised at the start
     680                 :         of the coroutine body, before any child awaitable is launched.
     681                 :     @throws Rethrows the first child exception after all children
     682                 :         complete.
     683                 : 
     684                 :     @par Example
     685                 :     @code
     686                 :     task<void> example()
     687                 :     {
     688                 :         std::vector<task<void>> jobs;
     689                 :         for (int i = 0; i < n; ++i)
     690                 :             jobs.push_back(process(i));
     691                 : 
     692                 :         co_await when_all(std::move(jobs));
     693                 :     }
     694                 :     @endcode
     695                 : 
     696                 :     @see IoAwaitableRange, when_all
     697                 : */
     698                 : template<IoAwaitableRange R>
     699                 :     requires std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>
     700               3 : [[nodiscard]] auto when_all(R&& awaitables) -> task<void>
     701                 : {
     702                 :     using OwnedRange = std::remove_cvref_t<R>;
     703                 :     // Read the size first: the range is forwarded (and possibly moved from) below
     704                 :     auto count = std::ranges::size(awaitables);
     705                 :     if(count == 0)
     706                 :         throw std::invalid_argument("when_all requires at least one awaitable");
     707                 :     // Take ownership: std::forward moves an rvalue range and copies an lvalue one
     708                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     709                 : 
     710                 :     detail::when_all_homogeneous_state<void> state(count);
     711                 :     // Launch all awaitables and wait for completion; the launcher gets pointers to the range and the state
     712                 :     co_await detail::when_all_homogeneous_launcher<OwnedRange>(
     713                 :         &owned_awaitables, &state);
     714                 :     // Propagate first exception if any; otherwise control flows off the end of the body
     715                 :     if(state.core_.first_exception_)
     716                 :         std::rethrow_exception(state.core_.first_exception_);
     717               6 : }
     718                 : 
     719                 : } // namespace capy
     720                 : } // namespace boost
     721                 : 
     722                 : #endif
        

Generated by: LCOV version 2.3