Skip to content

Commit

Permalink
Add line number counter for multiline
Browse files Browse the repository at this point in the history
This allows to indicate if an event was multiline or not. The number of lines will be put under the multiline namespace and looks as following:

```
{
  ...
  "message": "[2015] hello world\n  First Line\n  Second Line",
  "multiline": {
    "lines": 3
  },
  ...
}
```

See elastic#957

= Refactor fields handling with readers

Each reader can add fields to the message object. The reader itself should always add data under its own namespace to prevent conflicts. All these fields are then added to the Data object. This will allow each reader in the future to add its own data if needed.

The JSON reader was simplified in the way that data by default is written under the `json` namespace. Now no special fields have to be passed for JSON and the processing can still happen on the event level.

Further refactoring to the JSON processing should happen in an other PR as event is probably not the right place to happen, as also the JSON config should not be part of it.
  • Loading branch information
ruflin committed Sep 19, 2016
1 parent e21c5d1 commit bbd195e
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ https:/elastic/beats/compare/v5.0.0-beta1...master[Check the HEAD di
*Filebeat*

- Add command line option -once to run filebeat only once and then close {pull}2456[2456]
- Add multiline line number to filebeat event {pull}2279[2279]

*Winlogbeat*

Expand Down
2 changes: 1 addition & 1 deletion filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ func (h *Harvester) Harvest(r reader.Reader) {
event.ReadTime = message.Ts
event.Bytes = message.Bytes
event.Text = &text
event.JSONFields = message.Fields
event.EventMetadata = h.config.EventMetadata
event.Data = message.Fields
event.InputType = h.config.InputType
event.DocumentType = h.config.DocumentType
event.JSONConfig = h.config.JSON
Expand Down
5 changes: 4 additions & 1 deletion filebeat/harvester/reader/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ func (r *JSON) Next() (Message, error) {
if err != nil {
return message, err
}
message.Content, message.Fields = r.decodeJSON(message.Content)

var fields = common.MapStr{}
message.Content, fields = r.decodeJSON(message.Content)
message.AddFields(common.MapStr{"json": fields})
return message, nil
}
11 changes: 11 additions & 0 deletions filebeat/harvester/reader/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,14 @@ func (m *Message) IsEmpty() bool {

return false
}

func (msg *Message) AddFields(fields common.MapStr) {
if fields == nil {
return
}

if msg.Fields == nil {
msg.Fields = common.MapStr{}
}
msg.Fields.Update(fields)
}
7 changes: 6 additions & 1 deletion filebeat/harvester/reader/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"regexp"
"time"

"github.com/elastic/beats/libbeat/common"
)

// MultiLine reader combining multiple line events into one multi-line event.
Expand Down Expand Up @@ -211,7 +213,7 @@ func (mlr *Multiline) load(m Message) {
mlr.addLine(m)
// Timestamp of first message is taken as overall timestamp
mlr.message.Ts = m.Ts
mlr.message.Fields = m.Fields
mlr.message.AddFields(m.Fields)
}

// clearBuffer resets the reader buffer variables
Expand All @@ -225,6 +227,8 @@ func (mlr *Multiline) clear() {
// finalize writes the existing content into the returned message and resets all reader variables.
func (mlr *Multiline) finalize() Message {

lines := common.MapStr{"lines": mlr.numLines}
mlr.message.AddFields(common.MapStr{"multiline": lines})
// Copy message from existing content
msg := mlr.message
mlr.clear()
Expand Down Expand Up @@ -265,6 +269,7 @@ func (mlr *Multiline) addLine(m Message) {

mlr.last = m.Content
mlr.message.Bytes += m.Bytes
mlr.message.AddFields(m.Fields)
}

// resetState sets state of the reader to readFirst
Expand Down
27 changes: 19 additions & 8 deletions filebeat/input/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ type Event struct {
DocumentType string
Bytes int
Text *string
JSONFields common.MapStr
JSONConfig *reader.JSONConfig
State file.State
Data common.MapStr
}

func NewEvent(state file.State) *Event {
Expand All @@ -39,8 +39,18 @@ func (f *Event) ToMapStr() common.MapStr {
"input_type": f.InputType,
}

if f.JSONConfig != nil && len(f.JSONFields) > 0 {
mergeJSONFields(f, event)
// Add data fields which are added by the readers
for key, value := range f.Data {
event[key] = value
}

var jsonFields common.MapStr
if fields, ok := event["json"]; ok {
jsonFields = fields.(common.MapStr)
}

if f.JSONConfig != nil && len(jsonFields) > 0 {
mergeJSONFields(f, event, jsonFields)
} else if f.Text != nil {
event["message"] = *f.Text
}
Expand All @@ -58,15 +68,18 @@ func (e *Event) HasData() bool {
// respecting the KeysUnderRoot and OverwriteKeys configuration options.
// If MessageKey is defined, the Text value from the event always
// takes precedence.
func mergeJSONFields(f *Event, event common.MapStr) {
func mergeJSONFields(f *Event, event common.MapStr, jsonFields common.MapStr) {

// The message key might have been modified by multiline
if len(f.JSONConfig.MessageKey) > 0 && f.Text != nil {
f.JSONFields[f.JSONConfig.MessageKey] = *f.Text
jsonFields[f.JSONConfig.MessageKey] = *f.Text
}

if f.JSONConfig.KeysUnderRoot {
for k, v := range f.JSONFields {
// Delete existing json key
delete(event, "json")

for k, v := range jsonFields {
if f.JSONConfig.OverwriteKeys {
if k == "@timestamp" {
vstr, ok := v.(string)
Expand Down Expand Up @@ -104,7 +117,5 @@ func mergeJSONFields(f *Event, event common.MapStr) {
event[k] = v
}
}
} else {
event["json"] = f.JSONFields
}
}
20 changes: 10 additions & 10 deletions filebeat/input/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestEventToMapStrJSON(t *testing.T) {
Event: Event{
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": "test", "text": "hello"},
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true},
},
ExpectedItems: common.MapStr{
Expand All @@ -46,7 +46,7 @@ func TestEventToMapStrJSON(t *testing.T) {
Event: Event{
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": "test", "text": "hello"},
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
Expand All @@ -59,7 +59,7 @@ func TestEventToMapStrJSON(t *testing.T) {
Event: Event{
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": "test", "text": "hello"},
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{},
},
ExpectedItems: common.MapStr{
Expand All @@ -72,7 +72,7 @@ func TestEventToMapStrJSON(t *testing.T) {
Event: Event{
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": "test", "text": "hi"},
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hi"}},
JSONConfig: &reader.JSONConfig{MessageKey: "text"},
},
ExpectedItems: common.MapStr{
Expand All @@ -87,7 +87,7 @@ func TestEventToMapStrJSON(t *testing.T) {
ReadTime: now,
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.444Z"},
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.444Z"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
Expand All @@ -102,7 +102,7 @@ func TestEventToMapStrJSON(t *testing.T) {
ReadTime: now,
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.44XX4Z"},
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.44XX4Z"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
Expand All @@ -118,7 +118,7 @@ func TestEventToMapStrJSON(t *testing.T) {
ReadTime: now,
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": "test", "@timestamp": 42},
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": 42}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
Expand All @@ -132,7 +132,7 @@ func TestEventToMapStrJSON(t *testing.T) {
Event: Event{
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": 42},
Data: common.MapStr{"json": common.MapStr{"type": 42}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
Expand All @@ -145,7 +145,7 @@ func TestEventToMapStrJSON(t *testing.T) {
Event: Event{
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": ""},
Data: common.MapStr{"json": common.MapStr{"type": ""}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
Expand All @@ -158,7 +158,7 @@ func TestEventToMapStrJSON(t *testing.T) {
Event: Event{
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": "_type"},
Data: common.MapStr{"json": common.MapStr{"type": "_type"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
Expand Down
33 changes: 33 additions & 0 deletions filebeat/tests/system/test_multiline.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,36 @@ def test_consecutive_newline(self):
output = self.read_output_json()
output[0]["message"] = logentry1
output[1]["message"] = logentry2

def test_multiline_lines(self):
"""
Test if multiline line counter is added
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
pattern="^\[",
negate="true",
match="after",
close_timeout="1s",
)

os.mkdir(self.working_dir + "/log/")

testfile = self.working_dir + "/log/test.log"

with open(testfile, 'w', 0) as file:
file.write("[2015] hello world\n")
file.write(" First Line\n")
file.write(" Second Line\n")

filebeat = self.start_beat()

self.wait_until(
lambda: self.output_has(lines=1),
max_timeout=10)

filebeat.check_kill_and_wait()

output = self.read_output_json()
output[0]["multiline"]["lines"] = 3
2 changes: 1 addition & 1 deletion libbeat/common/mapstr.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (m MapStr) String() string {
// An error is returned if underRoot is true and the value of ms.fields is not a
// MapStr.
func MergeFields(ms, fields MapStr, underRoot bool) error {
if ms == nil || fields == nil {
if ms == nil || len(fields) == 0 {
return nil
}

Expand Down

0 comments on commit bbd195e

Please sign in to comment.