Bitcoin ABC  0.22.13
P2P Digital Currency
scheduler.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2016 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <scheduler.h>
6 
7 #include <random.h>
8 
9 #include <cassert>
10 #include <utility>
11 
13 
15  assert(nThreadsServicingQueue == 0);
16  if (stopWhenEmpty) {
17  assert(taskQueue.empty());
18  }
19 }
20 
22  WAIT_LOCK(newTaskMutex, lock);
23  ++nThreadsServicingQueue;
24 
25  // newTaskMutex is locked throughout this loop EXCEPT when the thread is
26  // waiting or when the user's function is called.
27  while (!shouldStop()) {
28  try {
29  if (!shouldStop() && taskQueue.empty()) {
30  REVERSE_LOCK(lock);
31  }
32  while (!shouldStop() && taskQueue.empty()) {
33  // Wait until there is something to do.
34  newTaskScheduled.wait(lock);
35  }
36 
37  // Wait until either there is a new task, or until
38  // the time of the first item on the queue:
39 
40  while (!shouldStop() && !taskQueue.empty()) {
42  taskQueue.begin()->first;
43  if (newTaskScheduled.wait_until(lock, timeToWaitFor) ==
44  std::cv_status::timeout) {
45  // Exit loop after timeout, it means we reached the time of
46  // the event
47  break;
48  }
49  }
50 
51  // If there are multiple threads, the queue can empty while we're
52  // waiting (another thread may service the task we were waiting on).
53  if (shouldStop() || taskQueue.empty()) {
54  continue;
55  }
56 
57  Function f = taskQueue.begin()->second;
58  taskQueue.erase(taskQueue.begin());
59 
60  {
61  // Unlock before calling f, so it can reschedule itself or
62  // another task without deadlocking:
63  REVERSE_LOCK(lock);
64  f();
65  }
66  } catch (...) {
67  --nThreadsServicingQueue;
68  throw;
69  }
70  }
71  --nThreadsServicingQueue;
72  newTaskScheduled.notify_one();
73 }
74 
75 void CScheduler::stop(bool drain) {
76  {
78  if (drain) {
79  stopWhenEmpty = true;
80  } else {
81  stopRequested = true;
82  }
83  }
84  newTaskScheduled.notify_all();
85 }
86 
89  {
91  taskQueue.insert(std::make_pair(t, f));
92  }
93  newTaskScheduled.notify_one();
94 }
95 
96 void CScheduler::MockForward(std::chrono::seconds delta_seconds) {
97  assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1});
98 
99  {
101 
102  // use temp_queue to maintain updated schedule
103  std::multimap<std::chrono::system_clock::time_point, Function>
104  temp_queue;
105 
106  for (const auto &element : taskQueue) {
107  temp_queue.emplace_hint(temp_queue.cend(),
108  element.first - delta_seconds,
109  element.second);
110  }
111 
112  // point taskQueue to temp_queue
113  taskQueue = std::move(temp_queue);
114  }
115 
116  // notify that the taskQueue needs to be processed
117  newTaskScheduled.notify_one();
118 }
119 
121  std::chrono::milliseconds delta) {
122  if (p()) {
123  s.scheduleFromNow([=, &s] { Repeat(s, p, delta); }, delta);
124  }
125 }
126 
128  std::chrono::milliseconds delta) {
129  scheduleFromNow([=] { Repeat(*this, p, delta); }, delta);
130 }
131 
132 size_t
136  size_t result = taskQueue.size();
137  if (!taskQueue.empty()) {
138  first = taskQueue.begin()->first;
139  last = taskQueue.rbegin()->first;
140  }
141  return result;
142 }
143 
146  return nThreadsServicingQueue;
147 }
148 
150  {
151  LOCK(m_cs_callbacks_pending);
152  // Try to avoid scheduling too many copies here, but if we
153  // accidentally have two ProcessQueue's scheduled at once its
154  // not a big deal.
155  if (m_are_callbacks_running) {
156  return;
157  }
158  if (m_callbacks_pending.empty()) {
159  return;
160  }
161  }
162  m_pscheduler->schedule(
164  std::chrono::system_clock::now());
165 }
166 
168  std::function<void()> callback;
169  {
170  LOCK(m_cs_callbacks_pending);
171  if (m_are_callbacks_running) {
172  return;
173  }
174  if (m_callbacks_pending.empty()) {
175  return;
176  }
177  m_are_callbacks_running = true;
178 
179  callback = std::move(m_callbacks_pending.front());
180  m_callbacks_pending.pop_front();
181  }
182 
183  // RAII the setting of fCallbacksRunning and calling
184  // MaybeScheduleProcessQueue to ensure both happen safely even if callback()
185  // throws.
186  struct RAIICallbacksRunning {
188  explicit RAIICallbacksRunning(SingleThreadedSchedulerClient *_instance)
189  : instance(_instance) {}
190  ~RAIICallbacksRunning() {
191  {
192  LOCK(instance->m_cs_callbacks_pending);
193  instance->m_are_callbacks_running = false;
194  }
195  instance->MaybeScheduleProcessQueue();
196  }
197  } raiicallbacksrunning(this);
198 
199  callback();
200 }
201 
203  std::function<void()> func) {
204  assert(m_pscheduler);
205 
206  {
207  LOCK(m_cs_callbacks_pending);
208  m_callbacks_pending.emplace_back(std::move(func));
209  }
210  MaybeScheduleProcessQueue();
211 }
212 
214  assert(!m_pscheduler->AreThreadsServicingQueue());
215  bool should_continue = true;
216  while (should_continue) {
217  ProcessQueue();
218  LOCK(m_cs_callbacks_pending);
219  should_continue = !m_callbacks_pending.empty();
220  }
221 }
222 
224  LOCK(m_cs_callbacks_pending);
225  return m_callbacks_pending.size();
226 }
Class used by CScheduler clients which may schedule multiple jobs which are required to be run serial...
Definition: scheduler.h:115
std::function< bool()> Predicate
Definition: scheduler.h:47
void MockForward(std::chrono::seconds delta_seconds)
Mock the scheduler to fast forward in time.
Definition: scheduler.cpp:96
std::function< void()> Function
Definition: scheduler.h:46
Mutex newTaskMutex
Definition: scheduler.h:93
#define REVERSE_LOCK(g)
Definition: sync.h:222
void AddToProcessQueue(std::function< void()> func)
Add a callback to be executed.
Definition: scheduler.cpp:202
std::condition_variable newTaskScheduled
Definition: scheduler.h:94
void stop(bool drain=false)
Definition: scheduler.cpp:75
#define LOCK(cs)
Definition: sync.h:230
void schedule(Function f, std::chrono::system_clock::time_point t)
Definition: scheduler.cpp:87
void scheduleEvery(Predicate p, std::chrono::milliseconds delta)
Repeat p until it returns false.
Definition: scheduler.cpp:127
RecursiveMutex m_cs_callbacks_pending
Definition: scheduler.h:119
void serviceQueue()
Definition: scheduler.cpp:21
#define WAIT_LOCK(cs, name)
Definition: sync.h:238
static void Repeat(CScheduler &s, CScheduler::Predicate p, std::chrono::milliseconds delta)
Definition: scheduler.cpp:120
bool AreThreadsServicingQueue() const
Definition: scheduler.cpp:144
clock::time_point time_point
Definition: bench.h:50
size_t getQueueInfo(std::chrono::system_clock::time_point &first, std::chrono::system_clock::time_point &last) const
Definition: scheduler.cpp:133
void scheduleFromNow(Function f, std::chrono::milliseconds delta)
Call f once after the delta has passed.
Definition: scheduler.h:53
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex)
Definition: scheduler.h:100