hyperledger/iroha
Iroha - A simple, decentralized ledger http://iroha.tech
subscriber_impl.hpp
Go to the documentation of this file.
1 
6 #ifndef IROHA_SUBSCRIPTION_SUBSCRIBER_IMPL_HPP
7 #define IROHA_SUBSCRIPTION_SUBSCRIBER_IMPL_HPP
8 
9 #include <atomic>
10 #include <functional>
11 #include <memory>
12 #include <mutex>
13 
14 #include "common/common.hpp"
17 
18 namespace iroha::subscription {
19 
31  template <typename EventKey,
32  typename Dispatcher,
33  typename Receiver,
34  typename... Arguments>
35  class SubscriberImpl : public Subscriber<EventKey, Dispatcher, Arguments...> {
36  public:
37  using ReceiverType = Receiver;
38  using Hash = size_t;
39  using Parent = Subscriber<EventKey, Dispatcher, Arguments...>;
40 
42  typename Parent::EventType,
43  Dispatcher,
44  Subscriber<typename Parent::EventType, Dispatcher, Arguments...>>;
45  using SubscriptionEnginePtr = std::shared_ptr<SubscriptionEngineType>;
46  using SubscriptionEngineWPtr = std::weak_ptr<SubscriptionEngineType>;
47 
48  using CallbackFnType =
49  std::function<void(SubscriptionSetId,
50  ReceiverType &,
51  const typename Parent::EventType &,
52  Arguments &&...)>;
53 
54  private:
55  using SubscriptionsContainer =
56  std::unordered_map<typename Parent::EventType,
58  using SubscriptionsSets =
59  std::unordered_map<SubscriptionSetId, SubscriptionsContainer>;
60 
61  std::atomic<SubscriptionSetId> next_id_;
62 
64  SubscriptionEngineWPtr engine_;
65 
68  ReceiverType object_;
69 
70  std::mutex subscriptions_cs_;
71 
74  SubscriptionsSets subscriptions_sets_;
75 
77  CallbackFnType on_notify_callback_;
78 
79  template <typename... SubscriberConstructorArgs>
81  SubscriberConstructorArgs &&... args)
82  : next_id_(0ull),
83  engine_(ptr),
84  object_(std::forward<SubscriberConstructorArgs>(args)...) {}
85 
86  public:
87  template <typename... SubscriberConstructorArgs>
88  static std::shared_ptr<SubscriberImpl> create(
89  SubscriptionEnginePtr const &ptr,
90  SubscriberConstructorArgs &&... args) {
91  struct Resolver : SubscriberImpl {
92  Resolver(SubscriptionEnginePtr const &ptr,
93  SubscriberConstructorArgs &&... args)
95  ptr, std::forward<SubscriberConstructorArgs>(args)...) {}
96  };
97  return std::make_shared<Resolver>(
98  ptr, std::forward<SubscriberConstructorArgs>(args)...);
99  }
100 
102 
104  on_notify_callback_ = std::move(f);
105  }
106 
108  return ++next_id_;
109  }
110 
112  const typename Parent::EventType &key,
114  if (auto engine = engine_.lock()) {
115  std::lock_guard lock(subscriptions_cs_);
116  auto &&[it, inserted] = subscriptions_sets_[id].emplace(
117  key, typename SubscriptionEngineType::IteratorType{});
118 
121  if (inserted)
122  it->second =
123  engine->subscribe(tid, id, key, Parent::weak_from_this());
124  }
125  }
126 
133  const typename Parent::EventType &key) {
134  std::lock_guard<std::mutex> lock(subscriptions_cs_);
135  if (auto set_it = subscriptions_sets_.find(id);
136  set_it != subscriptions_sets_.end()) {
137  auto &subscriptions = set_it->second;
138  auto it = subscriptions.find(key);
139  if (subscriptions.end() != it) {
140  if (auto engine = engine_.lock())
141  engine->unsubscribe(key, it->second);
142  subscriptions.erase(it);
143  return true;
144  }
145  }
146  return false;
147  }
148 
154  std::lock_guard<std::mutex> lock(subscriptions_cs_);
155  if (auto set_it = subscriptions_sets_.find(id);
156  set_it != subscriptions_sets_.end()) {
157  if (auto engine = engine_.lock()) {
158  auto &subscriptions = set_it->second;
159  for (auto &[key, it] : subscriptions) engine->unsubscribe(key, it);
160  }
161 
162  subscriptions_sets_.erase(set_it);
163  return true;
164  }
165  return false;
166  }
167 
168  void unsubscribe() {
169  std::lock_guard<std::mutex> lock(subscriptions_cs_);
170  if (auto engine = engine_.lock())
171  for (auto &[_, subscriptions] : subscriptions_sets_)
172  for (auto &[key, it] : subscriptions) engine->unsubscribe(key, it);
173 
174  subscriptions_sets_.clear();
175  }
176 
178  const typename Parent::EventType &key,
179  Arguments &&... args) override {
180  if (nullptr != on_notify_callback_)
181  on_notify_callback_(set_id, object_, key, std::move(args)...);
182  }
183 
184  ReceiverType &get() {
185  return object_;
186  }
187  };
188 
189 } // namespace iroha::subscription
190 
191 #endif // IROHA_SUBSCRIPTION_SUBSCRIBER_IMPL_HPP
Definition: subscription_fwd.hpp:70
std::shared_ptr< SubscriptionEngineType > SubscriptionEnginePtr
Definition: subscriber_impl.hpp:45
typename SubscribersContainer::iterator IteratorType
Definition: subscription_engine.hpp:53
SubscriptionSetId generateSubscriptionSetId()
Definition: subscriber_impl.hpp:107
void subscribe(SubscriptionSetId id, const typename Parent::EventType &key, typename Dispatcher::Tid tid=Dispatcher::kExecuteInPool)
Definition: subscriber_impl.hpp:111
size_t Hash
Definition: subscriber_impl.hpp:38
void setCallback(CallbackFnType &&f)
Definition: subscriber_impl.hpp:103
Definition: round.cpp:51
uint32_t SubscriptionSetId
Definition: subscriber.hpp:18
bool unsubscribe(SubscriptionSetId id)
Definition: subscriber_impl.hpp:153
uint32_t Tid
Definition: dispatcher.hpp:17
subscription::IDispatcher Dispatcher
Definition: subscription_fwd.hpp:73
void unsubscribe()
Definition: subscriber_impl.hpp:168
Definition: subscription_fwd.hpp:60
static constexpr Tid kExecuteInPool
Definition: dispatcher.hpp:19
Definition: subscription_engine.hpp:33
bool unsubscribe(SubscriptionSetId id, const typename Parent::EventType &key)
Definition: subscriber_impl.hpp:132
~SubscriberImpl()
Definition: subscriber_impl.hpp:101
void on_notify(SubscriptionSetId set_id, const typename Parent::EventType &key, Arguments &&... args) override
Definition: subscriber_impl.hpp:177
std::function< void(SubscriptionSetId, ReceiverType &, const typename Parent::EventType &, Arguments &&...)> CallbackFnType
Definition: subscriber_impl.hpp:52
static std::shared_ptr< SubscriberImpl > create(SubscriptionEnginePtr const &ptr, SubscriberConstructorArgs &&... args)
Definition: subscriber_impl.hpp:88
std::weak_ptr< SubscriptionEngineType > SubscriptionEngineWPtr
Definition: subscriber_impl.hpp:46
Receiver ReceiverType
Definition: subscriber_impl.hpp:37