Skip to content

Commit

Permalink
Adding code to OOTB support AWS Eventbridge generated events for S3 c…
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael-Bischoff committed Jun 25, 2024
1 parent e3a8223 commit a4f82b1
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 0 deletions.
70 changes: 70 additions & 0 deletions x-pack/filebeat/input/awss3/sqs_s3_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,43 @@ type s3EventV2 struct {
} `json:"s3"`
}

// eventBridgeEvents is 'Object Created' payload generated by AWS EventBridge
// At the moment it doesn't seem to have a version
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html
type eventBridgeEvents struct {
Messages []eventBridgeEvent `json:"messages"`
}

// Object created event.
type eventBridgeEvent struct {
Version string `json:"version"`
Id string `json:"id"`
DetailType string `json:"detail-type"`
Source string `json:"source"`
Account string `json:"account"`
Time string `json:"time"`
Region string `json:"region"`
Resources []string `json:"resources"`
Detail struct {
Version string `json:"version"`
Bucket struct {
Name string `json:"name"`
}
Object struct {
Key string `json:"key"`
Size int `json:"size"`
Etag string `json:"etag"`
VersionId string `json:"version-id"`
Sequencer string `json:"sequencer"`
}
RequestId string `json:"request-id"`
Requester string `json:"requester"`
SourceIpAddress string `json:"source-ip-address"`
Reason string `json:"reason"`
}
}

type sqsS3EventProcessor struct {
s3ObjectHandler s3ObjectHandlerFactory
sqsVisibilityTimeout time.Duration
Expand Down Expand Up @@ -252,6 +289,17 @@ func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, erro
}
}

// Check if the notification is from S3 -> EventBridge -> SQS
var eventBridgeEvents eventBridgeEvents
if events.Records == nil {
dec := json.NewDecoder(strings.NewReader(events.Message))
if err := dec.Decode(&eventBridgeEvents); err != nil {
p.log.Debugw("Could not parse message as EventBridge payload", "sqs_message_body", body)
} else {
convertEventBridge(&eventBridgeEvents, &events)
}
}

if events.Records == nil {
p.log.Debugw("Invalid SQS message body: missing Records field", "sqs_message_body", body)
return nil, errors.New("the message is an invalid S3 notification: missing Records field")
Expand All @@ -260,6 +308,28 @@ func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, erro
return p.getS3Info(events)
}

func convertEventBridge(eventBridgeEvents *eventBridgeEvents, s3Events *s3EventsV2) {
for _, message := range eventBridgeEvents.Messages {
s3Events.Records = append(s3Events.Records, convertEventBridgeEvent(&message))

Check failure on line 313 in x-pack/filebeat/input/awss3/sqs_s3_event.go

View workflow job for this annotation

GitHub Actions / lint (windows)

G601: Implicit memory aliasing in for loop. (gosec)

Check failure on line 313 in x-pack/filebeat/input/awss3/sqs_s3_event.go

View workflow job for this annotation

GitHub Actions / lint (linux)

G601: Implicit memory aliasing in for loop. (gosec)
}
}

func convertEventBridgeEvent(message *eventBridgeEvent) s3EventV2 {
var event = s3EventV2{}
if message.DetailType == "Object Created" {
event.SetEventName("ObjectCreated:Put")
}
event.SetS3BucketARN(message.Resources[0])
event.SetAWSRegion(message.Region)
if message.Source == "aws.s3" {
event.SetEventSource("aws:s3")
}
event.SetS3BucketName(message.Detail.Bucket.Name)
event.SetS3ObjectKey(message.Detail.Object.Key)
event.SetProvider("aws:s3") //TODO
return event
}

func (p *sqsS3EventProcessor) getS3Info(events s3EventsV2) ([]s3EventV2, error) {
out := make([]s3EventV2, 0, len(events.Records))
for _, record := range events.Records {
Expand Down
10 changes: 10 additions & 0 deletions x-pack/filebeat/input/awss3/sqs_s3_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,16 @@ func TestSqsProcessor_getS3Notifications(t *testing.T) {
assert.Equal(t, "vpc-flow-logs-ks", events[0].S3.Bucket.Name)
})

t.Run("EventBridge-sqs notification", func(t *testing.T) {
msg := newEventBridgeSQSMessage()
events, err := p.getS3Notifications(*msg.Body)
require.NoError(t, err)
assert.Len(t, events, 1)
assert.Equal(t, "test-object-key", events[0].S3.Object.Key)
assert.Equal(t, "arn:aws:s3:::vpc-flow-logs-ks", events[0].S3.Bucket.ARN)
assert.Equal(t, "vpc-flow-logs-ks", events[0].S3.Bucket.Name)
})

t.Run("missing Records fail", func(t *testing.T) {
msg := `{"message":"missing records"}`
_, err := p.getS3Notifications(msg)
Expand Down
22 changes: 22 additions & 0 deletions x-pack/filebeat/input/awss3/sqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"crypto/sha256"
"encoding/json"
"errors"
"github.com/aws/aws-sdk-go/service/sqs"

Check failure on line 12 in x-pack/filebeat/input/awss3/sqs_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

File is not `goimports`-ed with -local github.com/elastic (goimports)

Check failure on line 12 in x-pack/filebeat/input/awss3/sqs_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

File is not `goimports`-ed with -local github.com/elastic (goimports)
"testing"
"time"

Expand Down Expand Up @@ -221,6 +222,27 @@ func newSNSSQSMessage() types.Message {
}
}

func newEventBridgeSQSMessage() sqs.Message {
body, err := json.Marshal(s3EventsV2{
Message: "{\"Messages\":[{ \"version\": \"0\", \"id\": \"f17994c0-7cb9-ee01-79fd-ae46df89e3a4\", \"detail-type\": \"Object Created\", \"source\": \"aws.s3\", \"account\": \"952856826737\", \"time\": \"2024-06-24T08:31:26Z\", \"region\": \"eu-west-1\", \"resources\": [\"arn:aws:s3:::vpc-flow-logs-ks\" ], \"detail\": {\"version\": \"0\",\"bucket\": { \"name\": \"vpc-flow-logs-ks\"},\"object\": { \"key\": \"test-object-key\", \"size\": 744, \"etag\": \"2ba6b152f13c75a9155b87b5b072963c\", \"version-id\": \"uoW5awQhqux4rhi4Nuh6il967FzQlsHJ\", \"sequencer\": \"0066792EC46EC0B037\"},\"request-id\": \"Q93BVWXD5G6FKWC2\",\"requester\": \"516635408544\",\"source-ip-address\": \"10.0.27.95\",\"reason\": \"PutObject\" }}]}",
})
if err != nil {
panic(err)
}

hash := sha256.Sum256(body)
id, _ := uuid.FromBytes(hash[:16])
messageID := id.String()
receipt := "receipt-" + messageID
bodyStr := string(body)

return sqs.Message{
Body: &bodyStr,
MessageId: &messageID,
ReceiptHandle: &receipt,
}
}

func newS3Event(key string) s3EventV2 {
record := s3EventV2{
AWSRegion: "us-east-1",
Expand Down

0 comments on commit a4f82b1

Please sign in to comment.