hyperledger/iroha
Iroha - A simple, decentralized ledger http://iroha.tech
subscription_engine.hpp
Go to the documentation of this file.
1 
6 #ifndef IROHA_SUBSCRIPTION_SUBSCRIPTION_ENGINE_HPP
7 #define IROHA_SUBSCRIPTION_SUBSCRIPTION_ENGINE_HPP
8 
9 #include <assert.h>
10 #include <list>
11 #include <memory>
12 #include <shared_mutex>
13 #include <unordered_map>
14 
15 #include "common/common.hpp"
18 
19 namespace iroha::subscription {
20 
21  struct IDisposable {
22  virtual void dispose() = 0;
23  };
24 
32  template <typename EventKey, typename Dispatcher, typename Receiver>
33  class SubscriptionEngine final
34  : public IDisposable,
35  public std::enable_shared_from_this<
36  SubscriptionEngine<EventKey, Dispatcher, Receiver>>,
39  public:
40  using EventKeyType = EventKey;
41  using ReceiverType = Receiver;
42  using SubscriberType = Receiver;
43  using SubscriberWeakPtr = std::weak_ptr<SubscriberType>;
44  using DispatcherType = typename std::decay<Dispatcher>::type;
45  using DispatcherPtr = std::shared_ptr<DispatcherType>;
46 
50  using SubscribersContainer = std::list<std::tuple<typename Dispatcher::Tid,
53  using IteratorType = typename SubscribersContainer::iterator;
54 
55  public:
56  explicit SubscriptionEngine(DispatcherPtr const &dispatcher)
57  : dispatcher_(dispatcher) {
58  assert(dispatcher_);
59  }
60  ~SubscriptionEngine() = default;
61 
62  void dispose() override {
63  dispatcher_.reset();
64  }
65 
66  private:
68  struct SubscriptionContext final {
69  std::mutex subscribers_list_cs;
70  SubscribersContainer subscribers_list;
71  };
72  using KeyValueContainer =
73  std::unordered_map<EventKeyType, SubscriptionContext>;
74 
75  mutable std::shared_mutex subscribers_map_cs_;
76 
78  KeyValueContainer subscribers_map_;
79 
81  DispatcherPtr dispatcher_;
82 
83  public:
95  IteratorType subscribe(typename Dispatcher::Tid tid,
96  SubscriptionSetId set_id,
97  const EventKeyType &key,
98  SubscriberWeakPtr ptr) {
99  std::unique_lock lock(subscribers_map_cs_);
100  auto &subscribers_context = subscribers_map_[key];
101 
102  std::lock_guard l(subscribers_context.subscribers_list_cs);
103  return subscribers_context.subscribers_list.emplace(
104  subscribers_context.subscribers_list.end(),
105  std::make_tuple(tid, set_id, std::move(ptr)));
106  }
107 
113  void unsubscribe(const EventKeyType &key, const IteratorType &it_remove) {
114  std::unique_lock lock(subscribers_map_cs_);
115  auto it = subscribers_map_.find(key);
116  if (subscribers_map_.end() != it) {
117  auto &subscribers_context = it->second;
118  std::lock_guard l(subscribers_context.subscribers_list_cs);
119  subscribers_context.subscribers_list.erase(it_remove);
120  if (subscribers_context.subscribers_list.empty())
121  subscribers_map_.erase(it);
122  }
123  }
124 
130  size_t size(const EventKeyType &key) const {
131  std::shared_lock lock(subscribers_map_cs_);
132  if (auto it = subscribers_map_.find(key); it != subscribers_map_.end()) {
133  auto &subscribers_context = it->second;
134  std::lock_guard l(subscribers_context.subscribers_list_cs);
135  return subscribers_context.subscribers_list.size();
136  }
137  return 0ull;
138  }
139 
144  size_t size() const {
145  std::shared_lock lock(subscribers_map_cs_);
146  size_t count = 0ull;
147  for (auto &it : subscribers_map_) {
148  auto &subscribers_context = it->second;
149  std::lock_guard l(subscribers_context.subscribers_list_cs);
150  count += subscribers_context.subscribers_list.size();
151  }
152  return count;
153  }
154 
161  template <typename... EventParams>
162  void notify(const EventKeyType &key, EventParams const &... args) {
163  notifyDelayed(std::chrono::microseconds(0ull), key, args...);
164  }
165 
173  template <typename... EventParams>
174  void notifyDelayed(std::chrono::microseconds timeout,
175  const EventKeyType &key,
176  EventParams const &... args) {
177  auto dispatcher = dispatcher_;
178  if (!dispatcher)
179  return;
180 
181  std::shared_lock lock(subscribers_map_cs_);
182  auto it = subscribers_map_.find(key);
183  if (subscribers_map_.end() == it)
184  return;
185 
186  auto &subscribers_container = it->second;
187  std::lock_guard l(subscribers_container.subscribers_list_cs);
188  for (auto it_sub = subscribers_container.subscribers_list.begin();
189  it_sub != subscribers_container.subscribers_list.end();) {
190  auto wsub = std::get<2>(*it_sub);
191  auto id = std::get<1>(*it_sub);
192 
193  if (!wsub.expired()) {
194  dispatcher->addDelayed(std::get<0>(*it_sub),
195  timeout,
196  [wsub(std::move(wsub)),
197  id(id),
198  key(key),
199  args = std::make_tuple(args...)]() mutable {
200  if (auto sub = wsub.lock())
201  std::apply(
202  [&](auto &&... args) {
203  sub->on_notify(
204  id, key, std::move(args)...);
205  },
206  std::move(args));
207  });
208  ++it_sub;
209  } else {
210  it_sub = subscribers_container.subscribers_list.erase(it_sub);
211  }
212  }
213  }
214  };
215 
216 } // namespace iroha::subscription
217 
218 #endif // IROHA_SUBSCRIPTION_SUBSCRIPTION_ENGINE_HPP
Receiver SubscriberType
Definition: subscription_engine.hpp:42
typename SubscribersContainer::iterator IteratorType
Definition: subscription_engine.hpp:53
size_t size(const EventKeyType &key) const
Definition: subscription_engine.hpp:130
size_t size() const
Definition: subscription_engine.hpp:144
void unsubscribe(const EventKeyType &key, const IteratorType &it_remove)
Definition: subscription_engine.hpp:113
uint32_t SubscriptionSetId
Definition: subscriber.hpp:18
std::weak_ptr< SubscriberType > SubscriberWeakPtr
Definition: subscription_engine.hpp:43
uint32_t Tid
Definition: dispatcher.hpp:17
SubscriptionEngine(DispatcherPtr const &dispatcher)
Definition: subscription_engine.hpp:56
Definition: common.hpp:26
Definition: subscription_fwd.hpp:60
std::list< std::tuple< typename Dispatcher::Tid, SubscriptionSetId, SubscriberWeakPtr > > SubscribersContainer
Definition: subscription_engine.hpp:52
Definition: subscription_engine.hpp:33
void dispose() override
Definition: subscription_engine.hpp:62
std::shared_ptr< DispatcherType > DispatcherPtr
Definition: subscription_engine.hpp:45
Receiver ReceiverType
Definition: subscription_engine.hpp:41
typename std::decay< Dispatcher >::type DispatcherType
Definition: subscription_engine.hpp:44
Definition: subscription_engine.hpp:21
EventKey EventKeyType
Definition: subscription_engine.hpp:40
void notifyDelayed(std::chrono::microseconds timeout, const EventKeyType &key, EventParams const &... args)
Definition: subscription_engine.hpp:174
Definition: common.hpp:32
IteratorType subscribe(typename Dispatcher::Tid tid, SubscriptionSetId set_id, const EventKeyType &key, SubscriberWeakPtr ptr)
Definition: subscription_engine.hpp:95
void notify(const EventKeyType &key, EventParams const &... args)
Definition: subscription_engine.hpp:162