Skip to content

Commit

Permalink
feat(meta): add hummock config relevant tables to rw_catalog (#12337)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Sep 15, 2023
1 parent 59bb645 commit c0060b2
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 9 deletions.
15 changes: 15 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,19 @@ message ListBranchedObjectResponse {
repeated BranchedObject branched_objects = 1;
}

message ListActiveWriteLimitRequest {}

message ListActiveWriteLimitResponse {
// < compaction group id, write limit info >
map<uint64, WriteLimits.WriteLimit> write_limits = 1;
}

message ListHummockMetaConfigRequest {}

message ListHummockMetaConfigResponse {
map<string, string> configs = 1;
}

service HummockManagerService {
rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns (UnpinVersionBeforeResponse);
rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse);
Expand Down Expand Up @@ -664,6 +677,8 @@ service HummockManagerService {
rpc RiseCtlListCompactionStatus(RiseCtlListCompactionStatusRequest) returns (RiseCtlListCompactionStatusResponse);
rpc SubscribeCompactionEvent(stream SubscribeCompactionEventRequest) returns (stream SubscribeCompactionEventResponse);
rpc ListBranchedObject(ListBranchedObjectRequest) returns (ListBranchedObjectResponse);
rpc ListActiveWriteLimit(ListActiveWriteLimitRequest) returns (ListActiveWriteLimitResponse);
rpc ListHummockMetaConfig(ListHummockMetaConfigRequest) returns (ListHummockMetaConfigResponse);
}

message CompactionConfig {
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ prepare_sys_catalog! {
{ BuiltinCatalog::Table(&RW_HUMMOCK_CHECKPOINT_VERSION), read_hummock_checkpoint_version await },
{ BuiltinCatalog::Table(&RW_HUMMOCK_VERSION_DELTAS), read_hummock_version_deltas await },
{ BuiltinCatalog::Table(&RW_HUMMOCK_BRANCHED_OBJECTS), read_hummock_branched_objects await },
{ BuiltinCatalog::Table(&RW_HUMMOCK_COMPACTION_GROUP_CONFIGS), read_hummock_compaction_group_configs await },
{ BuiltinCatalog::Table(&RW_HUMMOCK_META_CONFIGS), read_hummock_meta_configs await},
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ mod rw_ddl_progress;
mod rw_fragments;
mod rw_functions;
mod rw_hummock_branched_objects;
mod rw_hummock_compaction_group_configs;
mod rw_hummock_meta_configs;
mod rw_hummock_pinned_snapshots;
mod rw_hummock_pinned_versions;
mod rw_hummock_version;
Expand Down Expand Up @@ -51,6 +53,8 @@ pub use rw_ddl_progress::*;
pub use rw_fragments::*;
pub use rw_functions::*;
pub use rw_hummock_branched_objects::*;
pub use rw_hummock_compaction_group_configs::*;
pub use rw_hummock_meta_configs::*;
pub use rw_hummock_pinned_snapshots::*;
pub use rw_hummock_pinned_versions::*;
pub use rw_hummock_version::*;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME;
use risingwave_common::error::Result;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, ScalarImpl};
use serde_json::json;

use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl};

pub const RW_HUMMOCK_COMPACTION_GROUP_CONFIGS: BuiltinTable = BuiltinTable {
name: "rw_hummock_compaction_group_configs",
schema: RW_CATALOG_SCHEMA_NAME,
columns: &[
(DataType::Int64, "id"),
(DataType::Int64, "parent_id"),
(DataType::Jsonb, "member_tables"),
(DataType::Jsonb, "compaction_config"),
(DataType::Jsonb, "active_write_limit"),
],
pk: &[0],
};

impl SysCatalogReaderImpl {
pub async fn read_hummock_compaction_group_configs(&self) -> Result<Vec<OwnedRow>> {
let info = self
.meta_client
.list_hummock_compaction_group_configs()
.await?;
let mut write_limits = self.meta_client.list_hummock_active_write_limits().await?;
let mut rows = info
.into_iter()
.map(|i| {
let active_write_limit = write_limits
.remove(&i.id)
.map(|w| ScalarImpl::Jsonb(json!(w).into()));
OwnedRow::new(vec![
Some(ScalarImpl::Int64(i.id as _)),
Some(ScalarImpl::Int64(i.parent_id as _)),
Some(ScalarImpl::Jsonb(json!(i.member_table_ids).into())),
Some(ScalarImpl::Jsonb(json!(i.compaction_config).into())),
active_write_limit,
])
})
.collect_vec();
// As compaction group configs and active write limits are fetched via two RPCs, it's possible there's inconsistency.
// Just leave unknown field blank.
rows.extend(write_limits.into_iter().map(|(cg, w)| {
OwnedRow::new(vec![
Some(ScalarImpl::Int64(cg as _)),
None,
None,
None,
Some(ScalarImpl::Jsonb(json!(w).into())),
])
}));
Ok(rows)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use risingwave_common::catalog::RW_CATALOG_SCHEMA_NAME;
use risingwave_common::error::Result;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, ScalarImpl};

use crate::catalog::system_catalog::{BuiltinTable, SysCatalogReaderImpl};

pub const RW_HUMMOCK_META_CONFIGS: BuiltinTable = BuiltinTable {
name: "rw_hummock_meta_configs",
schema: RW_CATALOG_SCHEMA_NAME,
columns: &[
(DataType::Varchar, "config_name"),
(DataType::Varchar, "config_value"),
],
pk: &[0],
};

impl SysCatalogReaderImpl {
pub async fn read_hummock_meta_configs(&self) -> Result<Vec<OwnedRow>> {
let configs = self
.meta_client
.list_hummock_meta_configs()
.await?
.into_iter()
.sorted()
.map(|(k, v)| {
OwnedRow::new(vec![
Some(ScalarImpl::Utf8(k.into())),
Some(ScalarImpl::Utf8(v.into())),
])
})
.collect_vec();
Ok(configs)
}
}
21 changes: 20 additions & 1 deletion src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::catalog::Table;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
BranchedObject, HummockSnapshot, HummockVersion, HummockVersionDelta,
BranchedObject, CompactionGroupInfo, HummockSnapshot, HummockVersion, HummockVersionDelta,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
Expand Down Expand Up @@ -86,6 +87,12 @@ pub trait FrontendMetaClient: Send + Sync {
async fn list_version_deltas(&self) -> Result<Vec<HummockVersionDelta>>;

async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>>;

async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>>;

async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>>;

async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>>;
}

pub struct FrontendMetaClientImpl(pub MetaClient);
Expand Down Expand Up @@ -214,4 +221,16 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
async fn list_branched_objects(&self) -> Result<Vec<BranchedObject>> {
self.0.list_branched_object().await
}

async fn list_hummock_compaction_group_configs(&self) -> Result<Vec<CompactionGroupInfo>> {
self.0.risectl_list_compaction_group().await
}

async fn list_hummock_active_write_limits(&self) -> Result<HashMap<u64, WriteLimit>> {
self.0.list_active_write_limit().await
}

async fn list_hummock_meta_configs(&self) -> Result<HashMap<String, String>> {
self.0.list_hummock_meta_config().await
}
}
15 changes: 14 additions & 1 deletion src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ use risingwave_pb::catalog::{
PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable, PbView, Table,
};
use risingwave_pb::ddl_service::{create_connection_request, DdlProgress};
use risingwave_pb::hummock::write_limits::WriteLimit;
use risingwave_pb::hummock::{
BranchedObject, HummockSnapshot, HummockVersion, HummockVersionDelta,
BranchedObject, CompactionGroupInfo, HummockSnapshot, HummockVersion, HummockVersionDelta,
};
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
Expand Down Expand Up @@ -850,6 +851,18 @@ impl FrontendMetaClient for MockFrontendMetaClient {
async fn list_branched_objects(&self) -> RpcResult<Vec<BranchedObject>> {
unimplemented!()
}

async fn list_hummock_compaction_group_configs(&self) -> RpcResult<Vec<CompactionGroupInfo>> {
unimplemented!()
}

async fn list_hummock_active_write_limits(&self) -> RpcResult<HashMap<u64, WriteLimit>> {
unimplemented!()
}

async fn list_hummock_meta_configs(&self) -> RpcResult<HashMap<String, String>> {
unimplemented!()
}
}

#[cfg(test)]
Expand Down
7 changes: 3 additions & 4 deletions src/meta/src/backup_restore/backup_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;

use arc_swap::ArcSwap;
use itertools::Itertools;
use risingwave_backup::error::BackupError;
use risingwave_backup::storage::{BoxedMetaSnapshotStorage, ObjectStoreMetaSnapshotStorage};
use risingwave_backup::{MetaBackupJobId, MetaSnapshotId, MetaSnapshotManifest};
Expand Down Expand Up @@ -307,16 +307,15 @@ impl BackupManager {
}

/// List all `SSTables` required by backups.
pub fn list_pinned_ssts(&self) -> Vec<HummockSstableObjectId> {
pub fn list_pinned_ssts(&self) -> HashSet<HummockSstableObjectId> {
self.backup_store
.load()
.0
.manifest()
.snapshot_metadata
.iter()
.flat_map(|s| s.ssts.clone())
.dedup()
.collect_vec()
.collect()
}

pub fn manifest(&self) -> Arc<MetaSnapshotManifest> {
Expand Down
3 changes: 1 addition & 2 deletions src/meta/src/hummock/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,7 @@ impl VacuumManager {
&self,
objects_to_delete: &mut Vec<HummockSstableObjectId>,
) -> MetaResult<()> {
let reject: HashSet<HummockSstableObjectId> =
self.backup_manager.list_pinned_ssts().into_iter().collect();
let reject = self.backup_manager.list_pinned_ssts();
// Ack these SSTs immediately, because they tend to be pinned for long time.
// They will be GCed during full GC when they are no longer pinned.
let to_ack = objects_to_delete
Expand Down
73 changes: 72 additions & 1 deletion src/meta/src/rpc/service/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::time::Duration;

use futures::StreamExt;
Expand Down Expand Up @@ -48,6 +48,18 @@ impl HummockServiceImpl {
}
}

macro_rules! fields_to_kvs {
($struct:ident, $($field:ident),*) => {
{
let mut kvs = HashMap::default();
$(
kvs.insert(stringify!($field).to_string(), $struct.$field.to_string());
)*
kvs
}
}
}

#[async_trait::async_trait]
impl HummockManagerService for HummockServiceImpl {
type SubscribeCompactionEventStream = RwReceiverStream<SubscribeCompactionEventResponse>;
Expand Down Expand Up @@ -536,4 +548,63 @@ impl HummockManagerService for HummockServiceImpl {
branched_objects,
}))
}

async fn list_active_write_limit(
&self,
_request: Request<ListActiveWriteLimitRequest>,
) -> Result<Response<ListActiveWriteLimitResponse>, Status> {
Ok(Response::new(ListActiveWriteLimitResponse {
write_limits: self.hummock_manager.write_limits().await,
}))
}

async fn list_hummock_meta_config(
&self,
_request: Request<ListHummockMetaConfigRequest>,
) -> Result<Response<ListHummockMetaConfigResponse>, Status> {
let opt = &self.hummock_manager.env.opts;
let configs = fields_to_kvs!(
opt,
vacuum_interval_sec,
vacuum_spin_interval_ms,
hummock_version_checkpoint_interval_sec,
min_delta_log_num_for_hummock_version_checkpoint,
min_sst_retention_time_sec,
full_gc_interval_sec,
collect_gc_watermark_spin_interval_sec,
periodic_compaction_interval_sec,
periodic_space_reclaim_compaction_interval_sec,
periodic_ttl_reclaim_compaction_interval_sec,
periodic_tombstone_reclaim_compaction_interval_sec,
periodic_split_compact_group_interval_sec,
split_group_size_limit,
min_table_split_size,
do_not_config_object_storage_lifecycle,
partition_vnode_count,
table_write_throughput_threshold,
min_table_split_write_throughput,
compaction_task_max_heartbeat_interval_secs
);
Ok(Response::new(ListHummockMetaConfigResponse { configs }))
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
#[test]
fn test_fields_to_kvs() {
struct S {
foo: u64,
bar: String,
}
let s = S {
foo: 15,
bar: "foobar".to_string(),
};
let kvs: HashMap<String, String> = fields_to_kvs!(s, foo, bar);
assert_eq!(kvs.len(), 2);
assert_eq!(kvs.get("foo").unwrap(), "15");
assert_eq!(kvs.get("bar").unwrap(), "foobar");
}
}
Loading

0 comments on commit c0060b2

Please sign in to comment.