diff --git a/src/internal_events/aggregate.rs b/src/internal_events/aggregate.rs
index d68eca2d5d1cbf..2e720715683a62 100644
--- a/src/internal_events/aggregate.rs
+++ b/src/internal_events/aggregate.rs
@@ -2,10 +2,19 @@ use super::InternalEvent;
 use metrics::counter;
 
 #[derive(Debug)]
-pub struct AggregateEventDiscarded;
+pub struct AggregateEventRecorded;
 
-impl InternalEvent for AggregateEventDiscarded {
+impl InternalEvent for AggregateEventRecorded {
     fn emit_metrics(&self) {
-        counter!("events_discarded_total", 1);
+        counter!("events_recorded_total", 1);
+    }
+}
+
+#[derive(Debug)]
+pub struct AggregateFlushed;
+
+impl InternalEvent for AggregateFlushed {
+    fn emit_metrics(&self) {
+        counter!("flushed_total", 1);
     }
 }
diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs
index 6e0797fce120d6..f99024b43c9ea5 100644
--- a/src/transforms/aggregate.rs
+++ b/src/transforms/aggregate.rs
@@ -1,6 +1,6 @@
 // let g:cargo_makeprg_params = = '--lib --no-default-features --features=transforms-aggregate transforms::aggregate'
 use crate::{
-    internal_events::AggregateEventDiscarded,
+    internal_events::{AggregateEventRecorded, AggregateFlushed},
     transforms::{
         TaskTransform,
         Transform,
@@ -53,11 +53,12 @@ impl TransformConfig for AggregateConfig {
     }
 }
 
+
 #[derive(Debug)]
 pub struct Aggregate {
     interval: Duration,
-    map_a: HashMap<metric::MetricSeries, Vec<metric::MetricData>>,
-    map_b: HashMap<metric::MetricSeries, Vec<metric::MetricData>>,
+    map_a: HashMap<metric::MetricSeries, metric::MetricData>,
+    map_b: HashMap<metric::MetricSeries, metric::MetricData>,
     using_b: bool,
 }
 
@@ -81,21 +82,29 @@ impl Aggregate {
             false => &mut self.map_a,
         };
 
-        match map.get_mut(&series) {
-            Some(datum) => datum.push(data.clone()),
-            _ => {
-                map.insert(series.clone(), vec![data.clone()]);
-                ()
+        match data.kind {
+            metric::MetricKind::Incremental => {
+                match map.get_mut(&series) {
+                    // We already have something, add to it
+                    Some(existing) => existing.add(data),
+                    None => {
+                        // New so store
+                        map.insert(series.clone(), data.clone());
+                        true
+                    }
+                };
+            },
+            metric::MetricKind::Absolute => {
+                // Always store
+                map.insert(series.clone(), data.clone());
             }
         };
 
-        // TODO: discarded or recorded?
-        emit!(AggregateEventDiscarded);
+        emit!(AggregateEventRecorded);
     }
 
     fn flush_into(&mut self, output: &mut Vec<Event>) {
-
-        // TODO: locking/safety etc...
+        // TODO: locking/safety etc... we can also only have 1 call into flush at a time
        let map = match self.using_b {
             true => {
                 self.using_b = false;
@@ -111,33 +120,16 @@ impl Aggregate {
             return
         }
 
-        // TODO: should we preserve one of these overall, per MetricSeries, ???
-        // TODO: Doesn't seem like there's a way to combine.
+        // TODO: not clear how this should work with aggregation so just stuffing a default one in
+        // for now.
         let metadata = EventMetadata::default();
 
-        for (series, datas) in map {
-            if let Some(mut last) = datas.pop() {
-                // We're assuming each series has a single kind, nothing forces that to be the
-                // case, but it's not really clear what we'd do if they vary so...
-                match last.kind {
-                    // Add them up
-                    metric::MetricKind::Incremental => {
-                        for data in datas {
-                            last.add(&data);
-                        }
-                    },
-                    // Last one wins
-                    metric::MetricKind::Absolute => (),
-                }
-
-                let metric = metric::Metric::from_parts(series.clone(), last, metadata.clone());
-                output.push(Event::Metric(metric));
-            }
+        for (series, metric) in map.drain() {
+            let metric = metric::Metric::from_parts(series, metric, metadata.clone());
+            output.push(Event::Metric(metric));
         }
-    }
 
-    fn flush_all_into(&mut self, _output: &mut Vec<Event>) {
-        // TODO?
+        emit!(AggregateFlushed);
     }
 }
 
@@ -167,7 +159,7 @@ impl TaskTransform for Aggregate {
                         maybe_event = input_rx.next() => {
                             match maybe_event {
                                 None => {
-                                    me.flush_all_into(&mut output);
+                                    me.flush_into(&mut output);
                                     true
                                 }
                                 Some(event) => {
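
Note (not part of the patch): below is a minimal, self-contained sketch of the aggregation rule the new `record`/`flush_into` logic implements: incremental metrics are summed into the existing entry for a series, absolute metrics overwrite it (last write wins), and flushing drains the map into one output metric per series. The `SeriesKey`, `Kind`, `Sample`, and `Aggregator` types are simplified stand-ins invented for illustration; they are not Vector's `MetricSeries`/`MetricData` API.

```rust
use std::collections::HashMap;

// Simplified stand-ins for Vector's MetricSeries / MetricData, for illustration only.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct SeriesKey(String);

#[derive(Clone, Copy, Debug)]
enum Kind {
    Incremental,
    Absolute,
}

#[derive(Clone, Debug)]
struct Sample {
    kind: Kind,
    value: f64,
}

#[derive(Default)]
struct Aggregator {
    map: HashMap<SeriesKey, Sample>,
}

impl Aggregator {
    // Mirrors the new `record` logic: add incremental samples onto whatever is
    // already stored for the series, always overwrite with absolute samples.
    fn record(&mut self, series: SeriesKey, sample: Sample) {
        match sample.kind {
            Kind::Incremental => match self.map.get_mut(&series) {
                Some(existing) => existing.value += sample.value,
                None => {
                    self.map.insert(series, sample);
                }
            },
            Kind::Absolute => {
                self.map.insert(series, sample);
            }
        }
    }

    // Mirrors `flush_into`: drain the map, emitting one metric per series.
    fn flush(&mut self) -> Vec<(SeriesKey, Sample)> {
        self.map.drain().collect()
    }
}

fn main() {
    let mut agg = Aggregator::default();

    let counter = SeriesKey("requests_total".into());
    agg.record(counter.clone(), Sample { kind: Kind::Incremental, value: 1.0 });
    agg.record(counter.clone(), Sample { kind: Kind::Incremental, value: 2.0 });

    let gauge = SeriesKey("memory_bytes".into());
    agg.record(gauge.clone(), Sample { kind: Kind::Absolute, value: 100.0 });
    agg.record(gauge.clone(), Sample { kind: Kind::Absolute, value: 80.0 });

    // requests_total flushes as 3.0 (summed), memory_bytes as 80.0 (last wins).
    for (series, sample) in agg.flush() {
        println!("{:?} => {:?}", series, sample);
    }
}
```

The actual patch also double-buffers two maps (`map_a`/`map_b`, toggled by `using_b`) so new events can keep being recorded while the other map is flushed; that detail is omitted from the sketch for brevity.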