Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and clarify logic in test_play filter test #690

Merged
merged 3 commits into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class MockSequentialReader : public rosbag2_cpp::reader_interfaces::BaseReaderIn

std::shared_ptr<rosbag2_storage::SerializedBagMessage> read_next() override
{
// filter_ was considered when incrementing num_read_ in has_next()
return messages_[num_read_++];
}

Expand Down
120 changes: 67 additions & 53 deletions rosbag2_transport/test/rosbag2_transport/test_play.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,78 +197,92 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics)
serialize_test_message("topic2", 750, complex_message1),
serialize_test_message("topic2", 950, complex_message1)};

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
reader_ = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));
// Filter allows /topic2, blocks /topic1
{
play_options_.topics_to_filter = {"topic2"};

// Set filter
rosbag2_storage::StorageFilter storage_filter;
storage_filter.topics.push_back("topic2");
reader_->set_filter(storage_filter);
// SubscriptionManager has to be recreated for every unique test
// If it isn't, message counts accumulate
sub_.reset();
sub_ = std::make_shared<SubscriptionManager>();
sub_->add_subscription<test_msgs::msg::BasicTypes>("/topic1", 0);
sub_->add_subscription<test_msgs::msg::Arrays>("/topic2", 2);

// Due to a problem related to the subscriber, we play many (3) messages but make the subscriber
// node spin only until 2 have arrived. Hence the 2 as `launch_subscriber()` argument.
sub_->add_subscription<test_msgs::msg::BasicTypes>("/topic1", 2);
sub_->add_subscription<test_msgs::msg::Arrays>("/topic2", 2);
auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
reader_ = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));

auto await_received_messages = sub_->spin_subscriptions();
auto await_received_messages = sub_->spin_subscriptions();

Rosbag2Transport rosbag2_transport(reader_, writer_);
rosbag2_transport.play(storage_options_, play_options_);
Rosbag2Transport rosbag2_transport(reader_, writer_);
rosbag2_transport.play(storage_options_, play_options_);

await_received_messages.get();
await_received_messages.get();

auto replayed_test_primitives = sub_->get_received_messages<test_msgs::msg::BasicTypes>(
"/topic1");
EXPECT_THAT(replayed_test_primitives, SizeIs(Ge(0u)));
auto replayed_topic1 = sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1");
EXPECT_THAT(replayed_topic1, SizeIs(0u));

auto replayed_test_arrays = sub_->get_received_messages<test_msgs::msg::Arrays>(
"/topic2");
EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(2u)));
auto replayed_topic2 = sub_->get_received_messages<test_msgs::msg::Arrays>("/topic2");
EXPECT_THAT(replayed_topic2, SizeIs(Ge(2u)));
}

// Set new filter
auto prepared_mock_reader2 = std::make_unique<MockSequentialReader>();
reader_.reset();
reader_ = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader2));
storage_filter.topics.clear();
storage_filter.topics.push_back("topic1");
reader_->set_filter(storage_filter);
// Filter allows /topic1, blocks /topic2
{
play_options_.topics_to_filter = {"topic1"};

await_received_messages = sub_->spin_subscriptions();
// SubscriptionManager has to be recreated for every unique test
// otherwise counts accumulate, returning the spin immediately
sub_.reset();
sub_ = std::make_shared<SubscriptionManager>();
sub_->add_subscription<test_msgs::msg::BasicTypes>("/topic1", 2);
sub_->add_subscription<test_msgs::msg::Arrays>("/topic2", 0);

rosbag2_transport = Rosbag2Transport(reader_, writer_);
rosbag2_transport.play(storage_options_, play_options_);
auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
reader_ = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));

await_received_messages.get();
auto await_received_messages = sub_->spin_subscriptions();

replayed_test_primitives = sub_->get_received_messages<test_msgs::msg::BasicTypes>(
"/topic1");
EXPECT_THAT(replayed_test_primitives, SizeIs(Ge(2u)));
Rosbag2Transport rosbag2_transport(reader_, writer_);
rosbag2_transport.play(storage_options_, play_options_);

replayed_test_arrays = sub_->get_received_messages<test_msgs::msg::Arrays>(
"/topic2");
EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(0u)));
await_received_messages.get();

// Reset filter
auto prepared_mock_reader3 = std::make_unique<MockSequentialReader>();
reader_.reset();
reader_ = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader3));
reader_->reset_filter();
auto replayed_topic1 = sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1");
EXPECT_THAT(replayed_topic1, SizeIs(Ge(2u)));

await_received_messages = sub_->spin_subscriptions();
auto replayed_topic2 = sub_->get_received_messages<test_msgs::msg::Arrays>("/topic2");
EXPECT_THAT(replayed_topic2, SizeIs(0u));
}

rosbag2_transport = Rosbag2Transport(reader_, writer_);
rosbag2_transport.play(storage_options_, play_options_);
// No filter, receive both topics
{
play_options_.topics_to_filter.clear();

await_received_messages.get();
// SubscriptionManager has to be recreated for every unique test
// otherwise counts accumulate, returning the spin immediately
sub_.reset();
sub_ = std::make_shared<SubscriptionManager>();
sub_->add_subscription<test_msgs::msg::BasicTypes>("/topic1", 2);
sub_->add_subscription<test_msgs::msg::Arrays>("/topic2", 2);

replayed_test_primitives = sub_->get_received_messages<test_msgs::msg::BasicTypes>(
"/topic1");
EXPECT_THAT(replayed_test_primitives, SizeIs(Ge(2u)));
auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
reader_ = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));

replayed_test_arrays = sub_->get_received_messages<test_msgs::msg::Arrays>(
"/topic2");
EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(2u)));
auto await_received_messages = sub_->spin_subscriptions();

auto rosbag2_transport = Rosbag2Transport(reader_, writer_);
rosbag2_transport.play(storage_options_, play_options_);

await_received_messages.get();

auto replayed_topic1 = sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1");
EXPECT_THAT(replayed_topic1, SizeIs(Ge(2u)));

auto replayed_topic2 = sub_->get_received_messages<test_msgs::msg::Arrays>("/topic2");
EXPECT_THAT(replayed_topic2, SizeIs(Ge(2u)));
}
}

TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics_with_unknown_type)
Expand Down