Skip to content

Commit

Permalink
fix load checkpoint fail
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxiaoshuai123 committed Apr 21, 2024
1 parent b303f99 commit 3e53764
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 44 deletions.
5 changes: 2 additions & 3 deletions src/db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ DB::DB(int db_index, const std::string& db_path, int rocksdb_inst_num)
ERROR("Storage open failed! {}", s.ToString());
abort();
}
opened_ = true;
opened_.store(true);
INFO("Open DB{} success!", db_index_);
}

Expand Down Expand Up @@ -80,7 +80,6 @@ void DB::CreateCheckpoint(const std::string& path, bool sync) {

void DB::LoadDBFromCheckPoint(const std::string& path, bool sync) {
opened_.store(false);
// 对于每一个 rocksdb 分别去 Load 自己的 DB.
auto checkpoint_path = path + '/' + std::to_string(db_index_);
if (0 != pstd::IsDir(path)) {
WARN("Checkpoint dir {} does not exist!", checkpoint_path);
Expand All @@ -106,7 +105,7 @@ void DB::LoadDBFromCheckPoint(const std::string& path, bool sync) {
for (auto& r : result) {
r.get();
}
// 重新启动

storage::StorageOptions storage_options;
storage_options.options.create_if_missing = true;
storage_options.db_instance_num = rocksdb_inst_num_;
Expand Down
2 changes: 1 addition & 1 deletion src/praft/praft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ butil::Status PRaft::DoSnapshot() {
braft::SynchronizedClosure done;
node_->snapshot(&done);
done.wait();
return {0, "OK"};
return done.status();
}

void PRaft::OnClusterCmdConnectionFailed([[maybe_unused]] EventLoop* loop, const char* peer_ip, int port) {
Expand Down
6 changes: 6 additions & 0 deletions src/pstd/pstd_string.cc
Original file line number Diff line number Diff line change
Expand Up @@ -619,4 +619,10 @@ bool StringHasSpaces(const std::string& str) {
return std::count_if(str.begin(), str.end(), [](unsigned char c) { return std::isspace(c); });
}

// Removes every trailing '/' from dirName, modifying it in place.
//
// dirName: path string to trim; may be empty.
//
// A string that is empty on entry is left untouched, and a string made only
// of slashes ("/", "///") is reduced to the empty string.
void trimSlash(std::string& dirName) {
  // The emptiness check must come first: std::string::back() on an empty
  // string is undefined behavior, and the loop itself can empty the string.
  while (!dirName.empty() && dirName.back() == '/') {
    dirName.pop_back();
  }
}

} // namespace pstd
2 changes: 2 additions & 0 deletions src/pstd/pstd_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,6 @@ std::string RandomStringWithNumber(size_t len);

bool StringHasSpaces(const std::string& str);

void trimSlash(std::string& dirName);

} // namespace pstd
1 change: 1 addition & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,7 @@ class Storage {

// Storage start the background thread for compaction task
pthread_t bg_tasks_thread_id_ = 0;
std::thread t;
pstd::Mutex bg_tasks_mutex_;
pstd::CondVar bg_tasks_cond_var_;
std::queue<BGTask> bg_tasks_queue_;
Expand Down
5 changes: 5 additions & 0 deletions src/storage/src/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ Redis::~Redis() {
for (auto handle : tmp_handles) {
delete handle;
}
// delete env_;
delete db_;

if (default_compact_range_options_.canceled) {
delete default_compact_range_options_.canceled;
}
}

Status Redis::Open(const StorageOptions& storage_options, const std::string& db_path) {
Expand Down
65 changes: 35 additions & 30 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "config.h"
#include "pstd/log.h"
#include "pstd/pikiwidb_slot.h"
#include "pstd/pstd_string.h"
#include "rocksdb/utilities/checkpoint.h"
#include "scope_snapshot.h"
#include "src/lru_cache.h"
Expand Down Expand Up @@ -68,13 +69,14 @@ Storage::Storage() {
}

Storage::~Storage() {
bg_tasks_should_exit_ = true;
bg_tasks_should_exit_.store(true);
bg_tasks_cond_var_.notify_one();

if (is_opened_) {
for (auto& inst : insts_) {
inst.reset();
if (is_opened_.load()) {
int ret = 0;
if (ret = pthread_join(bg_tasks_thread_id_, nullptr); ret != 0) {
ERROR("pthread_join failed with bgtask thread error : {}", ret);
}
insts_.clear();
}
}

Expand All @@ -92,18 +94,18 @@ static int RecursiveLinkAndCopy(const std::filesystem::path& source, const std::
return 0;
} else if (source.extension() == SST_FILE_EXTENSION) {
// Create a hard link
DEBUG("hard link success! source_file = {} , destination_file = {}", source.string(), destination.string());
if (::link(source.c_str(), destination.c_str()) < 0) {
if (::link(source.c_str(), destination.c_str()) != 0) {
WARN("hard link file {} fail", source.string());
return -1;
}
DEBUG("hard link success! source_file = {} , destination_file = {}", source.string(), destination.string());
} else {
// Copy the file
DEBUG("copy success! source_file = {} , destination_file = {}", source.string(), destination.string());
if (!std::filesystem::copy_file(source, destination, std::filesystem::copy_options::overwrite_existing)) {
WARN("copy file {} fail", source.string());
return -1;
}
DEBUG("copy success! source_file = {} , destination_file = {}", source.string(), destination.string());
}
} else {
if (!pstd::FileExists(destination)) {
Expand Down Expand Up @@ -192,33 +194,36 @@ Status Storage::CreateCheckpoint(const std::string& dump_path, int i) {
}

Status Storage::LoadCheckpoint(const std::string& dump_path, const std::string& db_path, int i) {
auto rocksdb_checkpoint_path = AppendSubDirectory(db_path, i);
INFO("DB{}'s RocksDB {} begin to load a checkpoint from {}!", db_id_, i, rocksdb_checkpoint_path);

// 首先将原来的 db path 改名, 当 load 失败的时候保证原来的数据还在.
auto tmp_path = db_path + ".tmp";
if (auto status = pstd::RenameFile(db_path, tmp_path); status != 0) {
WARN("DB{}'s RocksDB {} rename db directory {} to temporary directory {} fail!", db_id_, i, db_path, tmp_path);
auto rocksdb_checkpoint_path = AppendSubDirectory(dump_path, i);
INFO("DB{}'s RocksDB {} begin to load a checkpoint from {}", db_id_, i, rocksdb_checkpoint_path);
auto rocksdb_path = AppendSubDirectory(db_path, i); // ./db/db_id/i
auto tmp_rocksdb_path = rocksdb_path + ".tmp"; // ./db/db_id/i.tmp
insts_[i].reset();

// 1) Rename the original db to db.tmp so that, if loading the checkpoint fails, the original data can be
// restored by renaming it back.
if (auto status = pstd::RenameFile(rocksdb_path, tmp_rocksdb_path); status != 0) {
WARN("DB{}'s RocksDB {} rename db directory {} to temporary directory {} fail!", db_id_, i, db_path,
tmp_rocksdb_path);
return Status::IOError("Rename directory {} fail!", db_path);
}

if (0 != pstd::CreateDir(db_path)) {
pstd::RenameFile(tmp_path, db_path);
// 2) Create a db directory to save the checkpoint.
if (0 != pstd::CreatePath(rocksdb_path)) {
pstd::RenameFile(tmp_rocksdb_path, rocksdb_path);
WARN("DB{}'s RocksDB {} load a checkpoint from {} fail!", db_id_, i, rocksdb_checkpoint_path);
return Status::IOError("Create directory {} fail!", db_path);
return Status::IOError("Create directory {} fail!", rocksdb_path);
}

// 将原来的数据拷贝到 DB 目录下.
if (RecursiveLinkAndCopy(dump_path, db_path) != 0) {
pstd::DeleteDir(db_path);
pstd::RenameFile(tmp_path, db_path);
if (RecursiveLinkAndCopy(rocksdb_checkpoint_path, rocksdb_path) != 0) {
pstd::DeleteDir(rocksdb_path);
pstd::RenameFile(tmp_rocksdb_path, rocksdb_path);
WARN("DB{}'s RocksDB {} load a checkpoint from {} fail!", db_id_, i, rocksdb_checkpoint_path);
return Status::IOError("recursive link and copy directory {} fail!", dump_path);
return Status::IOError("recursive link and copy directory {} fail!", rocksdb_path);
}

// 删除掉 tmp
if (auto s = rocksdb::DestroyDB(tmp_path, rocksdb::Options()); !s.ok()) {
WARN("Failure to destroy the old DB, path = {}", tmp_path);
// 3) Destroy the db.tmp directory.
if (auto s = rocksdb::DestroyDB(tmp_rocksdb_path, rocksdb::Options()); !s.ok()) {
WARN("Failure to destroy the old DB, path = {}", tmp_rocksdb_path);
}
return Status::OK();
}
Expand Down Expand Up @@ -2047,17 +2052,17 @@ Status Storage::AddBGTask(const BGTask& bg_task) {

Status Storage::RunBGTask() {
BGTask task;
while (!bg_tasks_should_exit_) {
while (!bg_tasks_should_exit_.load()) {
std::unique_lock<std::mutex> lock(bg_tasks_mutex_);
bg_tasks_cond_var_.wait(lock, [this]() { return !bg_tasks_queue_.empty() || bg_tasks_should_exit_; });
bg_tasks_cond_var_.wait(lock, [this]() { return !bg_tasks_queue_.empty() || bg_tasks_should_exit_.load(); });

if (!bg_tasks_queue_.empty()) {
task = bg_tasks_queue_.front();
bg_tasks_queue_.pop();
}
lock.unlock();

if (bg_tasks_should_exit_) {
if (bg_tasks_should_exit_.load()) {
return Status::Incomplete("bgtask return with bg_tasks_should_exit true");
}

Expand Down
12 changes: 3 additions & 9 deletions src/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "config.h"
#include "log.h"
#include "pstd/pstd_string.h"
#include "store.h"

namespace pikiwidb {
Expand Down Expand Up @@ -54,7 +55,7 @@ void PStore::DoSomeThingSpecificDB(const TasksVector& tasks) {
return;
}
auto path = task.args.find(kCheckpointPath)->second;
trimSlash(path);
pstd::trimSlash(path);
db->CreateCheckpoint(path, task.sync);
break;
}
Expand All @@ -64,7 +65,7 @@ void PStore::DoSomeThingSpecificDB(const TasksVector& tasks) {
return;
}
auto path = task.args.find(kCheckpointPath)->second;
trimSlash(path);
pstd::trimSlash(path);
db->LoadDBFromCheckPoint(path, task.sync);
break;
}
Expand All @@ -73,11 +74,4 @@ void PStore::DoSomeThingSpecificDB(const TasksVector& tasks) {
}
});
}

// Removes every trailing '/' from dirName, modifying it in place.
//
// dirName: path string to trim; may be empty.
//
// A string that is empty on entry is left untouched, and a string made only
// of slashes ("/", "///") is reduced to the empty string.
void PStore::trimSlash(std::string& dirName) {
  // The emptiness check must come first: std::string::back() on an empty
  // string is undefined behavior, and the loop itself can empty the string.
  while (!dirName.empty() && dirName.back() == '/') {
    dirName.pop_back();
  }
}

} // namespace pikiwidb
1 change: 0 additions & 1 deletion src/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ class PStore {

private:
PStore() = default;
void trimSlash(std::string& dirName);

int dbNum_ = 0;

Expand Down

0 comments on commit 3e53764

Please sign in to comment.