diff --git a/conf/pika.conf b/conf/pika.conf index 496d974174..60772e4c29 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -18,11 +18,16 @@ rocksdb-periodic-second : 86400 * 3; # Master's run-id # master-run-id : -# The number of threads for running Pika. +# The number of Net-worker threads in Pika. # It's not recommended to set this value exceeds # the number of CPU cores on the deployment server. thread-num : 1 +# use Net worker thread to read redis Cache for [Get, HGet] command, +# which can significantly improve QPS and reduce latency when cache hit rate is high +# default value is "yes", set it to "no" if you wanna disable it +rtc-cache-read : yes + # Size of the thread pool, The threads within this pool # are dedicated to handling user requests. thread-pool-size : 12 diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index 30a371f6cf..5b912592ab 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -59,6 +59,7 @@ class PikaClientConn : public net::RedisConn { std::shared_ptr resp_ptr; LogOffset offset; std::string db_name; + bool cache_miss_in_rtc_; }; struct TxnStateBitMask { @@ -78,7 +79,7 @@ class PikaClientConn : public net::RedisConn { void ProcessRedisCmds(const std::vector& argvs, bool async, std::string* response) override; bool ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt); - void BatchExecRedisCmd(const std::vector& argvs); + void BatchExecRedisCmd(const std::vector& argvs, bool cache_miss_in_rtc); int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; } static void DoBackgroundTask(void* arg); @@ -136,12 +137,12 @@ class PikaClientConn : public net::RedisConn { std::shared_ptr user_; std::shared_ptr DoCmd(const PikaCmdArgsType& argv, const std::string& opt, - const std::shared_ptr& resp_ptr); + const std::shared_ptr& resp_ptr, bool cache_miss_in_rtc); void ProcessSlowlog(const PikaCmdArgsType& argv, uint64_t do_duration); void ProcessMonitor(const PikaCmdArgsType& argv); - void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr); + void ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr, bool cache_miss_in_rtc); void TryWriteResp(); }; diff --git a/include/pika_command.h b/include/pika_command.h index de06c332c8..c9e924eb8c 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -575,6 +575,9 @@ class Cmd : public std::enable_shared_from_this { uint32_t GetCmdId() const { return cmdId_; }; bool CheckArg(uint64_t num) const; + bool IsCacheMissedInRtc() const; + void SetCacheMissedInRtc(bool value); + protected: // enable copy, used default copy // Cmd(const Cmd&); @@ -603,6 +606,7 @@ class Cmd : public std::enable_shared_from_this { uint64_t do_duration_ = 0; uint32_t cmdId_ = 0; uint32_t aclCategory_ = 0; + bool cache_missed_in_rtc_{false}; private: virtual void DoInitial() = 0; diff --git a/include/pika_conf.h b/include/pika_conf.h index d85b7550dd..6638622ed2 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -468,6 +468,7 @@ class PikaConf : public pstd::BaseConf { // Immutable config items, we don't use lock. bool daemonize() { return daemonize_; } + bool rtc_cache_read_enabled() { return rtc_cache_read_enabled_; } std::string pidfile() { return pidfile_; } int binlog_file_size() { return binlog_file_size_; } std::vector compression_per_level(); @@ -930,6 +931,7 @@ class PikaConf : public pstd::BaseConf { int level0_file_num_compaction_trigger_ = 4; int64_t max_client_response_size_ = 0; bool daemonize_ = false; + bool rtc_cache_read_enabled_ = false; int timeout_ = 0; std::string server_id_; std::string run_id_; diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index 7834c057ef..85e740de1a 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -36,7 +36,7 @@ PikaClientConn::PikaClientConn(int fd, const std::string& ip_port, net::Thread* } std::shared_ptr PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const std::string& opt, - const std::shared_ptr& resp_ptr) { + const std::shared_ptr& resp_ptr, bool cache_miss_in_rtc) { // Get command info std::shared_ptr c_ptr = g_pika_cmd_table_manager->GetCmd(opt); if (!c_ptr) { @@ -47,6 +47,7 @@ std::shared_ptr PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st } return tmp_ptr; } + c_ptr->SetCacheMissedInRtc(cache_miss_in_rtc); c_ptr->SetConn(shared_from_this()); c_ptr->SetResp(resp_ptr); @@ -273,6 +274,7 @@ void PikaClientConn::ProcessRedisCmds(const std::vector& time_stat_->Reset(); if (async) { auto arg = new BgTaskArg(); + arg->cache_miss_in_rtc_ = false; arg->redis_cmds = argvs; time_stat_->enqueue_ts_ = time_stat_->before_queue_ts_ = pstd::NowMicros(); arg->conn_ptr = std::dynamic_pointer_cast(shared_from_this()); @@ -288,7 +290,8 @@ void PikaClientConn::ProcessRedisCmds(const std::vector& bool is_admin_cmd = g_pika_conf->is_admin_cmd(opt); //we don't intercept pipeline batch (argvs.size() > 1) - if (argvs.size() == 1 && IsInterceptedByRTC(opt) && + if (g_pika_conf->rtc_cache_read_enabled() && + argvs.size() == 1 && IsInterceptedByRTC(opt) && PIKA_CACHE_NONE != g_pika_conf->cache_mode() && !IsInTxn()) { // read in cache @@ -296,13 +299,14 @@ void PikaClientConn::ProcessRedisCmds(const std::vector& delete arg; return; } + arg->cache_miss_in_rtc_ = true; time_stat_->before_queue_ts_ = pstd::NowMicros(); } g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd, is_admin_cmd); return; } - BatchExecRedisCmd(argvs); + BatchExecRedisCmd(argvs, false); } void PikaClientConn::DoBackgroundTask(void* arg) { @@ -320,15 +324,15 @@ void PikaClientConn::DoBackgroundTask(void* arg) { } } - conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds); + conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds, bg_arg->cache_miss_in_rtc_); } -void PikaClientConn::BatchExecRedisCmd(const std::vector& argvs) { +void PikaClientConn::BatchExecRedisCmd(const std::vector& argvs, bool cache_miss_in_rtc) { resp_num.store(static_cast(argvs.size())); for (const auto& argv : argvs) { std::shared_ptr resp_ptr = std::make_shared(); resp_array.push_back(resp_ptr); - ExecRedisCmd(argv, resp_ptr); + ExecRedisCmd(argv, resp_ptr, cache_miss_in_rtc); } time_stat_->process_done_ts_ = pstd::NowMicros(); TryWriteResp(); @@ -363,9 +367,6 @@ bool PikaClientConn::ReadCmdInCache(const net::RedisCmdArgsType& argv, const std return false; } //only read command(Get, HGet) will reach here, no need of record lock - if (c_ptr->db_->cache()->CacheStatus() != PIKA_CACHE_STATUS_OK) { - return false; - } bool read_status = c_ptr->DoReadCommandInCache(); auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap(); resp_num--; @@ -508,7 +509,8 @@ void PikaClientConn::ExitTxn() { } } -void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr) { +void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr& resp_ptr, + bool cache_miss_in_rtc) { // get opt std::string opt = argv[0]; pstd::StringToLower(opt); @@ -519,7 +521,7 @@ void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, std::shared_ptr cmd_ptr = DoCmd(argv, opt, resp_ptr); + std::shared_ptr cmd_ptr = DoCmd(argv, opt, resp_ptr, cache_miss_in_rtc); *resp_ptr = std::move(cmd_ptr->res().message()); resp_num--; } diff --git a/src/pika_command.cc b/src/pika_command.cc index b92f75dd22..bab8dd93f6 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -828,7 +828,7 @@ Cmd* GetCmdFromDB(const std::string& opt, const CmdTable& cmd_table) { bool Cmd::CheckArg(uint64_t num) const { return !((arity_ > 0 && num != arity_) || (arity_ < 0 && num < -arity_)); } Cmd::Cmd(std::string name, int arity, uint32_t flag, uint32_t aclCategory) - : name_(std::move(name)), arity_(arity), flag_(flag), aclCategory_(aclCategory) { + : name_(std::move(name)), arity_(arity), flag_(flag), aclCategory_(aclCategory), cache_missed_in_rtc_(false) { } void Cmd::Initial(const PikaCmdArgsType& argv, const std::string& db_name) { @@ -891,10 +891,12 @@ void Cmd::DoCommand(const HintKeys& hint_keys) { if (IsNeedCacheDo() && PIKA_CACHE_NONE != g_pika_conf->cache_mode() && db_->cache()->CacheStatus() == PIKA_CACHE_STATUS_OK) { - if (IsNeedReadCache()) { + if (!cache_missed_in_rtc_ + && IsNeedReadCache()) { ReadCache(); } - if (is_read() && res().CacheMiss()) { + if (is_read() + && (res().CacheMiss() || cache_missed_in_rtc_)) { pstd::lock::MultiScopeRecordLock record_lock(db_->LockMgr(), current_key()); DoThroughDB(); if (IsNeedUpdateCache()) { @@ -1064,3 +1066,5 @@ void Cmd::SetResp(const std::shared_ptr& resp) { resp_ = resp; } std::shared_ptr Cmd::GetResp() { return resp_.lock(); } void Cmd::SetStage(CmdStage stage) { stage_ = stage; } +bool Cmd::IsCacheMissedInRtc() const { return cache_missed_in_rtc_; } +void Cmd::SetCacheMissedInRtc(bool value) { cache_missed_in_rtc_ = value; } diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 913f669880..741168be94 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -541,6 +541,11 @@ int PikaConf::Load() { GetConfStr("daemonize", &dmz); daemonize_ = dmz == "yes"; + // read redis cache in Net worker threads + std::string rtc_enabled; + GetConfStr("rtc-cache-read", &rtc_enabled); + rtc_cache_read_enabled_ = rtc_enabled != "no"; + // binlog std::string wb; GetConfStr("write-binlog", &wb);