diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4c15e89f3b8..859664c29e8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -529,7 +529,7 @@ jobs: working-directory: ${{ env.ONEFLOW_SRC }} run: | docker run -d --rm --privileged --shm-size=8g \ - --pids-limit 1000 \ + --pids-limit -1 \ --cap-add=SYS_PTRACE --security-opt seccomp=unconfined \ --runtime=nvidia \ -v /dataset:/dataset:ro -v /model_zoo:/model_zoo:ro \ @@ -691,14 +691,6 @@ jobs: EXTRA_DOCKER_ARGS+=" --env ONEFLOW_THREAD_ENABLE_LOCAL_MESSAGE_QUEUE=1" EXTRA_DOCKER_ARGS+=" --env ONEFLOW_KERNEL_DISABLE_BLOB_ACCESS_CHECKER=1" echo "EXTRA_DOCKER_ARGS=${EXTRA_DOCKER_ARGS}" >> $GITHUB_ENV - - name: Set Thread Limit (CPU) - if: ${{ !fromJson(matrix.cache-hit) && matrix.device == 'cpu' }} - run: | - echo "THREAD_LIMIT=8000" >> $GITHUB_ENV - - name: Set Thread Limit (CUDA) - if: ${{ !fromJson(matrix.cache-hit) && matrix.device == 'cuda' }} - run: | - echo "THREAD_LIMIT=3000" >> $GITHUB_ENV - name: Enable ONEFLOW_TEST_VERBOSE if: ${{ contains(github.event.pull_request.labels.*.name, 'need-test-verbose') }} run: | @@ -718,7 +710,6 @@ jobs: working-directory: ${{ env.ONEFLOW_SRC }} run: | docker run -d --rm --privileged --shm-size=8g \ - --pids-limit ${{ env.THREAD_LIMIT }} \ --cap-add=SYS_PTRACE --security-opt seccomp=unconfined \ --runtime=nvidia \ -v /dataset:/dataset:ro -v /model_zoo:/model_zoo:ro \ diff --git a/oneflow/core/framework/multi_client_session_context.cpp b/oneflow/core/framework/multi_client_session_context.cpp index f106482894a..15c368959b1 100644 --- a/oneflow/core/framework/multi_client_session_context.cpp +++ b/oneflow/core/framework/multi_client_session_context.cpp @@ -141,7 +141,6 @@ Maybe MultiClientSessionContext::TryInit(const std::string& config_proto_s } Maybe MultiClientSessionContext::UpdateResource(const Resource& reso_proto) { - CHECK_OR_RETURN(is_inited_); CHECK_NOTNULL_OR_RETURN((Global::Get())); Global::Get()->Update(reso_proto); return Maybe::Ok(); diff --git a/oneflow/core/framework/nn_graph.cpp b/oneflow/core/framework/nn_graph.cpp index 9ccd7e488ea..a64286c344d 100644 --- a/oneflow/core/framework/nn_graph.cpp +++ b/oneflow/core/framework/nn_graph.cpp @@ -76,12 +76,9 @@ NNGraph::~NNGraph() { Maybe NNGraph::Close() { if (!is_closed_) { VLOG(1) << "Try to close c nn graph name " << name_ << "." << std::endl; - if (runtime_inited_) { - CloseRuntimeBuffers(); - runtime_.reset(); - } + CloseRuntimeBuffers(); + runtime_.reset(); session_ctx_->RemoveGraphFreeEagerTensors(name_); - is_closed_ = true; VLOG(1) << "Finish close c nn graph name " << name_ << "." << std::endl; session_ctx_.reset(); @@ -431,23 +428,25 @@ void NNGraph::NewRuntimeBuffers() { } void NNGraph::CloseRuntimeBuffers() { - { - auto* buffer_mgr = Global>>::Get(); - for (const std::string& output_op_name : outputs_op_names_) { - buffer_mgr->Get(GetOutputBufferName(name_, output_op_name))->Close(); + if (runtime_inited_) { + { + auto* buffer_mgr = Global>>::Get(); + for (const std::string& output_op_name : outputs_op_names_) { + buffer_mgr->Get(GetOutputBufferName(name_, output_op_name))->Close(); + } + for (const std::string& input_op_name : inputs_op_names_) { + buffer_mgr->Get(GetInputBufferName(name_, input_op_name))->Close(); + } + buffer_mgr->Get(GetOutputCriticalSectionCallbackBufferName(name_))->Close(); + buffer_mgr->Get(GetOutputCriticalSectionWaitBufferName(name_))->Close(); + buffer_mgr->Get(GetInputCriticalSectionCallbackBufferName(name_))->Close(); + buffer_mgr->Get(GetInputCriticalSectionWaitBufferName(name_))->Close(); } - for (const std::string& input_op_name : inputs_op_names_) { - buffer_mgr->Get(GetInputBufferName(name_, input_op_name))->Close(); + { + auto* buffer_mgr = Global>>::Get(); + buffer_mgr->Get(GetCallbackNotifierBufferName(name_))->Close(); + buffer_mgr->Get(GetSourceTickBufferName(name_))->Close(); } - buffer_mgr->Get(GetOutputCriticalSectionCallbackBufferName(name_))->Close(); - buffer_mgr->Get(GetOutputCriticalSectionWaitBufferName(name_))->Close(); - buffer_mgr->Get(GetInputCriticalSectionCallbackBufferName(name_))->Close(); - buffer_mgr->Get(GetInputCriticalSectionWaitBufferName(name_))->Close(); - } - { - auto* buffer_mgr = Global>>::Get(); - buffer_mgr->Get(GetCallbackNotifierBufferName(name_))->Close(); - buffer_mgr->Get(GetSourceTickBufferName(name_))->Close(); } } diff --git a/oneflow/core/job/runtime.cpp b/oneflow/core/job/runtime.cpp index 4607a7faa2f..f15357abc47 100644 --- a/oneflow/core/job/runtime.cpp +++ b/oneflow/core/job/runtime.cpp @@ -58,11 +58,10 @@ bool HasNonCtrlConsumedRegstDescId(const TaskProto& task) { } // namespace Runtime::Runtime(const Plan& plan, const HashMap& variable_op_name2eager_blob) { - DumpThreadIdsFromPlan(plan); { // NOTE(chengcheng): All runtime Global objects AddPlan Global::Get()->AddPlan(plan, variable_op_name2eager_blob); - Global::Get()->AddThreads(thread_ids_); + Global::Get()->AddPlan(plan); Global::Get()->AddPlan(plan); collective_boxing_scheduler_plan_token_ = Global::Get()->AddPlan(plan); @@ -107,19 +106,7 @@ Runtime::~Runtime() { Global::Get()->WaitUntilCntEqualZero(GetRunningActorCountKeyByJobId(pair.first)); } OF_SESSION_BARRIER(); - Global::Get()->TryDeleteThreads(thread_ids_); Global::Get()->DeletePlan(collective_boxing_scheduler_plan_token_); } -void Runtime::DumpThreadIdsFromPlan(const Plan& plan) { - const int64_t this_rank = GlobalProcessCtx::Rank(); - for (const TaskProto& task : plan.task()) { - TaskId task_id = DecodeTaskIdFromInt64(task.task_id()); - StreamId stream_id = task_id.stream_id(); - if (stream_id.rank() != this_rank) { continue; } - int64_t thrd_id = EncodeStreamIdToInt64(stream_id); - thread_ids_.insert(thrd_id); - } -} - } // namespace oneflow diff --git a/oneflow/core/job/runtime.h b/oneflow/core/job/runtime.h index 984142e8617..d784de07fb1 100644 --- a/oneflow/core/job/runtime.h +++ b/oneflow/core/job/runtime.h @@ -33,10 +33,7 @@ class Runtime final { Runtime(const Plan& plan, const HashMap& variable_op_name2eager_blob); private: - void DumpThreadIdsFromPlan(const Plan& plan); - HashMap job_id2actor_size_; - HashSet thread_ids_; boxing::collective::SchedulerPlanToken* collective_boxing_scheduler_plan_token_; }; diff --git a/oneflow/core/thread/thread.cpp b/oneflow/core/thread/thread.cpp index 6feac565cca..5361bf1c694 100644 --- a/oneflow/core/thread/thread.cpp +++ b/oneflow/core/thread/thread.cpp @@ -50,8 +50,6 @@ void Thread::AddTask(const TaskProto& task) { CHECK(id2task_.emplace(task.task_id(), task).second); } -bool Thread::Empty() const { return id2actor_ptr_.empty(); } - void Thread::PollMsgChannel() { while (true) { if (local_msg_queue_.empty()) { diff --git a/oneflow/core/thread/thread.h b/oneflow/core/thread/thread.h index 7c7c2bacca8..e51f1eb0fda 100644 --- a/oneflow/core/thread/thread.h +++ b/oneflow/core/thread/thread.h @@ -34,8 +34,6 @@ class Thread { virtual ~Thread(); void AddTask(const TaskProto&); - // NOTE(chengcheng): Indicates whether all actors in the thread have been destructed. - bool Empty() const; Channel* GetMsgChannelPtr() { return &msg_channel_; } diff --git a/oneflow/core/thread/thread_manager.cpp b/oneflow/core/thread/thread_manager.cpp index 4e65efc0160..16db1d8f151 100644 --- a/oneflow/core/thread/thread_manager.cpp +++ b/oneflow/core/thread/thread_manager.cpp @@ -17,12 +17,17 @@ limitations under the License. #include "oneflow/core/job/resource_desc.h" #include "oneflow/core/job/global_for.h" #include "oneflow/core/control/global_process_ctx.h" +#include "oneflow/core/job/global_for.h" namespace oneflow { ThreadMgr::~ThreadMgr() { - CHECK(threads_.empty()) << " Runtime Error! num = " << threads_.size() - << " threads did not destroy with graph."; + for (auto& thread_pair : threads_) { + ActorMsg msg = ActorMsg::BuildCommandMsg(-1, ActorCmd::kStopThread); + thread_pair.second->GetMsgChannelPtr()->Send(msg); + thread_pair.second.reset(); + VLOG(3) << "actor thread " << thread_pair.first << " finish"; + } } Thread* ThreadMgr::GetThrd(int64_t thrd_id) { @@ -31,46 +36,20 @@ Thread* ThreadMgr::GetThrd(int64_t thrd_id) { return iter->second.get(); } -void ThreadMgr::AddThreads(const HashSet& thread_ids) { +void ThreadMgr::AddPlan(const Plan& plan) { const int64_t this_rank = GlobalProcessCtx::Rank(); - for (int64_t thrd_id : thread_ids) { - const auto& it = threads_.find(thrd_id); - if (it != threads_.end()) { - // NOTE(chengcheng): check thread is not null. - CHECK(it->second) << " Runtime Error! Thread: " << thrd_id << " in manager must be NOT null."; - continue; - } - StreamId stream_id = DecodeStreamIdFromInt64(thrd_id); + for (const TaskProto& task : plan.task()) { + TaskId task_id = DecodeTaskIdFromInt64(task.task_id()); + StreamId stream_id = task_id.stream_id(); if (stream_id.rank() != this_rank) { continue; } + int64_t thrd_id = EncodeStreamIdToInt64(stream_id); + if (threads_.find(thrd_id) != threads_.end()) { continue; } Thread* thread = new Thread(stream_id); CHECK_NOTNULL(thread); threads_[thrd_id].reset(thread); } } -void ThreadMgr::TryDeleteThreads(const HashSet& thread_ids) { - std::unique_lock lock(mutex4del_threads_); - for (int64_t thrd_id : thread_ids) { - const auto& it = threads_.find(thrd_id); - if (it == threads_.end()) { continue; } - auto& thread = it->second; - CHECK(thread) << " actor thread " << thrd_id << " non-existent but want to delete"; - if (thread->Empty()) { - // NOTE(chengcheng): Only delete thread when it is empty. - // We need send Stop msg to exit the main loop of the thread. Here we can safely call reset - // directly, because the thread destructor will specify actor_thread_.join() to blocking - // wait for the end of the thread real execution. - ActorMsg msg = ActorMsg::BuildCommandMsg(-1, ActorCmd::kStopThread); - thread->GetMsgChannelPtr()->Send(msg); - thread.reset(); - VLOG(2) << " actor thread " << thrd_id << " finish."; - threads_.erase(it); - } else { - LOG(INFO) << " actor thread " << thrd_id << " not delete because it is not empty."; - } - } -} - void SingleThreadLoop(size_t num, std::function Callback) { FOR_RANGE(size_t, i, 0, num) { Callback(i); } } diff --git a/oneflow/core/thread/thread_manager.h b/oneflow/core/thread/thread_manager.h index b8be8b6f11b..39602dd8681 100644 --- a/oneflow/core/thread/thread_manager.h +++ b/oneflow/core/thread/thread_manager.h @@ -16,7 +16,6 @@ limitations under the License. #ifndef ONEFLOW_CORE_THREAD_THREAD_MANAGER_H_ #define ONEFLOW_CORE_THREAD_THREAD_MANAGER_H_ -#include #include "oneflow/core/common/channel.h" #include "oneflow/core/common/protobuf.h" #include "oneflow/core/common/auto_registration_factory.h" @@ -37,15 +36,13 @@ class ThreadMgr final { ThreadMgr() = default; ~ThreadMgr(); - void AddThreads(const HashSet& thread_ids); - void TryDeleteThreads(const HashSet& thread_ids); + void AddPlan(const Plan& plan); Thread* GetThrd(int64_t thrd_id); private: friend class Global; HashMap> threads_; - std::mutex mutex4del_threads_; }; void SingleThreadLoop(size_t num, std::function Callback);