From bff3e63855b523260db69f19d70e7069dccf2047 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 18 Dec 2019 14:44:40 +0100 Subject: [PATCH 1/2] 3.x: [Java 8] Implement mapOptional, collector, first/last/single stage --- .../io/reactivex/rxjava3/core/Flowable.java | 260 +++++++++- .../jdk8/FlowableCollectWithCollector.java | 153 ++++++ .../FlowableCollectWithCollectorSingle.java | 164 ++++++ .../jdk8/FlowableFirstStageSubscriber.java | 59 +++ .../jdk8/FlowableLastStageSubscriber.java | 62 +++ .../internal/jdk8/FlowableMapOptional.java | 179 +++++++ .../jdk8/FlowableSingleStageSubscriber.java | 68 +++ .../jdk8/FlowableStageSubscriber.java | 80 +++ .../jdk8/CollectWithCollectorTckTest.java | 45 ++ .../FlowableCollectWithCollectorTest.java | 420 ++++++++++++++++ .../jdk8/FlowableMapOptionalTest.java | 470 +++++++++++++++++ .../FlowableStageSubscriberOrDefaultTest.java | 476 ++++++++++++++++++ .../FlowableStageSubscriberOrErrorTest.java | 470 +++++++++++++++++ .../internal/jdk8/MapOptionalTckTest.java | 38 ++ .../rxjava3/testsupport/TestHelper.java | 17 + .../ParamValidationCheckerTest.java | 6 + 16 files changed, 2966 insertions(+), 1 deletion(-) create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableCollectWithCollector.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableCollectWithCollectorSingle.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFirstStageSubscriber.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableLastStageSubscriber.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableMapOptional.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableSingleStageSubscriber.java create mode 100644 src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableStageSubscriber.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/CollectWithCollectorTckTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableCollectWithCollectorTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableMapOptionalTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableStageSubscriberOrDefaultTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableStageSubscriberOrErrorTest.java create mode 100644 src/test/java/io/reactivex/rxjava3/internal/jdk8/MapOptionalTckTest.java diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index 514845efbc..4177a2d9e6 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -6930,6 +6930,7 @@ public final Flowable cast(final Class clazz) { * @return a Single that emits the result of collecting the values emitted by the source Publisher * into a single mutable data structure * @see ReactiveX operators documentation: Reduce + * @see #collect(Collector) */ @CheckReturnValue @NonNull @@ -11256,12 +11257,13 @@ public final Flowable lift(FlowableOperator lifte * @return a Flowable that emits the items from the source Publisher, transformed by the specified * function * @see ReactiveX operators documentation: Map + * @see #mapOptional(Function) */ @CheckReturnValue @NonNull @BackpressureSupport(BackpressureKind.PASS_THROUGH) @SchedulerSupport(SchedulerSupport.NONE) - public final Flowable map(Function mapper) { + public final <@NonNull R> Flowable map(@NonNull Function mapper) { Objects.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new FlowableMap(this, mapper)); } @@ -18739,4 +18741,260 @@ public final TestSubscriber test(long initialRequest, boolean cancel) { // No Objects.requireNonNull(stream, "stream is null"); return RxJavaPlugins.onAssembly(new FlowableFromStream<>(stream)); } + + /** + * Maps each upstream value into an {@link Optional} and emits the contained item if not empty. + *

+ * + * + *

+ *
Backpressure:
+ *
The operator is a pass-through for downstream requests but issues {@code request(1)} whenever the + * mapped {@code Optional} is empty.
+ *
Scheduler:
+ *
{@code mapOptional} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the non-null output type + * @param mapper the function that receives the upstream item and should return a non-empty {@code Optional} + * to emit as the output or an empty {@code Optional} to skip to the next upstream value + * @return the new Flowable instance + * @since 3.0.0 + * @see #map(Function) + * @see #filter(Predicate) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final <@NonNull R> Flowable mapOptional(@NonNull Function> mapper) { + Objects.requireNonNull(mapper, "mapper is null"); + return RxJavaPlugins.onAssembly(new FlowableMapOptional<>(this, mapper)); + } + + /** + * Collects the finite upstream's values into a container via a Stream {@link Collector} callback set and emits + * it as the success result. + *

+ * + * + *

+ *
Backpressure:
+ *
The operator consumes the upstream in an unbounded manner.
+ *
Scheduler:
+ *
{@code collect} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the non-null result type + * @param the intermediate container type used for the accumulation + * @param collector the interface defining the container supplier, accumulator and finisher functions; + * see {@link Collectors} for some standard implementations + * @return the new Single instance + * @since 3.0.0 + * @see Collectors + * @see #collect(Supplier, BiConsumer) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final <@NonNull R, A> Single collect(@NonNull Collector collector) { + Objects.requireNonNull(collector, "collector is null"); + return RxJavaPlugins.onAssembly(new FlowableCollectWithCollectorSingle<>(this, collector)); + } + + /** + * Signals the first upstream item (or the default item if the upstream is empty) via + * a {@link CompletionStage}. + *

+ * + *

+ * The upstream can be canceled by converting the resulting {@code CompletionStage} into + * {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and + * calling {@link CompletableFuture#cancel(boolean)} on it. + * The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and + * completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}. + *

+ * {@code CompletionStage}s don't have a notion of emptyness and allow {@code null}s, therefore, one can either use + * a {@code defaultItem} of {@code null} or turn the flow into a sequence of {@link Optional}s and default to {@link Optional#empty()}: + *


+     * CompletionStage<Optional<T>> stage = source.map(Optional::of).firstStage(Optional.empty());
+     * 
+ *
+ *
Backpressure:
+ *
The operator requests one item from upstream and then when received, cancels the upstream.
+ *
Scheduler:
+ *
{@code firstStage} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param defaultItem the item to signal if the upstream is empty + * @return the new CompletionStage instance + * @since 3.0.0 + * @see #firstOrErrorStage() + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final CompletionStage firstStage(@Nullable T defaultItem) { + return subscribeWith(new FlowableFirstStageSubscriber<>(true, defaultItem)); + } + + /** + * Signals the only expected upstream item (or the default item if the upstream is empty) + * or signals {@link IllegalArgumentException} if the upstream has more than one item + * via a {@link CompletionStage}. + *

+ * + *

+ * The upstream can be canceled by converting the resulting {@code CompletionStage} into + * {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and + * calling {@link CompletableFuture#cancel(boolean)} on it. + * The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and + * completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}. + *

+ * {@code CompletionStage}s don't have a notion of emptyness and allow {@code null}s, therefore, one can either use + * a {@code defaultItem} of {@code null} or turn the flow into a sequence of {@link Optional}s and default to {@link Optional#empty()}: + *


+     * CompletionStage<Optional<T>> stage = source.map(Optional::of).singleStage(Optional.empty());
+     * 
+ *
+ *
Backpressure:
+ *
The operator requests two items from upstream and then when more than one item is received, cancels the upstream.
+ *
Scheduler:
+ *
{@code singleStage} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param defaultItem the item to signal if the upstream is empty + * @return the new CompletionStage instance + * @since 3.0.0 + * @see #singleOrErrorStage() + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final CompletionStage singleStage(@Nullable T defaultItem) { + return subscribeWith(new FlowableSingleStageSubscriber<>(true, defaultItem)); + } + + /** + * Signals the last upstream item (or the default item if the upstream is empty) via + * a {@link CompletionStage}. + *

+ * + *

+ * The upstream can be canceled by converting the resulting {@code CompletionStage} into + * {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and + * calling {@link CompletableFuture#cancel(boolean)} on it. + * The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and + * completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}. + *

+ * {@code CompletionStage}s don't have a notion of emptyness and allow {@code null}s, therefore, one can either use + * a {@code defaultItem} of {@code null} or turn the flow into a sequence of {@link Optional}s and default to {@link Optional#empty()}: + *


+     * CompletionStage<Optional<T>> stage = source.map(Optional::of).lastStage(Optional.empty());
+     * 
+ *
+ *
Backpressure:
+ *
The operator requests an unbounded number of items from the upstream.
+ *
Scheduler:
+ *
{@code lastStage} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param defaultItem the item to signal if the upstream is empty + * @return the new CompletionStage instance + * @since 3.0.0 + * @see #lastOrErrorStage() + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final CompletionStage lastStage(@Nullable T defaultItem) { + return subscribeWith(new FlowableLastStageSubscriber<>(true, defaultItem)); + } + + /** + * Signals the first upstream item or a {@link NoSuchElementException} if the upstream is empty via + * a {@link CompletionStage}. + *

+ * + *

+ * The upstream can be canceled by converting the resulting {@code CompletionStage} into + * {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and + * calling {@link CompletableFuture#cancel(boolean)} on it. + * The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and + * completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}. + *

+ *
Backpressure:
+ *
The operator requests one item from upstream and then when received, cancels the upstream.
+ *
Scheduler:
+ *
{@code firstOrErrorStage} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return the new CompletionStage instance + * @since 3.0.0 + * @see #firstStage(Object) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final CompletionStage firstOrErrorStage() { + return subscribeWith(new FlowableFirstStageSubscriber<>(false, null)); + } + + /** + * Signals the only expected upstream item, a {@link NoSuchElementException} if the upstream is empty + * or signals {@link IllegalArgumentException} if the upstream has more than one item + * via a {@link CompletionStage}. + *

+ * + *

+ * The upstream can be canceled by converting the resulting {@code CompletionStage} into + * {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and + * calling {@link CompletableFuture#cancel(boolean)} on it. + * The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and + * completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}. + *

+ *
Backpressure:
+ *
The operator requests two items from upstream and then when more than one item is received, cancels the upstream.
+ *
Scheduler:
+ *
{@code singleOrErrorStage} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return the new CompletionStage instance + * @since 3.0.0 + * @see #singleStage(Object) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final CompletionStage singleOrErrorStage() { + return subscribeWith(new FlowableSingleStageSubscriber<>(false, null)); + } + + /** + * Signals the last upstream item or a {@link NoSuchElementException} if the upstream is empty via + * a {@link CompletionStage}. + *

+ * + *

+ * The upstream can be canceled by converting the resulting {@code CompletionStage} into + * {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and + * calling {@link CompletableFuture#cancel(boolean)} on it. + * The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and + * completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}. + *

+ *
Backpressure:
+ *
The operator requests an unbounded number of items from the upstream.
+ *
Scheduler:
+ *
{@code lastOrErrorStage} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return the new CompletionStage instance + * @since 3.0.0 + * @see #lastStage(Object) + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final CompletionStage lastOrErrorStage() { + return subscribeWith(new FlowableLastStageSubscriber<>(false, null)); + } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableCollectWithCollector.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableCollectWithCollector.java new file mode 100644 index 0000000000..bc408db304 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableCollectWithCollector.java @@ -0,0 +1,153 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.Objects; +import java.util.function.*; +import java.util.stream.Collector; + +import org.reactivestreams.*; + +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.internal.subscriptions.*; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Collect items into a container defined by a Stream {@link Collector} callback set. + * + * @param the upstream value type + * @param
the intermediate accumulator type + * @param the result type + * @since 3.0.0 + */ +public final class FlowableCollectWithCollector extends Flowable { + + final Flowable source; + + final Collector collector; + + public FlowableCollectWithCollector(Flowable source, Collector collector) { + this.source = source; + this.collector = collector; + } + + @Override + protected void subscribeActual(@NonNull Subscriber s) { + A container; + BiConsumer accumulator; + Function finisher; + + try { + container = collector.supplier().get(); + accumulator = collector.accumulator(); + finisher = collector.finisher(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + EmptySubscription.error(ex, s); + return; + } + + source.subscribe(new CollectorSubscriber<>(s, container, accumulator, finisher)); + } + + static final class CollectorSubscriber + extends DeferredScalarSubscription + implements FlowableSubscriber { + + private static final long serialVersionUID = -229544830565448758L; + + final BiConsumer accumulator; + + final Function finisher; + + Subscription upstream; + + boolean done; + + A container; + + CollectorSubscriber(Subscriber downstream, A container, BiConsumer accumulator, Function finisher) { + super(downstream); + this.container = container; + this.accumulator = accumulator; + this.finisher = finisher; + } + + @Override + public void onSubscribe(@NonNull Subscription s) { + if (SubscriptionHelper.validate(this.upstream, s)) { + this.upstream = s; + + downstream.onSubscribe(this); + + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + try { + accumulator.accept(container, t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.cancel(); + onError(ex); + } + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + } else { + done = true; + upstream = SubscriptionHelper.CANCELLED; + this.container = null; + downstream.onError(t); + } + } + + @Override + public void onComplete() { + if (done) { + return; + } + + done = true; + upstream = SubscriptionHelper.CANCELLED; + A container = this.container; + this.container = null; + R result; + try { + result = Objects.requireNonNull(finisher.apply(container), "The finisher returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + return; + } + + complete(result); + } + + @Override + public void cancel() { + super.cancel(); + upstream.cancel(); + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableCollectWithCollectorSingle.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableCollectWithCollectorSingle.java new file mode 100644 index 0000000000..134de68415 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableCollectWithCollectorSingle.java @@ -0,0 +1,164 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.Objects; +import java.util.function.*; +import java.util.stream.Collector; + +import org.reactivestreams.Subscription; + +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.internal.disposables.EmptyDisposable; +import io.reactivex.rxjava3.internal.fuseable.FuseToFlowable; +import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Collect items into a container defined by a Stream {@link Collector} callback set. + * + * @param the upstream value type + * @param the intermediate accumulator type + * @param the result type + * @since 3.0.0 + */ +public final class FlowableCollectWithCollectorSingle extends Single implements FuseToFlowable { + + final Flowable source; + + final Collector collector; + + public FlowableCollectWithCollectorSingle(Flowable source, Collector collector) { + this.source = source; + this.collector = collector; + } + + @Override + public Flowable fuseToFlowable() { + return new FlowableCollectWithCollector<>(source, collector); + } + + @Override + protected void subscribeActual(@NonNull SingleObserver observer) { + A container; + BiConsumer accumulator; + Function finisher; + + try { + container = collector.supplier().get(); + accumulator = collector.accumulator(); + finisher = collector.finisher(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + EmptyDisposable.error(ex, observer); + return; + } + + source.subscribe(new CollectorSingleObserver<>(observer, container, accumulator, finisher)); + } + + static final class CollectorSingleObserver implements FlowableSubscriber, Disposable { + + final SingleObserver downstream; + + final BiConsumer accumulator; + + final Function finisher; + + Subscription upstream; + + boolean done; + + A container; + + CollectorSingleObserver(SingleObserver downstream, A container, BiConsumer accumulator, Function finisher) { + this.downstream = downstream; + this.container = container; + this.accumulator = accumulator; + this.finisher = finisher; + } + + @Override + public void onSubscribe(@NonNull Subscription s) { + if (SubscriptionHelper.validate(this.upstream, s)) { + this.upstream = s; + + downstream.onSubscribe(this); + + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(T t) { + if (done) { + return; + } + try { + accumulator.accept(container, t); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.cancel(); + onError(ex); + } + } + + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + } else { + done = true; + upstream = SubscriptionHelper.CANCELLED; + this.container = null; + downstream.onError(t); + } + } + + @Override + public void onComplete() { + if (done) { + return; + } + + done = true; + upstream = SubscriptionHelper.CANCELLED; + A container = this.container; + this.container = null; + R result; + try { + result = Objects.requireNonNull(finisher.apply(container), "The finisher returned a null value"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + return; + } + + downstream.onSuccess(result); + } + + @Override + public void dispose() { + upstream.cancel(); + upstream = SubscriptionHelper.CANCELLED; + } + + @Override + public boolean isDisposed() { + return upstream == SubscriptionHelper.CANCELLED; + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFirstStageSubscriber.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFirstStageSubscriber.java new file mode 100644 index 0000000000..5af6cc5aac --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFirstStageSubscriber.java @@ -0,0 +1,59 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.NoSuchElementException; + +import org.reactivestreams.Subscription; + +/** + * Signals the first element of the source via the underlying CompletableFuture, + * signals the a default item if the upstream is empty or signals {@link NoSuchElementException}. + * + * @param the element type + * @since 3.0.0 + */ +public final class FlowableFirstStageSubscriber extends FlowableStageSubscriber { + + final boolean hasDefault; + + final T defaultItem; + + public FlowableFirstStageSubscriber(boolean hasDefault, T defaultItem) { + this.hasDefault = hasDefault; + this.defaultItem = defaultItem; + } + + @Override + public void onNext(T t) { + complete(t); + } + + @Override + public void onComplete() { + if (!isDone()) { + clear(); + if (hasDefault) { + complete(defaultItem); + } else { + completeExceptionally(new NoSuchElementException()); + } + } + } + + @Override + protected void afterSubscribe(Subscription s) { + s.request(1); + } + +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableLastStageSubscriber.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableLastStageSubscriber.java new file mode 100644 index 0000000000..c9a93eeb0e --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableLastStageSubscriber.java @@ -0,0 +1,62 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.NoSuchElementException; + +import org.reactivestreams.Subscription; + +/** + * Signals the last element of the source via the underlying CompletableFuture, + * signals the a default item if the upstream is empty or signals {@link NoSuchElementException}. + * + * @param the element type + * @since 3.0.0 + */ +public final class FlowableLastStageSubscriber extends FlowableStageSubscriber { + + final boolean hasDefault; + + final T defaultItem; + + public FlowableLastStageSubscriber(boolean hasDefault, T defaultItem) { + this.hasDefault = hasDefault; + this.defaultItem = defaultItem; + } + + @Override + public void onNext(T t) { + value = t; + } + + @Override + public void onComplete() { + if (!isDone()) { + T v = value; + clear(); + if (v != null) { + complete(v); + } else if (hasDefault) { + complete(defaultItem); + } else { + completeExceptionally(new NoSuchElementException()); + } + } + } + + @Override + protected void afterSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableMapOptional.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableMapOptional.java new file mode 100644 index 0000000000..3fdcd40a4f --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableMapOptional.java @@ -0,0 +1,179 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.*; + +import org.reactivestreams.Subscriber; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber; +import io.reactivex.rxjava3.internal.subscribers.*; + +/** + * Map the upstream values into an Optional and emit its value if any. + * @param the upstream element type + * @param the output element type + * @since 3.0.0 + */ +public final class FlowableMapOptional extends Flowable { + + final Flowable source; + + final Function> mapper; + + public FlowableMapOptional(Flowable source, Function> mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + protected void subscribeActual(Subscriber s) { + if (s instanceof ConditionalSubscriber) { + source.subscribe(new MapOptionalConditionalSubscriber<>((ConditionalSubscriber)s, mapper)); + } else { + source.subscribe(new MapOptionalSubscriber<>(s, mapper)); + } + } + + static final class MapOptionalSubscriber extends BasicFuseableSubscriber + implements ConditionalSubscriber { + + final Function> mapper; + + MapOptionalSubscriber(Subscriber downstream, Function> mapper) { + super(downstream); + this.mapper = mapper; + } + + @Override + public void onNext(T t) { + if (!tryOnNext(t)) { + upstream.request(1); + } + } + + @Override + public boolean tryOnNext(T t) { + if (done) { + return true; + } + + if (sourceMode != NONE) { + downstream.onNext(null); + return true; + } + + Optional result; + try { + result = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Optional"); + } catch (Throwable ex) { + fail(ex); + return true; + } + + if (result.isPresent()) { + downstream.onNext(result.get()); + return true; + } + return false; + } + + @Override + public int requestFusion(int mode) { + return transitiveBoundaryFusion(mode); + } + + @Override + public R poll() throws Throwable { + for (;;) { + T item = qs.poll(); + if (item == null) { + return null; + } + Optional result = Objects.requireNonNull(mapper.apply(item), "The mapper returned a null Optional"); + if (result.isPresent()) { + return result.get(); + } + if (sourceMode == ASYNC) { + qs.request(1); + } + } + } + } + + static final class MapOptionalConditionalSubscriber extends BasicFuseableConditionalSubscriber { + + final Function> mapper; + + MapOptionalConditionalSubscriber(ConditionalSubscriber downstream, Function> mapper) { + super(downstream); + this.mapper = mapper; + } + + @Override + public void onNext(T t) { + if (!tryOnNext(t)) { + upstream.request(1); + } + } + + @Override + public boolean tryOnNext(T t) { + if (done) { + return true; + } + + if (sourceMode != NONE) { + downstream.onNext(null); + return true; + } + + Optional result; + try { + result = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Optional"); + } catch (Throwable ex) { + fail(ex); + return true; + } + + if (result.isPresent()) { + return downstream.tryOnNext(result.get()); + } + return false; + } + + @Override + public int requestFusion(int mode) { + return transitiveBoundaryFusion(mode); + } + + @Override + public R poll() throws Throwable { + for (;;) { + T item = qs.poll(); + if (item == null) { + return null; + } + Optional result = Objects.requireNonNull(mapper.apply(item), "The mapper returned a null Optional"); + if (result.isPresent()) { + return result.get(); + } + if (sourceMode == ASYNC) { + qs.request(1); + } + } + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableSingleStageSubscriber.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableSingleStageSubscriber.java new file mode 100644 index 0000000000..8040ec6f5d --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableSingleStageSubscriber.java @@ -0,0 +1,68 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.NoSuchElementException; + +import org.reactivestreams.Subscription; + +/** + * Signals the only element of the source via the underlying CompletableFuture, + * signals the a default item if the upstream is empty or signals {@link IllegalArgumentException} + * if the upstream has more than one item. + * + * @param the element type + * @since 3.0.0 + */ +public final class FlowableSingleStageSubscriber extends FlowableStageSubscriber { + + final boolean hasDefault; + + final T defaultItem; + + public FlowableSingleStageSubscriber(boolean hasDefault, T defaultItem) { + this.hasDefault = hasDefault; + this.defaultItem = defaultItem; + } + + @Override + public void onNext(T t) { + if (value != null) { + value = null; + completeExceptionally(new IllegalArgumentException("Sequence contains more than one element!")); + } else { + value = t; + } + } + + @Override + public void onComplete() { + if (!isDone()) { + T v = value; + clear(); + if (v != null) { + complete(v); + } else if (hasDefault) { + complete(defaultItem); + } else { + completeExceptionally(new NoSuchElementException()); + } + } + } + + @Override + protected void afterSubscribe(Subscription s) { + s.request(2); + } + +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableStageSubscriber.java b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableStageSubscriber.java new file mode 100644 index 0000000000..6594b7ba5c --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableStageSubscriber.java @@ -0,0 +1,80 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; + +import org.reactivestreams.Subscription; + +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.core.FlowableSubscriber; +import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; + +/** + * Base class that extends CompletableFuture and provides basic infrastructure + * to notify watchers upon upstream signals. + * @param the element type + * @since 3.0.0 + */ +abstract class FlowableStageSubscriber extends CompletableFuture implements FlowableSubscriber { + + final AtomicReference upstream = new AtomicReference<>(); + + T value; + + @Override + public final void onSubscribe(@NonNull Subscription s) { + if (SubscriptionHelper.setOnce(upstream, s)) { + afterSubscribe(s); + } + } + + protected abstract void afterSubscribe(Subscription s); + + @Override + public final void onError(Throwable t) { + clear(); + if (!completeExceptionally(t)) { + RxJavaPlugins.onError(t); + } + } + + protected final void cancelUpstream() { + SubscriptionHelper.cancel(upstream); + } + + protected final void clear() { + value = null; + upstream.lazySet(SubscriptionHelper.CANCELLED); + } + + @Override + public final boolean cancel(boolean mayInterruptIfRunning) { + cancelUpstream(); + return super.cancel(mayInterruptIfRunning); + } + + @Override + public final boolean complete(T value) { + cancelUpstream(); + return super.complete(value); + } + + @Override + public final boolean completeExceptionally(Throwable ex) { + cancelUpstream(); + return super.completeExceptionally(ex); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/CollectWithCollectorTckTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/CollectWithCollectorTckTest.java new file mode 100644 index 0000000000..99fb2dcff9 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/CollectWithCollectorTckTest.java @@ -0,0 +1,45 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.List; +import java.util.stream.Collectors; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.tck.BaseTck; + +@Test +public class CollectWithCollectorTckTest extends BaseTck> { + + @Override + public Publisher> createPublisher(final long elements) { + return + Flowable.range(0, (int)elements).collect(Collectors.toList()).toFlowable() + ; + } + + @Override + public Publisher> createFailedPublisher() { + return Flowable.error(new TestException()).collect(Collectors.toList()).toFlowable(); + } + + @Override + public long maxElementsFromPublisher() { + return 1; + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableCollectWithCollectorTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableCollectWithCollectorTest.java new file mode 100644 index 0000000000..ecef7665e0 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableCollectWithCollectorTest.java @@ -0,0 +1,420 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import static org.junit.Assert.assertFalse; + +import java.io.IOException; +import java.util.*; +import java.util.function.*; +import java.util.stream.*; + +import org.junit.Test; +import org.reactivestreams.Subscriber; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; +import io.reactivex.rxjava3.processors.*; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class FlowableCollectWithCollectorTest extends RxJavaTest { + + @Test + public void basic() { + Flowable.range(1, 5) + .collect(Collectors.toList()) + .test() + .assertResult(Arrays.asList(1, 2, 3, 4, 5)); + } + + @Test + public void empty() { + Flowable.empty() + .collect(Collectors.toList()) + .test() + .assertResult(Collections.emptyList()); + } + + @Test + public void error() { + Flowable.error(new TestException()) + .collect(Collectors.toList()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void collectorSupplierCrash() { + Flowable.range(1, 5) + .collect(new Collector() { + + @Override + public Supplier supplier() { + throw new TestException(); + } + + @Override + public BiConsumer accumulator() { + return (a, b) -> { }; + } + + @Override + public BinaryOperator combiner() { + return (a, b) -> a + b; + } + + @Override + public Function finisher() { + return a -> a; + } + + @Override + public Set characteristics() { + return Collections.emptySet(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void collectorAccumulatorCrash() { + BehaviorProcessor source = BehaviorProcessor.createDefault(1); + + source + .collect(new Collector() { + + @Override + public Supplier supplier() { + return () -> 1; + } + + @Override + public BiConsumer accumulator() { + return (a, b) -> { throw new TestException(); }; + } + + @Override + public BinaryOperator combiner() { + return (a, b) -> a + b; + } + + @Override + public Function finisher() { + return a -> a; + } + + @Override + public Set characteristics() { + return Collections.emptySet(); + } + }) + .test() + .assertFailure(TestException.class); + + assertFalse(source.hasSubscribers()); + } + + @Test + public void collectorFinisherCrash() { + Flowable.range(1, 5) + .collect(new Collector() { + + @Override + public Supplier supplier() { + return () -> 1; + } + + @Override + public BiConsumer accumulator() { + return (a, b) -> { }; + } + + @Override + public BinaryOperator combiner() { + return (a, b) -> a + b; + } + + @Override + public Function finisher() { + return a -> { throw new TestException(); }; + } + + @Override + public Set characteristics() { + return Collections.emptySet(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void collectorAccumulatorDropSignals() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Flowable source = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onNext(2); + s.onError(new IOException()); + s.onComplete(); + } + }; + + source + .collect(new Collector() { + + @Override + public Supplier supplier() { + return () -> 1; + } + + @Override + public BiConsumer accumulator() { + return (a, b) -> { throw new TestException(); }; + } + + @Override + public BinaryOperator combiner() { + return (a, b) -> a + b; + } + + @Override + public Function finisher() { + return a -> a; + } + + @Override + public Set characteristics() { + return Collections.emptySet(); + } + }) + .test() + .assertFailure(TestException.class); + + TestHelper.assertUndeliverable(errors, 0, IOException.class); + }); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(PublishProcessor.create() + .collect(Collectors.toList())); + } + + @Test + public void onSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowableToSingle(f -> f.collect(Collectors.toList())); + } + + @Test + public void basicToFlowable() { + Flowable.range(1, 5) + .collect(Collectors.toList()) + .toFlowable() + .test() + .assertResult(Arrays.asList(1, 2, 3, 4, 5)); + } + + @Test + public void emptyToFlowable() { + Flowable.empty() + .collect(Collectors.toList()) + .toFlowable() + .test() + .assertResult(Collections.emptyList()); + } + + @Test + public void errorToFlowable() { + Flowable.error(new TestException()) + .collect(Collectors.toList()) + .toFlowable() + .test() + .assertFailure(TestException.class); + } + + @Test + public void collectorSupplierCrashToFlowable() { + Flowable.range(1, 5) + .collect(new Collector() { + + @Override + public Supplier supplier() { + throw new TestException(); + } + + @Override + public BiConsumer accumulator() { + return (a, b) -> { }; + } + + @Override + public BinaryOperator combiner() { + return (a, b) -> a + b; + } + + @Override + public Function finisher() { + return a -> a; + } + + @Override + public Set characteristics() { + return Collections.emptySet(); + } + }) + .toFlowable() + .test() + .assertFailure(TestException.class); + } + + @Test + public void collectorAccumulatorCrashToFlowable() { + BehaviorProcessor source = BehaviorProcessor.createDefault(1); + + source + .collect(new Collector() { + + @Override + public Supplier supplier() { + return () -> 1; + } + + @Override + public BiConsumer accumulator() { + return (a, b) -> { throw new TestException(); }; + } + + @Override + public BinaryOperator combiner() { + return (a, b) -> a + b; + } + + @Override + public Function finisher() { + return a -> a; + } + + @Override + public Set characteristics() { + return Collections.emptySet(); + } + }) + .toFlowable() + .test() + .assertFailure(TestException.class); + + assertFalse(source.hasSubscribers()); + } + + @Test + public void collectorFinisherCrashToFlowable() { + Flowable.range(1, 5) + .collect(new Collector() { + + @Override + public Supplier supplier() { + return () -> 1; + } + + @Override + public BiConsumer accumulator() { + return (a, b) -> { }; + } + + @Override + public BinaryOperator combiner() { + return (a, b) -> a + b; + } + + @Override + public Function finisher() { + return a -> { throw new TestException(); }; + } + + @Override + public Set characteristics() { + return Collections.emptySet(); + } + }) + .toFlowable() + .test() + .assertFailure(TestException.class); + } + + @Test + public void collectorAccumulatorDropSignalsToFlowable() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Flowable source = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onNext(2); + s.onError(new IOException()); + s.onComplete(); + } + }; + + source + .collect(new Collector() { + + @Override + public Supplier supplier() { + return () -> 1; + } + + @Override + public BiConsumer accumulator() { + return (a, b) -> { throw new TestException(); }; + } + + @Override + public BinaryOperator combiner() { + return (a, b) -> a + b; + } + + @Override + public Function finisher() { + return a -> a; + } + + @Override + public Set characteristics() { + return Collections.emptySet(); + } + }) + .toFlowable() + .test() + .assertFailure(TestException.class); + + TestHelper.assertUndeliverable(errors, 0, IOException.class); + }); + } + + @Test + public void disposeToFlowable() { + TestHelper.checkDisposed(PublishProcessor.create() + .collect(Collectors.toList()).toFlowable()); + } + + @Test + public void onSubscribeToFlowable() { + TestHelper.checkDoubleOnSubscribeFlowable(f -> f.collect(Collectors.toList()).toFlowable()); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableMapOptionalTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableMapOptionalTest.java new file mode 100644 index 0000000000..85fc8d75be --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableMapOptionalTest.java @@ -0,0 +1,470 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import static org.junit.Assert.assertFalse; + +import java.util.Optional; + +import org.junit.Test; +import org.reactivestreams.Subscriber; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.internal.fuseable.QueueFuseable; +import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; +import io.reactivex.rxjava3.processors.*; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class FlowableMapOptionalTest extends RxJavaTest { + + static final Function> MODULO = v -> v % 2 == 0 ? Optional.of(v) : Optional.empty(); + + @Test + public void allPresent() { + Flowable.range(1, 5) + .mapOptional(Optional::of) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void allEmpty() { + Flowable.range(1, 5) + .mapOptional(v -> Optional.empty()) + .test() + .assertResult(); + } + + @Test + public void mixed() { + Flowable.range(1, 10) + .mapOptional(MODULO) + .test() + .assertResult(2, 4, 6, 8, 10); + } + + @Test + public void mapperChash() { + BehaviorProcessor source = BehaviorProcessor.createDefault(1); + + source + .mapOptional(v -> { throw new TestException(); }) + .test() + .assertFailure(TestException.class); + + assertFalse(source.hasSubscribers()); + } + + @Test + public void mapperNull() { + BehaviorProcessor source = BehaviorProcessor.createDefault(1); + + source + .mapOptional(v -> null) + .test() + .assertFailure(NullPointerException.class); + + assertFalse(source.hasSubscribers()); + } + + @Test + public void crashDropsOnNexts() { + Flowable source = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onNext(2); + } + }; + + source + .mapOptional(v -> { throw new TestException(); }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void backpressureAll() { + Flowable.range(1, 5) + .mapOptional(Optional::of) + .test(0L) + .assertEmpty() + .requestMore(2) + .assertValuesOnly(1, 2) + .requestMore(2) + .assertValuesOnly(1, 2, 3, 4) + .requestMore(1) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void backpressureNone() { + Flowable.range(1, 5) + .mapOptional(v -> Optional.empty()) + .test(1L) + .assertResult(); + } + + @Test + public void backpressureMixed() { + Flowable.range(1, 10) + .mapOptional(MODULO) + .test(0L) + .assertEmpty() + .requestMore(2) + .assertValuesOnly(2, 4) + .requestMore(2) + .assertValuesOnly(2, 4, 6, 8) + .requestMore(1) + .assertResult(2, 4, 6, 8, 10); + } + + @Test + public void syncFusedAll() { + Flowable.range(1, 5) + .mapOptional(Optional::of) + .to(TestHelper.testConsumer(false, QueueFuseable.SYNC)) + .assertFuseable() + .assertFusionMode(QueueFuseable.SYNC) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void asyncFusedAll() { + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .mapOptional(Optional::of) + .to(TestHelper.testConsumer(false, QueueFuseable.ASYNC)) + .assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void boundaryFusedAll() { + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .mapOptional(Optional::of) + .to(TestHelper.testConsumer(false, QueueFuseable.ASYNC | QueueFuseable.BOUNDARY)) + .assertFuseable() + .assertFusionMode(QueueFuseable.NONE) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void syncFusedNone() { + Flowable.range(1, 5) + .mapOptional(v -> Optional.empty()) + .to(TestHelper.testConsumer(false, QueueFuseable.SYNC)) + .assertFuseable() + .assertFusionMode(QueueFuseable.SYNC) + .assertResult(); + } + + @Test + public void asyncFusedNone() { + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .mapOptional(v -> Optional.empty()) + .to(TestHelper.testConsumer(false, QueueFuseable.ASYNC)) + .assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(); + } + + @Test + public void boundaryFusedNone() { + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .mapOptional(v -> Optional.empty()) + .to(TestHelper.testConsumer(false, QueueFuseable.ASYNC | QueueFuseable.BOUNDARY)) + .assertFuseable() + .assertFusionMode(QueueFuseable.NONE) + .assertResult(); + } + + @Test + public void syncFusedMixed() { + Flowable.range(1, 10) + .mapOptional(MODULO) + .to(TestHelper.testConsumer(false, QueueFuseable.SYNC)) + .assertFuseable() + .assertFusionMode(QueueFuseable.SYNC) + .assertResult(2, 4, 6, 8, 10); + } + + @Test + public void asyncFusedMixed() { + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + + up + .mapOptional(MODULO) + .to(TestHelper.testConsumer(false, QueueFuseable.ASYNC)) + .assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(2, 4, 6, 8, 10); + } + + @Test + public void boundaryFusedMixed() { + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + + up + .mapOptional(MODULO) + .to(TestHelper.testConsumer(false, QueueFuseable.ASYNC | QueueFuseable.BOUNDARY)) + .assertFuseable() + .assertFusionMode(QueueFuseable.NONE) + .assertResult(2, 4, 6, 8, 10); + } + + @Test + public void allPresentConditional() { + Flowable.range(1, 5) + .mapOptional(Optional::of) + .filter(v -> true) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void allEmptyConditional() { + Flowable.range(1, 5) + .mapOptional(v -> Optional.empty()) + .filter(v -> true) + .test() + .assertResult(); + } + + @Test + public void mixedConditional() { + Flowable.range(1, 10) + .mapOptional(MODULO) + .filter(v -> true) + .test() + .assertResult(2, 4, 6, 8, 10); + } + + @Test + public void mapperChashConditional() { + BehaviorProcessor source = BehaviorProcessor.createDefault(1); + + source + .mapOptional(v -> { throw new TestException(); }) + .filter(v -> true) + .test() + .assertFailure(TestException.class); + + assertFalse(source.hasSubscribers()); + } + + @Test + public void mapperNullConditional() { + BehaviorProcessor source = BehaviorProcessor.createDefault(1); + + source + .mapOptional(v -> null) + .filter(v -> true) + .test() + .assertFailure(NullPointerException.class); + + assertFalse(source.hasSubscribers()); + } + + @Test + public void crashDropsOnNextsConditional() { + Flowable source = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onNext(2); + } + }; + + source + .mapOptional(v -> { throw new TestException(); }) + .filter(v -> true) + .test() + .assertFailure(TestException.class); + } + + @Test + public void backpressureAllConditional() { + Flowable.range(1, 5) + .mapOptional(Optional::of) + .filter(v -> true) + .test(0L) + .assertEmpty() + .requestMore(2) + .assertValuesOnly(1, 2) + .requestMore(2) + .assertValuesOnly(1, 2, 3, 4) + .requestMore(1) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void backpressureNoneConditional() { + Flowable.range(1, 5) + .mapOptional(v -> Optional.empty()) + .filter(v -> true) + .test(1L) + .assertResult(); + } + + @Test + public void backpressureMixedConditional() { + Flowable.range(1, 10) + .mapOptional(MODULO) + .filter(v -> true) + .test(0L) + .assertEmpty() + .requestMore(2) + .assertValuesOnly(2, 4) + .requestMore(2) + .assertValuesOnly(2, 4, 6, 8) + .requestMore(1) + .assertResult(2, 4, 6, 8, 10); + } + + @Test + public void syncFusedAllConditional() { + Flowable.range(1, 5) + .mapOptional(Optional::of) + .filter(v -> true) + .to(TestHelper.testConsumer(false, QueueFuseable.SYNC)) + .assertFuseable() + .assertFusionMode(QueueFuseable.SYNC) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void asyncFusedAllConditional() { + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .mapOptional(Optional::of) + .filter(v -> true) + .to(TestHelper.testConsumer(false, QueueFuseable.ASYNC)) + .assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void boundaryFusedAllConditiona() { + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .mapOptional(Optional::of) + .filter(v -> true) + .to(TestHelper.testConsumer(false, QueueFuseable.ASYNC | QueueFuseable.BOUNDARY)) + .assertFuseable() + .assertFusionMode(QueueFuseable.NONE) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void syncFusedNoneConditional() { + Flowable.range(1, 5) + .mapOptional(v -> Optional.empty()) + .filter(v -> true) + .to(TestHelper.testConsumer(false, QueueFuseable.SYNC)) + .assertFuseable() + .assertFusionMode(QueueFuseable.SYNC) + .assertResult(); + } + + @Test + public void asyncFusedNoneConditional() { + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .mapOptional(v -> Optional.empty()) + .filter(v -> true) + .to(TestHelper.testConsumer(false, QueueFuseable.ASYNC)) + .assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(); + } + + @Test + public void boundaryFusedNoneConditional() { + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .mapOptional(v -> Optional.empty()) + .filter(v -> true) + .to(TestHelper.testConsumer(false, QueueFuseable.ASYNC | QueueFuseable.BOUNDARY)) + .assertFuseable() + .assertFusionMode(QueueFuseable.NONE) + .assertResult(); + } + + @Test + public void syncFusedMixedConditional() { + Flowable.range(1, 10) + .mapOptional(MODULO) + .filter(v -> true) + .to(TestHelper.testConsumer(false, QueueFuseable.SYNC)) + .assertFuseable() + .assertFusionMode(QueueFuseable.SYNC) + .assertResult(2, 4, 6, 8, 10); + } + + @Test + public void asyncFusedMixedConditional() { + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + + up + .mapOptional(MODULO) + .filter(v -> true) + .to(TestHelper.testConsumer(false, QueueFuseable.ASYNC)) + .assertFuseable() + .assertFusionMode(QueueFuseable.ASYNC) + .assertResult(2, 4, 6, 8, 10); + } + + @Test + public void boundaryFusedMixedConditional() { + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + + up + .mapOptional(MODULO) + .filter(v -> true) + .to(TestHelper.testConsumer(false, QueueFuseable.ASYNC | QueueFuseable.BOUNDARY)) + .assertFuseable() + .assertFusionMode(QueueFuseable.NONE) + .assertResult(2, 4, 6, 8, 10); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableStageSubscriberOrDefaultTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableStageSubscriberOrDefaultTest.java new file mode 100644 index 0000000000..882fd83dfb --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableStageSubscriberOrDefaultTest.java @@ -0,0 +1,476 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import static org.junit.Assert.*; + +import java.util.concurrent.*; + +import org.junit.Test; +import org.reactivestreams.Subscriber; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; +import io.reactivex.rxjava3.processors.*; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class FlowableStageSubscriberOrDefaultTest extends RxJavaTest { + + @Test + public void firstJust() throws Exception { + Integer v = Flowable.just(1) + .firstStage(null) + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + } + + @Test + public void firstEmpty() throws Exception { + Integer v = Flowable.empty() + .firstStage(2) + .toCompletableFuture() + .get(); + + assertEquals((Integer)2, v); + } + + @Test + public void firstCancels() throws Exception { + BehaviorProcessor source = BehaviorProcessor.createDefault(1); + + Integer v = source + .firstStage(null) + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + + assertFalse(source.hasSubscribers()); + } + + @Test + public void firstCompletableFutureCancels() throws Exception { + PublishProcessor source = PublishProcessor.create(); + + CompletableFuture cf = source + .firstStage(null) + .toCompletableFuture(); + + assertTrue(source.hasSubscribers()); + + cf.cancel(true); + + assertTrue(cf.isCancelled()); + + assertFalse(source.hasSubscribers()); + } + + @Test + public void firstCompletableManualCompleteCancels() throws Exception { + PublishProcessor source = PublishProcessor.create(); + + CompletableFuture cf = source + .firstStage(null) + .toCompletableFuture(); + + assertTrue(source.hasSubscribers()); + + cf.complete(1); + + assertTrue(cf.isDone()); + assertFalse(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + assertFalse(source.hasSubscribers()); + + assertEquals((Integer)1, cf.get()); + } + + @Test + public void firstCompletableManualCompleteExceptionallyCancels() throws Exception { + PublishProcessor source = PublishProcessor.create(); + + CompletableFuture cf = source + .firstStage(null) + .toCompletableFuture(); + + assertTrue(source.hasSubscribers()); + + cf.completeExceptionally(new TestException()); + + assertTrue(cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + assertFalse(source.hasSubscribers()); + + TestHelper.assertError(cf, TestException.class); + } + + @Test + public void firstError() throws Exception { + CompletableFuture cf = Flowable.error(new TestException()) + .firstStage(null) + .toCompletableFuture(); + + assertTrue(cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + TestHelper.assertError(cf, TestException.class); + } + + @Test + public void firstSourceIgnoresCancel() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Integer v = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onError(new TestException()); + s.onComplete(); + } + } + .firstStage(null) + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + }); + } + + @Test + public void firstDoubleOnSubscribe() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Integer v = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + } + } + .firstStage(null) + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + }); + } + + @Test + public void singleJust() throws Exception { + Integer v = Flowable.just(1) + .singleStage(null) + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + } + + @Test + public void singleEmpty() throws Exception { + Integer v = Flowable.empty() + .singleStage(2) + .toCompletableFuture() + .get(); + + assertEquals((Integer)2, v); + } + + @Test + public void singleTooManyCancels() throws Exception { + ReplayProcessor source = ReplayProcessor.create(); + source.onNext(1); + source.onNext(2); + + TestHelper.assertError(source + .singleStage(null) + .toCompletableFuture(), IllegalArgumentException.class); + + assertFalse(source.hasSubscribers()); + } + + @Test + public void singleCompletableFutureCancels() throws Exception { + PublishProcessor source = PublishProcessor.create(); + + CompletableFuture cf = source + .singleStage(null) + .toCompletableFuture(); + + assertTrue(source.hasSubscribers()); + + cf.cancel(true); + + assertTrue(cf.isCancelled()); + + assertFalse(source.hasSubscribers()); + } + + @Test + public void singleCompletableManualCompleteCancels() throws Exception { + PublishProcessor source = PublishProcessor.create(); + + CompletableFuture cf = source + .singleStage(null) + .toCompletableFuture(); + + assertTrue(source.hasSubscribers()); + + cf.complete(1); + + assertTrue(cf.isDone()); + assertFalse(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + assertFalse(source.hasSubscribers()); + + assertEquals((Integer)1, cf.get()); + } + + @Test + public void singleCompletableManualCompleteExceptionallyCancels() throws Exception { + PublishProcessor source = PublishProcessor.create(); + + CompletableFuture cf = source + .singleStage(null) + .toCompletableFuture(); + + assertTrue(source.hasSubscribers()); + + cf.completeExceptionally(new TestException()); + + assertTrue(cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + assertFalse(source.hasSubscribers()); + + TestHelper.assertError(cf, TestException.class); + } + + @Test + public void singleError() throws Exception { + CompletableFuture cf = Flowable.error(new TestException()) + .singleStage(null) + .toCompletableFuture(); + + assertTrue(cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + TestHelper.assertError(cf, TestException.class); + } + + @Test + public void singleSourceIgnoresCancel() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Integer v = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onComplete(); + s.onError(new TestException()); + s.onComplete(); + } + } + .singleStage(null) + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + }); + } + + @Test + public void singleDoubleOnSubscribe() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Integer v = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onComplete(); + } + } + .singleStage(null) + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + }); + } + + @Test + public void lastJust() throws Exception { + Integer v = Flowable.just(1) + .lastStage(null) + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + } + + @Test + public void lastRange() throws Exception { + Integer v = Flowable.range(1, 5) + .lastStage(null) + .toCompletableFuture() + .get(); + + assertEquals((Integer)5, v); + } + + @Test + public void lastEmpty() throws Exception { + Integer v = Flowable.empty() + .lastStage(2) + .toCompletableFuture() + .get(); + + assertEquals((Integer)2, v); + } + + @Test + public void lastCompletableFutureCancels() throws Exception { + PublishProcessor source = PublishProcessor.create(); + + CompletableFuture cf = source + .lastStage(null) + .toCompletableFuture(); + + assertTrue(source.hasSubscribers()); + + cf.cancel(true); + + assertTrue(cf.isCancelled()); + + assertFalse(source.hasSubscribers()); + } + + @Test + public void lastCompletableManualCompleteCancels() throws Exception { + PublishProcessor source = PublishProcessor.create(); + + CompletableFuture cf = source + .lastStage(null) + .toCompletableFuture(); + + assertTrue(source.hasSubscribers()); + + cf.complete(1); + + assertTrue(cf.isDone()); + assertFalse(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + assertFalse(source.hasSubscribers()); + + assertEquals((Integer)1, cf.get()); + } + + @Test + public void lastCompletableManualCompleteExceptionallyCancels() throws Exception { + PublishProcessor source = PublishProcessor.create(); + + CompletableFuture cf = source + .lastStage(null) + .toCompletableFuture(); + + assertTrue(source.hasSubscribers()); + + cf.completeExceptionally(new TestException()); + + assertTrue(cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + assertFalse(source.hasSubscribers()); + + TestHelper.assertError(cf, TestException.class); + } + + @Test + public void lastError() throws Exception { + CompletableFuture cf = Flowable.error(new TestException()) + .lastStage(null) + .toCompletableFuture(); + + assertTrue(cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + TestHelper.assertError(cf, TestException.class); + } + + @Test + public void lastSourceIgnoresCancel() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Integer v = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onComplete(); + s.onError(new TestException()); + s.onComplete(); + } + } + .lastStage(null) + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + }); + } + + @Test + public void lastDoubleOnSubscribe() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Integer v = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onComplete(); + } + } + .lastStage(null) + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + }); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableStageSubscriberOrErrorTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableStageSubscriberOrErrorTest.java new file mode 100644 index 0000000000..c34e4739fa --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableStageSubscriberOrErrorTest.java @@ -0,0 +1,470 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import static org.junit.Assert.*; + +import java.util.NoSuchElementException; +import java.util.concurrent.*; + +import org.junit.Test; +import org.reactivestreams.Subscriber; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; +import io.reactivex.rxjava3.processors.*; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class FlowableStageSubscriberOrErrorTest extends RxJavaTest { + + @Test + public void firstJust() throws Exception { + Integer v = Flowable.just(1) + .firstOrErrorStage() + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + } + + @Test + public void firstEmpty() throws Exception { + TestHelper.assertError( + Flowable.empty() + .firstOrErrorStage() + .toCompletableFuture(), NoSuchElementException.class); + } + + @Test + public void firstCancels() throws Exception { + BehaviorProcessor source = BehaviorProcessor.createDefault(1); + + Integer v = source + .firstOrErrorStage() + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + + assertFalse(source.hasSubscribers()); + } + + @Test + public void firstCompletableFutureCancels() throws Exception { + PublishProcessor source = PublishProcessor.create(); + + CompletableFuture cf = source + .firstOrErrorStage() + .toCompletableFuture(); + + assertTrue(source.hasSubscribers()); + + cf.cancel(true); + + assertTrue(cf.isCancelled()); + + assertFalse(source.hasSubscribers()); + } + + @Test + public void firstCompletableManualCompleteCancels() throws Exception { + PublishProcessor source = PublishProcessor.create(); + + CompletableFuture cf = source + .firstOrErrorStage() + .toCompletableFuture(); + + assertTrue(source.hasSubscribers()); + + cf.complete(1); + + assertTrue(cf.isDone()); + assertFalse(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + assertFalse(source.hasSubscribers()); + + assertEquals((Integer)1, cf.get()); + } + + @Test + public void firstCompletableManualCompleteExceptionallyCancels() throws Exception { + PublishProcessor source = PublishProcessor.create(); + + CompletableFuture cf = source + .firstOrErrorStage() + .toCompletableFuture(); + + assertTrue(source.hasSubscribers()); + + cf.completeExceptionally(new TestException()); + + assertTrue(cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + assertFalse(source.hasSubscribers()); + + TestHelper.assertError(cf, TestException.class); + } + + @Test + public void firstError() throws Exception { + CompletableFuture cf = Flowable.error(new TestException()) + .firstOrErrorStage() + .toCompletableFuture(); + + assertTrue(cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + TestHelper.assertError(cf, TestException.class); + } + + @Test + public void firstSourceIgnoresCancel() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Integer v = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onError(new TestException()); + s.onComplete(); + } + } + .firstOrErrorStage() + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + }); + } + + @Test + public void firstDoubleOnSubscribe() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Integer v = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + } + } + .firstOrErrorStage() + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + }); + } + + @Test + public void singleJust() throws Exception { + Integer v = Flowable.just(1) + .singleOrErrorStage() + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + } + + @Test + public void singleEmpty() throws Exception { + TestHelper.assertError( + Flowable.empty() + .singleOrErrorStage() + .toCompletableFuture(), NoSuchElementException.class); + } + + @Test + public void singleTooManyCancels() throws Exception { + ReplayProcessor source = ReplayProcessor.create(); + source.onNext(1); + source.onNext(2); + + TestHelper.assertError(source + .singleOrErrorStage() + .toCompletableFuture(), IllegalArgumentException.class); + + assertFalse(source.hasSubscribers()); + } + + @Test + public void singleCompletableFutureCancels() throws Exception { + PublishProcessor source = PublishProcessor.create(); + + CompletableFuture cf = source + .singleOrErrorStage() + .toCompletableFuture(); + + assertTrue(source.hasSubscribers()); + + cf.cancel(true); + + assertTrue(cf.isCancelled()); + + assertFalse(source.hasSubscribers()); + } + + @Test + public void singleCompletableManualCompleteCancels() throws Exception { + PublishProcessor source = PublishProcessor.create(); + + CompletableFuture cf = source + .singleOrErrorStage() + .toCompletableFuture(); + + assertTrue(source.hasSubscribers()); + + cf.complete(1); + + assertTrue(cf.isDone()); + assertFalse(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + assertFalse(source.hasSubscribers()); + + assertEquals((Integer)1, cf.get()); + } + + @Test + public void singleCompletableManualCompleteExceptionallyCancels() throws Exception { + PublishProcessor source = PublishProcessor.create(); + + CompletableFuture cf = source + .singleOrErrorStage() + .toCompletableFuture(); + + assertTrue(source.hasSubscribers()); + + cf.completeExceptionally(new TestException()); + + assertTrue(cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + assertFalse(source.hasSubscribers()); + + TestHelper.assertError(cf, TestException.class); + } + + @Test + public void singleError() throws Exception { + CompletableFuture cf = Flowable.error(new TestException()) + .singleOrErrorStage() + .toCompletableFuture(); + + assertTrue(cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + TestHelper.assertError(cf, TestException.class); + } + + @Test + public void singleSourceIgnoresCancel() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Integer v = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onComplete(); + s.onError(new TestException()); + s.onComplete(); + } + } + .singleOrErrorStage() + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + }); + } + + @Test + public void singleDoubleOnSubscribe() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Integer v = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onComplete(); + } + } + .singleOrErrorStage() + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + }); + } + + @Test + public void lastJust() throws Exception { + Integer v = Flowable.just(1) + .lastOrErrorStage() + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + } + + @Test + public void lastRange() throws Exception { + Integer v = Flowable.range(1, 5) + .lastOrErrorStage() + .toCompletableFuture() + .get(); + + assertEquals((Integer)5, v); + } + + @Test + public void lastEmpty() throws Exception { + TestHelper.assertError(Flowable.empty() + .lastOrErrorStage() + .toCompletableFuture(), NoSuchElementException.class); + } + + @Test + public void lastCompletableFutureCancels() throws Exception { + PublishProcessor source = PublishProcessor.create(); + + CompletableFuture cf = source + .lastOrErrorStage() + .toCompletableFuture(); + + assertTrue(source.hasSubscribers()); + + cf.cancel(true); + + assertTrue(cf.isCancelled()); + + assertFalse(source.hasSubscribers()); + } + + @Test + public void lastCompletableManualCompleteCancels() throws Exception { + PublishProcessor source = PublishProcessor.create(); + + CompletableFuture cf = source + .lastOrErrorStage() + .toCompletableFuture(); + + assertTrue(source.hasSubscribers()); + + cf.complete(1); + + assertTrue(cf.isDone()); + assertFalse(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + assertFalse(source.hasSubscribers()); + + assertEquals((Integer)1, cf.get()); + } + + @Test + public void lastCompletableManualCompleteExceptionallyCancels() throws Exception { + PublishProcessor source = PublishProcessor.create(); + + CompletableFuture cf = source + .lastOrErrorStage() + .toCompletableFuture(); + + assertTrue(source.hasSubscribers()); + + cf.completeExceptionally(new TestException()); + + assertTrue(cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + assertFalse(source.hasSubscribers()); + + TestHelper.assertError(cf, TestException.class); + } + + @Test + public void lastError() throws Exception { + CompletableFuture cf = Flowable.error(new TestException()) + .lastOrErrorStage() + .toCompletableFuture(); + + assertTrue(cf.isDone()); + assertTrue(cf.isCompletedExceptionally()); + assertFalse(cf.isCancelled()); + + TestHelper.assertError(cf, TestException.class); + } + + @Test + public void lastSourceIgnoresCancel() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Integer v = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onComplete(); + s.onError(new TestException()); + s.onComplete(); + } + } + .lastOrErrorStage() + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + }); + } + + @Test + public void lastDoubleOnSubscribe() throws Throwable { + TestHelper.withErrorTracking(errors -> { + Integer v = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onComplete(); + } + } + .lastOrErrorStage() + .toCompletableFuture() + .get(); + + assertEquals((Integer)1, v); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + }); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/jdk8/MapOptionalTckTest.java b/src/test/java/io/reactivex/rxjava3/internal/jdk8/MapOptionalTckTest.java new file mode 100644 index 0000000000..b7cb4d9725 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/jdk8/MapOptionalTckTest.java @@ -0,0 +1,38 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.jdk8; + +import java.util.Optional; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Test; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.tck.BaseTck; + +@Test +public class MapOptionalTckTest extends BaseTck { + + @Override + public Publisher createPublisher(final long elements) { + return + Flowable.range(0, (int)(2 * elements)).mapOptional(v -> v % 2 == 0 ? Optional.of(v) : Optional.empty()) + ; + } + + @Override + public Publisher createFailedPublisher() { + return Flowable.just(1).mapOptional(v -> null).onBackpressureDrop(); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java b/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java index 1ce53f01ae..865b31a082 100644 --- a/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java +++ b/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java @@ -3438,4 +3438,21 @@ public static void withErrorTracking(Consumer> action) throws Th RxJavaPlugins.reset(); } } + + /** + * Assert if the given CompletableFuture fails with a specified error inside an ExecutionException. + * @param cf the CompletableFuture to test + * @param error the error class expected + */ + public static void assertError(CompletableFuture cf, Class error) { + try { + cf.get(); + fail("Should have thrown!"); + } catch (Throwable ex) { + if (!error.isInstance(ex.getCause())) { + ex.printStackTrace(); + fail("Wrong cause: " + ex.getCause()); + } + } + } } diff --git a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java index 2d4387d543..99f6783b4a 100644 --- a/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/rxjava3/validators/ParamValidationCheckerTest.java @@ -503,6 +503,12 @@ public void checkParallelFlowable() { addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "window", Long.TYPE, TimeUnit.class, Scheduler.class, Long.TYPE, Boolean.TYPE)); addOverride(new ParamOverride(Observable.class, 0, ParamMode.ANY, "window", Long.TYPE, TimeUnit.class, Scheduler.class, Long.TYPE, Boolean.TYPE, Integer.TYPE)); + // null value allowed + + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "firstStage", Object.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "singleStage", Object.class)); + addOverride(new ParamOverride(Flowable.class, 0, ParamMode.ANY, "lastStage", Object.class)); + // ----------------------------------------------------------------------------------- ignores = new HashMap>(); From 5337a0045dac84d07941bd7f3c3175726ceb7ef5 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 18 Dec 2019 15:09:27 +0100 Subject: [PATCH 2/2] Upgrade the Checkstyle plugin --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 4dcead40c4..f0406bd3f7 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ buildscript { ext.bintrayVersion = "1.8.4" ext.jfrogExtractorVersion = "4.11.0" ext.bndVersion = "4.3.1" - ext.checkstyleVersion = "6.19" + ext.checkstyleVersion = "8.26" // --------------------------------------