diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b3a47e9d35f..d3f30a757f7 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -733,7 +733,7 @@ jobs: working-directory: ${{ env.ONEFLOW_SRC }} run: | docker run -d --rm --privileged --shm-size=8g \ - --pids-limit -1 \ + --pids-limit 1000 \ --cap-add=SYS_PTRACE --security-opt seccomp=unconfined \ --runtime=nvidia \ -v /dataset:/dataset:ro -v /model_zoo:/model_zoo:ro \ @@ -913,6 +913,14 @@ jobs: EXTRA_DOCKER_ARGS+=" --env ONEFLOW_THREAD_ENABLE_LOCAL_MESSAGE_QUEUE=1" EXTRA_DOCKER_ARGS+=" --env ONEFLOW_KERNEL_DISABLE_BLOB_ACCESS_CHECKER=1" echo "EXTRA_DOCKER_ARGS=${EXTRA_DOCKER_ARGS}" >> $GITHUB_ENV + - name: Set Thread Limit (CPU) + if: ${{ !fromJson(matrix.cache-hit) && matrix.device == 'cpu' }} + run: | + echo "THREAD_LIMIT=10000" >> $GITHUB_ENV + - name: Set Thread Limit (CUDA) + if: ${{ !fromJson(matrix.cache-hit) && matrix.device == 'cuda' }} + run: | + echo "THREAD_LIMIT=10000" >> $GITHUB_ENV - name: Enable ONEFLOW_TEST_VERBOSE if: ${{ contains(github.event.pull_request.labels.*.name, 'need-test-verbose') }} run: | @@ -932,6 +940,7 @@ jobs: working-directory: ${{ env.ONEFLOW_SRC }} run: | docker run -d --rm --privileged --shm-size=8g \ + --pids-limit ${{ env.THREAD_LIMIT }} \ --cap-add=SYS_PTRACE --security-opt seccomp=unconfined \ --runtime=nvidia \ -v /dataset:/dataset:ro -v /model_zoo:/model_zoo:ro \ diff --git a/oneflow/core/framework/multi_client_session_context.cpp b/oneflow/core/framework/multi_client_session_context.cpp index e26f32c2ed1..01559d71b83 100644 --- a/oneflow/core/framework/multi_client_session_context.cpp +++ b/oneflow/core/framework/multi_client_session_context.cpp @@ -140,6 +140,7 @@ Maybe MultiClientSessionContext::TryInit(const std::string& config_proto_s } Maybe MultiClientSessionContext::UpdateResource(const Resource& reso_proto) { + CHECK_OR_RETURN(is_inited_) << " session must be inited when updating resource."; 
CHECK_NOTNULL_OR_RETURN((Global::Get())); Global::Get()->Update(reso_proto); return Maybe::Ok(); diff --git a/oneflow/core/job/runtime.cpp b/oneflow/core/job/runtime.cpp index f15357abc47..7cfdc1735dc 100644 --- a/oneflow/core/job/runtime.cpp +++ b/oneflow/core/job/runtime.cpp @@ -58,10 +58,11 @@ bool HasNonCtrlConsumedRegstDescId(const TaskProto& task) { } // namespace Runtime::Runtime(const Plan& plan, const HashMap& variable_op_name2eager_blob) { + DumpThreadIdsFromPlan(plan); { // NOTE(chengcheng): All runtime Global objects AddPlan Global::Get()->AddPlan(plan, variable_op_name2eager_blob); - Global::Get()->AddPlan(plan); + Global::Get()->AddThreads(thread_ids_); Global::Get()->AddPlan(plan); collective_boxing_scheduler_plan_token_ = Global::Get()->AddPlan(plan); @@ -106,7 +107,27 @@ Runtime::~Runtime() { Global::Get()->WaitUntilCntEqualZero(GetRunningActorCountKeyByJobId(pair.first)); } OF_SESSION_BARRIER(); + Global::Get()->DeleteThreads(independent_thread_ids_); Global::Get()->DeletePlan(collective_boxing_scheduler_plan_token_); } +void Runtime::DumpThreadIdsFromPlan(const Plan& plan) { + const int64_t this_rank = GlobalProcessCtx::Rank(); + for (const TaskProto& task : plan.task()) { + TaskId task_id = DecodeTaskIdFromInt64(task.task_id()); + StreamId stream_id = task_id.stream_id(); + if (stream_id.rank() != this_rank) { continue; } + int64_t thrd_id = EncodeStreamIdToInt64(stream_id); + thread_ids_.insert(thrd_id); + // NOTE(chengcheng): there is no interface to query whether a task type is independent, + // so the independent task types are hard-coded here. + if (task.task_type() == TaskType::kWaitAndSendIds + || task.task_type() == TaskType::kCriticalSectionWaitTick) { + CHECK(independent_thread_ids_.insert(thrd_id).second) + << " RuntimeError! 
Thread : " << thrd_id + << " not independent with task proto: " << task.DebugString(); + } + } +} + } // namespace oneflow diff --git a/oneflow/core/job/runtime.h b/oneflow/core/job/runtime.h index d784de07fb1..c305aaa4a1f 100644 --- a/oneflow/core/job/runtime.h +++ b/oneflow/core/job/runtime.h @@ -33,7 +33,11 @@ class Runtime final { Runtime(const Plan& plan, const HashMap& variable_op_name2eager_blob); private: + void DumpThreadIdsFromPlan(const Plan& plan); + HashMap job_id2actor_size_; + HashSet thread_ids_; + HashSet independent_thread_ids_; boxing::collective::SchedulerPlanToken* collective_boxing_scheduler_plan_token_; }; diff --git a/oneflow/core/thread/thread.cpp b/oneflow/core/thread/thread.cpp index d394bf505c7..0af7609a878 100644 --- a/oneflow/core/thread/thread.cpp +++ b/oneflow/core/thread/thread.cpp @@ -60,7 +60,9 @@ void Thread::PollMsgChannel() { local_msg_queue_.pop(); if (msg.msg_type() == ActorMsgType::kCmdMsg) { if (msg.actor_cmd() == ActorCmd::kStopThread) { - CHECK(id2actor_ptr_.empty()); + CHECK(id2actor_ptr_.empty()) + << " RuntimeError! Thread: " << thrd_id_ + << " NOT empty when stop with actor num: " << id2actor_ptr_.size(); break; } else if (msg.actor_cmd() == ActorCmd::kConstructActor) { ConstructActor(msg.dst_actor_id()); diff --git a/oneflow/core/thread/thread_manager.cpp b/oneflow/core/thread/thread_manager.cpp index 16db1d8f151..cad421fdbb3 100644 --- a/oneflow/core/thread/thread_manager.cpp +++ b/oneflow/core/thread/thread_manager.cpp @@ -17,7 +17,6 @@ limitations under the License. 
#include "oneflow/core/job/resource_desc.h" #include "oneflow/core/job/global_for.h" #include "oneflow/core/control/global_process_ctx.h" -#include "oneflow/core/job/global_for.h" namespace oneflow { @@ -26,27 +25,47 @@ ThreadMgr::~ThreadMgr() { ActorMsg msg = ActorMsg::BuildCommandMsg(-1, ActorCmd::kStopThread); thread_pair.second->GetMsgChannelPtr()->Send(msg); thread_pair.second.reset(); - VLOG(3) << "actor thread " << thread_pair.first << " finish"; + VLOG(1) << " Actor thread: " << thread_pair.first << " finished when process exits."; } } Thread* ThreadMgr::GetThrd(int64_t thrd_id) { auto iter = threads_.find(thrd_id); - CHECK(iter != threads_.end()) << "thread " << thrd_id << " not found"; + CHECK(iter != threads_.end()) << " Thread: " << thrd_id << " not found"; return iter->second.get(); } -void ThreadMgr::AddPlan(const Plan& plan) { +void ThreadMgr::AddThreads(const HashSet& thread_ids) { const int64_t this_rank = GlobalProcessCtx::Rank(); - for (const TaskProto& task : plan.task()) { - TaskId task_id = DecodeTaskIdFromInt64(task.task_id()); - StreamId stream_id = task_id.stream_id(); + for (int64_t thrd_id : thread_ids) { + const auto& it = threads_.find(thrd_id); + if (it != threads_.end()) { + // NOTE(chengcheng): check thread is not null. + CHECK(it->second) << " RuntimeError! 
Thread: " << thrd_id << " in manager must be NOT null."; + VLOG(1) << " Actor thread: " << thrd_id << " reused."; + continue; + } + StreamId stream_id = DecodeStreamIdFromInt64(thrd_id); if (stream_id.rank() != this_rank) { continue; } - int64_t thrd_id = EncodeStreamIdToInt64(stream_id); - if (threads_.find(thrd_id) != threads_.end()) { continue; } Thread* thread = new Thread(stream_id); CHECK_NOTNULL(thread); threads_[thrd_id].reset(thread); + VLOG(1) << " Actor thread: " << thrd_id << " created."; + } +} + +void ThreadMgr::DeleteThreads(const HashSet& thread_ids) { + std::unique_lock lock(mutex4del_threads_); + for (int64_t thrd_id : thread_ids) { + const auto& it = threads_.find(thrd_id); + CHECK((it != threads_.end()) && (it->second)) + << " RuntimeError! Actor thread: " << thrd_id << " non-existent but want to delete"; + auto& thread = it->second; + ActorMsg msg = ActorMsg::BuildCommandMsg(-1, ActorCmd::kStopThread); + thread->GetMsgChannelPtr()->Send(msg); + thread.reset(); + VLOG(1) << " Actor thread: " << thrd_id << " finished when the graph is destructed."; + threads_.erase(it); } } diff --git a/oneflow/core/thread/thread_manager.h b/oneflow/core/thread/thread_manager.h index 39602dd8681..7d562a8b756 100644 --- a/oneflow/core/thread/thread_manager.h +++ b/oneflow/core/thread/thread_manager.h @@ -16,6 +16,7 @@ limitations under the License. 
#ifndef ONEFLOW_CORE_THREAD_THREAD_MANAGER_H_ #define ONEFLOW_CORE_THREAD_THREAD_MANAGER_H_ +#include #include "oneflow/core/common/channel.h" #include "oneflow/core/common/protobuf.h" #include "oneflow/core/common/auto_registration_factory.h" @@ -36,13 +37,15 @@ class ThreadMgr final { ThreadMgr() = default; ~ThreadMgr(); - void AddPlan(const Plan& plan); + void AddThreads(const HashSet& thread_ids); + void DeleteThreads(const HashSet& thread_ids); Thread* GetThrd(int64_t thrd_id); private: friend class Global; HashMap> threads_; + std::mutex mutex4del_threads_; }; void SingleThreadLoop(size_t num, std::function Callback); diff --git a/python/oneflow/framework/env_util.py b/python/oneflow/framework/env_util.py index 07c096c9a7e..d216dfe88e3 100644 --- a/python/oneflow/framework/env_util.py +++ b/python/oneflow/framework/env_util.py @@ -40,7 +40,7 @@ def api_all_device_placement(device_type: str) -> oneflow._oneflow_internal.plac # Runs on 4 ranks import oneflow as flow - + p = flow.env.all_device_placement("cuda") # oneflow.placement(type="cuda", ranks=[0, 1, 2, 3]) p = flow.env.all_device_placement("cpu") # oneflow.placement(type="cpu", ranks=[0, 1, 2, 3])