Skip to content

Commit

Permalink
Working statsd style aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
ross committed Jun 11, 2021
1 parent 3c122f1 commit 9bdc6e7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 39 deletions.
15 changes: 12 additions & 3 deletions src/internal_events/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,19 @@ use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub struct AggregateEventDiscarded;
pub struct AggregateEventRecorded;

impl InternalEvent for AggregateEventDiscarded {
impl InternalEvent for AggregateEventRecorded {
fn emit_metrics(&self) {
counter!("events_discarded_total", 1);
counter!("events_recorded_total", 1);
}
}

#[derive(Debug)]
pub struct AggregateFlushed;

impl InternalEvent for AggregateFlushed {
fn emit_metrics(&self) {
counter!("flushed_total", 1);
}
}
64 changes: 28 additions & 36 deletions src/transforms/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// let g:cargo_makeprg_params = = '--lib --no-default-features --features=transforms-aggregate transforms::aggregate'
use crate::{
internal_events::AggregateEventDiscarded,
internal_events::{AggregateEventRecorded, AggregateFlushed},
transforms::{
TaskTransform,
Transform,
Expand Down Expand Up @@ -53,11 +53,12 @@ impl TransformConfig for AggregateConfig {
}
}


#[derive(Debug)]
pub struct Aggregate {
interval: Duration,
map_a: HashMap<metric::MetricSeries, Vec<metric::MetricData>>,
map_b: HashMap<metric::MetricSeries, Vec<metric::MetricData>>,
map_a: HashMap<metric::MetricSeries, metric::MetricData>,
map_b: HashMap<metric::MetricSeries, metric::MetricData>,
using_b: bool,
}

Expand All @@ -81,21 +82,29 @@ impl Aggregate {
false => &mut self.map_a,
};

match map.get_mut(&series) {
Some(datum) => datum.push(data.clone()),
_ => {
map.insert(series.clone(), vec![data.clone()]);
()
match data.kind {
metric::MetricKind::Incremental => {
match map.get_mut(&series) {
// We already have something, add to it
Some(existing) => existing.add(data),
None => {
// New so store
map.insert(series.clone(), data.clone());
true
}
};
},
metric::MetricKind::Absolute => {
// Always store
map.insert(series.clone(), data.clone());
}
};

// TODO: discarded or recorded?
emit!(AggregateEventDiscarded);
emit!(AggregateEventRecorded);
}

fn flush_into(&mut self, output: &mut Vec<Event>) {

// TODO: locking/safety etc...
// TODO: locking/safety etc... we can also only have 1 call into flush at a time
let map = match self.using_b {
true => {
self.using_b = false;
Expand All @@ -111,33 +120,16 @@ impl Aggregate {
return
}

// TODO: should we preserve one of these overall, per MetricSeries, ???
// TODO: Doesn't seem like there's a way to combine.
// TODO: not clear how this should work with aggregation so just stuffing a default one in
// for now.
let metadata = EventMetadata::default();

for (series, datas) in map {
if let Some(mut last) = datas.pop() {
// We're assuming each series has a single kind, nothing forces that to be the
// case, but it's not really clear what we'd do if they vary so...
match last.kind {
// Add them up
metric::MetricKind::Incremental => {
for data in datas {
last.add(&data);
}
},
// Last one wins
metric::MetricKind::Absolute => (),
}

let metric = metric::Metric::from_parts(series.clone(), last, metadata.clone());
output.push(Event::Metric(metric));
}
for (series, metric) in map.drain() {
let metric = metric::Metric::from_parts(series, metric, metadata.clone());
output.push(Event::Metric(metric));
}
}

fn flush_all_into(&mut self, _output: &mut Vec<Event>) {
// TODO?
emit!(AggregateFlushed);
}
}

Expand Down Expand Up @@ -167,7 +159,7 @@ impl TaskTransform for Aggregate {
maybe_event = input_rx.next() => {
match maybe_event {
None => {
me.flush_all_into(&mut output);
me.flush_into(&mut output);
true
}
Some(event) => {
Expand Down

0 comments on commit 9bdc6e7

Please sign in to comment.