狐好きぷろぐらまー

狐好きプログラマーのブログです。

【Dart】rxdartについて調べてみる 第1回目 -概要-【rxdart】

こんにちは、pregum_foxです。

最近私の中でBLoCを使ってみたい欲が高まってきていますが、rxdartについて全くと言っていいほど知識がないため、rxdartについて備忘録としてまとめていきたいと思います。 最初は、rxdartの概要を調べてみました。

目次です。

rxdartとは

rxdartとはDartでRxを使用する為のライブラリです。

githubのreadme.mdの冒頭を下記に引用します。

RxDart is a reactive functional programming library for Google Dart, based on ReactiveX. Google Dart comes with a very decent Streams API out-of-the-box; rather than attempting to provide an alternative to this API, RxDart adds functionality on top of it.

google翻訳で日本語に訳すと

RxDartは、ReactiveXに基づいたGoogle Dartのリアクティブ関数型プログラミングライブラリです。 Google Dartには、すぐに使えるまともなStreams API が付属しています。このAPIの代替を提供しようとするのではなく、RxDartはその上に機能を追加します。

上記の通り、DartにはStream APIがあり、Rxと似たような記述ができる言語機能がありますが、Rxで統一されている方が他言語の型でも理解がしやすいという事とBLoCでrxdartが使用されているということで、rxdartを積極的に使っていきたいです。
様々な言語でReactive Extensions(Rx)が用意されている為、ある言語で覚えておくと別の言語での学習コストが軽くなりそうでいいですね。

こちらのサイトには、以下の言語でRxが用意されていると記載されていました。すごいですね。

Java: RxJava
JavaScript: RxJS
C#: Rx.NET
C#(Unity): UniRx
Scala: RxScala
Clojure: RxClojure
C++: RxCpp
Lua: RxLua
Ruby: Rx.rb
Python: RxPY
Go: RxGo
Groovy: RxGroovy
JRuby: RxJRuby
Kotlin: RxKotlin
Swift: RxSwift
PHP: RxPHP
Elixir: reaxive
Dart: RxDart

Rx

Rxについては他言語でのとても分かりやすい説明が沢山ありましたので参考にさせてもらったサイトを記載しておきます。

今の段階で私がRxについてわかっていることは、Observerパターンを拡張し非同期処理やイベントなどを1つの時間軸に乗せて、非同期なストリームとして扱うことで様々な値を抽象的に扱うことができるライブラリということです。
核となるオブジェクトが2つあり、1つは値を発行するオブジェクトのObservable、
もう1つはその値を受け取るオブジェクトのObservableの2つからなっています。 またrxdartでは、値を発行するのにSinkクラスを使用し、その値を観察(購読/listen)するのにStreamクラスを使用しています。 コードでは以下のような感じです。

void test_firstRx() async {
  // 値を発行するオブジェクト(Sink)を作成
  PublishSubject<String> subject = PublishSubject<String>();
  print('isBroadCast: ${subject.isBroadcast}');
  // 観察(購読)可能なオブジェクト(Stream)用変数を作成
  final Observable<String> observable = subject.stream;
  // 観察役(購読者)その1を登録(作成)
  final StreamSubscription<String> observer1 = observable.listen(
      // 観察(購読)しているオブジェクト(Stream)から値が流れてきた時に行う処理
      (str) => print('observable1 say: $str'),
      // エラーが流れてきた時に行う処理
      onError: (str) => print('error occurs!!!1!!!'),
      // Streamが閉じられた時に行う処理
      onDone: () => print('observer1 done.'));
  // 観察役(購読者)その2を登録(作成)
  final StreamSubscription<String> observer2 = observable
      // キャンセルした時に実行される処理
      .doOnCancel(() => print('on cancel 2.'))
      .listen((str) => print('observable2 say: $str'),
          onError: (str) => print('error occurs!!!2!!!'),
          onDone: () => print('observer2 done.'));

  // 値'step 1'を発行
  subject.add('step 1');
  // 値を購読する前にキャンセルされるのを防ぐ為、delayする
  await Future.delayed(Duration(milliseconds: 100));
  // 観察役(購読者)その2の購読をキャンセルさせたので、観察役(購読者)その2はこの後の値を購読しない
  await observer2.cancel();
  subject.add('step 2');
  // エラーを発行
  subject.addError('step error');
  subject.add('step 3');
  // Streamを閉じる
  await subject.close();
}
// 実行結果
// isBroadCast: true
// observable1 say: step 1
// observable2 say: step 1
// on cancel 2.
// observable1 say: step 2
// error occurs!!!1!!!
// observable1 say: step 3
// observer1 done.

上記の例では、SinkとStreamを一緒の関数の中で作って、その中で購読をしているサンプルの為、ありがたみがわかりづらいかと思いますが、値を発行する側ではSinkクラスを値を購読する側はStreamクラスを使用することによって処理を分離させることができます。

また、コレクションを高階関数で処理する時と同じような名前・意味で使用することができるメソッドが多々ありますのでDartのIterable<T>型のメソッドについて知っている方(map(), where(), take(), skip()等)は同じような感覚でRxでも使用できるメソッドが多々あるので学習コストが比較的低く済むと思います。
Rxは多言語で実装されているため、各言語の思想に合わせた実装になっています。
Dartでは既に言語機能として実装されているDart Streamクラスを拡張してObservableが作成されて、StreamControllerクラスを拡張してSubject系のクラス(PublishSubject, BehaviorSubject, ReplaySubject)が作成されています。

rxdartを使う上での注意点

rxdartはDart Streamクラスを拡張して作成されている為、ほとんどの場合同じような挙動をしますがrxdartはRxに合わせて作成されている為、いくつか違う点があり調べている途中で私が躓いた点を記載します。

  1. Dart Streamのstreamはデフォルトではシングルサブスクリプションですが、Subject系のクラス(PublishSubject, BehaviorSubject, ReplaySubject)のstreamはデフォルトでブロードキャストです。
    サンプルコードを以下に記載します。
void testBroadCast(){
  // デフォルトコンストラクタだとシングルサブスクリプションなので複数listenできないためExceptionが発生する
  StreamController<String> st = StreamController<String>();
  // broadcastファクトリ関数で作成したStreamはブロードキャストなので複数listen可能
  // StreamController<String> st = StreamController<String>.broadcast();
  // Subject系のStreamもbroad castなので複数listen可能
  // Subject<String> st = PublishSubject<String>();

  final Stream<String> observable = st.stream;
  print('isBroadCast: ${observable.isBroadcast}');
  var observer1 = observable.listen((val) => print('say 1 $val'));
  try{
    // ここでException発生
    var observer2 = observable.listen((val) => print('say 2 $val'));
  } catch (e) {
    print('error occurs');
  }
  st.sink.add('hello rx');
}
// 実行結果
// isBroadCast: false
// error occurs
// say 1 hello rx

上記の実行結果を見てもらうとわかりますが、Subject系のstreamはデフォルトでブロードキャストなので、複数の観察(購読)を行うことが可能でしたが、Dart Straemのstreamはシングルサブスクリプションなので、単一の観察(購読)しか行うことができません。
その為、上記のコードは2つ目の観察役(購読者)を登録する行でExceptionが発生します。
その点、rxdartのSubject系はデフォルトでブロードキャストなので、複数の観察役(購読者)を登録してもExceptionは発生しません。

  1. Dart StreamのstreamやrxdartのSubject系のstreamはエラーが発生してもデフォルトでは閉じません。エラーが発生した時に閉じる場合は、cancelOnErrorをtrueにする必要があります。

例としては以下をご覧ください。

void test_errorClose() {
  // StreamController<int> st = StreamController<int>();
  // 値を発行するオブジェクト(Sink)を作成
  PublishSubject<int> st = PublishSubject<int>();

  final subject = st;
  final observable = st.stream;

  // エラー観察時、観察をキャンセルする観察役
  var cancelObserver = observable.listen((val) => print('cancelObserver listen: $val'),
      // エラー発生時に実行する処理
      onError: (error, stackTrace) => print('error occurs. cancel listen.'),
      // Streamが閉じられた時に実行する処理
      onDone: () => print('errorStopObserver done.'),
      // エラーが発生した時、観察をキャンセルするかどうかのフラグ(デフォルトはfalse)
      cancelOnError: true);
  // エラー観察時、観察を続ける観察役
  var continueObserver = observable.listen((val) => print('continueObserver listen: $val'),
      onError: (error, stackTrace) => print('error occurs. continue listen.'),
      onDone: () => print('continueObserver done.'));

  // 値5を発行
  subject.add(5);
  // エラーを発行
  // この時点で、cancelOnErrorがtrueの場合、観察(購読)をキャンセルする
  st.addError(3);
  // 値4を発行
  subject.add(4);
  // Streamを閉じる
  subject.close();
}
// 実行結果
// cancelObserver listen: 5
// continueObserver listen: 5
// error occurs. cancel listen.
// error occurs. continue listen.
// continueObserver listen: 4
// continueObserver done.

上記のように、エラー観察時に以降の観察をキャンセルしたい場合、listen時にcancelOnErrorをtrueにしておく必要があります。

詳細は以下のサイトをご覧ください。

Differences of RxDart and Rx standard could be explicit in the first page. · Issue #186 · ReactiveX/rxdart · GitHub

https://pub.dev/documentation/rxdart/latest/rx/Observable-class.html

Rxについて少し分かったところで、次からrxdartでよく使う、Observable<T>、 PublishSubject<T>、BehaviorSubject<T>、ReplaySubject<T>について調べていきたいと思います。

雑感

rxdartは、rxと付いているので以前私がC#で少し触っていたReactive Extensions(Rx.NET)と同じものだと思って触ってみていましたが、いろいろ調べていく内にDartのStreamの上に載っているRxの為、言語機能を工夫して実装されていて勉強になるなと思いました。ただ、Rx.NETを書いている時のメソッド名や用語が変わっていたりしていて少し戸惑いました。(シングルサブスクリプションがcoldのようなものでブロードキャストがhotのようなもの等)
rxを使う利点の1つとしては言語の壁を越えて概念を共有できることだと思っているので、rxdartに限らず各Rxの違いが分かるページがあるといいなぁと感じました。(量的に難しそうですが...)
Dartには既にStreams APIが付属していてrxdartと相互に切り替える機能があるので、ライブラリを作る時に内部ではrxdartを使ってインタフェースはStreams APIにするという使い方もできて素敵だと思いました。

調べつつ書いていますので、ここ違うんじゃないかという箇所が多々あるかもしれませんがその場合、コメント頂けると幸いです。

ここまで読んで頂きありがとうございました。

参考サイト