diff --git a/CMakeLists.txt b/CMakeLists.txt index 506a07586..4ea2c79fe 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -146,6 +146,9 @@ INCLUDE(braft) ENABLE_TESTING() +SET(PROTO_OUTPUT_DIR "${CMAKE_BINARY_DIR}/generated_pb") +FILE(MAKE_DIRECTORY "${PROTO_OUTPUT_DIR}") + ADD_SUBDIRECTORY(src/pstd) ADD_SUBDIRECTORY(src/net) ADD_SUBDIRECTORY(src/praft) diff --git a/cmake/protobuf.cmake b/cmake/protobuf.cmake index 9a8539bb9..2754a7fc5 100644 --- a/cmake/protobuf.cmake +++ b/cmake/protobuf.cmake @@ -132,6 +132,7 @@ FUNCTION(build_protobuf TARGET_NAME) UPDATE_COMMAND "" DEPENDS zlib URL "https://github.com/protocolbuffers/protobuf/archive/v3.18.0.tar.gz" + URL_HASH SHA256=14e8042b5da37652c92ef6a2759e7d2979d295f60afd7767825e3de68c856c54 CONFIGURE_COMMAND mv ../config.sh . COMMAND sh config.sh CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${PROTOBUF_INSTALL_DIR} diff --git a/pikiwidb.conf b/pikiwidb.conf index 3affebcc5..93bdb0622 100644 --- a/pikiwidb.conf +++ b/pikiwidb.conf @@ -347,7 +347,7 @@ backendpath dump # the frequency of dump to backend per second backendhz 10 # the rocksdb number per db -db-instance-num 5 +db-instance-num 1 # default 86400 * 7 rocksdb-ttl-second 604800 # default 86400 * 3 diff --git a/save_load.sh b/save_load.sh new file mode 100755 index 000000000..ffff69385 --- /dev/null +++ b/save_load.sh @@ -0,0 +1,16 @@ +#!/bin/bash +killall -9 pikiwidb +mkdir leader follower1 + +cd leader && ulimit -n 99999 && rm -fr * && ../bin/pikiwidb ../pikiwidb.conf --port 7777 & +cd follower1 && ulimit -n 99999 && rm -fr * && ../bin/pikiwidb ../pikiwidb.conf --port 8888 & +sleep 5 + +redis-cli -p 7777 raft.cluster init + +redis-benchmark -p 7777 -c 5 -n 10000 -r 10000 -d 1024 -t hset +redis-cli -p 7777 raft.node DSS +redis-benchmark -p 7777 -c 5 -n 10000 -r 10000 -d 1024 -t hset +redis-cli -p 7777 raft.node DSS + +redis-cli -p 8888 raft.cluster join 127.0.0.1:7777 \ No newline at end of file diff --git a/src/checkpoint_manager.cpp 
b/src/checkpoint_manager.cpp index 9938499c4..f2cd902fe 100644 --- a/src/checkpoint_manager.cpp +++ b/src/checkpoint_manager.cpp @@ -23,15 +23,6 @@ void CheckpointManager::Init(int instNum, DB* db) { void CheckpointManager::CreateCheckpoint(const std::string& path) { res_.clear(); - - if (!pstd::FileExists(path)) { - if (0 != pstd::CreatePath(path)) { - WARN("Create Dir {} fail!", path); - return; - } - INFO("Create Dir {} success!", path); - } - std::lock_guard Lock(shared_mutex_); for (int i = 0; i < checkpoint_num_; ++i) { checkpoint_infoes_[i].checkpoint_in_process = true; diff --git a/src/client.cc b/src/client.cc index 60b7b6d04..aaecc625d 100644 --- a/src/client.cc +++ b/src/client.cc @@ -12,10 +12,10 @@ #include "config.h" #include "log.h" #include "pikiwidb.h" +#include "praft.h" #include "pstd_string.h" #include "slow_log.h" #include "store.h" -#include "praft.h" namespace pikiwidb { diff --git a/src/client.h b/src/client.h index 16e470495..e979cac27 100644 --- a/src/client.h +++ b/src/client.h @@ -13,10 +13,10 @@ #include #include "common.h" +#include "net/tcp_connection.h" #include "proto_parser.h" #include "replication.h" #include "storage/storage.h" -#include "tcp_connection.h" namespace pikiwidb { @@ -250,4 +250,4 @@ class PClient : public std::enable_shared_from_this, public CmdRes { static thread_local PClient* s_current; }; -} // namespace pikiwidb \ No newline at end of file +} // namespace pikiwidb diff --git a/src/cmd_admin.cc b/src/cmd_admin.cc index adc463a94..2ab195afc 100644 --- a/src/cmd_admin.cc +++ b/src/cmd_admin.cc @@ -6,9 +6,9 @@ */ #include "cmd_admin.h" -#include "store.h" #include "braft/raft.h" #include "praft.h" +#include "store.h" namespace pikiwidb { @@ -80,7 +80,7 @@ void SelectCmd::DoCmd(PClient* client) { client->SetRes(CmdRes::kOK); } -InfoCmd::InfoCmd(const std::string& name, int16_t arity) +InfoCmd::InfoCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsAdmin | kCmdFlagsReadonly, 
kAclCategoryAdmin) {} bool InfoCmd::DoInitial(PClient* client) { return true; } @@ -127,7 +127,7 @@ void InfoCmd::DoCmd(PClient* client) { message += "raft_state:up\r\n"; } else { message += "raft_state:down\r\n"; - } + } message += "raft_role:" + std::string(braft::state2str(node_status.state)) + "\r\n"; // message += "raft_is_voting:" + node_status.is_voting + "\r\n"; message += "raft_leader_id:" + node_status.leader_id.to_string() + "\r\n"; @@ -141,9 +141,10 @@ void InfoCmd::DoCmd(PClient* client) { if (!status.ok()) { return client->SetRes(CmdRes::kErrOther, status.error_str()); } - + for (int i = 0; i < peers.size(); i++) { - message += "raft_node" + std::to_string(i) + ":addr=" + butil::ip2str(peers[i].addr.ip).c_str() + ",port=" + std::to_string(peers[i].addr.port) + "\r\n"; + message += "raft_node" + std::to_string(i) + ":addr=" + butil::ip2str(peers[i].addr.ip).c_str() + + ",port=" + std::to_string(peers[i].addr.port) + "\r\n"; } } diff --git a/src/cmd_admin.h b/src/cmd_admin.h index 476660e01..8ef4d6e58 100644 --- a/src/cmd_admin.h +++ b/src/cmd_admin.h @@ -85,14 +85,14 @@ class SelectCmd : public BaseCmd { }; class InfoCmd : public BaseCmd { - public: - InfoCmd(const std::string& name, int16_t arity); + public: + InfoCmd(const std::string& name, int16_t arity); - protected: - bool DoInitial(PClient* client) override; + protected: + bool DoInitial(PClient* client) override; - private: - void DoCmd(PClient* client) override; + private: + void DoCmd(PClient* client) override; }; } // namespace pikiwidb diff --git a/src/cmd_raft.cc b/src/cmd_raft.cc index 27f349fa9..8b58e6684 100644 --- a/src/cmd_raft.cc +++ b/src/cmd_raft.cc @@ -36,6 +36,8 @@ void RaftNodeCmd::DoCmd(PClient* client) { DoCmdAdd(client); } else if (!strcasecmp(cmd.c_str(), "REMOVE")) { DoCmdRemove(client); + } else if (!strcasecmp(cmd.c_str(), "DSS")) { + DoCmdSnapshot(client); } else { client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE only"); } @@ -70,6 +72,13 @@ void 
RaftNodeCmd::DoCmdRemove(PClient* client) { } } +void RaftNodeCmd::DoCmdSnapshot(PClient* client) { + // Always reply: a failed snapshot must surface as an error instead of leaving the client without a response. + auto s = PRAFT.DoSnapshot(); + if (!s.ok()) { return client->SetRes(CmdRes::kErrOther, s.error_str()); } + client->SetRes(CmdRes::kOK); +} + RaftClusterCmd::RaftClusterCmd(const std::string& name, int16_t arity) : BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {} diff --git a/src/cmd_raft.h b/src/cmd_raft.h index a5e8f924d..bf36467e2 100644 --- a/src/cmd_raft.h +++ b/src/cmd_raft.h @@ -25,7 +25,7 @@ namespace pikiwidb { * : * : * - * RAFT.NODE REMOVE [id] + * RAFT.NODE REMOVE [id] * Remove an existing node from the cluster. * Reply: * -NOCLUSTER || @@ -45,6 +45,7 @@ class RaftNodeCmd : public BaseCmd { void DoCmd(PClient *client) override; void DoCmdAdd(PClient *client); void DoCmdRemove(PClient *client); + void DoCmdSnapshot(PClient *client); static constexpr std::string_view kAddCmd = "ADD"; static constexpr std::string_view kRemoveCmd = "REMOVE"; diff --git a/src/cmd_table_manager.cc b/src/cmd_table_manager.cc index 2ae39a0a2..e56299199 100644 --- a/src/cmd_table_manager.cc +++ b/src/cmd_table_manager.cc @@ -12,8 +12,8 @@ #include "cmd_keys.h" #include "cmd_kv.h" #include "cmd_list.h" -#include "cmd_set.h" #include "cmd_raft.h" +#include "cmd_set.h" #include "cmd_table_manager.h" #include "cmd_zset.h" @@ -46,7 +46,7 @@ void CmdTableManager::InitCmdTable() { // info ADD_COMMAND(Info, -1); - + // raft ADD_COMMAND(RaftCluster, -1); ADD_COMMAND(RaftNode, -2); diff --git a/src/db.cpp b/src/db.cpp index 3224f9610..cc694ca7f 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -30,6 +30,8 @@ DB::DB(int db_index, const std::string& db_path) ERROR("Storage open failed! 
{}", s.ToString()); abort(); } + checkpoint_manager_ = std::make_unique(); + checkpoint_manager_->Init(g_config.db_instance_num, this); opened_ = true; INFO("Open DB{} success!", db_index_); } @@ -45,8 +47,14 @@ void DB::DoBgSave(CheckpointInfo& checkpoint_info, const std::string& path, int checkpoint_info.checkpoint_in_process = false; } -void DB::CreateCheckpoint(const std::string& path) { checkpoint_manager_->CreateCheckpoint(path); } +void DB::CreateCheckpoint(const std::string& path) { + if (0 != pstd::CreatePath(path + '/' + std::to_string(db_index_))) { + WARN("Create dir {} fail !", path + '/' + std::to_string(db_index_)); + return; + } + checkpoint_manager_->CreateCheckpoint(path); +} void DB::WaitForCheckpointDone() { checkpoint_manager_->WaitForCheckpointDone(); } - + } // namespace pikiwidb diff --git a/src/db.h b/src/db.h index 1f5e28962..ee4a46300 100644 --- a/src/db.h +++ b/src/db.h @@ -11,7 +11,7 @@ #include #include "checkpoint_manager.h" -#include "log.h" +#include "pstd/log.h" #include "pstd/noncopyable.h" #include "storage/storage.h" namespace pikiwidb { @@ -55,7 +55,6 @@ class DB { bool opened_ = false; std::unique_ptr checkpoint_manager_; - }; } // namespace pikiwidb diff --git a/src/pikiwidb.h b/src/pikiwidb.h index 389096e24..1e9c547c8 100644 --- a/src/pikiwidb.h +++ b/src/pikiwidb.h @@ -7,9 +7,8 @@ #include "cmd_table_manager.h" #include "common.h" -#include "event_loop.h" #include "io_thread_pool.h" -#include "tcp_connection.h" +#include "net/tcp_connection.h" #include "praft/praft.h" #define kPIKIWIDB_VERSION "4.0.0" diff --git a/src/praft/CMakeLists.txt b/src/praft/CMakeLists.txt index c2b059b89..6e302f941 100644 --- a/src/praft/CMakeLists.txt +++ b/src/praft/CMakeLists.txt @@ -2,30 +2,28 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. An additional grant # of patent rights can be found in the PATENTS file in the same directory. 
- -FILE(GLOB PRAFT_PROTO "${CMAKE_CURRENT_SOURCE_DIR}/*.proto") -EXECUTE_PROCESS( - COMMAND ${PROTOBUF_PROTOC_EXECUTABLE} --cpp_out=${CMAKE_CURRENT_SOURCE_DIR} -I=${CMAKE_CURRENT_SOURCE_DIR} ${PRAFT_PROTO} +ADD_CUSTOM_COMMAND( + OUTPUT "${PROTO_OUTPUT_DIR}/binlog.pb.cc" + DEPENDS extern_protobuf + COMMAND ${PROTOBUF_PROTOC_EXECUTABLE} + ARGS -I ${CMAKE_CURRENT_SOURCE_DIR} + --cpp_out ${PROTO_OUTPUT_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/binlog.proto ) +ADD_LIBRARY(binlog_pb STATIC "${PROTO_OUTPUT_DIR}/binlog.pb.cc") FILE(GLOB PRAFT_SRC "${CMAKE_CURRENT_SOURCE_DIR}/*.cc" - "${CMAKE_CURRENT_SOURCE_DIR}/*.h" ) SET(LIBRARY_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin) ADD_LIBRARY(praft ${PRAFT_SRC}) TARGET_INCLUDE_DIRECTORIES(praft PRIVATE ${PROJECT_SOURCE_DIR}/src - PRIVATE ${PROJECT_SOURCE_DIR}/src/pstd - PRIVATE ${PROJECT_SOURCE_DIR}/src/net - PRIVATE ${PROJECT_SOURCE_DIR}/src/storage/include - PRIVATE ${rocksdb_SOURCE_DIR}/ PRIVATE ${rocksdb_SOURCE_DIR}/include PRIVATE ${BRAFT_INCLUDE_DIR} PRIVATE ${BRPC_INCLUDE_DIR} - PRIVATE ${GFLAGS_INCLUDE_PATH} - PRIVATE ${PROJECT_SOURCE_DIR}/src/praft + PRIVATE ${PROTO_OUTPUT_DIR} ) IF(CMAKE_SYSTEM_NAME STREQUAL "Linux") @@ -34,4 +32,4 @@ ENDIF() TARGET_LINK_LIBRARIES(praft net; dl; fmt; storage; pstd braft brpc ssl crypto zlib protobuf leveldb gflags rocksdb z ${PRAFT_LIB}) -SET_TARGET_PROPERTIES(praft PROPERTIES LINKER_LANGUAGE CXX) \ No newline at end of file +SET_TARGET_PROPERTIES(praft PROPERTIES LINKER_LANGUAGE CXX) diff --git a/src/praft/binlog.proto b/src/praft/binlog.proto new file mode 100644 index 000000000..264da0780 --- /dev/null +++ b/src/praft/binlog.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; +package pikiwidb; +// option optimize_for = LITE_RUNTIME; + +enum OperateType { + kNoOperate = 0; + kPut = 1; + kDelete = 2; +} + +message BinlogEntry { + uint32 cf_idx = 1; + OperateType op_type = 2; + bytes key = 3; + optional bytes value = 4; +} + +message Binlog { + uint32 db_id = 1; + uint32 slot_idx = 2; + repeated BinlogEntry 
entries = 3; +} diff --git a/src/praft/praft.cc b/src/praft/praft.cc index d68480c70..dcc80893d 100644 --- a/src/praft/praft.cc +++ b/src/praft/praft.cc @@ -8,19 +8,24 @@ // // praft.cc -#include "praft.h" - #include -#include -#include +#include "braft/snapshot.h" + +#include "braft/util.h" +#include "brpc/server.h" + +#include "pstd/log.h" +#include "pstd/pstd_string.h" + +#include "binlog.pb.h" #include "client.h" #include "config.h" -#include "event_loop.h" -#include "log.h" #include "pikiwidb.h" +#include "praft.h" #include "praft.pb.h" -#include "pstd_string.h" +#include "psnapshot.h" +#include "store.h" #define ERROR_LOG_AND_STATUS(msg) \ ({ \ @@ -30,16 +35,6 @@ namespace pikiwidb { -class DummyServiceImpl : public DummyService { - public: - explicit DummyServiceImpl(PRaft* praft) : praft_(praft) {} - void DummyMethod(::google::protobuf::RpcController* controller, const ::pikiwidb::DummyRequest* request, - ::pikiwidb::DummyResponse* response, ::google::protobuf::Closure* done) {} - - private: - PRaft* praft_; -}; - PRaft& PRaft::Instance() { static PRaft store; return store; @@ -51,12 +46,7 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) { } server_ = std::make_unique(); - DummyServiceImpl service(&PRAFT); auto port = g_config.port + pikiwidb::g_config.raft_port_offset; - // Add your service into RPC server - if (server_->AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { - return ERROR_LOG_AND_STATUS("Failed to add service"); - } // raft can share the same RPC server. Notice the second parameter, because // adding services into a running server is not allowed and the listen // address of this server is impossible to get before the server starts. 
You @@ -108,12 +98,15 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) { // node_options_.election_timeout_ms = FLAGS_election_timeout_ms; node_options_.fsm = this; node_options_.node_owns_fsm = false; - // node_options_.snapshot_interval_s = FLAGS_snapshot_interval; + node_options_.snapshot_interval_s = 0; std::string prefix = "local://" + g_config.dbpath + "_praft"; node_options_.log_uri = prefix + "/log"; node_options_.raft_meta_uri = prefix + "/raft_meta"; node_options_.snapshot_uri = prefix + "/snapshot"; // node_options_.disable_cli = FLAGS_disable_cli; + snapshot_adaptor_ = new PPosixFileSystemAdaptor(); + node_options_.snapshot_file_system_adaptor = &snapshot_adaptor_; + node_ = std::make_unique("pikiwidb", braft::PeerId(addr)); // group_id if (node_->init(node_options_) != 0) { node_.reset(); @@ -308,6 +301,19 @@ butil::Status PRaft::RemovePeer(const std::string& peer) { return {0, "OK"}; } +butil::Status PRaft::DoSnapshot(int64_t self_snapshot_index, bool is_sync) { + if (!node_) { + return ERROR_LOG_AND_STATUS("Node is not initialized"); + } + braft::SynchronizedClosure done; + node_->snapshot(&done); // @todo self_snapshot_index + if (is_sync) { + done.wait(); + } + + return {0, "OK"}; +} + void PRaft::OnJoinCmdConnectionFailed([[maybe_unused]] EventLoop* loop, const char* peer_ip, int port) { auto cli = join_ctx_.GetClient(); if (cli) { @@ -339,23 +345,96 @@ void PRaft::Join() { } } -void PRaft::Apply(braft::Task& task) { - if (node_) { - node_->apply(task); +void PRaft::AppendLog(const Binlog& log, std::promise&& promise) { + assert(node_); + butil::IOBuf data; + butil::IOBufAsZeroCopyOutputStream wrapper(&data); + auto done = new PRaftWriteDoneClosure(std::move(promise)); + if (!log.SerializeToZeroCopyStream(&wrapper)) { + done->SetStatus(rocksdb::Status::Incomplete("Failed to serialize binlog")); + done->Run(); + return; + } + DEBUG("append binlog: {}", log.ShortDebugString()); + braft::Task task; + task.data = &data; + 
task.done = done; + node_->apply(task); +} + +void PRaft::recursive_copy(const std::filesystem::path& source, const std::filesystem::path& destination) { + if (std::filesystem::is_regular_file(source)) { + if (source.filename() == PBRAFT_SNAPSHOT_META_FILE) { + return; + } else if (source.extension() == ".sst") { + // Create a hard link + INFO("hard link success! source_file = {} , destination_file = {}", source.string(), destination.string()); + ::link(source.c_str(), destination.c_str()); + } else { + // Copy the file + INFO("copy success! source_file = {} , destination_file = {}", source.string(), destination.string()); + std::filesystem::copy_file(source, destination, std::filesystem::copy_options::overwrite_existing); + } + } else { + if (!pstd::FileExists(destination)) { + pstd::CreateDir(destination); + } + + for (const auto& entry : std::filesystem::directory_iterator(source)) { + recursive_copy(entry.path(), destination / entry.path().filename()); + } } } // @braft::StateMachine void PRaft::on_apply(braft::Iterator& iter) { // A batch of tasks are committed, which must be processed through - // |iter| for (; iter.valid(); iter.next()) { + auto done = iter.done(); + brpc::ClosureGuard done_guard(done); + + Binlog log; + butil::IOBufAsZeroCopyInputStream wrapper(iter.data()); + bool success = log.ParseFromZeroCopyStream(&wrapper); + DEBUG("apply binlog: {}", log.ShortDebugString()); + + if (!success) { + static constexpr std::string_view kMsg = "Failed to parse from protobuf when on_apply"; + ERROR(kMsg); + if (done) { // in leader + dynamic_cast(done)->SetStatus(rocksdb::Status::Incomplete(kMsg)); + } + braft::run_closure_in_bthread(done_guard.release()); + return; + } + + auto s = PSTORE.GetBackend(log.db_id())->GetStorage()->OnBinlogWrite(log); + if (done) { // in leader + dynamic_cast(done)->SetStatus(s); + } + // _applied_index = iter.index(); // consider to maintain a member applied_idx + braft::run_closure_in_bthread(done_guard.release()); } } -void 
PRaft::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) {} +void PRaft::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) { + brpc::ClosureGuard done_guard(done); +} -int PRaft::on_snapshot_load(braft::SnapshotReader* reader) { return 0; } +int PRaft::on_snapshot_load(braft::SnapshotReader* reader) { + CHECK(!IsLeader()) << "Leader is not supposed to load snapshot"; + auto reader_path = reader->get_path(); // xx/snapshot_0000001 + auto db_path = g_config.dbpath; + PSTORE.Clear(); + for (int i = 0; i < g_config.databases; i++) { + auto sub_path = db_path + std::to_string(i); + pstd::DeleteDirIfExist(sub_path); + } + db_path.pop_back(); + recursive_copy(reader_path, db_path); + PSTORE.Init(); + return 0; +} void PRaft::on_leader_start(int64_t term) { WARN("Node {} start to be leader, term={}", node_->node_id().to_string(), term); diff --git a/src/praft/praft.h b/src/praft/praft.h index 8d8b1976a..0f4655d57 100644 --- a/src/praft/praft.h +++ b/src/praft/praft.h @@ -7,22 +7,17 @@ #pragma once +#include +#include #include #include +#include #include #include -#include -#include "braft/configuration.h" +#include "braft/file_system_adaptor.h" #include "braft/raft.h" -#include "braft/util.h" -#include "brpc/controller.h" -#include "brpc/server.h" -#include "butil/status.h" - -#include "client.h" -#include "event_loop.h" -#include "tcp_connection.h" +#include "rocksdb/status.h" namespace pikiwidb { @@ -30,6 +25,11 @@ namespace pikiwidb { #define PRAFT PRaft::Instance() +class PClient; +class EventLoop; +class Binlog; +class PPosixFileSystemAdaptor; + class JoinCmdContext { friend class PRaft; @@ -73,10 +73,24 @@ class JoinCmdContext { int port_ = 0; }; -class PRaft : public braft::StateMachine { +class PRaftWriteDoneClosure : public braft::Closure { public: - PRaft() : server_(nullptr), node_(nullptr) {} + explicit PRaftWriteDoneClosure(std::promise&& promise) : promise_(std::move(promise)) {} + + void Run() override { + 
promise_.set_value(result_); + delete this; + } + void SetStatus(rocksdb::Status status) { result_ = std::move(status); } + private: + std::promise promise_; + rocksdb::Status result_{rocksdb::Status::Aborted("Unknown error")}; +}; + +class PRaft : public braft::StateMachine { + public: + PRaft() = default; ~PRaft() override = default; static PRaft& Instance(); @@ -87,11 +101,11 @@ class PRaft : public braft::StateMachine { butil::Status Init(std::string& group_id, bool initial_conf_is_null); butil::Status AddPeer(const std::string& peer); butil::Status RemovePeer(const std::string& peer); - butil::Status RaftRecvEntry(); + butil::Status DoSnapshot(int64_t self_snapshot_index = 0, bool is_sync = true); void ShutDown(); void Join(); - void Apply(braft::Task& task); + void AppendLog(const Binlog& log, std::promise&& promise); //===--------------------------------------------------------------------===// // ClusterJoin command @@ -125,6 +139,9 @@ class PRaft : public braft::StateMachine { void on_stop_following(const ::braft::LeaderChangeContext& ctx) override; void on_start_following(const ::braft::LeaderChangeContext& ctx) override; + private: + void recursive_copy(const std::filesystem::path& source, const std::filesystem::path& destination); + private: std::unique_ptr server_; // brpc std::unique_ptr node_; @@ -133,6 +150,8 @@ class PRaft : public braft::StateMachine { JoinCmdContext join_ctx_; // context for cluster join command std::string dbid_; // dbid of group, + + scoped_refptr snapshot_adaptor_ = nullptr; }; } // namespace pikiwidb diff --git a/src/praft/praft.proto b/src/praft/praft.proto deleted file mode 100644 index 61a495f21..000000000 --- a/src/praft/praft.proto +++ /dev/null @@ -1,13 +0,0 @@ -syntax="proto3"; -package pikiwidb; -option cc_generic_services = true; - -message DummyRequest { -}; - -message DummyResponse { -}; - -service DummyService { - rpc DummyMethod(DummyRequest) returns (DummyResponse); -}; diff --git a/src/praft/psnapshot.cc 
b/src/praft/psnapshot.cc new file mode 100644 index 000000000..73618d18c --- /dev/null +++ b/src/praft/psnapshot.cc @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +// +// psnapshot.cc + +#include + +#include "braft/file_system_adaptor.h" +#include "braft/local_file_meta.pb.h" +#include "braft/snapshot.h" +#include "butil/files/file_path.h" +#include "config.h" +#include "psnapshot.h" +#include "store.h" + +namespace pikiwidb { + +struct PConfig; +extern PConfig g_config; + +braft::FileAdaptor* PPosixFileSystemAdaptor::open(const std::string& path, int oflag, + const ::google::protobuf::Message* file_meta, butil::File::Error* e) { + if ((oflag & IS_RDONLY) == 0) { // This is a read operation + bool found_other_files = false; + std::string snapshot_path; + + // parse snapshot path + butil::FilePath parse_snapshot_path(path); + std::vector components; + parse_snapshot_path.GetComponents(&components); + for (auto component : components) { + snapshot_path += component + "/"; + if (component.find("snapshot_") != std::string::npos) { + break; + } + } + + // check whether snapshots have been created + std::lock_guard guard(mutex_); + if (!snapshot_path.empty()) { + for (const auto& entry : std::filesystem::directory_iterator(snapshot_path)) { + std::string filename = entry.path().filename().string(); + if (entry.is_regular_file() || entry.is_directory()) { + if (filename != "." && filename != ".." 
&& filename.find(PBRAFT_SNAPSHOT_META_FILE) == std::string::npos) { + // If the path directory contains files other than raft_snapshot_meta, snapshots have been generated + found_other_files = true; + break; + } + } + } + } + + // Snapshot generation + if (!found_other_files) { + INFO("start generate snapshot"); + braft::LocalSnapshotMetaTable snapshot_meta_memtable; + std::string meta_path = snapshot_path + "/" PBRAFT_SNAPSHOT_META_FILE; + braft::FileSystemAdaptor* fs = braft::default_file_system(); + snapshot_meta_memtable.load_from_file(fs, meta_path); + + TasksVector tasks; + tasks.reserve(g_config.databases); + for (auto i = 0; i < g_config.databases; ++i) { + tasks.push_back({TaskType::kCheckpoint, i, {{TaskArg::kCheckpointPath, snapshot_path}}}); + } + + PSTORE.DoSomeThingSpecificDB(tasks); + PSTORE.WaitForCheckpointDone(); + add_all_files(snapshot_path, &snapshot_meta_memtable, snapshot_path); + const int rc = snapshot_meta_memtable.save_to_file(fs, meta_path); + if (rc == 0) { + INFO("Succeed to save, path: {}", snapshot_path); + } else { + ERROR("Fail to save, path: {}", snapshot_path); + } + INFO("end generate snapshot"); + } + } + + return braft::PosixFileSystemAdaptor::open(path, oflag, file_meta, e); +} + +void PPosixFileSystemAdaptor::add_all_files(const std::filesystem::path& dir, + braft::LocalSnapshotMetaTable* snapshot_meta_memtable, + const std::string& path) { + for (const auto& entry : std::filesystem::directory_iterator(dir)) { + if (entry.is_directory()) { + if (entry.path() != "." 
&& entry.path() != "..") { + INFO("dir_path = {}", entry.path().string()); + add_all_files(entry.path(), snapshot_meta_memtable, path); + } + } else { + INFO("file_path = {}", std::filesystem::relative(entry.path(), path).string()); + braft::LocalFileMeta meta; + if (snapshot_meta_memtable->add_file(std::filesystem::relative(entry.path(), path), meta) != 0) { + WARN("Failed to add file"); + } + } + } +} + +} // namespace pikiwidb diff --git a/src/praft/psnapshot.h b/src/praft/psnapshot.h new file mode 100644 index 000000000..d47625cdb --- /dev/null +++ b/src/praft/psnapshot.h @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#pragma once + +#include + +#include "braft/file_system_adaptor.h" +#include "braft/macros.h" + +#define PBRAFT_SNAPSHOT_META_FILE "__raft_snapshot_meta" +#define PBRAFT_SNAPSHOT_PATH "snapshot/snapshot_" +#define IS_RDONLY 0x01 + +namespace braft { +class LocalSnapshotMetaTable; +} + +namespace pikiwidb { + +class PPosixFileSystemAdaptor : public braft::PosixFileSystemAdaptor { + public: + PPosixFileSystemAdaptor() {} + ~PPosixFileSystemAdaptor() {} + + braft::FileAdaptor* open(const std::string& path, int oflag, const ::google::protobuf::Message* file_meta, + butil::File::Error* e) override; + void add_all_files(const std::filesystem::path& dir, braft::LocalSnapshotMetaTable* snapshot_meta_memtable, + const std::string& path); + + private: + braft::raft_mutex_t mutex_; +}; + +} // namespace pikiwidb diff --git a/src/replication.h b/src/replication.h index 142656788..11d8807f8 100644 --- a/src/replication.h +++ b/src/replication.h @@ -12,9 +12,9 @@ #include #include "common.h" -#include "memory_file.h" +#include "net/unbounded_buffer.h" #include "net/util.h" -#include 
"unbounded_buffer.h" +#include "pstd/memory_file.h" namespace pikiwidb { diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index babd7f5d2..b34adb24f 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -19,8 +19,21 @@ TARGET_INCLUDE_DIRECTORIES(storage PRIVATE ${rocksdb_SOURCE_DIR}/include PRIVATE ${BRAFT_INCLUDE_DIR} PRIVATE ${BRPC_INCLUDE_DIR} + PRIVATE ${PROTO_OUTPUT_DIR} ) -TARGET_LINK_LIBRARIES (storage pstd braft brpc ssl crypto zlib protobuf leveldb gflags rocksdb) +TARGET_LINK_LIBRARIES (storage + pstd + braft + brpc + ssl + crypto + zlib + leveldb + gflags + rocksdb + binlog_pb + protobuf +) SET_TARGET_PROPERTIES(storage PROPERTIES LINKER_LANGUAGE CXX) diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index c92cf59d7..d67aa8204 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -26,7 +26,9 @@ #include "pstd/pstd_mutex.h" #include "storage/slot_indexer.h" -#include "braft/raft.h" +namespace pikiwidb { +class Binlog; +} namespace storage { @@ -41,6 +43,7 @@ inline constexpr size_t BATCH_DELETE_LIMIT = 100; inline constexpr size_t COMPACT_THRESHOLD_COUNT = 2000; inline constexpr uint64_t kNoFlush = std::numeric_limits::max(); +inline constexpr uint64_t kFlush = 0; using Options = rocksdb::Options; using BlockBasedTableOptions = rocksdb::BlockBasedTableOptions; @@ -64,6 +67,8 @@ struct StorageOptions { size_t small_compaction_duration_threshold = 10000; size_t db_instance_num = 3; // default = 3 int db_id; + bool is_use_raft = true; + uint32_t raft_timeout = 10; Status ResetOptions(const OptionType& option_type, const std::unordered_map& options_map); }; @@ -1086,6 +1091,7 @@ class Storage { Status SetOptions(const OptionType& option_type, const std::unordered_map& options); void GetRocksDBInfo(std::string& info); + Status OnBinlogWrite(const pikiwidb::Binlog& log); private: std::vector> insts_; diff --git a/src/storage/src/batch.h 
b/src/storage/src/batch.h new file mode 100644 index 000000000..c8184caa5 --- /dev/null +++ b/src/storage/src/batch.h @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2024-present, Qihoo, Inc. All rights reserved. + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#pragma once + +#include +#include +#include +#include + +#include "rocksdb/db.h" + +#include "binlog.pb.h" +#include "praft/praft.h" +#include "src/redis.h" +#include "storage/storage.h" +#include "storage/storage_define.h" + +namespace storage { + +class Batch { + public: + virtual ~Batch() = default; + + virtual void Put(ColumnFamilyIndex cf_idx, const Slice& key, const Slice& val) = 0; + virtual void Delete(ColumnFamilyIndex cf_idx, const Slice& key) = 0; + virtual auto Commit() -> Status = 0; + + static auto CreateBatch(Redis* redis, bool use_binlog = false) -> std::unique_ptr; +}; + +class RocksBatch : public Batch { + public: + RocksBatch(rocksdb::DB* db, const rocksdb::WriteOptions& options, + const std::vector& handles) + : db_(db), options_(options), handles_(handles) {} + + void Put(ColumnFamilyIndex cf_idx, const Slice& key, const Slice& val) override { + batch_.Put(handles_[cf_idx], key, val); + } + void Delete(ColumnFamilyIndex cf_idx, const Slice& key) override { batch_.Delete(handles_[cf_idx], key); } + auto Commit() -> Status override { return db_->Write(options_, &batch_); } + + private: + rocksdb::WriteBatch batch_; + rocksdb::DB* db_; + const rocksdb::WriteOptions& options_; + const std::vector& handles_; +}; + +class BinlogBatch : public Batch { + public: + BinlogBatch(int32_t index, uint32_t seconds = 10) : seconds_(seconds) { + binlog_.set_db_id(0); + binlog_.set_slot_idx(index); + } + + void Put(ColumnFamilyIndex cf_idx, const Slice& key, const Slice& value) override { + auto entry = binlog_.add_entries(); 
+ entry->set_cf_idx(cf_idx); + entry->set_op_type(pikiwidb::OperateType::kPut); + entry->set_key(key.ToString()); + entry->set_value(value.ToString()); + } + + void Delete(ColumnFamilyIndex cf_idx, const Slice& key) override { + auto entry = binlog_.add_entries(); + entry->set_cf_idx(cf_idx); + entry->set_op_type(pikiwidb::OperateType::kDelete); + entry->set_key(key.ToString()); + } + + Status Commit() override { + std::promise promise; + auto future = promise.get_future(); + pikiwidb::PRaft::Instance().AppendLog(binlog_, std::move(promise)); + auto status = future.wait_for(std::chrono::seconds(seconds_)); + if (status == std::future_status::timeout) { + return Status::Incomplete("Wait for write timeout"); + } + return future.get(); + } + + private: + pikiwidb::Binlog binlog_; + uint32_t seconds_ = 10; +}; + +inline auto Batch::CreateBatch(Redis* redis, bool use_binlog) -> std::unique_ptr { + if (use_binlog) { + return std::make_unique(redis->GetIndex(), redis->GetRaftTimeout()); + } + return std::make_unique(redis->GetDB(), redis->GetWriteOptions(), redis->GetColumnFamilyHandles()); +} + +} // namespace storage diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index 5203e3bb3..ceb716d5c 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -52,6 +52,8 @@ Redis::~Redis() { } Status Redis::Open(const StorageOptions& storage_options, const std::string& db_path) { + is_use_raft_ = storage_options.is_use_raft; + raft_timeout_ = storage_options.raft_timeout; statistics_store_->SetCapacity(storage_options.statistics_max_size); small_compaction_threshold_ = storage_options.small_compaction_threshold; diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index e5439042a..9ad5565f3 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -210,6 +210,9 @@ class Redis { Status SetSmallCompactionThreshold(uint64_t small_compaction_threshold); Status SetSmallCompactionDurationThreshold(uint64_t 
small_compaction_duration_threshold); void GetRocksDBInfo(std::string& info, const char* prefix); + auto GetWriteOptions() const -> const rocksdb::WriteOptions& { return default_write_options_; } + auto GetColumnFamilyHandles() const -> const std::vector& { return handles_; } + auto GetRaftTimeout() const -> uint32_t { return raft_timeout_; } // Sets Commands Status SAdd(const Slice& key, const std::vector& members, int32_t* ret); @@ -354,6 +357,10 @@ class Redis { std::atomic_uint64_t small_compaction_duration_threshold_; std::unique_ptr> statistics_store_; + // For raft + bool is_use_raft_ = false; + uint32_t raft_timeout_ = 10; + Status UpdateSpecificKeyStatistics(const DataType& dtype, const std::string& key, uint64_t count); Status UpdateSpecificKeyDuration(const DataType& dtype, const std::string& key, uint64_t duration); Status AddCompactKeyTaskIfNeeded(const DataType& dtype, const std::string& key, uint64_t count, uint64_t duration); diff --git a/src/storage/src/redis_hashes.cc b/src/storage/src/redis_hashes.cc index 9abf51e87..54c3451df 100644 --- a/src/storage/src/redis_hashes.cc +++ b/src/storage/src/redis_hashes.cc @@ -20,6 +20,7 @@ #include "src/scope_snapshot.h" #include "storage/storage_define.h" #include "storage/util.h" +#include "batch.h" namespace storage { Status Redis::ScanHashesKeyNum(KeyInfo* key_info) { @@ -118,7 +119,7 @@ Status Redis::HDel(const Slice& key, const std::vector& fields, int } } - rocksdb::WriteBatch batch; + auto batch = Batch::CreateBatch(this, is_use_raft_); rocksdb::ReadOptions read_options; const rocksdb::Snapshot* snapshot; @@ -145,7 +146,7 @@ Status Redis::HDel(const Slice& key, const std::vector& fields, int if (s.ok()) { del_cnt++; statistic++; - batch.Delete(handles_[kHashesDataCF], hashes_data_key.Encode()); + batch->Delete(kHashesDataCF, hashes_data_key.Encode()); } else if (s.IsNotFound()) { continue; } else { @@ -157,7 +158,7 @@ Status Redis::HDel(const Slice& key, const std::vector& fields, int return 
Status::InvalidArgument("hash size overflow"); } parsed_hashes_meta_value.ModifyCount(-del_cnt); - batch.Put(handles_[kHashesMetaCF], base_meta_key.Encode(), meta_value); + batch->Put(kHashesMetaCF, base_meta_key.Encode(), meta_value); } } else if (s.IsNotFound()) { *ret = 0; @@ -165,7 +166,7 @@ Status Redis::HDel(const Slice& key, const std::vector& fields, int } else { return s; } - s = db_->Write(default_write_options_, &batch); + batch->Commit(); UpdateSpecificKeyStatistics(DataType::kHashes, key.ToString(), statistic); return s; } @@ -616,7 +617,7 @@ Status Redis::HMSet(const Slice& key, const std::vector& fvs) { } Status Redis::HSet(const Slice& key, const Slice& field, const Slice& value, int32_t* res) { - rocksdb::WriteBatch batch; + auto batch = Batch::CreateBatch(this, is_use_raft_); ScopeRecordLock l(lock_mgr_, key); uint64_t version = 0; @@ -631,10 +632,10 @@ Status Redis::HSet(const Slice& key, const Slice& field, const Slice& value, int if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.Count() == 0) { version = parsed_hashes_meta_value.InitialMetaValue(); parsed_hashes_meta_value.SetCount(1); - batch.Put(handles_[kHashesMetaCF], base_meta_key.Encode(), meta_value); + batch->Put(kHashesMetaCF, base_meta_key.Encode(), meta_value); HashesDataKey data_key(key, version, field); BaseDataValue internal_value(value); - batch.Put(handles_[kHashesDataCF], data_key.Encode(), internal_value.Encode()); + batch->Put(kHashesDataCF, data_key.Encode(), internal_value.Encode()); *res = 1; } else { version = parsed_hashes_meta_value.Version(); @@ -647,7 +648,7 @@ Status Redis::HSet(const Slice& key, const Slice& field, const Slice& value, int return Status::OK(); } else { BaseDataValue internal_value(value); - batch.Put(handles_[kHashesDataCF], hashes_data_key.Encode(), internal_value.Encode()); + batch->Put(kHashesDataCF, hashes_data_key.Encode(), internal_value.Encode()); statistic++; } } else if (s.IsNotFound()) { @@ -656,8 +657,8 @@ Status 
Redis::HSet(const Slice& key, const Slice& field, const Slice& value, int } parsed_hashes_meta_value.ModifyCount(1); BaseDataValue internal_value(value); - batch.Put(handles_[kHashesMetaCF], base_meta_key.Encode(), meta_value); - batch.Put(handles_[kHashesDataCF], hashes_data_key.Encode(), internal_value.Encode()); + batch->Put(kHashesMetaCF, base_meta_key.Encode(), meta_value); + batch->Put(kHashesDataCF, hashes_data_key.Encode(), internal_value.Encode()); *res = 1; } else { return s; @@ -667,15 +668,15 @@ Status Redis::HSet(const Slice& key, const Slice& field, const Slice& value, int EncodeFixed32(meta_value_buf, 1); HashesMetaValue meta_value(Slice(meta_value_buf, sizeof(int32_t))); version = meta_value.UpdateVersion(); - batch.Put(handles_[kHashesMetaCF], base_meta_key.Encode(), meta_value.Encode()); + batch->Put(kHashesMetaCF, base_meta_key.Encode(), meta_value.Encode()); HashesDataKey data_key(key, version, field); BaseDataValue internal_value(value); - batch.Put(handles_[kHashesDataCF], data_key.Encode(), internal_value.Encode()); + batch->Put(kHashesDataCF, data_key.Encode(), internal_value.Encode()); *res = 1; } else { return s; } - s = db_->Write(default_write_options_, &batch); + s = batch->Commit(); UpdateSpecificKeyStatistics(DataType::kHashes, key.ToString(), statistic); return s; } diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index 02e4312b5..fcc8eb313 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -7,7 +7,7 @@ #include #include "rocksdb/utilities/checkpoint.h" - +#include "binlog.pb.h" #include "config.h" #include "pstd/log.h" #include "pstd/pikiwidb_slot.h" @@ -104,14 +104,6 @@ Status Storage::Open(const StorageOptions& storage_options, const std::string& d Status Storage::CreateCheckpoint(const std::string& dump_path, int i) { INFO("DB{}'s RocksDB {} begin to generate a checkpoint!", db_id_, i); auto source_dir = AppendSubDirectory(dump_path, db_id_); - if (!pstd::FileExists(source_dir)) { - if 
(0 != pstd::CreatePath(source_dir)) { - WARN("Create Dir {} fail!", source_dir); - return Status::IOError("CreatePath() fail! dir_name : {} ", source_dir); - } - INFO("Create Dir {} success!", source_dir); - } - source_dir = AppendSubDirectory(source_dir, i); auto tmp_dir = source_dir + ".tmp"; @@ -132,7 +124,7 @@ Status Storage::CreateCheckpoint(const std::string& dump_path, int i) { // 3) Create a checkpoint std::unique_ptr checkpoint_guard(checkpoint); - s = checkpoint->CreateCheckpoint(tmp_dir, kNoFlush, nullptr); + s = checkpoint->CreateCheckpoint(tmp_dir, kFlush, nullptr); if (!s.ok()) { WARN("DB{}'s RocksDB {} create checkpoint failed!. Error: {}", db_id_, i, s.ToString()); return s; @@ -2258,4 +2250,26 @@ void Storage::DisableWal(const bool is_wal_disable) { } } +Status Storage::OnBinlogWrite(const pikiwidb::Binlog& log) { + auto& inst = insts_[log.slot_idx()]; + + rocksdb::WriteBatch batch; + for (const auto& entry : log.entries()) { + switch (entry.op_type()) { + case pikiwidb::OperateType::kPut: { + assert(entry.has_value()); + batch.Put(inst->GetColumnFamilyHandles()[entry.cf_idx()], entry.key(), entry.value()); + } break; + case pikiwidb::OperateType::kDelete: { + assert(!entry.has_value()); + batch.Delete(inst->GetColumnFamilyHandles()[entry.cf_idx()], entry.key()); + } break; + default: + assert(0); + } + } + + return inst->GetDB()->Write(inst->GetWriteOptions(), &batch); +} + } // namespace storage diff --git a/src/store.cc b/src/store.cc index 1d0170586..61f55759a 100644 --- a/src/store.cc +++ b/src/store.cc @@ -29,7 +29,6 @@ void PStore::Init() { dbNum_ = g_config.databases; backends_.reserve(dbNum_); if (g_config.backend == kBackEndRocksDB) { - for (int i = 0; i < dbNum_; i++) { auto db = std::make_unique(i, g_config.dbpath); backends_.push_back(std::move(db)); @@ -39,6 +38,8 @@ void PStore::Init() { } } +void PStore::Clear() { backends_.clear(); } + void PStore::DoSomeThingSpecificDB(const TasksVector tasks) { std::for_each(tasks.begin(), 
tasks.end(), [this](const auto& task) { switch (task.type) { @@ -69,7 +70,6 @@ void PStore::WaitForCheckpointDone() { void PStore::trimSlash(std::string& dirName) { while (dirName.back() == '/') { dirName.pop_back(); - } } diff --git a/src/store.h b/src/store.h index 597b01c7f..77f6a7b9a 100644 --- a/src/store.h +++ b/src/store.h @@ -20,11 +20,11 @@ #include #include +#include "braft/raft.h" #include "checkpoint_manager.h" #include "common.h" #include "db.h" #include "storage/storage.h" -#include "braft/raft.h" namespace pikiwidb { @@ -59,12 +59,15 @@ class PStore { void Init(); + void Clear(); + std::unique_ptr& GetBackend(int32_t index) { return backends_[index]; }; void DoSomeThingSpecificDB(const TasksVector task); void WaitForCheckpointDone(); + int GetDBNumber() const { return dbNum_; } std::shared_mutex& SharedMutex() { return dbs_mutex_; }