狐好きぷろぐらまー

狐好きプログラマーのブログです。

【0.23.0対応】rxdartについて調べてみる 第3回目 -concatとconcatEager-【rxdart】

こんにちは。 pregum_foxです。

今回は、rxdartのページの最初に載ってあったconcatとconcatEagerについて調べてみました。

前回までの記事は以下をご覧ください。

動作環境

動作環境は以下の通りです。

項目 バージョン
Dart 2.6.1
rxdart 0.24.0

以下にconcatオペレータとconcatEagerオペレータについて記載します。

今回は以前rxdartが0.22.xのバージョンの頃に書いていたこちらの記事を0.23.0以降のバージョンに対応したものです。

0.22.xから0.23.xにバージョンが上がったタイミングの変更内容は以下のサイトをご覧ください

concat オペレータ / concatEager オペレータ

基本的にはconcatオペレータもconcatEagerオペレータもStreamを連結させて、1つのStreamとして振る舞います。
連結した順番に値が発行され、発行が完了したら次の値が発行されます。 発行する順番をこちらで設定しておきたい時に便利なオペレータです。

Concatオペレータ (rxdart: Rx.concat<T>拡張メソッド / Stream API: ConcatStream<T>クラス)

シグネチャ

  • rxdart
    Stream<T> concat<T>(Iterable<Stream<T>> streams)

  • Stream API
    ConcatStream<T> ConcatStream(Iterable<Stream<T>> streams)

処理の概要

前のStreamが完了してから、次のStreamを実行するよう連結します。
concatは、Streamが購読されると順番に発行します。
以下にサンプルを示します。

void studyConcat() {
  // concatは実行中のStreamが完了するまで、次のStreamが購読されないため
  // 'do on listen.'は3つ目のStreamが完了してから表示されます。
  // timerを使用してもその秒数待った後、順番に出力されます。
  Rx.concat<int>([
    Stream<int>.value(1),
    // Stream<int>.timer(2, Duration(seconds: 2)),
    Rx.timer(2, Duration(seconds: 2)),
    Stream<int>.value(3),
    Rx.timer(4, Duration(seconds: 1))
        .doOnListen(() => print('do on listen.')),
  ]).listen((i) => print('listen: $i'));
}
// 実行結果
// listen: 1
// listen: 2
// listen: 3
// do on listen.
// listen: 4
  
void studyConcatStream() {
  ConcatStream<int>([
    Stream<int>.value(1),
    TimerStream<int>(2, Duration(seconds: 2)),
    Stream<int>.value(3),
    TimerStream<int>(4, Duration(seconds: 1))
        .transform(DoStreamTransformer(onListen: () => print('do on listen.'))),
  ]).listen((i) => print('listen: $i'));
}
// 実行結果
// listen: 1
// listen: 2
// listen: 3
// do on listen.
// listen: 4

Concatオペレータは複数のStreamをまとめるオペレータです。
そしてまとめた順番の通りに値を発行します。
後述するconcatEagerと違う箇所は、発行と購読がStreamごとに行われることです。
その為、3回目の値の発行の後に4つ目のStreamのdoOnListen()が処理されています。

ConcatEagerオペレータ (rxdart: Rx.concatEager<T>拡張メソッド / Stream API: ConcatEagerStream<T> クラス)

ConcatEagerオペレータはRxでは正規のオペレータ名では存在しなかったので、リンクはつけておりません、ご了承ください。

シグネチャ

  • rxdart
    Stream<T> concatEager<T>(Iterable<Stream<T>> streams)

  • Stream API
    ConcatEagerStream<T> ConcatEagerStream(Iterable<Stream<T>> streams)

処理の概要

前のStreamが完了してから、次のStreamを実行するよう連結します。
concatEagerは、Streamの発行前に一度全て処理しておいてから順番に発行します。
以下にサンプルを示します。

void studyConcatEager() {
  Rx.concatEager<int>([
    Stream.value(1),
    Rx.timer(2, Duration(seconds: 2)),
    Stream.value(3),
    Rx.timer(4, Duration(seconds: 1))
        .doOnListen(() => print('do on listen.')),
  ]).listen((i) => print('listen: $i'));
}
// 実行結果
// do on listen.
// listen: 1
// listen: 2
// listen: 3
// listen: 4
  
void studyConcatEagerStream() {
  ConcatEagerStream<int>([
    Stream<int>.value(1),
    TimerStream<int>(2, Duration(seconds: 2)),
    Stream<int>.value(3),
    TimerStream<int>(4, Duration(seconds: 1))
        .transform(DoStreamTransformer(onListen: () => print('do on listen.'))),
  ]).listen((i) => print('listen: $i'));
}
// 実行結果
// do on listen.
// listen: 1
// listen: 2
// listen: 3
// listen: 4

ConcatEagerオペレータは複数のStreamをまとめるオペレータです。
そしてまとめた順番の通りに値を発行します。
concatEagerは、最初のStreamの発行前に全て購読しておきます。 その為、1回目の値の発行の前に4つ目のStreamのdoOnListen()が処理されています。

concat / concatEagerに関連するオペレータ

調べていく中で、concatやconcatEagerに類似するオペレータについてこちら探すのに良さそうなサイトがありましたのでいくつか記載します。

  • Mergeオペレータ (Rx: Rx.merge<T>拡張メソッド / Stream API: MergeStream<T> クラス)
    • 連結後、順番に関係なく値を発行します。
  • Zipオペレータ (Rx: Rx.zipN(Nは2~9)拡張メソッド / Stream API: ZipStream クラス(staticメソッドにzip2~zip9まで存在する))
    • 複数のStreamを順番に組み合わせて1つのStreamを発行します。
    • 複数のStreamを全て購読したタイミングでStreamが生成され、発行されます。
  • CombineLatest (Rx: Rx.combineLatestN(Nは2~9) 拡張メソッド / Stream API: CombineLatestStream クラス (staticメソッドにcombine2~combine9まで存在))
    • 複数のStreamを順番に組み合わせて、1つのStreamを発行します。
    • 複数のStreamの内いずれかを購読したタイミングで値が生成され、発行されます。

ここだけの説明だけでは私は理解できなかったので、次回以降で調べてみようと思います。

雑感

オペレータは、各言語によって名前が違ったりするので探すのも大変ですね。
concatEagerはRxの基本のオペレータではなかったので、 ReactiveX.ioにどのオペレータをするかの指標が記載されているので、参考にさせてもらっています。

concatとconcatEagerだけだとあまりrxdartがどこで使用できるか分かりづらいと思いますが、様々なオペレータが沢山あるので、調べつつ記事にしていきたいと思います。

未だに、DartPadにgistにアップしたコードを載せる方法がよく分かっていないので、そろそろ調べたいと思います。

ここまで読んで頂き、ありがとうございました。

参考サイト