-
Notifications
You must be signed in to change notification settings - Fork 682
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat/graph del by ref #7857
Feat/graph del by ref #7857
Changes from 89 commits
0bae07a
959346b
9a03edb
c59716f
d531d43
ade9d9c
e0d7dbb
ba9e31d
b8f7f89
af55be9
5f1324f
1eaefa8
1b3c3df
516c691
9095397
c3846cd
e32f10d
a517fb7
c944d61
969ba8c
10e5f7e
69e3680
3b85239
d406a30
c7c6c67
21f49ba
65db196
fca7b8d
b049dfc
d0e4c8f
c5bbf35
c1e664b
760d131
7ed5d66
30d4cc0
3bc7fe0
24f02ea
9d7617b
26abf82
19be109
669dc9c
b0dbcdb
3281975
29ccd30
8509484
434b7af
d497049
a64a552
b19c3fb
cf4241f
60b59e5
d9609c6
62540a4
78f2708
1f82f5e
ad0be01
3925eb4
86296cb
454f5e7
d1d9ad7
a58348d
8bb83a1
fe64379
9b5cecc
f7f1fdf
72426fc
2300408
174bc3d
3e8585e
46c9d97
3be6f81
9f10675
da8b44d
c52bc90
b23869c
a80aea6
ec04de3
fde02a6
7547134
b7774c1
8d45134
2104348
809b483
b642cc5
6199229
e4e95ff
d11a7f8
ba1c68a
2ec0e0f
310a9fd
da54f36
5166fe7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
/* | ||
Copyright 2020 The OneFlow Authors. All rights reserved. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
#include <glog/logging.h> | ||
#include <sys/socket.h> | ||
#include <netinet/in.h> | ||
#include <arpa/inet.h> | ||
#include <cstddef> | ||
#include <cstdint> | ||
#include <cstdlib> | ||
#include <memory> | ||
#include <random> | ||
#include <type_traits> | ||
#include "oneflow/api/cpp/env_impl.h" | ||
#include "oneflow/core/common/global.h" | ||
#include "oneflow/core/common/just.h" | ||
#include "oneflow/core/common/optional.h" | ||
#include "oneflow/core/common/util.h" | ||
#include "oneflow/core/framework/session_util.h" | ||
#include "oneflow/core/job/env.pb.h" | ||
#include "oneflow/core/job/cluster_instruction.h" | ||
#include "oneflow/core/control/ctrl_bootstrap.h" | ||
#include "oneflow/core/job/session.h" | ||
#include "oneflow/core/rpc/include/base.h" | ||
#include "oneflow/core/vm/vm_util.h" | ||
|
||
namespace oneflow_api { | ||
|
||
namespace of = oneflow; | ||
|
||
namespace { // for inltialize | ||
|
||
inline bool IsEnvInited() { return of::Global<of::EnvGlobalObjectsScope>::Get() != nullptr; } | ||
|
||
bool HasEnvVar(const std::string& key) { | ||
const char* value = getenv(key.c_str()); | ||
return value != nullptr; | ||
} | ||
|
||
std::string GetEnvVar(const std::string& key, const std::string& default_value) { | ||
const char* value = getenv(key.c_str()); | ||
if (value == nullptr) { return default_value; } | ||
return std::string(value); | ||
} | ||
|
||
int64_t GetEnvVar(const std::string& key, int64_t default_value) { | ||
const char* value = getenv(key.c_str()); | ||
if (value == nullptr) { return default_value; } | ||
return std::atoll(value); | ||
} | ||
|
||
int32_t FindFreePort(const std::string& addr) { | ||
#ifdef __linux__ | ||
int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); | ||
CHECK_GE(sock, 0) << "fail to find a free port."; | ||
int optval = 1; | ||
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); | ||
|
||
std::mt19937 rng; | ||
rng.seed(std::random_device()()); | ||
std::uniform_int_distribution<std::mt19937::result_type> dist(1, 1000); | ||
|
||
int count = 0; | ||
int num_attempts = 200; | ||
do { | ||
int port = 5000 + dist(rng); | ||
struct sockaddr_in sockaddr {}; | ||
memset(&sockaddr, 0, sizeof(sockaddr)); | ||
sockaddr.sin_family = AF_INET; | ||
sockaddr.sin_port = htons(port); | ||
sockaddr.sin_addr.s_addr = inet_addr(addr.c_str()); | ||
int error = bind(sock, (struct sockaddr*)&sockaddr, sizeof(sockaddr)); | ||
if (error == 0) { return port; } | ||
++count; | ||
} while (count < num_attempts); | ||
CHECK_NE(count, num_attempts) << "fail to find a free port."; | ||
#endif // __linux__ | ||
return -1; | ||
} | ||
|
||
void CompleteEnvProto(of::EnvProto& env_proto) { | ||
auto bootstrap_conf = env_proto.mutable_ctrl_bootstrap_conf(); | ||
auto master_addr = bootstrap_conf->mutable_master_addr(); | ||
const std::string addr = GetEnvVar("MASTER_ADDR", "127.0.0.1"); | ||
master_addr->set_host(addr); | ||
master_addr->set_port(GetEnvVar("MASTER_PORT", FindFreePort(addr))); | ||
bootstrap_conf->set_world_size(GetEnvVar("WORLD_SIZE", 1)); | ||
bootstrap_conf->set_rank(GetEnvVar("RANK", 0)); | ||
|
||
auto cpp_logging_conf = env_proto.mutable_cpp_logging_conf(); | ||
if (HasEnvVar("GLOG_log_dir")) { cpp_logging_conf->set_log_dir(GetEnvVar("GLOG_log_dir", "")); } | ||
if (HasEnvVar("GLOG_logtostderr")) { | ||
cpp_logging_conf->set_logtostderr(GetEnvVar("GLOG_logtostderr", -1)); | ||
} | ||
if (HasEnvVar("GLOG_logbuflevel")) { | ||
cpp_logging_conf->set_logbuflevel(GetEnvVar("GLOG_logbuflevel", -1)); | ||
} | ||
} | ||
} // namespace | ||
|
||
OneFlowEnv::OneFlowEnv() { | ||
of::EnvProto env_proto; | ||
CompleteEnvProto(env_proto); | ||
|
||
env_ctx_ = std::make_shared<of::EnvGlobalObjectsScope>(env_proto); | ||
|
||
of::ConfigProto config_proto; | ||
config_proto.mutable_resource()->set_cpu_device_num(1); // useless, will be set in TryInit | ||
const int64_t session_id = of::NewSessionId(); | ||
CHECK_JUST(of::RegsiterSession(session_id)); | ||
config_proto.set_session_id(session_id); | ||
|
||
session_ctx_ = std::make_shared<of::MultiClientSessionContext>(env_ctx_); | ||
CHECK_JUST(session_ctx_->TryInit(config_proto)); | ||
} | ||
|
||
OneFlowEnv::~OneFlowEnv() { | ||
session_ctx_.reset(); | ||
env_ctx_.reset(); | ||
} | ||
|
||
} // namespace oneflow_api |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
Copyright 2020 The OneFlow Authors. All rights reserved. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
#include <memory> | ||
#include "oneflow/core/framework/multi_client_session_context.h" | ||
#include "oneflow/core/job/env_global_objects_scope.h" | ||
|
||
#ifndef ONEFLOW_API_CPP_ENV_IMPL_H_ | ||
#define ONEFLOW_API_CPP_ENV_IMPL_H_ | ||
|
||
namespace oneflow_api { | ||
namespace of = oneflow; | ||
class OneFlowEnv { | ||
public: | ||
OF_DISALLOW_COPY(OneFlowEnv); | ||
OneFlowEnv(); | ||
~OneFlowEnv(); | ||
std::shared_ptr<of::MultiClientSessionContext> GetSessionCtx() { return session_ctx_; } | ||
|
||
private: | ||
std::shared_ptr<of::EnvGlobalObjectsScope> env_ctx_; | ||
std::shared_ptr<of::MultiClientSessionContext> session_ctx_; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这个session_ctx一旦初始化就不会被修改是吗? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这里稍稍有点反直觉。我一直以为是session_ctx持有oneflow_env。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 是的,这里是 serving的用法,看起来serving那里目前是这么设定的。 session_ctx_会持有env_ctx_;oneflow_env包括了env和session,是导出给serving做oneflow总体启动和关闭的,之前的命名直接是env,这里沿袭了一下,叫oneflow env。 |
||
}; | ||
} // namespace oneflow_api | ||
|
||
#endif // ONEFLOW_API_CPP_ENV_IMPL_H_ |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ limitations under the License. | |
*/ | ||
|
||
#include "oneflow/api/common/ofblob.h" | ||
#include "oneflow/api/cpp/env_impl.h" | ||
#include "oneflow/api/cpp/framework/device.h" | ||
#include "oneflow/api/cpp/framework/dtype.h" | ||
#include "oneflow/api/cpp/framework/graph.h" | ||
|
@@ -54,6 +55,7 @@ limitations under the License. | |
#include "oneflow/core/operator/interface_blob_conf.pb.h" | ||
#include "oneflow/core/operator/op_conf.pb.h" | ||
#include "oneflow/core/register/logical_blob_id.pb.h" | ||
#include "oneflow/core/vm/vm_util.h" | ||
|
||
namespace oneflow_api { | ||
|
||
|
@@ -134,7 +136,7 @@ class Graph::GraphImpl final { | |
GraphImpl(const GraphImpl& graph) = delete; | ||
GraphImpl(GraphImpl&& graph) = default; | ||
|
||
~GraphImpl() = default; | ||
~GraphImpl(); | ||
|
||
GraphImpl& operator=(const GraphImpl& graph) = delete; | ||
GraphImpl& operator=(GraphImpl&& graph) = default; | ||
|
@@ -228,8 +230,9 @@ Graph::GraphImpl::GraphImpl(const std::string& model_path, const Device& device) | |
if (of::ParseBooleanFromEnv("ONEFLOW_SERVING_DEBUG", false)) { LOG(ERROR) << job_.DebugString(); } | ||
job_.mutable_job_conf()->mutable_predict_conf(); | ||
job_.mutable_job_conf()->set_job_name(job_.mutable_job_conf()->job_name() + of::NewUniqueId()); | ||
graph_ = std::make_shared<of::NNGraph>(job_.job_conf().job_name()); | ||
of::Global<of::MultiClientSessionContext>::Get()->AddCGraph(graph_).GetOrThrow(); | ||
CHECK(of::Global<OneFlowEnv>::Get() != nullptr); | ||
graph_ = std::make_shared<of::NNGraph>(job_.job_conf().job_name(), | ||
of::Global<OneFlowEnv>::Get()->GetSessionCtx()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 如果直接是 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 现在也在尽量避免用Global来读写session,而是用引用计数,生命周期、依赖关系比较明确。 Global这种全局指针,在链式资源依赖 + 多线程里面,释放时很难做对,所以这里没有把session导出到Global。 这里下个PR再尝试把serving这里的接口也改成更彻底的依赖注入(oneflow graph那里已经改好了),可以避免使用Global这种跳线的写法了。 |
||
} | ||
|
||
InputOutputInfos Graph::GraphImpl::GetInputInfos() { return input_infos_; } | ||
|
@@ -403,4 +406,6 @@ of::Maybe<void> Graph::GraphImpl::RegisterTensors(const std::vector<Tensor>& inp | |
return of::Maybe<void>::Ok(); | ||
} | ||
|
||
Graph::GraphImpl::~GraphImpl() { of::vm::ClusterSync().GetOrThrow(); } | ||
|
||
} // namespace oneflow_api |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ limitations under the License. | |
#include <pybind11/pybind11.h> | ||
#include "oneflow/api/python/env/env.h" | ||
#include "oneflow/api/python/of_api_registry.h" | ||
#include "oneflow/core/job/env_global_objects_scope.h" | ||
#include "oneflow/core/common/global.h" | ||
#include "oneflow/core/vm/vm_util.h" | ||
#include "oneflow/core/vm/virtual_machine.h" | ||
|
@@ -42,10 +43,9 @@ ONEFLOW_API_PYBIND11_MODULE("", m) { | |
m.def("EnvResource", &EnvResource); | ||
m.def("EnableEagerEnvironment", &EnableEagerEnvironment); | ||
|
||
py::class_<EnvGlobalObjectsScope, std::shared_ptr<EnvGlobalObjectsScope>>(m, "Env") | ||
.def(py::init([](const std::string& env_proto_str) { | ||
return CreateEnv(env_proto_str).GetPtrOrThrow(); | ||
})) | ||
py::class_<oneflow::EnvGlobalObjectsScope, std::shared_ptr<oneflow::EnvGlobalObjectsScope>>( | ||
m, "EnvContext") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 统一命名为context |
||
.def(py::init<const std::string&>()) | ||
.def("SwitchToShuttingDownPhase", &SwitchToShuttingDownPhase, | ||
py::call_guard<py::gil_scoped_release>()); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Env for serving c++ api