diff --git a/src/server/async/rpc_server.cc b/src/server/async/rpc_server.cc index 9b4af00d2..0ea981905 100644 --- a/src/server/async/rpc_server.cc +++ b/src/server/async/rpc_server.cc @@ -262,7 +262,7 @@ void RPCServer::doVineyardRequestMemory(VineyardRecvContext* recv_context, << " fid_mr:" << remote_request_mem_info.mr; void* msg = nullptr; - rdma_server_->GetTXFreeMsgBuffer(msg); + VINEYARD_DISCARD(rdma_server_->GetTXFreeMsgBuffer(msg)); VineyardMsg* send_msg = reinterpret_cast(msg); send_msg->type = VINEYARD_MSG_REQUEST_MEM; send_msg->remoteMemInfo.remote_address = @@ -280,8 +280,8 @@ void RPCServer::doVineyardRequestMemory(VineyardRecvContext* recv_context, remote_mem_infos_[recv_context->rdma_conn_id].insert(std::make_pair( remote_request_mem_info.mr_desc, remote_request_mem_info)); } - rdma_server_->Send(recv_context->rdma_conn_id, msg, sizeof(VineyardMsg), - send_context); + VINEYARD_CHECK_OK(rdma_server_->Send(recv_context->rdma_conn_id, msg, + sizeof(VineyardMsg), send_context)); VLOG(100) << "Send key:" << remote_request_mem_info.rkey << " send address:" << reinterpret_cast(remote_request_mem_info.address) << " size: " << remote_request_mem_info.size; @@ -332,7 +332,9 @@ void RPCServer::doVineyardClose(VineyardRecvContext* recv_context) { if (recv_context == nullptr) { return; } - rdma_server_->CloseConnection(recv_context->rdma_conn_id); + if (!rdma_server_->CloseConnection(recv_context->rdma_conn_id).ok()) { + LOG(ERROR) << "Close connection failed!"; + } std::lock_guard scope_lock(this->rdma_mutex_); { @@ -353,14 +355,14 @@ void RPCServer::doPrepareRecv(uint64_t rdma_conn_id) { recv_context->rdma_conn_id = rdma_conn_id; void* context = reinterpret_cast(recv_context); void* msg = nullptr; - rdma_server_->GetRXFreeMsgBuffer(msg); + VINEYARD_DISCARD(rdma_server_->GetRXFreeMsgBuffer(msg)); recv_context->attr.msg_buffer = msg; rdma_server_->Recv(rdma_conn_id, msg, sizeof(VineyardMsg), context); } void RPCServer::doNothing(VineyardRecvContext* recv_context) { void* msg = nullptr; - rdma_server_->GetTXFreeMsgBuffer(msg); + VINEYARD_DISCARD(rdma_server_->GetTXFreeMsgBuffer(msg)); VineyardMsg* send_msg = reinterpret_cast(msg); send_msg->type = VINEYARD_MSG_REQUEST_MEM; @@ -372,8 +374,8 @@ void RPCServer::doNothing(VineyardRecvContext* recv_context) { VineyardSendContext* send_context = new VineyardSendContext(); memset(send_context, 0, sizeof(VineyardSendContext)); send_context->attr.msg_buffer = msg; - rdma_server_->Send(recv_context->rdma_conn_id, msg, sizeof(VineyardMsg), - send_context); + VINEYARD_DISCARD(rdma_server_->Send(recv_context->rdma_conn_id, msg, + sizeof(VineyardMsg), send_context)); } void RPCServer::doRDMARecv() { @@ -430,9 +432,9 @@ void RPCServer::doRDMARecv() { delete recv_msg_tmp; delete recv_context_tmp; }); - rdma_server_->Recv( + VINEYARD_CHECK_OK(rdma_server_->Recv( recv_context->rdma_conn_id, reinterpret_cast(recv_msg), - sizeof(VineyardMsg), reinterpret_cast(recv_context)); + sizeof(VineyardMsg), reinterpret_cast(recv_context))); } else if (recv_msg->type == VINEYARD_MSG_RELEASE_MEM) { boost::asio::post( vs_ptr_->GetIOContext(), [this, recv_context_tmp, recv_msg_tmp] { @@ -440,9 +442,9 @@ void RPCServer::doRDMARecv() { delete recv_msg_tmp; delete recv_context_tmp; }); - rdma_server_->Recv( + VINEYARD_CHECK_OK(rdma_server_->Recv( recv_context->rdma_conn_id, reinterpret_cast(recv_msg), - sizeof(VineyardMsg), reinterpret_cast(recv_context)); + sizeof(VineyardMsg), reinterpret_cast(recv_context))); } else if (recv_msg->type == VINEYARD_MSG_EMPTY) { boost::asio::post(vs_ptr_->GetIOContext(), [this, recv_context_tmp, recv_msg_tmp] { @@ -450,14 +452,14 @@ void RPCServer::doRDMARecv() { delete recv_msg_tmp; delete recv_context_tmp; }); - rdma_server_->Recv( + VINEYARD_CHECK_OK(rdma_server_->Recv( recv_context->rdma_conn_id, reinterpret_cast(recv_msg), - sizeof(VineyardMsg), reinterpret_cast(recv_context)); + sizeof(VineyardMsg), reinterpret_cast(recv_context))); } else { LOG(ERROR) << "Unknown message type: " << recv_msg->type; - rdma_server_->Recv( + VINEYARD_CHECK_OK(rdma_server_->Recv( recv_context->rdma_conn_id, reinterpret_cast(recv_msg), - sizeof(VineyardMsg), reinterpret_cast(recv_context)); + sizeof(VineyardMsg), reinterpret_cast(recv_context))); } } } diff --git a/src/server/server/vineyard_server.cc b/src/server/server/vineyard_server.cc index c5ce251dc..982f8d538 100644 --- a/src/server/server/vineyard_server.cc +++ b/src/server/server/vineyard_server.cc @@ -1473,7 +1473,7 @@ void VineyardServer::Stop() { this->bulk_store_.reset(); this->plasma_bulk_store_.reset(); - RDMAClientCreator::Clear(); + VINEYARD_DISCARD(RDMAClientCreator::Clear()); } bool VineyardServer::Running() const { return !stopped_.load(); } diff --git a/src/server/server/vineyard_server.h b/src/server/server/vineyard_server.h index 7381786a4..085310aad 100644 --- a/src/server/server/vineyard_server.h +++ b/src/server/server/vineyard_server.h @@ -316,8 +316,9 @@ class VineyardServer : public std::enable_shared_from_this { } pendding_to_delete_objects_.clear(); } - this->DelData(ids, false, false, false, false, - [](const Status& status) { return Status::OK(); }); + VINEYARD_DISCARD( + this->DelData(ids, false, false, false, false, + [](const Status& status) { return Status::OK(); })); } void AddPendingObjects(std::vector const& ids) { diff --git a/src/server/util/remote.cc b/src/server/util/remote.cc index ee69f0059..afac7db93 100644 --- a/src/server/util/remote.cc +++ b/src/server/util/remote.cc @@ -108,7 +108,7 @@ Status RemoteClient::Connect(const std::string& rpc_endpoint, Status RemoteClient::RDMARequestMemInfo(RegisterMemInfo& remote_info) { void* buffer; - this->rdma_client_->GetTXFreeMsgBuffer(buffer); + VINEYARD_DISCARD(this->rdma_client_->GetTXFreeMsgBuffer(buffer)); VineyardMsg* msg = reinterpret_cast(buffer); msg->type = VINEYARD_MSG_REQUEST_MEM; msg->remoteMemInfo.remote_address = (uint64_t) remote_info.address; @@ -116,7 +116,7 @@ Status RemoteClient::RDMARequestMemInfo(RegisterMemInfo& remote_info) { VLOG(100) << "Request remote addr: " << reinterpret_cast(msg->remoteMemInfo.remote_address); void* remoteMsg; - this->rdma_client_->GetRXFreeMsgBuffer(remoteMsg); + VINEYARD_DISCARD(this->rdma_client_->GetRXFreeMsgBuffer(remoteMsg)); memset(remoteMsg, 0, 64); VINEYARD_CHECK_OK( this->rdma_client_->Recv(remoteMsg, sizeof(VineyardMsg), nullptr)); @@ -142,7 +142,7 @@ Status RemoteClient::RDMARequestMemInfo(RegisterMemInfo& remote_info) { Status RemoteClient::RDMAReleaseMemInfo(RegisterMemInfo& remote_info) { void* buffer; - this->rdma_client_->GetTXFreeMsgBuffer(buffer); + VINEYARD_DISCARD(this->rdma_client_->GetTXFreeMsgBuffer(buffer)); VineyardMsg* msg = reinterpret_cast(buffer); msg->type = VINEYARD_MSG_RELEASE_MEM; msg->remoteMemInfo.remote_address = (uint64_t) remote_info.address; @@ -151,7 +151,8 @@ Status RemoteClient::RDMAReleaseMemInfo(RegisterMemInfo& remote_info) { << reinterpret_cast(msg->remoteMemInfo.remote_address) << ", rkey: " << msg->remoteMemInfo.key; - this->rdma_client_->Send(buffer, sizeof(VineyardMsg), nullptr); + RETURN_ON_ERROR( + this->rdma_client_->Send(buffer, sizeof(VineyardMsg), nullptr)); VINEYARD_CHECK_OK(rdma_client_->GetTXCompletion(-1, nullptr)); return Status::OK();