Filebeat monitoring enters infinite error loop for "closed processor" #34716

Closed
faec opened this issue Mar 2, 2023 · 15 comments · Fixed by #34761
Labels: bug, Team:Elastic-Agent, v8.7.0

faec (Contributor) commented Mar 2, 2023

Intermittently, the monitoring started by Agent enters a loop where it repeats this message ~10K times a second:

{"log.level":"error","@timestamp":"2023-03-02T11:59:42.394Z","message":"Failed to publish event: attempt to use a closed processor","component":{"binary":"filebeat","dataset":"elastic_agent.filebeat","id":"filestream-monitoring","type":"filestream"},"log":{"source":"filestream-monitoring"},"log.logger":"publisher","log.origin":{"file.line":102,"file.name":"pipeline/client.go"},"service.name":"filebeat","ecs.version":"1.6.0","ecs.version":"1.6.0"}

This error is inconsistent -- on some runs it begins soon after startup, on many runs it never happens at all. When it does happen, it severely degrades or blocks other ingestion. Subsequent runs using identical configurations with identical binaries are still inconsistent as to whether this bug occurs.

All the times I've encountered this so far have been while testing the shipper, but since most of my agent runs are testing the shipper I'm uncertain if that's a reliable correlation: the error is happening before events reach the shipper output, which is the first place event handling diverges significantly when the shipper is in use. (There could be some shipper-specific agent-injected configuration that would affect the processor list, though I'm not aware of anything like that.)

The most recent configuration I used to exhibit the error is:

outputs:
  default:
    type: elasticsearch
    log_level: debug
    enabled: true
    hosts: [https://127.0.0.1:9200]
    username: "elastic"
    password: [password]
    allow_older_versions: true
    ssl:
      verification_mode: none
    shipper:
      enabled: true

inputs:
  - type: system/metrics
    id: unique-system-metrics-input
    data_stream.namespace: default
    use_output: default
    streams:
      - metricset: cpu
        data_stream.dataset: system.cpu
      - metricset: memory
        data_stream.dataset: system.memory
      - metricset: network
        data_stream.dataset: system.network
      - metricset: filesystem
        data_stream.dataset: system.filesystem
faec added the bug and Team:Elastic-Agent labels Mar 2, 2023
elasticmachine (Collaborator) commented

Pinging @elastic/elastic-agent (Team:Elastic-Agent)

cmacknz (Member) commented Mar 2, 2023

This error is new. It is coming from:

var ErrClosed = errors.New("attempt to use a closed processor")

type SafeProcessor struct {
    Processor
    closed uint32
}

// Run allows to run processor only when `Close` was not called prior
func (p *SafeProcessor) Run(event *beat.Event) (*beat.Event, error) {
    if atomic.LoadUint32(&p.closed) == 1 {
        return nil, ErrClosed
    }
    return p.Processor.Run(event)
}
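For reference, the matching Close guard flips the same flag; a minimal sketch, assuming the actual implementation in libbeat/processors/safe_processor.go follows the same atomic pattern:

// Sketch only; the real Close (libbeat/processors/safe_processor.go) may differ.
// The compare-and-swap releases the wrapped processor's resources exactly once;
// any further Close is reported as an error instead of double-freeing.
func (p *SafeProcessor) Close() error {
    if atomic.CompareAndSwapUint32(&p.closed, 0, 1) {
        return Close(p.Processor)
    }
    return fmt.Errorf("tried to close already closed processor")
}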

This error was added last week in #34647. @rdner can you investigate what might be going on here?

cmacknz (Member) commented Mar 2, 2023

The filestream monitoring processors are defined in the agent, see https://github.com/elastic/elastic-agent/blob/98b8fe7ff2e5fc5db9a309ffd19c587f10ef8270/internal/pkg/agent/application/monitoring/v1_monitor.go#L320 for example. The monitoring Beats are configured in that file.

cmacknz (Member) commented Mar 2, 2023

This only affects the unreleased v8.7 and v7.17. Likely this needs to be an 8.7 release blocker, but I'll give us some time to investigate and triage before adding it to the dev issue.

@pierrehilbert FYI, added to the current sprint. I don't think we can release v8.7 with this.

faec (Contributor, Author) commented Mar 2, 2023

I think this actually brought to light a long-standing related bug: none of the processors are really getting closed. Here's the code that closes the processors:

[snip]

Disregard this comment, I tested it against the wrong code 😅

faec (Contributor, Author) commented Mar 2, 2023

Sorry, closed the wrong bug -- the additional closer error I described was spurious; it came from a mismatched prototype.

faec reopened this Mar 2, 2023
rdner (Member) commented Mar 2, 2023

@cmacknz this error is a safeguard which makes sure that none of the processors is used after Close is called, as the error message suggests.

We didn't introduce a new issue with #34647; we exposed an existing one. So there is no regression here.

This is some kind of race condition in the Beats internals. I looked through the code while working on #34647 but could not find a place where it could occur, which is why the safeguard was introduced.

I'd like to ask a fresh pair of eyes to look at it.

rdner (Member) commented Mar 2, 2023

Removing the safeguard would most likely lead to a panic: on Close, processors free their resources (e.g. close channels), so a subsequent Run would panic in some processors.
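To illustrate why with a standalone sketch (stand-in code, not from Beats): a processor that closes a channel in Close will panic if Run is called afterwards:

package main

// Standalone illustration of why Run after Close can panic: Close frees a
// resource (here, a channel), and a later Run touches it.

import "fmt"

type chanProcessor struct {
    events chan string
}

func (p *chanProcessor) Run(ev string) {
    p.events <- ev // panics with "send on closed channel" once Close has run
}

func (p *chanProcessor) Close() {
    close(p.events)
}

func main() {
    p := &chanProcessor{events: make(chan string, 1)}
    p.Close()

    defer func() {
        if r := recover(); r != nil {
            fmt.Println("recovered:", r) // recovered: send on closed channel
        }
    }()
    p.Run("event") // Run after Close: panics without the safeguard
}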

cmacknz (Member) commented Mar 2, 2023

The "Failed to publish event: attempt to use a closed processor" error here in the monitoring Filebeat strongly indicates we cannot ship Elastic agent logs to Fleet successfully.

I believe @rdner that #34647 didn't cause this intentionally and may have uncovered an old problem, but given that the error points to this change it seems like an unintended side effect.

@rdner you are the most familiar with this area of Beats, so you are the best positioned to try to debug this. I'm not sure who else to give this to otherwise; maybe @pierrehilbert can help with this.

If we are indeed failing to send monitoring logs to Fleet we can't release 8.7 with this bug, so addressing this needs to take priority over something else in the sprint.

faec (Contributor, Author) commented Mar 2, 2023

Based on how I encountered it (in particular its inconsistency across repeated runs), I wonder if it would previously have produced a panic or similar fatal error which would have just restarted the beat. In that case this would be uncovering an error that was already there, but by spin-locking on the error we are now preventing a natural restart + recovery that masked it in the past.

rdner (Member) commented Mar 2, 2023

The "Failed to publish event: attempt to use a closed processor" error here in the monitoring Filebeat strongly indicates we cannot ship Elastic agent logs to Fleet successfully.

Agreed, but as @faec said, previously it was just panicking and restarting, which I don't consider the right behaviour either. The only difference is that we now have an actual log line saying we lost the event; that was not the case before. We definitely need to find the root cause, which has nothing to do with my change; we should keep the change precisely for cases like this, to expose the incorrect behaviour.

I believe @rdner that #34647 didn't cause this intentionally and may have uncovered an old problem, but given that the error points to this change it seems like an unintended side effect.

On the contrary, this is the intended side effect. We didn't have any proof that we use Run after Close; this change was introduced to check exactly that and, as we can see, it found the issue. It would be nice to have a stack trace for that log line; perhaps we can somehow tune the logger. I'll try to reproduce using the config from the description.

@rdner you are the most familiar with this area of Beats, so you are the best positioned to try to debug this. I'm not sure who else to give this to otherwise; maybe @pierrehilbert can help with this.

I'll try to get to the bottom of this in the 2 days I have before my PTO.

@cmacknz #34647 was merged on the 22nd of February. If I understand correctly, the BC for 8.7.0 was created earlier, so the change would not affect the current release; it's included in 8.7.1. So we have plenty of time to look into this, don't we?

cmacknz (Member) commented Mar 2, 2023

There are multiple build candidates scheduled to pick up fixes before release.

The last 8.7.0 build candidate is scheduled for 2023-03-22, so we have until then to fix this. (I can't link to the 8.7.0 release schedule here as it's private.)

faec (Contributor, Author) commented Mar 6, 2023

I would say the intended side effect is to report the error of run-after-close, and the unintended side effect is a spin lock that wedges the whole pipeline while filling the logs with 100MB of error messages per minute. We would prefer to keep the intended side effect, but we can't ship something that spinlocks on what would otherwise be a recoverable error.

rdner (Member) commented Mar 7, 2023

@faec

unintended side effect is a spin lock that wedges the whole pipeline while filling the logs with 100MB of error messages per minute. We would prefer to keep the intended side effect, but we can't ship something that spinlocks on what would otherwise be a recoverable error.

Absolutely agree, but to be fair, none of us knew that the use of processors was broken this badly. I tested a simple processor configuration in my PR and it worked normally, so it's not as if it wasn't tested at all.

Just to be clear, in my previous comments I never meant "won't fix"; I tried to communicate that catching this behaviour was a good thing, and I started investigating right away.

Who would have thought the problem was at this scale? It had previously been reported only for the add_kubernetes_metadata processor.

rdner (Member) commented Mar 7, 2023

@cmacknz @faec

I've finally managed to find the cause. The fix is in #34761.

The Source of the Issue

I've tracked it down to where the multiple Close calls always come from:

tried to close already closed "add_cloud_metadata={}" processor: "goroutine 195 [running]:
runtime/debug.Stack()
	/opt/homebrew/Cellar/go@1.19/1.19.6/libexec/src/runtime/debug/stack.go:24 +0x64
github.com/elastic/beats/v7/libbeat/processors.(*SafeProcessor).Close(0x14000fb1788?)
	/Users/rdner/Projects/beats/libbeat/processors/safe_processor.go:51 +0xe0
github.com/elastic/beats/v7/libbeat/processors.Close(...)
	/Users/rdner/Projects/beats/libbeat/processors/processor.go:58
github.com/elastic/beats/v7/libbeat/publisher/processing.(*group).Close(0x14000ebea98?)
	/Users/rdner/Projects/beats/libbeat/publisher/processing/processors.go:96 +0x158
github.com/elastic/beats/v7/libbeat/processors.Close(...)
	/Users/rdner/Projects/beats/libbeat/processors/processor.go:58
github.com/elastic/beats/v7/libbeat/publisher/processing.(*group).Close(0x140004065e0?)
	/Users/rdner/Projects/beats/libbeat/publisher/processing/processors.go:96 +0x158
github.com/elastic/beats/v7/libbeat/processors.Close(...)
	/Users/rdner/Projects/beats/libbeat/processors/processor.go:58
github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).Close.func1()
	/Users/rdner/Projects/beats/libbeat/publisher/pipeline/client.go:167 +0x278
sync.(*Once).doSlow(0x14000d24750?, 0x140010b03c0?)
	/opt/homebrew/Cellar/go@1.19/1.19.6/libexec/src/sync/once.go:74 +0x104
sync.(*Once).Do(...)
	/opt/homebrew/Cellar/go@1.19/1.19.6/libexec/src/sync/once.go:65
github.com/elastic/beats/v7/libbeat/publisher/pipeline.(*client).Close(0x1031250fe?)
	/Users/rdner/Projects/beats/libbeat/publisher/pipeline/client.go:148 +0x58
github.com/elastic/beats/v7/filebeat/beater.(*countingClient).Close(0x140010b03c0?)
	/Users/rdner/Projects/beats/filebeat/beater/channels.go:145 +0x28
github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile.startHarvester.func1({0x10447a7e0?, 0x14000d30000})
	/Users/rdner/Projects/beats/filebeat/input/filestream/internal/input-logfile/harvester.go:219 +0x6a4
github.com/elastic/go-concert/unison.(*TaskGroup).Go.func1()
	/Users/rdner/go/pkg/mod/github.com/elastic/[email protected]/unison/taskgroup.go:163 +0x94
created by github.com/elastic/go-concert/unison.(*TaskGroup).Go
	/Users/rdner/go/pkg/mod/github.com/elastic/[email protected]/unison/taskgroup.go:159 +0xc4

In my tests (using the policy in the description of this issue) it happened for the following processors:

  • add_cloud_metadata
  • add_docker_metadata
  • add_fields
  • add_index_pattern
  • add_kubernetes_metadata
  • copy_fields
  • drop_fields

Apparently it does not depend on a particular processor; it happens only in filestream, triggered by this code:

client, err := hg.pipeline.ConnectWith(beat.ClientConfig{
    CloseRef:   ctx.Cancelation,
    ACKHandler: newInputACKHandler(hg.ackCH, ctx.Logger),
})
if err != nil {
    hg.readers.remove(srcID)
    return fmt.Errorf("error while connecting to output with pipeline: %v", err)
}
defer client.Close()

Every time a filestream harvester stops or restarts for any reason, it closes the client, which closes the processors:

if c.processors != nil {
    log.Debug("client: closing processors")
    err := processors.Close(c.processors)
    if err != nil {
        log.Errorf("client: error closing processors: %v", err)
    }
    log.Debug("client: done closing processors")
}

Closing the client is normal behaviour; it seems something is re-using the same processors across multiple clients, and that is causing the issue.
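To make the suspected sharing concrete, here is a minimal standalone sketch with stand-in types (not the real Beats APIs): two clients are handed the same guarded processor, and the first client's Close poisons the second:

package main

// Stand-in types only; sketches how sharing one processor instance across
// pipeline clients reproduces "attempt to use a closed processor".

import (
    "errors"
    "fmt"
    "sync/atomic"
)

var errClosed = errors.New("attempt to use a closed processor")

type safeProc struct{ closed uint32 }

func (p *safeProc) Run(ev string) error {
    if atomic.LoadUint32(&p.closed) == 1 {
        return errClosed
    }
    return nil // a real processor would transform the event here
}

func (p *safeProc) Close() { atomic.StoreUint32(&p.closed, 1) }

type client struct{ proc *safeProc }

func (c *client) Publish(ev string) error { return c.proc.Run(ev) }
func (c *client) Close()                  { c.proc.Close() }

func main() {
    shared := &safeProc{} // built once in a closure, as in newCommonConfigEditor

    a := &client{proc: shared}
    b := &client{proc: shared} // a second harvester connects with the same instance

    a.Close()                       // first harvester stops and closes its client
    fmt.Println(b.Publish("event")) // attempt to use a closed processor
}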

The Root Cause

I found this function:

func newCommonConfigEditor(
    beatInfo beat.Info,
    cfg *conf.C,
) (pipetool.ConfigEditor, error) {
    config := commonInputConfig{}
    if err := cfg.Unpack(&config); err != nil {
        return nil, err
    }

    var indexProcessor processors.Processor
    if !config.Index.IsEmpty() {
        staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version)
        timestampFormat, err := fmtstr.NewTimestampFormatString(&config.Index, staticFields)
        if err != nil {
            return nil, err
        }
        indexProcessor = add_formatted_index.New(timestampFormat)
    }

    // NOTE: created once here, then captured by the returned editor below and
    // re-used for every client -- this is the sharing that causes the bug.
    userProcessors, err := processors.New(config.Processors)
    if err != nil {
        return nil, err
    }

    serviceType := config.ServiceType
    if serviceType == "" {
        serviceType = config.Module
    }

    return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) {
        meta := clientCfg.Processing.Meta.Clone()
        fields := clientCfg.Processing.Fields.Clone()

        setOptional(meta, "pipeline", config.Pipeline)
        setOptional(fields, "fileset.name", config.Fileset)
        setOptional(fields, "service.type", serviceType)
        setOptional(fields, "input.type", config.Type)
        if config.Module != "" {
            event := mapstr.M{"module": config.Module}
            if config.Fileset != "" {
                event["dataset"] = config.Module + "." + config.Fileset
            }
            fields["event"] = event
        }

        // assemble the processors. Ordering is important.
        // 1. add support for index configuration via processor
        // 2. add processors added by the input that wants to connect
        // 3. add locally configured processors from the 'processors' settings
        procs := processors.NewList(nil)

        if indexProcessor != nil {
            procs.AddProcessor(indexProcessor)
        }
        if lst := clientCfg.Processing.Processor; lst != nil {
            procs.AddProcessor(lst)
        }
        if userProcessors != nil {
            procs.AddProcessors(*userProcessors)
        }

        clientCfg.Processing.EventMetadata = config.EventMetadata
        clientCfg.Processing.Meta = meta
        clientCfg.Processing.Fields = fields
        clientCfg.Processing.Processor = procs
        clientCfg.Processing.KeepNull = config.KeepNull
        clientCfg.Processing.DisableHost = config.PublisherPipeline.DisableHost

        return clientCfg, nil
    }, nil
}

This is actually re-using all the user-space processors (the ones defined in the config), since they're created once in the enclosing closure and then used by the returned factory function later. And that was it: once I moved the initialisation inside the returned function, I could not reproduce the errors anymore.
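The shape of the fix, then, is to construct the per-client processors inside the returned editor function rather than in the enclosing closure. A sketch (see #34761 for the actual change):

// Sketch of the fix; see #34761 for the real change. userProcessors is now
// built per client, inside the returned editor, so no instances are shared.
return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) {
    // moved here from the enclosing closure: every ConnectWith call now
    // gets its own processor instances
    userProcessors, err := processors.New(config.Processors)
    if err != nil {
        return beat.ClientConfig{}, err
    }

    // ... assemble procs and the rest of clientCfg exactly as before ...

    return clientCfg, nil
}, nil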

My Analysis

The pipeline creates a new set of processors here:

processors, err := p.createEventProcessing(cfg.Processing, publishDisabled)

Which eventually reaches these two functions:

The first creates the user-space processors from the config:

func New(config PluginConfig) (*Processors, error) {
    procs := NewList(nil)

    for _, procConfig := range config {
        // Handle if/then/else processor which has multiple top-level keys.
        if procConfig.HasField("if") {
            p, err := NewIfElseThenProcessor(procConfig)
            if err != nil {
                return nil, errors.Wrap(err, "failed to make if/then/else processor")
            }
            procs.AddProcessor(p)
            continue
        }

        if len(procConfig.GetFields()) != 1 {
            return nil, errors.Errorf("each processor must have exactly one "+
                "action, but found %d actions (%v)",
                len(procConfig.GetFields()),
                strings.Join(procConfig.GetFields(), ","))
        }

        actionName := procConfig.GetFields()[0]
        actionCfg, err := procConfig.Child(actionName, -1)
        if err != nil {
            return nil, err
        }

        gen, exists := registry.reg[actionName]
        if !exists {
            var validActions []string
            for k := range registry.reg {
                validActions = append(validActions, k)
            }
            return nil, errors.Errorf("the processor action %s does not exist. Valid actions: %v", actionName, strings.Join(validActions, ", "))
        }

        common.PrintConfigDebugf(actionCfg, "Configure processor action '%v' with:", actionName)
        constructor := gen.Plugin()
        plugin, err := constructor(actionCfg)
        if err != nil {
            return nil, err
        }

        procs.AddProcessor(plugin)
    }

    if len(procs.List) > 0 {
        procs.log.Debugf("Generated new processors: %v", procs)
    }
    return procs, nil
}

// AddProcessor adds a single Processor to Processors
func (procs *Processors) AddProcessor(p Processor) {
    procs.List = append(procs.List, p)
}

Then some internal processors are added on top of it here:

func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error) {
    var (
        // pipeline processors
        processors = newGroup("processPipeline", b.log)

        // client fields and metadata
        clientMeta      = cfg.Meta
        localProcessors = makeClientProcessors(b.log, cfg)
    )

    needsCopy := b.alwaysCopy || localProcessors != nil || b.processors != nil

    builtin := b.builtinMeta
    if cfg.DisableHost {
        tmp := builtin.Clone()
        delete(tmp, "host")
        builtin = tmp
    }

    var clientFields mapstr.M
    for _, mod := range b.modifiers {
        m := mod.ClientFields(b.info, cfg)
        if len(m) > 0 {
            if clientFields == nil {
                clientFields = mapstr.M{}
            }
            clientFields.DeepUpdate(m.Clone())
        }
    }
    if len(clientFields) > 0 {
        tmp := builtin.Clone()
        tmp.DeepUpdate(clientFields)
        builtin = tmp
    }

    // setup 1: generalize/normalize output (P)
    if cfg.EventNormalization != nil {
        if *cfg.EventNormalization {
            processors.add(newGeneralizeProcessor(cfg.KeepNull))
        }
    } else if !b.skipNormalize {
        processors.add(newGeneralizeProcessor(cfg.KeepNull))
    }

    // setup 2: add Meta from client config (C)
    if m := clientMeta; len(m) > 0 {
        processors.add(clientEventMeta(m, needsCopy))
    }

    // setup 4, 5: pipeline tags + client tags
    var tags []string
    tags = append(tags, b.tags...)
    tags = append(tags, cfg.EventMetadata.Tags...)
    if len(tags) > 0 {
        processors.add(actions.NewAddTags("tags", tags))
    }

    // setup 3, 4, 5: client config fields + pipeline fields + client fields + dyn metadata
    fields := cfg.Fields.Clone()
    fields.DeepUpdate(b.fields.Clone())
    if em := cfg.EventMetadata; len(em.Fields) > 0 {
        if err := mapstr.MergeFieldsDeep(fields, em.Fields.Clone(), em.FieldsUnderRoot); err != nil {
            return nil, fmt.Errorf("failed merging client event metadata into fields: %w", err)
        }
    }

    if len(fields) > 0 {
        // Enforce a copy of fields if dynamic fields are configured or agent
        // metadata will be merged into the fields.
        // With dynamic fields potentially changing at any time, we need to copy,
        // so we do not change shared structures by accident.
        fieldsNeedsCopy := needsCopy || cfg.DynamicFields != nil || hasKeyAnyOf(fields, builtin)
        processors.add(actions.NewAddFields(fields, fieldsNeedsCopy, true))
    }

    if cfg.DynamicFields != nil {
        checkCopy := func(m mapstr.M) bool {
            return needsCopy || hasKeyAnyOf(m, builtin)
        }
        processors.add(makeAddDynMetaProcessor("dynamicFields", cfg.DynamicFields, checkCopy))
    }

    // setup 5: client processor list
    processors.add(localProcessors)

    // setup 6: add beats and host metadata
    if meta := builtin; len(meta) > 0 {
        processors.add(actions.NewAddFields(meta, needsCopy, false))
    }

    // setup 8: pipeline processors list
    if b.processors != nil {
        // Add the global pipeline as a function processor, so clients cannot close it
        processors.add(newProcessor(b.processors.title, b.processors.Run))
    }

    // setup 9: time series metadata
    if b.timeSeries {
        processors.add(timeseries.NewTimeSeriesProcessor(b.timeseriesFields))
    }

    // setup 10: debug print final event (P)
    if b.log.IsDebug() || publisher.UnderAgent() {
        processors.add(debugPrintProcessor(b.info, b.log))
    }

    // setup 11: drop all events if outputs are disabled (P)
    if drop {
        processors.add(dropDisabledProcessor)
    }

    return processors, nil
}

Because of these assignments:

processingFactory := settings.Processing
if processingFactory == nil {
    processingFactory = processing.MakeDefaultBeatSupport(true)
}
b.processing, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig)

func MakeDefaultBeatSupport(normalize bool) SupportFactory {
    return MakeDefaultSupport(normalize, WithECS, WithHost, WithAgentMeta())
}

func MakeDefaultSupport(
    normalize bool,
    modifiers ...modifier,
) SupportFactory {
    return func(info beat.Info, log *logp.Logger, beatCfg *config.C) (Supporter, error) {
        cfg := struct {
            mapstr.EventMetadata `config:",inline"`      // Fields and tags to add to each event.
            Processors processors.PluginConfig `config:"processors"`
            TimeSeries bool                    `config:"timeseries.enabled"`
        }{}
        if err := beatCfg.Unpack(&cfg); err != nil {
            return nil, err
        }

        processors, err := processors.New(cfg.Processors)
        if err != nil {
            return nil, fmt.Errorf("error initializing processors: %w", err)
        }

        return newBuilder(info, log, processors, cfg.EventMetadata, modifiers, !normalize, cfg.TimeSeries)
    }
}

So this processors.New function calls our processor plugin API, which eventually returns new instances of the requested processors. No problem here.

The builder also does not re-use anything and protects the global processors from being closed:

if b.processors != nil {
    // Add the global pipeline as a function processor, so clients cannot close it
    processors.add(newProcessor(b.processors.title, b.processors.Run))
}
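For comparison, such a function processor can be sketched as a wrapper that exposes only Run, so processors.Close finds no Close method to call (the actual newProcessor helper in libbeat may differ in detail):

// Sketch, assuming newProcessor has roughly this shape: only Run is
// forwarded, and there is no Close for clients to trigger.
type funcProcessor struct {
    name string
    fn   func(*beat.Event) (*beat.Event, error)
}

func (p *funcProcessor) Run(event *beat.Event) (*beat.Event, error) { return p.fn(event) }
func (p *funcProcessor) String() string                             { return p.name }

func newProcessor(name string, fn func(*beat.Event) (*beat.Event, error)) *funcProcessor {
    return &funcProcessor{name: name, fn: fn}
}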

However, Filebeat is somewhat special in the way it handles its pipeline/input creation, and it uses this function:

inputLoader := channel.RunnerFactoryWithCommonInputSettings(b.Info, compat.Combine(
    compat.RunnerFactory(inputsLogger, b.Info, v2InputLoader),
    input.NewRunnerFactory(pipelineConnector, registrar, fb.done),
))
moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines)

which is basically a factory for inputs; I suppose it also exists to handle input restarts at runtime.

If we go to this function definition, we can see it calls:

func withClientConfig(
    beatInfo beat.Info,
    pipeline beat.PipelineConnector,
    cfg *conf.C,
) (beat.PipelineConnector, error) {
    editor, err := newCommonConfigEditor(beatInfo, cfg)
    if err != nil {
        return nil, err
    }
    return pipetool.WithClientConfigEdit(pipeline, editor), nil
}

And here we arrive back at the function that caused it all: newCommonConfigEditor, quoted in full under "The Root Cause" above. The userProcessors (and the optional indexProcessor) are created once when the editor is built, and the returned function then attaches those same instances to every client configuration it produces, so every pipeline client ends up sharing them.
