
IO thread pool for blocking calls #79

Closed
elizarov opened this issue Jul 5, 2017 · 45 comments

Comments

@elizarov
Contributor

elizarov commented Jul 5, 2017

We need some kind of IO dispatcher in kotlinx.coroutines that will be optimized for offloading blocking calls and backed by an unbounded thread pool that is smart enough to create new threads only when needed and to shut them down on a timeout. The goal is that you could always wrap a blocking IO call in withContext(IO) { ... } (UPDATED: it was formerly named run(IO)) and be sure that you will not run into the problem of not having enough threads in your pool. However, it will be up to the user to control the maximal number of concurrent operations via some other means.

It will be somewhat conceptually similar to the newThread() scheduler in Rx, but it will also be designed to be used where the io() scheduler in Rx is used.
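With today's API names, the proposed usage reads like the sketch below (assuming kotlinx-coroutines-core on the classpath; Dispatchers.IO is the dispatcher this issue eventually produced):

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import java.io.File

// Offload a blocking file read to the IO dispatcher so that the
// limited-parallelism default pool is never blocked by it.
suspend fun readConfig(path: String): String =
    withContext(Dispatchers.IO) {
        File(path).readText() // blocking call, safe to park here
    }
```

A caller simply invokes `readConfig(path)` from any coroutine; the blocking read is moved onto the elastic IO pool and the calling dispatcher's threads stay free.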

@fvasco
Contributor

fvasco commented Jul 5, 2017

Java 6 has Executors.newCachedThreadPool; it is easy to create and easy to customize:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

We can tune maximumPoolSize at startup to limit "the maximum allowed number of threads" through a System property.

However, we should consider that specific use cases should create their own fine-tuned IO executors instead of using the global one.
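That tuning could look like the following sketch in Kotlin (the `io.maxThreads` property name is purely illustrative, not a real kotlinx.coroutines property):

```kotlin
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// A cached pool whose upper bound is read from a system property.
fun newTunableIoPool(): ThreadPoolExecutor {
    val maxThreads = System.getProperty("io.maxThreads")?.toIntOrNull() ?: Int.MAX_VALUE
    return ThreadPoolExecutor(
        0, maxThreads,          // grow from zero up to the configured cap
        60L, TimeUnit.SECONDS,  // idle threads die after 60 seconds
        SynchronousQueue()      // direct hand-off: no task buffering
    )
}
```

One caveat of this approach: because SynchronousQueue does no buffering, a bounded maximumPoolSize means tasks are rejected with RejectedExecutionException once the cap is reached, rather than queued, which is one reason a purpose-built scheduler is attractive.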

@elizarov
Contributor Author

We need to make sure that there is no confusion about the difference between run(IO) { /* blocking code here */ } and runBlocking { ... }.

@fvasco
Contributor

fvasco commented Jul 11, 2017

Kotlin could provide an explicit annotation (like @DslMarker) to prevent invocation from a coroutine.

So we can rename the function to

@Blocking
fun runAndWait( ... )

@fvasco
Contributor

fvasco commented Jul 11, 2017

Using the code:

val job = ...
run(IO + job) { ... }
job.cancel()

Should the I/O thread be interrupted?

@elizarov
Contributor Author

We should not do interruption by default. It is dangerous. But we definitely need some easy way to adapt an interruptible blocking API (when we are sure it plays well with interrupts) to coroutines. I don't know yet what the best solution for this is going to be. This is also related to #57.
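For the record, kotlinx.coroutines later shipped exactly such an adapter as `runInterruptible`, which converts coroutine cancellation into `Thread.interrupt()` for the duration of the block (sketch assumes a recent kotlinx-coroutines-core):

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runInterruptible
import java.util.concurrent.BlockingQueue

// Cancelling the coroutine interrupts the thread parked in take(),
// so the blocking call unblocks promptly instead of leaking a thread.
suspend fun takeNext(queue: BlockingQueue<Int>): Int =
    runInterruptible(Dispatchers.IO) {
        queue.take() // interrupt-friendly blocking call
    }
```

Outside the block, interruption is off again, so code that does not tolerate interrupts is never affected by cancellation.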

@elizarov
Contributor Author

@fvasco W.r.t to @Blocking annotation it is actually a larger issue that spans both Kotlin and Java. Here is a related issue in YT: https://youtrack.jetbrains.com/issue/KT-15525

fvasco pushed a commit to fvasco/kotlinx.coroutines that referenced this issue Jul 12, 2017
fvasco pushed a commit to fvasco/kotlinx.coroutines that referenced this issue Jul 13, 2017
fvasco pushed a commit to fvasco/kotlinx.coroutines that referenced this issue Jul 14, 2017
fvasco pushed a commit to fvasco/kotlinx.coroutines that referenced this issue Jul 14, 2017
fvasco pushed a commit to fvasco/kotlinx.coroutines that referenced this issue Jul 31, 2017
@voddan

voddan commented Sep 14, 2017

IMHO the name is misleading. When I see run(IO){} I think it is a coroutine context that guarantees that it runs on the main thread, even if started somewhere else. Or is that an impossible case?

@elizarov
Contributor Author

Hmm... the assumption was that the code inside run(IO) { ... } runs on special IO threads that are designed for blocking IO. The naming problem is how we highlight that it is designed for blocking IO only and that you should not go there if you have async IO...

@LouisCAD
Contributor

@elizarov Why not use blockingIO then, instead of just IO?

@mykola-dev

Because it's not blocking? And RxJava already has Schedulers.io(), which does the same thing.

@oshai
Contributor

oshai commented Sep 14, 2017

I would call it IOPool (similar to CommonPool), with the ability to give it the number of concurrent operations as a parameter.

@voddan

voddan commented Sep 15, 2017

BTW, why do we need to call it IO? Isn't it suitable for any blocking operations, IO or not? I feel that the main property of this dispatcher is that it can vary the number of its threads. Maybe something like BlockingHeap would be a better fit?

@fvasco
Contributor

fvasco commented Sep 15, 2017

A bit unrelated question.

How does a coroutine integrate with locks?
Using IO for a blocking call leads to:

run(IO) { blockingQueue.take() }

This might look reasonable, but putting this line in actors, in thousands of actors, can easily drain the pool or, on the other hand, create thousands of threads.

Currently I can't figure out how to integrate coroutines with synchronized blocks, locks, etc.
IO requests are a particular case of blocking code: limiting the number of requests can be a benefit, and it never produces a deadlock.

Without a golden rule for executing any kind of blocking code, I consider it really helpful to highlight the IO nature of this dispatcher.

@elizarov
Contributor Author

On the naming: we don't have to have IO in the name (it is indeed for blocking ops), but we want it to be discoverable. The problem with "arbitrary blocking" is that in a large app you might want to have different pools for different kinds of blocking ops, but we need to provide some "least common denominator" out of the box, and the problem of blocking IO is what people are facing all the time, so IOPool might be a good name. I'm also thinking of "blocking" in the name, but as I've already alluded to earlier, run(BlockingPool) { ... } is too confusing w.r.t. runBlocking { ... }. Might not be an issue if we rename run, though.

@elizarov
Contributor Author

@fvasco Good point. You could use "IO" pool for integration with blocking queues etc. I mean, if you have to integrate with somebody else's BlockingQueue then you have no choice but to schedule the wait into some pool that is designed with blocking operations in mind.

@elizarov
Contributor Author

Here is a fresh idea that was pointed to by Raul Raja on the public Slack. It suffices to have a single CPU-optimized dispatcher if we also provide a Scala-like blocking { ... } wrapper function that tags the block of code as performing a "blocking" operation for the scheduler. Now the scheduler will know how many of its threads are blocked and can create new threads as needed to continue pumping CPU-consuming work. This seems to me a better programming model for blocking IO than a dedicated scheduler/dispatcher.

@fvasco
Contributor

fvasco commented Sep 18, 2017

Please correct me if I am wrong:
is Raja's proposal to create an unbounded thread executor for blocking operations?
In that case coroutines remain cheap, but blocking code in coroutines may become a fork bomb.

@elizarov
Contributor Author

elizarov commented Sep 18, 2017

Let me clarify. Here is a problem we have: We need to be able to do both CPU-bound tasks and IO/blocking tasks with coroutines. The original proposal (this github issue) was to achieve this via a dedicated IO-tuned dispatcher, so you'd separate these two use-cases by choosing an appropriate dispatcher:

launch(CommonPool) { ... cpu-bound code ... }
launch(IO) { ... blocking IO code ... }

The alternative idea is to borrow Scala's approach: instead of two separate dispatchers let's have a single dispatcher (let's call it DefaultDispatcher) and use it like this:

launch(DefaultDispatcher) { ... cpu-bound code ... }
launch(DefaultDispatcher) { blocking { ... blocking IO code ... } }

What I particularly like about it is that with this approach it makes sense to truly make this DefaultDispatcher a global default for all coroutine builders and just write:

launch { ... cpu-bound code ... }
launch { blocking { ... blocking IO code ... } }
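A minimal sketch of what such a marker could look like (entirely hypothetical: `BlockingAwareScheduler` and `maybeAddWorker` are illustrative names, not kotlinx.coroutines API):

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical scheduler hook: tracks how many pool threads are currently
// parked inside blocking code so the pool can compensate with new workers.
object BlockingAwareScheduler {
    val blockedThreads = AtomicInteger(0)
    fun maybeAddWorker() {
        // A real scheduler would spin up a replacement thread here
        // whenever blockedThreads approaches the pool size.
    }
}

// The Scala-style marker: tags the enclosed code as blocking so the
// scheduler can keep pumping CPU-bound work on other threads.
inline fun <T> blocking(block: () -> T): T {
    BlockingAwareScheduler.blockedThreads.incrementAndGet()
    BlockingAwareScheduler.maybeAddWorker()
    try {
        return block()
    } finally {
        BlockingAwareScheduler.blockedThreads.decrementAndGet()
    }
}
```

The key property is that the marker itself does no context switch; it only adjusts the scheduler's bookkeeping around the blocking region.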

@oshai
Contributor

oshai commented Sep 18, 2017 via email

@fvasco
Contributor

fvasco commented Sep 18, 2017

I agree, but it does not look so fresh;
it looks like PR #83.

@elizarov
Contributor Author

@fvasco It finally clicked in my mind when I understood how it shall interface with a scheduler. The difference from PR #83 is that it should not require a context switch at all if the scheduler that currently runs the coroutine natively supports blocking operations itself and counts the number of ongoing blocking operations to decide when to create new threads.

@elizarov
Contributor Author

@oshai You can always create a thread-number-limited context for your blocking ops. It is a very good choice for people striving for the best control. For example, if you are using JDBC and you want to limit your code to at most n concurrent DB operations, then you'll define val dbContext = newFixedThreadPoolContext(n, "DB") and use it for all your blocking DB operations, wrapping them into run(dbContext) { ... }.

The idea behind blocking { ... } is that it is going to be convenient for cases where you don't really care to put a limit, which is the case where the number of your concurrent blocking operations is limited through some other means, for example, by simply having a limited number of running coroutines that do blocking operations.
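The JDBC confinement pattern described above, spelled out as a sketch (using today's `withContext` in place of the era's `run`; `n = 4` is arbitrary, and `newFixedThreadPoolContext` was later marked obsolete in favor of parallelism-limited views of the shared pool):

```kotlin
import kotlinx.coroutines.newFixedThreadPoolContext
import kotlinx.coroutines.withContext

// At most 4 DB operations run concurrently, no matter how many
// coroutines call dbQuery at once; the rest suspend and wait.
val dbContext = newFixedThreadPoolContext(4, "DB")

suspend fun <T> dbQuery(block: () -> T): T =
    withContext(dbContext) { block() } // blocking JDBC work goes here
```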

@elizarov
Contributor Author

@fvasco The other difference is that this blocking { ... } does not have to be suspend. It can be defined with the following signature:

inline fun <T> blocking(block: () -> T): T

This way, you will be able to use it anywhere in your blocking code, regardless of whether you are going to run it from a coroutine or not, and it will have the effect of signaling to the IO-aware coroutine scheduler when it is being used from coroutines.

However, the other implementation option is to declare it as suspend and then make its behavior dependent on what kind of coroutine it is invoked from. It can simply serve as a signal when it is invoked from an IO-aware scheduler, or do a switch (like run does) into a default IO-aware scheduler otherwise (if you use it from a UI-bound coroutine, for example).

@fvasco
Contributor

fvasco commented Sep 19, 2017

Hi @elizarov,
building an unbounded thread pool is trivial in Java, as is checking thread ownership.

Both changes are tiny patches over #83; maybe your proposal is more complex, so I will wait patiently for a code review.

@elizarov
Contributor Author

UPDATE on the current design thinking around this issue:

  • DO introduce some kind of IO context so that one can write withContext(IO) { ... } around a blocking operation to ensure that the limited-concurrency pool that executes coroutines is not blocked. However, it is going to be backed by an efficient implementation that does not perform any actual thread context switch when going from DefaultContext into the IO context (see Sliceable dispatchers: Provide alternative to newSingle/FixedThreadPoolContext via a shared pool of threads  #261).

  • MAYBE implement some kind of blocking { ... } marker block as a complementary mechanism that can be used in non-suspending code that can get invoked on the coroutines thread pool and performs identically to withContext(IO) { ... } when used from a suspending function.

Open questions:

  • How do we name the context? Using IO is questionable, as it should be used for any blocking operations, not necessarily I/O related.

  • How do we name the marker block? Using blocking { ... } is questionable, as it is quite similar to runBlocking { ... }, which would cause lots of confusion and misunderstanding.

@LouisCAD
Contributor

BlockingOps as an alternative name to IO comes to my mind. It's 2 words, like CommonPool.

@GeoffreyMetais

What about Extended or Expandable? As it's an expanding context,
this name emphasizes the fact that this context is not fixed and is more costly than CommonPool.

@dave08

dave08 commented Feb 28, 2018

Or ElasticPool, ExtendablePool? And withBlocking { }?

@LassoMike

LassoMike commented Feb 28, 2018

I like CachedThreadPoolContext because it sounds like the existing newSingleThreadContext and newFixedThreadPoolContext. Or CachedThreadPool because it sounds like CommonPool.

@fvasco
Contributor

fvasco commented Feb 28, 2018

Using blocking { ... } is questionable, as it is quite similar to runBlocking { ... } which would ensure lots of confusion and misunderstanding

It is also possible to settle this debate by renaming the runBlocking function.
As a counterpart of the async { } function, we could rename runBlocking { } to sync { }, for example.

How do we name the marker block

I consider a dedicated builder useful, especially to indicate the synchronous/blocking nature of the invoked functions.
The name should match the defined pool: for a BlockingPool pool we can use blocking { }, for an IO pool we use io { }, and so on...

@elizarov
Contributor Author

Renaming runBlocking is on the table, too.

As for the naming of the context, please keep in mind #261. All the contexts will be backed by the same shared pool, so I don't think it is appropriate to use Pool in their names. Moreover, I'm considering deprecating CommonPool and recommending its replacement with DefaultContext when we are done with this.

Let me also link a related discussion on the naming of dispatchers: #41. I'm not convinced that we should isolate the names of the dispatchers into a separate namespace, but I'm also not firmly convinced that they should be top-level.

@fvasco
Contributor

fvasco commented Feb 28, 2018

Personally I consider DefaultContext a good name, but a little messy.
Defining only DefaultContext induces one to define an AlternativeContext: chaos over confusion.

A coroutine context appears richer than a single coroutine dispatcher, so using Context induces me to consider it as a dispatcher plus something else...
What is wrong in DefaultContext is only the dispatcher; we can name it CpuBoundDispatcher. Giving it a clear name makes the problem more evident: we cannot use CpuBoundDispatcher for a non-CPU-bound task.

Instead, defining

val DefaultContext: CoroutineContext = CpuBoundDispatcher + (optionally something else)

leads me to consider

val BlockingContext = DefaultContext + NonCpuBoundDispatcher

Unfortunately I haven't solved this issue, I am sorry.
The above considerations confuse my mind, but that is probably why this report looks more like a beauty contest than a technical problem.

@elizarov
Contributor Author

elizarov commented Aug 4, 2018

Now that we have merged the experimental coroutines dispatcher, we can deliver the IO scheduler soon using it, even if DefaultDispatcher will still point to CommonPool for some time.

@LouisCAD
Contributor

LouisCAD commented Aug 5, 2018

I'm a bit confused as to what to expect for the future of kotlinx.coroutines default/recommended dispatchers. Will we have to continue running I/O separately from CPU-bound tasks, as is currently done with CommonPool plus a custom (often executor-based) dispatcher, or will they be merged into the new experimental coroutines dispatcher so we no longer have to worry about I/O being done in the same dispatcher as the CPU-bound code?

@elizarov
Contributor Author

elizarov commented Aug 5, 2018

The expected future looks like this. There is going to be only a single pool of threads in addition to the UI thread.

  • Experimental coroutines scheduler becomes DefaultDispatcher by default (CommonPool becomes deprecated). So, you launch { ... } a CPU-bound task in the default dispatcher. Its default parallelism is equal to the number of cores.
  • Experimental coroutines scheduler backs the IO dispatcher, too, but the default parallelism of IO is effectively unbounded. You can do withContext(IO) { ... } without having to switch to a different thread, but if too many threads get blocked on IO, then new threads will spin up automatically to help.
  • You can create additional dispatchers backed by the same thread pool with a custom limit on parallelism for doing things like synchronous DB access (see Sliceable dispatchers: Provide alternative to newSingle/FixedThreadPoolContext via a shared pool of threads  #261).
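This is essentially what shipped: `Dispatchers.Default` and `Dispatchers.IO` share one thread pool, and `limitedParallelism` (added in kotlinx-coroutines 1.6) provides the capped views described in the last bullet. A sketch of all three cases:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

fun main() = runBlocking {
    launch(Dispatchers.Default) {
        // CPU-bound work: parallelism defaults to the number of cores
    }
    launch(Dispatchers.IO) {
        // Blocking IO: the shared pool grows on demand past the core count
    }
    // A capped view of the shared pool, e.g. for synchronous DB access:
    val db = Dispatchers.IO.limitedParallelism(4)
    withContext(db) {
        // At most 4 of these blocks run concurrently
    }
}
```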

@fvasco
Contributor

fvasco commented Aug 6, 2018

Its default parallelism is equal to the number of cores.

Have you considered:

The current plan is to set defaultParallelism to nCPUs + 1 as a compromise value that ensures utilization of the underlying hardware even if one coroutine accidentally blocks and helps us avoid issue #198

and, later, in #261?

elizarov added a commit that referenced this issue Sep 11, 2018
* Dispatchers.Default — a default dispatcher for background asynchronous tasks
  (currently backed by FJP commonPool, a new dispatcher in the future).
* Dispatchers.IO — a dispatcher for blocking background operations (#79).
* Dispatchers.Main — a dispatcher for Android Main Thread (#533).
* Dispatchers.Swing — a dispatcher for Swing Event Dispatch Thread.
* Dispatchers.JavaFx — a dispatcher for JavaFx Application Thread.
* Old dispatchers are deprecated, CommonPool is deprecated, too.
* awaitPulse() in JavaFx and awaitFrame() in Android are top-level funs.
* Introduced HandlerDispatcher, SwingDispatcher, and JavaFxDispatcher types
  in the corresponding UI modules for type-safety and future extensions

Fixes #41
@voddan

voddan commented Sep 12, 2018

@elizarov What's the state of mind on this issue? Dispatchers.IO is as confusing as ever.

IMHO of all the names suggested above, Dispatchers.Elastic is the most direct and self-describing one. Also, it draws a useful parallel with Amazon Elastic Compute Cloud.
