Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

set_read_order: return success #1177

Merged
merged 7 commits into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class MockStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa
MOCK_METHOD1(update_metadata, void(const rosbag2_storage::BagMetadata &));
MOCK_METHOD1(create_topic, void(const rosbag2_storage::TopicMetadata &));
MOCK_METHOD1(remove_topic, void(const rosbag2_storage::TopicMetadata &));
MOCK_METHOD1(set_read_order, void(const rosbag2_storage::ReadOrder &));
MOCK_METHOD1(set_read_order, bool(const rosbag2_storage::ReadOrder &));
MOCK_METHOD0(has_next, bool());
MOCK_METHOD0(read_next, std::shared_ptr<rosbag2_storage::SerializedBagMessage>());
MOCK_METHOD1(write, void(std::shared_ptr<const rosbag2_storage::SerializedBagMessage>));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class SequentialCompressionReaderTest : public Test

ON_CALL(*storage_, get_all_topics_and_types()).WillByDefault(Return(topics_and_types));
ON_CALL(*storage_, read_next()).WillByDefault(Return(message));
ON_CALL(*storage_, set_read_order).WillByDefault(Return(true));
ON_CALL(*storage_factory_, open_read_only(_)).WillByDefault(Return(storage_));

initialize_dummy_storage_files();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
[this](const rosbag2_storage::BagMetadata & metadata) {
v_intercepted_update_metadata_.emplace_back(metadata);
});
ON_CALL(*storage_, set_read_order).WillByDefault(Return(true));
}

~SequentialCompressionWriterTest() override
Expand Down
6 changes: 4 additions & 2 deletions rosbag2_cpp/include/rosbag2_cpp/reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,12 @@ class ROSBAG2_CPP_PUBLIC Reader
* read head timestamp.
*
* \param read_order Sorting criterion and direction to read messages in
* \throws runtime_error if the Reader is not open.
* \return true if the requested read order has been successfully set.
* \note Calling set_read_order(order) concurrently with has_next(), seek(t), has_next_file()
* or load_next_file() will cause undefined behavior
* or load_next_file() will cause undefined behavior.
*/
void set_read_order(const rosbag2_storage::ReadOrder & read_order);
bool set_read_order(const rosbag2_storage::ReadOrder & read_order);

/**
* Ask whether the underlying bagfile contains at least one more message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ROSBAG2_CPP_PUBLIC BaseReaderInterface

virtual void close() = 0;

virtual void set_read_order(const rosbag2_storage::ReadOrder &) = 0;
virtual bool set_read_order(const rosbag2_storage::ReadOrder &) = 0;

virtual bool has_next() = 0;

Expand Down
5 changes: 3 additions & 2 deletions rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
void close() override;

/**
* \throws runtime_error if the Reader is not open.
* \note Calling set_read_order(order) concurrently with has_next(), seek(t), has_next_file()
* or load_next_file() will cause undefined behavior
* or load_next_file() will cause undefined behavior.
*/
void set_read_order(const rosbag2_storage::ReadOrder & order) override;
bool set_read_order(const rosbag2_storage::ReadOrder & order) override;

bool has_next() override;

Expand Down
2 changes: 1 addition & 1 deletion rosbag2_cpp/src/rosbag2_cpp/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void Reader::close()
reader_impl_->close();
}

void Reader::set_read_order(const rosbag2_storage::ReadOrder & order)
bool Reader::set_read_order(const rosbag2_storage::ReadOrder & order)
{
return reader_impl_->set_read_order(order);
}
Expand Down
18 changes: 14 additions & 4 deletions rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ std::vector<std::string> resolve_relative_paths(
}
} // namespace details

static const rosbag2_storage::ReadOrder kFallbackOrder(rosbag2_storage::ReadOrder::SortBy::File,
false);

SequentialReader::SequentialReader(
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory,
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory,
Expand Down Expand Up @@ -106,7 +109,13 @@ void SequentialReader::open(
if (!storage_) {
throw std::runtime_error{"No storage could be initialized from the inputs."};
}
storage_->set_read_order(read_order_);
if (!set_read_order(read_order_)) {
ROSBAG2_CPP_LOG_WARN(
"Could not set read order on open(), falling back to file order");
if (!set_read_order(kFallbackOrder)) {
throw std::runtime_error("Could not set read order on open()");
}
}
metadata_ = storage_->get_metadata();
if (metadata_.relative_file_paths.empty()) {
ROSBAG2_CPP_LOG_WARN("No file paths were found in metadata.");
Expand All @@ -129,12 +138,13 @@ void SequentialReader::open(
topics[0].topic_metadata.serialization_format);
}

void SequentialReader::set_read_order(const rosbag2_storage::ReadOrder & order)
bool SequentialReader::set_read_order(const rosbag2_storage::ReadOrder & order)
{
if (storage_) {
storage_->set_read_order(order);
if (!storage_) {
james-rms marked this conversation as resolved.
Show resolved Hide resolved
throw std::runtime_error("read order can only be set after open()");
}
read_order_ = order;
return storage_->set_read_order(read_order_);
}

bool SequentialReader::has_next()
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class MockStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa
MOCK_METHOD1(update_metadata, void(const rosbag2_storage::BagMetadata &));
MOCK_METHOD1(create_topic, void(const rosbag2_storage::TopicMetadata &));
MOCK_METHOD1(remove_topic, void(const rosbag2_storage::TopicMetadata &));
MOCK_METHOD1(set_read_order, void(const rosbag2_storage::ReadOrder &));
MOCK_METHOD1(set_read_order, bool(const rosbag2_storage::ReadOrder &));
MOCK_METHOD0(has_next, bool());
MOCK_METHOD0(has_next_file, bool());
MOCK_METHOD0(read_next, std::shared_ptr<rosbag2_storage::SerializedBagMessage>());
Expand Down
1 change: 1 addition & 0 deletions rosbag2_cpp/test/rosbag2_cpp/test_multifile_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class MultifileReaderTest : public Test

EXPECT_CALL(*storage_, get_all_topics_and_types())
.Times(AtMost(1)).WillRepeatedly(Return(topics_and_types));
ON_CALL(*storage_, set_read_order).WillByDefault(Return(true));
ON_CALL(*storage_, read_next()).WillByDefault(Return(message));
EXPECT_CALL(*storage_factory, open_read_only(_)).WillRepeatedly(Return(storage_));

Expand Down
20 changes: 9 additions & 11 deletions rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class SequentialReaderTest : public Test
});
EXPECT_CALL(*storage_, has_next_file()).WillRepeatedly(Return(true));
EXPECT_CALL(*storage_, read_next()).WillRepeatedly(Return(message));
ON_CALL(*storage_, set_read_order).WillByDefault(Return(true));

EXPECT_CALL(*storage_factory, open_read_only(_)).Times(AnyNumber());
ON_CALL(*storage_factory, open_read_only).WillByDefault(
Expand Down Expand Up @@ -328,10 +329,10 @@ class ReadOrderTest : public TemporaryDirectoryFixture
TEST_F(ReadOrderTest, received_timestamp_order) {
rosbag2_storage::ReadOrder order(rosbag2_storage::ReadOrder::ReceivedTimestamp, false);
sort_expected(order);
reader.set_read_order(order);

for (bool do_reset : {false, true}) {
reader.open(storage_options, rosbag2_cpp::ConverterOptions{});
EXPECT_TRUE(reader.set_read_order(order));
check_against_sorted(do_reset);
reader.close();
}
Expand All @@ -340,8 +341,8 @@ TEST_F(ReadOrderTest, received_timestamp_order) {
TEST_F(ReadOrderTest, reverse_received_timestamp_order) {
rosbag2_storage::ReadOrder order(rosbag2_storage::ReadOrder::ReceivedTimestamp, true);
sort_expected(order);
reader.set_read_order(order);
reader.open(storage_options, rosbag2_cpp::ConverterOptions{});
EXPECT_TRUE(reader.set_read_order(order));
auto metadata = reader.get_metadata();
// Seek to end before reading reverse messages
auto end_timestamp = (metadata.starting_time + metadata.duration).time_since_epoch().count();
Expand All @@ -357,22 +358,19 @@ TEST_F(ReadOrderTest, reverse_received_timestamp_order) {

TEST_F(ReadOrderTest, file_order) {
reader.open(storage_options, rosbag2_cpp::ConverterOptions{});
EXPECT_THROW(
reader.set_read_order(rosbag2_storage::ReadOrder(rosbag2_storage::ReadOrder::File, false)),
std::runtime_error);
EXPECT_FALSE(
reader.set_read_order(rosbag2_storage::ReadOrder(rosbag2_storage::ReadOrder::File, false)));
}

TEST_F(ReadOrderTest, reverse_file_order) {
reader.open(storage_options, rosbag2_cpp::ConverterOptions{});
EXPECT_THROW(
reader.set_read_order(rosbag2_storage::ReadOrder(rosbag2_storage::ReadOrder::File, true)),
std::runtime_error);
EXPECT_FALSE(
reader.set_read_order(rosbag2_storage::ReadOrder(rosbag2_storage::ReadOrder::File, true)));
}

TEST_F(ReadOrderTest, published_timestamp_order) {
reader.open(storage_options, rosbag2_cpp::ConverterOptions{});
EXPECT_THROW(
EXPECT_FALSE(
reader.set_read_order(
rosbag2_storage::ReadOrder(rosbag2_storage::ReadOrder::PublishedTimestamp, false)),
std::runtime_error);
rosbag2_storage::ReadOrder(rosbag2_storage::ReadOrder::PublishedTimestamp, false)));
}
1 change: 1 addition & 0 deletions rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class SequentialWriterTest : public Test
[this](const rosbag2_storage::BagMetadata & metadata) {
v_intercepted_update_metadata_.emplace_back(metadata);
});
ON_CALL(*storage_, set_read_order).WillByDefault(Return(true));
}

~SequentialWriterTest() override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ TEST_F(StorageWithoutMetadataFileTest, open_uses_storage_id_from_storage_options
metadata.topics_with_message_count = {topic_information};

EXPECT_CALL(*storage_, get_metadata).Times(1).WillOnce(Return(metadata));
EXPECT_CALL(*storage_, set_read_order).Times(1).WillOnce(Return(true));
}

auto storage_factory = std::make_unique<StrictMock<MockStorageFactory>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ class ROSBAG2_STORAGE_PUBLIC BaseReadInterface
/// Note that when setting to reverse order, this will not change the read head, so user
/// must first seek() to the end in order to read messages from the end.
/// @param read_order The order in which to return messages.
virtual void set_read_order(const ReadOrder & read_order) = 0;
/// @throws runtime_error if the reader is not open.
/// @return true if the requested read order has been successfully set.
virtual bool set_read_order(const ReadOrder & read_order) = 0;

virtual bool has_next() = 0;

Expand Down
3 changes: 2 additions & 1 deletion rosbag2_storage/test/rosbag2_storage/test_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ void TestPlugin::update_metadata(const rosbag2_storage::BagMetadata & metadata)
(void)metadata;
}

void TestPlugin::set_read_order(const rosbag2_storage::ReadOrder & order)
bool TestPlugin::set_read_order(const rosbag2_storage::ReadOrder & order)
{
std::cout << "Set read order " << order.sort_by << " " << order.reverse << std::endl;
return true;
}

bool TestPlugin::has_next()
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_storage/test/rosbag2_storage/test_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class TestPlugin : public rosbag2_storage::storage_interfaces::ReadWriteInterfac

void remove_topic(const rosbag2_storage::TopicMetadata & topic) override;

void set_read_order(const rosbag2_storage::ReadOrder &) override;
bool set_read_order(const rosbag2_storage::ReadOrder &) override;

bool has_next() override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ void TestReadOnlyPlugin::open(
std::cout << "config file uri: " << storage_options.storage_config_uri << ".\n";
}

void TestReadOnlyPlugin::set_read_order(const rosbag2_storage::ReadOrder & order)
bool TestReadOnlyPlugin::set_read_order(const rosbag2_storage::ReadOrder & order)
{
std::cout << "Set read order " << order.sort_by << " " << order.reverse << std::endl;
return true;
}

bool TestReadOnlyPlugin::has_next()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class TestReadOnlyPlugin : public rosbag2_storage::storage_interfaces::ReadOnlyI
const rosbag2_storage::StorageOptions & storage_options,
rosbag2_storage::storage_interfaces::IOFlag flag) override;

void set_read_order(const rosbag2_storage::ReadOrder &) override;
bool set_read_order(const rosbag2_storage::ReadOrder &) override;

bool has_next() override;

Expand Down
59 changes: 32 additions & 27 deletions rosbag2_storage_mcap/src/mcap_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class MCAPStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa

/** BaseReadInterface **/
#ifdef ROSBAG2_STORAGE_MCAP_HAS_SET_READ_ORDER
void set_read_order(const rosbag2_storage::ReadOrder &) override;
bool set_read_order(const rosbag2_storage::ReadOrder &) override;
#endif
bool has_next() override;
std::shared_ptr<rosbag2_storage::SerializedBagMessage> read_next() override;
Expand Down Expand Up @@ -222,6 +222,7 @@ class MCAPStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa

void reset_iterator(rcutils_time_point_value_t start_time = 0);
bool read_and_enqueue_message();
bool message_indexes_present();
void ensure_summary_read();

std::optional<rosbag2_storage::storage_interfaces::IOFlag> opened_as_;
Expand All @@ -234,8 +235,7 @@ class MCAPStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa
std::unordered_map<std::string, mcap::SchemaId> schema_ids_; // datatype -> schema_id
std::unordered_map<std::string, mcap::ChannelId> channel_ids_; // topic -> channel_id
rosbag2_storage::StorageFilter storage_filter_{};
mcap::ReadMessageOptions::ReadOrder read_order_ =
mcap::ReadMessageOptions::ReadOrder::LogTimeOrder;
mcap::ReadMessageOptions::ReadOrder read_order_ = mcap::ReadMessageOptions::ReadOrder::FileOrder;

std::unique_ptr<std::ifstream> input_;
std::unique_ptr<mcap::FileStreamReader> data_source_;
Expand Down Expand Up @@ -502,50 +502,55 @@ void MCAPStorage::ensure_summary_read()
if (!status.ok()) {
throw std::runtime_error(status.message);
}
// check if message indexes are present, if not, read in file order.
bool message_indexes_found = false;
for (const auto & ci : mcap_reader_->chunkIndexes()) {
if (ci.messageIndexLength > 0) {
message_indexes_found = true;
break;
}
}
if (!message_indexes_found) {
RCUTILS_LOG_WARN_NAMED(LOG_NAME,
"no message indices found, falling back to reading in file order");
read_order_ = mcap::ReadMessageOptions::ReadOrder::FileOrder;
}
has_read_summary_ = true;
}
}

bool MCAPStorage::message_indexes_present()
{
ensure_summary_read();
for (const auto & ci : mcap_reader_->chunkIndexes()) {
if (ci.messageIndexLength > 0) {
return true;
}
}
return false;
}

#ifdef ROSBAG2_STORAGE_MCAP_HAS_SET_READ_ORDER
void MCAPStorage::set_read_order(const rosbag2_storage::ReadOrder & read_order)
bool MCAPStorage::set_read_order(const rosbag2_storage::ReadOrder & read_order)
{
auto next_read_order = read_order_;
if (!has_read_summary_) {
throw std::runtime_error("set_read_order called before open()");
}
switch (read_order.sort_by) {
case rosbag2_storage::ReadOrder::ReceivedTimestamp:
if (!message_indexes_present()) {
RCUTILS_LOG_WARN_NAMED(
LOG_NAME, "attempted to read in receive timestamp order with no message index");
return false;
}
if (read_order.reverse) {
next_read_order = mcap::ReadMessageOptions::ReadOrder::ReverseLogTimeOrder;
read_order_ = mcap::ReadMessageOptions::ReadOrder::ReverseLogTimeOrder;
} else {
next_read_order = mcap::ReadMessageOptions::ReadOrder::LogTimeOrder;
read_order_ = mcap::ReadMessageOptions::ReadOrder::LogTimeOrder;
}
break;
case rosbag2_storage::ReadOrder::File:
if (!read_order.reverse) {
next_read_order = mcap::ReadMessageOptions::ReadOrder::FileOrder;
read_order_ = mcap::ReadMessageOptions::ReadOrder::FileOrder;
} else {
throw std::runtime_error("Reverse file order reading not implemented.");
RCUTILS_LOG_WARN_NAMED(LOG_NAME, "reverse file-order reading not implemented");
return false;
}
break;
case rosbag2_storage::ReadOrder::PublishedTimestamp:
throw std::runtime_error("PublishedTimestamp read order not yet implemented in ROS 2");
RCUTILS_LOG_WARN_NAMED(LOG_NAME, "publish timestamp order reading not implemented");
return false;
break;
}
if (next_read_order != read_order_) {
read_order_ = next_read_order;
reset_iterator();
}
reset_iterator();
return true;
}
#endif

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & messages)
override;

void set_read_order(const rosbag2_storage::ReadOrder &) override;
bool set_read_order(const rosbag2_storage::ReadOrder &) override;

bool has_next() override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,17 +320,20 @@ void SqliteStorage::write(
commit_transaction();
}

void SqliteStorage::set_read_order(const rosbag2_storage::ReadOrder & read_order)
bool SqliteStorage::set_read_order(const rosbag2_storage::ReadOrder & read_order)
{
if (read_order.sort_by == rosbag2_storage::ReadOrder::PublishedTimestamp) {
throw std::runtime_error("Not Implemented - PublishedTimestamp read order.");
james-rms marked this conversation as resolved.
Show resolved Hide resolved
ROSBAG2_STORAGE_DEFAULT_PLUGINS_LOG_DEBUG("ReadOrder::PublishedTimestamp not implemented");
return false;
}
if (read_order.sort_by == rosbag2_storage::ReadOrder::File) {
throw std::runtime_error("Not Implemented - File read order");
ROSBAG2_STORAGE_DEFAULT_PLUGINS_LOG_DEBUG("ReadOrder::File not implemented");
return false;
}

read_order_ = read_order;
read_statement_ = nullptr;
return true;
}

bool SqliteStorage::has_next()
Expand Down
Loading