Skip to content

Commit

Permalink
Add the peer's SPDP user tag to ParticipantLocation BIT
Browse files Browse the repository at this point in the history
  • Loading branch information
mitza-oci committed Jun 26, 2024
1 parent e0cd8c8 commit dda2497
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 4 deletions.
2 changes: 2 additions & 0 deletions dds/DCPS/RTPS/DiscoveredEntities.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ struct DiscoveredParticipant {
, bit_ih_(DDS::HANDLE_NIL)
, max_seq_(seq)
, seq_reset_count_(0)
, opendds_user_tag_(p.participantProxy.opendds_user_tag)
#if OPENDDS_CONFIG_SECURITY
, have_spdp_info_(false)
, have_sedp_info_(false)
Expand Down Expand Up @@ -155,6 +156,7 @@ struct DiscoveredParticipant {
DDS::InstanceHandle_t bit_ih_;
DCPS::SequenceNumber max_seq_;
ACE_UINT16 seq_reset_count_;
ACE_CDR::ULong opendds_user_tag_;
typedef OPENDDS_LIST(BuiltinAssociationRecord) BuiltinAssociationRecords;
BuiltinAssociationRecords builtin_pending_records_;
BuiltinAssociationRecords builtin_associated_records_;
Expand Down
4 changes: 4 additions & 0 deletions dds/DCPS/RTPS/ParameterListConverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ bool from_param_list(const ParameterList& param_list,
proxy.expectsInlineQos = false;
proxy.opendds_participant_flags.bits = 0;
proxy.opendds_rtps_relay_application_participant = false;
proxy.opendds_user_tag = 0;

const CORBA::ULong length = param_list.length();
for (CORBA::ULong i = 0; i < length; ++i) {
Expand Down Expand Up @@ -656,6 +657,9 @@ bool from_param_list(const ParameterList& param_list,
case PID_OPENDDS_RTPS_RELAY_APPLICATION_PARTICIPANT:
proxy.opendds_rtps_relay_application_participant = param.opendds_rtps_relay_application_participant();
break;
case PID_OPENDDS_SPDP_USER_TAG:
proxy.opendds_user_tag = param.user_tag();
break;
case PID_SENTINEL:
case PID_PAD:
// ignore
Expand Down
5 changes: 5 additions & 0 deletions dds/DCPS/RTPS/RtpsCore.idl
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ module OpenDDS {
const ParameterId_t PID_OPENDDS_ICE_CANDIDATE = PID_OPENDDS_BASE + 4;
const ParameterId_t PID_OPENDDS_PARTICIPANT_FLAGS = PID_OPENDDS_BASE + 5;
const ParameterId_t PID_OPENDDS_RTPS_RELAY_APPLICATION_PARTICIPANT = PID_OPENDDS_BASE + 6;
const ParameterId_t PID_OPENDDS_SPDP_USER_TAG = PID_OPENDDS_BASE + 7;

/* Always used inside a ParameterList */
/* custom de/serializer implemented in opendds_idl */
Expand Down Expand Up @@ -517,6 +518,9 @@ module OpenDDS {
case PID_XTYPES_TYPE_CONSISTENCY:
DDS::TypeConsistencyEnforcementQosPolicy type_consistency;

case PID_OPENDDS_SPDP_USER_TAG:
unsigned long user_tag;

default:
DDS::OctetSeq unknown_data;
};
Expand Down Expand Up @@ -622,6 +626,7 @@ module OpenDDS {
#if OPENDDS_CONFIG_SECURITY
DDS::Security::ExtendedBuiltinEndpointSet_t availableExtendedBuiltinEndpoints;
#endif
unsigned long opendds_user_tag;
};

// top-level data type for SPDP
Expand Down
61 changes: 57 additions & 4 deletions dds/DCPS/RTPS/Spdp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ void Spdp::process_location_updates_i(const DiscoveredParticipantIter& iter, con

DCPS::ParticipantLocationBuiltinTopicData& location_data = iter->second.location_data_;
location_data.lease_duration = leaseDuration.to_dds_duration();
location_data.user_tag = iter->second.opendds_user_tag_;

bool published = false;
for (DiscoveredParticipant::LocationUpdateList::const_iterator pos = location_updates.begin(),
Expand Down Expand Up @@ -2904,12 +2905,31 @@ Spdp::SpdpTransport::write_i(const DCPS::GUID_t& guid, const DCPS::NetworkAddres
wbuff_.reset();
DCPS::Serializer ser(&wbuff_, encoding_plain_native);
DCPS::EncapsulationHeader encap(ser.encoding(), DCPS::MUTABLE);
if (!(ser << hdr_) || !(ser << info_dst) || !(ser << data_) || !(ser << encap)
|| !(ser << plist)) {
if (!(ser << hdr_)) {
if (log_level >= LogLevel::Error) {
ACE_ERROR((LM_ERROR,
ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write_i: ")
ACE_TEXT("failed to serialize RTPS header for SPDP\n")));
}
}

// The implementation-specific UserTagSubmessage is designed to directly
// follow the RTPS Message Header. No other submessages should be added
// before it. This enables filtering based on a fixed offset.
if (user_tag_.smHeader.submessageId && !(ser << user_tag_)) {
if (log_level >= LogLevel::Error) {
ACE_ERROR((LM_ERROR,
ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write_i: ")
ACE_TEXT("failed to serialize user tag for SPDP\n")));
}
return;
}

if (!(ser << info_dst) || !(ser << data_) || !(ser << encap) || !(ser << plist)) {
if (DCPS::DCPS_debug_level > 0) {
ACE_ERROR((LM_ERROR,
ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write_i() - ")
ACE_TEXT("failed to serialize headers for SPDP\n")));
ACE_TEXT("failed to serialize submessages for SPDP\n")));
}
return;
}
Expand Down Expand Up @@ -3101,13 +3121,14 @@ Spdp::SpdpTransport::handle_input(ACE_HANDLE h)
}

DCPS::GuidPrefix_t destinationGuidPrefix = {0};
ACE_CDR::ULong userTag = 0;

while (buff_.length() > 3) {
const char subm = buff_.rd_ptr()[0], flags = buff_.rd_ptr()[1];
ser.swap_bytes((flags & FLAG_E) != ACE_CDR_BYTE_ORDER);
const size_t start = buff_.length();
CORBA::UShort submessageLength = 0;
switch (subm) {
switch (static_cast<ACE_CDR::Octet>(subm)) {
case DATA: {
DataSubmessage data;
if (!(ser >> data)) {
Expand Down Expand Up @@ -3161,6 +3182,12 @@ Spdp::SpdpTransport::handle_input(ACE_HANDLE h)
plist[0]._d(PID_PARTICIPANT_GUID);
}

if (userTag) {
const ACE_CDR::ULong len = plist.length();
plist.length(len + 1);
plist[len].user_tag(userTag);
}

DCPS::RcHandle<Spdp> outer = outer_.lock();
if (outer) {
outer->data_received(data, plist, remote_na);
Expand All @@ -3183,6 +3210,32 @@ Spdp::SpdpTransport::handle_input(ACE_HANDLE h)
}
break;
}
case SUBMESSAGE_KIND_USER_TAG: {
if (hdr_.vendorId == VENDORID_OPENDDS) {
UserTagSubmessage sm;
if (!(ser >> sm)) {
if (DCPS::DCPS_debug_level > 0) {
ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - "
"failed to deserialize UserTagSubmessage for SPDP\n"));
}
return 0;
}
submessageLength = sm.smHeader.submessageLength;
userTag = sm.userTag;
} else {
SubmessageHeader smHeader;
if (!(ser >> smHeader)) {
if (DCPS::DCPS_debug_level > 0) {
ACE_ERROR((LM_ERROR,
ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
ACE_TEXT("failed to deserialize SubmessageHeader for SPDP\n")));
}
return 0;
}
submessageLength = smHeader.submessageLength;
}
break;
}
default:
SubmessageHeader smHeader;
if (!(ser >> smHeader)) {
Expand Down
1 change: 1 addition & 0 deletions dds/OpenddsDcpsExt.idl
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ module OpenDDS
string relay6_addr;
DDS::Time_t relay6_timestamp;
DDS::Duration_t lease_duration;
unsigned long user_tag;
};

const string RTPS_RELAY_STUN_PROTOCOL = "RtpsRelay:STUN";
Expand Down
1 change: 1 addition & 0 deletions tests/transport/spdp/spdp_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ bool run_test()
#if OPENDDS_CONFIG_SECURITY
, availableExtendedBuiltinEndpoints
#endif
, 0
},
{ // Duration_t (leaseDuration)
static_cast<CORBA::Long>((rd.resend_period() * 10).value().sec()),
Expand Down

0 comments on commit dda2497

Please sign in to comment.