Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Advanced Pub/Sub feature #1582

Merged
merged 91 commits into from
Dec 11, 2024
Merged

Add Advanced Pub/Sub feature #1582

merged 91 commits into from
Dec 11, 2024

Conversation

OlivierHecart
Copy link
Contributor

@OlivierHecart OlivierHecart commented Nov 6, 2024

This PR adds:

An AdvancedPublisher that can:

  • Cache last published samples to be retrieved by AdvancedSubscribers for history or recovery.
  • Sequence samples to allow AdvancedSubscribers to detect missed samples.
  • Automatically create a Liveliness token to assert its presence.

An AdvancedSubscriber that can:

  • Retrieve historical data from AdvancedPublishers.
  • Detect missed samples.
  • Recover missed samples from AdvancedPublishers.
  • Monitor matching AdvancedPublishers through liveliness to query history of late joiners.

This PR also updates the Connectivity Status and Events to be retrievable through an AdvancedSubscriber.
This PR marks the FetchingSubscriber and PublicationCache as deprecated.

@OlivierHecart OlivierHecart added the new feature Something new is needed label Nov 6, 2024
@OlivierHecart OlivierHecart mentioned this pull request Nov 6, 2024
zenoh/src/api/session.rs Show resolved Hide resolved
zenoh/src/api/admin.rs Outdated Show resolved Hide resolved
zenoh-ext/src/advanced_cache.rs Outdated Show resolved Hide resolved
zenoh-ext/src/advanced_cache.rs Outdated Show resolved Hide resolved
zenoh-ext/src/advanced_cache.rs Outdated Show resolved Hide resolved

/// Change the history size for each resource.
#[zenoh_macros::unstable]
pub fn cache(mut self, config: CacheConfig) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i would like having some config: impl Into<CacheConfig>, together with impl From<usize> for CacheConfig, but I assume that's not for the first version, is it?

}
}

macro_rules! spawn_periodoic_queries {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a macro and not just a function?

let session = conf.session.clone();
let key_expr = key_expr.clone().into_owned();

move |s: Sample| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

too much indentation, extract the callback in a function please

global_pending_queries: u64,
sequenced_states: HashMap<EntityGlobalId, SourceState<u32>>,
timestamped_states: HashMap<ID, SourceState<Timestamp>>,
session: Session,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

State should not have a Session but a WeakSession, otherwise, keeping a reference to an advanced subscriber prevent the session to be closed, contrary to a normal subscriber.

let mut lock = zlock!(statesref);
let states = &mut *lock;
let source_id = s.source_info().source_id().cloned();
let new = handle_sample(states, s);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Samples are handled with lock held. If there are multiple connections, i.e. P2P, it samples from different publishers to be handled in parallel, and it will block the transports.
Having only one big lock, while publishers are independently managed is a big issue.

let session = conf.session.clone();
let statesref = statesref.clone();
let key_expr = key_expr.clone().into_owned();
move |s: Sample| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

too much indentation

let key_expr = key_expr.clone().into_owned();
move |s: Sample| {
if s.kind() == SampleKind::Put {
if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let else please

@OlivierHecart OlivierHecart changed the title Advanced pub/sub Add Advanced Pub/Sub feature Dec 11, 2024
@OlivierHecart
Copy link
Contributor Author

The comments above are relevant but not critical. It has been decided to merge this PR and address those in separate PRs.
See #1656 and #1657

@DenisBiryukov91
Copy link
Contributor

Please do not forget to remove zenoh/default features from zenoh-ext Cargo.toml.

@OlivierHecart OlivierHecart merged commit cb3fa54 into main Dec 11, 2024
24 checks passed
@OlivierHecart OlivierHecart deleted the dev/advanced_pubsub branch December 11, 2024 10:00
@OlivierHecart OlivierHecart restored the dev/advanced_pubsub branch December 11, 2024 10:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
new feature Something new is needed
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants