Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: [Java 8] Implement mapOptional, collector, first/last/single stage #6775

Merged
merged 2 commits into from
Dec 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ buildscript {
ext.bintrayVersion = "1.8.4"
ext.jfrogExtractorVersion = "4.11.0"
ext.bndVersion = "4.3.1"
ext.checkstyleVersion = "6.19"
ext.checkstyleVersion = "8.26"

// --------------------------------------

Expand Down
260 changes: 259 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6930,6 +6930,7 @@ public final <U> Flowable<U> cast(final Class<U> clazz) {
* @return a Single that emits the result of collecting the values emitted by the source Publisher
* into a single mutable data structure
* @see <a href="http://reactivex.io/documentation/operators/reduce.html">ReactiveX operators documentation: Reduce</a>
* @see #collect(Collector)
*/
@CheckReturnValue
@NonNull
Expand Down Expand Up @@ -11256,12 +11257,13 @@ public final <R> Flowable<R> lift(FlowableOperator<? extends R, ? super T> lifte
* @return a Flowable that emits the items from the source Publisher, transformed by the specified
* function
* @see <a href="http://reactivex.io/documentation/operators/map.html">ReactiveX operators documentation: Map</a>
* @see #mapOptional(Function)
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper) {
public final <@NonNull R> Flowable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new FlowableMap<T, R>(this, mapper));
}
Expand Down Expand Up @@ -18739,4 +18741,260 @@ public final TestSubscriber<T> test(long initialRequest, boolean cancel) { // No
Objects.requireNonNull(stream, "stream is null");
return RxJavaPlugins.onAssembly(new FlowableFromStream<>(stream));
}

/**
* Maps each upstream value into an {@link Optional} and emits the contained item if not empty.
* <p>
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mapOptional.f.png" alt="">
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator is a pass-through for downstream requests but issues {@code request(1)} whenever the
* mapped {@code Optional} is empty.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mapOptional} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the non-null output type
* @param mapper the function that receives the upstream item and should return a <em>non-empty</em> {@code Optional}
* to emit as the output or an <em>empty</em> {@code Optional} to skip to the next upstream value
* @return the new Flowable instance
* @since 3.0.0
* @see #map(Function)
* @see #filter(Predicate)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <@NonNull R> Flowable<R> mapOptional(@NonNull Function<? super T, @NonNull Optional<? extends R>> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new FlowableMapOptional<>(this, mapper));
}

/**
* Collects the finite upstream's values into a container via a Stream {@link Collector} callback set and emits
* it as the success result.
* <p>
* <img width="640" height="360" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/collector.f.png" alt="">
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the upstream in an unbounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code collect} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <R> the non-null result type
* @param <A> the intermediate container type used for the accumulation
* @param collector the interface defining the container supplier, accumulator and finisher functions;
* see {@link Collectors} for some standard implementations
* @return the new Single instance
* @since 3.0.0
* @see Collectors
* @see #collect(Supplier, BiConsumer)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <@NonNull R, A> Single<R> collect(@NonNull Collector<T, A, R> collector) {
Objects.requireNonNull(collector, "collector is null");
return RxJavaPlugins.onAssembly(new FlowableCollectWithCollectorSingle<>(this, collector));
}

/**
* Signals the first upstream item (or the default item if the upstream is empty) via
* a {@link CompletionStage}.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/firstStage.f.png" alt="">
* <p>
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
* calling {@link CompletableFuture#cancel(boolean)} on it.
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
* <p>
* {@code CompletionStage}s don't have a notion of emptyness and allow {@code null}s, therefore, one can either use
* a {@code defaultItem} of {@code null} or turn the flow into a sequence of {@link Optional}s and default to {@link Optional#empty()}:
* <pre><code>
* CompletionStage&lt;Optional&lt;T&gt;&gt; stage = source.map(Optional::of).firstStage(Optional.empty());
* </code></pre>
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator requests one item from upstream and then when received, cancels the upstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code firstStage} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param defaultItem the item to signal if the upstream is empty
* @return the new CompletionStage instance
* @since 3.0.0
* @see #firstOrErrorStage()
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final CompletionStage<T> firstStage(@Nullable T defaultItem) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if these are actually useful compared to just using the regular Single-returning operators and then calling a toStage().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or, put another way, would you add blockingFirstOptional() and blockingLastOptional() and the like or would you expect people to using the Single-returning operators and then call blockingOptional()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure. We have first and blockingFirst so in case of a firstStage -> first().toCompletionStage() change, wouldn't the API consistency require us dropping blockingFirst for first().blockingGet()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind that. It keeps you inside the reactive APIs for transforming the data into the shape you want and then you can use a conversion or blocking operator to escape.

Up to you though. Just thinking of how to minimize the number of operators needed to support these new types.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd keep them for now and see if I can beef up blockingGet with macro-fusion (as their backing specific implementation is to be reused there). If it can be done reasonably well, I'm willing to drop blockingX, and xStage operators.

return subscribeWith(new FlowableFirstStageSubscriber<>(true, defaultItem));
}

/**
* Signals the only expected upstream item (or the default item if the upstream is empty)
* or signals {@link IllegalArgumentException} if the upstream has more than one item
* via a {@link CompletionStage}.
* <p>
* <img width="640" height="229" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/singleStage.f.png" alt="">
* <p>
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
* calling {@link CompletableFuture#cancel(boolean)} on it.
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
* <p>
* {@code CompletionStage}s don't have a notion of emptyness and allow {@code null}s, therefore, one can either use
* a {@code defaultItem} of {@code null} or turn the flow into a sequence of {@link Optional}s and default to {@link Optional#empty()}:
* <pre><code>
* CompletionStage&lt;Optional&lt;T&gt;&gt; stage = source.map(Optional::of).singleStage(Optional.empty());
* </code></pre>
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator requests two items from upstream and then when more than one item is received, cancels the upstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code singleStage} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param defaultItem the item to signal if the upstream is empty
* @return the new CompletionStage instance
* @since 3.0.0
* @see #singleOrErrorStage()
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final CompletionStage<T> singleStage(@Nullable T defaultItem) {
return subscribeWith(new FlowableSingleStageSubscriber<>(true, defaultItem));
}

/**
* Signals the last upstream item (or the default item if the upstream is empty) via
* a {@link CompletionStage}.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/lastStage.f.png" alt="">
* <p>
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
* calling {@link CompletableFuture#cancel(boolean)} on it.
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
* <p>
* {@code CompletionStage}s don't have a notion of emptyness and allow {@code null}s, therefore, one can either use
* a {@code defaultItem} of {@code null} or turn the flow into a sequence of {@link Optional}s and default to {@link Optional#empty()}:
* <pre><code>
* CompletionStage&lt;Optional&lt;T&gt;&gt; stage = source.map(Optional::of).lastStage(Optional.empty());
* </code></pre>
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator requests an unbounded number of items from the upstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code lastStage} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param defaultItem the item to signal if the upstream is empty
* @return the new CompletionStage instance
* @since 3.0.0
* @see #lastOrErrorStage()
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final CompletionStage<T> lastStage(@Nullable T defaultItem) {
return subscribeWith(new FlowableLastStageSubscriber<>(true, defaultItem));
}

/**
* Signals the first upstream item or a {@link NoSuchElementException} if the upstream is empty via
* a {@link CompletionStage}.
* <p>
* <img width="640" height="338" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/firstOrErrorStage.f.png" alt="">
* <p>
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
* calling {@link CompletableFuture#cancel(boolean)} on it.
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator requests one item from upstream and then when received, cancels the upstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code firstOrErrorStage} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new CompletionStage instance
* @since 3.0.0
* @see #firstStage(Object)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final CompletionStage<T> firstOrErrorStage() {
return subscribeWith(new FlowableFirstStageSubscriber<>(false, null));
}

/**
* Signals the only expected upstream item, a {@link NoSuchElementException} if the upstream is empty
* or signals {@link IllegalArgumentException} if the upstream has more than one item
* via a {@link CompletionStage}.
* <p>
* <img width="640" height="229" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/singleOrErrorStage.f.png" alt="">
* <p>
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
* calling {@link CompletableFuture#cancel(boolean)} on it.
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator requests two items from upstream and then when more than one item is received, cancels the upstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code singleOrErrorStage} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new CompletionStage instance
* @since 3.0.0
* @see #singleStage(Object)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final CompletionStage<T> singleOrErrorStage() {
return subscribeWith(new FlowableSingleStageSubscriber<>(false, null));
}

/**
* Signals the last upstream item or a {@link NoSuchElementException} if the upstream is empty via
* a {@link CompletionStage}.
* <p>
* <img width="640" height="346" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/lastOrErrorStage.f.png" alt="">
* <p>
* The upstream can be canceled by converting the resulting {@code CompletionStage} into
* {@link CompletableFuture} via {@link CompletionStage#toCompletableFuture()} and
* calling {@link CompletableFuture#cancel(boolean)} on it.
* The upstream will be also cancelled if the resulting {@code CompletionStage} is converted to and
* completed manually by {@link CompletableFuture#complete(Object)} or {@link CompletableFuture#completeExceptionally(Throwable)}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator requests an unbounded number of items from the upstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code lastOrErrorStage} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new CompletionStage instance
* @since 3.0.0
* @see #lastStage(Object)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final CompletionStage<T> lastOrErrorStage() {
return subscribeWith(new FlowableLastStageSubscriber<>(false, null));
}
}
Loading