Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix performance issues with processors scaling under agent #35031

Merged
7 changes: 4 additions & 3 deletions auditbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/beats/v7/libbeat/cmd"
"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/libbeat/ecs"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/publisher/processing"
"github.com/elastic/beats/v7/metricbeat/beater"
"github.com/elastic/beats/v7/metricbeat/mb/module"
Expand Down Expand Up @@ -53,13 +54,13 @@ var withECSVersion = processing.WithFields(mapstr.M{
})

// AuditbeatSettings contains the default settings for auditbeat
func AuditbeatSettings() instance.Settings {
func AuditbeatSettings(globals processors.PluginConfig) instance.Settings {
runFlags := pflag.NewFlagSet(Name, pflag.ExitOnError)
return instance.Settings{
RunFlags: runFlags,
Name: Name,
HasDashboards: true,
Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
Processing: processing.MakeDefaultSupport(true, globals, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
}
}

Expand All @@ -76,5 +77,5 @@ func Initialize(settings instance.Settings) *cmd.BeatsRootCmd {
}

func init() {
RootCmd = Initialize(AuditbeatSettings())
RootCmd = Initialize(AuditbeatSettings(nil))
}
2 changes: 1 addition & 1 deletion heartbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var withECSVersion = processing.WithFields(mapstr.M{
func HeartbeatSettings() instance.Settings {
return instance.Settings{
Name: Name,
Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithAgentMeta()),
Processing: processing.MakeDefaultSupport(true, nil, withECSVersion, processing.WithAgentMeta()),
HasDashboards: false,
}
}
Expand Down
17 changes: 17 additions & 0 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,9 +847,26 @@ func (b *Beat) configure(settings Settings) error {
}
b.processing, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig)

b.Manager.RegisterDiagnosticHook("global processors", "a list of currently configured global beat processors",
"global_processors.txt", "text/plain", b.agentDiagnosticHook)

return err
}

// agentDiagnosticHook is the callback function sent to the agent manager RegisterDiagnosticHook function
// right now, this only returns information on the global processors; however, in the future, we might find it useful
// to expand this to other components of the beat state.
// To anyone refactoring: be careful to make sure the callback is registered after the global processors are initialized
func (b *Beat) agentDiagnosticHook() []byte {
list := b.processing.Processors()

var debugBytes []byte
for _, proc := range list {
debugBytes = append(debugBytes, []byte(proc+"\n")...)
}
return debugBytes
}

func (b *Beat) loadMeta(metaPath string) error {
type meta struct {
UUID uuid.UUID `json:"uuid"`
Expand Down
5 changes: 5 additions & 0 deletions libbeat/management/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ type Manager interface {

// SetPayload Allows to add additional metadata to future requests made by the manager.
SetPayload(map[string]interface{})

// RegisterDiagnosticHook registers a callback for elastic-agent diagnostics
RegisterDiagnosticHook(name string, description string, filename string, contentType string, hook client.DiagnosticHook)
}

// ManagerFactory is the factory type for creating a config manager
Expand Down Expand Up @@ -192,3 +195,5 @@ func (n *fallbackManager) CheckRawConfig(cfg *config.C) error { return nil }
func (n *fallbackManager) RegisterAction(action client.Action) {}
func (n *fallbackManager) UnregisterAction(action client.Action) {}
func (n *fallbackManager) SetPayload(map[string]interface{}) {}
func (n *fallbackManager) RegisterDiagnosticHook(_ string, _ string, _ string, _ string, _ client.DiagnosticHook) {
}
4 changes: 2 additions & 2 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *conf.C) (report
outClient := outputs.NewFailoverClient(clients)
outClient = outputs.WithBackoff(outClient, config.Backoff.Init, config.Backoff.Max)

processing, err := processing.MakeDefaultSupport(true)(beat, log, conf.NewConfig())
processing, err := processing.MakeDefaultSupport(true, nil)(beat, log, conf.NewConfig())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -282,7 +282,7 @@ func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration,
clusterUUID = getClusterUUID()
}
if clusterUUID != "" {
meta.Put("cluster_uuid", clusterUUID)
_, _ = meta.Put("cluster_uuid", clusterUUID)
}

r.client.Publish(beat.Event{
Expand Down
23 changes: 21 additions & 2 deletions libbeat/processors/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,29 @@

package processors

import "github.com/elastic/elastic-agent-libs/config"
import (
"fmt"

"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)

// PluginConfig represents the list of processors.
type PluginConfig []*config.C

// fields that should be always exported
// MandatoryExportedFields are fields that should be always exported
var MandatoryExportedFields = []string{"type"}

// NewPluginConfigFromList creates a PluginConfig from a list of raw processor config objects
func NewPluginConfigFromList(raw []mapstr.M) (PluginConfig, error) {
processors := make([]*config.C, len(raw))
for i := 0; i < len(raw); i++ {
cfg, err := config.NewConfigFrom(raw[i])
if err != nil {
return nil, fmt.Errorf("error creating processor config: %w", err)
}
processors[i] = cfg
}

return processors, nil
}
6 changes: 3 additions & 3 deletions libbeat/publisher/pipeline/stress/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ func RunTests(
) error {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return fmt.Errorf("unpacking config failed: %v", err)
return fmt.Errorf("unpacking config failed: %w", err)
}

log := logp.L()

processing, err := processing.MakeDefaultSupport(false)(info, log, cfg)
processing, err := processing.MakeDefaultSupport(false, nil)(info, log, cfg)
if err != nil {
return err
}
Expand All @@ -81,7 +81,7 @@ func RunTests(
},
)
if err != nil {
return fmt.Errorf("loading pipeline failed: %+v", err)
return fmt.Errorf("loading pipeline failed: %w", err)
}
defer func() {
log.Info("Stop pipeline")
Expand Down
32 changes: 28 additions & 4 deletions libbeat/publisher/processing/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/elastic/beats/v7/libbeat/asset"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/fleetmode"
"github.com/elastic/beats/v7/libbeat/ecs"
"github.com/elastic/beats/v7/libbeat/features"
"github.com/elastic/beats/v7/libbeat/mapping"
Expand Down Expand Up @@ -77,14 +78,14 @@ type builtinModifier func(beat.Info) mapstr.M
// MakeDefaultBeatSupport automatically adds the `ecs.version`, `host.name` and `agent.X` fields
// to each event.
func MakeDefaultBeatSupport(normalize bool) SupportFactory {
return MakeDefaultSupport(normalize, WithECS, WithHost, WithAgentMeta())
return MakeDefaultSupport(normalize, nil, WithECS, WithHost, WithAgentMeta())
}

// MakeDefaultObserverSupport creates a new SupportFactory based on NewDefaultSupport.
// MakeDefaultObserverSupport automatically adds the `ecs.version` and `observer.X` fields
// to each event.
func MakeDefaultObserverSupport(normalize bool) SupportFactory {
return MakeDefaultSupport(normalize, WithECS, WithObserverMeta())
return MakeDefaultSupport(normalize, nil, WithECS, WithObserverMeta())
}

// MakeDefaultSupport creates a new SupportFactory for use with the publisher pipeline.
Expand All @@ -94,8 +95,11 @@ func MakeDefaultObserverSupport(normalize bool) SupportFactory {
// and `processor` settings to the event processing pipeline to be generated.
// Use WithFields, WithBeatMeta, and other to declare the builtin fields to be added
// to each event. Builtin fields can be modified using global `processors`, and `fields` only.
// the fleetDefaultProcessors argument will set the given global-level processors if the beat is currently running under fleet,
// and no other global-level processors are set.
func MakeDefaultSupport(
normalize bool,
fleetDefaultProcessors processors.PluginConfig,
modifiers ...modifier,
) SupportFactory {
return func(info beat.Info, log *logp.Logger, beatCfg *config.C) (Supporter, error) {
Expand All @@ -107,8 +111,19 @@ func MakeDefaultSupport(
if err := beatCfg.Unpack(&cfg); err != nil {
return nil, err
}
// don't try to "merge" the two lists somehow, if the supportFactory caller requests its own processors, use those
// also makes it easier to disable global processors if needed, since they're otherwise hardcoded
var rawProcessors processors.PluginConfig
// don't check the array directly, use HasField, that way processors can easily be bypassed with -E processors=[]
if fleetmode.Enabled() && !beatCfg.HasField("processors") {
log.Debugf("In fleet with processors specified, defaulting to global processors")
rawProcessors = fleetDefaultProcessors

fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
} else {
rawProcessors = cfg.Processors
}

processors, err := processors.New(cfg.Processors)
processors, err := processors.New(rawProcessors)
if err != nil {
return nil, fmt.Errorf("error initializing processors: %w", err)
}
Expand All @@ -125,7 +140,7 @@ func WithFields(fields mapstr.M) modifier {
}

// WithECS modifier adds `ecs.version` builtin fields to a processing pipeline.
var WithECS modifier = WithFields(mapstr.M{
var WithECS = WithFields(mapstr.M{
"ecs": mapstr.M{
"version": ecs.Version,
},
Expand Down Expand Up @@ -243,6 +258,15 @@ func newBuilder(
return b, nil
}

// Processors returns a string description of the processor config
func (b *builder) Processors() []string {
procList := []string{}
for _, proc := range b.processors.list {
procList = append(procList, proc.String())
}
return procList
}

// Create combines the builder configuration with the client settings
// in order to build the event processing pipeline.
//
Expand Down
34 changes: 28 additions & 6 deletions libbeat/publisher/processing/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,30 @@ import (
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

_ "github.com/elastic/beats/v7/libbeat/processors/add_cloud_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/add_docker_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/add_host_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata"
)

func TestGenerateProcessorList(t *testing.T) {
inputCfg := []mapstr.M{
{"add_host_metadata": nil},
{"add_cloud_metadata": nil},
{"add_docker_metadata": nil},
{"add_kubernetes_metadata": nil},
}

plugins, err := processors.NewPluginConfigFromList(inputCfg)
require.NoError(t, err)

processors, err := processors.New(plugins)
require.NoError(t, err)
// make sure the processor init got the config formatted in a way it expected
require.Equal(t, 4, len(processors.List))
}

func TestProcessorsConfigs(t *testing.T) {
defaultInfo := beat.Info{
Beat: "test",
Expand Down Expand Up @@ -258,7 +280,7 @@ func TestProcessorsConfigs(t *testing.T) {

factory := test.factory
if factory == nil {
factory = MakeDefaultSupport(true)
factory = MakeDefaultSupport(true, nil)
}

support, err := factory(info, logp.L(), cfg)
Expand Down Expand Up @@ -343,7 +365,7 @@ func TestNormalization(t *testing.T) {
t.Run(name, func(t *testing.T) {
t.Parallel()

s, err := MakeDefaultSupport(test.normalize)(beat.Info{}, logp.L(), config.NewConfig())
s, err := MakeDefaultSupport(test.normalize, nil)(beat.Info{}, logp.L(), config.NewConfig())
require.NoError(t, err)

prog, err := s.Create(beat.ProcessingConfig{}, false)
Expand All @@ -364,7 +386,7 @@ func TestNormalization(t *testing.T) {
}

func BenchmarkNormalization(b *testing.B) {
s, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), config.NewConfig())
s, err := MakeDefaultSupport(true, nil)(beat.Info{}, logp.L(), config.NewConfig())
require.NoError(b, err)

prog, err := s.Create(beat.ProcessingConfig{}, false)
Expand All @@ -378,7 +400,7 @@ func BenchmarkNormalization(b *testing.B) {
}

func TestAlwaysDrop(t *testing.T) {
s, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), config.NewConfig())
s, err := MakeDefaultSupport(true, nil)(beat.Info{}, logp.L(), config.NewConfig())
require.NoError(t, err)

prog, err := s.Create(beat.ProcessingConfig{}, true)
Expand All @@ -393,7 +415,7 @@ func TestAlwaysDrop(t *testing.T) {
}

func TestDynamicFields(t *testing.T) {
factory, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), config.NewConfig())
factory, err := MakeDefaultSupport(true, nil)(beat.Info{}, logp.L(), config.NewConfig())
require.NoError(t, err)

dynFields := mapstr.NewPointer(mapstr.M{})
Expand All @@ -416,7 +438,7 @@ func TestDynamicFields(t *testing.T) {
}

func TestProcessingClose(t *testing.T) {
factory, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), config.NewConfig())
factory, err := MakeDefaultSupport(true, nil)(beat.Info{}, logp.L(), config.NewConfig())
require.NoError(t, err)

// Inject a processor in the builder that we can check if has been closed.
Expand Down
4 changes: 4 additions & 0 deletions libbeat/publisher/processing/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type SupportFactory func(info beat.Info, log *logp.Logger, cfg *config.C) (Suppo
// If `drop` is set, then the processor generated must always drop all events.
// A Supporter needs to be closed with `Close()` to release its global resources.
type Supporter interface {
// Create a running processor interface based on the given config
Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error)
// Processors returns a list of config strings for the given processor, for debug purposes
Processors() []string
// Close the processor supporter
Close() error
}
2 changes: 1 addition & 1 deletion metricbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func MetricbeatSettings() instance.Settings {
RunFlags: runFlags,
Name: Name,
HasDashboards: true,
Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
Processing: processing.MakeDefaultSupport(true, nil, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
}
}

Expand Down
7 changes: 4 additions & 3 deletions packetbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
cmd "github.com/elastic/beats/v7/libbeat/cmd"
"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/libbeat/ecs"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/publisher/processing"
"github.com/elastic/beats/v7/packetbeat/beater"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand All @@ -49,7 +50,7 @@ var withECSVersion = processing.WithFields(mapstr.M{
var RootCmd *cmd.BeatsRootCmd

// PacketbeatSettings contains the default settings for packetbeat
func PacketbeatSettings() instance.Settings {
func PacketbeatSettings(globals processors.PluginConfig) instance.Settings {
runFlags := pflag.NewFlagSet(Name, pflag.ExitOnError)
runFlags.AddGoFlag(flag.CommandLine.Lookup("I"))
runFlags.AddGoFlag(flag.CommandLine.Lookup("t"))
Expand All @@ -61,7 +62,7 @@ func PacketbeatSettings() instance.Settings {
RunFlags: runFlags,
Name: Name,
HasDashboards: true,
Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
Processing: processing.MakeDefaultSupport(true, globals, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
InputQueueSize: 400,
}
}
Expand All @@ -74,5 +75,5 @@ func Initialize(settings instance.Settings) *cmd.BeatsRootCmd {
}

func init() {
RootCmd = Initialize(PacketbeatSettings())
RootCmd = Initialize(PacketbeatSettings(nil))
}
2 changes: 1 addition & 1 deletion winlogbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func WinlogbeatSettings() instance.Settings {
return instance.Settings{
Name: Name,
HasDashboards: true,
Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithAgentMeta()),
Processing: processing.MakeDefaultSupport(true, nil, withECSVersion, processing.WithAgentMeta()),
}
}

Expand Down
Loading