Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11 : #define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/concept/buffer_sink.hpp>
19 : #include <boost/capy/concept/io_awaitable.hpp>
20 : #include <boost/capy/concept/write_sink.hpp>
21 : #include <boost/capy/coro.hpp>
22 : #include <boost/capy/ex/executor_ref.hpp>
23 : #include <boost/capy/io_result.hpp>
24 : #include <boost/capy/task.hpp>
25 :
26 : #include <concepts>
27 : #include <coroutine>
28 : #include <cstddef>
29 : #include <exception>
30 : #include <new>
31 : #include <stop_token>
32 : #include <system_error>
33 : #include <utility>
34 :
35 : namespace boost {
36 : namespace capy {
37 :
38 : /** Type-erased wrapper for any BufferSink.
39 :
40 : This class provides type erasure for any type satisfying the
41 : @ref BufferSink concept, enabling runtime polymorphism for
42 : buffer sink operations. It uses cached awaitable storage to achieve
43 : zero steady-state allocation after construction.
44 :
45 : The wrapper also satisfies @ref WriteSink through templated
46 : @ref write methods. These methods copy data from the caller's
47 : buffers into the sink's internal storage, incurring one extra
48 : buffer copy compared to using @ref prepare and @ref commit
49 : directly.
50 :
51 : The wrapper supports two construction modes:
52 : - **Owning**: Pass by value to transfer ownership. The wrapper
53 : allocates storage and owns the sink.
54 : - **Reference**: Pass a pointer to wrap without ownership. The
55 : pointed-to sink must outlive this wrapper.
56 :
57 : @par Awaitable Preallocation
58 : The constructor preallocates storage for the type-erased awaitable.
59 : This reserves all virtual address space at server startup
60 : so memory usage can be measured up front, rather than
61 : allocating piecemeal as traffic arrives.
62 :
63 : @par Thread Safety
64 : Not thread-safe. Concurrent operations on the same wrapper
65 : are undefined behavior.
66 :
67 : @par Example
68 : @code
69 : // Owning - takes ownership of the sink
70 : any_buffer_sink abs(some_buffer_sink{args...});
71 :
72 : // Reference - wraps without ownership
73 : some_buffer_sink sink;
74 : any_buffer_sink abs(&sink);
75 :
76 : mutable_buffer arr[16];
77 : auto bufs = abs.prepare(arr);
78 : // Write data into bufs[0..bufs.size())
79 : auto [ec] = co_await abs.commit(bytes_written);
80 : auto [ec2] = co_await abs.commit_eof();
81 : @endcode
82 :
83 : @see any_buffer_source, BufferSink, WriteSink
84 : */
85 : class any_buffer_sink
86 : {
87 : struct vtable;
88 : struct awaitable_ops;
89 :
90 : template<BufferSink S>
91 : struct vtable_for_impl;
92 :
93 : void* sink_ = nullptr;
94 : vtable const* vt_ = nullptr;
95 : void* cached_awaitable_ = nullptr;
96 : void* storage_ = nullptr;
97 : awaitable_ops const* active_ops_ = nullptr;
98 :
99 : public:
100 : /** Destructor.
101 :
102 : Destroys the owned sink (if any) and releases the cached
103 : awaitable storage.
104 : */
105 : ~any_buffer_sink();
106 :
107 : /** Default constructor.
108 :
109 : Constructs an empty wrapper. Operations on a default-constructed
110 : wrapper result in undefined behavior.
111 : */
112 : any_buffer_sink() = default;
113 :
114 : /** Non-copyable.
115 :
116 : The awaitable cache is per-instance and cannot be shared.
117 : */
118 : any_buffer_sink(any_buffer_sink const&) = delete;
119 : any_buffer_sink& operator=(any_buffer_sink const&) = delete;
120 :
121 : /** Move constructor.
122 :
123 : Transfers ownership of the wrapped sink (if owned) and
124 : cached awaitable storage from `other`. After the move, `other` is
125 : in a default-constructed state.
126 :
127 : @param other The wrapper to move from.
128 : */
129 1 : any_buffer_sink(any_buffer_sink&& other) noexcept
130 1 : : sink_(std::exchange(other.sink_, nullptr))
131 1 : , vt_(std::exchange(other.vt_, nullptr))
132 1 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
133 1 : , storage_(std::exchange(other.storage_, nullptr))
134 1 : , active_ops_(std::exchange(other.active_ops_, nullptr))
135 : {
136 1 : }
137 :
138 : /** Move assignment operator.
139 :
140 : Destroys any owned sink and releases existing resources,
141 : then transfers ownership from `other`.
142 :
143 : @param other The wrapper to move from.
144 : @return Reference to this wrapper.
145 : */
146 : any_buffer_sink&
147 : operator=(any_buffer_sink&& other) noexcept;
148 :
149 : /** Construct by taking ownership of a BufferSink.
150 :
151 : Allocates storage and moves the sink into this wrapper.
152 : The wrapper owns the sink and will destroy it.
153 :
154 : @param s The sink to take ownership of.
155 : */
156 : template<BufferSink S>
157 : requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
158 : any_buffer_sink(S s);
159 :
160 : /** Construct by wrapping a BufferSink without ownership.
161 :
162 : Wraps the given sink by pointer. The sink must remain
163 : valid for the lifetime of this wrapper.
164 :
165 : @param s Pointer to the sink to wrap.
166 : */
167 : template<BufferSink S>
168 : any_buffer_sink(S* s);
169 :
170 : /** Check if the wrapper contains a valid sink.
171 :
172 : @return `true` if wrapping a sink, `false` if default-constructed
173 : or moved-from.
174 : */
175 : bool
176 9 : has_value() const noexcept
177 : {
178 9 : return sink_ != nullptr;
179 : }
180 :
181 : /** Check if the wrapper contains a valid sink.
182 :
183 : @return `true` if wrapping a sink, `false` if default-constructed
184 : or moved-from.
185 : */
186 : explicit
187 2 : operator bool() const noexcept
188 : {
189 2 : return has_value();
190 : }
191 :
192 : /** Prepare writable buffers.
193 :
194 : Fills the provided span with mutable buffer descriptors
195 : pointing to the underlying sink's internal storage. This
196 : operation is synchronous.
197 :
198 : @param dest Span of mutable_buffer to fill.
199 :
200 : @return A span of filled buffers.
201 :
202 : @par Preconditions
203 : The wrapper must contain a valid sink (`has_value() == true`).
204 : */
205 : std::span<mutable_buffer>
206 : prepare(std::span<mutable_buffer> dest);
207 :
208 : /** Commit bytes written to the prepared buffers.
209 :
210 : Commits `n` bytes written to the buffers returned by the
211 : most recent call to @ref prepare. The operation may trigger
212 : underlying I/O.
213 :
214 : @param n The number of bytes to commit.
215 :
216 : @return An awaitable yielding `(error_code)`.
217 :
218 : @par Preconditions
219 : The wrapper must contain a valid sink (`has_value() == true`).
220 : */
221 : auto
222 : commit(std::size_t n);
223 :
224 : /** Commit bytes written with optional end-of-stream.
225 :
226 : Commits `n` bytes written to the buffers returned by the
227 : most recent call to @ref prepare. If `eof` is true, also
228 : signals end-of-stream.
229 :
230 : @param n The number of bytes to commit.
231 : @param eof If true, signals end-of-stream after committing.
232 :
233 : @return An awaitable yielding `(error_code)`.
234 :
235 : @par Preconditions
236 : The wrapper must contain a valid sink (`has_value() == true`).
237 : */
238 : auto
239 : commit(std::size_t n, bool eof);
240 :
241 : /** Signal end-of-stream.
242 :
243 : Indicates that no more data will be written to the sink.
244 : The operation completes when the sink is finalized, or
245 : an error occurs.
246 :
247 : @return An awaitable yielding `(error_code)`.
248 :
249 : @par Preconditions
250 : The wrapper must contain a valid sink (`has_value() == true`).
251 : */
252 : auto
253 : commit_eof();
254 :
255 : /** Write data from a buffer sequence.
256 :
257 : Writes all data from the buffer sequence to the underlying
258 : sink. This method satisfies the @ref WriteSink concept.
259 :
260 : @note This operation copies data from the caller's buffers
261 : into the sink's internal buffers. For zero-copy writes,
262 : use @ref prepare and @ref commit directly.
263 :
264 : @param buffers The buffer sequence to write.
265 :
266 : @return An awaitable yielding `(error_code,std::size_t)`.
267 :
268 : @par Preconditions
269 : The wrapper must contain a valid sink (`has_value() == true`).
270 : */
271 : template<ConstBufferSequence CB>
272 : task<io_result<std::size_t>>
273 : write(CB buffers);
274 :
275 : /** Write data with optional end-of-stream.
276 :
277 : Writes all data from the buffer sequence to the underlying
278 : sink, optionally finalizing it afterwards. This method
279 : satisfies the @ref WriteSink concept.
280 :
281 : @note This operation copies data from the caller's buffers
282 : into the sink's internal buffers. For zero-copy writes,
283 : use @ref prepare and @ref commit directly.
284 :
285 : @param buffers The buffer sequence to write.
286 : @param eof If true, finalize the sink after writing.
287 :
288 : @return An awaitable yielding `(error_code,std::size_t)`.
289 :
290 : @par Preconditions
291 : The wrapper must contain a valid sink (`has_value() == true`).
292 : */
293 : template<ConstBufferSequence CB>
294 : task<io_result<std::size_t>>
295 : write(CB buffers, bool eof);
296 :
297 : /** Signal end-of-stream.
298 :
299 : Indicates that no more data will be written to the sink.
300 : This method satisfies the @ref WriteSink concept.
301 :
302 : @return An awaitable yielding `(error_code)`.
303 :
304 : @par Preconditions
305 : The wrapper must contain a valid sink (`has_value() == true`).
306 : */
307 : auto
308 : write_eof();
309 :
310 : protected:
311 : /** Rebind to a new sink after move.
312 :
313 : Updates the internal pointer to reference a new sink object.
314 : Used by owning wrappers after move assignment when the owned
315 : object has moved to a new location.
316 :
317 : @param new_sink The new sink to bind to. Must be the same
318 : type as the original sink.
319 :
320 : @note Terminates if called with a sink of different type
321 : than the original.
322 : */
323 : template<BufferSink S>
324 : void
325 : rebind(S& new_sink) noexcept
326 : {
327 : if(vt_ != &vtable_for_impl<S>::value)
328 : std::terminate();
329 : sink_ = &new_sink;
330 : }
331 : };
332 :
333 : //----------------------------------------------------------
334 :
335 : struct any_buffer_sink::awaitable_ops
336 : {
337 : bool (*await_ready)(void*);
338 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
339 : io_result<> (*await_resume)(void*);
340 : void (*destroy)(void*) noexcept;
341 : };
342 :
343 : struct any_buffer_sink::vtable
344 : {
345 : void (*destroy)(void*) noexcept;
346 : std::span<mutable_buffer> (*do_prepare)(
347 : void* sink,
348 : std::span<mutable_buffer> dest);
349 : std::size_t awaitable_size;
350 : std::size_t awaitable_align;
351 : awaitable_ops const* (*construct_commit_awaitable)(
352 : void* sink,
353 : void* storage,
354 : std::size_t n,
355 : bool eof);
356 : awaitable_ops const* (*construct_eof_awaitable)(
357 : void* sink,
358 : void* storage);
359 : };
360 :
361 : template<BufferSink S>
362 : struct any_buffer_sink::vtable_for_impl
363 : {
364 : using CommitAwaitable = decltype(std::declval<S&>().commit(
365 : std::size_t{}, false));
366 : using EofAwaitable = decltype(std::declval<S&>().commit_eof());
367 :
368 : static void
369 0 : do_destroy_impl(void* sink) noexcept
370 : {
371 0 : static_cast<S*>(sink)->~S();
372 0 : }
373 :
374 : static std::span<mutable_buffer>
375 68 : do_prepare_impl(
376 : void* sink,
377 : std::span<mutable_buffer> dest)
378 : {
379 68 : auto& s = *static_cast<S*>(sink);
380 68 : return s.prepare(dest);
381 : }
382 :
383 : static awaitable_ops const*
384 48 : construct_commit_awaitable_impl(
385 : void* sink,
386 : void* storage,
387 : std::size_t n,
388 : bool eof)
389 : {
390 48 : auto& s = *static_cast<S*>(sink);
391 48 : ::new(storage) CommitAwaitable(s.commit(n, eof));
392 :
393 : static constexpr awaitable_ops ops = {
394 48 : +[](void* p) {
395 48 : return static_cast<CommitAwaitable*>(p)->await_ready();
396 : },
397 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
398 0 : return detail::call_await_suspend(
399 0 : static_cast<CommitAwaitable*>(p), h, ex, token);
400 : },
401 48 : +[](void* p) {
402 48 : return static_cast<CommitAwaitable*>(p)->await_resume();
403 : },
404 48 : +[](void* p) noexcept {
405 48 : static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
406 : }
407 : };
408 48 : return &ops;
409 : }
410 :
411 : static awaitable_ops const*
412 18 : construct_eof_awaitable_impl(
413 : void* sink,
414 : void* storage)
415 : {
416 18 : auto& s = *static_cast<S*>(sink);
417 18 : ::new(storage) EofAwaitable(s.commit_eof());
418 :
419 : static constexpr awaitable_ops ops = {
420 18 : +[](void* p) {
421 18 : return static_cast<EofAwaitable*>(p)->await_ready();
422 : },
423 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
424 0 : return detail::call_await_suspend(
425 0 : static_cast<EofAwaitable*>(p), h, ex, token);
426 : },
427 18 : +[](void* p) {
428 18 : return static_cast<EofAwaitable*>(p)->await_resume();
429 : },
430 18 : +[](void* p) noexcept {
431 18 : static_cast<EofAwaitable*>(p)->~EofAwaitable();
432 : }
433 : };
434 18 : return &ops;
435 : }
436 :
437 : static constexpr std::size_t max_awaitable_size =
438 : sizeof(CommitAwaitable) > sizeof(EofAwaitable)
439 : ? sizeof(CommitAwaitable)
440 : : sizeof(EofAwaitable);
441 :
442 : static constexpr std::size_t max_awaitable_align =
443 : alignof(CommitAwaitable) > alignof(EofAwaitable)
444 : ? alignof(CommitAwaitable)
445 : : alignof(EofAwaitable);
446 :
447 : static constexpr vtable value = {
448 : &do_destroy_impl,
449 : &do_prepare_impl,
450 : max_awaitable_size,
451 : max_awaitable_align,
452 : &construct_commit_awaitable_impl,
453 : &construct_eof_awaitable_impl
454 : };
455 : };
456 :
457 : //----------------------------------------------------------
458 :
459 : inline
460 63 : any_buffer_sink::~any_buffer_sink()
461 : {
462 63 : if(storage_)
463 : {
464 0 : vt_->destroy(sink_);
465 0 : ::operator delete(storage_);
466 : }
467 63 : if(cached_awaitable_)
468 60 : ::operator delete(cached_awaitable_);
469 63 : }
470 :
471 : inline any_buffer_sink&
472 1 : any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
473 : {
474 1 : if(this != &other)
475 : {
476 1 : if(storage_)
477 : {
478 0 : vt_->destroy(sink_);
479 0 : ::operator delete(storage_);
480 : }
481 1 : if(cached_awaitable_)
482 0 : ::operator delete(cached_awaitable_);
483 1 : sink_ = std::exchange(other.sink_, nullptr);
484 1 : vt_ = std::exchange(other.vt_, nullptr);
485 1 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
486 1 : storage_ = std::exchange(other.storage_, nullptr);
487 1 : active_ops_ = std::exchange(other.active_ops_, nullptr);
488 : }
489 1 : return *this;
490 : }
491 :
492 : template<BufferSink S>
493 : requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
494 : any_buffer_sink::any_buffer_sink(S s)
495 : : vt_(&vtable_for_impl<S>::value)
496 : {
497 : struct guard {
498 : any_buffer_sink* self;
499 : bool committed = false;
500 : ~guard() {
501 : if(!committed && self->storage_) {
502 : self->vt_->destroy(self->sink_);
503 : ::operator delete(self->storage_);
504 : self->storage_ = nullptr;
505 : self->sink_ = nullptr;
506 : }
507 : }
508 : } g{this};
509 :
510 : storage_ = ::operator new(sizeof(S));
511 : sink_ = ::new(storage_) S(std::move(s));
512 :
513 : // Preallocate the awaitable storage (sized for max of commit/eof)
514 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
515 :
516 : g.committed = true;
517 : }
518 :
519 : template<BufferSink S>
520 60 : any_buffer_sink::any_buffer_sink(S* s)
521 60 : : sink_(s)
522 60 : , vt_(&vtable_for_impl<S>::value)
523 : {
524 : // Preallocate the awaitable storage (sized for max of commit/eof)
525 60 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
526 60 : }
527 :
528 : //----------------------------------------------------------
529 :
530 : inline std::span<mutable_buffer>
531 68 : any_buffer_sink::prepare(std::span<mutable_buffer> dest)
532 : {
533 68 : return vt_->do_prepare(sink_, dest);
534 : }
535 :
536 : inline auto
537 48 : any_buffer_sink::commit(std::size_t n, bool eof)
538 : {
539 : struct awaitable
540 : {
541 : any_buffer_sink* self_;
542 : std::size_t n_;
543 : bool eof_;
544 :
545 : bool
546 48 : await_ready() const noexcept
547 : {
548 48 : return false;
549 : }
550 :
551 : coro
552 48 : await_suspend(coro h, executor_ref ex, std::stop_token token)
553 : {
554 : // Construct the underlying awaitable into cached storage
555 96 : self_->active_ops_ = self_->vt_->construct_commit_awaitable(
556 48 : self_->sink_,
557 48 : self_->cached_awaitable_,
558 : n_,
559 48 : eof_);
560 :
561 : // Check if underlying is immediately ready
562 48 : if(self_->active_ops_->await_ready(self_->cached_awaitable_))
563 48 : return h;
564 :
565 : // Forward to underlying awaitable
566 0 : return self_->active_ops_->await_suspend(
567 0 : self_->cached_awaitable_, h, ex, token);
568 : }
569 :
570 : io_result<>
571 48 : await_resume()
572 : {
573 : struct guard {
574 : any_buffer_sink* self;
575 48 : ~guard() {
576 48 : self->active_ops_->destroy(self->cached_awaitable_);
577 48 : self->active_ops_ = nullptr;
578 48 : }
579 48 : } g{self_};
580 48 : return self_->active_ops_->await_resume(
581 85 : self_->cached_awaitable_);
582 48 : }
583 : };
584 48 : return awaitable{this, n, eof};
585 : }
586 :
587 : inline auto
588 38 : any_buffer_sink::commit(std::size_t n)
589 : {
590 38 : return commit(n, false);
591 : }
592 :
593 : inline auto
594 18 : any_buffer_sink::commit_eof()
595 : {
596 : struct awaitable
597 : {
598 : any_buffer_sink* self_;
599 :
600 : bool
601 18 : await_ready() const noexcept
602 : {
603 18 : return false;
604 : }
605 :
606 : coro
607 18 : await_suspend(coro h, executor_ref ex, std::stop_token token)
608 : {
609 : // Construct the underlying awaitable into cached storage
610 36 : self_->active_ops_ = self_->vt_->construct_eof_awaitable(
611 18 : self_->sink_,
612 18 : self_->cached_awaitable_);
613 :
614 : // Check if underlying is immediately ready
615 18 : if(self_->active_ops_->await_ready(self_->cached_awaitable_))
616 18 : return h;
617 :
618 : // Forward to underlying awaitable
619 0 : return self_->active_ops_->await_suspend(
620 0 : self_->cached_awaitable_, h, ex, token);
621 : }
622 :
623 : io_result<>
624 18 : await_resume()
625 : {
626 : struct guard {
627 : any_buffer_sink* self;
628 18 : ~guard() {
629 18 : self->active_ops_->destroy(self->cached_awaitable_);
630 18 : self->active_ops_ = nullptr;
631 18 : }
632 18 : } g{self_};
633 18 : return self_->active_ops_->await_resume(
634 31 : self_->cached_awaitable_);
635 18 : }
636 : };
637 18 : return awaitable{this};
638 : }
639 :
640 : //----------------------------------------------------------
641 :
642 : template<ConstBufferSequence CB>
643 : task<io_result<std::size_t>>
644 : any_buffer_sink::write(CB buffers)
645 : {
646 : return write(buffers, false);
647 : }
648 :
649 : template<ConstBufferSequence CB>
650 : task<io_result<std::size_t>>
651 : any_buffer_sink::write(CB buffers, bool eof)
652 : {
653 : buffer_param<CB> bp(buffers);
654 : std::size_t total = 0;
655 :
656 : for(;;)
657 : {
658 : auto src = bp.data();
659 : if(src.empty())
660 : break;
661 :
662 : mutable_buffer arr[detail::max_iovec_];
663 : auto dst_bufs = prepare(arr);
664 : if(dst_bufs.empty())
665 : {
666 : auto [ec] = co_await commit(0);
667 : if(ec)
668 : co_return {ec, total};
669 : continue;
670 : }
671 :
672 : auto n = buffer_copy(dst_bufs, src);
673 : auto [ec] = co_await commit(n);
674 : if(ec)
675 : co_return {ec, total};
676 : bp.consume(n);
677 : total += n;
678 : }
679 :
680 : if(eof)
681 : {
682 : auto [ec] = co_await commit_eof();
683 : if(ec)
684 : co_return {ec, total};
685 : }
686 :
687 : co_return {{}, total};
688 : }
689 :
690 : inline auto
691 : any_buffer_sink::write_eof()
692 : {
693 : return commit_eof();
694 : }
695 :
696 : //----------------------------------------------------------
697 :
698 : static_assert(WriteSink<any_buffer_sink>);
699 :
700 : } // namespace capy
701 : } // namespace boost
702 :
703 : #endif
|