Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(events): DLQ support for EventBus target #16383

Merged
merged 7 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion packages/@aws-cdk/aws-events-targets/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Currently supported are:
* Put a record to a Kinesis stream
* [Log an event into a LogGroup](#log-an-event-into-a-loggroup)
* Put a record to a Kinesis Data Firehose stream
* Put an event on an EventBridge bus
* [Put an event on an EventBridge bus](#put-an-event-on-an-eventbridge-bus)

See the README of the `@aws-cdk/aws-events` library for more information on
EventBridge.
Expand Down Expand Up @@ -266,3 +266,23 @@ rule.addTarget(
} ),
)
```

## Put an event on an EventBridge bus

Use the `EventBus` target to route event to a different EventBus.

The code snippet below creates the scheduled event rule that route events to an imported event bus.

```ts
const rule = new events.Rule(this, 'Rule', {
schedule: events.Schedule.expression('rate(1 minute)'),
});

rule.addTarget(new targets.EventBus(
events.EventBus.fromEventBusArn(
this,
'External',
`arn:aws:events:eu-west-1:999999999999:event-bus/test-bus`,
),
));
```
35 changes: 26 additions & 9 deletions packages/@aws-cdk/aws-events-targets/lib/event-bus.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import * as events from '@aws-cdk/aws-events';
import * as iam from '@aws-cdk/aws-iam';
import { singletonEventRole } from './util';
import * as sqs from '@aws-cdk/aws-sqs';
import { singletonEventRole, addToDeadLetterQueueResourcePolicy } from './util';

/**
* Configuration properties of an Event Bus event
*
* Cannot extend TargetBaseProps. Retry policy is not supported for Event bus targets.
*/
export interface EventBusProps {
/**
Expand All @@ -12,25 +15,39 @@ export interface EventBusProps {
* @default a new role is created.
*/
readonly role?: iam.IRole;

/**
* The SQS queue to be used as deadLetterQueue.
* Check out the [considerations for using a dead-letter queue](https://docs.aws.amazon.com/eventbridge/latest/userguide/rule-dlq.html#dlq-considerations).
*
* The events not successfully delivered are automatically retried for a specified period of time,
* depending on the retry policy of the target.
* If an event is not delivered before all retry attempts are exhausted, it will be sent to the dead letter queue.
*
* @default - no dead-letter queue
*/
readonly deadLetterQueue?: sqs.IQueue;
}

/**
* Notify an existing Event Bus of an event
*/
export class EventBus implements events.IRuleTarget {
private readonly role?: iam.IRole;

constructor(private readonly eventBus: events.IEventBus, props: EventBusProps = {}) {
this.role = props.role;
}
constructor(private readonly eventBus: events.IEventBus, private readonly props: EventBusProps = {}) { }

bind(rule: events.IRule, _id?: string): events.RuleTargetConfig {
if (this.role) {
this.role.addToPrincipalPolicy(this.putEventStatement());
if (this.props.role) {
this.props.role.addToPrincipalPolicy(this.putEventStatement());
}
const role = this.role ?? singletonEventRole(rule, [this.putEventStatement()]);
const role = this.props.role ?? singletonEventRole(rule, [this.putEventStatement()]);

if (this.props.deadLetterQueue) {
addToDeadLetterQueueResourcePolicy(rule, this.props.deadLetterQueue);
}

return {
arn: this.eventBus.eventBusArn,
deadLetterConfig: this.props.deadLetterQueue ? { arn: this.props.deadLetterQueue?.queueArn } : undefined,
role,
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import '@aws-cdk/assert-internal/jest';
import * as events from '@aws-cdk/aws-events';
import * as iam from '@aws-cdk/aws-iam';
import * as sqs from '@aws-cdk/aws-sqs';
import { Stack } from '@aws-cdk/core';
import * as targets from '../../lib';

Expand Down Expand Up @@ -90,4 +91,79 @@ test('with supplied role', () => {
Ref: 'Role1ABCC5F0',
}],
});
});
});

test('with a Dead Letter Queue specified', () => {
const stack = new Stack();
const rule = new events.Rule(stack, 'Rule', {
schedule: events.Schedule.expression('rate(1 min)'),
});
const queue = new sqs.Queue(stack, 'Queue');

rule.addTarget(new targets.EventBus(
events.EventBus.fromEventBusArn(
stack,
'External',
'arn:aws:events:us-east-1:123456789012:default',
),
{ deadLetterQueue: queue },
));

expect(stack).toHaveResource('AWS::Events::Rule', {
Targets: [{
Arn: 'arn:aws:events:us-east-1:123456789012:default',
Id: 'Target0',
RoleArn: {
'Fn::GetAtt': [
'RuleEventsRoleC51A4248',
'Arn',
],
},
DeadLetterConfig: {
Arn: {
'Fn::GetAtt': [
'Queue4A7E3555',
'Arn',
],
},
},
}],
});

expect(stack).toHaveResource('AWS::SQS::QueuePolicy', {
PolicyDocument: {
Statement: [
{
Action: 'sqs:SendMessage',
Condition: {
ArnEquals: {
'aws:SourceArn': {
'Fn::GetAtt': [
'Rule4C995B7F',
'Arn',
],
},
},
},
Effect: 'Allow',
Principal: {
Service: 'events.amazonaws.com',
},
Resource: {
'Fn::GetAtt': [
'Queue4A7E3555',
'Arn',
],
},
Sid: 'AllowEventRuleRule',
},
],
Version: '2012-10-17',
},
Queues: [
{
Ref: 'Queue4A7E3555',
},
],
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@
]
]
},
"DeadLetterConfig": {
"Arn": {
"Fn::GetAtt": [
"Queue4A7E3555",
"Arn"
]
}
},
"Id": "Target0",
"RoleArn": {
"Fn::GetAtt": [
Expand Down Expand Up @@ -78,6 +86,50 @@
}
]
}
},
"Queue4A7E3555": {
"Type": "AWS::SQS::Queue",
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"QueuePolicy25439813": {
"Type": "AWS::SQS::QueuePolicy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "sqs:SendMessage",
"Condition": {
"ArnEquals": {
"aws:SourceArn": {
"Fn::GetAtt": [
"Rule4C995B7F",
"Arn"
]
}
}
},
"Effect": "Allow",
"Principal": {
"Service": "events.amazonaws.com"
},
"Resource": {
"Fn::GetAtt": [
"Queue4A7E3555",
"Arn"
]
},
"Sid": "AllowEventRuleeventsourcestackRuleFCA41174"
}
],
"Version": "2012-10-17"
},
"Queues": [
{
"Ref": "Queue4A7E3555"
}
]
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/// !cdk-integ pragma:ignore-assets
import * as events from '@aws-cdk/aws-events';
import * as sqs from '@aws-cdk/aws-sqs';
import * as cdk from '@aws-cdk/core';
import * as targets from '../../lib';

Expand All @@ -12,12 +13,18 @@ class EventSourceStack extends cdk.Stack {
const rule = new events.Rule(this, 'Rule', {
schedule: events.Schedule.expression('rate(1 minute)'),
});

const queue = new sqs.Queue(this, 'Queue');

rule.addTarget(new targets.EventBus(
events.EventBus.fromEventBusArn(
this,
'External',
`arn:aws:events:${this.region}:999999999999:event-bus/test-bus`,
),
{
deadLetterQueue: queue,
},
));
}
}
Expand Down