From 213cfa1d3009317803ca4c7cd6974213a8d1d2d1 Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Thu, 26 May 2022 19:27:48 +0800 Subject: [PATCH 01/22] Add a fast topological traversal --- oneflow/core/graph/graph.h | 50 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/oneflow/core/graph/graph.h b/oneflow/core/graph/graph.h index a72f728c1d8..68bfa06585e 100644 --- a/oneflow/core/graph/graph.h +++ b/oneflow/core/graph/graph.h @@ -35,8 +35,10 @@ class Graph { void ForEachNode(std::function NodeHandler) const; Maybe MaybeForEachNode(std::function(NodeType*)> NodeHandler) const; void TopoForEachNode(std::function NodeHandler) const; + void TopoForEachNodeFast(std::function NodeHandler) const; Maybe TopoForEachNodeWithErrorCaptured( std::function(NodeType*)> NodeHandler) const; + Maybe TopoForEachNodeFastMaybe(std::function(NodeType*)> NodeHandler) const; void ReverseTopoForEachNode(std::function NodeHandler) const; void ForEachEdge(std::function EdgeHandler) const; @@ -65,6 +67,12 @@ class Graph { const std::function&)>& ForEachOutNode, const std::function(NodeType*)>& Handler) const; + Maybe TopoForEachNodeFastMaybe( + const std::list& starts, + const std::function&)>& ForEachInNode, + const std::function&)>& ForEachOutNode, + const std::function(NodeType*)>& Handler) const; + void DfsTopoForEachNode( const std::list& starts, const std::function&)>& ForEachInNode, @@ -217,6 +225,16 @@ void Graph::TopoForEachNode(std::function N NodeHandler); } +template +void Graph::TopoForEachNodeFast( + std::function NodeHandler) const { + TopoForEachNodeFastMaybe(source_nodes(), &NodeType::ForEachNodeOnInEdge, + &NodeType::ForEachNodeOnOutEdge, [&](NodeType* node) { + NodeHandler(node); + return Maybe::Ok(); + }); +} + template Maybe Graph::TopoForEachNodeWithErrorCaptured( std::function(NodeType*)> NodeHandler) const { @@ -224,6 +242,13 @@ Maybe Graph::TopoForEachNodeWithErrorCaptured( &NodeType::ForEachNodeOnOutEdge, NodeHandler); } +template +Maybe Graph::TopoForEachNodeFastMaybe( + std::function(NodeType*)> NodeHandler) const { + return TopoForEachNodeFastMaybe(source_nodes(), &NodeType::ForEachNodeOnInEdge, + &NodeType::ForEachNodeOnOutEdge, NodeHandler); +} + template void Graph::SortedTopoForEachNode( std::function LessThan, @@ -537,6 +562,31 @@ Maybe Graph::TopoForEachNodeWithErrorCaptured( return Maybe::Ok(); } +template +Maybe Graph::TopoForEachNodeFastMaybe( + const std::list& starts, + const std::function&)>& ForEachInNode, + const std::function&)>& ForEachOutNode, + const std::function(NodeType*)>& Handler) const { + HashMap counter_in; + std::queue queue; + ForEachNode([&](NodeType* node) { + int32_t count = 0; + ForEachInNode(node, [&](NodeType*) { count++; }); + counter_in[node] = count; + if (count == 0) { queue.push(node); } + }); + while (!queue.empty()) { + NodeType* cur_node = queue.front(); + queue.pop(); + JUST(Handler(cur_node)); + ForEachOutNode(cur_node, [&](NodeType* out) { + if (--counter_in[out] == 0) { queue.push(out); } + }); + } + return Maybe::Ok(); +} + template void Graph::DfsTopoForEachNodeSortByDistanceToSink( const std::list& starts, From ea6370a6665fd0800080ff98c00f7b5870a251be Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Thu, 26 May 2022 19:43:02 +0800 Subject: [PATCH 02/22] Add an initial implementation of straighen nodes --- oneflow/core/graph/straighten_nodes.cpp | 139 ++++++++++++++++++++++++ oneflow/core/graph/straighten_nodes.h | 59 ++++++++++ 2 files changed, 198 insertions(+) create mode 100644 oneflow/core/graph/straighten_nodes.cpp create mode 100644 oneflow/core/graph/straighten_nodes.h diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp new file mode 100644 index 00000000000..63e840ddb08 --- /dev/null +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -0,0 +1,139 @@ +/* +Copyright 2020 The OneFlow Authors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#include "oneflow/core/job_rewriter/straighten_nodes.h" +#include "oneflow/core/graph/op_graph.h" +#include "oneflow/core/auto_parallel/sbp_constructor.h" +#include "oneflow/core/graph/task_node.h" +#include "oneflow/core/job/job_desc.h" +#include "oneflow/core/common/protobuf.h" +#include "oneflow/core/rpc/include/global_process_ctx.h" + +namespace oneflow { + +namespace { + +bool IsTransferNode(int32_t task_type) { + return task_type == 12 || task_type == 13 || (48 <= task_type && task_type <= 64); +} + +} // anonymous namespace + +// Drop down the maximum layer with the minimum layer form consumer +void TopoStruct::DropTributaryLayer(int32_t upper_bound) { + if (upper_bound < TributaryLayer || TributaryLayer < 0) { TributaryLayer = upper_bound; } +} + +// Should initialize the counter to be the number of out edges +// Compute maximum layer for tributaries +void TopoStruct::SpreadTributaryLayer(HashMap& task_node2topo_struct) { + if (counter || MinLayer <= 0) { return; } + int32_t producer_max_lay = 0; + if (IfMainstem) { + producer_max_lay = MinLayer - 1; + } else { + // On a tributary, the operator could be run later. + producer_max_lay = TributaryLayer; + // producer_max_lay = TributaryLayer - 1; + } + node->ForEachNodeOnInEdge([&](TaskNode* in) { + auto& topo_struct_in = task_node2topo_struct[in]; + topo_struct_in.DropTributaryLayer(producer_max_lay); + if (--topo_struct_in.counter == 0) { + topo_struct_in.SpreadTributaryLayer(task_node2topo_struct); + } + }); + // Reduce counter to -1 to avoid visitting again + counter--; +} + +// Judge if this node is on the mainstem +// If so, judge it for its producer/upstream nodes +void TopoStruct::SpreadMainstem(HashMap& task_node2topo_struct) { + // Skip it if this node is already judged. + if (IfMainstem) { return; } + CHECK(MinLayer >= 0) << "TopoStruct not initialized!"; + IfMainstem = true; + // If I am in the mainstem, then all the children with (MinLayer >= my layer id - 1) would be + // considered as in the mainstem + node->ForEachNodeOnInEdge([&](TaskNode* in) { + auto& topo_struct_in = task_node2topo_struct[in]; + if (topo_struct_in.MinLayer == MinLayer - 1) { + topo_struct_in.SpreadTributaryLayer(task_node2topo_struct); + } + }); +} + +// The minimum computation distance from the beginning of this op to the next transfer +int32_t TopoStruct::GetMinDistance2Transfer(HashMap& task_node2topo_struct) { + if (MinDistance2Transfer >= 0) { return MinDistance2Transfer; } + // if this node is a transfer node + if (IsTransferNode(node->GetTaskType())) { + MinDistance2Transfer = 0; + return MinDistance2Transfer; + } + MinDistance2Transfer = 1000000; + node->ForEachNodeOnOutEdge([&](TaskNode* out) { + MinDistance2Transfer = + std::min(MinDistance2Transfer, + task_node2topo_struct[out].GetMinDistance2Transfer(task_node2topo_struct)); + }); + return ++MinDistance2Transfer; +} + +// deciding parameter +int32_t TopoStruct::GetDecidingParameter(int32_t i) const { + int32_t sign = 1; + if (i >= 3) { + i -= 3; + sign = -1; + } + switch (i) { + case 0: return sign * TributaryLayer; + case 1: return sign * MinDistance2Transfer; + case 2: return sign * MinLayer; + } + return 0; +} + +// Find the mianstem of the task graph, then reduce the wait time for tributaries +void FindMainstem(HashMap& task_node2topo_struct) { + // Find the maximum layer number + int32_t max_MinLayer = -1; + for (const auto& pair : task_node2topo_struct) { + if (max_MinLayer < pair.second.MinLayer) { max_MinLayer = pair.second.MinLayer; } + } + // All the nodes with MinLayer>=mainstem_end_id would be considered as mainstem nodes + int32_t mainstem_end_id = max_MinLayer - 4; + for (auto& pair : task_node2topo_struct) { + auto& topo_struct = pair.second; + // Initialize the counter and Tributary Layer + topo_struct.counter = pair.first->out_edges().size(); + topo_struct.TributaryLayer = max_MinLayer; + // Find out all the nodes on the mainstem. + if (topo_struct.MinLayer >= mainstem_end_id) { + topo_struct.SpreadMainstem(task_node2topo_struct); + } + } + + for (auto& pair : task_node2topo_struct) { + // Compute maximum layer for tributaries + pair.second.SpreadTributaryLayer(task_node2topo_struct); + // Set the MinDistance2Transfer for each topological structure + pair.second.GetMinDistance2Transfer(task_node2topo_struct); + } +} + +} // namespace oneflow diff --git a/oneflow/core/graph/straighten_nodes.h b/oneflow/core/graph/straighten_nodes.h new file mode 100644 index 00000000000..5980a8709d7 --- /dev/null +++ b/oneflow/core/graph/straighten_nodes.h @@ -0,0 +1,59 @@ +/* +Copyright 2020 The OneFlow Authors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef STRAIGHTEN_NODES_H_ +#define STRAIGHTEN_NODES_H_ + +#include "oneflow/core/graph/op_graph.h" +#include "oneflow/core/graph/task_node.h" + +namespace oneflow { + +class OpGraph; +class Job; + +class TopoStruct { + public: + TaskNode* node; + int32_t MinLayer = -1; + int32_t TributaryLayer = -1; + bool IfMainstem = false; + int32_t counter = 0; + int32_t MinDistance2Transfer = -1; + TopoStruct* next_same_node = nullptr; + // We can have some other nodes in it for example + // SbpNode* node; + // SbpEdge* node; + // Or we can omit all the pointers and leave all the useful parameters. + + // Drop down the tributary layer + void DropTributaryLayer(int32_t upper_bound); + + void SpreadTributaryLayer(HashMap& task_node2topo_struct); + + void SpreadMainstem(HashMap& task_node2topo_struct); + + // The minimum computation distance from the beginning of this op to the next transfer + int32_t GetMinDistance2Transfer(HashMap& task_node2topo_struct); + + // deciding parameter + int32_t GetDecidingParameter(int32_t i) const; +}; + +void FindMainstem(HashMap& task_node2topo_struct); + +} // namespace oneflow + +#endif // STRAIGHTEN_NODES_H_ From d99ebf42021607566aeae51472860797042545ab Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Thu, 26 May 2022 21:44:35 +0800 Subject: [PATCH 03/22] Add the straighen nodes algorithm --- oneflow/core/graph/straighten_nodes.cpp | 343 +++++++++++++++++++++++- oneflow/core/graph/straighten_nodes.h | 5 +- oneflow/core/graph/task_graph.cpp | 7 +- 3 files changed, 350 insertions(+), 5 deletions(-) diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp index 63e840ddb08..a931a7f24b1 100644 --- a/oneflow/core/graph/straighten_nodes.cpp +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -13,9 +13,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include "oneflow/core/job_rewriter/straighten_nodes.h" +#include "oneflow/core/graph/straighten_nodes.h" #include "oneflow/core/graph/op_graph.h" -#include "oneflow/core/auto_parallel/sbp_constructor.h" #include "oneflow/core/graph/task_node.h" #include "oneflow/core/job/job_desc.h" #include "oneflow/core/common/protobuf.h" @@ -136,4 +135,344 @@ void FindMainstem(HashMap& task_node2topo_struct) { } } +void StraightenNodes(TaskGraph* task_graph, std::vector& ordered_task_nodes) { + // The function for settle the order in the graph + int64_t order_in_graph = 0; + HashMap task_type_map; + + // The same order in the set + // It is only run in the following situation because there are too many implicit conditions. + auto should_run_simultaneously = [](TopoStruct* a, TopoStruct* b) -> bool { + // Normal node would have the same name + if (a->node->GetTaskType() == 1) { return a->node->VisualStr() == b->node->VisualStr(); } + // Otherwise they must have the same parameters with different machine ids and the closest node + // id. We only use Min Layer here, since Tributary Layer might be different due to asymmetry of + // graph. + return true; + }; + + // move the head from source to target + auto move_front_between_maps = [](std::map& source, + std::map& target) { + if (!source.empty()) { + const auto& front = source.begin(); + target[front->first] = front->second; + source.erase(front); + } + }; + + // Generate topological data structure for each task node + HashMap task_node2topo_struct; + // Determine the same nodes which should run simultaneously + HashMap>> + task_type2machine_id2node_id2topo_structs; + std::map min_node_id2topo_struct; + int32_t previous_MinLayer = 0; + task_graph->TopoForEachNodeFast([&](TaskNode* node) { + auto& topo_struct = task_node2topo_struct[node]; + topo_struct.node = node; + if (node->in_edges().empty()) { + topo_struct.MinLayer = 0; + } else { + int32_t max_min_layer = 0; + node->ForEachNodeOnInEdge([&](TaskNode* in) { + max_min_layer = std::max(max_min_layer, task_node2topo_struct[in].MinLayer); + }); + topo_struct.MinLayer = max_min_layer + 1; + // Deal with all the nodes with MinLayer=previous_MinLayer + std::cout << "Min Layer: " << topo_struct.MinLayer << std::endl; + if (max_min_layer >= previous_MinLayer) { + // Using "7" to represent "and" + // a7b means a pair (a, b) + for (auto& task_type7machine_id2node_id2topo_structs : + task_type2machine_id2node_id2topo_structs) { + auto& machine_id2node_id2topo_structs = task_type7machine_id2node_id2topo_structs.second; + // Initializing the smallest node id for each machine + for (auto& machine_id7node_id2topo_structs : machine_id2node_id2topo_structs) { + move_front_between_maps(machine_id7node_id2topo_structs.second, + min_node_id2topo_struct); + } + // + while (!min_node_id2topo_struct.empty()) { + auto* topo_struct_min_node_id = min_node_id2topo_struct.begin()->second; + // Store the same nodes in different machine + std::vector same_nodes; + for (auto& min_node_id7topo_struct : min_node_id2topo_struct) { + auto* curr_topo_struct = min_node_id7topo_struct.second; + // Find out all the same nodes + if (should_run_simultaneously(topo_struct_min_node_id, curr_topo_struct)) { + same_nodes.push_back(curr_topo_struct); + } + } + // Cyclize them + for (int32_t i = 1; i < same_nodes.size(); i++) { + same_nodes[i - 1]->next_same_node = same_nodes[i]; + } + (*same_nodes.rbegin())->next_same_node = same_nodes[0]; + // Delete them and add new candidates + for (auto* same_node_topo_struct : same_nodes) { + // Erase them from min_node_id2topo_struct + min_node_id2topo_struct.erase(same_node_topo_struct->node->node_id()); + // Add new candidate + move_front_between_maps( + machine_id2node_id2topo_structs[same_node_topo_struct->node->machine_id()], + min_node_id2topo_struct); + } + } + } + // Renew the previous MinLayer at the end + previous_MinLayer = topo_struct.MinLayer; + } + } + // Put the topo structure into the map, waiting for determine the same nodes + task_type2machine_id2node_id2topo_structs[node->GetTaskType()][node->machine_id()] + [node->node_id()] = &topo_struct; + }); + + // Generate other parameters in the topological data structure + FindMainstem(task_node2topo_struct); + + // test debug + if (GlobalProcessCtx::Rank() == 0) { + std::cout << "Straightening order type: " << ParseIntegerFromEnv("Parameter0", 0) << ", " + << ParseIntegerFromEnv("Parameter1", 1) << ", " + << ParseIntegerFromEnv("Parameter2", 2) << std::endl; + } + // Order in the waiting sets + // Decide which node should run first + struct comp { + bool operator()(const TopoStruct* a, const TopoStruct* b) const { + static std::vector decide_parameters({ParseIntegerFromEnv("Parameter0", 0), + ParseIntegerFromEnv("Parameter1", 1), + ParseIntegerFromEnv("Parameter2", 2)}); + for (int32_t decide_parameter : decide_parameters) { + int32_t decide_parameter_a = a->GetDecidingParameter(decide_parameter); + int32_t decide_parameter_b = b->GetDecidingParameter(decide_parameter); + if (decide_parameter_a != decide_parameter_b) { + return decide_parameter_a < decide_parameter_b; + } + } + return a->node->node_id() < b->node->node_id(); + // auto comp_str = a->node->VisualStr().compare(b->node->VisualStr()); + // if (comp_str == 0) { + // // the order does not matter right now, but we need a strict order + // return a < b; + // } else { + // return comp_str < 0; + // }; + + // if (a->TributaryLayer == b->TributaryLayer) { + // if (a->MinDistance2Transfer == b->MinDistance2Transfer) { + // if (a->MinLayer == b->MinLayer) { + // // Put the task with the same names together + // auto comp_str = a->node->VisualStr().compare(b->node->VisualStr()); + // if (comp_str == 0) { + // // the order does not matter right now, but we need a strict order + // return a < b; + // } else { + // return comp_str < 0; + // } + // } else { + // // the node that shows up first has higher priority + // return a->MinLayer < b->MinLayer; + // } + // } else { + // return a->MinDistance2Transfer < b->MinDistance2Transfer; + // } + // } else { + // return a->TributaryLayer < b->TributaryLayer; + // } + } + }; + + // Classify sets for the task nodes + // std::set waiting_transfer; // 0 + // std::set waiting_computation; // 1 + // std::set run_asap; // 2, run as soon as possible + // std::set run_alap; // 3, run as late as possible + std::vector> waiting_lists(4); + + std::vector remain_task_nums(4, 0); + + // Classifier for the set according to the task type + auto set_classifier = [&](TaskNode* node) { + // Check task.pb.h for detail + int32_t task_type = node->GetTaskType(); + if (task_type == 1) { return 1; } + if (task_type == 12 || task_type == 13 || (48 <= task_type && task_type <= 64)) { return 0; } + if (task_type == 47) { return 2; } + return 3; + }; + + auto SetOrderInGraph = [&](TaskNode* task_node) { + if (GlobalProcessCtx::Rank() == 0) { + auto& topo_struct = task_node2topo_struct[task_node]; + std::cout << "Execution order: " << order_in_graph << ": " << task_node->VisualStr() + << ", node id: " << task_node->node_id() << std::endl; + std::cout << ": task type: " << task_node->GetTaskType() << ", " + << (task_node->parallel_ctx() == 0) << ", MinLayer: " << topo_struct.MinLayer + << ", TributaryLayer: " << topo_struct.TributaryLayer + << ", MinDist2Transfer: " << topo_struct.MinDistance2Transfer + << ", machine id: " << task_node->machine_id() + << ", thread id: " << task_node->thrd_id() << std::endl; + + if (task_type_map.find(task_node->GetTaskType()) == task_type_map.end()) { + task_type_map[task_node->GetTaskType()] = 0; + } + task_type_map[task_node->GetTaskType()]++; + } + task_node->set_order_in_graph(order_in_graph); + ordered_task_nodes.emplace_back(task_node); + ++order_in_graph; + }; + + // wait in the list + auto wait = [&](TaskNode* node) { + TopoStruct* first_topo_struct = &task_node2topo_struct[node]; + // Check if all the same nodes are ready simultaneously + TopoStruct* curr_topo_struct = first_topo_struct->next_same_node; + while (curr_topo_struct && curr_topo_struct != first_topo_struct) { + if (curr_topo_struct->counter) { return; } + curr_topo_struct = curr_topo_struct->next_same_node; + } + // Add all the same nodes at the same time + curr_topo_struct = first_topo_struct; + auto& waiting_list = waiting_lists[set_classifier(node)]; + while (true) { + waiting_list.insert(curr_topo_struct); + // Reduce counter then this node will never be added again + // Though inserting into a map twice does not matter because of the same keys + curr_topo_struct->counter--; + curr_topo_struct = curr_topo_struct->next_same_node; + if ((!curr_topo_struct) || (curr_topo_struct == first_topo_struct)) { break; } + } + }; + + std::map> task_type2node_id2machine_id; + // initialization + task_graph->ForEachNode([&](TaskNode* node) { + int32_t count = node->in_edges().size(); + task_node2topo_struct[node].counter = count; + if (count == 0) { wait(node); } + remain_task_nums[set_classifier(node)]++; + task_type2node_id2machine_id[node->GetTaskType()][node->node_id()] = node->machine_id(); + }); + + for (auto& task_type_group : task_type2node_id2machine_id) { + std::cout << "task type: " << task_type_group.first << std::endl; + int32_t pre_machine_id = -1; + for (auto& pair : task_type_group.second) { + std::cout << "node id: " << pair.first << ", machine id: " << pair.second << ", ? " + << (pair.second == 0 || pair.second > pre_machine_id) << std::endl; + pre_machine_id = pair.second; + } + } + + if (GlobalProcessCtx::Rank() == 0) { + std::cout << "Total task nums:" << std::endl; + std::cout << "Transfers: " << remain_task_nums[0] << ", Computation: " << remain_task_nums[1] + << ", Run Asap: " << remain_task_nums[2] << ", Run Alap: " << remain_task_nums[3] + << std::endl; + } + + // Finish execution + auto finish_execution = [&](TaskNode* node) { + node->ForEachNodeOnOutEdge([&](TaskNode* out) { + if (--(task_node2topo_struct[out].counter) == 0) { wait(out); } + }); + }; + + // Move the first node of the waiting list to the execution list + auto move2execution_list = [&](std::set& waiting_list, + std::vector& execution_list) { + TaskNode* first_node = (*waiting_list.begin())->node; + int32_t execution_num = 0; + TopoStruct* first_topo_struct = &task_node2topo_struct[first_node]; + // Find all the same nodes in different machine + // They should be run simultaneously + TopoStruct* curr_topo_struct = first_topo_struct; + while (true) { + execution_num++; + execution_list.push_back(curr_topo_struct->node); + waiting_list.erase(curr_topo_struct); + // move and maybe leave + curr_topo_struct = curr_topo_struct->next_same_node; + if ((!curr_topo_struct) || (curr_topo_struct == first_topo_struct)) { break; } + } + CHECK_GT(execution_num, 0) << "Error, no task nodes are moved to the execution list"; + }; + + // Execute the first n nodes in the waiting list + auto execute = [&](int32_t list_classifier, int32_t n, bool if_reverse = false) { + // n>=1 + if (n <= 0) { return; } + if (GlobalProcessCtx::Rank() == 0) { + std::cout << "Total task nums:" << std::endl; + std::cout << "Transfers: " << waiting_lists[0].size() + << ", Computation: " << waiting_lists[1].size() + << ", Run Asap: " << waiting_lists[2].size() + << ", Run Alap: " << waiting_lists[3].size() << std::endl; + } + auto& waiting_list = waiting_lists[list_classifier]; + std::vector execution_list; + int32_t count = 0; + // Move to the execution list + while (!waiting_list.empty()) { + move2execution_list(waiting_list, execution_list); + count++; + if (count >= n) { break; } + } + remain_task_nums[list_classifier] -= execution_list.size(); + // Set the order and then remove from the execution list + for (auto* node : execution_list) { + SetOrderInGraph(node); + finish_execution(node); + } + }; + + // int32_t max_overlap_computation_num = ParseIntegerFromEnv("MAX_OVERLAP_NUM", 40); + + // straightening + while (true) { + if (waiting_lists[2].empty()) { + if (waiting_lists[0].empty()) { + if (waiting_lists[1].empty()) { + if (waiting_lists[3].empty()) { + if (GlobalProcessCtx::Rank() == 0) { std::cout << "Execution done" << std::endl; } + break; + } else { + execute(3, waiting_lists[3].size()); + } + } else { + execute(1, 1); + } + } else { + int32_t computation_num = + std::min(int32_t(waiting_lists[1].size() / (waiting_lists[0].size())), + remain_task_nums[1] / remain_task_nums[0]); + // Holding the transfer + std::vector transfer_execution_list; + move2execution_list(waiting_lists[0], transfer_execution_list); + remain_task_nums[0] -= transfer_execution_list.size(); + for (auto* transfer_node : transfer_execution_list) { SetOrderInGraph(transfer_node); } + // Overlap transfer with computation + execute(1, computation_num); + + // Release the transfer + for (auto* transfer_node : transfer_execution_list) { finish_execution(transfer_node); } + } + } else { + execute(2, waiting_lists[2].size()); + } + } + + // test debug + if (GlobalProcessCtx::Rank() == 0) { + std::cout << "Print all task type: " << std::endl; + for (auto& pair : task_type_map) { + std::cout << "task type: " << pair.first << ", " << pair.second << std::endl; + } + } +} + } // namespace oneflow diff --git a/oneflow/core/graph/straighten_nodes.h b/oneflow/core/graph/straighten_nodes.h index 5980a8709d7..de95615270c 100644 --- a/oneflow/core/graph/straighten_nodes.h +++ b/oneflow/core/graph/straighten_nodes.h @@ -16,8 +16,7 @@ limitations under the License. #ifndef STRAIGHTEN_NODES_H_ #define STRAIGHTEN_NODES_H_ -#include "oneflow/core/graph/op_graph.h" -#include "oneflow/core/graph/task_node.h" +#include "oneflow/core/graph/task_graph.h" namespace oneflow { @@ -54,6 +53,8 @@ class TopoStruct { void FindMainstem(HashMap& task_node2topo_struct); +void StraightenNodes(TaskGraph* task_graph, std::vector& ordered_task_nodes); + } // namespace oneflow #endif // STRAIGHTEN_NODES_H_ diff --git a/oneflow/core/graph/task_graph.cpp b/oneflow/core/graph/task_graph.cpp index 5fd69c40274..0151ec0048e 100644 --- a/oneflow/core/graph/task_graph.cpp +++ b/oneflow/core/graph/task_graph.cpp @@ -29,6 +29,7 @@ limitations under the License. #include "oneflow/core/graph/boxing/hierarchical_sub_task_graph_builder_impl.h" #include "oneflow/core/graph/task_stream_index_manager.h" #include "oneflow/core/ep/include/primitive/memcpy.h" +#include "oneflow/core/graph/straighten_nodes.h" namespace oneflow { @@ -450,7 +451,11 @@ TaskGraph::TaskGraph() { } }); - SetOrderInGraphForEachNode(); + if (ParseBooleanFromEnv("ONEFLOW_RANDOM_STRAIGHTEN_NODES", false)) { + SetOrderInGraphForEachNode(); + } else { + StraightenNodes(this, ordered_task_nodes_); + } if (Global::Get()->enable_debug_mode()) { ToDotWithAutoFilePath(); } } From e807d0bab531ed69e466980276c40e6fbf547b1e Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Thu, 26 May 2022 23:21:36 +0800 Subject: [PATCH 04/22] Change algorithm structure --- oneflow/core/graph/straighten_nodes.cpp | 82 +++++++++++-------------- oneflow/core/graph/straighten_nodes.h | 2 +- oneflow/core/graph/task_graph.cpp | 2 +- 3 files changed, 39 insertions(+), 47 deletions(-) diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp index a931a7f24b1..c1cdb1938dc 100644 --- a/oneflow/core/graph/straighten_nodes.cpp +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -28,6 +28,37 @@ bool IsTransferNode(int32_t task_type) { return task_type == 12 || task_type == 13 || (48 <= task_type && task_type <= 64); } +// The same order in the set +// It is only run in the following situation because there are too many implicit conditions. +bool should_run_simultaneously(TopoStruct* a, TopoStruct* b) { + // Normal node would have the same name + if (a->node->GetTaskType() == 1) { return a->node->VisualStr() == b->node->VisualStr(); } + // Otherwise they must have the same parameters with different machine ids and the closest node + // id. We only use Min Layer here, since Tributary Layer might be different due to asymmetry of + // graph. + return true; +}; + +// move the head from source to target +void move_front_between_maps(std::map& source, + std::map& target) { + if (!source.empty()) { + const auto& front = source.begin(); + target[front->first] = front->second; + source.erase(front); + } +}; + +// Classifier for the set according to the task type +int32_t set_classifier(const TaskNode* node) { + // Check task.pb.h for detail + int32_t task_type = node->GetTaskType(); + if (task_type == 1) { return 1; } + if (task_type == 12 || task_type == 13 || (48 <= task_type && task_type <= 64)) { return 0; } + if (task_type == 47) { return 2; } + return 3; +}; + } // anonymous namespace // Drop down the maximum layer with the minimum layer form consumer @@ -135,32 +166,11 @@ void FindMainstem(HashMap& task_node2topo_struct) { } } -void StraightenNodes(TaskGraph* task_graph, std::vector& ordered_task_nodes) { +void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task_nodes) { // The function for settle the order in the graph int64_t order_in_graph = 0; HashMap task_type_map; - // The same order in the set - // It is only run in the following situation because there are too many implicit conditions. - auto should_run_simultaneously = [](TopoStruct* a, TopoStruct* b) -> bool { - // Normal node would have the same name - if (a->node->GetTaskType() == 1) { return a->node->VisualStr() == b->node->VisualStr(); } - // Otherwise they must have the same parameters with different machine ids and the closest node - // id. We only use Min Layer here, since Tributary Layer might be different due to asymmetry of - // graph. - return true; - }; - - // move the head from source to target - auto move_front_between_maps = [](std::map& source, - std::map& target) { - if (!source.empty()) { - const auto& front = source.begin(); - target[front->first] = front->second; - source.erase(front); - } - }; - // Generate topological data structure for each task node HashMap task_node2topo_struct; // Determine the same nodes which should run simultaneously @@ -233,11 +243,10 @@ void StraightenNodes(TaskGraph* task_graph, std::vector& ordered_task FindMainstem(task_node2topo_struct); // test debug - if (GlobalProcessCtx::Rank() == 0) { - std::cout << "Straightening order type: " << ParseIntegerFromEnv("Parameter0", 0) << ", " - << ParseIntegerFromEnv("Parameter1", 1) << ", " - << ParseIntegerFromEnv("Parameter2", 2) << std::endl; - } + std::cout << "Straightening order type: " << ParseIntegerFromEnv("Parameter0", 0) << ", " + << ParseIntegerFromEnv("Parameter1", 1) << ", " << ParseIntegerFromEnv("Parameter2", 2) + << std::endl; + // Order in the waiting sets // Decide which node should run first struct comp { @@ -253,13 +262,6 @@ void StraightenNodes(TaskGraph* task_graph, std::vector& ordered_task } } return a->node->node_id() < b->node->node_id(); - // auto comp_str = a->node->VisualStr().compare(b->node->VisualStr()); - // if (comp_str == 0) { - // // the order does not matter right now, but we need a strict order - // return a < b; - // } else { - // return comp_str < 0; - // }; // if (a->TributaryLayer == b->TributaryLayer) { // if (a->MinDistance2Transfer == b->MinDistance2Transfer) { @@ -294,16 +296,6 @@ void StraightenNodes(TaskGraph* task_graph, std::vector& ordered_task std::vector remain_task_nums(4, 0); - // Classifier for the set according to the task type - auto set_classifier = [&](TaskNode* node) { - // Check task.pb.h for detail - int32_t task_type = node->GetTaskType(); - if (task_type == 1) { return 1; } - if (task_type == 12 || task_type == 13 || (48 <= task_type && task_type <= 64)) { return 0; } - if (task_type == 47) { return 2; } - return 3; - }; - auto SetOrderInGraph = [&](TaskNode* task_node) { if (GlobalProcessCtx::Rank() == 0) { auto& topo_struct = task_node2topo_struct[task_node]; @@ -322,7 +314,7 @@ void StraightenNodes(TaskGraph* task_graph, std::vector& ordered_task task_type_map[task_node->GetTaskType()]++; } task_node->set_order_in_graph(order_in_graph); - ordered_task_nodes.emplace_back(task_node); + ordered_task_nodes->emplace_back(task_node); ++order_in_graph; }; diff --git a/oneflow/core/graph/straighten_nodes.h b/oneflow/core/graph/straighten_nodes.h index de95615270c..8bbae78be1c 100644 --- a/oneflow/core/graph/straighten_nodes.h +++ b/oneflow/core/graph/straighten_nodes.h @@ -53,7 +53,7 @@ class TopoStruct { void FindMainstem(HashMap& task_node2topo_struct); -void StraightenNodes(TaskGraph* task_graph, std::vector& ordered_task_nodes); +void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task_nodes); } // namespace oneflow diff --git a/oneflow/core/graph/task_graph.cpp b/oneflow/core/graph/task_graph.cpp index 0151ec0048e..abd28f421f4 100644 --- a/oneflow/core/graph/task_graph.cpp +++ b/oneflow/core/graph/task_graph.cpp @@ -454,7 +454,7 @@ TaskGraph::TaskGraph() { if (ParseBooleanFromEnv("ONEFLOW_RANDOM_STRAIGHTEN_NODES", false)) { SetOrderInGraphForEachNode(); } else { - StraightenNodes(this, ordered_task_nodes_); + StraightenNodes(this, &ordered_task_nodes_); } if (Global::Get()->enable_debug_mode()) { ToDotWithAutoFilePath(); } } From c438bda417e26e4cc35009ab74a14064431efeb1 Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Thu, 26 May 2022 23:32:40 +0800 Subject: [PATCH 05/22] Remove some debug information --- oneflow/core/graph/straighten_nodes.cpp | 55 ------------------------- 1 file changed, 55 deletions(-) diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp index c1cdb1938dc..7c53ae346da 100644 --- a/oneflow/core/graph/straighten_nodes.cpp +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -169,7 +169,6 @@ void FindMainstem(HashMap& task_node2topo_struct) { void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task_nodes) { // The function for settle the order in the graph int64_t order_in_graph = 0; - HashMap task_type_map; // Generate topological data structure for each task node HashMap task_node2topo_struct; @@ -190,7 +189,6 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task }); topo_struct.MinLayer = max_min_layer + 1; // Deal with all the nodes with MinLayer=previous_MinLayer - std::cout << "Min Layer: " << topo_struct.MinLayer << std::endl; if (max_min_layer >= previous_MinLayer) { // Using "7" to represent "and" // a7b means a pair (a, b) @@ -297,22 +295,6 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task std::vector remain_task_nums(4, 0); auto SetOrderInGraph = [&](TaskNode* task_node) { - if (GlobalProcessCtx::Rank() == 0) { - auto& topo_struct = task_node2topo_struct[task_node]; - std::cout << "Execution order: " << order_in_graph << ": " << task_node->VisualStr() - << ", node id: " << task_node->node_id() << std::endl; - std::cout << ": task type: " << task_node->GetTaskType() << ", " - << (task_node->parallel_ctx() == 0) << ", MinLayer: " << topo_struct.MinLayer - << ", TributaryLayer: " << topo_struct.TributaryLayer - << ", MinDist2Transfer: " << topo_struct.MinDistance2Transfer - << ", machine id: " << task_node->machine_id() - << ", thread id: " << task_node->thrd_id() << std::endl; - - if (task_type_map.find(task_node->GetTaskType()) == task_type_map.end()) { - task_type_map[task_node->GetTaskType()] = 0; - } - task_type_map[task_node->GetTaskType()]++; - } task_node->set_order_in_graph(order_in_graph); ordered_task_nodes->emplace_back(task_node); ++order_in_graph; @@ -340,33 +322,14 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task } }; - std::map> task_type2node_id2machine_id; // initialization task_graph->ForEachNode([&](TaskNode* node) { int32_t count = node->in_edges().size(); task_node2topo_struct[node].counter = count; if (count == 0) { wait(node); } remain_task_nums[set_classifier(node)]++; - task_type2node_id2machine_id[node->GetTaskType()][node->node_id()] = node->machine_id(); }); - for (auto& task_type_group : task_type2node_id2machine_id) { - std::cout << "task type: " << task_type_group.first << std::endl; - int32_t pre_machine_id = -1; - for (auto& pair : task_type_group.second) { - std::cout << "node id: " << pair.first << ", machine id: " << pair.second << ", ? " - << (pair.second == 0 || pair.second > pre_machine_id) << std::endl; - pre_machine_id = pair.second; - } - } - - if (GlobalProcessCtx::Rank() == 0) { - std::cout << "Total task nums:" << std::endl; - std::cout << "Transfers: " << remain_task_nums[0] << ", Computation: " << remain_task_nums[1] - << ", Run Asap: " << remain_task_nums[2] << ", Run Alap: " << remain_task_nums[3] - << std::endl; - } - // Finish execution auto finish_execution = [&](TaskNode* node) { node->ForEachNodeOnOutEdge([&](TaskNode* out) { @@ -398,13 +361,6 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task auto execute = [&](int32_t list_classifier, int32_t n, bool if_reverse = false) { // n>=1 if (n <= 0) { return; } - if (GlobalProcessCtx::Rank() == 0) { - std::cout << "Total task nums:" << std::endl; - std::cout << "Transfers: " << waiting_lists[0].size() - << ", Computation: " << waiting_lists[1].size() - << ", Run Asap: " << waiting_lists[2].size() - << ", Run Alap: " << waiting_lists[3].size() << std::endl; - } auto& waiting_list = waiting_lists[list_classifier]; std::vector execution_list; int32_t count = 0; @@ -422,15 +378,12 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task } }; - // int32_t max_overlap_computation_num = ParseIntegerFromEnv("MAX_OVERLAP_NUM", 40); - // straightening while (true) { if (waiting_lists[2].empty()) { if (waiting_lists[0].empty()) { if (waiting_lists[1].empty()) { if (waiting_lists[3].empty()) { - if (GlobalProcessCtx::Rank() == 0) { std::cout << "Execution done" << std::endl; } break; } else { execute(3, waiting_lists[3].size()); @@ -457,14 +410,6 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task execute(2, waiting_lists[2].size()); } } - - // test debug - if (GlobalProcessCtx::Rank() == 0) { - std::cout << "Print all task type: " << std::endl; - for (auto& pair : task_type_map) { - std::cout << "task type: " << pair.first << ", " << pair.second << std::endl; - } - } } } // namespace oneflow From 1f70c6b5918b5c7b851c8c0e382736a498e915bb Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Wed, 1 Jun 2022 18:57:12 +0800 Subject: [PATCH 06/22] Finalize the straighten algorithm after deciding the parameters by experiments --- oneflow/core/graph/straighten_nodes.cpp | 36 +++++-------------------- 1 file changed, 6 insertions(+), 30 deletions(-) diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp index 7c53ae346da..5927f55350d 100644 --- a/oneflow/core/graph/straighten_nodes.cpp +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -240,18 +240,16 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task // Generate other parameters in the topological data structure FindMainstem(task_node2topo_struct); - // test debug - std::cout << "Straightening order type: " << ParseIntegerFromEnv("Parameter0", 0) << ", " - << ParseIntegerFromEnv("Parameter1", 1) << ", " << ParseIntegerFromEnv("Parameter2", 2) - << std::endl; - // Order in the waiting sets // Decide which node should run first struct comp { bool operator()(const TopoStruct* a, const TopoStruct* b) const { - static std::vector decide_parameters({ParseIntegerFromEnv("Parameter0", 0), - ParseIntegerFromEnv("Parameter1", 1), - ParseIntegerFromEnv("Parameter2", 2)}); + // NOTE: Leave these code for debugging in the future + // static std::vector decide_parameters({ParseIntegerFromEnv("Parameter0", 0), + // ParseIntegerFromEnv("Parameter1", 1), + // ParseIntegerFromEnv("Parameter2", 2)}); + // The best parameter set is {5, 3} + static std::vector decide_parameters({5, 3}); for (int32_t decide_parameter : decide_parameters) { int32_t decide_parameter_a = a->GetDecidingParameter(decide_parameter); int32_t decide_parameter_b = b->GetDecidingParameter(decide_parameter); @@ -260,28 +258,6 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task } } return a->node->node_id() < b->node->node_id(); - - // if (a->TributaryLayer == b->TributaryLayer) { - // if (a->MinDistance2Transfer == b->MinDistance2Transfer) { - // if (a->MinLayer == b->MinLayer) { - // // Put the task with the same names together - // auto comp_str = a->node->VisualStr().compare(b->node->VisualStr()); - // if (comp_str == 0) { - // // the order does not matter right now, but we need a strict order - // return a < b; - // } else { - // return comp_str < 0; - // } - // } else { - // // the node that shows up first has higher priority - // return a->MinLayer < b->MinLayer; - // } - // } else { - // return a->MinDistance2Transfer < b->MinDistance2Transfer; - // } - // } else { - // return a->TributaryLayer < b->TributaryLayer; - // } } }; From 20b39252e4dcd6ebdf25f7ed1341286601968457 Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Wed, 1 Jun 2022 19:05:03 +0800 Subject: [PATCH 07/22] Notify the usage of straighten algorithm --- oneflow/core/graph/straighten_nodes.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp index 5927f55350d..7d623a1d5ac 100644 --- a/oneflow/core/graph/straighten_nodes.cpp +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -240,6 +240,8 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task // Generate other parameters in the topological data structure FindMainstem(task_node2topo_struct); + LOG(INFO) << "Straightening order: " << 5 << ", " << 3; + // Order in the waiting sets // Decide which node should run first struct comp { From 71cac094defce80992e0e13fddad2e5fa3b17420 Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Tue, 14 Jun 2022 17:17:49 +0800 Subject: [PATCH 08/22] Of format --- oneflow/core/graph/straighten_nodes.cpp | 33 +++++++++++++------------ oneflow/core/graph/straighten_nodes.h | 8 +++--- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp index 7d623a1d5ac..f6ea85d44cc 100644 --- a/oneflow/core/graph/straighten_nodes.cpp +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -68,7 +68,7 @@ void TopoStruct::DropTributaryLayer(int32_t upper_bound) { // Should initialize the counter to be the number of out edges // Compute maximum layer for tributaries -void TopoStruct::SpreadTributaryLayer(HashMap& task_node2topo_struct) { +void TopoStruct::SpreadTributaryLayer(HashMap* task_node2topo_struct) { if (counter || MinLayer <= 0) { return; } int32_t producer_max_lay = 0; if (IfMainstem) { @@ -79,11 +79,10 @@ void TopoStruct::SpreadTributaryLayer(HashMap& task_node2 // producer_max_lay = TributaryLayer - 1; } node->ForEachNodeOnInEdge([&](TaskNode* in) { - auto& topo_struct_in = task_node2topo_struct[in]; + auto& topo_struct_in = task_node2topo_struct->at(in); topo_struct_in.DropTributaryLayer(producer_max_lay); - if (--topo_struct_in.counter == 0) { - topo_struct_in.SpreadTributaryLayer(task_node2topo_struct); - } + --topo_struct_in.counter; + if (topo_struct_in.counter == 0) { topo_struct_in.SpreadTributaryLayer(task_node2topo_struct); } }); // Reduce counter to -1 to avoid visitting again counter--; @@ -91,7 +90,7 @@ void TopoStruct::SpreadTributaryLayer(HashMap& task_node2 // Judge if this node is on the mainstem // If so, judge it for its producer/upstream nodes -void TopoStruct::SpreadMainstem(HashMap& task_node2topo_struct) { +void TopoStruct::SpreadMainstem(HashMap* task_node2topo_struct) { // Skip it if this node is already judged. if (IfMainstem) { return; } CHECK(MinLayer >= 0) << "TopoStruct not initialized!"; @@ -99,7 +98,7 @@ void TopoStruct::SpreadMainstem(HashMap& task_node2topo_s // If I am in the mainstem, then all the children with (MinLayer >= my layer id - 1) would be // considered as in the mainstem node->ForEachNodeOnInEdge([&](TaskNode* in) { - auto& topo_struct_in = task_node2topo_struct[in]; + auto& topo_struct_in = task_node2topo_struct->at(in); if (topo_struct_in.MinLayer == MinLayer - 1) { topo_struct_in.SpreadTributaryLayer(task_node2topo_struct); } @@ -107,7 +106,7 @@ void TopoStruct::SpreadMainstem(HashMap& task_node2topo_s } // The minimum computation distance from the beginning of this op to the next transfer -int32_t TopoStruct::GetMinDistance2Transfer(HashMap& task_node2topo_struct) { +int32_t TopoStruct::GetMinDistance2Transfer(HashMap* task_node2topo_struct) { if (MinDistance2Transfer >= 0) { return MinDistance2Transfer; } // if this node is a transfer node if (IsTransferNode(node->GetTaskType())) { @@ -118,9 +117,10 @@ int32_t TopoStruct::GetMinDistance2Transfer(HashMap& task node->ForEachNodeOnOutEdge([&](TaskNode* out) { MinDistance2Transfer = std::min(MinDistance2Transfer, - task_node2topo_struct[out].GetMinDistance2Transfer(task_node2topo_struct)); + task_node2topo_struct->at(out).GetMinDistance2Transfer(task_node2topo_struct)); }); - return ++MinDistance2Transfer; + ++MinDistance2Transfer; + return MinDistance2Transfer; } // deciding parameter @@ -139,15 +139,15 @@ int32_t TopoStruct::GetDecidingParameter(int32_t i) const { } // Find the mianstem of the task graph, then reduce the wait time for tributaries -void FindMainstem(HashMap& task_node2topo_struct) { +void FindMainstem(HashMap* task_node2topo_struct) { // Find the maximum layer number int32_t max_MinLayer = -1; - for (const auto& pair : task_node2topo_struct) { + for (const auto& pair : *task_node2topo_struct) { if (max_MinLayer < pair.second.MinLayer) { max_MinLayer = pair.second.MinLayer; } } // All the nodes with MinLayer>=mainstem_end_id would be considered as mainstem nodes int32_t mainstem_end_id = max_MinLayer - 4; - for (auto& pair : task_node2topo_struct) { + for (auto& pair : *task_node2topo_struct) { auto& topo_struct = pair.second; // Initialize the counter and Tributary Layer topo_struct.counter = pair.first->out_edges().size(); @@ -158,7 +158,7 @@ void FindMainstem(HashMap& task_node2topo_struct) { } } - for (auto& pair : task_node2topo_struct) { + for (auto& pair : *task_node2topo_struct) { // Compute maximum layer for tributaries pair.second.SpreadTributaryLayer(task_node2topo_struct); // Set the MinDistance2Transfer for each topological structure @@ -238,7 +238,7 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task }); // Generate other parameters in the topological data structure - FindMainstem(task_node2topo_struct); + FindMainstem(&task_node2topo_struct); LOG(INFO) << "Straightening order: " << 5 << ", " << 3; @@ -311,7 +311,8 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task // Finish execution auto finish_execution = [&](TaskNode* node) { node->ForEachNodeOnOutEdge([&](TaskNode* out) { - if (--(task_node2topo_struct[out].counter) == 0) { wait(out); } + --(task_node2topo_struct[out].counter); + if (task_node2topo_struct[out].counter == 0) { wait(out); } }); }; diff --git a/oneflow/core/graph/straighten_nodes.h b/oneflow/core/graph/straighten_nodes.h index 8bbae78be1c..c8e75ac3e30 100644 --- a/oneflow/core/graph/straighten_nodes.h +++ b/oneflow/core/graph/straighten_nodes.h @@ -40,18 +40,18 @@ class TopoStruct { // Drop down the tributary layer void DropTributaryLayer(int32_t upper_bound); - void SpreadTributaryLayer(HashMap& task_node2topo_struct); + void SpreadTributaryLayer(HashMap* task_node2topo_struct); - void SpreadMainstem(HashMap& task_node2topo_struct); + void SpreadMainstem(HashMap* task_node2topo_struct); // The minimum computation distance from the beginning of this op to the next transfer - int32_t GetMinDistance2Transfer(HashMap& task_node2topo_struct); + int32_t GetMinDistance2Transfer(HashMap* task_node2topo_struct); // deciding parameter int32_t GetDecidingParameter(int32_t i) const; }; -void FindMainstem(HashMap& task_node2topo_struct); +void FindMainstem(HashMap* task_node2topo_struct); void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task_nodes); From 840b8de48ff66d4b193a069a38650a4fb9d3cffa Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Tue, 14 Jun 2022 17:19:53 +0800 Subject: [PATCH 09/22] Update oneflow/core/graph/straighten_nodes.cpp Of format Co-authored-by: daquexian --- oneflow/core/graph/straighten_nodes.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp index f6ea85d44cc..e4dac1d8150 100644 --- a/oneflow/core/graph/straighten_nodes.cpp +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -93,7 +93,7 @@ void TopoStruct::SpreadTributaryLayer(HashMap* task_node2 void TopoStruct::SpreadMainstem(HashMap* task_node2topo_struct) { // Skip it if this node is already judged. if (IfMainstem) { return; } - CHECK(MinLayer >= 0) << "TopoStruct not initialized!"; + CHECK_GE(MinLayer, 0) << "TopoStruct not initialized!"; IfMainstem = true; // If I am in the mainstem, then all the children with (MinLayer >= my layer id - 1) would be // considered as in the mainstem From fc65801c8e9734bda1fea1e7f9ca24a78be1a3ed Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Tue, 14 Jun 2022 17:26:48 +0800 Subject: [PATCH 10/22] Of format --- oneflow/core/graph/graph.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/oneflow/core/graph/graph.h b/oneflow/core/graph/graph.h index 68bfa06585e..2583cec899f 100644 --- a/oneflow/core/graph/graph.h +++ b/oneflow/core/graph/graph.h @@ -228,11 +228,11 @@ void Graph::TopoForEachNode(std::function N template void Graph::TopoForEachNodeFast( std::function NodeHandler) const { - TopoForEachNodeFastMaybe(source_nodes(), &NodeType::ForEachNodeOnInEdge, - &NodeType::ForEachNodeOnOutEdge, [&](NodeType* node) { - NodeHandler(node); - return Maybe::Ok(); - }); + CHECK_JUST(TopoForEachNodeFastMaybe(source_nodes(), &NodeType::ForEachNodeOnInEdge, + &NodeType::ForEachNodeOnOutEdge, [&](NodeType* node) { + NodeHandler(node); + return Maybe::Ok(); + })); } template From 26820050a471b280c347aec481a06d705747ad27 Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Tue, 14 Jun 2022 17:40:53 +0800 Subject: [PATCH 11/22] Stop using visual string before we find a better key --- oneflow/core/graph/straighten_nodes.cpp | 28 +++++++++++++------------ 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp index e4dac1d8150..d8a3c3f150f 100644 --- a/oneflow/core/graph/straighten_nodes.cpp +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -30,14 +30,14 @@ bool IsTransferNode(int32_t task_type) { // The same order in the set // It is only run in the following situation because there are too many implicit conditions. -bool should_run_simultaneously(TopoStruct* a, TopoStruct* b) { - // Normal node would have the same name - if (a->node->GetTaskType() == 1) { return a->node->VisualStr() == b->node->VisualStr(); } - // Otherwise they must have the same parameters with different machine ids and the closest node - // id. We only use Min Layer here, since Tributary Layer might be different due to asymmetry of - // graph. - return true; -}; +// bool ShouldRunSimultaneously(TopoStruct* a, TopoStruct* b) { +// // Normal node would have the same name +// if (a->node->GetTaskType() == 1) { return a->node->VisualStr() == b->node->VisualStr(); } +// // Otherwise they must have the same parameters with different machine ids and the closest node +// // id. We only use Min Layer here, since Tributary Layer might be different due to asymmetry of +// // graph. +// return true; +// }; // move the head from source to target void move_front_between_maps(std::map& source, @@ -200,17 +200,19 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task move_front_between_maps(machine_id7node_id2topo_structs.second, min_node_id2topo_struct); } - // + while (!min_node_id2topo_struct.empty()) { auto* topo_struct_min_node_id = min_node_id2topo_struct.begin()->second; - // Store the same nodes in different machine + // Store the same nodes in different machines std::vector same_nodes; for (auto& min_node_id7topo_struct : min_node_id2topo_struct) { auto* curr_topo_struct = min_node_id7topo_struct.second; // Find out all the same nodes - if (should_run_simultaneously(topo_struct_min_node_id, curr_topo_struct)) { - same_nodes.push_back(curr_topo_struct); - } + // Stop using Visual string before we find a better key + // Currently we can use the topological structure and node id to decide the same nodes + // if (ShouldRunSimultaneously(topo_struct_min_node_id, curr_topo_struct)) { + same_nodes.push_back(curr_topo_struct); + // } } // Cyclize them for (int32_t i = 1; i < same_nodes.size(); i++) { From e8917a756fb21005bc89df3359141ad3f13ff09f Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Wed, 15 Jun 2022 00:16:51 +0800 Subject: [PATCH 12/22] Remove magic numbers and Of format --- oneflow/core/graph/straighten_nodes.cpp | 139 +++++++++++++++++------- oneflow/core/graph/straighten_nodes.h | 13 +++ 2 files changed, 111 insertions(+), 41 deletions(-) diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp index d8a3c3f150f..8fb49175148 100644 --- a/oneflow/core/graph/straighten_nodes.cpp +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -14,20 +14,18 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "oneflow/core/graph/straighten_nodes.h" +#include "oneflow/core/common/data_type.h" #include "oneflow/core/graph/op_graph.h" #include "oneflow/core/graph/task_node.h" #include "oneflow/core/job/job_desc.h" #include "oneflow/core/common/protobuf.h" +#include "oneflow/core/job/task.pb.h" #include "oneflow/core/rpc/include/global_process_ctx.h" namespace oneflow { namespace { -bool IsTransferNode(int32_t task_type) { - return task_type == 12 || task_type == 13 || (48 <= task_type && task_type <= 64); -} - // The same order in the set // It is only run in the following situation because there are too many implicit conditions. // bool ShouldRunSimultaneously(TopoStruct* a, TopoStruct* b) { @@ -40,8 +38,8 @@ bool IsTransferNode(int32_t task_type) { // }; // move the head from source to target -void move_front_between_maps(std::map& source, - std::map& target) { +void MoveFrontBetweenMaps(std::map& source, + std::map& target) { if (!source.empty()) { const auto& front = source.begin(); target[front->first] = front->second; @@ -49,18 +47,69 @@ void move_front_between_maps(std::map& source, } }; -// Classifier for the set according to the task type -int32_t set_classifier(const TaskNode* node) { - // Check task.pb.h for detail - int32_t task_type = node->GetTaskType(); - if (task_type == 1) { return 1; } - if (task_type == 12 || task_type == 13 || (48 <= task_type && task_type <= 64)) { return 0; } - if (task_type == 47) { return 2; } - return 3; -}; +bool ShouldRunASAP(TaskType task_type) { + // They are sorted according to frequency of occurrences + switch (task_type) { + // We mark the number of occurrences in bert + case TaskType::kDeviceTick: // 38 + case TaskType::kTick: // 8 + case TaskType::kSrcSubsetTick: // 6 + case TaskType::kDstSubsetTick: // 6 + case TaskType::kCriticalSectionWaitTick: // 4 + case TaskType::kWaitAndSendIds: // 2 + case TaskType::kPack: // 0 + case TaskType::kUnpack: // 0 + case TaskType::kRepeat: // 0 + case TaskType::kAcc: // 0 + case TaskType::kSourceTick: // 0 + case TaskType::kAccTick: // 0 + case TaskType::kCase: // 0 + case TaskType::kEsac: // 0 + case TaskType::kReentrantLock: return true; // 0 + default: return false; + } +} } // anonymous namespace +bool IsTransferNode(TaskType task_type) { + // return task_type == 12 || task_type == 13 || (48 <= task_type && task_type <= 64); + // They are sorted according to frequency of occurrences + switch (task_type) { + // We mark the number of occurrences in bert + case TaskType::kCollectiveBoxingGeneric: // 76 + case TaskType::kCopyHd: // 27 + case TaskType::kSliceBoxing: // 16 + case TaskType::kCopyCommNet: // 12 + case TaskType::kCollectiveBoxingPack: // 8 + case TaskType::kCollectiveBoxingUnpack: // 8 + case TaskType::kBoxingZeros: // 3 + case TaskType::kForeignInput: // 0 + case TaskType::kForeignOutput: // 0 + case TaskType::kDistributeConcat: // 0 + case TaskType::kDistributeSplit: // 0 + case TaskType::kBoxingIdentity: // 0 + case TaskType::kDecodeH2D: // 0 + case TaskType::kSspVariableProxy: return true; // 0 + default: return false; + } +} + +// Classifier for the set according to the task type +TaskClassifier GetTaskClassifier(const TaskNode* node) { + // Check task.pb.h for detail + // They are sorted according to frequency of judgement + // frequency of judgement = the number of occurrences / the times of judgement + TaskType task_type = node->GetTaskType(); + if (task_type == TaskType::kNormalForward) { return TaskClassifier::kWaitingComputation; } + if (IsTransferNode(task_type)) { return TaskClassifier::kWaitingTransfer; } + if (task_type == TaskType::kCallbackNotify) { return TaskClassifier::kRunALAP; } + if (ShouldRunASAP(task_type)) { return TaskClassifier::kRunASAP; } + CHECK(false) << "Unclassified or invalid task type (" << task_type << ") showing up"; + // Throw a kRunASAP which means ignoring this node in the algorithm + return TaskClassifier::kRunASAP; +} + // Drop down the maximum layer with the minimum layer form consumer void TopoStruct::DropTributaryLayer(int32_t upper_bound) { if (upper_bound < TributaryLayer || TributaryLayer < 0) { TributaryLayer = upper_bound; } @@ -113,7 +162,8 @@ int32_t TopoStruct::GetMinDistance2Transfer(HashMap* task MinDistance2Transfer = 0; return MinDistance2Transfer; } - MinDistance2Transfer = 1000000; + // Otherwise, initialize it with a large number + MinDistance2Transfer = GetMaxVal(); node->ForEachNodeOnOutEdge([&](TaskNode* out) { MinDistance2Transfer = std::min(MinDistance2Transfer, @@ -146,6 +196,7 @@ void FindMainstem(HashMap* task_node2topo_struct) { if (max_MinLayer < pair.second.MinLayer) { max_MinLayer = pair.second.MinLayer; } } // All the nodes with MinLayer>=mainstem_end_id would be considered as mainstem nodes + // The last 5 layers would be considered as in mainstem anyway. int32_t mainstem_end_id = max_MinLayer - 4; for (auto& pair : *task_node2topo_struct) { auto& topo_struct = pair.second; @@ -197,12 +248,11 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task auto& machine_id2node_id2topo_structs = task_type7machine_id2node_id2topo_structs.second; // Initializing the smallest node id for each machine for (auto& machine_id7node_id2topo_structs : machine_id2node_id2topo_structs) { - move_front_between_maps(machine_id7node_id2topo_structs.second, - min_node_id2topo_struct); + MoveFrontBetweenMaps(machine_id7node_id2topo_structs.second, min_node_id2topo_struct); } while (!min_node_id2topo_struct.empty()) { - auto* topo_struct_min_node_id = min_node_id2topo_struct.begin()->second; + // auto* topo_struct_min_node_id = min_node_id2topo_struct.begin()->second; // Store the same nodes in different machines std::vector same_nodes; for (auto& min_node_id7topo_struct : min_node_id2topo_struct) { @@ -224,7 +274,7 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task // Erase them from min_node_id2topo_struct min_node_id2topo_struct.erase(same_node_topo_struct->node->node_id()); // Add new candidate - move_front_between_maps( + MoveFrontBetweenMaps( machine_id2node_id2topo_structs[same_node_topo_struct->node->machine_id()], min_node_id2topo_struct); } @@ -266,13 +316,14 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task }; // Classify sets for the task nodes - // std::set waiting_transfer; // 0 - // std::set waiting_computation; // 1 - // std::set run_asap; // 2, run as soon as possible - // std::set run_alap; // 3, run as late as possible - std::vector> waiting_lists(4); + // std::set waiting_transfer; // 0, TaskClassifier::kWaitingTransfer + // std::set waiting_computation; // 1, TaskClassifier::kWaitingComputation + // std::set run_asap; // 2, TaskClassifier::kRunASAP , run as soon as possible + // std::set run_alap; // 3, TaskClassifier::kRunALAP , run as late as possible + const int32_t num_classifier = 4; + std::vector> waiting_lists(num_classifier); - std::vector remain_task_nums(4, 0); + std::vector remain_task_nums(num_classifier, 0); auto SetOrderInGraph = [&](TaskNode* task_node) { task_node->set_order_in_graph(order_in_graph); @@ -291,7 +342,7 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task } // Add all the same nodes at the same time curr_topo_struct = first_topo_struct; - auto& waiting_list = waiting_lists[set_classifier(node)]; + auto& waiting_list = waiting_lists[GetTaskClassifier(node)]; while (true) { waiting_list.insert(curr_topo_struct); // Reduce counter then this node will never be added again @@ -307,7 +358,7 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task int32_t count = node->in_edges().size(); task_node2topo_struct[node].counter = count; if (count == 0) { wait(node); } - remain_task_nums[set_classifier(node)]++; + remain_task_nums[GetTaskClassifier(node)]++; }); // Finish execution @@ -340,7 +391,7 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task // Execute the first n nodes in the waiting list auto execute = [&](int32_t list_classifier, int32_t n, bool if_reverse = false) { - // n>=1 + // n > 0 if (n <= 0) { return; } auto& waiting_list = waiting_lists[list_classifier]; std::vector execution_list; @@ -361,34 +412,40 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task // straightening while (true) { - if (waiting_lists[2].empty()) { - if (waiting_lists[0].empty()) { - if (waiting_lists[1].empty()) { - if (waiting_lists[3].empty()) { + if (waiting_lists[TaskClassifier::kRunASAP].empty()) { + if (waiting_lists[TaskClassifier::kWaitingTransfer].empty()) { + if (waiting_lists[TaskClassifier::kWaitingComputation].empty()) { + if (waiting_lists[TaskClassifier::kRunALAP].empty()) { + // All the waiting lists are empty break; } else { - execute(3, waiting_lists[3].size()); + // Execute all the nodes left + execute(TaskClassifier::kRunALAP, waiting_lists[TaskClassifier::kRunALAP].size()); } } else { - execute(1, 1); + // Execute one computation node + execute(TaskClassifier::kWaitingComputation, 1); } } else { int32_t computation_num = - std::min(int32_t(waiting_lists[1].size() / (waiting_lists[0].size())), - remain_task_nums[1] / remain_task_nums[0]); + std::min(int32_t(waiting_lists[TaskClassifier::kWaitingComputation].size() + / (waiting_lists[TaskClassifier::kWaitingTransfer].size())), + remain_task_nums[TaskClassifier::kWaitingComputation] + / remain_task_nums[TaskClassifier::kWaitingTransfer]); // Holding the transfer std::vector transfer_execution_list; - move2execution_list(waiting_lists[0], transfer_execution_list); - remain_task_nums[0] -= transfer_execution_list.size(); + move2execution_list(waiting_lists[TaskClassifier::kWaitingTransfer], + transfer_execution_list); + remain_task_nums[TaskClassifier::kWaitingTransfer] -= transfer_execution_list.size(); for (auto* transfer_node : transfer_execution_list) { SetOrderInGraph(transfer_node); } // Overlap transfer with computation - execute(1, computation_num); + execute(TaskClassifier::kWaitingComputation, computation_num); // Release the transfer for (auto* transfer_node : transfer_execution_list) { finish_execution(transfer_node); } } } else { - execute(2, waiting_lists[2].size()); + execute(TaskClassifier::kRunASAP, waiting_lists[TaskClassifier::kRunASAP].size()); } } } diff --git a/oneflow/core/graph/straighten_nodes.h b/oneflow/core/graph/straighten_nodes.h index c8e75ac3e30..66c0e2ed1c2 100644 --- a/oneflow/core/graph/straighten_nodes.h +++ b/oneflow/core/graph/straighten_nodes.h @@ -17,12 +17,25 @@ limitations under the License. #define STRAIGHTEN_NODES_H_ #include "oneflow/core/graph/task_graph.h" +#include "oneflow/core/job/task.pb.h" namespace oneflow { class OpGraph; class Job; +enum TaskClassifier : int { + kWaitingTransfer = 0, + kWaitingComputation = 1, + kRunASAP = 2, + kRunALAP = 3 +}; + +bool IsTransferNode(TaskType task_type); + +// Classifier for the set according to the task type +TaskClassifier GetTaskClassifier(const TaskNode* node); + class TopoStruct { public: TaskNode* node; From 86d2499891e74724cdbefbe3cccbdfac27e2325f Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Wed, 15 Jun 2022 12:12:21 +0800 Subject: [PATCH 13/22] Remove starts --- oneflow/core/graph/graph.h | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/oneflow/core/graph/graph.h b/oneflow/core/graph/graph.h index 2583cec899f..115de62a47c 100644 --- a/oneflow/core/graph/graph.h +++ b/oneflow/core/graph/graph.h @@ -68,7 +68,6 @@ class Graph { const std::function(NodeType*)>& Handler) const; Maybe TopoForEachNodeFastMaybe( - const std::list& starts, const std::function&)>& ForEachInNode, const std::function&)>& ForEachOutNode, const std::function(NodeType*)>& Handler) const; @@ -228,7 +227,7 @@ void Graph::TopoForEachNode(std::function N template void Graph::TopoForEachNodeFast( std::function NodeHandler) const { - CHECK_JUST(TopoForEachNodeFastMaybe(source_nodes(), &NodeType::ForEachNodeOnInEdge, + CHECK_JUST(TopoForEachNodeFastMaybe(&NodeType::ForEachNodeOnInEdge, &NodeType::ForEachNodeOnOutEdge, [&](NodeType* node) { NodeHandler(node); return Maybe::Ok(); @@ -245,8 +244,8 @@ Maybe Graph::TopoForEachNodeWithErrorCaptured( template Maybe Graph::TopoForEachNodeFastMaybe( std::function(NodeType*)> NodeHandler) const { - return TopoForEachNodeFastMaybe(source_nodes(), &NodeType::ForEachNodeOnInEdge, - &NodeType::ForEachNodeOnOutEdge, NodeHandler); + return TopoForEachNodeFastMaybe(&NodeType::ForEachNodeOnInEdge, &NodeType::ForEachNodeOnOutEdge, + NodeHandler); } template @@ -564,7 +563,6 @@ Maybe Graph::TopoForEachNodeWithErrorCaptured( template Maybe Graph::TopoForEachNodeFastMaybe( - const std::list& starts, const std::function&)>& ForEachInNode, const std::function&)>& ForEachOutNode, const std::function(NodeType*)>& Handler) const { From c480685d22432c352e063917c825863a3cfd0a6f Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Wed, 15 Jun 2022 18:09:45 +0800 Subject: [PATCH 14/22] Of format --- oneflow/core/graph/graph.h | 3 +- oneflow/core/graph/straighten_nodes.cpp | 81 +++++++++++++------------ oneflow/core/graph/straighten_nodes.h | 20 +++--- 3 files changed, 58 insertions(+), 46 deletions(-) diff --git a/oneflow/core/graph/graph.h b/oneflow/core/graph/graph.h index 115de62a47c..640318c1ec3 100644 --- a/oneflow/core/graph/graph.h +++ b/oneflow/core/graph/graph.h @@ -579,7 +579,8 @@ Maybe Graph::TopoForEachNodeFastMaybe( queue.pop(); JUST(Handler(cur_node)); ForEachOutNode(cur_node, [&](NodeType* out) { - if (--counter_in[out] == 0) { queue.push(out); } + --counter_in[out]; + if (counter_in[out] == 0) { queue.push(out); } }); } return Maybe::Ok(); diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp index 8fb49175148..02beb1a1854 100644 --- a/oneflow/core/graph/straighten_nodes.cpp +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -20,7 +20,6 @@ limitations under the License. #include "oneflow/core/job/job_desc.h" #include "oneflow/core/common/protobuf.h" #include "oneflow/core/job/task.pb.h" -#include "oneflow/core/rpc/include/global_process_ctx.h" namespace oneflow { @@ -112,20 +111,20 @@ TaskClassifier GetTaskClassifier(const TaskNode* node) { // Drop down the maximum layer with the minimum layer form consumer void TopoStruct::DropTributaryLayer(int32_t upper_bound) { - if (upper_bound < TributaryLayer || TributaryLayer < 0) { TributaryLayer = upper_bound; } + if (upper_bound < tributary_layer || tributary_layer < 0) { tributary_layer = upper_bound; } } // Should initialize the counter to be the number of out edges // Compute maximum layer for tributaries void TopoStruct::SpreadTributaryLayer(HashMap* task_node2topo_struct) { - if (counter || MinLayer <= 0) { return; } + if (counter || min_layer <= 0) { return; } int32_t producer_max_lay = 0; - if (IfMainstem) { - producer_max_lay = MinLayer - 1; + if (on_mainstem) { + producer_max_lay = min_layer - 1; } else { // On a tributary, the operator could be run later. - producer_max_lay = TributaryLayer; - // producer_max_lay = TributaryLayer - 1; + producer_max_lay = tributary_layer; + // producer_max_lay = tributary_layer - 1; } node->ForEachNodeOnInEdge([&](TaskNode* in) { auto& topo_struct_in = task_node2topo_struct->at(in); @@ -141,14 +140,14 @@ void TopoStruct::SpreadTributaryLayer(HashMap* task_node2 // If so, judge it for its producer/upstream nodes void TopoStruct::SpreadMainstem(HashMap* task_node2topo_struct) { // Skip it if this node is already judged. - if (IfMainstem) { return; } - CHECK_GE(MinLayer, 0) << "TopoStruct not initialized!"; - IfMainstem = true; - // If I am in the mainstem, then all the children with (MinLayer >= my layer id - 1) would be + if (on_mainstem) { return; } + CHECK_GE(min_layer, 0) << "TopoStruct not initialized!"; + on_mainstem = true; + // If I am in the mainstem, then all the children with (min_layer >= my layer id - 1) would be // considered as in the mainstem node->ForEachNodeOnInEdge([&](TaskNode* in) { auto& topo_struct_in = task_node2topo_struct->at(in); - if (topo_struct_in.MinLayer == MinLayer - 1) { + if (topo_struct_in.min_layer == min_layer - 1) { topo_struct_in.SpreadTributaryLayer(task_node2topo_struct); } }); @@ -156,24 +155,30 @@ void TopoStruct::SpreadMainstem(HashMap* task_node2topo_s // The minimum computation distance from the beginning of this op to the next transfer int32_t TopoStruct::GetMinDistance2Transfer(HashMap* task_node2topo_struct) { - if (MinDistance2Transfer >= 0) { return MinDistance2Transfer; } + if (min_distance2transfer >= 0) { return min_distance2transfer; } // if this node is a transfer node if (IsTransferNode(node->GetTaskType())) { - MinDistance2Transfer = 0; - return MinDistance2Transfer; + min_distance2transfer = 0; + return min_distance2transfer; } // Otherwise, initialize it with a large number - MinDistance2Transfer = GetMaxVal(); + min_distance2transfer = GetMaxVal(); node->ForEachNodeOnOutEdge([&](TaskNode* out) { - MinDistance2Transfer = - std::min(MinDistance2Transfer, + min_distance2transfer = + std::min(min_distance2transfer, task_node2topo_struct->at(out).GetMinDistance2Transfer(task_node2topo_struct)); }); - ++MinDistance2Transfer; - return MinDistance2Transfer; + ++min_distance2transfer; + return min_distance2transfer; } // deciding parameter +// i = 0: those with small tributary layers go first +// i = 1: those with small minimum distance to transfer go first +// i = 2: first in first out +// i = 3: those with large tributary layers go first +// i = 4: those with long distance to transfer go first +// i = 5: last in first out int32_t TopoStruct::GetDecidingParameter(int32_t i) const { int32_t sign = 1; if (i >= 3) { @@ -181,9 +186,9 @@ int32_t TopoStruct::GetDecidingParameter(int32_t i) const { sign = -1; } switch (i) { - case 0: return sign * TributaryLayer; - case 1: return sign * MinDistance2Transfer; - case 2: return sign * MinLayer; + case 0: return sign * tributary_layer; + case 1: return sign * min_distance2transfer; + case 2: return sign * min_layer; } return 0; } @@ -191,20 +196,20 @@ int32_t TopoStruct::GetDecidingParameter(int32_t i) const { // Find the mianstem of the task graph, then reduce the wait time for tributaries void FindMainstem(HashMap* task_node2topo_struct) { // Find the maximum layer number - int32_t max_MinLayer = -1; + int32_t max_min_layer = -1; for (const auto& pair : *task_node2topo_struct) { - if (max_MinLayer < pair.second.MinLayer) { max_MinLayer = pair.second.MinLayer; } + if (max_min_layer < pair.second.min_layer) { max_min_layer = pair.second.min_layer; } } - // All the nodes with MinLayer>=mainstem_end_id would be considered as mainstem nodes + // All the nodes with min_layer>=mainstem_end_id would be considered as mainstem nodes // The last 5 layers would be considered as in mainstem anyway. - int32_t mainstem_end_id = max_MinLayer - 4; + int32_t mainstem_end_id = max_min_layer - 4; for (auto& pair : *task_node2topo_struct) { auto& topo_struct = pair.second; // Initialize the counter and Tributary Layer topo_struct.counter = pair.first->out_edges().size(); - topo_struct.TributaryLayer = max_MinLayer; + topo_struct.tributary_layer = max_min_layer; // Find out all the nodes on the mainstem. - if (topo_struct.MinLayer >= mainstem_end_id) { + if (topo_struct.min_layer >= mainstem_end_id) { topo_struct.SpreadMainstem(task_node2topo_struct); } } @@ -212,7 +217,7 @@ void FindMainstem(HashMap* task_node2topo_struct) { for (auto& pair : *task_node2topo_struct) { // Compute maximum layer for tributaries pair.second.SpreadTributaryLayer(task_node2topo_struct); - // Set the MinDistance2Transfer for each topological structure + // Set the min_distance2transfer for each topological structure pair.second.GetMinDistance2Transfer(task_node2topo_struct); } } @@ -227,20 +232,20 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task HashMap>> task_type2machine_id2node_id2topo_structs; std::map min_node_id2topo_struct; - int32_t previous_MinLayer = 0; + int32_t previous_min_layer = 0; task_graph->TopoForEachNodeFast([&](TaskNode* node) { auto& topo_struct = task_node2topo_struct[node]; topo_struct.node = node; if (node->in_edges().empty()) { - topo_struct.MinLayer = 0; + topo_struct.min_layer = 0; } else { int32_t max_min_layer = 0; node->ForEachNodeOnInEdge([&](TaskNode* in) { - max_min_layer = std::max(max_min_layer, task_node2topo_struct[in].MinLayer); + max_min_layer = std::max(max_min_layer, task_node2topo_struct[in].min_layer); }); - topo_struct.MinLayer = max_min_layer + 1; - // Deal with all the nodes with MinLayer=previous_MinLayer - if (max_min_layer >= previous_MinLayer) { + topo_struct.min_layer = max_min_layer + 1; + // Deal with all the nodes with min_layer=previous_min_layer + if (max_min_layer >= previous_min_layer) { // Using "7" to represent "and" // a7b means a pair (a, b) for (auto& task_type7machine_id2node_id2topo_structs : @@ -280,8 +285,8 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task } } } - // Renew the previous MinLayer at the end - previous_MinLayer = topo_struct.MinLayer; + // Renew the previous min_layer at the end + previous_min_layer = topo_struct.min_layer; } } // Put the topo structure into the map, waiting for determine the same nodes diff --git a/oneflow/core/graph/straighten_nodes.h b/oneflow/core/graph/straighten_nodes.h index 66c0e2ed1c2..42c6471c6fb 100644 --- a/oneflow/core/graph/straighten_nodes.h +++ b/oneflow/core/graph/straighten_nodes.h @@ -13,8 +13,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#ifndef STRAIGHTEN_NODES_H_ -#define STRAIGHTEN_NODES_H_ +#ifndef ONEFLOW_CORE_GRAPH_STRAIGHTEN_NODES_H_ +#define ONEFLOW_CORE_GRAPH_STRAIGHTEN_NODES_H_ #include "oneflow/core/graph/task_graph.h" #include "oneflow/core/job/task.pb.h" @@ -39,11 +39,11 @@ TaskClassifier GetTaskClassifier(const TaskNode* node); class TopoStruct { public: TaskNode* node; - int32_t MinLayer = -1; - int32_t TributaryLayer = -1; - bool IfMainstem = false; + int32_t min_layer = -1; + int32_t tributary_layer = -1; + bool on_mainstem = false; int32_t counter = 0; - int32_t MinDistance2Transfer = -1; + int32_t min_distance2transfer = -1; TopoStruct* next_same_node = nullptr; // We can have some other nodes in it for example // SbpNode* node; @@ -61,6 +61,12 @@ class TopoStruct { int32_t GetMinDistance2Transfer(HashMap* task_node2topo_struct); // deciding parameter + // i = 0: those with small tributary layers go first + // i = 1: those with small minimum distance to transfer go first + // i = 2: first in first out + // i = 3: those with large tributary layers go first + // i = 4: those with long distance to transfer go first + // i = 5: last in first out int32_t GetDecidingParameter(int32_t i) const; }; @@ -70,4 +76,4 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task } // namespace oneflow -#endif // STRAIGHTEN_NODES_H_ +#endif // ONEFLOW_CORE_GRAPH_STRAIGHTEN_NODES_H_ From c532e7c2a7f5c80f3f49c488cc091bacb237a500 Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Wed, 15 Jun 2022 18:58:50 +0800 Subject: [PATCH 15/22] Fix a bug of using GetMaxVal() as an initial number for comparing --- oneflow/core/graph/straighten_nodes.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp index 02beb1a1854..404fcb7dd11 100644 --- a/oneflow/core/graph/straighten_nodes.cpp +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -14,7 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "oneflow/core/graph/straighten_nodes.h" -#include "oneflow/core/common/data_type.h" #include "oneflow/core/graph/op_graph.h" #include "oneflow/core/graph/task_node.h" #include "oneflow/core/job/job_desc.h" @@ -162,7 +161,8 @@ int32_t TopoStruct::GetMinDistance2Transfer(HashMap* task return min_distance2transfer; } // Otherwise, initialize it with a large number - min_distance2transfer = GetMaxVal(); + // Well, the total number in the task graph is large enough + min_distance2transfer = task_node2topo_struct->size(); node->ForEachNodeOnOutEdge([&](TaskNode* out) { min_distance2transfer = std::min(min_distance2transfer, From 60e7800e198f2cc3837bd239b3a11cc361492216 Mon Sep 17 00:00:00 2001 From: Yinggang Wang Date: Thu, 16 Jun 2022 16:14:52 +0800 Subject: [PATCH 16/22] Refactor add straighten algo interface (#8435) * feat(*): export straighten nodes algorithm inferface * export documentation * Update python/oneflow/nn/graph/graph_config.py Co-authored-by: Yipeng Li Co-authored-by: Yipeng Li --- docs/source/graph.rst | 1 + oneflow/core/graph/task_graph.cpp | 4 ++-- oneflow/core/graph/task_graph.h | 2 +- oneflow/core/job/compiler.cpp | 3 ++- oneflow/core/job/job_conf.proto | 2 ++ python/oneflow/nn/graph/graph_config.py | 10 ++++++++++ 6 files changed, 18 insertions(+), 4 deletions(-) diff --git a/docs/source/graph.rst b/docs/source/graph.rst index 5ec08061a8a..5abafc5e9b2 100644 --- a/docs/source/graph.rst +++ b/docs/source/graph.rst @@ -27,6 +27,7 @@ Base class for running neural networks in Static Graph Mode. set_zero_redundancy_optimizer_mode, set_zero_redundancy_optimizer_min_size_after_split, enable_cudnn_conv_heuristic_search_algo, + enable_random_straighten_nodes, :member-order: bysource diff --git a/oneflow/core/graph/task_graph.cpp b/oneflow/core/graph/task_graph.cpp index abd28f421f4..817add04e28 100644 --- a/oneflow/core/graph/task_graph.cpp +++ b/oneflow/core/graph/task_graph.cpp @@ -420,7 +420,7 @@ void ForEachOpGraphNecessaryCtrlEdge( } // namespace -TaskGraph::TaskGraph() { +TaskGraph::TaskGraph(bool random_straighten_nodes) { OpGraph* op_graph = Global::Get(); sub_tsk_gph_builder_ctx_.reset(new SubTskGphBuilderCtx(this)); boxing_logger_ = CreateBoxingLogger(); @@ -451,7 +451,7 @@ TaskGraph::TaskGraph() { } }); - if (ParseBooleanFromEnv("ONEFLOW_RANDOM_STRAIGHTEN_NODES", false)) { + if (random_straighten_nodes) { SetOrderInGraphForEachNode(); } else { StraightenNodes(this, &ordered_task_nodes_); diff --git a/oneflow/core/graph/task_graph.h b/oneflow/core/graph/task_graph.h index 71593a834f1..aaf5c1e9f54 100644 --- a/oneflow/core/graph/task_graph.h +++ b/oneflow/core/graph/task_graph.h @@ -43,7 +43,7 @@ class TaskGraph final : public Graph { OF_DISALLOW_COPY_AND_MOVE(TaskGraph); ~TaskGraph() override; - explicit TaskGraph(); + explicit TaskGraph(bool random_straighten_nodes); const char* TypeName() const override { return "TaskGraph"; } void RemoveEmptyRegsts(); diff --git a/oneflow/core/job/compiler.cpp b/oneflow/core/job/compiler.cpp index 7cdcbb9a5e1..306bd3cbac5 100644 --- a/oneflow/core/job/compiler.cpp +++ b/oneflow/core/job/compiler.cpp @@ -61,7 +61,8 @@ void Compiler::Compile(Job* job, Plan* plan, bool need_job_complete) const { // Step3: build task_gph. // TODO(levi): we can rewrite this part of code in visitor pattern. - auto task_gph = std::make_unique(); + auto task_gph = + std::make_unique(job->job_conf().random_straighten_nodes_in_task_graph()); using std::placeholders::_1; task_gph->ForEachNode(std::bind(&TaskNode::ProduceAllRegstsAndBindEdges, _1)); task_gph->ForEachNode(std::bind(&TaskNode::ConsumeAllRegsts, _1)); diff --git a/oneflow/core/job/job_conf.proto b/oneflow/core/job/job_conf.proto index 69aa7ad29f0..42cae2d4682 100644 --- a/oneflow/core/job/job_conf.proto +++ b/oneflow/core/job/job_conf.proto @@ -239,6 +239,8 @@ message JobConfigProto { optional bool cudnn_conv_enable_pseudo_half = 600 [default = true]; optional bool enable_auto_mixed_precision = 602 [default = false]; optional bool enable_quantization_aware_training = 603 [default = false]; + + optional bool random_straighten_nodes_in_task_graph = 700 [default = false]; optional int64 concurrency_width = 1000 [default = 128]; diff --git a/python/oneflow/nn/graph/graph_config.py b/python/oneflow/nn/graph/graph_config.py index dfb3795b3ac..854502df70e 100644 --- a/python/oneflow/nn/graph/graph_config.py +++ b/python/oneflow/nn/graph/graph_config.py @@ -269,6 +269,16 @@ def build(self, x): """ self.proto.cudnn_conv_heuristic_search_algo = mode + def enable_random_straighten_nodes(self, mode: bool = False): + r""" Whether turn off the straighten algorithm. + + If using nccl compute stream, turning it on might not speed up the training. + If not using nccl compute stream, turning it on might slow down data parallelism by 0.6% and slow down model parallelism by 6%. + + The switch is off by default (i.e. use the straighten algorithm by default). + """ + self.proto.random_straighten_nodes_in_task_graph = mode + def _generate_optimizer_and_variable_configs( self, opt_dict: OptDict = None, variables_conf: OrderedDict = None, ): From 2f0b93bc5efb890333bb045fb19c075449fcfeee Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Fri, 17 Jun 2022 14:17:43 +0800 Subject: [PATCH 17/22] Use TopoForEachNodeFast as default. (#8436) * Use TopoForEachNodeFast as default. Rename the original one as TopoForEachNodeDynamic * Speed up TopoForEachNodeFast when traversing a subgraph --- oneflow/core/graph/graph.h | 135 +++++++++++++++++++----- oneflow/core/graph/op_graph.cpp | 3 +- oneflow/core/graph/straighten_nodes.cpp | 2 +- 3 files changed, 108 insertions(+), 32 deletions(-) diff --git a/oneflow/core/graph/graph.h b/oneflow/core/graph/graph.h index 640318c1ec3..b9f62e01696 100644 --- a/oneflow/core/graph/graph.h +++ b/oneflow/core/graph/graph.h @@ -34,11 +34,15 @@ class Graph { // For Each void ForEachNode(std::function NodeHandler) const; Maybe MaybeForEachNode(std::function(NodeType*)> NodeHandler) const; + // In case you want to change the topological structure during the node handler. + // For example, adding/deleting a node or an edge. + // Still, it might have bugs even if you use TopoForEachNodeDynamic. + void TopoForEachNodeDynamic(std::function NodeHandler) const; void TopoForEachNode(std::function NodeHandler) const; - void TopoForEachNodeFast(std::function NodeHandler) const; + Maybe TopoForEachNodeDynamicWithErrorCaptured( + std::function(NodeType*)> NodeHandler) const; Maybe TopoForEachNodeWithErrorCaptured( std::function(NodeType*)> NodeHandler) const; - Maybe TopoForEachNodeFastMaybe(std::function(NodeType*)> NodeHandler) const; void ReverseTopoForEachNode(std::function NodeHandler) const; void ForEachEdge(std::function EdgeHandler) const; @@ -55,19 +59,36 @@ class Graph { const std::function&)>& ForEachNext, const std::function& Handler) const; + void TopoForEachNodeDynamic( + const std::list& starts, + const std::function&)>& ForEachInNode, + const std::function&)>& ForEachOutNode, + const std::function& Handler) const; + void TopoForEachNode( const std::list& starts, const std::function&)>& ForEachInNode, const std::function&)>& ForEachOutNode, const std::function& Handler) const; + void TopoForEachNode( + const std::function&)>& ForEachInNode, + const std::function&)>& ForEachOutNode, + const std::function& Handler) const; + + Maybe TopoForEachNodeDynamicWithErrorCaptured( + const std::list& starts, + const std::function&)>& ForEachInNode, + const std::function&)>& ForEachOutNode, + const std::function(NodeType*)>& Handler) const; + Maybe TopoForEachNodeWithErrorCaptured( const std::list& starts, const std::function&)>& ForEachInNode, const std::function&)>& ForEachOutNode, const std::function(NodeType*)>& Handler) const; - Maybe TopoForEachNodeFastMaybe( + Maybe TopoForEachNodeWithErrorCaptured( const std::function&)>& ForEachInNode, const std::function&)>& ForEachOutNode, const std::function(NodeType*)>& Handler) const; @@ -219,33 +240,33 @@ NodeType* Graph::SoleSinkNode() const { } template -void Graph::TopoForEachNode(std::function NodeHandler) const { - TopoForEachNode(source_nodes(), &NodeType::ForEachNodeOnInEdge, &NodeType::ForEachNodeOnOutEdge, - NodeHandler); +void Graph::TopoForEachNodeDynamic( + std::function NodeHandler) const { + TopoForEachNodeDynamic(source_nodes(), &NodeType::ForEachNodeOnInEdge, + &NodeType::ForEachNodeOnOutEdge, NodeHandler); } template -void Graph::TopoForEachNodeFast( - std::function NodeHandler) const { - CHECK_JUST(TopoForEachNodeFastMaybe(&NodeType::ForEachNodeOnInEdge, - &NodeType::ForEachNodeOnOutEdge, [&](NodeType* node) { - NodeHandler(node); - return Maybe::Ok(); - })); +void Graph::TopoForEachNode(std::function NodeHandler) const { + CHECK_JUST(TopoForEachNodeWithErrorCaptured(&NodeType::ForEachNodeOnInEdge, + &NodeType::ForEachNodeOnOutEdge, [&](NodeType* node) { + NodeHandler(node); + return Maybe::Ok(); + })); } template -Maybe Graph::TopoForEachNodeWithErrorCaptured( +Maybe Graph::TopoForEachNodeDynamicWithErrorCaptured( std::function(NodeType*)> NodeHandler) const { - return TopoForEachNodeWithErrorCaptured(source_nodes(), &NodeType::ForEachNodeOnInEdge, - &NodeType::ForEachNodeOnOutEdge, NodeHandler); + return TopoForEachNodeDynamicWithErrorCaptured(source_nodes(), &NodeType::ForEachNodeOnInEdge, + &NodeType::ForEachNodeOnOutEdge, NodeHandler); } template -Maybe Graph::TopoForEachNodeFastMaybe( +Maybe Graph::TopoForEachNodeWithErrorCaptured( std::function(NodeType*)> NodeHandler) const { - return TopoForEachNodeFastMaybe(&NodeType::ForEachNodeOnInEdge, &NodeType::ForEachNodeOnOutEdge, - NodeHandler); + return TopoForEachNodeWithErrorCaptured(&NodeType::ForEachNodeOnInEdge, + &NodeType::ForEachNodeOnOutEdge, NodeHandler); } template @@ -253,15 +274,14 @@ void Graph::SortedTopoForEachNode( std::function LessThan, std::function NodeHandler) const { ForEachNode([&](NodeType* node) { node->SortInOutEdges(LessThan); }); - TopoForEachNode(source_nodes(), &NodeType::ForEachNodeOnSortedInEdge, - &NodeType::ForEachNodeOnSortedOutEdge, NodeHandler); + TopoForEachNode(&NodeType::ForEachNodeOnSortedInEdge, &NodeType::ForEachNodeOnSortedOutEdge, + NodeHandler); } template void Graph::ReverseTopoForEachNode( std::function NodeHandler) const { - TopoForEachNode(sink_nodes(), &NodeType::ForEachNodeOnOutEdge, &NodeType::ForEachNodeOnInEdge, - NodeHandler); + TopoForEachNode(&NodeType::ForEachNodeOnOutEdge, &NodeType::ForEachNodeOnInEdge, NodeHandler); } template @@ -517,6 +537,19 @@ std::unique_ptr> Graph::FindFirstNontrivi return std::unique_ptr>(); } +template +void Graph::TopoForEachNodeDynamic( + const std::list& starts, + const std::function&)>& ForEachInNode, + const std::function&)>& ForEachOutNode, + const std::function& Handler) const { + CHECK_JUST(TopoForEachNodeDynamicWithErrorCaptured(starts, ForEachInNode, ForEachOutNode, + [&](NodeType* node) { + Handler(node); + return Maybe::Ok(); + })); +} + template void Graph::TopoForEachNode( const std::list& starts, @@ -531,7 +564,18 @@ void Graph::TopoForEachNode( } template -Maybe Graph::TopoForEachNodeWithErrorCaptured( +void Graph::TopoForEachNode( + const std::function&)>& ForEachInNode, + const std::function&)>& ForEachOutNode, + const std::function& Handler) const { + CHECK_JUST(TopoForEachNodeWithErrorCaptured(ForEachInNode, ForEachOutNode, [&](NodeType* node) { + Handler(node); + return Maybe::Ok(); + })); +} + +template +Maybe Graph::TopoForEachNodeDynamicWithErrorCaptured( const std::list& starts, const std::function&)>& ForEachInNode, const std::function&)>& ForEachOutNode, @@ -562,7 +606,40 @@ Maybe Graph::TopoForEachNodeWithErrorCaptured( } template -Maybe Graph::TopoForEachNodeFastMaybe( +Maybe Graph::TopoForEachNodeWithErrorCaptured( + const std::list& starts, + const std::function&)>& ForEachInNode, + const std::function&)>& ForEachOutNode, + const std::function(NodeType*)>& Handler) const { + HashMap counter_in; + std::queue queue; + for (NodeType* start : starts) { + queue.push(start); + counter_in[start] = 0; + ForEachInNode(start, [&](NodeType*) { LOG(FATAL) << "not a source"; }); + } + while (!queue.empty()) { + NodeType* cur_node = queue.front(); + queue.pop(); + JUST(Handler(cur_node)); + ForEachOutNode(cur_node, [&](NodeType* out) { + auto it = counter_in.find(out); + // Move the initialization here + if (it == counter_in.end()) { + int32_t count = 0; + ForEachInNode(out, [&](NodeType* out_in) { count++; }); + counter_in[out] = count; + it = counter_in.find(out); + } + it->second--; + if (it->second == 0) { queue.push(out); } + }); + } + return Maybe::Ok(); +} + +template +Maybe Graph::TopoForEachNodeWithErrorCaptured( const std::function&)>& ForEachInNode, const std::function&)>& ForEachOutNode, const std::function(NodeType*)>& Handler) const { @@ -595,7 +672,7 @@ void Graph::DfsTopoForEachNodeSortByDistanceToSink( HashMap node2distance_to_sink; { std::list nodes; - TopoForEachNode(starts, ForEachInNode, ForEachOutNode, + TopoForEachNode(ForEachInNode, ForEachOutNode, [&](NodeType* node) { nodes.emplace_back(node); }); std::list sinks; for (NodeType* node : nodes) { @@ -603,7 +680,7 @@ void Graph::DfsTopoForEachNodeSortByDistanceToSink( ForEachOutNode(node, [&](NodeType* out_node) { is_sink = false; }); if (is_sink) { sinks.emplace_back(node); } } - TopoForEachNode(sinks, ForEachOutNode, ForEachInNode, [&](NodeType* node) { + TopoForEachNode(ForEachOutNode, ForEachInNode, [&](NodeType* node) { int64_t distance_to_sink = -1; ForEachOutNode(node, [&](NodeType* out_node) { distance_to_sink = std::max(distance_to_sink, node2distance_to_sink[out_node]); @@ -698,12 +775,12 @@ Graph::MakePredicatorIsReachable( std::shared_ptr id2ancestor(new Id2Ancestor(node_num())); int64_t id = 0; node2id->reserve(node_num()); - TopoForEachNode(starts, ForEachInNode, ForEachOutNode, [&](NodeType* node) { + TopoForEachNode(ForEachInNode, ForEachOutNode, [&](NodeType* node) { node2id->emplace(node, id); id2ancestor->at(id).Resize(node_num()); id += 1; }); - TopoForEachNode(starts, ForEachInNode, ForEachOutNode, [&](NodeType* node) { + TopoForEachNode(ForEachInNode, ForEachOutNode, [&](NodeType* node) { const int64_t node_id = node2id->at(node); auto& ancestor_bitset_vec = id2ancestor->at(node_id); ForEachInNode(node, [&](NodeType* in_node) { diff --git a/oneflow/core/graph/op_graph.cpp b/oneflow/core/graph/op_graph.cpp index 09df4962fcb..82e8b20088d 100644 --- a/oneflow/core/graph/op_graph.cpp +++ b/oneflow/core/graph/op_graph.cpp @@ -466,8 +466,7 @@ void OpGraph::TopoForEachNodeWithCtrlEdge(const std::function& No const std::function& Handler) { ForEachDataAndCtrlOutNode(node, Handler); }; - TopoForEachNode(DataOrCtrlSourceNodes(), OpGraphForEachInDataAndCtrlNode, - OpGraphForEachOutDataAndCtrlNode, NodeHandler); + TopoForEachNode(OpGraphForEachInDataAndCtrlNode, OpGraphForEachOutDataAndCtrlNode, NodeHandler); } std::function diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp index 404fcb7dd11..ca5cc163fe6 100644 --- a/oneflow/core/graph/straighten_nodes.cpp +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -233,7 +233,7 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task task_type2machine_id2node_id2topo_structs; std::map min_node_id2topo_struct; int32_t previous_min_layer = 0; - task_graph->TopoForEachNodeFast([&](TaskNode* node) { + task_graph->TopoForEachNode([&](TaskNode* node) { auto& topo_struct = task_node2topo_struct[node]; topo_struct.node = node; if (node->in_edges().empty()) { From 19f0e81a2d54adcd8df5a89223c147e7df5205e3 Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Fri, 17 Jun 2022 16:50:45 +0800 Subject: [PATCH 18/22] Rename the switch and code clean up --- docs/source/graph.rst | 2 +- oneflow/core/graph/straighten_nodes.cpp | 15 +-------------- oneflow/core/graph/task_graph.cpp | 4 ++-- oneflow/core/graph/task_graph.h | 2 +- oneflow/core/job/compiler.cpp | 2 +- oneflow/core/job/job_conf.proto | 2 +- python/oneflow/nn/graph/graph_config.py | 6 +++--- 7 files changed, 10 insertions(+), 23 deletions(-) diff --git a/docs/source/graph.rst b/docs/source/graph.rst index 5abafc5e9b2..5b4abb12a81 100644 --- a/docs/source/graph.rst +++ b/docs/source/graph.rst @@ -27,7 +27,7 @@ Base class for running neural networks in Static Graph Mode. set_zero_redundancy_optimizer_mode, set_zero_redundancy_optimizer_min_size_after_split, enable_cudnn_conv_heuristic_search_algo, - enable_random_straighten_nodes, + disable_straighten_algorithm, :member-order: bysource diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp index ca5cc163fe6..50a845220eb 100644 --- a/oneflow/core/graph/straighten_nodes.cpp +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -24,17 +24,6 @@ namespace oneflow { namespace { -// The same order in the set -// It is only run in the following situation because there are too many implicit conditions. -// bool ShouldRunSimultaneously(TopoStruct* a, TopoStruct* b) { -// // Normal node would have the same name -// if (a->node->GetTaskType() == 1) { return a->node->VisualStr() == b->node->VisualStr(); } -// // Otherwise they must have the same parameters with different machine ids and the closest node -// // id. We only use Min Layer here, since Tributary Layer might be different due to asymmetry of -// // graph. -// return true; -// }; - // move the head from source to target void MoveFrontBetweenMaps(std::map& source, std::map& target) { @@ -265,9 +254,7 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task // Find out all the same nodes // Stop using Visual string before we find a better key // Currently we can use the topological structure and node id to decide the same nodes - // if (ShouldRunSimultaneously(topo_struct_min_node_id, curr_topo_struct)) { same_nodes.push_back(curr_topo_struct); - // } } // Cyclize them for (int32_t i = 1; i < same_nodes.size(); i++) { @@ -297,7 +284,7 @@ void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task // Generate other parameters in the topological data structure FindMainstem(&task_node2topo_struct); - LOG(INFO) << "Straightening order: " << 5 << ", " << 3; + VLOG(3) << "Straightening order: " << 5 << ", " << 3; // Order in the waiting sets // Decide which node should run first diff --git a/oneflow/core/graph/task_graph.cpp b/oneflow/core/graph/task_graph.cpp index 817add04e28..404b93a455a 100644 --- a/oneflow/core/graph/task_graph.cpp +++ b/oneflow/core/graph/task_graph.cpp @@ -420,7 +420,7 @@ void ForEachOpGraphNecessaryCtrlEdge( } // namespace -TaskGraph::TaskGraph(bool random_straighten_nodes) { +TaskGraph::TaskGraph(bool disable_straighten_algorithm) { OpGraph* op_graph = Global::Get(); sub_tsk_gph_builder_ctx_.reset(new SubTskGphBuilderCtx(this)); boxing_logger_ = CreateBoxingLogger(); @@ -451,7 +451,7 @@ TaskGraph::TaskGraph(bool random_straighten_nodes) { } }); - if (random_straighten_nodes) { + if (disable_straighten_algorithm) { SetOrderInGraphForEachNode(); } else { StraightenNodes(this, &ordered_task_nodes_); diff --git a/oneflow/core/graph/task_graph.h b/oneflow/core/graph/task_graph.h index aaf5c1e9f54..2ec3e15f18e 100644 --- a/oneflow/core/graph/task_graph.h +++ b/oneflow/core/graph/task_graph.h @@ -43,7 +43,7 @@ class TaskGraph final : public Graph { OF_DISALLOW_COPY_AND_MOVE(TaskGraph); ~TaskGraph() override; - explicit TaskGraph(bool random_straighten_nodes); + explicit TaskGraph(bool disable_straighten_algorithm); const char* TypeName() const override { return "TaskGraph"; } void RemoveEmptyRegsts(); diff --git a/oneflow/core/job/compiler.cpp b/oneflow/core/job/compiler.cpp index 306bd3cbac5..a2d47a1d38a 100644 --- a/oneflow/core/job/compiler.cpp +++ b/oneflow/core/job/compiler.cpp @@ -62,7 +62,7 @@ void Compiler::Compile(Job* job, Plan* plan, bool need_job_complete) const { // Step3: build task_gph. // TODO(levi): we can rewrite this part of code in visitor pattern. auto task_gph = - std::make_unique(job->job_conf().random_straighten_nodes_in_task_graph()); + std::make_unique(job->job_conf().disable_straighten_algorithm_in_task_graph()); using std::placeholders::_1; task_gph->ForEachNode(std::bind(&TaskNode::ProduceAllRegstsAndBindEdges, _1)); task_gph->ForEachNode(std::bind(&TaskNode::ConsumeAllRegsts, _1)); diff --git a/oneflow/core/job/job_conf.proto b/oneflow/core/job/job_conf.proto index 42cae2d4682..79f98fecb0b 100644 --- a/oneflow/core/job/job_conf.proto +++ b/oneflow/core/job/job_conf.proto @@ -240,7 +240,7 @@ message JobConfigProto { optional bool enable_auto_mixed_precision = 602 [default = false]; optional bool enable_quantization_aware_training = 603 [default = false]; - optional bool random_straighten_nodes_in_task_graph = 700 [default = false]; + optional bool disable_straighten_algorithm_in_task_graph = 700 [default = false]; optional int64 concurrency_width = 1000 [default = 128]; diff --git a/python/oneflow/nn/graph/graph_config.py b/python/oneflow/nn/graph/graph_config.py index 854502df70e..c38d3e3ba5f 100644 --- a/python/oneflow/nn/graph/graph_config.py +++ b/python/oneflow/nn/graph/graph_config.py @@ -269,15 +269,15 @@ def build(self, x): """ self.proto.cudnn_conv_heuristic_search_algo = mode - def enable_random_straighten_nodes(self, mode: bool = False): - r""" Whether turn off the straighten algorithm. + def disable_straighten_algorithm(self, mode: bool = False): + r""" Whether we disable the straighten algorithm. If using nccl compute stream, turning it on might not speed up the training. If not using nccl compute stream, turning it on might slow down data parallelism by 0.6% and slow down model parallelism by 6%. The switch is off by default (i.e. use the straighten algorithm by default). """ - self.proto.random_straighten_nodes_in_task_graph = mode + self.proto.disable_straighten_algorithm_in_task_graph = mode def _generate_optimizer_and_variable_configs( self, opt_dict: OptDict = None, variables_conf: OrderedDict = None, From 513828c5b580a791dfa99a533f1bfc9b56f2c732 Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Fri, 17 Jun 2022 17:23:20 +0800 Subject: [PATCH 19/22] Hide the class TopoStruct --- oneflow/core/graph/straighten_nodes.cpp | 41 +++++++++++++++++++ oneflow/core/graph/straighten_nodes.h | 52 ------------------------- 2 files changed, 41 insertions(+), 52 deletions(-) diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp index 50a845220eb..be5911bfd4a 100644 --- a/oneflow/core/graph/straighten_nodes.cpp +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -22,6 +22,47 @@ limitations under the License. namespace oneflow { +enum TaskClassifier : int { + kWaitingTransfer = 0, + kWaitingComputation = 1, + kRunASAP = 2, + kRunALAP = 3 +}; + +class TopoStruct { + public: + TaskNode* node; + int32_t min_layer = -1; + int32_t tributary_layer = -1; + bool on_mainstem = false; + int32_t counter = 0; + int32_t min_distance2transfer = -1; + TopoStruct* next_same_node = nullptr; + // We can have some other nodes in it for example + // SbpNode* node; + // SbpEdge* node; + // Or we can omit all the pointers and leave all the useful parameters. + + // Drop down the tributary layer + void DropTributaryLayer(int32_t upper_bound); + + void SpreadTributaryLayer(HashMap* task_node2topo_struct); + + void SpreadMainstem(HashMap* task_node2topo_struct); + + // The minimum computation distance from the beginning of this op to the next transfer + int32_t GetMinDistance2Transfer(HashMap* task_node2topo_struct); + + // deciding parameter + // i = 0: those with small tributary layers go first + // i = 1: those with small minimum distance to transfer go first + // i = 2: first in first out + // i = 3: those with large tributary layers go first + // i = 4: those with long distance to transfer go first + // i = 5: last in first out + int32_t GetDecidingParameter(int32_t i) const; +}; + namespace { // move the head from source to target diff --git a/oneflow/core/graph/straighten_nodes.h b/oneflow/core/graph/straighten_nodes.h index 42c6471c6fb..e68a03c698c 100644 --- a/oneflow/core/graph/straighten_nodes.h +++ b/oneflow/core/graph/straighten_nodes.h @@ -17,61 +17,9 @@ limitations under the License. #define ONEFLOW_CORE_GRAPH_STRAIGHTEN_NODES_H_ #include "oneflow/core/graph/task_graph.h" -#include "oneflow/core/job/task.pb.h" namespace oneflow { -class OpGraph; -class Job; - -enum TaskClassifier : int { - kWaitingTransfer = 0, - kWaitingComputation = 1, - kRunASAP = 2, - kRunALAP = 3 -}; - -bool IsTransferNode(TaskType task_type); - -// Classifier for the set according to the task type -TaskClassifier GetTaskClassifier(const TaskNode* node); - -class TopoStruct { - public: - TaskNode* node; - int32_t min_layer = -1; - int32_t tributary_layer = -1; - bool on_mainstem = false; - int32_t counter = 0; - int32_t min_distance2transfer = -1; - TopoStruct* next_same_node = nullptr; - // We can have some other nodes in it for example - // SbpNode* node; - // SbpEdge* node; - // Or we can omit all the pointers and leave all the useful parameters. - - // Drop down the tributary layer - void DropTributaryLayer(int32_t upper_bound); - - void SpreadTributaryLayer(HashMap* task_node2topo_struct); - - void SpreadMainstem(HashMap* task_node2topo_struct); - - // The minimum computation distance from the beginning of this op to the next transfer - int32_t GetMinDistance2Transfer(HashMap* task_node2topo_struct); - - // deciding parameter - // i = 0: those with small tributary layers go first - // i = 1: those with small minimum distance to transfer go first - // i = 2: first in first out - // i = 3: those with large tributary layers go first - // i = 4: those with long distance to transfer go first - // i = 5: last in first out - int32_t GetDecidingParameter(int32_t i) const; -}; - -void FindMainstem(HashMap* task_node2topo_struct); - void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task_nodes); } // namespace oneflow From e1bd526420bf715d54270665af064a92a67e98b5 Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Fri, 17 Jun 2022 17:41:53 +0800 Subject: [PATCH 20/22] Hide all the other functions --- oneflow/core/graph/straighten_nodes.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp index be5911bfd4a..8a54e2b9dd8 100644 --- a/oneflow/core/graph/straighten_nodes.cpp +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -22,6 +22,8 @@ limitations under the License. namespace oneflow { +namespace { + enum TaskClassifier : int { kWaitingTransfer = 0, kWaitingComputation = 1, @@ -63,8 +65,6 @@ class TopoStruct { int32_t GetDecidingParameter(int32_t i) const; }; -namespace { - // move the head from source to target void MoveFrontBetweenMaps(std::map& source, std::map& target) { @@ -98,8 +98,6 @@ bool ShouldRunASAP(TaskType task_type) { } } -} // anonymous namespace - bool IsTransferNode(TaskType task_type) { // return task_type == 12 || task_type == 13 || (48 <= task_type && task_type <= 64); // They are sorted according to frequency of occurrences @@ -252,6 +250,8 @@ void FindMainstem(HashMap* task_node2topo_struct) { } } +} // anonymous namespace + void StraightenNodes(TaskGraph* task_graph, std::vector* ordered_task_nodes) { // The function for settle the order in the graph int64_t order_in_graph = 0; From de4c5aebbbd3d847b7d3f4c6490f81590f9e3ab6 Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Fri, 17 Jun 2022 18:04:30 +0800 Subject: [PATCH 21/22] Grammar --- oneflow/core/graph/straighten_nodes.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp index 8a54e2b9dd8..3c44030829a 100644 --- a/oneflow/core/graph/straighten_nodes.cpp +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -151,7 +151,6 @@ void TopoStruct::SpreadTributaryLayer(HashMap* task_node2 } else { // On a tributary, the operator could be run later. producer_max_lay = tributary_layer; - // producer_max_lay = tributary_layer - 1; } node->ForEachNodeOnInEdge([&](TaskNode* in) { auto& topo_struct_in = task_node2topo_struct->at(in); @@ -159,7 +158,7 @@ void TopoStruct::SpreadTributaryLayer(HashMap* task_node2 --topo_struct_in.counter; if (topo_struct_in.counter == 0) { topo_struct_in.SpreadTributaryLayer(task_node2topo_struct); } }); - // Reduce counter to -1 to avoid visitting again + // Reduce counter to -1 to avoid visiting again counter--; } @@ -221,7 +220,7 @@ int32_t TopoStruct::GetDecidingParameter(int32_t i) const { return 0; } -// Find the mianstem of the task graph, then reduce the wait time for tributaries +// Find the mainstem of the task graph, then reduce the wait time for tributaries void FindMainstem(HashMap* task_node2topo_struct) { // Find the maximum layer number int32_t max_min_layer = -1; From 5232350b632becd7eeaef290ebc87a15bc6fc3e5 Mon Sep 17 00:00:00 2001 From: Yipeng Li Date: Fri, 17 Jun 2022 19:28:32 +0800 Subject: [PATCH 22/22] Of format --- oneflow/core/graph/straighten_nodes.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oneflow/core/graph/straighten_nodes.cpp b/oneflow/core/graph/straighten_nodes.cpp index 3c44030829a..1e708e19df0 100644 --- a/oneflow/core/graph/straighten_nodes.cpp +++ b/oneflow/core/graph/straighten_nodes.cpp @@ -33,7 +33,7 @@ enum TaskClassifier : int { class TopoStruct { public: - TaskNode* node; + TaskNode* node = nullptr; int32_t min_layer = -1; int32_t tributary_layer = -1; bool on_mainstem = false;