Skip to content

Commit

Permalink
perf(forkJoin): improve forkJoin perf slightly by removing unnecessar…
Browse files Browse the repository at this point in the history
…y context tracking
  • Loading branch information
benlesh committed Mar 28, 2016
1 parent 38b0b24 commit 280b985
Showing 1 changed file with 30 additions and 49 deletions.
79 changes: 30 additions & 49 deletions src/observable/ForkJoinObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,82 +56,63 @@ export class ForkJoinObservable<T> extends Observable<T> {
}
}

interface ForkJoinContext {
completed: number;
total: number;
values: Array<any>;
haveValues: Array<boolean>;
selector: Function;
}

class ForkJoinSubscriber<T> extends OuterSubscriber<T, T> {
private context: ForkJoinContext = null;
private completed = 0;
private total: number;
private values: any[];
private haveValues = 0;

constructor(destination: Subscriber<T>,
private sources: Array<SubscribableOrPromise<any>>,
resultSelector?: (...values: Array<any>) => T) {
private resultSelector?: (...values: Array<any>) => T) {
super(destination);

const len = sources.length;
this.context = { completed: 0,
total: len,
values: new Array(len),
haveValues: new Array(len),
selector: resultSelector };
this.total = len;
this.values = new Array(len);

for (let i = 0; i < len; i++) {
const source = sources[i];
const innerSubscription = subscribeToResult(this, source, null, i);

this.tryForkJoin();
if (innerSubscription) {
(<any> innerSubscription).outerIndex = i;
this.add(innerSubscription);
}
}
}

notifyNext(outerValue: any, innerValue: T,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, T>): void {
const context = this.context;

context.values[outerIndex] = innerValue;
context.haveValues[outerIndex] = true;
this.values[outerIndex] = innerValue;
if (!(<any>innerSub)._hasValue) {
(<any>innerSub)._hasValue = true;
this.haveValues++;
}
}

notifyComplete(innerSub: InnerSubscriber<T, T>): void {
const outerIndex = (<any>innerSub).outerIndex;
this.tryComplete(outerIndex);
}

private tryComplete(index: number): void {
const destination = this.destination;
const context = this.context;
const { haveValues, resultSelector, values } = this;
const len = values.length;

context.completed++;

if (!context.haveValues[index]) {
if (!(<any>innerSub)._hasValue) {
destination.complete();
return;
}

const values = context.values;
if (context.completed !== values.length) {
this.completed++;

if (this.completed !== len) {
return;
}

if (context.haveValues.every(x => x === true)) {
const value = context.selector ? context.selector.apply(this, values) :
values;
if (haveValues === len) {
const value = resultSelector ? resultSelector.apply(this, values) : values;
destination.next(value);
}

destination.complete();
}

private tryForkJoin(): void {
const sources = this.sources;
const len = sources.length;

for (let i = 0; i < len; i++) {
const source = sources[i];
const innerSubscription = subscribeToResult(this, source, null, i);

if (innerSubscription) {
(<any> innerSubscription).outerIndex = i;
this.add(innerSubscription);
}
}
}
}

0 comments on commit 280b985

Please sign in to comment.