diff --git a/spec/operators/bufferToggle-spec.ts b/spec/operators/bufferToggle-spec.ts
index 4a82000915..e1f6edbcfc 100644
--- a/spec/operators/bufferToggle-spec.ts
+++ b/spec/operators/bufferToggle-spec.ts
@@ -1,5 +1,6 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
+import {DoneSignature} from '../helpers/test-helper';
declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions};
const Observable = Rx.Observable;
@@ -342,4 +343,42 @@ describe('Observable.prototype.bufferToggle', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});
+
+ it('should accept closing selector that returns a resolved promise', (done: DoneSignature) => {
+ const e1 = Observable.concat(Observable.of(1),
+ Observable.timer(10).mapTo(2),
+ Observable.timer(10).mapTo(3),
+ Observable.timer(100).mapTo(4)
+ );
+ const expected = [[1]];
+
+ e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any) => { resolve(42); }))
+ .subscribe((x) => {
+ expect(x).toEqual(expected.shift()); },
+ done.fail,
+ () => {
+ expect(expected.length).toBe(0);
+ done();
+ });
+ });
+
+ it('should accept closing selector that returns a rejected promise', (done: DoneSignature) => {
+ const e1 = Observable.concat(Observable.of(1),
+ Observable.timer(10).mapTo(2),
+ Observable.timer(10).mapTo(3),
+ Observable.timer(100).mapTo(4)
+ );
+
+ const expected = 42;
+
+ e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any, reject: any) => { reject(expected); }))
+ .subscribe((x) => {
+ done.fail();
+ }, (x) => {
+ expect(x).toBe(expected);
+ done();
+ }, () => {
+ done.fail();
+ });
+ });
});
\ No newline at end of file
diff --git a/src/operator/bufferToggle.ts b/src/operator/bufferToggle.ts
index 7b359af451..374f3ed483 100644
--- a/src/operator/bufferToggle.ts
+++ b/src/operator/bufferToggle.ts
@@ -1,9 +1,11 @@
import {Operator} from '../Operator';
import {Subscriber} from '../Subscriber';
-import {Observable} from '../Observable';
+import {Observable, SubscribableOrPromise} from '../Observable';
import {Subscription} from '../Subscription';
-import {tryCatch} from '../util/tryCatch';
-import {errorObject} from '../util/errorObject';
+
+import {subscribeToResult} from '../util/subscribeToResult';
+import {OuterSubscriber} from '../OuterSubscriber';
+import {InnerSubscriber} from '../InnerSubscriber';
/**
* Buffers the source Observable values starting from an emission from
@@ -17,7 +19,7 @@ import {errorObject} from '../util/errorObject';
*
* Buffers values from the source by opening the buffer via signals from an
* Observable provided to `openings`, and closing and sending the buffers when
- * an Observable returned by the `closingSelector` function emits.
+ * a Subscribable or Promise returned by the `closingSelector` function emits.
*
* @example
Every other second, emit the click events from the next 500ms
* var clicks = Rx.Observable.fromEvent(document, 'click');
@@ -36,7 +38,7 @@ import {errorObject} from '../util/errorObject';
* @param {Observable} openings An observable of notifications to start new
* buffers.
* @param {function(value: O): Observable} closingSelector A function that takes
- * the value emitted by the `openings` observable and returns an Observable,
+ * the value emitted by the `openings` observable and returns a Subscribable or Promise,
* which, when it emits, signals that the associated buffer should be emitted
* and cleared.
* @return {Observable} An observable of arrays of buffered values.
@@ -44,18 +46,18 @@ import {errorObject} from '../util/errorObject';
* @owner Observable
*/
export function bufferToggle(openings: Observable,
- closingSelector: (value: O) => Observable): Observable {
+ closingSelector: (value: O) => SubscribableOrPromise | void): Observable {
return this.lift(new BufferToggleOperator(openings, closingSelector));
}
export interface BufferToggleSignature {
- (openings: Observable, closingSelector: (value: O) => Observable): Observable;
+ (openings: Observable, closingSelector: (value: O) => SubscribableOrPromise | void): Observable;
}
class BufferToggleOperator implements Operator {
constructor(private openings: Observable,
- private closingSelector: (value: O) => Observable) {
+ private closingSelector: (value: O) => SubscribableOrPromise | void) {
}
call(subscriber: Subscriber, source: any): any {
@@ -73,17 +75,17 @@ interface BufferContext {
* @ignore
* @extends {Ignored}
*/
-class BufferToggleSubscriber extends Subscriber {
+class BufferToggleSubscriber extends OuterSubscriber {
private contexts: Array> = [];
constructor(destination: Subscriber,
private openings: Observable,
- private closingSelector: (value: O) => Observable) {
+ private closingSelector: (value: O) => SubscribableOrPromise | void) {
super(destination);
this.add(this.openings.subscribe(new BufferToggleOpeningsSubscriber(this)));
}
- protected _next(value: T) {
+ protected _next(value: T): void {
const contexts = this.contexts;
const len = contexts.length;
for (let i = 0; i < len; i++) {
@@ -91,7 +93,7 @@ class BufferToggleSubscriber extends Subscriber {
}
}
- protected _error(err: any) {
+ protected _error(err: any): void {
const contexts = this.contexts;
while (contexts.length > 0) {
const context = contexts.shift();
@@ -103,7 +105,7 @@ class BufferToggleSubscriber extends Subscriber {
super._error(err);
}
- protected _complete() {
+ protected _complete(): void {
const contexts = this.contexts;
while (contexts.length > 0) {
const context = contexts.shift();
@@ -116,27 +118,29 @@ class BufferToggleSubscriber extends Subscriber {
super._complete();
}
- openBuffer(value: O) {
- const closingSelector = this.closingSelector;
- const contexts = this.contexts;
-
- let closingNotifier = tryCatch(closingSelector)(value);
- if (closingNotifier === errorObject) {
- this._error(errorObject.e);
- } else {
- let context = {
- buffer: [],
- subscription: new Subscription()
- };
- contexts.push(context);
- const subscriber = new BufferToggleClosingsSubscriber(this, context);
- const subscription = closingNotifier.subscribe(subscriber);
- context.subscription.add(subscription);
- this.add(subscription);
+ openBuffer(value: O): void {
+ try {
+ const closingSelector = this.closingSelector;
+ const closingNotifier = closingSelector.call(this, value);
+ if (closingNotifier) {
+ this.trySubscribe(closingNotifier);
+ }
+ } catch (err) {
+ this._error(err);
}
}
- closeBuffer(context: BufferContext) {
+ notifyNext(outerValue: any, innerValue: O,
+ outerIndex: number, innerIndex: number,
+ innerSub: InnerSubscriber): void {
+ this.closeBuffer(outerValue);
+ }
+
+ notifyComplete(innerSub: InnerSubscriber): void {
+ this.closeBuffer(( innerSub).context);
+ }
+
+ private closeBuffer(context: BufferContext): void {
const contexts = this.contexts;
if (contexts === null) {
return;
@@ -147,28 +151,20 @@ class BufferToggleSubscriber extends Subscriber {
this.remove(subscription);
subscription.unsubscribe();
}
-}
-/**
- * We need this JSDoc comment for affecting ESDoc.
- * @ignore
- * @extends {Ignored}
- */
-class BufferToggleOpeningsSubscriber extends Subscriber {
- constructor(private parent: BufferToggleSubscriber) {
- super(null);
- }
+ private trySubscribe(closingNotifier: any): void {
+ const contexts = this.contexts;
- protected _next(value: O) {
- this.parent.openBuffer(value);
- }
+ const buffer: Array = [];
+ const subscription = new Subscription();
+ const context = { buffer, subscription };
+ contexts.push(context);
- protected _error(err: any) {
- this.parent.error(err);
- }
+ const innerSubscription = subscribeToResult(this, closingNotifier, context);
+ ( innerSubscription).context = context;
- protected _complete() {
- // noop
+ this.add(innerSubscription);
+ subscription.add(innerSubscription);
}
}
@@ -177,14 +173,13 @@ class BufferToggleOpeningsSubscriber extends Subscriber {
* @ignore
* @extends {Ignored}
*/
-class BufferToggleClosingsSubscriber extends Subscriber {
- constructor(private parent: BufferToggleSubscriber,
- private context: { subscription: any, buffer: T[] }) {
+class BufferToggleOpeningsSubscriber extends Subscriber {
+ constructor(private parent: BufferToggleSubscriber) {
super(null);
}
- protected _next() {
- this.parent.closeBuffer(this.context);
+ protected _next(value: O) {
+ this.parent.openBuffer(value);
}
protected _error(err: any) {
@@ -192,6 +187,6 @@ class BufferToggleClosingsSubscriber extends Subscriber {
}
protected _complete() {
- this.parent.closeBuffer(this.context);
+ // noop
}
}