Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kinesisfirehose-alpha): refactor sourceStream property to support multiple types of sources #31723

Merged
merged 13 commits into from
Oct 17, 2024
12 changes: 6 additions & 6 deletions packages/@aws-cdk/aws-kinesisfirehose-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,23 @@ The above example defines the following resources:

## Sources

There are two main methods of sourcing input data: Kinesis Data Streams and via a "direct
put".
A Kinesis Data Firehose delivery stream can accept data from three main sources: Kinesis Data Streams, Managed Streaming for Apache Kafka (MSK), or via a "direct put" (API calls).

See: [Sending Data to a Delivery Stream](https://docs.aws.amazon.com/firehose/latest/dev/basic-write.html)
in the *Kinesis Data Firehose Developer Guide*.

### Kinesis Data Stream

A delivery stream can read directly from a Kinesis data stream as a consumer of the data
stream. Configure this behaviour by providing a data stream in the `sourceStream`
property when constructing a delivery stream:
stream. Configure this behaviour by passing in a data stream in the `source`
property via the `KinesisStreamSource` class when constructing a delivery stream:

```ts
declare const destination: firehose.IDestination;
const sourceStream = new kinesis.Stream(this, 'Source Stream');

new firehose.DeliveryStream(this, 'Delivery Stream', {
sourceStream: sourceStream,
source: new firehose.KinesisStreamSource(sourceStream),
paulhcsun marked this conversation as resolved.
Show resolved Hide resolved
destination: destination,
});
```
Expand Down Expand Up @@ -444,7 +444,7 @@ necessary permissions for Kinesis Data Firehose to access the resources referenc
delivery stream. One service role is created for the delivery stream that allows Kinesis
Data Firehose to read from a Kinesis data stream (if one is configured as the delivery
stream source) and for server-side encryption. Note that if the DeliveryStream is created
without specifying `sourceStream` or `encryptionKey`, this role is not created as it is not needed.
without specifying a `source` or `encryptionKey`, this role is not created as it is not needed.

Another service role is created for each destination, which gives Kinesis Data Firehose write
access to the destination resource, as well as the ability to invoke data transformers and
Expand Down
23 changes: 10 additions & 13 deletions packages/@aws-cdk/aws-kinesisfirehose-alpha/lib/delivery-stream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import * as cloudwatch from 'aws-cdk-lib/aws-cloudwatch';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as kms from 'aws-cdk-lib/aws-kms';
import * as cdk from 'aws-cdk-lib/core';
import { RegionInfo } from 'aws-cdk-lib/region-info';
Expand All @@ -10,6 +9,7 @@ import { IDestination } from './destination';
import { FirehoseMetrics } from 'aws-cdk-lib/aws-kinesisfirehose/lib/kinesisfirehose-canned-metrics.generated';
import { CfnDeliveryStream } from 'aws-cdk-lib/aws-kinesisfirehose';
import { StreamEncryption } from './encryption';
import { ISource } from './source';

const PUT_RECORD_ACTIONS = [
'firehose:PutRecord',
Expand Down Expand Up @@ -201,7 +201,7 @@ export interface DeliveryStreamProps {
*
* @default - data must be written to the delivery stream via a direct put.
*/
readonly sourceStream?: kinesis.IStream;
readonly source?: ISource;

/**
* The IAM role associated with this delivery stream.
Expand Down Expand Up @@ -322,14 +322,14 @@ export class DeliveryStream extends DeliveryStreamBase {

this._role = props.role;

if (props.encryption?.encryptionKey || props.sourceStream) {
if (props.encryption?.encryptionKey || props.source) {
this._role = this._role ?? new iam.Role(this, 'Service Role', {
assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
});
}

if (
props.sourceStream &&
props.source &&
(props.encryption?.type === StreamEncryptionType.AWS_OWNED || props.encryption?.type === StreamEncryptionType.CUSTOMER_MANAGED)
) {
throw new Error('Requested server-side encryption but delivery stream source is a Kinesis data stream. Specify server-side encryption on the data stream instead.');
Expand All @@ -353,27 +353,24 @@ export class DeliveryStream extends DeliveryStreamBase {
encryptionKey?.grantEncryptDecrypt(this._role);
}

let sourceStreamConfig = undefined;
let readStreamGrant = undefined;
if (this._role && props.sourceStream) {
sourceStreamConfig = {
kinesisStreamArn: props.sourceStream.streamArn,
roleArn: this._role.roleArn,
};
readStreamGrant = props.sourceStream.grantRead(this._role);
if (this._role && props.source) {
readStreamGrant = props.source.grantRead(this._role);
}

const destinationConfig = props.destination.bind(this, {});
const sourceConfig = props.source?._bind(this, this._role?.roleArn);

const resource = new CfnDeliveryStream(this, 'Resource', {
deliveryStreamEncryptionConfigurationInput: encryptionConfig,
deliveryStreamName: props.deliveryStreamName,
deliveryStreamType: props.sourceStream ? 'KinesisStreamAsSource' : 'DirectPut',
kinesisStreamSourceConfiguration: sourceStreamConfig,
deliveryStreamType: props.source ? 'KinesisStreamAsSource' : 'DirectPut',
...sourceConfig,
...destinationConfig,
});

destinationConfig.dependables?.forEach(dependable => resource.node.addDependency(dependable));

if (readStreamGrant) {
resource.node.addDependency(readStreamGrant);
}
Expand Down
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-kinesisfirehose-alpha/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './delivery-stream';
export * from './source';
export * from './destination';
export * from './encryption';
export * from './lambda-function-processor';
Expand Down
78 changes: 78 additions & 0 deletions packages/@aws-cdk/aws-kinesisfirehose-alpha/lib/source.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { Construct } from 'constructs';
import { CfnDeliveryStream } from 'aws-cdk-lib/aws-kinesisfirehose';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';

/**
* A Kinesis Data Firehose delivery stream source configuration.
*/
interface SourceConfig {
/**
* Configuration for using a Kinesis Data Stream as a source for the delivery stream.
*
* This will be returned by the _bind method depending on what type of Source class is specified.
*
* @default - Kinesis Data Stream Source configuration property is not provided.
*/
readonly kinesisStreamSourceConfiguration?: CfnDeliveryStream.KinesisStreamSourceConfigurationProperty;

/**
* Configuration for using an MSK (Managed Streaming for Kafka) cluster as a source for the delivery stream.
*
* This will be returned by the _bind method depending on what type of Source class is specified.
*
* @default - MSK Source configuration property is not provided.
*/
readonly mskSourceConfiguration?: CfnDeliveryStream.MSKSourceConfigurationProperty;
}

/**
* An interface for defining a source that can be used in a Kinesis Data Firehose delivery stream.
*/
export interface ISource {
/**
* Binds this source to the Kinesis Data Firehose delivery stream.
*
* @internal
*/
_bind(scope: Construct, roleArn?: string): SourceConfig;

/**
* Grant read permissions for this source resource and its contents to an IAM
* principal (the delivery stream).
*
* If an encryption key is used, permission to use the key to decrypt the
* contents of the stream will also be granted.
*/
grantRead(grantee: iam.IGrantable): iam.Grant;
}

/**
* A Kinesis Data Firehose delivery stream source.
*/
export class KinesisStreamSource implements ISource {

/**
* Creates a new KinesisStreamSource.
*/
constructor(private readonly stream: kinesis.IStream) {}

grantRead(grantee: iam.IGrantable): iam.Grant {
return this.stream.grantRead(grantee);
}

/**
* Binds the Kinesis stream as a source for the Kinesis Data Firehose delivery stream.
*
* @returns The configuration needed to use this Kinesis stream as the delivery stream source.
* @internal
*/
_bind(_scope: Construct, roleArn: string): SourceConfig {
return {
kinesisStreamSourceConfiguration: {
kinesisStreamArn: this.stream.streamArn,
roleArn: roleArn,
},
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import * as cdk from 'aws-cdk-lib';
import { Construct, Node } from 'constructs';
import * as firehose from '../lib';
import { StreamEncryption } from '../lib';
import * as source from '../lib/source';

describe('delivery stream', () => {
let stack: cdk.Stack;
Expand Down Expand Up @@ -134,7 +135,7 @@ describe('delivery stream', () => {

new firehose.DeliveryStream(stack, 'Delivery Stream', {
destination: mockS3Destination,
sourceStream: sourceStream,
source: new source.KinesisStreamSource(sourceStream),
});

Template.fromStack(stack).hasResourceProperties('AWS::IAM::Role', {
Expand Down Expand Up @@ -180,7 +181,7 @@ describe('delivery stream', () => {

new firehose.DeliveryStream(stack, 'Delivery Stream', {
destination: mockS3Destination,
sourceStream: sourceStream,
source: new source.KinesisStreamSource(sourceStream),
role: deliveryStreamRole,
});

Expand Down Expand Up @@ -318,17 +319,17 @@ describe('delivery stream', () => {
expect(() => new firehose.DeliveryStream(stack, 'Delivery Stream 1', {
destination: mockS3Destination,
encryption: firehose.StreamEncryption.awsOwnedKey(),
sourceStream,
source: new source.KinesisStreamSource(sourceStream),
})).toThrowError('Requested server-side encryption but delivery stream source is a Kinesis data stream. Specify server-side encryption on the data stream instead.');
expect(() => new firehose.DeliveryStream(stack, 'Delivery Stream 2', {
destination: mockS3Destination,
encryption: firehose.StreamEncryption.customerManagedKey(),
sourceStream,
source: new source.KinesisStreamSource(sourceStream),
})).toThrowError('Requested server-side encryption but delivery stream source is a Kinesis data stream. Specify server-side encryption on the data stream instead.');
expect(() => new firehose.DeliveryStream(stack, 'Delivery Stream 3', {
destination: mockS3Destination,
encryption: StreamEncryption.customerManagedKey(new kms.Key(stack, 'Key')),
sourceStream,
source: new source.KinesisStreamSource(sourceStream),
})).toThrowError('Requested server-side encryption but delivery stream source is a Kinesis data stream. Specify server-side encryption on the data stream instead.');
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import * as s3 from 'aws-cdk-lib/aws-s3';
import * as cdk from 'aws-cdk-lib';
import * as constructs from 'constructs';
import * as firehose from '../lib';
import * as source from '../lib/source';

const app = new cdk.App();

Expand Down Expand Up @@ -35,7 +36,7 @@ const sourceStream = new kinesis.Stream(stack, 'Source Stream');

new firehose.DeliveryStream(stack, 'Delivery Stream', {
destination: mockS3Destination,
sourceStream,
source: new source.KinesisStreamSource(sourceStream),
});

new firehose.DeliveryStream(stack, 'Delivery Stream No Source Or Encryption Key', {
Expand Down