Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing preprocessors on the module input override #27154

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions filebeat/docs/filebeat-modules-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,39 @@ You can also enable `close_eof` for all inputs created by any of the modules:
-M "*.*.input.close_eof=true"
----------------------------------------------------------------------

==== Processors

As described in <<where-valid,Where are processors valid?>> you can define `processors`
under the input section of the module definition. These processors are run _after_ any processors defined in the
module.

[source,yaml]
----------------------------------------------------------------------
- module: nginx
access:
input:
processors:
- drop_event:
when:
contains:
source: "test"
----------------------------------------------------------------------

Alternatively you can define `preprocessors` which are run after the input but _before_ the processors defined
in the module. Preprocessors can be useful if we don't retrieve the data directly from the log source and the
intermediate channel introduces additional 'wrapping'.

[source,yaml]
----------------------------------------------------------------------
- module: nginx
access:
input:
preprocessors:
- drop_event:
when:
contains:
source: "test"
----------------------------------------------------------------------


:modulename!:
14 changes: 14 additions & 0 deletions filebeat/fileset/fileset.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,21 @@ func (fs *Fileset) getInputConfig() (*common.Config, error) {
if err != nil {
return nil, fmt.Errorf("Error creating config from input overrides: %v", err)
}

cfg, err = common.MergeConfigsWithOptions([]*common.Config{cfg, overrides}, ucfg.FieldReplaceValues("**.paths"), ucfg.FieldAppendValues("**.processors"))
if overrides.HasField("preprocessors") {
preprocessors, err := overrides.Child("preprocessors", -1)
if err != nil {
return nil, fmt.Errorf("Error creating config from input overrides: %v", err)
}
overrides, err = common.NewConfigFrom(map[string]interface{}{
"processors": preprocessors,
})
if err != nil {
return nil, fmt.Errorf("Error creating config from input overrides: %v", err)
}
cfg, err = common.MergeConfigsWithOptions([]*common.Config{cfg, overrides}, ucfg.FieldReplaceValues("**.paths"), ucfg.FieldPrependValues("**.processors"))
}
if err != nil {
return nil, fmt.Errorf("Error applying config overrides: %v", err)
}
Expand Down
94 changes: 94 additions & 0 deletions filebeat/fileset/fileset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,100 @@ func TestGetInputConfigNginxOverrides(t *testing.T) {
require.Equal(t, "foobar", v)
},
},
"processors": {
map[string]interface{}{
"processors": []map[string]interface{}{
map[string]interface{}{
"drop_event": map[string]interface{}{
"when": map[string]interface{}{
"contains": map[string]interface{}{
"source": "test",
},
},
},
},
},
},
func(t require.TestingT, cfg interface{}, rest ...interface{}) {
c, ok := cfg.(*common.Config)
if !ok {
t.FailNow()
}

require.True(t, c.HasField("processors"))
count, _ := c.CountField("processors")
require.Equal(t, 3, count)
processors, err := c.Child("processors", -1)
require.NoError(t, err)
dropProcessor, err := processors.Child("", 2)
require.NoError(t, err)
require.True(t, dropProcessor.HasField("drop_event"), "should contain 'drop_event' last")
},
},
"preprocessors": {
map[string]interface{}{
"close_eof": true,
"preprocessors": []map[string]interface{}{
map[string]interface{}{
"drop_event": map[string]interface{}{
"when": map[string]interface{}{
"contains": map[string]interface{}{
"source": "test",
},
},
},
},
},
},
func(t require.TestingT, cfg interface{}, rest ...interface{}) {
c, ok := cfg.(*common.Config)
if !ok {
t.FailNow()
}

require.True(t, c.HasField("processors"))
count, _ := c.CountField("processors")
require.Equal(t, 3, count, "Expecting processors to be merged 2 from module 1 from preprocessor")
processors, err := c.Child("processors", -1)
require.NoError(t, err)
addFieldsProcessor, err := processors.Child("", 2)
require.NoError(t, err)
require.True(t, addFieldsProcessor.HasField("add_fields"), "should contain 'add_fields' last")
},
},
"combinedProcessors": {
map[string]interface{}{
"close_eof": true,
"processors": []map[string]interface{}{
map[string]interface{}{
"drop_event": map[string]interface{}{},
},
},
"preprocessors": []map[string]interface{}{
map[string]interface{}{
"drop_event": map[string]interface{}{},
},
},
},
func(t require.TestingT, cfg interface{}, rest ...interface{}) {
c, ok := cfg.(*common.Config)
if !ok {
t.FailNow()
}

require.True(t, c.HasField("processors"))
count, _ := c.CountField("processors")
require.Equal(t, 4, count, "Expecting processors to be merged: 2 from module, 1 from preprocessor, 1 from processor")
processors, err := c.Child("processors", -1)
require.NoError(t, err)
dropProcessor, err := processors.Child("", 0)
require.NoError(t, err)
require.True(t, dropProcessor.HasField("drop_event"), "should contain 'drop_event' first")
dropProcessor, err = processors.Child("", 3)
require.NoError(t, err)
require.True(t, dropProcessor.HasField("drop_event"), "should contain 'drop_event' last")
},
},
}

for name, test := range tests {
Expand Down