Skip to content

Commit

Permalink
bugfix binlog sender confict at multi-network interface (#234)
Browse files Browse the repository at this point in the history
* bugfix binlog sender confict at multi-network interface

* bugfix configing set maxclients has no effect

* set resource
       limits
  • Loading branch information
Leviathan1995 authored Mar 22, 2018
1 parent 745d4c1 commit bcba759
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 9 deletions.
2 changes: 2 additions & 0 deletions include/pika_binlog_sender_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class PikaBinlogSenderThread : public pink::Thread {
public:

PikaBinlogSenderThread(const std::string &ip, int port,
int64_t sid,
slash::SequentialFile *queue,
uint32_t filenum, uint64_t con_offset);

Expand Down Expand Up @@ -50,6 +51,7 @@ class PikaBinlogSenderThread : public pink::Thread {

std::string ip_;
int port_;
int64_t sid_;

int timeout_ms_;
pink::PinkCli *cli_;
Expand Down
4 changes: 4 additions & 0 deletions include/pika_dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ class PikaDispatchThread {
bool ClientKill(const std::string& ip_port);
void ClientKillAll();

void SetQueueLimit(int queue_limit) {
thread_rep_->SetQueueLimit(queue_limit);
}

private:
class ClientConnFactory : public pink::ConnFactory {
public:
Expand Down
1 change: 1 addition & 0 deletions include/pika_master_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class PikaMasterConn: public pink::RedisConn {
PikaMasterConn(int fd, std::string ip_port, void* worker_specific_data);
virtual int DealMessage(PikaCmdArgsType& argv, std::string* response);
private:
bool is_first_send_;
PikaBinlogReceiverThread* binlog_receiver_;
};

Expand Down
14 changes: 13 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ class PikaServer {
int master_port() {
return master_port_;
}

int64_t sid() {
return sid_;
}

void SetSid(int64_t sid) {
sid_ = sid;
}

const std::shared_ptr<nemo::Nemo> db() {
return db_;
}
Expand Down Expand Up @@ -184,7 +193,8 @@ class PikaServer {
*/
Binlog *logger_;
Status AddBinlogSender(const std::string& ip, int64_t port,
uint32_t filenum, uint64_t con_offset);
int64_t sid,
uint32_t filenum, uint64_t con_offset);

/*
* BGSave used
Expand Down Expand Up @@ -457,6 +467,8 @@ class PikaServer {
void ResetStat();
slash::RecordMutex mutex_record_;

void SetDispatchQueueLimit(int queue_limit);

private:
std::atomic<bool> exit_;
std::atomic<bool> binlog_io_error_;
Expand Down
4 changes: 3 additions & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ void TrysyncCmd::Do() {
int64_t sid = g_pika_server->TryAddSlave(slave_ip_, slave_port_);
if (sid >= 0) {
Status status = g_pika_server->AddBinlogSender(slave_ip_, slave_port_,
filenum_, pro_offset_);
sid,
filenum_, pro_offset_);
if (status.ok()) {
res_.AppendInteger(sid);
LOG(INFO) << "Send Sid to Slave: " << sid;
Expand Down Expand Up @@ -1320,6 +1321,7 @@ void ConfigCmd::ConfigSet(std::string& ret) {
ret = "-ERR Invalid argument " + value + " for CONFIG SET 'maxclients'\r\n";
return;
}
g_pika_server->SetDispatchQueueLimit(ival);
g_pika_conf->SetMaxConnection(ival);
ret = "+OK\r\n";
} else if (set_item == "dump-expire") {
Expand Down
10 changes: 5 additions & 5 deletions src/pika_binlog_receiver_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ bool PikaBinlogReceiverThread::Handles::AccessHandle(std::string& ip) const {
if (ip == "127.0.0.1") {
ip = g_pika_server->host();
}
// if (binlog_receiver_->thread_rep_->conn_num() != 0 ||
if (!g_pika_server->ShouldAccessConnAsMaster(ip)) {
LOG(WARNING) << "BinlogReceiverThread AccessHandle failed: " << ip;
return false;
}
// if (binlog_receiver_->thread_rep_->conn_num() != 0 ||
// if (!g_pika_server->ShouldAccessConnAsMaster(ip)) {
// LOG(WARNING) << "BinlogReceiverThread AccessHandle failed: " << ip;
// return false;
// }
g_pika_server->PlusMasterConnection();
return true;
}
Expand Down
13 changes: 13 additions & 0 deletions src/pika_binlog_sender_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
extern PikaServer* g_pika_server;

PikaBinlogSenderThread::PikaBinlogSenderThread(const std::string &ip, int port,
int64_t sid,
slash::SequentialFile *queue,
uint32_t filenum,
uint64_t con_offset)
Expand All @@ -29,6 +30,7 @@ PikaBinlogSenderThread::PikaBinlogSenderThread(const std::string &ip, int port,
buffer_(),
ip_(ip),
port_(port),
sid_(sid),
timeout_ms_(35000) {
cli_ = pink::NewRedisCli();
last_record_offset_ = con_offset % kBlockSize;
Expand Down Expand Up @@ -257,6 +259,17 @@ void* PikaBinlogSenderThread::ThreadMain() {

if (result.ok()) {
cli_->set_send_timeout(timeout_ms_);
// Auth sid
std::string wbuf_str;
pink::RedisCmdArgsType argv;
argv.push_back("auth");
argv.push_back(std::to_string(sid_));
pink::SerializeRedisCommand(argv, &wbuf_str);
result = cli_->Send(&wbuf_str);
if (!result.ok()) {
LOG(WARNING) << "BinlogSender send slave(" << ip_ << ":" << port_ << ") failed, " << result.ToString();
break;
}
while (true) {
// 2. Should Parse new msg;
if (last_send_flag) {
Expand Down
14 changes: 14 additions & 0 deletions src/pika_master_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ extern PikaConf* g_pika_conf;
PikaMasterConn::PikaMasterConn(int fd, std::string ip_port,
void* worker_specific_data)
: RedisConn(fd, ip_port, NULL) {
is_first_send_ = true;
binlog_receiver_ =
reinterpret_cast<PikaBinlogReceiverThread*>(worker_specific_data);
}
Expand All @@ -30,6 +31,19 @@ int PikaMasterConn::DealMessage(
return -2;
}

// Auth
if (is_first_send_) {
if (argv[0] == "auth") {
if (argv[1] == std::to_string(g_pika_server->sid())) {
is_first_send_ = false;
LOG(INFO) << "BinlogReceiverThread AccessHandle success, my sid: " << g_pika_server->sid() << " auth sid: " << argv[1];
return 0;
}
}
LOG(WARNING) << "BinlogReceiverThread AccessHandle failed";
return -2;
}

// TODO(shq) maybe monitor do not need these infomation
std::string server_id;
std::string binlog_info;
Expand Down
25 changes: 23 additions & 2 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <iterator>
#include <ctime>
#include <algorithm>
#include <sys/resource.h>

#include "slash/include/env.h"
#include "slash/include/rsync.h"
Expand Down Expand Up @@ -850,7 +851,8 @@ void PikaServer::DBSyncSendFile(const std::string& ip, int port) {
* BinlogSender
*/
Status PikaServer::AddBinlogSender(const std::string& ip, int64_t port,
uint32_t filenum, uint64_t con_offset) {
int64_t sid,
uint32_t filenum, uint64_t con_offset) {
// Sanity check
if (con_offset > logger_->file_size()) {
return Status::InvalidArgument("AddBinlogSender invalid binlog offset");
Expand Down Expand Up @@ -885,7 +887,7 @@ Status PikaServer::AddBinlogSender(const std::string& ip, int64_t port,
}

PikaBinlogSenderThread* sender = new PikaBinlogSenderThread(ip,
port + 1000, readfile, filenum, con_offset);
port + 1000, sid, readfile, filenum, con_offset);

if (sender->trim() == 0 // Error binlog
&& SetSlaveSender(ip, port, sender)) { // SlaveItem not exist
Expand Down Expand Up @@ -1720,6 +1722,25 @@ void PikaServer::ResetStat() {
statistic_data_.last_thread_querynum = 0;
}

void PikaServer::SetDispatchQueueLimit(int queue_limit) {
rlimit limit;
if (getrlimit(RLIMIT_NOFILE,&limit) == -1) {
LOG(WARNING) << "getrlimit error: " << strerror(errno);
} else if (limit.rlim_cur < static_cast<unsigned int>(g_pika_conf->maxclients() + PIKA_MIN_RESERVED_FDS)) {
rlim_t old_limit = limit.rlim_cur;
rlim_t best_limit = g_pika_conf->maxclients() + PIKA_MIN_RESERVED_FDS;
limit.rlim_cur = best_limit > limit.rlim_max ? limit.rlim_max-1 : best_limit;
limit.rlim_max = best_limit > limit.rlim_max ? limit.rlim_max-1 : best_limit;
if (setrlimit(RLIMIT_NOFILE,&limit) != -1) {
LOG(WARNING) << "your 'limit -n ' of " << old_limit << " is not enough for Redis to start. pika have successfully reconfig it to " << limit.rlim_cur;
} else {
LOG(FATAL) << "your 'limit -n ' of " << old_limit << " is not enough for Redis to start. pika can not reconfig it(" << strerror(errno) << "), do it by yourself";
}
}

pika_dispatch_thread_->SetQueueLimit(queue_limit);
}

uint64_t PikaServer::ServerCurrentQps() {
slash::ReadLock l(&statistic_data_.statistic_lock);
return statistic_data_.last_sec_thread_querynum;
Expand Down
1 change: 1 addition & 0 deletions src/pika_trysync_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ bool PikaTrysyncThread::RecvProc() {
slash::string2l(reply.data(), reply.size(), &sid_)) {
// Luckly, I got your point, the sync is comming
LOG(INFO) << "Recv sid from master: " << sid_;
g_pika_server->SetSid(sid_);
break;
}

Expand Down

0 comments on commit bcba759

Please sign in to comment.