Skip to content

Commit

Permalink
Fix multiple call to MatchingListener (fix #10) (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
JEnoch authored Oct 13, 2023
1 parent 2d380bb commit febb1ee
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 83 deletions.
50 changes: 25 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 5 additions & 58 deletions zenoh-plugin-ros2dds/src/route_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,6 @@ impl Deref for ZPublisher {
}
}

// struct InnerState<'a> {
// TODO? solution to create empty route, and then add an InnerState after containing
// ZPublisher + dds_reader Atomic + Matching Listener that create DDS Reader
//
// not sure it solves since anyway MatchingListener holds a &Publisher
// }

// a route from DDS to Zenoh
#[allow(clippy::upper_case_acronyms)]
#[derive(Serialize)]
Expand All @@ -86,15 +79,15 @@ pub struct RoutePublisher<'a> {
dds_reader: Arc<AtomicDDSEntity>,
// TypeInfo for Reader creation (if available)
#[serde(skip)]
type_info: Option<Arc<TypeInfo>>,
_type_info: Option<Arc<TypeInfo>>,
// if the topic is keyless
#[serde(skip)]
keyless: bool,
// the QoS for the DDS Reader to be created.
// those are either the QoS announced by a remote bridge on a Reader discovery,
// either the QoS adapted from a local disovered Writer
#[serde(skip)]
reader_qos: Qos,
_reader_qos: Qos,
// a liveliness token associated to this route, for announcement to other plugins
#[serde(skip)]
liveliness_token: Option<LivelinessToken<'a>>,
Expand Down Expand Up @@ -269,55 +262,15 @@ impl RoutePublisher<'_> {
cache_size,
},
dds_reader,
type_info: type_info.clone(),
reader_qos,
_type_info: type_info.clone(),
_reader_qos: reader_qos,
keyless,
liveliness_token: None,
remote_routes: HashSet::new(),
local_nodes: HashSet::new(),
})
}

fn activate_dds_reader(&mut self) -> Result<(), String> {
let topic_name = format!("rt{}", self.ros2_name);
let type_name = ros2_message_type_to_dds_type(&self.ros2_type);
let read_period = get_read_period(&self.context.config, &self.zenoh_key_expr);
let route_id = self.to_string();
let publisher = self.zenoh_publisher.clone();

// create matching DDS Reader that forwards data coming from DDS to Zenoh
let dds_reader = create_dds_reader(
self.context.participant,
topic_name,
type_name,
&self.type_info,
self.keyless,
self.reader_qos.clone(),
read_period,
move |sample: &DDSRawSample| {
do_route_message(
sample, &publisher,
// &self.zenoh_key_expr,
// &self.context.zsession,
&route_id,
);
},
)?;
let old = self.dds_reader.swap(dds_reader, Ordering::Relaxed);
if old != DDS_ENTITY_NULL {
if let Err(e) = delete_dds_entity(old) {
log::warn!("{self}: failed to delete overwritten DDS Reader: {e}");
}
}

// add reader's GID in ros_discovery_info message
self.context
.ros_discovery_mgr
.add_dds_reader(get_guid(&dds_reader)?);

Ok(())
}

fn deactivate_dds_reader(&mut self) {
let dds_reader = self.dds_reader.swap(DDS_ENTITY_NULL, Ordering::Relaxed);
if dds_reader != DDS_ENTITY_NULL {
Expand Down Expand Up @@ -368,12 +321,6 @@ impl RoutePublisher<'_> {
self.remote_routes
.insert(format!("{plugin_id}:{zenoh_key_expr}"));
log::debug!("{self} now serving remote routes {:?}", self.remote_routes);
// if 1st remote route added, activate the DDS Reader
if self.remote_routes.len() == 1 {
if let Err(e) = self.activate_dds_reader() {
log::error!("{self} activation of DDS Reader failed: {e}");
}
}
}

#[inline]
Expand Down Expand Up @@ -462,7 +409,7 @@ fn activate_dds_reader(
// create matching DDS Reader that forwards data coming from DDS to Zenoh
let reader = create_dds_reader(
context.participant,
topic_name,
topic_name.clone(),
type_name,
type_info,
keyless,
Expand Down

0 comments on commit febb1ee

Please sign in to comment.