狐好きぷろぐらまー

狐好きプログラマー見習いのブログです。

【0.23.0対応】rxdartについて調べてみる 第2回目 -ObservableとSubject系クラス- 【rxdart】

こんにちは。 pregum_foxです。

今回は、rxdartでメインとなるRx、PublishSubject、BehaviorSubject、ReplaySubjectについて調べてみました。

0.23.0に対応したコードに変更しました。

目次です。

動作環境

動作環境は以下の通りです

項目 バージョン
Dart 2.6.1
rxdart 0.24.0

0.22.xから0.23.xで変わったこと

rxdartは0.23.0にバージョンが上がるタイミングで破壊的変更が加わり、今までのソースコードをそのままビルドすることができなくなりました。

Release Extension Methods Support · ReactiveX/rxdart · GitHub

その対策として0.22.xで書かれたrxdartのソースを0.23.xに対応するコードに変更してくれるツールがありますが、実際に使ってみた所全て変更はできなくて、結局手作業で修正していきました。。。

変更していたコードは以前rxdartが0.22.x時代に書いていたものです。

Change the version of rxdart from 0.22.0 to 0.24.0 by Pregum · Pull Request #1 · Pregum/rxdartPractice · GitHub

その変更である程度機械的に変更はできたので私が変更していく中でビルドができない原因になっていた箇所は主に以下の場所でした。

  • Observableクラスのストリーム生成系のstaticメソッドはRxの拡張メソッドになった
    • Observable#range -> Rx.range、 Observable#combineLatest4 -> Rx#combineLatest4、etc.
  • Observable#periodicコンストラクタは削除され、Stream#periodicメソッドを使用するようになった
  • Observable#justファクトリコンストラクタは削除され、Stream#valueファクトリコンストラクタを使用するようになった

基本的にはObservableはRxに置換、それでエラーが出たらStreamにすれば概ねエラーは消える印象でした。

後はジェネリクスの位置でエラーが出たりでなかったりしていた印象です。

ここで使用する用語の説明

前回では、観察役と購読者や観察や購読等似たような用語が複数あって理解しづらくなっていたので、ここで一旦記事内で使用する用語の説明をしたいと思います。ですので他のサイトとは違う意味で使用しているかもしれませんがご了承ください。

  • 購読(観察/listen)
    listenしているstreamから発行された値を受け取ることを指しています。
    なので購読を登録することはstreamをlistenすることです。

  • 購読者(観察役/listener)
    streamを購読しているオブジェクトの事です。

  • 発行(出力)
    streamから値が購読者に向けて値を出力することです。
    発行された値に対して購読者は様々な処理を行います。

コードでは以下のように記載するようにします。

  // 購読可能なオブジェクト(Stream)を作成
  final Observable<String> observable = subject.stream;
  // 購読者を登録(作成)
  final StreamSubscription<String> observer1 = observable.listen(
      (str) => print('observable1 say: $str'));

  // 値'step 1'を発行
  subject.add('step 1');

Rxクラス

Rxクラスは、ストリームを生成するstaticメソッドを持っているクラスです。 生成されるストリームの型はStream<T>クラスなので、基本的にはDartのStreamクラスと同じような使い方が可能です。

rxdartを使用する時は、以前はObservable<T>に対してmapやwhere等のオペレータを適用して、購読した値に対して様々な処理を行っていましたが、0.23.xからはStream<T>クラスの拡張メソッドにオペレータが移動したので、rxdartを意識することは減ったと思います。

こちらのrxdartのgithubのreadmeを見ると、Observableで使用できるほとんどのメソッドはStreamでも用意されています。
中にはObservableでのみ用意されているメソッドがいくつかあります(Observable.periodic()等)。
それについては、何かしらの代替案があるはずなので両者の大きな違いはないかと思います。
また、Stream<T>はデフォルトでシングルサブスクリプションなので複数の購読者を登録する場合は、asBroadcastStream()メソッドを使用し、ブロードキャストに変換してから購読者を登録してください。

詳しくはこちらをご覧ください。

GitHub - ReactiveX/rxdart: The Reactive Extensions for Dart


それでは、Streamで値を発行する簡単なファクトリコンストラクタ / メソッドをいくつか紹介します。
ファクトリコンストラクタとは、factoryキーワードを使用して作成したコンストラクタで、singletonパターンやサブクラスを生成する時に使用されます。

Observable<T>.just() ファクトリコンストラク / Stream<T>.value() ファクトリコンストラク

Observable<T>#just()ファクトリコンストラクタは0.23.0から削除されました

代わりにStream<T>#value()ファクトリコンストラクタを使用しましょう。

単一の値を含むStreamを作成します。
購読者が登録されると、値が発行されます。
発行後は、Streamは閉じられます。
サンプルコードは以下の通りです。

void studyJustValue() {
  // 単一の値を含むStreamを作成
  final Stream<String> stream = Stream.value('stream.value');
  print('subscriber create.');
  // 購読者を登録してから値が発行される。
  var streamSubscriber =
      stream.listen(print, onDone: () => print('stream.value done.'));
}
// 実行結果
// subscriber create.
// stream.value
// stream.value done.

Rx.repeat<T>() ファクトリコンストラクタ / RepeatStream<T> クラス

指定された回数Streamを作成するStreamを返します。
回数が指定されていない場合は、無限に繰り返されます。
値が作成される時、現在のループのindexを取得することができます。(0始まり)
サンプルコードは以下の通りです。

void studyRepeat() {
  // 指定された回数分Streamを作成するStreamを作成
  final Stream<String> repeatStream = RepeatStream<String>(
      (int repeatIndex) => Stream.value('st count: $repeatIndex'), 3);
  // 指定された回数分Observableを作成するObservableを作成
  final Stream<String> repeatObservable = Rx.repeat(
      (int repeatIndex) => Stream.value('ob count: $repeatIndex'), 3);
  print('subscriber create.');

  var streamSubscriber =
      repeatStream.listen(print, onDone: () => print('RepeatStream done.'));
  var observableSubscriber = repeatObservable.listen(print,
      onDone: () => print('Rx.repeat done.'));
}
// 実行結果
// subscriber create.
// st count: 0
// ob count: 0
// st count: 1
// ob count: 1
// st count: 2
// ob count: 2
// RepeatStream done.
// Rx.repeat done.

Rx.range<T>() 静的メソッド / RangeStream<T> クラス

指定された範囲内の整数のシーケンスを発行するStreamを返します。
第1引数から第2引数に向かって整数のシーケンスを作成する為、降順のシーケンスを作成する場合はRangeStream(10, 3);のように記載すれば10から3の順に値が発行されます。
サンプルコードを以下に示します。

void studyRange() async {
  // 指定された範囲内の整数のシーケンスを発行するStreamを作成
  final Stream<int> rangeStream = RangeStream(1, 3);
  // 指定された範囲内の整数のシーケンスを発行するObservableを作成
  final Stream<int> rangeObservable = Rx.range(1, 3);
  print('subscriber create.');
  var streamSubscriber = rangeStream.listen((i) => print('stream: $i'),
      onDone: () => print('RangeStream done.'));
  var observableSubscriber = rangeObservable.listen(
      (i) => print('observable: $i'),
      onDone: () => print('Rx.range done.'));

  // 区切るために少し待つ
  await Future.delayed(Duration(milliseconds: 500));
  print('---------------------');

  // 発行される整数のシーケンスは、第1引数から第2引数に向かって発行される。
  final Stream<int> inverseRangeObservable = Rx.range(3, 1);
  var observableSubscriber2 = inverseRangeObservable.listen(print,
      onDone: () => print('inv Rx.range done.'));
}
// 実行結果
// subscriber create.
// stream: 1
// observable: 1
// stream: 2
// observable: 2
// stream: 3
// observable: 3
// RangeStream done.
// Rx.range done.
// ---------------------
// 3
// 2
// 1
// inv Rx.range done.

Observable<T>.error() ファクトリコンストラク/ ErrorStream<T> クラス -> Stream<T>.error()ファクトリコンストラク

Observable<T>#error()ファクトリコンストラクタ、ErrorStream<T>クラスは0.23.0から削除されました

代わりにStream<T>#error()ファクトリコンストラクタを使用しましょう。

エラーを発行するStreamを作成します。
おそらく、テスト用にエラーを発行したい時に役立つと思います。
サンプルコードを以下に示します。

void studyError() {
  // エラーを発行するStreamを作成します。
  final Stream<String> errorStream = Stream.error('Stream.error output.');
  print('subscriber create.');

  var streamSubscriber =
      errorStream.listen(print, onError: (e) => print('stream error: $e'));
}
// 実行結果
// subscriber create.
// stream error: Stream.error output.

Rx.timer<T>() staticメソッド / TimerStream<T> クラス

指定された時間が経過すると、指定された値が発行されるStreamを作成します。
指定された値は、Stream作成時に設定されています。
一定時間経過後、現在の時間を発行する場合、Stream作成時の時間が設定されています。
サンプルコードを以下に示します。

void studyTimer() async {
  // 指定された時間経過すると、指定された値が発行されるStreamを作成
  final Stream<DateTime> timerStream =
      TimerStream(DateTime.now(), Duration(seconds: 2));
      print('create stream that listen 2sec after.');
  // 指定された時間経過すると、指定された値が発行されるObservablerを作成
  final Stream<DateTime> timerObservable =
      Rx.timer(DateTime.now(), Duration(seconds: 5));
      print('create stream that listen 5sec after.');
  print('now: ${DateTime.now()}');
  // 発行される値は、Stream作成時に作成されている。
  var streamSubscriber = timerStream.listen((dt) => print('st listen: $dt'),
      onDone: () => print('TimerStream done. -- now: ${DateTime.now()}'));
  // 発行される値は、Observable作成時に作成されている。
  var observableSubscriber = timerObservable.listen(
      (dt) => print('ob listen: $dt'),
      onDone: () => print('Rx.timer done. -- now: ${DateTime.now()}'));
}
// 実行結果
// create stream that listen 2sec after.
// create stream that listen 5sec after.
// now: 2020-05-04 10:41:35.083006
// st listen: 2020-05-04 10:41:35.082008
// TimerStream done. -- now: 2020-05-04 10:41:37.097631
// ob listen: 2020-05-04 10:41:35.083006
// Rx.timer done. -- now: 2020-05-04 10:41:40.092854

Subject系クラスについて

Streamについては発行する値の生成する方法を記載していました。
ここからはSubject / StreamControllerについて説明していきたいと思います。
Subjectクラスには以下のクラスが実装されています。

上記のクラスの基になるStreams APIでのクラスはStreamController<T>です。 通常の使い方としては、sinkプロパティを通して値を発行し、streamプロパティで値を購読してもらうような流れになっています。
ただ、前回も記載していましたが、StreamControllerはデフォルトがシングルサブスクリプションでSubject系のクラスはデフォルトでブロードキャストです。
なのでSubject系クラスを使用すれば、通常のコンストラクタで作成したインスタンスでも複数購読者を登録することが可能です。

参考サイト : Differences of RxDart and Rx standard could be explicit in the first page. · Issue #186 · ReactiveX/rxdart · GitHub

以下にサンプルコードを示します。

void studySubject() {
  // StreamControllerを生成
  final StreamController<String> sc = StreamController<String>();
  // StreamControllerのstream
  final scStream = sc.stream;
  // StreamControllerのsink
  final scSink = sc.sink;
  // Streamの購読者を登録する
  var scSubscriber = scStream.listen((str) => print('sc listen: $str'));

  // Streamの値を発行する
  scSink.add('hi StreamController.');
  // この書き方も可能
  // sc.add('hi StreamController.');

  // StreamControllerとほぼ同じ動きをするPublishSubjectを生成
  final PublishSubject<String> subject = PublishSubject<String>();
  // PublishSubjectのstream
  final subjStream = subject.stream;
  // PublishSubjectのsink
  final subjSink = subject.sink;
  // Observableの購読者を登録する
  var subjSubscriber = subjStream.listen((str) => print('subj listen: $str'));

  // Observableの値を発行する
  subjSink.add('hi subject.');
  // この書き方も可能
  // subject.add('hi subject.');
}
// 実行結果
// sc listen: hi StreamController.
// subj listen: hi subject.

カプセル化の為に必要なプロパティ(stream / sink)だけ外部に公開し、不要なものはすべて隠しておく為にこのような形になったのかと思います。
今回は単純にするため、サンプルコードにはSubject系クラスのみ記載します。
後、RxにはhotとcoldなObservableが存在していて違いがあるのですが、分かりやすいサイトがありましたのでこちらをご覧ください。
Dartでなくてスミマセン。

[Rx入門] Subject詳解 / Hotな、ColdなObservableのこと - Qiita

PublishSubject<T>クラス

このクラスはStreamController<T>クラスと1点を除いて同じです。

唯一違う点はデフォルトがホットかコールドかの違いです。

StreamControllerはコールドで、PublishSubject<T>はホットです。

ただ、StreamControllerクラスもbroadcast()ファクトリコンストラクタを使用すれば、ホットサブスクリプションになるため、どちらを使用するかで大幅な変更が加わることはないと思います。

以下にサンプルコードを示します。

void studyPublishSubject() {
  // PublishSubjectを作成
  final PublishSubject<int> subject = PublishSubject<int>();

  // 値1を発行。この時点では購読者がいないため、何も処理されない
  subject.add(1);
  // ここで、購読者1を登録
  var subscriber1 = subject.stream.listen((val) => print('sub1 listen: $val'));
  // 値2を発行。購読者1が登録されているため処理が行われる。
  subject.add(2);
  // 購読者2を登録
  var subscriber2 = subject.stream.listen((val) => print('sub2 listen: $val'));
  // 値3を発行。購読者1, 2が登録されている為、それぞれ処理が行われる。
  subject.add(3);
  // streamを閉じる。
  subject.close();
}
// 実行結果
// sub1 listen: 2
// sub2 listen: 3
// sub1 listen: 3

BehaviorSubject<T>クラス

最新の値を保持しておいて、それを最初に発行する値として登録された購読者に発行するStreamControllerです。
最初に発行する値がない場合は、初期値として何かしらの値をコンストラクタで用意することができます。
以下にサンプルコードを示します。

void studyBehaviorSubject() async {
  // BehaviorSubject(初期値なし)を作成
  final BehaviorSubject<int> subject = BehaviorSubject<int>();

  // 初期値がない場合は、初期値では何に購読されない。
  var subscriber1 = subject.stream.listen((val) => print('sub1 listen: $val'));
  subject.add(1);
  subject.add(2);
  // 購読登録時の最新の値が最初に購読される。
  var subscriber2 = subject.stream.listen((val) => print('sub2 listen: $val'));
  await subject.close();

  // 区切り線
  print('------------- ');

  // BehaviorSubject(初期値あり)を作成
  final BehaviorSubject<int> subjectWithSeed = BehaviorSubject<int>.seeded(0);
  // 初期値がある場合は、初期値が最初に購読される。
  var subscriber3 = subjectWithSeed.stream.listen((val) => print('sub3 listen: $val'));
  subjectWithSeed.add(1);
  subjectWithSeed.add(2);
  var subscriber4 = subjectWithSeed.stream.listen((val) => print('sub4 listen: $val'));
  await subjectWithSeed.close();
}
// 実行結果
// sub1 listen: 1
// sub2 listen: 2
// sub1 listen: 2
// -------------
// sub3 listen: 0
// sub3 listen: 1
// sub4 listen: 2
// sub3 listen: 2

ReplaySubject<T>クラス

追加されているすべての値を保持しておいて、それらを最初に購読する値として登録された購読者に発行するStreamControllerです。
最大値を設定することで、保持する値の数を制限することが可能です。
サンプルコードを以下に示します。

void studyReplaySubject() async {
  // ReplaySubjectを発行(制限なし)
  final ReplaySubject<int> subject = ReplaySubject<int>();

  // 値1を発行
  subject.add(1);
  subject.add(2);
  // 購読登録時の出力される値をすべて取得するので、値1と値2を購読する
  var subscriber1 = subject.stream.listen((val) => print('sub1 listen: $val'));
  // 値3を発行
  subject.add(3);
  // 購読登録時の出力される値をすべて取得するので、値1と値2と値3を購読する
  var subscriber2 = subject.stream.listen((val) => print('sub2 listen: $val'));
  await subject.close();

  // 区切る為に少し待機
  await Future.delayed(Duration(milliseconds: 500));
  print('--------------------');

  // ReplaySubjectを発行(制限あり:2)
  final ReplaySubject<int> limitedSubject = ReplaySubject<int>(maxSize: 2);

  // 値1を発行
  limitedSubject.add(1);
  limitedSubject.add(2);
  // 購読登録時の出力される値を最大2つまで保持するので、値1, 値2を購読する
  var subscriber3 = limitedSubject.stream.listen((val) => print('sub3 listen: $val'));
  // 値3を発行
  limitedSubject.add(3);
  // この時点で値1は破棄されるので、値2, 値3を購読する
  var subscriber4 = limitedSubject.stream.listen((val) => print('sub4 listen: $val'));
  await limitedSubject.close();
}
// 実行結果
// sub1 listen: 1
// sub2 listen: 1
// sub1 listen: 2
// sub2 listen: 2
// sub1 listen: 3
// sub2 listen: 3
// --------------------
// sub3 listen: 1
// sub4 listen: 2
// sub3 listen: 2
// sub4 listen: 3
// sub3 listen: 3

AsyncSubject<T>クラス

Rxにはもう一つAsyncSubjectが存在しますが、rxdartには今のところ(2020/05/04)存在しません。

参考サイト

代わりに、PublishSubject<T>を使用し実現することができます。

void studyAsyncSubject() {
  var subject = PublishSubject<int>();
  var asyncStream = Observable.fromFuture(subject.last);

  subject
    ..add(1)
    ..add(2)
    ..add(3)
    ..close();
  // 最後の値のみ購読する。
  asyncStream.listen(print);
}
// 実行結果
// 3

主な登場人物であるObservableとSubject系オブジェクトが調べ終わったので、次に各オペレータについて少しずつ調べていこうと思います。

ここまで読んで頂き、ありがとうございます。

雑感

見返してみると、そこまで量がないのに書くのに時間がかかるのは文章を書くのと調べるのに慣れていないからだと思いました。
Streams APIがrxdartの基礎になっていることが分かってよかったです。(小並感)
次回はconcatとconcatEagerについて書きかけがあるのでそれを記事にしたいと思います。

参考サイト