Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dd: move dd_out() function up to module level #4428

Merged
merged 2 commits into from
Feb 24, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 123 additions & 124 deletions src/uu/dd/src/dd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl Read for Source {
///
/// Use the [`Input::new_stdin`] or [`Input::new_file`] functions to
/// construct a new instance of this struct. Then pass the instance to
/// the [`Output::dd_out`] function to execute the main copy operation
/// the [`dd_copy`] function to execute the main copy operation
/// for `dd`.
struct Input<'a> {
/// The source from which bytes will be read.
Expand Down Expand Up @@ -449,7 +449,7 @@ impl Write for Dest {
///
/// Use the [`Output::new_stdout`] or [`Output::new_file`] functions
/// to construct a new instance of this struct. Then use the
/// [`Output::dd_out`] function to execute the main copy operation for
/// [`dd_copy`] function to execute the main copy operation for
/// `dd`.
struct Output<'a> {
/// The destination to which bytes will be written.
Expand Down Expand Up @@ -579,136 +579,135 @@ impl<'a> Output<'a> {
Ok(())
}
}
}

/// Copy the given input data to this output, consuming both.
///
/// This method contains the main loop for the `dd` program. Bytes
/// are read in blocks from `i` and written in blocks to this
/// output. Read/write statistics are reported to stderr as
/// configured by the `status` command-line argument.
///
/// # Errors
///
/// If there is a problem reading from the input or writing to
/// this output.
fn dd_out(mut self, mut i: Input) -> std::io::Result<()> {
// The read and write statistics.
//
// These objects are counters, initialized to zero. After each
// iteration of the main loop, each will be incremented by the
// number of blocks read and written, respectively.
let mut rstat = ReadStat::default();
let mut wstat = WriteStat::default();

// The time at which the main loop starts executing.
//
// When `status=progress` is given on the command-line, the
// `dd` program reports its progress every second or so. Part
// of its report includes the throughput in bytes per second,
// which requires knowing how long the process has been
// running.
let start = time::Instant::now();

// A good buffer size for reading.
//
// This is an educated guess about a good buffer size based on
// the input and output block sizes.
let bsize = calc_bsize(i.settings.ibs, self.settings.obs);

// Start a thread that reports transfer progress.
//
// The `dd` program reports its progress after every block is written,
// at most every 1 second, and only if `status=progress` is given on
// the command-line or a SIGUSR1 signal is received. We
// perform this reporting in a new thread so as not to take
// any CPU time away from the actual reading and writing of
// data. We send a `ProgUpdate` from the transmitter `prog_tx`
// to the receives `rx`, and the receiver prints the transfer
// information.
let (prog_tx, rx) = mpsc::channel();
let output_thread = thread::spawn(gen_prog_updater(rx, i.settings.status));
let mut progress_as_secs = 0;

// Optimization: if no blocks are to be written, then don't
// bother allocating any buffers.
if let Some(Num::Blocks(0) | Num::Bytes(0)) = i.settings.count {
return self.finalize(rstat, wstat, start, &prog_tx, output_thread);
};

// Create a common buffer with a capacity of the block size.
// This is the max size needed.
let mut buf = vec![BUF_INIT_BYTE; bsize];
/// Copy the given input data to this output, consuming both.
///
/// This method contains the main loop for the `dd` program. Bytes
/// are read in blocks from `i` and written in blocks to this
/// output. Read/write statistics are reported to stderr as
/// configured by the `status` command-line argument.
///
/// # Errors
///
/// If there is a problem reading from the input or writing to
/// this output.
fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
// The read and write statistics.
//
// These objects are counters, initialized to zero. After each
// iteration of the main loop, each will be incremented by the
// number of blocks read and written, respectively.
let mut rstat = ReadStat::default();
let mut wstat = WriteStat::default();

// The time at which the main loop starts executing.
//
// When `status=progress` is given on the command-line, the
// `dd` program reports its progress every second or so. Part
// of its report includes the throughput in bytes per second,
// which requires knowing how long the process has been
// running.
let start = time::Instant::now();

// A good buffer size for reading.
//
// This is an educated guess about a good buffer size based on
// the input and output block sizes.
let bsize = calc_bsize(i.settings.ibs, o.settings.obs);

// Start a thread that reports transfer progress.
//
// The `dd` program reports its progress after every block is written,
// at most every 1 second, and only if `status=progress` is given on
// the command-line or a SIGUSR1 signal is received. We
// perform this reporting in a new thread so as not to take
// any CPU time away from the actual reading and writing of
// data. We send a `ProgUpdate` from the transmitter `prog_tx`
// to the receives `rx`, and the receiver prints the transfer
// information.
let (prog_tx, rx) = mpsc::channel();
let output_thread = thread::spawn(gen_prog_updater(rx, i.settings.status));
let mut progress_as_secs = 0;

// Optimization: if no blocks are to be written, then don't
// bother allocating any buffers.
if let Some(Num::Blocks(0) | Num::Bytes(0)) = i.settings.count {
return finalize(&mut o, rstat, wstat, start, &prog_tx, output_thread);
};

// The main read/write loop.
// Create a common buffer with a capacity of the block size.
// This is the max size needed.
let mut buf = vec![BUF_INIT_BYTE; bsize];

// The main read/write loop.
//
// Each iteration reads blocks from the input and writes
// blocks to this output. Read/write statistics are updated on
// each iteration and cumulative statistics are reported to
// the progress reporting thread.
while below_count_limit(&i.settings.count, &rstat, &wstat) {
// Read a block from the input then write the block to the output.
//
// Each iteration reads blocks from the input and writes
// blocks to this output. Read/write statistics are updated on
// each iteration and cumulative statistics are reported to
// the progress reporting thread.
while below_count_limit(&i.settings.count, &rstat, &wstat) {
// Read a block from the input then write the block to the output.
//
// As an optimization, make an educated guess about the
// best buffer size for reading based on the number of
// blocks already read and the number of blocks remaining.
let loop_bsize =
calc_loop_bsize(&i.settings.count, &rstat, &wstat, i.settings.ibs, bsize);
let rstat_update = read_helper(&mut i, &mut buf, loop_bsize)?;
if rstat_update.is_empty() {
break;
}
let wstat_update = self.write_blocks(&buf)?;

// Update the read/write stats and inform the progress thread once per second.
//
// If the receiver is disconnected, `send()` returns an
// error. Since it is just reporting progress and is not
// crucial to the operation of `dd`, let's just ignore the
// error.
rstat += rstat_update;
wstat += wstat_update;
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), false);
if prog_update.duration.as_secs() >= progress_as_secs {
progress_as_secs = prog_update.duration.as_secs() + 1;
prog_tx.send(prog_update).unwrap_or(());
}
// As an optimization, make an educated guess about the
// best buffer size for reading based on the number of
// blocks already read and the number of blocks remaining.
let loop_bsize = calc_loop_bsize(&i.settings.count, &rstat, &wstat, i.settings.ibs, bsize);
let rstat_update = read_helper(&mut i, &mut buf, loop_bsize)?;
if rstat_update.is_empty() {
break;
}
self.finalize(rstat, wstat, start, &prog_tx, output_thread)
}
let wstat_update = o.write_blocks(&buf)?;

/// Flush output, print final stats, and join with the progress thread.
fn finalize<T>(
&mut self,
rstat: ReadStat,
wstat: WriteStat,
start: time::Instant,
prog_tx: &mpsc::Sender<ProgUpdate>,
output_thread: thread::JoinHandle<T>,
) -> std::io::Result<()> {
// Flush the output, if configured to do so.
self.sync()?;

// Truncate the file to the final cursor location.
// Update the read/write stats and inform the progress thread once per second.
//
// Calling `set_len()` may result in an error (for example,
// when calling it on `/dev/null`), but we don't want to
// terminate the process when that happens. Instead, we
// suppress the error by calling `Result::ok()`. This matches
// the behavior of GNU `dd` when given the command-line
// argument `of=/dev/null`.
if !self.settings.oconv.notrunc {
self.dst.truncate().ok();
// If the receiver is disconnected, `send()` returns an
// error. Since it is just reporting progress and is not
// crucial to the operation of `dd`, let's just ignore the
// error.
rstat += rstat_update;
wstat += wstat_update;
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), false);
if prog_update.duration.as_secs() >= progress_as_secs {
progress_as_secs = prog_update.duration.as_secs() + 1;
prog_tx.send(prog_update).unwrap_or(());
}
}
finalize(&mut o, rstat, wstat, start, &prog_tx, output_thread)
}

// Print the final read/write statistics.
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), true);
prog_tx.send(prog_update).unwrap_or(());
// Wait for the output thread to finish
output_thread
.join()
.expect("Failed to join with the output thread.");
Ok(())
/// Flush output, print final stats, and join with the progress thread.
fn finalize<T>(
output: &mut Output,
rstat: ReadStat,
wstat: WriteStat,
start: time::Instant,
prog_tx: &mpsc::Sender<ProgUpdate>,
output_thread: thread::JoinHandle<T>,
) -> std::io::Result<()> {
// Flush the output, if configured to do so.
output.sync()?;

// Truncate the file to the final cursor location.
//
// Calling `set_len()` may result in an error (for example,
// when calling it on `/dev/null`), but we don't want to
// terminate the process when that happens. Instead, we
// suppress the error by calling `Result::ok()`. This matches
// the behavior of GNU `dd` when given the command-line
// argument `of=/dev/null`.
if !output.settings.oconv.notrunc {
output.dst.truncate().ok();
}

// Print the final read/write statistics.
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), true);
prog_tx.send(prog_update).unwrap_or(());
// Wait for the output thread to finish
output_thread
.join()
.expect("Failed to join with the output thread.");
Ok(())
}

#[cfg(any(target_os = "linux", target_os = "android"))]
Expand Down Expand Up @@ -925,7 +924,7 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
}
None => Output::new_stdout(&settings)?,
};
o.dd_out(i).map_err_context(|| "IO error".to_string())
dd_copy(i, o).map_err_context(|| "IO error".to_string())
}

pub fn uu_app() -> Command {
Expand Down