Skip to content

Commit

Permalink
rosbag2_storage_mcap: update to new read_order API
Browse files Browse the repository at this point in the history
Signed-off-by: James Smith <james@foxglove.dev>
  • Loading branch information
james-rms committed Nov 30, 2022
1 parent 3a7c9d9 commit c2b36aa
Showing 1 changed file with 32 additions and 27 deletions.
59 changes: 32 additions & 27 deletions rosbag2_storage_mcap/src/mcap_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class MCAPStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa

/** BaseReadInterface **/
#ifdef ROSBAG2_STORAGE_MCAP_HAS_SET_READ_ORDER
void set_read_order(const rosbag2_storage::ReadOrder &) override;
bool set_read_order(const rosbag2_storage::ReadOrder &) override;
#endif
bool has_next() override;
std::shared_ptr<rosbag2_storage::SerializedBagMessage> read_next() override;
Expand Down Expand Up @@ -222,6 +222,7 @@ class MCAPStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa

void reset_iterator(rcutils_time_point_value_t start_time = 0);
bool read_and_enqueue_message();
bool message_indexes_present();
void ensure_summary_read();

std::optional<rosbag2_storage::storage_interfaces::IOFlag> opened_as_;
Expand All @@ -234,8 +235,7 @@ class MCAPStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa
std::unordered_map<std::string, mcap::SchemaId> schema_ids_; // datatype -> schema_id
std::unordered_map<std::string, mcap::ChannelId> channel_ids_; // topic -> channel_id
rosbag2_storage::StorageFilter storage_filter_{};
mcap::ReadMessageOptions::ReadOrder read_order_ =
mcap::ReadMessageOptions::ReadOrder::LogTimeOrder;
mcap::ReadMessageOptions::ReadOrder read_order_ = mcap::ReadMessageOptions::ReadOrder::FileOrder;

std::unique_ptr<std::ifstream> input_;
std::unique_ptr<mcap::FileStreamReader> data_source_;
Expand Down Expand Up @@ -502,50 +502,55 @@ void MCAPStorage::ensure_summary_read()
if (!status.ok()) {
throw std::runtime_error(status.message);
}
// check if message indexes are present, if not, read in file order.
bool message_indexes_found = false;
for (const auto & ci : mcap_reader_->chunkIndexes()) {
if (ci.messageIndexLength > 0) {
message_indexes_found = true;
break;
}
}
if (!message_indexes_found) {
RCUTILS_LOG_WARN_NAMED(LOG_NAME,
"no message indices found, falling back to reading in file order");
read_order_ = mcap::ReadMessageOptions::ReadOrder::FileOrder;
}
has_read_summary_ = true;
}
}

bool MCAPStorage::message_indexes_present()
{
ensure_summary_read();
for (const auto & ci : mcap_reader_->chunkIndexes()) {
if (ci.messageIndexLength > 0) {
return true;
}
}
return false;
}

#ifdef ROSBAG2_STORAGE_MCAP_HAS_SET_READ_ORDER
void MCAPStorage::set_read_order(const rosbag2_storage::ReadOrder & read_order)
bool MCAPStorage::set_read_order(const rosbag2_storage::ReadOrder & read_order)
{
auto next_read_order = read_order_;
if (!has_read_summary_) {
throw std::runtime_error("set_read_order called before open()");
}
switch (read_order.sort_by) {
case rosbag2_storage::ReadOrder::ReceivedTimestamp:
if (!message_indexes_present()) {
RCUTILS_LOG_WARN_NAMED(
LOG_NAME, "attempted to read in receive timestamp order with no message index");
return false;
}
if (read_order.reverse) {
next_read_order = mcap::ReadMessageOptions::ReadOrder::ReverseLogTimeOrder;
read_order_ = mcap::ReadMessageOptions::ReadOrder::ReverseLogTimeOrder;
} else {
next_read_order = mcap::ReadMessageOptions::ReadOrder::LogTimeOrder;
read_order_ = mcap::ReadMessageOptions::ReadOrder::LogTimeOrder;
}
break;
case rosbag2_storage::ReadOrder::File:
if (!read_order.reverse) {
next_read_order = mcap::ReadMessageOptions::ReadOrder::FileOrder;
read_order_ = mcap::ReadMessageOptions::ReadOrder::FileOrder;
} else {
throw std::runtime_error("Reverse file order reading not implemented.");
RCUTILS_LOG_WARN_NAMED(LOG_NAME, "reverse file-order reading not implemented");
return false;
}
break;
case rosbag2_storage::ReadOrder::PublishedTimestamp:
throw std::runtime_error("PublishedTimestamp read order not yet implemented in ROS 2");
RCUTILS_LOG_WARN_NAMED(LOG_NAME, "publish timestamp order reading not implemented");
return false;
break;
}
if (next_read_order != read_order_) {
read_order_ = next_read_order;
reset_iterator();
}
reset_iterator();
return true;
}
#endif

Expand Down

0 comments on commit c2b36aa

Please sign in to comment.