Skip to content

Commit

Permalink
Merge branch 'OpenAtomFoundation:unstable' into incr
Browse files Browse the repository at this point in the history
  • Loading branch information
chejinge authored and brother-jin committed Aug 6, 2024
2 parents 8618729 + a2acd88 commit f2f3c08
Show file tree
Hide file tree
Showing 17 changed files with 98 additions and 61 deletions.
7 changes: 6 additions & 1 deletion conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ rocksdb-periodic-second : 86400 * 3;
# Master's run-id
# master-run-id :

# The number of threads for running Pika.
# The number of Net-worker threads in Pika.
# It's not recommended to set this value exceeds
# the number of CPU cores on the deployment server.
thread-num : 1

# use Net worker thread to read redis Cache for [Get, HGet] command,
# which can significantly improve QPS and reduce latency when cache hit rate is high
# default value is "yes", set it to "no" if you wanna disable it
rtc-cache-read : yes

# Size of the thread pool, The threads within this pool
# are dedicated to handling user requests.
thread-pool-size : 12
Expand Down
7 changes: 4 additions & 3 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class PikaClientConn : public net::RedisConn {
std::shared_ptr<std::string> resp_ptr;
LogOffset offset;
std::string db_name;
bool cache_miss_in_rtc_;
};

struct TxnStateBitMask {
Expand All @@ -78,7 +79,7 @@ class PikaClientConn : public net::RedisConn {
void ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>& argvs, bool async, std::string* response) override;

bool ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt);
void BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs);
void BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs, bool cache_miss_in_rtc);
int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; }
static void DoBackgroundTask(void* arg);

Expand Down Expand Up @@ -136,12 +137,12 @@ class PikaClientConn : public net::RedisConn {
std::shared_ptr<User> user_;

std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr);
const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);

void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_duration);
void ProcessMonitor(const PikaCmdArgsType& argv);

void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr);
void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc);
void TryWriteResp();
};

Expand Down
4 changes: 4 additions & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,9 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
uint32_t GetCmdId() const { return cmdId_; };
bool CheckArg(uint64_t num) const;

bool IsCacheMissedInRtc() const;
void SetCacheMissedInRtc(bool value);

protected:
// enable copy, used default copy
// Cmd(const Cmd&);
Expand Down Expand Up @@ -603,6 +606,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
uint64_t do_duration_ = 0;
uint32_t cmdId_ = 0;
uint32_t aclCategory_ = 0;
bool cache_missed_in_rtc_{false};

private:
virtual void DoInitial() = 0;
Expand Down
2 changes: 2 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ class PikaConf : public pstd::BaseConf {

// Immutable config items, we don't use lock.
bool daemonize() { return daemonize_; }
bool rtc_cache_read_enabled() { return rtc_cache_read_enabled_; }
std::string pidfile() { return pidfile_; }
int binlog_file_size() { return binlog_file_size_; }
std::vector<rocksdb::CompressionType> compression_per_level();
Expand Down Expand Up @@ -930,6 +931,7 @@ class PikaConf : public pstd::BaseConf {
int level0_file_num_compaction_trigger_ = 4;
int64_t max_client_response_size_ = 0;
bool daemonize_ = false;
bool rtc_cache_read_enabled_ = false;
int timeout_ = 0;
std::string server_id_;
std::string run_id_;
Expand Down
9 changes: 5 additions & 4 deletions include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class IncrCmd : public Cmd {
int64_t new_value_ = 0;
void DoInitial() override;
rocksdb::Status s_;
int64_t ttl_ms_ = 0;
int64_t expired_timestamp_sec_ = 0;
std::string ToRedisProtocol() override;
};

Expand All @@ -140,7 +140,7 @@ class IncrbyCmd : public Cmd {
int64_t by_ = 0, new_value_ = 0;
void DoInitial() override;
rocksdb::Status s_;
int64_t ttl_ms_ = 0;
int64_t expired_timestamp_sec_ = 0;
std::string ToRedisProtocol() override;
};

Expand All @@ -165,7 +165,7 @@ class IncrbyfloatCmd : public Cmd {
double by_ = 0;
void DoInitial() override;
rocksdb::Status s_;
int64_t ttl_ms_ = 0;
int64_t expired_timestamp_sec_ = 0;
std::string ToRedisProtocol() override;
};

Expand Down Expand Up @@ -257,9 +257,10 @@ class AppendCmd : public Cmd {
private:
std::string key_;
std::string value_;
std::string new_value_;
void DoInitial() override;
rocksdb::Status s_;
int64_t ttl_ms_ = 0;
int64_t expired_timestamp_sec_ = 0;
std::string ToRedisProtocol() override;
};

Expand Down
4 changes: 0 additions & 4 deletions src/pika_cache_load_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ bool PikaCacheLoadThread::LoadHash(std::string& key, const std::shared_ptr<DB>&
int32_t len = 0;
db->storage()->HLen(key, &len);
if (0 >= len || CACHE_VALUE_ITEM_MAX_SIZE < len) {
LOG(WARNING) << "can not load key, because item size:" << len
<< " beyond max item size:" << CACHE_VALUE_ITEM_MAX_SIZE;
return false;
}

Expand Down Expand Up @@ -205,8 +203,6 @@ void *PikaCacheLoadThread::ThreadMain() {
for (auto & load_key : load_keys) {
if (LoadKey(std::get<0>(load_key), std::get<1>(load_key), std::get<2>(load_key))) {
++async_load_keys_num_;
} else {
LOG(WARNING) << "PikaCacheLoadThread::ThreadMain LoadKey: " << std::get<1>(load_key) << " failed !!!";
}

std::unique_lock lm(loadkeys_map_mutex_);
Expand Down
24 changes: 13 additions & 11 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ PikaClientConn::PikaClientConn(int fd, const std::string& ip_port, net::Thread*
}

std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr) {
const std::shared_ptr<std::string>& resp_ptr, bool cache_miss_in_rtc) {
// Get command info
std::shared_ptr<Cmd> c_ptr = g_pika_cmd_table_manager->GetCmd(opt);
if (!c_ptr) {
Expand All @@ -47,6 +47,7 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
}
return tmp_ptr;
}
c_ptr->SetCacheMissedInRtc(cache_miss_in_rtc);
c_ptr->SetConn(shared_from_this());
c_ptr->SetResp(resp_ptr);

Expand Down Expand Up @@ -273,6 +274,7 @@ void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>&
time_stat_->Reset();
if (async) {
auto arg = new BgTaskArg();
arg->cache_miss_in_rtc_ = false;
arg->redis_cmds = argvs;
time_stat_->enqueue_ts_ = time_stat_->before_queue_ts_ = pstd::NowMicros();
arg->conn_ptr = std::dynamic_pointer_cast<PikaClientConn>(shared_from_this());
Expand All @@ -288,21 +290,23 @@ void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>&
bool is_admin_cmd = g_pika_conf->is_admin_cmd(opt);

//we don't intercept pipeline batch (argvs.size() > 1)
if (argvs.size() == 1 && IsInterceptedByRTC(opt) &&
if (g_pika_conf->rtc_cache_read_enabled() &&
argvs.size() == 1 && IsInterceptedByRTC(opt) &&
PIKA_CACHE_NONE != g_pika_conf->cache_mode() &&
!IsInTxn()) {
// read in cache
if (ReadCmdInCache(argvs[0], opt)) {
delete arg;
return;
}
arg->cache_miss_in_rtc_ = true;
time_stat_->before_queue_ts_ = pstd::NowMicros();
}

g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd, is_admin_cmd);
return;
}
BatchExecRedisCmd(argvs);
BatchExecRedisCmd(argvs, false);
}

void PikaClientConn::DoBackgroundTask(void* arg) {
Expand All @@ -320,15 +324,15 @@ void PikaClientConn::DoBackgroundTask(void* arg) {
}
}

conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds);
conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds, bg_arg->cache_miss_in_rtc_);
}

void PikaClientConn::BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs) {
void PikaClientConn::BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs, bool cache_miss_in_rtc) {
resp_num.store(static_cast<int32_t>(argvs.size()));
for (const auto& argv : argvs) {
std::shared_ptr<std::string> resp_ptr = std::make_shared<std::string>();
resp_array.push_back(resp_ptr);
ExecRedisCmd(argv, resp_ptr);
ExecRedisCmd(argv, resp_ptr, cache_miss_in_rtc);
}
time_stat_->process_done_ts_ = pstd::NowMicros();
TryWriteResp();
Expand Down Expand Up @@ -363,9 +367,6 @@ bool PikaClientConn::ReadCmdInCache(const net::RedisCmdArgsType& argv, const std
return false;
}
//only read command(Get, HGet) will reach here, no need of record lock
if (c_ptr->db_->cache()->CacheStatus() != PIKA_CACHE_STATUS_OK) {
return false;
}
bool read_status = c_ptr->DoReadCommandInCache();
auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap();
resp_num--;
Expand Down Expand Up @@ -508,7 +509,8 @@ void PikaClientConn::ExitTxn() {
}
}

void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr) {
void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<std::string>& resp_ptr,
bool cache_miss_in_rtc) {
// get opt
std::string opt = argv[0];
pstd::StringToLower(opt);
Expand All @@ -519,7 +521,7 @@ void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr<s
}
}

std::shared_ptr<Cmd> cmd_ptr = DoCmd(argv, opt, resp_ptr);
std::shared_ptr<Cmd> cmd_ptr = DoCmd(argv, opt, resp_ptr, cache_miss_in_rtc);
*resp_ptr = std::move(cmd_ptr->res().message());
resp_num--;
}
Expand Down
10 changes: 7 additions & 3 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ Cmd* GetCmdFromDB(const std::string& opt, const CmdTable& cmd_table) {
bool Cmd::CheckArg(uint64_t num) const { return !((arity_ > 0 && num != arity_) || (arity_ < 0 && num < -arity_)); }

Cmd::Cmd(std::string name, int arity, uint32_t flag, uint32_t aclCategory)
: name_(std::move(name)), arity_(arity), flag_(flag), aclCategory_(aclCategory) {
: name_(std::move(name)), arity_(arity), flag_(flag), aclCategory_(aclCategory), cache_missed_in_rtc_(false) {
}

void Cmd::Initial(const PikaCmdArgsType& argv, const std::string& db_name) {
Expand Down Expand Up @@ -891,10 +891,12 @@ void Cmd::DoCommand(const HintKeys& hint_keys) {
if (IsNeedCacheDo()
&& PIKA_CACHE_NONE != g_pika_conf->cache_mode()
&& db_->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) {
if (IsNeedReadCache()) {
if (!cache_missed_in_rtc_
&& IsNeedReadCache()) {
ReadCache();
}
if (is_read() && res().CacheMiss()) {
if (is_read()
&& (res().CacheMiss() || cache_missed_in_rtc_)) {
pstd::lock::MultiScopeRecordLock record_lock(db_->LockMgr(), current_key());
DoThroughDB();
if (IsNeedUpdateCache()) {
Expand Down Expand Up @@ -1064,3 +1066,5 @@ void Cmd::SetResp(const std::shared_ptr<std::string>& resp) { resp_ = resp; }
std::shared_ptr<std::string> Cmd::GetResp() { return resp_.lock(); }

void Cmd::SetStage(CmdStage stage) { stage_ = stage; }
bool Cmd::IsCacheMissedInRtc() const { return cache_missed_in_rtc_; }
void Cmd::SetCacheMissedInRtc(bool value) { cache_missed_in_rtc_ = value; }
5 changes: 5 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,11 @@ int PikaConf::Load() {
GetConfStr("daemonize", &dmz);
daemonize_ = dmz == "yes";

// read redis cache in Net worker threads
std::string rtc_enabled;
GetConfStr("rtc-cache-read", &rtc_enabled);
rtc_cache_read_enabled_ = rtc_enabled != "no";

// binlog
std::string wb;
GetConfStr("write-binlog", &wb);
Expand Down
20 changes: 10 additions & 10 deletions src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ void IncrCmd::DoInitial() {
}

void IncrCmd::Do() {
s_ = db_->storage()->Incrby(key_, 1, &new_value_, &ttl_ms_);
s_ = db_->storage()->Incrby(key_, 1, &new_value_, &expired_timestamp_sec_);
if (s_.ok()) {
res_.AppendContent(":" + std::to_string(new_value_));
AddSlotKey("k", key_, db_);
Expand Down Expand Up @@ -294,7 +294,7 @@ std::string IncrCmd::ToRedisProtocol() {
RedisAppendContent(content, key_);
// time_stamp
char buf[100];
auto time_stamp = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) + ttl_ms_;
auto time_stamp = expired_timestamp_sec_;
pstd::ll2string(buf, sizeof(buf), time_stamp);
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
Expand All @@ -319,7 +319,7 @@ void IncrbyCmd::DoInitial() {
}

void IncrbyCmd::Do() {
s_ = db_->storage()->Incrby(key_, by_, &new_value_, &ttl_ms_);
s_ = db_->storage()->Incrby(key_, by_, &new_value_, &expired_timestamp_sec_);
if (s_.ok()) {
res_.AppendContent(":" + std::to_string(new_value_));
AddSlotKey("k", key_, db_);
Expand Down Expand Up @@ -358,7 +358,7 @@ std::string IncrbyCmd::ToRedisProtocol() {
RedisAppendContent(content, key_);
// time_stamp
char buf[100];
auto time_stamp = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) + ttl_ms_;
auto time_stamp = expired_timestamp_sec_;
pstd::ll2string(buf, sizeof(buf), time_stamp);
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
Expand All @@ -384,7 +384,7 @@ void IncrbyfloatCmd::DoInitial() {
}

void IncrbyfloatCmd::Do() {
s_ = db_->storage()->Incrbyfloat(key_, value_, &new_value_, &ttl_ms_);
s_ = db_->storage()->Incrbyfloat(key_, value_, &new_value_, &expired_timestamp_sec_);
if (s_.ok()) {
res_.AppendStringLenUint64(new_value_.size());
res_.AppendContent(new_value_);
Expand Down Expand Up @@ -427,7 +427,7 @@ std::string IncrbyfloatCmd::ToRedisProtocol() {
RedisAppendContent(content, key_);
// time_stamp
char buf[100];
auto time_stamp = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) + ttl_ms_;
auto time_stamp = expired_timestamp_sec_;
pstd::ll2string(buf, sizeof(buf), time_stamp);
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
Expand Down Expand Up @@ -558,7 +558,7 @@ void AppendCmd::DoInitial() {

void AppendCmd::Do() {
int32_t new_len = 0;
s_ = db_->storage()->Append(key_, value_, &new_len, &ttl_ms_);
s_ = db_->storage()->Append(key_, value_, &new_len, &expired_timestamp_sec_, new_value_);
if (s_.ok() || s_.IsNotFound()) {
res_.AppendInteger(new_len);
AddSlotKey("k", key_, db_);
Expand Down Expand Up @@ -593,14 +593,14 @@ std::string AppendCmd::ToRedisProtocol() {
RedisAppendContent(content, key_);
// time_stamp
char buf[100];
auto time_stamp = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) + ttl_ms_;
auto time_stamp = expired_timestamp_sec_;
pstd::ll2string(buf, sizeof(buf), time_stamp);
std::string at(buf);
RedisAppendLenUint64(content, at.size(), "$");
RedisAppendContent(content, at);
// value
RedisAppendLenUint64(content, value_.size(), "$");
RedisAppendContent(content, value_);
RedisAppendLenUint64(content, new_value_.size(), "$");
RedisAppendContent(content, new_value_);
return content;
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ class Storage {
// If key already exists and is a string, this command appends the value at
// the end of the string
// return the length of the string after the append operation
Status Append(const Slice& key, const Slice& value, int32_t* ret, int64_t* ttl);
Status Append(const Slice& key, const Slice& value, int32_t* ret, int64_t* ttl, std::string& out_new_value);

// Count the number of set bits (population counting) in a string.
// return the number of bits set to 1
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class Redis {
virtual Status SetsTTL(const Slice& key, int64_t* timestamp, std::string&& prefetch_meta = {});

// Strings Commands
Status Append(const Slice& key, const Slice& value, int32_t* ret, int64_t* ttl);
Status Append(const Slice& key, const Slice& value, int32_t* ret, int64_t* ttl, std::string& out_new_value);
Status BitCount(const Slice& key, int64_t start_offset, int64_t end_offset, int32_t* ret, bool have_range);
Status BitOp(BitOpType op, const std::string& dest_key, const std::vector<std::string>& src_keys, std::string &value_to_dest, int64_t* ret);
Status Decrby(const Slice& key, int64_t value, int64_t* ret);
Expand Down
3 changes: 2 additions & 1 deletion src/storage/src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,8 @@ Status Redis::HIncrby(const Slice& key, const Slice& field, int64_t value, int64
batch.Put(handles_[kMetaCF], base_meta_key.Encode(), meta_value);
HashesDataKey hashes_data_key(key, version, field);
Int64ToStr(value_buf, 32, value);
batch.Put(handles_[kHashesDataCF], hashes_data_key.Encode(), value_buf);
BaseDataValue internal_value(value_buf);
batch.Put(handles_[kHashesDataCF], hashes_data_key.Encode(), internal_value.Encode());
*ret = value;
} else {
version = parsed_hashes_meta_value.Version();
Expand Down
Loading

0 comments on commit f2f3c08

Please sign in to comment.