Skip to content

Commit

Permalink
Merge pull request #665 from iRevive/sdk-metrics/exemplar-reservoirs
Browse files Browse the repository at this point in the history
sdk-metrics: use `ExemplarReservoirs` when they are really needed [no ci]
  • Loading branch information
iRevive authored May 4, 2024
2 parents 0dd9286 + 1710cb9 commit 8992172
Show file tree
Hide file tree
Showing 16 changed files with 407 additions and 331 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.typelevel.otel4s.sdk.context.AskContext
import org.typelevel.otel4s.sdk.internal.ComponentRegistry
import org.typelevel.otel4s.sdk.metrics.data.MetricData
import org.typelevel.otel4s.sdk.metrics.exemplar.ExemplarFilter
import org.typelevel.otel4s.sdk.metrics.exemplar.Reservoirs
import org.typelevel.otel4s.sdk.metrics.exemplar.TraceContextLookup
import org.typelevel.otel4s.sdk.metrics.exporter.MetricProducer
import org.typelevel.otel4s.sdk.metrics.exporter.MetricReader
Expand Down Expand Up @@ -235,13 +236,14 @@ object SdkMeterProvider {
ExemplarFilter.traceBased(traceContextLookup)
)

val reservoirs = Reservoirs(filter, traceContextLookup)

for {
state <- MeterSharedState.create(
resource,
scope,
startTimestamp,
filter,
traceContextLookup,
reservoirs,
viewRegistry,
readers
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ package org.typelevel.otel4s.sdk.metrics.aggregation

import cats.Applicative
import cats.data.NonEmptyVector
import cats.effect.Temporal
import cats.effect.std.Random
import cats.effect.Concurrent
import org.typelevel.otel4s.Attributes
import org.typelevel.otel4s.metrics.BucketBoundaries
import org.typelevel.otel4s.metrics.MeasurementValue
Expand All @@ -32,8 +31,7 @@ import org.typelevel.otel4s.sdk.metrics.data.AggregationTemporality
import org.typelevel.otel4s.sdk.metrics.data.MetricData
import org.typelevel.otel4s.sdk.metrics.data.PointData
import org.typelevel.otel4s.sdk.metrics.data.TimeWindow
import org.typelevel.otel4s.sdk.metrics.exemplar.ExemplarFilter
import org.typelevel.otel4s.sdk.metrics.exemplar.TraceContextLookup
import org.typelevel.otel4s.sdk.metrics.exemplar.Reservoirs
import org.typelevel.otel4s.sdk.metrics.internal.AsynchronousMeasurement
import org.typelevel.otel4s.sdk.metrics.internal.InstrumentDescriptor
import org.typelevel.otel4s.sdk.metrics.internal.MetricDescriptor
Expand Down Expand Up @@ -195,44 +193,38 @@ private[metrics] object Aggregator {
}

/** Creates a [[Synchronous]] aggregator based on the given `aggregation`.
*
* @param reservoirs
* the allocator of exemplar reservoirs
*
* @param aggregation
* the aggregation to use
*
* @param descriptor
* the descriptor of the instrument
*
* @param filter
* used by the exemplar reservoir to filter the offered values
*
* @param traceContextLookup
* used by the exemplar reservoir to extract tracing information from the
* context
*
* @tparam F
* the higher-kinded type of a polymorphic effect
*
* @tparam A
* the type of the values to record
*/
def synchronous[F[_]: Temporal: Random, A: MeasurementValue: Numeric](
def synchronous[F[_]: Concurrent, A: MeasurementValue: Numeric](
reservoirs: Reservoirs[F],
aggregation: Aggregation.Synchronous,
descriptor: InstrumentDescriptor.Synchronous,
filter: ExemplarFilter,
traceContextLookup: TraceContextLookup
descriptor: InstrumentDescriptor.Synchronous
): Aggregator.Synchronous[F, A] = {
def sum: Aggregator.Synchronous[F, A] =
SumAggregator.synchronous(
Runtime.getRuntime.availableProcessors,
filter,
traceContextLookup
reservoirs,
Runtime.getRuntime.availableProcessors
)

def lastValue: Aggregator.Synchronous[F, A] =
LastValueAggregator.synchronous[F, A]

def histogram(boundaries: BucketBoundaries): Aggregator.Synchronous[F, A] =
ExplicitBucketHistogramAggregator(boundaries, filter, traceContextLookup)
ExplicitBucketHistogramAggregator(reservoirs, boundaries)

aggregation match {
case Aggregation.Default =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import cats.FlatMap
import cats.data.NonEmptyVector
import cats.effect.Concurrent
import cats.effect.Ref
import cats.effect.Temporal
import cats.syntax.flatMap._
import cats.syntax.functor._
import org.typelevel.otel4s.Attributes
Expand All @@ -30,9 +29,8 @@ import org.typelevel.otel4s.sdk.TelemetryResource
import org.typelevel.otel4s.sdk.common.InstrumentationScope
import org.typelevel.otel4s.sdk.context.Context
import org.typelevel.otel4s.sdk.metrics.data._
import org.typelevel.otel4s.sdk.metrics.exemplar.ExemplarFilter
import org.typelevel.otel4s.sdk.metrics.exemplar.ExemplarReservoir
import org.typelevel.otel4s.sdk.metrics.exemplar.TraceContextLookup
import org.typelevel.otel4s.sdk.metrics.exemplar.Reservoirs
import org.typelevel.otel4s.sdk.metrics.internal.MetricDescriptor

/** The histogram aggregation that aggregates values into the corresponding
Expand All @@ -49,10 +47,10 @@ import org.typelevel.otel4s.sdk.metrics.internal.MetricDescriptor
*/
private final class ExplicitBucketHistogramAggregator[
F[_]: Concurrent,
A: MeasurementValue
A: MeasurementValue: Numeric
](
boundaries: BucketBoundaries,
makeReservoir: F[ExemplarReservoir[F, A]]
reservoirs: Reservoirs[F],
boundaries: BucketBoundaries
) extends Aggregator.Synchronous[F, A] {
import ExplicitBucketHistogramAggregator._

Expand All @@ -61,7 +59,7 @@ private final class ExplicitBucketHistogramAggregator[
def createAccumulator: F[Aggregator.Accumulator[F, A, PointData.Histogram]] =
for {
state <- Concurrent[F].ref(emptyState(boundaries.length))
reservoir <- makeReservoir
reservoir <- reservoirs.histogramBucket(boundaries)
} yield new Accumulator(state, boundaries, reservoir)

def toMetricData(
Expand All @@ -86,34 +84,24 @@ private final class ExplicitBucketHistogramAggregator[
private object ExplicitBucketHistogramAggregator {

/** Creates a histogram aggregation with the given values.
*
* @param reservoirs
* the allocator of exemplar reservoirs
*
* @param boundaries
* the bucket boundaries to aggregate values at
*
* @param filter
* used by the exemplar reservoir to filter the offered values
*
* @param lookup
* used by the exemplar reservoir to extract tracing information from the
* context
*
* @tparam F
* the higher-kinded type of a polymorphic effect
*
* @tparam A
* the type of the values to record
*/
def apply[F[_]: Temporal, A: MeasurementValue: Numeric](
boundaries: BucketBoundaries,
filter: ExemplarFilter,
lookup: TraceContextLookup
): Aggregator.Synchronous[F, A] = {
val reservoir = ExemplarReservoir
.histogramBucket[F, A](boundaries, lookup)
.map(r => ExemplarReservoir.filtered(filter, r))

new ExplicitBucketHistogramAggregator[F, A](boundaries, reservoir)
}
def apply[F[_]: Concurrent, A: MeasurementValue: Numeric](
reservoirs: Reservoirs[F],
boundaries: BucketBoundaries
): Aggregator.Synchronous[F, A] =
new ExplicitBucketHistogramAggregator[F, A](reservoirs, boundaries)

private final case class State(
sum: Double,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ package org.typelevel.otel4s.sdk.metrics.aggregation

import cats.Applicative
import cats.data.NonEmptyVector
import cats.effect.Temporal
import cats.effect.std.Random
import cats.effect.Concurrent
import cats.syntax.flatMap._
import cats.syntax.functor._
import org.typelevel.otel4s.Attributes
Expand All @@ -32,9 +31,8 @@ import org.typelevel.otel4s.sdk.metrics.data.AggregationTemporality
import org.typelevel.otel4s.sdk.metrics.data.MetricData
import org.typelevel.otel4s.sdk.metrics.data.MetricPoints
import org.typelevel.otel4s.sdk.metrics.data.TimeWindow
import org.typelevel.otel4s.sdk.metrics.exemplar.ExemplarFilter
import org.typelevel.otel4s.sdk.metrics.exemplar.ExemplarReservoir
import org.typelevel.otel4s.sdk.metrics.exemplar.TraceContextLookup
import org.typelevel.otel4s.sdk.metrics.exemplar.Reservoirs
import org.typelevel.otel4s.sdk.metrics.internal.AsynchronousMeasurement
import org.typelevel.otel4s.sdk.metrics.internal.MetricDescriptor
import org.typelevel.otel4s.sdk.metrics.internal.utils.Adder
Expand All @@ -47,28 +45,23 @@ private object SumAggregator {
* @see
* [[https://opentelemetry.io/docs/specs/otel/metrics/sdk/#sum-aggregation]]
*
* @param reservoirs
* the allocator of exemplar reservoirs
*
* @param reservoirSize
* the maximum number of exemplars to preserve
*
* @param filter
* used by the exemplar reservoir to filter the offered values
*
* @param lookup
* used by the exemplar reservoir to extract tracing information from the
* context
*
* @tparam F
* the higher-kinded type of a polymorphic effect
*
* @tparam A
* the type of the values to record
*/
def synchronous[F[_]: Temporal: Random, A: MeasurementValue: Numeric](
reservoirSize: Int,
filter: ExemplarFilter,
lookup: TraceContextLookup
def synchronous[F[_]: Concurrent, A: MeasurementValue: Numeric](
reservoirs: Reservoirs[F],
reservoirSize: Int
): Aggregator.Synchronous[F, A] =
new Synchronous(reservoirSize, filter, lookup)
new Synchronous(reservoirs, reservoirSize)

/** Creates a sum aggregator for asynchronous instruments.
*
Expand All @@ -88,13 +81,10 @@ private object SumAggregator {
new Asynchronous[F, A]

private final class Synchronous[
F[_]: Temporal: Random,
F[_]: Concurrent,
A: MeasurementValue: Numeric
](
reservoirSize: Int,
filter: ExemplarFilter,
traceContextLookup: TraceContextLookup
) extends Aggregator.Synchronous[F, A] {
](reservoirs: Reservoirs[F], reservoirSize: Int)
extends Aggregator.Synchronous[F, A] {

val target: Target[A] = Target[A]

Expand All @@ -103,7 +93,7 @@ private object SumAggregator {
def createAccumulator: F[Aggregator.Accumulator[F, A, Point]] =
for {
adder <- Adder.create[F, A]
reservoir <- makeReservoir
reservoir <- reservoirs.fixedSize(reservoirSize)
} yield new Accumulator(adder, reservoir)

def toMetricData(
Expand All @@ -113,7 +103,7 @@ private object SumAggregator {
points: NonEmptyVector[Point],
temporality: AggregationTemporality
): F[MetricData] =
Temporal[F].pure(
Concurrent[F].pure(
MetricData(
resource,
scope,
Expand All @@ -124,14 +114,6 @@ private object SumAggregator {
)
)

private def makeReservoir: F[ExemplarReservoir[F, A]] =
ExemplarReservoir
.fixedSize[F, A](
size = reservoirSize,
lookup = traceContextLookup
)
.map(r => ExemplarReservoir.filtered(filter, r))

private class Accumulator(
adder: Adder[F, A],
reservoir: ExemplarReservoir[F, A]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,17 @@ private[metrics] object ExemplarReservoir {
original.collectAndReset(attributes)
}

/** Creates a reservoir that does not record measurements.
*/
def noop[F[_]: Applicative, A]: ExemplarReservoir[F, A] =
new ExemplarReservoir[F, A] {
def offer(value: A, attributes: Attributes, context: Context): F[Unit] =
Applicative[F].unit

def collectAndReset(attributes: Attributes): F[Vector[Exemplar[A]]] =
Applicative[F].pure(Vector.empty)
}

private def create[F[_]: Temporal, A](
size: Int,
cellSelector: CellSelector[F, A],
Expand Down
Loading

0 comments on commit 8992172

Please sign in to comment.