Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix performance issues with processors scaling under agent #35031

Merged
7 changes: 4 additions & 3 deletions auditbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/beats/v7/libbeat/cmd"
"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/libbeat/ecs"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/publisher/processing"
"github.com/elastic/beats/v7/metricbeat/beater"
"github.com/elastic/beats/v7/metricbeat/mb/module"
Expand Down Expand Up @@ -53,13 +54,13 @@ var withECSVersion = processing.WithFields(mapstr.M{
})

// AuditbeatSettings contains the default settings for auditbeat
func AuditbeatSettings() instance.Settings {
func AuditbeatSettings(globals processors.PluginConfig) instance.Settings {
runFlags := pflag.NewFlagSet(Name, pflag.ExitOnError)
return instance.Settings{
RunFlags: runFlags,
Name: Name,
HasDashboards: true,
Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
Processing: processing.MakeDefaultSupport(true, globals, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
}
}

Expand All @@ -76,5 +77,5 @@ func Initialize(settings instance.Settings) *cmd.BeatsRootCmd {
}

func init() {
RootCmd = Initialize(AuditbeatSettings())
RootCmd = Initialize(AuditbeatSettings(nil))
}
2 changes: 1 addition & 1 deletion heartbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var withECSVersion = processing.WithFields(mapstr.M{
func HeartbeatSettings() instance.Settings {
return instance.Settings{
Name: Name,
Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithAgentMeta()),
Processing: processing.MakeDefaultSupport(true, nil, withECSVersion, processing.WithAgentMeta()),
HasDashboards: false,
}
}
Expand Down
4 changes: 2 additions & 2 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *conf.C) (report
outClient := outputs.NewFailoverClient(clients)
outClient = outputs.WithBackoff(outClient, config.Backoff.Init, config.Backoff.Max)

processing, err := processing.MakeDefaultSupport(true)(beat, log, conf.NewConfig())
processing, err := processing.MakeDefaultSupport(true, nil)(beat, log, conf.NewConfig())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -282,7 +282,7 @@ func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration,
clusterUUID = getClusterUUID()
}
if clusterUUID != "" {
meta.Put("cluster_uuid", clusterUUID)
_, _ = meta.Put("cluster_uuid", clusterUUID)
}

r.client.Publish(beat.Event{
Expand Down
23 changes: 21 additions & 2 deletions libbeat/processors/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,29 @@

package processors

import "github.com/elastic/elastic-agent-libs/config"
import (
"fmt"

"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)

// PluginConfig represents the list of processors.
type PluginConfig []*config.C

// fields that should be always exported
// MandatoryExportedFields are fields that should be always exported
var MandatoryExportedFields = []string{"type"}

// NewPluginConfigFromList creates a PluginConfig from a list of raw processor config objects
func NewPluginConfigFromList(raw []mapstr.M) (PluginConfig, error) {
processors := make([]*config.C, len(raw))
for i := 0; i < len(raw); i++ {
cfg, err := config.NewConfigFrom(raw[i])
if err != nil {
return nil, fmt.Errorf("error creating processor config: %w", err)
}
processors[i] = cfg
}

return processors, nil
}
6 changes: 3 additions & 3 deletions libbeat/publisher/pipeline/stress/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ func RunTests(
) error {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return fmt.Errorf("unpacking config failed: %v", err)
return fmt.Errorf("unpacking config failed: %w", err)
}

log := logp.L()

processing, err := processing.MakeDefaultSupport(false)(info, log, cfg)
processing, err := processing.MakeDefaultSupport(false, nil)(info, log, cfg)
if err != nil {
return err
}
Expand All @@ -81,7 +81,7 @@ func RunTests(
},
)
if err != nil {
return fmt.Errorf("loading pipeline failed: %+v", err)
return fmt.Errorf("loading pipeline failed: %w", err)
}
defer func() {
log.Info("Stop pipeline")
Expand Down
22 changes: 18 additions & 4 deletions libbeat/publisher/processing/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/elastic/beats/v7/libbeat/asset"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/fleetmode"
"github.com/elastic/beats/v7/libbeat/ecs"
"github.com/elastic/beats/v7/libbeat/features"
"github.com/elastic/beats/v7/libbeat/mapping"
Expand Down Expand Up @@ -77,14 +78,14 @@ type builtinModifier func(beat.Info) mapstr.M
// MakeDefaultBeatSupport automatically adds the `ecs.version`, `host.name` and `agent.X` fields
// to each event.
func MakeDefaultBeatSupport(normalize bool) SupportFactory {
return MakeDefaultSupport(normalize, WithECS, WithHost, WithAgentMeta())
return MakeDefaultSupport(normalize, nil, WithECS, WithHost, WithAgentMeta())
}

// MakeDefaultObserverSupport creates a new SupportFactory based on NewDefaultSupport.
// MakeDefaultObserverSupport automatically adds the `ecs.version` and `observer.X` fields
// to each event.
func MakeDefaultObserverSupport(normalize bool) SupportFactory {
return MakeDefaultSupport(normalize, WithECS, WithObserverMeta())
return MakeDefaultSupport(normalize, nil, WithECS, WithObserverMeta())
}

// MakeDefaultSupport creates a new SupportFactory for use with the publisher pipeline.
Expand All @@ -94,8 +95,11 @@ func MakeDefaultObserverSupport(normalize bool) SupportFactory {
// and `processor` settings to the event processing pipeline to be generated.
// Use WithFields, WithBeatMeta, and other to declare the builtin fields to be added
// to each event. Builtin fields can be modified using global `processors`, and `fields` only.
// the fleetDefaultProcessors argument will set the given global-level processors if the beat is currently running under fleet,
// and no other global-level processors are set.
func MakeDefaultSupport(
normalize bool,
fleetDefaultProcessors processors.PluginConfig,
modifiers ...modifier,
) SupportFactory {
return func(info beat.Info, log *logp.Logger, beatCfg *config.C) (Supporter, error) {
Expand All @@ -107,8 +111,18 @@ func MakeDefaultSupport(
if err := beatCfg.Unpack(&cfg); err != nil {
return nil, err
}
// don't try to "merge" the two lists somehow, if the supportFactory caller requests its own processors, use those
// also makes it easier to disable global processors if needed, since they're otherwise hardcoded
var rawProcessors processors.PluginConfig
if fleetmode.Enabled() && !beatCfg.HasField("processors") {
log.Debugf("In fleet with processors specified, defaulting to global processors")
rawProcessors = fleetDefaultProcessors

fearful-symmetry marked this conversation as resolved.
Show resolved Hide resolved
} else {
rawProcessors = cfg.Processors
}

processors, err := processors.New(cfg.Processors)
processors, err := processors.New(rawProcessors)
if err != nil {
return nil, fmt.Errorf("error initializing processors: %w", err)
}
Expand All @@ -125,7 +139,7 @@ func WithFields(fields mapstr.M) modifier {
}

// WithECS modifier adds `ecs.version` builtin fields to a processing pipeline.
var WithECS modifier = WithFields(mapstr.M{
var WithECS = WithFields(mapstr.M{
"ecs": mapstr.M{
"version": ecs.Version,
},
Expand Down
35 changes: 29 additions & 6 deletions libbeat/publisher/processing/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,31 @@ import (
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

_ "github.com/elastic/beats/v7/libbeat/processors/add_cloud_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/add_docker_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/add_host_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata"
)

func TestGenerateProcessorList(t *testing.T) {
_ = logp.DevelopmentSetup()
inputCfg := []mapstr.M{
{"add_host_metadata": nil},
{"add_cloud_metadata": nil},
{"add_docker_metadata": nil},
{"add_kubernetes_metadata": nil},
}

plugins, err := processors.NewPluginConfigFromList(inputCfg)
require.NoError(t, err)

processors, err := processors.New(plugins)
require.NoError(t, err)
// make sure the processor init got the config formatted in a way it expected
require.Equal(t, 4, processors.List)
}

func TestProcessorsConfigs(t *testing.T) {
defaultInfo := beat.Info{
Beat: "test",
Expand Down Expand Up @@ -258,7 +281,7 @@ func TestProcessorsConfigs(t *testing.T) {

factory := test.factory
if factory == nil {
factory = MakeDefaultSupport(true)
factory = MakeDefaultSupport(true, nil)
}

support, err := factory(info, logp.L(), cfg)
Expand Down Expand Up @@ -343,7 +366,7 @@ func TestNormalization(t *testing.T) {
t.Run(name, func(t *testing.T) {
t.Parallel()

s, err := MakeDefaultSupport(test.normalize)(beat.Info{}, logp.L(), config.NewConfig())
s, err := MakeDefaultSupport(test.normalize, nil)(beat.Info{}, logp.L(), config.NewConfig())
require.NoError(t, err)

prog, err := s.Create(beat.ProcessingConfig{}, false)
Expand All @@ -364,7 +387,7 @@ func TestNormalization(t *testing.T) {
}

func BenchmarkNormalization(b *testing.B) {
s, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), config.NewConfig())
s, err := MakeDefaultSupport(true, nil)(beat.Info{}, logp.L(), config.NewConfig())
require.NoError(b, err)

prog, err := s.Create(beat.ProcessingConfig{}, false)
Expand All @@ -378,7 +401,7 @@ func BenchmarkNormalization(b *testing.B) {
}

func TestAlwaysDrop(t *testing.T) {
s, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), config.NewConfig())
s, err := MakeDefaultSupport(true, nil)(beat.Info{}, logp.L(), config.NewConfig())
require.NoError(t, err)

prog, err := s.Create(beat.ProcessingConfig{}, true)
Expand All @@ -393,7 +416,7 @@ func TestAlwaysDrop(t *testing.T) {
}

func TestDynamicFields(t *testing.T) {
factory, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), config.NewConfig())
factory, err := MakeDefaultSupport(true, nil)(beat.Info{}, logp.L(), config.NewConfig())
require.NoError(t, err)

dynFields := mapstr.NewPointer(mapstr.M{})
Expand All @@ -416,7 +439,7 @@ func TestDynamicFields(t *testing.T) {
}

func TestProcessingClose(t *testing.T) {
factory, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), config.NewConfig())
factory, err := MakeDefaultSupport(true, nil)(beat.Info{}, logp.L(), config.NewConfig())
require.NoError(t, err)

// Inject a processor in the builder that we can check if has been closed.
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func MetricbeatSettings() instance.Settings {
RunFlags: runFlags,
Name: Name,
HasDashboards: true,
Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
Processing: processing.MakeDefaultSupport(true, nil, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
}
}

Expand Down
7 changes: 4 additions & 3 deletions packetbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
cmd "github.com/elastic/beats/v7/libbeat/cmd"
"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/libbeat/ecs"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/publisher/processing"
"github.com/elastic/beats/v7/packetbeat/beater"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand All @@ -49,7 +50,7 @@ var withECSVersion = processing.WithFields(mapstr.M{
var RootCmd *cmd.BeatsRootCmd

// PacketbeatSettings contains the default settings for packetbeat
func PacketbeatSettings() instance.Settings {
func PacketbeatSettings(globals processors.PluginConfig) instance.Settings {
runFlags := pflag.NewFlagSet(Name, pflag.ExitOnError)
runFlags.AddGoFlag(flag.CommandLine.Lookup("I"))
runFlags.AddGoFlag(flag.CommandLine.Lookup("t"))
Expand All @@ -61,7 +62,7 @@ func PacketbeatSettings() instance.Settings {
RunFlags: runFlags,
Name: Name,
HasDashboards: true,
Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
Processing: processing.MakeDefaultSupport(true, globals, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
InputQueueSize: 400,
}
}
Expand All @@ -74,5 +75,5 @@ func Initialize(settings instance.Settings) *cmd.BeatsRootCmd {
}

func init() {
RootCmd = Initialize(PacketbeatSettings())
RootCmd = Initialize(PacketbeatSettings(nil))
}
2 changes: 1 addition & 1 deletion winlogbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func WinlogbeatSettings() instance.Settings {
return instance.Settings{
Name: Name,
HasDashboards: true,
Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithAgentMeta()),
Processing: processing.MakeDefaultSupport(true, nil, withECSVersion, processing.WithAgentMeta()),
}
}

Expand Down
10 changes: 7 additions & 3 deletions x-pack/auditbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
auditbeatcmd "github.com/elastic/beats/v7/auditbeat/cmd"
"github.com/elastic/beats/v7/libbeat/cmd"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/x-pack/libbeat/management"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
Expand All @@ -30,8 +31,7 @@ var RootCmd *cmd.BeatsRootCmd
// auditbeatCfg is a callback registered with central management to perform any needed config transformations
// before agent configs are sent to a beat
func auditbeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) {
procs := defaultProcessors()
modules, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo, procs...)
modules, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo)
if err != nil {
return nil, fmt.Errorf("error creating input list from raw expected config: %w", err)
}
Expand All @@ -55,7 +55,11 @@ func auditbeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo)

func init() {
management.ConfigTransform.SetTransform(auditbeatCfg)
settings := auditbeatcmd.AuditbeatSettings()
globalProcs, err := processors.NewPluginConfigFromList(defaultProcessors())
if err != nil { // these are hard-coded, shouldn't fail
panic(fmt.Errorf("error creating global processors: %w", err))
}
settings := auditbeatcmd.AuditbeatSettings(globalProcs)
settings.ElasticLicensed = true
RootCmd = auditbeatcmd.Initialize(settings)
}
Expand Down
3 changes: 1 addition & 2 deletions x-pack/filebeat/cmd/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ import (
)

func filebeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) {
procs := defaultProcessors()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably worth moving the defaultProcessors function into the file that uses it, since it isn't actually used here anymore.

This comment applies to each of the beats.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yah, good point, was kinda unsure of how to clean those up.

modules, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo, procs...)
modules, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo)
if err != nil {
return nil, fmt.Errorf("error creating input list from raw expected config: %w", err)
}
Expand Down
9 changes: 9 additions & 0 deletions x-pack/filebeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
package cmd

import (
"fmt"

fbcmd "github.com/elastic/beats/v7/filebeat/cmd"
cmd "github.com/elastic/beats/v7/libbeat/cmd"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/publisher/processing"
"github.com/elastic/beats/v7/x-pack/libbeat/management"

// Register the includes.
Expand All @@ -21,6 +25,11 @@ const Name = fbcmd.Name
func Filebeat() *cmd.BeatsRootCmd {
management.ConfigTransform.SetTransform(filebeatCfg)
settings := fbcmd.FilebeatSettings()
globalProcs, err := processors.NewPluginConfigFromList(defaultProcessors())
if err != nil { // these are hard-coded, shouldn't fail
panic(fmt.Errorf("error creating global processors: %w", err))
}
settings.Processing = processing.MakeDefaultSupport(true, globalProcs)
settings.ElasticLicensed = true
command := fbcmd.Filebeat(inputs.Init, settings)
return command
Expand Down
3 changes: 1 addition & 2 deletions x-pack/metricbeat/cmd/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ import (
)

func metricbeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) {
procs := defaultProcessors()
modules, err := management.CreateInputsFromStreams(rawIn, "metrics", agentInfo, procs...)
modules, err := management.CreateInputsFromStreams(rawIn, "metrics", agentInfo)
if err != nil {
return nil, fmt.Errorf("error creating input list from raw expected config: %w", err)
}
Expand Down
Loading