
feat(new transform): Aggregate transform to reduce metric volume maintaining data #7846

Merged
merged 36 commits into from
Jun 25, 2021

36 commits
c7556ac
WIP: exploration of aggregate transform
ross Jun 11, 2021
6b5126f
Rework plumbing based on Reduce to allow async/timer generation
ross Jun 11, 2021
3475e46
Working a/b maps for Aggregate
ross Jun 11, 2021
f826e3a
Working statsd style aggregation
ross Jun 11, 2021
e1c6e08
Add tests for aggregate and rework/fix bits found
ross Jun 12, 2021
c135e8e
Use MetricData::update, we already know it's incremental
ross Jun 12, 2021
010d9d6
Documentation for aggregate transform
ross Jun 12, 2021
35cdb39
cue fixes
ross Jun 13, 2021
9f3c2b9
Remove , and fix tab/spaces in aggregate cue doc
ross Jun 14, 2021
e159961
Fix aggregate/add_tags copy-n-paste error
ross Jun 14, 2021
f3118ac
More aggregate cue fixes, this time from CI=true make check-docs
ross Jun 14, 2021
1da1e06
Use tokio::time::advance to force interval in aggregate test
ross Jun 15, 2021
dda9ab6
aggregate single map
ross Jun 15, 2021
7b73e64
Full testing of aggregate flushing via shutdown and tick
ross Jun 16, 2021
314f021
Merge branch 'master' into transforms-aggregate
ross Jun 16, 2021
a436354
aggregate: remove local map ref vars
ross Jun 17, 2021
2b498bf
guage -> gauge
ross Jun 17, 2021
7b1e99b
Remove note about build vim var
ross Jun 17, 2021
a28ab3c
Remove yield_now's and other small tweaks to aggregate interval test
ross Jun 17, 2021
9e9e431
aggregate: rework of transform stream loop
ross Jun 17, 2021
2335767
Provide serde default for aggregate interval_ms
ross Jun 18, 2021
62f3c7a
aggregate use into_metric and into_parts to avoid clones
ross Jun 18, 2021
c57695c
Rework aggregate description and add how_it_works section with detail…
ross Jun 18, 2021
c9da9a4
Remove count return from flush_into
ross Jun 18, 2021
07a9299
Ditch me = mut self pattern
ross Jun 18, 2021
9651655
Remove tags from tests
ross Jun 18, 2021
1525a45
Use panic!(message) rather than assert!(false) in unexpected test s…
ross Jun 18, 2021
6a293fe
Switch to map entry api and emit metric for failed updates
ross Jun 18, 2021
40b6123
Store and use event metadata
ross Jun 19, 2021
fe083c5
Wording updates and corrections for aggregate doc
ross Jun 22, 2021
8c33d74
Avoid clone in update or insert logic for aggregate
ross Jun 22, 2021
a027d3f
Move unused import
jszwedko Jun 22, 2021
81c7541
clippy suggested cleanup for aggregate
ross Jun 22, 2021
ce95db2
Correctly handle mis-matching kind's and explicitly test conflicting …
ross Jun 24, 2021
8e58c2f
make clippy happy
ross Jun 24, 2021
ce84d30
More clippy fodder
ross Jun 24, 2021
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -485,6 +485,7 @@ transforms-logs = [
]
transforms-metrics = [
"transforms-add_tags",
"transforms-aggregate",
bruceg marked this conversation as resolved.
"transforms-filter",
"transforms-log_to_metric",
"transforms-lua",
@@ -496,6 +497,7 @@ transforms-metrics = [

transforms-add_fields = []
transforms-add_tags = []
transforms-aggregate = []
transforms-ansi_stripper = []
transforms-aws_cloudwatch_logs_subscription_parser= []
transforms-aws_ec2_metadata = ["evmap"]
2 changes: 2 additions & 0 deletions docs/manual/setup/installation/manual/from-source.md
@@ -400,6 +400,7 @@ features one has to pass a comma-separated list of component features.
| `sources-vector` | Enables building of [`vector` source][docs.sources.vector]. |
| `transforms-add_fields` | Enables building of [`add_fields` transform][docs.transforms.add_fields]. |
| `transforms-add_tags` | Enables building of [`add_tags` transform][docs.transforms.add_tags]. |
| `transforms-aggregate` | Enables building of [`aggregate` transform][docs.transforms.aggregate]. |
| `transforms-ansi_stripper` | Enables building of [`ansi_stripper` transform][docs.transforms.ansi_stripper]. |
| `transforms-aws_cloudwatch_logs_subscription_parser` | Enables building of [`aws_cloudwatch_logs_subscription_parser` transform][docs.transforms.aws_cloudwatch_logs_subscription_parser]. |
| `transforms-aws_ec2_metadata` | Enables building of [`aws_ec2_metadata` transform][docs.transforms.aws_ec2_metadata]. |
@@ -527,6 +528,7 @@ features one has to pass a comma-separated list of component features.
[docs.sources.vector]: /docs/reference/sources/vector/
[docs.transforms.add_fields]: /docs/reference/transforms/add_fields/
[docs.transforms.add_tags]: /docs/reference/transforms/add_tags/
[docs.transforms.aggregate]: /docs/reference/transforms/aggregate/
[docs.transforms.ansi_stripper]: /docs/reference/transforms/ansi_stripper/
[docs.transforms.aws_cloudwatch_logs_subscription_parser]: /docs/reference/transforms/aws_cloudwatch_logs_subscription_parser/
[docs.transforms.aws_ec2_metadata]: /docs/reference/transforms/aws_ec2_metadata/
22 changes: 13 additions & 9 deletions docs/reference/components.cue
@@ -173,15 +173,16 @@ components: {
}

if Args.kind == "transform" {
convert?: #FeaturesConvert
enrich?: #FeaturesEnrich
filter?: #FeaturesFilter
parse?: #FeaturesParse
program?: #FeaturesProgram
reduce?: #FeaturesReduce
route?: #FeaturesRoute
sanitize?: #FeaturesSanitize
shape?: #FeaturesShape
aggregate?: #FeaturesAggregate
convert?: #FeaturesConvert
enrich?: #FeaturesEnrich
filter?: #FeaturesFilter
parse?: #FeaturesParse
program?: #FeaturesProgram
reduce?: #FeaturesReduce
route?: #FeaturesRoute
sanitize?: #FeaturesSanitize
shape?: #FeaturesShape
}

if Args.kind == "sink" {
@@ -202,6 +203,9 @@
descriptions: [Name=string]: string
}

#FeaturesAggregate: {
}

#FeaturesCollect: {
checkpoint: {
enabled: bool
18 changes: 18 additions & 0 deletions docs/reference/components/sources/internal_metrics.cue
@@ -73,6 +73,24 @@ components: sources: internal_metrics: {
}

// Instance-level "process" metrics
aggregate_events_recorded_total: {
description: "The number of events recorded by the aggregate transform."
type: "counter"
default_namespace: "vector"
tags: _component_tags
}
aggregate_failed_updates: {
description: "The number of failed metric updates, `incremental` adds, encountered by the aggregate transform."
type: "counter"
default_namespace: "vector"
tags: _component_tags
}
aggregate_flushes_total: {
description: "The number of flushes done by the aggregate transform."
type: "counter"
default_namespace: "vector"
tags: _component_tags
}
api_started_total: {
description: "The number of times the Vector GraphQL API has been started."
type: "counter"
217 changes: 217 additions & 0 deletions docs/reference/components/transforms/aggregate.cue
@@ -0,0 +1,217 @@
package metadata

components: transforms: aggregate: {
title: "Aggregate"

description: """
Aggregates multiple metric events into a single metric event based
on a defined interval window. This helps to reduce metric volume at
the cost of granularity.
"""

classes: {
commonly_used: false
development: "beta"
egress_method: "stream"
stateful: true
}

features: {
aggregate: {}
}

support: {
targets: {
"aarch64-unknown-linux-gnu": true
"aarch64-unknown-linux-musl": true
"armv7-unknown-linux-gnueabihf": true
"armv7-unknown-linux-musleabihf": true
"x86_64-apple-darwin": true
"x86_64-pc-windows-msvc": true
"x86_64-unknown-linux-gnu": true
"x86_64-unknown-linux-musl": true
}
requirements: []
warnings: []
Contributor:
Should there be a warning about loss of precision for any metric types?

Contributor Author:
Not sure I follow. I would expect counters to be the only numerical values that see actual mathematical addition, and normal floating-point behavior should apply. I wouldn't expect that to have much impact beyond what floating-point details have on individual numbers in the real world, but I guess it's possible. I don't have a deep enough understanding of the distribution, histogram, and summary value types to say there.

If sinks are already doing implicit adds then I don't think this would add any new precision issues that aren't already present there.
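The floating-point point being discussed can be illustrated with a minimal, hypothetical sketch (plain `f64` sums standing in for counter aggregation; this is not Vector's actual `MetricData` arithmetic):

```rust
fn main() {
    // Aggregating incremental counters is ordinary f64 addition, so the
    // usual floating-point rounding applies to the running sum.
    let increments = [0.1_f64; 10];
    let aggregated: f64 = increments.iter().sum();

    // The sum is extremely close to 1.0, but not bit-exactly equal.
    println!("{aggregated:.17}");
    println!("bit-exact: {}", aggregated == 1.0);
    println!("within 1e-9: {}", (aggregated - 1.0).abs() < 1e-9);
}
```

In other words, the error introduced by summing inside the transform is the same class of rounding a downstream sink would introduce when doing its own implicit adds.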

notices: []
}

configuration: {
interval_ms: {
common: true
description: """
The interval over which metrics are aggregated in milliseconds. Over this period metrics with the
same series data (name, namespace, tags, ...) will be aggregated.
"""
required: false
warnings: []
type: uint: {
default: 10000
unit: "milliseconds"
}
}
}

input: {
logs: false
metrics: {
counter: true
distribution: true
gauge: true
histogram: true
set: true
summary: true
}
}

examples: [
{
title: "Aggregate over 15 seconds"
Member:
Just noting that this says 15 seconds but the interval_ms below is 5000.

input: [
{
metric: {
kind: "incremental"
name: "counter.1"
timestamp: "2021-07-12T07:58:44.223543Z"
tags: {
host: "my.host.com"
}
counter: {
value: 1.1
}
}
},
{
metric: {
kind: "incremental"
name: "counter.1"
timestamp: "2021-07-12T07:58:45.223543Z"
tags: {
host: "my.host.com"
}
counter: {
value: 2.2
}
}
},
{
metric: {
kind: "incremental"
name: "counter.1"
timestamp: "2021-07-12T07:58:45.223543Z"
tags: {
host: "different.host.com"
}
counter: {
value: 1.1
}
}
},
{
metric: {
kind: "absolute"
name: "gauge.1"
timestamp: "2021-07-12T07:58:47.223543Z"
tags: {
host: "my.host.com"
}
gauge: {
value: 22.33
}
}
},
{
metric: {
kind: "absolute"
name: "gauge.1"
timestamp: "2021-07-12T07:58:45.223543Z"
tags: {
host: "my.host.com"
}
gauge: {
value: 44.55
}
}
},
]
configuration: {
interval_ms: 5000
}
output: [
{
metric: {
kind: "incremental"
name: "counter.1"
timestamp: "2021-07-12T07:58:45.223543Z"
tags: {
host: "my.host.com"
}
counter: {
value: 3.3
}
}
},
{
metric: {
kind: "incremental"
name: "counter.1"
timestamp: "2021-07-12T07:58:45.223543Z"
tags: {
host: "different.host.com"
}
counter: {
value: 1.1
}
}
},
{
metric: {
kind: "absolute"
name: "gauge.1"
timestamp: "2021-07-12T07:58:45.223543Z"
tags: {
host: "my.host.com"
}
gauge: {
value: 44.55
}
}
},
]
},
]

how_it_works: {
aggregation_behavior: {
title: "Aggregation Behavior"
body: """
Metrics are aggregated based on their kind. During an interval, `incremental` metrics
are "added" and newer `absolute` metrics replace older ones in the same series. This results in a reduction
of volume and less granularity, while maintaining numerical correctness. As an example, two
`incremental` `counter` metrics with values 10 and 13 processed by the transform during a period would be
aggregated into a single `incremental` `counter` with a value of 23. Two `absolute` `gauge` metrics with
values 93 and 95 would result in a single `absolute` `gauge` with the value of 95. More complex
types like `distribution`, `histogram`, `set`, and `summary` behave similarly with `incremental`
values being combined in a manner that makes sense based on their type.
"""
}

advantages: {
title: "Advantages of Use"
body: """
The major advantage to aggregation is the reduction of volume. It may reduce costs
directly in situations that charge by metric event volume, or indirectly by requiring less CPU to
process and/or less network bandwidth to transmit and receive. In systems that are constrained by
the processing required to ingest metric events it may help to reduce the processing overhead. This
may apply to transforms and sinks downstream of the aggregate transform as well.
"""
}

}

telemetry: metrics: {
aggregate_events_recorded_total: components.sources.internal_metrics.output.metrics.aggregate_events_recorded_total
aggregate_failed_updates: components.sources.internal_metrics.output.metrics.aggregate_failed_updates
aggregate_flushes_total: components.sources.internal_metrics.output.metrics.aggregate_flushes_total
}
}
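The behavior documented in the `how_it_works` section above can be modeled with a short, hypothetical Rust sketch. The types here are simplified stand-ins: Vector's real implementation works on its `Metric`/`MetricData` structures, and handles conflicting kinds explicitly (emitting `aggregate_failed_updates`) rather than silently replacing as this sketch does.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq)]
enum Kind {
    Incremental,
    Absolute,
}

#[derive(Clone, Copy, Debug)]
struct Metric {
    kind: Kind,
    value: f64,
}

#[derive(Default)]
struct Aggregate {
    // One entry per series (name, namespace, tags, ...) in the window.
    map: HashMap<String, Metric>,
}

impl Aggregate {
    // Record one event: incremental values add, absolute values replace.
    fn record(&mut self, series: &str, metric: Metric) {
        self.map
            .entry(series.to_string())
            .and_modify(|existing| match (existing.kind, metric.kind) {
                (Kind::Incremental, Kind::Incremental) => existing.value += metric.value,
                // Absolute (or a kind change) replaces here for simplicity;
                // the PR tracks kind conflicts as failed updates instead.
                _ => *existing = metric,
            })
            .or_insert(metric);
    }

    // A flush drains the window, emitting one metric per series.
    fn flush(&mut self) -> Vec<(String, Metric)> {
        self.map.drain().collect()
    }
}

fn main() {
    let mut agg = Aggregate::default();
    agg.record("counter.1", Metric { kind: Kind::Incremental, value: 10.0 });
    agg.record("counter.1", Metric { kind: Kind::Incremental, value: 13.0 });
    agg.record("gauge.1", Metric { kind: Kind::Absolute, value: 93.0 });
    agg.record("gauge.1", Metric { kind: Kind::Absolute, value: 95.0 });
    for (series, m) in agg.flush() {
        println!("{series} {:?} {}", m.kind, m.value);
    }
}
```

Matching the doc's worked example: the two incremental counters (10 and 13) flush as a single incremental 23, and the two absolute gauges (93 and 95) flush as a single absolute 95.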
Contributor (@binarylogic, Jun 17, 2021):
We need a how_it_works section that describes:

  1. How metric types merge (absolute vs incremental, which timestamp is used, etc).
  2. What the user loses when this occurs (granularity, precision, etc).
  3. Why someone would do this? (cost reasons due to volume reduction).
  4. A call out to the log_to_metric transform since this will commonly be paired?

Contributor Author:

2. What the user loses when this occurs (granularity, precision, etc).

With Counters/Incremental, no meaningful data should be lost in the bigger sense; it'll just shift the granularity at which it appears.

With Gauges/Absolute it'll reduce granularity/resolution, potentially losing short term spikes/dips.

3. Why someone would do this? (cost reasons due to volume reduction).

That, and overhead: network transfers, processing time in the metrics storage system, etc. It can push the "work" of aggregating data out to the "edge", where it'll scale better in many cases.

4. A call out to the log_to_metric transform since this will commonly be paired?

Not aware of why this would be the case so would need a snippet to include it.

Contributor Author:

Done. With the exception of 4.

29 changes: 29 additions & 0 deletions src/internal_events/aggregate.rs
@@ -0,0 +1,29 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub struct AggregateEventRecorded;

impl InternalEvent for AggregateEventRecorded {
fn emit_metrics(&self) {
counter!("aggregate_events_recorded_total", 1);
}
}

#[derive(Debug)]
pub struct AggregateFlushed;

impl InternalEvent for AggregateFlushed {
fn emit_metrics(&self) {
counter!("aggregate_flushes_total", 1);
}
}

#[derive(Debug)]
pub struct AggregateUpdateFailed;

impl InternalEvent for AggregateUpdateFailed {
fn emit_metrics(&self) {
counter!("aggregate_failed_updates", 1);
}
}
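For readers unfamiliar with Vector's `InternalEvent` pattern above: event structs carry no state and only bump counters when emitted. A self-contained sketch of the same pattern (substituting a process-local `AtomicU64` for the `metrics` crate's `counter!` macro, so it runs without Vector's dependencies):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Stand-in for the `metrics` crate's registry in this sketch.
static FLUSHES: AtomicU64 = AtomicU64::new(0);

trait InternalEvent {
    fn emit_metrics(&self);
}

#[derive(Debug)]
struct AggregateFlushed;

impl InternalEvent for AggregateFlushed {
    fn emit_metrics(&self) {
        // Plays the role of `counter!("aggregate_flushes_total", 1)`.
        FLUSHES.fetch_add(1, Ordering::Relaxed);
    }
}

fn main() {
    // The transform would emit one of these per flush tick or shutdown flush.
    AggregateFlushed.emit_metrics();
    AggregateFlushed.emit_metrics();
    println!("aggregate_flushes_total = {}", FLUSHES.load(Ordering::Relaxed));
}
```

The zero-sized structs make the emission sites self-documenting while keeping the metric names in one place.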
2 changes: 2 additions & 0 deletions src/internal_events/mod.rs
@@ -3,6 +3,7 @@ use std::borrow::Cow;
mod adaptive_concurrency;
mod add_fields;
mod add_tags;
mod aggregate;
mod ansi_stripper;
#[cfg(feature = "sources-apache_metrics")]
mod apache_metrics;
@@ -132,6 +133,7 @@ pub mod kubernetes;
pub use self::adaptive_concurrency::*;
pub use self::add_fields::*;
pub use self::add_tags::*;
pub use self::aggregate::*;
pub use self::ansi_stripper::*;
#[cfg(feature = "sources-apache_metrics")]
pub use self::apache_metrics::*;