Bitcoin ABC 0.33.6
P2P Digital Currency
scheduler.cpp
Go to the documentation of this file.
1// Copyright (c) 2015-2016 The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#include <scheduler.h>
6
7#include <sync.h>
8#include <util/time.h>
9
10#include <cassert>
11#include <chrono>
12#include <functional>
13#include <utility>
14
15CScheduler::CScheduler() = default;
16
18 assert(nThreadsServicingQueue == 0);
19 if (stopWhenEmpty) {
20 assert(taskQueue.empty());
21 }
22}
23
26 ++nThreadsServicingQueue;
27
28 // newTaskMutex is locked throughout this loop EXCEPT when the thread is
29 // waiting or when the user's function is called.
30 while (!shouldStop()) {
31 try {
32 while (!shouldStop() && taskQueue.empty()) {
33 // Wait until there is something to do.
34 newTaskScheduled.wait(lock);
35 }
36
37 // Wait until either there is a new task, or until
38 // the time of the first item on the queue:
39
40 while (!shouldStop() && !taskQueue.empty()) {
41 std::chrono::steady_clock::time_point timeToWaitFor =
42 taskQueue.begin()->first;
43 if (newTaskScheduled.wait_until(lock, timeToWaitFor) ==
44 std::cv_status::timeout) {
45 // Exit loop after timeout, it means we reached the time of
46 // the event
47 break;
48 }
49 }
50
51 // If there are multiple threads, the queue can empty while we're
52 // waiting (another thread may service the task we were waiting on).
53 if (shouldStop() || taskQueue.empty()) {
54 continue;
55 }
56
57 Function f = taskQueue.begin()->second;
58 taskQueue.erase(taskQueue.begin());
59
60 {
61 // Unlock before calling f, so it can reschedule itself or
62 // another task without deadlocking:
63 REVERSE_LOCK(lock);
64 f();
65 }
66 } catch (...) {
67 --nThreadsServicingQueue;
68 throw;
69 }
70 }
71 --nThreadsServicingQueue;
72 newTaskScheduled.notify_one();
73}
74
76 std::chrono::steady_clock::time_point t) {
77 {
79 taskQueue.insert(std::make_pair(t, f));
80 }
81 newTaskScheduled.notify_one();
82}
83
84void CScheduler::MockForward(std::chrono::seconds delta_seconds) {
85 assert(delta_seconds > 0s && delta_seconds <= 1h);
86
87 {
89
90 // use temp_queue to maintain updated schedule
91 std::multimap<std::chrono::steady_clock::time_point, Function>
92 temp_queue;
93
94 for (const auto &element : taskQueue) {
95 temp_queue.emplace_hint(temp_queue.cend(),
96 element.first - delta_seconds,
97 element.second);
98 }
99
100 // point taskQueue to temp_queue
101 taskQueue = std::move(temp_queue);
102 }
103
104 // notify that the taskQueue needs to be processed
105 newTaskScheduled.notify_one();
106}
107
109 std::chrono::milliseconds delta) {
110 if (p()) {
111 s.scheduleFromNow([=, &s] { Repeat(s, p, delta); }, delta);
112 }
113}
114
116 std::chrono::milliseconds delta) {
117 scheduleFromNow([this, p, delta] { Repeat(*this, p, delta); }, delta);
118}
119
120size_t
121CScheduler::getQueueInfo(std::chrono::steady_clock::time_point &first,
122 std::chrono::steady_clock::time_point &last) const {
124 size_t result = taskQueue.size();
125 if (!taskQueue.empty()) {
126 first = taskQueue.begin()->first;
127 last = taskQueue.rbegin()->first;
128 }
129 return result;
130}
131
134 return nThreadsServicingQueue;
135}
136
138 {
140 // Try to avoid scheduling too many copies here, but if we
141 // accidentally have two ProcessQueue's scheduled at once its
142 // not a big deal.
143 if (m_are_callbacks_running) {
144 return;
145 }
146 if (m_callbacks_pending.empty()) {
147 return;
148 }
149 }
150 m_scheduler.schedule([this] { this->ProcessQueue(); },
151 std::chrono::steady_clock::now());
152}
153
155 std::function<void()> callback;
156 {
158 if (m_are_callbacks_running) {
159 return;
160 }
161 if (m_callbacks_pending.empty()) {
162 return;
163 }
164 m_are_callbacks_running = true;
165
166 callback = std::move(m_callbacks_pending.front());
167 m_callbacks_pending.pop_front();
168 }
169
170 // RAII the setting of fCallbacksRunning and calling
171 // MaybeScheduleProcessQueue to ensure both happen safely even if callback()
172 // throws.
173 struct RAIICallbacksRunning {
175 explicit RAIICallbacksRunning(SingleThreadedSchedulerClient *_instance)
176 : instance(_instance) {}
177 ~RAIICallbacksRunning() {
178 {
179 LOCK(instance->m_callbacks_mutex);
180 instance->m_are_callbacks_running = false;
181 }
182 instance->MaybeScheduleProcessQueue();
183 }
184 } raiicallbacksrunning(this);
185
186 callback();
187}
188
190 std::function<void()> func) {
191 {
193 m_callbacks_pending.emplace_back(std::move(func));
194 }
196}
197
200 bool should_continue = true;
201 while (should_continue) {
202 ProcessQueue();
204 should_continue = !m_callbacks_pending.empty();
205 }
206}
207
210 return m_callbacks_pending.size();
211}
Simple class for background tasks that should be run periodically or once "after a while".
Definition: scheduler.h:41
void MockForward(std::chrono::seconds delta_seconds) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Mock the scheduler to fast forward in time.
Definition: scheduler.cpp:84
void serviceQueue() EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Services the queue 'forever'.
Definition: scheduler.cpp:24
std::function< bool()> Predicate
Definition: scheduler.h:49
void scheduleEvery(Predicate p, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Repeat p until it return false.
Definition: scheduler.cpp:115
size_t getQueueInfo(std::chrono::steady_clock::time_point &first, std::chrono::steady_clock::time_point &last) const EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Returns number of tasks waiting to be serviced, and first and last task times.
Definition: scheduler.cpp:121
std::function< void()> Function
Definition: scheduler.h:48
bool AreThreadsServicingQueue() const EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Returns true if there are threads actively running in serviceQueue()
Definition: scheduler.cpp:132
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex)
Definition: scheduler.h:128
std::condition_variable newTaskScheduled
Definition: scheduler.h:122
Mutex newTaskMutex
Definition: scheduler.h:121
void scheduleFromNow(Function f, std::chrono::milliseconds delta) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Call f once after the delta has passed.
Definition: scheduler.h:56
void schedule(Function f, std::chrono::steady_clock::time_point t) EXCLUSIVE_LOCKS_REQUIRED(!newTaskMutex)
Call func at/after time t.
Definition: scheduler.cpp:75
Class used by CScheduler clients which may schedule multiple jobs which are required to be run serial...
Definition: scheduler.h:143
void AddToProcessQueue(std::function< void()> func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Add a callback to be executed.
Definition: scheduler.cpp:189
void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Definition: scheduler.cpp:154
size_t CallbacksPending() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Definition: scheduler.cpp:208
void EmptyQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Processes all remaining queue members on the calling thread, blocking until queue is empty.
Definition: scheduler.cpp:198
void MaybeScheduleProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex)
Definition: scheduler.cpp:137
static void Repeat(CScheduler &s, CScheduler::Predicate p, std::chrono::milliseconds delta)
Definition: scheduler.cpp:108
#define WAIT_LOCK(cs, name)
Definition: sync.h:317
#define LOCK(cs)
Definition: sync.h:306
#define REVERSE_LOCK(g)
Definition: sync.h:265
assert(!tx.IsCoinBase())