From c0060b2ecbcbe16ce518f49237b2f380d1e41e40 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Fri, 15 Sep 2023 15:32:14 +0800 Subject: [PATCH] feat(meta): add hummock config relevant tables to rw_catalog (#12337) --- proto/hummock.proto | 15 ++++ .../src/catalog/system_catalog/mod.rs | 2 + .../catalog/system_catalog/rw_catalog/mod.rs | 4 + .../rw_hummock_compaction_group_configs.rs | 72 ++++++++++++++++++ .../rw_catalog/rw_hummock_meta_configs.rs | 50 +++++++++++++ src/frontend/src/meta_client.rs | 21 +++++- src/frontend/src/test_utils.rs | 15 +++- src/meta/src/backup_restore/backup_manager.rs | 7 +- src/meta/src/hummock/vacuum.rs | 3 +- src/meta/src/rpc/service/hummock_service.rs | 73 ++++++++++++++++++- src/rpc_client/src/meta_client.rs | 15 ++++ 11 files changed, 268 insertions(+), 9 deletions(-) create mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_compaction_group_configs.rs create mode 100644 src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_meta_configs.rs diff --git a/proto/hummock.proto b/proto/hummock.proto index fd16e32457ec..db99cbe5f850 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -633,6 +633,19 @@ message ListBranchedObjectResponse { repeated BranchedObject branched_objects = 1; } +message ListActiveWriteLimitRequest {} + +message ListActiveWriteLimitResponse { + // < compaction group id, write limit info > + map write_limits = 1; +} + +message ListHummockMetaConfigRequest {} + +message ListHummockMetaConfigResponse { + map configs = 1; +} + service HummockManagerService { rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns (UnpinVersionBeforeResponse); rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse); @@ -664,6 +677,8 @@ service HummockManagerService { rpc RiseCtlListCompactionStatus(RiseCtlListCompactionStatusRequest) returns (RiseCtlListCompactionStatusResponse); rpc SubscribeCompactionEvent(stream SubscribeCompactionEventRequest) returns (stream SubscribeCompactionEventResponse); rpc ListBranchedObject(ListBranchedObjectRequest) returns (ListBranchedObjectResponse); + rpc ListActiveWriteLimit(ListActiveWriteLimitRequest) returns (ListActiveWriteLimitResponse); + rpc ListHummockMetaConfig(ListHummockMetaConfigRequest) returns (ListHummockMetaConfigResponse); } message CompactionConfig { diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index 59685698ddc9..185f8017311c 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -409,6 +409,8 @@ prepare_sys_catalog! { { BuiltinCatalog::Table(&RW_HUMMOCK_CHECKPOINT_VERSION), read_hummock_checkpoint_version await }, { BuiltinCatalog::Table(&RW_HUMMOCK_VERSION_DELTAS), read_hummock_version_deltas await }, { BuiltinCatalog::Table(&RW_HUMMOCK_BRANCHED_OBJECTS), read_hummock_branched_objects await }, + { BuiltinCatalog::Table(&RW_HUMMOCK_COMPACTION_GROUP_CONFIGS), read_hummock_compaction_group_configs await }, + { BuiltinCatalog::Table(&RW_HUMMOCK_META_CONFIGS), read_hummock_meta_configs await}, } #[cfg(test)] diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index 58beecbb528a..9f89c9eed5e8 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -20,6 +20,8 @@ mod rw_ddl_progress; mod rw_fragments; mod rw_functions; mod rw_hummock_branched_objects; +mod rw_hummock_compaction_group_configs; +mod rw_hummock_meta_configs; mod rw_hummock_pinned_snapshots; mod rw_hummock_pinned_versions; mod rw_hummock_version; @@ -51,6 +53,8 @@ pub use rw_ddl_progress::*; pub use rw_fragments::*; pub use rw_functions::*; pub use rw_hummock_branched_objects::*; +pub use rw_hummock_compaction_group_configs::*; +pub use rw_hummock_meta_configs::*; pub use rw_hummock_pinned_snapshots::*; pub use rw_hummock_pinned_versions::*; pub use rw_hummock_version::*; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_compaction_group_configs.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_compaction_group_configs.rs new file mode 100644 index 000000000000..758d639388f7 --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_compaction_group_configs.rs @@ -0,0 +1,72 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use itertools::Itertools; +use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME; +use risingwave_common::error::Result; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, ScalarImpl}; +use serde_json::json; + +use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl}; + +pub const RW_HUMMOCK_COMPACTION_GROUP_CONFIGS: BuiltinTable = BuiltinTable { + name: "rw_hummock_compaction_group_configs", + schema: RW_CATALOG_SCHEMA_NAME, + columns: &[ + (DataType::Int64, "id"), + (DataType::Int64, "parent_id"), + (DataType::Jsonb, "member_tables"), + (DataType::Jsonb, "compaction_config"), + (DataType::Jsonb, "active_write_limit"), + ], + pk: &[0], +}; + +impl SysCatalogReaderImpl { + pub async fn read_hummock_compaction_group_configs(&self) -> Result> { + let info = self + .meta_client + .list_hummock_compaction_group_configs() + .await?; + let mut write_limits = self.meta_client.list_hummock_active_write_limits().await?; + let mut rows = info + .into_iter() + .map(|i| { + let active_write_limit = write_limits + .remove(&i.id) + .map(|w| ScalarImpl::Jsonb(json!(w).into())); + OwnedRow::new(vec![ + Some(ScalarImpl::Int64(i.id as _)), + Some(ScalarImpl::Int64(i.parent_id as _)), + Some(ScalarImpl::Jsonb(json!(i.member_table_ids).into())), + Some(ScalarImpl::Jsonb(json!(i.compaction_config).into())), + active_write_limit, + ]) + }) + .collect_vec(); + // As compaction group configs and active write limits are fetched via two RPCs, it's possible there's inconsistency. + // Just leave unknown field blank. + rows.extend(write_limits.into_iter().map(|(cg, w)| { + OwnedRow::new(vec![ + Some(ScalarImpl::Int64(cg as _)), + None, + None, + None, + Some(ScalarImpl::Jsonb(json!(w).into())), + ]) + })); + Ok(rows) + } +} diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_meta_configs.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_meta_configs.rs new file mode 100644 index 000000000000..e28dc0a926c2 --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_hummock_meta_configs.rs @@ -0,0 +1,50 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use itertools::Itertools; +use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME; +use risingwave_common::error::Result; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{DataType, ScalarImpl}; + +use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl}; + +pub const RW_HUMMOCK_META_CONFIGS: BuiltinTable = BuiltinTable { + name: "rw_hummock_meta_configs", + schema: RW_CATALOG_SCHEMA_NAME, + columns: &[ + (DataType::Varchar, "config_name"), + (DataType::Varchar, "config_value"), + ], + pk: &[0], +}; + +impl SysCatalogReaderImpl { + pub async fn read_hummock_meta_configs(&self) -> Result> { + let configs = self + .meta_client + .list_hummock_meta_configs() + .await? + .into_iter() + .sorted() + .map(|(k, v)| { + OwnedRow::new(vec![ + Some(ScalarImpl::Utf8(k.into())), + Some(ScalarImpl::Utf8(v.into())), + ]) + }) + .collect_vec(); + Ok(configs) + } +} diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index c45cf1237726..ae90c2e345f9 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -18,8 +18,9 @@ use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_pb::backup_service::MetaSnapshotMetadata; use risingwave_pb::catalog::Table; use risingwave_pb::ddl_service::DdlProgress; +use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ - BranchedObject, HummockSnapshot, HummockVersion, HummockVersionDelta, + BranchedObject, CompactionGroupInfo, HummockSnapshot, HummockVersion, HummockVersionDelta, }; use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs; use risingwave_pb::meta::list_actor_states_response::ActorState; @@ -86,6 +87,12 @@ pub trait FrontendMetaClient: Send + Sync { async fn list_version_deltas(&self) -> Result>; async fn list_branched_objects(&self) -> Result>; + + async fn list_hummock_compaction_group_configs(&self) -> Result>; + + async fn list_hummock_active_write_limits(&self) -> Result>; + + async fn list_hummock_meta_configs(&self) -> Result>; } pub struct FrontendMetaClientImpl(pub MetaClient); @@ -214,4 +221,16 @@ impl FrontendMetaClient for FrontendMetaClientImpl { async fn list_branched_objects(&self) -> Result> { self.0.list_branched_object().await } + + async fn list_hummock_compaction_group_configs(&self) -> Result> { + self.0.risectl_list_compaction_group().await + } + + async fn list_hummock_active_write_limits(&self) -> Result> { + self.0.list_active_write_limit().await + } + + async fn list_hummock_meta_configs(&self) -> Result> { + self.0.list_hummock_meta_config().await + } } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index abdfa90c064b..20eb252fc505 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -35,8 +35,9 @@ use risingwave_pb::catalog::{ PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, Table, }; use risingwave_pb::ddl_service::{create_connection_request, DdlProgress}; +use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ - BranchedObject, HummockSnapshot, HummockVersion, HummockVersionDelta, + BranchedObject, CompactionGroupInfo, HummockSnapshot, HummockVersion, HummockVersionDelta, }; use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs; use risingwave_pb::meta::list_actor_states_response::ActorState; @@ -850,6 +851,18 @@ impl FrontendMetaClient for MockFrontendMetaClient { async fn list_branched_objects(&self) -> RpcResult> { unimplemented!() } + + async fn list_hummock_compaction_group_configs(&self) -> RpcResult> { + unimplemented!() + } + + async fn list_hummock_active_write_limits(&self) -> RpcResult> { + unimplemented!() + } + + async fn list_hummock_meta_configs(&self) -> RpcResult> { + unimplemented!() + } } #[cfg(test)] diff --git a/src/meta/src/backup_restore/backup_manager.rs b/src/meta/src/backup_restore/backup_manager.rs index 1eea48307cb3..c280572c796d 100644 --- a/src/meta/src/backup_restore/backup_manager.rs +++ b/src/meta/src/backup_restore/backup_manager.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::sync::Arc; use std::time::Instant; use arc_swap::ArcSwap; -use itertools::Itertools; use risingwave_backup::error::BackupError; use risingwave_backup::storage::{BoxedMetaSnapshotStorage, ObjectStoreMetaSnapshotStorage}; use risingwave_backup::{MetaBackupJobId, MetaSnapshotId, MetaSnapshotManifest}; @@ -307,7 +307,7 @@ impl BackupManager { } /// List all `SSTables` required by backups. - pub fn list_pinned_ssts(&self) -> Vec { + pub fn list_pinned_ssts(&self) -> HashSet { self.backup_store .load() .0 @@ -315,8 +315,7 @@ impl BackupManager { .snapshot_metadata .iter() .flat_map(|s| s.ssts.clone()) - .dedup() - .collect_vec() + .collect() } pub fn manifest(&self) -> Arc { diff --git a/src/meta/src/hummock/vacuum.rs b/src/meta/src/hummock/vacuum.rs index 31f4651d6fdf..992deb5e636c 100644 --- a/src/meta/src/hummock/vacuum.rs +++ b/src/meta/src/hummock/vacuum.rs @@ -164,8 +164,7 @@ impl VacuumManager { &self, objects_to_delete: &mut Vec, ) -> MetaResult<()> { - let reject: HashSet = - self.backup_manager.list_pinned_ssts().into_iter().collect(); + let reject = self.backup_manager.list_pinned_ssts(); // Ack these SSTs immediately, because they tend to be pinned for long time. // They will be GCed during full GC when they are no longer pinned. let to_ack = objects_to_delete diff --git a/src/meta/src/rpc/service/hummock_service.rs b/src/meta/src/rpc/service/hummock_service.rs index a8419da6e207..161e92ddad11 100644 --- a/src/meta/src/rpc/service/hummock_service.rs +++ b/src/meta/src/rpc/service/hummock_service.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::time::Duration; use futures::StreamExt; @@ -48,6 +48,18 @@ impl HummockServiceImpl { } } +macro_rules! fields_to_kvs { + ($struct:ident, $($field:ident),*) => { + { + let mut kvs = HashMap::default(); + $( + kvs.insert(stringify!($field).to_string(), $struct.$field.to_string()); + )* + kvs + } + } +} + #[async_trait::async_trait] impl HummockManagerService for HummockServiceImpl { type SubscribeCompactionEventStream = RwReceiverStream; @@ -536,4 +548,63 @@ impl HummockManagerService for HummockServiceImpl { branched_objects, })) } + + async fn list_active_write_limit( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(ListActiveWriteLimitResponse { + write_limits: self.hummock_manager.write_limits().await, + })) + } + + async fn list_hummock_meta_config( + &self, + _request: Request, + ) -> Result, Status> { + let opt = &self.hummock_manager.env.opts; + let configs = fields_to_kvs!( + opt, + vacuum_interval_sec, + vacuum_spin_interval_ms, + hummock_version_checkpoint_interval_sec, + min_delta_log_num_for_hummock_version_checkpoint, + min_sst_retention_time_sec, + full_gc_interval_sec, + collect_gc_watermark_spin_interval_sec, + periodic_compaction_interval_sec, + periodic_space_reclaim_compaction_interval_sec, + periodic_ttl_reclaim_compaction_interval_sec, + periodic_tombstone_reclaim_compaction_interval_sec, + periodic_split_compact_group_interval_sec, + split_group_size_limit, + min_table_split_size, + do_not_config_object_storage_lifecycle, + partition_vnode_count, + table_write_throughput_threshold, + min_table_split_write_throughput, + compaction_task_max_heartbeat_interval_secs + ); + Ok(Response::new(ListHummockMetaConfigResponse { configs })) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + #[test] + fn test_fields_to_kvs() { + struct S { + foo: u64, + bar: String, + } + let s = S { + foo: 15, + bar: "foobar".to_string(), + }; + let kvs: HashMap = fields_to_kvs!(s, foo, bar); + assert_eq!(kvs.len(), 2); + assert_eq!(kvs.get("foo").unwrap(), "15"); + assert_eq!(kvs.get("bar").unwrap(), "foobar"); + } } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index e4b837ba517f..8ebbded3dd08 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -53,6 +53,7 @@ use risingwave_pb::ddl_service::*; use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient; use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig; use risingwave_pb::hummock::subscribe_compaction_event_request::Register; +use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::*; use risingwave_pb::meta::add_worker_node_request::Property; use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs; @@ -1050,6 +1051,18 @@ impl MetaClient { Ok(resp.branched_objects) } + pub async fn list_active_write_limit(&self) -> Result> { + let req = ListActiveWriteLimitRequest {}; + let resp = self.inner.list_active_write_limit(req).await?; + Ok(resp.write_limits) + } + + pub async fn list_hummock_meta_config(&self) -> Result> { + let req = ListHummockMetaConfigRequest {}; + let resp = self.inner.list_hummock_meta_config(req).await?; + Ok(resp.configs) + } + pub async fn delete_worker_node(&self, worker: HostAddress) -> Result<()> { let _resp = self .inner @@ -1716,6 +1729,8 @@ macro_rules! for_all_meta_rpc { ,{ hummock_client, rise_ctl_list_compaction_status, RiseCtlListCompactionStatusRequest, RiseCtlListCompactionStatusResponse } ,{ hummock_client, subscribe_compaction_event, impl tonic::IntoStreamingRequest, Streaming } ,{ hummock_client, list_branched_object, ListBranchedObjectRequest, ListBranchedObjectResponse } + ,{ hummock_client, list_active_write_limit, ListActiveWriteLimitRequest, ListActiveWriteLimitResponse } + ,{ hummock_client, list_hummock_meta_config, ListHummockMetaConfigRequest, ListHummockMetaConfigResponse } ,{ user_client, create_user, CreateUserRequest, CreateUserResponse } ,{ user_client, update_user, UpdateUserRequest, UpdateUserResponse } ,{ user_client, drop_user, DropUserRequest, DropUserResponse }