1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_WHEN_ALL_HPP
10  
#ifndef BOOST_CAPY_WHEN_ALL_HPP
11  
#define BOOST_CAPY_WHEN_ALL_HPP
11  
#define BOOST_CAPY_WHEN_ALL_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/void_to_monostate.hpp>
14  
#include <boost/capy/detail/void_to_monostate.hpp>
15  
#include <boost/capy/concept/executor.hpp>
15  
#include <boost/capy/concept/executor.hpp>
16  
#include <boost/capy/concept/io_awaitable.hpp>
16  
#include <boost/capy/concept/io_awaitable.hpp>
17  
#include <coroutine>
17  
#include <coroutine>
18  
#include <boost/capy/ex/io_env.hpp>
18  
#include <boost/capy/ex/io_env.hpp>
19  
#include <boost/capy/ex/frame_allocator.hpp>
19  
#include <boost/capy/ex/frame_allocator.hpp>
20  
#include <boost/capy/task.hpp>
20  
#include <boost/capy/task.hpp>
21  

21  

22  
#include <array>
22  
#include <array>
23  
#include <atomic>
23  
#include <atomic>
24  
#include <exception>
24  
#include <exception>
25  
#include <optional>
25  
#include <optional>
 
26 +
#include <ranges>
 
27 +
#include <stdexcept>
26  
#include <stop_token>
28  
#include <stop_token>
27  
#include <tuple>
29  
#include <tuple>
28  
#include <type_traits>
30  
#include <type_traits>
29  
#include <utility>
31  
#include <utility>
 
32 +
#include <vector>
30  

33  

31  
namespace boost {
34  
namespace boost {
32  
namespace capy {
35  
namespace capy {
33  

36  

34  
namespace detail {
37  
namespace detail {
35  

38  

36  
/** Holds the result of a single task within when_all.

    The value is kept in a `std::optional` so that `T` does not need to be
    default-constructible; the slot stays empty until `set` is called.

    @tparam T The (non-void) result type produced by the task.
*/
template<typename T>
struct result_holder
{
    // Empty until set() stores the task's result.
    std::optional<T> value_;

    /** Store the task's result.

        Constructs the value in place so that `T` only has to be
        move-constructible (move-assignability is not required). Any
        previously stored value is destroyed first.

        @param v The result to store; it is moved into the holder.
    */
    void set(T v)
    {
        value_.emplace(std::move(v));
    }

    /** Move the stored result out of the holder.

        Only callable on an rvalue holder. The optional is dereferenced
        without a check, so `set` must have been called beforehand.

        @return The stored value, moved out.
    */
    T get() &&
    {
        return std::move(*value_);
    }
};
53  

56  

54  
/** Specialization for void tasks - returns monostate to preserve index mapping.
57  
/** Specialization for void tasks - returns monostate to preserve index mapping.
55  
*/
58  
*/
56  
template<>
59  
template<>
57  
struct result_holder<void>
60  
struct result_holder<void>
58  
{
61  
{
59  
    std::monostate get() && { return {}; }
62  
    std::monostate get() && { return {}; }
60  
};
63  
};
61  

64  

62 -
/** Shared state for when_all operation.
65 +
/** Core shared state for when_all operations.
63  

66  

64 -
    @tparam Ts The result types of the tasks.
67 +
    Contains all members and methods common to both heterogeneous (variadic)
 
68 +
    and homogeneous (range) when_all implementations. State classes embed
 
69 +
    this via composition to avoid CRTP destructor ordering issues.
 
70 +

 
71 +
    @par Thread Safety
 
72 +
    Atomic operations protect exception capture and completion count.
65  
*/
73  
*/
66 -
template<typename... Ts>
74 +
struct when_all_core
67 -
struct when_all_state
 
68 -
    static constexpr std::size_t task_count = sizeof...(Ts);
 
69 -

 
70 -
    // Completion tracking - when_all waits for all children
 
71  
{
75  
{
72  
    std::atomic<std::size_t> remaining_count_;
76  
    std::atomic<std::size_t> remaining_count_;
73 -
    // Result storage in input order
 
74 -
    std::tuple<result_holder<Ts>...> results_;
 
75 -

 
76 -
    // Runner handles - destroyed in await_resume while allocator is valid
 
77 -
    std::array<std::coroutine_handle<>, task_count> runner_handles_{};
 
78 -

 
79  

77  

80  
    // Exception storage - first error wins, others discarded
78  
    // Exception storage - first error wins, others discarded
81  
    std::atomic<bool> has_exception_{false};
79  
    std::atomic<bool> has_exception_{false};
82  
    std::exception_ptr first_exception_;
80  
    std::exception_ptr first_exception_;
83 -
    // Stop propagation - on error, request stop for siblings
 
84  

81  

85  
    std::stop_source stop_source_;
82  
    std::stop_source stop_source_;
86  

83  

87 -
    // Connects parent's stop_token to our stop_source
84 +
    // Bridges parent's stop token to our stop_source
88  
    struct stop_callback_fn
85  
    struct stop_callback_fn
89  
    {
86  
    {
90  
        std::stop_source* source_;
87  
        std::stop_source* source_;
91  
        void operator()() const { source_->request_stop(); }
88  
        void operator()() const { source_->request_stop(); }
92  
    };
89  
    };
93  
    using stop_callback_t = std::stop_callback<stop_callback_fn>;
90  
    using stop_callback_t = std::stop_callback<stop_callback_fn>;
94  
    std::optional<stop_callback_t> parent_stop_callback_;
91  
    std::optional<stop_callback_t> parent_stop_callback_;
95 -
    // Parent resumption
 
96  

92  

97  
    std::coroutine_handle<> continuation_;
93  
    std::coroutine_handle<> continuation_;
98  
    io_env const* caller_env_ = nullptr;
94  
    io_env const* caller_env_ = nullptr;
99  

95  

100 -
    when_all_state()
96 +
    explicit when_all_core(std::size_t count) noexcept
101 -
        : remaining_count_(task_count)
97 +
        : remaining_count_(count)
102  
    {
98  
    {
103  
    }
99  
    }
104  

100  

105 -
    // Runners self-destruct in final_suspend. No destruction needed here.
101 +
    /** Capture an exception (first one wins). */
106 -

 
107 -
    /** Capture an exception (first one wins).
 
108 -
    */
 
109  
    void capture_exception(std::exception_ptr ep)
102  
    void capture_exception(std::exception_ptr ep)
110  
    {
103  
    {
111  
        bool expected = false;
104  
        bool expected = false;
112  
        if(has_exception_.compare_exchange_strong(
105  
        if(has_exception_.compare_exchange_strong(
113  
            expected, true, std::memory_order_relaxed))
106  
            expected, true, std::memory_order_relaxed))
114  
            first_exception_ = ep;
107  
            first_exception_ = ep;
115  
    }
108  
    }
 
109 +
};
 
110 +

 
111 +
/** Shared state for heterogeneous when_all (variadic overload).
 
112 +

 
113 +
    @tparam Ts The result types of the tasks.
 
114 +
*/
 
115 +
template<typename... Ts>
 
116 +
struct when_all_state
 
117 +
{
 
118 +
    static constexpr std::size_t task_count = sizeof...(Ts);
 
119 +

 
120 +
    when_all_core core_;
 
121 +
    std::tuple<result_holder<Ts>...> results_;
 
122 +
    std::array<std::coroutine_handle<>, task_count> runner_handles_{};
 
123 +

 
124 +
    when_all_state()
 
125 +
        : core_(task_count)
 
126 +
    {
 
127 +
    }
 
128 +
};
 
129 +

 
130 +
/** Shared state for homogeneous when_all (range overload).
 
131 +

 
132 +
    Stores all results in a vector indexed by task position.
 
133 +

 
134 +
    @tparam T The common result type of all tasks.
 
135 +
*/
 
136 +
template<typename T>
 
137 +
struct when_all_homogeneous_state
 
138 +
{
 
139 +
    when_all_core core_;
 
140 +
    std::vector<std::optional<T>> results_;
 
141 +
    std::vector<std::coroutine_handle<>> runner_handles_;
 
142 +

 
143 +
    explicit when_all_homogeneous_state(std::size_t count)
 
144 +
        : core_(count)
 
145 +
        , results_(count)
 
146 +
        , runner_handles_(count)
 
147 +
    {
 
148 +
    }
116  

149  

 
150 +
    void set_result(std::size_t index, T value)
 
151 +
    {
 
152 +
        results_[index].emplace(std::move(value));
 
153 +
    }
117  
};
154  
};
118  

155  

119 -
/** Wrapper coroutine that intercepts task completion.
156 +
/** Specialization for void tasks (no result storage). */
 
157 +
template<>
 
158 +
struct when_all_homogeneous_state<void>
 
159 +
{
 
160 +
    when_all_core core_;
 
161 +
    std::vector<std::coroutine_handle<>> runner_handles_;
120  

162  

121 -
    This runner awaits its assigned task and stores the result in
163 +
    explicit when_all_homogeneous_state(std::size_t count)
122 -
    the shared state, or captures the exception and requests stop.
164 +
        : core_(count)
 
165 +
        , runner_handles_(count)
 
166 +
    {
 
167 +
    }
 
168 +
};
 
169 +

 
170 +
/** Wrapper coroutine that intercepts task completion for when_all.
 
171 +

 
172 +
    Parameterized on StateType to work with both heterogeneous (variadic)
 
173 +
    and homogeneous (range) state types. All state types expose their
 
174 +
    shared members through a `core_` member of type when_all_core.
 
175 +

 
176 +
    @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
123  
*/
177  
*/
124 -
template<typename T, typename... Ts>
178 +
template<typename StateType>
125  
struct when_all_runner
179  
struct when_all_runner
126  
{
180  
{
127 -
    struct promise_type // : frame_allocating_base  // DISABLED FOR TESTING
181 +
    struct promise_type
128  
    {
182  
    {
129 -
        when_all_state<Ts...>* state_ = nullptr;
183 +
        StateType* state_ = nullptr;
 
184 +
        std::size_t index_ = 0;
130  
        io_env env_;
185  
        io_env env_;
131  

186  

132 -
        when_all_runner get_return_object()
187 +
        when_all_runner get_return_object() noexcept
133  
        {
188  
        {
134 -
            return when_all_runner(std::coroutine_handle<promise_type>::from_promise(*this));
189 +
            return when_all_runner(
 
190 +
                std::coroutine_handle<promise_type>::from_promise(*this));
135  
        }
191  
        }
136  

192  

137  
        std::suspend_always initial_suspend() noexcept
193  
        std::suspend_always initial_suspend() noexcept
138  
        {
194  
        {
139  
            return {};
195  
            return {};
140  
        }
196  
        }
141  

197  

142  
        auto final_suspend() noexcept
198  
        auto final_suspend() noexcept
143  
        {
199  
        {
144  
            struct awaiter
200  
            struct awaiter
145  
            {
201  
            {
146  
                promise_type* p_;
202  
                promise_type* p_;
147 -

203 +
                bool await_ready() const noexcept { return false; }
148 -
                bool await_ready() const noexcept
 
149 -
                {
 
150 -
                    return false;
 
151 -
                }
 
152 -

 
153  
                auto await_suspend(std::coroutine_handle<> h) noexcept
204  
                auto await_suspend(std::coroutine_handle<> h) noexcept
154  
                {
205  
                {
155 -
                    // Extract everything needed before self-destruction.
206 +
                    auto& core = p_->state_->core_;
156 -
                    auto* state = p_->state_;
207 +
                    auto* counter = &core.remaining_count_;
157 -
                    auto* counter = &state->remaining_count_;
208 +
                    auto* caller_env = core.caller_env_;
158 -
                    auto* caller_env = state->caller_env_;
209 +
                    auto cont = core.continuation_;
159 -
                    auto cont = state->continuation_;
 
160  

210  

161  
                    h.destroy();
211  
                    h.destroy();
162 -
                    // If last runner, dispatch parent for symmetric transfer.
 
163  

212  

164  
                    auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
213  
                    auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
165  
                    if(remaining == 1)
214  
                    if(remaining == 1)
166  
                        return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
215  
                        return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
167  
                    return detail::symmetric_transfer(std::noop_coroutine());
216  
                    return detail::symmetric_transfer(std::noop_coroutine());
168  
                }
217  
                }
169 -

218 +
                void await_resume() const noexcept {}
170 -
                void await_resume() const noexcept
 
171 -
                {
 
172 -
                }
 
173  
            };
219  
            };
174  
            return awaiter{this};
220  
            return awaiter{this};
175  
        }
221  
        }
176  

222  

177 -
        void return_void()
223 +
        void return_void() noexcept {}
178 -
        {
 
179 -
        }
 
180  

224  

181  
        void unhandled_exception()
225  
        void unhandled_exception()
182  
        {
226  
        {
183 -
            state_->capture_exception(std::current_exception());
227 +
            state_->core_.capture_exception(std::current_exception());
184 -
            // Request stop for sibling tasks
228 +
            state_->core_.stop_source_.request_stop();
185 -
            state_->stop_source_.request_stop();
 
186  
        }
229  
        }
187  

230  

188  
        template<class Awaitable>
231  
        template<class Awaitable>
189  
        struct transform_awaiter
232  
        struct transform_awaiter
190  
        {
233  
        {
191  
            std::decay_t<Awaitable> a_;
234  
            std::decay_t<Awaitable> a_;
192  
            promise_type* p_;
235  
            promise_type* p_;
193  

236  

194 -
            bool await_ready()
237 +
            bool await_ready() { return a_.await_ready(); }
195 -
            {
238 +
            decltype(auto) await_resume() { return a_.await_resume(); }
196 -
                return a_.await_ready();
 
197 -
            }
 
198 -

 
199 -
            decltype(auto) await_resume()
 
200 -
            {
 
201 -
                return a_.await_resume();
 
202 -
            }
 
203  

239  

204  
            template<class Promise>
240  
            template<class Promise>
205  
            auto await_suspend(std::coroutine_handle<Promise> h)
241  
            auto await_suspend(std::coroutine_handle<Promise> h)
206  
            {
242  
            {
207  
                using R = decltype(a_.await_suspend(h, &p_->env_));
243  
                using R = decltype(a_.await_suspend(h, &p_->env_));
208  
                if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
244  
                if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
209  
                    return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
245  
                    return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
210  
                else
246  
                else
211  
                    return a_.await_suspend(h, &p_->env_);
247  
                    return a_.await_suspend(h, &p_->env_);
212  
            }
248  
            }
213  
        };
249  
        };
214  

250  

215  
        template<class Awaitable>
251  
        template<class Awaitable>
216  
        auto await_transform(Awaitable&& a)
252  
        auto await_transform(Awaitable&& a)
217  
        {
253  
        {
218  
            using A = std::decay_t<Awaitable>;
254  
            using A = std::decay_t<Awaitable>;
219  
            if constexpr (IoAwaitable<A>)
255  
            if constexpr (IoAwaitable<A>)
220  
            {
256  
            {
221  
                return transform_awaiter<Awaitable>{
257  
                return transform_awaiter<Awaitable>{
222  
                    std::forward<Awaitable>(a), this};
258  
                    std::forward<Awaitable>(a), this};
223  
            }
259  
            }
224  
            else
260  
            else
225  
            {
261  
            {
226  
                static_assert(sizeof(A) == 0, "requires IoAwaitable");
262  
                static_assert(sizeof(A) == 0, "requires IoAwaitable");
227  
            }
263  
            }
228  
        }
264  
        }
229  
    };
265  
    };
230  

266  

231  
    std::coroutine_handle<promise_type> h_;
267  
    std::coroutine_handle<promise_type> h_;
232  

268  

233 -
    explicit when_all_runner(std::coroutine_handle<promise_type> h)
269 +
    explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
234  
        : h_(h)
270  
        : h_(h)
235  
    {
271  
    {
236  
    }
272  
    }
237  

273  

238  
    // Enable move for all clang versions - some versions need it
274  
    // Enable move for all clang versions - some versions need it
239 -
    when_all_runner(when_all_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
275 +
    when_all_runner(when_all_runner&& other) noexcept
 
276 +
        : h_(std::exchange(other.h_, nullptr))
 
277 +
    {
 
278 +
    }
240 -
    // Non-copyable
 
241  

279  

242  
    when_all_runner(when_all_runner const&) = delete;
280  
    when_all_runner(when_all_runner const&) = delete;
243  
    when_all_runner& operator=(when_all_runner const&) = delete;
281  
    when_all_runner& operator=(when_all_runner const&) = delete;
244  
    when_all_runner& operator=(when_all_runner&&) = delete;
282  
    when_all_runner& operator=(when_all_runner&&) = delete;
245  

283  

246  
    auto release() noexcept
284  
    auto release() noexcept
247  
    {
285  
    {
248  
        return std::exchange(h_, nullptr);
286  
        return std::exchange(h_, nullptr);
249  
    }
287  
    }
250  
};
288  
};
251  

289  

252 -
/** Create a runner coroutine for a single awaitable.
290 +
/** Create a runner coroutine for a single awaitable (variadic path).
253  

291  

254 -
    Awaitable is passed directly to ensure proper coroutine frame storage.
292 +
    Uses compile-time index for tuple-based result storage.
255  
*/
293  
*/
256  
template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
294  
template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
257 -
when_all_runner<awaitable_result_t<Awaitable>, Ts...>
295 +
when_all_runner<when_all_state<Ts...>>
258  
make_when_all_runner(Awaitable inner, when_all_state<Ts...>* state)
296  
make_when_all_runner(Awaitable inner, when_all_state<Ts...>* state)
259  
{
297  
{
260  
    using T = awaitable_result_t<Awaitable>;
298  
    using T = awaitable_result_t<Awaitable>;
261  
    if constexpr (std::is_void_v<T>)
299  
    if constexpr (std::is_void_v<T>)
262  
    {
300  
    {
263  
        co_await std::move(inner);
301  
        co_await std::move(inner);
264  
    }
302  
    }
265  
    else
303  
    else
266  
    {
304  
    {
267  
        std::get<Index>(state->results_).set(co_await std::move(inner));
305  
        std::get<Index>(state->results_).set(co_await std::move(inner));
268  
    }
306  
    }
269  
}
307  
}
270  

308  

271 -
/** Internal awaitable that launches all runner coroutines and waits.
309 +
/** Create a runner coroutine for a single awaitable (range path).
272  

310  

273 -
    This awaitable is used inside the when_all coroutine to handle
311 +
    Uses runtime index for vector-based result storage.
274 -
    the concurrent execution of child awaitables.
312 +
*/
 
313 +
template<IoAwaitable Awaitable, typename StateType>
 
314 +
when_all_runner<StateType>
 
315 +
make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
 
316 +
{
 
317 +
    using T = awaitable_result_t<Awaitable>;
 
318 +
    if constexpr (std::is_void_v<T>)
 
319 +
    {
 
320 +
        co_await std::move(inner);
 
321 +
    }
 
322 +
    else
 
323 +
    {
 
324 +
        state->set_result(index, co_await std::move(inner));
 
325 +
    }
 
326 +
}
 
327 +

 
328 +
/** Internal awaitable that launches all variadic runner coroutines.
 
329 +

 
330 +
    CRITICAL: If the last task finishes synchronously then the parent
 
331 +
    coroutine resumes, destroying its frame, and destroying this object
 
332 +
    prior to the completion of await_suspend. Therefore, await_suspend
 
333 +
    must ensure `this` cannot be referenced after calling `launch_one`
 
334 +
    for the last time.
275  
*/
335  
*/
276  
template<IoAwaitable... Awaitables>
336  
template<IoAwaitable... Awaitables>
277  
class when_all_launcher
337  
class when_all_launcher
278  
{
338  
{
279  
    using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
339  
    using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
280  

340  

281  
    std::tuple<Awaitables...>* awaitables_;
341  
    std::tuple<Awaitables...>* awaitables_;
282  
    state_type* state_;
342  
    state_type* state_;
283  

343  

284  
public:
344  
public:
285  
    when_all_launcher(
345  
    when_all_launcher(
286  
        std::tuple<Awaitables...>* awaitables,
346  
        std::tuple<Awaitables...>* awaitables,
287  
        state_type* state)
347  
        state_type* state)
288  
        : awaitables_(awaitables)
348  
        : awaitables_(awaitables)
289  
        , state_(state)
349  
        , state_(state)
290  
    {
350  
    {
291  
    }
351  
    }
292  

352  

293  
    bool await_ready() const noexcept
353  
    bool await_ready() const noexcept
294  
    {
354  
    {
295  
        return sizeof...(Awaitables) == 0;
355  
        return sizeof...(Awaitables) == 0;
296  
    }
356  
    }
297  

357  

298  
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
358  
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
299  
    {
359  
    {
300 -
        state_->continuation_ = continuation;
360 +
        state_->core_.continuation_ = continuation;
301 -
        state_->caller_env_ = caller_env;
361 +
        state_->core_.caller_env_ = caller_env;
302 -
        // Forward parent's stop requests to children
 
303  

362  

304  
        if(caller_env->stop_token.stop_possible())
363  
        if(caller_env->stop_token.stop_possible())
305  
        {
364  
        {
306 -
            state_->parent_stop_callback_.emplace(
365 +
            state_->core_.parent_stop_callback_.emplace(
307  
                caller_env->stop_token,
366  
                caller_env->stop_token,
308 -
                typename state_type::stop_callback_fn{&state_->stop_source_});
367 +
                when_all_core::stop_callback_fn{&state_->core_.stop_source_});
309  

368  

310  
            if(caller_env->stop_token.stop_requested())
369  
            if(caller_env->stop_token.stop_requested())
311 -
                state_->stop_source_.request_stop();
370 +
                state_->core_.stop_source_.request_stop();
312  
        }
371  
        }
313  

372  

314 -
        // CRITICAL: If the last task finishes synchronously then the parent
373 +
        auto token = state_->core_.stop_source_.get_token();
315 -
        // coroutine resumes, destroying its frame, and destroying this object
 
316 -
        // prior to the completion of await_suspend. Therefore, await_suspend
 
317 -
        // must ensure `this` cannot be referenced after calling `launch_one`
 
318 -
        // for the last time.
 
319 -
        auto token = state_->stop_source_.get_token();
 
320  
        [&]<std::size_t... Is>(std::index_sequence<Is...>) {
374  
        [&]<std::size_t... Is>(std::index_sequence<Is...>) {
321  
            (..., launch_one<Is>(caller_env->executor, token));
375  
            (..., launch_one<Is>(caller_env->executor, token));
322  
        }(std::index_sequence_for<Awaitables...>{});
376  
        }(std::index_sequence_for<Awaitables...>{});
323 -
        // Let signal_completion() handle resumption
 
324  

377  

325  
        return std::noop_coroutine();
378  
        return std::noop_coroutine();
326  
    }
379  
    }
327  

380  

328  
    void await_resume() const noexcept
381  
    void await_resume() const noexcept
329 -
        // Results are extracted by the when_all coroutine from state
 
330  
    {
382  
    {
331  
    }
383  
    }
332  

384  

333  
private:
385  
private:
334  
    template<std::size_t I>
386  
    template<std::size_t I>
335  
    void launch_one(executor_ref caller_ex, std::stop_token token)
387  
    void launch_one(executor_ref caller_ex, std::stop_token token)
336  
    {
388  
    {
337  
        auto runner = make_when_all_runner<I>(
389  
        auto runner = make_when_all_runner<I>(
338  
            std::move(std::get<I>(*awaitables_)), state_);
390  
            std::move(std::get<I>(*awaitables_)), state_);
339  

391  

340  
        auto h = runner.release();
392  
        auto h = runner.release();
341  
        h.promise().state_ = state_;
393  
        h.promise().state_ = state_;
342 -
        h.promise().env_ = io_env{caller_ex, token, state_->caller_env_->frame_allocator};
394 +
        h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator};
343  

395  

344  
        std::coroutine_handle<> ch{h};
396  
        std::coroutine_handle<> ch{h};
345  
        state_->runner_handles_[I] = ch;
397  
        state_->runner_handles_[I] = ch;
346 -
        state_->caller_env_->executor.post(ch);
398 +
        state_->core_.caller_env_->executor.post(ch);
347  
    }
399  
    }
348  
};
400  
};
349  

401  

350  
/** Helper to extract a single result from state.
402  
/** Helper to extract a single result from state.
351  
    This is a separate function to work around a GCC-11 ICE that occurs
403  
    This is a separate function to work around a GCC-11 ICE that occurs
352  
    when using nested immediately-invoked lambdas with pack expansion.
404  
    when using nested immediately-invoked lambdas with pack expansion.
353  
*/
405  
*/
354  
template<std::size_t I, typename... Ts>
406  
template<std::size_t I, typename... Ts>
355  
auto extract_single_result(when_all_state<Ts...>& state)
407  
auto extract_single_result(when_all_state<Ts...>& state)
356  
{
408  
{
357  
    return std::move(std::get<I>(state.results_)).get();
409  
    return std::move(std::get<I>(state.results_)).get();
358  
}
410  
}
359  

411  

360  
/** Extract all results from state as a tuple.
412  
/** Extract all results from state as a tuple.
361  
*/
413  
*/
362  
template<typename... Ts>
414  
template<typename... Ts>
363  
auto extract_results(when_all_state<Ts...>& state)
415  
auto extract_results(when_all_state<Ts...>& state)
364  
{
416  
{
365  
    return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
417  
    return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
366  
        return std::tuple(extract_single_result<Is>(state)...);
418  
        return std::tuple(extract_single_result<Is>(state)...);
367  
    }(std::index_sequence_for<Ts...>{});
419  
    }(std::index_sequence_for<Ts...>{});
368  
}
420  
}
369  

421  

 
422 +
/** Launches all homogeneous runners concurrently.
 
423 +

 
424 +
    Two-phase approach: create all runners first, then post all.
 
425 +
    This avoids lifetime issues if a task completes synchronously.
 
426 +
*/
 
427 +
template<typename Range>
 
428 +
class when_all_homogeneous_launcher
 
429 +
{
 
430 +
    using Awaitable = std::ranges::range_value_t<Range>;
 
431 +
    using T = awaitable_result_t<Awaitable>;
 
432 +

 
433 +
    Range* range_;
 
434 +
    when_all_homogeneous_state<T>* state_;
 
435 +

 
436 +
public:
 
437 +
    when_all_homogeneous_launcher(
 
438 +
        Range* range,
 
439 +
        when_all_homogeneous_state<T>* state)
 
440 +
        : range_(range)
 
441 +
        , state_(state)
 
442 +
    {
 
443 +
    }
 
444 +

 
445 +
    bool await_ready() const noexcept
 
446 +
    {
 
447 +
        return std::ranges::empty(*range_);
 
448 +
    }
 
449 +

 
450 +
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
 
451 +
    {
 
452 +
        state_->core_.continuation_ = continuation;
 
453 +
        state_->core_.caller_env_ = caller_env;
 
454 +

 
455 +
        if(caller_env->stop_token.stop_possible())
 
456 +
        {
 
457 +
            state_->core_.parent_stop_callback_.emplace(
 
458 +
                caller_env->stop_token,
 
459 +
                when_all_core::stop_callback_fn{&state_->core_.stop_source_});
 
460 +

 
461 +
            if(caller_env->stop_token.stop_requested())
 
462 +
                state_->core_.stop_source_.request_stop();
 
463 +
        }
 
464 +

 
465 +
        auto token = state_->core_.stop_source_.get_token();
 
466 +

 
467 +
        // Phase 1: Create all runners without dispatching.
 
468 +
        std::size_t index = 0;
 
469 +
        for(auto&& a : *range_)
 
470 +
        {
 
471 +
            auto runner = make_when_all_homogeneous_runner(
 
472 +
                std::move(a), state_, index);
 
473 +

 
474 +
            auto h = runner.release();
 
475 +
            h.promise().state_ = state_;
 
476 +
            h.promise().index_ = index;
 
477 +
            h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
 
478 +

 
479 +
            state_->runner_handles_[index] = std::coroutine_handle<>{h};
 
480 +
            ++index;
 
481 +
        }
 
482 +

 
483 +
        // Phase 2: Post all runners. Any may complete synchronously.
 
484 +
        // After last post, state_ and this may be destroyed.
 
485 +
        std::coroutine_handle<>* handles = state_->runner_handles_.data();
 
486 +
        std::size_t count = state_->runner_handles_.size();
 
487 +
        for(std::size_t i = 0; i < count; ++i)
 
488 +
            caller_env->executor.post(handles[i]);
 
489 +

 
490 +
        return std::noop_coroutine();
 
491 +
    }
 
492 +

 
493 +
    void await_resume() const noexcept
 
494 +
    {
 
495 +
    }
 
496 +
};
 
497 +

370  
} // namespace detail
498  
} // namespace detail
371  

499  

372  
/** Compute the when_all result tuple type.
500  
/** Compute the when_all result tuple type.
373  

501  

374  
    Void-returning tasks contribute std::monostate to preserve the
502  
    Void-returning tasks contribute std::monostate to preserve the
375  
    task-index-to-result-index mapping, matching when_any's approach.
503  
    task-index-to-result-index mapping, matching when_any's approach.
376  

504  

377  
    Example: when_all_result_t<int, void, string> = std::tuple<int, std::monostate, string>
505  
    Example: when_all_result_t<int, void, string> = std::tuple<int, std::monostate, string>
378  
    Example: when_all_result_t<void, void> = std::tuple<std::monostate, std::monostate>
506  
    Example: when_all_result_t<void, void> = std::tuple<std::monostate, std::monostate>
379  
*/
507  
*/
380  
template<typename... Ts>
508  
template<typename... Ts>
381  
using when_all_result_t = std::tuple<void_to_monostate_t<Ts>...>;
509  
using when_all_result_t = std::tuple<void_to_monostate_t<Ts>...>;
382  

510  

383  
/** Execute multiple awaitables concurrently and collect their results.
511  
/** Execute multiple awaitables concurrently and collect their results.
384  

512  

385  
    Launches all awaitables simultaneously and waits for all to complete
513  
    Launches all awaitables simultaneously and waits for all to complete
386  
    before returning. Results are collected in input order. If any
514  
    before returning. Results are collected in input order. If any
387  
    awaitable throws, cancellation is requested for siblings and the first
515  
    awaitable throws, cancellation is requested for siblings and the first
388  
    exception is rethrown after all awaitables complete.
516  
    exception is rethrown after all awaitables complete.
389  

517  

390  
    @li All child awaitables run concurrently on the caller's executor
518  
    @li All child awaitables run concurrently on the caller's executor
391  
    @li Results are returned as a tuple in input order
519  
    @li Results are returned as a tuple in input order
392  
    @li Void-returning awaitables contribute std::monostate to the
520  
    @li Void-returning awaitables contribute std::monostate to the
393  
        result tuple, preserving the task-index-to-result-index mapping
521  
        result tuple, preserving the task-index-to-result-index mapping
394  
    @li First exception wins; subsequent exceptions are discarded
522  
    @li First exception wins; subsequent exceptions are discarded
395  
    @li Stop is requested for siblings on first error
523  
    @li Stop is requested for siblings on first error
396  
    @li Completes only after all children have finished
524  
    @li Completes only after all children have finished
397  

525  

398  
    @par Thread Safety
526  
    @par Thread Safety
399  
    The returned task must be awaited from a single execution context.
527  
    The returned task must be awaited from a single execution context.
400  
    Child awaitables execute concurrently but complete through the caller's
528  
    Child awaitables execute concurrently but complete through the caller's
401  
    executor.
529  
    executor.
402  

530  

403  
    @param awaitables The awaitables to execute concurrently. Each must
531  
    @param awaitables The awaitables to execute concurrently. Each must
404  
        satisfy @ref IoAwaitable and is consumed (moved-from) when
532  
        satisfy @ref IoAwaitable and is consumed (moved-from) when
405  
        `when_all` is awaited.
533  
        `when_all` is awaited.
406  

534  

407  
    @return A task yielding a tuple of results in input order. Void tasks
535  
    @return A task yielding a tuple of results in input order. Void tasks
408  
        contribute std::monostate to preserve index correspondence.
536  
        contribute std::monostate to preserve index correspondence.
409  

537  

410  
    @par Example
538  
    @par Example
411  

539  

412  
    @code
540  
    @code
413  
    task<> example()
541  
    task<> example()
414  
    {
542  
    {
415  
        // Concurrent fetch, results collected in order
543  
        // Concurrent fetch, results collected in order
416  
        auto [user, posts] = co_await when_all(
544  
        auto [user, posts] = co_await when_all(
417  
            fetch_user( id ),      // task<User>
545  
            fetch_user( id ),      // task<User>
418  
            fetch_posts( id )      // task<std::vector<Post>>
546  
            fetch_posts( id )      // task<std::vector<Post>>
419  
        );
547  
        );
420  

548  

421  
        // Void awaitables contribute monostate
549  
        // Void awaitables contribute monostate
422  
        auto [a, _, b] = co_await when_all(
550  
        auto [a, _, b] = co_await when_all(
423  
            fetch_int(),           // task<int>
551  
            fetch_int(),           // task<int>
424  
            log_event( "start" ),  // task<void>  → monostate
552  
            log_event( "start" ),  // task<void>  → monostate
425  
            fetch_str()            // task<string>
553  
            fetch_str()            // task<string>
426  
        );
554  
        );
427  
        // a is int, _ is monostate, b is string
555  
        // a is int, _ is monostate, b is string
428  
    }
556  
    }
429  
    @endcode
557  
    @endcode
430  

558  

431  
    @see IoAwaitable, task
559  
    @see IoAwaitable, task
432  
*/
560  
*/
433  
template<IoAwaitable... As>
561  
template<IoAwaitable... As>
434  
[[nodiscard]] auto when_all(As... awaitables)
562  
[[nodiscard]] auto when_all(As... awaitables)
435  
    -> task<when_all_result_t<awaitable_result_t<As>...>>
563  
    -> task<when_all_result_t<awaitable_result_t<As>...>>
436  
{
564  
{
437  
    // State is stored in the coroutine frame, using the frame allocator
565  
    // State is stored in the coroutine frame, using the frame allocator
438  
    detail::when_all_state<awaitable_result_t<As>...> state;
566  
    detail::when_all_state<awaitable_result_t<As>...> state;
439  

567  

440  
    // Store awaitables in the frame
568  
    // Store awaitables in the frame
441  
    std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
569  
    std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
442  

570  

443  
    // Launch all awaitables and wait for completion
571  
    // Launch all awaitables and wait for completion
444  
    co_await detail::when_all_launcher<As...>(&awaitable_tuple, &state);
572  
    co_await detail::when_all_launcher<As...>(&awaitable_tuple, &state);
445  

573  

446  
    // Propagate first exception if any.
574  
    // Propagate first exception if any.
447  
    // Safe without explicit acquire: capture_exception() is sequenced-before
575  
    // Safe without explicit acquire: capture_exception() is sequenced-before
448  
    // signal_completion()'s acq_rel fetch_sub, which synchronizes-with the
576  
    // signal_completion()'s acq_rel fetch_sub, which synchronizes-with the
449  
    // last task's decrement that resumes this coroutine.
577  
    // last task's decrement that resumes this coroutine.
450 -
    if(state.first_exception_)
578 +
    if(state.core_.first_exception_)
451 -
        std::rethrow_exception(state.first_exception_);
579 +
        std::rethrow_exception(state.core_.first_exception_);
452  

580  

453  
    co_return detail::extract_results(state);
581  
    co_return detail::extract_results(state);
 
582 +
}
 
583 +

 
584 +
/** Execute a range of awaitables concurrently and collect their results.
 
585 +

 
586 +
    Launches all awaitables in the range simultaneously and waits for all
 
587 +
    to complete. Results are collected in a vector preserving input order.
 
588 +
    If any awaitable throws, cancellation is requested for siblings and
 
589 +
    the first exception is rethrown after all awaitables complete.
 
590 +

 
591 +
    @li All child awaitables run concurrently on the caller's executor
 
592 +
    @li Results are returned as a vector in input order
 
593 +
    @li First exception wins; subsequent exceptions are discarded
 
594 +
    @li Stop is requested for siblings on first error
 
595 +
    @li Completes only after all children have finished
 
596 +

 
597 +
    @par Thread Safety
 
598 +
    The returned task must be awaited from a single execution context.
 
599 +
    Child awaitables execute concurrently but complete through the caller's
 
600 +
    executor.
 
601 +

 
602 +
    @param awaitables Range of awaitables to execute concurrently (must
 
603 +
        not be empty). Each element must satisfy @ref IoAwaitable and is
 
604 +
        consumed (moved-from) when `when_all` is awaited.
 
605 +

 
606 +
    @return A task yielding a vector where each element is the result of
 
607 +
        the corresponding awaitable, in input order.
 
608 +

 
609 +
    @throws std::invalid_argument if range is empty (thrown before
 
610 +
        coroutine suspends).
 
611 +
    @throws Rethrows the first child exception after all children
 
612 +
        complete.
 
613 +

 
614 +
    @par Example
 
615 +
    @code
 
616 +
    task<void> example()
 
617 +
    {
 
618 +
        std::vector<task<Response>> requests;
 
619 +
        for (auto const& url : urls)
 
620 +
            requests.push_back(fetch(url));
 
621 +

 
622 +
        auto responses = co_await when_all(std::move(requests));
 
623 +
    }
 
624 +
    @endcode
 
625 +

 
626 +
    @see IoAwaitableRange, when_all
 
627 +
*/
 
628 +
template<IoAwaitableRange R>
 
629 +
    requires (!std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>)
 
630 +
[[nodiscard]] auto when_all(R&& awaitables)
 
631 +
    -> task<std::vector<awaitable_result_t<std::ranges::range_value_t<R>>>>
 
632 +
{
 
633 +
    using Awaitable = std::ranges::range_value_t<R>;
 
634 +
    using T = awaitable_result_t<Awaitable>;
 
635 +
    using OwnedRange = std::remove_cvref_t<R>;
 
636 +

 
637 +
    auto count = std::ranges::size(awaitables);
 
638 +
    if(count == 0)
 
639 +
        throw std::invalid_argument("when_all requires at least one awaitable");
 
640 +

 
641 +
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
 
642 +

 
643 +
    detail::when_all_homogeneous_state<T> state(count);
 
644 +

 
645 +
    co_await detail::when_all_homogeneous_launcher<OwnedRange>(
 
646 +
        &owned_awaitables, &state);
 
647 +

 
648 +
    if(state.core_.first_exception_)
 
649 +
        std::rethrow_exception(state.core_.first_exception_);
 
650 +

 
651 +
    std::vector<T> results;
 
652 +
    results.reserve(count);
 
653 +
    for(auto& opt : state.results_)
 
654 +
        results.push_back(std::move(*opt));
 
655 +

 
656 +
    co_return results;
 
657 +
}
 
658 +

 
659 +
/** Execute a range of void awaitables concurrently.
 
660 +

 
661 +
    Launches all awaitables in the range simultaneously and waits for all
 
662 +
    to complete. Since all awaitables return void, no results are collected.
 
663 +
    If any awaitable throws, cancellation is requested for siblings and
 
664 +
    the first exception is rethrown after all awaitables complete.
 
665 +

 
666 +
    @li All child awaitables run concurrently on the caller's executor
 
667 +
    @li First exception wins; subsequent exceptions are discarded
 
668 +
    @li Stop is requested for siblings on first error
 
669 +
    @li Completes only after all children have finished
 
670 +

 
671 +
    @par Thread Safety
 
672 +
    The returned task must be awaited from a single execution context.
 
673 +
    Child awaitables execute concurrently but complete through the caller's
 
674 +
    executor.
 
675 +

 
676 +
    @param awaitables Range of void awaitables to execute concurrently
 
677 +
        (must not be empty).
 
678 +

 
679 +
    @throws std::invalid_argument if range is empty (thrown before
 
680 +
        coroutine suspends).
 
681 +
    @throws Rethrows the first child exception after all children
 
682 +
        complete.
 
683 +

 
684 +
    @par Example
 
685 +
    @code
 
686 +
    task<void> example()
 
687 +
    {
 
688 +
        std::vector<task<void>> jobs;
 
689 +
        for (int i = 0; i < n; ++i)
 
690 +
            jobs.push_back(process(i));
 
691 +

 
692 +
        co_await when_all(std::move(jobs));
 
693 +
    }
 
694 +
    @endcode
 
695 +

 
696 +
    @see IoAwaitableRange, when_all
 
697 +
*/
 
698 +
template<IoAwaitableRange R>
 
699 +
    requires std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>
 
700 +
[[nodiscard]] auto when_all(R&& awaitables) -> task<void>
 
701 +
{
 
702 +
    using OwnedRange = std::remove_cvref_t<R>;
 
703 +

 
704 +
    auto count = std::ranges::size(awaitables);
 
705 +
    if(count == 0)
 
706 +
        throw std::invalid_argument("when_all requires at least one awaitable");
 
707 +

 
708 +
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
 
709 +

 
710 +
    detail::when_all_homogeneous_state<void> state(count);
 
711 +

 
712 +
    co_await detail::when_all_homogeneous_launcher<OwnedRange>(
 
713 +
        &owned_awaitables, &state);
 
714 +

 
715 +
    if(state.core_.first_exception_)
 
716 +
        std::rethrow_exception(state.core_.first_exception_);
454  
}
717  
}
455  

718  

456  
} // namespace capy
719  
} // namespace capy
457  
} // namespace boost
720  
} // namespace boost
458  

721  

459  
#endif
722  
#endif