From 56f40111fd399f2a0fc7a6356d24026b5141565a Mon Sep 17 00:00:00 2001 From: Yuhao Su <31772373+yuhao-su@users.noreply.github.com> Date: Thu, 15 Jun 2023 16:27:28 +0800 Subject: [PATCH] feat(metrics): add memory usage metrics for more executor (#10351) --- src/stream/src/cache/managed_lru.rs | 65 ++++++++----------- .../src/executor/aggregation/distinct.rs | 4 +- .../src/executor/dedup/append_only_dedup.rs | 10 ++- src/stream/src/executor/dedup/cache.rs | 8 ++- src/stream/src/executor/lookup/cache.rs | 5 +- src/stream/src/executor/lookup/impl_.rs | 10 ++- src/stream/src/executor/mview/materialize.rs | 42 +++++------- src/stream/src/executor/over_window/eowc.rs | 10 ++- src/stream/src/executor/top_n/group_top_n.rs | 14 +++- .../executor/top_n/group_top_n_appendonly.rs | 10 ++- .../src/from_proto/append_only_dedup.rs | 1 + 11 files changed, 101 insertions(+), 78 deletions(-) diff --git a/src/stream/src/cache/managed_lru.rs b/src/stream/src/cache/managed_lru.rs index d36f34bda2a4..2ae702849a5a 100644 --- a/src/stream/src/cache/managed_lru.rs +++ b/src/stream/src/cache/managed_lru.rs @@ -39,24 +39,22 @@ pub struct ManagedLruCache, + memory_usage_metrics: IntGauge, // Metrics info - metrics_info: Option, + metrics_info: MetricsInfo, /// The size reported last time last_reported_size_bytes: usize, } impl Drop for ManagedLruCache { fn drop(&mut self) { - if let Some(metrics) = &self.memory_usage_metrics { - metrics.set(0.into()); - } - if let Some(info) = &self.metrics_info { - info.metrics - .stream_memory_usage - .remove_label_values(&[&info.table_id, &info.actor_id, &info.desc]) - .unwrap(); - } + let info = &self.metrics_info; + self.memory_usage_metrics.set(0.into()); + + info.metrics + .stream_memory_usage + .remove_label_values(&[&info.table_id, &info.actor_id, &info.desc]) + .unwrap(); } } @@ -66,15 +64,16 @@ impl, watermark_epoch: Arc, - metrics_info: Option, + metrics_info: MetricsInfo, ) -> Self { - let memory_usage_metrics = metrics_info.as_ref().map(|info| { - info.metrics.stream_memory_usage.with_label_values(&[ - &info.table_id, - &info.actor_id, - &info.desc, - ]) - }); + let memory_usage_metrics = metrics_info + .metrics + .stream_memory_usage + .with_label_values(&[ + &metrics_info.table_id, + &metrics_info.actor_id, + &metrics_info.desc, + ]); Self { inner, @@ -219,9 +218,7 @@ impl REPORT_SIZE_EVERY_N_KB_CHANGE << 10 { - if let Some(metrics) = self.memory_usage_metrics.as_ref() { - metrics.set(self.kv_heap_size as _); - } + self.memory_usage_metrics.set(self.kv_heap_size as _); self.last_reported_size_bytes = self.kv_heap_size; true } else { @@ -232,15 +229,9 @@ impl( watermark_epoch: Arc, -) -> ManagedLruCache { - ManagedLruCache::new_inner(LruCache::unbounded(), watermark_epoch, None) -} - -pub fn new_unbounded_with_metrics( - watermark_epoch: Arc, metrics_info: MetricsInfo, ) -> ManagedLruCache { - ManagedLruCache::new_inner(LruCache::unbounded(), watermark_epoch, Some(metrics_info)) + ManagedLruCache::new_inner(LruCache::unbounded(), watermark_epoch, metrics_info) } pub fn new_with_hasher_in< @@ -257,7 +248,7 @@ pub fn new_with_hasher_in< ManagedLruCache::new_inner( LruCache::unbounded_with_hasher_in(hasher, alloc), watermark_epoch, - Some(metrics_info), + metrics_info, ) } @@ -269,7 +260,7 @@ pub fn new_with_hasher { // The total size of a collection total_size: &'a mut usize, last_reported_size_bytes: &'a mut usize, - memory_usage_metrics: &'a mut Option, + memory_usage_metrics: &'a mut IntGauge, } impl<'a, V: EstimateSize> MutGuard<'a, V> { @@ -288,7 +279,7 @@ impl<'a, V: EstimateSize> MutGuard<'a, V> { inner: &'a mut V, total_size: &'a mut usize, last_reported_size_bytes: &'a mut usize, - memory_usage_metrics: &'a mut Option, + memory_usage_metrics: &'a mut IntGauge, ) -> Self { let original_val_size = inner.estimated_size(); Self { @@ -304,9 +295,7 @@ impl<'a, V: EstimateSize> MutGuard<'a, V> { if self.total_size.abs_diff(*self.last_reported_size_bytes) > REPORT_SIZE_EVERY_N_KB_CHANGE << 10 { - if let Some(metrics) = self.memory_usage_metrics.as_ref() { - metrics.set(*self.total_size as _); - } + self.memory_usage_metrics.set(*self.total_size as _); *self.last_reported_size_bytes = *self.total_size; true } else { @@ -346,7 +335,7 @@ pub struct UnsafeMutGuard { // The total size of a collection total_size: NonNull, last_reported_size_bytes: NonNull, - memory_usage_metrics: NonNull>, + memory_usage_metrics: NonNull, } impl UnsafeMutGuard { @@ -354,7 +343,7 @@ impl UnsafeMutGuard { inner: &mut V, total_size: &mut usize, last_reported_size_bytes: &mut usize, - memory_usage_metrics: &mut Option, + memory_usage_metrics: &mut IntGauge, ) -> Self { let original_val_size = inner.estimated_size(); Self { diff --git a/src/stream/src/executor/aggregation/distinct.rs b/src/stream/src/executor/aggregation/distinct.rs index 88353fcc29a8..13a32dedfb5d 100644 --- a/src/stream/src/executor/aggregation/distinct.rs +++ b/src/stream/src/executor/aggregation/distinct.rs @@ -26,7 +26,7 @@ use risingwave_common::util::iter_util::ZipEqFast; use risingwave_storage::StateStore; use super::{AggCall, GroupKey}; -use crate::cache::{new_unbounded_with_metrics, ManagedLruCache}; +use crate::cache::{new_unbounded, ManagedLruCache}; use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::StateTable; use crate::executor::monitor::StreamingMetrics; @@ -45,7 +45,7 @@ struct ColumnDeduplicater { impl ColumnDeduplicater { fn new(watermark_epoch: &Arc, metrics_info: MetricsInfo) -> Self { Self { - cache: new_unbounded_with_metrics(watermark_epoch.clone(), metrics_info.clone()), + cache: new_unbounded(watermark_epoch.clone(), metrics_info.clone()), metrics_info, _phantom: PhantomData, } diff --git a/src/stream/src/executor/dedup/append_only_dedup.rs b/src/stream/src/executor/dedup/append_only_dedup.rs index 255e6385e8ab..890b1e4ac997 100644 --- a/src/stream/src/executor/dedup/append_only_dedup.rs +++ b/src/stream/src/executor/dedup/append_only_dedup.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use futures::{stream, StreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; @@ -22,8 +24,10 @@ use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_storage::StateStore; use super::cache::DedupCache; +use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::StateTable; use crate::executor::error::StreamExecutorError; +use crate::executor::monitor::StreamingMetrics; use crate::executor::{ expect_first_barrier, ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor, Message, PkIndices, PkIndicesRef, StreamExecutorResult, @@ -51,12 +55,15 @@ impl AppendOnlyDedupExecutor { executor_id: u64, ctx: ActorContextRef, watermark_epoch: AtomicU64Ref, + metrics: Arc, ) -> Self { let schema = input.schema().clone(); + let metrics_info = + MetricsInfo::new(metrics, state_table.table_id(), ctx.id, "AppendOnly Dedup"); Self { input: Some(input), state_table, - cache: DedupCache::new(watermark_epoch), + cache: DedupCache::new(watermark_epoch, metrics_info), pk_indices, identity: format!("AppendOnlyDedupExecutor {:X}", executor_id), schema, @@ -259,6 +266,7 @@ mod tests { 1, ActorContext::create(123), Arc::new(AtomicU64::new(0)), + Arc::new(StreamingMetrics::unused()), )) .execute(); diff --git a/src/stream/src/executor/dedup/cache.rs b/src/stream/src/executor/dedup/cache.rs index bf74f690d304..583af55c72c7 100644 --- a/src/stream/src/executor/dedup/cache.rs +++ b/src/stream/src/executor/dedup/cache.rs @@ -17,6 +17,7 @@ use std::hash::Hash; use risingwave_common::estimate_size::EstimateSize; use crate::cache::{new_unbounded, ManagedLruCache}; +use crate::common::metrics::MetricsInfo; use crate::task::AtomicU64Ref; /// [`DedupCache`] is used for key deduplication. Currently, the cache behaves like a set that only @@ -26,8 +27,8 @@ pub struct DedupCache { } impl DedupCache { - pub fn new(watermark_epoch: AtomicU64Ref) -> Self { - let cache = new_unbounded(watermark_epoch); + pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self { + let cache = new_unbounded(watermark_epoch, metrics_info); Self { inner: cache } } @@ -69,10 +70,11 @@ mod tests { use std::sync::Arc; use super::DedupCache; + use crate::common::metrics::MetricsInfo; #[test] fn test_dedup_cache() { - let mut cache = DedupCache::new(Arc::new(AtomicU64::new(10000))); + let mut cache = DedupCache::new(Arc::new(AtomicU64::new(10000)), MetricsInfo::for_test()); cache.insert(10); assert!(cache.contains(&10)); diff --git a/src/stream/src/executor/lookup/cache.rs b/src/stream/src/executor/lookup/cache.rs index 77182a8e69df..48d01f2755a2 100644 --- a/src/stream/src/executor/lookup/cache.rs +++ b/src/stream/src/executor/lookup/cache.rs @@ -19,6 +19,7 @@ use risingwave_common::estimate_size::{EstimateSize, KvSize, VecWithKvSize}; use risingwave_common::row::{OwnedRow, Row, RowExt}; use crate::cache::{new_unbounded, ManagedLruCache}; +use crate::common::metrics::MetricsInfo; use crate::task::AtomicU64Ref; /// A cache for lookup's arrangement side. @@ -73,8 +74,8 @@ impl LookupCache { self.data.clear(); } - pub fn new(watermark_epoch: AtomicU64Ref) -> Self { - let cache = new_unbounded(watermark_epoch); + pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self { + let cache = new_unbounded(watermark_epoch, metrics_info); Self { data: cache } } } diff --git a/src/stream/src/executor/lookup/impl_.rs b/src/stream/src/executor/lookup/impl_.rs index 24a201c37f2e..866aca20e975 100644 --- a/src/stream/src/executor/lookup/impl_.rs +++ b/src/stream/src/executor/lookup/impl_.rs @@ -30,6 +30,7 @@ use risingwave_storage::StateStore; use super::sides::{stream_lookup_arrange_prev_epoch, stream_lookup_arrange_this_epoch}; use crate::cache::cache_may_stale; +use crate::common::metrics::MetricsInfo; use crate::common::StreamChunkBuilder; use crate::executor::error::{StreamExecutorError, StreamExecutorResult}; use crate::executor::lookup::cache::LookupCache; @@ -206,6 +207,13 @@ impl LookupExecutor { "mismatched output schema" ); + let metrics_info = MetricsInfo::new( + ctx.streaming_metrics.clone(), + storage_table.table_id().table_id(), + ctx.id, + "Lookup", + ); + Self { ctx, chunk_data_types, @@ -230,7 +238,7 @@ impl LookupExecutor { }, column_mapping, key_indices_mapping, - lookup_cache: LookupCache::new(watermark_epoch), + lookup_cache: LookupCache::new(watermark_epoch, metrics_info), chunk_size, } } diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index a05a218350ef..cc59fc5e8bfa 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -38,6 +38,7 @@ use risingwave_storage::mem_table::KeyOp; use risingwave_storage::StateStore; use crate::cache::{new_unbounded, ManagedLruCache}; +use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::StateTableInner; use crate::executor::error::StreamExecutorError; use crate::executor::monitor::StreamingMetrics; @@ -97,8 +98,9 @@ impl MaterializeExecutor { StateTableInner::from_table_catalog(table_catalog, store, vnodes).await }; - let actor_id = actor_context.id; - let table_id = table_catalog.id; + let metrics_info = + MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize"); + Self { input, state_table, @@ -109,7 +111,7 @@ impl MaterializeExecutor { pk_indices: arrange_columns, identity: format!("MaterializeExecutor {:X}", executor_id), }, - materialize_cache: MaterializeCache::new(watermark_epoch, metrics, actor_id, table_id), + materialize_cache: MaterializeCache::new(watermark_epoch, metrics_info), conflict_behavior, } } @@ -235,12 +237,7 @@ impl MaterializeExecutor { pk_indices: arrange_columns, identity: format!("MaterializeExecutor {:X}", executor_id), }, - materialize_cache: MaterializeCache::new( - watermark_epoch, - Arc::new(StreamingMetrics::unused()), - 0, - 0, - ), + materialize_cache: MaterializeCache::new(watermark_epoch, MetricsInfo::for_test()), conflict_behavior, } } @@ -436,10 +433,8 @@ impl std::fmt::Debug for MaterializeExecutor { data: ManagedLruCache, CacheValue>, + metrics_info: MetricsInfo, _serde: PhantomData, - metrics: Arc, - actor_id: String, - table_id: String, } #[derive(EnumAsInner, EstimateSize)] @@ -451,19 +446,12 @@ pub enum CacheValue { type EmptyValue = (); impl MaterializeCache { - pub fn new( - watermark_epoch: AtomicU64Ref, - metrics: Arc, - actor_id: u32, - table_id: u32, - ) -> Self { - let cache = new_unbounded(watermark_epoch); + pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self { + let cache = new_unbounded(watermark_epoch, metrics_info.clone()); Self { data: cache, + metrics_info, _serde: PhantomData, - metrics, - actor_id: actor_id.to_string(), - table_id: table_id.to_string(), } } @@ -604,14 +592,16 @@ impl MaterializeCache { ) -> StreamExecutorResult<()> { let mut futures = vec![]; for key in keys { - self.metrics + self.metrics_info + .metrics .materialize_cache_total_count - .with_label_values(&[&self.table_id, &self.actor_id]) + .with_label_values(&[&self.metrics_info.table_id, &self.metrics_info.actor_id]) .inc(); if self.data.contains(key) { - self.metrics + self.metrics_info + .metrics .materialize_cache_hit_count - .with_label_values(&[&self.table_id, &self.actor_id]) + .with_label_values(&[&self.metrics_info.table_id, &self.metrics_info.actor_id]) .inc(); continue; } diff --git a/src/stream/src/executor/over_window/eowc.rs b/src/stream/src/executor/over_window/eowc.rs index 87cf3cbbde99..8b9028ab25b5 100644 --- a/src/stream/src/executor/over_window/eowc.rs +++ b/src/stream/src/executor/over_window/eowc.rs @@ -33,6 +33,7 @@ use risingwave_storage::StateStore; use super::state::{create_window_state, EstimatedVecDeque, WindowState}; use crate::cache::{new_unbounded, ManagedLruCache}; +use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::StateTable; use crate::executor::over_window::state::{StateEvictHint, StateKey}; use crate::executor::{ @@ -413,8 +414,15 @@ impl EowcOverWindowExecutor { inner: mut this, } = self; + let metrics_info = MetricsInfo::new( + this.actor_ctx.streaming_metrics.clone(), + this.state_table.table_id(), + this.actor_ctx.id, + "EowcOverWindow", + ); + let mut vars = ExecutionVars { - partitions: new_unbounded(this.watermark_epoch.clone()), + partitions: new_unbounded(this.watermark_epoch.clone(), metrics_info), _phantom: PhantomData::, }; diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs index 4d65a68c07fe..421b1141843a 100644 --- a/src/stream/src/executor/top_n/group_top_n.rs +++ b/src/stream/src/executor/top_n/group_top_n.rs @@ -29,6 +29,7 @@ use super::top_n_cache::TopNCacheTrait; use super::utils::*; use super::{ManagedTopNState, TopNCache}; use crate::cache::{new_unbounded, ManagedLruCache}; +use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::StateTable; use crate::error::StreamResult; use crate::executor::error::StreamExecutorResult; @@ -113,6 +114,13 @@ impl InnerGroupTopNExecutor::new(state_table, cache_key_serde.clone()); @@ -127,7 +135,7 @@ impl InnerGroupTopNExecutor { } impl GroupTopNCache { - pub fn new(watermark_epoch: AtomicU64Ref) -> Self { - let cache = new_unbounded(watermark_epoch); + pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self { + let cache = new_unbounded(watermark_epoch, metrics_info); Self { data: cache } } } diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs index f0460f459b7c..ceb4a3bca4d4 100644 --- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs @@ -42,6 +42,7 @@ use super::group_top_n::GroupTopNCache; use super::top_n_cache::AppendOnlyTopNCacheTrait; use super::utils::*; use super::{ManagedTopNState, TopNCache}; +use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::StateTable; use crate::error::StreamResult; use crate::executor::error::StreamExecutorResult; @@ -133,6 +134,13 @@ impl pk_indices, schema, .. } = input_info; + let metrics_info = MetricsInfo::new( + ctx.streaming_metrics.clone(), + state_table.table_id(), + ctx.id, + "GroupTopN", + ); + let cache_key_serde = create_cache_key_serde(&storage_key, &schema, &order_by, &group_by); let managed_state = ManagedTopNState::::new(state_table, cache_key_serde.clone()); @@ -147,7 +155,7 @@ impl managed_state, storage_key_indices: storage_key.into_iter().map(|op| op.column_index).collect(), group_by, - caches: GroupTopNCache::new(watermark_epoch), + caches: GroupTopNCache::new(watermark_epoch, metrics_info), cache_key_serde, ctx, }) diff --git a/src/stream/src/from_proto/append_only_dedup.rs b/src/stream/src/from_proto/append_only_dedup.rs index 2559ed018c37..f8c78532dc66 100644 --- a/src/stream/src/from_proto/append_only_dedup.rs +++ b/src/stream/src/from_proto/append_only_dedup.rs @@ -52,6 +52,7 @@ impl ExecutorBuilder for AppendOnlyDedupExecutorBuilder { params.executor_id, params.actor_context, stream.get_watermark_epoch(), + stream.streaming_metrics.clone(), ))) } }