1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11  
#define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11  
#define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
18  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <boost/capy/concept/io_awaitable.hpp>
19  
#include <boost/capy/concept/read_source.hpp>
19  
#include <boost/capy/concept/read_source.hpp>
20  
#include <boost/capy/ex/io_env.hpp>
20  
#include <boost/capy/ex/io_env.hpp>
21  
#include <boost/capy/io_result.hpp>
21  
#include <boost/capy/io_result.hpp>
22  
#include <boost/capy/io_task.hpp>
22  
#include <boost/capy/io_task.hpp>
23  

23  

24  
#include <concepts>
24  
#include <concepts>
25  
#include <coroutine>
25  
#include <coroutine>
26  
#include <cstddef>
26  
#include <cstddef>
27  
#include <exception>
27  
#include <exception>
28  
#include <new>
28  
#include <new>
29  
#include <span>
29  
#include <span>
30  
#include <stop_token>
30  
#include <stop_token>
31  
#include <system_error>
31  
#include <system_error>
32  
#include <utility>
32  
#include <utility>
33  

33  

34  
namespace boost {
34  
namespace boost {
35  
namespace capy {
35  
namespace capy {
36  

36  

37  
/** Type-erased wrapper for any ReadSource.
37  
/** Type-erased wrapper for any ReadSource.
38  

38  

39  
    This class provides type erasure for any type satisfying the
39  
    This class provides type erasure for any type satisfying the
40  
    @ref ReadSource concept, enabling runtime polymorphism for
40  
    @ref ReadSource concept, enabling runtime polymorphism for
41  
    source read operations. It uses cached awaitable storage to achieve
41  
    source read operations. It uses cached awaitable storage to achieve
42  
    zero steady-state allocation after construction.
42  
    zero steady-state allocation after construction.
43  

43  

44  
    The wrapper supports two construction modes:
44  
    The wrapper supports two construction modes:
45  
    - **Owning**: Pass by value to transfer ownership. The wrapper
45  
    - **Owning**: Pass by value to transfer ownership. The wrapper
46  
      allocates storage and owns the source.
46  
      allocates storage and owns the source.
47  
    - **Reference**: Pass a pointer to wrap without ownership. The
47  
    - **Reference**: Pass a pointer to wrap without ownership. The
48  
      pointed-to source must outlive this wrapper.
48  
      pointed-to source must outlive this wrapper.
49  

49  

50  
    @par Awaitable Preallocation
50  
    @par Awaitable Preallocation
51  
    The constructor preallocates storage for the type-erased awaitable.
51  
    The constructor preallocates storage for the type-erased awaitable.
52  
    This reserves all virtual address space at server startup
52  
    This reserves all virtual address space at server startup
53  
    so memory usage can be measured up front, rather than
53  
    so memory usage can be measured up front, rather than
54  
    allocating piecemeal as traffic arrives.
54  
    allocating piecemeal as traffic arrives.
55  

55  

56  
    @par Immediate Completion
56  
    @par Immediate Completion
57  
    Operations complete immediately without suspending when the
57  
    Operations complete immediately without suspending when the
58  
    buffer sequence is empty, or when the underlying source's
58  
    buffer sequence is empty, or when the underlying source's
59  
    awaitable reports readiness via `await_ready`.
59  
    awaitable reports readiness via `await_ready`.
60  

60  

61  
    @par Thread Safety
61  
    @par Thread Safety
62  
    Not thread-safe. Concurrent operations on the same wrapper
62  
    Not thread-safe. Concurrent operations on the same wrapper
63  
    are undefined behavior.
63  
    are undefined behavior.
64  

64  

65  
    @par Example
65  
    @par Example
66  
    @code
66  
    @code
67  
    // Owning - takes ownership of the source
67  
    // Owning - takes ownership of the source
68  
    any_read_source rs(some_source{args...});
68  
    any_read_source rs(some_source{args...});
69  

69  

70  
    // Reference - wraps without ownership
70  
    // Reference - wraps without ownership
71  
    some_source source;
71  
    some_source source;
72  
    any_read_source rs(&source);
72  
    any_read_source rs(&source);
73  

73  

74  
    mutable_buffer buf(data, size);
74  
    mutable_buffer buf(data, size);
75  
    auto [ec, n] = co_await rs.read(std::span(&buf, 1));
75  
    auto [ec, n] = co_await rs.read(std::span(&buf, 1));
76  
    @endcode
76  
    @endcode
77  

77  

78  
    @see any_read_stream, ReadSource
78  
    @see any_read_stream, ReadSource
79  
*/
79  
*/
80  
class any_read_source
80  
class any_read_source
81  
{
81  
{
82  
    struct vtable;
82  
    struct vtable;
83  
    struct awaitable_ops;
83  
    struct awaitable_ops;
84  

84  

85  
    template<ReadSource S>
85  
    template<ReadSource S>
86  
    struct vtable_for_impl;
86  
    struct vtable_for_impl;
87  

87  

88  
    void* source_ = nullptr;
88  
    void* source_ = nullptr;
89  
    vtable const* vt_ = nullptr;
89  
    vtable const* vt_ = nullptr;
90  
    void* cached_awaitable_ = nullptr;
90  
    void* cached_awaitable_ = nullptr;
91  
    void* storage_ = nullptr;
91  
    void* storage_ = nullptr;
92  
    awaitable_ops const* active_ops_ = nullptr;
92  
    awaitable_ops const* active_ops_ = nullptr;
93  

93  

94  
public:
94  
public:
95  
    /** Destructor.
95  
    /** Destructor.
96  

96  

97  
        Destroys the owned source (if any) and releases the cached
97  
        Destroys the owned source (if any) and releases the cached
98  
        awaitable storage.
98  
        awaitable storage.
99  
    */
99  
    */
100  
    ~any_read_source();
100  
    ~any_read_source();
101  

101  

102  
    /** Construct a default instance.
102  
    /** Construct a default instance.
103  

103  

104  
        Constructs an empty wrapper. Operations on a default-constructed
104  
        Constructs an empty wrapper. Operations on a default-constructed
105  
        wrapper result in undefined behavior.
105  
        wrapper result in undefined behavior.
106  
    */
106  
    */
107  
    any_read_source() = default;
107  
    any_read_source() = default;
108  

108  

109  
    /** Non-copyable.
109  
    /** Non-copyable.
110  

110  

111  
        The awaitable cache is per-instance and cannot be shared.
111  
        The awaitable cache is per-instance and cannot be shared.
112  
    */
112  
    */
113  
    any_read_source(any_read_source const&) = delete;
113  
    any_read_source(any_read_source const&) = delete;
114  
    any_read_source& operator=(any_read_source const&) = delete;
114  
    any_read_source& operator=(any_read_source const&) = delete;
115  

115  

116  
    /** Construct by moving.
116  
    /** Construct by moving.
117  

117  

118  
        Transfers ownership of the wrapped source (if owned) and
118  
        Transfers ownership of the wrapped source (if owned) and
119  
        cached awaitable storage from `other`. After the move, `other` is
119  
        cached awaitable storage from `other`. After the move, `other` is
120  
        in a default-constructed state.
120  
        in a default-constructed state.
121  

121  

122  
        @param other The wrapper to move from.
122  
        @param other The wrapper to move from.
123  
    */
123  
    */
124  
    any_read_source(any_read_source&& other) noexcept
124  
    any_read_source(any_read_source&& other) noexcept
125  
        : source_(std::exchange(other.source_, nullptr))
125  
        : source_(std::exchange(other.source_, nullptr))
126  
        , vt_(std::exchange(other.vt_, nullptr))
126  
        , vt_(std::exchange(other.vt_, nullptr))
127  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
127  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
128  
        , storage_(std::exchange(other.storage_, nullptr))
128  
        , storage_(std::exchange(other.storage_, nullptr))
129  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
129  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
130  
    {
130  
    {
131  
    }
131  
    }
132  

132  

133  
    /** Assign by moving.
133  
    /** Assign by moving.
134  

134  

135  
        Destroys any owned source and releases existing resources,
135  
        Destroys any owned source and releases existing resources,
136  
        then transfers ownership from `other`.
136  
        then transfers ownership from `other`.
137  

137  

138  
        @param other The wrapper to move from.
138  
        @param other The wrapper to move from.
139  
        @return Reference to this wrapper.
139  
        @return Reference to this wrapper.
140  
    */
140  
    */
141  
    any_read_source&
141  
    any_read_source&
142  
    operator=(any_read_source&& other) noexcept;
142  
    operator=(any_read_source&& other) noexcept;
143  

143  

144  
    /** Construct by taking ownership of a ReadSource.
144  
    /** Construct by taking ownership of a ReadSource.
145  

145  

146  
        Allocates storage and moves the source into this wrapper.
146  
        Allocates storage and moves the source into this wrapper.
147  
        The wrapper owns the source and will destroy it.
147  
        The wrapper owns the source and will destroy it.
148  

148  

149  
        @param s The source to take ownership of.
149  
        @param s The source to take ownership of.
150  
    */
150  
    */
151  
    template<ReadSource S>
151  
    template<ReadSource S>
152  
        requires (!std::same_as<std::decay_t<S>, any_read_source>)
152  
        requires (!std::same_as<std::decay_t<S>, any_read_source>)
153  
    any_read_source(S s);
153  
    any_read_source(S s);
154  

154  

155  
    /** Construct by wrapping a ReadSource without ownership.
155  
    /** Construct by wrapping a ReadSource without ownership.
156  

156  

157  
        Wraps the given source by pointer. The source must remain
157  
        Wraps the given source by pointer. The source must remain
158  
        valid for the lifetime of this wrapper.
158  
        valid for the lifetime of this wrapper.
159  

159  

160  
        @param s Pointer to the source to wrap.
160  
        @param s Pointer to the source to wrap.
161  
    */
161  
    */
162  
    template<ReadSource S>
162  
    template<ReadSource S>
163  
    any_read_source(S* s);
163  
    any_read_source(S* s);
164  

164  

165  
    /** Check if the wrapper contains a valid source.
165  
    /** Check if the wrapper contains a valid source.
166  

166  

167  
        @return `true` if wrapping a source, `false` if default-constructed
167  
        @return `true` if wrapping a source, `false` if default-constructed
168  
            or moved-from.
168  
            or moved-from.
169  
    */
169  
    */
170  
    bool
170  
    bool
171  
    has_value() const noexcept
171  
    has_value() const noexcept
172  
    {
172  
    {
173  
        return source_ != nullptr;
173  
        return source_ != nullptr;
174  
    }
174  
    }
175  

175  

176  
    /** Check if the wrapper contains a valid source.
176  
    /** Check if the wrapper contains a valid source.
177  

177  

178  
        @return `true` if wrapping a source, `false` if default-constructed
178  
        @return `true` if wrapping a source, `false` if default-constructed
179  
            or moved-from.
179  
            or moved-from.
180  
    */
180  
    */
181  
    explicit
181  
    explicit
182  
    operator bool() const noexcept
182  
    operator bool() const noexcept
183  
    {
183  
    {
184  
        return has_value();
184  
        return has_value();
185  
    }
185  
    }
186  

186  

187  
    /** Initiate a partial read operation.
187  
    /** Initiate a partial read operation.
188  

188  

189  
        Attempt to read up to `buffer_size( buffers )` bytes into
189  
        Attempt to read up to `buffer_size( buffers )` bytes into
190  
        the provided buffer sequence. May fill less than the
190  
        the provided buffer sequence. May fill less than the
191  
        full sequence.
191  
        full sequence.
192  

192  

193  
        @param buffers The buffer sequence to read into.
193  
        @param buffers The buffer sequence to read into.
194  

194  

195  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
195  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
196  

196  

197  
        @par Immediate Completion
197  
        @par Immediate Completion
198  
        The operation completes immediately without suspending
198  
        The operation completes immediately without suspending
199  
        the calling coroutine when:
199  
        the calling coroutine when:
200  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
200  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
201  
        @li The underlying source's awaitable reports immediate
201  
        @li The underlying source's awaitable reports immediate
202  
            readiness via `await_ready`.
202  
            readiness via `await_ready`.
203  

203  

204  
        @note This is a partial operation and may not process the
204  
        @note This is a partial operation and may not process the
205  
        entire buffer sequence. Use @ref read for guaranteed
205  
        entire buffer sequence. Use @ref read for guaranteed
206  
        complete transfer.
206  
        complete transfer.
207  

207  

208  
        @par Preconditions
208  
        @par Preconditions
209  
        The wrapper must contain a valid source (`has_value() == true`).
209  
        The wrapper must contain a valid source (`has_value() == true`).
210  
        The caller must not call this function again after a prior
210  
        The caller must not call this function again after a prior
211  
        call returned an error (including EOF).
211  
        call returned an error (including EOF).
212  
    */
212  
    */
213  
    template<MutableBufferSequence MB>
213  
    template<MutableBufferSequence MB>
214  
    auto
214  
    auto
215  
    read_some(MB buffers);
215  
    read_some(MB buffers);
216  

216  

217  
    /** Initiate a complete read operation.
217  
    /** Initiate a complete read operation.
218  

218  

219  
        Reads data into the provided buffer sequence by forwarding
219  
        Reads data into the provided buffer sequence by forwarding
220  
        to the underlying source's `read` operation. Large buffer
220  
        to the underlying source's `read` operation. Large buffer
221  
        sequences are processed in windows, with each window
221  
        sequences are processed in windows, with each window
222  
        forwarded as a separate `read` call to the underlying source.
222  
        forwarded as a separate `read` call to the underlying source.
223  
        The operation completes when the entire buffer sequence is
223  
        The operation completes when the entire buffer sequence is
224  
        filled, end-of-file is reached, or an error occurs.
224  
        filled, end-of-file is reached, or an error occurs.
225  

225  

226  
        @param buffers The buffer sequence to read into.
226  
        @param buffers The buffer sequence to read into.
227  

227  

228  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
228  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
229  

229  

230  
        @par Immediate Completion
230  
        @par Immediate Completion
231  
        The operation completes immediately without suspending
231  
        The operation completes immediately without suspending
232  
        the calling coroutine when:
232  
        the calling coroutine when:
233  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
233  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
234  
        @li The underlying source's `read` awaitable reports
234  
        @li The underlying source's `read` awaitable reports
235  
            immediate readiness via `await_ready`.
235  
            immediate readiness via `await_ready`.
236  

236  

237  
        @par Postconditions
237  
        @par Postconditions
238  
        Exactly one of the following is true on return:
238  
        Exactly one of the following is true on return:
239  
        @li **Success**: `!ec` and `n == buffer_size(buffers)`.
239  
        @li **Success**: `!ec` and `n == buffer_size(buffers)`.
240  
            The entire buffer was filled.
240  
            The entire buffer was filled.
241  
        @li **End-of-stream or Error**: `ec` and `n` indicates
241  
        @li **End-of-stream or Error**: `ec` and `n` indicates
242  
            the number of bytes transferred before the failure.
242  
            the number of bytes transferred before the failure.
243  

243  

244  
        @par Preconditions
244  
        @par Preconditions
245  
        The wrapper must contain a valid source (`has_value() == true`).
245  
        The wrapper must contain a valid source (`has_value() == true`).
246  
        The caller must not call this function again after a prior
246  
        The caller must not call this function again after a prior
247  
        call returned an error (including EOF).
247  
        call returned an error (including EOF).
248  
    */
248  
    */
249  
    template<MutableBufferSequence MB>
249  
    template<MutableBufferSequence MB>
250  
    io_task<std::size_t>
250  
    io_task<std::size_t>
251  
    read(MB buffers);
251  
    read(MB buffers);
252  

252  

253  
protected:
253  
protected:
254  
    /** Rebind to a new source after move.
254  
    /** Rebind to a new source after move.
255  

255  

256  
        Updates the internal pointer to reference a new source object.
256  
        Updates the internal pointer to reference a new source object.
257  
        Used by owning wrappers after move assignment when the owned
257  
        Used by owning wrappers after move assignment when the owned
258  
        object has moved to a new location.
258  
        object has moved to a new location.
259  

259  

260  
        @param new_source The new source to bind to. Must be the same
260  
        @param new_source The new source to bind to. Must be the same
261  
            type as the original source.
261  
            type as the original source.
262  

262  

263  
        @note Terminates if called with a source of different type
263  
        @note Terminates if called with a source of different type
264  
            than the original.
264  
            than the original.
265  
    */
265  
    */
266  
    template<ReadSource S>
266  
    template<ReadSource S>
267  
    void
267  
    void
268  
    rebind(S& new_source) noexcept
268  
    rebind(S& new_source) noexcept
269  
    {
269  
    {
270  
        if(vt_ != &vtable_for_impl<S>::value)
270  
        if(vt_ != &vtable_for_impl<S>::value)
271  
            std::terminate();
271  
            std::terminate();
272  
        source_ = &new_source;
272  
        source_ = &new_source;
273  
    }
273  
    }
274  

274  

275  
private:
275  
private:
276  
    auto
276  
    auto
277  
    read_(std::span<mutable_buffer const> buffers);
277  
    read_(std::span<mutable_buffer const> buffers);
278  
};
278  
};
279  

279  

280  
// ordered by call sequence for cache line coherence
280  
// ordered by call sequence for cache line coherence
281  
struct any_read_source::awaitable_ops
281  
struct any_read_source::awaitable_ops
282  
{
282  
{
283  
    bool (*await_ready)(void*);
283  
    bool (*await_ready)(void*);
284  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
284  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
285  
    io_result<std::size_t> (*await_resume)(void*);
285  
    io_result<std::size_t> (*await_resume)(void*);
286  
    void (*destroy)(void*) noexcept;
286  
    void (*destroy)(void*) noexcept;
287  
};
287  
};
288  

288  

289  
// ordered by call frequency for cache line coherence
289  
// ordered by call frequency for cache line coherence
290  
struct any_read_source::vtable
290  
struct any_read_source::vtable
291  
{
291  
{
292  
    awaitable_ops const* (*construct_read_some_awaitable)(
292  
    awaitable_ops const* (*construct_read_some_awaitable)(
293  
        void* source,
293  
        void* source,
294  
        void* storage,
294  
        void* storage,
295  
        std::span<mutable_buffer const> buffers);
295  
        std::span<mutable_buffer const> buffers);
296  
    awaitable_ops const* (*construct_read_awaitable)(
296  
    awaitable_ops const* (*construct_read_awaitable)(
297  
        void* source,
297  
        void* source,
298  
        void* storage,
298  
        void* storage,
299  
        std::span<mutable_buffer const> buffers);
299  
        std::span<mutable_buffer const> buffers);
300  
    std::size_t awaitable_size;
300  
    std::size_t awaitable_size;
301  
    std::size_t awaitable_align;
301  
    std::size_t awaitable_align;
302  
    void (*destroy)(void*) noexcept;
302  
    void (*destroy)(void*) noexcept;
303  
};
303  
};
304  

304  

305  
template<ReadSource S>
305  
template<ReadSource S>
306  
struct any_read_source::vtable_for_impl
306  
struct any_read_source::vtable_for_impl
307  
{
307  
{
308  
    using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
308  
    using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
309  
        std::span<mutable_buffer const>{}));
309  
        std::span<mutable_buffer const>{}));
310  
    using ReadAwaitable = decltype(std::declval<S&>().read(
310  
    using ReadAwaitable = decltype(std::declval<S&>().read(
311  
        std::span<mutable_buffer const>{}));
311  
        std::span<mutable_buffer const>{}));
312  

312  

313  
    static void
313  
    static void
314  
    do_destroy_impl(void* source) noexcept
314  
    do_destroy_impl(void* source) noexcept
315  
    {
315  
    {
316  
        static_cast<S*>(source)->~S();
316  
        static_cast<S*>(source)->~S();
317  
    }
317  
    }
318  

318  

319  
    static awaitable_ops const*
319  
    static awaitable_ops const*
320  
    construct_read_some_awaitable_impl(
320  
    construct_read_some_awaitable_impl(
321  
        void* source,
321  
        void* source,
322  
        void* storage,
322  
        void* storage,
323  
        std::span<mutable_buffer const> buffers)
323  
        std::span<mutable_buffer const> buffers)
324  
    {
324  
    {
325  
        auto& s = *static_cast<S*>(source);
325  
        auto& s = *static_cast<S*>(source);
326  
        ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
326  
        ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
327  

327  

328  
        static constexpr awaitable_ops ops = {
328  
        static constexpr awaitable_ops ops = {
329  
            +[](void* p) {
329  
            +[](void* p) {
330  
                return static_cast<ReadSomeAwaitable*>(p)->await_ready();
330  
                return static_cast<ReadSomeAwaitable*>(p)->await_ready();
331  
            },
331  
            },
332  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
332  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
333  
                return detail::call_await_suspend(
333  
                return detail::call_await_suspend(
334  
                    static_cast<ReadSomeAwaitable*>(p), h, env);
334  
                    static_cast<ReadSomeAwaitable*>(p), h, env);
335  
            },
335  
            },
336  
            +[](void* p) {
336  
            +[](void* p) {
337  
                return static_cast<ReadSomeAwaitable*>(p)->await_resume();
337  
                return static_cast<ReadSomeAwaitable*>(p)->await_resume();
338  
            },
338  
            },
339  
            +[](void* p) noexcept {
339  
            +[](void* p) noexcept {
340  
                static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
340  
                static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
341  
            }
341  
            }
342  
        };
342  
        };
343  
        return &ops;
343  
        return &ops;
344  
    }
344  
    }
345  

345  

346  
    static awaitable_ops const*
346  
    static awaitable_ops const*
347  
    construct_read_awaitable_impl(
347  
    construct_read_awaitable_impl(
348  
        void* source,
348  
        void* source,
349  
        void* storage,
349  
        void* storage,
350  
        std::span<mutable_buffer const> buffers)
350  
        std::span<mutable_buffer const> buffers)
351  
    {
351  
    {
352  
        auto& s = *static_cast<S*>(source);
352  
        auto& s = *static_cast<S*>(source);
353  
        ::new(storage) ReadAwaitable(s.read(buffers));
353  
        ::new(storage) ReadAwaitable(s.read(buffers));
354  

354  

355  
        static constexpr awaitable_ops ops = {
355  
        static constexpr awaitable_ops ops = {
356  
            +[](void* p) {
356  
            +[](void* p) {
357  
                return static_cast<ReadAwaitable*>(p)->await_ready();
357  
                return static_cast<ReadAwaitable*>(p)->await_ready();
358  
            },
358  
            },
359  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
359  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
360  
                return detail::call_await_suspend(
360  
                return detail::call_await_suspend(
361  
                    static_cast<ReadAwaitable*>(p), h, env);
361  
                    static_cast<ReadAwaitable*>(p), h, env);
362  
            },
362  
            },
363  
            +[](void* p) {
363  
            +[](void* p) {
364  
                return static_cast<ReadAwaitable*>(p)->await_resume();
364  
                return static_cast<ReadAwaitable*>(p)->await_resume();
365  
            },
365  
            },
366  
            +[](void* p) noexcept {
366  
            +[](void* p) noexcept {
367  
                static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
367  
                static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
368  
            }
368  
            }
369  
        };
369  
        };
370  
        return &ops;
370  
        return &ops;
371  
    }
371  
    }
372  

372  

373  
    static constexpr std::size_t max_awaitable_size =
373  
    static constexpr std::size_t max_awaitable_size =
374  
        sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
374  
        sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
375  
            ? sizeof(ReadSomeAwaitable)
375  
            ? sizeof(ReadSomeAwaitable)
376  
            : sizeof(ReadAwaitable);
376  
            : sizeof(ReadAwaitable);
377  
    static constexpr std::size_t max_awaitable_align =
377  
    static constexpr std::size_t max_awaitable_align =
378  
        alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
378  
        alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
379  
            ? alignof(ReadSomeAwaitable)
379  
            ? alignof(ReadSomeAwaitable)
380  
            : alignof(ReadAwaitable);
380  
            : alignof(ReadAwaitable);
381  

381  

382  
    static constexpr vtable value = {
382  
    static constexpr vtable value = {
383  
        &construct_read_some_awaitable_impl,
383  
        &construct_read_some_awaitable_impl,
384  
        &construct_read_awaitable_impl,
384  
        &construct_read_awaitable_impl,
385  
        max_awaitable_size,
385  
        max_awaitable_size,
386  
        max_awaitable_align,
386  
        max_awaitable_align,
387  
        &do_destroy_impl
387  
        &do_destroy_impl
388  
    };
388  
    };
389  
};
389  
};
390  

390  

391  
inline
391  
inline
392  
any_read_source::~any_read_source()
392  
any_read_source::~any_read_source()
393  
{
393  
{
394  
    if(storage_)
394  
    if(storage_)
395  
    {
395  
    {
396  
        vt_->destroy(source_);
396  
        vt_->destroy(source_);
397  
        ::operator delete(storage_);
397  
        ::operator delete(storage_);
398  
    }
398  
    }
399  
    if(cached_awaitable_)
399  
    if(cached_awaitable_)
400  
    {
400  
    {
401  
        if(active_ops_)
401  
        if(active_ops_)
402  
            active_ops_->destroy(cached_awaitable_);
402  
            active_ops_->destroy(cached_awaitable_);
403  
        ::operator delete(cached_awaitable_);
403  
        ::operator delete(cached_awaitable_);
404  
    }
404  
    }
405  
}
405  
}
406  

406  

407  
inline any_read_source&
407  
inline any_read_source&
408  
any_read_source::operator=(any_read_source&& other) noexcept
408  
any_read_source::operator=(any_read_source&& other) noexcept
409  
{
409  
{
410  
    if(this != &other)
410  
    if(this != &other)
411  
    {
411  
    {
412  
        if(storage_)
412  
        if(storage_)
413  
        {
413  
        {
414  
            vt_->destroy(source_);
414  
            vt_->destroy(source_);
415  
            ::operator delete(storage_);
415  
            ::operator delete(storage_);
416  
        }
416  
        }
417  
        if(cached_awaitable_)
417  
        if(cached_awaitable_)
418  
        {
418  
        {
419  
            if(active_ops_)
419  
            if(active_ops_)
420  
                active_ops_->destroy(cached_awaitable_);
420  
                active_ops_->destroy(cached_awaitable_);
421  
            ::operator delete(cached_awaitable_);
421  
            ::operator delete(cached_awaitable_);
422  
        }
422  
        }
423  
        source_ = std::exchange(other.source_, nullptr);
423  
        source_ = std::exchange(other.source_, nullptr);
424  
        vt_ = std::exchange(other.vt_, nullptr);
424  
        vt_ = std::exchange(other.vt_, nullptr);
425  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
425  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
426  
        storage_ = std::exchange(other.storage_, nullptr);
426  
        storage_ = std::exchange(other.storage_, nullptr);
427  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
427  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
428  
    }
428  
    }
429  
    return *this;
429  
    return *this;
430  
}
430  
}
431  

431  

432  
template<ReadSource S>
432  
template<ReadSource S>
433  
    requires (!std::same_as<std::decay_t<S>, any_read_source>)
433  
    requires (!std::same_as<std::decay_t<S>, any_read_source>)
434  
any_read_source::any_read_source(S s)
434  
any_read_source::any_read_source(S s)
435  
    : vt_(&vtable_for_impl<S>::value)
435  
    : vt_(&vtable_for_impl<S>::value)
436  
{
436  
{
437  
    struct guard {
437  
    struct guard {
438  
        any_read_source* self;
438  
        any_read_source* self;
439  
        bool committed = false;
439  
        bool committed = false;
440  
        ~guard() {
440  
        ~guard() {
441  
            if(!committed && self->storage_) {
441  
            if(!committed && self->storage_) {
442  
                self->vt_->destroy(self->source_);
442  
                self->vt_->destroy(self->source_);
443  
                ::operator delete(self->storage_);
443  
                ::operator delete(self->storage_);
444  
                self->storage_ = nullptr;
444  
                self->storage_ = nullptr;
445  
                self->source_ = nullptr;
445  
                self->source_ = nullptr;
446  
            }
446  
            }
447  
        }
447  
        }
448  
    } g{this};
448  
    } g{this};
449  

449  

450  
    storage_ = ::operator new(sizeof(S));
450  
    storage_ = ::operator new(sizeof(S));
451  
    source_ = ::new(storage_) S(std::move(s));
451  
    source_ = ::new(storage_) S(std::move(s));
452  

452  

453  
    // Preallocate the awaitable storage
453  
    // Preallocate the awaitable storage
454  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
454  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
455  

455  

456  
    g.committed = true;
456  
    g.committed = true;
457  
}
457  
}
458  

458  

459  
template<ReadSource S>
459  
template<ReadSource S>
460  
any_read_source::any_read_source(S* s)
460  
any_read_source::any_read_source(S* s)
461  
    : source_(s)
461  
    : source_(s)
462  
    , vt_(&vtable_for_impl<S>::value)
462  
    , vt_(&vtable_for_impl<S>::value)
463  
{
463  
{
464  
    // Preallocate the awaitable storage
464  
    // Preallocate the awaitable storage
465  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
465  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
466  
}
466  
}
467  

467  

468  
template<MutableBufferSequence MB>
468  
template<MutableBufferSequence MB>
469  
auto
469  
auto
470  
any_read_source::read_some(MB buffers)
470  
any_read_source::read_some(MB buffers)
471  
{
471  
{
472  
    struct awaitable
472  
    struct awaitable
473  
    {
473  
    {
474  
        any_read_source* self_;
474  
        any_read_source* self_;
475  
        mutable_buffer_array<detail::max_iovec_> ba_;
475  
        mutable_buffer_array<detail::max_iovec_> ba_;
476  

476  

477  
        awaitable(any_read_source* self, MB const& buffers)
477  
        awaitable(any_read_source* self, MB const& buffers)
478  
            : self_(self)
478  
            : self_(self)
479  
            , ba_(buffers)
479  
            , ba_(buffers)
480  
        {
480  
        {
481  
        }
481  
        }
482  

482  

483  
        bool
483  
        bool
484  
        await_ready() const noexcept
484  
        await_ready() const noexcept
485  
        {
485  
        {
486  
            return ba_.to_span().empty();
486  
            return ba_.to_span().empty();
487  
        }
487  
        }
488  

488  

489  
        std::coroutine_handle<>
489  
        std::coroutine_handle<>
490  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
490  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
491  
        {
491  
        {
492  
            self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
492  
            self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
493  
                self_->source_,
493  
                self_->source_,
494  
                self_->cached_awaitable_,
494  
                self_->cached_awaitable_,
495  
                ba_.to_span());
495  
                ba_.to_span());
496  

496  

497  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
497  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
498  
                return h;
498  
                return h;
499  

499  

500  
            return self_->active_ops_->await_suspend(
500  
            return self_->active_ops_->await_suspend(
501  
                self_->cached_awaitable_, h, env);
501  
                self_->cached_awaitable_, h, env);
502  
        }
502  
        }
503  

503  

504  
        io_result<std::size_t>
504  
        io_result<std::size_t>
505  
        await_resume()
505  
        await_resume()
506  
        {
506  
        {
507  
            if(ba_.to_span().empty())
507  
            if(ba_.to_span().empty())
508  
                return {{}, 0};
508  
                return {{}, 0};
509  

509  

510  
            struct guard {
510  
            struct guard {
511  
                any_read_source* self;
511  
                any_read_source* self;
512  
                ~guard() {
512  
                ~guard() {
513  
                    self->active_ops_->destroy(self->cached_awaitable_);
513  
                    self->active_ops_->destroy(self->cached_awaitable_);
514  
                    self->active_ops_ = nullptr;
514  
                    self->active_ops_ = nullptr;
515  
                }
515  
                }
516  
            } g{self_};
516  
            } g{self_};
517  
            return self_->active_ops_->await_resume(
517  
            return self_->active_ops_->await_resume(
518  
                self_->cached_awaitable_);
518  
                self_->cached_awaitable_);
519  
        }
519  
        }
520  
    };
520  
    };
521  
    return awaitable(this, buffers);
521  
    return awaitable(this, buffers);
522  
}
522  
}
523  

523  

524  
inline auto
524  
inline auto
525  
any_read_source::read_(std::span<mutable_buffer const> buffers)
525  
any_read_source::read_(std::span<mutable_buffer const> buffers)
526  
{
526  
{
527  
    struct awaitable
527  
    struct awaitable
528  
    {
528  
    {
529  
        any_read_source* self_;
529  
        any_read_source* self_;
530  
        std::span<mutable_buffer const> buffers_;
530  
        std::span<mutable_buffer const> buffers_;
531  

531  

532  
        bool
532  
        bool
533  
        await_ready() const noexcept
533  
        await_ready() const noexcept
534  
        {
534  
        {
535  
            return false;
535  
            return false;
536  
        }
536  
        }
537  

537  

538  
        std::coroutine_handle<>
538  
        std::coroutine_handle<>
539  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
539  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
540  
        {
540  
        {
541  
            self_->active_ops_ = self_->vt_->construct_read_awaitable(
541  
            self_->active_ops_ = self_->vt_->construct_read_awaitable(
542  
                self_->source_,
542  
                self_->source_,
543  
                self_->cached_awaitable_,
543  
                self_->cached_awaitable_,
544  
                buffers_);
544  
                buffers_);
545  

545  

546  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
546  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
547  
                return h;
547  
                return h;
548  

548  

549  
            return self_->active_ops_->await_suspend(
549  
            return self_->active_ops_->await_suspend(
550  
                self_->cached_awaitable_, h, env);
550  
                self_->cached_awaitable_, h, env);
551  
        }
551  
        }
552  

552  

553  
        io_result<std::size_t>
553  
        io_result<std::size_t>
554  
        await_resume()
554  
        await_resume()
555  
        {
555  
        {
556  
            struct guard {
556  
            struct guard {
557  
                any_read_source* self;
557  
                any_read_source* self;
558  
                ~guard() {
558  
                ~guard() {
559  
                    self->active_ops_->destroy(self->cached_awaitable_);
559  
                    self->active_ops_->destroy(self->cached_awaitable_);
560  
                    self->active_ops_ = nullptr;
560  
                    self->active_ops_ = nullptr;
561  
                }
561  
                }
562  
            } g{self_};
562  
            } g{self_};
563  
            return self_->active_ops_->await_resume(
563  
            return self_->active_ops_->await_resume(
564  
                self_->cached_awaitable_);
564  
                self_->cached_awaitable_);
565  
        }
565  
        }
566  
    };
566  
    };
567  
    return awaitable{this, buffers};
567  
    return awaitable{this, buffers};
568  
}
568  
}
569  

569  

570  
template<MutableBufferSequence MB>
570  
template<MutableBufferSequence MB>
571  
io_task<std::size_t>
571  
io_task<std::size_t>
572  
any_read_source::read(MB buffers)
572  
any_read_source::read(MB buffers)
573  
{
573  
{
574  
    buffer_param bp(buffers);
574  
    buffer_param bp(buffers);
575  
    std::size_t total = 0;
575  
    std::size_t total = 0;
576  

576  

577  
    for(;;)
577  
    for(;;)
578  
    {
578  
    {
579  
        auto bufs = bp.data();
579  
        auto bufs = bp.data();
580  
        if(bufs.empty())
580  
        if(bufs.empty())
581  
            break;
581  
            break;
582  

582  

583  
        auto [ec, n] = co_await read_(bufs);
583  
        auto [ec, n] = co_await read_(bufs);
584  
        total += n;
584  
        total += n;
585  
        if(ec)
585  
        if(ec)
586  
            co_return {ec, total};
586  
            co_return {ec, total};
587  
        bp.consume(n);
587  
        bp.consume(n);
588  
    }
588  
    }
589  

589  

590  
    co_return {{}, total};
590  
    co_return {{}, total};
591  
}
591  
}
592  

592  

593  
} // namespace capy
593  
} // namespace capy
594  
} // namespace boost
594  
} // namespace boost
595  

595  

596  
#endif
596  
#endif