Skip to content

Commit

Permalink
[7.x] Add data_streams.wait_for_integration (backport #5928) (#5929)
Browse files Browse the repository at this point in the history
* Add `data_streams.wait_for_integration` (#5928)

(cherry picked from commit 114d7d6)

# Conflicts:
#	beater/beater.go
#	changelogs/head.asciidoc

* Delete head.asciidoc

* Fix merge conflicts

Co-authored-by: Andrew Wilkins <[email protected]>
  • Loading branch information
mergify[bot] and axw authored Aug 12, 2021
1 parent 5228d81 commit a10e68f
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 8 deletions.
96 changes: 89 additions & 7 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package beater

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
Expand All @@ -28,7 +31,6 @@ import (
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/common/fleetmode"
"github.com/elastic/beats/v7/libbeat/common/transport"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/go-ucfg"
Expand Down Expand Up @@ -121,9 +123,6 @@ func NewCreator(args CreatorParams) beat.Creator {
if b.Manager != nil && b.Manager.Enabled() {
return nil, errors.New("data streams must be enabled when the server is managed")
}
} else if bt.config.DataStreams.Enabled && !fleetmode.Enabled() {
// not supported only available for development purposes
bt.logger.Errorf("Started apm-server with data streams enabled but no active fleet management mode was specified")
}

if err := bt.registerPipelineCallback(b); err != nil {
Expand Down Expand Up @@ -265,7 +264,11 @@ func (r *reloader) reload(rawConfig *common.Config, namespace string, fleetConfi
if err != nil {
return err
}
go runner.run()
go func() {
if err := runner.run(); err != nil {
r.args.Logger.Error(err)
}
}()
// If the old runner exists, cancel it
if r.runner != nil {
r.runner.cancelRunServerContext()
Expand Down Expand Up @@ -355,19 +358,42 @@ func (s *serverRunner) run() error {
Namespace: s.namespace,
}

var kibanaClient kibana_client.Client
if s.config.Kibana.Enabled {
kibanaClient = kibana_client.NewConnectingClient(&s.config.Kibana)
}

cfg := ucfg.Config(*s.rawConfig)
parentCfg := cfg.Parent()
// Check for an environment variable set when running in a cloud environment
if eac := os.Getenv("ELASTIC_AGENT_CLOUD"); eac != "" && s.config.Kibana.Enabled {
// Don't block server startup sending the config.
go func() {
c := kibana_client.NewConnectingClient(&s.config.Kibana)
if err := kibana_client.SendConfig(s.runServerContext, c, parentCfg); err != nil {
if err := kibana_client.SendConfig(s.runServerContext, kibanaClient, parentCfg); err != nil {
s.logger.Infof("failed to upload config to kibana: %v", err)
}
}()
}

fleetManaged := s.beat.Manager != nil && s.beat.Manager.Enabled()
if !fleetManaged && s.config.DataStreams.Enabled && s.config.DataStreams.WaitForIntegration {
// TODO(axw) we should also try querying Elasticsearch in parallel
// (e.g. check for an index template created), for the case where
// there is no Kibana configuration.
if !s.config.Kibana.Enabled {
return errors.New("cannot wait for integration without Kibana config")
}
if err := waitForIntegration(
s.runServerContext,
kibanaClient,
s.config.DataStreams.WaitForIntegrationInterval,
s.tracer,
s.logger,
); err != nil {
return errors.Wrap(err, "error waiting for integration")
}
}

var sourcemapStore *sourcemap.Store
if s.config.RumConfig.Enabled && s.config.RumConfig.SourceMapping.Enabled {
store, err := newSourcemapStore(s.beat.Info, s.config.RumConfig.SourceMapping, s.fleetConfig)
Expand Down Expand Up @@ -673,3 +699,59 @@ func (p *reporterBatchProcessor) ProcessBatch(ctx context.Context, batch *model.
disableTracing, _ := ctx.Value(disablePublisherTracingKey{}).(bool)
return p.reporter(ctx, publish.PendingReq{Transformable: batch, Trace: !disableTracing})
}

// waitForIntegration blocks until Kibana reports that the APM integration
// package is installed, or until ctx is cancelled.
//
// The package status is checked once immediately, and then once per interval
// until it is reported as installed. The polling is recorded in a
// "wait_for_integration" transaction started from tracer; the transaction is
// ended when the function returns.
//
// It returns nil once the integration is installed, ctx.Err() if the context
// is cancelled while waiting for the next check, and an error if interval is
// not positive.
func waitForIntegration(
	ctx context.Context,
	kibanaClient kibana_client.Client,
	interval time.Duration,
	tracer *apm.Tracer,
	logger *logp.Logger,
) error {
	// time.NewTicker panics on a non-positive duration; report a bad
	// configuration value as an error instead of crashing.
	if interval <= 0 {
		return fmt.Errorf("invalid wait_for_integration_interval %s: must be positive", interval)
	}

	logger.Info("waiting for integration package to be installed")
	tx := tracer.StartTransaction("wait_for_integration", "init")
	// The transaction must be ended on every return path, otherwise it is
	// never handed over for reporting.
	defer tx.End()
	ctx = apm.ContextWithTransaction(ctx, tx)

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		// Check first so that an already installed integration does not
		// cost a full interval of startup delay.
		if checkIntegrationInstalled(ctx, kibanaClient, logger) {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// checkIntegrationInstalled asks Kibana for the status of the "apm" package
// and reports whether that status is "installed".
//
// Any failure (request error, non-200 response, or a body that cannot be
// decoded) is logged and reported as false, so callers can simply retry.
func checkIntegrationInstalled(ctx context.Context, kibanaClient kibana_client.Client, logger *logp.Logger) bool {
	const packagePath = "/api/fleet/epm/packages/apm"

	response, sendErr := kibanaClient.Send(ctx, "GET", packagePath, nil, nil, nil)
	if sendErr != nil {
		logger.Errorf("error querying integration package status: %s", sendErr)
		return false
	}
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		// The body is only used to enrich the log message, so a read
		// error is deliberately ignored here.
		payload, _ := ioutil.ReadAll(response.Body)
		logger.Errorf(
			"unexpected status querying integration package status: %s (%s)",
			response.Status, bytes.TrimSpace(payload),
		)
		return false
	}

	// Only the package status is needed from the response document.
	var pkg struct {
		Response struct {
			Status string `json:"status"`
		} `json:"response"`
	}
	decodeErr := json.NewDecoder(response.Body).Decode(&pkg)
	if decodeErr != nil {
		logger.Errorf("error decoding integration package response: %s", decodeErr)
		return false
	}

	status := pkg.Response.Status
	logger.Infof("integration package status: %s", status)
	return status == "installed"
}
11 changes: 11 additions & 0 deletions beater/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ func TestUnpackConfig(t *testing.T) {
},
},
DefaultServiceEnvironment: "overridden",
DataStreams: DataStreamsConfig{
Enabled: false,
WaitForIntegration: true,
WaitForIntegrationInterval: 5 * time.Second,
},
},
},
"merge config with default": {
Expand Down Expand Up @@ -342,6 +347,7 @@ func TestUnpackConfig(t *testing.T) {
"interval": "2m",
"ingest_rate_decay": 1.0,
},
"data_streams.wait_for_integration": false,
},
outCfg: &Config{
Host: "localhost:3000",
Expand Down Expand Up @@ -472,6 +478,11 @@ func TestUnpackConfig(t *testing.T) {
TTL: 30 * time.Minute,
},
},
DataStreams: DataStreamsConfig{
Enabled: false,
WaitForIntegration: false,
WaitForIntegrationInterval: 5 * time.Second,
},
},
},
"kibana trailing slash": {
Expand Down
21 changes: 20 additions & 1 deletion beater/config/data_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,30 @@

package config

import "time"

// DataStreamsConfig holds data streams configuration.
type DataStreamsConfig struct {
	// Enabled controls whether events are written to Elasticsearch data
	// streams instead of indices.
	Enabled bool `config:"enabled"`

	// WaitForIntegration controls whether APM Server waits for the Fleet
	// integration package to be installed before indexing events.
	//
	// This requires a connection to Kibana, and is ignored when running
	// under Elastic Agent; it is intended for running APM Server standalone,
	// relying on Fleet to install the integration for creating Elasticsearch
	// index templates, ILM policies, and ingest pipelines.
	WaitForIntegration bool `config:"wait_for_integration"`

	// WaitForIntegrationInterval holds the interval for checks when waiting
	// for the integration package to be installed.
	WaitForIntegrationInterval time.Duration `config:"wait_for_integration_interval"`
}

// defaultDataStreamsConfig returns the data streams configuration used when
// nothing is configured explicitly: data streams disabled, waiting for the
// integration enabled, and a 5 second interval between integration checks.
func defaultDataStreamsConfig() DataStreamsConfig {
	return DataStreamsConfig{
		Enabled:                    false,
		WaitForIntegration:         true,
		WaitForIntegrationInterval: 5 * time.Second,
	}
}
42 changes: 42 additions & 0 deletions beater/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"net/url"
"os"
"reflect"
Expand Down Expand Up @@ -522,6 +523,47 @@ func TestServerConfigReload(t *testing.T) {
assert.Error(t, err)
}

// TestServerWaitForIntegration starts a beater with data streams enabled
// against a stub Kibana whose package endpoint answers with a 500, then
// "not_installed", then "installed". It asserts that the server only reports
// a listen address after exactly three package status requests.
func TestServerWaitForIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping slow test")
	}

	// Stub Kibana: a status endpoint plus the Fleet package endpoint whose
	// answer depends on how many times it has been queried.
	var requests int
	kibanaMux := http.NewServeMux()
	kibanaMux.HandleFunc("/api/status", func(w http.ResponseWriter, _ *http.Request) {
		w.Write([]byte(`{"version":{"number":"1.2.3"}}`))
	})
	kibanaMux.HandleFunc("/api/fleet/epm/packages/apm", func(w http.ResponseWriter, _ *http.Request) {
		requests++
		if requests == 1 {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		if requests == 2 {
			fmt.Fprintln(w, `{"response":{"status":"not_installed"}}`)
			return
		}
		if requests == 3 {
			fmt.Fprintln(w, `{"response":{"status":"installed"}}`)
		}
	})
	kibana := httptest.NewServer(kibanaMux)
	defer kibana.Close()

	// A short check interval keeps the three polling rounds fast.
	settings := map[string]interface{}{
		"data_streams.enabled":                       true,
		"data_streams.wait_for_integration_interval": "100ms",
		"kibana.enabled":                             true,
		"kibana.host":                                kibana.URL,
	}
	rawCfg := common.MustNewConfigFrom(settings)

	testBeat, beaterCfg := newBeat(t, rawCfg, nil, nil)
	tb, err := newTestBeater(t, testBeat, beaterCfg, nil)
	require.NoError(t, err)
	tb.start()

	_, err = tb.waitListenAddr(30 * time.Second)
	require.NoError(t, err)
	assert.Equal(t, 3, requests)
}

type chanClient struct {
done chan struct{}
Channel chan beat.Event
Expand Down
19 changes: 19 additions & 0 deletions docs/configuration-process.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,22 @@ Default value is 1 second.
==== `max_procs`
Sets the maximum number of CPUs that can be executing simultaneously.
The default is the number of logical CPUs available in the system.

[float]
=== Configuration options: `data_streams`

experimental::[]

[[data_streams.enabled]]
[float]
==== `enabled`
Write events to Elasticsearch data streams instead of indices.
Events will be written to `traces-*`, `logs-*`, and `metrics-*` data streams.
Enabling data streams disables the setup of index templates, ILM policies, and ingest pipelines.
Defaults to false.

[[data_streams.wait_for_integration]]
[float]
==== `wait_for_integration`
Wait for the `apm` Fleet integration to be installed by Kibana before indexing events.
Requires <<kibana-enabled>>, and is ignored when APM Server is running under Elastic Agent.
Defaults to true.

0 comments on commit a10e68f

Please sign in to comment.