Skip to content

Commit

Permalink
feat(timestamp): add timestamp operator
Browse files Browse the repository at this point in the history
closes #1515
  • Loading branch information
kwonoj committed Mar 27, 2016
1 parent 1631224 commit 80b1646
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 0 deletions.
1 change: 1 addition & 0 deletions doc/operators.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ There are operators for different purposes, and they may be categorized as: crea
- [`observeOn`](../class/es6/Observable.js~Observable.html#instance-method-observeOn)
- [`subscribeOn`](../class/es6/Observable.js~Observable.html#instance-method-subscribeOn)
- [`timeInterval`](../class/es6/Observable.js~Observable.html#instance-method-timeInterval)
- [`timestamp`](../class/es6/Observable.js~Observable.html#instance-method-timestamp)
- [`timeout`](../class/es6/Observable.js~Observable.html#instance-method-timeout)
- [`timeoutWith`](../class/es6/Observable.js~Observable.html#instance-method-timeoutWith)
- [`toArray`](../class/es6/Observable.js~Observable.html#instance-method-toArray)
Expand Down
147 changes: 147 additions & 0 deletions spec/operators/timestamp-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import * as Rx from '../../dist/cjs/Rx.KitchenSink';
declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions};

declare const rxTestScheduler: Rx.TestScheduler;
const Observable = Rx.Observable;

/** @test {timestamp} */
describe('Observable.prototype.timestamp', () => {
asDiagram('timestamp')('should record the time stamp per each source elements', () => {
const e1 = hot('-b-c-----d--e--|');
const e1subs = '^ !';
const expected = '-w-x-----y--z--|';
const expectedValue = { w: 10, x: 30, y: 90, z: 120 };

const result = e1.timestamp(rxTestScheduler)
.map(x => x.timestamp);

expectObservable(result).toBe(expected, expectedValue);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should record stamp if source emit elements', () => {
const e1 = hot('--a--^b--c----d---e--|');
const e1subs = '^ !';
const expected = '-w--x----y---z--|';

const expectedValue = {
w: new Rx.Timestamp('b', 10),
x: new Rx.Timestamp('c', 40),
y: new Rx.Timestamp('d', 90),
z: new Rx.Timestamp('e', 130)
};

expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected, expectedValue);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should completes without record stamp if source does not emits', () => {
const e1 = hot('---------|');
const e1subs = '^ !';
const expected = '---------|';

expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should complete immediately if source is empty', () => {
const e1 = cold('|');
const e1subs = '(^!)';
const expected = '|';

expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should record stamp then does not completes if source emits but not completes', () => {
const e1 = hot('-a--b--');
const e1subs = '^ ';
const expected = '-y--z--';

const expectedValue = {
y: new Rx.Timestamp('a', 10),
z: new Rx.Timestamp('b', 40)
};

expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected, expectedValue);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should allow unsubscribing explicitly and early', () => {
const e1 = hot('-a--b-----c---d---|');
const unsub = ' ! ';
const e1subs = '^ ! ';
const expected = '-y--z--- ';

const expectedValue = {
y: new Rx.Timestamp('a', 10),
z: new Rx.Timestamp('b', 40)
};

const result = e1.timestamp(rxTestScheduler);

expectObservable(result, unsub).toBe(expected, expectedValue);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not break unsubscription chains when result is unsubscribed explicitly', () => {
const e1 = hot('-a--b-----c---d---|');
const e1subs = '^ ! ';
const expected = '-y--z--- ';
const unsub = ' ! ';

const expectedValue = {
y: new Rx.Timestamp('a', 10),
z: new Rx.Timestamp('b', 40)
};

const result = e1
.mergeMap(x => Observable.of(x))
.timestamp(rxTestScheduler)
.mergeMap(x => Observable.of(x));

expectObservable(result, unsub).toBe(expected, expectedValue);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not completes if source never completes', () => {
const e1 = cold('-');
const e1subs = '^';
const expected = '-';

expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('raise error if source raises error', () => {
const e1 = hot('---#');
const e1subs = '^ !';
const expected = '---#';

expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should record stamp then raise error if source raises error after emit', () => {
const e1 = hot('-a--b--#');
const e1subs = '^ !';
const expected = '-y--z--#';

const expectedValue = {
y: new Rx.Timestamp('a', 10),
z: new Rx.Timestamp('b', 40)
};

expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected, expectedValue);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should raise error if source immediately throws', () => {
const e1 = cold('#');
const e1subs = '(^!)';
const expected = '#';

expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});
2 changes: 2 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import './add/operator/mergeScan';
import './add/operator/min';
import './add/operator/pairwise';
import './add/operator/timeInterval';
import './add/operator/timestamp';

export {TimeInterval} from './operator/timeInterval';
export {Timestamp} from './operator/timestamp';
export {TestScheduler} from './testing/TestScheduler';
export {VirtualTimeScheduler} from './scheduler/VirtualTimeScheduler';
10 changes: 10 additions & 0 deletions src/add/operator/timestamp.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import {Observable} from '../../Observable';
import {timestamp, TimestampSignature} from '../../operator/timestamp';

Observable.prototype.timestamp = timestamp;

declare module '../../Observable' {
interface Observable<T> {
timestamp: TimestampSignature<T>;
}
}
45 changes: 45 additions & 0 deletions src/operator/timestamp.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import {Operator} from '../Operator';
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Scheduler} from '../Scheduler';
import {async} from '../scheduler/async';

/**
* @param scheduler
* @return {Observable<Timestamp<any>>|WebSocketSubject<T>|Observable<T>}
* @method timestamp
* @owner Observable
*/
export function timestamp<T>(scheduler: Scheduler = async): Observable<Timestamp<T>> {
return this.lift(new TimestampOperator(scheduler));
}

export interface TimestampSignature<T> {
(scheduler?: Scheduler): Observable<Timestamp<T>>;
}

export class Timestamp<T> {
constructor(public value: T, public timestamp: number) {
}
};

class TimestampOperator<T> implements Operator<T, Timestamp<T>> {
constructor(private scheduler: Scheduler) {
}

call(observer: Subscriber<Timestamp<T>>): Subscriber<T> {
return new TimestampSubscriber(observer, this.scheduler);
}
}

class TimestampSubscriber<T> extends Subscriber<T> {
constructor(destination: Subscriber<Timestamp<T>>, private scheduler: Scheduler) {
super(destination);
}

protected _next(value: T): void {
const now = this.scheduler.now();

this.destination.next(new Timestamp(value, now));
}
}

0 comments on commit 80b1646

Please sign in to comment.