Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(mergeScan): No longer emits state again upon completion. #5805

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions spec/operators/mergeScan-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,11 @@ describe('mergeScan', () => {
it('should handle an empty projected Observable', () => {
const e1 = hot('--a--^--b--c--d--e--f--g--|');
const e1subs = '^ !';
const expected = '---------------------(x|)';
const expected = '---------------------|';

const values = { x: <string[]>[] };

const source = e1.pipe(mergeScan((acc, x) => EMPTY, []));
const source = e1.pipe(mergeScan(() => EMPTY, []));

expectObservable(source).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
Expand All @@ -223,15 +223,11 @@ describe('mergeScan', () => {
it('handle empty', () => {
const e1 = cold('|');
const e1subs = '(^!)';
const expected = '(u|)';

const values = {
u: <string[]>[]
};
const expected = '|';

const source = e1.pipe(mergeScan((acc, x) => of(acc.concat(x)), [] as string[]));

expectObservable(source).toBe(expected, values);
expectObservable(source).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

Expand Down Expand Up @@ -309,7 +305,7 @@ describe('mergeScan', () => {
it('should emit accumulator if inner completes without value', () => {
const e1 = hot('--a--^--b--c--d--e--f--g--|');
const e1subs = '^ !';
const expected = '---------------------(x|)';
const expected = '---------------------|';

const source = e1.pipe(mergeScan((acc, x) => EMPTY, ['1']));

Expand All @@ -320,13 +316,14 @@ describe('mergeScan', () => {
it('should emit accumulator if inner completes without value after source completes', () => {
const e1 = hot('--a--^--b--c--d--e--f--g--|');
const e1subs = '^ !';
const expected = '---------------------(x|)';
const expected = '-----------------------|';
const inner = cold( '-----|');

const source = e1.pipe(
mergeScan((acc, x) => EMPTY.pipe(delay(50, rxTestScheduler)), ['1'])
mergeScan(() => inner, '1')
);

expectObservable(source).toBe(expected, {x: ['1']});
expectObservable(source).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

Expand Down
3 changes: 1 addition & 2 deletions src/internal/operators/expand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ export function expand<T, R>(
project,
concurrent,

// These unused handlers are for `xScan`-type operators
undefined,
// onBeforeNext
undefined,

// Expand-specific
Expand Down
9 changes: 3 additions & 6 deletions src/internal/operators/mergeInternals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import { OperatorSubscriber } from './OperatorSubscriber';
* @param project The projection function to get our inner sources
* @param concurrent The number of concurrent inner subscriptions
* @param onBeforeNext Additional logic to apply before nexting to our consumer
* @param onBeforeComplete Additional logic to apply before telling the consumer
* we're complete.
* @param expand If `true` this will perform an "expand" strategy, which differs only
* in that it recurses, and the inner subscription must be schedule-able.
* @param innerSubScheduler A scheduler to use to schedule inner subscriptions,
Expand All @@ -26,9 +24,9 @@ export function mergeInternals<T, R>(
project: (value: T, index: number) => ObservableInput<R>,
concurrent: number,
onBeforeNext?: (innerValue: R) => void,
onBeforeComplete?: () => void,
expand?: boolean,
innerSubScheduler?: SchedulerLike
innerSubScheduler?: SchedulerLike,
additionalTeardown?: () => void
) {
// Buffered values, in the event of going over our concurrency limit
let buffer: T[] = [];
Expand All @@ -47,8 +45,6 @@ export function mergeInternals<T, R>(
// and we don't have any active inner subscriptions, then we can
// Emit the state and complete.
if (isComplete && !buffer.length && !active) {
// In the case of `mergeScan`, we need additional handling here.
onBeforeComplete?.();
subscriber.complete();
}
};
Expand Down Expand Up @@ -129,5 +125,6 @@ export function mergeInternals<T, R>(
return () => {
// Ensure buffered values are released.
buffer = null!;
additionalTeardown?.();
};
}
4 changes: 1 addition & 3 deletions src/internal/operators/mergeMap.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/** @prettier */
import { Observable } from '../Observable';
import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types';
import { map } from './map';
import { innerFrom } from '../observable/from';
Expand Down Expand Up @@ -87,8 +86,7 @@ export function mergeMap<T, R, O extends ObservableInput<any>>(
): OperatorFunction<T, ObservedValueOf<O> | R> {
if (isFunction(resultSelector)) {
// DEPRECATED PATH
return (source: Observable<T>) =>
source.pipe(mergeMap((a, i) => innerFrom(project(a, i)).pipe(map((b: any, ii: number) => resultSelector(a, b, i, ii))), concurrent));
return mergeMap((a, i) => map((b: any, ii: number) => resultSelector(a, b, i, ii))(innerFrom(project(a, i))), concurrent);
} else if (typeof resultSelector === 'number') {
concurrent = resultSelector;
}
Expand Down
8 changes: 3 additions & 5 deletions src/internal/operators/mergeScan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ export function mergeScan<T, R>(
concurrent = Infinity
): OperatorFunction<T, R> {
return operate((source, subscriber) => {
// Whether or not we have gotten any accumulated state. This is used to
// decide whether or not to emit in the event of an empty result.
let hasState = false;
// The accumulated state.
let state = seed;

Expand All @@ -62,10 +59,11 @@ export function mergeScan<T, R>(
(value, index) => accumulator(state, value, index),
concurrent,
(value) => {
hasState = true;
state = value;
},
() => !hasState && subscriber.next(state)
false,
undefined,
() => (state = null!)
);
});
}