6 #ifndef IROHA_SUBSCRIPTION_ASYNC_DISPATCHER_IMPL_HPP
7 #define IROHA_SUBSCRIPTION_ASYNC_DISPATCHER_IMPL_HPP
16 template <uint32_t kCount, uint32_t kPoolSize>
27 struct SchedulerContext {
29 std::shared_ptr<IScheduler> handler;
39 std::atomic_int64_t temporary_handlers_tasks_counter_;
40 std::atomic<bool> is_disposed_;
42 struct BoundContexts {
44 std::unordered_map<typename Parent::Tid, SchedulerContext> contexts;
48 inline SchedulerContext findHandler(typename Parent::Tid const tid) {
49 if (tid < kHandlersCount)
50 return handlers_[tid];
54 -> std::optional<SchedulerContext> {
55 if (auto it = bound.contexts.find(tid);
56 it != bound.contexts.end())
62 for (auto &handler : pool_)
63 if (!handler.handler->isBusy())
66 return SchedulerContext{
67 std::make_shared<ThreadHandler>(),
74 temporary_handlers_tasks_counter_.store(0);
76 for (auto &h : handlers_) {
77 h.handler = std::make_shared<ThreadHandler>();
78 h.is_temporary = false;
80 for (auto &h : pool_) {
81 h.handler = std::make_shared<ThreadHandler>();
82 h.is_temporary = false;
88 for (auto &h : handlers_) h.handler->dispose();
89 for (auto &h : pool_) h.handler->dispose();
91 while (temporary_handlers_tasks_counter_.load() != 0)
92 std::this_thread::sleep_for(std::chrono::microseconds(0ull));
96 if (is_disposed_.load())
99 auto h = findHandler(tid);
101 h.handler->add(std::move(task));
103 ++temporary_handlers_tasks_counter_;
104 h.handler->add([this, h, task{std::move(task)}]() mutable {
105 if (!is_disposed_.load())
107 --temporary_handlers_tasks_counter_;
108 h.handler->dispose(false);
114 std::chrono::microseconds timeout,
116 if (is_disposed_.load())
119 auto h = findHandler(tid);
121 h.handler->addDelayed(timeout, std::move(task));
123 ++temporary_handlers_tasks_counter_;
124 h.handler->addDelayed(timeout,
125 [this, h, task{std::move(task)}]() mutable {
126 if (!is_disposed_.load())
128 --temporary_handlers_tasks_counter_;
129 h.handler->dispose(false);
134 std::optional<Tid> bind(std::shared_ptr<IScheduler> scheduler) override {
139 [scheduler(std::move(scheduler))](BoundContexts &bound) {
140 auto const execution_tid = kHandlersCount + bound.next_tid_offset;
141 assert(bound.contexts.find(execution_tid) == bound.contexts.end());
142 bound.contexts[execution_tid] = SchedulerContext{scheduler, false};
143 ++bound.next_tid_offset;
144 return execution_tid;
150 return bound.contexts.erase(tid) == 1;
157 #endif // IROHA_SUBSCRIPTION_ASYNC_DISPATCHER_IMPL_HPP
Definition: dispatcher.hpp:16
void dispose() override
Definition: async_dispatcher_impl.hpp:86
auto sharedAccess(F &&f) const
Definition: common.hpp:75
uint32_t Tid
Definition: dispatcher.hpp:17
Definition: common.hpp:26
Definition: subscription_fwd.hpp:60
void addDelayed(typename Parent::Tid tid, std::chrono::microseconds timeout, typename Parent::Task &&task) override
Definition: async_dispatcher_impl.hpp:113
Definition: async_dispatcher_impl.hpp:17
static constexpr uint32_t kPoolThreadsCount
Definition: async_dispatcher_impl.hpp:22
std::optional< Tid > bind(std::shared_ptr< IScheduler > scheduler) override
Definition: async_dispatcher_impl.hpp:134
auto exclusiveAccess(F &&f)
Definition: common.hpp:69
static constexpr uint32_t kHandlersCount
Definition: async_dispatcher_impl.hpp:21
AsyncDispatcher()
Definition: async_dispatcher_impl.hpp:73
IScheduler::Task Task
Definition: dispatcher.hpp:18
Definition: common.hpp:32
void add(typename Parent::Tid tid, typename Parent::Task &&task) override
Definition: async_dispatcher_impl.hpp:95
bool unbind(Tid tid) override
Definition: async_dispatcher_impl.hpp:148