Skip to content

Commit

Permalink
feat(pipes-sources): Kinesis source
Browse files Browse the repository at this point in the history
  • Loading branch information
msambol committed Mar 14, 2024
1 parent 135b520 commit d101e30
Show file tree
Hide file tree
Showing 17 changed files with 34,774 additions and 0 deletions.
17 changes: 17 additions & 0 deletions packages/@aws-cdk/aws-pipes-sources-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,20 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
});
```

### Amazon Kinesis

A Kinesis stream can be used as a source for a pipe. The stream will be polled for new messages and the messages will be sent to the pipe.

```ts
declare const sourceStream: kinesis.Stream;
declare const targetQueue: sqs.Queue;

const pipeSource = new sources.KinesisSource(sourceStream, {
startingPosition: StartingPosition.LATEST,
});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: pipeSource,
target: new SomeTarget(targetQueue)
});
```
14 changes: 14 additions & 0 deletions packages/@aws-cdk/aws-pipes-sources-alpha/lib/deadLetterConfig.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/**
* A DeadLetterConfig object that contains information about a dead-letter queue configuration.
*/
export interface DeadLetterConfigParameters {
/**
* The ARN of the specified target for the dead-letter queue,
*
* For Amazon Kinesis stream and Amazon DynamoDB stream sources, specify either an Amazon SNS topic or Amazon SQS queue ARN.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-deadletterconfig.html#cfn-pipes-pipe-deadletterconfig-arn
* @default undefined
*/
readonly arn?: string;
}
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-pipes-sources-alpha/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
export * from './sqs';
export * from './kinesis';
export * from './deadLetterConfig';
146 changes: 146 additions & 0 deletions packages/@aws-cdk/aws-pipes-sources-alpha/lib/kinesis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import { IPipe, ISource, SourceConfig } from '@aws-cdk/aws-pipes-alpha';
import { Duration } from 'aws-cdk-lib';
import { IRole } from 'aws-cdk-lib/aws-iam';
import { IStream } from 'aws-cdk-lib/aws-kinesis';
import { DeadLetterConfigParameters } from './deadLetterConfig';

/**
* Parameters for the Kinesis source.
*/
export interface KinesisSourceParameters {
/**
* The maximum number of records to include in each batch.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-batchsize
* @default undefined
*/
readonly batchSize?: number;

/**
* Define the target queue to send dead-letter queue events to.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-deadletterconfig
* @default undefined
*/
readonly deadLetterConfig?: DeadLetterConfigParameters;

/**
* The maximum length of a time to wait for events.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-maximumbatchingwindowinseconds
* @default undefined
*/
readonly maximumBatchingWindow?: Duration;

/**
* (Streams only) Discard records older than the specified age. The default value is -1, which sets the maximum age to infinite. When the value is set to infinite, EventBridge never discards old records.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-maximumbatchingwindowinseconds
* @default -1
*/
readonly maximumRecordAge?: Duration;

/**
* (Streams only) Discard records after the specified number of retries. The default value is -1, which sets the maximum number of retries to infinite. When MaximumRetryAttempts is infinite, EventBridge retries failed records until the record expires in the event source.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-maximumretryattempts
* @default -1
*/
readonly maximumRetryAttempts?: number;

/**
* (Streams only) Define how to handle item process failures. AUTOMATIC_BISECT halves each batch and retry each half until all the records are processed or there is one failed message left in the batch.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-onpartialbatchitemfailure
* @default -1
*/
readonly onPartialBatchItemFailure?: OnPartialBatchItemFailure;

/**
* (Streams only) The number of batches to process concurrently from each shard. The default value is 1.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-parallelizationfactor
* @default 1
*/
readonly parallelizationFactor?: number;

/**
* (Streams only) The position in a stream from which to start reading.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-startingposition
*/
readonly startingPosition: StartingPosition;

/**
* With StartingPosition set to AT_TIMESTAMP, the time from which to start reading, in Unix time seconds.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-startingpositiontimestamp
* @default undefined
*/
readonly startingPositionTimestamp?: string;
}

/**
* Define how to handle item process failures.
*/
export enum OnPartialBatchItemFailure {
/**
* AUTOMATIC_BISECT
*/
AUTOMATIC_BISECT = 'AUTOMATIC_BISECT',
}

/**
* The position in a stream from which to start reading.
*/
export enum StartingPosition {
/**
* TRIM_HORIZON
*/
TRIM_HORIZON = 'AUTOMATIC_BISECT',
/**
* LATEST
*/
LATEST = 'LATEST',
/**
* AT_TIMESTAMP
*/
AT_TIMESTAMP = 'AT_TIMESTAMP',
}

/**
* A source that reads from Kinesis.
*/
export class KinesisSource implements ISource {
private readonly stream: IStream;
readonly sourceArn;
private sourceParameters;

constructor(stream: IStream, parameters: KinesisSourceParameters) {
this.stream = stream;
this.sourceArn = stream.streamArn;
this.sourceParameters = parameters;
}

bind(_pipe: IPipe): SourceConfig {
return {
sourceParameters: {
kinesisStreamParameters: {
batchSize: this.sourceParameters.batchSize,
deadLetterConfig: this.sourceParameters.deadLetterConfig,
maximumBatchingWindowInSeconds: this.sourceParameters.maximumBatchingWindow?.toSeconds(),
maximumRecordAgeInSeconds: this.sourceParameters.maximumRecordAge?.toSeconds() ?? -1,
maximumRetryAttempts: this.sourceParameters.maximumRetryAttempts ?? -1,
onPartialBatchItemFailure: this.sourceParameters.onPartialBatchItemFailure,
parallelizationFactor: this.sourceParameters.parallelizationFactor ?? 1,
startingPosition: this.sourceParameters.startingPosition,
startingPositionTimestamp: this.sourceParameters.startingPositionTimestamp,
},
},
};
}

grantRead(grantee: IRole): void {
this.stream.grantRead(grantee);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`kinesis should grant pipe role read access 1`] = `
{
"MyPipeRoleCBC8E9AB": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
}
`;

exports[`kinesis should grant pipe role read access 2`] = `
{
"MyPipeRoleDefaultPolicy31387C20": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"kinesis:DescribeStreamSummary",
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:ListShards",
"kinesis:SubscribeToShard",
"kinesis:DescribeStream",
"kinesis:ListStreams",
"kinesis:DescribeStreamConsumer",
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"MyStream5C050E93",
"Arn",
],
},
},
],
"Version": "2012-10-17",
},
"PolicyName": "MyPipeRoleDefaultPolicy31387C20",
"Roles": [
{
"Ref": "MyPipeRoleCBC8E9AB",
},
],
},
"Type": "AWS::IAM::Policy",
},
}
`;
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`kinesis should grant pipe role read access 1`] = `
{
"MyPipeRoleCBC8E9AB": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
}
`;

exports[`kinesis should grant pipe role read access 2`] = `
{
"MyPipeRoleDefaultPolicy31387C20": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"kinesis:DescribeStreamSummary",
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:ListShards",
"kinesis:SubscribeToShard",
"kinesis:DescribeStream",
"kinesis:ListStreams",
"kinesis:DescribeStreamConsumer",
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"MyStream5C050E93",
"Arn",
],
},
},
],
"Version": "2012-10-17",
},
"PolicyName": "MyPipeRoleDefaultPolicy31387C20",
"Roles": [
{
"Ref": "MyPipeRoleCBC8E9AB",
},
],
},
"Type": "AWS::IAM::Policy",
},
}
`;
Loading

0 comments on commit d101e30

Please sign in to comment.