diff --git a/processor/attributesprocessor/attributes.go b/processor/attributesprocessor/attributes.go index c13cc8c24265..f2c7278ca0b1 100644 --- a/processor/attributesprocessor/attributes.go +++ b/processor/attributesprocessor/attributes.go @@ -17,9 +17,6 @@ package attributesprocessor import ( "context" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenterror" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/processor/attraction" "go.opentelemetry.io/collector/internal/processor/filterspan" @@ -27,29 +24,24 @@ import ( ) type attributesProcessor struct { - nextConsumer consumer.TraceConsumer - attrProc *attraction.AttrProc - include filterspan.Matcher - exclude filterspan.Matcher + attrProc *attraction.AttrProc + include filterspan.Matcher + exclude filterspan.Matcher } // newTraceProcessor returns a processor that modifies attributes of a span. // To construct the attributes processors, the use of the factory methods are required // in order to validate the inputs. -func newTraceProcessor(nextConsumer consumer.TraceConsumer, attrProc *attraction.AttrProc, include, exclude filterspan.Matcher) (component.TraceProcessor, error) { - if nextConsumer == nil { - return nil, componenterror.ErrNilNextConsumer +func newAttributesProcessor(attrProc *attraction.AttrProc, include, exclude filterspan.Matcher) *attributesProcessor { + return &attributesProcessor{ + attrProc: attrProc, + include: include, + exclude: exclude, } - ap := &attributesProcessor{ - nextConsumer: nextConsumer, - attrProc: attrProc, - include: include, - exclude: exclude, - } - return ap, nil } -func (a *attributesProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error { +// ProcessTraces implements the TProcessor +func (a *attributesProcessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) { rss := td.ResourceSpans() for i := 0; i < rss.Len(); i++ { rs := rss.At(i) @@ -79,21 +71,7 @@ func (a *attributesProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces } } } - return a.nextConsumer.ConsumeTraces(ctx, td) -} - -func (a *attributesProcessor) GetCapabilities() component.ProcessorCapabilities { - return component.ProcessorCapabilities{MutatesConsumedData: true} -} - -// Start is invoked during service startup. -func (a *attributesProcessor) Start(_ context.Context, _ component.Host) error { - return nil -} - -// Shutdown is invoked during service shutdown. -func (a *attributesProcessor) Shutdown(context.Context) error { - return nil + return td, nil } // skipSpan determines if a span should be processed. diff --git a/processor/attributesprocessor/factory.go b/processor/attributesprocessor/factory.go index c7a5e34e5bd7..6054494db3d3 100644 --- a/processor/attributesprocessor/factory.go +++ b/processor/attributesprocessor/factory.go @@ -31,6 +31,8 @@ const ( typeStr = "attributes" ) +var processorCapabilities = component.ProcessorCapabilities{MutatesConsumedData: true} + // NewFactory returns a new factory for the Attributes processor. func NewFactory() component.ProcessorFactory { return processorhelper.NewFactory( @@ -71,5 +73,10 @@ func createTraceProcessor( if err != nil { return nil, err } - return newTraceProcessor(nextConsumer, attrProc, include, exclude) + + return processorhelper.NewTraceProcessor( + cfg, + nextConsumer, + newAttributesProcessor(attrProc, include, exclude), + processorhelper.WithCapabilities(processorCapabilities)) } diff --git a/processor/attributesprocessor/factory_test.go b/processor/attributesprocessor/factory_test.go index 1ea994075f0c..986898b52a38 100644 --- a/processor/attributesprocessor/factory_test.go +++ b/processor/attributesprocessor/factory_test.go @@ -83,7 +83,7 @@ func TestFactoryCreateTraceProcessor(t *testing.T) { tp, err = factory.CreateTraceProcessor( context.Background(), component.ProcessorCreateParams{}, nil, cfg) assert.Nil(t, tp) - assert.NotNil(t, err) + assert.Error(t, err) oCfg.Actions = []attraction.ActionKeyValue{ {Action: attraction.DELETE}, @@ -91,7 +91,7 @@ func TestFactoryCreateTraceProcessor(t *testing.T) { tp, err = factory.CreateTraceProcessor( context.Background(), component.ProcessorCreateParams{}, exportertest.NewNopTraceExporter(), cfg) assert.Nil(t, tp) - assert.NotNil(t, err) + assert.Error(t, err) } func TestFactory_CreateMetricsProcessor(t *testing.T) { diff --git a/processor/filterprocessor/factory.go b/processor/filterprocessor/factory.go index aab0299382b2..05c89636de3b 100644 --- a/processor/filterprocessor/factory.go +++ b/processor/filterprocessor/factory.go @@ -28,6 +28,8 @@ const ( typeStr = "filter" ) +var processorCapabilities = component.ProcessorCapabilities{MutatesConsumedData: false} + // NewFactory returns a new factory for the Filter processor. func NewFactory() component.ProcessorFactory { return processorhelper.NewFactory( @@ -51,6 +53,13 @@ func createMetricsProcessor( cfg configmodels.Processor, nextConsumer consumer.MetricsConsumer, ) (component.MetricsProcessor, error) { - oCfg := cfg.(*Config) - return newFilterMetricProcessor(nextConsumer, oCfg) + fp, err := newFilterMetricProcessor(cfg.(*Config)) + if err != nil { + return nil, err + } + return processorhelper.NewMetricsProcessor( + cfg, + nextConsumer, + fp, + processorhelper.WithCapabilities(processorCapabilities)) } diff --git a/processor/filterprocessor/factory_test.go b/processor/filterprocessor/factory_test.go index 13dbbeb4fc39..1f3e7343dfb7 100644 --- a/processor/filterprocessor/factory_test.go +++ b/processor/filterprocessor/factory_test.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configcheck" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/exporter/exportertest" ) func TestType(t *testing.T) { @@ -81,7 +82,7 @@ func TestCreateProcessors(t *testing.T) { tp, tErr := factory.CreateTraceProcessor( context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, - nil, + exportertest.NewNopTraceExporter(), cfg) // Not implemented error assert.NotNil(t, tErr) @@ -90,9 +91,9 @@ func TestCreateProcessors(t *testing.T) { mp, mErr := factory.CreateMetricsProcessor( context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, - nil, + exportertest.NewNopMetricsExporter(), cfg) - assert.Equal(t, test.succeed, mp != (*filterMetricProcessor)(nil)) + assert.Equal(t, test.succeed, mp != nil) assert.Equal(t, test.succeed, mErr == nil) }) } diff --git a/processor/filterprocessor/filter_processor.go b/processor/filterprocessor/filter_processor.go index 6d1b1523a77f..6a079ed772bb 100644 --- a/processor/filterprocessor/filter_processor.go +++ b/processor/filterprocessor/filter_processor.go @@ -19,8 +19,6 @@ import ( metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/internal/processor/filtermetric" @@ -28,14 +26,11 @@ import ( type filterMetricProcessor struct { cfg *Config - next consumer.MetricsConsumer include *filtermetric.Matcher exclude *filtermetric.Matcher } -var _ component.MetricsProcessor = (*filterMetricProcessor)(nil) - -func newFilterMetricProcessor(next consumer.MetricsConsumer, cfg *Config) (*filterMetricProcessor, error) { +func newFilterMetricProcessor(cfg *Config) (*filterMetricProcessor, error) { inc, err := createMatcher(cfg.Metrics.Include) if err != nil { return nil, err @@ -48,7 +43,6 @@ func newFilterMetricProcessor(next consumer.MetricsConsumer, cfg *Config) (*filt return &filterMetricProcessor{ cfg: cfg, - next: next, include: inc, exclude: exc, }, nil @@ -68,28 +62,8 @@ func createMatcher(mp *filtermetric.MatchProperties) (*filtermetric.Matcher, err return &matcher, nil } -// GetCapabilities returns the Capabilities assocciated with the resource processor. -func (fmp *filterMetricProcessor) GetCapabilities() component.ProcessorCapabilities { - return component.ProcessorCapabilities{MutatesConsumedData: false} -} - -// Start is invoked during service startup. -func (*filterMetricProcessor) Start(_ context.Context, _ component.Host) error { - return nil -} - -// Shutdown is invoked during service shutdown. -func (*filterMetricProcessor) Shutdown(_ context.Context) error { - return nil -} - -// ConsumeMetricsData implements the MetricsProcessor interface -func (fmp *filterMetricProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { - return fmp.next.ConsumeMetrics(ctx, fmp.filterMetrics(md)) -} - -// filterMetrics filters the given spans based off the filterMetricProcessor's filters. -func (fmp *filterMetricProcessor) filterMetrics(md pdata.Metrics) pdata.Metrics { +// ProcessMetrics filters the given spans based off the filterMetricProcessor's filters. +func (fmp *filterMetricProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) { mds := pdatautil.MetricsToMetricsData(md) for i := range mds { if len(mds[i].Metrics) == 0 { @@ -103,7 +77,7 @@ func (fmp *filterMetricProcessor) filterMetrics(md pdata.Metrics) pdata.Metrics } mds[i].Metrics = keep } - return pdatautil.MetricsFromMetricsData(mds) + return pdatautil.MetricsFromMetricsData(mds), nil } // shouldKeepMetric determines whether a metric should be kept based off the filterMetricProcessor's filters. diff --git a/processor/filterprocessor/filter_processor_test.go b/processor/filterprocessor/filter_processor_test.go index c3efad919c8b..451062826d13 100644 --- a/processor/filterprocessor/filter_processor_test.go +++ b/processor/filterprocessor/filter_processor_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/consumerdata" "go.opentelemetry.io/collector/consumer/pdatautil" @@ -199,7 +200,8 @@ func TestFilterMetricProcessor(t *testing.T) { Exclude: test.exc, }, } - fmp, err := newFilterMetricProcessor(next, cfg) + factory := NewFactory() + fmp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, next, cfg) assert.NotNil(t, fmp) assert.Nil(t, err) @@ -282,7 +284,8 @@ func BenchmarkFilter_MetricNames(b *testing.B) { Exclude: test.exc, }, } - fmp, err := newFilterMetricProcessor(next, cfg) + factory := NewFactory() + fmp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, next, cfg) assert.NotNil(b, fmp) assert.Nil(b, err) diff --git a/processor/memorylimiter/factory.go b/processor/memorylimiter/factory.go index ca5d1fcd7432..d3d06332f0aa 100644 --- a/processor/memorylimiter/factory.go +++ b/processor/memorylimiter/factory.go @@ -17,8 +17,6 @@ package memorylimiter import ( "context" - "go.uber.org/zap" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer" @@ -30,6 +28,8 @@ const ( typeStr = "memory_limiter" ) +var processorCapabilities = component.ProcessorCapabilities{MutatesConsumedData: false} + // NewFactory returns a new factory for the Memory Limiter processor. func NewFactory() component.ProcessorFactory { return processorhelper.NewFactory( @@ -57,7 +57,16 @@ func createTraceProcessor( cfg configmodels.Processor, nextConsumer consumer.TraceConsumer, ) (component.TraceProcessor, error) { - return createProcessor(params.Logger, nextConsumer, nil, nil, cfg) + ml, err := newMemoryLimiter(params.Logger, cfg.(*Config)) + if err != nil { + return nil, err + } + return processorhelper.NewTraceProcessor( + cfg, + nextConsumer, + ml, + processorhelper.WithCapabilities(processorCapabilities), + processorhelper.WithShutdown(ml.shutdown)) } func createMetricsProcessor( @@ -66,7 +75,16 @@ func createMetricsProcessor( cfg configmodels.Processor, nextConsumer consumer.MetricsConsumer, ) (component.MetricsProcessor, error) { - return createProcessor(params.Logger, nil, nextConsumer, nil, cfg) + ml, err := newMemoryLimiter(params.Logger, cfg.(*Config)) + if err != nil { + return nil, err + } + return processorhelper.NewMetricsProcessor( + cfg, + nextConsumer, + ml, + processorhelper.WithCapabilities(processorCapabilities), + processorhelper.WithShutdown(ml.shutdown)) } func createLogProcessor( @@ -75,29 +93,14 @@ func createLogProcessor( cfg configmodels.Processor, nextConsumer consumer.LogConsumer, ) (component.LogProcessor, error) { - return createProcessor(params.Logger, nil, nil, nextConsumer, cfg) -} - -type TripleTypeProcessor interface { - consumer.TraceConsumer - consumer.MetricsConsumer - consumer.LogConsumer - component.Processor -} - -func createProcessor( - logger *zap.Logger, - traceConsumer consumer.TraceConsumer, - metricConsumer consumer.MetricsConsumer, - logConsumer consumer.LogConsumer, - cfg configmodels.Processor, -) (TripleTypeProcessor, error) { - pCfg := cfg.(*Config) - return newMemoryLimiter( - logger, - traceConsumer, - metricConsumer, - logConsumer, - pCfg, - ) + ml, err := newMemoryLimiter(params.Logger, cfg.(*Config)) + if err != nil { + return nil, err + } + return processorhelper.NewLogProcessor( + cfg, + nextConsumer, + ml, + processorhelper.WithCapabilities(processorCapabilities), + processorhelper.WithShutdown(ml.shutdown)) } diff --git a/processor/memorylimiter/memorylimiter.go b/processor/memorylimiter/memorylimiter.go index c82d79b73d13..2a8172e129b9 100644 --- a/processor/memorylimiter/memorylimiter.go +++ b/processor/memorylimiter/memorylimiter.go @@ -24,8 +24,6 @@ import ( "go.opencensus.io/stats" "go.uber.org/zap" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/internal/data" @@ -40,8 +38,6 @@ var ( // Construction errors - errNilNextConsumer = errors.New("nil nextConsumer") - errCheckIntervalOutOfRange = errors.New( "checkInterval must be greater than zero") @@ -53,10 +49,6 @@ var ( ) type memoryLimiter struct { - traceConsumer consumer.TraceConsumer - metricsConsumer consumer.MetricsConsumer - logConsumer consumer.LogConsumer - memAllocLimit uint64 memSpikeLimit uint64 memCheckWait time.Duration @@ -78,20 +70,12 @@ type memoryLimiter struct { } // newMemoryLimiter returns a new memorylimiter processor. -func newMemoryLimiter( - logger *zap.Logger, - traceConsumer consumer.TraceConsumer, - metricsConsumer consumer.MetricsConsumer, - logConsumer consumer.LogConsumer, - cfg *Config) (TripleTypeProcessor, error) { +func newMemoryLimiter(logger *zap.Logger, cfg *Config) (*memoryLimiter, error) { const mibBytes = 1024 * 1024 memAllocLimit := uint64(cfg.MemoryLimitMiB) * mibBytes memSpikeLimit := uint64(cfg.MemorySpikeLimitMiB) * mibBytes ballastSize := uint64(cfg.BallastSizeMiB) * mibBytes - if traceConsumer == nil && metricsConsumer == nil && logConsumer == nil { - return nil, errNilNextConsumer - } if cfg.CheckInterval <= 0 { return nil, errCheckIntervalOutOfRange } @@ -103,17 +87,14 @@ func newMemoryLimiter( } ml := &memoryLimiter{ - traceConsumer: traceConsumer, - metricsConsumer: metricsConsumer, - logConsumer: logConsumer, - memAllocLimit: memAllocLimit, - memSpikeLimit: memSpikeLimit, - memCheckWait: cfg.CheckInterval, - ballastSize: ballastSize, - ticker: time.NewTicker(cfg.CheckInterval), - readMemStatsFn: runtime.ReadMemStats, - procName: cfg.Name(), - logger: logger, + memAllocLimit: memAllocLimit, + memSpikeLimit: memSpikeLimit, + memCheckWait: cfg.CheckInterval, + ballastSize: ballastSize, + ticker: time.NewTicker(cfg.CheckInterval), + readMemStatsFn: runtime.ReadMemStats, + procName: cfg.Name(), + logger: logger, } ml.startMonitoring() @@ -121,12 +102,13 @@ func newMemoryLimiter( return ml, nil } -func (ml *memoryLimiter) ConsumeTraces( - ctx context.Context, - td pdata.Traces, -) error { +func (ml *memoryLimiter) shutdown(context.Context) error { + ml.ticker.Stop() + return nil +} - ctx = obsreport.ProcessorContext(ctx, ml.procName) +// ProcessTraces implements the TProcessor interface +func (ml *memoryLimiter) ProcessTraces(ctx context.Context, td pdata.Traces) (pdata.Traces, error) { numSpans := td.SpanCount() if ml.forcingDrop() { stats.Record( @@ -141,18 +123,17 @@ func (ml *memoryLimiter) ConsumeTraces( // callstack. obsreport.ProcessorTraceDataRefused(ctx, numSpans) - return errForcedDrop + return td, errForcedDrop } // Even if the next consumer returns error record the data as accepted by // this processor. obsreport.ProcessorTraceDataAccepted(ctx, numSpans) - return ml.traceConsumer.ConsumeTraces(ctx, td) + return td, nil } -func (ml *memoryLimiter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { - - ctx = obsreport.ProcessorContext(ctx, ml.procName) +// ProcessMetrics implements the MProcessor interface +func (ml *memoryLimiter) ProcessMetrics(ctx context.Context, md pdata.Metrics) (pdata.Metrics, error) { _, numDataPoints := pdatautil.MetricAndDataPointCount(md) if ml.forcingDrop() { // TODO: actually to be 100% sure that this is "refused" and not "dropped" @@ -162,18 +143,17 @@ func (ml *memoryLimiter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) e // callstack. obsreport.ProcessorMetricsDataRefused(ctx, numDataPoints) - return errForcedDrop + return md, errForcedDrop } // Even if the next consumer returns error record the data as accepted by // this processor. obsreport.ProcessorMetricsDataAccepted(ctx, numDataPoints) - return ml.metricsConsumer.ConsumeMetrics(ctx, md) + return md, nil } -func (ml *memoryLimiter) ConsumeLogs(ctx context.Context, ld data.Logs) error { - - ctx = obsreport.ProcessorContext(ctx, ml.procName) +// ProcessLogs implements the LProcessor interface +func (ml *memoryLimiter) ProcessLogs(ctx context.Context, ld data.Logs) (data.Logs, error) { numRecords := ld.LogRecordCount() if ml.forcingDrop() { // TODO: actually to be 100% sure that this is "refused" and not "dropped" @@ -183,26 +163,13 @@ func (ml *memoryLimiter) ConsumeLogs(ctx context.Context, ld data.Logs) error { // callstack. obsreport.ProcessorLogRecordsRefused(ctx, numRecords) - return errForcedDrop + return ld, errForcedDrop } // Even if the next consumer returns error record the data as accepted by // this processor. - obsreport.ProcessorMetricsDataAccepted(ctx, numRecords) - return ml.logConsumer.ConsumeLogs(ctx, ld) -} - -func (ml *memoryLimiter) GetCapabilities() component.ProcessorCapabilities { - return component.ProcessorCapabilities{MutatesConsumedData: false} -} - -func (ml *memoryLimiter) Start(_ context.Context, _ component.Host) error { - return nil -} - -func (ml *memoryLimiter) Shutdown(context.Context) error { - ml.ticker.Stop() - return nil + obsreport.ProcessorLogRecordsAccepted(ctx, numRecords) + return ld, nil } func (ml *memoryLimiter) readMemStats(ms *runtime.MemStats) { diff --git a/processor/memorylimiter/memorylimiter_test.go b/processor/memorylimiter/memorylimiter_test.go index d9d3e20cf0ef..ce9c72d1569f 100644 --- a/processor/memorylimiter/memorylimiter_test.go +++ b/processor/memorylimiter/memorylimiter_test.go @@ -21,13 +21,16 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" + "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal/data" + "go.opentelemetry.io/collector/processor/processorhelper" ) func TestNew(t *testing.T) { @@ -43,10 +46,6 @@ func TestNew(t *testing.T) { args args wantErr error }{ - { - name: "nil_nextConsumer", - wantErr: errNilNextConsumer, - }, { name: "zero_checkInterval", args: args{ @@ -87,13 +86,13 @@ func TestNew(t *testing.T) { cfg.CheckInterval = tt.args.checkInterval cfg.MemoryLimitMiB = tt.args.memoryLimitMiB cfg.MemorySpikeLimitMiB = tt.args.memorySpikeLimitMiB - got, err := newMemoryLimiter(zap.NewNop(), tt.args.nextConsumer, nil, nil, cfg) + got, err := newMemoryLimiter(zap.NewNop(), cfg) if err != tt.wantErr { t.Errorf("newMemoryLimiter() error = %v, wantErr %v", err, tt.wantErr) return } if got != nil { - assert.NoError(t, got.Shutdown(context.Background())) + assert.NoError(t, got.shutdown(context.Background())) } }) } @@ -103,14 +102,24 @@ func TestNew(t *testing.T) { // check expected side effects. func TestMetricsMemoryPressureResponse(t *testing.T) { var currentMemAlloc uint64 - sink := new(exportertest.SinkMetricsExporter) ml := &memoryLimiter{ - metricsConsumer: sink, - memAllocLimit: 1024, + memAllocLimit: 1024, readMemStatsFn: func(ms *runtime.MemStats) { ms.Alloc = currentMemAlloc }, } + mp, err := processorhelper.NewMetricsProcessor( + &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: typeStr, + NameVal: typeStr, + }, + }, + exportertest.NewNopMetricsExporter(), + ml, + processorhelper.WithCapabilities(processorCapabilities), + processorhelper.WithShutdown(ml.shutdown)) + require.NoError(t, err) ctx := context.Background() md := data.NewMetricData() @@ -118,12 +127,12 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { // Below memAllocLimit. currentMemAlloc = 800 ml.memCheck() - assert.NoError(t, ml.ConsumeMetrics(ctx, pdatautil.MetricsFromInternalMetrics(md))) + assert.NoError(t, mp.ConsumeMetrics(ctx, pdatautil.MetricsFromInternalMetrics(md))) // Above memAllocLimit. currentMemAlloc = 1800 ml.memCheck() - assert.Equal(t, errForcedDrop, ml.ConsumeMetrics(ctx, pdatautil.MetricsFromInternalMetrics(md))) + assert.Equal(t, errForcedDrop, mp.ConsumeMetrics(ctx, pdatautil.MetricsFromInternalMetrics(md))) // Check ballast effect ml.ballastSize = 1000 @@ -131,12 +140,12 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { // Below memAllocLimit accounting for ballast. currentMemAlloc = 800 + ml.ballastSize ml.memCheck() - assert.NoError(t, ml.ConsumeMetrics(ctx, pdatautil.MetricsFromInternalMetrics(md))) + assert.NoError(t, mp.ConsumeMetrics(ctx, pdatautil.MetricsFromInternalMetrics(md))) // Above memAllocLimit even accountiing for ballast. currentMemAlloc = 1800 + ml.ballastSize ml.memCheck() - assert.Equal(t, errForcedDrop, ml.ConsumeMetrics(ctx, pdatautil.MetricsFromInternalMetrics(md))) + assert.Equal(t, errForcedDrop, mp.ConsumeMetrics(ctx, pdatautil.MetricsFromInternalMetrics(md))) // Restore ballast to default. ml.ballastSize = 0 @@ -147,12 +156,12 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { // Below memSpikeLimit. currentMemAlloc = 500 ml.memCheck() - assert.NoError(t, ml.ConsumeMetrics(ctx, pdatautil.MetricsFromInternalMetrics(md))) + assert.NoError(t, mp.ConsumeMetrics(ctx, pdatautil.MetricsFromInternalMetrics(md))) // Above memSpikeLimit. currentMemAlloc = 550 ml.memCheck() - assert.Equal(t, errForcedDrop, ml.ConsumeMetrics(ctx, pdatautil.MetricsFromInternalMetrics(md))) + assert.Equal(t, errForcedDrop, mp.ConsumeMetrics(ctx, pdatautil.MetricsFromInternalMetrics(md))) } @@ -160,14 +169,24 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { // check expected side effects. func TestTraceMemoryPressureResponse(t *testing.T) { var currentMemAlloc uint64 - sink := new(exportertest.SinkTraceExporter) ml := &memoryLimiter{ - traceConsumer: sink, memAllocLimit: 1024, readMemStatsFn: func(ms *runtime.MemStats) { ms.Alloc = currentMemAlloc }, } + tp, err := processorhelper.NewTraceProcessor( + &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: typeStr, + NameVal: typeStr, + }, + }, + exportertest.NewNopTraceExporter(), + ml, + processorhelper.WithCapabilities(processorCapabilities), + processorhelper.WithShutdown(ml.shutdown)) + require.NoError(t, err) ctx := context.Background() td := pdata.NewTraces() @@ -175,12 +194,12 @@ func TestTraceMemoryPressureResponse(t *testing.T) { // Below memAllocLimit. currentMemAlloc = 800 ml.memCheck() - assert.NoError(t, ml.ConsumeTraces(ctx, td)) + assert.NoError(t, tp.ConsumeTraces(ctx, td)) // Above memAllocLimit. currentMemAlloc = 1800 ml.memCheck() - assert.Equal(t, errForcedDrop, ml.ConsumeTraces(ctx, td)) + assert.Equal(t, errForcedDrop, tp.ConsumeTraces(ctx, td)) // Check ballast effect ml.ballastSize = 1000 @@ -188,12 +207,12 @@ func TestTraceMemoryPressureResponse(t *testing.T) { // Below memAllocLimit accounting for ballast. currentMemAlloc = 800 + ml.ballastSize ml.memCheck() - assert.NoError(t, ml.ConsumeTraces(ctx, td)) + assert.NoError(t, tp.ConsumeTraces(ctx, td)) // Above memAllocLimit even accountiing for ballast. currentMemAlloc = 1800 + ml.ballastSize ml.memCheck() - assert.Equal(t, errForcedDrop, ml.ConsumeTraces(ctx, td)) + assert.Equal(t, errForcedDrop, tp.ConsumeTraces(ctx, td)) // Restore ballast to default. ml.ballastSize = 0 @@ -204,12 +223,12 @@ func TestTraceMemoryPressureResponse(t *testing.T) { // Below memSpikeLimit. currentMemAlloc = 500 ml.memCheck() - assert.NoError(t, ml.ConsumeTraces(ctx, td)) + assert.NoError(t, tp.ConsumeTraces(ctx, td)) // Above memSpikeLimit. currentMemAlloc = 550 ml.memCheck() - assert.Equal(t, errForcedDrop, ml.ConsumeTraces(ctx, td)) + assert.Equal(t, errForcedDrop, tp.ConsumeTraces(ctx, td)) } @@ -217,14 +236,24 @@ func TestTraceMemoryPressureResponse(t *testing.T) { // check expected side effects. func TestLogMemoryPressureResponse(t *testing.T) { var currentMemAlloc uint64 - sink := new(exportertest.SinkLogExporter) ml := &memoryLimiter{ - logConsumer: sink, memAllocLimit: 1024, readMemStatsFn: func(ms *runtime.MemStats) { ms.Alloc = currentMemAlloc }, } + lp, err := processorhelper.NewLogProcessor( + &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: typeStr, + NameVal: typeStr, + }, + }, + exportertest.NewNopLogsExporter(), + ml, + processorhelper.WithCapabilities(processorCapabilities), + processorhelper.WithShutdown(ml.shutdown)) + require.NoError(t, err) ctx := context.Background() ld := data.NewLogs() @@ -232,12 +261,12 @@ func TestLogMemoryPressureResponse(t *testing.T) { // Below memAllocLimit. currentMemAlloc = 800 ml.memCheck() - assert.NoError(t, ml.ConsumeLogs(ctx, ld)) + assert.NoError(t, lp.ConsumeLogs(ctx, ld)) // Above memAllocLimit. currentMemAlloc = 1800 ml.memCheck() - assert.Equal(t, errForcedDrop, ml.ConsumeLogs(ctx, ld)) + assert.Equal(t, errForcedDrop, lp.ConsumeLogs(ctx, ld)) // Check ballast effect ml.ballastSize = 1000 @@ -245,12 +274,12 @@ func TestLogMemoryPressureResponse(t *testing.T) { // Below memAllocLimit accounting for ballast. currentMemAlloc = 800 + ml.ballastSize ml.memCheck() - assert.NoError(t, ml.ConsumeLogs(ctx, ld)) + assert.NoError(t, lp.ConsumeLogs(ctx, ld)) // Above memAllocLimit even accountiing for ballast. currentMemAlloc = 1800 + ml.ballastSize ml.memCheck() - assert.Equal(t, errForcedDrop, ml.ConsumeLogs(ctx, ld)) + assert.Equal(t, errForcedDrop, lp.ConsumeLogs(ctx, ld)) // Restore ballast to default. ml.ballastSize = 0 @@ -261,10 +290,10 @@ func TestLogMemoryPressureResponse(t *testing.T) { // Below memSpikeLimit. currentMemAlloc = 500 ml.memCheck() - assert.NoError(t, ml.ConsumeLogs(ctx, ld)) + assert.NoError(t, lp.ConsumeLogs(ctx, ld)) // Above memSpikeLimit. currentMemAlloc = 550 ml.memCheck() - assert.Equal(t, errForcedDrop, ml.ConsumeLogs(ctx, ld)) + assert.Equal(t, errForcedDrop, lp.ConsumeLogs(ctx, ld)) } diff --git a/processor/processorhelper/processor.go b/processor/processorhelper/processor.go new file mode 100644 index 000000000000..a89c46e0d4b5 --- /dev/null +++ b/processor/processorhelper/processor.go @@ -0,0 +1,241 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processorhelper + +import ( + "context" + "errors" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/internal/data" + "go.opentelemetry.io/collector/obsreport" +) + +// Start specifies the function invoked when the processor is being started. +type Start func(context.Context, component.Host) error + +// Shutdown specifies the function invoked when the processor is being shutdown. +type Shutdown func(context.Context) error + +// TProcessor is a helper interface that allows avoiding implementing all functions in TraceProcessor by using NewTraceProcessor. +type TProcessor interface { + // ProcessTraces is a helper function that processes the incoming data and returns the data to be sent to the next component. + // If error is returned then returned data are ignored. It MUST not call the next component. + ProcessTraces(context.Context, pdata.Traces) (pdata.Traces, error) +} + +// MProcessor is a helper interface that allows avoiding implementing all functions in MetricsProcessor by using NewTraceProcessor. +type MProcessor interface { + // ProcessMetrics is a helper function that processes the incoming data and returns the data to be sent to the next component. + // If error is returned then returned data are ignored. It MUST not call the next component. + ProcessMetrics(context.Context, pdata.Metrics) (pdata.Metrics, error) +} + +// LProcessor is a helper interface that allows avoiding implementing all functions in LogProcessor by using NewLogProcessor. +type LProcessor interface { + // ProcessLogs is a helper function that processes the incoming data and returns the data to be sent to the next component. + // If error is returned then returned data are ignored. It MUST not call the next component. + ProcessLogs(context.Context, data.Logs) (data.Logs, error) +} + +// Option apply changes to internalOptions. +type Option func(*baseProcessor) + +// WithStart overrides the default Start function for an processor. +// The default shutdown function does nothing and always returns nil. +func WithStart(start Start) Option { + return func(o *baseProcessor) { + o.start = start + } +} + +// WithShutdown overrides the default Shutdown function for an processor. +// The default shutdown function does nothing and always returns nil. +func WithShutdown(shutdown Shutdown) Option { + return func(o *baseProcessor) { + o.shutdown = shutdown + } +} + +// WithShutdown overrides the default GetCapabilities function for an processor. +// The default GetCapabilities function returns mutable capabilities. +func WithCapabilities(capabilities component.ProcessorCapabilities) Option { + return func(o *baseProcessor) { + o.capabilities = capabilities + } +} + +// internalOptions contains internalOptions concerning how an Processor is configured. +type baseProcessor struct { + fullName string + start Start + shutdown Shutdown + capabilities component.ProcessorCapabilities +} + +// Construct the internalOptions from multiple Option. +func newBaseProcessor(fullName string, options ...Option) baseProcessor { + be := baseProcessor{ + fullName: fullName, + capabilities: component.ProcessorCapabilities{MutatesConsumedData: true}, + } + + for _, op := range options { + op(&be) + } + + return be +} + +// Start the processor, invoked during service start. +func (bp *baseProcessor) Start(ctx context.Context, host component.Host) error { + if bp.start != nil { + return bp.start(ctx, host) + } + return nil +} + +func (bp *baseProcessor) GetCapabilities() component.ProcessorCapabilities { + return bp.capabilities +} + +// Shutdown the processor, invoked during service shutdown. +func (bp *baseProcessor) Shutdown(ctx context.Context) error { + if bp.shutdown != nil { + return bp.shutdown(ctx) + } + return nil +} + +type tracesProcessor struct { + baseProcessor + processor TProcessor + nextConsumer consumer.TraceConsumer +} + +func (mp *tracesProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error { + processorCtx := obsreport.ProcessorContext(ctx, mp.fullName) + var err error + td, err = mp.processor.ProcessTraces(processorCtx, td) + if err != nil { + return err + } + return mp.nextConsumer.ConsumeTraces(ctx, td) +} + +// NewTraceProcessor creates a TraceProcessor that ensure context propagation and the right tags are set. +// TODO: Add observability metrics support +func NewTraceProcessor( + config configmodels.Processor, + nextConsumer consumer.TraceConsumer, + processor TProcessor, + options ...Option, +) (component.TraceProcessor, error) { + if processor == nil { + return nil, errors.New("nil processor") + } + + if nextConsumer == nil { + return nil, componenterror.ErrNilNextConsumer + } + + return &tracesProcessor{ + baseProcessor: newBaseProcessor(config.Name(), options...), + processor: processor, + nextConsumer: nextConsumer, + }, nil +} + +type metricsProcessor struct { + baseProcessor + processor MProcessor + nextConsumer consumer.MetricsConsumer +} + +func (mp *metricsProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { + processorCtx := obsreport.ProcessorContext(ctx, mp.fullName) + var err error + md, err = mp.processor.ProcessMetrics(processorCtx, md) + if err != nil { + return err + } + return mp.nextConsumer.ConsumeMetrics(ctx, md) +} + +// NewMetricsProcessor creates a MetricsProcessor that ensure context propagation and the right tags are set. +// TODO: Add observability metrics support +func NewMetricsProcessor( + config configmodels.Processor, + nextConsumer consumer.MetricsConsumer, + processor MProcessor, + options ...Option, +) (component.MetricsProcessor, error) { + if processor == nil { + return nil, errors.New("nil processor") + } + + if nextConsumer == nil { + return nil, componenterror.ErrNilNextConsumer + } + + return &metricsProcessor{ + baseProcessor: newBaseProcessor(config.Name(), options...), + processor: processor, + nextConsumer: nextConsumer, + }, nil +} + +type logProcessor struct { + baseProcessor + processor LProcessor + nextConsumer consumer.LogConsumer +} + +func (lp *logProcessor) ConsumeLogs(ctx context.Context, ld data.Logs) error { + processorCtx := obsreport.ProcessorContext(ctx, lp.fullName) + var err error + ld, err = lp.processor.ProcessLogs(processorCtx, ld) + if err != nil { + return err + } + return lp.nextConsumer.ConsumeLogs(ctx, ld) +} + +// NewLogProcessor creates a LogProcessor that ensure context propagation and the right tags are set. +// TODO: Add observability metrics support +func NewLogProcessor( + config configmodels.Processor, + nextConsumer consumer.LogConsumer, + processor LProcessor, + options ...Option, +) (component.LogProcessor, error) { + if processor == nil { + return nil, errors.New("nil processor") + } + + if nextConsumer == nil { + return nil, componenterror.ErrNilNextConsumer + } + + return &logProcessor{ + baseProcessor: newBaseProcessor(config.Name(), options...), + processor: processor, + nextConsumer: nextConsumer, + }, nil +} diff --git a/processor/processorhelper/processor_test.go b/processor/processorhelper/processor_test.go new file mode 100644 index 000000000000..b52ded206032 --- /dev/null +++ b/processor/processorhelper/processor_test.go @@ -0,0 +1,191 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processorhelper + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/consumer/pdatautil" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/internal/data" + "go.opentelemetry.io/collector/internal/data/testdata" +) + +const testFullName = "testFullName" + +var testCfg = &configmodels.ProcessorSettings{ + TypeVal: testFullName, + NameVal: testFullName, +} + +func TestWithStart(t *testing.T) { + startCalled := false + start := func(context.Context, component.Host) error { startCalled = true; return nil } + + bp := newBaseProcessor(testFullName, WithStart(start)) + assert.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) + assert.True(t, startCalled) +} + +func TestWithStart_ReturnError(t *testing.T) { + want := errors.New("my_error") + start := func(context.Context, component.Host) error { return want } + + bp := newBaseProcessor(testFullName, WithStart(start)) + assert.Equal(t, want, bp.Start(context.Background(), componenttest.NewNopHost())) +} + +func TestWithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + bp := newBaseProcessor(testFullName, WithShutdown(shutdown)) + assert.NoError(t, bp.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + +func TestWithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + bp := newBaseProcessor(testFullName, WithShutdown(shutdownErr)) + assert.Equal(t, want, bp.Shutdown(context.Background())) +} + +func TestWithCapabilities(t *testing.T) { + bp := newBaseProcessor(testFullName) + assert.True(t, bp.GetCapabilities().MutatesConsumedData) + + bp = newBaseProcessor(testFullName, WithCapabilities(component.ProcessorCapabilities{MutatesConsumedData: false})) + assert.False(t, bp.GetCapabilities().MutatesConsumedData) +} + +func TestNewTraceExporter(t *testing.T) { + me, err := NewTraceProcessor(testCfg, exportertest.NewNopTraceExporter(), newTestTProcessor(nil)) + require.NoError(t, err) + + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.ConsumeTraces(context.Background(), testdata.GenerateTraceDataEmpty())) + assert.NoError(t, me.Shutdown(context.Background())) +} + +func TestNewTraceExporter_NilRequiredFields(t *testing.T) { + _, err := NewTraceProcessor(testCfg, exportertest.NewNopTraceExporter(), nil) + assert.Error(t, err) + + _, err = NewTraceProcessor(testCfg, nil, newTestTProcessor(nil)) + assert.Equal(t, componenterror.ErrNilNextConsumer, err) +} + +func TestNewTraceExporter_ProcessTraceError(t *testing.T) { + want := errors.New("my_error") + me, err := NewTraceProcessor(testCfg, exportertest.NewNopTraceExporter(), newTestTProcessor(want)) + require.NoError(t, err) + assert.Equal(t, want, me.ConsumeTraces(context.Background(), testdata.GenerateTraceDataEmpty())) +} + +func TestNewMetricsExporter(t *testing.T) { + me, err := NewMetricsProcessor(testCfg, exportertest.NewNopMetricsExporter(), newTestMProcessor(nil)) + require.NoError(t, err) + + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.ConsumeMetrics(context.Background(), pdatautil.MetricsFromInternalMetrics(testdata.GenerateMetricDataEmpty()))) + assert.NoError(t, me.Shutdown(context.Background())) +} + +func TestNewMetricsExporter_NilRequiredFields(t *testing.T) { + _, err := NewMetricsProcessor(testCfg, exportertest.NewNopMetricsExporter(), nil) + assert.Error(t, err) + + _, err = NewMetricsProcessor(testCfg, nil, newTestMProcessor(nil)) + assert.Equal(t, componenterror.ErrNilNextConsumer, err) +} + +func TestNewMetricsExporter_ProcessMetricsError(t *testing.T) { + want := errors.New("my_error") + me, err := NewMetricsProcessor(testCfg, exportertest.NewNopMetricsExporter(), newTestMProcessor(want)) + require.NoError(t, err) + assert.Equal(t, want, me.ConsumeMetrics(context.Background(), pdatautil.MetricsFromInternalMetrics(testdata.GenerateMetricDataEmpty()))) +} + +func TestNewLogExporter(t *testing.T) { + me, err := NewLogProcessor(testCfg, exportertest.NewNopLogsExporter(), newTestLProcessor(nil)) + require.NoError(t, err) + + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.ConsumeLogs(context.Background(), testdata.GenerateLogDataEmpty())) + assert.NoError(t, me.Shutdown(context.Background())) +} + +func TestNewLogExporter_NilRequiredFields(t *testing.T) { + _, err := NewLogProcessor(testCfg, exportertest.NewNopLogsExporter(), nil) + assert.Error(t, err) + + _, err = NewLogProcessor(testCfg, nil, newTestLProcessor(nil)) + assert.Equal(t, componenterror.ErrNilNextConsumer, err) +} + +func TestNewLogExporter_ProcessLogError(t *testing.T) { + want := errors.New("my_error") + me, err := NewLogProcessor(testCfg, exportertest.NewNopLogsExporter(), newTestLProcessor(want)) + require.NoError(t, err) + assert.Equal(t, want, me.ConsumeLogs(context.Background(), testdata.GenerateLogDataEmpty())) +} + +type testTProcessor struct { + retError error +} + +func newTestTProcessor(retError error) TProcessor { + return &testTProcessor{retError: retError} +} + +func (ttp *testTProcessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) { + return td, ttp.retError +} + +type testMProcessor struct { + retError error +} + +func newTestMProcessor(retError error) MProcessor { + return &testMProcessor{retError: retError} +} + +func (tmp *testMProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) { + return md, tmp.retError +} + +type testLProcessor struct { + retError error +} + +func newTestLProcessor(retError error) LProcessor { + return &testLProcessor{retError: retError} +} + +func (tlp *testLProcessor) ProcessLogs(_ context.Context, ld data.Logs) (data.Logs, error) { + return ld, tlp.retError +} diff --git a/processor/resourceprocessor/factory.go b/processor/resourceprocessor/factory.go index 78a0f5f792b6..b9d18c9f9505 100644 --- a/processor/resourceprocessor/factory.go +++ b/processor/resourceprocessor/factory.go @@ -33,6 +33,8 @@ const ( typeStr = "resource" ) +var processorCapabilities = component.ProcessorCapabilities{MutatesConsumedData: true} + // NewFactory returns a new factory for the Resource processor. func NewFactory() component.ProcessorFactory { return processorhelper.NewFactory( @@ -61,7 +63,11 @@ func createTraceProcessor( if err != nil { return nil, err } - return newResourceTraceProcessor(nextConsumer, attrProc), nil + return processorhelper.NewTraceProcessor( + cfg, + nextConsumer, + &resourceProcessor{attrProc: attrProc}, + processorhelper.WithCapabilities(processorCapabilities)) } func createMetricsProcessor( @@ -73,7 +79,11 @@ func createMetricsProcessor( if err != nil { return nil, err } - return newResourceMetricProcessor(nextConsumer, attrProc), nil + return processorhelper.NewMetricsProcessor( + cfg, + nextConsumer, + &resourceProcessor{attrProc: attrProc}, + processorhelper.WithCapabilities(processorCapabilities)) } func createAttrProcessor(cfg *Config, logger *zap.Logger) (*attraction.AttrProc, error) { diff --git a/processor/resourceprocessor/factory_test.go b/processor/resourceprocessor/factory_test.go index d4061234d544..dd087d37b83c 100644 --- a/processor/resourceprocessor/factory_test.go +++ b/processor/resourceprocessor/factory_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcheck" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal/processor/attraction" ) @@ -46,11 +47,11 @@ func TestCreateProcessor(t *testing.T) { }, } - tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg) + tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, exportertest.NewNopTraceExporter(), cfg) assert.NoError(t, err) assert.NotNil(t, tp) - mp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg) + mp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, exportertest.NewNopMetricsExporter(), cfg) assert.NoError(t, err) assert.NotNil(t, mp) } @@ -59,10 +60,10 @@ func TestInvalidEmptyActions(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() - _, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg) + _, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, exportertest.NewNopTraceExporter(), cfg) assert.Error(t, err) - _, err = factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg) + _, err = factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, exportertest.NewNopMetricsExporter(), cfg) assert.Error(t, err) } diff --git a/processor/resourceprocessor/resource_processor.go b/processor/resourceprocessor/resource_processor.go index 0b2597b01dc5..eb07a2a31817 100644 --- a/processor/resourceprocessor/resource_processor.go +++ b/processor/resourceprocessor/resource_processor.go @@ -17,27 +17,17 @@ package resourceprocessor import ( "context" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/internal/processor/attraction" ) -type resourceTraceProcessor struct { +type resourceProcessor struct { attrProc *attraction.AttrProc - next consumer.TraceConsumer } -func newResourceTraceProcessor(next consumer.TraceConsumer, attrProc *attraction.AttrProc) *resourceTraceProcessor { - return &resourceTraceProcessor{ - attrProc: attrProc, - next: next, - } -} - -// ConsumeTraceData implements the TraceProcessor interface -func (rtp *resourceTraceProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error { +// ProcessTraces implements the TProcessor interface +func (rp *resourceProcessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) { rss := td.ResourceSpans() for i := 0; i < rss.Len(); i++ { resource := rss.At(i).Resource() @@ -45,55 +35,13 @@ func (rtp *resourceTraceProcessor) ConsumeTraces(ctx context.Context, td pdata.T resource.InitEmpty() } attrs := resource.Attributes() - rtp.attrProc.Process(attrs) + rp.attrProc.Process(attrs) } - return rtp.next.ConsumeTraces(ctx, td) -} - -// GetCapabilities returns the ProcessorCapabilities assocciated with the resource processor. -func (rtp *resourceTraceProcessor) GetCapabilities() component.ProcessorCapabilities { - return component.ProcessorCapabilities{MutatesConsumedData: true} -} - -// Start is invoked during service startup. -func (*resourceTraceProcessor) Start(ctx context.Context, host component.Host) error { - return nil -} - -// Shutdown is invoked during service shutdown. -func (*resourceTraceProcessor) Shutdown(context.Context) error { - return nil -} - -type resourceMetricProcessor struct { - attrProc *attraction.AttrProc - next consumer.MetricsConsumer -} - -func newResourceMetricProcessor(next consumer.MetricsConsumer, attrProc *attraction.AttrProc) *resourceMetricProcessor { - return &resourceMetricProcessor{ - attrProc: attrProc, - next: next, - } -} - -// GetCapabilities returns the ProcessorCapabilities assocciated with the resource processor. -func (rmp *resourceMetricProcessor) GetCapabilities() component.ProcessorCapabilities { - return component.ProcessorCapabilities{MutatesConsumedData: true} -} - -// Start is invoked during service startup. -func (*resourceMetricProcessor) Start(ctx context.Context, host component.Host) error { - return nil -} - -// Shutdown is invoked during service shutdown. -func (*resourceMetricProcessor) Shutdown(context.Context) error { - return nil + return td, nil } -// ConsumeMetricsData implements the MetricsProcessor interface -func (rmp *resourceMetricProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { +// ProcessMetrics implements the MProcessor interface +func (rp *resourceProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) { imd := pdatautil.MetricsToInternalMetrics(md) rms := imd.ResourceMetrics() for i := 0; i < rms.Len(); i++ { @@ -104,7 +52,7 @@ func (rmp *resourceMetricProcessor) ConsumeMetrics(ctx context.Context, md pdata if resource.Attributes().Len() == 0 { resource.Attributes().InitEmptyWithCapacity(1) } - rmp.attrProc.Process(resource.Attributes()) + rp.attrProc.Process(resource.Attributes()) } - return rmp.next.ConsumeMetrics(ctx, pdatautil.MetricsFromInternalMetrics(imd)) + return md, nil } diff --git a/processor/resourceprocessor/resource_processor_test.go b/processor/resourceprocessor/resource_processor_test.go index 4bfc08aeb7ac..5638d221aff7 100644 --- a/processor/resourceprocessor/resource_processor_test.go +++ b/processor/resourceprocessor/resource_processor_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" @@ -103,10 +104,10 @@ func TestResourceProcessorAttributesUpsert(t *testing.T) { t.Run(tt.name, func(t *testing.T) { // Test trace consumer ttn := &testTraceConsumer{} - attrProc, err := attraction.NewAttrProc(&attraction.Settings{Actions: tt.config.AttributesActions}) - require.NoError(t, err) - rtp := newResourceTraceProcessor(ttn, attrProc) + factory := NewFactory() + rtp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, ttn, tt.config) + require.NoError(t, err) assert.Equal(t, true, rtp.GetCapabilities().MutatesConsumedData) sourceTraceData := generateTraceData(tt.sourceAttributes) @@ -117,7 +118,8 @@ func TestResourceProcessorAttributesUpsert(t *testing.T) { // Test metrics consumer tmn := &testMetricsConsumer{} - rmp := newResourceMetricProcessor(tmn, attrProc) + rmp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, tmn, tt.config) + require.NoError(t, err) assert.Equal(t, true, rtp.GetCapabilities().MutatesConsumedData) sourceMetricData := generateMetricData(tt.sourceAttributes) diff --git a/processor/spanprocessor/factory.go b/processor/spanprocessor/factory.go index b8c2c5d749c9..0d0119509c7b 100644 --- a/processor/spanprocessor/factory.go +++ b/processor/spanprocessor/factory.go @@ -29,6 +29,8 @@ const ( typeStr = "span" ) +var processorCapabilities = component.ProcessorCapabilities{MutatesConsumedData: true} + // errMissingRequiredField is returned when a required field in the config // is not specified. // TODO https://github.com/open-telemetry/opentelemetry-collector/issues/215 @@ -67,5 +69,13 @@ func createTraceProcessor( return nil, errMissingRequiredField } - return newSpanProcessor(nextConsumer, *oCfg) + sp, err := newSpanProcessor(*oCfg) + if err != nil { + return nil, err + } + return processorhelper.NewTraceProcessor( + cfg, + nextConsumer, + sp, + processorhelper.WithCapabilities(processorCapabilities)) } diff --git a/processor/spanprocessor/span.go b/processor/spanprocessor/span.go index f2438f22b2cc..5697e80b362b 100644 --- a/processor/spanprocessor/span.go +++ b/processor/spanprocessor/span.go @@ -21,24 +21,18 @@ import ( "strconv" "strings" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenterror" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/processor/filterspan" "go.opentelemetry.io/collector/processor" ) type spanProcessor struct { - nextConsumer consumer.TraceConsumer config Config toAttributeRules []toAttributeRule include filterspan.Matcher exclude filterspan.Matcher } -var _ component.TraceProcessor = (*spanProcessor)(nil) - // toAttributeRule is the compiled equivalent of config.ToAttributes field. type toAttributeRule struct { // Compiled regexp. @@ -49,11 +43,7 @@ type toAttributeRule struct { } // newSpanProcessor returns the span processor. -func newSpanProcessor(nextConsumer consumer.TraceConsumer, config Config) (*spanProcessor, error) { - if nextConsumer == nil { - return nil, componenterror.ErrNilNextConsumer - } - +func newSpanProcessor(config Config) (*spanProcessor, error) { include, err := filterspan.NewMatcher(config.Include) if err != nil { return nil, err @@ -64,10 +54,9 @@ func newSpanProcessor(nextConsumer consumer.TraceConsumer, config Config) (*span } sp := &spanProcessor{ - nextConsumer: nextConsumer, - config: config, - include: include, - exclude: exclude, + config: config, + include: include, + exclude: exclude, } // Compile ToAttributes regexp and extract attributes names. @@ -91,7 +80,7 @@ func newSpanProcessor(nextConsumer consumer.TraceConsumer, config Config) (*span return sp, nil } -func (sp *spanProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error { +func (sp *spanProcessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) { rss := td.ResourceSpans() for i := 0; i < rss.Len(); i++ { rs := rss.At(i) @@ -120,21 +109,7 @@ func (sp *spanProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) err } } } - return sp.nextConsumer.ConsumeTraces(ctx, td) -} - -func (sp *spanProcessor) GetCapabilities() component.ProcessorCapabilities { - return component.ProcessorCapabilities{MutatesConsumedData: true} -} - -// Start is invoked during service startup. -func (sp *spanProcessor) Start(_ context.Context, _ component.Host) error { - return nil -} - -// Shutdown is invoked during service shutdown. -func (sp *spanProcessor) Shutdown(context.Context) error { - return nil + return td, nil } func (sp *spanProcessor) processFromAttributes(span pdata.Span) { diff --git a/processor/spanprocessor/span_test.go b/processor/spanprocessor/span_test.go index 816308c74051..83452e0cc53d 100644 --- a/processor/spanprocessor/span_test.go +++ b/processor/spanprocessor/span_test.go @@ -36,11 +36,12 @@ func TestNewTraceProcessor(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() oCfg := cfg.(*Config) - tp, err := newSpanProcessor(nil, *oCfg) + oCfg.Rename.FromAttributes = []string{"foo"} + tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg) require.Error(t, componenterror.ErrNilNextConsumer, err) require.Nil(t, tp) - tp, err = newSpanProcessor(exportertest.NewNopTraceExporter(), *oCfg) + tp, err = factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, exportertest.NewNopTraceExporter(), cfg) require.Nil(t, err) require.NotNil(t, tp) }