狐好きぷろぐらまー

狐好きプログラマー見習いのブログです。

【Dart】rxdartについて調べてみる 第9回目 -ForkJoin, Zip- 【rxdart】

こんにちは。 pregu_foxです。

今回は、ForkJoinオペレータとZipオペレータについて調べました

前回までの記事は以下をご覧ください。

目次です。

動作環境

動作環境は以下の通りです。

項目 バージョン
Dart 2.5.1
rxdart 0.22.2

以下にForkJoinオペレータとZipオペレータについて記載します。 オペレータの名称は、rxdart、Stream APIの順で記載します。

ForkJoinオペレータ

シグネチャ

  • rxdart
    Observable<T> Observable.forkJoin2<A, B, T>(Stream<A> streamA, Stream<B> streamB, T combiner(A a, B b))
    ※forkJoin静的メソッドは一度にまとめるstreamの数でメソッド名が変化します。
    forkJoinからforkJoin9まで存在します。(メソッドの末尾の数字が、発行時にまとめるstreamの数です)
    以下に各apiのreferenceサイトを記載します。

  • forkJoin

  • forkJoin2
  • forkjoin3
  • forkjoin4
  • forkjoin5
  • forkjoin6
  • forkjoin7
  • forkjoin8
  • forkjoin9

また、使用方法は違いますが、動的な数のstreamの最新のデータだけを発行するリスト型のstreamを作成したい場合は、forkJoinList静的メソッドが便利です。

また、こちらも動的な数のstreamの各々の最新データだけを発行するリスト型のstreamを作成したい場合は、ForkJoinStream.list静的メソッドが使用できます。

処理の概要

このオペレータは複数のstreamをまとめて購読し、すべてのstreamから値が発行された時に、指定した関数に従って1つのstreamにまとめます。
forkJoinList()やForkJoinStream.list()は、動的な数のstreamをまとめる時に役に立ちます。
以下にサンプルを示します。

import 'package:rxdart/rxdart.dart';

/// rxdartのForkJoinオペレータ
void studyForkJoinTwo() {
  Observable.forkJoin2(
    Observable.just(1),
    Observable.fromIterable([0, 2, 4]),
    (a, b) => a + b,
  ).listen(
    print,
    onDone: () => print('done'),
  );
}
// 実行結果
// 5
// done
  
/// rxdartのForkJoinオペレータその2
void studyForkJoinTwo2() async {
  PublishSubject<int> ps1 = PublishSubject<int>();
  PublishSubject<int> ps2 = PublishSubject<int>();
  
  Observable.forkJoin2<int, int, int>(
    ps1.stream,
    ps2.stream,
    (ps1, ps2) {
      print('ps1: $ps1 + ps2: $ps2 = ${ps1 + ps2}');
      return ps1 + ps2;
    },
  ).listen(
    (val) => print('listen: $val'),
    onDone: () => print('done.'),
  );
  
  print('ps1 onNext -- 1');
  await ps1.add(1);
  print('ps2 onNext -- 11');
  await ps2.add(11);
  print('ps2 onNext -- 12');
  await ps2.add(12);
  print('ps2 onNext -- 13');
  await ps2.add(13);
  print('ps1 onNext -- 2');
  await ps1.add(2);
  await ps1.close();
  await ps2.close();
}
// 実行結果
// ps1 onNext -- 1
// ps2 onNext -- 11
// ps2 onNext -- 12
// ps2 onNext -- 13
// ps1 onNext -- 2
// ps1: 2 + ps2: 13 = 15
// listen: 15
// done.
  
/// Stream APIのForkJoinオペレータ
void studyForkJoinTwoStream() {
  ForkJoinStream.combine2(
    Stream.value(1),
    Stream.fromIterable([0, 2, 4]),
    (a, b) => a + b,
  ).listen(
    print,
    onDone: () => print('done.'),
  );
}
// 実行結果
// 5
// done

サンプルコードの実行結果から、最終的なstreamの値のみ発行されていることが分かります。

Zipオペレータ

シグネチャ

  • rxdart
    Observable<T> Observable.zip2<A, B, T>(Stream<A> streamA, Stream<B> streamB, T zipper(A a, B b))
    ※forkJoinと同様にzip2静的メソッドもstreamをまとめる数でメソッド名が変化します。

  • zip

  • zip2
  • zip3
  • zip4
  • zip5
  • zip6
  • zip7
  • zip8
  • zip9

動的にstreamの数を変化させたい場合は、zipListを利用すると良いでしょう。

  • Stream API ZipStream<dynamic, R> ZipStream.zip2<A, B, R>(Stream<A> streamOne, Stream<B> streamTwo, R zipper(A a, B b)) ※こちらもForkJOinStream.combineと同じ様に、引数の数によって変化するzip静的メソッドが存在します。

  • zip

  • zip2
  • zip3
  • zip4
  • zip5
  • zip6
  • zip7
  • zip8
  • zip9

動的に引数のstreamの数を変化させたい場合は、ZipStream.listを利用すると良いでしょう。

処理の概要

引数のstreamが全て作成された段階で、引数のzipper関数を使用してまとめた1つのstreamを発行します。
forkJoinオペレータに動作が似ていますが、違う箇所は、forkJoinオペレータは最後のstreamのみ発行しますが、zipオペレータは全てのstreamが1以上作成されている場合にzipper関数でまとめた1つのstreamを発行します。その為、引数の全てのstreamに5つの値が作成された場合、5つのstreamが発行されます。
以下にサンプルを示します。

import 'package:rxdart/rxdart.dart';

/// rxdartのZipオペレータ
void studyZipTwo() async {
  PublishSubject<int> ps1 = PublishSubject<int>();
  PublishSubject<int> ps2 = PublishSubject<int>();
  
  Observable.zip2(
    ps1.stream,
    ps2.stream,
    (ps1, ps2) {
      print('ps1: $ps1 + ps2: $ps2 = ${ps1 + ps2}');
      return ps1 + ps2;
    },
  ).listen(
    (val) => print('listen: $val'),
    onDone: () => print('done.'),
  );
  
  print('ps1 onNext -- 1');
  await ps1.add(1);
  print('ps2 onNext -- 11');
  await ps2.add(11);
  print('ps2 onNext -- 12');
  await ps2.add(12);
  print('ps2 onNext -- 13');
  await ps2.add(13);
  print('ps1 onNext -- 2');
  await ps1.add(2);
  await ps1.close();
  await ps2.close();
}
// 実行結果
// ps1 onNext -- 1
// ps2 onNext -- 11
// ps1: 1 + ps2: 11 = 12
// listen: 12
// ps2 onNext -- 12
// ps2 onNext -- 13
// ps1 onNext -- 2
// ps1: 2 + ps2: 12 = 14
// listen: 14
// done.
  
/// rxdartのzipListオペレータ
void studyZipList() {
  Observable.zipList([
    Observable.fromIterable(['a']),
    Observable.fromIterable(['b']),
    Observable.fromIterable(['c', 'd', 'e']),
  ]).listen(
    print,
    onDone: () => print('done'),
  );
}
// 実行結果
// [a, b, c]
// done
  
/// Stream APIのZipオペレータ
void studyZipTwoStream() async {
  StreamController<int> ps1 = StreamController<int>();
  StreamController<int> ps2 = StreamController<int>();
  
  ZipStream.zip2(
    ps1.stream,
    ps2.stream,
    (ps1, ps2) {
      print('ps1: $ps1 + ps2: $ps2 = ${ps1 + ps2}');
      return ps1 + ps2;
    },
  ).listen(
    (val) => print('listen: $val'),
    onDone: () => print('done.'),
  );
  
  print('ps1 onNext -- 1');
  await ps1.add(1);
  print('ps2 onNext -- 11');
  await ps2.add(11);
  print('ps2 onNext -- 12');
  await ps2.add(12);
  print('ps2 onNext -- 13');
  await ps2.add(13);
  print('ps1 onNext -- 2');
  await ps1.add(2);
  await ps1.close();
  await ps2.close();
}
// 実行結果
// ps1 onNext -- 1
// ps2 onNext -- 11
// ps1: 1 + ps2: 11 = 12
// listen: 12
// ps2 onNext -- 12
// ps2 onNext -- 13
// ps1 onNext -- 2
// ps1: 2 + ps2: 12 = 14
// listen: 14
// done.

サンプルの実行結果から、引数の各streamが作成されたタイミングでstreamが発行されていることが分かります。

読んで頂き、ありがとうございました。

参考サイト