Skip to content

Commit

Permalink
Add processor helper. In the future add metrics and tracing (#1359)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Jul 15, 2020
1 parent a509d72 commit afc4796
Show file tree
Hide file tree
Showing 19 changed files with 647 additions and 297 deletions.
44 changes: 11 additions & 33 deletions processor/attributesprocessor/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,31 @@ package attributesprocessor
import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/processor/attraction"
"go.opentelemetry.io/collector/internal/processor/filterspan"
"go.opentelemetry.io/collector/processor"
)

type attributesProcessor struct {
nextConsumer consumer.TraceConsumer
attrProc *attraction.AttrProc
include filterspan.Matcher
exclude filterspan.Matcher
attrProc *attraction.AttrProc
include filterspan.Matcher
exclude filterspan.Matcher
}

// newTraceProcessor returns a processor that modifies attributes of a span.
// To construct the attributes processors, the use of the factory methods are required
// in order to validate the inputs.
func newTraceProcessor(nextConsumer consumer.TraceConsumer, attrProc *attraction.AttrProc, include, exclude filterspan.Matcher) (component.TraceProcessor, error) {
if nextConsumer == nil {
return nil, componenterror.ErrNilNextConsumer
func newAttributesProcessor(attrProc *attraction.AttrProc, include, exclude filterspan.Matcher) *attributesProcessor {
return &attributesProcessor{
attrProc: attrProc,
include: include,
exclude: exclude,
}
ap := &attributesProcessor{
nextConsumer: nextConsumer,
attrProc: attrProc,
include: include,
exclude: exclude,
}
return ap, nil
}

func (a *attributesProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
// ProcessTraces implements the TProcessor
func (a *attributesProcessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
rss := td.ResourceSpans()
for i := 0; i < rss.Len(); i++ {
rs := rss.At(i)
Expand Down Expand Up @@ -79,21 +71,7 @@ func (a *attributesProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces
}
}
}
return a.nextConsumer.ConsumeTraces(ctx, td)
}

func (a *attributesProcessor) GetCapabilities() component.ProcessorCapabilities {
return component.ProcessorCapabilities{MutatesConsumedData: true}
}

// Start is invoked during service startup.
func (a *attributesProcessor) Start(_ context.Context, _ component.Host) error {
return nil
}

// Shutdown is invoked during service shutdown.
func (a *attributesProcessor) Shutdown(context.Context) error {
return nil
return td, nil
}

// skipSpan determines if a span should be processed.
Expand Down
9 changes: 8 additions & 1 deletion processor/attributesprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
typeStr = "attributes"
)

var processorCapabilities = component.ProcessorCapabilities{MutatesConsumedData: true}

// NewFactory returns a new factory for the Attributes processor.
func NewFactory() component.ProcessorFactory {
return processorhelper.NewFactory(
Expand Down Expand Up @@ -71,5 +73,10 @@ func createTraceProcessor(
if err != nil {
return nil, err
}
return newTraceProcessor(nextConsumer, attrProc, include, exclude)

return processorhelper.NewTraceProcessor(
cfg,
nextConsumer,
newAttributesProcessor(attrProc, include, exclude),
processorhelper.WithCapabilities(processorCapabilities))
}
4 changes: 2 additions & 2 deletions processor/attributesprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ func TestFactoryCreateTraceProcessor(t *testing.T) {
tp, err = factory.CreateTraceProcessor(
context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.Nil(t, tp)
assert.NotNil(t, err)
assert.Error(t, err)

oCfg.Actions = []attraction.ActionKeyValue{
{Action: attraction.DELETE},
}
tp, err = factory.CreateTraceProcessor(
context.Background(), component.ProcessorCreateParams{}, exportertest.NewNopTraceExporter(), cfg)
assert.Nil(t, tp)
assert.NotNil(t, err)
assert.Error(t, err)
}

func TestFactory_CreateMetricsProcessor(t *testing.T) {
Expand Down
13 changes: 11 additions & 2 deletions processor/filterprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
typeStr = "filter"
)

var processorCapabilities = component.ProcessorCapabilities{MutatesConsumedData: false}

// NewFactory returns a new factory for the Filter processor.
func NewFactory() component.ProcessorFactory {
return processorhelper.NewFactory(
Expand All @@ -51,6 +53,13 @@ func createMetricsProcessor(
cfg configmodels.Processor,
nextConsumer consumer.MetricsConsumer,
) (component.MetricsProcessor, error) {
oCfg := cfg.(*Config)
return newFilterMetricProcessor(nextConsumer, oCfg)
fp, err := newFilterMetricProcessor(cfg.(*Config))
if err != nil {
return nil, err
}
return processorhelper.NewMetricsProcessor(
cfg,
nextConsumer,
fp,
processorhelper.WithCapabilities(processorCapabilities))
}
7 changes: 4 additions & 3 deletions processor/filterprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exportertest"
)

func TestType(t *testing.T) {
Expand Down Expand Up @@ -81,7 +82,7 @@ func TestCreateProcessors(t *testing.T) {
tp, tErr := factory.CreateTraceProcessor(
context.Background(),
component.ProcessorCreateParams{Logger: zap.NewNop()},
nil,
exportertest.NewNopTraceExporter(),
cfg)
// Not implemented error
assert.NotNil(t, tErr)
Expand All @@ -90,9 +91,9 @@ func TestCreateProcessors(t *testing.T) {
mp, mErr := factory.CreateMetricsProcessor(
context.Background(),
component.ProcessorCreateParams{Logger: zap.NewNop()},
nil,
exportertest.NewNopMetricsExporter(),
cfg)
assert.Equal(t, test.succeed, mp != (*filterMetricProcessor)(nil))
assert.Equal(t, test.succeed, mp != nil)
assert.Equal(t, test.succeed, mErr == nil)
})
}
Expand Down
34 changes: 4 additions & 30 deletions processor/filterprocessor/filter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,18 @@ import (

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/internal/processor/filtermetric"
)

type filterMetricProcessor struct {
cfg *Config
next consumer.MetricsConsumer
include *filtermetric.Matcher
exclude *filtermetric.Matcher
}

var _ component.MetricsProcessor = (*filterMetricProcessor)(nil)

func newFilterMetricProcessor(next consumer.MetricsConsumer, cfg *Config) (*filterMetricProcessor, error) {
func newFilterMetricProcessor(cfg *Config) (*filterMetricProcessor, error) {
inc, err := createMatcher(cfg.Metrics.Include)
if err != nil {
return nil, err
Expand All @@ -48,7 +43,6 @@ func newFilterMetricProcessor(next consumer.MetricsConsumer, cfg *Config) (*filt

return &filterMetricProcessor{
cfg: cfg,
next: next,
include: inc,
exclude: exc,
}, nil
Expand All @@ -68,28 +62,8 @@ func createMatcher(mp *filtermetric.MatchProperties) (*filtermetric.Matcher, err
return &matcher, nil
}

// GetCapabilities returns the Capabilities assocciated with the resource processor.
func (fmp *filterMetricProcessor) GetCapabilities() component.ProcessorCapabilities {
return component.ProcessorCapabilities{MutatesConsumedData: false}
}

// Start is invoked during service startup.
func (*filterMetricProcessor) Start(_ context.Context, _ component.Host) error {
return nil
}

// Shutdown is invoked during service shutdown.
func (*filterMetricProcessor) Shutdown(_ context.Context) error {
return nil
}

// ConsumeMetricsData implements the MetricsProcessor interface
func (fmp *filterMetricProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
return fmp.next.ConsumeMetrics(ctx, fmp.filterMetrics(md))
}

// filterMetrics filters the given spans based off the filterMetricProcessor's filters.
func (fmp *filterMetricProcessor) filterMetrics(md pdata.Metrics) pdata.Metrics {
// ProcessMetrics filters the given spans based off the filterMetricProcessor's filters.
func (fmp *filterMetricProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
mds := pdatautil.MetricsToMetricsData(md)
for i := range mds {
if len(mds[i].Metrics) == 0 {
Expand All @@ -103,7 +77,7 @@ func (fmp *filterMetricProcessor) filterMetrics(md pdata.Metrics) pdata.Metrics
}
mds[i].Metrics = keep
}
return pdatautil.MetricsFromMetricsData(mds)
return pdatautil.MetricsFromMetricsData(mds), nil
}

// shouldKeepMetric determines whether a metric should be kept based off the filterMetricProcessor's filters.
Expand Down
7 changes: 5 additions & 2 deletions processor/filterprocessor/filter_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
Expand Down Expand Up @@ -199,7 +200,8 @@ func TestFilterMetricProcessor(t *testing.T) {
Exclude: test.exc,
},
}
fmp, err := newFilterMetricProcessor(next, cfg)
factory := NewFactory()
fmp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, next, cfg)
assert.NotNil(t, fmp)
assert.Nil(t, err)

Expand Down Expand Up @@ -282,7 +284,8 @@ func BenchmarkFilter_MetricNames(b *testing.B) {
Exclude: test.exc,
},
}
fmp, err := newFilterMetricProcessor(next, cfg)
factory := NewFactory()
fmp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, next, cfg)
assert.NotNil(b, fmp)
assert.Nil(b, err)

Expand Down
61 changes: 32 additions & 29 deletions processor/memorylimiter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ package memorylimiter
import (
"context"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -30,6 +28,8 @@ const (
typeStr = "memory_limiter"
)

var processorCapabilities = component.ProcessorCapabilities{MutatesConsumedData: false}

// NewFactory returns a new factory for the Memory Limiter processor.
func NewFactory() component.ProcessorFactory {
return processorhelper.NewFactory(
Expand Down Expand Up @@ -57,7 +57,16 @@ func createTraceProcessor(
cfg configmodels.Processor,
nextConsumer consumer.TraceConsumer,
) (component.TraceProcessor, error) {
return createProcessor(params.Logger, nextConsumer, nil, nil, cfg)
ml, err := newMemoryLimiter(params.Logger, cfg.(*Config))
if err != nil {
return nil, err
}
return processorhelper.NewTraceProcessor(
cfg,
nextConsumer,
ml,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(ml.shutdown))
}

func createMetricsProcessor(
Expand All @@ -66,7 +75,16 @@ func createMetricsProcessor(
cfg configmodels.Processor,
nextConsumer consumer.MetricsConsumer,
) (component.MetricsProcessor, error) {
return createProcessor(params.Logger, nil, nextConsumer, nil, cfg)
ml, err := newMemoryLimiter(params.Logger, cfg.(*Config))
if err != nil {
return nil, err
}
return processorhelper.NewMetricsProcessor(
cfg,
nextConsumer,
ml,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(ml.shutdown))
}

func createLogProcessor(
Expand All @@ -75,29 +93,14 @@ func createLogProcessor(
cfg configmodels.Processor,
nextConsumer consumer.LogConsumer,
) (component.LogProcessor, error) {
return createProcessor(params.Logger, nil, nil, nextConsumer, cfg)
}

type TripleTypeProcessor interface {
consumer.TraceConsumer
consumer.MetricsConsumer
consumer.LogConsumer
component.Processor
}

func createProcessor(
logger *zap.Logger,
traceConsumer consumer.TraceConsumer,
metricConsumer consumer.MetricsConsumer,
logConsumer consumer.LogConsumer,
cfg configmodels.Processor,
) (TripleTypeProcessor, error) {
pCfg := cfg.(*Config)
return newMemoryLimiter(
logger,
traceConsumer,
metricConsumer,
logConsumer,
pCfg,
)
ml, err := newMemoryLimiter(params.Logger, cfg.(*Config))
if err != nil {
return nil, err
}
return processorhelper.NewLogProcessor(
cfg,
nextConsumer,
ml,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(ml.shutdown))
}
Loading

0 comments on commit afc4796

Please sign in to comment.