Skip to content

Commit

Permalink
Removes the purge thread in favor of standard ScheduledThreadPoolExec…
Browse files Browse the repository at this point in the history
…utor APIs

The RXSchedulerPurge thread is currently enabled by default and runs every second
to call purge() on each executor in the pool. This is causing significant issues
for low powered devices (e.g. mobile phones), because it needs to periodically
wake up the CPU to perform purging. The RXSchedulerPurge thread could be completely
removed in favor of using the standard setRemoveOnCancelPolicy() API on the
ScheduledThreadPoolExecutor which became available in Java 7 and offers removal
of cancelled tasks at the moment they are cancelled in O(1).
  • Loading branch information
chggr committed Jul 15, 2021
1 parent c5883dc commit 331b50c
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,86 +31,11 @@ private SchedulerPoolFactory() {

static final String PURGE_ENABLED_KEY = "rx3.purge-enabled";

/**
* Indicates the periodic purging of the ScheduledExecutorService is enabled.
*/
public static final boolean PURGE_ENABLED;

static final String PURGE_PERIOD_SECONDS_KEY = "rx3.purge-period-seconds";

/**
* Indicates the purge period of the ScheduledExecutorServices created by create().
*/
public static final int PURGE_PERIOD_SECONDS;

static final AtomicReference<ScheduledExecutorService> PURGE_THREAD =
new AtomicReference<>();

// Upcast to the Map interface here to avoid 8.x compatibility issues.
// See http://stackoverflow.com/a/32955708/61158
static final Map<ScheduledThreadPoolExecutor, Object> POOLS =
new ConcurrentHashMap<>();

/**
* Starts the purge thread if not already started.
*/
public static void start() {
tryStart(PURGE_ENABLED);
}

static void tryStart(boolean purgeEnabled) {
if (purgeEnabled) {
for (;;) {
ScheduledExecutorService curr = PURGE_THREAD.get();
if (curr != null) {
return;
}
ScheduledExecutorService next = Executors.newScheduledThreadPool(1, new RxThreadFactory("RxSchedulerPurge"));
if (PURGE_THREAD.compareAndSet(curr, next)) {

next.scheduleAtFixedRate(new ScheduledTask(), PURGE_PERIOD_SECONDS, PURGE_PERIOD_SECONDS, TimeUnit.SECONDS);

return;
} else {
next.shutdownNow();
}
}
}
}

/**
* Stops the purge thread.
*/
public static void shutdown() {
ScheduledExecutorService exec = PURGE_THREAD.getAndSet(null);
if (exec != null) {
exec.shutdownNow();
}
POOLS.clear();
}

static {
SystemPropertyAccessor propertyAccessor = new SystemPropertyAccessor();
PURGE_ENABLED = getBooleanProperty(true, PURGE_ENABLED_KEY, true, true, propertyAccessor);
PURGE_PERIOD_SECONDS = getIntProperty(PURGE_ENABLED, PURGE_PERIOD_SECONDS_KEY, 1, 1, propertyAccessor);

start();
}

static int getIntProperty(boolean enabled, String key, int defaultNotFound, int defaultNotEnabled, Function<String, String> propertyAccessor) {
if (enabled) {
try {
String value = propertyAccessor.apply(key);
if (value == null) {
return defaultNotFound;
}
return Integer.parseInt(value);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
return defaultNotFound;
}
}
return defaultNotEnabled;
}

static boolean getBooleanProperty(boolean enabled, String key, boolean defaultNotFound, boolean defaultNotEnabled, Function<String, String> propertyAccessor) {
Expand Down Expand Up @@ -142,28 +67,8 @@ public String apply(String t) {
* @return the ScheduledExecutorService
*/
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
tryPutIntoPool(PURGE_ENABLED, exec);
final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1, factory);
exec.setRemoveOnCancelPolicy(PURGE_ENABLED);
return exec;
}

static void tryPutIntoPool(boolean purgeEnabled, ScheduledExecutorService exec) {
if (purgeEnabled && exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
}

static final class ScheduledTask implements Runnable {
@Override
public void run() {
for (ScheduledThreadPoolExecutor e : new ArrayList<>(POOLS.keySet())) {
if (e.isShutdown()) {
POOLS.remove(e);
} else {
e.purge();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@
* <li>{@code rx3.computation-priority} (int): sets the thread priority of the {@link #computation()} {@code Scheduler}, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx3.newthread-priority} (int): sets the thread priority of the {@link #newThread()} {@code Scheduler}, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx3.single-priority} (int): sets the thread priority of the {@link #single()} {@code Scheduler}, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx3.purge-enabled} (boolean): enables periodic purging of all {@code Scheduler}'s backing thread pools, default is {@code false}</li>
* <li>{@code rx3.purge-period-seconds} (int): specifies the periodic purge interval of all {@code Scheduler}'s backing thread pools, default is 1 second</li>
* <li>{@code rx3.purge-enabled} (boolean): enables purging of all {@code Scheduler}'s backing thread pools, default is {@code true}</li>
* <li>{@code rx3.scheduler.use-nanotime} (boolean): {@code true} instructs {@code Scheduler} to use {@link System#nanoTime()} for {@link Scheduler#now(TimeUnit)},
* instead of default {@link System#currentTimeMillis()} ({@code false})</li>
* </ul>
Expand Down Expand Up @@ -556,7 +555,6 @@ public static void shutdown() {
newThread().shutdown();
single().shutdown();
trampoline().shutdown();
SchedulerPoolFactory.shutdown();
}

/**
Expand All @@ -569,7 +567,6 @@ public static void start() {
newThread().start();
single().start();
trampoline().start();
SchedulerPoolFactory.start();
}

static final class IOTask implements Supplier<Scheduler> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,50 +30,6 @@ public void utilityClass() {
TestHelper.checkUtilityClass(SchedulerPoolFactory.class);
}

@Test
public void multiStartStop() {
SchedulerPoolFactory.shutdown();

SchedulerPoolFactory.shutdown();

SchedulerPoolFactory.tryStart(false);

assertNull(SchedulerPoolFactory.PURGE_THREAD.get());

SchedulerPoolFactory.start();

// restart schedulers
Schedulers.shutdown();

Schedulers.start();
}

@Test
public void startRace() throws InterruptedException {
try {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
SchedulerPoolFactory.shutdown();

Runnable r1 = new Runnable() {
@Override
public void run() {
SchedulerPoolFactory.start();
}
};

TestHelper.race(r1, r1);
}

} finally {
// restart schedulers
Schedulers.shutdown();

Thread.sleep(200);

Schedulers.start();
}
}

@Test
public void boolPropertiesDisabledReturnsDefaultDisabled() throws Throwable {
assertTrue(SchedulerPoolFactory.getBooleanProperty(false, "key", false, true, failingPropertiesAccessor));
Expand All @@ -98,30 +54,6 @@ public void boolPropertiesReturnsValue() throws Throwable {
assertFalse(SchedulerPoolFactory.getBooleanProperty(true, "false", false, true, Functions.<String>identity()));
}

@Test
public void intPropertiesDisabledReturnsDefaultDisabled() throws Throwable {
assertEquals(-1, SchedulerPoolFactory.getIntProperty(false, "key", 0, -1, failingPropertiesAccessor));
assertEquals(-1, SchedulerPoolFactory.getIntProperty(false, "key", 1, -1, failingPropertiesAccessor));
}

@Test
public void intPropertiesEnabledMissingReturnsDefaultMissing() throws Throwable {
assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 0, missingPropertiesAccessor));
assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 1, missingPropertiesAccessor));
}

@Test
public void intPropertiesFailureReturnsDefaultMissing() throws Throwable {
assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 0, failingPropertiesAccessor));
assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 1, failingPropertiesAccessor));
}

@Test
public void intPropertiesReturnsValue() throws Throwable {
assertEquals(1, SchedulerPoolFactory.getIntProperty(true, "1", 0, 4, Functions.<String>identity()));
assertEquals(2, SchedulerPoolFactory.getIntProperty(true, "2", 3, 5, Functions.<String>identity()));
}

static final Function<String, String> failingPropertiesAccessor = new Function<String, String>() {
@Override
public String apply(String v) throws Throwable {
Expand All @@ -135,22 +67,4 @@ public String apply(String v) throws Throwable {
return null;
}
};

@Test
public void putIntoPoolNoPurge() {
int s = SchedulerPoolFactory.POOLS.size();

SchedulerPoolFactory.tryPutIntoPool(false, null);

assertEquals(s, SchedulerPoolFactory.POOLS.size());
}

@Test
public void putIntoPoolNonThreadPool() {
int s = SchedulerPoolFactory.POOLS.size();

SchedulerPoolFactory.tryPutIntoPool(true, null);

assertEquals(s, SchedulerPoolFactory.POOLS.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void run() {

System.out.println("Wait before second GC");
System.out.println("JDK 6 purge is N log N because it removes and shifts one by one");
int t = (int)(n * Math.log(n) / 100) + SchedulerPoolFactory.PURGE_PERIOD_SECONDS * 1000;
int t = (int)(n * Math.log(n) / 100) + 1000;
int sleepStep = 100;
while (t > 0) {
System.out.printf(" >> Waiting for purge: %.2f s remaining%n", t / 1000d);
Expand Down

0 comments on commit 331b50c

Please sign in to comment.