diff --git a/spec/operators/sample-spec.js b/spec/operators/sample-spec.js new file mode 100644 index 0000000000..046e3e2e19 --- /dev/null +++ b/spec/operators/sample-spec.js @@ -0,0 +1,26 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.sample', function () { + it('should get samples when the notifier emits', function (done) { + var expected = [1, 3, 5]; + Observable.interval(100) + .sample(Observable.interval(220)) + .take(3) + .subscribe(function (x) { + expect(x).toBe(expected.shift()); + }, null, done); + }, 2000); + + it('should not complete when the notifier completes, nor should it emit', function (done) { + Observable.interval(100) + .sample(Observable.timer(220)) + .subscribe(function (x) { + expect(x).toBe(1); + setTimeout(done, 500); + }, null, function () { + throw 'should not be called'; + }); + }); +}); \ No newline at end of file diff --git a/spec/operators/sampleTime-spec.js b/spec/operators/sampleTime-spec.js new file mode 100644 index 0000000000..82957f6f29 --- /dev/null +++ b/spec/operators/sampleTime-spec.js @@ -0,0 +1,15 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.sampleTime', function () { + it('should get samples on a delay', function (done) { + var expected = [1, 3, 5]; + Observable.interval(100) + .sampleTime(220) + .take(3) + .subscribe(function (x) { + expect(x).toBe(expected.shift()); + }, null, done); + }, 2000); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index 6e992922ff..97af12fd42 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -178,6 +178,9 @@ export default class Observable { bufferTime: (bufferTimeSpan: number, bufferCreationInterval?: number, scheduler?: Scheduler) => Observable; bufferCount: (bufferSize: number, startBufferEvery: number) => Observable; + sample: (notifier: Observable) => Observable; + sampleTime: (delay: number, scheduler?: Scheduler) => Observable; + finally: (ensure: () => void, thisArg?: any) => Observable; timeout: (due: number|Date, errorToSend?: any, scheduler?: Scheduler) => Observable; timeoutWith: (due: number|Date, withObservable: Observable, scheduler?: Scheduler) => Observable; diff --git a/src/Rx.ts b/src/Rx.ts index 7baa32e8d4..57d32b4d78 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -207,6 +207,12 @@ observableProto.bufferTime = bufferTime; observableProto.bufferToggle = bufferToggle; observableProto.bufferWhen = bufferWhen; +import sample from './operators/sample'; +import sampleTime from './operators/sampleTime'; + +observableProto.sample = sample; +observableProto.sampleTime = sampleTime; + var Scheduler = { nextTick, immediate diff --git a/src/operators/sample.ts b/src/operators/sample.ts new file mode 100644 index 0000000000..da311d8cf7 --- /dev/null +++ b/src/operators/sample.ts @@ -0,0 +1,64 @@ +import Observable from '../Observable'; +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Subscription from '../Subscription'; + +export default function sample(notifier: Observable): Observable { + return this.lift(new SampleOperator(notifier)); +} + +class SampleOperator implements Operator { + constructor(private notifier: Observable) { + } + + call(observer: Observer) { + return new SampleSubscriber(observer, this.notifier); + } +} + +class SampleSubscriber extends Subscriber { + private lastValue: T; + private hasValue: boolean = false; + private innerSubscription: Subscription; + + constructor(destination: Observer, private notifier: Observable) { + super(destination); + this.add(this.innerSubscription = notifier.subscribe(new SampleNoficationSubscriber(this))); + } + + _next(value: T) { + this.lastValue = value; + this.hasValue = true; + } + + notifyNext() { + if (this.hasValue) { + this.destination.next(this.lastValue); + } + } + + notifyComplete() { + this.remove(this.innerSubscription); + this.innerSubscription.unsubscribe(); + } +} + + +class SampleNoficationSubscriber extends Subscriber { + constructor(private parent: SampleSubscriber) { + super(null); + } + + _next() { + this.parent.notifyNext(); + } + + _error(err: any) { + this.parent.error(err); + } + + _complete() { + this.parent.notifyComplete(); + } +} \ No newline at end of file diff --git a/src/operators/sampleTime.ts b/src/operators/sampleTime.ts new file mode 100644 index 0000000000..18c1b4645d --- /dev/null +++ b/src/operators/sampleTime.ts @@ -0,0 +1,46 @@ +import Observable from '../Observable'; +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Subscription from '../Subscription'; +import Scheduler from '../Scheduler'; +import nextTick from '../schedulers/nextTick'; + +export default function sampleTime(delay: number, scheduler: Scheduler = nextTick): Observable { + return this.lift(new SampleTimeOperator(delay, scheduler)); +} + +class SampleTimeOperator implements Operator { + constructor(private delay: number, private scheduler: Scheduler) { + } + + call(observer: Observer) { + return new SampleTimeSubscriber(observer, this.delay, this.scheduler); + } +} + +class SampleTimeSubscriber extends Subscriber { + lastValue: T; + hasValue: boolean = false; + + constructor(destination: Observer, private delay: number, private scheduler: Scheduler) { + super(destination); + this.add(scheduler.schedule(delay, { subscriber: this }, dispatchNotification)); + } + + _next(value: T) { + this.lastValue = value; + this.hasValue = true; + } + + notifyNext() { + if (this.hasValue) { + this.destination.next(this.lastValue); + } + } +} + +function dispatchNotification(state: { subscriber: SampleTimeSubscriber }) { + state.subscriber.notifyNext(); + (this).schedule(state); +} \ No newline at end of file