From ce659e8b0f3b67b5ca90b785392c04a166f93d29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Cardoso?= Date: Sun, 1 Sep 2024 17:45:38 +0100 Subject: [PATCH] = kamon-datadog: Allow for retries when issuing http requests to Datadog API --- .../src/main/resources/reference.conf | 16 ++++ .../main/scala/kamon/datadog/package.scala | 38 +++++++-- .../kamon/datadog/AbstractHttpReporter.scala | 4 + .../datadog/DatadogAPIReporterSpec.scala | 77 +++++++++++-------- 4 files changed, 96 insertions(+), 39 deletions(-) diff --git a/reporters/kamon-datadog/src/main/resources/reference.conf b/reporters/kamon-datadog/src/main/resources/reference.conf index 0b2f2ccff..b8cafdb7e 100644 --- a/reporters/kamon-datadog/src/main/resources/reference.conf +++ b/reporters/kamon-datadog/src/main/resources/reference.conf @@ -40,6 +40,14 @@ kamon { connect-timeout = 5 seconds read-timeout = 5 seconds write-timeout = 5 seconds + + # Maximum number of attempts (the initial request included) made to submit a request to the Datadog API. + # A request is attempted again only when the HTTP response status is 408, 429, 502, 503 or 504. + # A value of `0` or `1` results in a single attempt, i.e. retries are disabled. + retries = 3 + + # Delay before the first retry; it is doubled after each further retry attempt. + init-retry-delay = 500 milliseconds } # @@ -65,6 +73,14 @@ kamon { # Use 'Deflate' compression when posting to the Datadog API compression = false + + # Maximum number of attempts (the initial request included) made to submit a request to the Datadog API. + # A request is attempted again only when the HTTP response status is 408, 429, 502, 503 or 504. + # A value of `0` or `1` results in a single attempt, i.e. retries are disabled. + retries = 3 + + # Delay before the first retry; it is doubled after each further retry attempt. 
+ init-retry-delay = 500 milliseconds } diff --git a/reporters/kamon-datadog/src/main/scala/kamon/datadog/package.scala b/reporters/kamon-datadog/src/main/scala/kamon/datadog/package.scala index 9f1c2ca7a..6e2facd39 100644 --- a/reporters/kamon-datadog/src/main/scala/kamon/datadog/package.scala +++ b/reporters/kamon-datadog/src/main/scala/kamon/datadog/package.scala @@ -19,12 +19,12 @@ package kamon import java.nio.charset.StandardCharsets import java.time.{Duration, Instant} import java.util.concurrent.TimeUnit - import com.typesafe.config.Config import kamon.metric.MeasurementUnit import kamon.metric.MeasurementUnit.{information, time} import okhttp3._ +import scala.annotation.tailrec import scala.util.{Failure, Success, Try} package object datadog { @@ -43,10 +43,13 @@ package object datadog { usingAgent: Boolean, connectTimeout: Duration, readTimeout: Duration, - writeTimeout: Duration + writeTimeout: Duration, + retries: Int, + initRetryDelay: Duration ) { - val httpClient: OkHttpClient = createHttpClient() + private val httpClient: OkHttpClient = createHttpClient() + private val retryableStatusCodes: Set[Int] = Set(408, 429, 502, 503, 504) def this(config: Config, usingAgent: Boolean) = { this( @@ -56,12 +59,33 @@ package object datadog { usingAgent, config.getDuration("connect-timeout"), config.getDuration("read-timeout"), - config.getDuration("write-timeout") + config.getDuration("write-timeout"), + config.getInt("retries"), + config.getDuration("init-retry-delay") ) } - private def doRequest(request: Request): Try[Response] = { - Try(httpClient.newCall(request).execute()) + @tailrec + private def doRequestWithRetries(request: Request, attempt: Int = 0): Try[Response] = { + // Try executing the request + val responseAttempt = Try(httpClient.newCall(request).execute()) + + if (attempt >= retries - 1) { + responseAttempt + } else { + responseAttempt match { + // If the request succeeded but with a retryable HTTP status code. 
+ case Success(response) if retryableStatusCodes.contains(response.code) => + response.close() + Thread.sleep(initRetryDelay.toMillis * Math.pow(2, attempt).toLong) + doRequestWithRetries(request, attempt + 1) + + // Either the request succeeded with an HTTP status not included in `retryableStatusCodes` + // or we have an unknown failure + case _ => + responseAttempt + } + } } def doMethodWithBody(method: String, contentType: String, contentBody: Array[Byte]): Try[String] = { @@ -69,7 +93,7 @@ package object datadog { val url = apiUrl + apiKey.map(key => "?api_key=" + key).getOrElse("") val request = new Request.Builder().url(url).method(method, body).build - doRequest(request) match { + doRequestWithRetries(request) match { case Success(response) => val responseBody = response.body().string() response.close() diff --git a/reporters/kamon-datadog/src/test/scala/kamon/datadog/AbstractHttpReporter.scala b/reporters/kamon-datadog/src/test/scala/kamon/datadog/AbstractHttpReporter.scala index 8905a86fc..b5741e6df 100644 --- a/reporters/kamon-datadog/src/test/scala/kamon/datadog/AbstractHttpReporter.scala +++ b/reporters/kamon-datadog/src/test/scala/kamon/datadog/AbstractHttpReporter.scala @@ -22,6 +22,10 @@ abstract class AbstractHttpReporter extends AnyWordSpec with BeforeAndAfterAll { server.url(path).toString } + protected def mockResponse(response: MockResponse): Unit = { + server.enqueue(response) + } + override protected def afterAll(): Unit = { super.afterAll() server.shutdown() diff --git a/reporters/kamon-datadog/src/test/scala/kamon/datadog/DatadogAPIReporterSpec.scala b/reporters/kamon-datadog/src/test/scala/kamon/datadog/DatadogAPIReporterSpec.scala index 8676bb6c8..5e9cee0cc 100644 --- a/reporters/kamon-datadog/src/test/scala/kamon/datadog/DatadogAPIReporterSpec.scala +++ b/reporters/kamon-datadog/src/test/scala/kamon/datadog/DatadogAPIReporterSpec.scala @@ -18,9 +18,52 @@ import scala.concurrent.ExecutionContext class DatadogAPIReporterSpec extends 
AbstractHttpReporter with Matchers with Reconfigure { "the DatadogAPIReporter" should { + val reporter = new DatadogAPIReporterFactory().create(ModuleFactory.Settings(Kamon.config(), ExecutionContext.global)) val now = Instant.ofEpochMilli(1523395554) + val examplePeriod = PeriodSnapshot.apply( + now.minusMillis(1000), + now, + MetricSnapshot.ofValues[Long]( + "test.counter", + "test", + Metric.Settings.ForValueInstrument(MeasurementUnit.none, java.time.Duration.ZERO), + Instrument.Snapshot.apply(TagSet.of("tag1", "value1"), 0L) :: Nil + ) :: Nil, + Nil, + Nil, + Nil, + Nil + ) + + "handle retries on retriable HTTP status codes" in { + val baseUrl = mockResponse("/test", new MockResponse().setResponseCode(429)) + applyConfig("kamon.datadog.api.api-url = \"" + baseUrl + "\"") + applyConfig("kamon.datadog.api.api-key = \"dummy\"") + applyConfig("kamon.datadog.api.compression = false") + applyConfig("kamon.datadog.api.init-retry-delay = 100 milliseconds") + applyConfig("kamon.datadog.api.retries = 0") + reporter.reconfigure(Kamon.config()) + + reporter.reportPeriodSnapshot(examplePeriod) + + server.getRequestCount shouldEqual 1 + server.takeRequest() + + applyConfig("kamon.datadog.api.retries = 3") + reporter.reconfigure(Kamon.config()) + + mockResponse(new MockResponse().setResponseCode(429)) + mockResponse(new MockResponse().setResponseCode(503)) + mockResponse(new MockResponse().setResponseCode(504)) + reporter.reportPeriodSnapshot(examplePeriod) + Thread.sleep(1000) + server.takeRequest() + server.takeRequest() + server.takeRequest() + server.getRequestCount shouldEqual 4 + } "sends metrics - compressed" in { val baseUrl = mockResponse("/test", new MockResponse().setStatus("HTTP/1.1 200 OK")) @@ -29,22 +72,7 @@ class DatadogAPIReporterSpec extends AbstractHttpReporter with Matchers with Rec applyConfig("kamon.datadog.api.compression = true") reporter.reconfigure(Kamon.config()) - reporter.reportPeriodSnapshot( - PeriodSnapshot.apply( - now.minusMillis(1000), - 
now, - MetricSnapshot.ofValues[Long]( - "test.counter", - "test", - Metric.Settings.ForValueInstrument(MeasurementUnit.none, java.time.Duration.ZERO), - Instrument.Snapshot.apply(TagSet.of("tag1", "value1"), 0L) :: Nil - ) :: Nil, - Nil, - Nil, - Nil, - Nil - ) - ) + reporter.reportPeriodSnapshot(examplePeriod) val request = server.takeRequest() @@ -64,22 +92,7 @@ class DatadogAPIReporterSpec extends AbstractHttpReporter with Matchers with Rec applyConfig("kamon.datadog.api.compression = false") reporter.reconfigure(Kamon.config()) - reporter.reportPeriodSnapshot( - PeriodSnapshot.apply( - now.minusMillis(1000), - now, - MetricSnapshot.ofValues[Long]( - "test.counter", - "test", - Metric.Settings.ForValueInstrument(MeasurementUnit.none, java.time.Duration.ZERO), - Instrument.Snapshot.apply(TagSet.of("tag1", "value1"), 0L) :: Nil - ) :: Nil, - Nil, - Nil, - Nil, - Nil - ) - ) + reporter.reportPeriodSnapshot(examplePeriod) val request = server.takeRequest() request.getRequestUrl.toString shouldEqual baseUrl + "?api_key=dummy" request.getMethod shouldEqual "POST"