Skip to content

Commit

Permalink
refactor(operators): Update operators per new Subscription chaining.
Browse files Browse the repository at this point in the history
Updates most operators to correctly extend the new Subscriber implementation. Refactors some operators that track inner Subscriptions to extend OuterSubscriber. Updates the test-helper to ensure all tests are run asynchronously, as Jasmine has trouble with the boundaries between synchronous and asynchronous tests.
  • Loading branch information
trxcllnt authored and benlesh committed Dec 29, 2015
1 parent 5390fbc commit 981da85
Show file tree
Hide file tree
Showing 25 changed files with 433 additions and 672 deletions.
38 changes: 32 additions & 6 deletions spec/helpers/test-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,23 @@ var glit = global.it;

global.it = function (description, cb, timeout) {
if (cb.length === 0) {
glit(description, function () {
glit(description, function (done) {
global.rxTestScheduler = new Rx.TestScheduler(assertDeepEqual);
cb();
global.rxTestScheduler.flush();
var error;
var errorHappened = false;
try {
cb();
global.rxTestScheduler.flush();
} catch (e) {
errorHappened = true;
error = e;
} finally {
if (errorHappened) {
setTimeout(function () { done.fail(error); });
} else {
setTimeout(function () { done(); });
}
}
});
} else {
glit.apply(this, arguments);
Expand All @@ -39,10 +52,23 @@ var glfit = global.fit;

global.fit = function (description, cb, timeout) {
if (cb.length === 0) {
glfit(description, function () {
glfit(description, function (done) {
global.rxTestScheduler = new Rx.TestScheduler(assertDeepEqual);
cb();
global.rxTestScheduler.flush();
var error;
var errorHappened = false;
try {
cb();
global.rxTestScheduler.flush();
} catch (e) {
errorHappened = true;
error = e;
} finally {
if (errorHappened) {
setTimeout(function () { done.fail(error); });
} else {
setTimeout(function () { done(); });
}
}
});
} else {
glfit.apply(this, arguments);
Expand Down
41 changes: 6 additions & 35 deletions src/operator/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Observable} from '../Observable';

import {OuterSubscriber} from '../OuterSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

/**
* buffers the incoming observable values until the passed `closingNotifier` emits a value, at which point
* it emits the buffer on the returned observable and starts a new buffer internally, awaiting the
Expand All @@ -24,53 +27,21 @@ class BufferOperator<T, R> implements Operator<T, R> {
}
}

class BufferSubscriber<T> extends Subscriber<T> {
class BufferSubscriber<T, R> extends OuterSubscriber<T, R> {
private buffer: T[] = [];
private notifierSubscriber: BufferClosingNotifierSubscriber<any> = null;

constructor(destination: Subscriber<T>, closingNotifier: Observable<any>) {
super(destination);
this.notifierSubscriber = new BufferClosingNotifierSubscriber(this);
this.add(closingNotifier._subscribe(this.notifierSubscriber));
this.add(subscribeToResult(this, closingNotifier));
}

_next(value: T) {
this.buffer.push(value);
}

_error(err: any) {
this.destination.error(err);
}

_complete() {
this.destination.complete();
}

flushBuffer() {
notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
const buffer = this.buffer;
this.buffer = [];
this.destination.next(buffer);

if (this.isUnsubscribed) {
this.notifierSubscriber.unsubscribe();
}
}
}

class BufferClosingNotifierSubscriber<T> extends Subscriber<T> {
constructor(private parent: BufferSubscriber<any>) {
super(null);
}

_next(value: T) {
this.parent.flushBuffer();
}

_error(err: any) {
this.parent.error(err);
}

_complete() {
this.parent.complete();
}
}
6 changes: 1 addition & 5 deletions src/operator/bufferCount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ class BufferCountSubscriber<T> extends Subscriber<T> {
}
}

_error(err: any) {
this.destination.error(err);
}

_complete() {
const destination = this.destination;
const buffers = this.buffers;
Expand All @@ -72,6 +68,6 @@ class BufferCountSubscriber<T> extends Subscriber<T> {
destination.next(buffer);
}
}
destination.complete();
super._complete();
}
}
12 changes: 8 additions & 4 deletions src/operator/bufferTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,19 @@ class BufferTimeSubscriber<T> extends Subscriber<T> {

_error(err) {
this.buffers.length = 0;
this.destination.error(err);
super._error(err);
}

_complete() {
const buffers = this.buffers;
const { buffers, destination } = this;
while (buffers.length > 0) {
this.destination.next(buffers.shift());
destination.next(buffers.shift());
}
this.destination.complete();
super._complete();
}

_unsubscribe() {
this.buffers = null;
}

openBuffer(): T[] {
Expand Down
8 changes: 4 additions & 4 deletions src/operator/bufferToggle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {
private openings: Observable<O>,
private closingSelector: (openValue: O) => Observable<any>) {
super(destination);
this.add(this.openings._subscribe(new BufferToggleOpeningsSubscriber(this)));
this.add(this.openings.subscribe(new BufferToggleOpeningsSubscriber(this)));
}

_next(value: T) {
Expand All @@ -61,7 +61,7 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {
context.subscription = null;
}
this.contexts = null;
this.destination.error(err);
super._error(err);
}

_complete() {
Expand All @@ -74,7 +74,7 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {
context.subscription = null;
}
this.contexts = null;
this.destination.complete();
super._complete();
}

openBuffer(value: O) {
Expand All @@ -91,7 +91,7 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {
};
contexts.push(context);
const subscriber = new BufferToggleClosingsSubscriber(this, context);
const subscription = closingNotifier._subscribe(subscriber);
const subscription = closingNotifier.subscribe(subscriber);
context.subscription.add(subscription);
this.add(subscription);
}
Expand Down
80 changes: 43 additions & 37 deletions src/operator/bufferWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import {Subscription} from '../Subscription';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

import {OuterSubscriber} from '../OuterSubscriber';
import {subscribeToResult} from '../util/subscribeToResult';

/**
* Opens a buffer immediately, then closes the buffer when the observable returned by calling `closingSelector` emits a value.
* It that immediately opens a new buffer and repeats the process
Expand All @@ -25,9 +28,10 @@ class BufferWhenOperator<T, R> implements Operator<T, R> {
}
}

class BufferWhenSubscriber<T> extends Subscriber<T> {
class BufferWhenSubscriber<T, R> extends OuterSubscriber<T, R> {
private buffer: T[];
private closingNotification: Subscription;
private subscribing: boolean = false;
private closingSubscription: Subscription;

constructor(destination: Subscriber<T>, private closingSelector: () => Observable<any>) {
super(destination);
Expand All @@ -38,56 +42,58 @@ class BufferWhenSubscriber<T> extends Subscriber<T> {
this.buffer.push(value);
}

_error(err: any) {
this.buffer = null;
this.destination.error(err);
}

_complete() {
const buffer = this.buffer;
this.destination.next(buffer);
if (buffer) {
this.destination.next(buffer);
}
super._complete();
}

_unsubscribe() {
this.buffer = null;
this.destination.complete();
this.subscribing = false;
}

notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
this.openBuffer();
}

notifyComplete(): void {
if (this.subscribing) {
this.complete();
} else {
this.openBuffer();
}
}

openBuffer() {
const prevClosingNotification = this.closingNotification;
if (prevClosingNotification) {
this.remove(prevClosingNotification);
prevClosingNotification.unsubscribe();

let { closingSubscription } = this;

if (closingSubscription) {
this.remove(closingSubscription);
closingSubscription.unsubscribe();
}

const buffer = this.buffer;
if (buffer) {
if (this.buffer) {
this.destination.next(buffer);
}

this.buffer = [];

let closingNotifier = tryCatch(this.closingSelector)();
const closingNotifier = tryCatch(this.closingSelector)();

if (closingNotifier === errorObject) {
const err = closingNotifier.e;
this.buffer = null;
this.destination.error(err);
this.error(errorObject.e);
} else {
this.add(this.closingNotification = closingNotifier._subscribe(new BufferClosingNotifierSubscriber(this)));
closingSubscription = new Subscription();
this.closingSubscription = closingSubscription;
this.add(closingSubscription);
this.subscribing = true;
closingSubscription.add(subscribeToResult(this, closingNotifier));
this.subscribing = false;
}
}
}

class BufferClosingNotifierSubscriber<T> extends Subscriber<T> {
constructor(private parent: BufferWhenSubscriber<any>) {
super(null);
}

_next() {
this.parent.openBuffer();
}

_error(err) {
this.parent.error(err);
}

_complete() {
this.parent.openBuffer();
}
}
48 changes: 17 additions & 31 deletions src/operator/catch.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
import {Observable} from '../Observable';
import {Subscription} from '../Subscription';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

Expand All @@ -14,10 +13,9 @@ import {errorObject} from '../util/errorObject';
* catch `selector` function.
*/
export function _catch<T>(selector: (err: any, caught: Observable<any>) => Observable<any>): Observable<T> {
let catchOperator = new CatchOperator(selector);
let caught = this.lift(catchOperator);
catchOperator.caught = caught;
return caught;
const operator = new CatchOperator(selector);
const caught = this.lift(operator);
return (operator.caught = caught);
}

class CatchOperator<T, R> implements Operator<T, R> {
Expand All @@ -26,42 +24,30 @@ class CatchOperator<T, R> implements Operator<T, R> {
constructor(private selector: (err: any, caught: Observable<any>) => Observable<any>) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
call(subscriber: Subscriber<R>): Subscriber<T> {
return new CatchSubscriber(subscriber, this.selector, this.caught);
}
}

class CatchSubscriber<T> extends Subscriber<T> {
private lastSubscription: Subscription;

constructor(public destination: Subscriber<T>,
constructor(destination: Subscriber<any>,
private selector: (err: any, caught: Observable<any>) => Observable<any>,
private caught: Observable<any>) {
super(null);
this.lastSubscription = this;
this.destination.add(this);
super(destination);
}

_next(value: T) {
this.destination.next(value);
}

_error(err) {
const result = tryCatch(this.selector)(err, this.caught);
if (result === errorObject) {
this.destination.error(errorObject.e);
} else {
this.lastSubscription.unsubscribe();
this.lastSubscription = result.subscribe(this.destination);
error(err) {
if (!this.isStopped) {
const result = tryCatch(this.selector)(err, this.caught);
if (result === errorObject) {
super.error(errorObject.e);
} else {
const { destination } = this;
this.unsubscribe();
(<any> destination).remove(this);
result.subscribe(this.destination);
}
}
}

_complete() {
this.lastSubscription.unsubscribe();
this.destination.complete();
}

_unsubscribe() {
this.lastSubscription.unsubscribe();
}
}
Loading

0 comments on commit 981da85

Please sign in to comment.