From 6640ea1276d571ca6f1d8a831db16b1330c498a8 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 22 Nov 2019 13:21:20 -0500 Subject: [PATCH 01/29] Added agent protocol config wiring. Removed agent http server Signed-off-by: Joe Elliott --- receiver/jaegerreceiver/factory.go | 28 ++++++++++++++++++++--- receiver/jaegerreceiver/trace_receiver.go | 17 -------------- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/receiver/jaegerreceiver/factory.go b/receiver/jaegerreceiver/factory.go index 29fb717c2d9..099ff65f75d 100644 --- a/receiver/jaegerreceiver/factory.go +++ b/receiver/jaegerreceiver/factory.go @@ -41,6 +41,8 @@ const ( // TODO https://github.com/open-telemetry/opentelemetry-collector/issues/267 // Remove ThriftTChannel support. protoThriftTChannel = "thrift-tchannel" + protoThriftBinary = "thrift-binary" + protoThriftCompact = "thrift-compact" // Default endpoints to bind to. defaultGRPCBindEndpoint = "localhost:14250" @@ -103,6 +105,8 @@ func (f *Factory) CreateTraceReceiver( protoGRPC := rCfg.Protocols[protoGRPC] protoHTTP := rCfg.Protocols[protoThriftHTTP] protoTChannel := rCfg.Protocols[protoThriftTChannel] + protoThriftCompact := rCfg.Protocols[protoThriftCompact] + protoThriftBinary := rCfg.Protocols[protoThriftBinary] config := Configuration{} var grpcServerOptions []grpc.ServerOption @@ -141,12 +145,30 @@ func (f *Factory) CreateTraceReceiver( } } - if (protoGRPC == nil && protoHTTP == nil && protoTChannel == nil) || - (config.CollectorGRPCPort == 0 && config.CollectorHTTPPort == 0 && config.CollectorThriftPort == 0) { - err := fmt.Errorf("either %v, %v, or %v protocol endpoint with non-zero port must be enabled for %s receiver", + if protoThriftBinary != nil && protoThriftBinary.IsEnabled() { + var err error + config.AgentBinaryThriftPort, err = extractPortFromEndpoint(protoThriftBinary.Endpoint) + if err != nil { + return nil, err + } + } + + if protoThriftCompact != nil && protoThriftCompact.IsEnabled() { + var err error + config.AgentCompactThriftPort, err = extractPortFromEndpoint(protoThriftCompact.Endpoint) + if err != nil { + return nil, err + } + } + + if (protoGRPC == nil && protoHTTP == nil && protoTChannel == nil && protoThriftBinary == nil && protoThriftCompact == nil) || + (config.CollectorGRPCPort == 0 && config.CollectorHTTPPort == 0 && config.CollectorThriftPort == 0 && config.AgentBinaryThriftPort == 0 && config.AgentCompactThriftPort == 0) { + err := fmt.Errorf("either %v, %v, %v, %v, or %v protocol endpoint with non-zero port must be enabled for %s receiver", protoGRPC, protoThriftHTTP, protoThriftTChannel, + protoThriftCompact, + protoThriftBinary, typeStr, ) return nil, err diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index 249995199a2..9ad27a358ad 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -54,7 +54,6 @@ type Configuration struct { CollectorGRPCPort int CollectorGRPCOptions []grpc.ServerOption - AgentPort int AgentCompactThriftPort int AgentBinaryThriftPort int } @@ -131,19 +130,6 @@ func (jr *jReceiver) collectorAddr() string { return fmt.Sprintf(":%d", port) } -const defaultAgentPort = 5778 - -func (jr *jReceiver) agentAddress() string { - var port int - if jr.config != nil { - port = jr.config.AgentPort - } - if port <= 0 { - port = defaultAgentPort - } - return fmt.Sprintf(":%d", port) -} - // TODO https://github.com/open-telemetry/opentelemetry-collector/issues/267 // Remove ThriftTChannel support. func (jr *jReceiver) tchannelAddr() string { @@ -383,9 +369,6 @@ func (jr *jReceiver) startAgent(_ receiver.Host) error { builder := agentapp.Builder{ Processors: processorConfigs, - HTTPServer: agentapp.HTTPServerConfiguration{ - HostPort: jr.agentAddress(), - }, } agent, err := builder.CreateAgent(jr, zap.NewNop(), metrics.NullFactory) From 46dbf3574d88a8f1ebbdd921a0174e720f483e8e Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 22 Nov 2019 13:29:44 -0500 Subject: [PATCH 02/29] Conditionally start agent protocols based on config Signed-off-by: Joe Elliott --- receiver/jaegerreceiver/trace_receiver.go | 48 +++++++++++------------ 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index 9ad27a358ad..76aaf2c4997 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -95,13 +95,6 @@ const ( // By default, can accept spans directly from clients in jaeger.thrift format over binary thrift protocol defaultCollectorHTTPPort = 14268 - // As per https://www.jaegertracing.io/docs/1.7/deployment/#agent - // 5775 UDP accept zipkin.thrift over compact thrift protocol - // 6831 UDP accept jaeger.thrift over compact thrift protocol - // 6832 UDP accept jaeger.thrift over binary thrift protocol - defaultCompactThriftUDPPort = 6831 - defaultBinaryThriftUDPPort = 6832 - traceSource string = "Jaeger" ) @@ -159,23 +152,25 @@ func (jr *jReceiver) agentCompactThriftAddr() string { if jr.config != nil { port = jr.config.AgentCompactThriftPort } - if port <= 0 { - port = defaultCompactThriftUDPPort - } return fmt.Sprintf(":%d", port) } +func (jr *jReceiver) agentCompactThriftEnabled() bool { + return jr.config != nil && jr.config.AgentCompactThriftPort > 0 +} + func (jr *jReceiver) agentBinaryThriftAddr() string { var port int if jr.config != nil { port = jr.config.AgentBinaryThriftPort } - if port <= 0 { - port = defaultBinaryThriftUDPPort - } return fmt.Sprintf(":%d", port) } +func (jr *jReceiver) agentBinaryThriftEnabled() bool { + return jr.config != nil && jr.config.AgentBinaryThriftPort > 0 +} + func (jr *jReceiver) TraceSource() string { return traceSource } @@ -348,23 +343,28 @@ func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) } func (jr *jReceiver) startAgent(_ receiver.Host) error { - processorConfigs := []agentapp.ProcessorConfiguration{ - { - // Compact Thrift running by default on 6831. - Model: "jaeger", - Protocol: "compact", - Server: agentapp.ServerConfiguration{ - HostPort: jr.agentCompactThriftAddr(), - }, - }, - { + processorConfigs := []agentapp.ProcessorConfiguration{} + + if jr.agentBinaryThriftEnabled() { + processorConfigs = append(processorConfigs, agentapp.ProcessorConfiguration{ // Binary Thrift running by default on 6832. Model: "jaeger", Protocol: "binary", Server: agentapp.ServerConfiguration{ HostPort: jr.agentBinaryThriftAddr(), }, - }, + }) + } + + if jr.agentCompactThriftEnabled() { + processorConfigs = append(processorConfigs, agentapp.ProcessorConfiguration{ + // Compact Thrift running by default on 6831. + Model: "jaeger", + Protocol: "compact", + Server: agentapp.ServerConfiguration{ + HostPort: jr.agentCompactThriftAddr(), + }, + }) } builder := agentapp.Builder{ From c7053023fbab23d6a68059fe2fe428a1a3f06b45 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 22 Nov 2019 13:31:14 -0500 Subject: [PATCH 03/29] Bail out of startAgent if nothing is configured Signed-off-by: Joe Elliott --- receiver/jaegerreceiver/trace_receiver.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index 76aaf2c4997..70560d52ba9 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -343,6 +343,10 @@ func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) } func (jr *jReceiver) startAgent(_ receiver.Host) error { + if !jr.agentBinaryThriftEnabled() && !jr.agentCompactThriftEnabled() { + return nil + } + processorConfigs := []agentapp.ProcessorConfiguration{} if jr.agentBinaryThriftEnabled() { From fd4799f00c6bde2682ccd45af99a9806d3ad8359 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 22 Nov 2019 13:49:44 -0500 Subject: [PATCH 04/29] Updated readme Signed-off-by: Joe Elliott --- README.md | 4 ++++ receiver/README.md | 8 +++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 5fbf33a28ff..019ac4c6d72 100644 --- a/README.md +++ b/README.md @@ -167,6 +167,10 @@ receivers: endpoint: "localhost:14268" thrift-tchannel: endpoint: "localhost:14267" + thrift-compact: + endpoint: "localhost:6831" + thrift-binary: + endpoint: "localhost:6832" prometheus: config: diff --git a/receiver/README.md b/receiver/README.md index 7ed685da0db..df3dbf4f65f 100644 --- a/receiver/README.md +++ b/receiver/README.md @@ -107,7 +107,7 @@ This receiver receives traces in the [Jaeger](https://www.jaegertracing.io) format. It translates them into the internal format and sends it to processors and exporters. -It supports multiple protocols: +It supports the Jaeger Collector protocols: - Thrift HTTP - Thrift TChannel - gRPC @@ -124,6 +124,12 @@ It is possible to configure the protocols on different ports, refer to [config.yaml](jaegerreceiver/testdata/config.yaml) for detailed config examples. +It also supports the Jaeger Agent protocols: +- Thrift Compact +- Thrift Binary + +By default, these services are not started unless an endpoint is explicitly defined. + // TODO Issue https://github.com/open-telemetry/opentelemetry-collector/issues/158 // The Jaeger receiver enables all protocols even when one is specified or a // subset is enabled. The documentation should be updated when that fix occurs. From b131c9bd479f72221529ba1b4019a608d9e7323f Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Sat, 23 Nov 2019 10:49:54 -0500 Subject: [PATCH 05/29] Added tests Signed-off-by: Joe Elliott --- .../jaegerreceiver/trace_receiver_test.go | 108 ++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/receiver/jaegerreceiver/trace_receiver_test.go b/receiver/jaegerreceiver/trace_receiver_test.go index 910236b79b4..cadf76e6cdb 100644 --- a/receiver/jaegerreceiver/trace_receiver_test.go +++ b/receiver/jaegerreceiver/trace_receiver_test.go @@ -196,6 +196,114 @@ func TestGRPCReceptionWithTLS(t *testing.T) { assert.Equal(t, "", cmp.Diff(got, want)) } +func TestThriftCompactReception(t *testing.T) { + // 1. Create the Jaeger receiver aka "server" + config := &Configuration{ + AgentCompactThriftPort: 6831, // that's the only one used by this test + } + sink := new(exportertest.SinkTraceExporter) + + jr, err := New(context.Background(), config, sink) + defer jr.StopTraceReception() + assert.NoError(t, err, "should not have failed to create the Jaeger received") + + t.Log("Starting") + + mh := receivertest.NewMockHost() + err = jr.StartTraceReception(mh) + assert.NoError(t, err, "should not have failed to start trace reception") + + t.Log("StartTraceReception") + + now := time.Unix(1542158650, 536343000).UTC() + nowPlus10min := now.Add(10 * time.Minute) + nowPlus10min2sec := now.Add(10 * time.Minute).Add(2 * time.Second) + + // 2. Then with a "live application", send spans to the Jaeger exporter. + jexp, err := jaeger.NewExporter(jaeger.Options{ + Process: jaeger.Process{ + ServiceName: "issaTest", + Tags: []jaeger.Tag{ + jaeger.BoolTag("bool", true), + jaeger.StringTag("string", "yes"), + jaeger.Int64Tag("int64", 1e7), + }, + }, + AgentEndpoint: fmt.Sprintf("localhost:%d", config.AgentCompactThriftPort), + }) + assert.NoError(t, err, "should not have failed to create the Jaeger OpenCensus exporter") + + // 3. Now finally send some spans + for _, sd := range traceFixture(now, nowPlus10min, nowPlus10min2sec) { + jexp.ExportSpan(sd) + } + jexp.Flush() + + // sleep for one second to allow UDP to send. + time.Sleep(1 * time.Second) + + got := sink.AllTraces() + want := expectedTraceData(now, nowPlus10min, nowPlus10min2sec) + + if diff := cmp.Diff(got, want); diff != "" { + t.Errorf("Mismatched responses\n-Got +Want:\n\t%s", diff) + } +} + +func TestThriftBinaryReception(t *testing.T) { + // 1. Create the Jaeger receiver aka "server" + config := &Configuration{ + AgentBinaryThriftPort: 6832, // that's the only one used by this test + } + sink := new(exportertest.SinkTraceExporter) + + jr, err := New(context.Background(), config, sink) + defer jr.StopTraceReception() + assert.NoError(t, err, "should not have failed to create the Jaeger received") + + t.Log("Starting") + + mh := receivertest.NewMockHost() + err = jr.StartTraceReception(mh) + assert.NoError(t, err, "should not have failed to start trace reception") + + t.Log("StartTraceReception") + + now := time.Unix(1542158650, 536343000).UTC() + nowPlus10min := now.Add(10 * time.Minute) + nowPlus10min2sec := now.Add(10 * time.Minute).Add(2 * time.Second) + + // 2. Then with a "live application", send spans to the Jaeger exporter. + jexp, err := jaeger.NewExporter(jaeger.Options{ + Process: jaeger.Process{ + ServiceName: "issaTest", + Tags: []jaeger.Tag{ + jaeger.BoolTag("bool", true), + jaeger.StringTag("string", "yes"), + jaeger.Int64Tag("int64", 1e7), + }, + }, + AgentEndpoint: fmt.Sprintf("localhost:%d", config.AgentBinaryThriftPort), + }) + assert.NoError(t, err, "should not have failed to create the Jaeger OpenCensus exporter") + + // 3. Now finally send some spans + for _, sd := range traceFixture(now, nowPlus10min, nowPlus10min2sec) { + jexp.ExportSpan(sd) + } + jexp.Flush() + + // sleep for one second to allow UDP to send. + time.Sleep(1 * time.Second) + + got := sink.AllTraces() + want := expectedTraceData(now, nowPlus10min, nowPlus10min2sec) + + if diff := cmp.Diff(got, want); diff != "" { + t.Errorf("Mismatched responses\n-Got +Want:\n\t%s", diff) + } +} + func expectedTraceData(t1, t2, t3 time.Time) []consumerdata.TraceData { traceID := []byte{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80} parentSpanID := []byte{0x1F, 0x1E, 0x1D, 0x1C, 0x1B, 0x1A, 0x19, 0x18} From 88a60d0d6b3380c6d1efd095009e79e3bcb112e5 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Sat, 23 Nov 2019 11:14:20 -0500 Subject: [PATCH 06/29] Removed binary test because opencensus jaeger exporter doesn't support it Signed-off-by: Joe Elliott --- .../jaegerreceiver/trace_receiver_test.go | 54 ------------------- 1 file changed, 54 deletions(-) diff --git a/receiver/jaegerreceiver/trace_receiver_test.go b/receiver/jaegerreceiver/trace_receiver_test.go index cadf76e6cdb..f2be11f990f 100644 --- a/receiver/jaegerreceiver/trace_receiver_test.go +++ b/receiver/jaegerreceiver/trace_receiver_test.go @@ -250,60 +250,6 @@ func TestThriftCompactReception(t *testing.T) { } } -func TestThriftBinaryReception(t *testing.T) { - // 1. Create the Jaeger receiver aka "server" - config := &Configuration{ - AgentBinaryThriftPort: 6832, // that's the only one used by this test - } - sink := new(exportertest.SinkTraceExporter) - - jr, err := New(context.Background(), config, sink) - defer jr.StopTraceReception() - assert.NoError(t, err, "should not have failed to create the Jaeger received") - - t.Log("Starting") - - mh := receivertest.NewMockHost() - err = jr.StartTraceReception(mh) - assert.NoError(t, err, "should not have failed to start trace reception") - - t.Log("StartTraceReception") - - now := time.Unix(1542158650, 536343000).UTC() - nowPlus10min := now.Add(10 * time.Minute) - nowPlus10min2sec := now.Add(10 * time.Minute).Add(2 * time.Second) - - // 2. Then with a "live application", send spans to the Jaeger exporter. - jexp, err := jaeger.NewExporter(jaeger.Options{ - Process: jaeger.Process{ - ServiceName: "issaTest", - Tags: []jaeger.Tag{ - jaeger.BoolTag("bool", true), - jaeger.StringTag("string", "yes"), - jaeger.Int64Tag("int64", 1e7), - }, - }, - AgentEndpoint: fmt.Sprintf("localhost:%d", config.AgentBinaryThriftPort), - }) - assert.NoError(t, err, "should not have failed to create the Jaeger OpenCensus exporter") - - // 3. Now finally send some spans - for _, sd := range traceFixture(now, nowPlus10min, nowPlus10min2sec) { - jexp.ExportSpan(sd) - } - jexp.Flush() - - // sleep for one second to allow UDP to send. - time.Sleep(1 * time.Second) - - got := sink.AllTraces() - want := expectedTraceData(now, nowPlus10min, nowPlus10min2sec) - - if diff := cmp.Diff(got, want); diff != "" { - t.Errorf("Mismatched responses\n-Got +Want:\n\t%s", diff) - } -} - func expectedTraceData(t1, t2, t3 time.Time) []consumerdata.TraceData { traceID := []byte{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80} parentSpanID := []byte{0x1F, 0x1E, 0x1D, 0x1C, 0x1B, 0x1A, 0x19, 0x18} From c030827753a151296202e24d561b9965acc10830 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Sat, 23 Nov 2019 13:05:49 -0500 Subject: [PATCH 07/29] Corrected test to expect jaeger format and removed redundant test Signed-off-by: Joe Elliott --- receiver/jaegerreceiver/jaeger_agent_test.go | 1 + receiver/jaegerreceiver/trace_receiver.go | 1 + .../jaegerreceiver/trace_receiver_test.go | 54 ------------------- 3 files changed, 2 insertions(+), 54 deletions(-) diff --git a/receiver/jaegerreceiver/jaeger_agent_test.go b/receiver/jaegerreceiver/jaeger_agent_test.go index 821b36be9bf..93bf3e0aab1 100644 --- a/receiver/jaegerreceiver/jaeger_agent_test.go +++ b/receiver/jaegerreceiver/jaeger_agent_test.go @@ -220,6 +220,7 @@ func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *Configu }, }, }, + SourceFormat: "jaeger", }, } diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index 70560d52ba9..7720a10367f 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -296,6 +296,7 @@ func (jr *jReceiver) EmitZipkinBatch(spans []*zipkincore.Span) error { // Jaeger spans received by the Jaeger agent processor. func (jr *jReceiver) EmitBatch(batch *jaeger.Batch) error { td, err := jaegertranslator.ThriftBatchToOCProto(batch) + td.SourceFormat = "jaeger" if err != nil { observability.RecordMetricsForTraceReceiver(jr.defaultAgentCtx, len(batch.Spans), len(batch.Spans)) return err diff --git a/receiver/jaegerreceiver/trace_receiver_test.go b/receiver/jaegerreceiver/trace_receiver_test.go index f2be11f990f..910236b79b4 100644 --- a/receiver/jaegerreceiver/trace_receiver_test.go +++ b/receiver/jaegerreceiver/trace_receiver_test.go @@ -196,60 +196,6 @@ func TestGRPCReceptionWithTLS(t *testing.T) { assert.Equal(t, "", cmp.Diff(got, want)) } -func TestThriftCompactReception(t *testing.T) { - // 1. Create the Jaeger receiver aka "server" - config := &Configuration{ - AgentCompactThriftPort: 6831, // that's the only one used by this test - } - sink := new(exportertest.SinkTraceExporter) - - jr, err := New(context.Background(), config, sink) - defer jr.StopTraceReception() - assert.NoError(t, err, "should not have failed to create the Jaeger received") - - t.Log("Starting") - - mh := receivertest.NewMockHost() - err = jr.StartTraceReception(mh) - assert.NoError(t, err, "should not have failed to start trace reception") - - t.Log("StartTraceReception") - - now := time.Unix(1542158650, 536343000).UTC() - nowPlus10min := now.Add(10 * time.Minute) - nowPlus10min2sec := now.Add(10 * time.Minute).Add(2 * time.Second) - - // 2. Then with a "live application", send spans to the Jaeger exporter. - jexp, err := jaeger.NewExporter(jaeger.Options{ - Process: jaeger.Process{ - ServiceName: "issaTest", - Tags: []jaeger.Tag{ - jaeger.BoolTag("bool", true), - jaeger.StringTag("string", "yes"), - jaeger.Int64Tag("int64", 1e7), - }, - }, - AgentEndpoint: fmt.Sprintf("localhost:%d", config.AgentCompactThriftPort), - }) - assert.NoError(t, err, "should not have failed to create the Jaeger OpenCensus exporter") - - // 3. Now finally send some spans - for _, sd := range traceFixture(now, nowPlus10min, nowPlus10min2sec) { - jexp.ExportSpan(sd) - } - jexp.Flush() - - // sleep for one second to allow UDP to send. - time.Sleep(1 * time.Second) - - got := sink.AllTraces() - want := expectedTraceData(now, nowPlus10min, nowPlus10min2sec) - - if diff := cmp.Diff(got, want); diff != "" { - t.Errorf("Mismatched responses\n-Got +Want:\n\t%s", diff) - } -} - func expectedTraceData(t1, t2, t3 time.Time) []consumerdata.TraceData { traceID := []byte{0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, 0xF8, 0xF9, 0xFA, 0xFB, 0xFC, 0xFD, 0xFE, 0xFF, 0x80} parentSpanID := []byte{0x1F, 0x1E, 0x1D, 0x1C, 0x1B, 0x1A, 0x19, 0x18} From 5b284b7883415c89aa940b2cf5808ed25cb38242 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 25 Nov 2019 08:34:22 -0500 Subject: [PATCH 08/29] Added independently configurable agent processors Signed-off-by: Joe Elliott --- go.sum | 1 + receiver/jaegerreceiver/trace_receiver.go | 85 +++++++++++++---------- 2 files changed, 50 insertions(+), 36 deletions(-) diff --git a/go.sum b/go.sum index ca85bce092c..0e7837615db 100644 --- a/go.sum +++ b/go.sum @@ -48,6 +48,7 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7 h1:Fv9bK1Q+ly/ROk4aJsVMeuIwPel4bEnD8EPiI91nZMg= github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= +github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I= diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index 7720a10367f..1e22dd4173b 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -23,14 +23,18 @@ import ( "net/http" "sync" + apacheThrift "github.com/apache/thrift/lib/go/thrift" "github.com/gorilla/mux" agentapp "github.com/jaegertracing/jaeger/cmd/agent/app" "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" + "github.com/jaegertracing/jaeger/cmd/agent/app/servers" + "github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp" "github.com/jaegertracing/jaeger/cmd/collector/app" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "github.com/jaegertracing/jaeger/thrift-gen/baggage" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" + jaegerThrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger" "github.com/jaegertracing/jaeger/thrift-gen/sampling" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" "github.com/uber/jaeger-lib/metrics" @@ -39,6 +43,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" + "github.com/jaegertracing/jaeger/cmd/agent/app/processors" "github.com/open-telemetry/opentelemetry-collector/consumer" "github.com/open-telemetry/opentelemetry-collector/observability" "github.com/open-telemetry/opentelemetry-collector/oterr" @@ -71,12 +76,13 @@ type jReceiver struct { config *Configuration - agent *agentapp.Agent - grpc *grpc.Server tchanServer *jTchannelReceiver collectorServer *http.Server + agentProcessors []processors.Processor + agentServer *http.Server + defaultAgentCtx context.Context } @@ -87,6 +93,10 @@ type jTchannelReceiver struct { } const ( + defaultAgentQueueSize = 1000 + defaultAgentMaxPacketSize = 65000 + defaultAgentServerWorkers = 10 + // As per https://www.jaegertracing.io/docs/1.13/deployment/ // By default, the port used by jaeger-agent to send spans in model.proto format defaultGRPCPort = 14250 @@ -208,9 +218,14 @@ func (jr *jReceiver) stopTraceReceptionLocked() error { jr.stopOnce.Do(func() { var errs []error - if jr.agent != nil { - jr.agent.Stop() - jr.agent = nil + if jr.agentServer != nil { + if aerr := jr.agentServer.Close(); aerr != nil { + errs = append(errs, aerr) + } + jr.agentServer = nil + } + for _, processor := range jr.agentProcessors { + go processor.Stop() } if jr.collectorServer != nil { @@ -348,46 +363,44 @@ func (jr *jReceiver) startAgent(_ receiver.Host) error { return nil } - processorConfigs := []agentapp.ProcessorConfiguration{} + handler := jaegerThrift.NewAgentProcessor(jr) if jr.agentBinaryThriftEnabled() { - processorConfigs = append(processorConfigs, agentapp.ProcessorConfiguration{ - // Binary Thrift running by default on 6832. - Model: "jaeger", - Protocol: "binary", - Server: agentapp.ServerConfiguration{ - HostPort: jr.agentBinaryThriftAddr(), - }, - }) + transport, err := thriftudp.NewTUDPServerTransport(jr.agentBinaryThriftAddr()) + if err != nil { + return err + } + server, err := servers.NewTBufferedServer(transport, defaultAgentQueueSize, defaultAgentMaxPacketSize, metrics.NullFactory) + if err != nil { + return err + } + processor, err := processors.NewThriftProcessor(server, defaultAgentServerWorkers, metrics.NullFactory, apacheThrift.NewTCompactProtocolFactory(), handler, zap.NewNop()) + if err != nil { + return err + } + jr.agentProcessors = append(jr.agentProcessors, processor) } if jr.agentCompactThriftEnabled() { - processorConfigs = append(processorConfigs, agentapp.ProcessorConfiguration{ - // Compact Thrift running by default on 6831. - Model: "jaeger", - Protocol: "compact", - Server: agentapp.ServerConfiguration{ - HostPort: jr.agentCompactThriftAddr(), - }, - }) - } - - builder := agentapp.Builder{ - Processors: processorConfigs, - } - - agent, err := builder.CreateAgent(jr, zap.NewNop(), metrics.NullFactory) - if err != nil { - return err + transport, err := thriftudp.NewTUDPServerTransport(jr.agentCompactThriftAddr()) + if err != nil { + return err + } + server, err := servers.NewTBufferedServer(transport, defaultAgentQueueSize, defaultAgentMaxPacketSize, metrics.NullFactory) + if err != nil { + return err + } + processor, err := processors.NewThriftProcessor(server, defaultAgentServerWorkers, metrics.NullFactory, apacheThrift.NewTCompactProtocolFactory(), handler, zap.NewNop()) + if err != nil { + return err + } + jr.agentProcessors = append(jr.agentProcessors, processor) } - if err := agent.Run(); err != nil { - return err + for _, processor := range jr.agentProcessors { + go processor.Serve() } - // Otherwise no error was encountered, - jr.agent = agent - return nil } From 8c024bc39300d0490790c53b8d07613bcc7dc5f2 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 25 Nov 2019 09:10:23 -0500 Subject: [PATCH 09/29] Added config tests Signed-off-by: Joe Elliott --- receiver/jaegerreceiver/config_test.go | 10 ++++++++++ receiver/jaegerreceiver/testdata/config.yaml | 8 ++++++++ 2 files changed, 18 insertions(+) diff --git a/receiver/jaegerreceiver/config_test.go b/receiver/jaegerreceiver/config_test.go index d70825b73ef..eb606091875 100644 --- a/receiver/jaegerreceiver/config_test.go +++ b/receiver/jaegerreceiver/config_test.go @@ -65,6 +65,16 @@ func TestLoadConfig(t *testing.T) { Endpoint: "0.0.0.0:123", }, }, + "thrift-compact": { + ReceiverSettings: configmodels.ReceiverSettings{ + Endpoint: "0.0.0.0:456", + }, + }, + "thrift-binary": { + ReceiverSettings: configmodels.ReceiverSettings{ + Endpoint: "0.0.0.0:789", + }, + }, }, }) diff --git a/receiver/jaegerreceiver/testdata/config.yaml b/receiver/jaegerreceiver/testdata/config.yaml index fb34391528d..68c08de2e42 100644 --- a/receiver/jaegerreceiver/testdata/config.yaml +++ b/receiver/jaegerreceiver/testdata/config.yaml @@ -15,6 +15,10 @@ receivers: endpoint: ":3456" thrift-tchannel: endpoint: "0.0.0.0:123" + thrift-compact: + endpoint: "0.0.0.0:456" + thrift-binary: + endpoint: "0.0.0.0:789" # The following demonstrates disabling the receiver. # All of the protocols need to be disabled for the receiver to be disabled. @@ -30,6 +34,10 @@ receivers: disabled: true thrift-tchannel: disabled: true + thrift-compact: + disabled: true + thrift-binary: + disabled: true # The following demonstrates specifying different endpoints. # The Jaeger receiver connects to ports on all available network interfaces. From c945fce9568d6647ef5775c19cec0058b89da295 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 25 Nov 2019 10:19:05 -0500 Subject: [PATCH 10/29] Added support for http agent Signed-off-by: Joe Elliott --- receiver/jaegerreceiver/factory.go | 2 +- receiver/jaegerreceiver/jaeger_agent_test.go | 3 +- receiver/jaegerreceiver/trace_receiver.go | 41 +++++++++++++------ .../jaegerreceiver/trace_receiver_test.go | 7 ++-- 4 files changed, 36 insertions(+), 17 deletions(-) diff --git a/receiver/jaegerreceiver/factory.go b/receiver/jaegerreceiver/factory.go index 099ff65f75d..b4a17463269 100644 --- a/receiver/jaegerreceiver/factory.go +++ b/receiver/jaegerreceiver/factory.go @@ -175,7 +175,7 @@ func (f *Factory) CreateTraceReceiver( } // Create the receiver. - return New(ctx, &config, nextConsumer) + return New(ctx, &config, nextConsumer, logger) } // CreateMetricsReceiver creates a metrics receiver based on provided config. diff --git a/receiver/jaegerreceiver/jaeger_agent_test.go b/receiver/jaegerreceiver/jaeger_agent_test.go index 93bf3e0aab1..1df87eb2947 100644 --- a/receiver/jaegerreceiver/jaeger_agent_test.go +++ b/receiver/jaegerreceiver/jaeger_agent_test.go @@ -25,6 +25,7 @@ import ( tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/google/go-cmp/cmp" "go.opencensus.io/trace" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-collector/exporter/exportertest" @@ -53,7 +54,7 @@ func TestJaegerAgentUDP_ThriftBinary_6832(t *testing.T) { func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *Configuration) { // 1. Create the Jaeger receiver aka "server" sink := new(exportertest.SinkTraceExporter) - jr, err := New(context.Background(), receiverConfig, sink) + jr, err := New(context.Background(), receiverConfig, sink, zap.NewNop()) if err != nil { t.Fatalf("Failed to create new Jaeger Receiver: %v", err) } diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index 1e22dd4173b..c28ea49c752 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -25,8 +25,9 @@ import ( apacheThrift "github.com/apache/thrift/lib/go/thrift" "github.com/gorilla/mux" - agentapp "github.com/jaegertracing/jaeger/cmd/agent/app" "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager" + "github.com/jaegertracing/jaeger/cmd/agent/app/httpserver" + "github.com/jaegertracing/jaeger/cmd/agent/app/processors" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" "github.com/jaegertracing/jaeger/cmd/agent/app/servers" "github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp" @@ -43,7 +44,6 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" - "github.com/jaegertracing/jaeger/cmd/agent/app/processors" "github.com/open-telemetry/opentelemetry-collector/consumer" "github.com/open-telemetry/opentelemetry-collector/observability" "github.com/open-telemetry/opentelemetry-collector/oterr" @@ -61,6 +61,7 @@ type Configuration struct { AgentCompactThriftPort int AgentBinaryThriftPort int + AgentHTTPPort int } // Receiver type is used to receive spans that were originally intended to be sent to Jaeger. @@ -84,6 +85,7 @@ type jReceiver struct { agentServer *http.Server defaultAgentCtx context.Context + logger *zap.Logger } type jTchannelReceiver struct { @@ -109,7 +111,7 @@ const ( ) // New creates a TraceReceiver that receives traffic as a collector with both Thrift and HTTP transports. -func New(ctx context.Context, config *Configuration, nextConsumer consumer.TraceConsumer) (receiver.TraceReceiver, error) { +func New(ctx context.Context, config *Configuration, nextConsumer consumer.TraceConsumer, logger *zap.Logger) (receiver.TraceReceiver, error) { return &jReceiver{ config: config, defaultAgentCtx: observability.ContextWithReceiverName(context.Background(), "jaeger-agent"), @@ -117,6 +119,7 @@ func New(ctx context.Context, config *Configuration, nextConsumer consumer.Trace tchanServer: &jTchannelReceiver{ nextConsumer: nextConsumer, }, + logger: logger, }, nil } @@ -181,6 +184,18 @@ func (jr *jReceiver) agentBinaryThriftEnabled() bool { return jr.config != nil && jr.config.AgentBinaryThriftPort > 0 } +func (jr *jReceiver) agentHTTPPortAddr() string { + var port int + if jr.config != nil { + port = jr.config.AgentHTTPPort + } + return fmt.Sprintf(":%d", port) +} + +func (jr *jReceiver) agentHTTPPortEnabled() bool { + return jr.config != nil && jr.config.AgentHTTPPort > 0 +} + func (jr *jReceiver) TraceSource() string { return traceSource } @@ -298,8 +313,8 @@ func (jtr *jTchannelReceiver) SubmitBatches(ctx thrift.Context, batches []*jaege } var _ reporter.Reporter = (*jReceiver)(nil) -var _ agentapp.CollectorProxy = (*jReceiver)(nil) var _ api_v2.CollectorServiceServer = (*jReceiver)(nil) +var _ configmanager.ClientConfigManager = (*jReceiver)(nil) // EmitZipkinBatch implements cmd/agent/reporter.Reporter and it forwards // Zipkin spans received by the Jaeger agent processor. @@ -323,14 +338,6 @@ func (jr *jReceiver) EmitBatch(batch *jaeger.Batch) error { return err } -func (jr *jReceiver) GetReporter() reporter.Reporter { - return jr -} - -func (jr *jReceiver) GetManager() configmanager.ClientConfigManager { - return jr -} - func (jr *jReceiver) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) { return &sampling.SamplingStrategyResponse{}, nil } @@ -401,6 +408,16 @@ func (jr *jReceiver) startAgent(_ receiver.Host) error { go processor.Serve() } + if jr.agentHTTPPortEnabled() { + jr.agentServer = httpserver.NewHTTPServer(jr.agentHTTPPortAddr(), jr, metrics.NullFactory) + + go func() { + if err := jr.agentServer.ListenAndServe(); err != nil { + jr.logger.Error("http server failure", zap.Error(err)) + } + }() + } + return nil } diff --git a/receiver/jaegerreceiver/trace_receiver_test.go b/receiver/jaegerreceiver/trace_receiver_test.go index 910236b79b4..e2f1af24bd8 100644 --- a/receiver/jaegerreceiver/trace_receiver_test.go +++ b/receiver/jaegerreceiver/trace_receiver_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opencensus.io/trace" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -49,7 +50,7 @@ func TestReception(t *testing.T) { } sink := new(exportertest.SinkTraceExporter) - jr, err := New(context.Background(), config, sink) + jr, err := New(context.Background(), config, sink, zap.NewNop()) defer jr.StopTraceReception() assert.NoError(t, err, "should not have failed to create the Jaeger received") @@ -100,7 +101,7 @@ func TestGRPCReception(t *testing.T) { } sink := new(exportertest.SinkTraceExporter) - jr, err := New(context.Background(), config, sink) + jr, err := New(context.Background(), config, sink, zap.NewNop()) assert.NoError(t, err, "should not have failed to create a new receiver") defer jr.StopTraceReception() @@ -158,7 +159,7 @@ func TestGRPCReceptionWithTLS(t *testing.T) { } sink := new(exportertest.SinkTraceExporter) - jr, err := New(context.Background(), config, sink) + jr, err := New(context.Background(), config, sink, zap.NewNop()) assert.NoError(t, err, "should not have failed to create a new receiver") defer jr.StopTraceReception() From a734960fbb553e480e6cbd90925c1f1f5ef25c85 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 25 Nov 2019 10:57:41 -0500 Subject: [PATCH 11/29] Fixed testbed tests Signed-off-by: Joe Elliott --- testbed/go.mod | 1 + testbed/testbed/mock_backend.go | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/testbed/go.mod b/testbed/go.mod index b9fac87eecd..bffd9bfbab9 100644 --- a/testbed/go.mod +++ b/testbed/go.mod @@ -12,6 +12,7 @@ require ( github.com/spf13/viper v1.4.1-0.20190911140308-99520c81d86e github.com/stretchr/testify v1.4.0 go.opencensus.io v0.22.1 + go.uber.org/zap v1.10.0 ) replace github.com/open-telemetry/opentelemetry-collector => ../ diff --git a/testbed/testbed/mock_backend.go b/testbed/testbed/mock_backend.go index c2f004c6609..2da50c20c37 100644 --- a/testbed/testbed/mock_backend.go +++ b/testbed/testbed/mock_backend.go @@ -26,6 +26,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/receiver" "github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver" "github.com/open-telemetry/opentelemetry-collector/receiver/opencensusreceiver" + "go.uber.org/zap" ) // MockBackend is a backend that allows receiving the data locally. @@ -109,7 +110,7 @@ func (mb *MockBackend) Start(backendType BackendType) error { jaegerCfg := jaegerreceiver.Configuration{ CollectorHTTPPort: 14268, } - mb.jaegerReceiver, err = jaegerreceiver.New(context.Background(), &jaegerCfg, mb.tc) + mb.jaegerReceiver, err = jaegerreceiver.New(context.Background(), &jaegerCfg, mb.tc, zap.NewNop()) if err != nil { return err } From 073410cf2aa6b4fbb65dec675aac745ea1ba6500 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 25 Nov 2019 11:04:33 -0500 Subject: [PATCH 12/29] Fixed imports Signed-off-by: Joe Elliott --- testbed/testbed/mock_backend.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/testbed/testbed/mock_backend.go b/testbed/testbed/mock_backend.go index 2da50c20c37..5214544b0b2 100644 --- a/testbed/testbed/mock_backend.go +++ b/testbed/testbed/mock_backend.go @@ -22,11 +22,12 @@ import ( "sync" "sync/atomic" + "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-collector/receiver" "github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver" "github.com/open-telemetry/opentelemetry-collector/receiver/opencensusreceiver" - "go.uber.org/zap" ) // MockBackend is a backend that allows receiving the data locally. From 96a6fd96ad290fd5057db3e70b653868df29482a Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 25 Nov 2019 11:51:06 -0500 Subject: [PATCH 13/29] Improved coverage in factory.go Signed-off-by: Joe Elliott --- receiver/jaegerreceiver/factory_test.go | 30 +++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/receiver/jaegerreceiver/factory_test.go b/receiver/jaegerreceiver/factory_test.go index 11dfc3ae30f..fde495253a1 100644 --- a/receiver/jaegerreceiver/factory_test.go +++ b/receiver/jaegerreceiver/factory_test.go @@ -23,6 +23,8 @@ import ( "github.com/open-telemetry/opentelemetry-collector/config/configcheck" "github.com/open-telemetry/opentelemetry-collector/config/configerror" + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" + "github.com/open-telemetry/opentelemetry-collector/receiver" ) func TestCreateDefaultConfig(t *testing.T) { @@ -75,6 +77,34 @@ func TestCreateInvalidTChannelEndpoint(t *testing.T) { assert.Error(t, err, "receiver creation with invalid tchannel endpoint must fail") } +func TestCreateInvalidThriftBinaryEndpoint(t *testing.T) { + factory := Factory{} + cfg := factory.CreateDefaultConfig() + rCfg := cfg.(*Config) + + rCfg.Protocols[protoThriftBinary] = &receiver.SecureReceiverSettings{ + ReceiverSettings: configmodels.ReceiverSettings{ + Endpoint: "", + }, + } + _, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + assert.Error(t, err, "receiver creation with no endpoints must fail") +} + +func TestCreateInvalidThriftCompactEndpoint(t *testing.T) { + factory := Factory{} + cfg := factory.CreateDefaultConfig() + rCfg := cfg.(*Config) + + rCfg.Protocols[protoThriftCompact] = &receiver.SecureReceiverSettings{ + ReceiverSettings: configmodels.ReceiverSettings{ + Endpoint: "", + }, + } + _, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil) + assert.Error(t, err, "receiver creation with no endpoints must fail") +} + func TestCreateNoPort(t *testing.T) { factory := Factory{} cfg := factory.CreateDefaultConfig() From 12937ffaa6558e376d08c6679c72c1224cf2e229 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 25 Nov 2019 12:12:55 -0500 Subject: [PATCH 14/29] Added http proxy tests Signed-off-by: Joe Elliott --- receiver/jaegerreceiver/jaeger_agent_test.go | 34 ++++++++++++++++++++ receiver/jaegerreceiver/trace_receiver.go | 6 ++-- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/receiver/jaegerreceiver/jaeger_agent_test.go b/receiver/jaegerreceiver/jaeger_agent_test.go index 1df87eb2947..7c8b514480c 100644 --- a/receiver/jaegerreceiver/jaeger_agent_test.go +++ b/receiver/jaegerreceiver/jaeger_agent_test.go @@ -17,6 +17,7 @@ package jaegerreceiver import ( "context" "fmt" + "net/http" "testing" "time" @@ -24,6 +25,7 @@ import ( commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" "go.opencensus.io/trace" "go.uber.org/zap" @@ -51,6 +53,38 @@ func TestJaegerAgentUDP_ThriftBinary_6832(t *testing.T) { }) } +func TestJaegerHTTP(t *testing.T) { + config := &Configuration{ + AgentHTTPPort: 5778, + } + jr, err := New(context.Background(), config, nil, zap.NewNop()) + if err != nil { + t.Fatalf("Failed to create new Jaeger Receiver: %v", err) + } + defer jr.StopTraceReception() + + mh := receivertest.NewMockHost() + if err := jr.StartTraceReception(mh); err != nil { + t.Fatalf("StartTraceReception failed: %v", err) + } + + // allow http server to start + <-time.After(100 * time.Millisecond) + + // this functionality is just stubbed out at the moment. just confirm they 200. + resp, err := http.Get("http://localhost:5778/sampling?service=test") + assert.NoError(t, err, "should not have failed to make request") + if resp != nil { + assert.Equal(t, 200, resp.StatusCode, "should have returned 200") + } + + resp, err = http.Get("http://localhost:5778/baggageRestrictions?service=test") + assert.NoError(t, err, "should not have failed to make request") + if resp != nil { + assert.Equal(t, 200, resp.StatusCode, "should have returned 200") + } +} + func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *Configuration) { // 1. Create the Jaeger receiver aka "server" sink := new(exportertest.SinkTraceExporter) diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index c28ea49c752..45e3e4edf87 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -192,7 +192,7 @@ func (jr *jReceiver) agentHTTPPortAddr() string { return fmt.Sprintf(":%d", port) } -func (jr *jReceiver) agentHTTPPortEnabled() bool { +func (jr *jReceiver) agentHTTPEnabled() bool { return jr.config != nil && jr.config.AgentHTTPPort > 0 } @@ -366,7 +366,7 @@ func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) } func (jr *jReceiver) startAgent(_ receiver.Host) error { - if !jr.agentBinaryThriftEnabled() && !jr.agentCompactThriftEnabled() { + if !jr.agentBinaryThriftEnabled() && !jr.agentCompactThriftEnabled() && !jr.agentHTTPEnabled() { return nil } @@ -408,7 +408,7 @@ func (jr *jReceiver) startAgent(_ receiver.Host) error { go processor.Serve() } - if jr.agentHTTPPortEnabled() { + if jr.agentHTTPEnabled() { jr.agentServer = httpserver.NewHTTPServer(jr.agentHTTPPortAddr(), jr, metrics.NullFactory) go func() { From dd99d1ee024d65a83df4c5c1603e6c5ad9ac2171 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 26 Nov 2019 11:42:00 -0500 Subject: [PATCH 15/29] Moved location of the testdata reference to show it refers to both agent and collector protocols Signed-off-by: Joe Elliott --- receiver/README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/receiver/README.md b/receiver/README.md index df3dbf4f65f..9dd9dfa3cc3 100644 --- a/receiver/README.md +++ b/receiver/README.md @@ -120,16 +120,16 @@ receivers: jaeger: ``` -It is possible to configure the protocols on different ports, refer to -[config.yaml](jaegerreceiver/testdata/config.yaml) for detailed config -examples. - It also supports the Jaeger Agent protocols: - Thrift Compact - Thrift Binary By default, these services are not started unless an endpoint is explicitly defined. +It is possible to configure the protocols on different ports, refer to +[config.yaml](jaegerreceiver/testdata/config.yaml) for detailed config +examples. + // TODO Issue https://github.com/open-telemetry/opentelemetry-collector/issues/158 // The Jaeger receiver enables all protocols even when one is specified or a // subset is enabled. The documentation should be updated when that fix occurs. From 1e2623ea1cbde039eff39484bc835157c90e6f89 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 26 Nov 2019 13:29:21 -0500 Subject: [PATCH 16/29] Replaced hardcoded port with dynamic Signed-off-by: Joe Elliott --- receiver/jaegerreceiver/jaeger_agent_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/receiver/jaegerreceiver/jaeger_agent_test.go b/receiver/jaegerreceiver/jaeger_agent_test.go index 7c8b514480c..264d24c4b14 100644 --- a/receiver/jaegerreceiver/jaeger_agent_test.go +++ b/receiver/jaegerreceiver/jaeger_agent_test.go @@ -32,6 +32,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-collector/exporter/exportertest" "github.com/open-telemetry/opentelemetry-collector/internal" + "github.com/open-telemetry/opentelemetry-collector/internal/testutils" "github.com/open-telemetry/opentelemetry-collector/receiver/receivertest" ) @@ -54,8 +55,9 @@ func TestJaegerAgentUDP_ThriftBinary_6832(t *testing.T) { } func TestJaegerHTTP(t *testing.T) { + port := int(testutils.GetAvailablePort(t)) config := &Configuration{ - AgentHTTPPort: 5778, + AgentHTTPPort: port, } jr, err := New(context.Background(), config, nil, zap.NewNop()) if err != nil { @@ -72,13 +74,15 @@ func TestJaegerHTTP(t *testing.T) { <-time.After(100 * time.Millisecond) // this functionality is just stubbed out at the moment. just confirm they 200. - resp, err := http.Get("http://localhost:5778/sampling?service=test") + testURL := fmt.Sprintf("http://localhost:%d/sampling?service=test", port) + resp, err := http.Get(testURL) assert.NoError(t, err, "should not have failed to make request") if resp != nil { assert.Equal(t, 200, resp.StatusCode, "should have returned 200") } - resp, err = http.Get("http://localhost:5778/baggageRestrictions?service=test") + testURL = fmt.Sprintf("http://localhost:%d/sampling?service=test", port) + resp, err = http.Get(testURL) assert.NoError(t, err, "should not have failed to make request") if resp != nil { assert.Equal(t, 200, resp.StatusCode, "should have returned 200") From 78dcc9451ffee6d939bf3a5a6d876ac31180499e Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 26 Nov 2019 13:32:37 -0500 Subject: [PATCH 17/29] Synchronously stop processors Signed-off-by: Joe Elliott --- receiver/jaegerreceiver/trace_receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index 45e3e4edf87..f00f66202d2 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -240,7 +240,7 @@ func (jr *jReceiver) stopTraceReceptionLocked() error { jr.agentServer = nil } for _, processor := range jr.agentProcessors { - go processor.Stop() + processor.Stop() } if jr.collectorServer != nil { From 5a3ce04727e21801bcfe0a0298072552ed3ccd8f Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 26 Nov 2019 13:55:35 -0500 Subject: [PATCH 18/29] Added testutils method to wait for a port and used it to wait for the http server Signed-off-by: Joe Elliott --- internal/testutils/testutils.go | 21 ++++++++++++++++++++ receiver/jaegerreceiver/jaeger_agent_test.go | 8 +++++--- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/internal/testutils/testutils.go b/internal/testutils/testutils.go index d5de3c92f4e..b8753b230b7 100644 --- a/internal/testutils/testutils.go +++ b/internal/testutils/testutils.go @@ -16,9 +16,11 @@ package testutils import ( "encoding/json" + "fmt" "net" "strconv" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -67,3 +69,22 @@ func GetAvailablePort(t *testing.T) uint16 { return uint16(portInt) } + +// WaitForPort repeatedly attempts to open a local port until it either succeeds or 5 seconds pass +// It is useful if you need to asynchronously start a service and wait for it to start +func WaitForPort(t *testing.T, port uint16) error { + t.Helper() + + duration := 100 * time.Millisecond + iterations := 50 // 50 * 100ms = 5 seconds + address := fmt.Sprintf("localhost:%d", port) + for i := 0; i < iterations; i++ { + _, err := net.Dial("tcp", address) + + if err == nil { + return nil + } + time.Sleep(duration) + } + return fmt.Errorf("failed to wait for port %d", port) +} diff --git a/receiver/jaegerreceiver/jaeger_agent_test.go b/receiver/jaegerreceiver/jaeger_agent_test.go index 264d24c4b14..a8f2d68c1c0 100644 --- a/receiver/jaegerreceiver/jaeger_agent_test.go +++ b/receiver/jaegerreceiver/jaeger_agent_test.go @@ -55,9 +55,9 @@ func TestJaegerAgentUDP_ThriftBinary_6832(t *testing.T) { } func TestJaegerHTTP(t *testing.T) { - port := int(testutils.GetAvailablePort(t)) + port := testutils.GetAvailablePort(t) config := &Configuration{ - AgentHTTPPort: port, + AgentHTTPPort: int(port), } jr, err := New(context.Background(), config, nil, zap.NewNop()) if err != nil { @@ -71,7 +71,9 @@ func TestJaegerHTTP(t *testing.T) { } // allow http server to start - <-time.After(100 * time.Millisecond) + if err := testutils.WaitForPort(t, port); err != nil { + t.Fatalf("WaitForPort failed: %v", err) + } // this functionality is just stubbed out at the moment. just confirm they 200. testURL := fmt.Sprintf("http://localhost:%d/sampling?service=test", port) From 8657535798f386c383c1284f88f21304002b4579 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 26 Nov 2019 20:39:26 -0500 Subject: [PATCH 19/29] Cleaned up gross wait function Signed-off-by: Joe Elliott --- internal/testutils/testutils.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/testutils/testutils.go b/internal/testutils/testutils.go index b8753b230b7..c2e0eb63532 100644 --- a/internal/testutils/testutils.go +++ b/internal/testutils/testutils.go @@ -75,16 +75,16 @@ func GetAvailablePort(t *testing.T) uint16 { func WaitForPort(t *testing.T, port uint16) error { t.Helper() - duration := 100 * time.Millisecond - iterations := 50 // 50 * 100ms = 5 seconds + totalDuration := 5 * time.Second + wait := 100 * time.Millisecond address := fmt.Sprintf("localhost:%d", port) - for i := 0; i < iterations; i++ { + for i := totalDuration; i > 0; i -= wait { _, err := net.Dial("tcp", address) if err == nil { return nil } - time.Sleep(duration) + time.Sleep(wait) } return fmt.Errorf("failed to wait for port %d", port) } From 3a10dfbe50d42bca17daf607bfffc04d6db0228f Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 27 Nov 2019 14:20:20 -0500 Subject: [PATCH 20/29] Added WaitForPort Test Signed-off-by: Joe Elliott --- internal/testutils/testutils.go | 5 +++-- internal/testutils/testutils_test.go | 15 +++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/internal/testutils/testutils.go b/internal/testutils/testutils.go index c2e0eb63532..c5fcfd5c859 100644 --- a/internal/testutils/testutils.go +++ b/internal/testutils/testutils.go @@ -79,9 +79,10 @@ func WaitForPort(t *testing.T, port uint16) error { wait := 100 * time.Millisecond address := fmt.Sprintf("localhost:%d", port) for i := totalDuration; i > 0; i -= wait { - _, err := net.Dial("tcp", address) + conn, err := net.Dial("tcp", address) - if err == nil { + if err == nil && conn != nil { + conn.Close() return nil } time.Sleep(wait) diff --git a/internal/testutils/testutils_test.go b/internal/testutils/testutils_test.go index 9b07b0d8a92..a345af5e358 100644 --- a/internal/testutils/testutils_test.go +++ b/internal/testutils/testutils_test.go @@ -15,6 +15,7 @@ package testutils import ( + "fmt" "net" "strconv" "testing" @@ -33,6 +34,20 @@ func TestGetAvailablePort(t *testing.T) { testEndpointAvailable(t, "localhost:"+portStr) } +func TestWaitForPort(t *testing.T) { + port := GetAvailablePort(t) + err := WaitForPort(t, port) + require.Error(t, err) + + port = GetAvailablePort(t) + l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) + err = WaitForPort(t, port) + require.NoError(t, err) + + err = l.Close() + require.NoError(t, err) +} + func testEndpointAvailable(t *testing.T, endpoint string) { // Endpoint should be free. ln0, err := net.Listen("tcp", endpoint) From 651ea968f12b1b203ce843633441cc0fed4085cc Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 27 Nov 2019 14:35:35 -0500 Subject: [PATCH 21/29] Fixed testutils error. Added baggageRestrictions test Signed-off-by: Joe Elliott --- internal/testutils/testutils_test.go | 2 ++ receiver/jaegerreceiver/jaeger_agent_test.go | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/internal/testutils/testutils_test.go b/internal/testutils/testutils_test.go index a345af5e358..ea0fde7a6d8 100644 --- a/internal/testutils/testutils_test.go +++ b/internal/testutils/testutils_test.go @@ -41,6 +41,8 @@ func TestWaitForPort(t *testing.T) { port = GetAvailablePort(t) l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) + require.NoError(t, err) + err = WaitForPort(t, port) require.NoError(t, err) diff --git a/receiver/jaegerreceiver/jaeger_agent_test.go b/receiver/jaegerreceiver/jaeger_agent_test.go index a8f2d68c1c0..516f906809b 100644 --- a/receiver/jaegerreceiver/jaeger_agent_test.go +++ b/receiver/jaegerreceiver/jaeger_agent_test.go @@ -89,6 +89,13 @@ func TestJaegerHTTP(t *testing.T) { if resp != nil { assert.Equal(t, 200, resp.StatusCode, "should have returned 200") } + + testURL = fmt.Sprintf("http://localhost:%d/baggageRestrictions?service=test", port) + resp, err = http.Get(testURL) + assert.NoError(t, err, "should not have failed to make request") + if resp != nil { + assert.Equal(t, 200, resp.StatusCode, "should have returned 200") + } } func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *Configuration) { From 787d867c097cc6a6f1dc5f7c0fc64285ad3b19c5 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 27 Nov 2019 15:16:08 -0500 Subject: [PATCH 22/29] Added tests for port in use and trace source Signed-off-by: Joe Elliott --- receiver/jaegerreceiver/trace_receiver.go | 2 +- .../jaegerreceiver/trace_receiver_test.go | 51 +++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index f00f66202d2..43966d3bd55 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -381,7 +381,7 @@ func (jr *jReceiver) startAgent(_ receiver.Host) error { if err != nil { return err } - processor, err := processors.NewThriftProcessor(server, defaultAgentServerWorkers, metrics.NullFactory, apacheThrift.NewTCompactProtocolFactory(), handler, zap.NewNop()) + processor, err := processors.NewThriftProcessor(server, defaultAgentServerWorkers, metrics.NullFactory, apacheThrift.NewTBinaryProtocolFactoryDefault(), handler, zap.NewNop()) if err != nil { return err } diff --git a/receiver/jaegerreceiver/trace_receiver_test.go b/receiver/jaegerreceiver/trace_receiver_test.go index e2f1af24bd8..978a7e50e7f 100644 --- a/receiver/jaegerreceiver/trace_receiver_test.go +++ b/receiver/jaegerreceiver/trace_receiver_test.go @@ -18,6 +18,7 @@ import ( "context" "encoding/binary" "fmt" + "net" "path" "testing" "time" @@ -38,11 +39,19 @@ import ( "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-collector/exporter/exportertest" "github.com/open-telemetry/opentelemetry-collector/internal" + "github.com/open-telemetry/opentelemetry-collector/internal/testutils" "github.com/open-telemetry/opentelemetry-collector/receiver" "github.com/open-telemetry/opentelemetry-collector/receiver/receivertest" tracetranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace" ) +func TestTraceSource(t *testing.T) { + jr, err := New(context.Background(), &Configuration{}, nil, zap.NewNop()) + assert.NoError(t, err, "should not have failed to create the Jaeger receiver") + + assert.Equal(t, traceSource, jr.TraceSource()) +} + func TestReception(t *testing.T) { // 1. Create the Jaeger receiver aka "server" config := &Configuration{ @@ -94,6 +103,27 @@ func TestReception(t *testing.T) { } } +func TestHTTPCollectorPortInUse(t *testing.T) { + port := testutils.GetAvailablePort(t) + + config := &Configuration{ + CollectorHTTPPort: int(port), + } + sink := new(exportertest.SinkTraceExporter) + + l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) + defer l.Close() + assert.NoErrorf(t, err, "should have been able to open port %d", port) + + jr, err := New(context.Background(), config, sink, zap.NewNop()) + defer jr.StopTraceReception() + assert.NoError(t, err, "should have been to create the receiver") + + mh := receivertest.NewMockHost() + err = jr.StartTraceReception(mh) + assert.Error(t, err, "should not have been able to start the collector") +} + func TestGRPCReception(t *testing.T) { // prepare config := &Configuration{ @@ -141,6 +171,27 @@ func TestGRPCReception(t *testing.T) { } +func TestGRPCCollectorPortInUse(t *testing.T) { + port := testutils.GetAvailablePort(t) + + config := &Configuration{ + CollectorGRPCPort: int(port), + } + sink := new(exportertest.SinkTraceExporter) + + l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) + defer l.Close() + assert.NoErrorf(t, err, "should have been able to open port %d", port) + + jr, err := New(context.Background(), config, sink, zap.NewNop()) + defer jr.StopTraceReception() + assert.NoError(t, err, "should have been to create the receiver") + + mh := receivertest.NewMockHost() + err = jr.StartTraceReception(mh) + assert.Error(t, err, "should not have been able to start the collector") +} + func TestGRPCReceptionWithTLS(t *testing.T) { // prepare grpcServerOptions := []grpc.ServerOption{} From 899a22cc76bedf85c1da5d68b5b70ee90ee4ae32 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 27 Nov 2019 15:20:43 -0500 Subject: [PATCH 23/29] Pass logger to processors Signed-off-by: Joe Elliott --- receiver/jaegerreceiver/trace_receiver.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index 43966d3bd55..237000dc3bc 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -381,7 +381,7 @@ func (jr *jReceiver) startAgent(_ receiver.Host) error { if err != nil { return err } - processor, err := processors.NewThriftProcessor(server, defaultAgentServerWorkers, metrics.NullFactory, apacheThrift.NewTBinaryProtocolFactoryDefault(), handler, zap.NewNop()) + processor, err := processors.NewThriftProcessor(server, defaultAgentServerWorkers, metrics.NullFactory, apacheThrift.NewTBinaryProtocolFactoryDefault(), handler, jr.logger) if err != nil { return err } @@ -397,7 +397,7 @@ func (jr *jReceiver) startAgent(_ receiver.Host) error { if err != nil { return err } - processor, err := processors.NewThriftProcessor(server, defaultAgentServerWorkers, metrics.NullFactory, apacheThrift.NewTCompactProtocolFactory(), handler, zap.NewNop()) + processor, err := processors.NewThriftProcessor(server, defaultAgentServerWorkers, metrics.NullFactory, apacheThrift.NewTCompactProtocolFactory(), handler, jr.logger) if err != nil { return err } From c9a1e788e2119dd3c5f94f52660b16e847450448 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 27 Nov 2019 15:46:38 -0500 Subject: [PATCH 24/29] Removed flaky tests Signed-off-by: Joe Elliott --- .../jaegerreceiver/trace_receiver_test.go | 44 ------------------- 1 file changed, 44 deletions(-) diff --git a/receiver/jaegerreceiver/trace_receiver_test.go b/receiver/jaegerreceiver/trace_receiver_test.go index 978a7e50e7f..ab1dd49aa53 100644 --- a/receiver/jaegerreceiver/trace_receiver_test.go +++ b/receiver/jaegerreceiver/trace_receiver_test.go @@ -18,7 +18,6 @@ import ( "context" "encoding/binary" "fmt" - "net" "path" "testing" "time" @@ -39,7 +38,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-collector/exporter/exportertest" "github.com/open-telemetry/opentelemetry-collector/internal" - "github.com/open-telemetry/opentelemetry-collector/internal/testutils" "github.com/open-telemetry/opentelemetry-collector/receiver" "github.com/open-telemetry/opentelemetry-collector/receiver/receivertest" tracetranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace" @@ -103,27 +101,6 @@ func TestReception(t *testing.T) { } } -func TestHTTPCollectorPortInUse(t *testing.T) { - port := testutils.GetAvailablePort(t) - - config := &Configuration{ - CollectorHTTPPort: int(port), - } - sink := new(exportertest.SinkTraceExporter) - - l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) - defer l.Close() - assert.NoErrorf(t, err, "should have been able to open port %d", port) - - jr, err := New(context.Background(), config, sink, zap.NewNop()) - defer jr.StopTraceReception() - assert.NoError(t, err, "should have been to create the receiver") - - mh := receivertest.NewMockHost() - err = jr.StartTraceReception(mh) - assert.Error(t, err, "should not have been able to start the collector") -} - func TestGRPCReception(t *testing.T) { // prepare config := &Configuration{ @@ -171,27 +148,6 @@ func TestGRPCReception(t *testing.T) { } -func TestGRPCCollectorPortInUse(t *testing.T) { - port := testutils.GetAvailablePort(t) - - config := &Configuration{ - CollectorGRPCPort: int(port), - } - sink := new(exportertest.SinkTraceExporter) - - l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) - defer l.Close() - assert.NoErrorf(t, err, "should have been able to open port %d", port) - - jr, err := New(context.Background(), config, sink, zap.NewNop()) - defer jr.StopTraceReception() - assert.NoError(t, err, "should have been to create the receiver") - - mh := receivertest.NewMockHost() - err = jr.StartTraceReception(mh) - assert.Error(t, err, "should not have been able to start the collector") -} - func TestGRPCReceptionWithTLS(t *testing.T) { // prepare grpcServerOptions := []grpc.ServerOption{} From 18b41d638820f46e552e5271c66f47177ea4fcdb Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 27 Nov 2019 16:41:41 -0500 Subject: [PATCH 25/29] Added test to confirm binary thrift opens the right port Signed-off-by: Joe Elliott --- receiver/jaegerreceiver/jaeger_agent_test.go | 27 ++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/receiver/jaegerreceiver/jaeger_agent_test.go b/receiver/jaegerreceiver/jaeger_agent_test.go index 516f906809b..6dd148c06fb 100644 --- a/receiver/jaegerreceiver/jaeger_agent_test.go +++ b/receiver/jaegerreceiver/jaeger_agent_test.go @@ -17,6 +17,7 @@ package jaegerreceiver import ( "context" "fmt" + "net" "net/http" "testing" "time" @@ -54,6 +55,32 @@ func TestJaegerAgentUDP_ThriftBinary_6832(t *testing.T) { }) } +func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) { + // This test confirms that the thrift binary port is opened correctly. This is all we can test at the moment. See above. + port := testutils.GetAvailablePort(t) + + config := &Configuration{ + AgentBinaryThriftPort: int(port), + } + jr, err := New(context.Background(), config, nil, zap.NewNop()) + if err != nil { + t.Fatalf("Failed to create new Jaeger Receiver: %v", err) + } + + mh := receivertest.NewMockHost() + if err := jr.StartTraceReception(mh); err != nil { + t.Fatalf("StartTraceReception failed: %v", err) + } + defer jr.StopTraceReception() + + l, err := net.Listen("udp", fmt.Sprintf("localhost:%d", port)) + assert.Error(t, err, "should not have been able to listen to the port") + + if l != nil { + l.Close() + } +} + func TestJaegerHTTP(t *testing.T) { port := testutils.GetAvailablePort(t) config := &Configuration{ From 234fdb5a7f571d4b5606aca220259d5b3afcfe54 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 27 Nov 2019 17:03:45 -0500 Subject: [PATCH 26/29] Added tests to to confirm invalid ports would not start Signed-off-by: Joe Elliott --- receiver/jaegerreceiver/jaeger_agent_test.go | 36 ++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/receiver/jaegerreceiver/jaeger_agent_test.go b/receiver/jaegerreceiver/jaeger_agent_test.go index 6dd148c06fb..66c07619f33 100644 --- a/receiver/jaegerreceiver/jaeger_agent_test.go +++ b/receiver/jaegerreceiver/jaeger_agent_test.go @@ -45,6 +45,24 @@ func TestJaegerAgentUDP_ThriftCompact_6831(t *testing.T) { }) } +func TestJaegerAgentUDP_ThriftCompact_InvalidPort(t *testing.T) { + port := 999999 + + config := &Configuration{ + AgentCompactThriftPort: int(port), + } + jr, err := New(context.Background(), config, nil, zap.NewNop()) + if err != nil { + t.Fatalf("Failed to create new Jaeger Receiver: %v", err) + } + + mh := receivertest.NewMockHost() + err = jr.StartTraceReception(mh) + assert.Error(t, err, "should not have been able to startTraceReception") + + jr.StopTraceReception() +} + func TestJaegerAgentUDP_ThriftBinary_6832(t *testing.T) { t.Skipf("Unfortunately due to Jaeger internal versioning, OpenCensus-Go's Thrift seems to conflict with ours") @@ -81,6 +99,24 @@ func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) { } } +func TestJaegerAgentUDP_ThriftBinary_InvalidPort(t *testing.T) { + port := 999999 + + config := &Configuration{ + AgentBinaryThriftPort: int(port), + } + jr, err := New(context.Background(), config, nil, zap.NewNop()) + if err != nil { + t.Fatalf("Failed to create new Jaeger Receiver: %v", err) + } + + mh := receivertest.NewMockHost() + err = jr.StartTraceReception(mh) + assert.Error(t, err, "should not have been able to startTraceReception") + + jr.StopTraceReception() +} + func TestJaegerHTTP(t *testing.T) { port := testutils.GetAvailablePort(t) config := &Configuration{ From 511ead517ee81840263069be12d972eed3a8ca5a Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 27 Nov 2019 17:24:04 -0500 Subject: [PATCH 27/29] Only call startAgent to avoid startCollector race condition Signed-off-by: Joe Elliott --- receiver/jaegerreceiver/jaeger_agent_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/jaegerreceiver/jaeger_agent_test.go b/receiver/jaegerreceiver/jaeger_agent_test.go index 66c07619f33..a5145fbd1b1 100644 --- a/receiver/jaegerreceiver/jaeger_agent_test.go +++ b/receiver/jaegerreceiver/jaeger_agent_test.go @@ -86,7 +86,7 @@ func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) { } mh := receivertest.NewMockHost() - if err := jr.StartTraceReception(mh); err != nil { + if err := jr.(*jReceiver).startAgent(mh); err != nil { t.Fatalf("StartTraceReception failed: %v", err) } defer jr.StopTraceReception() From ca2d038a4e452030d9d76a37cee01552e5f79ad5 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 4 Dec 2019 08:09:24 -0500 Subject: [PATCH 28/29] Migrated assert.NoError Signed-off-by: Joe Elliott --- receiver/jaegerreceiver/jaeger_agent_test.go | 44 +++++++------------- 1 file changed, 14 insertions(+), 30 deletions(-) diff --git a/receiver/jaegerreceiver/jaeger_agent_test.go b/receiver/jaegerreceiver/jaeger_agent_test.go index a5145fbd1b1..c8fe181a4de 100644 --- a/receiver/jaegerreceiver/jaeger_agent_test.go +++ b/receiver/jaegerreceiver/jaeger_agent_test.go @@ -52,9 +52,7 @@ func TestJaegerAgentUDP_ThriftCompact_InvalidPort(t *testing.T) { AgentCompactThriftPort: int(port), } jr, err := New(context.Background(), config, nil, zap.NewNop()) - if err != nil { - t.Fatalf("Failed to create new Jaeger Receiver: %v", err) - } + assert.NoError(t, err, "Failed to create new Jaeger Receiver") mh := receivertest.NewMockHost() err = jr.StartTraceReception(mh) @@ -81,14 +79,11 @@ func TestJaegerAgentUDP_ThriftBinary_PortInUse(t *testing.T) { AgentBinaryThriftPort: int(port), } jr, err := New(context.Background(), config, nil, zap.NewNop()) - if err != nil { - t.Fatalf("Failed to create new Jaeger Receiver: %v", err) - } + assert.NoError(t, err, "Failed to create new Jaeger Receiver") mh := receivertest.NewMockHost() - if err := jr.(*jReceiver).startAgent(mh); err != nil { - t.Fatalf("StartTraceReception failed: %v", err) - } + err = jr.(*jReceiver).startAgent(mh) + assert.NoError(t, err, "StartTraceReception failed") defer jr.StopTraceReception() l, err := net.Listen("udp", fmt.Sprintf("localhost:%d", port)) @@ -106,9 +101,7 @@ func TestJaegerAgentUDP_ThriftBinary_InvalidPort(t *testing.T) { AgentBinaryThriftPort: int(port), } jr, err := New(context.Background(), config, nil, zap.NewNop()) - if err != nil { - t.Fatalf("Failed to create new Jaeger Receiver: %v", err) - } + assert.NoError(t, err, "Failed to create new Jaeger Receiver") mh := receivertest.NewMockHost() err = jr.StartTraceReception(mh) @@ -123,20 +116,16 @@ func TestJaegerHTTP(t *testing.T) { AgentHTTPPort: int(port), } jr, err := New(context.Background(), config, nil, zap.NewNop()) - if err != nil { - t.Fatalf("Failed to create new Jaeger Receiver: %v", err) - } + assert.NoError(t, err, "Failed to create new Jaeger Receiver") defer jr.StopTraceReception() mh := receivertest.NewMockHost() - if err := jr.StartTraceReception(mh); err != nil { - t.Fatalf("StartTraceReception failed: %v", err) - } + err = jr.StartTraceReception(mh) + assert.NoError(t, err, "StartTraceReception failed") // allow http server to start - if err := testutils.WaitForPort(t, port); err != nil { - t.Fatalf("WaitForPort failed: %v", err) - } + err = testutils.WaitForPort(t, port) + assert.NoError(t, err, "WaitForPort failed") // this functionality is just stubbed out at the moment. just confirm they 200. testURL := fmt.Sprintf("http://localhost:%d/sampling?service=test", port) @@ -165,15 +154,12 @@ func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *Configu // 1. Create the Jaeger receiver aka "server" sink := new(exportertest.SinkTraceExporter) jr, err := New(context.Background(), receiverConfig, sink, zap.NewNop()) - if err != nil { - t.Fatalf("Failed to create new Jaeger Receiver: %v", err) - } + assert.NoError(t, err, "Failed to create new Jaeger Receiver") defer jr.StopTraceReception() mh := receivertest.NewMockHost() - if err := jr.StartTraceReception(mh); err != nil { - t.Fatalf("StartTraceReception failed: %v", err) - } + err = jr.StartTraceReception(mh) + assert.NoError(t, err, "StartTraceReception failed") now := time.Unix(1542158650, 536343000).UTC() nowPlus10min := now.Add(10 * time.Minute) @@ -192,9 +178,7 @@ func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *Configu }, }, }) - if err != nil { - t.Fatalf("Failed to create the Jaeger OpenCensus exporter for the live application: %v", err) - } + assert.NoError(t, err, "Failed to create the Jaeger OpenCensus exporter for the live application") // 3. Now finally send some spans spandata := []*trace.SpanData{ From 37dae1d20c333b86e4b5f5351dca997f71984901 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 4 Dec 2019 08:24:05 -0500 Subject: [PATCH 29/29] Consolidated similar code into a one function Signed-off-by: Joe Elliott --- go.mod | 2 +- go.sum | 2 ++ receiver/jaegerreceiver/trace_receiver.go | 39 +++++++++++------------ 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/go.mod b/go.mod index 26b1ae6490a..d4e21aca695 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/stretchr/testify v1.4.0 github.com/uber-go/atomic v1.4.0 // indirect github.com/uber/jaeger-client-go v2.16.0+incompatible // indirect - github.com/uber/jaeger-lib v2.0.0+incompatible + github.com/uber/jaeger-lib v2.2.0+incompatible github.com/uber/tchannel-go v1.10.0 go.opencensus.io v0.22.1 go.uber.org/zap v1.10.0 diff --git a/go.sum b/go.sum index 0e7837615db..7f7f161a8e3 100644 --- a/go.sum +++ b/go.sum @@ -554,6 +554,8 @@ github.com/uber/jaeger-client-go v2.16.0+incompatible h1:Q2Pp6v3QYiocMxomCaJuwQG github.com/uber/jaeger-client-go v2.16.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-lib v2.0.0+incompatible h1:iMSCV0rmXEogjNWPh2D0xk9YVKvrtGoHJNe9ebLu/pw= github.com/uber/jaeger-lib v2.0.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= +github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw= +github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/uber/tchannel-go v1.10.0 h1:YOihLHuvkwT3nzvpgqFtexFW+pb5vD1Tz7h/bIWApgE= github.com/uber/tchannel-go v1.10.0/go.mod h1:Rrgz1eL8kMjW/nEzZos0t+Heq0O4LhnUJVA32OvWKHo= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index 237000dc3bc..43c384cb128 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -370,18 +370,8 @@ func (jr *jReceiver) startAgent(_ receiver.Host) error { return nil } - handler := jaegerThrift.NewAgentProcessor(jr) - if jr.agentBinaryThriftEnabled() { - transport, err := thriftudp.NewTUDPServerTransport(jr.agentBinaryThriftAddr()) - if err != nil { - return err - } - server, err := servers.NewTBufferedServer(transport, defaultAgentQueueSize, defaultAgentMaxPacketSize, metrics.NullFactory) - if err != nil { - return err - } - processor, err := processors.NewThriftProcessor(server, defaultAgentServerWorkers, metrics.NullFactory, apacheThrift.NewTBinaryProtocolFactoryDefault(), handler, jr.logger) + processor, err := jr.buildProcessor(jr.agentBinaryThriftAddr(), apacheThrift.NewTBinaryProtocolFactoryDefault()) if err != nil { return err } @@ -389,15 +379,7 @@ func (jr *jReceiver) startAgent(_ receiver.Host) error { } if jr.agentCompactThriftEnabled() { - transport, err := thriftudp.NewTUDPServerTransport(jr.agentCompactThriftAddr()) - if err != nil { - return err - } - server, err := servers.NewTBufferedServer(transport, defaultAgentQueueSize, defaultAgentMaxPacketSize, metrics.NullFactory) - if err != nil { - return err - } - processor, err := processors.NewThriftProcessor(server, defaultAgentServerWorkers, metrics.NullFactory, apacheThrift.NewTCompactProtocolFactory(), handler, jr.logger) + processor, err := jr.buildProcessor(jr.agentCompactThriftAddr(), apacheThrift.NewTCompactProtocolFactory()) if err != nil { return err } @@ -421,6 +403,23 @@ func (jr *jReceiver) startAgent(_ receiver.Host) error { return nil } +func (jr *jReceiver) buildProcessor(address string, factory apacheThrift.TProtocolFactory) (processors.Processor, error) { + handler := jaegerThrift.NewAgentProcessor(jr) + transport, err := thriftudp.NewTUDPServerTransport(address) + if err != nil { + return nil, err + } + server, err := servers.NewTBufferedServer(transport, defaultAgentQueueSize, defaultAgentMaxPacketSize, metrics.NullFactory) + if err != nil { + return nil, err + } + processor, err := processors.NewThriftProcessor(server, defaultAgentServerWorkers, metrics.NullFactory, factory, handler, jr.logger) + if err != nil { + return nil, err + } + return processor, nil +} + func (jr *jReceiver) startCollector(host receiver.Host) error { tch, terr := tchannel.NewChannel("jaeger-collector", new(tchannel.ChannelOptions)) if terr != nil {