src/ex/thread_pool.cpp

100.0% Lines (119/119) 100.0% List of functions (24/24) 85.2% Branches (46/54)
f(x) Functions (24)
Function Calls Lines Branches Blocks
boost::capy::thread_pool::impl::work::work(std::__n4861::coroutine_handle<void>) :55 0 100.0% boost::capy::thread_pool::impl::work::run() :60 0 100.0% 66.7% boost::capy::thread_pool::impl::work::destroy() :67 0 100.0% 77.8% boost::capy::thread_pool::impl::~impl() :88 0 100.0% 100.0% boost::capy::thread_pool::impl::impl(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :94 0 100.0% 100.0% boost::capy::thread_pool::impl::post(std::__n4861::coroutine_handle<void>) :107 0 100.0% 100.0% boost::capy::thread_pool::impl::on_work_started() :119 0 100.0% boost::capy::thread_pool::impl::on_work_finished() :125 0 100.0% 100.0% boost::capy::thread_pool::impl::join() :138 0 100.0% 100.0% boost::capy::thread_pool::impl::join()::{lambda()#1}::operator()() const :154 0 100.0% 75.0% boost::capy::thread_pool::impl::stop() :166 0 100.0% boost::capy::thread_pool::impl::ensure_started() :177 0 100.0% boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const :179 0 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const::{lambda()#1}::operator()() const :182 0 100.0% 100.0% boost::capy::thread_pool::impl::run(unsigned long) :187 0 100.0% 100.0% boost::capy::thread_pool::impl::run(unsigned long)::{lambda()#1}::operator()() const :199 0 100.0% 90.0% boost::capy::thread_pool::~thread_pool() :215 0 100.0% 50.0% boost::capy::thread_pool::thread_pool(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :225 0 100.0% 60.0% boost::capy::thread_pool::join() :233 0 100.0% boost::capy::thread_pool::stop() :240 0 100.0% boost::capy::thread_pool::get_executor() const :249 0 100.0% boost::capy::thread_pool::executor_type::on_work_started() const :257 0 100.0% boost::capy::thread_pool::executor_type::on_work_finished() const :264 0 100.0% boost::capy::thread_pool::executor_type::post(std::__n4861::coroutine_handle<void>) const :271 0 100.0%
Line Branch TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/boostorg/capy
9 //
10
11 #include <boost/capy/ex/thread_pool.hpp>
12 #include <boost/capy/detail/intrusive.hpp>
13 #include <boost/capy/test/thread_name.hpp>
14 #include <algorithm>
15 #include <atomic>
16 #include <condition_variable>
17 #include <cstdio>
18 #include <mutex>
19 #include <thread>
20 #include <vector>
21
22 /*
23 Thread pool implementation using a shared work queue.
24
25 Work items are coroutine handles wrapped in intrusive list nodes, stored
26 in a single queue protected by a mutex. Worker threads wait on a
27 condition_variable until work is available or stop is requested.
28
29 Threads are started lazily on first post() via std::call_once to avoid
30 spawning threads for pools that are constructed but never used. Each
31 thread is named with a configurable prefix plus index for debugger
32 visibility.
33
34 Work tracking: on_work_started/on_work_finished maintain an atomic
35 outstanding_work_ counter. join() blocks until this counter reaches
36 zero, then signals workers to stop and joins threads.
37
38 Shutdown paths (the destructor combines the first two):
39 - join(): waits for outstanding work to drain, then stops workers.
40 - stop(): immediately signals workers to exit; queued work is abandoned.
41 - Destructor: stop() then join() (abandon + wait for threads).
42 */
43
44 namespace boost {
45 namespace capy {
46
47 //------------------------------------------------------------------------------
48
49 class thread_pool::impl
50 {
51 struct work : detail::intrusive_queue<work>::node
52 {
53 std::coroutine_handle<> h_;
54
55 791x explicit work(std::coroutine_handle<> h) noexcept
56 791x : h_(h)
57 {
58 791x }
59
60 609x void run()
61 {
62 609x auto h = h_;
63
1/2
✓ Branch 0 taken 609 times.
✗ Branch 1 not taken.
609x delete this;
64
1/1
✓ Branch 1 taken 609 times.
609x h.resume();
65 609x }
66
67 182x void destroy()
68 {
69 182x auto h = h_;
70
1/2
✓ Branch 0 taken 182 times.
✗ Branch 1 not taken.
182x delete this;
71
5/6
✓ Branch 1 taken 182 times.
✗ Branch 2 not taken.
✓ Branch 6 taken 130 times.
✓ Branch 7 taken 52 times.
✓ Branch 8 taken 130 times.
✓ Branch 9 taken 52 times.
182x if(h && h != std::noop_coroutine())
72
1/1
✓ Branch 1 taken 130 times.
130x h.destroy();
73 182x }
74 };
75
76 std::mutex mutex_;
77 std::condition_variable cv_;
78 detail::intrusive_queue<work> q_;
79 std::vector<std::thread> threads_;
80 std::atomic<std::size_t> outstanding_work_{0};
81 bool stop_{false};
82 bool joined_{false};
83 std::size_t num_threads_;
84 char thread_name_prefix_[13]{}; // 12 chars max + null terminator
85 std::once_flag start_flag_;
86
87 public:
88 151x ~impl()
89 {
90
2/2
✓ Branch 1 taken 182 times.
✓ Branch 2 taken 151 times.
333x while(auto* w = q_.pop())
91 182x w->destroy();
92 151x }
93
94 151x impl(std::size_t num_threads, std::string_view thread_name_prefix)
95 151x : num_threads_(num_threads)
96 {
97
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 149 times.
151x if(num_threads_ == 0)
98 4x num_threads_ = std::max(
99 2x std::thread::hardware_concurrency(), 1u);
100
101 // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
102
1/1
✓ Branch 1 taken 151 times.
151x auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
103 151x thread_name_prefix_[n] = '\0';
104 151x }
105
106 void
107 791x post(std::coroutine_handle<> h)
108 {
109 791x ensure_started();
110 791x auto* w = new work(h);
111 {
112
1/1
✓ Branch 1 taken 791 times.
791x std::lock_guard<std::mutex> lock(mutex_);
113 791x q_.push(w);
114 791x }
115 791x cv_.notify_one();
116 791x }
117
118 void
119 330x on_work_started() noexcept
120 {
121 330x outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
122 330x }
123
124 void
125 330x on_work_finished() noexcept
126 {
127 330x if(outstanding_work_.fetch_sub(
128
2/2
✓ Branch 0 taken 79 times.
✓ Branch 1 taken 251 times.
330x 1, std::memory_order_acq_rel) == 1)
129 {
130 79x std::lock_guard<std::mutex> lock(mutex_);
131
4/4
✓ Branch 0 taken 56 times.
✓ Branch 1 taken 23 times.
✓ Branch 2 taken 4 times.
✓ Branch 3 taken 52 times.
79x if(joined_ && !stop_)
132 4x stop_ = true;
133 79x cv_.notify_all();
134 79x }
135 330x }
136
137 void
138 162x join() noexcept
139 {
140 {
141 162x std::unique_lock<std::mutex> lock(mutex_);
142
2/2
✓ Branch 0 taken 11 times.
✓ Branch 1 taken 151 times.
162x if(joined_)
143 11x return;
144 151x joined_ = true;
145
146 151x if(outstanding_work_.load(
147
2/2
✓ Branch 0 taken 95 times.
✓ Branch 1 taken 56 times.
151x std::memory_order_acquire) == 0)
148 {
149 95x stop_ = true;
150 95x cv_.notify_all();
151 }
152 else
153 {
154 56x cv_.wait(lock, [this]{
155 61x return stop_;
156 });
157 }
158 162x }
159
160
2/2
✓ Branch 5 taken 167 times.
✓ Branch 6 taken 151 times.
318x for(auto& t : threads_)
161
1/2
✓ Branch 1 taken 167 times.
✗ Branch 2 not taken.
167x if(t.joinable())
162 167x t.join();
163 }
164
165 void
166 153x stop() noexcept
167 {
168 {
169 153x std::lock_guard<std::mutex> lock(mutex_);
170 153x stop_ = true;
171 153x }
172 153x cv_.notify_all();
173 153x }
174
175 private:
176 void
177 791x ensure_started()
178 {
179
1/1
✓ Branch 1 taken 791 times.
791x std::call_once(start_flag_, [this]{
180 95x threads_.reserve(num_threads_);
181
2/2
✓ Branch 0 taken 167 times.
✓ Branch 1 taken 95 times.
262x for(std::size_t i = 0; i < num_threads_; ++i)
182
1/1
✓ Branch 2 taken 167 times.
334x threads_.emplace_back([this, i]{ run(i); });
183 95x });
184 791x }
185
186 void
187 167x run(std::size_t index)
188 {
189 // Build name; set_current_thread_name truncates to platform limits.
190 char name[16];
191 167x std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
192 167x set_current_thread_name(name);
193
194 for(;;)
195 {
196 776x work* w = nullptr;
197 {
198
1/1
✓ Branch 1 taken 776 times.
776x std::unique_lock<std::mutex> lock(mutex_);
199
1/1
✓ Branch 1 taken 776 times.
776x cv_.wait(lock, [this]{
200
2/2
✓ Branch 1 taken 275 times.
✓ Branch 2 taken 703 times.
1253x return !q_.empty() ||
201
2/2
✓ Branch 0 taken 73 times.
✓ Branch 1 taken 202 times.
1253x stop_;
202 });
203
2/2
✓ Branch 0 taken 167 times.
✓ Branch 1 taken 609 times.
776x if(stop_)
204 334x return;
205 609x w = q_.pop();
206 776x }
207
1/2
✓ Branch 0 taken 609 times.
✗ Branch 1 not taken.
609x if(w)
208
1/1
✓ Branch 1 taken 609 times.
609x w->run();
209 609x }
210 }
211 };
212
213 //------------------------------------------------------------------------------
214
215 151x thread_pool::
216 ~thread_pool()
217 {
218 151x impl_->stop();
219 151x impl_->join();
220 151x shutdown();
221 151x destroy();
222
1/2
✓ Branch 0 taken 151 times.
✗ Branch 1 not taken.
151x delete impl_;
223 151x }
224
225 151x thread_pool::
226 151x thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
227
2/4
✓ Branch 2 taken 151 times.
✓ Branch 5 taken 151 times.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
151x : impl_(new impl(num_threads, thread_name_prefix))
228 {
229
1/1
✓ Branch 1 taken 151 times.
151x this->set_frame_allocator(std::allocator<void>{});
230 151x }
231
232 void
233 11x thread_pool::
234 join() noexcept
235 {
236 11x impl_->join();
237 11x }
238
239 void
240 2x thread_pool::
241 stop() noexcept
242 {
243 2x impl_->stop();
244 2x }
245
246 //------------------------------------------------------------------------------
247
248 thread_pool::executor_type
249 148x thread_pool::
250 get_executor() const noexcept
251 {
252 148x return executor_type(
253 148x const_cast<thread_pool&>(*this));
254 }
255
256 void
257 330x thread_pool::executor_type::
258 on_work_started() const noexcept
259 {
260 330x pool_->impl_->on_work_started();
261 330x }
262
263 void
264 330x thread_pool::executor_type::
265 on_work_finished() const noexcept
266 {
267 330x pool_->impl_->on_work_finished();
268 330x }
269
270 void
271 791x thread_pool::executor_type::
272 post(std::coroutine_handle<> h) const
273 {
274 791x pool_->impl_->post(h);
275 791x }
276
277 } // capy
278 } // boost
279