Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pipes): add LogDestination implementation #31672

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 28 additions & 102 deletions packages/@aws-cdk/aws-pipes-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@


EventBridge Pipes let you create source to target connections between several
aws services. While transporting messages from a source to a target the messages
AWS services. While transporting messages from a source to a target the messages
can be filtered, transformed and enriched.

![diagram of pipes](https://d1.awsstatic.com/product-marketing/EventBridge/Product-Page-Diagram_Amazon-EventBridge-Pipes.cd7961854be4432d63f6158ffd18271d6c9fa3ec.png)

For more details see the service documentation:

[Documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html)
For more details see the [service documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html).

## Pipe

Expand All @@ -34,12 +32,12 @@ is a fully managed service that enables point-to-point integrations between
event producers and consumers. Pipes can be used to connect several AWS services
to each other, or to connect AWS services to external services.

A Pipe has a source and a target. The source events can be filtered and enriched
A pipe has a source and a target. The source events can be filtered and enriched
before reaching the target.

## Example - pipe usage

> The following code examples use an example implementation of a [source](#source) and [target](#target). In the future there will be separate packages for the sources and targets.
> The following code examples use an example implementation of a [source](#source) and [target](#target).

To define a pipe you need to create a new `Pipe` construct. The `Pipe` construct needs a source and a target.

Expand Down Expand Up @@ -68,39 +66,18 @@ possible:
- [Self managed Apache Kafka stream](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-kafka.html)
- [Amazon SQS queue](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html)

> Currently no implementation exist for any of the supported sources. The following example shows how an implementation can look like. The actual implementation is not part of this package and will be in a separate one.

### Example source implementation
### Example source

```ts fixture=pipes-imports
class SqsSource implements pipes.ISource {
sourceArn: string;
sourceParameters = undefined;

constructor(private readonly queue: sqs.Queue) {
this.queue = queue;
this.sourceArn = queue.queueArn;
}

bind(_pipe: pipes.IPipe): pipes.SourceConfig {
return {
sourceParameters: this.sourceParameters,
};
}

grantRead(pipeRole: cdk.aws_iam.IRole): void {
this.queue.grantConsumeMessages(pipeRole);
}
}
```ts
declare const sourceQueue: sqs.Queue;
const pipeSource = new SqsSource(sourceQueue);
```

A source implementation needs to provide the `sourceArn`, `sourceParameters` and grant the pipe role read access to the source.

## Filter

A Filter can be used to filter the events from the source before they are
A filter can be used to filter the events from the source before they are
forwarded to the enrichment or, if no enrichment is present, target step. Multiple filter expressions are possible.
If one of the filter expressions matches the event is forwarded to the enrichment or target step.
If one of the filter expressions matches, the event is forwarded to the enrichment or target step.

### Example - filter usage

Expand Down Expand Up @@ -130,7 +107,7 @@ This example shows a filter that only forwards events with the `customerType` B2

You can define multiple filter pattern which are combined with a logical `OR`.

Additional filter pattern and details can be found in the EventBridge pipes [docs](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html)
Additional filter pattern and details can be found in the EventBridge pipes [docs](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html).

## Input transformation

Expand Down Expand Up @@ -175,7 +152,7 @@ So when the following batch of input events is processed by the pipe
]
```

it is converted into the following payload.
it is converted into the following payload:

```json
[
Expand All @@ -189,7 +166,7 @@ it is converted into the following payload.
]
```

If the transformation is applied to a target it might be converted to a string representation. E.g. the resulting SQS message body looks like this.
If the transformation is applied to a target it might be converted to a string representation. For example, the resulting SQS message body looks like this:

```json
[
Expand Down Expand Up @@ -237,7 +214,7 @@ So when the following batch of input events is processed by the pipe
]
```

it is converted into the following target payload.
it is converted into the following target payload:

```json
[
Expand Down Expand Up @@ -420,95 +397,44 @@ targets are supported:
The target event can be transformed before it is forwarded to the target using
the same input transformation as in the enrichment step.

### Example target implementation

> Currently no implementation exist for any of the supported targets. The following example shows how an implementation can look like. The actual implementation is not part of this package and will be in a separate one.

```ts fixture=pipes-imports
class SqsTarget implements pipes.ITarget {
targetArn: string;
private inputTransformation: pipes.InputTransformation | undefined;

constructor(private readonly queue: sqs.Queue, props: {inputTransformation?: pipes.InputTransformation} = {}) {
this.queue = queue;
this.targetArn = queue.queueArn;
this.inputTransformation = props?.inputTransformation
}

bind(_pipe: pipes.Pipe): pipes.TargetConfig {
return {
targetParameters: {
inputTemplate: this.inputTransformation?.bind(_pipe).inputTemplate,
},
};
}
### Example target

grantPush(pipeRole: cdk.aws_iam.IRole): void {
this.queue.grantSendMessages(pipeRole);
}
}
```ts
declare const targetQueue: sqs.Queue;
const pipeTarget = new SqsTarget(targetQueue);
```

A target implementation needs to provide the `targetArn`, `enrichmentParameters` and grant the pipe role invoke access to the enrichment.

## Log destination

A pipe can produce log events that are forwarded to different log destinations.
You can configure multiple destinations, but all the destination share the same log level and log data.
For details check the official [documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-logs.html).

The log level and data that is included in the log events is configured on the pipe class itself.
Whereas the actual destination is defined independent.

### Example log destination implementation
The actual destination is defined independently, and there are three options:

> Currently no implementation exist for any of the supported enrichments. The following example shows how an implementation can look like. The actual implementation is not part of this package and will be in a separate one.


```ts fixture=pipes-imports
class CloudwatchDestination implements pipes.ILogDestination {
parameters: pipes.LogDestinationParameters;

constructor(private readonly logGroup: cdk.aws_logs.LogGroup) {
this.logGroup = logGroup;
this.parameters = {
cloudwatchLogsLogDestination: {
logGroupArn: logGroup.logGroupArn,
},
};
}

bind(_pipe: pipes.IPipe): pipes.LogDestinationConfig {
return {
parameters: this.parameters,
};
}

grantPush(pipeRole: cdk.aws_iam.IRole): void {
this.logGroup.grantWrite(pipeRole);
}
}
```
1. `CloudwatchLogsLogDestination`
2. `FirehoseLogDestination`
3. `S3LogDestination`

### Example log destination usage

```ts
declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;
declare const loggroup: logs.LogGroup;
declare const logGroup: logs.LogGroup;

const cwlLogDestination = new pipes.CloudwatchLogsLogDestination(logGroup);

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SqsSource(sourceQueue),
target: new SqsTarget(targetQueue),

logLevel: pipes.LogLevel.TRACE,
logIncludeExecutionData: [pipes.IncludeExecutionData.ALL],

logDestinations: [
new CloudwatchDestination(loggroup),
],
logDestinations: [cwlLogDestination],
});
```

This example uses a cloudwatch loggroup to store the log emitted during a pipe execution. The log level is set to `TRACE` so all steps of the pipe are logged.
This example uses a CloudWatch Logs log group to store the log emitted during a pipe execution.
The log level is set to `TRACE` so all steps of the pipe are logged.
Additionally all execution data is logged as well.
129 changes: 128 additions & 1 deletion packages/@aws-cdk/aws-pipes-alpha/lib/logs.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
// eslint-disable-next-line import/no-extraneous-dependencies
import { IDeliveryStream } from '@aws-cdk/aws-kinesisfirehose-alpha';
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@paulhcsun I saw you working on this module. Can I use this here??

import { IRole } from 'aws-cdk-lib/aws-iam';
import { ILogGroup } from 'aws-cdk-lib/aws-logs';
import { CfnPipe } from 'aws-cdk-lib/aws-pipes';
import { IBucket } from 'aws-cdk-lib/aws-s3';
import { IPipe } from './pipe';

/**
Expand Down Expand Up @@ -36,6 +40,52 @@ export enum LogLevel {
TRACE = 'TRACE',
}

/**
* Log format for `S3LogDestination` logging configuration.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-s3logdestination.html#cfn-pipes-pipe-s3logdestination-outputformat
*/
export enum S3OutputFormat {
/**
* PLAIN
*/
PLAIN = 'plain',
/**
* JSON
*/
JSON = 'json',
/**
* W3C
*/
W3C = 'w3c',
}

/**
* Properties for `S3LogDestination`.
*/
export interface S3LogDestinationProps {
/**
* The name of the S3 bucket to deliver the log records for the pipe.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-s3logdestination.html#cfn-pipes-pipe-s3logdestination-bucketname
*/
readonly bucket: IBucket;
/**
* The format for the log records.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-s3logdestination.html#cfn-pipes-pipe-s3logdestination-outputformat
* @default `S3OutputFormat.PLAIN`
*/
readonly outputFormat?: S3OutputFormat;
/**
* The prefix text with which to begin Amazon S3 log object names.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-s3logdestination.html#cfn-pipes-pipe-s3logdestination-prefix
* @default - no prefix
*/
readonly prefix?: string;
}

/**
* Log destination configuration parameters.
*/
Expand Down Expand Up @@ -82,7 +132,6 @@ export interface LogDestinationConfig {
* Log destination base class.
*/
export interface ILogDestination {

/**
* Bind the log destination to the pipe.
*/
Expand All @@ -93,3 +142,81 @@ export interface ILogDestination {
*/
grantPush(grantee: IRole): void;
}

/**
* CloudWatch Logs log group for delivery of pipe logs.
*/
export class CloudwatchLogsLogDestination implements ILogDestination {
private logGroup: ILogGroup;

constructor(logGroup: ILogGroup) {
this.logGroup = logGroup;
}

bind(_pipe: IPipe): LogDestinationConfig {
return {
parameters: {
cloudwatchLogsLogDestination: {
logGroupArn: this.logGroup.logGroupArn,
},
},
};
}

grantPush(pipeRole: IRole): void {
this.logGroup.grantWrite(pipeRole);
}
}

/**
* Firehose stream for delivery of pipe logs.
*/
export class FirehoseLogDestination implements ILogDestination {
private deliveryStream: IDeliveryStream;

constructor(deliveryStream: IDeliveryStream) {
this.deliveryStream = deliveryStream;
}

bind(_pipe: IPipe): LogDestinationConfig {
return {
parameters: {
firehoseLogDestination: {
deliveryStreamArn: this.deliveryStream.deliveryStreamArn,
},
},
};
}

grantPush(pipeRole: IRole): void {
this.deliveryStream.grantPutRecords(pipeRole);
}
}

/**
* S3 bucket for delivery of pipe logs.
*/
export class S3LogDestination implements ILogDestination {
private parameters: S3LogDestinationProps;

constructor(parameters: S3LogDestinationProps) {
this.parameters = parameters;
}

bind(_pipe: IPipe): LogDestinationConfig {
return {
parameters: {
s3LogDestination: {
bucketName: this.parameters.bucket.bucketName,
bucketOwner: this.parameters.bucket.env.account,
outputFormat: this.parameters.outputFormat,
prefix: this.parameters.prefix,
},
},
};
}

grantPush(pipeRole: IRole): void {
this.parameters.bucket.grantWrite(pipeRole);
}
}
6 changes: 4 additions & 2 deletions packages/@aws-cdk/aws-pipes-alpha/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@
"jest": "^29",
"aws-cdk-lib": "0.0.0",
"constructs": "^10.0.0",
"@aws-cdk/integ-tests-alpha": "0.0.0"
"@aws-cdk/integ-tests-alpha": "0.0.0",
"@aws-cdk/aws-kinesisfirehose-alpha": "0.0.0"
},
"dependencies": {},
"peerDependencies": {
"aws-cdk-lib": "^0.0.0",
"constructs": "^10.0.0"
"constructs": "^10.0.0",
"@aws-cdk/aws-kinesisfirehose-alpha": "0.0.0"
},
"engines": {
"node": ">= 14.15.0"
Expand Down
Loading