Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

atlas cloudwatch: remove polling filter. #474

Merged
merged 1 commit into from
Jun 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions atlas-cloudwatch/src/main/resources/ec2.conf
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ atlas {
namespace = "AWS/EC2"
period = 5m
end-period-offset = 4
poll-offset = 5m

dimensions = [
"Per-VPC Metrics",
Expand Down
5 changes: 0 additions & 5 deletions atlas-cloudwatch/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,6 @@ atlas {
// How often to run the polling scheduler to see if a polling run needs to
// execute.
frequency = 5m

# The period configured for namespaces that will be polled instead of sent via
# Firehose. So far we only care about S3 aggregates that are reported daily. If we
# need additional periods, we'll need to tweak the code.
period-filter = 1d
}

# TEMP: Used while testing cloud watch streaming to prepend "TEST." to metrics in order to compare against
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,13 @@ class CloudWatchPoller(
private val dpsPolled = registry.counter("atlas.cloudwatch.poller.dps.polled")
private val frequency = config.getDuration("atlas.cloudwatch.poller.frequency").getSeconds

private val periodFilter =
config.getDuration("atlas.cloudwatch.poller.period-filter").getSeconds.toInt
manolama marked this conversation as resolved.
Show resolved Hide resolved

private[cloudwatch] val flagMap = new ConcurrentHashMap[String, AtomicBoolean]()

private[cloudwatch] val offsetMap = {
var map = Map.empty[Int, List[MetricCategory]]
rules
.getCategories(config)
.filter(c => c.pollOffset.isDefined && c.period == periodFilter)
.filter(c => c.pollOffset.isDefined)
.foreach { category =>
val offset = category.pollOffset.get.getSeconds.toInt
val categories = map.getOrElse(offset, List.empty)
Expand Down Expand Up @@ -155,7 +152,7 @@ class CloudWatchPoller(
if (namespaces.size > 0) {
val filtered = categories.filter(c => namespaces.contains(c.namespace))
if (filtered.size > 0) {
val nextRun = timeToRun(offset, account, region)
val nextRun = timeToRun(filtered.head.period, offset, account, region)
if (nextRun > 0) {
// flag check
val flag = flagMap.computeIfAbsent(
Expand Down Expand Up @@ -412,12 +409,12 @@ class CloudWatchPoller(
}
}

private def timeToRun(offset: Int, account: String, region: Region): Long = {
private def timeToRun(period: Int, offset: Int, account: String, region: Region): Long = {
// see if we've passed the next run time.
var nextRun = 0L
try {
val previousRun = processor.lastSuccessfulPoll(runKey(offset, account, region))
nextRun = runAfter(offset, periodFilter)
nextRun = runAfter(offset, period)
if (previousRun >= nextRun) {
logger.info(
s"Skipping CloudWatch polling for ${offset}s as we're within the polling interval. Previous ${previousRun}. Next ${nextRun}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.netflix.iep.aws2.AwsClientFactory
import com.netflix.iep.leader.api.LeaderStatus
import com.netflix.spectator.api.DefaultRegistry
import com.netflix.spectator.api.Registry
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import junit.framework.TestCase.assertFalse
import munit.FunSuite
Expand Down Expand Up @@ -104,14 +105,60 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("init") {
val poller = getPoller
val poller = getPoller()
val categories = poller.offsetMap.get(offset).get
assertEquals(categories.filter(_.namespace == "AWS/UT1").size, 1)
}

// Verifies that the poller's offsetMap is keyed by each category's poll-offset expressed
// in seconds, and that categories with different periods (5m and 1d) are both registered.
test("init multiple offsets") {
  val cfg = ConfigFactory.parseString("""
    |atlas {
    | cloudwatch {
    | categories = ["cfg1", "cfg2"]
    | poller.frequency = "5m"
    |
    | cfg1 = {
    | namespace = "AWS/CFG1"
    | period = 5m
    | poll-offset = 5m
    |
    | dimensions = ["foo"]
    | metrics = [
    | {
    | name = "M1"
    | alias = "aws.m1"
    | conversion = "max"
    | }
    | ]
    | }
    |
    | cfg2 = {
    | namespace = "AWS/CFG2"
    | period = 1d
    | poll-offset = 1h
    |
    | dimensions = ["bar"]
    | metrics = [
    | {
    | name = "M2"
    | alias = "aws.m2"
    | conversion = "max"
    | }
    | ]
    | }
    | }
    |}
    |""".stripMargin)
  val poller = getPoller(cfg)

  // Number of categories registered under `offsetSeconds` for `namespace`. A missing
  // offset counts as zero so the test fails through assertEquals with a readable
  // message instead of a NoSuchElementException from Option.get.
  def registered(offsetSeconds: Int, namespace: String): Int =
    poller.offsetMap.getOrElse(offsetSeconds, Nil).count(_.namespace == namespace)

  // poll-offset = 5m -> 300 seconds
  assertEquals(registered(300, "AWS/CFG1"), 1)
  // poll-offset = 1h -> 3600 seconds
  assertEquals(registered(3600, "AWS/CFG2"), 1)
}

test("poll not leader") {
when(leaderStatus.hasLeadership).thenReturn(false)
val poller = getPoller
val poller = getPoller()
poller.poll(offset, List(getCategory(poller)))
assertCounters()
verify(accountSupplier, never).accounts
Expand All @@ -121,22 +168,22 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
test("poll already ran") {
when(processor.lastSuccessfulPoll(anyString))
.thenReturn(System.currentTimeMillis() + 86_400_000L)
val poller = getPoller
val poller = getPoller()
poller.poll(offset, List(getCategory(poller)))
assertCounters()
verify(processor, never).updateLastSuccessfulPoll(anyString, anyLong)
}

test("poll already running") {
val poller = getPoller
val poller = getPoller()
poller.flagMap.put(runKey(28800, "123456789012", Region.US_EAST_1), new AtomicBoolean(true))
poller.poll(offset, List(getCategory(poller)))
assertCounters()
verify(processor, never).updateLastSuccessfulPoll(anyString, anyLong)
}

test("poll success") {
val poller = getPoller
val poller = getPoller()
mockSuccess
val flag = new AtomicBoolean()
val full = Promise[List[CloudWatchPoller#Poller]]()
Expand All @@ -152,7 +199,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("poll on list failure") {
val poller = getPoller
val poller = getPoller()
val flag = new AtomicBoolean()
val full = Promise[List[CloudWatchPoller#Poller]]()
val accountsDone = Promise[Done]()
Expand All @@ -176,7 +223,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
any[Optional[Region]]
)
).thenThrow(new RuntimeException("test"))
val poller = getPoller
val poller = getPoller()
val flag = new AtomicBoolean()
val full = Promise[List[CloudWatchPoller#Poller]]()
val accountsDone = Promise[Done]()
Expand All @@ -195,7 +242,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {

test("poll accounts exception") {
when(accountSupplier.accounts).thenThrow(new RuntimeException("test"))
val poller = getPoller
val poller = getPoller()
val flag = new AtomicBoolean()
val full = Promise[List[CloudWatchPoller#Poller]]()
val accountsDone = Promise[Done]()
Expand All @@ -214,7 +261,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {

test("poll empty accounts") {
when(accountSupplier.accounts).thenReturn(Map.empty)
val poller = getPoller
val poller = getPoller()
val flag = new AtomicBoolean()
val full = Promise[List[CloudWatchPoller#Poller]]()
val accountsDone = Promise[Done]()
Expand All @@ -228,7 +275,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("Poller#execute all success") {
val poller = getPoller
val poller = getPoller()
mockSuccess
val child = getChild(poller)
val f = child.execute
Expand All @@ -237,7 +284,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("Poller#execute one failure") {
val poller = getPoller
val poller = getPoller()
val child = getChild(poller)
val f = child.execute
intercept[RuntimeException] {
Expand All @@ -247,7 +294,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("Poller#ListMetrics success") {
val poller = getPoller
val poller = getPoller()
val child = getChild(poller)
val (mdef, req) = getListReq(poller)
val promise = Promise[Done]()
Expand All @@ -260,7 +307,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("Poller#ListMetrics empty") {
val poller = getPoller
val poller = getPoller()
val child = getChild(poller)
val (mdef, req) = getListReq(poller)
val promise = Promise[Done]()
Expand All @@ -272,7 +319,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("Poller#ListMetrics one failure") {
val poller = getPoller
val poller = getPoller()
val child = getChild(poller)
val (mdef, req) = getListReq(poller)
val promise = Promise[Done]()
Expand All @@ -286,7 +333,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("Poller#ListMetrics client throws") {
val poller = getPoller
val poller = getPoller()
val child = getChild(poller)
val (mdef, req) = getListReq(poller)
val promise = Promise[Done]()
Expand All @@ -300,7 +347,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("Poller#FetchMetricStats success") {
val poller = getPoller
val poller = getPoller()
val child = getChild(poller)
val category = getCategory(poller)
val (mdef, _) = getListReq(poller)
Expand Down Expand Up @@ -347,7 +394,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("Poller#FetchMetricStats success empty") {
val poller = getPoller
val poller = getPoller()
val child = getChild(poller)
val category = getCategory(poller)
val (mdef, _) = getListReq(poller)
Expand All @@ -365,7 +412,7 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
}

test("Poller#FetchMetricStats client throws") {
val poller = getPoller
val poller = getPoller()
val child = getChild(poller)
val category = getCategory(poller)
val (mdef, _) = getListReq(poller)
Expand All @@ -384,9 +431,9 @@ class CloudWatchPollerSuite extends FunSuite with TestKitBase {
assertCounters(errors = Map("metric" -> 1))
}

def getPoller: CloudWatchPoller = {
def getPoller(cfg: Config = config): CloudWatchPoller = {
new CloudWatchPoller(
config,
cfg,
registry,
leaderStatus,
accountSupplier,
Expand Down