From 1bfb558b403248298659e1edaed83a68e10e1d75 Mon Sep 17 00:00:00 2001 From: benlesh Date: Sun, 8 Oct 2017 20:30:42 -0700 Subject: [PATCH] fix(shareReplay): properly uses `lift` resolves #2921 --- spec/operators/shareReplay-spec.ts | 32 ++++++++++++++++++++++++++++++ src/operators/shareReplay.ts | 14 +++++++++---- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/spec/operators/shareReplay-spec.ts b/spec/operators/shareReplay-spec.ts index e1f9c29521..740f5f5840 100644 --- a/spec/operators/shareReplay-spec.ts +++ b/spec/operators/shareReplay-spec.ts @@ -179,4 +179,36 @@ describe('Observable.prototype.shareReplay', () => { rxTestScheduler.flush(); expect(results).to.deep.equal([0, 1, 2, 4, 5, 6, 7, 8, 9]); }); + + it('should not break lift() composability', (done: MochaDone) => { + class MyCustomObservable extends Rx.Observable { + lift(operator: Rx.Operator): Rx.Observable { + const observable = new MyCustomObservable(); + (observable).source = this; + (observable).operator = operator; + return observable; + } + } + + const result = new MyCustomObservable((observer: Rx.Observer) => { + observer.next(1); + observer.next(2); + observer.next(3); + observer.complete(); + }).shareReplay(); + + expect(result instanceof MyCustomObservable).to.be.true; + + const expected = [1, 2, 3]; + + result + .subscribe((n: any) => { + expect(expected.length).to.be.greaterThan(0); + expect(n).to.equal(expected.shift()); + }, (x) => { + done(new Error('should not be called')); + }, () => { + done(); + }); + }); }); diff --git a/src/operators/shareReplay.ts b/src/operators/shareReplay.ts index 5760b94ee1..853e1bbbb7 100644 --- a/src/operators/shareReplay.ts +++ b/src/operators/shareReplay.ts @@ -3,18 +3,24 @@ import { ReplaySubject } from '../ReplaySubject'; import { IScheduler } from '../Scheduler'; import { Subscription } from '../Subscription'; import { MonoTypeOperatorFunction } from '../interfaces'; +import { Subscriber } from '../Subscriber'; + /** * @method shareReplay * @owner Observable */ export function shareReplay(bufferSize?: number, windowTime?: number, scheduler?: IScheduler ): MonoTypeOperatorFunction { + return (source: Observable) => source.lift(shareReplayOperator(bufferSize, windowTime, scheduler)); +} + +export function shareReplayOperator(bufferSize?: number, windowTime?: number, scheduler?: IScheduler) { let subject: ReplaySubject; let refCount = 0; let subscription: Subscription; let hasError = false; let isComplete = false; - return (source: Observable) => new Observable(observer => { + return function shareReplayOperation(this: Subscriber, source: Observable) { refCount++; if (!subject || hasError) { hasError = false; @@ -32,7 +38,7 @@ export function shareReplay(bufferSize?: number, windowTime?: number, schedul }); } - const innerSub = subject.subscribe(observer); + const innerSub = subject.subscribe(this); return () => { refCount--; @@ -41,5 +47,5 @@ export function shareReplay(bufferSize?: number, windowTime?: number, schedul subscription.unsubscribe(); } }; - }); -}; + }; +}; \ No newline at end of file