From 954efc9dcbf7e3a2d743bb8982c7459aaf6cfd3f Mon Sep 17 00:00:00 2001 From: Mariana Date: Fri, 15 May 2020 16:02:29 +0200 Subject: [PATCH 1/3] add array as parser --- x-pack/filebeat/input/azureeventhub/input.go | 27 +++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/x-pack/filebeat/input/azureeventhub/input.go b/x-pack/filebeat/input/azureeventhub/input.go index 2cf6494f8d7..52e5259b22e 100644 --- a/x-pack/filebeat/input/azureeventhub/input.go +++ b/x-pack/filebeat/input/azureeventhub/input.go @@ -203,14 +203,27 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo // parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration func (a *azureInput) parseMultipleMessages(bMessage []byte) []string { - var obj map[string][]interface{} - err := json.Unmarshal(bMessage, &obj) - if err != nil { - a.log.Errorw(fmt.Sprintf("deserializing multiple messages using the group object `records`"), "error", err) - } + var mapObject map[string][]interface{} + var arrayObject []interface{} var messages []string - if len(obj[expandEventListFromField]) > 0 { - for _, ms := range obj[expandEventListFromField] { + // check if the message is an object containing a list of events + err := json.Unmarshal(bMessage, &mapObject) + if err == nil { + if len(mapObject[expandEventListFromField]) > 0 { + for _, ms := range mapObject[expandEventListFromField] { + js, err := json.Marshal(ms) + if err == nil { + messages = append(messages, string(js)) + } else { + a.log.Errorw(fmt.Sprintf("serializing message %s", ms), "error", err) + } + } + } + } else { + // in some cases the message is an array + a.log.Debugw(fmt.Sprintf("deserializing multiple messages using the group object `records`"), "warning", err) + err = json.Unmarshal(bMessage, &arrayObject) + for _, ms := range arrayObject { js, err := json.Marshal(ms) if err == nil { messages = append(messages, string(js)) From 7f70c2a50fbdddb1f2622cba613b278383101b4f Mon Sep 17 00:00:00 2001 From: Mariana Date: Fri, 15 May 2020 16:13:50 +0200 Subject: [PATCH 2/3] changelog --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/azureeventhub/input.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 50bf403c838..270f2be63d0 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -317,6 +317,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Improve ECS categorization field mappings in system module. {issue}16031[16031] {pull}18065[18065] - Change the `json.*` input settings implementation to merge parsed json objects with existing objects in the event instead of fully replacing them. {pull}17958[17958] - Improve ECS categorization field mappings in osquery module. {issue}16176[16176] {pull}17881[17881] +- Add support for array parsing in azure-eventhub input. {pull}18585[18585] *Heartbeat* diff --git a/x-pack/filebeat/input/azureeventhub/input.go b/x-pack/filebeat/input/azureeventhub/input.go index 52e5259b22e..556907e9bcf 100644 --- a/x-pack/filebeat/input/azureeventhub/input.go +++ b/x-pack/filebeat/input/azureeventhub/input.go @@ -204,7 +204,6 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo // parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration func (a *azureInput) parseMultipleMessages(bMessage []byte) []string { var mapObject map[string][]interface{} - var arrayObject []interface{} var messages []string // check if the message is an object containing a list of events err := json.Unmarshal(bMessage, &mapObject) @@ -222,6 +221,7 @@ func (a *azureInput) parseMultipleMessages(bMessage []byte) []string { } else { // in some cases the message is an array a.log.Debugw(fmt.Sprintf("deserializing multiple messages using the group object `records`"), "warning", err) + var arrayObject []interface{} err = json.Unmarshal(bMessage, &arrayObject) for _, ms := range arrayObject { js, err := json.Marshal(ms) From 3790bf3d57e1767512406d0d484b584041482ed2 Mon Sep 17 00:00:00 2001 From: Mariana Date: Tue, 19 May 2020 15:39:15 +0200 Subject: [PATCH 3/3] update test --- x-pack/filebeat/input/azureeventhub/input.go | 9 ++++-- .../input/azureeventhub/input_test.go | 31 ++++++++++++++++--- 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/x-pack/filebeat/input/azureeventhub/input.go b/x-pack/filebeat/input/azureeventhub/input.go index 556907e9bcf..de9c7cf9d79 100644 --- a/x-pack/filebeat/input/azureeventhub/input.go +++ b/x-pack/filebeat/input/azureeventhub/input.go @@ -205,7 +205,7 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo func (a *azureInput) parseMultipleMessages(bMessage []byte) []string { var mapObject map[string][]interface{} var messages []string - // check if the message is an object containing a list of events + // check if the message is a "records" object containing a list of events err := json.Unmarshal(bMessage, &mapObject) if err == nil { if len(mapObject[expandEventListFromField]) > 0 { @@ -219,10 +219,15 @@ func (a *azureInput) parseMultipleMessages(bMessage []byte) []string { } } } else { + a.log.Debugf("deserializing multiple messages to a `records` object returning error: %s", err) // in some cases the message is an array - a.log.Debugw(fmt.Sprintf("deserializing multiple messages using the group object `records`"), "warning", err) var arrayObject []interface{} err = json.Unmarshal(bMessage, &arrayObject) + if err != nil { + // return entire message + a.log.Debugf("deserializing multiple messages to an array returning error: %s", err) + return []string{string(bMessage)} + } for _, ms := range arrayObject { js, err := json.Marshal(ms) if err == nil { diff --git a/x-pack/filebeat/input/azureeventhub/input_test.go b/x-pack/filebeat/input/azureeventhub/input_test.go index 6e6cd47484c..8a48593c1ca 100644 --- a/x-pack/filebeat/input/azureeventhub/input_test.go +++ b/x-pack/filebeat/input/azureeventhub/input_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/elastic/beats/v7/libbeat/logp" + eventhub "github.com/Azure/azure-event-hubs-go/v3" "github.com/stretchr/testify/assert" @@ -70,17 +72,38 @@ func TestProcessEvents(t *testing.T) { } func TestParseMultipleMessages(t *testing.T) { + // records object msg := "{\"records\":[{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + "{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + "{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]}" - input := azureInput{} - messages := input.parseMultipleMessages([]byte(msg)) - assert.NotNil(t, messages) - assert.Equal(t, len(messages), 3) msgs := []string{ fmt.Sprintf("{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), fmt.Sprintf("{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), fmt.Sprintf("{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}")} + input := azureInput{log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName))} + messages := input.parseMultipleMessages([]byte(msg)) + assert.NotNil(t, messages) + assert.Equal(t, len(messages), 3) + for _, ms := range messages { + assert.Contains(t, msgs, ms) + } + + // array of events + msg1 := "[{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + + "{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]" + messages = input.parseMultipleMessages([]byte(msg1)) + assert.NotNil(t, messages) + assert.Equal(t, len(messages), 3) + for _, ms := range messages { + assert.Contains(t, msgs, ms) + } + + // one event only + msg2 := "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}" + messages = input.parseMultipleMessages([]byte(msg2)) + assert.NotNil(t, messages) + assert.Equal(t, len(messages), 1) for _, ms := range messages { assert.Contains(t, msgs, ms) }