Skip to content

Commit

Permalink
Fix microsecond handling in Apsara log parser (#1309)
Browse files Browse the repository at this point in the history
The parser previously handled microseconds incorrectly when using the
time cache. Now only the second-level time is retrieved from the
cache, and the microsecond part is always parsed from each log timestamp.

Additionally, a bounds check has been added to prevent out-of-range array access,
ensuring reliable operation.
  • Loading branch information
yyuuttaaoo authored Jan 3, 2024
1 parent 25b774d commit 6a3851a
Show file tree
Hide file tree
Showing 7 changed files with 394 additions and 65 deletions.
76 changes: 39 additions & 37 deletions core/processor/ProcessorParseApsaraNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ void ProcessorParseApsaraNative::Process(PipelineEventGroup& logGroup) {
const StringView& logPath = logGroup.GetMetadata(EventGroupMetaKey::LOG_FILE_PATH_RESOLVED);
EventsContainer& events = logGroup.MutableEvents();
StringView timeStrCache;
LogtailTime lastLogTime;
LogtailTime cachedLogTime;
// works good normally. poor performance if most data need to be discarded.
for (auto it = events.begin(); it != events.end();) {
if (ProcessEvent(logPath, *it, lastLogTime, timeStrCache)) {
if (ProcessEvent(logPath, *it, cachedLogTime, timeStrCache)) {
++it;
} else {
it = events.erase(it);
Expand All @@ -76,13 +76,13 @@ void ProcessorParseApsaraNative::Process(PipelineEventGroup& logGroup) {
* 处理单个日志事件。
* @param logPath - 日志文件的路径。
* @param e - 指向待处理日志事件的智能指针。
* @param lastLogTime - 上一条日志的时间戳(秒)。
* @param cachedLogTime - timestamp (seconds) of the cached time string; updated together with timeStrCache.
* @param timeStrCache - 缓存时间字符串,用于比较和更新。
* @return 如果事件被处理且保留,则返回true,如果事件被丢弃,则返回false。
*/
bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath,
PipelineEventPtr& e,
LogtailTime& lastLogTime,
LogtailTime& cachedLogTime,
StringView& timeStrCache) {
if (!IsSupportedEvent(e)) {
return true;
Expand All @@ -97,7 +97,7 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath,
}
mProcParseInSizeBytes->Add(buffer.size());
int64_t logTime_in_micro = 0;
time_t logTime = ApsaraEasyReadLogTimeParser(buffer, timeStrCache, lastLogTime, logTime_in_micro);
time_t logTime = ApsaraEasyReadLogTimeParser(buffer, timeStrCache, cachedLogTime, logTime_in_micro);
if (logTime <= 0) // this case will handle empty apsara log line
{
StringView bufOut(buffer);
Expand Down Expand Up @@ -210,18 +210,19 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath,
/*
* 解析Apsara格式日志的时间。
* @param buffer - 包含日志数据的字符串视图。
* @param timeStr - 解析后的时间字符串
* @param lastLogTime - 上一条日志的时间戳(秒)。
* @param cachedTimeStr - 缓存的时间字符串
* @param cachedLogTime - 缓存的时间字符串的时间戳(秒),必须与cachedTimeStr同时修改
* @param microTime - 解析出的微秒时间戳。
* @return 解析出的时间戳(秒),如果解析失败,则返回0。
*/
time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffer,
StringView& timeStr,
LogtailTime& lastLogTime,
StringView& cachedTimeStr,
LogtailTime& cachedLogTime,
int64_t& microTime) {
if (buffer[0] != '[') {
return 0;
}
LogtailTime logTime = {};
if (buffer[1] == '1') // for normal time, e.g 1378882630, starts with '1'
{
int nanosecondLength = 0;
Expand All @@ -232,13 +233,13 @@ time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffe
}
// strTime is the content between '[' and ']' and ends with '\0'
std::string strTime = buffer.substr(1, pos).to_string();
auto strptimeResult = Strptime(strTime.c_str(), "%s", &lastLogTime, nanosecondLength);
auto strptimeResult = Strptime(strTime.c_str(), "%s", &logTime, nanosecondLength);
if (NULL == strptimeResult || strptimeResult[0] != ']') {
LOG_WARNING(sLogger, ("parse apsara log time", "fail")("string", buffer)("timeformat", "%s"));
return 0;
}
microTime = (int64_t)lastLogTime.tv_sec * 1000000 + lastLogTime.tv_nsec / 1000;
return lastLogTime.tv_sec;
microTime = (int64_t)logTime.tv_sec * 1000000 + logTime.tv_nsec / 1000;
return logTime.tv_sec;
}
// test other date format case
{
Expand All @@ -249,37 +250,46 @@ time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffe
}
// strTime is the content between '[' and ']' and ends with '\0'
std::string strTime = buffer.substr(1, pos).to_string();
if (IsPrefixString(strTime.c_str(), timeStr) == true) {
microTime = (int64_t)lastLogTime.tv_sec * 1000000 + lastLogTime.tv_nsec / 1000;
return lastLogTime.tv_sec;
}
struct tm tm;
memset(&tm, 0, sizeof(tm));
int nanosecondLength = 0;
if (IsPrefixString(strTime, cachedTimeStr) == true) {
if (strTime.size() > cachedTimeStr.size()) {
auto strptimeResult
= Strptime(strTime.c_str() + cachedTimeStr.size() + 1, "%f", &logTime, nanosecondLength);
if (NULL == strptimeResult) {
LOG_WARNING(sLogger,
("parse apsara log time microsecond",
"fail")("string", buffer)("timeformat", "%Y-%m-%d %H:%M:%S.%f"));
}
}
microTime = (int64_t)cachedLogTime.tv_sec * 1000000 + logTime.tv_nsec / 1000;
return cachedLogTime.tv_sec;
}
// parse second part
auto strptimeResult = Strptime(strTime.c_str(), "%Y-%m-%d %H:%M:%S", &lastLogTime, nanosecondLength);
auto strptimeResult = Strptime(strTime.c_str(), "%Y-%m-%d %H:%M:%S", &logTime, nanosecondLength);
if (NULL == strptimeResult) {
LOG_WARNING(sLogger,
("parse apsara log time", "fail")("string", buffer)("timeformat", "%Y-%m-%d %H:%M:%S"));
return 0;
}
// parse nanosecond part (optional)
if (*strptimeResult != '\0') {
strptimeResult = Strptime(strptimeResult + 1, "%f", &lastLogTime, nanosecondLength);
strptimeResult = Strptime(strptimeResult + 1, "%f", &logTime, nanosecondLength);
if (NULL == strptimeResult) {
LOG_WARNING(sLogger,
("parse apsara log time", "fail")("string", buffer)("timeformat", "%Y-%m-%d %H:%M:%S.%f"));
("parse apsara log time microsecond", "fail")("string", buffer)("timeformat",
"%Y-%m-%d %H:%M:%S.%f"));
}
}
logTime.tv_sec = logTime.tv_sec - mLogTimeZoneOffsetSecond;
microTime = (int64_t)logTime.tv_sec * 1000000 + logTime.tv_nsec / 1000;
// if the time is valid (strptime not return NULL), the date value size must be 19 ,like '2013-09-11 03:11:05'
timeStr = StringView(buffer.data() + 1, 19);
lastLogTime.tv_sec = lastLogTime.tv_sec - mLogTimeZoneOffsetSecond;
microTime = (int64_t)lastLogTime.tv_sec * 1000000 + lastLogTime.tv_nsec / 1000;
cachedTimeStr = StringView(buffer.data() + 1, 19);
cachedLogTime = logTime;
// TODO: deprecated
if (!mAdjustApsaraMicroTimezone) {
microTime = (int64_t)microTime + (int64_t)mLogTimeZoneOffsetSecond * (int64_t)1000000;
}
return lastLogTime.tv_sec;
return logTime.tv_sec;
}
}

Expand All @@ -289,16 +299,8 @@ time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffe
* @param prefix - 要检查的前缀。
* @return 如果字符串以指定前缀开头,则返回true;否则返回false。
*/
bool ProcessorParseApsaraNative::IsPrefixString(const char* all, const StringView& prefix) {
if (prefix.size() == 0)
return false;
for (size_t i = 0; i < prefix.size(); ++i) {
if (all[i] == '\0')
return false;
if (all[i] != prefix[i])
return false;
}
return true;
bool ProcessorParseApsaraNative::IsPrefixString(const std::string& all, const StringView& prefix) {
return !prefix.empty() && std::equal(prefix.begin(), prefix.end(), all.begin());
}

/*
Expand All @@ -314,14 +316,14 @@ static int32_t FindBaseFields(const StringView& buffer, int32_t beginIndexArray[
if (buffer[i] == '[') {
beginIndexArray[baseFieldNum] = i + 1;
} else if (buffer[i] == ']') {
if (buffer[i + 1] == '\t' || buffer[i + 1] == '\n') {
if (i + 1 == buffer.size() || buffer[i + 1] == '\t' || buffer[i + 1] == '\n') {
endIndexArray[baseFieldNum] = i;
baseFieldNum++;
}
if (baseFieldNum >= LogParser::MAX_BASE_FIELD_NUM) {
break;
}
if (buffer[i + 1] == '\t' && buffer[i + 2] != '[') {
if (buffer[i + 1] == '\t' && (i + 2 == buffer.size() || buffer[i + 2] != '[')) {
break;
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/processor/ProcessorParseApsaraNative.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ class ProcessorParseApsaraNative : public Processor {
private:
bool ProcessEvent(const StringView& logPath, PipelineEventPtr& e, LogtailTime& lastLogTime, StringView& timeStrCache);
void AddLog(const StringView& key, const StringView& value, LogEvent& targetEvent);
time_t ApsaraEasyReadLogTimeParser(StringView& buffer, StringView& timeStr, LogtailTime& lastLogTime, int64_t& microTime);
int32_t GetApsaraLogMicroTime(StringView& buffer);
bool IsPrefixString(const char* all, const StringView& prefix);
time_t
ApsaraEasyReadLogTimeParser(StringView& buffer, StringView& timeStr, LogtailTime& lastLogTime, int64_t& microTime);
bool IsPrefixString(const std::string& all, const StringView& prefix);
int32_t ParseApsaraBaseFields(const StringView& buffer, LogEvent& sourceEvent);

std::string mSourceKey;
Expand Down
2 changes: 1 addition & 1 deletion core/reader/SourceBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class SourceBuffer {
virtual ~SourceBuffer() {}
StringBuffer AllocateStringBuffer(size_t size) {
if (!mAllocator.IsInited()) {
if (!mAllocator.Init(size * 1.2)) {
if (!mAllocator.Init(std::max(4096UL, size_t(size * 1.2)))) {
return StringBuffer(nullptr, 0);
}; // TODO: better allocate strategy
}
Expand Down
13 changes: 7 additions & 6 deletions core/unittest/processor/LogFilterUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,14 +433,15 @@ class LogFilterUnittest : public ::testing::Test {
file << config;
file.close();
APSARA_TEST_TRUE_DESC(filterPtr->InitFilter("logtail_filter.json"), "Can't parse the filter config");
APSARA_TEST_EQUAL_DESC(filterPtr->mFilters.size(), 2, "The filter size should be 2");
APSARA_TEST_EQUAL_DESC(filterPtr->mFilters.size(), 2UL, "The filter size should be 2");
APSARA_TEST_TRUE_DESC(filterPtr->mFilters.find("123_proj_test") != filterPtr->mFilters.end(),
"The 123_proj_test should be a key of the filter");
APSARA_TEST_TRUE_DESC(filterPtr->mFilters.find("456_proj_test_1") != filterPtr->mFilters.end(),
"The 456_test_1 should be a key of the filter");
LogFilterRule* filterRulePtr = filterPtr->mFilters["123_proj_test"];
APSARA_TEST_EQUAL_DESC(filterRulePtr->FilterKeys.size(), 2, "The filter keys size should be 2");
APSARA_TEST_EQUAL_DESC(filterRulePtr->FilterRegs.size(), 2, "The filter regs size should be 2");
APSARA_TEST_EQUAL_DESC(filterRulePtr->FilterKeys.size(), 2UL, "The filter keys size should be 2");
APSARA_TEST_EQUAL_DESC(filterRulePtr->FilterRegs.size(), 2UL, "The filter regs size should be 2");
APSARA_TEST_EQUAL_DESC(filterRulePtr->FilterRegs.size(), 2UL, "The filter regs size should be 2");
APSARA_TEST_EQUAL_DESC(filterRulePtr->FilterKeys[0], "key1", "The filter key should be key1");
APSARA_TEST_EQUAL_DESC(filterRulePtr->FilterKeys[1], "key2", "The filter key should be key2");
APSARA_TEST_EQUAL_DESC(filterRulePtr->FilterRegs[0], regex("value1"), "The filter reg should be value1");
Expand Down Expand Up @@ -543,7 +544,7 @@ class LogFilterUnittest : public ::testing::Test {
logContentPtr->set_value("value2xxxxx");

indexs = filterPtr->Filter("999_proj", "", logGroup);
APSARA_TEST_EQUAL_DESC(indexs.size(), 2, "the filtered size should be 2");
APSARA_TEST_EQUAL_DESC(indexs.size(), 2UL, "the filtered size should be 2");
APSARA_TEST_EQUAL_DESC(indexs[0], 0, "the first item should be contained");
APSARA_TEST_EQUAL_DESC(indexs[1], 1, "the second item should be contained");
logGroup.Clear();
Expand Down Expand Up @@ -652,7 +653,7 @@ class LogFilterUnittest : public ::testing::Test {
if (log.find("key") != string::npos)
lines.push_back(log);
}
APSARA_TEST_EQUAL(lines.size(), 6);
APSARA_TEST_EQUAL(lines.size(), 6UL);
if (lines.size() == 6) {
for (int i = 0; i < 2; ++i) {
APSARA_TEST_TRUE_DESC(lines[i].find("value1") != string::npos, lines[i]);
Expand Down Expand Up @@ -1142,7 +1143,7 @@ class LogFilterUnittest : public ::testing::Test {
static LogFilter* filter = LogFilter::Instance();
std::vector<int32_t> index = filter->Filter(
logGroup, root, LogGroupContext("default_region", "ant-test-project", "test-logstore"));
APSARA_TEST_EQUAL_FATAL(index.size(), 1);
APSARA_TEST_EQUAL_FATAL(index.size(), 1UL);
APSARA_TEST_EQUAL(index[0], 1);
}

Expand Down
2 changes: 1 addition & 1 deletion core/unittest/processor/ProcessorFilterNativeUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ void ProcessorFilterNativeUnittest::TestBaseFilter() {
})";
APSARA_TEST_STREQ_FATAL(CompactJson(expectJson).c_str(), CompactJson(outJson).c_str());

APSARA_TEST_EQUAL_FATAL(2, processor.mProcFilterRecordsTotal->GetValue());
APSARA_TEST_EQUAL_FATAL(2UL, processor.mProcFilterRecordsTotal->GetValue());
}
{
const char* jsonStr = "{\n"
Expand Down
Loading

0 comments on commit 6a3851a

Please sign in to comment.