Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: command CONFIG SET * reply array length error(#2320) #2323

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/acl.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class AclSelector {
inline bool HasFlags(uint32_t flag) const { return flags_ & flag; };
inline void AddFlags(uint32_t flag) { flags_ |= flag; };
inline void DecFlags(uint32_t flag) { flags_ &= ~flag; };
bool EqualChannel(const std::vector<std::string> &allChannel);
bool EqualChannel(const std::vector<std::string>& allChannel);

private:
pstd::Status SetSelector(const std::string& op);
Expand Down
131 changes: 65 additions & 66 deletions include/pika_admin.h

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@
#include <atomic>

#include "pstd/include/env.h"
#include "pstd/include/noncopyable.h"
#include "pstd/include/pstd_mutex.h"
#include "pstd/include/pstd_status.h"
#include "pstd/include/noncopyable.h"

#include "include/pika_define.h"


std::string NewFileName(const std::string& name, uint32_t current);

class Version final : public pstd::noncopyable {
Expand Down Expand Up @@ -47,15 +46,16 @@ class Version final : public pstd::noncopyable {

class Binlog : public pstd::noncopyable {
public:
Binlog(std::string Binlog_path, int file_size = 100 * 1024 * 1024);
Binlog(std::string Binlog_path, int file_size = 100 * 1024 * 1024);
~Binlog();

void Lock() { mutex_.lock(); }
void Unlock() { mutex_.unlock(); }

pstd::Status Put(const std::string& item);

pstd::Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, uint32_t* term = nullptr, uint64_t* logic_id = nullptr);
pstd::Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, uint32_t* term = nullptr,
uint64_t* logic_id = nullptr);
/*
* Set Producer pro_num and pro_offset with lock
*/
Expand Down Expand Up @@ -100,7 +100,8 @@ class Binlog : public pstd::noncopyable {

std::unique_ptr<Version> version_;
std::unique_ptr<pstd::WritableFile> queue_;
// versionfile_ can only be used as a shared_ptr, and it will be used as a variable version_ in the ~Version() function.
// versionfile_ can only be used as a shared_ptr, and it will be used as a variable version_ in the ~Version()
// function.
std::shared_ptr<pstd::RWFile> versionfile_;

pstd::Mutex mutex_;
Expand Down
3 changes: 2 additions & 1 deletion include/pika_binlog_transverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class BinlogItem {

class PikaBinlogTransverter {
public:
PikaBinlogTransverter()= default;;
PikaBinlogTransverter() = default;
;
static std::string BinlogEncode(BinlogType type, uint32_t exec_time, uint32_t term_id, uint64_t logic_id,
uint32_t filenum, uint64_t offset, const std::string& content,
const std::vector<std::string>& extends);
Expand Down
16 changes: 8 additions & 8 deletions include/pika_bit.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ class BitGetCmd : public Cmd {
void ReadCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoThroughDB(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new BitGetCmd(*this); }

private:
Expand All @@ -56,8 +56,8 @@ class BitSetCmd : public Cmd {
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoThroughDB(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new BitSetCmd(*this); }

private:
Expand Down Expand Up @@ -86,8 +86,8 @@ class BitCountCmd : public Cmd {
void ReadCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoThroughDB(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new BitCountCmd(*this); }

private:
Expand Down Expand Up @@ -118,8 +118,8 @@ class BitPosCmd : public Cmd {
void ReadCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoThroughDB(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new BitPosCmd(*this); }

private:
Expand Down
50 changes: 26 additions & 24 deletions include/pika_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
#include <sstream>
#include <vector>

#include "include/pika_server.h"
#include "cache/include/cache.h"
#include "include/pika_define.h"
#include "include/pika_server.h"
#include "include/pika_zset.h"
#include "pstd/include/pstd_mutex.h"
#include "pstd/include/pstd_status.h"
#include "cache/include/cache.h"
#include "storage/storage.h"

enum RangeStatus { RangeError = 1, RangeHit, RangeMiss };
Expand Down Expand Up @@ -44,13 +44,12 @@ struct CacheInfo {
class PikaCacheLoadThread;
class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<PikaCache> {
public:

PikaCache(int zset_cache_start_pos, int zset_cache_field_num_per_key);
~PikaCache();

rocksdb::Status Init(uint32_t cache_num, cache::CacheConfig *cache_cfg);
rocksdb::Status Reset(uint32_t cache_num, cache::CacheConfig *cache_cfg = nullptr);
std::map<storage::DataType, int64_t> TTL(std::string &key, std::map<storage::DataType, rocksdb::Status>* type_status);
std::map<storage::DataType, int64_t> TTL(std::string &key, std::map<storage::DataType, rocksdb::Status> *type_status);
void ResetConfig(cache::CacheConfig *cache_cfg);
void Destroy(void);
void SetCacheStatus(int status);
Expand All @@ -71,7 +70,7 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
rocksdb::Status Persist(std::string &key);
rocksdb::Status Type(std::string &key, std::string *value);
rocksdb::Status RandomKey(std::string *key);
rocksdb::Status GetType(const std::string& key, bool single, std::vector<std::string>& types);
rocksdb::Status GetType(const std::string &key, bool single, std::vector<std::string> &types);

// String Commands
rocksdb::Status Set(std::string &key, std::string &value, int64_t ttl);
Expand All @@ -80,9 +79,9 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
rocksdb::Status SetnxWithoutTTL(std::string &key, std::string &value);
rocksdb::Status Setxx(std::string &key, std::string &value, int64_t ttl);
rocksdb::Status SetxxWithoutTTL(std::string &key, std::string &value);
rocksdb::Status MSet(const std::vector<storage::KeyValue>& kvs);
rocksdb::Status MSet(const std::vector<storage::KeyValue> &kvs);
rocksdb::Status Get(std::string &key, std::string *value);
rocksdb::Status MGet(const std::vector<std::string>& keys, std::vector<storage::ValueStatus>* vss);
rocksdb::Status MGet(const std::vector<std::string> &keys, std::vector<storage::ValueStatus> *vss);
rocksdb::Status Incrxx(std::string &key);
rocksdb::Status Decrxx(std::string &key);
rocksdb::Status IncrByxx(std::string &key, uint64_t incr);
Expand Down Expand Up @@ -115,7 +114,8 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<

// List Commands
rocksdb::Status LIndex(std::string &key, int64_t index, std::string *element);
rocksdb::Status LInsert(std::string &key, storage::BeforeOrAfter &before_or_after, std::string &pivot, std::string &value);
rocksdb::Status LInsert(std::string &key, storage::BeforeOrAfter &before_or_after, std::string &pivot,
std::string &value);
rocksdb::Status LLen(std::string &key, uint64_t *len);
rocksdb::Status LPop(std::string &key, std::string *element);
rocksdb::Status LPush(std::string &key, std::vector<std::string> &values);
Expand Down Expand Up @@ -150,29 +150,32 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
rocksdb::Status ZCount(std::string &key, std::string &min, std::string &max, uint64_t *len, ZCountCmd *cmd);
rocksdb::Status ZIncrby(std::string &key, std::string &member, double increment);
rocksdb::Status ZIncrbyIfKeyExist(std::string &key, std::string &member, double increment, ZIncrbyCmd *cmd,
const std::shared_ptr<Slot> &slot);
rocksdb::Status ZRange(std::string &key, int64_t start, int64_t stop, std::vector<storage::ScoreMember> *score_members,
const std::shared_ptr<Slot> &slot);
const std::shared_ptr<Slot> &slot);
rocksdb::Status ZRange(std::string &key, int64_t start, int64_t stop,
std::vector<storage::ScoreMember> *score_members, const std::shared_ptr<Slot> &slot);
rocksdb::Status ZRangebyscore(std::string &key, std::string &min, std::string &max,
std::vector<storage::ScoreMember> *score_members, ZRangebyscoreCmd *cmd);
rocksdb::Status ZRank(std::string &key, std::string &member, int64_t *rank, const std::shared_ptr<Slot> &slot);
rocksdb::Status ZRem(std::string &key, std::vector<std::string> &members, std::shared_ptr<Slot> slot = nullptr);
rocksdb::Status ZRemrangebyrank(std::string &key, std::string &min, std::string &max, int32_t ele_deleted = 0,
const std::shared_ptr<Slot> &slot = nullptr);
rocksdb::Status ZRemrangebyscore(std::string &key, std::string &min, std::string &max, const std::shared_ptr<Slot> &slot);
rocksdb::Status ZRevrange(std::string &key, int64_t start, int64_t stop, std::vector<storage::ScoreMember> *score_members,
const std::shared_ptr<Slot> &slot);
rocksdb::Status ZRemrangebyscore(std::string &key, std::string &min, std::string &max,
const std::shared_ptr<Slot> &slot);
rocksdb::Status ZRevrange(std::string &key, int64_t start, int64_t stop,
std::vector<storage::ScoreMember> *score_members, const std::shared_ptr<Slot> &slot);
rocksdb::Status ZRevrangebyscore(std::string &key, std::string &min, std::string &max,
std::vector<storage::ScoreMember> *score_members, ZRevrangebyscoreCmd *cmd,
const std::shared_ptr<Slot> &slot);
rocksdb::Status ZRevrangebylex(std::string &key, std::string &min, std::string &max, std::vector<std::string> *members,
const std::shared_ptr<Slot> &slot);
std::vector<storage::ScoreMember> *score_members, ZRevrangebyscoreCmd *cmd,
const std::shared_ptr<Slot> &slot);
rocksdb::Status ZRevrangebylex(std::string &key, std::string &min, std::string &max,
std::vector<std::string> *members, const std::shared_ptr<Slot> &slot);
rocksdb::Status ZRevrank(std::string &key, std::string &member, int64_t *rank, const std::shared_ptr<Slot> &slot);
rocksdb::Status ZScore(std::string &key, std::string &member, double *score, const std::shared_ptr<Slot> &slot);
rocksdb::Status ZRangebylex(std::string &key, std::string &min, std::string &max, std::vector<std::string> *members, const std::shared_ptr<Slot> &slot);
rocksdb::Status ZRangebylex(std::string &key, std::string &min, std::string &max, std::vector<std::string> *members,
const std::shared_ptr<Slot> &slot);
rocksdb::Status ZLexcount(std::string &key, std::string &min, std::string &max, uint64_t *len,
const std::shared_ptr<Slot> &slot);
rocksdb::Status ZRemrangebylex(std::string &key, std::string &min, std::string &max, const std::shared_ptr<Slot> &slot);
rocksdb::Status ZRemrangebylex(std::string &key, std::string &min, std::string &max,
const std::shared_ptr<Slot> &slot);

// Bit Commands
rocksdb::Status SetBit(std::string &key, size_t offset, int64_t value);
Expand All @@ -193,16 +196,15 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
rocksdb::Status CacheZCard(std::string &key, uint64_t *len);

private:

rocksdb::Status InitWithoutLock(uint32_t cache_num, cache::CacheConfig *cache_cfg);
void DestroyWithoutLock(void);
int CacheIndex(const std::string &key);
RangeStatus CheckCacheRange(int32_t cache_len, int32_t db_len, int64_t start, int64_t stop, int64_t &out_start,
int64_t &out_stop);
RangeStatus CheckCacheRevRange(int32_t cache_len, int32_t db_len, int64_t start, int64_t stop, int64_t &out_start,
int64_t &out_stop);
RangeStatus CheckCacheRangeByScore(uint64_t cache_len, double cache_min, double cache_max, double min,
double max, bool left_close, bool right_close);
RangeStatus CheckCacheRangeByScore(uint64_t cache_len, double cache_min, double cache_max, double min, double max,
bool left_close, bool right_close);
bool CacheSizeEqsDB(std::string &key, const std::shared_ptr<Slot> &slot);
void GetMinMaxScore(std::vector<storage::ScoreMember> &score_members, double &min, double &max);
bool GetCacheMinMaxSM(cache::RedisCache *cache_obj, std::string &key, storage::ScoreMember &min_m,
Expand All @@ -220,7 +222,7 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
int zset_cache_field_num_per_key_ = 0;
std::shared_mutex rwlock_;
std::unique_ptr<PikaCacheLoadThread> cache_load_thread_;
std::vector<cache::RedisCache*> caches_;
std::vector<cache::RedisCache *> caches_;
std::vector<std::shared_ptr<pstd::Mutex>> cache_mutexs_;
};

Expand Down
7 changes: 3 additions & 4 deletions include/pika_cache_load_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.


#ifndef PIKA_CACHE_LOAD_THREAD_H_
#define PIKA_CACHE_LOAD_THREAD_H_

Expand All @@ -13,8 +12,8 @@
#include <vector>

#include "include/pika_cache.h"
#include "include/pika_slot.h"
#include "include/pika_define.h"
#include "include/pika_slot.h"
#include "net/include/net_thread.h"
#include "storage/storage.h"

Expand All @@ -25,7 +24,7 @@ class PikaCacheLoadThread : public net::Thread {

uint64_t AsyncLoadKeysNum(void) { return async_load_keys_num_; }
uint32_t WaittingLoadKeysNum(void) { return waitting_load_keys_num_; }
void Push(const char key_type, std::string& key, const std::shared_ptr<Slot> &slot);
void Push(const char key_type, std::string& key, const std::shared_ptr<Slot>& slot);

private:
bool LoadKV(std::string& key, const std::shared_ptr<Slot>& slot);
Expand All @@ -39,7 +38,7 @@ class PikaCacheLoadThread : public net::Thread {
private:
std::atomic_bool should_exit_;
std::deque<std::tuple<const char, std::string, const std::shared_ptr<Slot>>> loadkeys_queue_;

pstd::CondVar loadkeys_cond_;
pstd::Mutex loadkeys_mutex_;

Expand Down
22 changes: 7 additions & 15 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include "acl.h"
#include "include/pika_command.h"


// TODO: stat time costing in write out data to connfd
struct TimeStat {
TimeStat() = default;
Expand All @@ -21,21 +20,13 @@ struct TimeStat {
process_done_ts_ = 0;
}

uint64_t start_ts() const {
return enqueue_ts_;
}
uint64_t start_ts() const { return enqueue_ts_; }

uint64_t total_time() const {
return process_done_ts_ > enqueue_ts_ ? process_done_ts_ - enqueue_ts_ : 0;
}
uint64_t total_time() const { return process_done_ts_ > enqueue_ts_ ? process_done_ts_ - enqueue_ts_ : 0; }

uint64_t queue_time() const {
return dequeue_ts_ > enqueue_ts_ ? dequeue_ts_ - enqueue_ts_ : 0;
}
uint64_t queue_time() const { return dequeue_ts_ > enqueue_ts_ ? dequeue_ts_ - enqueue_ts_ : 0; }

uint64_t process_time() const {
return process_done_ts_ > dequeue_ts_ ? process_done_ts_ - dequeue_ts_ : 0;
}
uint64_t process_time() const { return process_done_ts_ > dequeue_ts_ ? process_done_ts_ - dequeue_ts_ : 0; }

uint64_t enqueue_ts_;
uint64_t dequeue_ts_;
Expand Down Expand Up @@ -104,9 +95,9 @@ class PikaClientConn : public net::RedisConn {
void SetTxnInitFailState(bool is_failed);
void SetTxnStartState(bool is_start);

void AddKeysToWatch(const std::vector<std::string> &db_keys);
void AddKeysToWatch(const std::vector<std::string>& db_keys);
void RemoveWatchedKeys();
void SetTxnFailedFromKeys(const std::vector<std::string> &db_keys);
void SetTxnFailedFromKeys(const std::vector<std::string>& db_keys);
void SetAllTxnFailed();
void SetTxnFailedFromDBs(std::string db_name);
void ExitTxn();
Expand All @@ -118,6 +109,7 @@ class PikaClientConn : public net::RedisConn {
std::vector<std::shared_ptr<std::string>> resp_array;

std::shared_ptr<TimeStat> time_stat_;

private:
net::ServerThread* const server_thread_;
std::string current_db_;
Expand Down
2 changes: 1 addition & 1 deletion include/pika_client_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
#ifndef PIKA_CLIENT_PROCESSOR_H_
#define PIKA_CLIENT_PROCESSOR_H_

#include <memory>
#include <string>
#include <vector>
#include <memory>
#include "net/include/bg_thread.h"
#include "net/include/thread_pool.h"

Expand Down
8 changes: 4 additions & 4 deletions include/pika_cmd_table_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ class PikaCmdTableManager {
std::vector<std::string> GetAclCategoryCmdNames(uint32_t flag);

/*
* Info Commandstats used
*/
* Info Commandstats used
*/
std::unordered_map<std::string, CommandStatistics>* GetCommandStatMap();

private:
Expand All @@ -57,8 +57,8 @@ class PikaCmdTableManager {
std::unordered_map<std::thread::id, std::unique_ptr<PikaDataDistribution>> thread_distribution_map_;

/*
* Info Commandstats used
*/
* Info Commandstats used
*/
std::unordered_map<std::string, CommandStatistics> cmdstat_map_;
};
#endif
2 changes: 1 addition & 1 deletion include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ class CmdRes {

void AppendStringVector(const std::vector<std::string>& strArray) {
if (strArray.empty()) {
AppendArrayLen(-1);
AppendArrayLen(0);
return;
}
AppendArrayLen(strArray.size());
Expand Down
Loading
Loading