Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add function DropItemInOneWriteQueue to do the accurate queue clear when slave and master disconnect due to timeout #2666

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ class PikaReplicaManager {

// write_queue related
void ProduceWriteQueue(const std::string& ip, int port, std::string db_name, const std::vector<WriteTask>& tasks);
void DropItemInOneWriteQueue(const std::string& ip, int port, const std::string& db_name);
void DropItemInWriteQueue(const std::string& ip, int port);
int ConsumeWriteQueue();

Expand Down
12 changes: 10 additions & 2 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ Status SyncMasterDB::ActivateSlaveBinlogSync(const std::string& ip, int port, co
}
//Since we init a new reader, we should drop items in write queue and reset sync_window.
//Or the sent_offset and acked_offset will not match
g_pika_rm->DropItemInWriteQueue(ip, port);
g_pika_rm->DropItemInOneWriteQueue(ip, port, slave_ptr->DBName());
slave_ptr->sync_win.Reset();
slave_ptr->b_state = kReadFromFile;
}
Expand Down Expand Up @@ -335,7 +335,7 @@ Status SyncMasterDB::CheckSyncTimeout(uint64_t now) {

for (auto& node : to_del) {
coordinator_.SyncPros().RemoveSlaveNode(node.Ip(), node.Port());
g_pika_rm->DropItemInWriteQueue(node.Ip(), node.Port());
g_pika_rm->DropItemInOneWriteQueue(node.Ip(), node.Port(), DBName());
LOG(WARNING) << SyncDBInfo().ToString() << " Master del Recv Timeout slave success " << node.ToString();
}
return Status::OK();
Expand Down Expand Up @@ -645,6 +645,14 @@ int PikaReplicaManager::ConsumeWriteQueue() {
return counter;
}

void PikaReplicaManager::DropItemInOneWriteQueue(const std::string& ip, int port, const std::string& db_name) {
std::lock_guard l(write_queue_mu_);
std::string index = ip + ":" + std::to_string(port);
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
if (write_queues_.find(index) != write_queues_.end()) {
write_queues_[index].erase(db_name);
}
}

void PikaReplicaManager::DropItemInWriteQueue(const std::string& ip, int port) {
std::lock_guard l(write_queue_mu_);
std::string index = ip + ":" + std::to_string(port);
Expand Down
Loading