Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tag "truncated" to "log.flags" if incoming line is longer than configured limit #7991

1 change: 1 addition & 0 deletions CHANGELOG-developer.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ The list below covers the major changes between 6.3.0 and master only.
coverage profile (.cov), and produces an HTML coverage report. See
`mage -h goTestUnit`. {pull}7766[7766]
- Beats packaging now build non-oss binaries from code located in the x-pack folder. {issue}7783[7783]
- New function `AddTagsWithKey` is added, so `common.MapStr` can be enriched with tags with an arbitrary key. {pull}7991[7991]
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ https:/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Add custom unpack to log hints config to avoid env resolution {pull}7710[7710]
- Keep raw user agent information after parsing as user_agent_raw in Filebeat modules. {pull}7823[7832]
- Make docker input check if container strings are empty {pull}7960[7960]
- Add tag "truncated" to "log.flags" if incoming line is longer than configured limit. {pull}7991[7991]

*Heartbeat*

Expand Down
4 changes: 4 additions & 0 deletions filebeat/_meta/fields.common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@
description: >
Logging level.

- name: log.flags
description: >
This field contains the flags of the event.

- name: event.created
type: date
description: >
Expand Down
8 changes: 8 additions & 0 deletions filebeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2682,6 +2682,14 @@ type: keyword
Logging level.


--

*`log.flags`*::
+
--
This field contains the flags of the event.


--

*`event.created`*::
Expand Down
2 changes: 1 addition & 1 deletion filebeat/include/fields.go

Large diffs are not rendered by default.

23 changes: 19 additions & 4 deletions filebeat/reader/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,28 @@ func (m *Message) IsEmpty() bool {
return false
}

func (msg *Message) AddFields(fields common.MapStr) {
// AddFields adds fields to the message.
func (m *Message) AddFields(fields common.MapStr) {
if fields == nil {
return
}

if msg.Fields == nil {
msg.Fields = common.MapStr{}
if m.Fields == nil {
m.Fields = common.MapStr{}
}
msg.Fields.Update(fields)
m.Fields.Update(fields)
}

// AddFlagsWithKey adds flags to the message with an arbitrary key.
// If the field does not exist, it is created.
func (m *Message) AddFlagsWithKey(key string, flags ...string) error {
if len(flags) == 0 {
return nil
}

if m.Fields == nil {
m.Fields = common.MapStr{}
}

return common.AddTagsWithKey(m.Fields, key, flags)
}
17 changes: 17 additions & 0 deletions filebeat/reader/multiline/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Reader struct {
separator []byte
last []byte
numLines int
truncated int
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I didn't think about multiline being able to capture truncated bytes as well. Good catch 👍

err error // last seen error
state func(*Reader) (reader.Message, error)
message reader.Message
Expand Down Expand Up @@ -262,13 +263,19 @@ func (mlr *Reader) clear() {
mlr.message = reader.Message{}
mlr.last = nil
mlr.numLines = 0
mlr.truncated = 0
mlr.err = nil
}

// finalize writes the existing content into the returned message and resets all reader variables.
func (mlr *Reader) finalize() reader.Message {
if mlr.truncated > 0 {
mlr.message.AddFlagsWithKey("log.flags", "truncated")
}

// Copy message from existing content
msg := mlr.message

mlr.clear()
return msg
}
Expand Down Expand Up @@ -303,6 +310,16 @@ func (mlr *Reader) addLine(m reader.Message) {
}
mlr.message.Content = append(tmp, m.Content[:space]...)
mlr.numLines++

// add number of truncated bytes to fields
diff := len(m.Content) - space
if diff > 0 {
mlr.truncated += diff
}
} else {
// increase the number of skipped bytes, if cannot add
mlr.truncated += len(m.Content)

}

mlr.last = m.Content
Expand Down
83 changes: 83 additions & 0 deletions filebeat/reader/multiline/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,41 @@ func TestMultilineBeforeNegateOKWithEmptyLine(t *testing.T) {
)
}

func TestMultilineAfterTruncated(t *testing.T) {
pattern := match.MustCompile(`^[ ]`) // next line is indented a space
maxLines := 2
testMultilineTruncated(t,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also add a test where the truncated flag should be misssing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Config{
Pattern: &pattern,
Match: "after",
MaxLines: &maxLines,
},
2,
true,
[]string{
"line1\n line1.1\n line1.2\n",
"line2\n line2.1\n line2.2\n"},
[]string{
"line1\n line1.1",
"line2\n line2.1"},
)
testMultilineTruncated(t,
Config{
Pattern: &pattern,
Match: "after",
MaxLines: &maxLines,
},
2,
false,
[]string{
"line1\n line1.1\n",
"line2\n line2.1\n"},
[]string{
"line1\n line1.1",
"line2\n line2.1"},
)
}

func testMultilineOK(t *testing.T, cfg Config, events int, expected ...string) {
_, buf := createLineBuffer(expected...)
r := createMultilineTestReader(t, buf, cfg)
Expand Down Expand Up @@ -177,6 +212,54 @@ func testMultilineOK(t *testing.T, cfg Config, events int, expected ...string) {
}
}

func testMultilineTruncated(t *testing.T, cfg Config, events int, truncated bool, input, expected []string) {
_, buf := createLineBuffer(input...)
r := createMultilineTestReader(t, buf, cfg)

var messages []reader.Message
for {
message, err := r.Next()
if err != nil {
break
}

messages = append(messages, message)
}

if len(messages) != events {
t.Fatalf("expected %v lines, read only %v line(s)", len(expected), len(messages))
}

for _, message := range messages {
found := false
statusFlags, err := message.Fields.GetValue("log.flags")
if err != nil {
if !truncated {
assert.False(t, found)
return
}
t.Fatalf("error while getting log.status field: %v", err)
}

switch flags := statusFlags.(type) {
case []string:
for _, f := range flags {
if f == "truncated" {
found = true
}
}
default:
t.Fatalf("incorrect type for log.flags")
}

if truncated {
assert.True(t, found)
} else {
assert.False(t, found)
}
}
}

func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reader.Reader {
encFactory, ok := encoding.FindEncoding("plain")
if !ok {
Expand Down
1 change: 1 addition & 0 deletions filebeat/reader/readfile/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func (r *LimitReader) Next() (reader.Message, error) {
message, err := r.reader.Next()
if len(message.Content) > r.maxBytes {
message.Content = message.Content[:r.maxBytes]
message.AddFlagsWithKey("log.flags", "truncated")
}
return message, err
}
88 changes: 88 additions & 0 deletions filebeat/reader/readfile/limit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build !integration

package readfile

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/filebeat/reader"
)

type mockReader struct {
line []byte
}

func (m *mockReader) Next() (reader.Message, error) {
return reader.Message{
Content: m.line,
}, nil
}

var limitTests = []struct {
line string
maxBytes int
truncated bool
}{
{"long-long-line", 5, true},
{"long-long-line", 3, true},
{"long-long-line", len("long-long-line"), false},
}

func TestLimitReader(t *testing.T) {
for _, test := range limitTests {
r := NewLimitReader(&mockReader{[]byte(test.line)}, test.maxBytes)

msg, err := r.Next()
if err != nil {
t.Fatalf("Error reading from mock reader: %v", err)
}

assert.Equal(t, test.maxBytes, len(msg.Content))

found := false
statusFlags, err := msg.Fields.GetValue("log.flags")
if err != nil {
if !test.truncated {
assert.False(t, found)
return
}
t.Fatalf("Error getting truncated value: %v", err)
}

switch flags := statusFlags.(type) {
case []string:
for _, f := range flags {
if f == "truncated" {
found = true
}
}
default:
t.Fatalf("incorrect type for log.flags")
}

if test.truncated {
assert.True(t, found)
} else {
assert.False(t, found)
}
}
}
27 changes: 20 additions & 7 deletions libbeat/common/mapstr.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,25 +305,38 @@ func MergeFields(ms, fields MapStr, underRoot bool) error {
// exist then it will be created. If the tags field exists and is not a []string
// then an error will be returned. It does not deduplicate the list of tags.
func AddTags(ms MapStr, tags []string) error {
return AddTagsWithKey(ms, TagsKey, tags)
}

// AddTagsWithKey appends a tag to the key field of ms. If the field does not
// exist then it will be created. If the field exists and is not a []string
// then an error will be returned. It does not deduplicate the list.
func AddTagsWithKey(ms MapStr, key string, tags []string) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a test for the AddTagsWithKey method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: I would keep the naming here as Tags even though I would rename everything in filebeat code above to Flags.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Done.

if ms == nil || len(tags) == 0 {
return nil
}
eventTags, exists := ms[TagsKey]
if !exists {
ms[TagsKey] = tags

k, subMap, oldTags, present, err := mapFind(key, ms, true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wodner if this has a perfomance impact on the old AddTags method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. Keys without dots can be processed in the first loop on a fast path in mapFind. If the key exists as is, it's simply returned at the beginning.

// Fast path, key is present as is.
if v, exists := data[key]; exists {
    return key, data, v, true, nil
}

If not, the only additional cost is one IndexRune which looks for a dot in the key. But currently old tags don't have dots in their keys. Thus, the function returns at the second possible point without doing anything "expensive" e.g. creating additional submaps.

idx := strings.IndexRune(key, '.')
if idx < 0 {
    return key, data, nil, false, nil
}

if err != nil {
return err
}

if !present {
subMap[k] = tags
return nil
}

switch arr := eventTags.(type) {
switch arr := oldTags.(type) {
case []string:
ms[TagsKey] = append(arr, tags...)
subMap[k] = append(arr, tags...)
case []interface{}:
for _, tag := range tags {
arr = append(arr, tag)
}
ms[TagsKey] = arr
subMap[k] = arr
default:
return errors.Errorf("expected string array by type is %T", eventTags)
return errors.Errorf("expected string array by type is %T", oldTags)

}
return nil
}
Expand Down
Loading