From bbd195e15230c732ee0b8d39825b65e72c0bcaa5 Mon Sep 17 00:00:00 2001 From: ruflin Date: Tue, 16 Aug 2016 15:26:05 +0200 Subject: [PATCH] Add line number counter for multiline This makes it possible to tell whether an event was multiline or not. The number of lines is put under the `multiline` namespace and looks as follows: ``` { ... "message": "[2015] hello world\n First Line\n Second Line", "multiline": { "lines": 3 }, ... } ``` See https://github.com/elastic/beats/issues/957 = Refactor fields handling with readers Each reader can add fields to the message object. The reader itself should always add data under its own namespace to prevent conflicts. All these fields are then added to the Data object. This will allow each reader in the future to add its own data if needed. The JSON reader was simplified so that data is written under the `json` namespace by default. No special fields have to be passed for JSON anymore, and the processing can still happen at the event level. Further refactoring of the JSON processing should happen in another PR, as the event is probably not the right place for it and the JSON config should not be part of the event either. 
--- CHANGELOG.asciidoc | 1 + filebeat/harvester/log.go | 2 +- filebeat/harvester/reader/json.go | 5 +++- filebeat/harvester/reader/message.go | 11 +++++++++ filebeat/harvester/reader/multiline.go | 7 +++++- filebeat/input/event.go | 27 ++++++++++++++------ filebeat/input/event_test.go | 20 +++++++-------- filebeat/tests/system/test_multiline.py | 33 +++++++++++++++++++++++++ libbeat/common/mapstr.go | 2 +- 9 files changed, 86 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index a7b68c96c19..7c4df8d3e3e 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -52,6 +52,7 @@ https://github.com/elastic/beats/compare/v5.0.0-beta1...master[Check the HEAD di *Filebeat* - Add command line option -once to run filebeat only once and then close {pull}2456[2456] +- Add multiline line number to filebeat event {pull}2279[2279] *Winlogbeat* diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index fabe0a2efe4..ee443124973 100644 --- a/filebeat/harvester/log.go +++ b/filebeat/harvester/log.go @@ -108,8 +108,8 @@ func (h *Harvester) Harvest(r reader.Reader) { event.ReadTime = message.Ts event.Bytes = message.Bytes event.Text = &text - event.JSONFields = message.Fields event.EventMetadata = h.config.EventMetadata + event.Data = message.Fields event.InputType = h.config.InputType event.DocumentType = h.config.DocumentType event.JSONConfig = h.config.JSON diff --git a/filebeat/harvester/reader/json.go b/filebeat/harvester/reader/json.go index 67deebdd5ff..d1bfca01f22 100644 --- a/filebeat/harvester/reader/json.go +++ b/filebeat/harvester/reader/json.go @@ -120,6 +120,9 @@ func (r *JSON) Next() (Message, error) { if err != nil { return message, err } - message.Content, message.Fields = r.decodeJSON(message.Content) + + var fields = common.MapStr{} + message.Content, fields = r.decodeJSON(message.Content) + message.AddFields(common.MapStr{"json": fields}) return message, nil } diff --git a/filebeat/harvester/reader/message.go 
b/filebeat/harvester/reader/message.go index 062c759e6a9..d5dcfc21081 100644 --- a/filebeat/harvester/reader/message.go +++ b/filebeat/harvester/reader/message.go @@ -31,3 +31,14 @@ func (m *Message) IsEmpty() bool { return false } + +func (msg *Message) AddFields(fields common.MapStr) { + if fields == nil { + return + } + + if msg.Fields == nil { + msg.Fields = common.MapStr{} + } + msg.Fields.Update(fields) +} diff --git a/filebeat/harvester/reader/multiline.go b/filebeat/harvester/reader/multiline.go index 94b6d2130f8..ae97acf4c73 100644 --- a/filebeat/harvester/reader/multiline.go +++ b/filebeat/harvester/reader/multiline.go @@ -5,6 +5,8 @@ import ( "fmt" "regexp" "time" + + "github.com/elastic/beats/libbeat/common" ) // MultiLine reader combining multiple line events into one multi-line event. @@ -211,7 +213,7 @@ func (mlr *Multiline) load(m Message) { mlr.addLine(m) // Timestamp of first message is taken as overall timestamp mlr.message.Ts = m.Ts - mlr.message.Fields = m.Fields + mlr.message.AddFields(m.Fields) } // clearBuffer resets the reader buffer variables @@ -225,6 +227,8 @@ func (mlr *Multiline) clear() { // finalize writes the existing content into the returned message and resets all reader variables. 
func (mlr *Multiline) finalize() Message { + lines := common.MapStr{"lines": mlr.numLines} + mlr.message.AddFields(common.MapStr{"multiline": lines}) // Copy message from existing content msg := mlr.message mlr.clear() @@ -265,6 +269,7 @@ func (mlr *Multiline) addLine(m Message) { mlr.last = m.Content mlr.message.Bytes += m.Bytes + mlr.message.AddFields(m.Fields) } // resetState sets state of the reader to readFirst diff --git a/filebeat/input/event.go b/filebeat/input/event.go index 9bd43214c89..c7ecb70b2de 100644 --- a/filebeat/input/event.go +++ b/filebeat/input/event.go @@ -18,9 +18,9 @@ type Event struct { DocumentType string Bytes int Text *string - JSONFields common.MapStr JSONConfig *reader.JSONConfig State file.State + Data common.MapStr } func NewEvent(state file.State) *Event { @@ -39,8 +39,18 @@ func (f *Event) ToMapStr() common.MapStr { "input_type": f.InputType, } - if f.JSONConfig != nil && len(f.JSONFields) > 0 { - mergeJSONFields(f, event) + // Add data fields which are added by the readers + for key, value := range f.Data { + event[key] = value + } + + var jsonFields common.MapStr + if fields, ok := event["json"]; ok { + jsonFields = fields.(common.MapStr) + } + + if f.JSONConfig != nil && len(jsonFields) > 0 { + mergeJSONFields(f, event, jsonFields) } else if f.Text != nil { event["message"] = *f.Text } @@ -58,15 +68,18 @@ func (e *Event) HasData() bool { // respecting the KeysUnderRoot and OverwriteKeys configuration options. // If MessageKey is defined, the Text value from the event always // takes precedence. 
-func mergeJSONFields(f *Event, event common.MapStr) { +func mergeJSONFields(f *Event, event common.MapStr, jsonFields common.MapStr) { // The message key might have been modified by multiline if len(f.JSONConfig.MessageKey) > 0 && f.Text != nil { - f.JSONFields[f.JSONConfig.MessageKey] = *f.Text + jsonFields[f.JSONConfig.MessageKey] = *f.Text } if f.JSONConfig.KeysUnderRoot { - for k, v := range f.JSONFields { + // Delete existing json key + delete(event, "json") + + for k, v := range jsonFields { if f.JSONConfig.OverwriteKeys { if k == "@timestamp" { vstr, ok := v.(string) @@ -104,7 +117,5 @@ func mergeJSONFields(f *Event, event common.MapStr) { event[k] = v } } - } else { - event["json"] = f.JSONFields } } diff --git a/filebeat/input/event_test.go b/filebeat/input/event_test.go index 8001c512686..c678d262139 100644 --- a/filebeat/input/event_test.go +++ b/filebeat/input/event_test.go @@ -33,7 +33,7 @@ func TestEventToMapStrJSON(t *testing.T) { Event: Event{ DocumentType: "test_type", Text: &text, - JSONFields: common.MapStr{"type": "test", "text": "hello"}, + Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}}, JSONConfig: &reader.JSONConfig{KeysUnderRoot: true}, }, ExpectedItems: common.MapStr{ @@ -46,7 +46,7 @@ func TestEventToMapStrJSON(t *testing.T) { Event: Event{ DocumentType: "test_type", Text: &text, - JSONFields: common.MapStr{"type": "test", "text": "hello"}, + Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}}, JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ @@ -59,7 +59,7 @@ func TestEventToMapStrJSON(t *testing.T) { Event: Event{ DocumentType: "test_type", Text: &text, - JSONFields: common.MapStr{"type": "test", "text": "hello"}, + Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}}, JSONConfig: &reader.JSONConfig{}, }, ExpectedItems: common.MapStr{ @@ -72,7 +72,7 @@ func TestEventToMapStrJSON(t *testing.T) { Event: 
Event{ DocumentType: "test_type", Text: &text, - JSONFields: common.MapStr{"type": "test", "text": "hi"}, + Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hi"}}, JSONConfig: &reader.JSONConfig{MessageKey: "text"}, }, ExpectedItems: common.MapStr{ @@ -87,7 +87,7 @@ func TestEventToMapStrJSON(t *testing.T) { ReadTime: now, DocumentType: "test_type", Text: &text, - JSONFields: common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.444Z"}, + Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.444Z"}}, JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ @@ -102,7 +102,7 @@ func TestEventToMapStrJSON(t *testing.T) { ReadTime: now, DocumentType: "test_type", Text: &text, - JSONFields: common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.44XX4Z"}, + Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.44XX4Z"}}, JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ @@ -118,7 +118,7 @@ func TestEventToMapStrJSON(t *testing.T) { ReadTime: now, DocumentType: "test_type", Text: &text, - JSONFields: common.MapStr{"type": "test", "@timestamp": 42}, + Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": 42}}, JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ @@ -132,7 +132,7 @@ func TestEventToMapStrJSON(t *testing.T) { Event: Event{ DocumentType: "test_type", Text: &text, - JSONFields: common.MapStr{"type": 42}, + Data: common.MapStr{"json": common.MapStr{"type": 42}}, JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ @@ -145,7 +145,7 @@ func TestEventToMapStrJSON(t *testing.T) { Event: Event{ DocumentType: "test_type", Text: &text, - JSONFields: common.MapStr{"type": ""}, + Data: common.MapStr{"json": 
common.MapStr{"type": ""}}, JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ @@ -158,7 +158,7 @@ func TestEventToMapStrJSON(t *testing.T) { Event: Event{ DocumentType: "test_type", Text: &text, - JSONFields: common.MapStr{"type": "_type"}, + Data: common.MapStr{"json": common.MapStr{"type": "_type"}}, JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ diff --git a/filebeat/tests/system/test_multiline.py b/filebeat/tests/system/test_multiline.py index e0c5ed27bff..2b8ef1c66fe 100644 --- a/filebeat/tests/system/test_multiline.py +++ b/filebeat/tests/system/test_multiline.py @@ -338,3 +338,36 @@ def test_consecutive_newline(self): output = self.read_output_json() output[0]["message"] = logentry1 output[1]["message"] = logentry2 + + def test_multiline_lines(self): + """ + Test if multiline line counter is added + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + multiline=True, + pattern="^\[", + negate="true", + match="after", + close_timeout="1s", + ) + + os.mkdir(self.working_dir + "/log/") + + testfile = self.working_dir + "/log/test.log" + + with open(testfile, 'w', 0) as file: + file.write("[2015] hello world\n") + file.write(" First Line\n") + file.write(" Second Line\n") + + filebeat = self.start_beat() + + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + + filebeat.check_kill_and_wait() + + output = self.read_output_json() + output[0]["multiline"]["lines"] = 3 diff --git a/libbeat/common/mapstr.go b/libbeat/common/mapstr.go index b7add33e1cd..7e67f0493e2 100644 --- a/libbeat/common/mapstr.go +++ b/libbeat/common/mapstr.go @@ -241,7 +241,7 @@ func (m MapStr) String() string { // An error is returned if underRoot is true and the value of ms.fields is not a // MapStr. 
func MergeFields(ms, fields MapStr, underRoot bool) error { - if ms == nil || fields == nil { + if ms == nil || len(fields) == 0 { return nil }