Skip to content

Commit

Permalink
add DynamoDB
Browse files Browse the repository at this point in the history
  • Loading branch information
msambol committed Mar 15, 2024
1 parent 0cac722 commit 5404f66
Show file tree
Hide file tree
Showing 19 changed files with 34,409 additions and 44 deletions.
28 changes: 27 additions & 1 deletion packages/@aws-cdk/aws-pipes-sources-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,33 @@ declare const sourceStream: kinesis.Stream;
declare const targetQueue: sqs.Queue;

const pipeSource = new sources.KinesisSource(sourceStream, {
startingPosition: sources.StartingPosition.LATEST,
startingPosition: sources.KinesisStartingPosition.LATEST,
});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: pipeSource,
target: new SomeTarget(targetQueue)
});
```

### Amazon DynamoDB

A DynamoDB stream can be used as a source for a pipe. The stream will be polled for new messages and the messages will be sent to the pipe.

```ts
import * as ddb from 'aws-cdk-lib/aws-dynamodb';

const table = new ddb.TableV2(stack, 'MyTable', {
partitionKey: {
name: 'id',
type: ddb.AttributeType.STRING,
},
dynamoStream: ddb.StreamViewType.NEW_IMAGE,
});
declare const targetQueue: sqs.Queue;

const pipeSource = new sources.DynamoDBSource(table, {
startingPosition: sources.DynamoDBStartingPosition.LATEST,
});

const pipe = new pipes.Pipe(this, 'Pipe', {
Expand Down
158 changes: 158 additions & 0 deletions packages/@aws-cdk/aws-pipes-sources-alpha/lib/dynamodb.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import { IPipe, ISource, SourceConfig } from '@aws-cdk/aws-pipes-alpha';
import { Duration } from 'aws-cdk-lib';
import { IRole } from 'aws-cdk-lib/aws-iam';
import { ITableV2 } from 'aws-cdk-lib/aws-dynamodb';
import { DeadLetterConfigParameters } from './deadLetterConfig';
import { DynamoDBStartingPosition, OnPartialBatchItemFailure } from './enums';

/**
* Parameters for the DynamoDB source.
*/
export interface DynamoDBSourceParameters {
/**
* The maximum number of records to include in each batch.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-batchsize
* @default 1
*/
readonly batchSize?: number;

/**
* Define the target queue to send dead-letter queue events to.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-deadletterconfig
* @default no dead letter queue
*/
readonly deadLetterConfig?: DeadLetterConfigParameters;

/**
* The maximum length of a time to wait for events.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-maximumbatchingwindowinseconds
* @default no batching window
*/
readonly maximumBatchingWindow?: Duration;

/**
* (Streams only) Discard records older than the specified age. The default value is -1, which sets the maximum age to infinite. When the value is set to infinite, EventBridge never discards old records.
*
* Leave undefined to set the maximum record age to infinite.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-maximumrecordageinseconds
* @default -1 (infinite)
*/
readonly maximumRecordAge?: Duration;

/**
* (Streams only) Discard records after the specified number of retries. The default value is -1, which sets the maximum number of retries to infinite. When MaximumRetryAttempts is infinite, EventBridge retries failed records until the record expires in the event source.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-maximumretryattempts
* @default -1 (infinite)
*/
readonly maximumRetryAttempts?: number;

/**
* (Streams only) Define how to handle item process failures. AUTOMATIC_BISECT halves each batch and retry each half until all the records are processed or there is one failed message left in the batch.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-onpartialbatchitemfailure
* @default off
*/
readonly onPartialBatchItemFailure?: OnPartialBatchItemFailure;

/**
* (Streams only) The number of batches to process concurrently from each shard. The default value is 1.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-parallelizationfactor
* @default 1
*/
readonly parallelizationFactor?: number;

/**
* (Streams only) The position in a stream from which to start reading.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html#cfn-pipes-pipe-pipesourcedynamodbstreamparameters-startingposition
*/
readonly startingPosition: DynamoDBStartingPosition;
}

/**
* A source that reads from an DynamoDB stream.
*/
export class DynamoDBSource implements ISource {
private readonly table: ITableV2;
readonly sourceArn;
private sourceParameters;

private batchSize;
private maximumBatchingWindowInSeconds;
private maximumRecordAgeInSeconds;
private maximumRetryAttempts;
private parallelizationFactor;

constructor(table: ITableV2, parameters: DynamoDBSourceParameters) {
this.table = table;

if (table.tableStreamArn === undefined) {
throw new Error('Table does not have a stream defined, cannot create pipes source');
}

this.sourceArn = table.tableStreamArn;
this.sourceParameters = parameters;

this.batchSize = this.sourceParameters.batchSize;
this.maximumBatchingWindowInSeconds = this.sourceParameters.maximumBatchingWindow?.toSeconds();
this.maximumRecordAgeInSeconds = this.sourceParameters.maximumRecordAge?.toSeconds();
this.maximumRetryAttempts = this.sourceParameters.maximumRetryAttempts;
this.parallelizationFactor = this.sourceParameters.parallelizationFactor;

if (this.batchSize !== undefined) {
if (this.batchSize < 1 || this.batchSize > 10000) {
throw new Error(`Batch size must be between 1 and 10000, received ${this.batchSize}`);
}
}
if (this.maximumBatchingWindowInSeconds !== undefined) {
// only need to check upper bound since Duration amounts cannot be negative
if (this.maximumBatchingWindowInSeconds > 300) {
throw new Error(`Maximum batching window must be between 0 and 300, received ${this.maximumBatchingWindowInSeconds}`);
}
}
if (this.maximumRecordAgeInSeconds !== undefined) {
// only need to check upper bound since Duration amounts cannot be negative
if (this.maximumRecordAgeInSeconds > 604800) {
throw new Error(`Maximum record age in seconds must be between -1 and 604800, received ${this.maximumRecordAgeInSeconds}`);
}
}
if (this.maximumRetryAttempts !== undefined) {
if (this.maximumRetryAttempts < -1 || this.maximumRetryAttempts > 10000) {
throw new Error(`Maximum retry attempts must be between -1 and 10000, received ${this.maximumRetryAttempts}`);
}
}
if (this.parallelizationFactor !== undefined) {
if (this.parallelizationFactor < 1 || this.parallelizationFactor > 10) {
throw new Error(`Parallelization factor must be between 1 and 10, received ${this.parallelizationFactor}`);
}
}
}

bind(_pipe: IPipe): SourceConfig {
return {
sourceParameters: {
dynamoDbStreamParameters: {
batchSize: this.batchSize,
deadLetterConfig: this.sourceParameters.deadLetterConfig,
maximumBatchingWindowInSeconds: this.maximumBatchingWindowInSeconds,
maximumRecordAgeInSeconds: this.maximumRecordAgeInSeconds,
maximumRetryAttempts: this.maximumRetryAttempts,
onPartialBatchItemFailure: this.sourceParameters.onPartialBatchItemFailure,
parallelizationFactor: this.sourceParameters.parallelizationFactor,
startingPosition: this.sourceParameters.startingPosition,
},
},
};
}

grantRead(grantee: IRole): void {
this.table.grantStreamRead(grantee);
}
}

41 changes: 41 additions & 0 deletions packages/@aws-cdk/aws-pipes-sources-alpha/lib/enums.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Define how to handle item process failures.
*/
export enum OnPartialBatchItemFailure {
/**
* AUTOMATIC_BISECT
*/
AUTOMATIC_BISECT = 'AUTOMATIC_BISECT',
}

/**
* The position in a Kinesis stream from which to start reading.
*/
export enum KinesisStartingPosition {
/**
* TRIM_HORIZON
*/
TRIM_HORIZON = 'TRIM_HORIZON',
/**
* LATEST
*/
LATEST = 'LATEST',
/**
* AT_TIMESTAMP
*/
AT_TIMESTAMP = 'AT_TIMESTAMP',
}

/**
* The position in a DynamoDB stream from which to start reading.
*/
export enum DynamoDBStartingPosition {
/**
* TRIM_HORIZON
*/
TRIM_HORIZON = 'TRIM_HORIZON',
/**
* LATEST
*/
LATEST = 'LATEST',
}
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-pipes-sources-alpha/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export * from './sqs';
export * from './kinesis';
export * from './dynamodb';
export * from './enums';
export * from './deadLetterConfig';
33 changes: 4 additions & 29 deletions packages/@aws-cdk/aws-pipes-sources-alpha/lib/kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Duration } from 'aws-cdk-lib';
import { IRole } from 'aws-cdk-lib/aws-iam';
import { IStream } from 'aws-cdk-lib/aws-kinesis';
import { DeadLetterConfigParameters } from './deadLetterConfig';
import { KinesisStartingPosition, OnPartialBatchItemFailure } from './enums';

/**
* Parameters for the Kinesis source.
Expand Down Expand Up @@ -71,7 +72,7 @@ export interface KinesisSourceParameters {
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-startingposition
*/
readonly startingPosition: StartingPosition;
readonly startingPosition: KinesisStartingPosition;

/**
* With StartingPosition set to AT_TIMESTAMP, the time from which to start reading, in Unix time seconds.
Expand All @@ -82,41 +83,14 @@ export interface KinesisSourceParameters {
readonly startingPositionTimestamp?: string;
}

/**
* Define how to handle item process failures.
*/
export enum OnPartialBatchItemFailure {
/**
* AUTOMATIC_BISECT
*/
AUTOMATIC_BISECT = 'AUTOMATIC_BISECT',
}

/**
* The position in a stream from which to start reading.
*/
export enum StartingPosition {
/**
* TRIM_HORIZON
*/
TRIM_HORIZON = 'TRIM_HORIZON',
/**
* LATEST
*/
LATEST = 'LATEST',
/**
* AT_TIMESTAMP
*/
AT_TIMESTAMP = 'AT_TIMESTAMP',
}

/**
* A source that reads from Kinesis.
*/
export class KinesisSource implements ISource {
private readonly stream: IStream;
readonly sourceArn;
private sourceParameters;

private batchSize;
private maximumBatchingWindowInSeconds;
private maximumRecordAgeInSeconds;
Expand All @@ -127,6 +101,7 @@ export class KinesisSource implements ISource {
this.stream = stream;
this.sourceArn = stream.streamArn;
this.sourceParameters = parameters;

this.batchSize = this.sourceParameters.batchSize;
this.maximumBatchingWindowInSeconds = this.sourceParameters.maximumBatchingWindow?.toSeconds();
this.maximumRecordAgeInSeconds = this.sourceParameters.maximumRecordAge?.toSeconds();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`dynamodb source should grant pipe role read access 1`] = `
{
"MyPipeRoleCBC8E9AB": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
}
`;

exports[`dynamodb source should grant pipe role read access 2`] = `
{
"MyPipeRoleDefaultPolicy31387C20": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "dynamodb:ListStreams",
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"MyTable794EDED1",
"StreamArn",
],
},
},
{
"Action": [
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"MyTable794EDED1",
"StreamArn",
],
},
},
],
"Version": "2012-10-17",
},
"PolicyName": "MyPipeRoleDefaultPolicy31387C20",
"Roles": [
{
"Ref": "MyPipeRoleCBC8E9AB",
},
],
},
"Type": "AWS::IAM::Policy",
},
}
`;
Loading

0 comments on commit 5404f66

Please sign in to comment.