include/boost/capy/io/any_read_source.hpp

86.7% Lines (98/113) 92.0% List of functions (23/25) 80.8% Branches (21/26)
f(x) Functions (25)
Function Calls Lines Branches Blocks
boost::capy::any_read_source::any_read_source(boost::capy::any_read_source&&) :124 0 100.0% boost::capy::any_read_source::has_value() const :171 0 100.0% boost::capy::any_read_source::operator bool() const :182 0 100.0% boost::capy::any_read_source::vtable_for_impl<boost::capy::(anonymous namespace)::pending_read_source>::do_destroy_impl(void*) :314 0 0.0% boost::capy::any_read_source::vtable_for_impl<boost::capy::test::read_source>::do_destroy_impl(void*) :314 0 100.0% boost::capy::any_read_source::vtable_for_impl<boost::capy::(anonymous namespace)::pending_read_source>::construct_read_some_awaitable_impl(void*, void*, std::span<boost::capy::mutable_buffer const, 18446744073709551615ul>) :320 0 100.0% boost::capy::any_read_source::vtable_for_impl<boost::capy::test::read_source>::construct_read_some_awaitable_impl(void*, void*, std::span<boost::capy::mutable_buffer const, 18446744073709551615ul>) :320 0 100.0% boost::capy::any_read_source::vtable_for_impl<boost::capy::(anonymous namespace)::pending_read_source>::construct_read_awaitable_impl(void*, void*, std::span<boost::capy::mutable_buffer const, 18446744073709551615ul>) :347 0 0.0% boost::capy::any_read_source::vtable_for_impl<boost::capy::test::read_source>::construct_read_awaitable_impl(void*, void*, std::span<boost::capy::mutable_buffer const, 18446744073709551615ul>) :347 0 100.0% boost::capy::any_read_source::~any_read_source() :392 0 100.0% 100.0% boost::capy::any_read_source::operator=(boost::capy::any_read_source&&) :408 0 86.7% 87.5% boost::capy::any_read_source::any_read_source<boost::capy::test::read_source>(boost::capy::test::read_source) :434 0 100.0% boost::capy::any_read_source::any_read_source<boost::capy::test::read_source>(boost::capy::test::read_source)::guard::~guard() :440 0 69.2% 50.0% boost::capy::any_read_source::any_read_source<boost::capy::(anonymous namespace)::pending_read_source>(boost::capy::(anonymous namespace)::pending_read_source*) :460 0 100.0% boost::capy::any_read_source::any_read_source<boost::capy::test::read_source>(boost::capy::test::read_source*) :460 0 100.0% auto boost::capy::any_read_source::read_some<boost::capy::mutable_buffer>(boost::capy::mutable_buffer) :470 0 100.0% auto boost::capy::any_read_source::read_some<std::array<boost::capy::mutable_buffer, 2ul> >(std::array<boost::capy::mutable_buffer, 2ul>) :470 0 100.0% boost::capy::any_read_source::read_(std::span<boost::capy::mutable_buffer const, 18446744073709551615ul>) :525 0 100.0% boost::capy::any_read_source::read_(std::span<boost::capy::mutable_buffer const, 18446744073709551615ul>)::awaitable::await_ready() const :533 0 100.0% boost::capy::any_read_source::read_(std::span<boost::capy::mutable_buffer const, 18446744073709551615ul>)::awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :539 0 75.0% 50.0% boost::capy::any_read_source::read_(std::span<boost::capy::mutable_buffer const, 18446744073709551615ul>)::awaitable::await_resume() :554 0 100.0% boost::capy::any_read_source::read_(std::span<boost::capy::mutable_buffer const, 18446744073709551615ul>)::awaitable::await_resume()::guard::~guard() :558 0 100.0% 100.0% boost::capy::task<boost::capy::io_result<unsigned long> > boost::capy::any_read_source::read<boost::capy::mutable_buffer>(boost::capy::mutable_buffer) :572 0 100.0% 100.0% boost::capy::task<boost::capy::io_result<unsigned long> > boost::capy::any_read_source::read<std::array<boost::capy::mutable_buffer, 20ul> >(std::array<boost::capy::mutable_buffer, 20ul>) :572 0 100.0% 100.0% boost::capy::task<boost::capy::io_result<unsigned long> > boost::capy::any_read_source::read<std::array<boost::capy::mutable_buffer, 2ul> >(std::array<boost::capy::mutable_buffer, 2ul>) :572 0 100.0% 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11 #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_array.hpp>
17 #include <boost/capy/buffers/buffer_param.hpp>
18 #include <boost/capy/concept/io_awaitable.hpp>
19 #include <boost/capy/concept/read_source.hpp>
20 #include <boost/capy/ex/io_env.hpp>
21 #include <boost/capy/io_result.hpp>
22 #include <boost/capy/io_task.hpp>
23
24 #include <concepts>
25 #include <coroutine>
26 #include <cstddef>
27 #include <exception>
28 #include <new>
29 #include <span>
30 #include <stop_token>
31 #include <system_error>
32 #include <utility>
33
34 namespace boost {
35 namespace capy {
36
37 /** Type-erased wrapper for any ReadSource.
38
39 This class provides type erasure for any type satisfying the
40 @ref ReadSource concept, enabling runtime polymorphism for
41 source read operations. It uses cached awaitable storage to achieve
42 zero steady-state allocation after construction.
43
44 The wrapper supports two construction modes:
45 - **Owning**: Pass by value to transfer ownership. The wrapper
46 allocates storage and owns the source.
47 - **Reference**: Pass a pointer to wrap without ownership. The
48 pointed-to source must outlive this wrapper.
49
50 @par Awaitable Preallocation
51 The constructor preallocates storage for the type-erased awaitable.
52 This reserves all virtual address space at server startup
53 so memory usage can be measured up front, rather than
54 allocating piecemeal as traffic arrives.
55
56 @par Immediate Completion
57 Operations complete immediately without suspending when the
58 buffer sequence is empty, or when the underlying source's
59 awaitable reports readiness via `await_ready`.
60
61 @par Thread Safety
62 Not thread-safe. Concurrent operations on the same wrapper
63 are undefined behavior.
64
65 @par Example
66 @code
67 // Owning - takes ownership of the source
68 any_read_source rs(some_source{args...});
69
70 // Reference - wraps without ownership
71 some_source source;
72 any_read_source rs(&source);
73
74 mutable_buffer buf(data, size);
75 auto [ec, n] = co_await rs.read(std::span(&buf, 1));
76 @endcode
77
78 @see any_read_stream, ReadSource
79 */
80 class any_read_source
81 {
82 struct vtable;
83 struct awaitable_ops;
84
85 template<ReadSource S>
86 struct vtable_for_impl;
87
88 void* source_ = nullptr;
89 vtable const* vt_ = nullptr;
90 void* cached_awaitable_ = nullptr;
91 void* storage_ = nullptr;
92 awaitable_ops const* active_ops_ = nullptr;
93
94 public:
95 /** Destructor.
96
97 Destroys the owned source (if any) and releases the cached
98 awaitable storage.
99 */
100 ~any_read_source();
101
102 /** Construct a default instance.
103
104 Constructs an empty wrapper. Operations on a default-constructed
105 wrapper result in undefined behavior.
106 */
107 any_read_source() = default;
108
109 /** Non-copyable.
110
111 The awaitable cache is per-instance and cannot be shared.
112 */
113 any_read_source(any_read_source const&) = delete;
114 any_read_source& operator=(any_read_source const&) = delete;
115
116 /** Construct by moving.
117
118 Transfers ownership of the wrapped source (if owned) and
119 cached awaitable storage from `other`. After the move, `other` is
120 in a default-constructed state.
121
122 @param other The wrapper to move from.
123 */
124 1x any_read_source(any_read_source&& other) noexcept
125 1x : source_(std::exchange(other.source_, nullptr))
126 1x , vt_(std::exchange(other.vt_, nullptr))
127 1x , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
128 1x , storage_(std::exchange(other.storage_, nullptr))
129 1x , active_ops_(std::exchange(other.active_ops_, nullptr))
130 {
131 1x }
132
133 /** Assign by moving.
134
135 Destroys any owned source and releases existing resources,
136 then transfers ownership from `other`.
137
138 @param other The wrapper to move from.
139 @return Reference to this wrapper.
140 */
141 any_read_source&
142 operator=(any_read_source&& other) noexcept;
143
144 /** Construct by taking ownership of a ReadSource.
145
146 Allocates storage and moves the source into this wrapper.
147 The wrapper owns the source and will destroy it.
148
149 @param s The source to take ownership of.
150 */
151 template<ReadSource S>
152 requires (!std::same_as<std::decay_t<S>, any_read_source>)
153 any_read_source(S s);
154
155 /** Construct by wrapping a ReadSource without ownership.
156
157 Wraps the given source by pointer. The source must remain
158 valid for the lifetime of this wrapper.
159
160 @param s Pointer to the source to wrap.
161 */
162 template<ReadSource S>
163 any_read_source(S* s);
164
165 /** Check if the wrapper contains a valid source.
166
167 @return `true` if wrapping a source, `false` if default-constructed
168 or moved-from.
169 */
170 bool
171 27x has_value() const noexcept
172 {
173 27x return source_ != nullptr;
174 }
175
176 /** Check if the wrapper contains a valid source.
177
178 @return `true` if wrapping a source, `false` if default-constructed
179 or moved-from.
180 */
181 explicit
182 8x operator bool() const noexcept
183 {
184 8x return has_value();
185 }
186
187 /** Initiate a partial read operation.
188
189 Attempt to read up to `buffer_size( buffers )` bytes into
190 the provided buffer sequence. May fill less than the
191 full sequence.
192
193 @param buffers The buffer sequence to read into.
194
195 @return An awaitable that await-returns `(error_code,std::size_t)`.
196
197 @par Immediate Completion
198 The operation completes immediately without suspending
199 the calling coroutine when:
200 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
201 @li The underlying source's awaitable reports immediate
202 readiness via `await_ready`.
203
204 @note This is a partial operation and may not process the
205 entire buffer sequence. Use @ref read for guaranteed
206 complete transfer.
207
208 @par Preconditions
209 The wrapper must contain a valid source (`has_value() == true`).
210 The caller must not call this function again after a prior
211 call returned an error (including EOF).
212 */
213 template<MutableBufferSequence MB>
214 auto
215 read_some(MB buffers);
216
217 /** Initiate a complete read operation.
218
219 Reads data into the provided buffer sequence by forwarding
220 to the underlying source's `read` operation. Large buffer
221 sequences are processed in windows, with each window
222 forwarded as a separate `read` call to the underlying source.
223 The operation completes when the entire buffer sequence is
224 filled, end-of-file is reached, or an error occurs.
225
226 @param buffers The buffer sequence to read into.
227
228 @return An awaitable that await-returns `(error_code,std::size_t)`.
229
230 @par Immediate Completion
231 The operation completes immediately without suspending
232 the calling coroutine when:
233 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
234 @li The underlying source's `read` awaitable reports
235 immediate readiness via `await_ready`.
236
237 @par Postconditions
238 Exactly one of the following is true on return:
239 @li **Success**: `!ec` and `n == buffer_size(buffers)`.
240 The entire buffer was filled.
241 @li **End-of-stream or Error**: `ec` and `n` indicates
242 the number of bytes transferred before the failure.
243
244 @par Preconditions
245 The wrapper must contain a valid source (`has_value() == true`).
246 The caller must not call this function again after a prior
247 call returned an error (including EOF).
248 */
249 template<MutableBufferSequence MB>
250 io_task<std::size_t>
251 read(MB buffers);
252
253 protected:
254 /** Rebind to a new source after move.
255
256 Updates the internal pointer to reference a new source object.
257 Used by owning wrappers after move assignment when the owned
258 object has moved to a new location.
259
260 @param new_source The new source to bind to. Must be the same
261 type as the original source.
262
263 @note Terminates if called with a source of different type
264 than the original.
265 */
266 template<ReadSource S>
267 void
268 rebind(S& new_source) noexcept
269 {
270 if(vt_ != &vtable_for_impl<S>::value)
271 std::terminate();
272 source_ = &new_source;
273 }
274
275 private:
276 auto
277 read_(std::span<mutable_buffer const> buffers);
278 };
279
280 // ordered by call sequence for cache line coherence
281 struct any_read_source::awaitable_ops
282 {
283 bool (*await_ready)(void*);
284 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
285 io_result<std::size_t> (*await_resume)(void*);
286 void (*destroy)(void*) noexcept;
287 };
288
289 // ordered by call frequency for cache line coherence
290 struct any_read_source::vtable
291 {
292 awaitable_ops const* (*construct_read_some_awaitable)(
293 void* source,
294 void* storage,
295 std::span<mutable_buffer const> buffers);
296 awaitable_ops const* (*construct_read_awaitable)(
297 void* source,
298 void* storage,
299 std::span<mutable_buffer const> buffers);
300 std::size_t awaitable_size;
301 std::size_t awaitable_align;
302 void (*destroy)(void*) noexcept;
303 };
304
305 template<ReadSource S>
306 struct any_read_source::vtable_for_impl
307 {
308 using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
309 std::span<mutable_buffer const>{}));
310 using ReadAwaitable = decltype(std::declval<S&>().read(
311 std::span<mutable_buffer const>{}));
312
313 static void
314 6x do_destroy_impl(void* source) noexcept
315 {
316 6x static_cast<S*>(source)->~S();
317 6x }
318
319 static awaitable_ops const*
320 52x construct_read_some_awaitable_impl(
321 void* source,
322 void* storage,
323 std::span<mutable_buffer const> buffers)
324 {
325 52x auto& s = *static_cast<S*>(source);
326 52x ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
327
328 static constexpr awaitable_ops ops = {
329 +[](void* p) {
330 return static_cast<ReadSomeAwaitable*>(p)->await_ready();
331 },
332 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
333 return detail::call_await_suspend(
334 static_cast<ReadSomeAwaitable*>(p), h, env);
335 },
336 +[](void* p) {
337 return static_cast<ReadSomeAwaitable*>(p)->await_resume();
338 },
339 +[](void* p) noexcept {
340 static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
341 }
342 };
343 52x return &ops;
344 }
345
346 static awaitable_ops const*
347 116x construct_read_awaitable_impl(
348 void* source,
349 void* storage,
350 std::span<mutable_buffer const> buffers)
351 {
352 116x auto& s = *static_cast<S*>(source);
353 116x ::new(storage) ReadAwaitable(s.read(buffers));
354
355 static constexpr awaitable_ops ops = {
356 +[](void* p) {
357 return static_cast<ReadAwaitable*>(p)->await_ready();
358 },
359 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
360 return detail::call_await_suspend(
361 static_cast<ReadAwaitable*>(p), h, env);
362 },
363 +[](void* p) {
364 return static_cast<ReadAwaitable*>(p)->await_resume();
365 },
366 +[](void* p) noexcept {
367 static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
368 }
369 };
370 116x return &ops;
371 }
372
373 static constexpr std::size_t max_awaitable_size =
374 sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
375 ? sizeof(ReadSomeAwaitable)
376 : sizeof(ReadAwaitable);
377 static constexpr std::size_t max_awaitable_align =
378 alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
379 ? alignof(ReadSomeAwaitable)
380 : alignof(ReadAwaitable);
381
382 static constexpr vtable value = {
383 &construct_read_some_awaitable_impl,
384 &construct_read_awaitable_impl,
385 max_awaitable_size,
386 max_awaitable_align,
387 &do_destroy_impl
388 };
389 };
390
391 inline
392 145x any_read_source::~any_read_source()
393 {
394
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 139 times.
145x if(storage_)
395 {
396 6x vt_->destroy(source_);
397 6x ::operator delete(storage_);
398 }
399
2/2
✓ Branch 0 taken 139 times.
✓ Branch 1 taken 6 times.
145x if(cached_awaitable_)
400 {
401
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 138 times.
139x if(active_ops_)
402 1x active_ops_->destroy(cached_awaitable_);
403 139x ::operator delete(cached_awaitable_);
404 }
405 145x }
406
407 inline any_read_source&
408 4x any_read_source::operator=(any_read_source&& other) noexcept
409 {
410
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4x if(this != &other)
411 {
412
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3x if(storage_)
413 {
414 vt_->destroy(source_);
415 ::operator delete(storage_);
416 }
417
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 time.
3x if(cached_awaitable_)
418 {
419
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
2x if(active_ops_)
420 1x active_ops_->destroy(cached_awaitable_);
421 2x ::operator delete(cached_awaitable_);
422 }
423 3x source_ = std::exchange(other.source_, nullptr);
424 3x vt_ = std::exchange(other.vt_, nullptr);
425 3x cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
426 3x storage_ = std::exchange(other.storage_, nullptr);
427 3x active_ops_ = std::exchange(other.active_ops_, nullptr);
428 }
429 4x return *this;
430 }
431
432 template<ReadSource S>
433 requires (!std::same_as<std::decay_t<S>, any_read_source>)
434 6x any_read_source::any_read_source(S s)
435 6x : vt_(&vtable_for_impl<S>::value)
436 {
437 struct guard {
438 any_read_source* self;
439 bool committed = false;
440 6x ~guard() {
441
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
6x if(!committed && self->storage_) {
442 self->vt_->destroy(self->source_);
443 ::operator delete(self->storage_);
444 self->storage_ = nullptr;
445 self->source_ = nullptr;
446 }
447 6x }
448 6x } g{this};
449
450
1/1
✓ Branch 1 taken 6 times.
6x storage_ = ::operator new(sizeof(S));
451 6x source_ = ::new(storage_) S(std::move(s));
452
453 // Preallocate the awaitable storage
454
1/1
✓ Branch 1 taken 6 times.
6x cached_awaitable_ = ::operator new(vt_->awaitable_size);
455
456 6x g.committed = true;
457 6x }
458
459 template<ReadSource S>
460 135x any_read_source::any_read_source(S* s)
461 135x : source_(s)
462 135x , vt_(&vtable_for_impl<S>::value)
463 {
464 // Preallocate the awaitable storage
465 135x cached_awaitable_ = ::operator new(vt_->awaitable_size);
466 135x }
467
468 template<MutableBufferSequence MB>
469 auto
470 54x any_read_source::read_some(MB buffers)
471 {
472 struct awaitable
473 {
474 any_read_source* self_;
475 mutable_buffer_array<detail::max_iovec_> ba_;
476
477 awaitable(any_read_source* self, MB const& buffers)
478 : self_(self)
479 , ba_(buffers)
480 {
481 }
482
483 bool
484 await_ready() const noexcept
485 {
486 return ba_.to_span().empty();
487 }
488
489 std::coroutine_handle<>
490 await_suspend(std::coroutine_handle<> h, io_env const* env)
491 {
492 self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
493 self_->source_,
494 self_->cached_awaitable_,
495 ba_.to_span());
496
497 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
498 return h;
499
500 return self_->active_ops_->await_suspend(
501 self_->cached_awaitable_, h, env);
502 }
503
504 io_result<std::size_t>
505 await_resume()
506 {
507 if(ba_.to_span().empty())
508 return {{}, 0};
509
510 struct guard {
511 any_read_source* self;
512 ~guard() {
513 self->active_ops_->destroy(self->cached_awaitable_);
514 self->active_ops_ = nullptr;
515 }
516 } g{self_};
517 return self_->active_ops_->await_resume(
518 self_->cached_awaitable_);
519 }
520 };
521 54x return awaitable(this, buffers);
522 }
523
524 inline auto
525 116x any_read_source::read_(std::span<mutable_buffer const> buffers)
526 {
527 struct awaitable
528 {
529 any_read_source* self_;
530 std::span<mutable_buffer const> buffers_;
531
532 bool
533 116x await_ready() const noexcept
534 {
535 116x return false;
536 }
537
538 std::coroutine_handle<>
539 116x await_suspend(std::coroutine_handle<> h, io_env const* env)
540 {
541 232x self_->active_ops_ = self_->vt_->construct_read_awaitable(
542 116x self_->source_,
543 116x self_->cached_awaitable_,
544 buffers_);
545
546
1/2
✓ Branch 1 taken 116 times.
✗ Branch 2 not taken.
116x if(self_->active_ops_->await_ready(self_->cached_awaitable_))
547 116x return h;
548
549 return self_->active_ops_->await_suspend(
550 self_->cached_awaitable_, h, env);
551 }
552
553 io_result<std::size_t>
554 116x await_resume()
555 {
556 struct guard {
557 any_read_source* self;
558 116x ~guard() {
559 116x self->active_ops_->destroy(self->cached_awaitable_);
560 116x self->active_ops_ = nullptr;
561 116x }
562 116x } g{self_};
563 116x return self_->active_ops_->await_resume(
564
1/1
✓ Branch 1 taken 84 times.
200x self_->cached_awaitable_);
565 116x }
566 };
567 116x return awaitable{this, buffers};
568 }
569
570 template<MutableBufferSequence MB>
571 io_task<std::size_t>
572
3/3
boost::capy::task<boost::capy::io_result<unsigned long> > boost::capy::any_read_source::read<boost::capy::mutable_buffer>(boost::capy::mutable_buffer):
✓ Branch 1 taken 82 times.
boost::capy::task<boost::capy::io_result<unsigned long> > boost::capy::any_read_source::read<std::array<boost::capy::mutable_buffer, 20ul> >(std::array<boost::capy::mutable_buffer, 20ul>):
✓ Branch 1 taken 16 times.
boost::capy::task<boost::capy::io_result<unsigned long> > boost::capy::any_read_source::read<std::array<boost::capy::mutable_buffer, 2ul> >(std::array<boost::capy::mutable_buffer, 2ul>):
✓ Branch 1 taken 12 times.
110x any_read_source::read(MB buffers)
573 {
574 buffer_param bp(buffers);
575 std::size_t total = 0;
576
577 for(;;)
578 {
579 auto bufs = bp.data();
580 if(bufs.empty())
581 break;
582
583 auto [ec, n] = co_await read_(bufs);
584 total += n;
585 if(ec)
586 co_return {ec, total};
587 bp.consume(n);
588 }
589
590 co_return {{}, total};
591 220x }
592
593 } // namespace capy
594 } // namespace boost
595
596 #endif
597