Skip to content

Commit

Permalink
Publish the ParticipantLocation BIT before participant discovery comp…
Browse files Browse the repository at this point in the history
…letes

Applications can use ParticipantLocation to get notified that discovery
is in progress.  The spec-defined Participant BIT won't be published
until participant discovery is complete.
  • Loading branch information
mitza-oci committed Jun 13, 2024
1 parent c1418e1 commit 932b38f
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 197 deletions.
8 changes: 0 additions & 8 deletions dds/DCPS/RTPS/Spdp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -519,14 +519,6 @@ void Spdp::process_location_updates_i(const DiscoveredParticipantIter& iter, con
return;
}

if (iter->second.bit_ih_ == DDS::HANDLE_NIL) {
// Do not process updates until the participant exists in the built-in topics.
if (DCPS::log_bits) {
ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: Spdp::process_location_updates_i: %@ %C does not exist in participant bit, returning\n", this, LogGuid(iter->first).c_str()));
}
return;
}

DiscoveredParticipant::LocationUpdateList location_updates;
std::swap(iter->second.location_updates_, location_updates);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
* See: http://www.opendds.org/license.html
*/

#include "ParticipantLocationListenerImpl.h"
#include <dds/OpenddsDcpsExtTypeSupportImpl.h>
#include "BitListener.h"

#include <ace/streams.h>

#include <string>

// Implementation skeleton constructor
ParticipantLocationListenerImpl::ParticipantLocationListenerImpl(const std::string& id,
bool noice,
bool ipv6,
callback_t done_callback)
BitListener::BitListener(const std::string& id,
bool noice,
bool ipv6,
callback_t done_callback)
: id_(id)
, no_ice_(noice)
, ipv6_(ipv6)
Expand All @@ -23,35 +23,43 @@ ParticipantLocationListenerImpl::ParticipantLocationListenerImpl(const std::stri
{
}

// Implementation skeleton destructor
ParticipantLocationListenerImpl::~ParticipantLocationListenerImpl()
BitListener::~BitListener()
{
}

void ParticipantLocationListenerImpl::on_data_available(DDS::DataReader_ptr reader)
void BitListener::on_data_available(DDS::DataReader_ptr reader)
{
ACE_Guard<ACE_Thread_Mutex> g(mutex_);

// 1. Narrow the DataReader to an ParticipantLocationBuiltinTopicDataDataReader
// 2. Read the samples from the data reader
// 3. Print out the contents of the samples
OpenDDS::DCPS::ParticipantLocationBuiltinTopicDataDataReader_var builtin_dr =
OpenDDS::DCPS::ParticipantLocationBuiltinTopicDataDataReader::_narrow(reader);
if (0 == builtin_dr)
{
std::cerr << "ParticipantLocationListenerImpl::"
<< "on_data_available: _narrow failed." << std::endl;
ACE_OS::exit(1);
}

if (builtin_dr) {
on_data_available_i(builtin_dr);
return;
}

DDS::ParticipantBuiltinTopicDataDataReader_var participant_dr =
DDS::ParticipantBuiltinTopicDataDataReader::_narrow(reader);

if (participant_dr) {
on_data_available_i(participant_dr);
} else {
std::cerr << "BitListener::"
<< "on_data_available: _narrow failed." << std::endl;
ACE_OS::exit(1);
}
}

void BitListener::on_data_available_i(OpenDDS::DCPS::ParticipantLocationBuiltinTopicDataDataReader_var builtin_dr)
{
OpenDDS::DCPS::ParticipantLocationBuiltinTopicData participant;
DDS::SampleInfo si;

for (DDS::ReturnCode_t status = builtin_dr->read_next_sample(participant, si);
status == DDS::RETCODE_OK;
status = builtin_dr->read_next_sample(participant, si)) {

// copy octet[] to guid
OpenDDS::DCPS::GUID_t guid;
std::memcpy(&guid, &participant.guid, sizeof(guid));

Expand Down Expand Up @@ -90,8 +98,7 @@ void ParticipantLocationListenerImpl::on_data_available(DDS::DataReader_ptr read
<< " lease: " << OpenDDS::DCPS::TimeDuration(participant.lease_duration).str(9) << std::endl;

// update locations if SampleInfo is valid.
if (si.valid_data == 1)
{
if (si.valid_data) {
std::pair<LocationMapType::iterator, bool> p = location_map.insert(std::make_pair(guid, 0));
p.first->second |= participant.location;
}
Expand All @@ -104,55 +111,72 @@ void ParticipantLocationListenerImpl::on_data_available(DDS::DataReader_ptr read
}
}

void ParticipantLocationListenerImpl::on_requested_deadline_missed(
void BitListener::on_data_available_i(DDS::ParticipantBuiltinTopicDataDataReader_var builtin_dr)
{
DDS::ParticipantBuiltinTopicData participant;
DDS::SampleInfo si;

for (DDS::ReturnCode_t status = builtin_dr->read_next_sample(participant, si);
status == DDS::RETCODE_OK;
status = builtin_dr->read_next_sample(participant, si)) {

if (si.valid_data) {
OpenDDS::DCPS::GUID_t guid;
std::memcpy(&guid, &participant.key, sizeof(guid));
participants_seen_.insert(guid);

if (!done_ && check(true)) {
done_ = true;
std::cout << "== " << id_ << " Participant received all expected locations" << std::endl;
done_callback_();
}
}
}
}

void BitListener::on_requested_deadline_missed(
DDS::DataReader_ptr,
const DDS::RequestedDeadlineMissedStatus &)
const DDS::RequestedDeadlineMissedStatus&)
{
std::cerr << "ParticipantLocationListenerImpl::"
<< "on_requested_deadline_missed" << std::endl;
std::cerr << "BitListener::on_requested_deadline_missed" << std::endl;
}

void ParticipantLocationListenerImpl::on_requested_incompatible_qos(
void BitListener::on_requested_incompatible_qos(
DDS::DataReader_ptr,
const DDS::RequestedIncompatibleQosStatus &)
const DDS::RequestedIncompatibleQosStatus&)
{
std::cerr << "ParticipantLocationListenerImpl::"
<< "on_requested_incompatible_qos" << std::endl;
std::cerr << "BitListener::on_requested_incompatible_qos" << std::endl;
}

void ParticipantLocationListenerImpl::on_liveliness_changed(
void BitListener::on_liveliness_changed(
DDS::DataReader_ptr,
const DDS::LivelinessChangedStatus&)
{
std::cerr << "ParticipantLocationListenerImpl::"
<< "on_liveliness_changed" << std::endl;
std::cerr << "BitListener::on_liveliness_changed" << std::endl;
}

void ParticipantLocationListenerImpl::on_subscription_matched(
void BitListener::on_subscription_matched(
DDS::DataReader_ptr,
const DDS::SubscriptionMatchedStatus &)
const DDS::SubscriptionMatchedStatus&)
{
std::cerr << "ParticipantLocationListenerImpl::"
<< "on_subscription_matched" << std::endl;
std::cerr << "BitListener::on_subscription_matched" << std::endl;
}

void ParticipantLocationListenerImpl::on_sample_rejected(
void BitListener::on_sample_rejected(
DDS::DataReader_ptr,
const DDS::SampleRejectedStatus&)
{
std::cerr << "ParticipantLocationListenerImpl::"
<< "on_sample_rejected" << std::endl;
std::cerr << "BitListener::on_sample_rejected" << std::endl;
}

void ParticipantLocationListenerImpl::on_sample_lost(
void BitListener::on_sample_lost(
DDS::DataReader_ptr,
const DDS::SampleLostStatus&)
{
std::cerr << "ParticipantLocationListenerImpl::"
<< "on_sample_lost" << std::endl;
std::cerr << "BitListener::on_sample_lost" << std::endl;
}

bool ParticipantLocationListenerImpl::check(bool print_results)
bool BitListener::check(bool print_results)
{
const unsigned long expected =
OpenDDS::DCPS::LOCATION_LOCAL
Expand Down Expand Up @@ -180,7 +204,10 @@ bool ParticipantLocationListenerImpl::check(bool print_results)

bool found = false;
for (LocationMapType::const_iterator pos = location_map.begin(), limit = location_map.end();
pos != limit; ++ pos) {
pos != limit; ++pos) {
if (participants_seen_.count(pos->first) == 0) {
continue;
}
if (print_results) {
std::cout << id_ << " " << pos->first
<< ((pos->second & OpenDDS::DCPS::LOCATION_LOCAL) ? " LOCAL" : "")
Expand Down
74 changes: 74 additions & 0 deletions tests/DCPS/ParticipantLocationTopic/BitListener.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
*
*
* Distributed under the OpenDDS License.
* See: http://www.opendds.org/license.html
*/

#ifndef BIT_LISTENER
#define BIT_LISTENER

#include <dds/DdsDcpsCoreTypeSupportC.h>
#include <dds/DdsDcpsSubscriptionC.h>
#include <dds/OpenddsDcpsExtTypeSupportC.h>

#include <dds/DCPS/LocalObject.h>
#include <dds/DCPS/GuidUtils.h>

#include <map>

#if !defined (ACE_LACKS_PRAGMA_ONCE)
#pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */

typedef void (*callback_t)();

class BitListener
: public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> {
public:
BitListener(const std::string& id, bool no_ice, bool ipv6, callback_t done_callback);

virtual ~BitListener();

virtual void on_requested_deadline_missed(DDS::DataReader_ptr reader,
const DDS::RequestedDeadlineMissedStatus& status);

virtual void on_requested_incompatible_qos(DDS::DataReader_ptr reader,
const DDS::RequestedIncompatibleQosStatus& status);

virtual void on_liveliness_changed(DDS::DataReader_ptr reader,
const DDS::LivelinessChangedStatus& status);

virtual void on_subscription_matched(DDS::DataReader_ptr reader,
const DDS::SubscriptionMatchedStatus& status);

virtual void on_sample_rejected(DDS::DataReader_ptr reader,
const DDS::SampleRejectedStatus& status);

virtual void on_data_available(DDS::DataReader_ptr reader);

virtual void on_sample_lost(DDS::DataReader_ptr reader,
const DDS::SampleLostStatus& status);

bool check(bool print_results = true);

private:
ACE_Thread_Mutex mutex_;

typedef std::map<OpenDDS::DCPS::GUID_t, OpenDDS::DCPS::ParticipantLocation, OpenDDS::DCPS::GUID_tKeyLessThan> LocationMapType;
LocationMapType location_map;

const std::string id_;
bool no_ice_;
bool ipv6_;
callback_t done_callback_;
bool done_;

typedef std::set<OpenDDS::DCPS::GUID_t, OpenDDS::DCPS::GUID_tKeyLessThan> GuidSetType;
GuidSetType participants_seen_;

void on_data_available_i(OpenDDS::DCPS::ParticipantLocationBuiltinTopicDataDataReader_var builtin_dr);
void on_data_available_i(DDS::ParticipantBuiltinTopicDataDataReader_var builtin_dr);
};

#endif
5 changes: 0 additions & 5 deletions tests/DCPS/ParticipantLocationTopic/ParticipantLocation.mpc
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
project(ParticipantLocationTopic): dcps_test, dcps_cm, opendds_security, dcps_rtps_udp {
exename = ParticipantLocationTest
requires += built_in_topics

Source_Files {
ParticipantLocationTest.cpp
ParticipantLocationListenerImpl.cpp
}
}

This file was deleted.

Loading

0 comments on commit 932b38f

Please sign in to comment.