diff --git a/docs/Operator-Matrix.md b/docs/Operator-Matrix.md index 81e1f7878d..10427533cc 100644 --- a/docs/Operator-Matrix.md +++ b/docs/Operator-Matrix.md @@ -58,7 +58,7 @@ Operator | ![Flowable](https://raw.github.com/wiki/ReactiveX/RxJava/images/opmat `defer`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `delay`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `delaySubscription`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| -`dematerialize`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([37](#notes-37))| +`dematerialize`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([37](#notes-37))| `distinct`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([39](#notes-39))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([39](#notes-39))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([37](#notes-37))| `distinctUntilChanged`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([39](#notes-39))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([39](#notes-39))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([37](#notes-37))| `doAfterNext`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([40](#notes-40))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([40](#notes-40))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([2](#notes-2))| @@ -212,10 +212,10 @@ Operator | ![Flowable](https://raw.github.com/wiki/ReactiveX/RxJava/images/opmat `throttleLast`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([36](#notes-36))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([36](#notes-36))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([37](#notes-37))| `throttleLatest`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([36](#notes-36))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([36](#notes-36))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([37](#notes-37))| `throttleWithTimeout`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([36](#notes-36))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([36](#notes-36))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([37](#notes-37))| -`timeInterval`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)| +`timeInterval`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([37](#notes-37))| `timeout`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `timer`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| -`timestamp`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_half.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([37](#notes-37))| +`timestamp`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([37](#notes-37))| `to`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `toCompletionStage`|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([98](#notes-98))|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([98](#notes-98))|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| `toFlowable`|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([99](#notes-99))|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)| @@ -237,7 +237,7 @@ Operator | ![Flowable](https://raw.github.com/wiki/ReactiveX/RxJava/images/opmat `zip`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([108](#notes-108))| `zipArray`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([109](#notes-109))| `zipWith`|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![present](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png)|![absent](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) ([110](#notes-110))| -**237 operators** | **215** | **209** | **108** | **93** | **76** | +**237 operators** | **215** | **209** | **111** | **95** | **76** | #### Notes 1 Use [`contains()`](#contains).
@@ -372,25 +372,19 @@ Operator | ![Flowable](https://raw.github.com/wiki/ReactiveX/RxJava/images/opmat 17. Single.concatMapMaybe() 18. Maybe.concatMapSingle() 19. Single.concatMapSingle() -20. Maybe.dematerialize() -21. Maybe.doOnLifecycle() -22. Single.doOnLifecycle() -23. Completable.doOnLifecycle() -24. Single.mergeArray() -25. Single.mergeArrayDelayError() -26. Single.ofType() -27. Completable.onErrorReturn() -28. Completable.onErrorReturnItem() -29. Maybe.safeSubscribe() -30. Single.safeSubscribe() -31. Completable.safeSubscribe() -32. Completable.sequenceEqual() -33. Maybe.startWith() -34. Single.startWith() -35. Maybe.timeInterval() -36. Single.timeInterval() -37. Completable.timeInterval() -38. Maybe.timestamp() -39. Single.timestamp() -40. Maybe.toFuture() -41. Completable.toFuture() +20. Maybe.doOnLifecycle() +21. Single.doOnLifecycle() +22. Completable.doOnLifecycle() +23. Single.mergeArray() +24. Single.mergeArrayDelayError() +25. Single.ofType() +26. Completable.onErrorReturn() +27. Completable.onErrorReturnItem() +28. Maybe.safeSubscribe() +29. Single.safeSubscribe() +30. Completable.safeSubscribe() +31. Completable.sequenceEqual() +32. Maybe.startWith() +33. Single.startWith() +34. Maybe.toFuture() +35. Completable.toFuture() diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index 12ab3678e2..03e73abe65 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -34,7 +34,7 @@ import io.reactivex.rxjava3.internal.util.ErrorMode; import io.reactivex.rxjava3.observers.TestObserver; import io.reactivex.rxjava3.plugins.RxJavaPlugins; -import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.schedulers.*; /** * The {@code Maybe} class represents a deferred computation and emission of a single value, no value at all or an exception. @@ -4991,6 +4991,232 @@ public final Maybe takeUntil(@NonNull Publisher other) { return RxJavaPlugins.onAssembly(new MaybeTakeUntilPublisher<>(this, other)); } + /** + * Measures the time (in milliseconds) between the subscription and success item emission + * of the current {@code Maybe} and signals it as a tuple ({@link Timed}) + * success value. + *

+ * + *

+ * If the current {@code Maybe} is empty or fails, the resulting {@code Maybe} will + * pass along the signals to the downstream. To measure the time to termination, + * use {@link #materialize()} and apply {@link Single#timeInterval()}. + *

+ *
Scheduler:
+ *
{@code timeInterval} uses the {@code computation} {@link Scheduler} + * for determining the current time upon subscription and upon receiving the + * success item from the current {@code Maybe}.
+ *
+ * @return the new {@code Maybe} instance + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.COMPUTATION) + public final Maybe> timeInterval() { + return timeInterval(TimeUnit.MILLISECONDS, Schedulers.computation()); + } + + /** + * Measures the time (in milliseconds) between the subscription and success item emission + * of the current {@code Maybe} and signals it as a tuple ({@link Timed}) + * success value. + *

+ * + *

+ * If the current {@code Maybe} is empty or fails, the resulting {@code Maybe} will + * pass along the signals to the downstream. To measure the time to termination, + * use {@link #materialize()} and apply {@link Single#timeInterval(Scheduler)}. + *

+ *
Scheduler:
+ *
{@code timeInterval} uses the provided {@link Scheduler} + * for determining the current time upon subscription and upon receiving the + * success item from the current {@code Maybe}.
+ *
+ * @param scheduler the {@code Scheduler} used for providing the current time + * @return the new {@code Maybe} instance + * @throws NullPointerException if {@code scheduler} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.CUSTOM) + public final Maybe> timeInterval(@NonNull Scheduler scheduler) { + return timeInterval(TimeUnit.MILLISECONDS, scheduler); + } + + /** + * Measures the time between the subscription and success item emission + * of the current {@code Maybe} and signals it as a tuple ({@link Timed}) + * success value. + *

+ * + *

+ * If the current {@code Maybe} is empty or fails, the resulting {@code Maybe} will + * pass along the signals to the downstream. To measure the time to termination, + * use {@link #materialize()} and apply {@link Single#timeInterval(TimeUnit)}. + *

+ *
Scheduler:
+ *
{@code timeInterval} uses the {@code computation} {@link Scheduler} + * for determining the current time upon subscription and upon receiving the + * success item from the current {@code Maybe}.
+ *
+ * @param unit the time unit for measurement + * @return the new {@code Maybe} instance + * @throws NullPointerException if {@code unit} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.COMPUTATION) + public final Maybe> timeInterval(@NonNull TimeUnit unit) { + return timeInterval(unit, Schedulers.computation()); + } + + /** + * Measures the time between the subscription and success item emission + * of the current {@code Maybe} and signals it as a tuple ({@link Timed}) + * success value. + *

+ * + *

+ * If the current {@code Maybe} is empty or fails, the resulting {@code Maybe} will + * pass along the signals to the downstream. To measure the time to termination, + * use {@link #materialize()} and apply {@link Single#timeInterval(TimeUnit, Scheduler)}. + *

+ *
Scheduler:
+ *
{@code timeInterval} uses the provided {@link Scheduler} + * for determining the current time upon subscription and upon receiving the + * success item from the current {@code Maybe}.
+ *
+ * @param unit the time unit for measurement + * @param scheduler the {@code Scheduler} used for providing the current time + * @return the new {@code Maybe} instance + * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.CUSTOM) + public final Maybe> timeInterval(@NonNull TimeUnit unit, @NonNull Scheduler scheduler) { + Objects.requireNonNull(unit, "unit is null"); + Objects.requireNonNull(scheduler, "scheduler is null"); + return RxJavaPlugins.onAssembly(new MaybeTimeInterval<>(this, unit, scheduler, true)); + } + + /** + * Combines the success value from the current {@code Maybe} with the current time (in milliseconds) of + * its reception, using the {@code computation} {@link Scheduler} as time source, + * then signals them as a {@link Timed} instance. + *

+ * + *

+ * If the current {@code Maybe} is empty or fails, the resulting {@code Maybe} will + * pass along the signals to the downstream. To measure the time to termination, + * use {@link #materialize()} and apply {@link Single#timestamp()}. + *

+ *
Scheduler:
+ *
{@code timestamp} uses the {@code computation} {@code Scheduler} + * for determining the current time upon receiving the + * success item from the current {@code Maybe}.
+ *
+ * @return the new {@code Maybe} instance + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.COMPUTATION) + public final Maybe> timestamp() { + return timestamp(TimeUnit.MILLISECONDS, Schedulers.computation()); + } + + /** + * Combines the success value from the current {@code Maybe} with the current time (in milliseconds) of + * its reception, using the given {@link Scheduler} as time source, + * then signals them as a {@link Timed} instance. + *

+ * + *

+ * If the current {@code Maybe} is empty or fails, the resulting {@code Maybe} will + * pass along the signals to the downstream. To measure the time to termination, + * use {@link #materialize()} and apply {@link Single#timestamp(Scheduler)}. + *

+ *
Scheduler:
+ *
{@code timestamp} uses the provided {@code Scheduler} + * for determining the current time upon receiving the + * success item from the current {@code Maybe}.
+ *
+ * @param scheduler the {@code Scheduler} used for providing the current time + * @return the new {@code Maybe} instance + * @throws NullPointerException if {@code scheduler} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.CUSTOM) + public final Maybe> timestamp(@NonNull Scheduler scheduler) { + return timestamp(TimeUnit.MILLISECONDS, scheduler); + } + + /** + * Combines the success value from the current {@code Maybe} with the current time of + * its reception, using the {@code computation} {@link Scheduler} as time source, + * then signals it as a {@link Timed} instance. + *

+ * + *

+ * If the current {@code Maybe} is empty or fails, the resulting {@code Maybe} will + * pass along the signals to the downstream. To measure the time to termination, + * use {@link #materialize()} and apply {@link Single#timestamp(TimeUnit)}. + *

+ *
Scheduler:
+ *
{@code timestamp} uses the {@code computation} {@code Scheduler}, + * for determining the current time upon receiving the + * success item from the current {@code Maybe}.
+ *
+ * @param unit the time unit for measurement + * @return the new {@code Maybe} instance + * @throws NullPointerException if {@code unit} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.COMPUTATION) + public final Maybe> timestamp(@NonNull TimeUnit unit) { + return timestamp(unit, Schedulers.computation()); + } + + /** + * Combines the success value from the current {@code Maybe} with the current time of + * its reception, using the given {@link Scheduler} as time source, + * then signals it as a {@link Timed} instance. + *

+ * + *

+ * If the current {@code Maybe} is empty or fails, the resulting {@code Maybe} will + * pass along the signals to the downstream. To measure the time to termination, + * use {@link #materialize()} and apply {@link Single#timestamp(TimeUnit, Scheduler)}. + *

+ *
Scheduler:
+ *
{@code timestamp} uses the provided {@code Scheduler}, + * which is used for determining the current time upon receiving the + * success item from the current {@code Maybe}.
+ *
+ * @param unit the time unit for measurement + * @param scheduler the {@code Scheduler} used for providing the current time + * @return the new {@code Maybe} instance + * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.CUSTOM) + public final Maybe> timestamp(@NonNull TimeUnit unit, @NonNull Scheduler scheduler) { + Objects.requireNonNull(unit, "unit is null"); + Objects.requireNonNull(scheduler, "scheduler is null"); + return RxJavaPlugins.onAssembly(new MaybeTimeInterval<>(this, unit, scheduler, false)); + } + /** * Returns a {@code Maybe} that mirrors the current {@code Maybe} but applies a timeout policy for each emitted * item. If the next item isn't emitted within the specified timeout duration starting from its predecessor, diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java index a3b536b1f7..697c1355b5 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Single.java +++ b/src/main/java/io/reactivex/rxjava3/core/Single.java @@ -36,7 +36,7 @@ import io.reactivex.rxjava3.internal.util.ErrorMode; import io.reactivex.rxjava3.observers.TestObserver; import io.reactivex.rxjava3.plugins.RxJavaPlugins; -import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.schedulers.*; /** * The {@code Single} class implements the Reactive Pattern for a single value response. @@ -4137,6 +4137,232 @@ public final Single subscribeOn(@NonNull Scheduler scheduler) { return RxJavaPlugins.onAssembly(new SingleSubscribeOn<>(this, scheduler)); } + /** + * Measures the time (in milliseconds) between the subscription and success item emission + * of the current {@code Single} and signals it as a tuple ({@link Timed}) + * success value. + *

+ * + *

+ * If the current {@code Single} fails, the resulting {@code Single} will + * pass along the signal to the downstream. To measure the time to error, + * use {@link #materialize()} and apply {@link #timeInterval()}. + *

+ *
Scheduler:
+ *
{@code timeInterval} uses the {@code computation} {@link Scheduler} + * for determining the current time upon subscription and upon receiving the + * success item from the current {@code Single}.
+ *
+ * @return the new {@code Single} instance + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.COMPUTATION) + public final Single> timeInterval() { + return timeInterval(TimeUnit.MILLISECONDS, Schedulers.computation()); + } + + /** + * Measures the time (in milliseconds) between the subscription and success item emission + * of the current {@code Single} and signals it as a tuple ({@link Timed}) + * success value. + *

+ * + *

+ * If the current {@code Single} fails, the resulting {@code Single} will + * pass along the signal to the downstream. To measure the time to error, + * use {@link #materialize()} and apply {@link #timeInterval(Scheduler)}. + *

+ *
Scheduler:
+ *
{@code timeInterval} uses the provided {@link Scheduler} + * for determining the current time upon subscription and upon receiving the + * success item from the current {@code Single}.
+ *
+ * @param scheduler the {@code Scheduler} used for providing the current time + * @return the new {@code Single} instance + * @throws NullPointerException if {@code scheduler} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.CUSTOM) + public final Single> timeInterval(@NonNull Scheduler scheduler) { + return timeInterval(TimeUnit.MILLISECONDS, scheduler); + } + + /** + * Measures the time between the subscription and success item emission + * of the current {@code Single} and signals it as a tuple ({@link Timed}) + * success value. + *

+ * + *

+ * If the current {@code Single} fails, the resulting {@code Single} will + * pass along the signals to the downstream. To measure the time to error, + * use {@link #materialize()} and apply {@link #timeInterval(TimeUnit, Scheduler)}. + *

+ *
Scheduler:
+ *
{@code timeInterval} uses the {@code computation} {@link Scheduler} + * for determining the current time upon subscription and upon receiving the + * success item from the current {@code Single}.
+ *
+ * @param unit the time unit for measurement + * @return the new {@code Single} instance + * @throws NullPointerException if {@code unit} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.COMPUTATION) + public final Single> timeInterval(@NonNull TimeUnit unit) { + return timeInterval(unit, Schedulers.computation()); + } + + /** + * Measures the time between the subscription and success item emission + * of the current {@code Single} and signals it as a tuple ({@link Timed}) + * success value. + *

+ * + *

+ * If the current {@code Single} is empty or fails, the resulting {@code Single} will + * pass along the signals to the downstream. To measure the time to termination, + * use {@link #materialize()} and apply {@link #timeInterval(TimeUnit, Scheduler)}. + *

+ *
Scheduler:
+ *
{@code timeInterval} uses the provided {@link Scheduler} + * for determining the current time upon subscription and upon receiving the + * success item from the current {@code Single}.
+ *
+ * @param unit the time unit for measurement + * @param scheduler the {@code Scheduler} used for providing the current time + * @return the new {@code Single} instance + * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.CUSTOM) + public final Single> timeInterval(@NonNull TimeUnit unit, @NonNull Scheduler scheduler) { + Objects.requireNonNull(unit, "unit is null"); + Objects.requireNonNull(scheduler, "scheduler is null"); + return RxJavaPlugins.onAssembly(new SingleTimeInterval<>(this, unit, scheduler, true)); + } + + /** + * Combines the success value from the current {@code Single} with the current time (in milliseconds) of + * its reception, using the {@code computation} {@link Scheduler} as time source, + * then signals them as a {@link Timed} instance. + *

+ * + *

+ * If the current {@code Single} is empty or fails, the resulting {@code Single} will + * pass along the signals to the downstream. To get the timestamp of the error, + * use {@link #materialize()} and apply {@link #timestamp()}. + *

+ *
Scheduler:
+ *
{@code timestamp} uses the {@code computation} {@code Scheduler} + * for determining the current time upon receiving the + * success item from the current {@code Single}.
+ *
+ * @return the new {@code Single} instance + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.COMPUTATION) + public final Single> timestamp() { + return timestamp(TimeUnit.MILLISECONDS, Schedulers.computation()); + } + + /** + * Combines the success value from the current {@code Single} with the current time (in milliseconds) of + * its reception, using the given {@link Scheduler} as time source, + * then signals them as a {@link Timed} instance. + *

+ * + *

+ * If the current {@code Single} is empty or fails, the resulting {@code Single} will + * pass along the signals to the downstream. To get the timestamp of the error, + * use {@link #materialize()} and apply {@link #timestamp(Scheduler)}. + *

+ *
Scheduler:
+ *
{@code timestamp} uses the provided {@code Scheduler} + * for determining the current time upon receiving the + * success item from the current {@code Single}.
+ *
+ * @param scheduler the {@code Scheduler} used for providing the current time + * @return the new {@code Single} instance + * @throws NullPointerException if {@code scheduler} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.CUSTOM) + public final Single> timestamp(@NonNull Scheduler scheduler) { + return timestamp(TimeUnit.MILLISECONDS, scheduler); + } + + /** + * Combines the success value from the current {@code Single} with the current time of + * its reception, using the {@code computation} {@link Scheduler} as time source, + * then signals it as a {@link Timed} instance. + *

+ * + *

+ * If the current {@code Single} is empty or fails, the resulting {@code Single} will + * pass along the signals to the downstream. To get the timestamp of the error, + * use {@link #materialize()} and apply {@link #timestamp(TimeUnit)}. + *

+ *
Scheduler:
+ *
{@code timestamp} uses the {@code computation} {@code Scheduler}, + * for determining the current time upon receiving the + * success item from the current {@code Single}.
+ *
+ * @param unit the time unit for measurement + * @return the new {@code Single} instance + * @throws NullPointerException if {@code unit} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.COMPUTATION) + public final Single> timestamp(@NonNull TimeUnit unit) { + return timestamp(unit, Schedulers.computation()); + } + + /** + * Combines the success value from the current {@code Single} with the current time of + * its reception, using the given {@link Scheduler} as time source, + * then signals it as a {@link Timed} instance. + *

+ * + *

+ * If the current {@code Single} is empty or fails, the resulting {@code Single} will + * pass along the signals to the downstream. To get the timestamp of the error, + * use {@link #materialize()} and apply {@link #timestamp(TimeUnit, Scheduler)}. + *

+ *
Scheduler:
+ *
{@code timestamp} uses the provided {@code Scheduler}, + * which is used for determining the current time upon receiving the + * success item from the current {@code Single}.
+ *
+ * @param unit the time unit for measurement + * @param scheduler the {@code Scheduler} used for providing the current time + * @return the new {@code Single} instance + * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} + * @since 3.0.0 + */ + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.CUSTOM) + public final Single> timestamp(@NonNull TimeUnit unit, @NonNull Scheduler scheduler) { + Objects.requireNonNull(unit, "unit is null"); + Objects.requireNonNull(scheduler, "scheduler is null"); + return RxJavaPlugins.onAssembly(new SingleTimeInterval<>(this, unit, scheduler, false)); + } + /** * Returns a {@code Single} that emits the item emitted by the current {@code Single} until a {@link CompletableSource} terminates. Upon * termination of {@code other}, this will emit a {@link CancellationException} rather than go to diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimeInterval.java b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimeInterval.java new file mode 100644 index 0000000000..af30ee3a1d --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimeInterval.java @@ -0,0 +1,105 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.maybe; + +import java.util.concurrent.TimeUnit; + +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.internal.disposables.DisposableHelper; +import io.reactivex.rxjava3.schedulers.Timed; + +/** + * Measures the time between subscription and the success item emission + * from the upstream and emits this as a {@link Timed} success value. + * @param the element type of the sequence + * @since 3.0.0 + */ +public final class MaybeTimeInterval extends Maybe> { + + final MaybeSource source; + + final TimeUnit unit; + + final Scheduler scheduler; + + final boolean start; + + public MaybeTimeInterval(MaybeSource source, TimeUnit unit, Scheduler scheduler, boolean start) { + this.source = source; + this.unit = unit; + this.scheduler = scheduler; + this.start = start; + } + + @Override + protected void subscribeActual(@NonNull MaybeObserver> observer) { + source.subscribe(new TimeIntervalMaybeObserver<>(observer, unit, scheduler, start)); + } + + static final class TimeIntervalMaybeObserver implements MaybeObserver, Disposable { + + final MaybeObserver> downstream; + + final TimeUnit unit; + + final Scheduler scheduler; + + final long startTime; + + Disposable upstream; + + TimeIntervalMaybeObserver(MaybeObserver> downstream, TimeUnit unit, Scheduler scheduler, boolean start) { + this.downstream = downstream; + this.unit = unit; + this.scheduler = scheduler; + this.startTime = start ? scheduler.now(unit) : 0L; + } + + @Override + public void onSubscribe(@NonNull Disposable d) { + if (DisposableHelper.validate(this.upstream, d)) { + this.upstream = d; + + downstream.onSubscribe(this); + } + } + + @Override + public void onSuccess(@NonNull T t) { + downstream.onSuccess(new Timed<>(t, scheduler.now(unit) - startTime, unit)); + } + + @Override + public void onError(@NonNull Throwable e) { + downstream.onError(e); + } + + @Override + public void onComplete() { + downstream.onComplete(); + } + + @Override + public void dispose() { + upstream.dispose(); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeInterval.java b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeInterval.java new file mode 100644 index 0000000000..2eac3bbc3a --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeInterval.java @@ -0,0 +1,100 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.single; + +import java.util.concurrent.TimeUnit; + +import io.reactivex.rxjava3.annotations.NonNull; +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.internal.disposables.DisposableHelper; +import io.reactivex.rxjava3.schedulers.Timed; + +/** + * Measures the time between subscription and the success item emission + * from the upstream and emits this as a {@link Timed} success value. + * @param the element type of the sequence + * @since 3.0.0 + */ +public final class SingleTimeInterval extends Single> { + + final SingleSource source; + + final TimeUnit unit; + + final Scheduler scheduler; + + final boolean start; + + public SingleTimeInterval(SingleSource source, TimeUnit unit, Scheduler scheduler, boolean start) { + this.source = source; + this.unit = unit; + this.scheduler = scheduler; + this.start = start; + } + + @Override + protected void subscribeActual(@NonNull SingleObserver> observer) { + source.subscribe(new TimeIntervalSingleObserver<>(observer, unit, scheduler, start)); + } + + static final class TimeIntervalSingleObserver implements SingleObserver, Disposable { + + final SingleObserver> downstream; + + final TimeUnit unit; + + final Scheduler scheduler; + + final long startTime; + + Disposable upstream; + + TimeIntervalSingleObserver(SingleObserver> downstream, TimeUnit unit, Scheduler scheduler, boolean start) { + this.downstream = downstream; + this.unit = unit; + this.scheduler = scheduler; + this.startTime = start ? scheduler.now(unit) : 0L; + } + + @Override + public void onSubscribe(@NonNull Disposable d) { + if (DisposableHelper.validate(this.upstream, d)) { + this.upstream = d; + + downstream.onSubscribe(this); + } + } + + @Override + public void onSuccess(@NonNull T t) { + downstream.onSuccess(new Timed<>(t, scheduler.now(unit) - startTime, unit)); + } + + @Override + public void onError(@NonNull Throwable e) { + downstream.onError(e); + } + + @Override + public void dispose() { + upstream.dispose(); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + } +} diff --git a/src/main/java/io/reactivex/rxjava3/schedulers/Timed.java b/src/main/java/io/reactivex/rxjava3/schedulers/Timed.java index 814b223dcf..a7b6cc46e7 100644 --- a/src/main/java/io/reactivex/rxjava3/schedulers/Timed.java +++ b/src/main/java/io/reactivex/rxjava3/schedulers/Timed.java @@ -33,10 +33,10 @@ public final class Timed { * @param value the value to hold * @param time the time to hold * @param unit the time unit, not null - * @throws NullPointerException if unit is {@code null} + * @throws NullPointerException if {@code value} or {@code unit} is {@code null} */ public Timed(@NonNull T value, long time, @NonNull TimeUnit unit) { - this.value = value; + this.value = Objects.requireNonNull(value, "value is null"); this.time = time; this.unit = Objects.requireNonNull(unit, "unit is null"); } @@ -89,7 +89,7 @@ public boolean equals(Object other) { @Override public int hashCode() { - int h = value != null ? value.hashCode() : 0; + int h = value.hashCode(); h = h * 31 + (int)((time >>> 31) ^ time); h = h * 31 + unit.hashCode(); return h; diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimeIntervalTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimeIntervalTest.java new file mode 100644 index 0000000000..30b770aed4 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimeIntervalTest.java @@ -0,0 +1,111 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.maybe; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.schedulers.*; +import io.reactivex.rxjava3.subjects.MaybeSubject; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class MaybeTimeIntervalTest { + + @Test + public void just() { + Maybe.just(1) + .timeInterval() + .test() + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void empty() { + Maybe.empty() + .timeInterval() + .test() + .assertResult(); + } + + @Test + public void error() { + Maybe.error(new TestException()) + .timeInterval() + .test() + .assertFailure(TestException.class); + } + + @Test + public void justSeconds() { + Maybe.just(1) + .timeInterval(TimeUnit.SECONDS) + .test() + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void justScheduler() { + Maybe.just(1) + .timeInterval(Schedulers.single()) + .test() + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void justSecondsScheduler() { + Maybe.just(1) + .timeInterval(TimeUnit.SECONDS, Schedulers.single()) + .test() + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybe(m -> m.timeInterval()); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(MaybeSubject.create().timeInterval()); + } + + @Test + public void timeInfo() { + TestScheduler scheduler = new TestScheduler(); + + MaybeSubject ms = MaybeSubject.create(); + + TestObserver> to = ms + .timeInterval(scheduler) + .test(); + + scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS); + + ms.onSuccess(1); + + to.assertResult(new Timed<>(1, 1000L, TimeUnit.MILLISECONDS)); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimestampTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimestampTest.java new file mode 100644 index 0000000000..1ce230f8d0 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimestampTest.java @@ -0,0 +1,111 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.maybe; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.schedulers.*; +import io.reactivex.rxjava3.subjects.MaybeSubject; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class MaybeTimestampTest { + + @Test + public void just() { + Maybe.just(1) + .timestamp() + .test() + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void empty() { + Maybe.empty() + .timestamp() + .test() + .assertResult(); + } + + @Test + public void error() { + Maybe.error(new TestException()) + .timestamp() + .test() + .assertFailure(TestException.class); + } + + @Test + public void justSeconds() { + Maybe.just(1) + .timestamp(TimeUnit.SECONDS) + .test() + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void justScheduler() { + Maybe.just(1) + .timestamp(Schedulers.single()) + .test() + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void justSecondsScheduler() { + Maybe.just(1) + .timestamp(TimeUnit.SECONDS, Schedulers.single()) + .test() + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybe(m -> m.timestamp()); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(MaybeSubject.create().timestamp()); + } + + @Test + public void timeInfo() { + TestScheduler scheduler = new TestScheduler(); + + MaybeSubject ms = MaybeSubject.create(); + + TestObserver> to = ms + .timestamp(scheduler) + .test(); + + scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS); + + ms.onSuccess(1); + + to.assertResult(new Timed<>(1, 1000L, TimeUnit.MILLISECONDS)); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeIntervalTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeIntervalTest.java new file mode 100644 index 0000000000..09dd26eeba --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeIntervalTest.java @@ -0,0 +1,103 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.single; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.schedulers.*; +import io.reactivex.rxjava3.subjects.SingleSubject; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class SingleTimeIntervalTest { + + @Test + public void just() { + Single.just(1) + .timestamp() + .test() + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void error() { + Single.error(new TestException()) + .timestamp() + .test() + .assertFailure(TestException.class); + } + + @Test + public void justSeconds() { + Single.just(1) + .timestamp(TimeUnit.SECONDS) + .test() + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void justScheduler() { + Single.just(1) + .timestamp(Schedulers.single()) + .test() + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void justSecondsScheduler() { + Single.just(1) + .timestamp(TimeUnit.SECONDS, Schedulers.single()) + .test() + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingle(m -> m.timestamp()); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(SingleSubject.create().timestamp()); + } + + @Test + public void timeInfo() { + TestScheduler scheduler = new TestScheduler(); + + SingleSubject ss = SingleSubject.create(); + + TestObserver> to = ss + .timestamp(scheduler) + .test(); + + scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS); + + ss.onSuccess(1); + + to.assertResult(new Timed<>(1, 1000L, TimeUnit.MILLISECONDS)); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimestampTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimestampTest.java new file mode 100644 index 0000000000..a282c0345e --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimestampTest.java @@ -0,0 +1,103 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.single; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.exceptions.TestException; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.schedulers.*; +import io.reactivex.rxjava3.subjects.SingleSubject; +import io.reactivex.rxjava3.testsupport.TestHelper; + +public class SingleTimestampTest { + + @Test + public void just() { + Single.just(1) + .timeInterval() + .test() + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void error() { + Single.error(new TestException()) + .timeInterval() + .test() + .assertFailure(TestException.class); + } + + @Test + public void justSeconds() { + Single.just(1) + .timeInterval(TimeUnit.SECONDS) + .test() + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void justScheduler() { + Single.just(1) + .timeInterval(Schedulers.single()) + .test() + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void justSecondsScheduler() { + Single.just(1) + .timeInterval(TimeUnit.SECONDS, Schedulers.single()) + .test() + .assertValueCount(1) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingle(m -> m.timeInterval()); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(SingleSubject.create().timeInterval()); + } + + @Test + public void timeInfo() { + TestScheduler scheduler = new TestScheduler(); + + SingleSubject ss = SingleSubject.create(); + + TestObserver> to = ss + .timeInterval(scheduler) + .test(); + + scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS); + + ss.onSuccess(1); + + to.assertResult(new Timed<>(1, 1000L, TimeUnit.MILLISECONDS)); + } +} diff --git a/src/test/java/io/reactivex/rxjava3/internal/util/OperatorMatrixGenerator.java b/src/test/java/io/reactivex/rxjava3/internal/util/OperatorMatrixGenerator.java index 60b5a11fb6..a0ba3e52e4 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/util/OperatorMatrixGenerator.java +++ b/src/test/java/io/reactivex/rxjava3/internal/util/OperatorMatrixGenerator.java @@ -458,6 +458,7 @@ static String findNotes(String clazzName, String operatorName) { " C throttleLatest Always empty thus no items to work with.", " MS throttleWithTimeout At most one item signaled so no subsequent items to work with.", " C throttleWithTimeout Always empty thus no items to work with.", + " C timeInterval Always empty thus no items to work with.", " C timestamp Always empty thus no items to work with.", "FO toCompletionStage Use [`firstStage`](#firstStage), [`lastStage`](#lastStage) or [`singleStage`](#singleStage).", "F toFlowable Would be no-op.", diff --git a/src/test/java/io/reactivex/rxjava3/schedulers/TimedTest.java b/src/test/java/io/reactivex/rxjava3/schedulers/TimedTest.java index d7a5eb752e..ce117c2fb7 100644 --- a/src/test/java/io/reactivex/rxjava3/schedulers/TimedTest.java +++ b/src/test/java/io/reactivex/rxjava3/schedulers/TimedTest.java @@ -39,7 +39,7 @@ public void hashCodeOf() { assertEquals(TimeUnit.SECONDS.hashCode() + 31 * (5 + 31 * 1), t1.hashCode()); - Timed t2 = new Timed<>(null, 5, TimeUnit.SECONDS); + Timed t2 = new Timed<>(0, 5, TimeUnit.SECONDS); assertEquals(TimeUnit.SECONDS.hashCode() + 31 * (5 + 31 * 0), t2.hashCode()); } diff --git a/src/test/java/io/reactivex/rxjava3/validators/JavadocWording.java b/src/test/java/io/reactivex/rxjava3/validators/JavadocWording.java index d351e7809d..e22fdd9521 100644 --- a/src/test/java/io/reactivex/rxjava3/validators/JavadocWording.java +++ b/src/test/java/io/reactivex/rxjava3/validators/JavadocWording.java @@ -153,12 +153,12 @@ public void maybeDocRefersToMaybeTypes() throws Exception { jdx = 0; for (;;) { int idx = m.javadoc.indexOf("Single", jdx); - if (idx >= 0) { + if (idx >= 0 && m.javadoc.indexOf("Single#", jdx) != idx) { int j = m.javadoc.indexOf("#toSingle", jdx); int k = m.javadoc.indexOf("{@code Single", jdx); if (!m.signature.contains("Single") && (j + 3 != idx && k + 7 != idx)) { e.append("java.lang.RuntimeException: Maybe doc mentions Single but not in the signature\r\n at io.reactivex.rxjava3.core.") - .append("Maybe(Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + .append("Maybe.method(Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); } jdx = idx + 6; } else {