Skip to content

Commit

Permalink
Decouple stream and instruction (#7607)
Browse files Browse the repository at this point in the history
* remove deprecated python api

* backup code

* backup code

* fix compiler complaints

* fix typo in refactoring

* kMockDevice

* add unit test test_mock.py

* revert mock kernels

* vert DEVICE_TYPE_SEQ

* mock placement

* address pr comments

* register device kCriticalSectionDevice and kLazyJobLauncher

* kControlDevice

* Stream::vm_stream_

* fix compiler complaints

* backup code

* rename StreamIsTransport to IsCommNetStream

* decouple vm::StreamType and vm::InstructionType

* fix compiler complaints

* remove 'gpu' related code

* address static analyzer complaints

* address static analyzer complaints

* remove unused module in test_mock.py

* the Env is never destroyed.

* export Env into python

* more unittests

* export unittest.TestCase in framework/unittest.py

* SwitchToShuttingDownPhase

* optional is_normal_exit

* VirtualMachine::CloseVMThreads

* Delete env_api.h

env_api.h is deleted by master

* reshape_only_one_dim_infered

* address pr comments

* rollback flow.env.all_device_placement

* no distributed running test_shutting_down.py

* auto format by CI

* expand lifetime of module oneflow in test_shutting_down.py

* refine del depend on of

* fix oneflow.placement.__str__

* revert GlobalSync

* init_producer_stream in oneflow.from_numpy

* debug code for vm

* init disable_vm_threads_ in VirtualMachine::VirtualMachine

* Update oneflow/core/vm/virtual_machine.h

Co-authored-by: daquexian <daquexian566@gmail.com>

* create stream in forked subprocesses.

* refactor StreamRoleSwitch to StreamRoleVisitor

* ThreadLocalGuard

* auto format by CI

* fix compiler complaints

* fix static analyzer complaints

* VirtualMachine::GetVmStream

* fix static analyzer complaints

* reimplement AddAndReadVector by std::deque

* reimplement AddAndReadVector

* merge master

* increase atol for test_consistent_rnn_cell.py

* StreamRole::AsyncLaunchedCommNet is bound to EventRecordedCudaStreamType

* auto format by CI

* remove StreamRoleVisitor<T>::VisitInvalid

* no copy in AddAndReadVector

* fix bug of AddAndReadVector::size_

* disable terminfo to fix missing terminfo symbols

Signed-off-by: daquexian <daquexian566@gmail.com>

* auto format by CI

* fix AddAndReadVector::GetGranularity

* remove bad unittest

* auto format by CI

* rename CallInstructionType to OpCallInstructionType

* static variable  GlobalSingletonPtr is a unique_ptr

* replace ++atomic_cnt with atomic_cnt.fetch_add(1, std::memory_order_relaxed)

* AddAndReadVector::operator[]

* change comments 'lock free' to 'thread safe'

* rename StatefulLocalOpKernel to StatefulOpKernel

* rename VirtualMachine::vm_ to VirtualMachine::engine_

* mark VirtualMachine::NoMoreErasedInstructions private

* mark VirtualMachine::FindOrCreateScheduleLocalDepObject private

* remove unused version of VirtualMachineEngine::Receive

* rename argname for VirtualMachineEngine::Receive

* rename unused PendingInstructionList

* rename AddAndReadVector to SteadyVector

* optimize SteadyVector::operator[] by __builtin_clzll

* refactor SteadyVector::granularity2vector_ to SteadyVector::granularity2data_

* reduce usage of steady_vector::size_

* rename unused anonymous namespace

* greater atol for test_consistent_tensordot.py

* fix BarrierInstructionType::ComputeInFuseMode

* revert container_util.h

* run AccessBlobByCallback in default stream of tensor->device

* resolve static check

* resolve static check

* SteadyVector::MutableOrAdd

Co-authored-by: oneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: chengtbf <472491134@qq.com>
Co-authored-by: oneflow-ci-bot <ci-bot@oneflow.org>
Co-authored-by: Xiaoyu Xu <xiaoyulink@gmail.com>
Co-authored-by: daquexian <daquexian566@gmail.com>
Co-authored-by: binbinHan <han_binbin@163.com>
  • Loading branch information
8 people authored Jun 22, 2022
1 parent 42d53ad commit 9a5e750
Show file tree
Hide file tree
Showing 101 changed files with 1,443 additions and 2,470 deletions.
6 changes: 4 additions & 2 deletions oneflow/api/python/functional/tensor_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,10 @@ class LocalTensorSharedNumpyDataFunctor {

// Init blob
JUST(tensor_impl->InitEagerBlobObject(NewLocalDepObject(), /*pin_memory=*/false));
const auto& stream = GetDefaultStreamByDevice(device);
JUST(tensor_impl->eager_blob_object())->set_last_used_stream(stream);
const auto& stream = JUST(GetDefaultStreamByDevice(device));
const auto& eager_blob_object = JUST(tensor_impl->eager_blob_object());
JUST(eager_blob_object->init_producer_stream(stream));
eager_blob_object->set_last_used_stream(stream);
std::shared_ptr<Tensor> out(new MirroredTensor(tensor_impl));
return out;
}
Expand Down
41 changes: 0 additions & 41 deletions oneflow/api/python/vm/id_generator.cpp

This file was deleted.

1 change: 1 addition & 0 deletions oneflow/core/boxing/slice_boxing_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.

#include "oneflow/core/framework/tensor.h"
#include "oneflow/core/framework/placed_nd_sbp.h"
#include "oneflow/core/job/parallel_desc.h"

namespace oneflow {

Expand Down
2 changes: 1 addition & 1 deletion oneflow/core/common/device_type.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ enum DeviceType {
kInvalidDevice = 0;
kCPU = 1;
kCUDA = 2;
kMockDevice = 3;
kMockDevice = 3; // pseudo device for test.
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,29 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/eager/blob_instruction_type.h"
#include "oneflow/core/vm/cpu_stream_type.h"
#ifndef ONEFLOW_CORE_COMMON_SINGLETON_PTR_H_
#define ONEFLOW_CORE_COMMON_SINGLETON_PTR_H_

#include <memory>

namespace oneflow {
namespace vm {

class CpuAccessBlobByCallbackInstructionType final : public AccessBlobByCallbackInstructionType {
public:
CpuAccessBlobByCallbackInstructionType() = default;
~CpuAccessBlobByCallbackInstructionType() override = default;
namespace private_detail {

// Returns a pointer to the single shared instance of T.
// The instance is default-constructed on the first call and owned by a function-local static
// std::unique_ptr; every later call hands back the same pointer.
template<typename T>
const T* GlobalSingletonPtr() {
  static std::unique_ptr<const T> instance{new T()};
  const T* const raw = instance.get();
  return raw;
}

using stream_type = vm::CpuStreamType;
};
COMMAND(vm::RegisterInstructionType<CpuAccessBlobByCallbackInstructionType>(
"cpu.AccessBlobByCallback"));
} // namespace private_detail

// Returns the shared instance of T.
// Each thread resolves the pointer through private_detail::GlobalSingletonPtr<T>() once and keeps
// it in a thread_local variable, so later calls on that thread only read the cached pointer.
template<typename T>
const T* SingletonPtr() {
  thread_local const T* const cached = private_detail::GlobalSingletonPtr<T>();
  return cached;
}

} // namespace vm
} // namespace oneflow

#endif // ONEFLOW_CORE_COMMON_SINGLETON_PTR_H_
102 changes: 102 additions & 0 deletions oneflow/core/common/steady_vector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef ONEFLOW_CORE_COMMON_STEADY_VECTOR_H_
#define ONEFLOW_CORE_COMMON_STEADY_VECTOR_H_

#include <array>
#include <atomic>
#include <cmath>
#include <cstddef>
#include <memory>
#include <mutex>
#include <glog/logging.h>

namespace oneflow {

// An append-only, index-addressable container whose elements never move once created.
//
// Storage is a fixed table of N chunks. Chunk g holds 2^g elements and covers the indices
// [2^g - 1, 2^(g+1) - 2]. Growing only ever allocates a new chunk, so pointers and references
// handed out earlier stay valid. At most 2^N - 1 elements can be stored.
//
// Threading: writers (push_back / MutableOrAdd) are serialized by mutex_. Readers take no lock;
// they may read every index below a value previously returned by size().
template<typename T, int N = 20>
class SteadyVector {
 public:
  SteadyVector() : size_(0) {}
  ~SteadyVector() = default;

  using value_type = const T;
  using size_type = size_t;

  // thread safe.
  // The acquire load pairs with the release store done by writers, so everything written
  // before an element was published is visible to a reader that observes the new size.
  size_t size() const { return size_.load(std::memory_order_acquire); }

  // thread safe.
  // Bounds-checked access: aborts through CHECK when `index` >= size().
  const T& at(size_t index) const {
    // `index` is unsigned, so only the upper bound needs checking.
    CHECK_LT(index, size());
    return (*this)[index];
  }

  // thread safe.
  // Unchecked access: `index` must be < size().
  const T& operator[](size_t index) const {
    int gran = 0;
    size_t start = 0;
    GetGranularityAndStart(index, &gran, &start);
    return granularity2data_[gran].get()[index - start];
  }

  // thread safe.
  // Appends a copy of `elem`. The slot is chosen and written while holding the lock, and the new
  // size is published only afterwards, so concurrent appends cannot target the same slot and
  // readers never see an element that has not been stored yet.
  void push_back(const T& elem) {
    std::lock_guard<std::mutex> lock(mutex_);
    const size_t end = size_.load(std::memory_order_relaxed);
    *SlotAtEnd(end) = elem;
    size_.store(end + 1, std::memory_order_release);
  }

  // `index` should be <= size()
  // Returns a mutable pointer to the element at `index`. When `index` == size(), a new
  // value-initialized element is appended first and the size grows by one.
  // NOTE: the caller writes through the returned pointer after the lock has been released, so
  // that write is not synchronized with concurrent readers of the same element.
  T* MutableOrAdd(size_t index) {
    std::lock_guard<std::mutex> lock(mutex_);
    const size_t end = size_.load(std::memory_order_relaxed);
    CHECK_LE(index, end) << "index out of range";
    if (index < end) { return Mutable(index); }
    T* slot = SlotAtEnd(end);
    size_.store(end + 1, std::memory_order_release);
    return slot;
  }

 private:
  // Requires mutex_ to be held and `end` to equal the current size.
  // Allocates the owning chunk when `end` is the first index of a chunk, then returns the slot
  // for `end`. Does not modify size_.
  T* SlotAtEnd(size_t end) {
    const int granularity = GetGranularity(end);
    const size_t chunk_size = size_t{1} << granularity;
    // Chunk g starts at index 2^g - 1, i.e. exactly when end + 1 is a power of two.
    if (end + 1 == chunk_size) {
      CHECK_LT(granularity, N);
      granularity2data_[granularity] = std::make_unique<T[]>(chunk_size);
    }
    return Mutable(end);
  }

  T* Mutable(size_t index) {
    int gran = 0;
    size_t start = 0;
    GetGranularityAndStart(index, &gran, &start);
    return &granularity2data_[gran].get()[index - start];
  }

  // Maps `index` to the chunk that holds it (`*gran`) and to the first index of that chunk
  // (`*start`).
  static void GetGranularityAndStart(size_t index, int* gran, size_t* start) {
    *gran = GetGranularity(index);
    *start = (size_t{1} << *gran) - 1;
  }

  // Returns floor(log2(index + 1)), the number of the chunk that holds `index`.
  static int GetGranularity(size_t index) {
    const unsigned long long value = static_cast<unsigned long long>(index) + 1;
#ifdef __GNUC__
    return static_cast<int>(8 * sizeof(unsigned long long)) - 1 - __builtin_clzll(value);
#else
    // Exact integer fallback; avoids the rounding of a floating-point log2.
    int log2_floor = 0;
    for (unsigned long long rest = value >> 1; rest != 0; rest >>= 1) { ++log2_floor; }
    return log2_floor;
#endif
  }

  std::atomic<size_t> size_;
  std::mutex mutex_;
  std::array<std::unique_ptr<T[]>, N> granularity2data_;
};

} // namespace oneflow

#endif // ONEFLOW_CORE_COMMON_STEADY_VECTOR_H_
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,24 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/vm/stream_runtime_desc.h"
#include "gtest/gtest.h"
#include "oneflow/core/common/steady_vector.h"

namespace oneflow {
namespace vm {
namespace test {

void StreamRtDesc::__Init__(StreamDesc* stream_desc) {
const StreamType* stream_type = &stream_desc->stream_type();
reset_stream_desc(stream_desc);
set_stream_type(stream_type);
void TestSteadyVector(int granularity) {
CHECK_GT(granularity, 0);
SteadyVector<int> vec;
ASSERT_EQ(vec.size(), 0);
for (int i = 0; i < (1 << granularity); ++i) {
vec.push_back(i);
ASSERT_EQ(vec.at(i), i);
ASSERT_EQ(vec.size(), i + 1);
}
}

} // namespace vm
// Runs the push_back / at / size round trip of TestSteadyVector with 1 << 6 = 64 elements.
TEST(SteadyVector, simple) { TestSteadyVector(6); }

} // namespace test
} // namespace oneflow
60 changes: 30 additions & 30 deletions oneflow/core/common/stream_role.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,44 +19,44 @@ limitations under the License.
#include <functional>
#include <array>
#include "oneflow/core/common/preprocessor.h"
#include "glog/logging.h"

namespace oneflow {

#define STREAM_ROLE_SEQ \
OF_PP_MAKE_TUPLE_SEQ(kCompute) \
OF_PP_MAKE_TUPLE_SEQ(kHost2Device) \
OF_PP_MAKE_TUPLE_SEQ(kDevice2Host) \
OF_PP_MAKE_TUPLE_SEQ(kSyncedLaunchedCommNet) \
OF_PP_MAKE_TUPLE_SEQ(kAsyncedLaunchedCommNet) \
OF_PP_MAKE_TUPLE_SEQ(kCriticalSection)

enum class StreamRole {
kInvalid = 0,
#define DECLARE_STREAM_ROLE(stream_role) stream_role,
OF_PP_FOR_EACH_TUPLE(DECLARE_STREAM_ROLE, STREAM_ROLE_SEQ)
#undef DECLARE_STREAM_ROLE
kCompute,
kHost2Device,
kDevice2Host,
kSyncedLaunchedCommNet,
kAsyncedLaunchedCommNet,
kBarrier,
kCriticalSection,
kLazyJobLauncher
};

static constexpr int kStreamRoleSize = 1 + OF_PP_SEQ_SIZE(STREAM_ROLE_SEQ);

// Act as a class for overloading functions
template<StreamRole stream_role>
struct StreamRoleCase {};

template<typename Functor, typename... Args>
auto StreamRoleSwitch(StreamRole stream_role, Args&&... args)
-> decltype(Functor::Case(StreamRoleCase<StreamRole::kInvalid>(),
std::forward<Args>(args)...)) {
switch (stream_role) {
#define MAKE_ENTRY(stream_role) \
case StreamRole::stream_role: \
return Functor::Case(StreamRoleCase<StreamRole::stream_role>(), std::forward<Args>(args)...);
OF_PP_FOR_EACH_TUPLE(MAKE_ENTRY, STREAM_ROLE_SEQ)
#undef MAKE_ENTRY
default:
return Functor::Case(StreamRoleCase<StreamRole::kInvalid>(), std::forward<Args>(args)...);
// Dispatches a runtime StreamRole value to a static member function of DerivedT.
// DerivedT has to provide one static Visit<Role>(args...) function for every role handled in the
// switch below (VisitCompute, VisitHost2Device, ...). Visit forwards `args` to the selected
// function and returns its result. The return type is deduced (`auto`), so all of those
// functions must return the same type.
template<typename DerivedT>
struct StreamRoleVisitor {
template<typename... Args>
static auto Visit(StreamRole stream_role, Args&&... args) {
switch (stream_role) {
// LOG(FATAL) terminates the process, so control never continues into the kCompute case.
case StreamRole::kInvalid: LOG(FATAL) << "invalid stream role";
case StreamRole::kCompute: return DerivedT::VisitCompute(std::forward<Args>(args)...);
case StreamRole::kHost2Device: return DerivedT::VisitHost2Device(std::forward<Args>(args)...);
case StreamRole::kDevice2Host: return DerivedT::VisitDevice2Host(std::forward<Args>(args)...);
case StreamRole::kSyncedLaunchedCommNet:
return DerivedT::VisitSyncedLaunchedCommNet(std::forward<Args>(args)...);
case StreamRole::kAsyncedLaunchedCommNet:
return DerivedT::VisitAsyncedLaunchedCommNet(std::forward<Args>(args)...);
case StreamRole::kBarrier: return DerivedT::VisitBarrier(std::forward<Args>(args)...);
case StreamRole::kCriticalSection:
return DerivedT::VisitCriticalSection(std::forward<Args>(args)...);
case StreamRole::kLazyJobLauncher:
return DerivedT::VisitLazyJobLauncher(std::forward<Args>(args)...);
}
// Reached only when stream_role holds a value that matches none of the cases above.
LOG(FATAL) << "invalid stream role";
}
}
};

} // namespace oneflow

Expand Down
2 changes: 1 addition & 1 deletion oneflow/core/eager/blob_instruction_type.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void AccessBlobByCallbackInstructionType::ComputeInstrMsg(
const auto* ptr =
dynamic_cast<const vm::AccessBlobArgCbPhyInstrOperand*>(phy_instr_operand.get());
CHECK_NOTNULL(ptr);
DeviceCtx* device_ctx = instr_msg.phy_instr_stream()->device_ctx().get();
DeviceCtx* device_ctx = instr_msg.stream().device_ctx().get();
auto* blob = ptr->eager_blob_object()->blob();
OfBlob ofblob(device_ctx->stream(), blob);
ptr->callback()(reinterpret_cast<uint64_t>(&ofblob));
Expand Down
Loading

0 comments on commit 9a5e750

Please sign in to comment.