diff --git a/apmpackage/apm/0.2.0/data_stream/app_metrics/elasticsearch/ingest_pipeline/apm_opentelemetry_metrics.json b/apmpackage/apm/0.2.0/data_stream/app_metrics/elasticsearch/ingest_pipeline/apm_opentelemetry_metrics.json new file mode 100644 index 00000000000..18a733bff7c --- /dev/null +++ b/apmpackage/apm/0.2.0/data_stream/app_metrics/elasticsearch/ingest_pipeline/apm_opentelemetry_metrics.json @@ -0,0 +1,41 @@ +{ + "description": "Populate Elastic APM metric fields from well-known OpenTelemetry metric counterparts", + "processors": [ + { + "set": { + "field": "jvm.memory.{{labels.area}}.{{labels.type}}", + "copy_from": "runtime.jvm.memory.area", + "if": "ctx.runtime?.jvm?.memory?.area != null && ctx.labels?.area != null && ctx.labels?.type != null" + } + }, + { + "set": { + "field": "jvm.gc.time", + "copy_from": "runtime.jvm.gc.collection", + "if": "ctx.runtime?.jvm?.gc?.collection != null" + } + }, + { + "set": { + "field": "jvm.gc.time", + "copy_from": "runtime.jvm.gc.time", + "if": "ctx.runtime?.jvm?.gc?.time != null" + } + }, + { + "set": { + "field": "jvm.gc.count", + "copy_from": "runtime.jvm.gc.count", + "if": "ctx.runtime?.jvm?.gc?.count != null" + } + }, + { + "set": { + "field": "labels.name", + "copy_from": "labels.gc", + "override": false, + "if": "ctx.labels?.gc != null && ctx.runtime?.jvm?.gc != null" + } + } + ] +} \ No newline at end of file diff --git a/apmpackage/apm/0.2.0/data_stream/app_metrics/elasticsearch/ingest_pipeline/default.json b/apmpackage/apm/0.2.0/data_stream/app_metrics/elasticsearch/ingest_pipeline/default.json index fc76b027f97..8c8723a674f 100644 --- a/apmpackage/apm/0.2.0/data_stream/app_metrics/elasticsearch/ingest_pipeline/default.json +++ b/apmpackage/apm/0.2.0/data_stream/app_metrics/elasticsearch/ingest_pipeline/default.json @@ -26,6 +26,12 @@ "name": "metrics-apm.app-0.2.0-apm_error_grouping_name", "if": "ctx.processor?.event == 'error'" } + }, + { + "pipeline": { + "name": "metrics-apm.app-0.2.0-apm_opentelemetry_metrics", + "if": "ctx.processor?.event == 'metric'" + } } ] } \ No newline at end of file diff --git a/apmpackage/apm/0.2.0/data_stream/error_logs/elasticsearch/ingest_pipeline/apm_opentelemetry_metrics.json b/apmpackage/apm/0.2.0/data_stream/error_logs/elasticsearch/ingest_pipeline/apm_opentelemetry_metrics.json new file mode 100644 index 00000000000..18a733bff7c --- /dev/null +++ b/apmpackage/apm/0.2.0/data_stream/error_logs/elasticsearch/ingest_pipeline/apm_opentelemetry_metrics.json @@ -0,0 +1,41 @@ +{ + "description": "Populate Elastic APM metric fields from well-known OpenTelemetry metric counterparts", + "processors": [ + { + "set": { + "field": "jvm.memory.{{labels.area}}.{{labels.type}}", + "copy_from": "runtime.jvm.memory.area", + "if": "ctx.runtime?.jvm?.memory?.area != null && ctx.labels?.area != null && ctx.labels?.type != null" + } + }, + { + "set": { + "field": "jvm.gc.time", + "copy_from": "runtime.jvm.gc.collection", + "if": "ctx.runtime?.jvm?.gc?.collection != null" + } + }, + { + "set": { + "field": "jvm.gc.time", + "copy_from": "runtime.jvm.gc.time", + "if": "ctx.runtime?.jvm?.gc?.time != null" + } + }, + { + "set": { + "field": "jvm.gc.count", + "copy_from": "runtime.jvm.gc.count", + "if": "ctx.runtime?.jvm?.gc?.count != null" + } + }, + { + "set": { + "field": "labels.name", + "copy_from": "labels.gc", + "override": false, + "if": "ctx.labels?.gc != null && ctx.runtime?.jvm?.gc != null" + } + } + ] +} \ No newline at end of file diff --git a/apmpackage/apm/0.2.0/data_stream/error_logs/elasticsearch/ingest_pipeline/default.json b/apmpackage/apm/0.2.0/data_stream/error_logs/elasticsearch/ingest_pipeline/default.json index e01f7a1f331..9cd4d1d5c95 100644 --- a/apmpackage/apm/0.2.0/data_stream/error_logs/elasticsearch/ingest_pipeline/default.json +++ b/apmpackage/apm/0.2.0/data_stream/error_logs/elasticsearch/ingest_pipeline/default.json @@ -26,6 +26,12 @@ "name": "logs-apm.error-0.2.0-apm_error_grouping_name", "if": "ctx.processor?.event == 'error'" } + }, + { + "pipeline": { + "name": "logs-apm.error-0.2.0-apm_opentelemetry_metrics", + "if": "ctx.processor?.event == 'metric'" + } } ] } \ No newline at end of file diff --git a/apmpackage/apm/0.2.0/data_stream/internal_metrics/elasticsearch/ingest_pipeline/apm_opentelemetry_metrics.json b/apmpackage/apm/0.2.0/data_stream/internal_metrics/elasticsearch/ingest_pipeline/apm_opentelemetry_metrics.json new file mode 100644 index 00000000000..18a733bff7c --- /dev/null +++ b/apmpackage/apm/0.2.0/data_stream/internal_metrics/elasticsearch/ingest_pipeline/apm_opentelemetry_metrics.json @@ -0,0 +1,41 @@ +{ + "description": "Populate Elastic APM metric fields from well-known OpenTelemetry metric counterparts", + "processors": [ + { + "set": { + "field": "jvm.memory.{{labels.area}}.{{labels.type}}", + "copy_from": "runtime.jvm.memory.area", + "if": "ctx.runtime?.jvm?.memory?.area != null && ctx.labels?.area != null && ctx.labels?.type != null" + } + }, + { + "set": { + "field": "jvm.gc.time", + "copy_from": "runtime.jvm.gc.collection", + "if": "ctx.runtime?.jvm?.gc?.collection != null" + } + }, + { + "set": { + "field": "jvm.gc.time", + "copy_from": "runtime.jvm.gc.time", + "if": "ctx.runtime?.jvm?.gc?.time != null" + } + }, + { + "set": { + "field": "jvm.gc.count", + "copy_from": "runtime.jvm.gc.count", + "if": "ctx.runtime?.jvm?.gc?.count != null" + } + }, + { + "set": { + "field": "labels.name", + "copy_from": "labels.gc", + "override": false, + "if": "ctx.labels?.gc != null && ctx.runtime?.jvm?.gc != null" + } + } + ] +} \ No newline at end of file diff --git a/apmpackage/apm/0.2.0/data_stream/internal_metrics/elasticsearch/ingest_pipeline/default.json b/apmpackage/apm/0.2.0/data_stream/internal_metrics/elasticsearch/ingest_pipeline/default.json index f2d2c341250..34c86b112d6 100644 --- a/apmpackage/apm/0.2.0/data_stream/internal_metrics/elasticsearch/ingest_pipeline/default.json +++ b/apmpackage/apm/0.2.0/data_stream/internal_metrics/elasticsearch/ingest_pipeline/default.json @@ -26,6 +26,12 @@ "name": "metrics-apm.internal-0.2.0-apm_error_grouping_name", "if": "ctx.processor?.event == 'error'" } + }, + { + "pipeline": { + "name": "metrics-apm.internal-0.2.0-apm_opentelemetry_metrics", + "if": "ctx.processor?.event == 'metric'" + } } ] } \ No newline at end of file diff --git a/apmpackage/apm/0.2.0/data_stream/profile_metrics/elasticsearch/ingest_pipeline/apm_opentelemetry_metrics.json b/apmpackage/apm/0.2.0/data_stream/profile_metrics/elasticsearch/ingest_pipeline/apm_opentelemetry_metrics.json new file mode 100644 index 00000000000..18a733bff7c --- /dev/null +++ b/apmpackage/apm/0.2.0/data_stream/profile_metrics/elasticsearch/ingest_pipeline/apm_opentelemetry_metrics.json @@ -0,0 +1,41 @@ +{ + "description": "Populate Elastic APM metric fields from well-known OpenTelemetry metric counterparts", + "processors": [ + { + "set": { + "field": "jvm.memory.{{labels.area}}.{{labels.type}}", + "copy_from": "runtime.jvm.memory.area", + "if": "ctx.runtime?.jvm?.memory?.area != null && ctx.labels?.area != null && ctx.labels?.type != null" + } + }, + { + "set": { + "field": "jvm.gc.time", + "copy_from": "runtime.jvm.gc.collection", + "if": "ctx.runtime?.jvm?.gc?.collection != null" + } + }, + { + "set": { + "field": "jvm.gc.time", + "copy_from": "runtime.jvm.gc.time", + "if": "ctx.runtime?.jvm?.gc?.time != null" + } + }, + { + "set": { + "field": "jvm.gc.count", + "copy_from": "runtime.jvm.gc.count", + "if": "ctx.runtime?.jvm?.gc?.count != null" + } + }, + { + "set": { + "field": "labels.name", + "copy_from": "labels.gc", + "override": false, + "if": "ctx.labels?.gc != null && ctx.runtime?.jvm?.gc != null" + } + } + ] +} \ No newline at end of file diff --git a/apmpackage/apm/0.2.0/data_stream/profile_metrics/elasticsearch/ingest_pipeline/default.json b/apmpackage/apm/0.2.0/data_stream/profile_metrics/elasticsearch/ingest_pipeline/default.json index 2a24373d840..d6ce3932157 100644 --- a/apmpackage/apm/0.2.0/data_stream/profile_metrics/elasticsearch/ingest_pipeline/default.json +++ b/apmpackage/apm/0.2.0/data_stream/profile_metrics/elasticsearch/ingest_pipeline/default.json @@ -26,6 +26,12 @@ "name": "metrics-apm.profiling-0.2.0-apm_error_grouping_name", "if": "ctx.processor?.event == 'error'" } + }, + { + "pipeline": { + "name": "metrics-apm.profiling-0.2.0-apm_opentelemetry_metrics", + "if": "ctx.processor?.event == 'metric'" + } } ] } \ No newline at end of file diff --git a/apmpackage/apm/0.2.0/data_stream/traces/elasticsearch/ingest_pipeline/apm_opentelemetry_metrics.json b/apmpackage/apm/0.2.0/data_stream/traces/elasticsearch/ingest_pipeline/apm_opentelemetry_metrics.json new file mode 100644 index 00000000000..18a733bff7c --- /dev/null +++ b/apmpackage/apm/0.2.0/data_stream/traces/elasticsearch/ingest_pipeline/apm_opentelemetry_metrics.json @@ -0,0 +1,41 @@ +{ + "description": "Populate Elastic APM metric fields from well-known OpenTelemetry metric counterparts", + "processors": [ + { + "set": { + "field": "jvm.memory.{{labels.area}}.{{labels.type}}", + "copy_from": "runtime.jvm.memory.area", + "if": "ctx.runtime?.jvm?.memory?.area != null && ctx.labels?.area != null && ctx.labels?.type != null" + } + }, + { + "set": { + "field": "jvm.gc.time", + "copy_from": "runtime.jvm.gc.collection", + "if": "ctx.runtime?.jvm?.gc?.collection != null" + } + }, + { + "set": { + "field": "jvm.gc.time", + "copy_from": "runtime.jvm.gc.time", + "if": "ctx.runtime?.jvm?.gc?.time != null" + } + }, + { + "set": { + "field": "jvm.gc.count", + "copy_from": "runtime.jvm.gc.count", + "if": "ctx.runtime?.jvm?.gc?.count != null" + } + }, + { + "set": { + "field": "labels.name", + "copy_from": "labels.gc", + "override": false, + "if": "ctx.labels?.gc != null && ctx.runtime?.jvm?.gc != null" + } + } + ] +} \ No newline at end of file diff --git a/apmpackage/apm/0.2.0/data_stream/traces/elasticsearch/ingest_pipeline/default.json b/apmpackage/apm/0.2.0/data_stream/traces/elasticsearch/ingest_pipeline/default.json index 69668e5701d..fa672161fed 100644 --- a/apmpackage/apm/0.2.0/data_stream/traces/elasticsearch/ingest_pipeline/default.json +++ b/apmpackage/apm/0.2.0/data_stream/traces/elasticsearch/ingest_pipeline/default.json @@ -26,6 +26,12 @@ "name": "traces-apm-0.2.0-apm_error_grouping_name", "if": "ctx.processor?.event == 'error'" } + }, + { + "pipeline": { + "name": "traces-apm-0.2.0-apm_opentelemetry_metrics", + "if": "ctx.processor?.event == 'metric'" + } } ] } \ No newline at end of file diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc new file mode 100644 index 00000000000..3ecbd264a15 --- /dev/null +++ b/changelogs/head.asciidoc @@ -0,0 +1,36 @@ +[[release-notes-head]] +== APM Server version HEAD + +https://github.com/elastic/apm-server/compare/7.12\...master[View commits] + +[float] +==== Breaking Changes + +[float] +==== Bug fixes +* Fix `setup.template` config merging {pull}4950[4950] + +[float] +==== Intake API Changes + +[float] +==== Added +* Add support for Node.js wall time profiles {pull}4728[4728] +* Add metricset.name field to metric docs {pull}4857[4857] +* Add `apm-server.default_service_environment` config {pull}4861[4861] +* Transaction histogram metrics are now recorded by default {pull}4882[4882] +* Add `error.grouping_name` field to speed up error grouping aggregations {pull}4886[4886] +* Add support for OpenTelemetry exception span events {pull}4876[4876] +* Set metricset.name for breakdown metrics {pull}4910[4910] +* Set log and http responses for server timeout {pull}4918[4918] +* Define ES fields for cgroup.cpu and cgroup.cpuacct metrics {pull}4956[4956] +* Log gRPC tracing requests {pull}4934[4934] +* Improved coverage of translation of OpenTelemetry resource conventions {pull}4955[4955] +* Set `client.ip` for events from the Elastic APM iOS agent {pull}4975[4975] +* Calculate service destination metrics for OpenTelemetry spans {pull}4976[4976] +* Add exponential retries to api key and tail sampling requests{pull}4991[4991] +* Add `apm-server.rum.allow_service_names` config {pull}5030[5030] +* Ingest pipeline for translating OpenTelemetry Java metrics to Elastic APM fields {pull}4986[4986] + +[float] +==== Deprecated diff --git a/docs/configuring-ingest.asciidoc b/docs/configuring-ingest.asciidoc index 7e132db035c..fa021ad109b 100644 --- a/docs/configuring-ingest.asciidoc +++ b/docs/configuring-ingest.asciidoc @@ -35,6 +35,17 @@ that are already available on the parent transaction. In previous versions of APM Server, this functionality was hardcoded internally. Switching metadata cleanup from an internal process to a processor allows you to keep any span metadata that is important in your architecture. +`apm_error_grouping_name`:: +added:[7.13] +Adds `error.grouping_name` to error documents for use in the {kibana-ref}/xpack-apm.html[Kibana APM UI]. + +`apm_opentelemetry_metrics`:: +added:[7.13] +Copies well-known OpenTelemetry metrics to their Elastic APM counterparts, for vizualisation in the {kibana-ref}/xpack-apm.html[Kibana APM UI]. +For example, the OpenTelemetry metric field `runtime.jvm.gc.time` is copied to the Elastic APM metric field `jvm.gc.time`. + +Metrics are duplicated so you can refer to them by either the OpenTelemetry or Elastic APM metric name. + See the complete pipeline definition by navigating to the APM Server's home directory, and then viewing `ingest/pipeline/definition.json`. diff --git a/ingest/pipeline/definition.json b/ingest/pipeline/definition.json index 1fa3b1d1021..b1f8c81fe25 100644 --- a/ingest/pipeline/definition.json +++ b/ingest/pipeline/definition.json @@ -29,6 +29,12 @@ "name": "apm_error_grouping_name", "if": "ctx.processor?.event == 'error'" } + }, + { + "pipeline": { + "name": "apm_opentelemetry_metrics", + "if": "ctx.processor?.event == 'metric'" + } } ] } @@ -137,5 +143,49 @@ } ] } + }, + { + "id": "apm_opentelemetry_metrics", + "body": { + "description": "Populate Elastic APM metric fields from well-known OpenTelemetry metric counterparts", + "processors": [ + { + "set": { + "field": "jvm.memory.{{labels.area}}.{{labels.type}}", + "copy_from": "runtime.jvm.memory.area", + "if": "ctx.runtime?.jvm?.memory?.area != null && ctx.labels?.area != null && ctx.labels?.type != null" + } + }, + { + "set": { + "field": "jvm.gc.time", + "copy_from": "runtime.jvm.gc.collection", + "if": "ctx.runtime?.jvm?.gc?.collection != null" + } + }, + { + "set": { + "field": "jvm.gc.time", + "copy_from": "runtime.jvm.gc.time", + "if": "ctx.runtime?.jvm?.gc?.time != null" + } + }, + { + "set": { + "field": "jvm.gc.count", + "copy_from": "runtime.jvm.gc.count", + "if": "ctx.runtime?.jvm?.gc?.count != null" + } + }, + { + "set": { + "field": "labels.name", + "copy_from": "labels.gc", + "override": false, + "if": "ctx.labels?.gc != null && ctx.runtime?.jvm?.gc != null" + } + } + ] + } } ] \ No newline at end of file diff --git a/ingest/pipeline/definition.yml b/ingest/pipeline/definition.yml index cbc681b423f..aedc93cffc9 100644 --- a/ingest/pipeline/definition.yml +++ b/ingest/pipeline/definition.yml @@ -12,6 +12,9 @@ apm: - pipeline: name: apm_error_grouping_name if: ctx.processor?.event == 'error' + - pipeline: + name: apm_opentelemetry_metrics + if: ctx.processor?.event == 'metric' apm_user_agent: description: Add user agent information for APM events @@ -74,3 +77,35 @@ apm_error_grouping_name: field: error.grouping_name copy_from: error.log.message if: ctx.error?.log?.message != null + +apm_opentelemetry_metrics: + description: Populate Elastic APM metric fields from well-known OpenTelemetry metric counterparts + processors: + - set: + # Copy `runtime.jvm.memory.area` (OpenTelemetry) to `jvm.memory.{area}.{type}` (Elastic APM). + field: jvm.memory.{{labels.area}}.{{labels.type}} + copy_from: runtime.jvm.memory.area + if: ctx.runtime?.jvm?.memory?.area != null && ctx.labels?.area != null && ctx.labels?.type != null + - set: + # Copy `runtime.jvm.gc.collection` (OpenTelemetry) to `jvm.gc.time` (Elastic APM). + # Both are defined in milliseconds. This is the old name for `runtime.jvm.gc.time`. + field: jvm.gc.time + copy_from: runtime.jvm.gc.collection + if: ctx.runtime?.jvm?.gc?.collection != null + - set: + # Copy `runtime.jvm.gc.time` (OpenTelemetry) to `jvm.gc.time` (Elastic APM). + # Both are defined in milliseconds. + field: jvm.gc.time + copy_from: runtime.jvm.gc.time + if: ctx.runtime?.jvm?.gc?.time != null + - set: + # Copy `runtime.jvm.gc.count` (OpenTelemetry) to `jvm.gc.count` (Elastic APM). + field: jvm.gc.count + copy_from: runtime.jvm.gc.count + if: ctx.runtime?.jvm?.gc?.count != null + - set: + # Copy `labels.gc` (OpenTelemetry) to `labels.name` (Elastic APM), for jvm.gc.{time,count}. + field: labels.name + copy_from: labels.gc + override: false # don't replace existing labels.name field, if any + if: ctx.labels?.gc != null && ctx.runtime?.jvm?.gc != null diff --git a/systemtest/otlp_test.go b/systemtest/otlp_test.go index 9f182fab01f..7921440d19d 100644 --- a/systemtest/otlp_test.go +++ b/systemtest/otlp_test.go @@ -30,6 +30,7 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc" "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" + export "go.opentelemetry.io/otel/sdk/export/metric" controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" "go.opentelemetry.io/otel/sdk/metric/selector/simple" @@ -87,7 +88,15 @@ func TestOTLPGRPCMetrics(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err = sendOTLPMetrics(ctx, srv) + aggregator := simple.NewWithHistogramDistribution([]float64{1, 100, 1000, 10000}) + err = sendOTLPMetrics(ctx, srv, aggregator, func(meter metric.MeterMust) { + float64Counter := meter.NewFloat64Counter("float64_counter") + float64Counter.Add(context.Background(), 1) + + // This will be dropped, as we do not support consuming histograms yet. + int64Recorder := meter.NewInt64ValueRecorder("int64_recorder") + int64Recorder.Record(context.Background(), 123) + }) require.NoError(t, err) result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.BoolQuery{Filter: []interface{}{ @@ -158,6 +167,61 @@ func TestOTLPClientIP(t *testing.T) { assert.True(t, gjson.GetBytes(result.Hits.Hits[0].RawSource, "client.ip").Exists()) } +func TestOpenTelemetryJavaMetrics(t *testing.T) { + systemtest.CleanupElasticsearch(t) + srv := apmservertest.NewUnstartedServer(t) + err := srv.Start() + require.NoError(t, err) + + aggregator := simple.NewWithExactDistribution() + err = sendOTLPMetrics(context.Background(), srv, aggregator, func(meter metric.MeterMust) { + // Record well-known JVM runtime metrics, to test that they are + // copied to their Elastic APM equivalents during ingest. + jvmGCTime := meter.NewInt64Counter("runtime.jvm.gc.time") + jvmGCCount := meter.NewInt64Counter("runtime.jvm.gc.count") + jvmGCTime.Bind(label.String("gc", "G1 Young Generation")).Add(context.Background(), 123) + jvmGCCount.Bind(label.String("gc", "G1 Young Generation")).Add(context.Background(), 1) + jvmMemoryArea := meter.NewInt64UpDownCounter("runtime.jvm.memory.area") + jvmMemoryArea.Bind( + label.String("area", "heap"), + label.String("type", "used"), + ).Add(context.Background(), 42) + }) + require.NoError(t, err) + + result := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "apm-*", estest.BoolQuery{Filter: []interface{}{ + estest.TermQuery{Field: "processor.event", Value: "metric"}, + }}) + require.Len(t, result.Hits.Hits, 2) // one for each set of labels + + var gcHit, memoryAreaHit estest.SearchHit + for _, hit := range result.Hits.Hits { + require.Contains(t, hit.Source, "jvm") + switch { + case gjson.GetBytes(hit.RawSource, "labels.gc").Exists(): + gcHit = hit + case gjson.GetBytes(hit.RawSource, "labels.area").Exists(): + memoryAreaHit = hit + } + } + + assert.Equal(t, 123.0, gjson.GetBytes(gcHit.RawSource, "runtime.jvm.gc.time").Value()) + assert.Equal(t, 1.0, gjson.GetBytes(gcHit.RawSource, "runtime.jvm.gc.count").Value()) + assert.Equal(t, map[string]interface{}{ + "gc": "G1 Young Generation", + "name": "G1 Young Generation", + }, gcHit.Source["labels"]) + assert.Equal(t, 123.0, gjson.GetBytes(gcHit.RawSource, "jvm.gc.time").Value()) + assert.Equal(t, 1.0, gjson.GetBytes(gcHit.RawSource, "jvm.gc.count").Value()) + + assert.Equal(t, 42.0, gjson.GetBytes(memoryAreaHit.RawSource, "runtime.jvm.memory.area").Value()) + assert.Equal(t, map[string]interface{}{ + "area": "heap", + "type": "used", + }, memoryAreaHit.Source["labels"]) + assert.Equal(t, 42.0, gjson.GetBytes(memoryAreaHit.RawSource, "jvm.memory.heap.used").Value()) +} + func sendOTLPTrace(ctx context.Context, srv *apmservertest.Server, config sdktrace.Config, options ...otlpgrpc.Option) error { options = append(options, otlpgrpc.WithEndpoint(serverAddr(srv)), otlpgrpc.WithInsecure()) driver := otlpgrpc.NewDriver(options...) @@ -205,19 +269,20 @@ func sendOTLPTrace(ctx context.Context, srv *apmservertest.Server, config sdktra } } -func sendOTLPMetrics(ctx context.Context, srv *apmservertest.Server, options ...otlpgrpc.Option) error { - options = append(options, otlpgrpc.WithEndpoint(serverAddr(srv)), otlpgrpc.WithInsecure()) - driver := otlpgrpc.NewDriver(options...) +func sendOTLPMetrics( + ctx context.Context, + srv *apmservertest.Server, + aggregator export.AggregatorSelector, + recordMetrics func(metric.MeterMust), +) error { + driver := otlpgrpc.NewDriver(otlpgrpc.WithEndpoint(serverAddr(srv)), otlpgrpc.WithInsecure()) exporter, err := otlp.NewExporter(context.Background(), driver) if err != nil { panic(err) } controller := controller.New( - processor.New( - simple.NewWithHistogramDistribution([]float64{1, 100, 1000, 10000}), - exporter, - ), + processor.New(aggregator, exporter), controller.WithPusher(exporter), controller.WithCollectPeriod(time.Minute), ) @@ -225,14 +290,8 @@ func sendOTLPMetrics(ctx context.Context, srv *apmservertest.Server, options ... return err } meterProvider := controller.MeterProvider() - meter := meterProvider.Meter("test-meter") - - float64Counter := metric.Must(meter).NewFloat64Counter("float64_counter") - float64Counter.Add(context.Background(), 1) - - // This will be dropped, as we do not support consuming histograms yet. - int64Recorder := metric.Must(meter).NewInt64ValueRecorder("int64_recorder") - int64Recorder.Record(context.Background(), 123) + meter := metric.Must(meterProvider.Meter("test-meter")) + recordMetrics(meter) // Stopping the controller will collect and export metrics. if err := controller.Stop(context.Background()); err != nil {