diff --git a/core/reader/LogFileReader.cpp b/core/reader/LogFileReader.cpp index eca81002f1..97b621c8e3 100644 --- a/core/reader/LogFileReader.cpp +++ b/core/reader/LogFileReader.cpp @@ -1650,6 +1650,7 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, logBuffer.readOffset = mLastFilePos; --nbytes; } + mLastForceRead = !allowRollback; mCache.clear(); moreData = false; } else { @@ -1662,7 +1663,8 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, if (READ_BYTE < lastCacheSize) { READ_BYTE = lastCacheSize; // this should not happen, just avoid READ_BYTE >= 0 theoratically } - StringBuffer stringMemory = logBuffer.sourcebuffer->AllocateStringBuffer(READ_BYTE); // allocate modifiable buffer + StringBuffer stringMemory + = logBuffer.sourcebuffer->AllocateStringBuffer(READ_BYTE); // allocate modifiable buffer if (lastCacheSize) { READ_BYTE -= lastCacheSize; // reserve space to copy from cache if needed } @@ -1687,6 +1689,7 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, logBuffer.readOffset = mLastFilePos; --nbytes; } + mLastForceRead = !allowRollback; const size_t stringBufferLen = nbytes; logBuffer.truncateInfo.reset(truncateInfo); lastReadPos = mLastFilePos + nbytes; // this doesn't seem right when ulogfs is used and a hole is skipped @@ -1737,9 +1740,10 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, // cache is sealed, nbytes should no change any more size_t stringLen = nbytes; - if (stringBuffer[stringLen - 1] == '\n' - || stringBuffer[stringLen - 1] - == '\0') { // \0 is for json, such behavior make ilogtail not able to collect binary log + if (stringLen > 0 + && (stringBuffer[stringLen - 1] == '\n' + || stringBuffer[stringLen - 1] + == '\0')) { // \0 is for json, such behavior make ilogtail not able to collect binary log --stringLen; } stringBuffer[stringLen] = '\0'; @@ -1749,7 +1753,6 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, setExactlyOnceCheckpointAfterRead(nbytes); mLastFilePos += nbytes; - mLastForceRead = !allowRollback; LOG_DEBUG(sLogger, ("read size", nbytes)("last file pos", mLastFilePos)); } @@ -1774,6 +1777,7 @@ void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, b logBuffer.readOffset = mLastFilePos; --readCharCount; } + mLastForceRead = !allowRollback; lastReadPos = mLastFilePos + readCharCount; originReadCount = readCharCount; moreData = false; @@ -1806,6 +1810,7 @@ void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, b ++mLastFilePos; logBuffer.readOffset = mLastFilePos; } + mLastForceRead = !allowRollback; logBuffer.truncateInfo.reset(truncateInfo); lastReadPos = mLastFilePos + readCharCount; originReadCount = readCharCount; @@ -1887,9 +1892,10 @@ void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, b } // cache is sealed, readCharCount should not change any more size_t stringLen = resultCharCount; - if (stringBuffer[stringLen - 1] == '\n' - || stringBuffer[stringLen - 1] - == '\0') { // \0 is for json, such behavior make ilogtail not able to collect binary log + if (stringLen > 0 + && (stringBuffer[stringLen - 1] == '\n' + || stringBuffer[stringLen - 1] + == '\0')) { // \0 is for json, such behavior make ilogtail not able to collect binary log --stringLen; } stringBuffer[stringLen] = '\0'; @@ -1911,7 +1917,6 @@ void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, b LogtailAlarm::GetInstance()->SendAlarm( SPLIT_LOG_FAIL_ALARM, oss.str(), GetProject(), GetLogstore(), GetRegion()); } - mLastForceRead = !allowRollback; LOG_DEBUG(sLogger, ("read gbk buffer, offset", mLastFilePos)("origin read", originReadCount)("at last read", readCharCount)); } @@ -2032,7 +2037,7 @@ LogFileReader::FileCompareResult LogFileReader::CompareToFile(const string& file */ int32_t LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& rollbackLineFeedCount, bool allowRollback) { - if (!allowRollback) { + if (!allowRollback || size == 0) { return size; } int32_t endPs; // the position of \n or \0 @@ -2087,7 +2092,7 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll */ StringView LogFileReader::GetLastLine(StringView buffer, size_t end) { if (end == 0) { - return buffer; + return StringView(buffer.data(), 0); } for (size_t begin = end; begin > 0; --begin) { diff --git a/core/reader/LogFileReader.h b/core/reader/LogFileReader.h index 7c31ffb5f3..2eda081a28 100644 --- a/core/reader/LogFileReader.h +++ b/core/reader/LogFileReader.h @@ -591,6 +591,7 @@ class LogFileReader { friend class LogSplitNoDiscardUnmatchUnittest; friend class RemoveLastIncompleteLogMultilineUnittest; friend class LogFileReaderCheckpointUnittest; + friend class GetLastLineUnittest; protected: void UpdateReaderManual(); diff --git a/core/unittest/reader/CMakeLists.txt b/core/unittest/reader/CMakeLists.txt index 22370e9c3a..8c51ed375e 100644 --- a/core/unittest/reader/CMakeLists.txt +++ b/core/unittest/reader/CMakeLists.txt @@ -24,8 +24,8 @@ target_link_libraries(file_reader_options_unittest unittest_base) add_executable(json_log_file_reader_unittest JsonLogFileReaderUnittest.cpp) target_link_libraries(json_log_file_reader_unittest unittest_base) -add_executable(last_matched_line_unittest RemoveLastIncompleteLogUnittest.cpp) -target_link_libraries(last_matched_line_unittest unittest_base) +add_executable(remove_last_incomplete_log_unittest RemoveLastIncompleteLogUnittest.cpp) +target_link_libraries(remove_last_incomplete_log_unittest unittest_base) add_executable(log_file_reader_unittest LogFileReaderUnittest.cpp) target_link_libraries(log_file_reader_unittest unittest_base) @@ -45,6 +45,6 @@ include(GoogleTest) gtest_discover_tests(log_file_reader_deleted_file_unittest) gtest_discover_tests(file_reader_options_unittest) gtest_discover_tests(json_log_file_reader_unittest) -gtest_discover_tests(last_matched_line_unittest) +gtest_discover_tests(remove_last_incomplete_log_unittest) gtest_discover_tests(log_file_reader_unittest) gtest_discover_tests(source_buffer_unittest) diff --git a/core/unittest/reader/LogFileReaderUnittest.cpp b/core/unittest/reader/LogFileReaderUnittest.cpp index 492eca2db7..a37df6956e 100644 --- a/core/unittest/reader/LogFileReaderUnittest.cpp +++ b/core/unittest/reader/LogFileReaderUnittest.cpp @@ -12,16 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "unittest/Unittest.h" #include + #include + #include "checkpoint/CheckPointManager.h" -#include "reader/LogFileReader.h" -#include "common/memory/SourceBuffer.h" -#include "common/RuntimeUtil.h" #include "common/FileSystemUtil.h" -#include "log_pb/sls_logs.pb.h" +#include "common/RuntimeUtil.h" +#include "common/memory/SourceBuffer.h" #include "file_server/FileServer.h" +#include "log_pb/sls_logs.pb.h" +#include "reader/LogFileReader.h" +#include "unittest/Unittest.h" DECLARE_FLAG_INT32(force_release_deleted_file_fd_timeout); @@ -237,6 +239,55 @@ void LogFileReaderUnittest::TestReadGBK() { APSARA_TEST_FALSE_FATAL(moreData); APSARA_TEST_STREQ_FATAL(NULL, logBuffer.rawBuffer.data()); } + { // force read + \n, which case read bytes is 0 + Json::Value config; + config["StartPattern"] = "iLogtail.*"; + MultilineOptions multilineOpts; + multilineOpts.Init(config, ctx, ""); + FileReaderOptions readerOpts; + readerOpts.mFileEncoding = FileReaderOptions::Encoding::GBK; + LogFileReader reader( + logPathDir, gbkFile, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + reader.UpdateReaderManual(); + reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); + int64_t fileSize = reader.mLogFileOp.GetFileSize(); + reader.CheckFileSignatureAndOffset(true); + LogBuffer logBuffer; + bool moreData = false; + std::string expectedPart(expectedContent.get()); + // first read, read first line without \n and not allowRollback + int64_t firstReadSize = expectedPart.find("\n"); + expectedPart.resize(firstReadSize); + reader.ReadGBK(logBuffer, 127, moreData, false); // first line without \n + APSARA_TEST_FALSE_FATAL(moreData); + APSARA_TEST_STREQ_FATAL(expectedPart.c_str(), logBuffer.rawBuffer.data()); + APSARA_TEST_EQUAL_FATAL(reader.mCache.size(), 0UL); + APSARA_TEST_TRUE_FATAL(reader.mLastForceRead); + + // second read, start with \n but with other lines + reader.ReadGBK(logBuffer, fileSize - 1, moreData); + APSARA_TEST_FALSE_FATAL(moreData); + APSARA_TEST_GE_FATAL(reader.mCache.size(), 0UL); + std::string expectedPart2(expectedContent.get() + firstReadSize + 1); // skip \n + int64_t secondReadSize = expectedPart2.rfind("iLogtail") - 1; + expectedPart2.resize(secondReadSize); + APSARA_TEST_STREQ_FATAL(expectedPart2.c_str(), logBuffer.rawBuffer.data()); + APSARA_TEST_FALSE_FATAL(reader.mLastForceRead); + + // third read, force read cache + reader.ReadGBK(logBuffer, fileSize - 1, moreData, false); + std::string expectedPart3(expectedContent.get() + firstReadSize + 1 + secondReadSize + 1); + APSARA_TEST_STREQ_FATAL(expectedPart3.c_str(), logBuffer.rawBuffer.data()); + APSARA_TEST_TRUE_FATAL(reader.mLastForceRead); + + // fourth read, only read \n + LogBuffer logBuffer2; + reader.ReadGBK(logBuffer2, fileSize, moreData); + APSARA_TEST_FALSE_FATAL(moreData); + APSARA_TEST_GE_FATAL(reader.mCache.size(), 0UL); + APSARA_TEST_EQUAL_FATAL(fileSize, reader.mLastFilePos); + APSARA_TEST_STREQ_FATAL(NULL, logBuffer2.rawBuffer.data()); + } } void LogFileReaderUnittest::TestReadUTF8() { @@ -383,6 +434,55 @@ void LogFileReaderUnittest::TestReadUTF8() { APSARA_TEST_FALSE_FATAL(moreData); APSARA_TEST_STREQ_FATAL(NULL, logBuffer.rawBuffer.data()); } + { // force read + \n, which case read bytes is 0 + Json::Value config; + config["StartPattern"] = "iLogtail.*"; + MultilineOptions multilineOpts; + multilineOpts.Init(config, ctx, ""); + FileReaderOptions readerOpts; + LogFileReader reader( + logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); + reader.UpdateReaderManual(); + reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); + int64_t fileSize = reader.mLogFileOp.GetFileSize(); + reader.CheckFileSignatureAndOffset(true); + LogBuffer logBuffer; + bool moreData = false; + std::string expectedPart(expectedContent.get()); + // first read, read first line without \n and not allowRollback + int64_t firstReadSize = expectedPart.find("\n"); + expectedPart.resize(firstReadSize); + reader.mLastForceRead = true; + reader.ReadUTF8(logBuffer, firstReadSize, moreData, false); + APSARA_TEST_FALSE_FATAL(moreData); + APSARA_TEST_STREQ_FATAL(expectedPart.c_str(), logBuffer.rawBuffer.data()); + APSARA_TEST_EQUAL_FATAL(reader.mCache.size(), 0UL); + APSARA_TEST_TRUE_FATAL(reader.mLastForceRead); + + // second read, start with \n but with other lines + reader.ReadUTF8(logBuffer, fileSize - 1, moreData); + APSARA_TEST_FALSE_FATAL(moreData); + APSARA_TEST_GE_FATAL(reader.mCache.size(), 0UL); + std::string expectedPart2(expectedContent.get() + firstReadSize + 1); // skip \n + int64_t secondReadSize = expectedPart2.rfind("iLogtail") - 1; + expectedPart2.resize(secondReadSize); + APSARA_TEST_STREQ_FATAL(expectedPart2.c_str(), logBuffer.rawBuffer.data()); + APSARA_TEST_FALSE_FATAL(reader.mLastForceRead); + + // third read, force read cache + reader.ReadUTF8(logBuffer, fileSize - 1, moreData, false); + std::string expectedPart3(expectedContent.get() + firstReadSize + 1 + secondReadSize + 1); + APSARA_TEST_STREQ_FATAL(expectedPart3.c_str(), logBuffer.rawBuffer.data()); + APSARA_TEST_TRUE_FATAL(reader.mLastForceRead); + + // fourth read, only read \n + LogBuffer logBuffer2; + reader.ReadUTF8(logBuffer2, fileSize, moreData); + APSARA_TEST_FALSE_FATAL(moreData); + APSARA_TEST_GE_FATAL(reader.mCache.size(), 0UL); + APSARA_TEST_EQUAL_FATAL(fileSize, reader.mLastFilePos); + APSARA_TEST_STREQ_FATAL(NULL, logBuffer2.rawBuffer.data()); + } } class LogMultiBytesUnittest : public ::testing::Test { diff --git a/core/unittest/reader/RemoveLastIncompleteLogUnittest.cpp b/core/unittest/reader/RemoveLastIncompleteLogUnittest.cpp index 076ab9c1df..d55c74ba99 100644 --- a/core/unittest/reader/RemoveLastIncompleteLogUnittest.cpp +++ b/core/unittest/reader/RemoveLastIncompleteLogUnittest.cpp @@ -13,8 +13,8 @@ // limitations under the License. #include "common/FileSystemUtil.h" -#include "reader/LogFileReader.h" #include "common/memory/SourceBuffer.h" +#include "reader/LogFileReader.h" #include "unittest/Unittest.h" namespace logtail { @@ -122,6 +122,15 @@ void RemoveLastIncompleteLogUnittest::TestSingleline() { // return the whole buffer, so no rollback APSARA_TEST_EQUAL_FATAL(1, rollbackLineFeedCount); } + { // case empty string + std::string expectMatch = ""; + int32_t rollbackLineFeedCount = 0; + std::string testLog2 = expectMatch + ""; + int32_t matchSize = logFileReader.RemoveLastIncompleteLog( + const_cast(testLog2.data()), testLog2.size(), rollbackLineFeedCount); + APSARA_TEST_EQUAL_FATAL(int32_t(expectMatch.size()), matchSize); + APSARA_TEST_EQUAL_FATAL(0, rollbackLineFeedCount); + } } void RemoveLastIncompleteLogUnittest::TestMultiline() { @@ -131,13 +140,13 @@ void RemoveLastIncompleteLogUnittest::TestMultiline() { multilineOpts.Init(config, ctx, ""); LogFileReader logFileReader( logPathDir, utf8File, DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(&multilineOpts, &ctx)); - int32_t rollbackLineFeedCount = 0; { // case multi line std::vector index; std::string firstLog = LOG_BEGIN_STRING + "first.\nmultiline1\nmultiline2"; std::string secondLog = LOG_BEGIN_STRING + "second.\nmultiline1\nmultiline2"; std::string expectMatch = firstLog + '\n'; std::string testLog = expectMatch + secondLog + '\n'; + int32_t rollbackLineFeedCount = 0; int32_t matchSize = logFileReader.RemoveLastIncompleteLog( const_cast(testLog.data()), testLog.size(), rollbackLineFeedCount); APSARA_TEST_EQUAL_FATAL(static_cast(expectMatch.size()), matchSize); @@ -150,6 +159,7 @@ void RemoveLastIncompleteLogUnittest::TestMultiline() { std::string secondLog = LOG_BEGIN_STRING + "second.\nmultiline1\nmultiline2"; std::string expectMatch = firstLog + '\n'; std::string testLog = expectMatch + secondLog; + int32_t rollbackLineFeedCount = 0; int32_t matchSize = logFileReader.RemoveLastIncompleteLog( const_cast(testLog.data()), testLog.size(), rollbackLineFeedCount); APSARA_TEST_EQUAL_FATAL(static_cast(expectMatch.size()), matchSize); @@ -158,6 +168,7 @@ void RemoveLastIncompleteLogUnittest::TestMultiline() { } { // case multi line not match std::string testLog2 = "log begin does not match.\nlog begin does not match.\nlog begin does not match.\n"; + int32_t rollbackLineFeedCount = 0; int32_t matchSize = logFileReader.RemoveLastIncompleteLog( const_cast(testLog2.data()), testLog2.size(), rollbackLineFeedCount); APSARA_TEST_EQUAL_FATAL(testLog2.size(), matchSize); @@ -166,11 +177,21 @@ void RemoveLastIncompleteLogUnittest::TestMultiline() { { // case multi line not match, buffer size not big enough (no new line at the end of line) std::string expectMatch = "log begin does not match.\nlog begin does not match.\n"; std::string testLog2 = expectMatch + "log begin does not"; + int32_t rollbackLineFeedCount = 0; int32_t matchSize = logFileReader.RemoveLastIncompleteLog( const_cast(testLog2.data()), testLog2.size(), rollbackLineFeedCount); APSARA_TEST_EQUAL_FATAL(expectMatch.size(), matchSize); APSARA_TEST_EQUAL_FATAL(1, rollbackLineFeedCount); } + { // case empty string + std::string expectMatch = ""; + int32_t rollbackLineFeedCount = 0; + std::string testLog2 = expectMatch + ""; + int32_t matchSize = logFileReader.RemoveLastIncompleteLog( + const_cast(testLog2.data()), testLog2.size(), rollbackLineFeedCount); + APSARA_TEST_EQUAL_FATAL(int32_t(expectMatch.size()), matchSize); + APSARA_TEST_EQUAL_FATAL(0, rollbackLineFeedCount); + } } class RemoveLastIncompleteLogMultilineUnittest : public ::testing::Test { @@ -425,6 +446,38 @@ void RemoveLastIncompleteLogMultilineUnittest::TestRemoveLastIncompleteLogWithEn } } +class GetLastLineUnittest : public ::testing::Test { +public: + void TestGetLastLine(); + void TestGetLastLineEmpty(); + +private: + FileReaderOptions readerOpts; + PipelineContext ctx; +}; + +UNIT_TEST_CASE(GetLastLineUnittest, TestGetLastLine); +UNIT_TEST_CASE(GetLastLineUnittest, TestGetLastLineEmpty); + +void GetLastLineUnittest::TestGetLastLine() { + std::string testLog = "first line\nsecond line\nthird line"; + LogFileReader logFileReader( + "dir", "file", DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(nullptr, &ctx)); + auto lastLine = logFileReader.GetLastLine(const_cast(testLog.data()), testLog.size()); + std::string expectLog = "third line"; + APSARA_TEST_EQUAL_FATAL(expectLog, std::string(lastLine.data(), lastLine.size())); +} + +void GetLastLineUnittest::TestGetLastLineEmpty() { + std::string testLog = ""; + LogFileReader logFileReader( + "dir", "file", DevInode(), std::make_pair(&readerOpts, &ctx), std::make_pair(nullptr, &ctx)); + auto lastLine = logFileReader.GetLastLine(const_cast(testLog.data()), testLog.size()); + APSARA_TEST_EQUAL_FATAL(0, lastLine.size()); + APSARA_TEST_EQUAL_FATAL("", std::string(lastLine.data(), lastLine.size())); + APSARA_TEST_EQUAL_FATAL(testLog.data(), lastLine.data()); +} + } // namespace logtail UNIT_TEST_MAIN