diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go index db893e443ac3..fa52f1123827 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -83,6 +83,43 @@ type s3EventV2 struct { } `json:"s3"` } +// eventBridgeEvents is 'Object Created' payload generated by AWS EventBridge +// At the moment it doesn't seem to have a version +// https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html +// https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html +type eventBridgeEvents struct { + Messages []eventBridgeEvent `json:"messages"` +} + +// Object created event. +type eventBridgeEvent struct { + Version string `json:"version"` + Id string `json:"id"` + DetailType string `json:"detail-type"` + Source string `json:"source"` + Account string `json:"account"` + Time string `json:"time"` + Region string `json:"region"` + Resources []string `json:"resources"` + Detail struct { + Version string `json:"version"` + Bucket struct { + Name string `json:"name"` + } + Object struct { + Key string `json:"key"` + Size int `json:"size"` + Etag string `json:"etag"` + VersionId string `json:"version-id"` + Sequencer string `json:"sequencer"` + } + RequestId string `json:"request-id"` + Requester string `json:"requester"` + SourceIpAddress string `json:"source-ip-address"` + Reason string `json:"reason"` + } +} + type sqsS3EventProcessor struct { s3ObjectHandler s3ObjectHandlerFactory sqsVisibilityTimeout time.Duration @@ -252,6 +289,17 @@ func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, erro } } + // Check if the notification is from S3 -> EventBridge -> SQS + var eventBridgeEvents eventBridgeEvents + if events.Records == nil { + dec := json.NewDecoder(strings.NewReader(events.Message)) + if err := dec.Decode(&eventBridgeEvents); err != nil { + p.log.Debugw("Could not parse message as EventBridge payload", "sqs_message_body", body) + } else { + convertEventBridge(&eventBridgeEvents, &events) + } + } + if events.Records == nil { p.log.Debugw("Invalid SQS message body: missing Records field", "sqs_message_body", body) return nil, errors.New("the message is an invalid S3 notification: missing Records field") @@ -260,6 +308,28 @@ func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, erro return p.getS3Info(events) } +func convertEventBridge(eventBridgeEvents *eventBridgeEvents, s3Events *s3EventsV2) { + for _, message := range eventBridgeEvents.Messages { + s3Events.Records = append(s3Events.Records, convertEventBridgeEvent(&message)) + } +} + +func convertEventBridgeEvent(message *eventBridgeEvent) s3EventV2 { + var event = s3EventV2{} + if message.DetailType == "Object Created" { + event.SetEventName("ObjectCreated:Put") + } + event.SetS3BucketARN(message.Resources[0]) + event.SetAWSRegion(message.Region) + if message.Source == "aws.s3" { + event.SetEventSource("aws:s3") + } + event.SetS3BucketName(message.Detail.Bucket.Name) + event.SetS3ObjectKey(message.Detail.Object.Key) + event.SetProvider("aws:s3") //TODO + return event +} + func (p *sqsS3EventProcessor) getS3Info(events s3EventsV2) ([]s3EventV2, error) { out := make([]s3EventV2, 0, len(events.Records)) for _, record := range events.Records { diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go index 65552525136d..e39b23bb48a6 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go @@ -270,6 +270,16 @@ func TestSqsProcessor_getS3Notifications(t *testing.T) { assert.Equal(t, "vpc-flow-logs-ks", events[0].S3.Bucket.Name) }) + t.Run("EventBridge-sqs notification", func(t *testing.T) { + msg := newEventBridgeSQSMessage() + events, err := p.getS3Notifications(*msg.Body) + require.NoError(t, err) + assert.Len(t, events, 1) + assert.Equal(t, "test-object-key", events[0].S3.Object.Key) + assert.Equal(t, "arn:aws:s3:::vpc-flow-logs-ks", events[0].S3.Bucket.ARN) + assert.Equal(t, "vpc-flow-logs-ks", events[0].S3.Bucket.Name) + }) + t.Run("missing Records fail", func(t *testing.T) { msg := `{"message":"missing records"}` _, err := p.getS3Notifications(msg) diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go index cf82f03c6dec..2595f1313a6b 100644 --- a/x-pack/filebeat/input/awss3/sqs_test.go +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -9,6 +9,7 @@ import ( "crypto/sha256" "encoding/json" "errors" + "github.com/aws/aws-sdk-go/service/sqs" "testing" "time" @@ -221,6 +222,27 @@ func newSNSSQSMessage() types.Message { } } +func newEventBridgeSQSMessage() sqs.Message { + body, err := json.Marshal(s3EventsV2{ + Message: "{\"Messages\":[{ \"version\": \"0\", \"id\": \"f17994c0-7cb9-ee01-79fd-ae46df89e3a4\", \"detail-type\": \"Object Created\", \"source\": \"aws.s3\", \"account\": \"952856826737\", \"time\": \"2024-06-24T08:31:26Z\", \"region\": \"eu-west-1\", \"resources\": [\"arn:aws:s3:::vpc-flow-logs-ks\" ], \"detail\": {\"version\": \"0\",\"bucket\": { \"name\": \"vpc-flow-logs-ks\"},\"object\": { \"key\": \"test-object-key\", \"size\": 744, \"etag\": \"2ba6b152f13c75a9155b87b5b072963c\", \"version-id\": \"uoW5awQhqux4rhi4Nuh6il967FzQlsHJ\", \"sequencer\": \"0066792EC46EC0B037\"},\"request-id\": \"Q93BVWXD5G6FKWC2\",\"requester\": \"516635408544\",\"source-ip-address\": \"10.0.27.95\",\"reason\": \"PutObject\" }}]}", + }) + if err != nil { + panic(err) + } + + hash := sha256.Sum256(body) + id, _ := uuid.FromBytes(hash[:16]) + messageID := id.String() + receipt := "receipt-" + messageID + bodyStr := string(body) + + return sqs.Message{ + Body: &bodyStr, + MessageId: &messageID, + ReceiptHandle: &receipt, + } +} + func newS3Event(key string) s3EventV2 { record := s3EventV2{ AWSRegion: "us-east-1",