diff --git a/src/operator/expand.ts b/src/operator/expand.ts index e4975e4cfe..b6f9183fe9 100644 --- a/src/operator/expand.ts +++ b/src/operator/expand.ts @@ -1,13 +1,6 @@ import { Observable } from '../Observable'; import { IScheduler } from '../Scheduler'; -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; -import { tryCatch } from '../util/tryCatch'; -import { errorObject } from '../util/errorObject'; -import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { expand as higherOrder } from '../operators'; /* tslint:disable:max-line-length */ export function expand(this: Observable, project: (value: T, index: number) => Observable, concurrent?: number, scheduler?: IScheduler): Observable; @@ -64,105 +57,5 @@ export function expand(this: Observable, project: (value: T, index: num scheduler: IScheduler = undefined): Observable { concurrent = (concurrent || 0) < 1 ? Number.POSITIVE_INFINITY : concurrent; - return this.lift(new ExpandOperator(project, concurrent, scheduler)); -} - -export class ExpandOperator implements Operator { - constructor(private project: (value: T, index: number) => Observable, - private concurrent: number, - private scheduler: IScheduler) { - } - - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new ExpandSubscriber(subscriber, this.project, this.concurrent, this.scheduler)); - } -} - -interface DispatchArg { - subscriber: ExpandSubscriber; - result: Observable; - value: any; - index: number; -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -export class ExpandSubscriber extends OuterSubscriber { - private index: number = 0; - private active: number = 0; - private hasCompleted: boolean = false; - private buffer: any[]; - - constructor(destination: Subscriber, - private project: (value: T, index: number) => Observable, - private concurrent: number, - private scheduler: IScheduler) { - super(destination); - if (concurrent < Number.POSITIVE_INFINITY) { - this.buffer = []; - } - } - - private static dispatch(arg: DispatchArg): void { - const {subscriber, result, value, index} = arg; - subscriber.subscribeToProjection(result, value, index); - } - - protected _next(value: any): void { - const destination = this.destination; - - if (destination.closed) { - this._complete(); - return; - } - - const index = this.index++; - if (this.active < this.concurrent) { - destination.next(value); - let result = tryCatch(this.project)(value, index); - if (result === errorObject) { - destination.error(errorObject.e); - } else if (!this.scheduler) { - this.subscribeToProjection(result, value, index); - } else { - const state: DispatchArg = { subscriber: this, result, value, index }; - this.add(this.scheduler.schedule(ExpandSubscriber.dispatch, 0, state)); - } - } else { - this.buffer.push(value); - } - } - - private subscribeToProjection(result: any, value: T, index: number): void { - this.active++; - this.add(subscribeToResult(this, result, value, index)); - } - - protected _complete(): void { - this.hasCompleted = true; - if (this.hasCompleted && this.active === 0) { - this.destination.complete(); - } - } - - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - this._next(innerValue); - } - - notifyComplete(innerSub: Subscription): void { - const buffer = this.buffer; - this.remove(innerSub); - this.active--; - if (buffer && buffer.length > 0) { - this._next(buffer.shift()); - } - if (this.hasCompleted && this.active === 0) { - this.destination.complete(); - } - } + return higherOrder(project, concurrent, scheduler)(this); } diff --git a/src/operators/expand.ts b/src/operators/expand.ts new file mode 100644 index 0000000000..78a9acbf13 --- /dev/null +++ b/src/operators/expand.ts @@ -0,0 +1,169 @@ +import { Observable } from '../Observable'; +import { IScheduler } from '../Scheduler'; +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { tryCatch } from '../util/tryCatch'; +import { errorObject } from '../util/errorObject'; +import { Subscription } from '../Subscription'; +import { OuterSubscriber } from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; +import { subscribeToResult } from '../util/subscribeToResult'; +import { MonoTypeOperatorFunction, OperatorFunction } from '../interfaces'; + +/* tslint:disable:max-line-length */ +export function expand(project: (value: T, index: number) => Observable, concurrent?: number, scheduler?: IScheduler): MonoTypeOperatorFunction; +export function expand(project: (value: T, index: number) => Observable, concurrent?: number, scheduler?: IScheduler): OperatorFunction; +/* tslint:enable:max-line-length */ + +/** + * Recursively projects each source value to an Observable which is merged in + * the output Observable. + * + * It's similar to {@link mergeMap}, but applies the + * projection function to every source value as well as every output value. + * It's recursive. + * + * + * + * Returns an Observable that emits items based on applying a function that you + * supply to each item emitted by the source Observable, where that function + * returns an Observable, and then merging those resulting Observables and + * emitting the results of this merger. *Expand* will re-emit on the output + * Observable every source value. Then, each output value is given to the + * `project` function which returns an inner Observable to be merged on the + * output Observable. Those output values resulting from the projection are also + * given to the `project` function to produce new output values. This is how + * *expand* behaves recursively. + * + * @example Start emitting the powers of two on every click, at most 10 of them + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var powersOfTwo = clicks + * .mapTo(1) + * .expand(x => Rx.Observable.of(2 * x).delay(1000)) + * .take(10); + * powersOfTwo.subscribe(x => console.log(x)); + * + * @see {@link mergeMap} + * @see {@link mergeScan} + * + * @param {function(value: T, index: number) => Observable} project A function + * that, when applied to an item emitted by the source or the output Observable, + * returns an Observable. + * @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of input + * Observables being subscribed to concurrently. + * @param {Scheduler} [scheduler=null] The IScheduler to use for subscribing to + * each projected inner Observable. + * @return {Observable} An Observable that emits the source values and also + * result of applying the projection function to each value emitted on the + * output Observable and and merging the results of the Observables obtained + * from this transformation. + * @method expand + * @owner Observable + */ +export function expand(project: (value: T, index: number) => Observable, + concurrent: number = Number.POSITIVE_INFINITY, + scheduler: IScheduler = undefined): OperatorFunction { + concurrent = (concurrent || 0) < 1 ? Number.POSITIVE_INFINITY : concurrent; + + return (source: Observable) => source.lift(new ExpandOperator(project, concurrent, scheduler)); +} + +export class ExpandOperator implements Operator { + constructor(private project: (value: T, index: number) => Observable, + private concurrent: number, + private scheduler: IScheduler) { + } + + call(subscriber: Subscriber, source: any): any { + return source.subscribe(new ExpandSubscriber(subscriber, this.project, this.concurrent, this.scheduler)); + } +} + +interface DispatchArg { + subscriber: ExpandSubscriber; + result: Observable; + value: any; + index: number; +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +export class ExpandSubscriber extends OuterSubscriber { + private index: number = 0; + private active: number = 0; + private hasCompleted: boolean = false; + private buffer: any[]; + + constructor(destination: Subscriber, + private project: (value: T, index: number) => Observable, + private concurrent: number, + private scheduler: IScheduler) { + super(destination); + if (concurrent < Number.POSITIVE_INFINITY) { + this.buffer = []; + } + } + + private static dispatch(arg: DispatchArg): void { + const {subscriber, result, value, index} = arg; + subscriber.subscribeToProjection(result, value, index); + } + + protected _next(value: any): void { + const destination = this.destination; + + if (destination.closed) { + this._complete(); + return; + } + + const index = this.index++; + if (this.active < this.concurrent) { + destination.next(value); + let result = tryCatch(this.project)(value, index); + if (result === errorObject) { + destination.error(errorObject.e); + } else if (!this.scheduler) { + this.subscribeToProjection(result, value, index); + } else { + const state: DispatchArg = { subscriber: this, result, value, index }; + this.add(this.scheduler.schedule(ExpandSubscriber.dispatch, 0, state)); + } + } else { + this.buffer.push(value); + } + } + + private subscribeToProjection(result: any, value: T, index: number): void { + this.active++; + this.add(subscribeToResult(this, result, value, index)); + } + + protected _complete(): void { + this.hasCompleted = true; + if (this.hasCompleted && this.active === 0) { + this.destination.complete(); + } + } + + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + this._next(innerValue); + } + + notifyComplete(innerSub: Subscription): void { + const buffer = this.buffer; + this.remove(innerSub); + this.active--; + if (buffer && buffer.length > 0) { + this._next(buffer.shift()); + } + if (this.hasCompleted && this.active === 0) { + this.destination.complete(); + } + } +} diff --git a/src/operators/index.ts b/src/operators/index.ts index 2d1943458d..1cc743c473 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -23,6 +23,7 @@ export { elementAt } from './elementAt'; export { every } from './every'; export { exhaust } from './exhaust'; export { exhaustMap } from './exhaustMap'; +export { expand } from './expand'; export { filter } from './filter'; export { ignoreElements } from './ignoreElements'; export { map } from './map';