From a5a5a9e515ccb0504a9890f29e6477a84b79e707 Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Tue, 26 Mar 2024 15:08:31 +0800 Subject: [PATCH 1/7] snapshot save & load --- src/checkpoint_manager.cpp | 9 --- src/client.cc | 2 +- src/cmd_admin.cc | 11 ++-- src/cmd_admin.h | 12 ++-- src/cmd_raft.cc | 9 +++ src/cmd_raft.h | 3 +- src/cmd_table_manager.cc | 4 +- src/db.cpp | 12 +++- src/db.h | 1 - src/pikiwidb.h | 2 +- src/praft/praft.cc | 83 +++++++++++++++++++++++++-- src/praft/praft.h | 9 ++- src/storage/include/storage/storage.h | 1 + src/storage/src/storage.cc | 10 +--- src/store.cc | 4 +- src/store.h | 5 +- 16 files changed, 132 insertions(+), 45 deletions(-) diff --git a/src/checkpoint_manager.cpp b/src/checkpoint_manager.cpp index 9938499c4..f2cd902fe 100644 --- a/src/checkpoint_manager.cpp +++ b/src/checkpoint_manager.cpp @@ -23,15 +23,6 @@ void CheckpointManager::Init(int instNum, DB* db) { void CheckpointManager::CreateCheckpoint(const std::string& path) { res_.clear(); - - if (!pstd::FileExists(path)) { - if (0 != pstd::CreatePath(path)) { - WARN("Create Dir {} fail!", path); - return; - } - INFO("Create Dir {} success!", path); - } - std::lock_guard Lock(shared_mutex_); for (int i = 0; i < checkpoint_num_; ++i) { checkpoint_infoes_[i].checkpoint_in_process = true; diff --git a/src/client.cc b/src/client.cc index 60b7b6d04..aaecc625d 100644 --- a/src/client.cc +++ b/src/client.cc @@ -12,10 +12,10 @@ #include "config.h" #include "log.h" #include "pikiwidb.h" +#include "praft.h" #include "pstd_string.h" #include "slow_log.h" #include "store.h" -#include "praft.h" namespace pikiwidb { diff --git a/src/cmd_admin.cc b/src/cmd_admin.cc index adc463a94..2ab195afc 100644 --- a/src/cmd_admin.cc +++ b/src/cmd_admin.cc @@ -6,9 +6,9 @@ */ #include "cmd_admin.h" -#include "store.h" #include "braft/raft.h" #include "praft.h" +#include "store.h" namespace pikiwidb { @@ -80,7 +80,7 @@ void SelectCmd::DoCmd(PClient* client) { client->SetRes(CmdRes::kOK); } -InfoCmd::InfoCmd(const std::string& name, int16_t arity) +InfoCmd::InfoCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsAdmin | kCmdFlagsReadonly, kAclCategoryAdmin) {} bool InfoCmd::DoInitial(PClient* client) { return true; } @@ -127,7 +127,7 @@ void InfoCmd::DoCmd(PClient* client) { message += "raft_state:up\r\n"; } else { message += "raft_state:down\r\n"; - } + } message += "raft_role:" + std::string(braft::state2str(node_status.state)) + "\r\n"; // message += "raft_is_voting:" + node_status.is_voting + "\r\n"; message += "raft_leader_id:" + node_status.leader_id.to_string() + "\r\n"; @@ -141,9 +141,10 @@ void InfoCmd::DoCmd(PClient* client) { if (!status.ok()) { return client->SetRes(CmdRes::kErrOther, status.error_str()); } - + for (int i = 0; i < peers.size(); i++) { - message += "raft_node" + std::to_string(i) + ":addr=" + butil::ip2str(peers[i].addr.ip).c_str() + ",port=" + std::to_string(peers[i].addr.port) + "\r\n"; + message += "raft_node" + std::to_string(i) + ":addr=" + butil::ip2str(peers[i].addr.ip).c_str() + + ",port=" + std::to_string(peers[i].addr.port) + "\r\n"; } } diff --git a/src/cmd_admin.h b/src/cmd_admin.h index 476660e01..8ef4d6e58 100644 --- a/src/cmd_admin.h +++ b/src/cmd_admin.h @@ -85,14 +85,14 @@ class SelectCmd : public BaseCmd { }; class InfoCmd : public BaseCmd { - public: - InfoCmd(const std::string& name, int16_t arity); + public: + InfoCmd(const std::string& name, int16_t arity); - protected: - bool DoInitial(PClient* client) override; + protected: + bool 
DoInitial(PClient* client) override; - private: - void DoCmd(PClient* client) override; + private: + void DoCmd(PClient* client) override; }; } // namespace pikiwidb diff --git a/src/cmd_raft.cc b/src/cmd_raft.cc index 27f349fa9..8b58e6684 100644 --- a/src/cmd_raft.cc +++ b/src/cmd_raft.cc @@ -36,6 +36,8 @@ void RaftNodeCmd::DoCmd(PClient* client) { DoCmdAdd(client); } else if (!strcasecmp(cmd.c_str(), "REMOVE")) { DoCmdRemove(client); + } else if (!strcasecmp(cmd.c_str(), "DSS")) { + DoCmdSnapshot(client); } else { client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE only"); } @@ -70,6 +72,13 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) { } } +void RaftNodeCmd::DoCmdSnapshot(PClient* client) { + auto s = PRAFT.DoSnapshot(); + if (s.ok()) { + client->SetRes(CmdRes::kOK); + } +} + RaftClusterCmd::RaftClusterCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {} diff --git a/src/cmd_raft.h b/src/cmd_raft.h index a5e8f924d..bf36467e2 100644 --- a/src/cmd_raft.h +++ b/src/cmd_raft.h @@ -25,7 +25,7 @@ namespace pikiwidb { * : * : * - * RAFT.NODE REMOVE [id] + * RAFT.NODE REMOVE [id] * Remove an existing node from the cluster. * Reply: * -NOCLUSTER || @@ -45,6 +45,7 @@ class RaftNodeCmd : public BaseCmd { void DoCmd(PClient *client) override; void DoCmdAdd(PClient *client); void DoCmdRemove(PClient *client); + void DoCmdSnapshot(PClient *client); static constexpr std::string_view kAddCmd = "ADD"; static constexpr std::string_view kRemoveCmd = "REMOVE"; diff --git a/src/cmd_table_manager.cc b/src/cmd_table_manager.cc index 2ae39a0a2..e56299199 100644 --- a/src/cmd_table_manager.cc +++ b/src/cmd_table_manager.cc @@ -12,8 +12,8 @@ #include "cmd_keys.h" #include "cmd_kv.h" #include "cmd_list.h" -#include "cmd_set.h" #include "cmd_raft.h" +#include "cmd_set.h" #include "cmd_table_manager.h" #include "cmd_zset.h" @@ -46,7 +46,7 @@ void CmdTableManager::InitCmdTable() { // info ADD_COMMAND(Info, -1); - + // raft ADD_COMMAND(RaftCluster, -1); ADD_COMMAND(RaftNode, -2); diff --git a/src/db.cpp b/src/db.cpp index 3224f9610..cc694ca7f 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -30,6 +30,8 @@ DB::DB(int db_index, const std::string& db_path) ERROR("Storage open failed! 
{}", s.ToString()); abort(); } + checkpoint_manager_ = std::make_unique(); + checkpoint_manager_->Init(g_config.db_instance_num, this); opened_ = true; INFO("Open DB{} success!", db_index_); } @@ -45,8 +47,14 @@ void DB::DoBgSave(CheckpointInfo& checkpoint_info, const std::string& path, int checkpoint_info.checkpoint_in_process = false; } -void DB::CreateCheckpoint(const std::string& path) { checkpoint_manager_->CreateCheckpoint(path); } +void DB::CreateCheckpoint(const std::string& path) { + if (0 != pstd::CreatePath(path + '/' + std::to_string(db_index_))) { + WARN("Create dir {} fail !", path + '/' + std::to_string(db_index_)); + return; + } + checkpoint_manager_->CreateCheckpoint(path); +} void DB::WaitForCheckpointDone() { checkpoint_manager_->WaitForCheckpointDone(); } - + } // namespace pikiwidb diff --git a/src/db.h b/src/db.h index 1f5e28962..26a559a1c 100644 --- a/src/db.h +++ b/src/db.h @@ -55,7 +55,6 @@ class DB { bool opened_ = false; std::unique_ptr checkpoint_manager_; - }; } // namespace pikiwidb diff --git a/src/pikiwidb.h b/src/pikiwidb.h index 389096e24..f1d526779 100644 --- a/src/pikiwidb.h +++ b/src/pikiwidb.h @@ -9,8 +9,8 @@ #include "common.h" #include "event_loop.h" #include "io_thread_pool.h" -#include "tcp_connection.h" #include "praft/praft.h" +#include "tcp_connection.h" #define kPIKIWIDB_VERSION "4.0.0" diff --git a/src/praft/praft.cc b/src/praft/praft.cc index d68480c70..fb730009b 100644 --- a/src/praft/praft.cc +++ b/src/praft/praft.cc @@ -8,17 +8,18 @@ // // praft.cc -#include "praft.h" - #include #include #include +#include "braft/snapshot.h" + #include "client.h" #include "config.h" #include "event_loop.h" #include "log.h" #include "pikiwidb.h" +#include "praft.h" #include "praft.pb.h" #include "pstd_string.h" @@ -308,6 +309,16 @@ butil::Status PRaft::RemovePeer(const std::string& peer) { return {0, "OK"}; } +butil::Status PRaft::DoSnapshot() { + if (!node_) { + return ERROR_LOG_AND_STATUS("Node is not initialized"); + } + braft::SynchronizedClosure done; + node_->snapshot(&done); + done.wait(); + return {0, "OK"}; +} + void PRaft::OnJoinCmdConnectionFailed([[maybe_unused]] EventLoop* loop, const char* peer_ip, int port) { auto cli = join_ctx_.GetClient(); if (cli) { @@ -345,6 +356,46 @@ void PRaft::Apply(braft::Task& task) { } } +void PRaft::add_all_files(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path) { + for (const auto& entry : std::filesystem::directory_iterator(dir)) { + if (entry.is_directory()) { + if (entry.path() != "." && entry.path() != "..") { + INFO("dir_path = {}", entry.path().string()); + add_all_files(entry.path(), writer, path); + } + } else { + INFO("file_path = {}", std::filesystem::relative(entry.path(), path).string()); + if (writer->add_file(std::filesystem::relative(entry.path(), path)) != 0) { + WARN("出出出出 错错错错错错 啦啦啦啦啦啦"); + } + } + } +} + +void PRaft::recursive_copy(const std::filesystem::path& source, const std::filesystem::path& destination) { + if (std::filesystem::is_regular_file(source)) { + if (source.filename() == "__raft_snapshot_meta") { + return; + } else if (source.extension() == ".sst") { + // Create a hard link + INFO("hard link success! source_file = {} , destination_file = {}", source.string(), destination.string()); + ::link(source.c_str(), destination.c_str()); + } else { + // Copy the file + INFO("copy success! 
source_file = {} , destination_file = {}", source.string(), destination.string()); + std::filesystem::copy_file(source, destination, std::filesystem::copy_options::overwrite_existing); + } + } else { + if (!pstd::FileExists(destination)) { + pstd::CreateDir(destination); + } + + for (const auto& entry : std::filesystem::directory_iterator(source)) { + recursive_copy(entry.path(), destination / entry.path().filename()); + } + } +} + // @braft::StateMachine void PRaft::on_apply(braft::Iterator& iter) { // A batch of tasks are committed, which must be processed through @@ -353,9 +404,33 @@ void PRaft::on_apply(braft::Iterator& iter) { } } -void PRaft::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) {} +void PRaft::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) { + brpc::ClosureGuard done_guard(done); + TasksVector tasks; + tasks.reserve(g_config.databases); + for (auto i = 0; i < g_config.databases; ++i) { + tasks.push_back({TaskType::kCheckpoint, i, {{TaskArg::kCheckpointPath, writer->get_path()}}}); + } + PSTORE.DoSomeThingSpecificDB(tasks); + PSTORE.WaitForCheckpointDone(); + auto writer_path = writer->get_path(); + add_all_files(writer_path, writer, writer_path); +} -int PRaft::on_snapshot_load(braft::SnapshotReader* reader) { return 0; } +int PRaft::on_snapshot_load(braft::SnapshotReader* reader) { + CHECK(!IsLeader()) << "Leader is not supposed to load snapshot"; + auto reader_path = reader->get_path(); // xx/snapshot_0000001 + auto db_path = g_config.dbpath; + PSTORE.Clear(); + for (int i = 0; i < g_config.databases; i++) { + auto sub_path = db_path + std::to_string(i); + pstd::DeleteDirIfExist(sub_path); + } + db_path.pop_back(); + recursive_copy(reader_path, db_path); + PSTORE.Init(); + return 0; +} void PRaft::on_leader_start(int64_t term) { WARN("Node {} start to be leader, term={}", node_->node_id().to_string(), term); diff --git a/src/praft/praft.h b/src/praft/praft.h index 8d8b1976a..e78ee1f89 100644 --- a/src/praft/praft.h +++ b/src/praft/praft.h @@ -7,11 +7,12 @@ #pragma once +#include #include #include +#include #include #include -#include #include "braft/configuration.h" #include "braft/raft.h" @@ -88,6 +89,7 @@ class PRaft : public braft::StateMachine { butil::Status AddPeer(const std::string& peer); butil::Status RemovePeer(const std::string& peer); butil::Status RaftRecvEntry(); + butil::Status DoSnapshot(); void ShutDown(); void Join(); @@ -125,6 +127,11 @@ class PRaft : public braft::StateMachine { void on_stop_following(const ::braft::LeaderChangeContext& ctx) override; void on_start_following(const ::braft::LeaderChangeContext& ctx) override; + private: + void add_all_files(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path); + + void recursive_copy(const std::filesystem::path& source, const std::filesystem::path& destination); + private: std::unique_ptr server_; // brpc std::unique_ptr node_; diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index c92cf59d7..16fd95439 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -41,6 +41,7 @@ inline constexpr size_t BATCH_DELETE_LIMIT = 100; inline constexpr size_t COMPACT_THRESHOLD_COUNT = 2000; inline constexpr uint64_t kNoFlush = std::numeric_limits::max(); +inline constexpr uint64_t kFlush = 0; using Options = rocksdb::Options; using BlockBasedTableOptions = rocksdb::BlockBasedTableOptions; diff --git a/src/storage/src/storage.cc 
b/src/storage/src/storage.cc index 02e4312b5..9946a9d17 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -104,14 +104,6 @@ Status Storage::Open(const StorageOptions& storage_options, const std::string& d Status Storage::CreateCheckpoint(const std::string& dump_path, int i) { INFO("DB{}'s RocksDB {} begin to generate a checkpoint!", db_id_, i); auto source_dir = AppendSubDirectory(dump_path, db_id_); - if (!pstd::FileExists(source_dir)) { - if (0 != pstd::CreatePath(source_dir)) { - WARN("Create Dir {} fail!", source_dir); - return Status::IOError("CreatePath() fail! dir_name : {} ", source_dir); - } - INFO("Create Dir {} success!", source_dir); - } - source_dir = AppendSubDirectory(source_dir, i); auto tmp_dir = source_dir + ".tmp"; @@ -132,7 +124,7 @@ Status Storage::CreateCheckpoint(const std::string& dump_path, int i) { // 3) Create a checkpoint std::unique_ptr checkpoint_guard(checkpoint); - s = checkpoint->CreateCheckpoint(tmp_dir, kNoFlush, nullptr); + s = checkpoint->CreateCheckpoint(tmp_dir, kFlush, nullptr); if (!s.ok()) { WARN("DB{}'s RocksDB {} create checkpoint failed!. Error: {}", db_id_, i, s.ToString()); return s; diff --git a/src/store.cc b/src/store.cc index 1d0170586..61f55759a 100644 --- a/src/store.cc +++ b/src/store.cc @@ -29,7 +29,6 @@ void PStore::Init() { dbNum_ = g_config.databases; backends_.reserve(dbNum_); if (g_config.backend == kBackEndRocksDB) { - for (int i = 0; i < dbNum_; i++) { auto db = std::make_unique(i, g_config.dbpath); backends_.push_back(std::move(db)); @@ -39,6 +38,8 @@ void PStore::Init() { } } +void PStore::Clear() { backends_.clear(); } + void PStore::DoSomeThingSpecificDB(const TasksVector tasks) { std::for_each(tasks.begin(), tasks.end(), [this](const auto& task) { switch (task.type) { @@ -69,7 +70,6 @@ void PStore::WaitForCheckpointDone() { void PStore::trimSlash(std::string& dirName) { while (dirName.back() == '/') { dirName.pop_back(); - } } diff --git a/src/store.h b/src/store.h index 597b01c7f..77f6a7b9a 100644 --- a/src/store.h +++ b/src/store.h @@ -20,11 +20,11 @@ #include #include +#include "braft/raft.h" #include "checkpoint_manager.h" #include "common.h" #include "db.h" #include "storage/storage.h" -#include "braft/raft.h" namespace pikiwidb { @@ -59,12 +59,15 @@ class PStore { void Init(); + void Clear(); + std::unique_ptr& GetBackend(int32_t index) { return backends_[index]; }; void DoSomeThingSpecificDB(const TasksVector task); void WaitForCheckpointDone(); + int GetDBNumber() const { return dbNum_; } std::shared_mutex& SharedMutex() { return dbs_mutex_; } From ca88bb9d9b733417a1a5f248db4fdbd0d08bb60b Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Tue, 26 Mar 2024 16:11:37 +0800 Subject: [PATCH 2/7] shell file --- save_load.sh | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100755 save_load.sh diff --git a/save_load.sh b/save_load.sh new file mode 100755 index 000000000..4a6ad251b --- /dev/null +++ b/save_load.sh @@ -0,0 +1,19 @@ +#!/bin/bash +killall -9 pikiwidb +mkdir leader follower1 follower2 + +cd leader && ulimit -n 99999 && rm -fr * && ../bin/pikiwidb ../pikiwidb.conf --port 7777 & + +cd follower1 && ulimit -n 99999 && rm -fr * && ../bin/pikiwidb ../pikiwidb.conf --port 8888 & +sleep 10 +redis-cli -p 7777 raft.cluster init +redis-benchmark -p 7777 -c 5 -n 10000 -r 10000000 -d 1024 -t set + + +redis-cli -p 7777 raft.node DSS +redis-cli -p 7777 raft.node DSS + +redis-cli -p 8888 raft.cluster join 127.0.0.1:7777 + + + From 
0104b46ef34c556f242dce70e4afd3d21e12ed15 Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Tue, 16 Apr 2024 18:00:17 +0800 Subject: [PATCH 3/7] fix comments --- src/db.cc | 5 +++-- src/praft/praft.cc | 15 ++++++++------- src/praft/praft.h | 5 +++-- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/db.cc b/src/db.cc index 02c9a8def..26decaa26 100644 --- a/src/db.cc +++ b/src/db.cc @@ -54,8 +54,9 @@ void DB::DoBgSave(const std::string& path, int i) { } void DB::CreateCheckpoint(const std::string& path) { - if (0 != pstd::CreatePath(path + '/' + std::to_string(db_index_))) { - WARN("Create dir {} fail !", path + '/' + std::to_string(db_index_)); + auto tmp_path = path + '/' + std::to_string(db_index_); + if (0 != pstd::CreatePath(tmp_path)) { + WARN("Create dir {} fail !", tmp_path); return; } checkpoint_manager_->CreateCheckpoint(path); diff --git a/src/praft/praft.cc b/src/praft/praft.cc index 3fd14fa3d..85ee1c80e 100644 --- a/src/praft/praft.cc +++ b/src/praft/praft.cc @@ -372,12 +372,13 @@ void PRaft::AppendLog(const Binlog& log, std::promise&& promise node_->apply(task); } -void PRaft::add_all_files(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path) { +void PRaft::AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path) { + assert(writer); for (const auto& entry : std::filesystem::directory_iterator(dir)) { if (entry.is_directory()) { if (entry.path() != "." && entry.path() != "..") { INFO("dir_path = {}", entry.path().string()); - add_all_files(entry.path(), writer, path); + AddAllFiles(entry.path(), writer, path); } } else { INFO("file_path = {}", std::filesystem::relative(entry.path(), path).string()); @@ -388,9 +389,9 @@ void PRaft::add_all_files(const std::filesystem::path& dir, braft::SnapshotWrite } } -void PRaft::recursive_copy(const std::filesystem::path& source, const std::filesystem::path& destination) { +void PRaft::RecursiveCopy(const std::filesystem::path& source, const std::filesystem::path& destination) { if (std::filesystem::is_regular_file(source)) { - if (source.filename() == "__raft_snapshot_meta") { + if (source.filename() == PBRAFT_SNAPSHOT_META_FILE) { return; } else if (source.extension() == ".sst") { // Create a hard link @@ -407,7 +408,7 @@ void PRaft::recursive_copy(const std::filesystem::path& source, const std::files } for (const auto& entry : std::filesystem::directory_iterator(source)) { - recursive_copy(entry.path(), destination / entry.path().filename()); + RecursiveCopy(entry.path(), destination / entry.path().filename()); } } } @@ -453,7 +454,7 @@ void PRaft::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done PSTORE.DoSomeThingSpecificDB(tasks); PSTORE.WaitForCheckpointDone(); auto writer_path = writer->get_path(); - add_all_files(writer_path, writer, writer_path); + AddAllFiles(writer_path, writer, writer_path); } int PRaft::on_snapshot_load(braft::SnapshotReader* reader) { @@ -466,7 +467,7 @@ int PRaft::on_snapshot_load(braft::SnapshotReader* reader) { pstd::DeleteDirIfExist(sub_path); } db_path.pop_back(); - recursive_copy(reader_path, db_path); + RecursiveCopy(reader_path, db_path); PSTORE.Init(); return 0; } diff --git a/src/praft/praft.h b/src/praft/praft.h index 1670b2ccf..ff9abf868 100644 --- a/src/praft/praft.h +++ b/src/praft/praft.h @@ -20,6 +20,7 @@ namespace pikiwidb { #define RAFT_DBID_LEN 32 +#define PBRAFT_SNAPSHOT_META_FILE "__raft_snapshot_meta" #define PRAFT PRaft::Instance() @@ -139,9 +140,9 
@@ class PRaft : public braft::StateMachine { void on_start_following(const ::braft::LeaderChangeContext& ctx) override; private: - void add_all_files(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path); + void AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path); - void recursive_copy(const std::filesystem::path& source, const std::filesystem::path& destination); + void RecursiveCopy(const std::filesystem::path& source, const std::filesystem::path& destination); private: std::unique_ptr server_{nullptr}; // brpc From b303f99b138ce297a1b8372f55e232a59fcf0590 Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Sat, 20 Apr 2024 18:30:13 +0800 Subject: [PATCH 4/7] remove snapshotManager --- save_load.sh | 6 +- src/checkpoint_manager.cc | 36 ----------- src/checkpoint_manager.h | 37 ----------- src/db.cc | 91 +++++++++++++++++++++++---- src/db.h | 23 +++---- src/praft/praft.cc | 61 +++++------------- src/praft/praft.h | 6 +- src/storage/include/storage/storage.h | 2 + src/storage/src/storage.cc | 80 +++++++++++++++++++++-- src/store.cc | 35 ++++++----- src/store.h | 14 ++--- 11 files changed, 214 insertions(+), 177 deletions(-) delete mode 100644 src/checkpoint_manager.cc delete mode 100644 src/checkpoint_manager.h diff --git a/save_load.sh b/save_load.sh index 4a6ad251b..b2b6fd836 100755 --- a/save_load.sh +++ b/save_load.sh @@ -7,11 +7,11 @@ cd leader && ulimit -n 99999 && rm -fr * && ../bin/pikiwidb ../pikiwidb.conf - cd follower1 && ulimit -n 99999 && rm -fr * && ../bin/pikiwidb ../pikiwidb.conf --port 8888 & sleep 10 redis-cli -p 7777 raft.cluster init -redis-benchmark -p 7777 -c 5 -n 10000 -r 10000000 -d 1024 -t set +redis-benchmark -p 7777 -c 5 -n 10000 -r 10000000 -d 1024 -t hset -redis-cli -p 7777 raft.node DSS -redis-cli -p 7777 raft.node DSS +redis-cli -p 7777 raft.node DOSNAPSHOT +redis-cli -p 7777 raft.node DOSNAPSHOT redis-cli -p 8888 raft.cluster join 127.0.0.1:7777 diff --git a/src/checkpoint_manager.cc b/src/checkpoint_manager.cc deleted file mode 100644 index 72c39cb0a..000000000 --- a/src/checkpoint_manager.cc +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. - * This source code is licensed under the BSD-style license found in the - * LICENSE file in the root directory of this source tree. An additional grant - * of patent rights can be found in the PATENTS file in the same directory. - */ - -#include "checkpoint_manager.h" -#include "db.h" -#include "log.h" -#include "pstd/env.h" - -namespace pikiwidb { - -void CheckpointManager::Init(int instNum, DB* db) { - checkpoint_num_ = instNum; - res_.reserve(checkpoint_num_); - db_ = db; -} - -void CheckpointManager::CreateCheckpoint(const std::string& path) { - res_.clear(); - std::lock_guard Lock(shared_mutex_); - for (int i = 0; i < checkpoint_num_; ++i) { - auto res = std::async(std::launch::async, &DB::DoBgSave, db_, path, i); - res_.push_back(std::move(res)); - } -} - -void CheckpointManager::WaitForCheckpointDone() { - for (auto& r : res_) { - r.get(); - } -} - -} // namespace pikiwidb diff --git a/src/checkpoint_manager.h b/src/checkpoint_manager.h deleted file mode 100644 index 065027424..000000000 --- a/src/checkpoint_manager.h +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. - * This source code is licensed under the BSD-style license found in the - * LICENSE file in the root directory of this source tree. 
An additional grant - * of patent rights can be found in the PATENTS file in the same directory. - */ - -#pragma once - -#include -#include -#include - -namespace pikiwidb { - -class DB; - -class CheckpointManager { - public: - CheckpointManager() = default; - ~CheckpointManager() = default; - - void Init(int instNum, DB* db); - - void CreateCheckpoint(const std::string& path); - - void WaitForCheckpointDone(); - - private: - int checkpoint_num_ = 0; - std::vector> res_; - DB* db_ = nullptr; - - std::shared_mutex shared_mutex_; -}; - -} // namespace pikiwidb diff --git a/src/db.cc b/src/db.cc index 26decaa26..a56f7dd93 100644 --- a/src/db.cc +++ b/src/db.cc @@ -7,7 +7,6 @@ #include "db.h" -#include "checkpoint_manager.h" #include "config.h" #include "praft/praft.h" #include "pstd/log.h" @@ -16,11 +15,11 @@ extern pikiwidb::PConfig g_config; namespace pikiwidb { -DB::DB(int db_index, const std::string& db_path) - : db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/') { +DB::DB(int db_index, const std::string& db_path, int rocksdb_inst_num) + : db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/'), rocksdb_inst_num_(rocksdb_inst_num) { storage::StorageOptions storage_options; storage_options.options.create_if_missing = true; - storage_options.db_instance_num = g_config.db_instance_num; + storage_options.db_instance_num = rocksdb_inst_num_; storage_options.db_id = db_index_; // options for CF @@ -37,31 +36,97 @@ DB::DB(int db_index, const std::string& db_path) ERROR("Storage open failed! {}", s.ToString()); abort(); } - - checkpoint_manager_ = std::make_unique(); - checkpoint_manager_->Init(g_config.db_instance_num, this); - opened_ = true; INFO("Open DB{} success!", db_index_); } -void DB::DoBgSave(const std::string& path, int i) { +void DB::DoCheckpoint(const std::string& path, int i) { // 1) always hold storage's sharedLock std::shared_lock sharedLock(storage_mutex_); - // 2)Create the storage's checkpoint 。 + // 2)Create the checkpoint of rocksdb i. auto status = storage_->CreateCheckpoint(path, i); } -void DB::CreateCheckpoint(const std::string& path) { +void DB::LoadCheckpoint(const std::string& path, const std::string& db_path, int i) { + // 1) Already holding the mutual exclusion lock + + // 2) Load the checkpoint of rocksdb i. + auto status = storage_->LoadCheckpoint(path, db_path, i); +} + +void DB::CreateCheckpoint(const std::string& path, bool sync) { auto tmp_path = path + '/' + std::to_string(db_index_); if (0 != pstd::CreatePath(tmp_path)) { WARN("Create dir {} fail !", tmp_path); return; } - checkpoint_manager_->CreateCheckpoint(path); + + std::vector> result; + result.reserve(rocksdb_inst_num_); + for (int i = 0; i < rocksdb_inst_num_; ++i) { + // In a new thread, create a checkpoint for the specified rocksdb i + // In DB::DoBgSave, a read lock is always held to protect the Storage + // corresponding to this rocksdb i. + auto res = std::async(std::launch::async, &DB::DoCheckpoint, this, path, i); + result.push_back(std::move(res)); + } + if (sync) { + for (auto& r : result) { + r.get(); + } + } } -void DB::WaitForCheckpointDone() { checkpoint_manager_->WaitForCheckpointDone(); } +void DB::LoadDBFromCheckPoint(const std::string& path, bool sync) { + opened_.store(false); + // 对于每一个 rocksdb 分别去 Load 自己的 DB. 
+ auto checkpoint_path = path + '/' + std::to_string(db_index_); + if (0 != pstd::IsDir(path)) { + WARN("Checkpoint dir {} does not exist!", checkpoint_path); + return; + } + if (0 != pstd::IsDir(db_path_)) { + if (0 != pstd::CreateDir(db_path_)) { + WARN("Create dir {} fail !", db_path_); + return; + } + } + + std::lock_guard lock(storage_mutex_); + std::vector> result; + result.reserve(rocksdb_inst_num_); + for (int i = 0; i < rocksdb_inst_num_; ++i) { + // In a new thread, Load a checkpoint for the specified rocksdb i + // In DB::DoBgSave, a read lock is always held to protect the Storage + // corresponding to this rocksdb i. + auto res = std::async(std::launch::async, &DB::LoadCheckpoint, this, checkpoint_path, db_path_, i); + result.push_back(std::move(res)); + } + for (auto& r : result) { + r.get(); + } + // 重新启动 + storage::StorageOptions storage_options; + storage_options.options.create_if_missing = true; + storage_options.db_instance_num = rocksdb_inst_num_; + storage_options.db_id = db_index_; + + // options for CF + storage_options.options.ttl = g_config.rocksdb_ttl_second; + storage_options.options.periodic_compaction_seconds = g_config.rocksdb_periodic_second; + if (g_config.use_raft) { + storage_options.append_log_function = [&r = PRAFT](const Binlog& log, std::promise&& promise) { + r.AppendLog(log, std::move(promise)); + }; + } + storage_ = std::make_unique(); + if (auto s = storage_->Open(storage_options, db_path_); !s.ok()) { + ERROR("Storage open failed! {}", s.ToString()); + abort(); + } + opened_.store(true); + INFO("DB{} load a checkpoint from {} success!", db_index_, path); +} } // namespace pikiwidb diff --git a/src/db.h b/src/db.h index 11dc3a207..2794ac440 100644 --- a/src/db.h +++ b/src/db.h @@ -7,15 +7,14 @@ #pragma once +#include #include "storage/storage.h" namespace pikiwidb { -class CheckpointManager; - class DB { public: - DB(int db_index, const std::string& db_path); + DB(int db_index, const std::string& db_path, int rocksdb_inst_num); std::unique_ptr& GetStorage() { return storage_; } @@ -27,20 +26,20 @@ class DB { void UnLockShared() { storage_mutex_.unlock_shared(); } - void CreateCheckpoint(const std::string& path); - - [[maybe_unused]] void DoBgSave(const std::string&, int i); + void CreateCheckpoint(const std::string& path, bool sync); - void WaitForCheckpointDone(); + void LoadDBFromCheckPoint(const std::string& path, bool sync = false); int GetDbIndex() { return db_index_; } + private: + void DoCheckpoint(const std::string&, int i); + void LoadCheckpoint(const std::string&, const std::string& db_path, int i); + private: const int db_index_ = 0; const std::string db_path_; - const std::string dump_parent_path_; - const std::string dump_path_; - + int rocksdb_inst_num_ = 0; /** * If you want to change the pointer that points to storage, * you must first acquire a mutex lock. 
@@ -49,9 +48,7 @@ class DB { */ std::shared_mutex storage_mutex_; std::unique_ptr storage_; - bool opened_ = false; - - std::unique_ptr checkpoint_manager_; + std::atomic_bool opened_ = false; }; } // namespace pikiwidb diff --git a/src/praft/praft.cc b/src/praft/praft.cc index 0dbecbab1..4489beae3 100644 --- a/src/praft/praft.cc +++ b/src/praft/praft.cc @@ -115,7 +115,6 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) { server_.reset(); return ERROR_LOG_AND_STATUS("Failed to start server"); } - // It's ok to start PRaft; assert(group_id.size() == RAFT_GROUPID_LEN); this->group_id_ = group_id; @@ -580,7 +579,7 @@ void PRaft::AppendLog(const Binlog& log, std::promise&& promise node_->apply(task); } -void PRaft::AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path) { +int PRaft::AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path) { assert(writer); for (const auto& entry : std::filesystem::directory_iterator(dir)) { if (entry.is_directory()) { @@ -591,34 +590,12 @@ void PRaft::AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter* } else { DEBUG("file_path = {}", std::filesystem::relative(entry.path(), path).string()); if (writer->add_file(std::filesystem::relative(entry.path(), path)) != 0) { - ERROR("add file error!"); + ERROR("add file {} to snapshot fail!", entry.path().string()); + return -1; } } } -} - -void PRaft::RecursiveCopy(const std::filesystem::path& source, const std::filesystem::path& destination) { - if (std::filesystem::is_regular_file(source)) { - if (source.filename() == PBRAFT_SNAPSHOT_META_FILE) { - return; - } else if (source.extension() == ".sst") { - // Create a hard link - DEBUG("hard link success! source_file = {} , destination_file = {}", source.string(), destination.string()); - ::link(source.c_str(), destination.c_str()); - } else { - // Copy the file - DEBUG("copy success! 
source_file = {} , destination_file = {}", source.string(), destination.string()); - std::filesystem::copy_file(source, destination, std::filesystem::copy_options::overwrite_existing); - } - } else { - if (!pstd::FileExists(destination)) { - pstd::CreateDir(destination); - } - - for (const auto& entry : std::filesystem::directory_iterator(source)) { - RecursiveCopy(entry.path(), destination / entry.path().filename()); - } - } + return 0; } // @braft::StateMachine @@ -663,30 +640,24 @@ void PRaft::on_apply(braft::Iterator& iter) { } void PRaft::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) { + assert(writer); brpc::ClosureGuard done_guard(done); - TasksVector tasks; - tasks.reserve(g_config.databases); - for (auto i = 0; i < g_config.databases; ++i) { - tasks.push_back({TaskType::kCheckpoint, i, {{TaskArg::kCheckpointPath, writer->get_path()}}}); - } + auto path = writer->get_path(); + INFO("Saving snapshot to {}", path); + TasksVector tasks(1, {TaskType::kCheckpoint, db_id_, {{TaskArg::kCheckpointPath, path}}, true}); PSTORE.DoSomeThingSpecificDB(tasks); - PSTORE.WaitForCheckpointDone(); - auto writer_path = writer->get_path(); - AddAllFiles(writer_path, writer, writer_path); + if (auto res = AddAllFiles(path, writer, path); res != 0) { + done->status().set_error(EIO, "Fail to add file to writer"); + } } int PRaft::on_snapshot_load(braft::SnapshotReader* reader) { CHECK(!IsLeader()) << "Leader is not supposed to load snapshot"; - auto reader_path = reader->get_path(); // xx/snapshot_0000001 - auto db_path = g_config.dbpath; - PSTORE.Clear(); - for (int i = 0; i < g_config.databases; i++) { - auto sub_path = db_path + std::to_string(i); - pstd::DeleteDirIfExist(sub_path); - } - db_path.pop_back(); - RecursiveCopy(reader_path, db_path); - PSTORE.Init(); + assert(reader); + auto reader_path = reader->get_path(); // xx/snapshot_0000001 + auto path = g_config.dbpath + std::to_string(db_id_); // db/db_id + TasksVector tasks(1, {TaskType::kLoadDBFromCheckPoint, db_id_, {{TaskArg::kCheckpointPath, reader_path}}, true}); + PSTORE.DoSomeThingSpecificDB(tasks); return 0; } diff --git a/src/praft/praft.h b/src/praft/praft.h index 6eeb8d4b6..c2ff888be 100644 --- a/src/praft/praft.h +++ b/src/praft/praft.h @@ -22,7 +22,6 @@ namespace pikiwidb { -#define PBRAFT_SNAPSHOT_META_FILE "__raft_snapshot_meta" #define RAFT_GROUPID_LEN 32 #define OK "+OK" @@ -156,9 +155,7 @@ class PRaft : public braft::StateMachine { void on_start_following(const ::braft::LeaderChangeContext& ctx) override; private: - void AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path); - - void RecursiveCopy(const std::filesystem::path& source, const std::filesystem::path& destination); + static int AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path); private: std::unique_ptr server_{nullptr}; // brpc @@ -168,6 +165,7 @@ class PRaft : public braft::StateMachine { ClusterCmdContext cluster_cmd_ctx_; // context for cluster join/remove command std::string group_id_; // group id + int db_id_ = 0; // db_id }; } // namespace pikiwidb diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index 35ffb4e21..aad21e337 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -183,6 +183,8 @@ class Storage { Status CreateCheckpoint(const std::string& dump_path, int index); + Status LoadCheckpoint(const std::string& dump_path, const std::string& db_path, 
int index); + Status LoadCursorStartKey(const DataType& dtype, int64_t cursor, char* type, std::string* start_key); Status StoreCursorStartKey(const DataType& dtype, int64_t cursor, char type, const std::string& next_key); diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index 222045a13..418ce3e21 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -4,6 +4,7 @@ // of patent rights can be found in the PATENTS file in the same directory. #include +#include #include #include @@ -23,6 +24,9 @@ #include "storage/storage.h" #include "storage/util.h" +#define PRAFT_SNAPSHOT_META_FILE "__raft_snapshot_meta" +#define SST_FILE_EXTENSION ".sst" + namespace storage { extern std::string BitOpOperate(BitOpType op, const std::vector& src_values, int64_t max_len); class Redis; @@ -82,6 +86,42 @@ static std::string AppendSubDirectory(const std::string& db_path, int index) { } } +static int RecursiveLinkAndCopy(const std::filesystem::path& source, const std::filesystem::path& destination) { + if (std::filesystem::is_regular_file(source)) { + if (source.filename() == PRAFT_SNAPSHOT_META_FILE) { + return 0; + } else if (source.extension() == SST_FILE_EXTENSION) { + // Create a hard link + DEBUG("hard link success! source_file = {} , destination_file = {}", source.string(), destination.string()); + if (::link(source.c_str(), destination.c_str()) < 0) { + WARN("hard link file {} fail", source.string()); + return -1; + } + } else { + // Copy the file + DEBUG("copy success! source_file = {} , destination_file = {}", source.string(), destination.string()); + if (!std::filesystem::copy_file(source, destination, std::filesystem::copy_options::overwrite_existing)) { + WARN("copy file {} fail", source.string()); + return -1; + } + } + } else { + if (!pstd::FileExists(destination)) { + if (pstd::CreateDir(destination) != 0) { + WARN("create dir {} fail", destination.string()); + return -1; + } + } + + for (const auto& entry : std::filesystem::directory_iterator(source)) { + if (RecursiveLinkAndCopy(entry.path(), destination / entry.path().filename()) != 0) { + return -1; + } + } + } + return 0; +} + Status Storage::Open(const StorageOptions& storage_options, const std::string& db_path) { mkpath(db_path.c_str(), 0755); db_instance_num_ = storage_options.db_instance_num; @@ -110,7 +150,7 @@ Status Storage::CreateCheckpoint(const std::string& dump_path, int i) { auto tmp_dir = source_dir + ".tmp"; // 1) Make sure the temporary directory does not exist if (!pstd::DeleteDirIfExist(tmp_dir)) { - WARN("DB{}'s RocksDB {} delete dir fail!", db_id_, i); + WARN("DB{}'s RocksDB {} delete directory fail!", db_id_, i); return Status::IOError("DeleteDirIfExist() fail! dir_name : {} ", tmp_dir); } @@ -125,7 +165,7 @@ Status Storage::CreateCheckpoint(const std::string& dump_path, int i) { // 3) Create a checkpoint std::unique_ptr checkpoint_guard(checkpoint); - s = checkpoint->CreateCheckpoint(tmp_dir, kNoFlush, nullptr); + s = checkpoint->CreateCheckpoint(tmp_dir, kFlush, nullptr); if (!s.ok()) { WARN("DB{}'s RocksDB {} create checkpoint failed!. Error: {}", db_id_, i, s.ToString()); return s; @@ -133,7 +173,7 @@ Status Storage::CreateCheckpoint(const std::string& dump_path, int i) { // 4) Make sure the source directory does not exist if (!pstd::DeleteDirIfExist(source_dir)) { - WARN("DB{}'s RocksDB {} delete dir {} fail!", db_id_, i, source_dir); + WARN("DB{}'s RocksDB {} delete directory {} fail!", db_id_, i, source_dir); return Status::IOError("DeleteDirIfExist() fail! 
dir_name : {} ", source_dir); } @@ -144,13 +184,45 @@ Status Storage::CreateCheckpoint(const std::string& dump_path, int i) { if (!pstd::DeleteDirIfExist(tmp_dir)) { WARN("DB{}'s RocksDB {} fail to delete the rename failed directory {} ", db_id_, i, tmp_dir); } - return Status::IOError("Rename dir {} fail!", tmp_dir); + return Status::IOError("Rename directory {} fail!", tmp_dir); } INFO("DB{}'s RocksDB {} create checkpoint {} success!", db_id_, i, source_dir); return Status::OK(); } +Status Storage::LoadCheckpoint(const std::string& dump_path, const std::string& db_path, int i) { + auto rocksdb_checkpoint_path = AppendSubDirectory(db_path, i); + INFO("DB{}'s RocksDB {} begin to load a checkpoint from {}!", db_id_, i, rocksdb_checkpoint_path); + + // 首先将原来的 db path 改名, 当 load 失败的时候保证原来的数据还在. + auto tmp_path = db_path + ".tmp"; + if (auto status = pstd::RenameFile(db_path, tmp_path); status != 0) { + WARN("DB{}'s RocksDB {} rename db directory {} to temporary directory {} fail!", db_id_, i, db_path, tmp_path); + return Status::IOError("Rename directory {} fail!", db_path); + } + + if (0 != pstd::CreateDir(db_path)) { + pstd::RenameFile(tmp_path, db_path); + WARN("DB{}'s RocksDB {} load a checkpoint from {} fail!", db_id_, i, rocksdb_checkpoint_path); + return Status::IOError("Create directory {} fail!", db_path); + } + + // 将原来的数据拷贝到 DB 目录下. + if (RecursiveLinkAndCopy(dump_path, db_path) != 0) { + pstd::DeleteDir(db_path); + pstd::RenameFile(tmp_path, db_path); + WARN("DB{}'s RocksDB {} load a checkpoint from {} fail!", db_id_, i, rocksdb_checkpoint_path); + return Status::IOError("recursive link and copy directory {} fail!", dump_path); + } + + // 删除掉 tmp + if (auto s = rocksdb::DestroyDB(tmp_path, rocksdb::Options()); !s.ok()) { + WARN("Failure to destroy the old DB, path = {}", tmp_path); + } + return Status::OK(); +} + Status Storage::LoadCursorStartKey(const DataType& dtype, int64_t cursor, char* type, std::string* start_key) { std::string index_key = DataTypeTag[dtype] + std::to_string(cursor); std::string index_value; diff --git a/src/store.cc b/src/store.cc index 7b3c59a51..c67da7127 100644 --- a/src/store.cc +++ b/src/store.cc @@ -7,7 +7,6 @@ #include -#include "checkpoint_manager.h" #include "config.h" #include "log.h" #include "store.h" @@ -28,7 +27,7 @@ void PStore::Init() { backends_.reserve(dbNum_); if (g_config.backend == kBackEndRocksDB) { for (int i = 0; i < dbNum_; i++) { - auto db = std::make_unique(i, g_config.dbpath); + auto db = std::make_unique(i, g_config.dbpath, g_config.db_instance_num); backends_.push_back(std::move(db)); } } else { @@ -36,39 +35,45 @@ void PStore::Init() { } } -void PStore::Clear() { backends_.clear(); } +void PStore::Clear() { + std::lock_guard lock(dbs_mutex_); + backends_.clear(); +} void PStore::DoSomeThingSpecificDB(const TasksVector& tasks) { std::for_each(tasks.begin(), tasks.end(), [this](const auto& task) { + if (task.db < 0 || task.db >= dbNum_) { + WARN("The database index is out of range."); + return; + } + auto& db = backends_[task.db]; switch (task.type) { case kCheckpoint: { - if (task.db < 0 || task.db >= dbNum_) { - WARN("The database index is out of range."); + if (auto s = task.args.find(kCheckpointPath); s == task.args.end()) { + WARN("The critical parameter 'path' is missing for do a checkpoint."); return; } - auto& db = backends_[task.db]; + auto path = task.args.find(kCheckpointPath)->second; + trimSlash(path); + db->CreateCheckpoint(path, task.sync); + break; + } + case kLoadDBFromCheckPoint: { if (auto s = 
task.args.find(kCheckpointPath); s == task.args.end()) { - WARN("The critical parameter 'path' is missing in the checkpoint."); + WARN("The critical parameter 'path' is missing for load a checkpoint."); return; } auto path = task.args.find(kCheckpointPath)->second; trimSlash(path); - db->CreateCheckpoint(path); + db->LoadDBFromCheckPoint(path, task.sync); break; } - default: break; } }); } -void PStore::WaitForCheckpointDone() { - for (auto& db : backends_) { - db->WaitForCheckpointDone(); - } -} - void PStore::trimSlash(std::string& dirName) { while (dirName.back() == '/') { dirName.pop_back(); diff --git a/src/store.h b/src/store.h index 3c60aee09..2fd207474 100644 --- a/src/store.h +++ b/src/store.h @@ -17,21 +17,21 @@ namespace pikiwidb { -enum TaskType { - kCheckpoint, -}; +enum TaskType { kCheckpoint = 0, kLoadDBFromCheckPoint }; enum TaskArg { - kCheckpointPath, + kCheckpointPath = 0, }; struct TaskContext { TaskType type; int db; std::map args; - TaskContext(TaskType t) : type(t) {} - TaskContext(TaskType t, int d) : type(t), db(d) {} - TaskContext(TaskType t, int d, const std::map& a) : type(t), db(d), args(a) {} + bool sync; + TaskContext(TaskType t, bool s = false) : type(t), sync(s) {} + TaskContext(TaskType t, int d, bool s = false) : type(t), db(d), sync(s) {} + TaskContext(TaskType t, int d, const std::map& a, bool s = false) + : type(t), db(d), args(a), sync(s) {} }; using TasksVector = std::vector; From 3e53764bc6352e499ba5402ac788c54606605579 Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Sun, 21 Apr 2024 15:09:35 +0800 Subject: [PATCH 5/7] fix load checkpoint fail --- src/db.cc | 5 +-- src/praft/praft.cc | 2 +- src/pstd/pstd_string.cc | 6 +++ src/pstd/pstd_string.h | 2 + src/storage/include/storage/storage.h | 1 + src/storage/src/redis.cc | 5 +++ src/storage/src/storage.cc | 65 ++++++++++++++------------- src/store.cc | 12 ++--- src/store.h | 1 - 9 files changed, 55 insertions(+), 44 deletions(-) diff --git a/src/db.cc b/src/db.cc index a56f7dd93..924055847 100644 --- a/src/db.cc +++ b/src/db.cc @@ -36,7 +36,7 @@ DB::DB(int db_index, const std::string& db_path, int rocksdb_inst_num) ERROR("Storage open failed! {}", s.ToString()); abort(); } - opened_ = true; + opened_.store(true); INFO("Open DB{} success!", db_index_); } @@ -80,7 +80,6 @@ void DB::CreateCheckpoint(const std::string& path, bool sync) { void DB::LoadDBFromCheckPoint(const std::string& path, bool sync) { opened_.store(false); - // 对于每一个 rocksdb 分别去 Load 自己的 DB. 
auto checkpoint_path = path + '/' + std::to_string(db_index_); if (0 != pstd::IsDir(path)) { WARN("Checkpoint dir {} does not exist!", checkpoint_path); @@ -106,7 +105,7 @@ void DB::LoadDBFromCheckPoint(const std::string& path, bool sync) { for (auto& r : result) { r.get(); } - // 重新启动 + storage::StorageOptions storage_options; storage_options.options.create_if_missing = true; storage_options.db_instance_num = rocksdb_inst_num_; diff --git a/src/praft/praft.cc b/src/praft/praft.cc index 4489beae3..124a0bf8e 100644 --- a/src/praft/praft.cc +++ b/src/praft/praft.cc @@ -523,7 +523,7 @@ butil::Status PRaft::DoSnapshot() { braft::SynchronizedClosure done; node_->snapshot(&done); done.wait(); - return {0, "OK"}; + return done.status(); } void PRaft::OnClusterCmdConnectionFailed([[maybe_unused]] EventLoop* loop, const char* peer_ip, int port) { diff --git a/src/pstd/pstd_string.cc b/src/pstd/pstd_string.cc index c10e99255..deb9c431a 100755 --- a/src/pstd/pstd_string.cc +++ b/src/pstd/pstd_string.cc @@ -619,4 +619,10 @@ bool StringHasSpaces(const std::string& str) { return std::count_if(str.begin(), str.end(), [](unsigned char c) { return std::isspace(c); }); } +void trimSlash(std::string& dirName) { + while (dirName.back() == '/') { + dirName.pop_back(); + } +} + } // namespace pstd \ No newline at end of file diff --git a/src/pstd/pstd_string.h b/src/pstd/pstd_string.h index ed8411bb4..8960e3242 100755 --- a/src/pstd/pstd_string.h +++ b/src/pstd/pstd_string.h @@ -93,4 +93,6 @@ std::string RandomStringWithNumber(size_t len); bool StringHasSpaces(const std::string& str); +void trimSlash(std::string& dirName); + } // namespace pstd diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index aad21e337..3fc174ac8 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -1110,6 +1110,7 @@ class Storage { // Storage start the background thread for compaction task pthread_t bg_tasks_thread_id_ = 0; + std::thread t; pstd::Mutex bg_tasks_mutex_; pstd::CondVar bg_tasks_cond_var_; std::queue bg_tasks_queue_; diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index 10febd50a..ed8ee7d3c 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -51,7 +51,12 @@ Redis::~Redis() { for (auto handle : tmp_handles) { delete handle; } + // delete env_; delete db_; + + if (default_compact_range_options_.canceled) { + delete default_compact_range_options_.canceled; + } } Status Redis::Open(const StorageOptions& storage_options, const std::string& db_path) { diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index 418ce3e21..fe7a3c943 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -12,6 +12,7 @@ #include "config.h" #include "pstd/log.h" #include "pstd/pikiwidb_slot.h" +#include "pstd/pstd_string.h" #include "rocksdb/utilities/checkpoint.h" #include "scope_snapshot.h" #include "src/lru_cache.h" @@ -68,13 +69,14 @@ Storage::Storage() { } Storage::~Storage() { - bg_tasks_should_exit_ = true; + bg_tasks_should_exit_.store(true); bg_tasks_cond_var_.notify_one(); - - if (is_opened_) { - for (auto& inst : insts_) { - inst.reset(); + if (is_opened_.load()) { + int ret = 0; + if (ret = pthread_join(bg_tasks_thread_id_, nullptr); ret != 0) { + ERROR("pthread_join failed with bgtask thread error : {}", ret); } + insts_.clear(); } } @@ -92,18 +94,18 @@ static int RecursiveLinkAndCopy(const std::filesystem::path& source, const std:: return 0; } else if (source.extension() == 
SST_FILE_EXTENSION) {
       // Create a hard link
-      DEBUG("hard link success! source_file = {} , destination_file = {}", source.string(), destination.string());
-      if (::link(source.c_str(), destination.c_str()) < 0) {
+      if (::link(source.c_str(), destination.c_str()) != 0) {
         WARN("hard link file {} fail", source.string());
         return -1;
       }
+      DEBUG("hard link success! source_file = {} , destination_file = {}", source.string(), destination.string());
     } else {
       // Copy the file
-      DEBUG("copy success! source_file = {} , destination_file = {}", source.string(), destination.string());
       if (!std::filesystem::copy_file(source, destination, std::filesystem::copy_options::overwrite_existing)) {
         WARN("copy file {} fail", source.string());
         return -1;
       }
+      DEBUG("copy success! source_file = {} , destination_file = {}", source.string(), destination.string());
     }
   } else {
     if (!pstd::FileExists(destination)) {
@@ -192,33 +194,36 @@ Status Storage::CreateCheckpoint(const std::string& dump_path, int i) {
 }
 Status Storage::LoadCheckpoint(const std::string& dump_path, const std::string& db_path, int i) {
-  auto rocksdb_checkpoint_path = AppendSubDirectory(db_path, i);
-  INFO("DB{}'s RocksDB {} begin to load a checkpoint from {}!", db_id_, i, rocksdb_checkpoint_path);
-
-  // Rename the original db path first, so that the original data is kept if the load fails.
-  auto tmp_path = db_path + ".tmp";
-  if (auto status = pstd::RenameFile(db_path, tmp_path); status != 0) {
-    WARN("DB{}'s RocksDB {} rename db directory {} to temporary directory {} fail!", db_id_, i, db_path, tmp_path);
+  auto rocksdb_checkpoint_path = AppendSubDirectory(dump_path, i);
+  INFO("DB{}'s RocksDB {} begin to load a checkpoint from {}", db_id_, i, rocksdb_checkpoint_path);
+  auto rocksdb_path = AppendSubDirectory(db_path, i);  // ./db/db_id/i
+  auto tmp_rocksdb_path = rocksdb_path + ".tmp";  // ./db/db_id/i.tmp
+  insts_[i].reset();
+
+  // 1) Rename the original db to db.tmp, and only perform the maximum possible recovery of data
+  // when loading the checkpoint fails.
+  if (auto status = pstd::RenameFile(rocksdb_path, tmp_rocksdb_path); status != 0) {
+    WARN("DB{}'s RocksDB {} rename db directory {} to temporary directory {} fail!", db_id_, i, db_path,
+         tmp_rocksdb_path);
     return Status::IOError("Rename directory {} fail!", db_path);
   }
-  if (0 != pstd::CreateDir(db_path)) {
-    pstd::RenameFile(tmp_path, db_path);
+  // 2) Create a db directory to save the checkpoint.
+  if (0 != pstd::CreatePath(rocksdb_path)) {
+    pstd::RenameFile(tmp_rocksdb_path, rocksdb_path);
     WARN("DB{}'s RocksDB {} load a checkpoint from {} fail!", db_id_, i, rocksdb_checkpoint_path);
-    return Status::IOError("Create directory {} fail!", db_path);
+    return Status::IOError("Create directory {} fail!", rocksdb_path);
  }
-
-  // Copy the original data into the DB directory.
-  if (RecursiveLinkAndCopy(dump_path, db_path) != 0) {
-    pstd::DeleteDir(db_path);
-    pstd::RenameFile(tmp_path, db_path);
+  if (RecursiveLinkAndCopy(rocksdb_checkpoint_path, rocksdb_path) != 0) {
+    pstd::DeleteDir(rocksdb_path);
+    pstd::RenameFile(tmp_rocksdb_path, rocksdb_path);
     WARN("DB{}'s RocksDB {} load a checkpoint from {} fail!", db_id_, i, rocksdb_checkpoint_path);
-    return Status::IOError("recursive link and copy directory {} fail!", dump_path);
+    return Status::IOError("recursive link and copy directory {} fail!", rocksdb_path);
   }
-  // Delete the tmp directory.
-  if (auto s = rocksdb::DestroyDB(tmp_path, rocksdb::Options()); !s.ok()) {
-    WARN("Failure to destroy the old DB, path = {}", tmp_path);
+  // 3) Destroy the db.tmp directory.
+ if (auto s = rocksdb::DestroyDB(tmp_rocksdb_path, rocksdb::Options()); !s.ok()) { + WARN("Failure to destroy the old DB, path = {}", tmp_rocksdb_path); } return Status::OK(); } @@ -2047,9 +2052,9 @@ Status Storage::AddBGTask(const BGTask& bg_task) { Status Storage::RunBGTask() { BGTask task; - while (!bg_tasks_should_exit_) { + while (!bg_tasks_should_exit_.load()) { std::unique_lock lock(bg_tasks_mutex_); - bg_tasks_cond_var_.wait(lock, [this]() { return !bg_tasks_queue_.empty() || bg_tasks_should_exit_; }); + bg_tasks_cond_var_.wait(lock, [this]() { return !bg_tasks_queue_.empty() || bg_tasks_should_exit_.load(); }); if (!bg_tasks_queue_.empty()) { task = bg_tasks_queue_.front(); @@ -2057,7 +2062,7 @@ Status Storage::RunBGTask() { } lock.unlock(); - if (bg_tasks_should_exit_) { + if (bg_tasks_should_exit_.load()) { return Status::Incomplete("bgtask return with bg_tasks_should_exit true"); } diff --git a/src/store.cc b/src/store.cc index c67da7127..1320c0e77 100644 --- a/src/store.cc +++ b/src/store.cc @@ -9,6 +9,7 @@ #include "config.h" #include "log.h" +#include "pstd/pstd_string.h" #include "store.h" namespace pikiwidb { @@ -54,7 +55,7 @@ void PStore::DoSomeThingSpecificDB(const TasksVector& tasks) { return; } auto path = task.args.find(kCheckpointPath)->second; - trimSlash(path); + pstd::trimSlash(path); db->CreateCheckpoint(path, task.sync); break; } @@ -64,7 +65,7 @@ void PStore::DoSomeThingSpecificDB(const TasksVector& tasks) { return; } auto path = task.args.find(kCheckpointPath)->second; - trimSlash(path); + pstd::trimSlash(path); db->LoadDBFromCheckPoint(path, task.sync); break; } @@ -73,11 +74,4 @@ void PStore::DoSomeThingSpecificDB(const TasksVector& tasks) { } }); } - -void PStore::trimSlash(std::string& dirName) { - while (dirName.back() == '/') { - dirName.pop_back(); - } -} - } // namespace pikiwidb diff --git a/src/store.h b/src/store.h index 2fd207474..76a8442ff 100644 --- a/src/store.h +++ b/src/store.h @@ -60,7 +60,6 @@ class PStore { private: PStore() = default; - void trimSlash(std::string& dirName); int dbNum_ = 0; From 6bf07a0ef704e65bc4954adbcb82556d13ea9c7f Mon Sep 17 00:00:00 2001 From: dingxiaoshuai123 <2486016589@qq.com> Date: Mon, 22 Apr 2024 10:00:51 +0800 Subject: [PATCH 6/7] handle comments --- pikiwidb.conf | 4 ++-- src/db.cc | 6 ++---- src/praft/praft.cc | 4 ++-- src/pstd/pstd_string.cc | 2 +- src/pstd/pstd_string.h | 2 +- src/storage/include/storage/storage.h | 1 - src/store.cc | 11 +++-------- src/store.h | 15 +-------------- 8 files changed, 12 insertions(+), 33 deletions(-) diff --git a/pikiwidb.conf b/pikiwidb.conf index 55fe6d3d2..b375faeb0 100644 --- a/pikiwidb.conf +++ b/pikiwidb.conf @@ -38,7 +38,7 @@ logfile stdout # Set the number of databases. 
The default database is DB 0, you can select # a different one on a per-connection basis using SELECT where # dbid is a number between 0 and 'databases'-1 -databases 3 +databases 1 ################################ SNAPSHOTTING ################################# # @@ -347,7 +347,7 @@ backendpath dump # the frequency of dump to backend per second backendhz 10 # the rocksdb number per db -db-instance-num 5 +db-instance-num 3 # default 86400 * 7 rocksdb-ttl-second 604800 # default 86400 * 3 diff --git a/src/db.cc b/src/db.cc index 924055847..0129a925f 100644 --- a/src/db.cc +++ b/src/db.cc @@ -41,7 +41,7 @@ DB::DB(int db_index, const std::string& db_path, int rocksdb_inst_num) } void DB::DoCheckpoint(const std::string& path, int i) { - // 1) always hold storage's sharedLock + // 1) always hold the storage's shared lock std::shared_lock sharedLock(storage_mutex_); // 2)Create the checkpoint of rocksdb i. @@ -49,7 +49,7 @@ void DB::DoCheckpoint(const std::string& path, int i) { } void DB::LoadCheckpoint(const std::string& path, const std::string& db_path, int i) { - // 1) Already holding the mutual exclusion lock + // 1) Already holding the storage's exclusion lock // 2) Load the checkpoint of rocksdb i. auto status = storage_->LoadCheckpoint(path, db_path, i); @@ -97,8 +97,6 @@ void DB::LoadDBFromCheckPoint(const std::string& path, bool sync) { result.reserve(rocksdb_inst_num_); for (int i = 0; i < rocksdb_inst_num_; ++i) { // In a new thread, Load a checkpoint for the specified rocksdb i - // In DB::DoBgSave, a read lock is always held to protect the Storage - // corresponding to this rocksdb i. auto res = std::async(std::launch::async, &DB::LoadCheckpoint, this, checkpoint_path, db_path_, i); result.push_back(std::move(res)); } diff --git a/src/praft/praft.cc b/src/praft/praft.cc index 124a0bf8e..4db70e580 100644 --- a/src/praft/praft.cc +++ b/src/praft/praft.cc @@ -645,7 +645,7 @@ void PRaft::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done auto path = writer->get_path(); INFO("Saving snapshot to {}", path); TasksVector tasks(1, {TaskType::kCheckpoint, db_id_, {{TaskArg::kCheckpointPath, path}}, true}); - PSTORE.DoSomeThingSpecificDB(tasks); + PSTORE.HandleTaskSpecificDB(tasks); if (auto res = AddAllFiles(path, writer, path); res != 0) { done->status().set_error(EIO, "Fail to add file to writer"); } @@ -657,7 +657,7 @@ int PRaft::on_snapshot_load(braft::SnapshotReader* reader) { auto reader_path = reader->get_path(); // xx/snapshot_0000001 auto path = g_config.dbpath + std::to_string(db_id_); // db/db_id TasksVector tasks(1, {TaskType::kLoadDBFromCheckPoint, db_id_, {{TaskArg::kCheckpointPath, reader_path}}, true}); - PSTORE.DoSomeThingSpecificDB(tasks); + PSTORE.HandleTaskSpecificDB(tasks); return 0; } diff --git a/src/pstd/pstd_string.cc b/src/pstd/pstd_string.cc index deb9c431a..973656a2b 100755 --- a/src/pstd/pstd_string.cc +++ b/src/pstd/pstd_string.cc @@ -619,7 +619,7 @@ bool StringHasSpaces(const std::string& str) { return std::count_if(str.begin(), str.end(), [](unsigned char c) { return std::isspace(c); }); } -void trimSlash(std::string& dirName) { +void TrimSlash(std::string& dirName) { while (dirName.back() == '/') { dirName.pop_back(); } diff --git a/src/pstd/pstd_string.h b/src/pstd/pstd_string.h index 8960e3242..d6ffd828a 100755 --- a/src/pstd/pstd_string.h +++ b/src/pstd/pstd_string.h @@ -93,6 +93,6 @@ std::string RandomStringWithNumber(size_t len); bool StringHasSpaces(const std::string& str); -void trimSlash(std::string& dirName); +void 
TrimSlash(std::string& dirName); } // namespace pstd diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index 3fc174ac8..aad21e337 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -1110,7 +1110,6 @@ class Storage { // Storage start the background thread for compaction task pthread_t bg_tasks_thread_id_ = 0; - std::thread t; pstd::Mutex bg_tasks_mutex_; pstd::CondVar bg_tasks_cond_var_; std::queue bg_tasks_queue_; diff --git a/src/store.cc b/src/store.cc index 1320c0e77..4ef54d93f 100644 --- a/src/store.cc +++ b/src/store.cc @@ -36,12 +36,7 @@ void PStore::Init() { } } -void PStore::Clear() { - std::lock_guard lock(dbs_mutex_); - backends_.clear(); -} - -void PStore::DoSomeThingSpecificDB(const TasksVector& tasks) { +void PStore::HandleTaskSpecificDB(const TasksVector& tasks) { std::for_each(tasks.begin(), tasks.end(), [this](const auto& task) { if (task.db < 0 || task.db >= dbNum_) { WARN("The database index is out of range."); @@ -55,7 +50,7 @@ void PStore::DoSomeThingSpecificDB(const TasksVector& tasks) { return; } auto path = task.args.find(kCheckpointPath)->second; - pstd::trimSlash(path); + pstd::TrimSlash(path); db->CreateCheckpoint(path, task.sync); break; } @@ -65,7 +60,7 @@ void PStore::DoSomeThingSpecificDB(const TasksVector& tasks) { return; } auto path = task.args.find(kCheckpointPath)->second; - pstd::trimSlash(path); + pstd::TrimSlash(path); db->LoadDBFromCheckPoint(path, task.sync); break; } diff --git a/src/store.h b/src/store.h index 76a8442ff..e8192f7e3 100644 --- a/src/store.h +++ b/src/store.h @@ -38,7 +38,6 @@ using TasksVector = std::vector; class PStore { public: - friend class CheckpointManager; static PStore& Instance(); PStore(const PStore&) = delete; @@ -46,29 +45,17 @@ class PStore { void Init(); - void Clear(); - std::unique_ptr& GetBackend(int32_t index) { return backends_[index]; }; - void DoSomeThingSpecificDB(const TasksVector& task); - - void WaitForCheckpointDone(); + void HandleTaskSpecificDB(const TasksVector& task); int GetDBNumber() const { return dbNum_; } - std::shared_mutex& SharedMutex() { return dbs_mutex_; } - private: PStore() = default; int dbNum_ = 0; - /** - * If you want to access all the DBs at the same time, - * then you must hold the lock. - * For example: you want to execute flushall or bgsave. 
-   */
-  std::shared_mutex dbs_mutex_;
   std::vector> backends_;
 };

From 7fd94d7b6b4e09fdcdb1a752f423f728b55bc6e0 Mon Sep 17 00:00:00 2001
From: dingxiaoshuai123 <2486016589@qq.com>
Date: Mon, 22 Apr 2024 17:26:53 +0800
Subject: [PATCH 7/7] fix comments

---
 src/db.cc          |  2 +-
 src/db.h           |  2 +-
 src/praft/praft.cc |  2 +-
 src/store.cc       | 18 +++++++++++-------
 src/store.h        | 13 +++++++------
 5 files changed, 21 insertions(+), 16 deletions(-)

diff --git a/src/db.cc b/src/db.cc
index 0129a925f..4c002e146 100644
--- a/src/db.cc
+++ b/src/db.cc
@@ -78,7 +78,7 @@ void DB::CreateCheckpoint(const std::string& path, bool sync) {
   }
 }
 
-void DB::LoadDBFromCheckPoint(const std::string& path, bool sync) {
+void DB::LoadDBFromCheckpoint(const std::string& path, bool sync) {
   opened_.store(false);
   auto checkpoint_path = path + '/' + std::to_string(db_index_);
   if (0 != pstd::IsDir(path)) {
diff --git a/src/db.h b/src/db.h
index 2794ac440..a127e4f16 100644
--- a/src/db.h
+++ b/src/db.h
@@ -28,7 +28,7 @@ class DB {
 
   void CreateCheckpoint(const std::string& path, bool sync);
 
-  void LoadDBFromCheckPoint(const std::string& path, bool sync = false);
+  void LoadDBFromCheckpoint(const std::string& path, bool sync = false);
 
   int GetDbIndex() { return db_index_; }
 
diff --git a/src/praft/praft.cc b/src/praft/praft.cc
index 4db70e580..ecb4b002c 100644
--- a/src/praft/praft.cc
+++ b/src/praft/praft.cc
@@ -656,7 +656,7 @@ int PRaft::on_snapshot_load(braft::SnapshotReader* reader) {
   assert(reader);
   auto reader_path = reader->get_path();  // xx/snapshot_0000001
   auto path = g_config.dbpath + std::to_string(db_id_);  // db/db_id
-  TasksVector tasks(1, {TaskType::kLoadDBFromCheckPoint, db_id_, {{TaskArg::kCheckpointPath, reader_path}}, true});
+  TasksVector tasks(1, {TaskType::kLoadDBFromCheckpoint, db_id_, {{TaskArg::kCheckpointPath, reader_path}}, true});
   PSTORE.HandleTaskSpecificDB(tasks);
   return 0;
 }
diff --git a/src/store.cc b/src/store.cc
index 4ef54d93f..e3c9e3560 100644
--- a/src/store.cc
+++ b/src/store.cc
@@ -24,10 +24,10 @@ void PStore::Init() {
     return;
   }
 
-  dbNum_ = g_config.databases;
-  backends_.reserve(dbNum_);
+  db_number_ = g_config.databases;
+  backends_.reserve(db_number_);
   if (g_config.backend == kBackEndRocksDB) {
-    for (int i = 0; i < dbNum_; i++) {
+    for (int i = 0; i < db_number_; i++) {
       auto db = std::make_unique(i, g_config.dbpath, g_config.db_instance_num);
       backends_.push_back(std::move(db));
     }
@@ -38,11 +38,11 @@ void PStore::Init() {
 
 void PStore::HandleTaskSpecificDB(const TasksVector& tasks) {
   std::for_each(tasks.begin(), tasks.end(), [this](const auto& task) {
-    if (task.db < 0 || task.db >= dbNum_) {
+    if (task.db < 0 || task.db >= db_number_) {
       WARN("The database index is out of range.");
       return;
     }
-    auto& db = backends_[task.db];
+    auto& db = backends_.at(task.db);
     switch (task.type) {
       case kCheckpoint: {
         if (auto s = task.args.find(kCheckpointPath); s == task.args.end()) {
@@ -54,14 +54,18 @@
         db->CreateCheckpoint(path, task.sync);
         break;
       }
-      case kLoadDBFromCheckPoint: {
+      case kLoadDBFromCheckpoint: {
         if (auto s = task.args.find(kCheckpointPath); s == task.args.end()) {
           WARN("The critical parameter 'path' is missing for load a checkpoint.");
           return;
         }
         auto path = task.args.find(kCheckpointPath)->second;
         pstd::TrimSlash(path);
-        db->LoadDBFromCheckPoint(path, task.sync);
+        db->LoadDBFromCheckpoint(path, task.sync);
         break;
       }
+      case kEmpty: {
+        WARN("An empty task was passed in, not doing anything.");
+        break;
+      }
       default:
diff --git a/src/store.h 
b/src/store.h index e8192f7e3..4bf15c5f3 100644 --- a/src/store.h +++ b/src/store.h @@ -17,17 +17,18 @@ namespace pikiwidb { -enum TaskType { kCheckpoint = 0, kLoadDBFromCheckPoint }; +enum TaskType { kCheckpoint = 0, kLoadDBFromCheckpoint, kEmpty }; enum TaskArg { kCheckpointPath = 0, }; struct TaskContext { - TaskType type; - int db; + TaskType type = kEmpty; + int db = -1; std::map args; - bool sync; + bool sync = false; + TaskContext() = delete; TaskContext(TaskType t, bool s = false) : type(t), sync(s) {} TaskContext(TaskType t, int d, bool s = false) : type(t), db(d), sync(s) {} TaskContext(TaskType t, int d, const std::map& a, bool s = false) @@ -49,12 +50,12 @@ class PStore { void HandleTaskSpecificDB(const TasksVector& task); - int GetDBNumber() const { return dbNum_; } + int GetDBNumber() const { return db_number_; } private: PStore() = default; - int dbNum_ = 0; + int db_number_ = 0; std::vector> backends_; };
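
Note on the checkpoint internals: the per-instance work that DB::DoCheckpoint and storage_->CreateCheckpoint(path, i) delegate to is not visible in this series. The sketch below is illustrative only; it shows the plain RocksDB Checkpoint utility such a wrapper is typically built on. The function name CheckpointOneInstance, the raw rocksdb::DB* parameter, and the directory layout are assumptions made for the example, not the actual Storage implementation in these patches.

#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/utilities/checkpoint.h"

// Illustrative sketch: snapshot one RocksDB instance into <parent_path>/<index>.
// Assumes `db` is an already-open rocksdb::DB* owned by the caller; errors are
// propagated via rocksdb::Status, matching the Status-based style of the patch.
rocksdb::Status CheckpointOneInstance(rocksdb::DB* db, const std::string& parent_path, int index) {
  const std::string checkpoint_dir = parent_path + "/" + std::to_string(index);

  rocksdb::Checkpoint* raw = nullptr;
  rocksdb::Status s = rocksdb::Checkpoint::Create(db, &raw);
  if (!s.ok()) {
    return s;
  }
  std::unique_ptr<rocksdb::Checkpoint> checkpoint(raw);

  // SST files are hard-linked when the target directory is on the same
  // filesystem as the DB, so the snapshot is cheap; otherwise they are copied.
  return checkpoint->CreateCheckpoint(checkpoint_dir);
}

On the load side the series does the inverse: the stale instance directory is destroyed first (the rocksdb::DestroyDB call added earlier in the series) and the instance is reopened from the copied checkpoint files, which is why LoadDBFromCheckpoint flips opened_ to false before launching the per-instance std::async loads.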