From c7556ac9e378f439aaf34735610b534fa01065dc Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Fri, 11 Jun 2021 07:20:01 -0700 Subject: [PATCH 01/35] WIP: exploration of aggregate transform Signed-off-by: Ross McFarland --- Cargo.toml | 2 + src/internal_events/aggregate.rs | 11 + src/internal_events/mod.rs | 2 + src/transforms/aggregate.rs | 433 +++++++++++++++++++++++++++++++ src/transforms/mod.rs | 2 + 5 files changed, 450 insertions(+) create mode 100644 src/internal_events/aggregate.rs create mode 100644 src/transforms/aggregate.rs diff --git a/Cargo.toml b/Cargo.toml index 5e009bee0bb0d..5ad71137e9ee8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -484,6 +484,7 @@ transforms-logs = [ ] transforms-metrics = [ "transforms-add_tags", + "transforms-aggregate", "transforms-filter", "transforms-log_to_metric", "transforms-lua", @@ -495,6 +496,7 @@ transforms-metrics = [ transforms-add_fields = [] transforms-add_tags = [] +transforms-aggregate = [] transforms-ansi_stripper = [] transforms-aws_cloudwatch_logs_subscription_parser= [] transforms-aws_ec2_metadata = ["evmap"] diff --git a/src/internal_events/aggregate.rs b/src/internal_events/aggregate.rs new file mode 100644 index 0000000000000..d68eca2d5d1cb --- /dev/null +++ b/src/internal_events/aggregate.rs @@ -0,0 +1,11 @@ +use super::InternalEvent; +use metrics::counter; + +#[derive(Debug)] +pub struct AggregateEventDiscarded; + +impl InternalEvent for AggregateEventDiscarded { + fn emit_metrics(&self) { + counter!("events_discarded_total", 1); + } +} diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index 21627bb5db41d..62c3cfac2d316 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -3,6 +3,7 @@ use std::borrow::Cow; mod adaptive_concurrency; mod add_fields; mod add_tags; +mod aggregate; mod ansi_stripper; #[cfg(feature = "sources-apache_metrics")] mod apache_metrics; @@ -132,6 +133,7 @@ pub mod kubernetes; pub use self::adaptive_concurrency::*; pub use self::add_fields::*; pub use self::add_tags::*; +pub use self::aggregate::*; pub use self::ansi_stripper::*; #[cfg(feature = "sources-apache_metrics")] pub use self::apache_metrics::*; diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs new file mode 100644 index 0000000000000..68fd5eb8854f0 --- /dev/null +++ b/src/transforms/aggregate.rs @@ -0,0 +1,433 @@ +// let g:cargo_makeprg_params = = '--lib --no-default-features --features=transforms-aggregate transforms::aggregate' +use chrono::{DateTime, Utc}; +use crate::{ + internal_events::AggregateEventDiscarded, + transforms::{ + TaskTransform, + Transform, + }, +}; +use crate::{ + config::{DataType, GenerateConfig, GlobalOptions, TransformConfig, TransformDescription}, + event::{ + Event, + metric, + }, +}; +use futures::{Stream, StreamExt}; +use serde::{Deserialize, Serialize}; +use std::{ + collections::HashMap, + future::ready, + pin::Pin, +}; + +#[derive(Deserialize, Serialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub struct AggregateConfig { + pub interval: f64, +} + +inventory::submit! { + TransformDescription::new::("aggregate") +} + +impl GenerateConfig for AggregateConfig { + fn generate_config() -> toml::Value { + toml::Value::try_from(Self { + interval: 10.0, + }) + .unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "aggregate")] +impl TransformConfig for AggregateConfig { + async fn build(&self, _globals: &GlobalOptions) -> crate::Result { + Ok(Transform::task(Aggregate::new(self.interval))) + } + + fn input_type(&self) -> DataType { + DataType::Metric + } + + fn output_type(&self) -> DataType { + DataType::Metric + } + + fn transform_type(&self) -> &'static str { + "aggregate" + } +} + +#[derive(Debug, Eq, Hash, PartialEq)] +struct Key { + pub name: metric::MetricName, + pub tags: Option, +} + +impl Key { + fn new( + name: metric::MetricName, + tags: Option, + ) -> Self { + Self { + name, + tags, + } + } +} + +#[derive(Debug)] +struct Record { + pub value: metric::MetricValue, + pub most_recent: DateTime, +} + +impl Record { + fn new(value: metric::MetricValue, most_recent: DateTime) -> Self { + Self { + value, + most_recent, + } + } +} + +#[derive(Debug)] +pub struct Aggregate { + interval: f64, + map: HashMap, +} + +impl Aggregate { + pub fn new(interval: f64) -> Self { + let map = HashMap::new(); + Self { + interval, + map, + } + } + + fn record(&mut self, event: Event) -> Option { + let metric = event.as_metric(); + let series = metric.series(); + let data = metric.data(); + let key = Key::new(series.name.clone(), series.tags.clone()); + + let timestamp = match data.timestamp { + Some(datetime) => datetime, + None => Utc::now(), + }; + + match self.map.get_mut(&key) { + Some(record) => { + // Exists, add/increment or replace based on kind + match data.kind { + metric::MetricKind::Incremental => record.value.add(&data.value), + metric::MetricKind::Absolute => { + record.value = data.value.clone(); + true + }, + }; + record.most_recent = timestamp; + }, + _ => { + // Doesn't exist, insert a new record regardless + self.map.insert(key, Record::new(data.value.clone(), timestamp)); + () + } + }; + + // TODO: discarded or recorded? + emit!(AggregateEventDiscarded); + None + } +} + +impl TaskTransform for Aggregate { + fn transform( + self: Box, + task: Pin + Send>>, + ) -> Pin + Send>> + where + Self: 'static, + { + let mut inner = self; + println!("before"); + let ret = Box::pin(task.filter_map(move |e| ready(inner.record(e)))); + println!("after"); + ret + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{event::metric, event::Event, event::Metric}; + use std::collections::BTreeMap; + + #[test] + fn genreate_config() { + crate::test_util::test_generate_config::(); + } + + #[test] + fn counters() { + let mut agg = Aggregate::new(10.0); + + let counter_a = metric::MetricValue::Counter { value: 42.0 }; + let counter_b = metric::MetricValue::Counter { value: 43.0 }; + let summed = metric::MetricValue::Counter { value: 85.0 }; + let tags: BTreeMap = + vec![("tag1".into(), "val1".into())].into_iter().collect(); + + // Single item, just stored regardless of kind + agg.record(make_metric("counter", metric::MetricKind::Incremental, + counter_a.clone(), tags.clone())); + assert_eq!(1, agg.map.len()); + match agg.map.values().next() { + Some(record) => assert_eq!(counter_a, record.value), + _ => assert!(false), + } + + // When sent absolute, replaced, not incremented + agg.record(make_metric("counter", metric::MetricKind::Absolute, + counter_b.clone(), tags.clone())); + assert_eq!(1, agg.map.len()); + match agg.map.values().next() { + Some(record) => assert_eq!(counter_b, record.value), + _ => assert!(false), + } + + // Now back to incremental, expect them to be added + agg.record(make_metric("counter", metric::MetricKind::Incremental, + counter_a.clone(), tags.clone())); + assert_eq!(1, agg.map.len()); + match agg.map.values().next() { + Some(record) => assert_eq!(summed, record.value), + _ => assert!(false), + }; + + // Different name should create a distinct entry + agg.record(make_metric("counter2", metric::MetricKind::Incremental, + counter_a.clone(), tags.clone())); + assert_eq!(2, agg.map.len()); + for (key, record) in &agg.map { + match key.name.name.as_str() { + "counter" => assert_eq!(summed, record.value), + "counter2" => assert_eq!(counter_a, record.value), + _ => assert!(false), + } + } + + // Different MetricValue type, guage, with same name & tags is ignored, first establishes + // type + let guage = metric::MetricValue::Gauge { value: 44.0 }; + agg.record(make_metric("counter", metric::MetricKind::Incremental, + guage.clone(), tags.clone())); + // Nothing changed + assert_eq!(2, agg.map.len()); + for (key, record) in &agg.map { + match key.name.name.as_str() { + "counter" => assert_eq!(summed, record.value), + "counter2" => assert_eq!(counter_a, record.value), + _ => assert!(false), + } + } + } + + fn make_metric( + name: &'static str, + kind: metric::MetricKind, + value: metric::MetricValue, + tags: BTreeMap, + ) -> Event { + Event::Metric( + Metric::new( + name, + kind, + value, + ) + .with_tags(Some(tags)), + ) + } + + /* + use super::*; + use crate::{ + conditions::check_fields::CheckFieldsPredicateArg, config::log_schema, event::Event, + test_util::random_lines, transforms::test::transform_one, + }; + use approx::assert_relative_eq; + use indexmap::IndexMap; + + fn condition_contains(pre: &str) -> Box { + condition(log_schema().message_key(), "contains", pre) + } + + fn condition(field: &str, condition: &str, value: &str) -> Box { + let mut preds: IndexMap = IndexMap::new(); + preds.insert( + format!("{}.{}", field, condition), + CheckFieldsPredicateArg::String(value.into()), + ); + + CheckFieldsConfig::new(preds).build().unwrap() + } + + #[test] + fn genreate_config() { + crate::test_util::test_generate_config::(); + } + + #[test] + fn hash_samples_at_roughly_the_configured_rate() { + let num_events = 10000; + + let events = random_events(num_events); + let mut sampler = Aggregate::new( + 2, + Some(log_schema().message_key().into()), + Some(condition_contains("na")), + ); + let total_passed = events + .into_iter() + .filter_map(|event| { + let mut buf = Vec::with_capacity(1); + sampler.transform(&mut buf, event); + buf.pop() + }) + .count(); + let ideal = 1.0f64 / 2.0f64; + let actual = total_passed as f64 / num_events as f64; + assert_relative_eq!(ideal, actual, epsilon = ideal * 0.5); + + let events = random_events(num_events); + let mut sampler = Aggregate::new( + 25, + Some(log_schema().message_key().into()), + Some(condition_contains("na")), + ); + let total_passed = events + .into_iter() + .filter_map(|event| { + let mut buf = Vec::with_capacity(1); + sampler.transform(&mut buf, event); + buf.pop() + }) + .count(); + let ideal = 1.0f64 / 25.0f64; + let actual = total_passed as f64 / num_events as f64; + assert_relative_eq!(ideal, actual, epsilon = ideal * 0.5); + } + + #[test] + fn hash_consistently_samples_the_same_events() { + let events = random_events(1000); + let mut sampler = Aggregate::new( + 2, + Some(log_schema().message_key().into()), + Some(condition_contains("na")), + ); + + let first_run = events + .clone() + .into_iter() + .filter_map(|event| { + let mut buf = Vec::with_capacity(1); + sampler.transform(&mut buf, event); + buf.pop() + }) + .collect::>(); + let second_run = events + .into_iter() + .filter_map(|event| { + let mut buf = Vec::with_capacity(1); + sampler.transform(&mut buf, event); + buf.pop() + }) + .collect::>(); + + assert_eq!(first_run, second_run); + } + + #[test] + fn always_passes_events_matching_pass_list() { + for key_field in &[None, Some(log_schema().message_key().into())] { + let event = Event::from("i am important"); + let mut sampler = + Aggregate::new(0, key_field.clone(), Some(condition_contains("important"))); + let iterations = 0..1000; + let total_passed = iterations + .filter_map(|_| { + transform_one(&mut sampler, event.clone()) + .map(|result| assert_eq!(result, event)) + }) + .count(); + assert_eq!(total_passed, 1000); + } + } + + #[test] + fn handles_key_field() { + for key_field in &[None, Some(log_schema().timestamp_key().into())] { + let event = Event::from("nananana"); + let mut sampler = Aggregate::new( + 0, + key_field.clone(), + Some(condition(log_schema().timestamp_key(), "contains", ":")), + ); + let iterations = 0..1000; + let total_passed = iterations + .filter_map(|_| { + transform_one(&mut sampler, event.clone()) + .map(|result| assert_eq!(result, event)) + }) + .count(); + assert_eq!(total_passed, 1000); + } + } + + #[test] + fn sampler_adds_sampling_rate_to_event() { + for key_field in &[None, Some(log_schema().message_key().into())] { + let events = random_events(10000); + let mut sampler = Aggregate::new(10, key_field.clone(), Some(condition_contains("na"))); + let passing = events + .into_iter() + .filter(|s| { + !s.as_log()[log_schema().message_key()] + .to_string_lossy() + .contains("na") + }) + .find_map(|event| transform_one(&mut sampler, event)) + .unwrap(); + assert_eq!(passing.as_log()["sample_rate"], "10".into()); + + let events = random_events(10000); + let mut sampler = Aggregate::new(25, key_field.clone(), Some(condition_contains("na"))); + let passing = events + .into_iter() + .filter(|s| { + !s.as_log()[log_schema().message_key()] + .to_string_lossy() + .contains("na") + }) + .find_map(|event| transform_one(&mut sampler, event)) + .unwrap(); + assert_eq!(passing.as_log()["sample_rate"], "25".into()); + + // If the event passed the regex check, don't include the sampling rate + let mut sampler = Aggregate::new(25, key_field.clone(), Some(condition_contains("na"))); + let event = Event::from("nananana"); + let passing = transform_one(&mut sampler, event).unwrap(); + assert!(passing.as_log().get("sample_rate").is_none()); + } + } + + fn random_events(n: usize) -> Vec { + random_lines(10).take(n).map(Event::from).collect() + } + */ +} diff --git a/src/transforms/mod.rs b/src/transforms/mod.rs index 068e5611e4c12..0f0f704aec703 100644 --- a/src/transforms/mod.rs +++ b/src/transforms/mod.rs @@ -4,6 +4,8 @@ use snafu::Snafu; pub mod add_fields; #[cfg(feature = "transforms-add_tags")] pub mod add_tags; +#[cfg(feature = "transforms-aggregate")] +pub mod aggregate; #[cfg(feature = "transforms-ansi_stripper")] pub mod ansi_stripper; #[cfg(feature = "transforms-aws_cloudwatch_logs_subscription_parser")] From 6b5126f66bab6fa8182907211e09afba20124d5e Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Fri, 11 Jun 2021 10:09:55 -0700 Subject: [PATCH 02/35] Rework plumbing based on Reduce to allow async/timer generation Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 166 +++++++++++++++++------------------- 1 file changed, 78 insertions(+), 88 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 68fd5eb8854f0..f6a2ef2508fe6 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -1,51 +1,43 @@ // let g:cargo_makeprg_params = = '--lib --no-default-features --features=transforms-aggregate transforms::aggregate' -use chrono::{DateTime, Utc}; use crate::{ internal_events::AggregateEventDiscarded, transforms::{ TaskTransform, Transform, }, -}; -use crate::{ - config::{DataType, GenerateConfig, GlobalOptions, TransformConfig, TransformDescription}, + config::{DataType, GlobalOptions, TransformConfig, TransformDescription}, event::{ - Event, metric, + Event, + EventMetadata, }, }; -use futures::{Stream, StreamExt}; +use async_stream::stream; +use futures::{stream, Stream, StreamExt}; use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, - future::ready, pin::Pin, + time::{Duration}, }; -#[derive(Deserialize, Serialize, Debug, Clone)] -#[serde(deny_unknown_fields)] +#[derive(Deserialize, Serialize, Debug, Default, Clone)] +#[serde(deny_unknown_fields, default)] pub struct AggregateConfig { - pub interval: f64, + pub interval_ms: Option, } inventory::submit! { TransformDescription::new::("aggregate") } -impl GenerateConfig for AggregateConfig { - fn generate_config() -> toml::Value { - toml::Value::try_from(Self { - interval: 10.0, - }) - .unwrap() - } -} +impl_generate_config_from_default!(AggregateConfig); #[async_trait::async_trait] #[typetag::serde(name = "aggregate")] impl TransformConfig for AggregateConfig { async fn build(&self, _globals: &GlobalOptions) -> crate::Result { - Ok(Transform::task(Aggregate::new(self.interval))) + Aggregate::new(self).map(Transform::task) } fn input_type(&self) -> DataType { @@ -61,103 +53,97 @@ impl TransformConfig for AggregateConfig { } } -#[derive(Debug, Eq, Hash, PartialEq)] -struct Key { - pub name: metric::MetricName, - pub tags: Option, -} - -impl Key { - fn new( - name: metric::MetricName, - tags: Option, - ) -> Self { - Self { - name, - tags, - } - } -} - -#[derive(Debug)] -struct Record { - pub value: metric::MetricValue, - pub most_recent: DateTime, -} - -impl Record { - fn new(value: metric::MetricValue, most_recent: DateTime) -> Self { - Self { - value, - most_recent, - } - } -} - #[derive(Debug)] pub struct Aggregate { - interval: f64, - map: HashMap, + interval: Duration, + map: HashMap>, } impl Aggregate { - pub fn new(interval: f64) -> Self { + pub fn new(config: &AggregateConfig) -> crate::Result { let map = HashMap::new(); - Self { - interval, - map, - } + + Ok(Self { + interval: Duration::from_millis(config.interval_ms.unwrap_or(10 * 1000)), + map, + }) } - fn record(&mut self, event: Event) -> Option { + fn record(&mut self, event: Event) { let metric = event.as_metric(); let series = metric.series(); let data = metric.data(); - let key = Key::new(series.name.clone(), series.tags.clone()); - let timestamp = match data.timestamp { - Some(datetime) => datetime, - None => Utc::now(), - }; - - match self.map.get_mut(&key) { - Some(record) => { - // Exists, add/increment or replace based on kind - match data.kind { - metric::MetricKind::Incremental => record.value.add(&data.value), - metric::MetricKind::Absolute => { - record.value = data.value.clone(); - true - }, - }; - record.most_recent = timestamp; - }, + match self.map.get_mut(&series) { + Some(datum) => datum.push(data.clone()), _ => { - // Doesn't exist, insert a new record regardless - self.map.insert(key, Record::new(data.value.clone(), timestamp)); + self.map.insert(series.clone(), vec![data.clone()]); () } }; // TODO: discarded or recorded? emit!(AggregateEventDiscarded); - None + } + + fn flush_into(&mut self, output: &mut Vec) { + // TODO: should we preserve one, there's no way to combine? + let metadata = EventMetadata::default(); + for (series, datas) in &self.map { + for data in datas { + let metric = metric::Metric::from_parts(series.clone(), data.clone(), metadata.clone()); + output.push(Event::Metric(metric)); + } + } + } + + fn flush_all_into(&mut self, _output: &mut Vec) { + // TODO? } } impl TaskTransform for Aggregate { fn transform( self: Box, - task: Pin + Send>>, + mut input_rx: Pin + Send>>, ) -> Pin + Send>> where Self: 'static, { - let mut inner = self; - println!("before"); - let ret = Box::pin(task.filter_map(move |e| ready(inner.record(e)))); - println!("after"); - ret + let mut me = self; + + let interval = me.interval; + + let mut flush_stream = tokio::time::interval(interval); + + Box::pin( + stream! { + loop { + let mut output = Vec::new(); + let done = tokio::select! { + _ = flush_stream.tick() => { + me.flush_into(&mut output); + false + } + maybe_event = input_rx.next() => { + match maybe_event { + None => { + me.flush_all_into(&mut output); + true + } + Some(event) => { + me.record(event); + false + } + } + } + }; + yield stream::iter(output.into_iter()); + if done { break } + } + } + .flatten(), + ) } } @@ -174,7 +160,8 @@ mod tests { #[test] fn counters() { - let mut agg = Aggregate::new(10.0); + /* + let mut agg = Aggregate::new(Duration::from_millis(10 * 1000)); let counter_a = metric::MetricValue::Counter { value: 42.0 }; let counter_b = metric::MetricValue::Counter { value: 43.0 }; @@ -235,8 +222,10 @@ mod tests { _ => assert!(false), } } + */ } + /* fn make_metric( name: &'static str, kind: metric::MetricKind, @@ -252,6 +241,7 @@ mod tests { .with_tags(Some(tags)), ) } + */ /* use super::*; From 3475e466a15ebcec4ec98e5bd19f692803a27b82 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Fri, 11 Jun 2021 11:22:14 -0700 Subject: [PATCH 03/35] Working a/b maps for Aggregate Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 61 +++++++++++++++++++++++++++++++------ 1 file changed, 51 insertions(+), 10 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index f6a2ef2508fe6..6e0797fce120d 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -56,16 +56,18 @@ impl TransformConfig for AggregateConfig { #[derive(Debug)] pub struct Aggregate { interval: Duration, - map: HashMap>, + map_a: HashMap>, + map_b: HashMap>, + using_b: bool, } impl Aggregate { pub fn new(config: &AggregateConfig) -> crate::Result { - let map = HashMap::new(); - Ok(Self { interval: Duration::from_millis(config.interval_ms.unwrap_or(10 * 1000)), - map, + map_a: HashMap::new(), + map_b: HashMap::new(), + using_b: false, }) } @@ -74,10 +76,15 @@ impl Aggregate { let series = metric.series(); let data = metric.data(); - match self.map.get_mut(&series) { + let map = match self.using_b { + true => &mut self.map_b, + false => &mut self.map_a, + }; + + match map.get_mut(&series) { Some(datum) => datum.push(data.clone()), _ => { - self.map.insert(series.clone(), vec![data.clone()]); + map.insert(series.clone(), vec![data.clone()]); () } }; @@ -87,11 +94,43 @@ impl Aggregate { } fn flush_into(&mut self, output: &mut Vec) { - // TODO: should we preserve one, there's no way to combine? + + // TODO: locking/safety etc... + let map = match self.using_b { + true => { + self.using_b = false; + &mut self.map_b + }, + false => { + self.using_b = true; + &mut self.map_a + }, + }; + + if map.len() == 0 { + return + } + + // TODO: should we preserve one of these overall, per MetricSeries, ??? + // TODO: Doesn't seem like there's a way to combine. let metadata = EventMetadata::default(); - for (series, datas) in &self.map { - for data in datas { - let metric = metric::Metric::from_parts(series.clone(), data.clone(), metadata.clone()); + + for (series, datas) in map { + if let Some(mut last) = datas.pop() { + // We're assuming each series has a single kind, nothing forces that to be the + // case, but it's not really clear what we'd do if they vary so... + match last.kind { + // Add them up + metric::MetricKind::Incremental => { + for data in datas { + last.add(&data); + } + }, + // Last one wins + metric::MetricKind::Absolute => (), + } + + let metric = metric::Metric::from_parts(series.clone(), last, metadata.clone()); output.push(Event::Metric(metric)); } } @@ -149,6 +188,7 @@ impl TaskTransform for Aggregate { #[cfg(test)] mod tests { + /* use super::*; use crate::{event::metric, event::Event, event::Metric}; use std::collections::BTreeMap; @@ -157,6 +197,7 @@ mod tests { fn genreate_config() { crate::test_util::test_generate_config::(); } + */ #[test] fn counters() { From f826e3ae12a1a030bcad16b391352c8b192f61d2 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Fri, 11 Jun 2021 13:19:51 -0700 Subject: [PATCH 04/35] Working statsd style aggregation Signed-off-by: Ross McFarland --- src/internal_events/aggregate.rs | 15 ++++++-- src/transforms/aggregate.rs | 64 ++++++++++++++------------------ 2 files changed, 40 insertions(+), 39 deletions(-) diff --git a/src/internal_events/aggregate.rs b/src/internal_events/aggregate.rs index d68eca2d5d1cb..2e720715683a6 100644 --- a/src/internal_events/aggregate.rs +++ b/src/internal_events/aggregate.rs @@ -2,10 +2,19 @@ use super::InternalEvent; use metrics::counter; #[derive(Debug)] -pub struct AggregateEventDiscarded; +pub struct AggregateEventRecorded; -impl InternalEvent for AggregateEventDiscarded { +impl InternalEvent for AggregateEventRecorded { fn emit_metrics(&self) { - counter!("events_discarded_total", 1); + counter!("events_recorded_total", 1); + } +} + +#[derive(Debug)] +pub struct AggregateFlushed; + +impl InternalEvent for AggregateFlushed { + fn emit_metrics(&self) { + counter!("flushed_total", 1); } } diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 6e0797fce120d..f99024b43c9ea 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -1,6 +1,6 @@ // let g:cargo_makeprg_params = = '--lib --no-default-features --features=transforms-aggregate transforms::aggregate' use crate::{ - internal_events::AggregateEventDiscarded, + internal_events::{AggregateEventRecorded, AggregateFlushed}, transforms::{ TaskTransform, Transform, @@ -53,11 +53,12 @@ impl TransformConfig for AggregateConfig { } } + #[derive(Debug)] pub struct Aggregate { interval: Duration, - map_a: HashMap>, - map_b: HashMap>, + map_a: HashMap, + map_b: HashMap, using_b: bool, } @@ -81,21 +82,29 @@ impl Aggregate { false => &mut self.map_a, }; - match map.get_mut(&series) { - Some(datum) => datum.push(data.clone()), - _ => { - map.insert(series.clone(), vec![data.clone()]); - () + match data.kind { + metric::MetricKind::Incremental => { + match map.get_mut(&series) { + // We already have something, add to it + Some(existing) => existing.add(data), + None => { + // New so store + map.insert(series.clone(), data.clone()); + true + } + }; + }, + metric::MetricKind::Absolute => { + // Always store + map.insert(series.clone(), data.clone()); } }; - // TODO: discarded or recorded? - emit!(AggregateEventDiscarded); + emit!(AggregateEventRecorded); } fn flush_into(&mut self, output: &mut Vec) { - - // TODO: locking/safety etc... + // TODO: locking/safety etc... we can also only have 1 call into flush at a time let map = match self.using_b { true => { self.using_b = false; @@ -111,33 +120,16 @@ impl Aggregate { return } - // TODO: should we preserve one of these overall, per MetricSeries, ??? - // TODO: Doesn't seem like there's a way to combine. + // TODO: not clear how this should work with aggregation so just stuffing a default one in + // for now. let metadata = EventMetadata::default(); - for (series, datas) in map { - if let Some(mut last) = datas.pop() { - // We're assuming each series has a single kind, nothing forces that to be the - // case, but it's not really clear what we'd do if they vary so... - match last.kind { - // Add them up - metric::MetricKind::Incremental => { - for data in datas { - last.add(&data); - } - }, - // Last one wins - metric::MetricKind::Absolute => (), - } - - let metric = metric::Metric::from_parts(series.clone(), last, metadata.clone()); - output.push(Event::Metric(metric)); - } + for (series, metric) in map.drain() { + let metric = metric::Metric::from_parts(series, metric, metadata.clone()); + output.push(Event::Metric(metric)); } - } - fn flush_all_into(&mut self, _output: &mut Vec) { - // TODO? + emit!(AggregateFlushed); } } @@ -167,7 +159,7 @@ impl TaskTransform for Aggregate { maybe_event = input_rx.next() => { match maybe_event { None => { - me.flush_all_into(&mut output); + me.flush_into(&mut output); true } Some(event) => { From e1c6e08f25fdf63c2bf3d0d0414234c007e97ef7 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Fri, 11 Jun 2021 19:26:36 -0700 Subject: [PATCH 05/35] Add tests for aggregate and rework/fix bits found Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 403 ++++++++++++++---------------------- 1 file changed, 156 insertions(+), 247 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index f99024b43c9ea..a6d37e471d111 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -103,7 +103,9 @@ impl Aggregate { emit!(AggregateEventRecorded); } - fn flush_into(&mut self, output: &mut Vec) { + fn flush_into(&mut self, output: &mut Vec) -> u64 { + let mut count = 0_u64; + // TODO: locking/safety etc... we can also only have 1 call into flush at a time let map = match self.using_b { true => { @@ -116,20 +118,20 @@ impl Aggregate { }, }; - if map.len() == 0 { - return - } - - // TODO: not clear how this should work with aggregation so just stuffing a default one in - // for now. - let metadata = EventMetadata::default(); + if map.len() > 0 { + // TODO: not clear how this should work with aggregation so just stuffing a default one + // in for now. + let metadata = EventMetadata::default(); - for (series, metric) in map.drain() { - let metric = metric::Metric::from_parts(series, metric, metadata.clone()); - output.push(Event::Metric(metric)); + for (series, metric) in map.drain() { + let metric = metric::Metric::from_parts(series, metric, metadata.clone()); + output.push(Event::Metric(metric)); + count += 1; + } } emit!(AggregateFlushed); + return count; } } @@ -180,85 +182,15 @@ impl TaskTransform for Aggregate { #[cfg(test)] mod tests { - /* use super::*; - use crate::{event::metric, event::Event, event::Metric}; use std::collections::BTreeMap; + use crate::{event::metric, event::Event, event::Metric}; #[test] - fn genreate_config() { + fn generate_config() { crate::test_util::test_generate_config::(); } - */ - - #[test] - fn counters() { - /* - let mut agg = Aggregate::new(Duration::from_millis(10 * 1000)); - let counter_a = metric::MetricValue::Counter { value: 42.0 }; - let counter_b = metric::MetricValue::Counter { value: 43.0 }; - let summed = metric::MetricValue::Counter { value: 85.0 }; - let tags: BTreeMap = - vec![("tag1".into(), "val1".into())].into_iter().collect(); - - // Single item, just stored regardless of kind - agg.record(make_metric("counter", metric::MetricKind::Incremental, - counter_a.clone(), tags.clone())); - assert_eq!(1, agg.map.len()); - match agg.map.values().next() { - Some(record) => assert_eq!(counter_a, record.value), - _ => assert!(false), - } - - // When sent absolute, replaced, not incremented - agg.record(make_metric("counter", metric::MetricKind::Absolute, - counter_b.clone(), tags.clone())); - assert_eq!(1, agg.map.len()); - match agg.map.values().next() { - Some(record) => assert_eq!(counter_b, record.value), - _ => assert!(false), - } - - // Now back to incremental, expect them to be added - agg.record(make_metric("counter", metric::MetricKind::Incremental, - counter_a.clone(), tags.clone())); - assert_eq!(1, agg.map.len()); - match agg.map.values().next() { - Some(record) => assert_eq!(summed, record.value), - _ => assert!(false), - }; - - // Different name should create a distinct entry - agg.record(make_metric("counter2", metric::MetricKind::Incremental, - counter_a.clone(), tags.clone())); - assert_eq!(2, agg.map.len()); - for (key, record) in &agg.map { - match key.name.name.as_str() { - "counter" => assert_eq!(summed, record.value), - "counter2" => assert_eq!(counter_a, record.value), - _ => assert!(false), - } - } - - // Different MetricValue type, guage, with same name & tags is ignored, first establishes - // type - let guage = metric::MetricValue::Gauge { value: 44.0 }; - agg.record(make_metric("counter", metric::MetricKind::Incremental, - guage.clone(), tags.clone())); - // Nothing changed - assert_eq!(2, agg.map.len()); - for (key, record) in &agg.map { - match key.name.name.as_str() { - "counter" => assert_eq!(summed, record.value), - "counter2" => assert_eq!(counter_a, record.value), - _ => assert!(false), - } - } - */ - } - - /* fn make_metric( name: &'static str, kind: metric::MetricKind, @@ -274,183 +206,160 @@ mod tests { .with_tags(Some(tags)), ) } - */ - - /* - use super::*; - use crate::{ - conditions::check_fields::CheckFieldsPredicateArg, config::log_schema, event::Event, - test_util::random_lines, transforms::test::transform_one, - }; - use approx::assert_relative_eq; - use indexmap::IndexMap; - - fn condition_contains(pre: &str) -> Box { - condition(log_schema().message_key(), "contains", pre) - } - fn condition(field: &str, condition: &str, value: &str) -> Box { - let mut preds: IndexMap = IndexMap::new(); - preds.insert( - format!("{}.{}", field, condition), - CheckFieldsPredicateArg::String(value.into()), - ); + #[test] + fn incremental() { + let mut agg = Aggregate::new(&AggregateConfig { interval_ms: Some(1000_u64) }).unwrap(); - CheckFieldsConfig::new(preds).build().unwrap() - } + let tags: BTreeMap = + vec![("tag1".into(), "val1".into())].into_iter().collect(); + let counter_a_1 = make_metric("counter_a", metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 42.0 }, tags.clone()); + let counter_a_2 = make_metric("counter_a", metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 43.0 }, tags.clone()); + let counter_a_summed = make_metric("counter_a", metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 85.0 }, tags.clone()); - #[test] - fn genreate_config() { - crate::test_util::test_generate_config::(); + // Single item, just stored regardless of kind + agg.record(counter_a_1.clone()); + let mut out = vec![]; + // We should flush 1 item counter_a_1 + assert_eq!(1, agg.flush_into(&mut out)); + assert_eq!(1, out.len()); + assert_eq!(&counter_a_1, &out[0]); + + // A subsequent flush doesn't send out anything + out.clear(); + assert_eq!(0, agg.flush_into(&mut out)); + assert_eq!(0, out.len()); + + // One more just to make sure that we don't re-see from the other buffer + out.clear(); + assert_eq!(0, agg.flush_into(&mut out)); + assert_eq!(0, out.len()); + + // Two increments with the same series, should sum into 1 + agg.record(counter_a_1.clone()); + agg.record(counter_a_2.clone()); + out.clear(); + assert_eq!(1, agg.flush_into(&mut out)); + assert_eq!(1, out.len()); + assert_eq!(&counter_a_summed, &out[0]); + + let counter_b_1 = make_metric("counter_b", metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 44.0 }, tags.clone()); + // Two increments with the different series, should get each back as-is + agg.record(counter_a_1.clone()); + agg.record(counter_b_1.clone()); + out.clear(); + assert_eq!(2, agg.flush_into(&mut out)); + assert_eq!(2, out.len()); + // B/c we don't know the order they'll come back + for event in out { + match event.as_metric().series().name.name.as_str() { + "counter_a" => assert_eq!(counter_a_1, event), + "counter_b" => assert_eq!(counter_b_1, event), + _ => assert!(false), + } + } } #[test] - fn hash_samples_at_roughly_the_configured_rate() { - let num_events = 10000; - - let events = random_events(num_events); - let mut sampler = Aggregate::new( - 2, - Some(log_schema().message_key().into()), - Some(condition_contains("na")), - ); - let total_passed = events - .into_iter() - .filter_map(|event| { - let mut buf = Vec::with_capacity(1); - sampler.transform(&mut buf, event); - buf.pop() - }) - .count(); - let ideal = 1.0f64 / 2.0f64; - let actual = total_passed as f64 / num_events as f64; - assert_relative_eq!(ideal, actual, epsilon = ideal * 0.5); - - let events = random_events(num_events); - let mut sampler = Aggregate::new( - 25, - Some(log_schema().message_key().into()), - Some(condition_contains("na")), - ); - let total_passed = events - .into_iter() - .filter_map(|event| { - let mut buf = Vec::with_capacity(1); - sampler.transform(&mut buf, event); - buf.pop() - }) - .count(); - let ideal = 1.0f64 / 25.0f64; - let actual = total_passed as f64 / num_events as f64; - assert_relative_eq!(ideal, actual, epsilon = ideal * 0.5); - } + fn absolute() { + let mut agg = Aggregate::new(&AggregateConfig { interval_ms: Some(1000_u64) }).unwrap(); - #[test] - fn hash_consistently_samples_the_same_events() { - let events = random_events(1000); - let mut sampler = Aggregate::new( - 2, - Some(log_schema().message_key().into()), - Some(condition_contains("na")), - ); - - let first_run = events - .clone() - .into_iter() - .filter_map(|event| { - let mut buf = Vec::with_capacity(1); - sampler.transform(&mut buf, event); - buf.pop() - }) - .collect::>(); - let second_run = events - .into_iter() - .filter_map(|event| { - let mut buf = Vec::with_capacity(1); - sampler.transform(&mut buf, event); - buf.pop() - }) - .collect::>(); - - assert_eq!(first_run, second_run); - } + let tags: BTreeMap = + vec![("tag1".into(), "val1".into())].into_iter().collect(); + let gauge_a_1 = make_metric("gauge_a", metric::MetricKind::Absolute, + metric::MetricValue::Gauge { value: 42.0 }, tags.clone()); + let gauge_a_2 = make_metric("gauge_a", metric::MetricKind::Absolute, + metric::MetricValue::Gauge { value: 43.0 }, tags.clone()); - #[test] - fn always_passes_events_matching_pass_list() { - for key_field in &[None, Some(log_schema().message_key().into())] { - let event = Event::from("i am important"); - let mut sampler = - Aggregate::new(0, key_field.clone(), Some(condition_contains("important"))); - let iterations = 0..1000; - let total_passed = iterations - .filter_map(|_| { - transform_one(&mut sampler, event.clone()) - .map(|result| assert_eq!(result, event)) - }) - .count(); - assert_eq!(total_passed, 1000); + // Single item, just stored regardless of kind + agg.record(gauge_a_1.clone()); + let mut out = vec![]; + // We should flush 1 item gauge_a_1 + assert_eq!(1, agg.flush_into(&mut out)); + assert_eq!(1, out.len()); + assert_eq!(&gauge_a_1, &out[0]); + + // A subsequent flush doesn't send out anything + out.clear(); + assert_eq!(0, agg.flush_into(&mut out)); + assert_eq!(0, out.len()); + + // One more just to make sure that we don't re-see from the other buffer + out.clear(); + assert_eq!(0, agg.flush_into(&mut out)); + assert_eq!(0, out.len()); + + // Two absolutes with the same series, should get the 2nd (last) back. + agg.record(gauge_a_1.clone()); + agg.record(gauge_a_2.clone()); + out.clear(); + assert_eq!(1, agg.flush_into(&mut out)); + assert_eq!(1, out.len()); + assert_eq!(&gauge_a_2, &out[0]); + + let gauge_b_1 = make_metric("gauge_b", metric::MetricKind::Absolute, + metric::MetricValue::Gauge { value: 44.0 }, tags.clone()); + // Two increments with the different series, should get each back as-is + agg.record(gauge_a_1.clone()); + agg.record(gauge_b_1.clone()); + out.clear(); + assert_eq!(2, agg.flush_into(&mut out)); + assert_eq!(2, out.len()); + // B/c we don't know the order they'll come back + for event in out { + match event.as_metric().series().name.name.as_str() { + "gauge_a" => assert_eq!(gauge_a_1, event), + "gauge_b" => assert_eq!(gauge_b_1, event), + _ => assert!(false), + } } } - #[test] - fn handles_key_field() { - for key_field in &[None, Some(log_schema().timestamp_key().into())] { - let event = Event::from("nananana"); - let mut sampler = Aggregate::new( - 0, - key_field.clone(), - Some(condition(log_schema().timestamp_key(), "contains", ":")), - ); - let iterations = 0..1000; - let total_passed = iterations - .filter_map(|_| { - transform_one(&mut sampler, event.clone()) - .map(|result| assert_eq!(result, event)) - }) - .count(); - assert_eq!(total_passed, 1000); - } - } + #[tokio::test] + async fn transform() { + let agg = toml::from_str::( + r#" +interval_ms = 10 +"#, + ) + .unwrap() + .build(&GlobalOptions::default()) + .await + .unwrap(); + let agg = agg.into_task(); - #[test] - fn sampler_adds_sampling_rate_to_event() { - for key_field in &[None, Some(log_schema().message_key().into())] { - let events = random_events(10000); - let mut sampler = Aggregate::new(10, key_field.clone(), Some(condition_contains("na"))); - let passing = events - .into_iter() - .filter(|s| { - !s.as_log()[log_schema().message_key()] - .to_string_lossy() - .contains("na") - }) - .find_map(|event| transform_one(&mut sampler, event)) - .unwrap(); - assert_eq!(passing.as_log()["sample_rate"], "10".into()); - - let events = random_events(10000); - let mut sampler = Aggregate::new(25, key_field.clone(), Some(condition_contains("na"))); - let passing = events - .into_iter() - .filter(|s| { - !s.as_log()[log_schema().message_key()] - .to_string_lossy() - .contains("na") - }) - .find_map(|event| transform_one(&mut sampler, event)) - .unwrap(); - assert_eq!(passing.as_log()["sample_rate"], "25".into()); - - // If the event passed the regex check, don't include the sampling rate - let mut sampler = Aggregate::new(25, key_field.clone(), Some(condition_contains("na"))); - let event = Event::from("nananana"); - let passing = transform_one(&mut sampler, event).unwrap(); - assert!(passing.as_log().get("sample_rate").is_none()); + let tags: BTreeMap = + vec![("tag1".into(), "val1".into())].into_iter().collect(); + let counter_a_1 = make_metric("counter_a", metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 42.0 }, tags.clone()); + let counter_a_2 = make_metric("counter_a", metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 43.0 }, tags.clone()); + let counter_a_summed = make_metric("counter_a", metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 85.0 }, tags.clone()); + let gauge_a_1 = make_metric("gauge_a", metric::MetricKind::Absolute, + metric::MetricValue::Gauge { value: 42.0 }, tags.clone()); + let gauge_a_2 = make_metric("gauge_a", metric::MetricKind::Absolute, + metric::MetricValue::Gauge { value: 43.0 }, tags.clone()); + let inputs = vec![counter_a_1, counter_a_2, gauge_a_1, gauge_a_2.clone()]; + + let in_stream = Box::pin(stream::iter(inputs)); + let mut out_stream = agg.transform(in_stream); + + let mut count = 0; + while let Some(event) = out_stream.next().await { + count += 1; + match event.as_metric().series().name.name.as_str() { + "counter_a" => assert_eq!(counter_a_summed, event), + "gauge_a" => assert_eq!(gauge_a_2, event), + _ => assert!(false), + }; } - } - fn random_events(n: usize) -> Vec { - random_lines(10).take(n).map(Event::from).collect() + // There were only 2 + assert_eq!(2, count); } - */ } From c135e8ed4b45a5bdbead44ec51419ba25f8ed0ef Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Sat, 12 Jun 2021 07:36:55 -0700 Subject: [PATCH 06/35] Use MetricData::update, we already know it's incremental Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index a6d37e471d111..4f43964f53ca7 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -86,7 +86,7 @@ impl Aggregate { metric::MetricKind::Incremental => { match map.get_mut(&series) { // We already have something, add to it - Some(existing) => existing.add(data), + Some(existing) => existing.update(data), None => { // New so store map.insert(series.clone(), data.clone()); From 010d9d690fa699298e8b103b170d5d77a6fd9495 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Sat, 12 Jun 2021 08:07:22 -0700 Subject: [PATCH 07/35] Documentation for aggregate transform Signed-off-by: Ross McFarland --- .../setup/installation/manual/from-source.md | 2 + .../components/transforms/aggregate.cue | 188 ++++++++++++++++++ src/transforms/aggregate.rs | 6 +- 3 files changed, 194 insertions(+), 2 deletions(-) create mode 100644 docs/reference/components/transforms/aggregate.cue diff --git a/docs/manual/setup/installation/manual/from-source.md b/docs/manual/setup/installation/manual/from-source.md index 1d8eaccaf14a7..b7e61a3e3066e 100644 --- a/docs/manual/setup/installation/manual/from-source.md +++ b/docs/manual/setup/installation/manual/from-source.md @@ -400,6 +400,7 @@ features one has to pass a comma-separated list of component features. | `sources-vector` | Enables building of [`vector` source][docs.sources.vector]. | | `transforms-add_fields` | Enables building of [`add_fields` transform][docs.transforms.add_fields]. | | `transforms-add_tags` | Enables building of [`add_tags` transform][docs.transforms.add_tags]. | +| `transforms-aggregate` | Enables building of [`add_tags` transform][docs.transforms.add_tags]. | | `transforms-ansi_stripper` | Enables building of [`ansi_stripper` transform][docs.transforms.ansi_stripper]. | | `transforms-aws_cloudwatch_logs_subscription_parser` | Enables building of [`aws_cloudwatch_logs_subscription_parser` transform][docs.transforms.aws_cloudwatch_logs_subscription_parser]. | | `transforms-aws_ec2_metadata` | Enables building of [`aws_ec2_metadata` transform][docs.transforms.aws_ec2_metadata]. | @@ -527,6 +528,7 @@ features one has to pass a comma-separated list of component features. [docs.sources.vector]: /docs/reference/sources/vector/ [docs.transforms.add_fields]: /docs/reference/transforms/add_fields/ [docs.transforms.add_tags]: /docs/reference/transforms/add_tags/ +[docs.transforms.aggregate]: /docs/reference/transforms/aggregate/ [docs.transforms.ansi_stripper]: /docs/reference/transforms/ansi_stripper/ [docs.transforms.aws_cloudwatch_logs_subscription_parser]: /docs/reference/transforms/aws_cloudwatch_logs_subscription_parser/ [docs.transforms.aws_ec2_metadata]: /docs/reference/transforms/aws_ec2_metadata/ diff --git a/docs/reference/components/transforms/aggregate.cue b/docs/reference/components/transforms/aggregate.cue new file mode 100644 index 0000000000000..39a29503ab5d6 --- /dev/null +++ b/docs/reference/components/transforms/aggregate.cue @@ -0,0 +1,188 @@ +package metadata + +components: transforms: aggregate: { + title: "Aggregate" + + description: """ + Aggregates multiple metric events into a single metric event based on a + the MetricKind. Incremental metrics are "added", Absolute uses last + value wins semantics. + """ + + classes: { + commonly_used: false + development: "beta" + egress_method: "stream" + stateful: true + } + + features: { + aggregate: {} + } + + support: { + targets: { + "aarch64-unknown-linux-gnu": true + "aarch64-unknown-linux-musl": true + "armv7-unknown-linux-gnueabihf": true + "armv7-unknown-linux-musleabihf": true + "x86_64-apple-darwin": true + "x86_64-pc-windows-msv": true + "x86_64-unknown-linux-gnu": true + "x86_64-unknown-linux-musl": true + } + requirements: [] + warnings: [] + notices: [] + } + + configuration: { + interval_ms: { + common: true + description: """ + The interval over which metrics are aggregated in milliseconds. Over this period metrics with the + same series data (name, namespace, tags, ...) will be aggregated. + """ + required: false + warnings: [] + type: uint: { + default: 10000 + unit: "milliseconds" + } + } + } + + input: { + logs: false, + metrics: { + counter: true + distribution: true + gauge: true + histogram: true + set: true + summary: true + } + } + + examples: [ + { + title: "Aggregate over 15 seconds" + input: [ + { + metric: { + kind: "incremental" + name: "counter.1" + timestamp: "2021-07-12T07:58:44.223543Z" + tags: { + host: "my.host.com" + } + counter: { + value: 1.1 + } + } + }, + { + metric: { + kind: "incremental" + name: "counter.1" + timestamp: "2021-07-12T07:58:45.223543Z" + tags: { + host: "my.host.com" + } + counter: { + value: 2.2 + } + } + }, + { + metric: { + kind: "incremental" + name: "counter.1" + timestamp: "2021-07-12T07:58:45.223543Z" + tags: { + host: "different.host.com" + } + counter: { + value: 1.1 + } + } + }, + { + metric: { + kind: "absolute" + name: "guage.1" + timestamp: "2021-07-12T07:58:47.223543Z" + tags: { + host: "my.host.com" + } + counter: { + value: 22.33 + } + } + }, + { + metric: { + kind: "absolute" + name: "guage.1" + timestamp: "2021-07-12T07:58:45.223543Z" + tags: { + host: "my.host.com" + } + counter: { + value: 44.55 + } + } + }, + ] + configuration: { + interval_ms: 5000 + } + output: [ + { + metric: { + kind: "incremental" + name: "counter.1" + timestamp: "2021-07-12T07:58:45.223543Z" + tags: { + host: "my.host.com" + } + counter: { + value: 3.3 + } + } + }, + { + metric: { + kind: "incremental" + name: "counter.1" + timestamp: "2021-07-12T07:58:45.223543Z" + tags: { + host: "different.host.com" + } + counter: { + value: 1.1 + } + } + }, + { + metric: { + kind: "absolute" + name: "guage.1" + timestamp: "2021-07-12T07:58:45.223543Z" + tags: { + host: "my.host.com" + } + counter: { + value: 44.55 + } + } + }, + ] + }, + ] + + telemetry: metrics: { + events_recorded_total: components.sources.internal_metrics.output.metrics.events_recorded_total + flushed_total: components.sources.internal_metrics.output.metrics.flushed_total + } +} diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 4f43964f53ca7..e0b430f53057e 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -24,6 +24,7 @@ use std::{ #[derive(Deserialize, Serialize, Debug, Default, Clone)] #[serde(deny_unknown_fields, default)] pub struct AggregateConfig { + /// The interval between flushes in milliseconds. pub interval_ms: Option, } @@ -53,6 +54,7 @@ impl TransformConfig for AggregateConfig { } } +//------------------------------------------------------------------------------ #[derive(Debug)] pub struct Aggregate { @@ -85,7 +87,7 @@ impl Aggregate { match data.kind { metric::MetricKind::Incremental => { match map.get_mut(&series) { - // We already have something, add to it + // We already have something, add to it, will update timestamp as well. Some(existing) => existing.update(data), None => { // New so store @@ -95,7 +97,7 @@ impl Aggregate { }; }, metric::MetricKind::Absolute => { - // Always store + // Always replace/store map.insert(series.clone(), data.clone()); } }; From 35cdb39f89caebdc1d8faa629558a039de7473cb Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Sun, 13 Jun 2021 16:39:11 -0700 Subject: [PATCH 08/35] cue fixes Signed-off-by: Ross McFarland --- .../components/transforms/aggregate.cue | 138 +++++++++--------- 1 file changed, 69 insertions(+), 69 deletions(-) diff --git a/docs/reference/components/transforms/aggregate.cue b/docs/reference/components/transforms/aggregate.cue index 39a29503ab5d6..4a82f4a1ca696 100644 --- a/docs/reference/components/transforms/aggregate.cue +++ b/docs/reference/components/transforms/aggregate.cue @@ -5,8 +5,8 @@ components: transforms: aggregate: { description: """ Aggregates multiple metric events into a single metric event based on a - the MetricKind. Incremental metrics are "added", Absolute uses last - value wins semantics. + the MetricKind. Incremental metrics are "added", Absolute uses last + value wins semantics. """ classes: { @@ -40,8 +40,8 @@ components: transforms: aggregate: { interval_ms: { common: true description: """ - The interval over which metrics are aggregated in milliseconds. Over this period metrics with the - same series data (name, namespace, tags, ...) will be aggregated. + The interval over which metrics are aggregated in milliseconds. Over this period metrics with the + same series data (name, namespace, tags, ...) will be aggregated. """ required: false warnings: [] @@ -70,111 +70,111 @@ components: transforms: aggregate: { input: [ { metric: { - kind: "incremental" - name: "counter.1" + kind: "incremental" + name: "counter.1" timestamp: "2021-07-12T07:58:44.223543Z" - tags: { - host: "my.host.com" - } - counter: { - value: 1.1 - } + tags: { + host: "my.host.com" + } + counter: { + value: 1.1 + } } }, { metric: { - kind: "incremental" - name: "counter.1" + kind: "incremental" + name: "counter.1" timestamp: "2021-07-12T07:58:45.223543Z" - tags: { - host: "my.host.com" - } - counter: { - value: 2.2 - } + tags: { + host: "my.host.com" + } + counter: { + value: 2.2 + } } }, { metric: { - kind: "incremental" - name: "counter.1" + kind: "incremental" + name: "counter.1" timestamp: "2021-07-12T07:58:45.223543Z" - tags: { - host: "different.host.com" - } - counter: { - value: 1.1 - } + tags: { + host: "different.host.com" + } + counter: { + value: 1.1 + } } }, { metric: { - kind: "absolute" - name: "guage.1" + kind: "absolute" + name: "guage.1" timestamp: "2021-07-12T07:58:47.223543Z" - tags: { - host: "my.host.com" - } - counter: { - value: 22.33 - } + tags: { + host: "my.host.com" + } + counter: { + value: 22.33 + } } }, { metric: { - kind: "absolute" - name: "guage.1" + kind: "absolute" + name: "guage.1" timestamp: "2021-07-12T07:58:45.223543Z" - tags: { - host: "my.host.com" - } - counter: { - value: 44.55 - } + tags: { + host: "my.host.com" + } + counter: { + value: 44.55 + } } }, ] configuration: { - interval_ms: 5000 + interval_ms: 5000 } output: [ { metric: { - kind: "incremental" - name: "counter.1" + kind: "incremental" + name: "counter.1" timestamp: "2021-07-12T07:58:45.223543Z" - tags: { - host: "my.host.com" - } - counter: { - value: 3.3 - } + tags: { + host: "my.host.com" + } + counter: { + value: 3.3 + } } }, { metric: { - kind: "incremental" - name: "counter.1" + kind: "incremental" + name: "counter.1" timestamp: "2021-07-12T07:58:45.223543Z" - tags: { - host: "different.host.com" - } - counter: { - value: 1.1 - } + tags: { + host: "different.host.com" + } + counter: { + value: 1.1 + } } }, { metric: { - kind: "absolute" - name: "guage.1" + kind: "absolute" + name: "guage.1" timestamp: "2021-07-12T07:58:45.223543Z" - tags: { - host: "my.host.com" - } - counter: { - value: 44.55 - } + tags: { + host: "my.host.com" + } + counter: { + value: 44.55 + } } }, ] From 9f3c2b9d5f66308b33a37b55072cd43ad9ea276f Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Mon, 14 Jun 2021 10:27:38 -0700 Subject: [PATCH 09/35] Remove , and fix tab/spaces in aggregate cue doc Signed-off-by: Ross McFarland --- docs/reference/components/transforms/aggregate.cue | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/reference/components/transforms/aggregate.cue b/docs/reference/components/transforms/aggregate.cue index 4a82f4a1ca696..b300efe433d4e 100644 --- a/docs/reference/components/transforms/aggregate.cue +++ b/docs/reference/components/transforms/aggregate.cue @@ -53,7 +53,7 @@ components: transforms: aggregate: { } input: { - logs: false, + logs: false metrics: { counter: true distribution: true @@ -182,7 +182,7 @@ components: transforms: aggregate: { ] telemetry: metrics: { - events_recorded_total: components.sources.internal_metrics.output.metrics.events_recorded_total - flushed_total: components.sources.internal_metrics.output.metrics.flushed_total + events_recorded_total: components.sources.internal_metrics.output.metrics.events_recorded_total + flushed_total: components.sources.internal_metrics.output.metrics.flushed_total } } From e15996118b00eef7225bb26e3a98b669cfb94243 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Mon, 14 Jun 2021 13:34:36 -0700 Subject: [PATCH 10/35] Fix aggregate/add_tags copy-n-paste error Signed-off-by: Ross McFarland --- docs/manual/setup/installation/manual/from-source.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/manual/setup/installation/manual/from-source.md b/docs/manual/setup/installation/manual/from-source.md index b7e61a3e3066e..806703f7cedb3 100644 --- a/docs/manual/setup/installation/manual/from-source.md +++ b/docs/manual/setup/installation/manual/from-source.md @@ -400,7 +400,7 @@ features one has to pass a comma-separated list of component features. | `sources-vector` | Enables building of [`vector` source][docs.sources.vector]. | | `transforms-add_fields` | Enables building of [`add_fields` transform][docs.transforms.add_fields]. | | `transforms-add_tags` | Enables building of [`add_tags` transform][docs.transforms.add_tags]. | -| `transforms-aggregate` | Enables building of [`add_tags` transform][docs.transforms.add_tags]. | +| `transforms-aggregate` | Enables building of [`aggregate` transform][docs.transforms.aggregate]. | | `transforms-ansi_stripper` | Enables building of [`ansi_stripper` transform][docs.transforms.ansi_stripper]. | | `transforms-aws_cloudwatch_logs_subscription_parser` | Enables building of [`aws_cloudwatch_logs_subscription_parser` transform][docs.transforms.aws_cloudwatch_logs_subscription_parser]. | | `transforms-aws_ec2_metadata` | Enables building of [`aws_ec2_metadata` transform][docs.transforms.aws_ec2_metadata]. | From f3118acedada401eb4749d053d8cb209c377d513 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Mon, 14 Jun 2021 13:37:18 -0700 Subject: [PATCH 11/35] More aggregate cue fixes, this time from CI=true make check-docs Signed-off-by: Ross McFarland --- docs/reference/components/transforms/aggregate.cue | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/reference/components/transforms/aggregate.cue b/docs/reference/components/transforms/aggregate.cue index b300efe433d4e..40f660ed302d0 100644 --- a/docs/reference/components/transforms/aggregate.cue +++ b/docs/reference/components/transforms/aggregate.cue @@ -53,7 +53,7 @@ components: transforms: aggregate: { } input: { - logs: false + logs: false metrics: { counter: true distribution: true @@ -183,6 +183,6 @@ components: transforms: aggregate: { telemetry: metrics: { events_recorded_total: components.sources.internal_metrics.output.metrics.events_recorded_total - flushed_total: components.sources.internal_metrics.output.metrics.flushed_total + flushed_total: components.sources.internal_metrics.output.metrics.flushed_total } } From 1da1e06ba62e0b97200946bfbb3864e1b9416a7d Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Tue, 15 Jun 2021 10:50:33 -0700 Subject: [PATCH 12/35] Use tokio::time::advance to force interval in aggregate test Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index e0b430f53057e..3cb110117a71c 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -325,7 +325,7 @@ mod tests { async fn transform() { let agg = toml::from_str::( r#" -interval_ms = 10 +interval_ms = 10000 "#, ) .unwrap() @@ -351,6 +351,11 @@ interval_ms = 10 let in_stream = Box::pin(stream::iter(inputs)); let mut out_stream = agg.transform(in_stream); + tokio::time::pause(); + // 1s longer than our timeout configured up above + tokio::time::advance(Duration::from_secs(11)).await; + tokio::time::resume(); + let mut count = 0; while let Some(event) = out_stream.next().await { count += 1; From dda9ab6ff7649f5504f47bb9b8405e71177fda8b Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Tue, 15 Jun 2021 13:30:13 -0700 Subject: [PATCH 13/35] aggregate single map Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 27 ++++----------------------- 1 file changed, 4 insertions(+), 23 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 3cb110117a71c..74a66c813210d 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -59,18 +59,14 @@ impl TransformConfig for AggregateConfig { #[derive(Debug)] pub struct Aggregate { interval: Duration, - map_a: HashMap, - map_b: HashMap, - using_b: bool, + map: HashMap, } impl Aggregate { pub fn new(config: &AggregateConfig) -> crate::Result { Ok(Self { interval: Duration::from_millis(config.interval_ms.unwrap_or(10 * 1000)), - map_a: HashMap::new(), - map_b: HashMap::new(), - using_b: false, + map: HashMap::new(), }) } @@ -78,11 +74,7 @@ impl Aggregate { let metric = event.as_metric(); let series = metric.series(); let data = metric.data(); - - let map = match self.using_b { - true => &mut self.map_b, - false => &mut self.map_a, - }; + let map = &mut self.map; match data.kind { metric::MetricKind::Incremental => { @@ -107,18 +99,7 @@ impl Aggregate { fn flush_into(&mut self, output: &mut Vec) -> u64 { let mut count = 0_u64; - - // TODO: locking/safety etc... we can also only have 1 call into flush at a time - let map = match self.using_b { - true => { - self.using_b = false; - &mut self.map_b - }, - false => { - self.using_b = true; - &mut self.map_a - }, - }; + let map = &mut self.map; if map.len() > 0 { // TODO: not clear how this should work with aggregation so just stuffing a default one From 7b73e64ee4389686800872cfac060ecfeffac849 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Tue, 15 Jun 2021 18:07:04 -0700 Subject: [PATCH 14/35] Full testing of aggregate flushing via shutdown and tick Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 107 ++++++++++++++++++++++++++++++++---- 1 file changed, 97 insertions(+), 10 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 74a66c813210d..0287d148cb4bc 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -166,8 +166,13 @@ impl TaskTransform for Aggregate { #[cfg(test)] mod tests { use super::*; - use std::collections::BTreeMap; use crate::{event::metric, event::Event, event::Metric}; + use futures::SinkExt; + use std::{ + collections::BTreeMap, + task::Poll, + }; + use tokio::task::yield_now; #[test] fn generate_config() { @@ -303,16 +308,17 @@ mod tests { } #[tokio::test] - async fn transform() { + async fn transform_shutdown() { let agg = toml::from_str::( r#" -interval_ms = 10000 +interval_ms = 999999 "#, ) .unwrap() .build(&GlobalOptions::default()) .await .unwrap(); + let agg = agg.into_task(); let tags: BTreeMap = @@ -329,15 +335,15 @@ interval_ms = 10000 metric::MetricValue::Gauge { value: 43.0 }, tags.clone()); let inputs = vec![counter_a_1, counter_a_2, gauge_a_1, gauge_a_2.clone()]; + // Queue up some events to be consummed & recorded let in_stream = Box::pin(stream::iter(inputs)); + // Kick off the transform process which should consume & record them let mut out_stream = agg.transform(in_stream); - tokio::time::pause(); - // 1s longer than our timeout configured up above - tokio::time::advance(Duration::from_secs(11)).await; - tokio::time::resume(); - - let mut count = 0; + // B/c the input stream has ended we will have gone through the `input_rx.next() => None` + // part of the loop and do the shutting down final flush immediately. We'll already be able + // to read our expected bits on the output. + let mut count = 0_u8; while let Some(event) = out_stream.next().await { count += 1; match event.as_metric().series().name.name.as_str() { @@ -346,8 +352,89 @@ interval_ms = 10000 _ => assert!(false), }; } - // There were only 2 assert_eq!(2, count); } + + #[tokio::test] + async fn transform_interval() { + let agg = toml::from_str::( + r#" +interval_ms = 10000 +"#, + ) + .unwrap() + .build(&GlobalOptions::default()) + .await + .unwrap(); + + let agg = agg.into_task(); + + let tags: BTreeMap = + vec![("tag1".into(), "val1".into())].into_iter().collect(); + let counter_a_1 = make_metric("counter_a", metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 42.0 }, tags.clone()); + let counter_a_2 = make_metric("counter_a", metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 43.0 }, tags.clone()); + let counter_a_summed = make_metric("counter_a", metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 85.0 }, tags.clone()); + let gauge_a_1 = make_metric("gauge_a", metric::MetricKind::Absolute, + metric::MetricValue::Gauge { value: 42.0 }, tags.clone()); + let gauge_a_2 = make_metric("gauge_a", metric::MetricKind::Absolute, + metric::MetricValue::Gauge { value: 43.0 }, tags.clone()); + + let (mut tx, rx) = futures::channel::mpsc::channel(10); + let mut out_stream = agg.transform(Box::pin(rx)); + + // Don't advance time + tokio::time::pause(); + + // Yeild so our first (at t0) tick can happen and see nothing + yield_now().await; + + // Send our events + tx.send(counter_a_1.into()).await.unwrap(); + tx.send(counter_a_2.into()).await.unwrap(); + tx.send(gauge_a_1.into()).await.unwrap(); + tx.send(gauge_a_2.clone().into()).await.unwrap(); + + // Give things a chance to run, flush shouldn't trigger, but give it an opportunity + yield_now().await; + + // We won't have flushed yet b/c the interval hasn't elapsed, so no outputs + assert_eq!(Poll::Pending, futures::poll!(out_stream.next())); + + // Now fast foward time enough that our flush should triggered. + yield_now().await; + tokio::time::advance(Duration::from_secs(11)).await; + tokio::time::resume(); + yield_now().await; + + // B/c the input stream has ended we will have gone through the `input_rx.next() => None` + // part of the loop and do the shutting down final flush immediately. We'll already be able + // to read our expected bits on the output. + let mut count = 0_u8; + while count < 2 { + if let Some(event) = out_stream.next().await { + match event.as_metric().series().name.name.as_str() { + "counter_a" => assert_eq!(counter_a_summed, event), + "gauge_a" => assert_eq!(gauge_a_2, event), + _ => assert!(false), + }; + count += 1; + } else { + assert!(false); + } + } + + // We should be back to pending, having nothing waiting for us + assert_eq!(Poll::Pending, futures::poll!(out_stream.next())); + + // Close the input stream which should trigger the shutting down flush + assert!(tx.close().await.is_ok()); + // Give the flush a chance to run + yield_now().await; + // And still nothing there + assert_eq!(Poll::Ready(None), futures::poll!(out_stream.next())); + } } From a4363545f74c435e7c8ab0a1168abc819b5fc698 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Wed, 16 Jun 2021 18:25:35 -0700 Subject: [PATCH 15/35] aggregate: remove local map ref vars Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 0287d148cb4bc..18b2acae04c1d 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -74,23 +74,22 @@ impl Aggregate { let metric = event.as_metric(); let series = metric.series(); let data = metric.data(); - let map = &mut self.map; match data.kind { metric::MetricKind::Incremental => { - match map.get_mut(&series) { + match self.map.get_mut(&series) { // We already have something, add to it, will update timestamp as well. Some(existing) => existing.update(data), None => { // New so store - map.insert(series.clone(), data.clone()); + self.map.insert(series.clone(), data.clone()); true } }; }, metric::MetricKind::Absolute => { // Always replace/store - map.insert(series.clone(), data.clone()); + self.map.insert(series.clone(), data.clone()); } }; @@ -99,14 +98,13 @@ impl Aggregate { fn flush_into(&mut self, output: &mut Vec) -> u64 { let mut count = 0_u64; - let map = &mut self.map; - if map.len() > 0 { + if self.map.len() > 0 { // TODO: not clear how this should work with aggregation so just stuffing a default one // in for now. let metadata = EventMetadata::default(); - for (series, metric) in map.drain() { + for (series, metric) in self.map.drain() { let metric = metric::Metric::from_parts(series, metric, metadata.clone()); output.push(Event::Metric(metric)); count += 1; From 2b498bf3e01aff5565e252961c18e09bef6b773d Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Wed, 16 Jun 2021 18:31:06 -0700 Subject: [PATCH 16/35] guage -> gauge Signed-off-by: Ross McFarland --- docs/reference/components/transforms/aggregate.cue | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/reference/components/transforms/aggregate.cue b/docs/reference/components/transforms/aggregate.cue index 40f660ed302d0..77646ce1a80fe 100644 --- a/docs/reference/components/transforms/aggregate.cue +++ b/docs/reference/components/transforms/aggregate.cue @@ -110,7 +110,7 @@ components: transforms: aggregate: { { metric: { kind: "absolute" - name: "guage.1" + name: "gauge.1" timestamp: "2021-07-12T07:58:47.223543Z" tags: { host: "my.host.com" @@ -123,7 +123,7 @@ components: transforms: aggregate: { { metric: { kind: "absolute" - name: "guage.1" + name: "gauge.1" timestamp: "2021-07-12T07:58:45.223543Z" tags: { host: "my.host.com" @@ -167,7 +167,7 @@ components: transforms: aggregate: { { metric: { kind: "absolute" - name: "guage.1" + name: "gauge.1" timestamp: "2021-07-12T07:58:45.223543Z" tags: { host: "my.host.com" From 7b1e99b5e91613645e0a1562847c13e1143b93af Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Wed, 16 Jun 2021 18:33:11 -0700 Subject: [PATCH 17/35] Remove note about build vim var Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 18b2acae04c1d..46baf347b8705 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -1,4 +1,3 @@ -// let g:cargo_makeprg_params = = '--lib --no-default-features --features=transforms-aggregate transforms::aggregate' use crate::{ internal_events::{AggregateEventRecorded, AggregateFlushed}, transforms::{ From a28ab3cf68ede04602e5de6e33a2cc4e61eb447b Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Wed, 16 Jun 2021 18:43:48 -0700 Subject: [PATCH 18/35] Remove yield_now's and other small tweaks to aggregate interval test Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 32 +++++++++----------------------- 1 file changed, 9 insertions(+), 23 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 46baf347b8705..57c1d436b2ba7 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -169,7 +169,6 @@ mod tests { collections::BTreeMap, task::Poll, }; - use tokio::task::yield_now; #[test] fn generate_config() { @@ -383,33 +382,23 @@ interval_ms = 10000 let (mut tx, rx) = futures::channel::mpsc::channel(10); let mut out_stream = agg.transform(Box::pin(rx)); - // Don't advance time tokio::time::pause(); - // Yeild so our first (at t0) tick can happen and see nothing - yield_now().await; + // tokio interval is always immediately ready, so we poll once to make sure + // we trip it/set the interval in the future + assert_eq!(Poll::Pending, futures::poll!(out_stream.next())); - // Send our events + // Now send our events tx.send(counter_a_1.into()).await.unwrap(); tx.send(counter_a_2.into()).await.unwrap(); tx.send(gauge_a_1.into()).await.unwrap(); tx.send(gauge_a_2.clone().into()).await.unwrap(); - - // Give things a chance to run, flush shouldn't trigger, but give it an opportunity - yield_now().await; - // We won't have flushed yet b/c the interval hasn't elapsed, so no outputs assert_eq!(Poll::Pending, futures::poll!(out_stream.next())); - - // Now fast foward time enough that our flush should triggered. - yield_now().await; + // Now fast foward time enough that our flush should trigger. tokio::time::advance(Duration::from_secs(11)).await; - tokio::time::resume(); - yield_now().await; - - // B/c the input stream has ended we will have gone through the `input_rx.next() => None` - // part of the loop and do the shutting down final flush immediately. We'll already be able - // to read our expected bits on the output. + // We should have had an interval fire now and our output aggregate events should be + // available. let mut count = 0_u8; while count < 2 { if let Some(event) = out_stream.next().await { @@ -423,14 +412,11 @@ interval_ms = 10000 assert!(false); } } - // We should be back to pending, having nothing waiting for us assert_eq!(Poll::Pending, futures::poll!(out_stream.next())); - // Close the input stream which should trigger the shutting down flush - assert!(tx.close().await.is_ok()); - // Give the flush a chance to run - yield_now().await; + tx.disconnect(); + // And still nothing there assert_eq!(Poll::Ready(None), futures::poll!(out_stream.next())); } From 9e9e431ec5d532a8101812f6408dff61de47f6a1 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Wed, 16 Jun 2021 18:47:15 -0700 Subject: [PATCH 19/35] aggregate: rework of transform stream loop Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 57c1d436b2ba7..016b5752be8fd 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -131,31 +131,28 @@ impl TaskTransform for Aggregate { Box::pin( stream! { - loop { - let mut output = Vec::new(); - let done = tokio::select! { + let mut output = Vec::new(); + let mut done = false; + while !done { + tokio::select! { _ = flush_stream.tick() => { me.flush_into(&mut output); - false - } + }, maybe_event = input_rx.next() => { match maybe_event { None => { me.flush_into(&mut output); - true - } - Some(event) => { - me.record(event); - false + done = true; } + Some(event) => me.record(event), } } }; - yield stream::iter(output.into_iter()); - if done { break } + for event in output.drain(..) { + yield event; + } } } - .flatten(), ) } } From 233576714075d0f18ec2de5e11f4a90d16d2f7e4 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Fri, 18 Jun 2021 14:36:02 -0700 Subject: [PATCH 20/35] Provide serde default for aggregate interval_ms Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 016b5752be8fd..4ab807db7043d 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -24,7 +24,12 @@ use std::{ #[serde(deny_unknown_fields, default)] pub struct AggregateConfig { /// The interval between flushes in milliseconds. - pub interval_ms: Option, + #[serde(default = "default_interval_ms")] + pub interval_ms: u64, +} + +fn default_interval_ms() -> u64 { + 10 * 1000 } inventory::submit! { @@ -64,7 +69,7 @@ pub struct Aggregate { impl Aggregate { pub fn new(config: &AggregateConfig) -> crate::Result { Ok(Self { - interval: Duration::from_millis(config.interval_ms.unwrap_or(10 * 1000)), + interval: Duration::from_millis(config.interval_ms), map: HashMap::new(), }) } @@ -190,7 +195,7 @@ mod tests { #[test] fn incremental() { - let mut agg = Aggregate::new(&AggregateConfig { interval_ms: Some(1000_u64) }).unwrap(); + let mut agg = Aggregate::new(&AggregateConfig { interval_ms: 1000_u64 }).unwrap(); let tags: BTreeMap = vec![("tag1".into(), "val1".into())].into_iter().collect(); @@ -247,7 +252,7 @@ mod tests { #[test] fn absolute() { - let mut agg = Aggregate::new(&AggregateConfig { interval_ms: Some(1000_u64) }).unwrap(); + let mut agg = Aggregate::new(&AggregateConfig { interval_ms: 1000_u64 }).unwrap(); let tags: BTreeMap = vec![("tag1".into(), "val1".into())].into_iter().collect(); @@ -353,7 +358,6 @@ interval_ms = 999999 async fn transform_interval() { let agg = toml::from_str::( r#" -interval_ms = 10000 "#, ) .unwrap() From 62f3c7a863d85bbc2a3c6e2a6fc67a07156b4d9c Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Fri, 18 Jun 2021 15:50:25 -0700 Subject: [PATCH 21/35] aggregate use into_metric and into_parts to avoid clones Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 4ab807db7043d..2ad95f2383b50 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -75,25 +75,23 @@ impl Aggregate { } fn record(&mut self, event: Event) { - let metric = event.as_metric(); - let series = metric.series(); - let data = metric.data(); + let (series, data, _metadata) = event.into_metric().into_parts(); match data.kind { metric::MetricKind::Incremental => { match self.map.get_mut(&series) { // We already have something, add to it, will update timestamp as well. - Some(existing) => existing.update(data), + Some(existing) => existing.update(&data), None => { // New so store - self.map.insert(series.clone(), data.clone()); + self.map.insert(series, data); true } }; }, metric::MetricKind::Absolute => { // Always replace/store - self.map.insert(series.clone(), data.clone()); + self.map.insert(series, data); } }; From c57695c50919a3176d9c1c4d3bf55b8cdce7412c Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Fri, 18 Jun 2021 15:56:54 -0700 Subject: [PATCH 22/35] Rework aggregate description and add how_it_works section with details. Fix internal events. Signed-off-by: Ross McFarland --- docs/reference/components.cue | 22 ++++++----- .../components/sources/internal_metrics.cue | 12 ++++++ .../components/transforms/aggregate.cue | 38 ++++++++++++++++--- src/internal_events/aggregate.rs | 4 +- 4 files changed, 60 insertions(+), 16 deletions(-) diff --git a/docs/reference/components.cue b/docs/reference/components.cue index 297673dd78fe9..660d8c5216cce 100644 --- a/docs/reference/components.cue +++ b/docs/reference/components.cue @@ -173,15 +173,16 @@ components: { } if Args.kind == "transform" { - convert?: #FeaturesConvert - enrich?: #FeaturesEnrich - filter?: #FeaturesFilter - parse?: #FeaturesParse - program?: #FeaturesProgram - reduce?: #FeaturesReduce - route?: #FeaturesRoute - sanitize?: #FeaturesSanitize - shape?: #FeaturesShape + aggregate?: #FeaturesAggregate + convert?: #FeaturesConvert + enrich?: #FeaturesEnrich + filter?: #FeaturesFilter + parse?: #FeaturesParse + program?: #FeaturesProgram + reduce?: #FeaturesReduce + route?: #FeaturesRoute + sanitize?: #FeaturesSanitize + shape?: #FeaturesShape } if Args.kind == "sink" { @@ -202,6 +203,9 @@ components: { descriptions: [Name=string]: string } + #FeaturesAggregate: { + } + #FeaturesCollect: { checkpoint: { enabled: bool diff --git a/docs/reference/components/sources/internal_metrics.cue b/docs/reference/components/sources/internal_metrics.cue index ec4be638b28c0..f45ecf3243235 100644 --- a/docs/reference/components/sources/internal_metrics.cue +++ b/docs/reference/components/sources/internal_metrics.cue @@ -73,6 +73,18 @@ components: sources: internal_metrics: { } // Instance-level "process" metrics + aggregate_events_recorded_total: { + description: "The number of events recorded by the aggregate transform." + type: "counter" + default_namespace: "vector" + tags: _component_tags + } + aggregate_flushes_total: { + description: "The number of flushes done by the aggregate transform." + type: "counter" + default_namespace: "vector" + tags: _component_tags + } api_started_total: { description: "The number of times the Vector GraphQL API has been started." type: "counter" diff --git a/docs/reference/components/transforms/aggregate.cue b/docs/reference/components/transforms/aggregate.cue index 77646ce1a80fe..a825348c3cc9b 100644 --- a/docs/reference/components/transforms/aggregate.cue +++ b/docs/reference/components/transforms/aggregate.cue @@ -4,9 +4,9 @@ components: transforms: aggregate: { title: "Aggregate" description: """ - Aggregates multiple metric events into a single metric event based on a - the MetricKind. Incremental metrics are "added", Absolute uses last - value wins semantics. + Aggregates multiple metric events into a single metric event based + on a defined interval window. This helps to reduce metric volume at + the cost of granularity. """ classes: { @@ -181,8 +181,36 @@ components: transforms: aggregate: { }, ] + how_it_works: { + aggregation_behavior: { + title: "Aggregation Behavior" + body: """ + Metrics are aggregated based their kind. During an interval `incremental` metrics + are "added" and `absolute` metrics use most recent value wins semantics. This results in a reduction + of volume, less granularity, while maintaining numerical correctness. As an example two + `incremental` `counter`s with values 10 and 13 processed by the transform during a period would be + aggregated into a single `incremental` `counter` with a value of 23. Two `absolute` `gauge`s with + values 93 and 95 would result in a single `absolute` `gauge` with the value of 95. More complex + types like `distribution`, `histogram`, `set`, and `summary` behave similarly with `incremental` + values being combined in a manner that makes sense based on their type. + """ + } + + advantages: { + title: "Advantages of Use" + body: """ + The major advantage to aggregation is the reduction of volume. It may reduce costs + directly in situations that charge by metric event volume, or indirectly by requiring less CPU to + process and/or less network bandwidth to transmit and receive. In systems that are constrained by + the processing required to ingest metric events it may help to reduce the processing overhead. This + may apply to transforms and sinks downstream of the aggregate transform as well. + """ + } + + } + telemetry: metrics: { - events_recorded_total: components.sources.internal_metrics.output.metrics.events_recorded_total - flushed_total: components.sources.internal_metrics.output.metrics.flushed_total + aggregate_events_recorded_total: components.sources.internal_metrics.output.metrics.aggregate_events_recorded_total + aggregate_flushes_total: components.sources.internal_metrics.output.metrics.aggregate_flushes_total } } diff --git a/src/internal_events/aggregate.rs b/src/internal_events/aggregate.rs index 2e720715683a6..3826623dfc61d 100644 --- a/src/internal_events/aggregate.rs +++ b/src/internal_events/aggregate.rs @@ -6,7 +6,7 @@ pub struct AggregateEventRecorded; impl InternalEvent for AggregateEventRecorded { fn emit_metrics(&self) { - counter!("events_recorded_total", 1); + counter!("aggregate_events_recorded_total", 1); } } @@ -15,6 +15,6 @@ pub struct AggregateFlushed; impl InternalEvent for AggregateFlushed { fn emit_metrics(&self) { - counter!("flushed_total", 1); + counter!("aggregate_flushes_total", 1); } } From c9da9a43ef23e23e52a9a9e047b81519a3d7595b Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Fri, 18 Jun 2021 16:20:14 -0700 Subject: [PATCH 23/35] Remove count return from flush_into Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 2ad95f2383b50..baf5d5a1e6582 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -98,9 +98,7 @@ impl Aggregate { emit!(AggregateEventRecorded); } - fn flush_into(&mut self, output: &mut Vec) -> u64 { - let mut count = 0_u64; - + fn flush_into(&mut self, output: &mut Vec) { if self.map.len() > 0 { // TODO: not clear how this should work with aggregation so just stuffing a default one // in for now. @@ -109,12 +107,10 @@ impl Aggregate { for (series, metric) in self.map.drain() { let metric = metric::Metric::from_parts(series, metric, metadata.clone()); output.push(Event::Metric(metric)); - count += 1; } } emit!(AggregateFlushed); - return count; } } @@ -208,25 +204,25 @@ mod tests { agg.record(counter_a_1.clone()); let mut out = vec![]; // We should flush 1 item counter_a_1 - assert_eq!(1, agg.flush_into(&mut out)); + agg.flush_into(&mut out); assert_eq!(1, out.len()); assert_eq!(&counter_a_1, &out[0]); // A subsequent flush doesn't send out anything out.clear(); - assert_eq!(0, agg.flush_into(&mut out)); + agg.flush_into(&mut out); assert_eq!(0, out.len()); // One more just to make sure that we don't re-see from the other buffer out.clear(); - assert_eq!(0, agg.flush_into(&mut out)); + agg.flush_into(&mut out); assert_eq!(0, out.len()); // Two increments with the same series, should sum into 1 agg.record(counter_a_1.clone()); agg.record(counter_a_2.clone()); out.clear(); - assert_eq!(1, agg.flush_into(&mut out)); + agg.flush_into(&mut out); assert_eq!(1, out.len()); assert_eq!(&counter_a_summed, &out[0]); @@ -236,7 +232,7 @@ mod tests { agg.record(counter_a_1.clone()); agg.record(counter_b_1.clone()); out.clear(); - assert_eq!(2, agg.flush_into(&mut out)); + agg.flush_into(&mut out); assert_eq!(2, out.len()); // B/c we don't know the order they'll come back for event in out { @@ -263,25 +259,25 @@ mod tests { agg.record(gauge_a_1.clone()); let mut out = vec![]; // We should flush 1 item gauge_a_1 - assert_eq!(1, agg.flush_into(&mut out)); + agg.flush_into(&mut out); assert_eq!(1, out.len()); assert_eq!(&gauge_a_1, &out[0]); // A subsequent flush doesn't send out anything out.clear(); - assert_eq!(0, agg.flush_into(&mut out)); + agg.flush_into(&mut out); assert_eq!(0, out.len()); // One more just to make sure that we don't re-see from the other buffer out.clear(); - assert_eq!(0, agg.flush_into(&mut out)); + agg.flush_into(&mut out); assert_eq!(0, out.len()); // Two absolutes with the same series, should get the 2nd (last) back. agg.record(gauge_a_1.clone()); agg.record(gauge_a_2.clone()); out.clear(); - assert_eq!(1, agg.flush_into(&mut out)); + agg.flush_into(&mut out); assert_eq!(1, out.len()); assert_eq!(&gauge_a_2, &out[0]); @@ -291,7 +287,7 @@ mod tests { agg.record(gauge_a_1.clone()); agg.record(gauge_b_1.clone()); out.clear(); - assert_eq!(2, agg.flush_into(&mut out)); + agg.flush_into(&mut out); assert_eq!(2, out.len()); // B/c we don't know the order they'll come back for event in out { From 07a9299cc7ae4a560c36029484449eb27c2130a6 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Fri, 18 Jun 2021 16:27:51 -0700 Subject: [PATCH 24/35] Ditch me = mut self pattern Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index baf5d5a1e6582..937eb360f87db 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -116,17 +116,13 @@ impl Aggregate { impl TaskTransform for Aggregate { fn transform( - self: Box, + mut self: Box, mut input_rx: Pin + Send>>, ) -> Pin + Send>> where Self: 'static, { - let mut me = self; - - let interval = me.interval; - - let mut flush_stream = tokio::time::interval(interval); + let mut flush_stream = tokio::time::interval(self.interval); Box::pin( stream! { @@ -135,15 +131,15 @@ impl TaskTransform for Aggregate { while !done { tokio::select! { _ = flush_stream.tick() => { - me.flush_into(&mut output); + self.flush_into(&mut output); }, maybe_event = input_rx.next() => { match maybe_event { None => { - me.flush_into(&mut output); + self.flush_into(&mut output); done = true; } - Some(event) => me.record(event), + Some(event) => self.record(event), } } }; From 9651655330c738edf2273a7a5463211b7adce372 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Fri, 18 Jun 2021 16:34:11 -0700 Subject: [PATCH 25/35] Remove tags from tests Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 49 ++++++++++++++----------------------- 1 file changed, 18 insertions(+), 31 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 937eb360f87db..0debee6919e79 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -157,10 +157,7 @@ mod tests { use super::*; use crate::{event::metric, event::Event, event::Metric}; use futures::SinkExt; - use std::{ - collections::BTreeMap, - task::Poll, - }; + use std::task::Poll; #[test] fn generate_config() { @@ -171,7 +168,6 @@ mod tests { name: &'static str, kind: metric::MetricKind, value: metric::MetricValue, - tags: BTreeMap, ) -> Event { Event::Metric( Metric::new( @@ -179,7 +175,6 @@ mod tests { kind, value, ) - .with_tags(Some(tags)), ) } @@ -187,14 +182,12 @@ mod tests { fn incremental() { let mut agg = Aggregate::new(&AggregateConfig { interval_ms: 1000_u64 }).unwrap(); - let tags: BTreeMap = - vec![("tag1".into(), "val1".into())].into_iter().collect(); let counter_a_1 = make_metric("counter_a", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 42.0 }, tags.clone()); + metric::MetricValue::Counter { value: 42.0 }); let counter_a_2 = make_metric("counter_a", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 43.0 }, tags.clone()); + metric::MetricValue::Counter { value: 43.0 }); let counter_a_summed = make_metric("counter_a", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 85.0 }, tags.clone()); + metric::MetricValue::Counter { value: 85.0 }); // Single item, just stored regardless of kind agg.record(counter_a_1.clone()); @@ -223,7 +216,7 @@ mod tests { assert_eq!(&counter_a_summed, &out[0]); let counter_b_1 = make_metric("counter_b", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 44.0 }, tags.clone()); + metric::MetricValue::Counter { value: 44.0 }); // Two increments with the different series, should get each back as-is agg.record(counter_a_1.clone()); agg.record(counter_b_1.clone()); @@ -244,12 +237,10 @@ mod tests { fn absolute() { let mut agg = Aggregate::new(&AggregateConfig { interval_ms: 1000_u64 }).unwrap(); - let tags: BTreeMap = - vec![("tag1".into(), "val1".into())].into_iter().collect(); let gauge_a_1 = make_metric("gauge_a", metric::MetricKind::Absolute, - metric::MetricValue::Gauge { value: 42.0 }, tags.clone()); + metric::MetricValue::Gauge { value: 42.0 }); let gauge_a_2 = make_metric("gauge_a", metric::MetricKind::Absolute, - metric::MetricValue::Gauge { value: 43.0 }, tags.clone()); + metric::MetricValue::Gauge { value: 43.0 }); // Single item, just stored regardless of kind agg.record(gauge_a_1.clone()); @@ -278,7 +269,7 @@ mod tests { assert_eq!(&gauge_a_2, &out[0]); let gauge_b_1 = make_metric("gauge_b", metric::MetricKind::Absolute, - metric::MetricValue::Gauge { value: 44.0 }, tags.clone()); + metric::MetricValue::Gauge { value: 44.0 }); // Two increments with the different series, should get each back as-is agg.record(gauge_a_1.clone()); agg.record(gauge_b_1.clone()); @@ -309,18 +300,16 @@ interval_ms = 999999 let agg = agg.into_task(); - let tags: BTreeMap = - vec![("tag1".into(), "val1".into())].into_iter().collect(); let counter_a_1 = make_metric("counter_a", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 42.0 }, tags.clone()); + metric::MetricValue::Counter { value: 42.0 }); let counter_a_2 = make_metric("counter_a", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 43.0 }, tags.clone()); + metric::MetricValue::Counter { value: 43.0 }); let counter_a_summed = make_metric("counter_a", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 85.0 }, tags.clone()); + metric::MetricValue::Counter { value: 85.0 }); let gauge_a_1 = make_metric("gauge_a", metric::MetricKind::Absolute, - metric::MetricValue::Gauge { value: 42.0 }, tags.clone()); + metric::MetricValue::Gauge { value: 42.0 }); let gauge_a_2 = make_metric("gauge_a", metric::MetricKind::Absolute, - metric::MetricValue::Gauge { value: 43.0 }, tags.clone()); + metric::MetricValue::Gauge { value: 43.0 }); let inputs = vec![counter_a_1, counter_a_2, gauge_a_1, gauge_a_2.clone()]; // Queue up some events to be consummed & recorded @@ -357,18 +346,16 @@ interval_ms = 999999 let agg = agg.into_task(); - let tags: BTreeMap = - vec![("tag1".into(), "val1".into())].into_iter().collect(); let counter_a_1 = make_metric("counter_a", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 42.0 }, tags.clone()); + metric::MetricValue::Counter { value: 42.0 }); let counter_a_2 = make_metric("counter_a", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 43.0 }, tags.clone()); + metric::MetricValue::Counter { value: 43.0 }); let counter_a_summed = make_metric("counter_a", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 85.0 }, tags.clone()); + metric::MetricValue::Counter { value: 85.0 }); let gauge_a_1 = make_metric("gauge_a", metric::MetricKind::Absolute, - metric::MetricValue::Gauge { value: 42.0 }, tags.clone()); + metric::MetricValue::Gauge { value: 42.0 }); let gauge_a_2 = make_metric("gauge_a", metric::MetricKind::Absolute, - metric::MetricValue::Gauge { value: 43.0 }, tags.clone()); + metric::MetricValue::Gauge { value: 43.0 }); let (mut tx, rx) = futures::channel::mpsc::channel(10); let mut out_stream = agg.transform(Box::pin(rx)); From 1525a45e9b1146cfdeaf15434acbc80957b101a9 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Fri, 18 Jun 2021 16:39:18 -0700 Subject: [PATCH 26/35] Use panic\!(message) rather than assert\!(false) in unexpected test situations Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 0debee6919e79..685f2b4e2f3c2 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -228,7 +228,7 @@ mod tests { match event.as_metric().series().name.name.as_str() { "counter_a" => assert_eq!(counter_a_1, event), "counter_b" => assert_eq!(counter_b_1, event), - _ => assert!(false), + _ => panic!("Unexpected metric name in aggregate output"), } } } @@ -281,7 +281,7 @@ mod tests { match event.as_metric().series().name.name.as_str() { "gauge_a" => assert_eq!(gauge_a_1, event), "gauge_b" => assert_eq!(gauge_b_1, event), - _ => assert!(false), + _ => panic!("Unexpected metric name in aggregate output"), } } } @@ -326,7 +326,7 @@ interval_ms = 999999 match event.as_metric().series().name.name.as_str() { "counter_a" => assert_eq!(counter_a_summed, event), "gauge_a" => assert_eq!(gauge_a_2, event), - _ => assert!(false), + _ => panic!("Unexpected metric name in aggregate output"), }; } // There were only 2 @@ -383,11 +383,11 @@ interval_ms = 999999 match event.as_metric().series().name.name.as_str() { "counter_a" => assert_eq!(counter_a_summed, event), "gauge_a" => assert_eq!(gauge_a_2, event), - _ => assert!(false), + _ => panic!("Unexpected metric name in aggregate output"), }; count += 1; } else { - assert!(false); + panic!("Unexpectedly recieved None in output stream"); } } // We should be back to pending, having nothing waiting for us From 6a293feeb29317a26438e6d503ea0b6922162b9b Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Fri, 18 Jun 2021 16:51:16 -0700 Subject: [PATCH 27/35] Switch to map entry api and emit metric for failed updates Signed-off-by: Ross McFarland --- .../components/sources/internal_metrics.cue | 6 ++++++ .../components/transforms/aggregate.cue | 1 + src/internal_events/aggregate.rs | 9 +++++++++ src/transforms/aggregate.rs | 17 +++++++---------- 4 files changed, 23 insertions(+), 10 deletions(-) diff --git a/docs/reference/components/sources/internal_metrics.cue b/docs/reference/components/sources/internal_metrics.cue index f45ecf3243235..5af332f73a1c7 100644 --- a/docs/reference/components/sources/internal_metrics.cue +++ b/docs/reference/components/sources/internal_metrics.cue @@ -79,6 +79,12 @@ components: sources: internal_metrics: { default_namespace: "vector" tags: _component_tags } + aggregate_failed_updates: { + description: "The number of failed metric updates, `incremental` adds, encountered by the aggregate transform." + type: "counter" + default_namespace: "vector" + tags: _component_tags + } aggregate_flushes_total: { description: "The number of flushes done by the aggregate transform." type: "counter" diff --git a/docs/reference/components/transforms/aggregate.cue b/docs/reference/components/transforms/aggregate.cue index a825348c3cc9b..ac3c0b3db6969 100644 --- a/docs/reference/components/transforms/aggregate.cue +++ b/docs/reference/components/transforms/aggregate.cue @@ -211,6 +211,7 @@ components: transforms: aggregate: { telemetry: metrics: { aggregate_events_recorded_total: components.sources.internal_metrics.output.metrics.aggregate_events_recorded_total + aggregate_failed_updates: components.sources.internal_metrics.output.metrics.aggregate_failed_updates aggregate_flushes_total: components.sources.internal_metrics.output.metrics.aggregate_flushes_total } } diff --git a/src/internal_events/aggregate.rs b/src/internal_events/aggregate.rs index 3826623dfc61d..16d0022f7c9d4 100644 --- a/src/internal_events/aggregate.rs +++ b/src/internal_events/aggregate.rs @@ -18,3 +18,12 @@ impl InternalEvent for AggregateFlushed { counter!("aggregate_flushes_total", 1); } } + +#[derive(Debug)] +pub struct AggregateUpdateFailed; + +impl InternalEvent for AggregateUpdateFailed { + fn emit_metrics(&self) { + counter!("aggregate_failed_updates", 1); + } +} diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 685f2b4e2f3c2..a6383d1fd543e 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -1,5 +1,5 @@ use crate::{ - internal_events::{AggregateEventRecorded, AggregateFlushed}, + internal_events::{AggregateEventRecorded, AggregateFlushed, AggregateUpdateFailed}, transforms::{ TaskTransform, Transform, @@ -79,15 +79,12 @@ impl Aggregate { match data.kind { metric::MetricKind::Incremental => { - match self.map.get_mut(&series) { - // We already have something, add to it, will update timestamp as well. - Some(existing) => existing.update(&data), - None => { - // New so store - self.map.insert(series, data); - true - } - }; + self.map.entry(series) + .and_modify(|existing| { + if ! existing.update(&data) { + emit!(AggregateUpdateFailed); + } + }).or_insert(data); }, metric::MetricKind::Absolute => { // Always replace/store From 40b6123939b2b0962131812cb8aba035a7354021 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Fri, 18 Jun 2021 17:02:23 -0700 Subject: [PATCH 28/35] Store and use event metadata Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index a6383d1fd543e..47462f5ea0236 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -58,12 +58,14 @@ impl TransformConfig for AggregateConfig { } } +type MetricEntry = (metric::MetricData, EventMetadata); + //------------------------------------------------------------------------------ #[derive(Debug)] pub struct Aggregate { interval: Duration, - map: HashMap, + map: HashMap, } impl Aggregate { @@ -75,20 +77,21 @@ impl Aggregate { } fn record(&mut self, event: Event) { - let (series, data, _metadata) = event.into_metric().into_parts(); + let (series, data, metadata) = event.into_metric().into_parts(); match data.kind { metric::MetricKind::Incremental => { self.map.entry(series) .and_modify(|existing| { - if ! existing.update(&data) { + if ! existing.0.update(&data) { emit!(AggregateUpdateFailed); } - }).or_insert(data); + existing.1.merge(metadata.clone()); + }).or_insert((data, metadata)); }, metric::MetricKind::Absolute => { // Always replace/store - self.map.insert(series, data); + self.map.insert(series, (data, metadata)); } }; @@ -97,12 +100,8 @@ impl Aggregate { fn flush_into(&mut self, output: &mut Vec) { if self.map.len() > 0 { - // TODO: not clear how this should work with aggregation so just stuffing a default one - // in for now. - let metadata = EventMetadata::default(); - - for (series, metric) in self.map.drain() { - let metric = metric::Metric::from_parts(series, metric, metadata.clone()); + for (series, entry) in self.map.drain() { + let metric = metric::Metric::from_parts(series, entry.0, entry.1); output.push(Event::Metric(metric)); } } From fe083c51f6c606fd1921a7fb42df084c75c0ef72 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Tue, 22 Jun 2021 08:49:30 -0700 Subject: [PATCH 29/35] Wording updates and corrections for aggregate doc Signed-off-by: Ross McFarland --- docs/reference/components/transforms/aggregate.cue | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/reference/components/transforms/aggregate.cue b/docs/reference/components/transforms/aggregate.cue index ac3c0b3db6969..b9651419a2551 100644 --- a/docs/reference/components/transforms/aggregate.cue +++ b/docs/reference/components/transforms/aggregate.cue @@ -185,11 +185,11 @@ components: transforms: aggregate: { aggregation_behavior: { title: "Aggregation Behavior" body: """ - Metrics are aggregated based their kind. During an interval `incremental` metrics - are "added" and `absolute` metrics use most recent value wins semantics. This results in a reduction - of volume, less granularity, while maintaining numerical correctness. As an example two - `incremental` `counter`s with values 10 and 13 processed by the transform during a period would be - aggregated into a single `incremental` `counter` with a value of 23. Two `absolute` `gauge`s with + Metrics are aggregated based on their kind. During an interval, `incremental` metrics + are "added" and newer `absolute` metrics replace older ones in the same series. This results in a reduction + of volume and less granularity, while maintaining numerical correctness. As an example, two + `incremental` `counter` metrics with values 10 and 13 processed by the transform during a period would be + aggregated into a single `incremental` `counter` with a value of 23. Two `absolute` `gauge` metrics with values 93 and 95 would result in a single `absolute` `gauge` with the value of 95. More complex types like `distribution`, `histogram`, `set`, and `summary` behave similarly with `incremental` values being combined in a manner that makes sense based on their type. From 8c33d74df0237346c4b6911c1a16ea69a71d0332 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Tue, 22 Jun 2021 08:49:50 -0700 Subject: [PATCH 30/35] Avoid clone in update or insert logic for aggregate Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 47462f5ea0236..dc7bdb2c054bc 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -15,7 +15,7 @@ use async_stream::stream; use futures::{stream, Stream, StreamExt}; use serde::{Deserialize, Serialize}; use std::{ - collections::HashMap, + collections::{hash_map::Entry, HashMap}, pin::Pin, time::{Duration}, }; @@ -81,13 +81,18 @@ impl Aggregate { match data.kind { metric::MetricKind::Incremental => { - self.map.entry(series) - .and_modify(|existing| { + match self.map.entry(series) { + Entry::Occupied(mut entry) => { + let existing = entry.get_mut(); if ! existing.0.update(&data) { emit!(AggregateUpdateFailed); } - existing.1.merge(metadata.clone()); - }).or_insert((data, metadata)); + existing.1.merge(metadata); + } + Entry::Vacant(entry) => { + entry.insert((data, metadata)); + }, + } }, metric::MetricKind::Absolute => { // Always replace/store @@ -99,11 +104,9 @@ impl Aggregate { } fn flush_into(&mut self, output: &mut Vec) { - if self.map.len() > 0 { - for (series, entry) in self.map.drain() { - let metric = metric::Metric::from_parts(series, entry.0, entry.1); - output.push(Event::Metric(metric)); - } + for (series, entry) in self.map.drain() { + let metric = metric::Metric::from_parts(series, entry.0, entry.1); + output.push(Event::Metric(metric)); } emit!(AggregateFlushed); From a027d3f394405cfc2e5ef835a8a09186dd380aab Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Tue, 22 Jun 2021 18:58:56 +0000 Subject: [PATCH 31/35] Move unused import Also rustfmt Signed-off-by: Jesse Szwedko --- src/transforms/aggregate.rs | 214 +++++++++++++++++++++--------------- 1 file changed, 127 insertions(+), 87 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index dc7bdb2c054bc..00d09085903ac 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -1,23 +1,16 @@ use crate::{ - internal_events::{AggregateEventRecorded, AggregateFlushed, AggregateUpdateFailed}, - transforms::{ - TaskTransform, - Transform, - }, config::{DataType, GlobalOptions, TransformConfig, TransformDescription}, - event::{ - metric, - Event, - EventMetadata, - }, + event::{metric, Event, EventMetadata}, + internal_events::{AggregateEventRecorded, AggregateFlushed, AggregateUpdateFailed}, + transforms::{TaskTransform, Transform}, }; use async_stream::stream; -use futures::{stream, Stream, StreamExt}; +use futures::{Stream, StreamExt}; use serde::{Deserialize, Serialize}; use std::{ collections::{hash_map::Entry, HashMap}, pin::Pin, - time::{Duration}, + time::Duration, }; #[derive(Deserialize, Serialize, Debug, Default, Clone)] @@ -80,18 +73,16 @@ impl Aggregate { let (series, data, metadata) = event.into_metric().into_parts(); match data.kind { - metric::MetricKind::Incremental => { - match self.map.entry(series) { - Entry::Occupied(mut entry) => { - let existing = entry.get_mut(); - if ! existing.0.update(&data) { - emit!(AggregateUpdateFailed); - } - existing.1.merge(metadata); + metric::MetricKind::Incremental => match self.map.entry(series) { + Entry::Occupied(mut entry) => { + let existing = entry.get_mut(); + if !existing.0.update(&data) { + emit!(AggregateUpdateFailed); } - Entry::Vacant(entry) => { - entry.insert((data, metadata)); - }, + existing.1.merge(metadata); + } + Entry::Vacant(entry) => { + entry.insert((data, metadata)); } }, metric::MetricKind::Absolute => { @@ -123,31 +114,29 @@ impl TaskTransform for Aggregate { { let mut flush_stream = tokio::time::interval(self.interval); - Box::pin( - stream! { - let mut output = Vec::new(); - let mut done = false; - while !done { - tokio::select! { - _ = flush_stream.tick() => { - self.flush_into(&mut output); - }, - maybe_event = input_rx.next() => { - match maybe_event { - None => { - self.flush_into(&mut output); - done = true; - } - Some(event) => self.record(event), + Box::pin(stream! { + let mut output = Vec::new(); + let mut done = false; + while !done { + tokio::select! { + _ = flush_stream.tick() => { + self.flush_into(&mut output); + }, + maybe_event = input_rx.next() => { + match maybe_event { + None => { + self.flush_into(&mut output); + done = true; } + Some(event) => self.record(event), } - }; - for event in output.drain(..) { - yield event; } + }; + for event in output.drain(..) { + yield event; } } - ) + }) } } @@ -155,7 +144,7 @@ impl TaskTransform for Aggregate { mod tests { use super::*; use crate::{event::metric, event::Event, event::Metric}; - use futures::SinkExt; + use futures::{stream, SinkExt}; use std::task::Poll; #[test] @@ -168,25 +157,31 @@ mod tests { kind: metric::MetricKind, value: metric::MetricValue, ) -> Event { - Event::Metric( - Metric::new( - name, - kind, - value, - ) - ) + Event::Metric(Metric::new(name, kind, value)) } #[test] fn incremental() { - let mut agg = Aggregate::new(&AggregateConfig { interval_ms: 1000_u64 }).unwrap(); + let mut agg = Aggregate::new(&AggregateConfig { + interval_ms: 1000_u64, + }) + .unwrap(); - let counter_a_1 = make_metric("counter_a", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 42.0 }); - let counter_a_2 = make_metric("counter_a", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 43.0 }); - let counter_a_summed = make_metric("counter_a", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 85.0 }); + let counter_a_1 = make_metric( + "counter_a", + metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 42.0 }, + ); + let counter_a_2 = make_metric( + "counter_a", + metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 43.0 }, + ); + let counter_a_summed = make_metric( + "counter_a", + metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 85.0 }, + ); // Single item, just stored regardless of kind agg.record(counter_a_1.clone()); @@ -214,8 +209,11 @@ mod tests { assert_eq!(1, out.len()); assert_eq!(&counter_a_summed, &out[0]); - let counter_b_1 = make_metric("counter_b", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 44.0 }); + let counter_b_1 = make_metric( + "counter_b", + metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 44.0 }, + ); // Two increments with the different series, should get each back as-is agg.record(counter_a_1.clone()); agg.record(counter_b_1.clone()); @@ -234,12 +232,21 @@ mod tests { #[test] fn absolute() { - let mut agg = Aggregate::new(&AggregateConfig { interval_ms: 1000_u64 }).unwrap(); + let mut agg = Aggregate::new(&AggregateConfig { + interval_ms: 1000_u64, + }) + .unwrap(); - let gauge_a_1 = make_metric("gauge_a", metric::MetricKind::Absolute, - metric::MetricValue::Gauge { value: 42.0 }); - let gauge_a_2 = make_metric("gauge_a", metric::MetricKind::Absolute, - metric::MetricValue::Gauge { value: 43.0 }); + let gauge_a_1 = make_metric( + "gauge_a", + metric::MetricKind::Absolute, + metric::MetricValue::Gauge { value: 42.0 }, + ); + let gauge_a_2 = make_metric( + "gauge_a", + metric::MetricKind::Absolute, + metric::MetricValue::Gauge { value: 43.0 }, + ); // Single item, just stored regardless of kind agg.record(gauge_a_1.clone()); @@ -267,8 +274,11 @@ mod tests { assert_eq!(1, out.len()); assert_eq!(&gauge_a_2, &out[0]); - let gauge_b_1 = make_metric("gauge_b", metric::MetricKind::Absolute, - metric::MetricValue::Gauge { value: 44.0 }); + let gauge_b_1 = make_metric( + "gauge_b", + metric::MetricKind::Absolute, + metric::MetricValue::Gauge { value: 44.0 }, + ); // Two increments with the different series, should get each back as-is agg.record(gauge_a_1.clone()); agg.record(gauge_b_1.clone()); @@ -299,16 +309,31 @@ interval_ms = 999999 let agg = agg.into_task(); - let counter_a_1 = make_metric("counter_a", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 42.0 }); - let counter_a_2 = make_metric("counter_a", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 43.0 }); - let counter_a_summed = make_metric("counter_a", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 85.0 }); - let gauge_a_1 = make_metric("gauge_a", metric::MetricKind::Absolute, - metric::MetricValue::Gauge { value: 42.0 }); - let gauge_a_2 = make_metric("gauge_a", metric::MetricKind::Absolute, - metric::MetricValue::Gauge { value: 43.0 }); + let counter_a_1 = make_metric( + "counter_a", + metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 42.0 }, + ); + let counter_a_2 = make_metric( + "counter_a", + metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 43.0 }, + ); + let counter_a_summed = make_metric( + "counter_a", + metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 85.0 }, + ); + let gauge_a_1 = make_metric( + "gauge_a", + metric::MetricKind::Absolute, + metric::MetricValue::Gauge { value: 42.0 }, + ); + let gauge_a_2 = make_metric( + "gauge_a", + metric::MetricKind::Absolute, + metric::MetricValue::Gauge { value: 43.0 }, + ); let inputs = vec![counter_a_1, counter_a_2, gauge_a_1, gauge_a_2.clone()]; // Queue up some events to be consummed & recorded @@ -345,16 +370,31 @@ interval_ms = 999999 let agg = agg.into_task(); - let counter_a_1 = make_metric("counter_a", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 42.0 }); - let counter_a_2 = make_metric("counter_a", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 43.0 }); - let counter_a_summed = make_metric("counter_a", metric::MetricKind::Incremental, - metric::MetricValue::Counter { value: 85.0 }); - let gauge_a_1 = make_metric("gauge_a", metric::MetricKind::Absolute, - metric::MetricValue::Gauge { value: 42.0 }); - let gauge_a_2 = make_metric("gauge_a", metric::MetricKind::Absolute, - metric::MetricValue::Gauge { value: 43.0 }); + let counter_a_1 = make_metric( + "counter_a", + metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 42.0 }, + ); + let counter_a_2 = make_metric( + "counter_a", + metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 43.0 }, + ); + let counter_a_summed = make_metric( + "counter_a", + metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 85.0 }, + ); + let gauge_a_1 = make_metric( + "gauge_a", + metric::MetricKind::Absolute, + metric::MetricValue::Gauge { value: 42.0 }, + ); + let gauge_a_2 = make_metric( + "gauge_a", + metric::MetricKind::Absolute, + metric::MetricValue::Gauge { value: 43.0 }, + ); let (mut tx, rx) = futures::channel::mpsc::channel(10); let mut out_stream = agg.transform(Box::pin(rx)); From 81c754105c3d9dcaa16a3910bcbd2b90880271b8 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Tue, 22 Jun 2021 14:25:41 -0700 Subject: [PATCH 32/35] clippy suggested cleanup for aggregate Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 00d09085903ac..bbab75ca87f3a 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -203,7 +203,7 @@ mod tests { // Two increments with the same series, should sum into 1 agg.record(counter_a_1.clone()); - agg.record(counter_a_2.clone()); + agg.record(counter_a_2); out.clear(); agg.flush_into(&mut out); assert_eq!(1, out.len()); @@ -406,10 +406,10 @@ interval_ms = 999999 assert_eq!(Poll::Pending, futures::poll!(out_stream.next())); // Now send our events - tx.send(counter_a_1.into()).await.unwrap(); - tx.send(counter_a_2.into()).await.unwrap(); - tx.send(gauge_a_1.into()).await.unwrap(); - tx.send(gauge_a_2.clone().into()).await.unwrap(); + tx.send(counter_a_1).await.unwrap(); + tx.send(counter_a_2).await.unwrap(); + tx.send(gauge_a_1).await.unwrap(); + tx.send(gauge_a_2.clone()).await.unwrap(); // We won't have flushed yet b/c the interval hasn't elapsed, so no outputs assert_eq!(Poll::Pending, futures::poll!(out_stream.next())); // Now fast foward time enough that our flush should trigger. From ce95db25b63706ea4310e3e8d25ab3e1495738bc Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Wed, 23 Jun 2021 17:02:56 -0700 Subject: [PATCH 33/35] Correctly handle mis-matching kind's and explicitly test conflicting kind and value types Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 120 +++++++++++++++++++++++++++++++++++- 1 file changed, 117 insertions(+), 3 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index bbab75ca87f3a..801d365a0942c 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -76,10 +76,13 @@ impl Aggregate { metric::MetricKind::Incremental => match self.map.entry(series) { Entry::Occupied(mut entry) => { let existing = entry.get_mut(); - if !existing.0.update(&data) { + // In order to update (add) the new and old kind's must match + if existing.0.kind == data.kind && existing.0.update(&data) { + existing.1.merge(metadata); + } else { emit!(AggregateUpdateFailed); + *existing = (data, metadata); } - existing.1.merge(metadata); } Entry::Vacant(entry) => { entry.insert((data, metadata)); @@ -145,7 +148,7 @@ mod tests { use super::*; use crate::{event::metric, event::Event, event::Metric}; use futures::{stream, SinkExt}; - use std::task::Poll; + use std::{collections::BTreeSet, task::Poll}; #[test] fn generate_config() { @@ -295,6 +298,117 @@ mod tests { } } + #[test] + fn conflicting_value_type() { + let mut agg = Aggregate::new(&AggregateConfig { + interval_ms: 1000_u64, + }) + .unwrap(); + + let counter = make_metric( + "the-thing", + metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 42.0 }, + ); + let mut values = BTreeSet::::new(); + values.insert("a".into()); + values.insert("b".into()); + let set = make_metric( + "the-thing", + metric::MetricKind::Incremental, + metric::MetricValue::Set { values: values }, + ); + let summed = make_metric( + "the-thing", + metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 84.0 }, + ); + + // when types conflict the new values replaces whatever is there + + // Start with an counter + agg.record(counter.clone()); + // Another will "add" to it + agg.record(counter.clone()); + // Then an set will replace it due to a failed update + agg.record(set.clone()); + // Then a set union would be a noop + agg.record(set.clone()); + let mut out = vec![]; + // We should flush 1 item counter + agg.flush_into(&mut out); + assert_eq!(1, out.len()); + assert_eq!(&set, &out[0]); + + // Start out with an set + agg.record(set.clone()); + // Union with itself, a noop + agg.record(set.clone()); + // Send an counter with the same name, will replace due to a failed update + agg.record(counter.clone()); + // Send another counter will "add" + agg.record(counter.clone()); + let mut out = vec![]; + // We should flush 1 item counter + agg.flush_into(&mut out); + assert_eq!(1, out.len()); + assert_eq!(&summed, &out[0]); + } + + #[test] + fn conflicting_kinds() { + let mut agg = Aggregate::new(&AggregateConfig { + interval_ms: 1000_u64, + }) + .unwrap(); + + let incremental = make_metric( + "the-thing", + metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 42.0 }, + ); + let absolute = make_metric( + "the-thing", + metric::MetricKind::Absolute, + metric::MetricValue::Counter { value: 43.0 }, + ); + let summed = make_metric( + "the-thing", + metric::MetricKind::Incremental, + metric::MetricValue::Counter { value: 84.0 }, + ); + + // when types conflict the new values replaces whatever is there + + // Start with an incremental + agg.record(incremental.clone()); + // Another will "add" to it + agg.record(incremental.clone()); + // Then an absolute will replace it with a failed update + agg.record(absolute.clone()); + // Then another absolute will replace it normally + agg.record(absolute.clone()); + let mut out = vec![]; + // We should flush 1 item incremental + agg.flush_into(&mut out); + assert_eq!(1, out.len()); + assert_eq!(&absolute, &out[0]); + + // Start out with an absolute + agg.record(absolute.clone()); + // Replace it normally + agg.record(absolute.clone()); + // Send an incremental with the same name, will replace due to a failed update + agg.record(incremental.clone()); + // Send another incremental will "add" + agg.record(incremental.clone()); + let mut out = vec![]; + // We should flush 1 item incremental + agg.flush_into(&mut out); + assert_eq!(1, out.len()); + assert_eq!(&summed, &out[0]); + } + #[tokio::test] async fn transform_shutdown() { let agg = toml::from_str::( From 8e58c2f12991d9308eb767a8a6b2a816f81da486 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Wed, 23 Jun 2021 21:24:14 -0700 Subject: [PATCH 34/35] make clippy happy Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 801d365a0942c..f7488c1f2d6c9 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -316,7 +316,7 @@ mod tests { let set = make_metric( "the-thing", metric::MetricKind::Incremental, - metric::MetricValue::Set { values: values }, + metric::MetricValue::Set { values }, ); let summed = make_metric( "the-thing", From ce84d30d6074b7d61a46d50cae3f1b1d8aa2ee65 Mon Sep 17 00:00:00 2001 From: Ross McFarland Date: Thu, 24 Jun 2021 10:32:25 -0700 Subject: [PATCH 35/35] More clippy fodder Signed-off-by: Ross McFarland --- src/transforms/aggregate.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index f7488c1f2d6c9..f57253e6b1c39 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -343,11 +343,11 @@ mod tests { // Start out with an set agg.record(set.clone()); // Union with itself, a noop - agg.record(set.clone()); + agg.record(set); // Send an counter with the same name, will replace due to a failed update agg.record(counter.clone()); // Send another counter will "add" - agg.record(counter.clone()); + agg.record(counter); let mut out = vec![]; // We should flush 1 item counter agg.flush_into(&mut out); @@ -397,11 +397,11 @@ mod tests { // Start out with an absolute agg.record(absolute.clone()); // Replace it normally - agg.record(absolute.clone()); + agg.record(absolute); // Send an incremental with the same name, will replace due to a failed update agg.record(incremental.clone()); // Send another incremental will "add" - agg.record(incremental.clone()); + agg.record(incremental); let mut out = vec![]; // We should flush 1 item incremental agg.flush_into(&mut out);