Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add protect mode for new leader sync and commit binlog #1276

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ class PikaReplicaManager {
void Start();
void Stop();

bool CheckMasterSyncFinished();

Status AddSyncPartitionSanityCheck(const std::set<PartitionInfo>& p_infos);
Status AddSyncPartition(const std::set<PartitionInfo>& p_infos);
Status RemoveSyncPartitionSanityCheck(const std::set<PartitionInfo>& p_infos);
Expand Down
3 changes: 3 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class PikaServer {
std::string master_ip();
int master_port();
int role();
bool leader_protected_mode();
void CheckLeaderProtectedMode();
bool readonly(const std::string& table, const std::string& key);
bool ConsensusCheck(const std::string& table_name, const std::string& key);
int repl_state();
Expand Down Expand Up @@ -360,6 +362,7 @@ class PikaServer {
bool first_meta_sync_;
bool loop_partition_state_machine_;
bool force_full_sync_;
bool leader_protected_mode_; // reject request after master slave sync done
pthread_rwlock_t state_protector_; //protect below, use for master-slave mode

/*
Expand Down
2 changes: 2 additions & 0 deletions src/pika_auxiliary_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ void* PikaAuxiliaryThread::ThreadMain() {
LOG(WARNING) << s.ToString();
}

g_pika_server->CheckLeaderProtectedMode();

// TODO(whoiami) timeout
s = g_pika_server->TriggerSendBinlogSync();
if (!s.ok()) {
Expand Down
6 changes: 6 additions & 0 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(
return c_ptr;
}

// reject all the request before new master sync finished
if (g_pika_server->leader_protected_mode()) {
c_ptr->res().SetRes(CmdRes::kErrOther, "Cannot process command before new leader sync finished");
return c_ptr;
}

if (!g_pika_server->IsTableExist(current_table_)) {
c_ptr->res().SetRes(CmdRes::kErrOther, "Table not found");
return c_ptr;
Expand Down
16 changes: 16 additions & 0 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,22 @@ void PikaReplicaManager::Stop() {
pika_repl_server_->Stop();
}

bool PikaReplicaManager::CheckMasterSyncFinished() {
for (auto& iter : sync_master_partitions_) {
std::shared_ptr<SyncMasterPartition> partition = iter.second;
LogOffset commit = partition->ConsensusCommittedIndex();
BinlogOffset binlog;
Status s = partition->StableLogger()->Logger()->GetProducerStatus(&binlog.filenum, &binlog.offset);
if (!s.ok()) {
return false;
}
if (commit.b_offset < binlog) {
return false;
}
}
return true;
}

void PikaReplicaManager::InitPartition() {
std::vector<TableStruct> table_structs = g_pika_conf->table_structs();
for (const auto& table : table_structs) {
Expand Down
25 changes: 25 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ PikaServer::PikaServer() :
master_port_(0),
repl_state_(PIKA_REPL_NO_CONNECT),
role_(PIKA_ROLE_SINGLE),
leader_protected_mode_(false),
last_meta_sync_timestamp_(0),
first_meta_sync_(false),
loop_partition_state_machine_(false),
Expand Down Expand Up @@ -307,6 +308,23 @@ int PikaServer::role() {
return role_;
}

bool PikaServer::leader_protected_mode() {
slash::RWLock(&state_protector_, false);
return leader_protected_mode_;
}

void PikaServer::CheckLeaderProtectedMode() {
if (!leader_protected_mode()) {
return;
}
if (g_pika_rm->CheckMasterSyncFinished()) {
LOG(INFO) << "Master finish sync and commit binlog";

slash::RWLock(&state_protector_, true);
leader_protected_mode_ = false;
}
}

bool PikaServer::readonly(const std::string& table_name, const std::string& key) {
slash::RWLock(&state_protector_, false);
if ((role_ & PIKA_ROLE_SLAVE)
Expand Down Expand Up @@ -714,6 +732,12 @@ Status PikaServer::DoSameThingEveryPartition(const TaskType& type) {

void PikaServer::BecomeMaster() {
slash::RWLock l(&state_protector_, true);
if ((role_ & PIKA_ROLE_MASTER) == 0
&& g_pika_conf->write_binlog()
&& g_pika_conf->consensus_level() > 0) {
LOG(INFO) << "Become new master, start protect mode to waiting binlog sync and commit";
leader_protected_mode_ = true;
}
role_ |= PIKA_ROLE_MASTER;
}

Expand Down Expand Up @@ -747,6 +771,7 @@ void PikaServer::DeleteSlave(int fd) {
if (slave_num == 0) {
slash::RWLock l(&state_protector_, true);
role_ &= ~PIKA_ROLE_MASTER;
leader_protected_mode_ = false; // explicitly cancel protected mode
}
}

Expand Down