From ac0e97a3cf28f092adace1480f3c339d60f2c232 Mon Sep 17 00:00:00 2001 From: Mixficsol <838844609@qq.com> Date: Sun, 30 Jul 2023 23:28:22 +0800 Subject: [PATCH 1/2] Add diskrecovery command --- docs/ops/APIDifference.md | 6 ++++- include/pika_admin.h | 12 +++++++++ include/pika_command.h | 1 + include/pika_db.h | 4 ++- include/pika_server.h | 7 +++++ src/pika_admin.cc | 48 ++++++++++++++++++++++++++++++++++ src/pika_command.cc | 2 ++ src/pika_server.cc | 54 --------------------------------------- 8 files changed, 78 insertions(+), 56 deletions(-) diff --git a/docs/ops/APIDifference.md b/docs/ops/APIDifference.md index ac4d32847..430ed777c 100644 --- a/docs/ops/APIDifference.md +++ b/docs/ops/APIDifference.md @@ -122,4 +122,8 @@ slaveof命令允许通过指定write2file(binlog)的文件名称及同步位置 * field_end:返回的结束Field, 空字符串表示 +inf(无限大) ### pkhrscanrange key field_start field_end [MATCH pattern] [LIMIT limit] -类似于pkhscanrange, 逆序 \ No newline at end of file +类似于pkhscanrange, 逆序 + +### diskrecovery +Pika 原创命令,功能为当磁盘意外写满后,RocksDB 会进入写保护状态,当我们将空间调整为充足空间时,这个命令可以将 RocksDB 的写保护状态解除,变为可以继续写的状态, 避免了 Pika 因为磁盘写满后需要重启才能恢复写的情况,执行成功时返回 OK,如果当前磁盘空间依然不足,执行这个命令返回`"The available disk capacity is insufficient`,该命令执行时不需要额外参数,只需要执行 +diskrecovery 即可。 \ No newline at end of file diff --git a/include/pika_admin.h b/include/pika_admin.h index f77f10840..4f3438836 100644 --- a/include/pika_admin.h +++ b/include/pika_admin.h @@ -449,6 +449,18 @@ class HelloCmd : public Cmd { void DoInitial() override; }; +class DiskRecoveryCmd : public Cmd { +public: + DiskRecoveryCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} + void Do(std::shared_ptr slot = nullptr) override; + void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Merge() override{}; + Cmd* Clone() override { return new DiskRecoveryCmd(*this); } + +private: + void DoInitial() override; +}; + #ifdef WITH_COMMAND_DOCS class CommandCmd : public Cmd { public: diff --git a/include/pika_command.h 
b/include/pika_command.h index 7e88a6051..d2d0b8ff2 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -50,6 +50,7 @@ const std::string kCmdDummy = "dummy"; const std::string kCmdNameQuit = "quit"; const std::string kCmdNameHello = "hello"; const std::string kCmdNameCommand = "command"; +const std::string kCmdNameDiskRecovery = "diskrecovery"; // Migrate slot const std::string kCmdNameSlotsMgrtSlot = "slotsmgrtslot"; diff --git a/include/pika_db.h b/include/pika_db.h index 4490c3f89..3e4a80aae 100644 --- a/include/pika_db.h +++ b/include/pika_db.h @@ -56,7 +56,9 @@ class DB : public std::enable_shared_from_this, public pstd::noncopyable { bool DBIsEmpty(); pstd::Status MovetoToTrash(const std::string& path); pstd::Status Leave(); - + std::map> GetSlots() { + return slots_; + } private: std::string db_name_; uint32_t slot_num_ = 0; diff --git a/include/pika_server.h b/include/pika_server.h index 6b6578bd8..a6f76d261 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -497,6 +497,13 @@ class PikaServer : public pstd::noncopyable { */ std::unique_ptr instant_; + /* + * Diskrecovery used + */ + std::map> GetDB() { + return dbs_; + } + friend class Cmd; friend class InfoCmd; friend class PikaReplClientConn; diff --git a/src/pika_admin.cc b/src/pika_admin.cc index d841701a8..b28ab6642 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -7,6 +7,7 @@ #include #include +#include #include #include @@ -2550,6 +2551,53 @@ void HelloCmd::Do(std::shared_ptr slot) { res_.AppendStringRaw(raw); } +void DiskRecoveryCmd::DoInitial() { + if (!CheckArg(argv_.size())) { + res_.SetRes(CmdRes::kWrongNum, kCmdNameDiskRecovery); + return; + } +} + + +void DiskRecoveryCmd::Do(std::shared_ptr slot) { + struct statvfs disk_info; + int ret = statvfs(g_pika_conf->db_path().c_str(), &disk_info); + int64_t least_free_size = g_pika_conf->least_resume_free_disk_size(); + uint64_t free_size = disk_info.f_bsize * disk_info.f_bfree; + if (free_size < 
least_free_size) { + res_.SetRes(CmdRes::kErrOther, "The available disk capacity is insufficient"); + return; + } + + std::shared_mutex dbs_rw; + std::shared_mutex slots_rw; + std::map background_errors; + std::shared_lock db_rwl(dbs_rw); + // loop every db + for (const auto& db_item : g_pika_server->GetDB()) { + if (!db_item.second) { + continue; + } + std::shared_lock slot_rwl(slots_rw); + // loop every slot + for (const auto &slot_item: db_item.second->GetSlots()) { + background_errors.clear(); + slot_item.second->DbRWLockReader(); + slot_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors); + slot_item.second->DbRWUnLock(); + for (const auto &item: background_errors) { + if (item.second != 0) { + rocksdb::Status s = slot_item.second->db()->GetDBByType(item.first)->Resume(); + if (!s.ok()) { + LOG(WARNING) << s.ToString(); + } + } + } + } + } + res_.SetRes(CmdRes::kOk, "The disk error has been recovered"); +} + #ifdef WITH_COMMAND_DOCS bool CommandCmd::CommandFieldCompare::operator()(const std::string& a, const std::string& b) const { diff --git a/src/pika_command.cc b/src/pika_command.cc index 9e8b71430..6088ea075 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -92,6 +92,8 @@ void InitCmdTable(CmdTable* cmd_table) { cmd_table->insert(std::pair>(kCmdDummy, std::move(dummyptr))); std::unique_ptr quitptr = std::make_unique(kCmdNameQuit, 1, kCmdFlagsRead); cmd_table->insert(std::pair>(kCmdNameQuit, std::move(quitptr))); + std::unique_ptr diskrecoveryptr = std::make_unique(kCmdNameDiskRecovery, 1, kCmdFlagsRead); + cmd_table->insert(std::pair>(kCmdNameDiskRecovery, std::move(diskrecoveryptr))); #ifdef WITH_COMMAND_DOCS std::unique_ptr commandptr = std::make_unique(kCmdNameCommand, -1, kCmdFlagsRead | kCmdFlagsAdmin); diff --git a/src/pika_server.cc b/src/pika_server.cc index 6af2391f5..a36833e17 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -1324,8 +1324,6 @@ void PikaServer::PubSubNumSub(const 
std::vector& channels, /******************************* PRIVATE *******************************/ void PikaServer::DoTimingTask() { - // Resume DB if satisfy the condition - AutoResumeDB(); // Maybe schedule compactrange AutoCompactRange(); // Purge log @@ -1519,58 +1517,6 @@ void PikaServer::AutoKeepAliveRSync() { } } -void PikaServer::AutoResumeDB() { - int64_t interval = g_pika_conf->resume_interval(); - int64_t least_free_size = g_pika_conf->least_resume_free_disk_size(); - struct timeval now; - gettimeofday(&now, nullptr); - // first check or time interval between now and last check is larger than variable "interval" - if (last_check_resume_time_.tv_sec == 0 || now.tv_sec - last_check_resume_time_.tv_sec >= interval) { - struct statvfs disk_info; - int ret = statvfs(g_pika_conf->db_path().c_str(), &disk_info); - if (ret == -1) { - LOG(WARNING) << "statvfs error: " << strerror(errno); - return; - } - double min_check_resume_ratio = g_pika_conf->min_check_resume_ratio(); - uint64_t free_size = disk_info.f_bsize * disk_info.f_bfree; - uint64_t total_size = disk_info.f_bsize * disk_info.f_blocks; - double disk_use_ratio = 1.0 - static_cast(free_size) / static_cast(total_size); - if (disk_use_ratio > min_check_resume_ratio) { - gettimeofday(&last_check_resume_time_, nullptr); - if (disk_use_ratio < min_check_resume_ratio || free_size < least_free_size) { - return; - } - - std::map background_errors; - std::shared_lock db_rwl(g_pika_server->dbs_rw_); - // loop every db - for (const auto &db_item: g_pika_server->dbs_) { - if (!db_item.second) { - continue; - } - std::shared_lock slot_rwl(db_item.second->slots_rw_); - // loop every slot - for (const auto &slot_item: db_item.second->slots_) { - background_errors.clear(); - slot_item.second->DbRWLockReader(); - slot_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, - &background_errors); - slot_item.second->DbRWUnLock(); - for (const auto &item: background_errors) { - if (item.second != 0) { - 
rocksdb::Status s = slot_item.second->db()->GetDBByType(item.first)->Resume(); - if (!s.ok()) { - LOG(WARNING) << s.ToString(); - } - } - } - } - } - } - } -} - void PikaServer::AutoUpdateNetworkMetric() { monotime current_time = getMonotonicUs(); size_t factor = 5e6; // us, 5s From 8616c516b46ac1466e38be1c7b2360621421d30f Mon Sep 17 00:00:00 2001 From: Mixficsol <838844609@qq.com> Date: Sun, 30 Jul 2023 23:45:55 +0800 Subject: [PATCH 2/2] Add diskrecovery command --- docs/ops/APIDifference.md | 3 +-- include/pika_admin.h | 13 +++++++------ include/pika_db.h | 1 + include/pika_server.h | 2 +- src/pika_admin.cc | 25 +++++++++++++++---------- src/pika_command.cc | 2 +- src/pika_db.cc | 2 +- 7 files changed, 27 insertions(+), 21 deletions(-) diff --git a/docs/ops/APIDifference.md b/docs/ops/APIDifference.md index 430ed777c..9891a1482 100644 --- a/docs/ops/APIDifference.md +++ b/docs/ops/APIDifference.md @@ -125,5 +125,4 @@ slaveof命令允许通过指定write2file(binlog)的文件名称及同步位置 类似于pkhscanrange, 逆序 ### diskrecovery -Pika 原创命令,功能为当磁盘意外写满后,RocksDB 会进入写保护状态,当我们将空间调整为充足空间时,这个命令可以将 RocksDB 的写保护状态解除,变为可以继续写的状态, 避免了 Pika 因为磁盘写满后需要重启才能恢复写的情况,执行成功时返回 OK,如果当前磁盘空间依然不足,执行这个命令返回`"The available disk capacity is insufficient`,该命令执行时不需要额外参数,只需要执行 -diskrecovery 即可。 \ No newline at end of file +Pika 原创命令,功能为当磁盘意外写满后,RocksDB 会进入写保护状态,当我们将空间调整为充足空间时,这个命令可以将 RocksDB 的写保护状态解除,变为可以继续写的状态, 避免了 Pika 因为磁盘写满后需要重启才能恢复写的情况,执行成功时返回 OK,如果当前磁盘空间依然不足,执行这个命令返回 `The available disk capacity is insufficient`,该命令执行时不需要额外参数,只需要执行 diskrecovery 即可。 \ No newline at end of file diff --git a/include/pika_admin.h b/include/pika_admin.h index 4f3438836..24d55a475 100644 --- a/include/pika_admin.h +++ b/include/pika_admin.h @@ -451,14 +451,15 @@ class HelloCmd : public Cmd { class DiskRecoveryCmd : public Cmd { public: - DiskRecoveryCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) 
override{}; - void Merge() override{}; - Cmd* Clone() override { return new DiskRecoveryCmd(*this); } + DiskRecoveryCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} + void Do(std::shared_ptr slot = nullptr) override; + void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Merge() override{}; + Cmd* Clone() override { return new DiskRecoveryCmd(*this); } private: - void DoInitial() override; + void DoInitial() override; + std::map background_errors_; }; #ifdef WITH_COMMAND_DOCS diff --git a/include/pika_db.h b/include/pika_db.h index 3e4a80aae..21c1d6ac1 100644 --- a/include/pika_db.h +++ b/include/pika_db.h @@ -29,6 +29,7 @@ class DB : public std::enable_shared_from_this, public pstd::noncopyable { bool FlushSlotDB(); bool FlushSlotSubDB(const std::string& db_name); void SetBinlogIoError(); + void SetBinlogIoErrorrelieve(); bool IsBinlogIoError(); uint32_t SlotNum(); void GetAllSlots(std::set& slot_ids); diff --git a/include/pika_server.h b/include/pika_server.h index a6f76d261..0d78c26b1 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -501,7 +501,7 @@ class PikaServer : public pstd::noncopyable { * Diskrecovery used */ std::map> GetDB() { - return dbs_; + return dbs_; } friend class Cmd; diff --git a/src/pika_admin.cc b/src/pika_admin.cc index b28ab6642..510bedd9c 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -2553,43 +2553,48 @@ void HelloCmd::Do(std::shared_ptr slot) { void DiskRecoveryCmd::DoInitial() { if (!CheckArg(argv_.size())) { - res_.SetRes(CmdRes::kWrongNum, kCmdNameDiskRecovery); - return; + res_.SetRes(CmdRes::kWrongNum, kCmdNameDiskRecovery); + return; } } - void DiskRecoveryCmd::Do(std::shared_ptr slot) { struct statvfs disk_info; int ret = statvfs(g_pika_conf->db_path().c_str(), &disk_info); + if (ret == -1) { + std::stringstream tmp_stream; + tmp_stream << "statvfs error:" << strerror(errno); + const std::string res = tmp_stream.str(); + 
res_.SetRes(CmdRes::kErrOther, res); + return; + } int64_t least_free_size = g_pika_conf->least_resume_free_disk_size(); uint64_t free_size = disk_info.f_bsize * disk_info.f_bfree; if (free_size < least_free_size) { res_.SetRes(CmdRes::kErrOther, "The available disk capacity is insufficient"); return; } - - std::shared_mutex dbs_rw; std::shared_mutex slots_rw; - std::map background_errors; + std::shared_mutex dbs_rw; std::shared_lock db_rwl(dbs_rw); // loop every db for (const auto& db_item : g_pika_server->GetDB()) { if (!db_item.second) { continue; } + db_item.second->SetBinlogIoErrorrelieve(); std::shared_lock slot_rwl(slots_rw); // loop every slot for (const auto &slot_item: db_item.second->GetSlots()) { - background_errors.clear(); + background_errors_.clear(); slot_item.second->DbRWLockReader(); - slot_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors); + slot_item.second->db()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors_); slot_item.second->DbRWUnLock(); - for (const auto &item: background_errors) { + for (const auto &item: background_errors_) { if (item.second != 0) { rocksdb::Status s = slot_item.second->db()->GetDBByType(item.first)->Resume(); if (!s.ok()) { - LOG(WARNING) << s.ToString(); + res_.SetRes(CmdRes::kErrOther, "The restore operation failed."); } } } diff --git a/src/pika_command.cc b/src/pika_command.cc index 6088ea075..889f0e70c 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -92,7 +92,7 @@ void InitCmdTable(CmdTable* cmd_table) { cmd_table->insert(std::pair>(kCmdDummy, std::move(dummyptr))); std::unique_ptr quitptr = std::make_unique(kCmdNameQuit, 1, kCmdFlagsRead); cmd_table->insert(std::pair>(kCmdNameQuit, std::move(quitptr))); - std::unique_ptr diskrecoveryptr = std::make_unique(kCmdNameDiskRecovery, 1, kCmdFlagsRead); + std::unique_ptr diskrecoveryptr = std::make_unique(kCmdNameDiskRecovery, 1, kCmdFlagsRead | kCmdFlagsAdmin); 
cmd_table->insert(std::pair>(kCmdNameDiskRecovery, std::move(diskrecoveryptr))); #ifdef WITH_COMMAND_DOCS diff --git a/src/pika_db.cc b/src/pika_db.cc index 53831d30b..a1d83d24a 100644 --- a/src/pika_db.cc +++ b/src/pika_db.cc @@ -80,7 +80,7 @@ bool DB::FlushSlotSubDB(const std::string& db_name) { } void DB::SetBinlogIoError() { return binlog_io_error_.store(true); } - +void DB::SetBinlogIoErrorrelieve() { return binlog_io_error_.store(false); } bool DB::IsBinlogIoError() { return binlog_io_error_.load(); } uint32_t DB::SlotNum() { return slot_num_; }