Scheduler fails to stop LeaseCoordinator, causing integration tests to run in the background until max attempts to initialize LeaseCoordinator are exhausted #1385

Open
amitchidrewar1301 opened this issue Sep 27, 2024 · 0 comments

We are using amazon-kinesis-client 2.6.0 (the latest version). While testing our Kinesis Producer Library (KPL) and Kinesis Client Library (KCL) configuration, we found that the process kept running in the background even after the test had passed.

The logs showed that the Scheduler was stuck in the initialize() phase, so it kept trying to create the LeaseCoordinator even though the Scheduler shutdown process had already begun. As a result, the LeaseCoordinator kept restarting. Because the test containers had already been shut down after the assertion, the LeaseCoordinator threw connection errors (SdkClientException: Connection refused) and kept retrying until the maximum number of attempts was reached.

In our integration test, we created the KCL_STREAM_LEASES table. When the test started, the scheduler populated the lease table with lease keys for four shard IDs. Although the assertion succeeded, the scheduler failed to shut down. Instead, after about 30 seconds, it started the lease cleanup and then the LeaseCoordinator, which resulted in a loop of connection-refused errors until the maximum retry attempts were exhausted.

Ideally, the LeaseCoordinator should not reinitialize after the assertion is complete. The scheduler should stop the LeaseCoordinator and shut down immediately.
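
To make the expected behaviour concrete, here is a minimal sketch of an initialization retry loop that checks a shutdown flag before every attempt. This is not KCL code: `ShutdownAwareInit`, `requestShutdown`, `initialize` and `attemptsMade` are hypothetical names chosen for illustration, and the real Scheduler may be structured quite differently.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical model of the requested behaviour; this is NOT the KCL Scheduler.
final class ShutdownAwareInit {
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    private final int maxAttempts;
    private final long backoffMillis;
    private volatile int attemptsMade = 0; // written only by the initializing thread

    ShutdownAwareInit(int maxAttempts, long backoffMillis) {
        this.maxAttempts = maxAttempts;
        this.backoffMillis = backoffMillis;
    }

    // Called from the shutdown path, possibly on another thread.
    void requestShutdown() {
        shutdownRequested.set(true);
    }

    int attemptsMade() {
        return attemptsMade;
    }

    // Runs initStep until it succeeds, shutdown is requested, or attempts run out.
    // Returns true only if initStep reported success.
    boolean initialize(BooleanSupplier initStep) {
        while (attemptsMade < maxAttempts && !shutdownRequested.get()) {
            attemptsMade++;
            try {
                if (initStep.getAsBoolean()) {
                    return true;
                }
            } catch (RuntimeException e) {
                // e.g. "Connection refused" once the backing service is gone; fall through and retry
            }
            try {
                Thread.sleep(backoffMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
                return false;
            }
        }
        return false;
    }
}
```

With this shape, a shutdown request that arrives while the lease table is unreachable ends the loop after, at most, the attempt already in flight, instead of continuing until `maxAttempts` is reached.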

The logs below illustrate the scenario:

2024-09-27T18:07:45.236+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.amazon.kinesis.coordinator.Scheduler   : Initializing LeaseCoordinator attempt 1
2024-09-27T18:07:45.247+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [           main] [                                                 ] c.g.m.a.kinesis.KplKclConfigurationTest  : Started KplKclConfigurationTest in 2.349 seconds (process running for 11.363)
2024-09-27T18:07:45.252+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.amazon.kinesis.coordinator.Scheduler   : Lease table is still empty. Checking again in 3000 ms
2024-09-27T18:07:48.258+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.amazon.kinesis.coordinator.Scheduler   : Lease table is still empty. Checking again in 3000 ms
2024-09-27T18:07:48.266+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.amazon.kinesis.coordinator.Scheduler   : Lease table is still empty. Checking again in 3000 ms
2024-09-27T18:07:51.278+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.amazon.kinesis.coordinator.Scheduler   : Lease table is still empty. Checking again in 3000 ms
2024-09-27T18:07:51.282+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.amazon.kinesis.coordinator.Scheduler   : Lease table is still empty. Checking again in 3000 ms
2024-09-27T18:07:54.297+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.amazon.kinesis.coordinator.Scheduler   : Lease table is still empty. Checking again in 3000 ms
2024-09-27T18:07:54.298+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.amazon.kinesis.coordinator.Scheduler   : Lease table is still empty. Checking again in 3000 ms
2024-09-27T18:07:57.298+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.amazon.kinesis.coordinator.Scheduler   : Worker d43f4fde-ea0c-4ca4-b745-fcc7456bc51a is initiating the lease sync.
2024-09-27T18:07:57.299+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.a.k.c.PeriodicShardSyncManager         : Syncing Kinesis shard info for StreamConfig(streamIdentifier=msc.json.input, initialPositionInStreamExtended=InitialPositionInStreamExtended(position=TRIM_HORIZON, timestamp=null), consumerArn=null)
2024-09-27T18:07:57.315+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.amazon.kinesis.coordinator.Scheduler   : Lease table is still empty. Checking again in 3000 ms
2024-09-27T18:07:57.319+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.a.kinesis.leases.KinesisShardDetector  : Stream msc.json.input: listing shards with list shards request ListShardsRequest(StreamName=msc.json.input, ShardFilter=ShardFilter(Type=AT_TRIM_HORIZON))
2024-09-27T18:07:57.412+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.a.k.leases.HierarchicalShardSyncer     : msc.json.input - Number of new leases to create: 4
2024-09-27T18:07:57.756+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.a.k.l.dynamodb.DynamoDBLeaseRefresher  : Created lease: Lease(leaseKey=shardId-000000000000, leaseOwner=null, leaseCounter=0, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=0, endingHashKey=85070591730234615865843651857942052862))
2024-09-27T18:07:57.768+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.a.k.l.dynamodb.DynamoDBLeaseRefresher  : Created lease: Lease(leaseKey=shardId-000000000001, leaseOwner=null, leaseCounter=0, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=85070591730234615865843651857942052863, endingHashKey=170141183460469231731687303715884105725))
2024-09-27T18:07:57.777+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.a.k.l.dynamodb.DynamoDBLeaseRefresher  : Created lease: Lease(leaseKey=shardId-000000000002, leaseOwner=null, leaseCounter=0, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=170141183460469231731687303715884105726, endingHashKey=255211775190703847597530955573826158588))
2024-09-27T18:07:57.784+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.a.k.l.dynamodb.DynamoDBLeaseRefresher  : Created lease: Lease(leaseKey=shardId-000000000003, leaseOwner=null, leaseCounter=0, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=255211775190703847597530955573826158589, endingHashKey=340282366920938463463374607431768211455))
2024-09-27T18:07:57.784+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.a.k.leases.HierarchicalShardSyncer     : msc.json.input - Newly created leases 4: [Lease(leaseKey=shardId-000000000002, leaseOwner=null, leaseCounter=0, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=170141183460469231731687303715884105726, endingHashKey=255211775190703847597530955573826158588)), Lease(leaseKey=shardId-000000000001, leaseOwner=null, leaseCounter=0, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=85070591730234615865843651857942052863, endingHashKey=170141183460469231731687303715884105725)), Lease(leaseKey=shardId-000000000003, leaseOwner=null, leaseCounter=0, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=255211775190703847597530955573826158589, endingHashKey=340282366920938463463374607431768211455)), Lease(leaseKey=shardId-000000000000, leaseOwner=null, leaseCounter=0, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, 
ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=0, endingHashKey=85070591730234615865843651857942052862))]
2024-09-27T18:08:00.338+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.amazon.kinesis.coordinator.Scheduler   : Lease table is still empty. Checking again in 3000 ms
2024-09-27T18:08:03.360+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.amazon.kinesis.coordinator.Scheduler   : Lease table is still empty. Checking again in 3000 ms
2024-09-27T18:08:06.376+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.amazon.kinesis.coordinator.Scheduler   : Lease table is still empty. Checking again in 3000 ms
2024-09-27T18:08:09.398+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.amazon.kinesis.coordinator.Scheduler   : Lease table is still empty. Checking again in 3000 ms
2024-09-27T18:08:12.403+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.amazon.kinesis.coordinator.Scheduler   : Worker 1f66631d-c886-41fe-b924-c7eff65259df is initiating the lease sync.
2024-09-27T18:08:12.404+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.a.k.c.PeriodicShardSyncManager         : Syncing Kinesis shard info for StreamConfig(streamIdentifier=msc.event.input, initialPositionInStreamExtended=InitialPositionInStreamExtended(position=TRIM_HORIZON, timestamp=null), consumerArn=null)
2024-09-27T18:08:12.422+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.a.kinesis.leases.KinesisShardDetector  : Stream msc.event.input: listing shards with list shards request ListShardsRequest(StreamName=msc.event.input, ShardFilter=ShardFilter(Type=AT_TRIM_HORIZON))
2024-09-27T18:08:12.506+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.a.k.leases.HierarchicalShardSyncer     : msc.event.input - Number of new leases to create: 4
2024-09-27T18:08:12.515+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.a.k.l.dynamodb.DynamoDBLeaseRefresher  : Created lease: Lease(leaseKey=shardId-000000000000, leaseOwner=null, leaseCounter=0, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=0, endingHashKey=85070591730234615865843651857942052862))
2024-09-27T18:08:12.522+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.a.k.l.dynamodb.DynamoDBLeaseRefresher  : Created lease: Lease(leaseKey=shardId-000000000001, leaseOwner=null, leaseCounter=0, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=85070591730234615865843651857942052863, endingHashKey=170141183460469231731687303715884105725))
2024-09-27T18:08:12.529+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.a.k.l.dynamodb.DynamoDBLeaseRefresher  : Created lease: Lease(leaseKey=shardId-000000000002, leaseOwner=null, leaseCounter=0, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=170141183460469231731687303715884105726, endingHashKey=255211775190703847597530955573826158588))
2024-09-27T18:08:12.537+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.a.k.l.dynamodb.DynamoDBLeaseRefresher  : Created lease: Lease(leaseKey=shardId-000000000003, leaseOwner=null, leaseCounter=0, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=255211775190703847597530955573826158589, endingHashKey=340282366920938463463374607431768211455))
2024-09-27T18:08:12.537+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.a.k.leases.HierarchicalShardSyncer     : msc.event.input - Newly created leases 4: [Lease(leaseKey=shardId-000000000002, leaseOwner=null, leaseCounter=0, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=170141183460469231731687303715884105726, endingHashKey=255211775190703847597530955573826158588)), Lease(leaseKey=shardId-000000000001, leaseOwner=null, leaseCounter=0, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=85070591730234615865843651857942052863, endingHashKey=170141183460469231731687303715884105725)), Lease(leaseKey=shardId-000000000003, leaseOwner=null, leaseCounter=0, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=255211775190703847597530955573826158589, endingHashKey=340282366920938463463374607431768211455)), Lease(leaseKey=shardId-000000000000, leaseOwner=null, leaseCounter=0, concurrencyToken=null, lastCounterIncrementNanos=null, checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}, pendingCheckpoint=null, pendingCheckpointState=null, isMarkedForLeaseSteal=false, 
ownerSwitchesSinceCheckpoint=0, parentShardIds=[], childShardIds=[], hashKeyRangeForLease=HashKeyRangeForLease(startingHashKey=0, endingHashKey=85070591730234615865843651857942052862))]
2024-09-27T18:08:57.790+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.a.kinesis.leases.LeaseCleanupManager   : Starting lease cleanup thread.
2024-09-27T18:08:57.791+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.amazon.kinesis.coordinator.Scheduler   : Starting LeaseCoordinator
2024-09-27T18:08:57.791+01:00  INFO [msc-test,,,] 84399 --- [msc-test] [ool-16-thread-1] [                                                 ] s.a.kinesis.leases.LeaseCleanupManager   : Number of pending leases to clean before the scan : 0
2024-09-27T18:09:01.306+01:00 ERROR [msc-test,,,] 84399 --- [msc-test] [cTaskExecutor-1] [                                                 ] s.amazon.kinesis.coordinator.Scheduler   : Caught exception when initializing LeaseCoordinator

java.lang.RuntimeException: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Connection refused: localhost/127.0.0.1:62599
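
For reference, the gaps between the key log lines above can be computed directly from their timestamps. A small sketch using only `java.time` follows; `LogGaps` and `millisBetween` are made-up names for illustration.

```java
import java.time.Duration;
import java.time.OffsetDateTime;

// Hypothetical helper for reading the log excerpt; not part of KCL.
final class LogGaps {
    // Milliseconds elapsed between two ISO-8601 offset timestamps as printed in the logs.
    static long millisBetween(String earlier, String later) {
        return Duration.between(OffsetDateTime.parse(earlier), OffsetDateTime.parse(later)).toMillis();
    }
}
```

In this run, `LogGaps.millisBetween("2024-09-27T18:08:12.537+01:00", "2024-09-27T18:08:57.790+01:00")` gives 45253, so about 45 seconds passed between the last "Newly created leases" line and "Starting lease cleanup thread". The same call on `18:08:57.791` and `18:09:01.306` gives 3515, so the first connection failure was logged about 3.5 seconds after "Starting LeaseCoordinator".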

For reference, here is an integration test along similar lines:

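import static java.lang.String.format;
import static java.time.Duration.ofSeconds;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.io.IOException;

// Assumes Spring Boot 3 (jakarta namespace); on Boot 2 this would be javax.annotation.Resource.
import jakarta.annotation.Resource;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;

@ExtendWith(SpringExtension.class)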
@Testcontainers
@SpringBootTest(classes = Application.class)
class KplKclConfigurationTest {

  @Container
  private static final LocalStackContainer localStackContainer = new LocalStackContainer(
      DockerImageName.parse("nexus.giffgaff.co.uk/localstack:3")
          .asCompatibleSubstituteFor("localstack/localstack")
  )
      .withServices(LocalStackContainer.Service.KINESIS, LocalStackContainer.Service.DYNAMODB)
      .withExposedPorts(4566);

  @BeforeAll
  static void beforeAll() throws Exception {
    executeAwsCommand(format(CREATE_KINESIS_STREAM, "kpl-kcl-stream"));
    executeAwsCommand(
        format(CREATE_DYNAMODB_TABLE, KCL_STREAM_LEASES));
  }

  private static void executeAwsCommand(String awsCommand) throws IOException, InterruptedException {
    assertThat(
        localStackContainer.execInContainer(
            "/bin/sh",
            "-c",
            awsCommand
        ).getExitCode()
    ).isZero();
  }

  // Expose LocalStack's mapped edge port (4566) to the Spring context.
  @DynamicPropertySource
  static void registerLocalStackProperties(DynamicPropertyRegistry registry) {
    registry.add("localstack.port", () -> localStackContainer.getMappedPort(4566));
  }

  private static final String CREATE_KINESIS_STREAM = "awslocal kinesis create-stream --stream-name %s --shard-count 4 --region eu-west-1";
  private static final String CREATE_DYNAMODB_TABLE = "awslocal dynamodb create-table --table-name %s --region eu-west-1 --attribute-definitions AttributeName=leaseKey,AttributeType=S --key-schema AttributeName=leaseKey,KeyType=HASH --provisioned-throughput ReadCapacityUnits=10,WriteCapacityUnits=10";
  private static final String KCL_STREAM_LEASES = "test-lease-table";
  private static final String[] SHARD_LIST = {"shardId-000000000000", "shardId-000000000001",
      "shardId-000000000002", "shardId-000000000003"};

  @Resource
  private DynamoDbAsyncClient dynamoDB;


  @Test
  void verifyShards() {

    await()
        .pollInterval(ofSeconds(1))
        .atMost(ofSeconds(60))
        .untilAsserted(
            () -> {
              ScanResponse eventInputLease = dynamoDB.scan(
                  ScanRequest.builder().tableName(KCL_STREAM_LEASES).build()).join();
              assertThat(eventInputLease.items()
                  .stream().map(item -> item.get("leaseKey").s()))
                  .containsExactlyInAnyOrder(
                      SHARD_LIST
                  );
            });
  }
}
amitchidrewar1301 referenced this issue in spring-projects/spring-integration-aws Sep 27, 2024
Fixes: #245
Issue link: #245

* Add something like `setLeaseManagementConfigCustomizer(Consumer<LeaseManagementConfig> leaseManagementConfigCustomizer)`
to the `KclMessageDrivenChannelAdapter` and call them before creating a `Scheduler`
* Also add a simple `emptyRecordList` property for the `ProcessorConfig`