Skip to content

Commit

Permalink
feat(sample): readd sample operator
Browse files Browse the repository at this point in the history
Add an implementation of `sample` that behaves like RxJS 4

BREAKING CHANGE: `sample` behavior returned to RxJS 4 behavior
  • Loading branch information
benlesh committed Dec 14, 2015
1 parent 0f9d45b commit e93bffc
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 0 deletions.
180 changes: 180 additions & 0 deletions spec/operators/sample-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/* globals describe, it, expect, hot, expectObservable,, expectSubscriptions */
var Rx = require('../../dist/cjs/Rx.KitchenSink');
var Observable = Rx.Observable;

describe('Observable.prototype.sample', function () {
it('should get samples when the notifier emits', function () {
var e1 = hot('----a-^--b----c----d----e----f----| ');
var e1subs = '^ ! ';
var e2 = hot( '-----x----------x----------x----------|');
var e2subs = '^ ! ';
var expected = '-----b----------d----------f| ';

expectObservable(e1.sample(e2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should sample nothing if source has not nexted yet', function () {
var e1 = hot('----a-^-------b----|');
var e1subs = '^ !';
var e2 = hot( '-----x-------|');
var e2subs = '^ !';
var expected = '-------------|';

expectObservable(e1.sample(e2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should not complete when the notifier completes, nor should it emit', function () {
var e1 = hot('----a----b----c----d----e----f----');
var e1subs = '^ ';
var e2 = hot('------x-| ');
var e2subs = '^ ! ';
var expected = '------a---------------------------';

expectObservable(e1.sample(e2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should complete only when the source completes, if notifier completes early', function () {
var e1 = hot('----a----b----c----d----e----f---|');
var e1subs = '^ !';
var e2 = hot('------x-| ');
var e2subs = '^ ! ';
var expected = '------a--------------------------|';

expectObservable(e1.sample(e2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should allow unsubscribing explicitly and early', function () {
var e1 = hot('----a-^--b----c----d----e----f----| ');
var unsub = ' ! ';
var e1subs = '^ ! ';
var e2 = hot( '-----x----------x----------x----------|');
var e2subs = '^ ! ';
var expected = '-----b--------- ';

expectObservable(e1.sample(e2), unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should not break unsubscription chains when result is unsubscribed explicitly', function () {
var e1 = hot('----a-^--b----c----d----e----f----| ');
var e1subs = '^ ! ';
var e2 = hot( '-----x----------x----------x----------|');
var e2subs = '^ ! ';
var expected = '-----b--------- ';
var unsub = ' ! ';

var result = e1
.mergeMap(function (x) { return Observable.of(x); })
.sample(e2)
.mergeMap(function (x) { return Observable.of(x); });

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should only sample when a new value arrives, even if it is the same value', function () {
var e1 = hot('----a----b----c----c----e----f----| ');
var e1subs = '^ ! ';
var e2 = hot('------x-x------xx-x---x----x--------|');
var e2subs = '^ ! ';
var expected = '------a--------c------c----e------| ';

expectObservable(e1.sample(e2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should raise error if source raises error', function () {
var e1 = hot('----a-^--b----c----d----# ');
var e1subs = '^ ! ';
var e2 = hot( '-----x----------x----------x----------|');
var e2subs = '^ ! ';
var expected = '-----b----------d-# ';

expectObservable(e1.sample(e2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should completes if source does not emits', function () {
var e1 = hot('|');
var e2 = hot('------x-------|');
var expected = '|';
var e1subs = '(^!)';
var e2subs = '(^!)';

expectObservable(e1.sample(e2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should raise error if source throws immediately', function () {
var e1 = hot('#');
var e2 = hot('------x-------|');
var expected = '#';
var e1subs = '(^!)';
var e2subs = '(^!)';

expectObservable(e1.sample(e2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should raise error if notification raises error', function () {
var e1 = hot('--a-----|');
var e2 = hot('----#');
var expected = '----#';
var e1subs = '^ !';
var e2subs = '^ !';

expectObservable(e1.sample(e2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should not completes if source does not complete', function () {
var e1 = hot('-');
var e1subs = '^ ';
var e2 = hot('------x-------|');
var e2subs = '^ !';
var expected = '-';

expectObservable(e1.sample(e2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should sample only until source completes', function () {
var e1 = hot('----a----b----c----d-|');
var e1subs = '^ !';
var e2 = hot('-----------x----------x------------|');
var e2subs = '^ !';
var expected = '-----------b---------|';

expectObservable(e1.sample(e2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should complete sampling if sample observable completes', function () {
var e1 = hot('----a----b----c----d-|');
var e1subs = '^ !';
var e2 = hot('|');
var e2subs = '(^!)';
var expected = '---------------------|';

expectObservable(e1.sample(e2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});
});
1 change: 1 addition & 0 deletions src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export interface CoreOperators<T> {
repeat?: (count?: number) => Observable<T>;
retry?: (count?: number) => Observable<T>;
retryWhen?: (notifier: (errors: Observable<any>) => Observable<any>) => Observable<T>;
sample?: (notifier: Observable<any>) => Observable<T>;
scan?: <R>(project: (acc: R, x: T) => R, acc?: R) => Observable<R>;
share?: () => Observable<T>;
single?: (predicate?: (value: T, index: number) => boolean) => Observable<T>;
Expand Down
1 change: 1 addition & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ export class Observable<T> implements CoreOperators<T> {
repeat: (count?: number) => Observable<T>;
retry: (count?: number) => Observable<T>;
retryWhen: (notifier: (errors: Observable<any>) => Observable<any>) => Observable<T>;
sample: (notifier: Observable<any>) => Observable<T>;
scan: <R>(accumulator: (acc: R, x: T) => R, seed?: T | R) => Observable<R>;
share: () => Observable<T>;
single: (predicate?: (value: T, index: number) => boolean) => Observable<T>;
Expand Down
1 change: 1 addition & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ import './add/operator/reduce';
import './add/operator/repeat';
import './add/operator/retry';
import './add/operator/retryWhen';
import './add/operator/sample';
import './add/operator/scan';
import './add/operator/share';
import './add/operator/single';
Expand Down
1 change: 1 addition & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ import './add/operator/reduce';
import './add/operator/repeat';
import './add/operator/retry';
import './add/operator/retryWhen';
import './add/operator/sample';
import './add/operator/scan';
import './add/operator/share';
import './add/operator/single';
Expand Down
5 changes: 5 additions & 0 deletions src/add/operator/sample.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import {Observable} from '../../Observable';
import {sample} from '../../operator/sample';
Observable.prototype.sample = sample;

export var _void: void;
56 changes: 56 additions & 0 deletions src/operator/sample.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import {Observable} from '../Observable';
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';

export function sample<T>(notifier: Observable<any>): Observable<T> {
return this.lift(new SampleOperator(notifier));
}

class SampleOperator<T, R> implements Operator<T, R> {
constructor(private notifier: Observable<any>) {
}

call(subscriber: Subscriber<R>) {
return new SampleSubscriber(subscriber, this.notifier);
}
}

class SampleSubscriber<T> extends Subscriber<T> {
private lastValue: T;
private hasValue: boolean = false;

constructor(destination: Subscriber<T>, private notifier: Observable<any>) {
super(destination);
this.add(notifier._subscribe(new SampleNotificationSubscriber(this)));
}

_next(value: T) {
this.lastValue = value;
this.hasValue = true;
}

notifyNext() {
if (this.hasValue) {
this.hasValue = false;
this.destination.next(this.lastValue);
}
}
}

class SampleNotificationSubscriber<T> extends Subscriber<T> {
constructor(private parent: SampleSubscriber<T>) {
super(null);
}

_next() {
this.parent.notifyNext();
}

_error(err: any) {
this.parent.error(err);
}

_complete() {
//noop
}
}

0 comments on commit e93bffc

Please sign in to comment.