Skip to content

Commit

Permalink
Merge pull request #3506 from benlesh/smaller-elementAt
Browse files Browse the repository at this point in the history
Fix elementAt defaultValue, make elementAt impl smaller
  • Loading branch information
benlesh authored Mar 31, 2018
2 parents 3db18d1 + 13706e7 commit 37fcc33
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 46 deletions.
2 changes: 1 addition & 1 deletion compat/operator/elementAt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ import { elementAt as higherOrder } from 'rxjs/operators';
* @owner Observable
*/
export function elementAt<T>(this: Observable<T>, index: number, defaultValue?: T): Observable<T> {
return higherOrder(index, defaultValue)(this);
return higherOrder.apply(undefined, arguments)(this);
}
58 changes: 13 additions & 45 deletions src/internal/operators/elementAt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ import { Subscriber } from '../Subscriber';
import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError';
import { Observable } from '../Observable';
import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
import { filter } from './filter';
import { throwIfEmpty } from './throwIfEmpty';
import { defaultIfEmpty } from './defaultIfEmpty';
import { take } from './take';

/**
* Emits the single value at the specified `index` in a sequence of emissions
Expand Down Expand Up @@ -47,49 +51,13 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
* @owner Observable
*/
export function elementAt<T>(index: number, defaultValue?: T): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => source.lift(new ElementAtOperator(index, defaultValue));
}

class ElementAtOperator<T> implements Operator<T, T> {

constructor(private index: number, private defaultValue?: T) {
if (index < 0) {
throw new ArgumentOutOfRangeError;
}
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new ElementAtSubscriber(subscriber, this.index, this.defaultValue));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class ElementAtSubscriber<T> extends Subscriber<T> {

constructor(destination: Subscriber<T>, private index: number, private defaultValue?: T) {
super(destination);
}

protected _next(x: T) {
if (this.index-- === 0) {
this.destination.next(x);
this.destination.complete();
}
}

protected _complete() {
const destination = this.destination;
if (this.index >= 0) {
if (typeof this.defaultValue !== 'undefined') {
destination.next(this.defaultValue);
} else {
destination.error(new ArgumentOutOfRangeError);
}
}
destination.complete();
}
if (index < 0) { throw new ArgumentOutOfRangeError(); }
const hasDefaultValue = arguments.length >= 2;
return (source: Observable<T>) => source.pipe(
filter((v, i) => i === index),
take(1),
hasDefaultValue
? defaultIfEmpty(defaultValue)
: throwIfEmpty(() => new ArgumentOutOfRangeError()),
);
}

0 comments on commit 37fcc33

Please sign in to comment.