From e4dd1fdcaf9a5269a5bbdaec2fea1dd597d357bf Mon Sep 17 00:00:00 2001 From: Jason Aden Date: Fri, 8 Sep 2017 06:50:49 -0700 Subject: [PATCH] feat(throttle): add higher-order lettable version of throttle --- src/operator/throttle.ts | 115 +-------------------------- src/operators/index.ts | 1 + src/operators/throttle.ts | 160 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 163 insertions(+), 113 deletions(-) create mode 100644 src/operators/throttle.ts diff --git a/src/operator/throttle.ts b/src/operator/throttle.ts index e840629182..385832770c 100644 --- a/src/operator/throttle.ts +++ b/src/operator/throttle.ts @@ -1,21 +1,5 @@ -import { Operator } from '../Operator'; import { Observable, SubscribableOrPromise } from '../Observable'; -import { Subscriber } from '../Subscriber'; -import { Subscription, TeardownLogic } from '../Subscription'; - -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; - -export interface ThrottleConfig { - leading?: boolean; - trailing?: boolean; -} - -export const defaultThrottleConfig: ThrottleConfig = { - leading: true, - trailing: false -}; +import { throttle as higherOrder, ThrottleConfig, defaultThrottleConfig } from '../operators/throttle'; /** * Emits a value from the source Observable, then ignores subsequent source @@ -60,100 +44,5 @@ export const defaultThrottleConfig: ThrottleConfig = { export function throttle(this: Observable, durationSelector: (value: T) => SubscribableOrPromise, config: ThrottleConfig = defaultThrottleConfig): Observable { - return this.lift(new ThrottleOperator(durationSelector, config.leading, config.trailing)); -} - -class ThrottleOperator implements Operator { - constructor(private durationSelector: (value: T) => SubscribableOrPromise, - private leading: boolean, - private trailing: boolean) { - } - - call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe( - new ThrottleSubscriber(subscriber, this.durationSelector, this.leading, this.trailing) - ); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc - * @ignore - * @extends {Ignored} - */ -class ThrottleSubscriber extends OuterSubscriber { - private throttled: Subscription; - private _trailingValue: T; - private _hasTrailingValue = false; - - constructor(protected destination: Subscriber, - private durationSelector: (value: T) => SubscribableOrPromise, - private _leading: boolean, - private _trailing: boolean) { - super(destination); - } - - protected _next(value: T): void { - if (this.throttled) { - if (this._trailing) { - this._hasTrailingValue = true; - this._trailingValue = value; - } - } else { - const duration = this.tryDurationSelector(value); - if (duration) { - this.add(this.throttled = subscribeToResult(this, duration)); - } - if (this._leading) { - this.destination.next(value); - if (this._trailing) { - this._hasTrailingValue = true; - this._trailingValue = value; - } - } - } - } - - private tryDurationSelector(value: T): SubscribableOrPromise { - try { - return this.durationSelector(value); - } catch (err) { - this.destination.error(err); - return null; - } - } - - protected _unsubscribe() { - const { throttled, _trailingValue, _hasTrailingValue, _trailing } = this; - - this._trailingValue = null; - this._hasTrailingValue = false; - - if (throttled) { - this.remove(throttled); - this.throttled = null; - throttled.unsubscribe(); - } - } - - private _sendTrailing() { - const { destination, throttled, _trailing, _trailingValue, _hasTrailingValue } = this; - if (throttled && _trailing && _hasTrailingValue) { - destination.next(_trailingValue); - this._trailingValue = null; - this._hasTrailingValue = false; - } - } - - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - this._sendTrailing(); - this._unsubscribe(); - } - - notifyComplete(): void { - this._sendTrailing(); - this._unsubscribe(); - } + return higherOrder(durationSelector, config)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index 457aeb5ee0..e39d5b6424 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -65,6 +65,7 @@ export { switchAll } from './switchAll'; export { switchMap } from './switchMap'; export { takeLast } from './takeLast'; export { tap } from './tap'; +export { throttle } from './throttle'; export { timestamp } from './timestamp'; export { toArray } from './toArray'; export { window } from './window'; diff --git a/src/operators/throttle.ts b/src/operators/throttle.ts new file mode 100644 index 0000000000..f8085bd38a --- /dev/null +++ b/src/operators/throttle.ts @@ -0,0 +1,160 @@ +import { Operator } from '../Operator'; +import { Observable, SubscribableOrPromise } from '../Observable'; +import { Subscriber } from '../Subscriber'; +import { Subscription, TeardownLogic } from '../Subscription'; + +import { OuterSubscriber } from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; +import { subscribeToResult } from '../util/subscribeToResult'; + +import { MonoTypeOperatorFunction } from '../interfaces'; + +export interface ThrottleConfig { + leading?: boolean; + trailing?: boolean; +} + +export const defaultThrottleConfig: ThrottleConfig = { + leading: true, + trailing: false +}; + +/** + * Emits a value from the source Observable, then ignores subsequent source + * values for a duration determined by another Observable, then repeats this + * process. + * + * It's like {@link throttleTime}, but the silencing + * duration is determined by a second Observable. + * + * + * + * `throttle` emits the source Observable values on the output Observable + * when its internal timer is disabled, and ignores source values when the timer + * is enabled. Initially, the timer is disabled. As soon as the first source + * value arrives, it is forwarded to the output Observable, and then the timer + * is enabled by calling the `durationSelector` function with the source value, + * which returns the "duration" Observable. When the duration Observable emits a + * value or completes, the timer is disabled, and this process repeats for the + * next source value. + * + * @example Emit clicks at a rate of at most one click per second + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var result = clicks.throttle(ev => Rx.Observable.interval(1000)); + * result.subscribe(x => console.log(x)); + * + * @see {@link audit} + * @see {@link debounce} + * @see {@link delayWhen} + * @see {@link sample} + * @see {@link throttleTime} + * + * @param {function(value: T): SubscribableOrPromise} durationSelector A function + * that receives a value from the source Observable, for computing the silencing + * duration for each source value, returned as an Observable or a Promise. + * @param {Object} config a configuration object to define `leading` and `trailing` behavior. Defaults + * to `{ leading: true, trailing: false }`. + * @return {Observable} An Observable that performs the throttle operation to + * limit the rate of emissions from the source. + * @method throttle + * @owner Observable + */ +export function throttle(durationSelector: (value: T) => SubscribableOrPromise, + config: ThrottleConfig = defaultThrottleConfig): MonoTypeOperatorFunction { + return (source: Observable) => source.lift(new ThrottleOperator(durationSelector, config.leading, config.trailing)); +} + +class ThrottleOperator implements Operator { + constructor(private durationSelector: (value: T) => SubscribableOrPromise, + private leading: boolean, + private trailing: boolean) { + } + + call(subscriber: Subscriber, source: any): TeardownLogic { + return source.subscribe( + new ThrottleSubscriber(subscriber, this.durationSelector, this.leading, this.trailing) + ); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc + * @ignore + * @extends {Ignored} + */ +class ThrottleSubscriber extends OuterSubscriber { + private throttled: Subscription; + private _trailingValue: T; + private _hasTrailingValue = false; + + constructor(protected destination: Subscriber, + private durationSelector: (value: T) => SubscribableOrPromise, + private _leading: boolean, + private _trailing: boolean) { + super(destination); + } + + protected _next(value: T): void { + if (this.throttled) { + if (this._trailing) { + this._hasTrailingValue = true; + this._trailingValue = value; + } + } else { + const duration = this.tryDurationSelector(value); + if (duration) { + this.add(this.throttled = subscribeToResult(this, duration)); + } + if (this._leading) { + this.destination.next(value); + if (this._trailing) { + this._hasTrailingValue = true; + this._trailingValue = value; + } + } + } + } + + private tryDurationSelector(value: T): SubscribableOrPromise { + try { + return this.durationSelector(value); + } catch (err) { + this.destination.error(err); + return null; + } + } + + protected _unsubscribe() { + const { throttled, _trailingValue, _hasTrailingValue, _trailing } = this; + + this._trailingValue = null; + this._hasTrailingValue = false; + + if (throttled) { + this.remove(throttled); + this.throttled = null; + throttled.unsubscribe(); + } + } + + private _sendTrailing() { + const { destination, throttled, _trailing, _trailingValue, _hasTrailingValue } = this; + if (throttled && _trailing && _hasTrailingValue) { + destination.next(_trailingValue); + this._trailingValue = null; + this._hasTrailingValue = false; + } + } + + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + this._sendTrailing(); + this._unsubscribe(); + } + + notifyComplete(): void { + this._sendTrailing(); + this._unsubscribe(); + } +}