狐好きぷろぐらまー

狐好きプログラマー見習いのブログです。

【Dart】rxdartについて調べてみる 第2回目 -ObservableとSubject系クラス- 【rxdart】

こんにちは。 pregum_foxです。

今回は、rxdartでメインとなるObservable、PublishSubject、BehaviorSubject、ReplaySubjectについて調べてみました。

この記事は0.22.x以前のrxdartで書かれた記事です。

0.23.0以降のrxdartでは動きません。 0.23.0に対応した記事についてはこちらをご覧ください。

目次です。

動作環境

動作環境は以下の通りです

項目 バージョン
Dart 2.5.1

用語の説明

前回では、観察役と購読者や観察や購読等似たような用語が複数あって理解しづらくなっていたので、ここで一旦記事内で使用する用語の説明をしたいと思います。ですので他のサイトとは違う意味で使用しているかもしれませんがご了承ください。

  • 購読(観察/listen)
    listenしているstreamから発行された値を受け取ることを指しています。
    なので購読を登録することはstreamをlistenすることです。

  • 購読者(観察役/listener)
    streamを購読しているオブジェクトの事です。

  • 発行(出力)
    StreamやObservableから値が購読者に向けて値を出力することです。
    発行された値に対して購読者は様々な処理を行います。

コードでは以下のように記載するようにします。

  // 購読可能なオブジェクト(Stream)を作成
  final Observable<String> observable = subject.stream;
  // 購読者を登録(作成)
  final StreamSubscription<String> observer1 = observable.listen(
      (str) => print('observable1 say: $str'));

  // 値'step 1'を発行
  subject.add('step 1');

Observable<T>クラス

Observableクラスは、Stream<T>クラスを継承したラッパークラスです。なので、基本的にはDartのStreamクラスと同じような使い方が可能です。
rxdartを使用する時は、このObservable<T>に対してmapやwhere等のオペレータを適用して、購読した値に対して様々な処理を行います。
Dart Streamを使用する場合はこのObservable<T>の代わりにStream<T>を使用します。

こちらのrxdartのgithubのreadmeを見ると、Observableで使用できるほとんどのメソッドはStreamでも用意されています。
中にはObservableでのみ用意されているメソッドがいくつかあります(Observable.periodic()等)。
それについては、何かしらの代替案があるはずなので両者の大きな違いはないかと思います。
また、Observable<T> / Stream<T>はデフォルトでシングルサブスクリプションなので複数の購読者を登録する場合は、asBroadcastStream()メソッドを使用し、ブロードキャストに変換してから購読者を登録してください。

ObservableとStreamで値を発行する簡単なファクトリコンストラクタ / メソッドをいくつか紹介します。
ファクトリコンストラクタとは、factoryキーワードを使用して作成したコンストラクタで、singletonパターンやサブクラスを生成する時に使用されます。

Observable<T>.just() ファクトリコンストラクタ / Stream<T>.value() ファクトリコンストラク

単一の値を含むObservable / Streamを作成します。
購読者が登録されると、値が発行されます。
発行後は、Observable / Streamは閉じられます。
Stream<T>.value()はrxdartのreadmeには記載されていませんが機能として似ている為、Stream<T>.value()を記載しました。
サンプルコードは以下の通りです。

void studyJust(){
  // 単一の値を含むStreamを作成
  final Stream<String> stream = Stream.value('stream.value');
  // 単一の値を含むObservableを作成
  final Observable<String> observable = Observable.just('observable.just');
  print('subscriber create.');
  // 購読者を登録してから値が発行される。
  var streamSubscriber = stream.listen(print, onDone: () => print('stream.value done.'));
  var observableSubscriber = observable.listen(print, onDone: () => print('observable.just done.'));
  // 以下の行をコメントアウトすると、複数の購読者を登録してしまい、StateErrorが発生します。
  // var observableSubscriber2 = observable.listen(print, onDone: () => print('observable.just done.'));
}
// 実行結果
// subscriber create.
// stream.value
// observable.just
// stream done.
// observable done.

Observable<T>.repeat() ファクトリコンストラクタ / RepeatStream<T> クラス

指定された回数Observable / Streamを作成するObservable / Streamを返します。
回数が指定されていない場合は、無限に繰り返されます。
値が作成される時、現在のループのindexを取得することができます。(0始まり)
サンプルコードは以下の通りです。

void studyRepeat() {
  // 指定された回数分Streamを作成するStreamを作成
  final Stream<String> repeatStream = RepeatStream<String>(
      (int repeatIndex) => Stream.value('st count: $repeatIndex'), 3);
  // 指定された回数分Observableを作成するObservableを作成
  final Observable<String> repeatObservable = Observable.repeat(
      (int repeatIndex) => Observable.just('ob count: $repeatIndex'), 3);
  print('subscriber create');

  var streamSubscriber =
      repeatStream.listen(print, onDone: () => print('RepeatStream done.'));
  var observableSubscriber = repeatObservable.listen(print,
      onDone: () => print('Observable.repeat done.'));
}
// 実行結果
// subscriber create.
// st count: 0
// ob count: 0
// st count: 1
// ob count: 1
// st count: 2
// ob count: 2
// RepeatStream done.
// Observable.repeat done.

Observable<T>.range() 静的メソッド / RangeStream<T> クラス

指定された範囲内の整数のシーケンスを発行するObservable / Streamを返します。
第1引数から第2引数に向かって整数のシーケンスを作成する為、降順のシーケンスを作成する場合はRangeStream(10, 3);のように記載すれば10から3の順に値が発行されます。
サンプルコードを以下に示します。

void studyRange() async {
  // 指定された範囲内の整数のシーケンスを発行するStreamを作成
  final Stream<int> rangeStream = RangeStream(1, 3);
  // 指定された範囲内の整数のシーケンスを発行するObservableを作成
  final Observable<int> rangeObservable = Observable.range(1, 3);
  print('subscriber create.');
  var streamSubscriber = rangeStream.listen((i) => print('stream: $i'),
      onDone: () => print('RangeStream done.'));
  var observableSubscriber = rangeObservable.listen(
      (i) => print('observable: $i'),
      onDone: () => print('Observable.range done.'));

  // 区切るために少し待つ
  await Future.delayed(Duration(milliseconds: 500));
  print('---------------------');

  // 降順に値を発行する場合は(3, 1)のように指定する。
  final Observable<int> inverseRangeObservable = Observable.range(3, 1);
  var observableSubscriber2 = inverseRangeObservable.listen(print,
      onDone: () => print('Observable.range done.'));
}
// 実行結果
// subscriber create.
// stream: 1
// observable: 1
// stream: 2
// observable: 2
// stream: 3
// observable: 3
// RangeStream done.
// Observable.range done.
// ---------------------
// 3
// 2
// 1
// Observable.range done.

Observable<T>.error() ファクトリコンストラクタ / ErrorStream<T> クラス

エラーを発行するObservable / Streamを作成します。
おそらく、テスト用にエラーを発行したい時に役立つと思います。
サンプルコードを以下に示します。

void studyError() {
  // エラーを発行するStreamを作成します。
  final Stream<String> errorStream = ErrorStream('ErrorStream output.');
  // エラーを発行するObservableを作成します。
  final Observable<String> errorObservable =
      Observable.error(Exception('Observable.error output.'));
  print('subscriber create.');

  var streamSubscriber =
      errorStream.listen(print, onError: (e) => print('stream error: $e'));
  var observableSubscriber = errorObservable.listen(print,
      onError: (e) => print('observable occurs: $e'));
}
// 実行結果
// subscriber create.
// stream error: ErrorStream output.
// observable occurs: Exception: Observable.error output.

Observable<T>.timer() ファクトリコンストラクタ / TimerStream<T> クラス

指定された時間が経過すると、指定された値が発行されるObservable / Streamを作成します。
指定された値は、Observable / Stream作成時に設定されています。
一定時間経過後、現在の時間を発行する場合、Observable / Stream作成時の時間が設定されています。
サンプルコードを以下に示します。

void studyTimer() async {
  // 指定された時間経過すると、指定された値が発行されるStreamを作成
  final Stream<DateTime> timerStream =
      TimerStream(DateTime.now(), Duration(seconds: 1));
  // 指定された時間経過すると、指定された値が発行されるObservablerを作成
  final Observable<DateTime> timerObservable =
      Observable.timer(DateTime.now(), Duration(seconds: 1));
  print('now: ${DateTime.now()}');
  // 発行される値は、Stream作成時に作成されている。
  var streamSubscriber = timerStream.listen((dt) => print('st listen: $dt'),
      onDone: () => print('TimerStream done.'));
  // 発行される値は、Observable作成時に作成されている。
  var observableSubscriber = timerObservable.listen(
      (dt) => print('ob listen: $dt'),
      onDone: () => print('Observable.timer done.'));
  await Future.delayed(Duration(seconds: 1));
  print('now: ${DateTime.now()}');
}
// 実行結果
// now: 2019-10-06 19:21:34.599799
// st listen: 2019-10-06 19:21:34.595205
// TimerStream done.
// ob listen: 2019-10-06 19:21:34.598802
// Observable.timer done.
// now: 2019-10-06 19:21:35.606994

Subject系クラスについて

Observable / Streamについては発行する値の生成する方法を記載していました。
ここからはSubject / StreamControllerについて説明していきたいと思います。
Subjectクラスには以下のクラスが実装されています。

上記のクラスの基になるStreams APIでのクラスはStreamController<T>です。 通常の使い方としては、sinkプロパティを通して値を発行し、streamプロパティで値を購読してもらうような流れになっています。
ただ、前回も記載していましたが、StreamControllerはデフォルトがシングルサブスクリプションでSubject系のクラスはデフォルトでブロードキャストです。
なのでSubject系クラスを使用すれば、通常のコンストラクタで作成したインスタンスでも複数購読者を登録することが可能です。

参考サイト : Differences of RxDart and Rx standard could be explicit in the first page. · Issue #186 · ReactiveX/rxdart · GitHub

以下にサンプルコードを示します。

void studySubject() {
  // StreamControllerを生成
  final StreamController<String> sc = StreamController<String>();
  // StreamControllerのstream
  final scStream = sc.stream;
  // StreamControllerのsink
  final scSink = sc.sink;
  // Streamの購読者を登録する
  var scSubscriber = scStream.listen((str) => print('sc listen: $str'));

  // Streamの値を発行する
  scSink.add('hi StreamController.');
  // この書き方も可能
  // sc.add('hi StreamController.');

  // StreamControllerとほぼ同じ動きをするPublishSubjectを生成
  final PublishSubject<String> subject = PublishSubject<String>();
  // PublishSubjectのstream
  final subjStream = subject.stream;
  // PublishSubjectのsink
  final subjSink = subject.sink;
  // Observableの購読者を登録する
  var subjSubscriber = subjStream.listen((str) => print('subj listen: $str'));

  // Observableの値を発行する
  subjSink.add('hi subject.');
  // この書き方も可能
  // subject.add('hi subject.');
}
// 実行結果
// sc listen: hi StreamController.
// subj listen: hi subject.

カプセル化の為に必要なプロパティ(stream / sink)だけ外部に公開し、不要なものはすべて隠しておく為にこのような形になったのかと思います。
今回は単純にするため、サンプルコードにはSubject系クラスのみ記載します。
後、RxにはhotとcoldなObservableが存在していて違いがあるのですが、分かりやすいサイトがありましたのでこちらをご覧ください。
Dartでなくてスミマセン。

[Rx入門] Subject詳解 / Hotな、ColdなObservableのこと - Qiita

PublishSubject<T>クラス

このクラスはStreamController.broadcast() ファクトリコンストラクタと返す型が違うだけで後は同じです。
StreamControllerクラスはStreamを返しますが、PublishSubjectはObservableを返します。
いかにサンプルコードを示します。

void studyPublishSubject() {
  // PublishSubjectを作成
  final PublishSubject<int> subject = PublishSubject<int>();

  // 値1を発行。この時点では購読者がいないため、何も処理されない
  subject.add(1);
  // ここで、購読者1を登録
  var subscriber1 = subject.stream.listen((val) => print('sub1 listen: $val'));
  // 値2を発行。購読者1が登録されているため処理が行われる。
  subject.add(2);
  // 購読者2を登録
  var subscriber2 = subject.stream.listen((val) => print('sub2 listen: $val'));
  // 値3を発行。購読者1, 2が登録されている為、それぞれ処理が行われる。
  subject.add(3);
  // streamを閉じる。
  subject.close();
}
// 実行結果
// sub1 listen: 2
// sub2 listen: 3
// sub1 listen: 3

BehaviorSubject<T>クラス

最新の値を保持しておいて、それを最初に発行する値として登録された購読者に発行するStreamControllerです。
最初に発行する値がない場合は、初期値として何かしらの値をコンストラクタで用意することができます。
以下にサンプルコードを示します。

void studyBehaviorSubject() async {
  // BehaviorSubject(初期値なし)を作成
  final BehaviorSubject<int> subject = BehaviorSubject<int>();

  // 初期値がない場合は、初期値では何に購読されない。
  var subscriber1 = subject.stream.listen((val) => print('sub1 listen: $val'));
  subject.add(1);
  subject.add(2);
  // 購読登録時の最新の値が最初に購読される。
  var subscriber2 = subject.stream.listen((val) => print('sub2 listen: $val'));
  await subject.close();

  // 区切り線
  print('------------- ');

  // BehaviorSubject(初期値あり)を作成
  final BehaviorSubject<int> subjectWithSeed = BehaviorSubject<int>.seeded(0);
  // 初期値がある場合は、初期値が最初に購読される。
  var subscriber3 = subjectWithSeed.stream.listen((val) => print('sub3 listen: $val'));
  subjectWithSeed.add(1);
  subjectWithSeed.add(2);
  var subscriber4 = subjectWithSeed.stream.listen((val) => print('sub4 listen: $val'));
  await subjectWithSeed.close();
}
// 実行結果
// sub1 listen: 1
// sub2 listen: 2
// sub1 listen: 2
// -------------
// sub3 listen: 0
// sub3 listen: 1
// sub4 listen: 2
// sub3 listen: 2

ReplaySubject<T>クラス

追加されているすべての値を保持しておいて、それらを最初に購読する値として登録された購読者に発行するStreamControllerです。
最大値を設定することで、保持する値の数を制限することが可能です。
サンプルコードを以下に示します。

void studyReplaySubject() async {
  // ReplaySubjectを発行(制限なし)
  final ReplaySubject<int> subject = ReplaySubject<int>();

  // 値1を発行
  subject.add(1);
  subject.add(2);
  // 購読登録時の出力される値をすべて取得するので、値1と値2を購読する
  var subscriber1 = subject.stream.listen((val) => print('sub1 listen: $val'));
  // 値3を発行
  subject.add(3);
  // 購読登録時の出力される値をすべて取得するので、値1と値2と値3を購読する
  var subscriber2 = subject.stream.listen((val) => print('sub2 listen: $val'));
  await subject.close();

  // 区切る為に少し待機
  await Future.delayed(Duration(milliseconds: 500));
  print('--------------------');

  // ReplaySubjectを発行(制限あり:2)
  final ReplaySubject<int> limitedSubject = ReplaySubject<int>(maxSize: 2);

  // 値1を発行
  limitedSubject.add(1);
  limitedSubject.add(2);
  // 購読登録時の出力される値を最大2つまで保持するので、値1, 値2を購読する
  var subscriber3 = limitedSubject.stream.listen((val) => print('sub3 listen: $val'));
  // 値3を発行
  limitedSubject.add(3);
  // この時点で値1は破棄されるので、値2, 値3を購読する
  var subscriber4 = limitedSubject.stream.listen((val) => print('sub4 listen: $val'));
  await limitedSubject.close();
}
// 実行結果
// sub1 listen: 1
// sub2 listen: 1
// sub1 listen: 2
// sub2 listen: 2
// sub1 listen: 3
// sub2 listen: 3
// --------------------
// sub3 listen: 1
// sub4 listen: 2
// sub3 listen: 2
// sub4 listen: 3
// sub3 listen: 3

AsyncSubject<T>クラス

Rxにはもう一つAsyncSubjectが存在しますが、rxdartには今のところ(2019/10/06)存在しません。

参考サイト

代わりに、PublishSubject<T>を使用し実現することができます。

void studyAsyncSubject() {
  var subject = PublishSubject<int>();
  var asyncStream = Observable.fromFuture(subject.last);

  subject
    ..add(1)
    ..add(2)
    ..add(3)
    ..close();
  // 最後の値のみ購読する。
  asyncStream.listen(print);
}
// 実行結果
// 3

主な登場人物であるObservableとSubject系オブジェクトが調べ終わったので、次に各オペレータについて少しずつ調べていこうと思います。

ここまで読んで頂き、ありがとうございます。

雑感

見返してみると、そこまで量がないのに書くのに時間がかかるのは文章を書くのと調べるのに慣れていないからだと思いました。
Streams APIがrxdartの基礎になっていることが分かってよかったです。(小並感)
次回はconcatとconcatEagerについて書きかけがあるのでそれを記事にしたいと思います。

参考サイト