Skip to content

Commit

Permalink
feat(shareReplay): adds shareReplay variant of publishReplay (#2443)
Browse files Browse the repository at this point in the history
`shareReplay` returns an observable that is the source multicasted over a `ReplaySubject`. That replay subject is recycled on error from the `source`, but not on completion of the source. This makes `shareReplay` ideal for handling things like caching AJAX results, as it's retryable. It's repeat behavior, however, differs from `share` in that it will not repeat the `source` observable, rather it will repeat the `source` observable's values.

related #2013, #453, #2043
  • Loading branch information
benlesh authored May 9, 2017
1 parent 4ffbbe5 commit 5a2266a
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 10 deletions.
4 changes: 3 additions & 1 deletion spec/helpers/marble-testing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ import {Observable} from '../../dist/cjs/Observable';
import {SubscriptionLog} from '../../dist/cjs/testing/SubscriptionLog';
import {ColdObservable} from '../../dist/cjs/testing/ColdObservable';
import {HotObservable} from '../../dist/cjs/testing/HotObservable';
import {observableToBeFn, subscriptionLogsToBeFn} from '../../dist/cjs/testing/TestScheduler';
import {TestScheduler, observableToBeFn, subscriptionLogsToBeFn} from '../../dist/cjs/testing/TestScheduler';

declare const global: any;

export const rxTestScheduler: TestScheduler = global.rxTestScheduler;

export function hot(marbles: string, values?: any, error?: any): HotObservable<any> {
if (!global.rxTestScheduler) {
throw 'tried to use hot() in async test';
Expand Down
167 changes: 167 additions & 0 deletions spec/operators/shareReplay-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports

declare const { asDiagram };
declare const hot: typeof marbleTestingSignature.hot;
declare const cold: typeof marbleTestingSignature.cold;
declare const expectObservable: typeof marbleTestingSignature.expectObservable;
declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions;

const Observable = Rx.Observable;

/** @test {shareReplay} */
describe('Observable.prototype.shareReplay', () => {
it('should mirror a simple source Observable', () => {
const source = cold('--1-2---3-4--5-|');
const sourceSubs = '^ !';
const published = source.shareReplay();
const expected = '--1-2---3-4--5-|';

expectObservable(published).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should do nothing if result is not subscribed', () => {
let subscribed = false;
const source = new Observable(() => {
subscribed = true;
});
source.shareReplay();
expect(subscribed).to.be.false;
});

it('should multicast the same values to multiple observers, bufferSize=1', () => {
const source = cold('-1-2-3----4-|'); const shared = source.shareReplay(1);
const sourceSubs = '^ !';
const subscriber1 = hot('a| ').mergeMapTo(shared);
const expected1 = '-1-2-3----4-|';
const subscriber2 = hot(' b| ').mergeMapTo(shared);
const expected2 = ' 23----4-|';
const subscriber3 = hot(' c| ').mergeMapTo(shared);
const expected3 = ' 3-4-|';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should multicast the same values to multiple observers, bufferSize=2', () => {
const source = cold('-1-2-----3------4-|'); const shared = source.shareReplay(2);
const sourceSubs = '^ !';
const subscriber1 = hot('a| ').mergeMapTo(shared);
const expected1 = '-1-2-----3------4-|';
const subscriber2 = hot(' b| ').mergeMapTo(shared);
const expected2 = ' (12)-3------4-|';
const subscriber3 = hot(' c| ').mergeMapTo(shared);
const expected3 = ' (23)-4-|';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should multicast an error from the source to multiple observers', () => {
const source = cold('-1-2-3----4-#'); const shared = source.shareReplay(1);
const sourceSubs = '^ !';
const subscriber1 = hot('a| ').mergeMapTo(shared);
const expected1 = '-1-2-3----4-#';
const subscriber2 = hot(' b| ').mergeMapTo(shared);
const expected2 = ' 23----4-#';
const subscriber3 = hot(' c| ').mergeMapTo(shared);
const expected3 = ' 3-4-#';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should multicast an empty source', () => {
const source = cold('|');
const sourceSubs = '(^!)';
const shared = source.shareReplay(1);
const expected = '|';

expectObservable(shared).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should multicast a never source', () => {
const source = cold('-');
const sourceSubs = '^';

const shared = source.shareReplay(1);
const expected = '-';

expectObservable(shared).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should multicast a throw source', () => {
const source = cold('#');
const sourceSubs = '(^!)';
const shared = source.shareReplay(1);
const expected = '#';

expectObservable(shared).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should replay results to subsequent subscriptions if source completes, bufferSize=2', () => {
const source = cold('-1-2-----3-| ');
const shared = source.shareReplay(2);
const sourceSubs = '^ ! ';
const subscriber1 = hot('a| ').mergeMapTo(shared);
const expected1 = '-1-2-----3-| ';
const subscriber2 = hot(' b| ').mergeMapTo(shared);
const expected2 = ' (12)-3-| ';
const subscriber3 = hot(' (c|) ').mergeMapTo(shared);
const expected3 = ' (23|)';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should completely restart for subsequent subscriptions if source errors, bufferSize=2', () => {
const source = cold('-1-2-----3-# ');
const shared = source.shareReplay(2);
const sourceSubs1 = '^ ! ';
const subscriber1 = hot('a| ').mergeMapTo(shared);
const expected1 = '-1-2-----3-# ';
const subscriber2 = hot(' b| ').mergeMapTo(shared);
const expected2 = ' (12)-3-# ';
const subscriber3 = hot(' (c|) ').mergeMapTo(shared);
const expected3 = ' -1-2-----3-#';
const sourceSubs2 = ' ^ !';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe([sourceSubs1, sourceSubs2]);
});

it('should be retryable, bufferSize=2', () => {
const subs = [];
const source = cold('-1-2-----3-# ');
const shared = source.shareReplay(2).retry(1);
subs.push( '^ ! ');
subs.push( ' ^ ! ');
subs.push( ' ^ !');
const subscriber1 = hot('a| ').mergeMapTo(shared);
const expected1 = '-1-2-----3--1-2-----3-# ';
const subscriber2 = hot(' b| ').mergeMapTo(shared);
const expected2 = ' (12)-3--1-2-----3-# ';
const subscriber3 = hot(' (c|) ').mergeMapTo(shared);
const expected3 = ' (12)-3--1-2-----3-#';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
1 change: 1 addition & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ import './add/operator/sampleTime';
import './add/operator/scan';
import './add/operator/sequenceEqual';
import './add/operator/share';
import './add/operator/shareReplay';
import './add/operator/single';
import './add/operator/skip';
import './add/operator/skipLast';
Expand Down
11 changes: 11 additions & 0 deletions src/add/operator/shareReplay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@

import { Observable } from '../../Observable';
import { shareReplay } from '../../operator/shareReplay';

Observable.prototype.shareReplay = shareReplay;

declare module '../../Observable' {
interface Observable<T> {
shareReplay: typeof shareReplay;
}
}
24 changes: 15 additions & 9 deletions src/observable/ConnectableObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export class ConnectableObservable<T> extends Observable<T> {
protected _subject: Subject<T>;
protected _refCount: number = 0;
protected _connection: Subscription;
_isComplete = false;

constructor(protected source: Observable<T>,
protected subjectFactory: () => Subject<T>) {
Expand All @@ -33,6 +34,7 @@ export class ConnectableObservable<T> extends Observable<T> {
connect(): Subscription {
let connection = this._connection;
if (!connection) {
this._isComplete = false;
connection = this._connection = new Subscription();
connection.add(this.source
.subscribe(new ConnectableSubscriber(this.getSubject(), this)));
Expand All @@ -51,15 +53,18 @@ export class ConnectableObservable<T> extends Observable<T> {
}
}

const connectableProto = <any>ConnectableObservable.prototype;

export const connectableObservableDescriptor: PropertyDescriptorMap = {
operator: { value: null },
_refCount: { value: 0, writable: true },
_subject: { value: null, writable: true },
_connection: { value: null, writable: true },
_subscribe: { value: (<any> ConnectableObservable.prototype)._subscribe },
getSubject: { value: (<any> ConnectableObservable.prototype).getSubject },
connect: { value: (<any> ConnectableObservable.prototype).connect },
refCount: { value: (<any> ConnectableObservable.prototype).refCount }
_subscribe: { value: connectableProto._subscribe },
_isComplete: { value: connectableProto._isComplete, writable: true },
getSubject: { value: connectableProto.getSubject },
connect: { value: connectableProto.connect },
refCount: { value: connectableProto.refCount }
};

class ConnectableSubscriber<T> extends SubjectSubscriber<T> {
Expand All @@ -72,17 +77,18 @@ class ConnectableSubscriber<T> extends SubjectSubscriber<T> {
super._error(err);
}
protected _complete(): void {
this.connectable._isComplete = true;
this._unsubscribe();
super._complete();
}
protected _unsubscribe() {
const { connectable } = this;
const connectable = <any>this.connectable;
if (connectable) {
this.connectable = null;
const connection = (<any> connectable)._connection;
(<any> connectable)._refCount = 0;
(<any> connectable)._subject = null;
(<any> connectable)._connection = null;
const connection = connectable._connection;
connectable._refCount = 0;
connectable._subject = null;
connectable._connection = null;
if (connection) {
connection.unsubscribe();
}
Expand Down
26 changes: 26 additions & 0 deletions src/operator/shareReplay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Observable } from '../Observable';
import { multicast } from './multicast';
import { ReplaySubject } from '../ReplaySubject';
import { ConnectableObservable } from '../observable/ConnectableObservable';
import { IScheduler } from '../Scheduler';

/**
* @method shareReplay
* @owner Observable
*/
export function shareReplay<T>(
this: Observable<T>,
bufferSize?: number,
windowTime?: number,
scheduler?: IScheduler
): Observable<T> {
let subject: ReplaySubject<T>;
const connectable = multicast.call(this, function shareReplaySubjectFactory(this: ConnectableObservable<T>) {
if (this._isComplete) {
return subject;
} else {
return (subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler));
}
});
return connectable.refCount();
};

0 comments on commit 5a2266a

Please sign in to comment.