Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP]: 34125 output name issue #34143

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ update: notice
clean: mage
@rm -rf build
@$(foreach var,$(PROJECTS) $(PROJECTS_XPACK_MAGE),$(MAKE) -C $(var) clean || exit 1;)
@$(MAKE) -C generator clean
@-mage -clean

## check : TBD.
Expand Down
11 changes: 11 additions & 0 deletions dev-tools/mage/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
package mage

import (
"errors"
"fmt"
"os"
"strings"

"github.com/magefile/mage/sh"
)

Expand Down Expand Up @@ -46,6 +51,12 @@ func Clean(pathLists ...[]string) error {
for _, f := range paths {
f = MustExpand(f)
if err := sh.Rm(f); err != nil {
if errors.Is(err, os.ErrPermission) ||
strings.Contains(err.Error(), "permission denied") {
fmt.Printf("warn: cannot delete %q: %v, proceeding anyway\n",
f, err)
continue
}
return err
}
}
Expand Down
24 changes: 13 additions & 11 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,9 @@ import (
_ "github.com/elastic/beats/v7/filebeat/autodiscover"
)

const pipelinesWarning = "Filebeat is unable to load the ingest pipelines for the configured" +
" modules because the Elasticsearch output is not configured/enabled. If you have" +
" already loaded the ingest pipelines or are using Logstash pipelines, you" +
" can ignore this warning."
const pipelinesWarning = "ingest pipelines disabled as configured/enabled output is %q. " +
"Only Elasticsearch output supports them. If you have already loaded the " +
"ingest pipelines or are using Logstash pipelines, you can ignore this warning."

var once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF")

Expand Down Expand Up @@ -161,7 +160,8 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea
// setupPipelineLoaderCallback sets the callback function for loading pipelines during setup.
func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
if b.Config.Output.Name() != "elasticsearch" {
logp.Warn(pipelinesWarning)
logp.Info(fmt.Sprintf(pipelinesWarning, b.Config.Output.Name()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ingest pipelines (that run on Elasticsearch) are handled by integration packages for agent. For standalone Filebeat it can load them itself, but the agent doesn't use the functionality.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html#pipelines-for-fleet-elastic-agent for how this works under agent.

This entire section should be getting bypassed.

logp.Info("b.Config.Output keys: %v", b.Config.Output.Config().FlattenedKeys())
return nil
}

Expand All @@ -173,15 +173,15 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
}

// When running the subcommand setup, configuration from modules.d directories
// have to be loaded using cfg.Reloader. Otherwise those configurations are skipped.
// have to be loaded using cfg.Reloader. Otherwise, those configurations are skipped.
pipelineLoaderFactory := newPipelineLoaderFactory(b.Config.Output.Config())
enableAllFilesets, _ := b.BeatConfig.Bool("config.modules.enable_all_filesets", -1)
modulesFactory := fileset.NewSetupFactory(b.Info, pipelineLoaderFactory, enableAllFilesets)
if fb.config.ConfigModules.Enabled() {
if enableAllFilesets {
//All module configs need to be loaded to enable all the filesets
//contained in the modules. The default glob just loads the enabled
//ones. Switching the glob pattern from *.yml to * achieves this.
// All module configs need to be loaded to enable all the filesets
Copy link
Member

@cmacknz cmacknz Dec 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI we explicitly disable modules (and filesets) when running under agent: https:/elastic/elastic-agent/blob/7f7faf365aeb303c6e4bb557f3f4b7f4de4025e7/specs/filebeat.spec.yml#L41

This functionality is replaced with integrations.

// contained in the modules. The default glob just loads the enabled
// ones. Switching the glob pattern from *.yml to * achieves this.
origPath, _ := fb.config.ConfigModules.String("path", -1)
newPath := strings.TrimSuffix(origPath, ".yml")
_ = fb.config.ConfigModules.SetString("path", -1, newPath)
Expand All @@ -199,7 +199,8 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
// setup.
func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
if b.Config.Output.Name() != "elasticsearch" {
logp.Warn(pipelinesWarning)
logp.Info(fmt.Sprintf(pipelinesWarning, b.Config.Output.Name()))
logp.Info("b.Config.Output keys: %v", b.Config.Output.Config().FlattenedKeys())
return nil
}

Expand Down Expand Up @@ -287,7 +288,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
if b.Config.Output.Name() == "elasticsearch" {
pipelineLoaderFactory = newPipelineLoaderFactory(b.Config.Output.Config())
} else {
logp.Warn(pipelinesWarning)
logp.Info(fmt.Sprintf(pipelinesWarning, b.Config.Output.Name()))
logp.Info("b.Config.Output keys: %v", b.Config.Output.Config().FlattenedKeys())
}

inputsLogger := logp.NewLogger("input")
Expand Down
3 changes: 2 additions & 1 deletion heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ func newCommonPublishConfigs(info beat.Info, beatLocation *config.LocationWithID

var geoErrOnce = &sync.Once{}

// preProcessors sets up the required geo, event.dataset, data_stream.*, and write index processors for future event publishes.
// preProcessors sets up the required geo, event.dataset, data_stream.*
// and write index processors for future event publishes.
func preProcessors(info beat.Info, location *config.LocationWithID, settings publishSettings, monitorType string) (procs *processors.Processors, err error) {
procs = processors.NewList(nil)

Expand Down
6 changes: 3 additions & 3 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ type Creator func(*Beat, *config.C) (Beater, error)
//
// The Stop() method is invoked the first time (and only the first time) a
// shutdown signal is received. The Stop()-method normally will stop the Run()-loop,
// such that the beat can gracefully shutdown.
// such that the beat can gracefully shut down.
type Beater interface {
// The main event loop. This method should block until signalled to stop by an
// Run runs the main event loop. This method should block until signalled to stop by an
// invocation of the Stop() method.
Run(b *Beat) error

Expand Down Expand Up @@ -84,7 +84,7 @@ type Beat struct {
// BeatConfig struct contains the basic configuration of every beat
type BeatConfig struct {
// output/publishing related configurations
Output config.Namespace `config:"output"`
Output config.Namespace `config:"outputs"` // this seems to be the issue. But right now it seems that somehow it works when loading the config, but later on Output is empty again
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was also the source of the problem in elastic/elastic-agent#1860 (comment)

The Beats assume they are reading a complete configuration file at startup to populate that. This is not true for the V2 agent. There is no output configured at startup.

In V1 we actually did read the reference Filebeat configuration at startup, and this worked by coincidence because the default output is Elasticsearch.

Ideally what we do in V2 is correct, don't assume an output until the agent configures it. This avoids problems where the Beat loads something from the reference config that conflicts with what the agent sent it, or problems where the user notices the Beats are reaching out to localhost:9200 at startup when a Logstash output is configured.

The problem is the Beats were written assuming they could read a valid config file at startup, so this is causing some weird problems in code that assumes this is populated when it isn't necessarily.

The original issue for this change was #31901

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where a blank default configuration is created:

c = config.NewConfig()

Copy link
Member

@cmacknz cmacknz Dec 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the problem is that the ingest pipeline setup is being skipped, that should not actually be the cause of this problem. See https:/elastic/beats/pull/34143/files#r1059158293

}

// OverwritePipelinesCallback can be used by the Beat to register Ingest pipeline loader
Expand Down
72 changes: 60 additions & 12 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ import (
libversion "github.com/elastic/elastic-agent-libs/version"
"github.com/elastic/elastic-agent-system-metrics/metric/system/host"
metricreport "github.com/elastic/elastic-agent-system-metrics/report"
sysinfo "github.com/elastic/go-sysinfo"
"github.com/elastic/go-sysinfo"
"github.com/elastic/go-sysinfo/types"
ucfg "github.com/elastic/go-ucfg"
"github.com/elastic/go-ucfg"
)

// Beat provides the runnable and configurable instance of a beat.
Expand Down Expand Up @@ -479,7 +479,7 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
return err
}

logp.Info("%s start running.", b.Info.Beat)
logp.Info("%s start running. 15", b.Info.Beat)

// Allow the manager to stop a currently running beats out of bound.
b.Manager.SetStopCallback(beater.Stop)
Expand Down Expand Up @@ -553,19 +553,20 @@ func (b *Beat) TestConfig(settings Settings, bt beat.Creator) error {
}())
}

//SetupSettings holds settings necessary for beat setup
// SetupSettings holds settings necessary for beat setup
type SetupSettings struct {
Dashboard bool
Pipeline bool
IndexManagement bool
//Deprecated: use IndexManagementKey instead
// Deprecated: use IndexManagementKey instead
Template bool
//Deprecated: use IndexManagementKey instead
// Deprecated: use IndexManagementKey instead
ILMPolicy bool
EnableAllFilesets bool
}

// Setup registers ES index template, kibana dashboards, ml jobs and pipelines.
//
//nolint:forbidigo // required to give feedback to user
func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) error {
return handleError(func() error {
Expand Down Expand Up @@ -663,7 +664,6 @@ func (b *Beat) configure(settings Settings) error {
if err != nil {
return fmt.Errorf("error loading config file: %w", err)
}

if err := initPaths(cfg); err != nil {
return err
}
Expand Down Expand Up @@ -700,9 +700,32 @@ func (b *Beat) configure(settings Settings) error {
if err != nil {
return fmt.Errorf("error unpacking config data: %w", err)
}

b.Beat.Config = &b.Config.BeatConfig

// if b.Beat.Config.Output.Name() == "" {
// return errors.New("b.Beat.Config.Output cannot be emoty")
// }
// if b.Config.BeatConfig.Output.Name() == "" {
// return errors.New("b.Config.BeatConfig.Output cannot be emoty")
// }

// c, err := cfg.Child("outputs", -1)
// if err != nil {
// logp.Err("cfg keys: %v", cfg.FlattenedKeys())
// return fmt.Errorf("error getting 'outputs' from config: %w", err)
// }
//
// logp.Info("cfg keys: %v", cfg.FlattenedKeys())
// outs := config.Namespace{}
// err = c.Unpack(&outs)
// if err != nil {
// return fmt.Errorf("error unpacking 'outputs' config data. Available keys: %v: %w",
// c.FlattenedKeys(), err)
// }
//
// b.Beat.Config = &beat.BeatConfig{Output: outs}
// b.Config.Output = outs

if name := b.Config.Name; name != "" {
b.Info.Name = name
}
Expand All @@ -715,6 +738,8 @@ func (b *Beat) configure(settings Settings) error {
return fmt.Errorf("error initializing logging: %w", err)
}

// WARNING: it seems the agent isn't collecting logs before this line

// log paths values to help with troubleshooting
logp.Info(paths.Paths.String())

Expand All @@ -724,7 +749,26 @@ func (b *Beat) configure(settings Settings) error {
return err
}

logp.Info("Beat ID: %v", b.Info.ID)
// is it possible the agent is losing this logs
logp.Info("Beat ID 11: %v", b.Info.ID)

logp.Info("===============================================")

logp.Info("b.Beat.Config.Output.Name(): %s",
b.Beat.Config.Output.Name())
logp.Info("b.Config.Output: %s",
b.Config.Output.Name())

logp.Info("cfg keys: %v", cfg.FlattenedKeys())

c, err := cfg.Child("outputs", -1)
if err != nil {
logp.Err("cfg keys: %v", cfg.FlattenedKeys())
return fmt.Errorf("error getting 'outputs' from config: %w", err)
}
logp.Info("truing to extract 'outputs` from cfg: %v", c.FlattenedKeys())

logp.Info("===============================================")

// initialize config manager
b.Manager, err = management.Factory(b.Config.Management)(b.Config.Management, reload.RegisterV2, b.Beat.Info.ID)
Expand Down Expand Up @@ -1055,7 +1099,7 @@ func (b *Beat) setupMonitoring(settings Settings) (report.Reporter, error) {
}

// handleError handles the given error by logging it and then returning the
// error. If the err is nil or is a GracefulExit error then the method will
// error. If err is nil or is a GracefulExit error then the method will
// return nil without logging anything.
func handleError(err error) error {
if err == nil || err == beat.GracefulExit { //nolint:errorlint // keep old behaviour
Expand Down Expand Up @@ -1100,10 +1144,14 @@ func logSystemInfo(info beat.Info) {
log.Infow("Build info", "build", build)

// Go Runtime
log.Infow("Go runtime info", "go", sysinfo.Go())
log.Infow("Go runtime info", "go", sysinfo.Go()) // TODO: fix it, it is NOT being logged

// Host
if host, err := sysinfo.Host(); err == nil {
host, err := sysinfo.Host()
if err != nil {
log.Errorf("failed to get host info: %v", err)
}
if err == nil {
log.Infow("Host info", "host", host.Info())
}

Expand Down
42 changes: 32 additions & 10 deletions libbeat/processors/add_host_metadata/add_host_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
)

func init() {
logp.L().Named(processorName).Infof("add_host_metadata %s init", processorName)
logp.L().Infof("add_host_metadata %s init", processorName)
processors.RegisterPlugin("add_host_metadata", New)
jsprocessor.RegisterPlugin("AddHostMetadata", New)
}
Expand All @@ -57,20 +59,23 @@ const (

// New constructs a new add_host_metadata processor.
func New(cfg *config.C) (processors.Processor, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
defautcfg := defaultConfig()
if err := cfg.Unpack(&defautcfg); err != nil {
return nil, errors.Wrapf(err, "fail to unpack the %v configuration", processorName)
}

p := &addHostMetadata{
config: config,
config: defautcfg,
data: mapstr.NewPointer(nil),
logger: logp.NewLogger("add_host_metadata"),
}
p.loadData()
err := p.loadData()
if err != nil {
return nil, fmt.Errorf("coul not load metadata: %w", err)
}

if config.Geo != nil {
geoFields, err := util.GeoConfigToMap(*config.Geo)
if defautcfg.Geo != nil {
geoFields, err := util.GeoConfigToMap(*defautcfg.Geo)
if err != nil {
return nil, err
}
Expand All @@ -83,13 +88,16 @@ func New(cfg *config.C) (processors.Processor, error) {
// Run enriches the given event with the host meta data
func (p *addHostMetadata) Run(event *beat.Event) (*beat.Event, error) {
// check replace_host_fields field
p.logger.Info("add_host_metadata Run")
logp.L().Info("add_host_metadata Run")
if !p.config.ReplaceFields && skipAddingHostMetadata(event) {
p.logger.Infof("skipping addHostMetadata: config.ReplaceFields: %v, skipAddingHostMetadata: %v", p.config.ReplaceFields, skipAddingHostMetadata(event))
return event, nil
}

err := p.loadData()
if err != nil {
return nil, err
return nil, fmt.Errorf("could not load data to enrich event: %w", err)
}

event.Fields.DeepUpdate(p.data.Get().Clone())
Expand All @@ -116,7 +124,12 @@ func (p *addHostMetadata) expired() bool {
}

func (p *addHostMetadata) loadData() error {
fmt.Println("loadData invoked")
p.logger.Infof("loadData invoked")

if !p.expired() {
fmt.Printf("addHostMetadata is not expired, exiting: p.expired: %v\n", p.expired)
p.logger.Infof("addHostMetadata is not expired, exiting: p.expired: %v", p.expired)
return nil
}

Expand All @@ -134,15 +147,24 @@ func (p *addHostMetadata) loadData() error {
}

if len(ipList) > 0 {
data.Put("host.ip", ipList)
if _, err := data.Put("host.ip", ipList); err != nil {
p.logger.Warnf("failed to add 'host.ip':%v", err)
}
}
if len(hwList) > 0 {
data.Put("host.mac", hwList)
if _, err := data.Put("host.mac", hwList); err != nil {
p.logger.Warnf("failed to add 'host.mac':%v", err)
}
}
}

if p.config.Name != "" {
data.Put("host.name", p.config.Name)
if _, err := data.Put("host.name", p.config.Name); err != nil {
p.logger.Warnf("failed to add 'host.name':%v", err)
}
if _, err := data.Put("host.hostname", p.config.Name); err != nil {
p.logger.Warnf("failed to add 'host.hostname':%v", err)
}
}
p.data.Set(data)
return nil
Expand Down
Loading