Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perform retries for certain HTTP status codes when submitting metrics to Datadog API #1359

Merged
merged 1 commit into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions reporters/kamon-datadog/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ kamon {
connect-timeout = 5 seconds
read-timeout = 5 seconds
write-timeout = 5 seconds

# Try this number of times to submit metrics to the Datadog API.
# Only in case of HTTP response status of 408, 429, 502, 503 or 504 is the request attempted again.
# A `0` value disables retries.
retries = 3

# The initial retry delay that gets exponentially increased after each retry attempt.
init-retry-delay = 500 milliseconds
}

#
Expand All @@ -65,6 +73,14 @@ kamon {

# Use 'Deflate' compression when posting to the Datadog API
compression = false

# Try this number of times to submit metrics to the Datadog API.
# Only in case of HTTP response status of 408, 429, 502, 503 or 504 is the request attempted again.
# A `0` value disables retries.
retries = 3

# The initial retry delay that gets exponentially increased after each retry attempt.
init-retry-delay = 500 milliseconds
}


Expand Down
38 changes: 31 additions & 7 deletions reporters/kamon-datadog/src/main/scala/kamon/datadog/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ package kamon
import java.nio.charset.StandardCharsets
import java.time.{Duration, Instant}
import java.util.concurrent.TimeUnit

import com.typesafe.config.Config
import kamon.metric.MeasurementUnit
import kamon.metric.MeasurementUnit.{information, time}
import okhttp3._

import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

package object datadog {
Expand All @@ -43,10 +43,13 @@ package object datadog {
usingAgent: Boolean,
connectTimeout: Duration,
readTimeout: Duration,
writeTimeout: Duration
writeTimeout: Duration,
retries: Int,
initRetryDelay: Duration
) {

val httpClient: OkHttpClient = createHttpClient()
private val httpClient: OkHttpClient = createHttpClient()
private val retryableStatusCodes: Set[Int] = Set(408, 429, 502, 503, 504)

def this(config: Config, usingAgent: Boolean) = {
this(
Expand All @@ -56,20 +59,41 @@ package object datadog {
usingAgent,
config.getDuration("connect-timeout"),
config.getDuration("read-timeout"),
config.getDuration("write-timeout")
config.getDuration("write-timeout"),
config.getInt("retries"),
config.getDuration("init-retry-delay")
)
}

private def doRequest(request: Request): Try[Response] = {
Try(httpClient.newCall(request).execute())
@tailrec
private def doRequestWithRetries(request: Request, attempt: Int = 0): Try[Response] = {
// Try executing the request
val responseAttempt = Try(httpClient.newCall(request).execute())

if (attempt >= retries - 1) {
responseAttempt
} else {
responseAttempt match {
// If the request succeeded but with a retryable HTTP status code.
case Success(response) if retryableStatusCodes.contains(response.code) =>
response.close()
Thread.sleep(initRetryDelay.toMillis * Math.pow(2, attempt).toLong)
doRequestWithRetries(request, attempt + 1)

// Either the request succeeded with an HTTP status not included in `retryableStatusCodes`
// or we have an unknown failure
case _ =>
responseAttempt
}
}
}

def doMethodWithBody(method: String, contentType: String, contentBody: Array[Byte]): Try[String] = {
val body = RequestBody.create(MediaType.parse(contentType), contentBody)
val url = apiUrl + apiKey.map(key => "?api_key=" + key).getOrElse("")
val request = new Request.Builder().url(url).method(method, body).build

doRequest(request) match {
doRequestWithRetries(request) match {
case Success(response) =>
val responseBody = response.body().string()
response.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ abstract class AbstractHttpReporter extends AnyWordSpec with BeforeAndAfterAll {
server.url(path).toString
}

protected def mockResponse(response: MockResponse): Unit = {
server.enqueue(response)
}

override protected def afterAll(): Unit = {
super.afterAll()
server.shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,52 @@ import scala.concurrent.ExecutionContext
class DatadogAPIReporterSpec extends AbstractHttpReporter with Matchers with Reconfigure {

"the DatadogAPIReporter" should {

val reporter =
new DatadogAPIReporterFactory().create(ModuleFactory.Settings(Kamon.config(), ExecutionContext.global))
val now = Instant.ofEpochMilli(1523395554)
val examplePeriod = PeriodSnapshot.apply(
now.minusMillis(1000),
now,
MetricSnapshot.ofValues[Long](
"test.counter",
"test",
Metric.Settings.ForValueInstrument(MeasurementUnit.none, java.time.Duration.ZERO),
Instrument.Snapshot.apply(TagSet.of("tag1", "value1"), 0L) :: Nil
) :: Nil,
Nil,
Nil,
Nil,
Nil
)

"handle retries on retriable HTTP status codes" in {
val baseUrl = mockResponse("/test", new MockResponse().setResponseCode(429))
applyConfig("kamon.datadog.api.api-url = \"" + baseUrl + "\"")
applyConfig("kamon.datadog.api.api-key = \"dummy\"")
applyConfig("kamon.datadog.api.compression = false")
applyConfig("kamon.datadog.api.init-retry-delay = 100 milliseconds")
applyConfig("kamon.datadog.api.retries = 0")
reporter.reconfigure(Kamon.config())

reporter.reportPeriodSnapshot(examplePeriod)

server.getRequestCount shouldEqual 1
server.takeRequest()

applyConfig("kamon.datadog.api.retries = 3")
reporter.reconfigure(Kamon.config())

mockResponse(new MockResponse().setResponseCode(429))
mockResponse(new MockResponse().setResponseCode(503))
mockResponse(new MockResponse().setResponseCode(504))
reporter.reportPeriodSnapshot(examplePeriod)
Thread.sleep(1000)
server.takeRequest()
server.takeRequest()
server.takeRequest()
server.getRequestCount shouldEqual 4
}

"sends metrics - compressed" in {
val baseUrl = mockResponse("/test", new MockResponse().setStatus("HTTP/1.1 200 OK"))
Expand All @@ -29,22 +72,7 @@ class DatadogAPIReporterSpec extends AbstractHttpReporter with Matchers with Rec
applyConfig("kamon.datadog.api.compression = true")
reporter.reconfigure(Kamon.config())

reporter.reportPeriodSnapshot(
PeriodSnapshot.apply(
now.minusMillis(1000),
now,
MetricSnapshot.ofValues[Long](
"test.counter",
"test",
Metric.Settings.ForValueInstrument(MeasurementUnit.none, java.time.Duration.ZERO),
Instrument.Snapshot.apply(TagSet.of("tag1", "value1"), 0L) :: Nil
) :: Nil,
Nil,
Nil,
Nil,
Nil
)
)
reporter.reportPeriodSnapshot(examplePeriod)

val request = server.takeRequest()

Expand All @@ -64,22 +92,7 @@ class DatadogAPIReporterSpec extends AbstractHttpReporter with Matchers with Rec
applyConfig("kamon.datadog.api.compression = false")
reporter.reconfigure(Kamon.config())

reporter.reportPeriodSnapshot(
PeriodSnapshot.apply(
now.minusMillis(1000),
now,
MetricSnapshot.ofValues[Long](
"test.counter",
"test",
Metric.Settings.ForValueInstrument(MeasurementUnit.none, java.time.Duration.ZERO),
Instrument.Snapshot.apply(TagSet.of("tag1", "value1"), 0L) :: Nil
) :: Nil,
Nil,
Nil,
Nil,
Nil
)
)
reporter.reportPeriodSnapshot(examplePeriod)
val request = server.takeRequest()
request.getRequestUrl.toString shouldEqual baseUrl + "?api_key=dummy"
request.getMethod shouldEqual "POST"
Expand Down