1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_copy.hpp>
16  
#include <boost/capy/buffers/buffer_copy.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
18  
#include <boost/capy/concept/buffer_sink.hpp>
18  
#include <boost/capy/concept/buffer_sink.hpp>
19  
#include <boost/capy/concept/io_awaitable.hpp>
19  
#include <boost/capy/concept/io_awaitable.hpp>
20  
#include <boost/capy/concept/write_sink.hpp>
20  
#include <boost/capy/concept/write_sink.hpp>
21  
#include <boost/capy/ex/io_env.hpp>
21  
#include <boost/capy/ex/io_env.hpp>
22  
#include <boost/capy/io_result.hpp>
22  
#include <boost/capy/io_result.hpp>
23  
#include <boost/capy/io_task.hpp>
23  
#include <boost/capy/io_task.hpp>
24  

24  

25  
#include <concepts>
25  
#include <concepts>
26  
#include <coroutine>
26  
#include <coroutine>
27  
#include <cstddef>
27  
#include <cstddef>
28  
#include <exception>
28  
#include <exception>
29  
#include <new>
29  
#include <new>
30  
#include <span>
30  
#include <span>
31  
#include <stop_token>
31  
#include <stop_token>
32  
#include <system_error>
32  
#include <system_error>
33  
#include <utility>
33  
#include <utility>
34  

34  

35  
namespace boost {
35  
namespace boost {
36  
namespace capy {
36  
namespace capy {
37  

37  

38  
/** Type-erased wrapper for any BufferSink.
38  
/** Type-erased wrapper for any BufferSink.
39  

39  

40  
    This class provides type erasure for any type satisfying the
40  
    This class provides type erasure for any type satisfying the
41  
    @ref BufferSink concept, enabling runtime polymorphism for
41  
    @ref BufferSink concept, enabling runtime polymorphism for
42  
    buffer sink operations. It uses cached awaitable storage to achieve
42  
    buffer sink operations. It uses cached awaitable storage to achieve
43  
    zero steady-state allocation after construction.
43  
    zero steady-state allocation after construction.
44  

44  

45  
    The wrapper exposes two interfaces for producing data:
45  
    The wrapper exposes two interfaces for producing data:
46  
    the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
46  
    the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
47  
    and the @ref WriteSink interface (`write_some`, `write`,
47  
    and the @ref WriteSink interface (`write_some`, `write`,
48  
    `write_eof`). Choose the interface that matches how your data
48  
    `write_eof`). Choose the interface that matches how your data
49  
    is produced:
49  
    is produced:
50  

50  

51  
    @par Choosing an Interface
51  
    @par Choosing an Interface
52  

52  

53  
    Use the **BufferSink** interface when you are a generator that
53  
    Use the **BufferSink** interface when you are a generator that
54  
    produces data into externally-provided buffers. The sink owns
54  
    produces data into externally-provided buffers. The sink owns
55  
    the memory; you call @ref prepare to obtain writable buffers,
55  
    the memory; you call @ref prepare to obtain writable buffers,
56  
    fill them, then call @ref commit or @ref commit_eof.
56  
    fill them, then call @ref commit or @ref commit_eof.
57  

57  

58  
    Use the **WriteSink** interface when you already have buffers
58  
    Use the **WriteSink** interface when you already have buffers
59  
    containing the data to write:
59  
    containing the data to write:
60  
    - If the entire body is available up front, call
60  
    - If the entire body is available up front, call
61  
      @ref write_eof(buffers) to send everything atomically.
61  
      @ref write_eof(buffers) to send everything atomically.
62  
    - If data arrives incrementally, call @ref write or
62  
    - If data arrives incrementally, call @ref write or
63  
      @ref write_some in a loop, then @ref write_eof() when done.
63  
      @ref write_some in a loop, then @ref write_eof() when done.
64  
      Prefer `write` (complete) unless your streaming pattern
64  
      Prefer `write` (complete) unless your streaming pattern
65  
      benefits from partial writes via `write_some`.
65  
      benefits from partial writes via `write_some`.
66  

66  

67  
    If the wrapped type only satisfies @ref BufferSink, the
67  
    If the wrapped type only satisfies @ref BufferSink, the
68  
    @ref WriteSink operations are provided automatically.
68  
    @ref WriteSink operations are provided automatically.
69  

69  

70  
    @par Construction Modes
70  
    @par Construction Modes
71  

71  

72  
    - **Owning**: Pass by value to transfer ownership. The wrapper
72  
    - **Owning**: Pass by value to transfer ownership. The wrapper
73  
      allocates storage and owns the sink.
73  
      allocates storage and owns the sink.
74  
    - **Reference**: Pass a pointer to wrap without ownership. The
74  
    - **Reference**: Pass a pointer to wrap without ownership. The
75  
      pointed-to sink must outlive this wrapper.
75  
      pointed-to sink must outlive this wrapper.
76  

76  

77  
    @par Awaitable Preallocation
77  
    @par Awaitable Preallocation
78  
    The constructor preallocates storage for the type-erased awaitable.
78  
    The constructor preallocates storage for the type-erased awaitable.
79  
    This reserves all virtual address space at server startup
79  
    This reserves all virtual address space at server startup
80  
    so memory usage can be measured up front, rather than
80  
    so memory usage can be measured up front, rather than
81  
    allocating piecemeal as traffic arrives.
81  
    allocating piecemeal as traffic arrives.
82  

82  

83  
    @par Thread Safety
83  
    @par Thread Safety
84  
    Not thread-safe. Concurrent operations on the same wrapper
84  
    Not thread-safe. Concurrent operations on the same wrapper
85  
    are undefined behavior.
85  
    are undefined behavior.
86  

86  

87  
    @par Example
87  
    @par Example
88  
    @code
88  
    @code
89  
    // Owning - takes ownership of the sink
89  
    // Owning - takes ownership of the sink
90  
    any_buffer_sink abs(some_buffer_sink{args...});
90  
    any_buffer_sink abs(some_buffer_sink{args...});
91  

91  

92  
    // Reference - wraps without ownership
92  
    // Reference - wraps without ownership
93  
    some_buffer_sink sink;
93  
    some_buffer_sink sink;
94  
    any_buffer_sink abs(&sink);
94  
    any_buffer_sink abs(&sink);
95  

95  

96  
    // BufferSink interface: generate into callee-owned buffers
96  
    // BufferSink interface: generate into callee-owned buffers
97  
    mutable_buffer arr[16];
97  
    mutable_buffer arr[16];
98  
    auto bufs = abs.prepare(arr);
98  
    auto bufs = abs.prepare(arr);
99  
    // Write data into bufs[0..bufs.size())
99  
    // Write data into bufs[0..bufs.size())
100  
    auto [ec] = co_await abs.commit(bytes_written);
100  
    auto [ec] = co_await abs.commit(bytes_written);
101  
    auto [ec2] = co_await abs.commit_eof(0);
101  
    auto [ec2] = co_await abs.commit_eof(0);
102  

102  

103  
    // WriteSink interface: send caller-owned buffers
103  
    // WriteSink interface: send caller-owned buffers
104  
    auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
104  
    auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
105  
    auto [ec4] = co_await abs.write_eof();
105  
    auto [ec4] = co_await abs.write_eof();
106  

106  

107  
    // Or send everything at once
107  
    // Or send everything at once
108  
    auto [ec5, n2] = co_await abs.write_eof(
108  
    auto [ec5, n2] = co_await abs.write_eof(
109  
        make_buffer(body_data));
109  
        make_buffer(body_data));
110  
    @endcode
110  
    @endcode
111  

111  

112  
    @see any_buffer_source, BufferSink, WriteSink
112  
    @see any_buffer_source, BufferSink, WriteSink
113  
*/
113  
*/
114  
class any_buffer_sink
114  
class any_buffer_sink
115  
{
115  
{
116  
    struct vtable;
116  
    struct vtable;
117  
    struct awaitable_ops;
117  
    struct awaitable_ops;
118  
    struct write_awaitable_ops;
118  
    struct write_awaitable_ops;
119  

119  

120  
    template<BufferSink S>
120  
    template<BufferSink S>
121  
    struct vtable_for_impl;
121  
    struct vtable_for_impl;
122  

122  

123  
    // hot-path members first for cache locality
123  
    // hot-path members first for cache locality
124  
    void* sink_ = nullptr;
124  
    void* sink_ = nullptr;
125  
    vtable const* vt_ = nullptr;
125  
    vtable const* vt_ = nullptr;
126  
    void* cached_awaitable_ = nullptr;
126  
    void* cached_awaitable_ = nullptr;
127  
    awaitable_ops const* active_ops_ = nullptr;
127  
    awaitable_ops const* active_ops_ = nullptr;
128  
    write_awaitable_ops const* active_write_ops_ = nullptr;
128  
    write_awaitable_ops const* active_write_ops_ = nullptr;
129  
    void* storage_ = nullptr;
129  
    void* storage_ = nullptr;
130  

130  

131  
public:
131  
public:
132  
    /** Destructor.
132  
    /** Destructor.
133  

133  

134  
        Destroys the owned sink (if any) and releases the cached
134  
        Destroys the owned sink (if any) and releases the cached
135  
        awaitable storage.
135  
        awaitable storage.
136  
    */
136  
    */
137  
    ~any_buffer_sink();
137  
    ~any_buffer_sink();
138  

138  

139  
    /** Construct a default instance.
139  
    /** Construct a default instance.
140  

140  

141  
        Constructs an empty wrapper. Operations on a default-constructed
141  
        Constructs an empty wrapper. Operations on a default-constructed
142  
        wrapper result in undefined behavior.
142  
        wrapper result in undefined behavior.
143  
    */
143  
    */
144  
    any_buffer_sink() = default;
144  
    any_buffer_sink() = default;
145  

145  

146  
    /** Non-copyable.
146  
    /** Non-copyable.
147  

147  

148  
        The awaitable cache is per-instance and cannot be shared.
148  
        The awaitable cache is per-instance and cannot be shared.
149  
    */
149  
    */
150  
    any_buffer_sink(any_buffer_sink const&) = delete;
150  
    any_buffer_sink(any_buffer_sink const&) = delete;
151  
    any_buffer_sink& operator=(any_buffer_sink const&) = delete;
151  
    any_buffer_sink& operator=(any_buffer_sink const&) = delete;
152  

152  

153  
    /** Construct by moving.
153  
    /** Construct by moving.
154  

154  

155  
        Transfers ownership of the wrapped sink (if owned) and
155  
        Transfers ownership of the wrapped sink (if owned) and
156  
        cached awaitable storage from `other`. After the move, `other` is
156  
        cached awaitable storage from `other`. After the move, `other` is
157  
        in a default-constructed state.
157  
        in a default-constructed state.
158  

158  

159  
        @param other The wrapper to move from.
159  
        @param other The wrapper to move from.
160  
    */
160  
    */
161  
    any_buffer_sink(any_buffer_sink&& other) noexcept
161  
    any_buffer_sink(any_buffer_sink&& other) noexcept
162  
        : sink_(std::exchange(other.sink_, nullptr))
162  
        : sink_(std::exchange(other.sink_, nullptr))
163  
        , vt_(std::exchange(other.vt_, nullptr))
163  
        , vt_(std::exchange(other.vt_, nullptr))
164  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
164  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
165  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
165  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
166  
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
166  
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
167  
        , storage_(std::exchange(other.storage_, nullptr))
167  
        , storage_(std::exchange(other.storage_, nullptr))
168  
    {
168  
    {
169  
    }
169  
    }
170  

170  

171  
    /** Assign by moving.
171  
    /** Assign by moving.
172  

172  

173  
        Destroys any owned sink and releases existing resources,
173  
        Destroys any owned sink and releases existing resources,
174  
        then transfers ownership from `other`.
174  
        then transfers ownership from `other`.
175  

175  

176  
        @param other The wrapper to move from.
176  
        @param other The wrapper to move from.
177  
        @return Reference to this wrapper.
177  
        @return Reference to this wrapper.
178  
    */
178  
    */
179  
    any_buffer_sink&
179  
    any_buffer_sink&
180  
    operator=(any_buffer_sink&& other) noexcept;
180  
    operator=(any_buffer_sink&& other) noexcept;
181  

181  

182  
    /** Construct by taking ownership of a BufferSink.
182  
    /** Construct by taking ownership of a BufferSink.
183  

183  

184  
        Allocates storage and moves the sink into this wrapper.
184  
        Allocates storage and moves the sink into this wrapper.
185  
        The wrapper owns the sink and will destroy it. If `S` also
185  
        The wrapper owns the sink and will destroy it. If `S` also
186  
        satisfies @ref WriteSink, native write operations are
186  
        satisfies @ref WriteSink, native write operations are
187  
        forwarded through the virtual boundary.
187  
        forwarded through the virtual boundary.
188  

188  

189  
        @param s The sink to take ownership of.
189  
        @param s The sink to take ownership of.
190  
    */
190  
    */
191  
    template<BufferSink S>
191  
    template<BufferSink S>
192  
        requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
192  
        requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
193  
    any_buffer_sink(S s);
193  
    any_buffer_sink(S s);
194  

194  

195  
    /** Construct by wrapping a BufferSink without ownership.
195  
    /** Construct by wrapping a BufferSink without ownership.
196  

196  

197  
        Wraps the given sink by pointer. The sink must remain
197  
        Wraps the given sink by pointer. The sink must remain
198  
        valid for the lifetime of this wrapper. If `S` also
198  
        valid for the lifetime of this wrapper. If `S` also
199  
        satisfies @ref WriteSink, native write operations are
199  
        satisfies @ref WriteSink, native write operations are
200  
        forwarded through the virtual boundary.
200  
        forwarded through the virtual boundary.
201  

201  

202  
        @param s Pointer to the sink to wrap.
202  
        @param s Pointer to the sink to wrap.
203  
    */
203  
    */
204  
    template<BufferSink S>
204  
    template<BufferSink S>
205  
    any_buffer_sink(S* s);
205  
    any_buffer_sink(S* s);
206  

206  

207  
    /** Check if the wrapper contains a valid sink.
207  
    /** Check if the wrapper contains a valid sink.
208  

208  

209  
        @return `true` if wrapping a sink, `false` if default-constructed
209  
        @return `true` if wrapping a sink, `false` if default-constructed
210  
            or moved-from.
210  
            or moved-from.
211  
    */
211  
    */
212  
    bool
212  
    bool
213  
    has_value() const noexcept
213  
    has_value() const noexcept
214  
    {
214  
    {
215  
        return sink_ != nullptr;
215  
        return sink_ != nullptr;
216  
    }
216  
    }
217  

217  

218  
    /** Check if the wrapper contains a valid sink.
218  
    /** Check if the wrapper contains a valid sink.
219  

219  

220  
        @return `true` if wrapping a sink, `false` if default-constructed
220  
        @return `true` if wrapping a sink, `false` if default-constructed
221  
            or moved-from.
221  
            or moved-from.
222  
    */
222  
    */
223  
    explicit
223  
    explicit
224  
    operator bool() const noexcept
224  
    operator bool() const noexcept
225  
    {
225  
    {
226  
        return has_value();
226  
        return has_value();
227  
    }
227  
    }
228  

228  

229  
    /** Prepare writable buffers.
229  
    /** Prepare writable buffers.
230  

230  

231  
        Fills the provided span with mutable buffer descriptors
231  
        Fills the provided span with mutable buffer descriptors
232  
        pointing to the underlying sink's internal storage. This
232  
        pointing to the underlying sink's internal storage. This
233  
        operation is synchronous.
233  
        operation is synchronous.
234  

234  

235  
        @param dest Span of mutable_buffer to fill.
235  
        @param dest Span of mutable_buffer to fill.
236  

236  

237  
        @return A span of filled buffers.
237  
        @return A span of filled buffers.
238  

238  

239  
        @par Preconditions
239  
        @par Preconditions
240  
        The wrapper must contain a valid sink (`has_value() == true`).
240  
        The wrapper must contain a valid sink (`has_value() == true`).
241  
    */
241  
    */
242  
    std::span<mutable_buffer>
242  
    std::span<mutable_buffer>
243  
    prepare(std::span<mutable_buffer> dest);
243  
    prepare(std::span<mutable_buffer> dest);
244  

244  

245  
    /** Commit bytes written to the prepared buffers.
245  
    /** Commit bytes written to the prepared buffers.
246  

246  

247  
        Commits `n` bytes written to the buffers returned by the
247  
        Commits `n` bytes written to the buffers returned by the
248  
        most recent call to @ref prepare. The operation may trigger
248  
        most recent call to @ref prepare. The operation may trigger
249  
        underlying I/O.
249  
        underlying I/O.
250  

250  

251  
        @param n The number of bytes to commit.
251  
        @param n The number of bytes to commit.
252  

252  

253  
        @return An awaitable that await-returns `(error_code)`.
253  
        @return An awaitable that await-returns `(error_code)`.
254  

254  

255  
        @par Preconditions
255  
        @par Preconditions
256  
        The wrapper must contain a valid sink (`has_value() == true`).
256  
        The wrapper must contain a valid sink (`has_value() == true`).
257  
    */
257  
    */
258  
    auto
258  
    auto
259  
    commit(std::size_t n);
259  
    commit(std::size_t n);
260  

260  

261  
    /** Commit final bytes and signal end-of-stream.
261  
    /** Commit final bytes and signal end-of-stream.
262  

262  

263  
        Commits `n` bytes written to the buffers returned by the
263  
        Commits `n` bytes written to the buffers returned by the
264  
        most recent call to @ref prepare and finalizes the sink.
264  
        most recent call to @ref prepare and finalizes the sink.
265  
        After success, no further operations are permitted.
265  
        After success, no further operations are permitted.
266  

266  

267  
        @param n The number of bytes to commit.
267  
        @param n The number of bytes to commit.
268  

268  

269  
        @return An awaitable that await-returns `(error_code)`.
269  
        @return An awaitable that await-returns `(error_code)`.
270  

270  

271  
        @par Preconditions
271  
        @par Preconditions
272  
        The wrapper must contain a valid sink (`has_value() == true`).
272  
        The wrapper must contain a valid sink (`has_value() == true`).
273  
    */
273  
    */
274  
    auto
274  
    auto
275  
    commit_eof(std::size_t n);
275  
    commit_eof(std::size_t n);
276  

276  

277  
    /** Write some data from a buffer sequence.
277  
    /** Write some data from a buffer sequence.
278  

278  

279  
        Attempt to write up to `buffer_size( buffers )` bytes from
279  
        Attempt to write up to `buffer_size( buffers )` bytes from
280  
        the buffer sequence to the underlying sink. May consume less
280  
        the buffer sequence to the underlying sink. May consume less
281  
        than the full sequence.
281  
        than the full sequence.
282  

282  

283  
        When the wrapped type provides native @ref WriteSink support,
283  
        When the wrapped type provides native @ref WriteSink support,
284  
        the operation forwards directly. Otherwise it is synthesized
284  
        the operation forwards directly. Otherwise it is synthesized
285  
        from @ref prepare and @ref commit with a buffer copy.
285  
        from @ref prepare and @ref commit with a buffer copy.
286  

286  

287  
        @param buffers The buffer sequence to write.
287  
        @param buffers The buffer sequence to write.
288  

288  

289  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
289  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
290  

290  

291  
        @par Preconditions
291  
        @par Preconditions
292  
        The wrapper must contain a valid sink (`has_value() == true`).
292  
        The wrapper must contain a valid sink (`has_value() == true`).
293  
    */
293  
    */
294  
    template<ConstBufferSequence CB>
294  
    template<ConstBufferSequence CB>
295  
    io_task<std::size_t>
295  
    io_task<std::size_t>
296  
    write_some(CB buffers);
296  
    write_some(CB buffers);
297  

297  

298  
    /** Write all data from a buffer sequence.
298  
    /** Write all data from a buffer sequence.
299  

299  

300  
        Writes all data from the buffer sequence to the underlying
300  
        Writes all data from the buffer sequence to the underlying
301  
        sink. This method satisfies the @ref WriteSink concept.
301  
        sink. This method satisfies the @ref WriteSink concept.
302  

302  

303  
        When the wrapped type provides native @ref WriteSink support,
303  
        When the wrapped type provides native @ref WriteSink support,
304  
        each window is forwarded directly. Otherwise the data is
304  
        each window is forwarded directly. Otherwise the data is
305  
        copied into the sink via @ref prepare and @ref commit.
305  
        copied into the sink via @ref prepare and @ref commit.
306  

306  

307  
        @param buffers The buffer sequence to write.
307  
        @param buffers The buffer sequence to write.
308  

308  

309  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
309  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
310  

310  

311  
        @par Preconditions
311  
        @par Preconditions
312  
        The wrapper must contain a valid sink (`has_value() == true`).
312  
        The wrapper must contain a valid sink (`has_value() == true`).
313  
    */
313  
    */
314  
    template<ConstBufferSequence CB>
314  
    template<ConstBufferSequence CB>
315  
    io_task<std::size_t>
315  
    io_task<std::size_t>
316  
    write(CB buffers);
316  
    write(CB buffers);
317  

317  

318  
    /** Atomically write data and signal end-of-stream.
318  
    /** Atomically write data and signal end-of-stream.
319  

319  

320  
        Writes all data from the buffer sequence to the underlying
320  
        Writes all data from the buffer sequence to the underlying
321  
        sink and then signals end-of-stream.
321  
        sink and then signals end-of-stream.
322  

322  

323  
        When the wrapped type provides native @ref WriteSink support,
323  
        When the wrapped type provides native @ref WriteSink support,
324  
        the final window is sent atomically via the underlying
324  
        the final window is sent atomically via the underlying
325  
        `write_eof(buffers)`. Otherwise the data is synthesized
325  
        `write_eof(buffers)`. Otherwise the data is synthesized
326  
        through @ref prepare, @ref commit, and @ref commit_eof.
326  
        through @ref prepare, @ref commit, and @ref commit_eof.
327  

327  

328  
        @param buffers The buffer sequence to write.
328  
        @param buffers The buffer sequence to write.
329  

329  

330  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
330  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
331  

331  

332  
        @par Preconditions
332  
        @par Preconditions
333  
        The wrapper must contain a valid sink (`has_value() == true`).
333  
        The wrapper must contain a valid sink (`has_value() == true`).
334  
    */
334  
    */
335  
    template<ConstBufferSequence CB>
335  
    template<ConstBufferSequence CB>
336  
    io_task<std::size_t>
336  
    io_task<std::size_t>
337  
    write_eof(CB buffers);
337  
    write_eof(CB buffers);
338  

338  

339  
    /** Signal end-of-stream.
339  
    /** Signal end-of-stream.
340  

340  

341  
        Indicates that no more data will be written to the sink.
341  
        Indicates that no more data will be written to the sink.
342  
        This method satisfies the @ref WriteSink concept.
342  
        This method satisfies the @ref WriteSink concept.
343  

343  

344  
        When the wrapped type provides native @ref WriteSink support,
344  
        When the wrapped type provides native @ref WriteSink support,
345  
        the underlying `write_eof()` is called. Otherwise the
345  
        the underlying `write_eof()` is called. Otherwise the
346  
        operation is implemented as `commit_eof(0)`.
346  
        operation is implemented as `commit_eof(0)`.
347  

347  

348  
        @return An awaitable that await-returns `(error_code)`.
348  
        @return An awaitable that await-returns `(error_code)`.
349  

349  

350  
        @par Preconditions
350  
        @par Preconditions
351  
        The wrapper must contain a valid sink (`has_value() == true`).
351  
        The wrapper must contain a valid sink (`has_value() == true`).
352  
    */
352  
    */
353  
    auto
353  
    auto
354  
    write_eof();
354  
    write_eof();
355  

355  

356  
protected:
356  
protected:
357  
    /** Rebind to a new sink after move.
357  
    /** Rebind to a new sink after move.
358  

358  

359  
        Updates the internal pointer to reference a new sink object.
359  
        Updates the internal pointer to reference a new sink object.
360  
        Used by owning wrappers after move assignment when the owned
360  
        Used by owning wrappers after move assignment when the owned
361  
        object has moved to a new location.
361  
        object has moved to a new location.
362  

362  

363  
        @param new_sink The new sink to bind to. Must be the same
363  
        @param new_sink The new sink to bind to. Must be the same
364  
            type as the original sink.
364  
            type as the original sink.
365  

365  

366  
        @note Terminates if called with a sink of different type
366  
        @note Terminates if called with a sink of different type
367  
            than the original.
367  
            than the original.
368  
    */
368  
    */
369  
    template<BufferSink S>
369  
    template<BufferSink S>
370  
    void
370  
    void
371  
    rebind(S& new_sink) noexcept
371  
    rebind(S& new_sink) noexcept
372  
    {
372  
    {
373  
        if(vt_ != &vtable_for_impl<S>::value)
373  
        if(vt_ != &vtable_for_impl<S>::value)
374  
            std::terminate();
374  
            std::terminate();
375  
        sink_ = &new_sink;
375  
        sink_ = &new_sink;
376  
    }
376  
    }
377  

377  

378  
private:
378  
private:
379  
    /** Forward a partial write through the vtable.
379  
    /** Forward a partial write through the vtable.
380  

380  

381  
        Constructs the underlying `write_some` awaitable in
381  
        Constructs the underlying `write_some` awaitable in
382  
        cached storage and returns a type-erased awaitable.
382  
        cached storage and returns a type-erased awaitable.
383  
    */
383  
    */
384  
    auto
384  
    auto
385  
    write_some_(std::span<const_buffer const> buffers);
385  
    write_some_(std::span<const_buffer const> buffers);
386  

386  

387  
    /** Forward a complete write through the vtable.
387  
    /** Forward a complete write through the vtable.
388  

388  

389  
        Constructs the underlying `write` awaitable in
389  
        Constructs the underlying `write` awaitable in
390  
        cached storage and returns a type-erased awaitable.
390  
        cached storage and returns a type-erased awaitable.
391  
    */
391  
    */
392  
    auto
392  
    auto
393  
    write_(std::span<const_buffer const> buffers);
393  
    write_(std::span<const_buffer const> buffers);
394  

394  

395  
    /** Forward an atomic write-with-EOF through the vtable.
395  
    /** Forward an atomic write-with-EOF through the vtable.
396  

396  

397  
        Constructs the underlying `write_eof(buffers)` awaitable
397  
        Constructs the underlying `write_eof(buffers)` awaitable
398  
        in cached storage and returns a type-erased awaitable.
398  
        in cached storage and returns a type-erased awaitable.
399  
    */
399  
    */
400  
    auto
400  
    auto
401  
    write_eof_buffers_(std::span<const_buffer const> buffers);
401  
    write_eof_buffers_(std::span<const_buffer const> buffers);
402  
};
402  
};
403  

403  

404  
/** Type-erased ops for awaitables that await-return `io_result<>`. */
404  
/** Type-erased ops for awaitables that await-return `io_result<>`. */
405  
struct any_buffer_sink::awaitable_ops
405  
struct any_buffer_sink::awaitable_ops
406  
{
406  
{
407  
    bool (*await_ready)(void*);
407  
    bool (*await_ready)(void*);
408  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
408  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
409  
    io_result<> (*await_resume)(void*);
409  
    io_result<> (*await_resume)(void*);
410  
    void (*destroy)(void*) noexcept;
410  
    void (*destroy)(void*) noexcept;
411  
};
411  
};
412  

412  

413  
/** Type-erased ops for awaitables that await-return `io_result<std::size_t>`. */
413  
/** Type-erased ops for awaitables that await-return `io_result<std::size_t>`. */
414  
struct any_buffer_sink::write_awaitable_ops
414  
struct any_buffer_sink::write_awaitable_ops
415  
{
415  
{
416  
    bool (*await_ready)(void*);
416  
    bool (*await_ready)(void*);
417  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
417  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
418  
    io_result<std::size_t> (*await_resume)(void*);
418  
    io_result<std::size_t> (*await_resume)(void*);
419  
    void (*destroy)(void*) noexcept;
419  
    void (*destroy)(void*) noexcept;
420  
};
420  
};
421  

421  

422  
struct any_buffer_sink::vtable
422  
struct any_buffer_sink::vtable
423  
{
423  
{
424  
    void (*destroy)(void*) noexcept;
424  
    void (*destroy)(void*) noexcept;
425  
    std::span<mutable_buffer> (*do_prepare)(
425  
    std::span<mutable_buffer> (*do_prepare)(
426  
        void* sink,
426  
        void* sink,
427  
        std::span<mutable_buffer> dest);
427  
        std::span<mutable_buffer> dest);
428  
    std::size_t awaitable_size;
428  
    std::size_t awaitable_size;
429  
    std::size_t awaitable_align;
429  
    std::size_t awaitable_align;
430  
    awaitable_ops const* (*construct_commit_awaitable)(
430  
    awaitable_ops const* (*construct_commit_awaitable)(
431  
        void* sink,
431  
        void* sink,
432  
        void* storage,
432  
        void* storage,
433  
        std::size_t n);
433  
        std::size_t n);
434  
    awaitable_ops const* (*construct_commit_eof_awaitable)(
434  
    awaitable_ops const* (*construct_commit_eof_awaitable)(
435  
        void* sink,
435  
        void* sink,
436  
        void* storage,
436  
        void* storage,
437  
        std::size_t n);
437  
        std::size_t n);
438  

438  

439  
    // WriteSink forwarding (null when wrapped type is BufferSink-only)
439  
    // WriteSink forwarding (null when wrapped type is BufferSink-only)
440  
    write_awaitable_ops const* (*construct_write_some_awaitable)(
440  
    write_awaitable_ops const* (*construct_write_some_awaitable)(
441  
        void* sink,
441  
        void* sink,
442  
        void* storage,
442  
        void* storage,
443  
        std::span<const_buffer const> buffers);
443  
        std::span<const_buffer const> buffers);
444  
    write_awaitable_ops const* (*construct_write_awaitable)(
444  
    write_awaitable_ops const* (*construct_write_awaitable)(
445  
        void* sink,
445  
        void* sink,
446  
        void* storage,
446  
        void* storage,
447  
        std::span<const_buffer const> buffers);
447  
        std::span<const_buffer const> buffers);
448  
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
448  
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
449  
        void* sink,
449  
        void* sink,
450  
        void* storage,
450  
        void* storage,
451  
        std::span<const_buffer const> buffers);
451  
        std::span<const_buffer const> buffers);
452  
    awaitable_ops const* (*construct_write_eof_awaitable)(
452  
    awaitable_ops const* (*construct_write_eof_awaitable)(
453  
        void* sink,
453  
        void* sink,
454  
        void* storage);
454  
        void* storage);
455  
};
455  
};
456  

456  

457  
template<BufferSink S>
457  
template<BufferSink S>
458  
struct any_buffer_sink::vtable_for_impl
458  
struct any_buffer_sink::vtable_for_impl
459  
{
459  
{
460  
    using CommitAwaitable = decltype(std::declval<S&>().commit(
460  
    using CommitAwaitable = decltype(std::declval<S&>().commit(
461  
        std::size_t{}));
461  
        std::size_t{}));
462  
    using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
462  
    using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
463  
        std::size_t{}));
463  
        std::size_t{}));
464  

464  

465  
    static void
465  
    static void
466  
    do_destroy_impl(void* sink) noexcept
466  
    do_destroy_impl(void* sink) noexcept
467  
    {
467  
    {
468  
        static_cast<S*>(sink)->~S();
468  
        static_cast<S*>(sink)->~S();
469  
    }
469  
    }
470  

470  

471  
    static std::span<mutable_buffer>
471  
    static std::span<mutable_buffer>
472  
    do_prepare_impl(
472  
    do_prepare_impl(
473  
        void* sink,
473  
        void* sink,
474  
        std::span<mutable_buffer> dest)
474  
        std::span<mutable_buffer> dest)
475  
    {
475  
    {
476  
        auto& s = *static_cast<S*>(sink);
476  
        auto& s = *static_cast<S*>(sink);
477  
        return s.prepare(dest);
477  
        return s.prepare(dest);
478  
    }
478  
    }
479  

479  

480  
    static awaitable_ops const*
480  
    static awaitable_ops const*
481  
    construct_commit_awaitable_impl(
481  
    construct_commit_awaitable_impl(
482  
        void* sink,
482  
        void* sink,
483  
        void* storage,
483  
        void* storage,
484  
        std::size_t n)
484  
        std::size_t n)
485  
    {
485  
    {
486  
        auto& s = *static_cast<S*>(sink);
486  
        auto& s = *static_cast<S*>(sink);
487  
        ::new(storage) CommitAwaitable(s.commit(n));
487  
        ::new(storage) CommitAwaitable(s.commit(n));
488  

488  

489  
        static constexpr awaitable_ops ops = {
489  
        static constexpr awaitable_ops ops = {
490  
            +[](void* p) {
490  
            +[](void* p) {
491  
                return static_cast<CommitAwaitable*>(p)->await_ready();
491  
                return static_cast<CommitAwaitable*>(p)->await_ready();
492  
            },
492  
            },
493  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
493  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
494  
                return detail::call_await_suspend(
494  
                return detail::call_await_suspend(
495  
                    static_cast<CommitAwaitable*>(p), h, env);
495  
                    static_cast<CommitAwaitable*>(p), h, env);
496  
            },
496  
            },
497  
            +[](void* p) {
497  
            +[](void* p) {
498  
                return static_cast<CommitAwaitable*>(p)->await_resume();
498  
                return static_cast<CommitAwaitable*>(p)->await_resume();
499  
            },
499  
            },
500  
            +[](void* p) noexcept {
500  
            +[](void* p) noexcept {
501  
                static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
501  
                static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
502  
            }
502  
            }
503  
        };
503  
        };
504  
        return &ops;
504  
        return &ops;
505  
    }
505  
    }
506  

506  

507  
    static awaitable_ops const*
507  
    static awaitable_ops const*
508  
    construct_commit_eof_awaitable_impl(
508  
    construct_commit_eof_awaitable_impl(
509  
        void* sink,
509  
        void* sink,
510  
        void* storage,
510  
        void* storage,
511  
        std::size_t n)
511  
        std::size_t n)
512  
    {
512  
    {
513  
        auto& s = *static_cast<S*>(sink);
513  
        auto& s = *static_cast<S*>(sink);
514  
        ::new(storage) CommitEofAwaitable(s.commit_eof(n));
514  
        ::new(storage) CommitEofAwaitable(s.commit_eof(n));
515  

515  

516  
        static constexpr awaitable_ops ops = {
516  
        static constexpr awaitable_ops ops = {
517  
            +[](void* p) {
517  
            +[](void* p) {
518  
                return static_cast<CommitEofAwaitable*>(p)->await_ready();
518  
                return static_cast<CommitEofAwaitable*>(p)->await_ready();
519  
            },
519  
            },
520  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
520  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
521  
                return detail::call_await_suspend(
521  
                return detail::call_await_suspend(
522  
                    static_cast<CommitEofAwaitable*>(p), h, env);
522  
                    static_cast<CommitEofAwaitable*>(p), h, env);
523  
            },
523  
            },
524  
            +[](void* p) {
524  
            +[](void* p) {
525  
                return static_cast<CommitEofAwaitable*>(p)->await_resume();
525  
                return static_cast<CommitEofAwaitable*>(p)->await_resume();
526  
            },
526  
            },
527  
            +[](void* p) noexcept {
527  
            +[](void* p) noexcept {
528  
                static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
528  
                static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
529  
            }
529  
            }
530  
        };
530  
        };
531  
        return &ops;
531  
        return &ops;
532  
    }
532  
    }
533  

533  

534  
    static write_awaitable_ops const*
534  
    static write_awaitable_ops const*
535  
    construct_write_some_awaitable_impl(
535  
    construct_write_some_awaitable_impl(
536  
        void* sink,
536  
        void* sink,
537  
        void* storage,
537  
        void* storage,
538  
        std::span<const_buffer const> buffers)
538  
        std::span<const_buffer const> buffers)
539  
        requires WriteSink<S>
539  
        requires WriteSink<S>
540  
    {
540  
    {
541  
        using Aw = decltype(std::declval<S&>().write_some(
541  
        using Aw = decltype(std::declval<S&>().write_some(
542  
            std::span<const_buffer const>{}));
542  
            std::span<const_buffer const>{}));
543  
        auto& s = *static_cast<S*>(sink);
543  
        auto& s = *static_cast<S*>(sink);
544  
        ::new(storage) Aw(s.write_some(buffers));
544  
        ::new(storage) Aw(s.write_some(buffers));
545  

545  

546  
        static constexpr write_awaitable_ops ops = {
546  
        static constexpr write_awaitable_ops ops = {
547  
            +[](void* p) {
547  
            +[](void* p) {
548  
                return static_cast<Aw*>(p)->await_ready();
548  
                return static_cast<Aw*>(p)->await_ready();
549  
            },
549  
            },
550  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
550  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
551  
                return detail::call_await_suspend(
551  
                return detail::call_await_suspend(
552  
                    static_cast<Aw*>(p), h, env);
552  
                    static_cast<Aw*>(p), h, env);
553  
            },
553  
            },
554  
            +[](void* p) {
554  
            +[](void* p) {
555  
                return static_cast<Aw*>(p)->await_resume();
555  
                return static_cast<Aw*>(p)->await_resume();
556  
            },
556  
            },
557  
            +[](void* p) noexcept {
557  
            +[](void* p) noexcept {
558  
                static_cast<Aw*>(p)->~Aw();
558  
                static_cast<Aw*>(p)->~Aw();
559  
            }
559  
            }
560  
        };
560  
        };
561  
        return &ops;
561  
        return &ops;
562  
    }
562  
    }
563  

563  

564  
    static write_awaitable_ops const*
564  
    static write_awaitable_ops const*
565  
    construct_write_awaitable_impl(
565  
    construct_write_awaitable_impl(
566  
        void* sink,
566  
        void* sink,
567  
        void* storage,
567  
        void* storage,
568  
        std::span<const_buffer const> buffers)
568  
        std::span<const_buffer const> buffers)
569  
        requires WriteSink<S>
569  
        requires WriteSink<S>
570  
    {
570  
    {
571  
        using Aw = decltype(std::declval<S&>().write(
571  
        using Aw = decltype(std::declval<S&>().write(
572  
            std::span<const_buffer const>{}));
572  
            std::span<const_buffer const>{}));
573  
        auto& s = *static_cast<S*>(sink);
573  
        auto& s = *static_cast<S*>(sink);
574  
        ::new(storage) Aw(s.write(buffers));
574  
        ::new(storage) Aw(s.write(buffers));
575  

575  

576  
        static constexpr write_awaitable_ops ops = {
576  
        static constexpr write_awaitable_ops ops = {
577  
            +[](void* p) {
577  
            +[](void* p) {
578  
                return static_cast<Aw*>(p)->await_ready();
578  
                return static_cast<Aw*>(p)->await_ready();
579  
            },
579  
            },
580  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
580  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
581  
                return detail::call_await_suspend(
581  
                return detail::call_await_suspend(
582  
                    static_cast<Aw*>(p), h, env);
582  
                    static_cast<Aw*>(p), h, env);
583  
            },
583  
            },
584  
            +[](void* p) {
584  
            +[](void* p) {
585  
                return static_cast<Aw*>(p)->await_resume();
585  
                return static_cast<Aw*>(p)->await_resume();
586  
            },
586  
            },
587  
            +[](void* p) noexcept {
587  
            +[](void* p) noexcept {
588  
                static_cast<Aw*>(p)->~Aw();
588  
                static_cast<Aw*>(p)->~Aw();
589  
            }
589  
            }
590  
        };
590  
        };
591  
        return &ops;
591  
        return &ops;
592  
    }
592  
    }
593  

593  

594  
    static write_awaitable_ops const*
594  
    static write_awaitable_ops const*
595  
    construct_write_eof_buffers_awaitable_impl(
595  
    construct_write_eof_buffers_awaitable_impl(
596  
        void* sink,
596  
        void* sink,
597  
        void* storage,
597  
        void* storage,
598  
        std::span<const_buffer const> buffers)
598  
        std::span<const_buffer const> buffers)
599  
        requires WriteSink<S>
599  
        requires WriteSink<S>
600  
    {
600  
    {
601  
        using Aw = decltype(std::declval<S&>().write_eof(
601  
        using Aw = decltype(std::declval<S&>().write_eof(
602  
            std::span<const_buffer const>{}));
602  
            std::span<const_buffer const>{}));
603  
        auto& s = *static_cast<S*>(sink);
603  
        auto& s = *static_cast<S*>(sink);
604  
        ::new(storage) Aw(s.write_eof(buffers));
604  
        ::new(storage) Aw(s.write_eof(buffers));
605  

605  

606  
        static constexpr write_awaitable_ops ops = {
606  
        static constexpr write_awaitable_ops ops = {
607  
            +[](void* p) {
607  
            +[](void* p) {
608  
                return static_cast<Aw*>(p)->await_ready();
608  
                return static_cast<Aw*>(p)->await_ready();
609  
            },
609  
            },
610  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
610  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
611  
                return detail::call_await_suspend(
611  
                return detail::call_await_suspend(
612  
                    static_cast<Aw*>(p), h, env);
612  
                    static_cast<Aw*>(p), h, env);
613  
            },
613  
            },
614  
            +[](void* p) {
614  
            +[](void* p) {
615  
                return static_cast<Aw*>(p)->await_resume();
615  
                return static_cast<Aw*>(p)->await_resume();
616  
            },
616  
            },
617  
            +[](void* p) noexcept {
617  
            +[](void* p) noexcept {
618  
                static_cast<Aw*>(p)->~Aw();
618  
                static_cast<Aw*>(p)->~Aw();
619  
            }
619  
            }
620  
        };
620  
        };
621  
        return &ops;
621  
        return &ops;
622  
    }
622  
    }
623  

623  

624  
    static awaitable_ops const*
624  
    static awaitable_ops const*
625  
    construct_write_eof_awaitable_impl(
625  
    construct_write_eof_awaitable_impl(
626  
        void* sink,
626  
        void* sink,
627  
        void* storage)
627  
        void* storage)
628  
        requires WriteSink<S>
628  
        requires WriteSink<S>
629  
    {
629  
    {
630  
        using Aw = decltype(std::declval<S&>().write_eof());
630  
        using Aw = decltype(std::declval<S&>().write_eof());
631  
        auto& s = *static_cast<S*>(sink);
631  
        auto& s = *static_cast<S*>(sink);
632  
        ::new(storage) Aw(s.write_eof());
632  
        ::new(storage) Aw(s.write_eof());
633  

633  

634  
        static constexpr awaitable_ops ops = {
634  
        static constexpr awaitable_ops ops = {
635  
            +[](void* p) {
635  
            +[](void* p) {
636  
                return static_cast<Aw*>(p)->await_ready();
636  
                return static_cast<Aw*>(p)->await_ready();
637  
            },
637  
            },
638  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
638  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
639  
                return detail::call_await_suspend(
639  
                return detail::call_await_suspend(
640  
                    static_cast<Aw*>(p), h, env);
640  
                    static_cast<Aw*>(p), h, env);
641  
            },
641  
            },
642  
            +[](void* p) {
642  
            +[](void* p) {
643  
                return static_cast<Aw*>(p)->await_resume();
643  
                return static_cast<Aw*>(p)->await_resume();
644  
            },
644  
            },
645  
            +[](void* p) noexcept {
645  
            +[](void* p) noexcept {
646  
                static_cast<Aw*>(p)->~Aw();
646  
                static_cast<Aw*>(p)->~Aw();
647  
            }
647  
            }
648  
        };
648  
        };
649  
        return &ops;
649  
        return &ops;
650  
    }
650  
    }
651  

651  

652  
    static consteval std::size_t
652  
    static consteval std::size_t
653  
    compute_max_size() noexcept
653  
    compute_max_size() noexcept
654  
    {
654  
    {
655  
        std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
655  
        std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
656  
            ? sizeof(CommitAwaitable)
656  
            ? sizeof(CommitAwaitable)
657  
            : sizeof(CommitEofAwaitable);
657  
            : sizeof(CommitEofAwaitable);
658  
        if constexpr (WriteSink<S>)
658  
        if constexpr (WriteSink<S>)
659  
        {
659  
        {
660  
            using WS = decltype(std::declval<S&>().write_some(
660  
            using WS = decltype(std::declval<S&>().write_some(
661  
                std::span<const_buffer const>{}));
661  
                std::span<const_buffer const>{}));
662  
            using W = decltype(std::declval<S&>().write(
662  
            using W = decltype(std::declval<S&>().write(
663  
                std::span<const_buffer const>{}));
663  
                std::span<const_buffer const>{}));
664  
            using WEB = decltype(std::declval<S&>().write_eof(
664  
            using WEB = decltype(std::declval<S&>().write_eof(
665  
                std::span<const_buffer const>{}));
665  
                std::span<const_buffer const>{}));
666  
            using WE = decltype(std::declval<S&>().write_eof());
666  
            using WE = decltype(std::declval<S&>().write_eof());
667  

667  

668  
            if(sizeof(WS) > s) s = sizeof(WS);
668  
            if(sizeof(WS) > s) s = sizeof(WS);
669  
            if(sizeof(W) > s) s = sizeof(W);
669  
            if(sizeof(W) > s) s = sizeof(W);
670  
            if(sizeof(WEB) > s) s = sizeof(WEB);
670  
            if(sizeof(WEB) > s) s = sizeof(WEB);
671  
            if(sizeof(WE) > s) s = sizeof(WE);
671  
            if(sizeof(WE) > s) s = sizeof(WE);
672  
        }
672  
        }
673  
        return s;
673  
        return s;
674  
    }
674  
    }
675  

675  

676  
    static consteval std::size_t
676  
    static consteval std::size_t
677  
    compute_max_align() noexcept
677  
    compute_max_align() noexcept
678  
    {
678  
    {
679  
        std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
679  
        std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
680  
            ? alignof(CommitAwaitable)
680  
            ? alignof(CommitAwaitable)
681  
            : alignof(CommitEofAwaitable);
681  
            : alignof(CommitEofAwaitable);
682  
        if constexpr (WriteSink<S>)
682  
        if constexpr (WriteSink<S>)
683  
        {
683  
        {
684  
            using WS = decltype(std::declval<S&>().write_some(
684  
            using WS = decltype(std::declval<S&>().write_some(
685  
                std::span<const_buffer const>{}));
685  
                std::span<const_buffer const>{}));
686  
            using W = decltype(std::declval<S&>().write(
686  
            using W = decltype(std::declval<S&>().write(
687  
                std::span<const_buffer const>{}));
687  
                std::span<const_buffer const>{}));
688  
            using WEB = decltype(std::declval<S&>().write_eof(
688  
            using WEB = decltype(std::declval<S&>().write_eof(
689  
                std::span<const_buffer const>{}));
689  
                std::span<const_buffer const>{}));
690  
            using WE = decltype(std::declval<S&>().write_eof());
690  
            using WE = decltype(std::declval<S&>().write_eof());
691  

691  

692  
            if(alignof(WS) > a) a = alignof(WS);
692  
            if(alignof(WS) > a) a = alignof(WS);
693  
            if(alignof(W) > a) a = alignof(W);
693  
            if(alignof(W) > a) a = alignof(W);
694  
            if(alignof(WEB) > a) a = alignof(WEB);
694  
            if(alignof(WEB) > a) a = alignof(WEB);
695  
            if(alignof(WE) > a) a = alignof(WE);
695  
            if(alignof(WE) > a) a = alignof(WE);
696  
        }
696  
        }
697  
        return a;
697  
        return a;
698  
    }
698  
    }
699  

699  

700  
    static consteval vtable
700  
    static consteval vtable
701  
    make_vtable() noexcept
701  
    make_vtable() noexcept
702  
    {
702  
    {
703  
        vtable v{};
703  
        vtable v{};
704  
        v.destroy = &do_destroy_impl;
704  
        v.destroy = &do_destroy_impl;
705  
        v.do_prepare = &do_prepare_impl;
705  
        v.do_prepare = &do_prepare_impl;
706  
        v.awaitable_size = compute_max_size();
706  
        v.awaitable_size = compute_max_size();
707  
        v.awaitable_align = compute_max_align();
707  
        v.awaitable_align = compute_max_align();
708  
        v.construct_commit_awaitable = &construct_commit_awaitable_impl;
708  
        v.construct_commit_awaitable = &construct_commit_awaitable_impl;
709  
        v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
709  
        v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
710  
        v.construct_write_some_awaitable = nullptr;
710  
        v.construct_write_some_awaitable = nullptr;
711  
        v.construct_write_awaitable = nullptr;
711  
        v.construct_write_awaitable = nullptr;
712  
        v.construct_write_eof_buffers_awaitable = nullptr;
712  
        v.construct_write_eof_buffers_awaitable = nullptr;
713  
        v.construct_write_eof_awaitable = nullptr;
713  
        v.construct_write_eof_awaitable = nullptr;
714  

714  

715  
        if constexpr (WriteSink<S>)
715  
        if constexpr (WriteSink<S>)
716  
        {
716  
        {
717  
            v.construct_write_some_awaitable =
717  
            v.construct_write_some_awaitable =
718  
                &construct_write_some_awaitable_impl;
718  
                &construct_write_some_awaitable_impl;
719  
            v.construct_write_awaitable =
719  
            v.construct_write_awaitable =
720  
                &construct_write_awaitable_impl;
720  
                &construct_write_awaitable_impl;
721  
            v.construct_write_eof_buffers_awaitable =
721  
            v.construct_write_eof_buffers_awaitable =
722  
                &construct_write_eof_buffers_awaitable_impl;
722  
                &construct_write_eof_buffers_awaitable_impl;
723  
            v.construct_write_eof_awaitable =
723  
            v.construct_write_eof_awaitable =
724  
                &construct_write_eof_awaitable_impl;
724  
                &construct_write_eof_awaitable_impl;
725  
        }
725  
        }
726  
        return v;
726  
        return v;
727  
    }
727  
    }
728  

728  

729  
    static constexpr vtable value = make_vtable();
729  
    static constexpr vtable value = make_vtable();
730  
};
730  
};
731  

731  

732  
inline
732  
inline
733  
any_buffer_sink::~any_buffer_sink()
733  
any_buffer_sink::~any_buffer_sink()
734  
{
734  
{
735  
    if(storage_)
735  
    if(storage_)
736  
    {
736  
    {
737  
        vt_->destroy(sink_);
737  
        vt_->destroy(sink_);
738  
        ::operator delete(storage_);
738  
        ::operator delete(storage_);
739  
    }
739  
    }
740  
    if(cached_awaitable_)
740  
    if(cached_awaitable_)
741  
        ::operator delete(cached_awaitable_);
741  
        ::operator delete(cached_awaitable_);
742  
}
742  
}
743  

743  

744  
inline any_buffer_sink&
744  
inline any_buffer_sink&
745  
any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
745  
any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
746  
{
746  
{
747  
    if(this != &other)
747  
    if(this != &other)
748  
    {
748  
    {
749  
        if(storage_)
749  
        if(storage_)
750  
        {
750  
        {
751  
            vt_->destroy(sink_);
751  
            vt_->destroy(sink_);
752  
            ::operator delete(storage_);
752  
            ::operator delete(storage_);
753  
        }
753  
        }
754  
        if(cached_awaitable_)
754  
        if(cached_awaitable_)
755  
            ::operator delete(cached_awaitable_);
755  
            ::operator delete(cached_awaitable_);
756  
        sink_ = std::exchange(other.sink_, nullptr);
756  
        sink_ = std::exchange(other.sink_, nullptr);
757  
        vt_ = std::exchange(other.vt_, nullptr);
757  
        vt_ = std::exchange(other.vt_, nullptr);
758  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
758  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
759  
        storage_ = std::exchange(other.storage_, nullptr);
759  
        storage_ = std::exchange(other.storage_, nullptr);
760  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
760  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
761  
        active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
761  
        active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
762  
    }
762  
    }
763  
    return *this;
763  
    return *this;
764  
}
764  
}
765  

765  

766  
template<BufferSink S>
766  
template<BufferSink S>
767  
    requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
767  
    requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
768  
any_buffer_sink::any_buffer_sink(S s)
768  
any_buffer_sink::any_buffer_sink(S s)
769  
    : vt_(&vtable_for_impl<S>::value)
769  
    : vt_(&vtable_for_impl<S>::value)
770  
{
770  
{
771  
    struct guard {
771  
    struct guard {
772  
        any_buffer_sink* self;
772  
        any_buffer_sink* self;
773  
        bool committed = false;
773  
        bool committed = false;
774  
        ~guard() {
774  
        ~guard() {
775  
            if(!committed && self->storage_) {
775  
            if(!committed && self->storage_) {
776  
                self->vt_->destroy(self->sink_);
776  
                self->vt_->destroy(self->sink_);
777  
                ::operator delete(self->storage_);
777  
                ::operator delete(self->storage_);
778  
                self->storage_ = nullptr;
778  
                self->storage_ = nullptr;
779  
                self->sink_ = nullptr;
779  
                self->sink_ = nullptr;
780  
            }
780  
            }
781  
        }
781  
        }
782  
    } g{this};
782  
    } g{this};
783  

783  

784  
    storage_ = ::operator new(sizeof(S));
784  
    storage_ = ::operator new(sizeof(S));
785  
    sink_ = ::new(storage_) S(std::move(s));
785  
    sink_ = ::new(storage_) S(std::move(s));
786  

786  

787  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
787  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
788  

788  

789  
    g.committed = true;
789  
    g.committed = true;
790  
}
790  
}
791  

791  

792  
template<BufferSink S>
792  
template<BufferSink S>
793  
any_buffer_sink::any_buffer_sink(S* s)
793  
any_buffer_sink::any_buffer_sink(S* s)
794  
    : sink_(s)
794  
    : sink_(s)
795  
    , vt_(&vtable_for_impl<S>::value)
795  
    , vt_(&vtable_for_impl<S>::value)
796  
{
796  
{
797  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
797  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
798  
}
798  
}
799  

799  

800  
inline std::span<mutable_buffer>
800  
inline std::span<mutable_buffer>
801  
any_buffer_sink::prepare(std::span<mutable_buffer> dest)
801  
any_buffer_sink::prepare(std::span<mutable_buffer> dest)
802  
{
802  
{
803  
    return vt_->do_prepare(sink_, dest);
803  
    return vt_->do_prepare(sink_, dest);
804  
}
804  
}
805  

805  

806  
inline auto
806  
inline auto
807  
any_buffer_sink::commit(std::size_t n)
807  
any_buffer_sink::commit(std::size_t n)
808  
{
808  
{
809  
    struct awaitable
809  
    struct awaitable
810  
    {
810  
    {
811  
        any_buffer_sink* self_;
811  
        any_buffer_sink* self_;
812  
        std::size_t n_;
812  
        std::size_t n_;
813  

813  

814  
        bool
814  
        bool
815  
        await_ready()
815  
        await_ready()
816  
        {
816  
        {
817  
            self_->active_ops_ = self_->vt_->construct_commit_awaitable(
817  
            self_->active_ops_ = self_->vt_->construct_commit_awaitable(
818  
                self_->sink_,
818  
                self_->sink_,
819  
                self_->cached_awaitable_,
819  
                self_->cached_awaitable_,
820  
                n_);
820  
                n_);
821  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
821  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
822  
        }
822  
        }
823  

823  

824  
        std::coroutine_handle<>
824  
        std::coroutine_handle<>
825  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
825  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
826  
        {
826  
        {
827  
            return self_->active_ops_->await_suspend(
827  
            return self_->active_ops_->await_suspend(
828  
                self_->cached_awaitable_, h, env);
828  
                self_->cached_awaitable_, h, env);
829  
        }
829  
        }
830  

830  

831  
        io_result<>
831  
        io_result<>
832  
        await_resume()
832  
        await_resume()
833  
        {
833  
        {
834  
            struct guard {
834  
            struct guard {
835  
                any_buffer_sink* self;
835  
                any_buffer_sink* self;
836  
                ~guard() {
836  
                ~guard() {
837  
                    self->active_ops_->destroy(self->cached_awaitable_);
837  
                    self->active_ops_->destroy(self->cached_awaitable_);
838  
                    self->active_ops_ = nullptr;
838  
                    self->active_ops_ = nullptr;
839  
                }
839  
                }
840  
            } g{self_};
840  
            } g{self_};
841  
            return self_->active_ops_->await_resume(
841  
            return self_->active_ops_->await_resume(
842  
                self_->cached_awaitable_);
842  
                self_->cached_awaitable_);
843  
        }
843  
        }
844  
    };
844  
    };
845  
    return awaitable{this, n};
845  
    return awaitable{this, n};
846  
}
846  
}
847  

847  

848  
inline auto
848  
inline auto
849  
any_buffer_sink::commit_eof(std::size_t n)
849  
any_buffer_sink::commit_eof(std::size_t n)
850  
{
850  
{
851  
    struct awaitable
851  
    struct awaitable
852  
    {
852  
    {
853  
        any_buffer_sink* self_;
853  
        any_buffer_sink* self_;
854  
        std::size_t n_;
854  
        std::size_t n_;
855  

855  

856  
        bool
856  
        bool
857  
        await_ready()
857  
        await_ready()
858  
        {
858  
        {
859  
            self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
859  
            self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
860  
                self_->sink_,
860  
                self_->sink_,
861  
                self_->cached_awaitable_,
861  
                self_->cached_awaitable_,
862  
                n_);
862  
                n_);
863  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
863  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
864  
        }
864  
        }
865  

865  

866  
        std::coroutine_handle<>
866  
        std::coroutine_handle<>
867  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
867  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
868  
        {
868  
        {
869  
            return self_->active_ops_->await_suspend(
869  
            return self_->active_ops_->await_suspend(
870  
                self_->cached_awaitable_, h, env);
870  
                self_->cached_awaitable_, h, env);
871  
        }
871  
        }
872  

872  

873  
        io_result<>
873  
        io_result<>
874  
        await_resume()
874  
        await_resume()
875  
        {
875  
        {
876  
            struct guard {
876  
            struct guard {
877  
                any_buffer_sink* self;
877  
                any_buffer_sink* self;
878  
                ~guard() {
878  
                ~guard() {
879  
                    self->active_ops_->destroy(self->cached_awaitable_);
879  
                    self->active_ops_->destroy(self->cached_awaitable_);
880  
                    self->active_ops_ = nullptr;
880  
                    self->active_ops_ = nullptr;
881  
                }
881  
                }
882  
            } g{self_};
882  
            } g{self_};
883  
            return self_->active_ops_->await_resume(
883  
            return self_->active_ops_->await_resume(
884  
                self_->cached_awaitable_);
884  
                self_->cached_awaitable_);
885  
        }
885  
        }
886  
    };
886  
    };
887  
    return awaitable{this, n};
887  
    return awaitable{this, n};
888  
}
888  
}
889  

889  

890  
inline auto
890  
inline auto
891  
any_buffer_sink::write_some_(
891  
any_buffer_sink::write_some_(
892  
    std::span<const_buffer const> buffers)
892  
    std::span<const_buffer const> buffers)
893  
{
893  
{
894  
    struct awaitable
894  
    struct awaitable
895  
    {
895  
    {
896  
        any_buffer_sink* self_;
896  
        any_buffer_sink* self_;
897  
        std::span<const_buffer const> buffers_;
897  
        std::span<const_buffer const> buffers_;
898  

898  

899  
        bool
899  
        bool
900  
        await_ready() const noexcept
900  
        await_ready() const noexcept
901  
        {
901  
        {
902  
            return false;
902  
            return false;
903  
        }
903  
        }
904  

904  

905  
        std::coroutine_handle<>
905  
        std::coroutine_handle<>
906  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
906  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
907  
        {
907  
        {
908  
            self_->active_write_ops_ =
908  
            self_->active_write_ops_ =
909  
                self_->vt_->construct_write_some_awaitable(
909  
                self_->vt_->construct_write_some_awaitable(
910  
                    self_->sink_,
910  
                    self_->sink_,
911  
                    self_->cached_awaitable_,
911  
                    self_->cached_awaitable_,
912  
                    buffers_);
912  
                    buffers_);
913  

913  

914  
            if(self_->active_write_ops_->await_ready(
914  
            if(self_->active_write_ops_->await_ready(
915  
                self_->cached_awaitable_))
915  
                self_->cached_awaitable_))
916  
                return h;
916  
                return h;
917  

917  

918  
            return self_->active_write_ops_->await_suspend(
918  
            return self_->active_write_ops_->await_suspend(
919  
                self_->cached_awaitable_, h, env);
919  
                self_->cached_awaitable_, h, env);
920  
        }
920  
        }
921  

921  

922  
        io_result<std::size_t>
922  
        io_result<std::size_t>
923  
        await_resume()
923  
        await_resume()
924  
        {
924  
        {
925  
            struct guard {
925  
            struct guard {
926  
                any_buffer_sink* self;
926  
                any_buffer_sink* self;
927  
                ~guard() {
927  
                ~guard() {
928  
                    self->active_write_ops_->destroy(
928  
                    self->active_write_ops_->destroy(
929  
                        self->cached_awaitable_);
929  
                        self->cached_awaitable_);
930  
                    self->active_write_ops_ = nullptr;
930  
                    self->active_write_ops_ = nullptr;
931  
                }
931  
                }
932  
            } g{self_};
932  
            } g{self_};
933  
            return self_->active_write_ops_->await_resume(
933  
            return self_->active_write_ops_->await_resume(
934  
                self_->cached_awaitable_);
934  
                self_->cached_awaitable_);
935  
        }
935  
        }
936  
    };
936  
    };
937  
    return awaitable{this, buffers};
937  
    return awaitable{this, buffers};
938  
}
938  
}
939  

939  

940  
inline auto
940  
inline auto
941  
any_buffer_sink::write_(
941  
any_buffer_sink::write_(
942  
    std::span<const_buffer const> buffers)
942  
    std::span<const_buffer const> buffers)
943  
{
943  
{
944  
    struct awaitable
944  
    struct awaitable
945  
    {
945  
    {
946  
        any_buffer_sink* self_;
946  
        any_buffer_sink* self_;
947  
        std::span<const_buffer const> buffers_;
947  
        std::span<const_buffer const> buffers_;
948  

948  

949  
        bool
949  
        bool
950  
        await_ready() const noexcept
950  
        await_ready() const noexcept
951  
        {
951  
        {
952  
            return false;
952  
            return false;
953  
        }
953  
        }
954  

954  

955  
        std::coroutine_handle<>
955  
        std::coroutine_handle<>
956  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
956  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
957  
        {
957  
        {
958  
            self_->active_write_ops_ =
958  
            self_->active_write_ops_ =
959  
                self_->vt_->construct_write_awaitable(
959  
                self_->vt_->construct_write_awaitable(
960  
                    self_->sink_,
960  
                    self_->sink_,
961  
                    self_->cached_awaitable_,
961  
                    self_->cached_awaitable_,
962  
                    buffers_);
962  
                    buffers_);
963  

963  

964  
            if(self_->active_write_ops_->await_ready(
964  
            if(self_->active_write_ops_->await_ready(
965  
                self_->cached_awaitable_))
965  
                self_->cached_awaitable_))
966  
                return h;
966  
                return h;
967  

967  

968  
            return self_->active_write_ops_->await_suspend(
968  
            return self_->active_write_ops_->await_suspend(
969  
                self_->cached_awaitable_, h, env);
969  
                self_->cached_awaitable_, h, env);
970  
        }
970  
        }
971  

971  

972  
        io_result<std::size_t>
972  
        io_result<std::size_t>
973  
        await_resume()
973  
        await_resume()
974  
        {
974  
        {
975  
            struct guard {
975  
            struct guard {
976  
                any_buffer_sink* self;
976  
                any_buffer_sink* self;
977  
                ~guard() {
977  
                ~guard() {
978  
                    self->active_write_ops_->destroy(
978  
                    self->active_write_ops_->destroy(
979  
                        self->cached_awaitable_);
979  
                        self->cached_awaitable_);
980  
                    self->active_write_ops_ = nullptr;
980  
                    self->active_write_ops_ = nullptr;
981  
                }
981  
                }
982  
            } g{self_};
982  
            } g{self_};
983  
            return self_->active_write_ops_->await_resume(
983  
            return self_->active_write_ops_->await_resume(
984  
                self_->cached_awaitable_);
984  
                self_->cached_awaitable_);
985  
        }
985  
        }
986  
    };
986  
    };
987  
    return awaitable{this, buffers};
987  
    return awaitable{this, buffers};
988  
}
988  
}
989  

989  

990  
inline auto
990  
inline auto
991  
any_buffer_sink::write_eof_buffers_(
991  
any_buffer_sink::write_eof_buffers_(
992  
    std::span<const_buffer const> buffers)
992  
    std::span<const_buffer const> buffers)
993  
{
993  
{
994  
    struct awaitable
994  
    struct awaitable
995  
    {
995  
    {
996  
        any_buffer_sink* self_;
996  
        any_buffer_sink* self_;
997  
        std::span<const_buffer const> buffers_;
997  
        std::span<const_buffer const> buffers_;
998  

998  

999  
        bool
999  
        bool
1000  
        await_ready() const noexcept
1000  
        await_ready() const noexcept
1001  
        {
1001  
        {
1002  
            return false;
1002  
            return false;
1003  
        }
1003  
        }
1004  

1004  

1005  
        std::coroutine_handle<>
1005  
        std::coroutine_handle<>
1006  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
1006  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
1007  
        {
1007  
        {
1008  
            self_->active_write_ops_ =
1008  
            self_->active_write_ops_ =
1009  
                self_->vt_->construct_write_eof_buffers_awaitable(
1009  
                self_->vt_->construct_write_eof_buffers_awaitable(
1010  
                    self_->sink_,
1010  
                    self_->sink_,
1011  
                    self_->cached_awaitable_,
1011  
                    self_->cached_awaitable_,
1012  
                    buffers_);
1012  
                    buffers_);
1013  

1013  

1014  
            if(self_->active_write_ops_->await_ready(
1014  
            if(self_->active_write_ops_->await_ready(
1015  
                self_->cached_awaitable_))
1015  
                self_->cached_awaitable_))
1016  
                return h;
1016  
                return h;
1017  

1017  

1018  
            return self_->active_write_ops_->await_suspend(
1018  
            return self_->active_write_ops_->await_suspend(
1019  
                self_->cached_awaitable_, h, env);
1019  
                self_->cached_awaitable_, h, env);
1020  
        }
1020  
        }
1021  

1021  

1022  
        io_result<std::size_t>
1022  
        io_result<std::size_t>
1023  
        await_resume()
1023  
        await_resume()
1024  
        {
1024  
        {
1025  
            struct guard {
1025  
            struct guard {
1026  
                any_buffer_sink* self;
1026  
                any_buffer_sink* self;
1027  
                ~guard() {
1027  
                ~guard() {
1028  
                    self->active_write_ops_->destroy(
1028  
                    self->active_write_ops_->destroy(
1029  
                        self->cached_awaitable_);
1029  
                        self->cached_awaitable_);
1030  
                    self->active_write_ops_ = nullptr;
1030  
                    self->active_write_ops_ = nullptr;
1031  
                }
1031  
                }
1032  
            } g{self_};
1032  
            } g{self_};
1033  
            return self_->active_write_ops_->await_resume(
1033  
            return self_->active_write_ops_->await_resume(
1034  
                self_->cached_awaitable_);
1034  
                self_->cached_awaitable_);
1035  
        }
1035  
        }
1036  
    };
1036  
    };
1037  
    return awaitable{this, buffers};
1037  
    return awaitable{this, buffers};
1038  
}
1038  
}
1039  

1039  

1040  
template<ConstBufferSequence CB>
1040  
template<ConstBufferSequence CB>
1041  
io_task<std::size_t>
1041  
io_task<std::size_t>
1042  
any_buffer_sink::write_some(CB buffers)
1042  
any_buffer_sink::write_some(CB buffers)
1043  
{
1043  
{
1044  
    buffer_param<CB> bp(buffers);
1044  
    buffer_param<CB> bp(buffers);
1045  
    auto src = bp.data();
1045  
    auto src = bp.data();
1046  
    if(src.empty())
1046  
    if(src.empty())
1047  
        co_return {{}, 0};
1047  
        co_return {{}, 0};
1048  

1048  

1049  
    // Native WriteSink path
1049  
    // Native WriteSink path
1050  
    if(vt_->construct_write_some_awaitable)
1050  
    if(vt_->construct_write_some_awaitable)
1051  
        co_return co_await write_some_(src);
1051  
        co_return co_await write_some_(src);
1052  

1052  

1053  
    // Synthesized path: prepare + buffer_copy + commit
1053  
    // Synthesized path: prepare + buffer_copy + commit
1054  
    mutable_buffer arr[detail::max_iovec_];
1054  
    mutable_buffer arr[detail::max_iovec_];
1055  
    auto dst_bufs = prepare(arr);
1055  
    auto dst_bufs = prepare(arr);
1056  
    if(dst_bufs.empty())
1056  
    if(dst_bufs.empty())
1057  
    {
1057  
    {
1058  
        auto [ec] = co_await commit(0);
1058  
        auto [ec] = co_await commit(0);
1059  
        if(ec)
1059  
        if(ec)
1060  
            co_return {ec, 0};
1060  
            co_return {ec, 0};
1061  
        dst_bufs = prepare(arr);
1061  
        dst_bufs = prepare(arr);
1062  
        if(dst_bufs.empty())
1062  
        if(dst_bufs.empty())
1063  
            co_return {{}, 0};
1063  
            co_return {{}, 0};
1064  
    }
1064  
    }
1065  

1065  

1066  
    auto n = buffer_copy(dst_bufs, src);
1066  
    auto n = buffer_copy(dst_bufs, src);
1067  
    auto [ec] = co_await commit(n);
1067  
    auto [ec] = co_await commit(n);
1068  
    if(ec)
1068  
    if(ec)
1069  
        co_return {ec, 0};
1069  
        co_return {ec, 0};
1070  
    co_return {{}, n};
1070  
    co_return {{}, n};
1071  
}
1071  
}
1072  

1072  

1073  
template<ConstBufferSequence CB>
1073  
template<ConstBufferSequence CB>
1074  
io_task<std::size_t>
1074  
io_task<std::size_t>
1075  
any_buffer_sink::write(CB buffers)
1075  
any_buffer_sink::write(CB buffers)
1076  
{
1076  
{
1077  
    buffer_param<CB> bp(buffers);
1077  
    buffer_param<CB> bp(buffers);
1078  
    std::size_t total = 0;
1078  
    std::size_t total = 0;
1079  

1079  

1080  
    // Native WriteSink path
1080  
    // Native WriteSink path
1081  
    if(vt_->construct_write_awaitable)
1081  
    if(vt_->construct_write_awaitable)
1082  
    {
1082  
    {
1083  
        for(;;)
1083  
        for(;;)
1084  
        {
1084  
        {
1085  
            auto bufs = bp.data();
1085  
            auto bufs = bp.data();
1086  
            if(bufs.empty())
1086  
            if(bufs.empty())
1087  
                break;
1087  
                break;
1088  

1088  

1089  
            auto [ec, n] = co_await write_(bufs);
1089  
            auto [ec, n] = co_await write_(bufs);
1090  
            total += n;
1090  
            total += n;
1091  
            if(ec)
1091  
            if(ec)
1092  
                co_return {ec, total};
1092  
                co_return {ec, total};
1093  
            bp.consume(n);
1093  
            bp.consume(n);
1094  
        }
1094  
        }
1095  
        co_return {{}, total};
1095  
        co_return {{}, total};
1096  
    }
1096  
    }
1097  

1097  

1098  
    // Synthesized path: prepare + buffer_copy + commit
1098  
    // Synthesized path: prepare + buffer_copy + commit
1099  
    for(;;)
1099  
    for(;;)
1100  
    {
1100  
    {
1101  
        auto src = bp.data();
1101  
        auto src = bp.data();
1102  
        if(src.empty())
1102  
        if(src.empty())
1103  
            break;
1103  
            break;
1104  

1104  

1105  
        mutable_buffer arr[detail::max_iovec_];
1105  
        mutable_buffer arr[detail::max_iovec_];
1106  
        auto dst_bufs = prepare(arr);
1106  
        auto dst_bufs = prepare(arr);
1107  
        if(dst_bufs.empty())
1107  
        if(dst_bufs.empty())
1108  
        {
1108  
        {
1109  
            auto [ec] = co_await commit(0);
1109  
            auto [ec] = co_await commit(0);
1110  
            if(ec)
1110  
            if(ec)
1111  
                co_return {ec, total};
1111  
                co_return {ec, total};
1112  
            continue;
1112  
            continue;
1113  
        }
1113  
        }
1114  

1114  

1115  
        auto n = buffer_copy(dst_bufs, src);
1115  
        auto n = buffer_copy(dst_bufs, src);
1116  
        auto [ec] = co_await commit(n);
1116  
        auto [ec] = co_await commit(n);
1117  
        if(ec)
1117  
        if(ec)
1118  
            co_return {ec, total};
1118  
            co_return {ec, total};
1119  
        bp.consume(n);
1119  
        bp.consume(n);
1120  
        total += n;
1120  
        total += n;
1121  
    }
1121  
    }
1122  

1122  

1123  
    co_return {{}, total};
1123  
    co_return {{}, total};
1124  
}
1124  
}
1125  

1125  

1126  
inline auto
1126  
inline auto
1127  
any_buffer_sink::write_eof()
1127  
any_buffer_sink::write_eof()
1128  
{
1128  
{
1129  
    struct awaitable
1129  
    struct awaitable
1130  
    {
1130  
    {
1131  
        any_buffer_sink* self_;
1131  
        any_buffer_sink* self_;
1132  

1132  

1133  
        bool
1133  
        bool
1134  
        await_ready()
1134  
        await_ready()
1135  
        {
1135  
        {
1136  
            if(self_->vt_->construct_write_eof_awaitable)
1136  
            if(self_->vt_->construct_write_eof_awaitable)
1137  
            {
1137  
            {
1138  
                // Native WriteSink: forward to underlying write_eof()
1138  
                // Native WriteSink: forward to underlying write_eof()
1139  
                self_->active_ops_ =
1139  
                self_->active_ops_ =
1140  
                    self_->vt_->construct_write_eof_awaitable(
1140  
                    self_->vt_->construct_write_eof_awaitable(
1141  
                        self_->sink_,
1141  
                        self_->sink_,
1142  
                        self_->cached_awaitable_);
1142  
                        self_->cached_awaitable_);
1143  
            }
1143  
            }
1144  
            else
1144  
            else
1145  
            {
1145  
            {
1146  
                // Synthesized: commit_eof(0)
1146  
                // Synthesized: commit_eof(0)
1147  
                self_->active_ops_ =
1147  
                self_->active_ops_ =
1148  
                    self_->vt_->construct_commit_eof_awaitable(
1148  
                    self_->vt_->construct_commit_eof_awaitable(
1149  
                        self_->sink_,
1149  
                        self_->sink_,
1150  
                        self_->cached_awaitable_,
1150  
                        self_->cached_awaitable_,
1151  
                        0);
1151  
                        0);
1152  
            }
1152  
            }
1153  
            return self_->active_ops_->await_ready(
1153  
            return self_->active_ops_->await_ready(
1154  
                self_->cached_awaitable_);
1154  
                self_->cached_awaitable_);
1155  
        }
1155  
        }
1156  

1156  

1157  
        std::coroutine_handle<>
1157  
        std::coroutine_handle<>
1158  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
1158  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
1159  
        {
1159  
        {
1160  
            return self_->active_ops_->await_suspend(
1160  
            return self_->active_ops_->await_suspend(
1161  
                self_->cached_awaitable_, h, env);
1161  
                self_->cached_awaitable_, h, env);
1162  
        }
1162  
        }
1163  

1163  

1164  
        io_result<>
1164  
        io_result<>
1165  
        await_resume()
1165  
        await_resume()
1166  
        {
1166  
        {
1167  
            struct guard {
1167  
            struct guard {
1168  
                any_buffer_sink* self;
1168  
                any_buffer_sink* self;
1169  
                ~guard() {
1169  
                ~guard() {
1170  
                    self->active_ops_->destroy(self->cached_awaitable_);
1170  
                    self->active_ops_->destroy(self->cached_awaitable_);
1171  
                    self->active_ops_ = nullptr;
1171  
                    self->active_ops_ = nullptr;
1172  
                }
1172  
                }
1173  
            } g{self_};
1173  
            } g{self_};
1174  
            return self_->active_ops_->await_resume(
1174  
            return self_->active_ops_->await_resume(
1175  
                self_->cached_awaitable_);
1175  
                self_->cached_awaitable_);
1176  
        }
1176  
        }
1177  
    };
1177  
    };
1178  
    return awaitable{this};
1178  
    return awaitable{this};
1179  
}
1179  
}
1180  

1180  

1181  
template<ConstBufferSequence CB>
1181  
template<ConstBufferSequence CB>
1182  
io_task<std::size_t>
1182  
io_task<std::size_t>
1183  
any_buffer_sink::write_eof(CB buffers)
1183  
any_buffer_sink::write_eof(CB buffers)
1184  
{
1184  
{
1185  
    // Native WriteSink path
1185  
    // Native WriteSink path
1186  
    if(vt_->construct_write_eof_buffers_awaitable)
1186  
    if(vt_->construct_write_eof_buffers_awaitable)
1187  
    {
1187  
    {
1188  
        const_buffer_param<CB> bp(buffers);
1188  
        const_buffer_param<CB> bp(buffers);
1189  
        std::size_t total = 0;
1189  
        std::size_t total = 0;
1190  

1190  

1191  
        for(;;)
1191  
        for(;;)
1192  
        {
1192  
        {
1193  
            auto bufs = bp.data();
1193  
            auto bufs = bp.data();
1194  
            if(bufs.empty())
1194  
            if(bufs.empty())
1195  
            {
1195  
            {
1196  
                auto [ec] = co_await write_eof();
1196  
                auto [ec] = co_await write_eof();
1197  
                co_return {ec, total};
1197  
                co_return {ec, total};
1198  
            }
1198  
            }
1199  

1199  

1200  
            if(!bp.more())
1200  
            if(!bp.more())
1201  
            {
1201  
            {
1202  
                // Last window: send atomically with EOF
1202  
                // Last window: send atomically with EOF
1203  
                auto [ec, n] = co_await write_eof_buffers_(bufs);
1203  
                auto [ec, n] = co_await write_eof_buffers_(bufs);
1204  
                total += n;
1204  
                total += n;
1205  
                co_return {ec, total};
1205  
                co_return {ec, total};
1206  
            }
1206  
            }
1207  

1207  

1208  
            auto [ec, n] = co_await write_(bufs);
1208  
            auto [ec, n] = co_await write_(bufs);
1209  
            total += n;
1209  
            total += n;
1210  
            if(ec)
1210  
            if(ec)
1211  
                co_return {ec, total};
1211  
                co_return {ec, total};
1212  
            bp.consume(n);
1212  
            bp.consume(n);
1213  
        }
1213  
        }
1214  
    }
1214  
    }
1215  

1215  

1216  
    // Synthesized path: prepare + buffer_copy + commit + commit_eof
1216  
    // Synthesized path: prepare + buffer_copy + commit + commit_eof
1217  
    buffer_param<CB> bp(buffers);
1217  
    buffer_param<CB> bp(buffers);
1218  
    std::size_t total = 0;
1218  
    std::size_t total = 0;
1219  

1219  

1220  
    for(;;)
1220  
    for(;;)
1221  
    {
1221  
    {
1222  
        auto src = bp.data();
1222  
        auto src = bp.data();
1223  
        if(src.empty())
1223  
        if(src.empty())
1224  
            break;
1224  
            break;
1225  

1225  

1226  
        mutable_buffer arr[detail::max_iovec_];
1226  
        mutable_buffer arr[detail::max_iovec_];
1227  
        auto dst_bufs = prepare(arr);
1227  
        auto dst_bufs = prepare(arr);
1228  
        if(dst_bufs.empty())
1228  
        if(dst_bufs.empty())
1229  
        {
1229  
        {
1230  
            auto [ec] = co_await commit(0);
1230  
            auto [ec] = co_await commit(0);
1231  
            if(ec)
1231  
            if(ec)
1232  
                co_return {ec, total};
1232  
                co_return {ec, total};
1233  
            continue;
1233  
            continue;
1234  
        }
1234  
        }
1235  

1235  

1236  
        auto n = buffer_copy(dst_bufs, src);
1236  
        auto n = buffer_copy(dst_bufs, src);
1237  
        auto [ec] = co_await commit(n);
1237  
        auto [ec] = co_await commit(n);
1238  
        if(ec)
1238  
        if(ec)
1239  
            co_return {ec, total};
1239  
            co_return {ec, total};
1240  
        bp.consume(n);
1240  
        bp.consume(n);
1241  
        total += n;
1241  
        total += n;
1242  
    }
1242  
    }
1243  

1243  

1244  
    auto [ec] = co_await commit_eof(0);
1244  
    auto [ec] = co_await commit_eof(0);
1245  
    if(ec)
1245  
    if(ec)
1246  
        co_return {ec, total};
1246  
        co_return {ec, total};
1247  

1247  

1248  
    co_return {{}, total};
1248  
    co_return {{}, total};
1249  
}
1249  
}
1250  

1250  

1251  
static_assert(BufferSink<any_buffer_sink>);
1251  
static_assert(BufferSink<any_buffer_sink>);
1252  
static_assert(WriteSink<any_buffer_sink>);
1252  
static_assert(WriteSink<any_buffer_sink>);
1253  

1253  

1254  
} // namespace capy
1254  
} // namespace capy
1255  
} // namespace boost
1255  
} // namespace boost
1256  

1256  

1257  
#endif
1257  
#endif