Add data_streams.wait_for_integration #5928

Merged · 1 commit · Aug 11, 2021
96 changes: 89 additions & 7 deletions beater/beater.go
@@ -18,8 +18,11 @@
package beater

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
@@ -28,7 +31,6 @@ import (
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/common/fleetmode"
"github.com/elastic/beats/v7/libbeat/common/transport"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/go-ucfg"
@@ -121,9 +123,6 @@ func NewCreator(args CreatorParams) beat.Creator {
if b.Manager != nil && b.Manager.Enabled() {
return nil, errors.New("data streams must be enabled when the server is managed")
}
} else if bt.config.DataStreams.Enabled && !fleetmode.Enabled() {
// not supported only available for development purposes
bt.logger.Errorf("Started apm-server with data streams enabled but no active fleet management mode was specified")
}

if err := bt.registerPipelineCallback(b); err != nil {
@@ -265,7 +264,11 @@ func (r *reloader) reload(rawConfig *common.Config, namespace string, fleetConfi
if err != nil {
return err
}
go runner.run()
go func() {
if err := runner.run(); err != nil {
r.args.Logger.Error(err)
}
}()
// If the old runner exists, cancel it
if r.runner != nil {
r.runner.cancelRunServerContext()
@@ -355,19 +358,42 @@ func (s *serverRunner) run() error {
Namespace: s.namespace,
}

var kibanaClient kibana_client.Client
if s.config.Kibana.Enabled {
kibanaClient = kibana_client.NewConnectingClient(&s.config.Kibana)
}

cfg := ucfg.Config(*s.rawConfig)
parentCfg := cfg.Parent()
// Check for an environment variable set when running in a cloud environment
if eac := os.Getenv("ELASTIC_AGENT_CLOUD"); eac != "" && s.config.Kibana.Enabled {
// Don't block server startup sending the config.
go func() {
c := kibana_client.NewConnectingClient(&s.config.Kibana)
if err := kibana_client.SendConfig(s.runServerContext, c, parentCfg); err != nil {
if err := kibana_client.SendConfig(s.runServerContext, kibanaClient, parentCfg); err != nil {
s.logger.Infof("failed to upload config to kibana: %v", err)
}
}()
}

fleetManaged := s.beat.Manager != nil && s.beat.Manager.Enabled()
if !fleetManaged && s.config.DataStreams.Enabled && s.config.DataStreams.WaitForIntegration {
// TODO(axw) we should also try querying Elasticsearch in parallel
// (e.g. check for an index template created), for the case where
// there is no Kibana configuration.
if !s.config.Kibana.Enabled {
return errors.New("cannot wait for integration without Kibana config")
}
if err := waitForIntegration(
s.runServerContext,
kibanaClient,
s.config.DataStreams.WaitForIntegrationInterval,
s.tracer,
s.logger,
); err != nil {
return errors.Wrap(err, "error waiting for integration")
}
}

var sourcemapStore *sourcemap.Store
if s.config.RumConfig.Enabled && s.config.RumConfig.SourceMapping.Enabled {
store, err := newSourcemapStore(s.beat.Info, s.config.RumConfig.SourceMapping, s.fleetConfig)
@@ -674,3 +700,59 @@ func (p *reporterBatchProcessor) ProcessBatch(ctx context.Context, batch *model.
disableTracing, _ := ctx.Value(disablePublisherTracingKey{}).(bool)
return p.reporter(ctx, publish.PendingReq{Transformable: batch, Trace: !disableTracing})
}

// waitForIntegration waits for the APM integration to be installed by querying Kibana,
// or for the context to be cancelled.
func waitForIntegration(
ctx context.Context,
kibanaClient kibana_client.Client,
interval time.Duration,
tracer *apm.Tracer,
logger *logp.Logger,
) error {
logger.Info("waiting for integration package to be installed")
tx := tracer.StartTransaction("wait_for_integration", "init")
ctx = apm.ContextWithTransaction(ctx, tx)
var ticker *time.Ticker
for {
if ticker == nil {
ticker = time.NewTicker(interval)
defer ticker.Stop()
} else {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
}
if checkIntegrationInstalled(ctx, kibanaClient, logger) {
return nil
}
}
}

func checkIntegrationInstalled(ctx context.Context, kibanaClient kibana_client.Client, logger *logp.Logger) bool {
resp, err := kibanaClient.Send(ctx, "GET", "/api/fleet/epm/packages/apm", nil, nil, nil)
if err != nil {
logger.Errorf("error querying integration package status: %s", err)
return false
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
logger.Errorf("unexpected status querying integration package status: %s (%s)", resp.Status, bytes.TrimSpace(body))
return false
}
var result struct {
Response struct {
Status string `json:"status"`
} `json:"response"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
logger.Errorf("error decoding integration package response: %s", err)
return false
}
logger.Infof("integration package status: %s", result.Response.Status)
return result.Response.Status == "installed"
}
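
For readers skimming the diff, here is a minimal, self-contained sketch of the poll-until-ready pattern that `waitForIntegration` uses: check once immediately, then retry on a ticker until the check succeeds or the context is cancelled. The `pollUntil` helper and the fake check below are illustrative only and are not part of this change (the real code also wraps the wait in an APM transaction and queries Kibana).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pollUntil runs check immediately, then once per interval,
// until check returns true or ctx is cancelled.
func pollUntil(ctx context.Context, interval time.Duration, check func(context.Context) bool) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if check(ctx) {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	attempts := 0
	err := pollUntil(ctx, 100*time.Millisecond, func(context.Context) bool {
		attempts++
		return attempts == 3 // pretend the integration shows up on the third check
	})
	if errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("gave up waiting for the integration")
		return
	}
	fmt.Printf("integration ready after %d checks\n", attempts)
}
```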
11 changes: 11 additions & 0 deletions beater/config/config_test.go
@@ -289,6 +289,11 @@ func TestUnpackConfig(t *testing.T) {
},
},
DefaultServiceEnvironment: "overridden",
DataStreams: DataStreamsConfig{
Enabled: false,
WaitForIntegration: true,
WaitForIntegrationInterval: 5 * time.Second,
},
},
},
"merge config with default": {
@@ -342,6 +347,7 @@
"interval": "2m",
"ingest_rate_decay": 1.0,
},
"data_streams.wait_for_integration": false,
},
outCfg: &Config{
Host: "localhost:3000",
@@ -472,6 +478,11 @@ func TestUnpackConfig(t *testing.T) {
TTL: 30 * time.Minute,
},
},
DataStreams: DataStreamsConfig{
Enabled: false,
WaitForIntegration: false,
WaitForIntegrationInterval: 5 * time.Second,
},
},
},
"kibana trailing slash": {
21 changes: 20 additions & 1 deletion beater/config/data_streams.go
@@ -17,11 +17,30 @@

package config

import "time"

// DataStreamsConfig holds data streams configuration.
type DataStreamsConfig struct {
Enabled bool `config:"enabled"`

// WaitForIntegration controls whether APM Server waits for the Fleet
// integration package to be installed before indexing events.
//
// This requires a connection to Kibana, and is ignored when running
// under Elastic Agent; it is intended for running APM Server standalone,
// relying on Fleet to install the integration for creating Elasticsearch
// index templates, ILM policies, and ingest pipelines.
WaitForIntegration bool `config:"wait_for_integration"`

// WaitForIntegrationInterval holds the interval for checks when waiting
// for the integration package to be installed.
WaitForIntegrationInterval time.Duration `config:"wait_for_integration_interval"`
}

func defaultDataStreamsConfig() DataStreamsConfig {
return DataStreamsConfig{Enabled: false}
return DataStreamsConfig{
Enabled: false,
WaitForIntegration: true,
WaitForIntegrationInterval: 5 * time.Second,
}
}
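
To make the defaults above concrete, the following standalone sketch shows how a single overridden key would round-trip through libbeat's config unpacking. The local `dataStreamsConfig` struct mirrors `DataStreamsConfig` for illustration only; the tags and default values come from this diff, everything else is assumed.

```go
package main

import (
	"fmt"
	"time"

	"github.com/elastic/beats/v7/libbeat/common"
)

// dataStreamsConfig is a local mirror of DataStreamsConfig, for illustration only.
type dataStreamsConfig struct {
	Enabled                    bool          `config:"enabled"`
	WaitForIntegration         bool          `config:"wait_for_integration"`
	WaitForIntegrationInterval time.Duration `config:"wait_for_integration_interval"`
}

func main() {
	// Start from the defaults introduced in this PR.
	cfg := dataStreamsConfig{
		Enabled:                    false,
		WaitForIntegration:         true,
		WaitForIntegrationInterval: 5 * time.Second,
	}

	// Override one key, as apm-server.yml would under the data_streams namespace.
	raw := common.MustNewConfigFrom(map[string]interface{}{
		"wait_for_integration_interval": "10s",
	})
	if err := raw.Unpack(&cfg); err != nil {
		panic(err)
	}

	// Enabled and WaitForIntegration keep their defaults; the interval is now 10s.
	fmt.Printf("%+v\n", cfg)
}
```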
42 changes: 42 additions & 0 deletions beater/server_test.go
@@ -24,6 +24,7 @@ import (
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"net/url"
"os"
"reflect"
@@ -522,6 +523,47 @@ func TestServerConfigReload(t *testing.T) {
assert.Error(t, err)
}

func TestServerWaitForIntegration(t *testing.T) {
if testing.Short() {
t.Skip("skipping slow test")
}

var requests int
mux := http.NewServeMux()
mux.HandleFunc("/api/status", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"version":{"number":"1.2.3"}}`))
})
mux.HandleFunc("/api/fleet/epm/packages/apm", func(w http.ResponseWriter, r *http.Request) {
requests++
switch requests {
case 1:
w.WriteHeader(500)
case 2:
fmt.Fprintln(w, `{"response":{"status":"not_installed"}}`)
case 3:
fmt.Fprintln(w, `{"response":{"status":"installed"}}`)
}
})
srv := httptest.NewServer(mux)
defer srv.Close()

cfg := common.MustNewConfigFrom(map[string]interface{}{
"data_streams.enabled": true,
"data_streams.wait_for_integration_interval": "100ms",
"kibana.enabled": true,
"kibana.host": srv.URL,
})

beat, cfg := newBeat(t, cfg, nil, nil)
tb, err := newTestBeater(t, beat, cfg, nil)
require.NoError(t, err)
tb.start()

_, err = tb.waitListenAddr(30 * time.Second)
require.NoError(t, err)
assert.Equal(t, 3, requests)
}

type chanClient struct {
done chan struct{}
Channel chan beat.Event
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
@@ -22,6 +22,7 @@ https:/elastic/apm-server/compare/7.13\...master[View commits]
==== Added
- `service_destination` span metrics now take into account composite spans {pull}5896[5896]
- add zero-downtime config reloads via `SO_REUSEPORT` {pull}5911[5911]
- experimental support for writing data streams in standalone mode {pull}5928[5928]

[float]
==== Deprecated
19 changes: 19 additions & 0 deletions docs/configuration-process.asciidoc
@@ -184,3 +184,22 @@ Default value is 1 second.
==== `max_procs`
Sets the maximum number of CPUs that can be executing simultaneously.
The default is the number of logical CPUs available in the system.

[float]
=== Configuration options: `data_streams`

experimental::[]

[[data_streams.enabled]]
[float]
==== `enabled`
Write events to Elasticsearch data streams instead of indices.
Events will be written to `traces-*`, `logs-*`, and `metrics-*` data streams.
Enabling data streams disables the setup of index templates, ILM policies, and ingest pipelines.
Defaults to false.

[[data_streams.wait_for_integration]]
[float]
==== `wait_for_integration`
Wait for the `apm` Fleet integration to be installed by Kibana. Requires <<kibana-enabled>>.
Defaults to true.
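
For reference, the check behind `wait_for_integration` boils down to polling Kibana's Fleet EPM API until the `apm` package reports `installed`, as shown in `checkIntegrationInstalled` above. The sketch below queries the same endpoint directly with `net/http`; apm-server itself goes through its internal Kibana client, and a real Kibana deployment will typically also require authentication, which is omitted here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// apmPackageInstalled reports whether Kibana says the apm integration
// package is installed. Illustrative only; authentication is omitted.
func apmPackageInstalled(kibanaURL string) (bool, error) {
	resp, err := http.Get(kibanaURL + "/api/fleet/epm/packages/apm")
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return false, fmt.Errorf("unexpected status: %s", resp.Status)
	}
	var result struct {
		Response struct {
			Status string `json:"status"`
		} `json:"response"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return false, err
	}
	return result.Response.Status == "installed", nil
}

func main() {
	installed, err := apmPackageInstalled("http://localhost:5601")
	if err != nil {
		fmt.Println("check failed:", err)
		return
	}
	fmt.Println("apm package installed:", installed)
}
```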