From bfa4837d393bc945a8aec150168a4e429cd688b8 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 3 Nov 2021 15:55:44 -0600 Subject: [PATCH 1/5] Add support in aws-s3 input for s3 notification from SNS to SQS --- x-pack/filebeat/input/awss3/sqs_s3_event.go | 19 ++++++++++++++-- .../filebeat/input/awss3/sqs_s3_event_test.go | 10 +++++++++ x-pack/filebeat/input/awss3/sqs_test.go | 22 +++++++++++++++++++ 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go index b2c1a7f169d..a89aad7fc12 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -53,8 +53,12 @@ func nonRetryableErrorWrap(err error) error { // s3EventsV2 is the notification message that Amazon S3 sends to notify of S3 changes. // This was derived from the version 2.2 schema. // https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html +// If the notification message is sent from SNS to SQS, then Records will be +// replaced by TopicArn and Message fields. type s3EventsV2 struct { - Records []s3EventV2 `json:"Records"` + TopicArn string `json:"TopicArn"` + Message string `json:"Message"` + Records []s3EventV2 `json:"Records"` } // s3EventV2 is a S3 change notification event. 
@@ -189,6 +193,18 @@ func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, erro return nil, fmt.Errorf("failed to decode SQS message body as an S3 notification: %w", err) } + // Check if the notification is from S3 -> SNS -> SQS + if events.TopicArn != "" { + dec := json.NewDecoder(strings.NewReader(events.Message)) + if err := dec.Decode(&events); err != nil { + p.log.Debugw("Invalid SQS message body.", "sqs_message_body", body) + return nil, fmt.Errorf("failed to decode SQS message body as an S3 notification: %w", err) + } + } + return p.getS3Info(events) +} + +func (p *sqsS3EventProcessor) getS3Info(events s3EventsV2) ([]s3EventV2, error) { var out []s3EventV2 for _, record := range events.Records { if !p.isObjectCreatedEvents(record) { @@ -211,7 +227,6 @@ func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, erro out = append(out, record) } - return out, nil } diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go index 8865c5d30cd..9edd5ec4ed9 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go @@ -184,6 +184,16 @@ func TestSqsProcessor_getS3Notifications(t *testing.T) { require.NoError(t, err) assert.Len(t, events, 0) }) + + t.Run("sns-sqs notification", func(t *testing.T) { + msg := newSNSSQSMessage() + events, err := p.getS3Notifications(*msg.Body) + require.NoError(t, err) + assert.Len(t, events, 1) + assert.Equal(t, "test-object-key", events[0].S3.Object.Key) + assert.Equal(t, "arn:aws:s3:::vpc-flow-logs-ks", events[0].S3.Bucket.ARN) + assert.Equal(t, "vpc-flow-logs-ks", events[0].S3.Bucket.Name) + }) } func TestNonRecoverableError(t *testing.T) { diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go index 4940b4a6eca..a8b6e7b5f2a 100644 --- a/x-pack/filebeat/input/awss3/sqs_test.go +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -126,6 +126,28 @@ 
func newSQSMessage(events ...s3EventV2) sqs.Message { } } +func newSNSSQSMessage() sqs.Message { + body, err := json.Marshal(s3EventsV2{ + TopicArn: "arn:aws:sns:us-east-1:1234:sns-topic", + Message: "{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-east-1\",\"eventName\":\"ObjectCreated:Put\",\"s3\":{\"configurationId\":\"sns-notification-vpc-flow-logs\",\"bucket\":{\"name\":\"vpc-flow-logs-ks\",\"arn\":\"arn:aws:s3:::vpc-flow-logs-ks\"},\"object\":{\"key\":\"test-object-key\"}}}]}", + }) + if err != nil { + panic(err) + } + + hash := sha256.Sum256(body) + id, _ := uuid.FromBytes(hash[:16]) + messageID := id.String() + receipt := "receipt-" + messageID + bodyStr := string(body) + + return sqs.Message{ + Body: &bodyStr, + MessageId: &messageID, + ReceiptHandle: &receipt, + } +} + func newS3Event(key string) s3EventV2 { record := s3EventV2{ AWSRegion: "us-east-1", From 66ac501e78c30427a7b7e467657ad2c730e4f582 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Fri, 5 Nov 2021 14:22:17 -0600 Subject: [PATCH 2/5] add integration test for sns --- .../input/awss3/_meta/terraform/README.md | 6 +- .../input/awss3/_meta/terraform/main.tf | 74 ++++++++++++ .../input/awss3/_meta/terraform/outputs.tf | 2 + .../input/awss3/input_integration_test.go | 110 ++++++++++++++---- 4 files changed, 168 insertions(+), 24 deletions(-) diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/README.md b/x-pack/filebeat/input/awss3/_meta/terraform/README.md index 7ab27781704..d5614b99a92 100644 --- a/x-pack/filebeat/input/awss3/_meta/terraform/README.md +++ b/x-pack/filebeat/input/awss3/_meta/terraform/README.md @@ -1,9 +1,9 @@ # Terraform setup for AWS S3 Input Integration Tests -This directory contains a Terrafrom module that creates the AWS resources needed +This directory contains a Terraform module that creates the AWS resources needed for executing the integration tests for the `aws-s3` Filebeat input. 
It creates an S3 bucket and SQS queue and configures S3 `ObjectCreated:*` notifications to -be delivered to SQS. +be delivered to SQS. It also creates a second S3 bucket, an SNS topic, and a second SQS queue: S3 `ObjectCreated:*` notifications from that bucket are delivered to the SNS topic, and a subscription on the topic automatically forwards each message to the SQS queue. It outputs configuration information that is consumed by the tests to `outputs.yml`. The AWS resources are randomly named to prevent name collisions @@ -33,7 +33,7 @@ to match the AWS region of the profile you are using. 4. Execute the integration test. ``` - cd x-pack/filebeat/inputs/awss3 + cd x-pack/filebeat/input/awss3 go test -tags aws,integration -run TestInputRun.+ -v . ``` diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/main.tf b/x-pack/filebeat/input/awss3/_meta/terraform/main.tf index 1b22b8bbfdb..62e86abc787 100644 --- a/x-pack/filebeat/input/awss3/_meta/terraform/main.tf +++ b/x-pack/filebeat/input/awss3/_meta/terraform/main.tf @@ -60,3 +60,77 @@ resource "aws_s3_bucket_notification" "bucket_notification" { aws_sqs_queue.filebeat-integtest, ] } + +resource "aws_sns_topic" "filebeat-integtest-sns" { + name = "filebeat-s3-integtest-sns-${random_string.random.result}" + + policy = < Date: Fri, 5 Nov 2021 14:25:45 -0600 Subject: [PATCH 3/5] fix integration test --- x-pack/filebeat/input/awss3/input_integration_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 8a14964f66f..065e6f5843b 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -4,6 +4,9 @@ // See _meta/terraform/README.md for integration test usage instructions. 
+//go:build integration && aws +// +build integration,aws + package awss3 import ( From d43b7b59ba78605ea46727270456964bf5fb505c Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Fri, 5 Nov 2021 14:26:56 -0600 Subject: [PATCH 4/5] add changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e52fdab501a..1bf74b4683f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -322,6 +322,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Update `aws-s3` input to connect to non AWS S3 buckets {issue}28222[28222] {pull}28234[28234] - Sophos UTM: Support logs containing hostname in syslog header. {pull}28638[28638] - Moving Oracle Filebeat module to GA. {pull}28754[28754] +- Add support in aws-s3 input for s3 notification from SNS to SQS. {pull}28800[28800] *Heartbeat* From 91e894e4d7c3d8abefe74cd57b932b40ed69c4a3 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Mon, 8 Nov 2021 16:44:32 -0700 Subject: [PATCH 5/5] add documentation --- .../filebeat/docs/inputs/input-aws-s3.asciidoc | 10 +++++++++- .../input/awss3/input_integration_test.go | 17 ++++++++--------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 1c3b23e9267..c0f75e67e12 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -17,7 +17,7 @@ The use of SQS notification is preferred: polling list of S3 objects is expensiv in terms of performance and costs and should be preferably used only when no SQS notification can be attached to the S3 buckets. This input can, for example, be used to receive S3 access logs to monitor detailed records for the requests that -are made to a bucket. +are made to a bucket. This input also supports S3 notification from SNS to SQS. 
SQS notification method is enabled setting `queue_url` configuration value. S3 bucket list polling method is enabled setting `bucket_arn` configuration value. @@ -386,6 +386,14 @@ create a notification through SQS. Please see https://docs.aws.amazon.com/AmazonS3/latest/dev/ways-to-add-notification-config-to-bucket.html#step1-create-sqs-queue-for-notification[create-sqs-queue-for-notification] for more details. +[float] +=== S3 -> SNS -> SQS setup +If you would like to use the bucket notification in multiple different consumers +(other than {beatname_lc}), you should use an SNS topic for the bucket notification. +Please see https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-notification-config-to-bucket.html#step1-create-sns-topic-for-notification[create-SNS-topic-for-notification] +for more details. The SQS queue will be configured as a +https://docs.aws.amazon.com/sns/latest/dg/sns-sqs-as-subscriber.html[subscriber to the SNS topic]. + [float] === Parallel Processing diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 065e6f5843b..fe6a901aafd 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -50,11 +50,11 @@ const ( ) type terraformOutputData struct { - AWSRegion string `yaml:"aws_region"` - BucketName string `yaml:"bucket_name"` - QueueURL string `yaml:"queue_url"` - BucketName2 string `yaml:"bucket_name_for_sns"` - QueueURL2 string `yaml:"queue_url_for_sns"` + AWSRegion string `yaml:"aws_region"` + BucketName string `yaml:"bucket_name"` + QueueURL string `yaml:"queue_url"` + BucketNameForSNS string `yaml:"bucket_name_for_sns"` + QueueURLForSNS string `yaml:"queue_url_for_sns"` } func getTerraformOutputs(t *testing.T) terraformOutputData { @@ -458,12 +458,12 @@ func TestInputRunSNS(t *testing.T) { tfConfig := getTerraformOutputs(t) // Ensure SQS is empty before testing. 
- drainSQS(t, tfConfig.AWSRegion, tfConfig.QueueURL2) + drainSQS(t, tfConfig.AWSRegion, tfConfig.QueueURLForSNS) // Ensure metrics are removed before testing. monitoring.GetNamespace("dataset").GetRegistry().Remove(inputID) - uploadS3TestFiles(t, tfConfig.AWSRegion, tfConfig.BucketName2, + uploadS3TestFiles(t, tfConfig.AWSRegion, tfConfig.BucketNameForSNS, "testdata/events-array.json", "testdata/invalid.json", "testdata/log.json", @@ -474,7 +474,7 @@ func TestInputRunSNS(t *testing.T) { "testdata/log.txt", // Skipped (no match). ) - s3Input := createInput(t, makeTestConfigSQS(tfConfig.QueueURL2)) + s3Input := createInput(t, makeTestConfigSQS(tfConfig.QueueURLForSNS)) inputCtx, cancel := newV2Context() t.Cleanup(cancel) @@ -486,7 +486,6 @@ func TestInputRunSNS(t *testing.T) { defer close(client.Channel) go func() { for event := range client.Channel { - // Fake the ACK handling that's not implemented in pubtest. event.Private.(*eventACKTracker).ACK() } }()