Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc DDS changes #12467

Merged
merged 10 commits into from
Dec 5, 2023
81 changes: 67 additions & 14 deletions third-party/realdds/include/realdds/dds-guid.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "dds-defines.h"
#include <fastdds/rtps/common/Guid.h>
#include <iosfwd>


namespace realdds {
Expand All @@ -13,25 +14,77 @@ namespace realdds {
static constexpr auto & unknown_guid = eprosima::fastrtps::rtps::c_Guid_Unknown;


// Custom GUID printer: attempts a more succinct representation
// If a base_prefix is provided, will try to minimize a common denominator (vendor, host, etc.) -- you can use your
// participant's guid if you want to shorten
// Convert a prefix to a hex string: not human-readable!
// In one of two formats:
// 223344556677.<0xPID> // eProsima
// Or:
// 001122334455667788990011 // everything else
// Used internally by next function.
//
std::string print( dds_guid const & guid,
dds_guid_prefix const & base_prefix = unknown_guid.guidPrefix,
bool readable_name = true );
struct print_raw_guid_prefix
{
dds_guid_prefix const & _prefix;
explicit print_raw_guid_prefix( dds_guid_prefix const & prefix,
dds_guid_prefix const & /*base_prefix*/ = unknown_guid.guidPrefix )
: _prefix( prefix )
{
}
};
std::ostream & operator<<( std::ostream &, print_raw_guid_prefix const & );


// Same as above, without a base prefix
inline std::string print( dds_guid const & guid, bool readable_name )
// Custom GUID printer: attempts a more succinct representation:
// If the participant is known, its name will be shown instead of the raw bytes.
// <name-or-prefix>.<entity-id-in-hex>
// If a base_prefix is provided, will try to minimize a common denominator -- you can use your
// participant's guid if you want to shorten.
//
struct print_guid
{
return print( guid, unknown_guid.guidPrefix, readable_name );
}
dds_guid const & _guid;
dds_guid_prefix const & _base_prefix;

explicit print_guid( dds_guid const & guid, dds_guid_prefix const & base_prefix = unknown_guid.guidPrefix )
: _guid( guid )
, _base_prefix( base_prefix )
{
}
explicit print_guid( dds_guid const & guid, dds_guid const & base_guid )
: print_guid( guid, base_guid.guidPrefix )
{
}
};
std::ostream & operator<<( std::ostream &, print_guid const & );


// Same as above, with a guid base for flexibility
inline std::string print( dds_guid const & guid, dds_guid const & base_guid, bool readable_name = true )
// Same, except leaves output in raw form (bytes, not name)
// <prefix>.<entity-id-in-hex>
//
struct print_raw_guid
{
return print( guid, base_guid.guidPrefix, readable_name );
}
dds_guid const & _guid;
dds_guid_prefix const & _base_prefix;

explicit print_raw_guid( dds_guid const & guid, dds_guid_prefix const & base_prefix = unknown_guid.guidPrefix )
: _guid( guid )
, _base_prefix( base_prefix )
{
}
explicit print_raw_guid( dds_guid const & guid, dds_guid const & base_guid )
: print_raw_guid( guid, base_guid.guidPrefix )
{
}
};
std::ostream & operator<<( std::ostream &, print_raw_guid const & );


// The reverse: get a guid from a RAW guid string.
// Expecting one of two formats:
// 223344556677.<0xPID>.<0xEID> // eProsima
// Or:
// 001122334455667788990011.<0xEID> // everything else
// Returns unknown_guid if not parseable
dds_guid guid_from_string( std::string const & );


} // namespace realdds
3 changes: 3 additions & 0 deletions third-party/realdds/include/realdds/dds-topic-writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ class dds_topic_writer : protected eprosima::fastdds::dds::DataWriterListener
// The callbacks should be set before we actually create the underlying DDS objects, so the writer does not
void run( qos const & = qos() );

// Waits until all changes were acknowledged; return false on timeout
bool wait_for_acks( dds_time timeout );

// DataWriterListener
protected:
// Called when the Publisher is matched (or unmatched) against an endpoint
Expand Down
89 changes: 75 additions & 14 deletions third-party/realdds/py/pyrealdds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
#include <realdds/dds-stream-sensor-bridge.h>
#include <realdds/dds-metadata-syncer.h>

#include <rsutils/os/special-folder.h>
#include <rsutils/os/executable-name.h>
#include <rsutils/easylogging/easyloggingpp.h>
#include <rsutils/string/from.h>
#include <rsutils/json.h>

#include <fastdds/dds/domain/qos/DomainParticipantQos.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>
Expand All @@ -45,12 +49,6 @@

namespace {

std::string to_string( realdds::dds_guid const & guid )
{
return realdds::print( guid );
}


py::list get_vector3( geometry_msgs::msg::Vector3 const & v )
{
py::list obj( 3 );
Expand All @@ -69,6 +67,55 @@ void set_vector3( geometry_msgs::msg::Vector3 & v, std::array< double, 3 > const
}


std::string script_name()
{
// Returns the name of the python script that's currently running us
return rsutils::os::base_name(
py::module_::import( "__main__" ).attr( "__file__" ).cast< std::string >() );
}


nlohmann::json load_rs_settings( nlohmann::json const & local_settings )
{
nlohmann::json config;

// Load the realsense configuration file settings
std::ifstream f( rsutils::os::get_special_folder( rsutils::os::special_folder::app_data ) + "realsense-config.json" );
if( f.good() )
{
try
{
config = nlohmann::json::parse( f );
}
catch( std::exception const & e )
{
throw std::runtime_error( "failed to load configuration file: " + std::string( e.what() ) );
}
}

// Load "python"-specific settings
auto settings = rsutils::json::load_app_settings( config, "python", "context", "config-file" );

// Take the "dds" settings only
settings = rsutils::json::nested( settings, "dds" );

// Patch any script-specific settings
// NOTE: this is also accessed by pyrealsense2, where a "context" hierarchy is still used
auto script = script_name();
if( auto script_settings = rsutils::json::nested( config, script, "context", "dds" ) )
rsutils::json::patch( settings, script_settings, "config-file/" + script + "/context" );

// We should always have DDS enabled
if( settings.is_object() )
settings.erase( "enabled" );

// Patch the given local settings into the configuration
rsutils::json::patch( settings, local_settings, "local settings" );

return settings;
}


} // namespace


Expand All @@ -94,14 +141,19 @@ PYBIND11_MODULE(NAME, m) {
using realdds::dds_guid;
py::class_< dds_guid >( m, "guid" )
.def( py::init<>() )
.def( "__bool__", []( dds_guid const& self ) { return self != dds_guid::unknown(); } )
.def( "__repr__", []( dds_guid const & self ) { return to_string( self ); } )
.def_static( "from_string",
[]( std::string const & raw_guid ) { return realdds::guid_from_string( raw_guid ); } )
.def( "__bool__", []( dds_guid const & self ) { return self != realdds::unknown_guid; } )
.def( "__str__",
[]( dds_guid const & self ) { return rsutils::string::from( realdds::print_guid( (self) ) ).str(); } )
.def( "__repr__",
[]( dds_guid const & self ) { return rsutils::string::from( realdds::print_raw_guid( ( self ) ) ).str(); } )
// Following two (hash and ==) are needed if we want to be able to use guids as dictionary keys
.def( "__hash__",
[]( dds_guid const & self )
{
return std::hash< std::string >{}(
realdds::print( self, false ) ); // use hex; not the human-readable name
rsutils::string::from( realdds::print_raw_guid( self ) ) );
} )
.def( py::self == py::self );

Expand Down Expand Up @@ -149,12 +201,19 @@ PYBIND11_MODULE(NAME, m) {
( char const * topic_name, eprosima::fastrtps::types::DynamicType_ptr dyn_type ),
callback( topic_name, dyn_type->get_name() ); ) );

m.def( "load_rs_settings", &load_rs_settings, "local-settings"_a = nlohmann::json::object() );
m.def( "script_name", &script_name );

py::class_< dds_participant,
std::shared_ptr< dds_participant > // handled with a shared_ptr
>
participant( m, "participant" );
participant.def( py::init<>() )
.def( "init", &dds_participant::init, "domain-id"_a, "participant-name"_a, "settings"_a = nlohmann::json::object() )
.def( "init",
[]( dds_participant & self, nlohmann::json const & local_settings, realdds::dds_domain_id domain_id )
{ self.init( domain_id, script_name(), local_settings ); },
"local-settings"_a = nlohmann::json::object(), "domain-id"_a = -1 )
.def( "init", &dds_participant::init, "domain-id"_a, "participant-name"_a, "local-settings"_a = nlohmann::json::object() )
.def( "is_valid", &dds_participant::is_valid )
.def( "guid", &dds_participant::guid )
.def( "create_guid", &dds_participant::create_guid )
Expand Down Expand Up @@ -182,7 +241,7 @@ PYBIND11_MODULE(NAME, m) {
eprosima::fastdds::dds::DomainParticipantQos qos;
if( ReturnCode_t::RETCODE_OK == self.get()->get_qos( qos ) )
os << " \"" << qos.name() << "\"";
os << " " << to_string( self.guid() );
os << " " << realdds::print_guid( self.guid() );
}
os << ">";
return os.str();
Expand Down Expand Up @@ -298,8 +357,10 @@ PYBIND11_MODULE(NAME, m) {
callback( self, status.current_count_change ); ) )
.def( "topic", &dds_topic_writer::topic )
.def( "run", &dds_topic_writer::run )
.def( "qos", []() { return writer_qos(); } )
.def( "qos", []( reliability r, durability d ) { return writer_qos( r, d ); } );
.def( "has_readers", &dds_topic_writer::has_readers )
.def( "wait_for_acks", &dds_topic_writer::wait_for_acks )
.def_static( "qos", []() { return writer_qos(); } )
.def_static( "qos", []( reliability r, durability d ) { return writer_qos( r, d ); } );


// The actual types are declared as functions and not classes: the py::init<> inheritance rules are pretty strict
Expand Down Expand Up @@ -350,7 +411,7 @@ PYBIND11_MODULE(NAME, m) {
[]( SampleIdentity const & self )
{
std::ostringstream os;
os << to_string( self.writer_guid() );
os << realdds::print_guid( self.writer_guid() );
os << '.';
os << self.sequence_number();
return os.str();
Expand Down
4 changes: 2 additions & 2 deletions third-party/realdds/scripts/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def domain_arg(x):
if t <= 0 or t > 232:
raise ValueError( f'--domain should be [0,232]' )
return t
args.add_argument( '--domain', metavar='<0-232>', type=domain_arg, default=0, help='DDS domain to use (default=0)' )
args.add_argument( '--domain', metavar='<0-232>', type=domain_arg, default=-1, help='DDS domain to use (default=0)' )
args = args.parse_args()


Expand All @@ -36,7 +36,7 @@ def e( *a, **kw ):
dds.debug( args.debug )

participant = dds.participant()
participant.init( args.domain, 'devices' )
participant.init( dds.load_rs_settings( settings ), args.domain )

watcher = dds.device_watcher( participant )
watcher.start()
Expand Down
4 changes: 2 additions & 2 deletions third-party/realdds/scripts/fps.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def domain_arg(x):
if t <= 0 or t > 232:
raise ValueError( f'--domain should be [0,232]' )
return t
args.add_argument( '--domain', metavar='<0-232>', type=domain_arg, default=0, help='DDS domain to use (default=0)' )
args.add_argument( '--domain', metavar='<0-232>', type=domain_arg, default=-1, help='DDS domain to use (default=0)' )
args.add_argument( '--with-metadata', action='store_true', help='stream with metadata, if available (default off)' )
args = args.parse_args()

Expand All @@ -43,7 +43,7 @@ def e( *a, **kw ):
settings['device'] = { 'metadata' : False };

participant = dds.participant()
participant.init( args.domain, 'fps', settings )
participant.init( dds.load_rs_settings( settings ), args.domain )

# Most important is the topic-root: this assumes we know it in advance and do not have to
# wait for a device-info message (which would complicate the code here).
Expand Down
14 changes: 12 additions & 2 deletions third-party/realdds/scripts/topic-send.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ def json_arg(x):
except Exception as e:
raise ArgumentError( str(e) )
args.add_argument( '--message', metavar='<json>', type=json_arg, help='a message to send', default='{"id":"ping","message":"some message"}' )
args.add_argument( '--ack', action='store_true', help='wait for acks' )
def domain_arg(x):
t = int(x)
if t <= 0 or t > 232:
raise ArgumentError( f'--domain should be [0-232]' )
return t
args.add_argument( '--domain', metavar='<0-232>', type=domain_arg, default=0, help='DDS domain to use (default=0)' )
args.add_argument( '--domain', metavar='<0-232>', type=domain_arg, default=-1, help='DDS domain to use (default=0)' )
args = args.parse_args()


Expand All @@ -43,7 +44,7 @@ def e( *a, **kw ):
settings = {}

participant = dds.participant()
participant.init( args.domain, 'topic-send', settings )
participant.init( dds.load_rs_settings( settings ), args.domain )

message = args.message

Expand Down Expand Up @@ -78,7 +79,16 @@ def e( *a, **kw ):
writer.run( dds.topic_writer.qos() )
# Let the client pick up on the new entity - if we send it too quickly, they won't see it before we disappear...
time.sleep( 1 )
if not writer.has_readers():
e( 'No readers exist on topic:', topic_path )
sys.exit( 1 )
start = dds.now()
dds.message.flexible( message ).write_to( writer )
i( f'Sent {message} on {topic_path}' )
if args.ack:
if not writer.wait_for_acks( dds.time( 5. ) ): # seconds
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we take the timeout from the settings?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but really I see no need. We can improve the script as we go.

e( 'Timeout waiting for ack' )
sys.exit( 1 )
i( f'Acknowledged ({dds.timestr( dds.now(), start )})' )


6 changes: 3 additions & 3 deletions third-party/realdds/src/dds-device-impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ void dds_device::impl::handle_notification( nlohmann::json const & j,
if( sample.size() == 2 && sample.is_array() )
{
// We have to be the ones who sent the control!
auto const guid_string = rsutils::json::get< std::string >( sample, 0 );
auto const control_guid_string = realdds::print( _control_writer->get()->guid(), false ); // raw guid
if( guid_string == control_guid_string )
auto const reply_guid = guid_from_string( rsutils::json::get< std::string >( sample, 0 ) );
auto const control_guid = _control_writer->get()->guid();
if( reply_guid == control_guid )
{
auto const sequence_number = rsutils::json::get< uint64_t >( sample, 1 );
std::unique_lock< std::mutex > lock( _replies_mutex );
Expand Down
2 changes: 1 addition & 1 deletion third-party/realdds/src/dds-device-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ void dds_device_server::on_control_message_received()
{
json reply;
reply[sample_key] = json::array( {
realdds::print( sample.sample_identity.writer_guid(), false ), // raw guid
rsutils::string::from( realdds::print_raw_guid( sample.sample_identity.writer_guid() ) ),
sample.sample_identity.sequence_number().to64long(),
} );
try
Expand Down
2 changes: 1 addition & 1 deletion third-party/realdds/src/dds-device-watcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ void dds_device_watcher::start()
init();
}
LOG_DEBUG( "DDS device watcher started on '" << _participant->get()->get_qos().name() << "' "
<< realdds::print( _participant->guid() ) );
<< realdds::print_guid( _participant->guid() ) );
}

void dds_device_watcher::stop()
Expand Down
Loading
Loading