Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter-helper] [resource_to_label_conversion] add helper function and expose exporter settings #2060

Merged
merged 10 commits into from
Nov 9, 2020
4 changes: 3 additions & 1 deletion exporter/exporterhelper/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Exporter Helper

This is a helper exporter that other exporters can depend on. Today, it
primarily offers queued retries capabilities.
primarily offers queued retries and resource attributes to metric labels conversion.

> :warning: This exporter should not be added to a service pipeline.

Expand All @@ -21,5 +21,7 @@ The following configuration options can be modified:
User should calculate this as `num_seconds * requests_per_second` where:
- `num_seconds` is the number of seconds to buffer in case of a backend outage
- `requests_per_second` is the average number of requests per seconds.
- `resource_to_telemetry_conversion`
- `enabled` (default = false): If `enabled` is `true`, all the resource attributes will be converted to metric labels by default.

The full list of settings exposed for this helper exporter are documented [here](factory.go).
38 changes: 25 additions & 13 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type internalOptions struct {
TimeoutSettings
QueueSettings
RetrySettings
ResourceToTelemetrySettings
Start
Shutdown
}
Expand All @@ -99,9 +100,10 @@ func fromConfiguredOptions(options ...ExporterOption) *internalOptions {
// TODO: Enable queuing by default (call CreateDefaultQueueSettings)
QueueSettings: QueueSettings{Enabled: false},
// TODO: Enable retry by default (call CreateDefaultRetrySettings)
RetrySettings: RetrySettings{Enabled: false},
Start: func(ctx context.Context, host component.Host) error { return nil },
Shutdown: func(ctx context.Context) error { return nil },
RetrySettings: RetrySettings{Enabled: false},
ResourceToTelemetrySettings: createDefaultResourceToTelemetrySettings(),
Start: func(ctx context.Context, host component.Host) error { return nil },
Shutdown: func(ctx context.Context) error { return nil },
}

for _, op := range options {
Expand Down Expand Up @@ -154,23 +156,33 @@ func WithQueue(queueSettings QueueSettings) ExporterOption {
}
}

// WithResourceToTelemetryConversion overrides the default ResourceToTelemetrySettings for an exporter.
// The default ResourceToTelemetrySettings is to disable resource attributes to metric labels conversion.
func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTelemetrySettings) ExporterOption {
return func(o *internalOptions) {
o.ResourceToTelemetrySettings = resourceToTelemetrySettings
}
}

// baseExporter contains common fields between different exporter types.
type baseExporter struct {
cfg configmodels.Exporter
sender requestSender
qrSender *queuedRetrySender
start Start
shutdown Shutdown
startOnce sync.Once
shutdownOnce sync.Once
cfg configmodels.Exporter
sender requestSender
qrSender *queuedRetrySender
start Start
shutdown Shutdown
startOnce sync.Once
shutdownOnce sync.Once
convertResourceToTelemetry bool
}

func newBaseExporter(cfg configmodels.Exporter, logger *zap.Logger, options ...ExporterOption) *baseExporter {
opts := fromConfiguredOptions(options...)
be := &baseExporter{
cfg: cfg,
start: opts.Start,
shutdown: opts.Shutdown,
cfg: cfg,
start: opts.Start,
shutdown: opts.Shutdown,
convertResourceToTelemetry: opts.ResourceToTelemetrySettings.Enabled,
}

be.qrSender = newQueuedRetrySender(opts.QueueSettings, opts.RetrySettings, &timeoutSender{cfg: opts.TimeoutSettings}, logger)
Expand Down
5 changes: 4 additions & 1 deletion exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ func TestBaseExporterWithOptions(t *testing.T) {
defaultExporterCfg,
zap.NewNop(),
WithStart(func(ctx context.Context, host component.Host) error { return errors.New("my error") }),
WithShutdown(func(ctx context.Context) error { return errors.New("my error") }))
WithShutdown(func(ctx context.Context) error { return errors.New("my error") }),
WithResourceToTelemetryConversion(createDefaultResourceToTelemetrySettings()),
WithTimeout(CreateDefaultTimeoutSettings()),
)
require.Error(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.Error(t, be.Shutdown(context.Background()))
}
Expand Down
3 changes: 3 additions & 0 deletions exporter/exporterhelper/metricshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type metricsExporter struct {
}

func (mexp *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
if mexp.baseExporter.convertResourceToTelemetry {
md = convertResourceToLabels(md)
}
exporterCtx := obsreport.ExporterContext(ctx, mexp.cfg.Name())
req := newMetricsRequest(exporterCtx, md, mexp.pusher)
_, err := mexp.sender.send(req)
Expand Down
20 changes: 20 additions & 0 deletions exporter/exporterhelper/metricshelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,26 @@ func TestMetricsExporter_WithShutdown(t *testing.T) {
assert.True(t, shutdownCalled)
}

func TestMetricsExporter_WithResourceToTelemetryConversionDisabled(t *testing.T) {
md := testdata.GenerateMetricsTwoMetrics()
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, nil), WithResourceToTelemetryConversion(createDefaultResourceToTelemetrySettings()))
assert.NotNil(t, me)
assert.NoError(t, err)

assert.Nil(t, me.ConsumeMetrics(context.Background(), md))
assert.Nil(t, me.Shutdown(context.Background()))
}

func TestMetricsExporter_WithResourceToTelemetryConversionEbabled(t *testing.T) {
md := testdata.GenerateMetricsTwoMetrics()
me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, nil), WithResourceToTelemetryConversion(ResourceToTelemetrySettings{Enabled: true}))
assert.NotNil(t, me)
assert.NoError(t, err)

assert.Nil(t, me.ConsumeMetrics(context.Background(), md))
assert.Nil(t, me.Shutdown(context.Background()))
}

func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) {
want := errors.New("my_error")
shutdownErr := func(context.Context) error { return want }
Expand Down
138 changes: 138 additions & 0 deletions exporter/exporterhelper/resource_to_label.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exporterhelper

import (
"go.opentelemetry.io/collector/consumer/pdata"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
)

// ResourceToTelemetrySettings defines configuration for converting resource attributes to metric labels.
type ResourceToTelemetrySettings struct {
// Enabled indicates whether to not convert resource attributes to metric labels
Enabled bool `mapstructure:"enabled"`
}

// createDefaultResourceToTelemetrySettings returns the default settings for ResourceToTelemetrySettings.
func createDefaultResourceToTelemetrySettings() ResourceToTelemetrySettings {
return ResourceToTelemetrySettings{
Enabled: false,
}
}

// convertResourceToLabels converts all resource attributes to metric labels
func convertResourceToLabels(md pdata.Metrics) pdata.Metrics {
cloneMd := md.Clone()
rms := cloneMd.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
resource := rms.At(i).Resource()

labelMap := extractLabelsFromResource(&resource)

ilms := rms.At(i).InstrumentationLibraryMetrics()
for j := 0; j < ilms.Len(); j++ {
ilm := ilms.At(j)
if ilm.IsNil() {
continue
}
metricSlice := ilm.Metrics()
for k := 0; k < metricSlice.Len(); k++ {
metric := metricSlice.At(k)
if metric.IsNil() {
continue
}
addLabelsToMetric(&metric, labelMap)
}
}
}
return cloneMd
}

// extractAttributesFromResource extracts the attributes from a given resource and
// returns them as a StringMap.
func extractLabelsFromResource(resource *pdata.Resource) pdata.StringMap {
labelMap := pdata.NewStringMap()

attrMap := resource.Attributes()
attrMap.ForEach(func(k string, av pdata.AttributeValue) {
stringLabel := tracetranslator.AttributeValueToString(av, false)
labelMap.Upsert(k, stringLabel)
})
return labelMap
}

// addLabelsToMetric adds additional labels to the given metric
func addLabelsToMetric(metric *pdata.Metric, labelMap pdata.StringMap) {
switch metric.DataType() {
case pdata.MetricDataTypeIntGauge:
addLabelsToIntDataPoints(metric.IntGauge().DataPoints(), labelMap)
case pdata.MetricDataTypeDoubleGauge:
addLabelsToDoubleDataPoints(metric.DoubleGauge().DataPoints(), labelMap)
case pdata.MetricDataTypeIntSum:
addLabelsToIntDataPoints(metric.IntSum().DataPoints(), labelMap)
case pdata.MetricDataTypeDoubleSum:
addLabelsToDoubleDataPoints(metric.DoubleSum().DataPoints(), labelMap)
case pdata.MetricDataTypeIntHistogram:
addLabelsToIntHistogramDataPoints(metric.IntHistogram().DataPoints(), labelMap)
case pdata.MetricDataTypeDoubleHistogram:
addLabelsToDoubleHistogramDataPoints(metric.DoubleHistogram().DataPoints(), labelMap)
}
}

func addLabelsToIntDataPoints(ps pdata.IntDataPointSlice, newLabelMap pdata.StringMap) {
for i := 0; i < ps.Len(); i++ {
dataPoint := ps.At(i)
if dataPoint.IsNil() {
continue
}
joinStringMaps(newLabelMap, dataPoint.LabelsMap())
}
}

func addLabelsToDoubleDataPoints(ps pdata.DoubleDataPointSlice, newLabelMap pdata.StringMap) {
for i := 0; i < ps.Len(); i++ {
dataPoint := ps.At(i)
if dataPoint.IsNil() {
continue
}
joinStringMaps(newLabelMap, dataPoint.LabelsMap())
}
}

func addLabelsToIntHistogramDataPoints(ps pdata.IntHistogramDataPointSlice, newLabelMap pdata.StringMap) {
for i := 0; i < ps.Len(); i++ {
dataPoint := ps.At(i)
if dataPoint.IsNil() {
continue
}
joinStringMaps(newLabelMap, dataPoint.LabelsMap())
}
}

func addLabelsToDoubleHistogramDataPoints(ps pdata.DoubleHistogramDataPointSlice, newLabelMap pdata.StringMap) {
for i := 0; i < ps.Len(); i++ {
dataPoint := ps.At(i)
if dataPoint.IsNil() {
continue
}
joinStringMaps(newLabelMap, dataPoint.LabelsMap())
}
}

func joinStringMaps(from, to pdata.StringMap) {
from.ForEach(func(k, v string) {
to.Upsert(k, v)
})
}
120 changes: 120 additions & 0 deletions exporter/exporterhelper/resource_to_label_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package exporterhelper

import (
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/internal/data/testdata"
)

func TestConvertResourceToLabels(t *testing.T) {
md := testdata.GenerateMetricsOneMetric()
assert.NotNil(t, md)

// Before converting resource to labels
assert.Equal(t, 1, md.ResourceMetrics().At(0).Resource().Attributes().Len())
assert.Equal(t, 1, md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).IntSum().DataPoints().At(0).LabelsMap().Len())

cloneMd := convertResourceToLabels(md)

// After converting resource to labels
assert.Equal(t, 1, cloneMd.ResourceMetrics().At(0).Resource().Attributes().Len())
assert.Equal(t, 2, cloneMd.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).IntSum().DataPoints().At(0).LabelsMap().Len())

assert.Equal(t, 1, md.ResourceMetrics().At(0).Resource().Attributes().Len())
assert.Equal(t, 1, md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).IntSum().DataPoints().At(0).LabelsMap().Len())

}

func TestConvertResourceToLabelsAllDataTypes(t *testing.T) {
md := testdata.GenerateMetricsAllTypesEmptyDataPoint()
assert.NotNil(t, md)

// Before converting resource to labels
assert.Equal(t, 1, md.ResourceMetrics().At(0).Resource().Attributes().Len())
assert.Equal(t, 0, md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).DoubleGauge().DataPoints().At(0).LabelsMap().Len())
assert.Equal(t, 0, md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(1).IntGauge().DataPoints().At(0).LabelsMap().Len())
assert.Equal(t, 0, md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(2).DoubleSum().DataPoints().At(0).LabelsMap().Len())
assert.Equal(t, 0, md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(3).IntSum().DataPoints().At(0).LabelsMap().Len())
assert.Equal(t, 0, md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).DoubleHistogram().DataPoints().At(0).LabelsMap().Len())
assert.Equal(t, 0, md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(5).IntHistogram().DataPoints().At(0).LabelsMap().Len())

cloneMd := convertResourceToLabels(md)

// After converting resource to labels
assert.Equal(t, 1, cloneMd.ResourceMetrics().At(0).Resource().Attributes().Len())
assert.Equal(t, 1, cloneMd.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).DoubleGauge().DataPoints().At(0).LabelsMap().Len())
assert.Equal(t, 1, cloneMd.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(1).IntGauge().DataPoints().At(0).LabelsMap().Len())
assert.Equal(t, 1, cloneMd.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(2).DoubleSum().DataPoints().At(0).LabelsMap().Len())
assert.Equal(t, 1, cloneMd.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(3).IntSum().DataPoints().At(0).LabelsMap().Len())
assert.Equal(t, 1, cloneMd.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).DoubleHistogram().DataPoints().At(0).LabelsMap().Len())
assert.Equal(t, 1, cloneMd.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(5).IntHistogram().DataPoints().At(0).LabelsMap().Len())

assert.Equal(t, 1, md.ResourceMetrics().At(0).Resource().Attributes().Len())
assert.Equal(t, 0, md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).DoubleGauge().DataPoints().At(0).LabelsMap().Len())
assert.Equal(t, 0, md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(1).IntGauge().DataPoints().At(0).LabelsMap().Len())
assert.Equal(t, 0, md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(2).DoubleSum().DataPoints().At(0).LabelsMap().Len())
assert.Equal(t, 0, md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(3).IntSum().DataPoints().At(0).LabelsMap().Len())
assert.Equal(t, 0, md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(4).DoubleHistogram().DataPoints().At(0).LabelsMap().Len())
assert.Equal(t, 0, md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(5).IntHistogram().DataPoints().At(0).LabelsMap().Len())

}

func TestConvertResourceToLabelsAllDataTypesNilDataPoint(t *testing.T) {
md := testdata.GenerateMetricsAllTypesNilDataPoint()
assert.NotNil(t, md)

// Before converting resource to labels
assert.Equal(t, 1, md.ResourceMetrics().At(0).Resource().Attributes().Len())

cloneMd := convertResourceToLabels(md)

// After converting resource to labels
assert.Equal(t, 1, cloneMd.ResourceMetrics().At(0).Resource().Attributes().Len())

assert.Equal(t, 1, md.ResourceMetrics().At(0).Resource().Attributes().Len())
}

func TestConvertResourceToLabelsOneMetricOneNil(t *testing.T) {
md := testdata.GenerateMetricsOneMetricOneNil()
assert.NotNil(t, md)

// Before converting resource to labels
assert.Equal(t, 1, md.ResourceMetrics().At(0).Resource().Attributes().Len())

cloneMd := convertResourceToLabels(md)

// After converting resource to labels
assert.Equal(t, 1, cloneMd.ResourceMetrics().At(0).Resource().Attributes().Len())

assert.Equal(t, 1, md.ResourceMetrics().At(0).Resource().Attributes().Len())
}

func TestConvertResourceToLabelsOneEmptyOneNilIlm(t *testing.T) {
md := testdata.GenerateMetricsOneEmptyOneNilInstrumentationLibrary()
assert.NotNil(t, md)

// Before converting resource to labels
assert.Equal(t, 1, md.ResourceMetrics().At(0).Resource().Attributes().Len())

cloneMd := convertResourceToLabels(md)

// After converting resource to labels
assert.Equal(t, 1, cloneMd.ResourceMetrics().At(0).Resource().Attributes().Len())

assert.Equal(t, 1, md.ResourceMetrics().At(0).Resource().Attributes().Len())
}