Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify notification and control messages using "flexible-message" #11063

Merged
merged 14 commits into from
Nov 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/librealsense2/hpp/rs_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "rs_types.hpp"
#include "rs_sensor.hpp"
#include <array>
#include <cstring>

namespace rs2
{
Expand Down Expand Up @@ -117,6 +118,12 @@ namespace rs2
{
return _dev;
}
bool operator<( device const & other ) const
{
return (
std::strcmp( get_info( RS2_CAMERA_INFO_SERIAL_NUMBER ), other.get_info( RS2_CAMERA_INFO_SERIAL_NUMBER ) )
< 0 );
}

template<class T>
bool is() const
Expand Down
40 changes: 39 additions & 1 deletion include/librealsense2/utilities/json.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,45 @@ T get( nlohmann::json const & j, char const * key )
}
catch( nlohmann::json::exception & e )
{
throw std::runtime_error( "[while getting '" + std::string( key ) + "']" + e.what() );
throw std::runtime_error( "[getting '" + std::string( key ) + "']" + e.what() );
}
}


// If there, returns the value at the given index (in an array); otherwise throws!
// Turns json exceptions into runtime errors with additional info.
template < class T >
T get( nlohmann::json const & j, int index )
{
try
{
// This will throw for type mismatches, etc.
// Does not check for existence: will throw, too!
return j.at( index ).get< T >();
}
catch( nlohmann::json::exception & e )
{
throw std::runtime_error( "[getting index " + std::to_string( index ) + "]" + e.what() );
}
}


// If there, returns the value at the given iterator; otherwise throws!
// Turns json exceptions into runtime errors with additional info.
template < class T >
T get( nlohmann::json const & j, nlohmann::json::const_iterator const & it )
{
if( it == j.end() )
throw std::runtime_error( "unexpected end of json" );
try
{
// This will throw for type mismatches, etc.
// Does not check for existence: will throw, too!
return it->get< T >();
}
catch( nlohmann::json::exception & e )
{
throw std::runtime_error( std::string( "[getting iterator]" ) + e.what() );
}
}

Expand Down
257 changes: 174 additions & 83 deletions src/context.cpp

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions third-party/realdds/include/realdds/dds-device-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace realdds {

// Forward declaration
namespace topics {
class notification;
class flexible_msg;
namespace raw {
class device_info;
} // namespace raw
Expand Down Expand Up @@ -60,7 +60,7 @@ class dds_device_server
void start_streaming( const std::string & stream_name, const image_header & header );

void publish_image( const std::string & stream_name, const uint8_t * data, size_t size );
void publish_notification( topics::notification && );
void publish_notification( topics::flexible_msg && );

private:
std::shared_ptr< dds_publisher > _publisher;
Expand Down
2 changes: 1 addition & 1 deletion third-party/realdds/include/realdds/dds-device.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class dds_device
size_t foreach_stream( std::function< void( std::shared_ptr< dds_stream > stream ) > fn ) const;

void open( const dds_stream_profiles & profiles );
void close( const std::vector< std::pair< int16_t, int8_t > > & stream_uids );
void close( const dds_streams & streams );

private:
class impl;
Expand Down
10 changes: 5 additions & 5 deletions third-party/realdds/include/realdds/dds-notification-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#pragma once


#include <realdds/topics/notification/notification-msg.h>
#include <realdds/topics/flexible/flexible-msg.h>
#include <librealsense2/utilities/concurrency/concurrency.h>

#include <memory>
Expand Down Expand Up @@ -35,19 +35,19 @@ class dds_notification_server
bool is_running() const { return _active; }

// On-demand notification: these happen sequentially and from another thread
void send_notification( topics::notification && notification );
void send_notification( topics::flexible_msg && notification );

// On-discovery notification: when a new client is detected
void add_discovery_notification( topics::notification && notification );
void add_discovery_notification( topics::flexible_msg && notification );

private:
void send_discovery_notifications();

std::shared_ptr< dds_publisher > _publisher;
std::shared_ptr< dds_topic_writer > _writer;
active_object<> _notifications_loop;
single_consumer_queue< topics::raw::notification > _instant_notifications;
std::vector< topics::raw::notification > _discovery_notifications;
single_consumer_queue< topics::raw::flexible > _instant_notifications;
std::vector< topics::raw::flexible > _discovery_notifications;
std::mutex _notification_send_mutex;
std::condition_variable _send_notification_cv;
std::atomic_bool _send_init_msgs = { false };
Expand Down
78 changes: 29 additions & 49 deletions third-party/realdds/include/realdds/dds-stream-profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,6 @@
namespace realdds {


union dds_stream_uid
{
uint32_t whole = 0;
struct
{
int16_t sid; // Stream ID; assigned by the server, but may not be unique because of index
int8_t index; // Used to distinguish similar streams like IR L / R, 0 otherwise
};

dds_stream_uid() = default;
dds_stream_uid( dds_stream_uid const & ) = default;
dds_stream_uid( dds_stream_uid && ) = default;

dds_stream_uid( uint32_t whole_ )
: whole( whole_ )
{
}

dds_stream_uid( int sid_, int index_ )
{
whole = 0; // it covers an extra byte, which needs to be 0
sid = static_cast<int16_t>( sid_ );
index = static_cast<int8_t>( index_ );
}

std::string to_string() const;
};


inline bool operator<( dds_stream_uid const & l, dds_stream_uid const & r )
{
return l.whole < r.whole;
}


// Similar to fourcc, this describes how a stream data is organized. The characters are zero-terminated so it can be
// shorter than 'size' and can be easily converted from/to string.
//
Expand Down Expand Up @@ -83,39 +48,52 @@ class dds_stream_base;

class dds_stream_profile
{
dds_stream_uid _uid;
dds_stream_format _format;
int16_t _frequency; // "Frames" per second
dds_stream_format _format;

std::weak_ptr< dds_stream_base > _stream;

public:
virtual ~dds_stream_profile() {}

protected:
dds_stream_profile( dds_stream_uid uid, dds_stream_format format, int16_t frequency )
: _uid( uid )
, _format( format )
dds_stream_profile( dds_stream_format format, int16_t frequency )
: _format( format )
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is more efficient to initialize members by declaration order.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK the init order is the order of decl, whatever order you write them in.

, _frequency( frequency )
{
}
dds_stream_profile( dds_stream_profile && ) = default;
dds_stream_profile( nlohmann::json const &, int & index );

public:
std::shared_ptr< dds_stream_base > stream() const { return _stream.lock(); }
// This is for initialization and is called from dds_stream_base only!
void init_stream( std::weak_ptr< dds_stream_base > const & stream );

dds_stream_uid uid() const { return _uid; }
dds_stream_format format() const { return _format; }
int16_t frequency() const { return _frequency; }

// These are for debugging - not functional
virtual std::string to_string() const;
virtual std::string details_to_string() const;

// Serialization to a JSON representation
// Serialization to a JSON array representation
virtual nlohmann::json to_json() const;

// Build a profile from a json array object, e.g.:
// auto profile = dds_stream_profile::from_json< dds_video_stream_profile >( j );
// This is the reverse of to_json() which returns a json array
template< class final_stream_profile >
static std::shared_ptr< final_stream_profile > from_json( nlohmann::json const & j )
{
int it = 0;
auto profile = std::make_shared< final_stream_profile >( j, it );
verify_end_of_json( j, it ); // just so it's not in the header
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not put all of from_json implementation in the cpp? It is not inlined anyway

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a template...

return profile;
}

private:
static void verify_end_of_json( nlohmann::json const &, int index );
};


Expand All @@ -128,21 +106,19 @@ class dds_video_stream_profile : public dds_stream_profile

uint16_t _width; // Resolution width [pixels]
uint16_t _height; // Resolution height [pixels]
uint8_t _bytes_per_pixel;

public:
dds_video_stream_profile( dds_stream_uid uid, dds_stream_format format, int16_t frequency, uint16_t width, uint16_t height, uint8_t bytes_per_pixel )
: dds_stream_profile( uid, format, frequency )
dds_video_stream_profile( dds_stream_format format, int16_t frequency, uint16_t width, uint16_t height )
: super( format, frequency )
, _width( width )
, _height( height )
, _bytes_per_pixel( bytes_per_pixel )
{
}
dds_video_stream_profile( nlohmann::json const &, int & index );
dds_video_stream_profile( dds_video_stream_profile && ) = default;

uint16_t width() const { return _width; }
uint16_t height() const { return _height; }
uint8_t bytes_per_pixel() const { return _bytes_per_pixel; }

std::string details_to_string() const override;

Expand All @@ -155,8 +131,12 @@ class dds_motion_stream_profile : public dds_stream_profile
typedef dds_stream_profile super;

public:
dds_motion_stream_profile( dds_stream_uid uid, dds_stream_format format, int16_t frequency )
: dds_stream_profile( uid, format, frequency )
dds_motion_stream_profile( dds_stream_format format, int16_t frequency )
: super( format, frequency )
{
}
dds_motion_stream_profile( nlohmann::json const & j, int & index )
: super( j, index )
{
}
dds_motion_stream_profile( dds_motion_stream_profile && ) = default;
Expand Down
5 changes: 4 additions & 1 deletion third-party/realdds/include/realdds/dds-stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
#include "dds-stream-base.h"
#include "dds-stream-profile.h"

#include <functional>
#include <memory>
#include <string>
#include <vector>

namespace realdds {

Expand Down Expand Up @@ -62,4 +62,7 @@ class dds_motion_stream : public dds_stream
};


typedef std::vector< std::shared_ptr< dds_stream > > dds_streams;


} // namespace realdds
Loading