Skip to content

Commit

Permalink
atlas cloudwatch: remove polling filter.
Browse files Browse the repository at this point in the history
Allows for polling more frequently, e.g. every 5m for the EC2 VPC metrics.
  • Loading branch information
manolama committed Jun 23, 2023
1 parent 02ceac9 commit 02de667
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 32 deletions.
1 change: 1 addition & 0 deletions atlas-cloudwatch/src/main/resources/ec2.conf
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ atlas {
namespace = "AWS/EC2"
period = 5m
end-period-offset = 4
poll-offset = 5m

dimensions = [
"Per-VPC Metrics",
Expand Down
5 changes: 0 additions & 5 deletions atlas-cloudwatch/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,6 @@ atlas {
// How often to run the polling scheduler to see if a polling run needs to
// execute.
frequency = 5m

# The period configured for namespaces that will be polled instead of sent via
# Firehose. So far we only care about S3 aggregates that are reported daily. If we
# need additional periods, we'll need to tweak the code.
period-filter = 1d
}

# TEMP: Used while testing cloud watch streaming to prepend "TEST." to metrics in order to compare against
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,13 @@ class CloudWatchPoller(
private val dpsPolled = registry.counter("atlas.cloudwatch.poller.dps.polled")
private val frequency = config.getDuration("atlas.cloudwatch.poller.frequency").getSeconds

private val periodFilter =
config.getDuration("atlas.cloudwatch.poller.period-filter").getSeconds.toInt

private[cloudwatch] val flagMap = new ConcurrentHashMap[String, AtomicBoolean]()

private[cloudwatch] val offsetMap = {
var map = Map.empty[Int, List[MetricCategory]]
rules
.getCategories(config)
.filter(c => c.pollOffset.isDefined && c.period == periodFilter)
.filter(c => c.pollOffset.isDefined)
.foreach { category =>
val offset = category.pollOffset.get.getSeconds.toInt
val categories = map.getOrElse(offset, List.empty)
Expand Down Expand Up @@ -155,7 +152,7 @@ class CloudWatchPoller(
if (namespaces.size > 0) {
val filtered = categories.filter(c => namespaces.contains(c.namespace))
if (filtered.size > 0) {
val nextRun = timeToRun(offset, account, region)
val nextRun = timeToRun(filtered.head.period, offset, account, region)
if (nextRun > 0) {
// flag check
val flag = flagMap.computeIfAbsent(
Expand Down Expand Up @@ -412,12 +409,12 @@ class CloudWatchPoller(
}
}

private def timeToRun(offset: Int, account: String, region: Region): Long = {
private def timeToRun(period: Int, offset: Int, account: String, region: Region): Long = {
// see if we've past the next run time.
var nextRun = 0L
try {
val previousRun = processor.lastSuccessfulPoll(runKey(offset, account, region))
nextRun = runAfter(offset, periodFilter)
nextRun = runAfter(offset, period)
if (previousRun >= nextRun) {
logger.info(
s"Skipping CloudWatch polling for ${offset}s as we're within the polling interval. Previous ${previousRun}. Next ${nextRun}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.netflix.iep.aws2.AwsClientFactory
import com.netflix.iep.leader.api.LeaderStatus
import com.netflix.spectator.api.DefaultRegistry
import com.netflix.spectator.api.Registry
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import junit.framework.TestCase.assertFalse
import munit.FunSuite
Expand Down Expand Up @@ -104,14 +105,61 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("init") {
val poller = getPoller
val poller = getPoller()
val categories = poller.offsetMap.get(offset).get
assertEquals(categories.filter(_.namespace == "AWS/UT1").size, 1)
}

test("init multiple offsets") {
val cfg = ConfigFactory.parseString(
"""
|atlas {
| cloudwatch {
| categories = ["cfg1", "cfg2"]
| poller.frequency = "5m"
|
| cfg1 = {
| namespace = "AWS/CFG1"
| period = 5m
| poll-offset = 5m
|
| dimensions = ["foo"]
| metrics = [
| {
| name = "M1"
| alias = "aws.m1"
| conversion = "max"
| }
| ]
| }
|
| cfg2 = {
| namespace = "AWS/CFG2"
| period = 1d
| poll-offset = 1h
|
| dimensions = ["bar"]
| metrics = [
| {
| name = "M2"
| alias = "aws.m2"
| conversion = "max"
| }
| ]
| }
| }
|}
|""".stripMargin)
val poller = getPoller(cfg)
var categories = poller.offsetMap.get(300).get
assertEquals(categories.filter(_.namespace == "AWS/CFG1").size, 1)
categories = poller.offsetMap.get(3600).get
assertEquals(categories.filter(_.namespace == "AWS/CFG2").size, 1)
}

test("poll not leader") {
when(leaderStatus.hasLeadership).thenReturn(false)
val poller = getPoller
val poller = getPoller()
poller.poll(offset, List(getCategory(poller)))
assertCounters()
verify(accountSupplier, never).accounts
Expand All @@ -121,22 +169,22 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
test("poll already ran") {
when(processor.lastSuccessfulPoll(anyString))
.thenReturn(System.currentTimeMillis() + 86_400_000L)
val poller = getPoller
val poller = getPoller()
poller.poll(offset, List(getCategory(poller)))
assertCounters()
verify(processor, never).updateLastSuccessfulPoll(anyString, anyLong)
}

test("poll already running") {
val poller = getPoller
val poller = getPoller()
poller.flagMap.put(runKey(28800, "123456789012", Region.US_EAST_1), new AtomicBoolean(true))
poller.poll(offset, List(getCategory(poller)))
assertCounters()
verify(processor, never).updateLastSuccessfulPoll(anyString, anyLong)
}

test("poll success") {
val poller = getPoller
val poller = getPoller()
mockSuccess
val flag = new AtomicBoolean()
val full = Promise[List[CloudWatchPoller#Poller]]()
Expand All @@ -152,7 +200,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("poll on list failure") {
val poller = getPoller
val poller = getPoller()
val flag = new AtomicBoolean()
val full = Promise[List[CloudWatchPoller#Poller]]()
val accountsDone = Promise[Done]()
Expand All @@ -176,7 +224,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
any[Optional[Region]]
)
).thenThrow(new RuntimeException("test"))
val poller = getPoller
val poller = getPoller()
val flag = new AtomicBoolean()
val full = Promise[List[CloudWatchPoller#Poller]]()
val accountsDone = Promise[Done]()
Expand All @@ -195,7 +243,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {

test("poll accounts exception") {
when(accountSupplier.accounts).thenThrow(new RuntimeException("test"))
val poller = getPoller
val poller = getPoller()
val flag = new AtomicBoolean()
val full = Promise[List[CloudWatchPoller#Poller]]()
val accountsDone = Promise[Done]()
Expand All @@ -214,7 +262,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {

test("poll empty accounts") {
when(accountSupplier.accounts).thenReturn(Map.empty)
val poller = getPoller
val poller = getPoller()
val flag = new AtomicBoolean()
val full = Promise[List[CloudWatchPoller#Poller]]()
val accountsDone = Promise[Done]()
Expand All @@ -228,7 +276,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("Poller#execute all success") {
val poller = getPoller
val poller = getPoller()
mockSuccess
val child = getChild(poller)
val f = child.execute
Expand All @@ -237,7 +285,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("Poller#execute one failure") {
val poller = getPoller
val poller = getPoller()
val child = getChild(poller)
val f = child.execute
intercept[RuntimeException] {
Expand All @@ -247,7 +295,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("Poller#ListMetrics success") {
val poller = getPoller
val poller = getPoller()
val child = getChild(poller)
val (mdef, req) = getListReq(poller)
val promise = Promise[Done]()
Expand All @@ -260,7 +308,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("Poller#ListMetrics empty") {
val poller = getPoller
val poller = getPoller()
val child = getChild(poller)
val (mdef, req) = getListReq(poller)
val promise = Promise[Done]()
Expand All @@ -272,7 +320,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("Poller#ListMetrics one failure") {
val poller = getPoller
val poller = getPoller()
val child = getChild(poller)
val (mdef, req) = getListReq(poller)
val promise = Promise[Done]()
Expand All @@ -286,7 +334,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("Poller#ListMetrics client throws") {
val poller = getPoller
val poller = getPoller()
val child = getChild(poller)
val (mdef, req) = getListReq(poller)
val promise = Promise[Done]()
Expand All @@ -300,7 +348,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("Poller#FetchMetricStats success") {
val poller = getPoller
val poller = getPoller()
val child = getChild(poller)
val category = getCategory(poller)
val (mdef, _) = getListReq(poller)
Expand Down Expand Up @@ -347,7 +395,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("Poller#FetchMetricStats success empty") {
val poller = getPoller
val poller = getPoller()
val child = getChild(poller)
val category = getCategory(poller)
val (mdef, _) = getListReq(poller)
Expand All @@ -365,7 +413,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("Poller#FetchMetricStats client throws") {
val poller = getPoller
val poller = getPoller()
val child = getChild(poller)
val category = getCategory(poller)
val (mdef, _) = getListReq(poller)
Expand All @@ -384,9 +432,9 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
assertCounters(errors = Map("metric" -> 1))
}

def getPoller: CloudWatchPoller = {
def getPoller(cfg: Config = config): CloudWatchPoller = {
new CloudWatchPoller(
config,
cfg,
registry,
leaderStatus,
accountSupplier,
Expand Down

0 comments on commit 02de667

Please sign in to comment.