diff --git a/src/meta/src/hummock/compaction_scheduler.rs b/src/meta/src/hummock/compaction_scheduler.rs
index b6a31a005a81..e301edf20fff 100644
--- a/src/meta/src/hummock/compaction_scheduler.rs
+++ b/src/meta/src/hummock/compaction_scheduler.rs
@@ -22,8 +22,6 @@ use futures::{FutureExt, Stream, StreamExt};
 use parking_lot::Mutex;
 use risingwave_common::util::select_all;
 use risingwave_hummock_sdk::compact::compact_task_to_string;
-use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
-use risingwave_hummock_sdk::table_stats::PbTableStatsMap;
 use risingwave_hummock_sdk::CompactionGroupId;
 use risingwave_pb::hummock::compact_task::{self, TaskStatus};
 use risingwave_pb::hummock::subscribe_compact_tasks_response::Task;
@@ -44,21 +42,13 @@ use crate::storage::MetaStore;
 
 pub type CompactionSchedulerRef<S> = Arc<CompactionScheduler<S>>;
 pub type CompactionRequestChannelRef = Arc<CompactionRequestChannel>;
-const HISTORY_TABLE_INFO_WINDOW_SIZE: usize = 16;
-#[derive(Clone, Debug)]
-pub enum CompactionRequestItem {
-    Compact {
-        compaction_group: CompactionGroupId,
-        task_type: compact_task::TaskType,
-    },
-    SplitLargeGroup(PbTableStatsMap),
-}
+type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType);
 
 /// [`CompactionRequestChannel`] wraps an mpsc channel and deduplicates requests from the same
 /// compaction group.
 pub struct CompactionRequestChannel {
-    request_tx: UnboundedSender<CompactionRequestItem>,
+    request_tx: UnboundedSender<CompactionRequestChannelItem>,
     scheduled: Mutex<HashSet<(CompactionGroupId, compact_task::TaskType)>>,
 }
 
@@ -72,7 +62,7 @@ pub enum ScheduleStatus {
 }
 
 impl CompactionRequestChannel {
-    pub fn new(request_tx: UnboundedSender<CompactionRequestItem>) -> Self {
+    pub fn new(request_tx: UnboundedSender<CompactionRequestChannelItem>) -> Self {
         Self {
             request_tx,
             scheduled: Default::default(),
@@ -84,29 +74,17 @@
         &self,
         compaction_group: CompactionGroupId,
         task_type: compact_task::TaskType,
-    ) -> Result<bool, SendError<CompactionRequestItem>> {
+    ) -> Result<bool, SendError<CompactionRequestChannelItem>> {
         let mut guard = self.scheduled.lock();
         let key = (compaction_group, task_type);
         if guard.contains(&key) {
             return Ok(false);
         }
-        self.request_tx.send(CompactionRequestItem::Compact {
-            compaction_group,
-            task_type,
-        })?;
+        self.request_tx.send(key)?;
         guard.insert(key);
         Ok(true)
     }
 
-    /// Enqueues only if the target is not yet in queue.
-    pub fn try_split_groups(
-        &self,
-        stats: PbTableStatsMap,
-    ) -> Result<(), SendError<CompactionRequestItem>> {
-        self.request_tx
-            .send(CompactionRequestItem::SplitLargeGroup(stats))
-    }
-
     pub fn unschedule(
         &self,
         compaction_group: CompactionGroupId,
@@ -143,7 +121,8 @@
     }
 
     pub async fn start(&self, shutdown_rx: Receiver<()>) {
-        let (sched_tx, sched_rx) = tokio::sync::mpsc::unbounded_channel::<CompactionRequestItem>();
+        let (sched_tx, sched_rx) =
+            tokio::sync::mpsc::unbounded_channel::<CompactionRequestChannelItem>();
         let sched_channel = Arc::new(CompactionRequestChannel::new(sched_tx));
 
         self.hummock_manager
@@ -346,35 +325,22 @@
             Either::Left((event, _)) => {
                 if let Some(event) = event {
                     match event {
-                        SchedulerEvent::Channel(item) => {
+                        SchedulerEvent::Channel((compaction_group, task_type)) => {
                             // recv
-                            match item {
-                                CompactionRequestItem::Compact {
+                            if !self
+                                .on_handle_compact(
                                     compaction_group,
+                                    &mut compaction_selectors,
                                     task_type,
-                                } => {
-                                    if !self
-                                        .on_handle_compact(
-                                            compaction_group,
-                                            &mut compaction_selectors,
-                                            task_type,
-                                            sched_channel.clone(),
-                                        )
-                                        .await
-                                    {
-                                        self.hummock_manager
-                                            .metrics
-                                            .compact_skip_frequency
-                                            .with_label_values(&["total", "no-compactor"])
-                                            .inc();
-                                    }
-                                }
-                                CompactionRequestItem::SplitLargeGroup(stats) => {
-                                    self.collect_table_write_throughput(
-                                        stats,
-                                        &mut group_infos,
-                                    );
-                                }
+                                    sched_channel.clone(),
+                                )
+                                .await
+                            {
+                                self.hummock_manager
+                                    .metrics
+                                    .compact_skip_frequency
+                                    .with_label_values(&["total", "no-compactor"])
+                                    .inc();
                             }
                         }
                         SchedulerEvent::DynamicTrigger => {
@@ -478,123 +444,11 @@ where
         true
     }
-
-    fn collect_table_write_throughput(
-        &self,
-        table_stats: PbTableStatsMap,
-        table_infos: &mut HashMap<u32, VecDeque<u64>>,
-    ) {
-        for (table_id, stat) in table_stats {
-            let throughput = (stat.total_value_size + stat.total_key_size) as u64;
-            let entry = table_infos.entry(table_id).or_default();
-            entry.push_back(throughput);
-            if entry.len() > HISTORY_TABLE_INFO_WINDOW_SIZE {
-                entry.pop_front();
-            }
-        }
-    }
-
-    async fn on_handle_check_split_multi_group(
-        &self,
-        table_write_throughput: &HashMap<u32, VecDeque<u64>>,
-    ) {
-        let mut group_infos = self
-            .hummock_manager
-            .calculate_compaction_group_statistic()
-            .await;
-        group_infos.sort_by_key(|group| group.group_size);
-        group_infos.reverse();
-        let group_size_limit = self.env.opts.split_group_size_limit;
-        let table_split_limit = self.env.opts.move_table_size_limit;
-        let default_group_id: CompactionGroupId = StaticCompactionGroupId::StateDefault.into();
-        let mv_group_id: CompactionGroupId = StaticCompactionGroupId::MaterializedView.into();
-        let mut partition_vnode_count = self.env.opts.partition_vnode_count;
-        for group in &group_infos {
-            if group.table_statistic.len() == 1 {
-                continue;
-            }
-
-            for (table_id, table_size) in &group.table_statistic {
-                let mut is_high_write_throughput = false;
-                let mut is_low_write_throughput = true;
-                if let Some(history) = table_write_throughput.get(table_id) {
-                    if history.len() >= HISTORY_TABLE_INFO_WINDOW_SIZE {
-                        let window_total_size = history.iter().sum::<u64>();
-                        is_high_write_throughput = history.iter().all(|throughput| {
-                            *throughput > self.env.opts.table_write_throughput_threshold
-                        });
-                        is_low_write_throughput = window_total_size
-                            < (HISTORY_TABLE_INFO_WINDOW_SIZE as u64)
-                                * self.env.opts.min_table_split_write_throughput;
-                    }
-                }
-
-                if *table_size < group_size_limit && !is_high_write_throughput {
-                    continue;
-                }
-
-                let parent_group_id = group.group_id;
-                let mut target_compact_group_id = None;
-                let mut allow_split_by_table = false;
-                if *table_size > group_size_limit && is_low_write_throughput {
-                    // do not split a large table and a small table because it would increase IOPS
-                    // of the small table.
-                    if parent_group_id != default_group_id && parent_group_id != mv_group_id {
-                        let rest_group_size = group.group_size - *table_size;
-                        if rest_group_size < *table_size && rest_group_size < table_split_limit {
-                            continue;
-                        }
-                    } else {
-                        for group in &group_infos {
-                            // do not move to mv group or state group
-                            if !group.split_by_table || group.group_id == mv_group_id
-                                || group.group_id == default_group_id
-                                || group.group_id == parent_group_id
-                                // do not move state-table to a large group.
-                                || group.group_size + *table_size > group_size_limit
-                                // do not move state-table from group A to group B if this operation would make group B become larger than A.
-                                || group.group_size + *table_size > group.group_size - table_size
-                            {
-                                continue;
-                            }
-                            target_compact_group_id = Some(group.group_id);
-                        }
-                        allow_split_by_table = true;
-                        partition_vnode_count = 1;
-                    }
-                }
-
-                let ret = self
-                    .hummock_manager
-                    .move_state_table_to_compaction_group(
-                        parent_group_id,
-                        &[*table_id],
-                        target_compact_group_id,
-                        allow_split_by_table,
-                        partition_vnode_count,
-                    )
-                    .await;
-                match ret {
-                    Ok(_) => {
-                        info!(
-                            "move state table [{}] from group-{} to group-{:?} success, Allow split by table: {}",
-                            table_id, parent_group_id, target_compact_group_id, allow_split_by_table
-                        );
-                        return;
-                    }
-                    Err(e) => info!(
-                        "failed to move state table [{}] from group-{} to group-{:?} because {:?}",
-                        table_id, parent_group_id, target_compact_group_id, e
-                    ),
-                }
-            }
-        }
-    }
 }
 
 #[derive(Clone)]
 pub enum SchedulerEvent {
-    Channel(CompactionRequestItem),
+    Channel((CompactionGroupId, compact_task::TaskType)),
     DynamicTrigger,
     SpaceReclaimTrigger,
     TtlReclaimTrigger,
@@ -605,7 +459,7 @@ where
     S: MetaStore,
 {
     fn scheduler_event_stream(
-        sched_rx: UnboundedReceiver<CompactionRequestItem>,
+        sched_rx: UnboundedReceiver<(CompactionGroupId, compact_task::TaskType)>,
         periodic_space_reclaim_compaction_interval_sec: u64,
         periodic_ttl_reclaim_compaction_interval_sec: u64,
         periodic_compaction_interval_sec: u64,
@@ -657,7 +511,7 @@ mod tests {
 
     use crate::hummock::compaction::default_level_selector;
     use crate::hummock::compaction_scheduler::{
-        CompactionRequestChannel, CompactionRequestItem, ScheduleStatus,
+        CompactionRequestChannel, CompactionRequestChannelItem, ScheduleStatus,
     };
     use crate::hummock::test_utils::{add_ssts, setup_compute_env};
     use crate::hummock::CompactionScheduler;
@@ -671,7 +525,7 @@
         CompactionScheduler::new(env, hummock_manager.clone(), compactor_manager.clone());
 
         let (request_tx, _request_rx) =
-            tokio::sync::mpsc::unbounded_channel::<CompactionRequestItem>();
+            tokio::sync::mpsc::unbounded_channel::<CompactionRequestChannelItem>();
         let request_channel = Arc::new(CompactionRequestChannel::new(request_tx));
 
         // Add a compactor with invalid context_id.
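Note on the file above: the scheduler's channel payload shrinks from the two-variant `CompactionRequestItem` enum to a plain `(CompactionGroupId, TaskType)` tuple, while deduplication stays inside the channel wrapper. The pattern is easier to see outside the diff context. Below is a minimal, self-contained sketch of it, not the crate's actual code: it uses placeholder `CompactionGroupId`/`TaskType` stand-ins and tokio's unbounded mpsc channel instead of the real risingwave_hummock_sdk types.

```rust
use std::collections::HashSet;
use std::sync::Mutex;

use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

// Stand-ins for the real types; the actual crate uses
// risingwave_hummock_sdk::CompactionGroupId and compact_task::TaskType.
type CompactionGroupId = u64;
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum TaskType {
    Dynamic,
    SpaceReclaim,
}
type RequestItem = (CompactionGroupId, TaskType);

/// Wraps an mpsc sender and drops duplicate requests for the same
/// (group, task type) pair until that pair is explicitly unscheduled.
struct RequestChannel {
    tx: UnboundedSender<RequestItem>,
    scheduled: Mutex<HashSet<RequestItem>>,
}

impl RequestChannel {
    fn new(tx: UnboundedSender<RequestItem>) -> Self {
        Self { tx, scheduled: Mutex::new(HashSet::new()) }
    }

    /// Returns Ok(false) if an identical request is already queued.
    fn try_sched(&self, key: RequestItem) -> Result<bool, SendError<RequestItem>> {
        let mut guard = self.scheduled.lock().unwrap();
        if guard.contains(&key) {
            return Ok(false);
        }
        self.tx.send(key)?;
        guard.insert(key);
        Ok(true)
    }

    /// Called by the consumer once it has picked up the request.
    fn unschedule(&self, key: &RequestItem) {
        self.scheduled.lock().unwrap().remove(key);
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = unbounded_channel::<RequestItem>();
    let chan = RequestChannel::new(tx);
    assert!(chan.try_sched((2, TaskType::Dynamic)).unwrap());
    assert!(!chan.try_sched((2, TaskType::Dynamic)).unwrap()); // deduplicated
    assert!(chan.try_sched((2, TaskType::SpaceReclaim)).unwrap()); // other task type is a new key
    let key = rx.recv().await.unwrap();
    chan.unschedule(&key);
    assert!(chan.try_sched((2, TaskType::Dynamic)).unwrap()); // can be queued again
}
```

The consumer side in the scheduler loop mirrors the mock client further down in this diff: receive a tuple, unschedule it, then pick a selector by task type.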
diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs
index f2556f70c8fe..d344c0dd8d05 100644
--- a/src/meta/src/hummock/manager/mod.rs
+++ b/src/meta/src/hummock/manager/mod.rs
@@ -92,6 +92,7 @@ mod worker;
 use compaction::*;
 
 type Snapshot = ArcSwap<HummockSnapshot>;
+const HISTORY_TABLE_INFO_WINDOW_SIZE: usize = 16;
 
 // Updates to states are performed as follows:
 // - Initialize ValTransaction for the meta state to update
@@ -124,6 +125,7 @@ pub struct HummockManager<S: MetaStore> {
     object_store: ObjectStoreRef,
     version_checkpoint_path: String,
     pause_version_checkpoint: AtomicBool,
+    history_table_throughput: parking_lot::RwLock<HashMap<u32, VecDeque<u64>>>,
 }
 
 pub type HummockManagerRef<S> = Arc<HummockManager<S>>;
@@ -347,6 +349,7 @@
             object_store,
             version_checkpoint_path: checkpoint_path,
             pause_version_checkpoint: AtomicBool::new(false),
+            history_table_throughput: parking_lot::RwLock::new(HashMap::default()),
         };
         let instance = Arc::new(instance);
         instance.start_worker(rx).await;
@@ -1663,7 +1666,7 @@
                 });
             }
             if !table_stats_change.is_empty() {
-                self.try_sched_split_group(table_stats_change);
+                self.collect_table_write_throughput(table_stats_change);
             }
         }
         if !modified_compaction_groups.is_empty() {
@@ -1958,18 +1961,6 @@
         }
     }
 
-    /// Sends a compaction request to compaction scheduler.
-    pub fn try_sched_split_group(&self, stats: PbTableStatsMap) {
-        if let Some(sender) = self.compaction_request_channel.read().as_ref() {
-            if let Err(e) = sender.try_split_groups(stats) {
-                tracing::error!(
-                    "failed to send compaction request for compaction group {}",
-                    e
-                );
-            }
-        }
-    }
-
     pub async fn trigger_manual_compaction(
         &self,
         compaction_group: CompactionGroupId,
@@ -2204,7 +2195,6 @@
         pin_mut!(event_stream);
 
         let shutdown_rx_shared = shutdown_rx.shared();
-        let mut group_infos = HashMap::default();
 
         tracing::info!(
             "Hummock timer task tracing [GroupSplit interval {} sec] [CheckDeadTask interval {} sec] [Report interval {} sec] [CompactionHeartBeat interval {} sec]",
@@ -2232,9 +2222,7 @@
                                 continue;
                             }
 
-                            hummock_manager
-                                .on_handle_check_split_multi_group(&mut group_infos)
-                                .await;
+                            hummock_manager.on_handle_check_split_multi_group().await;
                         }
 
                         HummockTimerEvent::Report => {
@@ -2385,121 +2373,107 @@
         }
     }
 
-    async fn on_handle_check_split_multi_group(
-        &self,
-        history_table_infos: &mut HashMap<u32, VecDeque<u64>>,
-    ) {
-        const HISTORY_TABLE_INFO_WINDOW_SIZE: usize = 4;
+    fn collect_table_write_throughput(&self, table_stats: PbTableStatsMap) {
+        let mut table_infos = self.history_table_throughput.write();
+        for (table_id, stat) in table_stats {
+            let throughput = (stat.total_value_size + stat.total_key_size) as u64;
+            let entry = table_infos.entry(table_id).or_default();
+            entry.push_back(throughput);
+            if entry.len() > HISTORY_TABLE_INFO_WINDOW_SIZE {
+                entry.pop_front();
+            }
+        }
+    }
+
+    async fn on_handle_check_split_multi_group(&self) {
+        let table_write_throughput = self.history_table_throughput.read().clone();
         let mut group_infos = self.calculate_compaction_group_statistic().await;
         group_infos.sort_by_key(|group| group.group_size);
         group_infos.reverse();
         let group_size_limit = self.env.opts.split_group_size_limit;
         let table_split_limit = self.env.opts.move_table_size_limit;
-        let mut table_infos = vec![];
-        // TODO: support move small state-table back to default-group to reduce IOPS.
-        for group in &group_infos {
-            if group.table_statistic.len() == 1 || group.group_size < group_size_limit {
-                continue;
-            }
-            for (table_id, table_size) in &group.table_statistic {
-                let last_table_infos = history_table_infos
-                    .entry(*table_id)
-                    .or_insert_with(VecDeque::new);
-                last_table_infos.push_back(*table_size);
-                if last_table_infos.len() > HISTORY_TABLE_INFO_WINDOW_SIZE {
-                    last_table_infos.pop_front();
-                }
-                table_infos.push((*table_id, group.group_id, group.group_size));
-            }
-        }
-        table_infos.sort_by(|a, b| b.2.cmp(&a.2));
         let default_group_id: CompactionGroupId = StaticCompactionGroupId::StateDefault.into();
         let mv_group_id: CompactionGroupId = StaticCompactionGroupId::MaterializedView.into();
         let mut partition_vnode_count = self.env.opts.partition_vnode_count;
-        for (table_id, parent_group_id, parent_group_size) in table_infos {
-            let table_info = history_table_infos.get(&table_id).unwrap();
-            let table_size = *table_info.back().unwrap();
-            if table_size < table_split_limit
-                || (table_size < group_size_limit
-                    && table_info.len() < HISTORY_TABLE_INFO_WINDOW_SIZE)
-            {
+        for group in &group_infos {
+            if group.table_statistic.len() == 1 {
                 continue;
             }
-            let mut target_compact_group_id = None;
-            let mut allow_split_by_table = false;
-            if table_size < group_size_limit {
-                let mut increase = true;
-                for idx in 1..table_info.len() {
-                    if table_info[idx] < table_info[idx - 1] {
-                        increase = false;
-                        break;
+            for (table_id, table_size) in &group.table_statistic {
+                let mut is_high_write_throughput = false;
+                let mut is_low_write_throughput = true;
+                if let Some(history) = table_write_throughput.get(table_id) {
+                    if history.len() >= HISTORY_TABLE_INFO_WINDOW_SIZE {
+                        let window_total_size = history.iter().sum::<u64>();
+                        is_high_write_throughput = history.iter().all(|throughput| {
+                            *throughput > self.env.opts.table_write_throughput_threshold
+                        });
+                        is_low_write_throughput = window_total_size
+                            < (HISTORY_TABLE_INFO_WINDOW_SIZE as u64)
+                                * self.env.opts.min_table_split_write_throughput;
                     }
                 }
-                if !increase {
+
+                if *table_size < group_size_limit && !is_high_write_throughput {
                     continue;
                 }
-                // do not split a large table and a small table because it would increase IOPS of
-                // the small table.
-                if parent_group_id != default_group_id && parent_group_id != mv_group_id {
-                    let rest_group_size = parent_group_size - table_size;
-                    if rest_group_size < table_size && rest_group_size < table_split_limit {
-                        continue;
-                    }
-                }
-
-                let increase_data_size = table_size.saturating_sub(*table_info.front().unwrap());
-                let increase_slow = increase_data_size < table_split_limit;
-
-                // if the size of this table increases too fast, we shall create one group for it.
-                if increase_slow
-                    && (parent_group_id == mv_group_id || parent_group_id == default_group_id)
-                {
-                    for group in &group_infos {
-                        // do not move to mv group or state group
-                        if !group.split_by_table || group.group_id == mv_group_id
-                            || group.group_id == default_group_id
-                            || group.group_id == parent_group_id
-                            // do not move state-table to a large group.
-                            || group.group_size + table_size > group_size_limit
-                            // do not move state-table from group A to group B if this operation would make group B become larger than A.
-                            || group.group_size + table_size > parent_group_size - table_size
-                        {
+                let parent_group_id = group.group_id;
+                let mut target_compact_group_id = None;
+                let mut allow_split_by_table = false;
+                if *table_size > group_size_limit && is_low_write_throughput {
+                    // do not split a large table and a small table because it would increase IOPS
+                    // of the small table.
+                    if parent_group_id != default_group_id && parent_group_id != mv_group_id {
+                        let rest_group_size = group.group_size - *table_size;
+                        if rest_group_size < *table_size && rest_group_size < table_split_limit {
                             continue;
                         }
-                        target_compact_group_id = Some(group.group_id);
+                    } else {
+                        for group in &group_infos {
+                            // do not move to mv group or state group
+                            if !group.split_by_table || group.group_id == mv_group_id
+                                || group.group_id == default_group_id
+                                || group.group_id == parent_group_id
+                                // do not move state-table to a large group.
+                                || group.group_size + *table_size > group_size_limit
+                                // do not move state-table from group A to group B if this operation would make group B become larger than A.
+                                || group.group_size + *table_size > group.group_size - table_size
+                            {
+                                continue;
+                            }
+                            target_compact_group_id = Some(group.group_id);
+                        }
+                        allow_split_by_table = true;
+                        partition_vnode_count = 1;
                     }
-                    allow_split_by_table = true;
-                    partition_vnode_count = 1;
                 }
-            }
-            let ret = self
-                .move_state_table_to_compaction_group(
-                    parent_group_id,
-                    &[table_id],
-                    target_compact_group_id,
-                    allow_split_by_table,
-                    partition_vnode_count,
-                )
-                .await;
-            match ret {
-                Ok(_) => {
-                    tracing::info!(
+
+                let ret = self
+                    .move_state_table_to_compaction_group(
+                        parent_group_id,
+                        &[*table_id],
+                        target_compact_group_id,
+                        allow_split_by_table,
+                        partition_vnode_count,
+                    )
+                    .await;
+                match ret {
+                    Ok(_) => {
+                        tracing::info!(
                         "move state table [{}] from group-{} to group-{:?} success, Allow split by table: {}",
                         table_id, parent_group_id, target_compact_group_id, allow_split_by_table
                     );
-                    return;
+                        return;
+                    }
+                    Err(e) => {
+                        tracing::info!(
+                            "failed to move state table [{}] from group-{} to group-{:?} because {:?}",
+                            table_id, parent_group_id, target_compact_group_id, e
+                        )
+                    }
                 }
-                Err(e) => tracing::info!(
-                    "failed to move state table [{}] from group-{} to group-{:?} because {:?}",
-                    table_id,
-                    parent_group_id,
-                    target_compact_group_id,
-                    e
-                ),
             }
         }
     }
diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs
index 57d4065d7483..1e5715079123 100644
--- a/src/meta/src/hummock/mock_hummock_meta_client.rs
+++ b/src/meta/src/hummock/mock_hummock_meta_client.rs
@@ -40,7 +40,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
 use crate::hummock::compaction::{
     default_level_selector, LevelSelector, SpaceReclaimCompactionSelector,
 };
-use crate::hummock::compaction_scheduler::{CompactionRequestChannel, CompactionRequestItem};
+use crate::hummock::compaction_scheduler::CompactionRequestChannel;
 use crate::hummock::HummockManager;
 use crate::storage::MemStore;
 
@@ -199,40 +199,30 @@ impl HummockMetaClient for MockHummockMetaClient {
         let hummock_manager_compact = self.hummock_manager.clone();
         let (task_tx, task_rx) = tokio::sync::mpsc::unbounded_channel();
         let handle = tokio::spawn(async move {
-            while let Some(item) = sched_rx.recv().await {
-                match item {
-                    CompactionRequestItem::Compact {
-                        compaction_group,
-                        task_type,
-                    } => {
-                        sched_channel.unschedule(compaction_group, task_type);
-
-                        let mut selector: Box<dyn LevelSelector> = match task_type {
-                            compact_task::TaskType::Dynamic => default_level_selector(),
-                            compact_task::TaskType::SpaceReclaim => {
-                                Box::<SpaceReclaimCompactionSelector>::default()
-                            }
-
-                            _ => panic!(
-                                "Error type when mock_hummock_meta_client subscribe_compact_tasks"
-                            ),
-                        };
-                        if let Some(task) = hummock_manager_compact
-                            .get_compact_task(compaction_group, &mut selector)
-                            .await
-                            .unwrap()
-                        {
-                            hummock_manager_compact
-                                .assign_compaction_task(&task, context_id)
-                                .await
-                                .unwrap();
-                            let resp = SubscribeCompactTasksResponse {
-                                task: Some(Task::CompactTask(task)),
-                            };
-                            let _ = task_tx.send(Ok(resp));
-                        }
+            while let Some((group, task_type)) = sched_rx.recv().await {
+                sched_channel.unschedule(group, task_type);
+
+                let mut selector: Box<dyn LevelSelector> = match task_type {
+                    compact_task::TaskType::Dynamic => default_level_selector(),
+                    compact_task::TaskType::SpaceReclaim => {
+                        Box::<SpaceReclaimCompactionSelector>::default()
                     }
-                    CompactionRequestItem::SplitLargeGroup(_) => (),
+
+                    _ => panic!("Error type when mock_hummock_meta_client subscribe_compact_tasks"),
+                };
+                if let Some(task) = hummock_manager_compact
+                    .get_compact_task(group, &mut selector)
+                    .await
+                    .unwrap()
+                {
+                    hummock_manager_compact
+                        .assign_compaction_task(&task, context_id)
+                        .await
+                        .unwrap();
+                    let resp = SubscribeCompactTasksResponse {
+                        task: Some(Task::CompactTask(task)),
+                    };
+                    let _ = task_tx.send(Ok(resp));
                 }
             }
         });
diff --git a/src/meta/src/hummock/mod.rs b/src/meta/src/hummock/mod.rs
index ace54548a586..285440b177f1 100644
--- a/src/meta/src/hummock/mod.rs
+++ b/src/meta/src/hummock/mod.rs
@@ -41,7 +41,7 @@ use tokio::task::JoinHandle;
 pub use vacuum::*;
 
 pub use crate::hummock::compaction_scheduler::{
-    CompactionRequestChannelRef, CompactionRequestItem, CompactionSchedulerRef,
+    CompactionRequestChannelRef, CompactionSchedulerRef,
 };
 use crate::storage::MetaStore;
 use crate::MetaOpts;
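For reference, the split-group bookkeeping that this diff moves into `HummockManager` reduces to a per-table sliding window of write-throughput samples (`HISTORY_TABLE_INFO_WINDOW_SIZE` of them) plus a high/low classification over a full window. The sketch below mirrors that logic in isolation; it is not the manager's code. The two threshold constants are hypothetical stand-ins for the configurable `env.opts.table_write_throughput_threshold` and `env.opts.min_table_split_write_throughput`, and the plain `(key_size, value_size)` map stands in for `PbTableStatsMap`.

```rust
use std::collections::{HashMap, VecDeque};

const HISTORY_TABLE_INFO_WINDOW_SIZE: usize = 16;

// Hypothetical stand-ins for the configurable thresholds in env.opts.
const TABLE_WRITE_THROUGHPUT_THRESHOLD: u64 = 128 << 20; // 128 MiB per sample
const MIN_TABLE_SPLIT_WRITE_THROUGHPUT: u64 = 4 << 20; // 4 MiB per sample

/// Per-table sliding window of recent write-throughput samples.
#[derive(Default)]
struct ThroughputHistory {
    inner: HashMap<u32, VecDeque<u64>>,
}

impl ThroughputHistory {
    /// Mirrors collect_table_write_throughput: push the newest sample and
    /// drop the oldest once the window is full.
    fn collect(&mut self, table_stats: &HashMap<u32, (u64, u64)>) {
        for (table_id, (key_size, value_size)) in table_stats {
            let throughput = key_size + value_size;
            let entry = self.inner.entry(*table_id).or_default();
            entry.push_back(throughput);
            if entry.len() > HISTORY_TABLE_INFO_WINDOW_SIZE {
                entry.pop_front();
            }
        }
    }

    /// Mirrors the classification in on_handle_check_split_multi_group:
    /// "high" only if every sample in a full window exceeds the threshold,
    /// "low" if the window total stays under window_size * min_throughput.
    fn classify(&self, table_id: u32) -> (bool, bool) {
        let mut is_high = false;
        let mut is_low = true;
        if let Some(history) = self.inner.get(&table_id) {
            if history.len() >= HISTORY_TABLE_INFO_WINDOW_SIZE {
                let window_total: u64 = history.iter().sum();
                is_high = history.iter().all(|t| *t > TABLE_WRITE_THROUGHPUT_THRESHOLD);
                is_low = window_total
                    < (HISTORY_TABLE_INFO_WINDOW_SIZE as u64) * MIN_TABLE_SPLIT_WRITE_THROUGHPUT;
            }
        }
        (is_high, is_low)
    }
}

fn main() {
    let mut history = ThroughputHistory::default();
    // Feed 16 epochs in which table 1 writes 256 MiB and table 2 writes 1 MiB.
    for _ in 0..HISTORY_TABLE_INFO_WINDOW_SIZE {
        history.collect(&HashMap::from([
            (1, (128 << 20, 128 << 20)),
            (2, (1 << 19, 1 << 19)),
        ]));
    }
    assert_eq!(history.classify(1), (true, false)); // candidate for its own group
    assert_eq!(history.classify(2), (false, true)); // candidate for moving into a shared group
    println!("classification behaves as expected");
}
```

In the diff itself, the "high" flag lets a large, hot table escape the group-size check, while the "low" flag gates whether a table that already exceeds `split_group_size_limit` may be moved to another existing group instead of being split out on its own.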