6 #ifndef IROHA_SUBSCRIPTION_SUBSCRIBER_IMPL_HPP 7 #define IROHA_SUBSCRIPTION_SUBSCRIBER_IMPL_HPP 31 template <
typename EventKey,
34 typename... Arguments>
35 class SubscriberImpl :
public Subscriber<EventKey, Dispatcher, Arguments...> {
51 const typename Parent::EventType &,
55 using SubscriptionsContainer =
58 using SubscriptionsSets =
59 std::unordered_map<SubscriptionSetId, SubscriptionsContainer>;
61 std::atomic<SubscriptionSetId> next_id_;
70 std::mutex subscriptions_cs_;
74 SubscriptionsSets subscriptions_sets_;
79 template <
typename... SubscriberConstructorArgs>
81 SubscriberConstructorArgs &&... args)
84 object_(
std::forward<SubscriberConstructorArgs>(args)...) {}
87 template <
typename... SubscriberConstructorArgs>
88 static std::shared_ptr<SubscriberImpl>
create(
90 SubscriberConstructorArgs &&... args) {
93 SubscriberConstructorArgs &&... args)
95 ptr, std::forward<SubscriberConstructorArgs>(args)...) {}
97 return std::make_shared<Resolver>(
98 ptr, std::forward<SubscriberConstructorArgs>(args)...);
104 on_notify_callback_ = std::move(f);
112 const typename Parent::EventType &key,
114 if (
auto engine = engine_.lock()) {
115 std::lock_guard lock(subscriptions_cs_);
116 auto &&[it, inserted] = subscriptions_sets_[id].emplace(
117 key,
typename SubscriptionEngineType::IteratorType{});
123 engine->subscribe(tid,
id, key, Parent::weak_from_this());
133 const typename Parent::EventType &key) {
134 std::lock_guard<std::mutex> lock(subscriptions_cs_);
135 if (
auto set_it = subscriptions_sets_.find(
id);
136 set_it != subscriptions_sets_.end()) {
137 auto &subscriptions = set_it->second;
138 auto it = subscriptions.find(key);
139 if (subscriptions.end() != it) {
140 if (
auto engine = engine_.lock())
141 engine->unsubscribe(key, it->second);
142 subscriptions.erase(it);
154 std::lock_guard<std::mutex> lock(subscriptions_cs_);
155 if (
auto set_it = subscriptions_sets_.find(
id);
156 set_it != subscriptions_sets_.end()) {
157 if (
auto engine = engine_.lock()) {
158 auto &subscriptions = set_it->second;
159 for (
auto &[key, it] : subscriptions) engine->unsubscribe(key, it);
162 subscriptions_sets_.erase(set_it);
169 std::lock_guard<std::mutex> lock(subscriptions_cs_);
170 if (
auto engine = engine_.lock())
171 for (
auto &[_, subscriptions] : subscriptions_sets_)
172 for (
auto &[key, it] : subscriptions) engine->unsubscribe(key, it);
174 subscriptions_sets_.clear();
178 const typename Parent::EventType &key,
179 Arguments &&... args)
override {
180 if (
nullptr != on_notify_callback_)
181 on_notify_callback_(set_id, object_, key, std::move(args)...);
191 #endif // IROHA_SUBSCRIPTION_SUBSCRIBER_IMPL_HPP Definition: subscription_fwd.hpp:70
std::shared_ptr< SubscriptionEngineType > SubscriptionEnginePtr
Definition: subscriber_impl.hpp:45
typename SubscribersContainer::iterator IteratorType
Definition: subscription_engine.hpp:53
SubscriptionSetId generateSubscriptionSetId()
Definition: subscriber_impl.hpp:107
void subscribe(SubscriptionSetId id, const typename Parent::EventType &key, typename Dispatcher::Tid tid=Dispatcher::kExecuteInPool)
Definition: subscriber_impl.hpp:111
size_t Hash
Definition: subscriber_impl.hpp:38
void setCallback(CallbackFnType &&f)
Definition: subscriber_impl.hpp:103
uint32_t SubscriptionSetId
Definition: subscriber.hpp:18
bool unsubscribe(SubscriptionSetId id)
Definition: subscriber_impl.hpp:153
uint32_t Tid
Definition: dispatcher.hpp:17
subscription::IDispatcher Dispatcher
Definition: subscription_fwd.hpp:73
void unsubscribe()
Definition: subscriber_impl.hpp:168
Definition: subscription_fwd.hpp:60
EventKey EventType
Definition: subscriber.hpp:32
static constexpr Tid kExecuteInPool
Definition: dispatcher.hpp:19
Definition: subscription_engine.hpp:33
bool unsubscribe(SubscriptionSetId id, const typename Parent::EventType &key)
Definition: subscriber_impl.hpp:132
~SubscriberImpl()
Definition: subscriber_impl.hpp:101
void on_notify(SubscriptionSetId set_id, const typename Parent::EventType &key, Arguments &&... args) override
Definition: subscriber_impl.hpp:177
std::function< void(SubscriptionSetId, ReceiverType &, const typename Parent::EventType &, Arguments &&...)> CallbackFnType
Definition: subscriber_impl.hpp:52
static std::shared_ptr< SubscriberImpl > create(SubscriptionEnginePtr const &ptr, SubscriberConstructorArgs &&... args)
Definition: subscriber_impl.hpp:88
std::weak_ptr< SubscriptionEngineType > SubscriptionEngineWPtr
Definition: subscriber_impl.hpp:46
Receiver ReceiverType
Definition: subscriber_impl.hpp:37