diff --git a/rosbag2_storage_mcap/src/mcap_storage.cpp b/rosbag2_storage_mcap/src/mcap_storage.cpp index 962b426427..4fe6895b22 100644 --- a/rosbag2_storage_mcap/src/mcap_storage.cpp +++ b/rosbag2_storage_mcap/src/mcap_storage.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "rcpputils/thread_safety_annotations.hpp" #include "rcutils/logging_macros.h" #include "rosbag2_storage/metadata_io.hpp" #include "rosbag2_storage/ros_helper.hpp" @@ -40,6 +41,7 @@ #include #include +#include #include #include #include @@ -228,6 +230,7 @@ class ROSBAG2_STORAGE_MCAP_PUBLIC MCAPStorage private: void read_metadata(); + void write_lock_free(std::shared_ptr msg); void open_impl(const std::string & uri, const std::string & preset_profile, rosbag2_storage::storage_interfaces::IOFlag io_flag, const std::string & storage_config_uri); @@ -244,9 +247,13 @@ class ROSBAG2_STORAGE_MCAP_PUBLIC MCAPStorage std::shared_ptr next_; rosbag2_storage::BagMetadata metadata_{}; - std::unordered_map topics_; - std::unordered_map schema_ids_; // datatype -> schema_id - std::unordered_map channel_ids_; // topic -> channel_id + std::mutex mcap_storage_mutex_; + std::unordered_map topics_ + RCPPUTILS_TSA_GUARDED_BY(mcap_storage_mutex_); + std::unordered_map schema_ids_ + RCPPUTILS_TSA_GUARDED_BY(mcap_storage_mutex_); // datatype -> schema_id + std::unordered_map channel_ids_ + RCPPUTILS_TSA_GUARDED_BY(mcap_storage_mutex_); // topic -> channel_id rosbag2_storage::StorageFilter storage_filter_{}; mcap::ReadMessageOptions::ReadOrder read_order_ = mcap::ReadMessageOptions::ReadOrder::FileOrder; @@ -733,6 +740,21 @@ uint64_t MCAPStorage::get_minimum_split_file_size() const /** BaseWriteInterface **/ void MCAPStorage::write(std::shared_ptr msg) +{ + std::lock_guard lock(mcap_storage_mutex_); + write_lock_free(msg); +} + +void MCAPStorage::write( + const std::vector> & msgs) +{ + std::lock_guard lock(mcap_storage_mutex_); + for (const auto & msg : msgs) { + write_lock_free(msg); + } +} + +void MCAPStorage::write_lock_free(std::shared_ptr msg) { const auto topic_it = topics_.find(msg->topic_name); if (topic_it == topics_.end()) { @@ -773,17 +795,10 @@ void MCAPStorage::write(std::shared_ptr> & msgs) -{ - for (const auto & msg : msgs) { - write(msg); - } -} - void MCAPStorage::create_topic(const rosbag2_storage::TopicMetadata & topic, const rosbag2_storage::MessageDefinition & message_definition) { + std::lock_guard lock(mcap_storage_mutex_); auto topic_info = rosbag2_storage::TopicInformation{topic, 0}; const auto topic_it = topics_.find(topic.name); if (topic_it == topics_.end()) { @@ -830,6 +845,7 @@ void MCAPStorage::create_topic(const rosbag2_storage::TopicMetadata & topic, void MCAPStorage::remove_topic(const rosbag2_storage::TopicMetadata & topic) { + std::lock_guard lock(mcap_storage_mutex_); const auto topic_it = topics_.find(topic.name); if (topic_it != topics_.end()) { const auto & datatype = topic_it->second.topic_metadata.type;