Dev support tensor pin memory (#8073)
* empty support pin_memory

* refine

* tensor.pin_memory()

* refine

* add docs
Flowingsun007 authored May 9, 2022
1 parent 85b8306 commit b826253
Showing 32 changed files with 262 additions and 51 deletions.
2 changes: 2 additions & 0 deletions docs/source/tensor.rst
@@ -190,3 +190,5 @@ OneFlow Tensor Class
where,
zero_,
nms,
pin_memory,

4 changes: 2 additions & 2 deletions oneflow/api/cpp/framework/graph.cpp
@@ -323,7 +323,7 @@ of::Maybe<void> Graph::GraphImpl::BuildGraph() {
variable_op_name_to_tensor_[op_conf.name()] = JUST(of::one::functional::Empty(
of::Shape(variable_conf.shape()),
JUST(of::DType::Get(static_cast<of::DataType>(variable_conf.data_type()))),
*device_.device_));
*device_.device_, /*pin_memory=*/false));
}
return of::Maybe<void>::Ok();
});
@@ -346,7 +346,7 @@ of::Maybe<void> Graph::GraphImpl::BuildGraph() {
output_name_to_tensor_[op_conf.name()] = JUST(of::one::functional::Empty(
of::Shape(blob_conf.shape()),
JUST(of::DType::Get(static_cast<of::DataType>(blob_conf.data_type()))),
*device_.device_));
*device_.device_, /*pin_memory=*/false));
}
return of::Maybe<void>::Ok();
});
2 changes: 1 addition & 1 deletion oneflow/api/cpp/framework/tensor.cpp
@@ -36,7 +36,7 @@ Tensor::Tensor(const Shape& shape, const Device& device, const DType& dtype) {
of::LazyMode::Guard lazy_mode_disabled_guard(/*is_enabled*/ false);
tensor_ = functional::Empty(*shape.shape_,
of::DType::Get(static_cast<of::DataType>(dtype)).GetOrThrow(),
*device.device_)
*device.device_, /*pin_memory=*/false)
.GetPtrOrThrow();
}
Tensor::Tensor(const std::shared_ptr<oneflow::one::Tensor>& tensor) : tensor_(tensor) {}
7 changes: 7 additions & 0 deletions oneflow/api/python/framework/tensor.cpp
@@ -175,6 +175,12 @@ static PyObject* PyTensorObject_contiguous(PyObject* self, PyObject* unused) {
END_HANDLE_ERRORS
}

static PyObject* PyTensorObject_pin_memory(PyObject* self, PyObject* unused) {
HANDLE_ERRORS
return PyTensor_New(PyTensor_Unpack(self)->pin_memory());
END_HANDLE_ERRORS
}

static PyObject* PyTensorObject_requires_grad_(PyObject* self, PyObject* args, PyObject* kwargs) {
HANDLE_ERRORS
int requires_grad = 1;
@@ -319,6 +325,7 @@ static PyMethodDef PyTensorObject_methods[] = {
{"stride", PyTensorObject_stride, METH_NOARGS, NULL},
{"is_contiguous", PyTensorObject_is_contiguous, METH_NOARGS, NULL},
{"contiguous", PyTensorObject_contiguous, METH_NOARGS, NULL},
{"pin_memory", PyTensorObject_pin_memory, METH_NOARGS, NULL},
{"requires_grad_", (PyCFunction)PyTensorObject_requires_grad_, METH_VARARGS | METH_KEYWORDS,
NULL},
{"retain_grad", PyTensorObject_retain_grad, METH_NOARGS, NULL},
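For readers less familiar with the CPython C API: the `pin_memory` entry registered above uses the `METH_NOARGS` calling convention, so the bound C function receives the receiver plus an unused second argument, and the `HANDLE_ERRORS`/`END_HANDLE_ERRORS` macros translate C++ errors into Python exceptions. A minimal, OneFlow-independent sketch of the same pattern (the module and method names here are invented for illustration):

```cpp
// sketch.cpp -- minimal CPython extension showing the METH_NOARGS pattern.
// Build sketch (assumption): c++ -shared -fPIC $(python3-config --includes) sketch.cpp -o sketch.so
#include <Python.h>

// METH_NOARGS: `self` is the receiver; the second argument is always NULL.
static PyObject* Sketch_pin_memory(PyObject* self, PyObject* /*unused*/) {
  // A real binding would unpack `self`, call into the framework, and wrap
  // the resulting tensor; returning None keeps this sketch self-contained.
  Py_RETURN_NONE;
}

static PyMethodDef SketchMethods[] = {
    {"pin_memory", Sketch_pin_memory, METH_NOARGS, "no-args method sketch"},
    {NULL, NULL, 0, NULL},  // sentinel entry terminates the table
};

static struct PyModuleDef sketch_module = {
    PyModuleDef_HEAD_INIT, "sketch", NULL, -1, SketchMethods,
};

PyMODINIT_FUNC PyInit_sketch(void) { return PyModule_Create(&sketch_module); }
```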
3 changes: 2 additions & 1 deletion oneflow/api/python/functional/indexing.cpp
@@ -168,7 +168,8 @@ Maybe<Tensor> ConvertToIndexingTensor(PyObject* object) {
if (PyArray_Check(object)) { return TensorWithData(object, NullOpt, device, false); }

const auto& sizes = InferArraySizes(object);
const auto& tensor = JUST(functional::Empty(sizes, CHECK_JUST(DType::Get(dtype)), device));
const auto& tensor =
JUST(functional::Empty(sizes, CHECK_JUST(DType::Get(dtype)), device, /*pin_memory=*/false));
// Prevent the python object release until the callback is complete.
Py_INCREF(object);
auto handle = std::shared_ptr<PyObject>(PyObjectPtr(object));
7 changes: 4 additions & 3 deletions oneflow/api/python/functional/tensor_api.cpp
@@ -185,7 +185,7 @@ class TensorWithShapeCtorFunctor {
} else {
device_ = JUST(Device::New("cpu"));
}
return functional::Empty(shape, DType::Float(), device_);
return functional::Empty(shape, DType::Float(), device_, /*pin_memory=*/false);
}
};

@@ -207,6 +207,7 @@ class AssignLocalTensorFunctor {
}
Maybe<void> operator()(const std::shared_ptr<one::Tensor>& ref,
const std::shared_ptr<one::Tensor>& value) const {
// JUST(CheckInplaceValid(ref));  // disabled to align this check with torch
CHECK_OR_RETURN(ref->is_local() && value->is_local())
<< "Both ref and value must be local tensor.";
JUST(OpInterpUtil::Dispatch<TensorTuple>(*op_, {ref, value}));
@@ -277,7 +278,7 @@ class LocalTensorSharedNumpyDataFunctor {
/*ls_leaf=*/true);

// Init blob
JUST(tensor_impl->InitEagerBlobObject(NewLocalDepObject()));
JUST(tensor_impl->InitEagerBlobObject(NewLocalDepObject(), /*pin_memory=*/false));
const auto& stream = GetDefaultStreamByDevice(device);
JUST(tensor_impl->eager_blob_object())->set_last_used_stream(stream);
std::shared_ptr<Tensor> out(new MirroredTensor(tensor_impl));
@@ -297,7 +298,7 @@ ONEFLOW_FUNCTION_LIBRARY(m) {
m.add_functor<impl::ConsistentTensorWithDataCtorFunctor>("ConsistentTensorWithDataCtor");
m.add_functor<impl::TensorWithShapeCtorFunctor>("TensorWithShapeCtor");
m.add_functor<impl::ConsistentTensorWithShapeCtorFunctor>("ConsistentTensorWithShapeCtor");
m.add_functor<impl::AssignLocalTensorFunctor>("AssignLocalTensorFunctor");
m.add_functor<impl::AssignLocalTensorFunctor>("AssignLocalTensor");
m.add_functor<impl::LocalTensorSharedNumpyDataFunctor>("LocalTensorSharedNumpyData");
}

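The rename above keeps the registered functor name in step with the `=> AssignLocalTensor` target in `tensor_api.yaml` (the next file): the functional library resolves YAML signatures against the string passed to `add_functor`, so the two must match exactly. A toy, framework-free registry sketching that lookup (all names here are illustrative):

```cpp
#include <functional>
#include <iostream>
#include <map>
#include <string>

// Toy string-keyed functor registry: YAML-declared signatures resolve against
// the names passed to AddFunctor, so a mismatched string means "not found".
std::map<std::string, std::function<void()>>& Registry() {
  static std::map<std::string, std::function<void()>> registry;
  return registry;
}

void AddFunctor(const std::string& name, std::function<void()> fn) {
  Registry()[name] = std::move(fn);
}

int main() {
  AddFunctor("AssignLocalTensor", [] { std::cout << "assign dispatched\n"; });
  // The lookup key comes from the YAML signature; after the rename it matches.
  auto it = Registry().find("AssignLocalTensor");
  if (it != Registry().end()) { it->second(); }
  return 0;
}
```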
2 changes: 1 addition & 1 deletion oneflow/api/python/functional/tensor_api.yaml
@@ -35,7 +35,7 @@
bind_python: True

- name: "assign_local_tensor"
signature: "Void (Tensor ref, Tensor value) => AssignLocalTensorFunctor"
signature: "Void (Tensor ref, Tensor value) => AssignLocalTensor"
bind_python: True

- name: "from_numpy"
4 changes: 2 additions & 2 deletions oneflow/api/python/utils/tensor_utils.cpp
@@ -164,7 +164,7 @@ Maybe<Tensor> MakeLocalTensorFromData(PyObject* data, const Optional<Symbol<DTyp
device_ = JUST(Device::New("cpu"));
}
std::shared_ptr<Tensor> tensor =
JUST(functional::Empty(shape, JUST(DType::Get(data_type)), device_));
JUST(functional::Empty(shape, JUST(DType::Get(data_type)), device_, /*pin_memory=*/false));
JUST(SwitchCopyMirroredTensorFromUntypedArray(SwitchCase(data_type), tensor, array));

Py_DECREF(array);
@@ -229,7 +229,7 @@ Maybe<Tensor> MakeConsistentTensorFromData(PyObject* data, const Optional<Symbol
device = JUST(Device::New("cuda"));
}
std::shared_ptr<Tensor> local_tensor =
JUST(functional::Empty(shape, JUST(DType::Get(data_type)), device));
JUST(functional::Empty(shape, JUST(DType::Get(data_type)), device, /*pin_memory=*/false));
JUST(SwitchCopyMirroredTensorFromUntypedArray(SwitchCase(data_type), local_tensor, array));

Py_DECREF(array);
3 changes: 2 additions & 1 deletion oneflow/core/autograd/gradient_funcs/narrow.cpp
@@ -67,7 +67,8 @@ class Narrow : public OpExprGradFunction<NarrowCaptureState> {
if (LazyMode::is_enabled()) {
like = ctx->SavedTensors().at(0);
} else if (dy->is_local()) {
like = JUST(functional::Empty(ctx->shape, dy->dtype(), JUST(dy->device())));
like = JUST(
functional::Empty(ctx->shape, dy->dtype(), JUST(dy->device()), /*pin_memory=*/false));
} else {
like = JUST(
functional::ConsistentEmpty(ctx->shape, dy->dtype(), JUST(dy->parallel_desc()),
5 changes: 3 additions & 2 deletions oneflow/core/boxing/asymmetric_broadcast.cpp
@@ -98,8 +98,9 @@ Maybe<one::Tensor> AsymmetricBroadcast(const std::shared_ptr<one::Tensor>& tenso
const auto& in_parallel_id = JUST(GetParallelId4CurrentProcessCtx(in_placement));
if (!in_parallel_id->has_value()) {
const std::string& device_type = in_placement->device_tag();
local_tensor = JUST(one::functional::Empty(*tensor->shape(), tensor->dtype(),
JUST(Device::New(device_type))));
local_tensor =
JUST(one::functional::Empty(*tensor->shape(), tensor->dtype(),
JUST(Device::New(device_type)), /*pin_memory=*/false));
}
const auto& broadcast_group = JUST(GetBroadcastGroup(in_placement, out_placement));

2 changes: 1 addition & 1 deletion oneflow/core/boxing/cuda_copy_boxing_interpreter.cpp
@@ -62,7 +62,7 @@ Maybe<one::Tensor> CudaCopy(const std::shared_ptr<one::Tensor>& tensor, Symbol<P
const std::string& device_type = tensor_placement->device_tag();
local_tensor = JUST(one::functional::Empty(
*JUST(GetPhysicalShape(*tensor->shape(), *tensor_nd_sbp, *tensor_placement, 0)),
tensor->dtype(), JUST(Device::New(device_type))));
tensor->dtype(), JUST(Device::New(device_type)), /*pin_memory=*/false));
}
const auto& sbp_list = JUST(GetSbpList(out->nd_sbp()));
return JUST(one::functional::LocalToConsistent(local_tensor, out->placement(), *sbp_list,
3 changes: 3 additions & 0 deletions oneflow/core/device/cpu_device_context.h
@@ -19,6 +19,7 @@ limitations under the License.
#include "oneflow/core/kernel/kernel_context.h"
#include "oneflow/core/device/event_record.h"
#include "oneflow/core/vm/cpu_allocator.h"
#include "oneflow/core/vm/cuda_host_allocator.h"
#include "oneflow/core/ep/cpu/cpu_stream.h"
#include "oneflow/core/ep/cpu/cpu_device.h"
#include "oneflow/core/ep/include/device_manager_registry.h"
@@ -38,6 +39,8 @@ class CpuDeviceCtx final : public DeviceCtx {

vm::Allocator* mut_allocator() override { return Global<vm::CpuAllocator>::Get(); }

vm::Allocator* mut_pin_memory_allocator() { return Global<vm::CudaHostAllocator>::Get(); }

DeviceType device_type() const override { return DeviceType::kCPU; }

ep::Stream* stream() override { return stream_; }
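The new `mut_pin_memory_allocator()` hands out the global `CudaHostAllocator`, an allocator for page-locked ("pinned") host memory; pinned pages cannot be swapped out, which lets host-to-device copies run as asynchronous DMA transfers. A rough sketch of what such an allocator typically does, using the standard CUDA runtime calls (an illustrative stand-in, not OneFlow's actual implementation):

```cpp
#include <cstddef>
#include <cuda_runtime.h>

// Illustrative pinned-host allocator: hands out page-locked memory via the
// CUDA runtime so cudaMemcpyAsync from these buffers can overlap compute.
class PinnedHostAllocator {
 public:
  void* Allocate(std::size_t size) {
    void* ptr = nullptr;
    // cudaMallocHost returns page-locked host memory; it fails when no usable
    // CUDA runtime is present, mirroring the CPU-only failure path handled in
    // eager_blob_object.cpp below.
    if (cudaMallocHost(&ptr, size) != cudaSuccess) { return nullptr; }
    return ptr;
  }
  void Deallocate(void* ptr) {
    if (ptr != nullptr) { cudaFreeHost(ptr); }
  }
};
```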
22 changes: 20 additions & 2 deletions oneflow/core/eager/eager_blob_object.cpp
@@ -18,6 +18,7 @@ limitations under the License.
#include "oneflow/core/framework/to_string.h"
#include "oneflow/core/framework/shut_down_util.h"
#include "oneflow/core/common/shape_vec.h"
#include "oneflow/core/device/cpu_device_context.h"

namespace oneflow {
namespace vm {
@@ -49,8 +50,25 @@ Blob* EagerBlobObject::blob() {
void EagerBlobObject::set_storage_offset(const int64_t offset) { storage_offset_ = offset; }

Maybe<void> EagerBlobObject::TryAllocateBlobBodyMemory(DeviceCtx* device_ctx) {
vm::Allocator* allocator = device_ctx->mut_allocator();
CHECK_NOTNULL_OR_RETURN(allocator);
const bool pin_memory = EagerBlobObject::pin_memory();
vm::Allocator* allocator = nullptr;
if (pin_memory) {
CHECK_EQ_OR_RETURN(device_ctx->device_type(), DeviceType::kCPU)
<< Error::RuntimeError() << "cannot pin tensor with device: " << device_ctx->device_type()
<< ", only dense CPU tensors can be pinned.";
allocator = dynamic_cast<CpuDeviceCtx*>(device_ctx)->mut_pin_memory_allocator();
if (allocator == nullptr) {
// The pin-memory allocator may fail to be created, e.g. when OneFlow is
// running in CPU-only mode without any CUDA library support.
return Error::RuntimeError()
<< "failed to create the pin_memory allocator. Most likely you are trying to use "
"CUDA functionality, but the CUDA library has not been loaded by the dynamic "
"linker.";
}
} else {
allocator = device_ctx->mut_allocator();
}
CHECK_NOTNULL_OR_RETURN(allocator) << Error::RuntimeError() << "allocator creation failed!";
size_t required_body_bytes = AlignedByteSizeOfBlobBody();
if (required_body_bytes == 0) {
CHECK_ISNULL_OR_RETURN(tensor_storage_->blob_dptr());
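The allocation path added above reduces to a three-way decision: non-pinned requests go to the device's default allocator; pinned requests must target the CPU and must obtain a usable pinned allocator; everything else fails loudly. A condensed, framework-free sketch of that control flow, with simplified stand-in types:

```cpp
#include <stdexcept>

// Simplified stand-ins for the framework types used above.
enum class DeviceType { kCPU, kCUDA };
struct Allocator { /* Allocate/Deallocate elided for brevity */ };

static Allocator cpu_alloc, cuda_alloc, pinned_alloc;

Allocator* DefaultAllocator(DeviceType device) {
  return device == DeviceType::kCPU ? &cpu_alloc : &cuda_alloc;
}

// Would return nullptr when no pinned allocator exists (e.g. CPU-only builds).
Allocator* PinnedHostAllocatorOrNull() { return &pinned_alloc; }

Allocator* PickAllocator(DeviceType device, bool pin_memory) {
  if (!pin_memory) { return DefaultAllocator(device); }
  // Pinning is host-memory-only: reject non-CPU devices, as the CHECK above does.
  if (device != DeviceType::kCPU) {
    throw std::runtime_error("only dense CPU tensors can be pinned");
  }
  Allocator* pinned = PinnedHostAllocatorOrNull();
  if (pinned == nullptr) {
    throw std::runtime_error("pin_memory allocator unavailable (CPU-only build?)");
  }
  return pinned;
}

int main() { return PickAllocator(DeviceType::kCPU, /*pin_memory=*/true) ? 0 : 1; }
```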
5 changes: 5 additions & 0 deletions oneflow/core/eager/eager_blob_object.h
@@ -137,6 +137,10 @@ class EagerBlobObject final {
tensor_storage_->set_last_used_stream(last_used_stream);
}

void set_pin_memory(const bool pin_memory) { pin_memory_ = pin_memory; }

bool pin_memory() const { return pin_memory_; }

std::shared_ptr<const Shape> shape_ptr() const { return shape_; }
const Shape& shape() const { return *shape_; }
Shape& mut_shape() { return *shape_; }
@@ -181,6 +185,7 @@ class EagerBlobObject final {
int64_t storage_offset_;
std::shared_ptr<TensorStorage> tensor_storage_;
std::atomic<bool> is_shape_synced_;
bool pin_memory_;
intrusive::shared_ptr<LocalDepObject> compute_local_dep_object_;

// NOTE: Will be removed soon. Avoid to use it whenever possible.
1 change: 1 addition & 0 deletions oneflow/core/eager/opkernel_instruction_type.cpp
@@ -87,6 +87,7 @@ struct LocalCallOpKernelUtil final {
operand->consistent_tensor_infer_result().get());
size_t temp_size = InferTmpSizeFn(op_infer_ctx);
temp_eager_blob_object->mut_shape() = Shape({static_cast<int64_t>(temp_size)});
temp_eager_blob_object->set_pin_memory(false);
temp_eager_blob_object->set_is_dynamic(true);
op_infer_ctx->Update(nullptr, nullptr, nullptr);
}
3 changes: 3 additions & 0 deletions oneflow/core/framework/op_interpreter.h
@@ -35,6 +35,8 @@ struct OpExprInterpContext {
OpExprInterpContext(const AttrMap& attrs_arg) : attrs(attrs_arg) {}
OpExprInterpContext(const AttrMap& attrs_arg, Symbol<Device> device_arg)
: attrs(attrs_arg), device(device_arg) {}
OpExprInterpContext(const AttrMap& attrs_arg, Symbol<Device> device_arg, const bool pin_memory)
: attrs(attrs_arg), device(device_arg), pin_memory(pin_memory) {}
OpExprInterpContext(const AttrMap& attrs_arg, std::shared_ptr<user_op::OpKernelState> state_arg)
: attrs(attrs_arg), state(state_arg) {}
OpExprInterpContext(const AttrMap& attrs_arg, Symbol<Device> device_arg,
@@ -53,6 +55,7 @@
Optional<Symbol<Device>> device; // for local op
Optional<Symbol<ParallelDesc>> parallel_desc; // for consistent op
Optional<Symbol<NdSbp>> nd_sbp; // for consistent op
Optional<bool> pin_memory; // for pin_memory related op
std::shared_ptr<user_op::OpKernelState> state;
};

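Modeling `pin_memory` as an `Optional<bool>` keeps every pre-existing `OpExprInterpContext` constructor and call site valid: only pin_memory-aware call sites set the flag, and consumers default it to `false` with `value_or` (as the interpreter change further down shows). The same idiom, sketched with `std::optional` standing in for OneFlow's `Optional`:

```cpp
#include <iostream>
#include <optional>

// Stand-in for OpExprInterpContext: the flag is absent by default, so only
// pin_memory-aware call sites need to mention it at all.
struct InterpContext {
  std::optional<bool> pin_memory;
};

void Interpret(const InterpContext& ctx) {
  // "Not set" is treated as "not pinned", matching ctx.pin_memory.value_or(false).
  const bool pin_memory = ctx.pin_memory.value_or(false);
  std::cout << "pin_memory = " << std::boolalpha << pin_memory << '\n';
}

int main() {
  Interpret(InterpContext{});                     // pin_memory = false
  Interpret(InterpContext{/*pin_memory=*/true});  // pin_memory = true
  return 0;
}
```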
@@ -142,12 +142,13 @@ Maybe<void> NaiveInterpret(const UserOpExpr& user_op_expr, const TensorTuple& in
return output_tensor_metas->at(i);
}));

const bool pin_memory = ctx.pin_memory.value_or(false);
for (int i = 0; i < output_eager_blob_objects->size(); i++) {
auto* tensor_impl = JUST(TensorImpl4Tensor(outputs->at(i)));
if (!output_eager_blob_objects->at(i)) {
tensor_impl->mut_tensor_meta()->set_stride(std::make_shared<Stride>(*tensor_impl->shape()));
const auto& dep_object = NewLocalDepObject();
JUST(tensor_impl->InitEagerBlobObject(dep_object));
JUST(tensor_impl->InitEagerBlobObject(dep_object, pin_memory));
output_eager_blob_objects->at(i) = JUST(tensor_impl->eager_blob_object());
} else {
// output i is inplaced.
@@ -171,16 +172,6 @@
return Maybe<void>::Ok();
}

Maybe<void> RunEmptyOp(TensorTuple* outputs) {
CHECK_EQ_OR_RETURN(outputs->size(), 1);
auto* tensor_impl = JUST(TensorImpl4Tensor(outputs->at(0)));
const auto& shape = tensor_impl->tensor_meta()->shape_ptr();
const auto& data_type = tensor_impl->dtype();
const auto& device = tensor_impl->device();
outputs->at(0) = JUST(functional::Empty(*shape, DType(data_type), device));
return Maybe<void>::Ok();
}

static Maybe<void> NaiveInterpret(const UserOpExpr& user_op_expr, const TensorTuple& inputs,
TensorTuple* outputs, const OpExprInterpContext& ctx) {
CHECK_EQ_OR_RETURN(outputs->size(), user_op_expr.output_size());
@@ -25,7 +25,6 @@ namespace one {

class Tensor;

Maybe<void> RunEmptyOp(TensorTuple* outputs);
Maybe<Tensor> Broadcast(const std::shared_ptr<Tensor>& tensor, int64_t src_rank,
Symbol<ParallelDesc> parallel_desc, bool inplace);

12 changes: 8 additions & 4 deletions oneflow/core/framework/random_generator_impl.cpp
@@ -60,7 +60,8 @@ Maybe<Tensor> CPUGeneratorImpl::GetState() const {
JUST(CPUSynchronize());
CPUGeneratorState state;
const auto& device = JUST(Device::New("cpu"));
const auto& tensor_state = JUST(functional::Empty(Shape{sizeof(state)}, DType::UInt8(), device));
const auto& tensor_state =
JUST(functional::Empty(Shape{sizeof(state)}, DType::UInt8(), device, /*pin_memory=*/false));

std::stringstream ss;
ss << engine_;
@@ -181,7 +182,8 @@ Maybe<Tensor> CUDAGeneratorImpl::GetState() const {
int64_t state_size = max_block_num_ * max_thread_num_ * sizeof(curandState);
int64_t total_size = state_size + sizeof(int64_t);
const auto& device = JUST(Device::New("cpu"));
const auto& tensor_state = JUST(functional::Empty(Shape{total_size}, DType::UInt8(), device));
const auto& tensor_state =
JUST(functional::Empty(Shape{total_size}, DType::UInt8(), device, /*pin_memory=*/false));

const auto& callback = [&](uint64_t of_blob_ptr) {
auto* of_blob = reinterpret_cast<OfBlob*>(of_blob_ptr);
@@ -293,7 +295,8 @@ Maybe<Tensor> AutoGeneratorImpl::GetState() const {
}
}
const auto& device = JUST(Device::New("cpu"));
const auto& tensor_state = JUST(functional::Empty(Shape{total_size}, DType::UInt8(), device));
const auto& tensor_state =
JUST(functional::Empty(Shape{total_size}, DType::UInt8(), device, /*pin_memory=*/false));
const auto& callback = [&buffer, &total_size](uint64_t of_blob_ptr) {
auto* of_blob = reinterpret_cast<OfBlob*>(of_blob_ptr);
memcpy(of_blob->mut_blob()->mut_dptr<uint8_t>(), buffer.data(), total_size);
@@ -337,7 +340,8 @@ Maybe<void> AutoGeneratorImpl::SetState(const std::shared_ptr<Tensor>& tensor_st
std::vector<std::shared_ptr<Tensor>> tensor_states(state.num);
for (int i = 0; i < state.num; ++i) {
int64_t state_size = state_sizes.at(i);
tensor_states[i] = JUST(functional::Empty(Shape{state_size}, DType::UInt8(), device));
tensor_states[i] =
JUST(functional::Empty(Shape{state_size}, DType::UInt8(), device, /*pin_memory=*/false));
const auto& callback = [&data, &state_size](uint64_t of_blob_ptr) {
auto* of_blob = reinterpret_cast<OfBlob*>(of_blob_ptr);
memcpy(of_blob->mut_blob()->mut_dptr<uint8_t>(), data, state_size);
15 changes: 15 additions & 0 deletions oneflow/core/framework/tensor.cpp
@@ -44,6 +44,11 @@ std::shared_ptr<Tensor> Parameter::contiguous() const {
return CHECK_JUST(functional::ToContiguous(tensor));
}

std::shared_ptr<Tensor> Parameter::pin_memory() const {
std::shared_ptr<Tensor> tensor = std::const_pointer_cast<Tensor>(shared_from_this());
return CHECK_JUST(functional::PinMemory(tensor));
}

/* static */ Maybe<MirroredTensor> MirroredTensor::MakeTensor(
const std::shared_ptr<const Shape>& shape, DataType dtype, const Symbol<Device>& device,
bool is_lazy, bool requires_grad, bool is_leaf) {
@@ -73,6 +78,11 @@ std::shared_ptr<Tensor> MirroredTensor::contiguous() const {
return CHECK_JUST(functional::ToContiguous(tensor));
}

std::shared_ptr<Tensor> MirroredTensor::pin_memory() const {
std::shared_ptr<Tensor> tensor = std::const_pointer_cast<Tensor>(shared_from_this());
return CHECK_JUST(functional::PinMemory(tensor));
}

Maybe<Tensor> MirroredTensor::clone() const {
const auto& device_type = JUST(this->device())->type();
int64_t device_id = JUST(this->device())->device_id();
@@ -86,6 +96,11 @@ std::shared_ptr<Tensor> ConsistentTensor::contiguous() const {
return CHECK_JUST(functional::ToContiguous(tensor));
}

std::shared_ptr<Tensor> ConsistentTensor::pin_memory() const {
std::shared_ptr<Tensor> tensor = std::const_pointer_cast<Tensor>(shared_from_this());
return CHECK_JUST(functional::PinMemory(tensor));
}

Maybe<Tensor> ConsistentTensor::clone() const {
const auto& local_tensor = JUST(cur_rank_phy_tensor());
const auto& device_type = JUST(local_tensor->device())->type();
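Each tensor class implements `pin_memory()` exactly the way the existing `contiguous()` does: a `const` method recovers a mutable `shared_ptr` to itself via `const_pointer_cast` over `shared_from_this()` and forwards it to a functional op. A stripped-down illustration of that pattern, where the local `PinMemory` helper is only a placeholder for `functional::PinMemory`:

```cpp
#include <iostream>
#include <memory>

class Tensor : public std::enable_shared_from_this<Tensor> {
 public:
  virtual ~Tensor() = default;

  // The method is const, but the functional layer wants shared_ptr<Tensor>,
  // so constness is cast away on the pointer, exactly as in the code above.
  std::shared_ptr<Tensor> pin_memory() const {
    std::shared_ptr<Tensor> self = std::const_pointer_cast<Tensor>(shared_from_this());
    return PinMemory(self);  // placeholder for functional::PinMemory
  }

 private:
  // Placeholder: the real op copies the tensor's storage into pinned memory.
  static std::shared_ptr<Tensor> PinMemory(const std::shared_ptr<Tensor>& t) {
    std::cout << "dispatching a pin_memory op\n";
    return t;
  }
};

int main() {
  auto t = std::make_shared<Tensor>();
  auto pinned = t->pin_memory();
  return 0;
}
```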