Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11 : #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/slice.hpp>
18 : #include <boost/capy/concept/buffer_source.hpp>
19 : #include <boost/capy/concept/io_awaitable.hpp>
20 : #include <boost/capy/concept/read_source.hpp>
21 : #include <boost/capy/coro.hpp>
22 : #include <boost/capy/error.hpp>
23 : #include <boost/capy/ex/executor_ref.hpp>
24 : #include <boost/capy/io_result.hpp>
25 : #include <boost/capy/task.hpp>
26 :
27 : #include <concepts>
28 : #include <coroutine>
29 : #include <cstddef>
30 : #include <exception>
31 : #include <new>
32 : #include <span>
33 : #include <stop_token>
34 : #include <system_error>
35 : #include <utility>
36 :
37 : namespace boost {
38 : namespace capy {
39 :
40 : /** Type-erased wrapper for any BufferSource.
41 :
42 : This class provides type erasure for any type satisfying the
43 : @ref BufferSource concept, enabling runtime polymorphism for
44 : buffer pull operations. The wrapper also satisfies @ref ReadSource,
45 : allowing it to be used with code expecting either interface.
46 : It uses cached awaitable storage to achieve zero steady-state
47 : allocation after construction.
48 :
49 : The wrapper also satisfies @ref ReadSource through the templated
50 : @ref read method. This method copies data from the source's
51 : internal buffers into the caller's buffers, incurring one extra
52 : buffer copy compared to using @ref pull and @ref consume directly.
53 :
54 : The wrapper supports two construction modes:
55 : - **Owning**: Pass by value to transfer ownership. The wrapper
56 : allocates storage and owns the source.
57 : - **Reference**: Pass a pointer to wrap without ownership. The
58 : pointed-to source must outlive this wrapper.
59 :
60 : @par Awaitable Preallocation
61 : The constructor preallocates storage for the type-erased awaitable.
62 : This reserves all virtual address space at server startup
63 : so memory usage can be measured up front, rather than
64 : allocating piecemeal as traffic arrives.
65 :
66 : @par Thread Safety
67 : Not thread-safe. Concurrent operations on the same wrapper
68 : are undefined behavior.
69 :
70 : @par Example
71 : @code
72 : // Owning - takes ownership of the source
73 : any_buffer_source abs(some_buffer_source{args...});
74 :
75 : // Reference - wraps without ownership
76 : some_buffer_source src;
77 : any_buffer_source abs(&src);
78 :
79 : const_buffer arr[16];
80 : auto [ec, bufs] = co_await abs.pull(arr);
81 : @endcode
82 :
83 : @see any_buffer_sink, BufferSource, ReadSource
84 : */
85 : class any_buffer_source
86 : {
87 : struct vtable;
88 : struct awaitable_ops;
89 :
90 : template<BufferSource S>
91 : struct vtable_for_impl;
92 :
93 : void* source_ = nullptr;
94 : vtable const* vt_ = nullptr;
95 : void* cached_awaitable_ = nullptr;
96 : void* storage_ = nullptr;
97 : awaitable_ops const* active_ops_ = nullptr;
98 :
99 : public:
100 : /** Destructor.
101 :
102 : Destroys the owned source (if any) and releases the cached
103 : awaitable storage.
104 : */
105 : ~any_buffer_source();
106 :
107 : /** Default constructor.
108 :
109 : Constructs an empty wrapper. Operations on a default-constructed
110 : wrapper result in undefined behavior.
111 : */
112 : any_buffer_source() = default;
113 :
114 : /** Non-copyable.
115 :
116 : The awaitable cache is per-instance and cannot be shared.
117 : */
118 : any_buffer_source(any_buffer_source const&) = delete;
119 : any_buffer_source& operator=(any_buffer_source const&) = delete;
120 :
121 : /** Move constructor.
122 :
123 : Transfers ownership of the wrapped source (if owned) and
124 : cached awaitable storage from `other`. After the move, `other` is
125 : in a default-constructed state.
126 :
127 : @param other The wrapper to move from.
128 : */
129 1 : any_buffer_source(any_buffer_source&& other) noexcept
130 1 : : source_(std::exchange(other.source_, nullptr))
131 1 : , vt_(std::exchange(other.vt_, nullptr))
132 1 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
133 1 : , storage_(std::exchange(other.storage_, nullptr))
134 1 : , active_ops_(std::exchange(other.active_ops_, nullptr))
135 : {
136 1 : }
137 :
138 : /** Move assignment operator.
139 :
140 : Destroys any owned source and releases existing resources,
141 : then transfers ownership from `other`.
142 :
143 : @param other The wrapper to move from.
144 : @return Reference to this wrapper.
145 : */
146 : any_buffer_source&
147 : operator=(any_buffer_source&& other) noexcept;
148 :
149 : /** Construct by taking ownership of a BufferSource.
150 :
151 : Allocates storage and moves the source into this wrapper.
152 : The wrapper owns the source and will destroy it.
153 :
154 : @param s The source to take ownership of.
155 : */
156 : template<BufferSource S>
157 : requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
158 : any_buffer_source(S s);
159 :
160 : /** Construct by wrapping a BufferSource without ownership.
161 :
162 : Wraps the given source by pointer. The source must remain
163 : valid for the lifetime of this wrapper.
164 :
165 : @param s Pointer to the source to wrap.
166 : */
167 : template<BufferSource S>
168 : any_buffer_source(S* s);
169 :
170 : /** Check if the wrapper contains a valid source.
171 :
172 : @return `true` if wrapping a source, `false` if default-constructed
173 : or moved-from.
174 : */
175 : bool
176 9 : has_value() const noexcept
177 : {
178 9 : return source_ != nullptr;
179 : }
180 :
181 : /** Check if the wrapper contains a valid source.
182 :
183 : @return `true` if wrapping a source, `false` if default-constructed
184 : or moved-from.
185 : */
186 : explicit
187 2 : operator bool() const noexcept
188 : {
189 2 : return has_value();
190 : }
191 :
192 : /** Consume bytes from the source.
193 :
194 : Advances the internal read position of the underlying source
195 : by the specified number of bytes. The next call to @ref pull
196 : returns data starting after the consumed bytes.
197 :
198 : @param n The number of bytes to consume. Must not exceed the
199 : total size of buffers returned by the previous @ref pull.
200 :
201 : @par Preconditions
202 : The wrapper must contain a valid source (`has_value() == true`).
203 : */
204 : void
205 : consume(std::size_t n) noexcept;
206 :
207 : /** Pull buffer data from the source.
208 :
209 : Fills the provided span with buffer descriptors from the
210 : underlying source. The operation completes when data is
211 : available, the source is exhausted, or an error occurs.
212 :
213 : @param dest Span of const_buffer to fill.
214 :
215 : @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
216 : On success with data, a non-empty span of filled buffers.
217 : On success with empty span, source is exhausted.
218 :
219 : @par Preconditions
220 : The wrapper must contain a valid source (`has_value() == true`).
221 : */
222 : auto
223 : pull(std::span<const_buffer> dest);
224 :
225 : /** Read data into a mutable buffer sequence.
226 :
227 : Fills the provided buffer sequence by pulling data from the
228 : underlying source and copying it into the caller's buffers.
229 : This satisfies @ref ReadSource but incurs a copy; for zero-copy
230 : access, use @ref pull and @ref consume instead.
231 :
232 : @note This operation copies data from the source's internal
233 : buffers into the caller's buffers. For zero-copy reads,
234 : use @ref pull and @ref consume directly.
235 :
236 : @param buffers The buffer sequence to fill.
237 :
238 : @return An awaitable yielding `(error_code,std::size_t)`.
239 : On success, `n == buffer_size(buffers)`.
240 : On EOF, `ec == error::eof` and `n` is bytes transferred.
241 :
242 : @par Preconditions
243 : The wrapper must contain a valid source (`has_value() == true`).
244 :
245 : @see pull, consume
246 : */
247 : template<MutableBufferSequence MB>
248 : task<io_result<std::size_t>>
249 : read(MB buffers);
250 :
251 : protected:
252 : /** Rebind to a new source after move.
253 :
254 : Updates the internal pointer to reference a new source object.
255 : Used by owning wrappers after move assignment when the owned
256 : object has moved to a new location.
257 :
258 : @param new_source The new source to bind to. Must be the same
259 : type as the original source.
260 :
261 : @note Terminates if called with a source of different type
262 : than the original.
263 : */
264 : template<BufferSource S>
265 : void
266 : rebind(S& new_source) noexcept
267 : {
268 : if(vt_ != &vtable_for_impl<S>::value)
269 : std::terminate();
270 : source_ = &new_source;
271 : }
272 : };
273 :
274 : //----------------------------------------------------------
275 :
276 : struct any_buffer_source::awaitable_ops
277 : {
278 : bool (*await_ready)(void*);
279 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
280 : io_result<std::span<const_buffer>> (*await_resume)(void*);
281 : void (*destroy)(void*) noexcept;
282 : };
283 :
284 : struct any_buffer_source::vtable
285 : {
286 : void (*destroy)(void*) noexcept;
287 : void (*do_consume)(void* source, std::size_t n) noexcept;
288 : std::size_t awaitable_size;
289 : std::size_t awaitable_align;
290 : awaitable_ops const* (*construct_awaitable)(
291 : void* source,
292 : void* storage,
293 : std::span<const_buffer> dest);
294 : };
295 :
296 : template<BufferSource S>
297 : struct any_buffer_source::vtable_for_impl
298 : {
299 : using Awaitable = decltype(std::declval<S&>().pull(
300 : std::declval<std::span<const_buffer>>()));
301 :
302 : static void
303 0 : do_destroy_impl(void* source) noexcept
304 : {
305 0 : static_cast<S*>(source)->~S();
306 0 : }
307 :
308 : static void
309 39 : do_consume_impl(void* source, std::size_t n) noexcept
310 : {
311 39 : static_cast<S*>(source)->consume(n);
312 39 : }
313 :
314 : static awaitable_ops const*
315 92 : construct_awaitable_impl(
316 : void* source,
317 : void* storage,
318 : std::span<const_buffer> dest)
319 : {
320 92 : auto& s = *static_cast<S*>(source);
321 92 : ::new(storage) Awaitable(s.pull(dest));
322 :
323 : static constexpr awaitable_ops ops = {
324 92 : +[](void* p) {
325 92 : return static_cast<Awaitable*>(p)->await_ready();
326 : },
327 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
328 0 : return detail::call_await_suspend(
329 0 : static_cast<Awaitable*>(p), h, ex, token);
330 : },
331 92 : +[](void* p) {
332 92 : return static_cast<Awaitable*>(p)->await_resume();
333 : },
334 92 : +[](void* p) noexcept {
335 92 : static_cast<Awaitable*>(p)->~Awaitable();
336 : }
337 : };
338 92 : return &ops;
339 : }
340 :
341 : static constexpr vtable value = {
342 : &do_destroy_impl,
343 : &do_consume_impl,
344 : sizeof(Awaitable),
345 : alignof(Awaitable),
346 : &construct_awaitable_impl
347 : };
348 : };
349 :
350 : //----------------------------------------------------------
351 :
352 : inline
353 59 : any_buffer_source::~any_buffer_source()
354 : {
355 59 : if(storage_)
356 : {
357 0 : vt_->destroy(source_);
358 0 : ::operator delete(storage_);
359 : }
360 59 : if(cached_awaitable_)
361 56 : ::operator delete(cached_awaitable_);
362 59 : }
363 :
364 : inline any_buffer_source&
365 1 : any_buffer_source::operator=(any_buffer_source&& other) noexcept
366 : {
367 1 : if(this != &other)
368 : {
369 1 : if(storage_)
370 : {
371 0 : vt_->destroy(source_);
372 0 : ::operator delete(storage_);
373 : }
374 1 : if(cached_awaitable_)
375 0 : ::operator delete(cached_awaitable_);
376 1 : source_ = std::exchange(other.source_, nullptr);
377 1 : vt_ = std::exchange(other.vt_, nullptr);
378 1 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
379 1 : storage_ = std::exchange(other.storage_, nullptr);
380 1 : active_ops_ = std::exchange(other.active_ops_, nullptr);
381 : }
382 1 : return *this;
383 : }
384 :
385 : template<BufferSource S>
386 : requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
387 : any_buffer_source::any_buffer_source(S s)
388 : : vt_(&vtable_for_impl<S>::value)
389 : {
390 : struct guard {
391 : any_buffer_source* self;
392 : bool committed = false;
393 : ~guard() {
394 : if(!committed && self->storage_) {
395 : self->vt_->destroy(self->source_);
396 : ::operator delete(self->storage_);
397 : self->storage_ = nullptr;
398 : self->source_ = nullptr;
399 : }
400 : }
401 : } g{this};
402 :
403 : storage_ = ::operator new(sizeof(S));
404 : source_ = ::new(storage_) S(std::move(s));
405 :
406 : // Preallocate the awaitable storage
407 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
408 :
409 : g.committed = true;
410 : }
411 :
412 : template<BufferSource S>
413 56 : any_buffer_source::any_buffer_source(S* s)
414 56 : : source_(s)
415 56 : , vt_(&vtable_for_impl<S>::value)
416 : {
417 : // Preallocate the awaitable storage
418 56 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
419 56 : }
420 :
421 : //----------------------------------------------------------
422 :
423 : inline void
424 39 : any_buffer_source::consume(std::size_t n) noexcept
425 : {
426 39 : vt_->do_consume(source_, n);
427 39 : }
428 :
429 : inline auto
430 92 : any_buffer_source::pull(std::span<const_buffer> dest)
431 : {
432 : struct awaitable
433 : {
434 : any_buffer_source* self_;
435 : std::span<const_buffer> dest_;
436 :
437 : bool
438 92 : await_ready() const noexcept
439 : {
440 92 : return false;
441 : }
442 :
443 : coro
444 92 : await_suspend(coro h, executor_ref ex, std::stop_token token)
445 : {
446 : // Construct the underlying awaitable into cached storage
447 184 : self_->active_ops_ = self_->vt_->construct_awaitable(
448 92 : self_->source_,
449 92 : self_->cached_awaitable_,
450 : dest_);
451 :
452 : // Check if underlying is immediately ready
453 92 : if(self_->active_ops_->await_ready(self_->cached_awaitable_))
454 92 : return h;
455 :
456 : // Forward to underlying awaitable
457 0 : return self_->active_ops_->await_suspend(
458 0 : self_->cached_awaitable_, h, ex, token);
459 : }
460 :
461 : io_result<std::span<const_buffer>>
462 92 : await_resume()
463 : {
464 : struct guard {
465 : any_buffer_source* self;
466 92 : ~guard() {
467 92 : self->active_ops_->destroy(self->cached_awaitable_);
468 92 : self->active_ops_ = nullptr;
469 92 : }
470 92 : } g{self_};
471 92 : return self_->active_ops_->await_resume(
472 165 : self_->cached_awaitable_);
473 92 : }
474 : };
475 92 : return awaitable{this, dest};
476 : }
477 :
478 : template<MutableBufferSequence MB>
479 : task<io_result<std::size_t>>
480 : any_buffer_source::read(MB buffers)
481 : {
482 : std::size_t total = 0;
483 : auto dest = sans_prefix(buffers, 0);
484 :
485 : while(!buffer_empty(dest))
486 : {
487 : const_buffer arr[detail::max_iovec_];
488 : auto [ec, bufs] = co_await pull(arr);
489 :
490 : if(ec)
491 : co_return {ec, total};
492 :
493 : if(bufs.empty())
494 : co_return {error::eof, total};
495 :
496 : auto n = buffer_copy(dest, bufs);
497 : consume(n);
498 : total += n;
499 : dest = sans_prefix(dest, n);
500 : }
501 :
502 : co_return {{}, total};
503 : }
504 :
505 : //----------------------------------------------------------
506 :
507 : static_assert(BufferSource<any_buffer_source>);
508 : static_assert(ReadSource<any_buffer_source>);
509 :
510 : } // namespace capy
511 : } // namespace boost
512 :
513 : #endif
|