TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11 : #define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/concept/buffer_sink.hpp>
19 : #include <boost/capy/concept/io_awaitable.hpp>
20 : #include <boost/capy/concept/write_sink.hpp>
21 : #include <boost/capy/ex/io_env.hpp>
22 : #include <boost/capy/io_result.hpp>
23 : #include <boost/capy/io_task.hpp>
24 :
25 : #include <concepts>
26 : #include <coroutine>
27 : #include <cstddef>
28 : #include <exception>
29 : #include <new>
30 : #include <span>
31 : #include <stop_token>
32 : #include <system_error>
33 : #include <utility>
34 :
35 : namespace boost {
36 : namespace capy {
37 :
38 : /** Type-erased wrapper for any BufferSink.
39 :
40 : This class provides type erasure for any type satisfying the
41 : @ref BufferSink concept, enabling runtime polymorphism for
42 : buffer sink operations. It uses cached awaitable storage to achieve
43 : zero steady-state allocation after construction.
44 :
45 : The wrapper exposes two interfaces for producing data:
46 : the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
47 : and the @ref WriteSink interface (`write_some`, `write`,
48 : `write_eof`). Choose the interface that matches how your data
49 : is produced:
50 :
51 : @par Choosing an Interface
52 :
53 : Use the **BufferSink** interface when you are a generator that
54 : produces data into externally-provided buffers. The sink owns
55 : the memory; you call @ref prepare to obtain writable buffers,
56 : fill them, then call @ref commit or @ref commit_eof.
57 :
58 : Use the **WriteSink** interface when you already have buffers
59 : containing the data to write:
60 : - If the entire body is available up front, call
61 : @ref write_eof(buffers) to send everything atomically.
62 : - If data arrives incrementally, call @ref write or
63 : @ref write_some in a loop, then @ref write_eof() when done.
64 : Prefer `write` (complete) unless your streaming pattern
65 : benefits from partial writes via `write_some`.
66 :
67 : If the wrapped type only satisfies @ref BufferSink, the
68 : @ref WriteSink operations are provided automatically.
69 :
70 : @par Construction Modes
71 :
72 : - **Owning**: Pass by value to transfer ownership. The wrapper
73 : allocates storage and owns the sink.
74 : - **Reference**: Pass a pointer to wrap without ownership. The
75 : pointed-to sink must outlive this wrapper.
76 :
77 : @par Awaitable Preallocation
78 : The constructor preallocates storage for the type-erased awaitable.
79 : This reserves all virtual address space at server startup
80 : so memory usage can be measured up front, rather than
81 : allocating piecemeal as traffic arrives.
82 :
83 : @par Thread Safety
84 : Not thread-safe. Concurrent operations on the same wrapper
85 : are undefined behavior.
86 :
87 : @par Example
88 : @code
89 : // Owning - takes ownership of the sink
90 : any_buffer_sink abs(some_buffer_sink{args...});
91 :
92 : // Reference - wraps without ownership
93 : some_buffer_sink sink;
94 : any_buffer_sink abs(&sink);
95 :
96 : // BufferSink interface: generate into callee-owned buffers
97 : mutable_buffer arr[16];
98 : auto bufs = abs.prepare(arr);
99 : // Write data into bufs[0..bufs.size())
100 : auto [ec] = co_await abs.commit(bytes_written);
101 : auto [ec2] = co_await abs.commit_eof(0);
102 :
103 : // WriteSink interface: send caller-owned buffers
104 : auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
105 : auto [ec4] = co_await abs.write_eof();
106 :
107 : // Or send everything at once
108 : auto [ec5, n2] = co_await abs.write_eof(
109 : make_buffer(body_data));
110 : @endcode
111 :
112 : @see any_buffer_source, BufferSink, WriteSink
113 : */
114 : class any_buffer_sink
115 : {
116 : struct vtable;
117 : struct awaitable_ops;
118 : struct write_awaitable_ops;
119 :
120 : template<BufferSink S>
121 : struct vtable_for_impl;
122 :
123 : // hot-path members first for cache locality
124 : void* sink_ = nullptr;
125 : vtable const* vt_ = nullptr;
126 : void* cached_awaitable_ = nullptr;
127 : awaitable_ops const* active_ops_ = nullptr;
128 : write_awaitable_ops const* active_write_ops_ = nullptr;
129 : void* storage_ = nullptr;
130 :
131 : public:
132 : /** Destructor.
133 :
134 : Destroys the owned sink (if any) and releases the cached
135 : awaitable storage.
136 : */
137 : ~any_buffer_sink();
138 :
139 : /** Construct a default instance.
140 :
141 : Constructs an empty wrapper. Operations on a default-constructed
142 : wrapper result in undefined behavior.
143 : */
144 : any_buffer_sink() = default;
145 :
146 : /** Non-copyable.
147 :
148 : The awaitable cache is per-instance and cannot be shared.
149 : */
150 : any_buffer_sink(any_buffer_sink const&) = delete;
151 : any_buffer_sink& operator=(any_buffer_sink const&) = delete;
152 :
153 : /** Construct by moving.
154 :
155 : Transfers ownership of the wrapped sink (if owned) and
156 : cached awaitable storage from `other`. After the move, `other` is
157 : in a default-constructed state.
158 :
159 : @param other The wrapper to move from.
160 : */
161 HIT 2 : any_buffer_sink(any_buffer_sink&& other) noexcept
162 2 : : sink_(std::exchange(other.sink_, nullptr))
163 2 : , vt_(std::exchange(other.vt_, nullptr))
164 2 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
165 2 : , active_ops_(std::exchange(other.active_ops_, nullptr))
166 2 : , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
167 2 : , storage_(std::exchange(other.storage_, nullptr))
168 : {
169 2 : }
170 :
171 : /** Assign by moving.
172 :
173 : Destroys any owned sink and releases existing resources,
174 : then transfers ownership from `other`.
175 :
176 : @param other The wrapper to move from.
177 : @return Reference to this wrapper.
178 : */
179 : any_buffer_sink&
180 : operator=(any_buffer_sink&& other) noexcept;
181 :
182 : /** Construct by taking ownership of a BufferSink.
183 :
184 : Allocates storage and moves the sink into this wrapper.
185 : The wrapper owns the sink and will destroy it. If `S` also
186 : satisfies @ref WriteSink, native write operations are
187 : forwarded through the virtual boundary.
188 :
189 : @param s The sink to take ownership of.
190 : */
191 : template<BufferSink S>
192 : requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
193 : any_buffer_sink(S s);
194 :
195 : /** Construct by wrapping a BufferSink without ownership.
196 :
197 : Wraps the given sink by pointer. The sink must remain
198 : valid for the lifetime of this wrapper. If `S` also
199 : satisfies @ref WriteSink, native write operations are
200 : forwarded through the virtual boundary.
201 :
202 : @param s Pointer to the sink to wrap.
203 : */
204 : template<BufferSink S>
205 : any_buffer_sink(S* s);
206 :
207 : /** Check if the wrapper contains a valid sink.
208 :
209 : @return `true` if wrapping a sink, `false` if default-constructed
210 : or moved-from.
211 : */
212 : bool
213 26 : has_value() const noexcept
214 : {
215 26 : return sink_ != nullptr;
216 : }
217 :
218 : /** Check if the wrapper contains a valid sink.
219 :
220 : @return `true` if wrapping a sink, `false` if default-constructed
221 : or moved-from.
222 : */
223 : explicit
224 3 : operator bool() const noexcept
225 : {
226 3 : return has_value();
227 : }
228 :
229 : /** Prepare writable buffers.
230 :
231 : Fills the provided span with mutable buffer descriptors
232 : pointing to the underlying sink's internal storage. This
233 : operation is synchronous.
234 :
235 : @param dest Span of mutable_buffer to fill.
236 :
237 : @return A span of filled buffers.
238 :
239 : @par Preconditions
240 : The wrapper must contain a valid sink (`has_value() == true`).
241 : */
242 : std::span<mutable_buffer>
243 : prepare(std::span<mutable_buffer> dest);
244 :
245 : /** Commit bytes written to the prepared buffers.
246 :
247 : Commits `n` bytes written to the buffers returned by the
248 : most recent call to @ref prepare. The operation may trigger
249 : underlying I/O.
250 :
251 : @param n The number of bytes to commit.
252 :
253 : @return An awaitable that await-returns `(error_code)`.
254 :
255 : @par Preconditions
256 : The wrapper must contain a valid sink (`has_value() == true`).
257 : */
258 : auto
259 : commit(std::size_t n);
260 :
261 : /** Commit final bytes and signal end-of-stream.
262 :
263 : Commits `n` bytes written to the buffers returned by the
264 : most recent call to @ref prepare and finalizes the sink.
265 : After success, no further operations are permitted.
266 :
267 : @param n The number of bytes to commit.
268 :
269 : @return An awaitable that await-returns `(error_code)`.
270 :
271 : @par Preconditions
272 : The wrapper must contain a valid sink (`has_value() == true`).
273 : */
274 : auto
275 : commit_eof(std::size_t n);
276 :
277 : /** Write some data from a buffer sequence.
278 :
279 : Attempt to write up to `buffer_size( buffers )` bytes from
280 : the buffer sequence to the underlying sink. May consume less
281 : than the full sequence.
282 :
283 : When the wrapped type provides native @ref WriteSink support,
284 : the operation forwards directly. Otherwise it is synthesized
285 : from @ref prepare and @ref commit with a buffer copy.
286 :
287 : @param buffers The buffer sequence to write.
288 :
289 : @return An awaitable that await-returns `(error_code,std::size_t)`.
290 :
291 : @par Preconditions
292 : The wrapper must contain a valid sink (`has_value() == true`).
293 : */
294 : template<ConstBufferSequence CB>
295 : io_task<std::size_t>
296 : write_some(CB buffers);
297 :
298 : /** Write all data from a buffer sequence.
299 :
300 : Writes all data from the buffer sequence to the underlying
301 : sink. This method satisfies the @ref WriteSink concept.
302 :
303 : When the wrapped type provides native @ref WriteSink support,
304 : each window is forwarded directly. Otherwise the data is
305 : copied into the sink via @ref prepare and @ref commit.
306 :
307 : @param buffers The buffer sequence to write.
308 :
309 : @return An awaitable that await-returns `(error_code,std::size_t)`.
310 :
311 : @par Preconditions
312 : The wrapper must contain a valid sink (`has_value() == true`).
313 : */
314 : template<ConstBufferSequence CB>
315 : io_task<std::size_t>
316 : write(CB buffers);
317 :
318 : /** Atomically write data and signal end-of-stream.
319 :
320 : Writes all data from the buffer sequence to the underlying
321 : sink and then signals end-of-stream.
322 :
323 : When the wrapped type provides native @ref WriteSink support,
324 : the final window is sent atomically via the underlying
325 : `write_eof(buffers)`. Otherwise the data is synthesized
326 : through @ref prepare, @ref commit, and @ref commit_eof.
327 :
328 : @param buffers The buffer sequence to write.
329 :
330 : @return An awaitable that await-returns `(error_code,std::size_t)`.
331 :
332 : @par Preconditions
333 : The wrapper must contain a valid sink (`has_value() == true`).
334 : */
335 : template<ConstBufferSequence CB>
336 : io_task<std::size_t>
337 : write_eof(CB buffers);
338 :
339 : /** Signal end-of-stream.
340 :
341 : Indicates that no more data will be written to the sink.
342 : This method satisfies the @ref WriteSink concept.
343 :
344 : When the wrapped type provides native @ref WriteSink support,
345 : the underlying `write_eof()` is called. Otherwise the
346 : operation is implemented as `commit_eof(0)`.
347 :
348 : @return An awaitable that await-returns `(error_code)`.
349 :
350 : @par Preconditions
351 : The wrapper must contain a valid sink (`has_value() == true`).
352 : */
353 : auto
354 : write_eof();
355 :
356 : protected:
357 : /** Rebind to a new sink after move.
358 :
359 : Updates the internal pointer to reference a new sink object.
360 : Used by owning wrappers after move assignment when the owned
361 : object has moved to a new location.
362 :
363 : @param new_sink The new sink to bind to. Must be the same
364 : type as the original sink.
365 :
366 : @note Terminates if called with a sink of different type
367 : than the original.
368 : */
369 : template<BufferSink S>
370 : void
371 : rebind(S& new_sink) noexcept
372 : {
373 : if(vt_ != &vtable_for_impl<S>::value)
374 : std::terminate();
375 : sink_ = &new_sink;
376 : }
377 :
378 : private:
379 : /** Forward a partial write through the vtable.
380 :
381 : Constructs the underlying `write_some` awaitable in
382 : cached storage and returns a type-erased awaitable.
383 : */
384 : auto
385 : write_some_(std::span<const_buffer const> buffers);
386 :
387 : /** Forward a complete write through the vtable.
388 :
389 : Constructs the underlying `write` awaitable in
390 : cached storage and returns a type-erased awaitable.
391 : */
392 : auto
393 : write_(std::span<const_buffer const> buffers);
394 :
395 : /** Forward an atomic write-with-EOF through the vtable.
396 :
397 : Constructs the underlying `write_eof(buffers)` awaitable
398 : in cached storage and returns a type-erased awaitable.
399 : */
400 : auto
401 : write_eof_buffers_(std::span<const_buffer const> buffers);
402 : };
403 :
404 : /** Type-erased ops for awaitables that await-return `io_result<>`. */
405 : struct any_buffer_sink::awaitable_ops
406 : {
407 : bool (*await_ready)(void*);
408 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
409 : io_result<> (*await_resume)(void*);
410 : void (*destroy)(void*) noexcept;
411 : };
412 :
413 : /** Type-erased ops for awaitables that await-return `io_result<std::size_t>`. */
414 : struct any_buffer_sink::write_awaitable_ops
415 : {
416 : bool (*await_ready)(void*);
417 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
418 : io_result<std::size_t> (*await_resume)(void*);
419 : void (*destroy)(void*) noexcept;
420 : };
421 :
422 : struct any_buffer_sink::vtable
423 : {
424 : void (*destroy)(void*) noexcept;
425 : std::span<mutable_buffer> (*do_prepare)(
426 : void* sink,
427 : std::span<mutable_buffer> dest);
428 : std::size_t awaitable_size;
429 : std::size_t awaitable_align;
430 : awaitable_ops const* (*construct_commit_awaitable)(
431 : void* sink,
432 : void* storage,
433 : std::size_t n);
434 : awaitable_ops const* (*construct_commit_eof_awaitable)(
435 : void* sink,
436 : void* storage,
437 : std::size_t n);
438 :
439 : // WriteSink forwarding (null when wrapped type is BufferSink-only)
440 : write_awaitable_ops const* (*construct_write_some_awaitable)(
441 : void* sink,
442 : void* storage,
443 : std::span<const_buffer const> buffers);
444 : write_awaitable_ops const* (*construct_write_awaitable)(
445 : void* sink,
446 : void* storage,
447 : std::span<const_buffer const> buffers);
448 : write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
449 : void* sink,
450 : void* storage,
451 : std::span<const_buffer const> buffers);
452 : awaitable_ops const* (*construct_write_eof_awaitable)(
453 : void* sink,
454 : void* storage);
455 : };
456 :
457 : template<BufferSink S>
458 : struct any_buffer_sink::vtable_for_impl
459 : {
460 : using CommitAwaitable = decltype(std::declval<S&>().commit(
461 : std::size_t{}));
462 : using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
463 : std::size_t{}));
464 :
465 : static void
466 18 : do_destroy_impl(void* sink) noexcept
467 : {
468 18 : static_cast<S*>(sink)->~S();
469 18 : }
470 :
471 : static std::span<mutable_buffer>
472 132 : do_prepare_impl(
473 : void* sink,
474 : std::span<mutable_buffer> dest)
475 : {
476 132 : auto& s = *static_cast<S*>(sink);
477 132 : return s.prepare(dest);
478 : }
479 :
480 : static awaitable_ops const*
481 110 : construct_commit_awaitable_impl(
482 : void* sink,
483 : void* storage,
484 : std::size_t n)
485 : {
486 110 : auto& s = *static_cast<S*>(sink);
487 110 : ::new(storage) CommitAwaitable(s.commit(n));
488 :
489 : static constexpr awaitable_ops ops = {
490 110 : +[](void* p) {
491 110 : return static_cast<CommitAwaitable*>(p)->await_ready();
492 : },
493 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
494 1 : return detail::call_await_suspend(
495 1 : static_cast<CommitAwaitable*>(p), h, env);
496 : },
497 110 : +[](void* p) {
498 110 : return static_cast<CommitAwaitable*>(p)->await_resume();
499 : },
500 110 : +[](void* p) noexcept {
501 110 : static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
502 : }
503 : };
504 110 : return &ops;
505 : }
506 :
507 : static awaitable_ops const*
508 71 : construct_commit_eof_awaitable_impl(
509 : void* sink,
510 : void* storage,
511 : std::size_t n)
512 : {
513 71 : auto& s = *static_cast<S*>(sink);
514 71 : ::new(storage) CommitEofAwaitable(s.commit_eof(n));
515 :
516 : static constexpr awaitable_ops ops = {
517 71 : +[](void* p) {
518 71 : return static_cast<CommitEofAwaitable*>(p)->await_ready();
519 : },
520 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
521 1 : return detail::call_await_suspend(
522 1 : static_cast<CommitEofAwaitable*>(p), h, env);
523 : },
524 71 : +[](void* p) {
525 71 : return static_cast<CommitEofAwaitable*>(p)->await_resume();
526 : },
527 71 : +[](void* p) noexcept {
528 71 : static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
529 : }
530 : };
531 71 : return &ops;
532 : }
533 :
534 : static write_awaitable_ops const*
535 7 : construct_write_some_awaitable_impl(
536 : void* sink,
537 : void* storage,
538 : std::span<const_buffer const> buffers)
539 : requires WriteSink<S>
540 : {
541 : using Aw = decltype(std::declval<S&>().write_some(
542 : std::span<const_buffer const>{}));
543 7 : auto& s = *static_cast<S*>(sink);
544 7 : ::new(storage) Aw(s.write_some(buffers));
545 :
546 : static constexpr write_awaitable_ops ops = {
547 7 : +[](void* p) {
548 7 : return static_cast<Aw*>(p)->await_ready();
549 : },
550 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
551 1 : return detail::call_await_suspend(
552 1 : static_cast<Aw*>(p), h, env);
553 : },
554 7 : +[](void* p) {
555 7 : return static_cast<Aw*>(p)->await_resume();
556 : },
557 7 : +[](void* p) noexcept {
558 7 : static_cast<Aw*>(p)->~Aw();
559 : }
560 : };
561 7 : return &ops;
562 : }
563 :
564 : static write_awaitable_ops const*
565 15 : construct_write_awaitable_impl(
566 : void* sink,
567 : void* storage,
568 : std::span<const_buffer const> buffers)
569 : requires WriteSink<S>
570 : {
571 : using Aw = decltype(std::declval<S&>().write(
572 : std::span<const_buffer const>{}));
573 15 : auto& s = *static_cast<S*>(sink);
574 15 : ::new(storage) Aw(s.write(buffers));
575 :
576 : static constexpr write_awaitable_ops ops = {
577 15 : +[](void* p) {
578 15 : return static_cast<Aw*>(p)->await_ready();
579 : },
580 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
581 1 : return detail::call_await_suspend(
582 1 : static_cast<Aw*>(p), h, env);
583 : },
584 15 : +[](void* p) {
585 15 : return static_cast<Aw*>(p)->await_resume();
586 : },
587 15 : +[](void* p) noexcept {
588 15 : static_cast<Aw*>(p)->~Aw();
589 : }
590 : };
591 15 : return &ops;
592 : }
593 :
594 : static write_awaitable_ops const*
595 13 : construct_write_eof_buffers_awaitable_impl(
596 : void* sink,
597 : void* storage,
598 : std::span<const_buffer const> buffers)
599 : requires WriteSink<S>
600 : {
601 : using Aw = decltype(std::declval<S&>().write_eof(
602 : std::span<const_buffer const>{}));
603 13 : auto& s = *static_cast<S*>(sink);
604 13 : ::new(storage) Aw(s.write_eof(buffers));
605 :
606 : static constexpr write_awaitable_ops ops = {
607 13 : +[](void* p) {
608 13 : return static_cast<Aw*>(p)->await_ready();
609 : },
610 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
611 1 : return detail::call_await_suspend(
612 1 : static_cast<Aw*>(p), h, env);
613 : },
614 13 : +[](void* p) {
615 13 : return static_cast<Aw*>(p)->await_resume();
616 : },
617 13 : +[](void* p) noexcept {
618 13 : static_cast<Aw*>(p)->~Aw();
619 : }
620 : };
621 13 : return &ops;
622 : }
623 :
624 : static awaitable_ops const*
625 17 : construct_write_eof_awaitable_impl(
626 : void* sink,
627 : void* storage)
628 : requires WriteSink<S>
629 : {
630 : using Aw = decltype(std::declval<S&>().write_eof());
631 17 : auto& s = *static_cast<S*>(sink);
632 17 : ::new(storage) Aw(s.write_eof());
633 :
634 : static constexpr awaitable_ops ops = {
635 17 : +[](void* p) {
636 17 : return static_cast<Aw*>(p)->await_ready();
637 : },
638 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
639 1 : return detail::call_await_suspend(
640 1 : static_cast<Aw*>(p), h, env);
641 : },
642 17 : +[](void* p) {
643 17 : return static_cast<Aw*>(p)->await_resume();
644 : },
645 17 : +[](void* p) noexcept {
646 17 : static_cast<Aw*>(p)->~Aw();
647 : }
648 : };
649 17 : return &ops;
650 : }
651 :
652 : static consteval std::size_t
653 : compute_max_size() noexcept
654 : {
655 : std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
656 : ? sizeof(CommitAwaitable)
657 : : sizeof(CommitEofAwaitable);
658 : if constexpr (WriteSink<S>)
659 : {
660 : using WS = decltype(std::declval<S&>().write_some(
661 : std::span<const_buffer const>{}));
662 : using W = decltype(std::declval<S&>().write(
663 : std::span<const_buffer const>{}));
664 : using WEB = decltype(std::declval<S&>().write_eof(
665 : std::span<const_buffer const>{}));
666 : using WE = decltype(std::declval<S&>().write_eof());
667 :
668 : if(sizeof(WS) > s) s = sizeof(WS);
669 : if(sizeof(W) > s) s = sizeof(W);
670 : if(sizeof(WEB) > s) s = sizeof(WEB);
671 : if(sizeof(WE) > s) s = sizeof(WE);
672 : }
673 : return s;
674 : }
675 :
676 : static consteval std::size_t
677 : compute_max_align() noexcept
678 : {
679 : std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
680 : ? alignof(CommitAwaitable)
681 : : alignof(CommitEofAwaitable);
682 : if constexpr (WriteSink<S>)
683 : {
684 : using WS = decltype(std::declval<S&>().write_some(
685 : std::span<const_buffer const>{}));
686 : using W = decltype(std::declval<S&>().write(
687 : std::span<const_buffer const>{}));
688 : using WEB = decltype(std::declval<S&>().write_eof(
689 : std::span<const_buffer const>{}));
690 : using WE = decltype(std::declval<S&>().write_eof());
691 :
692 : if(alignof(WS) > a) a = alignof(WS);
693 : if(alignof(W) > a) a = alignof(W);
694 : if(alignof(WEB) > a) a = alignof(WEB);
695 : if(alignof(WE) > a) a = alignof(WE);
696 : }
697 : return a;
698 : }
699 :
700 : static consteval vtable
701 : make_vtable() noexcept
702 : {
703 : vtable v{};
704 : v.destroy = &do_destroy_impl;
705 : v.do_prepare = &do_prepare_impl;
706 : v.awaitable_size = compute_max_size();
707 : v.awaitable_align = compute_max_align();
708 : v.construct_commit_awaitable = &construct_commit_awaitable_impl;
709 : v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
710 : v.construct_write_some_awaitable = nullptr;
711 : v.construct_write_awaitable = nullptr;
712 : v.construct_write_eof_buffers_awaitable = nullptr;
713 : v.construct_write_eof_awaitable = nullptr;
714 :
715 : if constexpr (WriteSink<S>)
716 : {
717 : v.construct_write_some_awaitable =
718 : &construct_write_some_awaitable_impl;
719 : v.construct_write_awaitable =
720 : &construct_write_awaitable_impl;
721 : v.construct_write_eof_buffers_awaitable =
722 : &construct_write_eof_buffers_awaitable_impl;
723 : v.construct_write_eof_awaitable =
724 : &construct_write_eof_awaitable_impl;
725 : }
726 : return v;
727 : }
728 :
729 : static constexpr vtable value = make_vtable();
730 : };
731 :
732 : inline
733 218 : any_buffer_sink::~any_buffer_sink()
734 : {
735 218 : if(storage_)
736 : {
737 17 : vt_->destroy(sink_);
738 17 : ::operator delete(storage_);
739 : }
740 218 : if(cached_awaitable_)
741 211 : ::operator delete(cached_awaitable_);
742 218 : }
743 :
744 : inline any_buffer_sink&
745 5 : any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
746 : {
747 5 : if(this != &other)
748 : {
749 4 : if(storage_)
750 : {
751 1 : vt_->destroy(sink_);
752 1 : ::operator delete(storage_);
753 : }
754 4 : if(cached_awaitable_)
755 2 : ::operator delete(cached_awaitable_);
756 4 : sink_ = std::exchange(other.sink_, nullptr);
757 4 : vt_ = std::exchange(other.vt_, nullptr);
758 4 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
759 4 : storage_ = std::exchange(other.storage_, nullptr);
760 4 : active_ops_ = std::exchange(other.active_ops_, nullptr);
761 4 : active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
762 : }
763 5 : return *this;
764 : }
765 :
766 : template<BufferSink S>
767 : requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
768 19 : any_buffer_sink::any_buffer_sink(S s)
769 19 : : vt_(&vtable_for_impl<S>::value)
770 : {
771 : struct guard {
772 : any_buffer_sink* self;
773 : bool committed = false;
774 19 : ~guard() {
775 19 : if(!committed && self->storage_) {
776 1 : if(self->sink_)
777 : self->vt_->destroy(self->sink_); // LCOV_EXCL_LINE OOM rollback: only when the cached-awaitable allocation throws
778 1 : ::operator delete(self->storage_);
779 1 : self->storage_ = nullptr;
780 1 : self->sink_ = nullptr;
781 : }
782 19 : }
783 19 : } g{this};
784 :
785 19 : storage_ = ::operator new(sizeof(S));
786 19 : sink_ = ::new(storage_) S(std::move(s));
787 :
788 18 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
789 :
790 18 : g.committed = true;
791 19 : }
792 :
793 : template<BufferSink S>
794 195 : any_buffer_sink::any_buffer_sink(S* s)
795 195 : : sink_(s)
796 195 : , vt_(&vtable_for_impl<S>::value)
797 : {
798 195 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
799 195 : }
800 :
801 : inline std::span<mutable_buffer>
802 132 : any_buffer_sink::prepare(std::span<mutable_buffer> dest)
803 : {
804 132 : return vt_->do_prepare(sink_, dest);
805 : }
806 :
807 : inline auto
808 110 : any_buffer_sink::commit(std::size_t n)
809 : {
810 : struct awaitable
811 : {
812 : any_buffer_sink* self_;
813 : std::size_t n_;
814 :
815 : bool
816 110 : await_ready()
817 : {
818 220 : self_->active_ops_ = self_->vt_->construct_commit_awaitable(
819 110 : self_->sink_,
820 110 : self_->cached_awaitable_,
821 : n_);
822 110 : return self_->active_ops_->await_ready(self_->cached_awaitable_);
823 : }
824 :
825 : std::coroutine_handle<>
826 1 : await_suspend(std::coroutine_handle<> h, io_env const* env)
827 : {
828 1 : return self_->active_ops_->await_suspend(
829 1 : self_->cached_awaitable_, h, env);
830 : }
831 :
832 : io_result<>
833 110 : await_resume()
834 : {
835 : struct guard {
836 : any_buffer_sink* self;
837 110 : ~guard() {
838 110 : self->active_ops_->destroy(self->cached_awaitable_);
839 110 : self->active_ops_ = nullptr;
840 110 : }
841 110 : } g{self_};
842 110 : return self_->active_ops_->await_resume(
843 193 : self_->cached_awaitable_);
844 110 : }
845 : };
846 110 : return awaitable{this, n};
847 : }
848 :
849 : inline auto
850 55 : any_buffer_sink::commit_eof(std::size_t n)
851 : {
852 : struct awaitable
853 : {
854 : any_buffer_sink* self_;
855 : std::size_t n_;
856 :
857 : bool
858 55 : await_ready()
859 : {
860 110 : self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
861 55 : self_->sink_,
862 55 : self_->cached_awaitable_,
863 : n_);
864 55 : return self_->active_ops_->await_ready(self_->cached_awaitable_);
865 : }
866 :
867 : std::coroutine_handle<>
868 1 : await_suspend(std::coroutine_handle<> h, io_env const* env)
869 : {
870 1 : return self_->active_ops_->await_suspend(
871 1 : self_->cached_awaitable_, h, env);
872 : }
873 :
874 : io_result<>
875 55 : await_resume()
876 : {
877 : struct guard {
878 : any_buffer_sink* self;
879 55 : ~guard() {
880 55 : self->active_ops_->destroy(self->cached_awaitable_);
881 55 : self->active_ops_ = nullptr;
882 55 : }
883 55 : } g{self_};
884 55 : return self_->active_ops_->await_resume(
885 94 : self_->cached_awaitable_);
886 55 : }
887 : };
888 55 : return awaitable{this, n};
889 : }
890 :
891 : inline auto
892 7 : any_buffer_sink::write_some_(
893 : std::span<const_buffer const> buffers)
894 : {
895 : struct awaitable
896 : {
897 : any_buffer_sink* self_;
898 : std::span<const_buffer const> buffers_;
899 :
900 : bool
901 7 : await_ready() const noexcept
902 : {
903 7 : return false;
904 : }
905 :
906 : std::coroutine_handle<>
907 7 : await_suspend(std::coroutine_handle<> h, io_env const* env)
908 : {
909 14 : self_->active_write_ops_ =
910 14 : self_->vt_->construct_write_some_awaitable(
911 7 : self_->sink_,
912 7 : self_->cached_awaitable_,
913 : buffers_);
914 :
915 7 : if(self_->active_write_ops_->await_ready(
916 7 : self_->cached_awaitable_))
917 6 : return h;
918 :
919 1 : return self_->active_write_ops_->await_suspend(
920 1 : self_->cached_awaitable_, h, env);
921 : }
922 :
923 : io_result<std::size_t>
924 7 : await_resume()
925 : {
926 : struct guard {
927 : any_buffer_sink* self;
928 7 : ~guard() {
929 7 : self->active_write_ops_->destroy(
930 7 : self->cached_awaitable_);
931 7 : self->active_write_ops_ = nullptr;
932 7 : }
933 7 : } g{self_};
934 7 : return self_->active_write_ops_->await_resume(
935 12 : self_->cached_awaitable_);
936 7 : }
937 : };
938 7 : return awaitable{this, buffers};
939 : }
940 :
941 : inline auto
942 15 : any_buffer_sink::write_(
943 : std::span<const_buffer const> buffers)
944 : {
945 : struct awaitable
946 : {
947 : any_buffer_sink* self_;
948 : std::span<const_buffer const> buffers_;
949 :
950 : bool
951 15 : await_ready() const noexcept
952 : {
953 15 : return false;
954 : }
955 :
956 : std::coroutine_handle<>
957 15 : await_suspend(std::coroutine_handle<> h, io_env const* env)
958 : {
959 30 : self_->active_write_ops_ =
960 30 : self_->vt_->construct_write_awaitable(
961 15 : self_->sink_,
962 15 : self_->cached_awaitable_,
963 : buffers_);
964 :
965 15 : if(self_->active_write_ops_->await_ready(
966 15 : self_->cached_awaitable_))
967 14 : return h;
968 :
969 1 : return self_->active_write_ops_->await_suspend(
970 1 : self_->cached_awaitable_, h, env);
971 : }
972 :
973 : io_result<std::size_t>
974 15 : await_resume()
975 : {
976 : struct guard {
977 : any_buffer_sink* self;
978 15 : ~guard() {
979 15 : self->active_write_ops_->destroy(
980 15 : self->cached_awaitable_);
981 15 : self->active_write_ops_ = nullptr;
982 15 : }
983 15 : } g{self_};
984 15 : return self_->active_write_ops_->await_resume(
985 26 : self_->cached_awaitable_);
986 15 : }
987 : };
988 15 : return awaitable{this, buffers};
989 : }
990 :
991 : inline auto
992 13 : any_buffer_sink::write_eof_buffers_(
993 : std::span<const_buffer const> buffers)
994 : {
995 : struct awaitable
996 : {
997 : any_buffer_sink* self_;
998 : std::span<const_buffer const> buffers_;
999 :
1000 : bool
1001 13 : await_ready() const noexcept
1002 : {
1003 13 : return false;
1004 : }
1005 :
1006 : std::coroutine_handle<>
1007 13 : await_suspend(std::coroutine_handle<> h, io_env const* env)
1008 : {
1009 26 : self_->active_write_ops_ =
1010 26 : self_->vt_->construct_write_eof_buffers_awaitable(
1011 13 : self_->sink_,
1012 13 : self_->cached_awaitable_,
1013 : buffers_);
1014 :
1015 13 : if(self_->active_write_ops_->await_ready(
1016 13 : self_->cached_awaitable_))
1017 12 : return h;
1018 :
1019 1 : return self_->active_write_ops_->await_suspend(
1020 1 : self_->cached_awaitable_, h, env);
1021 : }
1022 :
1023 : io_result<std::size_t>
1024 13 : await_resume()
1025 : {
1026 : struct guard {
1027 : any_buffer_sink* self;
1028 13 : ~guard() {
1029 13 : self->active_write_ops_->destroy(
1030 13 : self->cached_awaitable_);
1031 13 : self->active_write_ops_ = nullptr;
1032 13 : }
1033 13 : } g{self_};
1034 13 : return self_->active_write_ops_->await_resume(
1035 22 : self_->cached_awaitable_);
1036 13 : }
1037 : };
1038 13 : return awaitable{this, buffers};
1039 : }
1040 :
1041 : template<ConstBufferSequence CB>
1042 : io_task<std::size_t>
1043 23 : any_buffer_sink::write_some(CB buffers)
1044 : {
1045 : buffer_param<CB> bp(buffers);
1046 : auto src = bp.data();
1047 : if(src.empty())
1048 : co_return {{}, 0};
1049 :
1050 : // Native WriteSink path
1051 : if(vt_->construct_write_some_awaitable)
1052 : co_return co_await write_some_(src);
1053 :
1054 : // Synthesized path: prepare + buffer_copy + commit
1055 : mutable_buffer arr[detail::max_iovec_];
1056 : auto dst_bufs = prepare(arr);
1057 : if(dst_bufs.empty())
1058 : {
1059 : auto [ec] = co_await commit(0);
1060 : if(ec)
1061 : co_return {ec, 0};
1062 : dst_bufs = prepare(arr);
1063 : if(dst_bufs.empty())
1064 : co_return {{}, 0};
1065 : }
1066 :
1067 : auto n = buffer_copy(dst_bufs, src);
1068 : auto [ec] = co_await commit(n);
1069 : if(ec)
1070 : co_return {ec, 0};
1071 : co_return {{}, n};
1072 46 : }
1073 :
1074 : template<ConstBufferSequence CB>
1075 : io_task<std::size_t>
1076 39 : any_buffer_sink::write(CB buffers)
1077 : {
1078 : buffer_param<CB> bp(buffers);
1079 : std::size_t total = 0;
1080 :
1081 : // Native WriteSink path
1082 : if(vt_->construct_write_awaitable)
1083 : {
1084 : for(;;)
1085 : {
1086 : auto bufs = bp.data();
1087 : if(bufs.empty())
1088 : break;
1089 :
1090 : auto [ec, n] = co_await write_(bufs);
1091 : total += n;
1092 : if(ec)
1093 : co_return {ec, total};
1094 : bp.consume(n);
1095 : }
1096 : co_return {{}, total};
1097 : }
1098 :
1099 : // Synthesized path: prepare + buffer_copy + commit
1100 : for(;;)
1101 : {
1102 : auto src = bp.data();
1103 : if(src.empty())
1104 : break;
1105 :
1106 : mutable_buffer arr[detail::max_iovec_];
1107 : auto dst_bufs = prepare(arr);
1108 : if(dst_bufs.empty())
1109 : {
1110 : auto [ec] = co_await commit(0);
1111 : if(ec)
1112 : co_return {ec, total};
1113 : continue;
1114 : }
1115 :
1116 : auto n = buffer_copy(dst_bufs, src);
1117 : auto [ec] = co_await commit(n);
1118 : if(ec)
1119 : co_return {ec, total};
1120 : bp.consume(n);
1121 : total += n;
1122 : }
1123 :
1124 : co_return {{}, total};
1125 78 : }
1126 :
1127 : inline auto
1128 33 : any_buffer_sink::write_eof()
1129 : {
1130 : struct awaitable
1131 : {
1132 : any_buffer_sink* self_;
1133 :
1134 : bool
1135 33 : await_ready()
1136 : {
1137 33 : if(self_->vt_->construct_write_eof_awaitable)
1138 : {
1139 : // Native WriteSink: forward to underlying write_eof()
1140 34 : self_->active_ops_ =
1141 17 : self_->vt_->construct_write_eof_awaitable(
1142 17 : self_->sink_,
1143 17 : self_->cached_awaitable_);
1144 : }
1145 : else
1146 : {
1147 : // Synthesized: commit_eof(0)
1148 32 : self_->active_ops_ =
1149 16 : self_->vt_->construct_commit_eof_awaitable(
1150 16 : self_->sink_,
1151 16 : self_->cached_awaitable_,
1152 : 0);
1153 : }
1154 66 : return self_->active_ops_->await_ready(
1155 33 : self_->cached_awaitable_);
1156 : }
1157 :
1158 : std::coroutine_handle<>
1159 1 : await_suspend(std::coroutine_handle<> h, io_env const* env)
1160 : {
1161 1 : return self_->active_ops_->await_suspend(
1162 1 : self_->cached_awaitable_, h, env);
1163 : }
1164 :
1165 : io_result<>
1166 33 : await_resume()
1167 : {
1168 : struct guard {
1169 : any_buffer_sink* self;
1170 33 : ~guard() {
1171 33 : self->active_ops_->destroy(self->cached_awaitable_);
1172 33 : self->active_ops_ = nullptr;
1173 33 : }
1174 33 : } g{self_};
1175 33 : return self_->active_ops_->await_resume(
1176 56 : self_->cached_awaitable_);
1177 33 : }
1178 : };
1179 33 : return awaitable{this};
1180 : }
1181 :
1182 : template<ConstBufferSequence CB>
1183 : io_task<std::size_t>
1184 41 : any_buffer_sink::write_eof(CB buffers)
1185 : {
1186 : // Native WriteSink path
1187 : if(vt_->construct_write_eof_buffers_awaitable)
1188 : {
1189 : const_buffer_param<CB> bp(buffers);
1190 : std::size_t total = 0;
1191 :
1192 : for(;;)
1193 : {
1194 : auto bufs = bp.data();
1195 : if(bufs.empty())
1196 : {
1197 : auto [ec] = co_await write_eof();
1198 : co_return {ec, total};
1199 : }
1200 :
1201 : if(!bp.more())
1202 : {
1203 : // Last window: send atomically with EOF
1204 : auto [ec, n] = co_await write_eof_buffers_(bufs);
1205 : total += n;
1206 : co_return {ec, total};
1207 : }
1208 :
1209 : auto [ec, n] = co_await write_(bufs);
1210 : total += n;
1211 : if(ec)
1212 : co_return {ec, total};
1213 : bp.consume(n);
1214 : }
1215 : }
1216 :
1217 : // Synthesized path: prepare + buffer_copy + commit + commit_eof
1218 : buffer_param<CB> bp(buffers);
1219 : std::size_t total = 0;
1220 :
1221 : for(;;)
1222 : {
1223 : auto src = bp.data();
1224 : if(src.empty())
1225 : break;
1226 :
1227 : mutable_buffer arr[detail::max_iovec_];
1228 : auto dst_bufs = prepare(arr);
1229 : if(dst_bufs.empty())
1230 : {
1231 : auto [ec] = co_await commit(0);
1232 : if(ec)
1233 : co_return {ec, total};
1234 : continue;
1235 : }
1236 :
1237 : auto n = buffer_copy(dst_bufs, src);
1238 : auto [ec] = co_await commit(n);
1239 : if(ec)
1240 : co_return {ec, total};
1241 : bp.consume(n);
1242 : total += n;
1243 : }
1244 :
1245 : auto [ec] = co_await commit_eof(0);
1246 : if(ec)
1247 : co_return {ec, total};
1248 :
1249 : co_return {{}, total};
1250 82 : }
1251 :
1252 : static_assert(BufferSink<any_buffer_sink>);
1253 : static_assert(WriteSink<any_buffer_sink>);
1254 :
1255 : } // namespace capy
1256 : } // namespace boost
1257 :
1258 : #endif
|