Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Download functionality of global metadata from remote store #10535

3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement Visitor Design pattern in QueryBuilder to enable the capability to traverse through the complex QueryBuilder tree. ([#10110](https:/opensearch-project/OpenSearch/pull/10110))
- Provide service accounts tokens to extensions ([#9618](https:/opensearch-project/OpenSearch/pull/9618))
- Configurable merge policy for index with an option to choose from LogByteSize and Tiered merge policy ([#9992](https:/opensearch-project/OpenSearch/pull/9992))
- Download functionality of global metadata from remote store ([#10535](https:/opensearch-project/OpenSearch/pull/10535))
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down Expand Up @@ -157,4 +158,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https:/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https:/opensearch-project/OpenSearch/compare/2.11...2.x
[Unreleased 2.x]: https:/opensearch-project/OpenSearch/compare/2.11...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ public void testFullClusterRestoreStaleDelete() throws Exception {

assertEquals(10, repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size());

Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestIndexMetadata(
Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestMetadata(
cluster().getClusterName(),
getClusterState().metadata().clusterUUID()
);
).getIndices();
assertEquals(0, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas());
assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ public List<UploadedIndexMetadata> getIndices() {
return indices;
}

// TODO: Remove this once upload PR is merged.
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
public String getGlobalMetadataFileName() {
return "global-metadata-file";
}

public long getClusterTerm() {
return clusterTerm;
}
Expand Down Expand Up @@ -366,6 +371,11 @@ public Builder clusterUUIDCommitted(boolean clusterUUIDCommitted) {
return this;
}

// TODO: remove once upload PR is merged.
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
public Builder globalMetadataFileName(String globalMetadatFileName) {
return this;
}

public Builder() {
indices = new ArrayList<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
Expand Down Expand Up @@ -516,18 +517,14 @@ private BlobPath getManifestFolderPath(String clusterName, String clusterUUID) {
*
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @param clusterMetadataManifest manifest file of cluster
* @return {@code Map<String, IndexMetadata>} latest IndexUUID to IndexMetadata map
*/
public Map<String, IndexMetadata> getLatestIndexMetadata(String clusterName, String clusterUUID) throws IOException {
start();
Map<String, IndexMetadata> remoteIndexMetadata = new HashMap<>();
Optional<ClusterMetadataManifest> clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
if (!clusterMetadataManifest.isPresent()) {
throw new IllegalStateException("Latest index metadata is not present for the provided clusterUUID");
}
assert Objects.equals(clusterUUID, clusterMetadataManifest.get().getClusterUUID())
private Map<String, IndexMetadata> getIndexMetadataMap(String clusterName, String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) throws IOException {
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
assert Objects.equals(clusterUUID, clusterMetadataManifest.getClusterUUID())
: "Corrupt ClusterMetadataManifest found. Cluster UUID mismatch.";
for (UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.get().getIndices()) {
Map<String, IndexMetadata> remoteIndexMetadata = new HashMap<>();
for (UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.getIndices()) {
IndexMetadata indexMetadata = getIndexMetadata(clusterName, clusterUUID, uploadedIndexMetadata);
remoteIndexMetadata.put(uploadedIndexMetadata.getIndexUUID(), indexMetadata);
}
Expand Down Expand Up @@ -558,6 +555,56 @@ private IndexMetadata getIndexMetadata(String clusterName, String clusterUUID, U
}
}

/**
* Fetch latest metadata from remote cluster state including global metadata and index metadata
*
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @return {@link IndexMetadata}
*/
public Metadata getLatestMetadata(String clusterName, String clusterUUID) throws IOException{
start();
Optional<ClusterMetadataManifest> clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
if (!clusterMetadataManifest.isPresent()) {
throw new IllegalStateException("Latest index metadata is not present for the provided clusterUUID");
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
}
// Fetch Global Metadata
Metadata metadataWithGlobalMetadata = getGlobalMetadata(clusterName, clusterUUID, clusterMetadataManifest.get());
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved

// Fetch Index Metadata
Map<String, IndexMetadata> indices = getIndexMetadataMap(clusterName, clusterUUID, clusterMetadataManifest.get());

return Metadata.builder(metadataWithGlobalMetadata).indices(indices).build();
}

private Metadata getGlobalMetadata(String clusterName, String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) {
String globalMetadataFileName = clusterMetadataManifest.getGlobalMetadataFileName();
try {
// Fetch Global metadata
if(globalMetadataFileName != null) {
String[] splitPath = globalMetadataFileName.split("/");
return BlobStoreRepository.GLOBAL_METADATA_FORMAT.read(
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
globalMetadataContainer(clusterName, clusterUUID),
splitPath[splitPath.length - 1],
blobStoreRepository.getNamedXContentRegistry()
);
} else {
return Metadata.EMPTY_METADATA;
}
} catch (IOException e) {
throw new IllegalStateException(
String.format(Locale.ROOT, "Error while downloading Global Metadata - %s", globalMetadataFileName),
e
);
}
}

// TODO: Remove this once Upload PR is merged
private BlobContainer globalMetadataContainer(String clusterName, String clusterUUID) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/
return blobStoreRepository.blobStore().blobContainer(getCusterMetadataBasePath(clusterName, clusterUUID).add("global-metadata"));
}

/**
* Fetch latest ClusterMetadataManifest from remote state store
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public RemoteRestoreResult restore(
|| restoreClusterUUID.isBlank()) == false;
if (metadataFromRemoteStore) {
try {
remoteClusterStateService.getLatestIndexMetadata(currentState.getClusterName().value(), restoreClusterUUID)
remoteClusterStateService.getLatestMetadata(currentState.getClusterName().value(), restoreClusterUUID).getIndices()
.values()
.forEach(indexMetadata -> {
indexMetadataMap.put(indexMetadata.getIndex().getName(), new Tuple<>(true, indexMetadata));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ public void testReadLatestMetadataManifestSuccessButNoIndexMetadata() throws IOE

remoteClusterStateService.start();
assertEquals(
remoteClusterStateService.getLatestIndexMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID())
remoteClusterStateService.getLatestMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID()).getIndices()
.size(),
0
);
Expand Down Expand Up @@ -433,10 +433,10 @@ public void testReadLatestMetadataManifestSuccessButIndexMetadataFetchIOExceptio
remoteClusterStateService.start();
Exception e = assertThrows(
IllegalStateException.class,
() -> remoteClusterStateService.getLatestIndexMetadata(
() -> remoteClusterStateService.getLatestMetadata(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
)
).getIndices()
);
assertEquals(e.getMessage(), "Error while downloading IndexMetadata - " + uploadedIndexMetadata.getUploadedFilename());
}
Expand Down Expand Up @@ -474,6 +474,83 @@ public void testReadLatestMetadataManifestSuccess() throws IOException {
assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID()));
}

public void testReadGlobalMetadata() throws IOException {
final ClusterState clusterState = generateClusterStateWithGlobalMetadata().nodes(nodesWithLocalNodeClusterManager()).build();
remoteClusterStateService.start();

final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder()
.indices(List.of())
.clusterTerm(1L)
.stateVersion(1L)
.stateUUID("state-uuid")
.clusterUUID("cluster-uuid")
.globalMetadataFileName("global-metadata-file")
.nodeId("nodeA")
.opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
.previousClusterUUID("prev-cluster-uuid")
.build();

Metadata expactedMetadata = Metadata.builder().persistentSettings(Settings.builder().put("readonly", true).build()).build();
mockBlobContainerForGlobalMetadata(mockBlobStoreObjects(), expectedManifest, expactedMetadata);

Metadata metadata = remoteClusterStateService.getLatestMetadata(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
);

assertTrue(Metadata.isGlobalStateEquals(metadata, expactedMetadata));
}

public void testReadGlobalMetadataIOException() throws IOException {
final ClusterState clusterState = generateClusterStateWithGlobalMetadata().nodes(nodesWithLocalNodeClusterManager()).build();
remoteClusterStateService.start();
String globalIndexMetadataName = "global-metadata-file";
final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder()
.indices(List.of())
.clusterTerm(1L)
.stateVersion(1L)
.stateUUID("state-uuid")
.clusterUUID("cluster-uuid")
.globalMetadataFileName(globalIndexMetadataName)
.nodeId("nodeA")
.opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
.previousClusterUUID("prev-cluster-uuid")
.build();

Metadata expactedMetadata = Metadata.builder().persistentSettings(Settings.builder().put("readonly", true).build()).build();

BlobContainer blobContainer = mockBlobStoreObjects();
mockBlobContainerForGlobalMetadata(blobContainer, expectedManifest, expactedMetadata);

when(blobContainer.readBlob(BlobStoreRepository.GLOBAL_METADATA_FORMAT.blobName(globalIndexMetadataName))).thenThrow(
FileNotFoundException.class
);

remoteClusterStateService.start();
Exception e = assertThrows(
IllegalStateException.class,
() -> remoteClusterStateService.getLatestMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID())
);
assertEquals(e.getMessage(), "Error while downloading Global Metadata - " + globalIndexMetadataName);
}

// TODO remove this once Upload PR gets merged.
private static ClusterState.Builder generateClusterStateWithGlobalMetadata() {
final Settings clusterSettings = Settings.builder().put("cluster.blocks.read_only", true).build();
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();

return ClusterState.builder(ClusterName.DEFAULT)
.version(1L)
.stateUUID("state-uuid")
.metadata(
Metadata.builder()
.persistentSettings(clusterSettings)
.clusterUUID("cluster-uuid")
.coordinationMetadata(coordinationMetadata)
.build()
);
}

public void testReadLatestIndexMetadataSuccess() throws IOException {
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
remoteClusterStateService.start();
Expand Down Expand Up @@ -504,10 +581,10 @@ public void testReadLatestIndexMetadataSuccess() throws IOException {

mockBlobContainer(mockBlobStoreObjects(), expectedManifest, Map.of(index.getUUID(), indexMetadata));

Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestIndexMetadata(
Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestMetadata(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
);
).getIndices();

assertEquals(indexMetadataMap.size(), 1);
assertEquals(indexMetadataMap.get(index.getUUID()).getIndex().getName(), index.getName());
Expand Down Expand Up @@ -846,6 +923,38 @@ private void mockBlobContainer(
});
}

private void mockBlobContainerForGlobalMetadata(
BlobContainer blobContainer,
ClusterMetadataManifest clusterMetadataManifest,
Metadata metadata
) throws IOException {
BlobMetadata blobMetadata = new PlainBlobMetadata("manifestFileName", 1);
when(
blobContainer.listBlobsByPrefixInSortedOrder(
"manifest" + RemoteClusterStateService.DELIMITER,
1,
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
)
).thenReturn(Arrays.asList(blobMetadata));

BytesReference bytes = RemoteClusterStateService.CLUSTER_METADATA_MANIFEST_FORMAT.serialize(
clusterMetadataManifest,
"manifestFileName",
blobStoreRepository.getCompressor()
);
when(blobContainer.readBlob("manifestFileName")).thenReturn(new ByteArrayInputStream(bytes.streamInput().readAllBytes()));

BytesReference bytesGlobalMetadata = BlobStoreRepository.GLOBAL_METADATA_FORMAT.serialize(
metadata,
"global-metadata",
blobStoreRepository.getCompressor()
);
String[] splitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/");
when(blobContainer.readBlob(BlobStoreRepository.GLOBAL_METADATA_FORMAT.blobName(splitPath[splitPath.length - 1]))).thenReturn(
new ByteArrayInputStream(bytesGlobalMetadata.streamInput().readAllBytes())
);
}

private static ClusterState.Builder generateClusterStateWithOneIndex() {
final Index index = new Index("test-index", "index-uuid");
final Settings idxSettings = Settings.builder()
Expand Down
Loading