Skip to content

Commit

Permalink
feat: add read meta file and data (OpenAtomFoundation#179)
Browse files Browse the repository at this point in the history
* add read meta file and data
  • Loading branch information
luky116 authored Jul 14, 2023
1 parent 26d4db8 commit 547530c
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 15 deletions.
4 changes: 2 additions & 2 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,10 @@ class PikaServer : public pstd::noncopyable {
* DBSync used
*/
//TODO: yuecai
size_t ReadDumpFile(const std::string& db_name, uint32_t slot_id, const std::string& filename,
pstd::Status ReadDumpFile(const std::string& db_name, uint32_t slot_id, const std::string& filename,
const size_t offset, const size_t count, char* data);
//TODO: yuecai
void GetDumpMeta(const std::string& db_name, const uint32_t slot_id, std::vector<std::string>* files, std::string* snapshot_uuid);
pstd::Status GetDumpMeta(const std::string& db_name, const uint32_t slot_id, std::vector<std::string>* files, std::string* snapshot_uuid);
void DBSync(const std::string& ip, int port, const std::string& db_name, uint32_t slot_id);
void TryDBSync(const std::string& ip, int port, const std::string& db_name, uint32_t slot_id, int32_t top);
void DbSyncSendFile(const std::string& ip, int port, const std::string& db_name, uint32_t slot_id);
Expand Down
1 change: 1 addition & 0 deletions include/pika_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class Slot : public std::enable_shared_from_this<Slot>,public pstd::noncopyable
bool IsBgSaving();
void BgSaveSlot();
BgSaveInfo bgsave_info();
void GetBgSaveMetaData(std::vector<std::string>* fileNames, std::string* snapshot_uuid);

// FlushDB & FlushSubDB use
bool FlushDB();
Expand Down
8 changes: 5 additions & 3 deletions include/rsync_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@
#include <atomic>
#include <memory>

#include "rsync_service.pb.h"
#include "net/include/net_conn.h"
#include "net/include/net_thread.h"
#include "net/include/thread_pool.h"
#include "net/include/pb_conn.h"
#include "net/include/server_thread.h"
#include "net/include/thread_pool.h"
#include "net/src/holy_thread.h"
#include "net/src/net_multiplexer.h"
#include "pstd/include/env.h"
#include "net/src/holy_thread.h"
#include "pstd_hash.h"
#include "rsync_service.pb.h"

using namespace net;
using namespace RsyncService;
Expand Down Expand Up @@ -42,6 +43,7 @@ class RsyncServer {
int port_;
std::string ip_;
std::string dir_;
std::string info_dir_;
std::map<std::string, std::shared_ptr<RsyncReader> > file_map_;
std::unique_ptr<ThreadPool> work_thread_ = nullptr;
std::unique_ptr<RsyncServerThread> rsync_server_thread_ = nullptr;
Expand Down
56 changes: 46 additions & 10 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
#include "net/include/redis_cli.h"
#include "pstd/include/env.h"
#include "pstd/include/rsync.h"
# include "pstd/include/pstd_defer.h"

#include "include/pika_cmd_table_manager.h"
#include "include/pika_dispatch_thread.h"
#include "include/pika_rm.h"
#include "pstd_hash.h"

using pstd::Status;
extern PikaServer* g_pika_server;
Expand Down Expand Up @@ -905,24 +907,58 @@ void PikaServer::DBSync(const std::string& ip, int port, const std::string& db_n
bgsave_thread_.Schedule(&DoDBSync, reinterpret_cast<void*>(arg));
}

void PikaServer::GetDumpMeta(const std::string& db_name, const uint32_t slot_id, std::vector<std::string>* files, std::string* snapshot_uuid) {
std::shared_ptr<Slot> slot = GetDBSlotById(db_name, slot_id);
pstd::GetChildren(slot->bgsave_info().path, *files);
*snapshot_uuid = "demo_snapshot_uuid";
pstd::Status PikaServer::GetDumpMeta(const std::string& db_name, const uint32_t slot_id, std::vector<std::string>* fileNames, std::string* snapshot_uuid) {
std::shared_ptr<Slot> slot = GetDBSlotById(db_name, slot_id);
if (!slot) {
LOG(WARNING) << "cannot find slot for db_name " << db_name << "slot_id: " << slot_id;
return pstd::Status::NotFound("slot no found");
}
slot->GetBgSaveMetaData(fileNames, snapshot_uuid);
}

size_t PikaServer::ReadDumpFile(const std::string& db_name, uint32_t slot_id, const std::string& filename,
pstd::Status PikaServer::ReadDumpFile(const std::string& db_name, uint32_t slot_id, const std::string& filename,
const size_t offset, const size_t count, char* data) {
std::shared_ptr<Slot> slot = GetDBSlotById(db_name, slot_id);
if (!slot) {
LOG(WARNING) << "cannot find slot for db_name " << db_name << "slot_id: " << slot_id;
return 0;
return pstd::Status::NotFound("slot no found");
}
const std::string filepath = slot->bgsave_info().path + "/" + filename;
int fd = open(filepath.c_str(), O_RDONLY, 0644);
ssize_t n = pread(fd, data, count, offset);
close(fd);
return n;
int fd = open(filepath.c_str(), O_RDONLY);
if (fd < 0) {
return Status::IOError("fd open failed");
}
DEFER { close(fd); };

const int kMaxCopyBlockSize = 8 << 10;
size_t read_offset = offset;
size_t read_count = count;
if (read_count > kMaxCopyBlockSize) {
read_count = kMaxCopyBlockSize;
}

size_t bytesin = 0;
size_t left_read_count = count;

while ((bytesin = pread(fd, data, read_count, read_offset)) > 0) {
left_read_count -= bytesin;
if (left_read_count <= 0) {
break ;
}
if (read_count > left_read_count) {
read_count = left_read_count;
}

data += bytesin;
read_offset += bytesin;
}

if (bytesin == -1) {
LOG(ERROR) << "unable to read from " << filename;
return pstd::Status::IOError("unable to read from " + filename);
}

return pstd::Status::OK();
}

void PikaServer::TryDBSync(const std::string& ip, int port, const std::string& db_name, uint32_t slot_id,
Expand Down
23 changes: 23 additions & 0 deletions src/pika_slot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "include/pika_server.h"

#include "pstd/include/mutex_impl.h"
#include "pstd_hash.h"

using pstd::Status;

Expand Down Expand Up @@ -296,6 +297,28 @@ BgSaveInfo Slot::bgsave_info() {
return bgsave_info_;
}

void Slot::GetBgSaveMetaData(std::vector<std::string>* fileNames, std::string* snapshot_uuid) {
const std::string dbFilePath = bgsave_info().path;
// todo 待确认 info 文件的路径
const std::string infoFilePath = bgsave_info().path + "/../info";

int ret = pstd::GetChildren(dbFilePath, *fileNames);
if (ret) {
LOG(WARNING) << dbFilePath << " read dump meta files failed! error:" << ret;
return;
}

std::string info_data;
rocksdb::Status s = rocksdb::ReadFileToString(rocksdb::Env::Default(), infoFilePath, &info_data);
if (!s.ok()) {
LOG(WARNING) << "read dump meta info failed! error:" << s.ToString();
return;
}

pstd::MD5 md5 = pstd::MD5(info_data);
*snapshot_uuid = md5.hexdigest();
}

void Slot::DoBgSave(void* arg) {
std::unique_ptr<BgTaskArg> bg_task_arg(static_cast<BgTaskArg*>(arg));

Expand Down

0 comments on commit 547530c

Please sign in to comment.