diff --git a/rosbag2_cpp/CMakeLists.txt b/rosbag2_cpp/CMakeLists.txt index 0a61a7e0ce..063e2bbc5c 100644 --- a/rosbag2_cpp/CMakeLists.txt +++ b/rosbag2_cpp/CMakeLists.txt @@ -55,7 +55,9 @@ find_package(rosidl_typesupport_introspection_cpp REQUIRED) add_library(${PROJECT_NAME} SHARED src/rosbag2_cpp/cache/cache_consumer.cpp src/rosbag2_cpp/cache/message_cache_buffer.cpp + src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp src/rosbag2_cpp/cache/message_cache.cpp + src/rosbag2_cpp/cache/circular_message_cache.cpp src/rosbag2_cpp/clocks/time_controller_clock.cpp src/rosbag2_cpp/converter.cpp src/rosbag2_cpp/info.cpp @@ -187,6 +189,12 @@ if(BUILD_TESTING) target_link_libraries(test_message_cache ${PROJECT_NAME}) endif() + ament_add_gmock(test_circular_message_cache + test/rosbag2_cpp/test_circular_message_cache.cpp) + if(TARGET test_circular_message_cache) + target_link_libraries(test_circular_message_cache ${PROJECT_NAME}) + endif() + # If compiling with gcc, run this test with sanitizers enabled ament_add_gmock(test_ros2_message diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/cache_buffer_interface.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/cache_buffer_interface.hpp new file mode 100644 index 0000000000..dd65c2aa25 --- /dev/null +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/cache_buffer_interface.hpp @@ -0,0 +1,74 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_CPP__CACHE__CACHE_BUFFER_INTERFACE_HPP_ +#define ROSBAG2_CPP__CACHE__CACHE_BUFFER_INTERFACE_HPP_ + +#include +#include + +#include "rosbag2_cpp/visibility_control.hpp" +#include "rosbag2_storage/serialized_bag_message.hpp" + +namespace rosbag2_cpp +{ +namespace cache +{ +/** +* This class provides the interface for all CacheBuffer implementations. The +* role of these buffers is store messages that are received via topic subscriptions +* before they are written to the bagfile. +* +* Any class that implements CacheBufferInterface is reponsible for handling +* synchronization that is needed for the buffer to operate properly. +*/ +class ROSBAG2_CPP_PUBLIC CacheBufferInterface +{ +public: + using buffer_element_t = std::shared_ptr; + virtual ~CacheBufferInterface() {} + + /** + * Pushes a SerializedBagMessage into the cache buffer. + * + * \param msg SerializedBagMessage to add to the buffer. + * \return whether message was successfully added to the cache. + */ + virtual bool push(buffer_element_t msg) = 0; + + /** + * Clears the buffer by removing all remaining messages. + */ + virtual void clear() = 0; + + /** + * Check the buffer message count. + * + * \return number of elements in the buffer. + */ + virtual size_t size() = 0; + + /** + * Get the data/messages stored in the buffer. This should only be + * called once no more messages are being added to the buffer. + * + * \return a vector containing messages in the buffer. + */ + virtual const std::vector & data() = 0; +}; + +} // namespace cache +} // namespace rosbag2_cpp + +#endif // ROSBAG2_CPP__CACHE__CACHE_BUFFER_INTERFACE_HPP_ diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp index 7a61485c15..9a0ebb6c82 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp @@ -22,7 +22,9 @@ #include #include +#include "rosbag2_cpp/cache/cache_buffer_interface.hpp" #include "rosbag2_cpp/cache/message_cache.hpp" +#include "rosbag2_cpp/cache/message_cache_interface.hpp" // This is necessary because of using stl types here. It is completely safe, because // a) the member is not accessible from the outside @@ -61,10 +63,10 @@ class ROSBAG2_CPP_PUBLIC CacheConsumer { public: using consume_callback_function_t = std::function &)>; + std::vector &)>; CacheConsumer( - std::shared_ptr message_cache, + std::shared_ptr message_cache, consume_callback_function_t consume_callback); ~CacheConsumer(); @@ -76,7 +78,7 @@ class ROSBAG2_CPP_PUBLIC CacheConsumer void change_consume_callback(consume_callback_function_t callback); private: - std::shared_ptr message_cache_; + std::shared_ptr message_cache_; consume_callback_function_t consume_callback_; /// Write buffer data to a storage diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp new file mode 100644 index 0000000000..97e3a03175 --- /dev/null +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp @@ -0,0 +1,76 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_CPP__CACHE__CIRCULAR_MESSAGE_CACHE_HPP_ +#define ROSBAG2_CPP__CACHE__CIRCULAR_MESSAGE_CACHE_HPP_ + +#include +#include +#include + +#include "rosbag2_cpp/cache/message_cache_circular_buffer.hpp" +#include "rosbag2_cpp/cache/message_cache_interface.hpp" +#include "rosbag2_cpp/cache/cache_buffer_interface.hpp" +#include "rosbag2_cpp/visibility_control.hpp" + +#include "rosbag2_storage/serialized_bag_message.hpp" + +// This is necessary because of using stl types here. It is completely safe, because +// a) the member is not accessible from the outside +// b) there are no inline functions. +#ifdef _WIN32 +# pragma warning(push) +# pragma warning(disable:4251) +#endif + +namespace rosbag2_cpp +{ +namespace cache +{ + +class ROSBAG2_CPP_PUBLIC CircularMessageCache + : public MessageCacheInterface +{ +public: + explicit CircularMessageCache(size_t max_buffer_size); + + /// Puts msg into circular buffer, replacing the oldest msg when buffer is full + void push(std::shared_ptr msg) override; + + /// get current buffer to consume + std::shared_ptr consumer_buffer() override; + + /// Swap the primary and secondary buffer before consumption. + /// NOTE: consumer_buffer() should be called sequentially after + /// swap_buffer() to ensure expected behavior. Calling swap_buffer() + /// while accessing consumer_buffer() will be undefined behavior. + void swap_buffers() override; + +private: + /// Double buffers + std::shared_ptr primary_buffer_; + std::shared_ptr secondary_buffer_; + + /// Double buffers sync + std::mutex cache_mutex_; +}; + +} // namespace cache +} // namespace rosbag2_cpp + +#ifdef _WIN32 +# pragma warning(pop) +#endif + +#endif // ROSBAG2_CPP__CACHE__CIRCULAR_MESSAGE_CACHE_HPP_ diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp index a24e15348a..92f8562057 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp @@ -24,6 +24,8 @@ #include #include "rosbag2_cpp/cache/message_cache_buffer.hpp" +#include "rosbag2_cpp/cache/message_cache_interface.hpp" +#include "rosbag2_cpp/cache/cache_buffer_interface.hpp" #include "rosbag2_cpp/visibility_control.hpp" #include "rosbag2_storage/serialized_bag_message.hpp" @@ -48,7 +50,7 @@ namespace cache * the consumer performance (which can be a bottleneck, e.g. disk writes). * * Two instances of MessageCacheBuffer are used, one for producer and one for -* the consumer. Buffers are switched through wait_for_buffer function, which +* the consumer. Buffers are switched through swap_buffers function, which * involves synchronization and a simple pointer switch. * * The cache can enter a flushing state, intended as a finalization state, @@ -60,26 +62,24 @@ namespace cache * performance issues, most likely with the CacheConsumer consumer callback. */ class ROSBAG2_CPP_PUBLIC MessageCache + : public MessageCacheInterface { public: - explicit MessageCache(uint64_t max_buffer_size); + explicit MessageCache(size_t max_buffer_size); ~MessageCache(); /// Puts msg into primary buffer. With full cache, msg is ignored and counted as lost - void push(std::shared_ptr msg); + void push(std::shared_ptr msg) override; /// Summarize dropped/remaining messages - void log_dropped(); - - /// Producer API: notify consumer to wake-up (primary buffer has data) - void notify_buffer_consumer(); + void log_dropped() override; /// Set the cache to consume-only mode for final buffer flush before closing - void finalize(); + void finalize() override; /// Notify that flushing is complete - void notify_flushing_done(); + void notify_flushing_done() override; /** * Consumer API: wait until primary buffer is ready and swap it with consumer buffer. @@ -88,15 +88,18 @@ class ROSBAG2_CPP_PUBLIC MessageCache * a) data was inserted into the producer buffer, consuming can continue after a swap * b) we are flushing the data (in case we missed the last notification when consuming) **/ - void wait_for_buffer(); + void swap_buffers() override; /// Consumer API: get current buffer to consume - std::shared_ptr consumer_buffer(); + std::shared_ptr consumer_buffer() override; /// Exposes counts of messages dropped per topic std::unordered_map messages_dropped() const; private: + /// Producer API: notify consumer to wake-up (primary buffer has data) + void notify_buffer_consumer(); + /// Double buffers std::shared_ptr primary_buffer_; std::shared_ptr secondary_buffer_; diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_buffer.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_buffer.hpp index 2fab5ed4a5..f9d8c07fc8 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_buffer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_buffer.hpp @@ -20,6 +20,7 @@ #include #include "rosbag2_cpp/visibility_control.hpp" +#include "rosbag2_cpp/cache/cache_buffer_interface.hpp" #include "rosbag2_storage/serialized_bag_message.hpp" // This is necessary because of using stl types here. It is completely safe, because @@ -45,32 +46,31 @@ namespace cache * ->byte_size() - like interface */ class ROSBAG2_CPP_PUBLIC MessageCacheBuffer + : public CacheBufferInterface { public: - explicit MessageCacheBuffer(const uint64_t max_cache_size); - - using buffer_element_t = std::shared_ptr; + explicit MessageCacheBuffer(size_t max_cache_size); /** * If buffer size got some space left, we push message regardless of its size, but if * this results in exceeding buffer size, we mark buffer to drop all new incoming messages. * This flag is cleared when buffers are swapped. */ - bool push(buffer_element_t msg); + bool push(CacheBufferInterface::buffer_element_t msg) override; /// Clear buffer - void clear(); + void clear() override; /// Get number of elements in the buffer - size_t size(); + size_t size() override; /// Get buffer data - const std::vector & data(); + const std::vector & data() override; private: - std::vector buffer_; - uint64_t buffer_bytes_size_ {0u}; - const uint64_t max_bytes_size_; + std::vector buffer_; + size_t buffer_bytes_size_ {0u}; + const size_t max_bytes_size_; /// set when buffer is full and should drop messages instead of inserting them std::atomic_bool drop_messages_ {false}; diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_circular_buffer.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_circular_buffer.hpp new file mode 100644 index 0000000000..bd6a022f2e --- /dev/null +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_circular_buffer.hpp @@ -0,0 +1,85 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_CPP__CACHE__MESSAGE_CACHE_CIRCULAR_BUFFER_HPP_ +#define ROSBAG2_CPP__CACHE__MESSAGE_CACHE_CIRCULAR_BUFFER_HPP_ + +#include +#include +#include + +#include "rosbag2_cpp/visibility_control.hpp" +#include "rosbag2_cpp/cache/cache_buffer_interface.hpp" +#include "rosbag2_storage/serialized_bag_message.hpp" + +// This is necessary because of using stl types here. It is completely safe, because +// a) the member is not accessible from the outside +// b) there are no inline functions. +#ifdef _WIN32 +# pragma warning(push) +# pragma warning(disable:4251) +#endif + +namespace rosbag2_cpp +{ +namespace cache +{ + +/** +* This class implements a circular buffer message cache. Since the buffer +* size is limited by total byte size of the storage messages rather than +* a fix number of messages, a deque is used instead of a vector since +* older messages can always be dropped from the front and new messages added +* to the end. The buffer will never consume more than max_cache_size bytes, +* and will log a warning message if an individual message exceeds the buffer +* size. +*/ +class ROSBAG2_CPP_PUBLIC MessageCacheCircularBuffer + : public CacheBufferInterface +{ +public: + // Delete default constructor since max_cache_size is required + MessageCacheCircularBuffer() = delete; + explicit MessageCacheCircularBuffer(size_t max_cache_size); + + /** + * If buffer size has some space left, we push the message regardless of its size, + * but if this results in exceeding buffer size, we begin dropping old messages. + */ + bool push(CacheBufferInterface::buffer_element_t msg) override; + + /// Clear buffer + void clear() override; + + /// Get number of elements in the buffer + size_t size() override; + + /// Get buffer data + const std::vector & data() override; + +private: + std::deque buffer_; + std::vector msg_vector_; + size_t buffer_bytes_size_ {0u}; + const size_t max_bytes_size_; +}; + +} // namespace cache +} // namespace rosbag2_cpp + +#ifdef _WIN32 +# pragma warning(pop) +#endif + +#endif // ROSBAG2_CPP__CACHE__MESSAGE_CACHE_CIRCULAR_BUFFER_HPP_ diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_interface.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_interface.hpp new file mode 100644 index 0000000000..1609e108c9 --- /dev/null +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_interface.hpp @@ -0,0 +1,83 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_CPP__CACHE__MESSAGE_CACHE_INTERFACE_HPP_ +#define ROSBAG2_CPP__CACHE__MESSAGE_CACHE_INTERFACE_HPP_ + +#include + +#include "rosbag2_cpp/visibility_control.hpp" +#include "rosbag2_cpp/cache/cache_buffer_interface.hpp" +#include "rosbag2_storage/serialized_bag_message.hpp" + +namespace rosbag2_cpp +{ +namespace cache +{ +/** +* This class provides the interface for all MessageCache implementations. +* The interface is designed around double buffered MessageCache implementations +* due to the inherent performance benefit that they provide. +* +* Any class that implements the MessageCacheInterface should make use of a class +* derived from the CacheBufferInterface as its buffer(s). Any synchronization that +* is required to manage more than one CacheBuffer should be handled in the class that +* implements this interface. +*/ +class ROSBAG2_CPP_PUBLIC MessageCacheInterface +{ +public: + virtual ~MessageCacheInterface() {} + + /** + * Push a pointer to a bag message into the primary buffer. + */ + virtual void push(std::shared_ptr msg) = 0; + + /** + * Get a pointer to the buffer that can be used for consuming the + * cached messages. In a double buffer implementation, this should + * always be called after swap_buffers(). + * + * \return a pointer to the consumer buffer interface. + */ + virtual std::shared_ptr consumer_buffer() = 0; + + /** + * Swap primary and secondary buffers when using a double buffer + * configuration. + */ + virtual void swap_buffers() = 0; + + /** + * Mark message cache as having finished flushing + * remaining messages to the bagfile. + */ + virtual void notify_flushing_done() {} + + /** + * Print a log message with details of any dropped messages. + */ + virtual void log_dropped() {} + + /** + * Perform any cleanup or final tasks before exitting. + */ + virtual void finalize() {} +}; + +} // namespace cache +} // namespace rosbag2_cpp + +#endif // ROSBAG2_CPP__CACHE__MESSAGE_CACHE_INTERFACE_HPP_ diff --git a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp index c3af24d2df..14b5602028 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp @@ -23,6 +23,7 @@ #include "rosbag2_cpp/cache/cache_consumer.hpp" #include "rosbag2_cpp/cache/message_cache.hpp" +#include "rosbag2_cpp/cache/message_cache_interface.hpp" #include "rosbag2_cpp/converter.hpp" #include "rosbag2_cpp/serialization_format_converter_factory.hpp" #include "rosbag2_cpp/writer_interfaces/base_writer_interface.hpp" @@ -115,7 +116,7 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter std::unique_ptr converter_; bool use_cache_; - std::shared_ptr message_cache_; + std::shared_ptr message_cache_; std::unique_ptr cache_consumer_; void switch_to_next_storage(); diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp index 63dfd14420..1632560b21 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp @@ -23,7 +23,7 @@ namespace cache { CacheConsumer::CacheConsumer( - std::shared_ptr message_cache, + std::shared_ptr message_cache, consume_callback_function_t consume_callback) : message_cache_(message_cache), consume_callback_(consume_callback) @@ -68,7 +68,7 @@ void CacheConsumer::exec_consuming() // Invariant at loop start: consumer buffer is empty // swap producer buffer with consumer buffer - message_cache_->wait_for_buffer(); + message_cache_->swap_buffers(); // make sure to use consistent callback for each iteration auto callback_for_this_loop = consume_callback_; diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp new file mode 100644 index 0000000000..725729aec5 --- /dev/null +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp @@ -0,0 +1,55 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include "rosbag2_cpp/cache/circular_message_cache.hpp" +#include "rosbag2_cpp/cache/message_cache_circular_buffer.hpp" +#include "rosbag2_cpp/cache/cache_buffer_interface.hpp" +#include "rosbag2_cpp/logging.hpp" + +namespace rosbag2_cpp +{ +namespace cache +{ + +CircularMessageCache::CircularMessageCache(size_t max_buffer_size) +{ + primary_buffer_ = std::make_shared(max_buffer_size); + secondary_buffer_ = std::make_shared(max_buffer_size); +} + +void CircularMessageCache::push(std::shared_ptr msg) +{ + std::lock_guard cache_lock(cache_mutex_); + primary_buffer_->push(msg); +} + +std::shared_ptr CircularMessageCache::consumer_buffer() +{ + return secondary_buffer_; +} + +void CircularMessageCache::swap_buffers() +{ + std::lock_guard cache_lock(cache_mutex_); + secondary_buffer_->clear(); + std::swap(primary_buffer_, secondary_buffer_); +} + +} // namespace cache +} // namespace rosbag2_cpp diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache.cpp index 44857644b5..388f87c8df 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache.cpp @@ -20,6 +20,7 @@ #include #include "rosbag2_cpp/cache/message_cache.hpp" +#include "rosbag2_cpp/cache/message_cache_interface.hpp" #include "rosbag2_cpp/logging.hpp" namespace rosbag2_cpp @@ -27,7 +28,7 @@ namespace rosbag2_cpp namespace cache { -MessageCache::MessageCache(uint64_t max_buffer_size) +MessageCache::MessageCache(size_t max_buffer_size) { primary_buffer_ = std::make_shared(max_buffer_size); secondary_buffer_ = std::make_shared(max_buffer_size); @@ -73,7 +74,7 @@ void MessageCache::notify_buffer_consumer() cache_condition_var_.notify_one(); } -void MessageCache::wait_for_buffer() +void MessageCache::swap_buffers() { std::unique_lock lock(cache_mutex_); if (!flushing_) { @@ -87,7 +88,7 @@ void MessageCache::wait_for_buffer() std::swap(primary_buffer_, secondary_buffer_); } -std::shared_ptr MessageCache::consumer_buffer() +std::shared_ptr MessageCache::consumer_buffer() { return secondary_buffer_; } diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_buffer.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_buffer.cpp index ace6d7fb62..46636d8a3f 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_buffer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_buffer.cpp @@ -15,6 +15,7 @@ #include #include +#include "rosbag2_cpp/cache/cache_buffer_interface.hpp" #include "rosbag2_cpp/cache/message_cache_buffer.hpp" namespace rosbag2_cpp @@ -22,12 +23,12 @@ namespace rosbag2_cpp namespace cache { -MessageCacheBuffer::MessageCacheBuffer(const uint64_t max_cache_size) +MessageCacheBuffer::MessageCacheBuffer(size_t max_cache_size) : max_bytes_size_(max_cache_size) { } -bool MessageCacheBuffer::push(buffer_element_t msg) +bool MessageCacheBuffer::push(CacheBufferInterface::buffer_element_t msg) { bool pushed = false; if (!drop_messages_) { @@ -54,7 +55,7 @@ size_t MessageCacheBuffer::size() return buffer_.size(); } -const std::vector & MessageCacheBuffer::data() +const std::vector & MessageCacheBuffer::data() { return buffer_; } diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp new file mode 100644 index 0000000000..8a1b212db6 --- /dev/null +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp @@ -0,0 +1,73 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include "rosbag2_cpp/logging.hpp" +#include "rosbag2_cpp/cache/cache_buffer_interface.hpp" +#include "rosbag2_cpp/cache/message_cache_circular_buffer.hpp" + +namespace rosbag2_cpp +{ +namespace cache +{ + +MessageCacheCircularBuffer::MessageCacheCircularBuffer(size_t max_cache_size) +: max_bytes_size_(max_cache_size) +{ +} + +bool MessageCacheCircularBuffer::push(CacheBufferInterface::buffer_element_t msg) +{ + // Drop message if it exceeds the buffer size + if (msg->serialized_data->buffer_length > max_bytes_size_) { + ROSBAG2_CPP_LOG_WARN_STREAM("Last message exceeds snapshot buffer size. Dropping message!"); + return false; + } + + // Remove any old items until there is room for new message + while (buffer_bytes_size_ > (max_bytes_size_ - msg->serialized_data->buffer_length)) { + buffer_bytes_size_ -= buffer_.front()->serialized_data->buffer_length; + buffer_.pop_front(); + } + // Add new message to end of buffer + buffer_bytes_size_ += msg->serialized_data->buffer_length; + buffer_.push_back(msg); + + return true; +} + +void MessageCacheCircularBuffer::clear() +{ + buffer_.clear(); + buffer_bytes_size_ = 0u; +} + +size_t MessageCacheCircularBuffer::size() +{ + return buffer_.size(); +} + +const std::vector & MessageCacheCircularBuffer::data() +{ + // Copy data to vector to maintain same interface as MessageCacheBuffer + msg_vector_ = std::vector( + buffer_.begin(), buffer_.end()); + return msg_vector_; +} + +} // namespace cache +} // namespace rosbag2_cpp diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_circular_message_cache.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_circular_message_cache.cpp new file mode 100644 index 0000000000..aa3f9b732c --- /dev/null +++ b/rosbag2_cpp/test/rosbag2_cpp/test_circular_message_cache.cpp @@ -0,0 +1,126 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include + +#include "rosbag2_storage/ros_helper.hpp" +#include "rosbag2_storage/serialized_bag_message.hpp" + +#include "rosbag2_cpp/cache/circular_message_cache.hpp" + +using namespace testing; // NOLINT + +namespace +{ +inline size_t abs_diff(size_t a, size_t b) +{ + return a < b ? b - a : a - b; +} + +std::shared_ptr make_test_msg() +{ + static uint32_t counter = 0; + std::string msg_content = "Hello" + std::to_string(counter++); + auto msg_length = msg_content.length(); + auto message = std::make_shared(); + message->topic_name = "test_topic"; + message->serialized_data = rosbag2_storage::make_serialized_message( + msg_content.c_str(), msg_length); + return message; +} + +std::string deserialize_message(std::shared_ptr serialized_message) +{ + std::unique_ptr copied(new uint8_t[serialized_message->buffer_length + 1]); + std::copy( + serialized_message->buffer, + serialized_message->buffer + serialized_message->buffer_length, + copied.get()); + copied.get()[serialized_message->buffer_length] = '\0'; + std::string message_content(reinterpret_cast(copied.get())); + return message_content; +} +} // namespace + +class CircularMessageCacheTest : public Test +{ +public: + CircularMessageCacheTest() {} + + virtual ~CircularMessageCacheTest() = default; + + const size_t cache_size_ {1 * 500}; // 500B cache +}; + +TEST_F(CircularMessageCacheTest, circular_message_cache_overwrites_old) { + const unsigned message_count = 100; + + auto circular_message_cache = std::make_shared( + cache_size_); + + for (unsigned i = 0; i < message_count; ++i) { + auto msg = make_test_msg(); + circular_message_cache->push(msg); + } + // Swap cache + circular_message_cache->swap_buffers(); + + auto consumer_buffer = circular_message_cache->consumer_buffer(); + auto message_vector = consumer_buffer->data(); + std::string first_message = deserialize_message(message_vector.front()->serialized_data); + + // Old messages should be dropped + EXPECT_THAT(first_message, StrNe("Hello0")); + + size_t message_data_size = 0; + + for (auto & msg : message_vector) { + message_data_size += msg->serialized_data->buffer_length; + } + + size_t cache_size_diff = abs_diff(cache_size_, message_data_size); + size_t allowed_diff{10}; + + // Actual stored data size should be roughly the desired cache size + EXPECT_THAT(cache_size_diff, Lt(allowed_diff)); +} + +TEST_F(CircularMessageCacheTest, circular_message_cache_ensure_empty) { + const unsigned message_count = 100; + + auto circular_message_cache = std::make_shared( + cache_size_); + + for (unsigned i = 0; i < message_count; ++i) { + auto msg = make_test_msg(); + circular_message_cache->push(msg); + } + // Swap filled cache to secondary + circular_message_cache->swap_buffers(); + EXPECT_THAT(circular_message_cache->consumer_buffer()->size(), Ne(0u)); + + // Swap back to primary (expected to empty buffer) + circular_message_cache->swap_buffers(); + + // Swap back to secondary without adding messages + circular_message_cache->swap_buffers(); + // Cache should have been emptied + EXPECT_THAT(circular_message_cache->consumer_buffer()->size(), Eq(0u)); +} diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_services.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_services.cpp index 6c0a0f22f5..d8c741359d 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_services.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_services.cpp @@ -173,7 +173,7 @@ class PlaySrvsTest : public RosBag2PlayTestFixture }); } - void topic_callback(const test_msgs::msg::BasicTypes::ConstSharedPtr /* msg */) + void topic_callback(std::shared_ptr/* msg */) { { std::lock_guard lk(got_msg_mutex_);