こんにちは。 pregum_foxです。
今回は、BufferTestとBufferTimeについてです。
使用しているrxdartのバージョンの注意
現在こちらの記事で使用しているrxdartのバージョンは0.22.xです。
0.23.xで破壊的変更が入ったため、この記事に記載しているコードは0.23.x以上では動作しません。
0.23.x未満のコードを0.23に対応させる方法がこちらに載っていますので、もし0.23以上で使用したい場合は、こちらの手順に従って対応してください。
少しずつ以下の記事についても0.23のバージョンに変更していく予定です。
前回までの記事は以下をご覧ください。
- 【Dart】rxdartについて調べてみる 第1回目 -概要-【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第2回目 -ObservableとSubject系クラス- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第3回目 -concatとconcatEager- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第4回目 -Defer, Merge- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第5回目 -Periodic, Never- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第6回目 -Race, Retry- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第7回目 -RetryWhen, SequenceEqual- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第8回目 -SwitchLatest, CombineLatest- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第9回目 -ForkJoin, Zip- 【rxdart】 - 狐好きぷろぐらまー
- 【Dart】rxdartについて調べてみる 第10回目 -Buffer, BufferCount- 【rxdart】 - 狐好きぷろぐらまー
目次です。
動作環境
動作環境は以下の通りです。
項目 | バージョン |
---|---|
Dart | 2.6.1 |
rxdart | 0.22.2 |
以下にBufferTestオペレータとBufferTimerオペレータについて記載します。 オペレータの名称は、rxdart、Stream APIの順で記載します。
BufferTestオペレータ
- Rx: なし(Bufferの派生オペレータ)
- rxdart: bufferTest メソッド
- Stream API: BufferTestStreamTransformer< T> クラス
シグネチャ
- rxdart
Observable<List<T>> bufferTest(bool onTestHandler(T event))
- Stream API
BufferTestStreamTransformer<T> BufferTestStreamTransformer(bool test(T value))
処理の概要
生成されたストリームを述語の条件式でtrueになるまで溜め、条件式がtrueになったタイミングでそれまでためられたストリームが発行されます。
以下にサンプルコードを記載します。
import 'package:rxdart/rxdart.dart'; /// BufferTestのサンプルコード Future studyBufferTest() async { Observable.periodic(Duration(milliseconds: 100), (int i) => i) .take(20) // ストリームの値が偶数ならば発行します。 .bufferTest((i) => i % 2 == 0) .listen(print, onDone: () => print('on done.')); // 上と下の処理が同時に実行されないよう少し待つ await Future.delayed(Duration(milliseconds: 3000)); print('--------------------'); Observable.periodic(Duration(milliseconds: 100), (int i) => i) .take(11) // ストリームの値が5の倍数ならば発行します。 .bufferTest((i) => i % 5 == 0) .listen(print, onDone: () => print('on done.')); } // 実行結果 // [0] // [1, 2] // [3, 4] // [5, 6] // [7, 8] // [9, 10] // [11, 12] // [13, 14] // [15, 16] // [17, 18] // [19] // on done. // -------------------- // [0] // [1, 2, 3, 4, 5] // [6, 7, 8, 9, 10] // on done. Future studyBufferTestStreamTransformer() async { Stream.periodic(Duration(milliseconds: 100), (i) => i) .take(20) // ストリームの値が偶数ならば発行します。 .transform(BufferTestStreamTransformer((i) => i % 2 == 0)) .listen(print, onDone: () => print('on done.')); // 上下の処理が同時に実行されないよう少し待つ await Future.delayed(Duration(milliseconds: 3000)); print('--------------------'); Stream.periodic(Duration(milliseconds: 100), (i) => i) .take(11) // ストリームの値が5の倍数ならば発行します。 .transform(BufferTestStreamTransformer((i) => i % 5 == 0)) .listen(print, onDone: () => print('on done.')); } // [0] // [1, 2] // [3, 4] // [5, 6] // [7, 8] // [9, 10] // [11, 12] // [13, 14] // [15, 16] // [17, 18] // [19] // on done. // -------------------- // [0] // [1, 2, 3, 4, 5] // [6, 7, 8, 9, 10] // on done.
BufferTimeオペレータ
- Rx: なし(Bufferの派生オペレータ)
- rxdart: bufferTime メソッド
- Stream API: BufferStreamTransformer クラス
シグネチャ
- rxdart
Observable<List<T>> bufferTime(Duration duration)
- Stream API
BufferStreamTransformer<T> BufferStreamTransformer(Stream window(T event))
処理の概要
指定された期間が経過したらそれまで溜めていたストリームを発行します。 以下にサンプルコードを示します。
import 'package:rxdart/rxdart.dart'; Future studyBufferTime() async { // 100msecごとにカウントアップするストリームを生成する Observable.periodic(Duration(milliseconds: 100), (int i) => i) // 330msecごとにサンプリングする .bufferTime(Duration(milliseconds: 330)) // 5要素取得する .take(5) // 購読する .listen(print, onDone: () => print('on done.')); // 区切る為に少し待つ await Future.delayed(const Duration(milliseconds: 2000)); print('--------------------------------------------------'); // 500msecごとにカウントアップするストリームを生成する Observable.periodic(Duration(milliseconds: 500), (int i) => i) // 100msecごとにサンプリングする .bufferTime(Duration(milliseconds: 100)) // 5要素取得する .take(5) // 購読する .listen(print, onDone: () => print('on done.')); } // [0, 1, 2, 3] // [4, 5, 6] // [7, 8, 9] // [10, 11, 12, 13] // [14, 15, 16] // on done. // -------------------------------------------------- // [0] // [] // [] // [] // [1] // on done. Future studyBufferTimeStreamTransfor() async { Stream.periodic(Duration(milliseconds: 100), (int i) => i) .transform(BufferStreamTransformer( (int window) => Stream.periodic(Duration(milliseconds: 330)))) .take(5) .listen(print, onDone: () => print('on done.')); // 区切る為に少し待つ await Future.delayed(const Duration(milliseconds: 2000)); print('--------------------------------------------------'); Stream.periodic(Duration(milliseconds: 500), (int i) => i) .transform(BufferStreamTransformer( (int window) => Stream.periodic(Duration(milliseconds: 100)))) .take(5) .listen(print, onDone: () => print('on done.')); } // [0, 1, 2, 3] // [4, 5, 6] // [7, 8, 9] // [10, 11, 12, 13] // [14, 15, 16] // on done. // -------------------------------------------------- // [0] // [] // [] // [] // [1] // on done.
雑感
前回からすごい時間が空いてしまいました。。。
少し離れていましたが、また少しずつ更新頻度を上げていきたいと思います。
ここまで読んで頂き、ありがとうございます。