Skip to content

Commit

Permalink
Fix a couple of bugs in the logic for how AWS metric periods are calc…
Browse files Browse the repository at this point in the history
…ulated.

Firstly, we clarify that periods are always whole-minute durations.
Any period that is shorter than a minute, or is not a whole number of
minutes, is rounded up to the next whole minute.

Secondly, we ensure that the resulting time period is always in the past.
This stops us getting empty metrics for the current-minute period.

Thirdly, we follow the AWS guidelines of aligning periods to whole
multiples within the hour e.g. 10:25->10:30 instead of 10:27->10:32
for a 5 minute period.
  • Loading branch information
tommyers-elastic committed Aug 17, 2022
1 parent 428dbc0 commit 19e7073
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 25 deletions.
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/aws/billing/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
startDate, endDate := getStartDateEndDate(m.Period)

// Get startTime and endTime
startTime, endTime := aws.GetStartTimeEndTime(m.Period, m.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.Period, m.Latency)

// get cost metrics from cost explorer
awsBeatsConfig := m.MetricSet.AwsConfig.Copy()
Expand Down
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
// Get startTime and endTime
startTime, endTime := aws.GetStartTimeEndTime(m.Period, m.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.Period, m.Latency)
m.Logger().Debugf("startTime = %s, endTime = %s", startTime, endTime)

// Check statistic method in config
Expand Down
10 changes: 5 additions & 5 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1392,7 +1392,7 @@ func TestCreateEventsWithIdentifier(t *testing.T) {
Value: "test-ec2",
},
}
startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)

events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
assert.NoError(t, err)
Expand Down Expand Up @@ -1432,7 +1432,7 @@ func TestCreateEventsWithoutIdentifier(t *testing.T) {
}

resourceTypeTagFilters := map[string][]aws.Tag{}
startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)

events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
assert.NoError(t, err)
Expand Down Expand Up @@ -1478,7 +1478,7 @@ func TestCreateEventsWithTagsFilter(t *testing.T) {
},
}

startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)
events, err := m.createEvents(mockCloudwatchSvc, mockTaggingSvc, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
assert.NoError(t, err)
assert.Equal(t, 1, len(events))
Expand Down Expand Up @@ -1630,7 +1630,7 @@ func TestCreateEventsTimestamp(t *testing.T) {
}

resourceTypeTagFilters := map[string][]aws.Tag{}
startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)

cloudwatchMock := &MockCloudWatchClientWithoutDim{}
resGroupTaggingClientMock := &MockResourceGroupsTaggingClient{}
Expand All @@ -1644,6 +1644,6 @@ func TestGetStartTimeEndTime(t *testing.T) {
m.CloudwatchConfigs = []Config{{Statistic: []string{"Average"}}}
m.MetricSet = &aws.MetricSet{Period: 5 * time.Minute}
m.logger = logp.NewLogger("test")
startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency)
startTime, endTime := aws.GetStartTimeEndTime(time.Now(), m.MetricSet.Period, m.MetricSet.Latency)
assert.Equal(t, 5*time.Minute, endTime.Sub(startTime))
}
32 changes: 16 additions & 16 deletions x-pack/metricbeat/module/aws/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ import (
resourcegroupstaggingapitypes "github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi/types"
)

// GetStartTimeEndTime function uses durationString to create startTime and endTime for queries.
func GetStartTimeEndTime(period time.Duration, latency time.Duration) (time.Time, time.Time) {
endTime := time.Now()
if latency != 0 {
// add latency if config is not 0
endTime = endTime.Add(latency * -1)
}

// Set startTime to be one period earlier than the endTime. If metrics are
// not being collected, use latency config parameter to offset the startTime
// and endTime.
startTime := endTime.Add(period * -1)
// Defining duration
d := 60 * time.Second
// Calling Round() method
return startTime.Round(d), endTime.Round(d)
// GetStartTimeEndTime calculates the start and end times for queries based on the supplied current time,
// a collection period and an optional latency.
//
// Whilst the inputs to this function are continuous, the maximum period granularity we can consistently use
// is 1 minute. The resulting interval should also be aligned to the period for best performance. This means
// if a period of 3 minutes is requested at 12:05, for example, the calculated times are 12:00->12:03. See
// https://github.com/aws/aws-sdk-go-v2/blob/fdbd882cdf5c63a578caed14688cf9a456c75f2b/service/cloudwatch/api_op_GetMetricData.go#L88
// for more information about granularity and period alignment.
//
// If the period is not a whole number of minutes, it is rounded up to the next whole minute, e.g. 90s becomes
// 120s and 60.5s becomes 120s. A period that is zero or negative is treated as 1 minute, so the returned
// window always has a positive length.
//
// If `latency` is configured, the window is shifted back in time by the specified duration (before period
// alignment).
//
// The returned end time is never after `now - latency`, and the start time is exactly one (rounded) period
// before the end time. Alignment is performed with time.Time.Truncate, which works on absolute time since
// the zero time rather than on the wall-clock presentation of `now`.
func GetStartTimeEndTime(now time.Time, period time.Duration, latency time.Duration) (time.Time, time.Time) {
	// Ceiling to a whole number of minutes: Truncate rounds toward zero, so any
	// remainder (even a sub-second one) bumps the period up by a full minute.
	alignedPeriod := period.Truncate(time.Minute)
	if alignedPeriod < period {
		alignedPeriod += time.Minute
	}

	// Time.Truncate returns its receiver unchanged for d <= 0, which would yield an
	// unaligned, zero-length (or inverted) window. Fall back to the minimum granularity.
	if alignedPeriod < time.Minute {
		alignedPeriod = time.Minute
	}

	endTime := now.Add(-latency).Truncate(alignedPeriod)
	startTime := endTime.Add(-alignedPeriod)
	return startTime, endTime
}

// GetListMetricsOutput function gets listMetrics results from cloudwatch ~~per namespace~~ for each region.
Expand Down
55 changes: 53 additions & 2 deletions x-pack/metricbeat/module/aws/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestGetListMetricsOutputWithWildcard(t *testing.T) {
}

func TestGetMetricDataPerRegion(t *testing.T) {
startTime, endTime := GetStartTimeEndTime(10*time.Minute, 0)
startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0)

mockSvc := &MockCloudWatchClient{}
var metricDataQueries []cloudwatchtypes.MetricDataQuery
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestGetMetricDataPerRegion(t *testing.T) {
}

func TestGetMetricDataResults(t *testing.T) {
startTime, endTime := GetStartTimeEndTime(10*time.Minute, 0)
startTime, endTime := GetStartTimeEndTime(time.Now(), 10*time.Minute, 0)

mockSvc := &MockCloudWatchClient{}
metricInfo := cloudwatchtypes.Metric{
Expand Down Expand Up @@ -434,3 +434,54 @@ func TestGetResourcesTags(t *testing.T) {
}
assert.Equal(t, expectedResourceTagMap, resourceTagMap)
}

// TestGetStartTimeEndTime checks that GetStartTimeEndTime returns a window that is
// aligned to the (whole-minute) period, lies entirely at or before "now minus latency",
// and rounds non-whole-minute periods up to the next minute.
func TestGetStartTimeEndTime(t *testing.T) {
	cases := []struct {
		title         string
		now           string // the instant passed to GetStartTimeEndTime as "now" (RFC 3339)
		period        time.Duration
		latency       time.Duration
		expectedStart string
		expectedEnd   string
	}{
		// window should align with period e.g. requesting a 5 minute period at 10:27 gives 10:20->10:25
		{"1 minute", "2022-08-15T13:38:45Z", time.Second * 60, 0, "2022-08-15T13:37:00Z", "2022-08-15T13:38:00Z"},
		{"2 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 2, 0, "2022-08-15T13:36:00Z", "2022-08-15T13:38:00Z"},
		{"3 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 3, 0, "2022-08-15T13:33:00Z", "2022-08-15T13:36:00Z"},
		{"5 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 5, 0, "2022-08-15T13:30:00Z", "2022-08-15T13:35:00Z"},
		{"30 minutes", "2022-08-15T13:38:45Z", time.Second * 60 * 30, 0, "2022-08-15T13:00:00Z", "2022-08-15T13:30:00Z"},

		// latency should shift the time *before* period alignment
		// e.g. requesting a 5 minute period at 10:27 with 1 minute latency still gives 10:20->10:25,
		// but with 3 minutes latency gives 10:15->10:20
		{"1 minute, 10 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60, time.Second * 60 * 10, "2022-08-15T13:27:00Z", "2022-08-15T13:28:00Z"},
		{"2 minutes, 1 minute latency", "2022-08-15T13:38:45Z", time.Second * 60 * 2, time.Second * 60, "2022-08-15T13:34:00Z", "2022-08-15T13:36:00Z"},
		{"5 minutes, 4 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 5, time.Second * 60 * 4, "2022-08-15T13:25:00Z", "2022-08-15T13:30:00Z"},
		{"30 minutes, 30 minutes latency", "2022-08-15T13:38:45Z", time.Second * 60 * 30, time.Second * 60 * 30, "2022-08-15T12:30:00Z", "2022-08-15T13:00:00Z"},

		// non-whole-minute periods should be rounded up to the next minute; latency is applied as-is before period adjustment
		{"20 seconds, 45 second latency", "2022-08-15T13:38:45Z", time.Second * 20, time.Second * 45, "2022-08-15T13:37:00Z", "2022-08-15T13:38:00Z"},
		{"1.5 minutes, 60 second latency", "2022-08-15T13:38:45Z", time.Second * 90, time.Second * 60, "2022-08-15T13:34:00Z", "2022-08-15T13:36:00Z"},
		{"just less than 5 minutes, 90 second latency", "2022-08-15T13:38:45Z", time.Second * 59 * 5, time.Second * 90, "2022-08-15T13:30:00Z", "2022-08-15T13:35:00Z"},
	}

	// parseTime converts an RFC 3339 fixture string into a time.Time and aborts the
	// calling (sub)test immediately if the fixture itself is malformed.
	parseTime := func(t *testing.T, in string) time.Time {
		t.Helper()
		parsed, err := time.Parse(time.RFC3339, in)
		if err != nil {
			t.Fatalf("test setup failed - could not parse time with time.RFC3339: %s: %v", in, err)
		}
		return parsed
	}

	for _, tt := range cases {
		t.Run(tt.title, func(t *testing.T) {
			now := parseTime(t, tt.now)
			expectedStartTime := parseTime(t, tt.expectedStart)
			expectedEndTime := parseTime(t, tt.expectedEnd)

			start, end := GetStartTimeEndTime(now, tt.period, tt.latency)

			// Compare instants with Equal rather than ==, which would also compare
			// location pointers and monotonic clock readings.
			if !start.Equal(expectedStartTime) || !end.Equal(expectedEndTime) {
				t.Errorf("got (%s, %s), want (%s, %s)", start, end, tt.expectedStart, tt.expectedEnd)
			}
		})
	}
}

0 comments on commit 19e7073

Please sign in to comment.