[GPU] Fix in-order queue synchronization issue related to OCL/OneDNN impls interaction with CPU impls (openvinotoolkit#17976)
sshlyapn authored and pull[bot] committed Dec 20, 2023
1 parent cbe9fb7 commit 1540833
Showing 11 changed files with 320 additions and 37 deletions.
14 changes: 13 additions & 1 deletion src/plugins/intel_gpu/src/graph/impls/common/loop.cpp
@@ -110,6 +110,7 @@ struct loop_impl : typed_primitive_impl<loop> {
}
}

std::vector<event::ptr> all_events;
std::vector<event::ptr> loop_carried_dep(events.begin(), events.end());
int64_t current_iteration_idx = 0;
while (current_iteration_idx < trip_count && execution_condition) {
@@ -145,6 +146,15 @@ struct loop_impl : typed_primitive_impl<loop> {
loop_carried_dep.emplace_back(body_event);
}

// Collect output events to wait for all iterations to finish
for (auto& out : body_network->get_outputs()) {
auto output_id = out->id();
if (body_network->has_event(output_id)) {
auto output_event = body_network->get_primitive_event(output_id);
all_events.push_back(output_event);
}
}

//TODO: execution_condition is prepared as they are presented in the
// ngraph opset document for loop operation.
// However they are not being used yet and only TensorIterator which
@@ -157,7 +167,9 @@ struct loop_impl : typed_primitive_impl<loop> {
++current_iteration_idx;
}

body_network->reset_execution();
// Reset network and wait for all collected events
body_network->reset_execution(false);
stream.wait_for_events(all_events);

// Concatenate sliced output to the outer network
for (size_t i = 0; i < concatenated_output_mem_mappings.size(); ++i) {
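Note: a minimal standalone sketch of the collect-then-wait pattern this hunk introduces, using stand-in types rather than the real cldnn API — every iteration's output events are accumulated and, since `reset_execution(false)` no longer blocks, the stream explicitly waits on all of them afterwards.

```cpp
#include <memory>
#include <vector>

// Stand-in types for illustration only; the real code uses cldnn::event and cldnn::stream.
struct event { void wait() { /* blocks until the GPU work is done in the real type */ } };
using event_ptr = std::shared_ptr<event>;

struct stream_t {
    void wait_for_events(const std::vector<event_ptr>& evs) {
        for (auto& ev : evs) ev->wait();
    }
};

// Hypothetical body execution returning the output events of one iteration.
std::vector<event_ptr> run_body_iteration() { return { std::make_shared<event>() }; }

void run_loop(stream_t& stream, int trip_count) {
    std::vector<event_ptr> all_events;
    for (int i = 0; i < trip_count; ++i) {
        auto out_events = run_body_iteration();
        // Keep the output events of every iteration, not only the last one.
        all_events.insert(all_events.end(), out_events.begin(), out_events.end());
    }
    // reset_execution(false) in the hunk above is non-blocking, so wait explicitly here.
    stream.wait_for_events(all_events);
}

int main() {
    stream_t s;
    run_loop(s, 3);
}
```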
@@ -40,7 +40,9 @@ class wait_for_events_impl : public primitive_impl {

event::ptr execute(const std::vector<event::ptr>& events, primitive_inst& instance) override {
auto& stream = instance.get_network().get_stream();
return stream.enqueue_marker(events);

return events.empty() ? stream.create_user_event(true)
: stream.enqueue_marker(events);
}

static std::unique_ptr<primitive_impl> create_data(const data_node& data, const kernel_impl_params&) {
19 changes: 8 additions & 11 deletions src/plugins/intel_gpu/src/graph/impls/ocl/primitive_base.hpp
@@ -132,7 +132,8 @@ struct typed_primitive_impl_ocl : public typed_primitive_impl<PType> {
if (group && !is_output)
return stream.group_events(events);

return stream.enqueue_marker(events, is_output);
return events.empty() ? stream.create_user_event(true)
: stream.enqueue_marker(events, is_output);
}

void init_kernels(const kernels_cache& kernels_cache, const kernel_impl_params& params) override {
@@ -259,14 +260,9 @@ struct typed_primitive_impl_ocl : public typed_primitive_impl<PType> {
if (_kernel_data.kernels[kd_idx].skip_execution)
continue;
std::vector<event::ptr> new_events;
// is any user of the prim's users is an detecion output, set prim as a output event (event won't be nullptr)
bool is_output_event;
if (instance.node != nullptr) {
auto users = instance.node->get_users();
is_output_event = is_any_user_cpu(users) || instance.node->is_output();
} else {
is_output_event = instance.is_output_event();
}

// If any user of the primitive is a CPU implementation or the primitive is a network output, request a completion event (the event won't be nullptr)
bool needs_completion_event = instance.needs_completion_event();

auto& params = _kernel_data.kernels[kd_idx].params;
auto args = get_arguments(instance);
@@ -280,9 +276,10 @@ struct typed_primitive_impl_ocl : public typed_primitive_impl<PType> {
const auto& lws = params.workGroups.local;

GPU_DEBUG_TRACE_DETAIL << "Enqueue kernel " << kd_idx << ": gws=[" << gws[0] << ", " << gws[1] << ", " << gws[2] << "] "
<< "lws=[" << lws[0] << ", " << lws[1] << ", " << lws[2] << "]" << std::endl;
<< "lws=[" << lws[0] << ", " << lws[1] << ", " << lws[2] << "]"
<< (needs_completion_event ? " has_completion_event=true" : "") << std::endl;

auto ev = stream.enqueue_kernel(*_kernels[kd_idx], params, args, tmp_events, is_output_event);
auto ev = stream.enqueue_kernel(*_kernels[kd_idx], params, args, tmp_events, needs_completion_event);
new_events.push_back(ev);
all_events.push_back(ev);

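Note: a simplified, self-contained sketch of the aggregation rule used here (stand-in types, not the cldnn API) — a single non-output event passes through, grouping is used when requested, and an empty list now yields an already-signaled user event instead of a marker.

```cpp
#include <memory>
#include <vector>

// Stand-in types; the real code operates on cldnn::event and cldnn::stream.
struct event { using ptr = std::shared_ptr<event>; };

struct stream_t {
    event::ptr create_user_event(bool) { return std::make_shared<event>(); }
    event::ptr group_events(const std::vector<event::ptr>&) { return std::make_shared<event>(); }
    event::ptr enqueue_marker(const std::vector<event::ptr>&, bool) { return std::make_shared<event>(); }
};

// Mirrors aggregate_events in the hunk above.
event::ptr aggregate_events(const std::vector<event::ptr>& events, stream_t& stream,
                            bool group = false, bool is_output = false) {
    if (events.size() == 1 && !is_output)
        return events[0];                        // nothing to merge
    if (group && !is_output)
        return stream.group_events(events);      // cheap logical grouping
    return events.empty() ? stream.create_user_event(true)            // nothing to wait for
                          : stream.enqueue_marker(events, is_output); // real queue marker
}
```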
@@ -460,16 +460,6 @@ struct typed_primitive_onednn_impl : public typed_primitive_impl<PType> {

void init_kernels(const kernels_cache&, const kernel_impl_params&) override { }

event::ptr aggregate_events(const std::vector<event::ptr>& events, stream& stream, bool group = false, bool is_output = false) const {
if (events.size() == 1 && !is_output)
return events[0];

if (group && !is_output)
return stream.group_events(events);

return stream.enqueue_marker(events, is_output);
}

void set_arguments_impl(typed_primitive_inst<PType>& instance) override {
if (instance.can_be_optimized())
return;
@@ -499,6 +489,12 @@ struct typed_primitive_onednn_impl : public typed_primitive_impl<PType> {
}
throw; // rethrowing dnnl::error if not out_of_memory
}

// If the oneDNN primitive is an output primitive or one of its users is a CPU implementation, enqueue a marker
// with an empty events wait list (which triggers a wait for all previously enqueued tasks) and
// return it as the oneDNN primitive's event, since that is the only option for proper synchronization
if (instance.needs_completion_event())
event = stream.enqueue_marker({});
}

if (_enable_profiling) {
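Note: a hedged sketch of the oneDNN path above, with stand-in types (not the real dnnl/cldnn interfaces) — the primitive enqueues its work to the shared in-order queue without returning an event, so when a completion event is required the code enqueues a marker right after it, and on an in-order queue that marker completes only once the oneDNN kernel has.

```cpp
#include <memory>
#include <vector>

// Stand-in types for illustration; the real code uses dnnl::primitive and cldnn::stream.
struct event { using ptr = std::shared_ptr<event>; };

struct stream_t {
    // A marker with an empty dependency list: on an in-order queue it completes
    // only after everything enqueued before it, including the oneDNN kernel.
    event::ptr enqueue_marker(const std::vector<event::ptr>&) { return std::make_shared<event>(); }
};

struct onednn_prim {
    void execute(stream_t&) { /* enqueues GPU work; returns no event */ }
};

event::ptr execute_onednn(stream_t& stream, onednn_prim& prim, bool needs_completion_event) {
    prim.execute(stream);
    // Only pay for a marker when a CPU user or the network output has to wait on this primitive.
    return needs_completion_event ? stream.enqueue_marker({}) : event::ptr{};
}
```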
4 changes: 2 additions & 2 deletions src/plugins/intel_gpu/src/graph/include/primitive_inst.h
@@ -230,7 +230,7 @@ class primitive_inst {
bool is_dynamic() const { return _is_dynamic; }
bool can_share_buffer() const { return _can_share_buffer; }
bool is_constant() const { return _is_constant; }
bool is_output_event() const { return _is_output_event; }
bool needs_completion_event() const { return _needs_completion_event; }
bool has_unfused_subgraph() const { return (_unfused_subgraph != nullptr); }

void allocate_internal_buffers();
@@ -330,7 +330,7 @@ class primitive_inst {
bool _can_be_optimized = false;
bool _can_share_buffer = true;
bool _is_constant = false;
bool _is_output_event = false;
bool _needs_completion_event = false;

size_t max_output_layout_size = 0;
std::vector<size_t> max_intermediates_memory_sizes;
7 changes: 5 additions & 2 deletions src/plugins/intel_gpu/src/graph/network.cpp
@@ -1396,8 +1396,11 @@ void network::execute_primitive(const std::shared_ptr<primitive_inst>& primitive
const std::vector<event::ptr>& events) {
event::ptr ev = primitive->execute(events);

// Collect events only for OOO queue and Profiling mode
if (get_stream().get_queue_type() == QueueTypes::out_of_order || _enable_profiling) {
// Collect events under any of the following conditions:
// 1) OOO queue execution
// 2) Profiling mode is enabled
// 3) Primitive has a CPU user or is a network output
if (get_stream().get_queue_type() == QueueTypes::out_of_order || _enable_profiling || primitive->needs_completion_event()) {
auto id = primitive->id();
_events.insert({id, ev});
}
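Note: a tiny hypothetical mirror of the new collection rule (stand-in enum, not the cldnn `QueueTypes`), kept separate here only to make the three conditions explicit.

```cpp
// Stand-in enum for illustration; mirrors the three conditions listed in the comment above.
enum class queue_types { in_order, out_of_order };

bool should_store_event(queue_types queue_type, bool profiling_enabled, bool needs_completion_event) {
    return queue_type == queue_types::out_of_order  // 1) OOO queue execution
        || profiling_enabled                        // 2) profiling mode enabled
        || needs_completion_event;                  // 3) CPU user or network output
}
```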
13 changes: 7 additions & 6 deletions src/plugins/intel_gpu/src/graph/primitive_inst.cpp
@@ -684,7 +684,9 @@ event::ptr primitive_inst::execute(const std::vector<event::ptr>& events) {
dependencies = events;
} else {
auto queue_type = get_network().get_stream().get_queue_type();
if (queue_type == QueueTypes::out_of_order) {
// Prepare dependency events in case of an OOO queue, a CPU implementation,
// or an optimized-out impl that has CPU users (the needs_completion_event() && !is_output() condition)
if (queue_type == QueueTypes::out_of_order || _impl->is_cpu() || (can_be_optimized() && needs_completion_event() && !is_output())) {
dependencies.reserve(dependencies.size() + _exec_deps.size());
for (auto& input : _exec_deps) {
auto id = input->id();
@@ -803,7 +805,8 @@ primitive_inst::primitive_inst(network& network, program_node const& node, bool
, _fused_mem_offset((_fused_mem_count > 0 && node.has_fused_dep()) ? node.get_first_fused_dep_idx() : 0)
, _can_be_optimized(node.can_be_optimized())
, _can_share_buffer(node.can_share_buffer())
, _is_constant(node.is_constant()) {
, _is_constant(node.is_constant())
, _needs_completion_event(is_any_user_cpu(node.get_users()) || node.is_output()) {
if (allocate_memory) {
// In case when output is a mutable_data primitive, and other users' dependencies are only used for
// synchronization, the output memory of such primitive will be fused with mutable_data
@@ -1392,9 +1395,7 @@ void primitive_inst::save(cldnn::BinaryOutputBuffer& ob) const {
ob << can_be_optimized();
ob << can_share_buffer();
ob << is_constant();
auto users = get_node().get_users();
bool is_output_event = is_any_user_cpu(users) || get_node().is_output();
ob << is_output_event;
ob << needs_completion_event();

if (type() == cldnn::data::type_id()) {
return;
@@ -1485,7 +1486,7 @@ void primitive_inst::load(cldnn::BinaryInputBuffer& ib) {
ib >> _can_be_optimized;
ib >> _can_share_buffer;
ib >> _is_constant;
ib >> _is_output_event;
ib >> _needs_completion_event;

if (type() == cldnn::data::type_id()) {
return;
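Note: the flag is now computed once in the constructor instead of being re-derived at save time. Below is a small self-contained sketch of that predicate with a stand-in node type; `is_any_user_cpu` here is a hypothetical mirror of the helper referenced in the diff.

```cpp
#include <algorithm>
#include <vector>

// Stand-in node description for illustration only.
struct node_t {
    bool runs_on_cpu = false;
    bool is_output = false;
    std::vector<const node_t*> users;
};

// Hypothetical mirror of the is_any_user_cpu() helper used in the real code.
bool is_any_user_cpu(const std::vector<const node_t*>& users) {
    return std::any_of(users.begin(), users.end(),
                       [](const node_t* u) { return u->runs_on_cpu; });
}

// Mirrors the _needs_completion_event initializer added in the constructor above.
bool needs_completion_event(const node_t& node) {
    return is_any_user_cpu(node.users) || node.is_output;
}
```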
9 changes: 7 additions & 2 deletions src/plugins/intel_gpu/src/runtime/ocl/ocl_stream.cpp
@@ -322,8 +322,13 @@ void ocl_stream::enqueue_barrier() {
}

event::ptr ocl_stream::enqueue_marker(std::vector<event::ptr> const& deps, bool is_output) {
if (deps.empty())
return std::make_shared<ocl_user_event>(_engine.get_cl_context(), true);
// Wait for all previously enqueued tasks if deps list is empty
if (deps.empty()) {
cl::Event ret_ev;
_command_queue.enqueueMarkerWithWaitList(nullptr, &ret_ev);

return std::make_shared<ocl_event>(ret_ev);
}

if (sync_method == sync_methods::events) {
cl::Event ret_ev;
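Note: the behavior relied on here is standard OpenCL — a marker enqueued with a null/empty wait list completes only after all previously enqueued commands on the queue. Below is a standalone sketch using the OpenCL C++ bindings (error handling omitted; assumes an OpenCL 1.2+ device and an in-order default queue; the `done` event name is illustrative).

```cpp
#define CL_HPP_MINIMUM_OPENCL_VERSION 120
#define CL_HPP_TARGET_OPENCL_VERSION 300
#include <CL/opencl.hpp>

int main() {
    // Default context and an in-order command queue (no out-of-order property requested).
    cl::Context ctx = cl::Context::getDefault();
    cl::CommandQueue queue(ctx, cl::Device::getDefault());

    // ... kernels and copies enqueued here execute in submission order ...

    // A marker with no wait list: its event completes only after everything
    // previously enqueued on this queue has finished, so waiting on it
    // synchronizes the whole queue without a full queue.finish().
    cl::Event done;
    queue.enqueueMarkerWithWaitList(nullptr, &done);
    done.wait();
    return 0;
}
```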
149 changes: 149 additions & 0 deletions src/plugins/intel_gpu/tests/unit/module_tests/network_test.cpp
@@ -7,8 +7,13 @@
#include "intel_gpu/graph/network.hpp"
#include "intel_gpu/primitives/input_layout.hpp"
#include "intel_gpu/primitives/data.hpp"
#include "intel_gpu/primitives/activation.hpp"
#include "intel_gpu/primitives/broadcast.hpp"
#include "intel_gpu/primitives/concatenation.hpp"
#include "intel_gpu/primitives/reorder.hpp"
#include "intel_gpu/primitives/reshape.hpp"

#include "runtime/ocl/ocl_event.hpp"

#include <memory>

@@ -60,3 +65,147 @@ TEST(network_test, model_with_dynamic_input_is_dynamic) {

ASSERT_TRUE(net.is_dynamic());
}

TEST(network_test, has_proper_event_for_in_order_queue) {
auto& engine = get_test_engine();
layout in_layout{{1, 2, 2, 4}, data_types::f32, format::bfyx};
auto input_mem = engine.allocate_memory(in_layout);
auto const_mem = engine.allocate_memory({{1, 2, 2, 4}, data_types::f32, format::bfyx});

topology topology;
topology.add(input_layout("input1", in_layout));
topology.add(data("input2", const_mem));
topology.add(activation("activation1", input_info("input1"), activation_func::clamp, {-10.f, 10.f}));
topology.add(concatenation("concat", { input_info("activation1"), input_info("input2") }, 1));
topology.add(reorder("reorder", input_info("concat"), in_layout));
topology.add(activation("activation2", input_info("concat"), activation_func::relu));

auto impl_desc = ov::intel_gpu::ImplementationDesc{format::bfyx, "", impl_types::cpu};
auto impl_forcing_map = ov::intel_gpu::ImplForcingMap{{"activation2", impl_desc}};

auto config = get_test_default_config(engine);
config.set_property(ov::intel_gpu::queue_type(QueueTypes::in_order));
config.set_property(ov::intel_gpu::allow_new_shape_infer(true));
config.set_property(ov::intel_gpu::force_implementations(impl_forcing_map));

network net(engine, topology, config);

net.set_input_data("input1", input_mem);
net.execute();

ASSERT_FALSE(net.has_event("activation1"));
ASSERT_TRUE(net.has_event("concat"));
ASSERT_TRUE(net.has_event("reorder"));
ASSERT_TRUE(net.has_event("activation2"));

auto concat_ev = net.get_primitive_event("concat");
auto reorder_ev = net.get_primitive_event("reorder");
auto activation_ev = net.get_primitive_event("activation2");

ASSERT_NO_THROW(downcast<ocl::ocl_base_event>(concat_ev.get()));
ASSERT_NO_THROW(downcast<ocl::ocl_base_event>(reorder_ev.get()));
ASSERT_NO_THROW(downcast<ocl::ocl_base_event>(activation_ev.get()));

// Check if we have real underlying OpenCL events
ASSERT_TRUE(downcast<ocl::ocl_base_event>(concat_ev.get())->get().get() != nullptr);
ASSERT_TRUE(downcast<ocl::ocl_base_event>(reorder_ev.get())->get().get() != nullptr);
ASSERT_TRUE(downcast<ocl::ocl_base_event>(activation_ev.get())->get().get() != nullptr);
}

TEST(network_test, has_proper_event_for_in_order_queue_optimized_out) {
auto& engine = get_test_engine();
layout in_layout{{1, 2, 2, 4}, data_types::f32, format::bfyx};
auto input_mem = engine.allocate_memory(in_layout);
auto const_mem = engine.allocate_memory({{1, 2, 2, 4}, data_types::f32, format::bfyx});

topology topology;
topology.add(input_layout("input1", in_layout));
topology.add(data("input2", const_mem));
topology.add(concatenation("concat", { input_info("input1"), input_info("input2") }, 1));
topology.add(reshape("reshape", input_info("concat"), false, {1, 2, 4, 4}, {1, 2, 4, 4}));
topology.add(reorder("reorder", input_info("reshape"), in_layout));
topology.add(activation("activation", input_info("reshape"), activation_func::relu));

auto impl_desc = ov::intel_gpu::ImplementationDesc{format::bfyx, "", impl_types::cpu};
auto impl_forcing_map = ov::intel_gpu::ImplForcingMap{{"activation", impl_desc}};

auto config = get_test_default_config(engine);
config.set_property(ov::intel_gpu::queue_type(QueueTypes::in_order));
config.set_property(ov::intel_gpu::allow_new_shape_infer(true));
config.set_property(ov::intel_gpu::force_implementations(impl_forcing_map));

network net(engine, topology, config);

net.set_input_data("input1", input_mem);
net.execute();

ASSERT_TRUE(net.has_event("concat"));
ASSERT_TRUE(net.has_event("reshape"));
ASSERT_TRUE(net.has_event("reorder"));
ASSERT_TRUE(net.has_event("activation"));

auto concat_ev = net.get_primitive_event("concat");
auto reshape_ev = net.get_primitive_event("reshape");
auto reorder_ev = net.get_primitive_event("reorder");
auto activation_ev = net.get_primitive_event("activation");

ASSERT_NO_THROW(downcast<ocl::ocl_base_event>(concat_ev.get()));
ASSERT_NO_THROW(downcast<ocl::ocl_base_event>(reshape_ev.get()));
ASSERT_NO_THROW(downcast<ocl::ocl_base_event>(reorder_ev.get()));
ASSERT_NO_THROW(downcast<ocl::ocl_base_event>(activation_ev.get()));

// Check if we have real underlying OpenCL events
ASSERT_TRUE(downcast<ocl::ocl_base_event>(concat_ev.get())->get().get() != nullptr);
ASSERT_TRUE(downcast<ocl::ocl_base_event>(reshape_ev.get())->get().get() != nullptr);
ASSERT_TRUE(downcast<ocl::ocl_base_event>(reorder_ev.get())->get().get() != nullptr);
ASSERT_TRUE(downcast<ocl::ocl_base_event>(activation_ev.get())->get().get() != nullptr);
}

#ifdef ENABLE_ONEDNN_FOR_GPU
TEST(network_test, has_proper_event_for_in_order_queue_onednn) {
auto& engine = get_test_engine();
if (!engine.get_device_info().supports_immad)
return;

layout in_layout{{1, 16, 2, 4}, data_types::f32, format::bfyx};
auto input_mem = engine.allocate_memory(in_layout);
auto weights = engine.allocate_memory({{16, 16, 1, 1}, data_types::f32, format::bfyx});

topology topology;
topology.add(input_layout("input", in_layout));
topology.add(data("weights", weights));
topology.add(convolution("conv", input_info("input"), "weights", "", 1, {1, 1}, {1, 1}, {0, 0}, {0, 0}, false));
topology.add(activation("activation", input_info("conv"), activation_func::relu));
topology.add(reorder("reorder", input_info("conv"), in_layout));

auto impl_desc_cpu = ov::intel_gpu::ImplementationDesc{format::bfyx, "", impl_types::cpu};
auto impl_desc_onednn = ov::intel_gpu::ImplementationDesc{format::bfyx, "", impl_types::onednn};
auto impl_forcing_map = ov::intel_gpu::ImplForcingMap{{"conv", impl_desc_onednn}, {"activation", impl_desc_cpu}};

auto config = get_test_default_config(engine);
config.set_property(ov::intel_gpu::queue_type(QueueTypes::in_order));
config.set_property(ov::intel_gpu::allow_new_shape_infer(true));
config.set_property(ov::intel_gpu::force_implementations(impl_forcing_map));

network net(engine, topology, config);
net.set_input_data("input", input_mem);
net.execute();

ASSERT_TRUE(net.has_event("conv"));
ASSERT_TRUE(net.has_event("reorder"));
ASSERT_TRUE(net.has_event("activation"));

auto conv_ev = net.get_primitive_event("conv");
auto reorder_ev = net.get_primitive_event("reorder");
auto activation_ev = net.get_primitive_event("activation");

ASSERT_NO_THROW(downcast<ocl::ocl_base_event>(conv_ev.get()));
ASSERT_NO_THROW(downcast<ocl::ocl_base_event>(reorder_ev.get()));
ASSERT_NO_THROW(downcast<ocl::ocl_base_event>(activation_ev.get()));

// Check if we have real underlying OpenCL events
ASSERT_TRUE(downcast<ocl::ocl_base_event>(conv_ev.get())->get().get() != nullptr);
ASSERT_TRUE(downcast<ocl::ocl_base_event>(reorder_ev.get())->get().get() != nullptr);
ASSERT_TRUE(downcast<ocl::ocl_base_event>(activation_ev.get())->get().get() != nullptr);
}
#endif