Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(join): add join operator #1371

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion MIGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ enabling "composite" subscription behavior.
|`groupJoin`|`-`|
|`includes`|`-`|
|`indexOf`|`-`|
|`join`|`-`|
|`jortSort`|`-`|
|`jortSortUntil`|`-`|
|`lastIndexOf`|`-`|
Expand Down
353 changes: 353 additions & 0 deletions spec/operators/join-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,353 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx.KitchenSink';
declare const {hot, cold, time, asDiagram, expectObservable, expectSubscriptions};

declare const rxTestScheduler: Rx.TestScheduler;
const Observable = Rx.Observable;

interface TimeInterval {
value: any;
interval: number;
}

function ti(v: any, i: number = 0): TimeInterval {
return { value: v, interval: i };
};

function duration(x: TimeInterval): Rx.Observable<number> {
return Observable.timer(x.interval, undefined, rxTestScheduler);
};

function concat(x: TimeInterval, y: TimeInterval): any {
return x.value + y.value;
};

describe('Observable.prototype.join()', () => {
it('should work for synced sources and duration', () => {
const xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') };
const ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') };

let x = hot('--a--b--c--|', xs);
const xsubs = '^ !';
xs.a.interval = time( '| ');
xs.b.interval = time( '| ');
xs.c.interval = time( '| ');

let y = hot('--d--e--f--|', ys);
const ysubs = '^ !';
ys.d.interval = time( '| ');
ys.e.interval = time( '| ');
ys.f.interval = time( '| ');

const expected = '--g--h--i--|';
const values = { g: 'first0second0', h: 'first1second1', i: 'first2second2' };

const r = x.join(y, duration, duration, concat);
expectObservable(r).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should work if left emits a single value with full duration', () => {
const xs = { a: ti('first0') };
const ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') };

let x = hot('--a--------|', xs);
const xsubs = '^ !';
xs.a.interval = time('-----------|');

let y = hot('--d--e--f--|', ys);
const ysubs = '^ !';
ys.d.interval = time( '| ');
ys.e.interval = time( '| ');
ys.f.interval = time( '| ');

const expected = '--g--h--i--|';
const values = { g: 'first0second0', h: 'first0second1', i: 'first0second2' };

const r = x.join(y, duration, duration, concat);
expectObservable(r).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should work if right emits a single value with full duration', () => {
const xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') };
const ys = { d: ti('second0') };

let x = hot('--a--b--c--|', xs);
const xsubs = '^ !';
xs.a.interval = time( '| ');
xs.b.interval = time( '| ');
xs.c.interval = time( '| ');

let y = hot('--d--------|', ys);
const ysubs = '^ !';
ys.d.interval = time('-----------|');

const expected = '--g--h--i--|';
const values = { g: 'first0second0', h: 'first1second0', i: 'first2second0' };

const r = x.join(y, duration, duration, concat);
expectObservable(r).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should work when sources and duration are not synced', () => {
const xs = { a: ti(0), b: ti(1), c: ti(2), d: ti(3) };
const ys = { g: ti('hat'), h: ti('bat'), i: ti('wag'), j: ti('pig') };

let x = hot('-a----b----------c-----d-|', xs);
const xsubs = '^ !';
xs.a.interval = time( '---| ');
xs.b.interval = time( '---| ');
xs.c.interval = time( '----| ');
xs.d.interval = time( '-| ');

let y = hot('--gh------i-j------------|', ys);
const ysubs = '^ !';
ys.g.interval = time( '| ');
ys.h.interval = time( '| ');
ys.i.interval = time( '---------| ');
ys.j.interval = time( '---------| ');

const expected = '--tu-------------(wz)----|';
const values = { t: '0hat', u: '0bat', w: '2wag', z: '2pig' };

const r = x.join(y, duration, duration, concat);
expectObservable(r).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should propagate errors from left', () => {
const xs = { a: ti(0) };
const ys = { g: ti('hat'), h: ti('bat') };

let x = hot('-a----#', xs);
const xsubs = '^ !';
xs.a.interval = time( '---| ');

let y = hot('--gh------i-j------------|', ys);
const ysubs = '^ !';
ys.g.interval = time( '| ');
ys.h.interval = time( '| ');

const expected = '--tu--#';
const values = { t: '0hat', u: '0bat' };

const r = x.join(y, duration, duration, concat);
expectObservable(r).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should propagate errors from right', () => {
const xs = { a: ti(0), b: ti(1), c: ti(2), d: ti(3) };
const ys = { g: ti('hat'), h: ti('bat') };

let x = hot('-a----b----------c-----d-|', xs);
const xsubs = '^ !';
xs.a.interval = time( '---| ');
xs.b.interval = time( '---| ');
xs.c.interval = time( '----| ');
xs.d.interval = time( '-| ');

let y = hot('--gh------#', ys);
const ysubs = '^ !';
ys.g.interval = time( '| ');
ys.h.interval = time( '| ');

const expected = '--tu------#';
const values = { t: '0hat', u: '0bat' };

const r = x.join(y, duration, duration, concat);
expectObservable(r).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should propagate errors from left duration selector', () => {
const xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') };
const ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') };

let x = hot('--a--b--c--|', xs);
const xsubs = '^ ! ';
xs.a.interval = time( '| ');
xs.b.interval = time( '| ');
xs.c.interval = time( '| ');

let y = hot('--d--e--f--|', ys);
const ysubs = '^ ! ';
ys.d.interval = time( '| ');
ys.e.interval = time( '| ');
ys.f.interval = time( '| ');

const expected = '--g--#';
const values = { g: 'first0second0' };

const throwError = (v: TimeInterval) => {
if (v.value === 'first1') {
throw 'error';
} else {
return duration(v);
}
};

const r = x.join(y, throwError, duration, concat);
expectObservable(r).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should propagate errors from right duration selector', () => {
const xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') };
const ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') };

let x = hot('--a--b--c--|', xs);
const xsubs = '^ ! ';
xs.a.interval = time( '| ');
xs.b.interval = time( '| ');
xs.c.interval = time( '| ');

let y = hot('--d--e--f--|', ys);
const ysubs = '^ ! ';
ys.d.interval = time( '| ');
ys.e.interval = time( '| ');
ys.f.interval = time( '| ');

const expected = '--g--#';
const values = { g: 'first0second0' };

const throwError = (v: TimeInterval) => {
if (v.value === 'second1') {
throw 'error';
} else {
return duration(v);
}
};

const r = x.join(y, duration, throwError, concat);
expectObservable(r).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should propagate errors from result selector', () => {
const xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') };
const ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') };

let x = hot('--a--b--c--|', xs);
const xsubs = '^ ! ';
xs.a.interval = time( '| ');
xs.b.interval = time( '| ');
xs.c.interval = time( '| ');

let y = hot('--d--e--f--|', ys);
const ysubs = '^ ! ';
ys.d.interval = time( '| ');
ys.e.interval = time( '| ');
ys.f.interval = time( '| ');

const expected = '--g--h--#';
const values = { g: 'first0second0', h: 'first1second1' };

const throwError = (vx: TimeInterval, vy: TimeInterval) => {
if (vx.value === 'first2') {
throw 'error';
} else {
return concat(vx, vy);
}
};

const r = x.join(y, duration, duration, throwError);
expectObservable(r).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should work when left is never', () => {
const ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') };

let x = hot('------------');
const xsubs = '^ !';

let y = hot('--d--e--f--|', ys);
const ysubs = '^ !';
ys.d.interval = time( '| ');
ys.e.interval = time( '| ');
ys.f.interval = time( '| ');

const expected = '-----------|';

let calls = 0;
const durationCounted = (v: TimeInterval) => {
calls++;
return duration(v);
};

const r = x.join(y, duration, durationCounted, concat)
.do(undefined, undefined, () => expect(calls).to.equal(3));
expectObservable(r).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should work when right is never', () => {
const xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') };

let x = hot('--a--b--c--|', xs);
const xsubs = '^ !';
xs.a.interval = time( '| ');
xs.b.interval = time( '| ');
xs.c.interval = time( '| ');

let y = hot('------------');
const ysubs = '^ !';

const expected = '-----------|';

let calls = 0;
const durationCounted = (v: TimeInterval) => {
calls++;
return duration(v);
};

const r = x.join(y, durationCounted, duration, concat)
.do(undefined, undefined, () => expect(calls).to.equal(3));
expectObservable(r).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should not break unsubscription chain when unsubscribed explicitly', () => {
const xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') };
const ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') };

let x = hot('--a--b--c--|', xs);
const unsub = ' ! ';
const xsubs = '^ ! ';
xs.a.interval = time( '| ');
xs.b.interval = time( '| ');
xs.c.interval = time( '| ');

let y = hot('--d--e--f--|', ys);
const ysubs = '^ ! ';
ys.d.interval = time( '| ');
ys.e.interval = time( '| ');
ys.f.interval = time( '| ');

const expected = '--g--h--';
const values = { g: 'first0second0', h: 'first1second1' };

const r = x
.mergeMap((v: TimeInterval) => Observable.of(v))
.join(y, duration, duration, concat)
.mergeMap((v: TimeInterval) => Observable.of(v));

expectObservable(r, unsub).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});
});
2 changes: 1 addition & 1 deletion src/InnerSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {OuterSubscriber} from './OuterSubscriber';
export class InnerSubscriber<T, R> extends Subscriber<R> {
private index: number = 0;

constructor(private parent: OuterSubscriber<T, R>, private outerValue: T, private outerIndex: number) {
constructor(private parent: OuterSubscriber<T, R>, private outerValue: T, public outerIndex: number) {
super();
}

Expand Down
1 change: 1 addition & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import './add/operator/exhaustMap';
import './add/operator/find';
import './add/operator/findIndex';
import './add/operator/isEmpty';
import './add/operator/join';
import './add/operator/max';
import './add/operator/mergeScan';
import './add/operator/min';
Expand Down
1 change: 1 addition & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import './add/operator/finally';
import './add/operator/first';
import './add/operator/groupBy';
import './add/operator/ignoreElements';
import './add/operator/join';
import './add/operator/audit';
import './add/operator/auditTime';
import './add/operator/last';
Expand Down
10 changes: 10 additions & 0 deletions src/add/operator/join.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import {Observable} from '../../Observable';
import {join, JoinSignature} from '../../operator/join';

Observable.prototype.join = join;

declare module '../../Observable' {
interface Observable<T> {
join: JoinSignature<T>;
}
}
Loading