Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable batch and queued retry processors by default #2330

Merged
merged 3 commits into from
Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions cmd/opentelemetry/app/defaultconfig/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configprotocol"
"go.opentelemetry.io/collector/processor/batchprocessor"
"go.opentelemetry.io/collector/processor/queuedprocessor"
"go.opentelemetry.io/collector/processor/resourceprocessor"
"go.opentelemetry.io/collector/receiver/jaegerreceiver"
"go.opentelemetry.io/collector/receiver/zipkinreceiver"
Expand Down Expand Up @@ -71,11 +73,7 @@ func (c ComponentSettings) CreateDefaultConfig() (*configmodels.Config, error) {
return nil, err
}
receivers := createReceivers(c.ComponentType, c.ZipkinHostPort, c.Factories)
processors := configmodels.Processors{}
resProcessor := c.Factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config)
if len(resProcessor.Labels) > 0 {
processors[resProcessor.Name()] = resProcessor
}
processors := createProcessors(c.Factories)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this method return the list of processor names aswell, rather than having a hardcoded list on line 89.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 that is better

hc := c.Factories.Extensions["health_check"].CreateDefaultConfig()
return &configmodels.Config{
Receivers: receivers,
Expand All @@ -88,14 +86,27 @@ func (c ComponentSettings) CreateDefaultConfig() (*configmodels.Config, error) {
string(configmodels.TracesDataType): {
InputType: configmodels.TracesDataType,
Receivers: receiverNames(receivers),
Processors: processorNames(processors),
Processors: processorNames([]string{"resource", "batch", "queued_retry"}, processors),
Exporters: exporterNames(exporters),
},
},
},
}, nil
}

func createProcessors(factories config.Factories) configmodels.Processors {
processors := configmodels.Processors{}
resProcessor := factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config)
if len(resProcessor.Labels) > 0 {
processors[resProcessor.Name()] = resProcessor
}
batchProcessor := factories.Processors["batch"].CreateDefaultConfig().(*batchprocessor.Config)
processors[batchProcessor.Name()] = batchProcessor
queuedProcessor := factories.Processors["queued_retry"].CreateDefaultConfig().(*queuedprocessor.Config)
processors[queuedProcessor.Name()] = queuedProcessor
return processors
}

func createReceivers(component ComponentType, zipkinHostPort string, factories config.Factories) configmodels.Receivers {
if component == Ingester {
kafkaReceiver := factories.Receivers[kafkareceiver.TypeStr].CreateDefaultConfig().(*kafkareceiver.Config)
Expand Down Expand Up @@ -188,10 +199,14 @@ func receiverNames(receivers configmodels.Receivers) []string {
return names
}

func processorNames(processors configmodels.Processors) []string {
// processorNames returns processor names that are present in both input parameters
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would the list of processors be different?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the resource processor is enabled only when needed.

func processorNames(processorNames []string, processors configmodels.Processors) []string {
var names []string
for _, v := range processors {
names = append(names, v.Name())
for _, name := range processorNames {
// add processors that are in processorNames
if _, ok := processors[name]; ok {
names = append(names, name)
}
}
return names
}
Expand Down
30 changes: 17 additions & 13 deletions cmd/opentelemetry/app/defaultconfig/default_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ func TestService(t *testing.T) {
Extensions: []string{"health_check"},
Pipelines: configmodels.Pipelines{
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{"otlp", "jaeger"},
Exporters: []string{"jaeger"},
InputType: configmodels.TracesDataType,
Receivers: []string{"otlp", "jaeger"},
Processors: []string{"batch", "queued_retry"},
Exporters: []string{"jaeger"},
},
},
},
Expand All @@ -70,7 +71,7 @@ func TestService(t *testing.T) {
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{"otlp", "jaeger"},
Processors: []string{"resource"},
Processors: []string{"resource", "batch", "queued_retry"},
Exporters: []string{elasticsearchexporter.TypeStr, kafkaexporter.TypeStr, memoryexporter.TypeStr},
},
},
Expand All @@ -85,9 +86,10 @@ func TestService(t *testing.T) {
Extensions: []string{"health_check"},
Pipelines: configmodels.Pipelines{
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{kafkareceiver.TypeStr},
Exporters: []string{elasticsearchexporter.TypeStr},
InputType: configmodels.TracesDataType,
Receivers: []string{kafkareceiver.TypeStr},
Processors: []string{"batch", "queued_retry"},
Exporters: []string{elasticsearchexporter.TypeStr},
},
},
},
Expand All @@ -101,9 +103,10 @@ func TestService(t *testing.T) {
Extensions: []string{"health_check"},
Pipelines: configmodels.Pipelines{
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{kafkareceiver.TypeStr},
Exporters: []string{cassandraexporter.TypeStr, elasticsearchexporter.TypeStr, grpcpluginexporter.TypeStr},
InputType: configmodels.TracesDataType,
Receivers: []string{kafkareceiver.TypeStr},
Processors: []string{"batch", "queued_retry"},
Exporters: []string{cassandraexporter.TypeStr, elasticsearchexporter.TypeStr, grpcpluginexporter.TypeStr},
},
},
},
Expand All @@ -118,9 +121,10 @@ func TestService(t *testing.T) {
Extensions: []string{"health_check"},
Pipelines: configmodels.Pipelines{
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{"otlp", "jaeger", "zipkin"},
Exporters: []string{elasticsearchexporter.TypeStr},
InputType: configmodels.TracesDataType,
Receivers: []string{"otlp", "jaeger", "zipkin"},
Processors: []string{"batch", "queued_retry"},
Exporters: []string{elasticsearchexporter.TypeStr},
},
},
},
Expand Down