diff --git a/beater/beater.go b/beater/beater.go
index 8b9ff158514..f25b29b4be5 100644
--- a/beater/beater.go
+++ b/beater/beater.go
@@ -18,8 +18,11 @@
 package beater
 
 import (
+	"bytes"
 	"context"
+	"encoding/json"
 	"fmt"
+	"io/ioutil"
 	"net"
 	"net/http"
 	"os"
@@ -28,7 +31,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/elastic/beats/v7/libbeat/common/fleetmode"
 	"github.com/elastic/beats/v7/libbeat/common/transport"
 	"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
 	"github.com/elastic/go-ucfg"
@@ -121,9 +123,6 @@ func NewCreator(args CreatorParams) beat.Creator {
 			if b.Manager != nil && b.Manager.Enabled() {
 				return nil, errors.New("data streams must be enabled when the server is managed")
 			}
-		} else if bt.config.DataStreams.Enabled && !fleetmode.Enabled() {
-			// not supported only available for development purposes
-			bt.logger.Errorf("Started apm-server with data streams enabled but no active fleet management mode was specified")
 		}
 
 		if err := bt.registerPipelineCallback(b); err != nil {
@@ -265,7 +264,11 @@ func (r *reloader) reload(rawConfig *common.Config, namespace string, fleetConfi
 	if err != nil {
 		return err
 	}
-	go runner.run()
+	go func() {
+		if err := runner.run(); err != nil {
+			r.args.Logger.Error(err)
+		}
+	}()
 	// If the old runner exists, cancel it
 	if r.runner != nil {
 		r.runner.cancelRunServerContext()
@@ -355,19 +358,42 @@ func (s *serverRunner) run() error {
 		Namespace: s.namespace,
 	}
 
+	var kibanaClient kibana_client.Client
+	if s.config.Kibana.Enabled {
+		kibanaClient = kibana_client.NewConnectingClient(&s.config.Kibana)
+	}
+
 	cfg := ucfg.Config(*s.rawConfig)
 	parentCfg := cfg.Parent()
 	// Check for an environment variable set when running in a cloud environment
 	if eac := os.Getenv("ELASTIC_AGENT_CLOUD"); eac != "" && s.config.Kibana.Enabled {
 		// Don't block server startup sending the config.
 		go func() {
-			c := kibana_client.NewConnectingClient(&s.config.Kibana)
-			if err := kibana_client.SendConfig(s.runServerContext, c, parentCfg); err != nil {
+			if err := kibana_client.SendConfig(s.runServerContext, kibanaClient, parentCfg); err != nil {
 				s.logger.Infof("failed to upload config to kibana: %v", err)
 			}
 		}()
 	}
 
+	fleetManaged := s.beat.Manager != nil && s.beat.Manager.Enabled()
+	if !fleetManaged && s.config.DataStreams.Enabled && s.config.DataStreams.WaitForIntegration {
+		// TODO(axw) we should also try querying Elasticsearch in parallel
+		// (e.g. check for an index template created), for the case where
+		// there is no Kibana configuration.
+		if !s.config.Kibana.Enabled {
+			return errors.New("cannot wait for integration without Kibana config")
+		}
+		if err := waitForIntegration(
+			s.runServerContext,
+			kibanaClient,
+			s.config.DataStreams.WaitForIntegrationInterval,
+			s.tracer,
+			s.logger,
+		); err != nil {
+			return errors.Wrap(err, "error waiting for integration")
+		}
+	}
+
 	var sourcemapStore *sourcemap.Store
 	if s.config.RumConfig.Enabled && s.config.RumConfig.SourceMapping.Enabled {
 		store, err := newSourcemapStore(s.beat.Info, s.config.RumConfig.SourceMapping, s.fleetConfig)
@@ -674,3 +700,59 @@ func (p *reporterBatchProcessor) ProcessBatch(ctx context.Context, batch *model.
 	disableTracing, _ := ctx.Value(disablePublisherTracingKey{}).(bool)
 	return p.reporter(ctx, publish.PendingReq{Transformable: batch, Trace: !disableTracing})
 }
+
+// waitForIntegration waits for the APM integration to be installed by querying Kibana,
+// or for the context to be cancelled.
+func waitForIntegration(
+	ctx context.Context,
+	kibanaClient kibana_client.Client,
+	interval time.Duration,
+	tracer *apm.Tracer,
+	logger *logp.Logger,
+) error {
+	logger.Info("waiting for integration package to be installed")
+	tx := tracer.StartTransaction("wait_for_integration", "init")
+	ctx = apm.ContextWithTransaction(ctx, tx)
+	var ticker *time.Ticker
+	for {
+		if ticker == nil {
+			ticker = time.NewTicker(interval)
+			defer ticker.Stop()
+		} else {
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case <-ticker.C:
+			}
+		}
+		if checkIntegrationInstalled(ctx, kibanaClient, logger) {
+			return nil
+		}
+	}
+}
+
+func checkIntegrationInstalled(ctx context.Context, kibanaClient kibana_client.Client, logger *logp.Logger) bool {
+	resp, err := kibanaClient.Send(ctx, "GET", "/api/fleet/epm/packages/apm", nil, nil, nil)
+	if err != nil {
+		logger.Errorf("error querying integration package status: %s", err)
+		return false
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		body, _ := ioutil.ReadAll(resp.Body)
+		logger.Errorf("unexpected status querying integration package status: %s (%s)", resp.Status, bytes.TrimSpace(body))
+		return false
+	}
+	var result struct {
+		Response struct {
+			Status string `json:"status"`
+		} `json:"response"`
+	}
+	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
+		logger.Errorf("error decoding integration package response: %s", err)
+		return false
+	}
+	logger.Infof("integration package status: %s", result.Response.Status)
+	return result.Response.Status == "installed"
+}
diff --git a/beater/config/config_test.go b/beater/config/config_test.go
index b316352f553..c095d3ae617 100644
--- a/beater/config/config_test.go
+++ b/beater/config/config_test.go
@@ -289,6 +289,11 @@ func TestUnpackConfig(t *testing.T) {
 					},
 				},
 				DefaultServiceEnvironment: "overridden",
+				DataStreams: DataStreamsConfig{
+					Enabled:                    false,
+					WaitForIntegration:         true,
+					WaitForIntegrationInterval: 5 * time.Second,
+				},
 			},
 		},
 		"merge config with default": {
@@ -342,6 +347,7 @@ func TestUnpackConfig(t *testing.T) {
 					"interval":          "2m",
 					"ingest_rate_decay": 1.0,
 				},
+				"data_streams.wait_for_integration": false,
 			},
 			outCfg: &Config{
 				Host: "localhost:3000",
@@ -472,6 +478,11 @@ func TestUnpackConfig(t *testing.T) {
 						TTL: 30 * time.Minute,
 					},
 				},
+				DataStreams: DataStreamsConfig{
+					Enabled:                    false,
+					WaitForIntegration:         false,
+					WaitForIntegrationInterval: 5 * time.Second,
+				},
 			},
 		},
 		"kibana trailing slash": {
diff --git a/beater/config/data_streams.go b/beater/config/data_streams.go
index 1faf6144617..a9771e984e0 100644
--- a/beater/config/data_streams.go
+++ b/beater/config/data_streams.go
@@ -17,11 +17,30 @@
 
 package config
 
+import "time"
+
 // DataStreamsConfig holds data streams configuration.
 type DataStreamsConfig struct {
 	Enabled bool `config:"enabled"`
+
+	// WaitForIntegration controls whether APM Server waits for the Fleet
+	// integration package to be installed before indexing events.
+	//
+	// This requires a connection to Kibana, and is ignored when running
+	// under Elastic Agent; it is intended for running APM Server standalone,
+	// relying on Fleet to install the integration for creating Elasticsearch
+	// index templates, ILM policies, and ingest pipelines.
+	WaitForIntegration bool `config:"wait_for_integration"`
+
+	// WaitForIntegrationInterval holds the interval for checks when waiting
+	// for the integration package to be installed.
+	WaitForIntegrationInterval time.Duration `config:"wait_for_integration_interval"`
 }
 
 func defaultDataStreamsConfig() DataStreamsConfig {
-	return DataStreamsConfig{Enabled: false}
+	return DataStreamsConfig{
+		Enabled:                    false,
+		WaitForIntegration:         true,
+		WaitForIntegrationInterval: 5 * time.Second,
+	}
 }
diff --git a/beater/server_test.go b/beater/server_test.go
index 9bb6c2fab32..c191891fb44 100644
--- a/beater/server_test.go
+++ b/beater/server_test.go
@@ -24,6 +24,7 @@ import (
 	"io/ioutil"
 	"net"
 	"net/http"
+	"net/http/httptest"
 	"net/url"
 	"os"
 	"reflect"
@@ -522,6 +523,47 @@ func TestServerConfigReload(t *testing.T) {
 	assert.Error(t, err)
 }
 
+func TestServerWaitForIntegration(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping slow test")
+	}
+
+	var requests int
+	mux := http.NewServeMux()
+	mux.HandleFunc("/api/status", func(w http.ResponseWriter, r *http.Request) {
+		w.Write([]byte(`{"version":{"number":"1.2.3"}}`))
+	})
+	mux.HandleFunc("/api/fleet/epm/packages/apm", func(w http.ResponseWriter, r *http.Request) {
+		requests++
+		switch requests {
+		case 1:
+			w.WriteHeader(500)
+		case 2:
+			fmt.Fprintln(w, `{"response":{"status":"not_installed"}}`)
+		case 3:
+			fmt.Fprintln(w, `{"response":{"status":"installed"}}`)
+		}
+	})
+	srv := httptest.NewServer(mux)
+	defer srv.Close()
+
+	cfg := common.MustNewConfigFrom(map[string]interface{}{
+		"data_streams.enabled":                       true,
+		"data_streams.wait_for_integration_interval": "100ms",
+		"kibana.enabled":                             true,
+		"kibana.host":                                srv.URL,
+	})
+
+	beat, cfg := newBeat(t, cfg, nil, nil)
+	tb, err := newTestBeater(t, beat, cfg, nil)
+	require.NoError(t, err)
+	tb.start()
+
+	_, err = tb.waitListenAddr(30 * time.Second)
+	require.NoError(t, err)
+	assert.Equal(t, 3, requests)
+}
+
 type chanClient struct {
 	done    chan struct{}
 	Channel chan beat.Event
diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc
index ced33151739..9ff42d6d070 100644
--- a/changelogs/head.asciidoc
+++ b/changelogs/head.asciidoc
@@ -22,6 +22,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits]
 ==== Added
 - `service_destination` span metrics now take into account composite spans {pull}5896[5896]
 - add zero-downtime config reloads via `SO_REUSEPORT` {pull}5911[5911]
+- experimental support for writing data streams in standalone mode {pull}5928[5928]
 
 [float]
 ==== Deprecated
diff --git a/docs/configuration-process.asciidoc b/docs/configuration-process.asciidoc
index 0304be8889e..148c2986ab2 100644
--- a/docs/configuration-process.asciidoc
+++ b/docs/configuration-process.asciidoc
@@ -184,3 +184,22 @@ Default value is 1 second.
 ==== `max_procs`
 Sets the maximum number of CPUs that can be executing simultaneously.
 The default is the number of logical CPUs available in the system.
+
+[float]
+=== Configuration options: `data_streams`
+
+experimental::[]
+
+[[data_streams.enabled]]
+[float]
+==== `enabled`
+Write events to Elasticsearch data streams instead of indices.
+Events will be written to `traces-*`, `logs-*`, and `metrics-*` data streams.
+Enabling data streams disables the setup of index templates, ILM policies, and ingest pipelines.
+Defaults to false.
+
+[[data_streams.wait_for_integration]]
+[float]
+==== `wait_for_integration`
+Wait for the `apm` Fleet integration to be installed by Kibana. Requires <>.
+Defaults to true.
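For reference, the options documented above map onto `apm-server.yml` roughly as in the sketch below. Only the key names and defaults are taken from this change (the `config` tags and `defaultDataStreamsConfig`); the Kibana host is a placeholder, and the exact file layout is an assumption.

[source,yaml]
----
# Write events to data streams instead of indices (default: false).
data_streams:
  enabled: true
  # Wait for the apm Fleet integration to be installed by Kibana (default: true).
  wait_for_integration: true
  # Interval between checks while waiting (default: 5s).
  wait_for_integration_interval: 5s

# Waiting for the integration requires a Kibana connection.
kibana:
  enabled: true
  host: "http://localhost:5601"  # placeholder
----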