Skip to content

Commit

Permalink
Use rw_lock to protect mcap metadata lists. (#1561)
Browse files Browse the repository at this point in the history
* use rw_lock to protect mcap metadata lists.

Signed-off-by: Tomoya Fujita <Tomoya.Fujita@sony.com>

* introduce MCAPStorage::write_lock_free private method.

Signed-off-by: Tomoya Fujita <Tomoya.Fujita@sony.com>

---------

Signed-off-by: Tomoya Fujita <Tomoya.Fujita@sony.com>
  • Loading branch information
fujitatomoya authored Feb 3, 2024
1 parent edda376 commit 90d1da8
Showing 1 changed file with 27 additions and 11 deletions.
38 changes: 27 additions & 11 deletions rosbag2_storage_mcap/src/mcap_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rcpputils/thread_safety_annotations.hpp"
#include "rcutils/logging_macros.h"
#include "rosbag2_storage/metadata_io.hpp"
#include "rosbag2_storage/ros_helper.hpp"
Expand Down Expand Up @@ -40,6 +41,7 @@

#include <algorithm>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -228,6 +230,7 @@ class ROSBAG2_STORAGE_MCAP_PUBLIC MCAPStorage

private:
void read_metadata();
void write_lock_free(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg);
void open_impl(const std::string & uri, const std::string & preset_profile,
rosbag2_storage::storage_interfaces::IOFlag io_flag,
const std::string & storage_config_uri);
Expand All @@ -244,9 +247,13 @@ class ROSBAG2_STORAGE_MCAP_PUBLIC MCAPStorage
std::shared_ptr<rosbag2_storage::SerializedBagMessage> next_;

rosbag2_storage::BagMetadata metadata_{};
std::unordered_map<std::string, rosbag2_storage::TopicInformation> topics_;
std::unordered_map<std::string, mcap::SchemaId> schema_ids_; // datatype -> schema_id
std::unordered_map<std::string, mcap::ChannelId> channel_ids_; // topic -> channel_id
std::mutex mcap_storage_mutex_;
std::unordered_map<std::string, rosbag2_storage::TopicInformation> topics_
RCPPUTILS_TSA_GUARDED_BY(mcap_storage_mutex_);
std::unordered_map<std::string, mcap::SchemaId> schema_ids_
RCPPUTILS_TSA_GUARDED_BY(mcap_storage_mutex_); // datatype -> schema_id
std::unordered_map<std::string, mcap::ChannelId> channel_ids_
RCPPUTILS_TSA_GUARDED_BY(mcap_storage_mutex_); // topic -> channel_id
rosbag2_storage::StorageFilter storage_filter_{};
mcap::ReadMessageOptions::ReadOrder read_order_ = mcap::ReadMessageOptions::ReadOrder::FileOrder;

Expand Down Expand Up @@ -733,6 +740,21 @@ uint64_t MCAPStorage::get_minimum_split_file_size() const

/** BaseWriteInterface **/
void MCAPStorage::write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg)
{
std::lock_guard<std::mutex> lock(mcap_storage_mutex_);
write_lock_free(msg);
}

void MCAPStorage::write(
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & msgs)
{
std::lock_guard<std::mutex> lock(mcap_storage_mutex_);
for (const auto & msg : msgs) {
write_lock_free(msg);
}
}

void MCAPStorage::write_lock_free(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg)
{
const auto topic_it = topics_.find(msg->topic_name);
if (topic_it == topics_.end()) {
Expand Down Expand Up @@ -773,17 +795,10 @@ void MCAPStorage::write(std::shared_ptr<const rosbag2_storage::SerializedBagMess
metadata_.duration = std::max(metadata_.duration, message_time - metadata_.starting_time);
}

void MCAPStorage::write(
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & msgs)
{
for (const auto & msg : msgs) {
write(msg);
}
}

void MCAPStorage::create_topic(const rosbag2_storage::TopicMetadata & topic,
const rosbag2_storage::MessageDefinition & message_definition)
{
std::lock_guard<std::mutex> lock(mcap_storage_mutex_);
auto topic_info = rosbag2_storage::TopicInformation{topic, 0};
const auto topic_it = topics_.find(topic.name);
if (topic_it == topics_.end()) {
Expand Down Expand Up @@ -830,6 +845,7 @@ void MCAPStorage::create_topic(const rosbag2_storage::TopicMetadata & topic,

void MCAPStorage::remove_topic(const rosbag2_storage::TopicMetadata & topic)
{
std::lock_guard<std::mutex> lock(mcap_storage_mutex_);
const auto topic_it = topics_.find(topic.name);
if (topic_it != topics_.end()) {
const auto & datatype = topic_it->second.topic_metadata.type;
Expand Down

0 comments on commit 90d1da8

Please sign in to comment.