Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add client command #361

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/base_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ const std::string kCmdNameUnwatch = "unwatch";
const std::string kCmdNameDiscard = "discard";

// admin
const std::string kCmdNameClient = "client";
const std::string kSubCmdNameClientGetname = "getname";
const std::string kSubCmdNameClientSetname = "setname";
const std::string kSubCmdNameClientId = "id";
const std::string kSubCmdNameClientList = "list";
const std::string kSubCmdNameClientKill = "kill";

const std::string kCmdNameConfig = "config";
const std::string kSubCmdNameConfigGet = "get";
const std::string kSubCmdNameConfigSet = "set";
Expand Down
22 changes: 19 additions & 3 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

namespace pikiwidb {

const ClientInfo ClientInfo::invalidClientInfo = {-1, "", -1, -1};

void CmdRes::RedisAppendLen(std::string& str, int64_t ori, const std::string& prefix) {
str.append(prefix);
str.append(pstd::Int2string(ori));
Expand Down Expand Up @@ -487,6 +489,13 @@ int PClient::PeerPort() const {
return -1;
}

const int PClient::GetFd() const {
if (auto c = getTcpConnection(); c) {
return c->Fd();
}
return -1;
}

bool PClient::SendPacket(const std::string& buf) {
if (auto c = getTcpConnection(); c) {
return c->SendPacket(buf);
Expand Down Expand Up @@ -549,27 +558,34 @@ bool PClient::isClusterCmdTarget() const {
return PRAFT.GetClusterCmdCtx().GetPeerIp() == PeerIP() && PRAFT.GetClusterCmdCtx().GetPort() == PeerPort();
}

int PClient::uniqueID() const {
int PClient::GetUniqueId() const {
if (auto c = getTcpConnection(); c) {
return c->GetUniqueId();
}

return -1;
}

ClientInfo PClient::GetClientInfo() const {
if (auto c = getTcpConnection(); c) {
return {GetUniqueId(), PeerIP().c_str(), PeerPort(), GetFd()};
}
return ClientInfo::invalidClientInfo;
}

bool PClient::Watch(int dbno, const std::string& key) {
DEBUG("Client {} watch {}, db {}", name_, key, dbno);
return watch_keys_[dbno].insert(key).second;
}

bool PClient::NotifyDirty(int dbno, const std::string& key) {
if (IsFlagOn(kClientFlagDirty)) {
INFO("client is already dirty {}", uniqueID());
INFO("client is already dirty {}", GetUniqueId());
return true;
}

if (watch_keys_[dbno].contains(key)) {
INFO("{} client become dirty because key {} in db {}", uniqueID(), key, dbno);
INFO("{} client become dirty because key {} in db {}", GetUniqueId(), key, dbno);
SetFlag(kClientFlagDirty);
return true;
} else {
Expand Down
13 changes: 11 additions & 2 deletions src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "storage/storage.h"

namespace pikiwidb {

class CmdRes {
public:
enum CmdRet {
Expand Down Expand Up @@ -111,6 +110,14 @@ enum class ClientState {
class DB;
struct PSlaveInfo;

struct ClientInfo {
int client_id;
std::string ip;
int port;
int fd;
static const ClientInfo invalidClientInfo;
bool operator==(ClientInfo& ci) const { return client_id == ci.client_id; }
};
class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {
public:
PClient() = delete;
Expand All @@ -122,6 +129,9 @@ class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {

const std::string& PeerIP() const;
int PeerPort() const;
const int GetFd() const;
int GetUniqueId() const;
ClientInfo GetClientInfo() const;

bool SendPacket(const std::string& buf);
bool SendPacket(const void* data, size_t size);
Expand Down Expand Up @@ -223,7 +233,6 @@ class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {
int processInlineCmd(const char*, size_t, std::vector<std::string>&);
void reset();
bool isPeerMaster() const;
int uniqueID() const;

bool isClusterCmdTarget() const;

Expand Down
103 changes: 103 additions & 0 deletions src/client_map.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#include "client_map.h"
#include "log.h"

namespace pikiwidb {

uint32_t ClientMap::GetAllClientInfos(std::vector<ClientInfo>& results) {
// client info string type: ip, port, fd.
std::shared_lock<std::shared_mutex> client_map_lock(client_map_mutex_);
for (auto& [id, client_weak] : clients_) {
if (auto client = client_weak.lock()) {
results.emplace_back(client->GetClientInfo());
}
}
return results.size();
}

bool ClientMap::AddClient(int id, std::weak_ptr<PClient> client) {
std::unique_lock client_map_lock(client_map_mutex_);
if (clients_.find(id) == clients_.end()) {
clients_.insert({id, client});
return true;
}
return false;
Comment on lines +17 to +23
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议:改进添加客户端的方法。

建议在插入客户端之前检查 client 是否有效。

bool ClientMap::AddClient(int id, std::weak_ptr<PClient> client) {
    if (client.expired()) {
        return false;
    }
    std::unique_lock client_map_lock(client_map_mutex_);
    if (clients_.find(id) == clients_.end()) {
        clients_.insert({id, client});
        return true;
    }
    return false;
}

}

ClientInfo ClientMap::GetClientsInfoById(int id) {
std::shared_lock client_map_lock(client_map_mutex_);
if (auto it = clients_.find(id); it != clients_.end()) {
if (auto client = it->second.lock(); client) {
return client->GetClientInfo();
}
}
ERROR("Client with ID {} not found in GetClientsInfoById", id);
return ClientInfo::invalidClientInfo;
}

bool ClientMap::RemoveClientById(int id) {
std::unique_lock client_map_lock(client_map_mutex_);
if (auto it = clients_.find(id); it != clients_.end()) {
clients_.erase(it);
INFO("Removed client with ID {}", id);
return true;
}
return false;
}

bool ClientMap::KillAllClients() {
std::vector<std::shared_ptr<PClient>> clients_to_close;
{
std::shared_lock<std::shared_mutex> client_map_lock(client_map_mutex_);
for (auto& [id, client_weak] : clients_) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有个疑问,这里close掉client不需要从这个map里去除吗?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我之前是在 tcp_client 的disconnect 回调函数里面从map去掉对应的client。感觉改成在这里直接删除确实更好点,我来改一下

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

刚才仔细想了一下,这里之所以在回调函数里面,是因为存在客户端主动关闭连接的情况。这时候就需要在回调函数里面去删除map里面的记录。

if (auto client = client_weak.lock()) {
clients_to_close.push_back(client);
}
}
}
for (auto& client : clients_to_close) {
client->Close();
}
return true;
}

bool ClientMap::KillClientByAddrPort(const std::string& addr_port) {
std::shared_ptr<PClient> client_to_close;
{
std::shared_lock<std::shared_mutex> client_map_lock(client_map_mutex_);
for (auto& [id, client_weak] : clients_) {
if (auto client = client_weak.lock()) {
std::string client_ip_port = client->PeerIP() + ":" + std::to_string(client->PeerPort());
if (client_ip_port == addr_port) {
client_to_close = client;
break;
}
}
}
}
if (client_to_close) {
client_to_close->Close();
return true;
}
return false;
}

bool ClientMap::KillClientById(int client_id) {
std::shared_ptr<PClient> client_to_close;
{
std::shared_lock<std::shared_mutex> client_map_lock(client_map_mutex_);
if (auto it = clients_.find(client_id); it != clients_.end()) {
if (auto client = it->second.lock()) {
client_to_close = client;
}
}
}
if (client_to_close) {
INFO("Closing client with ID {}", client_id);
client_to_close->Close();
INFO("Client with ID {} closed", client_id);
return true;
}
return false;
}

} // namespace pikiwidb
41 changes: 41 additions & 0 deletions src/client_map.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include <map>
#include <memory>
#include <shared_mutex>
#include <string>
#include "client.h"

namespace pikiwidb {
class ClientMap {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个 ClientMap 使用了单例模式, 如果把 ClientMap 放到 g_pikiwidb 里会不会更好点. 我的这个意见不一定合适, 这个可以讨论一下

private:
ClientMap() = default;
// 禁用复制构造函数和赋值运算符

private:
std::map<int, std::weak_ptr<PClient>> clients_;
std::shared_mutex client_map_mutex_;

public:
static ClientMap& getInstance() {
static ClientMap instance;
return instance;
}

ClientMap(const ClientMap&) = delete;
ClientMap& operator=(const ClientMap&) = delete;

// client info function
pikiwidb::ClientInfo GetClientsInfoById(int id);
uint32_t GetAllClientInfos(std::vector<ClientInfo>& results);

bool AddClient(int id, std::weak_ptr<PClient>);

bool RemoveClientById(int id);

bool KillAllClients();
bool KillClientById(int client_id);
bool KillClientByAddrPort(const std::string& addr_port);
};

} // namespace pikiwidb
Loading
Loading