Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[iron] Gracefully handle SIGINT and SIGTERM signals for play and burst CLI (backport #1557) #1690

Merged
merged 2 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 189 additions & 57 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ namespace rosbag2_py
class Player
{
public:
using SignalHandlerType = void (*)(int);

explicit Player(const std::string & log_level = "info")
{
Arguments arguments({"--ros-args", "--log-level", log_level});
Expand All @@ -172,56 +174,152 @@ class Player
rclcpp::shutdown();
}

static void cancel()
{
exit_ = true;
wait_for_exit_cv_.notify_all();
}

void play(
const rosbag2_storage::StorageOptions & storage_options,
PlayOptions & play_options)
{
auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options);
auto player = std::make_shared<rosbag2_transport::Player>(
std::move(reader), storage_options, play_options);

rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(player);
auto spin_thread = std::thread(
[&exec]() {
exec.spin();
});
player->play();

exec.cancel();
spin_thread.join();
play_impl(storage_options, play_options, false);
}

void burst(
const rosbag2_storage::StorageOptions & storage_options,
PlayOptions & play_options,
size_t num_messages)
{
auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options);
auto player = std::make_shared<rosbag2_transport::Player>(
std::move(reader), storage_options, play_options);

rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(player);
auto spin_thread = std::thread(
[&exec]() {
exec.spin();
});
auto play_thread = std::thread(
[&player]() {
player->play();
});
player->burst(num_messages);

exec.cancel();
spin_thread.join();
play_thread.join();
play_impl(storage_options, play_options, true, num_messages);
}

protected:
static void signal_handler(int sig_num)
{
if (sig_num == SIGINT || sig_num == SIGTERM) {
deferred_sig_number_ = sig_num;
rosbag2_py::Player::cancel();
}
}

static void install_signal_handlers()
{
deferred_sig_number_ = -1;
old_sigterm_handler_ = std::signal(SIGTERM, &rosbag2_py::Player::signal_handler);
old_sigint_handler_ = std::signal(SIGINT, &rosbag2_py::Player::signal_handler);
}

static void uninstall_signal_handlers()
{
if (old_sigterm_handler_ != SIG_ERR) {
std::signal(SIGTERM, old_sigterm_handler_);
old_sigterm_handler_ = SIG_ERR;
}
if (old_sigint_handler_ != SIG_ERR) {
std::signal(SIGINT, old_sigint_handler_);
old_sigint_handler_ = SIG_ERR;
}
deferred_sig_number_ = -1;
}

static void process_deferred_signal()
{
auto call_signal_handler = [](const SignalHandlerType & signal_handler, int sig_num) {
if (signal_handler != SIG_ERR && signal_handler != SIG_IGN && signal_handler != SIG_DFL) {
signal_handler(sig_num);
}
};

if (deferred_sig_number_ == SIGINT) {
call_signal_handler(old_sigint_handler_, deferred_sig_number_);
} else if (deferred_sig_number_ == SIGTERM) {
call_signal_handler(old_sigterm_handler_, deferred_sig_number_);
}
}

void play_impl(
const rosbag2_storage::StorageOptions & storage_options,
PlayOptions & play_options,
bool burst = false,
size_t burst_num_messages = 0)
{
exit_ = false;
install_signal_handlers();
try {
auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options);
auto player = std::make_shared<rosbag2_transport::Player>(
std::move(reader), storage_options, play_options);

rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(player);
auto spin_thread = std::thread(
[&exec]() {
exec.spin();
});

auto wait_for_exit_thread = std::thread(
[&]() {
std::unique_lock<std::mutex> lock(wait_for_exit_mutex_);
wait_for_exit_cv_.wait(lock, [] {return rosbag2_py::Player::exit_.load();});
player->stop();
});
{
// Release the GIL for long-running play, so that calling Python code
// can use other threads
py::gil_scoped_release release;
if (burst) {
auto play_thread = std::thread(
[&player]() {
player->play();
});
player->burst(burst_num_messages);
rosbag2_py::Player::cancel(); // Need to trigger exit from wait_for_exit_thread
play_thread.join();
} else {
player->play();
rosbag2_py::Player::cancel(); // Need to trigger exit from wait_for_exit_thread
}
}

if (wait_for_exit_thread.joinable()) {
wait_for_exit_thread.join();
}

exec.cancel();
if (spin_thread.joinable()) {
spin_thread.join();
}
exec.remove_node(player);
} catch (...) {
process_deferred_signal();
uninstall_signal_handlers();
throw;
}
process_deferred_signal();
uninstall_signal_handlers();
}

static std::atomic_bool exit_;
static std::condition_variable wait_for_exit_cv_;
static SignalHandlerType old_sigint_handler_;
static SignalHandlerType old_sigterm_handler_;
static int deferred_sig_number_;
std::mutex wait_for_exit_mutex_;
};

Player::SignalHandlerType Player::old_sigint_handler_ {SIG_ERR};
Player::SignalHandlerType Player::old_sigterm_handler_ {SIG_ERR};
int Player::deferred_sig_number_{-1};
std::atomic_bool Player::exit_{false};
std::condition_variable Player::wait_for_exit_cv_{};

class Recorder
{
public:
using SignalHandlerType = void (*)(int);

explicit Recorder(const std::string & log_level = "info")
{
Arguments arguments({"--ros-args", "--log-level", log_level});
Expand All @@ -238,15 +336,7 @@ class Recorder
RecordOptions & record_options,
std::string & node_name)
{
auto old_sigterm_handler = std::signal(
SIGTERM, [](int /* signal */) {
rosbag2_py::Recorder::cancel();
});
auto old_sigint_handler = std::signal(
SIGINT, [](int /* signal */) {
rosbag2_py::Recorder::cancel();
});

install_signal_handlers();
try {
exit_ = false;
auto exec = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
Expand Down Expand Up @@ -280,22 +370,12 @@ class Recorder
}
exec->remove_node(recorder);
} catch (...) {
// Return old signal handlers anyway
if (old_sigterm_handler != SIG_ERR) {
std::signal(SIGTERM, old_sigterm_handler);
}
if (old_sigint_handler != SIG_ERR) {
std::signal(SIGTERM, old_sigint_handler);
}
process_deferred_signal();
uninstall_signal_handlers();
throw;
}
// Return old signal handlers
if (old_sigterm_handler != SIG_ERR) {
std::signal(SIGTERM, old_sigterm_handler);
}
if (old_sigint_handler != SIG_ERR) {
std::signal(SIGTERM, old_sigint_handler);
}
process_deferred_signal();
uninstall_signal_handlers();
}

static void cancel()
Expand All @@ -305,11 +385,60 @@ class Recorder
}

protected:
static void signal_handler(int sig_num)
{
if (sig_num == SIGINT || sig_num == SIGTERM) {
deferred_sig_number_ = sig_num;
rosbag2_py::Recorder::cancel();
}
}

static void install_signal_handlers()
{
deferred_sig_number_ = -1;
old_sigterm_handler_ = std::signal(SIGTERM, &rosbag2_py::Recorder::signal_handler);
old_sigint_handler_ = std::signal(SIGINT, &rosbag2_py::Recorder::signal_handler);
}

static void uninstall_signal_handlers()
{
if (old_sigterm_handler_ != SIG_ERR) {
std::signal(SIGTERM, old_sigterm_handler_);
old_sigterm_handler_ = SIG_ERR;
}
if (old_sigint_handler_ != SIG_ERR) {
std::signal(SIGINT, old_sigint_handler_);
old_sigint_handler_ = SIG_ERR;
}
deferred_sig_number_ = -1;
}

static void process_deferred_signal()
{
auto call_signal_handler = [](const SignalHandlerType & signal_handler, int sig_num) {
if (signal_handler != SIG_ERR && signal_handler != SIG_IGN && signal_handler != SIG_DFL) {
signal_handler(sig_num);
}
};

if (deferred_sig_number_ == SIGINT) {
call_signal_handler(old_sigint_handler_, deferred_sig_number_);
} else if (deferred_sig_number_ == SIGTERM) {
call_signal_handler(old_sigterm_handler_, deferred_sig_number_);
}
}

static std::atomic_bool exit_;
static std::condition_variable wait_for_exit_cv_;
static SignalHandlerType old_sigint_handler_;
static SignalHandlerType old_sigterm_handler_;
static int deferred_sig_number_;
std::mutex wait_for_exit_mutex_;
};

Recorder::SignalHandlerType Recorder::old_sigint_handler_ {SIG_ERR};
Recorder::SignalHandlerType Recorder::old_sigterm_handler_ {SIG_ERR};
int Recorder::deferred_sig_number_{-1};
std::atomic_bool Recorder::exit_{false};
std::condition_variable Recorder::wait_for_exit_cv_{};

Expand Down Expand Up @@ -430,7 +559,10 @@ PYBIND11_MODULE(_transport, m) {
.def(py::init<>())
.def(py::init<const std::string &>())
.def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options"))
.def("burst", &rosbag2_py::Player::burst)
.def(
"burst", &rosbag2_py::Player::burst, py::arg("storage_options"), py::arg("play_options"),
py::arg("num_messages"))
.def_static("cancel", &rosbag2_py::Player::cancel)
;

py::class_<rosbag2_py::Recorder>(m, "Recorder")
Expand Down
41 changes: 41 additions & 0 deletions rosbag2_py/test/test_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# limitations under the License.

import datetime
from pathlib import Path
import re
import threading

from common import get_rosbag_options, wait_for
Expand All @@ -26,6 +28,10 @@
from std_msgs.msg import String


RESOURCES_PATH = Path(__file__).parent / 'resources'
PLAYBACK_UNTIL_TIMESTAMP_REGEX_STRING = r'\[rosbag2_player]: Playback until timestamp: -1'


def test_options_qos_conversion():
# Tests that the to-and-from C++ conversions are working properly in the pybind structs
simple_overrides = {
Expand Down Expand Up @@ -106,3 +112,38 @@ def test_record_cancel(tmp_path, storage_id):
storage_path = bag_path / metadata.relative_file_paths[0]
assert wait_for(lambda: storage_path.is_file(),
timeout=rclpy.duration.Duration(seconds=3))


@pytest.mark.parametrize('storage_id', TESTED_STORAGE_IDS)
def test_play_cancel(storage_id, capfd):
bag_path = str(RESOURCES_PATH / storage_id / 'talker')
storage_options, converter_options = get_rosbag_options(bag_path, storage_id)

player = rosbag2_py.Player()

play_options = rosbag2_py.PlayOptions()
play_options.loop = True
play_options.start_paused = True

player_thread = threading.Thread(
target=player.play,
args=(storage_options, play_options),
daemon=True)
player_thread.start()

def check_playback_start_output(cumulative_out, cumulative_err):
out, err = capfd.readouterr()
cumulative_err += err
cumulative_out += out
expected_string_regex = re.compile(PLAYBACK_UNTIL_TIMESTAMP_REGEX_STRING)
matches = expected_string_regex.search(cumulative_err)
return matches is not None

captured_out = ''
captured_err = ''
assert wait_for(lambda: check_playback_start_output(captured_out, captured_err),
timeout=rclpy.duration.Duration(seconds=3))

player.cancel()
player_thread.join(3)
assert not player_thread.is_alive()
4 changes: 3 additions & 1 deletion rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,8 @@ bool Player::play_next()
return false;
}

RCLCPP_INFO_STREAM(get_logger(), "Playing next message.");
// Use RCLCPP_DEBUG_STREAM to avoid delays in the burst mode
RCLCPP_DEBUG_STREAM(get_logger(), "Playing next message.");

// Temporary take over playback from play_messages_from_queue()
std::lock_guard<std::mutex> main_play_loop_lk(skip_message_in_main_play_loop_mutex_);
Expand Down Expand Up @@ -434,6 +435,7 @@ size_t Player::burst(const size_t num_messages)
}
}

RCLCPP_INFO_STREAM(get_logger(), "Burst " << messages_played << " messages.");
return messages_played;
}

Expand Down
Loading