From 21d5f3973bac88ecf3e6d5209f8010c90e073695 Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Thu, 14 Dec 2023 11:03:11 +0000 Subject: [PATCH] Make entity store persistent #2428 Persist the entity store as a JSON lines file. Every registration message and twin data message is persisted as JSON lines. On startup the in-memory entity store is rebuilt by replaying these messages. --- Cargo.lock | 2 + crates/common/mqtt_channel/Cargo.toml | 1 + crates/common/mqtt_channel/src/messages.rs | 41 +- .../src/tedge_config_cli/tedge_config.rs | 7 + crates/core/c8y_api/src/json_c8y.rs | 15 +- crates/core/tedge/src/cli/init.rs | 9 + crates/core/tedge_api/Cargo.toml | 1 + crates/core/tedge_api/src/entity_store.rs | 359 ++++++++++++++---- crates/core/tedge_api/src/lib.rs | 1 + crates/core/tedge_api/src/message_log.rs | 117 ++++++ crates/extensions/c8y_mapper_ext/src/actor.rs | 2 + .../extensions/c8y_mapper_ext/src/config.rs | 5 + .../c8y_mapper_ext/src/converter.rs | 6 + .../c8y_mapper_ext/src/inventory.rs | 13 +- .../c8y_mapper_ext/src/service_monitor.rs | 7 +- crates/extensions/c8y_mapper_ext/src/tests.rs | 1 + crates/extensions/tedge_mqtt_ext/src/lib.rs | 1 + .../registration/device_registration.robot | 27 +- 18 files changed, 536 insertions(+), 79 deletions(-) create mode 100644 crates/core/tedge_api/src/message_log.rs diff --git a/Cargo.lock b/Cargo.lock index e675e7bde78..53a3edf4cc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2192,6 +2192,7 @@ dependencies = [ "mqtt_tests", "rumqttc", "serde", + "serde_json", "serial_test", "thiserror", "tokio", @@ -3845,6 +3846,7 @@ dependencies = [ "serde", "serde_json", "shell-words", + "tempfile", "test-case", "thiserror", "time", diff --git a/crates/common/mqtt_channel/Cargo.toml b/crates/common/mqtt_channel/Cargo.toml index 3fee030dbf5..bf0be1f7dfe 100644 --- a/crates/common/mqtt_channel/Cargo.toml +++ b/crates/common/mqtt_channel/Cargo.toml @@ -23,4 +23,5 @@ zeroize = { workspace = true } [dev-dependencies] anyhow = { workspace = 
true } mqtt_tests = { workspace = true } +serde_json = { workspace = true } serial_test = { workspace = true } diff --git a/crates/common/mqtt_channel/src/messages.rs b/crates/common/mqtt_channel/src/messages.rs index b9dde62d7fa..704b44689c7 100644 --- a/crates/common/mqtt_channel/src/messages.rs +++ b/crates/common/mqtt_channel/src/messages.rs @@ -2,20 +2,43 @@ use crate::errors::MqttError; use crate::topics::Topic; use rumqttc::Publish; use rumqttc::QoS; +use serde::Deserialize; +use serde::Serialize; use std::fmt::Debug; use std::fmt::Formatter; use std::fmt::Write; /// A message to be sent to or received from MQTT. -#[derive(Debug, Clone, Eq, PartialEq)] +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct Message { pub topic: Topic, pub payload: DebugPayload, + #[serde(serialize_with = "serialize_qos", deserialize_with = "deserialize_qos")] pub qos: QoS, pub retain: bool, } -#[derive(Clone, Eq, PartialEq)] +fn serialize_qos(qos: &QoS, serializer: S) -> Result +where + S: serde::Serializer, +{ + (*qos as u8).serialize(serializer) +} + +fn deserialize_qos<'de, D>(deserializer: D) -> Result +where + D: serde::Deserializer<'de>, +{ + let value = u8::deserialize(deserializer)?; + match value { + 0 => Ok(QoS::AtMostOnce), + 1 => Ok(QoS::AtLeastOnce), + 2 => Ok(QoS::ExactlyOnce), + _ => Err(serde::de::Error::custom("Invalid QoS value")), + } +} + +#[derive(Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct DebugPayload(Payload); impl Debug for DebugPayload { @@ -188,4 +211,18 @@ mod tests { "Invalid UTF8 payload: invalid utf-8 sequence of 1 bytes from index 0: ..." 
); } + + #[test] + fn message_serialize_deserialize() { + let message = Message { + topic: Topic::new("test").unwrap(), + payload: DebugPayload("payload".as_bytes().to_vec()), + qos: QoS::AtMostOnce, + retain: true, + }; + + let json = serde_json::to_string(&message).expect("Serialization failed"); + let deserialized: Message = serde_json::from_str(&json).expect("Deserialization failed"); + assert_eq!(deserialized, message); + } } diff --git a/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs b/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs index 5a382a7f278..941aff627d3 100644 --- a/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs +++ b/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs @@ -700,6 +700,13 @@ define_tedge_config! { path: Utf8PathBuf, }, + db: { + /// The directory used to store any persistent data that survives tedge updates and even firmware updates + #[tedge_config(example = "/data/tedge", default(value = "/data/tedge"))] + #[doku(as = "PathBuf")] + path: Utf8PathBuf, + }, + data: { /// The directory used to store data like cached files, runtime metadata, etc. 
#[tedge_config(example = "/var/tedge", default(value = "/var/tedge"))] diff --git a/crates/core/c8y_api/src/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs index b816d0a3de1..28e29fa0ab9 100644 --- a/crates/core/c8y_api/src/json_c8y.rs +++ b/crates/core/c8y_api/src/json_c8y.rs @@ -404,6 +404,7 @@ mod tests { use tedge_api::event::ThinEdgeEventData; use tedge_api::messages::SoftwareListCommandPayload; use tedge_api::mqtt_topics::EntityTopicId; + use tedge_api::mqtt_topics::MqttSchema; use test_case::test_case; use time::macros::datetime; @@ -768,11 +769,16 @@ mod tests { ;"convert to clear alarm" )] fn check_alarm_translation(tedge_alarm: ThinEdgeAlarm, expected_c8y_alarm: C8yAlarm) { + let temp_dir = tempfile::tempdir().unwrap(); let main_device = EntityRegistrationMessage::main_device("test-main".into()); - let mut entity_store = EntityStore::with_main_device( + let mut entity_store = EntityStore::with_main_device_and_default_service_type( + MqttSchema::default(), main_device, + "service".into(), dummy_external_id_mapper, dummy_external_id_validator, + 5, + &temp_dir, ) .unwrap(); @@ -800,11 +806,16 @@ mod tests { }), }; + let temp_dir = tempfile::tempdir().unwrap(); let main_device = EntityRegistrationMessage::main_device("test-main".into()); - let entity_store = EntityStore::with_main_device( + let entity_store = EntityStore::with_main_device_and_default_service_type( + MqttSchema::default(), main_device, + "service".into(), dummy_external_id_mapper, dummy_external_id_validator, + 5, + &temp_dir, ) .unwrap(); diff --git a/crates/core/tedge/src/cli/init.rs b/crates/core/tedge/src/cli/init.rs index 90ad65bc116..24c0e689aed 100644 --- a/crates/core/tedge/src/cli/init.rs +++ b/crates/core/tedge/src/cli/init.rs @@ -172,6 +172,15 @@ impl TEdgeInitCmd { ), )?; + create_directory( + &config.db.path, + PermissionEntry::new( + Some(self.user.clone()), + Some(self.group.clone()), + Some(0o775), + ), + )?; + Ok(()) } } diff --git a/crates/core/tedge_api/Cargo.toml 
b/crates/core/tedge_api/Cargo.toml index eacb84cb257..d3ecb5ce7ed 100644 --- a/crates/core/tedge_api/Cargo.toml +++ b/crates/core/tedge_api/Cargo.toml @@ -35,6 +35,7 @@ clock = { workspace = true } maplit = { workspace = true } mockall = { workspace = true } regex = { workspace = true } +tempfile = { workspace = true } test-case = { workspace = true } time = { workspace = true, features = ["macros"] } toml = { workspace = true } diff --git a/crates/core/tedge_api/src/entity_store.rs b/crates/core/tedge_api/src/entity_store.rs index 8de93ce2281..40fed9ce973 100644 --- a/crates/core/tedge_api/src/entity_store.rs +++ b/crates/core/tedge_api/src/entity_store.rs @@ -8,6 +8,7 @@ // TODO: move entity business logic to its own module use crate::entity_store; +use crate::message_log::MqttMessageLog; use crate::mqtt_topics::Channel; use crate::mqtt_topics::EntityTopicId; use crate::mqtt_topics::MqttSchema; @@ -15,6 +16,8 @@ use crate::mqtt_topics::TopicIdError; use crate::pending_entity_store::PendingEntityData; use crate::pending_entity_store::PendingEntityStore; use log::debug; +use log::error; +use log::warn; use mqtt_channel::Message; use serde_json::json; use serde_json::Map; @@ -22,6 +25,7 @@ use serde_json::Value as JsonValue; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fmt::Display; +use std::path::Path; use thiserror::Error; /// Represents an "Entity topic identifier" portion of the MQTT topic @@ -114,6 +118,7 @@ type ExternalIdValidatorFn = /// /// ``` /// # use mqtt_channel::{Message, Topic}; +/// # use tedge_api::mqtt_topics::MqttSchema; /// # use tedge_api::entity_store::{EntityStore, EntityRegistrationMessage}; /// let mqtt_message = Message::new( /// &Topic::new("te/device/main//").unwrap(), @@ -121,13 +126,18 @@ type ExternalIdValidatorFn = /// ); /// let registration_message = EntityRegistrationMessage::try_from(&mqtt_message).unwrap(); /// -/// let mut entity_store = EntityStore::with_main_device( +/// let mut entity_store 
= EntityStore::with_main_device_and_default_service_type( +/// MqttSchema::default(), /// registration_message, +/// "service".into(), /// |tid, xid| tid.to_string().into(), /// |xid| Ok(xid.into()), +/// 5, +/// "/tmp" /// ); /// ``` pub struct EntityStore { + mqtt_schema: MqttSchema, main_device: EntityTopicId, entities: HashMap, entity_id_index: HashMap, @@ -136,52 +146,36 @@ pub struct EntityStore { // TODO: this is a c8y cloud specific concern and it'd be better to put it somewhere else. default_service_type: String, pending_entity_store: PendingEntityStore, + // The persistent message log to persist entity registrations and twin data messages + message_log: MqttMessageLog, } impl EntityStore { - /// Creates a new entity store with a given main device. - #[must_use] - pub fn with_main_device( - main_device: EntityRegistrationMessage, - external_id_mapper_fn: MF, - external_id_validator_fn: SF, - ) -> Option - where - MF: Fn(&EntityTopicId, &EntityExternalId) -> EntityExternalId, - MF: 'static + Send + Sync, - SF: Fn(&str) -> Result, - SF: 'static + Send + Sync, - { - Self::with_main_device_and_default_service_type( - MqttSchema::default(), - main_device, - "service".to_string(), - external_id_mapper_fn, - external_id_validator_fn, - 100, - ) - } - - #[must_use] - pub fn with_main_device_and_default_service_type( + pub fn with_main_device_and_default_service_type( mqtt_schema: MqttSchema, main_device: EntityRegistrationMessage, default_service_type: String, external_id_mapper_fn: MF, external_id_validator_fn: SF, telemetry_cache_size: usize, - ) -> Option + log_dir: P, + ) -> Result where MF: Fn(&EntityTopicId, &EntityExternalId) -> EntityExternalId, MF: 'static + Send + Sync, SF: Fn(&str) -> Result, SF: 'static + Send + Sync, + P: AsRef, { if main_device.r#type != EntityType::MainDevice { - return None; + return Err(InitError::Custom( + "Provided main device is not of type main-device".into(), + )); } - let entity_id: EntityExternalId = 
main_device.external_id?; + let entity_id: EntityExternalId = main_device.external_id.ok_or_else(|| { + InitError::Custom("External id for the main device not provided".into()) + })?; let metadata = EntityMetadata { topic_id: main_device.topic_id.clone(), external_id: entity_id.clone(), @@ -191,7 +185,10 @@ impl EntityStore { twin_data: Map::new(), }; - Some(EntityStore { + let message_log = MqttMessageLog::new(log_dir)?; + + let mut entity_store = EntityStore { + mqtt_schema: mqtt_schema.clone(), main_device: main_device.topic_id.clone(), entities: HashMap::from([(main_device.topic_id.clone(), metadata)]), entity_id_index: HashMap::from([(entity_id, main_device.topic_id)]), @@ -199,7 +196,78 @@ impl EntityStore { external_id_validator_fn: Box::new(external_id_validator_fn), default_service_type, pending_entity_store: PendingEntityStore::new(mqtt_schema, telemetry_cache_size), - }) + message_log, + }; + + entity_store.load_from_message_log(); + + Ok(entity_store) + } + + pub fn load_from_message_log(&mut self) { + loop { + match self.message_log.next_message() { + Err(err) => { + error!("Parsing log entry failed with {err}"); + continue; + } + Ok(None) => return, + Ok(Some(message)) => { + if let Ok((source, channel)) = + self.mqtt_schema.entity_channel_of(&message.topic) + { + match channel { + Channel::EntityMetadata => { + if let Ok(register_message) = + EntityRegistrationMessage::try_from(&message) + { + if let Err(err) = self.update(register_message) { + error!("Failed to re-register {source} from the persistent entity store due to {err}"); + continue; + } + } + } + Channel::EntityTwinData { fragment_key } => { + let fragment_value = if message.payload_bytes().is_empty() { + JsonValue::Null + } else { + match serde_json::from_slice::( + message.payload_bytes(), + ) { + Ok(json_value) => json_value, + Err(err) => { + error!("Failed to parse twin fragment value of {fragment_key} of {source} from the persistent entity store due to {err}"); + continue; + } + } + 
}; + + let twin_data = EntityTwinMessage::new( + source.clone(), + fragment_key, + fragment_value, + ); + if let Err(err) = self.update_twin_data(twin_data.clone()) { + error!("Failed to restore twin fragment: {twin_data:?} from the persistent entity store due to {err}"); + continue; + } + } + Channel::CommandMetadata { .. } => { + // Do nothing for now as supported operations are not part of the entity store + } + channel => { + warn!("Restoring messages on channel: {:?} not supported", channel) + } + } + } else { + warn!( + "Ignoring unsupported message retrieved from entity store: {:?}", + message + ); + } + } + } + } } /// Returns information about an entity under a given MQTT entity topic identifier. @@ -376,6 +444,7 @@ impl EntityStore { &mut self, message: EntityRegistrationMessage, ) -> Result, Error> { + let message_clone = message.clone(); debug!("Processing entity registration message, {:?}", message); let topic_id = message.topic_id.clone(); @@ -460,6 +529,11 @@ impl EntityStore { debug!("Updated entity map: {:?}", self.entities); debug!("Updated external id map: {:?}", self.entity_id_index); + if !affected_entities.is_empty() { + self.message_log + .append_message(&message_clone.to_mqtt_message(&self.mqtt_schema))? + } + Ok(affected_entities) } @@ -541,11 +615,11 @@ impl EntityStore { /// If the provided fragment already existed, `false` is returned. 
pub fn update_twin_data( &mut self, - entity_topic_id: &EntityTopicId, - fragment_key: String, - fragment_value: JsonValue, + twin_message: EntityTwinMessage, ) -> Result { - let entity = self.try_get_mut(entity_topic_id)?; + let fragment_key = twin_message.fragment_key.clone(); + let fragment_value = twin_message.fragment_value.clone(); + let entity = self.try_get_mut(&twin_message.topic_id)?; if fragment_value.is_null() { let existing = entity.twin_data.remove(&fragment_key); if existing.is_none() { @@ -560,6 +634,8 @@ impl EntityStore { } } + self.message_log + .append_message(&twin_message.to_mqtt_message(&self.mqtt_schema))?; Ok(true) } @@ -625,7 +701,7 @@ impl EntityMetadata { } /// Represents an error encountered while updating the store. -#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] +#[derive(thiserror::Error, Debug)] pub enum Error { #[error("Specified parent {0:?} does not exist in the store")] NoParent(Box), @@ -649,6 +725,27 @@ pub enum Error { // TODO: remove this error variant when `EntityRegistrationMessage` is changed #[error("`EntityRegistrationMessage::other` field needs to be a Map")] EntityRegistrationOtherNotMap, + + #[error(transparent)] + FromStdIoError(#[from] std::io::Error), + + #[error(transparent)] + FromSerdeJson(#[from] serde_json::Error), +} + +#[derive(thiserror::Error, Debug)] +pub enum InitError { + #[error(transparent)] + FromError(#[from] Error), + + #[error(transparent)] + FromStdIoError(#[from] std::io::Error), + + #[error(transparent)] + FromSerdeJson(#[from] serde_json::Error), + + #[error("Initialization failed with: {0}")] + Custom(String), } /// An object representing a valid entity registration message. 
@@ -825,6 +922,33 @@ fn parse_entity_register_payload(payload: &[u8]) -> Option { } } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct EntityTwinMessage { + topic_id: EntityTopicId, + fragment_key: String, + fragment_value: JsonValue, +} + +impl EntityTwinMessage { + pub fn new(topic_id: EntityTopicId, fragment_key: String, fragment_value: JsonValue) -> Self { + EntityTwinMessage { + topic_id, + fragment_key, + fragment_value, + } + } + + pub fn to_mqtt_message(self, mqtt_schema: &MqttSchema) -> Message { + let message_topic = mqtt_schema.topic_for( + &self.topic_id, + &Channel::EntityTwinData { + fragment_key: self.fragment_key, + }, + ); + Message::new(&message_topic, self.fragment_value.to_string()).with_retain() + } +} + #[cfg(test)] mod tests { use super::*; @@ -833,6 +957,7 @@ mod tests { use serde_json::json; use std::collections::HashSet; use std::str::FromStr; + use tempfile::TempDir; fn dummy_external_id_mapper( entity_topic_id: &EntityTopicId, @@ -886,7 +1011,8 @@ mod tests { #[test] fn registers_main_device() { - let store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let store = new_entity_store(&temp_dir); assert_eq!(store.main_device(), &EntityTopicId::default_main_device()); assert!(store.get(&EntityTopicId::default_main_device()).is_some()); @@ -894,7 +1020,8 @@ mod tests { #[test] fn lists_child_devices() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); // If the @parent info is not provided, it is assumed to be an immediate // child of the main device. 
@@ -931,7 +1058,8 @@ mod tests { #[test] fn lists_services() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); // Services are namespaced under devices, so `parent` is not necessary let updated_entities = store @@ -972,7 +1100,8 @@ mod tests { #[test] fn list_ancestors() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); // Assert no ancestors of main device assert!(store @@ -1078,7 +1207,8 @@ mod tests { #[test] fn list_ancestors_external_ids() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); // Assert ancestor external ids of main device assert!(store @@ -1188,7 +1318,8 @@ mod tests { #[test] fn auto_register_service() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); let service_topic_id = EntityTopicId::default_child_service("child1", "service1").unwrap(); let res = store.auto_register_entity(&service_topic_id).unwrap(); @@ -1218,7 +1349,8 @@ mod tests { #[test] fn auto_register_child_device() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); let child_topic_id = EntityTopicId::default_child_device("child2").unwrap(); let res = store.auto_register_entity(&child_topic_id).unwrap(); @@ -1237,7 +1369,8 @@ mod tests { #[test] fn auto_register_custom_topic_scheme_not_supported() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); assert_matches!( store.auto_register_entity(&EntityTopicId::from_str("custom/child2//").unwrap()), Err(Error::NonDefaultTopicScheme(_)) @@ -1246,7 +1379,8 @@ mod tests { #[test] fn register_main_device_custom_scheme() { - let mut store = 
new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); // Register main device with custom topic scheme let main_topic_id = EntityTopicId::from_str("custom/main//").unwrap(); @@ -1313,7 +1447,8 @@ mod tests { #[test] fn external_id_validation() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); let entity_topic_id = EntityTopicId::default_child_device("child1").unwrap(); let res = store.update(EntityRegistrationMessage { @@ -1325,22 +1460,21 @@ mod tests { }); // Assert service registered under main device with custom topic scheme - assert_eq!( - res, - Err(Error::InvalidExternalIdError(InvalidExternalIdError { - external_id: "bad+id".into(), - invalid_char: '+' - })) - ); + assert_matches!(res, Err(Error::InvalidExternalIdError(_))); } #[test] fn update_twin_data_new_fragment() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); let topic_id = EntityTopicId::default_main_device(); let updated = store - .update_twin_data(&topic_id, "hardware".into(), json!({ "version": 5 })) + .update_twin_data(EntityTwinMessage::new( + topic_id.clone(), + "hardware".into(), + json!({ "version": 5 }), + )) .unwrap(); assert!( updated, @@ -1356,15 +1490,24 @@ mod tests { #[test] fn update_twin_data_update_existing_fragment() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); let topic_id = EntityTopicId::default_main_device(); let _ = store - .update_twin_data(&topic_id, "hardware".into(), json!({ "version": 5 })) + .update_twin_data(EntityTwinMessage::new( + topic_id.clone(), + "hardware".into(), + json!({ "version": 5 }), + )) .unwrap(); let updated = store - .update_twin_data(&topic_id, "hardware".into(), json!({ "version": 6 })) + .update_twin_data(EntityTwinMessage::new( + 
topic_id.clone(), + "hardware".into(), + json!({ "version": 6 }), + )) .unwrap(); assert!( updated, @@ -1380,16 +1523,25 @@ mod tests { #[test] fn update_twin_data_remove_fragment() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); let topic_id = EntityTopicId::default_main_device(); let _ = store - .update_twin_data(&topic_id, "foo".into(), json!("bar")) + .update_twin_data(EntityTwinMessage::new( + topic_id.clone(), + "foo".into(), + json!("bar"), + )) .unwrap(); let updated = store - .update_twin_data(&topic_id, "foo".into(), json!(null)) + .update_twin_data(EntityTwinMessage::new( + topic_id.clone(), + "foo".into(), + json!(null), + )) .unwrap(); assert!( updated, @@ -1402,9 +1554,11 @@ mod tests { #[test] fn updated_registration_message_after_twin_updates() { + let temp_dir = tempfile::tempdir().unwrap(); // Create an entity store with main device having an explicit `name` fragment let topic_id = EntityTopicId::default_main_device(); - let mut store = EntityStore::with_main_device( + let mut store = EntityStore::with_main_device_and_default_service_type( + MqttSchema::default(), EntityRegistrationMessage { topic_id: topic_id.clone(), external_id: Some("test-device".into()), @@ -1415,14 +1569,21 @@ mod tests { .unwrap() .to_owned(), }, + "service".into(), dummy_external_id_mapper, dummy_external_id_sanitizer, + 5, + &temp_dir, ) .unwrap(); // Add some additional fragments to the device twin data let _ = store - .update_twin_data(&topic_id, "hardware".into(), json!({ "version": 5 })) + .update_twin_data(EntityTwinMessage::new( + topic_id.clone(), + "hardware".into(), + json!({ "version": 5 }), + )) .unwrap(); // Update the name of the device with @@ -1457,8 +1618,73 @@ mod tests { ); } - fn new_entity_store() -> EntityStore { - EntityStore::with_main_device( + #[test] + fn duplicate_registration_message_ignored() { + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = 
new_entity_store(&temp_dir); + let entity_topic_id = EntityTopicId::default_child_device("child1").unwrap(); + let reg_message = EntityRegistrationMessage { + topic_id: entity_topic_id.clone(), + r#type: EntityType::ChildDevice, + external_id: Some("child1".into()), + parent: None, + other: Map::new(), + }; + + store.update(reg_message.clone()).unwrap(); + let affected_entities = store.update(reg_message).unwrap(); + assert!(affected_entities.0.is_empty()); + } + + #[test] + fn entities_persisted_and_restored() { + let temp_dir = tempfile::tempdir().unwrap(); + + let topic_id = EntityTopicId::default_child_device("child1").unwrap(); + let reg_message = EntityRegistrationMessage { + topic_id: topic_id.clone(), + r#type: EntityType::ChildDevice, + external_id: Some("child1".into()), + parent: None, + other: Map::new(), + }; + + let twin_fragment_key = "foo".to_string(); + let twin_fragment_value = json!("bar"); + + { + let mut store = new_entity_store(&temp_dir); + store.update(reg_message.clone()).unwrap(); + store + .update_twin_data(EntityTwinMessage::new( + topic_id.clone(), + twin_fragment_key.clone(), + twin_fragment_value.clone(), + )) + .unwrap(); + } + + { + // Reload the entity store using the same persistent file + let store = new_entity_store(&temp_dir); + let entity_metadata = store.get(&topic_id).unwrap(); + let mut expected_entity_metadata = + EntityMetadata::child_device("child1".into()).unwrap(); + expected_entity_metadata + .twin_data + .insert(twin_fragment_key.clone(), twin_fragment_value.clone()); + + assert_eq!(entity_metadata, &expected_entity_metadata); + assert_eq!( + entity_metadata.twin_data.get(&twin_fragment_key).unwrap(), + &twin_fragment_value + ); + } + } + + fn new_entity_store(temp_dir: &TempDir) -> EntityStore { + EntityStore::with_main_device_and_default_service_type( + MqttSchema::default(), EntityRegistrationMessage { topic_id: EntityTopicId::default_main_device(), external_id: Some("test-device".into()), @@ -1466,8 +1692,11 @@ 
mod tests { parent: None, other: Map::new(), }, + "service".into(), dummy_external_id_mapper, dummy_external_id_sanitizer, + 5, + temp_dir, ) .unwrap() } diff --git a/crates/core/tedge_api/src/lib.rs b/crates/core/tedge_api/src/lib.rs index bdf350eefe2..71788d40d17 100644 --- a/crates/core/tedge_api/src/lib.rs +++ b/crates/core/tedge_api/src/lib.rs @@ -7,6 +7,7 @@ pub mod event; pub mod group; pub mod health; pub mod measurement; +pub mod message_log; pub mod messages; pub mod mqtt_topics; pub mod parser; diff --git a/crates/core/tedge_api/src/message_log.rs b/crates/core/tedge_api/src/message_log.rs new file mode 100644 index 00000000000..1390008d86f --- /dev/null +++ b/crates/core/tedge_api/src/message_log.rs @@ -0,0 +1,117 @@ +use mqtt_channel::Message as MqttMessage; +use std::fs::File; +use std::fs::OpenOptions; +use std::io::BufRead; +use std::io::BufReader; +use std::io::BufWriter; +use std::io::Write; +use std::path::Path; + +const LOG_FILE_NAME: &str = "entity_store.jsonl"; + +/// A persistent append-only log of MQTT messages. +/// Each line is the JSON representation of that MQTT message. +/// The underlying file is a JSON lines file. +pub struct MqttMessageLog { + reader: BufReader, + writer: BufWriter, +} + +#[derive(thiserror::Error, Debug)] +pub enum LogEntryError { + #[error(transparent)] + FromStdIo(std::io::Error), + + #[error("Deserialization failed with {0} while parsing {1}")] + FromSerdeJson(#[source] serde_json::Error, String), +} + +impl MqttMessageLog { + pub fn new

(dir: P) -> Result + where + P: AsRef, + { + let file = OpenOptions::new() + .create(true) + .read(true) + .append(true) + .open(dir.as_ref().join(LOG_FILE_NAME))?; + + let reader = BufReader::new(file.try_clone()?); + let writer = BufWriter::new(file); + + Ok(MqttMessageLog { reader, writer }) + } + + /// Return the next MQTT message from the log + /// The reads start from the beginning of the file + /// and each read advances the file pointer to the next line + pub fn next_message(&mut self) -> Result, LogEntryError> { + let mut buffer = String::new(); + match self.reader.read_line(&mut buffer) { + Ok(bytes_read) if bytes_read > 0 => { + let message: MqttMessage = serde_json::from_str(&buffer) + .map_err(|err| LogEntryError::FromSerdeJson(err, buffer))?; + Ok(Some(message)) + } + Ok(_) => Ok(None), // EOF + Err(err) => Err(LogEntryError::FromStdIo(err)), + } + } + + /// Append the JSON representation of the given message to the log. + /// Each message is appended on a new line. + pub fn append_message(&mut self, message: &MqttMessage) -> Result<(), std::io::Error> { + let json_line = serde_json::to_string(message)?; + writeln!(self.writer, "{}", json_line)?; + self.writer.flush()?; + self.writer.get_ref().sync_all()?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::MqttMessageLog; + use mqtt_channel::Message; + use mqtt_channel::Topic; + use tempfile::tempdir; + + #[test] + fn test_append_and_retrieve() { + let temp_dir = tempdir().unwrap(); + + // Prepare some dummy messages + let mut messages = vec![]; + for i in 1..5 { + let message = Message::new( + &Topic::new(&format!("topic{i}")).unwrap(), + format!("payload{i}"), + ); + messages.push(message); + } + + // Populate the log + { + let mut message_log = MqttMessageLog::new(&temp_dir).unwrap(); + + assert_eq!(message_log.next_message().unwrap(), None); + + for message in messages.clone() { + message_log.append_message(&message).unwrap(); + } + } + + // Read from the log + { + // Reload the message log 
+ let mut message_log = MqttMessageLog::new(&temp_dir).unwrap(); + + for message in messages { + assert_eq!(message_log.next_message().unwrap(), Some(message)); + } + // EOF -> None + assert_eq!(message_log.next_message().unwrap(), None); + } + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/actor.rs b/crates/extensions/c8y_mapper_ext/src/actor.rs index 7d4db8d6a23..84c7560cb72 100644 --- a/crates/extensions/c8y_mapper_ext/src/actor.rs +++ b/crates/extensions/c8y_mapper_ext/src/actor.rs @@ -396,6 +396,8 @@ impl C8yMapperBuilder { create_directory_with_defaults(config.ops_dir.clone())?; // Create directory for device custom fragments create_directory_with_defaults(config.config_dir.join("device"))?; + // Create directory for persistent entity store + create_directory_with_defaults(&config.db_dir)?; Ok(()) } } diff --git a/crates/extensions/c8y_mapper_ext/src/config.rs b/crates/extensions/c8y_mapper_ext/src/config.rs index e5b3a99fcfd..14c1031147e 100644 --- a/crates/extensions/c8y_mapper_ext/src/config.rs +++ b/crates/extensions/c8y_mapper_ext/src/config.rs @@ -29,6 +29,7 @@ pub struct C8yMapperConfig { pub config_dir: PathBuf, pub logs_path: Utf8PathBuf, pub data_dir: DataDir, + pub db_dir: Utf8PathBuf, pub device_id: String, pub device_topic_id: EntityTopicId, pub device_type: String, @@ -52,6 +53,7 @@ impl C8yMapperConfig { config_dir: PathBuf, logs_path: Utf8PathBuf, data_dir: DataDir, + db_dir: Utf8PathBuf, tmp_dir: Arc, device_id: String, device_topic_id: EntityTopicId, @@ -73,6 +75,7 @@ impl C8yMapperConfig { config_dir, logs_path, data_dir, + db_dir, device_id, device_topic_id, device_type, @@ -99,6 +102,7 @@ impl C8yMapperConfig { let logs_path = tedge_config.logs.path.clone(); let data_dir: DataDir = tedge_config.data.path.clone().into(); + let db_dir = tedge_config.db.path.clone(); let tmp_dir = tedge_config.tmp.path.clone().into(); let device_id = tedge_config.device.id.try_read(tedge_config)?.to_string(); @@ -170,6 +174,7 @@ impl C8yMapperConfig 
{ config_dir, logs_path, data_dir, + db_dir, tmp_dir, device_id, device_topic_id, diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 0a8ff74ff22..73244d37054 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -93,6 +93,7 @@ use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; use tedge_utils::file::create_directory_with_defaults; use tedge_utils::file::create_file_with_defaults; +use tedge_utils::file::FileError; use tedge_utils::size_threshold::SizeThreshold; use thiserror::Error; use time::format_description::well_known::Rfc3339; @@ -264,6 +265,7 @@ impl CumulocityConverter { Self::map_to_c8y_external_id, Self::validate_external_id, EARLY_MESSAGE_BUFFER_SIZE, + &config.db_dir, ) .unwrap(); @@ -919,6 +921,9 @@ pub enum CumulocityConverterBuildError { #[error(transparent)] OperationLogsError(#[from] OperationLogsError), + + #[error(transparent)] + FileError(#[from] FileError), } impl CumulocityConverter { @@ -3308,6 +3313,7 @@ pub(crate) mod tests { tmp_dir.to_path_buf(), tmp_dir.utf8_path_buf(), tmp_dir.utf8_path_buf().into(), + tmp_dir.utf8_path_buf(), tmp_dir.utf8_path_buf().into(), device_id, device_topic_id, diff --git a/crates/extensions/c8y_mapper_ext/src/inventory.rs b/crates/extensions/c8y_mapper_ext/src/inventory.rs index 12eb3c5de09..2ed65d1e42d 100644 --- a/crates/extensions/c8y_mapper_ext/src/inventory.rs +++ b/crates/extensions/c8y_mapper_ext/src/inventory.rs @@ -8,6 +8,7 @@ use serde_json::Value as JsonValue; use std::fs::File; use std::io::Read; use std::path::Path; +use tedge_api::entity_store::EntityTwinMessage; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::EntityTopicId; use tedge_mqtt_ext::Message; @@ -42,11 +43,11 @@ impl CumulocityConverter { if let JsonValue::Object(map) = inventory_base { for (key, value) in map { let main_device_tid = self.entity_store.main_device().clone(); - 
let _ = self.entity_store.update_twin_data( - &main_device_tid, + let _ = self.entity_store.update_twin_data(EntityTwinMessage::new( + main_device_tid.clone(), key.clone(), value.clone(), - )?; + ))?; let mapped_message = self.entity_twin_data_message(&main_device_tid, key.clone(), value.clone()); messages.push(mapped_message); @@ -88,11 +89,11 @@ impl CumulocityConverter { serde_json::from_slice::<JsonValue>(message.payload_bytes())? }; - let updated = self.entity_store.update_twin_data( - source, + let updated = self.entity_store.update_twin_data(EntityTwinMessage::new( + source.clone(), fragment_key.into(), fragment_value.clone(), - )?; + ))?; if !updated { return Ok(vec![]); } diff --git a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs index 63ac92b3b95..1dfb6e7f143 100644 --- a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs +++ b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs @@ -136,12 +136,17 @@ mod tests { c8y_monitor_payload.as_bytes(), ); + let temp_dir = tempfile::tempdir().unwrap(); let main_device_registration = EntityRegistrationMessage::main_device(device_name.to_string()); - let mut entity_store = EntityStore::with_main_device( + let mut entity_store = EntityStore::with_main_device_and_default_service_type( + MqttSchema::default(), main_device_registration, + "service".into(), crate::converter::CumulocityConverter::map_to_c8y_external_id, crate::converter::CumulocityConverter::validate_external_id, + 5, + &temp_dir, ) .unwrap(); diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index 4af1208e880..26b1685d3ce 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -2567,6 +2567,7 @@ pub(crate) async fn spawn_c8y_mapper_actor( config_dir.to_path_buf(), config_dir.utf8_path_buf(), config_dir.utf8_path_buf().into(), + config_dir.utf8_path_buf(),
config_dir.utf8_path_buf().into(), device_name, device_topic_id, diff --git a/crates/extensions/tedge_mqtt_ext/src/lib.rs b/crates/extensions/tedge_mqtt_ext/src/lib.rs index 9b6ca1c3ddb..277a0bf005c 100644 --- a/crates/extensions/tedge_mqtt_ext/src/lib.rs +++ b/crates/extensions/tedge_mqtt_ext/src/lib.rs @@ -27,6 +27,7 @@ use tedge_actors::WrappedInput; pub type MqttConfig = mqtt_channel::Config; pub type MqttMessage = mqtt_channel::Message; +pub use mqtt_channel::DebugPayload; pub use mqtt_channel::Message; pub use mqtt_channel::MqttError; pub use mqtt_channel::QoS; diff --git a/tests/RobotFramework/tests/cumulocity/registration/device_registration.robot b/tests/RobotFramework/tests/cumulocity/registration/device_registration.robot index da6a69a6195..3adac1987ad 100644 --- a/tests/RobotFramework/tests/cumulocity/registration/device_registration.robot +++ b/tests/RobotFramework/tests/cumulocity/registration/device_registration.robot @@ -127,9 +127,10 @@ Register tedge-agent when tedge-mapper-c8y is not running #2389 Should Have MQTT Messages te/device/offlinechild1///cmd/restart/+ Early data messages cached and processed + [Teardown] Re-enable Auto-registration ${timestamp}= Get Unix Timestamp Execute Command sudo tedge config set c8y.entity_store.auto_register false - Restart Service tedge-mapper-c8y + Restart Service tedge-mapper-c8y Service Health Status Should Be Up tedge-mapper-c8y ${children}= Create List child0 child00 child01 child000 child0000 child00000 @@ -151,11 +152,31 @@ Early data messages cached and processed Device Should Have Fragments maintenance_mode END - Execute Command sudo tedge config unset c8y.entity_store.auto_register - Restart Service tedge-mapper-c8y + +Entities persisted and restored + Execute Command tedge mqtt pub --retain 'te/factory1/shop1/plc1/' '{"@type":"child-device","@id":"plc1"}' + Execute Command tedge mqtt pub --retain 'te/factory1/shop1/plc2/' '{"@type":"child-device","@id":"plc2"}' + Execute Command tedge mqtt pub 
--retain 'te/factory1/shop1/plc1/sensor1' '{"@type":"child-device","@parent":"factory1/shop1/plc1/"}' + Execute Command tedge mqtt pub --retain 'te/factory1/shop1/plc1/sensor2' '{"@type":"child-device","@parent":"factory1/shop1/plc1/"}' + Execute Command tedge mqtt pub --retain 'te/factory1/shop1/plc2/sensor1' '{"@type":"child-device","@parent":"factory1/shop1/plc2/"}' + Execute Command tedge mqtt pub --retain 'te/factory1/shop1/plc1/metrics' '{"@type":"service","@parent":"factory1/shop1/plc1/"}' + Execute Command tedge mqtt pub --retain 'te/factory1/shop1/plc2/metrics' '{"@type":"service","@parent":"factory1/shop1/plc2/"}' + + ${timestamp}= Get Unix Timestamp + FOR ${counter} IN RANGE 0 5 + Restart Service tedge-mapper-c8y + Service Health Status Should Be Up tedge-mapper-c8y + END + + Should Have MQTT Messages c8y/s/us message_contains="101" date_from=${timestamp} minimum=0 maximum=0 + *** Keywords *** +Re-enable Auto-registration + Execute Command sudo tedge config unset c8y.entity_store.auto_register + Restart Service tedge-mapper-c8y + Check Child Device [Arguments] ${parent_sn} ${child_sn} ${child_name} ${child_type} ${child_mo}= Device Should Exist ${child_sn}