Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: overload the PosixFileSystemAdaptor interface to generate real snapshot when follower installs snapshot #245

Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ BUILD_TIME=${BUILD_TIME: 0: 10}
COMMIT_ID=$(git rev-parse HEAD)
SHORT_COMMIT_ID=${COMMIT_ID: 0: 8}

BUILD_TYPE=Debug
BUILD_TYPE=Release
VERBOSE=0
CMAKE_FLAGS=""
MAKE_FLAGS=""
Expand Down
14 changes: 7 additions & 7 deletions save_load.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ mkdir leader follower1
cd leader && ulimit -n 99999 && rm -fr * && ../bin/pikiwidb ../pikiwidb.conf --port 7777 &
cd follower1 && ulimit -n 99999 && rm -fr * && ../bin/pikiwidb ../pikiwidb.conf --port 8888 &

# sleep 10
sleep 10

# redis-cli -p 7777 raft.cluster init
redis-cli -p 7777 raft.cluster init

# redis-benchmark -p 7777 -c 5 -n 100 -r 1000 -d 1024 -t hset
# redis-cli -p 7777 raft.node DSS
# redis-benchmark -p 7777 -c 5 -n 10000 -r 1000000 -d 1024 -t hset
# redis-cli -p 7777 raft.node DSS
redis-benchmark -p 7777 -c 5 -n 100 -r 1000 -d 1024 -t hset
redis-cli -p 7777 raft.node DSS
redis-benchmark -p 7777 -c 5 -n 100 -r 1000 -d 1024 -t hset
redis-cli -p 7777 raft.node DSS

# redis-cli -p 8888 raft.cluster join 127.0.0.1:7777
redis-cli -p 8888 raft.cluster join 127.0.0.1:7777
44 changes: 26 additions & 18 deletions src/praft/praft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "pikiwidb.h"
#include "praft.h"
#include "praft.pb.h"
#include "psnapshot.h"
#include "store.h"

#define ERROR_LOG_AND_STATUS(msg) \
Expand Down Expand Up @@ -103,22 +104,8 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) {
node_options_.raft_meta_uri = prefix + "/raft_meta";
node_options_.snapshot_uri = prefix + "/snapshot";
// node_options_.disable_cli = FLAGS_disable_cli;

// checkpoint callback
auto checkpoint_callback = [&] (braft::SnapshotWriter* writer) {
TasksVector tasks;
tasks.reserve(g_config.databases);
for (auto i = 0; i < g_config.databases; ++i) {
tasks.push_back({TaskType::kCheckpoint, i, {{TaskArg::kCheckpointPath, writer->get_path()}}});
}
INFO("start generate snapshot");
PSTORE.DoSomeThingSpecificDB(tasks);
PSTORE.WaitForCheckpointDone();
auto writer_path = writer->get_path();
add_all_files(writer_path, writer, writer_path);
INFO("end generate snapshot");
};
node_options_.checkpoint_callback = checkpoint_callback;
snapshot_adaptor_ = new PPosixFileSystemAdaptor();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里使用new申请了,虽然只申请了一次,但是最好有对应delete的操作。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scoped_refptrbraft::FileSystemAdaptor snapshot_adaptor_ = nullptr;
这样定义的,应该不需要主动调用delete吧

node_options_.snapshot_file_system_adaptor = &snapshot_adaptor_;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

node_options里有一个定时打快照的参数snapshot_interval_s,可以考虑置零。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已经置0了


node_ = std::make_unique<braft::Node>("pikiwidb", braft::PeerId(addr)); // group_id
if (node_->init(node_options_) != 0) {
Expand Down Expand Up @@ -314,16 +301,21 @@ butil::Status PRaft::RemovePeer(const std::string& peer) {
return {0, "OK"};
}

butil::Status PRaft::DoSnapshot() {
butil::Status PRaft::DoSnapshot(int64_t self_snapshot_index) {
if (!node_) {
return ERROR_LOG_AND_STATUS("Node is not initialized");
}
braft::SynchronizedClosure done;
node_->snapshot(&done);
node_->snapshot(&done); // @todo self_snapshot_index
done.wait();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里考虑到我们想通过rocksdb触发event listerner去调用这个函数,不一定需要同步等待。

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

所以可以考虑多一个参数用来控制是否进行等待。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的

return {0, "OK"};
}

void PRaft::GenerateRealSnapshot() {
is_generate_snapshot_.store(true, std::memory_order_release);
DoSnapshot(); // @todo customize the snapshot index
}

void PRaft::OnJoinCmdConnectionFailed([[maybe_unused]] EventLoop* loop, const char* peer_ip, int port) {
auto cli = join_ctx_.GetClient();
if (cli) {
Expand Down Expand Up @@ -445,6 +437,22 @@ void PRaft::on_apply(braft::Iterator& iter) {

void PRaft::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) {
brpc::ClosureGuard done_guard(done);
if (is_generate_snapshot_.load(std::memory_order_acquire)) {
TasksVector tasks;
tasks.reserve(g_config.databases);
for (auto i = 0; i < g_config.databases; ++i) {
tasks.push_back({TaskType::kCheckpoint, i, {{TaskArg::kCheckpointPath, writer->get_path()}}});
}

INFO("start generate snapshot");
PSTORE.DoSomeThingSpecificDB(tasks);
PSTORE.WaitForCheckpointDone();
auto writer_path = writer->get_path();
add_all_files(writer_path, writer, writer_path);
INFO("end generate snapshot");

is_generate_snapshot_.store(false, std::memory_order_release);
}
}

int PRaft::on_snapshot_load(braft::SnapshotReader* reader) {
Expand Down
12 changes: 8 additions & 4 deletions src/praft/praft.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
#pragma once

#include <filesystem>
#include <future>
#include <memory>
#include <mutex>
#include <string>
#include <tuple>
#include <vector>
#include <future>

#include "braft/file_system_adaptor.h"
#include "braft/raft.h"
#include "rocksdb/status.h"

Expand All @@ -27,6 +28,7 @@ namespace pikiwidb {
class PClient;
class EventLoop;
class Binlog;
class PPosixFileSystemAdaptor;

class JoinCmdContext {
friend class PRaft;
Expand Down Expand Up @@ -99,8 +101,8 @@ class PRaft : public braft::StateMachine {
butil::Status Init(std::string& group_id, bool initial_conf_is_null);
butil::Status AddPeer(const std::string& peer);
butil::Status RemovePeer(const std::string& peer);
butil::Status RaftRecvEntry();
butil::Status DoSnapshot();
butil::Status DoSnapshot(int64_t self_snapshot_index = 0);
void GenerateRealSnapshot();

void ShutDown();
void Join();
Expand Down Expand Up @@ -140,7 +142,6 @@ class PRaft : public braft::StateMachine {

private:
void add_all_files(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path);

void recursive_copy(const std::filesystem::path& source, const std::filesystem::path& destination);

private:
Expand All @@ -151,6 +152,9 @@ class PRaft : public braft::StateMachine {

JoinCmdContext join_ctx_; // context for cluster join command
std::string dbid_; // dbid of group,

std::atomic<bool> is_generate_snapshot_ = false; // whether generate real snapshot
scoped_refptr<braft::FileSystemAdaptor> snapshot_adaptor_ = nullptr;
};

} // namespace pikiwidb
30 changes: 30 additions & 0 deletions src/praft/psnapshot.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

//
// psnapshot.cc

#include "psnapshot.h"
#include "config.h"
#include "pikiwidb.h"
#include "praft.h"
#include "store.h"

namespace pikiwidb {

struct PConfig;
extern PConfig g_config;

braft::FileAdaptor* PPosixFileSystemAdaptor::open(const std::string& path, int oflag,
const ::google::protobuf::Message* file_meta, butil::File::Error* e) {
// checkpoint callback
PRAFT.GenerateRealSnapshot();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里会驱动on_snapshot_save去生成快照,如果系统一直向前运行,这时on_snapshot_save会生成在一个新的目录下而不是当前PPosixFileSystemAdaptor::open要找到的目录吧?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

或者说我们通过定制传进去的snapshot index这个接口中还能覆盖到上一次生成的目录里?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


return braft::PosixFileSystemAdaptor::open(path, oflag, file_meta, e);
}

} // namespace pikiwidb
24 changes: 24 additions & 0 deletions src/praft/psnapshot.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#pragma once

#include "braft/file_system_adaptor.h"
#include "praft.h"

namespace pikiwidb {

class PPosixFileSystemAdaptor : public braft::PosixFileSystemAdaptor {
public:
PPosixFileSystemAdaptor() {}
~PPosixFileSystemAdaptor() {}

braft::FileAdaptor* open(const std::string& path, int oflag, const ::google::protobuf::Message* file_meta,
butil::File::Error* e) override;
};

} // namespace pikiwidb
Loading