ingest: translate known OpenTelemetry JVM metrics
Copy well-known JVM metrics from their OpenTelemetry
names and associated labels to their counterparts
produced by the Elastic APM Java agent. This enables
users to visualise the metrics in the Elastic APM
app in Kibana.

Metrics are duplicated to avoid surprising users by
silently dropping the OpenTelemetry names. Users can
drop either one of the duplicates with ingest pipeline
customisation.
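
For example, a user who wants only the Elastic APM names could append a
remove processor along these lines to their customised pipeline. This is
an illustrative sketch only, not part of this commit; the field list
simply mirrors the metrics translated below:

{
  "remove": {
    "field": [
      "runtime.jvm.gc.time",
      "runtime.jvm.gc.count",
      "runtime.jvm.gc.collection",
      "runtime.jvm.memory.area"
    ],
    "ignore_missing": true
  }
}

The reverse customisation, dropping the copied jvm.* fields instead, works
the same way.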
axw committed Mar 24, 2021
1 parent 330b543 commit 0172471
Showing 3 changed files with 160 additions and 16 deletions.
50 changes: 50 additions & 0 deletions ingest/pipeline/definition.json
@@ -29,6 +29,12 @@
"name": "apm_error_grouping_name",
"if": "ctx.processor?.event == 'error'"
}
},
{
"pipeline": {
"name": "apm_opentelemetry_metrics",
"if": "ctx.processor?.event == 'metric'"
}
}
]
}
@@ -137,5 +143,49 @@
}
]
}
},
{
"id": "apm_opentelemetry_metrics",
"body": {
"description": "Populate Elastic APM metric fields from well-known OpenTelemetry metric counterparts",
"processors": [
{
"set": {
"field": "jvm.memory.{{labels.area}}.{{labels.type}}",
"copy_from": "runtime.jvm.memory.area",
"if": "ctx.runtime?.jvm?.memory?.area != null && ctx.labels?.area != null && ctx.labels?.type != null"
}
},
{
"set": {
"field": "jvm.gc.time",
"copy_from": "runtime.jvm.gc.time",
"if": "ctx.runtime?.jvm?.gc?.time != null"
}
},
{
"set": {
"field": "jvm.gc.count",
"copy_from": "runtime.jvm.gc.count",
"if": "ctx.runtime?.jvm?.gc?.count != null"
}
},
{
"set": {
"field": "jvm.gc.time",
"copy_from": "runtime.jvm.gc.collection",
"if": "ctx.runtime?.jvm?.gc?.collection != null"
}
},
{
"set": {
"field": "labels.name",
"copy_from": "labels.gc",
"override": false,
"if": "ctx.labels?.gc != null"
}
}
]
}
}
]
35 changes: 35 additions & 0 deletions ingest/pipeline/definition.yml
@@ -12,6 +12,9 @@ apm:
- pipeline:
name: apm_error_grouping_name
if: ctx.processor?.event == 'error'
- pipeline:
name: apm_opentelemetry_metrics
if: ctx.processor?.event == 'metric'

apm_user_agent:
description: Add user agent information for APM events
@@ -74,3 +77,35 @@ apm_error_grouping_name:
field: error.grouping_name
copy_from: error.log.message
if: ctx.error?.log?.message != null

apm_opentelemetry_metrics:
description: Populate Elastic APM metric fields from well-known OpenTelemetry metric counterparts
processors:
- set:
# Copy `runtime.jvm.memory.area` (OpenTelemetry) to `jvm.memory.{area}.{type}` (Elastic APM).
field: jvm.memory.{{labels.area}}.{{labels.type}}
copy_from: runtime.jvm.memory.area
if: ctx.runtime?.jvm?.memory?.area != null && ctx.labels?.area != null && ctx.labels?.type != null
- set:
# Copy `runtime.jvm.gc.time` (OpenTelemetry) to `jvm.gc.time` (Elastic APM).
# Both are defined in milliseconds.
field: jvm.gc.time
copy_from: runtime.jvm.gc.time
if: ctx.runtime?.jvm?.gc?.time != null
- set:
# Copy `runtime.jvm.gc.count` (OpenTelemetry) to `jvm.gc.count` (Elastic APM).
field: jvm.gc.count
copy_from: runtime.jvm.gc.count
if: ctx.runtime?.jvm?.gc?.count != null
- set:
# Copy `runtime.jvm.gc.collection` (OpenTelemetry) to `jvm.gc.time` (Elastic APM).
# Both are defined in milliseconds. This is the old name for `runtime.jvm.gc.time`.
field: jvm.gc.time
copy_from: runtime.jvm.gc.collection
if: ctx.runtime?.jvm?.gc?.collection != null
- set:
# Copy `labels.gc` (OpenTelemetry) to `labels.name` (Elastic APM), for jvm.gc.{time,count}.
field: labels.name
copy_from: labels.gc
override: false # don't replace existing labels.name field, if any
if: ctx.labels?.gc != null
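
To see the translation end to end, the pipeline can be exercised with
Elasticsearch's simulate API (POST _ingest/pipeline/apm_opentelemetry_metrics/_simulate).
The request body below is an illustrative sketch; the document and its values
are made up to resemble an OpenTelemetry JVM GC metric:

{
  "docs": [
    {
      "_source": {
        "processor": { "event": "metric" },
        "labels": { "gc": "G1 Young Generation" },
        "runtime": { "jvm": { "gc": { "time": 123, "count": 1 } } }
      }
    }
  ]
}

The simulated document should come back with jvm.gc.time and jvm.gc.count
added alongside the original runtime.jvm.gc.* fields, and labels.name copied
from labels.gc.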
91 changes: 75 additions & 16 deletions systemtest/otlp_test.go
@@ -30,6 +30,7 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
@@ -87,7 +88,15 @@ func TestOTLPGRPCMetrics(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = sendOTLPMetrics(ctx, srv)
aggregator := simple.NewWithHistogramDistribution([]float64{1, 100, 1000, 10000})
err = sendOTLPMetrics(ctx, srv, aggregator, func(meter metric.MeterMust) {
float64Counter := meter.NewFloat64Counter("float64_counter")
float64Counter.Add(context.Background(), 1)

// This will be dropped, as we do not support consuming histograms yet.
int64Recorder := meter.NewInt64ValueRecorder("int64_recorder")
int64Recorder.Record(context.Background(), 123)
})
require.NoError(t, err)

result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.BoolQuery{Filter: []interface{}{
@@ -158,6 +167,61 @@ func TestOTLPClientIP(t *testing.T) {
assert.True(t, gjson.GetBytes(result.Hits.Hits[0].RawSource, "client.ip").Exists())
}

func TestOpenTelemetryJavaMetrics(t *testing.T) {
systemtest.CleanupElasticsearch(t)
srv := apmservertest.NewUnstartedServer(t)
err := srv.Start()
require.NoError(t, err)

aggregator := simple.NewWithExactDistribution()
err = sendOTLPMetrics(context.Background(), srv, aggregator, func(meter metric.MeterMust) {
// Record well-known JVM runtime metrics, to test that they are
// copied to their Elastic APM equivalents during ingest.
jvmGCTime := meter.NewInt64Counter("runtime.jvm.gc.time")
jvmGCCount := meter.NewInt64Counter("runtime.jvm.gc.count")
jvmGCTime.Bind(label.String("gc", "G1 Young Generation")).Add(context.Background(), 123)
jvmGCCount.Bind(label.String("gc", "G1 Young Generation")).Add(context.Background(), 1)
jvmMemoryArea := meter.NewInt64UpDownCounter("runtime.jvm.memory.area")
jvmMemoryArea.Bind(
label.String("area", "heap"),
label.String("type", "used"),
).Add(context.Background(), 42)
})
require.NoError(t, err)

result := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "apm-*", estest.BoolQuery{Filter: []interface{}{
estest.TermQuery{Field: "processor.event", Value: "metric"},
}})
require.Len(t, result.Hits.Hits, 2) // one for each set of labels

var gcHit, memoryAreaHit estest.SearchHit
for _, hit := range result.Hits.Hits {
require.Contains(t, hit.Source, "jvm")
switch {
case gjson.GetBytes(hit.RawSource, "labels.gc").Exists():
gcHit = hit
case gjson.GetBytes(hit.RawSource, "labels.area").Exists():
memoryAreaHit = hit
}
}

assert.Equal(t, 123.0, gjson.GetBytes(gcHit.RawSource, "runtime.jvm.gc.time").Value())
assert.Equal(t, 1.0, gjson.GetBytes(gcHit.RawSource, "runtime.jvm.gc.count").Value())
assert.Equal(t, map[string]interface{}{
"gc": "G1 Young Generation",
"name": "G1 Young Generation",
}, gcHit.Source["labels"])
assert.Equal(t, 123.0, gjson.GetBytes(gcHit.RawSource, "jvm.gc.time").Value())
assert.Equal(t, 1.0, gjson.GetBytes(gcHit.RawSource, "jvm.gc.count").Value())

assert.Equal(t, 42.0, gjson.GetBytes(memoryAreaHit.RawSource, "runtime.jvm.memory.area").Value())
assert.Equal(t, map[string]interface{}{
"area": "heap",
"type": "used",
}, memoryAreaHit.Source["labels"])
assert.Equal(t, 42.0, gjson.GetBytes(memoryAreaHit.RawSource, "jvm.memory.heap.used").Value())
}

func sendOTLPTrace(ctx context.Context, srv *apmservertest.Server, config sdktrace.Config, options ...otlpgrpc.Option) error {
options = append(options, otlpgrpc.WithEndpoint(serverAddr(srv)), otlpgrpc.WithInsecure())
driver := otlpgrpc.NewDriver(options...)
@@ -205,34 +269,29 @@ func sendOTLPTrace(ctx context.Context, srv *apmservertest.Server, config sdktra
}
}

func sendOTLPMetrics(ctx context.Context, srv *apmservertest.Server, options ...otlpgrpc.Option) error {
options = append(options, otlpgrpc.WithEndpoint(serverAddr(srv)), otlpgrpc.WithInsecure())
driver := otlpgrpc.NewDriver(options...)
func sendOTLPMetrics(
ctx context.Context,
srv *apmservertest.Server,
aggregator export.AggregatorSelector,
recordMetrics func(metric.MeterMust),
) error {
driver := otlpgrpc.NewDriver(otlpgrpc.WithEndpoint(serverAddr(srv)), otlpgrpc.WithInsecure())
exporter, err := otlp.NewExporter(context.Background(), driver)
if err != nil {
panic(err)
}

controller := controller.New(
processor.New(
simple.NewWithHistogramDistribution([]float64{1, 100, 1000, 10000}),
exporter,
),
processor.New(aggregator, exporter),
controller.WithPusher(exporter),
controller.WithCollectPeriod(time.Minute),
)
if err := controller.Start(context.Background()); err != nil {
return err
}
meterProvider := controller.MeterProvider()
meter := meterProvider.Meter("test-meter")

float64Counter := metric.Must(meter).NewFloat64Counter("float64_counter")
float64Counter.Add(context.Background(), 1)

// This will be dropped, as we do not support consuming histograms yet.
int64Recorder := metric.Must(meter).NewInt64ValueRecorder("int64_recorder")
int64Recorder.Record(context.Background(), 123)
meter := metric.Must(meterProvider.Meter("test-meter"))
recordMetrics(meter)

// Stopping the controller will collect and export metrics.
if err := controller.Stop(context.Background()); err != nil {
