6 #ifndef IROHA_SUBSCRIPTION_SUBSCRIPTION_ENGINE_HPP 7 #define IROHA_SUBSCRIPTION_SUBSCRIPTION_ENGINE_HPP 12 #include <shared_mutex> 13 #include <unordered_map> 32 template <
typename EventKey,
typename Dispatcher,
typename Receiver>
35 public std::enable_shared_from_this<
36 SubscriptionEngine<EventKey, Dispatcher, Receiver>>,
57 : dispatcher_(dispatcher) {
68 struct SubscriptionContext final {
69 std::mutex subscribers_list_cs;
72 using KeyValueContainer =
73 std::unordered_map<EventKeyType, SubscriptionContext>;
75 mutable std::shared_mutex subscribers_map_cs_;
78 KeyValueContainer subscribers_map_;
96 SubscriptionSetId set_id,
99 std::unique_lock lock(subscribers_map_cs_);
100 auto &subscribers_context = subscribers_map_[key];
102 std::lock_guard l(subscribers_context.subscribers_list_cs);
103 return subscribers_context.subscribers_list.emplace(
104 subscribers_context.subscribers_list.end(),
105 std::make_tuple(tid, set_id, std::move(ptr)));
114 std::unique_lock lock(subscribers_map_cs_);
115 auto it = subscribers_map_.find(key);
116 if (subscribers_map_.end() != it) {
117 auto &subscribers_context = it->second;
118 std::lock_guard l(subscribers_context.subscribers_list_cs);
119 subscribers_context.subscribers_list.erase(it_remove);
120 if (subscribers_context.subscribers_list.empty())
121 subscribers_map_.erase(it);
131 std::shared_lock lock(subscribers_map_cs_);
132 if (
auto it = subscribers_map_.find(key); it != subscribers_map_.end()) {
133 auto &subscribers_context = it->second;
134 std::lock_guard l(subscribers_context.subscribers_list_cs);
135 return subscribers_context.subscribers_list.size();
145 std::shared_lock lock(subscribers_map_cs_);
147 for (
auto &it : subscribers_map_) {
148 auto &subscribers_context = it->second;
149 std::lock_guard l(subscribers_context.subscribers_list_cs);
150 count += subscribers_context.subscribers_list.size();
161 template <
typename... EventParams>
163 notifyDelayed(std::chrono::microseconds(0ull), key, args...);
173 template <
typename... EventParams>
176 EventParams
const &... args) {
177 auto dispatcher = dispatcher_;
181 std::shared_lock lock(subscribers_map_cs_);
182 auto it = subscribers_map_.find(key);
183 if (subscribers_map_.end() == it)
186 auto &subscribers_container = it->second;
187 std::lock_guard l(subscribers_container.subscribers_list_cs);
188 for (
auto it_sub = subscribers_container.subscribers_list.begin();
189 it_sub != subscribers_container.subscribers_list.end();) {
190 auto wsub = std::get<2>(*it_sub);
191 auto id = std::get<1>(*it_sub);
193 if (!wsub.expired()) {
194 dispatcher->addDelayed(std::get<0>(*it_sub),
196 [wsub(std::move(wsub)),
199 args = std::make_tuple(args...)]()
mutable {
200 if (auto sub = wsub.lock())
202 [&](auto &&... args) {
204 id, key, std::move(args)...);
210 it_sub = subscribers_container.subscribers_list.erase(it_sub);
218 #endif // IROHA_SUBSCRIPTION_SUBSCRIPTION_ENGINE_HPP Receiver SubscriberType
Definition: subscription_engine.hpp:42
typename SubscribersContainer::iterator IteratorType
Definition: subscription_engine.hpp:53
size_t size(const EventKeyType &key) const
Definition: subscription_engine.hpp:130
size_t size() const
Definition: subscription_engine.hpp:144
void unsubscribe(const EventKeyType &key, const IteratorType &it_remove)
Definition: subscription_engine.hpp:113
uint32_t SubscriptionSetId
Definition: subscriber.hpp:18
std::weak_ptr< SubscriberType > SubscriberWeakPtr
Definition: subscription_engine.hpp:43
uint32_t Tid
Definition: dispatcher.hpp:17
SubscriptionEngine(DispatcherPtr const &dispatcher)
Definition: subscription_engine.hpp:56
Definition: common.hpp:26
Definition: subscription_fwd.hpp:60
std::list< std::tuple< typename Dispatcher::Tid, SubscriptionSetId, SubscriberWeakPtr > > SubscribersContainer
Definition: subscription_engine.hpp:52
Definition: subscription_engine.hpp:33
void dispose() override
Definition: subscription_engine.hpp:62
std::shared_ptr< DispatcherType > DispatcherPtr
Definition: subscription_engine.hpp:45
Receiver ReceiverType
Definition: subscription_engine.hpp:41
typename std::decay< Dispatcher >::type DispatcherType
Definition: subscription_engine.hpp:44
Definition: subscription_engine.hpp:21
EventKey EventKeyType
Definition: subscription_engine.hpp:40
void notifyDelayed(std::chrono::microseconds timeout, const EventKeyType &key, EventParams const &... args)
Definition: subscription_engine.hpp:174
Definition: common.hpp:32
IteratorType subscribe(typename Dispatcher::Tid tid, SubscriptionSetId set_id, const EventKeyType &key, SubscriberWeakPtr ptr)
Definition: subscription_engine.hpp:95
void notify(const EventKeyType &key, EventParams const &... args)
Definition: subscription_engine.hpp:162