Skip to content

Commit

Permalink
feat(metrics): add memory usage metrics for more executor (#10351)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su authored Jun 15, 2023
1 parent ea7f95b commit 56f4011
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 78 deletions.
65 changes: 27 additions & 38 deletions src/stream/src/cache/managed_lru.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,22 @@ pub struct ManagedLruCache<K, V, S = DefaultHasher, A: Clone + Allocator = Globa
/// The heap size of keys/values
kv_heap_size: usize,
/// The metrics of memory usage
memory_usage_metrics: Option<IntGauge>,
memory_usage_metrics: IntGauge,
// Metrics info
metrics_info: Option<MetricsInfo>,
metrics_info: MetricsInfo,
/// The size reported last time
last_reported_size_bytes: usize,
}

impl<K, V, S, A: Clone + Allocator> Drop for ManagedLruCache<K, V, S, A> {
fn drop(&mut self) {
if let Some(metrics) = &self.memory_usage_metrics {
metrics.set(0.into());
}
if let Some(info) = &self.metrics_info {
info.metrics
.stream_memory_usage
.remove_label_values(&[&info.table_id, &info.actor_id, &info.desc])
.unwrap();
}
let info = &self.metrics_info;
self.memory_usage_metrics.set(0.into());

info.metrics
.stream_memory_usage
.remove_label_values(&[&info.table_id, &info.actor_id, &info.desc])
.unwrap();
}
}

Expand All @@ -66,15 +64,16 @@ impl<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher, A: Clone + Al
pub fn new_inner(
inner: LruCache<K, V, S, A>,
watermark_epoch: Arc<AtomicU64>,
metrics_info: Option<MetricsInfo>,
metrics_info: MetricsInfo,
) -> Self {
let memory_usage_metrics = metrics_info.as_ref().map(|info| {
info.metrics.stream_memory_usage.with_label_values(&[
&info.table_id,
&info.actor_id,
&info.desc,
])
});
let memory_usage_metrics = metrics_info
.metrics
.stream_memory_usage
.with_label_values(&[
&metrics_info.table_id,
&metrics_info.actor_id,
&metrics_info.desc,
]);

Self {
inner,
Expand Down Expand Up @@ -219,9 +218,7 @@ impl<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher, A: Clone + Al
if self.kv_heap_size.abs_diff(self.last_reported_size_bytes)
> REPORT_SIZE_EVERY_N_KB_CHANGE << 10
{
if let Some(metrics) = self.memory_usage_metrics.as_ref() {
metrics.set(self.kv_heap_size as _);
}
self.memory_usage_metrics.set(self.kv_heap_size as _);
self.last_reported_size_bytes = self.kv_heap_size;
true
} else {
Expand All @@ -232,15 +229,9 @@ impl<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher, A: Clone + Al

pub fn new_unbounded<K: Hash + Eq + EstimateSize, V: EstimateSize>(
watermark_epoch: Arc<AtomicU64>,
) -> ManagedLruCache<K, V> {
ManagedLruCache::new_inner(LruCache::unbounded(), watermark_epoch, None)
}

pub fn new_unbounded_with_metrics<K: Hash + Eq + EstimateSize, V: EstimateSize>(
watermark_epoch: Arc<AtomicU64>,
metrics_info: MetricsInfo,
) -> ManagedLruCache<K, V> {
ManagedLruCache::new_inner(LruCache::unbounded(), watermark_epoch, Some(metrics_info))
ManagedLruCache::new_inner(LruCache::unbounded(), watermark_epoch, metrics_info)
}

pub fn new_with_hasher_in<
Expand All @@ -257,7 +248,7 @@ pub fn new_with_hasher_in<
ManagedLruCache::new_inner(
LruCache::unbounded_with_hasher_in(hasher, alloc),
watermark_epoch,
Some(metrics_info),
metrics_info,
)
}

Expand All @@ -269,7 +260,7 @@ pub fn new_with_hasher<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHas
ManagedLruCache::new_inner(
LruCache::unbounded_with_hasher(hasher),
watermark_epoch,
Some(metrics_info),
metrics_info,
)
}

Expand All @@ -280,15 +271,15 @@ pub struct MutGuard<'a, V: EstimateSize> {
// The total size of a collection
total_size: &'a mut usize,
last_reported_size_bytes: &'a mut usize,
memory_usage_metrics: &'a mut Option<IntGauge>,
memory_usage_metrics: &'a mut IntGauge,
}

impl<'a, V: EstimateSize> MutGuard<'a, V> {
pub fn new(
inner: &'a mut V,
total_size: &'a mut usize,
last_reported_size_bytes: &'a mut usize,
memory_usage_metrics: &'a mut Option<IntGauge>,
memory_usage_metrics: &'a mut IntGauge,
) -> Self {
let original_val_size = inner.estimated_size();
Self {
Expand All @@ -304,9 +295,7 @@ impl<'a, V: EstimateSize> MutGuard<'a, V> {
if self.total_size.abs_diff(*self.last_reported_size_bytes)
> REPORT_SIZE_EVERY_N_KB_CHANGE << 10
{
if let Some(metrics) = self.memory_usage_metrics.as_ref() {
metrics.set(*self.total_size as _);
}
self.memory_usage_metrics.set(*self.total_size as _);
*self.last_reported_size_bytes = *self.total_size;
true
} else {
Expand Down Expand Up @@ -346,15 +335,15 @@ pub struct UnsafeMutGuard<V: EstimateSize> {
// The total size of a collection
total_size: NonNull<usize>,
last_reported_size_bytes: NonNull<usize>,
memory_usage_metrics: NonNull<Option<IntGauge>>,
memory_usage_metrics: NonNull<IntGauge>,
}

impl<V: EstimateSize> UnsafeMutGuard<V> {
pub fn new(
inner: &mut V,
total_size: &mut usize,
last_reported_size_bytes: &mut usize,
memory_usage_metrics: &mut Option<IntGauge>,
memory_usage_metrics: &mut IntGauge,
) -> Self {
let original_val_size = inner.estimated_size();
Self {
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/aggregation/distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_storage::StateStore;

use super::{AggCall, GroupKey};
use crate::cache::{new_unbounded_with_metrics, ManagedLruCache};
use crate::cache::{new_unbounded, ManagedLruCache};
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTable;
use crate::executor::monitor::StreamingMetrics;
Expand All @@ -45,7 +45,7 @@ struct ColumnDeduplicater<S: StateStore> {
impl<S: StateStore> ColumnDeduplicater<S> {
fn new(watermark_epoch: &Arc<AtomicU64>, metrics_info: MetricsInfo) -> Self {
Self {
cache: new_unbounded_with_metrics(watermark_epoch.clone(), metrics_info.clone()),
cache: new_unbounded(watermark_epoch.clone(), metrics_info.clone()),
metrics_info,
_phantom: PhantomData,
}
Expand Down
10 changes: 9 additions & 1 deletion src/stream/src/executor/dedup/append_only_dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use futures::{stream, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
Expand All @@ -22,8 +24,10 @@ use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_storage::StateStore;

use super::cache::DedupCache;
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTable;
use crate::executor::error::StreamExecutorError;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
expect_first_barrier, ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor, Message,
PkIndices, PkIndicesRef, StreamExecutorResult,
Expand Down Expand Up @@ -51,12 +55,15 @@ impl<S: StateStore> AppendOnlyDedupExecutor<S> {
executor_id: u64,
ctx: ActorContextRef,
watermark_epoch: AtomicU64Ref,
metrics: Arc<StreamingMetrics>,
) -> Self {
let schema = input.schema().clone();
let metrics_info =
MetricsInfo::new(metrics, state_table.table_id(), ctx.id, "AppendOnly Dedup");
Self {
input: Some(input),
state_table,
cache: DedupCache::new(watermark_epoch),
cache: DedupCache::new(watermark_epoch, metrics_info),
pk_indices,
identity: format!("AppendOnlyDedupExecutor {:X}", executor_id),
schema,
Expand Down Expand Up @@ -259,6 +266,7 @@ mod tests {
1,
ActorContext::create(123),
Arc::new(AtomicU64::new(0)),
Arc::new(StreamingMetrics::unused()),
))
.execute();

Expand Down
8 changes: 5 additions & 3 deletions src/stream/src/executor/dedup/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::hash::Hash;
use risingwave_common::estimate_size::EstimateSize;

use crate::cache::{new_unbounded, ManagedLruCache};
use crate::common::metrics::MetricsInfo;
use crate::task::AtomicU64Ref;

/// [`DedupCache`] is used for key deduplication. Currently, the cache behaves like a set that only
Expand All @@ -26,8 +27,8 @@ pub struct DedupCache<K: Hash + Eq + EstimateSize> {
}

impl<K: Hash + Eq + EstimateSize> DedupCache<K> {
pub fn new(watermark_epoch: AtomicU64Ref) -> Self {
let cache = new_unbounded(watermark_epoch);
pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
let cache = new_unbounded(watermark_epoch, metrics_info);
Self { inner: cache }
}

Expand Down Expand Up @@ -69,10 +70,11 @@ mod tests {
use std::sync::Arc;

use super::DedupCache;
use crate::common::metrics::MetricsInfo;

#[test]
fn test_dedup_cache() {
let mut cache = DedupCache::new(Arc::new(AtomicU64::new(10000)));
let mut cache = DedupCache::new(Arc::new(AtomicU64::new(10000)), MetricsInfo::for_test());

cache.insert(10);
assert!(cache.contains(&10));
Expand Down
5 changes: 3 additions & 2 deletions src/stream/src/executor/lookup/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use risingwave_common::estimate_size::{EstimateSize, KvSize, VecWithKvSize};
use risingwave_common::row::{OwnedRow, Row, RowExt};

use crate::cache::{new_unbounded, ManagedLruCache};
use crate::common::metrics::MetricsInfo;
use crate::task::AtomicU64Ref;

/// A cache for lookup's arrangement side.
Expand Down Expand Up @@ -73,8 +74,8 @@ impl LookupCache {
self.data.clear();
}

pub fn new(watermark_epoch: AtomicU64Ref) -> Self {
let cache = new_unbounded(watermark_epoch);
pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
let cache = new_unbounded(watermark_epoch, metrics_info);
Self { data: cache }
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/stream/src/executor/lookup/impl_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use risingwave_storage::StateStore;

use super::sides::{stream_lookup_arrange_prev_epoch, stream_lookup_arrange_this_epoch};
use crate::cache::cache_may_stale;
use crate::common::metrics::MetricsInfo;
use crate::common::StreamChunkBuilder;
use crate::executor::error::{StreamExecutorError, StreamExecutorResult};
use crate::executor::lookup::cache::LookupCache;
Expand Down Expand Up @@ -206,6 +207,13 @@ impl<S: StateStore> LookupExecutor<S> {
"mismatched output schema"
);

let metrics_info = MetricsInfo::new(
ctx.streaming_metrics.clone(),
storage_table.table_id().table_id(),
ctx.id,
"Lookup",
);

Self {
ctx,
chunk_data_types,
Expand All @@ -230,7 +238,7 @@ impl<S: StateStore> LookupExecutor<S> {
},
column_mapping,
key_indices_mapping,
lookup_cache: LookupCache::new(watermark_epoch),
lookup_cache: LookupCache::new(watermark_epoch, metrics_info),
chunk_size,
}
}
Expand Down
42 changes: 16 additions & 26 deletions src/stream/src/executor/mview/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use risingwave_storage::mem_table::KeyOp;
use risingwave_storage::StateStore;

use crate::cache::{new_unbounded, ManagedLruCache};
use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTableInner;
use crate::executor::error::StreamExecutorError;
use crate::executor::monitor::StreamingMetrics;
Expand Down Expand Up @@ -97,8 +98,9 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
StateTableInner::from_table_catalog(table_catalog, store, vnodes).await
};

let actor_id = actor_context.id;
let table_id = table_catalog.id;
let metrics_info =
MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

Self {
input,
state_table,
Expand All @@ -109,7 +111,7 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
pk_indices: arrange_columns,
identity: format!("MaterializeExecutor {:X}", executor_id),
},
materialize_cache: MaterializeCache::new(watermark_epoch, metrics, actor_id, table_id),
materialize_cache: MaterializeCache::new(watermark_epoch, metrics_info),
conflict_behavior,
}
}
Expand Down Expand Up @@ -235,12 +237,7 @@ impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
pk_indices: arrange_columns,
identity: format!("MaterializeExecutor {:X}", executor_id),
},
materialize_cache: MaterializeCache::new(
watermark_epoch,
Arc::new(StreamingMetrics::unused()),
0,
0,
),
materialize_cache: MaterializeCache::new(watermark_epoch, MetricsInfo::for_test()),
conflict_behavior,
}
}
Expand Down Expand Up @@ -436,10 +433,8 @@ impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S
/// A cache for materialize executors.
pub struct MaterializeCache<SD> {
data: ManagedLruCache<Vec<u8>, CacheValue>,
metrics_info: MetricsInfo,
_serde: PhantomData<SD>,
metrics: Arc<StreamingMetrics>,
actor_id: String,
table_id: String,
}

#[derive(EnumAsInner, EstimateSize)]
Expand All @@ -451,19 +446,12 @@ pub enum CacheValue {
type EmptyValue = ();

impl<SD: ValueRowSerde> MaterializeCache<SD> {
pub fn new(
watermark_epoch: AtomicU64Ref,
metrics: Arc<StreamingMetrics>,
actor_id: u32,
table_id: u32,
) -> Self {
let cache = new_unbounded(watermark_epoch);
pub fn new(watermark_epoch: AtomicU64Ref, metrics_info: MetricsInfo) -> Self {
let cache = new_unbounded(watermark_epoch, metrics_info.clone());
Self {
data: cache,
metrics_info,
_serde: PhantomData,
metrics,
actor_id: actor_id.to_string(),
table_id: table_id.to_string(),
}
}

Expand Down Expand Up @@ -604,14 +592,16 @@ impl<SD: ValueRowSerde> MaterializeCache<SD> {
) -> StreamExecutorResult<()> {
let mut futures = vec![];
for key in keys {
self.metrics
self.metrics_info
.metrics
.materialize_cache_total_count
.with_label_values(&[&self.table_id, &self.actor_id])
.with_label_values(&[&self.metrics_info.table_id, &self.metrics_info.actor_id])
.inc();
if self.data.contains(key) {
self.metrics
self.metrics_info
.metrics
.materialize_cache_hit_count
.with_label_values(&[&self.table_id, &self.actor_id])
.with_label_values(&[&self.metrics_info.table_id, &self.metrics_info.actor_id])
.inc();
continue;
}
Expand Down
Loading

0 comments on commit 56f4011

Please sign in to comment.