diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 5c2eeebdc7..def87d400b 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -8843,57 +8843,6 @@ public final Flowable delaySubscription(long delay, TimeUnit unit, Scheduler return delaySubscription(timer(delay, unit, scheduler)); } - /** - * Returns a Flowable that reverses the effect of {@link #materialize materialize} by transforming the - * {@link Notification} objects emitted by the source Publisher into the items or notifications they - * represent. - *

- * - *

- * When the upstream signals an {@link Notification#createOnError(Throwable) onError} or - * {@link Notification#createOnComplete() onComplete} item, the - * returned Flowable cancels the flow and terminates with that type of terminal event: - *


-     * Flowable.just(createOnNext(1), createOnComplete(), createOnNext(2))
-     * .doOnCancel(() -> System.out.println("Cancelled!"));
-     * .dematerialize()
-     * .test()
-     * .assertResult(1);
-     * 
- * If the upstream signals {@code onError} or {@code onComplete} directly, the flow is terminated - * with the same event. - *

-     * Flowable.just(createOnNext(1), createOnNext(2))
-     * .dematerialize()
-     * .test()
-     * .assertResult(1, 2);
-     * 
- * If this behavior is not desired, the completion can be suppressed by applying {@link #concatWith(Publisher)} - * with a {@link #never()} source. - *
- *
Backpressure:
- *
The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s - * backpressure behavior.
- *
Scheduler:
- *
{@code dematerialize} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param the output value type - * @return a Flowable that emits the items and notifications embedded in the {@link Notification} objects - * emitted by the source Publisher - * @see ReactiveX operators documentation: Dematerialize - * @see #dematerialize(Function) - * @deprecated in 2.2.4; inherently type-unsafe as it overrides the output generic type. Use {@link #dematerialize(Function)} instead. - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.NONE) - @BackpressureSupport(BackpressureKind.PASS_THROUGH) - @Deprecated - @SuppressWarnings({ "unchecked", "rawtypes" }) - public final Flowable dematerialize() { - return RxJavaPlugins.onAssembly(new FlowableDematerialize(this, Functions.identity())); - } - /** * Returns a Flowable that reverses the effect of {@link #materialize materialize} by transforming the * {@link Notification} objects extracted from the source items via a selector function @@ -13234,51 +13183,6 @@ public final Flowable replay(Function, ? extends Publ FlowableInternalHelper.replaySupplier(this, bufferSize, time, unit, scheduler, eagerTruncate), selector); } - /** - * Returns a Flowable that emits items that are the results of invoking a specified selector on items - * emitted by a {@link ConnectableFlowable} that shares a single subscription to the source Publisher, - * replaying a maximum of {@code bufferSize} items. - *

- * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than - * {@code bufferSize} source emissions. - *

- * - *

- *
Backpressure:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Publisher sequence.
- *
Scheduler:
- *
You specify which {@link Scheduler} this operator will use.
- *
- * - * @param - * the type of items emitted by the resulting Publisher - * @param selector - * a selector function, which can use the multicasted sequence as many times as needed, without - * causing multiple subscriptions to the Publisher - * @param bufferSize - * the buffer size that limits the number of items the connectable Publisher can replay - * @param scheduler - * the Scheduler on which the replay is observed - * @return a Flowable that emits items that are the results of invoking the selector on items emitted by - * a {@link ConnectableFlowable} that shares a single subscription to the source Publisher, - * replaying no more than {@code bufferSize} notifications - * @see ReactiveX operators documentation: Replay - */ - @CheckReturnValue - @NonNull - @BackpressureSupport(BackpressureKind.FULL) - @SchedulerSupport(SchedulerSupport.CUSTOM) - public final Flowable replay(final Function, ? extends Publisher> selector, final int bufferSize, final Scheduler scheduler) { - ObjectHelper.requireNonNull(selector, "selector is null"); - ObjectHelper.requireNonNull(scheduler, "scheduler is null"); - ObjectHelper.verifyPositive(bufferSize, "bufferSize"); - return FlowableReplay.multicastSelector(FlowableInternalHelper.replaySupplier(this, bufferSize, false), - FlowableInternalHelper.replayFunction(selector, scheduler) - ); - } - /** * Returns a Flowable that emits items that are the results of invoking a specified selector on items * emitted by a {@link ConnectableFlowable} that shares a single subscription to the source Publisher, @@ -13403,43 +13307,6 @@ public final Flowable replay(Function, ? extends Publ return FlowableReplay.multicastSelector(FlowableInternalHelper.replaySupplier(this, time, unit, scheduler, eagerTruncate), selector); } - /** - * Returns a Flowable that emits items that are the results of invoking a specified selector on items - * emitted by a {@link ConnectableFlowable} that shares a single subscription to the source Publisher. - *

- * - *

- *
Backpressure:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Publisher sequence.
- *
Scheduler:
- *
You specify which {@link Scheduler} this operator will use.
- *
- * - * @param - * the type of items emitted by the resulting Publisher - * @param selector - * a selector function, which can use the multicasted sequence as many times as needed, without - * causing multiple subscriptions to the Publisher - * @param scheduler - * the Scheduler where the replay is observed - * @return a Flowable that emits items that are the results of invoking the selector on items emitted by - * a {@link ConnectableFlowable} that shares a single subscription to the source Publisher, - * replaying all items - * @see ReactiveX operators documentation: Replay - */ - @CheckReturnValue - @NonNull - @BackpressureSupport(BackpressureKind.FULL) - @SchedulerSupport(SchedulerSupport.CUSTOM) - public final Flowable replay(final Function, ? extends Publisher> selector, final Scheduler scheduler) { - ObjectHelper.requireNonNull(selector, "selector is null"); - ObjectHelper.requireNonNull(scheduler, "scheduler is null"); - return FlowableReplay.multicastSelector(FlowableInternalHelper.replaySupplier(this), - FlowableInternalHelper.replayFunction(selector, scheduler)); - } - /** * Returns a {@link ConnectableFlowable} that shares a single subscription to the source Publisher that * replays at most {@code bufferSize} items emitted by that Publisher. A Connectable Publisher resembles @@ -13651,41 +13518,6 @@ public final ConnectableFlowable replay(final int bufferSize, final long time return FlowableReplay.create(this, time, unit, scheduler, bufferSize, eagerTruncate); } - /** - * Returns a {@link ConnectableFlowable} that shares a single subscription to the source Publisher and - * replays at most {@code bufferSize} items emitted by that Publisher. A Connectable Publisher resembles - * an ordinary Publisher, except that it does not begin emitting items when it is subscribed to, but only - * when its {@code connect} method is called. - *

- * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than - * {@code bufferSize} source emissions. - *

- * - *

- *
Backpressure:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Publisher sequence.
- *
Scheduler:
- *
You specify which {@link Scheduler} this operator will use.
- *
- * - * @param bufferSize - * the buffer size that limits the number of items that can be replayed - * @param scheduler - * the scheduler on which the Subscribers will observe the emitted items - * @return a {@link ConnectableFlowable} that shares a single subscription to the source Publisher and - * replays at most {@code bufferSize} items that were emitted by the Publisher - * @see ReactiveX operators documentation: Replay - */ - @CheckReturnValue - @BackpressureSupport(BackpressureKind.FULL) - @SchedulerSupport(SchedulerSupport.CUSTOM) - public final ConnectableFlowable replay(final int bufferSize, final Scheduler scheduler) { - ObjectHelper.requireNonNull(scheduler, "scheduler is null"); - return FlowableReplay.observeOn(replay(bufferSize), scheduler); - } - /** * Returns a {@link ConnectableFlowable} that shares a single subscription to the source Publisher and * replays all items emitted by that Publisher within a specified time window. A Connectable Publisher @@ -13800,37 +13632,6 @@ public final ConnectableFlowable replay(final long time, final TimeUnit unit, return FlowableReplay.create(this, time, unit, scheduler, eagerTruncate); } - /** - * Returns a {@link ConnectableFlowable} that shares a single subscription to the source Publisher that - * will replay all of its items and notifications to any future {@link Subscriber} on the given - * {@link Scheduler}. A Connectable Publisher resembles an ordinary Publisher, except that it does not - * begin emitting items when it is subscribed to, but only when its {@code connect} method is called. - *

- * - *

- *
Backpressure:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Publisher sequence.
- *
Scheduler:
- *
You specify which {@link Scheduler} this operator will use.
- *
- * - * @param scheduler - * the Scheduler on which the Subscribers will observe the emitted items - * @return a {@link ConnectableFlowable} that shares a single subscription to the source Publisher that - * will replay all of its items and notifications to any future {@link Subscriber} on the given - * {@link Scheduler} - * @see ReactiveX operators documentation: Replay - */ - @CheckReturnValue - @BackpressureSupport(BackpressureKind.FULL) - @SchedulerSupport(SchedulerSupport.CUSTOM) - public final ConnectableFlowable replay(final Scheduler scheduler) { - ObjectHelper.requireNonNull(scheduler, "scheduler is null"); - return FlowableReplay.observeOn(replay(), scheduler); - } - /** * Returns a Flowable that mirrors the source Publisher, resubscribing to it if it calls {@code onError} * (infinite retry count). diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index c42458f153..e7f0f694ab 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -7784,53 +7784,6 @@ public final Observable delaySubscription(long delay, TimeUnit unit, Schedule return delaySubscription(timer(delay, unit, scheduler)); } - /** - * Returns an Observable that reverses the effect of {@link #materialize materialize} by transforming the - * {@link Notification} objects emitted by the source ObservableSource into the items or notifications they - * represent. - *

- * - *

- * When the upstream signals an {@link Notification#createOnError(Throwable) onError} or - * {@link Notification#createOnComplete() onComplete} item, the - * returned Observable disposes of the flow and terminates with that type of terminal event: - *


-     * Observable.just(createOnNext(1), createOnComplete(), createOnNext(2))
-     * .doOnDispose(() -> System.out.println("Disposed!"));
-     * .dematerialize()
-     * .test()
-     * .assertResult(1);
-     * 
- * If the upstream signals {@code onError} or {@code onComplete} directly, the flow is terminated - * with the same event. - *

-     * Observable.just(createOnNext(1), createOnNext(2))
-     * .dematerialize()
-     * .test()
-     * .assertResult(1, 2);
-     * 
- * If this behavior is not desired, the completion can be suppressed by applying {@link #concatWith(ObservableSource)} - * with a {@link #never()} source. - *
- *
Scheduler:
- *
{@code dematerialize} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param the output value type - * @return an Observable that emits the items and notifications embedded in the {@link Notification} objects - * emitted by the source ObservableSource - * @see ReactiveX operators documentation: Dematerialize - * @see #dematerialize(Function) - * @deprecated in 2.2.4; inherently type-unsafe as it overrides the output generic type. Use {@link #dematerialize(Function)} instead. - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.NONE) - @Deprecated - @SuppressWarnings({ "unchecked", "rawtypes" }) - public final Observable dematerialize() { - return RxJavaPlugins.onAssembly(new ObservableDematerialize(this, Functions.identity())); - } - /** * Returns an Observable that reverses the effect of {@link #materialize materialize} by transforming the * {@link Notification} objects extracted from the source items via a selector function @@ -10858,44 +10811,6 @@ public final Observable replay(Function, ? extends ObservableInternalHelper.replaySupplier(this, bufferSize, time, unit, scheduler, eagerTruncate), selector); } - /** - * Returns an Observable that emits items that are the results of invoking a specified selector on items - * emitted by a {@link ConnectableObservable} that shares a single subscription to the source ObservableSource, - * replaying a maximum of {@code bufferSize} items. - *

- * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than - * {@code bufferSize} source emissions. - *

- * - *

- *
Scheduler:
- *
You specify which {@link Scheduler} this operator will use.
- *
- * - * @param - * the type of items emitted by the resulting ObservableSource - * @param selector - * a selector function, which can use the multicasted sequence as many times as needed, without - * causing multiple subscriptions to the ObservableSource - * @param bufferSize - * the buffer size that limits the number of items the connectable ObservableSource can replay - * @param scheduler - * the Scheduler on which the replay is observed - * @return an Observable that emits items that are the results of invoking the selector on items emitted by - * a {@link ConnectableObservable} that shares a single subscription to the source ObservableSource, - * replaying no more than {@code bufferSize} notifications - * @see ReactiveX operators documentation: Replay - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.CUSTOM) - public final Observable replay(final Function, ? extends ObservableSource> selector, final int bufferSize, final Scheduler scheduler) { - ObjectHelper.requireNonNull(selector, "selector is null"); - ObjectHelper.requireNonNull(scheduler, "scheduler is null"); - ObjectHelper.verifyPositive(bufferSize, "bufferSize"); - return ObservableReplay.multicastSelector(ObservableInternalHelper.replaySupplier(this, bufferSize, false), - ObservableInternalHelper.replayFunction(selector, scheduler)); - } - /** * Returns an Observable that emits items that are the results of invoking a specified selector on items * emitted by a {@link ConnectableObservable} that shares a single subscription to the source ObservableSource, @@ -11003,37 +10918,6 @@ public final Observable replay(Function, ? extends return ObservableReplay.multicastSelector(ObservableInternalHelper.replaySupplier(this, time, unit, scheduler, eagerTruncate), selector); } - /** - * Returns an Observable that emits items that are the results of invoking a specified selector on items - * emitted by a {@link ConnectableObservable} that shares a single subscription to the source ObservableSource. - *

- * - *

- *
Scheduler:
- *
You specify which {@link Scheduler} this operator will use.
- *
- * - * @param - * the type of items emitted by the resulting ObservableSource - * @param selector - * a selector function, which can use the multicasted sequence as many times as needed, without - * causing multiple subscriptions to the ObservableSource - * @param scheduler - * the Scheduler where the replay is observed - * @return an Observable that emits items that are the results of invoking the selector on items emitted by - * a {@link ConnectableObservable} that shares a single subscription to the source ObservableSource, - * replaying all items - * @see ReactiveX operators documentation: Replay - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.CUSTOM) - public final Observable replay(final Function, ? extends ObservableSource> selector, final Scheduler scheduler) { - ObjectHelper.requireNonNull(selector, "selector is null"); - ObjectHelper.requireNonNull(scheduler, "scheduler is null"); - return ObservableReplay.multicastSelector(ObservableInternalHelper.replaySupplier(this), - ObservableInternalHelper.replayFunction(selector, scheduler)); - } - /** * Returns a {@link ConnectableObservable} that shares a single subscription to the source ObservableSource that * replays at most {@code bufferSize} items emitted by that ObservableSource. A Connectable ObservableSource resembles @@ -11218,36 +11102,6 @@ public final ConnectableObservable replay(final int bufferSize, final long ti return ObservableReplay.create(this, time, unit, scheduler, bufferSize, eagerTruncate); } - /** - * Returns a {@link ConnectableObservable} that shares a single subscription to the source ObservableSource and - * replays at most {@code bufferSize} items emitted by that ObservableSource. A Connectable ObservableSource resembles - * an ordinary ObservableSource, except that it does not begin emitting items when it is subscribed to, but only - * when its {@code connect} method is called. - *

- * Note that due to concurrency requirements, {@code replay(bufferSize)} may hold strong references to more than - * {@code bufferSize} source emissions. - *

- * - *

- *
Scheduler:
- *
You specify which {@link Scheduler} this operator will use.
- *
- * - * @param bufferSize - * the buffer size that limits the number of items that can be replayed - * @param scheduler - * the scheduler on which the Observers will observe the emitted items - * @return a {@link ConnectableObservable} that shares a single subscription to the source ObservableSource and - * replays at most {@code bufferSize} items that were emitted by the ObservableSource - * @see ReactiveX operators documentation: Replay - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.CUSTOM) - public final ConnectableObservable replay(final int bufferSize, final Scheduler scheduler) { - ObjectHelper.verifyPositive(bufferSize, "bufferSize"); - return ObservableReplay.observeOn(replay(bufferSize), scheduler); - } - /** * Returns a {@link ConnectableObservable} that shares a single subscription to the source ObservableSource and * replays all items emitted by that ObservableSource within a specified time window. A Connectable ObservableSource @@ -11344,32 +11198,6 @@ public final ConnectableObservable replay(final long time, final TimeUnit uni return ObservableReplay.create(this, time, unit, scheduler, eagerTruncate); } - /** - * Returns a {@link ConnectableObservable} that shares a single subscription to the source ObservableSource that - * will replay all of its items and notifications to any future {@link Observer} on the given - * {@link Scheduler}. A Connectable ObservableSource resembles an ordinary ObservableSource, except that it does not - * begin emitting items when it is subscribed to, but only when its {@code connect} method is called. - *

- * - *

- *
Scheduler:
- *
You specify which {@link Scheduler} this operator will use.
- *
- * - * @param scheduler - * the Scheduler on which the Observers will observe the emitted items - * @return a {@link ConnectableObservable} that shares a single subscription to the source ObservableSource that - * will replay all of its items and notifications to any future {@link Observer} on the given - * {@link Scheduler} - * @see ReactiveX operators documentation: Replay - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.CUSTOM) - public final ConnectableObservable replay(final Scheduler scheduler) { - ObjectHelper.requireNonNull(scheduler, "scheduler is null"); - return ObservableReplay.observeOn(replay(), scheduler); - } - /** * Returns an Observable that mirrors the source ObservableSource, resubscribing to it if it calls {@code onError} * (infinite retry count). diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java index 8cb186da20..e0ed2f374c 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java @@ -209,10 +209,6 @@ public static Supplier> replaySupplier(final Flowable return new TimedReplay(parent, time, unit, scheduler, eagerTruncate); } - public static Function, Publisher> replayFunction(final Function, ? extends Publisher> selector, final Scheduler scheduler) { - return new ReplayFunction(selector, scheduler); - } - public enum RequestMax implements Consumer { INSTANCE; @Override @@ -318,20 +314,4 @@ public ConnectableFlowable get() { return parent.replay(time, unit, scheduler, eagerTruncate); } } - - static final class ReplayFunction implements Function, Publisher> { - private final Function, ? extends Publisher> selector; - private final Scheduler scheduler; - - ReplayFunction(Function, ? extends Publisher> selector, Scheduler scheduler) { - this.selector = selector; - this.scheduler = scheduler; - } - - @Override - public Publisher apply(Flowable t) throws Throwable { - Publisher p = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null Publisher"); - return Flowable.fromPublisher(p).observeOn(scheduler); - } - } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java index a482f1fa01..6d57dffced 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java @@ -60,19 +60,6 @@ public static Flowable multicastSelector( return new MulticastFlowable(connectableFactory, selector); } - /** - * Child Subscribers will observe the events of the ConnectableObservable on the - * specified scheduler. - * @param the value type - * @param cf the ConnectableFlowable to wrap - * @param scheduler the target scheduler - * @return the new ConnectableObservable instance - */ - public static ConnectableFlowable observeOn(final ConnectableFlowable cf, final Scheduler scheduler) { - final Flowable flowable = cf.observeOn(scheduler); - return RxJavaPlugins.onAssembly(new ConnectableFlowableReplay(cf, flowable)); - } - /** * Creates a replaying ConnectableObservable with an unbounded buffer. * @param the value type diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java index af9d5e4628..10927c39c7 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java @@ -214,10 +214,6 @@ public static Supplier> replaySupplier(final Observ return new TimedReplayCallable(parent, time, unit, scheduler, eagerTruncate); } - public static Function, ObservableSource> replayFunction(final Function, ? extends ObservableSource> selector, final Scheduler scheduler) { - return new ReplayFunction(selector, scheduler); - } - static final class ZipIterableFunction implements Function>, ObservableSource> { private final Function zipper; @@ -312,20 +308,4 @@ public ConnectableObservable get() { return parent.replay(time, unit, scheduler, eagerTruncate); } } - - static final class ReplayFunction implements Function, ObservableSource> { - private final Function, ? extends ObservableSource> selector; - private final Scheduler scheduler; - - ReplayFunction(Function, ? extends ObservableSource> selector, Scheduler scheduler) { - this.selector = selector; - this.scheduler = scheduler; - } - - @Override - public ObservableSource apply(Observable t) throws Throwable { - ObservableSource apply = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null ObservableSource"); - return Observable.wrap(apply).observeOn(scheduler); - } - } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java index c7a7bcfba8..964727df6e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java @@ -63,19 +63,6 @@ public static Observable multicastSelector( return RxJavaPlugins.onAssembly(new MulticastReplay(connectableFactory, selector)); } - /** - * Child Observers will observe the events of the ConnectableObservable on the - * specified scheduler. - * @param the value type - * @param co the connectable observable instance - * @param scheduler the target scheduler - * @return the new ConnectableObservable instance - */ - public static ConnectableObservable observeOn(final ConnectableObservable co, final Scheduler scheduler) { - final Observable observable = co.observeOn(scheduler); - return RxJavaPlugins.onAssembly(new Replay(co, observable)); - } - /** * Creates a replaying ConnectableObservable with an unbounded buffer. * @param the value type diff --git a/src/test/java/io/reactivex/flowable/FlowableNullTests.java b/src/test/java/io/reactivex/flowable/FlowableNullTests.java index e78772c7c4..d25713b362 100644 --- a/src/test/java/io/reactivex/flowable/FlowableNullTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableNullTests.java @@ -1836,11 +1836,6 @@ public Publisher apply(Flowable v) { }, 1, 1, TimeUnit.SECONDS).blockingSubscribe(); } - @Test(expected = NullPointerException.class) - public void replaySchedulerNull() { - just1.replay((Scheduler)null); - } - @Test(expected = NullPointerException.class) public void replayBoundedUnitNull() { just1.replay(new Function, Publisher>() { @@ -1906,11 +1901,6 @@ public void replayTimeSizeBoundedSchedulerNull() { just1.replay(1, 1, TimeUnit.SECONDS, null); } - @Test(expected = NullPointerException.class) - public void replayBufferSchedulerNull() { - just1.replay(1, (Scheduler)null); - } - @Test(expected = NullPointerException.class) public void replayTimeBoundedUnitNull() { just1.replay(1, null, Schedulers.single()); diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDematerializeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDematerializeTest.java index 94a91c7220..aa0d41c0f2 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDematerializeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDematerializeTest.java @@ -29,7 +29,6 @@ import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.testsupport.*; -@SuppressWarnings("deprecation") public class FlowableDematerializeTest { @Test @@ -78,7 +77,7 @@ public Notification apply(Notification v) throws Exception { @Test public void dematerialize1() { Flowable> notifications = Flowable.just(1, 2).materialize(); - Flowable dematerialize = notifications.dematerialize(); + Flowable dematerialize = notifications.dematerialize(Functions.>identity()); Subscriber subscriber = TestHelper.mockSubscriber(); @@ -94,7 +93,7 @@ public void dematerialize1() { public void dematerialize2() { Throwable exception = new Throwable("test"); Flowable flowable = Flowable.error(exception); - Flowable dematerialize = flowable.materialize().dematerialize(); + Flowable dematerialize = flowable.materialize().dematerialize(Functions.>identity()); Subscriber subscriber = TestHelper.mockSubscriber(); @@ -109,7 +108,7 @@ public void dematerialize2() { public void dematerialize3() { Exception exception = new Exception("test"); Flowable flowable = Flowable.error(exception); - Flowable dematerialize = flowable.materialize().dematerialize(); + Flowable dematerialize = flowable.materialize().dematerialize(Functions.>identity()); Subscriber subscriber = TestHelper.mockSubscriber(); @@ -123,8 +122,8 @@ public void dematerialize3() { @Test public void errorPassThru() { Exception exception = new Exception("test"); - Flowable flowable = Flowable.error(exception); - Flowable dematerialize = flowable.dematerialize(); + Flowable> flowable = Flowable.error(exception); + Flowable dematerialize = flowable.dematerialize(Functions.>identity()); Subscriber subscriber = TestHelper.mockSubscriber(); @@ -137,8 +136,8 @@ public void errorPassThru() { @Test public void completePassThru() { - Flowable flowable = Flowable.empty(); - Flowable dematerialize = flowable.dematerialize(); + Flowable> flowable = Flowable.empty(); + Flowable dematerialize = flowable.dematerialize(Functions.>identity()); Subscriber subscriber = TestHelper.mockSubscriber(); @@ -156,7 +155,7 @@ public void completePassThru() { public void honorsContractWhenCompleted() { Flowable source = Flowable.just(1); - Flowable result = source.materialize().dematerialize(); + Flowable result = source.materialize().dematerialize(Functions.>identity()); Subscriber subscriber = TestHelper.mockSubscriber(); @@ -171,7 +170,7 @@ public void honorsContractWhenCompleted() { public void honorsContractWhenThrows() { Flowable source = Flowable.error(new TestException()); - Flowable result = source.materialize().dematerialize(); + Flowable result = source.materialize().dematerialize(Functions.>identity()); Subscriber subscriber = TestHelper.mockSubscriber(); @@ -184,15 +183,16 @@ public void honorsContractWhenThrows() { @Test public void dispose() { - TestHelper.checkDisposed(Flowable.just(Notification.createOnComplete()).dematerialize()); + TestHelper.checkDisposed(Flowable.just(Notification.createOnComplete()) + .dematerialize(Functions.>identity())); } @Test public void doubleOnSubscribe() { - TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function>, Flowable>() { @Override - public Flowable apply(Flowable f) throws Exception { - return f.dematerialize(); + public Flowable apply(Flowable> f) throws Exception { + return f.dematerialize(Functions.>identity()); } }); } @@ -201,17 +201,17 @@ public Flowable apply(Flowable f) throws Exception { public void eventsAfterDematerializedTerminal() { List errors = TestHelper.trackPluginErrors(); try { - new Flowable() { + new Flowable>() { @Override - protected void subscribeActual(Subscriber subscriber) { + protected void subscribeActual(Subscriber> subscriber) { subscriber.onSubscribe(new BooleanSubscription()); subscriber.onNext(Notification.createOnComplete()); - subscriber.onNext(Notification.createOnNext(1)); + subscriber.onNext(Notification.createOnNext(1)); subscriber.onNext(Notification.createOnError(new TestException("First"))); subscriber.onError(new TestException("Second")); } } - .dematerialize() + .dematerialize(Functions.>identity()) .test() .assertResult(); @@ -224,15 +224,15 @@ protected void subscribeActual(Subscriber subscriber) { @Test public void nonNotificationInstanceAfterDispose() { - new Flowable() { + new Flowable>() { @Override - protected void subscribeActual(Subscriber subscriber) { + protected void subscribeActual(Subscriber> subscriber) { subscriber.onSubscribe(new BooleanSubscription()); subscriber.onNext(Notification.createOnComplete()); - subscriber.onNext(1); + subscriber.onNext(Notification.createOnNext(1)); } } - .dematerialize() + .dematerialize(Functions.>identity()) .test() .assertResult(); } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayEagerTruncateTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayEagerTruncateTest.java index ac8049387e..59c081e03b 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayEagerTruncateTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayEagerTruncateTest.java @@ -1232,16 +1232,6 @@ public void subscribersComeAndGoAtRequestBoundaries2() { ts3.assertComplete(); } - @Test - public void replayScheduler() { - - Flowable.just(1).replay(Schedulers.computation()) - .autoConnect() - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); - } - @Test public void replayTime() { Flowable.just(1).replay(1, TimeUnit.MINUTES, Schedulers.computation(), true) @@ -1251,16 +1241,6 @@ public void replayTime() { .assertResult(1); } - @Test - public void replaySizeScheduler() { - - Flowable.just(1).replay(1, Schedulers.computation()) - .autoConnect() - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); - } - @Test public void replaySizeAndTime() { Flowable.just(1).replay(1, 1, TimeUnit.MILLISECONDS, Schedulers.computation(), true) @@ -1270,22 +1250,6 @@ public void replaySizeAndTime() { .assertResult(1); } - @Test - public void replaySelectorSizeScheduler() { - Flowable.just(1).replay(Functions.>identity(), 1, Schedulers.io()) - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); - } - - @Test - public void replaySelectorScheduler() { - Flowable.just(1).replay(Functions.>identity(), Schedulers.io()) - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); - } - @Test public void replaySelectorTime() { Flowable.just(1).replay(Functions.>identity(), 1, TimeUnit.MINUTES, Schedulers.computation(), true) @@ -1741,19 +1705,6 @@ public void timedNoOutdatedData() { source.test().assertResult(); } - @Test - public void replaySelectorReturnsNull() { - Flowable.just(1) - .replay(new Function, Publisher>() { - @Override - public Publisher apply(Flowable v) throws Exception { - return null; - } - }, Schedulers.trampoline()) - .to(TestHelper.testConsumer()) - .assertFailureAndMessage(NullPointerException.class, "The selector returned a null Publisher"); - } - @Test public void multicastSelectorCallableConnectableCrash() { FlowableReplay.multicastSelector(new Supplier>() { diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java index 2c4ec557cc..6005709f07 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java @@ -1233,16 +1233,6 @@ public void subscribersComeAndGoAtRequestBoundaries2() { ts3.assertComplete(); } - @Test - public void replayScheduler() { - - Flowable.just(1).replay(Schedulers.computation()) - .autoConnect() - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); - } - @Test public void replayTime() { Flowable.just(1).replay(1, TimeUnit.MINUTES) @@ -1252,16 +1242,6 @@ public void replayTime() { .assertResult(1); } - @Test - public void replaySizeScheduler() { - - Flowable.just(1).replay(1, Schedulers.computation()) - .autoConnect() - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); - } - @Test public void replaySizeAndTime() { Flowable.just(1).replay(1, 1, TimeUnit.MILLISECONDS) @@ -1271,22 +1251,6 @@ public void replaySizeAndTime() { .assertResult(1); } - @Test - public void replaySelectorSizeScheduler() { - Flowable.just(1).replay(Functions.>identity(), 1, Schedulers.io()) - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); - } - - @Test - public void replaySelectorScheduler() { - Flowable.just(1).replay(Functions.>identity(), Schedulers.io()) - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); - } - @Test public void replaySelectorTime() { Flowable.just(1).replay(Functions.>identity(), 1, TimeUnit.MINUTES) @@ -1742,19 +1706,6 @@ public void timedNoOutdatedData() { source.test().assertResult(); } - @Test - public void replaySelectorReturnsNull() { - Flowable.just(1) - .replay(new Function, Publisher>() { - @Override - public Publisher apply(Flowable v) throws Exception { - return null; - } - }, Schedulers.trampoline()) - .to(TestHelper.testConsumer()) - .assertFailureAndMessage(NullPointerException.class, "The selector returned a null Publisher"); - } - @Test public void multicastSelectorCallableConnectableCrash() { FlowableReplay.multicastSelector(new Supplier>() { diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDematerializeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDematerializeTest.java index b9aed71dea..3b5952850a 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableDematerializeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDematerializeTest.java @@ -28,7 +28,6 @@ import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.testsupport.*; -@SuppressWarnings("deprecation") public class ObservableDematerializeTest { @Test @@ -77,7 +76,7 @@ public Notification apply(Notification v) throws Exception { @Test public void dematerialize1() { Observable> notifications = Observable.just(1, 2).materialize(); - Observable dematerialize = notifications.dematerialize(); + Observable dematerialize = notifications.dematerialize(Functions.>identity()); Observer observer = TestHelper.mockObserver(); @@ -93,7 +92,7 @@ public void dematerialize1() { public void dematerialize2() { Throwable exception = new Throwable("test"); Observable o = Observable.error(exception); - Observable dematerialize = o.materialize().dematerialize(); + Observable dematerialize = o.materialize().dematerialize(Functions.>identity()); Observer observer = TestHelper.mockObserver(); @@ -108,7 +107,7 @@ public void dematerialize2() { public void dematerialize3() { Exception exception = new Exception("test"); Observable o = Observable.error(exception); - Observable dematerialize = o.materialize().dematerialize(); + Observable dematerialize = o.materialize().dematerialize(Functions.>identity()); Observer observer = TestHelper.mockObserver(); @@ -122,8 +121,8 @@ public void dematerialize3() { @Test public void errorPassThru() { Exception exception = new Exception("test"); - Observable o = Observable.error(exception); - Observable dematerialize = o.dematerialize(); + Observable> o = Observable.error(exception); + Observable dematerialize = o.dematerialize(Functions.>identity()); Observer observer = TestHelper.mockObserver(); @@ -136,8 +135,8 @@ public void errorPassThru() { @Test public void completePassThru() { - Observable o = Observable.empty(); - Observable dematerialize = o.dematerialize(); + Observable> o = Observable.empty(); + Observable dematerialize = o.dematerialize(Functions.>identity()); Observer observer = TestHelper.mockObserver(); @@ -155,7 +154,7 @@ public void completePassThru() { public void honorsContractWhenCompleted() { Observable source = Observable.just(1); - Observable result = source.materialize().dematerialize(); + Observable result = source.materialize().dematerialize(Functions.>identity()); Observer o = TestHelper.mockObserver(); @@ -170,7 +169,7 @@ public void honorsContractWhenCompleted() { public void honorsContractWhenThrows() { Observable source = Observable.error(new TestException()); - Observable result = source.materialize().dematerialize(); + Observable result = source.materialize().dematerialize(Functions.>identity()); Observer o = TestHelper.mockObserver(); @@ -183,15 +182,15 @@ public void honorsContractWhenThrows() { @Test public void dispose() { - TestHelper.checkDisposed(Observable.just(Notification.createOnComplete()).dematerialize()); + TestHelper.checkDisposed(Observable.just(Notification.createOnComplete()).dematerialize(Functions.>identity())); } @Test public void doubleOnSubscribe() { - TestHelper.checkDoubleOnSubscribeObservable(new Function, ObservableSource>() { + TestHelper.checkDoubleOnSubscribeObservable(new Function>, ObservableSource>() { @Override - public ObservableSource apply(Observable o) throws Exception { - return o.dematerialize(); + public ObservableSource apply(Observable> o) throws Exception { + return o.dematerialize(Functions.>identity()); } }); } @@ -200,17 +199,17 @@ public ObservableSource apply(Observable o) throws Exception { public void eventsAfterDematerializedTerminal() { List errors = TestHelper.trackPluginErrors(); try { - new Observable() { + new Observable>() { @Override - protected void subscribeActual(Observer observer) { + protected void subscribeActual(Observer> observer) { observer.onSubscribe(Disposables.empty()); observer.onNext(Notification.createOnComplete()); - observer.onNext(Notification.createOnNext(1)); + observer.onNext(Notification.createOnNext(1)); observer.onNext(Notification.createOnError(new TestException("First"))); observer.onError(new TestException("Second")); } } - .dematerialize() + .dematerialize(Functions.>identity()) .test() .assertResult(); @@ -223,15 +222,15 @@ protected void subscribeActual(Observer observer) { @Test public void nonNotificationInstanceAfterDispose() { - new Observable() { + new Observable>() { @Override - protected void subscribeActual(Observer observer) { + protected void subscribeActual(Observer> observer) { observer.onSubscribe(Disposables.empty()); observer.onNext(Notification.createOnComplete()); - observer.onNext(1); + observer.onNext(Notification.createOnNext(1)); } } - .dematerialize() + .dematerialize(Functions.>identity()) .test() .assertResult(); } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayEagerTruncateTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayEagerTruncateTest.java index 2922bf1d07..a12c8f634c 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayEagerTruncateTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayEagerTruncateTest.java @@ -1097,16 +1097,6 @@ public void onNext(Integer t) { to.assertError(TestException.class); } - @Test - public void replayScheduler() { - - Observable.just(1).replay(Schedulers.computation()) - .autoConnect() - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); - } - @Test public void replayTime() { Observable.just(1).replay(1, TimeUnit.MINUTES, Schedulers.computation(), true) @@ -1116,16 +1106,6 @@ public void replayTime() { .assertResult(1); } - @Test - public void replaySizeScheduler() { - - Observable.just(1).replay(1, Schedulers.computation()) - .autoConnect() - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); - } - @Test public void replaySizeAndTime() { Observable.just(1).replay(1, 1, TimeUnit.MILLISECONDS, Schedulers.computation(), true) @@ -1135,22 +1115,6 @@ public void replaySizeAndTime() { .assertResult(1); } - @Test - public void replaySelectorSizeScheduler() { - Observable.just(1).replay(Functions.>identity(), 1, Schedulers.io()) - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); - } - - @Test - public void replaySelectorScheduler() { - Observable.just(1).replay(Functions.>identity(), Schedulers.io()) - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); - } - @Test public void replaySelectorTime() { Observable.just(1).replay(Functions.>identity(), 1, TimeUnit.MINUTES) @@ -1528,19 +1492,6 @@ public void timedNoOutdatedData() { source.test().assertResult(); } - @Test - public void replaySelectorReturnsNullScheduled() { - Observable.just(1) - .replay(new Function, Observable>() { - @Override - public Observable apply(Observable v) throws Exception { - return null; - } - }, Schedulers.trampoline()) - .to(TestHelper.testConsumer()) - .assertFailureAndMessage(NullPointerException.class, "The selector returned a null ObservableSource"); - } - @Test public void replaySelectorReturnsNull() { Observable.just(1) diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java index bd4bdb92ff..0f4ee253b8 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java @@ -1097,16 +1097,6 @@ public void onNext(Integer t) { to.assertError(TestException.class); } - @Test - public void replayScheduler() { - - Observable.just(1).replay(Schedulers.computation()) - .autoConnect() - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); - } - @Test public void replayTime() { Observable.just(1).replay(1, TimeUnit.MINUTES) @@ -1116,16 +1106,6 @@ public void replayTime() { .assertResult(1); } - @Test - public void replaySizeScheduler() { - - Observable.just(1).replay(1, Schedulers.computation()) - .autoConnect() - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); - } - @Test public void replaySizeAndTime() { Observable.just(1).replay(1, 1, TimeUnit.MILLISECONDS) @@ -1135,22 +1115,6 @@ public void replaySizeAndTime() { .assertResult(1); } - @Test - public void replaySelectorSizeScheduler() { - Observable.just(1).replay(Functions.>identity(), 1, Schedulers.io()) - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); - } - - @Test - public void replaySelectorScheduler() { - Observable.just(1).replay(Functions.>identity(), Schedulers.io()) - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertResult(1); - } - @Test public void replaySelectorTime() { Observable.just(1).replay(Functions.>identity(), 1, TimeUnit.MINUTES) @@ -1528,19 +1492,6 @@ public void timedNoOutdatedData() { source.test().assertResult(); } - @Test - public void replaySelectorReturnsNullScheduled() { - Observable.just(1) - .replay(new Function, Observable>() { - @Override - public Observable apply(Observable v) throws Exception { - return null; - } - }, Schedulers.trampoline()) - .to(TestHelper.testConsumer()) - .assertFailureAndMessage(NullPointerException.class, "The selector returned a null ObservableSource"); - } - @Test public void replaySelectorReturnsNull() { Observable.just(1) diff --git a/src/test/java/io/reactivex/observable/ObservableNullTests.java b/src/test/java/io/reactivex/observable/ObservableNullTests.java index 47c2ab674a..aace11648a 100644 --- a/src/test/java/io/reactivex/observable/ObservableNullTests.java +++ b/src/test/java/io/reactivex/observable/ObservableNullTests.java @@ -1873,11 +1873,6 @@ public Observable apply(Observable v) { }, 1, 1, TimeUnit.SECONDS).blockingSubscribe(); } - @Test(expected = NullPointerException.class) - public void replaySchedulerNull() { - just1.replay((Scheduler)null); - } - @Test(expected = NullPointerException.class) public void replayBoundedUnitNull() { just1.replay(new Function, Observable>() { @@ -1943,11 +1938,6 @@ public void replayTimeSizeBoundedSchedulerNull() { just1.replay(1, 1, TimeUnit.SECONDS, null); } - @Test(expected = NullPointerException.class) - public void replayBufferSchedulerNull() { - just1.replay(1, (Scheduler)null); - } - @Test(expected = NullPointerException.class) public void replayTimeBoundedUnitNull() { just1.replay(1, null, Schedulers.single());