diff --git a/spec/operators/mergeScan-spec.js b/spec/operators/mergeScan-spec.js new file mode 100644 index 0000000000..9a16bd8b21 --- /dev/null +++ b/spec/operators/mergeScan-spec.js @@ -0,0 +1,106 @@ +/* globals describe, it, expect, expectObservable, hot */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.mergeScan()', function () { + it('should mergeScan things', function () { + var e1 = hot('--a--^--b--c--d--e--f--g--|'); + var expected = '---u--v--w--x--y--z--|'; + + var values = { + u: ['b'], + v: ['b', 'c'], + w: ['b', 'c', 'd'], + x: ['b', 'c', 'd', 'e'], + y: ['b', 'c', 'd', 'e', 'f'], + z: ['b', 'c', 'd', 'e', 'f', 'g'] + }; + + var source = e1.mergeScan(function (acc, x) { return Observable.of(acc.concat(x)); }, []); + + expectObservable(source).toBe(expected, values); + }); + + it('should handle errors', function () { + var e1 = hot('--a--^--b--c--d--#'); + var expected = '---u--v--w--#'; + + var values = { + u: ['b'], + v: ['b', 'c'], + w: ['b', 'c', 'd'] + }; + + var source = e1.mergeScan(function (acc, x) { return Observable.of(acc.concat(x)); }, []); + + expectObservable(source).toBe(expected, values); + }); + + it('should handle errors in the projection function', function () { + var e1 = hot('--a--^--b--c--d--e--f--g--|'); + var expected = '---u--v--#'; + + var values = { + u: ['b'], + v: ['b', 'c'] + }; + + var source = e1.mergeScan(function (acc, x) { + if (x === 'd') { + throw 'bad!'; + } + return Observable.of(acc.concat(x)); + }, []); + + expectObservable(source).toBe(expected, values, 'bad!'); + }); + + it('handle empty', function () { + var e1 = Observable.empty(); + var expected = '(u|)'; + + var values = { + u: [] + }; + + var source = e1.mergeScan(function (acc, x) { return Observable.of(acc.concat(x)); }, []); + + expectObservable(source).toBe(expected, values); + }); + + it('handle never', function () { + var e1 = Observable.never(); + var expected = '-'; + + var source = e1.mergeScan(function (acc, x) { return Observable.of(acc.concat(x)); }, []); + + expectObservable(source).toBe(expected); + }); + + it('handle throw', function () { + var e1 = Observable.throw('bad!'); + var expected = '#'; + + var source = e1.mergeScan(function (acc, x) { return Observable.of(acc.concat(x)); }, []); + + expectObservable(source).toBe(expected, undefined, 'bad!'); + }); + + it('should mergeScan unsubscription', function () { + var e1 = hot('--a--^--b--c--d--e--f--g--|'); + var expected = '---u--v--w--x--'; + var sub = '^ !'; + var values = { + u: ['b'], + v: ['b', 'c'], + w: ['b', 'c', 'd'], + x: ['b', 'c', 'd', 'e'], + y: ['b', 'c', 'd', 'e', 'f'], + z: ['b', 'c', 'd', 'e', 'f', 'g'] + }; + + var source = e1.mergeScan(function (acc, x) { return Observable.of(acc.concat(x)); }, []); + + expectObservable(source, sub).toBe(expected, values); + }); +}); diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 773c550538..280b8bb91f 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -12,6 +12,7 @@ interface KitchenSinkOperators extends CoreOperators { max?: (comparer?: (x: R, y: T) => R) => Observable; min?: (comparer?: (x: R, y: T) => R) => Observable; timeInterval?: (scheduler?: IScheduler) => Observable; + mergeScan?: (project: (acc: R, x: T) => Observable, seed: R) => Observable; } // operators @@ -196,6 +197,9 @@ import mergeMapTo from './operators/mergeMapTo'; observableProto.mergeMapTo = mergeMapTo; observableProto.flatMapTo = mergeMapTo; +import mergeScan from './operators/extended/mergeScan'; +observableProto.mergeScan = mergeScan; + import min from './operators/extended/min'; observableProto.min = min; diff --git a/src/operators/extended/mergeScan.ts b/src/operators/extended/mergeScan.ts new file mode 100644 index 0000000000..3118fdcdaa --- /dev/null +++ b/src/operators/extended/mergeScan.ts @@ -0,0 +1,91 @@ +import Operator from '../../Operator'; +import Observable from '../../Observable'; +import Subscriber from '../../Subscriber'; +import Subscription from '../../Subscription'; +import tryCatch from '../../util/tryCatch'; +import { errorObject } from '../../util/errorObject'; +import subscribeToResult from '../../util/subscribeToResult'; +import OuterSubscriber from '../../OuterSubscriber'; + +export default function mergeScan(project: (acc: R, x: T) => Observable, seed: R) { + return this.lift(new MergeScanOperator(project, seed)); +} + +export class MergeScanOperator implements Operator { + constructor(private project: (acc: R, x: T) => Observable, + private seed: R, + private concurrent: number = Number.POSITIVE_INFINITY) { + } + + call(subscriber: Subscriber): Subscriber { + return new MergeScanSubscriber( + subscriber, this.project, this.seed, this.concurrent + ); + } +} + +export class MergeScanSubscriber extends OuterSubscriber { + private hasValue: boolean = false; + private hasCompleted: boolean = false; + private buffer: Observable[] = []; + private active: number = 0; + protected index: number = 0; + + constructor(destination: Subscriber, + private project: (acc: R, x: T) => Observable, + private acc: R, + private concurrent: number = Number.POSITIVE_INFINITY) { + super(destination); + } + + _next(value: any): void { + if (this.active < this.concurrent) { + const index = this.index++; + const ish = tryCatch(this.project)(this.acc, value); + const destination = this.destination; + if (ish === errorObject) { + destination.error(ish.e); + } else { + this.active++; + this._innerSub(ish, value, index); + } + } else { + this.buffer.push(value); + } + } + + _innerSub(ish: any, value: T, index: number): void { + this.add(subscribeToResult(this, ish, value, index)); + } + + _complete(): void { + this.hasCompleted = true; + if (this.active === 0 && this.buffer.length === 0) { + if (this.hasValue === false) { + this.destination.next(this.acc); + } + this.destination.complete(); + } + } + + notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + const { destination } = this; + this.acc = innerValue; + this.hasValue = true; + destination.next(innerValue); + } + + notifyComplete(innerSub: Subscription): void { + const buffer = this.buffer; + this.remove(innerSub); + this.active--; + if (buffer.length > 0) { + this._next(buffer.shift()); + } else if (this.active === 0 && this.hasCompleted) { + if (this.hasValue === false) { + this.destination.next(this.acc); + } + this.destination.complete(); + } + } +}