Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support blobdb #1456

Merged
merged 3 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,39 @@ max-bytes-for-level-multiplier : 10

# rate limiter bandwidth, default 200MB
#rate-limiter-bandwidth : 209715200

# rocksdb blob configure
# wiki: https://github.com/facebook/rocksdb/wiki/BlobDB
# enable rocksdb blob, default no
# enable-blob-files : yes

# values at or above this threshold will be written to blob files during flush or compaction.
# Supported Units [K|M|G], default unit is in [bytes].
# min-blob-size : 4K

# the size limit for blob files
# Supported Units [K|M|G], default unit is in [bytes].
# blob-file-size : 256M

# the compression type to use for blob files. All blobs in the same file are compressed using the same algorithm.
# Supported types: [snappy, zlib, lz4, zstd]. If you do not want to compress blob files, set this value to none.
# To link against a different compression algorithm library, compile Pika from source and link it statically yourself.
# blob-compression-type : lz4
wanghenshui marked this conversation as resolved.
Show resolved Hide resolved

# set this to open to make BlobDB actively relocate valid blobs from the oldest blob files as they are encountered during compaction.
# enable-blob-garbage-collection : no

# the cutoff that the GC logic uses to determine which blob files should be considered "old".
# This parameter can be tuned to adjust the trade-off between write amplification and space amplification.
# blob-garbage-collection-age-cutoff : 0.25

# if the ratio of garbage in the oldest blob files exceeds this threshold,
# targeted compactions are scheduled in order to force garbage collecting the blob files in question
# blob-garbage-collection-force-threshold : 1.0

# the Cache object to use for blobs, default not open
# blob-cache : 0

# blob-num-shard-bits default -1, the number of bits from cache keys to be used as the shard id.
# The cache will be sharded into 2^blob-num-shard-bits shards.
# blob-num-shard-bits : -1
21 changes: 21 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,16 @@ class PikaConf : public pstd::BaseConf {
return rate_limiter_bandwidth_;
}

  // Rocksdb BlobDB options. These mirror the "rocksdb blob configure" section
  // of pika.conf and are parsed in PikaConf::Load().
  bool enable_blob_files() { return enable_blob_files_; }
  int64_t min_blob_size() { return min_blob_size_; }
  int64_t blob_file_size() { return blob_file_size_; }
  std::string blob_compression_type() { return blob_compression_type_; }
  bool enable_blob_garbage_collection() { return enable_blob_garbage_collection_; }
  double blob_garbage_collection_age_cutoff() { return blob_garbage_collection_age_cutoff_; }
  double blob_garbage_collection_force_threshold() { return blob_garbage_collection_force_threshold_; }
  int64_t blob_cache() { return blob_cache_; }  // blob cache capacity in bytes; <= 0 leaves the blob cache disabled
  int64_t blob_num_shard_bits() { return blob_num_shard_bits_; }

// Immutable config items, we don't use lock.
bool daemonize() { return daemonize_; }
std::string pidfile() { return pidfile_; }
Expand Down Expand Up @@ -540,6 +550,17 @@ class PikaConf : public pstd::BaseConf {
int target_file_size_base_ = 0;
int binlog_file_size_ = 0;

  // rocksdb blob (BlobDB) settings; defaults mirror the commented examples in conf/pika.conf.
  bool enable_blob_files_ = false;
  int64_t min_blob_size_ = 4096;                // 4K
  int64_t blob_file_size_ = 256 * 1024 * 1024;  // 256M
  std::string blob_compression_type_ = "none";
  bool enable_blob_garbage_collection_ = false;
  double blob_garbage_collection_age_cutoff_ = 0.25;
  double blob_garbage_collection_force_threshold_ = 1.0;
  int64_t blob_cache_ = 0;  // cache size in bytes; 0 keeps the blob cache disabled
  int64_t blob_num_shard_bits_ = 0;  // NOTE(review): pika.conf documents the default as -1 — confirm which is intended

PikaMeta* local_meta_ = nullptr;

pthread_rwlock_t rwlock_;
Expand Down
23 changes: 23 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,29 @@ int PikaConf::Load() {
max_conn_rbuf_size_.store(PIKA_MAX_CONN_RBUF);
}

// rocksdb blob configure
GetConfBool("enable-blob-files", &enable_blob_files_);
GetConfInt64("min-blob-size", &min_blob_size_);
if (min_blob_size_ <= 0) {
min_blob_size_ = 4096;
}
GetConfInt64Human("blob-file-size", &blob_file_size_);
if (blob_file_size_ <= 0) {
blob_file_size_ = 256 * 1024 * 1024;
}
GetConfStr("blob-compression-type", &blob_compression_type_);
GetConfBool("enable-blob-garbage-collection", &enable_blob_garbage_collection_);
GetConfDouble("blob-garbage-collection-age-cutoff", &blob_garbage_collection_age_cutoff_);
if (blob_garbage_collection_age_cutoff_ <= 0) {
blob_garbage_collection_age_cutoff_ = 0.25;
}
GetConfDouble("blob-garbage-collection-force-threshold", &blob_garbage_collection_force_threshold_);
if (blob_garbage_collection_force_threshold_ <= 0) {
blob_garbage_collection_force_threshold_ = 1.0;
}
GetConfInt64("blob-cache", &block_cache_);
GetConfInt64("blob-num-shard-bits", &blob_num_shard_bits_);

return ret;
}

Expand Down
27 changes: 23 additions & 4 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -962,13 +962,15 @@ void PikaServer::TryDBSync(const std::string& ip, int port, const std::string& t
int32_t top) {
std::shared_ptr<Partition> partition = GetTablePartitionById(table_name, partition_id);
if (!partition) {
LOG(WARNING) << "can not find Partition whose id is " << partition_id << " in table " << table_name << ", TryDBSync Failed";
LOG(WARNING) << "can not find Partition whose id is " << partition_id << " in table " << table_name
<< ", TryDBSync Failed";
return;
}
std::shared_ptr<SyncMasterPartition> sync_partition =
g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(table_name, partition_id));
if (!sync_partition) {
LOG(WARNING) << "can not find Partition whose id is " << partition_id << " in table " << table_name << ", TryDBSync Failed";
LOG(WARNING) << "can not find Partition whose id is " << partition_id << " in table " << table_name
<< ", TryDBSync Failed";
return;
}
BgSaveInfo bgsave_info = partition->bgsave_info();
Expand All @@ -985,7 +987,8 @@ void PikaServer::TryDBSync(const std::string& ip, int port, const std::string& t
void PikaServer::DbSyncSendFile(const std::string& ip, int port, const std::string& table_name, uint32_t partition_id) {
std::shared_ptr<Partition> partition = GetTablePartitionById(table_name, partition_id);
if (!partition) {
LOG(WARNING) << "can not find Partition whose id is " << partition_id << " in table " << table_name << ", DbSync send file Failed";
LOG(WARNING) << "can not find Partition whose id is " << partition_id << " in table " << table_name
<< ", DbSync send file Failed";
return;
}

Expand Down Expand Up @@ -1520,7 +1523,7 @@ void PikaServer::InitStorageOptions() {
storage_options_.table_options.no_block_cache = true;
} else if (storage_options_.share_block_cache) {
storage_options_.table_options.block_cache =
rocksdb::NewLRUCache(storage_options_.block_cache_size, g_pika_conf->num_shard_bits());
rocksdb::NewLRUCache(storage_options_.block_cache_size, static_cast<int>(g_pika_conf->num_shard_bits()));
}

storage_options_.options.rate_limiter =
Expand All @@ -1529,6 +1532,22 @@ void PikaServer::InitStorageOptions() {
// For Storage small compaction
storage_options_.statistics_max_size = g_pika_conf->max_cache_statistic_keys();
storage_options_.small_compaction_threshold = g_pika_conf->small_compaction_threshold();

// rocksdb blob
if (g_pika_conf->enable_blob_files()) {
wanghenshui marked this conversation as resolved.
Show resolved Hide resolved
storage_options_.options.enable_blob_files = g_pika_conf->enable_blob_files();
storage_options_.options.min_blob_size = g_pika_conf->min_blob_size();
storage_options_.options.blob_file_size = g_pika_conf->blob_file_size();
storage_options_.options.blob_compression_type = PikaConf::GetCompression(g_pika_conf->blob_compression_type());
storage_options_.options.enable_blob_garbage_collection = g_pika_conf->enable_blob_garbage_collection();
storage_options_.options.blob_garbage_collection_age_cutoff = g_pika_conf->blob_garbage_collection_age_cutoff();
storage_options_.options.blob_garbage_collection_force_threshold =
g_pika_conf->blob_garbage_collection_force_threshold();
if (g_pika_conf->block_cache() > 0) { // blob cache less than 0,not open cache
storage_options_.options.blob_cache =
rocksdb::NewLRUCache(g_pika_conf->block_cache(), static_cast<int>(g_pika_conf->blob_num_shard_bits()));
}
}
}

storage::Status PikaServer::RewriteStorageOptions(const storage::OptionType& option_type,
Expand Down
2 changes: 2 additions & 0 deletions src/pstd/include/base_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,15 @@ class BaseConf {
  bool GetConfStr(const std::string& name, std::string* value) const;
  bool GetConfBool(const std::string& name, bool* value) const;
  bool GetConfStrVec(const std::string& name, std::vector<std::string>* value) const;
  // Parse the named item's value as a double; returns false when the key is absent.
  bool GetConfDouble(const std::string& name, double* value) const;

  bool SetConfInt(const std::string& name, const int value);
  bool SetConfInt64(const std::string& name, const int64_t value);

  bool SetConfStr(const std::string& name, const std::string& value);
  bool SetConfBool(const std::string& name, const bool value);
  bool SetConfStrVec(const std::string& name, const std::vector<std::string>& value);
  // Overwrite the named item's value with its textual form; returns false when the key is absent.
  bool SetConfDouble(const std::string& name, const double value);

bool CheckConfExist(const std::string& name) const;

Expand Down
26 changes: 26 additions & 0 deletions src/pstd/src/base_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,19 @@ bool BaseConf::GetConfBool(const std::string& name, bool* value) const {
return false;
}

// Look up `name` among the parsed (non-comment) config items and parse its
// value as a double into *value. Returns true only when the key exists AND
// its value parses; previously an unparseable value silently yielded 0.0,
// now *value is left untouched so callers keep their defaults.
bool BaseConf::GetConfDouble(const std::string& name, double* value) const {
  for (auto& item : rep_->item) {
    if (item.type == Rep::kComment) {
      continue;
    }
    if (name == item.name) {
      char* end = nullptr;
      const double parsed = std::strtod(item.value.c_str(), &end);
      if (end == item.value.c_str()) {
        // No characters consumed: not a number.
        return false;
      }
      *value = parsed;
      return true;
    }
  }
  return false;
}

bool BaseConf::SetConfInt(const std::string& name, const int value) {
for (size_t i = 0; i < rep_->item.size(); i++) {
if (rep_->item[i].type == Rep::kComment) {
Expand Down Expand Up @@ -275,6 +288,19 @@ bool BaseConf::SetConfStrVec(const std::string& name, const std::vector<std::str
return SetConfStr(name, value_str);
}

// Overwrite the value of the first (non-comment) config item named `name`
// with the textual form of `value`. Returns false when no such key exists.
bool BaseConf::SetConfDouble(const std::string& name, const double value) {
  for (auto& item : rep_->item) {
    if (item.type == Rep::kComment || item.name != name) {
      continue;
    }
    item.value = std::to_string(value);
    return true;
  }
  return false;
}

bool BaseConf::CheckConfExist(const std::string& name) const {
for (size_t i = 0; i < rep_->item.size(); i++) {
if (rep_->item[i].type == Rep::kComment) {
Expand Down