Skip to content

Commit

Permalink
bugfix[OpenAtomFoundation#1345]:coredump when pika open file number e…
Browse files Browse the repository at this point in the history
…xceed ulimt (OpenAtomFoundation#1346)

* bugfix:coredump when pika open file number exceed ulimt OpenAtomFoundation#1345

* typo: delete redundance blank

---------

Co-authored-by: liuzhen3 <[email protected]>
  • Loading branch information
kernelai and liuzhen3 authored Mar 27, 2023
1 parent 0abbea5 commit 066ee16
Showing 1 changed file with 60 additions and 64 deletions.
124 changes: 60 additions & 64 deletions src/pika_binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@

#include "include/pika_binlog.h"

#include <sys/time.h>
#include <glog/logging.h>
#include <fcntl.h>

#include <glog/logging.h>
#include <sys/time.h>

#include "include/pika_binlog_transverter.h"

Expand All @@ -24,10 +23,7 @@ std::string NewFileName(const std::string name, const uint32_t current) {
* Version
*/
Version::Version(slash::RWFile *save)
: pro_num_(0),
pro_offset_(0),
logic_id_(0),
save_(save) {
: pro_num_(0), pro_offset_(0), logic_id_(0), save_(save) {
assert(save_ != NULL);

pthread_rwlock_init(&rwlock_, NULL);
Expand Down Expand Up @@ -66,21 +62,15 @@ Status Version::Init() {
/*
* Binlog
*/
Binlog::Binlog(const std::string& binlog_path, const int file_size) :
opened_(false),
version_(NULL),
queue_(NULL),
versionfile_(NULL),
pro_num_(0),
pool_(NULL),
exit_all_consume_(false),
binlog_path_(binlog_path),
file_size_(file_size),
binlog_io_error_(false) {
Binlog::Binlog(const std::string &binlog_path, const int file_size)
: opened_(false), version_(NULL), queue_(NULL), versionfile_(NULL),
pro_num_(0), pool_(NULL), exit_all_consume_(false),
binlog_path_(binlog_path), file_size_(file_size),
binlog_io_error_(false) {

// To intergrate with old version, we don't set mmap file size to 100M;
//slash::SetMmapBoundSize(file_size);
//slash::kMmapBoundSize = 1024 * 1024 * 100;
// slash::SetMmapBoundSize(file_size);
// slash::kMmapBoundSize = 1024 * 1024 * 100;

Status s;

Expand All @@ -99,7 +89,6 @@ Binlog::Binlog(const std::string& binlog_path, const int file_size) :
LOG(FATAL) << "Binlog: new " << filename_ << " " << s.ToString();
}


s = slash::NewRWFile(manifest, &versionfile_);
if (!s.ok()) {
LOG(FATAL) << "Binlog: new versionfile error " << s.ToString();
Expand All @@ -117,7 +106,7 @@ Binlog::Binlog(const std::string& binlog_path, const int file_size) :
pro_num_ = version_->pro_num_;

// Debug
//version_->debug();
// version_->debug();
} else {
LOG(FATAL) << "Binlog: open versionfile error";
}
Expand All @@ -126,7 +115,8 @@ Binlog::Binlog(const std::string& binlog_path, const int file_size) :
DLOG(INFO) << "Binlog: open profile " << profile;
s = slash::AppendWritableFile(profile, &queue_, version_->pro_offset_);
if (!s.ok()) {
LOG(FATAL) << "Binlog: Open file " << profile << " error " << s.ToString();
LOG(FATAL) << "Binlog: Open file " << profile << " error "
<< s.ToString();
}

uint64_t filesize = queue_->Filesize();
Expand Down Expand Up @@ -161,8 +151,8 @@ void Binlog::InitLogFile() {
opened_.store(true);
}

Status Binlog::GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset,
uint32_t* term, uint64_t* logic_id) {
Status Binlog::GetProducerStatus(uint32_t *filenum, uint64_t *pro_offset,
uint32_t *term, uint64_t *logic_id) {
if (!opened_.load()) {
return Status::Busy("Binlog is not open yet");
}
Expand Down Expand Up @@ -194,19 +184,22 @@ Status Binlog::Put(const std::string &item) {
}

// Note: mutex lock should be held
Status Binlog::Put(const char* item, int len) {
Status Binlog::Put(const char *item, int len) {
Status s;

/* Check to roll log file */
uint64_t filesize = queue_->Filesize();
if (filesize > file_size_) {
slash::WritableFile * queue = nullptr;
std::string profile = NewFileName(filename_, pro_num_ + 1);
s = slash::NewWritableFile(profile, &queue);
if (!s.ok()) {
LOG(ERROR) << "Binlog: new " << filename_ << " " << s.ToString();
return s;
}
delete queue_;
queue_ = NULL;

queue_ = queue;
pro_num_++;
std::string profile = NewFileName(filename_, pro_num_);
slash::NewWritableFile(profile, &queue_);

{
slash::RWLock l(&(version_->rwlock_), true);
version_->pro_offset_ = 0;
Expand All @@ -227,38 +220,39 @@ Status Binlog::Put(const char* item, int len) {

return s;
}

Status Binlog::EmitPhysicalRecord(RecordType t, const char *ptr, size_t n, int *temp_pro_offset) {
Status s;
assert(n <= 0xffffff);
assert(block_offset_ + kHeaderSize + n <= kBlockSize);

char buf[kHeaderSize];

uint64_t now;
struct timeval tv;
gettimeofday(&tv, NULL);
now = tv.tv_sec;
buf[0] = static_cast<char>(n & 0xff);
buf[1] = static_cast<char>((n & 0xff00) >> 8);
buf[2] = static_cast<char>(n >> 16);
buf[3] = static_cast<char>(now & 0xff);
buf[4] = static_cast<char>((now & 0xff00) >> 8);
buf[5] = static_cast<char>((now & 0xff0000) >> 16);
buf[6] = static_cast<char>((now & 0xff000000) >> 24);
buf[7] = static_cast<char>(t);

s = queue_->Append(Slice(buf, kHeaderSize));

Status Binlog::EmitPhysicalRecord(RecordType t, const char *ptr, size_t n,
int *temp_pro_offset) {
Status s;
assert(n <= 0xffffff);
assert(block_offset_ + kHeaderSize + n <= kBlockSize);

char buf[kHeaderSize];

uint64_t now;
struct timeval tv;
gettimeofday(&tv, NULL);
now = tv.tv_sec;
buf[0] = static_cast<char>(n & 0xff);
buf[1] = static_cast<char>((n & 0xff00) >> 8);
buf[2] = static_cast<char>(n >> 16);
buf[3] = static_cast<char>(now & 0xff);
buf[4] = static_cast<char>((now & 0xff00) >> 8);
buf[5] = static_cast<char>((now & 0xff0000) >> 16);
buf[6] = static_cast<char>((now & 0xff000000) >> 24);
buf[7] = static_cast<char>(t);

s = queue_->Append(Slice(buf, kHeaderSize));
if (s.ok()) {
s = queue_->Append(Slice(ptr, n));
if (s.ok()) {
s = queue_->Append(Slice(ptr, n));
if (s.ok()) {
s = queue_->Flush();
}
s = queue_->Flush();
}
block_offset_ += static_cast<int>(kHeaderSize + n);
}
block_offset_ += static_cast<int>(kHeaderSize + n);

*temp_pro_offset += kHeaderSize + n;
return s;
*temp_pro_offset += kHeaderSize + n;
return s;
}

Status Binlog::Produce(const Slice &item, int *temp_pro_offset) {
Expand Down Expand Up @@ -304,8 +298,8 @@ Status Binlog::Produce(const Slice &item, int *temp_pro_offset) {

return s;
}
Status Binlog::AppendPadding(slash::WritableFile* file, uint64_t* len) {

Status Binlog::AppendPadding(slash::WritableFile *file, uint64_t *len) {
if (*len < kHeaderSize) {
return Status::OK();
}
Expand Down Expand Up @@ -346,12 +340,14 @@ Status Binlog::AppendPadding(slash::WritableFile* file, uint64_t* len) {
}
*len -= left;
if (left != 0) {
LOG(WARNING) << "AppendPadding left bytes: " << left << " is less then kHeaderSize";
LOG(WARNING) << "AppendPadding left bytes: " << left
<< " is less then kHeaderSize";
}
return s;
}

Status Binlog::SetProducerStatus(uint32_t pro_num, uint64_t pro_offset, uint32_t term, uint64_t index) {
Status Binlog::SetProducerStatus(uint32_t pro_num, uint64_t pro_offset,
uint32_t term, uint64_t index) {
if (!opened_.load()) {
return Status::Busy("Binlog is not open yet");
}
Expand Down

0 comments on commit 066ee16

Please sign in to comment.