diff --git a/include/pika_cluster.h b/include/pika_cluster.h deleted file mode 100644 index 589bfdfe0..000000000 --- a/include/pika_cluster.h +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#ifndef PIKA_CLUSTER_H_ -#define PIKA_CLUSTER_H_ - -#include "include/pika_command.h" - -Status ParseSlotGroup(const std::string& slot_group, std::set* slots); - -class PkClusterInfoCmd : public Cmd { - public: - enum InfoSection { kInfoErr = 0x0, kInfoSlot, kInfoTable }; - enum InfoRange { kSingle = 0x0, kAll, kRange }; - PkClusterInfoCmd(const std::string& name, int arity, uint16_t flag) - : Cmd(name, arity, flag), info_section_(kInfoErr), info_range_(kAll) {} - virtual void Do(std::shared_ptr partition = nullptr); - virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys){}; - virtual void Merge(){}; - virtual Cmd* Clone() override { return new PkClusterInfoCmd(*this); } - - private: - InfoSection info_section_; - InfoRange info_range_; - - std::string table_name_; - std::set slots_; - - virtual void DoInitial() override; - virtual void Clear() { - info_section_ = kInfoErr; - info_range_ = kAll; - table_name_.clear(); - slots_.clear(); - } - const static std::string kSlotSection; - const static std::string kTableSection; - void ClusterInfoTableAll(std::string* info); - void ClusterInfoTable(std::string* info); - void ClusterInfoSlotRange(const std::string& table_name, const std::set slots, std::string* info); - void ClusterInfoSlotAll(std::string* info); - Status GetSlotInfo(const std::string table_name, uint32_t partition_id, std::string* info); - bool ParseInfoSlotSubCmd(); - bool ParseInfoTableSubCmd(); -}; -#endif // PIKA_CLUSTER_H_ diff --git a/include/pika_command.h b/include/pika_command.h index 9b37feb12..cd1cc2d5b 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -190,24 +190,6 @@ const std::string kCmdNamePubSub = "pubsub"; const std::string kCmdNamePSubscribe = "psubscribe"; const std::string kCmdNamePUnSubscribe = "punsubscribe"; -// Codis Slots -const std::string kCmdNameSlotsInfo = "slotsinfo"; -const std::string kCmdNameSlotsHashKey = "slotshashkey"; -const std::string kCmdNameSlotsMgrtTagSlotAsync = "slotsmgrttagslot-async"; -const std::string kCmdNameSlotsMgrtSlotAsync = "slotsmgrtslot-async"; -const std::string kCmdNameSlotsDel = "slotsdel"; -const std::string kCmdNameSlotsScan = "slotsscan"; -const std::string kCmdNameSlotsMgrtExecWrapper = "slotsmgrt-exec-wrapper"; -const std::string kCmdNameSlotsMgrtAsyncStatus = "slotsmgrt-async-status"; -const std::string kCmdNameSlotsMgrtAsyncCancel = "slotsmgrt-async-cancel"; -const std::string kCmdNameSlotsMgrtSlot = "slotsmgrtslot"; -const std::string kCmdNameSlotsMgrtTagSlot = "slotsmgrttagslot"; -const std::string kCmdNameSlotsMgrtOne = "slotsmgrtone"; -const std::string kCmdNameSlotsMgrtTagOne = "slotsmgrttagone"; - -// Cluster -const std::string kCmdNamePkClusterInfo = "pkclusterinfo"; - const std::string kClusterPrefix = "pkcluster"; typedef net::RedisCmdArgsType PikaCmdArgsType; static const int RAW_ARGS_LEN = 1024 * 1024; diff --git a/include/pika_slot.h b/include/pika_slot.h deleted file mode 100644 index cce059927..000000000 --- a/include/pika_slot.h +++ /dev/null @@ -1,188 +0,0 @@ -// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#ifndef PIKA_SLOT_H_ -#define PIKA_SLOT_H_ - -#include "include/pika_command.h" - -class SlotsInfoCmd : public Cmd { - public: - SlotsInfoCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - virtual void Do(std::shared_ptr partition = nullptr); - virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys){}; - virtual void Merge(){}; - virtual Cmd* Clone() override { return new SlotsInfoCmd(*this); } - - private: - virtual void DoInitial() override; -}; - -class SlotsHashKeyCmd : public Cmd { - public: - SlotsHashKeyCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - virtual void Do(std::shared_ptr partition = nullptr); - virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys){}; - virtual void Merge(){}; - virtual Cmd* Clone() override { return new SlotsHashKeyCmd(*this); } - - private: - virtual void DoInitial() override; -}; - -class SlotsMgrtSlotAsyncCmd : public Cmd { - public: - SlotsMgrtSlotAsyncCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - virtual void Do(std::shared_ptr partition = nullptr); - virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys){}; - virtual void Merge(){}; - virtual Cmd* Clone() override { return new SlotsMgrtSlotAsyncCmd(*this); } - - private: - virtual void DoInitial() override; -}; - -class SlotsMgrtTagSlotAsyncCmd : public Cmd { - public: - SlotsMgrtTagSlotAsyncCmd(const std::string& name, int arity, uint16_t flag) - : Cmd(name, arity, flag), dest_port_(0), slot_num_(-1) {} - virtual void Do(std::shared_ptr partition = nullptr); - virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys){}; - virtual void Merge(){}; - virtual Cmd* Clone() override { return new SlotsMgrtTagSlotAsyncCmd(*this); } - - private: - virtual void DoInitial() override; - std::string dest_ip_; - int64_t dest_port_ = 0; - int64_t slot_num_ = -1; - virtual void Clear() { - dest_ip_.clear(); - dest_port_ = 0; - slot_num_ = -1; - } -}; - -class SlotsScanCmd : public Cmd { - public: - SlotsScanCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag), pattern_("*"), count_(10) {} - virtual void Do(std::shared_ptr partition = nullptr); - virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys){}; - virtual void Merge(){}; - virtual Cmd* Clone() override { return new SlotsScanCmd(*this); } - - private: - int64_t cursor_ = 0; - uint32_t slotnum_ = 0; - std::string pattern_ = "*"; - int64_t count_ = 10; - virtual void DoInitial() override; - virtual void Clear() { - pattern_ = "*"; - count_ = 10; - } -}; - -class SlotsDelCmd : public Cmd { - public: - SlotsDelCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - virtual void Do(std::shared_ptr partition = nullptr); - virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys){}; - virtual void Merge(){}; - virtual Cmd* Clone() override { return new SlotsDelCmd(*this); } - - private: - std::vector slots_; - virtual void DoInitial() override; - virtual void Clear() { slots_.clear(); } -}; - -class SlotsMgrtExecWrapperCmd : public Cmd { - public: - SlotsMgrtExecWrapperCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - virtual void Do(std::shared_ptr partition = nullptr); - virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys){}; - virtual void Merge(){}; - virtual Cmd* Clone() override { return new SlotsMgrtExecWrapperCmd(*this); } - - private: - std::string key_; - virtual void DoInitial() override; - virtual void Clear() { key_.clear(); } -}; - -class SlotsMgrtAsyncStatusCmd : public Cmd { - public: - SlotsMgrtAsyncStatusCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - virtual void Do(std::shared_ptr partition = nullptr); - virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys){}; - virtual void Merge(){}; - virtual Cmd* Clone() override { return new SlotsMgrtAsyncStatusCmd(*this); } - - private: - virtual void DoInitial() override; -}; - -class SlotsMgrtAsyncCancelCmd : public Cmd { - public: - SlotsMgrtAsyncCancelCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - virtual void Do(std::shared_ptr partition = nullptr); - virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys){}; - virtual void Merge(){}; - virtual Cmd* Clone() override { return new SlotsMgrtAsyncCancelCmd(*this); } - - private: - virtual void DoInitial() override; -}; - -class SlotsMgrtSlotCmd : public Cmd { - public: - SlotsMgrtSlotCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - virtual void Do(std::shared_ptr partition = nullptr); - virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys){}; - virtual void Merge(){}; - virtual Cmd* Clone() override { return new SlotsMgrtSlotCmd(*this); } - - private: - virtual void DoInitial() override; -}; - -class SlotsMgrtTagSlotCmd : public Cmd { - public: - SlotsMgrtTagSlotCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - virtual void Do(std::shared_ptr partition = nullptr); - virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys){}; - virtual void Merge(){}; - virtual Cmd* Clone() override { return new SlotsMgrtTagSlotCmd(*this); } - - private: - virtual void DoInitial() override; -}; - -class SlotsMgrtOneCmd : public Cmd { - public: - SlotsMgrtOneCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - virtual void Do(std::shared_ptr partition = nullptr); - virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys){}; - virtual void Merge(){}; - virtual Cmd* Clone() override { return new SlotsMgrtOneCmd(*this); } - - private: - virtual void DoInitial() override; -}; - -class SlotsMgrtTagOneCmd : public Cmd { - public: - SlotsMgrtTagOneCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - virtual void Do(std::shared_ptr partition = nullptr); - virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys){}; - virtual void Merge(){}; - virtual Cmd* Clone() override { return new SlotsMgrtTagOneCmd(*this); } - - private: - virtual void DoInitial() override; -}; - -#endif // PIKA_SLOT_H_ diff --git a/src/pika_cluster.cc b/src/pika_cluster.cc deleted file mode 100644 index d39c97bc4..000000000 --- a/src/pika_cluster.cc +++ /dev/null @@ -1,261 +0,0 @@ -// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#include "include/pika_cluster.h" -#include "include/pika_cmd_table_manager.h" -#include "include/pika_rm.h" -#include "include/pika_server.h" -#include "include/pika_table.h" - -extern PikaReplicaManager* g_pika_rm; -extern PikaServer* g_pika_server; -extern PikaConf* g_pika_conf; - -const std::string PkClusterInfoCmd::kSlotSection = "slot"; -const std::string PkClusterInfoCmd::kTableSection = "table"; - -// pkcluster info slot table:slot -// pkcluster info table -// pkcluster info node -// pkcluster info cluster -void PkClusterInfoCmd::DoInitial() { - if (!CheckArg(argv_.size())) { - res_.SetRes(CmdRes::kWrongNum, kCmdNamePkClusterInfo); - return; - } - if (g_pika_conf->classic_mode()) { - res_.SetRes(CmdRes::kErrOther, "PkClusterInfo only support on sharding mode"); - return; - } - if (!strcasecmp(argv_[2].data(), kSlotSection.data())) { - info_section_ = kInfoSlot; - if (!ParseInfoSlotSubCmd()) { - return; - } - } else if (!strcasecmp(argv_[2].data(), kTableSection.data())) { - info_section_ = kInfoTable; - if (!ParseInfoTableSubCmd()) { - return; - } - } else { - info_section_ = kInfoErr; - } - return; -} - -void PkClusterInfoCmd::Do(std::shared_ptr partition) { - std::string info; - switch (info_section_) { - case kInfoSlot: - if (info_range_ == kAll) { - ClusterInfoSlotAll(&info); - } else if (info_range_ == kRange) { - // doesn't process error, if error return nothing - ClusterInfoSlotRange(table_name_, slots_, &info); - } - break; - case kInfoTable: - if (info_range_ == kAll) { - ClusterInfoTableAll(&info); - } else if (info_range_ == kSingle) { - ClusterInfoTable(&info); - } - default: - break; - } - res_.AppendStringLen(info.size()); - res_.AppendContent(info); - return; -} - -void PkClusterInfoCmd::ClusterInfoTableAll(std::string* info) { - std::stringstream tmp_stream; - std::vector table_structs = g_pika_conf->table_structs(); - std::unordered_map table_stat = g_pika_server->ServerAllTableStat(); - for (const auto& table_struct : table_structs) { - std::string table_id = table_struct.table_name.substr(2); - tmp_stream << "table_id: " << table_id << "\r\n"; - tmp_stream << " partition_num: " << table_struct.partition_num << "\r\n"; - QpsStatistic qps = table_stat[table_struct.table_name]; - tmp_stream << " total_commands_processed:" << qps.querynum.load() << "\r\n"; - tmp_stream << " total_write_commands_processed:" << qps.write_querynum.load() << "\r\n"; - tmp_stream << " qps: " << qps.last_sec_querynum << "\r\n"; - tmp_stream << " write_qps: " << qps.last_sec_write_querynum << "\r\n"; - tmp_stream << " read_qps: " << qps.last_sec_querynum - qps.last_sec_write_querynum << "\r\n"; - } - info->append(tmp_stream.str()); -} - -void PkClusterInfoCmd::ClusterInfoTable(std::string* info) { - std::stringstream tmp_stream; - std::vector table_structs = g_pika_conf->table_structs(); - for (const auto& table_struct : table_structs) { - if (table_struct.table_name == table_name_) { - std::string table_id = table_struct.table_name.substr(2); - tmp_stream << "table_id: " << table_id << "\r\n"; - tmp_stream << " partition_num: " << table_struct.partition_num << "\r\n"; - QpsStatistic qps = g_pika_server->ServerTableStat(table_name_); - tmp_stream << " qps: " << qps.last_sec_querynum << "\r\n"; - tmp_stream << " write_qps: " << qps.last_sec_write_querynum << "\r\n"; - tmp_stream << " read_qps: " << qps.last_sec_querynum - qps.last_sec_write_querynum << "\r\n"; - break; - } - } - info->append(tmp_stream.str()); -} - -bool PkClusterInfoCmd::ParseInfoSlotSubCmd() { - if (argv_.size() > 3) { - if (argv_.size() == 4) { - info_range_ = kRange; - std::string tmp(argv_[3]); - size_t pos = tmp.find(':'); - std::string slot_num_str; - if (pos == std::string::npos) { - table_name_ = g_pika_conf->default_table(); - slot_num_str = tmp; - } else { - table_name_ = tmp.substr(0, pos); - slot_num_str = tmp.substr(pos + 1); - } - if (!ParseSlotGroup(slot_num_str, &slots_).ok()) { - res_.SetRes(CmdRes::kInvalidParameter, kCmdNamePkClusterInfo); - return false; - } - } else { - res_.SetRes(CmdRes::kWrongNum, kCmdNamePkClusterInfo); - return false; - } - } - return true; -} - -bool PkClusterInfoCmd::ParseInfoTableSubCmd() { - if (argv_.size() == 3) { - info_range_ = kAll; - } else if (argv_.size() == 4) { - std::string tmp(argv_[3]); - int64_t table_id; - if (!pstd::string2int(tmp.c_str(), tmp.size(), &table_id)) { - res_.SetRes(CmdRes::kInvalidParameter, kCmdNamePkClusterInfo); - return false; - } - table_name_ = "db" + tmp; - info_range_ = kSingle; - } else if (argv_.size() > 4) { - res_.SetRes(CmdRes::kWrongNum, kCmdNamePkClusterInfo); - return false; - } - return true; -} - -void PkClusterInfoCmd::ClusterInfoSlotRange(const std::string& table_name, const std::set slots, - std::string* info) { - std::stringstream tmp_stream; - for (const auto& partition_id : slots) { - std::string p_info; - Status s = GetSlotInfo(table_name, partition_id, &p_info); - if (!s.ok()) { - continue; - } - tmp_stream << p_info; - } - info->append(tmp_stream.str()); -} - -void PkClusterInfoCmd::ClusterInfoSlotAll(std::string* info) { - std::stringstream tmp_stream; - for (const auto& table_item : g_pika_server->tables_) { - std::shared_lock partition_rwl(table_item.second->partitions_rw_); - for (const auto& partition_item : table_item.second->partitions_) { - std::string table_name = table_item.second->GetTableName(); - uint32_t partition_id = partition_item.second->GetPartitionId(); - std::string p_info; - Status s = GetSlotInfo(table_name, partition_id, &p_info); - if (!s.ok()) { - continue; - } - tmp_stream << p_info; - } - } - info->append(tmp_stream.str()); -} - -Status PkClusterInfoCmd::GetSlotInfo(const std::string table_name, uint32_t partition_id, std::string* info) { - std::shared_ptr partition = - g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(table_name, partition_id)); - if (!partition) { - return Status::NotFound("not found"); - } - Status s; - std::stringstream tmp_stream; - - // binlog offset section - uint32_t filenum = 0; - uint64_t offset = 0; - partition->Logger()->GetProducerStatus(&filenum, &offset); - tmp_stream << partition->PartitionName() << " binlog_offset=" << filenum << " " << offset; - - // safety purge section - std::string safety_purge; - std::shared_ptr master_partition = - g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(table_name, partition_id)); - if (!master_partition) { - LOG(WARNING) << "Sync Master Partition: " << table_name << ":" << partition_id << ", NotFound"; - s = Status::NotFound("SyncMasterPartition NotFound"); - } else { - master_partition->GetSafetyPurgeBinlog(&safety_purge); - } - tmp_stream << ",safety_purge=" << (s.ok() ? safety_purge : "error") << "\r\n"; - - if (g_pika_conf->consensus_level()) { - LogOffset last_log = master_partition->ConsensusLastIndex(); - tmp_stream << " consensus_last_log=" << last_log.ToString() << "\r\n"; - } - - // partition info section - std::string p_info; - s = g_pika_rm->GetPartitionInfo(table_name, partition_id, &p_info); - if (!s.ok()) { - return s; - } - tmp_stream << p_info; - info->append(tmp_stream.str()); - return Status::OK(); -} - -Status ParseSlotGroup(const std::string& slot_group, std::set* slots) { - std::set tmp_slots; - int64_t slot_idx, start_idx, end_idx; - std::string::size_type pos; - std::vector elems; - pstd::StringSplit(slot_group, COMMA, elems); - for (const auto& elem : elems) { - if ((pos = elem.find("-")) == std::string::npos) { - if (!pstd::string2int(elem.data(), elem.size(), &slot_idx) || slot_idx < 0) { - return Status::Corruption("syntax error"); - } else { - tmp_slots.insert(static_cast(slot_idx)); - } - } else { - if (pos == 0 || pos == (elem.size() - 1)) { - return Status::Corruption("syntax error"); - } else { - std::string start_pos = elem.substr(0, pos); - std::string end_pos = elem.substr(pos + 1, elem.size() - pos); - if (!pstd::string2int(start_pos.data(), start_pos.size(), &start_idx) || - !pstd::string2int(end_pos.data(), end_pos.size(), &end_idx) || start_idx < 0 || end_idx < 0 || - start_idx > end_idx) { - return Status::Corruption("syntax error"); - } - for (int64_t idx = start_idx; idx <= end_idx; ++idx) { - tmp_slots.insert(static_cast(idx)); - } - } - } - } - slots->swap(tmp_slots); - return Status::OK(); -} diff --git a/src/pika_command.cc b/src/pika_command.cc index ff1fa315e..319fbceaf 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -7,7 +7,6 @@ #include "include/pika_admin.h" #include "include/pika_bit.h" -#include "include/pika_cluster.h" #include "include/pika_cmd_table_manager.h" #include "include/pika_geo.h" #include "include/pika_hash.h" @@ -18,7 +17,6 @@ #include "include/pika_rm.h" #include "include/pika_server.h" #include "include/pika_set.h" -#include "include/pika_slot.h" #include "include/pika_zset.h" extern PikaServer* g_pika_server; @@ -81,42 +79,6 @@ void InitCmdTable(std::unordered_map* cmd_table) { Cmd* quitptr = new QuitCmd(kCmdNameQuit, 1, kCmdFlagsRead); cmd_table->insert(std::pair(kCmdNameQuit, quitptr)); - // Slots related - Cmd* slotsinfoptr = new SlotsInfoCmd(kCmdNameSlotsInfo, -1, kCmdFlagsRead | kCmdFlagsAdmin); - cmd_table->insert(std::pair(kCmdNameSlotsInfo, slotsinfoptr)); - Cmd* slotshashkeyptr = new SlotsHashKeyCmd(kCmdNameSlotsHashKey, -2, kCmdFlagsRead | kCmdFlagsAdmin); - cmd_table->insert(std::pair(kCmdNameSlotsHashKey, slotshashkeyptr)); - Cmd* slotmgrtslotasyncptr = new SlotsMgrtSlotAsyncCmd(kCmdNameSlotsMgrtSlotAsync, 8, kCmdFlagsRead | kCmdFlagsAdmin); - cmd_table->insert(std::pair(kCmdNameSlotsMgrtSlotAsync, slotmgrtslotasyncptr)); - Cmd* slotmgrttagslotasyncptr = - new SlotsMgrtTagSlotAsyncCmd(kCmdNameSlotsMgrtTagSlotAsync, 8, kCmdFlagsRead | kCmdFlagsAdmin); - cmd_table->insert(std::pair(kCmdNameSlotsMgrtTagSlotAsync, slotmgrttagslotasyncptr)); - Cmd* slotsdelptr = new SlotsDelCmd(kCmdNameSlotsDel, -2, kCmdFlagsRead | kCmdFlagsAdmin); - cmd_table->insert(std::pair(kCmdNameSlotsDel, slotsdelptr)); - Cmd* slotsscanptr = new SlotsScanCmd(kCmdNameSlotsScan, -3, kCmdFlagsRead | kCmdFlagsAdmin); - cmd_table->insert(std::pair(kCmdNameSlotsScan, slotsscanptr)); - Cmd* slotmgrtexecwrapper = - new SlotsMgrtExecWrapperCmd(kCmdNameSlotsMgrtExecWrapper, -3, kCmdFlagsRead | kCmdFlagsAdmin); - cmd_table->insert(std::pair(kCmdNameSlotsMgrtExecWrapper, slotmgrtexecwrapper)); - Cmd* slotmgrtasyncstatus = - new SlotsMgrtAsyncStatusCmd(kCmdNameSlotsMgrtAsyncStatus, 1, kCmdFlagsRead | kCmdFlagsAdmin); - cmd_table->insert(std::pair(kCmdNameSlotsMgrtAsyncStatus, slotmgrtasyncstatus)); - Cmd* slotmgrtasynccancel = - new SlotsMgrtAsyncCancelCmd(kCmdNameSlotsMgrtAsyncCancel, 1, kCmdFlagsRead | kCmdFlagsAdmin); - cmd_table->insert(std::pair(kCmdNameSlotsMgrtAsyncCancel, slotmgrtasynccancel)); - Cmd* slotmgrtslotptr = new SlotsMgrtSlotCmd(kCmdNameSlotsMgrtSlot, 5, kCmdFlagsRead | kCmdFlagsAdmin); - cmd_table->insert(std::pair(kCmdNameSlotsMgrtSlot, slotmgrtslotptr)); - Cmd* slotmgrttagslotptr = new SlotsMgrtTagSlotCmd(kCmdNameSlotsMgrtTagSlot, 5, kCmdFlagsRead | kCmdFlagsAdmin); - cmd_table->insert(std::pair(kCmdNameSlotsMgrtTagSlot, slotmgrttagslotptr)); - Cmd* slotmgrtoneptr = new SlotsMgrtOneCmd(kCmdNameSlotsMgrtOne, 5, kCmdFlagsRead | kCmdFlagsAdmin); - cmd_table->insert(std::pair(kCmdNameSlotsMgrtOne, slotmgrtoneptr)); - Cmd* slotmgrttagoneptr = new SlotsMgrtTagOneCmd(kCmdNameSlotsMgrtTagOne, 5, kCmdFlagsRead | kCmdFlagsAdmin); - cmd_table->insert(std::pair(kCmdNameSlotsMgrtTagOne, slotmgrttagoneptr)); - - // Cluster related - Cmd* pkclusterinfoptr = new PkClusterInfoCmd(kCmdNamePkClusterInfo, -3, kCmdFlagsRead | kCmdFlagsAdmin); - cmd_table->insert(std::pair(kCmdNamePkClusterInfo, pkclusterinfoptr)); - // Kv ////SetCmd Cmd* setptr = new SetCmd(kCmdNameSet, -3, kCmdFlagsWrite | kCmdFlagsSinglePartition | kCmdFlagsKv); diff --git a/src/pika_slot.cc b/src/pika_slot.cc deleted file mode 100644 index 11bd7227c..000000000 --- a/src/pika_slot.cc +++ /dev/null @@ -1,441 +0,0 @@ -// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#include "include/pika_slot.h" -#include "include/pika_cmd_table_manager.h" -#include "include/pika_rm.h" -#include "include/pika_server.h" -#include "include/pika_table.h" - -extern PikaCmdTableManager* g_pika_cmd_table_manager; -extern PikaReplicaManager* g_pika_rm; -extern PikaServer* g_pika_server; -extern PikaConf* g_pika_conf; - -// SLOTSINFO -void SlotsInfoCmd::DoInitial() { - if (!CheckArg(argv_.size())) { - res_.SetRes(CmdRes::kWrongNum, kCmdNameSlotsInfo); - return; - } - - if (g_pika_conf->classic_mode()) { - res_.SetRes(CmdRes::kErrOther, "SLOTSINFO only support on sharding mode"); - return; - } - - return; -} - -void SlotsInfoCmd::Do(std::shared_ptr partition) { - std::shared_ptr table_ptr = g_pika_server->GetTable(g_pika_conf->default_table()); - if (!table_ptr) { - res_.SetRes(CmdRes::kNotFound, kCmdNameSlotsInfo); - return; - } - table_ptr->KeyScan(); - // this get will get last time scan info - KeyScanInfo key_scan_info = table_ptr->GetKeyScanInfo(); - - std::map infos; - Status s = table_ptr->GetPartitionsKeyScanInfo(&infos); - if (!s.ok()) { - res_.SetRes(CmdRes::kInvalidParameter, kCmdNameSlotsInfo); - return; - } - res_.AppendArrayLen(infos.size()); - for (auto& key_info : infos) { - uint64_t total_key_size = 0; - for (size_t idx = 0; idx < key_info.second.key_infos.size(); ++idx) { - total_key_size += key_info.second.key_infos[idx].keys; - } - res_.AppendArrayLen(2); - res_.AppendInteger(key_info.first); - res_.AppendInteger(total_key_size); - } - return; -} - -// SLOTSHASHKEY key1 [key2 …] -void SlotsHashKeyCmd::DoInitial() { - if (!CheckArg(argv_.size())) { - res_.SetRes(CmdRes::kWrongNum, kCmdNameSlotsHashKey); - return; - } - - if (g_pika_conf->classic_mode()) { - res_.SetRes(CmdRes::kErrOther, "SLOTSHASHKEY only support on sharding mode"); - return; - } - - return; -} - -void SlotsHashKeyCmd::Do(std::shared_ptr partition) { - res_.AppendArrayLen(argv_.size() - 1); - std::shared_ptr
table_ptr = g_pika_server->GetTable(g_pika_conf->default_table()); - if (!table_ptr) { - res_.SetRes(CmdRes::kInvalidParameter, kCmdNameSlotsHashKey); - return; - } - uint32_t partition_num = table_ptr->PartitionNum(); - // iter starts from real key, first item in argv_ is command name - std::vector::const_iterator iter = argv_.begin() + 1; - for (; iter != argv_.end(); iter++) { - res_.AppendInteger(g_pika_cmd_table_manager->DistributeKey(*iter, partition_num)); - } - return; -} - -// slotsmgrtslot-async host port timeout maxbulks maxbytes slot numkeys -void SlotsMgrtSlotAsyncCmd::DoInitial() { - if (!CheckArg(argv_.size())) { - res_.SetRes(CmdRes::kWrongNum, kCmdNameSlotsMgrtSlotAsync); - return; - } - - if (g_pika_conf->classic_mode()) { - res_.SetRes(CmdRes::kErrOther, "SLOTSMGRTTAGSLOT-ASYNC only support on sharding mode"); - return; - } - - return; -} - -void SlotsMgrtSlotAsyncCmd::Do(std::shared_ptr partition) { - int64_t moved = 0; - int64_t remained = 0; - res_.AppendArrayLen(2); - res_.AppendInteger(moved); - res_.AppendInteger(remained); -} - -// SLOTSMGRTTAGSLOT-ASYNC host port timeout maxbulks maxbytes slot numkeys -void SlotsMgrtTagSlotAsyncCmd::DoInitial() { - if (!CheckArg(argv_.size())) { - res_.SetRes(CmdRes::kWrongNum, kCmdNameSlotsMgrtTagSlotAsync); - return; - } - - if (g_pika_conf->classic_mode()) { - res_.SetRes(CmdRes::kErrOther, "SLOTSMGRTTAGSLOT-ASYNC only support on sharding mode"); - return; - } - - PikaCmdArgsType::const_iterator it = argv_.begin() + 1; // Remember the first args is the opt name - dest_ip_ = *it++; - pstd::StringToLower(dest_ip_); - - std::string str_dest_port = *it++; - if (!pstd::string2int(str_dest_port.data(), str_dest_port.size(), &dest_port_) || dest_port_ <= 0) { - res_.SetRes(CmdRes::kInvalidInt, kCmdNameSlotsMgrtTagSlotAsync); - return; - } - - if ((dest_ip_ == "127.0.0.1" || dest_ip_ == g_pika_server->host()) && dest_port_ == g_pika_server->port()) { - res_.SetRes(CmdRes::kErrOther, "destination address error"); - return; - } - - std::string str_timeout_ms = *it++; - - std::string str_max_bulks = *it++; - - std::string str_max_bytes_ = *it++; - - std::string str_slot_num = *it++; - - std::shared_ptr
table = g_pika_server->GetTable(table_name_); - if (table == nullptr) { - res_.SetRes(CmdRes::kNotFound, kCmdNameSlotsMgrtTagSlotAsync); - return; - } - - if (!pstd::string2int(str_slot_num.data(), str_slot_num.size(), &slot_num_) || slot_num_ < 0 || - slot_num_ >= table->PartitionNum()) { - res_.SetRes(CmdRes::kInvalidInt, kCmdNameSlotsMgrtTagSlotAsync); - return; - } - - std::string str_keys_num = *it++; - return; -} - -void SlotsMgrtTagSlotAsyncCmd::Do(std::shared_ptr partition) { - int64_t moved = 0; - int64_t remained = 0; - // check if this slave node exist. - // if exist, dont mark migrate done - // cache coming request in codis proxy and keep retrying - // Until sync done, new node slaveof no one. - // mark this migrate done - // proxy retry cached request in new node - bool is_exist = true; - std::shared_ptr master_partition = - g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(table_name_, slot_num_)); - if (!master_partition) { - LOG(WARNING) << "Sync Master Partition: " << table_name_ << ":" << slot_num_ << ", NotFound"; - res_.SetRes(CmdRes::kNotFound, kCmdNameSlotsMgrtTagSlotAsync); - return; - } - is_exist = master_partition->CheckSlaveNodeExist(dest_ip_, dest_port_); - if (is_exist) { - remained = 1; - } else { - remained = 0; - } - res_.AppendArrayLen(2); - res_.AppendInteger(moved); - res_.AppendInteger(remained); -} - -// SLOTSSCAN slotnum cursor [COUNT count] -void SlotsScanCmd::DoInitial() { - if (!CheckArg(argv_.size())) { - res_.SetRes(CmdRes::kWrongNum, kCmdNameSlotsScan); - return; - } - - if (g_pika_conf->classic_mode()) { - res_.SetRes(CmdRes::kErrOther, "SLOTSSCAN only support on sharding mode"); - return; - } - - int64_t slotnum; - if (!pstd::string2int(argv_[1].data(), argv_[1].size(), &slotnum)) { - res_.SetRes(CmdRes::kInvalidInt, kCmdNameSlotsScan); - return; - } - slotnum_ = static_cast(slotnum); - if (!pstd::string2int(argv_[2].data(), argv_[2].size(), &cursor_)) { - res_.SetRes(CmdRes::kInvalidInt, kCmdNameSlotsScan); - return; - } - size_t argc = argv_.size(), index = 3; - - while (index < argc) { - std::string opt = argv_[index]; - if (!strcasecmp(opt.data(), "match") || !strcasecmp(opt.data(), "count")) { - index++; - if (index >= argc) { - res_.SetRes(CmdRes::kSyntaxErr); - return; - } - if (!strcasecmp(opt.data(), "match")) { - pattern_ = argv_[index]; - } else if (!pstd::string2int(argv_[index].data(), argv_[index].size(), &count_) || count_ <= 0) { - res_.SetRes(CmdRes::kInvalidInt); - return; - } - } else { - res_.SetRes(CmdRes::kSyntaxErr); - return; - } - index++; - } - return; -} - -void SlotsScanCmd::Do(std::shared_ptr partition) { - std::shared_ptr
table_ptr = g_pika_server->GetTable(g_pika_conf->default_table()); - if (!table_ptr) { - res_.SetRes(CmdRes::kNotFound, kCmdNameSlotsScan); - return; - } - std::shared_ptr cur_partition = table_ptr->GetPartitionById(slotnum_); - if (!cur_partition) { - res_.SetRes(CmdRes::kNotFound, kCmdNameSlotsScan); - return; - } - std::vector keys; - int64_t cursor_ret = cur_partition->db()->Scan(storage::DataType::kAll, cursor_, pattern_, count_, &keys); - - res_.AppendArrayLen(2); - - char buf[32]; - int len = pstd::ll2string(buf, sizeof(buf), cursor_ret); - res_.AppendStringLen(len); - res_.AppendContent(buf); - - res_.AppendArrayLen(keys.size()); - std::vector::iterator iter; - for (iter = keys.begin(); iter != keys.end(); iter++) { - res_.AppendStringLen(iter->size()); - res_.AppendContent(*iter); - } - return; -} - -// SLOTSDEL slot1 [slot2 …] -void SlotsDelCmd::DoInitial() { - if (!CheckArg(argv_.size())) { - res_.SetRes(CmdRes::kWrongNum, kCmdNameSlotsDel); - return; - } - - if (g_pika_conf->classic_mode()) { - res_.SetRes(CmdRes::kErrOther, "SLOTSDEL only support on sharding mode"); - return; - } - - // iter starts from real key, first item in argv_ is command name - std::vector::const_iterator iter = argv_.begin() + 1; - for (; iter != argv_.end(); iter++) { - int64_t slotnum; - if (!pstd::string2int(iter->data(), iter->size(), &slotnum)) { - res_.SetRes(CmdRes::kInvalidInt, kCmdNameSlotsDel); - return; - } - slots_.push_back(static_cast(slotnum)); - } - return; -} - -void SlotsDelCmd::Do(std::shared_ptr partition) { - std::shared_ptr
table_ptr = g_pika_server->GetTable(g_pika_conf->default_table()); - if (!table_ptr) { - res_.SetRes(CmdRes::kNotFound, kCmdNameSlotsDel); - return; - } - if (table_ptr->IsKeyScaning()) { - res_.SetRes(CmdRes::kErrOther, "The keyscan operation is executing, Try again later"); - return; - } - std::vector successed_slots; - for (auto& slotnum : slots_) { - std::shared_ptr cur_partition = table_ptr->GetPartitionById(slotnum); - if (!cur_partition) { - continue; - } - cur_partition->FlushDB(); - successed_slots.push_back(slotnum); - } - res_.AppendArrayLen(successed_slots.size()); - for (auto& slotnum : successed_slots) { - res_.AppendArrayLen(2); - res_.AppendInteger(slotnum); - res_.AppendInteger(0); - } - return; -} - -// SLOTSMGRT-EXEC-WRAPPER $hashkey $command [$arg1 ...] -void SlotsMgrtExecWrapperCmd::DoInitial() { - if (!CheckArg(argv_.size())) { - res_.SetRes(CmdRes::kWrongNum, kCmdNameSlotsMgrtExecWrapper); - return; - } - - if (g_pika_conf->classic_mode()) { - res_.SetRes(CmdRes::kErrOther, "SLOTSMGRT-EXEC-WRAPPER only support on sharding mode"); - return; - } - - PikaCmdArgsType::const_iterator it = argv_.begin() + 1; - key_ = *it++; - // pstd::StringToLower(key_); - return; -} - -void SlotsMgrtExecWrapperCmd::Do(std::shared_ptr partition) { - // return 0 means proxy will request to new slot server - // return 1 means proxy will keey trying - // return 2 means return this key directly - res_.AppendArrayLen(2); - res_.AppendInteger(1); - res_.AppendInteger(1); - return; -} - -// slotsmgrt-async-status -void SlotsMgrtAsyncStatusCmd::DoInitial() { - if (!CheckArg(argv_.size())) { - res_.SetRes(CmdRes::kWrongNum, kCmdNameSlotsMgrtAsyncStatus); - return; - } - - if (g_pika_conf->classic_mode()) { - res_.SetRes(CmdRes::kErrOther, "SLOTSMGRT-ASYNC-STATUS only support on sharding mode"); - return; - } - - return; -} - -void SlotsMgrtAsyncStatusCmd::Do(std::shared_ptr partition) { - std::string status; - std::string ip = "none"; - int64_t port = -1, slot = -1, moved = -1, remained = -1; - std::string mstatus = "no"; - res_.AppendArrayLen(5); - status = "dest server: " + ip + ":" + std::to_string(port); - res_.AppendStringLen(status.size()); - res_.AppendContent(status); - status = "slot number: " + std::to_string(slot); - res_.AppendStringLen(status.size()); - res_.AppendContent(status); - status = "migrating : " + mstatus; - res_.AppendStringLen(status.size()); - res_.AppendContent(status); - status = "moved keys : " + std::to_string(moved); - res_.AppendStringLen(status.size()); - res_.AppendContent(status); - status = "remain keys: " + std::to_string(remained); - res_.AppendStringLen(status.size()); - res_.AppendContent(status); - return; -} - -// slotsmgrt-async-cancel -void SlotsMgrtAsyncCancelCmd::DoInitial() { - if (!CheckArg(argv_.size())) { - res_.SetRes(CmdRes::kWrongNum, kCmdNameSlotsMgrtAsyncCancel); - return; - } - - if (g_pika_conf->classic_mode()) { - res_.SetRes(CmdRes::kErrOther, "SLOTSMGRT-ASYNC-CANCEL only support on sharding mode"); - return; - } - - return; -} - -void SlotsMgrtAsyncCancelCmd::Do(std::shared_ptr partition) { - res_.SetRes(CmdRes::kOk); - return; -} - -// slotsmgrtslot host port timeout slot -void SlotsMgrtSlotCmd::DoInitial() { - res_.SetRes(CmdRes::kErrOther, kCmdNameSlotsMgrtSlot + " NOT supported"); - return; -} - -void SlotsMgrtSlotCmd::Do(std::shared_ptr partition) { return; } - -// slotsmgrttagslot host port timeout slot -void SlotsMgrtTagSlotCmd::DoInitial() { - res_.SetRes(CmdRes::kErrOther, kCmdNameSlotsMgrtTagSlot + " NOT supported"); - return; -} - -void SlotsMgrtTagSlotCmd::Do(std::shared_ptr partition) { return; } - -// slotsmgrtone host port timeout key -void SlotsMgrtOneCmd::DoInitial() { - res_.SetRes(CmdRes::kErrOther, kCmdNameSlotsMgrtOne + " NOT supported"); - return; -} - -void SlotsMgrtOneCmd::Do(std::shared_ptr partition) { return; } - -// slotsmgrttagone host port timeout key -void SlotsMgrtTagOneCmd::DoInitial() { - res_.SetRes(CmdRes::kErrOther, kCmdNameSlotsMgrtTagOne + " NOT supported"); - return; -} - -void SlotsMgrtTagOneCmd::Do(std::shared_ptr partition) { return; }