TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
11 : #define BOOST_CAPY_WHEN_ANY_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/void_to_monostate.hpp>
15 : #include <boost/capy/concept/executor.hpp>
16 : #include <boost/capy/concept/io_awaitable.hpp>
17 : #include <coroutine>
18 : #include <boost/capy/ex/executor_ref.hpp>
19 : #include <boost/capy/ex/frame_allocator.hpp>
20 : #include <boost/capy/ex/io_env.hpp>
21 : #include <boost/capy/task.hpp>
22 :
23 : #include <array>
24 : #include <atomic>
25 : #include <exception>
26 : #include <optional>
27 : #include <ranges>
28 : #include <stdexcept>
29 : #include <stop_token>
30 : #include <tuple>
31 : #include <type_traits>
32 : #include <utility>
33 : #include <variant>
34 : #include <vector>
35 :
36 : /*
37 : when_any - Race multiple tasks, return first completion
38 : ========================================================
39 :
40 : OVERVIEW:
41 : ---------
42 : when_any launches N tasks concurrently and completes when the FIRST task
43 : finishes (success or failure). It then requests stop for all siblings and
44 : waits for them to acknowledge before returning.
45 :
46 : ARCHITECTURE:
47 : -------------
48 : The design mirrors when_all but with inverted completion semantics:
49 :
50 : when_all: complete when remaining_count reaches 0 (all done)
51 : when_any: complete when has_winner becomes true (first done)
52 : BUT still wait for remaining_count to reach 0 for cleanup
53 :
54 : Key components:
55 : - when_any_state: Shared state tracking winner and completion
56 : - when_any_runner: Wrapper coroutine for each child task
57 : - when_any_launcher: Awaitable that starts all runners concurrently
58 :
59 : CRITICAL INVARIANTS:
60 : --------------------
61 : 1. Exactly one task becomes the winner (via atomic compare_exchange)
62 : 2. All tasks must complete before parent resumes (cleanup safety)
63 : 3. Stop is requested immediately when winner is determined
64 : 4. Only the winner's result/exception is stored
65 :
66 : POSITIONAL VARIANT:
67 : -------------------
68 : The variadic overload returns a std::variant with one alternative per
69 : input task, preserving positional correspondence. Use .index() on
70 : the variant to identify which task won.
71 :
72 : Example: when_any(task<int>, task<string>, task<int>)
73 : - Raw types after void->monostate: int, string, int
74 : - Result variant: std::variant<int, string, int>
75 : - variant.index() tells you which task won (0, 1, or 2)
76 :
77 : VOID HANDLING:
78 : --------------
79 : void tasks contribute std::monostate to the variant.
80 : All-void tasks result in: variant<monostate, monostate, monostate>
81 :
82 : MEMORY MODEL:
83 : -------------
84 : Synchronization chain from winner's write to parent's read:
85 :
86 : 1. Winner thread writes result_/winner_exception_ (non-atomic)
87 : 2. Winner's runner reaches final_suspend → fetch_sub(acq_rel) on remaining_count_
88 : 3. Last runner (may be winner or non-winner) reaches final_suspend
89 : → fetch_sub(acq_rel) on remaining_count_, observing that the previous value was 1
90 : 4. Last runner returns caller_env->executor.dispatch(continuation_) via symmetric transfer
91 : 5. Parent coroutine resumes and reads result_/winner_exception_
92 :
93 : Synchronization analysis:
94 : - All fetch_sub operations on remaining_count_ form a release sequence
95 : - Winner's fetch_sub releases; subsequent fetch_sub operations participate
96 : in the modification order of remaining_count_
97 : - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
98 : modification order, establishing happens-before from winner's writes
99 : - Executor dispatch() is expected to provide queue-based synchronization
100 : (release-on-post, acquire-on-execute) completing the chain to parent
101 : - Even inline executors work (same thread = sequenced-before)
102 :
103 : Alternative considered: Adding winner_ready_ atomic (set with release after
104 : storing winner data, acquired before reading) would make synchronization
105 : self-contained and not rely on executor implementation details. Current
106 : approach is correct but requires careful reasoning about release sequences
107 : and executor behavior.
108 :
109 : EXCEPTION SEMANTICS:
110 : --------------------
111 : Unlike when_all (which captures first exception, discards others), when_any
112 : treats exceptions as valid completions. If the winning task threw, that
113 : exception is rethrown. Exceptions from non-winners are silently discarded.
114 : */
115 :
116 : namespace boost {
117 : namespace capy {
118 :
119 : namespace detail {
120 :
121 : /** Core shared state for when_any operations.
122 :
123 : Contains all members and methods common to both heterogeneous (variadic)
124 : and homogeneous (range) when_any implementations. State classes embed
125 : this via composition to avoid CRTP destructor ordering issues.
126 :
127 : @par Thread Safety
128 : Atomic operations protect winner selection and completion count.
129 : */
130 : struct when_any_core
131 : {
132 : std::atomic<std::size_t> remaining_count_;
133 : std::size_t winner_index_{0};
134 : std::exception_ptr winner_exception_;
135 : std::stop_source stop_source_;
136 :
137 : // Bridges parent's stop token to our stop_source
138 : struct stop_callback_fn
139 : {
140 : std::stop_source* source_;
141 HIT 9 : void operator()() const noexcept { source_->request_stop(); }
142 : };
143 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
144 : std::optional<stop_callback_t> parent_stop_callback_;
145 :
146 : std::coroutine_handle<> continuation_;
147 : io_env const* caller_env_ = nullptr;
148 :
149 : // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
150 : std::atomic<bool> has_winner_{false};
151 :
152 73 : explicit when_any_core(std::size_t count) noexcept
153 73 : : remaining_count_(count)
154 : {
155 73 : }
156 :
157 : /** Atomically claim winner status; exactly one task succeeds. */
158 206 : bool try_win(std::size_t index) noexcept
159 : {
160 206 : bool expected = false;
161 206 : if(has_winner_.compare_exchange_strong(
162 : expected, true, std::memory_order_acq_rel))
163 : {
164 73 : winner_index_ = index;
165 73 : stop_source_.request_stop();
166 73 : return true;
167 : }
168 133 : return false;
169 : }
170 :
171 : /** @pre try_win() returned true. */
172 9 : void set_winner_exception(std::exception_ptr ep) noexcept
173 : {
174 9 : winner_exception_ = ep;
175 9 : }
176 :
177 : // Runners signal completion directly via final_suspend; no member function needed.
178 : };
179 :
180 : /** Shared state for heterogeneous when_any operation.
181 :
182 : Coordinates winner selection, result storage, and completion tracking
183 : for all child tasks in a when_any operation. Uses composition with
184 : when_any_core for shared functionality.
185 :
186 : @par Lifetime
187 : Allocated on the parent coroutine's frame, outlives all runners.
188 :
189 : @tparam Ts Task result types.
190 : */
191 : template<typename... Ts>
192 : struct when_any_state
193 : {
194 : static constexpr std::size_t task_count = sizeof...(Ts);
195 : using variant_type = std::variant<void_to_monostate_t<Ts>...>;
196 :
197 : when_any_core core_;
198 : std::optional<variant_type> result_;
199 : std::array<std::coroutine_handle<>, task_count> runner_handles_{};
200 :
201 51 : when_any_state()
202 51 : : core_(task_count)
203 : {
204 51 : }
205 :
206 : // Runners self-destruct in final_suspend. No destruction needed here.
207 :
208 : /** @pre core_.try_win() returned true.
209 : @note Uses in_place_index (not type) for positional variant access.
210 : */
211 : template<std::size_t I, typename T>
212 37 : void set_winner_result(T value)
213 : noexcept(std::is_nothrow_move_constructible_v<T>)
214 : {
215 37 : result_.emplace(std::in_place_index<I>, std::move(value));
216 37 : }
217 :
218 : /** @pre core_.try_win() returned true. */
219 : template<std::size_t I>
220 8 : void set_winner_void() noexcept
221 : {
222 8 : result_.emplace(std::in_place_index<I>, std::monostate{});
223 8 : }
224 : };
225 :
226 : /** Wrapper coroutine that runs a single child task for when_any.
227 :
228 : Propagates executor/stop_token to the child, attempts to claim winner
229 : status on completion, and signals completion for cleanup coordination.
230 :
231 : @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
232 : */
233 : template<typename StateType>
234 : struct when_any_runner
235 : {
236 : struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
237 : {
238 : StateType* state_ = nullptr;
239 : std::size_t index_ = 0;
240 : io_env env_;
241 :
242 206 : when_any_runner get_return_object() noexcept
243 : {
244 206 : return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
245 : }
246 :
247 : // Starts suspended; launcher sets up state/ex/token then resumes
248 206 : std::suspend_always initial_suspend() noexcept
249 : {
250 206 : return {};
251 : }
252 :
253 206 : auto final_suspend() noexcept
254 : {
255 : struct awaiter
256 : {
257 : promise_type* p_;
258 206 : bool await_ready() const noexcept { return false; }
259 206 : auto await_suspend(std::coroutine_handle<> h) noexcept
260 : {
261 : // Extract everything needed before self-destruction.
262 206 : auto& core = p_->state_->core_;
263 206 : auto* counter = &core.remaining_count_;
264 206 : auto* caller_env = core.caller_env_;
265 206 : auto cont = core.continuation_;
266 :
267 206 : h.destroy();
268 :
269 : // If last runner, dispatch parent for symmetric transfer.
270 206 : auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
271 206 : if(remaining == 1)
272 73 : return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
273 133 : return detail::symmetric_transfer(std::noop_coroutine());
274 : }
275 MIS 0 : void await_resume() const noexcept {}
276 : };
277 HIT 206 : return awaiter{this};
278 : }
279 :
280 193 : void return_void() noexcept {}
281 :
282 : // Exceptions are valid completions in when_any (unlike when_all)
283 13 : void unhandled_exception()
284 : {
285 13 : if(state_->core_.try_win(index_))
286 9 : state_->core_.set_winner_exception(std::current_exception());
287 13 : }
288 :
289 : /** Injects executor and stop token into child awaitables. */
290 : template<class Awaitable>
291 : struct transform_awaiter
292 : {
293 : std::decay_t<Awaitable> a_;
294 : promise_type* p_;
295 :
296 206 : bool await_ready() { return a_.await_ready(); }
297 206 : auto await_resume() { return a_.await_resume(); }
298 :
299 : template<class Promise>
300 200 : auto await_suspend(std::coroutine_handle<Promise> h)
301 : {
302 : using R = decltype(a_.await_suspend(h, &p_->env_));
303 : if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
304 200 : return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
305 : else
306 : return a_.await_suspend(h, &p_->env_);
307 : }
308 : };
309 :
310 : template<class Awaitable>
311 206 : auto await_transform(Awaitable&& a)
312 : {
313 : using A = std::decay_t<Awaitable>;
314 : if constexpr (IoAwaitable<A>)
315 : {
316 : return transform_awaiter<Awaitable>{
317 412 : std::forward<Awaitable>(a), this};
318 : }
319 : else
320 : {
321 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
322 : }
323 206 : }
324 : };
325 :
326 : std::coroutine_handle<promise_type> h_;
327 :
328 206 : explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
329 206 : : h_(h)
330 : {
331 206 : }
332 :
333 : // Enable move for all clang versions - some versions need it
334 : when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
335 :
336 : // Non-copyable
337 : when_any_runner(when_any_runner const&) = delete;
338 : when_any_runner& operator=(when_any_runner const&) = delete;
339 : when_any_runner& operator=(when_any_runner&&) = delete;
340 :
341 206 : auto release() noexcept
342 : {
343 206 : return std::exchange(h_, nullptr);
344 : }
345 : };
346 :
347 : /** Indexed overload for heterogeneous when_any (compile-time index).
348 :
349 : Uses compile-time index I for variant construction via in_place_index.
350 : Called from when_any_launcher::launch_one<I>().
351 : */
352 : template<std::size_t I, IoAwaitable Awaitable, typename StateType>
353 : when_any_runner<StateType>
354 121 : make_when_any_runner(Awaitable inner, StateType* state)
355 : {
356 : using T = awaitable_result_t<Awaitable>;
357 : if constexpr (std::is_void_v<T>)
358 : {
359 : co_await std::move(inner);
360 : if(state->core_.try_win(I))
361 : state->template set_winner_void<I>();
362 : }
363 : else
364 : {
365 : auto result = co_await std::move(inner);
366 : if(state->core_.try_win(I))
367 : {
368 : try
369 : {
370 : state->template set_winner_result<I>(std::move(result));
371 : }
372 : catch(...)
373 : {
374 : state->core_.set_winner_exception(std::current_exception());
375 : }
376 : }
377 : }
378 242 : }
379 :
380 : /** Runtime-index overload for homogeneous when_any (range path).
381 :
382 : Uses requires-expressions to detect state capabilities:
383 : - set_winner_void(): for heterogeneous void tasks (stores monostate)
384 : - set_winner_result(): for non-void tasks
385 : - Neither: for homogeneous void tasks (no result storage)
386 : */
387 : template<IoAwaitable Awaitable, typename StateType>
388 : when_any_runner<StateType>
389 85 : make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
390 : {
391 : using T = awaitable_result_t<Awaitable>;
392 : if constexpr (std::is_void_v<T>)
393 : {
394 : co_await std::move(inner);
395 : if(state->core_.try_win(index))
396 : {
397 : if constexpr (requires { state->set_winner_void(); })
398 : state->set_winner_void();
399 : }
400 : }
401 : else
402 : {
403 : auto result = co_await std::move(inner);
404 : if(state->core_.try_win(index))
405 : {
406 : try
407 : {
408 : state->set_winner_result(std::move(result));
409 : }
410 : catch(...)
411 : {
412 : state->core_.set_winner_exception(std::current_exception());
413 : }
414 : }
415 : }
416 170 : }
417 :
418 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
419 : template<IoAwaitable... Awaitables>
420 : class when_any_launcher
421 : {
422 : using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
423 :
424 : std::tuple<Awaitables...>* tasks_;
425 : state_type* state_;
426 :
427 : public:
428 51 : when_any_launcher(
429 : std::tuple<Awaitables...>* tasks,
430 : state_type* state)
431 51 : : tasks_(tasks)
432 51 : , state_(state)
433 : {
434 51 : }
435 :
436 51 : bool await_ready() const noexcept
437 : {
438 51 : return sizeof...(Awaitables) == 0;
439 : }
440 :
441 : /** CRITICAL: If the last task finishes synchronously, parent resumes and
442 : destroys this object before await_suspend returns. Must not reference
443 : `this` after the final launch_one call.
444 : */
445 51 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
446 : {
447 51 : state_->core_.continuation_ = continuation;
448 51 : state_->core_.caller_env_ = caller_env;
449 :
450 51 : if(caller_env->stop_token.stop_possible())
451 : {
452 18 : state_->core_.parent_stop_callback_.emplace(
453 9 : caller_env->stop_token,
454 9 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
455 :
456 9 : if(caller_env->stop_token.stop_requested())
457 3 : state_->core_.stop_source_.request_stop();
458 : }
459 :
460 51 : auto token = state_->core_.stop_source_.get_token();
461 102 : [&]<std::size_t... Is>(std::index_sequence<Is...>) {
462 51 : (..., launch_one<Is>(caller_env->executor, token));
463 51 : }(std::index_sequence_for<Awaitables...>{});
464 :
465 102 : return std::noop_coroutine();
466 51 : }
467 :
468 51 : void await_resume() const noexcept
469 : {
470 51 : }
471 :
472 : private:
473 : /** @pre Ex::dispatch() and std::coroutine_handle<>::resume() must not throw (handle may leak). */
474 : template<std::size_t I>
475 121 : void launch_one(executor_ref caller_ex, std::stop_token token)
476 : {
477 121 : auto runner = make_when_any_runner<I>(
478 121 : std::move(std::get<I>(*tasks_)), state_);
479 :
480 121 : auto h = runner.release();
481 121 : h.promise().state_ = state_;
482 121 : h.promise().index_ = I;
483 121 : h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator};
484 :
485 121 : std::coroutine_handle<> ch{h};
486 121 : state_->runner_handles_[I] = ch;
487 121 : caller_ex.post(ch);
488 242 : }
489 : };
490 :
491 : } // namespace detail
492 :
493 : /** Wait for the first awaitable to complete.
494 :
495 : Races multiple heterogeneous awaitables concurrently and returns when the
496 : first one completes. The result is a variant with one alternative per
497 : input task, preserving positional correspondence.
498 :
499 : @par Suspends
500 : The calling coroutine suspends when co_await is invoked. All awaitables
501 : are launched concurrently and execute in parallel. The coroutine resumes
502 : only after all awaitables have completed, even though the winner is
503 : determined by the first to finish.
504 :
505 : @par Completion Conditions
506 : @li Winner is determined when the first awaitable completes (success or exception)
507 : @li Only one task can claim winner status via atomic compare-exchange
508 : @li Once a winner exists, stop is requested for all remaining siblings
509 : @li Parent coroutine resumes only after all siblings acknowledge completion
510 : @li The winner's result is returned; if the winner threw, the exception is rethrown
511 :
512 : @par Cancellation Semantics
513 : Cancellation is supported via stop_token propagated through the
514 : IoAwaitable protocol:
515 : @li Each child awaitable receives a stop_token derived from a shared stop_source
516 : @li When the parent's stop token is activated, the stop is forwarded to all children
517 : @li When a winner is determined, stop_source_.request_stop() is called immediately
518 : @li Siblings must handle cancellation gracefully and complete before parent resumes
519 : @li Stop requests are cooperative; tasks must check and respond to them
520 :
521 : @par Concurrency/Overlap
522 : All awaitables are launched concurrently before any can complete.
523 : The launcher iterates through the arguments, starting each task on the
524 : caller's executor. Tasks may execute in parallel on multi-threaded
525 : executors or interleave on single-threaded executors. There is no
526 : guaranteed ordering of task completion.
527 :
528 : @par Notable Error Conditions
529 : @li Winner exception: if the winning task threw, that exception is rethrown
530 : @li Non-winner exceptions: silently discarded (only winner's result matters)
531 : @li Cancellation: tasks may complete via cancellation without throwing
532 :
533 : @par Example
534 : @code
535 : task<void> example() {
536 : auto result = co_await when_any(
537 : fetch_int(), // task<int>
538 : fetch_string() // task<std::string>
539 : );
540 : // result.index() is 0 or 1
541 : if (result.index() == 0)
542 : std::cout << "Got int: " << std::get<0>(result) << "\n";
543 : else
544 : std::cout << "Got string: " << std::get<1>(result) << "\n";
545 : }
546 : @endcode
547 :
548 : @param as Awaitables to race concurrently (at least one required; each
549 : must satisfy IoAwaitable).
550 : @return A task yielding a std::variant with one alternative per awaitable.
551 : Use .index() to identify the winner. Void awaitables contribute
552 : std::monostate.
553 :
554 : @throws Rethrows the winner's exception if the winning task threw an exception.
555 :
556 : @par Remarks
557 : Awaitables are moved into the coroutine frame; original objects become
558 : empty after the call. The variant preserves one alternative per input
559 : task. Use .index() to determine which awaitable completed first.
560 : Void awaitables contribute std::monostate to the variant.
561 :
562 : @see when_all, IoAwaitable
563 : */
564 : template<IoAwaitable... As>
565 : requires (sizeof...(As) > 0)
566 51 : [[nodiscard]] auto when_any(As... as)
567 : -> task<std::variant<void_to_monostate_t<awaitable_result_t<As>>...>>
568 : {
569 : detail::when_any_state<awaitable_result_t<As>...> state;
570 : std::tuple<As...> awaitable_tuple(std::move(as)...);
571 :
572 : co_await detail::when_any_launcher<As...>(&awaitable_tuple, &state);
573 :
574 : if(state.core_.winner_exception_)
575 : std::rethrow_exception(state.core_.winner_exception_);
576 :
577 : co_return std::move(*state.result_);
578 102 : }
579 :
580 : namespace detail {
581 :
582 : /** Shared state for homogeneous when_any (range overload).
583 :
584 : Uses composition with when_any_core for shared functionality.
585 : Simpler than heterogeneous: optional<T> instead of variant, vector
586 : instead of array for runner handles.
587 : */
588 : template<typename T>
589 : struct when_any_homogeneous_state
590 : {
591 : when_any_core core_;
592 : std::optional<T> result_;
593 : std::vector<std::coroutine_handle<>> runner_handles_;
594 :
595 19 : explicit when_any_homogeneous_state(std::size_t count)
596 19 : : core_(count)
597 38 : , runner_handles_(count)
598 : {
599 19 : }
600 :
601 : // Runners self-destruct in final_suspend. No destruction needed here.
602 :
603 : /** @pre core_.try_win() returned true. */
604 17 : void set_winner_result(T value)
605 : noexcept(std::is_nothrow_move_constructible_v<T>)
606 : {
607 17 : result_.emplace(std::move(value));
608 17 : }
609 : };
610 :
611 : /** Specialization for void tasks (no result storage needed). */
612 : template<>
613 : struct when_any_homogeneous_state<void>
614 : {
615 : when_any_core core_;
616 : std::vector<std::coroutine_handle<>> runner_handles_;
617 :
618 3 : explicit when_any_homogeneous_state(std::size_t count)
619 3 : : core_(count)
620 6 : , runner_handles_(count)
621 : {
622 3 : }
623 :
624 : // Runners self-destruct in final_suspend. No destruction needed here.
625 :
626 : // No set_winner_result - void tasks have no result to store
627 : };
628 :
629 : /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
630 : template<IoAwaitableRange Range>
631 : class when_any_homogeneous_launcher
632 : {
633 : using Awaitable = std::ranges::range_value_t<Range>;
634 : using T = awaitable_result_t<Awaitable>;
635 :
636 : Range* range_;
637 : when_any_homogeneous_state<T>* state_;
638 :
639 : public:
640 22 : when_any_homogeneous_launcher(
641 : Range* range,
642 : when_any_homogeneous_state<T>* state)
643 22 : : range_(range)
644 22 : , state_(state)
645 : {
646 22 : }
647 :
648 22 : bool await_ready() const noexcept
649 : {
650 22 : return std::ranges::empty(*range_);
651 : }
652 :
653 : /** CRITICAL: If the last task finishes synchronously, parent resumes and
654 : destroys this object before await_suspend returns. Must not reference
655 : `this` after dispatching begins.
656 :
657 : Two-phase approach:
658 : 1. Create all runners (safe - no dispatch yet)
659 : 2. Dispatch all runners (any may complete synchronously)
660 : */
661 22 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
662 : {
663 22 : state_->core_.continuation_ = continuation;
664 22 : state_->core_.caller_env_ = caller_env;
665 :
666 22 : if(caller_env->stop_token.stop_possible())
667 : {
668 14 : state_->core_.parent_stop_callback_.emplace(
669 7 : caller_env->stop_token,
670 7 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
671 :
672 7 : if(caller_env->stop_token.stop_requested())
673 4 : state_->core_.stop_source_.request_stop();
674 : }
675 :
676 22 : auto token = state_->core_.stop_source_.get_token();
677 :
678 : // Phase 1: Create all runners without dispatching.
679 : // This iterates over *range_ safely because no runners execute yet.
680 22 : std::size_t index = 0;
681 107 : for(auto&& a : *range_)
682 : {
683 85 : auto runner = make_when_any_runner(
684 85 : std::move(a), state_, index);
685 :
686 85 : auto h = runner.release();
687 85 : h.promise().state_ = state_;
688 85 : h.promise().index_ = index;
689 85 : h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
690 :
691 85 : state_->runner_handles_[index] = std::coroutine_handle<>{h};
692 85 : ++index;
693 : }
694 :
695 : // Phase 2: Post all runners. Any may complete synchronously.
696 : // After last post, state_ and this may be destroyed.
697 : // Use raw pointer/count captured before posting.
698 22 : std::coroutine_handle<>* handles = state_->runner_handles_.data();
699 22 : std::size_t count = state_->runner_handles_.size();
700 107 : for(std::size_t i = 0; i < count; ++i)
701 85 : caller_env->executor.post(handles[i]);
702 :
703 44 : return std::noop_coroutine();
704 107 : }
705 :
706 22 : void await_resume() const noexcept
707 : {
708 22 : }
709 : };
710 :
711 : } // namespace detail
712 :
713 : /** Wait for the first awaitable to complete (range overload).
714 :
715 : Races a range of awaitables with the same result type. Accepts any
716 : sized input range of IoAwaitable types, enabling use with arrays,
717 : spans, or custom containers.
718 :
719 : @par Suspends
720 : The calling coroutine suspends when co_await is invoked. All awaitables
721 : in the range are launched concurrently and execute in parallel. The
722 : coroutine resumes only after all awaitables have completed, even though
723 : the winner is determined by the first to finish.
724 :
725 : @par Completion Conditions
726 : @li Winner is determined when the first awaitable completes (success or exception)
727 : @li Only one task can claim winner status via atomic compare-exchange
728 : @li Once a winner exists, stop is requested for all remaining siblings
729 : @li Parent coroutine resumes only after all siblings acknowledge completion
730 : @li The winner's index and result are returned; if the winner threw, the exception is rethrown
731 :
732 : @par Cancellation Semantics
733 : Cancellation is supported via stop_token propagated through the
734 : IoAwaitable protocol:
735 : @li Each child awaitable receives a stop_token derived from a shared stop_source
736 : @li When the parent's stop token is activated, the stop is forwarded to all children
737 : @li When a winner is determined, stop_source_.request_stop() is called immediately
738 : @li Siblings must handle cancellation gracefully and complete before parent resumes
739 : @li Stop requests are cooperative; tasks must check and respond to them
740 :
741 : @par Concurrency/Overlap
742 : All awaitables are launched concurrently before any can complete.
743 : The launcher iterates through the range, starting each task on the
744 : caller's executor. Tasks may execute in parallel on multi-threaded
745 : executors or interleave on single-threaded executors. There is no
746 : guaranteed ordering of task completion.
747 :
748 : @par Notable Error Conditions
749 : @li Empty range: throws std::invalid_argument immediately (not via co_return)
750 : @li Winner exception: if the winning task threw, that exception is rethrown
751 : @li Non-winner exceptions: silently discarded (only winner's result matters)
752 : @li Cancellation: tasks may complete via cancellation without throwing
753 :
754 : @par Example
755 : @code
756 : task<void> example() {
757 : std::array<task<Response>, 3> requests = {
758 : fetch_from_server(0),
759 : fetch_from_server(1),
760 : fetch_from_server(2)
761 : };
762 :
763 : auto [index, response] = co_await when_any(std::move(requests));
764 : }
765 : @endcode
766 :
767 : @par Example with Vector
768 : @code
769 : task<Response> fetch_fastest(std::vector<Server> const& servers) {
770 : std::vector<task<Response>> requests;
771 : for (auto const& server : servers)
772 : requests.push_back(fetch_from(server));
773 :
774 : auto [index, response] = co_await when_any(std::move(requests));
775 : co_return response;
776 : }
777 : @endcode
778 :
779 : @tparam R Range type satisfying IoAwaitableRange.
780 : @param awaitables Range of awaitables to race concurrently (must not be empty).
781 : @return A task yielding a pair of (winner_index, result).
782 :
783 : @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
784 : @throws Rethrows the winner's exception if the winning task threw an exception.
785 :
786 : @par Remarks
787 : Elements are moved from the range; for lvalue ranges, the original
788 : container will have moved-from elements after this call. The range
789 : is moved onto the coroutine frame to ensure lifetime safety. Unlike
790 : the variadic overload, no variant wrapper is needed since all tasks
791 : share the same return type.
792 :
793 : @see when_any, IoAwaitableRange
794 : */
795 : template<IoAwaitableRange R>
796 : requires (!std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>)
797 21 : [[nodiscard]] auto when_any(R&& awaitables)
798 : -> task<std::pair<std::size_t, awaitable_result_t<std::ranges::range_value_t<R>>>>
799 : {
800 : using Awaitable = std::ranges::range_value_t<R>;
801 : using T = awaitable_result_t<Awaitable>;
802 : using result_type = std::pair<std::size_t, T>;
803 : using OwnedRange = std::remove_cvref_t<R>;
804 :
805 : auto count = std::ranges::size(awaitables);
806 : if(count == 0)
807 : throw std::invalid_argument("when_any requires at least one awaitable");
808 :
809 : // Move/copy range onto coroutine frame to ensure lifetime
810 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
811 :
812 : detail::when_any_homogeneous_state<T> state(count);
813 :
814 : co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
815 :
816 : if(state.core_.winner_exception_)
817 : std::rethrow_exception(state.core_.winner_exception_);
818 :
819 : co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
820 42 : }
821 :
822 : /** Wait for the first awaitable to complete (void range overload).
823 :
824 : Races a range of void-returning awaitables. Since void awaitables have
825 : no result value, only the winner's index is returned.
826 :
827 : @par Suspends
828 : The calling coroutine suspends when co_await is invoked. All awaitables
829 : in the range are launched concurrently and execute in parallel. The
830 : coroutine resumes only after all awaitables have completed, even though
831 : the winner is determined by the first to finish.
832 :
833 : @par Completion Conditions
834 : @li Winner is determined when the first awaitable completes (success or exception)
835 : @li Only one task can claim winner status via atomic compare-exchange
836 : @li Once a winner exists, stop is requested for all remaining siblings
837 : @li Parent coroutine resumes only after all siblings acknowledge completion
838 : @li The winner's index is returned; if the winner threw, the exception is rethrown
839 :
840 : @par Cancellation Semantics
841 : Cancellation is supported via stop_token propagated through the
842 : IoAwaitable protocol:
843 : @li Each child awaitable receives a stop_token derived from a shared stop_source
844 : @li When the parent's stop token is activated, the stop is forwarded to all children
845 : @li When a winner is determined, stop_source_.request_stop() is called immediately
846 : @li Siblings must handle cancellation gracefully and complete before parent resumes
847 : @li Stop requests are cooperative; tasks must check and respond to them
848 :
849 : @par Concurrency/Overlap
850 : All awaitables are launched concurrently before any can complete.
851 : The launcher iterates through the range, starting each task on the
852 : caller's executor. Tasks may execute in parallel on multi-threaded
853 : executors or interleave on single-threaded executors. There is no
854 : guaranteed ordering of task completion.
855 :
856 : @par Notable Error Conditions
857 : @li Empty range: throws std::invalid_argument immediately (not via co_return)
858 : @li Winner exception: if the winning task threw, that exception is rethrown
859 : @li Non-winner exceptions: silently discarded (only winner's result matters)
860 : @li Cancellation: tasks may complete via cancellation without throwing
861 :
862 : @par Example
863 : @code
864 : task<void> example() {
865 : std::vector<task<void>> tasks;
866 : for (int i = 0; i < 5; ++i)
867 : tasks.push_back(background_work(i));
868 :
869 : std::size_t winner = co_await when_any(std::move(tasks));
870 : // winner is the index of the first task to complete
871 : }
872 : @endcode
873 :
874 : @par Example with Timeout
875 : @code
876 : task<void> with_timeout() {
877 : std::vector<task<void>> tasks;
878 : tasks.push_back(long_running_operation());
879 : tasks.push_back(delay(std::chrono::seconds(5)));
880 :
881 : std::size_t winner = co_await when_any(std::move(tasks));
882 : if (winner == 1) {
883 : // Timeout occurred
884 : }
885 : }
886 : @endcode
887 :
888 : @tparam R Range type satisfying IoAwaitableRange with void result.
889 : @param awaitables Range of void awaitables to race concurrently (must not be empty).
890 : @return A task yielding the winner's index (zero-based).
891 :
892 : @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
893 : @throws Rethrows the winner's exception if the winning task threw an exception.
894 :
895 : @par Remarks
896 : Elements are moved from the range; for lvalue ranges, the original
897 : container will have moved-from elements after this call. The range
898 : is moved onto the coroutine frame to ensure lifetime safety. Unlike
899 : the non-void overload, no result storage is needed since void tasks
900 : produce no value.
901 :
902 : @see when_any, IoAwaitableRange
903 : */
904 : template<IoAwaitableRange R>
905 : requires std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>
906 3 : [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
907 : {
908 : using OwnedRange = std::remove_cvref_t<R>;
909 :
910 : auto count = std::ranges::size(awaitables);
911 : if(count == 0)
912 : throw std::invalid_argument("when_any requires at least one awaitable");
913 :
914 : // Move/copy range onto coroutine frame to ensure lifetime
915 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
916 :
917 : detail::when_any_homogeneous_state<void> state(count);
918 :
919 : co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
920 :
921 : if(state.core_.winner_exception_)
922 : std::rethrow_exception(state.core_.winner_exception_);
923 :
924 : co_return state.core_.winner_index_;
925 6 : }
926 :
927 : } // namespace capy
928 : } // namespace boost
929 :
930 : #endif
|