TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : // Copyright (c) 2026 Steve Gerbino
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/capy
9 : //
10 :
11 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
12 : #define BOOST_CAPY_WHEN_ANY_HPP
13 :
14 : #include <boost/capy/detail/config.hpp>
15 : #include <boost/capy/detail/io_result_combinators.hpp>
16 : #include <boost/capy/continuation.hpp>
17 : #include <boost/capy/concept/executor.hpp>
18 : #include <boost/capy/concept/io_awaitable.hpp>
19 : #include <coroutine>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/ex/frame_alloc_mixin.hpp>
22 : #include <boost/capy/ex/frame_allocator.hpp>
23 : #include <boost/capy/ex/io_env.hpp>
24 : #include <boost/capy/task.hpp>
25 :
26 : #include <array>
27 : #include <atomic>
28 : #include <exception>
29 : #include <memory>
30 : #include <mutex>
31 : #include <optional>
32 : #include <ranges>
33 : #include <stdexcept>
34 : #include <stop_token>
35 : #include <tuple>
36 : #include <type_traits>
37 : #include <utility>
38 : #include <variant>
39 : #include <vector>
40 :
41 : /*
42 : when_any - Race multiple io_result tasks, select first success
43 : =============================================================
44 :
45 : OVERVIEW:
46 : ---------
47 : when_any launches N io_result-returning tasks concurrently. A task
48 : wins by returning !ec; errors and exceptions do not win. Once a
49 : winner is found, stop is requested for siblings and the winner's
50 : payload is returned. If no winner exists (all fail), the last
51 : recorded failure is reported: error_code returned or exception rethrown.
52 :
53 : ARCHITECTURE:
54 : -------------
55 : The design mirrors when_all but with inverted completion semantics:
56 :
57 : when_all: complete when remaining_count reaches 0 (all done)
58 : when_any: winner chosen when has_winner becomes true (first success)
59 : BUT still wait for remaining_count to reach 0 for cleanup
60 :
61 : Key components:
62 : - when_any_core: Shared state tracking winner and completion
63 : - when_any_io_runner: Wrapper coroutine for each child task
64 : - when_any_io_launcher/when_any_io_homogeneous_launcher:
65 : Awaitables that start all runners concurrently
66 :
67 : CRITICAL INVARIANTS:
68 : --------------------
69 : 1. Only a task returning !ec can become the winner (via atomic CAS)
70 : 2. All tasks must complete before parent resumes (cleanup safety)
71 : 3. Stop is requested immediately when winner is determined
72 : 4. Exceptions and errors do not claim winner status
73 :
74 : POSITIONAL VARIANT:
75 : -------------------
76 : The variadic overload returns std::variant<error_code, R1, R2, ..., Rn>.
77 : Index 0 is error_code (failure/no-winner). Index 1..N identifies the
78 : winning child and carries its payload.
79 :
80 : RANGE OVERLOAD:
81 : ---------------
82 : The range overload returns variant<error_code, pair<size_t, T>> for
83 : non-void children or variant<error_code, size_t> for void children.
84 :
85 : MEMORY MODEL:
86 : -------------
87 : Synchronization chain from winner's write to parent's read:
88 :
89 : 1. Winner thread writes result_ (non-atomic)
90 : 2. Winner thread calls signal_completion() -> fetch_sub(acq_rel) on remaining_count_
91 : 3. Last task thread (may be winner or non-winner) calls signal_completion()
92 : -> fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
93 : 4. Last task returns caller_env_->executor.dispatch(continuation_) via symmetric transfer
94 : 5. Parent coroutine resumes and reads result_
95 :
96 : Synchronization analysis:
97 : - All fetch_sub operations on remaining_count_ form a release sequence
98 : - Winner's fetch_sub releases; subsequent fetch_sub operations participate
99 : in the modification order of remaining_count_
100 : - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
101 : modification order, establishing happens-before from winner's writes
102 : - Executor dispatch() is expected to provide queue-based synchronization
103 : (release-on-post, acquire-on-execute) completing the chain to parent
104 : - Even inline executors work (same thread = sequenced-before)
105 :
106 : EXCEPTION SEMANTICS:
107 : --------------------
108 : Exceptions do NOT claim winner status. If a child throws, the exception
109 : is recorded but the combinator keeps waiting for a success. Only when
110 : all children complete without a winner is the last recorded failure
111 : reported (last writer wins): rethrown if an exception, else returned.
112 : */
113 :
114 : namespace boost {
115 : namespace capy {
116 :
117 : namespace detail {
118 :
119 : /** Core shared state for when_any operations.
120 :
121 : Contains all members and methods common to both heterogeneous (variadic)
122 : and homogeneous (range) when_any implementations. State classes embed
123 : this via composition to avoid CRTP destructor ordering issues.
124 :
125 : @par Thread Safety
126 : Atomic operations protect winner selection and completion count.
127 : */
128 : struct when_any_core
129 : {
130 : std::atomic<std::size_t> remaining_count_;
131 : std::size_t winner_index_{0};
132 : std::exception_ptr winner_exception_;
133 : std::stop_source stop_source_;
134 :
135 : // Bridges parent's stop token to our stop_source
136 : struct stop_callback_fn
137 : {
138 : std::stop_source* source_;
139 HIT 3 : void operator()() const noexcept { source_->request_stop(); }
140 : };
141 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
142 : std::optional<stop_callback_t> parent_stop_callback_;
143 :
144 : continuation continuation_;
145 : io_env const* caller_env_ = nullptr;
146 :
147 : // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
148 : std::atomic<bool> has_winner_{false};
149 :
150 34 : explicit when_any_core(std::size_t count) noexcept
151 34 : : remaining_count_(count)
152 : {
153 34 : }
154 :
155 : /** Atomically claim winner status; exactly one task succeeds. */
156 53 : bool try_win(std::size_t index) noexcept
157 : {
158 53 : bool expected = false;
159 53 : if(has_winner_.compare_exchange_strong(
160 : expected, true, std::memory_order_acq_rel))
161 : {
162 23 : winner_index_ = index;
163 23 : stop_source_.request_stop();
164 23 : return true;
165 : }
166 30 : return false;
167 : }
168 :
169 : /** @pre try_win() returned true. */
170 1 : void set_winner_exception(std::exception_ptr ep) noexcept
171 : {
172 1 : winner_exception_ = ep;
173 1 : }
174 :
175 : // Runners signal completion directly via final_suspend; no member function needed.
176 : };
177 :
178 : } // namespace detail
179 :
180 : namespace detail {
181 :
182 : // State for io_result-aware when_any: only !ec wins.
183 : template<typename... Ts>
184 : struct when_any_io_state
185 : {
186 : static constexpr std::size_t task_count = sizeof...(Ts);
187 : using variant_type = std::variant<std::error_code, Ts...>;
188 :
189 : when_any_core core_;
190 : std::optional<variant_type> result_;
191 : std::array<continuation, task_count> runner_handles_{};
192 :
193 : // Last failure (error or exception) for the all-fail case.
194 : // Last writer wins — no priority between errors and exceptions.
195 : std::mutex failure_mu_;
196 : std::error_code last_error_;
197 : std::exception_ptr last_exception_;
198 :
199 18 : when_any_io_state()
200 18 : : core_(task_count)
201 : {
202 18 : }
203 :
204 14 : void record_error(std::error_code ec)
205 : {
206 14 : std::lock_guard lk(failure_mu_);
207 14 : last_error_ = ec;
208 14 : last_exception_ = nullptr;
209 14 : }
210 :
211 7 : void record_exception(std::exception_ptr ep)
212 : {
213 7 : std::lock_guard lk(failure_mu_);
214 7 : last_exception_ = ep;
215 7 : last_error_ = {};
216 7 : }
217 : };
218 :
219 : // Wrapper coroutine for io_result-aware when_any children.
220 : // unhandled_exception records the exception but does NOT claim winner status.
221 : template<typename StateType>
222 : struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_any_io_runner
223 : {
224 : struct promise_type
225 : : frame_alloc_mixin
226 : {
227 : StateType* state_ = nullptr;
228 : std::size_t index_ = 0;
229 : io_env env_;
230 :
231 87 : when_any_io_runner get_return_object() noexcept
232 : {
233 : return when_any_io_runner(
234 87 : std::coroutine_handle<promise_type>::from_promise(*this));
235 : }
236 :
237 87 : std::suspend_always initial_suspend() noexcept { return {}; }
238 :
239 87 : auto final_suspend() noexcept
240 : {
241 : struct awaiter
242 : {
243 : promise_type* p_;
244 87 : bool await_ready() const noexcept { return false; }
245 87 : auto await_suspend(std::coroutine_handle<> h) noexcept
246 : {
247 87 : auto& core = p_->state_->core_;
248 87 : auto* counter = &core.remaining_count_;
249 87 : auto* caller_env = core.caller_env_;
250 87 : auto& cont = core.continuation_;
251 :
252 87 : h.destroy();
253 :
254 87 : auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
255 87 : if(remaining == 1)
256 34 : return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
257 53 : return detail::symmetric_transfer(std::noop_coroutine());
258 : }
259 : void await_resume() const noexcept {} // LCOV_EXCL_LINE final_suspend awaiter, never resumed
260 : };
261 87 : return awaiter{this};
262 : }
263 :
264 74 : void return_void() noexcept {}
265 :
266 : // Exceptions do NOT win in io_result when_any
267 13 : void unhandled_exception() noexcept
268 : {
269 13 : state_->record_exception(std::current_exception());
270 13 : }
271 :
272 : template<class Awaitable>
273 : struct transform_awaiter
274 : {
275 : std::decay_t<Awaitable> a_;
276 : promise_type* p_;
277 :
278 87 : bool await_ready() { return a_.await_ready(); }
279 87 : decltype(auto) await_resume() { return a_.await_resume(); }
280 :
281 : template<class Promise>
282 86 : auto await_suspend(std::coroutine_handle<Promise> h)
283 : {
284 : using R = decltype(a_.await_suspend(h, &p_->env_));
285 : if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
286 86 : return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
287 : else
288 : return a_.await_suspend(h, &p_->env_);
289 : }
290 : };
291 :
292 : template<class Awaitable>
293 87 : auto await_transform(Awaitable&& a)
294 : {
295 : using A = std::decay_t<Awaitable>;
296 : if constexpr (IoAwaitable<A>)
297 : {
298 : return transform_awaiter<Awaitable>{
299 172 : std::forward<Awaitable>(a), this};
300 : }
301 : else
302 : {
303 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
304 : }
305 85 : }
306 : };
307 :
308 : std::coroutine_handle<promise_type> h_;
309 :
310 87 : explicit when_any_io_runner(std::coroutine_handle<promise_type> h) noexcept
311 87 : : h_(h)
312 : {
313 87 : }
314 :
315 : when_any_io_runner(when_any_io_runner&& other) noexcept
316 : : h_(std::exchange(other.h_, nullptr))
317 : {
318 : }
319 :
320 : when_any_io_runner(when_any_io_runner const&) = delete;
321 : when_any_io_runner& operator=(when_any_io_runner const&) = delete;
322 : when_any_io_runner& operator=(when_any_io_runner&&) = delete;
323 :
324 87 : auto release() noexcept
325 : {
326 87 : return std::exchange(h_, nullptr);
327 : }
328 : };
329 :
330 : // Runner coroutine: only tries to win when the child returns !ec.
331 : template<std::size_t I, IoAwaitable Awaitable, typename StateType>
332 : when_any_io_runner<StateType>
333 33 : make_when_any_io_runner(Awaitable inner, StateType* state)
334 : {
335 : auto result = co_await std::move(inner);
336 :
337 : if(!result.ec)
338 : {
339 : // Success: try to claim winner
340 : if(state->core_.try_win(I))
341 : {
342 : try
343 : {
344 : state->result_.emplace(
345 : std::in_place_index<I + 1>,
346 : detail::extract_io_payload(std::move(result)));
347 : }
348 : catch(...)
349 : {
350 : state->core_.set_winner_exception(std::current_exception());
351 : }
352 : }
353 : }
354 : else
355 : {
356 : // Error: record but don't win
357 : state->record_error(result.ec);
358 : }
359 66 : }
360 :
361 : // Launcher for io_result-aware when_any.
362 : template<IoAwaitable... Awaitables>
363 : class when_any_io_launcher
364 : {
365 : using state_type = when_any_io_state<
366 : io_result_payload_t<awaitable_result_t<Awaitables>>...>;
367 :
368 : std::tuple<Awaitables...>* tasks_;
369 : state_type* state_;
370 :
371 : public:
372 18 : when_any_io_launcher(
373 : std::tuple<Awaitables...>* tasks,
374 : state_type* state)
375 18 : : tasks_(tasks)
376 18 : , state_(state)
377 : {
378 18 : }
379 :
380 18 : bool await_ready() const noexcept
381 : {
382 18 : return sizeof...(Awaitables) == 0;
383 : }
384 :
385 18 : std::coroutine_handle<> await_suspend(
386 : std::coroutine_handle<> continuation, io_env const* caller_env)
387 : {
388 18 : state_->core_.continuation_.h = continuation;
389 18 : state_->core_.caller_env_ = caller_env;
390 :
391 18 : if(caller_env->stop_token.stop_possible())
392 : {
393 4 : state_->core_.parent_stop_callback_.emplace(
394 2 : caller_env->stop_token,
395 2 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
396 :
397 2 : if(caller_env->stop_token.stop_requested())
398 1 : state_->core_.stop_source_.request_stop();
399 : }
400 :
401 18 : auto token = state_->core_.stop_source_.get_token();
402 18 : launch_all(std::index_sequence_for<Awaitables...>{},
403 : caller_env->executor, token);
404 :
405 36 : return std::noop_coroutine();
406 18 : }
407 :
408 18 : void await_resume() const noexcept {}
409 :
410 : private:
411 : template<std::size_t... Is>
412 18 : void launch_all(std::index_sequence<Is...>,
413 : executor_ref ex, std::stop_token token)
414 : {
415 18 : (..., launch_one<Is>(ex, token));
416 18 : }
417 :
418 : template<std::size_t I>
419 33 : void launch_one(executor_ref caller_ex, std::stop_token token)
420 : {
421 33 : auto runner = make_when_any_io_runner<I>(
422 33 : std::move(std::get<I>(*tasks_)), state_);
423 :
424 33 : auto h = runner.release();
425 33 : h.promise().state_ = state_;
426 33 : h.promise().index_ = I;
427 33 : h.promise().env_ = io_env{caller_ex, token,
428 33 : state_->core_.caller_env_->frame_allocator};
429 :
430 33 : state_->runner_handles_[I].h = std::coroutine_handle<>{h};
431 33 : caller_ex.post(state_->runner_handles_[I]);
432 66 : }
433 : };
434 :
435 : /** Shared state for homogeneous io_result-aware when_any (range overload).
436 :
437 : @tparam T The payload type extracted from io_result.
438 : */
439 : template<typename T>
440 : struct when_any_io_homogeneous_state
441 : {
442 : when_any_core core_;
443 : std::optional<T> result_;
444 : std::unique_ptr<continuation[]> runner_handles_;
445 :
446 : std::mutex failure_mu_;
447 : std::error_code last_error_;
448 : std::exception_ptr last_exception_;
449 :
450 13 : explicit when_any_io_homogeneous_state(std::size_t count)
451 13 : : core_(count)
452 13 : , runner_handles_(std::make_unique<continuation[]>(count))
453 : {
454 13 : }
455 :
456 6 : void record_error(std::error_code ec)
457 : {
458 6 : std::lock_guard lk(failure_mu_);
459 6 : last_error_ = ec;
460 6 : last_exception_ = nullptr;
461 6 : }
462 :
463 4 : void record_exception(std::exception_ptr ep)
464 : {
465 4 : std::lock_guard lk(failure_mu_);
466 4 : last_exception_ = ep;
467 4 : last_error_ = {};
468 4 : }
469 : };
470 :
471 : /** Specialization for void io_result children (no payload storage). */
472 : template<>
473 : struct when_any_io_homogeneous_state<std::tuple<>>
474 : {
475 : when_any_core core_;
476 : std::unique_ptr<continuation[]> runner_handles_;
477 :
478 : std::mutex failure_mu_;
479 : std::error_code last_error_;
480 : std::exception_ptr last_exception_;
481 :
482 3 : explicit when_any_io_homogeneous_state(std::size_t count)
483 3 : : core_(count)
484 3 : , runner_handles_(std::make_unique<continuation[]>(count))
485 : {
486 3 : }
487 :
488 1 : void record_error(std::error_code ec)
489 : {
490 1 : std::lock_guard lk(failure_mu_);
491 1 : last_error_ = ec;
492 1 : last_exception_ = nullptr;
493 1 : }
494 :
495 2 : void record_exception(std::exception_ptr ep)
496 : {
497 2 : std::lock_guard lk(failure_mu_);
498 2 : last_exception_ = ep;
499 2 : last_error_ = {};
500 2 : }
501 : };
502 :
503 : /** Create an io_result-aware runner for homogeneous when_any (range path).
504 :
505 : Only tries to win when the child returns !ec.
506 : */
507 : template<IoAwaitable Awaitable, typename StateType>
508 : when_any_io_runner<StateType>
509 54 : make_when_any_io_homogeneous_runner(
510 : Awaitable inner, StateType* state, std::size_t index)
511 : {
512 : auto result = co_await std::move(inner);
513 :
514 : if(!result.ec)
515 : {
516 : if(state->core_.try_win(index))
517 : {
518 : using PayloadT = io_result_payload_t<
519 : awaitable_result_t<Awaitable>>;
520 : if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
521 : {
522 : try
523 : {
524 : state->result_.emplace(
525 : extract_io_payload(std::move(result)));
526 : }
527 : catch(...)
528 : {
529 : state->core_.set_winner_exception(
530 : std::current_exception());
531 : }
532 : }
533 : }
534 : }
535 : else
536 : {
537 : state->record_error(result.ec);
538 : }
539 108 : }
540 :
541 : /** Launches all io_result-aware homogeneous runners concurrently. */
542 : template<IoAwaitableRange Range>
543 : class when_any_io_homogeneous_launcher
544 : {
545 : using Awaitable = std::ranges::range_value_t<Range>;
546 : using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
547 :
548 : Range* range_;
549 : when_any_io_homogeneous_state<PayloadT>* state_;
550 :
551 : public:
552 16 : when_any_io_homogeneous_launcher(
553 : Range* range,
554 : when_any_io_homogeneous_state<PayloadT>* state)
555 16 : : range_(range)
556 16 : , state_(state)
557 : {
558 16 : }
559 :
560 16 : bool await_ready() const noexcept
561 : {
562 16 : return std::ranges::empty(*range_);
563 : }
564 :
565 16 : std::coroutine_handle<> await_suspend(
566 : std::coroutine_handle<> continuation, io_env const* caller_env)
567 : {
568 16 : state_->core_.continuation_.h = continuation;
569 16 : state_->core_.caller_env_ = caller_env;
570 :
571 16 : if(caller_env->stop_token.stop_possible())
572 : {
573 4 : state_->core_.parent_stop_callback_.emplace(
574 2 : caller_env->stop_token,
575 2 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
576 :
577 2 : if(caller_env->stop_token.stop_requested())
578 1 : state_->core_.stop_source_.request_stop();
579 : }
580 :
581 16 : auto token = state_->core_.stop_source_.get_token();
582 :
583 : // Phase 1: Create all runners without dispatching.
584 16 : std::size_t index = 0;
585 70 : for(auto&& a : *range_)
586 : {
587 54 : auto runner = make_when_any_io_homogeneous_runner(
588 54 : std::move(a), state_, index);
589 :
590 54 : auto h = runner.release();
591 54 : h.promise().state_ = state_;
592 54 : h.promise().index_ = index;
593 54 : h.promise().env_ = io_env{caller_env->executor, token,
594 54 : caller_env->frame_allocator};
595 :
596 54 : state_->runner_handles_[index].h = std::coroutine_handle<>{h};
597 54 : ++index;
598 : }
599 :
600 : // Phase 2: Post all runners. Any may complete synchronously.
601 16 : auto* handles = state_->runner_handles_.get();
602 16 : std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed);
603 70 : for(std::size_t i = 0; i < count; ++i)
604 54 : caller_env->executor.post(handles[i]);
605 :
606 32 : return std::noop_coroutine();
607 70 : }
608 :
609 16 : void await_resume() const noexcept {}
610 : };
611 :
612 : } // namespace detail
613 :
614 : /** Race a range of io_result-returning awaitables (non-void payloads).
615 :
616 : Only a child returning !ec can win. Errors and exceptions do not
617 : claim winner status. If all children fail, the last failure
618 : is reported — either the last error_code at variant index 0,
619 : or the last exception rethrown.
620 :
621 : @param awaitables Range of io_result-returning awaitables (must
622 : not be empty).
623 :
624 : @return A task yielding variant<error_code, pair<size_t, PayloadT>>
625 : where index 0 is failure and index 1 carries the winner's
626 : index and payload.
627 :
628 : @throws std::invalid_argument if range is empty.
629 : @throws Rethrows last exception when no winner and the last
630 : failure was an exception.
631 :
632 : @par Example
633 : @code
634 : task<void> example()
635 : {
636 : std::vector<io_task<size_t>> reads;
637 : for (auto& buf : buffers)
638 : reads.push_back(stream.read_some(buf));
639 :
640 : auto result = co_await when_any(std::move(reads));
641 : if (result.index() == 1)
642 : {
643 : auto [idx, n] = std::get<1>(result);
644 : }
645 : }
646 : @endcode
647 :
648 : @see IoAwaitableRange, when_any
649 : */
650 : template<IoAwaitableRange R>
651 : requires detail::is_io_result_v<
652 : awaitable_result_t<std::ranges::range_value_t<R>>>
653 : && (!std::is_same_v<
654 : detail::io_result_payload_t<
655 : awaitable_result_t<std::ranges::range_value_t<R>>>,
656 : std::tuple<>>)
657 14 : [[nodiscard]] auto when_any(R&& awaitables)
658 : -> task<std::variant<std::error_code,
659 : std::pair<std::size_t,
660 : detail::io_result_payload_t<
661 : awaitable_result_t<std::ranges::range_value_t<R>>>>>>
662 : {
663 : using Awaitable = std::ranges::range_value_t<R>;
664 : using PayloadT = detail::io_result_payload_t<
665 : awaitable_result_t<Awaitable>>;
666 : using result_type = std::variant<std::error_code,
667 : std::pair<std::size_t, PayloadT>>;
668 : using OwnedRange = std::remove_cvref_t<R>;
669 :
670 : auto count = std::ranges::size(awaitables);
671 : if(count == 0)
672 : throw std::invalid_argument("when_any requires at least one awaitable");
673 :
674 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
675 :
676 : detail::when_any_io_homogeneous_state<PayloadT> state(count);
677 :
678 : co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
679 : &owned_awaitables, &state);
680 :
681 : // Winner found
682 : if(state.core_.has_winner_.load(std::memory_order_acquire))
683 : {
684 : if(state.core_.winner_exception_)
685 : std::rethrow_exception(state.core_.winner_exception_);
686 : co_return result_type{std::in_place_index<1>,
687 : std::pair{state.core_.winner_index_, std::move(*state.result_)}};
688 : }
689 :
690 : // No winner — report last failure
691 : if(state.last_exception_)
692 : std::rethrow_exception(state.last_exception_);
693 : co_return result_type{std::in_place_index<0>, state.last_error_};
694 28 : }
695 :
696 : /** Race a range of void io_result-returning awaitables.
697 :
698 : Only a child returning !ec can win. Returns the winner's index
699 : at variant index 1, or error_code at index 0 on all-fail.
700 :
701 : @param awaitables Range of io_result<>-returning awaitables (must
702 : not be empty).
703 :
704 : @return A task yielding variant<error_code, size_t> where index 0
705 : is failure and index 1 carries the winner's index.
706 :
707 : @throws std::invalid_argument if range is empty.
708 : @throws Rethrows last exception when no winner and the last
709 : failure was an exception.
710 :
711 : @par Example
712 : @code
713 : task<void> example()
714 : {
715 : std::vector<io_task<>> jobs;
716 : jobs.push_back(background_work_a());
717 : jobs.push_back(background_work_b());
718 :
719 : auto result = co_await when_any(std::move(jobs));
720 : if (result.index() == 1)
721 : {
722 : auto winner = std::get<1>(result);
723 : }
724 : }
725 : @endcode
726 :
727 : @see IoAwaitableRange, when_any
728 : */
729 : template<IoAwaitableRange R>
730 : requires detail::is_io_result_v<
731 : awaitable_result_t<std::ranges::range_value_t<R>>>
732 : && std::is_same_v<
733 : detail::io_result_payload_t<
734 : awaitable_result_t<std::ranges::range_value_t<R>>>,
735 : std::tuple<>>
736 3 : [[nodiscard]] auto when_any(R&& awaitables)
737 : -> task<std::variant<std::error_code, std::size_t>>
738 : {
739 : using OwnedRange = std::remove_cvref_t<R>;
740 : using result_type = std::variant<std::error_code, std::size_t>;
741 :
742 : auto count = std::ranges::size(awaitables);
743 : if(count == 0)
744 : throw std::invalid_argument("when_any requires at least one awaitable");
745 :
746 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
747 :
748 : detail::when_any_io_homogeneous_state<std::tuple<>> state(count);
749 :
750 : co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
751 : &owned_awaitables, &state);
752 :
753 : // Winner found
754 : if(state.core_.has_winner_.load(std::memory_order_acquire))
755 : {
756 : if(state.core_.winner_exception_)
757 : std::rethrow_exception(state.core_.winner_exception_);
758 : co_return result_type{std::in_place_index<1>,
759 : state.core_.winner_index_};
760 : }
761 :
762 : // No winner — report last failure
763 : if(state.last_exception_)
764 : std::rethrow_exception(state.last_exception_);
765 : co_return result_type{std::in_place_index<0>, state.last_error_};
766 6 : }
767 :
768 : /** Race io_result-returning awaitables, selecting the first success.
769 :
770 : Overload selected when all children return io_result<Ts...>.
771 : Only a child returning !ec can win. Errors and exceptions do
772 : not claim winner status.
773 :
774 : @return A task yielding variant<error_code, R1, ..., Rn> where
775 : index 0 is the failure/no-winner case and index i+1
776 : identifies the winning child.
777 : */
778 : template<IoAwaitable... As>
779 : requires (sizeof...(As) > 0)
780 : && detail::all_io_result_awaitables<As...>
781 18 : [[nodiscard]] auto when_any(As... as)
782 : -> task<std::variant<
783 : std::error_code,
784 : detail::io_result_payload_t<awaitable_result_t<As>>...>>
785 : {
786 : using result_type = std::variant<
787 : std::error_code,
788 : detail::io_result_payload_t<awaitable_result_t<As>>...>;
789 :
790 : detail::when_any_io_state<
791 : detail::io_result_payload_t<awaitable_result_t<As>>...> state;
792 : std::tuple<As...> awaitable_tuple(std::move(as)...);
793 :
794 : co_await detail::when_any_io_launcher<As...>(
795 : &awaitable_tuple, &state);
796 :
797 : // Winner found: return their result
798 : if(state.result_.has_value())
799 : co_return std::move(*state.result_);
800 :
801 : // Winner claimed but payload construction failed
802 : if(state.core_.winner_exception_)
803 : std::rethrow_exception(state.core_.winner_exception_);
804 :
805 : // No winner — report last failure
806 : if(state.last_exception_)
807 : std::rethrow_exception(state.last_exception_);
808 : co_return result_type{std::in_place_index<0>, state.last_error_};
809 36 : }
810 :
811 : } // namespace capy
812 : } // namespace boost
813 :
814 : #endif
|