Skip to content

Commit

Permalink
feat(memory): add reserved_memory_bytes opt (#16433)
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 authored Apr 24, 2024
1 parent 605bf0f commit dd57b28
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 17 deletions.
1 change: 1 addition & 0 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ mod test {
connector_rpc_sink_payload_format: None,
config_path: "src/config/test.toml",
total_memory_bytes: 34359738368,
reserved_memory_bytes: None,
parallelism: 10,
role: Both,
metrics_level: None,
Expand Down
7 changes: 7 additions & 0 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ pub struct ComputeNodeOpts {
#[clap(long, env = "RW_TOTAL_MEMORY_BYTES", default_value_t = default_total_memory_bytes())]
pub total_memory_bytes: usize,

/// Reserved memory for the compute node in bytes.
/// If not set, a portion (default to 30%) for the total_memory_bytes will be used as the reserved memory.
///
/// The total memory compute and storage can use is `total_memory_bytes` - `reserved_memory_bytes`.
#[clap(long, env = "RW_RESERVED_MEMORY_BYTES")]
pub reserved_memory_bytes: Option<usize>,

/// The parallelism that the compute node will register to the scheduler of the meta service.
#[clap(long, env = "RW_PARALLELISM", default_value_t = default_parallelism())]
#[override_opts(if_absent, path = streaming.actor_runtime_worker_threads_num)]
Expand Down
54 changes: 40 additions & 14 deletions src/compute/src/memory/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use risingwave_common::config::{
};
use risingwave_common::util::pretty_bytes::convert;

use crate::ComputeNodeOpts;

/// The minimal memory requirement of computing tasks in megabytes.
pub const MIN_COMPUTE_MEMORY_MB: usize = 512;
/// The memory reserved for system usage (stack and code segment of processes, allocation
Expand All @@ -41,20 +43,24 @@ const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3;
/// Each compute node reserves some memory for stack and code segment of processes, allocation
/// overhead, network buffer, etc. based on `SYSTEM_RESERVED_MEMORY_PROPORTION`. The reserve memory
/// size must be larger than `MIN_SYSTEM_RESERVED_MEMORY_MB`
pub fn reserve_memory_bytes(total_memory_bytes: usize) -> (usize, usize) {
if total_memory_bytes < MIN_COMPUTE_MEMORY_MB << 20 {
pub fn reserve_memory_bytes(opts: &ComputeNodeOpts) -> (usize, usize) {
if opts.total_memory_bytes < MIN_COMPUTE_MEMORY_MB << 20 {
panic!(
"The total memory size ({}) is too small. It must be at least {} MB.",
convert(total_memory_bytes as _),
convert(opts.total_memory_bytes as _),
MIN_COMPUTE_MEMORY_MB
);
}

let reserved = std::cmp::max(
(total_memory_bytes as f64 * SYSTEM_RESERVED_MEMORY_PROPORTION).ceil() as usize,
MIN_SYSTEM_RESERVED_MEMORY_MB << 20,
);
(reserved, total_memory_bytes - reserved)
// If `reserved_memory_bytes` is not set, use `SYSTEM_RESERVED_MEMORY_PROPORTION` * `total_memory_bytes`.
let reserved = opts.reserved_memory_bytes.unwrap_or_else(|| {
(opts.total_memory_bytes as f64 * SYSTEM_RESERVED_MEMORY_PROPORTION).ceil() as usize
});

// Should have at least `MIN_SYSTEM_RESERVED_MEMORY_MB` for reserved memory.
let reserved = std::cmp::max(reserved, MIN_SYSTEM_RESERVED_MEMORY_MB << 20);

(reserved, opts.total_memory_bytes - reserved)
}

/// Decide the memory limit for each storage cache. If not specified in `StorageConfig`, memory
Expand Down Expand Up @@ -230,21 +236,41 @@ pub fn storage_memory_config(

#[cfg(test)]
mod tests {
use clap::Parser;
use risingwave_common::config::StorageConfig;

use super::{reserve_memory_bytes, storage_memory_config};
use crate::ComputeNodeOpts;

#[test]
fn test_reserve_memory_bytes() {
// at least 512 MB
let (reserved, non_reserved) = reserve_memory_bytes(1536 << 20);
assert_eq!(reserved, 512 << 20);
assert_eq!(non_reserved, 1024 << 20);
{
let mut opts = ComputeNodeOpts::parse_from(vec![] as Vec<String>);
opts.total_memory_bytes = 1536 << 20;
let (reserved, non_reserved) = reserve_memory_bytes(&opts);
assert_eq!(reserved, 512 << 20);
assert_eq!(non_reserved, 1024 << 20);
}

// reserve based on proportion
let (reserved, non_reserved) = reserve_memory_bytes(10 << 30);
assert_eq!(reserved, 3 << 30);
assert_eq!(non_reserved, 7 << 30);
{
let mut opts = ComputeNodeOpts::parse_from(vec![] as Vec<String>);
opts.total_memory_bytes = 10 << 30;
let (reserved, non_reserved) = reserve_memory_bytes(&opts);
assert_eq!(reserved, 3 << 30);
assert_eq!(non_reserved, 7 << 30);
}

// reserve based on opts
{
let mut opts = ComputeNodeOpts::parse_from(vec![] as Vec<String>);
opts.total_memory_bytes = 10 << 30;
opts.reserved_memory_bytes = Some(2 << 30);
let (reserved, non_reserved) = reserve_memory_bytes(&opts);
assert_eq!(reserved, 2 << 30);
assert_eq!(non_reserved, 8 << 30);
}
}

#[test]
Expand Down
5 changes: 2 additions & 3 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub async fn compute_node_serve(

// Register to the cluster. We're not ready to serve until activate is called.
let (meta_client, system_params) = MetaClient::register_new(
opts.meta_address,
opts.meta_address.clone(),
WorkerType::ComputeNode,
&advertise_addr,
Property {
Expand All @@ -122,8 +122,7 @@ pub async fn compute_node_serve(
let embedded_compactor_enabled =
embedded_compactor_enabled(state_store_url, config.storage.disable_remote_compactor);

let (reserved_memory_bytes, non_reserved_memory_bytes) =
reserve_memory_bytes(opts.total_memory_bytes);
let (reserved_memory_bytes, non_reserved_memory_bytes) = reserve_memory_bytes(&opts);
let storage_memory_config = storage_memory_config(
non_reserved_memory_bytes,
embedded_compactor_enabled,
Expand Down

0 comments on commit dd57b28

Please sign in to comment.