Bitcoin ABC  0.22.13
P2P Digital Currency
checkqueue.h
Go to the documentation of this file.
1 // Copyright (c) 2012-2018 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #ifndef BITCOIN_CHECKQUEUE_H
6 #define BITCOIN_CHECKQUEUE_H
7 
8 #include <sync.h>
9 
10 #include <algorithm>
11 #include <vector>
12 
13 #include <boost/thread/condition_variable.hpp>
14 #include <boost/thread/mutex.hpp>
15 
16 template <typename T> class CCheckQueueControl;
17 
28 template <typename T> class CCheckQueue {
29 private:
31  boost::mutex mutex;
32 
34  boost::condition_variable condWorker;
35 
37  boost::condition_variable condMaster;
38 
41  std::vector<T> queue;
42 
44  int nIdle;
45 
47  int nTotal;
48 
50  bool fAllOk;
51 
57  unsigned int nTodo;
58 
60  unsigned int nBatchSize;
61 
63  bool Loop(bool fMaster = false) {
64  boost::condition_variable &cond = fMaster ? condMaster : condWorker;
65  std::vector<T> vChecks;
66  vChecks.reserve(nBatchSize);
67  unsigned int nNow = 0;
68  bool fOk = true;
69  do {
70  {
71  boost::unique_lock<boost::mutex> lock(mutex);
72  // first do the clean-up of the previous loop run (allowing us
73  // to do it in the same critsect)
74  if (nNow) {
75  fAllOk &= fOk;
76  nTodo -= nNow;
77  if (nTodo == 0 && !fMaster) {
78  // We processed the last element; inform the master it
79  // can exit and return the result
80  condMaster.notify_one();
81  }
82  } else {
83  // first iteration
84  nTotal++;
85  }
86  // logically, the do loop starts here
87  while (queue.empty()) {
88  if (fMaster && nTodo == 0) {
89  nTotal--;
90  bool fRet = fAllOk;
91  // reset the status for new work later
92  fAllOk = true;
93  // return the current status
94  return fRet;
95  }
96  nIdle++;
97  cond.wait(lock); // wait
98  nIdle--;
99  }
100  // Decide how many work units to process now.
101  // * Do not try to do everything at once, but aim for
102  // increasingly smaller batches so all workers finish
103  // approximately simultaneously.
104  // * Try to account for idle jobs which will instantly start
105  // helping.
106  // * Don't do batches smaller than 1 (duh), or larger than
107  // nBatchSize.
108  nNow = std::max(
109  1U, std::min(nBatchSize, (unsigned int)queue.size() /
110  (nTotal + nIdle + 1)));
111  vChecks.resize(nNow);
112  for (unsigned int i = 0; i < nNow; i++) {
113  // We want the lock on the mutex to be as short as possible,
114  // so swap jobs from the global queue to the local batch
115  // vector instead of copying.
116  vChecks[i].swap(queue.back());
117  queue.pop_back();
118  }
119  // Check whether we need to do work at all
120  fOk = fAllOk;
121  }
122  // execute work
123  for (T &check : vChecks) {
124  if (fOk) {
125  fOk = check();
126  }
127  }
128  vChecks.clear();
129  } while (true);
130  }
131 
132 public:
134  boost::mutex ControlMutex;
135 
137  explicit CCheckQueue(unsigned int nBatchSizeIn)
138  : nIdle(0), nTotal(0), fAllOk(true), nTodo(0),
139  nBatchSize(nBatchSizeIn) {}
140 
142  void Thread() { Loop(); }
143 
146  bool Wait() { return Loop(true); }
147 
149  void Add(std::vector<T> &vChecks) {
150  boost::unique_lock<boost::mutex> lock(mutex);
151  for (T &check : vChecks) {
152  queue.push_back(T());
153  check.swap(queue.back());
154  }
155  nTodo += vChecks.size();
156  if (vChecks.size() == 1) {
157  condWorker.notify_one();
158  } else if (vChecks.size() > 1) {
159  condWorker.notify_all();
160  }
161  }
162 
164 };
165 
170 template <typename T> class CCheckQueueControl {
171 private:
173  bool fDone;
174 
175 public:
176  CCheckQueueControl() = delete;
177  CCheckQueueControl(const CCheckQueueControl &) = delete;
178  CCheckQueueControl &operator=(const CCheckQueueControl &) = delete;
179  explicit CCheckQueueControl(CCheckQueue<T> *const pqueueIn)
180  : pqueue(pqueueIn), fDone(false) {
181  // passed queue is supposed to be unused, or nullptr
182  if (pqueue != nullptr) {
184  }
185  }
186 
187  bool Wait() {
188  if (pqueue == nullptr) {
189  return true;
190  }
191  bool fRet = pqueue->Wait();
192  fDone = true;
193  return fRet;
194  }
195 
196  void Add(std::vector<T> &vChecks) {
197  if (pqueue != nullptr) {
198  pqueue->Add(vChecks);
199  }
200  }
201 
203  if (!fDone) {
204  Wait();
205  }
206  if (pqueue != nullptr) {
208  }
209  }
210 };
211 
212 #endif // BITCOIN_CHECKQUEUE_H
void Add(std::vector< T > &vChecks)
Definition: checkqueue.h:196
boost::condition_variable condWorker
Worker threads block on this when out of work.
Definition: checkqueue.h:34
boost::mutex mutex
Mutex to protect the inner state.
Definition: checkqueue.h:31
boost::condition_variable condMaster
Master thread blocks on this when out of work.
Definition: checkqueue.h:37
bool Loop(bool fMaster=false)
Internal function that does bulk of the verification work.
Definition: checkqueue.h:63
CCheckQueueControl(CCheckQueue< T > *const pqueueIn)
Definition: checkqueue.h:179
void Thread()
Worker thread.
Definition: checkqueue.h:142
RAII-style controller object for a CCheckQueue that guarantees the passed queue is finished before co...
Definition: checkqueue.h:16
CCheckQueue(unsigned int nBatchSizeIn)
Create a new check queue.
Definition: checkqueue.h:137
std::vector< T > queue
The queue of elements to be processed.
Definition: checkqueue.h:41
bool fAllOk
The temporary evaluation result.
Definition: checkqueue.h:50
#define LEAVE_CRITICAL_SECTION(cs)
Definition: sync.h:247
int nTotal
The total number of workers (including the master).
Definition: checkqueue.h:47
Queue for verifications that have to be performed.
Definition: checkqueue.h:28
CCheckQueue< T > *const pqueue
Definition: checkqueue.h:172
#define ENTER_CRITICAL_SECTION(cs)
Definition: sync.h:241
bool Wait()
Wait until execution finishes, and return whether all evaluations were successful.
Definition: checkqueue.h:146
int nIdle
The number of workers (including the master) that are idle.
Definition: checkqueue.h:44
unsigned int nTodo
Number of verifications that haven&#39;t completed yet.
Definition: checkqueue.h:57
void Add(std::vector< T > &vChecks)
Add a batch of checks to the queue.
Definition: checkqueue.h:149
unsigned int nBatchSize
The maximum number of elements to be processed in one batch.
Definition: checkqueue.h:60
boost::mutex ControlMutex
Mutex to ensure only one concurrent CCheckQueueControl.
Definition: checkqueue.h:134