Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start/stop Kinesis Analytics Flink application and add application snapshot resource #18056

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e66e32d
r/aws_kinesisanalyticsv2_application: Return 'NotFoundError' instead …
ewbankkit Mar 4, 2021
63f9738
r/aws_kinesisanalyticsv2_application: Add 'start_application' attribute.
ewbankkit Mar 5, 2021
f62f2a6
Wait for correct status when updating running application.
ewbankkit Mar 5, 2021
ae0788a
r/aws_kinesisanalyticsv2_application: Test starting SQL application o…
ewbankkit Mar 5, 2021
94036cd
r/aws_kinesisanalyticsv2_application: Test starting and stopping SQL …
ewbankkit Mar 7, 2021
ef6c528
r/aws_kinesisanalyticsv2_application: Test update running SQL applica…
ewbankkit Mar 7, 2021
9bd5c8e
r/aws_kinesisanalyticsv2_application: Add 'run_configuration' attribute.
ewbankkit Mar 7, 2021
257b86d
Remove 'Computed' from 'run_configuration'.
ewbankkit Mar 8, 2021
35690c2
r/aws_kinesisanalyticsv2_application: Add 'expandKinesisAnalyticsV2St…
ewbankkit Mar 9, 2021
0d2a6ea
r/aws_kinesisanalyticsv2_application: Set 'create_timestamp' in Creat…
ewbankkit Mar 10, 2021
eb4deab
r/aws_kinesisanalyticsv2_application: Add 'TestAccAWSKinesisAnalytics…
ewbankkit Mar 11, 2021
0727fe8
r/aws_kinesisanalyticsv2_application: Add 'TestAccAWSKinesisAnalytics…
ewbankkit Mar 11, 2021
566df65
Add CHANGELOG entry.
ewbankkit Mar 11, 2021
df30646
Add 'aws_kinesisanalyticsv2_application_snapshot' resource.
ewbankkit Mar 11, 2021
9c363c4
r/aws_kinesisanalyticsv2_application_snapshot: Add documentation.
ewbankkit Mar 11, 2021
769278a
r/aws_kinesisanalyticsv2_application_snapshot: Add tests.
ewbankkit Mar 11, 2021
478a395
Correct documentation file name.
ewbankkit Mar 12, 2021
9466da1
r/aws_kinesisanalyticsv2_application_snapshot: First passing acceptan…
ewbankkit Mar 12, 2021
6789b5b
r/aws_kinesisanalyticsv2_application_snapshot: Input and output strea…
ewbankkit Mar 12, 2021
02d99af
Move 'aws-kinesis-analytics-java-apps-1.0.jar' to 'flink-app.jar'.
ewbankkit Mar 12, 2021
8d6d7bf
r/aws_kinesisanalyticsv2_application: Add 'testAccKinesisAnalyticsV2A…
ewbankkit Mar 12, 2021
4e8fccc
Add new resource 'aws_kinesisanalyticsv2_application_snapshot' to the…
ewbankkit Mar 12, 2021
2cb6eaf
'testAccKinesisAnalyticsV2ApplicationConfigBaseSnapshotableFlinkAppli…
ewbankkit Mar 12, 2021
47dfab2
Revert "'testAccKinesisAnalyticsV2ApplicationConfigBaseSnapshotableFl…
ewbankkit Mar 12, 2021
f149e59
r/aws_kinesisanalyticsv2_application: Add 'TestAccAWSKinesisAnalytics…
ewbankkit Mar 14, 2021
6a81bae
r/aws_kinesisanalyticsv2_application: Add 'force_stop' attribute.
ewbankkit Mar 14, 2021
39e42ff
Fix 'XAT001: missing ErrorCheck' errors.
ewbankkit Mar 20, 2021
1059afc
Fix 'error checking file contents: example section code block languag…
ewbankkit Mar 20, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .changelog/18056.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
```release-note:enhancement
resource/aws_kinesisanalyticsv2_application: Add `start_application` attribute
```

```release-note:enhancement
resource/aws_kinesisanalyticsv2_application: `starting_position_configuration` can be specified when starting a SQL application
```

```release-note:enhancement
resource/aws_kinesisanalyticsv2_application: Add `run_configuration` attribute for starting a Flink application
```

```release-note:enhancement
resource/aws_kinesisanalyticsv2_application: Add `force_stop` attribute
```

```release-note:new-resource
aws_kinesisanalyticsv2_application_snapshot
```
72 changes: 70 additions & 2 deletions aws/internal/service/kinesisanalyticsv2/finder/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,86 @@ package finder
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesisanalyticsv2"
"github.com/hashicorp/aws-sdk-go-base/tfawserr"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

// ApplicationByName returns the application corresponding to the specified name.
func ApplicationByName(conn *kinesisanalyticsv2.KinesisAnalyticsV2, name string) (*kinesisanalyticsv2.ApplicationDetail, error) {
// ApplicationDetailByName returns the application corresponding to the specified name.
// Returns NotFoundError if no application is found.
func ApplicationDetailByName(conn *kinesisanalyticsv2.KinesisAnalyticsV2, name string) (*kinesisanalyticsv2.ApplicationDetail, error) {
input := &kinesisanalyticsv2.DescribeApplicationInput{
ApplicationName: aws.String(name),
}

return ApplicationDetail(conn, input)
}

// ApplicationDetail returns the application details corresponding to the specified input.
// Returns NotFoundError if no application is found.
func ApplicationDetail(conn *kinesisanalyticsv2.KinesisAnalyticsV2, input *kinesisanalyticsv2.DescribeApplicationInput) (*kinesisanalyticsv2.ApplicationDetail, error) {
output, err := conn.DescribeApplication(input)

if tfawserr.ErrCodeEquals(err, kinesisanalyticsv2.ErrCodeResourceNotFoundException) {
return nil, &resource.NotFoundError{
LastError: err,
LastRequest: input,
}
}

if err != nil {
return nil, err
}

if output == nil || output.ApplicationDetail == nil {
return nil, &resource.NotFoundError{
Message: "Empty result",
LastRequest: input,
}
}

return output.ApplicationDetail, nil
}

// SnapshotDetailsByApplicationAndSnapshotNames returns the application snapshot details corresponding to the specified application and snapshot names.
// Returns NotFoundError if no application snapshot is found.
func SnapshotDetailsByApplicationAndSnapshotNames(conn *kinesisanalyticsv2.KinesisAnalyticsV2, applicationName, snapshotName string) (*kinesisanalyticsv2.SnapshotDetails, error) {
input := &kinesisanalyticsv2.DescribeApplicationSnapshotInput{
ApplicationName: aws.String(applicationName),
SnapshotName: aws.String(snapshotName),
}

return SnapshotDetails(conn, input)
}

// SnapshotDetails returns the application snapshot details corresponding to the specified input.
// Returns NotFoundError if no application snapshot is found.
func SnapshotDetails(conn *kinesisanalyticsv2.KinesisAnalyticsV2, input *kinesisanalyticsv2.DescribeApplicationSnapshotInput) (*kinesisanalyticsv2.SnapshotDetails, error) {
output, err := conn.DescribeApplicationSnapshot(input)

if tfawserr.ErrCodeEquals(err, kinesisanalyticsv2.ErrCodeResourceNotFoundException) {
return nil, &resource.NotFoundError{
LastError: err,
LastRequest: input,
}
}

if tfawserr.ErrMessageContains(err, kinesisanalyticsv2.ErrCodeInvalidArgumentException, "does not exist") {
return nil, &resource.NotFoundError{
LastError: err,
LastRequest: input,
}
}

if err != nil {
return nil, err
}

if output == nil || output.SnapshotDetails == nil {
return nil, &resource.NotFoundError{
Message: "Empty result",
LastRequest: input,
}
}

return output.SnapshotDetails, nil
}
25 changes: 25 additions & 0 deletions aws/internal/service/kinesisanalyticsv2/id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package kinesisanalyticsv2

import (
"fmt"
"strings"
)

const applicationSnapshotIDSeparator = "/"

func ApplicationSnapshotCreateID(applicationName, snapshotName string) string {
parts := []string{applicationName, snapshotName}
id := strings.Join(parts, applicationSnapshotIDSeparator)

return id
}

func ApplicationSnapshotParseID(id string) (string, string, error) {
parts := strings.Split(id, applicationSnapshotIDSeparator)

if len(parts) == 2 && parts[0] != "" && parts[1] != "" {
return parts[0], parts[1], nil
}

return "", "", fmt.Errorf("unexpected format for ID (%q), expected application-name%ssnapshot-name", id, applicationSnapshotIDSeparator)
}
36 changes: 24 additions & 12 deletions aws/internal/service/kinesisanalyticsv2/waiter/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,41 @@ package waiter
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesisanalyticsv2"
"github.com/hashicorp/aws-sdk-go-base/tfawserr"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kinesisanalyticsv2/finder"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)

const (
applicationStatusNotFound = "NotFound"
applicationStatusUnknown = "Unknown"
)

// ApplicationStatus fetches the Application and its Status
// ApplicationStatus fetches the ApplicationDetail and its Status
func ApplicationStatus(conn *kinesisanalyticsv2.KinesisAnalyticsV2, name string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
application, err := finder.ApplicationByName(conn, name)
applicationDetail, err := finder.ApplicationDetailByName(conn, name)

if tfresource.NotFound(err) {
return nil, "", nil
}

if err != nil {
return nil, "", err
}

return applicationDetail, aws.StringValue(applicationDetail.ApplicationStatus), nil
}
}

// SnapshotDetailsStatus fetches the SnapshotDetails and its Status
func SnapshotDetailsStatus(conn *kinesisanalyticsv2.KinesisAnalyticsV2, applicationName, snapshotName string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
snapshotDetails, err := finder.SnapshotDetailsByApplicationAndSnapshotNames(conn, applicationName, snapshotName)

if tfawserr.ErrCodeEquals(err, kinesisanalyticsv2.ErrCodeResourceNotFoundException) {
return nil, applicationStatusNotFound, nil
if tfresource.NotFound(err) {
return nil, "", nil
}

if err != nil {
return nil, applicationStatusUnknown, err
return nil, "", err
}

return application, aws.StringValue(application.ApplicationStatus), nil
return snapshotDetails, aws.StringValue(snapshotDetails.SnapshotStatus), nil
}
}
104 changes: 102 additions & 2 deletions aws/internal/service/kinesisanalyticsv2/waiter/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,77 @@ import (
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)

const (
ApplicationDeletedTimeout = 5 * time.Minute
ApplicationStartedTimeout = 5 * time.Minute
ApplicationStoppedTimeout = 5 * time.Minute
ApplicationUpdatedTimeout = 5 * time.Minute

SnapshotCreatedTimeout = 5 * time.Minute
SnapshotDeletedTimeout = 5 * time.Minute
)

// ApplicationDeleted waits for an Application to return Deleted
func ApplicationDeleted(conn *kinesisanalyticsv2.KinesisAnalyticsV2, name string, timeout time.Duration) (*kinesisanalyticsv2.ApplicationDetail, error) {
func ApplicationDeleted(conn *kinesisanalyticsv2.KinesisAnalyticsV2, name string) (*kinesisanalyticsv2.ApplicationDetail, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesisanalyticsv2.ApplicationStatusDeleting},
Target: []string{},
Refresh: ApplicationStatus(conn, name),
Timeout: timeout,
Timeout: ApplicationDeletedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if v, ok := outputRaw.(*kinesisanalyticsv2.ApplicationDetail); ok {
return v, err
}

return nil, err
}

// ApplicationStarted waits for an Application to start
func ApplicationStarted(conn *kinesisanalyticsv2.KinesisAnalyticsV2, name string) (*kinesisanalyticsv2.ApplicationDetail, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesisanalyticsv2.ApplicationStatusStarting},
Target: []string{kinesisanalyticsv2.ApplicationStatusRunning},
Refresh: ApplicationStatus(conn, name),
Timeout: ApplicationStartedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if v, ok := outputRaw.(*kinesisanalyticsv2.ApplicationDetail); ok {
return v, err
}

return nil, err
}

// ApplicationStopped waits for an Application to stop
func ApplicationStopped(conn *kinesisanalyticsv2.KinesisAnalyticsV2, name string) (*kinesisanalyticsv2.ApplicationDetail, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesisanalyticsv2.ApplicationStatusForceStopping, kinesisanalyticsv2.ApplicationStatusStopping},
Target: []string{kinesisanalyticsv2.ApplicationStatusReady},
Refresh: ApplicationStatus(conn, name),
Timeout: ApplicationStoppedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if v, ok := outputRaw.(*kinesisanalyticsv2.ApplicationDetail); ok {
return v, err
}

return nil, err
}

// ApplicationUpdated waits for an Application to return Deleted
func ApplicationUpdated(conn *kinesisanalyticsv2.KinesisAnalyticsV2, name string) (*kinesisanalyticsv2.ApplicationDetail, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesisanalyticsv2.ApplicationStatusUpdating},
Target: []string{kinesisanalyticsv2.ApplicationStatusReady, kinesisanalyticsv2.ApplicationStatusRunning},
Refresh: ApplicationStatus(conn, name),
Timeout: ApplicationUpdatedTimeout,
}

outputRaw, err := stateConf.WaitForState()
Expand Down Expand Up @@ -75,3 +139,39 @@ func IAMPropagation(f func() (interface{}, error)) (interface{}, error) {

return output, nil
}

// SnapshotCreated waits for a Snapshot to return Created
func SnapshotCreated(conn *kinesisanalyticsv2.KinesisAnalyticsV2, applicationName, snapshotName string) (*kinesisanalyticsv2.SnapshotDetails, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesisanalyticsv2.SnapshotStatusCreating},
Target: []string{kinesisanalyticsv2.SnapshotStatusReady},
Refresh: SnapshotDetailsStatus(conn, applicationName, snapshotName),
Timeout: SnapshotCreatedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if v, ok := outputRaw.(*kinesisanalyticsv2.SnapshotDetails); ok {
return v, err
}

return nil, err
}

// SnapshotDeleted waits for a Snapshot to return Deleted
func SnapshotDeleted(conn *kinesisanalyticsv2.KinesisAnalyticsV2, applicationName, snapshotName string) (*kinesisanalyticsv2.SnapshotDetails, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesisanalyticsv2.SnapshotStatusDeleting},
Target: []string{},
Refresh: SnapshotDetailsStatus(conn, applicationName, snapshotName),
Timeout: SnapshotDeletedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if v, ok := outputRaw.(*kinesisanalyticsv2.SnapshotDetails); ok {
return v, err
}

return nil, err
}
1 change: 1 addition & 0 deletions aws/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,7 @@ func Provider() *schema.Provider {
"aws_key_pair": resourceAwsKeyPair(),
"aws_kinesis_analytics_application": resourceAwsKinesisAnalyticsApplication(),
"aws_kinesisanalyticsv2_application": resourceAwsKinesisAnalyticsV2Application(),
"aws_kinesisanalyticsv2_application_snapshot": resourceAwsKinesisAnalyticsV2ApplicationSnapshot(),
"aws_kinesis_firehose_delivery_stream": resourceAwsKinesisFirehoseDeliveryStream(),
"aws_kinesis_stream": resourceAwsKinesisStream(),
"aws_kinesis_stream_consumer": resourceAwsKinesisStreamConsumer(),
Expand Down
Loading