7 #ifndef IROHA_TIMEOUT_HPP 8 #define IROHA_TIMEOUT_HPP 10 #include <rxcpp/operators/rx-timeout.hpp> 27 template <
class T,
class Selector,
class Coordination>
44 : initial(
std::move(s), coordination) {}
46 template <
class Subscriber>
61 coordinator(
std::move(c)),
62 worker(coordinator.get_worker()),
65 rxcpp::composite_subscription
cs;
71 typedef std::shared_ptr<timeout_subscriber_values>
state_type;
80 std::move(cs),
std::move(d), v,
std::move(c)))) {
81 auto localState = state;
83 auto disposer = [=](
const rxcpp::schedulers::schedulable &) {
84 localState->cs.unsubscribe();
85 localState->dest.unsubscribe();
86 localState->worker.unsubscribe();
88 auto selectedDisposer = on_exception(
89 [&]() {
return localState->coordinator.act(disposer); },
91 if (selectedDisposer.empty()) {
96 [=]() { localState->worker.schedule(selectedDisposer.get()); });
98 [=]() { localState->worker.schedule(selectedDisposer.get()); });
101 static std::function<void(const rxcpp::schedulers::schedulable &)>
103 auto produce = [id, state](
const rxcpp::schedulers::schedulable &) {
104 if (
id != state->index)
107 state->dest.on_error(std::make_exception_ptr(
108 rxcpp::timeout_error(
"timeout has occurred")));
111 auto selectedProduce = on_exception(
112 [&]() {
return state->coordinator.act(produce); }, state->dest);
113 if (selectedProduce.empty()) {
114 return std::function<void(const rxcpp::schedulers::schedulable &)>();
117 return std::function<void(const rxcpp::schedulers::schedulable &)>(
118 selectedProduce.get());
121 template <
class Value>
123 auto localState = state;
125 auto selected = on_exception(
126 [&]() {
return localState->selector(std::forward<Value>(v)); },
128 if (selected.empty()) {
132 auto work = [v, localState, period = std::move(selected.get())](
133 const rxcpp::schedulers::schedulable &) {
134 auto new_id = ++localState->index;
135 auto produce_time = localState->worker.now() + period;
137 localState->dest.on_next(v);
138 localState->worker.schedule(produce_time,
139 produce_timeout(new_id, localState));
142 on_exception([&]() {
return localState->coordinator.act(work); },
144 if (selectedWork.empty()) {
147 localState->worker.schedule(selectedWork.get());
151 auto localState = state;
152 auto work = [e, localState](
const rxcpp::schedulers::schedulable &) {
153 localState->dest.on_error(e);
156 on_exception([&]() {
return localState->coordinator.act(work); },
158 if (selectedWork.empty()) {
161 localState->worker.schedule(selectedWork.get());
165 auto localState = state;
166 auto work = [localState](
const rxcpp::schedulers::schedulable &) {
167 localState->dest.on_completed();
170 on_exception([&]() {
return localState->coordinator.act(work); },
172 if (selectedWork.empty()) {
175 localState->worker.schedule(selectedWork.get());
178 static rxcpp::subscriber<T, observer_type>
make(dest_type d,
180 auto cs = rxcpp::composite_subscription();
183 return rxcpp::make_subscriber<T>(
185 observer_type(this_type(
186 cs, std::move(d), std::move(v), std::move(coordinator))));
190 template <
class Subscriber>
201 typename Coordination,
202 class ResolvedSelector = rxcpp::util::decay_t<Selector>,
203 class Duration = decltype(
204 std::declval<ResolvedSelector>()((std::declval<std::decay_t<T>>()))),
205 class Enabled = rxcpp::util::enable_if_all_true_type_t<
206 rxcpp::is_coordination<Coordination>,
207 rxcpp::util::is_duration<Duration>>,
211 return Timeout(std::forward<Selector>(s), std::forward<Coordination>(cn));
216 #endif // IROHA_TIMEOUT_HPP Definition: result_fwd.hpp:15
coordination_type::coordinator_type coordinator_type
Definition: timeout.hpp:31
coordinator_type coordinator
Definition: timeout.hpp:67
static rxcpp::subscriber< T, observer_type > make(dest_type d, timeout_values v)
Definition: timeout.hpp:178
static std::function< void(const rxcpp::schedulers::schedulable &)> produce_timeout(std::size_t id, state_type state)
Definition: timeout.hpp:102
rxcpp::util::decay_t< Coordination > coordination_type
Definition: timeout.hpp:30
coordination_type coordination
Definition: timeout.hpp:39
timeout_values initial
Definition: timeout.hpp:41
timeout_values(select_type s, coordination_type c)
Definition: timeout.hpp:35
Definition: timeout.hpp:53
select_type selector
Definition: timeout.hpp:38
rxcpp::util::decay_t< T > source_value_type
Definition: timeout.hpp:29
timeout_subscriber_values(rxcpp::composite_subscription cs, dest_type d, timeout_values v, coordinator_type c)
Definition: timeout.hpp:54
timeout(select_type s, coordination_type coordination)
Definition: timeout.hpp:43
void on_next(Value &&v) const
Definition: timeout.hpp:122
timeout_observer(rxcpp::composite_subscription cs, dest_type d, timeout_values v, coordinator_type c)
Definition: timeout.hpp:74
rxcpp::observer< T, this_type > observer_type
Definition: timeout.hpp:51
std::shared_ptr< timeout_subscriber_values > state_type
Definition: timeout.hpp:71
auto operator()(Subscriber dest) const -> decltype(timeout_observer< Subscriber >::make(std::move(dest), initial))
Definition: timeout.hpp:191
void on_error(std::exception_ptr e) const
Definition: timeout.hpp:150
Definition: block_query.hpp:15
state_type state
Definition: timeout.hpp:72
Definition: timeout.hpp:28
rxcpp::util::decay_t< T > value_type
Definition: timeout.hpp:49
static auto makeTimeout(Selector &&s, Coordination &&cn)
Definition: timeout.hpp:210
dest_type dest
Definition: timeout.hpp:66
Definition: timeout.hpp:34
std::size_t index
Definition: timeout.hpp:69
rxcpp::util::decay_t< Selector > select_type
Definition: timeout.hpp:32
rxcpp::schedulers::worker worker
Definition: timeout.hpp:68
rxcpp::util::decay_t< Subscriber > dest_type
Definition: timeout.hpp:50
Definition: timeout.hpp:47
void on_completed() const
Definition: timeout.hpp:164
timeout_observer< Subscriber > this_type
Definition: timeout.hpp:48
rxcpp::composite_subscription cs
Definition: timeout.hpp:65