6 #ifndef IROHA_SUBSCRIPTION_SCHEDULER_IMPL_HPP 7 #define IROHA_SUBSCRIPTION_SCHEDULER_IMPL_HPP 15 #include <shared_mutex> 32 using Time = std::chrono::high_resolution_clock;
33 using Timepoint = std::chrono::time_point<Time>;
38 using TaskContainer = std::deque<TimedTask>;
41 std::atomic_flag proceed_;
43 mutable std::mutex tasks_cs_;
57 inline void checkLocked() {
59 assert(!tasks_cs_.try_lock());
62 inline Timepoint now()
const {
66 TaskContainer::const_iterator after(Timepoint
const &tp) {
68 return std::upper_bound(
69 tasks_.begin(), tasks_.end(), tp, [](
auto const &l,
auto const &r) {
70 return l < r.timepoint;
74 void insert(TaskContainer::const_iterator after, TimedTask &&t) {
76 tasks_.insert(after, std::move(t));
79 bool extractExpired(
Task &task, Timepoint
const &before) {
80 std::lock_guard lock(tasks_cs_);
81 if (!tasks_.empty()) {
82 auto &first_task = tasks_.front();
83 if (first_task.timepoint <= before) {
84 first_task.task.swap(task);
94 std::chrono::microseconds untilFirst()
const {
95 auto const before = now();
96 std::lock_guard lock(tasks_cs_);
97 if (!tasks_.empty()) {
98 auto const &first = tasks_.front();
99 if (first.timepoint > before) {
100 return std::chrono::duration_cast<std::chrono::microseconds>(
101 first.timepoint - before);
103 return std::chrono::microseconds(0ull);
105 return std::chrono::minutes(10ull);
110 proceed_.test_and_set();
114 id_ = std::this_thread::get_id();
117 if (extractExpired(task, now())) {
125 std::lock_guard lock(tasks_cs_);
128 event_.
wait(untilFirst());
130 }
while (proceed_.test_and_set());
134 void dispose(
bool wait_for_release =
true)
override {
140 std::lock_guard lock(tasks_cs_);
145 addDelayed(std::chrono::microseconds(0ull), std::move(t));
149 #ifdef SE_SYNC_CALL_IF_SAME_THREAD 150 if (timeout == std::chrono::microseconds(0ull)
151 && id_ == std::this_thread::get_id()) {
152 std::forward<F>(f)();
154 #endif // SE_SYNC_CALL_IF_SAME_THREAD 155 auto const tp = now() + timeout;
156 std::lock_guard lock(tasks_cs_);
157 if (timeout == std::chrono::microseconds(0ull))
159 insert(after(tp), TimedTask{tp, std::move(t)});
161 #ifdef SE_SYNC_CALL_IF_SAME_THREAD 163 #endif // SE_SYNC_CALL_IF_SAME_THREAD 169 #endif // IROHA_SUBSCRIPTION_SCHEDULER_IMPL_HPP void set()
Definition: common.hpp:111
void addDelayed(std::chrono::microseconds timeout, Task &&t) override
Adds delayed task to execution queue.
Definition: scheduler_impl.hpp:148
uint32_t process()
Definition: scheduler_impl.hpp:113
Definition: scheduler.hpp:15
auto now()
Definition: time.hpp:23
Definition: scheduler_impl.hpp:30
Definition: common.hpp:26
Definition: subscription_fwd.hpp:60
bool isBusy() const override
Checks if current scheduler executes task.
Definition: scheduler_impl.hpp:139
void dispose(bool wait_for_release=true) override
Stops sheduler work and tasks execution.
Definition: scheduler_impl.hpp:134
std::function< void()> Task
Definition: scheduler.hpp:17
void add(Task &&t) override
Adds task to execution queue.
Definition: scheduler_impl.hpp:144
bool wait(std::chrono::microseconds wait_timeout)
Definition: common.hpp:93
SchedulerBase()
Definition: scheduler_impl.hpp:109
Definition: common.hpp:85
Definition: common.hpp:32