Skip to content

Commit

Permalink
Gracefully handle SIGINT and SIGTERM signals for play and burst CLI (#…
Browse files Browse the repository at this point in the history
…1557)

* Gracefully handle SIGINT and SIGTERM signals for play and burst CLI

- Intercept signals to call Player::stop() instead of relying on the
rclcpp::shutdown() in default signal handlers.
- Also added static Player::Cancel() method.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Add test_play_cancel to the test_transport.py

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Add missing imports in test_transport.py

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Regenerate Python stub files (.pyi) after altering API

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Add call for original deferred signal handler for Player and Recorder

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

---------

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
(cherry picked from commit 32bd5c3)

# Conflicts:
#	rosbag2_py/rosbag2_py/_transport.pyi
#	rosbag2_py/src/rosbag2_py/_transport.cpp
#	rosbag2_transport/src/rosbag2_transport/player.cpp
  • Loading branch information
MichaelOrlov authored and mergify[bot] committed Jun 4, 2024
1 parent 6050648 commit 9b7eb21
Show file tree
Hide file tree
Showing 4 changed files with 309 additions and 24 deletions.
67 changes: 67 additions & 0 deletions rosbag2_py/rosbag2_py/_transport.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import Any, List

import datetime
import rosbag2_py._storage

class PlayOptions:
clock_publish_frequency: float
clock_publish_on_topic_publish: bool
clock_topics: List[str]
delay: float
disable_keyboard_controls: bool
disable_loan_message: bool
loop: bool
node_prefix: str
playback_duration: float
playback_until_timestamp: int
rate: float
read_ahead_queue_size: int
start_offset: float
start_paused: bool
topic_qos_profile_overrides: dict
topic_remapping_options: List[str]
topics_regex_to_exclude: str
topics_regex_to_filter: str
topics_to_filter: List[str]
wait_acked_timeout: int
def __init__(self) -> None: ...

class Player:
def __init__(self) -> None: ...
def burst(self, storage_options: rosbag2_py._storage.StorageOptions, play_options: PlayOptions, num_messages: int) -> None: ...
def cancel(self, *args, **kwargs) -> Any: ...
def play(self, storage_options: rosbag2_py._storage.StorageOptions, play_options: PlayOptions) -> None: ...

class RecordOptions:
all_services: bool
all_topics: bool
compression_format: str
compression_mode: str
compression_queue_size: int
compression_threads: int
exclude_regex: str
exclude_service_events: List[str]
exclude_topic_types: List[str]
exclude_topics: List[str]
ignore_leaf_topics: bool
include_hidden_topics: bool
include_unpublished_topics: bool
is_discovery_disabled: bool
node_prefix: str
regex: str
rmw_serialization_format: str
services: List[str]
start_paused: bool
topic_polling_interval: datetime.timedelta
topic_qos_profile_overrides: dict
topic_types: List[str]
topics: List[str]
use_sim_time: bool
def __init__(self) -> None: ...

class Recorder:
def __init__(self) -> None: ...
def cancel(self, *args, **kwargs) -> Any: ...
def record(self, storage_options: rosbag2_py._storage.StorageOptions, record_options: RecordOptions, node_name: str = ...) -> None: ...

def bag_rewrite(arg0: List[rosbag2_py._storage.StorageOptions], arg1: str) -> None: ...
219 changes: 195 additions & 24 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,13 @@ namespace rosbag2_py
class Player
{
public:
<<<<<<< HEAD
explicit Player(const std::string & log_level = "info")
=======
using SignalHandlerType = void (*)(int);

Player()
>>>>>>> 32bd5c3a (Gracefully handle SIGINT and SIGTERM signals for play and burst CLI (#1557))
{
Arguments arguments({"--ros-args", "--log-level", log_level});
rclcpp::init(arguments.argc(), arguments.argv());
Expand All @@ -172,10 +178,17 @@ class Player
rclcpp::shutdown();
}

static void cancel()
{
exit_ = true;
wait_for_exit_cv_.notify_all();
}

void play(
const rosbag2_storage::StorageOptions & storage_options,
PlayOptions & play_options)
{
<<<<<<< HEAD
auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options);
auto player = std::make_shared<rosbag2_transport::Player>(
std::move(reader), storage_options, play_options);
Expand All @@ -190,13 +203,17 @@ class Player

exec.cancel();
spin_thread.join();
=======
play_impl(storage_options, play_options, false);
>>>>>>> 32bd5c3a (Gracefully handle SIGINT and SIGTERM signals for play and burst CLI (#1557))
}

void burst(
const rosbag2_storage::StorageOptions & storage_options,
PlayOptions & play_options,
size_t num_messages)
{
<<<<<<< HEAD
auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options);
auto player = std::make_shared<rosbag2_transport::Player>(
std::move(reader), storage_options, play_options);
Expand All @@ -216,13 +233,133 @@ class Player
exec.cancel();
spin_thread.join();
play_thread.join();
=======
play_impl(storage_options, play_options, true, num_messages);
>>>>>>> 32bd5c3a (Gracefully handle SIGINT and SIGTERM signals for play and burst CLI (#1557))
}

protected:
static void signal_handler(int sig_num)
{
if (sig_num == SIGINT || sig_num == SIGTERM) {
deferred_sig_number_ = sig_num;
rosbag2_py::Player::cancel();
}
}

static void install_signal_handlers()
{
deferred_sig_number_ = -1;
old_sigterm_handler_ = std::signal(SIGTERM, &rosbag2_py::Player::signal_handler);
old_sigint_handler_ = std::signal(SIGINT, &rosbag2_py::Player::signal_handler);
}

static void uninstall_signal_handlers()
{
if (old_sigterm_handler_ != SIG_ERR) {
std::signal(SIGTERM, old_sigterm_handler_);
old_sigterm_handler_ = SIG_ERR;
}
if (old_sigint_handler_ != SIG_ERR) {
std::signal(SIGINT, old_sigint_handler_);
old_sigint_handler_ = SIG_ERR;
}
deferred_sig_number_ = -1;
}

static void process_deferred_signal()
{
auto call_signal_handler = [](const SignalHandlerType & signal_handler, int sig_num) {
if (signal_handler != SIG_ERR && signal_handler != SIG_IGN && signal_handler != SIG_DFL) {
signal_handler(sig_num);
}
};

if (deferred_sig_number_ == SIGINT) {
call_signal_handler(old_sigint_handler_, deferred_sig_number_);
} else if (deferred_sig_number_ == SIGTERM) {
call_signal_handler(old_sigterm_handler_, deferred_sig_number_);
}
}

void play_impl(
const rosbag2_storage::StorageOptions & storage_options,
PlayOptions & play_options,
bool burst = false,
size_t burst_num_messages = 0)
{
install_signal_handlers();
try {
auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options);
auto player = std::make_shared<rosbag2_transport::Player>(
std::move(reader), storage_options, play_options);

rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(player);
auto spin_thread = std::thread(
[&exec]() {
exec.spin();
});
player->play();

auto wait_for_exit_thread = std::thread(
[&]() {
std::unique_lock<std::mutex> lock(wait_for_exit_mutex_);
wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Player::exit_.load();});
player->stop();
});
{
// Release the GIL for long-running play, so that calling Python code
// can use other threads
py::gil_scoped_release release;
if (burst) {
player->burst(burst_num_messages);
}
player->wait_for_playback_to_finish();
}

rosbag2_py::Player::cancel(); // Need to trigger exit from wait_for_exit_thread
if (wait_for_exit_thread.joinable()) {
wait_for_exit_thread.join();
}

exec.cancel();
if (spin_thread.joinable()) {
spin_thread.join();
}
exec.remove_node(player);
} catch (...) {
process_deferred_signal();
uninstall_signal_handlers();
throw;
}
process_deferred_signal();
uninstall_signal_handlers();
}

static std::atomic_bool exit_;
static std::condition_variable wait_for_exit_cv_;
static SignalHandlerType old_sigint_handler_;
static SignalHandlerType old_sigterm_handler_;
static int deferred_sig_number_;
std::mutex wait_for_exit_mutex_;
};

Player::SignalHandlerType Player::old_sigint_handler_ {SIG_ERR};
Player::SignalHandlerType Player::old_sigterm_handler_ {SIG_ERR};
int Player::deferred_sig_number_{-1};
std::atomic_bool Player::exit_{false};
std::condition_variable Player::wait_for_exit_cv_{};

class Recorder
{
public:
<<<<<<< HEAD
explicit Recorder(const std::string & log_level = "info")
=======
using SignalHandlerType = void (*)(int);
Recorder()
>>>>>>> 32bd5c3a (Gracefully handle SIGINT and SIGTERM signals for play and burst CLI (#1557))
{
Arguments arguments({"--ros-args", "--log-level", log_level});
rclcpp::init(arguments.argc(), arguments.argv());
Expand All @@ -238,15 +375,7 @@ class Recorder
RecordOptions & record_options,
std::string & node_name)
{
auto old_sigterm_handler = std::signal(
SIGTERM, [](int /* signal */) {
rosbag2_py::Recorder::cancel();
});
auto old_sigint_handler = std::signal(
SIGINT, [](int /* signal */) {
rosbag2_py::Recorder::cancel();
});

install_signal_handlers();
try {
exit_ = false;
auto exec = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
Expand Down Expand Up @@ -280,22 +409,12 @@ class Recorder
}
exec->remove_node(recorder);
} catch (...) {
// Return old signal handlers anyway
if (old_sigterm_handler != SIG_ERR) {
std::signal(SIGTERM, old_sigterm_handler);
}
if (old_sigint_handler != SIG_ERR) {
std::signal(SIGTERM, old_sigint_handler);
}
process_deferred_signal();
uninstall_signal_handlers();
throw;
}
// Return old signal handlers
if (old_sigterm_handler != SIG_ERR) {
std::signal(SIGTERM, old_sigterm_handler);
}
if (old_sigint_handler != SIG_ERR) {
std::signal(SIGTERM, old_sigint_handler);
}
process_deferred_signal();
uninstall_signal_handlers();
}

static void cancel()
Expand All @@ -305,11 +424,60 @@ class Recorder
}

protected:
static void signal_handler(int sig_num)
{
if (sig_num == SIGINT || sig_num == SIGTERM) {
deferred_sig_number_ = sig_num;
rosbag2_py::Recorder::cancel();
}
}

static void install_signal_handlers()
{
deferred_sig_number_ = -1;
old_sigterm_handler_ = std::signal(SIGTERM, &rosbag2_py::Recorder::signal_handler);
old_sigint_handler_ = std::signal(SIGINT, &rosbag2_py::Recorder::signal_handler);
}

static void uninstall_signal_handlers()
{
if (old_sigterm_handler_ != SIG_ERR) {
std::signal(SIGTERM, old_sigterm_handler_);
old_sigterm_handler_ = SIG_ERR;
}
if (old_sigint_handler_ != SIG_ERR) {
std::signal(SIGINT, old_sigint_handler_);
old_sigint_handler_ = SIG_ERR;
}
deferred_sig_number_ = -1;
}

static void process_deferred_signal()
{
auto call_signal_handler = [](const SignalHandlerType & signal_handler, int sig_num) {
if (signal_handler != SIG_ERR && signal_handler != SIG_IGN && signal_handler != SIG_DFL) {
signal_handler(sig_num);
}
};

if (deferred_sig_number_ == SIGINT) {
call_signal_handler(old_sigint_handler_, deferred_sig_number_);
} else if (deferred_sig_number_ == SIGTERM) {
call_signal_handler(old_sigterm_handler_, deferred_sig_number_);
}
}

static std::atomic_bool exit_;
static std::condition_variable wait_for_exit_cv_;
static SignalHandlerType old_sigint_handler_;
static SignalHandlerType old_sigterm_handler_;
static int deferred_sig_number_;
std::mutex wait_for_exit_mutex_;
};

Recorder::SignalHandlerType Recorder::old_sigint_handler_ {SIG_ERR};
Recorder::SignalHandlerType Recorder::old_sigterm_handler_ {SIG_ERR};
int Recorder::deferred_sig_number_{-1};
std::atomic_bool Recorder::exit_{false};
std::condition_variable Recorder::wait_for_exit_cv_{};

Expand Down Expand Up @@ -430,7 +598,10 @@ PYBIND11_MODULE(_transport, m) {
.def(py::init<>())
.def(py::init<const std::string &>())
.def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options"))
.def("burst", &rosbag2_py::Player::burst)
.def(
"burst", &rosbag2_py::Player::burst, py::arg("storage_options"), py::arg("play_options"),
py::arg("num_messages"))
.def_static("cancel", &rosbag2_py::Player::cancel)
;

py::class_<rosbag2_py::Recorder>(m, "Recorder")
Expand Down
Loading

0 comments on commit 9b7eb21

Please sign in to comment.