Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notification of significant events during bag recording and playback #908

Merged
merged 44 commits into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
4d6c39d
Simple hacky prototype
gbiggs Oct 15, 2021
1abc18f
Improved prototype with more flexibility and event information
gbiggs Oct 20, 2021
fef7111
Use the full relative file path for event info
gbiggs Nov 5, 2021
0426473
Remove no-longer required dependency
gbiggs Nov 9, 2021
0c69e02
Remove left-over merge
gbiggs Nov 9, 2021
2be81a1
Change event names
gbiggs Nov 9, 2021
27ea332
Add overrides to mocks
gbiggs Nov 10, 2021
94a5125
Add missing include
gbiggs Nov 11, 2021
344fa8a
Remove empty file
gbiggs Nov 11, 2021
141801f
Remove unnecessary virtual
gbiggs Nov 11, 2021
7986a31
Correct formatting
gbiggs Nov 11, 2021
81cf14e
Add missing doc
gbiggs Nov 22, 2021
23dfdf2
Reformat documentation
gbiggs Nov 22, 2021
db3f3ad
Change read event topic name
gbiggs Nov 22, 2021
ffd1740
Change read event topic name
gbiggs Nov 22, 2021
b2563f7
Simplify for loop
gbiggs Nov 22, 2021
d8e4117
Remove unnecessary blank line
gbiggs Nov 22, 2021
f23cb74
Change argument type to const
gbiggs Nov 22, 2021
9371fac
Add events handling to mocks
gbiggs Nov 22, 2021
8affe85
Add comment explaining 5-message split
gbiggs Nov 23, 2021
6d2a2dd
Add integration tests for player and recorder
gbiggs Nov 23, 2021
25338bc
Add ability to check if an event has callbacks
gbiggs Nov 23, 2021
e5249fb
Shift event topic publishing to a separate thread
gbiggs Nov 23, 2021
b8498ed
Fix formatting
gbiggs Nov 23, 2021
4b2e2d7
Test SequentialReader event callbacks
gbiggs Nov 23, 2021
93950c9
Test SequentialWriter event callbacks
gbiggs Nov 23, 2021
4b3ef80
Document bag events
gbiggs Dec 1, 2021
5a047fc
Check if joinable before joining
gbiggs Dec 6, 2021
2f49b7e
Correct spelling mistake
gbiggs Dec 6, 2021
e6c5990
Correct spelling mistake
gbiggs Dec 6, 2021
02a6cec
Reduce time wasted waiting for test to reach intended behaviour
gbiggs Dec 6, 2021
dc8ae99
Address test flakiness
gbiggs Dec 6, 2021
9e3ace8
Improve size check
gbiggs Dec 6, 2021
5d12203
Use constant for split size in tests
gbiggs Dec 6, 2021
47bd578
Add missing header
gbiggs Dec 6, 2021
df2b23a
Allocate thread on the stack
gbiggs Dec 7, 2021
83342db
Add getters for number of messages per 'file'
gbiggs Dec 7, 2021
d63e397
Replace magic number
gbiggs Dec 9, 2021
824cb7a
Address unstable OSX build
gbiggs Dec 9, 2021
44365e5
Make path in test cross-platform
gbiggs Dec 9, 2021
2eb9924
Add missing virtual destructor
gbiggs Dec 13, 2021
d1c37d6
Follow uncrustify suggestion
gbiggs Dec 21, 2021
b8bc753
Fix rebase errors
gbiggs Feb 21, 2022
d74ff65
Remove double include
gbiggs Jun 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 193 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/bag_events.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright 2021 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_CPP__BAG_EVENTS_HPP_
#define ROSBAG2_CPP__BAG_EVENTS_HPP_

#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "rclcpp/function_traits.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

namespace rosbag2_cpp
{

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would some comments in this file explaining what the events are and how to implement them be useful? I'm just thinking of future contributors to rosbag2 that may not have been privy to the discussions on this. Comments may help to reduce onboarding time and lessen potential confusion. Maybe.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! That file was completely devoid of comments. I've documented the entities in it in ad46aaa. Please take a look.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation is very thorough! Fantastic! Love it!

namespace bag_events
{

/**
* \brief The types of bag events available for registering callbacks.
*/
enum class BagEvent
{
/// The output bag file has been split, starting a new file.
WRITE_SPLIT,
/// Reading of the input bag file has gone over a split, opening the next file.
READ_SPLIT,
};

/**
* \brief The information structure passed to callbacks for the WRITE_SPLIT and READ_SPLIT events.
*/
struct BagSplitInfo
{
/// The URI of the file that was closed.
std::string closed_file;
/// The URI of the file that was opened.
std::string opened_file;
};

using BagSplitCallbackType = std::function<void (BagSplitInfo &)>;

/**
* \brief Use this structure to register callbacks with Writers.
*/
struct WriterEventCallbacks
{
/// The callback to call for the WRITE_SPLIT event.
BagSplitCallbackType write_split_callback;
};

/**
* \brief Use this structure to register callbacks with Readers.
*/
struct ReaderEventCallbacks
{
/// The callback to call for the READ_SPLIT event.
BagSplitCallbackType read_split_callback;
};

/**
* \brief Base class for event callbacks.
*
* This class should not be used directly.
*/
class BagEventCallbackBase
{
public:
using SharedPtr = std::shared_ptr<BagEventCallbackBase>;
using InfoPtr = std::shared_ptr<void>;

virtual ~BagEventCallbackBase()
{}

virtual void execute(InfoPtr & info) = 0;

virtual bool is_type(BagEvent event) const = 0;
};

/**
* \brief Templated class for storing an event callback.
*
* This class should not be used directly by users.
*/
template<typename EventCallbackT>
class BagEventCallback : public BagEventCallbackBase
{
public:
BagEventCallback(const EventCallbackT & callback, BagEvent event)
: callback_(callback),
event_(event)
{}

virtual ~BagEventCallback()
{}

void execute(InfoPtr & info) override
{
callback_(*std::static_pointer_cast<EventCallbackInfoT>(info));
}

bool is_type(BagEvent event) const override
{
return event == event_;
}

private:
using EventCallbackInfoT = typename std::remove_reference<typename
rclcpp::function_traits::function_traits<EventCallbackT>::template argument_type<0>>::type;

EventCallbackT callback_;
BagEvent event_;
};

/**
* \brief The class used to manage event callbacks registered with a Writer or Reader.
*
* Each implementation of the Writer and Reader interfaces should store one instance of this type.
* When new callbacks are registered, they should be passed to that instance using \ref
* add_event_callback. When an event occurs, the callbacks registered for it can be called by
* calling the \ref execute_callbacks method, passing in the event information.
*/
class EventCallbackManager
{
public:
/**
* \brief Add an event callback.
*
* \param callback The callback that should be called for the event.
* \param event The event, one of the values of the \ref BagEvent enumeration.
*/
template<typename EventCallbackT>
void add_event_callback(const EventCallbackT & callback, const BagEvent event)
{
auto cb = std::make_shared<BagEventCallback<EventCallbackT>>(callback, event);
callbacks_.push_back(cb);
}

/**
* \brief Check if a callback is registered for the given event.
*
* \param event The event, one of the values of the \ref BagEvent enumeration.
* \return True if a callback is registered for the event, false otherwise.
*/
bool has_callback_for_event(const BagEvent event) const
{
for (auto & cb : callbacks_) {
if (cb->is_type(event)) {
return true;
}
}
return false;
}

/**
* \brief Execute all callbacks registered for the given event.
*
* The provided information value is passed to each callback by copy.
*
* \param event The event, one of the values of the \ref BagEvent enumeration.
* \param info The information relevant to the event that has occurred. The type varies by event.
*/
void execute_callbacks(const BagEvent event, BagEventCallbackBase::InfoPtr info)
{
for (auto & cb : callbacks_) {
if (cb->is_type(event)) {
cb->execute(info);
}
}
}

private:
std::vector<BagEventCallbackBase::SharedPtr> callbacks_;
};

} // namespace bag_events

} // namespace rosbag2_cpp

#endif // ROSBAG2_CPP__BAG_EVENTS_HPP_
7 changes: 7 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"

#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/readers/sequential_reader.hpp"
#include "rosbag2_cpp/visibility_control.hpp"
Expand Down Expand Up @@ -179,6 +180,12 @@ class ROSBAG2_CPP_PUBLIC Reader final
return *reader_impl_;
}

/**
* \brief Add callbacks for events that may occur during bag reading.
* \param callbacks the structure containing the callback to add for each event.
*/
void add_event_callbacks(bag_events::ReaderEventCallbacks & callbacks);
emersonknapp marked this conversation as resolved.
Show resolved Hide resolved

private:
std::unique_ptr<reader_interfaces::BaseReaderInterface> reader_impl_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <vector>

#include "rcutils/types.h"
#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

Expand Down Expand Up @@ -57,6 +58,8 @@ class ROSBAG2_CPP_PUBLIC BaseReaderInterface
virtual void reset_filter() = 0;

virtual void seek(const rcutils_time_point_value_t & timestamp) = 0;

virtual void add_event_callbacks(const bag_events::ReaderEventCallbacks & callbacks) = 0;
};

} // namespace reader_interfaces
Expand Down
15 changes: 15 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <unordered_set>
#include <vector>

#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/converter.hpp"
#include "rosbag2_cpp/reader_interfaces/base_reader_interface.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory.hpp"
Expand Down Expand Up @@ -101,6 +102,18 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
*/
virtual std::string get_current_uri() const;

/**
* \brief Add callbacks for events that may occur during bag reading.
* \param callbacks the structure containing the callback to add for each event.
*/
void add_event_callbacks(const bag_events::ReaderEventCallbacks & callbacks) override;

/**
* \brief Check if a callback is registered for the given event.
* \return True if there is any callback registered for the event, false otherwise.
*/
bool has_callback_for_event(const bag_events::BagEvent event) const;

protected:
/**
* Opens the current file and sets up the filters in the new storage.
Expand Down Expand Up @@ -172,6 +185,8 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
private:
rosbag2_storage::StorageOptions storage_options_;
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory_{};

bag_events::EventCallbackManager callback_manager_;
};

} // namespace readers
Expand Down
7 changes: 7 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/time.hpp"

#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/visibility_control.hpp"
#include "rosbag2_cpp/writers/sequential_writer.hpp"
Expand Down Expand Up @@ -204,6 +205,12 @@ class ROSBAG2_CPP_PUBLIC Writer final
return *writer_impl_;
}

/*
* \brief Add callbacks for events that may occur during bag writing.
* \param callbacks the structure containing the callback to add for each event.
*/
void add_event_callbacks(bag_events::WriterEventCallbacks & callbacks);

private:
std::mutex writer_mutex_;
std::unique_ptr<rosbag2_cpp::writer_interfaces::BaseWriterInterface> writer_impl_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <memory>

#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

Expand Down Expand Up @@ -51,6 +52,8 @@ class ROSBAG2_CPP_PUBLIC BaseWriterInterface
* \returns true if snapshot is successful, false if snapshot fails or is not supported
*/
virtual bool take_snapshot() = 0;

virtual void add_event_callbacks(const bag_events::WriterEventCallbacks & callbacks) = 0;
};

} // namespace writer_interfaces
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <unordered_map>
#include <vector>

#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/cache/cache_consumer.hpp"
#include "rosbag2_cpp/cache/circular_message_cache.hpp"
#include "rosbag2_cpp/cache/message_cache.hpp"
Expand Down Expand Up @@ -114,6 +115,12 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
*/
bool take_snapshot() override;

/**
* \brief Add callbacks for events that may occur during bag writing.
* \param callbacks the structure containing the callback to add for each event.
*/
void add_event_callbacks(const bag_events::WriterEventCallbacks & callbacks) override;

protected:
std::string base_folder_;
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_;
Expand Down Expand Up @@ -164,6 +171,8 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
void write_messages(
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & messages);
bool is_first_message_ {true};

bag_events::EventCallbackManager callback_manager_;
};

} // namespace writers
Expand Down
5 changes: 5 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,9 @@ void Reader::seek(const rcutils_time_point_value_t & timestamp)
reader_impl_->seek(timestamp);
}

void Reader::add_event_callbacks(bag_events::ReaderEventCallbacks & callbacks)
{
reader_impl_->add_event_callbacks(callbacks);
}

} // namespace rosbag2_cpp
18 changes: 18 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,12 @@ void SequentialReader::load_current_file()
void SequentialReader::load_next_file()
{
assert(current_file_iterator_ != file_paths_.end());
auto info = std::make_shared<bag_events::BagSplitInfo>();
info->closed_file = get_current_file();
current_file_iterator_++;
info->opened_file = get_current_file();
load_current_file();
callback_manager_.execute_callbacks(bag_events::BagEvent::READ_SPLIT, info);
}

std::string SequentialReader::get_current_file() const
Expand Down Expand Up @@ -292,5 +296,19 @@ void SequentialReader::fill_topics_metadata()
}
}

void SequentialReader::add_event_callbacks(const bag_events::ReaderEventCallbacks & callbacks)
{
if (callbacks.read_split_callback) {
callback_manager_.add_event_callback(
callbacks.read_split_callback,
bag_events::BagEvent::READ_SPLIT);
}
}

bool SequentialReader::has_callback_for_event(const bag_events::BagEvent event) const
{
return callback_manager_.has_callback_for_event(event);
}

} // namespace readers
} // namespace rosbag2_cpp
5 changes: 5 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,9 @@ void Writer::write(
return write(serialized_bag_message, topic_name, type_name, rmw_get_serialization_format());
}

void Writer::add_event_callbacks(bag_events::WriterEventCallbacks & callbacks)
{
writer_impl_->add_event_callbacks(callbacks);
}

} // namespace rosbag2_cpp
Loading