狐好きぷろぐらまー

狐好きプログラマーのブログです。

【Dart】rxdartについて調べてみる 第4回目 -Defer, Merge- 【rxdart】

こんにちは。 pregum_foxです。

今回は、rxdartのDeferオペレータとMergeオペレータについて調べていきます。

この記事は0.22.x以前のrxdartで書かれた記事です。

この記事に書かれているサンプルコードは0.23.0以降のrxdartでは動きません。0.23.0に対応した記事については以下の記事をご覧ください。

pregum-fox.hatenablog.jp

前回までの記事です。

目次です。

動作環境

動作環境は以下の通りです。

項目 バージョン
Dart 2.5.1
rxdart 0.22.2

以下にDeferオペレータとMergeオペレータについて記載します。 オペレータの名称はRx、rxdart、Stream APIの順で記載します。

Deferオペレータ (rxdart: Observable<T>.defer)ファクトリコンストラクタ / Stream API: DeferStream<T>クラス)

シグネチャ

  • rxdart
    Observable<T> Observable<T>.defer(Stream<T> streamFactory(), {bool reusable = false})

  • Stream API
    DeferStream<T> DeferStream<T>(Stream<T> streamFactory(), {bool reusable = false})

ご存知かと思いますが、{}で囲まれている引数はNamed parametersと呼ばれる要は名前付き省略可能引数です。省略した場合は今回の場合はfalseがデフォルト値として設定されます。似た記法でPositioned parametersが存在します。こちらは位置指定パラメータです。こちらは引数の順番が定義の順番と同じ必要がある引数です。 詳細はこちらをご覧ください。

処理の概要

購読者が購読するまで待機し、購読されたタイミングで引数のファクトリ関数(streamFactory)でObservableな値を作成します。
デフォルトでは、DeferStreamsはシングルサブスクリプションなので、複数購読を行おうとするとエラーが発生します。
ただし、reusable引数をtrueにするとブロードキャストになるので、複数購読が行うことができます。

サンプルを以下に示します。

// rxdartのDeferオペレータ(reusable=true)
void studyDeferWithReusable() async {
  var count = 0;
  // reusableにtrueを設定すると、ブロードキャストになるので複数購読可能
  var deferSubscription = Observable<String>.defer(() {
    print('hello world: ${++count}');
    return Observable<String>.just('defer push.');
  }, reusable: true);

  deferSubscription.listen((str) => print('listen1: $str'));
  // 複数Observableが発行されていることがわかるよう少し待機
  await Future.delayed(Duration(milliseconds: 10));
  // 二人目の購読者を登録してもエラーは発生しない
  deferSubscription.listen((str) => print('listen2: $str'));
}
// 実行結果
// hello world: 1
// listen1: defer push.
// hello world: 2
// listen2: defer push.
  
// rxdartのDeferオペレータ(reusable=false)
void studyDefer() async {
  var count = 0;
  // reusable引数をしていないため、reusableにはfalseが設定されている
  // その為シングルサブスクリプションなので、複数購読しようとするとエラーが発生する
  var deferSubscription = Observable<String>.defer(() {
    print('hello world: ${++count}');
    return Observable<String>.just('defer push.');
  });

  deferSubscription.listen((str) => print('listen1: $str'));
  // 複数Observableが発行されていることがわかるよう少し待機
  await Future.delayed(Duration(milliseconds: 10));
  // 下記の行をコメントアウトして実行するとエラーが発生する。
  // deferSubscription.listen((str) => print('listen2: $str'));
}
// 実行結果
// hello world: 1
// listen1: defer push.
  
// ここまで rxdart
// ---------------------------------------------
// ---------------------------------------------
// ここから Stream API
  
// Stream APIのDeferオペレータ(reusable=true)
void studyDeferStreamWithReusable() async {
  var count = 0;
  var deferSubscription = DeferStream<String>(() {
    print('hello world: ${++count}');
    return Stream<String>.value('defer push.');
  }, reusable: true);

  deferSubscription.listen((str) => print('listen1: $str'));
  // 複数Observableが発行されていることがわかるよう少し待機
  await Future.delayed(Duration(milliseconds: 10));
  // 二人目の購読者を登録してもエラーは発生しない
  deferSubscription.listen((str) => print('listen2: $str'));
}
// 実行結果
// hello world: 1
// listen1: defer push.
// hello world: 2
// listen2: defer push.
  
// Stream APIのDeferオペレータ(reusable=false)
void studyDeferStream() async {
  var count = 0;
  // reusable引数をしていないため、reusableにはfalseが設定されている
  // その為シングルサブスクリプションなので、複数購読しようとするとエラーが発生する
  var deferSubscription = DeferStream<String>(() {
    print('hello world: ${++count}');
    return Stream<String>.value('defer push.');
  });

  deferSubscription.listen((str) => print('listen1: $str'));
  // 複数Observableが発行されていることがわかるよう少し待機
  await Future.delayed(Duration(milliseconds: 10));
  // 下記の行をコメントアウトするとエラーが発生する。
  // deferSubscription.listen((str) => print('listen2: $str'));
}
// 実行結果
//  world: 1
// listen1: defer push.

サンプルの実行結果から、reusableをtrueにしておくとその都度Observableな値が生成されていることが分かります。
ただし、デフォルトがシングルサブスクリプションなので複数購読したい場合はreusableパラメータをtrueにしておく必要がありますのでご注意ください。

Mergeオペレータ (rxdart: Observable<T>.mergeファクトリコンストラクタ / Stream API: MergeStream<T>クラス)

シグネチャ

  • rxdart
    Observable<T> Observable<T>.merge(Iterable<Stream<T>> streams)

  • Stream API
    MergeStream<T> MergeStream<T>(Iterable<Stream<T>> streams)

処理の概要

このMergeオペレータは以前の記事に書いたConcatオペレータに似た動作をするオペレータです。

Mergeオペレータは、複数のObservableな値をまとめて単一のObservableに変換します。
発行される順番は、各Observableな値の処理によります。 サンプルを以下に示します。

// rxdartのMergeオペレータサンプル
void studyMerge() {
  // mergeは一度にすべてのObservableな値を発行するので、表示される順番はそれぞれのObservableな値次第です。
  // 最初にすべてのObservableな値が発行されるので、'do on listen.'は最初に表示されます。
  Observable<int>.merge([
    Observable<int>.just(1),
    Observable<int>.timer(2, Duration(seconds: 2)),
    Observable<int>.just(3),
    Observable<int>.timer(4, Duration(seconds: 1))
        .doOnListen(() => print('do on listen.')),
  ]).listen((i) => print('listen: $i'));
}
// 実行結果
// do on listen.
// listen: 1
// listen: 3
// listen: 4
// listen: 2
  
// Stream APIのMergeオペレータサンプル
void studyMergeStream() {
  MergeStream<int>([
    Stream<int>.value(1),
    TimerStream<int>(2, Duration(seconds: 2)),
    Stream<int>.value(3),
    TimerStream<int>(4, Duration(seconds: 1))
        .transform(DoStreamTransformer(onListen: () => print('do on listen.'))),
  ]).listen((i) => print('listen: $i'));
}
// 実行結果
// do on listen.
// listen: 1
// listen: 3
// listen: 4
// listen: 2

上記のサンプルから、発行される順番はObservableな値に依存していることが分かります。

Concatオペレータとの比較

以前のConcatオペレータとの違いは、順番に発行されるか否かです。 以下にConcatのサンプルを記載します。

// rxdartのConcatオペレータ
void studyConcat() {
  // concatは実行中のObservableが完了するまで、次のObservableが購読されないため
  // 'do on listen.'は3つ目のObservableが完了してから表示されます。
  // timerを使用してもその秒数待った後、順番に出力されます。
  Observable<int>.concat([
    Observable<int>.just(1),
    Observable<int>.timer(2, Duration(seconds: 2)),
    Observable<int>.just(3),
    Observable<int>.timer(4, Duration(seconds: 1))
        .doOnListen(() => print('do on listen.')),
  ]).listen((i) => print('listen: $i'));
}
// 実行結果
// listen: 1
// listen: 2
// listen: 3
// do on listen.
// listen: 4
  
// Stream APIのConcatオペレータ
void studyConcatStream() {
  ConcatStream<int>([
    Stream<int>.value(1),
    TimerStream<int>(2, Duration(seconds: 2)),
    Stream<int>.value(3),
    TimerStream<int>(4, Duration(seconds: 1))
        .transform(DoStreamTransformer(onListen: () => print('do on listen.'))),
  ]).listen((i) => print('listen: $i'));
}
// 実行結果
// listen: 1
// listen: 2
// listen: 3
// do on listen.
// listen: 4

前述のMergeオペレータと実行結果を比べてもらうと、Concatオペレータは、連結した順番に値が発行されていて、Mergeオペレータは前の値の処理が完了する前に発行します。

雑感

正直なところ、あまり違いが分からないまま記事を書いている所はあったりするので今後他のオペレータを調べていく中でわかりましたらその都度追記していきたいと思います。

後、rxdartに記載しているオペレータで記載できていない物は70個弱ですが、気長に調べていこうと思います。

ここまで読んで頂き、ありがとうございました。

参考サイト