Skip to content

Commit

Permalink
sorry folks, remove the big mess of accident rebase...
Browse files Browse the repository at this point in the history
  • Loading branch information
raylrui committed Apr 19, 2024
1 parent f44443e commit aba6f35
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ Example Input: general Event structure
### Outputs

S3 object of that event archived in dedicated s3 bucket.\
URI: s3://{bucket_name}/events/{year}/{month}/{day}/{event_type}_{totalSeconds.microsecond}.json \
Example Outputs: ```s3://{bucket_name}/events/2024/04/16/WorkflowRequest_1713252338.243297.json```
URI: s3://{bucket_name}/events/year={year}/month={month}/day={day}/{event_type}_{totalSeconds.microsecond}.json \
Example Outputs: ```s3://{bucket_name}/events/year=2024/month=04/day=16/WorkflowRequest_1713252338.243297.json```


## Sanitize for object key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,31 @@
# Initialize S3 client
s3 = boto3.client('s3')

# The name of the bucket
BUCKET_NAME = os.getenv('BUCKET_NAME')

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def handler(event, context):
#assert the environment variable is set
assert os.getenv('BUCKET_NAME'), "BUCKET_NAME environment variable is not set"

BUCKET_NAME = os.getenv('BUCKET_NAME')

# Current timestamp
now = datetime.now(timezone.utc)
time_stamp = str(now.timestamp()) # for object name
time_stamp_details = now.strftime("%Y-%m-%d__%H-%M-%S") # for tagging
time_stamp_formated = now.strftime("%Y-%m-%d__%H-%M-%S") # for tagging

# Extract the event title (type) from detail type
event_title = sanitize_string(event.get('detail-type', 'undefinedEvent'))
event_type = sanitize_string(event.get('detail-type', 'undefinedEvent'))

# Formatting the S3 key with year/month/day partitioning
key = f'events/year={now.year}/month={now.month:02}/day={now.day:02}/{event_title+'_'+time_stamp}.json'
key = f'events/year={now.year}/month={now.month:02}/day={now.day:02}/{event_type+'_'+time_stamp}.json'

# Convert the event to JSON
event_json = json.dumps(event)
default_tags = {
'event_type': event_title,
'event_time': time_stamp_details,
'event_type': event_type,
'event_time': time_stamp_formated,
}

# Write the JSON to an S3 bucket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import { Construct } from 'constructs';
import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha';
import { Architecture } from 'aws-cdk-lib/aws-lambda';
import { IEventBus, Rule } from 'aws-cdk-lib/aws-events';
import { IVpc } from 'aws-cdk-lib/aws-ec2';
import { IVpc, ISecurityGroup } from 'aws-cdk-lib/aws-ec2';

export interface UniversalEventArchiverConstructProps {
vpc: IVpc;
lambdaSG: ISecurityGroup;
archiveBucket: IBucket;
eventBus: IEventBus;
}
Expand All @@ -26,6 +27,7 @@ export class UniversalEventArchiverConstruct extends Construct {

const eventBus = props.eventBus;
const archiveBucket = props.archiveBucket;
const lambdaSG = props.lambdaSG;

const archiveEventFunction = new PythonFunction(this, 'UniversalEventArchiver', {
entry: path.join(__dirname, '../../archiver-service'),
Expand All @@ -35,6 +37,7 @@ export class UniversalEventArchiverConstruct extends Construct {
},
vpc: props.vpc,
vpcSubnets: { subnets: props.vpc.privateSubnets },
securityGroups: [lambdaSG],
architecture: Architecture.ARM_64,
timeout: Duration.seconds(28),
index: 'universal_event_achiver.py',
Expand All @@ -43,17 +46,18 @@ export class UniversalEventArchiverConstruct extends Construct {

archiveBucket.grantReadWrite(archiveEventFunction);

const rule = new Rule(this, 'Rule', {
const rule = new Rule(this, this.id + 'EventRule', {
ruleName: 'UniversalEventArchiverRule',
description: 'Rule to archive all events to S3',
eventBus,
eventPattern: {
//account: [cdk.Aws.ACCOUNT_ID],
account: [Stack.of(this).account],
},
});

rule.addTarget(
new LambdaFunction(archiveEventFunction, {
maxEventAge: Duration.seconds(28), // Maximum age for an event to be retried
maxEventAge: Duration.seconds(60), // Maximum age must have value greater than or equal to 60 (Service: EventBridge)
retryAttempts: 3, // Retry up to 3 times
})
);
Expand Down
33 changes: 25 additions & 8 deletions lib/workload/stateful/stacks/shared/constructs/event-bus/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Construct } from 'constructs';
import { Duration, Stack } from 'aws-cdk-lib';
import { Duration, RemovalPolicy, Stack } from 'aws-cdk-lib';
import * as events from 'aws-cdk-lib/aws-events';
import { Vpc, VpcLookupOptions } from 'aws-cdk-lib/aws-ec2';
import { Vpc, VpcLookupOptions, SecurityGroup } from 'aws-cdk-lib/aws-ec2';
import { Bucket } from 'aws-cdk-lib/aws-s3';
import { UniversalEventArchiverConstruct } from './custom-event-archiver/construct/universal-event-archiver';

Expand All @@ -13,8 +13,9 @@ export interface EventBusProps {

// Optional for custom event archiver
addCustomEventArchiver?: boolean;
vpc?: VpcLookupOptions;
archiveBucket?: string;
vpcProps?: VpcLookupOptions;
lambdaSecurityGroupName?: string;
archiveBucketName?: string;
}

export class EventBusConstruct extends Construct {
Expand Down Expand Up @@ -48,14 +49,30 @@ export class EventBusConstruct extends Construct {
}

private createUniversalEventArchiver(props: EventBusProps) {
if (!props.vpc || !props.archiveBucket) {
throw new Error('VPC and Archive Bucket are required for custom event archiver');
if (!props.vpcProps || !props.archiveBucketName || !props.lambdaSecurityGroupName) {
throw new Error(
'VPC, Security Group and Archive Bucket are required for custom event archiver function.'
);
}
const archiveBucket = Bucket.fromBucketName(this, 'AuditBucket', props.archiveBucket);
const vpc = Vpc.fromLookup(this, 'MainVpc', props.vpc);

const vpc = Vpc.fromLookup(this, 'MainVpc', props.vpcProps);

// dedicated bucket for archiving all events
const archiveBucket = new Bucket(this, 'UniversalEventArchiveBucket', {
bucketName: props.archiveBucketName,
removalPolicy: RemovalPolicy.RETAIN,
});
// dedicated security group for the lambda function
const lambdaSG = new SecurityGroup(this, 'UniversalEventArchiverLambdaSG', {
vpc,
securityGroupName: props.lambdaSecurityGroupName,
allowAllOutbound: true,
description: 'Security group for the Universal Event Archiver Lambda function to egress out.',
});

new UniversalEventArchiverConstruct(this, 'UniversalEventArchiver', {
vpc,
lambdaSG,
archiveBucket,
eventBus: this.mainBus,
});
Expand Down

0 comments on commit aba6f35

Please sign in to comment.