狐好きぷろぐらまー

狐好きプログラマー見習いのブログです。

【Dart】rxdartについて調べてみる 第10回目 -Buffer, BufferCount- 【rxdart】

お久しぶりです。 pregum_foxです。

前回からほぼ1か月空いてしまいましたが、引き続き書いていこうと思います。

前回までの記事は以下をご覧ください。

目次です。

動作環境

動作環境は以下の通りです。

項目 バージョン
Dart 2.5.1
rxdart 0.22.2

以下にForkJoinオペレータとZipオペレータについて記載します。 オペレータの名称は、rxdart、Stream APIの順で記載します。

今回からは変換系のオペレータのメモになります。
前回までのストリームに対してC#LinqJavaStreamのようなメソッドチェインでつなげて書いていくことができます。
メソッドチェインで掛けることで、各処理の意図が明確になるというメリットがあります。
便利なのでなんでもチェインで書いてしまう弊害もありますが...
というわけで今回から変換系のオペレータを調べていきます。

Bufferオペレータ

シグネチャ

  • rxdart
    Observable<List<T>> buffer(Stream window)

  • Stream API
    BufferStreamTransformer<T>(Stream window(T event))

処理の概要

Observableなストリームから発行されたアイテムを定期的にまとめ、まとまり事に発行するオペレータです。
とりあえずコードを以下に示します。

import 'package:rxdart/rxdart.dart';

// rxdart
Future studyBuffer() async {
  Observable<int>.periodic(const Duration(milliseconds: 100), (i) {
    // 100msecごとに値を生成する
    print('periodic: $i -- ${DateTime.now()}');
    return i;
  })
      // 150msecごとに購読したストリームをまとめる (型はObservable<List<int>>)
      .buffer(Observable<int>.periodic(const Duration(milliseconds: 150), (j) {
        print('buffer: ${j * 150}msec -- ${DateTime.now()}');
        // dummy data
        return j;
      }))
      // bufferでまとめたストリームを5要素取得する
      .take(5)
      // 購読する
      .listen((data) => print('listen: $data'), onDone: () => print('done. '));

  // 区切るために少し待つ
  await Future.delayed(const Duration(milliseconds: 1000));
  print('-----------------------------------------');

  Observable<int>.periodic(const Duration(milliseconds: 100), (i) {
    print('periodic: $i -- ${DateTime.now()}');
    return i;
  })
      // 購読したストリームを8要素取得する
      .take(8)
      // 300msecごとにまとめる
      .buffer(Observable.periodic(const Duration(milliseconds: 300)))
      // List<int>を購読する
      .listen((data) => print('listen: $data'), onDone: () => print('done.'));
}
// 実行結果
// periodic: 0 -- 2019-11-30 21:33:11.732326
// periodic: 1 -- 2019-11-30 21:33:11.830984
// buffer: 0msec -- 2019-11-30 21:33:11.887837
// listen: [0, 1]
// periodic: 2 -- 2019-11-30 21:33:11.930723
// periodic: 3 -- 2019-11-30 21:33:12.031452
// buffer: 150msec -- 2019-11-30 21:33:12.038436
// listen: [2, 3]
// periodic: 4 -- 2019-11-30 21:33:12.131357
// buffer: 300msec -- 2019-11-30 21:33:12.187466
// listen: [4]
// periodic: 5 -- 2019-11-30 21:33:12.230695
// periodic: 6 -- 2019-11-30 21:33:12.331394
// buffer: 450msec -- 2019-11-30 21:33:12.338383
// listen: [5, 6]
// periodic: 7 -- 2019-11-30 21:33:12.431134
// buffer: 600msec -- 2019-11-30 21:33:12.488004
// listen: [7]
// done. 
// -----------------------------------------
// periodic: 0 -- 2019-11-30 21:33:12.735647
// periodic: 1 -- 2019-11-30 21:33:12.834066
// periodic: 2 -- 2019-11-30 21:33:12.934208
// periodic: 3 -- 2019-11-30 21:33:13.034972
// listen: [0, 1, 2, 3]
// periodic: 4 -- 2019-11-30 21:33:13.134700
// periodic: 5 -- 2019-11-30 21:33:13.234820
// periodic: 6 -- 2019-11-30 21:33:13.335909
// listen: [4, 5, 6]
// periodic: 7 -- 2019-11-30 21:33:13.434182
// listen: [7]
// done.
  
// Stream API
void studyBufferStream() async {
  // transform用に作成
  BufferStreamTransformer<int> transform =
      BufferStreamTransformer<int>((window) {
    return Stream<int>.periodic(const Duration(milliseconds: 150), (j) {
      print('buffer: ${j * 150}msec -- ${DateTime.now()}');
      // dummy data
      return j;
    });
  });

  // 100msecごとにストリームを生成する
  Stream<int>.periodic(const Duration(milliseconds: 100), (i) {
    print('periodic: $i -- ${DateTime.now()}');
    return i;
  })
      // 変換オペレータ(BufferStreamTransformer)で購読したストリームを
      .transform(transform)
      // bufferでまとめたストリームを5要素取得する
      .take(5)
      // List<int>を購読する
      .listen((data) => print('listen: $data'), onDone: () => print('done.'));

  // 区切る為に少し待つ
  await Future.delayed(const Duration(milliseconds: 1000));
  print('-----------------------------------------');

  Stream<int>.periodic(const Duration(milliseconds: 100), (i) {
    print('periodic: $i -- ${DateTime.now()}');
    return i;
  })
      // 購読したストリームを8要素取得する
      .take(8)
      // 300msecごとにまとめる
      .transform(BufferStreamTransformer<int>((window) {
    return Stream<int>.periodic(const Duration(milliseconds: 300));
  }))
  // List<int>を購読する
  .listen((data) => print('listen: $data'), onDone: () => print('done.'));
}
// 実行結果
// periodic: 0 -- 2019-11-30 21:35:49.727264
// periodic: 1 -- 2019-11-30 21:35:49.825523
// buffer: 0msec -- 2019-11-30 21:35:49.884487
// listen: [0, 1]
// periodic: 2 -- 2019-11-30 21:35:49.925599
// periodic: 3 -- 2019-11-30 21:35:50.026411
// buffer: 150msec -- 2019-11-30 21:35:50.034391
// listen: [2, 3]
// periodic: 4 -- 2019-11-30 21:35:50.126242
// buffer: 300msec -- 2019-11-30 21:35:50.183365
// listen: [4]
// periodic: 5 -- 2019-11-30 21:35:50.225443
// periodic: 6 -- 2019-11-30 21:35:50.326175
// buffer: 450msec -- 2019-11-30 21:35:50.334158
// listen: [5, 6]
// periodic: 7 -- 2019-11-30 21:35:50.425243
// buffer: 600msec -- 2019-11-30 21:35:50.483406
// listen: [7]
// done.
// -----------------------------------------
// periodic: 0 -- 2019-11-30 21:35:50.728290
// periodic: 1 -- 2019-11-30 21:35:50.828272
// periodic: 2 -- 2019-11-30 21:35:50.928764
// periodic: 3 -- 2019-11-30 21:35:51.028473
// listen: [0, 1, 2, 3]
// periodic: 4 -- 2019-11-30 21:35:51.128276
// periodic: 5 -- 2019-11-30 21:35:51.229244
// periodic: 6 -- 2019-11-30 21:35:51.328975
// listen: [4, 5, 6]
// periodic: 7 -- 2019-11-30 21:35:51.429701
// listen: [7]
// done.

サンプルの実行結果から、発行された値がbufferメソッドまたはBufferStreamTransformerクラス内に定義されている間隔ごとにまとめられていることが分かります。
Stream APIでも書けますが、変換系のオペレータはrxdartのObservable<T>のメソッドを使用した方が処理の意図が分かりやすいと思います。(buffer() vs transform())
一定間隔でまとめてデータの更新を行う時に便利だと思います。
ただ、今回の使い方だとrxdartのオペレータはbufferではなく、bufferTimeの方がより明確に意図が伝わると思います。

buffer系オペレータとwindow系オペレータは各目的に応じてオペレータが用意されています。

今回はbufferとbufferCountオペレータのメモですので、次回にbufferTimeとbufferTestを書こうと思います。

BufferCountオペレータ

シグネチャ

  • rxdart
    Observable<List<T>> bufferCount(int count, [int startBufferEvery = 0])

  • Stream API
    BufferCountStreamTransformer<T> BufferCountStreamTransformer(int count, [int startBufferEvery = 0])

処理の概要

Bufferオペレータのまとめる条件を個数に制限したオペレータです。
購読したストリームがcountに達した段階でまとめたストリームを発行します。
以下にコードを示します。

import 'package:rxdart/rxdart.dart';

Future studyBufferCount() async {

  // 1から4の値を発行するストリームを生成する
  Observable.range(1, 4)
  // 購読したストリームから2要素取得する
  .bufferCount(2)
  // 購読する
  .listen(print);

  // 区切る為に少し待つ
  await Future.delayed(const Duration(milliseconds: 500));
  print('--------------------------------------------------');
  
  // 1から10の値を発行するストリームを生成する
  Observable.range(1, 10)
  // 購読したストリームから5要素取得し、古いストリームから2要素だけ進めたストリームを生成する
  .bufferCount(5, 2)
  // 購読する
  .listen(print);
  
  // 区切る為に少し待つ
  await Future.delayed(const Duration(milliseconds: 500));
  print('--------------------------------------------------');
  
  // 1から10の値を発行するストリームを生成する
  Observable.range(1, 10)
  // 購読したストリームから2要素取得し、古いストリームから5要素進めたストリームを生成する
  .bufferCount(2, 5)
  // 購読する
  .listen(print);
}
// 実行結果
// [1, 2]
// [3, 4]
// --------------------------------------------------
// [1, 2, 3, 4, 5]
// [3, 4, 5, 6, 7]
// [5, 6, 7, 8, 9]
// [7, 8, 9, 10]
// --------------------------------------------------
// [1, 2]
// [6, 7]

Future studyBufferCountStreamTransformer() async {
  // 1から4の値を発行するストリームを生成する
  RangeStream(1, 4)
  // 購読したストリームから2要素取得する
  .transform(BufferCountStreamTransformer(2))
  // 購読する
  .listen(print);

  // 区切る為に少し待つ
  await Future.delayed(const Duration(milliseconds: 500));
  print('--------------------------------------------------');

  // 1から10の値を発行するストリームを生成する
  RangeStream(1,10)
  // 購読したストリームから5要素取得し、古いストリームから2要素だけ進めたストリームを生成する
  .transform(BufferCountStreamTransformer(5, 2))
  // 購読する
  .listen(print);

  // 区切る為に少し待つ
  await Future.delayed(const Duration(milliseconds: 500));
  print('--------------------------------------------------');

  // 1から10の値を発行するストリームを生成する
  RangeStream(1, 10)
  // 購読したストリームから2要素取得し、古いストリームから5要素進めたストリームを生成する
  .transform(BufferCountStreamTransformer(2, 5))
  // 購読する
  .listen(print);
}
// 実行結果
// [1, 2]
// [3, 4]
// --------------------------------------------------
// [1, 2, 3, 4, 5]
// [3, 4, 5, 6, 7]
// [5, 6, 7, 8, 9]
// [7, 8, 9, 10]
// --------------------------------------------------
// [1, 2]
// [6, 7]

サンプルの実行結果から、値が個数によってまとめられていることが分かります。
また、2つ目の処理ブロックは5要素ずつまとめて2要素間隔で進めているので、最初の要素が1, 3, 5, 7と2要素ずつずれていっていることが分かります。
また要素が足りなくなったタイミングで購読が終了しています。
3つ目の処理ブロックは2要素ずつまとめて5要素間隔で進めている為、最初の要素が1, 6と5要素ずつずれていることが分かります。

雑感

生成系のオペレータ(RangeStream、Stream.periodic)はrxdartとStream API共に名前が違う程度でほぼ同じように書けていたので、特にハマらずに書けていましたが、変換系のオペレータだとStream APIでの書き方がObservableとの書き方が違う為、ハマりました。
書き心地できには、順番に書けてタイプ数もそこまで多くならないrxdartの方が書きやすいなと思いました。
ただ、dartは標準でstreamが用意されている為、rxdartが使用できない状況下でも使用するオペレータだけ自作すれば、ある程度は似たようなことができるのはすごいと思いました。

firebase系のプラグインとかも少し試したりしていましたのでまたそのあたり記事にできたらなと思います。
更新もなるべく頑張ります。

ご覧頂きありがとうございました。

参考サイト