Skip to content

Commit

Permalink
= kamon-datadog: Allow for retries when issuing http requests to Data…
Browse files Browse the repository at this point in the history
…dog API (#1359)
  • Loading branch information
thyandrecardoso authored Oct 11, 2024
1 parent c2338f6 commit d9f7f9f
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 39 deletions.
16 changes: 16 additions & 0 deletions reporters/kamon-datadog/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ kamon {
connect-timeout = 5 seconds
read-timeout = 5 seconds
write-timeout = 5 seconds

# Try this number of times to submit metrics to the Datadog API.
# Only in case of HTTP response status of 408, 429, 502, 503 or 504 is the request attempted again.
# A `0` value disables retries.
retries = 3

# The initial retry delay that gets exponentially increased after each retry attempt.
init-retry-delay = 500 milliseconds
}

#
Expand All @@ -65,6 +73,14 @@ kamon {

# Use 'Deflate' compression when posting to the Datadog API
compression = false

# Try this number of times to submit metrics to the Datadog API.
# Only in case of HTTP response status of 408, 429, 502, 503 or 504 is the request attempted again.
# A `0` value disables retries.
retries = 3

# The initial retry delay that gets exponentially increased after each retry attempt.
init-retry-delay = 500 milliseconds
}

# The log level in which to log failures to submit metrics.
Expand Down
38 changes: 31 additions & 7 deletions reporters/kamon-datadog/src/main/scala/kamon/datadog/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ package kamon
import java.nio.charset.StandardCharsets
import java.time.{Duration, Instant}
import java.util.concurrent.TimeUnit

import com.typesafe.config.Config
import kamon.metric.MeasurementUnit
import kamon.metric.MeasurementUnit.{information, time}
import okhttp3._
import org.slf4j.Logger
import org.slf4j.event.Level

import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

package object datadog {
Expand Down Expand Up @@ -62,10 +62,13 @@ package object datadog {
usingAgent: Boolean,
connectTimeout: Duration,
readTimeout: Duration,
writeTimeout: Duration
writeTimeout: Duration,
retries: Int,
initRetryDelay: Duration
) {

val httpClient: OkHttpClient = createHttpClient()
private val httpClient: OkHttpClient = createHttpClient()
private val retryableStatusCodes: Set[Int] = Set(408, 429, 502, 503, 504)

def this(config: Config, usingAgent: Boolean) = {
this(
Expand All @@ -75,20 +78,41 @@ package object datadog {
usingAgent,
config.getDuration("connect-timeout"),
config.getDuration("read-timeout"),
config.getDuration("write-timeout")
config.getDuration("write-timeout"),
config.getInt("retries"),
config.getDuration("init-retry-delay")
)
}

private def doRequest(request: Request): Try[Response] = {
Try(httpClient.newCall(request).execute())
@tailrec
private def doRequestWithRetries(request: Request, attempt: Int = 0): Try[Response] = {
// Try executing the request
val responseAttempt = Try(httpClient.newCall(request).execute())

if (attempt >= retries - 1) {
responseAttempt
} else {
responseAttempt match {
// If the request succeeded but with a retryable HTTP status code.
case Success(response) if retryableStatusCodes.contains(response.code) =>
response.close()
Thread.sleep(initRetryDelay.toMillis * Math.pow(2, attempt).toLong)
doRequestWithRetries(request, attempt + 1)

// Either the request succeeded with an HTTP status not included in `retryableStatusCodes`
// or we have an unknown failure
case _ =>
responseAttempt
}
}
}

def doMethodWithBody(method: String, contentType: String, contentBody: Array[Byte]): Try[String] = {
val body = RequestBody.create(MediaType.parse(contentType), contentBody)
val url = apiUrl + apiKey.map(key => "?api_key=" + key).getOrElse("")
val request = new Request.Builder().url(url).method(method, body).build

doRequest(request) match {
doRequestWithRetries(request) match {
case Success(response) =>
val responseBody = response.body().string()
response.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ abstract class AbstractHttpReporter extends AnyWordSpec with BeforeAndAfterAll {
server.url(path).toString
}

protected def mockResponse(response: MockResponse): Unit = {
server.enqueue(response)
}

override protected def afterAll(): Unit = {
super.afterAll()
server.shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,52 @@ import scala.concurrent.ExecutionContext
class DatadogAPIReporterSpec extends AbstractHttpReporter with Matchers with Reconfigure {

"the DatadogAPIReporter" should {

val reporter =
new DatadogAPIReporterFactory().create(ModuleFactory.Settings(Kamon.config(), ExecutionContext.global))
val now = Instant.ofEpochMilli(1523395554)
val examplePeriod = PeriodSnapshot.apply(
now.minusMillis(1000),
now,
MetricSnapshot.ofValues[Long](
"test.counter",
"test",
Metric.Settings.ForValueInstrument(MeasurementUnit.none, java.time.Duration.ZERO),
Instrument.Snapshot.apply(TagSet.of("tag1", "value1"), 0L) :: Nil
) :: Nil,
Nil,
Nil,
Nil,
Nil
)

"handle retries on retriable HTTP status codes" in {
val baseUrl = mockResponse("/test", new MockResponse().setResponseCode(429))
applyConfig("kamon.datadog.api.api-url = \"" + baseUrl + "\"")
applyConfig("kamon.datadog.api.api-key = \"dummy\"")
applyConfig("kamon.datadog.api.compression = false")
applyConfig("kamon.datadog.api.init-retry-delay = 100 milliseconds")
applyConfig("kamon.datadog.api.retries = 0")
reporter.reconfigure(Kamon.config())

reporter.reportPeriodSnapshot(examplePeriod)

server.getRequestCount shouldEqual 1
server.takeRequest()

applyConfig("kamon.datadog.api.retries = 3")
reporter.reconfigure(Kamon.config())

mockResponse(new MockResponse().setResponseCode(429))
mockResponse(new MockResponse().setResponseCode(503))
mockResponse(new MockResponse().setResponseCode(504))
reporter.reportPeriodSnapshot(examplePeriod)
Thread.sleep(1000)
server.takeRequest()
server.takeRequest()
server.takeRequest()
server.getRequestCount shouldEqual 4
}

"sends metrics - compressed" in {
val baseUrl = mockResponse("/test", new MockResponse().setStatus("HTTP/1.1 200 OK"))
Expand All @@ -29,22 +72,7 @@ class DatadogAPIReporterSpec extends AbstractHttpReporter with Matchers with Rec
applyConfig("kamon.datadog.api.compression = true")
reporter.reconfigure(Kamon.config())

reporter.reportPeriodSnapshot(
PeriodSnapshot.apply(
now.minusMillis(1000),
now,
MetricSnapshot.ofValues[Long](
"test.counter",
"test",
Metric.Settings.ForValueInstrument(MeasurementUnit.none, java.time.Duration.ZERO),
Instrument.Snapshot.apply(TagSet.of("tag1", "value1"), 0L) :: Nil
) :: Nil,
Nil,
Nil,
Nil,
Nil
)
)
reporter.reportPeriodSnapshot(examplePeriod)

val request = server.takeRequest()

Expand All @@ -64,22 +92,7 @@ class DatadogAPIReporterSpec extends AbstractHttpReporter with Matchers with Rec
applyConfig("kamon.datadog.api.compression = false")
reporter.reconfigure(Kamon.config())

reporter.reportPeriodSnapshot(
PeriodSnapshot.apply(
now.minusMillis(1000),
now,
MetricSnapshot.ofValues[Long](
"test.counter",
"test",
Metric.Settings.ForValueInstrument(MeasurementUnit.none, java.time.Duration.ZERO),
Instrument.Snapshot.apply(TagSet.of("tag1", "value1"), 0L) :: Nil
) :: Nil,
Nil,
Nil,
Nil,
Nil
)
)
reporter.reportPeriodSnapshot(examplePeriod)
val request = server.takeRequest()
request.getRequestUrl.toString shouldEqual baseUrl + "?api_key=dummy"
request.getMethod shouldEqual "POST"
Expand Down

0 comments on commit d9f7f9f

Please sign in to comment.