Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use RAII to release resources in async task #1502

Merged
merged 3 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,34 +182,31 @@ void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>&
}

void PikaClientConn::DoBackgroundTask(void* arg) {
BgTaskArg* bg_arg = reinterpret_cast<BgTaskArg*>(arg);
std::unique_ptr<BgTaskArg> bg_arg(static_cast<BgTaskArg*>(arg));
std::shared_ptr<PikaClientConn> conn_ptr = bg_arg->conn_ptr;
if (bg_arg->redis_cmds.size() == 0) {
delete bg_arg;
conn_ptr->NotifyEpoll(false);
return;
}
for (const auto& argv : bg_arg->redis_cmds) {
if (argv.size() == 0) {
delete bg_arg;
conn_ptr->NotifyEpoll(false);
return;
}
}

conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds);
delete bg_arg;
}

void PikaClientConn::DoExecTask(void* arg) {
BgTaskArg* bg_arg = reinterpret_cast<BgTaskArg*>(arg);
std::unique_ptr<BgTaskArg> bg_arg(static_cast<BgTaskArg*>(arg));
std::shared_ptr<Cmd> cmd_ptr = bg_arg->cmd_ptr;
std::shared_ptr<PikaClientConn> conn_ptr = bg_arg->conn_ptr;
std::shared_ptr<std::string> resp_ptr = bg_arg->resp_ptr;
LogOffset offset = bg_arg->offset;
std::string table_name = bg_arg->table_name;
uint32_t partition_id = bg_arg->partition_id;
delete bg_arg;
bg_arg.reset();

uint64_t start_us = 0;
if (g_pika_conf->slowlog_slower_than() >= 0) {
Expand Down
3 changes: 1 addition & 2 deletions src/pika_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ BgSaveInfo Partition::bgsave_info() {
}

void Partition::DoBgSave(void* arg) {
BgTaskArg* bg_task_arg = static_cast<BgTaskArg*>(arg);
std::unique_ptr<BgTaskArg> bg_task_arg(static_cast<BgTaskArg*>(arg));

// Do BgSave
bool success = bg_task_arg->partition->RunBgsaveEngine();
Expand All @@ -314,7 +314,6 @@ void Partition::DoBgSave(void* arg) {
}
bg_task_arg->partition->FinishBgsave();

delete bg_task_arg;
}

bool Partition::RunBgsaveEngine() {
Expand Down
27 changes: 7 additions & 20 deletions src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "include/pika_conf.h"
#include "include/pika_rm.h"
#include "include/pika_server.h"
#include "pstd/include/pstd_defer.h"

extern PikaConf* g_pika_conf;
extern PikaServer* g_pika_server;
Expand Down Expand Up @@ -50,6 +51,11 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
PikaReplBgWorker* worker = task_arg->worker;
worker->ip_port_ = conn->ip_port();

DEFER {
delete index;
delete task_arg;
};

std::string table_name;
uint32_t partition_id = 0;
LogOffset pb_begin, pb_end;
Expand Down Expand Up @@ -97,17 +103,13 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(table_name, partition_id));
if (!partition) {
LOG(WARNING) << "Partition " << table_name << "_" << partition_id << " Not Found";
delete index;
delete task_arg;
return;
}

std::shared_ptr<SyncSlavePartition> slave_partition =
g_pika_rm->GetSyncSlavePartitionByName(PartitionInfo(table_name, partition_id));
if (!slave_partition) {
LOG(WARNING) << "Slave Partition " << table_name << "_" << partition_id << " Not Found";
delete index;
delete task_arg;
return;
}

Expand All @@ -120,8 +122,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
} else if (meta.term() < partition->ConsensusTerm()) /*outdated pb*/ {
LOG(WARNING) << "Drop outdated binlog sync response " << table_name << "_" << partition_id
<< " recv term: " << meta.term() << " local term: " << partition->ConsensusTerm();
delete index;
delete task_arg;
return;
}
if (!only_keepalive) {
Expand All @@ -133,8 +133,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
LOG(WARNING) << "last_offset " << last_offset.ToString() << " NOT equal to pb prev_offset "
<< prev_offset.ToString();
slave_partition->SetReplState(ReplState::kTryConnect);
delete index;
delete task_arg;
return;
}
}
Expand All @@ -146,8 +144,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
// BinlogSync state, we drop remain write binlog task
if ((!(g_pika_server->role() & PIKA_ROLE_SLAVE)) ||
((slave_partition->State() != ReplState::kConnected) && (slave_partition->State() != ReplState::kWaitDBSync))) {
delete index;
delete task_arg;
return;
}

Expand All @@ -159,8 +155,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
LOG(WARNING) << "Check Session failed " << binlog_res.partition().table_name() << "_"
<< binlog_res.partition().partition_id();
slave_partition->SetReplState(ReplState::kTryConnect);
delete index;
delete task_arg;
return;
}

Expand All @@ -171,8 +165,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
if (!PikaBinlogTransverter::BinlogItemWithoutContentDecode(TypeFirst, binlog_res.binlog(), &worker->binlog_item_)) {
LOG(WARNING) << "Binlog item decode failed";
slave_partition->SetReplState(ReplState::kTryConnect);
delete index;
delete task_arg;
return;
}
const char* redis_parser_start = binlog_res.binlog().data() + BINLOG_ENCODE_LEN;
Expand All @@ -183,13 +175,9 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
if (ret != net::kRedisParserDone) {
LOG(WARNING) << "Redis parser failed";
slave_partition->SetReplState(ReplState::kTryConnect);
delete index;
delete task_arg;
return;
}
}
delete index;
delete task_arg;

if (res->has_consensus_meta()) {
LogOffset leader_commit;
Expand Down Expand Up @@ -255,7 +243,7 @@ int PikaReplBgWorker::HandleWriteBinlog(net::RedisParser* parser, const net::Red
}

void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
ReplClientWriteDBTaskArg* task_arg = static_cast<ReplClientWriteDBTaskArg*>(arg);
std::unique_ptr<ReplClientWriteDBTaskArg> task_arg(static_cast<ReplClientWriteDBTaskArg*>(arg));
const std::shared_ptr<Cmd> c_ptr = task_arg->cmd_ptr;
const PikaCmdArgsType& argv = c_ptr->argv();
LogOffset offset = task_arg->offset;
Expand Down Expand Up @@ -289,7 +277,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
}
}

delete task_arg;

if (g_pika_conf->consensus_level() != 0) {
std::shared_ptr<SyncMasterPartition> partition =
Expand Down
22 changes: 4 additions & 18 deletions src/pika_repl_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ int PikaReplClientConn::DealMessage() {
}

void PikaReplClientConn::HandleMetaSyncResponse(void* arg) {
ReplClientTaskArg* task_arg = static_cast<ReplClientTaskArg*>(arg);
std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
std::shared_ptr<net::PbConn> conn = task_arg->conn;
std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;

Expand All @@ -104,7 +104,6 @@ void PikaReplClientConn::HandleMetaSyncResponse(void* arg) {
LOG(WARNING) << "Meta Sync Failed: " << reply;
g_pika_server->SyncError();
conn->NotifyClose();
delete task_arg;
return;
}

Expand All @@ -123,19 +122,17 @@ void PikaReplClientConn::HandleMetaSyncResponse(void* arg) {
<< "), failed to establish master-slave relationship";
g_pika_server->SyncError();
conn->NotifyClose();
delete task_arg;
return;
}

g_pika_conf->SetWriteBinlog("yes");
g_pika_server->PreparePartitionTrySync();
g_pika_server->FinishMetaSync();
LOG(INFO) << "Finish to handle meta sync response";
delete task_arg;
}

void PikaReplClientConn::HandleDBSyncResponse(void* arg) {
ReplClientTaskArg* task_arg = static_cast<ReplClientTaskArg*>(arg);
std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
std::shared_ptr<net::PbConn> conn = task_arg->conn;
std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;

Expand All @@ -149,15 +146,13 @@ void PikaReplClientConn::HandleDBSyncResponse(void* arg) {
g_pika_rm->GetSyncSlavePartitionByName(PartitionInfo(table_name, partition_id));
if (!slave_partition) {
LOG(WARNING) << "Slave Partition: " << table_name << ":" << partition_id << " Not Found";
delete task_arg;
return;
}

if (response->code() != InnerMessage::kOk) {
slave_partition->SetReplState(ReplState::kError);
std::string reply = response->has_reply() ? response->reply() : "";
LOG(WARNING) << "DBSync Failed: " << reply;
delete task_arg;
return;
}

Expand All @@ -166,18 +161,16 @@ void PikaReplClientConn::HandleDBSyncResponse(void* arg) {
std::string partition_name = slave_partition->PartitionName();
slave_partition->SetReplState(ReplState::kWaitDBSync);
LOG(INFO) << "Partition: " << partition_name << " Need Wait To Sync";
delete task_arg;
}

void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
ReplClientTaskArg* task_arg = static_cast<ReplClientTaskArg*>(arg);
std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
std::shared_ptr<net::PbConn> conn = task_arg->conn;
std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;

if (response->code() != InnerMessage::kOk) {
std::string reply = response->has_reply() ? response->reply() : "";
LOG(WARNING) << "TrySync Failed: " << reply;
delete task_arg;
return;
}

Expand All @@ -189,15 +182,13 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(table_name, partition_id));
if (!partition) {
LOG(WARNING) << "Partition: " << table_name << ":" << partition_id << " Not Found";
delete task_arg;
return;
}

std::shared_ptr<SyncSlavePartition> slave_partition =
g_pika_rm->GetSyncSlavePartitionByName(PartitionInfo(table_name, partition_id));
if (!slave_partition) {
LOG(WARNING) << "Slave Partition: " << table_name << ":" << partition_id << " Not Found";
delete task_arg;
return;
}

Expand All @@ -211,7 +202,6 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
} else if (meta.term() < partition->ConsensusTerm()) /*outdated pb*/ {
LOG(WARNING) << "Drop outdated trysync response " << table_name << ":" << partition_id
<< " recv term: " << meta.term() << " local term: " << partition->ConsensusTerm();
delete task_arg;
return;
}

Expand All @@ -221,7 +211,6 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
slave_partition->SetReplState(ReplState::kError);
LOG(WARNING) << "Consensus Check failed " << s.ToString();
}
delete task_arg;
return;
}

Expand Down Expand Up @@ -251,7 +240,6 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
slave_partition->SetReplState(ReplState::kError);
LOG(WARNING) << "Partition: " << partition_name << " TrySync Error";
}
delete task_arg;
}

Status PikaReplClientConn::TrySyncConsensusCheck(const InnerMessage::ConsensusMeta& consensus_meta,
Expand Down Expand Up @@ -308,14 +296,12 @@ void PikaReplClientConn::DispatchBinlogRes(const std::shared_ptr<InnerMessage::I
}

void PikaReplClientConn::HandleRemoveSlaveNodeResponse(void* arg) {
ReplClientTaskArg* task_arg = static_cast<ReplClientTaskArg*>(arg);
std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
std::shared_ptr<net::PbConn> conn = task_arg->conn;
std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;
if (response->code() != InnerMessage::kOk) {
std::string reply = response->has_reply() ? response->reply() : "";
LOG(WARNING) << "Remove slave node Failed: " << reply;
delete task_arg;
return;
}
delete task_arg;
}
Loading