Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pipes-sources): add Kinesis and DynamoDB #29476

Merged
merged 46 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
c7b63ac
feat(pipes-sources): Kinesis source
msambol Mar 14, 2024
cfa2a59
add validation for parameters
msambol Mar 14, 2024
71d6533
fix undefined default
msambol Mar 14, 2024
2b78676
change maximumRecordAge back to Duration
msambol Mar 14, 2024
e3574b7
update snaps
msambol Mar 14, 2024
fad3251
update int tests
msambol Mar 14, 2024
afc05a1
fix readme, update int test
msambol Mar 14, 2024
7f24e1d
add DynamoDB
msambol Mar 15, 2024
122dae4
update rosetta
msambol Mar 16, 2024
a72e7bf
fix ddb int test
msambol Mar 16, 2024
0fcd8db
fix readme
msambol Mar 16, 2024
82a2d38
Add feedback from Jimmy
msambol Mar 26, 2024
00b95c7
fix docs
msambol Mar 26, 2024
cf601f1
Get integration tests working
msambol Mar 27, 2024
96a1a44
Change timestamp to number
msambol Mar 28, 2024
00c014a
integration test with timestamp
msambol Mar 28, 2024
ad538a3
add check for timestamp
msambol Mar 28, 2024
e6520f1
ISO 8601 format
msambol Mar 29, 2024
6fe65b1
refactor streaming source props
msambol Mar 30, 2024
183316c
add SourceWithDlq
msambol Mar 31, 2024
74b061d
change check for source with dlq
msambol Mar 31, 2024
9f270fc
remove method
msambol Mar 31, 2024
0a46681
move method
msambol Mar 31, 2024
6119392
implement some things in abstract class
msambol Mar 31, 2024
1c37f30
refactor into streamsource class
msambol Mar 31, 2024
9d691b5
dont need deadLetterTargetArn in SourceWithDlq
msambol Mar 31, 2024
f290e15
Add doc
msambol Mar 31, 2024
8ee3794
export stream source
msambol Apr 1, 2024
f7a7dcb
fix export order
msambol Apr 1, 2024
da26beb
add comments
msambol Apr 1, 2024
0d1d049
fix comments
msambol Apr 1, 2024
abd36c9
Move validation tests to streamSource.test.ts
msambol Apr 1, 2024
61339d5
Add first round of feedback from Kendra
msambol Apr 9, 2024
64150d3
Remove InstanceOf for source with dlq
msambol Apr 9, 2024
724ebfb
Use deadLetterTarget not sourceArn
msambol Apr 9, 2024
65f5512
Change name from dlq to deadLetterTarget
msambol Apr 9, 2024
21d490d
Fix integration test
msambol May 3, 2024
20d9fee
Remove streams only verbiage
msambol Jun 7, 2024
8b818f2
Update documentation
msambol Jun 26, 2024
035d70c
Check for tokens in validation, add variables
msambol Sep 16, 2024
946a718
Add type to attributes
msambol Sep 16, 2024
de70f7e
fix naming
msambol Sep 16, 2024
cb2bbbf
fix naming again
msambol Sep 16, 2024
82ae799
trigger build
msambol Sep 16, 2024
05fdd3e
Remove properties, back to how I had it
msambol Sep 23, 2024
ecba4fa
Add documentation around dlq
msambol Sep 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion packages/@aws-cdk/aws-pipes-alpha/lib/pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Construct } from 'constructs';
import { IEnrichment } from './enrichment';
import { IFilter } from './filter';
import { ILogDestination, IncludeExecutionData, LogLevel } from './logs';
import { ISource } from './source';
import { ISource, SourceWithDeadLetterTarget } from './source';
import { ITarget } from './target';

/**
Expand Down Expand Up @@ -233,6 +233,16 @@ export class Pipe extends PipeBase {
*/
const source = props.source.bind(this);
props.source.grantRead(this.pipeRole);

/**
* An optional dead-letter queue stores any events that are not successfully delivered to
* a target after all retry attempts are exhausted. The IAM role needs permission to write
* events to the dead-letter queue, either an SQS queue or SNS topic.
*/
if (SourceWithDeadLetterTarget.isSourceWithDeadLetterTarget(props.source)) {
props.source.grantPush(this.pipeRole, props.source.deadLetterTarget);
}
msambol marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines +242 to +244
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some comments to explain why this grantPush permission is needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some notes in ecba4fa.


// Add the filter criteria to the source parameters
const sourceParameters : CfnPipe.PipeSourceParametersProperty= {
...source.sourceParameters,
Expand Down
54 changes: 54 additions & 0 deletions packages/@aws-cdk/aws-pipes-alpha/lib/source.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { IRole } from 'aws-cdk-lib/aws-iam';
import { CfnPipe } from 'aws-cdk-lib/aws-pipes';
import { ITopic, Topic } from 'aws-cdk-lib/aws-sns';
import { IQueue, Queue } from 'aws-cdk-lib/aws-sqs';
import { IPipe } from './pipe';

/**
* Source properties.
*
Expand Down Expand Up @@ -100,3 +103,54 @@ export interface ISource {
*/
grantRead(grantee: IRole): void;
}

/**
* Sources that support a dead-letter target.
*/
export abstract class SourceWithDeadLetterTarget implements ISource {
/**
* Determines if the source is an instance of SourceWithDeadLetterTarget.
*/
public static isSourceWithDeadLetterTarget(source: ISource): source is SourceWithDeadLetterTarget {
return (source as SourceWithDeadLetterTarget).deadLetterTarget !== undefined;
}
/**
* The ARN of the source resource.
*/
readonly sourceArn: string;
/**
* The dead-letter SQS queue or SNS topic.
*/
readonly deadLetterTarget?: IQueue | ITopic;

constructor(sourceArn: string, deadLetterTarget?: IQueue | ITopic) {
this.sourceArn = sourceArn;
this.deadLetterTarget = deadLetterTarget;
}

abstract bind(pipe: IPipe): SourceConfig;
abstract grantRead(grantee: IRole): void;

/**
* Grants the pipe role permission to publish to the dead-letter target.
*/
public grantPush(grantee: IRole, deadLetterTarget?: IQueue | ITopic) {
if (deadLetterTarget instanceof Queue) {
deadLetterTarget.grantSendMessages(grantee);
} else if (deadLetterTarget instanceof Topic) {
deadLetterTarget.grantPublish(grantee);
}
}

/**
* Retrieves the ARN from the dead-letter SQS queue or SNS topic.
*/
protected getDeadLetterTargetArn(deadLetterTarget?: IQueue | ITopic): string | undefined {
if (deadLetterTarget instanceof Queue) {
return deadLetterTarget.queueArn;
} else if (deadLetterTarget instanceof Topic) {
return deadLetterTarget.topicArn;
}
return undefined;
}
}
74 changes: 72 additions & 2 deletions packages/@aws-cdk/aws-pipes-alpha/test/pipe.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { App, Stack } from 'aws-cdk-lib';

import { Template } from 'aws-cdk-lib/assertions';
import { Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam';
import { TestEnrichment, TestSource, TestTarget } from './test-classes';
import { Topic } from 'aws-cdk-lib/aws-sns';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { TestEnrichment, TestSource, TestSourceWithDeadLetterTarget, TestTarget } from './test-classes';
import { DesiredState, IEnrichment, ILogDestination, IPipe, ISource, ITarget, IncludeExecutionData, LogDestinationConfig, LogLevel, Pipe } from '../lib';

class TestLogDestination implements ILogDestination {
Expand Down Expand Up @@ -186,6 +187,75 @@ describe('Pipe', () => {
);

});

test('grantPush is called for sources with an SNS topic DLQ', () => {
// WHEN
const topic = new Topic(stack, 'MyTopic');
const sourceWithDeadLetterTarget = new TestSourceWithDeadLetterTarget(topic);

new Pipe(stack, 'TestPipe', {
pipeName: 'TestPipe',
source: sourceWithDeadLetterTarget,
target,
});

const template = Template.fromStack(stack);

// THEN
template.hasResource('AWS::IAM::Policy', {
Properties: {
Roles: [{
Ref: 'TestPipeRole0FD00B2B',
}],
PolicyDocument: {
Statement: [{
Action: 'sns:Publish',
Resource: {
Ref: 'MyTopic86869434',
},
}],
},
},
});
});

test('grantPush is called for sources with an SQS queue DLQ', () => {
// WHEN
const queue = new Queue(stack, 'MyQueue');
const sourceWithDeadLetterTarget = new TestSourceWithDeadLetterTarget(queue);

new Pipe(stack, 'TestPipe', {
pipeName: 'TestPipe',
source: sourceWithDeadLetterTarget,
target,
});

const template = Template.fromStack(stack);

// THEN
template.hasResource('AWS::IAM::Policy', {
Properties: {
Roles: [{
Ref: 'TestPipeRole0FD00B2B',
}],
PolicyDocument: {
Statement: [{
Action: [
'sqs:SendMessage',
'sqs:GetQueueAttributes',
'sqs:GetQueueUrl',
],
Resource: {
'Fn::GetAtt': [
'MyQueueE6CA6235',
'Arn',
],
},
}],
},
},
});
});
});

describe('target', () => {
Expand Down
40 changes: 39 additions & 1 deletion packages/@aws-cdk/aws-pipes-alpha/test/test-classes.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
import { Role } from 'aws-cdk-lib/aws-iam';
import { CfnPipe } from 'aws-cdk-lib/aws-pipes';
import { EnrichmentParametersConfig, IEnrichment, ILogDestination, IPipe, ISource, ITarget, LogDestinationConfig, SourceConfig, SourceParameters } from '../lib';
import { ITopic, Topic } from 'aws-cdk-lib/aws-sns';
import { IQueue, Queue } from 'aws-cdk-lib/aws-sqs';
import {
EnrichmentParametersConfig,
IEnrichment,
ILogDestination,
IPipe,
ISource,
ITarget,
LogDestinationConfig,
SourceConfig,
SourceParameters,
SourceWithDeadLetterTarget,
} from '../lib';

export class TestSource implements ISource {
readonly sourceArn = 'source-arn';
Expand All @@ -19,6 +33,30 @@ export class TestSource implements ISource {
}
}

export class TestSourceWithDeadLetterTarget extends SourceWithDeadLetterTarget {
deadLetterTarget?: IQueue | ITopic;
public grantRead = jest.fn();

constructor(deadLetterTarget: IQueue | ITopic) {
super('source-arn', deadLetterTarget);
this.deadLetterTarget = deadLetterTarget;
}

grantPush(grantee: Role, deadLetterTarget?: IQueue | ITopic) {
if (deadLetterTarget instanceof Queue) {
deadLetterTarget.grantSendMessages(grantee);
} else if (deadLetterTarget instanceof Topic) {
deadLetterTarget.grantPublish(grantee);
}
}

bind(_pipe: IPipe): SourceConfig {
return {
sourceParameters: {},
};
}
}

export class TestTarget implements ITarget {
readonly targetArn: string = 'target-arn';
private targetParameters: CfnPipe.PipeTargetParametersProperty = {};
Expand Down
41 changes: 41 additions & 0 deletions packages/@aws-cdk/aws-pipes-sources-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,44 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
});
```

### Amazon Kinesis

A Kinesis stream can be used as a source for a pipe. The stream will be polled for new messages and the messages will be sent to the pipe.

```ts
declare const sourceStream: kinesis.Stream;
declare const targetQueue: sqs.Queue;

const pipeSource = new sources.KinesisSource(sourceStream, {
startingPosition: sources.KinesisStartingPosition.LATEST,
});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: pipeSource,
target: new SomeTarget(targetQueue)
});
```

### Amazon DynamoDB

A DynamoDB stream can be used as a source for a pipe. The stream will be polled for new messages and the messages will be sent to the pipe.

```ts
const table = new ddb.TableV2(this, 'MyTable', {
partitionKey: {
name: 'id',
type: ddb.AttributeType.STRING,
},
dynamoStream: ddb.StreamViewType.NEW_IMAGE,
});
declare const targetQueue: sqs.Queue;

const pipeSource = new sources.DynamoDBSource(table, {
startingPosition: sources.DynamoDBStartingPosition.LATEST,
});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: pipeSource,
target: new SomeTarget(targetQueue)
});
```
58 changes: 58 additions & 0 deletions packages/@aws-cdk/aws-pipes-sources-alpha/lib/dynamodb.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { IPipe, SourceConfig } from '@aws-cdk/aws-pipes-alpha';
import { ITableV2 } from 'aws-cdk-lib/aws-dynamodb';
import { IRole } from 'aws-cdk-lib/aws-iam';
import { DynamoDBStartingPosition } from './enums';
import { StreamSource, StreamSourceParameters } from './streamSource';

/**
* Parameters for the DynamoDB source.
*/
export interface DynamoDBSourceParameters extends StreamSourceParameters {
/**
* The position in a stream from which to start reading.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-startingposition
*/
readonly startingPosition: DynamoDBStartingPosition;
msambol marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* A source that reads from an DynamoDB stream.
*/
export class DynamoDBSource extends StreamSource {
private readonly table: ITableV2;
private readonly startingPosition: DynamoDBStartingPosition;
private readonly deadLetterTargetArn?: string;

constructor(table: ITableV2, parameters: DynamoDBSourceParameters) {
if (table.tableStreamArn === undefined) {
throw new Error('Table does not have a stream defined, cannot create pipes source');
}

super(table.tableStreamArn, parameters);
this.table = table;
this.startingPosition = parameters.startingPosition;
this.deadLetterTargetArn = this.getDeadLetterTargetArn(this.deadLetterTarget);
}

bind(_pipe: IPipe): SourceConfig {
return {
sourceParameters: {
dynamoDbStreamParameters: {
batchSize: this.sourceParameters.batchSize,
deadLetterConfig: this.deadLetterTargetArn ? { arn: this.deadLetterTargetArn } : undefined,
maximumBatchingWindowInSeconds: this.sourceParameters.maximumBatchingWindow?.toSeconds(),
maximumRecordAgeInSeconds: this.sourceParameters.maximumRecordAge?.toSeconds(),
maximumRetryAttempts: this.sourceParameters.maximumRetryAttempts,
onPartialBatchItemFailure: this.sourceParameters.onPartialBatchItemFailure,
parallelizationFactor: this.sourceParameters.parallelizationFactor,
startingPosition: this.startingPosition,
},
},
};
}

grantRead(grantee: IRole): void {
this.table.grantStreamRead(grantee);
}
}
47 changes: 47 additions & 0 deletions packages/@aws-cdk/aws-pipes-sources-alpha/lib/enums.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Define how to handle item process failures.
*/
export enum OnPartialBatchItemFailure {
GavinZZ marked this conversation as resolved.
Show resolved Hide resolved
/**
* EventBridge halves each batch and will retry each half until all the
* records are processed or there is one failed message left in the batch.
*/
AUTOMATIC_BISECT = 'AUTOMATIC_BISECT',
}

/**
* The position in a Kinesis stream from which to start reading.
*/
export enum KinesisStartingPosition {
/**
* Start streaming at the last untrimmed record in the shard,
* which is the oldest data record in the shard.
*/
TRIM_HORIZON = 'TRIM_HORIZON',
/**
* Start streaming just after the most recent record in the shard,
* so that you always read the most recent data in the shard.
*/
LATEST = 'LATEST',
/**
* Start streaming from the position denoted by the time stamp
* specified in the `startingPositionTimestamp` field.
*/
AT_TIMESTAMP = 'AT_TIMESTAMP',
}

/**
* The position in a DynamoDB stream from which to start reading.
*/
export enum DynamoDBStartingPosition {
/**
* Start reading at the last (untrimmed) stream record,
* which is the oldest record in the shard.
*/
TRIM_HORIZON = 'TRIM_HORIZON',
/**
* Start reading just after the most recent stream record in the shard,
* so that you always read the most recent data in the shard.
*/
LATEST = 'LATEST',
}
4 changes: 4 additions & 0 deletions packages/@aws-cdk/aws-pipes-sources-alpha/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
export * from './dynamodb';
export * from './enums';
export * from './kinesis';
export * from './sqs';
export * from './streamSource';
Loading