diff --git a/receiver/jaegerreceiver/factory.go b/receiver/jaegerreceiver/factory.go index 8b3f5f15823..4e0e725b7e7 100644 --- a/receiver/jaegerreceiver/factory.go +++ b/receiver/jaegerreceiver/factory.go @@ -111,9 +111,9 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver { // CreateTraceReceiver creates a trace receiver based on provided config. func (f *Factory) CreateTraceReceiver( ctx context.Context, - logger *zap.Logger, + params component.ReceiverCreateParams, cfg configmodels.Receiver, - nextConsumer consumer.TraceConsumerOld, + nextConsumer consumer.TraceConsumer, ) (component.TraceReceiver, error) { // Convert settings in the source config to Configuration struct @@ -130,6 +130,7 @@ func (f *Factory) CreateTraceReceiver( config := Configuration{} var grpcServerOptions []grpc.ServerOption + logger := params.Logger // Set ports if protoGRPC != nil && protoGRPC.IsEnabled() { @@ -213,14 +214,15 @@ func (f *Factory) CreateTraceReceiver( } // Create the receiver. - return New(rCfg.Name(), &config, nextConsumer, logger) + return New(rCfg.Name(), &config, nextConsumer, params) } // CreateMetricsReceiver creates a metrics receiver based on provided config. func (f *Factory) CreateMetricsReceiver( - logger *zap.Logger, - cfg configmodels.Receiver, - consumer consumer.MetricsConsumerOld, + _ context.Context, + _ component.ReceiverCreateParams, + _ configmodels.Receiver, + _ consumer.MetricsConsumer, ) (component.MetricsReceiver, error) { return nil, configerror.ErrDataTypeIsNotSupported } diff --git a/receiver/jaegerreceiver/factory_test.go b/receiver/jaegerreceiver/factory_test.go index 2c682fd4580..15046c80446 100644 --- a/receiver/jaegerreceiver/factory_test.go +++ b/receiver/jaegerreceiver/factory_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector/component" "github.com/open-telemetry/opentelemetry-collector/config" "github.com/open-telemetry/opentelemetry-collector/config/configcheck" "github.com/open-telemetry/opentelemetry-collector/config/configerror" @@ -48,11 +49,12 @@ func TestCreateReceiver(t *testing.T) { // have to enable at least one protocol for the jaeger receiver to be created cfg.(*Config).Protocols[protoGRPC], _ = defaultsForProtocol(protoGRPC) - tReceiver, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + tReceiver, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.NoError(t, err, "receiver creation failed") assert.NotNil(t, tReceiver, "receiver creation failed") - mReceiver, err := factory.CreateMetricsReceiver(zap.NewNop(), cfg, nil) + mReceiver, err := factory.CreateMetricsReceiver(context.Background(), params, cfg, nil) assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported) assert.Nil(t, mReceiver) } @@ -64,7 +66,8 @@ func TestCreateDefaultGRPCEndpoint(t *testing.T) { rCfg := cfg.(*Config) rCfg.Protocols[protoGRPC], _ = defaultsForProtocol(protoGRPC) - r, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + r, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.NoError(t, err, "unexpected error creating receiver") assert.Equal(t, 14250, r.(*jReceiver).config.CollectorGRPCPort, "grpc port should be default") @@ -77,14 +80,15 @@ func TestCreateTLSGPRCEndpoint(t *testing.T) { rCfg.Protocols[protoGRPC], _ = defaultsForProtocol(protoGRPC) rCfg.Protocols[protoGRPC].TLSCredentials = &receiver.TLSCredentials{} - _, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + _, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.Error(t, err, "tls-enabled receiver creation with no credentials must fail") rCfg.Protocols[protoGRPC].TLSCredentials = &receiver.TLSCredentials{ CertFile: "./testdata/certificate.pem", KeyFile: "./testdata/key.pem", } - _, err = factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + _, err = factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.NoError(t, err, "tls-enabled receiver creation failed") } @@ -94,7 +98,8 @@ func TestCreateInvalidHTTPEndpoint(t *testing.T) { rCfg := cfg.(*Config) rCfg.Protocols[protoThriftHTTP], _ = defaultsForProtocol(protoThriftHTTP) - r, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + r, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.NoError(t, err, "unexpected error creating receiver") assert.Equal(t, 14268, r.(*jReceiver).config.CollectorHTTPPort, "http port should be default") @@ -106,7 +111,8 @@ func TestCreateInvalidThriftBinaryEndpoint(t *testing.T) { rCfg := cfg.(*Config) rCfg.Protocols[protoThriftBinary], _ = defaultsForProtocol(protoThriftBinary) - r, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + r, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.NoError(t, err, "unexpected error creating receiver") assert.Equal(t, 6832, r.(*jReceiver).config.AgentBinaryThriftPort, "thrift port should be default") @@ -118,7 +124,8 @@ func TestCreateInvalidThriftCompactEndpoint(t *testing.T) { rCfg := cfg.(*Config) rCfg.Protocols[protoThriftCompact], _ = defaultsForProtocol(protoThriftCompact) - r, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + r, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.NoError(t, err, "unexpected error creating receiver") assert.Equal(t, 6831, r.(*jReceiver).config.AgentCompactThriftPort, "thrift port should be default") @@ -134,7 +141,8 @@ func TestDefaultAgentRemoteSamplingHTTPPort(t *testing.T) { rCfg.RemoteSampling = &RemoteSamplingConfig{ FetchEndpoint: endpoint, } - r, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + r, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.NoError(t, err, "create trace receiver should not error") assert.Equal(t, endpoint, r.(*jReceiver).config.RemoteSamplingEndpoint) @@ -151,7 +159,8 @@ func TestCreateNoPort(t *testing.T) { Endpoint: "localhost:", }, } - _, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + _, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.Error(t, err, "receiver creation with no port number must fail") } @@ -165,7 +174,8 @@ func TestCreateLargePort(t *testing.T) { Endpoint: "localhost:65536", }, } - _, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + _, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.Error(t, err, "receiver creation with too large port number must fail") } @@ -179,7 +189,8 @@ func TestCreateInvalidHost(t *testing.T) { Endpoint: "1234", }, } - _, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + _, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.Error(t, err, "receiver creation with bad hostname must fail") } @@ -190,7 +201,8 @@ func TestCreateNoProtocols(t *testing.T) { rCfg.Protocols = make(map[string]*receiver.SecureReceiverSettings) - _, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + _, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.Error(t, err, "receiver creation with no protocols must fail") } @@ -205,7 +217,8 @@ func TestThriftBinaryBadPort(t *testing.T) { }, } - _, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + _, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.Error(t, err, "receiver creation with a bad thrift binary port must fail") } @@ -220,7 +233,8 @@ func TestThriftCompactBadPort(t *testing.T) { }, } - _, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + _, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.Error(t, err, "receiver creation with a bad thrift compact port must fail") } @@ -238,7 +252,8 @@ func TestRemoteSamplingConfigPropagation(t *testing.T) { HostEndpoint: fmt.Sprintf("localhost:%d", hostPort), StrategyFile: strategyFile, } - r, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + r, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.NoError(t, err, "create trace receiver should not error") assert.Equal(t, endpoint, r.(*jReceiver).config.RemoteSamplingEndpoint) @@ -256,7 +271,8 @@ func TestRemoteSamplingFileRequiresGRPC(t *testing.T) { rCfg.RemoteSampling = &RemoteSamplingConfig{ StrategyFile: strategyFile, } - _, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + _, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.Error(t, err, "create trace receiver should error") } diff --git a/receiver/jaegerreceiver/jaeger_agent_test.go b/receiver/jaegerreceiver/jaeger_agent_test.go index 92ead691c42..6773f28c212 100644 --- a/receiver/jaegerreceiver/jaeger_agent_test.go +++ b/receiver/jaegerreceiver/jaeger_agent_test.go @@ -23,19 +23,15 @@ import ( "time" "contrib.go.opencensus.io/exporter/jaeger" - commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" - tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opencensus.io/trace" "go.uber.org/zap" "google.golang.org/grpc" + "github.com/open-telemetry/opentelemetry-collector/component" "github.com/open-telemetry/opentelemetry-collector/component/componenttest" - "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-collector/exporter/exportertest" - "github.com/open-telemetry/opentelemetry-collector/internal" "github.com/open-telemetry/opentelemetry-collector/testutils" ) @@ -55,7 +51,8 @@ func TestJaegerAgentUDP_ThriftCompact_InvalidPort(t *testing.T) { config := &Configuration{ AgentCompactThriftPort: int(port), } - jr, err := New(jaegerAgent, config, nil, zap.NewNop()) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + jr, err := New(jaegerAgent, config, nil, params) assert.NoError(t, err, "Failed to create new Jaeger Receiver") err = jr.Start(context.Background(), componenttest.NewNopHost()) @@ -81,7 +78,8 @@ func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) { config := &Configuration{ AgentBinaryThriftPort: int(port), } - jr, err := New(jaegerAgent, config, nil, zap.NewNop()) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + jr, err := New(jaegerAgent, config, nil, params) assert.NoError(t, err, "Failed to create new Jaeger Receiver") err = jr.(*jReceiver).startAgent(componenttest.NewNopHost()) @@ -102,7 +100,8 @@ func TestJaegerAgentUDP_ThriftBinary_InvalidPort(t *testing.T) { config := &Configuration{ AgentBinaryThriftPort: int(port), } - jr, err := New(jaegerAgent, config, nil, zap.NewNop()) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + jr, err := New(jaegerAgent, config, nil, params) assert.NoError(t, err, "Failed to create new Jaeger Receiver") err = jr.Start(context.Background(), componenttest.NewNopHost()) @@ -141,7 +140,8 @@ func TestJaegerHTTP(t *testing.T) { AgentHTTPPort: int(port), RemoteSamplingEndpoint: addr.String(), } - jr, err := New(jaegerAgent, config, nil, zap.NewNop()) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + jr, err := New(jaegerAgent, config, nil, params) assert.NoError(t, err, "Failed to create new Jaeger Receiver") defer jr.Shutdown(context.Background()) @@ -176,8 +176,9 @@ func TestJaegerHTTP(t *testing.T) { func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *Configuration) { // 1. Create the Jaeger receiver aka "server" - sink := new(exportertest.SinkTraceExporterOld) - jr, err := New(jaegerAgent, receiverConfig, sink, zap.NewNop()) + sink := new(exportertest.SinkTraceExporter) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + jr, err := New(jaegerAgent, receiverConfig, sink, params) assert.NoError(t, err, "Failed to create new Jaeger Receiver") defer jr.Shutdown(context.Background()) @@ -204,49 +205,7 @@ func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *Configu assert.NoError(t, err, "Failed to create the Jaeger OpenCensus exporter for the live application") // 3. Now finally send some spans - spandata := []*trace.SpanData{ - { - SpanContext: trace.SpanContext{ - TraceID: trace.TraceID{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x80}, - SpanID: trace.SpanID{0xAF, 0xAE, 0xAD, 0xAC, 0xAB, 0xAA, 0xA9, 0xA8}, - }, - ParentSpanID: trace.SpanID{0x1F, 0x1E, 0x1D, 0x1C, 0x1B, 0x1A, 0x19, 0x18}, - Name: "DBSearch", - StartTime: now, - EndTime: nowPlus10min, - Status: trace.Status{ - Code: trace.StatusCodeNotFound, - Message: "Stale indices", - }, - Links: []trace.Link{ - { - TraceID: trace.TraceID{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80}, - SpanID: trace.SpanID{0xCF, 0xCE, 0xCD, 0xCC, 0xCB, 0xCA, 0xC9, 0xC8}, - Type: trace.LinkTypeParent, - }, - }, - }, - { - SpanContext: trace.SpanContext{ - TraceID: trace.TraceID{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80}, - SpanID: trace.SpanID{0xCF, 0xCE, 0xCD, 0xCC, 0xCB, 0xCA, 0xC9, 0xC8}, - }, - Name: "ProxyFetch", - StartTime: nowPlus10min, - EndTime: nowPlus10min2sec, - Status: trace.Status{ - Code: trace.StatusCodeInternal, - Message: "Frontend crash", - }, - Links: []trace.Link{ - { - TraceID: trace.TraceID{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80}, - SpanID: trace.SpanID{0xAF, 0xAE, 0xAD, 0xAC, 0xAB, 0xAA, 0xA9, 0xA8}, - Type: trace.LinkTypeChild, - }, - }, - }, - } + spandata := traceFixture(now, nowPlus10min, nowPlus10min2sec) for _, sd := range spandata { jexp.ExportSpan(sd) @@ -261,86 +220,10 @@ func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *Configu <-time.After(60 * time.Millisecond) } - got := sink.AllTraces() - - want := []consumerdata.TraceData{ - { - Node: &commonpb.Node{ - ServiceInfo: &commonpb.ServiceInfo{Name: "issaTest"}, - LibraryInfo: &commonpb.LibraryInfo{}, - Identifier: &commonpb.ProcessIdentifier{}, - Attributes: map[string]string{ - "bool": "true", - "string": "yes", - "int64": "10000000", - }, - }, + gotTraces := sink.AllTraces() + assert.Equal(t, 1, len(gotTraces)) - Spans: []*tracepb.Span{ - { - TraceId: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x80}, - SpanId: []byte{0xAF, 0xAE, 0xAD, 0xAC, 0xAB, 0xAA, 0xA9, 0xA8}, - ParentSpanId: []byte{0x1F, 0x1E, 0x1D, 0x1C, 0x1B, 0x1A, 0x19, 0x18}, - Name: &tracepb.TruncatableString{Value: "DBSearch"}, - StartTime: internal.TimeToTimestamp(now), - EndTime: internal.TimeToTimestamp(nowPlus10min), - Status: &tracepb.Status{ - Code: trace.StatusCodeNotFound, - Message: "Stale indices", - }, - Attributes: &tracepb.Span_Attributes{ - AttributeMap: map[string]*tracepb.AttributeValue{ - "error": { - Value: &tracepb.AttributeValue_BoolValue{BoolValue: true}, - }, - }, - }, - Links: &tracepb.Span_Links{ - Link: []*tracepb.Span_Link{ - { - TraceId: []byte{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80}, - SpanId: []byte{0xCF, 0xCE, 0xCD, 0xCC, 0xCB, 0xCA, 0xC9, 0xC8}, - Type: tracepb.Span_Link_PARENT_LINKED_SPAN, - }, - }, - }, - }, - { - TraceId: []byte{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80}, - SpanId: []byte{0xCF, 0xCE, 0xCD, 0xCC, 0xCB, 0xCA, 0xC9, 0xC8}, - Name: &tracepb.TruncatableString{Value: "ProxyFetch"}, - StartTime: internal.TimeToTimestamp(nowPlus10min), - EndTime: internal.TimeToTimestamp(nowPlus10min2sec), - Status: &tracepb.Status{ - Code: trace.StatusCodeInternal, - Message: "Frontend crash", - }, - Attributes: &tracepb.Span_Attributes{ - AttributeMap: map[string]*tracepb.AttributeValue{ - "error": { - Value: &tracepb.AttributeValue_BoolValue{BoolValue: true}, - }, - }, - }, - Links: &tracepb.Span_Links{ - Link: []*tracepb.Span_Link{ - { - TraceId: []byte{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80}, - SpanId: []byte{0xAF, 0xAE, 0xAD, 0xAC, 0xAB, 0xAA, 0xA9, 0xA8}, - // TODO: (@pjanotti, @odeke-em) contact the Jaeger maintains to inquire about - // Parent_Linked_Spans as currently they've only got: - // * Child_of - // * Follows_from - // yet OpenCensus has Parent too but Jaeger uses a zero-value for LinkCHILD. - Type: tracepb.Span_Link_PARENT_LINKED_SPAN, - }, - }, - }, - }, - }, - SourceFormat: "jaeger", - }, - } + want := expectedTraceData(now, nowPlus10min, nowPlus10min2sec) - assert.EqualValues(t, want, got) + assert.EqualValues(t, want, gotTraces[0]) } diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index 6bac2bd3371..a1a5a5394b0 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -78,7 +78,7 @@ type jReceiver struct { // mu protects the fields of this type mu sync.Mutex - nextConsumer consumer.TraceConsumerOld + nextConsumer consumer.TraceConsumer instanceName string startOnce sync.Once @@ -119,8 +119,8 @@ const ( func New( instanceName string, config *Configuration, - nextConsumer consumer.TraceConsumerOld, - logger *zap.Logger, + nextConsumer consumer.TraceConsumer, + params component.ReceiverCreateParams, ) (component.TraceReceiver, error) { return &jReceiver{ config: config, @@ -128,7 +128,7 @@ func New( context.Background(), instanceName, agentTransport, agentReceiverTagValue), nextConsumer: nextConsumer, instanceName: instanceName, - logger: logger, + logger: params.Logger, }, nil } @@ -260,10 +260,10 @@ func (jr *jReceiver) stopTraceReceptionLocked() error { return err } -func consumeTraceData( +func consumeTraces( ctx context.Context, batches []*jaeger.Batch, - consumer consumer.TraceConsumerOld, + consumer consumer.TraceConsumer, ) ([]*jaeger.BatchSubmitResponse, int, error) { jbsr := make([]*jaeger.BatchSubmitResponse, 0, len(batches)) @@ -276,10 +276,8 @@ func consumeTraceData( continue } - // TODO: function below never returns error, change the signature. - td, _ := jaegertranslator.ThriftBatchToOCProto(batch) - td.SourceFormat = "jaeger" - consumerError = consumer.ConsumeTraceData(ctx, td) + td := jaegertranslator.ThriftBatchToInternalTraces(batch) + consumerError = consumer.ConsumeTraces(ctx, td) jsr := batchSubmitOkResponse if consumerError != nil { jsr = batchSubmitNotOkResponse @@ -296,7 +294,7 @@ func (jr *jReceiver) SubmitBatches(batches []*jaeger.Batch, options handler.Subm ctx = obsreport.StartTraceDataReceiveOp( ctx, jr.instanceName, collectorHTTPTransport) - jbsr, numSpans, err := consumeTraceData(ctx, batches, jr.nextConsumer) + jbsr, numSpans, err := consumeTraces(ctx, batches, jr.nextConsumer) obsreport.EndTraceDataReceiveOp(ctx, thriftFormat, numSpans, err) return jbsr, err @@ -317,11 +315,10 @@ func (jr *jReceiver) EmitZipkinBatch(spans []*zipkincore.Span) error { func (jr *jReceiver) EmitBatch(batch *jaeger.Batch) error { ctx := obsreport.StartTraceDataReceiveOp( jr.defaultAgentCtx, jr.instanceName, agentTransport) - // TODO: call below never returns error it remove from the signature - td, _ := jaegertranslator.ThriftBatchToOCProto(batch) - td.SourceFormat = "jaeger" - err := jr.nextConsumer.ConsumeTraceData(ctx, td) + td := jaegertranslator.ThriftBatchToInternalTraces(batch) + + err := jr.nextConsumer.ConsumeTraces(ctx, td) obsreport.EndTraceDataReceiveOp(ctx, thriftFormat, len(batch.Spans), err) return err @@ -351,11 +348,9 @@ func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) ctx, jr.instanceName, grpcTransport, collectorReceiverTagValue) ctx = obsreport.StartTraceDataReceiveOp(ctx, jr.instanceName, grpcTransport) - // TODO: the function below never returns error, change its interface. - td, _ := jaegertranslator.ProtoBatchToOCProto(r.GetBatch()) - td.SourceFormat = "jaeger" + td := jaegertranslator.ProtoBatchToInternalTraces(r.GetBatch()) - err := jr.nextConsumer.ConsumeTraceData(ctx, td) + err := jr.nextConsumer.ConsumeTraces(ctx, td) obsreport.EndTraceDataReceiveOp(ctx, protobufFormat, len(r.GetBatch().Spans), err) if err != nil { return nil, err diff --git a/receiver/jaegerreceiver/trace_receiver_test.go b/receiver/jaegerreceiver/trace_receiver_test.go index 46de3d34b23..11c223434dd 100644 --- a/receiver/jaegerreceiver/trace_receiver_test.go +++ b/receiver/jaegerreceiver/trace_receiver_test.go @@ -24,10 +24,9 @@ import ( "time" "contrib.go.opencensus.io/exporter/jaeger" - commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" - tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/proto-gen/api_v2" + otlptrace "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opencensus.io/trace" @@ -35,19 +34,21 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "github.com/open-telemetry/opentelemetry-collector/component" "github.com/open-telemetry/opentelemetry-collector/component/componenttest" - "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" + "github.com/open-telemetry/opentelemetry-collector/consumer/pdata" "github.com/open-telemetry/opentelemetry-collector/exporter/exportertest" - "github.com/open-telemetry/opentelemetry-collector/internal" "github.com/open-telemetry/opentelemetry-collector/receiver" "github.com/open-telemetry/opentelemetry-collector/testutils" + "github.com/open-telemetry/opentelemetry-collector/translator/conventions" tracetranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace" ) const jaegerReceiver = "jaeger_receiver_test" func TestTraceSource(t *testing.T) { - jr, err := New(jaegerReceiver, &Configuration{}, nil, zap.NewNop()) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + jr, err := New(jaegerReceiver, &Configuration{}, nil, params) assert.NoError(t, err, "should not have failed to create the Jaeger receiver") require.NotNil(t, jr) } @@ -57,9 +58,10 @@ func TestReception(t *testing.T) { config := &Configuration{ CollectorHTTPPort: 14268, // that's the only one used by this test } - sink := new(exportertest.SinkTraceExporterOld) + sink := new(exportertest.SinkTraceExporter) - jr, err := New(jaegerReceiver, config, sink, zap.NewNop()) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + jr, err := New(jaegerReceiver, config, sink, params) defer jr.Shutdown(context.Background()) assert.NoError(t, err, "should not have failed to create the Jaeger received") @@ -93,19 +95,21 @@ func TestReception(t *testing.T) { } jexp.Flush() - got := sink.AllTraces() + gotTraces := sink.AllTraces() + assert.Equal(t, 1, len(gotTraces)) want := expectedTraceData(now, nowPlus10min, nowPlus10min2sec) - assert.EqualValues(t, want, got) + assert.EqualValues(t, want, gotTraces[0]) } func TestPortsNotOpen(t *testing.T) { // an empty config should result in no open ports config := &Configuration{} - sink := new(exportertest.SinkTraceExporterOld) + sink := new(exportertest.SinkTraceExporter) - jr, err := New(jaegerReceiver, config, sink, zap.NewNop()) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + jr, err := New(jaegerReceiver, config, sink, params) assert.NoError(t, err, "should not have failed to create a new receiver") defer jr.Shutdown(context.Background()) @@ -132,9 +136,10 @@ func TestGRPCReception(t *testing.T) { config := &Configuration{ CollectorGRPCPort: 14250, // that's the only one used by this test } - sink := new(exportertest.SinkTraceExporterOld) + sink := new(exportertest.SinkTraceExporter) - jr, err := New(jaegerReceiver, config, sink, zap.NewNop()) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + jr, err := New(jaegerReceiver, config, sink, params) assert.NoError(t, err, "should not have failed to create a new receiver") defer jr.Shutdown(context.Background()) @@ -161,12 +166,13 @@ func TestGRPCReception(t *testing.T) { assert.NoError(t, err, "should not have failed to post spans") assert.NotNil(t, resp, "response should not have been nil") - got := sink.AllTraces() + gotTraces := sink.AllTraces() + assert.Equal(t, 1, len(gotTraces)) want := expectedTraceData(now, nowPlus10min, nowPlus10min2sec) - assert.Len(t, req.Batch.Spans, len(want[0].Spans), "got a conflicting amount of spans") + assert.Len(t, req.Batch.Spans, want.SpanCount(), "got a conflicting amount of spans") - assert.EqualValues(t, want, got) + assert.EqualValues(t, want, gotTraces[0]) } func TestGRPCReceptionWithTLS(t *testing.T) { @@ -186,9 +192,10 @@ func TestGRPCReceptionWithTLS(t *testing.T) { CollectorGRPCPort: int(port), CollectorGRPCOptions: grpcServerOptions, } - sink := new(exportertest.SinkTraceExporterOld) + sink := new(exportertest.SinkTraceExporter) - jr, err := New(jaegerReceiver, config, sink, zap.NewNop()) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + jr, err := New(jaegerReceiver, config, sink, params) assert.NoError(t, err, "should not have failed to create a new receiver") defer jr.Shutdown(context.Background()) @@ -217,81 +224,58 @@ func TestGRPCReceptionWithTLS(t *testing.T) { assert.NoError(t, err, "should not have failed to post spans") assert.NotNil(t, resp, "response should not have been nil") - got := sink.AllTraces() + gotTraces := sink.AllTraces() + assert.Equal(t, 1, len(gotTraces)) want := expectedTraceData(now, nowPlus10min, nowPlus10min2sec) - assert.Len(t, req.Batch.Spans, len(want[0].Spans), "got a conflicting amount of spans") - assert.EqualValues(t, want, got) + assert.Len(t, req.Batch.Spans, want.SpanCount(), "got a conflicting amount of spans") + assert.EqualValues(t, want, gotTraces[0]) } -func expectedTraceData(t1, t2, t3 time.Time) []consumerdata.TraceData { - traceID := []byte{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80} - parentSpanID := []byte{0x1F, 0x1E, 0x1D, 0x1C, 0x1B, 0x1A, 0x19, 0x18} - childSpanID := []byte{0xAF, 0xAE, 0xAD, 0xAC, 0xAB, 0xAA, 0xA9, 0xA8} - - return []consumerdata.TraceData{ - { - Node: &commonpb.Node{ - ServiceInfo: &commonpb.ServiceInfo{Name: "issaTest"}, - LibraryInfo: &commonpb.LibraryInfo{}, - Identifier: &commonpb.ProcessIdentifier{}, - Attributes: map[string]string{ - "bool": "true", - "string": "yes", - "int64": "10000000", - }, - }, - Spans: []*tracepb.Span{ - { - TraceId: traceID, - SpanId: childSpanID, - ParentSpanId: parentSpanID, - Name: &tracepb.TruncatableString{Value: "DBSearch"}, - StartTime: internal.TimeToTimestamp(t1), - EndTime: internal.TimeToTimestamp(t2), - Status: &tracepb.Status{ - Code: trace.StatusCodeNotFound, - Message: "Stale indices", - }, - Attributes: &tracepb.Span_Attributes{ - AttributeMap: map[string]*tracepb.AttributeValue{ - "error": { - Value: &tracepb.AttributeValue_BoolValue{BoolValue: true}, - }, - }, - }, - Links: &tracepb.Span_Links{ - Link: []*tracepb.Span_Link{ - { - TraceId: traceID, - SpanId: parentSpanID, - Type: tracepb.Span_Link_PARENT_LINKED_SPAN, - }, - }, - }, - }, - { - TraceId: traceID, - SpanId: parentSpanID, - Name: &tracepb.TruncatableString{Value: "ProxyFetch"}, - StartTime: internal.TimeToTimestamp(t2), - EndTime: internal.TimeToTimestamp(t3), - Status: &tracepb.Status{ - Code: trace.StatusCodeInternal, - Message: "Frontend crash", - }, - Attributes: &tracepb.Span_Attributes{ - AttributeMap: map[string]*tracepb.AttributeValue{ - "error": { - Value: &tracepb.AttributeValue_BoolValue{BoolValue: true}, - }, - }, - }, - }, - }, - SourceFormat: "jaeger", - }, - } +func expectedTraceData(t1, t2, t3 time.Time) pdata.Traces { + traceID := pdata.TraceID( + []byte{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80}) + parentSpanID := pdata.SpanID([]byte{0x1F, 0x1E, 0x1D, 0x1C, 0x1B, 0x1A, 0x19, 0x18}) + childSpanID := pdata.SpanID([]byte{0xAF, 0xAE, 0xAD, 0xAC, 0xAB, 0xAA, 0xA9, 0xA8}) + + traces := pdata.NewTraces() + traces.ResourceSpans().Resize(1) + rs := traces.ResourceSpans().At(0) + rs.Resource().InitEmpty() + rs.Resource().Attributes().InsertString(conventions.AttributeServiceName, "issaTest") + rs.Resource().Attributes().InsertBool("bool", true) + rs.Resource().Attributes().InsertString("string", "yes") + rs.Resource().Attributes().InsertInt("int64", 10000000) + rs.InstrumentationLibrarySpans().Resize(1) + rs.InstrumentationLibrarySpans().At(0).Spans().Resize(2) + + span0 := rs.InstrumentationLibrarySpans().At(0).Spans().At(0) + span0.SetSpanID(childSpanID) + span0.SetParentSpanID(parentSpanID) + span0.SetTraceID(traceID) + span0.SetName("DBSearch") + span0.SetStartTime(pdata.TimestampUnixNano(uint64(t1.UnixNano()))) + span0.SetEndTime(pdata.TimestampUnixNano(uint64(t2.UnixNano()))) + span0.Status().InitEmpty() + span0.Status().SetCode(pdata.StatusCode(otlptrace.Status_NotFound)) + span0.Status().SetMessage("Stale indices") + span0.Attributes().InsertBool("error", true) + span0.Links().Resize(1) + span0.Links().At(0).SetTraceID(traceID) + span0.Links().At(0).SetSpanID(parentSpanID) + + span1 := rs.InstrumentationLibrarySpans().At(0).Spans().At(1) + span1.SetSpanID(parentSpanID) + span1.SetTraceID(traceID) + span1.SetName("ProxyFetch") + span1.SetStartTime(pdata.TimestampUnixNano(uint64(t2.UnixNano()))) + span1.SetEndTime(pdata.TimestampUnixNano(uint64(t3.UnixNano()))) + span1.Status().InitEmpty() + span1.Status().SetCode(pdata.StatusCode(otlptrace.Status_InternalError)) + span1.Status().SetMessage("Frontend crash") + span1.Attributes().InsertBool("error", true) + + return traces } func traceFixture(t1, t2, t3 time.Time) []*trace.SpanData { @@ -396,9 +380,10 @@ func TestSampling(t *testing.T) { CollectorGRPCPort: int(port), RemoteSamplingStrategyFile: "testdata/strategies.json", } - sink := new(exportertest.SinkTraceExporterOld) + sink := new(exportertest.SinkTraceExporter) - jr, err := New(jaegerReceiver, config, sink, zap.NewNop()) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + jr, err := New(jaegerReceiver, config, sink, params) assert.NoError(t, err, "should not have failed to create a new receiver") defer jr.Shutdown(context.Background()) @@ -448,9 +433,10 @@ func TestSamplingFailsOnNotConfigured(t *testing.T) { config := &Configuration{ CollectorGRPCPort: int(port), } - sink := new(exportertest.SinkTraceExporterOld) + sink := new(exportertest.SinkTraceExporter) - jr, err := New(jaegerReceiver, config, sink, zap.NewNop()) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + jr, err := New(jaegerReceiver, config, sink, params) assert.NoError(t, err, "should not have failed to create a new receiver") defer jr.Shutdown(context.Background()) @@ -476,9 +462,10 @@ func TestSamplingFailsOnBadFile(t *testing.T) { CollectorGRPCPort: int(port), RemoteSamplingStrategyFile: "does-not-exist", } - sink := new(exportertest.SinkTraceExporterOld) + sink := new(exportertest.SinkTraceExporter) - jr, err := New(jaegerReceiver, config, sink, zap.NewNop()) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + jr, err := New(jaegerReceiver, config, sink, params) assert.NoError(t, err, "should not have failed to create a new receiver") defer jr.Shutdown(context.Background()) assert.Error(t, jr.Start(context.Background(), componenttest.NewNopHost())) diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index 25e8ea9b9ee..cb2c3e3d170 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -125,7 +125,8 @@ func (jr *JaegerDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsume CollectorGRPCPort: jr.Port, } var err error - jr.receiver, err = jaegerreceiver.New("jaeger", &jaegerCfg, tc, zap.NewNop()) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + jr.receiver, err = jaegerreceiver.New("jaeger", &jaegerCfg, tc, params) if err != nil { return err } diff --git a/translator/trace/jaeger/jaegerproto_to_traces.go b/translator/trace/jaeger/jaegerproto_to_traces.go new file mode 100644 index 00000000000..bbb620c5ac5 --- /dev/null +++ b/translator/trace/jaeger/jaegerproto_to_traces.go @@ -0,0 +1,272 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jaeger + +import ( + "encoding/base64" + "fmt" + "reflect" + "strconv" + + "github.com/jaegertracing/jaeger/model" + otlptrace "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1" + + "github.com/open-telemetry/opentelemetry-collector/consumer/pdata" + "github.com/open-telemetry/opentelemetry-collector/translator/conventions" + tracetranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace" +) + +func ProtoBatchToInternalTraces(batch model.Batch) pdata.Traces { + traceData := pdata.NewTraces() + jProcess := batch.GetProcess() + jSpans := batch.GetSpans() + + if jProcess == nil && len(jSpans) == 0 { + return traceData + } + + rss := traceData.ResourceSpans() + rss.Resize(1) + rs := rss.At(0) + jProcessToInternalResource(jProcess, rs.Resource()) + + if len(jSpans) == 0 { + return traceData + } + + ilss := rs.InstrumentationLibrarySpans() + ilss.Resize(1) + jSpansToInternal(jSpans, ilss.At(0).Spans()) + + return traceData +} + +func jProcessToInternalResource(process *model.Process, dest pdata.Resource) { + if process == nil { + return + } + + dest.InitEmpty() + + serviceName := process.GetServiceName() + tags := process.GetTags() + if serviceName == "" && tags == nil { + return + } + + attrs := dest.Attributes() + + if serviceName != "" { + attrs.UpsertString(conventions.AttributeServiceName, serviceName) + } + + for _, tag := range tags { + + // Handle special keys in the tags. + switch tag.GetKey() { + case "hostname": + attrs.UpsertString(conventions.AttributeHostHostname, tag.GetVStr()) + continue + case "jaeger.version": + attrs.UpsertString(conventions.OCAttributeExporterVersion, "Jaeger-"+tag.GetVStr()) + continue + } + + switch tag.GetVType() { + case model.ValueType_STRING: + attrs.UpsertString(tag.Key, tag.GetVStr()) + case model.ValueType_BOOL: + attrs.UpsertBool(tag.Key, tag.GetVBool()) + case model.ValueType_INT64: + attrs.UpsertInt(tag.Key, tag.GetVInt64()) + case model.ValueType_FLOAT64: + attrs.UpsertDouble(tag.Key, tag.GetVFloat64()) + case model.ValueType_BINARY: + attrs.UpsertString(tag.Key, base64.StdEncoding.EncodeToString(tag.GetVBinary())) + default: + attrs.UpsertString(tag.Key, fmt.Sprintf("", tag.GetVType())) + } + } +} + +func jSpansToInternal(spans []*model.Span, dest pdata.SpanSlice) { + if len(spans) == 0 { + return + } + + dest.Resize(len(spans)) + i := 0 + for _, span := range spans { + if span == nil || reflect.DeepEqual(span, blankJaegerProtoSpan) { + continue + } + jSpanToInternal(span, dest.At(i)) + i++ + } + + if i < len(spans) { + dest.Resize(i) + } +} + +func jSpanToInternal(span *model.Span, dest pdata.Span) { + dest.SetTraceID(pdata.TraceID(tracetranslator.UInt64ToByteTraceID(span.TraceID.High, span.TraceID.Low))) + dest.SetSpanID(pdata.SpanID(tracetranslator.UInt64ToByteSpanID(uint64(span.SpanID)))) + dest.SetName(span.OperationName) + dest.SetStartTime(pdata.TimestampUnixNano(uint64(span.StartTime.UnixNano()))) + dest.SetEndTime(pdata.TimestampUnixNano(uint64(span.StartTime.Add(span.Duration).UnixNano()))) + + parentSpanID := uint64(span.ParentSpanID()) + if parentSpanID != 0 { + dest.SetParentSpanID(pdata.SpanID(tracetranslator.UInt64ToByteSpanID(parentSpanID))) + } + + attrs := jTagsToInternalAttributes(span.Tags) + setInternalSpanStatus(attrs, dest.Status()) + if spanKindAttr, ok := attrs[tracetranslator.TagSpanKind]; ok { + dest.SetKind(jSpanKindToInternal(spanKindAttr.StringVal())) + } + dest.Attributes().InitFromMap(attrs) + + jLogsToSpanEvents(span.Logs, dest.Events()) + jReferencesToSpanLinks(span.References, dest.Links()) +} + +func jTagsToInternalAttributes(tags []model.KeyValue) map[string]pdata.AttributeValue { + attrs := make(map[string]pdata.AttributeValue) + + for _, tag := range tags { + switch tag.GetVType() { + case model.ValueType_STRING: + attrs[tag.Key] = pdata.NewAttributeValueString(tag.GetVStr()) + case model.ValueType_BOOL: + attrs[tag.Key] = pdata.NewAttributeValueBool(tag.GetVBool()) + case model.ValueType_INT64: + attrs[tag.Key] = pdata.NewAttributeValueInt(tag.GetVInt64()) + case model.ValueType_FLOAT64: + attrs[tag.Key] = pdata.NewAttributeValueDouble(tag.GetVFloat64()) + case model.ValueType_BINARY: + attrs[tag.Key] = pdata.NewAttributeValueString(base64.StdEncoding.EncodeToString(tag.GetVBinary())) + default: + attrs[tag.Key] = pdata.NewAttributeValueString(fmt.Sprintf("", tag.GetVType())) + } + } + + return attrs +} + +func setInternalSpanStatus(attrs map[string]pdata.AttributeValue, dest pdata.SpanStatus) { + dest.InitEmpty() + + var codeSet bool + if codeAttr, ok := attrs[tracetranslator.TagStatusCode]; ok { + code, err := getStatusCodeFromAttr(codeAttr) + if err == nil { + codeSet = true + dest.SetCode(code) + delete(attrs, tracetranslator.TagStatusCode) + } + } else if errorVal, ok := attrs[tracetranslator.TagError]; ok { + if errorVal.BoolVal() { + dest.SetCode(pdata.StatusCode(otlptrace.Status_UnknownError)) + codeSet = true + } + } + + if codeSet { + if msgAttr, ok := attrs[tracetranslator.TagStatusMsg]; ok { + dest.SetMessage(msgAttr.StringVal()) + delete(attrs, tracetranslator.TagStatusMsg) + } + } else { + codeAttr, ok := attrs[tracetranslator.TagHTTPStatusCode] + if ok { + code, err := getStatusCodeFromAttr(codeAttr) + if err == nil { + dest.SetCode(code) + } + } else { + dest.SetCode(pdata.StatusCode(otlptrace.Status_Ok)) + } + if msgAttr, ok := attrs[tracetranslator.TagHTTPStatusMsg]; ok { + dest.SetMessage(msgAttr.StringVal()) + } + } + +} + +func getStatusCodeFromAttr(attrVal pdata.AttributeValue) (pdata.StatusCode, error) { + switch attrVal.Type() { + case pdata.AttributeValueINT: + return pdata.StatusCode(attrVal.IntVal()), nil + case pdata.AttributeValueSTRING: + i, err := strconv.Atoi(attrVal.StringVal()) + if err != nil { + return pdata.StatusCode(0), err + } + return pdata.StatusCode(i), nil + } + return pdata.StatusCode(0), fmt.Errorf("invalid attribute type") +} + +func jSpanKindToInternal(spanKind string) pdata.SpanKind { + switch spanKind { + case "client": + return pdata.SpanKindCLIENT + case "server": + return pdata.SpanKindSERVER + case "producer": + return pdata.SpanKindPRODUCER + case "consumer": + return pdata.SpanKindCONSUMER + } + return pdata.SpanKindUNSPECIFIED +} + +func jLogsToSpanEvents(logs []model.Log, dest pdata.SpanEventSlice) { + if len(logs) == 0 { + return + } + + dest.Resize(len(logs)) + + for i, log := range logs { + event := dest.At(i) + + event.SetTimestamp(pdata.TimestampUnixNano(uint64(log.Timestamp.UnixNano()))) + + attrs := jTagsToInternalAttributes(log.Fields) + if name, ok := attrs["message"]; ok { + event.SetName(name.StringVal()) + } + if len(attrs) > 0 { + event.Attributes().InitFromMap(attrs) + } + } +} + +// jReferencesToSpanLinks translates span links and sets ParentID if found +func jReferencesToSpanLinks(refs []model.SpanRef, dest pdata.SpanLinkSlice) { + if len(refs) == 0 { + return + } + + dest.Resize(len(refs)) + for i, ref := range refs { + link := dest.At(i) + link.SetTraceID(pdata.NewTraceID(tracetranslator.UInt64ToByteTraceID(ref.TraceID.High, ref.TraceID.Low))) + link.SetSpanID(pdata.NewSpanID(tracetranslator.UInt64ToByteSpanID(uint64(ref.SpanID)))) + } +} diff --git a/translator/trace/jaeger/jaegerthrift_to_traces.go b/translator/trace/jaeger/jaegerthrift_to_traces.go new file mode 100644 index 00000000000..bed5a439643 --- /dev/null +++ b/translator/trace/jaeger/jaegerthrift_to_traces.go @@ -0,0 +1,205 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jaeger + +import ( + "encoding/base64" + "fmt" + "reflect" + + "github.com/jaegertracing/jaeger/thrift-gen/jaeger" + + "github.com/open-telemetry/opentelemetry-collector/consumer/pdata" + "github.com/open-telemetry/opentelemetry-collector/translator/conventions" + tracetranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace" +) + +func ThriftBatchToInternalTraces(batch *jaeger.Batch) pdata.Traces { + traceData := pdata.NewTraces() + jProcess := batch.GetProcess() + jSpans := batch.GetSpans() + + if jProcess == nil && len(jSpans) == 0 { + return traceData + } + + rss := traceData.ResourceSpans() + rss.Resize(1) + rs := rss.At(0) + jThriftProcessToInternalResource(jProcess, rs.Resource()) + + if len(jSpans) == 0 { + return traceData + } + + ilss := rs.InstrumentationLibrarySpans() + ilss.Resize(1) + jThriftSpansToInternal(jSpans, ilss.At(0).Spans()) + + return traceData +} + +func jThriftProcessToInternalResource(process *jaeger.Process, dest pdata.Resource) { + if process == nil { + return + } + + dest.InitEmpty() + + serviceName := process.GetServiceName() + tags := process.GetTags() + if serviceName == "" && tags == nil { + return + } + + attrs := dest.Attributes() + + if serviceName != "" { + attrs.UpsertString(conventions.AttributeServiceName, serviceName) + } + + for _, tag := range tags { + + // Handle special keys in the tags. + switch tag.GetKey() { + case "hostname": + attrs.UpsertString(conventions.AttributeHostHostname, tag.GetVStr()) + continue + case "jaeger.version": + attrs.UpsertString(conventions.OCAttributeExporterVersion, "Jaeger-"+tag.GetVStr()) + continue + } + + switch tag.GetVType() { + case jaeger.TagType_STRING: + attrs.UpsertString(tag.Key, tag.GetVStr()) + case jaeger.TagType_BOOL: + attrs.UpsertBool(tag.Key, tag.GetVBool()) + case jaeger.TagType_LONG: + attrs.UpsertInt(tag.Key, tag.GetVLong()) + case jaeger.TagType_DOUBLE: + attrs.UpsertDouble(tag.Key, tag.GetVDouble()) + case jaeger.TagType_BINARY: + attrs.UpsertString(tag.Key, base64.StdEncoding.EncodeToString(tag.GetVBinary())) + default: + attrs.UpsertString(tag.Key, fmt.Sprintf("", tag.GetVType())) + } + } +} + +func jThriftSpansToInternal(spans []*jaeger.Span, dest pdata.SpanSlice) { + if len(spans) == 0 { + return + } + + dest.Resize(len(spans)) + i := 0 + for _, span := range spans { + if span == nil || reflect.DeepEqual(span, blankJaegerProtoSpan) { + continue + } + jThriftSpanToInternal(span, dest.At(i)) + i++ + } + + if i < len(spans) { + dest.Resize(i) + } +} + +func jThriftSpanToInternal(span *jaeger.Span, dest pdata.Span) { + dest.SetTraceID(pdata.TraceID(tracetranslator.Int64ToByteTraceID(span.TraceIdHigh, span.TraceIdLow))) + dest.SetSpanID(pdata.SpanID(tracetranslator.Int64ToByteSpanID(span.SpanId))) + dest.SetName(span.OperationName) + dest.SetStartTime(msToUnixNano(span.StartTime)) + dest.SetEndTime(msToUnixNano(span.StartTime + span.Duration)) + + parentSpanID := uint64(span.ParentSpanId) + if parentSpanID != 0 { + dest.SetParentSpanID(pdata.SpanID(tracetranslator.UInt64ToByteSpanID(parentSpanID))) + } + + attrs := jThriftTagsToInternalAttributes(span.Tags) + setInternalSpanStatus(attrs, dest.Status()) + if spanKindAttr, ok := attrs[tracetranslator.TagSpanKind]; ok { + dest.SetKind(jSpanKindToInternal(spanKindAttr.StringVal())) + } + dest.Attributes().InitFromMap(attrs) + + jThriftLogsToSpanEvents(span.Logs, dest.Events()) + jThriftReferencesToSpanLinks(span.References, dest.Links()) +} + +func jThriftTagsToInternalAttributes(tags []*jaeger.Tag) map[string]pdata.AttributeValue { + attrs := make(map[string]pdata.AttributeValue) + + for _, tag := range tags { + switch tag.GetVType() { + case jaeger.TagType_STRING: + attrs[tag.Key] = pdata.NewAttributeValueString(tag.GetVStr()) + case jaeger.TagType_BOOL: + attrs[tag.Key] = pdata.NewAttributeValueBool(tag.GetVBool()) + case jaeger.TagType_LONG: + attrs[tag.Key] = pdata.NewAttributeValueInt(tag.GetVLong()) + case jaeger.TagType_DOUBLE: + attrs[tag.Key] = pdata.NewAttributeValueDouble(tag.GetVDouble()) + case jaeger.TagType_BINARY: + attrs[tag.Key] = pdata.NewAttributeValueString(base64.StdEncoding.EncodeToString(tag.GetVBinary())) + default: + attrs[tag.Key] = pdata.NewAttributeValueString(fmt.Sprintf("", tag.GetVType())) + } + } + + return attrs +} + +func jThriftLogsToSpanEvents(logs []*jaeger.Log, dest pdata.SpanEventSlice) { + if len(logs) == 0 { + return + } + + dest.Resize(len(logs)) + + for i, log := range logs { + event := dest.At(i) + + event.SetTimestamp(msToUnixNano(log.Timestamp)) + + attrs := jThriftTagsToInternalAttributes(log.Fields) + if name, ok := attrs["message"]; ok { + event.SetName(name.StringVal()) + } + if len(attrs) > 0 { + event.Attributes().InitFromMap(attrs) + } + } +} + +func jThriftReferencesToSpanLinks(refs []*jaeger.SpanRef, dest pdata.SpanLinkSlice) { + if len(refs) == 0 { + return + } + + dest.Resize(len(refs)) + for i, ref := range refs { + link := dest.At(i) + link.SetTraceID(pdata.NewTraceID(tracetranslator.Int64ToByteTraceID(ref.TraceIdHigh, ref.TraceIdLow))) + link.SetSpanID(pdata.NewSpanID(tracetranslator.Int64ToByteSpanID(ref.SpanId))) + } +} + +func msToUnixNano(ms int64) pdata.TimestampUnixNano { + return pdata.TimestampUnixNano(uint64(ms) * 1000) +}