Skip to content

Commit

Permalink
Try #4740:
Browse files Browse the repository at this point in the history
  • Loading branch information
bors[bot] authored May 14, 2022
2 parents 947d3f9 + 8c722e3 commit 362294f
Show file tree
Hide file tree
Showing 14 changed files with 379 additions and 204 deletions.
6 changes: 3 additions & 3 deletions benches/benches/bevy_tasks/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn bench_overhead(c: &mut Criterion) {
let mut v = (0..10000).collect::<Vec<usize>>();
let mut group = c.benchmark_group("overhead_par_iter");
for thread_count in &[1, 2, 4, 8, 16, 32] {
let pool = TaskPoolBuilder::new().num_threads(*thread_count).build();
let pool = TaskPoolBuilder::new().compute_threads(*thread_count).build();
group.bench_with_input(
BenchmarkId::new("threads", thread_count),
thread_count,
Expand Down Expand Up @@ -69,7 +69,7 @@ fn bench_for_each(c: &mut Criterion) {
let mut v = (0..10000).collect::<Vec<usize>>();
let mut group = c.benchmark_group("for_each_par_iter");
for thread_count in &[1, 2, 4, 8, 16, 32] {
let pool = TaskPoolBuilder::new().num_threads(*thread_count).build();
let pool = TaskPoolBuilder::new().compute_threads(*thread_count).build();
group.bench_with_input(
BenchmarkId::new("threads", thread_count),
thread_count,
Expand Down Expand Up @@ -115,7 +115,7 @@ fn bench_many_maps(c: &mut Criterion) {
let v = (0..10000).collect::<Vec<usize>>();
let mut group = c.benchmark_group("many_maps_par_iter");
for thread_count in &[1, 2, 4, 8, 16, 32] {
let pool = TaskPoolBuilder::new().num_threads(*thread_count).build();
let pool = TaskPoolBuilder::new().compute_threads(*thread_count).build();
group.bench_with_input(
BenchmarkId::new("threads", thread_count),
thread_count,
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_asset/src/asset_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ impl AssetServer {
let owned_path = asset_path.to_owned();
self.server
.task_pool
.spawn(async move {
.spawn_io(async move {
if let Err(err) = server.load_async(owned_path, force).await {
warn!("{}", err);
}
Expand Down
8 changes: 4 additions & 4 deletions crates/bevy_asset/src/debug_asset_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bevy_ecs::{
schedule::SystemLabel,
system::{NonSendMut, Res, ResMut, SystemState},
};
use bevy_tasks::{IoTaskPool, TaskPoolBuilder};
use bevy_tasks::TaskPoolBuilder;
use bevy_utils::HashMap;
use std::{
ops::{Deref, DerefMut},
Expand Down Expand Up @@ -60,12 +60,12 @@ impl Plugin for DebugAssetServerPlugin {
fn build(&self, app: &mut bevy_app::App) {
let mut debug_asset_app = App::new();
debug_asset_app
.insert_resource(IoTaskPool(
.insert_resource(
TaskPoolBuilder::default()
.num_threads(2)
.io_threads(2)
.thread_name("Debug Asset Server IO Task Pool".to_string())
.build(),
))
)
.insert_resource(AssetServerSettings {
asset_folder: "crates".to_string(),
watch_for_changes: true,
Expand Down
4 changes: 2 additions & 2 deletions crates/bevy_asset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub use path::*;

use bevy_app::{prelude::Plugin, App};
use bevy_ecs::schedule::{StageLabel, SystemStage};
use bevy_tasks::IoTaskPool;
use bevy_tasks::TaskPool;

/// The names of asset stages in an App Schedule
#[derive(Debug, Hash, PartialEq, Eq, Clone, StageLabel)]
Expand Down Expand Up @@ -82,7 +82,7 @@ pub fn create_platform_default_asset_io(app: &mut App) -> Box<dyn AssetIo> {
impl Plugin for AssetPlugin {
fn build(&self, app: &mut App) {
if !app.world.contains_resource::<AssetServer>() {
let task_pool = app.world.resource::<IoTaskPool>().0.clone();
let task_pool = app.world.resource::<TaskPool>().clone();

let source = create_platform_default_asset_io(app);

Expand Down
82 changes: 33 additions & 49 deletions crates/bevy_core/src/task_pool_options.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bevy_ecs::world::World;
use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuilder};
use bevy_tasks::{TaskPool, TaskPoolBuilder};
use bevy_utils::tracing::trace;

/// Defines a simple way to determine how many threads to use given the number of remaining cores
Expand Down Expand Up @@ -100,54 +100,38 @@ impl DefaultTaskPoolOptions {

let mut remaining_threads = total_threads;

if !world.contains_resource::<IoTaskPool>() {
// Determine the number of IO threads we will use
let io_threads = self
.io
.get_number_of_threads(remaining_threads, total_threads);

trace!("IO Threads: {}", io_threads);
remaining_threads = remaining_threads.saturating_sub(io_threads);

world.insert_resource(IoTaskPool(
TaskPoolBuilder::default()
.num_threads(io_threads)
.thread_name("IO Task Pool".to_string())
.build(),
));
}

if !world.contains_resource::<AsyncComputeTaskPool>() {
// Determine the number of async compute threads we will use
let async_compute_threads = self
.async_compute
.get_number_of_threads(remaining_threads, total_threads);

trace!("Async Compute Threads: {}", async_compute_threads);
remaining_threads = remaining_threads.saturating_sub(async_compute_threads);

world.insert_resource(AsyncComputeTaskPool(
TaskPoolBuilder::default()
.num_threads(async_compute_threads)
.thread_name("Async Compute Task Pool".to_string())
.build(),
));
}

if !world.contains_resource::<ComputeTaskPool>() {
// Determine the number of compute threads we will use
// This is intentionally last so that an end user can specify 1.0 as the percent
let compute_threads = self
.compute
.get_number_of_threads(remaining_threads, total_threads);

trace!("Compute Threads: {}", compute_threads);
world.insert_resource(ComputeTaskPool(
TaskPoolBuilder::default()
.num_threads(compute_threads)
.thread_name("Compute Task Pool".to_string())
.build(),
));
if world.contains_resource::<TaskPool>() {
return;
}
// Determine the number of IO threads we will use
let io_threads = self
.io
.get_number_of_threads(remaining_threads, total_threads);

trace!("IO Threads: {}", io_threads);
remaining_threads = remaining_threads.saturating_sub(io_threads);

// Determine the number of async compute threads we will use
let async_compute_threads = self
.async_compute
.get_number_of_threads(remaining_threads, total_threads);

trace!("Async Compute Threads: {}", async_compute_threads);
remaining_threads = remaining_threads.saturating_sub(async_compute_threads);

// Determine the number of compute threads we will use
// This is intentionally last so that an end user can specify 1.0 as the percent
let compute_threads = self
.compute
.get_number_of_threads(remaining_threads, total_threads);

world.insert_resource(
TaskPoolBuilder::default()
.compute_threads(compute_threads)
.async_compute_threads(async_compute_threads)
.io_threads(io_threads)
.thread_name("Task Pool".to_string())
.build(),
);
}
}
8 changes: 3 additions & 5 deletions crates/bevy_ecs/src/schedule/executor_parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
world::World,
};
use async_channel::{Receiver, Sender};
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool};
use bevy_tasks::{Scope, TaskPool};
#[cfg(feature = "trace")]
use bevy_utils::tracing::Instrument;
use fixedbitset::FixedBitSet;
Expand Down Expand Up @@ -123,10 +123,8 @@ impl ParallelSystemExecutor for ParallelExecutor {
}
}

let compute_pool = world
.get_resource_or_insert_with(|| ComputeTaskPool(TaskPool::default()))
.clone();
compute_pool.scope(|scope| {
let task_pool = world.get_resource_or_insert_with(TaskPool::default).clone();
task_pool.scope(|scope| {
self.prepare_systems(scope, systems, world);
let parallel_executor = async {
// All systems have been run if there are no queued or running systems.
Expand Down
6 changes: 3 additions & 3 deletions crates/bevy_tasks/examples/busy_behavior.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use bevy_tasks::TaskPoolBuilder;

// This sample demonstrates creating a thread pool with 4 tasks and spawning 40 tasks that spin
// for 100ms. It's expected to take about a second to run (assuming the machine has >= 4 logical
// This sample demonstrates creating a thread pool with 4 compute threads and spawning 40 tasks that
// spin for 100ms. It's expected to take about a second to run (assuming the machine has >= 4 logical
// cores)

fn main() {
let pool = TaskPoolBuilder::new()
.thread_name("Busy Behavior ThreadPool".to_string())
.num_threads(4)
.compute_threads(4)
.build();

let t0 = instant::Instant::now();
Expand Down
8 changes: 4 additions & 4 deletions crates/bevy_tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ mod single_threaded_task_pool;
#[cfg(target_arch = "wasm32")]
pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder};

mod usages;
pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};

mod iter;
pub use iter::ParallelIterator;

#[allow(missing_docs)]
pub mod prelude {
#[cfg(target_arch = "wasm32")]
pub use crate::single_threaded_task_pool::TaskPool;
#[cfg(not(target_arch = "wasm32"))]
pub use crate::task_pool::TaskPool;
#[doc(hidden)]
pub use crate::{
iter::ParallelIterator,
slice::{ParallelSlice, ParallelSliceMut},
usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool},
};
}

Expand Down
91 changes: 88 additions & 3 deletions crates/bevy_tasks/src/single_threaded_task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,19 @@ impl TaskPoolBuilder {
Self::default()
}

/// No op on the single threaded task pool
pub fn num_threads(self, _num_threads: usize) -> Self {
/// Override the number of compute-priority threads created for the pool. If unset, this defaults
/// to the number of logical cores of the system.
///
/// No-op on the single threaded task pool: the value is ignored because every task runs on the
/// main thread / JS event loop.
pub fn compute_threads(self, _num_threads: usize) -> Self {
self
}

/// Override the number of async-compute-priority threads created for the pool. If unset, this
/// defaults to 0.
///
/// No-op on the single threaded task pool: the value is ignored because every task runs on the
/// main thread / JS event loop.
pub fn async_compute_threads(self, _num_threads: usize) -> Self {
self
}

/// Override the number of IO-priority threads created for the pool. If unset, this defaults to 0.
///
/// No-op on the single threaded task pool: the value is ignored because every task runs on the
/// main thread / JS event loop.
pub fn io_threads(self, _num_threads: usize) -> Self {
self
}

Expand All @@ -37,6 +48,20 @@ impl TaskPoolBuilder {

/// A thread pool for executing tasks. Tasks are futures that are being automatically driven by
/// the pool on threads owned by the pool. In this case - main thread only.
///
/// # Scheduling Semantics
/// Each thread in the pool is assigned to one of three priority groups: Compute, IO, and Async
/// Compute. Compute is higher priority than IO, and both are higher priority than Async Compute.
/// Every task is assigned to a group upon being spawned. A lower priority thread will always prioritize
/// its specific tasks (i.e. IO tasks on an IO thread), but will run higher priority tasks if it would
/// otherwise be sitting idle.
///
/// For example, under heavy compute workloads, compute tasks will be scheduled to run on the IO and
/// async compute thread groups, but any IO task will take precedence over any compute task on the IO
/// threads. Likewise, async compute tasks will never be scheduled on a compute or IO thread.
///
/// By default, all threads in the pool are dedicated to the compute group. Thread counts can be
/// altered via [`TaskPoolBuilder`] when constructing the pool.
#[derive(Debug, Default, Clone)]
pub struct TaskPool {}

Expand Down Expand Up @@ -106,6 +131,44 @@ impl TaskPool {
FakeTask
}

/// Spawns a static async-compute-priority future onto the JS event loop. On this single
/// threaded pool there is no dedicated async compute group, so this behaves like `spawn`.
///
/// For now it returns a [`FakeTask`] with a no-op `detach`. Returning a real `Task` is possible
/// but tricky: the future runs on the JS event loop while a `Task` would run on an
/// `async_executor::LocalExecutor`, so a proxy future would be needed, and there is currently no
/// long-living `LocalExecutor` here. For typical use cases this should be sufficient: a caller
/// can spawn a long-running future that writes its results to some channel / event queue and
/// simply call `detach` on the returned task (as `AssetServer` does).
pub fn spawn_async_compute<T>(&self, future: impl Future<Output = T> + 'static) -> FakeTask
where
    T: Send + 'static,
{
    // Drive the future to completion on the JS event loop, discarding its output.
    let driver = async move {
        let _ = future.await;
    };
    wasm_bindgen_futures::spawn_local(driver);
    FakeTask
}

/// Spawns a static IO-priority future onto the JS event loop. On this single threaded pool
/// there is no dedicated IO group, so this behaves like `spawn`.
///
/// For now it returns a [`FakeTask`] with a no-op `detach`. Returning a real `Task` is possible
/// but tricky: the future runs on the JS event loop while a `Task` would run on an
/// `async_executor::LocalExecutor`, so a proxy future would be needed, and there is currently no
/// long-living `LocalExecutor` here. For typical use cases this should be sufficient: a caller
/// can spawn a long-running future that writes its results to some channel / event queue and
/// simply call `detach` on the returned task (as `AssetServer` does).
pub fn spawn_io<T>(&self, future: impl Future<Output = T> + 'static) -> FakeTask
where
    T: Send + 'static,
{
    // Drive the future to completion on the JS event loop, discarding its output.
    let driver = async move {
        let _ = future.await;
    };
    wasm_bindgen_futures::spawn_local(driver);
    FakeTask
}

/// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`].
pub fn spawn_local<T>(&self, future: impl Future<Output = T> + 'static) -> FakeTask
where
Expand Down Expand Up @@ -141,7 +204,29 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> {
/// On the single threaded task pool, it just calls [`Scope::spawn_local`].
///
/// For more information, see [`TaskPool::scope`].
pub fn spawn<Fut: Future<Output = T> + 'scope + Send>(&mut self, f: Fut) {
/// Spawns a scoped compute-priority future. On the single threaded task pool this simply
/// delegates to [`Scope::spawn_local`].
pub fn spawn<Fut: Future<Output = T> + 'scope>(&mut self, f: Fut) {
self.spawn_local(f);
}

/// Spawns a scoped async-compute-priority future onto the thread-local executor. The scope
/// *must* outlive the provided future. The results of the future will be returned as a part of
/// [`TaskPool::scope`]'s return value.
///
/// On the single threaded task pool, it just calls [`Scope::spawn_local`].
///
/// For more information, see [`TaskPool::scope`].
pub fn spawn_async_compute<Fut: Future<Output = T> + 'scope>(&mut self, f: Fut) {
self.spawn_local(f);
}

/// Spawns a scoped IO-priority future onto the thread-local executor. The scope *must* outlive
/// the provided future. The results of the future will be returned as a part of
/// [`TaskPool::scope`]'s return value.
///
/// On the single threaded task pool, it just calls [`Scope::spawn_local`].
///
/// For more information, see [`TaskPool::scope`].
pub fn spawn_io<Fut: Future<Output = T> + 'scope>(&mut self, f: Fut) {
self.spawn_local(f);
}

Expand Down
Loading

0 comments on commit 362294f

Please sign in to comment.