Skip to content

Commit

Permalink
feat(sampleTime): reimplement sampleTime with RxJS 4 behavior
Browse files Browse the repository at this point in the history
BREAKING CHANGE: `sampleTime` now has the same behavior `sample(number, scheduler)` did in RxJS 4
  • Loading branch information
benlesh committed Dec 14, 2015
1 parent e93bffc commit 6b77e69
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 0 deletions.
109 changes: 109 additions & 0 deletions spec/operators/sampleTime-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/* globals describe, it, expect, expectObservable, expectSubscriptions, cold, hot, rxTestScheduler */
var Rx = require('../../dist/cjs/Rx.KitchenSink');
var Observable = Rx.Observable;

describe('Observable.prototype.sampleTime', function () {
it('should get samples on a delay', function () {
var e1 = hot('----a-^--b----c----d----e----f----|');
var e1subs = '^ !';
var expected = '-----------c----------e-----|';
// timer -----------!----------!---------

expectObservable(e1.sampleTime(110, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should sample nothing if new value has not arrived', function () {
var e1 = hot('----a-^--b----c--------------f----|');
var e1subs = '^ !';
var expected = '-----------c----------------|';
// timer -----------!----------!---------

expectObservable(e1.sampleTime(110, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should sample ifnew value has arrived, even if it is the same value', function () {
var e1 = hot('----a-^--b----c----------c---f----|');
var e1subs = '^ !';
var expected = '-----------c----------c-----|';
// timer -----------!----------!---------

expectObservable(e1.sampleTime(110, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should sample nothing if source has not nexted by time of sample', function () {
var e1 = hot('----a-^-------------b-------------|');
var e1subs = '^ !';
var expected = '----------------------b-----|';
// timer -----------!----------!---------

expectObservable(e1.sampleTime(110, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should raise error if source raises error', function () {
var e1 = hot('----a-^--b----c----d----#');
var e1subs = '^ !';
var expected = '-----------c------#';
// timer -----------!----------!---------

expectObservable(e1.sampleTime(110, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should allow unsubscribing explicitly and early', function () {
var e1 = hot('----a-^--b----c----d----e----f----|');
var unsub = ' ! ';
var e1subs = '^ ! ';
var expected = '-----------c----- ';
// timer -----------!----------!---------

expectObservable(e1.sampleTime(110, rxTestScheduler), unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not break unsubscription chains when result is unsubscribed explicitly', function () {
var e1 = hot('----a-^--b----c----d----e----f----|');
var e1subs = '^ ! ';
// timer -----------!----------!---------
var expected = '-----------c----- ';
var unsub = ' ! ';

var result = e1
.mergeMap(function (x) { return Observable.of(x); })
.sampleTime(110, rxTestScheduler)
.mergeMap(function (x) { return Observable.of(x); });

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should completes if source does not emits', function () {
var e1 = cold('|');
var e1subs = '(^!)';
var expected = '|';

expectObservable(e1.sampleTime(60, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should raise error if source throws immediately', function () {
var e1 = cold('#');
var e1subs = '(^!)';
var expected = '#';

expectObservable(e1.sampleTime(60, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not completes if source does not complete', function () {
var e1 = cold('-');
var e1subs = '^';
var expected = '-';

expectObservable(e1.sampleTime(60, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});
1 change: 1 addition & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ export class Observable<T> implements CoreOperators<T> {
retry: (count?: number) => Observable<T>;
retryWhen: (notifier: (errors: Observable<any>) => Observable<any>) => Observable<T>;
sample: (notifier: Observable<any>) => Observable<T>;
sampleTime: (delay: number, scheduler?: Scheduler) => Observable<T>;
scan: <R>(accumulator: (acc: R, x: T) => R, seed?: T | R) => Observable<R>;
share: () => Observable<T>;
single: (predicate?: (value: T, index: number) => boolean) => Observable<T>;
Expand Down
1 change: 1 addition & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ import './add/operator/repeat';
import './add/operator/retry';
import './add/operator/retryWhen';
import './add/operator/sample';
import './add/operator/sampleTime';
import './add/operator/scan';
import './add/operator/share';
import './add/operator/single';
Expand Down
1 change: 1 addition & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ import './add/operator/repeat';
import './add/operator/retry';
import './add/operator/retryWhen';
import './add/operator/sample';
import './add/operator/sampleTime';
import './add/operator/scan';
import './add/operator/share';
import './add/operator/single';
Expand Down
5 changes: 5 additions & 0 deletions src/add/operator/sampleTime.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import {Observable} from '../../Observable';
import {sampleTime} from '../../operator/sampleTime';
Observable.prototype.sampleTime = sampleTime;

export var _void: void;
46 changes: 46 additions & 0 deletions src/operator/sampleTime.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import {Observable} from '../Observable';
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Scheduler} from '../Scheduler';
import {asap} from '../scheduler/asap';

export function sampleTime<T>(delay: number, scheduler: Scheduler = asap): Observable<T> {
return this.lift(new SampleTimeOperator(delay, scheduler));
}

class SampleTimeOperator<T, R> implements Operator<T, R> {
constructor(private delay: number, private scheduler: Scheduler) {
}

call(subscriber: Subscriber<R>) {
return new SampleTimeSubscriber(subscriber, this.delay, this.scheduler);
}
}

class SampleTimeSubscriber<T> extends Subscriber<T> {
lastValue: T;
hasValue: boolean = false;

constructor(destination: Subscriber<T>, private delay: number, private scheduler: Scheduler) {
super(destination);
this.add(scheduler.schedule(dispatchNotification, delay, { subscriber: this, delay }));
}

_next(value: T) {
this.lastValue = value;
this.hasValue = true;
}

notifyNext() {
if (this.hasValue) {
this.hasValue = false;
this.destination.next(this.lastValue);
}
}
}

function dispatchNotification<T>(state) {
let { subscriber, delay } = state;
subscriber.notifyNext();
(<any>this).schedule(state, delay);
}

0 comments on commit 6b77e69

Please sign in to comment.