Skip to content

Commit

Permalink
feat(join): add join operator
Browse files Browse the repository at this point in the history
  • Loading branch information
luisgabriel authored and kwonoj committed Feb 21, 2016
1 parent 15a450c commit 4e5a39c
Show file tree
Hide file tree
Showing 8 changed files with 531 additions and 1 deletion.
1 change: 0 additions & 1 deletion MIGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ enabling "composite" subscription behavior.
|`groupJoin`|`-`|
|`includes`|`-`|
|`indexOf`|`-`|
|`join`|`-`|
|`jortSort`|`-`|
|`jortSortUntil`|`-`|
|`lastIndexOf`|`-`|
Expand Down
345 changes: 345 additions & 0 deletions spec/operators/join-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,345 @@
/* globals describe, it, expect, hot, cold, expectObservable, expectSubscriptions */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

var TimeInterval = function (value, interval) {
this.value = value;
this.interval = interval;
};

var ti = function (v, i) {
return new TimeInterval(v, i);
};

var duration = function (x) {
return Observable.timer(x.interval, undefined, rxTestScheduler);
};

var concat = function (x, y) {
return x.value + y.value;
};

describe('Observable.prototype.join()', function () {
it('should work for synced sources and duration', function () {
var xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') };
var ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') };

var x = hot('--a--b--c--|', xs);
var xsubs = '^ !';
xs.a.interval = time( '| ');
xs.b.interval = time( '| ');
xs.c.interval = time( '| ');

var y = hot('--d--e--f--|', ys);
var ysubs = '^ !';
ys.d.interval = time( '| ');
ys.e.interval = time( '| ');
ys.f.interval = time( '| ');

var expected = '--g--h--i--|';
var values = { g: 'first0second0', h: 'first1second1', i: 'first2second2' };

var r = x.join(y, duration, duration, concat);
expectObservable(r).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should work if left emits a single value with full duration', function () {
var xs = { a: ti('first0') };
var ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') };

var x = hot('--a--------|', xs);
var xsubs = '^ !';
xs.a.interval = time('-----------|');

var y = hot('--d--e--f--|', ys);
var ysubs = '^ !';
ys.d.interval = time( '| ');
ys.e.interval = time( '| ');
ys.f.interval = time( '| ');

var expected = '--g--h--i--|';
var values = { g: 'first0second0', h: 'first0second1', i: 'first0second2' };

var r = x.join(y, duration, duration, concat);
expectObservable(r).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should work if right emits a single value with full duration', function () {
var xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') };
var ys = { d: ti('second0') };

var x = hot('--a--b--c--|', xs);
var xsubs = '^ !';
xs.a.interval = time( '| ');
xs.b.interval = time( '| ');
xs.c.interval = time( '| ');

var y = hot('--d--------|', ys);
var ysubs = '^ !';
ys.d.interval = time('-----------|');

var expected = '--g--h--i--|';
var values = { g: 'first0second0', h: 'first1second0', i: 'first2second0' };

var r = x.join(y, duration, duration, concat);
expectObservable(r).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should work when sources and duration are not synced', function () {
var xs = { a: ti(0), b: ti(1), c: ti(2), d: ti(3) };
var ys = { g: ti('hat'), h: ti('bat'), i: ti('wag'), j: ti('pig') };

var x = hot('-a----b----------c-----d-|', xs);
var xsubs = '^ !';
xs.a.interval = time( '---| ');
xs.b.interval = time( '---| ');
xs.c.interval = time( '----| ');
xs.d.interval = time( '-| ');

var y = hot('--gh------i-j------------|', ys);
var ysubs = '^ !';
ys.g.interval = time( '| ');
ys.h.interval = time( '| ');
ys.i.interval = time( '---------| ');
ys.j.interval = time( '---------| ');

var expected = '--tu-------------(wz)----|';
var values = { t: '0hat', u: '0bat', w: '2wag', z: '2pig' };

var r = x.join(y, duration, duration, concat);
expectObservable(r).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should propagate errors from left', function () {
var xs = { a: ti(0) };
var ys = { g: ti('hat'), h: ti('bat') };

var x = hot('-a----#', xs);
var xsubs = '^ !';
xs.a.interval = time( '---| ');

var y = hot('--gh------i-j------------|', ys);
var ysubs = '^ !';
ys.g.interval = time( '| ');
ys.h.interval = time( '| ');

var expected = '--tu--#';
var values = { t: '0hat', u: '0bat' };

var r = x.join(y, duration, duration, concat);
expectObservable(r).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should propagate errors from right', function () {
var xs = { a: ti(0), b: ti(1), c: ti(2), d: ti(3) };
var ys = { g: ti('hat'), h: ti('bat') };

var x = hot('-a----b----------c-----d-|', xs);
var xsubs = '^ !';
xs.a.interval = time( '---| ');
xs.b.interval = time( '---| ');
xs.c.interval = time( '----| ');
xs.d.interval = time( '-| ');

var y = hot('--gh------#', ys);
var ysubs = '^ !';
ys.g.interval = time( '| ');
ys.h.interval = time( '| ');

var expected = '--tu------#';
var values = { t: '0hat', u: '0bat' };

var r = x.join(y, duration, duration, concat);
expectObservable(r).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should propagate errors from left duration selector', function () {
var xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') };
var ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') };

var x = hot('--a--b--c--|', xs);
var xsubs = '^ ! ';
xs.a.interval = time( '| ');
xs.b.interval = time( '| ');
xs.c.interval = time( '| ');

var y = hot('--d--e--f--|', ys);
var ysubs = '^ ! ';
ys.d.interval = time( '| ');
ys.e.interval = time( '| ');
ys.f.interval = time( '| ');

var expected = '--g--#';
var values = { g: 'first0second0' };

var throwError = function (v) {
if (v.value === 'first1') { throw 'error'; }
else { return duration(v); }
};

var r = x.join(y, throwError, duration, concat);
expectObservable(r).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should propagate errors from right duration selector', function () {
var xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') };
var ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') };

var x = hot('--a--b--c--|', xs);
var xsubs = '^ ! ';
xs.a.interval = time( '| ');
xs.b.interval = time( '| ');
xs.c.interval = time( '| ');

var y = hot('--d--e--f--|', ys);
var ysubs = '^ ! ';
ys.d.interval = time( '| ');
ys.e.interval = time( '| ');
ys.f.interval = time( '| ');

var expected = '--g--#';
var values = { g: 'first0second0' };

var throwError = function (v) {
if (v.value === 'second1') { throw 'error'; }
else { return duration(v); }
};

var r = x.join(y, duration, throwError, concat);
expectObservable(r).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should propagate errors from result selector', function () {
var xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') };
var ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') };

var x = hot('--a--b--c--|', xs);
var xsubs = '^ ! ';
xs.a.interval = time( '| ');
xs.b.interval = time( '| ');
xs.c.interval = time( '| ');

var y = hot('--d--e--f--|', ys);
var ysubs = '^ ! ';
ys.d.interval = time( '| ');
ys.e.interval = time( '| ');
ys.f.interval = time( '| ');

var expected = '--g--h--#';
var values = { g: 'first0second0', h: 'first1second1' };

var throwError = function (vx, vy) {
if (vx.value === 'first2') { throw 'error'; }
else { return concat(vx, vy); }
};

var r = x.join(y, duration, duration, throwError);
expectObservable(r).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should work when left is never', function () {
var ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') };

var x = hot('------------');
var xsubs = '^ !';

var y = hot('--d--e--f--|', ys);
var ysubs = '^ !';
ys.d.interval = time( '| ');
ys.e.interval = time( '| ');
ys.f.interval = time( '| ');

var expected = '-----------|';

var calls = 0;
var durationCounted = function (v) {
calls++;
return duration(v);
};

var r = x.join(y, duration, durationCounted, concat)
.do(undefined, undefined, function () {
expect(calls).toBe(3);
});
expectObservable(r).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should work when right is never', function () {
var xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') };

var x = hot('--a--b--c--|', xs);
var xsubs = '^ !';
xs.a.interval = time( '| ');
xs.b.interval = time( '| ');
xs.c.interval = time( '| ');

var y = hot('------------');
var ysubs = '^ !';

var expected = '-----------|';

var calls = 0;
var durationCounted = function (v) {
calls++;
return duration(v);
};

var r = x.join(y, durationCounted, duration, concat)
.do(undefined, undefined, function () {
expect(calls).toBe(3);
});
expectObservable(r).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should not break unsubscription chain when unsubscribed explicitly', function () {
var xs = { a: ti('first0'), b: ti('first1'), c: ti('first2') };
var ys = { d: ti('second0'), e: ti('second1'), f: ti('second2') };

var x = hot('--a--b--c--|', xs);
var unsub = ' ! ';
var xsubs = '^ ! ';
xs.a.interval = time( '| ');
xs.b.interval = time( '| ');
xs.c.interval = time( '| ');

var y = hot('--d--e--f--|', ys);
var ysubs = '^ ! ';
ys.d.interval = time( '| ');
ys.e.interval = time( '| ');
ys.f.interval = time( '| ');

var expected = '--g--h--';
var values = { g: 'first0second0', h: 'first1second1' };

var r = x
.mergeMap(function (v) { return Observable.of(v); })
.join(y, duration, duration, concat)
.mergeMap(function (v) { return Observable.of(v); });

expectObservable(r, unsub).toBe(expected, values);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});
});
3 changes: 3 additions & 0 deletions src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ export interface CoreOperators<T> {
ignoreElements?: () => Observable<T>;
inspect?: (durationSelector: (value: T) => Observable<any> | Promise<any>) => Observable<T>;
inspectTime?: (delay: number, scheduler?: Scheduler) => Observable<T>;
join?: <T2, D1, D2, R>(right: Observable<T2>, leftDurationSelector: (value: T) => Observable<D1>,
rightDurationSelector: (value: T2) => Observable<D2>,
resultSelector: (left: T, right: T2) => R) => Observable<R>;
last?: <R>(predicate?: (value: T, index: number) => boolean,
resultSelector?: (value: T, index: number) => R,
defaultValue?: any) => Observable<T> | Observable<R>;
Expand Down
3 changes: 3 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ export class Observable<T> implements CoreOperators<T> {
ignoreElements: () => Observable<T>;
inspect: (durationSelector: (value: T) => Observable<any> | Promise<any>) => Observable<T>;
inspectTime: (delay: number, scheduler?: Scheduler) => Observable<T>;
join: <T2, D1, D2, R>(right: Observable<T2>, leftDurationSelector: (value: T) => Observable<D1>,
rightDurationSelector: (value: T2) => Observable<D2>,
resultSelector: (left: T, right: T2) => R) => Observable<R>;
last: <R>(predicate?: (value: T, index: number) => boolean,
resultSelector?: (value: T, index: number) => R,
defaultValue?: any) => Observable<T> | Observable<R>;
Expand Down
1 change: 1 addition & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ import './add/operator/ignoreElements';
import './add/operator/inspect';
import './add/operator/inspectTime';
import './add/operator/isEmpty';
import './add/operator/join';
import './add/operator/every';
import './add/operator/last';
import './add/operator/let';
Expand Down
1 change: 1 addition & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import './add/operator/groupBy';
import './add/operator/ignoreElements';
import './add/operator/inspect';
import './add/operator/inspectTime';
import './add/operator/join';
import './add/operator/every';
import './add/operator/last';
import './add/operator/let';
Expand Down
6 changes: 6 additions & 0 deletions src/add/operator/join.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import {Observable} from '../../Observable';
import {join} from '../../operator/join';

Observable.prototype.join = join;

export var _void: void;
Loading

0 comments on commit 4e5a39c

Please sign in to comment.