100.00% Lines (161/161) 100.00% Functions (43/43)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2026 Steve Gerbino 2   // Copyright (c) 2026 Steve Gerbino
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10   #ifndef BOOST_CAPY_WHEN_ALL_HPP 10   #ifndef BOOST_CAPY_WHEN_ALL_HPP
11   #define BOOST_CAPY_WHEN_ALL_HPP 11   #define BOOST_CAPY_WHEN_ALL_HPP
12   12  
13   #include <boost/capy/detail/config.hpp> 13   #include <boost/capy/detail/config.hpp>
14   #include <boost/capy/detail/io_result_combinators.hpp> 14   #include <boost/capy/detail/io_result_combinators.hpp>
15   #include <boost/capy/continuation.hpp> 15   #include <boost/capy/continuation.hpp>
16   #include <boost/capy/concept/executor.hpp> 16   #include <boost/capy/concept/executor.hpp>
17   #include <boost/capy/concept/io_awaitable.hpp> 17   #include <boost/capy/concept/io_awaitable.hpp>
18   #include <coroutine> 18   #include <coroutine>
19   #include <boost/capy/ex/frame_alloc_mixin.hpp> 19   #include <boost/capy/ex/frame_alloc_mixin.hpp>
20   #include <boost/capy/ex/io_env.hpp> 20   #include <boost/capy/ex/io_env.hpp>
21   #include <boost/capy/ex/frame_allocator.hpp> 21   #include <boost/capy/ex/frame_allocator.hpp>
22   #include <boost/capy/task.hpp> 22   #include <boost/capy/task.hpp>
23   23  
24   #include <array> 24   #include <array>
25   #include <atomic> 25   #include <atomic>
26   #include <exception> 26   #include <exception>
27   #include <memory> 27   #include <memory>
28   #include <optional> 28   #include <optional>
29   #include <ranges> 29   #include <ranges>
30   #include <stdexcept> 30   #include <stdexcept>
31   #include <stop_token> 31   #include <stop_token>
32   #include <tuple> 32   #include <tuple>
33   #include <type_traits> 33   #include <type_traits>
34   #include <utility> 34   #include <utility>
35   #include <vector> 35   #include <vector>
36   36  
37   namespace boost { 37   namespace boost {
38   namespace capy { 38   namespace capy {
39   39  
40   namespace detail { 40   namespace detail {
41   41  
42   /** Holds the result of a single task within when_all. 42   /** Holds the result of a single task within when_all.
43   */ 43   */
44   template<typename T> 44   template<typename T>
45   struct result_holder 45   struct result_holder
46   { 46   {
47   std::optional<T> value_; 47   std::optional<T> value_;
48   48  
HITCBC 49   83 void set(T v) 49   83 void set(T v)
50   { 50   {
HITCBC 51   83 value_ = std::move(v); 51   83 value_ = std::move(v);
HITCBC 52   83 } 52   83 }
53   53  
HITCBC 54   71 T get() && 54   71 T get() &&
55   { 55   {
HITCBC 56   71 return std::move(*value_); 56   71 return std::move(*value_);
57   } 57   }
58   }; 58   };
59   59  
60   /** Core shared state for when_all operations. 60   /** Core shared state for when_all operations.
61   61  
62   Contains all members and methods common to both heterogeneous (variadic) 62   Contains all members and methods common to both heterogeneous (variadic)
63   and homogeneous (range) when_all implementations. State classes embed 63   and homogeneous (range) when_all implementations. State classes embed
64   this via composition to avoid CRTP destructor ordering issues. 64   this via composition to avoid CRTP destructor ordering issues.
65   65  
66   @par Thread Safety 66   @par Thread Safety
67   Atomic operations protect exception capture and completion count. 67   Atomic operations protect exception capture and completion count.
68   */ 68   */
69   struct when_all_core 69   struct when_all_core
70   { 70   {
71   std::atomic<std::size_t> remaining_count_; 71   std::atomic<std::size_t> remaining_count_;
72   72  
73   // Exception storage - first error wins, others discarded 73   // Exception storage - first error wins, others discarded
74   std::atomic<bool> has_exception_{false}; 74   std::atomic<bool> has_exception_{false};
75   std::exception_ptr first_exception_; 75   std::exception_ptr first_exception_;
76   76  
77   std::stop_source stop_source_; 77   std::stop_source stop_source_;
78   78  
79   // Bridges parent's stop token to our stop_source 79   // Bridges parent's stop token to our stop_source
80   struct stop_callback_fn 80   struct stop_callback_fn
81   { 81   {
82   std::stop_source* source_; 82   std::stop_source* source_;
HITCBC 83   4 void operator()() const { source_->request_stop(); } 83   4 void operator()() const { source_->request_stop(); }
84   }; 84   };
85   using stop_callback_t = std::stop_callback<stop_callback_fn>; 85   using stop_callback_t = std::stop_callback<stop_callback_fn>;
86   std::optional<stop_callback_t> parent_stop_callback_; 86   std::optional<stop_callback_t> parent_stop_callback_;
87   87  
88   continuation continuation_; 88   continuation continuation_;
89   io_env const* caller_env_ = nullptr; 89   io_env const* caller_env_ = nullptr;
90   90  
HITCBC 91   75 explicit when_all_core(std::size_t count) noexcept 91   75 explicit when_all_core(std::size_t count) noexcept
HITCBC 92   75 : remaining_count_(count) 92   75 : remaining_count_(count)
93   { 93   {
HITCBC 94   75 } 94   75 }
95   95  
96   /** Capture an exception (first one wins). */ 96   /** Capture an exception (first one wins). */
HITCBC 97   19 void capture_exception(std::exception_ptr ep) 97   19 void capture_exception(std::exception_ptr ep)
98   { 98   {
HITCBC 99   19 bool expected = false; 99   19 bool expected = false;
HITCBC 100   19 if(has_exception_.compare_exchange_strong( 100   19 if(has_exception_.compare_exchange_strong(
101   expected, true, std::memory_order_relaxed)) 101   expected, true, std::memory_order_relaxed))
HITCBC 102   17 first_exception_ = ep; 102   17 first_exception_ = ep;
HITCBC 103   19 } 103   19 }
104   }; 104   };
105   105  
106   /** Shared state for heterogeneous when_all (variadic overload). 106   /** Shared state for heterogeneous when_all (variadic overload).
107   107  
108   @tparam Ts The result types of the tasks. 108   @tparam Ts The result types of the tasks.
109   */ 109   */
110   template<typename... Ts> 110   template<typename... Ts>
111   struct when_all_state 111   struct when_all_state
112   { 112   {
113   static constexpr std::size_t task_count = sizeof...(Ts); 113   static constexpr std::size_t task_count = sizeof...(Ts);
114   114  
115   when_all_core core_; 115   when_all_core core_;
116   std::tuple<result_holder<Ts>...> results_; 116   std::tuple<result_holder<Ts>...> results_;
117   std::array<continuation, task_count> runner_handles_{}; 117   std::array<continuation, task_count> runner_handles_{};
118   118  
119   std::atomic<bool> has_error_{false}; 119   std::atomic<bool> has_error_{false};
120   std::error_code first_error_; 120   std::error_code first_error_;
121   121  
HITCBC 122   51 when_all_state() 122   51 when_all_state()
HITCBC 123   51 : core_(task_count) 123   51 : core_(task_count)
124   { 124   {
HITCBC 125   51 } 125   51 }
126   126  
127   /** Record the first error (subsequent errors are discarded). */ 127   /** Record the first error (subsequent errors are discarded). */
HITCBC 128   43 void record_error(std::error_code ec) 128   43 void record_error(std::error_code ec)
129   { 129   {
HITCBC 130   43 bool expected = false; 130   43 bool expected = false;
HITCBC 131   43 if(has_error_.compare_exchange_strong( 131   43 if(has_error_.compare_exchange_strong(
132   expected, true, std::memory_order_relaxed)) 132   expected, true, std::memory_order_relaxed))
HITCBC 133   29 first_error_ = ec; 133   29 first_error_ = ec;
HITCBC 134   43 } 134   43 }
135   }; 135   };
136   136  
137   /** Shared state for homogeneous when_all (range overload). 137   /** Shared state for homogeneous when_all (range overload).
138   138  
139   Stores extracted io_result payloads in a vector indexed by task 139   Stores extracted io_result payloads in a vector indexed by task
140   position. Tracks the first error_code for error propagation. 140   position. Tracks the first error_code for error propagation.
141   141  
142   @tparam T The payload type extracted from io_result. 142   @tparam T The payload type extracted from io_result.
143   */ 143   */
144   template<typename T> 144   template<typename T>
145   struct when_all_homogeneous_state 145   struct when_all_homogeneous_state
146   { 146   {
147   when_all_core core_; 147   when_all_core core_;
148   std::vector<std::optional<T>> results_; 148   std::vector<std::optional<T>> results_;
149   std::unique_ptr<continuation[]> runner_handles_; 149   std::unique_ptr<continuation[]> runner_handles_;
150   150  
151   std::atomic<bool> has_error_{false}; 151   std::atomic<bool> has_error_{false};
152   std::error_code first_error_; 152   std::error_code first_error_;
153   153  
HITCBC 154   12 explicit when_all_homogeneous_state(std::size_t count) 154   12 explicit when_all_homogeneous_state(std::size_t count)
HITCBC 155   12 : core_(count) 155   12 : core_(count)
HITCBC 156   24 , results_(count) 156   24 , results_(count)
HITCBC 157   12 , runner_handles_(std::make_unique<continuation[]>(count)) 157   12 , runner_handles_(std::make_unique<continuation[]>(count))
158   { 158   {
HITCBC 159   12 } 159   12 }
160   160  
HITCBC 161   18 void set_result(std::size_t index, T value) 161   18 void set_result(std::size_t index, T value)
162   { 162   {
HITCBC 163   18 results_[index].emplace(std::move(value)); 163   18 results_[index].emplace(std::move(value));
HITCBC 164   18 } 164   18 }
165   165  
166   /** Record the first error (subsequent errors are discarded). */ 166   /** Record the first error (subsequent errors are discarded). */
HITCBC 167   7 void record_error(std::error_code ec) 167   7 void record_error(std::error_code ec)
168   { 168   {
HITCBC 169   7 bool expected = false; 169   7 bool expected = false;
HITCBC 170   7 if(has_error_.compare_exchange_strong( 170   7 if(has_error_.compare_exchange_strong(
171   expected, true, std::memory_order_relaxed)) 171   expected, true, std::memory_order_relaxed))
HITCBC 172   5 first_error_ = ec; 172   5 first_error_ = ec;
HITCBC 173   7 } 173   7 }
174   }; 174   };
175   175  
176   /** Specialization for void io_result children (no payload storage). */ 176   /** Specialization for void io_result children (no payload storage). */
177   template<> 177   template<>
178   struct when_all_homogeneous_state<std::tuple<>> 178   struct when_all_homogeneous_state<std::tuple<>>
179   { 179   {
180   when_all_core core_; 180   when_all_core core_;
181   std::unique_ptr<continuation[]> runner_handles_; 181   std::unique_ptr<continuation[]> runner_handles_;
182   182  
183   std::atomic<bool> has_error_{false}; 183   std::atomic<bool> has_error_{false};
184   std::error_code first_error_; 184   std::error_code first_error_;
185   185  
HITCBC 186   3 explicit when_all_homogeneous_state(std::size_t count) 186   3 explicit when_all_homogeneous_state(std::size_t count)
HITCBC 187   3 : core_(count) 187   3 : core_(count)
HITCBC 188   3 , runner_handles_(std::make_unique<continuation[]>(count)) 188   3 , runner_handles_(std::make_unique<continuation[]>(count))
189   { 189   {
HITCBC 190   3 } 190   3 }
191   191  
192   /** Record the first error (subsequent errors are discarded). */ 192   /** Record the first error (subsequent errors are discarded). */
HITCBC 193   1 void record_error(std::error_code ec) 193   1 void record_error(std::error_code ec)
194   { 194   {
HITCBC 195   1 bool expected = false; 195   1 bool expected = false;
HITCBC 196   1 if(has_error_.compare_exchange_strong( 196   1 if(has_error_.compare_exchange_strong(
197   expected, true, std::memory_order_relaxed)) 197   expected, true, std::memory_order_relaxed))
HITCBC 198   1 first_error_ = ec; 198   1 first_error_ = ec;
HITCBC 199   1 } 199   1 }
200   }; 200   };
201   201  
202   /** Wrapper coroutine that intercepts task completion for when_all. 202   /** Wrapper coroutine that intercepts task completion for when_all.
203   203  
204   Parameterized on StateType to work with both heterogeneous (variadic) 204   Parameterized on StateType to work with both heterogeneous (variadic)
205   and homogeneous (range) state types. All state types expose their 205   and homogeneous (range) state types. All state types expose their
206   shared members through a `core_` member of type when_all_core. 206   shared members through a `core_` member of type when_all_core.
207   207  
208   @tparam StateType The state type (when_all_state or when_all_homogeneous_state). 208   @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
209   */ 209   */
210   template<typename StateType> 210   template<typename StateType>
211   struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_all_runner 211   struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_all_runner
212   { 212   {
213   struct promise_type 213   struct promise_type
214   : frame_alloc_mixin 214   : frame_alloc_mixin
215   { 215   {
216   StateType* state_ = nullptr; 216   StateType* state_ = nullptr;
217   std::size_t index_ = 0; 217   std::size_t index_ = 0;
218   io_env env_; 218   io_env env_;
219   219  
HITCBC 220   151 when_all_runner get_return_object() noexcept 220   151 when_all_runner get_return_object() noexcept
221   { 221   {
222   return when_all_runner( 222   return when_all_runner(
HITCBC 223   151 std::coroutine_handle<promise_type>::from_promise(*this)); 223   151 std::coroutine_handle<promise_type>::from_promise(*this));
224   } 224   }
225   225  
HITCBC 226   151 std::suspend_always initial_suspend() noexcept 226   151 std::suspend_always initial_suspend() noexcept
227   { 227   {
HITCBC 228   151 return {}; 228   151 return {};
229   } 229   }
230   230  
HITCBC 231   151 auto final_suspend() noexcept 231   151 auto final_suspend() noexcept
232   { 232   {
233   struct awaiter 233   struct awaiter
234   { 234   {
235   promise_type* p_; 235   promise_type* p_;
HITCBC 236   151 bool await_ready() const noexcept { return false; } 236   151 bool await_ready() const noexcept { return false; }
HITCBC 237   151 auto await_suspend(std::coroutine_handle<> h) noexcept 237   151 auto await_suspend(std::coroutine_handle<> h) noexcept
238   { 238   {
HITCBC 239   151 auto& core = p_->state_->core_; 239   151 auto& core = p_->state_->core_;
HITCBC 240   151 auto* counter = &core.remaining_count_; 240   151 auto* counter = &core.remaining_count_;
HITCBC 241   151 auto* caller_env = core.caller_env_; 241   151 auto* caller_env = core.caller_env_;
HITCBC 242   151 auto& cont = core.continuation_; 242   151 auto& cont = core.continuation_;
243   243  
HITCBC 244   151 h.destroy(); 244   151 h.destroy();
245   245  
HITCBC 246   151 auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel); 246   151 auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
HITCBC 247   151 if(remaining == 1) 247   151 if(remaining == 1)
HITCBC 248   75 return detail::symmetric_transfer(caller_env->executor.dispatch(cont)); 248   75 return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
HITCBC 249   76 return detail::symmetric_transfer(std::noop_coroutine()); 249   76 return detail::symmetric_transfer(std::noop_coroutine());
250   } 250   }
251   void await_resume() const noexcept {} // LCOV_EXCL_LINE final_suspend awaiter, never resumed 251   void await_resume() const noexcept {} // LCOV_EXCL_LINE final_suspend awaiter, never resumed
252   }; 252   };
HITCBC 253   151 return awaiter{this}; 253   151 return awaiter{this};
254   } 254   }
255   255  
HITCBC 256   132 void return_void() noexcept {} 256   132 void return_void() noexcept {}
257   257  
HITCBC 258   19 void unhandled_exception() noexcept 258   19 void unhandled_exception() noexcept
259   { 259   {
HITCBC 260   19 state_->core_.capture_exception(std::current_exception()); 260   19 state_->core_.capture_exception(std::current_exception());
HITCBC 261   19 state_->core_.stop_source_.request_stop(); 261   19 state_->core_.stop_source_.request_stop();
HITCBC 262   19 } 262   19 }
263   263  
264   template<class Awaitable> 264   template<class Awaitable>
265   struct transform_awaiter 265   struct transform_awaiter
266   { 266   {
267   std::decay_t<Awaitable> a_; 267   std::decay_t<Awaitable> a_;
268   promise_type* p_; 268   promise_type* p_;
269   269  
HITCBC 270   151 bool await_ready() { return a_.await_ready(); } 270   151 bool await_ready() { return a_.await_ready(); }
HITCBC 271   151 decltype(auto) await_resume() { return a_.await_resume(); } 271   151 decltype(auto) await_resume() { return a_.await_resume(); }
272   272  
273   template<class Promise> 273   template<class Promise>
HITCBC 274   150 auto await_suspend(std::coroutine_handle<Promise> h) 274   150 auto await_suspend(std::coroutine_handle<Promise> h)
275   { 275   {
276   using R = decltype(a_.await_suspend(h, &p_->env_)); 276   using R = decltype(a_.await_suspend(h, &p_->env_));
277   if constexpr (std::is_same_v<R, std::coroutine_handle<>>) 277   if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
HITCBC 278   150 return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_)); 278   150 return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
279   else 279   else
280   return a_.await_suspend(h, &p_->env_); 280   return a_.await_suspend(h, &p_->env_);
281   } 281   }
282   }; 282   };
283   283  
284   template<class Awaitable> 284   template<class Awaitable>
HITCBC 285   151 auto await_transform(Awaitable&& a) 285   151 auto await_transform(Awaitable&& a)
286   { 286   {
287   using A = std::decay_t<Awaitable>; 287   using A = std::decay_t<Awaitable>;
288   if constexpr (IoAwaitable<A>) 288   if constexpr (IoAwaitable<A>)
289   { 289   {
290   return transform_awaiter<Awaitable>{ 290   return transform_awaiter<Awaitable>{
HITCBC 291   302 std::forward<Awaitable>(a), this}; 291   302 std::forward<Awaitable>(a), this};
292   } 292   }
293   else 293   else
294   { 294   {
295   static_assert(sizeof(A) == 0, "requires IoAwaitable"); 295   static_assert(sizeof(A) == 0, "requires IoAwaitable");
296   } 296   }
HITCBC 297   151 } 297   151 }
298   }; 298   };
299   299  
300   std::coroutine_handle<promise_type> h_; 300   std::coroutine_handle<promise_type> h_;
301   301  
HITCBC 302   151 explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept 302   151 explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
HITCBC 303   151 : h_(h) 303   151 : h_(h)
304   { 304   {
HITCBC 305   151 } 305   151 }
306   306  
307   // Enable move for all clang versions - some versions need it 307   // Enable move for all clang versions - some versions need it
308   when_all_runner(when_all_runner&& other) noexcept 308   when_all_runner(when_all_runner&& other) noexcept
309   : h_(std::exchange(other.h_, nullptr)) 309   : h_(std::exchange(other.h_, nullptr))
310   { 310   {
311   } 311   }
312   312  
313   when_all_runner(when_all_runner const&) = delete; 313   when_all_runner(when_all_runner const&) = delete;
314   when_all_runner& operator=(when_all_runner const&) = delete; 314   when_all_runner& operator=(when_all_runner const&) = delete;
315   when_all_runner& operator=(when_all_runner&&) = delete; 315   when_all_runner& operator=(when_all_runner&&) = delete;
316   316  
HITCBC 317   151 auto release() noexcept 317   151 auto release() noexcept
318   { 318   {
HITCBC 319   151 return std::exchange(h_, nullptr); 319   151 return std::exchange(h_, nullptr);
320   } 320   }
321   }; 321   };
322   322  
323   /** Create an io_result-aware runner for a single awaitable (range path). 323   /** Create an io_result-aware runner for a single awaitable (range path).
324   324  
325   Checks the error code, records errors and requests stop on failure, 325   Checks the error code, records errors and requests stop on failure,
326   or extracts the payload on success. 326   or extracts the payload on success.
327   */ 327   */
328   template<IoAwaitable Awaitable, typename StateType> 328   template<IoAwaitable Awaitable, typename StateType>
329   when_all_runner<StateType> 329   when_all_runner<StateType>
HITCBC 330   34 make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index) 330   34 make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
331   { 331   {
332   auto result = co_await std::move(inner); 332   auto result = co_await std::move(inner);
333   333  
334   if(result.ec) 334   if(result.ec)
335   { 335   {
336   state->record_error(result.ec); 336   state->record_error(result.ec);
337   state->core_.stop_source_.request_stop(); 337   state->core_.stop_source_.request_stop();
338   } 338   }
339   else 339   else
340   { 340   {
341   using PayloadT = io_result_payload_t< 341   using PayloadT = io_result_payload_t<
342   awaitable_result_t<Awaitable>>; 342   awaitable_result_t<Awaitable>>;
343   if constexpr (!std::is_same_v<PayloadT, std::tuple<>>) 343   if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
344   { 344   {
345   state->set_result(index, 345   state->set_result(index,
346   extract_io_payload(std::move(result))); 346   extract_io_payload(std::move(result)));
347   } 347   }
348   } 348   }
HITCBC 349   68 } 349   68 }
350   350  
351   /** Create a runner for io_result children that requests stop on ec. */ 351   /** Create a runner for io_result children that requests stop on ec. */
352   template<std::size_t Index, IoAwaitable Awaitable, typename... Ts> 352   template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
353   when_all_runner<when_all_state<Ts...>> 353   when_all_runner<when_all_state<Ts...>>
HITCBC 354   99 make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state) 354   99 make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state)
355   { 355   {
356   auto result = co_await std::move(inner); 356   auto result = co_await std::move(inner);
357   auto ec = result.ec; 357   auto ec = result.ec;
358   std::get<Index>(state->results_).set(std::move(result)); 358   std::get<Index>(state->results_).set(std::move(result));
359   359  
360   if(ec) 360   if(ec)
361   { 361   {
362   state->record_error(ec); 362   state->record_error(ec);
363   state->core_.stop_source_.request_stop(); 363   state->core_.stop_source_.request_stop();
364   } 364   }
HITCBC 365   198 } 365   198 }
366   366  
367   /** Launcher that uses io_result-aware runners. */ 367   /** Launcher that uses io_result-aware runners. */
368   template<IoAwaitable... Awaitables> 368   template<IoAwaitable... Awaitables>
369   class when_all_io_launcher 369   class when_all_io_launcher
370   { 370   {
371   using state_type = when_all_state<awaitable_result_t<Awaitables>...>; 371   using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
372   372  
373   std::tuple<Awaitables...>* awaitables_; 373   std::tuple<Awaitables...>* awaitables_;
374   state_type* state_; 374   state_type* state_;
375   375  
376   public: 376   public:
HITCBC 377   51 when_all_io_launcher( 377   51 when_all_io_launcher(
378   std::tuple<Awaitables...>* awaitables, 378   std::tuple<Awaitables...>* awaitables,
379   state_type* state) 379   state_type* state)
HITCBC 380   51 : awaitables_(awaitables) 380   51 : awaitables_(awaitables)
HITCBC 381   51 , state_(state) 381   51 , state_(state)
382   { 382   {
HITCBC 383   51 } 383   51 }
384   384  
HITCBC 385   51 bool await_ready() const noexcept 385   51 bool await_ready() const noexcept
386   { 386   {
HITCBC 387   51 return sizeof...(Awaitables) == 0; 387   51 return sizeof...(Awaitables) == 0;
388   } 388   }
389   389  
HITCBC 390   51 std::coroutine_handle<> await_suspend( 390   51 std::coroutine_handle<> await_suspend(
391   std::coroutine_handle<> continuation, io_env const* caller_env) 391   std::coroutine_handle<> continuation, io_env const* caller_env)
392   { 392   {
HITCBC 393   51 state_->core_.continuation_.h = continuation; 393   51 state_->core_.continuation_.h = continuation;
HITCBC 394   51 state_->core_.caller_env_ = caller_env; 394   51 state_->core_.caller_env_ = caller_env;
395   395  
HITCBC 396   51 if(caller_env->stop_token.stop_possible()) 396   51 if(caller_env->stop_token.stop_possible())
397   { 397   {
HITCBC 398   4 state_->core_.parent_stop_callback_.emplace( 398   4 state_->core_.parent_stop_callback_.emplace(
HITCBC 399   2 caller_env->stop_token, 399   2 caller_env->stop_token,
HITCBC 400   2 when_all_core::stop_callback_fn{&state_->core_.stop_source_}); 400   2 when_all_core::stop_callback_fn{&state_->core_.stop_source_});
401   401  
HITCBC 402   2 if(caller_env->stop_token.stop_requested()) 402   2 if(caller_env->stop_token.stop_requested())
HITCBC 403   1 state_->core_.stop_source_.request_stop(); 403   1 state_->core_.stop_source_.request_stop();
404   } 404   }
405   405  
HITCBC 406   51 auto token = state_->core_.stop_source_.get_token(); 406   51 auto token = state_->core_.stop_source_.get_token();
HITCBC 407   51 launch_all(std::index_sequence_for<Awaitables...>{}, 407   51 launch_all(std::index_sequence_for<Awaitables...>{},
408   caller_env->executor, token); 408   caller_env->executor, token);
409   409  
HITCBC 410   102 return std::noop_coroutine(); 410   102 return std::noop_coroutine();
HITCBC 411   51 } 411   51 }
412   412  
HITCBC 413   51 void await_resume() const noexcept {} 413   51 void await_resume() const noexcept {}
414   414  
415   private: 415   private:
416   template<std::size_t... Is> 416   template<std::size_t... Is>
HITCBC 417   51 void launch_all(std::index_sequence<Is...>, 417   51 void launch_all(std::index_sequence<Is...>,
418   executor_ref ex, std::stop_token token) 418   executor_ref ex, std::stop_token token)
419   { 419   {
HITCBC 420   51 (..., launch_one<Is>(ex, token)); 420   51 (..., launch_one<Is>(ex, token));
HITCBC 421   51 } 421   51 }
422   422  
423   template<std::size_t I> 423   template<std::size_t I>
HITCBC 424   99 void launch_one(executor_ref caller_ex, std::stop_token token) 424   99 void launch_one(executor_ref caller_ex, std::stop_token token)
425   { 425   {
HITCBC 426   99 auto runner = make_when_all_io_runner<I>( 426   99 auto runner = make_when_all_io_runner<I>(
HITCBC 427   99 std::move(std::get<I>(*awaitables_)), state_); 427   99 std::move(std::get<I>(*awaitables_)), state_);
428   428  
HITCBC 429   99 auto h = runner.release(); 429   99 auto h = runner.release();
HITCBC 430   99 h.promise().state_ = state_; 430   99 h.promise().state_ = state_;
HITCBC 431   99 h.promise().env_ = io_env{caller_ex, token, 431   99 h.promise().env_ = io_env{caller_ex, token,
HITCBC 432   99 state_->core_.caller_env_->frame_allocator}; 432   99 state_->core_.caller_env_->frame_allocator};
433   433  
HITCBC 434   99 state_->runner_handles_[I].h = std::coroutine_handle<>{h}; 434   99 state_->runner_handles_[I].h = std::coroutine_handle<>{h};
HITCBC 435   99 state_->core_.caller_env_->executor.post(state_->runner_handles_[I]); 435   99 state_->core_.caller_env_->executor.post(state_->runner_handles_[I]);
HITCBC 436   198 } 436   198 }
437   }; 437   };
438   438  
439   /** Helper to extract a single result from state. 439   /** Helper to extract a single result from state.
440   This is a separate function to work around a GCC-11 ICE that occurs 440   This is a separate function to work around a GCC-11 ICE that occurs
441   when using nested immediately-invoked lambdas with pack expansion. 441   when using nested immediately-invoked lambdas with pack expansion.
442   */ 442   */
443   template<std::size_t I, typename... Ts> 443   template<std::size_t I, typename... Ts>
HITCBC 444   71 auto extract_single_result(when_all_state<Ts...>& state) 444   71 auto extract_single_result(when_all_state<Ts...>& state)
445   { 445   {
HITCBC 446   71 return std::move(std::get<I>(state.results_)).get(); 446   71 return std::move(std::get<I>(state.results_)).get();
447   } 447   }
448   448  
449   /** Extract all results from state as a tuple. 449   /** Extract all results from state as a tuple.
450   */ 450   */
451   template<typename... Ts> 451   template<typename... Ts>
HITCBC 452   37 auto extract_results(when_all_state<Ts...>& state) 452   37 auto extract_results(when_all_state<Ts...>& state)
453   { 453   {
HITCBC 454   57 return [&]<std::size_t... Is>(std::index_sequence<Is...>) { 454   57 return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
HITCBC 455   37 return std::tuple(extract_single_result<Is>(state)...); 455   37 return std::tuple(extract_single_result<Is>(state)...);
HITCBC 456   74 }(std::index_sequence_for<Ts...>{}); 456   74 }(std::index_sequence_for<Ts...>{});
457   } 457   }
458   458  
459   /** Launches all homogeneous runners concurrently. 459   /** Launches all homogeneous runners concurrently.
460   460  
461   Two-phase approach: create all runners first, then post all. 461   Two-phase approach: create all runners first, then post all.
462   This avoids lifetime issues if a task completes synchronously. 462   This avoids lifetime issues if a task completes synchronously.
463   */ 463   */
464   template<typename Range> 464   template<typename Range>
465   class when_all_homogeneous_launcher 465   class when_all_homogeneous_launcher
466   { 466   {
467   using Awaitable = std::ranges::range_value_t<Range>; 467   using Awaitable = std::ranges::range_value_t<Range>;
468   using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>; 468   using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
469   469  
470   Range* range_; 470   Range* range_;
471   when_all_homogeneous_state<PayloadT>* state_; 471   when_all_homogeneous_state<PayloadT>* state_;
472   472  
473   public: 473   public:
HITCBC 474   15 when_all_homogeneous_launcher( 474   15 when_all_homogeneous_launcher(
475   Range* range, 475   Range* range,
476   when_all_homogeneous_state<PayloadT>* state) 476   when_all_homogeneous_state<PayloadT>* state)
HITCBC 477   15 : range_(range) 477   15 : range_(range)
HITCBC 478   15 , state_(state) 478   15 , state_(state)
479   { 479   {
HITCBC 480   15 } 480   15 }
481   481  
HITCBC 482   15 bool await_ready() const noexcept 482   15 bool await_ready() const noexcept
483   { 483   {
HITCBC 484   15 return std::ranges::empty(*range_); 484   15 return std::ranges::empty(*range_);
485   } 485   }
486   486  
HITCBC 487   15 std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env) 487   15 std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
488   { 488   {
HITCBC 489   15 state_->core_.continuation_.h = continuation; 489   15 state_->core_.continuation_.h = continuation;
HITCBC 490   15 state_->core_.caller_env_ = caller_env; 490   15 state_->core_.caller_env_ = caller_env;
491   491  
HITCBC 492   15 if(caller_env->stop_token.stop_possible()) 492   15 if(caller_env->stop_token.stop_possible())
493   { 493   {
HITCBC 494   4 state_->core_.parent_stop_callback_.emplace( 494   4 state_->core_.parent_stop_callback_.emplace(
HITCBC 495   2 caller_env->stop_token, 495   2 caller_env->stop_token,
HITCBC 496   2 when_all_core::stop_callback_fn{&state_->core_.stop_source_}); 496   2 when_all_core::stop_callback_fn{&state_->core_.stop_source_});
497   497  
HITCBC 498   2 if(caller_env->stop_token.stop_requested()) 498   2 if(caller_env->stop_token.stop_requested())
HITCBC 499   1 state_->core_.stop_source_.request_stop(); 499   1 state_->core_.stop_source_.request_stop();
500   } 500   }
501   501  
HITCBC 502   15 auto token = state_->core_.stop_source_.get_token(); 502   15 auto token = state_->core_.stop_source_.get_token();
503   503  
504   // Phase 1: Create all runners without dispatching. 504   // Phase 1: Create all runners without dispatching.
HITCBC 505   15 std::size_t index = 0; 505   15 std::size_t index = 0;
HITCBC 506   49 for(auto&& a : *range_) 506   49 for(auto&& a : *range_)
507   { 507   {
HITCBC 508   34 auto runner = make_when_all_homogeneous_runner( 508   34 auto runner = make_when_all_homogeneous_runner(
HITCBC 509   34 std::move(a), state_, index); 509   34 std::move(a), state_, index);
510   510  
HITCBC 511   34 auto h = runner.release(); 511   34 auto h = runner.release();
HITCBC 512   34 h.promise().state_ = state_; 512   34 h.promise().state_ = state_;
HITCBC 513   34 h.promise().index_ = index; 513   34 h.promise().index_ = index;
HITCBC 514   34 h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator}; 514   34 h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
515   515  
HITCBC 516   34 state_->runner_handles_[index].h = std::coroutine_handle<>{h}; 516   34 state_->runner_handles_[index].h = std::coroutine_handle<>{h};
HITCBC 517   34 ++index; 517   34 ++index;
518   } 518   }
519   519  
520   // Phase 2: Post all runners. Any may complete synchronously. 520   // Phase 2: Post all runners. Any may complete synchronously.
521   // After last post, state_ and this may be destroyed. 521   // After last post, state_ and this may be destroyed.
HITCBC 522   15 auto* handles = state_->runner_handles_.get(); 522   15 auto* handles = state_->runner_handles_.get();
HITCBC 523   15 std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed); 523   15 std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed);
HITCBC 524   49 for(std::size_t i = 0; i < count; ++i) 524   49 for(std::size_t i = 0; i < count; ++i)
HITCBC 525   34 caller_env->executor.post(handles[i]); 525   34 caller_env->executor.post(handles[i]);
526   526  
HITCBC 527   30 return std::noop_coroutine(); 527   30 return std::noop_coroutine();
HITCBC 528   49 } 528   49 }
529   529  
HITCBC 530   15 void await_resume() const noexcept 530   15 void await_resume() const noexcept
531   { 531   {
HITCBC 532   15 } 532   15 }
533   }; 533   };
534   534  
535   } // namespace detail 535   } // namespace detail
536   536  
537   /** Execute a range of io_result-returning awaitables concurrently. 537   /** Execute a range of io_result-returning awaitables concurrently.
538   538  
539   Launches all awaitables simultaneously and waits for all to complete. 539   Launches all awaitables simultaneously and waits for all to complete.
540   On success, extracted payloads are collected in a vector preserving 540   On success, extracted payloads are collected in a vector preserving
541   input order. The first error_code cancels siblings and is propagated 541   input order. The first error_code cancels siblings and is propagated
542   in the outer io_result. Exceptions always beat error codes. 542   in the outer io_result. Exceptions always beat error codes.
543   543  
544   @li All child awaitables run concurrently on the caller's executor 544   @li All child awaitables run concurrently on the caller's executor
545   @li Payloads are returned as a vector in input order 545   @li Payloads are returned as a vector in input order
546   @li First error_code wins and cancels siblings 546   @li First error_code wins and cancels siblings
547   @li Exception always beats error_code 547   @li Exception always beats error_code
548   @li Completes only after all children have finished 548   @li Completes only after all children have finished
549   549  
550   @par Thread Safety 550   @par Thread Safety
551   The returned task must be awaited from a single execution context. 551   The returned task must be awaited from a single execution context.
552   Child awaitables execute concurrently but complete through the caller's 552   Child awaitables execute concurrently but complete through the caller's
553   executor. 553   executor.
554   554  
555   @param awaitables Range of io_result-returning awaitables to execute 555   @param awaitables Range of io_result-returning awaitables to execute
556   concurrently (must not be empty). 556   concurrently (must not be empty).
557   557  
558   @return A task yielding io_result<vector<PayloadT>> where PayloadT 558   @return A task yielding io_result<vector<PayloadT>> where PayloadT
559   is the payload extracted from each child's io_result. 559   is the payload extracted from each child's io_result.
560   560  
561   @throws std::invalid_argument if range is empty (thrown before 561   @throws std::invalid_argument if range is empty (thrown before
562   coroutine suspends). 562   coroutine suspends).
563   @throws Rethrows the first child exception after all children 563   @throws Rethrows the first child exception after all children
564   complete (exception beats error_code). 564   complete (exception beats error_code).
565   565  
566   @par Example 566   @par Example
567   @code 567   @code
568   task<void> example() 568   task<void> example()
569   { 569   {
570   std::vector<io_task<size_t>> reads; 570   std::vector<io_task<size_t>> reads;
571   for (auto& buf : buffers) 571   for (auto& buf : buffers)
572   reads.push_back(stream.read_some(buf)); 572   reads.push_back(stream.read_some(buf));
573   573  
574   auto [ec, counts] = co_await when_all(std::move(reads)); 574   auto [ec, counts] = co_await when_all(std::move(reads));
575   if (ec) { // handle error 575   if (ec) { // handle error
576   } 576   }
577   } 577   }
578   @endcode 578   @endcode
579   579  
580   @see IoAwaitableRange, when_all 580   @see IoAwaitableRange, when_all
581   */ 581   */
582   template<IoAwaitableRange R> 582   template<IoAwaitableRange R>
583   requires detail::is_io_result_v< 583   requires detail::is_io_result_v<
584   awaitable_result_t<std::ranges::range_value_t<R>>> 584   awaitable_result_t<std::ranges::range_value_t<R>>>
585   && (!std::is_same_v< 585   && (!std::is_same_v<
586   detail::io_result_payload_t< 586   detail::io_result_payload_t<
587   awaitable_result_t<std::ranges::range_value_t<R>>>, 587   awaitable_result_t<std::ranges::range_value_t<R>>>,
588   std::tuple<>>) 588   std::tuple<>>)
HITCBC 589   13 [[nodiscard]] auto when_all(R&& awaitables) 589   13 [[nodiscard]] auto when_all(R&& awaitables)
590   -> task<io_result<std::vector< 590   -> task<io_result<std::vector<
591   detail::io_result_payload_t< 591   detail::io_result_payload_t<
592   awaitable_result_t<std::ranges::range_value_t<R>>>>>> 592   awaitable_result_t<std::ranges::range_value_t<R>>>>>>
593   { 593   {
594   using Awaitable = std::ranges::range_value_t<R>; 594   using Awaitable = std::ranges::range_value_t<R>;
595   using PayloadT = detail::io_result_payload_t< 595   using PayloadT = detail::io_result_payload_t<
596   awaitable_result_t<Awaitable>>; 596   awaitable_result_t<Awaitable>>;
597   using OwnedRange = std::remove_cvref_t<R>; 597   using OwnedRange = std::remove_cvref_t<R>;
598   598  
599   auto count = std::ranges::size(awaitables); 599   auto count = std::ranges::size(awaitables);
600   if(count == 0) 600   if(count == 0)
601   throw std::invalid_argument("when_all requires at least one awaitable"); 601   throw std::invalid_argument("when_all requires at least one awaitable");
602   602  
603   OwnedRange owned_awaitables = std::forward<R>(awaitables); 603   OwnedRange owned_awaitables = std::forward<R>(awaitables);
604   604  
605   detail::when_all_homogeneous_state<PayloadT> state(count); 605   detail::when_all_homogeneous_state<PayloadT> state(count);
606   606  
607   co_await detail::when_all_homogeneous_launcher<OwnedRange>( 607   co_await detail::when_all_homogeneous_launcher<OwnedRange>(
608   &owned_awaitables, &state); 608   &owned_awaitables, &state);
609   609  
610   if(state.core_.first_exception_) 610   if(state.core_.first_exception_)
611   std::rethrow_exception(state.core_.first_exception_); 611   std::rethrow_exception(state.core_.first_exception_);
612   612  
613   if(state.has_error_.load(std::memory_order_relaxed)) 613   if(state.has_error_.load(std::memory_order_relaxed))
614   co_return io_result<std::vector<PayloadT>>{state.first_error_, {}}; 614   co_return io_result<std::vector<PayloadT>>{state.first_error_, {}};
615   615  
616   std::vector<PayloadT> results; 616   std::vector<PayloadT> results;
617   results.reserve(count); 617   results.reserve(count);
618   for(auto& opt : state.results_) 618   for(auto& opt : state.results_)
619   results.push_back(std::move(*opt)); 619   results.push_back(std::move(*opt));
620   620  
621   co_return io_result<std::vector<PayloadT>>{{}, std::move(results)}; 621   co_return io_result<std::vector<PayloadT>>{{}, std::move(results)};
HITCBC 622   26 } 622   26 }
623   623  
624   /** Execute a range of void io_result-returning awaitables concurrently. 624   /** Execute a range of void io_result-returning awaitables concurrently.
625   625  
626   Launches all awaitables simultaneously and waits for all to complete. 626   Launches all awaitables simultaneously and waits for all to complete.
627   Since all awaitables return io_result<>, no payload values are 627   Since all awaitables return io_result<>, no payload values are
628   collected. The first error_code cancels siblings and is propagated. 628   collected. The first error_code cancels siblings and is propagated.
629   Exceptions always beat error codes. 629   Exceptions always beat error codes.
630   630  
631   @param awaitables Range of io_result<>-returning awaitables to 631   @param awaitables Range of io_result<>-returning awaitables to
632   execute concurrently (must not be empty). 632   execute concurrently (must not be empty).
633   633  
634   @return A task yielding io_result<> whose ec is the first child 634   @return A task yielding io_result<> whose ec is the first child
635   error, or default-constructed on success. 635   error, or default-constructed on success.
636   636  
637   @throws std::invalid_argument if range is empty. 637   @throws std::invalid_argument if range is empty.
638   @throws Rethrows the first child exception after all children 638   @throws Rethrows the first child exception after all children
639   complete (exception beats error_code). 639   complete (exception beats error_code).
640   640  
641   @par Example 641   @par Example
642   @code 642   @code
643   task<void> example() 643   task<void> example()
644   { 644   {
645   std::vector<io_task<>> jobs; 645   std::vector<io_task<>> jobs;
646   for (int i = 0; i < n; ++i) 646   for (int i = 0; i < n; ++i)
647   jobs.push_back(process(i)); 647   jobs.push_back(process(i));
648   648  
649   auto [ec] = co_await when_all(std::move(jobs)); 649   auto [ec] = co_await when_all(std::move(jobs));
650   } 650   }
651   @endcode 651   @endcode
652   652  
653   @see IoAwaitableRange, when_all 653   @see IoAwaitableRange, when_all
654   */ 654   */
655   template<IoAwaitableRange R> 655   template<IoAwaitableRange R>
656   requires detail::is_io_result_v< 656   requires detail::is_io_result_v<
657   awaitable_result_t<std::ranges::range_value_t<R>>> 657   awaitable_result_t<std::ranges::range_value_t<R>>>
658   && std::is_same_v< 658   && std::is_same_v<
659   detail::io_result_payload_t< 659   detail::io_result_payload_t<
660   awaitable_result_t<std::ranges::range_value_t<R>>>, 660   awaitable_result_t<std::ranges::range_value_t<R>>>,
661   std::tuple<>> 661   std::tuple<>>
HITCBC 662   4 [[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>> 662   4 [[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>>
663   { 663   {
664   using OwnedRange = std::remove_cvref_t<R>; 664   using OwnedRange = std::remove_cvref_t<R>;
665   665  
666   auto count = std::ranges::size(awaitables); 666   auto count = std::ranges::size(awaitables);
667   if(count == 0) 667   if(count == 0)
668   throw std::invalid_argument("when_all requires at least one awaitable"); 668   throw std::invalid_argument("when_all requires at least one awaitable");
669   669  
670   OwnedRange owned_awaitables = std::forward<R>(awaitables); 670   OwnedRange owned_awaitables = std::forward<R>(awaitables);
671   671  
672   detail::when_all_homogeneous_state<std::tuple<>> state(count); 672   detail::when_all_homogeneous_state<std::tuple<>> state(count);
673   673  
674   co_await detail::when_all_homogeneous_launcher<OwnedRange>( 674   co_await detail::when_all_homogeneous_launcher<OwnedRange>(
675   &owned_awaitables, &state); 675   &owned_awaitables, &state);
676   676  
677   if(state.core_.first_exception_) 677   if(state.core_.first_exception_)
678   std::rethrow_exception(state.core_.first_exception_); 678   std::rethrow_exception(state.core_.first_exception_);
679   679  
680   if(state.has_error_.load(std::memory_order_relaxed)) 680   if(state.has_error_.load(std::memory_order_relaxed))
681   co_return io_result<>{state.first_error_}; 681   co_return io_result<>{state.first_error_};
682   682  
683   co_return io_result<>{}; 683   co_return io_result<>{};
HITCBC 684   8 } 684   8 }
685   685  
686   /** Execute io_result-returning awaitables concurrently, inspecting error codes. 686   /** Execute io_result-returning awaitables concurrently, inspecting error codes.
687   687  
688   Overload selected when all children return io_result<Ts...>. 688   Overload selected when all children return io_result<Ts...>.
689   The error_code is lifted out of each child into a single outer 689   The error_code is lifted out of each child into a single outer
690   io_result. On success all values are returned; on failure the 690   io_result. On success all values are returned; on failure the
691   first error_code wins. 691   first error_code wins.
692   692  
693   @par Exception Safety 693   @par Exception Safety
694   Exception always beats error_code. If any child throws, the 694   Exception always beats error_code. If any child throws, the
695   exception is rethrown regardless of error_code results. 695   exception is rethrown regardless of error_code results.
696   696  
697   @param awaitables One or more awaitables each returning 697   @param awaitables One or more awaitables each returning
698   io_result<Ts...>. 698   io_result<Ts...>.
699   699  
700   @return A task yielding io_result<R1, R2, ..., Rn> where each Ri 700   @return A task yielding io_result<R1, R2, ..., Rn> where each Ri
701   follows the payload flattening rules. 701   follows the payload flattening rules.
702   */ 702   */
703   template<IoAwaitable... As> 703   template<IoAwaitable... As>
704   requires (sizeof...(As) > 0) 704   requires (sizeof...(As) > 0)
705   && detail::all_io_result_awaitables<As...> 705   && detail::all_io_result_awaitables<As...>
HITCBC 706   51 [[nodiscard]] auto when_all(As... awaitables) 706   51 [[nodiscard]] auto when_all(As... awaitables)
707   -> task<io_result< 707   -> task<io_result<
708   detail::io_result_payload_t<awaitable_result_t<As>>...>> 708   detail::io_result_payload_t<awaitable_result_t<As>>...>>
709   { 709   {
710   using result_type = io_result< 710   using result_type = io_result<
711   detail::io_result_payload_t<awaitable_result_t<As>>...>; 711   detail::io_result_payload_t<awaitable_result_t<As>>...>;
712   712  
713   detail::when_all_state<awaitable_result_t<As>...> state; 713   detail::when_all_state<awaitable_result_t<As>...> state;
714   std::tuple<As...> awaitable_tuple(std::move(awaitables)...); 714   std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
715   715  
716   co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state); 716   co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state);
717   717  
718   // Exception always wins over error_code 718   // Exception always wins over error_code
719   if(state.core_.first_exception_) 719   if(state.core_.first_exception_)
720   std::rethrow_exception(state.core_.first_exception_); 720   std::rethrow_exception(state.core_.first_exception_);
721   721  
722   auto r = detail::build_when_all_io_result<result_type>( 722   auto r = detail::build_when_all_io_result<result_type>(
723   detail::extract_results(state)); 723   detail::extract_results(state));
724   if(state.has_error_.load(std::memory_order_relaxed)) 724   if(state.has_error_.load(std::memory_order_relaxed))
725   r.ec = state.first_error_; 725   r.ec = state.first_error_;
726   co_return r; 726   co_return r;
HITCBC 727   102 } 727   102 }
728   728  
729   } // namespace capy 729   } // namespace capy
730   } // namespace boost 730   } // namespace boost
731   731  
732   #endif 732   #endif