Skip to content

Commit

Permalink
handle comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxiaoshuai123 committed Apr 22, 2024
1 parent 3e53764 commit 6bf07a0
Show file tree
Hide file tree
Showing 8 changed files with 12 additions and 33 deletions.
4 changes: 2 additions & 2 deletions pikiwidb.conf
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ logfile stdout
# Set the number of databases. The default database is DB 0, you can select
# a different one on a per-connection basis using SELECT <dbid> where
# dbid is a number between 0 and 'databases'-1
databases 3
databases 1

################################ SNAPSHOTTING #################################
#
Expand Down Expand Up @@ -347,7 +347,7 @@ backendpath dump
# the frequency of dump to backend per second
backendhz 10
# the rocksdb number per db
db-instance-num 5
db-instance-num 3
# default 86400 * 7
rocksdb-ttl-second 604800
# default 86400 * 3
Expand Down
6 changes: 2 additions & 4 deletions src/db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ DB::DB(int db_index, const std::string& db_path, int rocksdb_inst_num)
}

void DB::DoCheckpoint(const std::string& path, int i) {
// 1) always hold storage's sharedLock
// 1) always hold the storage's shared lock
std::shared_lock sharedLock(storage_mutex_);

// 2)Create the checkpoint of rocksdb i.
auto status = storage_->CreateCheckpoint(path, i);
}

void DB::LoadCheckpoint(const std::string& path, const std::string& db_path, int i) {
// 1) Already holding the mutual exclusion lock
// 1) Already holding the storage's exclusion lock

// 2) Load the checkpoint of rocksdb i.
auto status = storage_->LoadCheckpoint(path, db_path, i);
Expand Down Expand Up @@ -97,8 +97,6 @@ void DB::LoadDBFromCheckPoint(const std::string& path, bool sync) {
result.reserve(rocksdb_inst_num_);
for (int i = 0; i < rocksdb_inst_num_; ++i) {
// In a new thread, Load a checkpoint for the specified rocksdb i
// In DB::DoBgSave, a read lock is always held to protect the Storage
// corresponding to this rocksdb i.
auto res = std::async(std::launch::async, &DB::LoadCheckpoint, this, checkpoint_path, db_path_, i);
result.push_back(std::move(res));
}
Expand Down
4 changes: 2 additions & 2 deletions src/praft/praft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ void PRaft::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done
auto path = writer->get_path();
INFO("Saving snapshot to {}", path);
TasksVector tasks(1, {TaskType::kCheckpoint, db_id_, {{TaskArg::kCheckpointPath, path}}, true});
PSTORE.DoSomeThingSpecificDB(tasks);
PSTORE.HandleTaskSpecificDB(tasks);
if (auto res = AddAllFiles(path, writer, path); res != 0) {
done->status().set_error(EIO, "Fail to add file to writer");
}
Expand All @@ -657,7 +657,7 @@ int PRaft::on_snapshot_load(braft::SnapshotReader* reader) {
auto reader_path = reader->get_path(); // xx/snapshot_0000001
auto path = g_config.dbpath + std::to_string(db_id_); // db/db_id
TasksVector tasks(1, {TaskType::kLoadDBFromCheckPoint, db_id_, {{TaskArg::kCheckpointPath, reader_path}}, true});
PSTORE.DoSomeThingSpecificDB(tasks);
PSTORE.HandleTaskSpecificDB(tasks);
return 0;
}

Expand Down
2 changes: 1 addition & 1 deletion src/pstd/pstd_string.cc
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ bool StringHasSpaces(const std::string& str) {
return std::count_if(str.begin(), str.end(), [](unsigned char c) { return std::isspace(c); });
}

void trimSlash(std::string& dirName) {
void TrimSlash(std::string& dirName) {
while (dirName.back() == '/') {
dirName.pop_back();
}
Expand Down
2 changes: 1 addition & 1 deletion src/pstd/pstd_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,6 @@ std::string RandomStringWithNumber(size_t len);

bool StringHasSpaces(const std::string& str);

void trimSlash(std::string& dirName);
void TrimSlash(std::string& dirName);

} // namespace pstd
1 change: 0 additions & 1 deletion src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,6 @@ class Storage {

// Storage start the background thread for compaction task
pthread_t bg_tasks_thread_id_ = 0;
std::thread t;
pstd::Mutex bg_tasks_mutex_;
pstd::CondVar bg_tasks_cond_var_;
std::queue<BGTask> bg_tasks_queue_;
Expand Down
11 changes: 3 additions & 8 deletions src/store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,7 @@ void PStore::Init() {
}
}

void PStore::Clear() {
std::lock_guard<std::shared_mutex> lock(dbs_mutex_);
backends_.clear();
}

void PStore::DoSomeThingSpecificDB(const TasksVector& tasks) {
void PStore::HandleTaskSpecificDB(const TasksVector& tasks) {
std::for_each(tasks.begin(), tasks.end(), [this](const auto& task) {
if (task.db < 0 || task.db >= dbNum_) {
WARN("The database index is out of range.");
Expand All @@ -55,7 +50,7 @@ void PStore::DoSomeThingSpecificDB(const TasksVector& tasks) {
return;
}
auto path = task.args.find(kCheckpointPath)->second;
pstd::trimSlash(path);
pstd::TrimSlash(path);
db->CreateCheckpoint(path, task.sync);
break;
}
Expand All @@ -65,7 +60,7 @@ void PStore::DoSomeThingSpecificDB(const TasksVector& tasks) {
return;
}
auto path = task.args.find(kCheckpointPath)->second;
pstd::trimSlash(path);
pstd::TrimSlash(path);
db->LoadDBFromCheckPoint(path, task.sync);
break;
}
Expand Down
15 changes: 1 addition & 14 deletions src/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,37 +38,24 @@ using TasksVector = std::vector<TaskContext>;

class PStore {
public:
friend class CheckpointManager;
static PStore& Instance();

PStore(const PStore&) = delete;
void operator=(const PStore&) = delete;

void Init();

void Clear();

std::unique_ptr<DB>& GetBackend(int32_t index) { return backends_[index]; };

void DoSomeThingSpecificDB(const TasksVector& task);

void WaitForCheckpointDone();
void HandleTaskSpecificDB(const TasksVector& task);

int GetDBNumber() const { return dbNum_; }

std::shared_mutex& SharedMutex() { return dbs_mutex_; }

private:
PStore() = default;

int dbNum_ = 0;

/**
* If you want to access all the DBs at the same time,
* then you must hold the lock.
* For example: you want to execute flushall or bgsave.
*/
std::shared_mutex dbs_mutex_;
std::vector<std::unique_ptr<DB>> backends_;
};

Expand Down

0 comments on commit 6bf07a0

Please sign in to comment.