diff --git a/exporter/otlpexporter/config_test.go b/exporter/otlpexporter/config_test.go index 65580d4108a..a87681027ac 100644 --- a/exporter/otlpexporter/config_test.go +++ b/exporter/otlpexporter/config_test.go @@ -36,6 +36,7 @@ func TestLoadConfig(t *testing.T) { factory := NewFactory() factories.Exporters[typeStr] = factory + cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories) require.NoError(t, err) @@ -81,12 +82,12 @@ func TestLoadConfig(t *testing.T) { PermitWithoutStream: true, Timeout: 30 * time.Second, }, - WriteBufferSize: 512 * 1024, PerRPCAuth: &configgrpc.PerRPCAuthConfig{ AuthType: "bearer", BearerToken: "some-token", }, - BalancerName: "round_robin", + WriteBufferSize: 512 * 1024, + BalancerName: "round_robin", }, }) } diff --git a/exporter/otlpexporter/factory.go b/exporter/otlpexporter/factory.go index 023fd8d4728..1abdad5449f 100644 --- a/exporter/otlpexporter/factory.go +++ b/exporter/otlpexporter/factory.go @@ -71,6 +71,7 @@ func createTracesExporter( exporterhelper.WithTimeout(oCfg.TimeoutSettings), exporterhelper.WithRetry(oCfg.RetrySettings), exporterhelper.WithQueue(oCfg.QueueSettings), + exporterhelper.WithStart(oce.start), exporterhelper.WithShutdown(oce.shutdown)) } @@ -92,6 +93,7 @@ func createMetricsExporter( exporterhelper.WithTimeout(oCfg.TimeoutSettings), exporterhelper.WithRetry(oCfg.RetrySettings), exporterhelper.WithQueue(oCfg.QueueSettings), + exporterhelper.WithStart(oce.start), exporterhelper.WithShutdown(oce.shutdown), ) } @@ -114,6 +116,7 @@ func createLogsExporter( exporterhelper.WithTimeout(oCfg.TimeoutSettings), exporterhelper.WithRetry(oCfg.RetrySettings), exporterhelper.WithQueue(oCfg.QueueSettings), + exporterhelper.WithStart(oce.start), exporterhelper.WithShutdown(oce.shutdown), ) } diff --git a/exporter/otlpexporter/factory_test.go b/exporter/otlpexporter/factory_test.go index 2364a80679d..d9f5fee16a1 100644 --- a/exporter/otlpexporter/factory_test.go +++ b/exporter/otlpexporter/factory_test.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configcheck" "go.opentelemetry.io/collector/config/configgrpc" @@ -57,11 +58,11 @@ func TestCreateMetricsExporter(t *testing.T) { func TestCreateTracesExporter(t *testing.T) { endpoint := testutil.GetAvailableLocalAddress(t) - tests := []struct { - name string - config Config - mustFail bool + name string + config Config + mustFailOnCreate bool + mustFailOnStart bool }{ { name: "NoEndpoint", @@ -71,7 +72,7 @@ func TestCreateTracesExporter(t *testing.T) { Endpoint: "", }, }, - mustFail: true, + mustFailOnCreate: true, }, { name: "UseSecure", @@ -140,7 +141,7 @@ func TestCreateTracesExporter(t *testing.T) { Compression: "unknown compression", }, }, - mustFail: true, + mustFailOnStart: true, }, { name: "CaCert", @@ -169,7 +170,7 @@ func TestCreateTracesExporter(t *testing.T) { }, }, }, - mustFail: true, + mustFailOnStart: true, }, } @@ -178,19 +179,23 @@ func TestCreateTracesExporter(t *testing.T) { factory := NewFactory() creationParams := component.ExporterCreateParams{Logger: zap.NewNop()} consumer, err := factory.CreateTracesExporter(context.Background(), creationParams, &tt.config) - - if tt.mustFail { + if tt.mustFailOnCreate { assert.NotNil(t, err) - } else { - assert.NoError(t, err) - assert.NotNil(t, consumer) - - err = consumer.Shutdown(context.Background()) - if err != nil { - // Since the endpoint of OTLP exporter doesn't actually exist, - // exporter may already stop because it cannot connect. - assert.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing") - } + return + } + assert.NoError(t, err) + assert.NotNil(t, consumer) + err = consumer.Start(context.Background(), componenttest.NewNopHost()) + if tt.mustFailOnStart { + assert.Error(t, err) + return + } + assert.NoError(t, err) + err = consumer.Shutdown(context.Background()) + if err != nil { + // Since the endpoint of OTLP exporter doesn't actually exist, + // exporter may already stop because it cannot connect. + assert.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing") } }) } diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index c4a6d424994..ffbcd082635 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -26,6 +26,7 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" @@ -48,14 +49,14 @@ func newExporter(cfg config.Exporter) (*exporter, error) { return nil, errors.New("OTLP exporter config requires an Endpoint") } - e := &exporter{} - e.config = oCfg - w, err := newGrpcSender(oCfg) - if err != nil { - return nil, err - } - e.w = w - return e, nil + return &exporter{config: oCfg}, nil +} + +// start actually creates the gRPC connection. The client construction is deferred till this point as this +// is the only place we get hold of Extensions which are required to construct auth round tripper. +func (e *exporter) start(_ context.Context, _ component.Host) (err error) { + e.w, err = newGrpcSender(e.config) + return } func (e *exporter) shutdown(context.Context) error { diff --git a/exporter/otlpexporter/otlp_test.go b/exporter/otlpexporter/otlp_test.go index 5df7046ab8e..446904ebea2 100644 --- a/exporter/otlpexporter/otlp_test.go +++ b/exporter/otlpexporter/otlp_test.go @@ -188,12 +188,12 @@ func TestSendTraces(t *testing.T) { exp, err := factory.CreateTracesExporter(context.Background(), creationParams, cfg) require.NoError(t, err) require.NotNil(t, exp) + defer func() { assert.NoError(t, exp.Shutdown(context.Background())) }() host := componenttest.NewNopHost() - assert.NoError(t, exp.Start(context.Background(), host)) // Ensure that initially there is no data in the receiver. diff --git a/exporter/otlphttpexporter/factory.go b/exporter/otlphttpexporter/factory.go index 0ab75cf1c21..d4eb9000201 100644 --- a/exporter/otlphttpexporter/factory.go +++ b/exporter/otlphttpexporter/factory.go @@ -92,6 +92,7 @@ func createTracesExporter( cfg, params.Logger, oce.pushTraceData, + exporterhelper.WithStart(oce.start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), // explicitly disable since we rely on http.Client timeout logic. exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), @@ -119,6 +120,7 @@ func createMetricsExporter( cfg, params.Logger, oce.pushMetricsData, + exporterhelper.WithStart(oce.start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), // explicitly disable since we rely on http.Client timeout logic. exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), @@ -146,6 +148,7 @@ func createLogsExporter( cfg, params.Logger, oce.pushLogData, + exporterhelper.WithStart(oce.start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), // explicitly disable since we rely on http.Client timeout logic. exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), diff --git a/exporter/otlphttpexporter/factory_test.go b/exporter/otlphttpexporter/factory_test.go index eb922aec60b..f0f398d91f4 100644 --- a/exporter/otlphttpexporter/factory_test.go +++ b/exporter/otlphttpexporter/factory_test.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configcheck" "go.opentelemetry.io/collector/config/confighttp" @@ -62,9 +63,10 @@ func TestCreateTracesExporter(t *testing.T) { endpoint := "http://" + testutil.GetAvailableLocalAddress(t) tests := []struct { - name string - config Config - mustFail bool + name string + config Config + mustFailOnCreate bool + mustFailOnStart bool }{ { name: "NoEndpoint", @@ -74,7 +76,7 @@ func TestCreateTracesExporter(t *testing.T) { Endpoint: "", }, }, - mustFail: true, + mustFailOnCreate: true, }, { name: "UseSecure", @@ -128,7 +130,8 @@ func TestCreateTracesExporter(t *testing.T) { }, }, }, - mustFail: true, + mustFailOnCreate: false, + mustFailOnStart: true, }, } @@ -138,18 +141,22 @@ func TestCreateTracesExporter(t *testing.T) { creationParams := component.ExporterCreateParams{Logger: zap.NewNop()} consumer, err := factory.CreateTracesExporter(context.Background(), creationParams, &tt.config) - if tt.mustFail { + if tt.mustFailOnCreate { assert.Error(t, err) - } else { - assert.NoError(t, err) - assert.NotNil(t, consumer) + return + } + assert.NoError(t, err) + assert.NotNil(t, consumer) + err = consumer.Start(context.Background(), componenttest.NewNopHost()) + if tt.mustFailOnStart { + assert.Error(t, err) + } - err = consumer.Shutdown(context.Background()) - if err != nil { - // Since the endpoint of OTLP exporter doesn't actually exist, - // exporter may already stop because it cannot connect. - assert.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing") - } + err = consumer.Shutdown(context.Background()) + if err != nil { + // Since the endpoint of OTLP exporter doesn't actually exist, + // exporter may already stop because it cannot connect. + assert.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing") } }) } diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index dddee78d751..5d3c39a31a3 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -31,6 +31,7 @@ import ( "google.golang.org/genproto/googleapis/rpc/status" "google.golang.org/protobuf/proto" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/consumer/consumererror" @@ -65,24 +66,30 @@ func newExporter(cfg config.Exporter, logger *zap.Logger) (*exporter, error) { } } - client, err := oCfg.HTTPClientSettings.ToClient() + // client construction is deferred to start + return &exporter{ + config: oCfg, + logger: logger, + }, nil +} + +// start actually creates the HTTP client. The client construction is deferred till this point as this +// is the only place we get hold of Extensions which are required to construct auth round tripper. +func (e *exporter) start(_ context.Context, _ component.Host) error { + client, err := e.config.HTTPClientSettings.ToClient() if err != nil { - return nil, err + return err } - if oCfg.Compression != "" { - if strings.ToLower(oCfg.Compression) == configgrpc.CompressionGzip { + if e.config.Compression != "" { + if strings.ToLower(e.config.Compression) == configgrpc.CompressionGzip { client.Transport = middleware.NewCompressRoundTripper(client.Transport) } else { - return nil, fmt.Errorf("unsupported compression type %q", oCfg.Compression) + return fmt.Errorf("unsupported compression type %q", e.config.Compression) } } - - return &exporter{ - config: oCfg, - client: client, - logger: logger, - }, nil + e.client = client + return nil } func (e *exporter) pushTraceData(ctx context.Context, traces pdata.Traces) error { diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index 5154757cc0c..6dd467fffcc 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -165,13 +165,15 @@ func TestCompressionOptions(t *testing.T) { factory := NewFactory() cfg := createExporterConfig(test.baseURL, factory.CreateDefaultConfig()) cfg.Compression = test.compression - exp, err := factory.CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg) + exp, _ := factory.CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg) + err := exp.Start(context.Background(), componenttest.NewNopHost()) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) if test.err { - assert.Error(t, err) + require.Error(t, err) return } - require.NoError(t, err) - startAndCleanup(t, exp) td := testdata.GenerateTracesOneSpan() assert.NoError(t, exp.ConsumeTraces(context.Background(), td)) @@ -451,6 +453,14 @@ func TestErrorResponses(t *testing.T) { exp, err := createTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg) require.NoError(t, err) + // start the exporter + err = exp.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, exp.Shutdown(context.Background())) + }) + + // generate traces traces := pdata.NewTraces() err = exp.ConsumeTraces(context.Background(), traces) assert.Error(t, err) diff --git a/exporter/zipkinexporter/factory.go b/exporter/zipkinexporter/factory.go index bd6b0239337..7ffebdefd94 100644 --- a/exporter/zipkinexporter/factory.go +++ b/exporter/zipkinexporter/factory.go @@ -79,6 +79,7 @@ func createTracesExporter( zc, params.Logger, ze.pushTraceData, + exporterhelper.WithStart(ze.start), // explicitly disable since we rely on http.Client timeout logic. exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), exporterhelper.WithQueue(zc.QueueSettings), diff --git a/exporter/zipkinexporter/zipkin.go b/exporter/zipkinexporter/zipkin.go index dcfd864aac5..058732311bb 100644 --- a/exporter/zipkinexporter/zipkin.go +++ b/exporter/zipkinexporter/zipkin.go @@ -23,6 +23,8 @@ import ( "github.com/openzipkin/zipkin-go/proto/zipkin_proto3" zipkinreporter "github.com/openzipkin/zipkin-go/reporter" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/translator/trace/zipkin" @@ -36,21 +38,18 @@ import ( type zipkinExporter struct { defaultServiceName string - url string - client *http.Client - serializer zipkinreporter.SpanSerializer + url string + client *http.Client + serializer zipkinreporter.SpanSerializer + clientSettings *confighttp.HTTPClientSettings } func createZipkinExporter(cfg *Config) (*zipkinExporter, error) { - client, err := cfg.HTTPClientSettings.ToClient() - if err != nil { - return nil, err - } - ze := &zipkinExporter{ defaultServiceName: cfg.DefaultServiceName, url: cfg.Endpoint, - client: client, + clientSettings: &cfg.HTTPClientSettings, + client: nil, } switch cfg.Format { @@ -65,6 +64,12 @@ func createZipkinExporter(cfg *Config) (*zipkinExporter, error) { return ze, nil } +// start creates the http client +func (ze *zipkinExporter) start(_ context.Context, _ component.Host) (err error) { + ze.client, err = ze.clientSettings.ToClient() + return +} + func (ze *zipkinExporter) pushTraceData(ctx context.Context, td pdata.Traces) error { tbatch, err := zipkin.InternalTracesToZipkinSpans(td) if err != nil { diff --git a/exporter/zipkinexporter/zipkin_test.go b/exporter/zipkinexporter/zipkin_test.go index 14aaf302277..55f0f621df1 100644 --- a/exporter/zipkinexporter/zipkin_test.go +++ b/exporter/zipkinexporter/zipkin_test.go @@ -69,6 +69,8 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) { assert.NoError(t, err) require.NotNil(t, zexp) + require.NoError(t, zexp.Start(context.Background(), componenttest.NewNopHost())) + // The test requires the spans from zipkinSpansJSONJavaLibrary to be sent in a single batch, use // a mock to ensure that this happens as intended. mzr := newMockZipkinReporter(cst.URL) @@ -314,6 +316,8 @@ func TestZipkinExporter_roundtripProto(t *testing.T) { zexp, err := NewFactory().CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg) require.NoError(t, err) + require.NoError(t, zexp.Start(context.Background(), componenttest.NewNopHost())) + // The test requires the spans from zipkinSpansJSONJavaLibrary to be sent in a single batch, use // a mock to ensure that this happens as intended. mzr := newMockZipkinReporter(cst.URL)