src/ex/detail/strand_service.cpp

97.8% Lines (89/91) 95.5% List of functions (21/22) 90.6% Branches (29/32)
f(x) Functions (22)
Function Calls Lines Branches Blocks
boost::capy::detail::strand_invoker::promise_type::operator new(unsigned long, boost::capy::detail::strand_impl&) :53 0 100.0% 75.0% boost::capy::detail::strand_invoker::promise_type::operator delete(void*, unsigned long) :70 0 87.5% 50.0% boost::capy::detail::strand_invoker::promise_type::get_return_object() :84 0 100.0% boost::capy::detail::strand_invoker::promise_type::initial_suspend() :87 0 100.0% boost::capy::detail::strand_invoker::promise_type::final_suspend() :88 0 100.0% boost::capy::detail::strand_invoker::promise_type::return_void() :89 0 100.0% boost::capy::detail::strand_invoker::promise_type::unhandled_exception() :90 0 0.0% boost::capy::detail::strand_service_impl::strand_service_impl(boost::capy::execution_context&) :112 0 100.0% boost::capy::detail::strand_service_impl::get_implementation() :117 0 100.0% 100.0% boost::capy::detail::strand_service_impl::shutdown() :127 0 100.0% 100.0% boost::capy::detail::strand_service_impl::enqueue(boost::capy::detail::strand_impl&, std::__n4861::coroutine_handle<void>) :143 0 100.0% 100.0% boost::capy::detail::strand_service_impl::dispatch_pending(boost::capy::detail::strand_impl&) :156 0 100.0% 100.0% boost::capy::detail::strand_service_impl::try_unlock(boost::capy::detail::strand_impl&) :167 0 100.0% 100.0% boost::capy::detail::strand_service_impl::set_dispatch_thread(boost::capy::detail::strand_impl&) :179 0 100.0% boost::capy::detail::strand_service_impl::clear_dispatch_thread(boost::capy::detail::strand_impl&) :185 0 100.0% boost::capy::detail::strand_service_impl::make_invoker(boost::capy::detail::strand_impl&) :193 0 100.0% 100.0% boost::capy::detail::strand_service::strand_service() :213 0 100.0% boost::capy::detail::strand_service::~strand_service() :219 0 100.0% boost::capy::detail::strand_service::running_in_this_thread(boost::capy::detail::strand_impl&) :223 0 100.0% boost::capy::detail::strand_service::dispatch(boost::capy::detail::strand_impl&, boost::capy::executor_ref, std::__n4861::coroutine_handle<void>) :230 0 100.0% 83.3% boost::capy::detail::strand_service::post(boost::capy::detail::strand_impl&, boost::capy::executor_ref, std::__n4861::coroutine_handle<void>) :242 0 100.0% 100.0% boost::capy::detail::get_strand_service(boost::capy::execution_context&) :250 0 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #include "src/ex/detail/strand_queue.hpp"
11 #include <boost/capy/ex/detail/strand_service.hpp>
12 #include <atomic>
13 #include <coroutine>
14 #include <mutex>
15 #include <thread>
16 #include <utility>
17
18 namespace boost {
19 namespace capy {
20 namespace detail {
21
22 //----------------------------------------------------------
23
24 /** Implementation state for a strand.
25
26 Each strand_impl provides serialization for coroutines
27 dispatched through strands that share it.
28 */
29 // Sentinel stored in cached_frame_ after shutdown to prevent
30 // in-flight invokers from repopulating a freed cache slot.
31 inline void* const kCacheClosed = reinterpret_cast<void*>(1);
32
33 struct strand_impl
34 {
35 std::mutex mutex_;
36 strand_queue pending_;
37 bool locked_ = false;
38 std::atomic<std::thread::id> dispatch_thread_{};
39 std::atomic<void*> cached_frame_{nullptr};
40 };
41
42 //----------------------------------------------------------
43
44 /** Invoker coroutine for strand dispatch.
45
46 Uses custom allocator to recycle frame - one allocation
47 per strand_impl lifetime, stored in trailer for recovery.
48 */
49 struct strand_invoker
50 {
51 struct promise_type
52 {
53 14x void* operator new(std::size_t n, strand_impl& impl)
54 {
55 14x constexpr auto A = alignof(strand_impl*);
56 14x std::size_t padded = (n + A - 1) & ~(A - 1);
57 14x std::size_t total = padded + sizeof(strand_impl*);
58
59 14x void* p = impl.cached_frame_.exchange(
60 nullptr, std::memory_order_acquire);
61
3/4
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 11 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 3 times.
14x if(!p || p == kCacheClosed)
62 11x p = ::operator new(total);
63
64 // Trailer lets delete recover impl
65 14x *reinterpret_cast<strand_impl**>(
66 14x static_cast<char*>(p) + padded) = &impl;
67 14x return p;
68 }
69
70 14x void operator delete(void* p, std::size_t n) noexcept
71 {
72 14x constexpr auto A = alignof(strand_impl*);
73 14x std::size_t padded = (n + A - 1) & ~(A - 1);
74
75 14x auto* impl = *reinterpret_cast<strand_impl**>(
76 static_cast<char*>(p) + padded);
77
78 14x void* expected = nullptr;
79
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 14 times.
14x if(!impl->cached_frame_.compare_exchange_strong(
80 expected, p, std::memory_order_release))
81 ::operator delete(p);
82 14x }
83
84 14x strand_invoker get_return_object() noexcept
85 14x { return {std::coroutine_handle<promise_type>::from_promise(*this)}; }
86
87 14x std::suspend_always initial_suspend() noexcept { return {}; }
88 14x std::suspend_never final_suspend() noexcept { return {}; }
89 14x void return_void() noexcept {}
90 void unhandled_exception() { std::terminate(); }
91 };
92
93 std::coroutine_handle<promise_type> h_;
94 };
95
96 //----------------------------------------------------------
97
98 /** Concrete implementation of strand_service.
99
100 Holds the fixed pool of strand_impl objects.
101 */
102 class strand_service_impl : public strand_service
103 {
104 static constexpr std::size_t num_impls = 211;
105
106 strand_impl impls_[num_impls];
107 std::size_t salt_ = 0;
108 std::mutex mutex_;
109
110 public:
111 explicit
112 23x strand_service_impl(execution_context&)
113 4876x {
114 23x }
115
116 strand_impl*
117 27x get_implementation() override
118 {
119
1/1
✓ Branch 1 taken 27 times.
27x std::lock_guard<std::mutex> lock(mutex_);
120 27x std::size_t index = salt_++;
121 27x index = index % num_impls;
122 27x return &impls_[index];
123 27x }
124
125 protected:
126 void
127 23x shutdown() override
128 {
129
2/2
✓ Branch 0 taken 4853 times.
✓ Branch 1 taken 23 times.
4876x for(std::size_t i = 0; i < num_impls; ++i)
130 {
131
1/1
✓ Branch 1 taken 4853 times.
4853x std::lock_guard<std::mutex> lock(impls_[i].mutex_);
132 4853x impls_[i].locked_ = true;
133
134 4853x void* p = impls_[i].cached_frame_.exchange(
135 kCacheClosed, std::memory_order_acquire);
136
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 4842 times.
4853x if(p)
137 11x ::operator delete(p);
138 4853x }
139 23x }
140
141 private:
142 static bool
143 332x enqueue(strand_impl& impl, std::coroutine_handle<> h)
144 {
145
1/1
✓ Branch 1 taken 332 times.
332x std::lock_guard<std::mutex> lock(impl.mutex_);
146
1/1
✓ Branch 1 taken 332 times.
332x impl.pending_.push(h);
147
2/2
✓ Branch 0 taken 14 times.
✓ Branch 1 taken 318 times.
332x if(!impl.locked_)
148 {
149 14x impl.locked_ = true;
150 14x return true;
151 }
152 318x return false;
153 332x }
154
155 static void
156 28x dispatch_pending(strand_impl& impl)
157 {
158 28x strand_queue::taken_batch batch;
159 {
160
1/1
✓ Branch 1 taken 28 times.
28x std::lock_guard<std::mutex> lock(impl.mutex_);
161 28x batch = impl.pending_.take_all();
162 28x }
163
1/1
✓ Branch 1 taken 28 times.
28x impl.pending_.dispatch_batch(batch);
164 28x }
165
166 static bool
167 28x try_unlock(strand_impl& impl)
168 {
169
1/1
✓ Branch 1 taken 28 times.
28x std::lock_guard<std::mutex> lock(impl.mutex_);
170
2/2
✓ Branch 1 taken 14 times.
✓ Branch 2 taken 14 times.
28x if(impl.pending_.empty())
171 {
172 14x impl.locked_ = false;
173 14x return true;
174 }
175 14x return false;
176 28x }
177
178 static void
179 28x set_dispatch_thread(strand_impl& impl) noexcept
180 {
181 28x impl.dispatch_thread_.store(std::this_thread::get_id());
182 28x }
183
184 static void
185 14x clear_dispatch_thread(strand_impl& impl) noexcept
186 {
187 14x impl.dispatch_thread_.store(std::thread::id{});
188 14x }
189
190 // Loops until queue empty (aggressive). Alternative: per-batch fairness
191 // (repost after each batch to let other work run) - explore if starvation observed.
192 static strand_invoker
193
1/1
✓ Branch 1 taken 14 times.
14x make_invoker(strand_impl& impl)
194 {
195 strand_impl* p = &impl;
196 for(;;)
197 {
198 set_dispatch_thread(*p);
199 dispatch_pending(*p);
200 if(try_unlock(*p))
201 {
202 clear_dispatch_thread(*p);
203 co_return;
204 }
205 }
206 28x }
207
208 friend class strand_service;
209 };
210
211 //----------------------------------------------------------
212
213 23x strand_service::
214 23x strand_service()
215 23x : service()
216 {
217 23x }
218
219 23x strand_service::
220 ~strand_service() = default;
221
222 bool
223 10x strand_service::
224 running_in_this_thread(strand_impl& impl) noexcept
225 {
226 10x return impl.dispatch_thread_.load() == std::this_thread::get_id();
227 }
228
229 std::coroutine_handle<>
230 8x strand_service::
231 dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
232 {
233
2/2
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 5 times.
8x if(running_in_this_thread(impl))
234 3x return h;
235
236
1/2
✓ Branch 1 taken 5 times.
✗ Branch 2 not taken.
5x if(strand_service_impl::enqueue(impl, h))
237
2/2
✓ Branch 1 taken 5 times.
✓ Branch 5 taken 5 times.
5x ex.post(strand_service_impl::make_invoker(impl).h_);
238 5x return std::noop_coroutine();
239 }
240
241 void
242 327x strand_service::
243 post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
244 {
245
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 318 times.
327x if(strand_service_impl::enqueue(impl, h))
246
2/2
✓ Branch 1 taken 9 times.
✓ Branch 5 taken 9 times.
9x ex.post(strand_service_impl::make_invoker(impl).h_);
247 327x }
248
249 strand_service&
250 27x get_strand_service(execution_context& ctx)
251 {
252 27x return ctx.use_service<strand_service_impl>();
253 }
254
255 } // namespace detail
256 } // namespace capy
257 } // namespace boost
258