Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate Jaeger gRPC collector #2976

Merged
merged 21 commits into from
Dec 4, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
1,079 changes: 986 additions & 93 deletions NOTICE.txt

Large diffs are not rendered by default.

65 changes: 9 additions & 56 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,25 @@
package beater

import (
"bufio"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"os"
"sync"
"time"

"go.elastic.co/apm"
"go.elastic.co/apm/transport"
"golang.org/x/sync/errgroup"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/elasticsearch"
"go.elastic.co/apm"
"go.elastic.co/apm/transport"

"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/ingest/pipeline"
logs "github.com/elastic/apm-server/log"
"github.com/elastic/apm-server/log"
"github.com/elastic/apm-server/pipelistener"
"github.com/elastic/apm-server/publish"
)
Expand All @@ -51,10 +47,10 @@ func init() {

type beater struct {
config *config.Config
logger *logp.Logger
mutex sync.Mutex // guards server and stopped
server *http.Server
server server
stopped bool
logger *logp.Logger
}

var (
Expand Down Expand Up @@ -188,56 +184,15 @@ func (bt *beater) Run(b *beat.Beat) error {
return nil
}

bt.server, err = newServer(bt.config, tracer, pub.Send)
bt.server, err = newServer(bt.logger, bt.config, tracer, pub.Send)
if err != nil {
bt.logger.Error("failed to create new server:", err)
return nil
}
bt.mutex.Unlock()

var g errgroup.Group
g.Go(func() error {
return run(bt.logger, bt.server, lis, bt.config)
})

if bt.isServerAvailable(bt.config.ShutdownTimeout) {
go notifyListening(bt.config, pub.Client().Publish)
}

if traceListener != nil {
g.Go(func() error {
return bt.server.Serve(traceListener)
})
}

if err := g.Wait(); err != http.ErrServerClosed {
return err
}
bt.logger.Infof("Server stopped")
return nil
}

func (bt *beater) isServerAvailable(timeout time.Duration) bool {
// following an example from https://golang.org/pkg/net/
// dial into tcp connection to ensure listener is ready, send get request and read response,
// in case tls is enabled, the server will respond with 400,
// as this only checks the server is up and reachable errors can be ignored
conn, err := net.DialTimeout("tcp", bt.config.Host, timeout)
if err != nil {
return false
}
err = conn.SetReadDeadline(time.Now().Add(timeout))
if err != nil {
return false
}
fmt.Fprintf(conn, "GET / HTTP/1.0\r\n\r\n")
_, err = bufio.NewReader(conn).ReadByte()
if err != nil {
return false
}

err = conn.Close()
return err == nil
//blocking until shutdown
return bt.server.run(lis, traceListener, pub.Client().Publish)
}

// initTracer configures and returns an apm.Tracer for tracing
Expand Down Expand Up @@ -335,9 +290,7 @@ func (bt *beater) Stop() {
bt.logger.Infof("stopping apm-server... waiting maximum of %v seconds for queues to drain",
bt.config.ShutdownTimeout.Seconds())
bt.mutex.Lock()
if bt.server != nil {
stop(bt.logger, bt.server)
}
bt.server.stop(bt.logger)
bt.stopped = true
bt.mutex.Unlock()
}
4 changes: 2 additions & 2 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (bt *beater) client(insecure bool) (string, *http.Client) {

bt.mutex.Lock() // for reading bt.server
defer bt.mutex.Unlock()
if parsed, err := url.Parse(bt.server.Addr); err == nil && parsed.Scheme == "unix" {
if parsed, err := url.Parse(bt.server.httpServer.Addr); err == nil && parsed.Scheme == "unix" {
transport.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", parsed.Path)
}
Expand All @@ -81,7 +81,7 @@ func (bt *beater) wait() error {
go func() {
for {
bt.mutex.Lock()
if bt.server != nil {
if bt.server.httpServer != nil {
bt.mutex.Unlock()
break
}
Expand Down
124 changes: 124 additions & 0 deletions beater/http_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beater

import (
"context"
"net"
"net/http"

"go.elastic.co/apm"
"go.elastic.co/apm/module/apmhttp"
"golang.org/x/net/netutil"

"github.com/elastic/beats/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/libbeat/logp"

"github.com/elastic/apm-server/beater/api"
"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/publish"
)

type httpServer struct {
*http.Server
cfg *config.Config
logger *logp.Logger
}

func newHTTPServer(logger *logp.Logger, cfg *config.Config, tracer *apm.Tracer, reporter publish.Reporter) (*httpServer, error) {
mux, err := api.NewMux(cfg, reporter)
if err != nil {
return nil, err
}

server := &http.Server{
Addr: cfg.Host,
Handler: apmhttp.Wrap(mux,
apmhttp.WithServerRequestIgnorer(doNotTrace),
apmhttp.WithTracer(tracer),
),
IdleTimeout: cfg.IdleTimeout,
ReadTimeout: cfg.ReadTimeout,
WriteTimeout: cfg.WriteTimeout,
MaxHeaderBytes: cfg.MaxHeaderSize,
}

if cfg.TLS.IsEnabled() {
tlsServerConfig, err := tlscommon.LoadTLSServerConfig(cfg.TLS)
if err != nil {
return nil, err
}
server.TLSConfig = tlsServerConfig.BuildModuleConfig(cfg.Host)
}
return &httpServer{server, cfg, logger}, nil
}

func (h *httpServer) start(lis net.Listener) error {
h.logger.Infof("Listening on: %s", h.Server.Addr)
switch h.cfg.RumConfig.IsEnabled() {
case true:
h.logger.Info("RUM endpoints enabled!")
for _, s := range h.cfg.RumConfig.AllowOrigins {
if s == "*" {
h.logger.Warn("CORS related setting `apm-server.rum.allow_origins` allows all origins. Consider more restrictive setting for production use.")
break
}
}
case false:
h.logger.Info("RUM endpoints disabled.")
}

if h.cfg.MaxConnections > 0 {
lis = netutil.LimitListener(lis, h.cfg.MaxConnections)
h.logger.Infof("Connection limit set to: %d", h.cfg.MaxConnections)
}

if h.TLSConfig != nil {
h.logger.Info("SSL enabled.")
return h.ServeTLS(lis, "", "")
}
if h.cfg.SecretToken != "" {
h.logger.Warn("Secret token is set, but SSL is not enabled.")
}
h.logger.Info("SSL disabled.")
return h.Serve(lis)

}

func (h *httpServer) stop() {
h.logger.Infof("Stop listening on: %s", h.Server.Addr)
if err := h.Shutdown(context.Background()); err != nil {
h.logger.Errorf("error stopping http server: %s", err.Error())
if err := h.Close(); err != nil {
h.logger.Errorf("error closing http server: %s", err.Error())
}
}
}

func doNotTrace(req *http.Request) bool {
if req.RemoteAddr == "pipe" {
// Don't trace requests coming from self,
// or we will go into a continuous cycle.
return true
}
if req.URL.Path == api.RootPath {
// Don't trace root url (healthcheck) requests.
return true
}
return false
}
18 changes: 10 additions & 8 deletions beater/onboarding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,33 @@ import (
"github.com/stretchr/testify/require"
"go.elastic.co/apm"

"github.com/elastic/apm-server/beater/beatertest"
"github.com/elastic/apm-server/beater/config"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"

"github.com/elastic/apm-server/beater/beatertest"
"github.com/elastic/apm-server/beater/config"
)

func TestNotifyUpServerDown(t *testing.T) {
config := config.DefaultConfig("7.0.0")
cfg := config.DefaultConfig("7.0.0")
var saved beat.Event
var publisher = func(e beat.Event) { saved = e }

lis, err := net.Listen("tcp", "localhost:0")
assert.NoError(t, err)
defer lis.Close()
config.Host = lis.Addr().String()
cfg.Host = lis.Addr().String()

server, err := newServer(config, apm.DefaultTracer, beatertest.NilReporter)
logger := logp.NewLogger("onboarding_test")
server, err := newServer(logger, cfg, apm.DefaultTracer, beatertest.NilReporter)
require.NoError(t, err)
go run(logp.NewLogger("onboarding_test"), server, lis, config)
go server.run(lis, nil, publisher)

notifyListening(config, publisher)
notifyListening(cfg, publisher)

listening := saved.Fields["observer"].(common.MapStr)["listening"]
assert.Equal(t, config.Host, listening)
assert.Equal(t, cfg.Host, listening)

processor := saved.Fields["processor"].(common.MapStr)
assert.Equal(t, "onboarding", processor["name"])
Expand Down
Loading