Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Reenable XFlatMapTest.maybeSingle, add missing Single operators #6893

Merged
merged 1 commit into from
Jan 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -3653,7 +3653,7 @@ public final <R> Maybe<R> flatMap(@NonNull Function<? super T, ? extends MaybeSo
}

/**
* Maps the {@code onSuccess}, {@code onError} or {@code onComplete} signals of this {@code Maybe} into {@link MaybeSource} and emits that
* Maps the {@code onSuccess}, {@code onError} or {@code onComplete} signals of the current {@code Maybe} into a {@link MaybeSource} and emits that
* {@code MaybeSource}'s signals.
* <p>
* <img width="640" height="354" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMap.mmm.png" alt="">
Expand Down Expand Up @@ -3691,7 +3691,7 @@ public final <R> Maybe<R> flatMap(
* Returns a {@code Maybe} that emits the results of a specified function to the pair of values emitted by the
* current {@code Maybe} and a specified mapped {@link MaybeSource}.
* <p>
* <img width="640" height="390" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMap.r.png" alt="">
* <img width="640" height="268" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMap.combiner.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
Expand Down
66 changes: 66 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -3196,6 +3196,72 @@ public final <R> Single<R> flatMap(@NonNull Function<? super T, ? extends Single
return RxJavaPlugins.onAssembly(new SingleFlatMap<>(this, mapper));
}

/**
* Returns a {@code Single} that emits the results of a specified function to the pair of values emitted by the
* current {@code Single} and a specified mapped {@link SingleSource}.
* <p>
* <img width="640" height="268" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMap.combiner.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <U>
* the type of items emitted by the {@code SingleSource} returned by the {@code mapper} function
* @param <R>
* the type of items emitted by the resulting {@code Single}
* @param mapper
* a function that returns a {@code SingleSource} for the item emitted by the current {@code Single}
* @param combiner
* a function that combines one item emitted by each of the source and collection {@code SingleSource} and
* returns an item to be emitted by the resulting {@code SingleSource}
* @return the new {@code Single} instance
* @throws NullPointerException if {@code mapper} or {@code combiner} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <U, R> Single<R> flatMap(@NonNull Function<? super T, ? extends SingleSource<? extends U>> mapper,
@NonNull BiFunction<? super T, ? super U, ? extends R> combiner) {
Objects.requireNonNull(mapper, "mapper is null");
Objects.requireNonNull(combiner, "combiner is null");
return RxJavaPlugins.onAssembly(new SingleFlatMapBiSelector<>(this, mapper, combiner));
}

/**
* Maps the {@code onSuccess} or {@code onError} signals of the current {@code Single} into a {@link SingleSource} and emits that
* {@code SingleSource}'s signals.
* <p>
* <img width="640" height="449" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMap.notification.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R>
* the result type
* @param onSuccessMapper
* a function that returns a {@code SingleSource} to merge for the {@code onSuccess} item emitted by this {@code Single}
* @param onErrorMapper
* a function that returns a {@code SingleSource} to merge for an {@code onError} notification from this {@code Single}
* @return the new {@code Single} instance
* @throws NullPointerException if {@code onSuccessMapper} or {@code onErrorMapper} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Single<R> flatMap(
@NonNull Function<? super T, ? extends SingleSource<? extends R>> onSuccessMapper,
@NonNull Function<? super Throwable, ? extends SingleSource<? extends R>> onErrorMapper) {
Objects.requireNonNull(onSuccessMapper, "onSuccessMapper is null");
Objects.requireNonNull(onErrorMapper, "onErrorMapper is null");
return RxJavaPlugins.onAssembly(new SingleFlatMapNotification<>(this, onSuccessMapper, onErrorMapper));
}

/**
* Returns a {@link Maybe} that is based on applying a specified function to the item emitted by the current {@code Single},
* where that function returns a {@link MaybeSource}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ public void onSuccess(T value) {
return;
}

source.subscribe(new InnerObserver());
if (!isDisposed()) {
source.subscribe(new InnerObserver());
}
}

@Override
Expand All @@ -124,7 +126,9 @@ public void onError(Throwable e) {
return;
}

source.subscribe(new InnerObserver());
if (!isDisposed()) {
source.subscribe(new InnerObserver());
}
}

@Override
Expand All @@ -139,7 +143,9 @@ public void onComplete() {
return;
}

source.subscribe(new InnerObserver());
if (!isDisposed()) {
source.subscribe(new InnerObserver());
}
}

final class InnerObserver implements MaybeObserver<R> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ public void onSuccess(T value) {
return;
}

ss.subscribe(new FlatMapSingleObserver<R>(this, downstream));
if (!isDisposed()) {
ss.subscribe(new FlatMapSingleObserver<R>(this, downstream));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ public void onSuccess(T t) {
return;
}

o.subscribe(this);
if (!isDisposed()) {
o.subscribe(this);
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ public void onSuccess(T t) {
return;
}

p.subscribe(this);
if (get() != SubscriptionHelper.CANCELLED) {
p.subscribe(this);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ public void onSuccess(T t) {
return;
}

o.subscribe(this);
if (!isDisposed()) {
o.subscribe(this);
}
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.internal.operators.single;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;

/**
* Maps a source item to another SingleSource then calls a BiFunction with the
* original item and the secondary item to generate the final result.
*
* @param <T> the main value type
* @param <U> the second value type
* @param <R> the result value type
* @since 3.0.0
*/
public final class SingleFlatMapBiSelector<T, U, R> extends Single<R> {

final SingleSource<T> source;

final Function<? super T, ? extends SingleSource<? extends U>> mapper;

final BiFunction<? super T, ? super U, ? extends R> resultSelector;

public SingleFlatMapBiSelector(SingleSource<T> source,
Function<? super T, ? extends SingleSource<? extends U>> mapper,
BiFunction<? super T, ? super U, ? extends R> resultSelector) {
this.source = source;
this.mapper = mapper;
this.resultSelector = resultSelector;
}

@Override
protected void subscribeActual(SingleObserver<? super R> observer) {
source.subscribe(new FlatMapBiMainObserver<T, U, R>(observer, mapper, resultSelector));
}

static final class FlatMapBiMainObserver<T, U, R>
implements SingleObserver<T>, Disposable {

final Function<? super T, ? extends SingleSource<? extends U>> mapper;

final InnerObserver<T, U, R> inner;

FlatMapBiMainObserver(SingleObserver<? super R> actual,
Function<? super T, ? extends SingleSource<? extends U>> mapper,
BiFunction<? super T, ? super U, ? extends R> resultSelector) {
this.inner = new InnerObserver<>(actual, resultSelector);
this.mapper = mapper;
}

@Override
public void dispose() {
DisposableHelper.dispose(inner);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(inner.get());
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(inner, d)) {
inner.downstream.onSubscribe(this);
}
}

@Override
public void onSuccess(T value) {
SingleSource<? extends U> next;

try {
next = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null MaybeSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
inner.downstream.onError(ex);
return;
}

if (DisposableHelper.replace(inner, null)) {
inner.value = value;
next.subscribe(inner);
}
}

@Override
public void onError(Throwable e) {
inner.downstream.onError(e);
}

static final class InnerObserver<T, U, R>
extends AtomicReference<Disposable>
implements SingleObserver<U> {

private static final long serialVersionUID = -2897979525538174559L;

final SingleObserver<? super R> downstream;

final BiFunction<? super T, ? super U, ? extends R> resultSelector;

T value;

InnerObserver(SingleObserver<? super R> actual,
BiFunction<? super T, ? super U, ? extends R> resultSelector) {
this.downstream = actual;
this.resultSelector = resultSelector;
}

@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this, d);
}

@Override
public void onSuccess(U value) {
T t = this.value;
this.value = null;

R r;

try {
r = Objects.requireNonNull(resultSelector.apply(t, value), "The resultSelector returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}

downstream.onSuccess(r);
}

@Override
public void onError(Throwable e) {
downstream.onError(e);
}
}
}
}
Loading