狐好きぷろぐらまー

狐好きプログラマー見習いのブログです。

【0.23.0対応】rxdartについて調べてみる 第4回目 -Defer, Merge- 【rxdart】

こんにちは。 pregum_foxです。

今回は、rxdartのDeferオペレータとMergeオペレータについて調べていきます。

前回までの記事です。

目次です。

動作環境

動作環境は以下の通りです。

項目 バージョン
Dart 2.6.1
rxdart 0.24.0

以下にDeferオペレータとMergeオペレータについて記載します。 オペレータの名称はRx、rxdart、Stream APIの順で記載します。

この記事はrxdartが0.22.x以前のバージョンで書いていたこの記事を0.23.0以降のバージョンに対応した記事です。

0.22.xから0.23.xにバージョンが上がったタイミングの変更内容は以下のサイトをご覧ください。

Deferオペレータ (rxdart: Rx.defer<T>拡張メソッド / Stream API: DeferStream<T>クラス)

シグネチャ

  • rxdart
    Stream<T> defer<T>(Stream<T> streamFactory(), {bool reusable = false})

  • Stream API
    DeferStream<T> DeferStream<T>(Stream<T> streamFactory(), {bool reusable = false})

ご存知かと思いますが、{}で囲まれている引数はNamed parametersと呼ばれる名前付き省略可能引数です。省略した場合は今回の場合はfalseがデフォルト値として設定されます。似た記法でPositioned parametersが存在します。こちらは位置指定パラメータです。こちらは引数の順番が定義の順番と同じ必要がある引数です。 詳細はこちらをご覧ください。

処理の概要

購読者が購読するまで待機し、購読されたタイミングで引数のファクトリ関数(streamFactory)でStreamを作成します。
デフォルトでは、DeferStreamsはシングルサブスクリプションなので、複数購読を行おうとするとエラーが発生します。
ただし、reusable引数をtrueにするとブロードキャストになるので、複数購読が行うことができます。

サンプルを以下に示します。

// reusable=trueのRx.defer<T>
// rxdartのDeferオペレータ
void studyDeferWithReusable() async {
  var count = 0;
  // reusableにtrueを設定すると、ブロードキャストになるので複数購読可能
  var deferSubscription = Rx.defer(() {
    print('hello world: ${++count}');
    return Stream.value('defer push.');
  }, reusable: true);

  deferSubscription.listen((str) => print('listen1: $str'));
  // 複数Streamが発行されていることがわかるよう少し待機
  await Future.delayed(Duration(milliseconds: 10));
  // 二人目の購読者を登録してもエラーは発生しない
  deferSubscription.listen((str) => print('listen2: $str'));
}
// 実行結果
// hello world: 1
// listen1: defer push.
// hello world: 2
// listen2: defer push.

// reuasable=falseのRx.defer<T>
void studyDefer() async {
  var count = 0;
  // reusable引数をしていないため、reusableにはfalseが設定されている
  // その為シングルサブスクリプションなので、複数購読しようとするとエラーが発生する
  var deferSubscription = Rx.defer(() {
    print('hello world: ${++count}');
    return Stream.value('defer push.');
  });

  deferSubscription.listen((str) => print('listen1: $str'));
  // 複数Streamが発行されていることがわかるよう少し待機
  await Future.delayed(Duration(milliseconds: 10));
  // 下記の行をコメントアウトして実行するとエラーが発生する。
  // deferSubscription.listen((str) => print('listen2: $str'));
}
// 実行結果
// hello world: 1
// listen1: defer push.
  
// Stream APIのDeferオペレータ
void studyDeferStreamWithReusable() async {
  var count = 0;
  var deferSubscription = DeferStream<String>(() {
    print('hello world: ${++count}');
    return Stream<String>.value('defer push.');
  }, reusable: true);

  deferSubscription.listen((str) => print('listen1: $str'));
  // 複数Streamが発行されていることがわかるよう少し待機
  await Future.delayed(Duration(milliseconds: 10));
  // 二人目の購読者を登録してもエラーは発生しない
  deferSubscription.listen((str) => print('listen2: $str'));
}
// 実行結果
// hello world: 1
// listen1: defer push.
// hello world: 2
// listen2: defer push.

void studyDeferStream() async {
  var count = 0;
  // reusable引数をしていないため、reusableにはfalseが設定されている
  // その為シングルサブスクリプションなので、複数購読しようとするとエラーが発生する
  var deferSubscription = DeferStream<String>(() {
    print('hello world: ${++count}');
    return Stream<String>.value('defer push.');
  });

  deferSubscription.listen((str) => print('listen1: $str'));
  // 複数Streamが発行されていることがわかるよう少し待機
  await Future.delayed(Duration(milliseconds: 10));
  // 下記の行をコメントアウトするとエラーが発生する。
  // deferSubscription.listen((str) => print('listen2: $str'));
}
// 実行結果
// hello world: 1
// listen1: defer push.

サンプルの実行結果から、reusableをtrueにしておくとその都度Streamな値が生成されていることが分かります。
ただし、デフォルトがシングルサブスクリプションなので複数購読したい場合はreusableパラメータをtrueにしておく必要がありますのでご注意ください。

Mergeオペレータ (rxdart: Rx.merge<T>拡張メソッド / Stream API: MergeStream<T>クラス)

シグネチャ

  • rxdart
    Stream<T> merge<T>(Iterable<Stream<T>> streams)

  • Stream API
    MergeStream<T> MergeStream<T>(Iterable<Stream<T>> streams)

処理の概要

このMergeオペレータは以前の記事に書いたConcatオペレータに似た動作をするオペレータです。

Mergeオペレータは、複数のStreamをまとめて単一のStreamに変換します。
発行される順番は、各Streamの処理によります。 サンプルを以下に示します。

// rxdartのMergeオペレータ
void studyMerge() {
  // mergeは一度にすべてのStreamを発行するので、表示される順番はそれぞれのStream次第です。
  // 最初にすべてのStreamが発行されるので、'do on listen.'は最初に表示されます。
  Rx.merge([
    Stream.value(1),
    Rx.timer(2, Duration(seconds: 2)),
    Stream.value(3),
    Rx.timer(4, Duration(seconds: 1))
        .doOnListen(() => print('do on listen.')),
  ]).listen((i) => print('listen: $i'));
}
// 実行結果
// do on listen.
// listen: 1
// listen: 3
// listen: 4
// listen: 2

// Stream APIのMergeオペレータ
void studyMergeStream() {
  MergeStream<int>([
    Stream<int>.value(1),
    TimerStream<int>(2, Duration(seconds: 2)),
    Stream<int>.value(3),
    TimerStream<int>(4, Duration(seconds: 1))
        .transform(DoStreamTransformer(onListen: () => print('do on listen.'))),
  ]).listen((i) => print('listen: $i'));
}
// 実行結果
// do on listen.
// listen: 1
// listen: 3
// listen: 4
// listen: 2

上記のサンプルから、発行される順番はStreamに依存していることが分かります。

Concatオペレータとの比較

以前のConcatオペレータとの違いは、順番に発行されるか否かです。 以下にConcatのサンプルを再掲します。

// rxdartのConcatオペレータ
void studyConcat() {
  // concatは実行中のStreamが完了するまで、次のStreamが購読されないため
  // 'do on listen.'は3つ目のStreamが完了してから表示されます。
  // timerを使用してもその秒数待った後、順番に出力されます。
  Rx.concat<int>([
    Stream<int>.value(1),
    // Stream<int>.timer(2, Duration(seconds: 2)),
    Rx.timer(2, Duration(seconds: 2)),
    Stream<int>.value(3),
    Rx.timer(4, Duration(seconds: 1))
        .doOnListen(() => print('do on listen.')),
  ]).listen((i) => print('listen: $i'));
}
// 実行結果
// listen: 1
// listen: 2
// listen: 3
// do on listen.
// listen: 4

// Stream APIのconcatオペレータ
void studyConcatStream() {
  ConcatStream<int>([
    Stream<int>.value(1),
    TimerStream<int>(2, Duration(seconds: 2)),
    Stream<int>.value(3),
    TimerStream<int>(4, Duration(seconds: 1))
        .transform(DoStreamTransformer(onListen: () => print('do on listen.'))),
  ]).listen((i) => print('listen: $i'));
}
// 実行結果
// listen: 1
// listen: 2
// listen: 3
// do on listen.
// listen: 4

前述のMergeオペレータと実行結果を比べてもらうと、Concatオペレータは、連結した順番に値が発行されていて、Mergeオペレータは前の値の処理が完了する前に発行します。

雑感

正直なところ、あまり違いが分からないまま記事を書いている所はあったりするので今後他のオペレータを調べていく中でわかりましたらその都度追記していきたいと思います。

後、rxdartに記載しているオペレータで記載できていない物は70個弱ですが、気長に調べていこうと思います。

ここまで読んで頂き、ありがとうございました。

参考サイト