diff --git a/spec/operators/publish-spec.js b/spec/operators/publish-spec.js index 4e06f8f0f2..3b9528f40f 100644 --- a/spec/operators/publish-spec.js +++ b/spec/operators/publish-spec.js @@ -132,10 +132,7 @@ describe('Observable.prototype.publish()', function () { it('should NOT be retryable', function () { var source = cold('-1-2-3----4-#'); - var sourceSubs = ['^ !', - ' (^!)', - ' (^!)', - ' (^!)']; + var sourceSubs = '^ !'; var published = source.publish().refCount().retry(3); var subscriber1 = hot('a| ').mergeMapTo(published); var expected1 = '-1-2-3----4-#'; @@ -152,9 +149,7 @@ describe('Observable.prototype.publish()', function () { it('should NOT be repeatable', function () { var source = cold('-1-2-3----4-|'); - var sourceSubs = ['^ !', - ' (^!)', - ' (^!)']; + var sourceSubs = '^ !'; var published = source.publish().refCount().repeat(3); var subscriber1 = hot('a| ').mergeMapTo(published); var expected1 = '-1-2-3----4-|'; diff --git a/spec/operators/publishBehavior-spec.js b/spec/operators/publishBehavior-spec.js index ad6e2808ea..881666ff1e 100644 --- a/spec/operators/publishBehavior-spec.js +++ b/spec/operators/publishBehavior-spec.js @@ -131,10 +131,7 @@ describe('Observable.prototype.publishBehavior()', function () { it('should NOT be retryable', function () { var source = cold('-1-2-3----4-#'); - var sourceSubs = ['^ !', - ' (^!)', - ' (^!)', - ' (^!)']; + var sourceSubs = '^ !'; var published = source.publishBehavior('0').refCount().retry(3); var subscriber1 = hot('a| ').mergeMapTo(published); var expected1 = '01-2-3----4-#'; @@ -151,9 +148,7 @@ describe('Observable.prototype.publishBehavior()', function () { it('should NOT be repeatable', function () { var source = cold('-1-2-3----4-|'); - var sourceSubs = ['^ !', - ' (^!)', - ' (^!)']; + var sourceSubs = '^ !'; var published = source.publishBehavior('0').refCount().repeat(3); var subscriber1 = hot('a| ').mergeMapTo(published); var expected1 = '01-2-3----4-|'; diff --git a/spec/operators/publishReplay-spec.js b/spec/operators/publishReplay-spec.js index c61c786b88..794bea6920 100644 --- a/spec/operators/publishReplay-spec.js +++ b/spec/operators/publishReplay-spec.js @@ -150,10 +150,7 @@ describe('Observable.prototype.publishReplay()', function () { it('should NOT be retryable', function () { var source = cold('-1-2-3----4-#'); - var sourceSubs = ['^ !', - ' (^!)', - ' (^!)', - ' (^!)']; + var sourceSubs = '^ !'; var published = source.publishReplay(1).refCount().retry(3); var subscriber1 = hot('a| ').mergeMapTo(published); var expected1 = '-1-2-3----4-(444#)'; @@ -170,9 +167,7 @@ describe('Observable.prototype.publishReplay()', function () { it('should NOT be repeatable', function () { var source = cold('-1-2-3----4-|'); - var sourceSubs = ['^ !', - ' (^!)', - ' (^!)']; + var sourceSubs = '^ !'; var published = source.publishReplay(1).refCount().repeat(3); var subscriber1 = hot('a| ').mergeMapTo(published); var expected1 = '-1-2-3----4-(44|)'; diff --git a/src/observables/ConnectableObservable.ts b/src/observables/ConnectableObservable.ts index 6f7a9546a2..12815a75e9 100644 --- a/src/observables/ConnectableObservable.ts +++ b/src/observables/ConnectableObservable.ts @@ -1,6 +1,7 @@ import {Subject} from '../Subject'; import {Observable} from '../Observable'; import {Subscription} from '../Subscription'; +import {Subscriber} from '../Subscriber'; export class ConnectableObservable extends Observable { @@ -24,7 +25,16 @@ export class ConnectableObservable extends Observable { return (this.subject = this.subjectFactory()); } - connect() { + connect(onSubscribe?: (subscription: Subscription) => void): Subscription { + if (onSubscribe) { + this._callbackConnect(onSubscribe); + return null; + } else { + return this._returningConnect(); + } + } + + _returningConnect(): Subscription { const source = this.source; let subscription = this.subscription; if (subscription && !subscription.isUnsubscribed) { @@ -35,6 +45,26 @@ export class ConnectableObservable extends Observable { return (this.subscription = subscription); } + /** + * Instructs the ConnectableObservable to begin emitting the items from its + * underlying source to its Subscribers. + * + * @param onSubscribe a function that receives the connection subscription + * before the subscription to source happens, allowing the caller to + * synchronously disconnect a synchronous source. + */ + _callbackConnect(onSubscribe: (subscription: Subscription) => void): void { + let subscription = this.subscription; + if (subscription && !subscription.isUnsubscribed) { + onSubscribe(subscription); + return; + } + this.subscription = subscription = new Subscription(); + onSubscribe(subscription); + subscription.add(this.source.subscribe(this._getSubject())); + subscription.add(new ConnectableSubscription(this)); + } + refCount(): Observable { return new RefCountObservable(this); } @@ -62,24 +92,60 @@ class RefCountObservable extends Observable { _subscribe(subscriber) { const connectable = this.connectable; - const subscription = connectable.subscribe(subscriber); - if (++this.refCount === 1) { - this.connection = connectable.connect(); + const refCountSubscriber = new RefCountSubscriber(subscriber, this); + refCountSubscriber.myConnection = this.connection; + const subscription = connectable.subscribe(refCountSubscriber); + + if (!subscription.isUnsubscribed && ++this.refCount === 1) { + connectable.connect(_subscription => { + refCountSubscriber.myConnection = this.connection = _subscription; + }); } - subscription.add(new RefCountSubscription(this)); return subscription; } } -class RefCountSubscription extends Subscription { +class RefCountSubscriber extends Subscriber { + myConnection: Subscription; - constructor(private refCountObservable: RefCountObservable) { - super(); + constructor(public destination: Subscriber, + private refCountObservable: RefCountObservable) { + super(null); + destination.add(this); + } + + _next(value: T) { + this.destination.next(value); + } + + _error(err: any) { + this._resetConnectable(); + this.destination.error(err); + } + + _complete() { + this._resetConnectable(); + this.destination.complete(); + } + + _resetConnectable() { + const observable = this.refCountObservable; + const myConnection = this.myConnection; + if (myConnection && myConnection === observable.connection) { + observable.refCount = 0; + observable.connection.unsubscribe(); + observable.connection = void 0; + this.unsubscribe(); + } } _unsubscribe() { const observable = this.refCountObservable; - if (--observable.refCount === 0) { + if (observable.refCount === 0) { + return; + } + const myConnection = this.myConnection; + if (--observable.refCount === 0 && myConnection && myConnection === observable.connection) { observable.connection.unsubscribe(); observable.connection = void 0; }