Skip to content

Commit

Permalink
fix: init every class member's value to defeat the redis-cli connecti…
Browse files Browse the repository at this point in the history
…on closed by pika (OpenAtomFoundation#1390)

* update version

* fix bug: wrong buffer size in vsnprintf

* fix holyThread close fd bug

* handle ewouldblock error

* add output macro

* init NetConn & RedisConn & ClientConn members' value in its class

* delete output macro

* delete ks.sh

* reformat output macro

* init NetConn & RedisConn & ClientConn members' value in its class

* add connection log

* recove GetTestDirectory

* do not init non-POD obj
  • Loading branch information
AlexStocks authored Apr 26, 2023
1 parent 572ecc5 commit f3314a4
Show file tree
Hide file tree
Showing 25 changed files with 106 additions and 65 deletions.
2 changes: 1 addition & 1 deletion include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class PikaClientConn : public net::RedisConn {
net::ServerThread* const server_thread_;
std::string current_table_;
WriteCompleteCallback write_completed_cb_;
bool is_pubsub_;
bool is_pubsub_ = false;

std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
std::shared_ptr<std::string> resp_ptr);
Expand Down
8 changes: 4 additions & 4 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -471,17 +471,17 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
void LogCommand() const;

std::string name_;
int arity_;
uint16_t flag_;
int arity_ = -2;
uint16_t flag_ = 0;

CmdRes res_;
PikaCmdArgsType argv_;
std::string table_name_;

std::weak_ptr<net::NetConn> conn_;
std::weak_ptr<std::string> resp_;
CmdStage stage_;
uint64_t do_duration_;
CmdStage stage_ = kNone;
uint64_t do_duration_ = 0;

private:
virtual void DoInitial() = 0;
Expand Down
3 changes: 1 addition & 2 deletions include/pika_dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ class PikaDispatchThread {
explicit ClientConnFactory(int max_conn_rbuf_size) : max_conn_rbuf_size_(max_conn_rbuf_size) {}
virtual std::shared_ptr<net::NetConn> NewNetConn(int connfd, const std::string& ip_port, net::Thread* server_thread,
void* worker_specific_data, net::NetMultiplexer* net) const {
return std::static_pointer_cast<net::NetConn>(std::make_shared<PikaClientConn>(
connfd, ip_port, server_thread, net, net::HandleType::kAsynchronous, max_conn_rbuf_size_));
return std::make_shared<PikaClientConn>(connfd, ip_port, server_thread, net, net::HandleType::kAsynchronous, max_conn_rbuf_size_);
}

private:
Expand Down
2 changes: 1 addition & 1 deletion include/pika_version.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@

#define PIKA_MAJOR 3
#define PIKA_MINOR 4
#define PIKA_PATCH 1
#define PIKA_PATCH 2

#endif // INCLUDE_PIKA_VERSION_H_
26 changes: 17 additions & 9 deletions src/net/include/net_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#define NET_INCLUDE_NET_CONN_H_

#include <sys/time.h>
#include <sstream>
#include <string>

#ifdef __ENABLE_SSL
Expand All @@ -17,6 +18,7 @@
#include "net/include/net_define.h"
#include "net/include/server_thread.h"
#include "net/src/net_multiplexer.h"
#include "pstd/include/testutil.h"

namespace net {

Expand All @@ -28,7 +30,7 @@ class NetConn : public std::enable_shared_from_this<NetConn> {
virtual ~NetConn();

/*
* Set the fd to nonblock && set the flag_ the the fd flag
* Set the fd to nonblock && set the flag_ the fd flag
*/
bool SetNonblock();

Expand Down Expand Up @@ -64,7 +66,7 @@ class NetConn : public std::enable_shared_from_this<NetConn> {
void set_name(std::string name) { name_ = std::move(name); }

bool IsClose() { return close_; }
void SetClose(bool close) { close_ = close; }
void SetClose(bool close);

void set_last_interaction(const struct timeval& now) { last_interaction_ = now; }

Expand All @@ -76,30 +78,36 @@ class NetConn : public std::enable_shared_from_this<NetConn> {

NetMultiplexer* net_multiplexer() const { return net_multiplexer_; }

std::string String() const {
std::stringstream ss;
ss << "fd: " << fd_ << ", ip_port: " << ip_port_ << ", name: " << name_ << ", is_reply: " << is_reply_ << ", close: " << close_;
return ss.str();
}

#ifdef __ENABLE_SSL
SSL* ssl() { return ssl_; }

bool security() { return ssl_ != nullptr; }
#endif

private:
int fd_;
int fd_ = -1;
std::string ip_port_;
bool is_reply_;
bool is_writable_;
bool close_;
bool is_reply_ = false;
bool is_writable_ = false;
bool close_ = false;
struct timeval last_interaction_;
int flags_;
int flags_ = 0;
std::string name_;

#ifdef __ENABLE_SSL
SSL* ssl_;
#endif

// thread this conn belong to
Thread* thread_;
Thread* thread_ = nullptr;
// the net epoll this conn belong to
NetMultiplexer* net_multiplexer_;
NetMultiplexer* net_multiplexer_ = nullptr;

/*
* No allowed copy and copy assign operator
Expand Down
18 changes: 9 additions & 9 deletions src/net/include/redis_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,21 @@ class RedisConn : public NetConn {
static int ParserCompleteCb(RedisParser* parser, const std::vector<RedisCmdArgsType>& argvs);
ReadStatus ParseRedisParserStatus(RedisParserStatus status);

HandleType handle_type_;
HandleType handle_type_ = kSynchronous;

char* rbuf_;
int rbuf_len_;
int rbuf_max_len_;
int msg_peak_;
int command_len_;
char* rbuf_ = nullptr;
int rbuf_len_ = 0;
int rbuf_max_len_ = 0;
int msg_peak_ = 0;
int command_len_ = 0;

uint32_t wbuf_pos_;
uint32_t wbuf_pos_ = 0;
std::string response_;

// For Redis Protocol parser
int last_read_pos_;
int last_read_pos_ = -1;
RedisParser redis_parser_;
long bulk_len_;
long bulk_len_ = -1;
};

} // namespace net
Expand Down
6 changes: 3 additions & 3 deletions src/net/include/server_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,14 @@ class ServerThread : public Thread {
friend class DispatchThread;
friend class WorkerThread;

int cron_interval_;
int cron_interval_ = 0;
virtual void DoCronTask();

// process events in notify_queue
virtual void ProcessNotifyEvents(const NetFiredEvent* pfe);

const ServerHandle* handle_;
bool own_handle_;
bool own_handle_ = false;

#ifdef __ENABLE_SSL
bool security_;
Expand All @@ -186,7 +186,7 @@ class ServerThread : public Thread {
/*
* The tcp server port and address
*/
int port_;
int port_ = -1;
std::set<std::string> ips_;
std::vector<ServerSocket*> server_sockets_;
std::set<int32_t> server_fds_;
Expand Down
4 changes: 3 additions & 1 deletion src/net/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@

#include <vector>

#include "net/src/dispatch_thread.h"
#include <glog/logging.h>

#include "net/src/dispatch_thread.h"
#include "net/src/net_item.h"
#include "net/src/net_multiplexer.h"
#include "net/src/worker_thread.h"
Expand Down Expand Up @@ -150,6 +151,7 @@ void DispatchThread::HandleNewConn(const int connfd, const std::string& ip_port)
// Slow workers may consume many fds.
// We simply loop to find next legal worker.
NetItem ti(connfd, ip_port);
LOG(INFO) << "accept new conn " << ti.String();
int next_thread = last_thread_;
bool find = false;
for (int cnt = 0; cnt < work_num_; cnt++) {
Expand Down
3 changes: 3 additions & 0 deletions src/net/src/holy_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ void HolyThread::DoCronTask() {
}
conns_.clear();
deleting_conn_ipport_.clear();
for (const auto conn : to_close) {
CloseFd(conn);
}
return;
}

Expand Down
6 changes: 3 additions & 3 deletions src/net/src/holy_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class NetConn;

class HolyThread : public ServerThread {
public:
// This type thread thread will listen and work self list redis thread
// This type thread will listen and work self list redis thread
HolyThread(int port, ConnFactory* conn_factory, int cron_interval = 0, const ServerHandle* handle = nullptr,
bool async = true);
HolyThread(const std::string& bind_ip, int port, ConnFactory* conn_factory, int cron_interval = 0,
Expand Down Expand Up @@ -57,8 +57,8 @@ class HolyThread : public ServerThread {
mutable pstd::RWMutex rwlock_; /* For external statistics */
std::map<int, std::shared_ptr<NetConn>> conns_;

ConnFactory* conn_factory_;
void* private_data_;
ConnFactory* conn_factory_ = nullptr;
void* private_data_ = nullptr;

std::atomic<int> keepalive_timeout_; // keepalive second
bool async_;
Expand Down
4 changes: 4 additions & 0 deletions src/net/src/net_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ NetConn::~NetConn() {
#endif
}

void NetConn::SetClose(bool close) {
close_ = close;
}

bool NetConn::SetNonblock() {
flags_ = Setnonblocking(fd());
if (flags_ == -1) {
Expand Down
7 changes: 5 additions & 2 deletions src/net/src/net_item.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ class NetItem {

int fd() const { return fd_; }
std::string ip_port() const { return ip_port_; }
std::string String() const {
return std::to_string(fd_) + ":" + ip_port_ + ":" + std::to_string(notify_type_);
}

NotifyType notify_type() const { return notify_type_; }

private:
int fd_;
int fd_ = -1;
std::string ip_port_;
NotifyType notify_type_;
NotifyType notify_type_ = kNotiConnect;
};

} // namespace net
Expand Down
2 changes: 1 addition & 1 deletion src/net/src/redis_cli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ int redisvFormatCommand(std::string* cmd, const char* format, va_list ap) {
memcpy(_format, c, _l);
_format[_l] = '\0';

int n = vsnprintf(buf, REDIS_MAX_MESSAGE, _format, _cpy);
int n = vsnprintf(buf, sizeof(buf), _format, _cpy);
curarg.append(buf, n);

/* Update current position (note: outer blocks
Expand Down
4 changes: 2 additions & 2 deletions src/net/src/redis_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ ReadStatus RedisConn::GetRequest() {

nread = read(fd(), rbuf_ + next_read_pos, remain);
if (nread == -1) {
if (errno == EAGAIN) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
nread = 0;
return kReadHalf; // HALF
} else {
Expand Down Expand Up @@ -150,7 +150,7 @@ WriteStatus RedisConn::SendReply() {
}
}
if (nwritten == -1) {
if (errno == EAGAIN) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return kWriteHalf;
} else {
// Here we should close the connection
Expand Down
1 change: 1 addition & 0 deletions src/net/src/server_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "net/src/server_socket.h"
#include "pstd/include/xdebug.h"
#include "pstd/include/testutil.h"

namespace net {

Expand Down
7 changes: 7 additions & 0 deletions src/net/src/worker_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

#include <vector>

#include <glog/logging.h>

#include "pstd/include/testutil.h"
#include "net/src/worker_thread.h"

#include "net/include/net_conn.h"
Expand Down Expand Up @@ -49,6 +52,7 @@ std::shared_ptr<NetConn> WorkerThread::MoveConnOut(int fd) {
int fd = iter->first;
conn = iter->second;
net_multiplexer_->NetDelEvent(fd, 0);
DLOG(INFO) << "move out connection " << conn->String();
conns_.erase(iter);
}
return conn;
Expand Down Expand Up @@ -171,6 +175,7 @@ void* WorkerThread::ThreadMain() {
in_conn->set_is_reply(false);
if (in_conn->IsClose()) {
should_close = 1;
LOG(INFO) << "will close client connection " << in_conn->String();
}
} else if (write_status == kWriteHalf) {
continue;
Expand Down Expand Up @@ -238,13 +243,15 @@ void WorkerThread::DoCronTask() {
to_close.push_back(conn);
deleting_conn_ipport_.erase(conn->ip_port());
iter = conns_.erase(iter);
LOG(INFO) << "will close client connection " << conn->String();
continue;
}

// Check keepalive timeout connection
if (keepalive_timeout_ > 0 && (now.tv_sec - conn->last_interaction().tv_sec > keepalive_timeout_)) {
to_timeout.push_back(conn);
iter = conns_.erase(iter);
LOG(INFO) << "connection " << conn->String() << " keepalive timeout, the keepalive_timeout_ is " << keepalive_timeout_.load();
continue;
}

Expand Down
8 changes: 4 additions & 4 deletions src/net/src/worker_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ class WorkerThread : public Thread {
mutable pstd::RWMutex rwlock_; /* For external statistics */
std::map<int, std::shared_ptr<NetConn>> conns_;

void* private_data_;
void* private_data_ = nullptr;

private:
ServerThread* server_thread_;
ConnFactory* conn_factory_;
int cron_interval_;
ServerThread* server_thread_ = nullptr;
ConnFactory* conn_factory_ = nullptr;
int cron_interval_ = 0;

/*
* The epoll handler
Expand Down
3 changes: 3 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

#include <algorithm>

#include <glog/logging.h>

#include "include/build_version.h"
#include "include/pika_conf.h"
#include "include/pika_rm.h"
Expand Down Expand Up @@ -2302,6 +2304,7 @@ void QuitCmd::DoInitial() {

void QuitCmd::Do(std::shared_ptr<Partition> partition) {
res_.SetRes(CmdRes::kOk);
LOG(INFO) << "QutCmd will close connection " << GetConn()->String();
GetConn()->SetClose(true);
}

Expand Down
3 changes: 2 additions & 1 deletion src/pika_dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "include/pika_conf.h"
#include "include/pika_server.h"
#include "pstd/include/testutil.h"

extern PikaConf* g_pika_conf;
extern PikaServer* g_pika_server;
Expand Down Expand Up @@ -56,7 +57,7 @@ bool PikaDispatchThread::Handles::AccessHandle(std::string& ip) const {
return false;
}

DLOG(INFO) << "new clinet comming, ip: " << ip;
DLOG(INFO) << "new client comming, ip: " << ip;
g_pika_server->incr_accumulative_connections();
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/pika_monitor_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ net::WriteStatus PikaMonitorThread::SendMessage(int32_t fd, std::string& message
ssize_t nwritten = 0, message_len_sended = 0, message_len_left = message.size();
while (message_len_left > 0) {
nwritten = write(fd, message.data() + message_len_sended, message_len_left);
if (nwritten == -1 && errno == EAGAIN) {
if (nwritten == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
// If the write buffer is full, but the client no longer consumes, it will
// get stuck in the loop and cause the entire Pika to block becase of monitor_mutex_protector_.
// So we put a limit on the number of retries
Expand Down
Loading

0 comments on commit f3314a4

Please sign in to comment.