Skip to content

Commit

Permalink
change the declared position of async_write_db_task_count
Browse files Browse the repository at this point in the history
  • Loading branch information
cheniujh committed Jul 30, 2024
1 parent 8fa2d04 commit 42de339
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 28 deletions.
16 changes: 1 addition & 15 deletions include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,21 +164,13 @@ class ConsensusCoordinator {
return tmp_stream.str();
}

// Adds `step_size` to async_write_db_task_count_ with sequentially consistent ordering.
void IncrAsyncWriteDBTaskCount(int32_t step_size) {
  // Use the namespace-scope constant: the nested spelling std::memory_order::memory_order_seq_cst
  // stops compiling once std::memory_order becomes a scoped enum (C++20).
  async_write_db_task_count_.fetch_add(step_size, std::memory_order_seq_cst);
}

// Subtracts `step_size` from async_write_db_task_count_ with sequentially consistent ordering.
void DecrAsyncWriteDBTaskCount(int32_t step_size) {
  // Use the namespace-scope constant: the nested spelling std::memory_order::memory_order_seq_cst
  // stops compiling once std::memory_order becomes a scoped enum (C++20).
  async_write_db_task_count_.fetch_sub(step_size, std::memory_order_seq_cst);
}

private:
pstd::Status TruncateTo(const LogOffset& offset);

pstd::Status InternalAppendLog(const std::shared_ptr<Cmd>& cmd_ptr);
pstd::Status InternalAppendBinlog(const std::shared_ptr<Cmd>& cmd_ptr);
void InternalApply(const MemLog::LogItem& log);
void InternalApplyFollower(std::shared_ptr<Cmd> cmd_ptr, std::function<void()>& call_back_fun);
void InternalApplyFollower(std::shared_ptr<Cmd> cmd_ptr);

pstd::Status GetBinlogOffset(const BinlogOffset& start_offset, LogOffset* log_offset);
pstd::Status GetBinlogOffset(const BinlogOffset& start_offset, const BinlogOffset& end_offset,
Expand Down Expand Up @@ -206,11 +198,5 @@ class ConsensusCoordinator {
SyncProgress sync_pros_;
std::shared_ptr<StableLog> stable_logger_;
std::shared_ptr<MemLog> mem_logger_;

// Used while consuming binlog: the number of async write-DB tasks that are queued or
// being executed by WriteDBWorkers. A flushdb binlog that needs to apply to the DB must wait
// until this count drops to zero. See pika discussion #2807 for more background.
// It is only used on a slave node while consuming binlog.
std::atomic<int32_t> async_write_db_task_count_{0};
};
#endif // INCLUDE_PIKA_CONSENSUS_H_
30 changes: 29 additions & 1 deletion include/pika_repl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class PikaReplClient {
void ScheduleByDBName(net::TaskFunc func, void* arg, const std::string& db_name);
void ScheduleWriteBinlogTask(const std::string& db_name, const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data);
void ScheduleWriteDBTask(std::shared_ptr<Cmd> cmd_ptr, std::function<void()>& call_back_fun);
void ScheduleWriteDBTask(std::shared_ptr<Cmd> cmd_ptr, const std::string& db_name);

pstd::Status SendMetaSync();
pstd::Status SendDBSync(const std::string& ip, uint32_t port, const std::string& db_name,
Expand All @@ -76,6 +76,26 @@ class PikaReplClient {
const std::string& local_ip, bool is_first_send);
pstd::Status SendRemoveSlaveNode(const std::string& ip, uint32_t port, const std::string& db_name, const std::string& local_ip);

void IncrAsyncWriteDBTaskCount(const std::string& db_name, int32_t incr_step) {
int32_t db_index = db_name.back() - '0';
assert(db_index >= 0 && db_index <= 7);
async_write_db_task_counts_[db_index].fetch_add(incr_step, std::memory_order::memory_order_seq_cst);
LOG(INFO) << db_name << " incr 1, curr:" << async_write_db_task_counts_[db_index].load(std::memory_order_seq_cst);
}

void DecrAsyncWriteDBTaskCount(const std::string& db_name, int32_t incr_step) {
int32_t db_index = db_name.back() - '0';
assert(db_index >= 0 && db_index <= 7);
async_write_db_task_counts_[db_index].fetch_sub(incr_step, std::memory_order::memory_order_seq_cst);
LOG(INFO) << db_name << " decr 1, curr:" << async_write_db_task_counts_[db_index].load(std::memory_order_seq_cst);
}

int32_t GetUnfinishedAsyncDBTaskCount(const std::string& db_name) {
int32_t db_index = db_name.back() - '0';
assert(db_index >= 0 && db_index <= 7);
return async_write_db_task_counts_[db_index].load(std::memory_order_seq_cst);
}

private:
size_t GetBinlogWorkerIndexByDBName(const std::string &db_name);
size_t GetHashIndexByKey(const std::string& key);
Expand All @@ -84,6 +104,14 @@ class PikaReplClient {
std::unique_ptr<PikaReplClientThread> client_thread_;
int next_avail_ = 0;
std::hash<std::string> str_hash;

// Used while consuming binlog: the number of async write-DB tasks that are queued or
// being executed by WriteDBWorkers. A flushdb binlog that needs to apply to the DB must wait
// until this count drops to zero. See pika discussion #2807 for more background.
// It is only used on a slave node while consuming binlog.
std::atomic<int32_t> async_write_db_task_counts_[MAX_DB_NUM];
// [NOTICE] write_db_workers_ must be declared after async_write_db_task_counts_ so that write_db_workers_ is destroyed before async_write_db_task_counts_
// when PikaReplClient is destructed, because some of the async tasks executed by write_db_workers_ manipulate async_write_db_task_counts_
std::vector<std::unique_ptr<PikaReplBgWorker>> write_binlog_workers_;
std::vector<std::unique_ptr<PikaReplBgWorker>> write_db_workers_;
};
Expand Down
6 changes: 5 additions & 1 deletion include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class PikaReplicaManager {
void ScheduleWriteBinlogTask(const std::string& db_name,
const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data);
void ScheduleWriteDBTask(std::shared_ptr<Cmd> cmd_ptr, std::function<void()>& call_back_fun);
void ScheduleWriteDBTask(std::shared_ptr<Cmd> cmd_ptr, const std::string& db_name);
void ScheduleReplClientBGTaskByDBName(net::TaskFunc , void* arg, const std::string &db_name);
void ReplServerRemoveClientConn(int fd);
void ReplServerUpdateClientConnMap(const std::string& ip_port, int fd);
Expand All @@ -205,6 +205,10 @@ class PikaReplicaManager {
return sync_slave_dbs_;
}

// Reports the unfinished async write-DB task count for `db_name`, as returned by pika_repl_client_.
int32_t GetUnfinishedAsyncDBTaskCount(const std::string& db_name) {
  const int32_t unfinished = pika_repl_client_->GetUnfinishedAsyncDBTaskCount(db_name);
  return unfinished;
}

private:
void InitDB();
pstd::Status SelectLocalIp(const std::string& remote_ip, int remote_port, std::string* local_ip);
Expand Down
10 changes: 4 additions & 6 deletions src/pika_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -351,14 +351,12 @@ Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_pt
// apply binlog in sync way
Status s = InternalAppendLog(cmd_ptr);
// apply db in async way
IncrUnfinishedAsyncWriteDbTaskCount(1);
std::function<void()> call_back = [this]() { this->DecrUnfinishedAsyncWriteDbTaskCount(1); };
InternalApplyFollower(cmd_ptr, call_back);
InternalApplyFollower(cmd_ptr);
} else {
// this is a flushdb-binlog, both apply binlog and apply db are in sync way
// ensure all writeDB task that submitted before has finished before we exec this flushdb
int32_t wait_ms = 250;
while (async_write_db_task_count_.load(std::memory_order::memory_order_seq_cst) > 0) {
while (g_pika_rm->GetUnfinishedAsyncDBTaskCount(db_name_) > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
wait_ms *= 2;
wait_ms = wait_ms < 3000 ? wait_ms : 3000;
Expand Down Expand Up @@ -425,8 +423,8 @@ uint32_t ConsensusCoordinator::term() {
return term_;
}

void ConsensusCoordinator::InternalApplyFollower(std::shared_ptr<Cmd> cmd_ptr, std::function<void()>& call_back_fun) {
g_pika_rm->ScheduleWriteDBTask(std::move(cmd_ptr), call_back_fun);
// Hands `cmd_ptr` to the replica manager as a write-DB task, tagged with this coordinator's db_name_.
// Ownership of the shared_ptr is moved into the call, so `cmd_ptr` must not be used afterwards.
void ConsensusCoordinator::InternalApplyFollower(std::shared_ptr<Cmd> cmd_ptr) {
g_pika_rm->ScheduleWriteDBTask(std::move(cmd_ptr), db_name_);
}

int ConsensusCoordinator::InitCmd(net::RedisParser* parser, const net::RedisCmdArgsType& argv) {
Expand Down
14 changes: 11 additions & 3 deletions src/pika_repl_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ using pstd::Status;
extern PikaServer* g_pika_server;
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;

PikaReplClient::PikaReplClient(int cron_interval, int keepalive_timeout) {
PikaReplClient::PikaReplClient(int cron_interval, int keepalive_timeout) {
for (int i = 0; i < MAX_DB_NUM; i++) {
async_write_db_task_counts_[i].store(0, std::memory_order::memory_order_seq_cst);
}
client_thread_ = std::make_unique<PikaReplClientThread>(cron_interval, keepalive_timeout);
client_thread_->set_thread_name("PikaReplClient");
for (int i = 0; i < g_pika_conf->sync_binlog_thread_num(); i++) {
Expand Down Expand Up @@ -98,12 +101,17 @@ void PikaReplClient::ScheduleWriteBinlogTask(const std::string& db_name,
write_binlog_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteBinlog, static_cast<void*>(task_arg));
}

void PikaReplClient::ScheduleWriteDBTask(std::shared_ptr<Cmd> cmd_ptr, std::function<void()>& call_back_fun) {
// Schedules a write-DB task for `cmd_ptr` on one of write_db_workers_, selected by hashing the
// command's dispatch key, and increments the pending-task counter of `db_name` before scheduling.
void PikaReplClient::ScheduleWriteDBTask(std::shared_ptr<Cmd> cmd_ptr, const std::string& db_name) {
const PikaCmdArgsType& argv = cmd_ptr->argv();
// dispatch on argv[1] when the command has at least two arguments, otherwise on argv[0]
std::string dispatch_key = argv.size() >= 2 ? argv[1] : argv[0];
size_t index = GetHashIndexByKey(dispatch_key);
// NOTE(review): raw allocation passed as void*; presumably released by the worker handler -- confirm
auto task_arg = new ReplClientWriteDBTaskArg(std::move(cmd_ptr));
// NOTE(review): call_back_fun is not declared by this signature; this looks like the superseded
// Schedule call left in by the diff view -- confirm before building from this text
write_db_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast<void*>(task_arg), call_back_fun);

// count the task first; the callback passed to Schedule decrements the same counter
// (presumably invoked by the worker once the task has finished -- confirm in PikaReplBgWorker)
IncrAsyncWriteDBTaskCount(db_name, 1);
std::function<void()> task_finish_call_back = [this, db_name]() { this->DecrAsyncWriteDBTaskCount(db_name, 1); };

write_db_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast<void*>(task_arg),
task_finish_call_back);
}

size_t PikaReplClient::GetBinlogWorkerIndexByDBName(const std::string &db_name) {
Expand Down
4 changes: 2 additions & 2 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -683,8 +683,8 @@ void PikaReplicaManager::ScheduleWriteBinlogTask(const std::string& db,
pika_repl_client_->ScheduleWriteBinlogTask(db, res, conn, res_private_data);
}

void PikaReplicaManager::ScheduleWriteDBTask(std::shared_ptr<Cmd> cmd_ptr, std::function<void()>& call_back_fun) {
pika_repl_client_->ScheduleWriteDBTask(std::move(cmd_ptr), call_back_fun);
// Forwards a write-DB task for `cmd_ptr` and its `db_name` to pika_repl_client_.
// Ownership of the shared_ptr is moved into the call, so `cmd_ptr` must not be used afterwards.
void PikaReplicaManager::ScheduleWriteDBTask(std::shared_ptr<Cmd> cmd_ptr, const std::string& db_name) {
pika_repl_client_->ScheduleWriteDBTask(std::move(cmd_ptr), db_name);
}

void PikaReplicaManager::ReplServerRemoveClientConn(int fd) { pika_repl_server_->RemoveClientConn(fd); }
Expand Down

0 comments on commit 42de339

Please sign in to comment.