Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_READ_UNTIL_HPP
11 : #define BOOST_CAPY_READ_UNTIL_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/buffers.hpp>
15 : #include <boost/capy/cond.hpp>
16 : #include <boost/capy/coro.hpp>
17 : #include <boost/capy/error.hpp>
18 : #include <boost/capy/io_result.hpp>
19 : #include <boost/capy/io_task.hpp>
20 : #include <boost/capy/concept/dynamic_buffer.hpp>
21 : #include <boost/capy/concept/match_condition.hpp>
22 : #include <boost/capy/concept/read_stream.hpp>
23 : #include <boost/capy/ex/executor_ref.hpp>
24 :
#include <algorithm>
#include <cstddef>
#include <memory>
#include <optional>
#include <stop_token>
#include <string>
#include <string_view>
#include <type_traits>
#include <utility>
31 :
32 : namespace boost {
33 : namespace capy {
34 :
35 : namespace detail {
36 :
37 : // Linearize a buffer sequence into a string
38 : inline
39 : std::string
40 0 : linearize_buffers(ConstBufferSequence auto const& data)
41 : {
42 0 : std::string linear;
43 0 : linear.reserve(buffer_size(data));
44 0 : auto const end_ = end(data);
45 0 : for(auto it = begin(data); it != end_; ++it)
46 0 : linear.append(
47 0 : static_cast<char const*>(it->data()),
48 : it->size());
49 0 : return linear;
50 0 : }
51 :
52 : // Search buffer using a MatchCondition, with single-buffer optimization
53 : template<MatchCondition M>
54 : std::size_t
55 240 : search_buffer_for_match(
56 : ConstBufferSequence auto const& data,
57 : M const& match,
58 : std::size_t* hint = nullptr)
59 : {
60 : // Fast path: single buffer - no linearization needed
61 240 : if(buffer_length(data) == 1)
62 : {
63 240 : auto const& buf = *begin(data);
64 720 : return match(std::string_view(
65 240 : static_cast<char const*>(buf.data()),
66 240 : buf.size()), hint);
67 : }
68 : // Multiple buffers - linearize
69 0 : return match(linearize_buffers(data), hint);
70 : }
71 :
72 : // Implementation coroutine for read_until with MatchCondition
73 : template<class Stream, class B, MatchCondition M>
74 : io_task<std::size_t>
75 126 : read_until_match_impl(
76 : Stream& stream,
77 : B& buffers,
78 : M match,
79 : std::size_t initial_amount)
80 : {
81 : std::size_t amount = initial_amount;
82 :
83 : for(;;)
84 : {
85 : // Check max_size before preparing
86 : if(buffers.size() >= buffers.max_size())
87 : co_return {error::not_found, 0};
88 :
89 : // Prepare space, respecting max_size
90 : std::size_t const available = buffers.max_size() - buffers.size();
91 : std::size_t const to_prepare = (std::min)(amount, available);
92 : if(to_prepare == 0)
93 : co_return {error::not_found, 0};
94 :
95 : auto mb = buffers.prepare(to_prepare);
96 : auto [ec, n] = co_await stream.read_some(mb);
97 : buffers.commit(n);
98 :
99 : if(n > 0)
100 : {
101 : auto pos = search_buffer_for_match(buffers.data(), match);
102 : if(pos != std::string_view::npos)
103 : co_return {{}, pos};
104 : }
105 :
106 : if(ec == cond::eof)
107 : co_return {error::eof, buffers.size()};
108 : if(ec)
109 : co_return {ec, buffers.size()};
110 :
111 : // Grow buffer size for next iteration
112 : if(n == buffer_size(mb))
113 : amount = amount / 2 + amount;
114 : }
115 252 : }
116 :
117 : template<class Stream, class B, MatchCondition M, bool OwnsBuffer>
118 : struct read_until_awaitable
119 : {
120 : Stream* stream_;
121 : M match_;
122 : std::size_t initial_amount_;
123 : std::optional<io_result<std::size_t>> immediate_;
124 : std::optional<io_task<std::size_t>> inner_;
125 :
126 : using storage_type = std::conditional_t<OwnsBuffer, B, B*>;
127 : storage_type buffers_storage_;
128 :
129 126 : B& buffers() noexcept
130 : {
131 : if constexpr(OwnsBuffer)
132 126 : return buffers_storage_;
133 : else
134 0 : return *buffers_storage_;
135 : }
136 :
137 : // Constructor for lvalue (pointer storage)
138 4 : read_until_awaitable(
139 : Stream& stream,
140 : B* buffers,
141 : M match,
142 : std::size_t initial_amount)
143 : requires (!OwnsBuffer)
144 4 : : stream_(std::addressof(stream))
145 4 : , match_(std::move(match))
146 4 : , initial_amount_(initial_amount)
147 4 : , buffers_storage_(buffers)
148 : {
149 4 : auto pos = search_buffer_for_match(
150 4 : buffers_storage_->data(), match_);
151 4 : if(pos != std::string_view::npos)
152 4 : immediate_.emplace(io_result<std::size_t>{{}, pos});
153 4 : }
154 :
155 : // Constructor for rvalue adapter (owned storage)
156 132 : read_until_awaitable(
157 : Stream& stream,
158 : B&& buffers,
159 : M match,
160 : std::size_t initial_amount)
161 : requires OwnsBuffer
162 132 : : stream_(std::addressof(stream))
163 132 : , match_(std::move(match))
164 132 : , initial_amount_(initial_amount)
165 132 : , buffers_storage_(std::move(buffers))
166 : {
167 132 : auto pos = search_buffer_for_match(
168 132 : buffers_storage_.data(), match_);
169 132 : if(pos != std::string_view::npos)
170 6 : immediate_.emplace(io_result<std::size_t>{{}, pos});
171 132 : }
172 :
173 : bool
174 136 : await_ready() const noexcept
175 : {
176 136 : return immediate_.has_value();
177 : }
178 :
179 : coro
180 126 : await_suspend(coro h, executor_ref ex, std::stop_token token)
181 : {
182 252 : inner_.emplace(read_until_match_impl(
183 126 : *stream_, buffers(), match_, initial_amount_));
184 126 : return inner_->await_suspend(h, ex, token);
185 : }
186 :
187 : io_result<std::size_t>
188 136 : await_resume()
189 : {
190 136 : if(immediate_)
191 10 : return *immediate_;
192 126 : return inner_->await_resume();
193 : }
194 : };
195 :
196 : } // namespace detail
197 :
/** Match condition that looks for a fixed delimiter string.

    Satisfies @ref MatchCondition. When `delim` occurs in the searched
    data, returns the offset one past the end of its first occurrence.
    Otherwise returns `std::string_view::npos` and, when `hint` is not
    null, stores `delim.size() - 1` there: an overlap hint covering a
    delimiter that may span reads.

    An empty `delim` matches immediately at offset 0 (the hint is not
    written in that case).

    @note Only a view of the delimiter is stored; the characters it
    refers to must outlive every use of this object.

    @see MatchCondition, read_until
*/
struct match_delim
{
    // The delimiter to search for (not owned).
    std::string_view delim;

    std::size_t
    operator()(
        std::string_view data,
        std::size_t* hint) const noexcept
    {
        // An empty delimiter is trivially present at the start.
        if(delim.empty())
            return 0;

        std::size_t const at = data.find(delim);
        if(at == std::string_view::npos)
        {
            // delim is non-empty here, so size() - 1 cannot underflow.
            if(hint != nullptr)
                *hint = delim.size() - 1;
            return std::string_view::npos;
        }
        return at + delim.size();
    }
};
225 :
226 : /** Asynchronously read until a match condition is satisfied.
227 :
228 : Reads data from the stream into the dynamic buffer until the match
229 : condition returns a valid position. Implemented using `read_some`.
230 : If the match condition is already satisfied by existing buffer
231 : data, returns immediately without I/O.
232 :
233 : @li The operation completes when:
234 : @li The match condition returns a valid position
235 : @li End-of-stream is reached (`cond::eof`)
236 : @li The buffer's `max_size()` is reached (`cond::not_found`)
237 : @li An error occurs
238 : @li The operation is cancelled
239 :
240 : @par Cancellation
241 : Supports cancellation via `stop_token` propagated through the
242 : IoAwaitable protocol. When cancelled, returns with `cond::canceled`.
243 :
244 : @param stream The stream to read from. The caller retains ownership.
245 : @param buffers The dynamic buffer to append data to. Must remain
246 : valid until the operation completes.
247 : @param match The match condition callable. Copied into the awaitable.
248 : @param initial_amount Initial bytes to read per iteration (default
249 : 2048). Grows by 1.5x when filled.
250 :
251 : @return An awaitable yielding `(error_code, std::size_t)`.
252 : On success, `n` is the position returned by the match condition
253 : (bytes up to and including the matched delimiter). Compare error
254 : codes to conditions:
255 : @li `cond::eof` - EOF before match; `n` is buffer size
256 : @li `cond::not_found` - `max_size()` reached before match
257 : @li `cond::canceled` - Operation was cancelled
258 :
259 : @par Example
260 :
261 : @code
262 : task<> read_http_header( ReadStream auto& stream )
263 : {
264 : std::string header;
265 : auto [ec, n] = co_await read_until(
266 : stream,
267 : string_dynamic_buffer( &header ),
268 : []( std::string_view data, std::size_t* hint ) {
269 : auto pos = data.find( "\r\n\r\n" );
270 : if( pos != std::string_view::npos )
271 : return pos + 4;
272 : if( hint )
273 : *hint = 3; // partial "\r\n\r" possible
274 : return std::string_view::npos;
275 : } );
276 : if( ec.failed() )
277 : detail::throw_system_error( ec );
278 : // header contains data through "\r\n\r\n"
279 : }
280 : @endcode
281 :
282 : @see read_some, MatchCondition, DynamicBufferParam
283 : */
284 : template<ReadStream Stream, class B, MatchCondition M>
285 : requires DynamicBufferParam<B&&>
286 : auto
287 136 : read_until(
288 : Stream& stream,
289 : B&& buffers,
290 : M match,
291 : std::size_t initial_amount = 2048)
292 : {
293 136 : constexpr bool is_lvalue = std::is_lvalue_reference_v<B&&>;
294 : using BareB = std::remove_reference_t<B>;
295 :
296 : if constexpr(is_lvalue)
297 : return detail::read_until_awaitable<Stream, BareB, M, false>(
298 4 : stream, std::addressof(buffers), std::move(match), initial_amount);
299 : else
300 : return detail::read_until_awaitable<Stream, BareB, M, true>(
301 132 : stream, std::move(buffers), std::move(match), initial_amount);
302 : }
303 :
304 : /** Asynchronously read until a delimiter string is found.
305 :
306 : Reads data from the stream until the delimiter is found. This is
307 : a convenience overload equivalent to calling `read_until` with
308 : `match_delim{delim}`. If the delimiter already exists in the
309 : buffer, returns immediately without I/O.
310 :
311 : @li The operation completes when:
312 : @li The delimiter string is found
313 : @li End-of-stream is reached (`cond::eof`)
314 : @li The buffer's `max_size()` is reached (`cond::not_found`)
315 : @li An error occurs
316 : @li The operation is cancelled
317 :
318 : @par Cancellation
319 : Supports cancellation via `stop_token` propagated through the
320 : IoAwaitable protocol. When cancelled, returns with `cond::canceled`.
321 :
322 : @param stream The stream to read from. The caller retains ownership.
323 : @param buffers The dynamic buffer to append data to. Must remain
324 : valid until the operation completes.
325 : @param delim The delimiter string to search for.
326 : @param initial_amount Initial bytes to read per iteration (default
327 : 2048). Grows by 1.5x when filled.
328 :
329 : @return An awaitable yielding `(error_code, std::size_t)`.
330 : On success, `n` is bytes up to and including the delimiter.
331 : Compare error codes to conditions:
332 : @li `cond::eof` - EOF before delimiter; `n` is buffer size
333 : @li `cond::not_found` - `max_size()` reached before delimiter
334 : @li `cond::canceled` - Operation was cancelled
335 :
336 : @par Example
337 :
338 : @code
339 : task<std::string> read_line( ReadStream auto& stream )
340 : {
341 : std::string line;
342 : auto [ec, n] = co_await read_until(
343 : stream, string_dynamic_buffer( &line ), "\r\n" );
344 : if( ec == cond::eof )
345 : co_return line; // partial line at EOF
346 : if( ec.failed() )
347 : detail::throw_system_error( ec );
348 : line.resize( n - 2 ); // remove "\r\n"
349 : co_return line;
350 : }
351 : @endcode
352 :
353 : @see read_until, match_delim, DynamicBufferParam
354 : */
355 : template<ReadStream Stream, class B>
356 : requires DynamicBufferParam<B&&>
357 : auto
358 108 : read_until(
359 : Stream& stream,
360 : B&& buffers,
361 : std::string_view delim,
362 : std::size_t initial_amount = 2048)
363 : {
364 : return read_until(
365 : stream,
366 : std::forward<B>(buffers),
367 : match_delim{delim},
368 108 : initial_amount);
369 : }
370 :
371 : } // namespace capy
372 : } // namespace boost
373 :
374 : #endif
|