diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 574519ac41..9e906d103e 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -161,6 +161,8 @@ namespace rosbag2_py class Player { public: + using SignalHandlerType = void (*)(int); + explicit Player(const std::string & log_level = "info") { Arguments arguments({"--ros-args", "--log-level", log_level}); @@ -172,24 +174,17 @@ class Player rclcpp::shutdown(); } + static void cancel() + { + exit_ = true; + wait_for_exit_cv_.notify_all(); + } + void play( const rosbag2_storage::StorageOptions & storage_options, PlayOptions & play_options) { - auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options); - auto player = std::make_shared( - std::move(reader), storage_options, play_options); - - rclcpp::executors::SingleThreadedExecutor exec; - exec.add_node(player); - auto spin_thread = std::thread( - [&exec]() { - exec.spin(); - }); - player->play(); - - exec.cancel(); - spin_thread.join(); + play_impl(storage_options, play_options, false); } void burst( @@ -197,31 +192,134 @@ class Player PlayOptions & play_options, size_t num_messages) { - auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options); - auto player = std::make_shared( - std::move(reader), storage_options, play_options); - - rclcpp::executors::SingleThreadedExecutor exec; - exec.add_node(player); - auto spin_thread = std::thread( - [&exec]() { - exec.spin(); - }); - auto play_thread = std::thread( - [&player]() { - player->play(); - }); - player->burst(num_messages); - - exec.cancel(); - spin_thread.join(); - play_thread.join(); + play_impl(storage_options, play_options, true, num_messages); + } + +protected: + static void signal_handler(int sig_num) + { + if (sig_num == SIGINT || sig_num == SIGTERM) { + deferred_sig_number_ = sig_num; + rosbag2_py::Player::cancel(); + } + } + + static void install_signal_handlers() + { + deferred_sig_number_ = -1; + old_sigterm_handler_ = std::signal(SIGTERM, &rosbag2_py::Player::signal_handler); + old_sigint_handler_ = std::signal(SIGINT, &rosbag2_py::Player::signal_handler); + } + + static void uninstall_signal_handlers() + { + if (old_sigterm_handler_ != SIG_ERR) { + std::signal(SIGTERM, old_sigterm_handler_); + old_sigterm_handler_ = SIG_ERR; + } + if (old_sigint_handler_ != SIG_ERR) { + std::signal(SIGINT, old_sigint_handler_); + old_sigint_handler_ = SIG_ERR; + } + deferred_sig_number_ = -1; + } + + static void process_deferred_signal() + { + auto call_signal_handler = [](const SignalHandlerType & signal_handler, int sig_num) { + if (signal_handler != SIG_ERR && signal_handler != SIG_IGN && signal_handler != SIG_DFL) { + signal_handler(sig_num); + } + }; + + if (deferred_sig_number_ == SIGINT) { + call_signal_handler(old_sigint_handler_, deferred_sig_number_); + } else if (deferred_sig_number_ == SIGTERM) { + call_signal_handler(old_sigterm_handler_, deferred_sig_number_); + } } + + void play_impl( + const rosbag2_storage::StorageOptions & storage_options, + PlayOptions & play_options, + bool burst = false, + size_t burst_num_messages = 0) + { + exit_ = false; + install_signal_handlers(); + try { + auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options); + auto player = std::make_shared( + std::move(reader), storage_options, play_options); + + rclcpp::executors::SingleThreadedExecutor exec; + exec.add_node(player); + auto spin_thread = std::thread( + [&exec]() { + exec.spin(); + }); + + auto wait_for_exit_thread = std::thread( + [&]() { + std::unique_lock lock(wait_for_exit_mutex_); + wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Player::exit_.load();}); + player->stop(); + }); + { + // Release the GIL for long-running play, so that calling Python code + // can use other threads + py::gil_scoped_release release; + if (burst) { + auto play_thread = std::thread( + [&player]() { + player->play(); + }); + player->burst(burst_num_messages); + rosbag2_py::Player::cancel(); // Need to trigger exit from wait_for_exit_thread + play_thread.join(); + } else { + player->play(); + rosbag2_py::Player::cancel(); // Need to trigger exit from wait_for_exit_thread + } + } + + if (wait_for_exit_thread.joinable()) { + wait_for_exit_thread.join(); + } + + exec.cancel(); + if (spin_thread.joinable()) { + spin_thread.join(); + } + exec.remove_node(player); + } catch (...) { + process_deferred_signal(); + uninstall_signal_handlers(); + throw; + } + process_deferred_signal(); + uninstall_signal_handlers(); + } + + static std::atomic_bool exit_; + static std::condition_variable wait_for_exit_cv_; + static SignalHandlerType old_sigint_handler_; + static SignalHandlerType old_sigterm_handler_; + static int deferred_sig_number_; + std::mutex wait_for_exit_mutex_; }; +Player::SignalHandlerType Player::old_sigint_handler_ {SIG_ERR}; +Player::SignalHandlerType Player::old_sigterm_handler_ {SIG_ERR}; +int Player::deferred_sig_number_{-1}; +std::atomic_bool Player::exit_{false}; +std::condition_variable Player::wait_for_exit_cv_{}; + class Recorder { public: + using SignalHandlerType = void (*)(int); + explicit Recorder(const std::string & log_level = "info") { Arguments arguments({"--ros-args", "--log-level", log_level}); @@ -238,15 +336,7 @@ class Recorder RecordOptions & record_options, std::string & node_name) { - auto old_sigterm_handler = std::signal( - SIGTERM, [](int /* signal */) { - rosbag2_py::Recorder::cancel(); - }); - auto old_sigint_handler = std::signal( - SIGINT, [](int /* signal */) { - rosbag2_py::Recorder::cancel(); - }); - + install_signal_handlers(); try { exit_ = false; auto exec = std::make_unique(); @@ -280,22 +370,12 @@ class Recorder } exec->remove_node(recorder); } catch (...) { - // Return old signal handlers anyway - if (old_sigterm_handler != SIG_ERR) { - std::signal(SIGTERM, old_sigterm_handler); - } - if (old_sigint_handler != SIG_ERR) { - std::signal(SIGTERM, old_sigint_handler); - } + process_deferred_signal(); + uninstall_signal_handlers(); throw; } - // Return old signal handlers - if (old_sigterm_handler != SIG_ERR) { - std::signal(SIGTERM, old_sigterm_handler); - } - if (old_sigint_handler != SIG_ERR) { - std::signal(SIGTERM, old_sigint_handler); - } + process_deferred_signal(); + uninstall_signal_handlers(); } static void cancel() @@ -305,11 +385,60 @@ class Recorder } protected: + static void signal_handler(int sig_num) + { + if (sig_num == SIGINT || sig_num == SIGTERM) { + deferred_sig_number_ = sig_num; + rosbag2_py::Recorder::cancel(); + } + } + + static void install_signal_handlers() + { + deferred_sig_number_ = -1; + old_sigterm_handler_ = std::signal(SIGTERM, &rosbag2_py::Recorder::signal_handler); + old_sigint_handler_ = std::signal(SIGINT, &rosbag2_py::Recorder::signal_handler); + } + + static void uninstall_signal_handlers() + { + if (old_sigterm_handler_ != SIG_ERR) { + std::signal(SIGTERM, old_sigterm_handler_); + old_sigterm_handler_ = SIG_ERR; + } + if (old_sigint_handler_ != SIG_ERR) { + std::signal(SIGINT, old_sigint_handler_); + old_sigint_handler_ = SIG_ERR; + } + deferred_sig_number_ = -1; + } + + static void process_deferred_signal() + { + auto call_signal_handler = [](const SignalHandlerType & signal_handler, int sig_num) { + if (signal_handler != SIG_ERR && signal_handler != SIG_IGN && signal_handler != SIG_DFL) { + signal_handler(sig_num); + } + }; + + if (deferred_sig_number_ == SIGINT) { + call_signal_handler(old_sigint_handler_, deferred_sig_number_); + } else if (deferred_sig_number_ == SIGTERM) { + call_signal_handler(old_sigterm_handler_, deferred_sig_number_); + } + } + static std::atomic_bool exit_; static std::condition_variable wait_for_exit_cv_; + static SignalHandlerType old_sigint_handler_; + static SignalHandlerType old_sigterm_handler_; + static int deferred_sig_number_; std::mutex wait_for_exit_mutex_; }; +Recorder::SignalHandlerType Recorder::old_sigint_handler_ {SIG_ERR}; +Recorder::SignalHandlerType Recorder::old_sigterm_handler_ {SIG_ERR}; +int Recorder::deferred_sig_number_{-1}; std::atomic_bool Recorder::exit_{false}; std::condition_variable Recorder::wait_for_exit_cv_{}; @@ -430,7 +559,10 @@ PYBIND11_MODULE(_transport, m) { .def(py::init<>()) .def(py::init()) .def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options")) - .def("burst", &rosbag2_py::Player::burst) + .def( + "burst", &rosbag2_py::Player::burst, py::arg("storage_options"), py::arg("play_options"), + py::arg("num_messages")) + .def_static("cancel", &rosbag2_py::Player::cancel) ; py::class_(m, "Recorder") diff --git a/rosbag2_py/test/test_transport.py b/rosbag2_py/test/test_transport.py index 4fa6d194b6..1c4c45742c 100644 --- a/rosbag2_py/test/test_transport.py +++ b/rosbag2_py/test/test_transport.py @@ -13,6 +13,8 @@ # limitations under the License. import datetime +from pathlib import Path +import re import threading from common import get_rosbag_options, wait_for @@ -26,6 +28,10 @@ from std_msgs.msg import String +RESOURCES_PATH = Path(__file__).parent / 'resources' +PLAYBACK_UNTIL_TIMESTAMP_REGEX_STRING = r'\[rosbag2_player]: Playback until timestamp: -1' + + def test_options_qos_conversion(): # Tests that the to-and-from C++ conversions are working properly in the pybind structs simple_overrides = { @@ -106,3 +112,38 @@ def test_record_cancel(tmp_path, storage_id): storage_path = bag_path / metadata.relative_file_paths[0] assert wait_for(lambda: storage_path.is_file(), timeout=rclpy.duration.Duration(seconds=3)) + + +@pytest.mark.parametrize('storage_id', TESTED_STORAGE_IDS) +def test_play_cancel(storage_id, capfd): + bag_path = str(RESOURCES_PATH / storage_id / 'talker') + storage_options, converter_options = get_rosbag_options(bag_path, storage_id) + + player = rosbag2_py.Player() + + play_options = rosbag2_py.PlayOptions() + play_options.loop = True + play_options.start_paused = True + + player_thread = threading.Thread( + target=player.play, + args=(storage_options, play_options), + daemon=True) + player_thread.start() + + def check_playback_start_output(cumulative_out, cumulative_err): + out, err = capfd.readouterr() + cumulative_err += err + cumulative_out += out + expected_string_regex = re.compile(PLAYBACK_UNTIL_TIMESTAMP_REGEX_STRING) + matches = expected_string_regex.search(cumulative_err) + return matches is not None + + captured_out = '' + captured_err = '' + assert wait_for(lambda: check_playback_start_output(captured_out, captured_err), + timeout=rclpy.duration.Duration(seconds=3)) + + player.cancel() + player_thread.join(3) + assert not player_thread.is_alive() diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 2623ec17d1..67d4891ec1 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -384,7 +384,8 @@ bool Player::play_next() return false; } - RCLCPP_INFO_STREAM(get_logger(), "Playing next message."); + // Use RCLCPP_DEBUG_STREAM to avoid delays in the burst mode + RCLCPP_DEBUG_STREAM(get_logger(), "Playing next message."); // Temporary take over playback from play_messages_from_queue() std::lock_guard main_play_loop_lk(skip_message_in_main_play_loop_mutex_); @@ -434,6 +435,7 @@ size_t Player::burst(const size_t num_messages) } } + RCLCPP_INFO_STREAM(get_logger(), "Burst " << messages_played << " messages."); return messages_played; }