From 199841a8205c38ea8198db1902df0759eaa9f128 Mon Sep 17 00:00:00 2001
From: david7482
Date: Thu, 2 Aug 2018 06:43:17 +0800
Subject: [PATCH] Support StatisticValues in cloudwatch output plugin (#4364)

---
 plugins/outputs/cloudwatch/README.md          |  10 +
 plugins/outputs/cloudwatch/cloudwatch.go      | 289 ++++++++++++++----
 plugins/outputs/cloudwatch/cloudwatch_test.go |  38 ++-
 3 files changed, 284 insertions(+), 53 deletions(-)

diff --git a/plugins/outputs/cloudwatch/README.md b/plugins/outputs/cloudwatch/README.md
index c44ac4ead97f3..31619263f26f9 100644
--- a/plugins/outputs/cloudwatch/README.md
+++ b/plugins/outputs/cloudwatch/README.md
@@ -36,3 +36,13 @@ Examples include but are not limited to:
 ### namespace
 
 The namespace used for AWS CloudWatch metrics.
+
+### write_statistics
+
+If you have a large number of metrics, you should consider sending statistic
+values instead of raw metrics; this can not only improve performance but also
+reduce AWS API cost. If this flag is enabled, the plugin parses the required
+[CloudWatch statistic fields](https://docs.aws.amazon.com/sdk-for-go/api/service/cloudwatch/#StatisticSet)
+(count, min, max, and sum) and sends them to CloudWatch. You can use the
+`basicstats` aggregator to calculate those fields. If not all statistic fields
+are available, each field is still sent as a raw metric.
\ No newline at end of file
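
For orientation (this example is not part of the patch): a minimal sketch of a Telegraf configuration that pairs the new flag with the `basicstats` aggregator the README points to. The period, the `stats` list, and the region are illustrative; check your Telegraf version for which basicstats statistics are available.

```toml
# Aggregate raw values into the count/min/max/sum fields that
# write_statistics expects (field names gain those suffixes).
[[aggregators.basicstats]]
  period = "30s"
  drop_original = true
  # Illustrative; "sum" may not exist in older basicstats versions.
  stats = ["count", "min", "max", "sum"]

[[outputs.cloudwatch]]
  region = "us-east-1"
  namespace = "InfluxData/Telegraf"
  # Collapse <field>_count/_min/_max/_sum into one StatisticSet datum.
  write_statistics = true
```
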
diff --git a/plugins/outputs/cloudwatch/cloudwatch.go b/plugins/outputs/cloudwatch/cloudwatch.go
index 52ab41a2880ac..d3bd663036b71 100644
--- a/plugins/outputs/cloudwatch/cloudwatch.go
+++ b/plugins/outputs/cloudwatch/cloudwatch.go
@@ -28,6 +28,128 @@ type CloudWatch struct {
 	Namespace string `toml:"namespace"` // CloudWatch Metrics Namespace
 
 	svc *cloudwatch.CloudWatch
+
+	WriteStatistics bool `toml:"write_statistics"`
+}
+
+type statisticType int
+
+const (
+	statisticTypeNone statisticType = iota
+	statisticTypeMax
+	statisticTypeMin
+	statisticTypeSum
+	statisticTypeCount
+)
+
+type cloudwatchField interface {
+	addValue(sType statisticType, value float64)
+	buildDatum() []*cloudwatch.MetricDatum
+}
+
+type statisticField struct {
+	metricName string
+	fieldName  string
+	tags       map[string]string
+	values     map[statisticType]float64
+	timestamp  time.Time
+}
+
+func (f *statisticField) addValue(sType statisticType, value float64) {
+	if sType != statisticTypeNone {
+		f.values[sType] = value
+	}
+}
+
+func (f *statisticField) buildDatum() []*cloudwatch.MetricDatum {
+
+	var datums []*cloudwatch.MetricDatum
+
+	if f.hasAllFields() {
+		// If we have all required fields, build a single datum with StatisticValues
+		min := f.values[statisticTypeMin]
+		max := f.values[statisticTypeMax]
+		sum := f.values[statisticTypeSum]
+		count := f.values[statisticTypeCount]
+
+		datum := &cloudwatch.MetricDatum{
+			MetricName: aws.String(strings.Join([]string{f.metricName, f.fieldName}, "_")),
+			Dimensions: BuildDimensions(f.tags),
+			Timestamp:  aws.Time(f.timestamp),
+			StatisticValues: &cloudwatch.StatisticSet{
+				Minimum:     aws.Float64(min),
+				Maximum:     aws.Float64(max),
+				Sum:         aws.Float64(sum),
+				SampleCount: aws.Float64(count),
+			},
+		}
+
+		datums = append(datums, datum)
+
+	} else {
+		// If we don't have all required fields, build each field as an independent datum
+		for sType, value := range f.values {
+			datum := &cloudwatch.MetricDatum{
+				Value:      aws.Float64(value),
+				Dimensions: BuildDimensions(f.tags),
+				Timestamp:  aws.Time(f.timestamp),
+			}
+
+			switch sType {
+			case statisticTypeMin:
+				datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "min"}, "_"))
+			case statisticTypeMax:
+				datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "max"}, "_"))
+			case statisticTypeSum:
+				datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "sum"}, "_"))
+			case statisticTypeCount:
+				datum.MetricName = aws.String(strings.Join([]string{f.metricName, f.fieldName, "count"}, "_"))
+			default:
+				// statisticTypeNone values are never stored, so this branch is unreachable
+				continue
+			}
+
+			datums = append(datums, datum)
+		}
+	}
+
+	return datums
+}
+
+func (f *statisticField) hasAllFields() bool {
+
+	_, hasMin := f.values[statisticTypeMin]
+	_, hasMax := f.values[statisticTypeMax]
+	_, hasSum := f.values[statisticTypeSum]
+	_, hasCount := f.values[statisticTypeCount]
+
+	return hasMin && hasMax && hasSum && hasCount
+}
+
+type valueField struct {
+	metricName string
+	fieldName  string
+	tags       map[string]string
+	value      float64
+	timestamp  time.Time
+}
+
+func (f *valueField) addValue(sType statisticType, value float64) {
+	if sType == statisticTypeNone {
+		f.value = value
+	}
+}
+
+func (f *valueField) buildDatum() []*cloudwatch.MetricDatum {
+
+	return []*cloudwatch.MetricDatum{
+		{
+			MetricName: aws.String(strings.Join([]string{f.metricName, f.fieldName}, "_")),
+			Value:      aws.Float64(f.value),
+			Dimensions: BuildDimensions(f.tags),
+			Timestamp:  aws.Time(f.timestamp),
+		},
+	}
 }
 
 var sampleConfig = `
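
As a side note (not part of the patch), the grouping rule implemented by the types above can be illustrated with a standalone sketch. `splitStatistic` is a hypothetical stand-in for the patch's unexported `getStatisticType`, and the field names are made up:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// splitStatistic mimics getStatisticType: fields named
// <base>_min/_max/_sum/_count are grouped under <base>.
func splitStatistic(name string) (base, stat string) {
	for _, s := range []string{"max", "min", "sum", "count"} {
		if strings.HasSuffix(name, "_"+s) {
			return strings.TrimSuffix(name, "_"+s), s
		}
	}
	return name, ""
}

func main() {
	// Field names as they might arrive from the basicstats aggregator.
	fields := []string{"latency_min", "latency_max", "latency_sum", "latency_count", "requests"}

	groups := map[string]map[string]bool{}
	for _, f := range fields {
		base, stat := splitStatistic(f)
		if groups[base] == nil {
			groups[base] = map[string]bool{}
		}
		if stat != "" {
			groups[base][stat] = true
		}
	}

	// Deterministic output order for readability.
	bases := make([]string, 0, len(groups))
	for b := range groups {
		bases = append(bases, b)
	}
	sort.Strings(bases)

	for _, base := range bases {
		stats := groups[base]
		if len(stats) == 4 {
			// All four statistics present: one datum with StatisticValues.
			fmt.Printf("%s -> 1 datum with StatisticValues\n", base)
		} else {
			// Incomplete set (or a plain field): each value becomes its own datum.
			n := len(stats)
			if n == 0 {
				n = 1
			}
			fmt.Printf("%s -> %d raw datum(s)\n", base, n)
		}
	}
}
```
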
@@ -57,6 +179,14 @@ var sampleConfig = `
 
   ## Namespace for the CloudWatch MetricDatums
   namespace = "InfluxData/Telegraf"
+
+  ## If you have a large number of metrics, consider sending statistic values
+  ## instead of raw metrics; this can improve performance and reduce AWS API
+  ## cost. If this flag is enabled, the plugin parses the required CloudWatch
+  ## statistic fields (count, min, max, and sum) and sends them to CloudWatch.
+  ## You can use the basicstats aggregator to calculate those fields. If not
+  ## all statistic fields are available, each field is still sent as a raw metric.
+  # write_statistics = false
 `
 
 func (c *CloudWatch) SampleConfig() string {
@@ -104,7 +234,7 @@ func (c *CloudWatch) Write(metrics []telegraf.Metric) error {
 
 	var datums []*cloudwatch.MetricDatum
 	for _, m := range metrics {
-		d := BuildMetricDatum(m)
+		d := BuildMetricDatum(c.WriteStatistics, m)
 		datums = append(datums, d...)
 	}
 
@@ -159,67 +289,58 @@ func PartitionDatums(size int, datums []*cloudwatch.MetricDatum) [][]*cloudwatch.MetricDatum {
 	return partitions
 }
 
-// Make a MetricDatum for each field in a Point. Only fields with values that can be
-// converted to float64 are supported. Non-supported fields are skipped.
-func BuildMetricDatum(point telegraf.Metric) []*cloudwatch.MetricDatum {
-	datums := make([]*cloudwatch.MetricDatum, len(point.Fields()))
-	i := 0
+// Make a MetricDatum from telegraf.Metric. It checks if all required fields of
+// cloudwatch.StatisticSet are available. If so, it builds a MetricDatum using the
+// statistic values. Otherwise, each field is built as an independent datum.
+func BuildMetricDatum(buildStatistic bool, point telegraf.Metric) []*cloudwatch.MetricDatum {
 
-	var value float64
+	fields := make(map[string]cloudwatchField)
 
 	for k, v := range point.Fields() {
-		switch t := v.(type) {
-		case int:
-			value = float64(t)
-		case int32:
-			value = float64(t)
-		case int64:
-			value = float64(t)
-		case uint64:
-			value = float64(t)
-		case float64:
-			value = t
-		case bool:
-			if t {
-				value = 1
-			} else {
-				value = 0
-			}
-		case time.Time:
-			value = float64(t.Unix())
-		default:
-			// Skip unsupported type.
-			datums = datums[:len(datums)-1]
-			continue
-		}
 
-		// Do CloudWatch boundary checking
-		// Constraints at: http://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html
-		if math.IsNaN(value) {
-			datums = datums[:len(datums)-1]
-			continue
-		}
-		if math.IsInf(value, 0) {
-			datums = datums[:len(datums)-1]
+		val, ok := convert(v)
+		if !ok {
+			// Only fields with values that can be converted to float64 (and are within
+			// the CloudWatch boundary) are supported. Non-supported fields are skipped.
 			continue
 		}
-		if value > 0 && value < float64(8.515920e-109) {
-			datums = datums[:len(datums)-1]
-			continue
-		}
-		if value > float64(1.174271e+108) {
-			datums = datums[:len(datums)-1]
+
+		sType, fieldName := getStatisticType(k)
+
+		// If statistics are not enabled, or this is not a statistic field,
+		// treat the current field as a plain value field.
+		if !buildStatistic || sType == statisticTypeNone {
+			fields[k] = &valueField{
+				metricName: point.Name(),
+				fieldName:  k,
+				tags:       point.Tags(),
+				timestamp:  point.Time(),
+				value:      val,
+			}
 			continue
 		}
 
-		datums[i] = &cloudwatch.MetricDatum{
-			MetricName: aws.String(strings.Join([]string{point.Name(), k}, "_")),
-			Value:      aws.Float64(value),
-			Dimensions: BuildDimensions(point.Tags()),
-			Timestamp:  aws.Time(point.Time()),
+		// Otherwise, it is a statistic field.
+		if _, ok := fields[fieldName]; !ok {
+			// First time seeing this field; create a statisticField for it
+			fields[fieldName] = &statisticField{
+				metricName: point.Name(),
+				fieldName:  fieldName,
+				tags:       point.Tags(),
+				timestamp:  point.Time(),
+				values: map[statisticType]float64{
+					sType: val,
+				},
+			}
+		} else {
+			// Add the new statistic value to the existing field
+			fields[fieldName].addValue(sType, val)
 		}
+	}
 
-		i += 1
+	var datums []*cloudwatch.MetricDatum
+	for _, f := range fields {
+		d := f.buildDatum()
+		datums = append(datums, d...)
 	}
 
 	return datums
@@ -268,6 +389,72 @@ func BuildDimensions(mTags map[string]string) []*cloudwatch.Dimension {
 
 	return dimensions
 }
 
+func getStatisticType(name string) (sType statisticType, fieldName string) {
+	switch {
+	case strings.HasSuffix(name, "_max"):
+		sType = statisticTypeMax
+		fieldName = strings.TrimSuffix(name, "_max")
+	case strings.HasSuffix(name, "_min"):
+		sType = statisticTypeMin
+		fieldName = strings.TrimSuffix(name, "_min")
+	case strings.HasSuffix(name, "_sum"):
+		sType = statisticTypeSum
+		fieldName = strings.TrimSuffix(name, "_sum")
+	case strings.HasSuffix(name, "_count"):
+		sType = statisticTypeCount
+		fieldName = strings.TrimSuffix(name, "_count")
+	default:
+		sType = statisticTypeNone
+		fieldName = name
+	}
+	return
+}
+
+func convert(v interface{}) (value float64, ok bool) {
+
+	ok = true
+
+	switch t := v.(type) {
+	case int:
+		value = float64(t)
+	case int32:
+		value = float64(t)
+	case int64:
+		value = float64(t)
+	case uint64:
+		value = float64(t)
+	case float64:
+		value = t
+	case bool:
+		if t {
+			value = 1
+		} else {
+			value = 0
+		}
+	case time.Time:
+		value = float64(t.Unix())
+	default:
+		// Skip unsupported type.
+		ok = false
+		return
+	}
+
+	// Do CloudWatch boundary checking
+	// Constraints at: http://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html
+	switch {
+	case math.IsNaN(value):
+		return 0, false
+	case math.IsInf(value, 0):
+		return 0, false
+	case value > 0 && value < float64(8.515920e-109):
+		return 0, false
+	case value > float64(1.174271e+108):
+		return 0, false
+	}
+
+	return
+}
+
 func init() {
 	outputs.Add("cloudwatch", func() telegraf.Output {
 		return &CloudWatch{}
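
Also for illustration rather than part of the patch: the boundary checks in `convert` can be exercised in isolation. `withinCloudWatchBounds` below is a hypothetical helper mirroring those checks; note that, like both the old and new plugin code, only the positive side of the documented CloudWatch value range is checked.

```go
package main

import (
	"fmt"
	"math"
)

// withinCloudWatchBounds mirrors the checks in convert(): NaN, Inf,
// positive values below 8.515920e-109, and values above 1.174271e+108
// are rejected.
func withinCloudWatchBounds(v float64) bool {
	switch {
	case math.IsNaN(v), math.IsInf(v, 0):
		return false
	case v > 0 && v < 8.515920e-109:
		return false
	case v > 1.174271e+108:
		return false
	}
	return true
}

func main() {
	for _, v := range []float64{0, 1, 1e-120, 1e+120, math.NaN(), math.Inf(1)} {
		fmt.Printf("%g -> %v\n", v, withinCloudWatchBounds(v))
	}
}
```
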
diff --git a/plugins/outputs/cloudwatch/cloudwatch_test.go b/plugins/outputs/cloudwatch/cloudwatch_test.go
index 8ab60de2f082b..c91c30e0c0b21 100644
--- a/plugins/outputs/cloudwatch/cloudwatch_test.go
+++ b/plugins/outputs/cloudwatch/cloudwatch_test.go
@@ -5,11 +5,13 @@ import (
 	"math"
 	"sort"
 	"testing"
+	"time"
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/service/cloudwatch"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/metric"
 	"github.com/influxdata/telegraf/testutil"
 
 	"github.com/stretchr/testify/assert"
@@ -72,13 +74,45 @@ func TestBuildMetricDatums(t *testing.T) {
 		testutil.TestMetric(float64(1.174272e+108)), // largest should be 1.174271e+108
 	}
 	for _, point := range validMetrics {
-		datums := BuildMetricDatum(point)
+		datums := BuildMetricDatum(false, point)
 		assert.Equal(1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", point))
 	}
 	for _, point := range invalidMetrics {
-		datums := BuildMetricDatum(point)
+		datums := BuildMetricDatum(false, point)
 		assert.Equal(0, len(datums), fmt.Sprintf("Valid point should not create a Datum {value: %v}", point))
 	}
+
+	statisticMetric, _ := metric.New(
+		"test1",
+		map[string]string{"tag1": "value1"},
+		map[string]interface{}{"value_max": float64(10), "value_min": float64(0), "value_sum": float64(100), "value_count": float64(20)},
+		time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
+	)
+	datums := BuildMetricDatum(true, statisticMetric)
+	assert.Equal(1, len(datums), fmt.Sprintf("A complete statistic set should create a single Datum {value: %v}", statisticMetric))
+
+	multiFieldsMetric, _ := metric.New(
+		"test1",
+		map[string]string{"tag1": "value1"},
+		map[string]interface{}{"valueA": float64(10), "valueB": float64(0), "valueC": float64(100), "valueD": float64(20)},
+		time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
+	)
+	datums = BuildMetricDatum(true, multiFieldsMetric)
+	assert.Equal(4, len(datums), fmt.Sprintf("Each field should create a Datum {value: %v}", multiFieldsMetric))
+
+	multiStatisticMetric, _ := metric.New(
+		"test1",
+		map[string]string{"tag1": "value1"},
+		map[string]interface{}{
+			"valueA_max": float64(10), "valueA_min": float64(0), "valueA_sum": float64(100), "valueA_count": float64(20),
+			"valueB_max": float64(10), "valueB_min": float64(0), "valueB_sum": float64(100), "valueB_count": float64(20),
+			"valueC_max": float64(10), "valueC_min": float64(0), "valueC_sum": float64(100),
+			"valueD": float64(10), "valueE": float64(0),
+		},
+		time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
+	)
+	datums = BuildMetricDatum(true, multiStatisticMetric)
+	assert.Equal(7, len(datums), fmt.Sprintf("Mixed statistic fields should create 7 Datums {value: %v}", multiStatisticMetric))
 }
 
 func TestPartitionDatums(t *testing.T) {