Skip to content

Commit

Permalink
fix(repeat): add additional resubscription behavior
Browse files Browse the repository at this point in the history
- repeat operator now supports resubscription behavior properly along
with infinite repeat

closes #516
  • Loading branch information
kwonoj authored and benlesh committed Oct 16, 2015
1 parent 0cb21e6 commit 4f9f33b
Showing 1 changed file with 24 additions and 16 deletions.
40 changes: 24 additions & 16 deletions src/operators/repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,48 @@ import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Observable from '../Observable';
import Subject from '../Subject';
import Subscription from '../Subscription';
import immediate from '../schedulers/immediate';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

export default function repeat<T>(count: number): Observable<T> {
export default function repeat<T>(count: number = -1): Observable<T> {
return this.lift(new RepeatOperator(count, this));
}

class RepeatOperator<T, R> implements Operator<T, R> {
constructor(protected count: number, protected original: Observable<T>) {
constructor(private count: number, private original: Observable<T>) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new RepeatSubscriber<T>(subscriber, this.count, this.original);
return new RepeatSubscriber(subscriber, this.count, this.original);
}
}

class RepeatSubscriber<T> extends Subscriber<T> {
private repeated: number = 0;
constructor(destination: Subscriber<T>, public count: number, public original: Observable<T>) {
constructor(destination: Observer<T>, private count: number, private original: Observable<T>) {
super(destination);
this.invalidateRepeat();
}

_complete() {
if (this.count === (this.repeated += 1)) {
private repeatSubscription(): void {
let state = { dest: this.destination, count: this.count, original: this.original };
immediate.scheduleNow(RepeatSubscriber.dispatchSubscription, state);
}

private invalidateRepeat(): Boolean {
let completed = this.count === 0;
if (completed) {
this.destination.complete();
} else {
this.resubscribe();
}
return completed;
}

resubscribe() {
this.original.subscribe(this);
private static dispatchSubscription({ dest, count, original }): void {
return original.subscribe(new RepeatSubscriber(dest, count, original));
}

_complete() {
if (!this.invalidateRepeat()) {
this.count--;
this.repeatSubscription();
}
}
}

0 comments on commit 4f9f33b

Please sign in to comment.