hyperledger/iroha
Iroha - A simple, decentralized ledger http://iroha.tech
scheduler_impl.hpp
Go to the documentation of this file.
1 
6 #ifndef IROHA_SUBSCRIPTION_SCHEDULER_IMPL_HPP
7 #define IROHA_SUBSCRIPTION_SCHEDULER_IMPL_HPP
8 
9 #include <assert.h>
10 #include <algorithm>
11 #include <chrono>
12 #include <deque>
13 #include <functional>
14 #include <mutex>
15 #include <shared_mutex>
16 #include <thread>
17 
19 
20 #include "common/common.hpp"
21 
26 //#define SE_SYNC_CALL_IF_SAME_THREAD
27 
28 namespace iroha::subscription {
29 
31  private:
32  using Time = std::chrono::high_resolution_clock;
33  using Timepoint = std::chrono::time_point<Time>;
34  struct TimedTask {
35  Timepoint timepoint;
36  Task task;
37  };
38  using TaskContainer = std::deque<TimedTask>;
39 
41  std::atomic_flag proceed_;
42 
43  mutable std::mutex tasks_cs_;
44 
46  TaskContainer tasks_;
47 
50 
52  bool is_busy_;
53 
54  std::thread::id id_;
55 
56  private:
57  inline void checkLocked() {
59  assert(!tasks_cs_.try_lock());
60  }
61 
62  inline Timepoint now() const {
63  return Time::now();
64  }
65 
66  TaskContainer::const_iterator after(Timepoint const &tp) {
67  checkLocked();
68  return std::upper_bound(
69  tasks_.begin(), tasks_.end(), tp, [](auto const &l, auto const &r) {
70  return l < r.timepoint;
71  });
72  }
73 
74  void insert(TaskContainer::const_iterator after, TimedTask &&t) {
75  checkLocked();
76  tasks_.insert(after, std::move(t));
77  }
78 
79  bool extractExpired(Task &task, Timepoint const &before) {
80  std::lock_guard lock(tasks_cs_);
81  if (!tasks_.empty()) {
82  auto &first_task = tasks_.front();
83  if (first_task.timepoint <= before) {
84  first_task.task.swap(task);
85  tasks_.pop_front();
86  is_busy_ = true;
87  return true;
88  }
89  }
90  return false;
91  }
92 
94  std::chrono::microseconds untilFirst() const {
95  auto const before = now();
96  std::lock_guard lock(tasks_cs_);
97  if (!tasks_.empty()) {
98  auto const &first = tasks_.front();
99  if (first.timepoint > before) {
100  return std::chrono::duration_cast<std::chrono::microseconds>(
101  first.timepoint - before);
102  }
103  return std::chrono::microseconds(0ull);
104  }
105  return std::chrono::minutes(10ull);
106  }
107 
108  public:
109  SchedulerBase() : is_busy_(false) {
110  proceed_.test_and_set();
111  }
112 
113  uint32_t process() {
114  id_ = std::this_thread::get_id();
115  Task task;
116  do {
117  if (extractExpired(task, now())) {
118  try {
119  if (task)
120  task();
121  } catch (...) {
122  }
123  } else {
124  {
125  std::lock_guard lock(tasks_cs_);
126  is_busy_ = false;
127  }
128  event_.wait(untilFirst());
129  }
130  } while (proceed_.test_and_set());
131  return 0;
132  }
133 
134  void dispose(bool wait_for_release = true) override {
135  proceed_.clear();
136  event_.set();
137  }
138 
139  bool isBusy() const override {
140  std::lock_guard lock(tasks_cs_);
141  return is_busy_;
142  }
143 
144  void add(Task &&t) override {
145  addDelayed(std::chrono::microseconds(0ull), std::move(t));
146  }
147 
148  void addDelayed(std::chrono::microseconds timeout, Task &&t) override {
149 #ifdef SE_SYNC_CALL_IF_SAME_THREAD
150  if (timeout == std::chrono::microseconds(0ull)
151  && id_ == std::this_thread::get_id()) {
152  std::forward<F>(f)();
153  } else {
154 #endif // SE_SYNC_CALL_IF_SAME_THREAD
155  auto const tp = now() + timeout;
156  std::lock_guard lock(tasks_cs_);
157  if (timeout == std::chrono::microseconds(0ull))
158  is_busy_ = true;
159  insert(after(tp), TimedTask{tp, std::move(t)});
160  event_.set();
161 #ifdef SE_SYNC_CALL_IF_SAME_THREAD
162  }
163 #endif // SE_SYNC_CALL_IF_SAME_THREAD
164  }
165  };
166 
167 } // namespace iroha::subscription
168 
169 #endif // IROHA_SUBSCRIPTION_SCHEDULER_IMPL_HPP
void set()
Definition: common.hpp:111
void addDelayed(std::chrono::microseconds timeout, Task &&t) override
Adds delayed task to execution queue.
Definition: scheduler_impl.hpp:148
uint32_t process()
Definition: scheduler_impl.hpp:113
Definition: scheduler.hpp:15
auto now()
Definition: time.hpp:23
Definition: scheduler_impl.hpp:30
Definition: common.hpp:26
Definition: subscription_fwd.hpp:60
bool isBusy() const override
Checks if current scheduler executes task.
Definition: scheduler_impl.hpp:139
void dispose(bool wait_for_release=true) override
Stops sheduler work and tasks execution.
Definition: scheduler_impl.hpp:134
std::function< void()> Task
Definition: scheduler.hpp:17
void add(Task &&t) override
Adds task to execution queue.
Definition: scheduler_impl.hpp:144
bool wait(std::chrono::microseconds wait_timeout)
Definition: common.hpp:93
SchedulerBase()
Definition: scheduler_impl.hpp:109
Definition: common.hpp:85
Definition: common.hpp:32