Skip to content

Commit

Permalink
Improve resilience against clock adjustments (eProsima#5018)
Browse files Browse the repository at this point in the history
* Use steady_clock instead of high_resolution_clock for status checks (high_resolution_clock might not be steady depending on STL impl)

Signed-off-by: Matthias Schneider <ma30002000@yahoo.de>

* Use steady_clock instead for system_clock for calculating timeouts

Signed-off-by: Matthias Schneider <ma30002000@yahoo.de>

* Use correct clock's duration for duration_cast

Signed-off-by: Matthias Schneider <ma30002000@yahoo.de>

* Use Time_t::now()

Signed-off-by: Matthias Schneider <ma30002000@yahoo.de>

* Fix build.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21314. Refactor on DataWriterImpl.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21314. Refactor on DataReaderImpl.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21314. Refactor on StatefulWriter.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Refs #21314. Protect current_time_since_unix_epoch against clock adjustments.

Signed-off-by: Miguel Company <miguelcompany@eprosima.com>

* Revert "Use steady_clock instead of high_resolution_clock for status checks (high_resolution_clock might not be steady depending on STL impl)"

This reverts commit d69eb91.

---------

Signed-off-by: Matthias Schneider <ma30002000@yahoo.de>
Signed-off-by: Miguel Company <miguelcompany@eprosima.com>
Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
Signed-off-by: paxifaer <807128216@qq.com>
  • Loading branch information
2 people authored and paxifaer committed Sep 7, 2024
1 parent 30328eb commit 28a3371
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 95 deletions.
9 changes: 1 addition & 8 deletions src/cpp/fastdds/domain/DomainParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1252,14 +1252,7 @@ bool DomainParticipantImpl::contains_entity(
ReturnCode_t DomainParticipantImpl::get_current_time(
fastdds::dds::Time_t& current_time) const
{
auto now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(duration);
duration -= seconds;
auto nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(duration);

current_time.seconds = static_cast<int32_t>(seconds.count());
current_time.nanosec = static_cast<uint32_t>(nanos.count());
fastdds::dds::Time_t::now(current_time);

return RETCODE_OK;
}
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/fastdds/domain/DomainParticipantImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ class DomainParticipantImpl
DomainParticipantListener* listener,
const std::chrono::seconds timeout = std::chrono::seconds::max())
{
auto time_out = std::chrono::time_point<std::chrono::system_clock>::max();
auto time_out = std::chrono::time_point<std::chrono::steady_clock>::max();
if (timeout < std::chrono::seconds::max())
{
auto now = std::chrono::system_clock::now();
auto now = std::chrono::steady_clock::now();
time_out = now + timeout;
}

Expand Down
34 changes: 10 additions & 24 deletions src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include <fastdds/publisher/filtering/DataWriterFilteredChangePool.hpp>
#include <fastdds/publisher/PublisherImpl.hpp>
#include <fastdds/rtps/builtin/data/TopicDescription.hpp>
#include <fastdds/rtps/common/Time_t.hpp>
#include <fastdds/rtps/participant/RTPSParticipant.hpp>
#include <fastdds/rtps/RTPSDomain.hpp>
#include <fastdds/rtps/writer/RTPSWriter.hpp>
Expand Down Expand Up @@ -1076,7 +1077,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
{
if (!history_->set_next_deadline(
handle,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
{
EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set the next deadline in the history");
}
Expand Down Expand Up @@ -1547,7 +1548,7 @@ bool DataWriterImpl::deadline_missed()

if (!history_->set_next_deadline(
timer_owner_,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
{
EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set the next deadline in the history");
return false;
Expand Down Expand Up @@ -1597,39 +1598,24 @@ bool DataWriterImpl::lifespan_expired()
{
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());

fastdds::rtps::Time_t current_ts;
fastdds::rtps::Time_t::now(current_ts);

CacheChange_t* earliest_change;
while (history_->get_earliest_change(&earliest_change))
{
auto source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
auto now = system_clock::now();
fastdds::rtps::Time_t expiration_ts = earliest_change->sourceTimestamp + qos_.lifespan().duration;

// Check that the earliest change has expired (the change which started the timer could have been removed from the history)
if (now - source_timestamp < lifespan_duration_us_)
if (current_ts < expiration_ts)
{
auto interval = source_timestamp - now + lifespan_duration_us_;
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
fastdds::rtps::Time_t interval = expiration_ts - current_ts;
lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6);
return true;
}

// The earliest change has expired
history_->remove_change_pub(earliest_change);

// Set the timer for the next change if there is one
if (!history_->get_earliest_change(&earliest_change))
{
return false;
}

// Calculate when the next change is due to expire and restart
source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
now = system_clock::now();
auto interval = source_timestamp - now + lifespan_duration_us_;

if (interval.count() > 0)
{
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
return true;
}
}

return false;
Expand Down
46 changes: 16 additions & 30 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <fastdds/dds/topic/TypeSupport.hpp>
#include <fastdds/domain/DomainParticipantImpl.hpp>
#include <fastdds/rtps/builtin/data/TopicDescription.hpp>
#include <fastdds/rtps/common/Time_t.hpp>
#include <fastdds/rtps/participant/RTPSParticipant.hpp>
#include <fastdds/rtps/reader/RTPSReader.hpp>
#include <fastdds/rtps/RTPSDomain.hpp>
Expand Down Expand Up @@ -1115,7 +1116,7 @@ bool DataReaderImpl::on_new_cache_change_added(
{
if (!history_.set_next_deadline(
change->instanceHandle,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_)))
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
{
EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history");
}
Expand All @@ -1134,12 +1135,13 @@ bool DataReaderImpl::on_new_cache_change_added(
return true;
}

auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
fastdds::rtps::Time_t expiration_ts = change->sourceTimestamp + qos_.lifespan().duration;
fastdds::rtps::Time_t current_ts;
fastdds::rtps::Time_t::now(current_ts);

// The new change could have expired if it arrived too late
// If so, remove it from the history and return false to avoid notifying the listener
if (now - source_timestamp >= lifespan_duration_us_)
if (expiration_ts < current_ts)
{
history_.remove_change_sub(new_change);
return false;
Expand All @@ -1161,11 +1163,10 @@ bool DataReaderImpl::on_new_cache_change_added(
EPROSIMA_LOG_ERROR(SUBSCRIBER, "A change was added to history that could not be retrieved");
}

auto interval = source_timestamp - now + duration_cast<nanoseconds>(lifespan_duration_us_);

// Update and restart the timer
// If the timer is already running this will not have any effect
lifespan_timer_->update_interval_millisec(interval.count() * 1e-6);
fastdds::rtps::Time_t interval = expiration_ts - current_ts;
lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6);
lifespan_timer_->restart_timer();
return true;
}
Expand Down Expand Up @@ -1253,7 +1254,7 @@ bool DataReaderImpl::deadline_missed()

if (!history_.set_next_deadline(
timer_owner_,
steady_clock::now() + duration_cast<system_clock::duration>(deadline_duration_us_), true))
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_), true))
{
EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history");
return false;
Expand Down Expand Up @@ -1284,41 +1285,26 @@ bool DataReaderImpl::lifespan_expired()
{
std::unique_lock<RecursiveTimedMutex> lock(reader_->getMutex());

fastdds::rtps::Time_t current_ts;
fastdds::rtps::Time_t::now(current_ts);

CacheChange_t* earliest_change;
while (history_.get_earliest_change(&earliest_change))
{
auto source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
auto now = system_clock::now();
fastdds::rtps::Time_t expiration_ts = earliest_change->sourceTimestamp + qos_.lifespan().duration;

// Check that the earliest change has expired (the change which started the timer could have been removed from the history)
if (now - source_timestamp < lifespan_duration_us_)
if (current_ts < expiration_ts)
{
auto interval = source_timestamp - now + lifespan_duration_us_;
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
fastdds::rtps::Time_t interval = expiration_ts - current_ts;
lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6);
return true;
}

// The earliest change has expired
history_.remove_change_sub(earliest_change);

try_notify_read_conditions();

// Set the timer for the next change if there is one
if (!history_.get_earliest_change(&earliest_change))
{
return false;
}

// Calculate when the next change is due to expire and restart
source_timestamp = system_clock::time_point() + nanoseconds(earliest_change->sourceTimestamp.to_ns());
now = system_clock::now();
auto interval = source_timestamp - now + lifespan_duration_us_;

if (interval.count() > 0)
{
lifespan_timer_->update_interval_millisec(static_cast<double>(duration_cast<milliseconds>(interval).count()));
return true;
}
}

return false;
Expand Down
48 changes: 24 additions & 24 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ StatefulWriter::StatefulWriter(
, may_remove_change_(0)
, disable_heartbeat_piggyback_(att.disable_heartbeat_piggyback)
, disable_positive_acks_(att.disable_positive_acks)
, keep_duration_us_(att.keep_duration.to_ns() * 1e-3)
, keep_duration_(att.keep_duration)
, last_sequence_number_()
, biggest_removed_sequence_number_()
, sendBufferSize_(pimpl->get_min_network_send_buffer_size())
Expand Down Expand Up @@ -370,12 +370,13 @@ void StatefulWriter::unsent_change_added_to_history(

if (disable_positive_acks_)
{
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
auto interval = source_timestamp - now + keep_duration_us_;
assert(interval.count() >= 0);
Time_t expiration_ts = change->sourceTimestamp + keep_duration_;
Time_t current_ts;
Time_t::now(current_ts);
assert(expiration_ts >= current_ts);
auto interval = (expiration_ts - current_ts).to_duration_t();

ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
ack_event_->update_interval(interval);
ack_event_->restart_timer(max_blocking_time);
}

Expand Down Expand Up @@ -890,12 +891,13 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network(
if ( !(ack_event_->getRemainingTimeMilliSec() > 0))
{
// Restart ack_timer
auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
auto interval = source_timestamp - now + keep_duration_us_;
assert(interval.count() >= 0);
Time_t expiration_ts = change->sourceTimestamp + keep_duration_;
Time_t current_ts;
Time_t::now(current_ts);
assert(expiration_ts >= current_ts);
auto interval = (expiration_ts - current_ts).to_duration_t();

ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
ack_event_->update_interval(interval);
ack_event_->restart_timer(max_blocking_time);
}
}
Expand Down Expand Up @@ -1606,13 +1608,9 @@ void StatefulWriter::update_positive_acks_times(
const WriterAttributes& att)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
if (keep_duration_us_.count() != (att.keep_duration.to_ns() * 1e-3))
{
// Implicit conversion to microseconds
keep_duration_us_ = std::chrono::nanoseconds {att.keep_duration.to_ns()};
}
keep_duration_ = att.keep_duration;
// Restart ack timer with new duration
ack_event_->update_interval_millisec(keep_duration_us_.count() * 1e-3);
ack_event_->update_interval(keep_duration_);
ack_event_->restart_timer();
}

Expand Down Expand Up @@ -2025,14 +2023,16 @@ bool StatefulWriter::ack_timer_expired()
// The timer has expired so the earliest non-acked change must be marked as acknowledged
// This will be done in the first while iteration, as we start with a negative interval

auto interval = -keep_duration_us_;
Time_t expiration_ts;
Time_t current_ts;
Time_t::now(current_ts);

// On the other hand, we've seen in the tests that if samples are sent very quickly with little
// time between consecutive samples, the timer interval could end up being negative
// In this case, we keep marking changes as acknowledged until the timer is able to keep up, hence the while
// loop

while (interval.count() < 0)
do
{
bool acks_flag = false;
for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
Expand Down Expand Up @@ -2071,13 +2071,13 @@ bool StatefulWriter::ack_timer_expired()
return false;
}

auto source_timestamp = system_clock::time_point() + nanoseconds(change->sourceTimestamp.to_ns());
auto now = system_clock::now();
interval = source_timestamp - now + keep_duration_us_;
Time_t::now(current_ts);
expiration_ts = change->sourceTimestamp + keep_duration_;
}
assert(interval.count() >= 0);
while (expiration_ts < current_ts);

ack_event_->update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
auto interval = (expiration_ts - current_ts).to_duration_t();
ack_event_->update_interval(interval);
return true;
}

Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/writer/StatefulWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,8 @@ class StatefulWriter : public BaseWriter
bool disable_heartbeat_piggyback_;
/// True to disable positive ACKs
bool disable_positive_acks_;
/// Keep duration for disable positive ACKs QoS, in microseconds
std::chrono::duration<double, std::ratio<1, 1000000>> keep_duration_us_;
/// Keep duration for disable positive ACKs QoS
fastdds::dds::Duration_t keep_duration_;
/// Last acknowledged cache change (only used if using disable positive ACKs QoS)
SequenceNumber_t last_sequence_number_;
/// Biggest sequence number removed from history
Expand Down
8 changes: 4 additions & 4 deletions src/cpp/utils/SystemInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ bool SystemInfo::wait_for_file_closure(
const std::string& filename,
const std::chrono::seconds timeout)
{
auto start = std::chrono::system_clock::now();
auto start = std::chrono::steady_clock::now();

#ifdef _MSC_VER
std::ofstream os;
Expand All @@ -174,7 +174,7 @@ bool SystemInfo::wait_for_file_closure(
os.open(filename, std::ios::out | std::ios::app, _SH_DENYWR);
if (!os.is_open()
// If the file is lock-opened in an external editor do not hang
&& (std::chrono::system_clock::now() - start) < timeout )
&& (std::chrono::steady_clock::now() - start) < timeout )
{
std::this_thread::yield();
}
Expand All @@ -189,7 +189,7 @@ bool SystemInfo::wait_for_file_closure(

while (flock(fd, LOCK_EX | LOCK_NB)
// If the file is lock-opened in an external editor do not hang
&& (std::chrono::system_clock::now() - start) < timeout )
&& (std::chrono::steady_clock::now() - start) < timeout )
{
std::this_thread::yield();
}
Expand All @@ -204,7 +204,7 @@ bool SystemInfo::wait_for_file_closure(
(void)filename;
#endif // ifdef _MSC_VER

return std::chrono::system_clock::now() - start < timeout;
return std::chrono::steady_clock::now() - start < timeout;
}

fastdds::dds::ReturnCode_t SystemInfo::set_environment_file()
Expand Down
6 changes: 5 additions & 1 deletion src/cpp/utils/time_t_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@ static void current_time_since_unix_epoch(
{
using namespace std::chrono;

static const auto init_time_since_epoch = system_clock::now().time_since_epoch();
static const auto init_steady_time = steady_clock::now();

// Get time since epoch
auto t_since_epoch = system_clock::now().time_since_epoch();
auto t_elapsed = steady_clock::now() - init_steady_time;
auto t_since_epoch = init_time_since_epoch + t_elapsed;
// Get seconds
auto secs_t = duration_cast<seconds>(t_since_epoch);
// Remove seconds from time
Expand Down

0 comments on commit 28a3371

Please sign in to comment.