Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backport 2.1] fix PreservedDirDepth not working with polling and wildcard path #1887

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions core/checkpoint/CheckPointManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,18 @@ bool CheckPointManager::GetCheckPoint(DevInode devInode, const std::string& conf
return false;
}

void CheckPointManager::DeleteDirCheckPoint(const std::string& filename) {
std::unordered_map<std::string, DirCheckPointPtr>::iterator it = mDirNameMap.find(filename);
if (it != mDirNameMap.end())
void CheckPointManager::DeleteDirCheckPoint(const std::string& dirname) {
std::unordered_map<std::string, DirCheckPointPtr>::iterator it = mDirNameMap.find(dirname);
if (it != mDirNameMap.end()) {
mDirNameMap.erase(it);
}
auto parentpos = dirname.find_last_of(PATH_SEPARATOR);
if (parentpos != std::string::npos) {
auto parentDirCheckpoint = mDirNameMap.find(dirname.substr(0, parentpos));
if (parentDirCheckpoint != mDirNameMap.end()) {
parentDirCheckpoint->second->mSubDir.erase(dirname);
}
}
}

bool CheckPointManager::GetDirCheckPoint(const std::string& dirname, DirCheckPointPtr& dirCheckPointPtr) {
Expand Down
2 changes: 1 addition & 1 deletion core/checkpoint/CheckPointManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class CheckPointManager {
void AddCheckPoint(CheckPoint* checkPointPtr);
void AddDirCheckPoint(const std::string& dirname);
void DeleteCheckPoint(DevInode devInode, const std::string& configName);
void DeleteDirCheckPoint(const std::string& filename);
void DeleteDirCheckPoint(const std::string& dirname);
void LoadCheckPoint();
void LoadDirCheckPoint(const Json::Value& root);
void LoadFileCheckPoint(const Json::Value& root);
Expand Down
3 changes: 3 additions & 0 deletions core/config/provider/CommonConfigProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ void CommonConfigProvider::Stop() {
mIsThreadRunning = false;
}
mStopCV.notify_one();
if (!mThreadRes.valid()) {
return;
}
future_status s = mThreadRes.wait_for(chrono::seconds(1));
if (s == future_status::ready) {
LOG_INFO(sLogger, ("common config provider", "stopped successfully"));
Expand Down
114 changes: 79 additions & 35 deletions core/config_manager/ConfigManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,15 @@ ParseConfResult ParseConfig(const std::string& configName, Json::Value& jsonRoot

ifstream is;
is.open(fullPath.c_str());
if (!is.good()) {
if (!is) { // https://horstmann.com/cpp/pitfalls.html
return CONFIG_NOT_EXIST;
}
std::string buffer;
try {
buffer.assign(std::istreambuf_iterator<char>(is), std::istreambuf_iterator<char>());
} catch (const std::ios_base::failure& e) {
return CONFIG_NOT_EXIST;
}
std::string buffer((std::istreambuf_iterator<char>(is)), (std::istreambuf_iterator<char>()));
if (!IsValidJson(buffer.c_str(), buffer.length())) {
return CONFIG_INVALID_FORMAT;
}
Expand Down Expand Up @@ -309,9 +314,16 @@ void ConfigManager::RegisterWildcardPath(const FileDiscoveryConfig& config, cons
if (registerStatus == GET_REGISTER_STATUS_ERROR) {
return;
}
if (EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler)) {
if (config.first->mPreservedDirDepth < 0)
RegisterDescendants(
item, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth);
else {
// preserve_depth register
RegisterHandlersWithinDepth(item,
config,
config.first->mPreservedDirDepth,
config.first->mMaxDirSearchDepth < 0 ? 100
: config.first->mMaxDirSearchDepth);
}
} else {
RegisterWildcardPath(config, item, depth + 1);
Expand Down Expand Up @@ -378,9 +390,16 @@ void ConfigManager::RegisterWildcardPath(const FileDiscoveryConfig& config, cons
if (registerStatus == GET_REGISTER_STATUS_ERROR) {
return;
}
if (EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler)) {
if (config.first->mPreservedDirDepth < 0)
RegisterDescendants(
item, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth);
else {
// preserve_depth register
RegisterHandlersWithinDepth(
item,
config,
config.first->mPreservedDirDepth,
config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth);
}
} else {
RegisterWildcardPath(config, item, depth + 1);
Expand Down Expand Up @@ -417,25 +436,23 @@ bool ConfigManager::RegisterHandlers(const string& basePath, const FileDiscovery
DirRegisterStatus registerStatus = EventDispatcher::GetInstance()->IsDirRegistered(basePath);
if (registerStatus == GET_REGISTER_STATUS_ERROR)
return result;
// dir in config is valid by default, do not call pathValidator
result = EventDispatcher::GetInstance()->RegisterEventHandler(basePath.c_str(), config, mSharedHandler);
// if we come into a failure, do not try to register others, there must be something wrong!
if (!result)
return result;

if (config.first->mPreservedDirDepth < 0)
result = RegisterDescendants(
basePath, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth);
else {
// preserve_depth register
int depth = config.first->mPreservedDirDepth;
result = RegisterHandlersWithinDepth(basePath, config, depth);
result = RegisterHandlersWithinDepth(basePath,
config,
config.first->mPreservedDirDepth,
config.first->mMaxDirSearchDepth < 0 ? 100
: config.first->mMaxDirSearchDepth);
}
return result;
}

bool ConfigManager::RegisterDirectory(const std::string& source, const std::string& object) {
// TODO：A potential bug: FindBestMatch will test @object with filePattern, which has very
// TODO: A potential bug: FindBestMatch will test @object with filePattern, which has very
// low possibility to match a sub directory name, so here will return false in most cases.
// e.g.: source: /path/to/monitor, file pattern: *.log, object: subdir.
// Match(subdir, *.log) = false.
Expand All @@ -445,24 +462,30 @@ bool ConfigManager::RegisterDirectory(const std::string& source, const std::stri
return false;
}

bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, const FileDiscoveryConfig& config, int depth) {
bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path,
const FileDiscoveryConfig& config,
int preservedDirDepth,
int maxDepth) {
if (maxDepth < 0) {
return true;
}
if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) {
LOG_INFO(sLogger, ("ignore path matching host path blacklist", path));
return false;
}
if (depth <= 0) {
DirCheckPointPtr dirCheckPoint;
if (CheckPointManager::Instance()->GetDirCheckPoint(path, dirCheckPoint) == false)
if (preservedDirDepth < 0) {
fsutil::PathStat statBuf;
if (!fsutil::PathStat::stat(path, statBuf)) {
return true;
}
int64_t sec = 0;
int64_t nsec = 0;
statBuf.GetLastWriteTime(sec, nsec);
auto curTime = time(nullptr);
if (curTime - sec > INT32_FLAG(timeout_interval)) {
return true;
// path had dircheckpoint means it was watched before, so it is valid
const set<string>& subdir = dirCheckPoint.get()->mSubDir;
for (set<string>::iterator it = subdir.begin(); it != subdir.end(); it++) {
if (EventDispatcher::GetInstance()->RegisterEventHandler((*it).c_str(), config, mSharedHandler))
RegisterHandlersWithinDepth(*it, config, depth - 1);
}
return true;
}
bool result = true;

fsutil::Dir dir(path);
if (!dir.Open()) {
Expand All @@ -476,30 +499,45 @@ bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, const F
LOG_ERROR(sLogger, ("Open dir error: ", path.c_str())("errno", err));
return false;
}
if (!(EventDispatcher::GetInstance()->RegisterEventHandler(path.c_str(), config, mSharedHandler))) {
// break;// fail early, do not try to register others
return false;
}
if (maxDepth == 0) {
return true;
}

if (preservedDirDepth == 0) {
DirCheckPointPtr dirCheckPoint;
if (CheckPointManager::Instance()->GetDirCheckPoint(path, dirCheckPoint)) {
// path had dircheckpoint means it was watched before, so it is valid
const set<string>& subdir = dirCheckPoint.get()->mSubDir;
for (const auto& it : subdir) {
RegisterHandlersWithinDepth(it, config, 0, maxDepth - 1);
}
return true;
}
}
fsutil::Entry ent;
while ((ent = dir.ReadNext())) {
string item = PathJoin(path, ent.Name());
if (ent.IsDir() && !config.first->IsDirectoryInBlacklist(item)) {
if (!(EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler))) {
// break;// fail early, do not try to register others
result = false;
} else // sub dir will not be registered if parent dir fails
RegisterHandlersWithinDepth(item, config, depth - 1);
RegisterHandlersWithinDepth(item, config, preservedDirDepth - 1, maxDepth - 1);
}
}

return result;
return true;
}

// path not terminated by '/', path already registered
bool ConfigManager::RegisterDescendants(const string& path, const FileDiscoveryConfig& config, int withinDepth) {
if (withinDepth < 0) {
return true;
}
if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) {
LOG_INFO(sLogger, ("ignore path matching host path blacklist", path));
return false;
}
if (withinDepth <= 0) {
return true;
}

fsutil::Dir dir(path);
if (!dir.Open()) {
Expand All @@ -512,14 +550,20 @@ bool ConfigManager::RegisterDescendants(const string& path, const FileDiscoveryC
LOG_ERROR(sLogger, ("Open dir error: ", path.c_str())("errno", err));
return false;
}
if (!EventDispatcher::GetInstance()->RegisterEventHandler(path.c_str(), config, mSharedHandler)) {
// break;// fail early, do not try to register others
return false;
}
if (withinDepth == 0) {
return true;
}

fsutil::Entry ent;
bool result = true;
while ((ent = dir.ReadNext())) {
string item = PathJoin(path, ent.Name());
if (ent.IsDir() && !config.first->IsDirectoryInBlacklist(item)) {
result = EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler);
if (result)
RegisterDescendants(item, config, withinDepth - 1);
RegisterDescendants(item, config, withinDepth - 1);
}
}
return result;
Expand Down
5 changes: 4 additions & 1 deletion core/config_manager/ConfigManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,10 @@ class ConfigManager {
* @param path is the current dir that being registered
* @depth is the num of sub dir layers that should be registered
*/
bool RegisterHandlersWithinDepth(const std::string& path, const FileDiscoveryConfig& config, int depth);
bool RegisterHandlersWithinDepth(const std::string& path,
const FileDiscoveryConfig& config,
int preservedDirDepth,
int maxDepth);
bool RegisterDescendants(const std::string& path, const FileDiscoveryConfig& config, int withinDepth);
// bool CheckLogType(const std::string& logTypeStr, LogType& logType);
// 废弃
Expand Down
9 changes: 1 addition & 8 deletions core/controller/EventDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,6 @@ bool EventDispatcher::RegisterEventHandler(const char* path,
bool dirTimeOutFlag = config.first->IsTimeout(path);

if (!mEventListener->IsValidID(wd)) {
if (dirTimeOutFlag) {
LOG_DEBUG(
sLogger,
("Drop timeout path, source", path)("config, basepath", config.first->GetBasePath())(
"preseveDepth", config.first->mPreservedDirDepth)("maxDepth", config.first->mMaxDirSearchDepth));
return false;
}
wd = mNonInotifyWd;
if (mNonInotifyWd == INT_MIN)
mNonInotifyWd = -1;
Expand Down Expand Up @@ -904,7 +897,7 @@ void EventDispatcher::HandleTimeout() {
time_t curTime = time(NULL);
MapType<int, time_t>::Type::iterator itr = mWdUpdateTimeMap.begin();
for (; itr != mWdUpdateTimeMap.end(); ++itr) {
if (curTime - (itr->second) >= INT32_FLAG(timeout_interval)) {
if (curTime - (itr->second) > INT32_FLAG(timeout_interval)) {
// add to vector then batch process to avoid possible iterator change problem
// mHandler may remove what itr points to, thus change the layout of the map container
// what follows may not work
Expand Down
9 changes: 7 additions & 2 deletions core/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,13 @@ endmacro()
# asan for debug
macro(link_asan target_name)
if(CMAKE_BUILD_TYPE MATCHES Debug)
target_compile_options(${target_name} PUBLIC -fsanitize=address)
target_link_options(${target_name} PUBLIC -fsanitize=address -static-libasan)
if (UNIX)
target_compile_options(${target_name} PUBLIC -fsanitize=address)
target_link_options(${target_name} PUBLIC -fsanitize=address -static-libasan)
elseif(MSVC)
target_compile_options(${target_name} PUBLIC /fsanitize=address)
target_link_options(${target_name} PUBLIC /fsanitize=address)
endif()
endif()
endmacro()

Expand Down
7 changes: 5 additions & 2 deletions core/event_handler/EventHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,13 @@ void CreateHandler::Handle(const Event& event) {
if (!config.first)
return;
else if (event.IsDir())
ConfigManager::GetInstance()->RegisterHandlersRecursively(path, config, false);
ConfigManager::GetInstance()->RegisterHandlers(path, config);
else {
// symbolic link
if (EventDispatcher::GetInstance()->IsDirRegistered(path) == PATH_INODE_NOT_REGISTERED)
if (EventDispatcher::GetInstance()->IsDirRegistered(path) == PATH_INODE_NOT_REGISTERED) {
// TODO: why not use RegisterHandlers
ConfigManager::GetInstance()->RegisterHandlersRecursively(path, config, true);
}
}
}

Expand All @@ -176,6 +178,7 @@ void TimeoutHandler::Handle(const Event& ev) {
const string& dir = ev.GetSource();
EventDispatcher::GetInstance()->UnregisterEventHandler(dir.c_str());
ConfigManager::GetInstance()->RemoveHandler(dir);
CheckPointManager::Instance()->DeleteDirCheckPoint(dir);
}


Expand Down
5 changes: 3 additions & 2 deletions core/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ using namespace std;

DEFINE_FLAG_INT32(check_symbolic_link_interval, "seconds", 120);
DEFINE_FLAG_INT32(check_base_dir_interval, "seconds", 60);
DEFINE_FLAG_INT32(check_timeout_interval, "seconds", 600);
DEFINE_FLAG_INT32(log_input_thread_wait_interval, "microseconds", 20 * 1000);
DEFINE_FLAG_INT64(read_fs_events_interval, "microseconds", 20 * 1000);
DEFINE_FLAG_INT32(check_handler_timeout_interval, "seconds", 180);
Expand Down Expand Up @@ -364,7 +365,7 @@ void* LogInput::ProcessLoop() {
int32_t prevTime = time(NULL);
mLastReadEventTime = prevTime;
int32_t curTime = prevTime;
srand(prevTime);
srand(0); // avoid random failures in unit tests
int32_t lastCheckDir = prevTime - rand() % 60;
int32_t lastCheckSymbolicLink = prevTime - rand() % 60;
time_t lastCheckHandlerTimeOut = prevTime - rand() % 60;
Expand Down Expand Up @@ -422,7 +423,7 @@ void* LogInput::ProcessLoop() {
lastCheckSymbolicLink = 0;
}

if (curTime - prevTime >= INT32_FLAG(timeout_interval)) {
if (curTime - prevTime >= INT32_FLAG(check_timeout_interval)) {
dispatcher->HandleTimeout();
prevTime = curTime;
}
Expand Down
2 changes: 1 addition & 1 deletion core/file_server/FileDiscoveryOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ bool FileDiscoveryOptions::IsWildcardPathMatch(const string& path, const string&

// XXX: assume path is a subdir under mBasePath
bool FileDiscoveryOptions::IsTimeout(const string& path) const {
if (mPreservedDirDepth < 0 || mWildcardPaths.size() > 0)
if (mPreservedDirDepth < 0)
return false;

// we do not check if (path.find(mBasePath) == 0)
Expand Down
10 changes: 10 additions & 0 deletions core/file_server/FileServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,17 @@ namespace logtail {
void FileServer::Start() {
ConfigManager::GetInstance()->LoadDockerConfig();
CheckPointManager::Instance()->LoadCheckPoint();
LOG_INFO(sLogger, ("watch dirs", "start"));
auto start = GetCurrentTimeInMilliSeconds();
ConfigManager::GetInstance()->RegisterHandlers();
auto costMs = GetCurrentTimeInMilliSeconds() - start;
if (costMs >= 60 * 1000) {
LogtailAlarm::GetInstance()->SendAlarm(REGISTER_HANDLERS_TOO_SLOW_ALARM,
"Registering handlers took " + ToString(costMs) + " ms");
LOG_WARNING(sLogger, ("watch dirs", "succeeded")("costMs", costMs));
} else {
LOG_INFO(sLogger, ("watch dirs", "succeeded")("costMs", costMs));
}
LOG_INFO(sLogger, ("watch dirs", "succeeded"));
EventDispatcher::GetInstance()->AddExistedCheckPointFileEvents();
// the dump time must be reset after dir registration, since it may take long on NFS.
Expand Down
4 changes: 4 additions & 0 deletions core/monitor/LogtailAlarm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ LogtailAlarm::LogtailAlarm() {
mMessageType[OBSERVER_RUNTIME_ALARM] = "OBSERVER_RUNTIME_ALARM";
mMessageType[OBSERVER_STOP_ALARM] = "OBSERVER_STOP_ALARM";
mMessageType[INVALID_CONTAINER_PATH_ALARM] = "INVALID_CONTAINER_PATH_ALARM";
mMessageType[REGISTER_HANDLERS_TOO_SLOW_ALARM] = "REGISTER_HANDLERS_TOO_SLOW_ALARM";
}

void LogtailAlarm::Init() {
Expand All @@ -115,6 +116,9 @@ void LogtailAlarm::Stop() {
mIsThreadRunning = false;
}
mStopCV.notify_one();
if (!mThreadRes.valid()) {
return;
}
future_status s = mThreadRes.wait_for(chrono::seconds(1));
if (s == future_status::ready) {
LOG_INFO(sLogger, ("alarm gathering", "stopped successfully"));
Expand Down
Loading
Loading