こんにちは。 pregum_foxです。
今回は、SwitchLatestオペレータとCombineLatestオペレータについて調べました。
前回までの記事は以下をご覧ください。
- 【Dart】rxdartについて調べてみる 第1回目 -概要-【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第2回目 -ObservableとSubject系クラス- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第3回目 -concatとconcatEager- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第4回目 -Defer, Merge- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第5回目 -Periodic, Never- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第6回目 -Race, Retry- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第7回目 -RetryWhen, SequenceEqual- 【rxdart】 - 狐好きぷろぐらまー
動作環境
動作環境は以下の通りです。
項目 | バージョン |
---|---|
Dart | 2.5.1 |
rxdart | 0.22.2 |
以下にSwitchLatestオペレータとCombineLatestオペレータについて記載します。 オペレータの名称はRx、rxdart、Stream APIの順で記載します。
SwitchLatestオペレータ
- Rx: なし(Switchオペレータの派生)
- rxdart: Observable<T>.switchLatestファクトリコンストラクタ
- Stream API: SwitchLatestStream<T>クラス
RxにあるコアのオペレータにはSwitchのみ存在していて、SwitchLatestオペレータはプログラミング言語によって実装されていたりいなかったりするみたいです。
シグネチャ
rxdart
Observable<T> Observable<T>.switchLatest(Stream<Stream<T>> streams)
Stream API
SwitchLatest<T> SwitchLatest<T>(Stream<Stream<T>> streams)
処理の概要
streamを発行するstreamを、それらのstreamの中で最も最近(遅く)発行された値を発行するstreamに変換します。
分かりやすい利用する状況は、インクリメンタルサーチ(逐次検索)だと思います。
googleでフォーと打つとフォートナイトやフォーエバー21等の候補が出る機能を想像されるとわかりやすいかと思います。
フォまで打った時の検索候補を取得している間に長音符"ー"が入力された場合、"フォー"での検索候補のみ欲しいのでフォで取得した検索候補は購読しないようにしたい時、役に立つかもしれません。
似たようなオペレータでswitchMap/SwitchMapStreamTransformerがありますのでそちらも調べてみてください。(いつか記事が書けると思います)
以下にサンプルを示します。
import 'package:rxdart/rxdart.dart'; /// rxdartのSwitchLatestオペレータサンプル void studySwitchLatest() { Observable<String>.switchLatest( Observable.fromIterable([ Observable.timer('1', Duration(seconds: 1)), Observable.just('2'), Observable.timer('3', Duration(milliseconds: 1500)), Observable.timer('4', Duration(seconds: 1)), ]), ).listen(print, onDone: () => print('done.')); } // 実行結果 // 2 // 4 // done. /// rxdartのSwitchLatestオペレータサンプルその2 void studySwitchLatest2() async { final subject = PublishSubject<int>(); subject.stream .switchMap( (i) => Observable.periodic( Duration(milliseconds: 1000), (val) => (val + 1) * i).take(3), ) .listen((i) => print('${DateTime.now()} -- $i'), onDone: () => print('done.')); print('start'); print('onNext 1'); subject.add(1); await Future.delayed(Duration(milliseconds: 2000)); print('onNext 2'); subject.add(2); await Future.delayed(Duration(milliseconds: 2000)); print('onNext 3'); subject.add(3); await subject.close(); } // 実行結果 // start // onNext 1 // 2019-10-23 21:23:17.161855 -- 1 // onNext 2 // 2019-10-23 21:23:19.168071 -- 2 // onNext 3 // 2019-10-23 21:23:21.168319 -- 3 // 2019-10-23 21:23:22.168026 -- 6 // 2019-10-23 21:23:23.167304 -- 9 // done. /// Stream APIのSwitchLatestオペレータサンプル void studySwitchLatestStream() { SwitchLatestStream<String>( Stream.fromIterable([ TimerStream('1', Duration(seconds: 1)), Stream.value('2'), TimerStream('3', Duration(milliseconds: 1500)), TimerStream('4', Duration(seconds: 1)), ]), ).listen(print, onDone: () => print('done.')); } // 実行結果 // 2 // 4 // done.
サンプルの実行結果から、値が発行されるまでの間に次の値が発行された場合、発行待ちの値が発行されないままであることが確認できます。
CombineLatestオペレータ
- Rx: CombineLatestオペレータ
- rxdart: Observable<T>l;.combineLatest静的メソッド
- Stream API: CombineLatestStream<T, R>クラス
シグネチャ
- rxdart
Observable<T> Observable<T>.combineLatest2<A,B,T>(Stream<A> streamA, Stream<B> streamB, T combiner(A a, B b))
※ObservableLatest2の場合
- Stream API
CombineLatestStream<T, R>(Iterable<Stream<T>> streams R combiner(List<T> values))
処理の概要
指定したstreamの数を購読し、指定したstreamの値を購読したら、1つのstreamにマージする関数を使用して、1つのstreamにまとめた後、発行します。
以下にサンプルを示します。
import 'package:rxdart/rxdart.dart'; /// rxdartのCombineLatestオペレータサンプル void studyCombineLatest() { Observable.combineLatest2( Observable.just('hello'), Observable.fromIterable(['mike', 'jon']), (a, b) => '$a $b', ).listen(print, onDone: () => print('done.')); } // 実行結果 // hello mike // hello jon // done. /// Stream APIのCombineLatestオペレータサンプル void studyCombineLatestStream() { CombineLatestStream.combine2( Stream.value('hello'), Stream.fromIterable(['mike', 'jon']), (a, b) => '$a $b', ).listen(print, onDone: () => print('done.')); } // 実行結果 // hello mike // hello jon // done. void studyCombineLatest2() async { PublishSubject sb1 = PublishSubject(); PublishSubject sb2 = PublishSubject(); Observable.combineLatest2(sb1.stream, sb2.stream, (a, b) => 'a:$a b:$b') .listen(print, onDone: () => print('done.')); print('sb1 onNext 1'); await sb1.add(1); print('sb2 onNext 10'); await sb2.add(10); print('sb1 onNext 2'); await sb1.add(2); print('sb1 onNext 3'); await sb1.add(3); print('sb2 onNext 20'); sb2.add(20); } // 実行結果 // sb1 onNext 1 // sb2 onNext 10 // a:1 b:10 // sb1 onNext 2 // a:2 b:10 // sb1 onNext 3 // a:3 b:10 // sb2 onNext 20 // a:3 b:20 /// CombineLatest4のサンプル void studyCombineLatest3() { Observable.combineLatest4( Observable.just(1), Observable.just(10), Observable.just(100), Observable.fromIterable([3, 33, 333]), (a, b, c, d) => a + b + c + d).listen(print, onDone: () => print('done.')); } // 実行結果 // 114 // 144 // 444 // done.
サンプルの実行結果から、複数のObservableな値を最後の引数で記述した内容で処理されたstreamが出力されていることが分かります。
ここでは、2つまたは4つのObservableをまとめたサンプルのみ書いていますが、最大9つまで同時にObservableな値を処理することができます。
読んで頂き、ありがとうございました。
参考サイト
- ReactiveX - Switch operator
- https://pub.dev/documentation/rxdart/latest/rx/Observable/Observable.switchLatest.html
- ReactiveX - CombineLatest operator
- SwitchLatestStream class - rx library - Dart API
- https://pub.dev/documentation/rxdart/latest/rx/Observable/combineLatest2.html
- CombineLatestStream class - rx library - Dart API