Skip to content

Commit

Permalink
use excute func (#138)
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxiaoshuai123 authored Jan 22, 2024
1 parent b2a6823 commit b2856a7
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 41 deletions.
15 changes: 11 additions & 4 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -625,10 +625,17 @@ void PClient::TransferToSlaveThreads() {
auto slave_loop = tcp_connection->SelectSlaveEventLoop();
auto id = tcp_connection->GetUniqueId();
auto event_object = loop->GetEventObject(id);
loop->Unregister(event_object);
event_object->SetUniqueId(-1);
slave_loop->Register(event_object, 0);
tcp_connection->ResetEventLoop(slave_loop);
auto del_conn = [loop, slave_loop, event_object]() {
loop->Unregister(event_object);
event_object->SetUniqueId(-1);
auto tcp_connection = std::dynamic_pointer_cast<TcpConnection>(event_object);
assert(tcp_connection);
tcp_connection->ResetEventLoop(slave_loop);

auto add_conn = [slave_loop, event_object]() { slave_loop->Register(event_object, 0); };
slave_loop->Execute(std::move(add_conn));
};
loop->Execute(std::move(del_conn));
}
}

Expand Down
53 changes: 17 additions & 36 deletions src/net/event_loop.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,11 @@ void EventLoop::Run() {
}
}

{
std::unique_lock<std::mutex> guard(object_mutex_);
for (auto& pair : objects_) {
reactor_->Unregister(pair.second.get());
}

objects_.clear();
for (auto& pair : objects_) {
reactor_->Unregister(pair.second.get());
}

objects_.clear();
reactor_.reset();
}

Expand Down Expand Up @@ -106,7 +102,7 @@ EventLoop* EventLoop::Self() { return g_this_loop; }
bool EventLoop::Register(std::shared_ptr<EventObject> obj, int events) {
if (!obj) return false;

// assert(InThisLoop());
assert(InThisLoop());
assert(obj->GetUniqueId() == -1);

if (!reactor_) {
Expand All @@ -115,7 +111,6 @@ bool EventLoop::Register(std::shared_ptr<EventObject> obj, int events) {

{
// alloc unique id
std::unique_lock<std::mutex> guard(object_mutex_);
int id = -1;
do {
id = obj_id_generator_.fetch_add(1) + 1;
Expand All @@ -139,14 +134,9 @@ bool EventLoop::Modify(std::shared_ptr<EventObject> obj, int events) {

assert(InThisLoop());
assert(obj->GetUniqueId() >= 0);

{
#ifdef DEBUG
std::unique_lock<std::mutex> guard(object_mutex_);
assert(objects_.count(obj->GetUniqueId()) == 1);
assert(objects_.count(obj->GetUniqueId()) == 1);
#endif
}

if (!reactor_) {
return false;
}
Expand All @@ -157,18 +147,15 @@ void EventLoop::Unregister(std::shared_ptr<EventObject> obj) {
if (!obj) return;

int id = obj->GetUniqueId();
// assert(InThisLoop());
assert(InThisLoop());
assert(id >= 0);
{
std::unique_lock<std::mutex> guard(object_mutex_);
assert(objects_.count(id) == 1);
assert(objects_.count(id) == 1);

if (!reactor_) {
return;
}
reactor_->Unregister(obj.get());
objects_.erase(id);
if (!reactor_) {
return;
}
reactor_->Unregister(obj.get());
objects_.erase(id);
}

bool EventLoop::Listen(const char* ip, int port, NewTcpConnectionCallback ccb, EventLoopSelector selector) {
Expand Down Expand Up @@ -218,13 +205,10 @@ std::shared_ptr<HttpClient> EventLoop::ConnectHTTP(const char* ip, int port) {
}

void EventLoop::Reset() {
{
std::unique_lock<std::mutex> guard(object_mutex_);
for (auto& kv : objects_) {
Unregister(kv.second);
}
objects_.clear();
for (auto& kv : objects_) {
Unregister(kv.second);
}
objects_.clear();

{
std::unique_lock<std::mutex> guard(task_mutex_);
Expand All @@ -236,12 +220,9 @@ void EventLoop::Reset() {
}

std::shared_ptr<EventObject> EventLoop::GetEventObject(int id) const {
{
std::unique_lock<std::mutex> guard(object_mutex_);
auto it = objects_.find(id);
if (it != objects_.end()) {
return it->second;
}
auto it = objects_.find(id);
if (it != objects_.end()) {
return it->second;
}

return nullptr;
Expand Down
1 change: 0 additions & 1 deletion src/net/event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ class EventLoop {
private:
std::unique_ptr<Reactor> reactor_;

mutable std::mutex object_mutex_;
std::unordered_map<int, std::shared_ptr<EventObject>> objects_;

std::shared_ptr<internal::PipeObject> notifier_;
Expand Down

0 comments on commit b2856a7

Please sign in to comment.