Skip to content

Commit

Permalink
sync: reduce contention in watch channel (#5464)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn authored Feb 17, 2023
1 parent 0dc1b71 commit b921fe4
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 5 deletions.
2 changes: 1 addition & 1 deletion tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ tokio_thread_local! {
}
}

#[cfg(feature = "macros")]
#[cfg(any(feature = "macros", all(feature = "sync", feature = "rt")))]
pub(crate) fn thread_rng_n(n: u32) -> u32 {
CONTEXT.with(|ctx| ctx.rng.fastrand_n(n))
}
Expand Down
74 changes: 71 additions & 3 deletions tokio/src/sync/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use crate::sync::notify::Notify;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::atomic::Ordering::Relaxed;
use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
use std::fmt;
use std::mem;
use std::ops;
use std::panic;
Expand Down Expand Up @@ -166,7 +167,6 @@ impl<'a, T> Ref<'a, T> {
}
}

#[derive(Debug)]
struct Shared<T> {
/// The most recent value.
value: RwLock<T>,
Expand All @@ -181,12 +181,24 @@ struct Shared<T> {
ref_count_rx: AtomicUsize,

/// Notifies waiting receivers that the value changed.
notify_rx: Notify,
notify_rx: big_notify::BigNotify,

/// Notifies any task listening for `Receiver` dropped events.
notify_tx: Notify,
}

impl<T: fmt::Debug> fmt::Debug for Shared<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = self.state.load();
f.debug_struct("Shared")
.field("value", &self.value)
.field("version", &state.version())
.field("is_closed", &state.is_closed())
.field("ref_count_rx", &self.ref_count_rx)
.finish()
}
}

pub mod error {
//! Watch error types.

Expand Down Expand Up @@ -221,6 +233,62 @@ pub mod error {
impl std::error::Error for RecvError {}
}

mod big_notify {
use super::*;
use crate::sync::notify::Notified;

// To avoid contention on the lock inside the `Notify`, we store multiple
// copies of it. Then, we use either circular access or randomness to spread
// out threads over different `Notify` objects.
//
// Some simple benchmarks show that randomness performs slightly better than
// circular access (probably due to contention on `next`), so we prefer to
// use randomness when Tokio is compiled with a random number generator.
//
// When the random number generator is not available, we fall back to
// circular access.

pub(super) struct BigNotify {
#[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
next: AtomicUsize,
inner: [Notify; 8],
}

impl BigNotify {
pub(super) fn new() -> Self {
Self {
#[cfg(not(all(
not(loom),
feature = "sync",
any(feature = "rt", feature = "macros")
)))]
next: AtomicUsize::new(0),
inner: Default::default(),
}
}

pub(super) fn notify_waiters(&self) {
for notify in &self.inner {
notify.notify_waiters();
}
}

/// This function implements the case where randomness is not available.
#[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
pub(super) fn notified(&self) -> Notified<'_> {
let i = self.next.fetch_add(1, Relaxed) % 8;
self.inner[i].notified()
}

/// This function implements the case where randomness is available.
#[cfg(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros")))]
pub(super) fn notified(&self) -> Notified<'_> {
let i = crate::runtime::context::thread_rng_n(8) as usize;
self.inner[i].notified()
}
}
}

use self::state::{AtomicState, Version};
mod state {
use crate::loom::sync::atomic::AtomicUsize;
Expand Down Expand Up @@ -320,7 +388,7 @@ pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
value: RwLock::new(init),
state: AtomicState::new(),
ref_count_rx: AtomicUsize::new(1),
notify_rx: Notify::new(),
notify_rx: big_notify::BigNotify::new(),
notify_tx: Notify::new(),
});

Expand Down
6 changes: 5 additions & 1 deletion tokio/src/util/rand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ impl FastRand {
old_seed
}

#[cfg(any(feature = "macros", feature = "rt-multi-thread"))]
#[cfg(any(
feature = "macros",
feature = "rt-multi-thread",
all(feature = "sync", feature = "rt")
))]
pub(crate) fn fastrand_n(&self, n: u32) -> u32 {
// This is similar to fastrand() % n, but faster.
// See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
Expand Down

0 comments on commit b921fe4

Please sign in to comment.