Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Jaeger Agent Configuration #434

Merged
merged 29 commits into from
Dec 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6640ea1
Added agent protocol config wiring. Removed agent http server
joe-elliott Nov 22, 2019
46dbf35
Conditionally start agent protocols based on config
joe-elliott Nov 22, 2019
c705302
Bail out of startAgent if nothing is configured
joe-elliott Nov 22, 2019
fd4799f
Updated readme
joe-elliott Nov 22, 2019
b131c9b
Added tests
joe-elliott Nov 23, 2019
88a60d0
Removed binary test because opencensus jaeger exporter doesn't suppor…
joe-elliott Nov 23, 2019
c030827
Corrected test to expect jaeger format and removed redundant test
joe-elliott Nov 23, 2019
5b284b7
Added independently configurable agent processors
joe-elliott Nov 25, 2019
8c024bc
Added config tests
joe-elliott Nov 25, 2019
c945fce
Added support for http agent
joe-elliott Nov 25, 2019
a734960
Fixed testbed tests
joe-elliott Nov 25, 2019
073410c
Fixed imports
joe-elliott Nov 25, 2019
96a6fd9
Improved coverage in factory.go
joe-elliott Nov 25, 2019
12937ff
Added http proxy tests
joe-elliott Nov 25, 2019
dd99d1e
Moved location of the testdata reference to show it refers to both ag…
joe-elliott Nov 26, 2019
1e2623e
Replaced hardcoded port with dynamic
joe-elliott Nov 26, 2019
78dcc94
Synchronously stop processors
joe-elliott Nov 26, 2019
5a3ce04
Added testutils method to wait for a port and used it to wait for the…
joe-elliott Nov 26, 2019
8657535
Cleaned up gross wait function
joe-elliott Nov 27, 2019
3a10dfb
Added WaitForPort Test
joe-elliott Nov 27, 2019
651ea96
Fixed testutils error. Added baggageRestrictions test
joe-elliott Nov 27, 2019
787d867
Added tests for port in use and trace source
joe-elliott Nov 27, 2019
899a22c
Pass logger to processors
joe-elliott Nov 27, 2019
c9a1e78
Removed flaky tests
joe-elliott Nov 27, 2019
18b41d6
Added test to confirm binary thrift opens the right port
joe-elliott Nov 27, 2019
234fdb5
Added tests to to confirm invalid ports would not start
joe-elliott Nov 27, 2019
511ead5
Only call startAgent to avoid startCollector race condition
joe-elliott Nov 27, 2019
ca2d038
Migrated assert.NoError
joe-elliott Dec 4, 2019
37dae1d
Consolidated similar code into a one function
joe-elliott Dec 4, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ receivers:
endpoint: "localhost:14268"
thrift-tchannel:
endpoint: "localhost:14267"
thrift-compact:
endpoint: "localhost:6831"
thrift-binary:
endpoint: "localhost:6832"

prometheus:
config:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ require (
github.com/stretchr/testify v1.4.0
github.com/uber-go/atomic v1.4.0 // indirect
github.com/uber/jaeger-client-go v2.16.0+incompatible // indirect
github.com/uber/jaeger-lib v2.0.0+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible
github.com/uber/tchannel-go v1.10.0
go.opencensus.io v0.22.1
go.uber.org/zap v1.10.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7 h1:Fv9bK1Q+ly/ROk4aJsVMeuIwPel4bEnD8EPiI91nZMg=
github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
Expand Down Expand Up @@ -553,6 +554,8 @@ github.com/uber/jaeger-client-go v2.16.0+incompatible h1:Q2Pp6v3QYiocMxomCaJuwQG
github.com/uber/jaeger-client-go v2.16.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.0.0+incompatible h1:iMSCV0rmXEogjNWPh2D0xk9YVKvrtGoHJNe9ebLu/pw=
github.com/uber/jaeger-lib v2.0.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/uber/tchannel-go v1.10.0 h1:YOihLHuvkwT3nzvpgqFtexFW+pb5vD1Tz7h/bIWApgE=
github.com/uber/tchannel-go v1.10.0/go.mod h1:Rrgz1eL8kMjW/nEzZos0t+Heq0O4LhnUJVA32OvWKHo=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
Expand Down
22 changes: 22 additions & 0 deletions internal/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package testutils

import (
"encoding/json"
"fmt"
"net"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -67,3 +69,23 @@ func GetAvailablePort(t *testing.T) uint16 {

return uint16(portInt)
}

// WaitForPort repeatedly attempts to open a local port until it either succeeds or 5 seconds pass
// It is useful if you need to asynchronously start a service and wait for it to start
func WaitForPort(t *testing.T, port uint16) error {
t.Helper()

totalDuration := 5 * time.Second
wait := 100 * time.Millisecond
address := fmt.Sprintf("localhost:%d", port)
for i := totalDuration; i > 0; i -= wait {
conn, err := net.Dial("tcp", address)

if err == nil && conn != nil {
conn.Close()
return nil
}
time.Sleep(wait)
}
return fmt.Errorf("failed to wait for port %d", port)
}
17 changes: 17 additions & 0 deletions internal/testutils/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package testutils

import (
"fmt"
"net"
"strconv"
"testing"
Expand All @@ -33,6 +34,22 @@ func TestGetAvailablePort(t *testing.T) {
testEndpointAvailable(t, "localhost:"+portStr)
}

func TestWaitForPort(t *testing.T) {
port := GetAvailablePort(t)
err := WaitForPort(t, port)
require.Error(t, err)

port = GetAvailablePort(t)
l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
require.NoError(t, err)

err = WaitForPort(t, port)
require.NoError(t, err)

err = l.Close()
require.NoError(t, err)
}

func testEndpointAvailable(t *testing.T, endpoint string) {
// Endpoint should be free.
ln0, err := net.Listen("tcp", endpoint)
Expand Down
8 changes: 7 additions & 1 deletion receiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ This receiver receives traces in the [Jaeger](https://www.jaegertracing.io)
format. It translates them into the internal format and sends
it to processors and exporters.

It supports multiple protocols:
It supports the Jaeger Collector protocols:
- Thrift HTTP
- Thrift TChannel
- gRPC
Expand All @@ -120,6 +120,12 @@ receivers:
jaeger:
```

It also supports the Jaeger Agent protocols:
- Thrift Compact
- Thrift Binary

By default, these services are not started unless an endpoint is explicitly defined.

It is possible to configure the protocols on different ports, refer to
[config.yaml](jaegerreceiver/testdata/config.yaml) for detailed config
examples.
Expand Down
10 changes: 10 additions & 0 deletions receiver/jaegerreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ func TestLoadConfig(t *testing.T) {
Endpoint: "0.0.0.0:123",
},
},
"thrift-compact": {
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: "0.0.0.0:456",
},
},
"thrift-binary": {
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: "0.0.0.0:789",
},
},
},
})

Expand Down
30 changes: 26 additions & 4 deletions receiver/jaegerreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
// TODO https:/open-telemetry/opentelemetry-collector/issues/267
// Remove ThriftTChannel support.
protoThriftTChannel = "thrift-tchannel"
protoThriftBinary = "thrift-binary"
protoThriftCompact = "thrift-compact"

// Default endpoints to bind to.
defaultGRPCBindEndpoint = "localhost:14250"
Expand Down Expand Up @@ -103,6 +105,8 @@ func (f *Factory) CreateTraceReceiver(
protoGRPC := rCfg.Protocols[protoGRPC]
protoHTTP := rCfg.Protocols[protoThriftHTTP]
protoTChannel := rCfg.Protocols[protoThriftTChannel]
protoThriftCompact := rCfg.Protocols[protoThriftCompact]
protoThriftBinary := rCfg.Protocols[protoThriftBinary]

config := Configuration{}
var grpcServerOptions []grpc.ServerOption
Expand Down Expand Up @@ -141,19 +145,37 @@ func (f *Factory) CreateTraceReceiver(
}
}

if (protoGRPC == nil && protoHTTP == nil && protoTChannel == nil) ||
(config.CollectorGRPCPort == 0 && config.CollectorHTTPPort == 0 && config.CollectorThriftPort == 0) {
err := fmt.Errorf("either %v, %v, or %v protocol endpoint with non-zero port must be enabled for %s receiver",
if protoThriftBinary != nil && protoThriftBinary.IsEnabled() {
var err error
config.AgentBinaryThriftPort, err = extractPortFromEndpoint(protoThriftBinary.Endpoint)
if err != nil {
return nil, err
}
}

if protoThriftCompact != nil && protoThriftCompact.IsEnabled() {
var err error
config.AgentCompactThriftPort, err = extractPortFromEndpoint(protoThriftCompact.Endpoint)
if err != nil {
return nil, err
}
}

if (protoGRPC == nil && protoHTTP == nil && protoTChannel == nil && protoThriftBinary == nil && protoThriftCompact == nil) ||
(config.CollectorGRPCPort == 0 && config.CollectorHTTPPort == 0 && config.CollectorThriftPort == 0 && config.AgentBinaryThriftPort == 0 && config.AgentCompactThriftPort == 0) {
err := fmt.Errorf("either %v, %v, %v, %v, or %v protocol endpoint with non-zero port must be enabled for %s receiver",
protoGRPC,
protoThriftHTTP,
protoThriftTChannel,
protoThriftCompact,
protoThriftBinary,
typeStr,
)
return nil, err
}

// Create the receiver.
return New(ctx, &config, nextConsumer)
return New(ctx, &config, nextConsumer, logger)
}

// CreateMetricsReceiver creates a metrics receiver based on provided config.
Expand Down
30 changes: 30 additions & 0 deletions receiver/jaegerreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
"github.com/open-telemetry/opentelemetry-collector/config/configerror"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/receiver"
)

func TestCreateDefaultConfig(t *testing.T) {
Expand Down Expand Up @@ -75,6 +77,34 @@ func TestCreateInvalidTChannelEndpoint(t *testing.T) {
assert.Error(t, err, "receiver creation with invalid tchannel endpoint must fail")
}

func TestCreateInvalidThriftBinaryEndpoint(t *testing.T) {
factory := Factory{}
cfg := factory.CreateDefaultConfig()
rCfg := cfg.(*Config)

rCfg.Protocols[protoThriftBinary] = &receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: "",
},
}
_, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil)
assert.Error(t, err, "receiver creation with no endpoints must fail")
}

func TestCreateInvalidThriftCompactEndpoint(t *testing.T) {
factory := Factory{}
cfg := factory.CreateDefaultConfig()
rCfg := cfg.(*Config)

rCfg.Protocols[protoThriftCompact] = &receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: "",
},
}
_, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, nil)
assert.Error(t, err, "receiver creation with no endpoints must fail")
}

func TestCreateNoPort(t *testing.T) {
factory := Factory{}
cfg := factory.CreateDefaultConfig()
Expand Down
Loading