Skip to content

Commit

Permalink
Fix undesired propagation of interests to routers from peers (#1516)
Browse files Browse the repository at this point in the history
* Fix undesired propagation of interests to routers from peers

* Properly fix undesired propagation of interests to routers from peers
  • Loading branch information
OlivierHecart authored Oct 10, 2024
1 parent 7811f8e commit 850171f
Showing 1 changed file with 45 additions and 43 deletions.
88 changes: 45 additions & 43 deletions zenoh/src/net/routing/hat/p2p_peer/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,50 +140,52 @@ impl HatInterestTrait for HatCode {
mode,
});

let propagated_mode = if mode.future() {
InterestMode::CurrentFuture
} else {
mode
};
for dst_face in tables.faces.values_mut().filter(|f| {
f.whatami == WhatAmI::Router
|| (options.tokens()
&& mode == InterestMode::Current
&& f.whatami == WhatAmI::Peer
&& !initial_interest(f).map(|i| i.finalized).unwrap_or(true))
}) {
let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst);
get_mut_unchecked(dst_face).local_interests.insert(
id,
InterestState {
options,
res: res.as_ref().map(|res| (*res).clone()),
finalized: propagated_mode == InterestMode::Future,
},
);
if mode.current() {
let dst_face_mut = get_mut_unchecked(dst_face);
let cancellation_token = dst_face_mut.task_controller.get_cancellation_token();
dst_face_mut
.pending_current_interests
.insert(id, (interest.clone(), cancellation_token));
CurrentInterestCleanup::spawn_interest_clean_up_task(dst_face, tables_ref, id);
}
let wire_expr = res
.as_ref()
.map(|res| Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client));
dst_face.primitives.send_interest(RoutingContext::with_expr(
Interest {
if face.whatami == WhatAmI::Client {
let propagated_mode = if mode.future() {
InterestMode::CurrentFuture
} else {
mode
};
for dst_face in tables.faces.values_mut().filter(|f| {
f.whatami == WhatAmI::Router
|| (f.whatami == WhatAmI::Peer
&& options.tokens()
&& mode == InterestMode::Current
&& !initial_interest(f).map(|i| i.finalized).unwrap_or(true))
}) {
let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst);
get_mut_unchecked(dst_face).local_interests.insert(
id,
mode: propagated_mode,
options,
wire_expr,
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
},
res.as_ref().map(|res| res.expr()).unwrap_or_default(),
));
InterestState {
options,
res: res.as_ref().map(|res| (*res).clone()),
finalized: propagated_mode == InterestMode::Future,
},
);
if mode.current() {
let dst_face_mut = get_mut_unchecked(dst_face);
let cancellation_token = dst_face_mut.task_controller.get_cancellation_token();
dst_face_mut
.pending_current_interests
.insert(id, (interest.clone(), cancellation_token));
CurrentInterestCleanup::spawn_interest_clean_up_task(dst_face, tables_ref, id);
}
let wire_expr = res.as_ref().map(|res| {
Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client)
});
dst_face.primitives.send_interest(RoutingContext::with_expr(
Interest {
id,
mode: propagated_mode,
options,
wire_expr,
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
},
res.as_ref().map(|res| res.expr()).unwrap_or_default(),
));
}
}

if mode.current() {
Expand Down

0 comments on commit 850171f

Please sign in to comment.