hyperledger/iroha
Iroha - A simple, decentralized ledger http://iroha.tech
async_dispatcher_impl.hpp
Go to the documentation of this file.
1 
6 #ifndef IROHA_SUBSCRIPTION_ASYNC_DISPATCHER_IMPL_HPP
7 #define IROHA_SUBSCRIPTION_ASYNC_DISPATCHER_IMPL_HPP
8 
10 
11 #include "common/common.hpp"
13 
14 namespace iroha::subscription {
15 
16  template <uint32_t kCount, uint32_t kPoolSize>
17  class AsyncDispatcher final : public IDispatcher,
20  public:
21  static constexpr uint32_t kHandlersCount = kCount;
22  static constexpr uint32_t kPoolThreadsCount = kPoolSize;
23 
24  private:
25  using Parent = IDispatcher;
26 
27  struct SchedulerContext {
29  std::shared_ptr<IScheduler> handler;
30 
33  bool is_temporary;
34  };
35 
36  SchedulerContext handlers_[kHandlersCount];
37  SchedulerContext pool_[kPoolThreadsCount];
38 
39  std::atomic_int64_t temporary_handlers_tasks_counter_;
40  std::atomic<bool> is_disposed_;
41 
42  struct BoundContexts {
43  typename Parent::Tid next_tid_offset = 0u;
44  std::unordered_map<typename Parent::Tid, SchedulerContext> contexts;
45  };
47 
48  inline SchedulerContext findHandler(typename Parent::Tid const tid) {
49  if (tid < kHandlersCount)
50  return handlers_[tid];
51 
52  if (auto context =
53  bound_.sharedAccess([tid](BoundContexts const &bound)
54  -> std::optional<SchedulerContext> {
55  if (auto it = bound.contexts.find(tid);
56  it != bound.contexts.end())
57  return it->second;
58  return std::nullopt;
59  }))
60  return *context;
61 
62  for (auto &handler : pool_)
63  if (!handler.handler->isBusy())
64  return handler;
65 
66  return SchedulerContext{
67  std::make_shared<ThreadHandler>(),
68  true // temporary
69  };
70  }
71 
72  public:
74  temporary_handlers_tasks_counter_.store(0);
75  is_disposed_ = false;
76  for (auto &h : handlers_) {
77  h.handler = std::make_shared<ThreadHandler>();
78  h.is_temporary = false;
79  }
80  for (auto &h : pool_) {
81  h.handler = std::make_shared<ThreadHandler>();
82  h.is_temporary = false;
83  }
84  }
85 
86  void dispose() override {
87  is_disposed_ = true;
88  for (auto &h : handlers_) h.handler->dispose();
89  for (auto &h : pool_) h.handler->dispose();
90 
91  while (temporary_handlers_tasks_counter_.load() != 0)
92  std::this_thread::sleep_for(std::chrono::microseconds(0ull));
93  }
94 
95  void add(typename Parent::Tid tid, typename Parent::Task &&task) override {
96  if (is_disposed_.load())
97  return;
98 
99  auto h = findHandler(tid);
100  if (!h.is_temporary)
101  h.handler->add(std::move(task));
102  else {
103  ++temporary_handlers_tasks_counter_;
104  h.handler->add([this, h, task{std::move(task)}]() mutable {
105  if (!is_disposed_.load())
106  task();
107  --temporary_handlers_tasks_counter_;
108  h.handler->dispose(false);
109  });
110  }
111  }
112 
113  void addDelayed(typename Parent::Tid tid,
114  std::chrono::microseconds timeout,
115  typename Parent::Task &&task) override {
116  if (is_disposed_.load())
117  return;
118 
119  auto h = findHandler(tid);
120  if (!h.is_temporary)
121  h.handler->addDelayed(timeout, std::move(task));
122  else {
123  ++temporary_handlers_tasks_counter_;
124  h.handler->addDelayed(timeout,
125  [this, h, task{std::move(task)}]() mutable {
126  if (!is_disposed_.load())
127  task();
128  --temporary_handlers_tasks_counter_;
129  h.handler->dispose(false);
130  });
131  }
132  }
133 
134  std::optional<Tid> bind(std::shared_ptr<IScheduler> scheduler) override {
135  if (!scheduler)
136  return std::nullopt;
137 
138  return bound_.exclusiveAccess(
139  [scheduler(std::move(scheduler))](BoundContexts &bound) {
140  auto const execution_tid = kHandlersCount + bound.next_tid_offset;
141  assert(bound.contexts.find(execution_tid) == bound.contexts.end());
142  bound.contexts[execution_tid] = SchedulerContext{scheduler, false};
143  ++bound.next_tid_offset;
144  return execution_tid;
145  });
146  }
147 
148  bool unbind(Tid tid) override {
149  return bound_.exclusiveAccess([tid](BoundContexts &bound) {
150  return bound.contexts.erase(tid) == 1;
151  });
152  }
153  };
154 
155 } // namespace iroha::subscription
156 
157 #endif // IROHA_SUBSCRIPTION_ASYNC_DISPATCHER_IMPL_HPP
Definition: dispatcher.hpp:16
void dispose() override
Definition: async_dispatcher_impl.hpp:86
auto sharedAccess(F &&f) const
Definition: common.hpp:75
uint32_t Tid
Definition: dispatcher.hpp:17
Definition: common.hpp:26
Definition: subscription_fwd.hpp:60
void addDelayed(typename Parent::Tid tid, std::chrono::microseconds timeout, typename Parent::Task &&task) override
Definition: async_dispatcher_impl.hpp:113
Definition: async_dispatcher_impl.hpp:17
static constexpr uint32_t kPoolThreadsCount
Definition: async_dispatcher_impl.hpp:22
std::optional< Tid > bind(std::shared_ptr< IScheduler > scheduler) override
Definition: async_dispatcher_impl.hpp:134
auto exclusiveAccess(F &&f)
Definition: common.hpp:69
static constexpr uint32_t kHandlersCount
Definition: async_dispatcher_impl.hpp:21
AsyncDispatcher()
Definition: async_dispatcher_impl.hpp:73
IScheduler::Task Task
Definition: dispatcher.hpp:18
Definition: common.hpp:32
void add(typename Parent::Tid tid, typename Parent::Task &&task) override
Definition: async_dispatcher_impl.hpp:95
bool unbind(Tid tid) override
Definition: async_dispatcher_impl.hpp:148