Skip to content

Commit

Permalink
feat(mergeScan): support concurrency parameter for mergeScan
Browse files Browse the repository at this point in the history
- expose concurrency parameter to interface of mergeScan
- expand test coverage to test concurrency works

closes ReactiveX#868
  • Loading branch information
kwonoj committed Dec 3, 2015
1 parent f1dc764 commit 76c0e99
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 5 deletions.
120 changes: 120 additions & 0 deletions spec/operators/mergeScan-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,124 @@ describe('Observable.prototype.mergeScan()', function () {
expectObservable(source, sub).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(sub);
});

it('should mergescan projects cold Observable with single concurrency', function () {
var e1 = hot('--a--b--c--|');
var e1subs = '^ !';

var inner = [
cold( '--d--e--f--| '),
cold( '--g--h--i--| '),
cold( '--j--k--l--|')
];

var xsubs = ' ^ !';
var ysubs = ' ^ !';
var zsubs = ' ^ !';

var expected = '--x-d--e--f--f-g--h--i--i-j--k--l--|';

var index = 0;
var source = e1.mergeScan(function (acc, x) {
var value = inner[index++];
return value.startWith(acc);
}, 'x', 1);

expectObservable(source).toBe(expected);

expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(inner[0].subscriptions).toBe(xsubs);
expectSubscriptions(inner[1].subscriptions).toBe(ysubs);
expectSubscriptions(inner[2].subscriptions).toBe(zsubs);
});

it('should mergescan projects hot Observable with single concurrency', function () {
var e1 = hot('---a---b---c---|');
var e1subs = '^ !';

var inner = [
hot( '--d--e--f--|'),
hot( '----g----h----i----|'),
hot( '------j------k-------l------|')
];

var xsubs = ' ^ !';
var ysubs = ' ^ !';
var zsubs = ' ^ !';

var expected = '---x-e--f--f--i----i-l------|';

var index = 0;
var source = e1.mergeScan(function (acc, x) {
var value = inner[index++];
return value.startWith(acc);
}, 'x', 1);

expectObservable(source).toBe(expected);

expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(inner[0].subscriptions).toBe(xsubs);
expectSubscriptions(inner[1].subscriptions).toBe(ysubs);
expectSubscriptions(inner[2].subscriptions).toBe(zsubs);
});

it('should mergescan projects cold Observable with dual concurrency', function () {
var e1 = hot('----a----b----c----|');
var e1subs = '^ !';

var inner = [
cold( '---d---e---f---| '),
cold( '---g---h---i---| '),
cold( '---j---k---l---|')
];

var xsubs = ' ^ !';
var ysubs = ' ^ !';
var zsubs = ' ^ !';

var expected = '----x--d-d-eg--fh--hi-j---k---l---|';

var index = 0;
var source = e1.mergeScan(function (acc, x) {
var value = inner[index++];
return value.startWith(acc);
}, 'x', 2);

expectObservable(source).toBe(expected);

expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(inner[0].subscriptions).toBe(xsubs);
expectSubscriptions(inner[1].subscriptions).toBe(ysubs);
expectSubscriptions(inner[2].subscriptions).toBe(zsubs);
});

it('should mergescan projects hot Observable with dual concurrency', function () {
var e1 = hot('---a---b---c---|');
var e1subs = '^ !';

var inner = [
hot( '--d--e--f--|'),
hot( '----g----h----i----|'),
hot( '------j------k-------l------|')
];

var xsubs = ' ^ !';
var ysubs = ' ^ !';
var zsubs = ' ^ !';

var expected = '---x-e-efh-h-ki------l------|';

var index = 0;
var source = e1.mergeScan(function (acc, x) {
var value = inner[index++];
return value.startWith(acc);
}, 'x', 2);

expectObservable(source).toBe(expected);

expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(inner[0].subscriptions).toBe(xsubs);
expectSubscriptions(inner[1].subscriptions).toBe(ysubs);
expectSubscriptions(inner[2].subscriptions).toBe(zsubs);
});
});
2 changes: 1 addition & 1 deletion src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface KitchenSinkOperators<T> extends CoreOperators<T> {
max?: <T, R>(comparer?: (x: R, y: T) => R) => Observable<R>;
min?: <T, R>(comparer?: (x: R, y: T) => R) => Observable<R>;
timeInterval?: <T>(scheduler?: IScheduler) => Observable<T>;
mergeScan?: <T, R>(project: (acc: R, x: T) => Observable<R>, seed: R) => Observable<R>;
mergeScan?: <T, R>(project: (acc: R, x: T) => Observable<R>, seed: R, concurrent?: number) => Observable<R>;
switchFirst?: () => Observable<T>;
switchMapFirst?: <R>(project: ((x: T, ix: number) => Observable<any>),
projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
Expand Down
10 changes: 6 additions & 4 deletions src/operator/extended/mergeScan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ import {KitchenSinkOperators} from '../../Rx.KitchenSink';

const observableProto = (<KitchenSinkOperators<any>>Observable.prototype);

export function mergeScan<T, R>(project: (acc: R, x: T) => Observable<R>, seed: R) {
return this.lift(new MergeScanOperator(project, seed));
export function mergeScan<T, R>(project: (acc: R, x: T) => Observable<R>,
seed: R,
concurrent: number = Number.POSITIVE_INFINITY) {
return this.lift(new MergeScanOperator(project, seed, concurrent));
}

export class MergeScanOperator<T, R> implements Operator<T, R> {
constructor(private project: (acc: R, x: T) => Observable<R>,
private seed: R,
private concurrent: number = Number.POSITIVE_INFINITY) {
private concurrent: number) {
}

call(subscriber: Subscriber<R>): Subscriber<T> {
Expand All @@ -37,7 +39,7 @@ export class MergeScanSubscriber<T, R> extends OuterSubscriber<T, R> {
constructor(destination: Subscriber<R>,
private project: (acc: R, x: T) => Observable<R>,
private acc: R,
private concurrent: number = Number.POSITIVE_INFINITY) {
private concurrent: number) {
super(destination);
}

Expand Down

0 comments on commit 76c0e99

Please sign in to comment.