こんにちは。 pregu_foxです。
今回は、ForkJoinオペレータとZipオペレータについて調べました
前回までの記事は以下をご覧ください。
- 【Dart】rxdartについて調べてみる 第1回目 -概要-【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第2回目 -ObservableとSubject系クラス- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第3回目 -concatとconcatEager- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第4回目 -Defer, Merge- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第5回目 -Periodic, Never- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第6回目 -Race, Retry- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第7回目 -RetryWhen, SequenceEqual- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第8回目 -SwitchLatest, CombineLatest- 【rxdart】 - 狐好きぷろぐらまー
目次です。
動作環境
動作環境は以下の通りです。
項目 | バージョン |
---|---|
Dart | 2.5.1 |
rxdart | 0.22.2 |
以下にForkJoinオペレータとZipオペレータについて記載します。 オペレータの名称は、rxdart、Stream APIの順で記載します。
ForkJoinオペレータ
- Rx: なし(Zipオペレータの派生)
- rxdart: Observable<T>.ForkJoin2静的メソッド
- Stream API: ForkJoinStream<T, R>クラス
シグネチャ
rxdart
Observable<T> Observable.forkJoin2<A, B, T>(Stream<A> streamA, Stream<B> streamB, T combiner(A a, B b))
※forkJoin静的メソッドは一度にまとめるstreamの数でメソッド名が変化します。
forkJoinからforkJoin9まで存在します。(メソッドの末尾の数字が、発行時にまとめるstreamの数です)
以下に各apiのreferenceサイトを記載します。- forkJoin2
- forkjoin3
- forkjoin4
- forkjoin5
- forkjoin6
- forkjoin7
- forkjoin8
- forkjoin9
また、使用方法は違いますが、動的な数のstreamの最新のデータだけを発行するリスト型のstreamを作成したい場合は、forkJoinList
Stream API
ForkJoinStream<dynamic, R> ForkJoinStream.combine2<A, B, R>(Stream<A> streamOne, Stream<B> streamTwo, R combiner(A a, B b))
※ForkJoinStream.combine静的メソッドもrxdartと同じく一度にまとめるstreamの数でメソッド名が変化します。
combine2からcombine9まで存在します。(メソッドの末尾の数字が、発行時にまとめるstreamの数です。) combineはForkJoinStreamクラスのコンストラクタで代用可能です。- ForkJoinStream.combine2
- ForkJoinStream.combine3
- ForkJoinStream.combine4
- ForkJoinStream.combine5
- ForkJoinStream.combine6
- ForkJoinStream.combine7
- ForkJoinStream.combine8
- ForkJoinStream.combine9
また、こちらも動的な数のstreamの各々の最新データだけを発行するリスト型のstreamを作成したい場合は、ForkJoinStream.list
処理の概要
このオペレータは複数のstreamをまとめて購読し、すべてのstreamから値が発行された時に、指定した関数に従って1つのstreamにまとめます。
forkJoinList()やForkJoinStream.list()は、動的な数のstreamをまとめる時に役に立ちます。
以下にサンプルを示します。
import 'package:rxdart/rxdart.dart'; /// rxdartのForkJoinオペレータ void studyForkJoinTwo() { Observable.forkJoin2( Observable.just(1), Observable.fromIterable([0, 2, 4]), (a, b) => a + b, ).listen( print, onDone: () => print('done'), ); } // 実行結果 // 5 // done /// rxdartのForkJoinオペレータその2 void studyForkJoinTwo2() async { PublishSubject<int> ps1 = PublishSubject<int>(); PublishSubject<int> ps2 = PublishSubject<int>(); Observable.forkJoin2<int, int, int>( ps1.stream, ps2.stream, (ps1, ps2) { print('ps1: $ps1 + ps2: $ps2 = ${ps1 + ps2}'); return ps1 + ps2; }, ).listen( (val) => print('listen: $val'), onDone: () => print('done.'), ); print('ps1 onNext -- 1'); await ps1.add(1); print('ps2 onNext -- 11'); await ps2.add(11); print('ps2 onNext -- 12'); await ps2.add(12); print('ps2 onNext -- 13'); await ps2.add(13); print('ps1 onNext -- 2'); await ps1.add(2); await ps1.close(); await ps2.close(); } // 実行結果 // ps1 onNext -- 1 // ps2 onNext -- 11 // ps2 onNext -- 12 // ps2 onNext -- 13 // ps1 onNext -- 2 // ps1: 2 + ps2: 13 = 15 // listen: 15 // done. /// Stream APIのForkJoinオペレータ void studyForkJoinTwoStream() { ForkJoinStream.combine2( Stream.value(1), Stream.fromIterable([0, 2, 4]), (a, b) => a + b, ).listen( print, onDone: () => print('done.'), ); } // 実行結果 // 5 // done
サンプルコードの実行結果から、最終的なstreamの値のみ発行されていることが分かります。
Zipオペレータ
- Rx: Zipオペレータ
- rxdart: Observable.zip2<A, B, T>静的メソッド
- Stream API: ZipStream<T, R>クラス
シグネチャ
rxdart
Observable<T> Observable.zip2<A, B, T>(Stream<A> streamA, Stream<B> streamB, T zipper(A a, B b))
※forkJoinと同様にzip2静的メソッドもstreamをまとめる数でメソッド名が変化します。- zip2
- zip3
- zip4
- zip5
- zip6
- zip7
- zip8
- zip9
動的にstreamの数を変化させたい場合は、zipListを利用すると良いでしょう。
Stream API
ZipStream<dynamic, R> ZipStream.zip2<A, B, R>(Stream<A> streamOne, Stream<B> streamTwo, R zipper(A a, B b))
※こちらもForkJOinStream.combineと同じ様に、引数の数によって変化するzip静的メソッドが存在します。- zip2
- zip3
- zip4
- zip5
- zip6
- zip7
- zip8
- zip9
動的に引数のstreamの数を変化させたい場合は、ZipStream.list
処理の概要
引数のstreamが全て作成された段階で、引数のzipper関数を使用してまとめた1つのstreamを発行します。
forkJoinオペレータに動作が似ていますが、違う箇所は、forkJoinオペレータは最後のstreamのみ発行しますが、zipオペレータは全てのstreamが1以上作成されている場合にzipper関数でまとめた1つのstreamを発行します。その為、引数の全てのstreamに5つの値が作成された場合、5つのstreamが発行されます。
以下にサンプルを示します。
import 'package:rxdart/rxdart.dart'; /// rxdartのZipオペレータ void studyZipTwo() async { PublishSubject<int> ps1 = PublishSubject<int>(); PublishSubject<int> ps2 = PublishSubject<int>(); Observable.zip2( ps1.stream, ps2.stream, (ps1, ps2) { print('ps1: $ps1 + ps2: $ps2 = ${ps1 + ps2}'); return ps1 + ps2; }, ).listen( (val) => print('listen: $val'), onDone: () => print('done.'), ); print('ps1 onNext -- 1'); await ps1.add(1); print('ps2 onNext -- 11'); await ps2.add(11); print('ps2 onNext -- 12'); await ps2.add(12); print('ps2 onNext -- 13'); await ps2.add(13); print('ps1 onNext -- 2'); await ps1.add(2); await ps1.close(); await ps2.close(); } // 実行結果 // ps1 onNext -- 1 // ps2 onNext -- 11 // ps1: 1 + ps2: 11 = 12 // listen: 12 // ps2 onNext -- 12 // ps2 onNext -- 13 // ps1 onNext -- 2 // ps1: 2 + ps2: 12 = 14 // listen: 14 // done. /// rxdartのzipListオペレータ void studyZipList() { Observable.zipList([ Observable.fromIterable(['a']), Observable.fromIterable(['b']), Observable.fromIterable(['c', 'd', 'e']), ]).listen( print, onDone: () => print('done'), ); } // 実行結果 // [a, b, c] // done /// Stream APIのZipオペレータ void studyZipTwoStream() async { StreamController<int> ps1 = StreamController<int>(); StreamController<int> ps2 = StreamController<int>(); ZipStream.zip2( ps1.stream, ps2.stream, (ps1, ps2) { print('ps1: $ps1 + ps2: $ps2 = ${ps1 + ps2}'); return ps1 + ps2; }, ).listen( (val) => print('listen: $val'), onDone: () => print('done.'), ); print('ps1 onNext -- 1'); await ps1.add(1); print('ps2 onNext -- 11'); await ps2.add(11); print('ps2 onNext -- 12'); await ps2.add(12); print('ps2 onNext -- 13'); await ps2.add(13); print('ps1 onNext -- 2'); await ps1.add(2); await ps1.close(); await ps2.close(); } // 実行結果 // ps1 onNext -- 1 // ps2 onNext -- 11 // ps1: 1 + ps2: 11 = 12 // listen: 12 // ps2 onNext -- 12 // ps2 onNext -- 13 // ps1 onNext -- 2 // ps1: 2 + ps2: 12 = 14 // listen: 14 // done.
サンプルの実行結果から、引数の各streamが作成されたタイミングでstreamが発行されていることが分かります。
読んで頂き、ありがとうございました。