TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/boostorg/capy
9 : //
10 :
11 : #include <boost/capy/ex/thread_pool.hpp>
12 : #include <boost/capy/detail/intrusive.hpp>
13 : #include <boost/capy/test/thread_name.hpp>
14 : #include <algorithm>
15 : #include <atomic>
16 : #include <condition_variable>
17 : #include <cstdio>
18 : #include <mutex>
19 : #include <thread>
20 : #include <vector>
21 :
22 : /*
23 : Thread pool implementation using a shared work queue.
24 :
25 : Work items are coroutine handles wrapped in intrusive list nodes, stored
26 : in a single queue protected by a mutex. Worker threads wait on a
27 : condition_variable until work is available or stop is requested.
28 :
29 : Threads are started lazily on first post() via std::call_once to avoid
30 : spawning threads for pools that are constructed but never used. Each
31 : thread is named with a configurable prefix plus index for debugger
32 : visibility.
33 :
34 : Work tracking: on_work_started/on_work_finished maintain an atomic
35 : outstanding_work_ counter. join() blocks until this counter reaches
36 : zero, then signals workers to stop and joins threads.
37 :
38 : Two shutdown paths:
39 : - join(): waits for outstanding work to drain, then stops workers.
40 : - stop(): immediately signals workers to exit; queued work is abandoned.
41 : - Destructor: stop() then join() (abandon + wait for threads).
42 : */
43 :
44 : namespace boost {
45 : namespace capy {
46 :
47 : //------------------------------------------------------------------------------
48 :
49 : class thread_pool::impl
50 : {
51 : struct work : detail::intrusive_queue<work>::node
52 : {
53 : std::coroutine_handle<> h_;
54 :
55 HIT 791 : explicit work(std::coroutine_handle<> h) noexcept
56 791 : : h_(h)
57 : {
58 791 : }
59 :
60 609 : void run()
61 : {
62 609 : auto h = h_;
63 609 : delete this;
64 609 : h.resume();
65 609 : }
66 :
67 182 : void destroy()
68 : {
69 182 : auto h = h_;
70 182 : delete this;
71 182 : if(h && h != std::noop_coroutine())
72 130 : h.destroy();
73 182 : }
74 : };
75 :
76 : std::mutex mutex_;
77 : std::condition_variable cv_;
78 : detail::intrusive_queue<work> q_;
79 : std::vector<std::thread> threads_;
80 : std::atomic<std::size_t> outstanding_work_{0};
81 : bool stop_{false};
82 : bool joined_{false};
83 : std::size_t num_threads_;
84 : char thread_name_prefix_[13]{}; // 12 chars max + null terminator
85 : std::once_flag start_flag_;
86 :
87 : public:
88 151 : ~impl()
89 : {
90 333 : while(auto* w = q_.pop())
91 182 : w->destroy();
92 151 : }
93 :
94 151 : impl(std::size_t num_threads, std::string_view thread_name_prefix)
95 151 : : num_threads_(num_threads)
96 : {
97 151 : if(num_threads_ == 0)
98 4 : num_threads_ = std::max(
99 2 : std::thread::hardware_concurrency(), 1u);
100 :
101 : // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
102 151 : auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
103 151 : thread_name_prefix_[n] = '\0';
104 151 : }
105 :
106 : void
107 791 : post(std::coroutine_handle<> h)
108 : {
109 791 : ensure_started();
110 791 : auto* w = new work(h);
111 : {
112 791 : std::lock_guard<std::mutex> lock(mutex_);
113 791 : q_.push(w);
114 791 : }
115 791 : cv_.notify_one();
116 791 : }
117 :
118 : void
119 330 : on_work_started() noexcept
120 : {
121 330 : outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
122 330 : }
123 :
124 : void
125 330 : on_work_finished() noexcept
126 : {
127 330 : if(outstanding_work_.fetch_sub(
128 330 : 1, std::memory_order_acq_rel) == 1)
129 : {
130 79 : std::lock_guard<std::mutex> lock(mutex_);
131 79 : if(joined_ && !stop_)
132 4 : stop_ = true;
133 79 : cv_.notify_all();
134 79 : }
135 330 : }
136 :
137 : void
138 162 : join() noexcept
139 : {
140 : {
141 162 : std::unique_lock<std::mutex> lock(mutex_);
142 162 : if(joined_)
143 11 : return;
144 151 : joined_ = true;
145 :
146 151 : if(outstanding_work_.load(
147 151 : std::memory_order_acquire) == 0)
148 : {
149 95 : stop_ = true;
150 95 : cv_.notify_all();
151 : }
152 : else
153 : {
154 56 : cv_.wait(lock, [this]{
155 61 : return stop_;
156 : });
157 : }
158 162 : }
159 :
160 318 : for(auto& t : threads_)
161 167 : if(t.joinable())
162 167 : t.join();
163 : }
164 :
165 : void
166 153 : stop() noexcept
167 : {
168 : {
169 153 : std::lock_guard<std::mutex> lock(mutex_);
170 153 : stop_ = true;
171 153 : }
172 153 : cv_.notify_all();
173 153 : }
174 :
175 : private:
176 : void
177 791 : ensure_started()
178 : {
179 791 : std::call_once(start_flag_, [this]{
180 95 : threads_.reserve(num_threads_);
181 262 : for(std::size_t i = 0; i < num_threads_; ++i)
182 334 : threads_.emplace_back([this, i]{ run(i); });
183 95 : });
184 791 : }
185 :
186 : void
187 167 : run(std::size_t index)
188 : {
189 : // Build name; set_current_thread_name truncates to platform limits.
190 : char name[16];
191 167 : std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
192 167 : set_current_thread_name(name);
193 :
194 : for(;;)
195 : {
196 776 : work* w = nullptr;
197 : {
198 776 : std::unique_lock<std::mutex> lock(mutex_);
199 776 : cv_.wait(lock, [this]{
200 1253 : return !q_.empty() ||
201 1253 : stop_;
202 : });
203 776 : if(stop_)
204 334 : return;
205 609 : w = q_.pop();
206 776 : }
207 609 : if(w)
208 609 : w->run();
209 609 : }
210 : }
211 : };
212 :
213 : //------------------------------------------------------------------------------
214 :
215 151 : thread_pool::
216 : ~thread_pool()
217 : {
218 151 : impl_->stop();
219 151 : impl_->join();
220 151 : shutdown();
221 151 : destroy();
222 151 : delete impl_;
223 151 : }
224 :
225 151 : thread_pool::
226 151 : thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
227 151 : : impl_(new impl(num_threads, thread_name_prefix))
228 : {
229 151 : this->set_frame_allocator(std::allocator<void>{});
230 151 : }
231 :
232 : void
233 11 : thread_pool::
234 : join() noexcept
235 : {
236 11 : impl_->join();
237 11 : }
238 :
239 : void
240 2 : thread_pool::
241 : stop() noexcept
242 : {
243 2 : impl_->stop();
244 2 : }
245 :
246 : //------------------------------------------------------------------------------
247 :
248 : thread_pool::executor_type
249 148 : thread_pool::
250 : get_executor() const noexcept
251 : {
252 148 : return executor_type(
253 148 : const_cast<thread_pool&>(*this));
254 : }
255 :
256 : void
257 330 : thread_pool::executor_type::
258 : on_work_started() const noexcept
259 : {
260 330 : pool_->impl_->on_work_started();
261 330 : }
262 :
263 : void
264 330 : thread_pool::executor_type::
265 : on_work_finished() const noexcept
266 : {
267 330 : pool_->impl_->on_work_finished();
268 330 : }
269 :
270 : void
271 791 : thread_pool::executor_type::
272 : post(std::coroutine_handle<> h) const
273 : {
274 791 : pool_->impl_->post(h);
275 791 : }
276 :
277 : } // capy
278 : } // boost
|