Skip to content

Commit

Permalink
Circular Message Cache implementation for snapshot feature (ros2#844)
Browse files Browse the repository at this point in the history
* Add circular message cache buffer implementation to be used for in-memory snapshot mode

Signed-off-by: Cameron Miller <cammlle@amazon.com>
Co-authored-by: Emerson Knapp <emerson.b.knapp@gmail.com>
  • Loading branch information
camm73 and emersonknapp authored Sep 13, 2021
1 parent ef40da9 commit 622b0ad
Show file tree
Hide file tree
Showing 16 changed files with 622 additions and 34 deletions.
8 changes: 8 additions & 0 deletions rosbag2_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ find_package(rosidl_typesupport_introspection_cpp REQUIRED)
add_library(${PROJECT_NAME} SHARED
src/rosbag2_cpp/cache/cache_consumer.cpp
src/rosbag2_cpp/cache/message_cache_buffer.cpp
src/rosbag2_cpp/cache/message_cache_circular_buffer.cpp
src/rosbag2_cpp/cache/message_cache.cpp
src/rosbag2_cpp/cache/circular_message_cache.cpp
src/rosbag2_cpp/clocks/time_controller_clock.cpp
src/rosbag2_cpp/converter.cpp
src/rosbag2_cpp/info.cpp
Expand Down Expand Up @@ -187,6 +189,12 @@ if(BUILD_TESTING)
target_link_libraries(test_message_cache ${PROJECT_NAME})
endif()

ament_add_gmock(test_circular_message_cache
test/rosbag2_cpp/test_circular_message_cache.cpp)
if(TARGET test_circular_message_cache)
target_link_libraries(test_circular_message_cache ${PROJECT_NAME})
endif()


# If compiling with gcc, run this test with sanitizers enabled
ament_add_gmock(test_ros2_message
Expand Down
74 changes: 74 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/cache/cache_buffer_interface.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_CPP__CACHE__CACHE_BUFFER_INTERFACE_HPP_
#define ROSBAG2_CPP__CACHE__CACHE_BUFFER_INTERFACE_HPP_

#include <memory>
#include <vector>

#include "rosbag2_cpp/visibility_control.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"

namespace rosbag2_cpp
{
namespace cache
{
/**
* This class provides the interface for all CacheBuffer implementations. The
* role of these buffers is store messages that are received via topic subscriptions
* before they are written to the bagfile.
*
* Any class that implements CacheBufferInterface is reponsible for handling
* synchronization that is needed for the buffer to operate properly.
*/
class ROSBAG2_CPP_PUBLIC CacheBufferInterface
{
public:
using buffer_element_t = std::shared_ptr<const rosbag2_storage::SerializedBagMessage>;
virtual ~CacheBufferInterface() {}

/**
* Pushes a SerializedBagMessage into the cache buffer.
*
* \param msg SerializedBagMessage to add to the buffer.
* \return whether message was successfully added to the cache.
*/
virtual bool push(buffer_element_t msg) = 0;

/**
* Clears the buffer by removing all remaining messages.
*/
virtual void clear() = 0;

/**
* Check the buffer message count.
*
* \return number of elements in the buffer.
*/
virtual size_t size() = 0;

/**
* Get the data/messages stored in the buffer. This should only be
* called once no more messages are being added to the buffer.
*
* \return a vector containing messages in the buffer.
*/
virtual const std::vector<buffer_element_t> & data() = 0;
};

} // namespace cache
} // namespace rosbag2_cpp

#endif // ROSBAG2_CPP__CACHE__CACHE_BUFFER_INTERFACE_HPP_
8 changes: 5 additions & 3 deletions rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
#include <thread>
#include <vector>

#include "rosbag2_cpp/cache/cache_buffer_interface.hpp"
#include "rosbag2_cpp/cache/message_cache.hpp"
#include "rosbag2_cpp/cache/message_cache_interface.hpp"

// This is necessary because of using stl types here. It is completely safe, because
// a) the member is not accessible from the outside
Expand Down Expand Up @@ -61,10 +63,10 @@ class ROSBAG2_CPP_PUBLIC CacheConsumer
{
public:
using consume_callback_function_t = std::function<void (const
std::vector<MessageCacheBuffer::buffer_element_t> &)>;
std::vector<CacheBufferInterface::buffer_element_t> &)>;

CacheConsumer(
std::shared_ptr<MessageCache> message_cache,
std::shared_ptr<MessageCacheInterface> message_cache,
consume_callback_function_t consume_callback);

~CacheConsumer();
Expand All @@ -76,7 +78,7 @@ class ROSBAG2_CPP_PUBLIC CacheConsumer
void change_consume_callback(consume_callback_function_t callback);

private:
std::shared_ptr<MessageCache> message_cache_;
std::shared_ptr<MessageCacheInterface> message_cache_;
consume_callback_function_t consume_callback_;

/// Write buffer data to a storage
Expand Down
76 changes: 76 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_CPP__CACHE__CIRCULAR_MESSAGE_CACHE_HPP_
#define ROSBAG2_CPP__CACHE__CIRCULAR_MESSAGE_CACHE_HPP_

#include <memory>
#include <mutex>
#include <string>

#include "rosbag2_cpp/cache/message_cache_circular_buffer.hpp"
#include "rosbag2_cpp/cache/message_cache_interface.hpp"
#include "rosbag2_cpp/cache/cache_buffer_interface.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

#include "rosbag2_storage/serialized_bag_message.hpp"

// This is necessary because of using stl types here. It is completely safe, because
// a) the member is not accessible from the outside
// b) there are no inline functions.
#ifdef _WIN32
# pragma warning(push)
# pragma warning(disable:4251)
#endif

namespace rosbag2_cpp
{
namespace cache
{

class ROSBAG2_CPP_PUBLIC CircularMessageCache
: public MessageCacheInterface
{
public:
explicit CircularMessageCache(size_t max_buffer_size);

/// Puts msg into circular buffer, replacing the oldest msg when buffer is full
void push(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg) override;

/// get current buffer to consume
std::shared_ptr<CacheBufferInterface> consumer_buffer() override;

/// Swap the primary and secondary buffer before consumption.
/// NOTE: consumer_buffer() should be called sequentially after
/// swap_buffer() to ensure expected behavior. Calling swap_buffer()
/// while accessing consumer_buffer() will be undefined behavior.
void swap_buffers() override;

private:
/// Double buffers
std::shared_ptr<MessageCacheCircularBuffer> primary_buffer_;
std::shared_ptr<MessageCacheCircularBuffer> secondary_buffer_;

/// Double buffers sync
std::mutex cache_mutex_;
};

} // namespace cache
} // namespace rosbag2_cpp

#ifdef _WIN32
# pragma warning(pop)
#endif

#endif // ROSBAG2_CPP__CACHE__CIRCULAR_MESSAGE_CACHE_HPP_
25 changes: 14 additions & 11 deletions rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <vector>

#include "rosbag2_cpp/cache/message_cache_buffer.hpp"
#include "rosbag2_cpp/cache/message_cache_interface.hpp"
#include "rosbag2_cpp/cache/cache_buffer_interface.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

#include "rosbag2_storage/serialized_bag_message.hpp"
Expand All @@ -48,7 +50,7 @@ namespace cache
* the consumer performance (which can be a bottleneck, e.g. disk writes).
*
* Two instances of MessageCacheBuffer are used, one for producer and one for
* the consumer. Buffers are switched through wait_for_buffer function, which
* the consumer. Buffers are switched through swap_buffers function, which
* involves synchronization and a simple pointer switch.
*
* The cache can enter a flushing state, intended as a finalization state,
Expand All @@ -60,26 +62,24 @@ namespace cache
* performance issues, most likely with the CacheConsumer consumer callback.
*/
class ROSBAG2_CPP_PUBLIC MessageCache
: public MessageCacheInterface
{
public:
explicit MessageCache(uint64_t max_buffer_size);
explicit MessageCache(size_t max_buffer_size);

~MessageCache();

/// Puts msg into primary buffer. With full cache, msg is ignored and counted as lost
void push(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg);
void push(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg) override;

/// Summarize dropped/remaining messages
void log_dropped();

/// Producer API: notify consumer to wake-up (primary buffer has data)
void notify_buffer_consumer();
void log_dropped() override;

/// Set the cache to consume-only mode for final buffer flush before closing
void finalize();
void finalize() override;

/// Notify that flushing is complete
void notify_flushing_done();
void notify_flushing_done() override;

/**
* Consumer API: wait until primary buffer is ready and swap it with consumer buffer.
Expand All @@ -88,15 +88,18 @@ class ROSBAG2_CPP_PUBLIC MessageCache
* a) data was inserted into the producer buffer, consuming can continue after a swap
* b) we are flushing the data (in case we missed the last notification when consuming)
**/
void wait_for_buffer();
void swap_buffers() override;

/// Consumer API: get current buffer to consume
std::shared_ptr<MessageCacheBuffer> consumer_buffer();
std::shared_ptr<CacheBufferInterface> consumer_buffer() override;

/// Exposes counts of messages dropped per topic
std::unordered_map<std::string, uint32_t> messages_dropped() const;

private:
/// Producer API: notify consumer to wake-up (primary buffer has data)
void notify_buffer_consumer();

/// Double buffers
std::shared_ptr<MessageCacheBuffer> primary_buffer_;
std::shared_ptr<MessageCacheBuffer> secondary_buffer_;
Expand Down
20 changes: 10 additions & 10 deletions rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <vector>

#include "rosbag2_cpp/visibility_control.hpp"
#include "rosbag2_cpp/cache/cache_buffer_interface.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"

// This is necessary because of using stl types here. It is completely safe, because
Expand All @@ -45,32 +46,31 @@ namespace cache
* ->byte_size() - like interface
*/
class ROSBAG2_CPP_PUBLIC MessageCacheBuffer
: public CacheBufferInterface
{
public:
explicit MessageCacheBuffer(const uint64_t max_cache_size);

using buffer_element_t = std::shared_ptr<const rosbag2_storage::SerializedBagMessage>;
explicit MessageCacheBuffer(size_t max_cache_size);

/**
* If buffer size got some space left, we push message regardless of its size, but if
* this results in exceeding buffer size, we mark buffer to drop all new incoming messages.
* This flag is cleared when buffers are swapped.
*/
bool push(buffer_element_t msg);
bool push(CacheBufferInterface::buffer_element_t msg) override;

/// Clear buffer
void clear();
void clear() override;

/// Get number of elements in the buffer
size_t size();
size_t size() override;

/// Get buffer data
const std::vector<buffer_element_t> & data();
const std::vector<CacheBufferInterface::buffer_element_t> & data() override;

private:
std::vector<buffer_element_t> buffer_;
uint64_t buffer_bytes_size_ {0u};
const uint64_t max_bytes_size_;
std::vector<CacheBufferInterface::buffer_element_t> buffer_;
size_t buffer_bytes_size_ {0u};
const size_t max_bytes_size_;

/// set when buffer is full and should drop messages instead of inserting them
std::atomic_bool drop_messages_ {false};
Expand Down
Loading

0 comments on commit 622b0ad

Please sign in to comment.