Skip to content

Commit

Permalink
refine_error_info_with_diff_device_input_and_fix_typo (#8054)
Browse files Browse the repository at this point in the history
* refine_error_info_with_diff_device_input_and_fix_typo

* add positional info

Co-authored-by: Houjiang Chen <chenhoujiangcug@gmail.com>
  • Loading branch information
clackhan and hjchen2 authored Apr 29, 2022
1 parent da5cdd1 commit 882b38e
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 26 deletions.
2 changes: 1 addition & 1 deletion oneflow/api/python/symbol/placement_symbol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ struct PlacementSymbolExportUtil {
if (type == "cuda") {
const int64_t gpu_device_num = GetGpuDeviceNum();
CHECK_NE_OR_RETURN(gpu_device_num, 0)
<< "Can\'t construct placment with \"cuda\" type because there is no CUDA device!";
<< "Can\'t construct placement with \"cuda\" type because there is no CUDA device!";
device_num = std::min(device_num, gpu_device_num);
}
std::vector<std::string> machine_device_ids;
Expand Down
16 changes: 14 additions & 2 deletions oneflow/core/framework/consistent_tensor_infer_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
#include "oneflow/core/framework/tensor.h"
#include "oneflow/core/framework/op_expr.h"
#include "oneflow/core/framework/user_op_registry_manager.h"
#include "oneflow/core/common/container_util.h"

namespace oneflow {
namespace one {
Expand Down Expand Up @@ -164,8 +165,19 @@ Maybe<void> CheckInputParallelDescIdentical(const ConsistentTensorMetaInferArgs&
if (infer_args.input_consistent_tensor_metas().empty()) { return Maybe<void>::Ok(); }
const auto& first_parallel_desc =
infer_args.input_consistent_tensor_metas().begin()->tensor_meta()->parallel_desc();
for (const auto& input_meta : infer_args.input_consistent_tensor_metas()) {
CHECK_OR_RETURN(first_parallel_desc == input_meta.tensor_meta()->parallel_desc());
for (int i = 0; i < infer_args.input_consistent_tensor_metas().size(); ++i) {
CHECK_OR_RETURN(first_parallel_desc
== JUST(VectorAt(infer_args.input_consistent_tensor_metas(), i))
.tensor_meta()
->parallel_desc())
<< Error::RuntimeError()
<< "Expected all tensors to be on the same placement, but found "
"at least two placements, "
<< *JUST(PlacementToString(first_parallel_desc)) << " (positional 0) and "
<< *JUST(PlacementToString(JUST(VectorAt(infer_args.input_consistent_tensor_metas(), i))
.tensor_meta()
->parallel_desc()))
<< " (positional " << i << ")!";
}
return Maybe<void>::Ok();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ Maybe<void> NaiveInterpret(const UserOpExpr& user_op_expr, const TensorTuple& in
for (int i = 0; i < inputs.size(); i++) {
const auto& input_device = JUST(inputs.at(i)->device());
if (i > 0) {
CHECK_OR_RETURN(*default_device == *input_device) << Error::InputDeviceNotMatchError();
CHECK_OR_RETURN(*default_device == *input_device)
<< Error::RuntimeError()
<< "Expected all tensors to be on the same device, but found at least two devices, "
<< default_device->ToString() << " (positional 0) and " << input_device->ToString()
<< " (positional " << i << ")!";
}
input_eager_blob_objects->at(i) = JUST(inputs.at(i)->eager_blob_object());
}
Expand Down
33 changes: 22 additions & 11 deletions oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -873,18 +873,29 @@ Maybe<void> LazyInterpreter::ApplyImpl(const UserOpExpr& op_expr, const TensorTu

for (int i = 0; i < inputs.size(); ++i) {
const auto& input_tensor = inputs.at(i);
CHECK_OR_RETURN(device_tag == JUST(GetDeviceTagOfTensor(input_tensor)))
<< " Lazy nn.Graph name : " << graph_name << " encountered ERROR where multi-input tensor"
<< " has different device types in module/op_name: " << new_op_name
<< ". Please use tensor.to() or tensor.to_global() to synchronize all the input with the "
"same device.";
// TODO: Print out all the placement
CHECK_OR_RETURN(parallel_desc->Equals(*JUST(GetParallelDescOfTensor(input_tensor))))
<< " Lazy nn.Graph name : " << graph_name << " encountered ERROR where multi-input tensor"
<< " has different placements in module/op_name: " << new_op_name
<< ". Please use tensor.to() or tensor.to_global() to synchronize all the input with the "
"same placement.";
CHECK_EQ_OR_RETURN(is_local, input_tensor->is_local());
if (is_local) {
CHECK_OR_RETURN(device_tag == JUST(GetDeviceTagOfTensor(input_tensor)))
<< Error::RuntimeError() << "Lazy nn.Graph name: " << graph_name
<< " encountered ERROR in module/op_name: " << new_op_name
<< ". Expected all tensors to be on the same device, but found at least two devices, "
<< JUST(JUST(VectorAt(inputs, 0))->device())->ToString() << " (positional 0) and "
<< JUST(JUST(VectorAt(inputs, i))->device())->ToString() << " (positional " << i
<< ")! Please use tensor.to() to synchronize all the input with the same device.";
} else {
// TODO: Print out all the placement
CHECK_OR_RETURN(parallel_desc->Equals(*JUST(GetParallelDescOfTensor(input_tensor))))
<< Error::RuntimeError() << "Lazy nn.Graph name: " << graph_name
<< " encountered ERROR in module/op_name: " << new_op_name
<< ". Expected all tensors to be on the same placement, but found at least two "
"placements, "
<< *JUST(PlacementToString(JUST(JUST(VectorAt(inputs, 0))->parallel_desc())))
<< " (positional 0) and "
<< *JUST(PlacementToString(JUST(JUST(VectorAt(inputs, i))->parallel_desc())))
<< " (positional " << i
<< ")! Please use tensor.to_global() to synchronize all the input with the same "
"placement.";
}
const std::string& ibn = op_expr.indexed_ibns().at(i);
std::string lbn = TensorNameScope::Global()->Lookup(input_tensor);
if (lbn.empty()) {
Expand Down
2 changes: 1 addition & 1 deletion oneflow/core/framework/sbp_infer_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ double ComputeSbpInferPriority(const NdSbp& producer_sbp_parallel,
// consumer
return 0.0;
} else {
// Penality: this blob have different placments and sbps but it does not support boxing
// Penality: this blob have different placements and sbps but it does not support boxing
return 2.0;
}
} else {
Expand Down
11 changes: 7 additions & 4 deletions oneflow/core/job/parallel_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,23 +321,26 @@ Maybe<void> ParallelDesc::CheckDeviceIdsIsValid() const {
machine_id2sorted_dev_phy_ids_->find(GlobalProcessCtx::Rank());
for (int64_t machine_id : sorted_machine_ids_) {
CHECK_LT_OR_RETURN(machine_id, GlobalProcessCtx::WorldSize())
<< "Placment is invalid because rank must be less than world size!";
<< Error::RuntimeError()
<< "Placement is invalid because rank must be less than world size!";
}
if (sorted_dev_phy_ids_iter != machine_id2sorted_dev_phy_ids_->end()) {
for (int64_t dev_phy_id : *sorted_dev_phy_ids_iter->second) {
if (device_type_ == DeviceType::kCUDA) {
const int64_t gpu_device_num = GetGpuDeviceNum();
CHECK_NE_OR_RETURN(gpu_device_num, 0)
<< "Placment with \"cuda\" type is invalid because there is no CUDA device!";
<< Error::RuntimeError()
<< "Placement with \"cuda\" type is invalid because there is no CUDA device!";
int64_t device_num = std::min(GlobalProcessCtx::NumOfProcessPerNode(), gpu_device_num);
CHECK_LT_OR_RETURN(dev_phy_id, device_num)
<< "Placment is invalid because device id must be less than "
<< Error::RuntimeError() << "Placement is invalid because device id must be less than "
<< (gpu_device_num < GlobalProcessCtx::NumOfProcessPerNode()
? "num of CUDA devices on node"
: "num of process per node");
} else if (device_type_ == DeviceType::kCPU) {
CHECK_LT_OR_RETURN(dev_phy_id, GlobalProcessCtx::NumOfProcessPerNode())
<< "Placment is invalid because device id must be less than num of process per node";
<< Error::RuntimeError()
<< "Placement is invalid because device id must be less than num of process per node";
} else {
OF_UNIMPLEMENTED();
}
Expand Down
13 changes: 7 additions & 6 deletions oneflow/core/job_rewriter/pipeline_buffer_pass.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,13 @@ Maybe<void> PipelineBufferPass::Apply(const OpGraph& op_graph, JobBuilder* job_b
continue; /* last stage(loss) does NOT need to insert buffer */
}
if (src_stage_id != dst_stage_id) {
LOG(WARNING) << " Cross diff stage link From: [" << src_node->op().op_conf().DebugString()
<< "](stage_id:" << std::to_string(src_stage_id) << ") -> ["
<< this_node->op().op_conf().DebugString()
<< "](stage_id:" << std::to_string(dst_stage_id)
<< "). Make sure to change the tensor's placment before it enter the module "
"of a next pipeline stage.\n";
LOG(WARNING)
<< " Cross diff stage link From: [" << src_node->op().op_conf().DebugString()
<< "](stage_id:" << std::to_string(src_stage_id) << ") -> ["
<< this_node->op().op_conf().DebugString()
<< "](stage_id:" << std::to_string(dst_stage_id)
<< "). Make sure to change the tensor's placement before it enter the module "
"of a next pipeline stage.\n";
}
const int64_t buffer_size = total_stage_num * 2; /* NOTE(chengcheng): max buffer size */
TryInsertOrUseBufferOpToDstNode(in_edge, buffer_size, &buffer_op_name2op_conf,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import os
import unittest

import oneflow as flow
import oneflow.unittest


@unittest.skipIf(os.getenv("ONEFLOW_TEST_CPU_ONLY"), "only test cpu cases")
class TestModule(flow.unittest.TestCase):
    """Checks that ops with mismatched input devices/placements raise a
    torch-style RuntimeError describing both conflicting locations."""

    @flow.unittest.skip_unless_1n1d()
    def test_multi_input_with_diff_device(test_case):
        # Reference torch exception and message:
        #
        # RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cuda:0 and cpu!
        #
        cpu_input = flow.tensor([1, 2, 3, 4])
        cuda_input = flow.tensor([2, 4, 6, 8], device="cuda")
        # Adding a CPU tensor to a CUDA tensor must fail with a RuntimeError.
        with test_case.assertRaises(RuntimeError) as exc_ctx:
            flow.add(cpu_input, cuda_input)
        test_case.assertIn(
            "Expected all tensors to be on the same device, but found at least two devices",
            str(exc_ctx.exception),
        )

    @flow.unittest.skip_unless_1n2d()
    def test_multi_input_with_diff_placement(test_case):
        # Two global tensors placed on different ranks; mixing them in one op
        # must fail with a RuntimeError naming both placements.
        rank0_input = flow.tensor(
            [1, 2, 3, 4], placement=flow.placement("cuda", [0]), sbp=flow.sbp.broadcast
        )
        rank1_input = flow.tensor(
            [2, 4, 6, 8], placement=flow.placement("cuda", [1]), sbp=flow.sbp.broadcast
        )
        with test_case.assertRaises(RuntimeError) as exc_ctx:
            flow.add(rank0_input, rank1_input)
        test_case.assertIn(
            "Expected all tensors to be on the same placement, but found at least two placements",
            str(exc_ctx.exception),
        )


# Allow running this test file directly via the standard unittest runner.
if __name__ == "__main__":
    unittest.main()

0 comments on commit 882b38e

Please sign in to comment.