Skip to content

Commit

Permalink
feat(mergeScan): add new mergeScan operator.
Browse files Browse the repository at this point in the history
  • Loading branch information
trxcllnt authored and benlesh committed Nov 6, 2015
1 parent 73d743d commit 0ebb5bd
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 0 deletions.
106 changes: 106 additions & 0 deletions spec/operators/mergeScan-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/* globals describe, it, expect, expectObservable, hot */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.mergeScan()', function () {
it('should mergeScan things', function () {
var e1 = hot('--a--^--b--c--d--e--f--g--|');
var expected = '---u--v--w--x--y--z--|';

var values = {
u: ['b'],
v: ['b', 'c'],
w: ['b', 'c', 'd'],
x: ['b', 'c', 'd', 'e'],
y: ['b', 'c', 'd', 'e', 'f'],
z: ['b', 'c', 'd', 'e', 'f', 'g']
};

var source = e1.mergeScan(function (acc, x) { return Observable.of(acc.concat(x)); }, []);

expectObservable(source).toBe(expected, values);
});

it('should handle errors', function () {
var e1 = hot('--a--^--b--c--d--#');
var expected = '---u--v--w--#';

var values = {
u: ['b'],
v: ['b', 'c'],
w: ['b', 'c', 'd']
};

var source = e1.mergeScan(function (acc, x) { return Observable.of(acc.concat(x)); }, []);

expectObservable(source).toBe(expected, values);
});

it('should handle errors in the projection function', function () {
var e1 = hot('--a--^--b--c--d--e--f--g--|');
var expected = '---u--v--#';

var values = {
u: ['b'],
v: ['b', 'c']
};

var source = e1.mergeScan(function (acc, x) {
if (x === 'd') {
throw 'bad!';
}
return Observable.of(acc.concat(x));
}, []);

expectObservable(source).toBe(expected, values, 'bad!');
});

it('handle empty', function () {
var e1 = Observable.empty();
var expected = '(u|)';

var values = {
u: []
};

var source = e1.mergeScan(function (acc, x) { return Observable.of(acc.concat(x)); }, []);

expectObservable(source).toBe(expected, values);
});

it('handle never', function () {
var e1 = Observable.never();
var expected = '-';

var source = e1.mergeScan(function (acc, x) { return Observable.of(acc.concat(x)); }, []);

expectObservable(source).toBe(expected);
});

it('handle throw', function () {
var e1 = Observable.throw('bad!');
var expected = '#';

var source = e1.mergeScan(function (acc, x) { return Observable.of(acc.concat(x)); }, []);

expectObservable(source).toBe(expected, undefined, 'bad!');
});

it('should mergeScan unsubscription', function () {
var e1 = hot('--a--^--b--c--d--e--f--g--|');
var expected = '---u--v--w--x--';
var sub = '^ !';
var values = {
u: ['b'],
v: ['b', 'c'],
w: ['b', 'c', 'd'],
x: ['b', 'c', 'd', 'e'],
y: ['b', 'c', 'd', 'e', 'f'],
z: ['b', 'c', 'd', 'e', 'f', 'g']
};

var source = e1.mergeScan(function (acc, x) { return Observable.of(acc.concat(x)); }, []);

expectObservable(source, sub).toBe(expected, values);
});
});
4 changes: 4 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ interface KitchenSinkOperators<T> extends CoreOperators<T> {
max?: <T, R>(comparer?: (x: R, y: T) => R) => Observable<R>;
min?: <T, R>(comparer?: (x: R, y: T) => R) => Observable<R>;
timeInterval?: <T>(scheduler?: IScheduler) => Observable<T>;
mergeScan?: <T, R>(project: (acc: R, x: T) => Observable<R>, seed: R) => Observable<R>;
}

// operators
Expand Down Expand Up @@ -196,6 +197,9 @@ import mergeMapTo from './operators/mergeMapTo';
observableProto.mergeMapTo = mergeMapTo;
observableProto.flatMapTo = mergeMapTo;

import mergeScan from './operators/extended/mergeScan';
observableProto.mergeScan = mergeScan;

import min from './operators/extended/min';
observableProto.min = min;

Expand Down
91 changes: 91 additions & 0 deletions src/operators/extended/mergeScan.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import Operator from '../../Operator';
import Observable from '../../Observable';
import Subscriber from '../../Subscriber';
import Subscription from '../../Subscription';
import tryCatch from '../../util/tryCatch';
import { errorObject } from '../../util/errorObject';
import subscribeToResult from '../../util/subscribeToResult';
import OuterSubscriber from '../../OuterSubscriber';

export default function mergeScan<T, R>(project: (acc: R, x: T) => Observable<R>, seed: R) {
return this.lift(new MergeScanOperator(project, seed));
}

export class MergeScanOperator<T, R> implements Operator<T, R> {
constructor(private project: (acc: R, x: T) => Observable<R>,
private seed: R,
private concurrent: number = Number.POSITIVE_INFINITY) {
}

call(subscriber: Subscriber<R>): Subscriber<T> {
return new MergeScanSubscriber(
subscriber, this.project, this.seed, this.concurrent
);
}
}

export class MergeScanSubscriber<T, R> extends OuterSubscriber<T, R> {
private hasValue: boolean = false;
private hasCompleted: boolean = false;
private buffer: Observable<any>[] = [];
private active: number = 0;
protected index: number = 0;

constructor(destination: Subscriber<R>,
private project: (acc: R, x: T) => Observable<R>,
private acc: R,
private concurrent: number = Number.POSITIVE_INFINITY) {
super(destination);
}

_next(value: any): void {
if (this.active < this.concurrent) {
const index = this.index++;
const ish = tryCatch(this.project)(this.acc, value);
const destination = this.destination;
if (ish === errorObject) {
destination.error(ish.e);
} else {
this.active++;
this._innerSub(ish, value, index);
}
} else {
this.buffer.push(value);
}
}

_innerSub(ish: any, value: T, index: number): void {
this.add(subscribeToResult<T, R>(this, ish, value, index));
}

_complete(): void {
this.hasCompleted = true;
if (this.active === 0 && this.buffer.length === 0) {
if (this.hasValue === false) {
this.destination.next(this.acc);
}
this.destination.complete();
}
}

notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
const { destination } = this;
this.acc = innerValue;
this.hasValue = true;
destination.next(innerValue);
}

notifyComplete(innerSub: Subscription<T>): void {
const buffer = this.buffer;
this.remove(innerSub);
this.active--;
if (buffer.length > 0) {
this._next(buffer.shift());
} else if (this.active === 0 && this.hasCompleted) {
if (this.hasValue === false) {
this.destination.next(this.acc);
}
this.destination.complete();
}
}
}

0 comments on commit 0ebb5bd

Please sign in to comment.