Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removes the purge thread in favour of standard ScheduledThreadPoolExecutor APIs #7293

Merged
merged 1 commit into from
Jul 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,86 +31,11 @@ private SchedulerPoolFactory() {

static final String PURGE_ENABLED_KEY = "rx3.purge-enabled";

/**
* Indicates the periodic purging of the ScheduledExecutorService is enabled.
*/
public static final boolean PURGE_ENABLED;

static final String PURGE_PERIOD_SECONDS_KEY = "rx3.purge-period-seconds";

/**
* Indicates the purge period of the ScheduledExecutorServices created by create().
*/
public static final int PURGE_PERIOD_SECONDS;

static final AtomicReference<ScheduledExecutorService> PURGE_THREAD =
new AtomicReference<>();

// Upcast to the Map interface here to avoid 8.x compatibility issues.
// See http://stackoverflow.com/a/32955708/61158
static final Map<ScheduledThreadPoolExecutor, Object> POOLS =
new ConcurrentHashMap<>();

/**
* Starts the purge thread if not already started.
*/
public static void start() {
tryStart(PURGE_ENABLED);
}

static void tryStart(boolean purgeEnabled) {
if (purgeEnabled) {
for (;;) {
ScheduledExecutorService curr = PURGE_THREAD.get();
if (curr != null) {
return;
}
ScheduledExecutorService next = Executors.newScheduledThreadPool(1, new RxThreadFactory("RxSchedulerPurge"));
if (PURGE_THREAD.compareAndSet(curr, next)) {

next.scheduleAtFixedRate(new ScheduledTask(), PURGE_PERIOD_SECONDS, PURGE_PERIOD_SECONDS, TimeUnit.SECONDS);

return;
} else {
next.shutdownNow();
}
}
}
}

/**
* Stops the purge thread.
*/
public static void shutdown() {
ScheduledExecutorService exec = PURGE_THREAD.getAndSet(null);
if (exec != null) {
exec.shutdownNow();
}
POOLS.clear();
}

static {
SystemPropertyAccessor propertyAccessor = new SystemPropertyAccessor();
PURGE_ENABLED = getBooleanProperty(true, PURGE_ENABLED_KEY, true, true, propertyAccessor);
PURGE_PERIOD_SECONDS = getIntProperty(PURGE_ENABLED, PURGE_PERIOD_SECONDS_KEY, 1, 1, propertyAccessor);

start();
}

static int getIntProperty(boolean enabled, String key, int defaultNotFound, int defaultNotEnabled, Function<String, String> propertyAccessor) {
if (enabled) {
try {
String value = propertyAccessor.apply(key);
if (value == null) {
return defaultNotFound;
}
return Integer.parseInt(value);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
return defaultNotFound;
}
}
return defaultNotEnabled;
}

static boolean getBooleanProperty(boolean enabled, String key, boolean defaultNotFound, boolean defaultNotEnabled, Function<String, String> propertyAccessor) {
Expand Down Expand Up @@ -142,28 +67,8 @@ public String apply(String t) {
* @return the ScheduledExecutorService
*/
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
tryPutIntoPool(PURGE_ENABLED, exec);
final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1, factory);
exec.setRemoveOnCancelPolicy(PURGE_ENABLED);
return exec;
}

static void tryPutIntoPool(boolean purgeEnabled, ScheduledExecutorService exec) {
if (purgeEnabled && exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
}

static final class ScheduledTask implements Runnable {
@Override
public void run() {
for (ScheduledThreadPoolExecutor e : new ArrayList<>(POOLS.keySet())) {
if (e.isShutdown()) {
POOLS.remove(e);
} else {
e.purge();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@
* <li>{@code rx3.computation-priority} (int): sets the thread priority of the {@link #computation()} {@code Scheduler}, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx3.newthread-priority} (int): sets the thread priority of the {@link #newThread()} {@code Scheduler}, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx3.single-priority} (int): sets the thread priority of the {@link #single()} {@code Scheduler}, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx3.purge-enabled} (boolean): enables periodic purging of all {@code Scheduler}'s backing thread pools, default is {@code false}</li>
* <li>{@code rx3.purge-period-seconds} (int): specifies the periodic purge interval of all {@code Scheduler}'s backing thread pools, default is 1 second</li>
* <li>{@code rx3.purge-enabled} (boolean): enables purging of all {@code Scheduler}'s backing thread pools, default is {@code true}</li>
* <li>{@code rx3.scheduler.use-nanotime} (boolean): {@code true} instructs {@code Scheduler} to use {@link System#nanoTime()} for {@link Scheduler#now(TimeUnit)},
* instead of default {@link System#currentTimeMillis()} ({@code false})</li>
* </ul>
Expand Down Expand Up @@ -556,7 +555,6 @@ public static void shutdown() {
newThread().shutdown();
single().shutdown();
trampoline().shutdown();
SchedulerPoolFactory.shutdown();
}

/**
Expand All @@ -569,7 +567,6 @@ public static void start() {
newThread().start();
single().start();
trampoline().start();
SchedulerPoolFactory.start();
}

static final class IOTask implements Supplier<Scheduler> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,50 +30,6 @@ public void utilityClass() {
TestHelper.checkUtilityClass(SchedulerPoolFactory.class);
}

@Test
public void multiStartStop() {
SchedulerPoolFactory.shutdown();

SchedulerPoolFactory.shutdown();

SchedulerPoolFactory.tryStart(false);

assertNull(SchedulerPoolFactory.PURGE_THREAD.get());

SchedulerPoolFactory.start();

// restart schedulers
Schedulers.shutdown();

Schedulers.start();
}

@Test
public void startRace() throws InterruptedException {
try {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
SchedulerPoolFactory.shutdown();

Runnable r1 = new Runnable() {
@Override
public void run() {
SchedulerPoolFactory.start();
}
};

TestHelper.race(r1, r1);
}

} finally {
// restart schedulers
Schedulers.shutdown();

Thread.sleep(200);

Schedulers.start();
}
}

@Test
public void boolPropertiesDisabledReturnsDefaultDisabled() throws Throwable {
assertTrue(SchedulerPoolFactory.getBooleanProperty(false, "key", false, true, failingPropertiesAccessor));
Expand All @@ -98,30 +54,6 @@ public void boolPropertiesReturnsValue() throws Throwable {
assertFalse(SchedulerPoolFactory.getBooleanProperty(true, "false", false, true, Functions.<String>identity()));
}

@Test
public void intPropertiesDisabledReturnsDefaultDisabled() throws Throwable {
assertEquals(-1, SchedulerPoolFactory.getIntProperty(false, "key", 0, -1, failingPropertiesAccessor));
assertEquals(-1, SchedulerPoolFactory.getIntProperty(false, "key", 1, -1, failingPropertiesAccessor));
}

@Test
public void intPropertiesEnabledMissingReturnsDefaultMissing() throws Throwable {
assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 0, missingPropertiesAccessor));
assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 1, missingPropertiesAccessor));
}

@Test
public void intPropertiesFailureReturnsDefaultMissing() throws Throwable {
assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 0, failingPropertiesAccessor));
assertEquals(-1, SchedulerPoolFactory.getIntProperty(true, "key", -1, 1, failingPropertiesAccessor));
}

@Test
public void intPropertiesReturnsValue() throws Throwable {
assertEquals(1, SchedulerPoolFactory.getIntProperty(true, "1", 0, 4, Functions.<String>identity()));
assertEquals(2, SchedulerPoolFactory.getIntProperty(true, "2", 3, 5, Functions.<String>identity()));
}

static final Function<String, String> failingPropertiesAccessor = new Function<String, String>() {
@Override
public String apply(String v) throws Throwable {
Expand All @@ -135,22 +67,4 @@ public String apply(String v) throws Throwable {
return null;
}
};

@Test
public void putIntoPoolNoPurge() {
int s = SchedulerPoolFactory.POOLS.size();

SchedulerPoolFactory.tryPutIntoPool(false, null);

assertEquals(s, SchedulerPoolFactory.POOLS.size());
}

@Test
public void putIntoPoolNonThreadPool() {
int s = SchedulerPoolFactory.POOLS.size();

SchedulerPoolFactory.tryPutIntoPool(true, null);

assertEquals(s, SchedulerPoolFactory.POOLS.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void run() {

System.out.println("Wait before second GC");
System.out.println("JDK 6 purge is N log N because it removes and shifts one by one");
int t = (int)(n * Math.log(n) / 100) + SchedulerPoolFactory.PURGE_PERIOD_SECONDS * 1000;
int t = (int)(n * Math.log(n) / 100) + 1000;
int sleepStep = 100;
while (t > 0) {
System.out.printf(" >> Waiting for purge: %.2f s remaining%n", t / 1000d);
Expand Down