Skip to content

Commit

Permalink
Add pause/resume to PlayerClock
Browse files Browse the repository at this point in the history
Expose it on Player class as well

Signed-off-by: Emerson Knapp <eknapp@amazon.com>
  • Loading branch information
Emerson Knapp committed Apr 8, 2021
1 parent fb7fc1c commit df78227
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 7 deletions.
20 changes: 20 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,26 @@ class PlayerClock
*/
ROSBAG2_CPP_PUBLIC
virtual double get_rate() const = 0;

/**
* Stop the flow of time of the clock.
* If this changes the pause state, this will wake any waiting `sleep_until`
*/
ROSBAG2_CPP_PUBLIC
virtual void pause() = 0;

/**
* Start the flow of time of the clock
* If this changes the pause state, this will wake any waiting `sleep_until`
*/
ROSBAG2_CPP_PUBLIC
virtual void resume() = 0;

/**
* Return whether the clock is currently paused.
*/
ROSBAG2_CPP_PUBLIC
virtual bool is_paused() const = 0;
};

} // namespace rosbag2_cpp
Expand Down
23 changes: 22 additions & 1 deletion rosbag2_cpp/include/rosbag2_cpp/clocks/time_controller_clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class TimeControllerClock : public PlayerClock
TimeControllerClock(
rcutils_time_point_value_t starting_time,
double rate = 1.0,
NowFunction now_fn = std::chrono::steady_clock::now);
NowFunction now_fn = std::chrono::steady_clock::now,
std::chrono::milliseconds sleep_time_while_paused = std::chrono::milliseconds{100});

ROSBAG2_CPP_PUBLIC
virtual ~TimeControllerClock();
Expand All @@ -73,6 +74,26 @@ class TimeControllerClock : public PlayerClock
ROSBAG2_CPP_PUBLIC
double get_rate() const override;

/**
* Stop the flow of time of the clock.
* If this changes the pause state, this will wake any waiting `sleep_until`
*/
ROSBAG2_CPP_PUBLIC
void pause() override;

/**
* Start the flow of time of the clock
* If this changes the pause state, this will wake any waiting `sleep_until`
*/
ROSBAG2_CPP_PUBLIC
void resume() override;

/**
* Return whether the clock is currently paused.
*/
ROSBAG2_CPP_PUBLIC
bool is_paused() const override;

private:
std::unique_ptr<TimeControllerClockImpl> impl_;
};
Expand Down
63 changes: 57 additions & 6 deletions rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ class TimeControllerClockImpl
std::chrono::steady_clock::time_point steady;
};

explicit TimeControllerClockImpl(PlayerClock::NowFunction now_fn)
: now_fn(now_fn)
explicit TimeControllerClockImpl(
PlayerClock::NowFunction now_fn, std::chrono::milliseconds sleep_time_while_paused)
: now_fn(now_fn),
sleep_time_while_paused(sleep_time_while_paused)
{}
virtual ~TimeControllerClockImpl() = default;

Expand All @@ -81,19 +83,29 @@ class TimeControllerClockImpl
return reference.steady + std::chrono::nanoseconds(diff_nanos);
}

void snapshot(rcutils_time_point_value_t ros_time)
RCPPUTILS_TSA_REQUIRES(state_mutex)
{
reference.ros = ros_time;
reference.steady = now_fn();
}

const PlayerClock::NowFunction now_fn;
const std::chrono::milliseconds sleep_time_while_paused;

std::mutex state_mutex;
std::condition_variable cv RCPPUTILS_TSA_GUARDED_BY(state_mutex);
double rate RCPPUTILS_TSA_GUARDED_BY(state_mutex) = 1.0;
bool paused RCPPUTILS_TSA_GUARDED_BY(state_mutex) = false;
TimeReference reference RCPPUTILS_TSA_GUARDED_BY(state_mutex);
};

TimeControllerClock::TimeControllerClock(
rcutils_time_point_value_t starting_time,
double rate,
NowFunction now_fn)
: impl_(std::make_unique<TimeControllerClockImpl>(now_fn))
NowFunction now_fn,
std::chrono::milliseconds sleep_time_while_paused)
: impl_(std::make_unique<TimeControllerClockImpl>(now_fn, sleep_time_while_paused))
{
std::lock_guard<std::mutex> lock(impl_->state_mutex);
impl_->reference.ros = starting_time;
Expand All @@ -114,8 +126,12 @@ bool TimeControllerClock::sleep_until(rcutils_time_point_value_t until)
{
{
TSAUniqueLock lock(impl_->state_mutex);
const auto steady_until = impl_->ros_to_steady(until);
impl_->cv.wait_until(lock, steady_until);
if (impl_->paused) {
impl_->cv.wait_for(lock, impl_->sleep_time_while_paused);
} else {
const auto steady_until = impl_->ros_to_steady(until);
impl_->cv.wait_until(lock, steady_until);
}
}
return now() >= until;
}
Expand All @@ -126,4 +142,39 @@ double TimeControllerClock::get_rate() const
return impl_->rate;
}

void TimeControllerClock::pause()
{
{
std::lock_guard<std::mutex> lock(impl_->state_mutex);
if (impl_->paused) {
return;
}
// Note: needs to not be paused when taking snapshot, otherwise it will use last ros ref
impl_->snapshot(now());
impl_->paused = true;
}
impl_->cv.notify_all();
}

void TimeControllerClock::resume()
{
{
std::lock_guard<std::mutex> lock(impl_->state_mutex);
if (!impl_->paused) {
return;
}
// Note: needs to not be paused when taking snapshot, otherwise it will use last ros ref
impl_->paused = false;
impl_->snapshot(now());
}
impl_->cv.notify_all();
}

bool TimeControllerClock::is_paused() const
{
std::lock_guard<std::mutex> lock(impl_->state_mutex);
return impl_->paused;
}


} // namespace rosbag2_cpp

0 comments on commit df78227

Please sign in to comment.