Skip to content

Commit

Permalink
Implement range reads in HDFS repository (#9524) (#9535)
Browse files Browse the repository at this point in the history
Resolves #9513


(cherry picked from commit 762c70b)

Signed-off-by: Andrew Ross <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 3d9378c commit f147372
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow test clusters to run with TLS ([#8900](https:/opensearch-project/OpenSearch/pull/8900))
- Add jdk.incubator.vector module support for JDK 20+ ([#8601](https:/opensearch-project/OpenSearch/pull/8601))
- [Feature] Expose term frequency in Painless script score context ([#9081](https:/opensearch-project/OpenSearch/pull/9081))
- Add support for reading partial files to HDFS repository ([#9513](https:/opensearch-project/OpenSearch/issues/9513))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https:/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package org.opensearch.repositories.hdfs;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
Expand All @@ -46,6 +47,7 @@
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
import org.opensearch.common.blobstore.support.PlainBlobMetadata;
import org.opensearch.common.io.Streams;
import org.opensearch.repositories.hdfs.HdfsBlobStore.Operation;

import java.io.FileNotFoundException;
Expand Down Expand Up @@ -125,8 +127,23 @@ public InputStream readBlob(String blobName) throws IOException {
}

@Override
public InputStream readBlob(String blobName, long position, long length) {
throw new UnsupportedOperationException();
public InputStream readBlob(String blobName, long position, long length) throws IOException {
return store.execute(fileContext -> {
final FSDataInputStream stream;
try {
stream = fileContext.open(new Path(path, blobName), bufferSize);
} catch (FileNotFoundException fnfe) {
throw new NoSuchFileException("[" + blobName + "] blob not found");
}
// Seek to the desired start position, closing the stream if any error occurs
try {
stream.seek(position);
} catch (Exception e) {
stream.close();
throw e;
}
return Streams.limitStream(new HDFSPrivilegedInputSteam(stream, securityContext), length);
});
}

@Override
Expand Down

0 comments on commit f147372

Please sign in to comment.