こんにちは。 pregum_foxです。
今回は、RaceオペレータとRetryオペレータについて調べていきます。
目次です。
- 動作環境
- Ampオペレータ (rxdart: Rx.race<T>拡張メソッド / Stream API: RaceStream<T>クラス)
- Retryオペレータ (rxdart: Rx.retry<T>拡張メソッド / Stream API: RetryStream<T>クラス)
- 雑感
- 参考サイト
前回までの記事です。
- 【0.23.0対応】rxdartについて調べてみる 第2回目 -ObservableとSubject系クラス- 【rxdart】 - 狐好きぷろぐらまー
- 【0.23.0対応】rxdartについて調べてみる 第3回目 -concatとconcatEager-【rxdart】 - 狐好きぷろぐらまー
- 【0.23.0対応】rxdartについて調べてみる 第4回目 -Defer, Merge- 【rxdart】 - 狐好きぷろぐらまー
- 【0.23.0対応】rxdartについて調べてみる 第5回目 -Periodic, Never- 【rxdart】 - 狐好きぷろぐらまー
動作環境
動作環境は以下の通りです。
項目 | バージョン |
---|---|
Dart | 2.6.1 |
rxdart | 0.24.0 |
この記事はrxdartが0.22.x以前のバージョンで書いていたこの記事を0.23.0以降のバージョンに対応した記事です。
0.22.xから0.23.xにバージョンが上がったタイミングの変更内容は以下のサイトをご覧ください。
以下にRaceオペレータとRetryオペレータについて記載します。 オペレータの名称はRx、rxdart、Stream APIの順で記載します。
Ampオペレータ (rxdart: Rx.race<T>拡張メソッド / Stream API: RaceStream<T>クラス)
Rxでは、Ampという名称ですがrxdartではRaceというオペレータが同じ動きをします。他の言語ではほとんどAmbですがPHP(RxPHP)がRaceという名前で定義されています。
個人的には、できれば統一してほしいなぁと思います。。
シグネチャ
rxdart
Stream<T> race(Iterable<Stream<T>> streams)
Stream API
RaceStream<T> RaceStream<T>(Iterable<Stream<T>> streams)
処理の概要
引数のStreamの中から一番最初に作成されたStreamを発行します。
それ以外のStreamは発行しません。
以下にサンプルを示します。
/// rxdartのRace(Amb)オペレータ void studyRace() { // 複数のStreamの内一番最初に作成されたStreamを発行し // それ以外のStreamは発行しません Rx.race([ Rx.timer(1, Duration(seconds: 2)), Rx.timer(2, Duration(seconds: 1)), Rx.timer(3, Duration(seconds: 3)), ]).listen(print, onDone: () => print('done.')); } // 実行結果 // 2 // done. /// Stream APIのRace(Amb)オペレータ void studyRaceStream() { RaceStream<int>([ TimerStream<int>(1, Duration(seconds: 2)), TimerStream<int>(2, Duration(seconds: 1)), TimerStream<int>(3, Duration(seconds: 3)), ]).listen(print, onDone: () => print('done.')); } // 実行結果 // 2 // done.
サンプルの実行結果から、2つ目のStreamが1番最初に作成される為、2という値が発行されています。
その後、他のStreamが発行されることはなく Streamが購読完了処理が行われています。
Retryオペレータ (rxdart: Rx.retry<T>拡張メソッド / Stream API: RetryStream<T>クラス)
シグネチャ
rxdart
Stream<T> retry(Stream<T> streamFactory(), [int count])
Stream API
RetryStream<T> RetryStream<T>(Stream<T> streamFactory(), [int count])
処理の概要
Streamが正常に終了するまで、指定された回数リトライするStreamを作成します。
回数が指定されていない場合は、無制限にリトライします。
指定された回数リトライしてもStreamが正常に終了していない場合、RetryErrorが発行されます。
RetryErrorから発行された値には、エラー情報とStackTraceが含まれます。
以下にサンプルを示します。
/// rxdartのRetryオペレータサンプル void studyRetry() async { // 正常に値が処理された場合は、そのまま完了処理が行われる。 Rx.retry(() => Stream.value(1)) .listen(print, onDone: () => print('done.')); // 間をあけるため、少し待つ await Future.delayed(Duration(milliseconds: 500)); print('-----'); // 指定された回数失敗した時のStreamでRetryErrorが発行される。 // 今回は2を指定しているため、3回目の失敗でRetryErrorが発行される。 var val = 1; Rx.retry( () =>Stream.value(val++) .concatWith([Stream.error(Error())]), 2) .listen((x) => print('listen: $x'), onDone: () => print('done.'), onError: (e, s) => print('error: $e'), cancelOnError: true); } // 実行結果 // 1 // done. // ----- // listen: 1 // listen: 2 // listen: 3 // error: Received an error after attempting 2 retries /// Stream APIのRetryオペレータサンプル void studyRetryStream() async { RetryStream<int>(() => Stream<int>.value(1)) .listen(print, onDone: () => print('done.')); await Future.delayed(Duration(milliseconds: 500)); print('-----'); var val = 1; RetryStream<int>( () => ConcatStream( [Stream<int>.value(val++), Stream.error(Error())]), 2) .listen( (x) => print('listen: $x'), onDone: () => print('done.'), onError: (e, s) => print('error: $e'), cancelOnError: true, ); } // 実行結果 // 1 // done. // ----- // listen: 1 // listen: 2 // listen: 3 // error: Received an error after attempting 2 retries
サンプルの実行結果から、指定された回数を超えた段階でStreamが正常に終了していない場合は、RetryErrorが発行されていることが分かります。
また、リトライ処理時に再度発行するStreamが新しく作成されていることが分かります。
API処理や外部通信のリトライ処理に役に立ちそうです。
雑感
コレクション(Iterable)にはないRx特有の名前が沢山あり、なかなか調べるのが大変ですが、この調子で書いていきたいと思います。
また新しくオペレータを知った時に、元々知っているオペレータと組み合わせればいい使い方ができる等組み合わせて操作できる幅が徐々に広がっていることがモチベーションになっています。
ただ、今はただひたすらrxdartの記載順に調べていっているので、余力があれば関連のあるオペレータをまとめていけたらなと思います。
ここまで読んで頂き、ありがとうございました。
ここで書いたサンプルコードは以下のリポジトリにあります。