Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce cluster limit #18383

Merged
merged 15 commits into from
Sep 6, 2024
1 change: 1 addition & 0 deletions e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ user backfill_rate_limit
user background_ddl
user batch_enable_distributed_dml
user batch_parallelism
user bypass_cluster_limits
user bytea_output
user cdc_source_wait_streaming_start_timeout
user client_encoding
Expand Down
27 changes: 27 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -789,3 +789,30 @@ message RelationIdInfos {
// relation_id -> FragmentIdToActorIdMap
map<uint32, FragmentIdToActorIdMap> map = 1;
}

// Snapshot of actor distribution used to evaluate whether the cluster is
// overloaded relative to its parallelism.
message ActorCountPerParallelism {
// Actor count and parallelism reported for a single worker node.
message WorkerActorCount {
uint64 actor_count = 1;
uint64 parallelism = 2;
}
// worker id -> actor count / parallelism of that worker.
map<uint32, WorkerActorCount> worker_id_to_actor_count = 1;
// Exceeding `hard_limit * parallelism` on any worker rejects the DDL.
uint64 hard_limit = 2;
// Exceeding `soft_limit * parallelism` on any worker emits a notice.
uint64 soft_limit = 3;
}

// A single cluster-wide limit. Wrapped in a oneof so new limit kinds can be
// added without breaking existing clients.
message ClusterLimit {
  oneof limit {
    ActorCountPerParallelism actor_count = 1;
    // TODO: limit DDL using compaction pending bytes
  }
}

// Request for `ClusterLimitService.GetClusterLimits`. Intentionally empty.
message GetClusterLimitsRequest {}

// Response carrying the limits that are currently active (i.e. violated).
message GetClusterLimitsResponse {
// Empty when no limit is currently hit.
repeated ClusterLimit active_limits = 1;
}

// Service exposing cluster-limit status (e.g. to frontends, which consult it
// before running streaming-job DDL).
service ClusterLimitService {
rpc GetClusterLimits(GetClusterLimitsRequest) returns (GetClusterLimitsResponse);
}
18 changes: 18 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,16 @@ pub struct MetaDeveloperConfig {

#[serde(default = "default::developer::max_get_task_probe_times")]
pub max_get_task_probe_times: usize,

/// Max number of actors allowed per parallelism (default = 100).
/// `CREATE MV/Table` will emit a notice when the number of actors exceeds this limit.
#[serde(default = "default::developer::actor_cnt_per_worker_parallelism_soft_limit")]
pub actor_cnt_per_worker_parallelism_soft_limit: usize,

/// Max number of actors allowed per parallelism (default = 400).
/// `CREATE MV/Table` will be rejected when the number of actors exceeds this limit.
#[serde(default = "default::developer::actor_cnt_per_worker_parallelism_hard_limit")]
pub actor_cnt_per_worker_parallelism_hard_limit: usize,
}

/// The section `[server]` in `risingwave.toml`.
Expand Down Expand Up @@ -1859,6 +1869,14 @@ pub mod default {
5
}

/// Default soft limit on actors per parallelism unit per worker; exceeding it
/// emits a notice on `CREATE MV/Table` but does not block the DDL.
pub fn actor_cnt_per_worker_parallelism_soft_limit() -> usize {
100
}

/// Default hard limit on actors per parallelism unit per worker; exceeding it
/// rejects `CREATE MV/Table` unless `bypass_cluster_limits` is set.
pub fn actor_cnt_per_worker_parallelism_hard_limit() -> usize {
400
}

pub fn memory_controller_threshold_aggressive() -> f64 {
0.9
}
Expand Down
6 changes: 6 additions & 0 deletions src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,12 @@ pub struct SessionConfig {

#[parameter(default = "hex", check_hook = check_bytea_output)]
bytea_output: String,

/// Bypass checks on cluster limits
///
/// When enabled, `CREATE MATERIALIZED VIEW` will not fail if the cluster limit is hit.
#[parameter(default = false)]
bypass_cluster_limits: bool,
}

fn check_timezone(val: &str) -> Result<(), String> {
Expand Down
134 changes: 134 additions & 0 deletions src/common/src/util/cluster_limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::{self, Display, Formatter};

use risingwave_pb::meta::actor_count_per_parallelism::PbWorkerActorCount;
use risingwave_pb::meta::cluster_limit::PbLimit;
use risingwave_pb::meta::{PbActorCountPerParallelism, PbClusterLimit};
/// A cluster-wide limit reported by the meta node. Mirrors the protobuf
/// `ClusterLimit` oneof; currently only actor-count-per-parallelism exists.
pub enum ClusterLimit {
    ActorCount(ActorCountPerParallelism),
}

impl From<ClusterLimit> for PbClusterLimit {
    /// Converts the native limit into its protobuf wire representation.
    fn from(limit: ClusterLimit) -> Self {
        // Destructure the single variant and wrap it into the protobuf oneof.
        let ClusterLimit::ActorCount(actor_count) = limit;
        Self {
            limit: Some(PbLimit::ActorCount(actor_count.into())),
        }
    }
}

impl From<PbClusterLimit> for ClusterLimit {
    /// Converts the protobuf representation back into the native type.
    ///
    /// # Panics
    /// Panics if the `limit` oneof is unset. That indicates a protocol bug —
    /// the sender always populates the oneof — so we fail loudly with context
    /// instead of a bare `unwrap()` panic.
    fn from(pb_limit: PbClusterLimit) -> Self {
        match pb_limit
            .limit
            .expect("unset `limit` oneof in PbClusterLimit")
        {
            PbLimit::ActorCount(actor_count_per_parallelism) => {
                ClusterLimit::ActorCount(actor_count_per_parallelism.into())
            }
        }
    }
}

/// Actor count and parallelism reported for a single worker node.
#[derive(Debug)]
pub struct WorkerActorCount {
    /// Number of actors currently placed on the worker.
    pub actor_count: usize,
    /// Parallelism of the worker, used to scale the per-parallelism limits.
    pub parallelism: usize,
}

impl From<WorkerActorCount> for PbWorkerActorCount {
fn from(worker_actor_count: WorkerActorCount) -> Self {
PbWorkerActorCount {
actor_count: worker_actor_count.actor_count as u64,
parallelism: worker_actor_count.parallelism as u64,
}
}
}

impl From<PbWorkerActorCount> for WorkerActorCount {
fn from(pb_worker_actor_count: PbWorkerActorCount) -> Self {
WorkerActorCount {
actor_count: pb_worker_actor_count.actor_count as usize,
parallelism: pb_worker_actor_count.parallelism as usize,
}
}
}

/// Cluster-wide actor-count-per-parallelism limit state.
pub struct ActorCountPerParallelism {
    /// worker id -> actor count / parallelism reported for that worker.
    pub worker_id_to_actor_count: HashMap<u32, WorkerActorCount>,
    /// Critical threshold: any worker with more than `hard_limit * parallelism`
    /// actors exceeds the hard limit.
    pub hard_limit: usize,
    /// Recommended threshold: any worker with more than `soft_limit * parallelism`
    /// actors exceeds the soft limit.
    pub soft_limit: usize,
}

impl From<ActorCountPerParallelism> for PbActorCountPerParallelism {
fn from(actor_count_per_parallelism: ActorCountPerParallelism) -> Self {
PbActorCountPerParallelism {
worker_id_to_actor_count: actor_count_per_parallelism
.worker_id_to_actor_count
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect(),
hard_limit: actor_count_per_parallelism.hard_limit as u64,
soft_limit: actor_count_per_parallelism.soft_limit as u64,
}
}
}

impl From<PbActorCountPerParallelism> for ActorCountPerParallelism {
fn from(pb_actor_count_per_parallelism: PbActorCountPerParallelism) -> Self {
ActorCountPerParallelism {
worker_id_to_actor_count: pb_actor_count_per_parallelism
.worker_id_to_actor_count
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect(),
hard_limit: pb_actor_count_per_parallelism.hard_limit as usize,
soft_limit: pb_actor_count_per_parallelism.soft_limit as usize,
}
}
}

impl ActorCountPerParallelism {
    /// Returns true if any worker's actor count exceeds `limit * parallelism`.
    /// Shared predicate backing both the hard- and soft-limit checks, which
    /// previously duplicated this logic.
    fn exceeds_per_worker(&self, limit: usize) -> bool {
        self.worker_id_to_actor_count
            .values()
            .any(|v| v.actor_count > limit.saturating_mul(v.parallelism))
    }

    /// Whether any worker exceeds the hard (critical) limit.
    pub fn exceed_hard_limit(&self) -> bool {
        self.exceeds_per_worker(self.hard_limit)
    }

    /// Whether any worker exceeds the soft (recommended) limit.
    pub fn exceed_soft_limit(&self) -> bool {
        self.exceeds_per_worker(self.soft_limit)
    }

    /// Whether any limit, soft or hard, is exceeded.
    pub fn exceed_limit(&self) -> bool {
        self.exceed_soft_limit() || self.exceed_hard_limit()
    }
}

impl Display for ActorCountPerParallelism {
    /// Renders the limit state for user-facing error/notice messages.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Sort by worker id so the rendered message is deterministic:
        // iterating the HashMap directly would order workers differently
        // between runs, making the message flaky for users and tests.
        let mut entries: Vec<_> = self.worker_id_to_actor_count.iter().collect();
        entries.sort_by_key(|(worker_id, _)| **worker_id);
        let worker_id_to_actor_count_str: Vec<_> = entries
            .into_iter()
            .map(|(k, v)| format!("{} -> {:?}", k, v))
            .collect();
        write!(
            f,
            "ActorCountPerParallelism {{ critical limit: {:?}, recommended limit: {:?}. worker_id_to_actor_count: {:?} }}",
            self.hard_limit, self.soft_limit, worker_id_to_actor_count_str
        )
    }
}
1 change: 1 addition & 0 deletions src/common/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ pub mod tracing;
pub mod value_encoding;
pub mod worker_util;
pub use tokio_util;
pub mod cluster_limit;
2 changes: 2 additions & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ meta_enable_trivial_move = true
meta_enable_check_task_level_overlap = false
meta_max_trivial_move_task_count_per_loop = 256
meta_max_get_task_probe_times = 5
meta_actor_cnt_per_worker_parallelism_soft_limit = 100
meta_actor_cnt_per_worker_parallelism_hard_limit = 400

[batch]
enable_barrier_read = false
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,4 @@ mod rw_worker_nodes;

mod rw_actor_id_to_ddl;
mod rw_fragment_id_to_ddl;
mod rw_worker_actor_count;
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::Fields;
use risingwave_frontend_macro::system_catalog;

// System view `rw_catalog.rw_worker_actor_count`: actor count and parallelism
// per worker node, computed by joining `rw_actors` with `rw_worker_nodes`.
// This is the view users are pointed at by the cluster-limit violation
// message (`SELECT * FROM rw_worker_actor_count`).
#[system_catalog(
    view,
    "rw_catalog.rw_worker_actor_count",
    "SELECT t2.id as worker_id, parallelism, count(*) as actor_count
    FROM rw_actors t1, rw_worker_nodes t2
    where t1.worker_id = t2.id
    GROUP BY t2.id, t2.parallelism;"
)]
#[derive(Fields)]
struct RwWorkerActorCount {
    worker_id: i32,
    parallelism: i32,
    actor_count: i64,
}
3 changes: 3 additions & 0 deletions src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ pub async fn handle_create_mv_bound(
) -> Result<RwPgResponse> {
let session = handler_args.session.clone();

// Check cluster limits
session.check_cluster_limits().await?;

if let Either::Right(resp) = session.check_relation_name_duplicated(
name.clone(),
StatementType::CREATE_MATERIALIZED_VIEW,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ pub async fn handle_create_sink(
) -> Result<RwPgResponse> {
let session = handle_args.session.clone();

session.check_cluster_limits().await?;

if let Either::Right(resp) = session.check_relation_name_duplicated(
stmt.sink_name.clone(),
StatementType::CREATE_SINK,
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,8 @@ pub async fn handle_create_table(
session.notice_to_user("APPEND ONLY TABLE is currently an experimental feature.");
}

session.check_cluster_limits().await?;

if let Either::Right(resp) = session.check_relation_name_duplicated(
table_name.clone(),
StatementType::CREATE_TABLE,
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashMap;
use anyhow::Context;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::util::cluster_limit::ClusterLimit;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::backup_service::MetaSnapshotMetadata;
Expand Down Expand Up @@ -136,6 +137,8 @@ pub trait FrontendMetaClient: Send + Sync {
) -> Result<Vec<u64>>;

async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;

async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
}

pub struct FrontendMetaClientImpl(pub MetaClient);
Expand Down Expand Up @@ -345,4 +348,8 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
self.0.get_cluster_recovery_status().await
}

async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>> {
    // Thin passthrough to the meta client's ClusterLimitService RPC;
    // returns the currently active (violated) limits.
    self.0.get_cluster_limits().await
}
}
44 changes: 43 additions & 1 deletion src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::telemetry_env_enabled;
use risingwave_common::types::DataType;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::resource_util;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_common::util::{cluster_limit, resource_util};
use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_heap_profiling::HeapProfiler;
use risingwave_common_service::{MetricsManager, ObserverManager};
Expand Down Expand Up @@ -1194,6 +1195,47 @@ impl SessionImpl {
pub fn temporary_source_manager(&self) -> TemporarySourceManager {
self.temporary_source_manager.lock().clone()
}

/// Checks cluster-wide limits before creating a streaming job (called from
/// `CREATE MATERIALIZED VIEW` / `CREATE TABLE` / `CREATE SINK` handlers).
///
/// - Exceeding a hard limit fails the statement with a `ProtocolError`.
/// - Exceeding only a soft limit emits a notice to the client and proceeds.
/// - Skipped entirely when the `bypass_cluster_limits` session variable is set.
pub async fn check_cluster_limits(&self) -> Result<()> {
    if self.config().bypass_cluster_limits() {
        return Ok(());
    }

    // Builds the multi-line user-facing message for a violated
    // actor-count limit; wording differs for hard vs. soft violations.
    let gen_message = |violated_limit: &ActorCountPerParallelism,
                       exceed_hard_limit: bool|
     -> String {
        let (limit_type, action) = if exceed_hard_limit {
            ("critical", "Please scale the cluster before proceeding!")
        } else {
            ("recommended", "Scaling the cluster is recommended.")
        };
        format!(
            "\n- {}\n- {}\n- {}\n- {}\n- {}\n{}",
            format_args!("Actor count per parallelism exceeds the {} limit.", limit_type),
            format_args!("Depending on your workload, this may overload the cluster and cause performance/stability issues. {}", action),
            "Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.",
            "You can bypass this check via SQL `SET bypass_cluster_limits TO true`.",
            "You can check actor count distribution via SQL `SELECT * FROM rw_worker_actor_count`.",
            violated_limit,
        )
    };

    // Ask the meta node for all currently violated limits.
    let limits = self.env().meta_client().get_cluster_limits().await?;
    for limit in limits {
        match limit {
            cluster_limit::ClusterLimit::ActorCount(l) => {
                if l.exceed_hard_limit() {
                    // Hard limit hit: reject the DDL outright.
                    return Err(RwError::from(ErrorCode::ProtocolError(gen_message(
                        &l, true,
                    ))));
                } else if l.exceed_soft_limit() {
                    // Soft limit hit: warn the user but allow the DDL.
                    self.notice_to_user(gen_message(&l, false));
                }
            }
        }
    }
    Ok(())
}
}

pub static SESSION_MANAGER: std::sync::OnceLock<Arc<SessionManagerImpl>> =
Expand Down
Loading
Loading