diff --git a/spec/operators/observeOn-spec.ts b/spec/operators/observeOn-spec.ts index 039f04c486..22e295c7b8 100644 --- a/spec/operators/observeOn-spec.ts +++ b/spec/operators/observeOn-spec.ts @@ -104,21 +104,21 @@ describe('Observable.prototype.observeOn', () => { .observeOn(Rx.Scheduler.asap) .subscribe( x => { - const observeOnSubscriber = subscription._subscriptions[0]._innerSub; + const observeOnSubscriber = subscription._subscriptions[0]; expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, and one for the notification - expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind) + expect(observeOnSubscriber._subscriptions[1].state.notification.kind) .to.equal('N'); - expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.value) + expect(observeOnSubscriber._subscriptions[1].state.notification.value) .to.equal(x); results.push(x); }, err => done(err), () => { // now that the last nexted value is done, there should only be a complete notification scheduled - const observeOnSubscriber = subscription._subscriptions[0]._innerSub; + const observeOnSubscriber = subscription._subscriptions[0]; expect(observeOnSubscriber._subscriptions.length).to.equal(2); // 1 for the consumer, one for the complete notification // only this completion notification should remain. - expect(observeOnSubscriber._subscriptions[1]._innerSub.state.notification.kind) + expect(observeOnSubscriber._subscriptions[1].state.notification.kind) .to.equal('C'); // After completion, the entire _subscriptions list is nulled out anyhow, so we can't test much further than this. expect(results).to.deep.equal([1, 2, 3]); diff --git a/spec/operators/switch-spec.ts b/spec/operators/switch-spec.ts index fd4135a2a0..1f0703ab6f 100644 --- a/spec/operators/switch-spec.ts +++ b/spec/operators/switch-spec.ts @@ -223,4 +223,44 @@ describe('Observable.prototype.switch', () => { expect(completed).to.be.true; }); -}); \ No newline at end of file + + it('should not leak when child completes before each switch (prevent memory leaks #2355)', () => { + let iStream: Rx.Subject; + const oStreamControl = new Rx.Subject(); + const oStream = oStreamControl.map(() => { + return (iStream = new Rx.Subject()); + }); + const switcher = oStream.switch(); + const result = []; + let sub = switcher.subscribe((x: number) => result.push(x)); + + [0, 1, 2, 3, 4].forEach((n) => { + oStreamControl.next(n); // creates inner + iStream.complete(); + }); + // Expect one child of switch(): The oStream + expect( + (sub)._subscriptions[0]._subscriptions.length + ).to.equal(1); + sub.unsubscribe(); + }); + + it('should not leak if we switch before child completes (prevent memory leaks #2355)', () => { + const oStreamControl = new Rx.Subject(); + const oStream = oStreamControl.map(() => { + return (new Rx.Subject()); + }); + const switcher = oStream.switch(); + const result = []; + let sub = switcher.subscribe((x: number) => result.push(x)); + + [0, 1, 2, 3, 4].forEach((n) => { + oStreamControl.next(n); // creates inner + }); + // Expect two children of switch(): The oStream and the first inner + expect( + (sub)._subscriptions[0]._subscriptions.length + ).to.equal(2); + sub.unsubscribe(); + }); +}); diff --git a/src/Subscriber.ts b/src/Subscriber.ts index 88c900fdf9..50f5e541e6 100644 --- a/src/Subscriber.ts +++ b/src/Subscriber.ts @@ -144,6 +144,18 @@ export class Subscriber extends Subscription implements Observer { this.destination.complete(); this.unsubscribe(); } + + protected _unsubscribeAndRecycle(): Subscriber { + const { _parent, _parents } = this; + this._parent = null; + this._parents = null; + this.unsubscribe(); + this.closed = false; + this.isStopped = false; + this._parent = _parent; + this._parents = _parents; + return this; + } } /** @@ -155,7 +167,7 @@ class SafeSubscriber extends Subscriber { private _context: any; - constructor(private _parent: Subscriber, + constructor(private _parentSubscriber: Subscriber, observerOrNext?: PartialObserver | ((value: T) => void), error?: (e?: any) => void, complete?: () => void) { @@ -185,10 +197,10 @@ class SafeSubscriber extends Subscriber { next(value?: T): void { if (!this.isStopped && this._next) { - const { _parent } = this; - if (!_parent.syncErrorThrowable) { + const { _parentSubscriber } = this; + if (!_parentSubscriber.syncErrorThrowable) { this.__tryOrUnsub(this._next, value); - } else if (this.__tryOrSetError(_parent, this._next, value)) { + } else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) { this.unsubscribe(); } } @@ -196,21 +208,21 @@ class SafeSubscriber extends Subscriber { error(err?: any): void { if (!this.isStopped) { - const { _parent } = this; + const { _parentSubscriber } = this; if (this._error) { - if (!_parent.syncErrorThrowable) { + if (!_parentSubscriber.syncErrorThrowable) { this.__tryOrUnsub(this._error, err); this.unsubscribe(); } else { - this.__tryOrSetError(_parent, this._error, err); + this.__tryOrSetError(_parentSubscriber, this._error, err); this.unsubscribe(); } - } else if (!_parent.syncErrorThrowable) { + } else if (!_parentSubscriber.syncErrorThrowable) { this.unsubscribe(); throw err; } else { - _parent.syncErrorValue = err; - _parent.syncErrorThrown = true; + _parentSubscriber.syncErrorValue = err; + _parentSubscriber.syncErrorThrown = true; this.unsubscribe(); } } @@ -218,13 +230,13 @@ class SafeSubscriber extends Subscriber { complete(): void { if (!this.isStopped) { - const { _parent } = this; + const { _parentSubscriber } = this; if (this._complete) { - if (!_parent.syncErrorThrowable) { + if (!_parentSubscriber.syncErrorThrowable) { this.__tryOrUnsub(this._complete); this.unsubscribe(); } else { - this.__tryOrSetError(_parent, this._complete); + this.__tryOrSetError(_parentSubscriber, this._complete); this.unsubscribe(); } } else { @@ -254,9 +266,9 @@ class SafeSubscriber extends Subscriber { } protected _unsubscribe(): void { - const { _parent } = this; + const { _parentSubscriber } = this; this._context = null; - this._parent = null; - _parent.unsubscribe(); + this._parentSubscriber = null; + _parentSubscriber.unsubscribe(); } } diff --git a/src/Subscription.ts b/src/Subscription.ts index aec99264ec..6dd7d3258b 100644 --- a/src/Subscription.ts +++ b/src/Subscription.ts @@ -40,7 +40,9 @@ export class Subscription implements ISubscription { */ public closed: boolean = false; - private _subscriptions: ISubscription[]; + protected _parent: Subscription = null; + protected _parents: Subscription[] = null; + private _subscriptions: ISubscription[] = null; /** * @param {function(): void} [unsubscribe] A function describing how to @@ -66,11 +68,26 @@ export class Subscription implements ISubscription { return; } - this.closed = true; - - const { _unsubscribe, _subscriptions } = ( this); + let { _parent, _parents, _unsubscribe, _subscriptions } = ( this); - ( this)._subscriptions = null; + this.closed = true; + this._parent = null; + this._parents = null; + // null out _subscriptions first so any child subscriptions that attempt + // to remove themselves from this subscription will noop + this._subscriptions = null; + + let index = -1; + let len = _parents ? _parents.length : 0; + + // if this._parent is null, then so is this._parents, and we + // don't have to remove ourselves from any parent subscriptions. + while (_parent) { + _parent.remove(this); + // if this._parents is null or index >= len, + // then _parent is set to null, and the loop exits + _parent = ++index < len && _parents[index] || null; + } if (isFunction(_unsubscribe)) { let trial = tryCatch(_unsubscribe).call(this); @@ -85,8 +102,8 @@ export class Subscription implements ISubscription { if (isArray(_subscriptions)) { - let index = -1; - const len = _subscriptions.length; + index = -1; + len = _subscriptions.length; while (++index < len) { const sub = _subscriptions[index]; @@ -138,27 +155,33 @@ export class Subscription implements ISubscription { return this; } - let sub = ( teardown); + let subscription = ( teardown); switch (typeof teardown) { case 'function': - sub = new Subscription(<(() => void) > teardown); + subscription = new Subscription(<(() => void) > teardown); case 'object': - if (sub.closed || typeof sub.unsubscribe !== 'function') { - return sub; + if (subscription.closed || typeof subscription.unsubscribe !== 'function') { + return subscription; } else if (this.closed) { - sub.unsubscribe(); - return sub; + subscription.unsubscribe(); + return subscription; + } else if (typeof subscription._addParent !== 'function' /* quack quack */) { + const tmp = subscription; + subscription = new Subscription(); + subscription._subscriptions = [tmp]; } break; default: throw new Error('unrecognized teardown ' + teardown + ' added to Subscription.'); } - const childSub = new ChildSubscription(sub, this); - this._subscriptions = this._subscriptions || []; - this._subscriptions.push(childSub); - return childSub; + const subscriptions = this._subscriptions || (this._subscriptions = []); + + subscriptions.push(subscription); + subscription._addParent(this); + + return subscription; } /** @@ -168,16 +191,7 @@ export class Subscription implements ISubscription { * @return {void} */ remove(subscription: Subscription): void { - - // HACK: This might be redundant because of the logic in `add()` - if (subscription == null || ( - subscription === this) || ( - subscription === Subscription.EMPTY)) { - return; - } - - const subscriptions = ( this)._subscriptions; - + const subscriptions = this._subscriptions; if (subscriptions) { const subscriptionIndex = subscriptions.indexOf(subscription); if (subscriptionIndex !== -1) { @@ -185,20 +199,24 @@ export class Subscription implements ISubscription { } } } -} - -export class ChildSubscription extends Subscription { - constructor(private _innerSub: ISubscription, private _parent: Subscription) { - super(); - } - _unsubscribe() { - const { _innerSub, _parent } = this; - _parent.remove(this); - _innerSub.unsubscribe(); + private _addParent(parent: Subscription) { + let { _parent, _parents } = this; + if (!_parent || _parent === parent) { + // If we don't have a parent, or the new parent is the same as the + // current parent, then set this._parent to the new parent. + this._parent = parent; + } else if (!_parents) { + // If there's already one parent, but not multiple, allocate an Array to + // store the rest of the parent Subscriptions. + this._parents = [parent]; + } else if (_parents.indexOf(parent) === -1) { + // Only add the new parent to the _parents list if it's not already there. + _parents.push(parent); + } } } function flattenUnsubscriptionErrors(errors: any[]) { return errors.reduce((errs, err) => errs.concat((err instanceof UnsubscriptionError) ? err.errors : err), []); -} \ No newline at end of file +} diff --git a/src/operator/catch.ts b/src/operator/catch.ts index 2fb91ffbd5..961ff4f953 100644 --- a/src/operator/catch.ts +++ b/src/operator/catch.ts @@ -107,9 +107,7 @@ class CatchSubscriber extends OuterSubscriber { super.error(err2); return; } - this.unsubscribe(); - this.closed = false; - this.isStopped = false; + this._unsubscribeAndRecycle(); this.add(subscribeToResult(this, result)); } } diff --git a/src/operator/observeOn.ts b/src/operator/observeOn.ts index e4650aaa8c..0b437bd639 100644 --- a/src/operator/observeOn.ts +++ b/src/operator/observeOn.ts @@ -4,7 +4,7 @@ import { Operator } from '../Operator'; import { PartialObserver } from '../Observer'; import { Subscriber } from '../Subscriber'; import { Notification } from '../Notification'; -import { TeardownLogic, Subscription } from '../Subscription'; +import { TeardownLogic } from '../Subscription'; import { Action } from '../scheduler/Action'; /** @@ -36,11 +36,9 @@ export class ObserveOnOperator implements Operator { */ export class ObserveOnSubscriber extends Subscriber { static dispatch(this: Action, arg: ObserveOnMessage) { - const { notification, destination, subscription } = arg; + const { notification, destination } = arg; notification.observe(destination); - if (subscription) { - subscription.unsubscribe(); - } + this.unsubscribe(); } constructor(destination: Subscriber, @@ -50,10 +48,11 @@ export class ObserveOnSubscriber extends Subscriber { } private scheduleMessage(notification: Notification): void { - const message = new ObserveOnMessage(notification, this.destination); - message.subscription = this.add( - this.scheduler.schedule(ObserveOnSubscriber.dispatch, this.delay, message) - ); + this.add(this.scheduler.schedule( + ObserveOnSubscriber.dispatch, + this.delay, + new ObserveOnMessage(notification, this.destination) + )); } protected _next(value: T): void { @@ -70,8 +69,6 @@ export class ObserveOnSubscriber extends Subscriber { } export class ObserveOnMessage { - public subscription: Subscription; - constructor(public notification: Notification, public destination: PartialObserver) { } diff --git a/src/operator/repeat.ts b/src/operator/repeat.ts index b3b7fb8b82..1f0abb36ff 100644 --- a/src/operator/repeat.ts +++ b/src/operator/repeat.ts @@ -56,10 +56,7 @@ class RepeatSubscriber extends Subscriber { } else if (count > -1) { this.count = count - 1; } - this.unsubscribe(); - this.isStopped = false; - this.closed = false; - source.subscribe(this); + source.subscribe(this._unsubscribeAndRecycle()); } } } diff --git a/src/operator/repeatWhen.ts b/src/operator/repeatWhen.ts index 31d4e7e349..e70c316763 100644 --- a/src/operator/repeatWhen.ts +++ b/src/operator/repeatWhen.ts @@ -60,8 +60,8 @@ class RepeatWhenSubscriber extends OuterSubscriber { notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber): void { - this.source.subscribe(this); this.sourceIsBeingSubscribedTo = true; + this.source.subscribe(this); } notifyComplete(innerSub: InnerSubscriber): void { @@ -80,7 +80,7 @@ class RepeatWhenSubscriber extends OuterSubscriber { return super.complete(); } - this.temporarilyUnsubscribe(); + this._unsubscribeAndRecycle(); this.notifications.next(); } } @@ -98,29 +98,25 @@ class RepeatWhenSubscriber extends OuterSubscriber { this.retries = null; } - private subscribeToRetries() { - this.notifications = new Subject(); - const retries = tryCatch(this.notifier)(this.notifications); - if (retries === errorObject) { - return super.complete(); - } - this.retries = retries; - this.retriesSubscription = subscribeToResult(this, retries); - } - - private temporarilyUnsubscribe() { + protected _unsubscribeAndRecycle(): Subscriber { const { notifications, retries, retriesSubscription } = this; this.notifications = null; this.retries = null; this.retriesSubscription = null; - - this.unsubscribe(); - this.isStopped = false; - this.closed = false; - + super._unsubscribeAndRecycle(); this.notifications = notifications; this.retries = retries; this.retriesSubscription = retriesSubscription; + return this; } + private subscribeToRetries() { + this.notifications = new Subject(); + const retries = tryCatch(this.notifier)(this.notifications); + if (retries === errorObject) { + return super.complete(); + } + this.retries = retries; + this.retriesSubscription = subscribeToResult(this, retries); + } } diff --git a/src/operator/retry.ts b/src/operator/retry.ts index fe267df6ed..30acb31d0d 100644 --- a/src/operator/retry.ts +++ b/src/operator/retry.ts @@ -53,10 +53,7 @@ class RetrySubscriber extends Subscriber { } else if (count > -1) { this.count = count - 1; } - this.unsubscribe(); - this.isStopped = false; - this.closed = false; - source.subscribe(this); + source.subscribe(this._unsubscribeAndRecycle()); } } } diff --git a/src/operator/retryWhen.ts b/src/operator/retryWhen.ts index 21633648d7..2f7c1a6db3 100644 --- a/src/operator/retryWhen.ts +++ b/src/operator/retryWhen.ts @@ -76,8 +76,7 @@ class RetryWhenSubscriber extends OuterSubscriber { this.retriesSubscription = null; } - this.unsubscribe(); - this.closed = false; + this._unsubscribeAndRecycle(); this.errors = errors; this.retries = retries; @@ -103,15 +102,12 @@ class RetryWhenSubscriber extends OuterSubscriber { notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber): void { - const { errors, retries, retriesSubscription } = this; this.errors = null; this.retries = null; this.retriesSubscription = null; - this.unsubscribe(); - this.isStopped = false; - this.closed = false; + this._unsubscribeAndRecycle(); this.errors = errors; this.retries = retries;