diff --git a/spec/operators/windowTime-spec.ts b/spec/operators/windowTime-spec.ts index 5aff47d00c8..ea0ef27a971 100644 --- a/spec/operators/windowTime-spec.ts +++ b/spec/operators/windowTime-spec.ts @@ -18,7 +18,7 @@ describe('Observable.prototype.windowTime', () => { const x = cold( '--a--(b|) '); const y = cold( '-d--e| '); const z = cold( '-g--h| '); - const values = { x: x, y: y, z: z }; + const values = { x, y, z }; const result = source.windowTime(50, 100, rxTestScheduler); @@ -26,6 +26,43 @@ describe('Observable.prototype.windowTime', () => { expectSubscriptions(source.subscriptions).toBe(subs); }); + it('should close windows after max count is reached', () => { + const source = hot('--1--2--^--a--b--c--d--e--f--g-----|'); + const subs = '^ !'; + const timeSpan = time( '----------|'); + // 100 frames 0---------1---------2------| + const expected = 'x---------y---------z------|'; + const x = cold( '---a--(b|) '); + const y = cold( '--d--(e|) '); + const z = cold( '-g-----|'); + const values = { x, y, z }; + + const result = source.windowTime(timeSpan, null, 2, rxTestScheduler); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + + it('should close window after max count is reached with' + + 'windowCreationInterval', () => { + const source = hot('--1--2--^-a--b--c--de-f---g--h--i-|'); + const subs = '^ !'; + // 100 frames 0---------1---------2-----| + // 50 ----| + // 50 ----| + // 50 ----| + const expected = 'x---------y---------z-----|'; + const x = cold( '--a--(b|) '); + const y = cold( '-de-(f|) '); + const z = cold( '-h--i| '); + const values = { x, y, z }; + + const result = source.windowTime(50, 100, 3, rxTestScheduler); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + it('should emit windows given windowTimeSpan', () => { const source = hot('--1--2--^--a--b--c--d--e--f--g--h--|'); const subs = '^ !'; @@ -35,9 +72,9 @@ describe('Observable.prototype.windowTime', () => { const x = cold( '---a--b--c| '); const y = cold( '--d--e--f-| '); const z = cold( '-g--h--|'); - const values = { x: x, y: y, z: z }; + const values = { x, y, z }; - const result = source.windowTime(timeSpan, null, rxTestScheduler); + const result = source.windowTime(timeSpan, rxTestScheduler); expectObservable(result).toBe(expected, values); expectSubscriptions(source.subscriptions).toBe(subs); @@ -56,7 +93,7 @@ describe('Observable.prototype.windowTime', () => { const x = cold( '---a-| '); const y = cold( '--d--(e|) '); const z = cold( '-g--h| '); - const values = { x: x, y: y, z: z }; + const values = { x, y, z }; const result = source.windowTime(timeSpan, interval, rxTestScheduler); @@ -69,7 +106,7 @@ describe('Observable.prototype.windowTime', () => { const subs = '(^!)'; const expected = '(w|)'; const w = cold('|'); - const expectedValues = { w: w }; + const expectedValues = { w }; const timeSpan = time('-----|'); const interval = time('----------|'); @@ -84,7 +121,7 @@ describe('Observable.prototype.windowTime', () => { const subs = '(^!)'; const expected = '(w|)'; const w = cold('(a|)'); - const expectedValues = { w: w }; + const expectedValues = { w }; const timeSpan = time('-----|'); const interval = time('----------|'); @@ -105,7 +142,7 @@ describe('Observable.prototype.windowTime', () => { const c = cold( '---| '); const d = cold( '--'); const unsub = ' !'; - const expectedValues = { a: a, b: b, c: c, d: d }; + const expectedValues = { a, b, c, d }; const result = source.windowTime(timeSpan, interval, rxTestScheduler); @@ -118,7 +155,7 @@ describe('Observable.prototype.windowTime', () => { const subs = '(^!)'; const expected = '(w#)'; const w = cold('#'); - const expectedValues = { w: w }; + const expectedValues = { w }; const timeSpan = time('-----|'); const interval = time('----------|'); @@ -141,7 +178,7 @@ describe('Observable.prototype.windowTime', () => { const x = cold( '---a-| '); const y = cold( '--d--(e|) '); const z = cold( '-g--h| '); - const values = { x: x, y: y, z: z }; + const values = { x, y, z }; const result = source.windowTime(timeSpan, interval, rxTestScheduler); @@ -163,7 +200,7 @@ describe('Observable.prototype.windowTime', () => { const x = cold( '---a-| '); const y = cold( '-- '); const unsub = ' ! '; - const values = { x: x, y: y }; + const values = { x, y }; const result = source.windowTime(timeSpan, interval, rxTestScheduler); @@ -184,7 +221,7 @@ describe('Observable.prototype.windowTime', () => { const x = cold( '---a-| '); const y = cold( '--d-- '); const unsub = ' ! '; - const values = { x: x, y: y }; + const values = { x, y }; const result = source .mergeMap((x: string) => Observable.of(x)) diff --git a/src/operator/windowTime.ts b/src/operator/windowTime.ts index 3c563ee719a..9b8e616eca8 100644 --- a/src/operator/windowTime.ts +++ b/src/operator/windowTime.ts @@ -6,6 +6,8 @@ import { async } from '../scheduler/async'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; import { Subscription } from '../Subscription'; +import { isNumeric } from '../util/isNumeric'; +import { isScheduler } from '../util/isScheduler'; /** * Branch out the source Observable values as a nested Observable periodically @@ -24,7 +26,10 @@ import { Subscription } from '../Subscription'; * emits the current window and propagates the notification from the source * Observable. If `windowCreationInterval` is not provided, the output * Observable starts a new window when the previous window of duration - * `windowTimeSpan` completes. + * `windowTimeSpan` completes. If `maxWindowCount` is provided, each window + * will emit at most fixed number of values. Window will complete immediately + * after emitting last value and next one still will open as specified by + * `windowTimeSpan` and `windowCreationInterval` arguments. * * @example In every window of 1 second each, emit at most 2 click events * var clicks = Rx.Observable.fromEvent(document, 'click'); @@ -40,6 +45,12 @@ import { Subscription } from '../Subscription'; * .mergeAll(); // flatten the Observable-of-Observables * result.subscribe(x => console.log(x)); * + * @example Same as example above but with maxWindowCount instead of take + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var result = clicks.windowTime(1000, 5000, 2) // each window has still at most 2 emissions + * .mergeAll(); // flatten the Observable-of-Observables + * result.subscribe(x => console.log(x)); + * @see {@link window} * @see {@link windowCount} * @see {@link windowToggle} @@ -49,6 +60,8 @@ import { Subscription } from '../Subscription'; * @param {number} windowTimeSpan The amount of time to fill each window. * @param {number} [windowCreationInterval] The interval at which to start new * windows. + * @param {number} [maxWindowSize=Number.POSITIVE_INFINITY] Max number of + * values each window can emit before completion. * @param {Scheduler} [scheduler=async] The scheduler on which to schedule the * intervals that determine window boundaries. * @return {Observable>} An observable of windows, which in turn @@ -57,21 +70,52 @@ import { Subscription } from '../Subscription'; * @owner Observable */ export function windowTime(this: Observable, windowTimeSpan: number, - windowCreationInterval: number = null, - scheduler: IScheduler = async): Observable> { - return this.lift(new WindowTimeOperator(windowTimeSpan, windowCreationInterval, scheduler)); + scheduler?: IScheduler): Observable>; +export function windowTime(this: Observable, windowTimeSpan: number, + windowCreationInterval: number, + scheduler?: IScheduler): Observable>; +export function windowTime(this: Observable, windowTimeSpan: number, + windowCreationInterval: number, + maxWindowSize: number, + scheduler?: IScheduler): Observable>; + +export function windowTime(this: Observable, + windowTimeSpan: number): Observable> { + + let scheduler: IScheduler = async; + let windowCreationInterval: number = null; + let maxWindowSize: number = Number.POSITIVE_INFINITY; + + if (isScheduler(arguments[3])) { + scheduler = arguments[3]; + } + + if (isScheduler(arguments[2])) { + scheduler = arguments[2]; + } else if (isNumeric(arguments[2])) { + maxWindowSize = arguments[2]; + } + + if (isScheduler(arguments[1])) { + scheduler = arguments[1]; + } else if (isNumeric(arguments[1])) { + windowCreationInterval = arguments[1]; + } + + return this.lift(new WindowTimeOperator(windowTimeSpan, windowCreationInterval, maxWindowSize, scheduler)); } class WindowTimeOperator implements Operator> { constructor(private windowTimeSpan: number, - private windowCreationInterval: number, + private windowCreationInterval: number | null, + private maxWindowSize: number, private scheduler: IScheduler) { } call(subscriber: Subscriber>, source: any): any { - return source.subscribe(new WindowTimeSubscriber( - subscriber, this.windowTimeSpan, this.windowCreationInterval, this.scheduler + return source._subscribe(new WindowTimeSubscriber( + subscriber, this.windowTimeSpan, this.windowCreationInterval, this.maxWindowSize, this.scheduler )); } } @@ -84,7 +128,7 @@ interface CreationState { } interface TimeSpanOnlyState { - window: Subject; + window: CountedSubject; windowTimeSpan: number; subscriber: WindowTimeSubscriber; } @@ -96,21 +140,39 @@ interface CloseWindowContext { interface CloseState { subscriber: WindowTimeSubscriber; - window: Subject; + window: CountedSubject; context: CloseWindowContext; } +class CountedSubject extends Subject { + private numberOfNextedValues: number; + constructor() { + super(); + this.numberOfNextedValues = 0; + } + + next(value?: T) { + this.numberOfNextedValues++; + super.next(value); + } + + getNumberOfNextedValues() { + return this.numberOfNextedValues; + } +} + /** * We need this JSDoc comment for affecting ESDoc. * @ignore * @extends {Ignored} */ class WindowTimeSubscriber extends Subscriber { - private windows: Array> = []; + private windows: CountedSubject[] = []; constructor(protected destination: Subscriber>, private windowTimeSpan: number, - private windowCreationInterval: number, + private windowCreationInterval: number | null, + private maxWindowSize: number, private scheduler: IScheduler) { super(destination); @@ -133,6 +195,9 @@ class WindowTimeSubscriber extends Subscriber { const window = windows[i]; if (!window.closed) { window.next(value); + if (window.getNumberOfNextedValues() >= this.maxWindowSize) { + this.closeWindow(window); + } } } } @@ -156,15 +221,15 @@ class WindowTimeSubscriber extends Subscriber { this.destination.complete(); } - public openWindow(): Subject { - const window = new Subject(); + public openWindow(): CountedSubject { + const window = new CountedSubject(); this.windows.push(window); const destination = this.destination; destination.next(window); return window; } - public closeWindow(window: Subject): void { + public closeWindow(window: CountedSubject): void { window.complete(); const windows = this.windows; windows.splice(windows.indexOf(window), 1);