LCOV - code coverage report
Current view: top level - capy - when_any.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 100.0 % 165 165
Test Date: 2026-06-12 18:04:16 Functions: 95.0 % 179 170 9

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Michael Vandeberg
       3                 : // Copyright (c) 2026 Steve Gerbino
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/cppalliance/capy
       9                 : //
      10                 : 
      11                 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
      12                 : #define BOOST_CAPY_WHEN_ANY_HPP
      13                 : 
      14                 : #include <boost/capy/detail/config.hpp>
      15                 : #include <boost/capy/detail/io_result_combinators.hpp>
      16                 : #include <boost/capy/continuation.hpp>
      17                 : #include <boost/capy/concept/executor.hpp>
      18                 : #include <boost/capy/concept/io_awaitable.hpp>
      19                 : #include <coroutine>
      20                 : #include <boost/capy/ex/executor_ref.hpp>
      21                 : #include <boost/capy/ex/frame_alloc_mixin.hpp>
      22                 : #include <boost/capy/ex/frame_allocator.hpp>
      23                 : #include <boost/capy/ex/io_env.hpp>
      24                 : #include <boost/capy/task.hpp>
      25                 : 
      26                 : #include <array>
      27                 : #include <atomic>
      28                 : #include <exception>
      29                 : #include <memory>
      30                 : #include <mutex>
      31                 : #include <optional>
      32                 : #include <ranges>
      33                 : #include <stdexcept>
      34                 : #include <stop_token>
      35                 : #include <tuple>
      36                 : #include <type_traits>
      37                 : #include <utility>
      38                 : #include <variant>
      39                 : #include <vector>
      40                 : 
      41                 : /*
      42                 :    when_any - Race multiple io_result tasks, select first success
      43                 :    =============================================================
      44                 : 
      45                 :    OVERVIEW:
      46                 :    ---------
      47                 :    when_any launches N io_result-returning tasks concurrently. A task
      48                 :    wins by returning !ec; errors and exceptions do not win. Once a
      49                 :    winner is found, stop is requested for siblings and the winner's
      50                 :    payload is returned. If no winner exists (all fail), the first
      51                 :    error_code is returned or the last exception is rethrown.
      52                 : 
      53                 :    ARCHITECTURE:
      54                 :    -------------
      55                 :    The design mirrors when_all but with inverted completion semantics:
      56                 : 
      57                 :      when_all:  complete when remaining_count reaches 0 (all done)
      58                 :      when_any:  complete when has_winner becomes true (first done)
      59                 :                 BUT still wait for remaining_count to reach 0 for cleanup
      60                 : 
      61                 :    Key components:
      62                 :      - when_any_core:    Shared state tracking winner and completion
      63                 :      - when_any_io_runner: Wrapper coroutine for each child task
      64                 :      - when_any_io_launcher/when_any_io_homogeneous_launcher:
      65                 :                           Awaitables that start all runners concurrently
      66                 : 
      67                 :    CRITICAL INVARIANTS:
      68                 :    --------------------
      69                 :    1. Only a task returning !ec can become the winner (via atomic CAS)
      70                 :    2. All tasks must complete before parent resumes (cleanup safety)
      71                 :    3. Stop is requested immediately when winner is determined
      72                 :    4. Exceptions and errors do not claim winner status
      73                 : 
      74                 :    POSITIONAL VARIANT:
      75                 :    -------------------
      76                 :    The variadic overload returns std::variant<error_code, R1, R2, ..., Rn>.
      77                 :    Index 0 is error_code (failure/no-winner). Index 1..N identifies the
      78                 :    winning child and carries its payload.
      79                 : 
      80                 :    RANGE OVERLOAD:
      81                 :    ---------------
      82                 :    The range overload returns variant<error_code, pair<size_t, T>> for
      83                 :    non-void children or variant<error_code, size_t> for void children.
      84                 : 
      85                 :    MEMORY MODEL:
      86                 :    -------------
      87                 :    Synchronization chain from winner's write to parent's read:
      88                 : 
      89                 :    1. Winner thread writes result_ (non-atomic)
      90                 :    2. Winner thread calls signal_completion() -> fetch_sub(acq_rel) on remaining_count_
      91                 :    3. Last task thread (may be winner or non-winner) calls signal_completion()
      92                 :       -> fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
      93                 :    4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
      94                 :    5. Parent coroutine resumes and reads result_
      95                 : 
      96                 :    Synchronization analysis:
      97                 :    - All fetch_sub operations on remaining_count_ form a release sequence
      98                 :    - Winner's fetch_sub releases; subsequent fetch_sub operations participate
      99                 :      in the modification order of remaining_count_
     100                 :    - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
     101                 :      modification order, establishing happens-before from winner's writes
     102                 :    - Executor dispatch() is expected to provide queue-based synchronization
     103                 :      (release-on-post, acquire-on-execute) completing the chain to parent
     104                 :    - Even inline executors work (same thread = sequenced-before)
     105                 : 
     106                 :    EXCEPTION SEMANTICS:
     107                 :    --------------------
     108                 :    Exceptions do NOT claim winner status. If a child throws, the exception
     109                 :    is recorded but the combinator keeps waiting for a success. Only when
     110                 :    all children complete without a winner does the combinator check: if
     111                 :    any exception was recorded, it is rethrown (exception beats error_code).
     112                 : */
     113                 : 
     114                 : namespace boost {
     115                 : namespace capy {
     116                 : 
     117                 : namespace detail {
     118                 : 
     119                 : /** Core shared state for when_any operations.
     120                 : 
     121                 :     Contains all members and methods common to both heterogeneous (variadic)
     122                 :     and homogeneous (range) when_any implementations. State classes embed
     123                 :     this via composition to avoid CRTP destructor ordering issues.
     124                 : 
     125                 :     @par Thread Safety
     126                 :     Atomic operations protect winner selection and completion count.
     127                 : */
     128                 : struct when_any_core
     129                 : {
     130                 :     std::atomic<std::size_t> remaining_count_;
     131                 :     std::size_t winner_index_{0};
     132                 :     std::exception_ptr winner_exception_;
     133                 :     std::stop_source stop_source_;
     134                 : 
     135                 :     // Bridges parent's stop token to our stop_source
     136                 :     struct stop_callback_fn
     137                 :     {
     138                 :         std::stop_source* source_;
     139 HIT           3 :         void operator()() const noexcept { source_->request_stop(); }
     140                 :     };
     141                 :     using stop_callback_t = std::stop_callback<stop_callback_fn>;
     142                 :     std::optional<stop_callback_t> parent_stop_callback_;
     143                 : 
     144                 :     continuation continuation_;
     145                 :     io_env const* caller_env_ = nullptr;
     146                 : 
     147                 :     // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
     148                 :     std::atomic<bool> has_winner_{false};
     149                 : 
     150              34 :     explicit when_any_core(std::size_t count) noexcept
     151              34 :         : remaining_count_(count)
     152                 :     {
     153              34 :     }
     154                 : 
     155                 :     /** Atomically claim winner status; exactly one task succeeds. */
     156              53 :     bool try_win(std::size_t index) noexcept
     157                 :     {
     158              53 :         bool expected = false;
     159              53 :         if(has_winner_.compare_exchange_strong(
     160                 :             expected, true, std::memory_order_acq_rel))
     161                 :         {
     162              23 :             winner_index_ = index;
     163              23 :             stop_source_.request_stop();
     164              23 :             return true;
     165                 :         }
     166              30 :         return false;
     167                 :     }
     168                 : 
     169                 :     /** @pre try_win() returned true. */
     170               1 :     void set_winner_exception(std::exception_ptr ep) noexcept
     171                 :     {
     172               1 :         winner_exception_ = ep;
     173               1 :     }
     174                 : 
     175                 :     // Runners signal completion directly via final_suspend; no member function needed.
     176                 : };
     177                 : 
     178                 : } // namespace detail
     179                 : 
     180                 : namespace detail {
     181                 : 
     182                 : // State for io_result-aware when_any: only !ec wins.
     183                 : template<typename... Ts>
     184                 : struct when_any_io_state
     185                 : {
     186                 :     static constexpr std::size_t task_count = sizeof...(Ts);
     187                 :     using variant_type = std::variant<std::error_code, Ts...>;
     188                 : 
     189                 :     when_any_core core_;
     190                 :     std::optional<variant_type> result_;
     191                 :     std::array<continuation, task_count> runner_handles_{};
     192                 : 
     193                 :     // Last failure (error or exception) for the all-fail case.
     194                 :     // Last writer wins — no priority between errors and exceptions.
     195                 :     std::mutex failure_mu_;
     196                 :     std::error_code last_error_;
     197                 :     std::exception_ptr last_exception_;
     198                 : 
     199              18 :     when_any_io_state()
     200              18 :         : core_(task_count)
     201                 :     {
     202              18 :     }
     203                 : 
     204              14 :     void record_error(std::error_code ec)
     205                 :     {
     206              14 :         std::lock_guard lk(failure_mu_);
     207              14 :         last_error_ = ec;
     208              14 :         last_exception_ = nullptr;
     209              14 :     }
     210                 : 
     211               7 :     void record_exception(std::exception_ptr ep)
     212                 :     {
     213               7 :         std::lock_guard lk(failure_mu_);
     214               7 :         last_exception_ = ep;
     215               7 :         last_error_ = {};
     216               7 :     }
     217                 : };
     218                 : 
     219                 : // Wrapper coroutine for io_result-aware when_any children.
     220                 : // unhandled_exception records the exception but does NOT claim winner status.
     221                 : template<typename StateType>
     222                 : struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_any_io_runner
     223                 : {
     224                 :     struct promise_type
     225                 :         : frame_alloc_mixin
     226                 :     {
     227                 :         StateType* state_ = nullptr;
     228                 :         std::size_t index_ = 0;
     229                 :         io_env env_;
     230                 : 
     231              87 :         when_any_io_runner get_return_object() noexcept
     232                 :         {
     233                 :             return when_any_io_runner(
     234              87 :                 std::coroutine_handle<promise_type>::from_promise(*this));
     235                 :         }
     236                 : 
     237              87 :         std::suspend_always initial_suspend() noexcept { return {}; }
     238                 : 
     239              87 :         auto final_suspend() noexcept
     240                 :         {
     241                 :             struct awaiter
     242                 :             {
     243                 :                 promise_type* p_;
     244              87 :                 bool await_ready() const noexcept { return false; }
     245              87 :                 auto await_suspend(std::coroutine_handle<> h) noexcept
     246                 :                 {
     247              87 :                     auto& core = p_->state_->core_;
     248              87 :                     auto* counter = &core.remaining_count_;
     249              87 :                     auto* caller_env = core.caller_env_;
     250              87 :                     auto& cont = core.continuation_;
     251                 : 
     252              87 :                     h.destroy();
     253                 : 
     254              87 :                     auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
     255              87 :                     if(remaining == 1)
     256              34 :                         return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
     257              53 :                     return detail::symmetric_transfer(std::noop_coroutine());
     258                 :                 }
     259                 :                 void await_resume() const noexcept {} // LCOV_EXCL_LINE final_suspend awaiter, never resumed
     260                 :             };
     261              87 :             return awaiter{this};
     262                 :         }
     263                 : 
     264              74 :         void return_void() noexcept {}
     265                 : 
     266                 :         // Exceptions do NOT win in io_result when_any
     267              13 :         void unhandled_exception() noexcept
     268                 :         {
     269              13 :             state_->record_exception(std::current_exception());
     270              13 :         }
     271                 : 
     272                 :         template<class Awaitable>
     273                 :         struct transform_awaiter
     274                 :         {
     275                 :             std::decay_t<Awaitable> a_;
     276                 :             promise_type* p_;
     277                 : 
     278              87 :             bool await_ready() { return a_.await_ready(); }
     279              87 :             decltype(auto) await_resume() { return a_.await_resume(); }
     280                 : 
     281                 :             template<class Promise>
     282              86 :             auto await_suspend(std::coroutine_handle<Promise> h)
     283                 :             {
     284                 :                 using R = decltype(a_.await_suspend(h, &p_->env_));
     285                 :                 if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
     286              86 :                     return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
     287                 :                 else
     288                 :                     return a_.await_suspend(h, &p_->env_);
     289                 :             }
     290                 :         };
     291                 : 
     292                 :         template<class Awaitable>
     293              87 :         auto await_transform(Awaitable&& a)
     294                 :         {
     295                 :             using A = std::decay_t<Awaitable>;
     296                 :             if constexpr (IoAwaitable<A>)
     297                 :             {
     298                 :                 return transform_awaiter<Awaitable>{
     299             172 :                     std::forward<Awaitable>(a), this};
     300                 :             }
     301                 :             else
     302                 :             {
     303                 :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     304                 :             }
     305              85 :         }
     306                 :     };
     307                 : 
     308                 :     std::coroutine_handle<promise_type> h_;
     309                 : 
     310              87 :     explicit when_any_io_runner(std::coroutine_handle<promise_type> h) noexcept
     311              87 :         : h_(h)
     312                 :     {
     313              87 :     }
     314                 : 
     315                 :     when_any_io_runner(when_any_io_runner&& other) noexcept
     316                 :         : h_(std::exchange(other.h_, nullptr))
     317                 :     {
     318                 :     }
     319                 : 
     320                 :     when_any_io_runner(when_any_io_runner const&) = delete;
     321                 :     when_any_io_runner& operator=(when_any_io_runner const&) = delete;
     322                 :     when_any_io_runner& operator=(when_any_io_runner&&) = delete;
     323                 : 
     324              87 :     auto release() noexcept
     325                 :     {
     326              87 :         return std::exchange(h_, nullptr);
     327                 :     }
     328                 : };
     329                 : 
     330                 : // Runner coroutine: only tries to win when the child returns !ec.
     331                 : template<std::size_t I, IoAwaitable Awaitable, typename StateType>
     332                 : when_any_io_runner<StateType>
     333              33 : make_when_any_io_runner(Awaitable inner, StateType* state)
     334                 : {
     335                 :     auto result = co_await std::move(inner);
     336                 : 
     337                 :     if(!result.ec)
     338                 :     {
     339                 :         // Success: try to claim winner
     340                 :         if(state->core_.try_win(I))
     341                 :         {
     342                 :             try
     343                 :             {
     344                 :                 state->result_.emplace(
     345                 :                     std::in_place_index<I + 1>,
     346                 :                     detail::extract_io_payload(std::move(result)));
     347                 :             }
     348                 :             catch(...)
     349                 :             {
     350                 :                 state->core_.set_winner_exception(std::current_exception());
     351                 :             }
     352                 :         }
     353                 :     }
     354                 :     else
     355                 :     {
     356                 :         // Error: record but don't win
     357                 :         state->record_error(result.ec);
     358                 :     }
     359              66 : }
     360                 : 
     361                 : // Launcher for io_result-aware when_any.
     362                 : template<IoAwaitable... Awaitables>
     363                 : class when_any_io_launcher
     364                 : {
     365                 :     using state_type = when_any_io_state<
     366                 :         io_result_payload_t<awaitable_result_t<Awaitables>>...>;
     367                 : 
     368                 :     std::tuple<Awaitables...>* tasks_;
     369                 :     state_type* state_;
     370                 : 
     371                 : public:
     372              18 :     when_any_io_launcher(
     373                 :         std::tuple<Awaitables...>* tasks,
     374                 :         state_type* state)
     375              18 :         : tasks_(tasks)
     376              18 :         , state_(state)
     377                 :     {
     378              18 :     }
     379                 : 
     380              18 :     bool await_ready() const noexcept
     381                 :     {
     382              18 :         return sizeof...(Awaitables) == 0;
     383                 :     }
     384                 : 
     385              18 :     std::coroutine_handle<> await_suspend(
     386                 :         std::coroutine_handle<> continuation, io_env const* caller_env)
     387                 :     {
     388              18 :         state_->core_.continuation_.h = continuation;
     389              18 :         state_->core_.caller_env_ = caller_env;
     390                 : 
     391              18 :         if(caller_env->stop_token.stop_possible())
     392                 :         {
     393               4 :             state_->core_.parent_stop_callback_.emplace(
     394               2 :                 caller_env->stop_token,
     395               2 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     396                 : 
     397               2 :             if(caller_env->stop_token.stop_requested())
     398               1 :                 state_->core_.stop_source_.request_stop();
     399                 :         }
     400                 : 
     401              18 :         auto token = state_->core_.stop_source_.get_token();
     402              18 :         launch_all(std::index_sequence_for<Awaitables...>{},
     403                 :             caller_env->executor, token);
     404                 : 
     405              36 :         return std::noop_coroutine();
     406              18 :     }
     407                 : 
     408              18 :     void await_resume() const noexcept {}
     409                 : 
     410                 : private:
     411                 :     template<std::size_t... Is>
     412              18 :     void launch_all(std::index_sequence<Is...>,
     413                 :         executor_ref ex, std::stop_token token)
     414                 :     {
     415              18 :         (..., launch_one<Is>(ex, token));
     416              18 :     }
     417                 : 
     418                 :     template<std::size_t I>
     419              33 :     void launch_one(executor_ref caller_ex, std::stop_token token)
     420                 :     {
     421              33 :         auto runner = make_when_any_io_runner<I>(
     422              33 :             std::move(std::get<I>(*tasks_)), state_);
     423                 : 
     424              33 :         auto h = runner.release();
     425              33 :         h.promise().state_ = state_;
     426              33 :         h.promise().index_ = I;
     427              33 :         h.promise().env_ = io_env{caller_ex, token,
     428              33 :             state_->core_.caller_env_->frame_allocator};
     429                 : 
     430              33 :         state_->runner_handles_[I].h = std::coroutine_handle<>{h};
     431              33 :         caller_ex.post(state_->runner_handles_[I]);
     432              66 :     }
     433                 : };
     434                 : 
     435                 : /** Shared state for homogeneous io_result-aware when_any (range overload).
     436                 : 
     437                 :     @tparam T The payload type extracted from io_result.
     438                 : */
     439                 : template<typename T>
     440                 : struct when_any_io_homogeneous_state
     441                 : {
     442                 :     when_any_core core_;
     443                 :     std::optional<T> result_;
     444                 :     std::unique_ptr<continuation[]> runner_handles_;
     445                 : 
     446                 :     std::mutex failure_mu_;
     447                 :     std::error_code last_error_;
     448                 :     std::exception_ptr last_exception_;
     449                 : 
     450              13 :     explicit when_any_io_homogeneous_state(std::size_t count)
     451              13 :         : core_(count)
     452              13 :         , runner_handles_(std::make_unique<continuation[]>(count))
     453                 :     {
     454              13 :     }
     455                 : 
     456               6 :     void record_error(std::error_code ec)
     457                 :     {
     458               6 :         std::lock_guard lk(failure_mu_);
     459               6 :         last_error_ = ec;
     460               6 :         last_exception_ = nullptr;
     461               6 :     }
     462                 : 
     463               4 :     void record_exception(std::exception_ptr ep)
     464                 :     {
     465               4 :         std::lock_guard lk(failure_mu_);
     466               4 :         last_exception_ = ep;
     467               4 :         last_error_ = {};
     468               4 :     }
     469                 : };
     470                 : 
     471                 : /** Specialization for void io_result children (no payload storage). */
     472                 : template<>
     473                 : struct when_any_io_homogeneous_state<std::tuple<>>
     474                 : {
     475                 :     when_any_core core_;
     476                 :     std::unique_ptr<continuation[]> runner_handles_;
     477                 : 
     478                 :     std::mutex failure_mu_;
     479                 :     std::error_code last_error_;
     480                 :     std::exception_ptr last_exception_;
     481                 : 
     482               3 :     explicit when_any_io_homogeneous_state(std::size_t count)
     483               3 :         : core_(count)
     484               3 :         , runner_handles_(std::make_unique<continuation[]>(count))
     485                 :     {
     486               3 :     }
     487                 : 
     488               1 :     void record_error(std::error_code ec)
     489                 :     {
     490               1 :         std::lock_guard lk(failure_mu_);
     491               1 :         last_error_ = ec;
     492               1 :         last_exception_ = nullptr;
     493               1 :     }
     494                 : 
     495               2 :     void record_exception(std::exception_ptr ep)
     496                 :     {
     497               2 :         std::lock_guard lk(failure_mu_);
     498               2 :         last_exception_ = ep;
     499               2 :         last_error_ = {};
     500               2 :     }
     501                 : };
     502                 : 
     503                 : /** Create an io_result-aware runner for homogeneous when_any (range path).
     504                 : 
     505                 :     Only tries to win when the child returns !ec.
     506                 : */
     507                 : template<IoAwaitable Awaitable, typename StateType>
     508                 : when_any_io_runner<StateType>
     509              54 : make_when_any_io_homogeneous_runner(
     510                 :     Awaitable inner, StateType* state, std::size_t index)
     511                 : {
     512                 :     auto result = co_await std::move(inner);
     513                 : 
     514                 :     if(!result.ec)
     515                 :     {
     516                 :         if(state->core_.try_win(index))
     517                 :         {
     518                 :             using PayloadT = io_result_payload_t<
     519                 :                 awaitable_result_t<Awaitable>>;
     520                 :             if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
     521                 :             {
     522                 :                 try
     523                 :                 {
     524                 :                     state->result_.emplace(
     525                 :                         extract_io_payload(std::move(result)));
     526                 :                 }
     527                 :                 catch(...)
     528                 :                 {
     529                 :                     state->core_.set_winner_exception(
     530                 :                         std::current_exception());
     531                 :                 }
     532                 :             }
     533                 :         }
     534                 :     }
     535                 :     else
     536                 :     {
     537                 :         state->record_error(result.ec);
     538                 :     }
     539             108 : }
     540                 : 
     541                 : /** Launches all io_result-aware homogeneous runners concurrently. */
     542                 : template<IoAwaitableRange Range>
     543                 : class when_any_io_homogeneous_launcher
     544                 : {
     545                 :     using Awaitable = std::ranges::range_value_t<Range>;
     546                 :     using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
     547                 : 
     548                 :     Range* range_;
     549                 :     when_any_io_homogeneous_state<PayloadT>* state_;
     550                 : 
     551                 : public:
     552              16 :     when_any_io_homogeneous_launcher(
     553                 :         Range* range,
     554                 :         when_any_io_homogeneous_state<PayloadT>* state)
     555              16 :         : range_(range)
     556              16 :         , state_(state)
     557                 :     {
     558              16 :     }
     559                 : 
     560              16 :     bool await_ready() const noexcept
     561                 :     {
     562              16 :         return std::ranges::empty(*range_);
     563                 :     }
     564                 : 
     565              16 :     std::coroutine_handle<> await_suspend(
     566                 :         std::coroutine_handle<> continuation, io_env const* caller_env)
     567                 :     {
     568              16 :         state_->core_.continuation_.h = continuation;
     569              16 :         state_->core_.caller_env_ = caller_env;
     570                 : 
     571              16 :         if(caller_env->stop_token.stop_possible())
     572                 :         {
     573               4 :             state_->core_.parent_stop_callback_.emplace(
     574               2 :                 caller_env->stop_token,
     575               2 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     576                 : 
     577               2 :             if(caller_env->stop_token.stop_requested())
     578               1 :                 state_->core_.stop_source_.request_stop();
     579                 :         }
     580                 : 
     581              16 :         auto token = state_->core_.stop_source_.get_token();
     582                 : 
     583                 :         // Phase 1: Create all runners without dispatching.
     584              16 :         std::size_t index = 0;
     585              70 :         for(auto&& a : *range_)
     586                 :         {
     587              54 :             auto runner = make_when_any_io_homogeneous_runner(
     588              54 :                 std::move(a), state_, index);
     589                 : 
     590              54 :             auto h = runner.release();
     591              54 :             h.promise().state_ = state_;
     592              54 :             h.promise().index_ = index;
     593              54 :             h.promise().env_ = io_env{caller_env->executor, token,
     594              54 :                 caller_env->frame_allocator};
     595                 : 
     596              54 :             state_->runner_handles_[index].h = std::coroutine_handle<>{h};
     597              54 :             ++index;
     598                 :         }
     599                 : 
     600                 :         // Phase 2: Post all runners. Any may complete synchronously.
     601              16 :         auto* handles = state_->runner_handles_.get();
     602              16 :         std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed);
     603              70 :         for(std::size_t i = 0; i < count; ++i)
     604              54 :             caller_env->executor.post(handles[i]);
     605                 : 
     606              32 :         return std::noop_coroutine();
     607              70 :     }
     608                 : 
     609              16 :     void await_resume() const noexcept {}
     610                 : };
     611                 : 
     612                 : } // namespace detail
     613                 : 
     614                 : /** Race a range of io_result-returning awaitables (non-void payloads).
     615                 : 
     616                 :     Only a child returning !ec can win. Errors and exceptions do not
     617                 :     claim winner status. If all children fail, the last failure
     618                 :     is reported — either the last error_code at variant index 0,
     619                 :     or the last exception rethrown.
     620                 : 
     621                 :     @param awaitables Range of io_result-returning awaitables (must
     622                 :         not be empty).
     623                 : 
     624                 :     @return A task yielding variant<error_code, pair<size_t, PayloadT>>
     625                 :         where index 0 is failure and index 1 carries the winner's
     626                 :         index and payload.
     627                 : 
     628                 :     @throws std::invalid_argument if range is empty.
     629                 :     @throws Rethrows last exception when no winner and the last
     630                 :         failure was an exception.
     631                 : 
     632                 :     @par Example
     633                 :     @code
     634                 :     task<void> example()
     635                 :     {
     636                 :         std::vector<io_task<size_t>> reads;
     637                 :         for (auto& buf : buffers)
     638                 :             reads.push_back(stream.read_some(buf));
     639                 : 
     640                 :         auto result = co_await when_any(std::move(reads));
     641                 :         if (result.index() == 1)
     642                 :         {
     643                 :             auto [idx, n] = std::get<1>(result);
     644                 :         }
     645                 :     }
     646                 :     @endcode
     647                 : 
     648                 :     @see IoAwaitableRange, when_any
     649                 : */
     650                 : template<IoAwaitableRange R>
     651                 :     requires detail::is_io_result_v<
     652                 :         awaitable_result_t<std::ranges::range_value_t<R>>>
     653                 :     && (!std::is_same_v<
     654                 :             detail::io_result_payload_t<
     655                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>,
     656                 :             std::tuple<>>)
     657              14 : [[nodiscard]] auto when_any(R&& awaitables)
     658                 :     -> task<std::variant<std::error_code,
     659                 :         std::pair<std::size_t,
     660                 :             detail::io_result_payload_t<
     661                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>>>>
     662                 : {
     663                 :     using Awaitable = std::ranges::range_value_t<R>;
     664                 :     using PayloadT = detail::io_result_payload_t<
     665                 :         awaitable_result_t<Awaitable>>;
     666                 :     using result_type = std::variant<std::error_code,
     667                 :         std::pair<std::size_t, PayloadT>>;
     668                 :     using OwnedRange = std::remove_cvref_t<R>;
     669                 : 
     670                 :     auto count = std::ranges::size(awaitables);
     671                 :     if(count == 0)
     672                 :         throw std::invalid_argument("when_any requires at least one awaitable");
     673                 : 
     674                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     675                 : 
     676                 :     detail::when_any_io_homogeneous_state<PayloadT> state(count);
     677                 : 
     678                 :     co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
     679                 :         &owned_awaitables, &state);
     680                 : 
     681                 :     // Winner found
     682                 :     if(state.core_.has_winner_.load(std::memory_order_acquire))
     683                 :     {
     684                 :         if(state.core_.winner_exception_)
     685                 :             std::rethrow_exception(state.core_.winner_exception_);
     686                 :         co_return result_type{std::in_place_index<1>,
     687                 :             std::pair{state.core_.winner_index_, std::move(*state.result_)}};
     688                 :     }
     689                 : 
     690                 :     // No winner — report last failure
     691                 :     if(state.last_exception_)
     692                 :         std::rethrow_exception(state.last_exception_);
     693                 :     co_return result_type{std::in_place_index<0>, state.last_error_};
     694              28 : }
     695                 : 
     696                 : /** Race a range of void io_result-returning awaitables.
     697                 : 
     698                 :     Only a child returning !ec can win. Returns the winner's index
     699                 :     at variant index 1, or error_code at index 0 on all-fail.
     700                 : 
     701                 :     @param awaitables Range of io_result<>-returning awaitables (must
     702                 :         not be empty).
     703                 : 
     704                 :     @return A task yielding variant<error_code, size_t> where index 0
     705                 :         is failure and index 1 carries the winner's index.
     706                 : 
     707                 :     @throws std::invalid_argument if range is empty.
     708                 :     @throws Rethrows first exception when no winner and at least one
     709                 :         child threw.
     710                 : 
     711                 :     @par Example
     712                 :     @code
     713                 :     task<void> example()
     714                 :     {
     715                 :         std::vector<io_task<>> jobs;
     716                 :         jobs.push_back(background_work_a());
     717                 :         jobs.push_back(background_work_b());
     718                 : 
     719                 :         auto result = co_await when_any(std::move(jobs));
     720                 :         if (result.index() == 1)
     721                 :         {
     722                 :             auto winner = std::get<1>(result);
     723                 :         }
     724                 :     }
     725                 :     @endcode
     726                 : 
     727                 :     @see IoAwaitableRange, when_any
     728                 : */
     729                 : template<IoAwaitableRange R>
     730                 :     requires detail::is_io_result_v<
     731                 :         awaitable_result_t<std::ranges::range_value_t<R>>>
     732                 :     && std::is_same_v<
     733                 :             detail::io_result_payload_t<
     734                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>,
     735                 :             std::tuple<>>
     736               3 : [[nodiscard]] auto when_any(R&& awaitables)
     737                 :     -> task<std::variant<std::error_code, std::size_t>>
     738                 : {
     739                 :     using OwnedRange = std::remove_cvref_t<R>;
     740                 :     using result_type = std::variant<std::error_code, std::size_t>;
     741                 : 
     742                 :     auto count = std::ranges::size(awaitables);
     743                 :     if(count == 0)
     744                 :         throw std::invalid_argument("when_any requires at least one awaitable");
     745                 : 
     746                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     747                 : 
     748                 :     detail::when_any_io_homogeneous_state<std::tuple<>> state(count);
     749                 : 
     750                 :     co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
     751                 :         &owned_awaitables, &state);
     752                 : 
     753                 :     // Winner found
     754                 :     if(state.core_.has_winner_.load(std::memory_order_acquire))
     755                 :     {
     756                 :         if(state.core_.winner_exception_)
     757                 :             std::rethrow_exception(state.core_.winner_exception_);
     758                 :         co_return result_type{std::in_place_index<1>,
     759                 :             state.core_.winner_index_};
     760                 :     }
     761                 : 
     762                 :     // No winner — report last failure
     763                 :     if(state.last_exception_)
     764                 :         std::rethrow_exception(state.last_exception_);
     765                 :     co_return result_type{std::in_place_index<0>, state.last_error_};
     766               6 : }
     767                 : 
     768                 : /** Race io_result-returning awaitables, selecting the first success.
     769                 : 
     770                 :     Overload selected when all children return io_result<Ts...>.
     771                 :     Only a child returning !ec can win. Errors and exceptions do
     772                 :     not claim winner status.
     773                 : 
     774                 :     @return A task yielding variant<error_code, R1, ..., Rn> where
     775                 :         index 0 is the failure/no-winner case and index i+1
     776                 :         identifies the winning child.
     777                 : */
     778                 : template<IoAwaitable... As>
     779                 :     requires (sizeof...(As) > 0)
     780                 :           && detail::all_io_result_awaitables<As...>
     781              18 : [[nodiscard]] auto when_any(As... as)
     782                 :     -> task<std::variant<
     783                 :         std::error_code,
     784                 :         detail::io_result_payload_t<awaitable_result_t<As>>...>>
     785                 : {
     786                 :     using result_type = std::variant<
     787                 :         std::error_code,
     788                 :         detail::io_result_payload_t<awaitable_result_t<As>>...>;
     789                 : 
     790                 :     detail::when_any_io_state<
     791                 :         detail::io_result_payload_t<awaitable_result_t<As>>...> state;
     792                 :     std::tuple<As...> awaitable_tuple(std::move(as)...);
     793                 : 
     794                 :     co_await detail::when_any_io_launcher<As...>(
     795                 :         &awaitable_tuple, &state);
     796                 : 
     797                 :     // Winner found: return their result
     798                 :     if(state.result_.has_value())
     799                 :         co_return std::move(*state.result_);
     800                 : 
     801                 :     // Winner claimed but payload construction failed
     802                 :     if(state.core_.winner_exception_)
     803                 :         std::rethrow_exception(state.core_.winner_exception_);
     804                 : 
     805                 :     // No winner — report last failure
     806                 :     if(state.last_exception_)
     807                 :         std::rethrow_exception(state.last_exception_);
     808                 :     co_return result_type{std::in_place_index<0>, state.last_error_};
     809              36 : }
     810                 : 
     811                 : } // namespace capy
     812                 : } // namespace boost
     813                 : 
     814                 : #endif
        

Generated by: LCOV version 2.3