From 207caba39f0fe1b69c13feaa0293a8e528ce1166 Mon Sep 17 00:00:00 2001 From: Michael Demoret Date: Fri, 4 Nov 2022 12:09:23 -0600 Subject: [PATCH 1/3] Fix throwing errors from Executor.join_async() --- python/srf/_pysrf/src/executor.cpp | 13 +- python/srf/tests/CMakeLists.txt | 4 + python/srf/tests/utils.cpp | 51 ++++++++ python/tests/test_executor.py | 187 +++++++++++++++++++++++++++++ 4 files changed, 253 insertions(+), 2 deletions(-) create mode 100644 python/srf/tests/utils.cpp create mode 100644 python/tests/test_executor.py diff --git a/python/srf/_pysrf/src/executor.cpp b/python/srf/_pysrf/src/executor.cpp index b09de9cc8..04883ab96 100644 --- a/python/srf/_pysrf/src/executor.cpp +++ b/python/srf/_pysrf/src/executor.cpp @@ -191,9 +191,15 @@ void Executor::stop() void Executor::join() { - // Release the GIL before blocking - py::gil_scoped_release nogil; + { + // Release the GIL before blocking + py::gil_scoped_release nogil; + // Wait without the GIL + m_join_future.wait(); + } + + // Call get() with the GIL to rethrow any exceptions m_join_future.get(); } @@ -207,6 +213,9 @@ std::shared_ptr Executor::join_async() // Grab the GIL to return a py::object py::gil_scoped_acquire gil; + // Once we have the GIL, call get() to propagate any exceptions + this->m_join_future.get(); + return py::none(); }); diff --git a/python/srf/tests/CMakeLists.txt b/python/srf/tests/CMakeLists.txt index 5c6565d6b..f5a71152c 100644 --- a/python/srf/tests/CMakeLists.txt +++ b/python/srf/tests/CMakeLists.txt @@ -19,4 +19,8 @@ srf_add_pybind11_module(test_edges_cpp SOURCE_FILES test_edges.cpp ) +srf_add_pybind11_module(utils + SOURCE_FILES utils.cpp +) + list(POP_BACK CMAKE_MESSAGE_CONTEXT) diff --git a/python/srf/tests/utils.cpp b/python/srf/tests/utils.cpp new file mode 100644 index 000000000..33e54634b --- /dev/null +++ b/python/srf/tests/utils.cpp @@ -0,0 +1,51 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "pysrf/utils.hpp" + +#include +#include + +namespace srf::pytests { + +namespace py = pybind11; + +PYBIND11_MODULE(utils, m) +{ + m.doc() = R"pbdoc()pbdoc"; + + pysrf::import(m, "srf"); + + m.def( + "throw_cpp_error", + [](std::string msg = "") { + if (msg.empty()) + { + msg = "Exception from C++ code"; + } + + throw std::runtime_error(msg); + }, + py::arg("msg") = ""); + +#ifdef VERSION_INFO + m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO); +#else + m.attr("__version__") = "dev"; +#endif +} +} // namespace srf::pytests diff --git a/python/tests/test_executor.py b/python/tests/test_executor.py new file mode 100644 index 000000000..367813f5f --- /dev/null +++ b/python/tests/test_executor.py @@ -0,0 +1,187 @@ +# SPDX-FileCopyrightText: Copyright (c) 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import typing + +import pytest + +import srf +from srf.tests.utils import throw_cpp_error + + +def pairwise(t): + it = iter(t) + return zip(it, it) + + +node_fn_type = typing.Callable[[srf.Builder], srf.SegmentObject] + + +@pytest.fixture +def source_pyexception(): + + def build(builder: srf.Builder): + + def gen_data_and_raise(): + yield 1 + yield 2 + yield 3 + + raise RuntimeError("Raised python error") + + return builder.make_source("source", gen_data_and_raise) + + return build + + +@pytest.fixture +def source_cppexception(): + + def build(builder: srf.Builder): + + def gen_data_and_raise(): + yield 1 + yield 2 + yield 3 + + throw_cpp_error() + + return builder.make_source("source", gen_data_and_raise) + + return build + + +@pytest.fixture +def sink(): + + def build(builder: srf.Builder): + + def sink_on_next(data): + print("Got value: {}".format(data)) + + return builder.make_sink("sink", sink_on_next, None, None) + + return build + + +@pytest.fixture +def build_pipeline(): + + def inner(*node_fns: node_fn_type): + + def init_segment(builder: srf.Builder): + + created_nodes = [] + + # Loop over node creation functions + for n in node_fns: + created_nodes.append(n(builder)) + + # For each pair, call make_edge + for source, sink in pairwise(created_nodes): + builder.make_edge(source, sink) + + pipe = srf.Pipeline() + + pipe.make_segment("TestSegment11", init_segment) + + return pipe + + return inner + + +build_pipeline_type = typing.Callable[[typing.Tuple[node_fn_type, ...]], srf.Pipeline] + + +@pytest.fixture +def build_executor(): + + def inner(pipe: srf.Pipeline): + options = srf.Options() + + executor = srf.Executor(options) + executor.register_pipeline(pipe) + + executor.start() + + return executor + + return inner + + +build_executor_type = typing.Callable[[srf.Pipeline], srf.Executor] + + +def test_pyexception_in_source(source_pyexception: node_fn_type, + sink: node_fn_type, + build_pipeline: build_pipeline_type, + build_executor: build_executor_type): + + pipe = build_pipeline(source_pyexception, sink) + + executor = build_executor(pipe) + + with pytest.raises(RuntimeError): + executor.join() + + +def test_cppexception_in_source(source_cppexception: node_fn_type, + sink: node_fn_type, + build_pipeline: build_pipeline_type, + build_executor: build_executor_type): + + pipe = build_pipeline(source_cppexception, sink) + + executor = build_executor(pipe) + + with pytest.raises(RuntimeError): + executor.join() + + +def test_pyexception_in_source_async(source_pyexception: node_fn_type, + sink: node_fn_type, + build_pipeline: build_pipeline_type, + build_executor: build_executor_type): + + pipe = build_pipeline(source_pyexception, sink) + + async def run_pipeline(): + executor = build_executor(pipe) + + with pytest.raises(RuntimeError): + await executor.join_async() + + asyncio.run(run_pipeline()) + + +def test_cppexception_in_source_async(source_cppexception: node_fn_type, + sink: node_fn_type, + build_pipeline: build_pipeline_type, + build_executor: build_executor_type): + + pipe = build_pipeline(source_cppexception, sink) + + async def run_pipeline(): + executor = build_executor(pipe) + + with pytest.raises(RuntimeError): + await executor.join_async() + + asyncio.run(run_pipeline()) + + +if (__name__ in ("__main__", )): + test_pyexception_in_source() From a02c0bc37ab5abcf9c2c7bf22f79b24d9e107522 Mon Sep 17 00:00:00 2001 From: Michael Demoret Date: Fri, 4 Nov 2022 12:39:50 -0600 Subject: [PATCH 2/3] Fixing style --- python/srf/tests/utils.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/srf/tests/utils.cpp b/python/srf/tests/utils.cpp index 33e54634b..b3e71b931 100644 --- a/python/srf/tests/utils.cpp +++ b/python/srf/tests/utils.cpp @@ -20,6 +20,8 @@ #include #include +#include + namespace srf::pytests { namespace py = pybind11; From a31efc04cafcff7d378bcf5c92a1067d63e9cdc8 Mon Sep 17 00:00:00 2001 From: Michael Demoret Date: Fri, 4 Nov 2022 14:03:44 -0600 Subject: [PATCH 3/3] Incorporating feedback. --- python/srf/tests/utils.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/srf/tests/utils.cpp b/python/srf/tests/utils.cpp index b3e71b931..46dbb727d 100644 --- a/python/srf/tests/utils.cpp +++ b/python/srf/tests/utils.cpp @@ -26,13 +26,13 @@ namespace srf::pytests { namespace py = pybind11; -PYBIND11_MODULE(utils, m) +PYBIND11_MODULE(utils, module) { - m.doc() = R"pbdoc()pbdoc"; + module.doc() = R"pbdoc()pbdoc"; - pysrf::import(m, "srf"); + pysrf::import(module, "srf"); - m.def( + module.def( "throw_cpp_error", [](std::string msg = "") { if (msg.empty()) @@ -47,7 +47,7 @@ PYBIND11_MODULE(utils, m) #ifdef VERSION_INFO m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO); #else - m.attr("__version__") = "dev"; + module.attr("__version__") = "dev"; #endif } } // namespace srf::pytests