Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear empty thread when graph destroy #7633

Merged
merged 26 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
5feb16b
Clear empty thread when graph destroy
chengtbf Feb 28, 2022
7e3f93a
Merge branch 'master' into dev_cc_thread_leak
jackalcooper Mar 1, 2022
458e678
fix bug of thread empty
chengtbf Mar 1, 2022
2116953
Rollback NNGraph weak_ptr hold by MultiClientSessCtx and fix bug of t…
chengtbf Mar 1, 2022
de27358
limit thread num 5k
jackalcooper Mar 1, 2022
08b4104
limit threads num 3000
jackalcooper Mar 1, 2022
87cc7f0
distributed run limit threads 1000
jackalcooper Mar 1, 2022
9767719
Merge branch 'master' into dev_cc_thread_leak
jackalcooper Mar 1, 2022
1b7224b
fix
jackalcooper Mar 1, 2022
1e9e7a1
:wq
jackalcooper Mar 1, 2022
f1b6d1a
refine code for review 1
chengtbf Mar 1, 2022
545827c
add note
chengtbf Mar 1, 2022
57ea016
Merge branch 'master' into dev_cc_thread_leak
chengtbf Mar 1, 2022
7f2afb2
Merge branch 'master' into dev_cc_thread_leak
oneflow-ci-bot Mar 1, 2022
26f5417
Merge branch 'master' into dev_cc_thread_leak
oneflow-ci-bot Mar 1, 2022
415ec63
different thread limit for cuda and cpu
jackalcooper Mar 2, 2022
803d55c
Merge branch 'master' into dev_cc_thread_leak
chengtbf Mar 2, 2022
190ab62
Merge branch 'master' into dev_cc_thread_leak
oneflow-ci-bot Mar 2, 2022
ac22e06
Merge branch 'master' into dev_cc_thread_leak
oneflow-ci-bot Mar 2, 2022
2a3742e
Add lock of thread del. And using blocking cnt to make sure graph de…
chengtbf Mar 3, 2022
207afe3
Merge branch 'master' into dev_cc_thread_leak
chengtbf Mar 3, 2022
0f895c9
Fix bug of graph destroy order in bad case
chengtbf Mar 3, 2022
8216eac
fix conflicts
chengtbf Mar 22, 2022
63cb89f
remove blocking count of session ctx graphs
chengtbf Mar 22, 2022
3a169da
Merge clear threads (#7859)
strint Mar 22, 2022
ec52eba
Merge branch 'feat/graph_del_by_ref' into dev_cc_thread_leak
chengtbf Mar 22, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ jobs:
working-directory: ${{ env.ONEFLOW_SRC }}
run: |
docker run -d --rm --privileged --shm-size=8g \
--pids-limit -1 \
--pids-limit 5000 \
--cap-add=SYS_PTRACE --security-opt seccomp=unconfined \
--runtime=nvidia \
-v /dataset:/dataset:ro -v /model_zoo:/model_zoo:ro \
Expand Down
9 changes: 9 additions & 0 deletions oneflow/core/framework/multi_client_session_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,22 @@ Maybe<void> MultiClientSessionContext::TryClose() {
if (is_inited_) {
VLOG(2) << "Try to delete multi client session context." << std::endl;

for (auto wk_graph_ptr : graphs_) {
if (auto sh_graph_ptr = wk_graph_ptr.lock()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const auto&
不要养成用auto的习惯啊。基本上有三种方式替代:const auto&, auto*, auto&&,for循环的迭代变量还可以用auto&。所有的智能指针应该用const auto&

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已改,之前是回滚的啸宇的代码,以后都注意使用 const &

VLOG(2) << "grap name " << sh_graph_ptr->job_name() << " not closed, try to close it.";
JUST(sh_graph_ptr->Close());
}
}

/*
// sync before NNGraph release to ensure LaunchLazyJob instruction was completed and released
JUST(vm::ClusterSync());
for (const auto& graph : graphs_) {
VLOG(2) << "Try to close graph: " << graph->job_name() << std::endl;
JUST(graph->Close());
}
graphs_.clear();
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这段删除掉?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的,当时调试用的,忘了清了

{
// NOTE(chengcheng): delete runtime global objects
Global<boxing::collective::Scheduler>::Delete();
Expand Down
2 changes: 1 addition & 1 deletion oneflow/core/framework/multi_client_session_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class MultiClientSessionContext {
bool is_inited_;
HashMap<std::string, std::vector<std::pair<std::string, std::shared_ptr<one::Tensor>>>>
graph_name2free_eager_tensors_;
std::vector<std::shared_ptr<NNGraph>> graphs_;
std::vector<std::weak_ptr<NNGraph>> graphs_;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这行代码的修改还需要在增加一些保护措施。因为现在 Graph 有可能不是在 main 线程析构 ,而是在指令释放的 Callback 线程中,因此 可能会发生 Graph 析构 顺序 与 Session 析构顺序不严格有序的情况(Session 早于 Graph 析构,就会报错)

Copy link
Contributor

@strint strint Mar 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A→B表示A持有B的shared_ptr,以体现资源依赖关系;A ⇢ B表示A持有B的weak_ptr,以体现释放时的同步等待关系。

py env ← py session  ← py graph
   ↓          ↓           ↓
c env  ⇢   c session   ⇢  c graph
                           ⇣    
                        bc query vm has finished using c graph

py env 、py session、py graph在python层通过python的引用计数,保证了python对象的正确析构顺序。

c env 对 c session、c session对 c graph、c graph对vm那边则资源寻求bc,在析构时会做一遍检查,发现依赖自己的对象还没析构,就用weak ptr去同步等待依赖它的对象完成释放。

Copy link
Contributor Author

@chengtbf chengtbf Mar 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有明确的办法,可以看我最新的 commit 2a3742e

};

} // namespace oneflow
Expand Down
15 changes: 14 additions & 1 deletion oneflow/core/job/runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ bool HasNonCtrlConsumedRegstDescId(const TaskProto& task) {
} // namespace

Runtime::Runtime(const Plan& plan, const HashMap<std::string, Blob*>& variable_op_name2eager_blob) {
DumpThreadIdsFromPlan(plan);
{
// NOTE(chengcheng): All runtime Global objects AddPlan
Global<RegstMgr>::Get()->AddPlan(plan, variable_op_name2eager_blob);
Global<ThreadMgr>::Get()->AddPlan(plan);
Global<ThreadMgr>::Get()->AddThreads(thread_ids_);
Global<RuntimeJobDescs>::Get()->AddPlan(plan);
collective_boxing_scheduler_plan_token_ =
Global<boxing::collective::Scheduler>::Get()->AddPlan(plan);
Expand Down Expand Up @@ -106,7 +107,19 @@ Runtime::~Runtime() {
Global<RuntimeCtx>::Get()->WaitUntilCntEqualZero(GetRunningActorCountKeyByJobId(pair.first));
}
OF_SESSION_BARRIER();
Global<ThreadMgr>::Get()->TryDeleteThreads(thread_ids_);
Global<boxing::collective::Scheduler>::Get()->DeletePlan(collective_boxing_scheduler_plan_token_);
}

void Runtime::DumpThreadIdsFromPlan(const Plan& plan) {
const int64_t this_rank = GlobalProcessCtx::Rank();
for (const TaskProto& task : plan.task()) {
TaskId task_id = DecodeTaskIdFromInt64(task.task_id());
StreamId stream_id = task_id.stream_id();
if (stream_id.rank() != this_rank) { continue; }
int64_t thrd_id = EncodeStreamIdToInt64(stream_id);
thread_ids_.insert(thrd_id);
}
}

} // namespace oneflow
3 changes: 3 additions & 0 deletions oneflow/core/job/runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ class Runtime final {
Runtime(const Plan& plan, const HashMap<std::string, Blob*>& variable_op_name2eager_blob);

private:
void DumpThreadIdsFromPlan(const Plan& plan);

HashMap<int64_t, int64_t> job_id2actor_size_;
HashSet<int64_t> thread_ids_;

boxing::collective::SchedulerPlanToken* collective_boxing_scheduler_plan_token_;
};
Expand Down
2 changes: 2 additions & 0 deletions oneflow/core/thread/thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ void Thread::AddTask(const TaskProto& task) {
CHECK(id2task_.emplace(task.task_id(), task).second);
}

bool Thread::Empty() const { return id2actor_ptr_.empty(); }
strint marked this conversation as resolved.
Show resolved Hide resolved

void Thread::PollMsgChannel() {
while (true) {
if (local_msg_queue_.empty()) {
Expand Down
1 change: 1 addition & 0 deletions oneflow/core/thread/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class Thread {
virtual ~Thread();

void AddTask(const TaskProto&);
bool Empty() const;

Channel<ActorMsg>* GetMsgChannelPtr() { return &msg_channel_; }

Expand Down
41 changes: 30 additions & 11 deletions oneflow/core/thread/thread_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@ limitations under the License.
#include "oneflow/core/job/resource_desc.h"
#include "oneflow/core/job/global_for.h"
#include "oneflow/core/control/global_process_ctx.h"
#include "oneflow/core/job/global_for.h"

namespace oneflow {

ThreadMgr::~ThreadMgr() {
for (auto& thread_pair : threads_) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check(thread_pair.size() == 0) ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是,直接 check 就行。原本想的是报错时把 thread 的信息打出来。但 FATAL 也没法遍历所有的 thread 了

ActorMsg msg = ActorMsg::BuildCommandMsg(-1, ActorCmd::kStopThread);
thread_pair.second->GetMsgChannelPtr()->Send(msg);
thread_pair.second.reset();
LOG(INFO) << "actor thread " << thread_pair.first << " finish";
LOG(FATAL) << " Runtime Error! thread id " << thread_pair.first
<< " not delete with graph, and it's empty is " << thread_pair.second->Empty();
}
}

Expand All @@ -36,20 +33,42 @@ Thread* ThreadMgr::GetThrd(int64_t thrd_id) {
return iter->second.get();
}

void ThreadMgr::AddPlan(const Plan& plan) {
void ThreadMgr::AddThreads(const HashSet<int64_t>& thread_ids) {
const int64_t this_rank = GlobalProcessCtx::Rank();
for (const TaskProto& task : plan.task()) {
TaskId task_id = DecodeTaskIdFromInt64(task.task_id());
StreamId stream_id = task_id.stream_id();
for (int64_t thrd_id : thread_ids) {
const auto& it = threads_.find(thrd_id);
if (it != threads_.end()) {
// NOTE(chengcheng): check thread is not null.
CHECK(it->second);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以增加一点注释,现在遇到check后面没有日志的信息,比较难定位,需要再加上日志;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的

continue;
}
StreamId stream_id = DecodeStreamIdFromInt64(thrd_id);
if (stream_id.rank() != this_rank) { continue; }
int64_t thrd_id = EncodeStreamIdToInt64(stream_id);
if (threads_.find(thrd_id) != threads_.end()) { continue; }
Thread* thread = new Thread(stream_id);
CHECK_NOTNULL(thread);
threads_[thrd_id].reset(thread);
}
}

void ThreadMgr::TryDeleteThreads(const HashSet<int64_t>& thread_ids) {
for (int64_t thrd_id : thread_ids) {
const auto& it = threads_.find(thrd_id);
if (it == threads_.end()) { continue; }
auto& thread = it->second;
CHECK(thread) << " actor thread " << thrd_id << " non-existent but want to delete";
if (thread->Empty()) {
// NOTE(chengcheng): Only delete thread when it is empty.
ActorMsg msg = ActorMsg::BuildCommandMsg(-1, ActorCmd::kStopThread);
thread->GetMsgChannelPtr()->Send(msg);
thread.reset();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不需要等线程结束吗?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的逻辑跟之前 thread 下线的逻辑一致。 我的理解是这样的: 57行,发送 msg 会让 thread 的 actor poll msg channel 线程跳出循环(异步的)。 58 行,执行 thread 的析构函数,thread 析构时调用 actor_thread_.join(), 这里会同步等待线程结束吧。 所以这里还是会等到线程结束才会销毁 thread

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的。还是应该在这一行之前注释一下析构函数里的语义,减少读者的惊讶。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已加注释

LOG(INFO) << " actor thread " << thrd_id << " finish.";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感觉VLOG(2)比较好

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以

threads_.erase(it);
} else {
LOG(INFO) << " actor thread " << thrd_id << " not delete because it is not empty.";
Copy link
Contributor

@strint strint Mar 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感觉LOG(Warning)比较好?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

想了一下,warning 可能不太好。因为这个可能是正常现象。比如 train eval,其中一个 graph
析构时, compute thread 不会被析构。

}
}
}

void SingleThreadLoop(size_t num, std::function<void(size_t i)> Callback) {
FOR_RANGE(size_t, i, 0, num) { Callback(i); }
}
Expand Down
3 changes: 2 additions & 1 deletion oneflow/core/thread/thread_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class ThreadMgr final {
ThreadMgr() = default;
~ThreadMgr();

void AddPlan(const Plan& plan);
void AddThreads(const HashSet<int64_t>& thread_ids);
void TryDeleteThreads(const HashSet<int64_t>& thread_ids);
Thread* GetThrd(int64_t thrd_id);

private:
Expand Down