Skip to content

Commit

Permalink
sdk-metrics: make MetricPoints use NonEmptyVector for points
Browse files Browse the repository at this point in the history
  • Loading branch information
iRevive committed Apr 27, 2024
1 parent 33b6f5d commit a85e127
Show file tree
Hide file tree
Showing 31 changed files with 328 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@
package org.typelevel.otel4s
package scalacheck

import cats.data.NonEmptyVector
import org.scalacheck.Arbitrary
import org.scalacheck.Gen

trait Gens {

def nonEmptyVector[A](gen: Gen[A]): Gen[NonEmptyVector[A]] =
for {
head <- gen
tail <- Gen.nonEmptyContainerOf[Vector, A](gen)
} yield NonEmptyVector(head, tail)

val nonEmptyString: Gen[String] =
for {
id <- Gen.identifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,21 +151,21 @@ private object MetricsProtoEncoder {
case sum: MetricPoints.Sum =>
Proto.Metric.Data.Sum(
Proto.Sum(
sum.points.map(ProtoEncoder.encode(_)),
sum.points.toVector.map(ProtoEncoder.encode(_)),
ProtoEncoder.encode(sum.aggregationTemporality),
sum.monotonic
)
)

case gauge: MetricPoints.Gauge =>
Proto.Metric.Data.Gauge(
Proto.Gauge(gauge.points.map(ProtoEncoder.encode(_)))
Proto.Gauge(gauge.points.toVector.map(ProtoEncoder.encode(_)))
)

case histogram: MetricPoints.Histogram =>
Proto.Metric.Data.Histogram(
Proto.Histogram(
histogram.points.map(ProtoEncoder.encode(_)),
histogram.points.toVector.map(ProtoEncoder.encode(_)),
ProtoEncoder.encode(histogram.aggregationTemporality)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sdk
package exporter.otlp
package metrics

import cats.data.NonEmptyVector
import io.circe.Encoder
import io.circe.Json
import io.circe.syntax._
Expand Down Expand Up @@ -114,7 +115,7 @@ private object MetricsJsonCodecs extends JsonCodecs {

Json
.obj(
"dataPoints" := (sum.points: Vector[PointData.NumberPoint]),
"dataPoints" := (sum.points: NonEmptyVector[PointData.NumberPoint]),
"aggregationTemporality" := sum.aggregationTemporality,
"isMonotonic" := monotonic
)
Expand All @@ -126,7 +127,7 @@ private object MetricsJsonCodecs extends JsonCodecs {
Encoder.instance { gauge =>
Json
.obj(
"dataPoints" := (gauge.points: Vector[PointData.NumberPoint])
"dataPoints" := (gauge.points: NonEmptyVector[PointData.NumberPoint])
)
.dropEmptyValues
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.typelevel.otel4s.sdk.metrics.aggregation

import cats.Applicative
import cats.data.NonEmptyVector
import cats.effect.Temporal
import cats.effect.std.Random
import org.typelevel.otel4s.Attributes
Expand Down Expand Up @@ -84,7 +85,7 @@ private[metrics] object Aggregator {
resource: TelemetryResource,
scope: InstrumentationScope,
descriptor: MetricDescriptor,
points: Vector[Point],
points: NonEmptyVector[Point],
temporality: AggregationTemporality
): F[MetricData]
}
Expand Down Expand Up @@ -137,7 +138,7 @@ private[metrics] object Aggregator {
resource: TelemetryResource,
scope: InstrumentationScope,
descriptor: MetricDescriptor,
measurements: Vector[AsynchronousMeasurement[A]],
measurements: NonEmptyVector[AsynchronousMeasurement[A]],
temporality: AggregationTemporality
): F[MetricData]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.typelevel.otel4s.sdk.metrics.aggregation

import cats.FlatMap
import cats.data.NonEmptyVector
import cats.effect.Concurrent
import cats.effect.Ref
import cats.effect.Temporal
Expand Down Expand Up @@ -67,7 +68,7 @@ private final class ExplicitBucketHistogramAggregator[
resource: TelemetryResource,
scope: InstrumentationScope,
descriptor: MetricDescriptor,
points: Vector[PointData.Histogram],
points: NonEmptyVector[PointData.Histogram],
temporality: AggregationTemporality
): F[MetricData] =
Concurrent[F].pure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.typelevel.otel4s.sdk.metrics.aggregation

import cats.Applicative
import cats.data.NonEmptyVector
import cats.effect.Concurrent
import cats.syntax.functor._
import org.typelevel.otel4s.Attributes
Expand Down Expand Up @@ -86,7 +87,7 @@ private object LastValueAggregator {
resource: TelemetryResource,
scope: InstrumentationScope,
descriptor: MetricDescriptor,
points: Vector[Point],
points: NonEmptyVector[Point],
temporality: AggregationTemporality
): F[MetricData] =
Concurrent[F].pure(
Expand Down Expand Up @@ -142,7 +143,7 @@ private object LastValueAggregator {
resource: TelemetryResource,
scope: InstrumentationScope,
descriptor: MetricDescriptor,
measurements: Vector[AsynchronousMeasurement[A]],
measurements: NonEmptyVector[AsynchronousMeasurement[A]],
temporality: AggregationTemporality
): F[MetricData] = {
val points = measurements.map { m =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.typelevel.otel4s.sdk.metrics.aggregation

import cats.Applicative
import cats.data.NonEmptyVector
import cats.effect.Temporal
import cats.effect.std.Random
import cats.syntax.flatMap._
Expand Down Expand Up @@ -109,7 +110,7 @@ private object SumAggregator {
resource: TelemetryResource,
scope: InstrumentationScope,
descriptor: MetricDescriptor,
points: Vector[Point],
points: NonEmptyVector[Point],
temporality: AggregationTemporality
): F[MetricData] =
Temporal[F].pure(
Expand Down Expand Up @@ -184,7 +185,7 @@ private object SumAggregator {
resource: TelemetryResource,
scope: InstrumentationScope,
descriptor: MetricDescriptor,
measurements: Vector[AsynchronousMeasurement[A]],
measurements: NonEmptyVector[AsynchronousMeasurement[A]],
temporality: AggregationTemporality
): F[MetricData] = {
val points = measurements.map { m =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,6 @@ sealed trait MetricData {
*/
def resource: TelemetryResource

/** Whether the measurements are empty.
*/
final def isEmpty: Boolean = data.points.isEmpty

/** Whether the measurements are non empty.
*/
final def nonEmpty: Boolean = !isEmpty

override final def hashCode(): Int =
Hash[MetricData].hash(this)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package org.typelevel.otel4s.sdk.metrics.data

import cats.Hash
import cats.Show
import cats.data.NonEmptyVector
import cats.syntax.foldable._

/** A collection of metric data points.
*
Expand All @@ -28,7 +30,7 @@ sealed trait MetricPoints {

/** The collection of the metric [[PointData]]s.
*/
def points: Vector[PointData]
def points: NonEmptyVector[PointData]

override final def hashCode(): Int =
Hash[MetricPoints].hash(this)
Expand All @@ -54,7 +56,7 @@ object MetricPoints {
sealed trait Sum extends MetricPoints {
type Point <: PointData.NumberPoint

def points: Vector[Point]
def points: NonEmptyVector[Point]

/** Whether the points are monotonic. If true, it means the data points are
* nominally increasing.
Expand All @@ -74,7 +76,7 @@ object MetricPoints {
sealed trait Gauge extends MetricPoints {
type Point <: PointData.NumberPoint

def points: Vector[Point]
def points: NonEmptyVector[Point]
}

/** Histogram represents the type of a metric that is calculated by
Expand All @@ -85,7 +87,7 @@ object MetricPoints {
* [[https://opentelemetry.io/docs/specs/otel/metrics/data-model/#histogram]]
*/
sealed trait Histogram extends MetricPoints {
def points: Vector[PointData.Histogram]
def points: NonEmptyVector[PointData.Histogram]

/** The aggregation temporality of this aggregation.
*/
Expand All @@ -95,7 +97,7 @@ object MetricPoints {
/** Creates a [[Sum]] with the given values.
*/
def sum[A <: PointData.NumberPoint](
points: Vector[A],
points: NonEmptyVector[A],
monotonic: Boolean,
aggregationTemporality: AggregationTemporality
): Sum =
Expand All @@ -104,29 +106,35 @@ object MetricPoints {
/** Creates a [[Gauge]] with the given values.
*/
def gauge[A <: PointData.NumberPoint](
points: Vector[A]
points: NonEmptyVector[A]
): Gauge =
GaugeImpl(points)

/** Creates a [[Histogram]] with the given values.
*/
def histogram(
points: Vector[PointData.Histogram],
points: NonEmptyVector[PointData.Histogram],
aggregationTemporality: AggregationTemporality
): Histogram =
HistogramImpl(points, aggregationTemporality)

implicit val metricPointsHash: Hash[MetricPoints] = {
val sumHash: Hash[Sum] =
Hash.by { s =>
(s.points: Vector[PointData], s.monotonic, s.aggregationTemporality)
(
s.points.toVector: Vector[PointData],
s.monotonic,
s.aggregationTemporality
)
}

val gaugeHash: Hash[Gauge] =
Hash.by(_.points: Vector[PointData])
Hash.by(_.points.toVector: Vector[PointData])

val histogramHash: Hash[Histogram] =
Hash.by(h => (h.points: Vector[PointData], h.aggregationTemporality))
Hash.by(h =>
(h.points.toVector: Vector[PointData], h.aggregationTemporality)
)

new Hash[MetricPoints] {
def hash(x: MetricPoints): Int =
Expand Down Expand Up @@ -154,30 +162,33 @@ object MetricPoints {
Show.show {
case sum: Sum =>
"MetricPoints.Sum{" +
s"points=${sum.points.mkString("{", ",", "}")}, " +
s"points=${(sum.points: NonEmptyVector[PointData]).mkString_("{", ",", "}")}, " +
s"monotonic=${sum.monotonic}, " +
s"aggregationTemporality=${sum.aggregationTemporality}}"

case gauge: Gauge =>
s"MetricPoints.Gauge{points=${gauge.points.mkString("{", ",", "}")}}"
"MetricPoints.Gauge{" +
s"points=${(gauge.points: NonEmptyVector[PointData]).mkString_("{", ",", "}")}}"

case h: Histogram =>
s"MetricPoints.Histogram{points=${h.points.mkString("{", ",", "}")}, aggregationTemporality=${h.aggregationTemporality}}"
"MetricPoints.Histogram{" +
s"points=${(h.points: NonEmptyVector[PointData]).mkString_("{", ",", "}")}, " +
s"aggregationTemporality=${h.aggregationTemporality}}"
}
}

private final case class SumImpl[A <: PointData.NumberPoint](
points: Vector[A],
points: NonEmptyVector[A],
monotonic: Boolean,
aggregationTemporality: AggregationTemporality
) extends Sum { type Point = A }

private final case class GaugeImpl[A <: PointData.NumberPoint](
points: Vector[A]
points: NonEmptyVector[A]
) extends Gauge { type Point = A }

private final case class HistogramImpl(
points: Vector[PointData.Histogram],
points: NonEmptyVector[PointData.Histogram],
aggregationTemporality: AggregationTemporality
) extends Histogram

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ private[metrics] final class MeterSharedState[
result <- storages.traverse { storage =>
storage.collect(resource, scope, timeWindow)
}
} yield result.flatten.filter(_.nonEmpty)
} yield result.flatten
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.typelevel.otel4s.sdk.metrics.internal.storage

import cats.Monad
import cats.data.NonEmptyVector
import cats.effect.Concurrent
import cats.effect.Ref
import cats.effect.Temporal
Expand Down Expand Up @@ -95,20 +96,23 @@ private final class AsynchronousStorage[
scope: InstrumentationScope,
timeWindow: TimeWindow
): F[Option[MetricData]] =
collector.collectPoints.flatMap {
case measurements if measurements.nonEmpty =>
aggregator
.toMetricData(
resource,
scope,
metricDescriptor,
measurements,
aggregationTemporality
)
.map(Some(_))
collector.collectPoints.flatMap { points =>
NonEmptyVector.fromVector(points) match {
case Some(measurements) =>
aggregator
.toMetricData(
resource,
scope,
metricDescriptor,
measurements,
aggregationTemporality
)
.map(Some(_))

case _ =>
Monad[F].pure(None)
case None =>
Monad[F].pure(None)

}
}

private def cardinalityWarning: F[Unit] =
Expand Down
Loading

0 comments on commit a85e127

Please sign in to comment.