狐好きぷろぐらまー

狐好きプログラマー見習いのブログです。

【Dart】rxdartについて調べてみる 第11回目 -BufferTestとBufferTime- 【rxdart】

こんにちは。 pregum_foxです。

今回は、BufferTestとBufferTimeについてです。

使用しているrxdartのバージョンの注意

現在こちらの記事で使用しているrxdartのバージョンは0.22.xです。

0.23.xで破壊的変更が入ったため、この記事に記載しているコードは0.23.x以上では動作しません。

0.23.x未満のコードを0.23に対応させる方法がこちらに載っていますので、もし0.23以上で使用したい場合は、こちらの手順に従って対応してください。

少しずつ以下の記事についても0.23のバージョンに変更していく予定です。


前回までの記事は以下をご覧ください。

目次です。

動作環境

動作環境は以下の通りです。

項目 バージョン
Dart 2.6.1
rxdart 0.22.2

以下にBufferTestオペレータとBufferTimerオペレータについて記載します。 オペレータの名称は、rxdart、Stream APIの順で記載します。

BufferTestオペレータ

シグネチャ

  • rxdart

Observable<List<T>> bufferTest(bool onTestHandler(T event))

BufferTestStreamTransformer<T> BufferTestStreamTransformer(bool test(T value))

処理の概要

生成されたストリームを述語の条件式でtrueになるまで溜め、条件式がtrueになったタイミングでそれまでためられたストリームが発行されます。

以下にサンプルコードを記載します。

import 'package:rxdart/rxdart.dart';

/// BufferTestのサンプルコード
Future studyBufferTest() async {
  Observable.periodic(Duration(milliseconds: 100), (int i) => i)
      .take(20)
      // ストリームの値が偶数ならば発行します。
      .bufferTest((i) => i % 2 == 0)
      .listen(print, onDone: () => print('on done.'));

  // 上と下の処理が同時に実行されないよう少し待つ
  await Future.delayed(Duration(milliseconds: 3000));
  print('--------------------');

  Observable.periodic(Duration(milliseconds: 100), (int i) => i)
      .take(11)
      // ストリームの値が5の倍数ならば発行します。
      .bufferTest((i) => i % 5 == 0)
      .listen(print, onDone: () => print('on done.'));
}
// 実行結果
// [0]
// [1, 2]
// [3, 4]
// [5, 6]
// [7, 8]
// [9, 10]
// [11, 12]
// [13, 14]
// [15, 16]
// [17, 18]
// [19]
// on done.
// --------------------
// [0]
// [1, 2, 3, 4, 5]
// [6, 7, 8, 9, 10]
// on done.

Future studyBufferTestStreamTransformer() async {
  Stream.periodic(Duration(milliseconds: 100), (i) => i)
      .take(20)
      // ストリームの値が偶数ならば発行します。
      .transform(BufferTestStreamTransformer((i) => i % 2 == 0))
      .listen(print, onDone: () => print('on done.'));

  // 上下の処理が同時に実行されないよう少し待つ
  await Future.delayed(Duration(milliseconds: 3000));
  print('--------------------');

  Stream.periodic(Duration(milliseconds: 100), (i) => i)
      .take(11)
      // ストリームの値が5の倍数ならば発行します。
      .transform(BufferTestStreamTransformer((i) => i % 5 == 0))
      .listen(print, onDone: () => print('on done.'));
}
// [0]
// [1, 2]
// [3, 4]
// [5, 6]
// [7, 8]
// [9, 10]
// [11, 12]
// [13, 14]
// [15, 16]
// [17, 18]
// [19]
// on done.
// --------------------
// [0]
// [1, 2, 3, 4, 5]
// [6, 7, 8, 9, 10]
// on done.

BufferTimeオペレータ

シグネチャ

  • rxdart

Observable<List<T>> bufferTime(Duration duration)

BufferStreamTransformer<T> BufferStreamTransformer(Stream window(T event))

処理の概要

指定された期間が経過したらそれまで溜めていたストリームを発行します。 以下にサンプルコードを示します。

import 'package:rxdart/rxdart.dart';

Future studyBufferTime() async {
  // 100msecごとにカウントアップするストリームを生成する
  Observable.periodic(Duration(milliseconds: 100), (int i) => i)
      // 330msecごとにサンプリングする
      .bufferTime(Duration(milliseconds: 330))
      // 5要素取得する
      .take(5)
      // 購読する
      .listen(print, onDone: () => print('on done.'));

  // 区切る為に少し待つ
  await Future.delayed(const Duration(milliseconds: 2000));
  print('--------------------------------------------------');

  // 500msecごとにカウントアップするストリームを生成する
  Observable.periodic(Duration(milliseconds: 500), (int i) => i)
      // 100msecごとにサンプリングする
      .bufferTime(Duration(milliseconds: 100))
      // 5要素取得する
      .take(5)
      // 購読する
      .listen(print, onDone: () => print('on done.'));
}
// [0, 1, 2, 3]
// [4, 5, 6]
// [7, 8, 9]
// [10, 11, 12, 13]
// [14, 15, 16]
// on done.
// --------------------------------------------------
// [0]
// []
// []
// []
// [1]
// on done.


Future studyBufferTimeStreamTransfor() async {
  Stream.periodic(Duration(milliseconds: 100), (int i) => i)
      .transform(BufferStreamTransformer(
          (int window) => Stream.periodic(Duration(milliseconds: 330))))
      .take(5)
      .listen(print, onDone: () => print('on done.'));

  // 区切る為に少し待つ
  await Future.delayed(const Duration(milliseconds: 2000));
  print('--------------------------------------------------');

  Stream.periodic(Duration(milliseconds: 500), (int i) => i)
      .transform(BufferStreamTransformer(
          (int window) => Stream.periodic(Duration(milliseconds: 100))))
      .take(5)
      .listen(print, onDone: () => print('on done.'));
}
// [0, 1, 2, 3]
// [4, 5, 6]
// [7, 8, 9]
// [10, 11, 12, 13]
// [14, 15, 16]
// on done.
// --------------------------------------------------
// [0]
// []
// []
// []
// [1]
// on done.

雑感

前回からすごい時間が空いてしまいました。。。

少し離れていましたが、また少しずつ更新頻度を上げていきたいと思います。

ここまで読んで頂き、ありがとうございます。

参考サイト