Skip to content

Commit

Permalink
fix timeout force read when still can read (#1551)
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc authored Jun 19, 2024
1 parent edb7ff0 commit 550d736
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 32 deletions.
8 changes: 5 additions & 3 deletions core/event/BlockEventManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
// limitations under the License.

#include "BlockEventManager.h"
#include "processor/daemon/LogProcess.h"

#include "common/HashUtil.h"
#include "common/StringTools.h"
#include "polling/PollingEventQueue.h"
#include "processor/daemon/LogProcess.h"

DEFINE_FLAG_INT32(max_block_event_timeout, "max block event timeout, seconds", 3);

Expand Down Expand Up @@ -48,8 +49,9 @@ void BlockedEventManager::UpdateBlockEvent(const LogstoreFeedBackKey& logstoreKe
.append(pEvent->GetConfigName());
hashKey = HashSignatureString(key.c_str(), key.size());
}
// LOG_DEBUG(sLogger, ("Add block event ", pEvent->GetSource())(pEvent->GetObject(),
// pEvent->GetInode())(pEvent->GetConfigName(), hashKey));
LOG_DEBUG(sLogger,
("Add block event ", pEvent->GetSource())(pEvent->GetObject(),
pEvent->GetInode())(pEvent->GetConfigName(), hashKey));
ScopedSpinLock lock(mLock);
mBlockEventMap[hashKey].Update(logstoreKey, pEvent, curTime);
}
Expand Down
54 changes: 32 additions & 22 deletions core/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1001,28 +1001,26 @@ bool LogFileReader::ReadLog(LogBuffer& logBuffer, const Event* event) {
}

size_t lastFilePos = mLastFilePos;
bool allowRollback = true;
bool tryRollback = true;
if (event != nullptr && event->IsReaderFlushTimeout()) {
// If flush timeout event, we should filter whether the event is legacy.
if (event->GetLastReadPos() == GetLastReadPos() && event->GetLastFilePos() == mLastFilePos
&& event->GetInode() == mDevInode.inode) {
allowRollback = false;
// For the scenario: log rotation, the last line needs to be read by timeout, which is a normal situation.
// So here only local warning is given, don't raise alarm.
LOG_WARNING(sLogger,
("read log", "timeout")("project", GetProject())("logstore", GetLogstore())(
"config", GetConfigName())("log reader queue name", mHostLogPath)("log path", mRealLogPath)(
"file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))(
"file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)(
"last file position", mLastFilePos)("last file size", mLastFileSize)(
"read size", mLastFilePos - lastFilePos)("log", logBuffer.rawBuffer));
tryRollback = false;
} else {
return false;
}
}
bool moreData = GetRawData(logBuffer, mLastFileSize, allowRollback);
if (!allowRollback) {
// For the scenario: log rotation, the last line needs to be read by timeout, which is a normal situation.
// So here only local warning is given, don't raise alarm.
LOG_WARNING(sLogger,
("read timeout", "force read")("project", GetProject())("logstore", GetLogstore())(
"config", GetConfigName())("log reader queue name", mHostLogPath)("log path", mRealLogPath)(
"file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))(
"file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)(
"last file position", mLastFilePos)("last file size", mLastFileSize)(
"read size", mLastFilePos - lastFilePos)("log", logBuffer.rawBuffer));
}
bool moreData = GetRawData(logBuffer, mLastFileSize, tryRollback);
if (!logBuffer.rawBuffer.empty() > 0) {
if (mEOOption) {
// This read was replayed by checkpoint, adjust mLastFilePos to skip hole.
Expand All @@ -1035,7 +1033,8 @@ bool LogFileReader::ReadLog(LogBuffer& logBuffer, const Event* event) {
LOG_DEBUG(sLogger,
("read log file", mRealLogPath)("last file pos", mLastFilePos)("last file size", mLastFileSize)(
"read size", mLastFilePos - lastFilePos));
if (HasDataInCache()) {
if (HasDataInCache() && GetLastReadPos() == mLastFileSize) {
LOG_DEBUG(sLogger, ("add timeout event", mRealLogPath));
auto event = CreateFlushTimeoutEvent();
BlockedEventManager::GetInstance()->UpdateBlockEvent(
GetLogstoreKey(), GetConfigName(), *event, mDevInode, time(NULL) + mReaderConfig.first->mFlushTimeoutSecs);
Expand Down Expand Up @@ -1528,17 +1527,17 @@ bool LogFileReader::GetLogTimeByOffset(const char* buffer,
* "SingleLineLog_1\nSingleLineLog_2\nSingleLineLog_3\n" -> "SingleLineLog_1\nSingleLineLog_2\nSingleLineLog_3\0"
* "SingleLineLog_1\nSingleLineLog_2\nxxx" -> "SingleLineLog_1\nSingleLineLog_2\0"
*/
bool LogFileReader::GetRawData(LogBuffer& logBuffer, int64_t fileSize, bool allowRollback) {
bool LogFileReader::GetRawData(LogBuffer& logBuffer, int64_t fileSize, bool tryRollback) {
// Truncate, return false to indicate no more data.
if (fileSize == mLastFilePos) {
return false;
}

bool moreData = false;
if (mReaderConfig.first->mFileEncoding == FileReaderOptions::Encoding::GBK)
ReadGBK(logBuffer, fileSize, moreData, allowRollback);
ReadGBK(logBuffer, fileSize, moreData, tryRollback);
else
ReadUTF8(logBuffer, fileSize, moreData, allowRollback);
ReadUTF8(logBuffer, fileSize, moreData, tryRollback);

int64_t delta = fileSize - mLastFilePos;
if (delta > mReaderConfig.first->mReadDelayAlertThresholdBytes && !logBuffer.rawBuffer.empty()) {
Expand Down Expand Up @@ -1643,7 +1642,7 @@ void LogFileReader::setExactlyOnceCheckpointAfterRead(size_t readSize) {
cpt.set_read_length(readSize);
}

void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, bool allowRollback) {
void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback) {
char* stringBuffer = nullptr;
size_t nbytes = 0;

Expand All @@ -1661,7 +1660,7 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData,
logBuffer.readOffset = mLastFilePos;
--nbytes;
}
mLastForceRead = !allowRollback;
mLastForceRead = true;
mCache.clear();
moreData = false;
} else {
Expand All @@ -1685,6 +1684,11 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData,
? ReadFile(mLogFileOp, stringMemory.data + lastCacheSize, READ_BYTE, lastReadPos, &truncateInfo)
: 0UL;
stringBuffer = stringMemory.data;
bool allowRollback = true;
// Only when there is no new log and not try rollback, then force read
if (!tryRollback && nbytes == 0) {
allowRollback = false;
}
if (nbytes == 0 && (!lastCacheSize || allowRollback)) { // read nothing, if no cached data or allow rollback the
// reader's state cannot be changed
return;
Expand Down Expand Up @@ -1767,12 +1771,13 @@ void LogFileReader::ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData,
LOG_DEBUG(sLogger, ("read size", nbytes)("last file pos", mLastFilePos));
}

void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, bool allowRollback) {
void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback) {
std::unique_ptr<char[]> gbkMemory;
char* gbkBuffer = nullptr;
size_t readCharCount = 0, originReadCount = 0;
int64_t lastReadPos = 0;
bool logTooLongSplitFlag = false, fromCpt = false;
bool allowRollback = true;

logBuffer.readOffset = mLastFilePos;
if (!mLogFileOp.IsOpen()) {
Expand All @@ -1788,7 +1793,8 @@ void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, b
logBuffer.readOffset = mLastFilePos;
--readCharCount;
}
mLastForceRead = !allowRollback;
mLastForceRead = true;
allowRollback = false;
lastReadPos = mLastFilePos + readCharCount;
originReadCount = readCharCount;
moreData = false;
Expand All @@ -1807,6 +1813,10 @@ void LogFileReader::ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, b
lastReadPos = GetLastReadPos();
readCharCount
= READ_BYTE ? ReadFile(mLogFileOp, gbkBuffer + lastCacheSize, READ_BYTE, lastReadPos, &truncateInfo) : 0UL;
// Only when there is no new log and not try rollback, then force read
if (!tryRollback && readCharCount == 0) {
allowRollback = false;
}
if (readCharCount == 0 && (!lastCacheSize || allowRollback)) { // just keep last cache
return;
}
Expand Down
6 changes: 3 additions & 3 deletions core/reader/LogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,9 @@ class LogFileReader {
int64_t GetLogGroupKey() const { return mLogGroupKey; }

protected:
bool GetRawData(LogBuffer& logBuffer, int64_t fileSize, bool allowRollback = true);
void ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, bool allowRollback = true);
void ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, bool allowRollback = true);
bool GetRawData(LogBuffer& logBuffer, int64_t fileSize, bool tryRollback = true);
void ReadUTF8(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback = true);
void ReadGBK(LogBuffer& logBuffer, int64_t end, bool& moreData, bool tryRollback = true);

size_t
ReadFile(LogFileOperator& logFileOp, void* buf, size_t size, int64_t& offset, TruncateInfo** truncateInfo = NULL);
Expand Down
12 changes: 8 additions & 4 deletions core/unittest/reader/LogFileReaderUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,11 @@ void LogFileReaderUnittest::TestReadGBK() {
expectedPart.resize(firstReadSize);
reader.ReadGBK(logBuffer, 127, moreData, false); // first line without \n
APSARA_TEST_FALSE_FATAL(moreData);
APSARA_TEST_STREQ_FATAL(expectedPart.c_str(), logBuffer.rawBuffer.data());
APSARA_TEST_EQUAL_FATAL(reader.mCache.size(), 0UL);
APSARA_TEST_FALSE_FATAL(reader.mLastForceRead);
reader.ReadGBK(logBuffer, 127, moreData, false); // force read, clear cache
APSARA_TEST_TRUE_FATAL(reader.mLastForceRead);
APSARA_TEST_EQUAL_FATAL(reader.mCache.size(), 0UL);
APSARA_TEST_STREQ_FATAL(expectedPart.c_str(), logBuffer.rawBuffer.data());

// second read, start with \n but with other lines
reader.ReadGBK(logBuffer, fileSize - 1, moreData);
Expand Down Expand Up @@ -455,9 +457,11 @@ void LogFileReaderUnittest::TestReadUTF8() {
reader.mLastForceRead = true;
reader.ReadUTF8(logBuffer, firstReadSize, moreData, false);
APSARA_TEST_FALSE_FATAL(moreData);
APSARA_TEST_STREQ_FATAL(expectedPart.c_str(), logBuffer.rawBuffer.data());
APSARA_TEST_EQUAL_FATAL(reader.mCache.size(), 0UL);
APSARA_TEST_FALSE_FATAL(reader.mLastForceRead);
reader.ReadUTF8(logBuffer, firstReadSize, moreData, false); // force read, clear cache
APSARA_TEST_TRUE_FATAL(reader.mLastForceRead);
APSARA_TEST_EQUAL_FATAL(reader.mCache.size(), 0UL);
APSARA_TEST_STREQ_FATAL(expectedPart.c_str(), logBuffer.rawBuffer.data());

// second read, start with \n but with other lines
reader.ReadUTF8(logBuffer, fileSize - 1, moreData);
Expand Down

0 comments on commit 550d736

Please sign in to comment.