Skip to content

Commit

Permalink
feat(stepfunctions-tasks): add step concurrency level to EmrCreateClu…
Browse files Browse the repository at this point in the history
…ster (#15242)

Added support for step concurrency when creating EMR clusters through Step Functions. This feature allows users to run multiple steps in parallel on a cluster created through SFN.

closes #15223.

As a byproduct, adds validation for `releaseLabel` to ensure that it follows the correct format laid out [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-components.html).

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
MousaZeidBaker authored and njlynch committed Oct 11, 2021
1 parent cdf4dd5 commit 0c1ef51
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 0 deletions.
12 changes: 12 additions & 0 deletions packages/@aws-cdk/aws-stepfunctions-tasks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,18 @@ new tasks.EmrCreateCluster(this, 'Create Cluster', {
});
```

If you want to run multiple steps in [parallel](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-concurrent-steps.html),
you can specify the `stepConcurrencyLevel` property. The concurrency range is between 1
and 256 inclusive, where the default concurrency of 1 means no step concurrency is allowed.
`stepConcurrencyLevel` requires the EMR release label to be 5.28.0 or above.

```ts
new tasks.EmrCreateCluster(this, 'Create Cluster', {
// ...
stepConcurrencyLevel: 10,
});
```

### Termination Protection

Locks a cluster (job flow) so the EC2 instances in the cluster cannot be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ export interface EmrCreateClusterProps extends sfn.TaskStateBaseProps {
*/
readonly securityConfiguration?: string;

/**
* Specifies the step concurrency level to allow multiple steps to run in parallel
*
* Requires EMR release label 5.28.0 or above.
* Must be in range [1, 256].
*
* @default 1 - no step concurrency allowed
*/
readonly stepConcurrencyLevel?: number;

/**
* A list of tags to associate with a cluster and propagate to Amazon EC2 instances.
*
Expand Down Expand Up @@ -191,6 +201,22 @@ export class EmrCreateCluster extends sfn.TaskStateBase {
}

this.taskPolicies = this.createPolicyStatements(this._serviceRole, this._clusterRole, this._autoScalingRole);

if (this.props.releaseLabel !== undefined) {
this.validateReleaseLabel(this.props.releaseLabel);
}

if (this.props.stepConcurrencyLevel !== undefined) {
if (this.props.stepConcurrencyLevel < 1 || this.props.stepConcurrencyLevel > 256) {
throw new Error(`Step concurrency level must be in range [1, 256], but got ${this.props.stepConcurrencyLevel}.`);
}
if (this.props.releaseLabel && this.props.stepConcurrencyLevel !== 1) {
const [major, minor] = this.props.releaseLabel.substr(4).split('.');
if (Number(major) < 5 || (Number(major) === 5 && Number(minor) < 28)) {
throw new Error(`Step concurrency is only supported in EMR release version 5.28.0 and above but got ${this.props.releaseLabel}.`);
}
}
}
}

/**
Expand Down Expand Up @@ -252,6 +278,7 @@ export class EmrCreateCluster extends sfn.TaskStateBase {
ReleaseLabel: cdk.stringToCloudFormation(this.props.releaseLabel),
ScaleDownBehavior: cdk.stringToCloudFormation(this.props.scaleDownBehavior?.valueOf()),
SecurityConfiguration: cdk.stringToCloudFormation(this.props.securityConfiguration),
StepConcurrencyLevel: cdk.numberToCloudFormation(this.props.stepConcurrencyLevel),
...(this.props.tags ? this.renderTags(this.props.tags) : undefined),
VisibleToAllUsers: cdk.booleanToCloudFormation(this.visibleToAllUsers),
}),
Expand Down Expand Up @@ -356,6 +383,25 @@ export class EmrCreateCluster extends sfn.TaskStateBase {

return role;
}

/**
* Validates the release label string is in proper format.
* Release labels are in the form `emr-x.x.x`. For example, `emr-5.33.0`.
*
* @see https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-components.html
*/
private validateReleaseLabel(releaseLabel: string): string {
const prefix = releaseLabel.substr(0, 4);
const versions = releaseLabel.substr(4).split('.');
if (prefix !== 'emr-' || versions.length !== 3 || versions.some((e) => isNotANumber(e))) {
throw new Error(`The release label must be in the format 'emr-x.x.x' but got ${releaseLabel}`);
}
return releaseLabel;

function isNotANumber(value: string): boolean {
return value === '' || isNaN(Number(value));
}
}
}

export namespace EmrCreateCluster {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,112 @@ test('Create Cluster with clusterConfiguration Name from payload', () => {
});
});

describe('Cluster with StepConcurrencyLevel', () => {
test('can be specified', async () => {
// WHEN
const task = new EmrCreateCluster(stack, 'Task', {
instances: {},
clusterRole,
name: 'Cluster',
serviceRole,
autoScalingRole,
stepConcurrencyLevel: 2,
integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE,
});

// THEN
expect(stack.resolve(task.toStateJson())).toMatchObject({
Parameters: {
Name: 'Cluster',
StepConcurrencyLevel: 2,
},
});
});

test('throws if < 1 or > 256', async () => {
expect(() => new EmrCreateCluster(stack, 'Task1', {
instances: {},
clusterRole,
name: 'Cluster',
serviceRole,
autoScalingRole,
stepConcurrencyLevel: 0,
integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE,
})).toThrow('Step concurrency level must be in range [1, 256], but got 0.');

expect(() => new EmrCreateCluster(stack, 'Task2', {
instances: {},
clusterRole,
name: 'Cluster',
serviceRole,
autoScalingRole,
stepConcurrencyLevel: 257,
integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE,
})).toThrow('Step concurrency level must be in range [1, 256], but got 257.');
});

test('throws if EMR release label below 5.28 and StepConcurrencyLevel !== 1', async () => {
expect(() => new EmrCreateCluster(stack, 'Task2', {
instances: {},
clusterRole,
name: 'Cluster',
serviceRole,
autoScalingRole,
releaseLabel: 'emr-5.14.0',
stepConcurrencyLevel: 2,
integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE,
})).toThrow('Step concurrency is only supported in EMR release version 5.28.0 and above but got emr-5.14.0.');
});

test('does not throw if EMR release label below 5.28 and StepConcurrencyLevel === 1', async () => {
new EmrCreateCluster(stack, 'Task1', {
instances: {},
clusterRole,
name: 'Cluster',
serviceRole,
autoScalingRole,
releaseLabel: 'emr-5.14.0',
stepConcurrencyLevel: 1,
integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE,
});
});
});

test('Cluster with invalid release label will throw', async () => {
expect(() => new EmrCreateCluster(stack, 'Task1', {
instances: {},
clusterRole,
name: 'Cluster',
serviceRole,
autoScalingRole,
releaseLabel: 'emra-5.14.0',
stepConcurrencyLevel: 1,
integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE,
})).toThrow('The release label must be in the format \'emr-x.x.x\' but got emra-5.14.0');

expect(() => new EmrCreateCluster(stack, 'Task2', {
instances: {},
clusterRole,
name: 'Cluster',
serviceRole,
autoScalingRole,
releaseLabel: 'emr-5.14.a',
stepConcurrencyLevel: 1,
integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE,
})).toThrow('The release label must be in the format \'emr-x.x.x\' but got emr-5.14.a');

expect(() => new EmrCreateCluster(stack, 'Task3', {
instances: {},
clusterRole,
name: 'Cluster',
serviceRole,
autoScalingRole,
releaseLabel: 'emr-5.14.0.0',
stepConcurrencyLevel: 1,
integrationPattern: sfn.IntegrationPattern.REQUEST_RESPONSE,
})).toThrow('The release label must be in the format \'emr-x.x.x\' but got emr-5.14.0.0');
});

test('Create Cluster with Tags', () => {
// WHEN
const task = new EmrCreateCluster(stack, 'Task', {
Expand Down

0 comments on commit 0c1ef51

Please sign in to comment.