狐好きぷろぐらまー

狐好きプログラマーのブログです。

【0.23.0対応】rxdartについて調べてみる 第6回目 -Race, Retry- 【rxdart】

こんにちは。 pregum_foxです。

今回は、RaceオペレータとRetryオペレータについて調べていきます。

目次です。

前回までの記事です。

動作環境

動作環境は以下の通りです。

項目 バージョン
Dart 2.6.1
rxdart 0.24.0

この記事はrxdartが0.22.x以前のバージョンで書いていたこの記事を0.23.0以降のバージョンに対応した記事です。

0.22.xから0.23.xにバージョンが上がったタイミングの変更内容は以下のサイトをご覧ください。

以下にRaceオペレータとRetryオペレータについて記載します。 オペレータの名称はRx、rxdart、Stream APIの順で記載します。

Ampオペレータ (rxdart: Rx.race<T>拡張メソッド / Stream API: RaceStream<T>クラス)

Rxでは、Ampという名称ですがrxdartではRaceというオペレータが同じ動きをします。他の言語ではほとんどAmbですがPHP(RxPHP)がRaceという名前で定義されています。
個人的には、できれば統一してほしいなぁと思います。。

シグネチャ

  • rxdart
    Stream<T> race(Iterable<Stream<T>> streams)

  • Stream API
    RaceStream<T> RaceStream<T>(Iterable<Stream<T>> streams)

処理の概要

引数のStreamの中から一番最初に作成されたStreamを発行します。
それ以外のStreamは発行しません。
以下にサンプルを示します。

/// rxdartのRace(Amb)オペレータ
void studyRace() {
  // 複数のStreamの内一番最初に作成されたStreamを発行し
  // それ以外のStreamは発行しません
  Rx.race([
    Rx.timer(1, Duration(seconds: 2)),
    Rx.timer(2, Duration(seconds: 1)),
    Rx.timer(3, Duration(seconds: 3)),
  ]).listen(print, onDone: () => print('done.'));
}
// 実行結果
// 2
// done.

/// Stream APIのRace(Amb)オペレータ
void studyRaceStream() {
  RaceStream<int>([
    TimerStream<int>(1, Duration(seconds: 2)),
    TimerStream<int>(2, Duration(seconds: 1)),
    TimerStream<int>(3, Duration(seconds: 3)),
  ]).listen(print, onDone: () => print('done.'));
}
// 実行結果
// 2
// done.

サンプルの実行結果から、2つ目のStreamが1番最初に作成される為、2という値が発行されています。
その後、他のStreamが発行されることはなく Streamが購読完了処理が行われています。

Retryオペレータ (rxdart: Rx.retry<T>拡張メソッド / Stream API: RetryStream<T>クラス)

シグネチャ

  • rxdart
    Stream<T> retry(Stream<T> streamFactory(), [int count])

  • Stream API
    RetryStream<T> RetryStream<T>(Stream<T> streamFactory(), [int count])

処理の概要

Streamが正常に終了するまで、指定された回数リトライするStreamを作成します。
回数が指定されていない場合は、無制限にリトライします。
指定された回数リトライしてもStreamが正常に終了していない場合、RetryErrorが発行されます。
RetryErrorから発行された値には、エラー情報とStackTraceが含まれます。

以下にサンプルを示します。

/// rxdartのRetryオペレータサンプル
void studyRetry() async {
  // 正常に値が処理された場合は、そのまま完了処理が行われる。
  Rx.retry(() => Stream.value(1))
      .listen(print, onDone: () => print('done.'));

  // 間をあけるため、少し待つ
  await Future.delayed(Duration(milliseconds: 500));
  print('-----');

  // 指定された回数失敗した時のStreamでRetryErrorが発行される。
  // 今回は2を指定しているため、3回目の失敗でRetryErrorが発行される。
  var val = 1;
  Rx.retry(
          () =>Stream.value(val++)
              .concatWith([Stream.error(Error())]),
          2)
      .listen((x) => print('listen: $x'),
          onDone: () => print('done.'),
          onError: (e, s) => print('error: $e'),
          cancelOnError: true);
}
// 実行結果
// 1
// done.
// -----
// listen: 1
// listen: 2
// listen: 3
// error: Received an error after attempting 2 retries
  
/// Stream APIのRetryオペレータサンプル
void studyRetryStream() async {
  RetryStream<int>(() => Stream<int>.value(1))
      .listen(print, onDone: () => print('done.'));
  
  await Future.delayed(Duration(milliseconds: 500));
  print('-----');
  
  var val = 1;
  RetryStream<int>(
          () => ConcatStream(
              [Stream<int>.value(val++), Stream.error(Error())]),
          2)
      .listen(
    (x) => print('listen: $x'),
    onDone: () => print('done.'),
    onError: (e, s) => print('error: $e'),
    cancelOnError: true,
  );
}
// 実行結果
// 1
// done.
// -----
// listen: 1
// listen: 2
// listen: 3
// error: Received an error after attempting 2 retries

サンプルの実行結果から、指定された回数を超えた段階でStreamが正常に終了していない場合は、RetryErrorが発行されていることが分かります。
また、リトライ処理時に再度発行するStreamが新しく作成されていることが分かります。
API処理や外部通信のリトライ処理に役に立ちそうです。

雑感

コレクション(Iterable)にはないRx特有の名前が沢山あり、なかなか調べるのが大変ですが、この調子で書いていきたいと思います。
また新しくオペレータを知った時に、元々知っているオペレータと組み合わせればいい使い方ができる等組み合わせて操作できる幅が徐々に広がっていることがモチベーションになっています。
ただ、今はただひたすらrxdartの記載順に調べていっているので、余力があれば関連のあるオペレータをまとめていけたらなと思います。

ここまで読んで頂き、ありがとうございました。

ここで書いたサンプルコードは以下のリポジトリにあります。

参考サイト