Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix PreservedDirDepth not working with polling and wildcard path #1866

Merged
merged 9 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ _deps
core/build/
core/protobuf/config_server/*/*.pb.*
core/protobuf/*/*.pb.*
core/log_pb/*.pb.*
core/common/Version.cpp
!/Makefile
# Enterprise
Expand Down
1 change: 1 addition & 0 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ cmake_dependent_option(ENABLE_STATIC_LINK_CRT "Build Logtail by linking CRT stat
option(WITHOUTGDB "Build Logtail without gdb")
option(WITHSPL "Build Logtail and UT with SPL" ON)
option(BUILD_LOGTAIL_UT "Build unit test for Logtail")
cmake_dependent_option(ENABLE_ADDRESS_SANITIZER "Enable address sanitizer" OFF "CMAKE_BUILD_TYPE MATCHES Debug;NOT ANDROID" ON)
set(PROVIDER_PATH "provider" CACHE PATH "Path to the provider module") # external provider path can be set with -DPROVIDER_PATH
set(UNITTEST_PATH "unittest" CACHE PATH "Path to the unittest module") # external unittest path can be set with -DUNITTEST_PATH

Expand Down
14 changes: 11 additions & 3 deletions core/checkpoint/CheckPointManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,18 @@ bool CheckPointManager::GetCheckPoint(DevInode devInode, const std::string& conf
return false;
}

void CheckPointManager::DeleteDirCheckPoint(const std::string& filename) {
std::unordered_map<std::string, DirCheckPointPtr>::iterator it = mDirNameMap.find(filename);
if (it != mDirNameMap.end())
void CheckPointManager::DeleteDirCheckPoint(const std::string& dirname) {
std::unordered_map<std::string, DirCheckPointPtr>::iterator it = mDirNameMap.find(dirname);
if (it != mDirNameMap.end()) {
mDirNameMap.erase(it);
}
auto parentpos = dirname.find_last_of(PATH_SEPARATOR);
if (parentpos != std::string::npos) {
auto parentDirCheckpoint = mDirNameMap.find(dirname.substr(0, parentpos));
if (parentDirCheckpoint != mDirNameMap.end()) {
parentDirCheckpoint->second->mSubDir.erase(dirname);
}
}
}

bool CheckPointManager::GetDirCheckPoint(const std::string& dirname, DirCheckPointPtr& dirCheckPointPtr) {
Expand Down
2 changes: 1 addition & 1 deletion core/checkpoint/CheckPointManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class CheckPointManager {
void AddCheckPoint(CheckPoint* checkPointPtr);
void AddDirCheckPoint(const std::string& dirname);
void DeleteCheckPoint(DevInode devInode, const std::string& configName);
void DeleteDirCheckPoint(const std::string& filename);
void DeleteDirCheckPoint(const std::string& dirname);
void LoadCheckPoint();
void LoadDirCheckPoint(const Json::Value& root);
void LoadFileCheckPoint(const Json::Value& root);
Expand Down
2 changes: 1 addition & 1 deletion core/common/links.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ macro(common_link target_name)
link_zlib(${target_name})
link_zstd(${target_name})
link_unwind(${target_name})
if (NOT ANDROID)
if (ENABLE_ADDRESS_SANITIZER)
link_asan(${target_name})
endif()
if (UNIX)
Expand Down
5 changes: 4 additions & 1 deletion core/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,12 @@ endmacro()

# asan for debug
macro(link_asan target_name)
if(CMAKE_BUILD_TYPE MATCHES Debug)
if (UNIX)
target_compile_options(${target_name} PUBLIC -fsanitize=address)
target_link_options(${target_name} PUBLIC -fsanitize=address -static-libasan)
elseif(MSVC)
target_compile_options(${target_name} PUBLIC /fsanitize=address)
target_link_options(${target_name} PUBLIC /fsanitize=address)
endif()
endmacro()

Expand Down
35 changes: 18 additions & 17 deletions core/file_server/ConfigManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ bool ConfigManager::RegisterHandlersRecursively(const std::string& path,
return result;

if (!config.first->IsDirectoryInBlacklist(path))
result = EventDispatcher::GetInstance()->RegisterEventHandler(path.c_str(), config, mSharedHandler);
result = EventDispatcher::GetInstance()->RegisterEventHandler(path, config, mSharedHandler);

if (!result)
return result;
Expand Down Expand Up @@ -462,7 +462,7 @@ bool ConfigManager::RegisterDirectory(const std::string& source, const std::stri
// Match(subdir, *.log) = false.
FileDiscoveryConfig config = FindBestMatch(source, object);
if (config.first && !config.first->IsDirectoryInBlacklist(source)) {
return EventDispatcher::GetInstance()->RegisterEventHandler(source.c_str(), config, mSharedHandler);
return EventDispatcher::GetInstance()->RegisterEventHandler(source, config, mSharedHandler);
}
return false;
}
Expand All @@ -478,16 +478,7 @@ bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path,
LOG_INFO(sLogger, ("ignore path matching host path blacklist", path));
return false;
}
if (preservedDirDepth <= 0) {
DirCheckPointPtr dirCheckPoint;
if (CheckPointManager::Instance()->GetDirCheckPoint(path, dirCheckPoint)) {
// path had dircheckpoint means it was watched before, so it is valid
const set<string>& subdir = dirCheckPoint.get()->mSubDir;
for (const auto& it : subdir) {
RegisterHandlersWithinDepth(it, config, preservedDirDepth - 1, maxDepth - 1);
}
return true;
}
if (preservedDirDepth < 0) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

=0和<0需要区分,=0时总是注册baseDir,<0时必须符合不超时的条件

fsutil::PathStat statBuf;
if (!fsutil::PathStat::stat(path, statBuf)) {
return true;
Expand All @@ -513,23 +504,33 @@ bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path,
LOG_ERROR(sLogger, ("Open dir error: ", path.c_str())("errno", err));
return false;
}
if (!(EventDispatcher::GetInstance()->RegisterEventHandler(path.c_str(), config, mSharedHandler))) {
if (!(EventDispatcher::GetInstance()->RegisterEventHandler(path, config, mSharedHandler))) {
// break;// fail early, do not try to register others
return false;
}
if (maxDepth == 0) {
return true;
}
bool result = true;

if (preservedDirDepth == 0) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

=0时就必须进入checkpoint恢复的注册方式,在此方式下保持PreservedDirDepth=0确保注册成功

DirCheckPointPtr dirCheckPoint;
if (CheckPointManager::Instance()->GetDirCheckPoint(path, dirCheckPoint)) {
// path had dircheckpoint means it was watched before, so it is valid
const set<string>& subdir = dirCheckPoint.get()->mSubDir;
for (const auto& it : subdir) {
RegisterHandlersWithinDepth(it, config, 0, maxDepth - 1);
}
return true;
}
}
fsutil::Entry ent;
while ((ent = dir.ReadNext())) {
string item = PathJoin(path, ent.Name());
if (ent.IsDir() && !config.first->IsDirectoryInBlacklist(item)) {
RegisterHandlersWithinDepth(item, config, preservedDirDepth - 1, maxDepth - 1);
}
}

return result;
return true;
}

// path not terminated by '/', path already registered
Expand All @@ -553,7 +554,7 @@ bool ConfigManager::RegisterDescendants(const string& path, const FileDiscoveryC
LOG_ERROR(sLogger, ("Open dir error: ", path.c_str())("errno", err));
return false;
}
if (!EventDispatcher::GetInstance()->RegisterEventHandler(path.c_str(), config, mSharedHandler)) {
if (!EventDispatcher::GetInstance()->RegisterEventHandler(path, config, mSharedHandler)) {
// break;// fail early, do not try to register others
return false;
}
Expand Down
91 changes: 45 additions & 46 deletions core/file_server/EventDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ DEFINE_FLAG_INT32(default_max_inotify_watch_num, "the max allowed inotify watch

namespace logtail {

EventDispatcher::EventDispatcher() : mWatchNum(0), mInotifyWatchNum(0) {
EventDispatcher::EventDispatcher() : mWatchNum(0), mInotifyWatchNum(0), mEventListener(EventListener::GetInstance()) {
/*
* May add multiple inotify fd instances in the future,
* so use epoll here though a little more sophisticated than select
Expand All @@ -100,7 +100,6 @@ EventDispatcher::EventDispatcher() : mWatchNum(0), mInotifyWatchNum(0) {
// mListenFd = -1;
// mStreamLogTcpFd = -1;
// #endif
mEventListener = EventListener::GetInstance();
if (!AppConfig::GetInstance()->NoInotify()) {
if (!mEventListener->Init()) {
AlarmManager::GetInstance()->SendAlarm(EPOLL_ERROR_ALARM,
Expand Down Expand Up @@ -142,7 +141,7 @@ EventDispatcher::~EventDispatcher() {
delete mTimeoutHandler;
}

bool EventDispatcher::RegisterEventHandler(const char* path,
bool EventDispatcher::RegisterEventHandler(const string& path,
const FileDiscoveryConfig& config,
EventHandler*& handler) {
if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) {
Expand Down Expand Up @@ -177,7 +176,7 @@ bool EventDispatcher::RegisterEventHandler(const char* path,
return false;
}
uint64_t inode = statBuf.GetDevInode().inode;
int wd;
int wd = -1;
MapType<string, int>::Type::iterator pathIter = mPathWdMap.find(path);
if (pathIter != mPathWdMap.end()) {
wd = pathIter->second;
Expand Down Expand Up @@ -236,8 +235,8 @@ bool EventDispatcher::RegisterEventHandler(const char* path,
} else {
// need check mEventListener valid
if (mEventListener->IsInit() && !AppConfig::GetInstance()->IsInInotifyBlackList(path)) {
wd = mEventListener->AddWatch(path);
if (!mEventListener->IsValidID(wd)) {
wd = mEventListener->AddWatch(path.c_str());
if (!EventListener::IsValidID(wd)) {
string str = ErrnoToString(GetErrno());
LOG_WARNING(sLogger, ("failed to register dir", path)("reason", str));
#if defined(__linux__)
Expand Down Expand Up @@ -279,7 +278,7 @@ bool EventDispatcher::RegisterEventHandler(const char* path,

bool dirTimeOutFlag = config.first->IsTimeout(path);

if (!mEventListener->IsValidID(wd)) {
if (!EventListener::IsValidID(wd)) {
wd = mNonInotifyWd;
if (mNonInotifyWd == INT_MIN)
mNonInotifyWd = -1;
Expand Down Expand Up @@ -311,7 +310,7 @@ bool EventDispatcher::RegisterEventHandler(const char* path,
}

// read files when add dir inotify watcher at first time
void EventDispatcher::AddExistedFileEvents(const char* path, int wd) {
void EventDispatcher::AddExistedFileEvents(const string& path, int wd) {
fsutil::Dir dir(path);
if (!dir.Open()) {
auto err = GetErrno();
Expand Down Expand Up @@ -618,7 +617,7 @@ void EventDispatcher::AddExistedCheckPointFileEvents() {
// Because they are not in v1 checkpoint manager, no need to delete them.
auto exactlyOnceConfigs = FileServer::GetInstance()->GetExactlyOnceConfigs();
if (!exactlyOnceConfigs.empty()) {
static auto sCptMV2 = CheckpointManagerV2::GetInstance();
static auto* sCptMV2 = CheckpointManagerV2::GetInstance();
auto exactlyOnceCpts = sCptMV2->ScanCheckpoints(exactlyOnceConfigs);
LOG_INFO(sLogger,
("start add exactly once checkpoint events",
Expand Down Expand Up @@ -687,14 +686,13 @@ void EventDispatcher::AddExistedCheckPointFileEvents() {
}
}

bool EventDispatcher::AddTimeoutWatch(const char* path) {
bool EventDispatcher::AddTimeoutWatch(const string& path) {
MapType<string, int>::Type::iterator itr = mPathWdMap.find(path);
if (itr != mPathWdMap.end()) {
mWdUpdateTimeMap[itr->second] = time(NULL);
return true;
} else {
return false;
}
return false;
}

void EventDispatcher::AddOneToOneMapEntry(DirInfo* dirInfo, int wd) {
Expand Down Expand Up @@ -811,11 +809,11 @@ void EventDispatcher::UnregisterAllDir(const string& baseDir) {
LOG_DEBUG(sLogger, ("Remove all sub dir", baseDir));
auto subDirAndHandlers = FindAllSubDirAndHandler(baseDir);
for (auto& subDirAndHandler : subDirAndHandlers) {
mTimeoutHandler->Handle(Event(subDirAndHandler.first.c_str(), "", 0, 0));
mTimeoutHandler->Handle(Event(subDirAndHandler.first, "", 0, 0));
}
}

void EventDispatcher::UnregisterEventHandler(const char* path) {
void EventDispatcher::UnregisterEventHandler(const string& path) {
MapType<string, int>::Type::iterator pos = mPathWdMap.find(path);
if (pos == mPathWdMap.end())
return;
Expand All @@ -828,10 +826,9 @@ void EventDispatcher::UnregisterEventHandler(const char* path) {
mBrokenLinkSet.insert(path);
}
}
LOG_INFO(sLogger, ("remove a new watcher for dir", path)("wd", wd));
RemoveOneToOneMapEntry(wd);
mWdUpdateTimeMap.erase(wd);
if (mEventListener->IsValidID(wd) && mEventListener->IsInit()) {
if (EventListener::IsValidID(wd) && mEventListener->IsInit()) {
mEventListener->RemoveWatch(wd);
mInotifyWatchNum--;
}
Expand All @@ -843,7 +840,7 @@ void EventDispatcher::StopAllDir(const string& baseDir) {
LOG_DEBUG(sLogger, ("Stop all sub dir", baseDir));
auto subDirAndHandlers = FindAllSubDirAndHandler(baseDir);
for (auto& subDirAndHandler : subDirAndHandlers) {
Event e(subDirAndHandler.first.c_str(), "", EVENT_ISDIR | EVENT_CONTAINER_STOPPED, -1, 0);
Event e(subDirAndHandler.first, "", EVENT_ISDIR | EVENT_CONTAINER_STOPPED, -1, 0);
subDirAndHandler.second->Handle(e);
}
}
Expand All @@ -864,7 +861,7 @@ DirRegisterStatus EventDispatcher::IsDirRegistered(const string& path) {
return PATH_INODE_NOT_REGISTERED;
}

bool EventDispatcher::IsRegistered(const char* path) {
bool EventDispatcher::IsRegistered(const std::string& path) {
MapType<string, int>::Type::iterator itr = mPathWdMap.find(path);
if (itr == mPathWdMap.end())
return false;
Expand All @@ -890,6 +887,8 @@ void EventDispatcher::HandleTimeout() {
time_t curTime = time(NULL);
MapType<int, time_t>::Type::iterator itr = mWdUpdateTimeMap.begin();
for (; itr != mWdUpdateTimeMap.end(); ++itr) {
LOG_ERROR(sLogger,
("path", mWdDirInfoMap[itr->first]->mPath)("curTime", curTime)("lastupdatetime", itr->second));
if (curTime - (itr->second) > INT32_FLAG(timeout_interval)) {
// add to vector then batch process to avoid possible iterator change problem
// mHandler may remove what itr points to, thus change the layout of the map container
Expand All @@ -911,33 +910,30 @@ void EventDispatcher::HandleTimeout() {
}
}

void EventDispatcher::PropagateTimeout(const char* path) {
char* tmp = strdup(path);
MapType<string, int>::Type::iterator pathpos = mPathWdMap.find(tmp);
void EventDispatcher::PropagateTimeout(const std::string& path) {
auto pathpos = mPathWdMap.find(path);
if (pathpos == mPathWdMap.end()) {
// workaround for bug#5760293; the scenarios that trigger it still need to be found
AlarmManager::GetInstance()->SendAlarm(
INVALID_MEMORY_ACCESS_ALARM, "PropagateTimeout access invalid key of mPathWdMap, path : " + string(tmp));
LOG_ERROR(sLogger, ("PropagateTimeout access invalid key of mPathWdMap, path", string(tmp)));
free(tmp);
AlarmManager::GetInstance()->SendAlarm(INVALID_MEMORY_ACCESS_ALARM,
"PropagateTimeout access invalid key of mPathWdMap, path : " + path);
LOG_ERROR(sLogger, ("PropagateTimeout access invalid key of mPathWdMap, path", path));
return;
}
MapType<int, time_t>::Type::iterator pos = mWdUpdateTimeMap.find(pathpos->second);
char* slashpos;
time_t curTime = time(NULL);
string tmp(path);
auto pos = mWdUpdateTimeMap.find(pathpos->second);
time_t curTime = time(nullptr);
while (pos != mWdUpdateTimeMap.end()) {
pos->second = curTime;
slashpos = strrchr(tmp, '/');
if (slashpos == NULL)
auto slashpos = tmp.rfind('/');
if (slashpos == string::npos)
break;
*slashpos = '\0';
tmp.resize(slashpos);
pathpos = mPathWdMap.find(tmp);
if (pathpos != mPathWdMap.end())
pos = mWdUpdateTimeMap.find(pathpos->second);
else
break;
}
free(tmp);
}

void EventDispatcher::StartTimeCount() {
Expand All @@ -963,7 +959,7 @@ void EventDispatcher::DumpAllHandlersMeta(bool remove) {
int wd = timeout[i];
string path = mWdDirInfoMap[wd]->mPath;
if (remove) {
UnregisterEventHandler(path.c_str());
UnregisterEventHandler(path);
ConfigManager::GetInstance()->RemoveHandler(path, false);
if (ConfigManager::GetInstance()->FindBestMatch(path).first == NULL) {
continue;
Expand All @@ -979,26 +975,29 @@ void EventDispatcher::ProcessHandlerTimeOut() {
for (; mapIter != mWdDirInfoMap.end(); ++mapIter) {
mapIter->second->mHandler->HandleTimeOut();
}
return;
}

void EventDispatcher::DumpCheckPointPeriod(int32_t curTime) {
if (CheckPointManager::Instance()->NeedDump(curTime)) {
LOG_INFO(sLogger, ("checkpoint dump", "starts"));
FileServer::GetInstance()->Pause(false);
DumpAllHandlersMeta(false);

if (!(CheckPointManager::Instance()->DumpCheckPointToLocal()))
LOG_WARNING(sLogger, ("dump checkpoint to local", "failed"));
else
LOG_DEBUG(sLogger, ("dump checkpoint to local", "succeeded"));
// after save checkpoint, we should clear all checkpoint
CheckPointManager::Instance()->RemoveAllCheckPoint();
FileServer::GetInstance()->Resume(false);
LOG_INFO(sLogger, ("checkpoint dump", "succeeded"));
DumpCheckPoint();
}
}

// Runs one checkpoint dump cycle, in this order:
//   1. FileServer::Pause(false)
//   2. DumpAllHandlersMeta(false)
//   3. CheckPointManager::DumpCheckPointToLocal()
//   4. CheckPointManager::RemoveAllCheckPoint()
//   5. FileServer::Resume(false)
// The closing log line reports the actual result of step 3 instead of always
// claiming success, so a failed dump is no longer followed by a "succeeded" message.
void EventDispatcher::DumpCheckPoint() {
    LOG_INFO(sLogger, ("checkpoint dump", "starts"));
    // NOTE(review): the meaning of the `false` argument to Pause/Resume is not visible here.
    FileServer::GetInstance()->Pause(false);
    DumpAllHandlersMeta(false);

    const bool dumped = CheckPointManager::Instance()->DumpCheckPointToLocal();
    if (dumped) {
        LOG_DEBUG(sLogger, ("dump checkpoint to local", "succeeded"));
    } else {
        LOG_WARNING(sLogger, ("dump checkpoint to local", "failed"));
    }

    // After the dump attempt all checkpoints are removed from the manager,
    // whether or not the dump succeeded (same as before).
    CheckPointManager::Instance()->RemoveAllCheckPoint();
    FileServer::GetInstance()->Resume(false);

    if (dumped) {
        LOG_INFO(sLogger, ("checkpoint dump", "succeeded"));
    } else {
        LOG_WARNING(sLogger, ("checkpoint dump", "failed"));
    }
}

bool EventDispatcher::IsAllFileRead() {
for (auto it = mWdDirInfoMap.begin(); it != mWdDirInfoMap.end(); ++it) {
if (!((it->second)->mHandler)->IsAllFileRead()) {
Expand Down
Loading
Loading