Skip to content

Commit

Permalink
Replace MockReader with real bag for tests
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
  • Loading branch information
MichaelOrlov committed Aug 17, 2021
1 parent fa260d3 commit 07e33e9
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 23 deletions.
19 changes: 19 additions & 0 deletions rosbag2_transport/test/resources/test_bag_for_seek/metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
rosbag2_bagfile_information:
version: 4
storage_identifier: sqlite3
relative_file_paths:
- test_bag_for_seek_0.db3
duration:
nanoseconds: 400000000
starting_time:
nanoseconds_since_epoch: 1000000000
message_count: 5
topics_with_message_count:
- topic_metadata:
name: topic1
type: test_msgs/BasicTypes
serialization_format: cdr
offered_qos_profiles: ""
message_count: 5
compression_format: ""
compression_mode: ""
Binary file not shown.
83 changes: 60 additions & 23 deletions rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
#include <chrono>
#include <future>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "mock_player.hpp"
#include "rosbag2_cpp/writers/sequential_writer.hpp"
#include "rosbag2_play_test_fixture.hpp"
#include "rosbag2_storage/storage_options.hpp"
#include "test_msgs/message_fixtures.hpp"
#include "test_msgs/msg/basic_types.hpp"

Expand All @@ -36,36 +39,70 @@ class RosBag2PlaySeekTestFixture : public RosBag2PlayTestFixture
: RosBag2PlayTestFixture()
{
topic_types_ = std::vector<rosbag2_storage::TopicMetadata>{
{"topic1", "test_msgs/BasicTypes", "", ""}};
{"topic1", "test_msgs/BasicTypes", rmw_get_serialization_format(), ""}};

messages_.reserve(num_msgs_to_publish_);
const rcpputils::fs::path current_file_path{__FILE__};
const rcpputils::fs::path base = current_file_path.parent_path().parent_path();
const rcpputils::fs::path bag_path = base / "resources" / "test_bag_for_seek";

storage_options_ = rosbag2_storage::StorageOptions({bag_path.string(), "sqlite3", 0, 0, 0});
play_options_.read_ahead_queue_size = 2;
reader_ = std::make_unique<rosbag2_cpp::Reader>();

check_metadata();
}


void check_metadata()
{
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io =
std::make_unique<rosbag2_storage::MetadataIo>();
ASSERT_TRUE(metadata_io->metadata_file_exists(storage_options_.uri));
rosbag2_storage::BagMetadata metadata = metadata_io->read_metadata(storage_options_.uri);
ASSERT_EQ(metadata.message_count, num_msgs_in_bag_);
ASSERT_EQ(start_time_ms_, metadata.starting_time.time_since_epoch().count() / 1000000);
}

void create_bag_file_for_tests()
{
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages;
messages.reserve(num_msgs_in_bag_);

auto primitive_message = get_messages_basic_types()[0];
for (size_t i = 0; i < num_msgs_to_publish_; i++) {
for (size_t i = 0; i < num_msgs_in_bag_; i++) {
primitive_message->int32_value = static_cast<int32_t>(i + 1);
const int64_t timestamp = start_time_ms_ + message_spacing_ms_ * static_cast<int64_t>(i);
messages_.push_back(serialize_test_message("topic1", timestamp, primitive_message));
messages.push_back(serialize_test_message("topic1", timestamp, primitive_message));
}

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages_, topic_types_);
reader_ = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));
play_options_.read_ahead_queue_size = 2;
const rosbag2_cpp::ConverterOptions converter_options(
{rmw_get_serialization_format(), rmw_get_serialization_format()});

std::unique_ptr<rosbag2_cpp::writers::SequentialWriter> writer =
std::make_unique<rosbag2_cpp::writers::SequentialWriter>();

writer->open(storage_options_, converter_options);
writer->create_topic(topic_types_[0]);

for (auto const & message : messages) {
writer->write(message);
}
writer->close();
}

const size_t num_msgs_to_publish_ = 5;
const size_t num_msgs_in_bag_ = 5;
const int64_t start_time_ms_ = 1000;
const int64_t message_spacing_ms_ = 100;
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages_;
std::vector<rosbag2_storage::TopicMetadata> topic_types_;
std::unique_ptr<rosbag2_cpp::Reader> reader_;
};

TEST_F(RosBag2PlaySeekTestFixture, seek_back_in_time) {
const size_t expected_number_of_messages = num_msgs_in_bag_ + 2;
auto player = std::make_shared<MockPlayer>(std::move(reader_), storage_options_, play_options_);

sub_ = std::make_shared<SubscriptionManager>();
sub_->add_subscription<test_msgs::msg::BasicTypes>("/topic1", messages_.size() + 2);
sub_->add_subscription<test_msgs::msg::BasicTypes>("/topic1", expected_number_of_messages);

// Wait for discovery to match publishers with subscribers
ASSERT_TRUE(
Expand Down Expand Up @@ -95,7 +132,7 @@ TEST_F(RosBag2PlaySeekTestFixture, seek_back_in_time) {
await_received_messages.get();

auto replayed_topic1 = sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1");
ASSERT_THAT(replayed_topic1, SizeIs(num_msgs_to_publish_ + 2));
ASSERT_THAT(replayed_topic1, SizeIs(expected_number_of_messages));

EXPECT_EQ(replayed_topic1[0]->int32_value, 1);
EXPECT_EQ(replayed_topic1[1]->int32_value, 2);
Expand All @@ -108,7 +145,7 @@ TEST_F(RosBag2PlaySeekTestFixture, seek_back_in_time) {
}

TEST_F(RosBag2PlaySeekTestFixture, seek_with_timestamp_later_than_in_last_message) {
const size_t expected_number_of_messages = num_msgs_to_publish_;
const size_t expected_number_of_messages = num_msgs_in_bag_;
auto player = std::make_shared<MockPlayer>(std::move(reader_), storage_options_, play_options_);

sub_ = std::make_shared<SubscriptionManager>();
Expand All @@ -131,7 +168,7 @@ TEST_F(RosBag2PlaySeekTestFixture, seek_with_timestamp_later_than_in_last_messag
// Jump in timestamp equal to the timestamp in last message + 1 nanosecond
EXPECT_FALSE(
player->seek(
(start_time_ms_ + message_spacing_ms_ * (num_msgs_to_publish_ - 1)) * 1000000 + 1)
(start_time_ms_ + message_spacing_ms_ * (num_msgs_in_bag_ - 1)) * 1000000 + 1)
);

EXPECT_TRUE(player->play_next());
Expand All @@ -149,7 +186,7 @@ TEST_F(RosBag2PlaySeekTestFixture, seek_with_timestamp_later_than_in_last_messag
}

TEST_F(RosBag2PlaySeekTestFixture, seek_forward) {
const size_t expected_number_of_messages = num_msgs_to_publish_ - 1;
const size_t expected_number_of_messages = num_msgs_in_bag_ - 1;
auto player = std::make_shared<MockPlayer>(std::move(reader_), storage_options_, play_options_);

sub_ = std::make_shared<SubscriptionManager>();
Expand Down Expand Up @@ -187,7 +224,7 @@ TEST_F(RosBag2PlaySeekTestFixture, seek_forward) {
}

TEST_F(RosBag2PlaySeekTestFixture, seek_back_in_time_from_the_end_of_the_bag) {
const size_t expected_number_of_messages = num_msgs_to_publish_ + num_msgs_to_publish_ - 2;
const size_t expected_number_of_messages = num_msgs_in_bag_ + num_msgs_in_bag_ - 2;
auto player = std::make_shared<MockPlayer>(std::move(reader_), storage_options_, play_options_);

sub_ = std::make_shared<SubscriptionManager>();
Expand All @@ -206,7 +243,7 @@ TEST_F(RosBag2PlaySeekTestFixture, seek_back_in_time_from_the_end_of_the_bag) {

EXPECT_TRUE(player->is_paused());
// Play all messages from bag
for (size_t i = 0; i < num_msgs_to_publish_; i++) {
for (size_t i = 0; i < num_msgs_in_bag_; i++) {
EXPECT_TRUE(player->play_next());
}
EXPECT_FALSE(player->play_next()); // Make sure there are no messages to play
Expand All @@ -221,19 +258,19 @@ TEST_F(RosBag2PlaySeekTestFixture, seek_back_in_time_from_the_end_of_the_bag) {
auto replayed_topic1 = sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1");
EXPECT_THAT(replayed_topic1, SizeIs(expected_number_of_messages));

for (size_t i = 0; i < num_msgs_to_publish_; i++) {
for (size_t i = 0; i < num_msgs_in_bag_; i++) {
EXPECT_EQ(replayed_topic1[i]->int32_value, static_cast<int32_t>(i + 1)) << "i=" << i;
}

for (size_t i = 0; i < (replayed_topic1.size() - num_msgs_to_publish_); i++) {
for (size_t i = 0; i < (replayed_topic1.size() - num_msgs_in_bag_); i++) {
EXPECT_EQ(
replayed_topic1[i + num_msgs_to_publish_]->int32_value,
replayed_topic1[i + num_msgs_in_bag_]->int32_value,
static_cast<int32_t>(i + 3)) << "i=" << i;
}
}

TEST_F(RosBag2PlaySeekTestFixture, seek_forward_from_the_end_of_the_bag) {
const size_t expected_number_of_messages = num_msgs_to_publish_;
const size_t expected_number_of_messages = num_msgs_in_bag_;
auto player = std::make_shared<MockPlayer>(std::move(reader_), storage_options_, play_options_);

sub_ = std::make_shared<SubscriptionManager>();
Expand All @@ -252,15 +289,15 @@ TEST_F(RosBag2PlaySeekTestFixture, seek_forward_from_the_end_of_the_bag) {

EXPECT_TRUE(player->is_paused());
// Play all messages from bag
for (size_t i = 0; i < num_msgs_to_publish_; i++) {
for (size_t i = 0; i < num_msgs_in_bag_; i++) {
EXPECT_TRUE(player->play_next());
}
EXPECT_FALSE(player->play_next()); // Make sure there are no messages to play

// Jump in timestamp equal to the timestamp in last message + 1 nanosecond
EXPECT_FALSE(
player->seek(
(start_time_ms_ + message_spacing_ms_ * (num_msgs_to_publish_ - 1)) * 1000000 + 1)
(start_time_ms_ + message_spacing_ms_ * (num_msgs_in_bag_ - 1)) * 1000000 + 1)
);
EXPECT_FALSE(player->play_next());
player->resume();
Expand Down

0 comments on commit 07e33e9

Please sign in to comment.