Clear independent threads when graph is destroyed (#7862)
* Clear empty threads when graph is destroyed

* fix empty-thread bug

* Roll back NNGraph weak_ptr held by MultiClientSessCtx and fix thread-resume bug

* limit thread num to 5k

* limit thread num to 3000

* limit threads to 1000 for distributed runs

* fix

* refine code for review 1

* add note

* different thread limit for cuda and cpu

* Add a lock for thread deletion, and use a blocking count to make sure graphs are destroyed before the session

* remove IsMultiClient() and single client logic

Signed-off-by: daquexian <daquexian566@gmail.com>

* rename eager.multi_client to eager

Signed-off-by: daquexian <daquexian566@gmail.com>

* auto format by CI

* Fix bug of graph destroy order in bad case

* add py ref

* refine new session

* clean code

* make scope API internal-use only

* use session with ref cnt

* run barrier callback in BarrierPhyInstrOperand::~BarrierPhyInstrOperand

* test pass

* lock GIL in vm Callback thread

* more comments for VirtualMachineEngine::Callback()

* merge

* merge rm single client

* rm initenv

* merge and fix master

* refactor env c api

* add debug code

* fix and serving test pass

* test passed

* rm useless

* rm useless code

* format

* rm useless include

* rm sync in py

* the Env is never destroyed.

* export Env into python

* more unittests

* fix and pass tests

* revert virtual_machine.cpp

* revert core/vm

* remove outdated python class oneflow.unittest.TestCase

* graph test passed

* wait shared_ptr.use_count() == 0

* export unittest.TestCase in framework/unittest.py

* SwitchToShuttingDownPhase

* optional is_normal_exit

* VirtualMachine::CloseVMThreads

* Delete env_api.h

env_api.h was already deleted on master

* remove blocking count of session ctx graphs

* address pr comments

* rm is env init

* Merge clear threads (#7859)

Co-authored-by: daquexian <daquexian566@gmail.com>
Co-authored-by: oneflow-ci-bot <ci-bot@oneflow.org>
Co-authored-by: lixinqi <lixinqi0703106@163.com>
Co-authored-by: Li Xinqi <lixinqi2010@gmail.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: Luyang <flowingsun007@163.com>

* Clear empty thread when graph destroy (#7633)

* Revert "Clear empty thread when graph destroy (#7633)" (#7860)

This reverts commit 3e8585e.

* auto format by CI

* format and rollback env.all_device_placement

* fix a ref-cnt bug in TryRunBarrierInstruction.

* rm env_api

* fix clang-tidy error

* fix clang-tidy in env_imp

* avoid multi env

* refine env api

* format

* refine graph del and sync at shutdown

* fix typo

* add comment

* rm useless

* rm useless

* fix static check on CHECK message

* fix bug of graph deletion when the graph has not been run

* rollback diff

* quick fix

* ONLY remove independent threads

* refine log

* refine thread limit

Co-authored-by: Shenghang Tsai <jackalcooper@gmail.com>
Co-authored-by: oneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
Co-authored-by: daquexian <daquexian566@gmail.com>
Co-authored-by: oneflow-ci-bot <ci-bot@oneflow.org>
Co-authored-by: strint <xiaoyulink@gmail.com>
Co-authored-by: lixinqi <lixinqi0703106@163.com>
Co-authored-by: Li Xinqi <lixinqi2010@gmail.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: Luyang <flowingsun007@163.com>
10 people authored Apr 22, 2022
1 parent 58e94af commit c8c6d35
Showing 8 changed files with 73 additions and 14 deletions.
11 changes: 10 additions & 1 deletion .github/workflows/test.yml
@@ -733,7 +733,7 @@ jobs:
working-directory: ${{ env.ONEFLOW_SRC }}
run: |
docker run -d --rm --privileged --shm-size=8g \
- --pids-limit -1 \
+ --pids-limit 1000 \
--cap-add=SYS_PTRACE --security-opt seccomp=unconfined \
--runtime=nvidia \
-v /dataset:/dataset:ro -v /model_zoo:/model_zoo:ro \
@@ -913,6 +913,14 @@ jobs:
EXTRA_DOCKER_ARGS+=" --env ONEFLOW_THREAD_ENABLE_LOCAL_MESSAGE_QUEUE=1"
EXTRA_DOCKER_ARGS+=" --env ONEFLOW_KERNEL_DISABLE_BLOB_ACCESS_CHECKER=1"
echo "EXTRA_DOCKER_ARGS=${EXTRA_DOCKER_ARGS}" >> $GITHUB_ENV
+ - name: Set Thread Limit (CPU)
+   if: ${{ !fromJson(matrix.cache-hit) && matrix.device == 'cpu' }}
+   run: |
+     echo "THREAD_LIMIT=10000" >> $GITHUB_ENV
+ - name: Set Thread Limit (CUDA)
+   if: ${{ !fromJson(matrix.cache-hit) && matrix.device == 'cuda' }}
+   run: |
+     echo "THREAD_LIMIT=10000" >> $GITHUB_ENV
- name: Enable ONEFLOW_TEST_VERBOSE
if: ${{ contains(github.event.pull_request.labels.*.name, 'need-test-verbose') }}
run: |
@@ -932,6 +940,7 @@ jobs:
working-directory: ${{ env.ONEFLOW_SRC }}
run: |
docker run -d --rm --privileged --shm-size=8g \
+ --pids-limit ${{ env.THREAD_LIMIT }} \
--cap-add=SYS_PTRACE --security-opt seccomp=unconfined \
--runtime=nvidia \
-v /dataset:/dataset:ro -v /model_zoo:/model_zoo:ro \
1 change: 1 addition & 0 deletions oneflow/core/framework/multi_client_session_context.cpp
@@ -140,6 +140,7 @@ Maybe<void> MultiClientSessionContext::TryInit(const std::string& config_proto_s
}

Maybe<void> MultiClientSessionContext::UpdateResource(const Resource& reso_proto) {
+  CHECK_OR_RETURN(is_inited_) << " session must be inited when updating resource.";
CHECK_NOTNULL_OR_RETURN((Global<ResourceDesc, ForSession>::Get()));
Global<ResourceDesc, ForSession>::Get()->Update(reso_proto);
return Maybe<void>::Ok();
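The new CHECK_OR_RETURN guard makes a mis-ordered UpdateResource call fail with an explicit message instead of tripping the later null check on the session's ResourceDesc. Below is a minimal, self-contained sketch of the same guard pattern; std::optional stands in for OneFlow's Maybe<void>, and all names here are illustrative, not OneFlow's API.

#include <iostream>
#include <optional>
#include <string>

struct SessionContext {
  bool is_inited = false;
  int thread_limit = 0;

  // Returns an error message on failure, std::nullopt on success;
  // a stand-in for OneFlow's Maybe<void> / CHECK_OR_RETURN.
  std::optional<std::string> UpdateResource(int new_thread_limit) {
    if (!is_inited) { return "session must be initialized before UpdateResource"; }
    thread_limit = new_thread_limit;
    return std::nullopt;
  }
};

int main() {
  SessionContext ctx;
  if (auto err = ctx.UpdateResource(1000)) { std::cout << "error: " << *err << "\n"; }  // rejected
  ctx.is_inited = true;
  if (!ctx.UpdateResource(1000)) { std::cout << "updated to " << ctx.thread_limit << "\n"; }
  return 0;
}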
23 changes: 22 additions & 1 deletion oneflow/core/job/runtime.cpp
@@ -58,10 +58,11 @@ bool HasNonCtrlConsumedRegstDescId(const TaskProto& task) {
} // namespace

Runtime::Runtime(const Plan& plan, const HashMap<std::string, Blob*>& variable_op_name2eager_blob) {
+  DumpThreadIdsFromPlan(plan);
{
// NOTE(chengcheng): All runtime Global objects AddPlan
Global<RegstMgr>::Get()->AddPlan(plan, variable_op_name2eager_blob);
-    Global<ThreadMgr>::Get()->AddPlan(plan);
+    Global<ThreadMgr>::Get()->AddThreads(thread_ids_);
Global<RuntimeJobDescs>::Get()->AddPlan(plan);
collective_boxing_scheduler_plan_token_ =
Global<boxing::collective::Scheduler>::Get()->AddPlan(plan);
@@ -106,7 +107,27 @@ Runtime::~Runtime() {
Global<RuntimeCtx>::Get()->WaitUntilCntEqualZero(GetRunningActorCountKeyByJobId(pair.first));
}
OF_SESSION_BARRIER();
+  Global<ThreadMgr>::Get()->DeleteThreads(independent_thread_ids_);
Global<boxing::collective::Scheduler>::Get()->DeletePlan(collective_boxing_scheduler_plan_token_);
}

+void Runtime::DumpThreadIdsFromPlan(const Plan& plan) {
+  const int64_t this_rank = GlobalProcessCtx::Rank();
+  for (const TaskProto& task : plan.task()) {
+    TaskId task_id = DecodeTaskIdFromInt64(task.task_id());
+    StreamId stream_id = task_id.stream_id();
+    if (stream_id.rank() != this_rank) { continue; }
+    int64_t thrd_id = EncodeStreamIdToInt64(stream_id);
+    thread_ids_.insert(thrd_id);
+    // NOTE(chengcheng): there is no interface for querying whether a task type is independent,
+    // so the task types are hard-coded here.
+    if (task.task_type() == TaskType::kWaitAndSendIds
+        || task.task_type() == TaskType::kCriticalSectionWaitTick) {
+      CHECK(independent_thread_ids_.insert(thrd_id).second)
+          << " RuntimeError! Thread : " << thrd_id
+          << " not independent with task proto: " << task.DebugString();
+    }
+  }
+}

} // namespace oneflow
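DumpThreadIdsFromPlan collects every thread id this rank owns and, separately, the ids of "independent" threads (those backing kWaitAndSendIds and kCriticalSectionWaitTick tasks); only the independent ones are deleted in Runtime::~Runtime. Here is a self-contained sketch of that bookkeeping under simplified assumptions: a hypothetical TaskKind enum and plain structs stand in for OneFlow's TaskProto/TaskType and stream-id encoding.

#include <cassert>
#include <cstdint>
#include <unordered_set>
#include <vector>

enum class TaskKind { Normal, WaitAndSendIds, CriticalSectionWaitTick };

struct Task {
  int64_t rank;       // rank that owns the task's stream
  int64_t thread_id;  // stand-in for the encoded stream id
  TaskKind kind;
};

struct ThreadBookkeeping {
  std::unordered_set<int64_t> thread_ids;
  std::unordered_set<int64_t> independent_thread_ids;

  void DumpFromPlan(const std::vector<Task>& tasks, int64_t this_rank) {
    for (const Task& t : tasks) {
      if (t.rank != this_rank) { continue; }  // keep only this process's threads
      thread_ids.insert(t.thread_id);
      if (t.kind == TaskKind::WaitAndSendIds || t.kind == TaskKind::CriticalSectionWaitTick) {
        // An independent thread must not be shared with any other task.
        bool inserted = independent_thread_ids.insert(t.thread_id).second;
        assert(inserted && "independent thread id must not be shared");
        (void)inserted;  // silence unused warning in release builds
      }
    }
  }
};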
4 changes: 4 additions & 0 deletions oneflow/core/job/runtime.h
@@ -33,7 +33,11 @@ class Runtime final {
Runtime(const Plan& plan, const HashMap<std::string, Blob*>& variable_op_name2eager_blob);

private:
+  void DumpThreadIdsFromPlan(const Plan& plan);
+
HashMap<int64_t, int64_t> job_id2actor_size_;
+  HashSet<int64_t> thread_ids_;
+  HashSet<int64_t> independent_thread_ids_;

boxing::collective::SchedulerPlanToken* collective_boxing_scheduler_plan_token_;
};
4 changes: 3 additions & 1 deletion oneflow/core/thread/thread.cpp
@@ -60,7 +60,9 @@ void Thread::PollMsgChannel() {
local_msg_queue_.pop();
if (msg.msg_type() == ActorMsgType::kCmdMsg) {
if (msg.actor_cmd() == ActorCmd::kStopThread) {
-      CHECK(id2actor_ptr_.empty());
+      CHECK(id2actor_ptr_.empty())
+          << " RuntimeError! Thread: " << thrd_id_
+          << " NOT empty when stop with actor num: " << id2actor_ptr_.size();
break;
} else if (msg.actor_cmd() == ActorCmd::kConstructActor) {
ConstructActor(msg.dst_actor_id());
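The strengthened CHECK documents an invariant of the thread's message loop: a kStopThread command is only legal once every actor on the thread has been destructed. A simplified, self-contained sketch of such a loop follows; Msg and Cmd are stand-ins for OneFlow's ActorMsg machinery, not its real types.

#include <cassert>
#include <cstdint>
#include <map>
#include <queue>

enum class Cmd { ConstructActor, DestroyActor, StopThread };
struct Msg { Cmd cmd; int64_t actor_id; };

void PollMsgQueue(std::queue<Msg>& q, std::map<int64_t, int>& id2actor) {
  while (!q.empty()) {
    Msg msg = q.front();
    q.pop();
    switch (msg.cmd) {
      case Cmd::ConstructActor: id2actor[msg.actor_id] = 1; break;
      case Cmd::DestroyActor: id2actor.erase(msg.actor_id); break;
      case Cmd::StopThread:
        // Stopping with live actors would leak them; fail loudly instead.
        assert(id2actor.empty() && "thread NOT empty when stopped");
        return;
    }
  }
}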
37 changes: 28 additions & 9 deletions oneflow/core/thread/thread_manager.cpp
@@ -17,7 +17,6 @@ limitations under the License.
#include "oneflow/core/job/resource_desc.h"
#include "oneflow/core/job/global_for.h"
#include "oneflow/core/control/global_process_ctx.h"
#include "oneflow/core/job/global_for.h"

namespace oneflow {

@@ -26,27 +25,47 @@ ThreadMgr::~ThreadMgr() {
ActorMsg msg = ActorMsg::BuildCommandMsg(-1, ActorCmd::kStopThread);
thread_pair.second->GetMsgChannelPtr()->Send(msg);
thread_pair.second.reset();
-    VLOG(3) << "actor thread " << thread_pair.first << " finish";
+    VLOG(1) << " Actor thread: " << thread_pair.first << " finished when process exits.";
}
}

Thread* ThreadMgr::GetThrd(int64_t thrd_id) {
auto iter = threads_.find(thrd_id);
CHECK(iter != threads_.end()) << "thread " << thrd_id << " not found";
CHECK(iter != threads_.end()) << " Thread: " << thrd_id << " not found";
return iter->second.get();
}

-void ThreadMgr::AddPlan(const Plan& plan) {
+void ThreadMgr::AddThreads(const HashSet<int64_t>& thread_ids) {
const int64_t this_rank = GlobalProcessCtx::Rank();
-  for (const TaskProto& task : plan.task()) {
-    TaskId task_id = DecodeTaskIdFromInt64(task.task_id());
-    StreamId stream_id = task_id.stream_id();
+  for (int64_t thrd_id : thread_ids) {
+    const auto& it = threads_.find(thrd_id);
+    if (it != threads_.end()) {
+      // NOTE(chengcheng): check that the thread is not null.
+      CHECK(it->second) << " RuntimeError! Thread: " << thrd_id << " in manager must be NOT null.";
+      VLOG(1) << " Actor thread: " << thrd_id << " reused.";
+      continue;
+    }
+    StreamId stream_id = DecodeStreamIdFromInt64(thrd_id);
if (stream_id.rank() != this_rank) { continue; }
-    int64_t thrd_id = EncodeStreamIdToInt64(stream_id);
-    if (threads_.find(thrd_id) != threads_.end()) { continue; }
Thread* thread = new Thread(stream_id);
CHECK_NOTNULL(thread);
threads_[thrd_id].reset(thread);
VLOG(1) << " Actor thread: " << thrd_id << " created.";
}
}

+void ThreadMgr::DeleteThreads(const HashSet<int64_t>& thread_ids) {
+  std::unique_lock<std::mutex> lock(mutex4del_threads_);
+  for (int64_t thrd_id : thread_ids) {
+    const auto& it = threads_.find(thrd_id);
+    CHECK((it != threads_.end()) && (it->second))
+        << " RuntimeError! Actor thread: " << thrd_id << " non-existent but want to delete";
+    auto& thread = it->second;
+    ActorMsg msg = ActorMsg::BuildCommandMsg(-1, ActorCmd::kStopThread);
+    thread->GetMsgChannelPtr()->Send(msg);
+    thread.reset();
+    VLOG(1) << " Actor thread: " << thrd_id << " finished when the graph is destructed.";
+    threads_.erase(it);
+  }
+}

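DeleteThreads is now callable per graph rather than only at process exit, so two graphs being destructed from different host threads could race on the shared registry; mutex4del_threads_ serializes them. The sketch below shows the same reuse-or-create registry with mutex-guarded deletion under simplified assumptions: Worker and Registry are illustrative stand-ins, not OneFlow classes, and joining the std::thread stands in for the Thread destructor's join. As in the patch, only deletion takes the lock.

#include <cstdint>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <unordered_set>

struct Worker {
  std::thread t{[] { /* poll messages until told to stop */ }};
  ~Worker() {
    if (t.joinable()) { t.join(); }  // stand-in for Thread's join-on-destruction
  }
};

class Registry {
 public:
  void Add(int64_t id) {
    // Reuse an existing worker; create one only for new ids.
    if (workers_.count(id) == 0) { workers_[id] = std::make_unique<Worker>(); }
  }

  void Delete(const std::unordered_set<int64_t>& ids) {
    std::unique_lock<std::mutex> lock(mutex_);  // serialize concurrent graph teardown
    for (int64_t id : ids) {
      auto it = workers_.find(id);
      if (it == workers_.end()) { continue; }
      it->second.reset();  // joins the worker before it is forgotten
      workers_.erase(it);
    }
  }

 private:
  std::unordered_map<int64_t, std::unique_ptr<Worker>> workers_;
  std::mutex mutex_;
};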
5 changes: 4 additions & 1 deletion oneflow/core/thread/thread_manager.h
@@ -16,6 +16,7 @@ limitations under the License.
#ifndef ONEFLOW_CORE_THREAD_THREAD_MANAGER_H_
#define ONEFLOW_CORE_THREAD_THREAD_MANAGER_H_

+#include <mutex>
#include "oneflow/core/common/channel.h"
#include "oneflow/core/common/protobuf.h"
#include "oneflow/core/common/auto_registration_factory.h"
@@ -36,13 +37,15 @@ class ThreadMgr final {
ThreadMgr() = default;
~ThreadMgr();

-  void AddPlan(const Plan& plan);
+  void AddThreads(const HashSet<int64_t>& thread_ids);
+  void DeleteThreads(const HashSet<int64_t>& thread_ids);
Thread* GetThrd(int64_t thrd_id);

private:
friend class Global<ThreadMgr>;

HashMap<int64_t, std::unique_ptr<Thread>> threads_;
+  std::mutex mutex4del_threads_;
};

void SingleThreadLoop(size_t num, std::function<void(size_t i)> Callback);
2 changes: 1 addition & 1 deletion python/oneflow/framework/env_util.py
@@ -40,7 +40,7 @@ def api_all_device_placement(device_type: str) -> oneflow._oneflow_internal.plac
# Runs on 4 ranks
import oneflow as flow
p = flow.env.all_device_placement("cuda") # oneflow.placement(type="cuda", ranks=[0, 1, 2, 3])
p = flow.env.all_device_placement("cpu") # oneflow.placement(type="cpu", ranks=[0, 1, 2, 3])
