TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_WHEN_ALL_HPP
11 : #define BOOST_CAPY_WHEN_ALL_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/io_result_combinators.hpp>
15 : #include <boost/capy/continuation.hpp>
16 : #include <boost/capy/concept/executor.hpp>
17 : #include <boost/capy/concept/io_awaitable.hpp>
18 : #include <coroutine>
19 : #include <boost/capy/ex/frame_alloc_mixin.hpp>
20 : #include <boost/capy/ex/io_env.hpp>
21 : #include <boost/capy/ex/frame_allocator.hpp>
22 : #include <boost/capy/task.hpp>
23 :
24 : #include <array>
25 : #include <atomic>
26 : #include <exception>
27 : #include <memory>
28 : #include <optional>
29 : #include <ranges>
30 : #include <stdexcept>
31 : #include <stop_token>
32 : #include <tuple>
33 : #include <type_traits>
34 : #include <utility>
35 : #include <vector>
36 :
37 : namespace boost {
38 : namespace capy {
39 :
40 : namespace detail {
41 :
42 : /** Holds the result of a single task within when_all.
43 : */
44 : template<typename T>
45 : struct result_holder
46 : {
47 : std::optional<T> value_;
48 :
49 HIT 83 : void set(T v)
50 : {
51 83 : value_ = std::move(v);
52 83 : }
53 :
54 71 : T get() &&
55 : {
56 71 : return std::move(*value_);
57 : }
58 : };
59 :
60 : /** Core shared state for when_all operations.
61 :
62 : Contains all members and methods common to both heterogeneous (variadic)
63 : and homogeneous (range) when_all implementations. State classes embed
64 : this via composition to avoid CRTP destructor ordering issues.
65 :
66 : @par Thread Safety
67 : Atomic operations protect exception capture and completion count.
68 : */
69 : struct when_all_core
70 : {
71 : std::atomic<std::size_t> remaining_count_;
72 :
73 : // Exception storage - first error wins, others discarded
74 : std::atomic<bool> has_exception_{false};
75 : std::exception_ptr first_exception_;
76 :
77 : std::stop_source stop_source_;
78 :
79 : // Bridges parent's stop token to our stop_source
80 : struct stop_callback_fn
81 : {
82 : std::stop_source* source_;
83 4 : void operator()() const { source_->request_stop(); }
84 : };
85 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
86 : std::optional<stop_callback_t> parent_stop_callback_;
87 :
88 : continuation continuation_;
89 : io_env const* caller_env_ = nullptr;
90 :
91 75 : explicit when_all_core(std::size_t count) noexcept
92 75 : : remaining_count_(count)
93 : {
94 75 : }
95 :
96 : /** Capture an exception (first one wins). */
97 19 : void capture_exception(std::exception_ptr ep)
98 : {
99 19 : bool expected = false;
100 19 : if(has_exception_.compare_exchange_strong(
101 : expected, true, std::memory_order_relaxed))
102 17 : first_exception_ = ep;
103 19 : }
104 : };
105 :
106 : /** Shared state for heterogeneous when_all (variadic overload).
107 :
108 : @tparam Ts The result types of the tasks.
109 : */
110 : template<typename... Ts>
111 : struct when_all_state
112 : {
113 : static constexpr std::size_t task_count = sizeof...(Ts);
114 :
115 : when_all_core core_;
116 : std::tuple<result_holder<Ts>...> results_;
117 : std::array<continuation, task_count> runner_handles_{};
118 :
119 : std::atomic<bool> has_error_{false};
120 : std::error_code first_error_;
121 :
122 51 : when_all_state()
123 51 : : core_(task_count)
124 : {
125 51 : }
126 :
127 : /** Record the first error (subsequent errors are discarded). */
128 43 : void record_error(std::error_code ec)
129 : {
130 43 : bool expected = false;
131 43 : if(has_error_.compare_exchange_strong(
132 : expected, true, std::memory_order_relaxed))
133 29 : first_error_ = ec;
134 43 : }
135 : };
136 :
137 : /** Shared state for homogeneous when_all (range overload).
138 :
139 : Stores extracted io_result payloads in a vector indexed by task
140 : position. Tracks the first error_code for error propagation.
141 :
142 : @tparam T The payload type extracted from io_result.
143 : */
144 : template<typename T>
145 : struct when_all_homogeneous_state
146 : {
147 : when_all_core core_;
148 : std::vector<std::optional<T>> results_;
149 : std::unique_ptr<continuation[]> runner_handles_;
150 :
151 : std::atomic<bool> has_error_{false};
152 : std::error_code first_error_;
153 :
154 12 : explicit when_all_homogeneous_state(std::size_t count)
155 12 : : core_(count)
156 24 : , results_(count)
157 12 : , runner_handles_(std::make_unique<continuation[]>(count))
158 : {
159 12 : }
160 :
161 18 : void set_result(std::size_t index, T value)
162 : {
163 18 : results_[index].emplace(std::move(value));
164 18 : }
165 :
166 : /** Record the first error (subsequent errors are discarded). */
167 7 : void record_error(std::error_code ec)
168 : {
169 7 : bool expected = false;
170 7 : if(has_error_.compare_exchange_strong(
171 : expected, true, std::memory_order_relaxed))
172 5 : first_error_ = ec;
173 7 : }
174 : };
175 :
176 : /** Specialization for void io_result children (no payload storage). */
177 : template<>
178 : struct when_all_homogeneous_state<std::tuple<>>
179 : {
180 : when_all_core core_;
181 : std::unique_ptr<continuation[]> runner_handles_;
182 :
183 : std::atomic<bool> has_error_{false};
184 : std::error_code first_error_;
185 :
186 3 : explicit when_all_homogeneous_state(std::size_t count)
187 3 : : core_(count)
188 3 : , runner_handles_(std::make_unique<continuation[]>(count))
189 : {
190 3 : }
191 :
192 : /** Record the first error (subsequent errors are discarded). */
193 1 : void record_error(std::error_code ec)
194 : {
195 1 : bool expected = false;
196 1 : if(has_error_.compare_exchange_strong(
197 : expected, true, std::memory_order_relaxed))
198 1 : first_error_ = ec;
199 1 : }
200 : };
201 :
202 : /** Wrapper coroutine that intercepts task completion for when_all.
203 :
204 : Parameterized on StateType to work with both heterogeneous (variadic)
205 : and homogeneous (range) state types. All state types expose their
206 : shared members through a `core_` member of type when_all_core.
207 :
208 : @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
209 : */
210 : template<typename StateType>
211 : struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE when_all_runner
212 : {
213 : struct promise_type
214 : : frame_alloc_mixin
215 : {
216 : StateType* state_ = nullptr;
217 : std::size_t index_ = 0;
218 : io_env env_;
219 :
220 151 : when_all_runner get_return_object() noexcept
221 : {
222 : return when_all_runner(
223 151 : std::coroutine_handle<promise_type>::from_promise(*this));
224 : }
225 :
226 151 : std::suspend_always initial_suspend() noexcept
227 : {
228 151 : return {};
229 : }
230 :
231 151 : auto final_suspend() noexcept
232 : {
233 : struct awaiter
234 : {
235 : promise_type* p_;
236 151 : bool await_ready() const noexcept { return false; }
237 151 : auto await_suspend(std::coroutine_handle<> h) noexcept
238 : {
239 151 : auto& core = p_->state_->core_;
240 151 : auto* counter = &core.remaining_count_;
241 151 : auto* caller_env = core.caller_env_;
242 151 : auto& cont = core.continuation_;
243 :
244 151 : h.destroy();
245 :
246 151 : auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
247 151 : if(remaining == 1)
248 75 : return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
249 76 : return detail::symmetric_transfer(std::noop_coroutine());
250 : }
251 : void await_resume() const noexcept {} // LCOV_EXCL_LINE final_suspend awaiter, never resumed
252 : };
253 151 : return awaiter{this};
254 : }
255 :
256 132 : void return_void() noexcept {}
257 :
258 19 : void unhandled_exception() noexcept
259 : {
260 19 : state_->core_.capture_exception(std::current_exception());
261 19 : state_->core_.stop_source_.request_stop();
262 19 : }
263 :
264 : template<class Awaitable>
265 : struct transform_awaiter
266 : {
267 : std::decay_t<Awaitable> a_;
268 : promise_type* p_;
269 :
270 151 : bool await_ready() { return a_.await_ready(); }
271 151 : decltype(auto) await_resume() { return a_.await_resume(); }
272 :
273 : template<class Promise>
274 150 : auto await_suspend(std::coroutine_handle<Promise> h)
275 : {
276 : using R = decltype(a_.await_suspend(h, &p_->env_));
277 : if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
278 150 : return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
279 : else
280 : return a_.await_suspend(h, &p_->env_);
281 : }
282 : };
283 :
284 : template<class Awaitable>
285 151 : auto await_transform(Awaitable&& a)
286 : {
287 : using A = std::decay_t<Awaitable>;
288 : if constexpr (IoAwaitable<A>)
289 : {
290 : return transform_awaiter<Awaitable>{
291 302 : std::forward<Awaitable>(a), this};
292 : }
293 : else
294 : {
295 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
296 : }
297 151 : }
298 : };
299 :
300 : std::coroutine_handle<promise_type> h_;
301 :
302 151 : explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
303 151 : : h_(h)
304 : {
305 151 : }
306 :
307 : // Enable move for all clang versions - some versions need it
308 : when_all_runner(when_all_runner&& other) noexcept
309 : : h_(std::exchange(other.h_, nullptr))
310 : {
311 : }
312 :
313 : when_all_runner(when_all_runner const&) = delete;
314 : when_all_runner& operator=(when_all_runner const&) = delete;
315 : when_all_runner& operator=(when_all_runner&&) = delete;
316 :
317 151 : auto release() noexcept
318 : {
319 151 : return std::exchange(h_, nullptr);
320 : }
321 : };
322 :
323 : /** Create an io_result-aware runner for a single awaitable (range path).
324 :
325 : Checks the error code, records errors and requests stop on failure,
326 : or extracts the payload on success.
327 : */
328 : template<IoAwaitable Awaitable, typename StateType>
329 : when_all_runner<StateType>
330 34 : make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
331 : {
332 : auto result = co_await std::move(inner);
333 :
334 : if(result.ec)
335 : {
336 : state->record_error(result.ec);
337 : state->core_.stop_source_.request_stop();
338 : }
339 : else
340 : {
341 : using PayloadT = io_result_payload_t<
342 : awaitable_result_t<Awaitable>>;
343 : if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
344 : {
345 : state->set_result(index,
346 : extract_io_payload(std::move(result)));
347 : }
348 : }
349 68 : }
350 :
351 : /** Create a runner for io_result children that requests stop on ec. */
352 : template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
353 : when_all_runner<when_all_state<Ts...>>
354 99 : make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state)
355 : {
356 : auto result = co_await std::move(inner);
357 : auto ec = result.ec;
358 : std::get<Index>(state->results_).set(std::move(result));
359 :
360 : if(ec)
361 : {
362 : state->record_error(ec);
363 : state->core_.stop_source_.request_stop();
364 : }
365 198 : }
366 :
367 : /** Launcher that uses io_result-aware runners. */
368 : template<IoAwaitable... Awaitables>
369 : class when_all_io_launcher
370 : {
371 : using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
372 :
373 : std::tuple<Awaitables...>* awaitables_;
374 : state_type* state_;
375 :
376 : public:
377 51 : when_all_io_launcher(
378 : std::tuple<Awaitables...>* awaitables,
379 : state_type* state)
380 51 : : awaitables_(awaitables)
381 51 : , state_(state)
382 : {
383 51 : }
384 :
385 51 : bool await_ready() const noexcept
386 : {
387 51 : return sizeof...(Awaitables) == 0;
388 : }
389 :
390 51 : std::coroutine_handle<> await_suspend(
391 : std::coroutine_handle<> continuation, io_env const* caller_env)
392 : {
393 51 : state_->core_.continuation_.h = continuation;
394 51 : state_->core_.caller_env_ = caller_env;
395 :
396 51 : if(caller_env->stop_token.stop_possible())
397 : {
398 4 : state_->core_.parent_stop_callback_.emplace(
399 2 : caller_env->stop_token,
400 2 : when_all_core::stop_callback_fn{&state_->core_.stop_source_});
401 :
402 2 : if(caller_env->stop_token.stop_requested())
403 1 : state_->core_.stop_source_.request_stop();
404 : }
405 :
406 51 : auto token = state_->core_.stop_source_.get_token();
407 51 : launch_all(std::index_sequence_for<Awaitables...>{},
408 : caller_env->executor, token);
409 :
410 102 : return std::noop_coroutine();
411 51 : }
412 :
413 51 : void await_resume() const noexcept {}
414 :
415 : private:
416 : template<std::size_t... Is>
417 51 : void launch_all(std::index_sequence<Is...>,
418 : executor_ref ex, std::stop_token token)
419 : {
420 51 : (..., launch_one<Is>(ex, token));
421 51 : }
422 :
423 : template<std::size_t I>
424 99 : void launch_one(executor_ref caller_ex, std::stop_token token)
425 : {
426 99 : auto runner = make_when_all_io_runner<I>(
427 99 : std::move(std::get<I>(*awaitables_)), state_);
428 :
429 99 : auto h = runner.release();
430 99 : h.promise().state_ = state_;
431 99 : h.promise().env_ = io_env{caller_ex, token,
432 99 : state_->core_.caller_env_->frame_allocator};
433 :
434 99 : state_->runner_handles_[I].h = std::coroutine_handle<>{h};
435 99 : state_->core_.caller_env_->executor.post(state_->runner_handles_[I]);
436 198 : }
437 : };
438 :
439 : /** Helper to extract a single result from state.
440 : This is a separate function to work around a GCC-11 ICE that occurs
441 : when using nested immediately-invoked lambdas with pack expansion.
442 : */
443 : template<std::size_t I, typename... Ts>
444 71 : auto extract_single_result(when_all_state<Ts...>& state)
445 : {
446 71 : return std::move(std::get<I>(state.results_)).get();
447 : }
448 :
449 : /** Extract all results from state as a tuple.
450 : */
451 : template<typename... Ts>
452 37 : auto extract_results(when_all_state<Ts...>& state)
453 : {
454 57 : return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
455 37 : return std::tuple(extract_single_result<Is>(state)...);
456 74 : }(std::index_sequence_for<Ts...>{});
457 : }
458 :
459 : /** Launches all homogeneous runners concurrently.
460 :
461 : Two-phase approach: create all runners first, then post all.
462 : This avoids lifetime issues if a task completes synchronously.
463 : */
464 : template<typename Range>
465 : class when_all_homogeneous_launcher
466 : {
467 : using Awaitable = std::ranges::range_value_t<Range>;
468 : using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
469 :
470 : Range* range_;
471 : when_all_homogeneous_state<PayloadT>* state_;
472 :
473 : public:
474 15 : when_all_homogeneous_launcher(
475 : Range* range,
476 : when_all_homogeneous_state<PayloadT>* state)
477 15 : : range_(range)
478 15 : , state_(state)
479 : {
480 15 : }
481 :
482 15 : bool await_ready() const noexcept
483 : {
484 15 : return std::ranges::empty(*range_);
485 : }
486 :
487 15 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
488 : {
489 15 : state_->core_.continuation_.h = continuation;
490 15 : state_->core_.caller_env_ = caller_env;
491 :
492 15 : if(caller_env->stop_token.stop_possible())
493 : {
494 4 : state_->core_.parent_stop_callback_.emplace(
495 2 : caller_env->stop_token,
496 2 : when_all_core::stop_callback_fn{&state_->core_.stop_source_});
497 :
498 2 : if(caller_env->stop_token.stop_requested())
499 1 : state_->core_.stop_source_.request_stop();
500 : }
501 :
502 15 : auto token = state_->core_.stop_source_.get_token();
503 :
504 : // Phase 1: Create all runners without dispatching.
505 15 : std::size_t index = 0;
506 49 : for(auto&& a : *range_)
507 : {
508 34 : auto runner = make_when_all_homogeneous_runner(
509 34 : std::move(a), state_, index);
510 :
511 34 : auto h = runner.release();
512 34 : h.promise().state_ = state_;
513 34 : h.promise().index_ = index;
514 34 : h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
515 :
516 34 : state_->runner_handles_[index].h = std::coroutine_handle<>{h};
517 34 : ++index;
518 : }
519 :
520 : // Phase 2: Post all runners. Any may complete synchronously.
521 : // After last post, state_ and this may be destroyed.
522 15 : auto* handles = state_->runner_handles_.get();
523 15 : std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed);
524 49 : for(std::size_t i = 0; i < count; ++i)
525 34 : caller_env->executor.post(handles[i]);
526 :
527 30 : return std::noop_coroutine();
528 49 : }
529 :
530 15 : void await_resume() const noexcept
531 : {
532 15 : }
533 : };
534 :
535 : } // namespace detail
536 :
537 : /** Execute a range of io_result-returning awaitables concurrently.
538 :
539 : Launches all awaitables simultaneously and waits for all to complete.
540 : On success, extracted payloads are collected in a vector preserving
541 : input order. The first error_code cancels siblings and is propagated
542 : in the outer io_result. Exceptions always beat error codes.
543 :
544 : @li All child awaitables run concurrently on the caller's executor
545 : @li Payloads are returned as a vector in input order
546 : @li First error_code wins and cancels siblings
547 : @li Exception always beats error_code
548 : @li Completes only after all children have finished
549 :
550 : @par Thread Safety
551 : The returned task must be awaited from a single execution context.
552 : Child awaitables execute concurrently but complete through the caller's
553 : executor.
554 :
555 : @param awaitables Range of io_result-returning awaitables to execute
556 : concurrently (must not be empty).
557 :
558 : @return A task yielding io_result<vector<PayloadT>> where PayloadT
559 : is the payload extracted from each child's io_result.
560 :
561 : @throws std::invalid_argument if range is empty (thrown before
562 : coroutine suspends).
563 : @throws Rethrows the first child exception after all children
564 : complete (exception beats error_code).
565 :
566 : @par Example
567 : @code
568 : task<void> example()
569 : {
570 : std::vector<io_task<size_t>> reads;
571 : for (auto& buf : buffers)
572 : reads.push_back(stream.read_some(buf));
573 :
574 : auto [ec, counts] = co_await when_all(std::move(reads));
575 : if (ec) { // handle error
576 : }
577 : }
578 : @endcode
579 :
580 : @see IoAwaitableRange, when_all
581 : */
582 : template<IoAwaitableRange R>
583 : requires detail::is_io_result_v<
584 : awaitable_result_t<std::ranges::range_value_t<R>>>
585 : && (!std::is_same_v<
586 : detail::io_result_payload_t<
587 : awaitable_result_t<std::ranges::range_value_t<R>>>,
588 : std::tuple<>>)
589 13 : [[nodiscard]] auto when_all(R&& awaitables)
590 : -> task<io_result<std::vector<
591 : detail::io_result_payload_t<
592 : awaitable_result_t<std::ranges::range_value_t<R>>>>>>
593 : {
594 : using Awaitable = std::ranges::range_value_t<R>;
595 : using PayloadT = detail::io_result_payload_t<
596 : awaitable_result_t<Awaitable>>;
597 : using OwnedRange = std::remove_cvref_t<R>;
598 :
599 : auto count = std::ranges::size(awaitables);
600 : if(count == 0)
601 : throw std::invalid_argument("when_all requires at least one awaitable");
602 :
603 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
604 :
605 : detail::when_all_homogeneous_state<PayloadT> state(count);
606 :
607 : co_await detail::when_all_homogeneous_launcher<OwnedRange>(
608 : &owned_awaitables, &state);
609 :
610 : if(state.core_.first_exception_)
611 : std::rethrow_exception(state.core_.first_exception_);
612 :
613 : if(state.has_error_.load(std::memory_order_relaxed))
614 : co_return io_result<std::vector<PayloadT>>{state.first_error_, {}};
615 :
616 : std::vector<PayloadT> results;
617 : results.reserve(count);
618 : for(auto& opt : state.results_)
619 : results.push_back(std::move(*opt));
620 :
621 : co_return io_result<std::vector<PayloadT>>{{}, std::move(results)};
622 26 : }
623 :
624 : /** Execute a range of void io_result-returning awaitables concurrently.
625 :
626 : Launches all awaitables simultaneously and waits for all to complete.
627 : Since all awaitables return io_result<>, no payload values are
628 : collected. The first error_code cancels siblings and is propagated.
629 : Exceptions always beat error codes.
630 :
631 : @param awaitables Range of io_result<>-returning awaitables to
632 : execute concurrently (must not be empty).
633 :
634 : @return A task yielding io_result<> whose ec is the first child
635 : error, or default-constructed on success.
636 :
637 : @throws std::invalid_argument if range is empty.
638 : @throws Rethrows the first child exception after all children
639 : complete (exception beats error_code).
640 :
641 : @par Example
642 : @code
643 : task<void> example()
644 : {
645 : std::vector<io_task<>> jobs;
646 : for (int i = 0; i < n; ++i)
647 : jobs.push_back(process(i));
648 :
649 : auto [ec] = co_await when_all(std::move(jobs));
650 : }
651 : @endcode
652 :
653 : @see IoAwaitableRange, when_all
654 : */
655 : template<IoAwaitableRange R>
656 : requires detail::is_io_result_v<
657 : awaitable_result_t<std::ranges::range_value_t<R>>>
658 : && std::is_same_v<
659 : detail::io_result_payload_t<
660 : awaitable_result_t<std::ranges::range_value_t<R>>>,
661 : std::tuple<>>
662 4 : [[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>>
663 : {
664 : using OwnedRange = std::remove_cvref_t<R>;
665 :
666 : auto count = std::ranges::size(awaitables);
667 : if(count == 0)
668 : throw std::invalid_argument("when_all requires at least one awaitable");
669 :
670 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
671 :
672 : detail::when_all_homogeneous_state<std::tuple<>> state(count);
673 :
674 : co_await detail::when_all_homogeneous_launcher<OwnedRange>(
675 : &owned_awaitables, &state);
676 :
677 : if(state.core_.first_exception_)
678 : std::rethrow_exception(state.core_.first_exception_);
679 :
680 : if(state.has_error_.load(std::memory_order_relaxed))
681 : co_return io_result<>{state.first_error_};
682 :
683 : co_return io_result<>{};
684 8 : }
685 :
686 : /** Execute io_result-returning awaitables concurrently, inspecting error codes.
687 :
688 : Overload selected when all children return io_result<Ts...>.
689 : The error_code is lifted out of each child into a single outer
690 : io_result. On success all values are returned; on failure the
691 : first error_code wins.
692 :
693 : @par Exception Safety
694 : Exception always beats error_code. If any child throws, the
695 : exception is rethrown regardless of error_code results.
696 :
697 : @param awaitables One or more awaitables each returning
698 : io_result<Ts...>.
699 :
700 : @return A task yielding io_result<R1, R2, ..., Rn> where each Ri
701 : follows the payload flattening rules.
702 : */
703 : template<IoAwaitable... As>
704 : requires (sizeof...(As) > 0)
705 : && detail::all_io_result_awaitables<As...>
706 51 : [[nodiscard]] auto when_all(As... awaitables)
707 : -> task<io_result<
708 : detail::io_result_payload_t<awaitable_result_t<As>>...>>
709 : {
710 : using result_type = io_result<
711 : detail::io_result_payload_t<awaitable_result_t<As>>...>;
712 :
713 : detail::when_all_state<awaitable_result_t<As>...> state;
714 : std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
715 :
716 : co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state);
717 :
718 : // Exception always wins over error_code
719 : if(state.core_.first_exception_)
720 : std::rethrow_exception(state.core_.first_exception_);
721 :
722 : auto r = detail::build_when_all_io_result<result_type>(
723 : detail::extract_results(state));
724 : if(state.has_error_.load(std::memory_order_relaxed))
725 : r.ec = state.first_error_;
726 : co_return r;
727 102 : }
728 :
729 : } // namespace capy
730 : } // namespace boost
731 :
732 : #endif
|