libs/capy/include/boost/capy/read_until.hpp

81.8% Lines (54/66) 85.0% Functions (34/40) 60.0% Branches (15/25)
libs/capy/include/boost/capy/read_until.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_READ_UNTIL_HPP
11 #define BOOST_CAPY_READ_UNTIL_HPP
12
#include <boost/capy/detail/config.hpp>
#include <boost/capy/buffers.hpp>
#include <boost/capy/cond.hpp>
#include <boost/capy/coro.hpp>
#include <boost/capy/error.hpp>
#include <boost/capy/io_result.hpp>
#include <boost/capy/io_task.hpp>
#include <boost/capy/concept/dynamic_buffer.hpp>
#include <boost/capy/concept/match_condition.hpp>
#include <boost/capy/concept/read_stream.hpp>
#include <boost/capy/ex/executor_ref.hpp>

#include <algorithm>
#include <cstddef>
#include <memory>
#include <optional>
#include <stop_token>
#include <string>
#include <string_view>
#include <type_traits>
31
32 namespace boost {
33 namespace capy {
34
35 namespace detail {
36
37 // Linearize a buffer sequence into a string
38 inline
39 std::string
40 linearize_buffers(ConstBufferSequence auto const& data)
41 {
42 std::string linear;
43 linear.reserve(buffer_size(data));
44 auto const end_ = end(data);
45 for(auto it = begin(data); it != end_; ++it)
46 linear.append(
47 static_cast<char const*>(it->data()),
48 it->size());
49 return linear;
50 }
51
52 // Search buffer using a MatchCondition, with single-buffer optimization
53 template<MatchCondition M>
54 std::size_t
55 240 search_buffer_for_match(
56 ConstBufferSequence auto const& data,
57 M const& match,
58 std::size_t* hint = nullptr)
59 {
60 // Fast path: single buffer - no linearization needed
61
1/2
✓ Branch 1 taken 240 times.
✗ Branch 2 not taken.
240 if(buffer_length(data) == 1)
62 {
63 240 auto const& buf = *begin(data);
64 720 return match(std::string_view(
65 240 static_cast<char const*>(buf.data()),
66 240 buf.size()), hint);
67 }
68 // Multiple buffers - linearize
69 return match(linearize_buffers(data), hint);
70 }
71
72 // Implementation coroutine for read_until with MatchCondition
73 template<class Stream, class B, MatchCondition M>
74 io_task<std::size_t>
75
1/1
✓ Branch 1 taken 126 times.
126 read_until_match_impl(
76 Stream& stream,
77 B& buffers,
78 M match,
79 std::size_t initial_amount)
80 {
81 std::size_t amount = initial_amount;
82
83 for(;;)
84 {
85 // Check max_size before preparing
86 if(buffers.size() >= buffers.max_size())
87 co_return {error::not_found, 0};
88
89 // Prepare space, respecting max_size
90 std::size_t const available = buffers.max_size() - buffers.size();
91 std::size_t const to_prepare = (std::min)(amount, available);
92 if(to_prepare == 0)
93 co_return {error::not_found, 0};
94
95 auto mb = buffers.prepare(to_prepare);
96 auto [ec, n] = co_await stream.read_some(mb);
97 buffers.commit(n);
98
99 if(n > 0)
100 {
101 auto pos = search_buffer_for_match(buffers.data(), match);
102 if(pos != std::string_view::npos)
103 co_return {{}, pos};
104 }
105
106 if(ec == cond::eof)
107 co_return {error::eof, buffers.size()};
108 if(ec)
109 co_return {ec, buffers.size()};
110
111 // Grow buffer size for next iteration
112 if(n == buffer_size(mb))
113 amount = amount / 2 + amount;
114 }
115 252 }
116
117 template<class Stream, class B, MatchCondition M, bool OwnsBuffer>
118 struct read_until_awaitable
119 {
120 Stream* stream_;
121 M match_;
122 std::size_t initial_amount_;
123 std::optional<io_result<std::size_t>> immediate_;
124 std::optional<io_task<std::size_t>> inner_;
125
126 using storage_type = std::conditional_t<OwnsBuffer, B, B*>;
127 storage_type buffers_storage_;
128
129 126 B& buffers() noexcept
130 {
131 if constexpr(OwnsBuffer)
132 126 return buffers_storage_;
133 else
134 return *buffers_storage_;
135 }
136
137 // Constructor for lvalue (pointer storage)
138 4 read_until_awaitable(
139 Stream& stream,
140 B* buffers,
141 M match,
142 std::size_t initial_amount)
143 requires (!OwnsBuffer)
144 4 : stream_(std::addressof(stream))
145 4 , match_(std::move(match))
146 4 , initial_amount_(initial_amount)
147 4 , buffers_storage_(buffers)
148 {
149
1/1
✓ Branch 1 taken 4 times.
4 auto pos = search_buffer_for_match(
150 4 buffers_storage_->data(), match_);
151
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 if(pos != std::string_view::npos)
152 4 immediate_.emplace(io_result<std::size_t>{{}, pos});
153 4 }
154
155 // Constructor for rvalue adapter (owned storage)
156 132 read_until_awaitable(
157 Stream& stream,
158 B&& buffers,
159 M match,
160 std::size_t initial_amount)
161 requires OwnsBuffer
162 132 : stream_(std::addressof(stream))
163 132 , match_(std::move(match))
164 132 , initial_amount_(initial_amount)
165 132 , buffers_storage_(std::move(buffers))
166 {
167
1/1
✓ Branch 1 taken 132 times.
132 auto pos = search_buffer_for_match(
168 132 buffers_storage_.data(), match_);
169
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 126 times.
132 if(pos != std::string_view::npos)
170 6 immediate_.emplace(io_result<std::size_t>{{}, pos});
171 132 }
172
173 bool
174 136 await_ready() const noexcept
175 {
176 136 return immediate_.has_value();
177 }
178
179 coro
180 126 await_suspend(coro h, executor_ref ex, std::stop_token token)
181 {
182 252 inner_.emplace(read_until_match_impl(
183
1/1
✓ Branch 1 taken 126 times.
126 *stream_, buffers(), match_, initial_amount_));
184 126 return inner_->await_suspend(h, ex, token);
185 }
186
187 io_result<std::size_t>
188 136 await_resume()
189 {
190
2/2
✓ Branch 1 taken 10 times.
✓ Branch 2 taken 126 times.
136 if(immediate_)
191 10 return *immediate_;
192 126 return inner_->await_resume();
193 }
194 };
195
196 } // namespace detail
197
/** Match condition that searches for a delimiter string.

    Satisfies @ref MatchCondition. When `delim` occurs in the data,
    returns the position just past its first occurrence. An empty
    `delim` matches at position 0.

    When there is no occurrence, returns `npos`. In that case, if
    `hint` is non-null, it stores the overlap hint `delim.size() - 1`:
    the number of trailing bytes that could begin a delimiter split
    across reads.

    @note Only a view of the delimiter is stored; the referenced
    characters must outlive every use of this object.

    @see MatchCondition, read_until
*/
struct match_delim
{
    std::string_view delim;

    std::size_t
    operator()(
        std::string_view data,
        std::size_t* hint) const noexcept
    {
        if(delim.empty())
            return 0;

        auto const at = data.find(delim);
        if(at == std::string_view::npos)
        {
            // delim is non-empty here, so size() - 1 cannot underflow.
            if(hint != nullptr)
                *hint = delim.size() - 1;
            return std::string_view::npos;
        }
        return at + delim.size();
    }
};
225
226 /** Asynchronously read until a match condition is satisfied.
227
228 Reads data from the stream into the dynamic buffer until the match
229 condition returns a valid position. Implemented using `read_some`.
230 If the match condition is already satisfied by existing buffer
231 data, returns immediately without I/O.
232
233 @li The operation completes when:
234 @li The match condition returns a valid position
235 @li End-of-stream is reached (`cond::eof`)
236 @li The buffer's `max_size()` is reached (`cond::not_found`)
237 @li An error occurs
238 @li The operation is cancelled
239
240 @par Cancellation
241 Supports cancellation via `stop_token` propagated through the
242 IoAwaitable protocol. When cancelled, returns with `cond::canceled`.
243
244 @param stream The stream to read from. The caller retains ownership.
245 @param buffers The dynamic buffer to append data to. Must remain
246 valid until the operation completes.
247 @param match The match condition callable. Copied into the awaitable.
248 @param initial_amount Initial bytes to read per iteration (default
249 2048). Grows by 1.5x when filled.
250
251 @return An awaitable yielding `(error_code, std::size_t)`.
252 On success, `n` is the position returned by the match condition
253 (bytes up to and including the matched delimiter). Compare error
254 codes to conditions:
255 @li `cond::eof` - EOF before match; `n` is buffer size
256 @li `cond::not_found` - `max_size()` reached before match
257 @li `cond::canceled` - Operation was cancelled
258
259 @par Example
260
261 @code
262 task<> read_http_header( ReadStream auto& stream )
263 {
264 std::string header;
265 auto [ec, n] = co_await read_until(
266 stream,
267 string_dynamic_buffer( &header ),
268 []( std::string_view data, std::size_t* hint ) {
269 auto pos = data.find( "\r\n\r\n" );
270 if( pos != std::string_view::npos )
271 return pos + 4;
272 if( hint )
273 *hint = 3; // partial "\r\n\r" possible
274 return std::string_view::npos;
275 } );
276 if( ec.failed() )
277 detail::throw_system_error( ec );
278 // header contains data through "\r\n\r\n"
279 }
280 @endcode
281
282 @see read_some, MatchCondition, DynamicBufferParam
283 */
284 template<ReadStream Stream, class B, MatchCondition M>
285 requires DynamicBufferParam<B&&>
286 auto
287 136 read_until(
288 Stream& stream,
289 B&& buffers,
290 M match,
291 std::size_t initial_amount = 2048)
292 {
293 136 constexpr bool is_lvalue = std::is_lvalue_reference_v<B&&>;
294 using BareB = std::remove_reference_t<B>;
295
296 if constexpr(is_lvalue)
297 return detail::read_until_awaitable<Stream, BareB, M, false>(
298 4 stream, std::addressof(buffers), std::move(match), initial_amount);
299 else
300 return detail::read_until_awaitable<Stream, BareB, M, true>(
301 132 stream, std::move(buffers), std::move(match), initial_amount);
302 }
303
304 /** Asynchronously read until a delimiter string is found.
305
306 Reads data from the stream until the delimiter is found. This is
307 a convenience overload equivalent to calling `read_until` with
308 `match_delim{delim}`. If the delimiter already exists in the
309 buffer, returns immediately without I/O.
310
311 @li The operation completes when:
312 @li The delimiter string is found
313 @li End-of-stream is reached (`cond::eof`)
314 @li The buffer's `max_size()` is reached (`cond::not_found`)
315 @li An error occurs
316 @li The operation is cancelled
317
318 @par Cancellation
319 Supports cancellation via `stop_token` propagated through the
320 IoAwaitable protocol. When cancelled, returns with `cond::canceled`.
321
322 @param stream The stream to read from. The caller retains ownership.
323 @param buffers The dynamic buffer to append data to. Must remain
324 valid until the operation completes.
325 @param delim The delimiter string to search for.
326 @param initial_amount Initial bytes to read per iteration (default
327 2048). Grows by 1.5x when filled.
328
329 @return An awaitable yielding `(error_code, std::size_t)`.
330 On success, `n` is bytes up to and including the delimiter.
331 Compare error codes to conditions:
332 @li `cond::eof` - EOF before delimiter; `n` is buffer size
333 @li `cond::not_found` - `max_size()` reached before delimiter
334 @li `cond::canceled` - Operation was cancelled
335
336 @par Example
337
338 @code
339 task<std::string> read_line( ReadStream auto& stream )
340 {
341 std::string line;
342 auto [ec, n] = co_await read_until(
343 stream, string_dynamic_buffer( &line ), "\r\n" );
344 if( ec == cond::eof )
345 co_return line; // partial line at EOF
346 if( ec.failed() )
347 detail::throw_system_error( ec );
348 line.resize( n - 2 ); // remove "\r\n"
349 co_return line;
350 }
351 @endcode
352
353 @see read_until, match_delim, DynamicBufferParam
354 */
355 template<ReadStream Stream, class B>
356 requires DynamicBufferParam<B&&>
357 auto
358 108 read_until(
359 Stream& stream,
360 B&& buffers,
361 std::string_view delim,
362 std::size_t initial_amount = 2048)
363 {
364 return read_until(
365 stream,
366 std::forward<B>(buffers),
367 match_delim{delim},
368 108 initial_amount);
369 }
370
371 } // namespace capy
372 } // namespace boost
373
374 #endif
375