From aef9578ceda42712a216620c04ed6f96756c13fe Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Fri, 13 Nov 2015 14:16:49 +0200 Subject: [PATCH] fix(ConnectableObservable): fix ConnectableObservable connectability and refCounting When the ConnectableObservable with refCount always shares the same instance of the underlying subject (such as in publish, publishReplay, publishBehavior), the subscription to the connectable observable should NOT incur additional subscriptions to the underlying cold source. See how tests for publish/publishBehavior/publishReplay were updated to assert that only one subscription to the underlying cold source happens, not multiple, because as soon as the multicasting subject raises an error, this error impedes subsequent subscriptions to the cold source from happening. Fix ConnectableObservable, its connect() method, and the RefCountObservable to support synchronous retry/repeat in the presence of multiple subscribers, and to support retry/repeat in other asynchronous scenarios. Resolves bug #678. --- spec/operators/publish-spec.js | 9 +-- spec/operators/publishBehavior-spec.js | 9 +-- spec/operators/publishReplay-spec.js | 9 +-- src/observables/ConnectableObservable.ts | 84 +++++++++++++++++++++--- 4 files changed, 81 insertions(+), 30 deletions(-) diff --git a/spec/operators/publish-spec.js b/spec/operators/publish-spec.js index 4e06f8f0f2..3b9528f40f 100644 --- a/spec/operators/publish-spec.js +++ b/spec/operators/publish-spec.js @@ -132,10 +132,7 @@ describe('Observable.prototype.publish()', function () { it('should NOT be retryable', function () { var source = cold('-1-2-3----4-#'); - var sourceSubs = ['^ !', - ' (^!)', - ' (^!)', - ' (^!)']; + var sourceSubs = '^ !'; var published = source.publish().refCount().retry(3); var subscriber1 = hot('a| ').mergeMapTo(published); var expected1 = '-1-2-3----4-#'; @@ -152,9 +149,7 @@ describe('Observable.prototype.publish()', function () { it('should NOT be repeatable', function () { var source = cold('-1-2-3----4-|'); - var sourceSubs = ['^ !', - ' (^!)', - ' (^!)']; + var sourceSubs = '^ !'; var published = source.publish().refCount().repeat(3); var subscriber1 = hot('a| ').mergeMapTo(published); var expected1 = '-1-2-3----4-|'; diff --git a/spec/operators/publishBehavior-spec.js b/spec/operators/publishBehavior-spec.js index ad6e2808ea..881666ff1e 100644 --- a/spec/operators/publishBehavior-spec.js +++ b/spec/operators/publishBehavior-spec.js @@ -131,10 +131,7 @@ describe('Observable.prototype.publishBehavior()', function () { it('should NOT be retryable', function () { var source = cold('-1-2-3----4-#'); - var sourceSubs = ['^ !', - ' (^!)', - ' (^!)', - ' (^!)']; + var sourceSubs = '^ !'; var published = source.publishBehavior('0').refCount().retry(3); var subscriber1 = hot('a| ').mergeMapTo(published); var expected1 = '01-2-3----4-#'; @@ -151,9 +148,7 @@ describe('Observable.prototype.publishBehavior()', function () { it('should NOT be repeatable', function () { var source = cold('-1-2-3----4-|'); - var sourceSubs = ['^ !', - ' (^!)', - ' (^!)']; + var sourceSubs = '^ !'; var published = source.publishBehavior('0').refCount().repeat(3); var subscriber1 = hot('a| ').mergeMapTo(published); var expected1 = '01-2-3----4-|'; diff --git a/spec/operators/publishReplay-spec.js b/spec/operators/publishReplay-spec.js index c61c786b88..794bea6920 100644 --- a/spec/operators/publishReplay-spec.js +++ b/spec/operators/publishReplay-spec.js @@ -150,10 +150,7 @@ describe('Observable.prototype.publishReplay()', function () { it('should NOT be retryable', function () { var source = cold('-1-2-3----4-#'); - var sourceSubs = ['^ !', - ' (^!)', - ' (^!)', - ' (^!)']; + var sourceSubs = '^ !'; var published = source.publishReplay(1).refCount().retry(3); var subscriber1 = hot('a| ').mergeMapTo(published); var expected1 = '-1-2-3----4-(444#)'; @@ -170,9 +167,7 @@ describe('Observable.prototype.publishReplay()', function () { it('should NOT be repeatable', function () { var source = cold('-1-2-3----4-|'); - var sourceSubs = ['^ !', - ' (^!)', - ' (^!)']; + var sourceSubs = '^ !'; var published = source.publishReplay(1).refCount().repeat(3); var subscriber1 = hot('a| ').mergeMapTo(published); var expected1 = '-1-2-3----4-(44|)'; diff --git a/src/observables/ConnectableObservable.ts b/src/observables/ConnectableObservable.ts index 6f7a9546a2..12815a75e9 100644 --- a/src/observables/ConnectableObservable.ts +++ b/src/observables/ConnectableObservable.ts @@ -1,6 +1,7 @@ import {Subject} from '../Subject'; import {Observable} from '../Observable'; import {Subscription} from '../Subscription'; +import {Subscriber} from '../Subscriber'; export class ConnectableObservable extends Observable { @@ -24,7 +25,16 @@ export class ConnectableObservable extends Observable { return (this.subject = this.subjectFactory()); } - connect() { + connect(onSubscribe?: (subscription: Subscription) => void): Subscription { + if (onSubscribe) { + this._callbackConnect(onSubscribe); + return null; + } else { + return this._returningConnect(); + } + } + + _returningConnect(): Subscription { const source = this.source; let subscription = this.subscription; if (subscription && !subscription.isUnsubscribed) { @@ -35,6 +45,26 @@ export class ConnectableObservable extends Observable { return (this.subscription = subscription); } + /** + * Instructs the ConnectableObservable to begin emitting the items from its + * underlying source to its Subscribers. + * + * @param onSubscribe a function that receives the connection subscription + * before the subscription to source happens, allowing the caller to + * synchronously disconnect a synchronous source. + */ + _callbackConnect(onSubscribe: (subscription: Subscription) => void): void { + let subscription = this.subscription; + if (subscription && !subscription.isUnsubscribed) { + onSubscribe(subscription); + return; + } + this.subscription = subscription = new Subscription(); + onSubscribe(subscription); + subscription.add(this.source.subscribe(this._getSubject())); + subscription.add(new ConnectableSubscription(this)); + } + refCount(): Observable { return new RefCountObservable(this); } @@ -62,24 +92,60 @@ class RefCountObservable extends Observable { _subscribe(subscriber) { const connectable = this.connectable; - const subscription = connectable.subscribe(subscriber); - if (++this.refCount === 1) { - this.connection = connectable.connect(); + const refCountSubscriber = new RefCountSubscriber(subscriber, this); + refCountSubscriber.myConnection = this.connection; + const subscription = connectable.subscribe(refCountSubscriber); + + if (!subscription.isUnsubscribed && ++this.refCount === 1) { + connectable.connect(_subscription => { + refCountSubscriber.myConnection = this.connection = _subscription; + }); } - subscription.add(new RefCountSubscription(this)); return subscription; } } -class RefCountSubscription extends Subscription { +class RefCountSubscriber extends Subscriber { + myConnection: Subscription; - constructor(private refCountObservable: RefCountObservable) { - super(); + constructor(public destination: Subscriber, + private refCountObservable: RefCountObservable) { + super(null); + destination.add(this); + } + + _next(value: T) { + this.destination.next(value); + } + + _error(err: any) { + this._resetConnectable(); + this.destination.error(err); + } + + _complete() { + this._resetConnectable(); + this.destination.complete(); + } + + _resetConnectable() { + const observable = this.refCountObservable; + const myConnection = this.myConnection; + if (myConnection && myConnection === observable.connection) { + observable.refCount = 0; + observable.connection.unsubscribe(); + observable.connection = void 0; + this.unsubscribe(); + } } _unsubscribe() { const observable = this.refCountObservable; - if (--observable.refCount === 0) { + if (observable.refCount === 0) { + return; + } + const myConnection = this.myConnection; + if (--observable.refCount === 0 && myConnection && myConnection === observable.connection) { observable.connection.unsubscribe(); observable.connection = void 0; }