Skip to content

Commit

Permalink
feat(pipes-sources): add Kinesis and DynamoDB (#29476)
Browse files Browse the repository at this point in the history
Closes #29378, #29377.

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
msambol authored Sep 26, 2024
1 parent 48c4192 commit 00c2efb
Show file tree
Hide file tree
Showing 37 changed files with 69,362 additions and 4 deletions.
12 changes: 11 additions & 1 deletion packages/@aws-cdk/aws-pipes-alpha/lib/pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Construct } from 'constructs';
import { IEnrichment } from './enrichment';
import { IFilter } from './filter';
import { ILogDestination, IncludeExecutionData, LogLevel } from './logs';
import { ISource } from './source';
import { ISource, SourceWithDeadLetterTarget } from './source';
import { ITarget } from './target';

/**
Expand Down Expand Up @@ -233,6 +233,16 @@ export class Pipe extends PipeBase {
*/
const source = props.source.bind(this);
props.source.grantRead(this.pipeRole);

/**
* An optional dead-letter queue stores any events that are not successfully delivered to
* a target after all retry attempts are exhausted. The IAM role needs permission to write
* events to the dead-letter queue, either an SQS queue or SNS topic.
*/
if (SourceWithDeadLetterTarget.isSourceWithDeadLetterTarget(props.source)) {
props.source.grantPush(this.pipeRole, props.source.deadLetterTarget);
}

// Add the filter criteria to the source parameters
const sourceParameters : CfnPipe.PipeSourceParametersProperty= {
...source.sourceParameters,
Expand Down
54 changes: 54 additions & 0 deletions packages/@aws-cdk/aws-pipes-alpha/lib/source.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { IRole } from 'aws-cdk-lib/aws-iam';
import { CfnPipe } from 'aws-cdk-lib/aws-pipes';
import { ITopic, Topic } from 'aws-cdk-lib/aws-sns';
import { IQueue, Queue } from 'aws-cdk-lib/aws-sqs';
import { IPipe } from './pipe';

/**
* Source properties.
*
Expand Down Expand Up @@ -100,3 +103,54 @@ export interface ISource {
*/
grantRead(grantee: IRole): void;
}

/**
* Sources that support a dead-letter target.
*/
export abstract class SourceWithDeadLetterTarget implements ISource {
/**
* Determines if the source is an instance of SourceWithDeadLetterTarget.
*/
public static isSourceWithDeadLetterTarget(source: ISource): source is SourceWithDeadLetterTarget {
return (source as SourceWithDeadLetterTarget).deadLetterTarget !== undefined;
}
/**
* The ARN of the source resource.
*/
readonly sourceArn: string;
/**
* The dead-letter SQS queue or SNS topic.
*/
readonly deadLetterTarget?: IQueue | ITopic;

constructor(sourceArn: string, deadLetterTarget?: IQueue | ITopic) {
this.sourceArn = sourceArn;
this.deadLetterTarget = deadLetterTarget;
}

abstract bind(pipe: IPipe): SourceConfig;
abstract grantRead(grantee: IRole): void;

/**
* Grants the pipe role permission to publish to the dead-letter target.
*/
public grantPush(grantee: IRole, deadLetterTarget?: IQueue | ITopic) {
if (deadLetterTarget instanceof Queue) {
deadLetterTarget.grantSendMessages(grantee);
} else if (deadLetterTarget instanceof Topic) {
deadLetterTarget.grantPublish(grantee);
}
}

/**
* Retrieves the ARN from the dead-letter SQS queue or SNS topic.
*/
protected getDeadLetterTargetArn(deadLetterTarget?: IQueue | ITopic): string | undefined {
if (deadLetterTarget instanceof Queue) {
return deadLetterTarget.queueArn;
} else if (deadLetterTarget instanceof Topic) {
return deadLetterTarget.topicArn;
}
return undefined;
}
}
74 changes: 72 additions & 2 deletions packages/@aws-cdk/aws-pipes-alpha/test/pipe.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { App, Stack } from 'aws-cdk-lib';

import { Template } from 'aws-cdk-lib/assertions';
import { Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam';
import { TestEnrichment, TestSource, TestTarget } from './test-classes';
import { Topic } from 'aws-cdk-lib/aws-sns';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { TestEnrichment, TestSource, TestSourceWithDeadLetterTarget, TestTarget } from './test-classes';
import { DesiredState, IEnrichment, ILogDestination, IPipe, ISource, ITarget, IncludeExecutionData, LogDestinationConfig, LogLevel, Pipe } from '../lib';

class TestLogDestination implements ILogDestination {
Expand Down Expand Up @@ -186,6 +187,75 @@ describe('Pipe', () => {
);

});

test('grantPush is called for sources with an SNS topic DLQ', () => {
// WHEN
const topic = new Topic(stack, 'MyTopic');
const sourceWithDeadLetterTarget = new TestSourceWithDeadLetterTarget(topic);

new Pipe(stack, 'TestPipe', {
pipeName: 'TestPipe',
source: sourceWithDeadLetterTarget,
target,
});

const template = Template.fromStack(stack);

// THEN
template.hasResource('AWS::IAM::Policy', {
Properties: {
Roles: [{
Ref: 'TestPipeRole0FD00B2B',
}],
PolicyDocument: {
Statement: [{
Action: 'sns:Publish',
Resource: {
Ref: 'MyTopic86869434',
},
}],
},
},
});
});

test('grantPush is called for sources with an SQS queue DLQ', () => {
// WHEN
const queue = new Queue(stack, 'MyQueue');
const sourceWithDeadLetterTarget = new TestSourceWithDeadLetterTarget(queue);

new Pipe(stack, 'TestPipe', {
pipeName: 'TestPipe',
source: sourceWithDeadLetterTarget,
target,
});

const template = Template.fromStack(stack);

// THEN
template.hasResource('AWS::IAM::Policy', {
Properties: {
Roles: [{
Ref: 'TestPipeRole0FD00B2B',
}],
PolicyDocument: {
Statement: [{
Action: [
'sqs:SendMessage',
'sqs:GetQueueAttributes',
'sqs:GetQueueUrl',
],
Resource: {
'Fn::GetAtt': [
'MyQueueE6CA6235',
'Arn',
],
},
}],
},
},
});
});
});

describe('target', () => {
Expand Down
40 changes: 39 additions & 1 deletion packages/@aws-cdk/aws-pipes-alpha/test/test-classes.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
import { Role } from 'aws-cdk-lib/aws-iam';
import { CfnPipe } from 'aws-cdk-lib/aws-pipes';
import { EnrichmentParametersConfig, IEnrichment, ILogDestination, IPipe, ISource, ITarget, LogDestinationConfig, SourceConfig, SourceParameters } from '../lib';
import { ITopic, Topic } from 'aws-cdk-lib/aws-sns';
import { IQueue, Queue } from 'aws-cdk-lib/aws-sqs';
import {
EnrichmentParametersConfig,
IEnrichment,
ILogDestination,
IPipe,
ISource,
ITarget,
LogDestinationConfig,
SourceConfig,
SourceParameters,
SourceWithDeadLetterTarget,
} from '../lib';

export class TestSource implements ISource {
readonly sourceArn = 'source-arn';
Expand All @@ -19,6 +33,30 @@ export class TestSource implements ISource {
}
}

export class TestSourceWithDeadLetterTarget extends SourceWithDeadLetterTarget {
deadLetterTarget?: IQueue | ITopic;
public grantRead = jest.fn();

constructor(deadLetterTarget: IQueue | ITopic) {
super('source-arn', deadLetterTarget);
this.deadLetterTarget = deadLetterTarget;
}

grantPush(grantee: Role, deadLetterTarget?: IQueue | ITopic) {
if (deadLetterTarget instanceof Queue) {
deadLetterTarget.grantSendMessages(grantee);
} else if (deadLetterTarget instanceof Topic) {
deadLetterTarget.grantPublish(grantee);
}
}

bind(_pipe: IPipe): SourceConfig {
return {
sourceParameters: {},
};
}
}

export class TestTarget implements ITarget {
readonly targetArn: string = 'target-arn';
private targetParameters: CfnPipe.PipeTargetParametersProperty = {};
Expand Down
41 changes: 41 additions & 0 deletions packages/@aws-cdk/aws-pipes-sources-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,44 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
});
```

### Amazon Kinesis

A Kinesis stream can be used as a source for a pipe. The stream will be polled for new messages and the messages will be sent to the pipe.

```ts
declare const sourceStream: kinesis.Stream;
declare const targetQueue: sqs.Queue;

const pipeSource = new sources.KinesisSource(sourceStream, {
startingPosition: sources.KinesisStartingPosition.LATEST,
});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: pipeSource,
target: new SomeTarget(targetQueue)
});
```

### Amazon DynamoDB

A DynamoDB stream can be used as a source for a pipe. The stream will be polled for new messages and the messages will be sent to the pipe.

```ts
const table = new ddb.TableV2(this, 'MyTable', {
partitionKey: {
name: 'id',
type: ddb.AttributeType.STRING,
},
dynamoStream: ddb.StreamViewType.NEW_IMAGE,
});
declare const targetQueue: sqs.Queue;

const pipeSource = new sources.DynamoDBSource(table, {
startingPosition: sources.DynamoDBStartingPosition.LATEST,
});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: pipeSource,
target: new SomeTarget(targetQueue)
});
```
58 changes: 58 additions & 0 deletions packages/@aws-cdk/aws-pipes-sources-alpha/lib/dynamodb.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { IPipe, SourceConfig } from '@aws-cdk/aws-pipes-alpha';
import { ITableV2 } from 'aws-cdk-lib/aws-dynamodb';
import { IRole } from 'aws-cdk-lib/aws-iam';
import { DynamoDBStartingPosition } from './enums';
import { StreamSource, StreamSourceParameters } from './streamSource';

/**
* Parameters for the DynamoDB source.
*/
export interface DynamoDBSourceParameters extends StreamSourceParameters {
/**
* The position in a stream from which to start reading.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-startingposition
*/
readonly startingPosition: DynamoDBStartingPosition;
}

/**
* A source that reads from an DynamoDB stream.
*/
export class DynamoDBSource extends StreamSource {
private readonly table: ITableV2;
private readonly startingPosition: DynamoDBStartingPosition;
private readonly deadLetterTargetArn?: string;

constructor(table: ITableV2, parameters: DynamoDBSourceParameters) {
if (table.tableStreamArn === undefined) {
throw new Error('Table does not have a stream defined, cannot create pipes source');
}

super(table.tableStreamArn, parameters);
this.table = table;
this.startingPosition = parameters.startingPosition;
this.deadLetterTargetArn = this.getDeadLetterTargetArn(this.deadLetterTarget);
}

bind(_pipe: IPipe): SourceConfig {
return {
sourceParameters: {
dynamoDbStreamParameters: {
batchSize: this.sourceParameters.batchSize,
deadLetterConfig: this.deadLetterTargetArn ? { arn: this.deadLetterTargetArn } : undefined,
maximumBatchingWindowInSeconds: this.sourceParameters.maximumBatchingWindow?.toSeconds(),
maximumRecordAgeInSeconds: this.sourceParameters.maximumRecordAge?.toSeconds(),
maximumRetryAttempts: this.sourceParameters.maximumRetryAttempts,
onPartialBatchItemFailure: this.sourceParameters.onPartialBatchItemFailure,
parallelizationFactor: this.sourceParameters.parallelizationFactor,
startingPosition: this.startingPosition,
},
},
};
}

grantRead(grantee: IRole): void {
this.table.grantStreamRead(grantee);
}
}
47 changes: 47 additions & 0 deletions packages/@aws-cdk/aws-pipes-sources-alpha/lib/enums.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Define how to handle item process failures.
*/
export enum OnPartialBatchItemFailure {
/**
* EventBridge halves each batch and will retry each half until all the
* records are processed or there is one failed message left in the batch.
*/
AUTOMATIC_BISECT = 'AUTOMATIC_BISECT',
}

/**
* The position in a Kinesis stream from which to start reading.
*/
export enum KinesisStartingPosition {
/**
* Start streaming at the last untrimmed record in the shard,
* which is the oldest data record in the shard.
*/
TRIM_HORIZON = 'TRIM_HORIZON',
/**
* Start streaming just after the most recent record in the shard,
* so that you always read the most recent data in the shard.
*/
LATEST = 'LATEST',
/**
* Start streaming from the position denoted by the time stamp
* specified in the `startingPositionTimestamp` field.
*/
AT_TIMESTAMP = 'AT_TIMESTAMP',
}

/**
* The position in a DynamoDB stream from which to start reading.
*/
export enum DynamoDBStartingPosition {
/**
* Start reading at the last (untrimmed) stream record,
* which is the oldest record in the shard.
*/
TRIM_HORIZON = 'TRIM_HORIZON',
/**
* Start reading just after the most recent stream record in the shard,
* so that you always read the most recent data in the shard.
*/
LATEST = 'LATEST',
}
4 changes: 4 additions & 0 deletions packages/@aws-cdk/aws-pipes-sources-alpha/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
export * from './dynamodb';
export * from './enums';
export * from './kinesis';
export * from './sqs';
export * from './streamSource';
Loading

0 comments on commit 00c2efb

Please sign in to comment.