Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11 : #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_param.hpp>
17 : #include <boost/capy/concept/io_awaitable.hpp>
18 : #include <boost/capy/concept/read_source.hpp>
19 : #include <boost/capy/coro.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/io_result.hpp>
22 : #include <boost/capy/task.hpp>
23 :
24 : #include <concepts>
25 : #include <coroutine>
26 : #include <cstddef>
27 : #include <new>
28 : #include <span>
29 : #include <stop_token>
30 : #include <system_error>
31 : #include <utility>
32 :
33 : namespace boost {
34 : namespace capy {
35 :
36 : /** Type-erased wrapper for any ReadSource.
37 :
38 : This class provides type erasure for any type satisfying the
39 : @ref ReadSource concept, enabling runtime polymorphism for
40 : source read operations. It uses cached awaitable storage to achieve
41 : zero steady-state allocation after construction.
42 :
43 : The wrapper supports two construction modes:
44 : - **Owning**: Pass by value to transfer ownership. The wrapper
45 : allocates storage and owns the source.
46 : - **Reference**: Pass a pointer to wrap without ownership. The
47 : pointed-to source must outlive this wrapper.
48 :
49 : @par Awaitable Preallocation
50 : The constructor preallocates storage for the type-erased awaitable.
51 : This reserves all virtual address space at server startup
52 : so memory usage can be measured up front, rather than
53 : allocating piecemeal as traffic arrives.
54 :
55 : @par Thread Safety
56 : Not thread-safe. Concurrent operations on the same wrapper
57 : are undefined behavior.
58 :
59 : @par Example
60 : @code
61 : // Owning - takes ownership of the source
62 : any_read_source rs(some_source{args...});
63 :
64 : // Reference - wraps without ownership
65 : some_source source;
66 : any_read_source rs(&source);
67 :
68 : mutable_buffer buf(data, size);
69 : auto [ec, n] = co_await rs.read(std::span(&buf, 1));
70 : @endcode
71 :
72 : @see any_read_stream, ReadSource
73 : */
74 : class any_read_source
75 : {
76 : struct vtable;
77 : struct awaitable_ops;
78 :
79 : template<ReadSource S>
80 : struct vtable_for_impl;
81 :
82 : void* source_ = nullptr;
83 : vtable const* vt_ = nullptr;
84 : void* cached_awaitable_ = nullptr;
85 : void* storage_ = nullptr;
86 : awaitable_ops const* active_ops_ = nullptr;
87 :
88 : public:
89 : /** Destructor.
90 :
91 : Destroys the owned source (if any) and releases the cached
92 : awaitable storage.
93 : */
94 : ~any_read_source();
95 :
96 : /** Default constructor.
97 :
98 : Constructs an empty wrapper. Operations on a default-constructed
99 : wrapper result in undefined behavior.
100 : */
101 : any_read_source() = default;
102 :
103 : /** Non-copyable.
104 :
105 : The awaitable cache is per-instance and cannot be shared.
106 : */
107 : any_read_source(any_read_source const&) = delete;
108 : any_read_source& operator=(any_read_source const&) = delete;
109 :
110 : /** Move constructor.
111 :
112 : Transfers ownership of the wrapped source (if owned) and
113 : cached awaitable storage from `other`. After the move, `other` is
114 : in a default-constructed state.
115 :
116 : @param other The wrapper to move from.
117 : */
118 1 : any_read_source(any_read_source&& other) noexcept
119 1 : : source_(std::exchange(other.source_, nullptr))
120 1 : , vt_(std::exchange(other.vt_, nullptr))
121 1 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
122 1 : , storage_(std::exchange(other.storage_, nullptr))
123 1 : , active_ops_(std::exchange(other.active_ops_, nullptr))
124 : {
125 1 : }
126 :
127 : /** Move assignment operator.
128 :
129 : Destroys any owned source and releases existing resources,
130 : then transfers ownership from `other`.
131 :
132 : @param other The wrapper to move from.
133 : @return Reference to this wrapper.
134 : */
135 : any_read_source&
136 : operator=(any_read_source&& other) noexcept;
137 :
138 : /** Construct by taking ownership of a ReadSource.
139 :
140 : Allocates storage and moves the source into this wrapper.
141 : The wrapper owns the source and will destroy it.
142 :
143 : @param s The source to take ownership of.
144 : */
145 : template<ReadSource S>
146 : requires (!std::same_as<std::decay_t<S>, any_read_source>)
147 : any_read_source(S s);
148 :
149 : /** Construct by wrapping a ReadSource without ownership.
150 :
151 : Wraps the given source by pointer. The source must remain
152 : valid for the lifetime of this wrapper.
153 :
154 : @param s Pointer to the source to wrap.
155 : */
156 : template<ReadSource S>
157 : any_read_source(S* s);
158 :
159 : /** Check if the wrapper contains a valid source.
160 :
161 : @return `true` if wrapping a source, `false` if default-constructed
162 : or moved-from.
163 : */
164 : bool
165 9 : has_value() const noexcept
166 : {
167 9 : return source_ != nullptr;
168 : }
169 :
170 : /** Check if the wrapper contains a valid source.
171 :
172 : @return `true` if wrapping a source, `false` if default-constructed
173 : or moved-from.
174 : */
175 : explicit
176 2 : operator bool() const noexcept
177 : {
178 2 : return has_value();
179 : }
180 :
181 : /** Initiate an asynchronous read operation.
182 :
183 : Reads data into the provided buffer sequence. The operation
184 : completes when the entire buffer sequence is filled, end-of-file
185 : is reached, or an error occurs.
186 :
187 : @param buffers The buffer sequence to read into. Passed by
188 : value to ensure the sequence lives in the coroutine frame
189 : across suspension points.
190 :
191 : @return An awaitable yielding `(error_code,std::size_t)`.
192 :
193 : @par Postconditions
194 : Exactly one of the following is true on return:
195 : @li **Success**: `!ec` and `n == buffer_size(buffers)`.
196 : The entire buffer was filled.
197 : @li **End-of-stream or Error**: `ec` and `n` indicates
198 : the number of bytes transferred before the failure.
199 :
200 : @par Preconditions
201 : The wrapper must contain a valid source (`has_value() == true`).
202 : */
203 : template<MutableBufferSequence MB>
204 : task<io_result<std::size_t>>
205 : read(MB buffers);
206 :
207 : protected:
208 : /** Rebind to a new source after move.
209 :
210 : Updates the internal pointer to reference a new source object.
211 : Used by owning wrappers after move assignment when the owned
212 : object has moved to a new location.
213 :
214 : @param new_source The new source to bind to. Must be the same
215 : type as the original source.
216 :
217 : @note Terminates if called with a source of different type
218 : than the original.
219 : */
220 : template<ReadSource S>
221 : void
222 : rebind(S& new_source) noexcept
223 : {
224 : if(vt_ != &vtable_for_impl<S>::value)
225 : std::terminate();
226 : source_ = &new_source;
227 : }
228 :
229 : private:
230 : auto
231 : read_some_(std::span<mutable_buffer const> buffers);
232 : };
233 :
234 : //----------------------------------------------------------
235 :
236 : struct any_read_source::awaitable_ops
237 : {
238 : bool (*await_ready)(void*);
239 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
240 : io_result<std::size_t> (*await_resume)(void*);
241 : void (*destroy)(void*) noexcept;
242 : };
243 :
244 : struct any_read_source::vtable
245 : {
246 : void (*destroy)(void*) noexcept;
247 : std::size_t awaitable_size;
248 : std::size_t awaitable_align;
249 : awaitable_ops const* (*construct_awaitable)(
250 : void* source,
251 : void* storage,
252 : std::span<mutable_buffer const> buffers);
253 : };
254 :
255 : template<ReadSource S>
256 : struct any_read_source::vtable_for_impl
257 : {
258 : using Awaitable = decltype(std::declval<S&>().read(
259 : std::span<mutable_buffer const>{}));
260 :
261 : static void
262 0 : do_destroy_impl(void* source) noexcept
263 : {
264 0 : static_cast<S*>(source)->~S();
265 0 : }
266 :
267 : static awaitable_ops const*
268 130 : construct_awaitable_impl(
269 : void* source,
270 : void* storage,
271 : std::span<mutable_buffer const> buffers)
272 : {
273 130 : auto& s = *static_cast<S*>(source);
274 130 : ::new(storage) Awaitable(s.read(buffers));
275 :
276 : static constexpr awaitable_ops ops = {
277 130 : +[](void* p) {
278 130 : return static_cast<Awaitable*>(p)->await_ready();
279 : },
280 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
281 0 : return detail::call_await_suspend(
282 0 : static_cast<Awaitable*>(p), h, ex, token);
283 : },
284 130 : +[](void* p) {
285 130 : return static_cast<Awaitable*>(p)->await_resume();
286 : },
287 130 : +[](void* p) noexcept {
288 130 : static_cast<Awaitable*>(p)->~Awaitable();
289 : }
290 : };
291 130 : return &ops;
292 : }
293 :
294 : static constexpr vtable value = {
295 : &do_destroy_impl,
296 : sizeof(Awaitable),
297 : alignof(Awaitable),
298 : &construct_awaitable_impl
299 : };
300 : };
301 :
302 : //----------------------------------------------------------
303 :
304 : inline
305 91 : any_read_source::~any_read_source()
306 : {
307 91 : if(storage_)
308 : {
309 0 : vt_->destroy(source_);
310 0 : ::operator delete(storage_);
311 : }
312 91 : if(cached_awaitable_)
313 88 : ::operator delete(cached_awaitable_);
314 91 : }
315 :
316 : inline any_read_source&
317 1 : any_read_source::operator=(any_read_source&& other) noexcept
318 : {
319 1 : if(this != &other)
320 : {
321 1 : if(storage_)
322 : {
323 0 : vt_->destroy(source_);
324 0 : ::operator delete(storage_);
325 : }
326 1 : if(cached_awaitable_)
327 0 : ::operator delete(cached_awaitable_);
328 1 : source_ = std::exchange(other.source_, nullptr);
329 1 : vt_ = std::exchange(other.vt_, nullptr);
330 1 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
331 1 : storage_ = std::exchange(other.storage_, nullptr);
332 1 : active_ops_ = std::exchange(other.active_ops_, nullptr);
333 : }
334 1 : return *this;
335 : }
336 :
337 : template<ReadSource S>
338 : requires (!std::same_as<std::decay_t<S>, any_read_source>)
339 : any_read_source::any_read_source(S s)
340 : : vt_(&vtable_for_impl<S>::value)
341 : {
342 : struct guard {
343 : any_read_source* self;
344 : bool committed = false;
345 : ~guard() {
346 : if(!committed && self->storage_) {
347 : self->vt_->destroy(self->source_);
348 : ::operator delete(self->storage_);
349 : self->storage_ = nullptr;
350 : self->source_ = nullptr;
351 : }
352 : }
353 : } g{this};
354 :
355 : storage_ = ::operator new(sizeof(S));
356 : source_ = ::new(storage_) S(std::move(s));
357 :
358 : // Preallocate the awaitable storage
359 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
360 :
361 : g.committed = true;
362 : }
363 :
364 : template<ReadSource S>
365 88 : any_read_source::any_read_source(S* s)
366 88 : : source_(s)
367 88 : , vt_(&vtable_for_impl<S>::value)
368 : {
369 : // Preallocate the awaitable storage
370 88 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
371 88 : }
372 :
373 : //----------------------------------------------------------
374 :
375 : inline auto
376 130 : any_read_source::read_some_(std::span<mutable_buffer const> buffers)
377 : {
378 : struct awaitable
379 : {
380 : any_read_source* self_;
381 : std::span<mutable_buffer const> buffers_;
382 :
383 : bool
384 130 : await_ready() const noexcept
385 : {
386 130 : return false;
387 : }
388 :
389 : coro
390 130 : await_suspend(coro h, executor_ref ex, std::stop_token token)
391 : {
392 : // Construct the underlying awaitable into cached storage
393 260 : self_->active_ops_ = self_->vt_->construct_awaitable(
394 130 : self_->source_,
395 130 : self_->cached_awaitable_,
396 : buffers_);
397 :
398 : // Check if underlying is immediately ready
399 130 : if(self_->active_ops_->await_ready(self_->cached_awaitable_))
400 130 : return h;
401 :
402 : // Forward to underlying awaitable
403 0 : return self_->active_ops_->await_suspend(
404 0 : self_->cached_awaitable_, h, ex, token);
405 : }
406 :
407 : io_result<std::size_t>
408 130 : await_resume()
409 : {
410 : struct guard {
411 : any_read_source* self;
412 130 : ~guard() {
413 130 : self->active_ops_->destroy(self->cached_awaitable_);
414 130 : self->active_ops_ = nullptr;
415 130 : }
416 130 : } g{self_};
417 130 : return self_->active_ops_->await_resume(
418 229 : self_->cached_awaitable_);
419 130 : }
420 : };
421 130 : return awaitable{this, buffers};
422 : }
423 :
424 : template<MutableBufferSequence MB>
425 : task<io_result<std::size_t>>
426 106 : any_read_source::read(MB buffers)
427 : {
428 : buffer_param<MB> bp(std::move(buffers));
429 : std::size_t total = 0;
430 :
431 : for(;;)
432 : {
433 : auto bufs = bp.data();
434 : if(bufs.empty())
435 : break;
436 :
437 : auto [ec, n] = co_await read_some_(bufs);
438 : total += n;
439 : if(ec)
440 : co_return {ec, total};
441 : bp.consume(n);
442 : }
443 :
444 : co_return {{}, total};
445 212 : }
446 :
447 : } // namespace capy
448 : } // namespace boost
449 :
450 : #endif
|