Skip to content

Commit

Permalink
Merge branch 'open-telemetry:main' into config-package-typos
Browse files Browse the repository at this point in the history
  • Loading branch information
euniceek committed May 25, 2021
2 parents 34745bc + a817b0f commit 09723e8
Show file tree
Hide file tree
Showing 12 changed files with 116 additions and 69 deletions.
5 changes: 3 additions & 2 deletions exporter/otlpexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestLoadConfig(t *testing.T) {

factory := NewFactory()
factories.Exporters[typeStr] = factory

cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)

require.NoError(t, err)
Expand Down Expand Up @@ -81,12 +82,12 @@ func TestLoadConfig(t *testing.T) {
PermitWithoutStream: true,
Timeout: 30 * time.Second,
},
WriteBufferSize: 512 * 1024,
PerRPCAuth: &configgrpc.PerRPCAuthConfig{
AuthType: "bearer",
BearerToken: "some-token",
},
BalancerName: "round_robin",
WriteBufferSize: 512 * 1024,
BalancerName: "round_robin",
},
})
}
3 changes: 3 additions & 0 deletions exporter/otlpexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func createTracesExporter(
exporterhelper.WithTimeout(oCfg.TimeoutSettings),
exporterhelper.WithRetry(oCfg.RetrySettings),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(oce.start),
exporterhelper.WithShutdown(oce.shutdown))
}

Expand All @@ -92,6 +93,7 @@ func createMetricsExporter(
exporterhelper.WithTimeout(oCfg.TimeoutSettings),
exporterhelper.WithRetry(oCfg.RetrySettings),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(oce.start),
exporterhelper.WithShutdown(oce.shutdown),
)
}
Expand All @@ -114,6 +116,7 @@ func createLogsExporter(
exporterhelper.WithTimeout(oCfg.TimeoutSettings),
exporterhelper.WithRetry(oCfg.RetrySettings),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(oce.start),
exporterhelper.WithShutdown(oce.shutdown),
)
}
43 changes: 24 additions & 19 deletions exporter/otlpexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configgrpc"
Expand Down Expand Up @@ -57,11 +58,11 @@ func TestCreateMetricsExporter(t *testing.T) {

func TestCreateTracesExporter(t *testing.T) {
endpoint := testutil.GetAvailableLocalAddress(t)

tests := []struct {
name string
config Config
mustFail bool
name string
config Config
mustFailOnCreate bool
mustFailOnStart bool
}{
{
name: "NoEndpoint",
Expand All @@ -71,7 +72,7 @@ func TestCreateTracesExporter(t *testing.T) {
Endpoint: "",
},
},
mustFail: true,
mustFailOnCreate: true,
},
{
name: "UseSecure",
Expand Down Expand Up @@ -140,7 +141,7 @@ func TestCreateTracesExporter(t *testing.T) {
Compression: "unknown compression",
},
},
mustFail: true,
mustFailOnStart: true,
},
{
name: "CaCert",
Expand Down Expand Up @@ -169,7 +170,7 @@ func TestCreateTracesExporter(t *testing.T) {
},
},
},
mustFail: true,
mustFailOnStart: true,
},
}

Expand All @@ -178,19 +179,23 @@ func TestCreateTracesExporter(t *testing.T) {
factory := NewFactory()
creationParams := component.ExporterCreateParams{Logger: zap.NewNop()}
consumer, err := factory.CreateTracesExporter(context.Background(), creationParams, &tt.config)

if tt.mustFail {
if tt.mustFailOnCreate {
assert.NotNil(t, err)
} else {
assert.NoError(t, err)
assert.NotNil(t, consumer)

err = consumer.Shutdown(context.Background())
if err != nil {
// Since the endpoint of OTLP exporter doesn't actually exist,
// exporter may already stop because it cannot connect.
assert.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing")
}
return
}
assert.NoError(t, err)
assert.NotNil(t, consumer)
err = consumer.Start(context.Background(), componenttest.NewNopHost())
if tt.mustFailOnStart {
assert.Error(t, err)
return
}
assert.NoError(t, err)
err = consumer.Shutdown(context.Background())
if err != nil {
// Since the endpoint of OTLP exporter doesn't actually exist,
// exporter may already stop because it cannot connect.
assert.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing")
}
})
}
Expand Down
17 changes: 9 additions & 8 deletions exporter/otlpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
Expand All @@ -48,14 +49,14 @@ func newExporter(cfg config.Exporter) (*exporter, error) {
return nil, errors.New("OTLP exporter config requires an Endpoint")
}

e := &exporter{}
e.config = oCfg
w, err := newGrpcSender(oCfg)
if err != nil {
return nil, err
}
e.w = w
return e, nil
return &exporter{config: oCfg}, nil
}

// start actually creates the gRPC connection. The client construction is deferred till this point as this
// is the only place we get hold of Extensions which are required to construct auth round tripper.
func (e *exporter) start(_ context.Context, _ component.Host) (err error) {
e.w, err = newGrpcSender(e.config)
return
}

func (e *exporter) shutdown(context.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion exporter/otlpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,12 @@ func TestSendTraces(t *testing.T) {
exp, err := factory.CreateTracesExporter(context.Background(), creationParams, cfg)
require.NoError(t, err)
require.NotNil(t, exp)

defer func() {
assert.NoError(t, exp.Shutdown(context.Background()))
}()

host := componenttest.NewNopHost()

assert.NoError(t, exp.Start(context.Background(), host))

// Ensure that initially there is no data in the receiver.
Expand Down
3 changes: 3 additions & 0 deletions exporter/otlphttpexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func createTracesExporter(
cfg,
params.Logger,
oce.pushTraceData,
exporterhelper.WithStart(oce.start),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
Expand Down Expand Up @@ -119,6 +120,7 @@ func createMetricsExporter(
cfg,
params.Logger,
oce.pushMetricsData,
exporterhelper.WithStart(oce.start),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
Expand Down Expand Up @@ -146,6 +148,7 @@ func createLogsExporter(
cfg,
params.Logger,
oce.pushLogData,
exporterhelper.WithStart(oce.start),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
Expand Down
37 changes: 22 additions & 15 deletions exporter/otlphttpexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/confighttp"
Expand Down Expand Up @@ -62,9 +63,10 @@ func TestCreateTracesExporter(t *testing.T) {
endpoint := "http://" + testutil.GetAvailableLocalAddress(t)

tests := []struct {
name string
config Config
mustFail bool
name string
config Config
mustFailOnCreate bool
mustFailOnStart bool
}{
{
name: "NoEndpoint",
Expand All @@ -74,7 +76,7 @@ func TestCreateTracesExporter(t *testing.T) {
Endpoint: "",
},
},
mustFail: true,
mustFailOnCreate: true,
},
{
name: "UseSecure",
Expand Down Expand Up @@ -128,7 +130,8 @@ func TestCreateTracesExporter(t *testing.T) {
},
},
},
mustFail: true,
mustFailOnCreate: false,
mustFailOnStart: true,
},
}

Expand All @@ -138,18 +141,22 @@ func TestCreateTracesExporter(t *testing.T) {
creationParams := component.ExporterCreateParams{Logger: zap.NewNop()}
consumer, err := factory.CreateTracesExporter(context.Background(), creationParams, &tt.config)

if tt.mustFail {
if tt.mustFailOnCreate {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.NotNil(t, consumer)
return
}
assert.NoError(t, err)
assert.NotNil(t, consumer)
err = consumer.Start(context.Background(), componenttest.NewNopHost())
if tt.mustFailOnStart {
assert.Error(t, err)
}

err = consumer.Shutdown(context.Background())
if err != nil {
// Since the endpoint of OTLP exporter doesn't actually exist,
// exporter may already stop because it cannot connect.
assert.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing")
}
err = consumer.Shutdown(context.Background())
if err != nil {
// Since the endpoint of OTLP exporter doesn't actually exist,
// exporter may already stop because it cannot connect.
assert.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing")
}
})
}
Expand Down
29 changes: 18 additions & 11 deletions exporter/otlphttpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/protobuf/proto"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/consumer/consumererror"
Expand Down Expand Up @@ -65,24 +66,30 @@ func newExporter(cfg config.Exporter, logger *zap.Logger) (*exporter, error) {
}
}

client, err := oCfg.HTTPClientSettings.ToClient()
// client construction is deferred to start
return &exporter{
config: oCfg,
logger: logger,
}, nil
}

// start actually creates the HTTP client. The client construction is deferred till this point as this
// is the only place we get hold of Extensions which are required to construct auth round tripper.
func (e *exporter) start(_ context.Context, _ component.Host) error {
client, err := e.config.HTTPClientSettings.ToClient()
if err != nil {
return nil, err
return err
}

if oCfg.Compression != "" {
if strings.ToLower(oCfg.Compression) == configgrpc.CompressionGzip {
if e.config.Compression != "" {
if strings.ToLower(e.config.Compression) == configgrpc.CompressionGzip {
client.Transport = middleware.NewCompressRoundTripper(client.Transport)
} else {
return nil, fmt.Errorf("unsupported compression type %q", oCfg.Compression)
return fmt.Errorf("unsupported compression type %q", e.config.Compression)
}
}

return &exporter{
config: oCfg,
client: client,
logger: logger,
}, nil
e.client = client
return nil
}

func (e *exporter) pushTraceData(ctx context.Context, traces pdata.Traces) error {
Expand Down
18 changes: 14 additions & 4 deletions exporter/otlphttpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,15 @@ func TestCompressionOptions(t *testing.T) {
factory := NewFactory()
cfg := createExporterConfig(test.baseURL, factory.CreateDefaultConfig())
cfg.Compression = test.compression
exp, err := factory.CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
exp, _ := factory.CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
err := exp.Start(context.Background(), componenttest.NewNopHost())
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
if test.err {
assert.Error(t, err)
require.Error(t, err)
return
}
require.NoError(t, err)
startAndCleanup(t, exp)

td := testdata.GenerateTracesOneSpan()
assert.NoError(t, exp.ConsumeTraces(context.Background(), td))
Expand Down Expand Up @@ -451,6 +453,14 @@ func TestErrorResponses(t *testing.T) {
exp, err := createTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)

// start the exporter
err = exp.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})

// generate traces
traces := pdata.NewTraces()
err = exp.ConsumeTraces(context.Background(), traces)
assert.Error(t, err)
Expand Down
1 change: 1 addition & 0 deletions exporter/zipkinexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func createTracesExporter(
zc,
params.Logger,
ze.pushTraceData,
exporterhelper.WithStart(ze.start),
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithQueue(zc.QueueSettings),
Expand Down
Loading

0 comments on commit 09723e8

Please sign in to comment.