diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/README.md b/packages/@aws-cdk/aws-stepfunctions-tasks/README.md index 063d058aab8c9..521052e865b0b 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/README.md +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/README.md @@ -644,6 +644,18 @@ new tasks.EmrCreateCluster(this, 'Create Cluster', { }); ``` +If you want to run multiple steps in [parallel](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-concurrent-steps.html), +you can specify the `stepConcurrencyLevel` property. The concurrency range is between 1 +and 256 inclusive, where the default concurrency of 1 means no step concurrency is allowed. +`stepConcurrencyLevel` requires the EMR release label to be 5.28.0 or above. + +```ts +new tasks.EmrCreateCluster(this, 'Create Cluster', { + // ... + stepConcurrencyLevel: 10, +}); +``` + ### Termination Protection Locks a cluster (job flow) so the EC2 instances in the cluster cannot be diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/emr/emr-create-cluster.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/emr/emr-create-cluster.ts index bfb3d94e28b5c..4cac7c7180bde 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/lib/emr/emr-create-cluster.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/lib/emr/emr-create-cluster.ts @@ -130,6 +130,16 @@ export interface EmrCreateClusterProps extends sfn.TaskStateBaseProps { */ readonly securityConfiguration?: string; + /** + * Specifies the step concurrency level to allow multiple steps to run in parallel + * + * Requires EMR release label 5.28.0 or above. + * Must be in range [1, 256]. + * + * @default 1 - no step concurrency allowed + */ + readonly stepConcurrencyLevel?: number; + /** * A list of tags to associate with a cluster and propagate to Amazon EC2 instances. * @@ -191,6 +201,22 @@ export class EmrCreateCluster extends sfn.TaskStateBase { } this.taskPolicies = this.createPolicyStatements(this._serviceRole, this._clusterRole, this._autoScalingRole); + + if (this.props.releaseLabel !== undefined) { + this.validateReleaseLabel(this.props.releaseLabel); + } + + if (this.props.stepConcurrencyLevel !== undefined) { + if (this.props.stepConcurrencyLevel < 1 || this.props.stepConcurrencyLevel > 256) { + throw new Error(`Step concurrency level must be in range [1, 256], but got ${this.props.stepConcurrencyLevel}.`); + } + if (this.props.releaseLabel && this.props.stepConcurrencyLevel !== 1) { + const [major, minor] = this.props.releaseLabel.substr(4).split('.'); + if (Number(major) < 5 || (Number(major) === 5 && Number(minor) < 28)) { + throw new Error(`Step concurrency is only supported in EMR release version 5.28.0 and above but got ${this.props.releaseLabel}.`); + } + } + } } /** @@ -252,6 +278,7 @@ export class EmrCreateCluster extends sfn.TaskStateBase { ReleaseLabel: cdk.stringToCloudFormation(this.props.releaseLabel), ScaleDownBehavior: cdk.stringToCloudFormation(this.props.scaleDownBehavior?.valueOf()), SecurityConfiguration: cdk.stringToCloudFormation(this.props.securityConfiguration), + StepConcurrencyLevel: cdk.numberToCloudFormation(this.props.stepConcurrencyLevel), ...(this.props.tags ? this.renderTags(this.props.tags) : undefined), VisibleToAllUsers: cdk.booleanToCloudFormation(this.visibleToAllUsers), }), @@ -356,6 +383,25 @@ export class EmrCreateCluster extends sfn.TaskStateBase { return role; } + + /** + * Validates the release label string is in proper format. + * Release labels are in the form `emr-x.x.x`. For example, `emr-5.33.0`. + * + * @see https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-components.html + */ + private validateReleaseLabel(releaseLabel: string): string { + const prefix = releaseLabel.substr(0, 4); + const versions = releaseLabel.substr(4).split('.'); + if (prefix !== 'emr-' || versions.length !== 3 || versions.some((e) => isNotANumber(e))) { + throw new Error(`The release label must be in the format 'emr-x.x.x' but got ${releaseLabel}`); + } + return releaseLabel; + + function isNotANumber(value: string): boolean { + return value === '' || isNaN(Number(value)); + } + } } export namespace EmrCreateCluster { diff --git a/packages/@aws-cdk/aws-stepfunctions-tasks/test/emr/emr-create-cluster.test.ts b/packages/@aws-cdk/aws-stepfunctions-tasks/test/emr/emr-create-cluster.test.ts index 273a580a6a38e..3effd99e5eed0 100644 --- a/packages/@aws-cdk/aws-stepfunctions-tasks/test/emr/emr-create-cluster.test.ts +++ b/packages/@aws-cdk/aws-stepfunctions-tasks/test/emr/emr-create-cluster.test.ts @@ -172,6 +172,112 @@ test('Create Cluster with clusterConfiguration Name from payload', () => { }); }); +describe('Cluster with StepConcurrencyLevel', () => { + test('can be specified', async () => { + // WHEN + const task = new EmrCreateCluster(stack, 'Task', { + instances: {}, + clusterRole, + name: 'Cluster', + serviceRole, + autoScalingRole, + stepConcurrencyLevel: 2, + integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE, + }); + + // THEN + expect(stack.resolve(task.toStateJson())).toMatchObject({ + Parameters: { + Name: 'Cluster', + StepConcurrencyLevel: 2, + }, + }); + }); + + test('throws if < 1 or > 256', async () => { + expect(() => new EmrCreateCluster(stack, 'Task1', { + instances: {}, + clusterRole, + name: 'Cluster', + serviceRole, + autoScalingRole, + stepConcurrencyLevel: 0, + integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE, + })).toThrow('Step concurrency level must be in range [1, 256], but got 0.'); + + expect(() => new EmrCreateCluster(stack, 'Task2', { + instances: {}, + clusterRole, + name: 'Cluster', + serviceRole, + autoScalingRole, + stepConcurrencyLevel: 257, + integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE, + })).toThrow('Step concurrency level must be in range [1, 256], but got 257.'); + }); + + test('throws if EMR release label below 5.28 and StepConcurrencyLevel !== 1', async () => { + expect(() => new EmrCreateCluster(stack, 'Task2', { + instances: {}, + clusterRole, + name: 'Cluster', + serviceRole, + autoScalingRole, + releaseLabel: 'emr-5.14.0', + stepConcurrencyLevel: 2, + integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE, + })).toThrow('Step concurrency is only supported in EMR release version 5.28.0 and above but got emr-5.14.0.'); + }); + + test('does not throw if EMR release label below 5.28 and StepConcurrencyLevel === 1', async () => { + new EmrCreateCluster(stack, 'Task1', { + instances: {}, + clusterRole, + name: 'Cluster', + serviceRole, + autoScalingRole, + releaseLabel: 'emr-5.14.0', + stepConcurrencyLevel: 1, + integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE, + }); + }); +}); + +test('Cluster with invalid release label will throw', async () => { + expect(() => new EmrCreateCluster(stack, 'Task1', { + instances: {}, + clusterRole, + name: 'Cluster', + serviceRole, + autoScalingRole, + releaseLabel: 'emra-5.14.0', + stepConcurrencyLevel: 1, + integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE, + })).toThrow('The release label must be in the format \'emr-x.x.x\' but got emra-5.14.0'); + + expect(() => new EmrCreateCluster(stack, 'Task2', { + instances: {}, + clusterRole, + name: 'Cluster', + serviceRole, + autoScalingRole, + releaseLabel: 'emr-5.14.a', + stepConcurrencyLevel: 1, + integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE, + })).toThrow('The release label must be in the format \'emr-x.x.x\' but got emr-5.14.a'); + + expect(() => new EmrCreateCluster(stack, 'Task3', { + instances: {}, + clusterRole, + name: 'Cluster', + serviceRole, + autoScalingRole, + releaseLabel: 'emr-5.14.0.0', + stepConcurrencyLevel: 1, + integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE, + })).toThrow('The release label must be in the format \'emr-x.x.x\' but got emr-5.14.0.0'); +}); + test('Create Cluster with Tags', () => { // WHEN const task = new EmrCreateCluster(stack, 'Task', {