Bitcoin ABC 0.32.4
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1// Copyright (c) 2012-2018 The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#ifndef BITCOIN_CHECKQUEUE_H
6#define BITCOIN_CHECKQUEUE_H
7
8#include <sync.h>
9#include <tinyformat.h>
10#include <util/threadnames.h>
11
12#include <algorithm>
13#include <iterator>
14#include <optional>
15#include <vector>
16
29template <typename T,
30 typename R =
31 std::remove_cvref_t<decltype(std::declval<T>()().value())>>
33private:
36
//! Worker threads block on this when out of work.
38 std::condition_variable m_worker_cv;
39
//! Master thread blocks on this when out of work.
41 std::condition_variable m_master_cv;
42
//! The queue of elements to be processed. Add() appends at the end and
//! Loop() takes its batches from the end.
45 std::vector<T> queue GUARDED_BY(m_mutex);
46
//! The number of workers (including the master) that are idle, i.e.
//! currently waiting on their condition variable inside Loop().
48 int nIdle GUARDED_BY(m_mutex){0};
49
//! The total number of workers (including the master) currently running
//! Loop().
51 int nTotal GUARDED_BY(m_mutex){0};
52
//! The temporary evaluation result: the first value-bearing result that a
//! thread published, or empty while no check has returned a value.
54 std::optional<R> m_result GUARDED_BY(m_mutex);
55
//! Number of verifications that haven't completed yet. Incremented by
//! Add() and decremented by Loop() once a batch has been processed.
61 unsigned int nTodo GUARDED_BY(m_mutex){0};
62
//! The maximum number of elements to be processed in one batch.
64 const unsigned int nBatchSize;
65
//! Threads created by StartWorkerThreads(); each one runs Loop(false).
66 std::vector<std::thread> m_worker_threads;
//! When true, threads in Loop() stop waiting for work and return.
67 bool m_request_stop GUARDED_BY(m_mutex){false};
68
// Internal function that does the bulk of the verification work. It is run
// both by the worker threads (fMaster == false) and by the thread calling
// Complete() (fMaster == true).
//
// Each pass takes a batch of checks from the back of the queue while holding
// m_mutex, then executes the batch without the lock. The bookkeeping for a
// finished batch (publishing a result, decrementing nTodo) is deferred to the
// start of the next pass so that it shares the same critical section.
//
// Return value:
//  * master: once the queue is empty and nTodo == 0, returns the published
//    m_result (empty if no check returned a value) and resets m_result;
//  * any thread: std::nullopt as soon as m_request_stop is observed.
73 std::optional<R> Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) {
// The master and the workers sleep on different condition variables.
74 std::condition_variable &cond = fMaster ? m_master_cv : m_worker_cv;
75 std::vector<T> vChecks;
76 vChecks.reserve(nBatchSize);
// Size of the batch taken in the previous pass; 0 means "first pass".
77 unsigned int nNow = 0;
// Result of the most recent check executed by this thread.
78 std::optional<R> local_result;
79 bool do_work;
80 do {
81 {
82 WAIT_LOCK(m_mutex, lock);
83 // first do the clean-up of the previous loop run (allowing us
84 // to do it in the same critsect)
85 if (nNow) {
// Only the first value-bearing result is published; later ones are
// not swapped in.
86 if (local_result.has_value() && !m_result.has_value()) {
87 std::swap(local_result, m_result);
88 }
89 nTodo -= nNow;
90 if (nTodo == 0 && !fMaster) {
91 // We processed the last element; inform the master it
92 // can exit and return the result
93 m_master_cv.notify_one();
94 }
95 } else {
96 // first iteration
97 nTotal++;
98 }
99 // logically, the do loop starts here
100 while (queue.empty() && !m_request_stop) {
// Nothing queued and nothing in flight: the master is done.
101 if (fMaster && nTodo == 0) {
102 nTotal--;
103 std::optional<R> to_return = std::move(m_result);
104 // reset the status for new work later
105 m_result = std::nullopt;
106 // return the current status
107 return to_return;
108 }
109 nIdle++;
110 cond.wait(lock); // wait
111 nIdle--;
112 }
113 if (m_request_stop) {
114 // return value does not matter, because m_request_stop is
115 // only set while the worker threads are being stopped and joined.
116 return std::nullopt;
117 }
118
119 // Decide how many work units to process now.
120 // * Do not try to do everything at once, but aim for
121 // increasingly smaller batches so all workers finish
122 // approximately simultaneously.
123 // * Try to account for idle jobs which will instantly start
124 // helping.
125 // * Don't do batches smaller than 1 (duh), or larger than
126 // nBatchSize.
127 nNow = std::max(
128 1U, std::min(nBatchSize, (unsigned int)queue.size() /
129 (nTotal + nIdle + 1)));
// Move the last nNow checks out of the shared queue into the local batch.
130 auto start_it = queue.end() - nNow;
131 vChecks.assign(std::make_move_iterator(start_it),
132 std::make_move_iterator(queue.end()));
133 queue.erase(start_it, queue.end());
134 // Check whether we need to do work at all
// (once a result has been published the batch is still dequeued and
// counted, but its checks are not executed)
135 do_work = !m_result.has_value();
136 }
137 // execute work
138 if (do_work) {
139 for (T &check : vChecks) {
140 local_result = check();
// Stop at the first check that returns a value; the rest of the batch
// is discarded unexecuted.
141 if (local_result.has_value()) {
142 break;
143 }
144 }
145 }
146 vChecks.clear();
147 } while (true);
148 }
149
150public:
153
155 explicit CCheckQueue(unsigned int nBatchSizeIn)
156 : nBatchSize(nBatchSizeIn) {}
157
159 void StartWorkerThreads(const int threads_num)
160 EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) {
161 {
162 LOCK(m_mutex);
163 nIdle = 0;
164 nTotal = 0;
165 m_result = std::nullopt;
166 }
167 assert(m_worker_threads.empty());
168 for (int n = 0; n < threads_num; ++n) {
169 m_worker_threads.emplace_back([this, n]() {
170 util::ThreadRename(strprintf("scriptch.%i", n));
171 Loop(false /* worker thread */);
172 });
173 }
174 }
175
178 std::optional<R> Complete() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) {
179 return Loop(true /* master thread */);
180 }
181
183 void Add(std::vector<T> &&vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) {
184 LOCK(m_mutex);
185 queue.insert(queue.end(), std::make_move_iterator(vChecks.begin()),
186 std::make_move_iterator(vChecks.end()));
187 nTodo += vChecks.size();
188 if (vChecks.size() == 1) {
189 m_worker_cv.notify_one();
190 } else if (vChecks.size() > 1) {
191 m_worker_cv.notify_all();
192 }
193 }
194
197 WITH_LOCK(m_mutex, m_request_stop = true);
198 m_worker_cv.notify_all();
199 for (std::thread &t : m_worker_threads) {
200 t.join();
201 }
202 m_worker_threads.clear();
203 WITH_LOCK(m_mutex, m_request_stop = false);
204 }
205
206 ~CCheckQueue() { assert(m_worker_threads.empty()); }
207};
208
213template <typename T,
214 typename R =
215 std::remove_cvref_t<decltype(std::declval<T>()().value())>>
217private:
219 bool fDone;
220
221public:
// Construct a controller for the given queue.
//
// pqueueIn may be nullptr, in which case Complete() returns std::nullopt
// and Add() does nothing.
225 explicit CCheckQueueControl(CCheckQueue<T> *const pqueueIn)
226 : pqueue(pqueueIn), fDone(false) {
227 // passed queue is supposed to be unused, or nullptr
228 if (pqueue != nullptr) {
// NOTE(review): listing line 229 is absent here, so this branch appears
// empty. Presumably it acquired the queue's m_control_mutex via
// ENTER_CRITICAL_SECTION -- confirm against the original header.
230 }
231 }
232
233 std::optional<R> Complete() {
234 if (pqueue == nullptr) {
235 return std::nullopt;
236 }
237 auto ret = pqueue->Complete();
238 fDone = true;
239 return ret;
240 }
241
242 void Add(std::vector<T> &&vChecks) {
243 if (pqueue != nullptr) {
244 pqueue->Add(std::move(vChecks));
245 }
246 }
247
249 if (!fDone) {
250 Complete();
251 }
252 if (pqueue != nullptr) {
254 }
255 }
256};
257
258#endif // BITCOIN_CHECKQUEUE_H
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before continuing.
Definition: checkqueue.h:216
CCheckQueueControl & operator=(const CCheckQueueControl &)=delete
CCheckQueueControl(const CCheckQueueControl &)=delete
CCheckQueueControl()=delete
CCheckQueue< T, R > *const pqueue
Definition: checkqueue.h:218
CCheckQueueControl(CCheckQueue< T > *const pqueueIn)
Definition: checkqueue.h:225
std::optional< R > Complete()
Definition: checkqueue.h:233
void Add(std::vector< T > &&vChecks)
Definition: checkqueue.h:242
The verifications are represented by a type T, which must provide an operator(), returning an std::optional<R>.
Definition: checkqueue.h:32
bool m_request_stop GUARDED_BY(m_mutex)
Definition: checkqueue.h:67
std::optional< R > Complete() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Join the execution until completion.
Definition: checkqueue.h:178
unsigned int nTodo GUARDED_BY(m_mutex)
Number of verifications that haven't completed yet.
Definition: checkqueue.h:61
Mutex m_control_mutex
Mutex to ensure only one concurrent CCheckQueueControl.
Definition: checkqueue.h:152
std::vector< T > queue GUARDED_BY(m_mutex)
The queue of elements to be processed.
void Add(std::vector< T > &&vChecks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Add a batch of checks to the queue.
Definition: checkqueue.h:183
int nIdle GUARDED_BY(m_mutex)
The number of workers (including the master) that are idle.
Definition: checkqueue.h:48
int nTotal GUARDED_BY(m_mutex)
The total number of workers (including the master).
Definition: checkqueue.h:51
Mutex m_mutex
Mutex to protect the inner state.
Definition: checkqueue.h:35
std::optional< R > Loop(bool fMaster) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Internal function that does bulk of the verification work.
Definition: checkqueue.h:73
std::condition_variable m_worker_cv
Worker threads block on this when out of work.
Definition: checkqueue.h:38
std::vector< std::thread > m_worker_threads
Definition: checkqueue.h:66
const unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition: checkqueue.h:64
std::optional< R > m_result GUARDED_BY(m_mutex)
The temporary evaluation result.
CCheckQueue(unsigned int nBatchSizeIn)
Create a new check queue.
Definition: checkqueue.h:155
void StopWorkerThreads() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Stop all of the worker threads.
Definition: checkqueue.h:196
void StartWorkerThreads(const int threads_num) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Create a pool of new worker threads.
Definition: checkqueue.h:159
std::condition_variable m_master_cv
Master thread blocks on this when out of work.
Definition: checkqueue.h:41
void ThreadRename(std::string &&)
Rename a thread both in terms of an internal (in-memory) name as well as its system thread name.
Definition: threadnames.cpp:48
#define WAIT_LOCK(cs, name)
Definition: sync.h:317
#define ENTER_CRITICAL_SECTION(cs)
Definition: sync.h:320
#define LEAVE_CRITICAL_SECTION(cs)
Definition: sync.h:326
#define LOCK(cs)
Definition: sync.h:306
#define WITH_LOCK(cs, code)
Run code while locking a mutex.
Definition: sync.h:357
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:56
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for details).
Definition: tinyformat.h:1202
assert(!tx.IsCoinBase())