From 6b77e693b9d17fde8bd0fd0c583cd8925ba9d786 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 14 Dec 2015 09:57:34 -0800 Subject: [PATCH] feat(sampleTime): reimplement `sampleTime` with RxJS 4 behavior BREAKING CHANGE: `sampleTime` now has the same behavior `sample(number, scheduler)` did in RxJS 4 --- spec/operators/sampleTime-spec.js | 109 ++++++++++++++++++++++++++++++ src/Observable.ts | 1 + src/Rx.KitchenSink.ts | 1 + src/Rx.ts | 1 + src/add/operator/sampleTime.ts | 5 ++ src/operator/sampleTime.ts | 46 +++++++++++++ 6 files changed, 163 insertions(+) create mode 100644 spec/operators/sampleTime-spec.js create mode 100644 src/add/operator/sampleTime.ts create mode 100644 src/operator/sampleTime.ts diff --git a/spec/operators/sampleTime-spec.js b/spec/operators/sampleTime-spec.js new file mode 100644 index 0000000000..6bb2c22f54 --- /dev/null +++ b/spec/operators/sampleTime-spec.js @@ -0,0 +1,109 @@ +/* globals describe, it, expect, expectObservable, expectSubscriptions, cold, hot, rxTestScheduler */ +var Rx = require('../../dist/cjs/Rx.KitchenSink'); +var Observable = Rx.Observable; + +describe('Observable.prototype.sampleTime', function () { + it('should get samples on a delay', function () { + var e1 = hot('----a-^--b----c----d----e----f----|'); + var e1subs = '^ !'; + var expected = '-----------c----------e-----|'; + // timer -----------!----------!--------- + + expectObservable(e1.sampleTime(110, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should sample nothing if new value has not arrived', function () { + var e1 = hot('----a-^--b----c--------------f----|'); + var e1subs = '^ !'; + var expected = '-----------c----------------|'; + // timer -----------!----------!--------- + + expectObservable(e1.sampleTime(110, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should sample ifnew value has arrived, even if it is the same value', function () { + var e1 = hot('----a-^--b----c----------c---f----|'); + var e1subs = '^ !'; + var expected = '-----------c----------c-----|'; + // timer -----------!----------!--------- + + expectObservable(e1.sampleTime(110, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should sample nothing if source has not nexted by time of sample', function () { + var e1 = hot('----a-^-------------b-------------|'); + var e1subs = '^ !'; + var expected = '----------------------b-----|'; + // timer -----------!----------!--------- + + expectObservable(e1.sampleTime(110, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should raise error if source raises error', function () { + var e1 = hot('----a-^--b----c----d----#'); + var e1subs = '^ !'; + var expected = '-----------c------#'; + // timer -----------!----------!--------- + + expectObservable(e1.sampleTime(110, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should allow unsubscribing explicitly and early', function () { + var e1 = hot('----a-^--b----c----d----e----f----|'); + var unsub = ' ! '; + var e1subs = '^ ! '; + var expected = '-----------c----- '; + // timer -----------!----------!--------- + + expectObservable(e1.sampleTime(110, rxTestScheduler), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should not break unsubscription chains when result is unsubscribed explicitly', function () { + var e1 = hot('----a-^--b----c----d----e----f----|'); + var e1subs = '^ ! '; + // timer -----------!----------!--------- + var expected = '-----------c----- '; + var unsub = ' ! '; + + var result = e1 + .mergeMap(function (x) { return Observable.of(x); }) + .sampleTime(110, rxTestScheduler) + .mergeMap(function (x) { return Observable.of(x); }); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should completes if source does not emits', function () { + var e1 = cold('|'); + var e1subs = '(^!)'; + var expected = '|'; + + expectObservable(e1.sampleTime(60, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should raise error if source throws immediately', function () { + var e1 = cold('#'); + var e1subs = '(^!)'; + var expected = '#'; + + expectObservable(e1.sampleTime(60, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should not completes if source does not complete', function () { + var e1 = cold('-'); + var e1subs = '^'; + var expected = '-'; + + expectObservable(e1.sampleTime(60, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index 3c39b3358a..50d365acdd 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -247,6 +247,7 @@ export class Observable implements CoreOperators { retry: (count?: number) => Observable; retryWhen: (notifier: (errors: Observable) => Observable) => Observable; sample: (notifier: Observable) => Observable; + sampleTime: (delay: number, scheduler?: Scheduler) => Observable; scan: (accumulator: (acc: R, x: T) => R, seed?: T | R) => Observable; share: () => Observable; single: (predicate?: (value: T, index: number) => boolean) => Observable; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 0106db03b1..81ff82c58d 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -103,6 +103,7 @@ import './add/operator/repeat'; import './add/operator/retry'; import './add/operator/retryWhen'; import './add/operator/sample'; +import './add/operator/sampleTime'; import './add/operator/scan'; import './add/operator/share'; import './add/operator/single'; diff --git a/src/Rx.ts b/src/Rx.ts index 3215d53dbb..d6d9a43799 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -75,6 +75,7 @@ import './add/operator/repeat'; import './add/operator/retry'; import './add/operator/retryWhen'; import './add/operator/sample'; +import './add/operator/sampleTime'; import './add/operator/scan'; import './add/operator/share'; import './add/operator/single'; diff --git a/src/add/operator/sampleTime.ts b/src/add/operator/sampleTime.ts new file mode 100644 index 0000000000..7a3d4833d3 --- /dev/null +++ b/src/add/operator/sampleTime.ts @@ -0,0 +1,5 @@ +import {Observable} from '../../Observable'; +import {sampleTime} from '../../operator/sampleTime'; +Observable.prototype.sampleTime = sampleTime; + +export var _void: void; \ No newline at end of file diff --git a/src/operator/sampleTime.ts b/src/operator/sampleTime.ts new file mode 100644 index 0000000000..2877a76307 --- /dev/null +++ b/src/operator/sampleTime.ts @@ -0,0 +1,46 @@ +import {Observable} from '../Observable'; +import {Operator} from '../Operator'; +import {Subscriber} from '../Subscriber'; +import {Scheduler} from '../Scheduler'; +import {asap} from '../scheduler/asap'; + +export function sampleTime(delay: number, scheduler: Scheduler = asap): Observable { + return this.lift(new SampleTimeOperator(delay, scheduler)); +} + +class SampleTimeOperator implements Operator { + constructor(private delay: number, private scheduler: Scheduler) { + } + + call(subscriber: Subscriber) { + return new SampleTimeSubscriber(subscriber, this.delay, this.scheduler); + } +} + +class SampleTimeSubscriber extends Subscriber { + lastValue: T; + hasValue: boolean = false; + + constructor(destination: Subscriber, private delay: number, private scheduler: Scheduler) { + super(destination); + this.add(scheduler.schedule(dispatchNotification, delay, { subscriber: this, delay })); + } + + _next(value: T) { + this.lastValue = value; + this.hasValue = true; + } + + notifyNext() { + if (this.hasValue) { + this.hasValue = false; + this.destination.next(this.lastValue); + } + } +} + +function dispatchNotification(state) { + let { subscriber, delay } = state; + subscriber.notifyNext(); + (this).schedule(state, delay); +}