15 assert(nThreadsServicingQueue == 0);
17 assert(taskQueue.empty());
23 ++nThreadsServicingQueue;
42 taskQueue.begin()->first;
44 std::cv_status::timeout) {
57 Function f = taskQueue.begin()->second;
58 taskQueue.erase(taskQueue.begin());
67 --nThreadsServicingQueue;
71 --nThreadsServicingQueue;
91 taskQueue.insert(std::make_pair(t, f));
97 assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1});
103 std::multimap<std::chrono::system_clock::time_point, Function>
106 for (
const auto &element : taskQueue) {
107 temp_queue.emplace_hint(temp_queue.cend(),
108 element.first - delta_seconds,
113 taskQueue = std::move(temp_queue);
121 std::chrono::milliseconds delta) {
128 std::chrono::milliseconds delta) {
136 size_t result = taskQueue.size();
137 if (!taskQueue.empty()) {
138 first = taskQueue.begin()->first;
139 last = taskQueue.rbegin()->first;
146 return nThreadsServicingQueue;
151 LOCK(m_cs_callbacks_pending);
155 if (m_are_callbacks_running) {
158 if (m_callbacks_pending.empty()) {
162 m_pscheduler->schedule(
164 std::chrono::system_clock::now());
168 std::function<void()> callback;
170 LOCK(m_cs_callbacks_pending);
171 if (m_are_callbacks_running) {
174 if (m_callbacks_pending.empty()) {
177 m_are_callbacks_running =
true;
179 callback = std::move(m_callbacks_pending.front());
180 m_callbacks_pending.pop_front();
186 struct RAIICallbacksRunning {
189 : instance(_instance) {}
190 ~RAIICallbacksRunning() {
193 instance->m_are_callbacks_running =
false;
197 } raiicallbacksrunning(
this);
203 std::function<
void()> func) {
204 assert(m_pscheduler);
207 LOCK(m_cs_callbacks_pending);
208 m_callbacks_pending.emplace_back(std::move(func));
210 MaybeScheduleProcessQueue();
214 assert(!m_pscheduler->AreThreadsServicingQueue());
215 bool should_continue =
true;
216 while (should_continue) {
218 LOCK(m_cs_callbacks_pending);
219 should_continue = !m_callbacks_pending.empty();
224 LOCK(m_cs_callbacks_pending);
225 return m_callbacks_pending.size();
Class used by CScheduler clients which may schedule multiple jobs which are required to be run serial...
std::function< bool()> Predicate
void MaybeScheduleProcessQueue()
void MockForward(std::chrono::seconds delta_seconds)
Mock the scheduler to fast forward in time.
std::function< void()> Function
void AddToProcessQueue(std::function< void()> func)
Add a callback to be executed.
std::condition_variable newTaskScheduled
void stop(bool drain=false)
void schedule(Function f, std::chrono::system_clock::time_point t)
void scheduleEvery(Predicate p, std::chrono::milliseconds delta)
Repeat p until it return false.
RecursiveMutex m_cs_callbacks_pending
#define WAIT_LOCK(cs, name)
size_t CallbacksPending()
static void Repeat(CScheduler &s, CScheduler::Predicate p, std::chrono::milliseconds delta)
bool AreThreadsServicingQueue() const
clock::time_point time_point
size_t getQueueInfo(std::chrono::system_clock::time_point &first, std::chrono::system_clock::time_point &last) const
void scheduleFromNow(Function f, std::chrono::milliseconds delta)
Call f once after the delta has passed.
bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex)