7 #ifndef IROHA_COMBINE_LATEST_UNTIL_FIRST_COMPLETED_HPP 8 #define IROHA_COMBINE_LATEST_UNTIL_FIRST_COMPLETED_HPP 10 #include <rxcpp/operators/rx-combine_latest.hpp> 24 template <
class Coordination,
class Selector,
class... ObservableN>
26 :
public rxcpp::operators::operator_base<rxcpp::util::value_type_t<
27 rxcpp::operators::detail::combine_latest_traits<Coordination,
35 typedef rxcpp::operators::detail::
36 combine_latest_traits<Coordination, Selector, ObservableN...>
48 values(tuple_source_type o, selector_type s, coordination_type sf)
61 : initial(
std::move(ts),
std::move(s),
std::move(sf)) {}
63 template <
int Index,
class State>
65 typedef typename std::tuple_element<Index,
66 tuple_source_type>::type::value_type
69 rxcpp::composite_subscription innercs;
73 state->out.add(innercs);
75 auto source = on_exception(
77 return state->coordinator.in(std::get<Index>(state->source));
87 auto sink = rxcpp::make_subscriber<source_value_type>(
91 [state](source_value_type st) {
92 auto &value = std::get<Index>(state->latest);
100 if (state->valuesSet ==
sizeof...(ObservableN)) {
101 auto values = rxcpp::util::surely(state->latest);
103 state->out.on_next(selectedResult);
107 [state](std::exception_ptr e) { state->out.on_error(e); },
109 [state]() { state->out.on_completed(); });
110 auto selectedSink = on_exception(
111 [&]() {
return state->coordinator.out(sink); }, state->out);
112 if (selectedSink.empty()) {
115 source->subscribe(std::move(selectedSink.get()));
118 template <
class State,
int... IndexN>
120 rxcpp::util::values<int, IndexN...>)
const {
121 bool subscribed[] = {(subscribe_one<IndexN>(state),
true)...};
122 subscribed[0] = (*subscribed);
125 template <
class Subscriber>
127 static_assert(rxcpp::is_subscriber<Subscriber>::value,
128 "subscribe must be passed a subscriber");
130 typedef Subscriber output_type;
132 struct combine_latest_until_first_completed_state_type
133 :
public std::enable_shared_from_this<
134 combine_latest_until_first_completed_state_type>,
136 combine_latest_until_first_completed_state_type(
values i,
137 coordinator_type coor,
141 coordinator(std::move(coor)),
142 out(std::move(oarg)) {}
144 mutable int valuesSet;
145 mutable tuple_source_value_type latest;
146 coordinator_type coordinator;
151 initial.
coordination.create_coordinator(scbr.get_subscription());
155 std::make_shared<combine_latest_until_first_completed_state_type>(
156 initial, std::move(coordinator), std::move(scbr));
160 typename rxcpp::util::values_from<
int,
161 sizeof...(ObservableN)>::type());
169 class... ObservableN,
170 class Enabled = rxcpp::util::enable_if_all_true_type_t<
171 rxcpp::is_coordination<Coordination>,
172 rxcpp::operators::detail::
173 is_combine_latest_selector<Selector, Observable, ObservableN...>,
174 rxcpp::all_observables<Observable, ObservableN...>>,
175 class ResolvedSelector = rxcpp::util::decay_t<Selector>,
179 rxcpp::util::decay_t<Observable>,
180 rxcpp::util::decay_t<ObservableN>...>,
181 class Value = rxcpp::util::value_type_t<combine_latest>,
182 class Result = rxcpp::observable<Value, combine_latest>>
186 ObservableN &&... on) {
188 combine_latest(std::forward<Coordination>(cn),
189 std::forward<Selector>(s),
190 std::make_tuple(std::forward<Observable>(o),
191 std::forward<ObservableN>(on)...)));
196 #endif // IROHA_COMBINE_LATEST_UNTIL_FIRST_COMPLETED_HPP decltype(auto) constexpr apply(Tuple &&t, F &&f)
apply F to Tuple
Definition: soci_utils.hpp:72
void subscribe_one(std::shared_ptr< State > state) const
Definition: combine_latest_until_first_completed.hpp:64
Definition: result_fwd.hpp:15
values(tuple_source_type o, selector_type s, coordination_type sf)
Definition: combine_latest_until_first_completed.hpp:48
coordination_type coordination
Definition: combine_latest_until_first_completed.hpp:54
traits::selector_type selector_type
Definition: combine_latest_until_first_completed.hpp:42
Definition: result_fwd.hpp:27
void subscribe_all(std::shared_ptr< State > state, rxcpp::util::values< int, IndexN... >) const
Definition: combine_latest_until_first_completed.hpp:119
static Result makeCombineLatestUntilFirstCompleted(Observable &&o, Coordination &&cn, Selector &&s, ObservableN &&... on)
Definition: combine_latest_until_first_completed.hpp:183
rxcpp::operators::detail::combine_latest_traits< Coordination, Selector, ObservableN... > traits
Definition: combine_latest_until_first_completed.hpp:37
Definition: block_query.hpp:15
Definition: combine_latest_until_first_completed.hpp:25
combine_latest_until_first_completed< Coordination, Selector, ObservableN... > this_type
Definition: combine_latest_until_first_completed.hpp:33
combine_latest_until_first_completed(coordination_type sf, selector_type s, tuple_source_type ts)
Definition: combine_latest_until_first_completed.hpp:58
traits::tuple_source_value_type tuple_source_value_type
Definition: combine_latest_until_first_completed.hpp:40
tuple_source_type source
Definition: combine_latest_until_first_completed.hpp:52
coordination_type::coordinator_type coordinator_type
Definition: combine_latest_until_first_completed.hpp:45
values initial
Definition: combine_latest_until_first_completed.hpp:56
void on_subscribe(Subscriber scbr) const
Definition: combine_latest_until_first_completed.hpp:126
traits::tuple_source_type tuple_source_type
Definition: combine_latest_until_first_completed.hpp:39
Definition: combine_latest_until_first_completed.hpp:47
selector_type selector
Definition: combine_latest_until_first_completed.hpp:53
traits::coordination_type coordination_type
Definition: combine_latest_until_first_completed.hpp:44