Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New label processor #20

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions processor/labelsprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Labels Processor

Supported pipeline types: metrics

The labels processor can be used to add data point labels to all metrics that pass through it.
If any specified labels already exist in the metric, the value will be updated.

Please refer to [config.go](./config.go) for the config spec.

Example:

```yaml
processors:
labels_processor:
labels:
- key: label1
value: value1
- key: label2
value: value2
amanbrar1999 marked this conversation as resolved.
Show resolved Hide resolved
```

Refer to [config.yaml](./testdata/config.yaml) for detailed
examples on using the processor.
15 changes: 15 additions & 0 deletions processor/labelsprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package labelsprocessor

import "go.opentelemetry.io/collector/config/configmodels"

// Config defines configuration for Labels processor.
type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`
Labels []LabelConfig `mapstructure:"labels"`
}

// LabelConfig defines configuration for provided labels
type LabelConfig struct {
Key string `mapstructure:"key"`
Value string `mapstructure:"value"`
}
50 changes: 50 additions & 0 deletions processor/labelsprocessor/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package labelsprocessor

import (
"os"
"path"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtest"
)

func TestLoadConfig(t *testing.T) {

factories, err := componenttest.ExampleComponents()
assert.NoError(t, err)

factories.Processors[typeStr] = NewFactory()

os.Setenv("VALUE_1", "first_val")
os.Setenv("VALUE_2", "second_val")

cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
assert.NoError(t, err)
assert.NotNil(t, cfg)

assert.Equal(t, cfg.Processors["labels_processor"], &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "labels_processor",
NameVal: "labels_processor",
},
Labels: []LabelConfig{
{Key: "label1", Value: "value1"},
{Key: "label2", Value: "value2"},
},
})

assert.Equal(t, cfg.Processors["labels_processor/env_vars"], &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "labels_processor",
NameVal: "labels_processor/env_vars",
},
Labels: []LabelConfig{
{Key: "label1", Value: "first_val"},
{Key: "label2", Value: "second_val"},
},
})

}
49 changes: 49 additions & 0 deletions processor/labelsprocessor/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package labelsprocessor

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor/processorhelper"
)

const (
typeStr = "labels_processor"
)

var processorCapabilities = component.ProcessorCapabilities{MutatesConsumedData: true}

// NewFactory returns a new factory for the Labels processor.
func NewFactory() component.ProcessorFactory {
return processorhelper.NewFactory(
typeStr,
createDefaultConfig,
processorhelper.WithMetrics(createMetricsProcessor))
}

func createDefaultConfig() configmodels.Processor {
return &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
}
}

func createMetricsProcessor(
_ context.Context,
_ component.ProcessorCreateParams,
cfg configmodels.Processor,
nextConsumer consumer.MetricsConsumer) (component.MetricsProcessor, error) {
lp, err := newLabelMetricProcessor(cfg.(*Config))
if err != nil {
return nil, err
}
return processorhelper.NewMetricsProcessor(
cfg,
nextConsumer,
lp,
processorhelper.WithCapabilities(processorCapabilities))
}
51 changes: 51 additions & 0 deletions processor/labelsprocessor/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package labelsprocessor

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exportertest"
)

func TestType(t *testing.T) {
factory := NewFactory()
pType := factory.Type()
assert.Equal(t, pType, configmodels.Type("labels_processor"))
}

func TestCreateDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.Equal(t, cfg, &Config{
ProcessorSettings: configmodels.ProcessorSettings{
NameVal: typeStr,
TypeVal: typeStr,
},
})
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestCreateProcessor(t *testing.T) {

factory := NewFactory()
cfg := &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "labels_processor",
NameVal: "labels_processor",
},
Labels: []LabelConfig{
{Key: "label1", Value: "value1"},
},
}

mp, mErr := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, exportertest.NewNopMetricsExporter())
assert.NoError(t, mErr)
assert.NotNil(t, mp)

}
118 changes: 118 additions & 0 deletions processor/labelsprocessor/labels_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package labelsprocessor

import (
"context"
"fmt"

"go.opentelemetry.io/collector/consumer/pdata"
v11 "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1"
v1 "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1"
)

type labelMetricProcessor struct {
cfg *Config
}

func newLabelMetricProcessor(cfg *Config) (*labelMetricProcessor, error) {
err := validateConfig(cfg)
if err != nil {
return nil, err
}
return &labelMetricProcessor{cfg: cfg}, nil
}

func validateConfig(cfg *Config) error {
for _, elem := range cfg.Labels {
if elem.Key == "" || elem.Value == "" {
amanbrar1999 marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("Labels Processor configuration contains an empty key or value")
}
}

keys := make(map[string]bool)
for _, elem := range cfg.Labels {
_, value := keys[elem.Key]
if value {
return fmt.Errorf("Labels Processor configuration contains duplicate keys")
}
keys[elem.Key] = true
}

return nil
}

func (lp *labelMetricProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {

otlpMetrics := pdata.MetricsToOtlp(md)

for _, otlpMetric := range otlpMetrics {
for _, instrMetric := range otlpMetric.GetInstrumentationLibraryMetrics() {
for _, metric := range instrMetric.GetMetrics() {

// Multiple types of Data Points exists, and each of them must be handled differently
if metric.GetIntSum() != nil {
intDataPoints := metric.GetIntSum().GetDataPoints()
handleIntDataPoints(intDataPoints, lp)
} else if metric.GetIntGauge() != nil {
intDataPoints := metric.GetIntGauge().GetDataPoints()
handleIntDataPoints(intDataPoints, lp)
} else if metric.GetDoubleGauge() != nil {
doubleDataPoints := metric.GetDoubleGauge().GetDataPoints()
handleDoubleDataPoints(doubleDataPoints, lp)
} else if metric.GetDoubleSum() != nil {
doubleDataPoints := metric.GetDoubleSum().GetDataPoints()
handleDoubleDataPoints(doubleDataPoints, lp)
} else if metric.GetIntHistogram() != nil {
intHistogramDataPoints := metric.GetIntHistogram().GetDataPoints()
handleIntHistogramDataPoints(intHistogramDataPoints, lp)
} else if metric.GetDoubleHistogram() != nil {
doubleHistogramDataPoints := metric.GetDoubleHistogram().GetDataPoints()
handleDoubleHistogramDataPoints(doubleHistogramDataPoints, lp)
}

}
}
}

return md, nil
}

func handleIntDataPoints(intDataPoints []*v1.IntDataPoint, lp *labelMetricProcessor) {
for _, dataPoint := range intDataPoints {
upsertLabels(&dataPoint.Labels, lp)
}
}

func handleDoubleDataPoints(doubleDataPoints []*v1.DoubleDataPoint, lp *labelMetricProcessor) {
for _, dataPoint := range doubleDataPoints {
upsertLabels(&dataPoint.Labels, lp)
}
}

func handleIntHistogramDataPoints(intHistogramDataPoints []*v1.IntHistogramDataPoint, lp *labelMetricProcessor) {
for _, dataPoint := range intHistogramDataPoints {
upsertLabels(&dataPoint.Labels, lp)
}
}

func handleDoubleHistogramDataPoints(doubleHistogramDataPoints []*v1.DoubleHistogramDataPoint, lp *labelMetricProcessor) {
for _, dataPoint := range doubleHistogramDataPoints {
upsertLabels(&dataPoint.Labels, lp)
}

}

func upsertLabels(labels *[]*v11.StringKeyValue, lp *labelMetricProcessor) {
for _, label := range lp.cfg.Labels {
var updated bool = false
for _, elem := range *labels {
if elem.Key == label.Key {
elem.Value = label.Value
updated = true
break
}
}
if !updated {
*labels = append(*labels, &v11.StringKeyValue{Key: label.Key, Value: label.Value})
}
}
}
Loading