Skip to content

Commit

Permalink
Merge pull request awslabs#55 from ashwing/ltr_1_periodic_auditor_met…
Browse files Browse the repository at this point in the history
…rics_configs

Ltr 1 periodic auditor metrics configs
  • Loading branch information
ashwing authored Jun 16, 2020
2 parents a24e7dc + e7e8196 commit 562face
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Validate;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.common.HashKeyRangeForLease;
Expand All @@ -38,6 +39,10 @@
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;

import java.io.Serializable;
import java.math.BigInteger;
Expand All @@ -47,7 +52,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
Expand All @@ -67,13 +71,11 @@
@Slf4j
class PeriodicShardSyncManager {
private static final long INITIAL_DELAY = 60 * 1000L;
private static final long PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
@VisibleForTesting
static final BigInteger MIN_HASH_KEY = BigInteger.ZERO;
@VisibleForTesting
static final BigInteger MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE);
@VisibleForTesting
static final int CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY = 3;
static final String PERIODIC_SHARD_SYNC_MANAGER = "PeriodicShardSyncManager";
private Map<StreamIdentifier, HashRangeHoleTracker> hashRangeHoleTrackerMap = new HashMap<>();

private final String workerId;
Expand All @@ -83,19 +85,29 @@ class PeriodicShardSyncManager {
private final Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider;
private final ScheduledExecutorService shardSyncThreadPool;
private final boolean isMultiStreamingMode;
private final MetricsFactory metricsFactory;
private final long leasesRecoveryAuditorExecutionFrequencyMillis;
private final int leasesRecoveryAuditorInconsistencyConfidenceThreshold;
private boolean isRunning;

PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher,
Map<StreamIdentifier, StreamConfig> currentStreamConfigMap,
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider, boolean isMultiStreamingMode) {
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider, boolean isMultiStreamingMode,
MetricsFactory metricsFactory,
long leasesRecoveryAuditorExecutionFrequencyMillis,
int leasesRecoveryAuditorInconsistencyConfidenceThreshold) {
this(workerId, leaderDecider, leaseRefresher, currentStreamConfigMap, shardSyncTaskManagerProvider,
Executors.newSingleThreadScheduledExecutor(), isMultiStreamingMode);
Executors.newSingleThreadScheduledExecutor(), isMultiStreamingMode, metricsFactory,
leasesRecoveryAuditorExecutionFrequencyMillis, leasesRecoveryAuditorInconsistencyConfidenceThreshold);
}

PeriodicShardSyncManager(String workerId, LeaderDecider leaderDecider, LeaseRefresher leaseRefresher,
Map<StreamIdentifier, StreamConfig> currentStreamConfigMap,
Function<StreamConfig, ShardSyncTaskManager> shardSyncTaskManagerProvider,
ScheduledExecutorService shardSyncThreadPool, boolean isMultiStreamingMode) {
ScheduledExecutorService shardSyncThreadPool, boolean isMultiStreamingMode,
MetricsFactory metricsFactory,
long leasesRecoveryAuditorExecutionFrequencyMillis,
int leasesRecoveryAuditorInconsistencyConfidenceThreshold) {
Validate.notBlank(workerId, "WorkerID is required to initialize PeriodicShardSyncManager.");
Validate.notNull(leaderDecider, "LeaderDecider is required to initialize PeriodicShardSyncManager.");
this.workerId = workerId;
Expand All @@ -105,6 +117,9 @@ class PeriodicShardSyncManager {
this.shardSyncTaskManagerProvider = shardSyncTaskManagerProvider;
this.shardSyncThreadPool = shardSyncThreadPool;
this.isMultiStreamingMode = isMultiStreamingMode;
this.metricsFactory = metricsFactory;
this.leasesRecoveryAuditorExecutionFrequencyMillis = leasesRecoveryAuditorExecutionFrequencyMillis;
this.leasesRecoveryAuditorInconsistencyConfidenceThreshold = leasesRecoveryAuditorInconsistencyConfidenceThreshold;
}

public synchronized TaskResult start() {
Expand All @@ -116,7 +131,7 @@ public synchronized TaskResult start() {
log.error("Error during runShardSync.", t);
}
};
shardSyncThreadPool.scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, PERIODIC_SHARD_SYNC_INTERVAL_MILLIS,
shardSyncThreadPool.scheduleWithFixedDelay(periodicShardSyncer, INITIAL_DELAY, leasesRecoveryAuditorExecutionFrequencyMillis,
TimeUnit.MILLISECONDS);
isRunning = true;

Expand Down Expand Up @@ -157,6 +172,14 @@ public void stop() {
private void runShardSync() {
if (leaderDecider.isLeader(workerId)) {
log.info(String.format("WorkerId %s is leader, running the periodic shard sync task", workerId));

final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory,
PERIODIC_SHARD_SYNC_MANAGER);
int numStreamsWithPartialLeases = 0;
int numStreamsToSync = 0;
boolean isRunSuccess = false;
final long runStartMillis = System.currentTimeMillis();

try {
// Construct the stream to leases map to be used in the lease sync
final Map<StreamIdentifier, List<Lease>> streamToLeasesMap = getStreamToLeasesMap(
Expand All @@ -166,6 +189,10 @@ private void runShardSync() {
for (Map.Entry<StreamIdentifier, StreamConfig> streamConfigEntry : currentStreamConfigMap.entrySet()) {
final ShardSyncResponse shardSyncResponse = checkForShardSync(streamConfigEntry.getKey(),
streamToLeasesMap.get(streamConfigEntry.getKey()));

numStreamsWithPartialLeases += shardSyncResponse.isHoleDetected() ? 1 : 0;
numStreamsToSync += shardSyncResponse.shouldDoShardSync ? 1 : 0;

if (shardSyncResponse.shouldDoShardSync()) {
log.info("Periodic shard syncer initiating shard sync for {} due to the reason - {} ",
streamConfigEntry.getKey(), shardSyncResponse.reasonForDecision());
Expand All @@ -181,8 +208,14 @@ private void runShardSync() {
shardSyncResponse.reasonForDecision());
}
}
isRunSuccess = true;
} catch (Exception e) {
log.error("Caught exception while running periodic shard syncer.", e);
} finally {
scope.addData("NumStreamsWithPartialLeases", numStreamsWithPartialLeases, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("NumStreamsToSync", numStreamsToSync, StandardUnit.COUNT, MetricsLevel.SUMMARY);
MetricsUtil.addSuccessAndLatency(scope, isRunSuccess, runStartMillis, MetricsLevel.SUMMARY);
scope.end();
}
} else {
log.debug("WorkerId {} is not a leader, not running the shard sync task", workerId);
Expand Down Expand Up @@ -214,7 +247,7 @@ ShardSyncResponse checkForShardSync(StreamIdentifier streamIdentifier, List<Leas
if (CollectionUtils.isNullOrEmpty(leases)) {
// If the leases is null or empty then we need to do shard sync
log.info("No leases found for {}. Will be triggering shard sync", streamIdentifier);
return new ShardSyncResponse(true, "No leases found for " + streamIdentifier);
return new ShardSyncResponse(true, false, "No leases found for " + streamIdentifier);
}
// Check if there are any holes in the leases and return the first hole if present.
Optional<HashRangeHole> hashRangeHoleOpt = hasHoleInLeases(streamIdentifier, leases);
Expand All @@ -227,15 +260,15 @@ ShardSyncResponse checkForShardSync(StreamIdentifier streamIdentifier, List<Leas
.computeIfAbsent(streamIdentifier, s -> new HashRangeHoleTracker());
final boolean hasHoleWithHighConfidence = hashRangeHoleTracker
.hasHighConfidenceOfHoleWith(hashRangeHoleOpt.get());
return new ShardSyncResponse(hasHoleWithHighConfidence,
return new ShardSyncResponse(hasHoleWithHighConfidence, true,
"Detected same hole for " + hashRangeHoleTracker.getNumConsecutiveHoles()
+ " times. Shard sync will be initiated when threshold reaches "
+ CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY);
+ leasesRecoveryAuditorInconsistencyConfidenceThreshold);

} else {
// If hole is not present, clear any previous tracking for this stream and return false;
hashRangeHoleTrackerMap.remove(streamIdentifier);
return new ShardSyncResponse(false, "Hash Ranges are complete for " + streamIdentifier);
return new ShardSyncResponse(false, false, "Hash Ranges are complete for " + streamIdentifier);
}
}

Expand All @@ -244,6 +277,7 @@ ShardSyncResponse checkForShardSync(StreamIdentifier streamIdentifier, List<Leas
@VisibleForTesting
static class ShardSyncResponse {
// True when the auditor decided that a shard sync should be initiated for the stream.
private final boolean shouldDoShardSync;
// True when a hole was found in the stream's lease hash ranges; false when no leases exist or the ranges are complete.
private final boolean isHoleDetected;
// Human-readable explanation of the decision; the periodic shard syncer includes it in its log output.
private final String reasonForDecision;
}

Expand Down Expand Up @@ -365,7 +399,7 @@ private static class HashRangeHole {
private final HashKeyRangeForLease hashRangeAtEndOfPossibleHole;
}

private static class HashRangeHoleTracker {
private class HashRangeHoleTracker {
private HashRangeHole hashRangeHole;
@Getter
private Integer numConsecutiveHoles;
Expand All @@ -377,7 +411,7 @@ public boolean hasHighConfidenceOfHoleWith(@NonNull HashRangeHole hashRangeHole)
this.hashRangeHole = hashRangeHole;
this.numConsecutiveHoles = 1;
}
return numConsecutiveHoles >= CONSECUTIVE_HOLES_FOR_TRIGGERING_RECOVERY;
return numConsecutiveHoles >= leasesRecoveryAuditorInconsistencyConfidenceThreshold;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public class Scheduler implements Runnable {
private static final long MIN_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 1 * 1000L;
private static final long MAX_WAIT_TIME_FOR_LEASE_TABLE_CHECK_MILLIS = 30 * 1000L;
private static final long NEW_STREAM_CHECK_INTERVAL_MILLIS = 1 * 60 * 1000L;
private static final boolean SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS = false;
private static final String MULTI_STREAM_TRACKER = "MultiStreamTracker";
private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count";
private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count";
Expand Down Expand Up @@ -290,7 +291,9 @@ protected Scheduler(@NonNull final CheckpointConfig checkpointConfig,
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(
leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap,
shardSyncTaskManagerProvider, isMultiStreamMode);
shardSyncTaskManagerProvider, isMultiStreamMode, metricsFactory,
leaseManagementConfig.leasesRecoveryAuditorExecutionFrequencyMillis(),
leaseManagementConfig.leasesRecoveryAuditorInconsistencyConfidenceThreshold());
this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
.createLeaseCleanupManager(metricsFactory);
}
Expand Down Expand Up @@ -489,10 +492,9 @@ Set<StreamIdentifier> checkAndSyncStreamShardsAndLeases()
}
};

if (formerStreamsLeasesDeletionStrategy.shouldCleanupLeasesForDeletedStreams()) {
if (SHOULD_DO_LEASE_SYNC_FOR_OLD_STREAMS) {
// We do lease sync for old streams, before leaving to the deletion strategy to delete leases for
// strategy detected leases. Also, for deleted streams we expect the shard sync to remove the
// leases.
// strategy detected leases.
Iterator<StreamIdentifier> currentSetOfStreamsIter = currentStreamConfigMap.keySet().iterator();
while (currentSetOfStreamsIter.hasNext()) {
StreamIdentifier streamIdentifier = currentSetOfStreamsIter.next();
Expand Down Expand Up @@ -531,13 +533,13 @@ Set<StreamIdentifier> checkAndSyncStreamShardsAndLeases()
currentStreamConfigMap.keySet().stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier));

} else if (formerStreamsLeasesDeletionStrategy.leaseDeletionType() == StreamsLeasesDeletionType.PROVIDED_STREAMS_DEFERRED_DELETION) {
Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiers()).ifPresent(
Optional.ofNullable(formerStreamsLeasesDeletionStrategy.streamIdentifiersForLeaseCleanup()).ifPresent(
streamIdentifiers -> streamIdentifiers.stream().forEach(streamIdentifier -> enqueueStreamLeaseDeletionOperation.accept(streamIdentifier)));
}

// Now let's scan the streamIdentifiers eligible for deferred deletion and delete them.
// Now let's scan the streamIdentifiersForLeaseCleanup eligible for deferred deletion and delete them.
// StreamIdentifiers are eligible for deletion only when the deferment period has elapsed and
// the streamIdentifiers are not present in the latest snapshot.
// the streamIdentifiersForLeaseCleanup are not present in the latest snapshot.
final Map<Boolean, Set<StreamIdentifier>> staleStreamIdDeletionDecisionMap = staleStreamDeletionMap.keySet().stream().collect(Collectors
.partitioningBy(streamIdentifier -> newStreamConfigMap.containsKey(streamIdentifier), Collectors.toSet()));
final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public class LeaseManagementConfig {
public static final long DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(1).toMillis();
public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis();
public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis();
// Default value for leasesRecoveryAuditorExecutionFrequencyMillis: 2 minutes, expressed in milliseconds.
public static final long DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS = 2 * 60 * 1000L;
// Default value for leasesRecoveryAuditorInconsistencyConfidenceThreshold: the number of consecutive
// audits that must report the same hole before a shard sync is triggered.
public static final int DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY = 3;


public static final LeaseCleanupConfig DEFAULT_LEASE_CLEANUP_CONFIG = LeaseCleanupConfig.builder()
.leaseCleanupIntervalMillis(DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS)
Expand Down Expand Up @@ -195,6 +198,20 @@ public class LeaseManagementConfig {

private BillingMode billingMode = BillingMode.PROVISIONED;

/**
* Frequency (in millis) at which the auditor job scans the lease table for partial leases.
* If the auditor detects a hole in the leases for a stream, it triggers a shard sync based on
* {@link #leasesRecoveryAuditorInconsistencyConfidenceThreshold}.
*/
private long leasesRecoveryAuditorExecutionFrequencyMillis = DEFAULT_PERIODIC_SHARD_SYNC_INTERVAL_MILLIS;

/**
* Confidence threshold for the periodic auditor job to determine whether the leases for a stream in the lease
* table are inconsistent. If the auditor finds the same set of inconsistencies for a stream this many times
* in a row, it triggers a shard sync.
*/
private int leasesRecoveryAuditorInconsistencyConfidenceThreshold = DEFAULT_CONSECUTIVE_HOLES_FOR_TRIGGERING_LEASE_RECOVERY;

/**
* The initial position for getting records from Kinesis streams.
*
Expand Down
Loading

0 comments on commit 562face

Please sign in to comment.