diff --git a/rosbag2_py/rosbag2_py/_transport.pyi b/rosbag2_py/rosbag2_py/_transport.pyi new file mode 100644 index 0000000000..2d3f771100 --- /dev/null +++ b/rosbag2_py/rosbag2_py/_transport.pyi @@ -0,0 +1,67 @@ +from typing import Any, List + +import datetime +import rosbag2_py._storage + +class PlayOptions: + clock_publish_frequency: float + clock_publish_on_topic_publish: bool + clock_topics: List[str] + delay: float + disable_keyboard_controls: bool + disable_loan_message: bool + loop: bool + node_prefix: str + playback_duration: float + playback_until_timestamp: int + rate: float + read_ahead_queue_size: int + start_offset: float + start_paused: bool + topic_qos_profile_overrides: dict + topic_remapping_options: List[str] + topics_regex_to_exclude: str + topics_regex_to_filter: str + topics_to_filter: List[str] + wait_acked_timeout: int + def __init__(self) -> None: ... + +class Player: + def __init__(self) -> None: ... + def burst(self, storage_options: rosbag2_py._storage.StorageOptions, play_options: PlayOptions, num_messages: int) -> None: ... + def cancel(self, *args, **kwargs) -> Any: ... + def play(self, storage_options: rosbag2_py._storage.StorageOptions, play_options: PlayOptions) -> None: ... + +class RecordOptions: + all_services: bool + all_topics: bool + compression_format: str + compression_mode: str + compression_queue_size: int + compression_threads: int + exclude_regex: str + exclude_service_events: List[str] + exclude_topic_types: List[str] + exclude_topics: List[str] + ignore_leaf_topics: bool + include_hidden_topics: bool + include_unpublished_topics: bool + is_discovery_disabled: bool + node_prefix: str + regex: str + rmw_serialization_format: str + services: List[str] + start_paused: bool + topic_polling_interval: datetime.timedelta + topic_qos_profile_overrides: dict + topic_types: List[str] + topics: List[str] + use_sim_time: bool + def __init__(self) -> None: ... + +class Recorder: + def __init__(self) -> None: ... + def cancel(self, *args, **kwargs) -> Any: ... + def record(self, storage_options: rosbag2_py._storage.StorageOptions, record_options: RecordOptions, node_name: str = ...) -> None: ... + +def bag_rewrite(arg0: List[rosbag2_py._storage.StorageOptions], arg1: str) -> None: ... diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index cc2d128657..08adcc3754 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -139,7 +139,13 @@ namespace rosbag2_py class Player { public: +<<<<<<< HEAD explicit Player(const std::string & log_level = "info") +======= + using SignalHandlerType = void (*)(int); + + Player() +>>>>>>> 32bd5c3a (Gracefully handle SIGINT and SIGTERM signals for play and burst CLI (#1557)) { Arguments arguments({"--ros-args", "--log-level", log_level}); rclcpp::init(arguments.argc(), arguments.argv()); @@ -150,10 +156,17 @@ class Player rclcpp::shutdown(); } + static void cancel() + { + exit_ = true; + wait_for_exit_cv_.notify_all(); + } + void play( const rosbag2_storage::StorageOptions & storage_options, PlayOptions & play_options) { +<<<<<<< HEAD auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options); auto player = std::make_shared( std::move(reader), storage_options, play_options); @@ -168,13 +181,141 @@ class Player exec.cancel(); spin_thread.join(); +======= + play_impl(storage_options, play_options, false); + } + + void burst( + const rosbag2_storage::StorageOptions & storage_options, + PlayOptions & play_options, + size_t num_messages) + { + play_impl(storage_options, play_options, true, num_messages); +>>>>>>> 32bd5c3a (Gracefully handle SIGINT and SIGTERM signals for play and burst CLI (#1557)) } + +protected: + static void signal_handler(int sig_num) + { + if (sig_num == SIGINT || sig_num == SIGTERM) { + deferred_sig_number_ = sig_num; + rosbag2_py::Player::cancel(); + } + } + + static void install_signal_handlers() + { + deferred_sig_number_ = -1; + old_sigterm_handler_ = std::signal(SIGTERM, &rosbag2_py::Player::signal_handler); + old_sigint_handler_ = std::signal(SIGINT, &rosbag2_py::Player::signal_handler); + } + + static void uninstall_signal_handlers() + { + if (old_sigterm_handler_ != SIG_ERR) { + std::signal(SIGTERM, old_sigterm_handler_); + old_sigterm_handler_ = SIG_ERR; + } + if (old_sigint_handler_ != SIG_ERR) { + std::signal(SIGINT, old_sigint_handler_); + old_sigint_handler_ = SIG_ERR; + } + deferred_sig_number_ = -1; + } + + static void process_deferred_signal() + { + auto call_signal_handler = [](const SignalHandlerType & signal_handler, int sig_num) { + if (signal_handler != SIG_ERR && signal_handler != SIG_IGN && signal_handler != SIG_DFL) { + signal_handler(sig_num); + } + }; + + if (deferred_sig_number_ == SIGINT) { + call_signal_handler(old_sigint_handler_, deferred_sig_number_); + } else if (deferred_sig_number_ == SIGTERM) { + call_signal_handler(old_sigterm_handler_, deferred_sig_number_); + } + } + + void play_impl( + const rosbag2_storage::StorageOptions & storage_options, + PlayOptions & play_options, + bool burst = false, + size_t burst_num_messages = 0) + { + install_signal_handlers(); + try { + auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options); + auto player = std::make_shared( + std::move(reader), storage_options, play_options); + + rclcpp::executors::SingleThreadedExecutor exec; + exec.add_node(player); + auto spin_thread = std::thread( + [&exec]() { + exec.spin(); + }); + player->play(); + + auto wait_for_exit_thread = std::thread( + [&]() { + std::unique_lock lock(wait_for_exit_mutex_); + wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Player::exit_.load();}); + player->stop(); + }); + { + // Release the GIL for long-running play, so that calling Python code + // can use other threads + py::gil_scoped_release release; + if (burst) { + player->burst(burst_num_messages); + } + player->wait_for_playback_to_finish(); + } + + rosbag2_py::Player::cancel(); // Need to trigger exit from wait_for_exit_thread + if (wait_for_exit_thread.joinable()) { + wait_for_exit_thread.join(); + } + + exec.cancel(); + if (spin_thread.joinable()) { + spin_thread.join(); + } + exec.remove_node(player); + } catch (...) { + process_deferred_signal(); + uninstall_signal_handlers(); + throw; + } + process_deferred_signal(); + uninstall_signal_handlers(); + } + + static std::atomic_bool exit_; + static std::condition_variable wait_for_exit_cv_; + static SignalHandlerType old_sigint_handler_; + static SignalHandlerType old_sigterm_handler_; + static int deferred_sig_number_; + std::mutex wait_for_exit_mutex_; }; +Player::SignalHandlerType Player::old_sigint_handler_ {SIG_ERR}; +Player::SignalHandlerType Player::old_sigterm_handler_ {SIG_ERR}; +int Player::deferred_sig_number_{-1}; +std::atomic_bool Player::exit_{false}; +std::condition_variable Player::wait_for_exit_cv_{}; + class Recorder { public: +<<<<<<< HEAD explicit Recorder(const std::string & log_level = "info") +======= + using SignalHandlerType = void (*)(int); + Recorder() +>>>>>>> 32bd5c3a (Gracefully handle SIGINT and SIGTERM signals for play and burst CLI (#1557)) { Arguments arguments({"--ros-args", "--log-level", log_level}); rclcpp::init(arguments.argc(), arguments.argv()); @@ -189,15 +330,7 @@ class Recorder const rosbag2_storage::StorageOptions & storage_options, RecordOptions & record_options) { - auto old_sigterm_handler = std::signal( - SIGTERM, [](int /* signal */) { - rosbag2_py::Recorder::cancel(); - }); - auto old_sigint_handler = std::signal( - SIGINT, [](int /* signal */) { - rosbag2_py::Recorder::cancel(); - }); - + install_signal_handlers(); try { exit_ = false; auto exec = std::make_unique(); @@ -231,22 +364,12 @@ class Recorder } exec->remove_node(recorder); } catch (...) { - // Return old signal handlers anyway - if (old_sigterm_handler != SIG_ERR) { - std::signal(SIGTERM, old_sigterm_handler); - } - if (old_sigint_handler != SIG_ERR) { - std::signal(SIGTERM, old_sigint_handler); - } + process_deferred_signal(); + uninstall_signal_handlers(); throw; } - // Return old signal handlers - if (old_sigterm_handler != SIG_ERR) { - std::signal(SIGTERM, old_sigterm_handler); - } - if (old_sigint_handler != SIG_ERR) { - std::signal(SIGTERM, old_sigint_handler); - } + process_deferred_signal(); + uninstall_signal_handlers(); } static void cancel() @@ -256,11 +379,60 @@ class Recorder } protected: + static void signal_handler(int sig_num) + { + if (sig_num == SIGINT || sig_num == SIGTERM) { + deferred_sig_number_ = sig_num; + rosbag2_py::Recorder::cancel(); + } + } + + static void install_signal_handlers() + { + deferred_sig_number_ = -1; + old_sigterm_handler_ = std::signal(SIGTERM, &rosbag2_py::Recorder::signal_handler); + old_sigint_handler_ = std::signal(SIGINT, &rosbag2_py::Recorder::signal_handler); + } + + static void uninstall_signal_handlers() + { + if (old_sigterm_handler_ != SIG_ERR) { + std::signal(SIGTERM, old_sigterm_handler_); + old_sigterm_handler_ = SIG_ERR; + } + if (old_sigint_handler_ != SIG_ERR) { + std::signal(SIGINT, old_sigint_handler_); + old_sigint_handler_ = SIG_ERR; + } + deferred_sig_number_ = -1; + } + + static void process_deferred_signal() + { + auto call_signal_handler = [](const SignalHandlerType & signal_handler, int sig_num) { + if (signal_handler != SIG_ERR && signal_handler != SIG_IGN && signal_handler != SIG_DFL) { + signal_handler(sig_num); + } + }; + + if (deferred_sig_number_ == SIGINT) { + call_signal_handler(old_sigint_handler_, deferred_sig_number_); + } else if (deferred_sig_number_ == SIGTERM) { + call_signal_handler(old_sigterm_handler_, deferred_sig_number_); + } + } + static std::atomic_bool exit_; static std::condition_variable wait_for_exit_cv_; + static SignalHandlerType old_sigint_handler_; + static SignalHandlerType old_sigterm_handler_; + static int deferred_sig_number_; std::mutex wait_for_exit_mutex_; }; +Recorder::SignalHandlerType Recorder::old_sigint_handler_ {SIG_ERR}; +Recorder::SignalHandlerType Recorder::old_sigterm_handler_ {SIG_ERR}; +int Recorder::deferred_sig_number_{-1}; std::atomic_bool Recorder::exit_{false}; std::condition_variable Recorder::wait_for_exit_cv_{}; @@ -355,8 +527,16 @@ PYBIND11_MODULE(_transport, m) { py::class_(m, "Player") .def(py::init()) +<<<<<<< HEAD .def(py::init()) .def("play", &rosbag2_py::Player::play) +======= + .def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options")) + .def( + "burst", &rosbag2_py::Player::burst, py::arg("storage_options"), py::arg("play_options"), + py::arg("num_messages")) + .def_static("cancel", &rosbag2_py::Player::cancel) +>>>>>>> 32bd5c3a (Gracefully handle SIGINT and SIGTERM signals for play and burst CLI (#1557)) ; py::class_(m, "Recorder") diff --git a/rosbag2_py/test/test_transport.py b/rosbag2_py/test/test_transport.py index d67c472d01..47dc88ec3c 100644 --- a/rosbag2_py/test/test_transport.py +++ b/rosbag2_py/test/test_transport.py @@ -13,8 +13,13 @@ # limitations under the License. import datetime +<<<<<<< HEAD import os import sys +======= +from pathlib import Path +import re +>>>>>>> 32bd5c3a (Gracefully handle SIGINT and SIGTERM signals for play and burst CLI (#1557)) import threading @@ -33,6 +38,10 @@ sys.setdlopenflags(os.RTLD_GLOBAL | os.RTLD_LAZY) +RESOURCES_PATH = Path(__file__).parent / 'resources' +PLAYBACK_UNTIL_TIMESTAMP_REGEX_STRING = r'\[rosbag2_player]: Playback until timestamp: -1' + + def test_options_qos_conversion(): # Tests that the to-and-from C++ conversions are working properly in the pybind structs simple_overrides = { @@ -106,3 +115,47 @@ def test_record_cancel(tmp_path): assert wait_for(lambda: metadata_path.is_file() and db3_path.is_file(), timeout=rclpy.duration.Duration(seconds=3)) record_thread.join() +<<<<<<< HEAD +======= + + metadata = metadata_io.read_metadata(str(bag_path)) + assert len(metadata.relative_file_paths) + storage_path = bag_path / metadata.relative_file_paths[0] + assert wait_for(lambda: storage_path.is_file(), + timeout=rclpy.duration.Duration(seconds=3)) + + +@pytest.mark.parametrize('storage_id', TESTED_STORAGE_IDS) +def test_play_cancel(storage_id, capfd): + bag_path = str(RESOURCES_PATH / storage_id / 'talker') + storage_options, converter_options = get_rosbag_options(bag_path, storage_id) + + player = rosbag2_py.Player() + + play_options = rosbag2_py.PlayOptions() + play_options.loop = True + play_options.start_paused = True + + player_thread = threading.Thread( + target=player.play, + args=(storage_options, play_options), + daemon=True) + player_thread.start() + + def check_playback_start_output(cumulative_out, cumulative_err): + out, err = capfd.readouterr() + cumulative_err += err + cumulative_out += out + expected_string_regex = re.compile(PLAYBACK_UNTIL_TIMESTAMP_REGEX_STRING) + matches = expected_string_regex.search(cumulative_err) + return matches is not None + + captured_out = '' + captured_err = '' + assert wait_for(lambda: check_playback_start_output(captured_out, captured_err), + timeout=rclpy.duration.Duration(seconds=3)) + + player.cancel() + player_thread.join(3) + assert not player_thread.is_alive() +>>>>>>> 32bd5c3a (Gracefully handle SIGINT and SIGTERM signals for play and burst CLI (#1557)) diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index c8e9e5f58f..075d8eb38d 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -333,7 +333,12 @@ bool Player::play_next() return false; } +<<<<<<< HEAD RCLCPP_INFO_STREAM(get_logger(), "Playing next message."); +======= + // Use RCLCPP_DEBUG_STREAM to avoid delays in the burst mode + RCLCPP_DEBUG_STREAM(owner_->get_logger(), "Playing next message."); +>>>>>>> 32bd5c3a (Gracefully handle SIGINT and SIGTERM signals for play and burst CLI (#1557)) // Temporary take over playback from play_messages_from_queue() std::lock_guard main_play_loop_lk(skip_message_in_main_play_loop_mutex_); @@ -371,6 +376,7 @@ size_t Player::burst(const size_t num_messages) } } + RCLCPP_INFO_STREAM(owner_->get_logger(), "Burst " << messages_played << " messages."); return messages_played; }