Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move agent metadata to a processor #9952

Merged
merged 5 commits into from
Jan 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ The list below covers the major changes between 7.0.0-alpha2 and master only.
==== Added

- Allow multiple object type configurations per field. {pull}9772[9772]
- Move agent metadata addition to a processor. {pull}9952[9952]
4 changes: 4 additions & 0 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ type ClientConfig struct {
// if the normalization step should be skipped set this to true.
SkipNormalization bool

// By default events are decorated with agent metadata.
// To skip adding that metadata set this to true.
SkipAgentMetadata bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was not even aware we have these options 👍

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The settings 'Fields', 'Meta', 'EventMetadata', 'DynamicFields' are legacy fields we will remove in the future in favor of processors (I'm in the middle of introducing add_fields and add_tags processor).

I think I'd prefer to be able to set the agent metadata in the pipeline constructor, and have this optionally be empty in comparison to build more logic/support for the client to change/overwrite global settings. But I know it's not really possible yet to 'influence' the constructor.

By introducing SkipAgentMetadata we loose the Beats meta-data always. Is this really intended? I wonder if we want to change but the beats agent meta-data into another namespace (e.g. collector.agent...).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By introducing SkipAgentMetadata we loose the Beats meta-data always. Is this really intended?

Yes, for APM we never want libbeat to set these fields, even if apm-server hasn't set them.
Another namespace would be fine for the APM case, as we had planned to put them under observer. In that case, we'd want these to be merged (overwrite would be fine even) as observer will have other fields too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can later put the Beat info under @metadata so apm-server could still fetch it.

I suggest for now to move forward with this as the only customer of this for now will be apm-server and we can still improve it later. On the Beats side this should not change anything.


// ACK handler strategies.
// Note: ack handlers are run in another go-routine owned by the publisher pipeline.
// They should not block for to long, to not block the internal buffers for
Expand Down
11 changes: 0 additions & 11 deletions libbeat/publisher/pipeline/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,6 @@ func Load(
Annotations: Annotations{
Event: config.EventMetadata,
Builtin: common.MapStr{
"agent": common.MapStr{
"type": beatInfo.Beat,
"hostname": beatInfo.Hostname,
"version": beatInfo.Version,
"id": beatInfo.ID.String(),
"ephemeral_id": beatInfo.EphemeralID.String(),
},
"host": common.MapStr{
"name": name,
},
Expand All @@ -97,10 +90,6 @@ func Load(
},
}

if name != beatInfo.Hostname {
settings.Annotations.Builtin.Put("agent.name", name)
}

queueBuilder, err := createQueueBuilder(config.Queue, monitors)
if err != nil {
return nil, err
Expand Down
21 changes: 20 additions & 1 deletion libbeat/publisher/pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,12 @@ func newProcessorPipeline(
processors.add(makeAddFieldsProcessor("beatsMeta", meta, needsCopy))
}

// setup 7: pipeline processors list
// setup 7: add agent metadata
if !config.SkipAgentMetadata {
processors.add(makeAddAgentMetadataProcessor(info))
}

// setup 8: pipeline processors list
processors.add(global.processors)

// setup 9: debug print final event (P)
Expand Down Expand Up @@ -290,6 +295,20 @@ func makeAddDynMetaProcessor(
})
}

func makeAddAgentMetadataProcessor(info beat.Info) *processorFn {
metadata := common.MapStr{
"type": info.Beat,
"ephemeral_id": info.EphemeralID.String(),
"hostname": info.Hostname,
"id": info.ID.String(),
"version": info.Version,
}
if info.Name != info.Hostname {
metadata.Put("name", info.Name)
}
return makeAddFieldsProcessor("add_agent_metadata", common.MapStr{"agent": metadata}, true)
}

func debugPrintProcessor(info beat.Info) *processorFn {
// ensure only one go-routine is using the encoder (in case
// beat.Client is shared between multiple go-routines by accident)
Expand Down
159 changes: 116 additions & 43 deletions libbeat/publisher/pipeline/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,36 @@ import (
"testing"
"time"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

func TestProcessors(t *testing.T) {
info := beat.Info{}
defaultInfo := beat.Info{}

type local struct {
config beat.ClientConfig
events []common.MapStr
expected []common.MapStr
config beat.ClientConfig
events []common.MapStr
expected []common.MapStr
includeAgentMetadata bool
}

tests := []struct {
name string
global pipelineProcessors
local []local
info *beat.Info
}{
{
"user global fields and tags",
pipelineProcessors{
name: "user global fields and tags",
global: pipelineProcessors{
fields: common.MapStr{"global": 1},
tags: []string{"tag"},
},
[]local{
local: []local{
{
config: beat.ClientConfig{},
events: []common.MapStr{{"value": "abc", "user": nil}},
Expand All @@ -59,12 +62,12 @@ func TestProcessors(t *testing.T) {
},
},
{
"no normalization",
pipelineProcessors{
name: "no normalization",
global: pipelineProcessors{
fields: common.MapStr{"global": 1},
tags: []string{"tag"},
},
[]local{
local: []local{
{
config: beat.ClientConfig{SkipNormalization: true},
events: []common.MapStr{{"value": "abc", "user": nil}},
Expand All @@ -75,9 +78,78 @@ func TestProcessors(t *testing.T) {
},
},
{
"beat local fields",
pipelineProcessors{},
[]local{
name: "add agent metadata",
global: pipelineProcessors{
fields: common.MapStr{"global": 1, "agent": common.MapStr{"foo": "bar"}},
tags: []string{"tag"},
},
info: &beat.Info{
Beat: "test",
EphemeralID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000")),
Hostname: "test.host.name",
ID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440001")),
Name: "test.host.name",
Version: "0.1",
},
local: []local{
{
config: beat.ClientConfig{},
events: []common.MapStr{{"value": "abc", "user": nil}},
expected: []common.MapStr{
{
"agent": common.MapStr{
"ephemeral_id": "123e4567-e89b-12d3-a456-426655440000",
"hostname": "test.host.name",
"id": "123e4567-e89b-12d3-a456-426655440001",
"type": "test",
"version": "0.1",
"foo": "bar",
},
"value": "abc", "global": 1, "tags": []string{"tag"},
},
},
includeAgentMetadata: true,
},
},
},
{
name: "add agent metadata with custom host.name",
global: pipelineProcessors{
fields: common.MapStr{"global": 1},
tags: []string{"tag"},
},
info: &beat.Info{
Beat: "test",
EphemeralID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000")),
Hostname: "test.host.name",
ID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440001")),
Name: "other.test.host.name",
Version: "0.1",
},
local: []local{
{
config: beat.ClientConfig{},
events: []common.MapStr{{"value": "abc", "user": nil}},
expected: []common.MapStr{
{
"agent": common.MapStr{
"ephemeral_id": "123e4567-e89b-12d3-a456-426655440000",
"hostname": "test.host.name",
"id": "123e4567-e89b-12d3-a456-426655440001",
"name": "other.test.host.name",
"type": "test",
"version": "0.1",
},
"value": "abc", "global": 1, "tags": []string{"tag"},
},
},
includeAgentMetadata: true,
},
},
},
{
name: "beat local fields",
local: []local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1},
Expand All @@ -88,12 +160,12 @@ func TestProcessors(t *testing.T) {
},
},
{
"beat local and user global fields",
pipelineProcessors{
name: "beat local and user global fields",
global: pipelineProcessors{
fields: common.MapStr{"global": 1},
tags: []string{"tag"},
},
[]local{
local: []local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1},
Expand All @@ -106,12 +178,12 @@ func TestProcessors(t *testing.T) {
},
},
{
"user global fields overwrite beat local fields",
pipelineProcessors{
name: "user global fields overwrite beat local fields",
global: pipelineProcessors{
fields: common.MapStr{"global": 1, "shared": "global"},
tags: []string{"tag"},
},
[]local{
local: []local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1, "shared": "local"},
Expand All @@ -124,9 +196,8 @@ func TestProcessors(t *testing.T) {
},
},
{
"beat local fields isolated",
pipelineProcessors{},
[]local{
name: "beat local fields isolated",
local: []local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1},
Expand All @@ -145,11 +216,11 @@ func TestProcessors(t *testing.T) {
},

{
"beat local fields + user global fields isolated",
pipelineProcessors{
name: "beat local fields + user global fields isolated",
global: pipelineProcessors{
fields: common.MapStr{"global": 0},
},
[]local{
local: []local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1},
Expand All @@ -167,9 +238,8 @@ func TestProcessors(t *testing.T) {
},
},
{
"user local fields and tags",
pipelineProcessors{},
[]local{
name: "user local fields and tags",
local: []local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Expand All @@ -185,9 +255,8 @@ func TestProcessors(t *testing.T) {
},
},
{
"user local fields (under root) and tags",
pipelineProcessors{},
[]local{
name: "user local fields (under root) and tags",
local: []local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Expand All @@ -204,12 +273,12 @@ func TestProcessors(t *testing.T) {
},
},
{
"user local fields overwrite user global fields",
pipelineProcessors{
name: "user local fields overwrite user global fields",
global: pipelineProcessors{
fields: common.MapStr{"global": 0, "shared": "global"},
tags: []string{"global"},
},
[]local{
local: []local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Expand All @@ -230,9 +299,8 @@ func TestProcessors(t *testing.T) {
},
},
{
"user local fields isolated",
pipelineProcessors{},
[]local{
name: "user local fields isolated",
local: []local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Expand All @@ -254,11 +322,11 @@ func TestProcessors(t *testing.T) {
},
},
{
"user local + global fields isolated",
pipelineProcessors{
name: "user local + global fields isolated",
global: pipelineProcessors{
fields: common.MapStr{"fields": common.MapStr{"global": 0}},
},
[]local{
local: []local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Expand All @@ -280,11 +348,11 @@ func TestProcessors(t *testing.T) {
},
},
{
"user local + global fields isolated (fields with root)",
pipelineProcessors{
name: "user local + global fields isolated (fields with root)",
global: pipelineProcessors{
fields: common.MapStr{"global": 0},
},
[]local{
local: []local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Expand Down Expand Up @@ -314,7 +382,12 @@ func TestProcessors(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
// create processor pipelines
programs := make([]beat.Processor, len(test.local))
info := defaultInfo
if test.info != nil {
info = *test.info
}
for i, local := range test.local {
local.config.SkipAgentMetadata = !local.includeAgentMetadata
programs[i] = newProcessorPipeline(info, test.global, local.config)
}

Expand Down