Bitcoin ABC  0.22.13
P2P Digital Currency
scheduler.h
Go to the documentation of this file.
1 // Copyright (c) 2015 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #ifndef BITCOIN_SCHEDULER_H
6 #define BITCOIN_SCHEDULER_H
7 
8 #include <sync.h>
9 
10 //
11 // NOTE:
12 // boost::thread should be ported to std::thread
13 // when we support C++11.
14 //
15 #include <condition_variable>
16 #include <functional>
17 #include <list>
18 #include <map>
19 
20 //
21 // Simple class for background tasks that should be run periodically or once
22 // "after a while"
23 //
24 // Usage:
25 //
26 // CScheduler* s = new CScheduler();
27 // // Assuming a: void doSomething() { }
28 // s->scheduleFromNow(doSomething, std::chrono::milliseconds{11});
29 // s->scheduleFromNow([=] { this->func(argument); },
30 // std::chrono::milliseconds{3});
31 // boost::thread *t = new boost::thread(std::bind(CScheduler::serviceQueue, s));
32 //
33 // ... then at program shutdown, make sure to call stop() to clean up the
34 // thread(s) running serviceQueue:
35 // s->stop();
36 // t->join();
37 // delete t;
38 // delete s; // Must be done after thread is interrupted/joined.
39 //
40 
41 class CScheduler {
42 public:
43  CScheduler();
44  ~CScheduler();
45 
46  typedef std::function<void()> Function;
47  typedef std::function<bool()> Predicate;
48 
49  // Call func at/after time t
51 
53  void scheduleFromNow(Function f, std::chrono::milliseconds delta) {
54  schedule(std::move(f), std::chrono::system_clock::now() + delta);
55  }
56 
64  void scheduleEvery(Predicate p, std::chrono::milliseconds delta);
65 
71  void MockForward(std::chrono::seconds delta_seconds);
72 
73  // To keep things as simple as possible, there is no unschedule.
74 
75  // Services the queue 'forever'. Should be run in a thread, and interrupted
76  // using boost::interrupt_thread
77  void serviceQueue();
78 
79  // Tell any threads running serviceQueue to stop as soon as they're done
80  // servicing whatever task they're currently servicing (drain=false) or when
81  // there is no work left to be done (drain=true)
82  void stop(bool drain = false);
83 
84  // Returns number of tasks waiting to be serviced,
85  // and first and last task times
88 
89  // Returns true if there are threads actively running in serviceQueue()
90  bool AreThreadsServicingQueue() const;
91 
92 private:
94  std::condition_variable newTaskScheduled;
95  std::multimap<std::chrono::system_clock::time_point, Function>
96  taskQueue GUARDED_BY(newTaskMutex);
97  int nThreadsServicingQueue GUARDED_BY(newTaskMutex){0};
98  bool stopRequested GUARDED_BY(newTaskMutex){false};
99  bool stopWhenEmpty GUARDED_BY(newTaskMutex){false};
100  bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) {
101  return stopRequested || (stopWhenEmpty && taskQueue.empty());
102  }
103 };
104 
116 private:
118 
120  std::list<std::function<void()>>
121  m_callbacks_pending GUARDED_BY(m_cs_callbacks_pending);
122  bool m_are_callbacks_running GUARDED_BY(m_cs_callbacks_pending) = false;
123 
124  void MaybeScheduleProcessQueue();
125  void ProcessQueue();
126 
127 public:
129  : m_pscheduler(pschedulerIn) {}
130 
137  void AddToProcessQueue(std::function<void()> func);
138 
139  // Processes all remaining queue members on the calling thread, blocking
140  // until queue is empty. Must be called after the CScheduler has no
141  // remaining processing threads!
142  void EmptyQueue();
143 
144  size_t CallbacksPending();
145 };
146 
147 #endif // BITCOIN_SCHEDULER_H
Class used by CScheduler clients which may schedule multiple jobs which are required to be run serial...
Definition: scheduler.h:115
std::function< bool()> Predicate
Definition: scheduler.h:47
void MockForward(std::chrono::seconds delta_seconds)
Mock the scheduler to fast forward in time.
Definition: scheduler.cpp:96
std::function< void()> Function
Definition: scheduler.h:46
bool stopWhenEmpty GUARDED_BY(newTaskMutex)
Definition: scheduler.h:99
Mutex newTaskMutex
Definition: scheduler.h:93
bool stopRequested GUARDED_BY(newTaskMutex)
Definition: scheduler.h:98
std::condition_variable newTaskScheduled
Definition: scheduler.h:94
int nThreadsServicingQueue GUARDED_BY(newTaskMutex)
Definition: scheduler.h:97
void stop(bool drain=false)
Definition: scheduler.cpp:75
void schedule(Function f, std::chrono::system_clock::time_point t)
Definition: scheduler.cpp:87
void scheduleEvery(Predicate p, std::chrono::milliseconds delta)
Repeat p until it return false.
Definition: scheduler.cpp:127
RecursiveMutex m_cs_callbacks_pending
Definition: scheduler.h:119
SingleThreadedSchedulerClient(CScheduler *pschedulerIn)
Definition: scheduler.h:128
void serviceQueue()
Definition: scheduler.cpp:21
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:56
bool AreThreadsServicingQueue() const
Definition: scheduler.cpp:144
clock::time_point time_point
Definition: bench.h:50
std::multimap< std::chrono::system_clock::time_point, Function > taskQueue GUARDED_BY(newTaskMutex)
size_t getQueueInfo(std::chrono::system_clock::time_point &first, std::chrono::system_clock::time_point &last) const
Definition: scheduler.cpp:133
void scheduleFromNow(Function f, std::chrono::milliseconds delta)
Call f once after the delta has passed.
Definition: scheduler.h:53
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex)
Definition: scheduler.h:100