Skip to content

Commit

Permalink
Coroutine scheduler is used by default instead of deprecated CommonPool
Browse files Browse the repository at this point in the history
* Documentation and guide are updated correspondingly
* "DefaultDispatcher" is used as a public name of the default impl
* Implementation is integrated with virtual time-source
* Shutdown sequence is reimplemented in a safe way for tests,
  makes "close" safe to use on custom instances.
* "close" on DefaultDispatcher throws exception just in case
* -Dkotlinx.coroutines.scheduler=off can be used to switch back to
  CommonPool

Fixes #198
  • Loading branch information
elizarov committed Sep 28, 2018
1 parent 938c5e9 commit 5b490cc
Show file tree
Hide file tree
Showing 22 changed files with 131 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ public object Dispatchers {
* [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc
* if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
*
* It is backed by a shared pool of threads on JVM.
* You can set system property "`kotlinx.coroutines.scheduler`" (either no value or to the value of "`on`")
* to use an experimental coroutine dispatcher that shares threads with [Dispatchers.IO] and thus can switch to
* context without performing an actual thread context switch.
* It is backed by a shared pool of threads on JVM. By default, the maximal number of threads used
* by this dispatcher is equal to the number CPU cores, but is at least two.
*/
@JvmField
public val Default: CoroutineDispatcher =
Expand Down
13 changes: 5 additions & 8 deletions core/kotlinx-coroutines-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ tasks.withType(Test) {

test {
exclude '**/*LFTest.*'
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
}

task lockFreedomTest(type: Test, dependsOn: testClasses) {
Expand All @@ -48,18 +49,14 @@ task jdk16Test(type: Test, dependsOn: [testClasses, checkJdk16]) {
exclude '**/exceptions/.*'
}

task schedulerTest(type: Test, dependsOn: testClasses) {
systemProperty 'kotlinx.coroutines.scheduler', ''
systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test
}

// Always run those tests
task moreTest(dependsOn: [lockFreedomTest])

build.dependsOn moreTest

// Run these tests only during nightly stress test
task extraTest(dependsOn: [jdk16Test, schedulerTest])
task extraTest(dependsOn: [jdk16Test])


build.dependsOn moreTest

extraTest.onlyIf { project.properties['stressTest'] != null }

Expand Down
2 changes: 1 addition & 1 deletion core/kotlinx-coroutines-core/src/CommonPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ object CommonPool : ExecutorCoroutineDispatcher() {
pool ?: createPool().also { pool = it }

override fun dispatch(context: CoroutineContext, block: Runnable) =
try { (pool ?: getOrCreatePoolSync()).execute(timeSource.trackTask(block)) }
try { (pool ?: getOrCreatePoolSync()).execute(timeSource.wrapTask(block)) }
catch (e: RejectedExecutionException) {
timeSource.unTrackTask()
DefaultExecutor.execute(block)
Expand Down
6 changes: 3 additions & 3 deletions core/kotlinx-coroutines-core/src/CoroutineContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.sche

internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_NAME).let { value ->
when (value) {
null -> false
"", "on" -> true
null, "", "on" -> true
"off" -> false
else -> error("System property '$COROUTINES_SCHEDULER_PROPERTY_NAME' has unrecognized value '$value'")
}
}
Expand All @@ -39,7 +39,7 @@ public actual val DefaultDispatcher: CoroutineDispatcher
get() = Dispatchers.Default

internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) BackgroundDispatcher else CommonPool
if (useCoroutinesScheduler) DefaultScheduler else CommonPool

/**
* The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads.
Expand Down
6 changes: 5 additions & 1 deletion core/kotlinx-coroutines-core/src/Dispatchers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public const val IO_PARALLELISM_PROPERTY_NAME = "kotlinx.coroutines.io.paralleli
* The number of threads used by this dispatcher is limited by the value of
* "`kotlinx.coroutines.io.parallelism`" ([IO_PARALLELISM_PROPERTY_NAME]) system property.
* It defaults to the limit of 64 threads or the number of cores (whichever is larger).
*
* This dispatcher shares threads with a [Default][Dispatchers.Default] dispatcher, so using
* `withContext(Dispatchers.IO) { ... }` does not lead to an actual switching to another thread —
* typically execution continues in the same thread.
*/
public val Dispatchers.IO: CoroutineDispatcher
get() = BackgroundDispatcher.IO
get() = DefaultScheduler.IO
2 changes: 1 addition & 1 deletion core/kotlinx-coroutines-core/src/Executors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatc
}

override fun dispatch(context: CoroutineContext, block: Runnable) =
try { executor.execute(timeSource.trackTask(block)) }
try { executor.execute(timeSource.wrapTask(block)) }
catch (e: RejectedExecutionException) {
timeSource.unTrackTask()
DefaultExecutor.execute(block)
Expand Down
6 changes: 4 additions & 2 deletions core/kotlinx-coroutines-core/src/TimeSource.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import java.util.concurrent.locks.LockSupport
internal interface TimeSource {
fun currentTimeMillis(): Long
fun nanoTime(): Long
fun trackTask(block: Runnable): Runnable
fun wrapTask(block: Runnable): Runnable
fun trackTask()
fun unTrackTask()
fun registerTimeLoopThread()
fun unregisterTimeLoopThread()
Expand All @@ -20,7 +21,8 @@ internal interface TimeSource {
internal object DefaultTimeSource : TimeSource {
override fun currentTimeMillis(): Long = System.currentTimeMillis()
override fun nanoTime(): Long = System.nanoTime()
override fun trackTask(block: Runnable): Runnable = block
override fun wrapTask(block: Runnable): Runnable = block
override fun trackTask() {}
override fun unTrackTask() {}
override fun registerTimeLoopThread() {}
override fun unregisterTimeLoopThread() {}
Expand Down
102 changes: 48 additions & 54 deletions core/kotlinx-coroutines-core/src/scheduling/CoroutineScheduler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -290,35 +290,33 @@ internal class CoroutineScheduler(

override fun execute(command: Runnable) = dispatch(command)

override fun close() = shutdown(1000L)
override fun close() = shutdown(10_000L)

/*
* Shuts down current scheduler and waits until all threads are stopped.
* This method uses unsafe API (does unconditional unparks)
* and intended to be used only for testing. Invocation has no additional effect if already closed.
*/
// Shuts down current scheduler and waits until all work is done and all threads are stopped.
fun shutdown(timeout: Long) {
// atomically set termination flag which is checked when workers are added or removed
if (!isTerminated.compareAndSet(false, true)) return

/*
* Shutdown current thread. Note that shutdown is testing utility,
* so we don't do anything special to properly verify that no tasks are submitted after close()
*/
val thread = Thread.currentThread()
(thread as? Worker)?.tryReleaseCpu(WorkerState.TERMINATED)

// make sure we are not waiting for the current thread
val currentWorker = Thread.currentThread() as? Worker
// Capture # of created workers that cannot change anymore (mind the synchronized block!)
val created = synchronized(workers) { createdWorkers }
for (i in 1..created) {
val worker = workers[i]!!
if (worker.isAlive) {
// Unparking alive thread is unsafe in general, but acceptable for testing purposes
if (worker.isAlive && worker !== currentWorker) {
LockSupport.unpark(worker)
worker.join(timeout)
worker.localQueue.offloadAllWork(globalQueue)
}

}
// Finish processing tasks from globalQueue and/or from this worker's local queue
while (true) {
val task = currentWorker?.findTask() ?: globalQueue.removeFirstOrNull() ?: break
runSafely(task)
}
// cleanup state to make sure that tryUnpark tries to create new threads and crashes because it isTerminated
// Shutdown current thread
currentWorker?.tryReleaseCpu(WorkerState.TERMINATED)
// cleanup state to make sure that tryUnpark tries to create new threads and fails because isTerminated
assert(cpuPermits.availablePermits() == corePoolSize)
parkedWorkersStack.value = 0L
controlState.value = 0L
Expand All @@ -333,6 +331,7 @@ internal class CoroutineScheduler(
* @param fair whether the task should be dispatched fairly (strict FIFO) or not (semi-FIFO)
*/
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
timeSource.trackTask() // this is needed for virtual time support
// TODO at some point make DispatchTask extend Task and make its field settable to save an allocation
val task = Task(block, schedulerTimeSource.nanoTime(), taskContext)
// try to submit the task to the local queue and act depending on the result
Expand Down Expand Up @@ -439,7 +438,7 @@ internal class CoroutineScheduler(
private fun createNewWorker(): Int {
synchronized(workers) {
// Make sure we're not trying to resurrect terminated scheduler
if (isTerminated.value) throw ShutdownException()
if (isTerminated.value) throw RejectedExecutionException("$schedulerName was terminated")
val state = controlState.value
val created = createdWorkers(state)
val blocking = blockingWorkers(state)
Expand All @@ -456,9 +455,6 @@ internal class CoroutineScheduler(
}
}

// Is thrown when attempting to create new worker, but this scheduler isTerminated
private class ShutdownException : RuntimeException()

/**
* Returns [ADDED], or [NOT_ADDED], or [ADDED_REQUIRES_HELP].
*/
Expand Down Expand Up @@ -565,6 +561,17 @@ internal class CoroutineScheduler(
"]"
}

private fun runSafely(task: Task) {
try {
task.run()
} catch (e: Throwable) {
val thread = Thread.currentThread()
thread.uncaughtExceptionHandler.uncaughtException(thread, e)
} finally {
timeSource.unTrackTask()
}
}

internal inner class Worker private constructor() : Thread() {
init {
isDaemon = true
Expand Down Expand Up @@ -685,41 +692,28 @@ internal class CoroutineScheduler(
private var lastStealIndex = 0 // try in order repeated, reset when unparked

override fun run() {
try {
var wasIdle = false // local variable to avoid extra idleReset invocations when tasks repeatedly arrive
while (!isTerminated.value && state != WorkerState.TERMINATED) {
val task = findTask()
if (task == null) {
// Wait for a job with potential park
if (state == WorkerState.CPU_ACQUIRED) {
cpuWorkerIdle()
} else {
blockingWorkerIdle()
}
wasIdle = true
var wasIdle = false // local variable to avoid extra idleReset invocations when tasks repeatedly arrive
while (!isTerminated.value && state != WorkerState.TERMINATED) {
val task = findTask()
if (task == null) {
// Wait for a job with potential park
if (state == WorkerState.CPU_ACQUIRED) {
cpuWorkerIdle()
} else {
if (wasIdle) {
idleReset(task.mode)
wasIdle = false
}
beforeTask(task)
runSafely(task)
afterTask(task)
blockingWorkerIdle()
}
wasIdle = true
} else {
if (wasIdle) {
idleReset(task.mode)
wasIdle = false
}
beforeTask(task)
runSafely(task)
afterTask(task)
}
} catch (e: ShutdownException) {
// race with shutdown -- ignore exception and don't print it on the console
} finally {
tryReleaseCpu(WorkerState.TERMINATED)
}
}

private fun runSafely(task: Task) {
try {
task.run()
} catch (t: Throwable) {
uncaughtExceptionHandler.uncaughtException(this, t)
}
tryReleaseCpu(WorkerState.TERMINATED)
}

private fun beforeTask(task: Task) {
Expand Down Expand Up @@ -823,7 +817,7 @@ internal class CoroutineScheduler(
private fun tryTerminateWorker() {
synchronized(workers) {
// Make sure we're not trying race with termination of scheduler
if (isTerminated.value) throw ShutdownException()
if (isTerminated.value) return
// Someone else terminated, bail out
if (createdWorkers <= corePoolSize) return
// Try to find blocking task before termination
Expand Down Expand Up @@ -906,7 +900,7 @@ internal class CoroutineScheduler(
spins = 0 // Volatile write, should be written last
}

private fun findTask(): Task? {
internal fun findTask(): Task? {
if (tryAcquireCpuPermit()) return findTaskWithCpuPermit()
/*
* If the local queue is empty, try to extract blocking task from global queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@ package kotlinx.coroutines.experimental.scheduling
import kotlinx.atomicfu.*
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.internal.*
import java.lang.UnsupportedOperationException
import java.util.concurrent.*
import kotlin.coroutines.experimental.*

/**
* Default instance of coroutine dispatcher for background coroutines (as opposed to UI coroutines).
* Default instance of coroutine dispatcher.
*/
internal object BackgroundDispatcher : ExperimentalCoroutineDispatcher() {
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))

override fun close() {
throw UnsupportedOperationException("$DEFAULT_SCHEDULER_NAME cannot be closed")
}
}

/**
Expand All @@ -39,15 +44,14 @@ open class ExperimentalCoroutineDispatcher(
get() = coroutineScheduler

// This is variable for test purposes, so that we can reinitialize from clean state
private var coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs)
private var coroutineScheduler = createScheduler()

override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
coroutineScheduler.dispatch(block)

override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
coroutineScheduler.dispatch(block, fair = true)

// TODO throw error when this API becomes public and close it in tests via another method
override fun close() = coroutineScheduler.close()

override fun toString(): String {
Expand Down Expand Up @@ -82,16 +86,23 @@ open class ExperimentalCoroutineDispatcher(
internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean): Unit =
coroutineScheduler.dispatch(block, context, fair)

private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs)

// fot tests only
@Synchronized
internal fun usePrivateScheduler() {
coroutineScheduler.shutdown(1000L)
coroutineScheduler = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs)
coroutineScheduler.shutdown(10_000L)
coroutineScheduler = createScheduler()
}

// for tests only
@Synchronized
internal fun shutdown(timeout: Long) {
coroutineScheduler.shutdown(timeout)
}

// for tests only
internal fun restore() = usePrivateScheduler() // recreate scheduler
}

private class LimitingDispatcher(
Expand Down
2 changes: 1 addition & 1 deletion core/kotlinx-coroutines-core/src/scheduling/Tasks.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import java.util.concurrent.*

// TODO most of these fields will be moved to 'object ExperimentalDispatcher'

internal const val DEFAULT_SCHEDULER_NAME = "CoroutineScheduler"
internal const val DEFAULT_SCHEDULER_NAME = "DefaultDispatcher"

// 100us as default
@JvmField
Expand Down
7 changes: 7 additions & 0 deletions core/kotlinx-coroutines-core/src/scheduling/WorkQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ internal class WorkQueue {
}
}

internal fun offloadAllWork(globalQueue: GlobalQueue) {
while (true) {
val task = pollExternal() ?: return
globalQueue.addLast(task)
}
}

/**
* [poll] for external (not owning this queue) workers
*/
Expand Down
Loading

0 comments on commit 5b490cc

Please sign in to comment.