Skip to content

Commit

Permalink
Implement service play
Browse files Browse the repository at this point in the history
Signed-off-by: Barry Xu <barry.xu@sony.com>
  • Loading branch information
Barry-Xu-2018 committed Oct 12, 2023
1 parent ce7f6f5 commit cfea3e0
Show file tree
Hide file tree
Showing 17 changed files with 1,011 additions and 89 deletions.
11 changes: 9 additions & 2 deletions ros2bag/ros2bag/verb/burst.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ros2bag.api import add_standard_reader_args
from ros2bag.api import check_not_negative_int
from ros2bag.api import check_positive_float
from ros2bag.api import convert_service_to_service_event_topic
from ros2bag.api import convert_yaml_to_qos_profile
from ros2bag.api import print_error
from ros2bag.verb import VerbExtension
Expand All @@ -41,8 +42,12 @@ def add_arguments(self, parser, cli_name): # noqa: D102
'delay of message playback.')
parser.add_argument(
'--topics', type=str, default=[], nargs='+',
help='topics to replay, separated by space. If none specified, all topics will be '
'replayed.')
help='topics to replay, separated by space. At least one topic needs to be '
"specified. If this parameter isn\'t specified, all topics will be replayed.")
parser.add_argument(
'--services', type=str, default=[], nargs='+',
help='services to replay, separated by space. At least one service needs to be '
"specified. If this parameter isn\'t specified, all services will be replayed.")
parser.add_argument(
'--qos-profile-overrides-path', type=FileType('r'),
help='Path to a yaml file defining overrides of the QoS profile for specific topics.')
Expand Down Expand Up @@ -90,6 +95,8 @@ def main(self, *, args): # noqa: D102
play_options.node_prefix = NODE_NAME_PREFIX
play_options.rate = 1.0
play_options.topics_to_filter = args.topics
# Convert service name to service event topic name
play_options.services_to_filter = convert_service_to_service_event_topic(args.services)
play_options.topic_qos_profile_overrides = qos_profile_overrides
play_options.loop = False
play_options.topic_remapping_options = topic_remapping
Expand Down
37 changes: 35 additions & 2 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
from ros2bag.api import add_standard_reader_args
from ros2bag.api import check_not_negative_int
from ros2bag.api import check_positive_float
from ros2bag.api import convert_service_to_service_event_topic
from ros2bag.api import convert_yaml_to_qos_profile
from ros2bag.api import print_error
from ros2bag.api import print_warn
from ros2bag.verb import VerbExtension
from ros2cli.node import NODE_NAME_PREFIX
from rosbag2_py import Player
Expand Down Expand Up @@ -51,14 +53,26 @@ def add_arguments(self, parser, cli_name): # noqa: D102
parser.add_argument(
'--topics', type=str, default=[], nargs='+',
help='Space-delimited list of topics to play.')
parser.add_argument(
'--services', type=str, default=[], nargs='+',
help='Space-delimited list of services to play.')
parser.add_argument(
'-e', '--regex', default='',
help='filter topics by regular expression to replay, separated by space. If none '
'specified, all topics will be replayed.')
parser.add_argument(
'-x', '--exclude', default='',
help='regular expressions to exclude topics from replay, separated by space. If none '
'specified, all topics will be replayed. This argument is deprecated and please '
'use --exclude-topics.')
parser.add_argument(
'--exclude-topics', default='',
help='regular expressions to exclude topics from replay, separated by space. If none '
'specified, all topics will be replayed.')
parser.add_argument(
'--exclude-services', default='',
help='regular expressions to exclude services from replay, separated by space. If '
'none specified, all services will be replayed.')
parser.add_argument(
'--qos-profile-overrides-path', type=FileType('r'),
help='Path to a yaml file defining overrides of the QoS profile for specific topics.')
Expand Down Expand Up @@ -163,6 +177,10 @@ def main(self, *, args): # noqa: D102
except (InvalidQoSProfileException, ValueError) as e:
return print_error(str(e))

if args.exclude and args.exclude_topics:
return print_error(str('-x/--exclude and --exclude_topics cannot be used at the '
'same time.'))

storage_config_file = ''
if args.storage_config_file:
storage_config_file = args.storage_config_file.name
Expand All @@ -182,8 +200,23 @@ def main(self, *, args): # noqa: D102
play_options.node_prefix = NODE_NAME_PREFIX
play_options.rate = args.rate
play_options.topics_to_filter = args.topics
play_options.topics_regex_to_filter = args.regex
play_options.topics_regex_to_exclude = args.exclude

# Convert service name to service event topic name
play_options.services_to_filter = convert_service_to_service_event_topic(args.services)

play_options.regex_to_filter = args.regex

if args.exclude:
print(print_warn(str('-x/--exclude argument is deprecated. Please use '
'--exclude-topics.')))
play_options.topics_regex_to_exclude = args.exclude
else:
play_options.topics_regex_to_exclude = args.exclude_topics

if args.exclude_services:
play_options.services_regex_to_exclude = args.exclude_services + '/_service_event'
else:
play_options.services_regex_to_exclude = args.exclude_services
play_options.topic_qos_profile_overrides = qos_profile_overrides
play_options.loop = args.loop
play_options.topic_remapping_options = topic_remapping
Expand Down
17 changes: 12 additions & 5 deletions rosbag2_py/src/rosbag2_py/_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,22 @@ PYBIND11_MODULE(_storage, m) {

pybind11::class_<rosbag2_storage::StorageFilter>(m, "StorageFilter")
.def(
pybind11::init<std::vector<std::string>, std::string, std::string>(),
pybind11::init<
std::vector<std::string>, std::vector<std::string>, std::string, std::string, std::string>(),
pybind11::arg("topics") = std::vector<std::string>(),
pybind11::arg("topics_regex") = "",
pybind11::arg("topics_regex_to_exclude") = "")
pybind11::arg("services") = std::vector<std::string>(),
pybind11::arg("regex") = "",
pybind11::arg("topics_regex_to_exclude") = "",
pybind11::arg("services_regex_to_exclude") = "")
.def_readwrite("topics", &rosbag2_storage::StorageFilter::topics)
.def_readwrite("topics_regex", &rosbag2_storage::StorageFilter::topics_regex)
.def_readwrite("services", &rosbag2_storage::StorageFilter::services)
.def_readwrite("regex", &rosbag2_storage::StorageFilter::regex)
.def_readwrite(
"topics_regex_to_exclude",
&rosbag2_storage::StorageFilter::topics_regex_to_exclude);
&rosbag2_storage::StorageFilter::topics_regex_to_exclude)
.def_readwrite(
"services_regex_to_exclude",
&rosbag2_storage::StorageFilter::services_regex_to_exclude);

pybind11::class_<rosbag2_storage::MessageDefinition>(m, "MessageDefinition")
.def(
Expand Down
4 changes: 3 additions & 1 deletion rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,10 @@ PYBIND11_MODULE(_transport, m) {
.def_readwrite("node_prefix", &PlayOptions::node_prefix)
.def_readwrite("rate", &PlayOptions::rate)
.def_readwrite("topics_to_filter", &PlayOptions::topics_to_filter)
.def_readwrite("topics_regex_to_filter", &PlayOptions::topics_regex_to_filter)
.def_readwrite("services_to_filter", &PlayOptions::services_to_filter)
.def_readwrite("regex_to_filter", &PlayOptions::regex_to_filter)
.def_readwrite("topics_regex_to_exclude", &PlayOptions::topics_regex_to_exclude)
.def_readwrite("services_regex_to_exclude", &PlayOptions::services_regex_to_exclude)
.def_property(
"topic_qos_profile_overrides",
&PlayOptions::getTopicQoSProfileOverrides,
Expand Down
20 changes: 15 additions & 5 deletions rosbag2_storage/include/rosbag2_storage/storage_filter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,28 @@ struct StorageFilter
{
// Topic names to whitelist when reading a bag. Only messages matching these
// specified topics will be returned. If list is empty, the filter is ignored
// and all messages are returned.
// and all messages of topics are returned.
std::vector<std::string> topics;

// Regular expression of topic names to whitelist when playing a bag.
// Only messages matching these specified topics will be played.
// Service names to whitelist when reading a bag. Only messages matching these
// specified service will be returned. If list is empty, the filter is ignored
// and all messages of services are returned.
std::vector<std::string> services;

// Regular expression of topic names and service name to whitelist when playing a bag.
// Only messages matching these specified topics or services will be played.
// If list is empty, the filter is ignored and all messages are played.
std::string topics_regex = "";
std::string regex = "";

// Regular expression of topic names to exclude when playing a bag.
// Only messages not matching these specified topics will be played.
// If list is empty, the filter is ignored and all messages are played.
// If list is empty, the filter is ignored and all messages of topics are played.
std::string topics_regex_to_exclude = "";

// Regular expression of topic names to exclude when playing a bag.
// Only messages not matching these specified services will be played.
// If list is empty, the filter is ignored and all messages of services are played.
std::string services_regex_to_exclude = "";
};

} // namespace rosbag2_storage
Expand Down
61 changes: 49 additions & 12 deletions rosbag2_storage_mcap/src/mcap_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,26 +534,63 @@ void MCAPStorage::reset_iterator()
options.endTime = mcap::MaxTime;
}
options.readOrder = read_order_;
if (!storage_filter_.topics.empty()) {
options.topicFilter = [this](std::string_view topic) {

auto filter_process = [this](std::string_view topic) {
if (!storage_filter_.topics.empty()) {
for (const auto & match_topic : storage_filter_.topics) {
if (match_topic == topic) {
return true;
}
}
return false;
};
}
}

if (!storage_filter_.services.empty()) {
for (const auto & match_service : storage_filter_.services) {
if (match_service == topic) {
return true;
}
}
}

bool topics_regex_to_exclude_match = false;
bool services_regex_to_exclude_match = false;
std::string topic_string(topic);

if (!storage_filter_.topics_regex_to_exclude.empty()) {
std::smatch m;
std::regex re(storage_filter_.topics_regex_to_exclude);
topics_regex_to_exclude_match = std::regex_match(topic_string, m, re);
}

if (!storage_filter_.services_regex_to_exclude.empty()) {
std::smatch m;
std::regex re(storage_filter_.services_regex_to_exclude);
services_regex_to_exclude_match = std::regex_match(topic_string, m, re);
}

#ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_FILTER_TOPIC_REGEX
if (!storage_filter_.topics_regex.empty()) {
options.topicFilter = [this](std::string_view topic) {
if (!storage_filter_.regex.empty()) {
std::smatch m;
std::string topic_string(topic);
std::regex re(storage_filter_.topics_regex);
return std::regex_match(topic_string, m, re);
};
}
std::regex re(storage_filter_.regex);

if (std::regex_match(topic_string, m, re) && !topics_regex_to_exclude_match &&
!services_regex_to_exclude_match) {
return true;
} else {
return false;
}
}
#endif

if ((storage_filter_.topics.empty() && !topics_regex_to_exclude_match) &&
(storage_filter_.services.empty() && !services_regex_to_exclude_match)) {
return true;
}

return false;
};
options.topicFilter = filter_process;

linear_view_ =
std::make_unique<mcap::LinearMessageView>(mcap_reader_->readMessages(OnProblem, options));
linear_iterator_ = std::make_unique<mcap::LinearMessageView::Iterator>(linear_view_->begin());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ void SqliteStorage::prepare_for_reading()
"FROM messages JOIN topics ON messages.topic_id = topics.id WHERE ";
std::vector<std::string> where_conditions;

std::string topic_and_service_list;
// add topic filter
if (!storage_filter_.topics.empty()) {
// Construct string for selected topics
Expand All @@ -531,13 +532,39 @@ void SqliteStorage::prepare_for_reading()
topic_list += ",";
}
}
where_conditions.push_back("(topics.name IN (" + topic_list + "))");
topic_and_service_list = "(topics.name IN (" + topic_list + "))";
}
// add topic filter based on regular expression
if (!storage_filter_.topics_regex.empty()) {

// add service filter
if (!storage_filter_.services.empty()) {
// Construct string for selected topics
where_conditions.push_back("(topics.name REGEXP '" + storage_filter_.topics_regex + "')");
std::string service_list{""};
for (auto & service : storage_filter_.services) {
service_list += "'" + service + "'";
if (&service != &storage_filter_.topics.back()) {
service_list += ",";
}
}

topic_and_service_list = topic_and_service_list +
std::string(topic_and_service_list.empty() ? "" : " OR ") +
"(topics.name IN (" + service_list + "))";
}

std::string list_and_regex = topic_and_service_list;
// add topic filter based on regular expression
if (!storage_filter_.regex.empty()) {
std::string regex = "(topics.name REGEXP '" + storage_filter_.regex + "')";
list_and_regex = list_and_regex +
std::string(!list_and_regex.empty() ? " OR " : "") +
regex;
}

if (!list_and_regex.empty()) {
where_conditions.push_back(list_and_regex);
}

std::string exclude_topics_services;
// exclude topics based on regular expressions
if (!storage_filter_.topics_regex_to_exclude.empty()) {
// Construct string for selected topics
Expand All @@ -546,6 +573,14 @@ void SqliteStorage::prepare_for_reading()
"(SELECT topics.name FROM topics WHERE topics.name REGEXP '" +
storage_filter_.topics_regex_to_exclude + "'))");
}
// exclude service based on regular expressions
if (!storage_filter_.services_regex_to_exclude.empty()) {
// Construct string for selected topics
where_conditions.push_back(
"(topics.name NOT IN "
"(SELECT topics.name FROM topics WHERE topics.name REGEXP '" +
storage_filter_.services_regex_to_exclude + "'))");
}

const std::string direction_op = read_order_.reverse ? "<" : ">";
const std::string order_direction = read_order_.reverse ? "DESC" : "ASC";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ TEST_F(StorageTestFixture, read_next_returns_filtered_messages_regex) {
readable_storage->open({db_filename, kPluginID});

rosbag2_storage::StorageFilter storage_filter;
storage_filter.topics_regex = "topic.*";
storage_filter.regex = "topic.*";
readable_storage->set_filter(storage_filter);

EXPECT_TRUE(readable_storage->has_next());
Expand Down
18 changes: 14 additions & 4 deletions rosbag2_transport/include/rosbag2_transport/play_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,29 @@ struct PlayOptions

// Topic names to whitelist when playing a bag.
// Only messages matching these specified topics will be played.
// If list is empty, the filter is ignored and all messages are played.
// If list is empty, the filter is ignored and all messages of topics are played.
std::vector<std::string> topics_to_filter = {};

// Regular expression of topic names to whitelist when playing a bag.
// Only messages matching these specified topics will be played.
// Service names to whitelist when playing a bag.
// Only messages matching these specified service will be played.
// If list is empty, the filter is ignored and all messages of services are played.
std::vector<std::string> services_to_filter = {};

// Regular expression of topic names and service name to whitelist when playing a bag.
// Only messages matching these specified topics and services will be played.
// If list is empty, the filter is ignored and all messages are played.
std::string topics_regex_to_filter = "";
std::string regex_to_filter = "";

// Regular expression of topic names to exclude when playing a bag.
// Only messages not matching these specified topics will be played.
// If list is empty, the filter is ignored and all messages are played.
std::string topics_regex_to_exclude = "";

// Regular expression of service names to exclude when playing a bag.
// Only messages not matching these specified topics will be played.
// If list is empty, the filter is ignored and all messages are played.
std::string services_regex_to_exclude = "";

std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides = {};
bool loop = false;
std::vector<std::string> topic_remapping_options = {};
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

#include "rosgraph_msgs/msg/clock.hpp"


namespace rosbag2_cpp
{
class Reader;
Expand Down Expand Up @@ -208,6 +209,11 @@ class Player : public rclcpp::Node
ROSBAG2_TRANSPORT_PUBLIC
std::unordered_map<std::string, std::shared_ptr<rclcpp::GenericPublisher>> get_publishers();

/// \brief Getter for clients corresponding to each service name
/// \return Hashtable representing service name to client
ROSBAG2_TRANSPORT_PUBLIC
std::unordered_map<std::string, std::shared_ptr<rclcpp::GenericClient>> get_clients();

/// \brief Getter for inner clock_publisher
/// \return Shared pointer to the inner clock_publisher
ROSBAG2_TRANSPORT_PUBLIC
Expand Down
Loading

0 comments on commit cfea3e0

Please sign in to comment.