diff --git a/perf/micro/current-thread-scheduler/operators/repeat.js b/perf/micro/current-thread-scheduler/operators/repeat.js deleted file mode 100644 index 10db36c4d62..00000000000 --- a/perf/micro/current-thread-scheduler/operators/repeat.js +++ /dev/null @@ -1,20 +0,0 @@ -var RxOld = require('rx'); -var RxNew = require('../../../../index'); - -module.exports = function (suite) { - var oldRepeatWithCurrentThreadScheduler = RxOld.Observable.of(25, RxOld.Scheduler.currentThread) - .repeat(5, RxOld.Scheduler.currentThread); - var newRepeatWithCurrentThreadScheduler = RxNew.Observable.of(25, RxNew.Scheduler.immediate) - .repeat(5, RxNew.Scheduler.immediate); - - function _next(x) { } - function _error(e) { } - function _complete() { } - return suite - .add('old repeat with current thread scheduler', function () { - oldRepeatWithCurrentThreadScheduler.subscribe(_next, _error, _complete); - }) - .add('new repeat with current thread scheduler', function () { - newRepeatWithCurrentThreadScheduler.subscribe(_next, _error, _complete); - }); -}; \ No newline at end of file diff --git a/perf/micro/immediate-scheduler/operators/repeat-scalar.js b/perf/micro/immediate-scheduler/operators/repeat-scalar.js new file mode 100644 index 00000000000..02d2d244b1e --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/repeat-scalar.js @@ -0,0 +1,18 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var oldRepeatWithImmediateScheduler = RxOld.Observable.of(25, RxOld.Scheduler.immediate).repeat(5); + var newRepeatWithImmediateScheduler = RxNew.Observable.of(25).repeat(5); + + function _next(x) { } + function _error(e) { } + function _complete() { } + return suite + .add('old repeat(scalar) with immediate scheduler', function () { + oldRepeatWithImmediateScheduler.subscribe(_next, _error, _complete); + }) + .add('new repeat(scalar) with immediate scheduler', function () { + newRepeatWithImmediateScheduler.subscribe(_next, _error, _complete); + }); +}; \ No newline at end of file diff --git a/perf/micro/immediate-scheduler/operators/repeat.js b/perf/micro/immediate-scheduler/operators/repeat.js index ef23c4e5a6c..3c25e279ba8 100644 --- a/perf/micro/immediate-scheduler/operators/repeat.js +++ b/perf/micro/immediate-scheduler/operators/repeat.js @@ -2,19 +2,17 @@ var RxOld = require('rx'); var RxNew = require('../../../../index'); module.exports = function (suite) { - var oldRepeatWithImmediateScheduler = RxOld.Observable.of(25, RxOld.Scheduler.immediate) - .repeat(5, RxOld.Scheduler.immediate); - var newRepeatWithImmediateScheduler = RxNew.Observable.of(25) - .repeat(5); + var oldRepeatWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate).repeat(5); + var newRepeatWithImmediateScheduler = RxNew.Observable.range(0, 25).repeat(5); function _next(x) { } function _error(e) { } function _complete() { } return suite - .add('old repeat with immediate scheduler', function () { + .add('old repeat() with immediate scheduler', function () { oldRepeatWithImmediateScheduler.subscribe(_next, _error, _complete); }) - .add('new repeat with immediate scheduler', function () { + .add('new repeat() with immediate scheduler', function () { newRepeatWithImmediateScheduler.subscribe(_next, _error, _complete); }); }; \ No newline at end of file diff --git a/spec/operators/repeat-spec.js b/spec/operators/repeat-spec.js index 02f09d43544..c0ff8b33f9e 100644 --- a/spec/operators/repeat-spec.js +++ b/spec/operators/repeat-spec.js @@ -1,14 +1,158 @@ -/* globals describe, it, expect */ +/* globals describe, it, expect, expectObservable, hot, cold, rxTestScheduler */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; describe('Observable.prototype.repeat()', function () { - it('should resubscribe count number of times', function (done) { - var expected = [1, 2, 1, 2]; - Observable.of(1,2) - .repeat(2) - .subscribe(function (x) { - expect(x).toBe(expected.shift()); - }, null, done); + it('should resubscribe count number of times', function () { + var e1 = cold('--a--b--|'); + var expected = '--a--b----a--b----a--b--|'; + + expectObservable(e1.repeat(3)).toBe(expected); + }); + + it('should resubscribe multiple times', function () { + var e1 = cold('--a--b--|'); + var expected = '--a--b----a--b----a--b----a--b--|'; + + expectObservable(e1.repeat(2).repeat(2)).toBe(expected); + }); + + it('should complete without emit when count is zero', function () { + var e1 = cold('--a--b--|'); + var expected = '|'; + + expectObservable(e1.repeat(0)).toBe(expected); + }); + + it('should emit source once when count is one', function () { + var e1 = cold('--a--b--|'); + var expected = '--a--b--|'; + + expectObservable(e1.repeat(1)).toBe(expected); + }); + + it('should repeat until gets unsubscribed', function () { + var e1 = cold('--a--b--|'); + var unsub = '--------------!'; + var expected = '--a--b----a--b-'; + + expectObservable(e1.repeat(10), unsub).toBe(expected); + }); + + it('should not complete when source never completes', function () { + var e1 = Observable.never(); + var expected = '-'; + + expectObservable(e1.repeat(3)).toBe(expected); + }); + + it('should not complete when source does not completes', function () { + var e1 = cold('-'); + var expected = '-'; + + expectObservable(e1.repeat(3)).toBe(expected); + }); + + it('should complete immediately when source does not complete withut emit but count is zero', function () { + var e1 = cold('-'); + var expected = '|'; + + expectObservable(e1.repeat(0)).toBe(expected); + }); + + it('should complete immediately when source does not complete but count is zero', function () { + var e1 = cold('--a--b--'); + var expected = '|'; + + expectObservable(e1.repeat(0)).toBe(expected); + }); + + it('should emit source once and does not complete when source emits but does not complete', function () { + var e1 = cold('--a--b--'); + var expected = '--a--b--'; + + expectObservable(e1.repeat(3)).toBe(expected); + }); + + it('should complete when source is empty', function () { + var e1 = Observable.empty(); + var expected = '|'; + + expectObservable(e1.repeat(3)).toBe(expected); + }); + + it('should complete when source does not emit', function () { + var e1 = cold('----|'); + var expected = '------------|'; + + expectObservable(e1.repeat(3)).toBe(expected); + }); + + it('should complete immediately when source does not emit but count is zero', function () { + var e1 = cold('----|'); + var expected = '|'; + + expectObservable(e1.repeat(0)).toBe(expected); + }); + + it('should raise error when source raises error', function () { + var e1 = cold('--a--b--#'); + var expected = '--a--b--#'; + + expectObservable(e1.repeat(2)).toBe(expected); + }); + + it('should raises error if source throws', function () { + var e1 = Observable.throw('error'); + var expected = '#'; + + expectObservable(e1.repeat(3)).toBe(expected); + }); + + it('should raises error if source throws when repeating infinitely', function () { + var e1 = Observable.throw('error'); + var expected = '#'; + + expectObservable(e1.repeat(3)).toBe(expected); + }); + + it('should terminate repeat and throw if source subscription to _next throws', function () { + var e1 = Observable.of(1, 2, rxTestScheduler); + e1.subscribe(function () { throw new Error('error'); }); + + expect(function () { + e1.repeat(3); + rxTestScheduler.flush(); + }).toThrow(); + }); + + it('should terminate repeat and throw if source subscription to _complete throws', function () { + var e1 = Observable.of(1, 2, rxTestScheduler); + e1.subscribe(function () {}, function () {}, function () { throw new Error('error'); }); + + expect(function () { + e1.repeat(3); + rxTestScheduler.flush(); + }).toThrow(); + }); + + it('should terminate repeat and throw if source subscription to _next throws when repeating infinitely', function () { + var e1 = Observable.of(1, 2, rxTestScheduler); + e1.subscribe(function () { throw new Error('error'); }); + + expect(function () { + e1.repeat(); + rxTestScheduler.flush(); + }).toThrow(); + }); + + it('should terminate repeat and throw if source subscription to _complete throws when repeating infinitely', function () { + var e1 = Observable.of(1, 2, rxTestScheduler); + e1.subscribe(function () {}, function () {}, function () { throw new Error('error'); }); + + expect(function () { + e1.repeat(); + rxTestScheduler.flush(); + }).toThrow(); }); }); \ No newline at end of file diff --git a/src/operators/repeat.ts b/src/operators/repeat.ts index eac062fd112..befff15e834 100644 --- a/src/operators/repeat.ts +++ b/src/operators/repeat.ts @@ -2,40 +2,48 @@ import Operator from '../Operator'; import Observer from '../Observer'; import Subscriber from '../Subscriber'; import Observable from '../Observable'; -import Subject from '../Subject'; -import Subscription from '../Subscription'; +import immediate from '../schedulers/immediate'; -import tryCatch from '../util/tryCatch'; -import {errorObject} from '../util/errorObject'; - -export default function repeat(count: number): Observable { +export default function repeat(count: number = -1): Observable { return this.lift(new RepeatOperator(count, this)); } class RepeatOperator implements Operator { - constructor(protected count: number, protected original: Observable) { + constructor(private count: number, private original: Observable) { } call(subscriber: Subscriber): Subscriber { - return new RepeatSubscriber(subscriber, this.count, this.original); + return new RepeatSubscriber(subscriber, this.count, this.original); } } class RepeatSubscriber extends Subscriber { - private repeated: number = 0; - constructor(destination: Subscriber, public count: number, public original: Observable) { + constructor(destination: Observer, private count: number, private original: Observable) { super(destination); + this.invalidateRepeat(); } - _complete() { - if (this.count === (this.repeated += 1)) { + private repeatSubscription(): void { + let state = { dest: this.destination, count: this.count, original: this.original }; + immediate.scheduleNow(RepeatSubscriber.dispatchSubscription, state); + } + + private invalidateRepeat(): Boolean { + let completed = this.count === 0; + if (completed) { this.destination.complete(); - } else { - this.resubscribe(); } + return completed; } - resubscribe() { - this.original.subscribe(this); + private static dispatchSubscription({ dest, count, original }): void { + return original.subscribe(new RepeatSubscriber(dest, count, original)); + } + + _complete() { + if (!this.invalidateRepeat()) { + this.count--; + this.repeatSubscription(); + } } }