diff --git a/MIGRATION.md b/MIGRATION.md index cb50e88ba1..c37c919530 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -82,7 +82,6 @@ enabling "composite" subscription behavior. |`groupJoin`|`-`| |`includes`|`-`| |`indexOf`|`-`| -|`join`|`-`| |`jortSort`|`-`| |`jortSortUntil`|`-`| |`lastIndexOf`|`-`| diff --git a/spec/operators/join-spec.ts b/spec/operators/join-spec.ts new file mode 100644 index 0000000000..e332162063 --- /dev/null +++ b/spec/operators/join-spec.ts @@ -0,0 +1,353 @@ +import {expect} from 'chai'; +import * as Rx from '../../dist/cjs/Rx.KitchenSink'; +declare const {hot, cold, time, asDiagram, expectObservable, expectSubscriptions}; + +declare const rxTestScheduler: Rx.TestScheduler; +const Observable = Rx.Observable; + +interface TimeInterval { + value: any; + interval: number; +} + +function ti(v: any, i: number = 0): TimeInterval { + return { value: v, interval: i }; +}; + +function duration(x: TimeInterval): Rx.Observable { + return Observable.timer(x.interval, undefined, rxTestScheduler); +}; + +function concat(x: TimeInterval, y: TimeInterval): any { + return x.value + y.value; +}; + +describe('Observable.prototype.join()', () => { + it('should work for synced sources and duration', () => { + const xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') }; + const ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') }; + + let x = hot('--a--b--c--|', xs); + const xsubs = '^ !'; + xs.a.interval = time( '| '); + xs.b.interval = time( '| '); + xs.c.interval = time( '| '); + + let y = hot('--d--e--f--|', ys); + const ysubs = '^ !'; + ys.d.interval = time( '| '); + ys.e.interval = time( '| '); + ys.f.interval = time( '| '); + + const expected = '--g--h--i--|'; + const values = { g: 'first0second0', h: 'first1second1', i: 'first2second2' }; + + const r = x.join(y, duration, duration, concat); + expectObservable(r).toBe(expected, values); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should work if left emits a single value with full duration', () => { + const xs = { a: ti('first0') }; + const ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') }; + + let x = hot('--a--------|', xs); + const xsubs = '^ !'; + xs.a.interval = time('-----------|'); + + let y = hot('--d--e--f--|', ys); + const ysubs = '^ !'; + ys.d.interval = time( '| '); + ys.e.interval = time( '| '); + ys.f.interval = time( '| '); + + const expected = '--g--h--i--|'; + const values = { g: 'first0second0', h: 'first0second1', i: 'first0second2' }; + + const r = x.join(y, duration, duration, concat); + expectObservable(r).toBe(expected, values); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should work if right emits a single value with full duration', () => { + const xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') }; + const ys = { d: ti('second0') }; + + let x = hot('--a--b--c--|', xs); + const xsubs = '^ !'; + xs.a.interval = time( '| '); + xs.b.interval = time( '| '); + xs.c.interval = time( '| '); + + let y = hot('--d--------|', ys); + const ysubs = '^ !'; + ys.d.interval = time('-----------|'); + + const expected = '--g--h--i--|'; + const values = { g: 'first0second0', h: 'first1second0', i: 'first2second0' }; + + const r = x.join(y, duration, duration, concat); + expectObservable(r).toBe(expected, values); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should work when sources and duration are not synced', () => { + const xs = { a: ti(0), b: ti(1), c: ti(2), d: ti(3) }; + const ys = { g: ti('hat'), h: ti('bat'), i: ti('wag'), j: ti('pig') }; + + let x = hot('-a----b----------c-----d-|', xs); + const xsubs = '^ !'; + xs.a.interval = time( '---| '); + xs.b.interval = time( '---| '); + xs.c.interval = time( '----| '); + xs.d.interval = time( '-| '); + + let y = hot('--gh------i-j------------|', ys); + const ysubs = '^ !'; + ys.g.interval = time( '| '); + ys.h.interval = time( '| '); + ys.i.interval = time( '---------| '); + ys.j.interval = time( '---------| '); + + const expected = '--tu-------------(wz)----|'; + const values = { t: '0hat', u: '0bat', w: '2wag', z: '2pig' }; + + const r = x.join(y, duration, duration, concat); + expectObservable(r).toBe(expected, values); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should propagate errors from left', () => { + const xs = { a: ti(0) }; + const ys = { g: ti('hat'), h: ti('bat') }; + + let x = hot('-a----#', xs); + const xsubs = '^ !'; + xs.a.interval = time( '---| '); + + let y = hot('--gh------i-j------------|', ys); + const ysubs = '^ !'; + ys.g.interval = time( '| '); + ys.h.interval = time( '| '); + + const expected = '--tu--#'; + const values = { t: '0hat', u: '0bat' }; + + const r = x.join(y, duration, duration, concat); + expectObservable(r).toBe(expected, values); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should propagate errors from right', () => { + const xs = { a: ti(0), b: ti(1), c: ti(2), d: ti(3) }; + const ys = { g: ti('hat'), h: ti('bat') }; + + let x = hot('-a----b----------c-----d-|', xs); + const xsubs = '^ !'; + xs.a.interval = time( '---| '); + xs.b.interval = time( '---| '); + xs.c.interval = time( '----| '); + xs.d.interval = time( '-| '); + + let y = hot('--gh------#', ys); + const ysubs = '^ !'; + ys.g.interval = time( '| '); + ys.h.interval = time( '| '); + + const expected = '--tu------#'; + const values = { t: '0hat', u: '0bat' }; + + const r = x.join(y, duration, duration, concat); + expectObservable(r).toBe(expected, values); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should propagate errors from left duration selector', () => { + const xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') }; + const ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') }; + + let x = hot('--a--b--c--|', xs); + const xsubs = '^ ! '; + xs.a.interval = time( '| '); + xs.b.interval = time( '| '); + xs.c.interval = time( '| '); + + let y = hot('--d--e--f--|', ys); + const ysubs = '^ ! '; + ys.d.interval = time( '| '); + ys.e.interval = time( '| '); + ys.f.interval = time( '| '); + + const expected = '--g--#'; + const values = { g: 'first0second0' }; + + const throwError = (v: TimeInterval) => { + if (v.value === 'first1') { + throw 'error'; + } else { + return duration(v); + } + }; + + const r = x.join(y, throwError, duration, concat); + expectObservable(r).toBe(expected, values); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should propagate errors from right duration selector', () => { + const xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') }; + const ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') }; + + let x = hot('--a--b--c--|', xs); + const xsubs = '^ ! '; + xs.a.interval = time( '| '); + xs.b.interval = time( '| '); + xs.c.interval = time( '| '); + + let y = hot('--d--e--f--|', ys); + const ysubs = '^ ! '; + ys.d.interval = time( '| '); + ys.e.interval = time( '| '); + ys.f.interval = time( '| '); + + const expected = '--g--#'; + const values = { g: 'first0second0' }; + + const throwError = (v: TimeInterval) => { + if (v.value === 'second1') { + throw 'error'; + } else { + return duration(v); + } + }; + + const r = x.join(y, duration, throwError, concat); + expectObservable(r).toBe(expected, values); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should propagate errors from result selector', () => { + const xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') }; + const ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') }; + + let x = hot('--a--b--c--|', xs); + const xsubs = '^ ! '; + xs.a.interval = time( '| '); + xs.b.interval = time( '| '); + xs.c.interval = time( '| '); + + let y = hot('--d--e--f--|', ys); + const ysubs = '^ ! '; + ys.d.interval = time( '| '); + ys.e.interval = time( '| '); + ys.f.interval = time( '| '); + + const expected = '--g--h--#'; + const values = { g: 'first0second0', h: 'first1second1' }; + + const throwError = (vx: TimeInterval, vy: TimeInterval) => { + if (vx.value === 'first2') { + throw 'error'; + } else { + return concat(vx, vy); + } + }; + + const r = x.join(y, duration, duration, throwError); + expectObservable(r).toBe(expected, values); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should work when left is never', () => { + const ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') }; + + let x = hot('------------'); + const xsubs = '^ !'; + + let y = hot('--d--e--f--|', ys); + const ysubs = '^ !'; + ys.d.interval = time( '| '); + ys.e.interval = time( '| '); + ys.f.interval = time( '| '); + + const expected = '-----------|'; + + let calls = 0; + const durationCounted = (v: TimeInterval) => { + calls++; + return duration(v); + }; + + const r = x.join(y, duration, durationCounted, concat) + .do(undefined, undefined, () => expect(calls).to.equal(3)); + expectObservable(r).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should work when right is never', () => { + const xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') }; + + let x = hot('--a--b--c--|', xs); + const xsubs = '^ !'; + xs.a.interval = time( '| '); + xs.b.interval = time( '| '); + xs.c.interval = time( '| '); + + let y = hot('------------'); + const ysubs = '^ !'; + + const expected = '-----------|'; + + let calls = 0; + const durationCounted = (v: TimeInterval) => { + calls++; + return duration(v); + }; + + const r = x.join(y, durationCounted, duration, concat) + .do(undefined, undefined, () => expect(calls).to.equal(3)); + expectObservable(r).toBe(expected); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); + + it('should not break unsubscription chain when unsubscribed explicitly', () => { + const xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') }; + const ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') }; + + let x = hot('--a--b--c--|', xs); + const unsub = ' ! '; + const xsubs = '^ ! '; + xs.a.interval = time( '| '); + xs.b.interval = time( '| '); + xs.c.interval = time( '| '); + + let y = hot('--d--e--f--|', ys); + const ysubs = '^ ! '; + ys.d.interval = time( '| '); + ys.e.interval = time( '| '); + ys.f.interval = time( '| '); + + const expected = '--g--h--'; + const values = { g: 'first0second0', h: 'first1second1' }; + + const r = x + .mergeMap((v: TimeInterval) => Observable.of(v)) + .join(y, duration, duration, concat) + .mergeMap((v: TimeInterval) => Observable.of(v)); + + expectObservable(r, unsub).toBe(expected, values); + expectSubscriptions(x.subscriptions).toBe(xsubs); + expectSubscriptions(y.subscriptions).toBe(ysubs); + }); +}); diff --git a/src/InnerSubscriber.ts b/src/InnerSubscriber.ts index cabe90907e..030eb962fb 100644 --- a/src/InnerSubscriber.ts +++ b/src/InnerSubscriber.ts @@ -9,7 +9,7 @@ import {OuterSubscriber} from './OuterSubscriber'; export class InnerSubscriber extends Subscriber { private index: number = 0; - constructor(private parent: OuterSubscriber, private outerValue: T, private outerIndex: number) { + constructor(private parent: OuterSubscriber, private outerValue: T, public outerIndex: number) { super(); } diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 2583aca57a..206489790c 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -15,6 +15,7 @@ import './add/operator/exhaustMap'; import './add/operator/find'; import './add/operator/findIndex'; import './add/operator/isEmpty'; +import './add/operator/join'; import './add/operator/max'; import './add/operator/mergeScan'; import './add/operator/min'; diff --git a/src/Rx.ts b/src/Rx.ts index 73e55132b3..9b57d9da68 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -58,6 +58,7 @@ import './add/operator/finally'; import './add/operator/first'; import './add/operator/groupBy'; import './add/operator/ignoreElements'; +import './add/operator/join'; import './add/operator/audit'; import './add/operator/auditTime'; import './add/operator/last'; diff --git a/src/add/operator/join.ts b/src/add/operator/join.ts new file mode 100644 index 0000000000..75d205ddb7 --- /dev/null +++ b/src/add/operator/join.ts @@ -0,0 +1,10 @@ +import {Observable} from '../../Observable'; +import {join, JoinSignature} from '../../operator/join'; + +Observable.prototype.join = join; + +declare module '../../Observable' { + interface Observable { + join: JoinSignature; + } +} diff --git a/src/operator/join.ts b/src/operator/join.ts new file mode 100644 index 0000000000..7f8525fe8c --- /dev/null +++ b/src/operator/join.ts @@ -0,0 +1,186 @@ +import {Observable} from '../Observable'; +import {ArrayObservable} from '../observable/ArrayObservable'; +import {Operator} from '../Operator'; +import {Subscriber} from '../Subscriber'; +import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; +import {subscribeToResult} from '../util/subscribeToResult'; +import {take} from './take'; + +/** + * Returns an Observable that combines the items emitted by two Observables, and selects which items to combine + * based on duration-windows that you define on a per-item basis via the durtionSelector functions. You implement + * these windows as Observables whose lifespans begin with each item emitted by either Observable. When such a + * window-defining Observable either emits an item or completes, the window for the item it is associated with + * closes. So long as an item's window is open, it will combine with any item emitted by the other Observable. + * `resultSelector` is the function you define to combine the items. + * + * @param {Observable} right the right observable sequence to join elements for + * @param {Function} leftDurationSelector a function to select the duration (expressed as an observable sequence) + * of each element of the left observable sequence, used to determine overlap + * @param {Function} rightDurationSelector a function to select the duration (expressed as an observable sequence) + * of each element of the left observable sequence, used to determine overlap + * @param {Function} resultSelector a function invoked to compute a result element for any two overlapping elements + * of the left and right observable sequences. + * @returns {Observable} an observable sequence that contains result elements computed from source elements that + * have an overlapping duration + */ +export function join(right: Observable, + leftDurationSelector: (value: T1) => Observable, + rightDurationSelector: (value: T2) => Observable, + resultSelector: (left: T1, right: T2) => R): Observable { + const o = new ArrayObservable([this, right]); + return o.lift(new JoinOperator(leftDurationSelector, rightDurationSelector, resultSelector)); +} + +export interface JoinSignature { + (right: Observable, leftDurationSelector: (value: T) => Observable, + rightDurationSelector: (value: T2) => Observable, + resultSelector: (left: T, right: T2) => R): Observable; +} + +class JoinOperator implements Operator { + constructor(private leftDurationSelector: (value: T1) => Observable, + private rightDurationSelector: (value: T2) => Observable, + private resultSelector: (left: T1, right: T2) => R) { + } + + call(subscriber: Subscriber): Subscriber { + return new JoinSubscriber(subscriber, this.leftDurationSelector, this.rightDurationSelector, this.resultSelector); + } +} + +class Context { + public map: Map = new Map(); + public done: boolean = false; + + constructor(public idCounter: number, public source: Observable) { + } + + get empty(): boolean { + return this.map.size === 0; + } + + add(value: T): number { + const id = this.idCounter; + this.idCounter += 2; + this.map.set(id, value); + return id; + } + + remove(id: number): void { + this.map.delete(id); + } +} + +class JoinSubscriber extends OuterSubscriber { + private left: Context; + private right: Context; + + constructor(destination: Subscriber, + private leftDurationSelector: (value: T1) => Observable, + private rightDurationSelector: (value: T2) => Observable, + private resultSelector: (left: T1, right: T2) => R) { + super(destination); + } + + protected _next(value: Observable): void { + if (this.left) { + this.right = new Context(3, >value); + } else { + this.left = new Context(2, >value); + } + } + + protected _complete(): void { + this.add(subscribeToResult(this, this.left.source, null, 0)); + this.add(subscribeToResult(this, this.right.source, null, 1)); + } + + notifyNext(outerValue: any, innerValue: any, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + if (outerIndex === 0) { + this.leftNext(innerValue); + } else if (outerIndex === 1) { + this.rightNext(innerValue); + } + } + + private leftNext(value: T1): void { + const id = this.left.add(value); + const success = this.tryDurationSelector(this.leftDurationSelector, value, id); + if (success) { + this.right.map.forEach((v: T2) => { + this.tryResultSelector(value, v); + }); + } + } + + private rightNext(value: T2): void { + const id = this.right.add(value); + const success = this.tryDurationSelector(this.rightDurationSelector, value, id); + if (success) { + this.left.map.forEach((v: T1) => { + this.tryResultSelector(v, value); + }); + } + } + + private tryDurationSelector(durationSelector: (value: T1 | T2) => Observable, + value: T1 | T2, id: number): boolean { + let duration: Observable; + try { + duration = durationSelector(value); + } catch (err) { + this.destination.error(err); + return false; + } + this.subscribeToDuration(duration, id); + return true; + } + + private subscribeToDuration(duration: Observable, id: number): void { + const durationObservable = take.call(duration, 1); + this.add(subscribeToResult(this, durationObservable, null, id)); + } + + private tryResultSelector(leftValue: T1, rightValue: T2): void { + const destination = this.destination; + let result: R; + try { + result = this.resultSelector(leftValue, rightValue); + } catch (err) { + destination.error(err); + return; + } + destination.next(result); + } + + notifyComplete(innerSub: InnerSubscriber): void { + const id = innerSub.outerIndex; + this.remove(innerSub); + if (id === 0) { + this.mainComplete(this.left, this.right); + } else if (id === 1) { + this.mainComplete(this.right, this.left); + } else { + const context = id % 2 === 0 ? this.left : this.right; + this.durationComplete(id, context); + } + } + + private mainComplete(x: Context, y: Context): void { + x.done = true; + if (y.done || x.empty) { + this.destination.complete(); + } + } + + private durationComplete(id: number, context: Context): void { + context.remove(id); + if (context.done && context.empty) { + this.destination.complete(); + } + } +}