Skip to content

Commit

Permalink
Updated RelayThreadMonitor error reporting
Browse files: browse the repository at this point in the history
Every thread that has missed its status-update deadline is now reported before
the process aborts. Each of these log messages includes the timestamp of that
thread's most recent update.
  • Loading branch information
mitza-oci committed Aug 20, 2024
1 parent 5609309 commit 0a3fee2
Showing 1 changed file with 48 additions and 39 deletions.
87 changes: 48 additions & 39 deletions tools/rtpsrelay/RelayThreadMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

#include <ace/Thread.h>

#include <cstdlib>
#include <vector>

using namespace OpenDDS::DCPS;
namespace RtpsRelay {

Expand Down Expand Up @@ -49,49 +52,55 @@ int RelayThreadMonitor::svc()

ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, -1);

int count = 0;

while (running_) {
for (auto count = 0; running_; count = (count + 1) % safety_factor) {
condition_.wait_until(MonotonicTimePoint::now() + thread_status_interval, thread_status_manager);
if (running_) {
OpenDDS::DCPS::InternalThreadBuiltinTopicDataSeq datas;
DDS::SampleInfoSeq infos;
const DDS::ReturnCode_t ret = thread_status_reader_->read(datas,
infos,
DDS::LENGTH_UNLIMITED,
DDS::ANY_SAMPLE_STATE,
DDS::ANY_VIEW_STATE,
DDS::ANY_INSTANCE_STATE);
if (ret != DDS::RETCODE_OK) {
ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RelayThreadMonitor::svc failed to read %C\n"), OpenDDS::DCPS::retcode_to_string(ret)));
continue;
if (!running_) {
break;
}
OpenDDS::DCPS::InternalThreadBuiltinTopicDataSeq datas;
DDS::SampleInfoSeq infos;
const DDS::ReturnCode_t ret = thread_status_reader_->read(datas,
infos,
DDS::LENGTH_UNLIMITED,
DDS::ANY_SAMPLE_STATE,
DDS::ANY_VIEW_STATE,
DDS::ANY_INSTANCE_STATE);
if (ret != DDS::RETCODE_OK) {
ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RelayThreadMonitor::svc failed to read %C\n", OpenDDS::DCPS::retcode_to_string(ret)));
continue;
}

const SystemTimePoint expire = SystemTimePoint::now() - thread_status_interval * safety_factor;
const auto log_all_threads = count == 0 && config_.log_thread_status();
std::vector<DDS::UInt32> late_thread_indexes;

for (CORBA::ULong idx = 0; idx != infos.length(); ++idx) {
if (infos[idx].valid_data) {
utilization_[datas[idx].thread_id.in()] = datas[idx].utilization;
} else if (infos[idx].instance_state != DDS::ALIVE_INSTANCE_STATE) {
utilization_.erase(datas[idx].thread_id.in());
}

const SystemTimePoint expire = SystemTimePoint::now() - thread_status_interval * safety_factor;

for (CORBA::ULong idx = 0; idx != infos.length(); ++idx) {
if (infos[idx].valid_data) {
utilization_[datas[idx].thread_id.in()] = datas[idx].utilization;
} else if (infos[idx].instance_state != DDS::ALIVE_INSTANCE_STATE) {
utilization_.erase(datas[idx].thread_id.in());
}

if (count == 0 && config_.log_thread_status()) {
ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) INFO: Thread Status %C %C\n"),
to_json(datas[idx]).c_str(),
to_json(infos[idx]).c_str()));
}

const SystemTimePoint timestamp(infos[idx].source_timestamp);
if (infos[idx].instance_state == DDS::ALIVE_INSTANCE_STATE && timestamp < expire) {
ACE_ERROR((LM_ERROR,
ACE_TEXT("(%P|%t) ERROR: RelayThreadMonitor::svc thread %C failed to update status. Aborting...\n"),
datas[idx].thread_id.in()));
abort();
}
if (log_all_threads) {
ACE_DEBUG((LM_INFO, "(%P|%t) INFO: Thread Status %C %C\n",
infos[idx].valid_data ? to_json(datas[idx]).c_str() : datas[idx].thread_id.in(),
to_json(infos[idx]).c_str()));
}

count = (count + 1) % safety_factor;
if (infos[idx].instance_state == DDS::ALIVE_INSTANCE_STATE && SystemTimePoint(infos[idx].source_timestamp) < expire) {
late_thread_indexes.push_back(idx);
}
}

for (const auto idx : late_thread_indexes) {
const SystemTimePoint timestamp(infos[idx].source_timestamp);
ACE_ERROR((LM_ERROR,
"(%P|%t) ERROR: RelayThreadMonitor::svc thread %C last update %#T. Aborting...\n",
datas[idx].thread_id.in(), &timestamp.value()));
}

if (!late_thread_indexes.empty()) {
std::abort();
}
}

Expand All @@ -116,7 +125,7 @@ void RelayThreadMonitor::on_data_available(DDS::DataReader_ptr /*reader*/)
}

if (ret != DDS::RETCODE_OK) {
ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RelayThreadMonitor::on_data_available failed to read %C\n"), OpenDDS::DCPS::retcode_to_string(ret)));
ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RelayThreadMonitor::on_data_available failed to read %C\n", OpenDDS::DCPS::retcode_to_string(ret)));
return;
}

Expand Down

0 comments on commit 0a3fee2

Please sign in to comment.