Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: sync all snapshot data in Pika process without rsync subprocess #1805

Merged
merged 44 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
73dc198
define rsync related header file and proto
Jul 7, 2023
4b92d87
Merge branch 'OpenAtomFoundation:unstable' into optimize-rsync
luky116 Jul 10, 2023
c93f5d6
feat:add throttle (#167)
chejinge Jul 12, 2023
8ba038f
feat: implement rsync network tansform (#169)
wangshao1 Jul 12, 2023
dd51cde
fix action (#171)
luky116 Jul 12, 2023
4bbd2e6
merge unstable code
luky116 Jul 12, 2023
d23e398
fix
luky116 Jul 12, 2023
7785b42
feat: add load local meta file (#175)
luky116 Jul 13, 2023
e4aa948
[feat] add rsync client/server code (#177)
wangshao1 Jul 13, 2023
26d4db8
add rsyncclient to syncslaveslot (#182)
wangshao1 Jul 14, 2023
547530c
feat: add read meta file and data (#179)
luky116 Jul 14, 2023
7bac336
fix compile error (#183)
wangshao1 Jul 14, 2023
b2f4091
fix compile error (#184)
wangshao1 Jul 14, 2023
1b4b979
optimize: add_throttle (#189)
chejinge Jul 14, 2023
fa96b9d
rsyncclient periodically flush meta table (#192)
wangshao1 Jul 14, 2023
84e69da
change rsync response (#190)
luky116 Jul 17, 2023
3fe94e4
add debug log for test
Jul 18, 2023
0d35c3b
fix rsync client/server bugs
wangshao1 Jul 18, 2023
1130130
fix bugs
wangshao1 Jul 18, 2023
612f5e4
add debug log for test
Jul 18, 2023
63c4a7e
fix bugs
wangshao1 Jul 19, 2023
dc86779
fix bugs
Jul 19, 2023
002d34d
fix bugs
wangshao1 Jul 19, 2023
1918b90
rix rsync bugs (#194)
wangshao1 Jul 20, 2023
9583007
fix bugs
wangshao1 Jul 20, 2023
852b8d6
fix bugs
Jul 20, 2023
f39a024
fix bugs 1
wangshao1 Jul 21, 2023
ce861f7
Merge branch 'optimize-rsync' into optimize-rsync-wsy
Jul 22, 2023
bbf6585
fix bugs
Jul 22, 2023
17217ac
fix rsync bugs (#195)
wangshao1 Jul 22, 2023
44c57ed
remove unused code
Jul 23, 2023
5983379
Merge branch 'optimize-rsync' into optimize-rsync-wsy
Jul 23, 2023
fe25080
remove unused code
Jul 23, 2023
6fa3717
Merge branch 'unstable' into optimize-rsync-wsy
Jul 24, 2023
f48726b
remove unused code
Jul 24, 2023
1e7750b
Merge pull request #211 from wangshao1/optimize-rsync-wangsy
chejinge Jul 24, 2023
c5936b2
remove unused code
Jul 24, 2023
afd133a
add copyright
Jul 24, 2023
4a54423
Merge pull request #212 from wangshao1/optimize-rsync-wangsy
chejinge Jul 24, 2023
bbd489c
fix by review comments (#213)
wangshao1 Jul 25, 2023
17095de
fix by review comments (#214)
wangshao1 Jul 26, 2023
2090367
fix by review comments (#216)
wangshao1 Jul 26, 2023
2be71f4
Optimize rsync wangsy (#217)
wangshao1 Jul 26, 2023
01b6a22
fix by review comments (#218)
wangshao1 Jul 26, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/operator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:

- name: Build
run: |
cd tools/pika_operator && make
cd tools/pika_operator && make
- name: Unit Test
run: |
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ set(PIKA_BUILD_VERSION_CC ${CMAKE_BINARY_DIR}/pika_build_version.cc)
message("PIKA_BUILD_VERSION_CC : " ${PIKA_BUILD_VERSION_CC})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/build_version.cc.in ${PIKA_BUILD_VERSION_CC} @ONLY)

set(PROTO_FILES ${CMAKE_CURRENT_SOURCE_DIR}/src/pika_inner_message.proto)
set(PROTO_FILES ${CMAKE_CURRENT_SOURCE_DIR}/src/pika_inner_message.proto ${CMAKE_CURRENT_SOURCE_DIR}/src/rsync_service.proto)
custom_protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS ${PROTO_FILES})
message("pika PROTO_SRCS = ${PROTO_SRCS}")
message("pika PROTO_HDRS = ${PROTO_HDRS}")
Expand Down
2 changes: 2 additions & 0 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class PikaServer;
/* Port shift */
const int kPortShiftRSync = 1000;
const int kPortShiftReplServer = 2000;
// todo 待移除,使用 kPortShiftRSync
wangshao1 marked this conversation as resolved.
Show resolved Hide resolved
const int kPortShiftRsync2 = 10001;

const std::string kPikaPidFile = "pika.pid";
const std::string kPikaSecretFile = "rsync.secret";
Expand Down
6 changes: 6 additions & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "include/pika_repl_server.h"
#include "include/pika_slave_node.h"
#include "include/pika_stable_log.h"
#include "include/rsync_client.h"

#define kBinlogSendPacketNum 40
#define kBinlogSendBatchNum 100
Expand Down Expand Up @@ -157,7 +158,12 @@ class SyncSlaveSlot : public SyncSlot {

std::string LocalIp();

void ActivateRsync();

bool IsRsyncRunning() {return rsync_cli_->IsRunning();}

private:
std::unique_ptr<rsync::RsyncClient> rsync_cli_;
pstd::Mutex slot_mu_;
RmNode m_info_;
ReplState repl_state_{kNoConnect};
Expand Down
4 changes: 4 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "include/pika_repl_client.h"
#include "include/pika_repl_server.h"
#include "include/pika_rsync_service.h"
#include "include/rsync_server.h"
#include "include/pika_statistic.h"
#include "include/pika_slot_command.h"
#include "include/pika_migrate_thread.h"
Expand Down Expand Up @@ -261,6 +262,8 @@ class PikaServer : public pstd::noncopyable {
/*
* DBSync used
*/
pstd::Status GetDumpUUID(const std::string& db_name, const uint32_t slot_id, std::string* snapshot_uuid);
pstd::Status GetDumpMeta(const std::string& db_name, const uint32_t slot_id, std::vector<std::string>* files, std::string* snapshot_uuid);
void DBSync(const std::string& ip, int port, const std::string& db_name, uint32_t slot_id);
void TryDBSync(const std::string& ip, int port, const std::string& db_name, uint32_t slot_id, int32_t top);
void DbSyncSendFile(const std::string& ip, int port, const std::string& db_name, uint32_t slot_id);
Expand Down Expand Up @@ -587,6 +590,7 @@ class PikaServer : public pstd::noncopyable {
* Rsync used
*/
std::unique_ptr<PikaRsyncService> pika_rsync_service_;
std::unique_ptr<rsync::RsyncServer> rsync_server_;

/*
* Pubsub used
Expand Down
3 changes: 3 additions & 0 deletions include/pika_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class Slot : public std::enable_shared_from_this<Slot>,public pstd::noncopyable
bool IsBgSaving();
void BgSaveSlot();
BgSaveInfo bgsave_info();
void GetBgSaveMetaData(std::vector<std::string>* fileNames, std::string* snapshot_uuid);
pstd::Status GetBgSaveUUID(std::string* snapshot_uuid);

// FlushDB & FlushSubDB use
bool FlushDB();
Expand All @@ -93,6 +95,7 @@ class Slot : public std::enable_shared_from_this<Slot>,public pstd::noncopyable
private:
std::string db_name_;
uint32_t slot_id_ = 0;
std::string snapshot_uuid_;

std::string db_path_;
std::string bgsave_sub_path_;
Expand Down
156 changes: 156 additions & 0 deletions include/rsync_client.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef RSYNC_CLIENT_H_
#define RSYNC_CLIENT_H_
wangshao1 marked this conversation as resolved.
Show resolved Hide resolved
#include <glog/logging.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <list>
#include <atomic>
#include <memory>
#include <thread>
#include <condition_variable>

#include "include/rsync_client_thread.h"
#include "net/include/bg_thread.h"
#include "pstd/include/pstd_status.h"
#include "include/rsync_client_thread.h"
#include "net/include/net_cli.h"
#include "pstd/include/env.h"
#include "pstd/include/pstd_hash.h"
#include "pstd/include/pstd_string.h"
#include "pstd/include/pstd_status.h"
#include "rsync_service.pb.h"
#include "include/throttle.h"

using namespace pstd;
using namespace net;
using namespace RsyncService;

const std::string kDumpMetaFileName = "DUMP_META_DATA";
const std::string kUuidPrefix = "snapshot-uuid:";

namespace rsync {

class RsyncWriter;
class Session;
class WaitObject;

class RsyncClient : public net::Thread {
public:
enum State {
IDLE,
RUNNING,
STOP,
};
RsyncClient(const std::string& dir, const std::string& db_name, const uint32_t slot_id);
void* ThreadMain() override;
bool Init();
Status Start();
Status Stop();
bool IsRunning() {
return state_.load() == RUNNING;
}
bool IsIdle() { return state_.load() == IDLE;}
void OnReceive(RsyncResponse* resp);

private:
bool Recover();
Status Wait(RsyncResponse*& resp);
Status CopyRemoteFile(const std::string& filename);
Status CopyRemoteMeta(std::string* snapshot_uuid, std::set<std::string>* file_set);
Status LoadLocalMeta(std::string* snapshot_uuid, std::map<std::string, std::string>* file_map);
std::string GetLocalMetaFilePath();
AlexStocks marked this conversation as resolved.
Show resolved Hide resolved
Status FlushMetaTable();
Status CleanUpExpiredFiles(bool need_reset_path, const std::set<std::string>& files);
Status UpdateLocalMeta(const std::string& snapshot_uuid, const std::set<std::string>& expired_files, std::map<std::string, std::string>* localFileMap);
void HandleRsyncMetaResponse(RsyncResponse* response);

private:
std::map<std::string, std::string> meta_table_;
int flush_period_;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

class member 初始化没做

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

在构造函数里初始化

std::set<std::string> file_set_;
std::string snapshot_uuid_;
std::string dir_;
std::string db_name_;
uint32_t slot_id_;
std::unique_ptr<RsyncClientThread> client_thread_;
std::atomic<State> state_;
int max_retries_;
std::unique_ptr<WaitObject> wo_;
std::condition_variable cond_;
std::mutex mu_;
std::unique_ptr<Throttle> throttle_;
std::string master_ip_;
int master_port_;
};

class RsyncWriter {
public:
RsyncWriter(const std::string& filepath) {
filepath_ = filepath;
fd_ = open(filepath.c_str(), O_RDWR | O_APPEND | O_CREAT, 0644);
}
~RsyncWriter() {}
Status Write(uint64_t offset, size_t n, const char* data) {
const char* ptr = data;
size_t left = n;
Status s;
while (left != 0) {
ssize_t done = write(fd_, ptr, left);
if (done < 0) {
if (errno == EINTR) {
continue;
}
LOG(WARNING) << "pwrite failed, filename: " << filepath_ << "errno: " << strerror(errno) << "n: " << n;
return Status::IOError(filepath_, "pwrite failed");
}
left -= done;
ptr += done;
offset += done;
}
return Status::OK();
}
Status Close() {
close(fd_);
return Status::OK();
}
Status Fsync() {
fsync(fd_);
return Status::OK();
}

private:
std::string filepath_;
int fd_;
};

class WaitObject {
public:
WaitObject() : filename_(""), type_(RsyncService::kRsyncMeta), offset_(0), resp_(nullptr) {}
~WaitObject() {}
void Reset(const std::string& filename, RsyncService::Type t, size_t offset) {
resp_ = nullptr;
filename_ = filename;
type_ = t;
offset_ = offset;
}
void Reset(RsyncService::Type t) {
resp_ = nullptr;
filename_ = "";
type_ = t;
offset_ = 0xFFFFFFFF;
}
std::string filename_;
RsyncService::Type type_;
wangshao1 marked this conversation as resolved.
Show resolved Hide resolved
size_t offset_;
RsyncResponse* resp_;
};

} // end namespace rsync

#endif
wangshao1 marked this conversation as resolved.
Show resolved Hide resolved
55 changes: 55 additions & 0 deletions include/rsync_client_thread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef RSYNC_CLIENT_THREAD_H_
#define RSYNC_CLIENT_THREAD_H_

#include "net/include/client_thread.h"
#include "net/include/net_conn.h"
#include "net/include/pb_conn.h"
#include "rsync_service.pb.h"

using namespace pstd;
using namespace net;

namespace rsync {

class RsyncClientConn : public PbConn {
public:
RsyncClientConn(int fd, const std::string& ip_port,
net::Thread* thread, void* cb_handler,
NetMultiplexer* mpx);
~RsyncClientConn() override;
int DealMessage() override;

private:
void* cb_handler_;
wangshao1 marked this conversation as resolved.
Show resolved Hide resolved
};

class RsyncClientConnFactory : public ConnFactory {
public:
RsyncClientConnFactory(void* scheduler) : cb_handler_(scheduler) {}
std::shared_ptr<net::NetConn> NewNetConn(int connfd, const std::string& ip_port,
net::Thread* thread, void* cb_handler,
net::NetMultiplexer* net) const override {
return std::static_pointer_cast<net::NetConn>(
std::make_shared<RsyncClientConn>(connfd, ip_port, thread, cb_handler_, net));
}
private:
void* cb_handler_;
wangshao1 marked this conversation as resolved.
Show resolved Hide resolved
};

class RsyncClientThread : public ClientThread {
public:
RsyncClientThread(int cron_interval, int keepalive_timeout, void* scheduler);
~RsyncClientThread() override;
private:
RsyncClientConnFactory conn_factory_;
ClientHandle handle_;
};

} //end namespace rsync

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code patch appears to be a header file related to a client thread for an rsync application. Here are some suggestions and areas to consider in your code review:

  1. Licensing: Verify that the license information provided (BSD-style license) is accurate and consistent with your project's requirements.

  2. Namespace Usage: It seems that the namespaces pstd and net are being used in this file. Ensure that these namespaces are correctly defined and included in the relevant source files.

  3. Class Design: Review the design of the RsyncClientConn, RsyncClientConnFactory, and RsyncClientThread classes to ensure they align with your intended functionality. Consider whether any additional member variables or methods are needed or if any existing ones can be made more specific.

  4. Error Handling: Check if proper error handling mechanisms, such as exception handling or error return codes, are implemented in the code. It is crucial to handle potential errors or exceptional scenarios gracefully.

  5. Documentation: Consider adding appropriate comments/documentation to aid understanding and maintainability of the codebase. Include brief descriptions of the purpose of classes, methods, and important variables.

  6. Thread Safety: If multiple threads will access the code in question, validate that proper synchronization mechanisms, like locks or atomic operations, are used where necessary to prevent race conditions.

  7. Memory Management: Examine how memory allocation and deallocation are handled within your code. Ensure there are no memory leaks or undefined behavior and that destructors and smart pointers (if applicable) are used appropriately.

  8. Code Style and Consistency: Verify that the coding style is consistent throughout the codebase, adhering to any existing conventions. Maintain readability by following best practices such as indentation, meaningful variable names, and appropriate use of white space.

Remember that a comprehensive code review requires access to the complete codebase including related implementation files, so these suggestions may not cover all aspects of your project.

#endif
wangshao1 marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading