From accbcd0c5f9fd5976be3f491d454c4a61f699c4b Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 5 Oct 2017 19:32:41 -0700 Subject: [PATCH] fix(shareReplay): properly retains history on subscribe (#2910) When the refCount hits zero, and the source has neither completed nor errored, `shareReplay` will now properly stay subscribed to the source, and retain the internal ReplaySubject that is caching values fixes #2908 --- spec/operators/shareReplay-spec.ts | 15 ++++++++++ src/operators/shareReplay.ts | 44 ++++++++++++++++++++++-------- 2 files changed, 47 insertions(+), 12 deletions(-) diff --git a/spec/operators/shareReplay-spec.ts b/spec/operators/shareReplay-spec.ts index 0363beeab8..e1f9c29521 100644 --- a/spec/operators/shareReplay-spec.ts +++ b/spec/operators/shareReplay-spec.ts @@ -7,6 +7,8 @@ declare const hot: typeof marbleTestingSignature.hot; declare const cold: typeof marbleTestingSignature.cold; declare const expectObservable: typeof marbleTestingSignature.expectObservable; declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions; +declare const time: typeof marbleTestingSignature.time; +declare const rxTestScheduler: typeof marbleTestingSignature.rxTestScheduler; const Observable = Rx.Observable; @@ -164,4 +166,17 @@ describe('Observable.prototype.shareReplay', () => { expectObservable(subscriber3).toBe(expected3); expectSubscriptions(source.subscriptions).toBe(subs); }); + + it('should not restart if refCount hits 0 due to unsubscriptions', () => { + const results = []; + const source = Rx.Observable.interval(10, rxTestScheduler) + .take(10) + .shareReplay(1); + const subs = source.subscribe(x => results.push(x)); + rxTestScheduler.schedule(() => subs.unsubscribe(), 35); + rxTestScheduler.schedule(() => source.subscribe(x => results.push(x)), 54); + + rxTestScheduler.flush(); + expect(results).to.deep.equal([0, 1, 2, 4, 5, 6, 7, 8, 9]); + }); }); diff --git a/src/operators/shareReplay.ts b/src/operators/shareReplay.ts index 6b77042c98..c3479fea1d 100644 --- a/src/operators/shareReplay.ts +++ b/src/operators/shareReplay.ts @@ -1,25 +1,45 @@ import { Observable } from '../Observable'; -import { multicast } from './multicast'; -import { refCount } from './refCount'; import { ReplaySubject } from '../ReplaySubject'; -import { ConnectableObservable } from '../observable/ConnectableObservable'; import { IScheduler } from '../Scheduler'; - +import { Subscription } from '../Subscription'; import { MonoTypeOperatorFunction } from '../interfaces'; - /** * @method shareReplay * @owner Observable */ export function shareReplay(bufferSize?: number, windowTime?: number, scheduler?: IScheduler ): MonoTypeOperatorFunction { let subject: ReplaySubject; + let refCount = 0; + let subscription: Subscription; + let hasError = false; + let isComplete = true; - const connectable = multicast(function shareReplaySubjectFactory(this: ConnectableObservable) { - if (this._isComplete) { - return subject; - } else { - return (subject = new ReplaySubject(bufferSize, windowTime, scheduler)); + return (source: Observable) => new Observable(observer => { + refCount++; + if (!subject || hasError) { + hasError = false; + subject = new ReplaySubject(bufferSize, windowTime, scheduler); + subscription = source.subscribe({ + next(value) { subject.next(value); }, + error(err) { + hasError = true; + subject.error(err); + }, + complete() { + isComplete = true; + subject.complete(); + }, + }); } + + const innerSub = subject.subscribe(observer); + + return () => { + refCount--; + innerSub.unsubscribe(); + if (subscription && refCount === 0 && !isComplete) { + subscription.unsubscribe(); + } + }; }); - return ((source: Observable) => refCount()(connectable(source))) as MonoTypeOperatorFunction; -}; \ No newline at end of file +};