Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding two level hashing in metrics hashmap #1564

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
98db4f3
initial commit
lalitb Feb 21, 2024
1c440e0
handle empty resource
lalitb Feb 21, 2024
75c853e
fix lint
lalitb Feb 21, 2024
fd858f6
use static vector at first level hash
lalitb Feb 23, 2024
1f05dcf
add method to calculate data point size
lalitb Feb 24, 2024
d191cf7
lint error
lalitb Feb 24, 2024
fd94caa
add hashbrown and ahash as optional dependency
lalitb Feb 24, 2024
da817b5
Merge branch 'main' into two-level-hash
lalitb Feb 24, 2024
498f088
Merge branch 'main' into two-level-hash
lalitb Feb 24, 2024
2930fe1
add const for bucket count
lalitb Feb 26, 2024
ebe4a38
Merge branch 'main' into two-level-hash
lalitb Feb 28, 2024
d8cbc4c
remove panic while converting vector to fixed array
lalitb Feb 28, 2024
cad1391
Merge branch 'main' into two-level-hash
lalitb Feb 29, 2024
61ae262
remove special handling of empty attributes
lalitb Feb 29, 2024
2b8549f
Merge branch 'main' into two-level-hash
lalitb Feb 29, 2024
44efee7
graceful unwrap
lalitb Mar 1, 2024
d8c56da
lint error, and disclaimer for hashbrown
lalitb Mar 1, 2024
7b0bac5
Merge branch 'main' into two-level-hash
lalitb Mar 1, 2024
61c8b0d
update hashbrown usage disclaimer
lalitb Mar 4, 2024
267e305
fix for overflow metrics
lalitb Mar 5, 2024
078c994
add tests
lalitb Mar 7, 2024
cc12da1
Merge branch 'main' into two-level-hash
lalitb Mar 7, 2024
90bbb2d
remove leftover method
lalitb Mar 7, 2024
f89c3ea
fix atomic
lalitb Mar 7, 2024
603305e
fix
lalitb Mar 7, 2024
21b0b3e
more comments
lalitb Mar 7, 2024
f33729c
fix race condition for concurrent same attribute insert
lalitb Mar 9, 2024
8b48564
fix lint
lalitb Mar 9, 2024
04c68c2
Merge branch 'main' into two-level-hash
lalitb Mar 9, 2024
fdb5020
Merge branch 'main' into two-level-hash
lalitb Mar 19, 2024
d973c4d
Merge branch 'main' into two-level-hash
hdost Apr 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ rust-version = "1.65"
[dependencies]
opentelemetry = { version = "0.21", path = "../opentelemetry/" }
opentelemetry-http = { version = "0.10", path = "../opentelemetry-http", optional = true }
ahash = { version = "0.8", optional = true }
async-std = { workspace = true, features = ["unstable"], optional = true }
async-trait = { workspace = true, optional = true }
crossbeam-channel = { version = "0.5", optional = true }
futures-channel = "0.3"
futures-executor = { workspace = true }
futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] }
hashbrown = { version = "0.14", optional = true }
once_cell = { workspace = true }
ordered-float = { workspace = true }
percent-encoding = { version = "2.0", optional = true }
Expand Down Expand Up @@ -54,6 +56,7 @@ testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std",
rt-tokio = ["tokio", "tokio-stream"]
rt-tokio-current-thread = ["tokio", "tokio-stream"]
rt-async-std = ["async-std"]
use_hashbrown = ["hashbrown", "ahash"]
cijothomas marked this conversation as resolved.
Show resolved Hide resolved

[[bench]]
name = "context"
Expand Down
250 changes: 149 additions & 101 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,35 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::{
collections::{hash_map::Entry, HashMap},
sync::Mutex,
sync::{Arc, Mutex},
time::SystemTime,
};

use crate::attributes::AttributeSet;
use crate::metrics::data::{self, Aggregation, DataPoint, Temporality};
use opentelemetry::{global, metrics::MetricsError};
use std::hash::{Hash, Hasher};

#[cfg(feature = "use_hashbrown")]
use ahash::AHasher;
#[cfg(feature = "use_hashbrown")]
use hashbrown::{hash_map::Entry, HashMap};

#[cfg(not(feature = "use_hashbrown"))]
use std::collections::{
hash_map::{DefaultHasher, Entry},
HashMap,
};

use super::{
aggregate::{is_under_cardinality_limit, STREAM_OVERFLOW_ATTRIBUTE_SET},
AtomicTracker, Number,
};

type BucketValue<T> = Mutex<Option<HashMap<AttributeSet, T>>>;
type Buckets<T> = Arc<[BucketValue<T>; 256]>;
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
/// The storage for sums.
struct ValueMap<T: Number<T>> {
values: Mutex<HashMap<AttributeSet, T>>,
buckets: Buckets<T>,
has_no_value_attribute_value: AtomicBool,
no_attribute_value: T::AtomicTracker,
}
Expand All @@ -29,12 +42,41 @@ impl<T: Number<T>> Default for ValueMap<T> {

impl<T: Number<T>> ValueMap<T> {
fn new() -> Self {
let buckets = std::iter::repeat_with(|| Mutex::new(None))
.take(256)
.collect::<Vec<_>>()
.try_into()
.unwrap_or_else(|_| panic!("Incorrect length"));
lalitb marked this conversation as resolved.
Show resolved Hide resolved

ValueMap {
values: Mutex::new(HashMap::new()),
buckets: Arc::new(buckets),
has_no_value_attribute_value: AtomicBool::new(false),
no_attribute_value: T::new_atomic_tracker(),
}
}

// Hash function to determine the bucket
fn hash_to_bucket(key: &AttributeSet) -> u8 {
#[cfg(not(feature = "use_hashbrown"))]
let mut hasher = DefaultHasher::new();
#[cfg(feature = "use_hashbrown")]
let mut hasher = AHasher::default();

key.hash(&mut hasher);
// Use the 8 least significant bits directly, avoiding the modulus operation.
hasher.finish() as u8
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
}

// Calculate the total length of data points across all buckets.
fn total_data_points_count(&self) -> usize {
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
self.buckets
.iter()
.map(|bucket_mutex| {
let locked_bucket = bucket_mutex.lock().unwrap();
locked_bucket.as_ref().map_or(0, |bucket| bucket.len())
})
.sum::<usize>()
}
}

impl<T: Number<T>> ValueMap<T> {
Expand All @@ -43,22 +85,33 @@ impl<T: Number<T>> ValueMap<T> {
self.no_attribute_value.add(measurement);
self.has_no_value_attribute_value
.store(true, Ordering::Release);
} else if let Ok(mut values) = self.values.lock() {
let size = values.len();
match values.entry(attrs) {
Entry::Occupied(mut occupied_entry) => {
let sum = occupied_entry.get_mut();
*sum += measurement;
}
Entry::Vacant(vacant_entry) => {
if is_under_cardinality_limit(size) {
vacant_entry.insert(measurement);
} else {
values
.entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
.and_modify(|val| *val += measurement)
.or_insert(measurement);
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
} else {
let bucket_index = Self::hash_to_bucket(&attrs) as usize; // Ensure index is usize for array indexing
let bucket_mutex = &self.buckets[bucket_index];
let mut bucket_guard = bucket_mutex.lock().unwrap();

if bucket_guard.is_none() {
*bucket_guard = Some(HashMap::new()); // Initialize the bucket if it's None
}

if let Some(ref mut values) = *bucket_guard {
let size = values.len();
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
match values.entry(attrs) {
Entry::Occupied(mut occupied_entry) => {
let sum = occupied_entry.get_mut();
*sum += measurement;
}
Entry::Vacant(vacant_entry) => {
if is_under_cardinality_limit(size) {
vacant_entry.insert(measurement);
} else {
// TBD - Update total_count ??
values
.entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to store the overflow attribute in each bucket? or once only?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overflow attribute is handled separately, and is stored only once.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure how this is handled — it looks to me like we'll store the overflow entry in each of the hashmaps.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, this was indeed overflowing into every hashmap. Have fixed it now. Will also add tests to validate this.

.and_modify(|val| *val += measurement)
.or_insert(measurement);
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
}
}
}
}
Expand Down Expand Up @@ -112,16 +165,10 @@ impl<T: Number<T>> Sum<T> {
s_data.is_monotonic = self.monotonic;
s_data.data_points.clear();

let mut values = match self.value_map.values.lock() {
Ok(v) => v,
Err(_) => return (0, None),
};

let n = values.len() + 1;
if n > s_data.data_points.capacity() {
s_data
.data_points
.reserve_exact(n - s_data.data_points.capacity());
let total_len: usize = self.value_map.total_data_points_count() + 1;
if total_len > s_data.data_points.capacity() {
let additional_space_needed = total_len - s_data.data_points.capacity();
s_data.data_points.reserve_exact(additional_space_needed);
}

let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
Expand All @@ -139,14 +186,19 @@ impl<T: Number<T>> Sum<T> {
});
}

for (attrs, value) in values.drain() {
s_data.data_points.push(DataPoint {
attributes: attrs,
start_time: Some(prev_start),
time: Some(t),
value,
exemplars: vec![],
});
for bucket_mutex in self.value_map.buckets.iter() {
if let Some(ref mut locked_bucket) = *bucket_mutex.lock().unwrap() {
for (attrs, value) in locked_bucket.drain() {
s_data.data_points.push(DataPoint {
attributes: attrs,
start_time: Some(*self.start.lock().unwrap()),
time: Some(t),
value,
exemplars: vec![],
});
}
// The bucket is automatically cleared by the .drain() method
}
}

// The delta collection cycle resets.
Expand Down Expand Up @@ -181,16 +233,10 @@ impl<T: Number<T>> Sum<T> {
s_data.is_monotonic = self.monotonic;
s_data.data_points.clear();

let values = match self.value_map.values.lock() {
Ok(v) => v,
Err(_) => return (0, None),
};

let n = values.len() + 1;
if n > s_data.data_points.capacity() {
s_data
.data_points
.reserve_exact(n - s_data.data_points.capacity());
let total_len: usize = self.value_map.total_data_points_count() + 1;
if total_len > s_data.data_points.capacity() {
let additional_space_needed = total_len - s_data.data_points.capacity();
s_data.data_points.reserve_exact(additional_space_needed);
}

let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
Expand All @@ -213,14 +259,18 @@ impl<T: Number<T>> Sum<T> {
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
for (attrs, value) in values.iter() {
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: *value,
exemplars: vec![],
});
for bucket_mutex in self.value_map.buckets.iter() {
if let Some(ref locked_bucket) = *bucket_mutex.lock().unwrap() {
for (attrs, value) in locked_bucket.iter() {
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(*self.start.lock().unwrap()), // Consider last reset time
time: Some(t),
value: *value,
exemplars: vec![],
});
}
}
}

(
Expand Down Expand Up @@ -274,18 +324,13 @@ impl<T: Number<T>> PrecomputedSum<T> {
s_data.temporality = Temporality::Delta;
s_data.is_monotonic = self.monotonic;

let mut values = match self.value_map.values.lock() {
Ok(v) => v,
Err(_) => return (0, None),
};

let n = values.len() + 1;
if n > s_data.data_points.capacity() {
s_data
.data_points
.reserve_exact(n - s_data.data_points.capacity());
let total_len: usize = self.value_map.total_data_points_count() + 1;
if total_len > s_data.data_points.capacity() {
let additional_space_needed = total_len - s_data.data_points.capacity();
s_data.data_points.reserve_exact(additional_space_needed);
}
let mut new_reported = HashMap::with_capacity(n);

let mut new_reported = HashMap::with_capacity(total_len);
let mut reported = match self.reported.lock() {
Ok(r) => r,
Err(_) => return (0, None),
Expand All @@ -305,19 +350,23 @@ impl<T: Number<T>> PrecomputedSum<T> {
});
}

let default = T::default();
for (attrs, value) in values.drain() {
let delta = value - *reported.get(&attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), value);
for bucket_mutex in self.value_map.buckets.iter() {
if let Some(ref mut locked_bucket) = *bucket_mutex.lock().unwrap() {
let default = T::default();
for (attrs, value) in locked_bucket.drain() {
let delta = value - *reported.get(&attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), value);
}
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,
exemplars: vec![],
});
}
}
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,
exemplars: vec![],
});
}

// The delta collection cycle resets.
Expand Down Expand Up @@ -356,18 +405,13 @@ impl<T: Number<T>> PrecomputedSum<T> {
s_data.temporality = Temporality::Cumulative;
s_data.is_monotonic = self.monotonic;

let values = match self.value_map.values.lock() {
Ok(v) => v,
Err(_) => return (0, None),
};

let n = values.len() + 1;
if n > s_data.data_points.capacity() {
s_data
.data_points
.reserve_exact(n - s_data.data_points.capacity());
let total_len: usize = self.value_map.total_data_points_count() + 1;
if total_len > s_data.data_points.capacity() {
let additional_space_needed = total_len - s_data.data_points.capacity();
s_data.data_points.reserve_exact(additional_space_needed);
}
let mut new_reported = HashMap::with_capacity(n);

let mut new_reported = HashMap::with_capacity(total_len);
let mut reported = match self.reported.lock() {
Ok(r) => r,
Err(_) => return (0, None),
Expand All @@ -388,18 +432,22 @@ impl<T: Number<T>> PrecomputedSum<T> {
}

let default = T::default();
for (attrs, value) in values.iter() {
let delta = *value - *reported.get(attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), *value);
for bucket_mutex in self.value_map.buckets.iter() {
if let Some(ref locked_bucket) = *bucket_mutex.lock().unwrap() {
for (attrs, value) in locked_bucket.iter() {
let delta = *value - *reported.get(attrs).unwrap_or(&default);
if delta != default {
new_reported.insert(attrs.clone(), *value);
}
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: *value, // For cumulative, we use the value directly without calculating delta
exemplars: vec![],
});
}
}
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,
exemplars: vec![],
});
}

*reported = new_reported;
Expand Down
3 changes: 3 additions & 0 deletions opentelemetry-sdk/src/testing/metrics/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ impl TemporalitySelector for InMemoryMetricsExporter {
#[async_trait]
impl PushMetricsExporter for InMemoryMetricsExporter {
async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> {
if metrics.scope_metrics.is_empty() || metrics.scope_metrics[0].metrics.is_empty() {
lalitb marked this conversation as resolved.
Show resolved Hide resolved
return Ok(());
}
self.metrics
.lock()
.map(|mut metrics_guard| {
Expand Down
2 changes: 1 addition & 1 deletion stress/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ctrlc = "3.2.5"
lazy_static = "1.4.0"
num_cpus = "1.15.0"
opentelemetry = { path = "../opentelemetry", features = ["metrics", "logs", "trace"] }
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace"] }
opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["metrics", "logs", "trace", "use_hashbrown"] }
opentelemetry-appender-tracing = { path = "../opentelemetry-appender-tracing"}
rand = { version = "0.8.4", features = ["small_rng"] }
tracing = { workspace = true, features = ["std"]}
Expand Down
Loading