hyperledger/iroha
Iroha - A simple, decentralized ledger http://iroha.tech
subscription_manager.hpp
Go to the documentation of this file.
1 
6 #ifndef IROHA_SUBSCRIPTION_SUBSCRIPTION_MANAGER_HPP
7 #define IROHA_SUBSCRIPTION_SUBSCRIPTION_MANAGER_HPP
8 
9 #include <assert.h>
10 #include <memory>
11 #include <shared_mutex>
12 #include <unordered_map>
13 
14 #include "common/common.hpp"
19 
20 namespace iroha::subscription {
21 
28  template <uint32_t kHandlersCount, uint32_t kPoolSize>
29  class SubscriptionManager final
30  : public std::enable_shared_from_this<
31  SubscriptionManager<kHandlersCount, kPoolSize>>,
32  utils::NoMove,
33  utils::NoCopy {
34  public:
36 
37  private:
38  using EngineHash = uint64_t;
39  using DispatcherPtr = std::shared_ptr<Dispatcher>;
40  using EnginesList = std::unordered_map<EngineHash, std::shared_ptr<void>>;
41 
42  private:
44  DispatcherPtr dispatcher_;
45  std::shared_mutex engines_cs_;
47  EnginesList engines_;
48  std::atomic_flag disposed_;
49 
50  private:
51  template <typename... Args>
52  static constexpr EngineHash getSubscriptionHash() {
53 #ifdef _WIN32
54  constexpr EngineHash value = CT_MURMUR2(__FUNCSIG__);
55 #else //_WIN32
56  constexpr EngineHash value = CT_MURMUR2(__PRETTY_FUNCTION__);
57 #endif //_WIN32
58  return value;
59  }
60 
61  public:
63  : dispatcher_(std::move(dispatcher)) {
64  disposed_.clear();
65  }
66 
71  void dispose() {
72  if (!disposed_.test_and_set()) {
73  {
74  std::shared_lock lock(engines_cs_);
75  for (auto &descriptor : engines_)
77  ->dispose();
78  }
79  dispatcher_->dispose();
80  }
81  }
82 
90  template <typename EventKey, typename... Args>
91  auto getEngine() {
92  using EngineType =
93  SubscriptionEngine<EventKey,
94  Dispatcher,
95  Subscriber<EventKey, Dispatcher, Args...>>;
96  constexpr auto engineId = getSubscriptionHash<Args...>();
97  {
98  std::shared_lock lock(engines_cs_);
99  if (auto it = engines_.find(engineId); it != engines_.end()) {
100  return utils::reinterpret_pointer_cast<EngineType>(it->second);
101  }
102  }
103  std::unique_lock lock(engines_cs_);
104  if (auto it = engines_.find(engineId); it != engines_.end()) {
105  return utils::reinterpret_pointer_cast<EngineType>(it->second);
106  }
107 
109  static_assert(std::is_base_of_v<IDisposable, EngineType>,
110  "Engine type must be derived from IDisposable.");
111  assert(uintptr_t(reinterpret_cast<EngineType *>(0x1))
112  == uintptr_t(static_cast<IDisposable *>(
113  reinterpret_cast<EngineType *>(0x1))));
114 
115  auto obj = std::make_shared<EngineType>(dispatcher_);
116  engines_[engineId] = utils::reinterpret_pointer_cast<void>(obj);
117  return obj;
118  }
119 
127  template <typename EventKey, typename... Args>
128  void notify(const EventKey &key, Args const &... args) {
129  notifyDelayed(std::chrono::microseconds(0ull), key, args...);
130  }
131 
141  template <typename EventKey, typename... Args>
142  void notifyDelayed(std::chrono::microseconds timeout,
143  const EventKey &key,
144  Args const &... args) {
145  using EngineType =
146  SubscriptionEngine<EventKey,
147  Dispatcher,
148  Subscriber<EventKey, Dispatcher, Args...>>;
149  constexpr auto engineId = getSubscriptionHash<Args...>();
150  std::shared_ptr<EngineType> engine;
151  {
152  std::shared_lock lock(engines_cs_);
153  if (auto it = engines_.find(engineId); it != engines_.end())
154  engine = utils::reinterpret_pointer_cast<EngineType>(it->second);
155  else
156  return;
157  }
158  assert(engine);
159  engine->notifyDelayed(timeout, key, args...);
160  }
161 
166  DispatcherPtr dispatcher() const {
167  return dispatcher_;
168  }
169  };
170 } // namespace iroha::subscription
171 
172 #endif // IROHA_SUBSCRIPTION_SUBSCRIPTION_MANAGER_HPP
Definition: dispatcher.hpp:16
Definition: round.cpp:51
subscription::IDispatcher Dispatcher
Definition: subscription_manager.hpp:35
DispatcherPtr dispatcher() const
Definition: subscription_manager.hpp:166
Definition: subscription_fwd.hpp:60
void notify(const EventKey &key, Args const &... args)
Definition: subscription_manager.hpp:128
Definition: subscription_engine.hpp:33
SubscriptionManager(DispatcherPtr dispatcher)
Definition: subscription_manager.hpp:62
void notifyDelayed(std::chrono::microseconds timeout, const EventKey &key, Args const &... args)
Definition: subscription_manager.hpp:142
Definition: subscriber.hpp:27
void dispose()
Definition: subscription_manager.hpp:71
auto getEngine()
Definition: subscription_manager.hpp:91
EngineType
Type of smart contract engine.
Definition: engine_type.hpp:12
Definition: subscription_engine.hpp:21
#define CT_MURMUR2(x)
Definition: compile-time_murmur2.hpp:93
std::shared_ptr< To > reinterpret_pointer_cast(std::shared_ptr< From > const &ptr) noexcept
Definition: common.hpp:16