libs/capy/include/boost/capy/io/any_read_stream.hpp

82.7% Lines (62/75) 84.0% Functions (21/25) 62.5% Branches (10/16)
libs/capy/include/boost/capy/io/any_read_stream.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11 #define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_param.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/read_stream.hpp>
19 #include <boost/capy/coro.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/io_result.hpp>
22
23 #include <concepts>
24 #include <coroutine>
25 #include <cstddef>
26 #include <new>
27 #include <span>
28 #include <stop_token>
29 #include <system_error>
30 #include <utility>
31
32 namespace boost {
33 namespace capy {
34
35 /** Type-erased wrapper for any ReadStream.
36
37 This class provides type erasure for any type satisfying the
38 @ref ReadStream concept, enabling runtime polymorphism for
39 read operations. It uses cached awaitable storage to achieve
40 zero steady-state allocation after construction.
41
42 The wrapper supports two construction modes:
43 - **Owning**: Pass by value to transfer ownership. The wrapper
44 allocates storage and owns the stream.
45 - **Reference**: Pass a pointer to wrap without ownership. The
46 pointed-to stream must outlive this wrapper.
47
48 @par Awaitable Preallocation
49 The constructor preallocates storage for the type-erased awaitable.
50 This reserves all virtual address space at server startup
51 so memory usage can be measured up front, rather than
52 allocating piecemeal as traffic arrives.
53
54 @par Thread Safety
55 Not thread-safe. Concurrent operations on the same wrapper
56 are undefined behavior.
57
58 @par Example
59 @code
60 // Owning - takes ownership of the stream
61 any_read_stream stream(socket{ioc});
62
63 // Reference - wraps without ownership
64 socket sock(ioc);
65 any_read_stream stream(&sock);
66
67 mutable_buffer buf(data, size);
68 auto [ec, n] = co_await stream.read_some(std::span(&buf, 1));
69 @endcode
70
71 @see any_write_stream, any_stream, ReadStream
72 */
73 class any_read_stream
74 {
75 struct vtable;
76 struct awaitable_ops;
77
78 template<ReadStream S>
79 struct vtable_for_impl;
80
81 void* stream_ = nullptr;
82 vtable const* vt_ = nullptr;
83 void* cached_awaitable_ = nullptr;
84 void* storage_ = nullptr;
85 awaitable_ops const* active_ops_ = nullptr;
86
87 public:
88 /** Destructor.
89
90 Destroys the owned stream (if any) and releases the cached
91 awaitable storage.
92 */
93 ~any_read_stream();
94
95 /** Default constructor.
96
97 Constructs an empty wrapper. Operations on a default-constructed
98 wrapper result in undefined behavior.
99 */
100 1 any_read_stream() = default;
101
102 /** Non-copyable.
103
104 The awaitable cache is per-instance and cannot be shared.
105 */
106 any_read_stream(any_read_stream const&) = delete;
107 any_read_stream& operator=(any_read_stream const&) = delete;
108
109 /** Move constructor.
110
111 Transfers ownership of the wrapped stream (if owned) and
112 cached awaitable storage from `other`. After the move, `other` is
113 in a default-constructed state.
114
115 @param other The wrapper to move from.
116 */
117 2 any_read_stream(any_read_stream&& other) noexcept
118 2 : stream_(std::exchange(other.stream_, nullptr))
119 2 , vt_(std::exchange(other.vt_, nullptr))
120 2 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
121 2 , storage_(std::exchange(other.storage_, nullptr))
122 2 , active_ops_(std::exchange(other.active_ops_, nullptr))
123 {
124 2 }
125
126 /** Move assignment operator.
127
128 Destroys any owned stream and releases existing resources,
129 then transfers ownership from `other`.
130
131 @param other The wrapper to move from.
132 @return Reference to this wrapper.
133 */
134 any_read_stream&
135 operator=(any_read_stream&& other) noexcept;
136
137 /** Construct by taking ownership of a ReadStream.
138
139 Allocates storage and moves the stream into this wrapper.
140 The wrapper owns the stream and will destroy it.
141
142 @param s The stream to take ownership of.
143 */
144 template<ReadStream S>
145 requires (!std::same_as<std::decay_t<S>, any_read_stream>)
146 any_read_stream(S s);
147
148 /** Construct by wrapping a ReadStream without ownership.
149
150 Wraps the given stream by pointer. The stream must remain
151 valid for the lifetime of this wrapper.
152
153 @param s Pointer to the stream to wrap.
154 */
155 template<ReadStream S>
156 any_read_stream(S* s);
157
158 /** Check if the wrapper contains a valid stream.
159
160 @return `true` if wrapping a stream, `false` if default-constructed
161 or moved-from.
162 */
163 bool
164 19 has_value() const noexcept
165 {
166 19 return stream_ != nullptr;
167 }
168
169 /** Check if the wrapper contains a valid stream.
170
171 @return `true` if wrapping a stream, `false` if default-constructed
172 or moved-from.
173 */
174 explicit
175 2 operator bool() const noexcept
176 {
177 2 return has_value();
178 }
179
180 /** Initiate an asynchronous read operation.
181
182 Reads data into the provided buffer sequence. The operation
183 completes when at least one byte has been read, or an error
184 occurs.
185
186 @param buffers The buffer sequence to read into. Passed by
187 value to ensure the sequence lives in the coroutine frame
188 across suspension points.
189
190 @return An awaitable yielding `(error_code,std::size_t)`.
191
192 @par Preconditions
193 The wrapper must contain a valid stream (`has_value() == true`).
194 */
195 template<MutableBufferSequence MB>
196 auto
197 read_some(MB buffers);
198
199 protected:
200 /** Rebind to a new stream after move.
201
202 Updates the internal pointer to reference a new stream object.
203 Used by owning wrappers after move assignment when the owned
204 object has moved to a new location.
205
206 @param new_stream The new stream to bind to. Must be the same
207 type as the original stream.
208
209 @note Terminates if called with a stream of different type
210 than the original.
211 */
212 template<ReadStream S>
213 void
214 rebind(S& new_stream) noexcept
215 {
216 if(vt_ != &vtable_for_impl<S>::value)
217 std::terminate();
218 stream_ = &new_stream;
219 }
220 };
221
222 //----------------------------------------------------------
223
224 struct any_read_stream::awaitable_ops
225 {
226 bool (*await_ready)(void*);
227 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
228 io_result<std::size_t> (*await_resume)(void*);
229 void (*destroy)(void*) noexcept;
230 };
231
232 struct any_read_stream::vtable
233 {
234 void (*destroy)(void*) noexcept;
235 std::size_t awaitable_size;
236 std::size_t awaitable_align;
237 awaitable_ops const* (*construct_awaitable)(
238 void* stream,
239 void* storage,
240 std::span<mutable_buffer const> buffers);
241 };
242
243 template<ReadStream S>
244 struct any_read_stream::vtable_for_impl
245 {
246 using Awaitable = decltype(std::declval<S&>().read_some(
247 std::span<mutable_buffer const>{}));
248
249 static void
250 do_destroy_impl(void* stream) noexcept
251 {
252 static_cast<S*>(stream)->~S();
253 }
254
255 static awaitable_ops const*
256 70 construct_awaitable_impl(
257 void* stream,
258 void* storage,
259 std::span<mutable_buffer const> buffers)
260 {
261 70 auto& s = *static_cast<S*>(stream);
262 70 ::new(storage) Awaitable(s.read_some(buffers));
263
264 static constexpr awaitable_ops ops = {
265 70 +[](void* p) {
266 70 return static_cast<Awaitable*>(p)->await_ready();
267 },
268 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
269 return detail::call_await_suspend(
270 static_cast<Awaitable*>(p), h, ex, token);
271 },
272 70 +[](void* p) {
273 70 return static_cast<Awaitable*>(p)->await_resume();
274 },
275 70 +[](void* p) noexcept {
276 70 static_cast<Awaitable*>(p)->~Awaitable();
277 }
278 };
279 70 return &ops;
280 }
281
282 static constexpr vtable value = {
283 &do_destroy_impl,
284 sizeof(Awaitable),
285 alignof(Awaitable),
286 &construct_awaitable_impl
287 };
288 };
289
290 //----------------------------------------------------------
291
292 inline
293 78 any_read_stream::~any_read_stream()
294 {
295
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 78 times.
78 if(storage_)
296 {
297 vt_->destroy(stream_);
298 ::operator delete(storage_);
299 }
300
2/2
✓ Branch 0 taken 71 times.
✓ Branch 1 taken 7 times.
78 if(cached_awaitable_)
301 71 ::operator delete(cached_awaitable_);
302 78 }
303
304 inline any_read_stream&
305 3 any_read_stream::operator=(any_read_stream&& other) noexcept
306 {
307
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if(this != &other)
308 {
309
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if(storage_)
310 {
311 vt_->destroy(stream_);
312 ::operator delete(storage_);
313 }
314
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if(cached_awaitable_)
315 ::operator delete(cached_awaitable_);
316 3 stream_ = std::exchange(other.stream_, nullptr);
317 3 vt_ = std::exchange(other.vt_, nullptr);
318 3 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
319 3 storage_ = std::exchange(other.storage_, nullptr);
320 3 active_ops_ = std::exchange(other.active_ops_, nullptr);
321 }
322 3 return *this;
323 }
324
325 template<ReadStream S>
326 requires (!std::same_as<std::decay_t<S>, any_read_stream>)
327 any_read_stream::any_read_stream(S s)
328 : vt_(&vtable_for_impl<S>::value)
329 {
330 struct guard {
331 any_read_stream* self;
332 bool committed = false;
333 ~guard() {
334 if(!committed && self->storage_) {
335 self->vt_->destroy(self->stream_);
336 ::operator delete(self->storage_);
337 self->storage_ = nullptr;
338 self->stream_ = nullptr;
339 }
340 }
341 } g{this};
342
343 storage_ = ::operator new(sizeof(S));
344 stream_ = ::new(storage_) S(std::move(s));
345
346 // Preallocate the awaitable storage
347 cached_awaitable_ = ::operator new(vt_->awaitable_size);
348
349 g.committed = true;
350 }
351
352 template<ReadStream S>
353 71 any_read_stream::any_read_stream(S* s)
354 71 : stream_(s)
355 71 , vt_(&vtable_for_impl<S>::value)
356 {
357 // Preallocate the awaitable storage
358 71 cached_awaitable_ = ::operator new(vt_->awaitable_size);
359 71 }
360
361 //----------------------------------------------------------
362
363 template<MutableBufferSequence MB>
364 auto
365 70 any_read_stream::read_some(MB buffers)
366 {
367 struct awaitable
368 {
369 any_read_stream* self_;
370 buffer_param<MB> bp_;
371
372 bool
373 70 await_ready() const noexcept
374 {
375 70 return false;
376 }
377
378 coro
379 70 await_suspend(coro h, executor_ref ex, std::stop_token token)
380 {
381 // Construct the underlying awaitable into cached storage
382 70 self_->active_ops_ = self_->vt_->construct_awaitable(
383 70 self_->stream_,
384
1/1
✓ Branch 1 taken 14 times.
70 self_->cached_awaitable_,
385
1/1
✓ Branch 1 taken 14 times.
70 bp_.data());
386
387 // Check if underlying is immediately ready
388
1/2
✓ Branch 1 taken 14 times.
✗ Branch 2 not taken.
70 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
389 70 return h;
390
391 // Forward to underlying awaitable
392 return self_->active_ops_->await_suspend(
393 self_->cached_awaitable_, h, ex, token);
394 }
395
396 io_result<std::size_t>
397 70 await_resume()
398 {
399 struct guard {
400 any_read_stream* self;
401 70 ~guard() {
402 70 self->active_ops_->destroy(self->cached_awaitable_);
403 70 self->active_ops_ = nullptr;
404 70 }
405 70 } g{self_};
406 70 return self_->active_ops_->await_resume(
407
1/1
✓ Branch 1 taken 10 times.
120 self_->cached_awaitable_);
408 70 }
409 };
410 70 return awaitable{this, buffer_param<MB>(buffers)};
411 }
412
413 } // namespace capy
414 } // namespace boost
415
416 #endif
417