Skip to content

Commit

Permalink
feat(new transform): Aggregate transform to reduce metric volume main…
Browse files Browse the repository at this point in the history
…taining data (#7846)

* WIP: exploration of aggregate transform

Signed-off-by: Ross McFarland <[email protected]>

* Rework plumbing based on Reduce to allow async/timer generation

Signed-off-by: Ross McFarland <[email protected]>

* Working a/b maps for Aggregate

Signed-off-by: Ross McFarland <[email protected]>

* Working statsd style aggregation

Signed-off-by: Ross McFarland <[email protected]>

* Add tests for aggregate and rework/fix bits found

Signed-off-by: Ross McFarland <[email protected]>

* Use MetricData::update, we already know it's incremental

Signed-off-by: Ross McFarland <[email protected]>

* Documentation for aggregate transform

Signed-off-by: Ross McFarland <[email protected]>

* cue fixes

Signed-off-by: Ross McFarland <[email protected]>

* Remove , and fix tab/spaces in aggregate cue doc

Signed-off-by: Ross McFarland <[email protected]>

* Fix aggregate/add_tags copy-n-paste error

Signed-off-by: Ross McFarland <[email protected]>

* More aggregate cue fixes, this time from CI=true make check-docs

Signed-off-by: Ross McFarland <[email protected]>

* Use tokio::time::advance to force interval in aggregate test

Signed-off-by: Ross McFarland <[email protected]>

* aggregate single map

Signed-off-by: Ross McFarland <[email protected]>

* Full testing of aggregate flushing via shutdown and tick

Signed-off-by: Ross McFarland <[email protected]>

* aggregate: remove local map ref vars

Signed-off-by: Ross McFarland <[email protected]>

* guage -> gauge

Signed-off-by: Ross McFarland <[email protected]>

* Remove note about build vim var

Signed-off-by: Ross McFarland <[email protected]>

* Remove yield_now's and other small tweaks to aggregate interval test

Signed-off-by: Ross McFarland <[email protected]>

* aggregate: rework of transform stream loop

Signed-off-by: Ross McFarland <[email protected]>

* Provide serde default for aggregate interval_ms

Signed-off-by: Ross McFarland <[email protected]>

* aggregate use into_metric and into_parts to avoid clones

Signed-off-by: Ross McFarland <[email protected]>

* Rework aggregate description and add how_it_works section with details. Fix internal events.

Signed-off-by: Ross McFarland <[email protected]>

* Remove count return from flush_into

Signed-off-by: Ross McFarland <[email protected]>

* Ditch me = mut self pattern

Signed-off-by: Ross McFarland <[email protected]>

* Remove tags from tests

Signed-off-by: Ross McFarland <[email protected]>

* Use panic\!(message) rather than assert\!(false) in unexpected test situations

Signed-off-by: Ross McFarland <[email protected]>

* Switch to map entry api and emit metric for failed updates

Signed-off-by: Ross McFarland <[email protected]>

* Store and use event metadata

Signed-off-by: Ross McFarland <[email protected]>

* Wording updates and corrections for aggregate doc

Signed-off-by: Ross McFarland <[email protected]>

* Avoid clone in update or insert logic for aggregate

Signed-off-by: Ross McFarland <[email protected]>

* Move unused import

Also rustfmt

Signed-off-by: Jesse Szwedko <[email protected]>

* clippy suggested cleanup for aggregate

Signed-off-by: Ross McFarland <[email protected]>

* Correctly handle mis-matching kind's and explicitly test conflicting kind and value types

Signed-off-by: Ross McFarland <[email protected]>

* make clippy happy

Signed-off-by: Ross McFarland <[email protected]>

* More clippy fodder

Signed-off-by: Ross McFarland <[email protected]>

Co-authored-by: Jesse Szwedko <[email protected]>
  • Loading branch information
ross and jszwedko authored Jun 25, 2021
1 parent 273e3e1 commit fe84032
Show file tree
Hide file tree
Showing 9 changed files with 839 additions and 9 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ transforms-logs = [
]
transforms-metrics = [
"transforms-add_tags",
"transforms-aggregate",
"transforms-filter",
"transforms-log_to_metric",
"transforms-lua",
Expand All @@ -513,6 +514,7 @@ transforms-metrics = [

transforms-add_fields = []
transforms-add_tags = []
transforms-aggregate = []
transforms-ansi_stripper = []
transforms-aws_cloudwatch_logs_subscription_parser= []
transforms-aws_ec2_metadata = ["evmap"]
Expand Down
2 changes: 2 additions & 0 deletions docs/manual/setup/installation/manual/from-source.md
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ features one has to pass a comma-separated list of component features.
| `sources-vector` | Enables building of [`vector` source][docs.sources.vector]. |
| `transforms-add_fields` | Enables building of [`add_fields` transform][docs.transforms.add_fields]. |
| `transforms-add_tags` | Enables building of [`add_tags` transform][docs.transforms.add_tags]. |
| `transforms-aggregate` | Enables building of [`aggregate` transform][docs.transforms.aggregate]. |
| `transforms-ansi_stripper` | Enables building of [`ansi_stripper` transform][docs.transforms.ansi_stripper]. |
| `transforms-aws_cloudwatch_logs_subscription_parser` | Enables building of [`aws_cloudwatch_logs_subscription_parser` transform][docs.transforms.aws_cloudwatch_logs_subscription_parser]. |
| `transforms-aws_ec2_metadata` | Enables building of [`aws_ec2_metadata` transform][docs.transforms.aws_ec2_metadata]. |
Expand Down Expand Up @@ -527,6 +528,7 @@ features one has to pass a comma-separated list of component features.
[docs.sources.vector]: /docs/reference/sources/vector/
[docs.transforms.add_fields]: /docs/reference/transforms/add_fields/
[docs.transforms.add_tags]: /docs/reference/transforms/add_tags/
[docs.transforms.aggregate]: /docs/reference/transforms/aggregate/
[docs.transforms.ansi_stripper]: /docs/reference/transforms/ansi_stripper/
[docs.transforms.aws_cloudwatch_logs_subscription_parser]: /docs/reference/transforms/aws_cloudwatch_logs_subscription_parser/
[docs.transforms.aws_ec2_metadata]: /docs/reference/transforms/aws_ec2_metadata/
Expand Down
22 changes: 13 additions & 9 deletions docs/reference/components.cue
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,16 @@ components: {
}

if Args.kind == "transform" {
convert?: #FeaturesConvert
enrich?: #FeaturesEnrich
filter?: #FeaturesFilter
parse?: #FeaturesParse
program?: #FeaturesProgram
reduce?: #FeaturesReduce
route?: #FeaturesRoute
sanitize?: #FeaturesSanitize
shape?: #FeaturesShape
aggregate?: #FeaturesAggregate
convert?: #FeaturesConvert
enrich?: #FeaturesEnrich
filter?: #FeaturesFilter
parse?: #FeaturesParse
program?: #FeaturesProgram
reduce?: #FeaturesReduce
route?: #FeaturesRoute
sanitize?: #FeaturesSanitize
shape?: #FeaturesShape
}

if Args.kind == "sink" {
Expand All @@ -202,6 +203,9 @@ components: {
descriptions: [Name=string]: string
}

#FeaturesAggregate: {
}

#FeaturesCollect: {
checkpoint: {
enabled: bool
Expand Down
18 changes: 18 additions & 0 deletions docs/reference/components/sources/internal_metrics.cue
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,24 @@ components: sources: internal_metrics: {
}

// Instance-level "process" metrics
aggregate_events_recorded_total: {
description: "The number of events recorded by the aggregate transform."
type: "counter"
default_namespace: "vector"
tags: _component_tags
}
aggregate_failed_updates: {
description: "The number of failed metric updates, `incremental` adds, encountered by the aggregate transform."
type: "counter"
default_namespace: "vector"
tags: _component_tags
}
aggregate_flushes_total: {
description: "The number of flushes done by the aggregate transform."
type: "counter"
default_namespace: "vector"
tags: _component_tags
}
api_started_total: {
description: "The number of times the Vector GraphQL API has been started."
type: "counter"
Expand Down
217 changes: 217 additions & 0 deletions docs/reference/components/transforms/aggregate.cue
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
package metadata

components: transforms: aggregate: {
title: "Aggregate"

description: """
Aggregates multiple metric events into a single metric event based
on a defined interval window. This helps to reduce metric volume at
the cost of granularity.
"""

classes: {
commonly_used: false
development: "beta"
egress_method: "stream"
stateful: true
}

features: {
aggregate: {}
}

support: {
targets: {
"aarch64-unknown-linux-gnu": true
"aarch64-unknown-linux-musl": true
"armv7-unknown-linux-gnueabihf": true
"armv7-unknown-linux-musleabihf": true
"x86_64-apple-darwin": true
"x86_64-pc-windows-msv": true
"x86_64-unknown-linux-gnu": true
"x86_64-unknown-linux-musl": true
}
requirements: []
warnings: []
notices: []
}

configuration: {
interval_ms: {
common: true
description: """
The interval over which metrics are aggregated in milliseconds. Over this period metrics with the
same series data (name, namespace, tags, ...) will be aggregated.
"""
required: false
warnings: []
type: uint: {
default: 10000
unit: "milliseconds"
}
}
}

input: {
logs: false
metrics: {
counter: true
distribution: true
gauge: true
histogram: true
set: true
summary: true
}
}

examples: [
{
title: "Aggregate over 15 seconds"
input: [
{
metric: {
kind: "incremental"
name: "counter.1"
timestamp: "2021-07-12T07:58:44.223543Z"
tags: {
host: "my.host.com"
}
counter: {
value: 1.1
}
}
},
{
metric: {
kind: "incremental"
name: "counter.1"
timestamp: "2021-07-12T07:58:45.223543Z"
tags: {
host: "my.host.com"
}
counter: {
value: 2.2
}
}
},
{
metric: {
kind: "incremental"
name: "counter.1"
timestamp: "2021-07-12T07:58:45.223543Z"
tags: {
host: "different.host.com"
}
counter: {
value: 1.1
}
}
},
{
metric: {
kind: "absolute"
name: "gauge.1"
timestamp: "2021-07-12T07:58:47.223543Z"
tags: {
host: "my.host.com"
}
counter: {
value: 22.33
}
}
},
{
metric: {
kind: "absolute"
name: "gauge.1"
timestamp: "2021-07-12T07:58:45.223543Z"
tags: {
host: "my.host.com"
}
counter: {
value: 44.55
}
}
},
]
configuration: {
interval_ms: 5000
}
output: [
{
metric: {
kind: "incremental"
name: "counter.1"
timestamp: "2021-07-12T07:58:45.223543Z"
tags: {
host: "my.host.com"
}
counter: {
value: 3.3
}
}
},
{
metric: {
kind: "incremental"
name: "counter.1"
timestamp: "2021-07-12T07:58:45.223543Z"
tags: {
host: "different.host.com"
}
counter: {
value: 1.1
}
}
},
{
metric: {
kind: "absolute"
name: "gauge.1"
timestamp: "2021-07-12T07:58:45.223543Z"
tags: {
host: "my.host.com"
}
counter: {
value: 44.55
}
}
},
]
},
]

how_it_works: {
aggregation_behavior: {
title: "Aggregation Behavior"
body: """
Metrics are aggregated based on their kind. During an interval, `incremental` metrics
are "added" and newer `absolute` metrics replace older ones in the same series. This results in a reduction
of volume and less granularity, while maintaining numerical correctness. As an example, two
`incremental` `counter` metrics with values 10 and 13 processed by the transform during a period would be
aggregated into a single `incremental` `counter` with a value of 23. Two `absolute` `gauge` metrics with
values 93 and 95 would result in a single `absolute` `gauge` with the value of 95. More complex
types like `distribution`, `histogram`, `set`, and `summary` behave similarly with `incremental`
values being combined in a manner that makes sense based on their type.
"""
}

advantages: {
title: "Advantages of Use"
body: """
The major advantage to aggregation is the reduction of volume. It may reduce costs
directly in situations that charge by metric event volume, or indirectly by requiring less CPU to
process and/or less network bandwidth to transmit and receive. In systems that are constrained by
the processing required to ingest metric events it may help to reduce the processing overhead. This
may apply to transforms and sinks downstream of the aggregate transform as well.
"""
}

}

telemetry: metrics: {
aggregate_events_recorded_total: components.sources.internal_metrics.output.metrics.aggregate_events_recorded_total
aggregate_failed_updates: components.sources.internal_metrics.output.metrics.aggregate_failed_updates
aggregate_flushes_total: components.sources.internal_metrics.output.metrics.aggregate_flushes_total
}
}
29 changes: 29 additions & 0 deletions src/internal_events/aggregate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub struct AggregateEventRecorded;

impl InternalEvent for AggregateEventRecorded {
fn emit_metrics(&self) {
counter!("aggregate_events_recorded_total", 1);
}
}

#[derive(Debug)]
pub struct AggregateFlushed;

impl InternalEvent for AggregateFlushed {
fn emit_metrics(&self) {
counter!("aggregate_flushes_total", 1);
}
}

#[derive(Debug)]
pub struct AggregateUpdateFailed;

impl InternalEvent for AggregateUpdateFailed {
fn emit_metrics(&self) {
counter!("aggregate_failed_updates", 1);
}
}
2 changes: 2 additions & 0 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::borrow::Cow;
mod adaptive_concurrency;
mod add_fields;
mod add_tags;
mod aggregate;
mod ansi_stripper;
#[cfg(feature = "sources-apache_metrics")]
mod apache_metrics;
Expand Down Expand Up @@ -135,6 +136,7 @@ pub mod kubernetes;
pub use self::adaptive_concurrency::*;
pub use self::add_fields::*;
pub use self::add_tags::*;
pub use self::aggregate::*;
pub use self::ansi_stripper::*;
#[cfg(feature = "sources-apache_metrics")]
pub use self::apache_metrics::*;
Expand Down
Loading

0 comments on commit fe84032

Please sign in to comment.