Merge pull request #226 from umccr/feature/universal-event-archiver
add universal events archiver function
raylrui authored Apr 22, 2024
2 parents 76b1c0b + f8663db commit c9bf1ee
Showing 12 changed files with 386 additions and 2 deletions.
4 changes: 4 additions & 0 deletions config/constants.ts
@@ -62,6 +62,10 @@ export const regName = 'OrcaBusSchemaRegistry';
export const eventBusName = 'OrcaBusMain';
export const eventSourceQueueName = 'orcabus-event-source-queue';

// Event Archiver constants for EventBus Construct in SharedStack
export const archiveBucketName = 'event-archive-bucket';
export const archiveSecurityGroupName = 'OrcaBusEventArchiveSecurityGroup';

/**
 * Configuration for resources created in TokenServiceStack
 */
8 changes: 8 additions & 0 deletions config/stacks/shared.ts
@@ -16,6 +16,8 @@ import {
  regName,
  stgBucket,
  vpcProps,
  archiveBucketName,
  archiveSecurityGroupName,
} from '../constants';
import { Duration, RemovalPolicy } from 'aws-cdk-lib';
import { SchemaRegistryProps } from '../../lib/workload/stateful/stacks/shared/constructs/schema-registry';
@@ -36,6 +38,12 @@ const getEventBusConstructProps = (): EventBusProps => {
    archiveName: 'OrcaBusMainArchive',
    archiveDescription: 'OrcaBus main event bus archive',
    archiveRetention: 365,

    // add custom event archiver
    addCustomEventArchiver: true,
    vpcProps: vpcProps,
    lambdaSecurityGroupName: archiveSecurityGroupName,
    archiveBucketName: archiveBucketName,
  };
};

@@ -0,0 +1,5 @@
install:
	@pip install -r deps/requirements-test.txt

test:
	@python -m unittest archive_service/tests/test_universal_event_archiver.py
@@ -0,0 +1,60 @@
# Custom Event Archiver

EventBridge does support a built-in bus archive, but we have no control over it and cannot inspect it.\
To keep a history of any and all events that go over the OrcaBus event bus, custom event archivers are needed to write specific or all events as JSON objects to a dedicated S3 bucket, keyed by the event timestamp (structured with year/month/day prefixes).

<!-- TOC -->
* [Universal Event Archiver](#universal-event-archiver)
* [Inputs](#inputs)
* [Outputs](#outputs)
* [Sanitize for object key](#sanitize-for-object-key)

<!-- TOC -->


## Universal Event Archiver

### Inputs

The Lambda function takes every event that goes over the OrcaBus event bus and writes it as JSON to the dedicated S3 bucket, formatting the S3 key with year/month/day partitioning.

Parameters:
* eventBus
* s3bucket
* vpc

Example Input: general Event structure

```json5
{
  "version": "0",
  "id": "UUID",
  "detail-type": "event_name",
  "source": "event source",
  "account": "ARN",
  "time": "timestamp",
  "region": "region",
  "resources": [
    "ARN"
  ],
  "detail": {
    ...
  }
}
```

### Outputs

The event is archived as an S3 object in the dedicated S3 bucket.\
URI: ```s3://{bucket_name}/events/year={year}/month={month}/day={day}/{event_type}_{totalSeconds.microsecond}.json``` \
Example output: ```s3://{bucket_name}/events/year=2024/month=04/day=16/WorkflowRequest_1713252338.243297.json```
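
For reference, a minimal sketch of how such a key is assembled from the event timestamp and the sanitized detail type, mirroring the key format used by the `universal_event_archiver.py` handler included later in this diff (the `event_type` value is taken from the example output above):

```python
from datetime import datetime, timezone

# Build an archive key with the year/month/day partitioning described above.
now = datetime.now(timezone.utc)
event_type = "WorkflowRequest"  # already-sanitized detail-type, from the example output
key = f'events/year={now.year}/month={now.month:02}/day={now.day:02}/{event_type}_{now.timestamp()}.json'
# e.g. events/year=2024/month=04/day=16/WorkflowRequest_1713252338.243297.json
```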


## Sanitize for object key

The event detail type is used as part of the object key. To avoid issues when the object key is used in filenames or URLs, the function ```sanitize_string``` is applied.

* Any sequence of characters that is not alphanumeric or an underscore (including special characters and spaces) is replaced with a single underscore "_".
* Leading and trailing underscores and whitespace are removed.

Test case: ```sanitize_string(" test %01## 23%!~@#$%^&*(). case. ")``` will produce ```test_01_23_case```
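
A quick way to check that rule is to import the helper from the archiver module (a usage sketch, not part of this PR's test suite):

```python
from archive_service.universal_event_archiver import sanitize_string

# Runs of non-word characters collapse to a single underscore;
# leading/trailing underscores and whitespace are stripped.
assert sanitize_string(" test %01## 23%!~@#$%^&*(). case. ") == "test_01_23_case"
```
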
@@ -0,0 +1,60 @@
import unittest
from unittest.mock import patch
import botocore.session
from botocore.stub import Stubber
import os
import json
from freezegun import freeze_time
from archive_service.universal_event_archiver import handler

class UniversalEventArchiverUnitTest(unittest.TestCase):
    def setUp(self):
        self.s3 = botocore.session.get_session().create_client('s3')
        self.stubber = Stubber(self.s3)
        self.stubber.activate()
        self.event = {
            "detail-type": "Test Event Type. ",  # detail-type with white space and special characters, testing sanitize_string
            "detail": {}
        }
        os.environ['BUCKET_NAME'] = 'test-bucket'
        # Replace the module-level boto3 client with the stubbed client for the duration of the test
        patch('archive_service.universal_event_archiver.s3', self.s3).start()

    def tearDown(self):
        self.stubber.deactivate()
        patch.stopall()
        if 'BUCKET_NAME' in os.environ:
            del os.environ['BUCKET_NAME']

    # freeze time for timestamp testing purposes
    @freeze_time("2024-01-01")
    def test_handler_success(self):

        # expected timestamp (2024-01-01 00:00:00)
        expected_key = 'events/year=2024/month=01/day=01/Test_Event_Type_1704067200.0.json'
        expected_tagging = 'event_type=Test_Event_Type&event_time=2024-01-01__00-00-00'

        # Mock the response and set up stubbing
        response = {}
        expected_params = {
            'Bucket': 'test-bucket',
            'Key': expected_key,
            'Body': json.dumps(self.event),
            'Tagging': expected_tagging
        }
        self.stubber.add_response('put_object', response, expected_params)

        # Call the handler
        result = handler(self.event, None)

        # Verify
        self.assertEqual(result['statusCode'], 200)
        self.assertTrue('Event archived successfully!' in result['body'])
        self.stubber.assert_no_pending_responses()

    def test_handler_no_bucket_env(self):
        # Remove environment variable to test error handling
        del os.environ['BUCKET_NAME']

        with self.assertRaises(AssertionError):
            handler(self.event, None)

@@ -0,0 +1,53 @@
import json
import boto3
import os
import re
import logging
from datetime import datetime, timezone

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialize S3 client
s3 = boto3.client('s3')

def handler(event, context):

    # assert the environment variable is set
    assert os.getenv('BUCKET_NAME'), "BUCKET_NAME environment variable is not set"

    BUCKET_NAME = os.getenv('BUCKET_NAME')

    # Current timestamp
    now = datetime.now(timezone.utc)
    time_stamp = str(now.timestamp())  # for object name
    time_stamp_formatted = now.strftime("%Y-%m-%d__%H-%M-%S")  # for tagging

    # Extract the event title (type) from detail type
    event_type = sanitize_string(event.get('detail-type', 'undefinedEvent'))

    # Format the S3 key with year/month/day partitioning
    key = f'events/year={now.year}/month={now.month:02}/day={now.day:02}/{event_type + "_" + time_stamp}.json'

    # Convert the event to JSON
    event_json = json.dumps(event)
    default_tags = {
        'event_type': event_type,
        'event_time': time_stamp_formatted,
    }

    # Write the JSON to the S3 bucket
    try:
        s3.put_object(Bucket=BUCKET_NAME, Key=key, Body=event_json, Tagging='&'.join([f'{k}={v}' for k, v in default_tags.items()]))
        logger.info("Event stored: %s", key)
    except Exception as e:
        logger.error("Error storing event: %s", str(e))
        raise e

    return {
        'statusCode': 200,
        'body': json.dumps('Event archived successfully! Archived path: ' + key)
    }

def sanitize_string(input_string):
    # Collapse runs of non-word characters into a single underscore and strip leading/trailing underscores
    return re.sub(r'[^\w]+', '_', input_string.strip()).strip('_')
@@ -0,0 +1,73 @@
import path from 'path';
import { IBucket } from 'aws-cdk-lib/aws-s3';
import { LambdaFunction } from 'aws-cdk-lib/aws-events-targets';
import { PolicyStatement } from 'aws-cdk-lib/aws-iam';
import { aws_lambda, Duration, Stack } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha';
import { Architecture } from 'aws-cdk-lib/aws-lambda';
import { IEventBus, Rule } from 'aws-cdk-lib/aws-events';
import { IVpc, ISecurityGroup } from 'aws-cdk-lib/aws-ec2';

export interface UniversalEventArchiverConstructProps {
  vpc: IVpc;
  lambdaSG: ISecurityGroup;
  archiveBucket: IBucket;
  eventBus: IEventBus;
}

export class UniversalEventArchiverConstruct extends Construct {
  private readonly id: string;
  private readonly lambdaRuntimePythonVersion: aws_lambda.Runtime = aws_lambda.Runtime.PYTHON_3_12;

  constructor(scope: Construct, id: string, props: UniversalEventArchiverConstructProps) {
    super(scope, id);

    this.id = id;

    const eventBus = props.eventBus;
    const archiveBucket = props.archiveBucket;
    const lambdaSG = props.lambdaSG;

    const archiveEventFunction = new PythonFunction(this, 'UniversalEventArchiver', {
      entry: path.join(__dirname, '../../archive_service'),
      runtime: this.lambdaRuntimePythonVersion,
      environment: {
        BUCKET_NAME: archiveBucket.bucketName,
      },
      vpc: props.vpc,
      vpcSubnets: { subnets: props.vpc.privateSubnets },
      securityGroups: [lambdaSG],
      architecture: Architecture.ARM_64,
      timeout: Duration.seconds(28),
      index: 'universal_event_archiver.py',
      handler: 'handler',
    });

    archiveBucket.grantPut(archiveEventFunction);

    const rule = new Rule(this, this.id + 'EventRule', {
      ruleName: 'UniversalEventArchiverRule',
      description: 'Rule to archive all events to S3',
      eventBus,
      // Matching on the current account satisfies EventBridge's requirement for a non-empty
      // event pattern while effectively selecting every event published to this bus.
      eventPattern: {
        account: [Stack.of(this).account],
      },
    });

    rule.addTarget(
      new LambdaFunction(archiveEventFunction, {
        maxEventAge: Duration.seconds(60), // Maximum age must be greater than or equal to 60 seconds (Service: EventBridge)
        retryAttempts: 3, // Retry up to 3 times
      })
    );

    // Optional: if the Lambda function needs more permissions
    archiveEventFunction.addToRolePolicy(
      new PolicyStatement({
        actions: ['s3:PutObject'],
        resources: [archiveBucket.bucketArn + '/*'],
      })
    );
  }
}
@@ -0,0 +1,2 @@
boto3
freezegun
@@ -1,12 +1,21 @@
import { Construct } from 'constructs';
import { Duration, Stack } from 'aws-cdk-lib';
import { Duration, RemovalPolicy, Stack } from 'aws-cdk-lib';
import * as events from 'aws-cdk-lib/aws-events';
import { Vpc, VpcLookupOptions, SecurityGroup } from 'aws-cdk-lib/aws-ec2';
import { Bucket } from 'aws-cdk-lib/aws-s3';
import { UniversalEventArchiverConstruct } from './custom-event-archiver/construct/universal-event-archiver';

export interface EventBusProps {
  eventBusName: string;
  archiveName: string;
  archiveDescription: string;
  archiveRetention: number;

  // Optional for custom event archiver
  addCustomEventArchiver?: boolean;
  vpcProps?: VpcLookupOptions;
  lambdaSecurityGroupName?: string;
  archiveBucketName?: string;
}

export class EventBusConstruct extends Construct {
@@ -15,6 +24,11 @@ export class EventBusConstruct extends Construct {
  constructor(scope: Construct, id: string, props: EventBusProps) {
    super(scope, id);
    this.mainBus = this.createMainBus(props);

    // Optional for custom event archiver
    if (props.addCustomEventArchiver) {
      this.createUniversalEventArchiver(props);
    }
  }

  private createMainBus(props: EventBusProps) {
@@ -33,4 +47,36 @@ export class EventBusConstruct extends Construct {

    return mainBus;
  }

  private createUniversalEventArchiver(props: EventBusProps) {
    if (!props.vpcProps || !props.archiveBucketName || !props.lambdaSecurityGroupName) {
      throw new Error(
        'VPC, Security Group and Archive Bucket are required for custom event archiver function.'
      );
    }

    const vpc = Vpc.fromLookup(this, 'MainVpc', props.vpcProps);

    // dedicated bucket for archiving all events
    const archiveBucket = new Bucket(this, 'UniversalEventArchiveBucket', {
      bucketName: props.archiveBucketName,
      removalPolicy: RemovalPolicy.RETAIN,
      serverAccessLogsPrefix: 'server-access-logs/',
      enforceSSL: true, // denies any request made via plain HTTP
    });
    // dedicated security group for the lambda function
    const lambdaSG = new SecurityGroup(this, 'UniversalEventArchiverLambdaSG', {
      vpc,
      securityGroupName: props.lambdaSecurityGroupName,
      allowAllOutbound: true,
      description: 'Security group for the Universal Event Archiver Lambda function to egress out.',
    });

    new UniversalEventArchiverConstruct(this, 'UniversalEventArchiver', {
      vpc,
      lambdaSG,
      archiveBucket,
      eventBus: this.mainBus,
    });
  }
}
