diff --git a/rosbag2/CMakeLists.txt b/rosbag2/CMakeLists.txt index 36f8f3a41c..b8fcd949ea 100644 --- a/rosbag2/CMakeLists.txt +++ b/rosbag2/CMakeLists.txt @@ -75,13 +75,12 @@ if(BUILD_TESTING) find_package(ament_cmake_gtest REQUIRED) find_package(ament_cmake_gmock REQUIRED) find_package(ament_lint_auto REQUIRED) + find_package(test_msgs REQUIRED) ament_lint_auto_find_test_dependencies() ament_add_gmock(rosbag2_write_integration_test test/rosbag2/rosbag2_write_integration_test.cpp - test/rosbag2/rosbag2_test_fixture.hpp - test/rosbag2/test_helpers.hpp - src/rosbag2/typesupport_helpers.cpp + test/rosbag2/test_memory_management.cpp WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) if(TARGET rosbag2_write_integration_test) target_link_libraries(rosbag2_write_integration_test librosbag2) @@ -89,12 +88,13 @@ if(BUILD_TESTING) ament_add_gmock(rosbag2_read_integration_test test/rosbag2/rosbag2_read_integration_test.cpp - test/rosbag2/rosbag2_test_fixture.hpp - test/rosbag2/test_helpers.hpp - src/rosbag2/typesupport_helpers.cpp + test/rosbag2/test_memory_management.cpp WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) if(TARGET rosbag2_read_integration_test) target_link_libraries(rosbag2_read_integration_test librosbag2) + ament_target_dependencies(rosbag2_read_integration_test + test_msgs + ) endif() ament_add_gmock(rosbag2_typesupport_helpers_test @@ -102,13 +102,16 @@ if(BUILD_TESTING) src/rosbag2/typesupport_helpers.cpp WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) if(TARGET rosbag2_typesupport_helpers_test) - ament_target_dependencies(rosbag2_typesupport_helpers_test rcl Poco ament_index_cpp - rosidl_generator_cpp) + ament_target_dependencies(rosbag2_typesupport_helpers_test + ament_index_cpp + Poco + rcl + rosidl_generator_cpp + ) endif() ament_add_gmock(rosbag2_integration_test test/rosbag2/rosbag2_integration_test.cpp - test/rosbag2/test_helpers.hpp src/rosbag2/typesupport_helpers.cpp WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) if(TARGET rosbag2_integration_test) @@ -117,14 +120,19 @@ if(BUILD_TESTING) ament_add_gmock(rosbag2_rosbag_node_test test/rosbag2/rosbag2_rosbag_node_test.cpp - src/rosbag2/typesupport_helpers.cpp - src/rosbag2/generic_subscription.cpp + test/rosbag2/test_memory_management.cpp src/rosbag2/generic_publisher.cpp + src/rosbag2/generic_subscription.cpp src/rosbag2/rosbag2_node.cpp - test/rosbag2/test_helpers.hpp + src/rosbag2/typesupport_helpers.cpp WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) if(TARGET rosbag2_rosbag_node_test) - ament_target_dependencies(rosbag2_rosbag_node_test rclcpp Poco ament_index_cpp std_msgs) + ament_target_dependencies(rosbag2_rosbag_node_test + ament_index_cpp + Poco + rclcpp + std_msgs + ) endif() endif() diff --git a/rosbag2/include/rosbag2/rosbag2.hpp b/rosbag2/include/rosbag2/rosbag2.hpp index 59b37e9c96..14ced520ad 100644 --- a/rosbag2/include/rosbag2/rosbag2.hpp +++ b/rosbag2/include/rosbag2/rosbag2.hpp @@ -16,37 +16,54 @@ #define ROSBAG2__ROSBAG2_HPP_ #include +#include #include #include +#include #include "rclcpp/rclcpp.hpp" #include "rosbag2_storage/storage_interfaces/read_only_interface.hpp" +#include "rosbag2_storage/storage_interfaces/read_write_interface.hpp" #include "rosbag2/visibility_control.hpp" namespace rosbag2 { +class GenericPublisher; +class GenericSubscription; +class Rosbag2Node; + class Rosbag2 { public: ROSBAG2_PUBLIC void record( const std::string & file_name, - const std::string & topic_name, - std::function after_write_action = nullptr); + std::vector topic_names, + std::function after_write_action = nullptr); ROSBAG2_PUBLIC - void play(const std::string & file_name, const std::string & topic_name); + void play(const std::string & file_name); ROSBAG2_PUBLIC - std::string get_topic_type( - const std::string & topic_name, const std::shared_ptr & node); + std::map get_topics_with_types( + const std::vector & topic_names, std::shared_ptr node); - ROSBAG2_PUBLIC - std::string get_topic_type( - std::shared_ptr storage, - const std::string & topic); +private: + void prepare_publishers( + std::shared_ptr node, + std::shared_ptr storage); + + std::shared_ptr + create_subscription( + const std::function & after_write_action, + std::shared_ptr storage, + std::shared_ptr & node, + const std::string & topic_name, const std::string & topic_type) const; + + std::vector> subscriptions_; + std::map> publishers_; }; } // namespace rosbag2 diff --git a/rosbag2/package.xml b/rosbag2/package.xml index efa532f506..e99bcd2202 100644 --- a/rosbag2/package.xml +++ b/rosbag2/package.xml @@ -21,6 +21,7 @@ ament_lint_auto ament_lint_common + test_msgs ament_cmake diff --git a/rosbag2/src/rosbag2/demo_play.cpp b/rosbag2/src/rosbag2/demo_play.cpp index b758e15f77..52a3a13d1b 100644 --- a/rosbag2/src/rosbag2/demo_play.cpp +++ b/rosbag2/src/rosbag2/demo_play.cpp @@ -20,16 +20,10 @@ int main(int argc, const char ** argv) { - if (argc < 2) { - std::cerr << "\nThe name of the topic to play to must be given as parameter!\n"; - return 0; - } - std::string topic_name = argv[1]; - rclcpp::init(argc, argv); rosbag2::Rosbag2 rosbag2; - rosbag2.play("test.bag", topic_name); + rosbag2.play("test.bag"); rclcpp::shutdown(); diff --git a/rosbag2/src/rosbag2/demo_record.cpp b/rosbag2/src/rosbag2/demo_record.cpp index 5b64dd7787..314f9c6e3e 100644 --- a/rosbag2/src/rosbag2/demo_record.cpp +++ b/rosbag2/src/rosbag2/demo_record.cpp @@ -14,6 +14,7 @@ #include #include +#include #include "rclcpp/rclcpp.hpp" @@ -22,10 +23,14 @@ int main(int argc, const char ** argv) { if (argc < 2) { - std::cerr << "\nThe name of the topic to record must be given as parameter!\n"; + std::cerr << "\nThe names of topics to record must be given as parameter!\n"; return 0; } - std::string topic_name = argv[1]; + std::vector topics; + + for (int i = 1; i < argc; i++) { + topics.emplace_back(argv[i]); + } // TODO(anhosi): allow output file to be specified by cli argument and do proper checking if // file already exists @@ -35,7 +40,7 @@ int main(int argc, const char ** argv) rclcpp::init(argc, argv); rosbag2::Rosbag2 rosbag2; - rosbag2.record(filename, topic_name); + rosbag2.record(filename, topics); rclcpp::shutdown(); diff --git a/rosbag2/src/rosbag2/generic_subscription.cpp b/rosbag2/src/rosbag2/generic_subscription.cpp index 501b5bd0c5..7e0e8b26f0 100644 --- a/rosbag2/src/rosbag2/generic_subscription.cpp +++ b/rosbag2/src/rosbag2/generic_subscription.cpp @@ -48,7 +48,6 @@ std::shared_ptr GenericSubscription::create_serialized return borrow_serialized_message(0); } - void GenericSubscription::handle_message( std::shared_ptr & message, const rmw_message_info_t & message_info) { diff --git a/rosbag2/src/rosbag2/rosbag2.cpp b/rosbag2/src/rosbag2/rosbag2.cpp index 63ebcd0322..7359c91391 100644 --- a/rosbag2/src/rosbag2/rosbag2.cpp +++ b/rosbag2/src/rosbag2/rosbag2.cpp @@ -15,12 +15,13 @@ #include "rosbag2/rosbag2.hpp" #include +#include #include #include +#include -#include "rclcpp/rclcpp.hpp" #include "rcl/graph.h" - +#include "rclcpp/rclcpp.hpp" #include "rcutils/logging_macros.h" #include "rosbag2_storage/serialized_bag_message.hpp" @@ -39,128 +40,141 @@ namespace rosbag2 const char * ROS_PACKAGE_NAME = "rosbag2"; -std::string Rosbag2::get_topic_type( - const std::string & topic_name, const std::shared_ptr & node) -{ - // TODO(Martin-Idel-SI): This is a short sleep to allow the node some time to discover the topic - // This should be replaced by an auto-discovery system in the future - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - auto topics = node->get_topic_names_and_types(); - std::string complete_topic_name = topic_name; - if (topic_name[0] != '/') { - complete_topic_name = "/" + topic_name; - } - auto position = topics.find(complete_topic_name); - if (position != topics.end()) { - if (position->second.size() > 1) { - RCUTILS_LOG_ERROR_NAMED( - ROS_PACKAGE_NAME, - "Topic '%s' has several types associated. Only ROS topics are supported.", - position->first.c_str()); - return ""; - } - return position->second[0]; - } - return ""; -} - -std::string Rosbag2::get_topic_type( - std::shared_ptr storage, - const std::string & topic) -{ - auto all_topics_and_types = storage->get_all_topics_and_types(); - auto map_iterator_to_topic = all_topics_and_types.find(topic); - - if (map_iterator_to_topic == all_topics_and_types.end()) { - RCUTILS_LOG_ERROR_NAMED( - ROS_PACKAGE_NAME, - "No messages with topic '%s' in bag file '%s'. Messages cannot be played.", - topic.c_str(), storage->info().uri.c_str()); - return ""; - } else if (map_iterator_to_topic->second.empty()) { - RCUTILS_LOG_ERROR_NAMED( - ROS_PACKAGE_NAME, - "No type specified for topic '%s' in bag file '%s'. Messages cannot be played.", - topic.c_str(), storage->info().uri.c_str()); - } - - return map_iterator_to_topic->second; -} - void Rosbag2::record( const std::string & file_name, - const std::string & topic_name, - std::function after_write_action) + std::vector topic_names, + std::function after_write_action) { rosbag2_storage::StorageFactory factory; auto storage = factory.open_read_write(file_name, "sqlite3"); - if (storage) { - RCUTILS_LOG_INFO_NAMED(ROS_PACKAGE_NAME, "Waiting for messages..."); + if (!storage) { + throw std::runtime_error("No storage could be initialized. Abort"); + return; + } - auto node = std::make_shared("rosbag2"); + auto node = std::make_shared("rosbag2"); - std::string type = get_topic_type(topic_name, node); + auto topics_and_types = get_topics_with_types(topic_names, node); - if (type.empty()) { - throw std::runtime_error(" Topic could not be found. Abort"); - } + if (topics_and_types.empty()) { + throw std::runtime_error("No topics found. Abort"); + } + + for (const auto & topic_and_type : topics_and_types) { + auto topic_name = topic_and_type.first; + auto topic_type = topic_and_type.second; - auto subscription = node->create_generic_subscription( - topic_name, - type, - [storage, topic_name, after_write_action](std::shared_ptr message) { - auto bag_message = std::make_shared(); - bag_message->serialized_data = message; - bag_message->topic_name = topic_name; - rcutils_time_point_value_t time_stamp; - int error = rcutils_system_time_now(&time_stamp); - if (error != RCUTILS_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - ROS_PACKAGE_NAME, - "Error getting current time. Error: %s", rcutils_get_error_string_safe()); - } - bag_message->time_stamp = time_stamp; - - storage->write(bag_message); - if (after_write_action) { - after_write_action(); - } - }); - - if (!subscription) { - return; + std::shared_ptr subscription = create_subscription( + after_write_action, storage, node, topic_name, topic_type); + + if (subscription) { + subscriptions_.push_back(subscription); + + storage->create_topic(topic_name, topic_type); } + } - storage->create_topic(topic_name, type); + if (subscriptions_.empty()) { + throw std::runtime_error("No topics could be subscribed. Abort"); + return; + } + + RCUTILS_LOG_INFO_NAMED(ROS_PACKAGE_NAME, "Waiting for messages..."); + while (rclcpp::ok()) { + rclcpp::spin(node); + } + subscriptions_.clear(); +} - while (rclcpp::ok()) { - rclcpp::spin(node); +std::map Rosbag2::get_topics_with_types( + const std::vector & topic_names, std::shared_ptr node) +{ + // TODO(Martin-Idel-SI): This is a short sleep to allow the node some time to discover the topic + // This should be replaced by an auto-discovery system in the future + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + auto topics = node->get_topic_names_and_types(); + + std::map topic_names_and_types; + for (const auto & topic_name : topic_names) { + std::string complete_topic_name = topic_name; + if (topic_name[0] != '/') { + complete_topic_name = "/" + topic_name; + } + auto position = topics.find(complete_topic_name); + if (position != topics.end()) { + if (position->second.size() > 1) { + RCUTILS_LOG_ERROR_NAMED( + ROS_PACKAGE_NAME, + "Topic '%s' has several types associated. Only topics with one type are supported.", + position->first.c_str()); + } else { + topic_names_and_types.insert({position->first, position->second[0]}); + } } } + return topic_names_and_types; +} + +std::shared_ptr +Rosbag2::create_subscription( + const std::function & after_write_action, + std::shared_ptr storage, + std::shared_ptr & node, + const std::string & topic_name, const std::string & topic_type) const +{ + auto subscription = node->create_generic_subscription( + topic_name, + topic_type, + [storage, topic_name, after_write_action](std::shared_ptr message) { + auto bag_message = std::make_shared(); + bag_message->serialized_data = message; + bag_message->topic_name = topic_name; + rcutils_time_point_value_t time_stamp; + int error = rcutils_system_time_now(&time_stamp); + if (error != RCUTILS_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + ROS_PACKAGE_NAME, + "Error getting current time. Error: %s", rcutils_get_error_string_safe()); + } + bag_message->time_stamp = time_stamp; + + storage->write(bag_message); + if (after_write_action) { + after_write_action(topic_name); + } + }); + return subscription; } -void Rosbag2::play(const std::string & file_name, const std::string & topic_name) +void Rosbag2::play(const std::string & file_name) { rosbag2_storage::StorageFactory factory; auto storage = factory.open_read_only(file_name, "sqlite3"); if (storage) { - std::string type_name = get_topic_type(storage, topic_name); - if (type_name.empty()) { - return; - } auto node = std::make_shared("rosbag2_node"); - auto publisher = node->create_generic_publisher(topic_name, type_name); + prepare_publishers(node, storage); while (storage->has_next()) { auto message = storage->read_next(); // without the sleep_for() many messages are lost. std::this_thread::sleep_for(std::chrono::milliseconds(50)); - publisher->publish(message->serialized_data); + publishers_[message->topic_name]->publish(message->serialized_data); RCUTILS_LOG_INFO_NAMED(ROS_PACKAGE_NAME, "published message"); } } } +void Rosbag2::prepare_publishers( + std::shared_ptr node, + std::shared_ptr storage) +{ + auto all_topics_and_types = storage->get_all_topics_and_types(); + for (const auto & element : all_topics_and_types) { + publishers_.insert(std::make_pair( + element.first, node->create_generic_publisher(element.first, element.second))); + } +} + } // namespace rosbag2 diff --git a/rosbag2/src/rosbag2/rosbag2_node.hpp b/rosbag2/src/rosbag2/rosbag2_node.hpp index fc2083ea21..f25c71f13b 100644 --- a/rosbag2/src/rosbag2/rosbag2_node.hpp +++ b/rosbag2/src/rosbag2/rosbag2_node.hpp @@ -19,8 +19,7 @@ #include #include "rclcpp/node.hpp" -#include "rclcpp/rclcpp.hpp" -#include "rcl/graph.h" +#include "rcutils/types.h" #include "generic_publisher.hpp" #include "generic_subscription.hpp" diff --git a/rosbag2/test/rosbag2/rosbag2_integration_test.cpp b/rosbag2/test/rosbag2/rosbag2_integration_test.cpp index 865fa13a33..d5e993a572 100644 --- a/rosbag2/test/rosbag2/rosbag2_integration_test.cpp +++ b/rosbag2/test/rosbag2/rosbag2_integration_test.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include "rclcpp/rclcpp.hpp" #include "rosbag2/rosbag2.hpp" @@ -32,18 +33,14 @@ class RosBag2IntegrationFixture : public Test RosBag2IntegrationFixture() { node_ = std::make_shared("rosbag2"); + publisher_node_ = std::make_shared("publisher_node"); rosbag2_ = rosbag2::Rosbag2(); } - void SetUp() override + void create_publisher(std::string topic) { - auto publisher_node = std::make_shared("publisher_node"); - auto publisher = publisher_node->create_publisher("string_topic"); - future_ = std::async(std::launch::async, [publisher] { - auto message = std_msgs::msg::String(); - message.data = "string message"; - publisher->publish(message); - }); + auto publisher = publisher_node_->create_publisher(topic); + publishers_.push_back(publisher); } static void SetUpTestCase() @@ -56,34 +53,55 @@ class RosBag2IntegrationFixture : public Test rclcpp::shutdown(); } - void TearDown() override - { - future_.get(); - } - rosbag2::Rosbag2 rosbag2_; std::shared_ptr node_; - std::future future_; + std::shared_ptr publisher_node_; + std::vector> publishers_; }; TEST_F(RosBag2IntegrationFixture, get_topic_returns_with_topic_string_if_topic_is_specified_without_slash) { - std::string type = rosbag2_.get_topic_type("string_topic", node_); + create_publisher("string_topic"); - EXPECT_THAT(type, StrEq("std_msgs/String")); + auto topics_and_types = rosbag2_.get_topics_with_types({"string_topic"}, node_); + + ASSERT_THAT(topics_and_types, SizeIs(1)); + EXPECT_THAT(topics_and_types.begin()->second, StrEq("std_msgs/String")); } TEST_F(RosBag2IntegrationFixture, get_topic_returns_with_topic_string_if_topic_is_specified_with_slash) { - std::string type = rosbag2_.get_topic_type("/string_topic", node_); + create_publisher("string_topic"); + + auto topics_and_types = rosbag2_.get_topics_with_types({"/string_topic"}, node_); + + ASSERT_THAT(topics_and_types, SizeIs(1)); + EXPECT_THAT(topics_and_types.begin()->second, StrEq("std_msgs/String")); +} + +TEST_F(RosBag2IntegrationFixture, returns_multiple_topics_for_multiple_inputs) +{ + std::string first_topic("/string_topic"); + std::string second_topic("/other_topic"); + std::string third_topic("/wrong_topic"); - EXPECT_THAT(type, StrEq("std_msgs/String")); + create_publisher(first_topic); + create_publisher(second_topic); + create_publisher(third_topic); + + auto topics_and_types = rosbag2_.get_topics_with_types({first_topic, second_topic}, node_); + + ASSERT_THAT(topics_and_types, SizeIs(2)); + EXPECT_THAT(topics_and_types.find(first_topic)->second, StrEq("std_msgs/String")); + EXPECT_THAT(topics_and_types.find(second_topic)->second, StrEq("std_msgs/String")); } TEST_F(RosBag2IntegrationFixture, get_topic_returns_empty_if_topic_does_not_exist) { - std::string type = rosbag2_.get_topic_type("/wrong_topic", node_); + create_publisher("string_topic"); + + auto topics_and_types = rosbag2_.get_topics_with_types({"/wrong_topic"}, node_); - EXPECT_THAT(type, IsEmpty()); + ASSERT_THAT(topics_and_types, IsEmpty()); } diff --git a/rosbag2/test/rosbag2/rosbag2_read_integration_test.cpp b/rosbag2/test/rosbag2/rosbag2_read_integration_test.cpp index dda2c5b679..7243e8d4cd 100644 --- a/rosbag2/test/rosbag2/rosbag2_read_integration_test.cpp +++ b/rosbag2/test/rosbag2/rosbag2_read_integration_test.cpp @@ -14,16 +14,18 @@ #include -#include +#include #include +#include #include #include #include -#include #include "rclcpp/rclcpp.hpp" #include "rosbag2/rosbag2.hpp" -#include "std_msgs/msg/string.hpp" +#include "test_msgs/msg/primitives.hpp" +#include "test_msgs/msg/static_array_primitives.hpp" +#include "test_msgs/message_fixtures.hpp" #include "rosbag2_test_fixture.hpp" @@ -36,62 +38,120 @@ class RosBag2IntegrationTestFixture : public Rosbag2TestFixture { public: RosBag2IntegrationTestFixture() - : Rosbag2TestFixture(), counter_(0) - {} + : Rosbag2TestFixture(), messages_stored_counter_(0) + { + rclcpp::init(0, nullptr); + } + + ~RosBag2IntegrationTestFixture() override + { + rclcpp::shutdown(); + } - std::vector subscribe_messages(size_t expected_messages_number) + template + auto subscribe_messages( + size_t expected_messages_number, const std::string & topic) { - std::vector messages; - auto node = std::make_shared("subscriber_node"); - auto subscription = node->create_subscription("string_topic", - [this, &messages](const std_msgs::msg::String::ConstSharedPtr message) { - messages.emplace_back(message->data); - counter_++; - }, 10); - - while (counter_ < expected_messages_number) { + auto node = rclcpp::Node::make_shared("node_" + topic); + std::vector messages; + size_t messages_received = 0; + + auto subscription = node->create_subscription(topic, + [&messages, &messages_received](typename T::ConstSharedPtr message) { + messages.push_back(message); + ++messages_received; + }); + subscriptions_.push_back(subscription); + + while (messages_received < expected_messages_number) { rclcpp::spin_some(node); } return messages; } - void launch_subscriber(size_t expected_messages_number) + template + std::shared_ptr serialize_test_message( + const std::string & topic, std::shared_ptr message) { - subscriber_future_ = std::async(std::launch::async, [this, expected_messages_number] { - return subscribe_messages(expected_messages_number); - }); + auto bag_msg = std::make_shared(); + bag_msg->serialized_data = memory_management_.serialize_message(message); + bag_msg->topic_name = topic; + + return bag_msg; } - void play_bag(std::string database_name, std::string topic) + template + auto launch_subscriber(size_t expected_messages_number, const std::string & topic) { - rosbag2::Rosbag2 rosbag2; - rosbag2.play(database_name, topic); + return std::async(std::launch::async, [this, expected_messages_number, topic] { + return subscribe_messages(expected_messages_number, topic); + }); } - std::atomic counter_; - std::future> subscriber_future_; + void wait_for_subscribers(size_t count) + { + std::async(std::launch::async, [this, count] { + while (subscriptions_.size() < count) { + std::this_thread::sleep_for(50ms); + } + }).get(); + } + + std::atomic messages_stored_counter_; + std::vector subscriptions_; }; -TEST_F(RosBag2IntegrationTestFixture, recorded_messages_are_played) +TEST_F(RosBag2IntegrationTestFixture, recorded_messages_are_played_for_all_topics) { - rclcpp::init(0, nullptr); + auto primitive_message1 = get_messages_primitives()[0]; + primitive_message1->string_value = "Hello World 1"; + + auto primitive_message2 = get_messages_primitives()[1]; + primitive_message2->string_value = "Hello World 2"; + + auto complex_message1 = get_messages_static_array_primitives()[0]; + complex_message1->string_values = {"Complex Hello1", "Complex Hello2", "Complex Hello3"}; + complex_message1->bool_values = {true, false, true}; + + auto complex_message2 = get_messages_static_array_primitives()[0]; + complex_message2->string_values = {"Complex Hello4", "Complex Hello5", "Complex Hello6"}; + complex_message2->bool_values = {false, false, true}; + + auto topic_types = std::map{ + {"topic1", "test_msgs/Primitives"}, + {"topic2", "test_msgs/StaticArrayPrimitives"}, + }; std::vector> messages = - {serialize_message("Hello World 1"), - serialize_message("Hello World 2"), - serialize_message("Hello World 2")}; + {serialize_test_message("topic1", primitive_message1), + serialize_test_message("topic1", primitive_message2), + serialize_test_message("topic1", primitive_message2), + serialize_test_message("topic2", complex_message1), + serialize_test_message("topic2", complex_message2), + serialize_test_message("topic2", complex_message2)}; - write_messages(database_name_, messages); + write_messages(database_name_, messages, topic_types); // Due to a problem related to the subscriber, we play many (3) messages but make the subscriber // node spin only until 2 have arrived. Hence the 2 as `launch_subscriber()` argument. - launch_subscriber(2); - play_bag(database_name_, "string_topic"); - - auto replayed_messages = subscriber_future_.get(); - ASSERT_THAT(replayed_messages, SizeIs(2)); - ASSERT_THAT(replayed_messages[0], Eq("Hello World 1")); - ASSERT_THAT(replayed_messages[1], Eq("Hello World 2")); - - rclcpp::shutdown(); + auto primitive_subscriber_future = launch_subscriber(2, "topic1"); + auto static_array_subscriber_future = + launch_subscriber(2, "topic2"); + wait_for_subscribers(2); + + Rosbag2 rosbag2; + rosbag2.play(database_name_); + + auto replayed_test_primitives = primitive_subscriber_future.get(); + ASSERT_THAT(replayed_test_primitives, SizeIs(Ge(2u))); + EXPECT_THAT(replayed_test_primitives[0]->string_value, Eq("Hello World 1")); + EXPECT_THAT(replayed_test_primitives[1]->string_value, Eq("Hello World 2")); + auto replayed_test_arrays = static_array_subscriber_future.get(); + ASSERT_THAT(replayed_test_arrays, SizeIs(Ge(2u))); + EXPECT_THAT(replayed_test_arrays[0]->bool_values, ElementsAre(true, false, true)); + EXPECT_THAT(replayed_test_arrays[0]->string_values, + ElementsAre("Complex Hello1", "Complex Hello2", "Complex Hello3")); + EXPECT_THAT(replayed_test_arrays[1]->bool_values, ElementsAre(false, false, true)); + EXPECT_THAT(replayed_test_arrays[1]->string_values, + ElementsAre("Complex Hello4", "Complex Hello5", "Complex Hello6")); } diff --git a/rosbag2/test/rosbag2/rosbag2_rosbag_node_test.cpp b/rosbag2/test/rosbag2/rosbag2_rosbag_node_test.cpp index 49c19d4704..4adc67f191 100644 --- a/rosbag2/test/rosbag2/rosbag2_rosbag_node_test.cpp +++ b/rosbag2/test/rosbag2/rosbag2_rosbag_node_test.cpp @@ -24,7 +24,7 @@ #include "../../src/rosbag2/rosbag2_node.hpp" #include "../../src/rosbag2/typesupport_helpers.hpp" -#include "test_helpers.hpp" +#include "test_memory_management.hpp" using namespace ::testing; // NOLINT @@ -52,8 +52,10 @@ class RosBag2NodeFixture : public Test std::vector messages; size_t counter = 0; auto subscription = node_->create_generic_subscription(topic_name, type, - [&counter, &messages](std::shared_ptr message) { - messages.push_back(test_helpers::deserialize_string_message(message)); + [this, &counter, &messages](std::shared_ptr message) { + auto string_message = memory_management_ + .deserialize_message(message); + messages.push_back(string_message->data); counter++; }); @@ -63,6 +65,14 @@ class RosBag2NodeFixture : public Test return messages; } + std::shared_ptr serialize_string_message(std::string message) + { + auto string_message = std::make_shared(); + string_message->data = message; + return memory_management_.serialize_message(string_message); + } + + test_helpers::TestMemoryManagement memory_management_; std::shared_ptr node_; }; @@ -74,13 +84,14 @@ TEST_F(RosBag2NodeFixture, publisher_and_subscriber_work) std::string topic_name = "string_topic"; std::string type = "std_msgs/String"; + auto publisher = node_->create_generic_publisher(topic_name, type); + auto subscriber_future_ = std::async(std::launch::async, [this, topic_name, type] { return subscribe_raw_messages(1, topic_name, type); }); - auto publisher = node_->create_generic_publisher(topic_name, type); for (const auto & message : test_messages) { - publisher->publish(test_helpers::serialize_string_message(message)); + publisher->publish(serialize_string_message(message)); // This is necessary because sometimes, the subscriber is initialized very late std::this_thread::sleep_for(std::chrono::milliseconds(100)); } diff --git a/rosbag2/test/rosbag2/rosbag2_test_fixture.hpp b/rosbag2/test/rosbag2/rosbag2_test_fixture.hpp index 2ee317c11d..4557e3360f 100644 --- a/rosbag2/test/rosbag2/rosbag2_test_fixture.hpp +++ b/rosbag2/test/rosbag2/rosbag2_test_fixture.hpp @@ -18,12 +18,12 @@ #include #include +#include #include #include #include #include "rclcpp/rclcpp.hpp" -#include "std_msgs/msg/string.hpp" #ifdef _WIN32 # include @@ -34,7 +34,7 @@ #include "rosbag2_storage/storage_interfaces/read_only_interface.hpp" #include "rosbag2_storage/storage_interfaces/read_write_interface.hpp" -#include "test_helpers.hpp" +#include "test_memory_management.hpp" using namespace ::testing; // NOLINT @@ -99,8 +99,7 @@ class Rosbag2TestFixture : public Test { std::vector> table_msgs; rosbag2_storage::StorageFactory factory; - auto storage = - factory.open_read_only(db_name, "sqlite3"); + auto storage = factory.open_read_only(db_name, "sqlite3"); if (storage == nullptr) { throw std::runtime_error("failed to open sqlite3 storage"); } @@ -114,37 +113,36 @@ class Rosbag2TestFixture : public Test void write_messages( const std::string & db_name, - std::vector> messages) + std::vector> messages, + std::map topic_types) { - std::string topic = "string_topic"; rosbag2_storage::StorageFactory factory; auto storage = factory.open_read_write(db_name, "sqlite3"); + for (auto topic_and_type : topic_types) { + storage->create_topic(topic_and_type.first, topic_and_type.second); + } - if (storage) { - storage->create_topic(topic, "std_msgs/String"); - for (auto msg : messages) { - msg->topic_name = topic; - storage->write(msg); - } + for (auto msg : messages) { + storage->write(msg); } } - std::shared_ptr serialize_message(std::string message) + template + std::shared_ptr serialize_message( + std::string topic, typename MessageT::_data_type message) { + auto msg = std::make_shared(); + msg->data = message; auto bag_msg = std::make_shared(); - bag_msg->serialized_data = test_helpers::serialize_string_message(message); + bag_msg->serialized_data = memory_management_.serialize_message(msg); + bag_msg->topic_name = topic; return bag_msg; } - std::string deserialize_message( - std::shared_ptr serialized_message) - { - return test_helpers::deserialize_string_message(serialized_message->serialized_data); - } - std::string database_name_; static std::string temporary_dir_path_; + test_helpers::TestMemoryManagement memory_management_; }; std::string Rosbag2TestFixture::temporary_dir_path_ = ""; // NOLINT diff --git a/rosbag2/test/rosbag2/rosbag2_write_integration_test.cpp b/rosbag2/test/rosbag2/rosbag2_write_integration_test.cpp index 1207bd7a3c..3280d8cc74 100644 --- a/rosbag2/test/rosbag2/rosbag2_write_integration_test.cpp +++ b/rosbag2/test/rosbag2/rosbag2_write_integration_test.cpp @@ -14,8 +14,8 @@ #include -#include #include +#include #include #include #include @@ -23,74 +23,130 @@ #include "rclcpp/rclcpp.hpp" #include "rosbag2/rosbag2.hpp" +#include "rosbag2_storage/serialized_bag_message.hpp" #include "std_msgs/msg/string.hpp" +#include "std_msgs/msg/u_int8.hpp" #include "rosbag2_test_fixture.hpp" +#include "test_memory_management.hpp" using namespace ::testing; // NOLINT using namespace rosbag2; // NOLINT using namespace std::chrono_literals; // NOLINT -// TODO(Martin-Idel-SI): merge with rosbag2_read_integration_test once signal handling is sorted out +// TODO(Martin-Idel-SI): merge with other write and read tests once signal handling is sorted out class RosBag2IntegrationTestFixture : public Rosbag2TestFixture { public: RosBag2IntegrationTestFixture() - : Rosbag2TestFixture(), counter_(0) + : Rosbag2TestFixture() {} - void record_message(const std::string & db_name) + void SetUp() override + { + rclcpp::init(0, nullptr); + publisher_node_ = rclcpp::Node::make_shared("publisher_node"); + } + + void record_message(const std::string & db_name, std::vector topics) { rosbag2::Rosbag2 rosbag2; - rosbag2.record(db_name, "string_topic", [this]() { - counter_++; + rosbag2.record(db_name, topics, [this](std::string topic_name) { + messages_stored_counter_[topic_name]++; }); } - void start_recording() + void start_recording(std::vector topics) { - rclcpp::init(0, nullptr); // the future object returned from std::async needs to be stored not to block the execution future_ = std::async( - std::launch::async, [this]() { - record_message(database_name_); + std::launch::async, [this, topics]() { + record_message(database_name_, topics); }); } void stop_recording() { rclcpp::shutdown(); + future_.get(); } - void publish_messages(std::vector> msgs) + void wait_for_publishers() { - size_t expected_counter_value = msgs.size(); - auto node = std::make_shared("publisher_node"); - auto publisher = node->create_publisher("string_topic"); - auto timer = node->create_wall_timer(50ms, [this, publisher, msgs]() { - publisher->publish(msgs[counter_]->serialized_data.get()); - }); - - while (counter_ < expected_counter_value) { - rclcpp::spin_some(node); + for (auto & future : publisher_futures_) { + future.get(); } } - std::atomic counter_; + void start_publishing( + std::shared_ptr message, + std::string topic_name, size_t number_expected_messages) + { + messages_stored_counter_.insert({topic_name, 0}); + publisher_futures_.push_back(std::async( + std::launch::async, [this, message, topic_name, number_expected_messages]() { + auto publisher = publisher_node_->create_publisher(topic_name); + + while (rclcpp::ok() && messages_stored_counter_[topic_name] < number_expected_messages) { + publisher->publish(message->serialized_data.get()); + // We need to wait a bit, in order for record to process the messages + std::this_thread::sleep_for(50ms); + } + } + )); + } + + template + std::shared_ptr deserialize_message( + std::shared_ptr message) + { + return memory_.deserialize_message(message->serialized_data); + } + + template + std::vector> filter_messages( + std::vector> messages, + std::string topic) + { + std::vector> filtered_messages; + for (const auto & message : messages) { + if (message->topic_name == topic) { + filtered_messages.push_back(deserialize_message(message)); + } + } + return filtered_messages; + } + + rclcpp::Node::SharedPtr publisher_node_; + test_helpers::TestMemoryManagement memory_; + std::map messages_stored_counter_; + std::vector> publisher_futures_; std::future future_; }; -TEST_F(RosBag2IntegrationTestFixture, published_messages_are_recorded) +TEST_F(RosBag2IntegrationTestFixture, published_messages_from_multiple_topics_are_recorded) { - std::string message_to_publish = "test_message"; - auto serialized_message = serialize_message(message_to_publish); + std::string int_topic = "/int_topic"; + auto serialized_int_bag_message = serialize_message(int_topic, 10); + + std::string string_topic = "/string_topic"; + auto serialized_string_bag_message = serialize_message( + string_topic, "test_message"); + + start_publishing(serialized_string_bag_message, string_topic, 2); + start_publishing(serialized_int_bag_message, int_topic, 2); - start_recording(); - publish_messages({serialized_message}); + start_recording({string_topic, int_topic}); + wait_for_publishers(); stop_recording(); auto recorded_messages = get_messages(database_name_); - ASSERT_THAT(recorded_messages, Not(IsEmpty())); - EXPECT_THAT(deserialize_message(recorded_messages[0]), Eq(message_to_publish)); + ASSERT_THAT(recorded_messages, SizeIs(4)); + auto string_messages = filter_messages(recorded_messages, string_topic); + auto int_messages = filter_messages(recorded_messages, int_topic); + ASSERT_THAT(string_messages, SizeIs(2)); + ASSERT_THAT(int_messages, SizeIs(2)); + EXPECT_THAT(string_messages[0]->data, Eq("test_message")); + EXPECT_THAT(int_messages[0]->data, Eq(10)); } diff --git a/rosbag2/test/rosbag2/test_helpers.hpp b/rosbag2/test/rosbag2/test_helpers.hpp deleted file mode 100644 index c065d191c2..0000000000 --- a/rosbag2/test/rosbag2/test_helpers.hpp +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright 2018, Bosch Software Innovations GmbH. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef ROSBAG2__TEST_HELPERS_HPP_ -#define ROSBAG2__TEST_HELPERS_HPP_ - -#include -#include -#include - -#include "rclcpp/rclcpp.hpp" -#include "std_msgs/msg/string.hpp" - -#include "../../src/rosbag2/typesupport_helpers.hpp" - -namespace test_helpers -{ - -std::shared_ptr serialize_string_message(const std::string & message) -{ - auto test_message = std::make_shared(); - test_message->data = message; - - auto rcutils_allocator = rcutils_get_default_allocator(); - auto initial_capacity = 8u + static_cast(test_message->data.size()); - auto msg = new rmw_serialized_message_t; - *msg = rcutils_get_zero_initialized_char_array(); - auto ret = rcutils_char_array_init(msg, initial_capacity, &rcutils_allocator); - if (ret != RCUTILS_RET_OK) { - throw std::runtime_error("Error allocating resources for serialized message" + - std::to_string(ret)); - } - - auto serialized_message = std::shared_ptr(msg, - [](rmw_serialized_message_t * msg) { - int error = rcutils_char_array_fini(msg); - delete msg; - if (error != RCUTILS_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( - "rosbag2", "Leaking memory. Error: %s", rcutils_get_error_string_safe()); - } - }); - - serialized_message->buffer_length = initial_capacity; - - auto string_ts = rosbag2::get_typesupport("std_msgs/String"); - - auto error = rmw_serialize(test_message.get(), string_ts, serialized_message.get()); - if (error != RMW_RET_OK) { - throw std::runtime_error("Something went wrong preparing the serialized message"); - } - - return serialized_message; -} - -std::string deserialize_string_message(std::shared_ptr serialized_message) -{ - char * copied = new char[serialized_message->buffer_length]; - auto string_length = serialized_message->buffer_length - 8; - memcpy(copied, &serialized_message->buffer[8], string_length); - std::string message_content(copied); - delete[] copied; - return message_content; -} - - -} // namespace test_helpers - -#endif // ROSBAG2__TEST_HELPERS_HPP_ diff --git a/rosbag2/test/rosbag2/test_memory_management.cpp b/rosbag2/test/rosbag2/test_memory_management.cpp new file mode 100644 index 0000000000..985fe69e16 --- /dev/null +++ b/rosbag2/test/rosbag2/test_memory_management.cpp @@ -0,0 +1,53 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "test_memory_management.hpp" + +#include +#include + +namespace test_helpers +{ + +TestMemoryManagement::TestMemoryManagement() +{ + rcutils_allocator_ = rcutils_get_default_allocator(); +} + +std::shared_ptr +TestMemoryManagement::get_initialized_serialized_message(size_t capacity) +{ + auto msg = new rmw_serialized_message_t; + *msg = rcutils_get_zero_initialized_char_array(); + auto ret = rcutils_char_array_init(msg, capacity, &rcutils_allocator_); + if (ret != RCUTILS_RET_OK) { + throw std::runtime_error("Error allocating resources for serialized message: " + + std::string(rcutils_get_error_string_safe())); + } + + auto serialized_message = std::shared_ptr(msg, + [](rmw_serialized_message_t * msg) { + int error = rcutils_char_array_fini(msg); + delete msg; + if (error != RCUTILS_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rosbag2_storage", + "Leaking memory. Error: %s", rcutils_get_error_string_safe()); + } + }); + return serialized_message; +} + + +} // namespace test_helpers diff --git a/rosbag2/test/rosbag2/test_memory_management.hpp b/rosbag2/test/rosbag2/test_memory_management.hpp new file mode 100644 index 0000000000..c55d681cd7 --- /dev/null +++ b/rosbag2/test/rosbag2/test_memory_management.hpp @@ -0,0 +1,76 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2__TEST_MEMORY_MANAGEMENT_HPP_ +#define ROSBAG2__TEST_MEMORY_MANAGEMENT_HPP_ + +#include + +#include "rclcpp/rclcpp.hpp" + +namespace test_helpers +{ +class TestMemoryManagement +{ +public: + TestMemoryManagement(); + ~TestMemoryManagement() = default; + + std::shared_ptr get_initialized_serialized_message(size_t capacity); + + template + inline + const rosidl_message_type_support_t * get_message_typesupport(std::shared_ptr) + { + return rosidl_typesupport_cpp::get_message_type_support_handle(); + } + + template + inline + std::shared_ptr serialize_message(std::shared_ptr message) + { + auto serialized_message = get_initialized_serialized_message(0); + auto error = rmw_serialize( + message.get(), + get_message_typesupport(message), + serialized_message.get()); + if (error != RCL_RET_OK) { + throw std::runtime_error("Failed to serialize"); + } + return serialized_message; + } + + template + inline + std::shared_ptr deserialize_message(std::shared_ptr serialized_msg) + { + auto message = std::make_shared(); + auto error = rmw_deserialize( + serialized_msg.get(), + get_message_typesupport(message), + message.get()); + if (error != RCL_RET_OK) { + throw std::runtime_error("Failed to deserialize"); + } + return message; + } + +private: + rcutils_allocator_t rcutils_allocator_; +}; + + +} // namespace test_helpers + +#endif // ROSBAG2__TEST_MEMORY_MANAGEMENT_HPP_ diff --git a/rosbag2_storage/src/rosbag2_storage/ros_helper.cpp b/rosbag2_storage/src/rosbag2_storage/ros_helper.cpp index 90927f2dd8..8a4d8d99a9 100644 --- a/rosbag2_storage/src/rosbag2_storage/ros_helper.cpp +++ b/rosbag2_storage/src/rosbag2_storage/ros_helper.cpp @@ -23,14 +23,14 @@ namespace rosbag2_storage { +static rcutils_allocator_t allocator = rcutils_get_default_allocator(); + std::shared_ptr make_serialized_message(const void * data, size_t size) { - auto rcutils_allocator = new rcutils_allocator_t; - *rcutils_allocator = rcutils_get_default_allocator(); auto msg = new rcutils_char_array_t; *msg = rcutils_get_zero_initialized_char_array(); - auto ret = rcutils_char_array_init(msg, size, rcutils_allocator); + auto ret = rcutils_char_array_init(msg, size, &allocator); if (ret != RCUTILS_RET_OK) { throw std::runtime_error("Error allocating resources for serialized message: " + std::string(rcutils_get_error_string_safe())); diff --git a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_statement_wrapper.hpp b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_statement_wrapper.hpp index 8ac0b92fa5..15d19f25dd 100644 --- a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_statement_wrapper.hpp +++ b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_statement_wrapper.hpp @@ -48,7 +48,7 @@ class SqliteStatementWrapper : public std::enable_shared_from_this statement, int position) - : statement_(statement), next_row_idx_(position) + : statement_(statement), next_row_idx_(position), cached_row_idx_(POSITION_END - 1) { if (next_row_idx_ != POSITION_END) { if (statement_->step()) { @@ -81,6 +81,12 @@ class SqliteStatementWrapper : public std::enable_shared_from_this> @@ -110,8 +118,15 @@ class SqliteStatementWrapper : public std::enable_shared_from_this) const {} // end of recursion + bool is_row_cache_valid() const + { + return cached_row_idx_ == next_row_idx_ - 1; + } + std::shared_ptr statement_; int next_row_idx_; + mutable int cached_row_idx_; + mutable RowType row_cache_; }; explicit QueryResult(std::shared_ptr statement) diff --git a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp index adcf496fbe..e017ccf4e7 100644 --- a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp +++ b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp @@ -94,7 +94,10 @@ std::shared_ptr SqliteStorage::read_next( } auto bag_message = std::make_shared(); - std::tie(bag_message->serialized_data, bag_message->time_stamp) = *current_message_row_; + bag_message->serialized_data = std::get<0>(*current_message_row_); + bag_message->time_stamp = std::get<1>(*current_message_row_); + bag_message->topic_name = std::get<2>(*current_message_row_); + ++current_message_row_; return bag_message; } @@ -102,7 +105,7 @@ std::shared_ptr SqliteStorage::read_next( std::map SqliteStorage::get_all_topics_and_types() { if (all_topics_and_types_.empty()) { - fill_topics_and_types_map(); + fill_topics_and_types(); } return all_topics_and_types_; @@ -147,14 +150,16 @@ void SqliteStorage::prepare_for_writing() void SqliteStorage::prepare_for_reading() { - read_statement_ = - database_->prepare_statement("SELECT data, timestamp FROM messages ORDER BY id;"); + read_statement_ = database_->prepare_statement( + "SELECT data, timestamp, topics.name " + "FROM messages JOIN topics ON messages.topic_id = topics.id " + "ORDER BY messages.id;"); message_result_ = read_statement_->execute_query< - std::shared_ptr, rcutils_time_point_value_t>(); + std::shared_ptr, rcutils_time_point_value_t, std::string>(); current_message_row_ = message_result_.begin(); } -void SqliteStorage::fill_topics_and_types_map() +void SqliteStorage::fill_topics_and_types() { auto statement = database_->prepare_statement("SELECT name, type FROM topics ORDER BY id;"); auto query_results = statement->execute_query(); diff --git a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp index d6c41a61e7..238d43c6dd 100644 --- a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp +++ b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp @@ -62,17 +62,18 @@ class SqliteStorage : public rosbag2_storage::storage_interfaces::ReadWriteInter void initialize(); void prepare_for_writing(); void prepare_for_reading(); - void fill_topics_and_types_map(); + void fill_topics_and_types(); bool database_exists(const std::string & uri); + using ReadQueryResult = SqliteStatementWrapper::QueryResult< + std::shared_ptr, rcutils_time_point_value_t, std::string>; + std::shared_ptr database_; rosbag2_storage::BagInfo bag_info_; SqliteStatement write_statement_; SqliteStatement read_statement_; - SqliteStatementWrapper::QueryResult, - rcutils_time_point_value_t> message_result_; - SqliteStatementWrapper::QueryResult, - rcutils_time_point_value_t>::Iterator current_message_row_; + ReadQueryResult message_result_; + ReadQueryResult::Iterator current_message_row_; std::map topics_; std::map all_topics_and_types_; }; diff --git a/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_storage_integration_test.cpp b/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_storage_integration_test.cpp index 081f3599b4..413fbd025f 100644 --- a/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_storage_integration_test.cpp +++ b/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_storage_integration_test.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -26,50 +27,27 @@ using namespace ::testing; // NOLINT TEST_F(StorageTestFixture, string_messages_are_written_and_read_to_and_from_sqlite3_storage) { - std::vector> string_messages = - {std::make_pair("first message", 1), - std::make_pair("second message", 2), - std::make_pair("third message", 3)}; - - write_messages_to_sqlite(string_messages); + std::vector string_messages = {"first message", "second message", "third message"}; + std::vector topics = {"topic1", "topic2", "topic3"}; + std::vector> messages = + {std::make_tuple(string_messages[0], 1, topics[0], "type1"), + std::make_tuple(string_messages[1], 2, topics[1], "type2"), + std::make_tuple(string_messages[2], 3, topics[2], "type3")}; + + write_messages_to_sqlite(messages); auto read_messages = read_all_messages_from_sqlite(); ASSERT_THAT(read_messages, SizeIs(3)); - EXPECT_THAT(deserialize_message(read_messages[0]->serialized_data), Eq(string_messages[0].first)); - EXPECT_THAT(deserialize_message(read_messages[1]->serialized_data), Eq(string_messages[1].first)); - EXPECT_THAT(deserialize_message(read_messages[2]->serialized_data), Eq(string_messages[2].first)); -} - -TEST_F(StorageTestFixture, message_roundtrip_with_arbitrary_char_array_works_correctly) { - std::string message = "test_message"; - std::string topic = "test_topic"; - size_t message_size = strlen(message.c_str()) + 1; - char * test_message = new char[message_size]; - memcpy(test_message, message.c_str(), message_size); - - std::unique_ptr read_write_storage = - std::make_unique(); - read_write_storage->open(database_name_); - read_write_storage->create_topic(topic, "string"); - - auto test = std::make_shared(); - test->serialized_data = std::make_shared(); - test->serialized_data->buffer = test_message; - test->serialized_data->buffer_length = message_size; - test->serialized_data->buffer_capacity = message_size; - test->topic_name = topic; - - read_write_storage->write(test); - - auto read_message = read_write_storage->read_next(); - EXPECT_THAT(strcmp(read_message->serialized_data->buffer, test_message), Eq(0)); - - delete[] test_message; + for (size_t i = 0; i < 3; i++) { + EXPECT_THAT(deserialize_message(read_messages[i]->serialized_data), Eq(string_messages[i])); + EXPECT_THAT(read_messages[i]->time_stamp, Eq(std::get<1>(messages[i]))); + EXPECT_THAT(read_messages[i]->topic_name, Eq(topics[i])); + } } TEST_F(StorageTestFixture, has_next_return_false_if_there_are_no_more_messages) { - std::vector> string_messages = - {std::make_pair("first message", 1), std::make_pair("second message", 2)}; + std::vector> string_messages = + {std::make_tuple("first message", 1, "", ""), std::make_tuple("second message", 2, "", "")}; write_messages_to_sqlite(string_messages); std::unique_ptr readable_storage = @@ -83,17 +61,19 @@ TEST_F(StorageTestFixture, has_next_return_false_if_there_are_no_more_messages) EXPECT_FALSE(readable_storage->has_next()); } -TEST_F(StorageTestFixture, write_stamped_char_array_writes_correct_time_stamp) { - std::vector> string_messages = - {std::make_pair("first message", 1), std::make_pair("second message", 2)}; - - write_messages_to_sqlite(string_messages); - std::unique_ptr readable_storage = +TEST_F(StorageTestFixture, get_all_topics_and_types_returns_the_correct_map) { + std::unique_ptr writable_storage = std::make_unique(); - readable_storage->open(database_name_); - - auto read_message = readable_storage->read_next(); - EXPECT_THAT(read_message->time_stamp, Eq(1)); - read_message = readable_storage->read_next(); - EXPECT_THAT(read_message->time_stamp, Eq(2)); + writable_storage->open(database_name_); + writable_storage->create_topic("topic1", "type1"); + writable_storage->create_topic("topic2", "type2"); + writable_storage.reset(); + + auto readable_storage = std::make_unique(); + readable_storage->open(database_name_, rosbag2_storage::storage_interfaces::IOFlag::READ_ONLY); + auto topics_and_types = readable_storage->get_all_topics_and_types(); + + EXPECT_THAT(topics_and_types, SizeIs(2)); + EXPECT_THAT(topics_and_types["topic1"], Eq("type1")); + EXPECT_THAT(topics_and_types["topic2"], Eq("type2")); } diff --git a/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper_integration_test.cpp b/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper_integration_test.cpp index 3e531c6405..5f6db56d65 100644 --- a/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper_integration_test.cpp +++ b/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper_integration_test.cpp @@ -39,11 +39,11 @@ TEST_F(StorageTestFixture, querying_adhoc_results_with_normal_data_gives_content auto result = db.prepare_statement("SELECT 1, 1.465, 'abc';")->execute_query(); - auto row = *(result.begin()); + auto row_iter = result.begin(); - ASSERT_THAT(std::get<0>(row), Eq(1)); - ASSERT_THAT(std::get<1>(row), Eq(1.465)); - ASSERT_THAT(std::get<2>(row), StrEq("abc")); + ASSERT_THAT(std::get<0>(*row_iter), Eq(1)); + ASSERT_THAT(std::get<1>(*row_iter), Eq(1.465)); + ASSERT_THAT(std::get<2>(*row_iter), StrEq("abc")); } TEST_F(StorageTestFixture, bind_values_are_inserted) { diff --git a/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/storage_test_fixture.hpp b/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/storage_test_fixture.hpp index a10553d658..8b96e746ad 100644 --- a/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/storage_test_fixture.hpp +++ b/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/storage_test_fixture.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -134,19 +135,22 @@ class StorageTestFixture : public Test return message_content; } - void write_messages_to_sqlite(std::vector> messages) + void write_messages_to_sqlite( + std::vector> messages) { std::unique_ptr writable_storage = std::make_unique(); - std::string topic = "test_topic"; + writable_storage->open(database_name_); - writable_storage->create_topic(topic, "string"); for (auto msg : messages) { + std::string topic_name = std::get<2>(msg); + std::string type_name = std::get<3>(msg); + writable_storage->create_topic(topic_name, type_name); auto bag_message = std::make_shared(); - bag_message->serialized_data = make_serialized_message(msg.first); - bag_message->time_stamp = msg.second; - bag_message->topic_name = topic; + bag_message->serialized_data = make_serialized_message(std::get<0>(msg)); + bag_message->time_stamp = std::get<1>(msg); + bag_message->topic_name = topic_name; writable_storage->write(bag_message); } }