-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
event-source-mapping.ts
459 lines (397 loc) · 17.2 KB
/
event-source-mapping.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
import { Construct } from 'constructs';
import { IEventSourceDlq } from './dlq';
import { IFunction } from './function-base';
import { CfnEventSourceMapping } from './lambda.generated';
import * as cdk from '../../core';
/**
 * The type of authentication protocol or the VPC components for your event source's SourceAccessConfiguration
 * @see https://docs.aws.amazon.com/lambda/latest/dg/API_SourceAccessConfiguration.html#SSS-Type-SourceAccessConfiguration-Type
 */
export class SourceAccessConfigurationType {
  /**
   * (MQ) The Secrets Manager secret that stores your broker credentials.
   */
  public static readonly BASIC_AUTH = new SourceAccessConfigurationType('BASIC_AUTH');

  /**
   * The subnets associated with your VPC. Lambda connects to these subnets to fetch data from your Self-Managed Apache Kafka cluster.
   */
  public static readonly VPC_SUBNET = new SourceAccessConfigurationType('VPC_SUBNET');

  /**
   * The VPC security group used to manage access to your Self-Managed Apache Kafka brokers.
   */
  public static readonly VPC_SECURITY_GROUP = new SourceAccessConfigurationType('VPC_SECURITY_GROUP');

  /**
   * The Secrets Manager ARN of your secret key used for SASL SCRAM-256 authentication of your Self-Managed Apache Kafka brokers.
   */
  public static readonly SASL_SCRAM_256_AUTH = new SourceAccessConfigurationType('SASL_SCRAM_256_AUTH');

  /**
   * The Secrets Manager ARN of your secret key used for SASL SCRAM-512 authentication of your Self-Managed Apache Kafka brokers.
   */
  public static readonly SASL_SCRAM_512_AUTH = new SourceAccessConfigurationType('SASL_SCRAM_512_AUTH');

  /**
   * The Secrets Manager ARN of your secret key containing the certificate chain (X.509 PEM), private key (PKCS#8 PEM),
   * and private key password (optional) used for mutual TLS authentication of your MSK/Apache Kafka brokers.
   */
  public static readonly CLIENT_CERTIFICATE_TLS_AUTH = new SourceAccessConfigurationType('CLIENT_CERTIFICATE_TLS_AUTH');

  /**
   * The Secrets Manager ARN of your secret key containing the root CA certificate (X.509 PEM) used for TLS encryption of your Apache Kafka brokers.
   */
  public static readonly SERVER_ROOT_CA_CERTIFICATE = new SourceAccessConfigurationType('SERVER_ROOT_CA_CERTIFICATE');

  /**
   * A custom source access configuration property.
   *
   * Escape hatch for configuration types that this enum-like class does not
   * (yet) model; `name` is passed through verbatim to CloudFormation.
   */
  public static of(name: string): SourceAccessConfigurationType {
    return new SourceAccessConfigurationType(name);
  }

  /**
   * The key to use in `SourceAccessConfigurationProperty.Type` property in CloudFormation
   * @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-sourceaccessconfiguration.html#cfn-lambda-eventsourcemapping-sourceaccessconfiguration-type
   */
  public readonly type: string;

  // Private so the set of well-known values stays closed; use `of()` for custom types.
  private constructor(type: string) {
    this.type = type;
  }
}
/**
 * Specific settings like the authentication protocol or the VPC components to secure access to your event source.
 */
export interface SourceAccessConfiguration {
  /**
   * The type of authentication protocol or the VPC components for your event source. For example: "SASL_SCRAM_512_AUTH".
   */
  readonly type: SourceAccessConfigurationType,
  /**
   * The value for your chosen configuration in type.
   * For example: "URI": "arn:aws:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName".
   * The exact string depends on the type.
   * @see SourceAccessConfigurationType
   */
  readonly uri: string
}
export interface EventSourceMappingOptions {
  /**
   * The Amazon Resource Name (ARN) of the event source. Any record added to
   * this stream can invoke the Lambda function.
   *
   * @default - not set if using a self managed Kafka cluster, throws an error otherwise
   */
  readonly eventSourceArn?: string;
  /**
   * The largest number of records that AWS Lambda will retrieve from your event
   * source at the time of invoking your function. Your function receives an
   * event with all the retrieved records.
   *
   * Valid Range: Minimum value of 1. Maximum value of 10000.
   *
   * @default - Amazon Kinesis, Amazon DynamoDB, and Amazon MSK is 100 records.
   * The default for Amazon SQS is 10 messages. For standard SQS queues, the maximum is 10,000. For FIFO SQS queues, the maximum is 10.
   */
  readonly batchSize?: number;
  /**
   * If the function returns an error, split the batch in two and retry.
   *
   * @default false
   */
  readonly bisectBatchOnError?: boolean;
  /**
   * An Amazon SQS queue or Amazon SNS topic destination for discarded records.
   *
   * @default discarded records are ignored
   */
  readonly onFailure?: IEventSourceDlq;
  /**
   * Set to false to disable the event source upon creation.
   *
   * @default true
   */
  readonly enabled?: boolean;
  /**
   * The position in the DynamoDB, Kinesis or MSK stream where AWS Lambda should
   * start reading.
   *
   * @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType
   *
   * @default - no starting position
   */
  readonly startingPosition?: StartingPosition;
  /**
   * The time from which to start reading, in Unix time seconds.
   * Only usable together with `startingPosition: AT_TIMESTAMP`.
   *
   * @default - no timestamp
   */
  readonly startingPositionTimestamp?: number;
  /**
   * Allow functions to return partially successful responses for a batch of records.
   *
   * @see https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting
   *
   * @default false
   */
  readonly reportBatchItemFailures?: boolean;
  /**
   * The maximum amount of time to gather records before invoking the function.
   * Maximum of Duration.minutes(5)
   *
   * @default Duration.seconds(0)
   */
  readonly maxBatchingWindow?: cdk.Duration;
  /**
   * The maximum concurrency setting limits the number of concurrent instances of the function that an Amazon SQS event source can invoke.
   *
   * @see https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#events-sqs-max-concurrency
   *
   * Valid Range: Minimum value of 2. Maximum value of 1000.
   *
   * @default - No specific limit.
   */
  readonly maxConcurrency?: number;
  /**
   * The maximum age of a record that Lambda sends to a function for processing.
   * Valid Range:
   * * Minimum value of 60 seconds
   * * Maximum value of 7 days
   *
   * @default - infinite or until the record expires.
   */
  readonly maxRecordAge?: cdk.Duration;
  /**
   * The maximum number of times to retry when the function returns an error.
   * Set to `undefined` if you want lambda to keep retrying infinitely or until
   * the record expires.
   *
   * Valid Range:
   * * Minimum value of 0
   * * Maximum value of 10000
   *
   * @default - infinite or until the record expires.
   */
  readonly retryAttempts?: number;
  /**
   * The number of batches to process from each shard concurrently.
   * Valid Range:
   * * Minimum value of 1
   * * Maximum value of 10
   *
   * @default 1
   */
  readonly parallelizationFactor?: number;
  /**
   * The name of the Kafka topic.
   *
   * @default - no topic
   */
  readonly kafkaTopic?: string;
  /**
   * The size of the tumbling windows to group records sent to DynamoDB or Kinesis
   *
   * @see https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows
   *
   * Valid Range: 0 - 15 minutes
   *
   * @default - None
   */
  readonly tumblingWindow?: cdk.Duration;
  /**
   * A list of host and port pairs that are the addresses of the Kafka brokers in a self managed "bootstrap" Kafka cluster
   * that a Kafka client connects to initially to bootstrap itself.
   * They are in the format `abc.example.com:9096`.
   *
   * @default - none
   */
  readonly kafkaBootstrapServers?: string[]
  /**
   * The identifier for the Kafka consumer group to join. The consumer group ID must be unique among all your Kafka event sources.
   * After creating a Kafka event source mapping with the consumer group ID specified, you cannot update this value.
   * The value must have a length between 1 and 200 and match the pattern '[a-zA-Z0-9-\/*:_+=.@-]*'.
   * For more information, see [Customizable consumer group ID](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html#services-msk-consumer-group-id).
   * @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-amazonmanagedkafkaeventsourceconfig.html
   * @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-selfmanagedkafkaeventsourceconfig.html
   *
   * @default - none
   */
  readonly kafkaConsumerGroupId?: string
  /**
   * Specific settings like the authentication protocol or the VPC components to secure access to your event source.
   * @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-sourceaccessconfiguration.html
   *
   * @default - none
   */
  readonly sourceAccessConfigurations?: SourceAccessConfiguration[]
  /**
   * Add filter criteria to Event Source
   * @see https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html
   *
   * @default - none
   */
  readonly filters?: Array<{[key: string]: any}>
}
/**
 * Properties for declaring a new event source mapping.
 */
export interface EventSourceMappingProps extends EventSourceMappingOptions {
  /**
   * The target AWS Lambda function.
   */
  readonly target: IFunction;
}
/**
 * Represents an event source mapping for a lambda function.
 * @see https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html
 */
export interface IEventSourceMapping extends cdk.IResource {
  /**
   * The identifier for this EventSourceMapping
   * @attribute
   */
  readonly eventSourceMappingId: string;
  /**
   * The ARN of the event source mapping (i.e. arn:aws:lambda:region:account-id:event-source-mapping/event-source-mapping-id)
   */
  readonly eventSourceMappingArn: string;
}
/**
 * Defines a Lambda EventSourceMapping resource.
 *
 * Usually, you won't need to define the mapping yourself. This will usually be done by
 * event sources. For example, to add an SQS event source to a function:
 *
 * import { SqsEventSource } from '@aws-cdk/aws-lambda-event-sources';
 * lambda.addEventSource(new SqsEventSource(sqs));
 *
 * The `SqsEventSource` class will automatically create the mapping, and will also
 * modify the Lambda's execution role so it can consume messages from the queue.
 */
export class EventSourceMapping extends cdk.Resource implements IEventSourceMapping {
  /**
   * Import an event source into this stack from its event source id.
   */
  public static fromEventSourceMappingId(scope: Construct, id: string, eventSourceMappingId: string): IEventSourceMapping {
    const eventSourceMappingArn = EventSourceMapping.formatArn(scope,
      eventSourceMappingId,
    );
    class Import extends cdk.Resource implements IEventSourceMapping {
      public readonly eventSourceMappingId = eventSourceMappingId;
      public readonly eventSourceMappingArn = eventSourceMappingArn;
    }
    return new Import(scope, id);
  }

  // Builds the event source mapping ARN from its id, scoped to the stack's
  // partition/region/account. Lambda ARNs use the colon-resource-name format.
  private static formatArn(scope: Construct, eventSourceMappingId: string): string {
    return cdk.Stack.of(scope).formatArn({
      service: 'lambda',
      resource: 'event-source-mapping',
      resourceName: eventSourceMappingId,
      arnFormat: cdk.ArnFormat.COLON_RESOURCE_NAME,
    });
  }

  public readonly eventSourceMappingId: string;
  public readonly eventSourceMappingArn: string;

  constructor(scope: Construct, id: string, props: EventSourceMappingProps) {
    super(scope, id);

    // Exactly one of eventSourceArn (stream/queue/MSK sources) or
    // kafkaBootstrapServers (self-managed Kafka) must be provided.
    if (props.eventSourceArn == undefined && props.kafkaBootstrapServers == undefined) {
      throw new Error('Either eventSourceArn or kafkaBootstrapServers must be set');
    }
    if (props.eventSourceArn !== undefined && props.kafkaBootstrapServers !== undefined) {
      throw new Error('eventSourceArn and kafkaBootstrapServers are mutually exclusive');
    }
    if (props.kafkaBootstrapServers && (props.kafkaBootstrapServers.length < 1)) {
      throw new Error('kafkaBootStrapServers must not be empty if set');
    }
    if (props.maxBatchingWindow && props.maxBatchingWindow.toSeconds() > 300) {
      throw new Error(`maxBatchingWindow cannot be over 300 seconds, got ${props.maxBatchingWindow.toSeconds()}`);
    }
    // Skip range validation for unresolved tokens (e.g. CfnParameter values):
    // their numeric value is only known at deploy time. This mirrors the
    // Token/withResolved guards used for the other numeric validations below.
    if (props.maxConcurrency && !cdk.Token.isUnresolved(props.maxConcurrency) && (props.maxConcurrency < 2 || props.maxConcurrency > 1000)) {
      throw new Error('maxConcurrency must be between 2 and 1000 concurrent instances');
    }
    if (props.maxRecordAge && (props.maxRecordAge.toSeconds() < 60 || props.maxRecordAge.toDays({ integral: false }) > 7)) {
      throw new Error('maxRecordAge must be between 60 seconds and 7 days inclusive');
    }
    // withResolved defers validation of lazy/token values until resolution.
    if (props.retryAttempts !== undefined) {
      cdk.withResolved(props.retryAttempts, (attempts) => {
        if (attempts < 0 || attempts > 10000) {
          throw new Error(`retryAttempts must be between 0 and 10000 inclusive, got ${attempts}`);
        }
      });
    }
    if (props.parallelizationFactor !== undefined) {
      cdk.withResolved(props.parallelizationFactor, (factor) => {
        if (factor < 1 || factor > 10) {
          throw new Error(`parallelizationFactor must be between 1 and 10 inclusive, got ${factor}`);
        }
      });
    }
    if (props.tumblingWindow && !cdk.Token.isUnresolved(props.tumblingWindow) && props.tumblingWindow.toSeconds() > 900) {
      throw new Error(`tumblingWindow cannot be over 900 seconds, got ${props.tumblingWindow.toSeconds()}`);
    }
    // startingPositionTimestamp and AT_TIMESTAMP only make sense together.
    if (props.startingPosition === StartingPosition.AT_TIMESTAMP && !props.startingPositionTimestamp) {
      throw new Error('startingPositionTimestamp must be provided when startingPosition is AT_TIMESTAMP');
    }
    if (props.startingPosition !== StartingPosition.AT_TIMESTAMP && props.startingPositionTimestamp) {
      throw new Error('startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP');
    }
    if (props.kafkaConsumerGroupId) {
      this.validateKafkaConsumerGroupIdOrThrow(props.kafkaConsumerGroupId);
    }

    let destinationConfig;
    if (props.onFailure) {
      destinationConfig = {
        onFailure: props.onFailure.bind(this, props.target),
      };
    }

    let selfManagedEventSource;
    if (props.kafkaBootstrapServers) {
      selfManagedEventSource = { endpoints: { kafkaBootstrapServers: props.kafkaBootstrapServers } };
    }

    // The same consumer group config is routed to the self-managed or the
    // Amazon-managed (MSK) config block, depending on the source kind.
    let consumerGroupConfig = props.kafkaConsumerGroupId ? { consumerGroupId: props.kafkaConsumerGroupId } : undefined;

    const cfnEventSourceMapping = new CfnEventSourceMapping(this, 'Resource', {
      batchSize: props.batchSize,
      bisectBatchOnFunctionError: props.bisectBatchOnError,
      destinationConfig,
      enabled: props.enabled,
      eventSourceArn: props.eventSourceArn,
      functionName: props.target.functionName,
      startingPosition: props.startingPosition,
      startingPositionTimestamp: props.startingPositionTimestamp,
      functionResponseTypes: props.reportBatchItemFailures ? ['ReportBatchItemFailures'] : undefined,
      maximumBatchingWindowInSeconds: props.maxBatchingWindow?.toSeconds(),
      maximumRecordAgeInSeconds: props.maxRecordAge?.toSeconds(),
      maximumRetryAttempts: props.retryAttempts,
      parallelizationFactor: props.parallelizationFactor,
      topics: props.kafkaTopic !== undefined ? [props.kafkaTopic] : undefined,
      tumblingWindowInSeconds: props.tumblingWindow?.toSeconds(),
      scalingConfig: props.maxConcurrency ? { maximumConcurrency: props.maxConcurrency } : undefined,
      sourceAccessConfigurations: props.sourceAccessConfigurations?.map((o) => {return { type: o.type.type, uri: o.uri };}),
      selfManagedEventSource,
      filterCriteria: props.filters ? { filters: props.filters }: undefined,
      selfManagedKafkaEventSourceConfig: props.kafkaBootstrapServers ? consumerGroupConfig : undefined,
      amazonManagedKafkaEventSourceConfig: props.eventSourceArn ? consumerGroupConfig : undefined,
    });

    this.eventSourceMappingId = cfnEventSourceMapping.ref;
    this.eventSourceMappingArn = EventSourceMapping.formatArn(this, this.eventSourceMappingId);
  }

  // Validates the consumer group id length (1-200) and character set required
  // by the Lambda API. Unresolved tokens are skipped — their value is only
  // known at deploy time.
  private validateKafkaConsumerGroupIdOrThrow(kafkaConsumerGroupId: string) {
    if (cdk.Token.isUnresolved(kafkaConsumerGroupId)) {
      return;
    }

    if (kafkaConsumerGroupId.length > 200 || kafkaConsumerGroupId.length < 1) {
      throw new Error('kafkaConsumerGroupId must be a valid string between 1 and 200 characters');
    }

    // The regex is unanchored, so compare the first match against the whole
    // input to ensure every character is in the allowed set.
    const regex = new RegExp(/[a-zA-Z0-9-\/*:_+=.@-]*/);
    const patternMatch = regex.exec(kafkaConsumerGroupId);
    if (patternMatch === null || patternMatch[0] !== kafkaConsumerGroupId) {
      throw new Error('kafkaConsumerGroupId contains invalid characters. Allowed values are "[a-zA-Z0-9-\/*:_+=.@-]"');
    }
  }
}
/**
 * The position in the DynamoDB, Kinesis or MSK stream where AWS Lambda should start
 * reading.
 */
export enum StartingPosition {
  /**
   * Begin at the oldest available (last untrimmed) record in the shard —
   * i.e. process all retained data from the start.
   */
  TRIM_HORIZON = 'TRIM_HORIZON',

  /**
   * Begin just past the newest record in the shard, so only records added
   * after the mapping is created are processed.
   */
  LATEST = 'LATEST',

  /**
   * Begin at a caller-supplied point in time.
   * Supported only for Amazon Kinesis streams; other sources will error.
   * Requires `startingPositionTimestamp` to be set as well.
   */
  AT_TIMESTAMP = 'AT_TIMESTAMP',
}