Skip to content

Commit

Permalink
Add support to clone existing pinned timestamp
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Sep 2, 2024
1 parent c6ca9bd commit d767d2b
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.remotestore;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -17,6 +18,7 @@
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Set;
import java.util.concurrent.CountDownLatch;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStorePinnedTimestampsIT extends RemoteStoreBaseIntegTestCase {
Expand Down Expand Up @@ -75,10 +77,25 @@ public void testTimestampPinUnpin() throws Exception {

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));

// This should be a no-op as pinning entity is different
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "no-snapshot", noOpActionListener);
// Unpinning already pinned entity
remoteStorePinnedTimestampService.unpinTimestamp(timestamp2, "ss3", noOpActionListener);

// This should fail as timestamp is not pinned by pinning entity
CountDownLatch latch = new CountDownLatch(1);
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "no-snapshot", new LatchedActionListener<>(new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
// onResponse should not get called.
fail();
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof IllegalArgumentException);
}
}, latch));
latch.await();

// Adding different entity to already pinned timestamp
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss5", noOpActionListener);

Expand All @@ -93,4 +110,74 @@ public void testTimestampPinUnpin() throws Exception {

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
}

public void testPinnedTimestampClone() throws Exception {
prepareCluster(1, 1, INDEX_NAME, 0, 2);
ensureGreen(INDEX_NAME);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(INDEX_NAME)
);

long timestamp1 = System.currentTimeMillis() + 30000L;
long timestamp2 = System.currentTimeMillis() + 60000L;
long timestamp3 = System.currentTimeMillis() + 900000L;
remoteStorePinnedTimestampService.pinTimestamp(timestamp1, "ss2", noOpActionListener);
remoteStorePinnedTimestampService.pinTimestamp(timestamp2, "ss3", noOpActionListener);
remoteStorePinnedTimestampService.pinTimestamp(timestamp3, "ss4", noOpActionListener);

// Clone timestamp1
remoteStorePinnedTimestampService.cloneTimestamp(timestamp1, "ss2", "ss2-2", noOpActionListener);

// With clone, set of pinned timestamp will not change
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
assertBusy(
() -> assertEquals(Set.of(timestamp1, timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2())
);
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));

// Clone timestamp1 but provide invalid existing entity
CountDownLatch latch = new CountDownLatch(1);
remoteStorePinnedTimestampService.cloneTimestamp(
timestamp1,
"ss3",
"ss2-3",
new LatchedActionListener<>(new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
// onResponse should not get called.
fail();
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof IllegalArgumentException);
}
}, latch)
);
latch.await();

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
assertBusy(
() -> assertEquals(Set.of(timestamp1, timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2())
);
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));

// Now we have timestamp1 pinned by 2 entities, unpin 1, this should not change set of pinned timestamps
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "ss2", noOpActionListener);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
assertBusy(
() -> assertEquals(Set.of(timestamp1, timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2())
);
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));

// Now unpin second entity as well, set of pinned timestamp should be reduced by 1
remoteStorePinnedTimestampService.unpinTimestamp(timestamp1, "ss2-2", noOpActionListener);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
assertBusy(() -> assertEquals(Set.of(timestamp2, timestamp3), RemoteStorePinnedTimestampService.getPinnedTimestamps().v2()));
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueMinutes(3));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -542,8 +542,11 @@ public static List<String> filterOutMetadataFilesBasedOnAge(
if (RemoteStoreSettings.isPinnedTimestampsEnabled() == false) {
return new ArrayList<>(metadataFiles);
}
long maximumAllowedTimestamp = lastSuccessfulFetchOfPinnedTimestamps - RemoteStoreSettings.getPinnedTimestampsLookbackInterval()
.getMillis();
// We allow now() - loopback interval to be pinned. Also, the actual pinning can take at most loopback interval
// This means the pinned timestamp can be available for read after at most (2 * loopback interval)
long maximumAllowedTimestamp = lastSuccessfulFetchOfPinnedTimestamps - (2 * RemoteStoreSettings
.getPinnedTimestampsLookbackInterval()
.getMillis());
List<String> metadataFilesWithMinAge = new ArrayList<>();
for (String metadataFileName : metadataFiles) {
long metadataTimestamp = getTimestampFunction.apply(metadataFileName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,61 @@ public void pinTimestamp(long timestamp, String pinningEntity, ActionListener<Vo
"Timestamp to be pinned is less than current timestamp - value of cluster.remote_store.pinned_timestamps.lookback_interval"
);
}
long startTime = System.nanoTime();
try {
logger.debug("Pinning timestamp = {} against entity = {}", timestamp, pinningEntity);
blobContainer.writeBlob(getBlobName(timestamp, pinningEntity), new ByteArrayInputStream(new byte[0]), 0, true);
long elapsedTime = System.nanoTime() - startTime;
if (elapsedTime > RemoteStoreSettings.getPinnedTimestampsLookbackInterval().nanos()) {
String errorMessage = String.format(
"Timestamp pinning took %s nanoseconds which is more than limit of %s nanoseconds, failing the operation",
elapsedTime,
RemoteStoreSettings.getPinnedTimestampsLookbackInterval().nanos()
);
unpinTimestamp(timestamp, pinningEntity, ActionListener.wrap(() -> listener.onFailure(new RuntimeException(errorMessage))));
} else {
listener.onResponse(null);
}
} catch (IOException e) {
listener.onFailure(e);
}
}

/**
* Clones a timestamp by creating a new pinning entity for an existing timestamp.
*
* This method attempts to create a new pinning entity for a given timestamp that is already
* associated with an existing pinning entity. If the timestamp exists for the existing entity,
* a new blob is created for the new pinning entity. If the timestamp doesn't exist for the
* existing entity, the operation fails with an IllegalArgumentException.
*
* @param timestamp The timestamp to be cloned.
* @param existingPinningEntity The name of the existing entity that has pinned the timestamp.
* @param newPinningEntity The name of the new entity to pin the timestamp to.
* @param listener An ActionListener that will be notified of the operation's success or failure.
* On success, onResponse will be called with null. On failure, onFailure will
* be called with the appropriate exception.
*/
public void cloneTimestamp(long timestamp, String existingPinningEntity, String newPinningEntity, ActionListener<Void> listener) {
try {
logger.debug(
"cloning timestamp = {} with existing pinningEntity = {} with new pinningEntity = {}",
timestamp,
existingPinningEntity,
newPinningEntity
);
String blobName = getBlobName(timestamp, existingPinningEntity);
if (blobContainer.blobExists(blobName)) {
logger.debug("Pinning timestamp = {} against entity = {}", timestamp, newPinningEntity);
blobContainer.writeBlob(getBlobName(timestamp, newPinningEntity), new ByteArrayInputStream(new byte[0]), 0, true);
listener.onResponse(null);
} else {
String errorMessage = String.format("Timestamp: %s is not pinned by existing entity: %s", timestamp, existingPinningEntity);
listener.onFailure(new IllegalArgumentException(errorMessage));
}
} catch (IOException e) {
listener.onFailure(e);
}
listener.onResponse(null);
}

private String getBlobName(long timestamp, String pinningEntity) {
Expand Down Expand Up @@ -147,13 +195,14 @@ public void unpinTimestamp(long timestamp, String pinningEntity, ActionListener<
String blobName = getBlobName(timestamp, pinningEntity);
if (blobContainer.blobExists(blobName)) {
blobContainer.deleteBlobsIgnoringIfNotExists(List.of(blobName));
listener.onResponse(null);
} else {
logger.warn("Timestamp: {} is not pinned by entity: {}", timestamp, pinningEntity);
String errorMessage = String.format("Timestamp: %s is not pinned by entity: %s", timestamp, pinningEntity);
listener.onFailure(new IllegalArgumentException(errorMessage));
}
} catch (IOException e) {
listener.onFailure(e);
}
listener.onResponse(null);
}

@Override
Expand Down

0 comments on commit d767d2b

Please sign in to comment.