6 #ifndef IROHA_SUBSCRIPTION_SUBSCRIPTION_MANAGER_HPP
7 #define IROHA_SUBSCRIPTION_SUBSCRIPTION_MANAGER_HPP
11 #include <shared_mutex>
12 #include <unordered_map>
28 template <uint32_t kHandlersCount, uint32_t kPoolSize>
29 class SubscriptionManager final
30 : public std::enable_shared_from_this<
31 SubscriptionManager<kHandlersCount, kPoolSize>>,
38 using EngineHash = uint64_t;
39 using DispatcherPtr = std::shared_ptr<Dispatcher>;
40 using EnginesList = std::unordered_map<EngineHash, std::shared_ptr<void>>;
44 DispatcherPtr dispatcher_;
45 std::shared_mutex engines_cs_;
48 std::atomic_flag disposed_;
51 template <typename... Args>
52 static constexpr EngineHash getSubscriptionHash() {
54 constexpr EngineHash value = CT_MURMUR2(__FUNCSIG__);
56 constexpr EngineHash value = CT_MURMUR2(__PRETTY_FUNCTION__);
63 : dispatcher_(std::move(dispatcher)) {
72 if (!disposed_.test_and_set()) {
74 std::shared_lock lock(engines_cs_);
75 for (auto &descriptor : engines_)
79 dispatcher_->dispose();
90 template <typename EventKey, typename... Args>
96 constexpr auto engineId = getSubscriptionHash<Args...>();
98 std::shared_lock lock(engines_cs_);
99 if (auto it = engines_.find(engineId); it != engines_.end()) {
103 std::unique_lock lock(engines_cs_);
104 if (auto it = engines_.find(engineId); it != engines_.end()) {
109 static_assert(std::is_base_of_v<IDisposable, EngineType>,
110 "Engine type must be derived from IDisposable.");
111 assert(uintptr_t(reinterpret_cast<EngineType *>(0x1))
112 == uintptr_t(static_cast<IDisposable *>(
113 reinterpret_cast<EngineType *>(0x1))));
115 auto obj = std::make_shared<EngineType>(dispatcher_);
127 template <typename EventKey, typename... Args>
128 void notify(const EventKey &key, Args const &... args) {
129 notifyDelayed(std::chrono::microseconds(0ull), key, args...);
141 template <typename EventKey, typename... Args>
144 Args const &... args) {
149 constexpr auto engineId = getSubscriptionHash<Args...>();
150 std::shared_ptr<EngineType> engine;
152 std::shared_lock lock(engines_cs_);
153 if (auto it = engines_.find(engineId); it != engines_.end())
154 engine = utils::reinterpret_pointer_cast<EngineType>(it->second);
159 engine->notifyDelayed(timeout, key, args...);
172 #endif // IROHA_SUBSCRIPTION_SUBSCRIPTION_MANAGER_HPP
Definition: dispatcher.hpp:16
subscription::IDispatcher Dispatcher
Definition: subscription_manager.hpp:35
DispatcherPtr dispatcher() const
Definition: subscription_manager.hpp:166
Definition: subscription_fwd.hpp:60
void notify(const EventKey &key, Args const &... args)
Definition: subscription_manager.hpp:128
Definition: subscription_engine.hpp:33
SubscriptionManager(DispatcherPtr dispatcher)
Definition: subscription_manager.hpp:62
void notifyDelayed(std::chrono::microseconds timeout, const EventKey &key, Args const &... args)
Definition: subscription_manager.hpp:142
Definition: subscriber.hpp:27
void dispose()
Definition: subscription_manager.hpp:71
auto getEngine()
Definition: subscription_manager.hpp:91
EngineType
Type of smart contract engine.
Definition: engine_type.hpp:12
Definition: subscription_engine.hpp:21
#define CT_MURMUR2(x)
Definition: compile-time_murmur2.hpp:93
std::shared_ptr< To > reinterpret_pointer_cast(std::shared_ptr< From > const &ptr) noexcept
Definition: common.hpp:16