Skip to content

Commit

Permalink
fix(repeat): add additional resubscription behavior
Browse files Browse the repository at this point in the history
- repeat operator now supports resubscription behavior properly along
with infinite repeat
- add, expand test coverage
- update micro perf test case

closes ReactiveX#516
  • Loading branch information
kwonoj committed Oct 15, 2015
1 parent 05ebb07 commit b2d7a4d
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 50 deletions.
20 changes: 0 additions & 20 deletions perf/micro/current-thread-scheduler/operators/repeat.js

This file was deleted.

18 changes: 18 additions & 0 deletions perf/micro/immediate-scheduler/operators/repeat-scalar.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldRepeatWithImmediateScheduler = RxOld.Observable.of(25, RxOld.Scheduler.immediate).repeat(5);
var newRepeatWithImmediateScheduler = RxNew.Observable.of(25).repeat(5);

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old repeat(scalar) with immediate scheduler', function () {
oldRepeatWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new repeat(scalar) with immediate scheduler', function () {
newRepeatWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
10 changes: 4 additions & 6 deletions perf/micro/immediate-scheduler/operators/repeat.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,17 @@ var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldRepeatWithImmediateScheduler = RxOld.Observable.of(25, RxOld.Scheduler.immediate)
.repeat(5, RxOld.Scheduler.immediate);
var newRepeatWithImmediateScheduler = RxNew.Observable.of(25)
.repeat(5);
var oldRepeatWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate).repeat(5);
var newRepeatWithImmediateScheduler = RxNew.Observable.range(0, 25).repeat(5);

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old repeat with immediate scheduler', function () {
.add('old repeat() with immediate scheduler', function () {
oldRepeatWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new repeat with immediate scheduler', function () {
.add('new repeat() with immediate scheduler', function () {
newRepeatWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
160 changes: 152 additions & 8 deletions spec/operators/repeat-spec.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,158 @@
/* globals describe, it, expect */
/* globals describe, it, expect, expectObservable, hot, cold, rxTestScheduler */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.repeat()', function () {
it('should resubscribe count number of times', function (done) {
var expected = [1, 2, 1, 2];
Observable.of(1,2)
.repeat(2)
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, null, done);
it('should resubscribe count number of times', function () {
var e1 = cold('--a--b--|');
var expected = '--a--b----a--b----a--b--|';

expectObservable(e1.repeat(3)).toBe(expected);
});

it('should resubscribe multiple times', function () {
var e1 = cold('--a--b--|');
var expected = '--a--b----a--b----a--b----a--b--|';

expectObservable(e1.repeat(2).repeat(2)).toBe(expected);
});

it('should complete without emit when count is zero', function () {
var e1 = cold('--a--b--|');
var expected = '|';

expectObservable(e1.repeat(0)).toBe(expected);
});

it('should emit source once when count is one', function () {
var e1 = cold('--a--b--|');
var expected = '--a--b--|';

expectObservable(e1.repeat(1)).toBe(expected);
});

it('should repeat until gets unsubscribed', function () {
var e1 = cold('--a--b--|');
var unsub = '--------------!';
var expected = '--a--b----a--b-';

expectObservable(e1.repeat(10), unsub).toBe(expected);
});

it('should not complete when source never completes', function () {
var e1 = Observable.never();
var expected = '-';

expectObservable(e1.repeat(3)).toBe(expected);
});

it('should not complete when source does not completes', function () {
var e1 = cold('-');
var expected = '-';

expectObservable(e1.repeat(3)).toBe(expected);
});

it('should complete immediately when source does not complete withut emit but count is zero', function () {
var e1 = cold('-');
var expected = '|';

expectObservable(e1.repeat(0)).toBe(expected);
});

it('should complete immediately when source does not complete but count is zero', function () {
var e1 = cold('--a--b--');
var expected = '|';

expectObservable(e1.repeat(0)).toBe(expected);
});

it('should emit source once and does not complete when source emits but does not complete', function () {
var e1 = cold('--a--b--');
var expected = '--a--b--';

expectObservable(e1.repeat(3)).toBe(expected);
});

it('should complete when source is empty', function () {
var e1 = Observable.empty();
var expected = '|';

expectObservable(e1.repeat(3)).toBe(expected);
});

it('should complete when source does not emit', function () {
var e1 = cold('----|');
var expected = '------------|';

expectObservable(e1.repeat(3)).toBe(expected);
});

it('should complete immediately when source does not emit but count is zero', function () {
var e1 = cold('----|');
var expected = '|';

expectObservable(e1.repeat(0)).toBe(expected);
});

it('should raise error when source raises error', function () {
var e1 = cold('--a--b--#');
var expected = '--a--b--#';

expectObservable(e1.repeat(2)).toBe(expected);
});

it('should raises error if source throws', function () {
var e1 = Observable.throw('error');
var expected = '#';

expectObservable(e1.repeat(3)).toBe(expected);
});

it('should raises error if source throws when repeating infinitely', function () {
var e1 = Observable.throw('error');
var expected = '#';

expectObservable(e1.repeat(3)).toBe(expected);
});

it('should terminate repeat and throw if source subscription to _next throws', function () {
var e1 = Observable.of(1, 2, rxTestScheduler);
e1.subscribe(function () { throw new Error('error'); });

expect(function () {
e1.repeat(3);
rxTestScheduler.flush();
}).toThrow();
});

it('should terminate repeat and throw if source subscription to _complete throws', function () {
var e1 = Observable.of(1, 2, rxTestScheduler);
e1.subscribe(function () {}, function () {}, function () { throw new Error('error'); });

expect(function () {
e1.repeat(3);
rxTestScheduler.flush();
}).toThrow();
});

it('should terminate repeat and throw if source subscription to _next throws when repeating infinitely', function () {
var e1 = Observable.of(1, 2, rxTestScheduler);
e1.subscribe(function () { throw new Error('error'); });

expect(function () {
e1.repeat();
rxTestScheduler.flush();
}).toThrow();
});

it('should terminate repeat and throw if source subscription to _complete throws when repeating infinitely', function () {
var e1 = Observable.of(1, 2, rxTestScheduler);
e1.subscribe(function () {}, function () {}, function () { throw new Error('error'); });

expect(function () {
e1.repeat();
rxTestScheduler.flush();
}).toThrow();
});
});
40 changes: 24 additions & 16 deletions src/operators/repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,48 @@ import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Observable from '../Observable';
import Subject from '../Subject';
import Subscription from '../Subscription';
import immediate from '../schedulers/immediate';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

export default function repeat<T>(count: number): Observable<T> {
export default function repeat<T>(count: number = -1): Observable<T> {
return this.lift(new RepeatOperator(count, this));
}

class RepeatOperator<T, R> implements Operator<T, R> {
constructor(protected count: number, protected original: Observable<T>) {
constructor(private count: number, private original: Observable<T>) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new RepeatSubscriber<T>(subscriber, this.count, this.original);
return new RepeatSubscriber(subscriber, this.count, this.original);
}
}

class RepeatSubscriber<T> extends Subscriber<T> {
private repeated: number = 0;
constructor(destination: Subscriber<T>, public count: number, public original: Observable<T>) {
constructor(destination: Observer<T>, private count: number, private original: Observable<T>) {
super(destination);
this.invalidateRepeat();
}

_complete() {
if (this.count === (this.repeated += 1)) {
private repeatSubscription(): void {
let state = { dest: this.destination, count: this.count, original: this.original };
immediate.scheduleNow(RepeatSubscriber.dispatchSubscription, state);
}

private invalidateRepeat(): Boolean {
let completed = this.count === 0;
if (completed) {
this.destination.complete();
} else {
this.resubscribe();
}
return completed;
}

resubscribe() {
this.original.subscribe(this);
private static dispatchSubscription({ dest, count, original }): void {
return original.subscribe(new RepeatSubscriber(dest, count, original));
}

_complete() {
if (!this.invalidateRepeat()) {
this.count--;
this.repeatSubscription();
}
}
}

0 comments on commit b2d7a4d

Please sign in to comment.