Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Runtime][Pipeline Executor] multiple threads management and the data forwarding notification mechanism. #10234

Merged
merged 10 commits into from
Feb 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions python/tvm/contrib/pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,22 +117,18 @@ def __init__(self, module):
# Get the packed functions from the pipeline executor.
self._get_params_group_pipeline_map = self.module["get_params_group_pipeline_map"]
self._run = self.module["run"]
self._stop = self.module["stop"]
self._set_param = self.module["set_param"]
self._set_input = self.module["set_input"]
self._get_input = self.module["get_input"]
self._get_output = self.module["get_output"]
self._get_num_outputs = self.module["get_num_outputs"]
self._get_input_pipeline_map = self.module["get_input_pipeline_map"]
self._get_pipe_execute_count = self.module["get_execute_count"]

def run(self, sync=False):
    """Run the pipeline executor.

    Parameters
    ----------
    sync : bool
        Forwarded to the underlying "run" packed function. On the C++ side
        this maps to PipelineExecutor::Run(bool serialized_mode), i.e. True
        runs the pipeline in synchronized/serialized mode.
    """
    self._run(sync)

def stop(self):
"""Stop the pipeline executor."""
self._stop()

def get_input_pipeline_map(self, name):
"""Using the "name" to get the corresponding subgraph index and also get the "input name"
of the corresponding subgraph interface.
Expand Down Expand Up @@ -213,6 +209,16 @@ def get_output(self):
"""
return self._get_output()

@property
def num_executing_pipeline(self):
    """The count of running pipelines (pipeline execution count).

    Retrieved through the module's "get_execute_count" packed function.

    Returns
    -------
    count : int
        The count of running pipelines.
    """
    return self._get_pipe_execute_count()

@property
def num_outputs(self):
"""Get the number of outputs.
Expand Down
16 changes: 7 additions & 9 deletions src/runtime/pipeline/pipeline_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name,
[sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->GetOutput(); });
} else if (name == "run") {
return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Run(args[0]); });
} else if (name == "stop") {
return PackedFunc([sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { this->Stop(); });
} else if (name == "get_execute_count") {
return PackedFunc(
[sptr_to_self, this](TVMArgs args, TVMRetValue* rv) { *rv = this->GetExecutionCount(); });
} else {
LOG(FATAL) << "Unknown packed function: " << name;
return PackedFunc();
Expand All @@ -96,7 +97,6 @@ PackedFunc PipelineExecutor::GetFunction(const std::string& name,
void PipelineExecutor::SetInput(std::string input_name, DLTensor* data_in) {
std::pair<int, int> indexs = this->GetInputIndex(input_name);
if (indexs.first < 0 || indexs.first >= static_cast<int>(runtimes_.size())) {
this->Stop();
LOG(FATAL) << "input name " << input_name << " not found.";
}
runtimes_[indexs.first]->SetInput(indexs.second, data_in);
Expand All @@ -109,7 +109,6 @@ void PipelineExecutor::SetInput(std::string input_name, DLTensor* data_in) {
NDArray PipelineExecutor::GetInput(std::string input_name) {
std::pair<int, int> indexs = this->GetInputIndex(input_name);
if (indexs.first < 0 || indexs.first >= static_cast<int>(runtimes_.size())) {
this->Stop();
LOG(FATAL) << "input name " << input_name << " not found.";
}
return runtimes_[indexs.first]->GetInput(indexs.second);
Expand Down Expand Up @@ -153,11 +152,6 @@ void PipelineExecutor::Run(bool serialized_mode) {
 * \brief Return a list of the global output data.
 * \return The output NDArrays collected from the pipeline scheduler.
 */
Array<NDArray> PipelineExecutor::GetOutput(void) { return pipeline_scheduler_.PipelineGetOutput(); }
/*!
* \brief Stop the pipeline executor.
*/
void PipelineExecutor::Stop() { pipeline_scheduler_.PipelineStop(); }

/*!
* \brief Use the mod_config information to create a graph runtime list.
* \param mod_config The config information that generates by the export library function call.
Expand Down Expand Up @@ -242,6 +236,10 @@ std::pair<int, int> PipelineExecutor::GetInputIndex(const std::string& name) {
auto gruntime = runtimes_[index.first];
return std::make_pair(index.first, gruntime->GetInputIndex(index.second));
}
/*!
 * \brief Get the count of running pipeline executions.
 * \return The execution count reported by the last backend runtime in the
 *  pipeline, or 0 when no runtimes have been created yet. The emptiness
 *  guard avoids undefined behavior from calling back() on an empty vector
 *  (e.g. if this packed function is invoked before initialization).
 */
int PipelineExecutor::GetExecutionCount() {
  return runtimes_.empty() ? 0 : runtimes_.back()->GetExecutionCount();
}
/*!
* \brief Initialize the pipeline executor with a list of modules to be pipelined
* and config in JSON format.
Expand Down
8 changes: 4 additions & 4 deletions src/runtime/pipeline/pipeline_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ class TVM_DLL PipelineExecutor : public ModuleNode {
* \return Return input data.
*/
NDArray GetInput(std::string input_name);
/*!
* \brief Getting the count of running pipeline.
*/
int GetExecutionCount();
/*!
* \brief Use the parameters group name to get the specific backend runtime then use
* the param_key_name to set param data for the said backend runtime.
Expand All @@ -114,10 +118,6 @@ class TVM_DLL PipelineExecutor : public ModuleNode {
* \param serialized_mode Whether run the pipeline executor in serialized mode.
*/
void Run(bool serialized_mode);
/*!
* \brief Stop the pipeline executor.
*/
void Stop();
/*!
* \brief Get a list output data.
* \return A list of output data.
Expand Down
18 changes: 8 additions & 10 deletions src/runtime/pipeline/pipeline_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ std::vector<std::shared_ptr<BackendRuntime>> PipelineScheduler::PipelineInit(
const std::vector<Module>& modules, const ConfigPipelineExecution& pipeline_config) {
std::vector<std::shared_ptr<BackendRuntime>> runtimes;
graph_modules_ = modules;
// Creating a list of runtimes.
for (size_t i = 0; i < graph_modules_.size(); i++) {
auto runItem = std::make_shared<BackendRuntime>(graph_modules_[i], i);
runtimes.push_back(runItem);
auto run_item = std::make_shared<BackendRuntime>(graph_modules_[i], i);
runtimes.push_back(run_item);
}
// Create a list of NDArrays in which to store the output data.
auto global_output_map = pipeline_config.GetGlobalConfigOutputBindings();
Expand All @@ -46,6 +47,10 @@ std::vector<std::shared_ptr<BackendRuntime>> PipelineScheduler::PipelineInit(
NDArray output = runtimes[output_pair.first]->CreateFromOutput(output_pair.second);
output_arrays_.push_back(output);
}
// Initialize the pipeline for each runtime and start its worker thread.
for (auto runtime : runtimes) {
runtime->InitializePipeline(pipeline_config, &runtimes);
}
return runtimes;
}
/*!
Expand Down Expand Up @@ -101,18 +106,11 @@ void PipelineScheduler::PipelineRunSequential(
void PipelineScheduler::PipelineRun(const std::vector<std::shared_ptr<BackendRuntime>>& runtimes,
ConfigPipelineExecution pipeline_config, bool sequential_mode) {
if (!sequential_mode) {
// TODO(huajsj) remove this check after all of pipeline features in.
LOG(FATAL) << "Currently only supports sequential mode.";
runtimes.front()->RunPipeline();
} else {
PipelineRunSequential(runtimes, pipeline_config);
}
}
/*!
 * \brief Stop the pipeline execution.
*/
void PipelineScheduler::PipelineStop() {
// TODO(huajsj) Add stop logic.
}
/*!
* \brief Get a list of output.
*/
Expand Down
4 changes: 0 additions & 4 deletions src/runtime/pipeline/pipeline_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ class PipelineScheduler {
*/
void PipelineRunSequential(const std::vector<std::shared_ptr<BackendRuntime>>& runtimes,
ConfigPipelineExecution pipeline_config);
/*!
 * \brief Stop the pipeline execution.
*/
void PipelineStop();
/*!
* \brief Get a list of outputs.
*/
Expand Down
Loading