include/boost/capy/io/any_buffer_sink.hpp

100.0% Lines (220/220) 86.3% List of functions (63/73)
any_buffer_sink.hpp
f(x) Functions (73)
Function Calls Lines Blocks
boost::capy::any_buffer_sink::any_buffer_sink(boost::capy::any_buffer_sink&&) :161 2x 100.0% 100.0% boost::capy::any_buffer_sink::has_value() const :213 26x 100.0% 100.0% boost::capy::any_buffer_sink::operator bool() const :224 3x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::buffer_write_sink>::do_destroy_impl(void*) :466 7x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::resuming_buffer_sink>::do_destroy_impl(void*) :466 0 0.0% 0.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::throwing_move_buffer_sink>::do_destroy_impl(void*) :466 0 0.0% 0.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::test::buffer_sink>::do_destroy_impl(void*) :466 11x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::buffer_write_sink>::do_prepare_impl(void*, std::span<boost::capy::mutable_buffer, 18446744073709551615ul>) :472 6x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::resuming_buffer_sink>::do_prepare_impl(void*, std::span<boost::capy::mutable_buffer, 18446744073709551615ul>) :472 2x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::throwing_move_buffer_sink>::do_prepare_impl(void*, std::span<boost::capy::mutable_buffer, 18446744073709551615ul>) :472 0 0.0% 0.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::test::buffer_sink>::do_prepare_impl(void*, std::span<boost::capy::mutable_buffer, 18446744073709551615ul>) :472 124x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::buffer_write_sink>::construct_commit_awaitable_impl(void*, void*, unsigned long) :481 6x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::resuming_buffer_sink>::construct_commit_awaitable_impl(void*, void*, unsigned long) :481 1x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::throwing_move_buffer_sink>::construct_commit_awaitable_impl(void*, void*, unsigned long) :481 0 0.0% 0.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::test::buffer_sink>::construct_commit_awaitable_impl(void*, void*, unsigned long) :481 103x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::buffer_write_sink>::construct_commit_eof_awaitable_impl(void*, void*, unsigned long) :508 0 0.0% 0.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::resuming_buffer_sink>::construct_commit_eof_awaitable_impl(void*, void*, unsigned long) :508 1x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::throwing_move_buffer_sink>::construct_commit_eof_awaitable_impl(void*, void*, unsigned long) :508 0 0.0% 0.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::test::buffer_sink>::construct_commit_eof_awaitable_impl(void*, void*, unsigned long) :508 70x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::buffer_write_sink>::construct_write_some_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :535 6x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::resuming_buffer_sink>::construct_write_some_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :535 1x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::throwing_move_buffer_sink>::construct_write_some_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :535 0 0.0% 0.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::buffer_write_sink>::construct_write_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :565 14x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::resuming_buffer_sink>::construct_write_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :565 1x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::throwing_move_buffer_sink>::construct_write_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :565 0 0.0% 0.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::buffer_write_sink>::construct_write_eof_buffers_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :595 12x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::resuming_buffer_sink>::construct_write_eof_buffers_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :595 1x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::throwing_move_buffer_sink>::construct_write_eof_buffers_awaitable_impl(void*, void*, std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :595 0 0.0% 0.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::buffer_write_sink>::construct_write_eof_awaitable_impl(void*, void*) :625 16x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::resuming_buffer_sink>::construct_write_eof_awaitable_impl(void*, void*) :625 1x 100.0% 100.0% boost::capy::any_buffer_sink::vtable_for_impl<boost::capy::(anonymous namespace)::throwing_move_buffer_sink>::construct_write_eof_awaitable_impl(void*, void*) :625 0 0.0% 0.0% boost::capy::any_buffer_sink::~any_buffer_sink() :733 218x 100.0% 100.0% boost::capy::any_buffer_sink::operator=(boost::capy::any_buffer_sink&&) :745 5x 100.0% 100.0% boost::capy::any_buffer_sink::any_buffer_sink<boost::capy::(anonymous namespace)::buffer_write_sink>(boost::capy::(anonymous namespace)::buffer_write_sink) :768 7x 100.0% 80.0% boost::capy::any_buffer_sink::any_buffer_sink<boost::capy::(anonymous namespace)::throwing_move_buffer_sink>(boost::capy::(anonymous namespace)::throwing_move_buffer_sink) :768 1x 75.0% 77.0% boost::capy::any_buffer_sink::any_buffer_sink<boost::capy::test::buffer_sink>(boost::capy::test::buffer_sink) :768 11x 100.0% 80.0% boost::capy::any_buffer_sink::any_buffer_sink<boost::capy::(anonymous namespace)::buffer_write_sink>(boost::capy::(anonymous namespace)::buffer_write_sink*) :794 49x 100.0% 100.0% boost::capy::any_buffer_sink::any_buffer_sink<boost::capy::(anonymous namespace)::resuming_buffer_sink>(boost::capy::(anonymous namespace)::resuming_buffer_sink*) :794 1x 100.0% 100.0% boost::capy::any_buffer_sink::any_buffer_sink<boost::capy::test::buffer_sink>(boost::capy::test::buffer_sink*) :794 145x 100.0% 100.0% boost::capy::any_buffer_sink::prepare(std::span<boost::capy::mutable_buffer, 18446744073709551615ul>) :802 132x 100.0% 100.0% boost::capy::any_buffer_sink::commit(unsigned long) :808 110x 100.0% 100.0% boost::capy::any_buffer_sink::commit(unsigned long)::awaitable::await_ready() :816 110x 100.0% 100.0% boost::capy::any_buffer_sink::commit(unsigned long)::awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :826 1x 100.0% 100.0% boost::capy::any_buffer_sink::commit(unsigned long)::awaitable::await_resume() :833 110x 100.0% 100.0% boost::capy::any_buffer_sink::commit(unsigned long)::awaitable::await_resume()::guard::~guard() :837 110x 100.0% 100.0% boost::capy::any_buffer_sink::commit_eof(unsigned long) :850 55x 100.0% 100.0% boost::capy::any_buffer_sink::commit_eof(unsigned long)::awaitable::await_ready() :858 55x 100.0% 100.0% boost::capy::any_buffer_sink::commit_eof(unsigned long)::awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :868 1x 100.0% 100.0% boost::capy::any_buffer_sink::commit_eof(unsigned long)::awaitable::await_resume() :875 55x 100.0% 100.0% boost::capy::any_buffer_sink::commit_eof(unsigned long)::awaitable::await_resume()::guard::~guard() :879 55x 100.0% 100.0% boost::capy::any_buffer_sink::write_some_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :892 7x 100.0% 100.0% boost::capy::any_buffer_sink::write_some_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_ready() const :901 7x 100.0% 100.0% boost::capy::any_buffer_sink::write_some_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :907 7x 100.0% 100.0% boost::capy::any_buffer_sink::write_some_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_resume() :924 7x 100.0% 100.0% boost::capy::any_buffer_sink::write_some_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_resume()::guard::~guard() :928 7x 100.0% 100.0% boost::capy::any_buffer_sink::write_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :942 15x 100.0% 100.0% boost::capy::any_buffer_sink::write_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_ready() const :951 15x 100.0% 100.0% boost::capy::any_buffer_sink::write_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :957 15x 100.0% 100.0% boost::capy::any_buffer_sink::write_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_resume() :974 15x 100.0% 100.0% boost::capy::any_buffer_sink::write_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_resume()::guard::~guard() :978 15x 100.0% 100.0% boost::capy::any_buffer_sink::write_eof_buffers_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>) :992 13x 100.0% 100.0% boost::capy::any_buffer_sink::write_eof_buffers_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_ready() const :1001 13x 100.0% 100.0% boost::capy::any_buffer_sink::write_eof_buffers_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :1007 13x 100.0% 100.0% boost::capy::any_buffer_sink::write_eof_buffers_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_resume() :1024 13x 100.0% 100.0% boost::capy::any_buffer_sink::write_eof_buffers_(std::span<boost::capy::const_buffer const, 18446744073709551615ul>)::awaitable::await_resume()::guard::~guard() :1028 13x 100.0% 100.0% boost::capy::task<boost::capy::io_result<unsigned long> > boost::capy::any_buffer_sink::write_some<boost::capy::const_buffer>(boost::capy::const_buffer) :1043 23x 100.0% 44.0% boost::capy::task<boost::capy::io_result<unsigned long> > boost::capy::any_buffer_sink::write<boost::capy::const_buffer>(boost::capy::const_buffer) :1076 39x 100.0% 44.0% boost::capy::any_buffer_sink::write_eof() :1128 33x 100.0% 100.0% boost::capy::any_buffer_sink::write_eof()::awaitable::await_ready() :1135 33x 100.0% 100.0% boost::capy::any_buffer_sink::write_eof()::awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :1159 1x 100.0% 100.0% boost::capy::any_buffer_sink::write_eof()::awaitable::await_resume() :1166 33x 100.0% 100.0% boost::capy::any_buffer_sink::write_eof()::awaitable::await_resume()::guard::~guard() :1170 33x 100.0% 100.0% boost::capy::task<boost::capy::io_result<unsigned long> > boost::capy::any_buffer_sink::write_eof<boost::capy::const_buffer>(boost::capy::const_buffer) :1184 41x 100.0% 44.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11 #define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_copy.hpp>
17 #include <boost/capy/buffers/buffer_param.hpp>
18 #include <boost/capy/concept/buffer_sink.hpp>
19 #include <boost/capy/concept/io_awaitable.hpp>
20 #include <boost/capy/concept/write_sink.hpp>
21 #include <boost/capy/ex/io_env.hpp>
22 #include <boost/capy/io_result.hpp>
23 #include <boost/capy/io_task.hpp>
24
25 #include <concepts>
26 #include <coroutine>
27 #include <cstddef>
28 #include <exception>
29 #include <new>
30 #include <span>
31 #include <stop_token>
32 #include <system_error>
33 #include <utility>
34
35 namespace boost {
36 namespace capy {
37
38 /** Type-erased wrapper for any BufferSink.
39
40 This class provides type erasure for any type satisfying the
41 @ref BufferSink concept, enabling runtime polymorphism for
42 buffer sink operations. It uses cached awaitable storage to achieve
43 zero steady-state allocation after construction.
44
45 The wrapper exposes two interfaces for producing data:
46 the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
47 and the @ref WriteSink interface (`write_some`, `write`,
48 `write_eof`). Choose the interface that matches how your data
49 is produced:
50
51 @par Choosing an Interface
52
53 Use the **BufferSink** interface when you are a generator that
54 produces data into externally-provided buffers. The sink owns
55 the memory; you call @ref prepare to obtain writable buffers,
56 fill them, then call @ref commit or @ref commit_eof.
57
58 Use the **WriteSink** interface when you already have buffers
59 containing the data to write:
60 - If the entire body is available up front, call
61 @ref write_eof(buffers) to send everything atomically.
62 - If data arrives incrementally, call @ref write or
63 @ref write_some in a loop, then @ref write_eof() when done.
64 Prefer `write` (complete) unless your streaming pattern
65 benefits from partial writes via `write_some`.
66
67 If the wrapped type only satisfies @ref BufferSink, the
68 @ref WriteSink operations are provided automatically.
69
70 @par Construction Modes
71
72 - **Owning**: Pass by value to transfer ownership. The wrapper
73 allocates storage and owns the sink.
74 - **Reference**: Pass a pointer to wrap without ownership. The
75 pointed-to sink must outlive this wrapper.
76
77 @par Awaitable Preallocation
78 The constructor preallocates storage for the type-erased awaitable.
79 This reserves all virtual address space at server startup
80 so memory usage can be measured up front, rather than
81 allocating piecemeal as traffic arrives.
82
83 @par Thread Safety
84 Not thread-safe. Concurrent operations on the same wrapper
85 are undefined behavior.
86
87 @par Example
88 @code
89 // Owning - takes ownership of the sink
90 any_buffer_sink abs(some_buffer_sink{args...});
91
92 // Reference - wraps without ownership
93 some_buffer_sink sink;
94 any_buffer_sink abs(&sink);
95
96 // BufferSink interface: generate into callee-owned buffers
97 mutable_buffer arr[16];
98 auto bufs = abs.prepare(arr);
99 // Write data into bufs[0..bufs.size())
100 auto [ec] = co_await abs.commit(bytes_written);
101 auto [ec2] = co_await abs.commit_eof(0);
102
103 // WriteSink interface: send caller-owned buffers
104 auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
105 auto [ec4] = co_await abs.write_eof();
106
107 // Or send everything at once
108 auto [ec5, n2] = co_await abs.write_eof(
109 make_buffer(body_data));
110 @endcode
111
112 @see any_buffer_source, BufferSink, WriteSink
113 */
114 class any_buffer_sink
115 {
116 struct vtable;
117 struct awaitable_ops;
118 struct write_awaitable_ops;
119
120 template<BufferSink S>
121 struct vtable_for_impl;
122
123 // hot-path members first for cache locality
124 void* sink_ = nullptr;
125 vtable const* vt_ = nullptr;
126 void* cached_awaitable_ = nullptr;
127 awaitable_ops const* active_ops_ = nullptr;
128 write_awaitable_ops const* active_write_ops_ = nullptr;
129 void* storage_ = nullptr;
130
131 public:
132 /** Destructor.
133
134 Destroys the owned sink (if any) and releases the cached
135 awaitable storage.
136 */
137 ~any_buffer_sink();
138
139 /** Construct a default instance.
140
141 Constructs an empty wrapper. Operations on a default-constructed
142 wrapper result in undefined behavior.
143 */
144 any_buffer_sink() = default;
145
146 /** Non-copyable.
147
148 The awaitable cache is per-instance and cannot be shared.
149 */
150 any_buffer_sink(any_buffer_sink const&) = delete;
151 any_buffer_sink& operator=(any_buffer_sink const&) = delete;
152
153 /** Construct by moving.
154
155 Transfers ownership of the wrapped sink (if owned) and
156 cached awaitable storage from `other`. After the move, `other` is
157 in a default-constructed state.
158
159 @param other The wrapper to move from.
160 */
161 2x any_buffer_sink(any_buffer_sink&& other) noexcept
162 2x : sink_(std::exchange(other.sink_, nullptr))
163 2x , vt_(std::exchange(other.vt_, nullptr))
164 2x , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
165 2x , active_ops_(std::exchange(other.active_ops_, nullptr))
166 2x , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
167 2x , storage_(std::exchange(other.storage_, nullptr))
168 {
169 2x }
170
171 /** Assign by moving.
172
173 Destroys any owned sink and releases existing resources,
174 then transfers ownership from `other`.
175
176 @param other The wrapper to move from.
177 @return Reference to this wrapper.
178 */
179 any_buffer_sink&
180 operator=(any_buffer_sink&& other) noexcept;
181
182 /** Construct by taking ownership of a BufferSink.
183
184 Allocates storage and moves the sink into this wrapper.
185 The wrapper owns the sink and will destroy it. If `S` also
186 satisfies @ref WriteSink, native write operations are
187 forwarded through the virtual boundary.
188
189 @param s The sink to take ownership of.
190 */
191 template<BufferSink S>
192 requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
193 any_buffer_sink(S s);
194
195 /** Construct by wrapping a BufferSink without ownership.
196
197 Wraps the given sink by pointer. The sink must remain
198 valid for the lifetime of this wrapper. If `S` also
199 satisfies @ref WriteSink, native write operations are
200 forwarded through the virtual boundary.
201
202 @param s Pointer to the sink to wrap.
203 */
204 template<BufferSink S>
205 any_buffer_sink(S* s);
206
207 /** Check if the wrapper contains a valid sink.
208
209 @return `true` if wrapping a sink, `false` if default-constructed
210 or moved-from.
211 */
212 bool
213 26x has_value() const noexcept
214 {
215 26x return sink_ != nullptr;
216 }
217
218 /** Check if the wrapper contains a valid sink.
219
220 @return `true` if wrapping a sink, `false` if default-constructed
221 or moved-from.
222 */
223 explicit
224 3x operator bool() const noexcept
225 {
226 3x return has_value();
227 }
228
229 /** Prepare writable buffers.
230
231 Fills the provided span with mutable buffer descriptors
232 pointing to the underlying sink's internal storage. This
233 operation is synchronous.
234
235 @param dest Span of mutable_buffer to fill.
236
237 @return A span of filled buffers.
238
239 @par Preconditions
240 The wrapper must contain a valid sink (`has_value() == true`).
241 */
242 std::span<mutable_buffer>
243 prepare(std::span<mutable_buffer> dest);
244
245 /** Commit bytes written to the prepared buffers.
246
247 Commits `n` bytes written to the buffers returned by the
248 most recent call to @ref prepare. The operation may trigger
249 underlying I/O.
250
251 @param n The number of bytes to commit.
252
253 @return An awaitable that await-returns `(error_code)`.
254
255 @par Preconditions
256 The wrapper must contain a valid sink (`has_value() == true`).
257 */
258 auto
259 commit(std::size_t n);
260
261 /** Commit final bytes and signal end-of-stream.
262
263 Commits `n` bytes written to the buffers returned by the
264 most recent call to @ref prepare and finalizes the sink.
265 After success, no further operations are permitted.
266
267 @param n The number of bytes to commit.
268
269 @return An awaitable that await-returns `(error_code)`.
270
271 @par Preconditions
272 The wrapper must contain a valid sink (`has_value() == true`).
273 */
274 auto
275 commit_eof(std::size_t n);
276
277 /** Write some data from a buffer sequence.
278
279 Attempt to write up to `buffer_size( buffers )` bytes from
280 the buffer sequence to the underlying sink. May consume less
281 than the full sequence.
282
283 When the wrapped type provides native @ref WriteSink support,
284 the operation forwards directly. Otherwise it is synthesized
285 from @ref prepare and @ref commit with a buffer copy.
286
287 @param buffers The buffer sequence to write.
288
289 @return An awaitable that await-returns `(error_code,std::size_t)`.
290
291 @par Preconditions
292 The wrapper must contain a valid sink (`has_value() == true`).
293 */
294 template<ConstBufferSequence CB>
295 io_task<std::size_t>
296 write_some(CB buffers);
297
298 /** Write all data from a buffer sequence.
299
300 Writes all data from the buffer sequence to the underlying
301 sink. This method satisfies the @ref WriteSink concept.
302
303 When the wrapped type provides native @ref WriteSink support,
304 each window is forwarded directly. Otherwise the data is
305 copied into the sink via @ref prepare and @ref commit.
306
307 @param buffers The buffer sequence to write.
308
309 @return An awaitable that await-returns `(error_code,std::size_t)`.
310
311 @par Preconditions
312 The wrapper must contain a valid sink (`has_value() == true`).
313 */
314 template<ConstBufferSequence CB>
315 io_task<std::size_t>
316 write(CB buffers);
317
318 /** Atomically write data and signal end-of-stream.
319
320 Writes all data from the buffer sequence to the underlying
321 sink and then signals end-of-stream.
322
323 When the wrapped type provides native @ref WriteSink support,
324 the final window is sent atomically via the underlying
325 `write_eof(buffers)`. Otherwise the data is synthesized
326 through @ref prepare, @ref commit, and @ref commit_eof.
327
328 @param buffers The buffer sequence to write.
329
330 @return An awaitable that await-returns `(error_code,std::size_t)`.
331
332 @par Preconditions
333 The wrapper must contain a valid sink (`has_value() == true`).
334 */
335 template<ConstBufferSequence CB>
336 io_task<std::size_t>
337 write_eof(CB buffers);
338
339 /** Signal end-of-stream.
340
341 Indicates that no more data will be written to the sink.
342 This method satisfies the @ref WriteSink concept.
343
344 When the wrapped type provides native @ref WriteSink support,
345 the underlying `write_eof()` is called. Otherwise the
346 operation is implemented as `commit_eof(0)`.
347
348 @return An awaitable that await-returns `(error_code)`.
349
350 @par Preconditions
351 The wrapper must contain a valid sink (`has_value() == true`).
352 */
353 auto
354 write_eof();
355
356 protected:
357 /** Rebind to a new sink after move.
358
359 Updates the internal pointer to reference a new sink object.
360 Used by owning wrappers after move assignment when the owned
361 object has moved to a new location.
362
363 @param new_sink The new sink to bind to. Must be the same
364 type as the original sink.
365
366 @note Terminates if called with a sink of different type
367 than the original.
368 */
369 template<BufferSink S>
370 void
371 rebind(S& new_sink) noexcept
372 {
373 if(vt_ != &vtable_for_impl<S>::value)
374 std::terminate();
375 sink_ = &new_sink;
376 }
377
378 private:
379 /** Forward a partial write through the vtable.
380
381 Constructs the underlying `write_some` awaitable in
382 cached storage and returns a type-erased awaitable.
383 */
384 auto
385 write_some_(std::span<const_buffer const> buffers);
386
387 /** Forward a complete write through the vtable.
388
389 Constructs the underlying `write` awaitable in
390 cached storage and returns a type-erased awaitable.
391 */
392 auto
393 write_(std::span<const_buffer const> buffers);
394
395 /** Forward an atomic write-with-EOF through the vtable.
396
397 Constructs the underlying `write_eof(buffers)` awaitable
398 in cached storage and returns a type-erased awaitable.
399 */
400 auto
401 write_eof_buffers_(std::span<const_buffer const> buffers);
402 };
403
404 /** Type-erased ops for awaitables that await-return `io_result<>`. */
405 struct any_buffer_sink::awaitable_ops
406 {
407 bool (*await_ready)(void*);
408 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
409 io_result<> (*await_resume)(void*);
410 void (*destroy)(void*) noexcept;
411 };
412
413 /** Type-erased ops for awaitables that await-return `io_result<std::size_t>`. */
414 struct any_buffer_sink::write_awaitable_ops
415 {
416 bool (*await_ready)(void*);
417 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
418 io_result<std::size_t> (*await_resume)(void*);
419 void (*destroy)(void*) noexcept;
420 };
421
422 struct any_buffer_sink::vtable
423 {
424 void (*destroy)(void*) noexcept;
425 std::span<mutable_buffer> (*do_prepare)(
426 void* sink,
427 std::span<mutable_buffer> dest);
428 std::size_t awaitable_size;
429 std::size_t awaitable_align;
430 awaitable_ops const* (*construct_commit_awaitable)(
431 void* sink,
432 void* storage,
433 std::size_t n);
434 awaitable_ops const* (*construct_commit_eof_awaitable)(
435 void* sink,
436 void* storage,
437 std::size_t n);
438
439 // WriteSink forwarding (null when wrapped type is BufferSink-only)
440 write_awaitable_ops const* (*construct_write_some_awaitable)(
441 void* sink,
442 void* storage,
443 std::span<const_buffer const> buffers);
444 write_awaitable_ops const* (*construct_write_awaitable)(
445 void* sink,
446 void* storage,
447 std::span<const_buffer const> buffers);
448 write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
449 void* sink,
450 void* storage,
451 std::span<const_buffer const> buffers);
452 awaitable_ops const* (*construct_write_eof_awaitable)(
453 void* sink,
454 void* storage);
455 };
456
457 template<BufferSink S>
458 struct any_buffer_sink::vtable_for_impl
459 {
460 using CommitAwaitable = decltype(std::declval<S&>().commit(
461 std::size_t{}));
462 using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
463 std::size_t{}));
464
465 static void
466 18x do_destroy_impl(void* sink) noexcept
467 {
468 18x static_cast<S*>(sink)->~S();
469 18x }
470
471 static std::span<mutable_buffer>
472 132x do_prepare_impl(
473 void* sink,
474 std::span<mutable_buffer> dest)
475 {
476 132x auto& s = *static_cast<S*>(sink);
477 132x return s.prepare(dest);
478 }
479
480 static awaitable_ops const*
481 110x construct_commit_awaitable_impl(
482 void* sink,
483 void* storage,
484 std::size_t n)
485 {
486 110x auto& s = *static_cast<S*>(sink);
487 110x ::new(storage) CommitAwaitable(s.commit(n));
488
489 static constexpr awaitable_ops ops = {
490 +[](void* p) {
491 return static_cast<CommitAwaitable*>(p)->await_ready();
492 },
493 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
494 return detail::call_await_suspend(
495 static_cast<CommitAwaitable*>(p), h, env);
496 },
497 +[](void* p) {
498 return static_cast<CommitAwaitable*>(p)->await_resume();
499 },
500 +[](void* p) noexcept {
501 static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
502 }
503 };
504 110x return &ops;
505 }
506
507 static awaitable_ops const*
508 71x construct_commit_eof_awaitable_impl(
509 void* sink,
510 void* storage,
511 std::size_t n)
512 {
513 71x auto& s = *static_cast<S*>(sink);
514 71x ::new(storage) CommitEofAwaitable(s.commit_eof(n));
515
516 static constexpr awaitable_ops ops = {
517 +[](void* p) {
518 return static_cast<CommitEofAwaitable*>(p)->await_ready();
519 },
520 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
521 return detail::call_await_suspend(
522 static_cast<CommitEofAwaitable*>(p), h, env);
523 },
524 +[](void* p) {
525 return static_cast<CommitEofAwaitable*>(p)->await_resume();
526 },
527 +[](void* p) noexcept {
528 static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
529 }
530 };
531 71x return &ops;
532 }
533
534 static write_awaitable_ops const*
535 7x construct_write_some_awaitable_impl(
536 void* sink,
537 void* storage,
538 std::span<const_buffer const> buffers)
539 requires WriteSink<S>
540 {
541 using Aw = decltype(std::declval<S&>().write_some(
542 std::span<const_buffer const>{}));
543 7x auto& s = *static_cast<S*>(sink);
544 7x ::new(storage) Aw(s.write_some(buffers));
545
546 static constexpr write_awaitable_ops ops = {
547 +[](void* p) {
548 return static_cast<Aw*>(p)->await_ready();
549 },
550 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
551 return detail::call_await_suspend(
552 static_cast<Aw*>(p), h, env);
553 },
554 +[](void* p) {
555 return static_cast<Aw*>(p)->await_resume();
556 },
557 +[](void* p) noexcept {
558 static_cast<Aw*>(p)->~Aw();
559 }
560 };
561 7x return &ops;
562 }
563
564 static write_awaitable_ops const*
565 15x construct_write_awaitable_impl(
566 void* sink,
567 void* storage,
568 std::span<const_buffer const> buffers)
569 requires WriteSink<S>
570 {
571 using Aw = decltype(std::declval<S&>().write(
572 std::span<const_buffer const>{}));
573 15x auto& s = *static_cast<S*>(sink);
574 15x ::new(storage) Aw(s.write(buffers));
575
576 static constexpr write_awaitable_ops ops = {
577 +[](void* p) {
578 return static_cast<Aw*>(p)->await_ready();
579 },
580 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
581 return detail::call_await_suspend(
582 static_cast<Aw*>(p), h, env);
583 },
584 +[](void* p) {
585 return static_cast<Aw*>(p)->await_resume();
586 },
587 +[](void* p) noexcept {
588 static_cast<Aw*>(p)->~Aw();
589 }
590 };
591 15x return &ops;
592 }
593
594 static write_awaitable_ops const*
595 13x construct_write_eof_buffers_awaitable_impl(
596 void* sink,
597 void* storage,
598 std::span<const_buffer const> buffers)
599 requires WriteSink<S>
600 {
601 using Aw = decltype(std::declval<S&>().write_eof(
602 std::span<const_buffer const>{}));
603 13x auto& s = *static_cast<S*>(sink);
604 13x ::new(storage) Aw(s.write_eof(buffers));
605
606 static constexpr write_awaitable_ops ops = {
607 +[](void* p) {
608 return static_cast<Aw*>(p)->await_ready();
609 },
610 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
611 return detail::call_await_suspend(
612 static_cast<Aw*>(p), h, env);
613 },
614 +[](void* p) {
615 return static_cast<Aw*>(p)->await_resume();
616 },
617 +[](void* p) noexcept {
618 static_cast<Aw*>(p)->~Aw();
619 }
620 };
621 13x return &ops;
622 }
623
624 static awaitable_ops const*
625 17x construct_write_eof_awaitable_impl(
626 void* sink,
627 void* storage)
628 requires WriteSink<S>
629 {
630 using Aw = decltype(std::declval<S&>().write_eof());
631 17x auto& s = *static_cast<S*>(sink);
632 17x ::new(storage) Aw(s.write_eof());
633
634 static constexpr awaitable_ops ops = {
635 +[](void* p) {
636 return static_cast<Aw*>(p)->await_ready();
637 },
638 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
639 return detail::call_await_suspend(
640 static_cast<Aw*>(p), h, env);
641 },
642 +[](void* p) {
643 return static_cast<Aw*>(p)->await_resume();
644 },
645 +[](void* p) noexcept {
646 static_cast<Aw*>(p)->~Aw();
647 }
648 };
649 17x return &ops;
650 }
651
652 static consteval std::size_t
653 compute_max_size() noexcept
654 {
655 std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
656 ? sizeof(CommitAwaitable)
657 : sizeof(CommitEofAwaitable);
658 if constexpr (WriteSink<S>)
659 {
660 using WS = decltype(std::declval<S&>().write_some(
661 std::span<const_buffer const>{}));
662 using W = decltype(std::declval<S&>().write(
663 std::span<const_buffer const>{}));
664 using WEB = decltype(std::declval<S&>().write_eof(
665 std::span<const_buffer const>{}));
666 using WE = decltype(std::declval<S&>().write_eof());
667
668 if(sizeof(WS) > s) s = sizeof(WS);
669 if(sizeof(W) > s) s = sizeof(W);
670 if(sizeof(WEB) > s) s = sizeof(WEB);
671 if(sizeof(WE) > s) s = sizeof(WE);
672 }
673 return s;
674 }
675
676 static consteval std::size_t
677 compute_max_align() noexcept
678 {
679 std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
680 ? alignof(CommitAwaitable)
681 : alignof(CommitEofAwaitable);
682 if constexpr (WriteSink<S>)
683 {
684 using WS = decltype(std::declval<S&>().write_some(
685 std::span<const_buffer const>{}));
686 using W = decltype(std::declval<S&>().write(
687 std::span<const_buffer const>{}));
688 using WEB = decltype(std::declval<S&>().write_eof(
689 std::span<const_buffer const>{}));
690 using WE = decltype(std::declval<S&>().write_eof());
691
692 if(alignof(WS) > a) a = alignof(WS);
693 if(alignof(W) > a) a = alignof(W);
694 if(alignof(WEB) > a) a = alignof(WEB);
695 if(alignof(WE) > a) a = alignof(WE);
696 }
697 return a;
698 }
699
700 static consteval vtable
701 make_vtable() noexcept
702 {
703 vtable v{};
704 v.destroy = &do_destroy_impl;
705 v.do_prepare = &do_prepare_impl;
706 v.awaitable_size = compute_max_size();
707 v.awaitable_align = compute_max_align();
708 v.construct_commit_awaitable = &construct_commit_awaitable_impl;
709 v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
710 v.construct_write_some_awaitable = nullptr;
711 v.construct_write_awaitable = nullptr;
712 v.construct_write_eof_buffers_awaitable = nullptr;
713 v.construct_write_eof_awaitable = nullptr;
714
715 if constexpr (WriteSink<S>)
716 {
717 v.construct_write_some_awaitable =
718 &construct_write_some_awaitable_impl;
719 v.construct_write_awaitable =
720 &construct_write_awaitable_impl;
721 v.construct_write_eof_buffers_awaitable =
722 &construct_write_eof_buffers_awaitable_impl;
723 v.construct_write_eof_awaitable =
724 &construct_write_eof_awaitable_impl;
725 }
726 return v;
727 }
728
729 static constexpr vtable value = make_vtable();
730 };
731
732 inline
733 218x any_buffer_sink::~any_buffer_sink()
734 {
735 218x if(storage_)
736 {
737 17x vt_->destroy(sink_);
738 17x ::operator delete(storage_);
739 }
740 218x if(cached_awaitable_)
741 211x ::operator delete(cached_awaitable_);
742 218x }
743
744 inline any_buffer_sink&
745 5x any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
746 {
747 5x if(this != &other)
748 {
749 4x if(storage_)
750 {
751 1x vt_->destroy(sink_);
752 1x ::operator delete(storage_);
753 }
754 4x if(cached_awaitable_)
755 2x ::operator delete(cached_awaitable_);
756 4x sink_ = std::exchange(other.sink_, nullptr);
757 4x vt_ = std::exchange(other.vt_, nullptr);
758 4x cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
759 4x storage_ = std::exchange(other.storage_, nullptr);
760 4x active_ops_ = std::exchange(other.active_ops_, nullptr);
761 4x active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
762 }
763 5x return *this;
764 }
765
766 template<BufferSink S>
767 requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
768 19x any_buffer_sink::any_buffer_sink(S s)
769 19x : vt_(&vtable_for_impl<S>::value)
770 {
771 struct guard {
772 any_buffer_sink* self;
773 bool committed = false;
774 ~guard() {
775 if(!committed && self->storage_) {
776 if(self->sink_)
777 self->vt_->destroy(self->sink_); // LCOV_EXCL_LINE OOM rollback: only when the cached-awaitable allocation throws
778 ::operator delete(self->storage_);
779 self->storage_ = nullptr;
780 self->sink_ = nullptr;
781 }
782 }
783 19x } g{this};
784
785 19x storage_ = ::operator new(sizeof(S));
786 19x sink_ = ::new(storage_) S(std::move(s));
787
788 18x cached_awaitable_ = ::operator new(vt_->awaitable_size);
789
790 18x g.committed = true;
791 19x }
792
793 template<BufferSink S>
794 195x any_buffer_sink::any_buffer_sink(S* s)
795 195x : sink_(s)
796 195x , vt_(&vtable_for_impl<S>::value)
797 {
798 195x cached_awaitable_ = ::operator new(vt_->awaitable_size);
799 195x }
800
801 inline std::span<mutable_buffer>
802 132x any_buffer_sink::prepare(std::span<mutable_buffer> dest)
803 {
804 132x return vt_->do_prepare(sink_, dest);
805 }
806
807 inline auto
808 110x any_buffer_sink::commit(std::size_t n)
809 {
810 struct awaitable
811 {
812 any_buffer_sink* self_;
813 std::size_t n_;
814
815 bool
816 110x await_ready()
817 {
818 220x self_->active_ops_ = self_->vt_->construct_commit_awaitable(
819 110x self_->sink_,
820 110x self_->cached_awaitable_,
821 n_);
822 110x return self_->active_ops_->await_ready(self_->cached_awaitable_);
823 }
824
825 std::coroutine_handle<>
826 1x await_suspend(std::coroutine_handle<> h, io_env const* env)
827 {
828 1x return self_->active_ops_->await_suspend(
829 1x self_->cached_awaitable_, h, env);
830 }
831
832 io_result<>
833 110x await_resume()
834 {
835 struct guard {
836 any_buffer_sink* self;
837 110x ~guard() {
838 110x self->active_ops_->destroy(self->cached_awaitable_);
839 110x self->active_ops_ = nullptr;
840 110x }
841 110x } g{self_};
842 110x return self_->active_ops_->await_resume(
843 193x self_->cached_awaitable_);
844 110x }
845 };
846 110x return awaitable{this, n};
847 }
848
849 inline auto
850 55x any_buffer_sink::commit_eof(std::size_t n)
851 {
852 struct awaitable
853 {
854 any_buffer_sink* self_;
855 std::size_t n_;
856
857 bool
858 55x await_ready()
859 {
860 110x self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
861 55x self_->sink_,
862 55x self_->cached_awaitable_,
863 n_);
864 55x return self_->active_ops_->await_ready(self_->cached_awaitable_);
865 }
866
867 std::coroutine_handle<>
868 1x await_suspend(std::coroutine_handle<> h, io_env const* env)
869 {
870 1x return self_->active_ops_->await_suspend(
871 1x self_->cached_awaitable_, h, env);
872 }
873
874 io_result<>
875 55x await_resume()
876 {
877 struct guard {
878 any_buffer_sink* self;
879 55x ~guard() {
880 55x self->active_ops_->destroy(self->cached_awaitable_);
881 55x self->active_ops_ = nullptr;
882 55x }
883 55x } g{self_};
884 55x return self_->active_ops_->await_resume(
885 94x self_->cached_awaitable_);
886 55x }
887 };
888 55x return awaitable{this, n};
889 }
890
891 inline auto
892 7x any_buffer_sink::write_some_(
893 std::span<const_buffer const> buffers)
894 {
895 struct awaitable
896 {
897 any_buffer_sink* self_;
898 std::span<const_buffer const> buffers_;
899
900 bool
901 7x await_ready() const noexcept
902 {
903 7x return false;
904 }
905
906 std::coroutine_handle<>
907 7x await_suspend(std::coroutine_handle<> h, io_env const* env)
908 {
909 14x self_->active_write_ops_ =
910 14x self_->vt_->construct_write_some_awaitable(
911 7x self_->sink_,
912 7x self_->cached_awaitable_,
913 buffers_);
914
915 7x if(self_->active_write_ops_->await_ready(
916 7x self_->cached_awaitable_))
917 6x return h;
918
919 1x return self_->active_write_ops_->await_suspend(
920 1x self_->cached_awaitable_, h, env);
921 }
922
923 io_result<std::size_t>
924 7x await_resume()
925 {
926 struct guard {
927 any_buffer_sink* self;
928 7x ~guard() {
929 7x self->active_write_ops_->destroy(
930 7x self->cached_awaitable_);
931 7x self->active_write_ops_ = nullptr;
932 7x }
933 7x } g{self_};
934 7x return self_->active_write_ops_->await_resume(
935 12x self_->cached_awaitable_);
936 7x }
937 };
938 7x return awaitable{this, buffers};
939 }
940
941 inline auto
942 15x any_buffer_sink::write_(
943 std::span<const_buffer const> buffers)
944 {
945 struct awaitable
946 {
947 any_buffer_sink* self_;
948 std::span<const_buffer const> buffers_;
949
950 bool
951 15x await_ready() const noexcept
952 {
953 15x return false;
954 }
955
956 std::coroutine_handle<>
957 15x await_suspend(std::coroutine_handle<> h, io_env const* env)
958 {
959 30x self_->active_write_ops_ =
960 30x self_->vt_->construct_write_awaitable(
961 15x self_->sink_,
962 15x self_->cached_awaitable_,
963 buffers_);
964
965 15x if(self_->active_write_ops_->await_ready(
966 15x self_->cached_awaitable_))
967 14x return h;
968
969 1x return self_->active_write_ops_->await_suspend(
970 1x self_->cached_awaitable_, h, env);
971 }
972
973 io_result<std::size_t>
974 15x await_resume()
975 {
976 struct guard {
977 any_buffer_sink* self;
978 15x ~guard() {
979 15x self->active_write_ops_->destroy(
980 15x self->cached_awaitable_);
981 15x self->active_write_ops_ = nullptr;
982 15x }
983 15x } g{self_};
984 15x return self_->active_write_ops_->await_resume(
985 26x self_->cached_awaitable_);
986 15x }
987 };
988 15x return awaitable{this, buffers};
989 }
990
991 inline auto
992 13x any_buffer_sink::write_eof_buffers_(
993 std::span<const_buffer const> buffers)
994 {
995 struct awaitable
996 {
997 any_buffer_sink* self_;
998 std::span<const_buffer const> buffers_;
999
1000 bool
1001 13x await_ready() const noexcept
1002 {
1003 13x return false;
1004 }
1005
1006 std::coroutine_handle<>
1007 13x await_suspend(std::coroutine_handle<> h, io_env const* env)
1008 {
1009 26x self_->active_write_ops_ =
1010 26x self_->vt_->construct_write_eof_buffers_awaitable(
1011 13x self_->sink_,
1012 13x self_->cached_awaitable_,
1013 buffers_);
1014
1015 13x if(self_->active_write_ops_->await_ready(
1016 13x self_->cached_awaitable_))
1017 12x return h;
1018
1019 1x return self_->active_write_ops_->await_suspend(
1020 1x self_->cached_awaitable_, h, env);
1021 }
1022
1023 io_result<std::size_t>
1024 13x await_resume()
1025 {
1026 struct guard {
1027 any_buffer_sink* self;
1028 13x ~guard() {
1029 13x self->active_write_ops_->destroy(
1030 13x self->cached_awaitable_);
1031 13x self->active_write_ops_ = nullptr;
1032 13x }
1033 13x } g{self_};
1034 13x return self_->active_write_ops_->await_resume(
1035 22x self_->cached_awaitable_);
1036 13x }
1037 };
1038 13x return awaitable{this, buffers};
1039 }
1040
1041 template<ConstBufferSequence CB>
1042 io_task<std::size_t>
1043 23x any_buffer_sink::write_some(CB buffers)
1044 {
1045 buffer_param<CB> bp(buffers);
1046 auto src = bp.data();
1047 if(src.empty())
1048 co_return {{}, 0};
1049
1050 // Native WriteSink path
1051 if(vt_->construct_write_some_awaitable)
1052 co_return co_await write_some_(src);
1053
1054 // Synthesized path: prepare + buffer_copy + commit
1055 mutable_buffer arr[detail::max_iovec_];
1056 auto dst_bufs = prepare(arr);
1057 if(dst_bufs.empty())
1058 {
1059 auto [ec] = co_await commit(0);
1060 if(ec)
1061 co_return {ec, 0};
1062 dst_bufs = prepare(arr);
1063 if(dst_bufs.empty())
1064 co_return {{}, 0};
1065 }
1066
1067 auto n = buffer_copy(dst_bufs, src);
1068 auto [ec] = co_await commit(n);
1069 if(ec)
1070 co_return {ec, 0};
1071 co_return {{}, n};
1072 46x }
1073
1074 template<ConstBufferSequence CB>
1075 io_task<std::size_t>
1076 39x any_buffer_sink::write(CB buffers)
1077 {
1078 buffer_param<CB> bp(buffers);
1079 std::size_t total = 0;
1080
1081 // Native WriteSink path
1082 if(vt_->construct_write_awaitable)
1083 {
1084 for(;;)
1085 {
1086 auto bufs = bp.data();
1087 if(bufs.empty())
1088 break;
1089
1090 auto [ec, n] = co_await write_(bufs);
1091 total += n;
1092 if(ec)
1093 co_return {ec, total};
1094 bp.consume(n);
1095 }
1096 co_return {{}, total};
1097 }
1098
1099 // Synthesized path: prepare + buffer_copy + commit
1100 for(;;)
1101 {
1102 auto src = bp.data();
1103 if(src.empty())
1104 break;
1105
1106 mutable_buffer arr[detail::max_iovec_];
1107 auto dst_bufs = prepare(arr);
1108 if(dst_bufs.empty())
1109 {
1110 auto [ec] = co_await commit(0);
1111 if(ec)
1112 co_return {ec, total};
1113 continue;
1114 }
1115
1116 auto n = buffer_copy(dst_bufs, src);
1117 auto [ec] = co_await commit(n);
1118 if(ec)
1119 co_return {ec, total};
1120 bp.consume(n);
1121 total += n;
1122 }
1123
1124 co_return {{}, total};
1125 78x }
1126
1127 inline auto
1128 33x any_buffer_sink::write_eof()
1129 {
1130 struct awaitable
1131 {
1132 any_buffer_sink* self_;
1133
1134 bool
1135 33x await_ready()
1136 {
1137 33x if(self_->vt_->construct_write_eof_awaitable)
1138 {
1139 // Native WriteSink: forward to underlying write_eof()
1140 34x self_->active_ops_ =
1141 17x self_->vt_->construct_write_eof_awaitable(
1142 17x self_->sink_,
1143 17x self_->cached_awaitable_);
1144 }
1145 else
1146 {
1147 // Synthesized: commit_eof(0)
1148 32x self_->active_ops_ =
1149 16x self_->vt_->construct_commit_eof_awaitable(
1150 16x self_->sink_,
1151 16x self_->cached_awaitable_,
1152 0);
1153 }
1154 66x return self_->active_ops_->await_ready(
1155 33x self_->cached_awaitable_);
1156 }
1157
1158 std::coroutine_handle<>
1159 1x await_suspend(std::coroutine_handle<> h, io_env const* env)
1160 {
1161 1x return self_->active_ops_->await_suspend(
1162 1x self_->cached_awaitable_, h, env);
1163 }
1164
1165 io_result<>
1166 33x await_resume()
1167 {
1168 struct guard {
1169 any_buffer_sink* self;
1170 33x ~guard() {
1171 33x self->active_ops_->destroy(self->cached_awaitable_);
1172 33x self->active_ops_ = nullptr;
1173 33x }
1174 33x } g{self_};
1175 33x return self_->active_ops_->await_resume(
1176 56x self_->cached_awaitable_);
1177 33x }
1178 };
1179 33x return awaitable{this};
1180 }
1181
1182 template<ConstBufferSequence CB>
1183 io_task<std::size_t>
1184 41x any_buffer_sink::write_eof(CB buffers)
1185 {
1186 // Native WriteSink path
1187 if(vt_->construct_write_eof_buffers_awaitable)
1188 {
1189 const_buffer_param<CB> bp(buffers);
1190 std::size_t total = 0;
1191
1192 for(;;)
1193 {
1194 auto bufs = bp.data();
1195 if(bufs.empty())
1196 {
1197 auto [ec] = co_await write_eof();
1198 co_return {ec, total};
1199 }
1200
1201 if(!bp.more())
1202 {
1203 // Last window: send atomically with EOF
1204 auto [ec, n] = co_await write_eof_buffers_(bufs);
1205 total += n;
1206 co_return {ec, total};
1207 }
1208
1209 auto [ec, n] = co_await write_(bufs);
1210 total += n;
1211 if(ec)
1212 co_return {ec, total};
1213 bp.consume(n);
1214 }
1215 }
1216
1217 // Synthesized path: prepare + buffer_copy + commit + commit_eof
1218 buffer_param<CB> bp(buffers);
1219 std::size_t total = 0;
1220
1221 for(;;)
1222 {
1223 auto src = bp.data();
1224 if(src.empty())
1225 break;
1226
1227 mutable_buffer arr[detail::max_iovec_];
1228 auto dst_bufs = prepare(arr);
1229 if(dst_bufs.empty())
1230 {
1231 auto [ec] = co_await commit(0);
1232 if(ec)
1233 co_return {ec, total};
1234 continue;
1235 }
1236
1237 auto n = buffer_copy(dst_bufs, src);
1238 auto [ec] = co_await commit(n);
1239 if(ec)
1240 co_return {ec, total};
1241 bp.consume(n);
1242 total += n;
1243 }
1244
1245 auto [ec] = co_await commit_eof(0);
1246 if(ec)
1247 co_return {ec, total};
1248
1249 co_return {{}, total};
1250 82x }
1251
1252 static_assert(BufferSink<any_buffer_sink>);
1253 static_assert(WriteSink<any_buffer_sink>);
1254
1255 } // namespace capy
1256 } // namespace boost
1257
1258 #endif
1259