From 90307c51f9aedca65505b0371597b8fec83c75b3 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Tue, 7 Jul 2020 15:35:25 +0200 Subject: [PATCH 1/3] Enable batch and queued retry processors by default Signed-off-by: Pavol Loffay --- .../app/defaultconfig/default_config.go | 32 +++++++++++++------ .../app/defaultconfig/default_config_test.go | 30 +++++++++-------- 2 files changed, 40 insertions(+), 22 deletions(-) diff --git a/cmd/opentelemetry/app/defaultconfig/default_config.go b/cmd/opentelemetry/app/defaultconfig/default_config.go index 3c63da2206e..a3d14f9e380 100644 --- a/cmd/opentelemetry/app/defaultconfig/default_config.go +++ b/cmd/opentelemetry/app/defaultconfig/default_config.go @@ -23,6 +23,8 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configprotocol" + "go.opentelemetry.io/collector/processor/batchprocessor" + "go.opentelemetry.io/collector/processor/queuedprocessor" "go.opentelemetry.io/collector/processor/resourceprocessor" "go.opentelemetry.io/collector/receiver/jaegerreceiver" "go.opentelemetry.io/collector/receiver/zipkinreceiver" @@ -71,11 +73,7 @@ func (c ComponentSettings) CreateDefaultConfig() (*configmodels.Config, error) { return nil, err } receivers := createReceivers(c.ComponentType, c.ZipkinHostPort, c.Factories) - processors := configmodels.Processors{} - resProcessor := c.Factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config) - if len(resProcessor.Labels) > 0 { - processors[resProcessor.Name()] = resProcessor - } + processors := createProcessors(c.Factories) hc := c.Factories.Extensions["health_check"].CreateDefaultConfig() return &configmodels.Config{ Receivers: receivers, @@ -88,7 +86,7 @@ func (c ComponentSettings) CreateDefaultConfig() (*configmodels.Config, error) { string(configmodels.TracesDataType): { InputType: configmodels.TracesDataType, Receivers: receiverNames(receivers), - Processors: processorNames(processors), + Processors: processorNames([]string{"resource", "batch", "queued_retry"}, processors), Exporters: exporterNames(exporters), }, }, @@ -96,6 +94,19 @@ func (c ComponentSettings) CreateDefaultConfig() (*configmodels.Config, error) { }, nil } +func createProcessors(factories config.Factories) configmodels.Processors { + processors := configmodels.Processors{} + resProcessor := factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config) + if len(resProcessor.Labels) > 0 { + processors[resProcessor.Name()] = resProcessor + } + batchProcessor := factories.Processors["batch"].CreateDefaultConfig().(*batchprocessor.Config) + processors[batchProcessor.Name()] = batchProcessor + queuedProcessor := factories.Processors["queued_retry"].CreateDefaultConfig().(*queuedprocessor.Config) + processors[queuedProcessor.Name()] = queuedProcessor + return processors +} + func createReceivers(component ComponentType, zipkinHostPort string, factories config.Factories) configmodels.Receivers { if component == Ingester { kafkaReceiver := factories.Receivers[kafkareceiver.TypeStr].CreateDefaultConfig().(*kafkareceiver.Config) @@ -188,10 +199,13 @@ func receiverNames(receivers configmodels.Receivers) []string { return names } -func processorNames(processors configmodels.Processors) []string { +func processorNames(processorNames []string, processors configmodels.Processors) []string { var names []string - for _, v := range processors { - names = append(names, v.Name()) + for _, name := range processorNames { + // add processors that are in processorNames + if _, ok := processors[name]; ok { + names = append(names, name) + } } return names } diff --git a/cmd/opentelemetry/app/defaultconfig/default_config_test.go b/cmd/opentelemetry/app/defaultconfig/default_config_test.go index 3b58c4deefd..c1ada86790c 100644 --- a/cmd/opentelemetry/app/defaultconfig/default_config_test.go +++ b/cmd/opentelemetry/app/defaultconfig/default_config_test.go @@ -51,9 +51,10 @@ func TestService(t *testing.T) { Extensions: []string{"health_check"}, Pipelines: configmodels.Pipelines{ "traces": &configmodels.Pipeline{ - InputType: configmodels.TracesDataType, - Receivers: []string{"otlp", "jaeger"}, - Exporters: []string{"jaeger"}, + InputType: configmodels.TracesDataType, + Receivers: []string{"otlp", "jaeger"}, + Processors: []string{"batch", "queued_retry"}, + Exporters: []string{"jaeger"}, }, }, }, @@ -70,7 +71,7 @@ func TestService(t *testing.T) { "traces": &configmodels.Pipeline{ InputType: configmodels.TracesDataType, Receivers: []string{"otlp", "jaeger"}, - Processors: []string{"resource"}, + Processors: []string{"resource", "batch", "queued_retry"}, Exporters: []string{elasticsearchexporter.TypeStr, kafkaexporter.TypeStr, memoryexporter.TypeStr}, }, }, @@ -85,9 +86,10 @@ func TestService(t *testing.T) { Extensions: []string{"health_check"}, Pipelines: configmodels.Pipelines{ "traces": &configmodels.Pipeline{ - InputType: configmodels.TracesDataType, - Receivers: []string{kafkareceiver.TypeStr}, - Exporters: []string{elasticsearchexporter.TypeStr}, + InputType: configmodels.TracesDataType, + Receivers: []string{kafkareceiver.TypeStr}, + Processors: []string{"batch", "queued_retry"}, + Exporters: []string{elasticsearchexporter.TypeStr}, }, }, }, @@ -101,9 +103,10 @@ func TestService(t *testing.T) { Extensions: []string{"health_check"}, Pipelines: configmodels.Pipelines{ "traces": &configmodels.Pipeline{ - InputType: configmodels.TracesDataType, - Receivers: []string{kafkareceiver.TypeStr}, - Exporters: []string{cassandraexporter.TypeStr, elasticsearchexporter.TypeStr, grpcpluginexporter.TypeStr}, + InputType: configmodels.TracesDataType, + Receivers: []string{kafkareceiver.TypeStr}, + Processors: []string{"batch", "queued_retry"}, + Exporters: []string{cassandraexporter.TypeStr, elasticsearchexporter.TypeStr, grpcpluginexporter.TypeStr}, }, }, }, @@ -118,9 +121,10 @@ func TestService(t *testing.T) { Extensions: []string{"health_check"}, Pipelines: configmodels.Pipelines{ "traces": &configmodels.Pipeline{ - InputType: configmodels.TracesDataType, - Receivers: []string{"otlp", "jaeger", "zipkin"}, - Exporters: []string{elasticsearchexporter.TypeStr}, + InputType: configmodels.TracesDataType, + Receivers: []string{"otlp", "jaeger", "zipkin"}, + Processors: []string{"batch", "queued_retry"}, + Exporters: []string{elasticsearchexporter.TypeStr}, }, }, }, From 2119ddac7dcd7c41e558094eae6f82adb177a553 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Tue, 7 Jul 2020 15:42:41 +0200 Subject: [PATCH 2/3] Add godoc Signed-off-by: Pavol Loffay --- cmd/opentelemetry/app/defaultconfig/default_config.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/opentelemetry/app/defaultconfig/default_config.go b/cmd/opentelemetry/app/defaultconfig/default_config.go index a3d14f9e380..14a45922c18 100644 --- a/cmd/opentelemetry/app/defaultconfig/default_config.go +++ b/cmd/opentelemetry/app/defaultconfig/default_config.go @@ -199,6 +199,7 @@ func receiverNames(receivers configmodels.Receivers) []string { return names } +// processorNames returns processor names that are present in both input parameters func processorNames(processorNames []string, processors configmodels.Processors) []string { var names []string for _, name := range processorNames { From 272027158fad8f56cceeb058379f6671a13201e6 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Tue, 7 Jul 2020 16:33:30 +0200 Subject: [PATCH 3/3] Fix review comments Signed-off-by: Pavol Loffay --- .../app/defaultconfig/default_config.go | 38 ++++++++----------- 1 file changed, 15 insertions(+), 23 deletions(-) diff --git a/cmd/opentelemetry/app/defaultconfig/default_config.go b/cmd/opentelemetry/app/defaultconfig/default_config.go index 14a45922c18..0e43b1bf1cc 100644 --- a/cmd/opentelemetry/app/defaultconfig/default_config.go +++ b/cmd/opentelemetry/app/defaultconfig/default_config.go @@ -73,7 +73,7 @@ func (c ComponentSettings) CreateDefaultConfig() (*configmodels.Config, error) { return nil, err } receivers := createReceivers(c.ComponentType, c.ZipkinHostPort, c.Factories) - processors := createProcessors(c.Factories) + processors, processorNames := createProcessors(c.Factories) hc := c.Factories.Extensions["health_check"].CreateDefaultConfig() return &configmodels.Config{ Receivers: receivers, @@ -86,7 +86,7 @@ func (c ComponentSettings) CreateDefaultConfig() (*configmodels.Config, error) { string(configmodels.TracesDataType): { InputType: configmodels.TracesDataType, Receivers: receiverNames(receivers), - Processors: processorNames([]string{"resource", "batch", "queued_retry"}, processors), + Processors: processorNames, Exporters: exporterNames(exporters), }, }, @@ -94,17 +94,21 @@ func (c ComponentSettings) CreateDefaultConfig() (*configmodels.Config, error) { }, nil } -func createProcessors(factories config.Factories) configmodels.Processors { +func createProcessors(factories config.Factories) (configmodels.Processors, []string) { processors := configmodels.Processors{} - resProcessor := factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config) - if len(resProcessor.Labels) > 0 { - processors[resProcessor.Name()] = resProcessor + var names []string + resource := factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config) + if len(resource.Labels) > 0 { + processors[resource.Name()] = resource + names = append(names, resource.Name()) } - batchProcessor := factories.Processors["batch"].CreateDefaultConfig().(*batchprocessor.Config) - processors[batchProcessor.Name()] = batchProcessor - queuedProcessor := factories.Processors["queued_retry"].CreateDefaultConfig().(*queuedprocessor.Config) - processors[queuedProcessor.Name()] = queuedProcessor - return processors + batch := factories.Processors["batch"].CreateDefaultConfig().(*batchprocessor.Config) + processors[batch.Name()] = batch + names = append(names, batch.Name()) + queuedRetry := factories.Processors["queued_retry"].CreateDefaultConfig().(*queuedprocessor.Config) + processors[queuedRetry.Name()] = queuedRetry + names = append(names, queuedRetry.Name()) + return processors, names } func createReceivers(component ComponentType, zipkinHostPort string, factories config.Factories) configmodels.Receivers { @@ -199,18 +203,6 @@ func receiverNames(receivers configmodels.Receivers) []string { return names } -// processorNames returns processor names that are present in both input parameters -func processorNames(processorNames []string, processors configmodels.Processors) []string { - var names []string - for _, name := range processorNames { - // add processors that are in processorNames - if _, ok := processors[name]; ok { - names = append(names, name) - } - } - return names -} - func exporterNames(exporters configmodels.Exporters) []string { var names []string for _, v := range exporters {