Skip to content

Commit

Permalink
Cherry-pick #7991 to 6.x: Add tag "truncated" to "log.flags" if incom…
Browse files Browse the repository at this point in the history
…ing line is longer than configured limit (#8165)

* Add tag "truncated" to "log.flags" if incoming line is longer than configured limit (#7991)

A new field is added to store the flags of an event named "log.flags".
If a message is truncated, "truncated" flag is added to the list.

Example event with "truncated" flag:

{
  "@timestamp": "2018-08-16T13:00:46.759Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "doc",
    "version": "7.0.0-alpha1"
  },
  "host": {
    "name": "sleipnir"
  },
  "source": "/home/n/test.log",
  "offset": 33,
  "log": {
    "flags": [
       "truncated"
    ],
  },
  "message": "test line",
  "prospector": {
    "type": "log"
  },
  "input": {
    "type": "log"
  },
  "beat": {
    "hostname": "sleipnir",
    "version": "7.0.0-alpha1",
    "name": "sleipnir"
  }
}

Closes #7022
(cherry picked from commit 0884236)

* fix changelog && rebase
  • Loading branch information
kvch authored Sep 3, 2018
1 parent 3194bd0 commit 74ac00e
Show file tree
Hide file tree
Showing 12 changed files with 317 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ The list below covers the major changes between 6.3.0 and master only.
- Libbeat provides a global registry for beats developer that allow to register and retrieve plugin. {pull}7392[7392]
- Added more options to control required and optional fields in schema.Apply(), error returned is a plain nil if no error happened {pull}7335[7335]
- Packaging on MacOS now produces a .dmg file containing an installer (.pkg) and uninstaller for the Beat. {pull}7481[7481]
- New function `AddTagsWithKey` is added, so `common.MapStr` can be enriched with tags with an arbitrary key. {pull}7991[7991]
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ https:/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]

*Filebeat*

- Add tag "truncated" to "log.flags" if incoming line is longer than configured limit. {pull}7991[7991]

*Heartbeat*

*Metricbeat*
Expand Down
4 changes: 4 additions & 0 deletions filebeat/_meta/fields.common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@
description: >
Logging level.
- name: log.flags
description: >
This field contains the flags of the event.
- name: event.created
type: date
description: >
Expand Down
8 changes: 8 additions & 0 deletions filebeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2652,6 +2652,14 @@ type: keyword
Logging level.
--
*`log.flags`*::
+
--
This field contains the flags of the event.
--
*`event.created`*::
Expand Down
2 changes: 1 addition & 1 deletion filebeat/include/fields.go

Large diffs are not rendered by default.

23 changes: 19 additions & 4 deletions filebeat/reader/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,28 @@ func (m *Message) IsEmpty() bool {
return false
}

func (msg *Message) AddFields(fields common.MapStr) {
// AddFields adds fields to the message.
func (m *Message) AddFields(fields common.MapStr) {
if fields == nil {
return
}

if msg.Fields == nil {
msg.Fields = common.MapStr{}
if m.Fields == nil {
m.Fields = common.MapStr{}
}
msg.Fields.Update(fields)
m.Fields.Update(fields)
}

// AddFlagsWithKey adds flags to the message with an arbitrary key.
// If the field does not exist, it is created.
func (m *Message) AddFlagsWithKey(key string, flags ...string) error {
if len(flags) == 0 {
return nil
}

if m.Fields == nil {
m.Fields = common.MapStr{}
}

return common.AddTagsWithKey(m.Fields, key, flags)
}
17 changes: 17 additions & 0 deletions filebeat/reader/multiline/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Reader struct {
separator []byte
last []byte
numLines int
truncated int
err error // last seen error
state func(*Reader) (reader.Message, error)
message reader.Message
Expand Down Expand Up @@ -262,13 +263,19 @@ func (mlr *Reader) clear() {
mlr.message = reader.Message{}
mlr.last = nil
mlr.numLines = 0
mlr.truncated = 0
mlr.err = nil
}

// finalize writes the existing content into the returned message and resets all reader variables.
func (mlr *Reader) finalize() reader.Message {
if mlr.truncated > 0 {
mlr.message.AddFlagsWithKey("log.flags", "truncated")
}

// Copy message from existing content
msg := mlr.message

mlr.clear()
return msg
}
Expand Down Expand Up @@ -303,6 +310,16 @@ func (mlr *Reader) addLine(m reader.Message) {
}
mlr.message.Content = append(tmp, m.Content[:space]...)
mlr.numLines++

// add number of truncated bytes to fields
diff := len(m.Content) - space
if diff > 0 {
mlr.truncated += diff
}
} else {
// increase the number of skipped bytes, if cannot add
mlr.truncated += len(m.Content)

}

mlr.last = m.Content
Expand Down
83 changes: 83 additions & 0 deletions filebeat/reader/multiline/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,41 @@ func TestMultilineBeforeNegateOKWithEmptyLine(t *testing.T) {
)
}

func TestMultilineAfterTruncated(t *testing.T) {
pattern := match.MustCompile(`^[ ]`) // next line is indented a space
maxLines := 2
testMultilineTruncated(t,
Config{
Pattern: &pattern,
Match: "after",
MaxLines: &maxLines,
},
2,
true,
[]string{
"line1\n line1.1\n line1.2\n",
"line2\n line2.1\n line2.2\n"},
[]string{
"line1\n line1.1",
"line2\n line2.1"},
)
testMultilineTruncated(t,
Config{
Pattern: &pattern,
Match: "after",
MaxLines: &maxLines,
},
2,
false,
[]string{
"line1\n line1.1\n",
"line2\n line2.1\n"},
[]string{
"line1\n line1.1",
"line2\n line2.1"},
)
}

func testMultilineOK(t *testing.T, cfg Config, events int, expected ...string) {
_, buf := createLineBuffer(expected...)
r := createMultilineTestReader(t, buf, cfg)
Expand Down Expand Up @@ -177,6 +212,54 @@ func testMultilineOK(t *testing.T, cfg Config, events int, expected ...string) {
}
}

func testMultilineTruncated(t *testing.T, cfg Config, events int, truncated bool, input, expected []string) {
_, buf := createLineBuffer(input...)
r := createMultilineTestReader(t, buf, cfg)

var messages []reader.Message
for {
message, err := r.Next()
if err != nil {
break
}

messages = append(messages, message)
}

if len(messages) != events {
t.Fatalf("expected %v lines, read only %v line(s)", len(expected), len(messages))
}

for _, message := range messages {
found := false
statusFlags, err := message.Fields.GetValue("log.flags")
if err != nil {
if !truncated {
assert.False(t, found)
return
}
t.Fatalf("error while getting log.status field: %v", err)
}

switch flags := statusFlags.(type) {
case []string:
for _, f := range flags {
if f == "truncated" {
found = true
}
}
default:
t.Fatalf("incorrect type for log.flags")
}

if truncated {
assert.True(t, found)
} else {
assert.False(t, found)
}
}
}

func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg Config) reader.Reader {
encFactory, ok := encoding.FindEncoding("plain")
if !ok {
Expand Down
1 change: 1 addition & 0 deletions filebeat/reader/readfile/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func (r *LimitReader) Next() (reader.Message, error) {
message, err := r.reader.Next()
if len(message.Content) > r.maxBytes {
message.Content = message.Content[:r.maxBytes]
message.AddFlagsWithKey("log.flags", "truncated")
}
return message, err
}
88 changes: 88 additions & 0 deletions filebeat/reader/readfile/limit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build !integration

package readfile

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/filebeat/reader"
)

type mockReader struct {
line []byte
}

func (m *mockReader) Next() (reader.Message, error) {
return reader.Message{
Content: m.line,
}, nil
}

var limitTests = []struct {
line string
maxBytes int
truncated bool
}{
{"long-long-line", 5, true},
{"long-long-line", 3, true},
{"long-long-line", len("long-long-line"), false},
}

func TestLimitReader(t *testing.T) {
for _, test := range limitTests {
r := NewLimitReader(&mockReader{[]byte(test.line)}, test.maxBytes)

msg, err := r.Next()
if err != nil {
t.Fatalf("Error reading from mock reader: %v", err)
}

assert.Equal(t, test.maxBytes, len(msg.Content))

found := false
statusFlags, err := msg.Fields.GetValue("log.flags")
if err != nil {
if !test.truncated {
assert.False(t, found)
return
}
t.Fatalf("Error getting truncated value: %v", err)
}

switch flags := statusFlags.(type) {
case []string:
for _, f := range flags {
if f == "truncated" {
found = true
}
}
default:
t.Fatalf("incorrect type for log.flags")
}

if test.truncated {
assert.True(t, found)
} else {
assert.False(t, found)
}
}
}
27 changes: 20 additions & 7 deletions libbeat/common/mapstr.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,25 +305,38 @@ func MergeFields(ms, fields MapStr, underRoot bool) error {
// exist then it will be created. If the tags field exists and is not a []string
// then an error will be returned. It does not deduplicate the list of tags.
func AddTags(ms MapStr, tags []string) error {
return AddTagsWithKey(ms, TagsKey, tags)
}

// AddTagsWithKey appends a tag to the key field of ms. If the field does not
// exist then it will be created. If the field exists and is not a []string
// then an error will be returned. It does not deduplicate the list.
func AddTagsWithKey(ms MapStr, key string, tags []string) error {
if ms == nil || len(tags) == 0 {
return nil
}
eventTags, exists := ms[TagsKey]
if !exists {
ms[TagsKey] = tags

k, subMap, oldTags, present, err := mapFind(key, ms, true)
if err != nil {
return err
}

if !present {
subMap[k] = tags
return nil
}

switch arr := eventTags.(type) {
switch arr := oldTags.(type) {
case []string:
ms[TagsKey] = append(arr, tags...)
subMap[k] = append(arr, tags...)
case []interface{}:
for _, tag := range tags {
arr = append(arr, tag)
}
ms[TagsKey] = arr
subMap[k] = arr
default:
return errors.Errorf("expected string array by type is %T", eventTags)
return errors.Errorf("expected string array by type is %T", oldTags)

}
return nil
}
Expand Down
Loading

0 comments on commit 74ac00e

Please sign in to comment.