Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add client command #361

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/base_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ const std::string kCmdNameUnwatch = "unwatch";
const std::string kCmdNameDiscard = "discard";

// admin
const std::string kCmdNameClient = "client";
const std::string kSubCmdNameClientGetname = "getname";
const std::string kSubCmdNameClientSetname = "setname";
const std::string kSubCmdNameClientId = "id";
const std::string kSubCmdNameClientList = "list";
const std::string kSubCmdNameClientKill = "kill";

const std::string kCmdNameConfig = "config";
const std::string kSubCmdNameConfigGet = "get";
const std::string kSubCmdNameConfigSet = "set";
Expand Down
14 changes: 9 additions & 5 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

namespace pikiwidb {

const ClientInfo ClientInfo::invalidClientInfo = {0, "", -1};

void CmdRes::RedisAppendLen(std::string& str, int64_t ori, const std::string& prefix) {
str.append(prefix);
str.append(pstd::Int2string(ori));
Expand Down Expand Up @@ -451,15 +453,15 @@ void PClient::OnConnect() {

std::string PClient::PeerIP() const {
if (!addr_.IsValid()) {
ERROR("Invalid address detected for client {}", uniqueID());
ERROR("Invalid address detected for client {}", GetUniqueID());
return "";
}
return addr_.GetIP();
}

int PClient::PeerPort() const {
if (!addr_.IsValid()) {
ERROR("Invalid address detected for client {}", uniqueID());
ERROR("Invalid address detected for client {}", GetUniqueID());
return 0;
}
return addr_.GetPort();
Expand Down Expand Up @@ -506,7 +508,9 @@ bool PClient::isClusterCmdTarget() const {
return PRAFT.GetClusterCmdCtx().GetPeerIp() == PeerIP() && PRAFT.GetClusterCmdCtx().GetPort() == PeerPort();
}

uint64_t PClient::uniqueID() const { return GetConnId(); }
uint64_t PClient::GetUniqueID() const { return GetConnId(); }

ClientInfo PClient::GetClientInfo() const { return {GetUniqueID(), PeerIP().c_str(), PeerPort()}; }

bool PClient::Watch(int dbno, const std::string& key) {
DEBUG("Client {} watch {}, db {}", name_, key, dbno);
Expand All @@ -515,12 +519,12 @@ bool PClient::Watch(int dbno, const std::string& key) {

bool PClient::NotifyDirty(int dbno, const std::string& key) {
if (IsFlagOn(kClientFlagDirty)) {
INFO("client is already dirty {}", uniqueID());
INFO("client is already dirty {}", GetUniqueID());
return true;
}

if (watch_keys_[dbno].contains(key)) {
INFO("{} client become dirty because key {} in db {}", uniqueID(), key, dbno);
INFO("{} client become dirty because key {} in db {}", GetUniqueID(), key, dbno);
SetFlag(kClientFlagDirty);
return true;
} else {
Expand Down
13 changes: 11 additions & 2 deletions src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include "storage/storage.h"

namespace pikiwidb {

class CmdRes {
public:
enum CmdRet {
Expand Down Expand Up @@ -118,6 +117,14 @@ enum class ClientState {
class DB;
struct PSlaveInfo;

struct ClientInfo {
uint64_t client_id;
std::string ip;
int port;
static const ClientInfo invalidClientInfo;
bool operator==(ClientInfo& ci) const { return client_id == ci.client_id; }
};
Copy link

@coderabbitai coderabbitai bot Aug 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议使用 const 引用作为参数。

operator== 的参数应该是 const ClientInfo&,以提高一致性和效率。

-  bool operator==(ClientInfo& ci) const { return client_id == ci.client_id; }
+  bool operator==(const ClientInfo& ci) const { return client_id == ci.client_id; }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
struct ClientInfo {
uint64_t client_id;
std::string ip;
int port;
static const ClientInfo invalidClientInfo;
bool operator==(ClientInfo& ci) const { return client_id == ci.client_id; }
};
struct ClientInfo {
uint64_t client_id;
std::string ip;
int port;
static const ClientInfo invalidClientInfo;
bool operator==(const ClientInfo& ci) const { return client_id == ci.client_id; }
};

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, I'll modify

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gukj-spel, great! Let me know if you need any further assistance.


class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {
public:
// PClient() = delete;
Expand All @@ -129,6 +136,8 @@ class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {

std::string PeerIP() const;
int PeerPort() const;
const int GetFd() const;
ClientInfo GetClientInfo() const;

// bool SendPacket(const std::string& buf);
// bool SendPacket(const void* data, size_t size);
Expand Down Expand Up @@ -217,6 +226,7 @@ class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {

void SetAuth() { auth_ = true; }
bool GetAuth() const { return auth_; }
uint64_t GetUniqueID() const;
void RewriteCmd(std::vector<std::string>& params) { parser_.SetParams(params); }
void Reexecutecommand() { this->executeCommand(); }

Expand Down Expand Up @@ -244,7 +254,6 @@ class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {
int processInlineCmd(const char*, size_t, std::vector<std::string>&);
void reset();
bool isPeerMaster() const;
uint64_t uniqueID() const;

bool isClusterCmdTarget() const;

Expand Down
103 changes: 103 additions & 0 deletions src/client_map.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#include "client_map.h"
#include "log.h"

namespace pikiwidb {

uint32_t ClientMap::GetAllClientInfos(std::vector<ClientInfo>& results) {
// client info string type: ip, port, fd.
std::shared_lock<std::shared_mutex> client_map_lock(client_map_mutex_);
for (auto& [id, client_weak] : clients_) {
if (auto client = client_weak.lock()) {
results.emplace_back(client->GetClientInfo());
}
}
return results.size();
}

bool ClientMap::AddClient(int id, std::weak_ptr<PClient> client) {
std::unique_lock client_map_lock(client_map_mutex_);
if (clients_.find(id) == clients_.end()) {
clients_.insert({id, client});
return true;
}
return false;
Comment on lines +17 to +23
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议:改进添加客户端的方法。

建议在插入客户端之前检查 client 是否有效。

bool ClientMap::AddClient(int id, std::weak_ptr<PClient> client) {
    if (client.expired()) {
        return false;
    }
    std::unique_lock client_map_lock(client_map_mutex_);
    if (clients_.find(id) == clients_.end()) {
        clients_.insert({id, client});
        return true;
    }
    return false;
}

}

ClientInfo ClientMap::GetClientsInfoById(int id) {
std::shared_lock client_map_lock(client_map_mutex_);
if (auto it = clients_.find(id); it != clients_.end()) {
if (auto client = it->second.lock(); client) {
return client->GetClientInfo();
}
}
ERROR("Client with ID {} not found in GetClientsInfoById", id);
return ClientInfo::invalidClientInfo;
}

bool ClientMap::RemoveClientById(int id) {
std::unique_lock client_map_lock(client_map_mutex_);
if (auto it = clients_.find(id); it != clients_.end()) {
clients_.erase(it);
INFO("Removed client with ID {}", id);
return true;
}
return false;
}

bool ClientMap::KillAllClients() {
std::vector<std::shared_ptr<PClient>> clients_to_close;
{
std::shared_lock<std::shared_mutex> client_map_lock(client_map_mutex_);
for (auto& [id, client_weak] : clients_) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有个疑问,这里close掉client不需要从这个map里去除吗?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我之前是在 tcp_client 的disconnect 回调函数里面从map去掉对应的client。感觉改成在这里直接删除确实更好点,我来改一下

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

刚才仔细想了一下,这里之所以在回调函数里面,是因为存在客户端主动关闭连接的情况。这时候就需要在回调函数里面去删除map里面的记录。

if (auto client = client_weak.lock()) {
clients_to_close.push_back(client);
}
}
}
for (auto& client : clients_to_close) {
client->Close();
}
return true;
}

bool ClientMap::KillClientByAddrPort(const std::string& addr_port) {
std::shared_ptr<PClient> client_to_close;
{
std::shared_lock<std::shared_mutex> client_map_lock(client_map_mutex_);
for (auto& [id, client_weak] : clients_) {
if (auto client = client_weak.lock()) {
std::string client_ip_port = client->PeerIP() + ":" + std::to_string(client->PeerPort());
if (client_ip_port == addr_port) {
client_to_close = client;
break;
}
}
}
}
if (client_to_close) {
client_to_close->Close();
return true;
}
return false;
}

bool ClientMap::KillClientById(int client_id) {
std::shared_ptr<PClient> client_to_close;
{
std::shared_lock<std::shared_mutex> client_map_lock(client_map_mutex_);
if (auto it = clients_.find(client_id); it != clients_.end()) {
if (auto client = it->second.lock()) {
client_to_close = client;
}
}
}
if (client_to_close) {
INFO("Closing client with ID {}", client_id);
client_to_close->Close();
INFO("Client with ID {} closed", client_id);
return true;
}
return false;
}

} // namespace pikiwidb
41 changes: 41 additions & 0 deletions src/client_map.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include <map>
#include <memory>
#include <shared_mutex>
#include <string>
#include "client.h"

namespace pikiwidb {
class ClientMap {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个 ClientMap 使用了单例模式, 如果把 ClientMap 放到 g_pikiwidb 里会不会更好点. 我的这个意见不一定合适, 这个可以讨论一下

private:
ClientMap() = default;
// 禁用复制构造函数和赋值运算符

private:
std::map<int, std::weak_ptr<PClient>> clients_;
std::shared_mutex client_map_mutex_;

public:
static ClientMap& getInstance() {
static ClientMap instance;
return instance;
}

ClientMap(const ClientMap&) = delete;
ClientMap& operator=(const ClientMap&) = delete;

// client info function
pikiwidb::ClientInfo GetClientsInfoById(int id);
uint32_t GetAllClientInfos(std::vector<ClientInfo>& results);

bool AddClient(int id, std::weak_ptr<PClient>);

bool RemoveClientById(int id);

bool KillAllClients();
bool KillClientById(int client_id);
bool KillClientByAddrPort(const std::string& addr_port);
};

} // namespace pikiwidb
Loading
Loading