From dda2497e405faabcc25ff22891feb038465fe381 Mon Sep 17 00:00:00 2001 From: Adam Mitz Date: Wed, 26 Jun 2024 21:24:38 +0000 Subject: [PATCH] Add the peer's SPDP user tag to ParticipantLocation BIT --- dds/DCPS/RTPS/DiscoveredEntities.h | 2 + dds/DCPS/RTPS/ParameterListConverter.cpp | 4 ++ dds/DCPS/RTPS/RtpsCore.idl | 5 ++ dds/DCPS/RTPS/Spdp.cpp | 61 ++++++++++++++++++++++-- dds/OpenddsDcpsExt.idl | 1 + tests/transport/spdp/spdp_transport.cpp | 1 + 6 files changed, 70 insertions(+), 4 deletions(-) diff --git a/dds/DCPS/RTPS/DiscoveredEntities.h b/dds/DCPS/RTPS/DiscoveredEntities.h index 77183ad0922..db6bcedb76e 100644 --- a/dds/DCPS/RTPS/DiscoveredEntities.h +++ b/dds/DCPS/RTPS/DiscoveredEntities.h @@ -87,6 +87,7 @@ struct DiscoveredParticipant { , bit_ih_(DDS::HANDLE_NIL) , max_seq_(seq) , seq_reset_count_(0) + , opendds_user_tag_(p.participantProxy.opendds_user_tag) #if OPENDDS_CONFIG_SECURITY , have_spdp_info_(false) , have_sedp_info_(false) @@ -155,6 +156,7 @@ struct DiscoveredParticipant { DDS::InstanceHandle_t bit_ih_; DCPS::SequenceNumber max_seq_; ACE_UINT16 seq_reset_count_; + ACE_CDR::ULong opendds_user_tag_; typedef OPENDDS_LIST(BuiltinAssociationRecord) BuiltinAssociationRecords; BuiltinAssociationRecords builtin_pending_records_; BuiltinAssociationRecords builtin_associated_records_; diff --git a/dds/DCPS/RTPS/ParameterListConverter.cpp b/dds/DCPS/RTPS/ParameterListConverter.cpp index 5dec08e2a44..e4f319a9097 100644 --- a/dds/DCPS/RTPS/ParameterListConverter.cpp +++ b/dds/DCPS/RTPS/ParameterListConverter.cpp @@ -579,6 +579,7 @@ bool from_param_list(const ParameterList& param_list, proxy.expectsInlineQos = false; proxy.opendds_participant_flags.bits = 0; proxy.opendds_rtps_relay_application_participant = false; + proxy.opendds_user_tag = 0; const CORBA::ULong length = param_list.length(); for (CORBA::ULong i = 0; i < length; ++i) { @@ -656,6 +657,9 @@ bool from_param_list(const ParameterList& param_list, case PID_OPENDDS_RTPS_RELAY_APPLICATION_PARTICIPANT: proxy.opendds_rtps_relay_application_participant = param.opendds_rtps_relay_application_participant(); break; + case PID_OPENDDS_SPDP_USER_TAG: + proxy.opendds_user_tag = param.user_tag(); + break; case PID_SENTINEL: case PID_PAD: // ignore diff --git a/dds/DCPS/RTPS/RtpsCore.idl b/dds/DCPS/RTPS/RtpsCore.idl index 2f939b42738..de735cd166f 100644 --- a/dds/DCPS/RTPS/RtpsCore.idl +++ b/dds/DCPS/RTPS/RtpsCore.idl @@ -345,6 +345,7 @@ module OpenDDS { const ParameterId_t PID_OPENDDS_ICE_CANDIDATE = PID_OPENDDS_BASE + 4; const ParameterId_t PID_OPENDDS_PARTICIPANT_FLAGS = PID_OPENDDS_BASE + 5; const ParameterId_t PID_OPENDDS_RTPS_RELAY_APPLICATION_PARTICIPANT = PID_OPENDDS_BASE + 6; + const ParameterId_t PID_OPENDDS_SPDP_USER_TAG = PID_OPENDDS_BASE + 7; /* Always used inside a ParameterList */ /* custom de/serializer implemented in opendds_idl */ @@ -517,6 +518,9 @@ module OpenDDS { case PID_XTYPES_TYPE_CONSISTENCY: DDS::TypeConsistencyEnforcementQosPolicy type_consistency; + case PID_OPENDDS_SPDP_USER_TAG: + unsigned long user_tag; + default: DDS::OctetSeq unknown_data; }; @@ -622,6 +626,7 @@ module OpenDDS { #if OPENDDS_CONFIG_SECURITY DDS::Security::ExtendedBuiltinEndpointSet_t availableExtendedBuiltinEndpoints; #endif + unsigned long opendds_user_tag; }; // top-level data type for SPDP diff --git a/dds/DCPS/RTPS/Spdp.cpp b/dds/DCPS/RTPS/Spdp.cpp index 1842f74458c..1ad0dc0cbed 100644 --- a/dds/DCPS/RTPS/Spdp.cpp +++ b/dds/DCPS/RTPS/Spdp.cpp @@ -532,6 +532,7 @@ void Spdp::process_location_updates_i(const DiscoveredParticipantIter& iter, con DCPS::ParticipantLocationBuiltinTopicData& location_data = iter->second.location_data_; location_data.lease_duration = leaseDuration.to_dds_duration(); + location_data.user_tag = iter->second.opendds_user_tag_; bool published = false; for (DiscoveredParticipant::LocationUpdateList::const_iterator pos = location_updates.begin(), @@ -2904,12 +2905,31 @@ Spdp::SpdpTransport::write_i(const DCPS::GUID_t& guid, const DCPS::NetworkAddres wbuff_.reset(); DCPS::Serializer ser(&wbuff_, encoding_plain_native); DCPS::EncapsulationHeader encap(ser.encoding(), DCPS::MUTABLE); - if (!(ser << hdr_) || !(ser << info_dst) || !(ser << data_) || !(ser << encap) - || !(ser << plist)) { + if (!(ser << hdr_)) { + if (log_level >= LogLevel::Error) { + ACE_ERROR((LM_ERROR, + ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write_i: ") + ACE_TEXT("failed to serialize RTPS header for SPDP\n"))); + } + } + + // The implementation-specific UserTagSubmessage is designed to directly + // follow the RTPS Message Header. No other submessages should be added + // before it. This enables filtering based on a fixed offset. + if (user_tag_.smHeader.submessageId && !(ser << user_tag_)) { + if (log_level >= LogLevel::Error) { + ACE_ERROR((LM_ERROR, + ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write_i: ") + ACE_TEXT("failed to serialize user tag for SPDP\n"))); + } + return; + } + + if (!(ser << info_dst) || !(ser << data_) || !(ser << encap) || !(ser << plist)) { if (DCPS::DCPS_debug_level > 0) { ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write_i() - ") - ACE_TEXT("failed to serialize headers for SPDP\n"))); + ACE_TEXT("failed to serialize submessages for SPDP\n"))); } return; } @@ -3101,13 +3121,14 @@ Spdp::SpdpTransport::handle_input(ACE_HANDLE h) } DCPS::GuidPrefix_t destinationGuidPrefix = {0}; + ACE_CDR::ULong userTag = 0; while (buff_.length() > 3) { const char subm = buff_.rd_ptr()[0], flags = buff_.rd_ptr()[1]; ser.swap_bytes((flags & FLAG_E) != ACE_CDR_BYTE_ORDER); const size_t start = buff_.length(); CORBA::UShort submessageLength = 0; - switch (subm) { + switch (static_cast(subm)) { case DATA: { DataSubmessage data; if (!(ser >> data)) { @@ -3161,6 +3182,12 @@ Spdp::SpdpTransport::handle_input(ACE_HANDLE h) plist[0]._d(PID_PARTICIPANT_GUID); } + if (userTag) { + const ACE_CDR::ULong len = plist.length(); + plist.length(len + 1); + plist[len].user_tag(userTag); + } + DCPS::RcHandle outer = outer_.lock(); if (outer) { outer->data_received(data, plist, remote_na); @@ -3183,6 +3210,32 @@ Spdp::SpdpTransport::handle_input(ACE_HANDLE h) } break; } + case SUBMESSAGE_KIND_USER_TAG: { + if (hdr_.vendorId == VENDORID_OPENDDS) { + UserTagSubmessage sm; + if (!(ser >> sm)) { + if (DCPS::DCPS_debug_level > 0) { + ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - " + "failed to deserialize UserTagSubmessage for SPDP\n")); + } + return 0; + } + submessageLength = sm.smHeader.submessageLength; + userTag = sm.userTag; + } else { + SubmessageHeader smHeader; + if (!(ser >> smHeader)) { + if (DCPS::DCPS_debug_level > 0) { + ACE_ERROR((LM_ERROR, + ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ") + ACE_TEXT("failed to deserialize SubmessageHeader for SPDP\n"))); + } + return 0; + } + submessageLength = smHeader.submessageLength; + } + break; + } default: SubmessageHeader smHeader; if (!(ser >> smHeader)) { diff --git a/dds/OpenddsDcpsExt.idl b/dds/OpenddsDcpsExt.idl index 90b354ee2fd..e6492eae24b 100644 --- a/dds/OpenddsDcpsExt.idl +++ b/dds/OpenddsDcpsExt.idl @@ -57,6 +57,7 @@ module OpenDDS string relay6_addr; DDS::Time_t relay6_timestamp; DDS::Duration_t lease_duration; + unsigned long user_tag; }; const string RTPS_RELAY_STUN_PROTOCOL = "RtpsRelay:STUN"; diff --git a/tests/transport/spdp/spdp_transport.cpp b/tests/transport/spdp/spdp_transport.cpp index 9b88b246c95..1793fab50f4 100644 --- a/tests/transport/spdp/spdp_transport.cpp +++ b/tests/transport/spdp/spdp_transport.cpp @@ -425,6 +425,7 @@ bool run_test() #if OPENDDS_CONFIG_SECURITY , availableExtendedBuiltinEndpoints #endif + , 0 }, { // Duration_t (leaseDuration) static_cast((rd.resend_period() * 10).value().sec()),