From cf7b90bbe7cb87099499876622f413a65a699038 Mon Sep 17 00:00:00 2001 From: Thomas Hurst Date: Tue, 28 Feb 2023 17:45:14 +0000 Subject: [PATCH 1/4] dd: use an alarm thread instead of elapsed() calls MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Quick benchmark on FreeBSD 13.1-RELEASE: Summary './dd-thread-alarm if=/dev/zero of=/dev/null count=4000000 status=progress' ran 1.17 ± 0.17 times faster than './dd-baseline if=/dev/zero of=/dev/null count=4000000 status=progress' --- src/uu/dd/src/dd.rs | 47 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 4 deletions(-) diff --git a/src/uu/dd/src/dd.rs b/src/uu/dd/src/dd.rs index eaf89ca55e3..2baa12106f6 100644 --- a/src/uu/dd/src/dd.rs +++ b/src/uu/dd/src/dd.rs @@ -65,6 +65,46 @@ struct Settings { status: Option, } +use std::thread::sleep; +use std::time::Duration; + +use std::sync::{ + atomic::{AtomicBool, Ordering::Relaxed}, + Arc, +}; + +#[derive(Debug, Clone)] +pub struct Alarm { + interval: Duration, + trigger: Arc, +} + +impl Alarm { + pub fn with_interval(interval: Duration) -> Alarm { + let trigger = Arc::new(AtomicBool::default()); + + let weak_trigger = Arc::downgrade(&trigger); + std::thread::spawn(move || loop { + sleep(interval); + if let Some(trigger) = weak_trigger.upgrade() { + trigger.store(true, Relaxed); + } else { + break; + } + }); + + Alarm { interval, trigger } + } + + pub fn is_triggered(&self) -> bool { + self.trigger.swap(false, Relaxed) + } + + pub fn get_interval(&self) -> Duration { + self.interval + } +} + /// A number in blocks or bytes /// /// Some values (seek, skip, iseek, oseek) can have values either in blocks or in bytes. @@ -628,7 +668,6 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { // information. let (prog_tx, rx) = mpsc::channel(); let output_thread = thread::spawn(gen_prog_updater(rx, i.settings.status)); - let mut progress_as_secs = 0; // Optimization: if no blocks are to be written, then don't // bother allocating any buffers. @@ -639,6 +678,7 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { // Create a common buffer with a capacity of the block size. // This is the max size needed. let mut buf = vec![BUF_INIT_BYTE; bsize]; + let alarm = Alarm::with_interval(Duration::from_secs(1)); // The main read/write loop. // @@ -667,9 +707,8 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { // error. rstat += rstat_update; wstat += wstat_update; - let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), false); - if prog_update.duration.as_secs() >= progress_as_secs { - progress_as_secs = prog_update.duration.as_secs() + 1; + if alarm.is_triggered() { + let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), false); prog_tx.send(prog_update).unwrap_or(()); } } From 52c93a4d107a6252c4cfe85c572075cacc493272 Mon Sep 17 00:00:00 2001 From: Thomas Hurst Date: Wed, 1 Mar 2023 13:56:18 +0000 Subject: [PATCH 2/4] dd: Simplify loop of progress Alarm thread --- src/uu/dd/src/dd.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/uu/dd/src/dd.rs b/src/uu/dd/src/dd.rs index 2baa12106f6..15c7e8ed4af 100644 --- a/src/uu/dd/src/dd.rs +++ b/src/uu/dd/src/dd.rs @@ -80,20 +80,18 @@ pub struct Alarm { } impl Alarm { - pub fn with_interval(interval: Duration) -> Alarm { + pub fn with_interval(interval: Duration) -> Self { let trigger = Arc::new(AtomicBool::default()); let weak_trigger = Arc::downgrade(&trigger); - std::thread::spawn(move || loop { - sleep(interval); - if let Some(trigger) = weak_trigger.upgrade() { + std::thread::spawn(move || { + while let Some(trigger) = weak_trigger.upgrade() { + sleep(interval); trigger.store(true, Relaxed); - } else { - break; } }); - Alarm { interval, trigger } + Self { interval, trigger } } pub fn is_triggered(&self) -> bool { From 546631c8e77bb6256d6b211306ed71b5bbf0e16f Mon Sep 17 00:00:00 2001 From: Thomas Hurst Date: Wed, 3 May 2023 16:30:53 +0000 Subject: [PATCH 3/4] dd: Tidy includes --- src/uu/dd/src/dd.rs | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/src/uu/dd/src/dd.rs b/src/uu/dd/src/dd.rs index 15c7e8ed4af..541c85ff959 100644 --- a/src/uu/dd/src/dd.rs +++ b/src/uu/dd/src/dd.rs @@ -33,9 +33,12 @@ use std::os::unix::fs::FileTypeExt; #[cfg(any(target_os = "linux", target_os = "android"))] use std::os::unix::fs::OpenOptionsExt; use std::path::Path; -use std::sync::mpsc; +use std::sync::{ + atomic::{AtomicBool, Ordering::Relaxed}, + mpsc, Arc, +}; use std::thread; -use std::time; +use std::time::{Duration, Instant}; use clap::{crate_version, Arg, Command}; use gcd::Gcd; @@ -65,14 +68,6 @@ struct Settings { status: Option, } -use std::thread::sleep; -use std::time::Duration; - -use std::sync::{ - atomic::{AtomicBool, Ordering::Relaxed}, - Arc, -}; - #[derive(Debug, Clone)] pub struct Alarm { interval: Duration, @@ -84,9 +79,9 @@ impl Alarm { let trigger = Arc::new(AtomicBool::default()); let weak_trigger = Arc::downgrade(&trigger); - std::thread::spawn(move || { + thread::spawn(move || { while let Some(trigger) = weak_trigger.upgrade() { - sleep(interval); + thread::sleep(interval); trigger.store(true, Relaxed); } }); @@ -646,7 +641,7 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { // of its report includes the throughput in bytes per second, // which requires knowing how long the process has been // running. - let start = time::Instant::now(); + let start = Instant::now(); // A good buffer size for reading. // @@ -718,7 +713,7 @@ fn finalize( output: &mut Output, rstat: ReadStat, wstat: WriteStat, - start: time::Instant, + start: Instant, prog_tx: &mpsc::Sender, output_thread: thread::JoinHandle, ) -> std::io::Result<()> { From 01a8623d216599449f6c895996e6ba64c12fbd89 Mon Sep 17 00:00:00 2001 From: Thomas Hurst Date: Wed, 3 May 2023 16:31:14 +0000 Subject: [PATCH 4/4] dd: Add documentation to Alarm struct --- src/uu/dd/src/dd.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/uu/dd/src/dd.rs b/src/uu/dd/src/dd.rs index 541c85ff959..41b7eb7732f 100644 --- a/src/uu/dd/src/dd.rs +++ b/src/uu/dd/src/dd.rs @@ -68,6 +68,15 @@ struct Settings { status: Option, } +/// A timer which triggers on a given interval +/// +/// After being constructed with [`Alarm::with_interval`], [`Alarm::is_triggered`] +/// will return true once per the given [`Duration`]. +/// +/// Can be cloned, but the trigger status is shared across all instances so only +/// the first caller each interval will yield true. +/// +/// When all instances are dropped the background thread will exit on the next interval. #[derive(Debug, Clone)] pub struct Alarm { interval: Duration, @@ -671,6 +680,11 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { // Create a common buffer with a capacity of the block size. // This is the max size needed. let mut buf = vec![BUF_INIT_BYTE; bsize]; + + // Spawn a timer thread to provide a scheduled signal indicating when we + // should send an update of our progress to the reporting thread. + // + // This avoids the need to query the OS monotonic clock for every block. let alarm = Alarm::with_interval(Duration::from_secs(1)); // The main read/write loop.