Skip to content

Commit

Permalink
feat(throttle): add throttle operator with durationSelector
Browse files Browse the repository at this point in the history
Add new throttle operator (previous throttle was renamed to throttleTime) that takes a
durationSelector parameter, much like debounce does.

Resolves ReactiveX#496.
  • Loading branch information
staltz committed Nov 11, 2015
1 parent c1d04ac commit e98d350
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ export interface CoreOperators<T> {
switchMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
take?: (count: number) => Observable<T>;
takeUntil?: (notifier: Observable<any>) => Observable<T>;
throttle?: (durationSelector: (value: T) => Observable<any> | Promise<any>) => Observable<T>;
throttleTime?: (delay: number, scheduler?: Scheduler) => Observable<T>;
timeout?: (due: number | Date, errorToSend?: any, scheduler?: Scheduler) => Observable<T>;
timeoutWith?: <R>(due: number | Date, withObservable: Observable<R>, scheduler?: Scheduler) => Observable<T> | Observable<R>;
Expand Down
1 change: 1 addition & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ export class Observable<T> implements CoreOperators<T> {
switchMapTo: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
take: (count: number) => Observable<T>;
takeUntil: (notifier: Observable<any>) => Observable<T>;
throttle: (durationSelector: (value: T) => Observable<any> | Promise<any>) => Observable<T>;
throttleTime: (delay: number, scheduler?: Scheduler) => Observable<T>;
timeout: (due: number | Date, errorToSend?: any, scheduler?: Scheduler) => Observable<T>;
timeoutWith: <R>(due: number | Date, withObservable: Observable<R>, scheduler?: Scheduler) => Observable<T> | Observable<R>;
Expand Down
3 changes: 3 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ observableProto.take = take;
import {takeUntil} from './operators/takeUntil';
observableProto.takeUntil = takeUntil;

import {throttle} from './operators/throttle';
observableProto.throttle = throttle;

import {throttleTime} from './operators/throttleTime';
observableProto.throttleTime = throttleTime;

Expand Down
3 changes: 3 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ observableProto.take = take;
import {takeUntil} from './operators/takeUntil';
observableProto.takeUntil = takeUntil;

import {throttle} from './operators/throttle';
observableProto.throttle = throttle;

import {throttleTime} from './operators/throttleTime';
observableProto.throttleTime = throttleTime;

Expand Down
84 changes: 84 additions & 0 deletions src/operators/throttle.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import {Operator} from '../Operator';
import {Observable} from '../Observable';
import {PromiseObservable} from '../observables/PromiseObservable';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';

import {tryCatch} from '../util/tryCatch';
import {isPromise} from '../util/isPromise';
import {errorObject} from '../util/errorObject';

export function throttle<T>(durationSelector: (value: T) => Observable<any> | Promise<any>): Observable<T> {
return this.lift(new ThrottleOperator(durationSelector));
}

class ThrottleOperator<T, R> implements Operator<T, R> {
constructor(private durationSelector: (value: T) => Observable<any> | Promise<any>) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new ThrottleSubscriber(subscriber, this.durationSelector);
}
}

class ThrottleSubscriber<T> extends Subscriber<T> {
private throttled: Subscription<any>;

constructor(destination: Subscriber<T>,
private durationSelector: (value: T) => Observable<any> | Promise<any>) {
super(destination);
}

_next(value: T): void {
if (!this.throttled) {
const destination = this.destination;
let duration = tryCatch(this.durationSelector)(value);
if (duration === errorObject) {
destination.error(errorObject.e);
return;
}
if (isPromise(duration)) {
duration = PromiseObservable.create(duration);
}
this.add(this.throttled = duration._subscribe(new ThrottleDurationSelectorSubscriber(this)));
destination.next(value);
}
}

_error(err: any): void {
this.clearThrottle();
super._error(err);
}

_complete(): void {
this.clearThrottle();
super._complete();
}

clearThrottle(): void {
const throttled = this.throttled;
if (throttled) {
throttled.unsubscribe();
this.remove(throttled);
this.throttled = null;
}
}
}

class ThrottleDurationSelectorSubscriber<T> extends Subscriber<T> {
constructor(private parent: ThrottleSubscriber<any>) {
super(null);
}

_next(unused: T): void {
this.parent.clearThrottle();
}

_error(err): void {
this.parent.error(err);
}

_complete(): void {
this.parent.clearThrottle();
}
}

0 comments on commit e98d350

Please sign in to comment.