From 63e28d463761be9549465c65e534672d7cb19c92 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Sat, 8 Jun 2024 22:30:17 +0530 Subject: [PATCH 1/2] Create serde utility for Writable classes Signed-off-by: Sooraj Sinha --- .../core/compress/CompressorRegistry.java | 13 +++ .../ChecksumWritableBlobStoreFormat.java | 108 ++++++++++++++++++ .../ChecksumWritableBlobStoreFormatTests.java | 66 +++++++++++ 3 files changed, 187 insertions(+) create mode 100644 server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java create mode 100644 server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java diff --git a/libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java b/libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java index af09a7aebba79..711f56c9f3e3b 100644 --- a/libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java +++ b/libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java @@ -78,6 +78,19 @@ public static Compressor compressor(final BytesReference bytes) { return null; } + /** + * @param bytes The bytes to check the compression for + * @return The detected compressor. If no compressor detected then return NoneCompressor. + */ + public static Compressor compressorForWritable(final BytesReference bytes) { + for (Compressor compressor : registeredCompressors.values()) { + if (compressor.isCompressed(bytes) == true) { + return compressor; + } + } + return CompressorRegistry.none(); + } + /** Decompress the provided {@link BytesReference}. */ public static BytesReference uncompress(BytesReference bytes) throws IOException { Compressor compressor = compressor(bytes); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java new file mode 100644 index 0000000000000..088f2153701b0 --- /dev/null +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.blobstore; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.store.ByteBuffersIndexInput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.OutputStreamIndexOutput; +import org.apache.lucene.util.BytesRef; +import org.opensearch.Version; +import org.opensearch.common.CheckedFunction; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.common.lucene.store.IndexOutputOutputStream; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.InputStreamStreamInput; +import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.CompressorRegistry; +import org.opensearch.gateway.CorruptStateException; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; + +/** + * Checksum File format used to serialize/deserialize {@link Writeable} objects + * + * @opensearch.internal + */ +public class ChecksumWritableBlobStoreFormat { + + public static final int VERSION = 1; + + private static final int BUFFER_SIZE = 4096; + + private final String codec; + private final CheckedFunction reader; + + public ChecksumWritableBlobStoreFormat(String codec, CheckedFunction reader) { + this.codec = codec; + this.reader = reader; + } + + public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException { + try (BytesStreamOutput outputStream = new BytesStreamOutput()) { + try ( + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( + "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")", + blobName, + outputStream, + BUFFER_SIZE + ) + ) { + CodecUtil.writeHeader(indexOutput, codec, VERSION); + + try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) { + @Override + public void close() throws IOException { + // this is important since some of the XContentBuilders write bytes on close. + // in order to write the footer we need to prevent closing the actual index input. + } + }; StreamOutput stream = new OutputStreamStreamOutput(compressor.threadLocalOutputStream(indexOutputOutputStream));) { + // TODO The stream version should be configurable + stream.setVersion(Version.CURRENT); + obj.writeTo(stream); + } + CodecUtil.writeFooter(indexOutput); + } + return outputStream.bytes(); + } + } + + public T deserialize(String blobName, CheckedFunction reader, BytesReference bytes) throws IOException { + final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")"; + try { + final IndexInput indexInput = bytes.length() > 0 + ? new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc) + : new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES); + CodecUtil.checksumEntireFile(indexInput); + CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION); + long filePointer = indexInput.getFilePointer(); + long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer; + BytesReference bytesReference = bytes.slice((int) filePointer, (int) contentSize); + Compressor compressor = CompressorRegistry.compressorForWritable(bytesReference); + try (StreamInput in = new InputStreamStreamInput(compressor.threadLocalInputStream(bytesReference.streamInput()))) { + return reader.apply(in); + } + } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { + // we trick this into a dedicated exception with the original stacktrace + throw new CorruptStateException(ex); + } + } + +} diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java new file mode 100644 index 0000000000000..6d2f274a0ba48 --- /dev/null +++ b/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.blobstore; + +import org.opensearch.Version; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.compress.CompressorRegistry; +import org.opensearch.core.index.Index; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +import static org.hamcrest.Matchers.is; + +/** + * Tests for {@link ChecksumWritableBlobStoreFormat} + */ +public class ChecksumWritableBlobStoreFormatTests extends OpenSearchTestCase { + private static final String TEST_BLOB_FILE_NAME = "test-blob-name"; + private static final long VERSION = 5L; + + private final ChecksumWritableBlobStoreFormat clusterBlocksFormat = new ChecksumWritableBlobStoreFormat<>( + "index-metadata", + IndexMetadata::readFrom + ); + + public void testSerDe() throws IOException { + IndexMetadata indexMetadata = getIndexMetadata(); + BytesReference bytesReference = clusterBlocksFormat.serialize(indexMetadata, TEST_BLOB_FILE_NAME, CompressorRegistry.none()); + IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, IndexMetadata::readFrom, bytesReference); + assertThat(readIndexMetadata, is(indexMetadata)); + } + + public void testSerDeForCompressed() throws IOException { + IndexMetadata indexMetadata = getIndexMetadata(); + BytesReference bytesReference = clusterBlocksFormat.serialize( + indexMetadata, + TEST_BLOB_FILE_NAME, + CompressorRegistry.getCompressor(DeflateCompressor.NAME) + ); + IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, IndexMetadata::readFrom, bytesReference); + assertThat(readIndexMetadata, is(indexMetadata)); + } + + private IndexMetadata getIndexMetadata() { + final Index index = new Index("test-index", "index-uuid"); + final Settings idxSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID()) + .build(); + return new IndexMetadata.Builder(index.getName()).settings(idxSettings) + .version(VERSION) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + } +} From b9b8075a35d31cc174505334c3cf07bf0017c522 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Mon, 10 Jun 2024 17:05:11 +0530 Subject: [PATCH 2/2] Remove reader function from method signature Signed-off-by: Sooraj Sinha --- .../blobstore/ChecksumWritableBlobStoreFormat.java | 2 +- .../blobstore/ChecksumWritableBlobStoreFormatTests.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java index 088f2153701b0..0add86ab88a16 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java @@ -84,7 +84,7 @@ public void close() throws IOException { } } - public T deserialize(String blobName, CheckedFunction reader, BytesReference bytes) throws IOException { + public T deserialize(String blobName, BytesReference bytes) throws IOException { final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")"; try { final IndexInput indexInput = bytes.length() > 0 diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java index 6d2f274a0ba48..536df880b2597 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java @@ -36,7 +36,7 @@ public class ChecksumWritableBlobStoreFormatTests extends OpenSearchTestCase { public void testSerDe() throws IOException { IndexMetadata indexMetadata = getIndexMetadata(); BytesReference bytesReference = clusterBlocksFormat.serialize(indexMetadata, TEST_BLOB_FILE_NAME, CompressorRegistry.none()); - IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, IndexMetadata::readFrom, bytesReference); + IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, bytesReference); assertThat(readIndexMetadata, is(indexMetadata)); } @@ -47,7 +47,7 @@ public void testSerDeForCompressed() throws IOException { TEST_BLOB_FILE_NAME, CompressorRegistry.getCompressor(DeflateCompressor.NAME) ); - IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, IndexMetadata::readFrom, bytesReference); + IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, bytesReference); assertThat(readIndexMetadata, is(indexMetadata)); }