Skip to content

Commit

Permalink
3.x: Add fair mode overload to Schedulers.from(Executor) (#6744)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Dec 3, 2019
1 parent e912237 commit 292dc62
Show file tree
Hide file tree
Showing 5 changed files with 655 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,23 @@ public final class ExecutorScheduler extends Scheduler {

final boolean interruptibleWorker;

final boolean fair;

@NonNull
final Executor executor;

static final Scheduler HELPER = Schedulers.single();

public ExecutorScheduler(@NonNull Executor executor, boolean interruptibleWorker) {
public ExecutorScheduler(@NonNull Executor executor, boolean interruptibleWorker, boolean fair) {
this.executor = executor;
this.interruptibleWorker = interruptibleWorker;
this.fair = fair;
}

@NonNull
@Override
public Worker createWorker() {
return new ExecutorWorker(executor, interruptibleWorker);
return new ExecutorWorker(executor, interruptibleWorker, fair);
}

@NonNull
Expand Down Expand Up @@ -123,6 +126,8 @@ public static final class ExecutorWorker extends Scheduler.Worker implements Run

final boolean interruptibleWorker;

final boolean fair;

final Executor executor;

final MpscLinkedQueue<Runnable> queue;
Expand All @@ -133,10 +138,11 @@ public static final class ExecutorWorker extends Scheduler.Worker implements Run

final CompositeDisposable tasks = new CompositeDisposable();

public ExecutorWorker(Executor executor, boolean interruptibleWorker) {
public ExecutorWorker(Executor executor, boolean interruptibleWorker, boolean fair) {
this.executor = executor;
this.queue = new MpscLinkedQueue<Runnable>();
this.interruptibleWorker = interruptibleWorker;
this.fair = fair;
}

@NonNull
Expand Down Expand Up @@ -236,6 +242,36 @@ public boolean isDisposed() {

@Override
public void run() {
if (fair) {
runFair();
} else {
runEager();
}
}

void runFair() {
final MpscLinkedQueue<Runnable> q = queue;
if (disposed) {
q.clear();
return;
}

Runnable run = q.poll();
if (run != null) {
run.run();
}

if (disposed) {
q.clear();
return;
}

if (wip.decrementAndGet() != 0) {
executor.execute(this);
}
}

void runEager() {
int missed = 1;
final MpscLinkedQueue<Runnable> q = queue;
for (;;) {
Expand Down
81 changes: 79 additions & 2 deletions src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ public static Scheduler single() {
* non-delayed tasks as it can, which may result in a longer than expected occupation of a
* thread of the given backing Executor. In other terms, it does not allow per-Runnable fairness
* in case the worker runs on a shared underlying thread of the Executor.
* See {@link #from(Executor, boolean, boolean)} to create a wrapper that uses the underlying Executor
* more fairly.
* <p>
* Starting, stopping and restarting this scheduler is not supported (no-op) and the provided
* executor's lifecycle must be managed externally:
Expand All @@ -346,10 +348,11 @@ public static Scheduler single() {
* @param executor
* the executor to wrap
* @return the new Scheduler wrapping the Executor
* @see #from(Executor, boolean, boolean)
*/
@NonNull
public static Scheduler from(@NonNull Executor executor) {
return new ExecutorScheduler(executor, false);
return new ExecutorScheduler(executor, false, false);
}

/**
Expand Down Expand Up @@ -382,6 +385,8 @@ public static Scheduler from(@NonNull Executor executor) {
* non-delayed tasks as it can, which may result in a longer than expected occupation of a
* thread of the given backing Executor. In other terms, it does not allow per-Runnable fairness
* in case the worker runs on a shared underlying thread of the Executor.
* See {@link #from(Executor, boolean, boolean)} to create a wrapper that uses the underlying Executor
* more fairly.
* <p>
* Starting, stopping and restarting this scheduler is not supported (no-op) and the provided
* executor's lifecycle must be managed externally:
Expand Down Expand Up @@ -411,10 +416,82 @@ public static Scheduler from(@NonNull Executor executor) {
* be interrupted when the task is disposed.
* @return the new Scheduler wrapping the Executor
* @since 3.0.0
* @see #from(Executor, boolean, boolean)
*/
@NonNull
public static Scheduler from(@NonNull Executor executor, boolean interruptibleWorker) {
return new ExecutorScheduler(executor, interruptibleWorker);
return new ExecutorScheduler(executor, interruptibleWorker, false);
}

/**
* Wraps an {@link Executor} into a new Scheduler instance and delegates {@code schedule()}
* calls to it.
* <p>
* The tasks scheduled by the returned {@link Scheduler} and its {@link io.reactivex.rxjava3.core.Scheduler.Worker Scheduler.Worker}
* can be optionally interrupted.
* <p>
* If the provided executor doesn't support any of the more specific standard Java executor
* APIs, tasks scheduled with a time delay or periodically will use the
* {@link #single()} scheduler for the timed waiting
* before posting the actual task to the given executor.
* <p>
* If the provided executor supports the standard Java {@link ExecutorService} API,
* canceling tasks scheduled by this scheduler can be cancelled/interrupted by calling
* {@link io.reactivex.rxjava3.disposables.Disposable#dispose()}. In addition, tasks scheduled with
* a time delay or periodically will use the {@link #single()} scheduler for the timed waiting
* before posting the actual task to the given executor.
* <p>
* If the provided executor supports the standard Java {@link ScheduledExecutorService} API,
* canceling tasks scheduled by this scheduler can be cancelled/interrupted by calling
* {@link io.reactivex.rxjava3.disposables.Disposable#dispose()}. In addition, tasks scheduled with
* a time delay or periodically will use the provided executor. Note, however, if the provided
* {@code ScheduledExecutorService} instance is not single threaded, tasks scheduled
* with a time delay close to each other may end up executing in different order than
* the original schedule() call was issued. This limitation may be lifted in a future patch.
* <p>
* The implementation of the Worker of this wrapper Scheduler can operate in both eager (non-fair) and
* fair modes depending on the specified parameter. In <em>eager</em> mode, it will execute as many
* non-delayed tasks as it can, which may result in a longer than expected occupation of a
* thread of the given backing Executor. In other terms, it does not allow per-Runnable fairness
* in case the worker runs on a shared underlying thread of the Executor. In <em>fair</em> mode,
* non-delayed tasks will still be executed in a FIFO and non-overlapping manner, but after each task,
* the execution for the next task is rescheduled with the same underlying Executor, allowing interleaving
* from both the same Scheduler or other external usages of the underlying Executor.
* <p>
* Starting, stopping and restarting this scheduler is not supported (no-op) and the provided
* executor's lifecycle must be managed externally:
* <pre><code>
* ExecutorService exec = Executors.newSingleThreadedExecutor();
* try {
* Scheduler scheduler = Schedulers.from(exec, true, true);
* Flowable.just(1)
* .subscribeOn(scheduler)
* .map(v -&gt; v + 1)
* .observeOn(scheduler)
* .blockingSubscribe(System.out::println);
* } finally {
* exec.shutdown();
* }
* </code></pre>
* <p>
* This type of scheduler is less sensitive to leaking {@link io.reactivex.rxjava3.core.Scheduler.Worker Scheduler.Worker} instances, although
* not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
* execute those tasks "unexpectedly".
* <p>
* Note that this method returns a new {@link Scheduler} instance, even for the same {@link Executor} instance.
* @param executor
* the executor to wrap
* @param interruptibleWorker if {@code true} the tasks submitted to the {@link io.reactivex.rxjava3.core.Scheduler.Worker Scheduler.Worker} will
* be interrupted when the task is disposed.
* @param fair if {@code true} tasks submitted to the will be executed by the underlying {@link Executor} one after the other, still
* in a FIFO and non-overlapping manner, but allows interleaving with other tasks submitted to the underlying {@code Executor}.
* If {@code false}, the underlying FIFO scheme will execute as many tasks as it can before giving up the underlying {@code Executor} thread.
* @return the new Scheduler wrapping the Executor
* @since 3.0.0
*/
@NonNull
public static Scheduler from(@NonNull Executor executor, boolean interruptibleWorker, boolean fair) {
return new ExecutorScheduler(executor, interruptibleWorker, fair);
}

/**
Expand Down
Loading

0 comments on commit 292dc62

Please sign in to comment.