diff --git a/src/observable/ForkJoinObservable.ts b/src/observable/ForkJoinObservable.ts index fe542ea4ba..539c52b900 100644 --- a/src/observable/ForkJoinObservable.ts +++ b/src/observable/ForkJoinObservable.ts @@ -56,82 +56,63 @@ export class ForkJoinObservable extends Observable { } } -interface ForkJoinContext { - completed: number; - total: number; - values: Array; - haveValues: Array; - selector: Function; -} - class ForkJoinSubscriber extends OuterSubscriber { - private context: ForkJoinContext = null; + private completed = 0; + private total: number; + private values: any[]; + private haveValues = 0; constructor(destination: Subscriber, private sources: Array>, - resultSelector?: (...values: Array) => T) { + private resultSelector?: (...values: Array) => T) { super(destination); const len = sources.length; - this.context = { completed: 0, - total: len, - values: new Array(len), - haveValues: new Array(len), - selector: resultSelector }; + this.total = len; + this.values = new Array(len); + + for (let i = 0; i < len; i++) { + const source = sources[i]; + const innerSubscription = subscribeToResult(this, source, null, i); - this.tryForkJoin(); + if (innerSubscription) { + ( innerSubscription).outerIndex = i; + this.add(innerSubscription); + } + } } notifyNext(outerValue: any, innerValue: T, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber): void { - const context = this.context; - - context.values[outerIndex] = innerValue; - context.haveValues[outerIndex] = true; + this.values[outerIndex] = innerValue; + if (!(innerSub)._hasValue) { + (innerSub)._hasValue = true; + this.haveValues++; + } } notifyComplete(innerSub: InnerSubscriber): void { - const outerIndex = (innerSub).outerIndex; - this.tryComplete(outerIndex); - } - - private tryComplete(index: number): void { const destination = this.destination; - const context = this.context; + const { haveValues, resultSelector, values } = this; + const len = values.length; - context.completed++; - - if (!context.haveValues[index]) { + if (!(innerSub)._hasValue) { destination.complete(); + return; } - const values = context.values; - if (context.completed !== values.length) { + this.completed++; + + if (this.completed !== len) { return; } - if (context.haveValues.every(x => x === true)) { - const value = context.selector ? context.selector.apply(this, values) : - values; + if (haveValues === len) { + const value = resultSelector ? resultSelector.apply(this, values) : values; destination.next(value); } destination.complete(); } - - private tryForkJoin(): void { - const sources = this.sources; - const len = sources.length; - - for (let i = 0; i < len; i++) { - const source = sources[i]; - const innerSubscription = subscribeToResult(this, source, null, i); - - if (innerSubscription) { - ( innerSubscription).outerIndex = i; - this.add(innerSubscription); - } - } - } } \ No newline at end of file