Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read blob from blob cache in GetBlob() if it exists there #10178

Closed
wants to merge 15 commits into from
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ set(SOURCES
db/blob/blob_log_format.cc
db/blob/blob_log_sequential_reader.cc
db/blob/blob_log_writer.cc
db/blob/blob_source.cc
db/blob/prefetch_buffer_collection.cc
db/builder.cc
db/c.cc
Expand Down Expand Up @@ -1212,6 +1213,7 @@ if(WITH_TESTS)
db/blob/blob_file_garbage_test.cc
db/blob/blob_file_reader_test.cc
db/blob/blob_garbage_meter_test.cc
db/blob/blob_source_test.cc
db/blob/db_blob_basic_test.cc
db/blob/db_blob_compaction_test.cc
db/blob/db_blob_corruption_test.cc
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -1861,6 +1861,9 @@ blob_file_garbage_test: $(OBJ_DIR)/db/blob/blob_file_garbage_test.o $(TEST_LIBRA
blob_file_reader_test: $(OBJ_DIR)/db/blob/blob_file_reader_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

blob_source_test: $(OBJ_DIR)/db/blob/blob_source_test.o $(TEST_LIBRARY) $(LIBRARY)
gangliao marked this conversation as resolved.
Show resolved Hide resolved
$(AM_LINK)

blob_garbage_meter_test: $(OBJ_DIR)/db/blob/blob_garbage_meter_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

Expand Down
8 changes: 8 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/blob/blob_log_format.cc",
"db/blob/blob_log_sequential_reader.cc",
"db/blob/blob_log_writer.cc",
"db/blob/blob_source.cc",
"db/blob/prefetch_buffer_collection.cc",
"db/builder.cc",
"db/c.cc",
Expand Down Expand Up @@ -358,6 +359,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"db/blob/blob_log_format.cc",
"db/blob/blob_log_sequential_reader.cc",
"db/blob/blob_log_writer.cc",
"db/blob/blob_source.cc",
"db/blob/prefetch_buffer_collection.cc",
"db/builder.cc",
"db/c.cc",
Expand Down Expand Up @@ -4798,6 +4800,12 @@ cpp_unittest_wrapper(name="blob_garbage_meter_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="blob_source_test",
srcs=["db/blob/blob_source_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="block_based_table_reader_test",
srcs=["table/block_based/block_based_table_reader_test.cc"],
deps=[":rocksdb_test_lib"],
Expand Down
188 changes: 188 additions & 0 deletions db/blob/blob_source.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/blob/blob_source.h"

#include <cassert>
#include <string>

#include "db/blob/blob_file_reader.h"
#include "file/file_prefetch_buffer.h"
#include "options/cf_options.h"

namespace ROCKSDB_NAMESPACE {

// Constructs a BlobSource on top of the given BlobFileCache and, when
// ImmutableOptions::blob_cache is set, a cache for blob values.
// db_id / db_session_id are used to derive stable cache keys for blobs.
// NOTE: takes exclusive ownership of blob_file_cache (it is stored in a
// std::unique_ptr member).
BlobSource::BlobSource(const ImmutableOptions* immutable_options,
                       const std::string& db_id,
                       const std::string& db_session_id,
                       BlobFileCache* blob_file_cache)
    : db_id_(db_id),
      db_session_id_(db_session_id),
      statistics_(immutable_options->statistics.get()),
      blob_file_cache_(blob_file_cache),
      blob_cache_(immutable_options->blob_cache) {}

// Defaulted out-of-line so that the unique_ptr<BlobFileCache> member can be
// destroyed here, where BlobFileCache is a complete type.
BlobSource::~BlobSource() = default;

// Looks up cache_key in blob_cache. On a hit, transfers the cache handle to
// *blob (which pins the entry until *blob is released). Returns NotFound on a
// miss or when no blob cache is configured.
Status BlobSource::GetBlobFromCache(const Slice& cache_key, Cache* blob_cache,
                                    CachableEntry<std::string>* blob) const {
  assert(blob);
  assert(blob->IsEmpty());

  if (blob_cache == nullptr) {
    return Status::NotFound("Blob record not found in cache");
  }

  assert(!cache_key.empty());

  Cache::Handle* const handle = GetEntryFromCache(blob_cache, cache_key);
  if (handle == nullptr) {
    assert(blob->IsEmpty());
    return Status::NotFound("Blob record not found in cache");
  }

  blob->SetCachedValue(static_cast<std::string*>(blob_cache->Value(handle)),
                       blob_cache, handle);
  return Status::OK();
}

// Copies *blob into a heap-allocated string and inserts it into blob_cache
// under cache_key. On success, *cached_blob takes over the resulting cache
// handle. A null blob_cache is a no-op that returns OK.
Status BlobSource::PutBlobIntoCache(const Slice& cache_key, Cache* blob_cache,
                                    CachableEntry<std::string>* cached_blob,
                                    PinnableSlice* blob) const {
  assert(blob);
  assert(!cache_key.empty());

  if (blob_cache == nullptr) {
    return Status::OK();
  }

  // Objects to be put into the cache have to be heap-allocated and
  // self-contained, i.e. own their contents. The Cache has to be able to take
  // unique ownership of them. Therefore, we copy the blob into a string
  // directly, and insert that into the cache.
  std::string* const owned_copy = new std::string(blob->data(), blob->size());

  Cache::Handle* handle = nullptr;
  const Status s =
      InsertEntryIntoCache(blob_cache, cache_key, owned_copy,
                           owned_copy->size(), &handle, Cache::Priority::LOW);
  if (s.ok()) {
    assert(handle != nullptr);
    cached_blob->SetCachedValue(owned_copy, blob_cache, handle);
  }

  return s;
}

// Reads the blob for user_key stored at the given offset of blob file
// file_number. If a blob cache is configured, it is consulted first; on a
// miss (and if disk I/O is allowed), the blob is read from the file through
// the cached BlobFileReader and, when read_options.fill_cache is set,
// inserted into the blob cache for subsequent reads.
//
// On success, *value holds the blob; on the cache-hit path *bytes_read (if
// non-null) is set to value_size, otherwise it is filled in by
// BlobFileReader::GetBlob.
Status BlobSource::GetBlob(const ReadOptions& read_options,
                           const Slice& user_key, uint64_t file_number,
                           uint64_t offset, uint64_t file_size,
                           uint64_t value_size,
                           CompressionType compression_type,
                           FilePrefetchBuffer* prefetch_buffer,
                           PinnableSlice* value, uint64_t* bytes_read) {
  assert(value);

  const uint64_t key_size = user_key.size();

  if (!IsValidBlobOffset(offset, key_size, value_size, file_size)) {
    return Status::Corruption("Invalid blob offset");
  }

  const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);

  CachableEntry<std::string> blob_entry;

  // First, try to get the blob from the cache
  //
  // If blob cache is enabled, we'll try to read from it.
  Status s;
  if (blob_cache_) {
    Slice key = cache_key.AsSlice();
    s = GetBlobFromCache(key, blob_cache_.get(), &blob_entry);
    if (blob_entry.GetValue()) {
      if (prefetch_buffer) {
        // Update the blob details so that PrefetchBuffer can use the read
        // pattern to determine if reads are sequential or not for prefetching.
        // It should also take in account blob read from cache.
        prefetch_buffer->UpdateReadPattern(offset,
                                           blob_entry.GetValue()->size(),
                                           false /* decrease_readahead_size */);
      }
    }

    if (s.ok() && blob_entry.GetValue() != nullptr) {
      assert(blob_entry.GetValue()->size() == value_size);
      if (bytes_read) {
        *bytes_read = value_size;
      }
      // Copy the cached blob into the caller's buffer. The cached string must
      // NOT be modified (the previous implementation swapped it with the
      // caller's empty string, which left an empty payload in the shared
      // cache entry and broke all subsequent cache hits for this blob).
      value->GetSelf()->assign(*blob_entry.GetValue());
      value->PinSelf();
      return s;
    }
  }

  assert(blob_entry.IsEmpty());

  const bool no_io = read_options.read_tier == kBlockCacheTier;
  if (no_io) {
    return Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
  }

  // Can't find the blob from the cache. Since I/O is allowed, read from the
  // file.
  {
    CacheHandleGuard<BlobFileReader> blob_file_reader;
    s = blob_file_cache_->GetBlobFileReader(file_number, &blob_file_reader);
    if (!s.ok()) {
      return s;
    }

    assert(blob_file_reader.GetValue());

    if (compression_type != blob_file_reader.GetValue()->GetCompressionType()) {
      return Status::Corruption("Compression type mismatch when reading blob");
    }

    s = blob_file_reader.GetValue()->GetBlob(
        read_options, user_key, offset, value_size, compression_type,
        prefetch_buffer, value, bytes_read);
    if (!s.ok()) {
      return s;
    }
  }

  if (blob_cache_ && read_options.fill_cache) {
    // If filling cache is allowed and a cache is configured, try to put the
    // blob to the cache.
    Slice key = cache_key.AsSlice();
    s = PutBlobIntoCache(key, blob_cache_.get(), &blob_entry, value);
    if (!s.ok()) {
      return s;
    }
  }

  assert(s.ok());
  return s;
}

// Test-only helper: returns true iff the blob identified by
// (file_number, file_size, offset) is currently present in the blob cache.
bool BlobSource::TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
                                  uint64_t offset) const {
  const CacheKey cache_key = GetCacheKey(file_number, file_size, offset);

  CachableEntry<std::string> blob_entry;
  const Status s =
      GetBlobFromCache(cache_key.AsSlice(), blob_cache_.get(), &blob_entry);

  return s.ok() && blob_entry.GetValue() != nullptr;
}

} // namespace ROCKSDB_NAMESPACE
99 changes: 99 additions & 0 deletions db/blob/blob_source.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <cinttypes>

#include "cache/cache_helpers.h"
#include "cache/cache_key.h"
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_log_format.h"
#include "rocksdb/rocksdb_namespace.h"
#include "table/block_based/cachable_entry.h"

namespace ROCKSDB_NAMESPACE {

class Cache;
struct ImmutableOptions;
struct FileOptions;
class HistogramImpl;
class Status;
class BlobFileReader;
class FilePrefetchBuffer;
class Slice;
class IOTracer;

// BlobSource is a class that provides universal access to blobs, regardless of
// whether they are in the blob cache, secondary cache, or (remote) storage.
// Depending on user settings, it always fetches blobs from the multi-tier
// cache and storage with minimal cost.
class BlobSource {
 public:
  // The BlobFileCache* parameter is used to store blob file readers. BlobSource
  // takes exclusive ownership of it (it is held in a std::unique_ptr). If the
  // pointer is also managed by another shared/unique pointer, the caller must
  // release that ownership before passing it in here.
  BlobSource(const ImmutableOptions* immutable_options,
             const std::string& db_id, const std::string& db_session_id,
             BlobFileCache* blob_file_cache);

  BlobSource(const BlobSource&) = delete;
  BlobSource& operator=(const BlobSource&) = delete;

  ~BlobSource();

  // Reads the blob for user_key stored at the given offset of blob file
  // file_number, consulting the blob cache first when one is configured. On
  // success, *value holds the blob and *bytes_read (if non-null) the number
  // of bytes read.
  Status GetBlob(const ReadOptions& read_options, const Slice& user_key,
                 uint64_t file_number, uint64_t offset, uint64_t file_size,
                 uint64_t value_size, CompressionType compression_type,
                 FilePrefetchBuffer* prefetch_buffer, PinnableSlice* value,
                 uint64_t* bytes_read);

  // Test-only: returns true iff the given blob is present in the blob cache.
  bool TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
                        uint64_t offset) const;

 private:
  // Looks up cache_key in blob_cache; on a hit, transfers the cache handle to
  // *blob. Returns NotFound on a miss or when blob_cache is null.
  Status GetBlobFromCache(const Slice& cache_key, Cache* blob_cache,
                          CachableEntry<std::string>* blob) const;

  // Copies *blob into a heap-allocated string and inserts it into blob_cache
  // under cache_key; on success, *cached_blob holds the new cache handle.
  Status PutBlobIntoCache(const Slice& cache_key, Cache* blob_cache,
                          CachableEntry<std::string>* cached_blob,
                          PinnableSlice* blob) const;

  // Builds the cache key for a blob: a per-file base key (derived from the
  // DB id / session id and the file number / size) combined with the blob's
  // offset within the file.
  inline CacheKey GetCacheKey(uint64_t file_number, uint64_t file_size,
                              uint64_t offset) const {
    OffsetableCacheKey base_cache_key =
        OffsetableCacheKey(db_id_, db_session_id_, file_number, file_size);
    return base_cache_key.WithOffset(offset);
  }

  // Thin wrapper around Cache::Lookup that also records cache statistics.
  inline Cache::Handle* GetEntryFromCache(Cache* blob_cache,
                                          const Slice& key) const {
    assert(blob_cache);
    return blob_cache->Lookup(key, statistics_);
  }

  // Inserts the heap-allocated string into the cache. The const_cast is
  // needed because Cache::Insert takes a void*; the cache takes ownership and
  // frees the string via the registered DeleteCacheEntry<std::string> deleter.
  inline Status InsertEntryIntoCache(Cache* blob_cache, const Slice& key,
                                     const std::string* value, size_t charge,
                                     Cache::Handle** cache_handle,
                                     Cache::Priority priority) const {
    return blob_cache->Insert(
        key, const_cast<void*>(static_cast<const void*>(value)), charge,
        &DeleteCacheEntry<std::string>, cache_handle, priority);
  }

  // Used to construct stable, per-file cache key prefixes.
  const std::string db_id_;
  const std::string db_session_id_;

  Statistics* statistics_;

  // A cache to store blob file readers.
  std::unique_ptr<BlobFileCache> blob_file_cache_;

  // A cache to store uncompressed blobs.
  std::shared_ptr<Cache> blob_cache_;
};

} // namespace ROCKSDB_NAMESPACE
Loading