狐好きぷろぐらまー

狐好きプログラマーのブログです。

【Dart】rxdartについて調べてみる 第8回目 -SwitchLatest, CombineLatest- 【rxdart】

こんにちは。 pregum_foxです。

今回は、SwitchLatestオペレータとCombineLatestオペレータについて調べました。

前回までの記事は以下をご覧ください。

動作環境

動作環境は以下の通りです。

項目 バージョン
Dart 2.5.1
rxdart 0.22.2

以下にSwitchLatestオペレータとCombineLatestオペレータについて記載します。 オペレータの名称はRx、rxdart、Stream APIの順で記載します。

SwitchLatestオペレータ

RxにあるコアのオペレータにはSwitchのみ存在していて、SwitchLatestオペレータはプログラミング言語によって実装されていたりいなかったりするみたいです。

シグネチャ

  • rxdart
    Observable<T> Observable<T>.switchLatest(Stream<Stream<T>> streams)

  • Stream API
    SwitchLatest<T> SwitchLatest<T>(Stream<Stream<T>> streams)

処理の概要

streamを発行するstreamを、それらのstreamの中で最も最近(遅く)発行された値を発行するstreamに変換します。
分かりやすい利用する状況は、インクリメンタルサーチ(逐次検索)だと思います。
googleでフォーと打つとフォートナイトやフォーエバー21等の候補が出る機能を想像されるとわかりやすいかと思います。
フォまで打った時の検索候補を取得している間に長音符"ー"が入力された場合、"フォー"での検索候補のみ欲しいのでフォで取得した検索候補は購読しないようにしたい時、役に立つかもしれません。
似たようなオペレータでswitchMap/SwitchMapStreamTransformerがありますのでそちらも調べてみてください。(いつか記事が書けると思います)
以下にサンプルを示します。

import 'package:rxdart/rxdart.dart';

/// rxdartのSwitchLatestオペレータサンプル
void studySwitchLatest() {
  Observable<String>.switchLatest(
    Observable.fromIterable([
      Observable.timer('1', Duration(seconds: 1)),
      Observable.just('2'),
      Observable.timer('3', Duration(milliseconds: 1500)),
      Observable.timer('4', Duration(seconds: 1)),
    ]),
  ).listen(print, onDone: () => print('done.'));
}
// 実行結果
// 2
// 4
// done.
  
/// rxdartのSwitchLatestオペレータサンプルその2
void studySwitchLatest2() async {
  final subject = PublishSubject<int>();

  subject.stream
      .switchMap(
        (i) => Observable.periodic(
            Duration(milliseconds: 1000), (val) => (val + 1) * i).take(3),
      )
      .listen((i) => print('${DateTime.now()} -- $i'),
          onDone: () => print('done.'));

  print('start');
  print('onNext 1');
  subject.add(1);
  await Future.delayed(Duration(milliseconds: 2000));
  print('onNext 2');
  subject.add(2);
  await Future.delayed(Duration(milliseconds: 2000));
  print('onNext 3');
  subject.add(3);
  await subject.close();
}
// 実行結果
// start
// onNext 1
// 2019-10-23 21:23:17.161855 -- 1
// onNext 2
// 2019-10-23 21:23:19.168071 -- 2
// onNext 3
// 2019-10-23 21:23:21.168319 -- 3
// 2019-10-23 21:23:22.168026 -- 6
// 2019-10-23 21:23:23.167304 -- 9
// done.
  
/// Stream APIのSwitchLatestオペレータサンプル
void studySwitchLatestStream() {
  SwitchLatestStream<String>(
    Stream.fromIterable([
      TimerStream('1', Duration(seconds: 1)),
      Stream.value('2'),
      TimerStream('3', Duration(milliseconds: 1500)),
      TimerStream('4', Duration(seconds: 1)),
    ]),
  ).listen(print, onDone: () => print('done.'));
}
// 実行結果
// 2
// 4
// done.

サンプルの実行結果から、値が発行されるまでの間に次の値が発行された場合、発行待ちの値が発行されないままであることが確認できます。

CombineLatestオペレータ

シグネチャ

  • rxdart
    Observable<T> Observable<T>.combineLatest2<A,B,T>(Stream<A> streamA, Stream<B> streamB, T combiner(A a, B b))

※ObservableLatest2の場合

  • Stream API
    CombineLatestStream<T, R>(Iterable<Stream<T>> streams R combiner(List<T> values))

処理の概要

指定したstreamの数を購読し、指定したstreamの値を購読したら、1つのstreamにマージする関数を使用して、1つのstreamにまとめた後、発行します。
以下にサンプルを示します。

import 'package:rxdart/rxdart.dart';

/// rxdartのCombineLatestオペレータサンプル
void studyCombineLatest() {
  Observable.combineLatest2(
    Observable.just('hello'),
    Observable.fromIterable(['mike', 'jon']),
    (a, b) => '$a $b',
  ).listen(print, onDone: () => print('done.'));
}
// 実行結果
// hello mike
// hello jon
// done.
  
/// Stream APIのCombineLatestオペレータサンプル
void studyCombineLatestStream() {
  CombineLatestStream.combine2(
    Stream.value('hello'),
    Stream.fromIterable(['mike', 'jon']),
    (a, b) => '$a $b',
  ).listen(print, onDone: () => print('done.'));
}
// 実行結果
// hello mike
// hello jon
// done.

void studyCombineLatest2() async {
  PublishSubject sb1 = PublishSubject();
  PublishSubject sb2 = PublishSubject();
  
  Observable.combineLatest2(sb1.stream, sb2.stream, (a, b) => 'a:$a b:$b')
      .listen(print, onDone: () => print('done.'));
  
  print('sb1 onNext 1');
  await sb1.add(1);
  print('sb2 onNext 10');
  await sb2.add(10);
  print('sb1 onNext 2');
  await sb1.add(2);
  print('sb1 onNext 3');
  await sb1.add(3);
  print('sb2 onNext 20');
  sb2.add(20);
}
// 実行結果
// sb1 onNext 1
// sb2 onNext 10
// a:1 b:10
// sb1 onNext 2
// a:2 b:10
// sb1 onNext 3
// a:3 b:10
// sb2 onNext 20
// a:3 b:20
  
/// CombineLatest4のサンプル
void studyCombineLatest3() {
  Observable.combineLatest4(
      Observable.just(1),
      Observable.just(10),
      Observable.just(100),
      Observable.fromIterable([3, 33, 333]),
      (a, b, c, d) => a + b + c + d).listen(print, onDone: () => print('done.'));
}
// 実行結果
// 114
// 144
// 444
// done.

サンプルの実行結果から、複数のObservableな値を最後の引数で記述した内容で処理されたstreamが出力されていることが分かります。
ここでは、2つまたは4つのObservableをまとめたサンプルのみ書いていますが、最大9つまで同時にObservableな値を処理することができます。

読んで頂き、ありがとうございました。

参考サイト