Skip to content

Commit

Permalink
Merge pull request #2466 from albinsuresh/feat/2428/registration-mess…
Browse files Browse the repository at this point in the history
…age-processing-priority

Ordered processing of entity registration messages before any other messages

This work fixes a message race: when entity data messages were delivered out of order, arriving before the corresponding entity registration messages, those data messages were dropped. The fix results in the following behaviour:

1. All data messages (telemetry, and metadata messages such as twin data) that are received before their respective registration messages are cached in memory and not converted.
    * Telemetry messages for all entities are cached in a cache with a capacity of only 100 entries. Hence, when the cache is full, older entries are replaced with newer entries.
    * Metadata messages are cached in unbounded buffers as we can't afford dropping such critical data
1. All child device registration messages received before their parents are also cached
1. When the registration message of an entity is received, the entity itself and all its cached child devices are registered, and all their cached data is processed as well.
1. Auto-registration must be turned off if explicit registration is used for any entity. Keeping it turned on while using explicit registration can sometimes result in undesired behaviours like nested child devices getting registered as immediate child devices.
  • Loading branch information
Albin Suresh authored Dec 14, 2023
2 parents 290c50c + 2cac9f0 commit b4974b9
Show file tree
Hide file tree
Showing 8 changed files with 1,008 additions and 69 deletions.
105 changes: 84 additions & 21 deletions crates/core/tedge_api/src/entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use crate::mqtt_topics::Channel;
use crate::mqtt_topics::EntityTopicId;
use crate::mqtt_topics::MqttSchema;
use crate::mqtt_topics::TopicIdError;
use crate::pending_entity_store::PendingEntityData;
use crate::pending_entity_store::PendingEntityStore;
use log::debug;
use mqtt_channel::Message;
use serde_json::json;
Expand Down Expand Up @@ -133,6 +135,7 @@ pub struct EntityStore {
external_id_validator_fn: ExternalIdValidatorFn,
// TODO: this is a c8y cloud specific concern and it'd be better to put it somewhere else.
default_service_type: String,
pending_entity_store: PendingEntityStore,
}

impl EntityStore {
Expand All @@ -150,19 +153,23 @@ impl EntityStore {
SF: 'static + Send + Sync,
{
Self::with_main_device_and_default_service_type(
MqttSchema::default(),
main_device,
"service".to_string(),
external_id_mapper_fn,
external_id_validator_fn,
100,
)
}

#[must_use]
pub fn with_main_device_and_default_service_type<MF, SF>(
mqtt_schema: MqttSchema,
main_device: EntityRegistrationMessage,
default_service_type: String,
external_id_mapper_fn: MF,
external_id_validator_fn: SF,
telemetry_cache_size: usize,
) -> Option<Self>
where
MF: Fn(&EntityTopicId, &EntityExternalId) -> EntityExternalId,
Expand Down Expand Up @@ -191,6 +198,7 @@ impl EntityStore {
external_id_mapper: Box::new(external_id_mapper_fn),
external_id_validator_fn: Box::new(external_id_validator_fn),
default_service_type,
pending_entity_store: PendingEntityStore::new(mqtt_schema, telemetry_cache_size),
})
}

Expand Down Expand Up @@ -330,17 +338,58 @@ impl EntityStore {
/// Registers the entity described by `message` and releases anything that was
/// cached while waiting for that registration.
///
/// Returns the entities affected by the registration together with the pending
/// data of the entity itself, followed by the pending data of every cached child
/// entity, each of which is registered here as well.
///
/// Both vectors are empty when the registration affected no entity, or when the
/// parent of the entity is not registered yet; in the latter case the message is
/// cached so that it can be processed later.
///
/// # Errors
///
/// Any registration error other than `Error::NoParent` is returned as is,
/// including errors raised while registering a cached child entity.
pub fn update(
    &mut self,
    message: EntityRegistrationMessage,
) -> Result<(Vec<EntityTopicId>, Vec<PendingEntityData>), Error> {
    let affected_entities = match self.register_entity(message.clone()) {
        Ok(affected) => affected,
        Err(Error::NoParent(_)) => {
            // The child was announced before its parent: keep the registration
            // message in the pending entity store until the parent shows up.
            self.pending_entity_store
                .cache_early_registration_message(message);
            return Ok((vec![], vec![]));
        }
        Err(err) => return Err(err),
    };

    if affected_entities.is_empty() {
        return Ok((vec![], vec![]));
    }

    // `message` is consumed below, so keep a copy of the topic id for the child lookup.
    let topic_id = message.topic_id.clone();
    let mut pending_entities = vec![self.pending_entity_store.take_cached_entity_data(message)];

    let cached_children = self
        .pending_entity_store
        .take_cached_child_entities_data(&topic_id);
    for cached_child in cached_children {
        self.register_entity(cached_child.reg_message.clone())?;
        pending_entities.push(cached_child);
    }

    Ok((affected_entities, pending_entities))
}

fn register_entity(
&mut self,
message: EntityRegistrationMessage,
) -> Result<Vec<EntityTopicId>, Error> {
debug!("Processing entity registration message, {:?}", message);
let topic_id = message.topic_id;
let topic_id = message.topic_id.clone();

let mut affected_entities = vec![];

let parent = match message.r#type {
EntityType::MainDevice => None,
EntityType::ChildDevice => message.parent.or_else(|| Some(self.main_device.clone())),
EntityType::ChildDevice => message
.parent
.clone()
.or_else(|| Some(self.main_device.clone())),
EntityType::Service => message
.parent
.clone()
.or_else(|| topic_id.default_parent_identifier())
.or_else(|| Some(self.main_device.clone())),
};
Expand Down Expand Up @@ -513,6 +562,10 @@ impl EntityStore {

Ok(true)
}

/// Hands `message` over to the pending entity store, which caches it as an
/// early data message.
pub fn cache_early_data_message(&mut self, message: Message) {
    let pending = &mut self.pending_entity_store;
    pending.cache_early_data_message(message)
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -691,6 +744,31 @@ impl EntityRegistrationMessage {
})
}

pub fn new_custom(topic_id: EntityTopicId, r#type: EntityType) -> Self {
EntityRegistrationMessage {
topic_id,
r#type,
external_id: None,
parent: None,
other: Map::new(),
}
}

/// Sets the parent of this registration message, replacing any parent set
/// before, and returns the updated message.
pub fn with_parent(mut self, parent: EntityTopicId) -> Self {
    self.parent = Some(parent);
    self
}

/// Sets the external id of this registration message, replacing any external
/// id set before, and returns the updated message.
pub fn with_external_id(mut self, external_id: EntityExternalId) -> Self {
    self.external_id = Some(external_id);
    self
}

/// Stores `value` under `key` in the additional fragments of this registration
/// message and returns the updated message.
///
/// Whatever the map's `insert` hands back for `key` is discarded.
pub fn with_other_fragment(mut self, key: String, value: JsonValue) -> Self {
    self.other.insert(key, value);
    self
}

/// Creates a entity registration message for a main device.
pub fn main_device(main_device_id: String) -> Self {
Self {
Expand Down Expand Up @@ -830,7 +908,7 @@ mod tests {
)
.unwrap();

assert_eq!(updated_entities, ["device/main//"]);
assert_eq!(updated_entities.0, ["device/main//"]);
assert_eq!(
store.child_devices(&EntityTopicId::default_main_device()),
["device/child1//"]
Expand All @@ -845,7 +923,7 @@ mod tests {
.unwrap(),
)
.unwrap();
assert_eq!(updated_entities, ["device/main//"]);
assert_eq!(updated_entities.0, ["device/main//"]);
let children = store.child_devices(&EntityTopicId::default_main_device());
assert!(children.iter().any(|&e| e == "device/child1//"));
assert!(children.iter().any(|&e| e == "device/child2//"));
Expand All @@ -866,7 +944,7 @@ mod tests {
})
.unwrap();

assert_eq!(updated_entities, ["device/main//"]);
assert_eq!(updated_entities.0, ["device/main//"]);
assert_eq!(
store.services(&EntityTopicId::default_main_device()),
["device/main/service/service1"]
Expand All @@ -882,7 +960,7 @@ mod tests {
})
.unwrap();

assert_eq!(updated_entities, ["device/main//"]);
assert_eq!(updated_entities.0, ["device/main//"]);
let services = store.services(&EntityTopicId::default_main_device());
assert!(services
.iter()
Expand All @@ -892,21 +970,6 @@ mod tests {
.any(|&e| e == &EntityTopicId::default_main_service("service2").unwrap()));
}

// Sends a child device registration whose declared parent
// (`myawesomeparent`) was never registered in the store, and expects `update`
// to reject it with `Error::NoParent`.
//
// NOTE(review): the `update` shown in this same change set caches such
// registrations and returns `Ok((vec![], vec![]))` on `Error::NoParent`, so
// this expectation appears to describe the previous behaviour only — confirm
// whether this test is meant to be dropped.
#[test]
fn forbids_nonexistent_parents() {
let mut store = new_entity_store();

let res = store.update(EntityRegistrationMessage {
topic_id: EntityTopicId::default_main_device(),
external_id: None,
r#type: EntityType::ChildDevice,
parent: Some(EntityTopicId::default_child_device("myawesomeparent").unwrap()),
other: Map::new(),
});

assert!(matches!(res, Err(Error::NoParent(_))));
}

#[test]
fn list_ancestors() {
let mut store = new_entity_store();
Expand Down
2 changes: 2 additions & 0 deletions crates/core/tedge_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pub mod messages;
pub mod mqtt_topics;
pub mod parser;
pub mod path;
pub mod pending_entity_store;
mod ring_buffer;
pub mod serialize;
mod software;
pub mod topic;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_api/src/mqtt_topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const ENTITY_ID_SEGMENTS: usize = 4;
/// topic.name
/// );
/// ```
#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct MqttSchema {
pub root: String,
}
Expand Down
Loading

1 comment on commit b4974b9

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
370 0 3 370 100 50m26.153999999s

Please sign in to comment.