From bdc51c6cc64debd64f3a182be8e35d9216e5b419 Mon Sep 17 00:00:00 2001 From: liuzhen3 Date: Fri, 24 Mar 2023 09:43:03 +0000 Subject: [PATCH 1/2] bugfix:coredump when pika open file number exceed ulimt #1345 --- src/pika_binlog.cc | 132 ++++++++++++++++++++++----------------------- 1 file changed, 64 insertions(+), 68 deletions(-) diff --git a/src/pika_binlog.cc b/src/pika_binlog.cc index f1f7efce89..a810858fba 100644 --- a/src/pika_binlog.cc +++ b/src/pika_binlog.cc @@ -5,10 +5,9 @@ #include "include/pika_binlog.h" -#include -#include #include - +#include +#include #include "include/pika_binlog_transverter.h" @@ -24,10 +23,7 @@ std::string NewFileName(const std::string name, const uint32_t current) { * Version */ Version::Version(slash::RWFile *save) - : pro_num_(0), - pro_offset_(0), - logic_id_(0), - save_(save) { + : pro_num_(0), pro_offset_(0), logic_id_(0), save_(save) { assert(save_ != NULL); pthread_rwlock_init(&rwlock_, NULL); @@ -53,10 +49,10 @@ Status Version::StableSave() { Status Version::Init() { Status s; if (save_->GetData() != NULL) { - memcpy((char*)(&pro_num_), save_->GetData(), sizeof(uint32_t)); - memcpy((char*)(&pro_offset_), save_->GetData() + 4, sizeof(uint64_t)); - memcpy((char*)(&logic_id_), save_->GetData() + 12, sizeof(uint64_t)); - memcpy((char*)(&term_), save_->GetData() + 20, sizeof(uint32_t)); + memcpy((char *)(&pro_num_), save_->GetData(), sizeof(uint32_t)); + memcpy((char *)(&pro_offset_), save_->GetData() + 4, sizeof(uint64_t)); + memcpy((char *)(&logic_id_), save_->GetData() + 12, sizeof(uint64_t)); + memcpy((char *)(&term_), save_->GetData() + 20, sizeof(uint32_t)); return Status::OK(); } else { return Status::Corruption("version init error"); @@ -66,21 +62,15 @@ Status Version::Init() { /* * Binlog */ -Binlog::Binlog(const std::string& binlog_path, const int file_size) : - opened_(false), - version_(NULL), - queue_(NULL), - versionfile_(NULL), - pro_num_(0), - pool_(NULL), - exit_all_consume_(false), - binlog_path_(binlog_path), - file_size_(file_size), - binlog_io_error_(false) { +Binlog::Binlog(const std::string &binlog_path, const int file_size) + : opened_(false), version_(NULL), queue_(NULL), versionfile_(NULL), + pro_num_(0), pool_(NULL), exit_all_consume_(false), + binlog_path_(binlog_path), file_size_(file_size), + binlog_io_error_(false) { // To intergrate with old version, we don't set mmap file size to 100M; - //slash::SetMmapBoundSize(file_size); - //slash::kMmapBoundSize = 1024 * 1024 * 100; + // slash::SetMmapBoundSize(file_size); + // slash::kMmapBoundSize = 1024 * 1024 * 100; Status s; @@ -99,7 +89,6 @@ Binlog::Binlog(const std::string& binlog_path, const int file_size) : LOG(FATAL) << "Binlog: new " << filename_ << " " << s.ToString(); } - s = slash::NewRWFile(manifest, &versionfile_); if (!s.ok()) { LOG(FATAL) << "Binlog: new versionfile error " << s.ToString(); @@ -117,7 +106,7 @@ Binlog::Binlog(const std::string& binlog_path, const int file_size) : pro_num_ = version_->pro_num_; // Debug - //version_->debug(); + // version_->debug(); } else { LOG(FATAL) << "Binlog: open versionfile error"; } @@ -126,7 +115,8 @@ Binlog::Binlog(const std::string& binlog_path, const int file_size) : DLOG(INFO) << "Binlog: open profile " << profile; s = slash::AppendWritableFile(profile, &queue_, version_->pro_offset_); if (!s.ok()) { - LOG(FATAL) << "Binlog: Open file " << profile << " error " << s.ToString(); + LOG(FATAL) << "Binlog: Open file " << profile << " error " + << s.ToString(); } uint64_t filesize = queue_->Filesize(); @@ -161,8 +151,8 @@ void Binlog::InitLogFile() { opened_.store(true); } -Status Binlog::GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, - uint32_t* term, uint64_t* logic_id) { +Status Binlog::GetProducerStatus(uint32_t *filenum, uint64_t *pro_offset, + uint32_t *term, uint64_t *logic_id) { if (!opened_.load()) { return Status::Busy("Binlog is not open yet"); } @@ -194,19 +184,22 @@ Status Binlog::Put(const std::string &item) { } // Note: mutex lock should be held -Status Binlog::Put(const char* item, int len) { +Status Binlog::Put(const char *item, int len) { Status s; /* Check to roll log file */ uint64_t filesize = queue_->Filesize(); if (filesize > file_size_) { + slash::WritableFile * queue = nullptr; + std::string profile = NewFileName(filename_, pro_num_ + 1); + s = slash::NewWritableFile(profile, &queue); + if (!s.ok()) { + LOG(ERROR) << "Binlog: new " << filename_ << " " << s.ToString(); + return s; + } delete queue_; - queue_ = NULL; - + queue_ = queue; pro_num_++; - std::string profile = NewFileName(filename_, pro_num_); - slash::NewWritableFile(profile, &queue_); - { slash::RWLock l(&(version_->rwlock_), true); version_->pro_offset_ = 0; @@ -227,38 +220,39 @@ Status Binlog::Put(const char* item, int len) { return s; } - -Status Binlog::EmitPhysicalRecord(RecordType t, const char *ptr, size_t n, int *temp_pro_offset) { - Status s; - assert(n <= 0xffffff); - assert(block_offset_ + kHeaderSize + n <= kBlockSize); - - char buf[kHeaderSize]; - - uint64_t now; - struct timeval tv; - gettimeofday(&tv, NULL); - now = tv.tv_sec; - buf[0] = static_cast(n & 0xff); - buf[1] = static_cast((n & 0xff00) >> 8); - buf[2] = static_cast(n >> 16); - buf[3] = static_cast(now & 0xff); - buf[4] = static_cast((now & 0xff00) >> 8); - buf[5] = static_cast((now & 0xff0000) >> 16); - buf[6] = static_cast((now & 0xff000000) >> 24); - buf[7] = static_cast(t); - - s = queue_->Append(Slice(buf, kHeaderSize)); + +Status Binlog::EmitPhysicalRecord(RecordType t, const char *ptr, size_t n, + int *temp_pro_offset) { + Status s; + assert(n <= 0xffffff); + assert(block_offset_ + kHeaderSize + n <= kBlockSize); + + char buf[kHeaderSize]; + + uint64_t now; + struct timeval tv; + gettimeofday(&tv, NULL); + now = tv.tv_sec; + buf[0] = static_cast(n & 0xff); + buf[1] = static_cast((n & 0xff00) >> 8); + buf[2] = static_cast(n >> 16); + buf[3] = static_cast(now & 0xff); + buf[4] = static_cast((now & 0xff00) >> 8); + buf[5] = static_cast((now & 0xff0000) >> 16); + buf[6] = static_cast((now & 0xff000000) >> 24); + buf[7] = static_cast(t); + + s = queue_->Append(Slice(buf, kHeaderSize)); + if (s.ok()) { + s = queue_->Append(Slice(ptr, n)); if (s.ok()) { - s = queue_->Append(Slice(ptr, n)); - if (s.ok()) { - s = queue_->Flush(); - } + s = queue_->Flush(); } - block_offset_ += static_cast(kHeaderSize + n); + } + block_offset_ += static_cast(kHeaderSize + n); - *temp_pro_offset += kHeaderSize + n; - return s; + *temp_pro_offset += kHeaderSize + n; + return s; } Status Binlog::Produce(const Slice &item, int *temp_pro_offset) { @@ -304,8 +298,8 @@ Status Binlog::Produce(const Slice &item, int *temp_pro_offset) { return s; } - -Status Binlog::AppendPadding(slash::WritableFile* file, uint64_t* len) { + +Status Binlog::AppendPadding(slash::WritableFile *file, uint64_t *len) { if (*len < kHeaderSize) { return Status::OK(); } @@ -346,12 +340,14 @@ Status Binlog::AppendPadding(slash::WritableFile* file, uint64_t* len) { } *len -= left; if (left != 0) { - LOG(WARNING) << "AppendPadding left bytes: " << left << " is less then kHeaderSize"; + LOG(WARNING) << "AppendPadding left bytes: " << left + << " is less then kHeaderSize"; } return s; } -Status Binlog::SetProducerStatus(uint32_t pro_num, uint64_t pro_offset, uint32_t term, uint64_t index) { +Status Binlog::SetProducerStatus(uint32_t pro_num, uint64_t pro_offset, + uint32_t term, uint64_t index) { if (!opened_.load()) { return Status::Busy("Binlog is not open yet"); } From 046d5120d6ff0265b17ed55d09b9b653f2679280 Mon Sep 17 00:00:00 2001 From: liuzhen3 Date: Mon, 27 Mar 2023 02:07:14 +0000 Subject: [PATCH 2/2] typo: delete redundance blank --- src/pika_binlog.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/pika_binlog.cc b/src/pika_binlog.cc index a810858fba..798edd5e4f 100644 --- a/src/pika_binlog.cc +++ b/src/pika_binlog.cc @@ -49,10 +49,10 @@ Status Version::StableSave() { Status Version::Init() { Status s; if (save_->GetData() != NULL) { - memcpy((char *)(&pro_num_), save_->GetData(), sizeof(uint32_t)); - memcpy((char *)(&pro_offset_), save_->GetData() + 4, sizeof(uint64_t)); - memcpy((char *)(&logic_id_), save_->GetData() + 12, sizeof(uint64_t)); - memcpy((char *)(&term_), save_->GetData() + 20, sizeof(uint32_t)); + memcpy((char*)(&pro_num_), save_->GetData(), sizeof(uint32_t)); + memcpy((char*)(&pro_offset_), save_->GetData() + 4, sizeof(uint64_t)); + memcpy((char*)(&logic_id_), save_->GetData() + 12, sizeof(uint64_t)); + memcpy((char*)(&term_), save_->GetData() + 20, sizeof(uint32_t)); return Status::OK(); } else { return Status::Corruption("version init error");