Skip to content

Commit

Permalink
feat(events): dead letter queue for Lambda Targets (#11617)
Browse files Browse the repository at this point in the history
Add DLQ Configuration to Rule targets. Using a DLQ on a rule prevents the application to loose events after all retry attempts are exhausted.

Resolves #11612 

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
DaWyz authored Mar 3, 2021
1 parent 3413b2f commit 1bb3650
Show file tree
Hide file tree
Showing 8 changed files with 383 additions and 3 deletions.
35 changes: 34 additions & 1 deletion packages/@aws-cdk/aws-events-targets/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,40 @@ Currently supported are:
See the README of the `@aws-cdk/aws-events` library for more information on
EventBridge.

## LogGroup
## Invoke a Lambda function

Use the `LambdaFunction` target to invoke a lambda function.

The code snippet below creates an event rule with a Lambda function as a target
triggered for every events from `aws.ec2` source. You can optionally attach a
[dead letter queue](https://docs.aws.amazon.com/eventbridge/latest/userguide/rule-dlq.html).

```ts
import * as lambda from "@aws-cdk/aws-lambda";
import * as events from "@aws-cdk/aws-events";
import * as sqs from "@aws-cdk/aws-sqs";
import * as targets from "@aws-cdk/aws-events-targets";

const fn = new lambda.Function(this, 'MyFunc', {
runtime: lambda.Runtime.NODEJS_12_X,
handler: 'index.handler',
code: lambda.Code.fromInline(`exports.handler = ${handler.toString()}`),
});

const rule = new events.Rule(this, 'rule', {
eventPattern: {
source: ["aws.ec2"],
},
});

const queue = new sqs.Queue(this, 'Queue');

rule.addTarget(new targets.LambdaFunction(fn, {
deadLetterQueue: queue, // Optional: add a dead letter queue
}));
```

## Log an event into a LogGroup

Use the `LogGroup` target to log your events in a CloudWatch LogGroup.

Expand Down
20 changes: 19 additions & 1 deletion packages/@aws-cdk/aws-events-targets/lib/lambda.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as events from '@aws-cdk/aws-events';
import * as lambda from '@aws-cdk/aws-lambda';
import { addLambdaPermission } from './util';
import * as sqs from '@aws-cdk/aws-sqs';
import { addLambdaPermission, addToDeadLetterQueueResourcePolicy } from './util';

/**
* Customize the Lambda Event Target
Expand All @@ -14,6 +15,18 @@ export interface LambdaFunctionProps {
* @default the entire EventBridge event
*/
readonly event?: events.RuleTargetInput;

/**
* The SQS queue to be used as deadLetterQueue.
* Check out the [considerations for using a dead-letter queue](https://docs.aws.amazon.com/eventbridge/latest/userguide/rule-dlq.html#dlq-considerations).
*
* The events not successfully delivered are automatically retried for a specified period of time,
* depending on the retry policy of the target.
* If an event is not delivered before all retry attempts are exhausted, it will be sent to the dead letter queue.
*
* @default - no dead-letter queue
*/
readonly deadLetterQueue?: sqs.IQueue;
}

/**
Expand All @@ -32,9 +45,14 @@ export class LambdaFunction implements events.IRuleTarget {
// Allow handler to be called from rule
addLambdaPermission(rule, this.handler);

if (this.props.deadLetterQueue) {
addToDeadLetterQueueResourcePolicy(rule, this.props.deadLetterQueue);
}

return {
id: '',
arn: this.handler.functionArn,
deadLetterConfig: this.props.deadLetterQueue ? { arn: this.props.deadLetterQueue?.queueArn } : undefined,
input: this.props.event,
targetResource: this.handler,
};
Expand Down
45 changes: 44 additions & 1 deletion packages/@aws-cdk/aws-events-targets/lib/util.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import * as events from '@aws-cdk/aws-events';
import * as iam from '@aws-cdk/aws-iam';
import * as lambda from '@aws-cdk/aws-lambda';
import { ConstructNode, IConstruct, Names } from '@aws-cdk/core';
import * as sqs from '@aws-cdk/aws-sqs';
import { Annotations, ConstructNode, IConstruct, Names, Token, TokenComparison } from '@aws-cdk/core';

// keep this import separate from other imports to reduce chance for merge conflicts with v2-main
// eslint-disable-next-line no-duplicate-imports, import/order
Expand Down Expand Up @@ -50,3 +51,45 @@ export function addLambdaPermission(rule: events.IRule, handler: lambda.IFunctio
});
}
}

/**
* Allow a rule to send events with failed invocation to an Amazon SQS queue.
*/
export function addToDeadLetterQueueResourcePolicy(rule: events.IRule, queue: sqs.IQueue) {
if (!sameEnvDimension(rule.env.region, queue.env.region)) {
throw new Error(`Cannot assign Dead Letter Queue in region ${queue.env.region} to the rule ${Names.nodeUniqueId(rule.node)} in region ${rule.env.region}. Both the queue and the rule must be in the same region.`);
}

// Skip Resource Policy creation if the Queue is not in the same account.
// There is no way to add a target onto an imported rule, so we can assume we will run the following code only
// in the account where the rule is created.
if (sameEnvDimension(rule.env.account, queue.env.account)) {
const policyStatementId = `AllowEventRule${Names.nodeUniqueId(rule.node)}`;

queue.addToResourcePolicy(new iam.PolicyStatement({
sid: policyStatementId,
principals: [new iam.ServicePrincipal('events.amazonaws.com')],
effect: iam.Effect.ALLOW,
actions: ['sqs:SendMessage'],
resources: [queue.queueArn],
conditions: {
ArnEquals: {
'aws:SourceArn': rule.ruleArn,
},
},
}));
} else {
Annotations.of(rule).addWarning(`Cannot add a resource policy to your dead letter queue associated with rule ${rule.ruleName} because the queue is in a different account. You must add the resource policy manually to the dead letter queue in account ${queue.env.account}.`);
}
}


/**
* Whether two string probably contain the same environment dimension (region or account)
*
* Used to compare either accounts or regions, and also returns true if both
* are unresolved (in which case both are expted to be "current region" or "current account").
*/
function sameEnvDimension(dim1: string, dim2: string) {
return [TokenComparison.SAME, TokenComparison.BOTH_UNRESOLVED].includes(Token.compareStrings(dim1, dim2));
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,95 @@
]
}
}
},
"Timer30894E3BB": {
"Type": "AWS::Events::Rule",
"Properties": {
"ScheduleExpression": "rate(2 minutes)",
"State": "ENABLED",
"Targets": [
{
"Arn": {
"Fn::GetAtt": [
"MyFunc8A243A2C",
"Arn"
]
},
"DeadLetterConfig": {
"Arn": {
"Fn::GetAtt": [
"Queue4A7E3555",
"Arn"
]
}
},
"Id": "Target0"
}
]
}
},
"Timer3AllowEventRulelambdaeventsMyFunc910E580F79317F73": {
"Type": "AWS::Lambda::Permission",
"Properties": {
"Action": "lambda:InvokeFunction",
"FunctionName": {
"Fn::GetAtt": [
"MyFunc8A243A2C",
"Arn"
]
},
"Principal": "events.amazonaws.com",
"SourceArn": {
"Fn::GetAtt": [
"Timer30894E3BB",
"Arn"
]
}
}
},
"Queue4A7E3555": {
"Type": "AWS::SQS::Queue",
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"QueuePolicy25439813": {
"Type": "AWS::SQS::QueuePolicy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "sqs:SendMessage",
"Condition": {
"ArnEquals": {
"aws:SourceArn": {
"Fn::GetAtt": [
"Timer30894E3BB",
"Arn"
]
}
}
},
"Effect": "Allow",
"Principal": {
"Service": "events.amazonaws.com"
},
"Resource": {
"Fn::GetAtt": [
"Queue4A7E3555",
"Arn"
]
},
"Sid": "AllowEventRulelambdaeventsTimer3107B9373"
}
],
"Version": "2012-10-17"
},
"Queues": [
{
"Ref": "Queue4A7E3555"
}
]
}
}
}
}
12 changes: 12 additions & 0 deletions packages/@aws-cdk/aws-events-targets/test/lambda/integ.events.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as events from '@aws-cdk/aws-events';
import * as lambda from '@aws-cdk/aws-lambda';
import * as sqs from '@aws-cdk/aws-sqs';
import * as cdk from '@aws-cdk/core';
import * as targets from '../../lib';

Expand All @@ -23,6 +24,17 @@ const timer2 = new events.Rule(stack, 'Timer2', {
});
timer2.addTarget(new targets.LambdaFunction(fn));


const timer3 = new events.Rule(stack, 'Timer3', {
schedule: events.Schedule.rate(cdk.Duration.minutes(2)),
});

const queue = new sqs.Queue(stack, 'Queue');

timer3.addTarget(new targets.LambdaFunction(fn, {
deadLetterQueue: queue,
}));

app.synth();

/* eslint-disable no-console */
Expand Down
Loading

0 comments on commit 1bb3650

Please sign in to comment.